aboutsummaryrefslogtreecommitdiff
path: root/src/backend
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend')
-rw-r--r--src/backend/catalog/Makefile4
-rw-r--r--src/backend/catalog/aclchk.c2
-rw-r--r--src/backend/catalog/dependency.c9
-rw-r--r--src/backend/catalog/objectaddress.c149
-rw-r--r--src/backend/catalog/pg_publication.c332
-rw-r--r--src/backend/commands/alter.c1
-rw-r--r--src/backend/commands/event_trigger.c4
-rw-r--r--src/backend/commands/publicationcmds.c490
-rw-r--r--src/backend/commands/seclabel.c1
-rw-r--r--src/backend/commands/tablecmds.c28
-rw-r--r--src/backend/nodes/copyfuncs.c22
-rw-r--r--src/backend/nodes/equalfuncs.c21
-rw-r--r--src/backend/parser/gram.y307
-rw-r--r--src/backend/replication/pgoutput/pgoutput.c19
-rw-r--r--src/backend/utils/cache/relcache.c7
-rw-r--r--src/backend/utils/cache/syscache.c23
16 files changed, 1305 insertions, 114 deletions
diff --git a/src/backend/catalog/Makefile b/src/backend/catalog/Makefile
index d297e773612..4e6efda97f3 100644
--- a/src/backend/catalog/Makefile
+++ b/src/backend/catalog/Makefile
@@ -68,8 +68,8 @@ CATALOG_HEADERS := \
pg_foreign_table.h pg_policy.h pg_replication_origin.h \
pg_default_acl.h pg_init_privs.h pg_seclabel.h pg_shseclabel.h \
pg_collation.h pg_partitioned_table.h pg_range.h pg_transform.h \
- pg_sequence.h pg_publication.h pg_publication_rel.h pg_subscription.h \
- pg_subscription_rel.h
+ pg_sequence.h pg_publication.h pg_publication_namespace.h \
+ pg_publication_rel.h pg_subscription.h pg_subscription_rel.h
GENERATED_HEADERS := $(CATALOG_HEADERS:%.h=%_d.h) schemapg.h system_fk_info.h
diff --git a/src/backend/catalog/aclchk.c b/src/backend/catalog/aclchk.c
index 89792b154ee..ce0a4ff14e3 100644
--- a/src/backend/catalog/aclchk.c
+++ b/src/backend/catalog/aclchk.c
@@ -3427,6 +3427,7 @@ aclcheck_error(AclResult aclerr, ObjectType objtype,
case OBJECT_DEFAULT:
case OBJECT_DEFACL:
case OBJECT_DOMCONSTRAINT:
+ case OBJECT_PUBLICATION_NAMESPACE:
case OBJECT_PUBLICATION_REL:
case OBJECT_ROLE:
case OBJECT_RULE:
@@ -3566,6 +3567,7 @@ aclcheck_error(AclResult aclerr, ObjectType objtype,
case OBJECT_DEFAULT:
case OBJECT_DEFACL:
case OBJECT_DOMCONSTRAINT:
+ case OBJECT_PUBLICATION_NAMESPACE:
case OBJECT_PUBLICATION_REL:
case OBJECT_ROLE:
case OBJECT_TRANSFORM:
diff --git a/src/backend/catalog/dependency.c b/src/backend/catalog/dependency.c
index 91c3e976e01..9f8eb1a37fd 100644
--- a/src/backend/catalog/dependency.c
+++ b/src/backend/catalog/dependency.c
@@ -49,6 +49,7 @@
#include "catalog/pg_policy.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_publication.h"
+#include "catalog/pg_publication_namespace.h"
#include "catalog/pg_publication_rel.h"
#include "catalog/pg_rewrite.h"
#include "catalog/pg_statistic_ext.h"
@@ -178,6 +179,7 @@ static const Oid object_classes[] = {
ExtensionRelationId, /* OCLASS_EXTENSION */
EventTriggerRelationId, /* OCLASS_EVENT_TRIGGER */
PolicyRelationId, /* OCLASS_POLICY */
+ PublicationNamespaceRelationId, /* OCLASS_PUBLICATION_NAMESPACE */
PublicationRelationId, /* OCLASS_PUBLICATION */
PublicationRelRelationId, /* OCLASS_PUBLICATION_REL */
SubscriptionRelationId, /* OCLASS_SUBSCRIPTION */
@@ -1456,6 +1458,10 @@ doDeletion(const ObjectAddress *object, int flags)
RemovePolicyById(object->objectId);
break;
+ case OCLASS_PUBLICATION_NAMESPACE:
+ RemovePublicationSchemaById(object->objectId);
+ break;
+
case OCLASS_PUBLICATION_REL:
RemovePublicationRelById(object->objectId);
break;
@@ -2850,6 +2856,9 @@ getObjectClass(const ObjectAddress *object)
case PolicyRelationId:
return OCLASS_POLICY;
+ case PublicationNamespaceRelationId:
+ return OCLASS_PUBLICATION_NAMESPACE;
+
case PublicationRelationId:
return OCLASS_PUBLICATION;
diff --git a/src/backend/catalog/objectaddress.c b/src/backend/catalog/objectaddress.c
index 8c94939baa8..2bae3fbb174 100644
--- a/src/backend/catalog/objectaddress.c
+++ b/src/backend/catalog/objectaddress.c
@@ -48,6 +48,7 @@
#include "catalog/pg_policy.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_publication.h"
+#include "catalog/pg_publication_namespace.h"
#include "catalog/pg_publication_rel.h"
#include "catalog/pg_rewrite.h"
#include "catalog/pg_statistic_ext.h"
@@ -825,6 +826,10 @@ static const struct object_type_map
{
"publication", OBJECT_PUBLICATION
},
+ /* OCLASS_PUBLICATION_NAMESPACE */
+ {
+ "publication namespace", OBJECT_PUBLICATION_NAMESPACE
+ },
/* OCLASS_PUBLICATION_REL */
{
"publication relation", OBJECT_PUBLICATION_REL
@@ -875,6 +880,8 @@ static ObjectAddress get_object_address_usermapping(List *object,
static ObjectAddress get_object_address_publication_rel(List *object,
Relation *relp,
bool missing_ok);
+static ObjectAddress get_object_address_publication_schema(List *object,
+ bool missing_ok);
static ObjectAddress get_object_address_defacl(List *object,
bool missing_ok);
static const ObjectPropertyType *get_object_property_data(Oid class_id);
@@ -1113,6 +1120,10 @@ get_object_address(ObjectType objtype, Node *object,
address = get_object_address_usermapping(castNode(List, object),
missing_ok);
break;
+ case OBJECT_PUBLICATION_NAMESPACE:
+ address = get_object_address_publication_schema(castNode(List, object),
+ missing_ok);
+ break;
case OBJECT_PUBLICATION_REL:
address = get_object_address_publication_rel(castNode(List, object),
&relation,
@@ -1936,6 +1947,49 @@ get_object_address_publication_rel(List *object,
}
/*
+ * Find the ObjectAddress for a publication schema. The first element of the
+ * object parameter is the schema name, the second is the publication name.
+ */
+static ObjectAddress
+get_object_address_publication_schema(List *object, bool missing_ok)
+{
+ ObjectAddress address;
+ Publication *pub;
+ char *pubname;
+ char *schemaname;
+ Oid schemaid;
+
+ ObjectAddressSet(address, PublicationNamespaceRelationId, InvalidOid);
+
+ /* Fetch schema name and publication name from input list */
+ schemaname = strVal(linitial(object));
+ pubname = strVal(lsecond(object));
+
+ schemaid = get_namespace_oid(schemaname, missing_ok);
+ if (!OidIsValid(schemaid))
+ return address;
+
+ /* Now look up the pg_publication tuple */
+ pub = GetPublicationByName(pubname, missing_ok);
+ if (!pub)
+ return address;
+
+ /* Find the publication schema mapping in syscache */
+ address.objectId =
+ GetSysCacheOid2(PUBLICATIONNAMESPACEMAP,
+ Anum_pg_publication_namespace_oid,
+ ObjectIdGetDatum(schemaid),
+ ObjectIdGetDatum(pub->oid));
+ if (!OidIsValid(address.objectId) && !missing_ok)
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("publication schema \"%s\" in publication \"%s\" does not exist",
+ schemaname, pubname)));
+
+ return address;
+}
+
+/*
* Find the ObjectAddress for a default ACL.
*/
static ObjectAddress
@@ -2206,6 +2260,7 @@ pg_get_object_address(PG_FUNCTION_ARGS)
case OBJECT_DOMCONSTRAINT:
case OBJECT_CAST:
case OBJECT_USER_MAPPING:
+ case OBJECT_PUBLICATION_NAMESPACE:
case OBJECT_PUBLICATION_REL:
case OBJECT_DEFACL:
case OBJECT_TRANSFORM:
@@ -2299,6 +2354,7 @@ pg_get_object_address(PG_FUNCTION_ARGS)
case OBJECT_PUBLICATION_REL:
objnode = (Node *) list_make2(name, linitial(args));
break;
+ case OBJECT_PUBLICATION_NAMESPACE:
case OBJECT_USER_MAPPING:
objnode = (Node *) list_make2(linitial(name), linitial(args));
break;
@@ -2849,6 +2905,55 @@ get_catalog_object_by_oid(Relation catalog, AttrNumber oidcol, Oid objectId)
}
/*
+ * getPublicationSchemaInfo
+ *
+ * Get publication name and schema name from the object address into pubname and
+ * nspname. Both pubname and nspname are palloc'd strings which will be freed by
+ * the caller.
+ */
+static bool
+getPublicationSchemaInfo(const ObjectAddress *object, bool missing_ok,
+ char **pubname, char **nspname)
+{
+ HeapTuple tup;
+ Form_pg_publication_namespace pnform;
+
+ tup = SearchSysCache1(PUBLICATIONNAMESPACE,
+ ObjectIdGetDatum(object->objectId));
+ if (!HeapTupleIsValid(tup))
+ {
+ if (!missing_ok)
+ elog(ERROR, "cache lookup failed for publication schema %u",
+ object->objectId);
+ return false;
+ }
+
+ pnform = (Form_pg_publication_namespace) GETSTRUCT(tup);
+ *pubname = get_publication_name(pnform->pnpubid, missing_ok);
+ if (!(*pubname))
+ {
+ ReleaseSysCache(tup);
+ return false;
+ }
+
+ *nspname = get_namespace_name(pnform->pnnspid);
+ if (!(*nspname))
+ {
+ Oid schemaid = pnform->pnnspid;
+
+ pfree(*pubname);
+ ReleaseSysCache(tup);
+ if (!missing_ok)
+ elog(ERROR, "cache lookup failed for schema %u",
+ schemaid);
+ return false;
+ }
+
+ ReleaseSysCache(tup);
+ return true;
+}
+
+/*
* getObjectDescription: build an object description for messages
*
* The result is a palloc'd string. NULL is returned for an undefined
@@ -3872,6 +3977,22 @@ getObjectDescription(const ObjectAddress *object, bool missing_ok)
break;
}
+ case OCLASS_PUBLICATION_NAMESPACE:
+ {
+ char *pubname;
+ char *nspname;
+
+ if (!getPublicationSchemaInfo(object, missing_ok,
+ &pubname, &nspname))
+ break;
+
+ appendStringInfo(&buffer, _("publication of schema %s in publication %s"),
+ nspname, pubname);
+ pfree(pubname);
+ pfree(nspname);
+ break;
+ }
+
case OCLASS_PUBLICATION_REL:
{
HeapTuple tup;
@@ -4473,6 +4594,10 @@ getObjectTypeDescription(const ObjectAddress *object, bool missing_ok)
appendStringInfoString(&buffer, "publication");
break;
+ case OCLASS_PUBLICATION_NAMESPACE:
+ appendStringInfoString(&buffer, "publication namespace");
+ break;
+
case OCLASS_PUBLICATION_REL:
appendStringInfoString(&buffer, "publication relation");
break;
@@ -5683,6 +5808,30 @@ getObjectIdentityParts(const ObjectAddress *object,
break;
}
+ case OCLASS_PUBLICATION_NAMESPACE:
+ {
+ char *pubname;
+ char *nspname;
+
+ if (!getPublicationSchemaInfo(object, missing_ok, &pubname,
+ &nspname))
+ break;
+ appendStringInfo(&buffer, "%s in publication %s",
+ nspname, pubname);
+
+ if (objargs)
+ *objargs = list_make1(pubname);
+ else
+ pfree(pubname);
+
+ if (objname)
+ *objname = list_make1(nspname);
+ else
+ pfree(nspname);
+
+ break;
+ }
+
case OCLASS_PUBLICATION_REL:
{
HeapTuple tup;
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index 9cd0c82f93c..fed83b89a98 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -28,7 +28,9 @@
#include "catalog/objectaccess.h"
#include "catalog/objectaddress.h"
#include "catalog/pg_inherits.h"
+#include "catalog/pg_namespace.h"
#include "catalog/pg_publication.h"
+#include "catalog/pg_publication_namespace.h"
#include "catalog/pg_publication_rel.h"
#include "catalog/pg_type.h"
#include "commands/publicationcmds.h"
@@ -77,6 +79,30 @@ check_publication_add_relation(Relation targetrel)
}
/*
+ * Check if schema can be in given publication and throw appropriate error if
+ * not.
+ */
+static void
+check_publication_add_schema(Oid schemaid)
+{
+ /* Can't be system namespace */
+ if (IsCatalogNamespace(schemaid) || IsToastNamespace(schemaid))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("cannot add schema \"%s\" to publication",
+ get_namespace_name(schemaid)),
+ errdetail("This operation is not supported for system schemas.")));
+
+ /* Can't be temporary namespace */
+ if (isAnyTempNamespace(schemaid))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("cannot add schema \"%s\" to publication",
+ get_namespace_name(schemaid)),
+ errdetail("Temporary schemas cannot be replicated.")));
+}
+
+/*
* Returns if relation represented by oid and Form_pg_class entry
* is publishable.
*
@@ -106,6 +132,53 @@ is_publishable_class(Oid relid, Form_pg_class reltuple)
}
/*
+ * Filter out the partitions whose parent tables were also specified in
+ * the publication.
+ */
+static List *
+filter_partitions(List *relids, List *schemarelids)
+{
+ List *result = NIL;
+ ListCell *lc;
+ ListCell *lc2;
+
+ foreach(lc, relids)
+ {
+ bool skip = false;
+ List *ancestors = NIL;
+ Oid relid = lfirst_oid(lc);
+
+ if (get_rel_relispartition(relid))
+ ancestors = get_partition_ancestors(relid);
+
+ foreach(lc2, ancestors)
+ {
+ Oid ancestor = lfirst_oid(lc2);
+
+ /*
+ * Check if the parent table exists in the published table list.
+ *
+ * XXX As of now, we do this if the partition relation or the
+ * partition relation's ancestor is present in schema publication
+ * relations.
+ */
+ if (list_member_oid(relids, ancestor) &&
+ (list_member_oid(schemarelids, relid) ||
+ list_member_oid(schemarelids, ancestor)))
+ {
+ skip = true;
+ break;
+ }
+ }
+
+ if (!skip)
+ result = lappend_oid(result, relid);
+ }
+
+ return result;
+}
+
+/*
* Another variant of this, taking a Relation.
*/
bool
@@ -262,6 +335,89 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
return myself;
}
+/*
+ * Insert new publication / schema mapping.
+ */
+ObjectAddress
+publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists)
+{
+ Relation rel;
+ HeapTuple tup;
+ Datum values[Natts_pg_publication_namespace];
+ bool nulls[Natts_pg_publication_namespace];
+ Oid psschid;
+ Publication *pub = GetPublication(pubid);
+ List *schemaRels = NIL;
+ ObjectAddress myself,
+ referenced;
+
+ rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
+
+ /*
+ * Check for duplicates. Note that this does not really prevent
+ * duplicates, it's here just to provide nicer error message in common
+ * case. The real protection is the unique key on the catalog.
+ */
+ if (SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
+ ObjectIdGetDatum(schemaid),
+ ObjectIdGetDatum(pubid)))
+ {
+ table_close(rel, RowExclusiveLock);
+
+ if (if_not_exists)
+ return InvalidObjectAddress;
+
+ ereport(ERROR,
+ (errcode(ERRCODE_DUPLICATE_OBJECT),
+ errmsg("schema \"%s\" is already member of publication \"%s\"",
+ get_namespace_name(schemaid), pub->name)));
+ }
+
+ check_publication_add_schema(schemaid);
+
+ /* Form a tuple */
+ memset(values, 0, sizeof(values));
+ memset(nulls, false, sizeof(nulls));
+
+ psschid = GetNewOidWithIndex(rel, PublicationNamespaceObjectIndexId,
+ Anum_pg_publication_namespace_oid);
+ values[Anum_pg_publication_namespace_oid - 1] = ObjectIdGetDatum(psschid);
+ values[Anum_pg_publication_namespace_pnpubid - 1] =
+ ObjectIdGetDatum(pubid);
+ values[Anum_pg_publication_namespace_pnnspid - 1] =
+ ObjectIdGetDatum(schemaid);
+
+ tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
+
+ /* Insert tuple into catalog */
+ CatalogTupleInsert(rel, tup);
+ heap_freetuple(tup);
+
+ ObjectAddressSet(myself, PublicationNamespaceRelationId, psschid);
+
+ /* Add dependency on the publication */
+ ObjectAddressSet(referenced, PublicationRelationId, pubid);
+ recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
+
+ /* Add dependency on the schema */
+ ObjectAddressSet(referenced, NamespaceRelationId, schemaid);
+ recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
+
+ /* Close the table */
+ table_close(rel, RowExclusiveLock);
+
+ /*
+ * Invalidate relcache so that publication info is rebuilt. See
+ * publication_add_relation for why we need to consider all the
+ * partitions.
+ */
+ schemaRels = GetSchemaPublicationRelations(schemaid,
+ PUBLICATION_PART_ALL);
+ InvalidatePublicationRels(schemaRels);
+
+ return myself;
+}
+
/* Gets list of publication oids for a relation */
List *
GetRelationPublications(Oid relid)
@@ -429,6 +585,151 @@ GetAllTablesPublicationRelations(bool pubviaroot)
}
/*
+ * Gets the list of schema oids for a publication.
+ *
+ * This should only be used FOR ALL TABLES IN SCHEMA publications.
+ */
+List *
+GetPublicationSchemas(Oid pubid)
+{
+ List *result = NIL;
+ Relation pubschsrel;
+ ScanKeyData scankey;
+ SysScanDesc scan;
+ HeapTuple tup;
+
+ /* Find all schemas associated with the publication */
+ pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock);
+
+ ScanKeyInit(&scankey,
+ Anum_pg_publication_namespace_pnpubid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(pubid));
+
+ scan = systable_beginscan(pubschsrel,
+ PublicationNamespacePnnspidPnpubidIndexId,
+ true, NULL, 1, &scankey);
+ while (HeapTupleIsValid(tup = systable_getnext(scan)))
+ {
+ Form_pg_publication_namespace pubsch;
+
+ pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
+
+ result = lappend_oid(result, pubsch->pnnspid);
+ }
+
+ systable_endscan(scan);
+ table_close(pubschsrel, AccessShareLock);
+
+ return result;
+}
+
+/*
+ * Gets the list of publication oids associated with a specified schema.
+ */
+List *
+GetSchemaPublications(Oid schemaid)
+{
+ List *result = NIL;
+ CatCList *pubschlist;
+ int i;
+
+ /* Find all publications associated with the schema */
+ pubschlist = SearchSysCacheList1(PUBLICATIONNAMESPACEMAP,
+ ObjectIdGetDatum(schemaid));
+ for (i = 0; i < pubschlist->n_members; i++)
+ {
+ HeapTuple tup = &pubschlist->members[i]->tuple;
+ Oid pubid = ((Form_pg_publication_namespace) GETSTRUCT(tup))->pnpubid;
+
+ result = lappend_oid(result, pubid);
+ }
+
+ ReleaseSysCacheList(pubschlist);
+
+ return result;
+}
+
+/*
+ * Get the list of publishable relation oids for a specified schema.
+ */
+List *
+GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt)
+{
+ Relation classRel;
+ ScanKeyData key[1];
+ TableScanDesc scan;
+ HeapTuple tuple;
+ List *result = NIL;
+
+ Assert(OidIsValid(schemaid));
+
+ classRel = table_open(RelationRelationId, AccessShareLock);
+
+ ScanKeyInit(&key[0],
+ Anum_pg_class_relnamespace,
+ BTEqualStrategyNumber, F_OIDEQ,
+ schemaid);
+
+ /* get all the relations present in the specified schema */
+ scan = table_beginscan_catalog(classRel, 1, key);
+ while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+ {
+ Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
+ Oid relid = relForm->oid;
+ char relkind;
+
+ if (!is_publishable_class(relid, relForm))
+ continue;
+
+ relkind = get_rel_relkind(relid);
+ if (relkind == RELKIND_RELATION)
+ result = lappend_oid(result, relid);
+ else if (relkind == RELKIND_PARTITIONED_TABLE)
+ {
+ List *partitionrels = NIL;
+
+ /*
+ * It is quite possible that some of the partitions are in a
+ * different schema than the parent table, so we need to get such
+ * partitions separately.
+ */
+ partitionrels = GetPubPartitionOptionRelations(partitionrels,
+ pub_partopt,
+ relForm->oid);
+ result = list_concat_unique_oid(result, partitionrels);
+ }
+ }
+
+ table_endscan(scan);
+ table_close(classRel, AccessShareLock);
+ return result;
+}
+
+/*
+ * Gets the list of all relations published by FOR ALL TABLES IN SCHEMA
+ * publication.
+ */
+List *
+GetAllSchemaPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
+{
+ List *result = NIL;
+ List *pubschemalist = GetPublicationSchemas(pubid);
+ ListCell *cell;
+
+ foreach(cell, pubschemalist)
+ {
+ Oid schemaid = lfirst_oid(cell);
+ List *schemaRels = NIL;
+
+ schemaRels = GetSchemaPublicationRelations(schemaid, pub_partopt);
+ result = list_concat(result, schemaRels);
+ }
+
+ return result;
+}
+
+/*
* Get publication using oid
*
* The Publication struct and its data are palloc'ed here.
@@ -555,12 +856,41 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
* need those.
*/
if (publication->alltables)
+ {
tables = GetAllTablesPublicationRelations(publication->pubviaroot);
+ }
else
- tables = GetPublicationRelations(publication->oid,
+ {
+ List *relids,
+ *schemarelids;
+
+ relids = GetPublicationRelations(publication->oid,
publication->pubviaroot ?
PUBLICATION_PART_ROOT :
PUBLICATION_PART_LEAF);
+ schemarelids = GetAllSchemaPublicationRelations(publication->oid,
+ publication->pubviaroot ?
+ PUBLICATION_PART_ROOT :
+ PUBLICATION_PART_LEAF);
+ tables = list_concat_unique_oid(relids, schemarelids);
+ if (schemarelids && publication->pubviaroot)
+ {
+ /*
+ * If the publication publishes partition changes via their
+ * respective root partitioned tables, we must exclude
+ * partitions in favor of including the root partitioned
+ * tables. Otherwise, the function could return both the child
+ * and parent tables which could cause data of the child table
+ * to be double-published on the subscriber side.
+ *
+ * XXX As of now, we do this when a publication has associated
+ * schema or for all tables publication. See
+ * GetAllTablesPublicationRelations().
+ */
+ tables = filter_partitions(tables, schemarelids);
+ }
+ }
+
funcctx->user_fctx = (void *) tables;
MemoryContextSwitchTo(oldcontext);
diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c
index c47d54e96bb..40044070cf3 100644
--- a/src/backend/commands/alter.c
+++ b/src/backend/commands/alter.c
@@ -660,6 +660,7 @@ AlterObjectNamespace_oid(Oid classId, Oid objid, Oid nspOid,
case OCLASS_EVENT_TRIGGER:
case OCLASS_POLICY:
case OCLASS_PUBLICATION:
+ case OCLASS_PUBLICATION_NAMESPACE:
case OCLASS_PUBLICATION_REL:
case OCLASS_SUBSCRIPTION:
case OCLASS_TRANSFORM:
diff --git a/src/backend/commands/event_trigger.c b/src/backend/commands/event_trigger.c
index 71612d577e9..df264329d8b 100644
--- a/src/backend/commands/event_trigger.c
+++ b/src/backend/commands/event_trigger.c
@@ -973,6 +973,7 @@ EventTriggerSupportsObjectType(ObjectType obtype)
case OBJECT_POLICY:
case OBJECT_PROCEDURE:
case OBJECT_PUBLICATION:
+ case OBJECT_PUBLICATION_NAMESPACE:
case OBJECT_PUBLICATION_REL:
case OBJECT_ROUTINE:
case OBJECT_RULE:
@@ -1050,6 +1051,7 @@ EventTriggerSupportsObjectClass(ObjectClass objclass)
case OCLASS_EXTENSION:
case OCLASS_POLICY:
case OCLASS_PUBLICATION:
+ case OCLASS_PUBLICATION_NAMESPACE:
case OCLASS_PUBLICATION_REL:
case OCLASS_SUBSCRIPTION:
case OCLASS_TRANSFORM:
@@ -2126,6 +2128,7 @@ stringify_grant_objtype(ObjectType objtype)
case OBJECT_OPFAMILY:
case OBJECT_POLICY:
case OBJECT_PUBLICATION:
+ case OBJECT_PUBLICATION_NAMESPACE:
case OBJECT_PUBLICATION_REL:
case OBJECT_ROLE:
case OBJECT_RULE:
@@ -2208,6 +2211,7 @@ stringify_adefprivs_objtype(ObjectType objtype)
case OBJECT_OPFAMILY:
case OBJECT_POLICY:
case OBJECT_PUBLICATION:
+ case OBJECT_PUBLICATION_NAMESPACE:
case OBJECT_PUBLICATION_REL:
case OBJECT_ROLE:
case OBJECT_RULE:
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 9c7f91611dc..d1fff13d2e9 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -25,7 +25,9 @@
#include "catalog/objectaddress.h"
#include "catalog/partition.h"
#include "catalog/pg_inherits.h"
+#include "catalog/pg_namespace.h"
#include "catalog/pg_publication.h"
+#include "catalog/pg_publication_namespace.h"
#include "catalog/pg_publication_rel.h"
#include "catalog/pg_type.h"
#include "commands/dbcommands.h"
@@ -34,6 +36,7 @@
#include "commands/publicationcmds.h"
#include "funcapi.h"
#include "miscadmin.h"
+#include "storage/lmgr.h"
#include "utils/acl.h"
#include "utils/array.h"
#include "utils/builtins.h"
@@ -45,11 +48,16 @@
#include "utils/syscache.h"
#include "utils/varlena.h"
+static List *OpenReliIdList(List *relids);
static List *OpenTableList(List *tables);
static void CloseTableList(List *rels);
+static void LockSchemaList(List *schemalist);
static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
AlterPublicationStmt *stmt);
static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
+static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
+ AlterPublicationStmt *stmt);
+static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok);
static void
parse_publication_options(ParseState *pstate,
@@ -136,6 +144,97 @@ parse_publication_options(ParseState *pstate,
}
/*
+ * Convert the PublicationObjSpecType list into schema oid list and
+ * PublicationTable list.
+ */
+static void
+ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate,
+ List **rels, List **schemas)
+{
+ ListCell *cell;
+ PublicationObjSpec *pubobj;
+
+ if (!pubobjspec_list)
+ return;
+
+ foreach(cell, pubobjspec_list)
+ {
+ Oid schemaid;
+ List *search_path;
+
+ pubobj = (PublicationObjSpec *) lfirst(cell);
+
+ switch (pubobj->pubobjtype)
+ {
+ case PUBLICATIONOBJ_TABLE:
+ *rels = lappend(*rels, pubobj->pubtable);
+ break;
+ case PUBLICATIONOBJ_REL_IN_SCHEMA:
+ schemaid = get_namespace_oid(pubobj->name, false);
+
+ /* Filter out duplicates if user specifies "sch1, sch1" */
+ *schemas = list_append_unique_oid(*schemas, schemaid);
+ break;
+ case PUBLICATIONOBJ_CURRSCHEMA:
+ search_path = fetch_search_path(false);
+ if (search_path == NIL) /* nothing valid in search_path? */
+ ereport(ERROR,
+ errcode(ERRCODE_UNDEFINED_SCHEMA),
+ errmsg("no schema has been selected for CURRENT_SCHEMA"));
+
+ schemaid = linitial_oid(search_path);
+ list_free(search_path);
+
+ /* Filter out duplicates if user specifies "sch1, sch1" */
+ *schemas = list_append_unique_oid(*schemas, schemaid);
+ break;
+ default:
+ /* shouldn't happen */
+ elog(ERROR, "invalid publication object type %d", pubobj->pubobjtype);
+ break;
+ }
+ }
+}
+
+/*
+ * Check if any of the given relation's schema is a member of the given schema
+ * list.
+ */
+static void
+CheckObjSchemaNotAlreadyInPublication(List *rels, List *schemaidlist,
+ PublicationObjSpecType checkobjtype)
+{
+ ListCell *lc;
+
+ foreach(lc, rels)
+ {
+ PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc);
+ Relation rel = pub_rel->relation;
+ Oid relSchemaId = RelationGetNamespace(rel);
+
+ if (list_member_oid(schemaidlist, relSchemaId))
+ {
+ if (checkobjtype == PUBLICATIONOBJ_REL_IN_SCHEMA)
+ ereport(ERROR,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("cannot add schema \"%s\" to publication",
+ get_namespace_name(relSchemaId)),
+ errdetail("Table \"%s\" in schema \"%s\" is already part of the publication, adding the same schema is not supported.",
+ RelationGetRelationName(rel),
+ get_namespace_name(relSchemaId)));
+ else if (checkobjtype == PUBLICATIONOBJ_TABLE)
+ ereport(ERROR,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("cannot add relation \"%s.%s\" to publication",
+ get_namespace_name(relSchemaId),
+ RelationGetRelationName(rel)),
+ errdetail("Table's schema \"%s\" is already part of the publication or part of the specified schema list.",
+ get_namespace_name(relSchemaId)));
+ }
+ }
+}
+
+/*
* Create new publication.
*/
ObjectAddress
@@ -152,6 +251,8 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
bool publish_via_partition_root_given;
bool publish_via_partition_root;
AclResult aclresult;
+ List *relations = NIL;
+ List *schemaidlist = NIL;
/* must have CREATE privilege on database */
aclresult = pg_database_aclcheck(MyDatabaseId, GetUserId(), ACL_CREATE);
@@ -221,21 +322,44 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
/* Make the changes visible. */
CommandCounterIncrement();
- if (stmt->tables)
- {
- List *rels;
-
- Assert(list_length(stmt->tables) > 0);
-
- rels = OpenTableList(stmt->tables);
- PublicationAddTables(puboid, rels, true, NULL);
- CloseTableList(rels);
- }
- else if (stmt->for_all_tables)
+ /* Associate objects with the publication. */
+ if (stmt->for_all_tables)
{
/* Invalidate relcache so that publication info is rebuilt. */
CacheInvalidateRelcacheAll();
}
+ else
+ {
+ ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
+ &schemaidlist);
+
+ /* FOR ALL TABLES IN SCHEMA requires superuser */
+ if (list_length(schemaidlist) > 0 && !superuser())
+ ereport(ERROR,
+ errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("must be superuser to create FOR ALL TABLES IN SCHEMA publication"));
+
+ if (list_length(relations) > 0)
+ {
+ List *rels;
+
+ rels = OpenTableList(relations);
+ CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist,
+ PUBLICATIONOBJ_TABLE);
+ PublicationAddTables(puboid, rels, true, NULL);
+ CloseTableList(rels);
+ }
+
+ if (list_length(schemaidlist) > 0)
+ {
+ /*
+ * Schema lock is held until the publication is created to prevent
+ * concurrent schema deletion.
+ */
+ LockSchemaList(schemaidlist);
+ PublicationAddSchemas(puboid, schemaidlist, true, NULL);
+ }
+ }
table_close(rel, RowExclusiveLock);
@@ -318,13 +442,19 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
}
else
{
+ List *relids = NIL;
+ List *schemarelids = NIL;
+
/*
* For any partitioned tables contained in the publication, we must
* invalidate all partitions contained in the respective partition
* trees, not just those explicitly mentioned in the publication.
*/
- List *relids = GetPublicationRelations(pubform->oid,
- PUBLICATION_PART_ALL);
+ relids = GetPublicationRelations(pubform->oid,
+ PUBLICATION_PART_ALL);
+ schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
+ PUBLICATION_PART_ALL);
+ relids = list_concat_unique_oid(relids, schemarelids);
InvalidatePublicationRels(relids);
}
@@ -361,28 +491,36 @@ InvalidatePublicationRels(List *relids)
* Add or remove table to/from publication.
*/
static void
-AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
- HeapTuple tup)
+AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
+ List *tables, List *schemaidlist)
{
List *rels = NIL;
Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
Oid pubid = pubform->oid;
- /* Check that user is allowed to manipulate the publication tables. */
- if (pubform->puballtables)
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("publication \"%s\" is defined as FOR ALL TABLES",
- NameStr(pubform->pubname)),
- errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
+ /*
+ * It is quite possible that for the SET case user has not specified any
+ * tables in which case we need to remove all the existing tables.
+ */
+ if (!tables && stmt->action != DEFELEM_SET)
+ return;
- Assert(list_length(stmt->tables) > 0);
+ rels = OpenTableList(tables);
- rels = OpenTableList(stmt->tables);
+ if (stmt->action == DEFELEM_ADD)
+ {
+ List *schemas = NIL;
- if (stmt->tableAction == DEFELEM_ADD)
+ /*
+ * Check if the relation is member of the existing schema in the
+ * publication or member of the schema list specified.
+ */
+ schemas = list_concat_copy(schemaidlist, GetPublicationSchemas(pubid));
+ CheckObjSchemaNotAlreadyInPublication(rels, schemas,
+ PUBLICATIONOBJ_TABLE);
PublicationAddTables(pubid, rels, false, stmt);
- else if (stmt->tableAction == DEFELEM_DROP)
+ }
+ else if (stmt->action == DEFELEM_DROP)
PublicationDropTables(pubid, rels, false);
else /* DEFELEM_SET */
{
@@ -391,6 +529,9 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
List *delrels = NIL;
ListCell *oldlc;
+ CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist,
+ PUBLICATIONOBJ_TABLE);
+
/* Calculate which relations to drop. */
foreach(oldlc, oldrelids)
{
@@ -441,10 +582,110 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
}
/*
+ * Alter the publication schemas.
+ *
+ * Add or remove schemas to/from publication.
+ */
+static void
+AlterPublicationSchemas(AlterPublicationStmt *stmt,
+ HeapTuple tup, List *schemaidlist)
+{
+ Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
+
+ /*
+ * It is quite possible that for the SET case user has not specified any
+ * schemas in which case we need to remove all the existing schemas.
+ */
+ if (!schemaidlist && stmt->action != DEFELEM_SET)
+ return;
+
+ /*
+ * Schema lock is held until the publication is altered to prevent
+ * concurrent schema deletion.
+ */
+ LockSchemaList(schemaidlist);
+ if (stmt->action == DEFELEM_ADD)
+ {
+ List *rels;
+ List *reloids;
+
+ reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT);
+ rels = OpenReliIdList(reloids);
+
+ CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist,
+ PUBLICATIONOBJ_REL_IN_SCHEMA);
+
+ CloseTableList(rels);
+ PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt);
+ }
+ else if (stmt->action == DEFELEM_DROP)
+ PublicationDropSchemas(pubform->oid, schemaidlist, false);
+ else /* DEFELEM_SET */
+ {
+ List *oldschemaids = GetPublicationSchemas(pubform->oid);
+ List *delschemas = NIL;
+
+ /* Identify which schemas should be dropped */
+ delschemas = list_difference_oid(oldschemaids, schemaidlist);
+
+ /*
+ * Schema lock is held until the publication is altered to prevent
+ * concurrent schema deletion.
+ */
+ LockSchemaList(delschemas);
+
+ /* And drop them */
+ PublicationDropSchemas(pubform->oid, delschemas, true);
+
+ /*
+ * Don't bother calculating the difference for adding, we'll catch and
+ * skip existing ones when doing catalog update.
+ */
+ PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt);
+ }
+}
+
+/*
+ * Check if relations and schemas can be in a given publication and throw
+ * appropriate error if not.
+ */
+static void
+CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup,
+ List *tables, List *schemaidlist)
+{
+ Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
+
+ if ((stmt->action == DEFELEM_ADD || stmt->action == DEFELEM_SET) &&
+ schemaidlist && !superuser())
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("must be superuser to add or set schemas")));
+
+ /*
+ * Check that user is allowed to manipulate the publication tables in
+ * schema
+ */
+ if (schemaidlist && pubform->puballtables)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("publication \"%s\" is defined as FOR ALL TABLES",
+ NameStr(pubform->pubname)),
+ errdetail("Tables from schema cannot be added to, dropped from, or set on FOR ALL TABLES publications.")));
+
+ /* Check that user is allowed to manipulate the publication tables. */
+ if (tables && pubform->puballtables)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("publication \"%s\" is defined as FOR ALL TABLES",
+ NameStr(pubform->pubname)),
+ errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
+}
+
+/*
* Alter the existing publication.
*
- * This is dispatcher function for AlterPublicationOptions and
- * AlterPublicationTables.
+ * This is dispatcher function for AlterPublicationOptions,
+ * AlterPublicationSchemas and AlterPublicationTables.
*/
void
AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
@@ -474,7 +715,41 @@ AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
if (stmt->options)
AlterPublicationOptions(pstate, stmt, rel, tup);
else
- AlterPublicationTables(stmt, rel, tup);
+ {
+ List *relations = NIL;
+ List *schemaidlist = NIL;
+
+ ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
+ &schemaidlist);
+
+ CheckAlterPublication(stmt, tup, relations, schemaidlist);
+
+ /*
+ * Lock the publication so nobody else can do anything with it. This
+ * prevents concurrent alter to add table(s) that were already going
+ * to become part of the publication by adding corresponding schema(s)
+ * via this command and similarly it will prevent the concurrent
+ * addition of schema(s) for which there is any corresponding table
+ * being added by this command.
+ */
+ LockDatabaseObject(PublicationRelationId, pubform->oid, 0,
+ AccessExclusiveLock);
+
+ /*
+ * It is possible that by the time we acquire the lock on publication,
+ * concurrent DDL has removed it. We can test this by checking the
+ * existence of publication.
+ */
+ if (!SearchSysCacheExists1(PUBLICATIONOID,
+ ObjectIdGetDatum(pubform->oid)))
+ ereport(ERROR,
+ errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("publication \"%s\" does not exist",
+ stmt->pubname));
+
+ AlterPublicationTables(stmt, tup, relations, schemaidlist);
+ AlterPublicationSchemas(stmt, tup, schemaidlist);
+ }
/* Cleanup. */
heap_freetuple(tup);
@@ -552,9 +827,71 @@ RemovePublicationById(Oid pubid)
}
/*
+ * Remove schema from publication by mapping OID.
+ */
+void
+RemovePublicationSchemaById(Oid psoid)
+{
+ Relation rel;
+ HeapTuple tup;
+ List *schemaRels = NIL;
+ Form_pg_publication_namespace pubsch;
+
+ rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
+
+ tup = SearchSysCache1(PUBLICATIONNAMESPACE, ObjectIdGetDatum(psoid));
+
+ if (!HeapTupleIsValid(tup))
+ elog(ERROR, "cache lookup failed for publication schema %u", psoid);
+
+ pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
+
+ /*
+ * Invalidate relcache so that publication info is rebuilt. See
+ * RemovePublicationRelById for why we need to consider all the
+ * partitions.
+ */
+ schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid,
+ PUBLICATION_PART_ALL);
+ InvalidatePublicationRels(schemaRels);
+
+ CatalogTupleDelete(rel, &tup->t_self);
+
+ ReleaseSysCache(tup);
+
+ table_close(rel, RowExclusiveLock);
+}
+
+/*
+ * Open relations specified by a relid list.
+ * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
+ * add them to a publication.
+ */
+static List *
+OpenReliIdList(List *relids)
+{
+ ListCell *lc;
+ List *rels = NIL;
+
+ foreach(lc, relids)
+ {
+ PublicationRelInfo *pub_rel;
+ Oid relid = lfirst_oid(lc);
+ Relation rel = table_open(relid,
+ ShareUpdateExclusiveLock);
+
+ pub_rel = palloc(sizeof(PublicationRelInfo));
+ pub_rel->relation = rel;
+ rels = lappend(rels, pub_rel);
+ }
+
+ return rels;
+}
+
+/*
* Open relations specified by a PublicationTable list.
- * In the returned list of PublicationRelInfo, tables are locked
- * in ShareUpdateExclusiveLock mode in order to add them to a publication.
+ * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
+ * add them to a publication.
*/
static List *
OpenTableList(List *tables)
@@ -659,6 +996,35 @@ CloseTableList(List *rels)
}
/*
+ * Lock the schemas specified in the schema list in AccessShareLock mode in
+ * order to prevent concurrent schema deletion.
+ */
+static void
+LockSchemaList(List *schemalist)
+{
+ ListCell *lc;
+
+ foreach(lc, schemalist)
+ {
+ Oid schemaid = lfirst_oid(lc);
+
+ /* Allow query cancel in case this takes a long time */
+ CHECK_FOR_INTERRUPTS();
+ LockDatabaseObject(NamespaceRelationId, schemaid, 0, AccessShareLock);
+
+ /*
+ * It is possible that by the time we acquire the lock on schema,
+ * concurrent DDL has removed it. We can test this by checking the
+ * existence of schema.
+ */
+ if (!SearchSysCacheExists1(NAMESPACEOID, ObjectIdGetDatum(schemaid)))
+ ereport(ERROR,
+ errcode(ERRCODE_UNDEFINED_SCHEMA),
+ errmsg("schema with OID %u does not exist", schemaid));
+ }
+}
+
+/*
* Add listed tables to the publication.
*/
static void
@@ -728,6 +1094,68 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
}
/*
+ * Add listed schemas to the publication.
+ */
+static void
+PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
+ AlterPublicationStmt *stmt)
+{
+ ListCell *lc;
+
+ Assert(!stmt || !stmt->for_all_tables);
+
+ foreach(lc, schemas)
+ {
+ Oid schemaid = lfirst_oid(lc);
+ ObjectAddress obj;
+
+ obj = publication_add_schema(pubid, schemaid, if_not_exists);
+ if (stmt)
+ {
+ EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
+ (Node *) stmt);
+
+ InvokeObjectPostCreateHook(PublicationNamespaceRelationId,
+ obj.objectId, 0);
+ }
+ }
+}
+
+/*
+ * Remove listed schemas from the publication.
+ */
+static void
+PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
+{
+ ObjectAddress obj;
+ ListCell *lc;
+ Oid psid;
+
+ foreach(lc, schemas)
+ {
+ Oid schemaid = lfirst_oid(lc);
+
+ psid = GetSysCacheOid2(PUBLICATIONNAMESPACEMAP,
+ Anum_pg_publication_namespace_oid,
+ ObjectIdGetDatum(schemaid),
+ ObjectIdGetDatum(pubid));
+ if (!OidIsValid(psid))
+ {
+ if (missing_ok)
+ continue;
+
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("tables from schema \"%s\" are not part of the publication",
+ get_namespace_name(schemaid))));
+ }
+
+ ObjectAddressSet(obj, PublicationNamespaceRelationId, psid);
+ performDeletion(&obj, DROP_CASCADE, 0);
+ }
+}
+
+/*
* Internal workhorse for changing a publication owner
*/
static void
diff --git a/src/backend/commands/seclabel.c b/src/backend/commands/seclabel.c
index 308e0adb553..53c18628a7d 100644
--- a/src/backend/commands/seclabel.c
+++ b/src/backend/commands/seclabel.c
@@ -79,6 +79,7 @@ SecLabelSupportsObjectType(ObjectType objtype)
case OBJECT_OPERATOR:
case OBJECT_OPFAMILY:
case OBJECT_POLICY:
+ case OBJECT_PUBLICATION_NAMESPACE:
case OBJECT_PUBLICATION_REL:
case OBJECT_RULE:
case OBJECT_STATISTIC_EXT:
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 1a2f159f24e..857cc5ce6e2 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -12286,6 +12286,7 @@ ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel,
case OCLASS_EXTENSION:
case OCLASS_EVENT_TRIGGER:
case OCLASS_PUBLICATION:
+ case OCLASS_PUBLICATION_NAMESPACE:
case OCLASS_PUBLICATION_REL:
case OCLASS_SUBSCRIPTION:
case OCLASS_TRANSFORM:
@@ -15994,6 +15995,33 @@ AlterTableNamespace(AlterObjectSchemaStmt *stmt, Oid *oldschema)
newrv = makeRangeVar(stmt->newschema, RelationGetRelationName(rel), -1);
nspOid = RangeVarGetAndCheckCreationNamespace(newrv, NoLock, NULL);
+ /*
+ * Check that setting the relation to a different schema won't result in a
+ * publication having both a schema and the same schema's table, as this
+ * is not supported.
+ */
+ if (stmt->objectType == OBJECT_TABLE)
+ {
+ ListCell *lc;
+ List *schemaPubids = GetSchemaPublications(nspOid);
+ List *relPubids = GetRelationPublications(RelationGetRelid(rel));
+
+ foreach(lc, relPubids)
+ {
+ Oid pubid = lfirst_oid(lc);
+
+ if (list_member_oid(schemaPubids, pubid))
+ ereport(ERROR,
+ errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot move table \"%s\" to schema \"%s\"",
+ RelationGetRelationName(rel), stmt->newschema),
+ errdetail("The schema \"%s\" and same schema's table \"%s\" cannot be part of the same publication \"%s\".",
+ stmt->newschema,
+ RelationGetRelationName(rel),
+ get_publication_name(pubid, false)));
+ }
+ }
+
/* common checks on switching namespaces */
CheckSetNamespace(oldNspOid, nspOid);
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 70e9e54d3e5..82464c98896 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -4810,6 +4810,19 @@ _copyPartitionCmd(const PartitionCmd *from)
return newnode;
}
+static PublicationObjSpec *
+_copyPublicationObject(const PublicationObjSpec *from)
+{
+ PublicationObjSpec *newnode = makeNode(PublicationObjSpec);
+
+ COPY_SCALAR_FIELD(pubobjtype);
+ COPY_STRING_FIELD(name);
+ COPY_NODE_FIELD(pubtable);
+ COPY_LOCATION_FIELD(location);
+
+ return newnode;
+}
+
static PublicationTable *
_copyPublicationTable(const PublicationTable *from)
{
@@ -4827,7 +4840,7 @@ _copyCreatePublicationStmt(const CreatePublicationStmt *from)
COPY_STRING_FIELD(pubname);
COPY_NODE_FIELD(options);
- COPY_NODE_FIELD(tables);
+ COPY_NODE_FIELD(pubobjects);
COPY_SCALAR_FIELD(for_all_tables);
return newnode;
@@ -4840,9 +4853,9 @@ _copyAlterPublicationStmt(const AlterPublicationStmt *from)
COPY_STRING_FIELD(pubname);
COPY_NODE_FIELD(options);
- COPY_NODE_FIELD(tables);
+ COPY_NODE_FIELD(pubobjects);
COPY_SCALAR_FIELD(for_all_tables);
- COPY_SCALAR_FIELD(tableAction);
+ COPY_SCALAR_FIELD(action);
return newnode;
}
@@ -5887,6 +5900,9 @@ copyObjectImpl(const void *from)
case T_PartitionCmd:
retval = _copyPartitionCmd(from);
break;
+ case T_PublicationObjSpec:
+ retval = _copyPublicationObject(from);
+ break;
case T_PublicationTable:
retval = _copyPublicationTable(from);
break;
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index 19eff201024..f537d3eb968 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -2297,6 +2297,18 @@ _equalAlterTSConfigurationStmt(const AlterTSConfigurationStmt *a,
}
static bool
+_equalPublicationObject(const PublicationObjSpec *a,
+ const PublicationObjSpec *b)
+{
+ COMPARE_SCALAR_FIELD(pubobjtype);
+ COMPARE_STRING_FIELD(name);
+ COMPARE_NODE_FIELD(pubtable);
+ COMPARE_LOCATION_FIELD(location);
+
+ return true;
+}
+
+static bool
_equalPublicationTable(const PublicationTable *a, const PublicationTable *b)
{
COMPARE_NODE_FIELD(relation);
@@ -2310,7 +2322,7 @@ _equalCreatePublicationStmt(const CreatePublicationStmt *a,
{
COMPARE_STRING_FIELD(pubname);
COMPARE_NODE_FIELD(options);
- COMPARE_NODE_FIELD(tables);
+ COMPARE_NODE_FIELD(pubobjects);
COMPARE_SCALAR_FIELD(for_all_tables);
return true;
@@ -2322,9 +2334,9 @@ _equalAlterPublicationStmt(const AlterPublicationStmt *a,
{
COMPARE_STRING_FIELD(pubname);
COMPARE_NODE_FIELD(options);
- COMPARE_NODE_FIELD(tables);
+ COMPARE_NODE_FIELD(pubobjects);
COMPARE_SCALAR_FIELD(for_all_tables);
- COMPARE_SCALAR_FIELD(tableAction);
+ COMPARE_SCALAR_FIELD(action);
return true;
}
@@ -3894,6 +3906,9 @@ equal(const void *a, const void *b)
case T_PartitionCmd:
retval = _equalPartitionCmd(a, b);
break;
+ case T_PublicationObjSpec:
+ retval = _equalPublicationObject(a, b);
+ break;
case T_PublicationTable:
retval = _equalPublicationTable(a, b);
break;
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 08f1bf1031c..d0eb80e69cb 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -195,12 +195,17 @@ static Node *makeXmlExpr(XmlExprOp op, char *name, List *named_args,
static List *mergeTableFuncParameters(List *func_args, List *columns);
static TypeName *TableFuncTypeName(List *columns);
static RangeVar *makeRangeVarFromAnyName(List *names, int position, core_yyscan_t yyscanner);
+static RangeVar *makeRangeVarFromQualifiedName(char *name, List *rels,
+ int location,
+ core_yyscan_t yyscanner);
static void SplitColQualList(List *qualList,
List **constraintList, CollateClause **collClause,
core_yyscan_t yyscanner);
static void processCASbits(int cas_bits, int location, const char *constrType,
bool *deferrable, bool *initdeferred, bool *not_valid,
bool *no_inherit, core_yyscan_t yyscanner);
+static void preprocess_pubobj_list(List *pubobjspec_list,
+ core_yyscan_t yyscanner);
static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
%}
@@ -256,6 +261,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
PartitionSpec *partspec;
PartitionBoundSpec *partboundspec;
RoleSpec *rolespec;
+ PublicationObjSpec *publicationobjectspec;
struct SelectLimit *selectlimit;
SetQuantifier setquantifier;
struct GroupClause *groupclause;
@@ -425,14 +431,13 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
transform_element_list transform_type_list
TriggerTransitions TriggerReferencing
vacuum_relation_list opt_vacuum_relation_list
- drop_option_list publication_table_list
+ drop_option_list pub_obj_list
%type <node> opt_routine_body
%type <groupclause> group_clause
%type <list> group_by_list
%type <node> group_by_item empty_grouping_set rollup_clause cube_clause
%type <node> grouping_sets_clause
-%type <node> opt_publication_for_tables publication_for_tables publication_table
%type <list> opt_fdw_options fdw_options
%type <defelt> fdw_option
@@ -517,6 +522,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
%type <node> table_ref
%type <jexpr> joined_table
%type <range> relation_expr
+%type <range> extended_relation_expr
%type <range> relation_expr_opt_alias
%type <node> tablesample_clause opt_repeatable_clause
%type <target> target_el set_target insert_column_item
@@ -553,6 +559,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
%type <str> createdb_opt_name plassign_target
%type <node> var_value zone_value
%type <rolespec> auth_ident RoleSpec opt_granted_by
+%type <publicationobjectspec> PublicationObjSpec
%type <keyword> unreserved_keyword type_func_name_keyword
%type <keyword> col_name_keyword reserved_keyword
@@ -9591,69 +9598,131 @@ AlterOwnerStmt: ALTER AGGREGATE aggregate_with_argtypes OWNER TO RoleSpec
/*****************************************************************************
*
- * CREATE PUBLICATION name [ FOR TABLE ] [ WITH options ]
+ * CREATE PUBLICATION name [WITH options]
+ *
+ * CREATE PUBLICATION FOR ALL TABLES [WITH options]
+ *
+ * CREATE PUBLICATION FOR pub_obj [, ...] [WITH options]
+ *
+ * pub_obj is one of:
+ *
+ * TABLE table [, ...]
+ * ALL TABLES IN SCHEMA schema [, ...]
*
*****************************************************************************/
CreatePublicationStmt:
- CREATE PUBLICATION name opt_publication_for_tables opt_definition
+ CREATE PUBLICATION name opt_definition
{
CreatePublicationStmt *n = makeNode(CreatePublicationStmt);
n->pubname = $3;
- n->options = $5;
- if ($4 != NULL)
- {
- /* FOR TABLE */
- if (IsA($4, List))
- n->tables = (List *)$4;
- /* FOR ALL TABLES */
- else
- n->for_all_tables = true;
- }
+ n->options = $4;
+ $$ = (Node *)n;
+ }
+ | CREATE PUBLICATION name FOR ALL TABLES opt_definition
+ {
+ CreatePublicationStmt *n = makeNode(CreatePublicationStmt);
+ n->pubname = $3;
+ n->options = $7;
+ n->for_all_tables = true;
+ $$ = (Node *)n;
+ }
+ | CREATE PUBLICATION name FOR pub_obj_list opt_definition
+ {
+ CreatePublicationStmt *n = makeNode(CreatePublicationStmt);
+ n->pubname = $3;
+ n->options = $6;
+ n->pubobjects = (List *)$5;
+ preprocess_pubobj_list(n->pubobjects, yyscanner);
$$ = (Node *)n;
}
;
-opt_publication_for_tables:
- publication_for_tables { $$ = $1; }
- | /* EMPTY */ { $$ = NULL; }
- ;
-
-publication_for_tables:
- FOR TABLE publication_table_list
+/*
+ * FOR TABLE and FOR ALL TABLES IN SCHEMA specifications
+ *
+ * This rule parses publication objects with and without keyword prefixes.
+ *
+ * The actual type of the object without keyword prefix depends on the previous
+ * one with keyword prefix. It will be preprocessed in preprocess_pubobj_list().
+ *
+ * For the object without keyword prefix, we cannot just use relation_expr here,
+ * because some extended expressions in relation_expr cannot be used as a
+ * schemaname and we cannot differentiate it. So, we extract the rules from
+ * relation_expr here.
+ */
+PublicationObjSpec:
+ TABLE relation_expr
{
- $$ = (Node *) $3;
+ $$ = makeNode(PublicationObjSpec);
+ $$->pubobjtype = PUBLICATIONOBJ_TABLE;
+ $$->pubtable = makeNode(PublicationTable);
+ $$->pubtable->relation = $2;
}
- | FOR ALL TABLES
+ | ALL TABLES IN_P SCHEMA ColId
{
- $$ = (Node *) makeInteger(true);
+ $$ = makeNode(PublicationObjSpec);
+ $$->pubobjtype = PUBLICATIONOBJ_REL_IN_SCHEMA;
+ $$->name = $5;
+ $$->location = @5;
}
- ;
+ | ALL TABLES IN_P SCHEMA CURRENT_SCHEMA
+ {
+ $$ = makeNode(PublicationObjSpec);
+ $$->pubobjtype = PUBLICATIONOBJ_CURRSCHEMA;
+ $$->location = @5;
+ }
+ | ColId
+ {
+ $$ = makeNode(PublicationObjSpec);
+ $$->pubobjtype = PUBLICATIONOBJ_CONTINUATION;
+ $$->name = $1;
+ $$->location = @1;
+ }
+ | ColId indirection
+ {
+ $$ = makeNode(PublicationObjSpec);
+ $$->pubobjtype = PUBLICATIONOBJ_CONTINUATION;
+ $$->pubtable = makeNode(PublicationTable);
+ $$->pubtable->relation = makeRangeVarFromQualifiedName($1, $2, @1, yyscanner);
+ $$->location = @1;
+ }
+ /* grammar like tablename * , ONLY tablename, ONLY ( tablename ) */
+ | extended_relation_expr
+ {
+ $$ = makeNode(PublicationObjSpec);
+ $$->pubobjtype = PUBLICATIONOBJ_CONTINUATION;
+ $$->pubtable = makeNode(PublicationTable);
+ $$->pubtable->relation = $1;
+ }
+ | CURRENT_SCHEMA
+ {
+ $$ = makeNode(PublicationObjSpec);
+ $$->pubobjtype = PUBLICATIONOBJ_CONTINUATION;
+ $$->location = @1;
+ }
+ ;
-publication_table_list:
- publication_table
+pub_obj_list: PublicationObjSpec
{ $$ = list_make1($1); }
- | publication_table_list ',' publication_table
- { $$ = lappend($1, $3); }
- ;
-
-publication_table: relation_expr
- {
- PublicationTable *n = makeNode(PublicationTable);
- n->relation = $1;
- $$ = (Node *) n;
- }
+ | pub_obj_list ',' PublicationObjSpec
+ { $$ = lappend($1, $3); }
;
/*****************************************************************************
*
* ALTER PUBLICATION name SET ( options )
*
- * ALTER PUBLICATION name ADD TABLE table [, table2]
+ * ALTER PUBLICATION name ADD pub_obj [, ...]
+ *
+ * ALTER PUBLICATION name DROP pub_obj [, ...]
+ *
+ * ALTER PUBLICATION name SET pub_obj [, ...]
*
- * ALTER PUBLICATION name DROP TABLE table [, table2]
+ * pub_obj is one of:
*
- * ALTER PUBLICATION name SET TABLE table [, table2]
+ * TABLE table_name [, ...]
+ * ALL TABLES IN SCHEMA schema_name [, ...]
*
*****************************************************************************/
@@ -9665,28 +9734,31 @@ AlterPublicationStmt:
n->options = $5;
$$ = (Node *)n;
}
- | ALTER PUBLICATION name ADD_P TABLE publication_table_list
+ | ALTER PUBLICATION name ADD_P pub_obj_list
{
AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
n->pubname = $3;
- n->tables = $6;
- n->tableAction = DEFELEM_ADD;
+ n->pubobjects = $5;
+ preprocess_pubobj_list(n->pubobjects, yyscanner);
+ n->action = DEFELEM_ADD;
$$ = (Node *)n;
}
- | ALTER PUBLICATION name SET TABLE publication_table_list
+ | ALTER PUBLICATION name SET pub_obj_list
{
AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
n->pubname = $3;
- n->tables = $6;
- n->tableAction = DEFELEM_SET;
+ n->pubobjects = $5;
+ preprocess_pubobj_list(n->pubobjects, yyscanner);
+ n->action = DEFELEM_SET;
$$ = (Node *)n;
}
- | ALTER PUBLICATION name DROP TABLE publication_table_list
+ | ALTER PUBLICATION name DROP pub_obj_list
{
AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
n->pubname = $3;
- n->tables = $6;
- n->tableAction = DEFELEM_DROP;
+ n->pubobjects = $5;
+ preprocess_pubobj_list(n->pubobjects, yyscanner);
+ n->action = DEFELEM_DROP;
$$ = (Node *)n;
}
;
@@ -12430,7 +12502,14 @@ relation_expr:
$$->inh = true;
$$->alias = NULL;
}
- | qualified_name '*'
+ | extended_relation_expr
+ {
+ $$ = $1;
+ }
+ ;
+
+extended_relation_expr:
+ qualified_name '*'
{
/* inheritance query, explicitly */
$$ = $1;
@@ -15104,28 +15183,7 @@ qualified_name:
}
| ColId indirection
{
- check_qualified_name($2, yyscanner);
- $$ = makeRangeVar(NULL, NULL, @1);
- switch (list_length($2))
- {
- case 1:
- $$->catalogname = NULL;
- $$->schemaname = $1;
- $$->relname = strVal(linitial($2));
- break;
- case 2:
- $$->catalogname = $1;
- $$->schemaname = strVal(linitial($2));
- $$->relname = strVal(lsecond($2));
- break;
- default:
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("improper qualified name (too many dotted names): %s",
- NameListToString(lcons(makeString($1), $2))),
- parser_errposition(@1)));
- break;
- }
+ $$ = makeRangeVarFromQualifiedName($1, $2, @1, yyscanner);
}
;
@@ -17102,6 +17160,43 @@ makeRangeVarFromAnyName(List *names, int position, core_yyscan_t yyscanner)
return r;
}
+/*
+ * Convert a relation_name with name and namelist to a RangeVar using
+ * makeRangeVar.
+ */
+static RangeVar *
+makeRangeVarFromQualifiedName(char *name, List *namelist, int location,
+ core_yyscan_t yyscanner)
+{
+ RangeVar *r;
+
+ check_qualified_name(namelist, yyscanner);
+ r = makeRangeVar(NULL, NULL, location);
+
+ switch (list_length(namelist))
+ {
+ case 1:
+ r->catalogname = NULL;
+ r->schemaname = name;
+ r->relname = strVal(linitial(namelist));
+ break;
+ case 2:
+ r->catalogname = name;
+ r->schemaname = strVal(linitial(namelist));
+ r->relname = strVal(lsecond(namelist));
+ break;
+ default:
+ ereport(ERROR,
+ errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("improper qualified name (too many dotted names): %s",
+ NameListToString(lcons(makeString(name), namelist))),
+ parser_errposition(location));
+ break;
+ }
+
+ return r;
+}
+
/* Separate Constraint nodes from COLLATE clauses in a ColQualList */
static void
SplitColQualList(List *qualList,
@@ -17210,6 +17305,74 @@ processCASbits(int cas_bits, int location, const char *constrType,
}
}
+/*
+ * Process pubobjspec_list to check for errors in any of the objects and
+ * convert PUBLICATIONOBJ_CONTINUATION into appropriate PublicationObjSpecType.
+ */
+static void
+preprocess_pubobj_list(List *pubobjspec_list, core_yyscan_t yyscanner)
+{
+ ListCell *cell;
+ PublicationObjSpec *pubobj;
+ PublicationObjSpecType prevobjtype = PUBLICATIONOBJ_CONTINUATION;
+
+ if (!pubobjspec_list)
+ return;
+
+ pubobj = (PublicationObjSpec *) linitial(pubobjspec_list);
+ if (pubobj->pubobjtype == PUBLICATIONOBJ_CONTINUATION)
+ ereport(ERROR,
+ errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("FOR TABLE/FOR ALL TABLES IN SCHEMA should be specified before the table/schema name(s)"),
+ parser_errposition(pubobj->location));
+
+ foreach(cell, pubobjspec_list)
+ {
+ pubobj = (PublicationObjSpec *) lfirst(cell);
+
+ if (pubobj->pubobjtype == PUBLICATIONOBJ_CONTINUATION)
+ pubobj->pubobjtype = prevobjtype;
+
+ if (pubobj->pubobjtype == PUBLICATIONOBJ_TABLE)
+ {
+ /* relation name or pubtable must be set for this type of object */
+ if (!pubobj->name && !pubobj->pubtable)
+ ereport(ERROR,
+ errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("invalid table name at or near"),
+ parser_errposition(pubobj->location));
+ else if (pubobj->name)
+ {
+ /* convert it to PublicationTable */
+ PublicationTable *pubtable = makeNode(PublicationTable);
+ pubtable->relation = makeRangeVar(NULL, pubobj->name,
+ pubobj->location);
+ pubobj->pubtable = pubtable;
+ pubobj->name = NULL;
+ }
+ }
+ else if (pubobj->pubobjtype == PUBLICATIONOBJ_REL_IN_SCHEMA ||
+ pubobj->pubobjtype == PUBLICATIONOBJ_CURRSCHEMA)
+ {
+ /*
+ * We can distinguish between the different type of schema
+ * objects based on whether name and pubtable is set.
+ */
+ if (pubobj->name)
+ pubobj->pubobjtype = PUBLICATIONOBJ_REL_IN_SCHEMA;
+ else if (!pubobj->name && !pubobj->pubtable)
+ pubobj->pubobjtype = PUBLICATIONOBJ_CURRSCHEMA;
+ else
+ ereport(ERROR,
+ errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("invalid schema name at or near"),
+ parser_errposition(pubobj->location));
+ }
+
+ prevobjtype = pubobj->pubobjtype;
+ }
+}
+
/*----------
* Recursive view transformation
*
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 14d737fd933..6f6a203dea7 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -1068,6 +1068,9 @@ init_rel_sync_cache(MemoryContext cachectx)
CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
rel_sync_cache_publication_cb,
(Datum) 0);
+ CacheRegisterSyscacheCallback(PUBLICATIONNAMESPACEMAP,
+ rel_sync_cache_publication_cb,
+ (Datum) 0);
}
/*
@@ -1146,7 +1149,15 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
/* Validate the entry */
if (!entry->replicate_valid)
{
+ Oid schemaId = get_rel_namespace(relid);
List *pubids = GetRelationPublications(relid);
+
+ /*
+ * We don't acquire a lock on the namespace system table as we build
+ * the cache entry using a historic snapshot and all the later changes
+ * are absorbed while decoding WAL.
+ */
+ List *schemaPubids = GetSchemaPublications(schemaId);
ListCell *lc;
Oid publish_as_relid = relid;
@@ -1203,6 +1214,8 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
Oid ancestor = lfirst_oid(lc2);
if (list_member_oid(GetRelationPublications(ancestor),
+ pub->oid) ||
+ list_member_oid(GetSchemaPublications(get_rel_namespace(ancestor)),
pub->oid))
{
ancestor_published = true;
@@ -1212,7 +1225,9 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
}
}
- if (list_member_oid(pubids, pub->oid) || ancestor_published)
+ if (list_member_oid(pubids, pub->oid) ||
+ list_member_oid(schemaPubids, pub->oid) ||
+ ancestor_published)
publish = true;
}
@@ -1343,7 +1358,7 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
}
/*
- * Publication relation map syscache invalidation callback
+ * Publication relation/schema map syscache invalidation callback
*/
static void
rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index b54c9117669..9fa9e671a11 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -5550,6 +5550,7 @@ GetRelationPublicationActions(Relation relation)
List *puboids;
ListCell *lc;
MemoryContext oldcxt;
+ Oid schemaid;
PublicationActions *pubactions = palloc0(sizeof(PublicationActions));
/*
@@ -5565,6 +5566,9 @@ GetRelationPublicationActions(Relation relation)
/* Fetch the publication membership info. */
puboids = GetRelationPublications(RelationGetRelid(relation));
+ schemaid = RelationGetNamespace(relation);
+ puboids = list_concat_unique_oid(puboids, GetSchemaPublications(schemaid));
+
if (relation->rd_rel->relispartition)
{
/* Add publications that the ancestors are in too. */
@@ -5577,6 +5581,9 @@ GetRelationPublicationActions(Relation relation)
puboids = list_concat_unique_oid(puboids,
GetRelationPublications(ancestor));
+ schemaid = get_rel_namespace(ancestor);
+ puboids = list_concat_unique_oid(puboids,
+ GetSchemaPublications(schemaid));
}
}
puboids = list_concat_unique_oid(puboids, GetAllTablesPublications());
diff --git a/src/backend/utils/cache/syscache.c b/src/backend/utils/cache/syscache.c
index d6cb78dea8d..56870b46e45 100644
--- a/src/backend/utils/cache/syscache.c
+++ b/src/backend/utils/cache/syscache.c
@@ -50,6 +50,7 @@
#include "catalog/pg_partitioned_table.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_publication.h"
+#include "catalog/pg_publication_namespace.h"
#include "catalog/pg_publication_rel.h"
#include "catalog/pg_range.h"
#include "catalog/pg_replication_origin.h"
@@ -617,6 +618,28 @@ static const struct cachedesc cacheinfo[] = {
},
8
},
+ {PublicationNamespaceRelationId, /* PUBLICATIONNAMESPACE */
+ PublicationNamespaceObjectIndexId,
+ 1,
+ {
+ Anum_pg_publication_namespace_oid,
+ 0,
+ 0,
+ 0
+ },
+ 64
+ },
+ {PublicationNamespaceRelationId, /* PUBLICATIONNAMESPACEMAP */
+ PublicationNamespacePnnspidPnpubidIndexId,
+ 2,
+ {
+ Anum_pg_publication_namespace_pnnspid,
+ Anum_pg_publication_namespace_pnpubid,
+ 0,
+ 0
+ },
+ 64
+ },
{PublicationRelationId, /* PUBLICATIONOID */
PublicationObjectIndexId,
1,