aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands
diff options
context:
space:
mode:
authorAmit Kapila <akapila@postgresql.org>2021-10-27 07:44:52 +0530
committerAmit Kapila <akapila@postgresql.org>2021-10-27 07:44:52 +0530
commit5a2832465fd8984d089e8c44c094e6900d987fcd (patch)
tree68a10449d24643fcdac2b5363accfa7af2196128 /src/backend/commands
parentf0b051e322d530a340e62f2ae16d99acdbcb3d05 (diff)
downloadpostgresql-5a2832465fd8984d089e8c44c094e6900d987fcd.tar.gz
postgresql-5a2832465fd8984d089e8c44c094e6900d987fcd.zip
Allow publishing the tables of schema.
A new option "FOR ALL TABLES IN SCHEMA" in Create/Alter Publication allows one or more schemas to be specified, whose tables are selected by the publisher for sending the data to the subscriber. The new syntax allows specifying both the tables and schemas. For example: CREATE PUBLICATION pub1 FOR TABLE t1,t2,t3, ALL TABLES IN SCHEMA s1,s2; OR ALTER PUBLICATION pub1 ADD TABLE t1,t2,t3, ALL TABLES IN SCHEMA s1,s2; A new system table "pg_publication_namespace" has been added, to maintain the schemas that the user wants to publish through the publication. Modified the output plugin (pgoutput) to publish the changes if the relation is part of schema publication. Updates pg_dump to identify and dump schema publications. Updates the \d family of commands to display schema publications and \dRp+ variant will now display associated schemas if any. Author: Vignesh C, Hou Zhijie, Amit Kapila Syntax-Suggested-by: Tom Lane, Alvaro Herrera Reviewed-by: Greg Nancarrow, Masahiko Sawada, Hou Zhijie, Amit Kapila, Haiying Tang, Ajin Cherian, Rahila Syed, Bharath Rupireddy, Mark Dilger Tested-by: Haiying Tang Discussion: https://www.postgresql.org/message-id/CALDaNm0OANxuJ6RXqwZsM1MSY4s19nuH3734j4a72etDwvBETQ@mail.gmail.com
Diffstat (limited to 'src/backend/commands')
-rw-r--r--src/backend/commands/alter.c1
-rw-r--r--src/backend/commands/event_trigger.c4
-rw-r--r--src/backend/commands/publicationcmds.c490
-rw-r--r--src/backend/commands/seclabel.c1
-rw-r--r--src/backend/commands/tablecmds.c28
5 files changed, 493 insertions, 31 deletions
diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c
index c47d54e96bb..40044070cf3 100644
--- a/src/backend/commands/alter.c
+++ b/src/backend/commands/alter.c
@@ -660,6 +660,7 @@ AlterObjectNamespace_oid(Oid classId, Oid objid, Oid nspOid,
case OCLASS_EVENT_TRIGGER:
case OCLASS_POLICY:
case OCLASS_PUBLICATION:
+ case OCLASS_PUBLICATION_NAMESPACE:
case OCLASS_PUBLICATION_REL:
case OCLASS_SUBSCRIPTION:
case OCLASS_TRANSFORM:
diff --git a/src/backend/commands/event_trigger.c b/src/backend/commands/event_trigger.c
index 71612d577e9..df264329d8b 100644
--- a/src/backend/commands/event_trigger.c
+++ b/src/backend/commands/event_trigger.c
@@ -973,6 +973,7 @@ EventTriggerSupportsObjectType(ObjectType obtype)
case OBJECT_POLICY:
case OBJECT_PROCEDURE:
case OBJECT_PUBLICATION:
+ case OBJECT_PUBLICATION_NAMESPACE:
case OBJECT_PUBLICATION_REL:
case OBJECT_ROUTINE:
case OBJECT_RULE:
@@ -1050,6 +1051,7 @@ EventTriggerSupportsObjectClass(ObjectClass objclass)
case OCLASS_EXTENSION:
case OCLASS_POLICY:
case OCLASS_PUBLICATION:
+ case OCLASS_PUBLICATION_NAMESPACE:
case OCLASS_PUBLICATION_REL:
case OCLASS_SUBSCRIPTION:
case OCLASS_TRANSFORM:
@@ -2126,6 +2128,7 @@ stringify_grant_objtype(ObjectType objtype)
case OBJECT_OPFAMILY:
case OBJECT_POLICY:
case OBJECT_PUBLICATION:
+ case OBJECT_PUBLICATION_NAMESPACE:
case OBJECT_PUBLICATION_REL:
case OBJECT_ROLE:
case OBJECT_RULE:
@@ -2208,6 +2211,7 @@ stringify_adefprivs_objtype(ObjectType objtype)
case OBJECT_OPFAMILY:
case OBJECT_POLICY:
case OBJECT_PUBLICATION:
+ case OBJECT_PUBLICATION_NAMESPACE:
case OBJECT_PUBLICATION_REL:
case OBJECT_ROLE:
case OBJECT_RULE:
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 9c7f91611dc..d1fff13d2e9 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -25,7 +25,9 @@
#include "catalog/objectaddress.h"
#include "catalog/partition.h"
#include "catalog/pg_inherits.h"
+#include "catalog/pg_namespace.h"
#include "catalog/pg_publication.h"
+#include "catalog/pg_publication_namespace.h"
#include "catalog/pg_publication_rel.h"
#include "catalog/pg_type.h"
#include "commands/dbcommands.h"
@@ -34,6 +36,7 @@
#include "commands/publicationcmds.h"
#include "funcapi.h"
#include "miscadmin.h"
+#include "storage/lmgr.h"
#include "utils/acl.h"
#include "utils/array.h"
#include "utils/builtins.h"
@@ -45,11 +48,16 @@
#include "utils/syscache.h"
#include "utils/varlena.h"
+static List *OpenReliIdList(List *relids);
static List *OpenTableList(List *tables);
static void CloseTableList(List *rels);
+static void LockSchemaList(List *schemalist);
static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
AlterPublicationStmt *stmt);
static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
+static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
+ AlterPublicationStmt *stmt);
+static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok);
static void
parse_publication_options(ParseState *pstate,
@@ -136,6 +144,97 @@ parse_publication_options(ParseState *pstate,
}
/*
+ * Convert the PublicationObjSpecType list into schema oid list and
+ * PublicationTable list.
+ */
+static void
+ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate,
+ List **rels, List **schemas)
+{
+ ListCell *cell;
+ PublicationObjSpec *pubobj;
+
+ if (!pubobjspec_list)
+ return;
+
+ foreach(cell, pubobjspec_list)
+ {
+ Oid schemaid;
+ List *search_path;
+
+ pubobj = (PublicationObjSpec *) lfirst(cell);
+
+ switch (pubobj->pubobjtype)
+ {
+ case PUBLICATIONOBJ_TABLE:
+ *rels = lappend(*rels, pubobj->pubtable);
+ break;
+ case PUBLICATIONOBJ_REL_IN_SCHEMA:
+ schemaid = get_namespace_oid(pubobj->name, false);
+
+ /* Filter out duplicates if user specifies "sch1, sch1" */
+ *schemas = list_append_unique_oid(*schemas, schemaid);
+ break;
+ case PUBLICATIONOBJ_CURRSCHEMA:
+ search_path = fetch_search_path(false);
+ if (search_path == NIL) /* nothing valid in search_path? */
+ ereport(ERROR,
+ errcode(ERRCODE_UNDEFINED_SCHEMA),
+ errmsg("no schema has been selected for CURRENT_SCHEMA"));
+
+ schemaid = linitial_oid(search_path);
+ list_free(search_path);
+
+ /* Filter out duplicates if user specifies "sch1, sch1" */
+ *schemas = list_append_unique_oid(*schemas, schemaid);
+ break;
+ default:
+ /* shouldn't happen */
+ elog(ERROR, "invalid publication object type %d", pubobj->pubobjtype);
+ break;
+ }
+ }
+}
+
+/*
+ * Check if any of the given relation's schema is a member of the given schema
+ * list.
+ */
+static void
+CheckObjSchemaNotAlreadyInPublication(List *rels, List *schemaidlist,
+ PublicationObjSpecType checkobjtype)
+{
+ ListCell *lc;
+
+ foreach(lc, rels)
+ {
+ PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc);
+ Relation rel = pub_rel->relation;
+ Oid relSchemaId = RelationGetNamespace(rel);
+
+ if (list_member_oid(schemaidlist, relSchemaId))
+ {
+ if (checkobjtype == PUBLICATIONOBJ_REL_IN_SCHEMA)
+ ereport(ERROR,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("cannot add schema \"%s\" to publication",
+ get_namespace_name(relSchemaId)),
+ errdetail("Table \"%s\" in schema \"%s\" is already part of the publication, adding the same schema is not supported.",
+ RelationGetRelationName(rel),
+ get_namespace_name(relSchemaId)));
+ else if (checkobjtype == PUBLICATIONOBJ_TABLE)
+ ereport(ERROR,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("cannot add relation \"%s.%s\" to publication",
+ get_namespace_name(relSchemaId),
+ RelationGetRelationName(rel)),
+ errdetail("Table's schema \"%s\" is already part of the publication or part of the specified schema list.",
+ get_namespace_name(relSchemaId)));
+ }
+ }
+}
+
+/*
* Create new publication.
*/
ObjectAddress
@@ -152,6 +251,8 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
bool publish_via_partition_root_given;
bool publish_via_partition_root;
AclResult aclresult;
+ List *relations = NIL;
+ List *schemaidlist = NIL;
/* must have CREATE privilege on database */
aclresult = pg_database_aclcheck(MyDatabaseId, GetUserId(), ACL_CREATE);
@@ -221,21 +322,44 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
/* Make the changes visible. */
CommandCounterIncrement();
- if (stmt->tables)
- {
- List *rels;
-
- Assert(list_length(stmt->tables) > 0);
-
- rels = OpenTableList(stmt->tables);
- PublicationAddTables(puboid, rels, true, NULL);
- CloseTableList(rels);
- }
- else if (stmt->for_all_tables)
+ /* Associate objects with the publication. */
+ if (stmt->for_all_tables)
{
/* Invalidate relcache so that publication info is rebuilt. */
CacheInvalidateRelcacheAll();
}
+ else
+ {
+ ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
+ &schemaidlist);
+
+ /* FOR ALL TABLES IN SCHEMA requires superuser */
+ if (list_length(schemaidlist) > 0 && !superuser())
+ ereport(ERROR,
+ errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("must be superuser to create FOR ALL TABLES IN SCHEMA publication"));
+
+ if (list_length(relations) > 0)
+ {
+ List *rels;
+
+ rels = OpenTableList(relations);
+ CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist,
+ PUBLICATIONOBJ_TABLE);
+ PublicationAddTables(puboid, rels, true, NULL);
+ CloseTableList(rels);
+ }
+
+ if (list_length(schemaidlist) > 0)
+ {
+ /*
+ * Schema lock is held until the publication is created to prevent
+ * concurrent schema deletion.
+ */
+ LockSchemaList(schemaidlist);
+ PublicationAddSchemas(puboid, schemaidlist, true, NULL);
+ }
+ }
table_close(rel, RowExclusiveLock);
@@ -318,13 +442,19 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
}
else
{
+ List *relids = NIL;
+ List *schemarelids = NIL;
+
/*
* For any partitioned tables contained in the publication, we must
* invalidate all partitions contained in the respective partition
* trees, not just those explicitly mentioned in the publication.
*/
- List *relids = GetPublicationRelations(pubform->oid,
- PUBLICATION_PART_ALL);
+ relids = GetPublicationRelations(pubform->oid,
+ PUBLICATION_PART_ALL);
+ schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
+ PUBLICATION_PART_ALL);
+ relids = list_concat_unique_oid(relids, schemarelids);
InvalidatePublicationRels(relids);
}
@@ -361,28 +491,36 @@ InvalidatePublicationRels(List *relids)
* Add or remove table to/from publication.
*/
static void
-AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
- HeapTuple tup)
+AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
+ List *tables, List *schemaidlist)
{
List *rels = NIL;
Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
Oid pubid = pubform->oid;
- /* Check that user is allowed to manipulate the publication tables. */
- if (pubform->puballtables)
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("publication \"%s\" is defined as FOR ALL TABLES",
- NameStr(pubform->pubname)),
- errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
+ /*
+ * It is quite possible that for the SET case user has not specified any
+ * tables in which case we need to remove all the existing tables.
+ */
+ if (!tables && stmt->action != DEFELEM_SET)
+ return;
- Assert(list_length(stmt->tables) > 0);
+ rels = OpenTableList(tables);
- rels = OpenTableList(stmt->tables);
+ if (stmt->action == DEFELEM_ADD)
+ {
+ List *schemas = NIL;
- if (stmt->tableAction == DEFELEM_ADD)
+ /*
+ * Check if the relation is member of the existing schema in the
+ * publication or member of the schema list specified.
+ */
+ schemas = list_concat_copy(schemaidlist, GetPublicationSchemas(pubid));
+ CheckObjSchemaNotAlreadyInPublication(rels, schemas,
+ PUBLICATIONOBJ_TABLE);
PublicationAddTables(pubid, rels, false, stmt);
- else if (stmt->tableAction == DEFELEM_DROP)
+ }
+ else if (stmt->action == DEFELEM_DROP)
PublicationDropTables(pubid, rels, false);
else /* DEFELEM_SET */
{
@@ -391,6 +529,9 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
List *delrels = NIL;
ListCell *oldlc;
+ CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist,
+ PUBLICATIONOBJ_TABLE);
+
/* Calculate which relations to drop. */
foreach(oldlc, oldrelids)
{
@@ -441,10 +582,110 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
}
/*
+ * Alter the publication schemas.
+ *
+ * Add or remove schemas to/from publication.
+ */
+static void
+AlterPublicationSchemas(AlterPublicationStmt *stmt,
+ HeapTuple tup, List *schemaidlist)
+{
+ Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
+
+ /*
+ * It is quite possible that for the SET case user has not specified any
+ * schemas in which case we need to remove all the existing schemas.
+ */
+ if (!schemaidlist && stmt->action != DEFELEM_SET)
+ return;
+
+ /*
+ * Schema lock is held until the publication is altered to prevent
+ * concurrent schema deletion.
+ */
+ LockSchemaList(schemaidlist);
+ if (stmt->action == DEFELEM_ADD)
+ {
+ List *rels;
+ List *reloids;
+
+ reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT);
+ rels = OpenReliIdList(reloids);
+
+ CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist,
+ PUBLICATIONOBJ_REL_IN_SCHEMA);
+
+ CloseTableList(rels);
+ PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt);
+ }
+ else if (stmt->action == DEFELEM_DROP)
+ PublicationDropSchemas(pubform->oid, schemaidlist, false);
+ else /* DEFELEM_SET */
+ {
+ List *oldschemaids = GetPublicationSchemas(pubform->oid);
+ List *delschemas = NIL;
+
+ /* Identify which schemas should be dropped */
+ delschemas = list_difference_oid(oldschemaids, schemaidlist);
+
+ /*
+ * Schema lock is held until the publication is altered to prevent
+ * concurrent schema deletion.
+ */
+ LockSchemaList(delschemas);
+
+ /* And drop them */
+ PublicationDropSchemas(pubform->oid, delschemas, true);
+
+ /*
+ * Don't bother calculating the difference for adding, we'll catch and
+ * skip existing ones when doing catalog update.
+ */
+ PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt);
+ }
+}
+
+/*
+ * Check if relations and schemas can be in a given publication and throw
+ * appropriate error if not.
+ */
+static void
+CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup,
+ List *tables, List *schemaidlist)
+{
+ Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
+
+ if ((stmt->action == DEFELEM_ADD || stmt->action == DEFELEM_SET) &&
+ schemaidlist && !superuser())
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("must be superuser to add or set schemas")));
+
+ /*
+ * Check that user is allowed to manipulate the publication tables in
+ * schema
+ */
+ if (schemaidlist && pubform->puballtables)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("publication \"%s\" is defined as FOR ALL TABLES",
+ NameStr(pubform->pubname)),
+ errdetail("Tables from schema cannot be added to, dropped from, or set on FOR ALL TABLES publications.")));
+
+ /* Check that user is allowed to manipulate the publication tables. */
+ if (tables && pubform->puballtables)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("publication \"%s\" is defined as FOR ALL TABLES",
+ NameStr(pubform->pubname)),
+ errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
+}
+
+/*
* Alter the existing publication.
*
- * This is dispatcher function for AlterPublicationOptions and
- * AlterPublicationTables.
+ * This is dispatcher function for AlterPublicationOptions,
+ * AlterPublicationSchemas and AlterPublicationTables.
*/
void
AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
@@ -474,7 +715,41 @@ AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
if (stmt->options)
AlterPublicationOptions(pstate, stmt, rel, tup);
else
- AlterPublicationTables(stmt, rel, tup);
+ {
+ List *relations = NIL;
+ List *schemaidlist = NIL;
+
+ ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
+ &schemaidlist);
+
+ CheckAlterPublication(stmt, tup, relations, schemaidlist);
+
+ /*
+ * Lock the publication so nobody else can do anything with it. This
+ * prevents concurrent alter to add table(s) that were already going
+ * to become part of the publication by adding corresponding schema(s)
+ * via this command and similarly it will prevent the concurrent
+ * addition of schema(s) for which there is any corresponding table
+ * being added by this command.
+ */
+ LockDatabaseObject(PublicationRelationId, pubform->oid, 0,
+ AccessExclusiveLock);
+
+ /*
+ * It is possible that by the time we acquire the lock on publication,
+ * concurrent DDL has removed it. We can test this by checking the
+ * existence of publication.
+ */
+ if (!SearchSysCacheExists1(PUBLICATIONOID,
+ ObjectIdGetDatum(pubform->oid)))
+ ereport(ERROR,
+ errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("publication \"%s\" does not exist",
+ stmt->pubname));
+
+ AlterPublicationTables(stmt, tup, relations, schemaidlist);
+ AlterPublicationSchemas(stmt, tup, schemaidlist);
+ }
/* Cleanup. */
heap_freetuple(tup);
@@ -552,9 +827,71 @@ RemovePublicationById(Oid pubid)
}
/*
+ * Remove schema from publication by mapping OID.
+ */
+void
+RemovePublicationSchemaById(Oid psoid)
+{
+ Relation rel;
+ HeapTuple tup;
+ List *schemaRels = NIL;
+ Form_pg_publication_namespace pubsch;
+
+ rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
+
+ tup = SearchSysCache1(PUBLICATIONNAMESPACE, ObjectIdGetDatum(psoid));
+
+ if (!HeapTupleIsValid(tup))
+ elog(ERROR, "cache lookup failed for publication schema %u", psoid);
+
+ pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
+
+ /*
+ * Invalidate relcache so that publication info is rebuilt. See
+ * RemovePublicationRelById for why we need to consider all the
+ * partitions.
+ */
+ schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid,
+ PUBLICATION_PART_ALL);
+ InvalidatePublicationRels(schemaRels);
+
+ CatalogTupleDelete(rel, &tup->t_self);
+
+ ReleaseSysCache(tup);
+
+ table_close(rel, RowExclusiveLock);
+}
+
+/*
+ * Open relations specified by a relid list.
+ * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
+ * add them to a publication.
+ */
+static List *
+OpenReliIdList(List *relids)
+{
+ ListCell *lc;
+ List *rels = NIL;
+
+ foreach(lc, relids)
+ {
+ PublicationRelInfo *pub_rel;
+ Oid relid = lfirst_oid(lc);
+ Relation rel = table_open(relid,
+ ShareUpdateExclusiveLock);
+
+ pub_rel = palloc(sizeof(PublicationRelInfo));
+ pub_rel->relation = rel;
+ rels = lappend(rels, pub_rel);
+ }
+
+ return rels;
+}
+
+/*
* Open relations specified by a PublicationTable list.
- * In the returned list of PublicationRelInfo, tables are locked
- * in ShareUpdateExclusiveLock mode in order to add them to a publication.
+ * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
+ * add them to a publication.
*/
static List *
OpenTableList(List *tables)
@@ -659,6 +996,35 @@ CloseTableList(List *rels)
}
/*
+ * Lock the schemas specified in the schema list in AccessShareLock mode in
+ * order to prevent concurrent schema deletion.
+ */
+static void
+LockSchemaList(List *schemalist)
+{
+ ListCell *lc;
+
+ foreach(lc, schemalist)
+ {
+ Oid schemaid = lfirst_oid(lc);
+
+ /* Allow query cancel in case this takes a long time */
+ CHECK_FOR_INTERRUPTS();
+ LockDatabaseObject(NamespaceRelationId, schemaid, 0, AccessShareLock);
+
+ /*
+ * It is possible that by the time we acquire the lock on schema,
+ * concurrent DDL has removed it. We can test this by checking the
+ * existence of schema.
+ */
+ if (!SearchSysCacheExists1(NAMESPACEOID, ObjectIdGetDatum(schemaid)))
+ ereport(ERROR,
+ errcode(ERRCODE_UNDEFINED_SCHEMA),
+ errmsg("schema with OID %u does not exist", schemaid));
+ }
+}
+
+/*
* Add listed tables to the publication.
*/
static void
@@ -728,6 +1094,68 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
}
/*
+ * Add listed schemas to the publication.
+ */
+static void
+PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
+ AlterPublicationStmt *stmt)
+{
+ ListCell *lc;
+
+ Assert(!stmt || !stmt->for_all_tables);
+
+ foreach(lc, schemas)
+ {
+ Oid schemaid = lfirst_oid(lc);
+ ObjectAddress obj;
+
+ obj = publication_add_schema(pubid, schemaid, if_not_exists);
+ if (stmt)
+ {
+ EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
+ (Node *) stmt);
+
+ InvokeObjectPostCreateHook(PublicationNamespaceRelationId,
+ obj.objectId, 0);
+ }
+ }
+}
+
+/*
+ * Remove listed schemas from the publication.
+ */
+static void
+PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
+{
+ ObjectAddress obj;
+ ListCell *lc;
+ Oid psid;
+
+ foreach(lc, schemas)
+ {
+ Oid schemaid = lfirst_oid(lc);
+
+ psid = GetSysCacheOid2(PUBLICATIONNAMESPACEMAP,
+ Anum_pg_publication_namespace_oid,
+ ObjectIdGetDatum(schemaid),
+ ObjectIdGetDatum(pubid));
+ if (!OidIsValid(psid))
+ {
+ if (missing_ok)
+ continue;
+
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("tables from schema \"%s\" are not part of the publication",
+ get_namespace_name(schemaid))));
+ }
+
+ ObjectAddressSet(obj, PublicationNamespaceRelationId, psid);
+ performDeletion(&obj, DROP_CASCADE, 0);
+ }
+}
+
+/*
* Internal workhorse for changing a publication owner
*/
static void
diff --git a/src/backend/commands/seclabel.c b/src/backend/commands/seclabel.c
index 308e0adb553..53c18628a7d 100644
--- a/src/backend/commands/seclabel.c
+++ b/src/backend/commands/seclabel.c
@@ -79,6 +79,7 @@ SecLabelSupportsObjectType(ObjectType objtype)
case OBJECT_OPERATOR:
case OBJECT_OPFAMILY:
case OBJECT_POLICY:
+ case OBJECT_PUBLICATION_NAMESPACE:
case OBJECT_PUBLICATION_REL:
case OBJECT_RULE:
case OBJECT_STATISTIC_EXT:
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 1a2f159f24e..857cc5ce6e2 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -12286,6 +12286,7 @@ ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel,
case OCLASS_EXTENSION:
case OCLASS_EVENT_TRIGGER:
case OCLASS_PUBLICATION:
+ case OCLASS_PUBLICATION_NAMESPACE:
case OCLASS_PUBLICATION_REL:
case OCLASS_SUBSCRIPTION:
case OCLASS_TRANSFORM:
@@ -15994,6 +15995,33 @@ AlterTableNamespace(AlterObjectSchemaStmt *stmt, Oid *oldschema)
newrv = makeRangeVar(stmt->newschema, RelationGetRelationName(rel), -1);
nspOid = RangeVarGetAndCheckCreationNamespace(newrv, NoLock, NULL);
+ /*
+ * Check that setting the relation to a different schema won't result in a
+ * publication having both a schema and the same schema's table, as this
+ * is not supported.
+ */
+ if (stmt->objectType == OBJECT_TABLE)
+ {
+ ListCell *lc;
+ List *schemaPubids = GetSchemaPublications(nspOid);
+ List *relPubids = GetRelationPublications(RelationGetRelid(rel));
+
+ foreach(lc, relPubids)
+ {
+ Oid pubid = lfirst_oid(lc);
+
+ if (list_member_oid(schemaPubids, pubid))
+ ereport(ERROR,
+ errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot move table \"%s\" to schema \"%s\"",
+ RelationGetRelationName(rel), stmt->newschema),
+ errdetail("The schema \"%s\" and same schema's table \"%s\" cannot be part of the same publication \"%s\".",
+ stmt->newschema,
+ RelationGetRelationName(rel),
+ get_publication_name(pubid, false)));
+ }
+ }
+
/* common checks on switching namespaces */
CheckSetNamespace(oldNspOid, nspOid);