aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands')
-rw-r--r--src/backend/commands/alter.c1
-rw-r--r--src/backend/commands/event_trigger.c4
-rw-r--r--src/backend/commands/publicationcmds.c490
-rw-r--r--src/backend/commands/seclabel.c1
-rw-r--r--src/backend/commands/tablecmds.c28
5 files changed, 493 insertions, 31 deletions
diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c
index c47d54e96bb..40044070cf3 100644
--- a/src/backend/commands/alter.c
+++ b/src/backend/commands/alter.c
@@ -660,6 +660,7 @@ AlterObjectNamespace_oid(Oid classId, Oid objid, Oid nspOid,
case OCLASS_EVENT_TRIGGER:
case OCLASS_POLICY:
case OCLASS_PUBLICATION:
+ case OCLASS_PUBLICATION_NAMESPACE:
case OCLASS_PUBLICATION_REL:
case OCLASS_SUBSCRIPTION:
case OCLASS_TRANSFORM:
diff --git a/src/backend/commands/event_trigger.c b/src/backend/commands/event_trigger.c
index 71612d577e9..df264329d8b 100644
--- a/src/backend/commands/event_trigger.c
+++ b/src/backend/commands/event_trigger.c
@@ -973,6 +973,7 @@ EventTriggerSupportsObjectType(ObjectType obtype)
case OBJECT_POLICY:
case OBJECT_PROCEDURE:
case OBJECT_PUBLICATION:
+ case OBJECT_PUBLICATION_NAMESPACE:
case OBJECT_PUBLICATION_REL:
case OBJECT_ROUTINE:
case OBJECT_RULE:
@@ -1050,6 +1051,7 @@ EventTriggerSupportsObjectClass(ObjectClass objclass)
case OCLASS_EXTENSION:
case OCLASS_POLICY:
case OCLASS_PUBLICATION:
+ case OCLASS_PUBLICATION_NAMESPACE:
case OCLASS_PUBLICATION_REL:
case OCLASS_SUBSCRIPTION:
case OCLASS_TRANSFORM:
@@ -2126,6 +2128,7 @@ stringify_grant_objtype(ObjectType objtype)
case OBJECT_OPFAMILY:
case OBJECT_POLICY:
case OBJECT_PUBLICATION:
+ case OBJECT_PUBLICATION_NAMESPACE:
case OBJECT_PUBLICATION_REL:
case OBJECT_ROLE:
case OBJECT_RULE:
@@ -2208,6 +2211,7 @@ stringify_adefprivs_objtype(ObjectType objtype)
case OBJECT_OPFAMILY:
case OBJECT_POLICY:
case OBJECT_PUBLICATION:
+ case OBJECT_PUBLICATION_NAMESPACE:
case OBJECT_PUBLICATION_REL:
case OBJECT_ROLE:
case OBJECT_RULE:
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 9c7f91611dc..d1fff13d2e9 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -25,7 +25,9 @@
#include "catalog/objectaddress.h"
#include "catalog/partition.h"
#include "catalog/pg_inherits.h"
+#include "catalog/pg_namespace.h"
#include "catalog/pg_publication.h"
+#include "catalog/pg_publication_namespace.h"
#include "catalog/pg_publication_rel.h"
#include "catalog/pg_type.h"
#include "commands/dbcommands.h"
@@ -34,6 +36,7 @@
#include "commands/publicationcmds.h"
#include "funcapi.h"
#include "miscadmin.h"
+#include "storage/lmgr.h"
#include "utils/acl.h"
#include "utils/array.h"
#include "utils/builtins.h"
@@ -45,11 +48,16 @@
#include "utils/syscache.h"
#include "utils/varlena.h"
+static List *OpenReliIdList(List *relids);
static List *OpenTableList(List *tables);
static void CloseTableList(List *rels);
+static void LockSchemaList(List *schemalist);
static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
AlterPublicationStmt *stmt);
static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
+static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
+ AlterPublicationStmt *stmt);
+static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok);
static void
parse_publication_options(ParseState *pstate,
@@ -136,6 +144,97 @@ parse_publication_options(ParseState *pstate,
}
/*
+ * Convert the PublicationObjSpecType list into schema oid list and
+ * PublicationTable list.
+ */
+static void
+ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate,
+ List **rels, List **schemas)
+{
+ ListCell *cell;
+ PublicationObjSpec *pubobj;
+
+ if (!pubobjspec_list)
+ return;
+
+ foreach(cell, pubobjspec_list)
+ {
+ Oid schemaid;
+ List *search_path;
+
+ pubobj = (PublicationObjSpec *) lfirst(cell);
+
+ switch (pubobj->pubobjtype)
+ {
+ case PUBLICATIONOBJ_TABLE:
+ *rels = lappend(*rels, pubobj->pubtable);
+ break;
+ case PUBLICATIONOBJ_REL_IN_SCHEMA:
+ schemaid = get_namespace_oid(pubobj->name, false);
+
+ /* Filter out duplicates if user specifies "sch1, sch1" */
+ *schemas = list_append_unique_oid(*schemas, schemaid);
+ break;
+ case PUBLICATIONOBJ_CURRSCHEMA:
+ search_path = fetch_search_path(false);
+ if (search_path == NIL) /* nothing valid in search_path? */
+ ereport(ERROR,
+ errcode(ERRCODE_UNDEFINED_SCHEMA),
+ errmsg("no schema has been selected for CURRENT_SCHEMA"));
+
+ schemaid = linitial_oid(search_path);
+ list_free(search_path);
+
+ /* Filter out duplicates if user specifies "sch1, sch1" */
+ *schemas = list_append_unique_oid(*schemas, schemaid);
+ break;
+ default:
+ /* shouldn't happen */
+ elog(ERROR, "invalid publication object type %d", pubobj->pubobjtype);
+ break;
+ }
+ }
+}
+
+/*
+ * Check if any of the given relation's schema is a member of the given schema
+ * list.
+ */
+static void
+CheckObjSchemaNotAlreadyInPublication(List *rels, List *schemaidlist,
+ PublicationObjSpecType checkobjtype)
+{
+ ListCell *lc;
+
+ foreach(lc, rels)
+ {
+ PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc);
+ Relation rel = pub_rel->relation;
+ Oid relSchemaId = RelationGetNamespace(rel);
+
+ if (list_member_oid(schemaidlist, relSchemaId))
+ {
+ if (checkobjtype == PUBLICATIONOBJ_REL_IN_SCHEMA)
+ ereport(ERROR,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("cannot add schema \"%s\" to publication",
+ get_namespace_name(relSchemaId)),
+ errdetail("Table \"%s\" in schema \"%s\" is already part of the publication, adding the same schema is not supported.",
+ RelationGetRelationName(rel),
+ get_namespace_name(relSchemaId)));
+ else if (checkobjtype == PUBLICATIONOBJ_TABLE)
+ ereport(ERROR,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("cannot add relation \"%s.%s\" to publication",
+ get_namespace_name(relSchemaId),
+ RelationGetRelationName(rel)),
+ errdetail("Table's schema \"%s\" is already part of the publication or part of the specified schema list.",
+ get_namespace_name(relSchemaId)));
+ }
+ }
+}
+
+/*
* Create new publication.
*/
ObjectAddress
@@ -152,6 +251,8 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
bool publish_via_partition_root_given;
bool publish_via_partition_root;
AclResult aclresult;
+ List *relations = NIL;
+ List *schemaidlist = NIL;
/* must have CREATE privilege on database */
aclresult = pg_database_aclcheck(MyDatabaseId, GetUserId(), ACL_CREATE);
@@ -221,21 +322,44 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
/* Make the changes visible. */
CommandCounterIncrement();
- if (stmt->tables)
- {
- List *rels;
-
- Assert(list_length(stmt->tables) > 0);
-
- rels = OpenTableList(stmt->tables);
- PublicationAddTables(puboid, rels, true, NULL);
- CloseTableList(rels);
- }
- else if (stmt->for_all_tables)
+ /* Associate objects with the publication. */
+ if (stmt->for_all_tables)
{
/* Invalidate relcache so that publication info is rebuilt. */
CacheInvalidateRelcacheAll();
}
+ else
+ {
+ ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
+ &schemaidlist);
+
+ /* FOR ALL TABLES IN SCHEMA requires superuser */
+ if (list_length(schemaidlist) > 0 && !superuser())
+ ereport(ERROR,
+ errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("must be superuser to create FOR ALL TABLES IN SCHEMA publication"));
+
+ if (list_length(relations) > 0)
+ {
+ List *rels;
+
+ rels = OpenTableList(relations);
+ CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist,
+ PUBLICATIONOBJ_TABLE);
+ PublicationAddTables(puboid, rels, true, NULL);
+ CloseTableList(rels);
+ }
+
+ if (list_length(schemaidlist) > 0)
+ {
+ /*
+ * Schema lock is held until the publication is created to prevent
+ * concurrent schema deletion.
+ */
+ LockSchemaList(schemaidlist);
+ PublicationAddSchemas(puboid, schemaidlist, true, NULL);
+ }
+ }
table_close(rel, RowExclusiveLock);
@@ -318,13 +442,19 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
}
else
{
+ List *relids = NIL;
+ List *schemarelids = NIL;
+
/*
* For any partitioned tables contained in the publication, we must
* invalidate all partitions contained in the respective partition
* trees, not just those explicitly mentioned in the publication.
*/
- List *relids = GetPublicationRelations(pubform->oid,
- PUBLICATION_PART_ALL);
+ relids = GetPublicationRelations(pubform->oid,
+ PUBLICATION_PART_ALL);
+ schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
+ PUBLICATION_PART_ALL);
+ relids = list_concat_unique_oid(relids, schemarelids);
InvalidatePublicationRels(relids);
}
@@ -361,28 +491,36 @@ InvalidatePublicationRels(List *relids)
* Add or remove table to/from publication.
*/
static void
-AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
- HeapTuple tup)
+AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
+ List *tables, List *schemaidlist)
{
List *rels = NIL;
Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
Oid pubid = pubform->oid;
- /* Check that user is allowed to manipulate the publication tables. */
- if (pubform->puballtables)
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("publication \"%s\" is defined as FOR ALL TABLES",
- NameStr(pubform->pubname)),
- errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
+ /*
+ * It is quite possible that for the SET case user has not specified any
+ * tables in which case we need to remove all the existing tables.
+ */
+ if (!tables && stmt->action != DEFELEM_SET)
+ return;
- Assert(list_length(stmt->tables) > 0);
+ rels = OpenTableList(tables);
- rels = OpenTableList(stmt->tables);
+ if (stmt->action == DEFELEM_ADD)
+ {
+ List *schemas = NIL;
- if (stmt->tableAction == DEFELEM_ADD)
+ /*
+ * Check if the relation is member of the existing schema in the
+ * publication or member of the schema list specified.
+ */
+ schemas = list_concat_copy(schemaidlist, GetPublicationSchemas(pubid));
+ CheckObjSchemaNotAlreadyInPublication(rels, schemas,
+ PUBLICATIONOBJ_TABLE);
PublicationAddTables(pubid, rels, false, stmt);
- else if (stmt->tableAction == DEFELEM_DROP)
+ }
+ else if (stmt->action == DEFELEM_DROP)
PublicationDropTables(pubid, rels, false);
else /* DEFELEM_SET */
{
@@ -391,6 +529,9 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
List *delrels = NIL;
ListCell *oldlc;
+ CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist,
+ PUBLICATIONOBJ_TABLE);
+
/* Calculate which relations to drop. */
foreach(oldlc, oldrelids)
{
@@ -441,10 +582,110 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
}
/*
+ * Alter the publication schemas.
+ *
+ * Add or remove schemas to/from publication.
+ */
+static void
+AlterPublicationSchemas(AlterPublicationStmt *stmt,
+ HeapTuple tup, List *schemaidlist)
+{
+ Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
+
+ /*
+ * It is quite possible that for the SET case user has not specified any
+ * schemas in which case we need to remove all the existing schemas.
+ */
+ if (!schemaidlist && stmt->action != DEFELEM_SET)
+ return;
+
+ /*
+ * Schema lock is held until the publication is altered to prevent
+ * concurrent schema deletion.
+ */
+ LockSchemaList(schemaidlist);
+ if (stmt->action == DEFELEM_ADD)
+ {
+ List *rels;
+ List *reloids;
+
+ reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT);
+ rels = OpenReliIdList(reloids);
+
+ CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist,
+ PUBLICATIONOBJ_REL_IN_SCHEMA);
+
+ CloseTableList(rels);
+ PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt);
+ }
+ else if (stmt->action == DEFELEM_DROP)
+ PublicationDropSchemas(pubform->oid, schemaidlist, false);
+ else /* DEFELEM_SET */
+ {
+ List *oldschemaids = GetPublicationSchemas(pubform->oid);
+ List *delschemas = NIL;
+
+ /* Identify which schemas should be dropped */
+ delschemas = list_difference_oid(oldschemaids, schemaidlist);
+
+ /*
+ * Schema lock is held until the publication is altered to prevent
+ * concurrent schema deletion.
+ */
+ LockSchemaList(delschemas);
+
+ /* And drop them */
+ PublicationDropSchemas(pubform->oid, delschemas, true);
+
+ /*
+ * Don't bother calculating the difference for adding, we'll catch and
+ * skip existing ones when doing catalog update.
+ */
+ PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt);
+ }
+}
+
+/*
+ * Check if relations and schemas can be in a given publication and throw
+ * appropriate error if not.
+ */
+static void
+CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup,
+ List *tables, List *schemaidlist)
+{
+ Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
+
+ if ((stmt->action == DEFELEM_ADD || stmt->action == DEFELEM_SET) &&
+ schemaidlist && !superuser())
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("must be superuser to add or set schemas")));
+
+ /*
+ * Check that user is allowed to manipulate the publication tables in
+ * schema
+ */
+ if (schemaidlist && pubform->puballtables)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("publication \"%s\" is defined as FOR ALL TABLES",
+ NameStr(pubform->pubname)),
+ errdetail("Tables from schema cannot be added to, dropped from, or set on FOR ALL TABLES publications.")));
+
+ /* Check that user is allowed to manipulate the publication tables. */
+ if (tables && pubform->puballtables)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("publication \"%s\" is defined as FOR ALL TABLES",
+ NameStr(pubform->pubname)),
+ errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
+}
+
+/*
* Alter the existing publication.
*
- * This is dispatcher function for AlterPublicationOptions and
- * AlterPublicationTables.
+ * This is dispatcher function for AlterPublicationOptions,
+ * AlterPublicationSchemas and AlterPublicationTables.
*/
void
AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
@@ -474,7 +715,41 @@ AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
if (stmt->options)
AlterPublicationOptions(pstate, stmt, rel, tup);
else
- AlterPublicationTables(stmt, rel, tup);
+ {
+ List *relations = NIL;
+ List *schemaidlist = NIL;
+
+ ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
+ &schemaidlist);
+
+ CheckAlterPublication(stmt, tup, relations, schemaidlist);
+
+ /*
+ * Lock the publication so nobody else can do anything with it. This
+ * prevents concurrent alter to add table(s) that were already going
+ * to become part of the publication by adding corresponding schema(s)
+ * via this command and similarly it will prevent the concurrent
+ * addition of schema(s) for which there is any corresponding table
+ * being added by this command.
+ */
+ LockDatabaseObject(PublicationRelationId, pubform->oid, 0,
+ AccessExclusiveLock);
+
+ /*
+ * It is possible that by the time we acquire the lock on publication,
+ * concurrent DDL has removed it. We can test this by checking the
+ * existence of publication.
+ */
+ if (!SearchSysCacheExists1(PUBLICATIONOID,
+ ObjectIdGetDatum(pubform->oid)))
+ ereport(ERROR,
+ errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("publication \"%s\" does not exist",
+ stmt->pubname));
+
+ AlterPublicationTables(stmt, tup, relations, schemaidlist);
+ AlterPublicationSchemas(stmt, tup, schemaidlist);
+ }
/* Cleanup. */
heap_freetuple(tup);
@@ -552,9 +827,71 @@ RemovePublicationById(Oid pubid)
}
/*
+ * Remove schema from publication by mapping OID.
+ */
+void
+RemovePublicationSchemaById(Oid psoid)
+{
+ Relation rel;
+ HeapTuple tup;
+ List *schemaRels = NIL;
+ Form_pg_publication_namespace pubsch;
+
+ rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
+
+ tup = SearchSysCache1(PUBLICATIONNAMESPACE, ObjectIdGetDatum(psoid));
+
+ if (!HeapTupleIsValid(tup))
+ elog(ERROR, "cache lookup failed for publication schema %u", psoid);
+
+ pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
+
+ /*
+ * Invalidate relcache so that publication info is rebuilt. See
+ * RemovePublicationRelById for why we need to consider all the
+ * partitions.
+ */
+ schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid,
+ PUBLICATION_PART_ALL);
+ InvalidatePublicationRels(schemaRels);
+
+ CatalogTupleDelete(rel, &tup->t_self);
+
+ ReleaseSysCache(tup);
+
+ table_close(rel, RowExclusiveLock);
+}
+
+/*
+ * Open relations specified by a relid list.
+ * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
+ * add them to a publication.
+ */
+static List *
+OpenReliIdList(List *relids)
+{
+ ListCell *lc;
+ List *rels = NIL;
+
+ foreach(lc, relids)
+ {
+ PublicationRelInfo *pub_rel;
+ Oid relid = lfirst_oid(lc);
+ Relation rel = table_open(relid,
+ ShareUpdateExclusiveLock);
+
+ pub_rel = palloc(sizeof(PublicationRelInfo));
+ pub_rel->relation = rel;
+ rels = lappend(rels, pub_rel);
+ }
+
+ return rels;
+}
+
+/*
* Open relations specified by a PublicationTable list.
- * In the returned list of PublicationRelInfo, tables are locked
- * in ShareUpdateExclusiveLock mode in order to add them to a publication.
+ * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
+ * add them to a publication.
*/
static List *
OpenTableList(List *tables)
@@ -659,6 +996,35 @@ CloseTableList(List *rels)
}
/*
+ * Lock the schemas specified in the schema list in AccessShareLock mode in
+ * order to prevent concurrent schema deletion.
+ */
+static void
+LockSchemaList(List *schemalist)
+{
+ ListCell *lc;
+
+ foreach(lc, schemalist)
+ {
+ Oid schemaid = lfirst_oid(lc);
+
+ /* Allow query cancel in case this takes a long time */
+ CHECK_FOR_INTERRUPTS();
+ LockDatabaseObject(NamespaceRelationId, schemaid, 0, AccessShareLock);
+
+ /*
+ * It is possible that by the time we acquire the lock on schema,
+ * concurrent DDL has removed it. We can test this by checking the
+ * existence of schema.
+ */
+ if (!SearchSysCacheExists1(NAMESPACEOID, ObjectIdGetDatum(schemaid)))
+ ereport(ERROR,
+ errcode(ERRCODE_UNDEFINED_SCHEMA),
+ errmsg("schema with OID %u does not exist", schemaid));
+ }
+}
+
+/*
* Add listed tables to the publication.
*/
static void
@@ -728,6 +1094,68 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
}
/*
+ * Add listed schemas to the publication.
+ */
+static void
+PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
+ AlterPublicationStmt *stmt)
+{
+ ListCell *lc;
+
+ Assert(!stmt || !stmt->for_all_tables);
+
+ foreach(lc, schemas)
+ {
+ Oid schemaid = lfirst_oid(lc);
+ ObjectAddress obj;
+
+ obj = publication_add_schema(pubid, schemaid, if_not_exists);
+ if (stmt)
+ {
+ EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
+ (Node *) stmt);
+
+ InvokeObjectPostCreateHook(PublicationNamespaceRelationId,
+ obj.objectId, 0);
+ }
+ }
+}
+
+/*
+ * Remove listed schemas from the publication.
+ */
+static void
+PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
+{
+ ObjectAddress obj;
+ ListCell *lc;
+ Oid psid;
+
+ foreach(lc, schemas)
+ {
+ Oid schemaid = lfirst_oid(lc);
+
+ psid = GetSysCacheOid2(PUBLICATIONNAMESPACEMAP,
+ Anum_pg_publication_namespace_oid,
+ ObjectIdGetDatum(schemaid),
+ ObjectIdGetDatum(pubid));
+ if (!OidIsValid(psid))
+ {
+ if (missing_ok)
+ continue;
+
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("tables from schema \"%s\" are not part of the publication",
+ get_namespace_name(schemaid))));
+ }
+
+ ObjectAddressSet(obj, PublicationNamespaceRelationId, psid);
+ performDeletion(&obj, DROP_CASCADE, 0);
+ }
+}
+
+/*
* Internal workhorse for changing a publication owner
*/
static void
diff --git a/src/backend/commands/seclabel.c b/src/backend/commands/seclabel.c
index 308e0adb553..53c18628a7d 100644
--- a/src/backend/commands/seclabel.c
+++ b/src/backend/commands/seclabel.c
@@ -79,6 +79,7 @@ SecLabelSupportsObjectType(ObjectType objtype)
case OBJECT_OPERATOR:
case OBJECT_OPFAMILY:
case OBJECT_POLICY:
+ case OBJECT_PUBLICATION_NAMESPACE:
case OBJECT_PUBLICATION_REL:
case OBJECT_RULE:
case OBJECT_STATISTIC_EXT:
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 1a2f159f24e..857cc5ce6e2 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -12286,6 +12286,7 @@ ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel,
case OCLASS_EXTENSION:
case OCLASS_EVENT_TRIGGER:
case OCLASS_PUBLICATION:
+ case OCLASS_PUBLICATION_NAMESPACE:
case OCLASS_PUBLICATION_REL:
case OCLASS_SUBSCRIPTION:
case OCLASS_TRANSFORM:
@@ -15994,6 +15995,33 @@ AlterTableNamespace(AlterObjectSchemaStmt *stmt, Oid *oldschema)
newrv = makeRangeVar(stmt->newschema, RelationGetRelationName(rel), -1);
nspOid = RangeVarGetAndCheckCreationNamespace(newrv, NoLock, NULL);
+ /*
+ * Check that setting the relation to a different schema won't result in a
+ * publication having both a schema and the same schema's table, as this
+ * is not supported.
+ */
+ if (stmt->objectType == OBJECT_TABLE)
+ {
+ ListCell *lc;
+ List *schemaPubids = GetSchemaPublications(nspOid);
+ List *relPubids = GetRelationPublications(RelationGetRelid(rel));
+
+ foreach(lc, relPubids)
+ {
+ Oid pubid = lfirst_oid(lc);
+
+ if (list_member_oid(schemaPubids, pubid))
+ ereport(ERROR,
+ errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot move table \"%s\" to schema \"%s\"",
+ RelationGetRelationName(rel), stmt->newschema),
+ errdetail("The schema \"%s\" and same schema's table \"%s\" cannot be part of the same publication \"%s\".",
+ stmt->newschema,
+ RelationGetRelationName(rel),
+ get_publication_name(pubid, false)));
+ }
+ }
+
/* common checks on switching namespaces */
CheckSetNamespace(oldNspOid, nspOid);