diff options
Diffstat (limited to 'src/backend')
-rw-r--r-- | src/backend/catalog/Makefile | 4 | ||||
-rw-r--r-- | src/backend/catalog/aclchk.c | 2 | ||||
-rw-r--r-- | src/backend/catalog/dependency.c | 9 | ||||
-rw-r--r-- | src/backend/catalog/objectaddress.c | 149 | ||||
-rw-r--r-- | src/backend/catalog/pg_publication.c | 332 | ||||
-rw-r--r-- | src/backend/commands/alter.c | 1 | ||||
-rw-r--r-- | src/backend/commands/event_trigger.c | 4 | ||||
-rw-r--r-- | src/backend/commands/publicationcmds.c | 490 | ||||
-rw-r--r-- | src/backend/commands/seclabel.c | 1 | ||||
-rw-r--r-- | src/backend/commands/tablecmds.c | 28 | ||||
-rw-r--r-- | src/backend/nodes/copyfuncs.c | 22 | ||||
-rw-r--r-- | src/backend/nodes/equalfuncs.c | 21 | ||||
-rw-r--r-- | src/backend/parser/gram.y | 307 | ||||
-rw-r--r-- | src/backend/replication/pgoutput/pgoutput.c | 19 | ||||
-rw-r--r-- | src/backend/utils/cache/relcache.c | 7 | ||||
-rw-r--r-- | src/backend/utils/cache/syscache.c | 23 |
16 files changed, 1305 insertions, 114 deletions
diff --git a/src/backend/catalog/Makefile b/src/backend/catalog/Makefile index d297e773612..4e6efda97f3 100644 --- a/src/backend/catalog/Makefile +++ b/src/backend/catalog/Makefile @@ -68,8 +68,8 @@ CATALOG_HEADERS := \ pg_foreign_table.h pg_policy.h pg_replication_origin.h \ pg_default_acl.h pg_init_privs.h pg_seclabel.h pg_shseclabel.h \ pg_collation.h pg_partitioned_table.h pg_range.h pg_transform.h \ - pg_sequence.h pg_publication.h pg_publication_rel.h pg_subscription.h \ - pg_subscription_rel.h + pg_sequence.h pg_publication.h pg_publication_namespace.h \ + pg_publication_rel.h pg_subscription.h pg_subscription_rel.h GENERATED_HEADERS := $(CATALOG_HEADERS:%.h=%_d.h) schemapg.h system_fk_info.h diff --git a/src/backend/catalog/aclchk.c b/src/backend/catalog/aclchk.c index 89792b154ee..ce0a4ff14e3 100644 --- a/src/backend/catalog/aclchk.c +++ b/src/backend/catalog/aclchk.c @@ -3427,6 +3427,7 @@ aclcheck_error(AclResult aclerr, ObjectType objtype, case OBJECT_DEFAULT: case OBJECT_DEFACL: case OBJECT_DOMCONSTRAINT: + case OBJECT_PUBLICATION_NAMESPACE: case OBJECT_PUBLICATION_REL: case OBJECT_ROLE: case OBJECT_RULE: @@ -3566,6 +3567,7 @@ aclcheck_error(AclResult aclerr, ObjectType objtype, case OBJECT_DEFAULT: case OBJECT_DEFACL: case OBJECT_DOMCONSTRAINT: + case OBJECT_PUBLICATION_NAMESPACE: case OBJECT_PUBLICATION_REL: case OBJECT_ROLE: case OBJECT_TRANSFORM: diff --git a/src/backend/catalog/dependency.c b/src/backend/catalog/dependency.c index 91c3e976e01..9f8eb1a37fd 100644 --- a/src/backend/catalog/dependency.c +++ b/src/backend/catalog/dependency.c @@ -49,6 +49,7 @@ #include "catalog/pg_policy.h" #include "catalog/pg_proc.h" #include "catalog/pg_publication.h" +#include "catalog/pg_publication_namespace.h" #include "catalog/pg_publication_rel.h" #include "catalog/pg_rewrite.h" #include "catalog/pg_statistic_ext.h" @@ -178,6 +179,7 @@ static const Oid object_classes[] = { ExtensionRelationId, /* OCLASS_EXTENSION */ EventTriggerRelationId, /* OCLASS_EVENT_TRIGGER */ PolicyRelationId, /* OCLASS_POLICY */ + PublicationNamespaceRelationId, /* OCLASS_PUBLICATION_NAMESPACE */ PublicationRelationId, /* OCLASS_PUBLICATION */ PublicationRelRelationId, /* OCLASS_PUBLICATION_REL */ SubscriptionRelationId, /* OCLASS_SUBSCRIPTION */ @@ -1456,6 +1458,10 @@ doDeletion(const ObjectAddress *object, int flags) RemovePolicyById(object->objectId); break; + case OCLASS_PUBLICATION_NAMESPACE: + RemovePublicationSchemaById(object->objectId); + break; + case OCLASS_PUBLICATION_REL: RemovePublicationRelById(object->objectId); break; @@ -2850,6 +2856,9 @@ getObjectClass(const ObjectAddress *object) case PolicyRelationId: return OCLASS_POLICY; + case PublicationNamespaceRelationId: + return OCLASS_PUBLICATION_NAMESPACE; + case PublicationRelationId: return OCLASS_PUBLICATION; diff --git a/src/backend/catalog/objectaddress.c b/src/backend/catalog/objectaddress.c index 8c94939baa8..2bae3fbb174 100644 --- a/src/backend/catalog/objectaddress.c +++ b/src/backend/catalog/objectaddress.c @@ -48,6 +48,7 @@ #include "catalog/pg_policy.h" #include "catalog/pg_proc.h" #include "catalog/pg_publication.h" +#include "catalog/pg_publication_namespace.h" #include "catalog/pg_publication_rel.h" #include "catalog/pg_rewrite.h" #include "catalog/pg_statistic_ext.h" @@ -825,6 +826,10 @@ static const struct object_type_map { "publication", OBJECT_PUBLICATION }, + /* OCLASS_PUBLICATION_NAMESPACE */ + { + "publication namespace", OBJECT_PUBLICATION_NAMESPACE + }, /* OCLASS_PUBLICATION_REL */ { "publication relation", OBJECT_PUBLICATION_REL @@ -875,6 +880,8 @@ static ObjectAddress get_object_address_usermapping(List *object, static ObjectAddress get_object_address_publication_rel(List *object, Relation *relp, bool missing_ok); +static ObjectAddress get_object_address_publication_schema(List *object, + bool missing_ok); static ObjectAddress get_object_address_defacl(List *object, bool missing_ok); static const ObjectPropertyType *get_object_property_data(Oid class_id); @@ -1113,6 +1120,10 @@ get_object_address(ObjectType objtype, Node *object, address = get_object_address_usermapping(castNode(List, object), missing_ok); break; + case OBJECT_PUBLICATION_NAMESPACE: + address = get_object_address_publication_schema(castNode(List, object), + missing_ok); + break; case OBJECT_PUBLICATION_REL: address = get_object_address_publication_rel(castNode(List, object), &relation, @@ -1936,6 +1947,49 @@ get_object_address_publication_rel(List *object, } /* + * Find the ObjectAddress for a publication schema. The first element of the + * object parameter is the schema name, the second is the publication name. + */ +static ObjectAddress +get_object_address_publication_schema(List *object, bool missing_ok) +{ + ObjectAddress address; + Publication *pub; + char *pubname; + char *schemaname; + Oid schemaid; + + ObjectAddressSet(address, PublicationNamespaceRelationId, InvalidOid); + + /* Fetch schema name and publication name from input list */ + schemaname = strVal(linitial(object)); + pubname = strVal(lsecond(object)); + + schemaid = get_namespace_oid(schemaname, missing_ok); + if (!OidIsValid(schemaid)) + return address; + + /* Now look up the pg_publication tuple */ + pub = GetPublicationByName(pubname, missing_ok); + if (!pub) + return address; + + /* Find the publication schema mapping in syscache */ + address.objectId = + GetSysCacheOid2(PUBLICATIONNAMESPACEMAP, + Anum_pg_publication_namespace_oid, + ObjectIdGetDatum(schemaid), + ObjectIdGetDatum(pub->oid)); + if (!OidIsValid(address.objectId) && !missing_ok) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("publication schema \"%s\" in publication \"%s\" does not exist", + schemaname, pubname))); + + return address; +} + +/* * Find the ObjectAddress for a default ACL. */ static ObjectAddress @@ -2206,6 +2260,7 @@ pg_get_object_address(PG_FUNCTION_ARGS) case OBJECT_DOMCONSTRAINT: case OBJECT_CAST: case OBJECT_USER_MAPPING: + case OBJECT_PUBLICATION_NAMESPACE: case OBJECT_PUBLICATION_REL: case OBJECT_DEFACL: case OBJECT_TRANSFORM: @@ -2299,6 +2354,7 @@ pg_get_object_address(PG_FUNCTION_ARGS) case OBJECT_PUBLICATION_REL: objnode = (Node *) list_make2(name, linitial(args)); break; + case OBJECT_PUBLICATION_NAMESPACE: case OBJECT_USER_MAPPING: objnode = (Node *) list_make2(linitial(name), linitial(args)); break; @@ -2849,6 +2905,55 @@ get_catalog_object_by_oid(Relation catalog, AttrNumber oidcol, Oid objectId) } /* + * getPublicationSchemaInfo + * + * Get publication name and schema name from the object address into pubname and + * nspname. Both pubname and nspname are palloc'd strings which will be freed by + * the caller. + */ +static bool +getPublicationSchemaInfo(const ObjectAddress *object, bool missing_ok, + char **pubname, char **nspname) +{ + HeapTuple tup; + Form_pg_publication_namespace pnform; + + tup = SearchSysCache1(PUBLICATIONNAMESPACE, + ObjectIdGetDatum(object->objectId)); + if (!HeapTupleIsValid(tup)) + { + if (!missing_ok) + elog(ERROR, "cache lookup failed for publication schema %u", + object->objectId); + return false; + } + + pnform = (Form_pg_publication_namespace) GETSTRUCT(tup); + *pubname = get_publication_name(pnform->pnpubid, missing_ok); + if (!(*pubname)) + { + ReleaseSysCache(tup); + return false; + } + + *nspname = get_namespace_name(pnform->pnnspid); + if (!(*nspname)) + { + Oid schemaid = pnform->pnnspid; + + pfree(*pubname); + ReleaseSysCache(tup); + if (!missing_ok) + elog(ERROR, "cache lookup failed for schema %u", + schemaid); + return false; + } + + ReleaseSysCache(tup); + return true; +} + +/* * getObjectDescription: build an object description for messages * * The result is a palloc'd string. NULL is returned for an undefined @@ -3872,6 +3977,22 @@ getObjectDescription(const ObjectAddress *object, bool missing_ok) break; } + case OCLASS_PUBLICATION_NAMESPACE: + { + char *pubname; + char *nspname; + + if (!getPublicationSchemaInfo(object, missing_ok, + &pubname, &nspname)) + break; + + appendStringInfo(&buffer, _("publication of schema %s in publication %s"), + nspname, pubname); + pfree(pubname); + pfree(nspname); + break; + } + case OCLASS_PUBLICATION_REL: { HeapTuple tup; @@ -4473,6 +4594,10 @@ getObjectTypeDescription(const ObjectAddress *object, bool missing_ok) appendStringInfoString(&buffer, "publication"); break; + case OCLASS_PUBLICATION_NAMESPACE: + appendStringInfoString(&buffer, "publication namespace"); + break; + case OCLASS_PUBLICATION_REL: appendStringInfoString(&buffer, "publication relation"); break; @@ -5683,6 +5808,30 @@ getObjectIdentityParts(const ObjectAddress *object, break; } + case OCLASS_PUBLICATION_NAMESPACE: + { + char *pubname; + char *nspname; + + if (!getPublicationSchemaInfo(object, missing_ok, &pubname, + &nspname)) + break; + appendStringInfo(&buffer, "%s in publication %s", + nspname, pubname); + + if (objargs) + *objargs = list_make1(pubname); + else + pfree(pubname); + + if (objname) + *objname = list_make1(nspname); + else + pfree(nspname); + + break; + } + case OCLASS_PUBLICATION_REL: { HeapTuple tup; diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index 9cd0c82f93c..fed83b89a98 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -28,7 +28,9 @@ #include "catalog/objectaccess.h" #include "catalog/objectaddress.h" #include "catalog/pg_inherits.h" +#include "catalog/pg_namespace.h" #include "catalog/pg_publication.h" +#include "catalog/pg_publication_namespace.h" #include "catalog/pg_publication_rel.h" #include "catalog/pg_type.h" #include "commands/publicationcmds.h" @@ -77,6 +79,30 @@ check_publication_add_relation(Relation targetrel) } /* + * Check if schema can be in given publication and throw appropriate error if + * not. + */ +static void +check_publication_add_schema(Oid schemaid) +{ + /* Can't be system namespace */ + if (IsCatalogNamespace(schemaid) || IsToastNamespace(schemaid)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot add schema \"%s\" to publication", + get_namespace_name(schemaid)), + errdetail("This operation is not supported for system schemas."))); + + /* Can't be temporary namespace */ + if (isAnyTempNamespace(schemaid)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot add schema \"%s\" to publication", + get_namespace_name(schemaid)), + errdetail("Temporary schemas cannot be replicated."))); +} + +/* * Returns if relation represented by oid and Form_pg_class entry * is publishable. * @@ -106,6 +132,53 @@ is_publishable_class(Oid relid, Form_pg_class reltuple) } /* + * Filter out the partitions whose parent tables were also specified in + * the publication. + */ +static List * +filter_partitions(List *relids, List *schemarelids) +{ + List *result = NIL; + ListCell *lc; + ListCell *lc2; + + foreach(lc, relids) + { + bool skip = false; + List *ancestors = NIL; + Oid relid = lfirst_oid(lc); + + if (get_rel_relispartition(relid)) + ancestors = get_partition_ancestors(relid); + + foreach(lc2, ancestors) + { + Oid ancestor = lfirst_oid(lc2); + + /* + * Check if the parent table exists in the published table list. + * + * XXX As of now, we do this if the partition relation or the + * partition relation's ancestor is present in schema publication + * relations. + */ + if (list_member_oid(relids, ancestor) && + (list_member_oid(schemarelids, relid) || + list_member_oid(schemarelids, ancestor))) + { + skip = true; + break; + } + } + + if (!skip) + result = lappend_oid(result, relid); + } + + return result; +} + +/* * Another variant of this, taking a Relation. */ bool @@ -262,6 +335,89 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel, return myself; } +/* + * Insert new publication / schema mapping. + */ +ObjectAddress +publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists) +{ + Relation rel; + HeapTuple tup; + Datum values[Natts_pg_publication_namespace]; + bool nulls[Natts_pg_publication_namespace]; + Oid psschid; + Publication *pub = GetPublication(pubid); + List *schemaRels = NIL; + ObjectAddress myself, + referenced; + + rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock); + + /* + * Check for duplicates. Note that this does not really prevent + * duplicates, it's here just to provide nicer error message in common + * case. The real protection is the unique key on the catalog. + */ + if (SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP, + ObjectIdGetDatum(schemaid), + ObjectIdGetDatum(pubid))) + { + table_close(rel, RowExclusiveLock); + + if (if_not_exists) + return InvalidObjectAddress; + + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("schema \"%s\" is already member of publication \"%s\"", + get_namespace_name(schemaid), pub->name))); + } + + check_publication_add_schema(schemaid); + + /* Form a tuple */ + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + + psschid = GetNewOidWithIndex(rel, PublicationNamespaceObjectIndexId, + Anum_pg_publication_namespace_oid); + values[Anum_pg_publication_namespace_oid - 1] = ObjectIdGetDatum(psschid); + values[Anum_pg_publication_namespace_pnpubid - 1] = + ObjectIdGetDatum(pubid); + values[Anum_pg_publication_namespace_pnnspid - 1] = + ObjectIdGetDatum(schemaid); + + tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); + + /* Insert tuple into catalog */ + CatalogTupleInsert(rel, tup); + heap_freetuple(tup); + + ObjectAddressSet(myself, PublicationNamespaceRelationId, psschid); + + /* Add dependency on the publication */ + ObjectAddressSet(referenced, PublicationRelationId, pubid); + recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO); + + /* Add dependency on the schema */ + ObjectAddressSet(referenced, NamespaceRelationId, schemaid); + recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO); + + /* Close the table */ + table_close(rel, RowExclusiveLock); + + /* + * Invalidate relcache so that publication info is rebuilt. See + * publication_add_relation for why we need to consider all the + * partitions. + */ + schemaRels = GetSchemaPublicationRelations(schemaid, + PUBLICATION_PART_ALL); + InvalidatePublicationRels(schemaRels); + + return myself; +} + /* Gets list of publication oids for a relation */ List * GetRelationPublications(Oid relid) @@ -429,6 +585,151 @@ GetAllTablesPublicationRelations(bool pubviaroot) } /* + * Gets the list of schema oids for a publication. + * + * This should only be used FOR ALL TABLES IN SCHEMA publications. + */ +List * +GetPublicationSchemas(Oid pubid) +{ + List *result = NIL; + Relation pubschsrel; + ScanKeyData scankey; + SysScanDesc scan; + HeapTuple tup; + + /* Find all schemas associated with the publication */ + pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock); + + ScanKeyInit(&scankey, + Anum_pg_publication_namespace_pnpubid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(pubid)); + + scan = systable_beginscan(pubschsrel, + PublicationNamespacePnnspidPnpubidIndexId, + true, NULL, 1, &scankey); + while (HeapTupleIsValid(tup = systable_getnext(scan))) + { + Form_pg_publication_namespace pubsch; + + pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup); + + result = lappend_oid(result, pubsch->pnnspid); + } + + systable_endscan(scan); + table_close(pubschsrel, AccessShareLock); + + return result; +} + +/* + * Gets the list of publication oids associated with a specified schema. + */ +List * +GetSchemaPublications(Oid schemaid) +{ + List *result = NIL; + CatCList *pubschlist; + int i; + + /* Find all publications associated with the schema */ + pubschlist = SearchSysCacheList1(PUBLICATIONNAMESPACEMAP, + ObjectIdGetDatum(schemaid)); + for (i = 0; i < pubschlist->n_members; i++) + { + HeapTuple tup = &pubschlist->members[i]->tuple; + Oid pubid = ((Form_pg_publication_namespace) GETSTRUCT(tup))->pnpubid; + + result = lappend_oid(result, pubid); + } + + ReleaseSysCacheList(pubschlist); + + return result; +} + +/* + * Get the list of publishable relation oids for a specified schema. + */ +List * +GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt) +{ + Relation classRel; + ScanKeyData key[1]; + TableScanDesc scan; + HeapTuple tuple; + List *result = NIL; + + Assert(OidIsValid(schemaid)); + + classRel = table_open(RelationRelationId, AccessShareLock); + + ScanKeyInit(&key[0], + Anum_pg_class_relnamespace, + BTEqualStrategyNumber, F_OIDEQ, + schemaid); + + /* get all the relations present in the specified schema */ + scan = table_beginscan_catalog(classRel, 1, key); + while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) + { + Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple); + Oid relid = relForm->oid; + char relkind; + + if (!is_publishable_class(relid, relForm)) + continue; + + relkind = get_rel_relkind(relid); + if (relkind == RELKIND_RELATION) + result = lappend_oid(result, relid); + else if (relkind == RELKIND_PARTITIONED_TABLE) + { + List *partitionrels = NIL; + + /* + * It is quite possible that some of the partitions are in a + * different schema than the parent table, so we need to get such + * partitions separately. + */ + partitionrels = GetPubPartitionOptionRelations(partitionrels, + pub_partopt, + relForm->oid); + result = list_concat_unique_oid(result, partitionrels); + } + } + + table_endscan(scan); + table_close(classRel, AccessShareLock); + return result; +} + +/* + * Gets the list of all relations published by FOR ALL TABLES IN SCHEMA + * publication. + */ +List * +GetAllSchemaPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt) +{ + List *result = NIL; + List *pubschemalist = GetPublicationSchemas(pubid); + ListCell *cell; + + foreach(cell, pubschemalist) + { + Oid schemaid = lfirst_oid(cell); + List *schemaRels = NIL; + + schemaRels = GetSchemaPublicationRelations(schemaid, pub_partopt); + result = list_concat(result, schemaRels); + } + + return result; +} + +/* * Get publication using oid * * The Publication struct and its data are palloc'ed here. @@ -555,12 +856,41 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) * need those. */ if (publication->alltables) + { tables = GetAllTablesPublicationRelations(publication->pubviaroot); + } else - tables = GetPublicationRelations(publication->oid, + { + List *relids, + *schemarelids; + + relids = GetPublicationRelations(publication->oid, publication->pubviaroot ? PUBLICATION_PART_ROOT : PUBLICATION_PART_LEAF); + schemarelids = GetAllSchemaPublicationRelations(publication->oid, + publication->pubviaroot ? + PUBLICATION_PART_ROOT : + PUBLICATION_PART_LEAF); + tables = list_concat_unique_oid(relids, schemarelids); + if (schemarelids && publication->pubviaroot) + { + /* + * If the publication publishes partition changes via their + * respective root partitioned tables, we must exclude + * partitions in favor of including the root partitioned + * tables. Otherwise, the function could return both the child + * and parent tables which could cause data of the child table + * to be double-published on the subscriber side. + * + * XXX As of now, we do this when a publication has associated + * schema or for all tables publication. See + * GetAllTablesPublicationRelations(). + */ + tables = filter_partitions(tables, schemarelids); + } + } + funcctx->user_fctx = (void *) tables; MemoryContextSwitchTo(oldcontext); diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c index c47d54e96bb..40044070cf3 100644 --- a/src/backend/commands/alter.c +++ b/src/backend/commands/alter.c @@ -660,6 +660,7 @@ AlterObjectNamespace_oid(Oid classId, Oid objid, Oid nspOid, case OCLASS_EVENT_TRIGGER: case OCLASS_POLICY: case OCLASS_PUBLICATION: + case OCLASS_PUBLICATION_NAMESPACE: case OCLASS_PUBLICATION_REL: case OCLASS_SUBSCRIPTION: case OCLASS_TRANSFORM: diff --git a/src/backend/commands/event_trigger.c b/src/backend/commands/event_trigger.c index 71612d577e9..df264329d8b 100644 --- a/src/backend/commands/event_trigger.c +++ b/src/backend/commands/event_trigger.c @@ -973,6 +973,7 @@ EventTriggerSupportsObjectType(ObjectType obtype) case OBJECT_POLICY: case OBJECT_PROCEDURE: case OBJECT_PUBLICATION: + case OBJECT_PUBLICATION_NAMESPACE: case OBJECT_PUBLICATION_REL: case OBJECT_ROUTINE: case OBJECT_RULE: @@ -1050,6 +1051,7 @@ EventTriggerSupportsObjectClass(ObjectClass objclass) case OCLASS_EXTENSION: case OCLASS_POLICY: case OCLASS_PUBLICATION: + case OCLASS_PUBLICATION_NAMESPACE: case OCLASS_PUBLICATION_REL: case OCLASS_SUBSCRIPTION: case OCLASS_TRANSFORM: @@ -2126,6 +2128,7 @@ stringify_grant_objtype(ObjectType objtype) case OBJECT_OPFAMILY: case OBJECT_POLICY: case OBJECT_PUBLICATION: + case OBJECT_PUBLICATION_NAMESPACE: case OBJECT_PUBLICATION_REL: case OBJECT_ROLE: case OBJECT_RULE: @@ -2208,6 +2211,7 @@ stringify_adefprivs_objtype(ObjectType objtype) case OBJECT_OPFAMILY: case OBJECT_POLICY: case OBJECT_PUBLICATION: + case OBJECT_PUBLICATION_NAMESPACE: case OBJECT_PUBLICATION_REL: case OBJECT_ROLE: case OBJECT_RULE: diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 9c7f91611dc..d1fff13d2e9 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -25,7 +25,9 @@ #include "catalog/objectaddress.h" #include "catalog/partition.h" #include "catalog/pg_inherits.h" +#include "catalog/pg_namespace.h" #include "catalog/pg_publication.h" +#include "catalog/pg_publication_namespace.h" #include "catalog/pg_publication_rel.h" #include "catalog/pg_type.h" #include "commands/dbcommands.h" @@ -34,6 +36,7 @@ #include "commands/publicationcmds.h" #include "funcapi.h" #include "miscadmin.h" +#include "storage/lmgr.h" #include "utils/acl.h" #include "utils/array.h" #include "utils/builtins.h" @@ -45,11 +48,16 @@ #include "utils/syscache.h" #include "utils/varlena.h" +static List *OpenReliIdList(List *relids); static List *OpenTableList(List *tables); static void CloseTableList(List *rels); +static void LockSchemaList(List *schemalist); static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, AlterPublicationStmt *stmt); static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok); +static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists, + AlterPublicationStmt *stmt); +static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok); static void parse_publication_options(ParseState *pstate, @@ -136,6 +144,97 @@ parse_publication_options(ParseState *pstate, } /* + * Convert the PublicationObjSpecType list into schema oid list and + * PublicationTable list. + */ +static void +ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate, + List **rels, List **schemas) +{ + ListCell *cell; + PublicationObjSpec *pubobj; + + if (!pubobjspec_list) + return; + + foreach(cell, pubobjspec_list) + { + Oid schemaid; + List *search_path; + + pubobj = (PublicationObjSpec *) lfirst(cell); + + switch (pubobj->pubobjtype) + { + case PUBLICATIONOBJ_TABLE: + *rels = lappend(*rels, pubobj->pubtable); + break; + case PUBLICATIONOBJ_REL_IN_SCHEMA: + schemaid = get_namespace_oid(pubobj->name, false); + + /* Filter out duplicates if user specifies "sch1, sch1" */ + *schemas = list_append_unique_oid(*schemas, schemaid); + break; + case PUBLICATIONOBJ_CURRSCHEMA: + search_path = fetch_search_path(false); + if (search_path == NIL) /* nothing valid in search_path? */ + ereport(ERROR, + errcode(ERRCODE_UNDEFINED_SCHEMA), + errmsg("no schema has been selected for CURRENT_SCHEMA")); + + schemaid = linitial_oid(search_path); + list_free(search_path); + + /* Filter out duplicates if user specifies "sch1, sch1" */ + *schemas = list_append_unique_oid(*schemas, schemaid); + break; + default: + /* shouldn't happen */ + elog(ERROR, "invalid publication object type %d", pubobj->pubobjtype); + break; + } + } +} + +/* + * Check if any of the given relation's schema is a member of the given schema + * list. + */ +static void +CheckObjSchemaNotAlreadyInPublication(List *rels, List *schemaidlist, + PublicationObjSpecType checkobjtype) +{ + ListCell *lc; + + foreach(lc, rels) + { + PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc); + Relation rel = pub_rel->relation; + Oid relSchemaId = RelationGetNamespace(rel); + + if (list_member_oid(schemaidlist, relSchemaId)) + { + if (checkobjtype == PUBLICATIONOBJ_REL_IN_SCHEMA) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot add schema \"%s\" to publication", + get_namespace_name(relSchemaId)), + errdetail("Table \"%s\" in schema \"%s\" is already part of the publication, adding the same schema is not supported.", + RelationGetRelationName(rel), + get_namespace_name(relSchemaId))); + else if (checkobjtype == PUBLICATIONOBJ_TABLE) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot add relation \"%s.%s\" to publication", + get_namespace_name(relSchemaId), + RelationGetRelationName(rel)), + errdetail("Table's schema \"%s\" is already part of the publication or part of the specified schema list.", + get_namespace_name(relSchemaId))); + } + } +} + +/* * Create new publication. */ ObjectAddress @@ -152,6 +251,8 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) bool publish_via_partition_root_given; bool publish_via_partition_root; AclResult aclresult; + List *relations = NIL; + List *schemaidlist = NIL; /* must have CREATE privilege on database */ aclresult = pg_database_aclcheck(MyDatabaseId, GetUserId(), ACL_CREATE); @@ -221,21 +322,44 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) /* Make the changes visible. */ CommandCounterIncrement(); - if (stmt->tables) - { - List *rels; - - Assert(list_length(stmt->tables) > 0); - - rels = OpenTableList(stmt->tables); - PublicationAddTables(puboid, rels, true, NULL); - CloseTableList(rels); - } - else if (stmt->for_all_tables) + /* Associate objects with the publication. */ + if (stmt->for_all_tables) { /* Invalidate relcache so that publication info is rebuilt. */ CacheInvalidateRelcacheAll(); } + else + { + ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations, + &schemaidlist); + + /* FOR ALL TABLES IN SCHEMA requires superuser */ + if (list_length(schemaidlist) > 0 && !superuser()) + ereport(ERROR, + errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("must be superuser to create FOR ALL TABLES IN SCHEMA publication")); + + if (list_length(relations) > 0) + { + List *rels; + + rels = OpenTableList(relations); + CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist, + PUBLICATIONOBJ_TABLE); + PublicationAddTables(puboid, rels, true, NULL); + CloseTableList(rels); + } + + if (list_length(schemaidlist) > 0) + { + /* + * Schema lock is held until the publication is created to prevent + * concurrent schema deletion. + */ + LockSchemaList(schemaidlist); + PublicationAddSchemas(puboid, schemaidlist, true, NULL); + } + } table_close(rel, RowExclusiveLock); @@ -318,13 +442,19 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, } else { + List *relids = NIL; + List *schemarelids = NIL; + /* * For any partitioned tables contained in the publication, we must * invalidate all partitions contained in the respective partition * trees, not just those explicitly mentioned in the publication. */ - List *relids = GetPublicationRelations(pubform->oid, - PUBLICATION_PART_ALL); + relids = GetPublicationRelations(pubform->oid, + PUBLICATION_PART_ALL); + schemarelids = GetAllSchemaPublicationRelations(pubform->oid, + PUBLICATION_PART_ALL); + relids = list_concat_unique_oid(relids, schemarelids); InvalidatePublicationRels(relids); } @@ -361,28 +491,36 @@ InvalidatePublicationRels(List *relids) * Add or remove table to/from publication. */ static void -AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel, - HeapTuple tup) +AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, + List *tables, List *schemaidlist) { List *rels = NIL; Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup); Oid pubid = pubform->oid; - /* Check that user is allowed to manipulate the publication tables. */ - if (pubform->puballtables) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("publication \"%s\" is defined as FOR ALL TABLES", - NameStr(pubform->pubname)), - errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications."))); + /* + * It is quite possible that for the SET case user has not specified any + * tables in which case we need to remove all the existing tables. + */ + if (!tables && stmt->action != DEFELEM_SET) + return; - Assert(list_length(stmt->tables) > 0); + rels = OpenTableList(tables); - rels = OpenTableList(stmt->tables); + if (stmt->action == DEFELEM_ADD) + { + List *schemas = NIL; - if (stmt->tableAction == DEFELEM_ADD) + /* + * Check if the relation is member of the existing schema in the + * publication or member of the schema list specified. + */ + schemas = list_concat_copy(schemaidlist, GetPublicationSchemas(pubid)); + CheckObjSchemaNotAlreadyInPublication(rels, schemas, + PUBLICATIONOBJ_TABLE); PublicationAddTables(pubid, rels, false, stmt); - else if (stmt->tableAction == DEFELEM_DROP) + } + else if (stmt->action == DEFELEM_DROP) PublicationDropTables(pubid, rels, false); else /* DEFELEM_SET */ { @@ -391,6 +529,9 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel, List *delrels = NIL; ListCell *oldlc; + CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist, + PUBLICATIONOBJ_TABLE); + /* Calculate which relations to drop. */ foreach(oldlc, oldrelids) { @@ -441,10 +582,110 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel, } /* + * Alter the publication schemas. + * + * Add or remove schemas to/from publication. + */ +static void +AlterPublicationSchemas(AlterPublicationStmt *stmt, + HeapTuple tup, List *schemaidlist) +{ + Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup); + + /* + * It is quite possible that for the SET case user has not specified any + * schemas in which case we need to remove all the existing schemas. + */ + if (!schemaidlist && stmt->action != DEFELEM_SET) + return; + + /* + * Schema lock is held until the publication is altered to prevent + * concurrent schema deletion. + */ + LockSchemaList(schemaidlist); + if (stmt->action == DEFELEM_ADD) + { + List *rels; + List *reloids; + + reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT); + rels = OpenReliIdList(reloids); + + CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist, + PUBLICATIONOBJ_REL_IN_SCHEMA); + + CloseTableList(rels); + PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt); + } + else if (stmt->action == DEFELEM_DROP) + PublicationDropSchemas(pubform->oid, schemaidlist, false); + else /* DEFELEM_SET */ + { + List *oldschemaids = GetPublicationSchemas(pubform->oid); + List *delschemas = NIL; + + /* Identify which schemas should be dropped */ + delschemas = list_difference_oid(oldschemaids, schemaidlist); + + /* + * Schema lock is held until the publication is altered to prevent + * concurrent schema deletion. + */ + LockSchemaList(delschemas); + + /* And drop them */ + PublicationDropSchemas(pubform->oid, delschemas, true); + + /* + * Don't bother calculating the difference for adding, we'll catch and + * skip existing ones when doing catalog update. + */ + PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt); + } +} + +/* + * Check if relations and schemas can be in a given publication and throw + * appropriate error if not. + */ +static void +CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup, + List *tables, List *schemaidlist) +{ + Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup); + + if ((stmt->action == DEFELEM_ADD || stmt->action == DEFELEM_SET) && + schemaidlist && !superuser()) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("must be superuser to add or set schemas"))); + + /* + * Check that user is allowed to manipulate the publication tables in + * schema + */ + if (schemaidlist && pubform->puballtables) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("publication \"%s\" is defined as FOR ALL TABLES", + NameStr(pubform->pubname)), + errdetail("Tables from schema cannot be added to, dropped from, or set on FOR ALL TABLES publications."))); + + /* Check that user is allowed to manipulate the publication tables. */ + if (tables && pubform->puballtables) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("publication \"%s\" is defined as FOR ALL TABLES", + NameStr(pubform->pubname)), + errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications."))); +} + +/* * Alter the existing publication. * - * This is dispatcher function for AlterPublicationOptions and - * AlterPublicationTables. + * This is dispatcher function for AlterPublicationOptions, + * AlterPublicationSchemas and AlterPublicationTables. */ void AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt) @@ -474,7 +715,41 @@ AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt) if (stmt->options) AlterPublicationOptions(pstate, stmt, rel, tup); else - AlterPublicationTables(stmt, rel, tup); + { + List *relations = NIL; + List *schemaidlist = NIL; + + ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations, + &schemaidlist); + + CheckAlterPublication(stmt, tup, relations, schemaidlist); + + /* + * Lock the publication so nobody else can do anything with it. This + * prevents concurrent alter to add table(s) that were already going + * to become part of the publication by adding corresponding schema(s) + * via this command and similarly it will prevent the concurrent + * addition of schema(s) for which there is any corresponding table + * being added by this command. + */ + LockDatabaseObject(PublicationRelationId, pubform->oid, 0, + AccessExclusiveLock); + + /* + * It is possible that by the time we acquire the lock on publication, + * concurrent DDL has removed it. We can test this by checking the + * existence of publication. + */ + if (!SearchSysCacheExists1(PUBLICATIONOID, + ObjectIdGetDatum(pubform->oid))) + ereport(ERROR, + errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("publication \"%s\" does not exist", + stmt->pubname)); + + AlterPublicationTables(stmt, tup, relations, schemaidlist); + AlterPublicationSchemas(stmt, tup, schemaidlist); + } /* Cleanup. */ heap_freetuple(tup); @@ -552,9 +827,71 @@ RemovePublicationById(Oid pubid) } /* + * Remove schema from publication by mapping OID. + */ +void +RemovePublicationSchemaById(Oid psoid) +{ + Relation rel; + HeapTuple tup; + List *schemaRels = NIL; + Form_pg_publication_namespace pubsch; + + rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock); + + tup = SearchSysCache1(PUBLICATIONNAMESPACE, ObjectIdGetDatum(psoid)); + + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for publication schema %u", psoid); + + pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup); + + /* + * Invalidate relcache so that publication info is rebuilt. See + * RemovePublicationRelById for why we need to consider all the + * partitions. + */ + schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid, + PUBLICATION_PART_ALL); + InvalidatePublicationRels(schemaRels); + + CatalogTupleDelete(rel, &tup->t_self); + + ReleaseSysCache(tup); + + table_close(rel, RowExclusiveLock); +} + +/* + * Open relations specified by a relid list. + * The returned tables are locked in ShareUpdateExclusiveLock mode in order to + * add them to a publication. + */ +static List * +OpenReliIdList(List *relids) +{ + ListCell *lc; + List *rels = NIL; + + foreach(lc, relids) + { + PublicationRelInfo *pub_rel; + Oid relid = lfirst_oid(lc); + Relation rel = table_open(relid, + ShareUpdateExclusiveLock); + + pub_rel = palloc(sizeof(PublicationRelInfo)); + pub_rel->relation = rel; + rels = lappend(rels, pub_rel); + } + + return rels; +} + +/* * Open relations specified by a PublicationTable list. - * In the returned list of PublicationRelInfo, tables are locked - * in ShareUpdateExclusiveLock mode in order to add them to a publication. + * The returned tables are locked in ShareUpdateExclusiveLock mode in order to + * add them to a publication. */ static List * OpenTableList(List *tables) @@ -659,6 +996,35 @@ CloseTableList(List *rels) } /* + * Lock the schemas specified in the schema list in AccessShareLock mode in + * order to prevent concurrent schema deletion. + */ +static void +LockSchemaList(List *schemalist) +{ + ListCell *lc; + + foreach(lc, schemalist) + { + Oid schemaid = lfirst_oid(lc); + + /* Allow query cancel in case this takes a long time */ + CHECK_FOR_INTERRUPTS(); + LockDatabaseObject(NamespaceRelationId, schemaid, 0, AccessShareLock); + + /* + * It is possible that by the time we acquire the lock on schema, + * concurrent DDL has removed it. We can test this by checking the + * existence of schema. + */ + if (!SearchSysCacheExists1(NAMESPACEOID, ObjectIdGetDatum(schemaid))) + ereport(ERROR, + errcode(ERRCODE_UNDEFINED_SCHEMA), + errmsg("schema with OID %u does not exist", schemaid)); + } +} + +/* * Add listed tables to the publication. */ static void @@ -728,6 +1094,68 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok) } /* + * Add listed schemas to the publication. + */ +static void +PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists, + AlterPublicationStmt *stmt) +{ + ListCell *lc; + + Assert(!stmt || !stmt->for_all_tables); + + foreach(lc, schemas) + { + Oid schemaid = lfirst_oid(lc); + ObjectAddress obj; + + obj = publication_add_schema(pubid, schemaid, if_not_exists); + if (stmt) + { + EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress, + (Node *) stmt); + + InvokeObjectPostCreateHook(PublicationNamespaceRelationId, + obj.objectId, 0); + } + } +} + +/* + * Remove listed schemas from the publication. + */ +static void +PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok) +{ + ObjectAddress obj; + ListCell *lc; + Oid psid; + + foreach(lc, schemas) + { + Oid schemaid = lfirst_oid(lc); + + psid = GetSysCacheOid2(PUBLICATIONNAMESPACEMAP, + Anum_pg_publication_namespace_oid, + ObjectIdGetDatum(schemaid), + ObjectIdGetDatum(pubid)); + if (!OidIsValid(psid)) + { + if (missing_ok) + continue; + + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("tables from schema \"%s\" are not part of the publication", + get_namespace_name(schemaid)))); + } + + ObjectAddressSet(obj, PublicationNamespaceRelationId, psid); + performDeletion(&obj, DROP_CASCADE, 0); + } +} + +/* * Internal workhorse for changing a publication owner */ static void diff --git a/src/backend/commands/seclabel.c b/src/backend/commands/seclabel.c index 308e0adb553..53c18628a7d 100644 --- a/src/backend/commands/seclabel.c +++ b/src/backend/commands/seclabel.c @@ -79,6 +79,7 @@ SecLabelSupportsObjectType(ObjectType objtype) case OBJECT_OPERATOR: case OBJECT_OPFAMILY: case OBJECT_POLICY: + case OBJECT_PUBLICATION_NAMESPACE: case OBJECT_PUBLICATION_REL: case OBJECT_RULE: case OBJECT_STATISTIC_EXT: diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 1a2f159f24e..857cc5ce6e2 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -12286,6 +12286,7 @@ ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel, case OCLASS_EXTENSION: case OCLASS_EVENT_TRIGGER: case OCLASS_PUBLICATION: + case OCLASS_PUBLICATION_NAMESPACE: case OCLASS_PUBLICATION_REL: case OCLASS_SUBSCRIPTION: case OCLASS_TRANSFORM: @@ -15994,6 +15995,33 @@ AlterTableNamespace(AlterObjectSchemaStmt *stmt, Oid *oldschema) newrv = makeRangeVar(stmt->newschema, RelationGetRelationName(rel), -1); nspOid = RangeVarGetAndCheckCreationNamespace(newrv, NoLock, NULL); + /* + * Check that setting the relation to a different schema won't result in a + * publication having both a schema and the same schema's table, as this + * is not supported. + */ + if (stmt->objectType == OBJECT_TABLE) + { + ListCell *lc; + List *schemaPubids = GetSchemaPublications(nspOid); + List *relPubids = GetRelationPublications(RelationGetRelid(rel)); + + foreach(lc, relPubids) + { + Oid pubid = lfirst_oid(lc); + + if (list_member_oid(schemaPubids, pubid)) + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot move table \"%s\" to schema \"%s\"", + RelationGetRelationName(rel), stmt->newschema), + errdetail("The schema \"%s\" and same schema's table \"%s\" cannot be part of the same publication \"%s\".", + stmt->newschema, + RelationGetRelationName(rel), + get_publication_name(pubid, false))); + } + } + /* common checks on switching namespaces */ CheckSetNamespace(oldNspOid, nspOid); diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 70e9e54d3e5..82464c98896 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -4810,6 +4810,19 @@ _copyPartitionCmd(const PartitionCmd *from) return newnode; } +static PublicationObjSpec * +_copyPublicationObject(const PublicationObjSpec *from) +{ + PublicationObjSpec *newnode = makeNode(PublicationObjSpec); + + COPY_SCALAR_FIELD(pubobjtype); + COPY_STRING_FIELD(name); + COPY_NODE_FIELD(pubtable); + COPY_LOCATION_FIELD(location); + + return newnode; +} + static PublicationTable * _copyPublicationTable(const PublicationTable *from) { @@ -4827,7 +4840,7 @@ _copyCreatePublicationStmt(const CreatePublicationStmt *from) COPY_STRING_FIELD(pubname); COPY_NODE_FIELD(options); - COPY_NODE_FIELD(tables); + COPY_NODE_FIELD(pubobjects); COPY_SCALAR_FIELD(for_all_tables); return newnode; @@ -4840,9 +4853,9 @@ _copyAlterPublicationStmt(const AlterPublicationStmt *from) COPY_STRING_FIELD(pubname); COPY_NODE_FIELD(options); - COPY_NODE_FIELD(tables); + COPY_NODE_FIELD(pubobjects); COPY_SCALAR_FIELD(for_all_tables); - COPY_SCALAR_FIELD(tableAction); + COPY_SCALAR_FIELD(action); return newnode; } @@ -5887,6 +5900,9 @@ copyObjectImpl(const void *from) case T_PartitionCmd: retval = _copyPartitionCmd(from); break; + case T_PublicationObjSpec: + retval = _copyPublicationObject(from); + break; case T_PublicationTable: retval = _copyPublicationTable(from); break; diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index 19eff201024..f537d3eb968 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -2297,6 +2297,18 @@ _equalAlterTSConfigurationStmt(const AlterTSConfigurationStmt *a, } static bool +_equalPublicationObject(const PublicationObjSpec *a, + const PublicationObjSpec *b) +{ + COMPARE_SCALAR_FIELD(pubobjtype); + COMPARE_STRING_FIELD(name); + COMPARE_NODE_FIELD(pubtable); + COMPARE_LOCATION_FIELD(location); + + return true; +} + +static bool _equalPublicationTable(const PublicationTable *a, const PublicationTable *b) { COMPARE_NODE_FIELD(relation); @@ -2310,7 +2322,7 @@ _equalCreatePublicationStmt(const CreatePublicationStmt *a, { COMPARE_STRING_FIELD(pubname); COMPARE_NODE_FIELD(options); - COMPARE_NODE_FIELD(tables); + COMPARE_NODE_FIELD(pubobjects); COMPARE_SCALAR_FIELD(for_all_tables); return true; @@ -2322,9 +2334,9 @@ _equalAlterPublicationStmt(const AlterPublicationStmt *a, { COMPARE_STRING_FIELD(pubname); COMPARE_NODE_FIELD(options); - COMPARE_NODE_FIELD(tables); + COMPARE_NODE_FIELD(pubobjects); COMPARE_SCALAR_FIELD(for_all_tables); - COMPARE_SCALAR_FIELD(tableAction); + COMPARE_SCALAR_FIELD(action); return true; } @@ -3894,6 +3906,9 @@ equal(const void *a, const void *b) case T_PartitionCmd: retval = _equalPartitionCmd(a, b); break; + case T_PublicationObjSpec: + retval = _equalPublicationObject(a, b); + break; case T_PublicationTable: retval = _equalPublicationTable(a, b); break; diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 08f1bf1031c..d0eb80e69cb 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -195,12 +195,17 @@ static Node *makeXmlExpr(XmlExprOp op, char *name, List *named_args, static List *mergeTableFuncParameters(List *func_args, List *columns); static TypeName *TableFuncTypeName(List *columns); static RangeVar *makeRangeVarFromAnyName(List *names, int position, core_yyscan_t yyscanner); +static RangeVar *makeRangeVarFromQualifiedName(char *name, List *rels, + int location, + core_yyscan_t yyscanner); static void SplitColQualList(List *qualList, List **constraintList, CollateClause **collClause, core_yyscan_t yyscanner); static void processCASbits(int cas_bits, int location, const char *constrType, bool *deferrable, bool *initdeferred, bool *not_valid, bool *no_inherit, core_yyscan_t yyscanner); +static void preprocess_pubobj_list(List *pubobjspec_list, + core_yyscan_t yyscanner); static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %} @@ -256,6 +261,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); PartitionSpec *partspec; PartitionBoundSpec *partboundspec; RoleSpec *rolespec; + PublicationObjSpec *publicationobjectspec; struct SelectLimit *selectlimit; SetQuantifier setquantifier; struct GroupClause *groupclause; @@ -425,14 +431,13 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); transform_element_list transform_type_list TriggerTransitions TriggerReferencing vacuum_relation_list opt_vacuum_relation_list - drop_option_list publication_table_list + drop_option_list pub_obj_list %type <node> opt_routine_body %type <groupclause> group_clause %type <list> group_by_list %type <node> group_by_item empty_grouping_set rollup_clause cube_clause %type <node> grouping_sets_clause -%type <node> opt_publication_for_tables publication_for_tables publication_table %type <list> opt_fdw_options fdw_options %type <defelt> fdw_option @@ -517,6 +522,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type <node> table_ref %type <jexpr> joined_table %type <range> relation_expr +%type <range> extended_relation_expr %type <range> relation_expr_opt_alias %type <node> tablesample_clause opt_repeatable_clause %type <target> target_el set_target insert_column_item @@ -553,6 +559,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type <str> createdb_opt_name plassign_target %type <node> var_value zone_value %type <rolespec> auth_ident RoleSpec opt_granted_by +%type <publicationobjectspec> PublicationObjSpec %type <keyword> unreserved_keyword type_func_name_keyword %type <keyword> col_name_keyword reserved_keyword @@ -9591,69 +9598,131 @@ AlterOwnerStmt: ALTER AGGREGATE aggregate_with_argtypes OWNER TO RoleSpec /***************************************************************************** * - * CREATE PUBLICATION name [ FOR TABLE ] [ WITH options ] + * CREATE PUBLICATION name [WITH options] + * + * CREATE PUBLICATION FOR ALL TABLES [WITH options] + * + * CREATE PUBLICATION FOR pub_obj [, ...] [WITH options] + * + * pub_obj is one of: + * + * TABLE table [, ...] + * ALL TABLES IN SCHEMA schema [, ...] * *****************************************************************************/ CreatePublicationStmt: - CREATE PUBLICATION name opt_publication_for_tables opt_definition + CREATE PUBLICATION name opt_definition { CreatePublicationStmt *n = makeNode(CreatePublicationStmt); n->pubname = $3; - n->options = $5; - if ($4 != NULL) - { - /* FOR TABLE */ - if (IsA($4, List)) - n->tables = (List *)$4; - /* FOR ALL TABLES */ - else - n->for_all_tables = true; - } + n->options = $4; + $$ = (Node *)n; + } + | CREATE PUBLICATION name FOR ALL TABLES opt_definition + { + CreatePublicationStmt *n = makeNode(CreatePublicationStmt); + n->pubname = $3; + n->options = $7; + n->for_all_tables = true; + $$ = (Node *)n; + } + | CREATE PUBLICATION name FOR pub_obj_list opt_definition + { + CreatePublicationStmt *n = makeNode(CreatePublicationStmt); + n->pubname = $3; + n->options = $6; + n->pubobjects = (List *)$5; + preprocess_pubobj_list(n->pubobjects, yyscanner); $$ = (Node *)n; } ; -opt_publication_for_tables: - publication_for_tables { $$ = $1; } - | /* EMPTY */ { $$ = NULL; } - ; - -publication_for_tables: - FOR TABLE publication_table_list +/* + * FOR TABLE and FOR ALL TABLES IN SCHEMA specifications + * + * This rule parses publication objects with and without keyword prefixes. + * + * The actual type of the object without keyword prefix depends on the previous + * one with keyword prefix. It will be preprocessed in preprocess_pubobj_list(). + * + * For the object without keyword prefix, we cannot just use relation_expr here, + * because some extended expressions in relation_expr cannot be used as a + * schemaname and we cannot differentiate it. So, we extract the rules from + * relation_expr here. + */ +PublicationObjSpec: + TABLE relation_expr { - $$ = (Node *) $3; + $$ = makeNode(PublicationObjSpec); + $$->pubobjtype = PUBLICATIONOBJ_TABLE; + $$->pubtable = makeNode(PublicationTable); + $$->pubtable->relation = $2; } - | FOR ALL TABLES + | ALL TABLES IN_P SCHEMA ColId { - $$ = (Node *) makeInteger(true); + $$ = makeNode(PublicationObjSpec); + $$->pubobjtype = PUBLICATIONOBJ_REL_IN_SCHEMA; + $$->name = $5; + $$->location = @5; } - ; + | ALL TABLES IN_P SCHEMA CURRENT_SCHEMA + { + $$ = makeNode(PublicationObjSpec); + $$->pubobjtype = PUBLICATIONOBJ_CURRSCHEMA; + $$->location = @5; + } + | ColId + { + $$ = makeNode(PublicationObjSpec); + $$->pubobjtype = PUBLICATIONOBJ_CONTINUATION; + $$->name = $1; + $$->location = @1; + } + | ColId indirection + { + $$ = makeNode(PublicationObjSpec); + $$->pubobjtype = PUBLICATIONOBJ_CONTINUATION; + $$->pubtable = makeNode(PublicationTable); + $$->pubtable->relation = makeRangeVarFromQualifiedName($1, $2, @1, yyscanner); + $$->location = @1; + } + /* grammar like tablename * , ONLY tablename, ONLY ( tablename ) */ + | extended_relation_expr + { + $$ = makeNode(PublicationObjSpec); + $$->pubobjtype = PUBLICATIONOBJ_CONTINUATION; + $$->pubtable = makeNode(PublicationTable); + $$->pubtable->relation = $1; + } + | CURRENT_SCHEMA + { + $$ = makeNode(PublicationObjSpec); + $$->pubobjtype = PUBLICATIONOBJ_CONTINUATION; + $$->location = @1; + } + ; -publication_table_list: - publication_table +pub_obj_list: PublicationObjSpec { $$ = list_make1($1); } - | publication_table_list ',' publication_table - { $$ = lappend($1, $3); } - ; - -publication_table: relation_expr - { - PublicationTable *n = makeNode(PublicationTable); - n->relation = $1; - $$ = (Node *) n; - } + | pub_obj_list ',' PublicationObjSpec + { $$ = lappend($1, $3); } ; /***************************************************************************** * * ALTER PUBLICATION name SET ( options ) * - * ALTER PUBLICATION name ADD TABLE table [, table2] + * ALTER PUBLICATION name ADD pub_obj [, ...] + * + * ALTER PUBLICATION name DROP pub_obj [, ...] + * + * ALTER PUBLICATION name SET pub_obj [, ...] * - * ALTER PUBLICATION name DROP TABLE table [, table2] + * pub_obj is one of: * - * ALTER PUBLICATION name SET TABLE table [, table2] + * TABLE table_name [, ...] + * ALL TABLES IN SCHEMA schema_name [, ...] * *****************************************************************************/ @@ -9665,28 +9734,31 @@ AlterPublicationStmt: n->options = $5; $$ = (Node *)n; } - | ALTER PUBLICATION name ADD_P TABLE publication_table_list + | ALTER PUBLICATION name ADD_P pub_obj_list { AlterPublicationStmt *n = makeNode(AlterPublicationStmt); n->pubname = $3; - n->tables = $6; - n->tableAction = DEFELEM_ADD; + n->pubobjects = $5; + preprocess_pubobj_list(n->pubobjects, yyscanner); + n->action = DEFELEM_ADD; $$ = (Node *)n; } - | ALTER PUBLICATION name SET TABLE publication_table_list + | ALTER PUBLICATION name SET pub_obj_list { AlterPublicationStmt *n = makeNode(AlterPublicationStmt); n->pubname = $3; - n->tables = $6; - n->tableAction = DEFELEM_SET; + n->pubobjects = $5; + preprocess_pubobj_list(n->pubobjects, yyscanner); + n->action = DEFELEM_SET; $$ = (Node *)n; } - | ALTER PUBLICATION name DROP TABLE publication_table_list + | ALTER PUBLICATION name DROP pub_obj_list { AlterPublicationStmt *n = makeNode(AlterPublicationStmt); n->pubname = $3; - n->tables = $6; - n->tableAction = DEFELEM_DROP; + n->pubobjects = $5; + preprocess_pubobj_list(n->pubobjects, yyscanner); + n->action = DEFELEM_DROP; $$ = (Node *)n; } ; @@ -12430,7 +12502,14 @@ relation_expr: $$->inh = true; $$->alias = NULL; } - | qualified_name '*' + | extended_relation_expr + { + $$ = $1; + } + ; + +extended_relation_expr: + qualified_name '*' { /* inheritance query, explicitly */ $$ = $1; @@ -15104,28 +15183,7 @@ qualified_name: } | ColId indirection { - check_qualified_name($2, yyscanner); - $$ = makeRangeVar(NULL, NULL, @1); - switch (list_length($2)) - { - case 1: - $$->catalogname = NULL; - $$->schemaname = $1; - $$->relname = strVal(linitial($2)); - break; - case 2: - $$->catalogname = $1; - $$->schemaname = strVal(linitial($2)); - $$->relname = strVal(lsecond($2)); - break; - default: - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("improper qualified name (too many dotted names): %s", - NameListToString(lcons(makeString($1), $2))), - parser_errposition(@1))); - break; - } + $$ = makeRangeVarFromQualifiedName($1, $2, @1, yyscanner); } ; @@ -17102,6 +17160,43 @@ makeRangeVarFromAnyName(List *names, int position, core_yyscan_t yyscanner) return r; } +/* + * Convert a relation_name with name and namelist to a RangeVar using + * makeRangeVar. + */ +static RangeVar * +makeRangeVarFromQualifiedName(char *name, List *namelist, int location, + core_yyscan_t yyscanner) +{ + RangeVar *r; + + check_qualified_name(namelist, yyscanner); + r = makeRangeVar(NULL, NULL, location); + + switch (list_length(namelist)) + { + case 1: + r->catalogname = NULL; + r->schemaname = name; + r->relname = strVal(linitial(namelist)); + break; + case 2: + r->catalogname = name; + r->schemaname = strVal(linitial(namelist)); + r->relname = strVal(lsecond(namelist)); + break; + default: + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + errmsg("improper qualified name (too many dotted names): %s", + NameListToString(lcons(makeString(name), namelist))), + parser_errposition(location)); + break; + } + + return r; +} + /* Separate Constraint nodes from COLLATE clauses in a ColQualList */ static void SplitColQualList(List *qualList, @@ -17210,6 +17305,74 @@ processCASbits(int cas_bits, int location, const char *constrType, } } +/* + * Process pubobjspec_list to check for errors in any of the objects and + * convert PUBLICATIONOBJ_CONTINUATION into appropriate PublicationObjSpecType. + */ +static void +preprocess_pubobj_list(List *pubobjspec_list, core_yyscan_t yyscanner) +{ + ListCell *cell; + PublicationObjSpec *pubobj; + PublicationObjSpecType prevobjtype = PUBLICATIONOBJ_CONTINUATION; + + if (!pubobjspec_list) + return; + + pubobj = (PublicationObjSpec *) linitial(pubobjspec_list); + if (pubobj->pubobjtype == PUBLICATIONOBJ_CONTINUATION) + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + errmsg("FOR TABLE/FOR ALL TABLES IN SCHEMA should be specified before the table/schema name(s)"), + parser_errposition(pubobj->location)); + + foreach(cell, pubobjspec_list) + { + pubobj = (PublicationObjSpec *) lfirst(cell); + + if (pubobj->pubobjtype == PUBLICATIONOBJ_CONTINUATION) + pubobj->pubobjtype = prevobjtype; + + if (pubobj->pubobjtype == PUBLICATIONOBJ_TABLE) + { + /* relation name or pubtable must be set for this type of object */ + if (!pubobj->name && !pubobj->pubtable) + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + errmsg("invalid table name at or near"), + parser_errposition(pubobj->location)); + else if (pubobj->name) + { + /* convert it to PublicationTable */ + PublicationTable *pubtable = makeNode(PublicationTable); + pubtable->relation = makeRangeVar(NULL, pubobj->name, + pubobj->location); + pubobj->pubtable = pubtable; + pubobj->name = NULL; + } + } + else if (pubobj->pubobjtype == PUBLICATIONOBJ_REL_IN_SCHEMA || + pubobj->pubobjtype == PUBLICATIONOBJ_CURRSCHEMA) + { + /* + * We can distinguish between the different type of schema + * objects based on whether name and pubtable is set. + */ + if (pubobj->name) + pubobj->pubobjtype = PUBLICATIONOBJ_REL_IN_SCHEMA; + else if (!pubobj->name && !pubobj->pubtable) + pubobj->pubobjtype = PUBLICATIONOBJ_CURRSCHEMA; + else + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + errmsg("invalid schema name at or near"), + parser_errposition(pubobj->location)); + } + + prevobjtype = pubobj->pubobjtype; + } +} + /*---------- * Recursive view transformation * diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 14d737fd933..6f6a203dea7 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -1068,6 +1068,9 @@ init_rel_sync_cache(MemoryContext cachectx) CacheRegisterSyscacheCallback(PUBLICATIONRELMAP, rel_sync_cache_publication_cb, (Datum) 0); + CacheRegisterSyscacheCallback(PUBLICATIONNAMESPACEMAP, + rel_sync_cache_publication_cb, + (Datum) 0); } /* @@ -1146,7 +1149,15 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) /* Validate the entry */ if (!entry->replicate_valid) { + Oid schemaId = get_rel_namespace(relid); List *pubids = GetRelationPublications(relid); + + /* + * We don't acquire a lock on the namespace system table as we build + * the cache entry using a historic snapshot and all the later changes + * are absorbed while decoding WAL. + */ + List *schemaPubids = GetSchemaPublications(schemaId); ListCell *lc; Oid publish_as_relid = relid; @@ -1203,6 +1214,8 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) Oid ancestor = lfirst_oid(lc2); if (list_member_oid(GetRelationPublications(ancestor), + pub->oid) || + list_member_oid(GetSchemaPublications(get_rel_namespace(ancestor)), pub->oid)) { ancestor_published = true; @@ -1212,7 +1225,9 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) } } - if (list_member_oid(pubids, pub->oid) || ancestor_published) + if (list_member_oid(pubids, pub->oid) || + list_member_oid(schemaPubids, pub->oid) || + ancestor_published) publish = true; } @@ -1343,7 +1358,7 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid) } /* - * Publication relation map syscache invalidation callback + * Publication relation/schema map syscache invalidation callback */ static void rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue) diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c index b54c9117669..9fa9e671a11 100644 --- a/src/backend/utils/cache/relcache.c +++ b/src/backend/utils/cache/relcache.c @@ -5550,6 +5550,7 @@ GetRelationPublicationActions(Relation relation) List *puboids; ListCell *lc; MemoryContext oldcxt; + Oid schemaid; PublicationActions *pubactions = palloc0(sizeof(PublicationActions)); /* @@ -5565,6 +5566,9 @@ GetRelationPublicationActions(Relation relation) /* Fetch the publication membership info. */ puboids = GetRelationPublications(RelationGetRelid(relation)); + schemaid = RelationGetNamespace(relation); + puboids = list_concat_unique_oid(puboids, GetSchemaPublications(schemaid)); + if (relation->rd_rel->relispartition) { /* Add publications that the ancestors are in too. */ @@ -5577,6 +5581,9 @@ GetRelationPublicationActions(Relation relation) puboids = list_concat_unique_oid(puboids, GetRelationPublications(ancestor)); + schemaid = get_rel_namespace(ancestor); + puboids = list_concat_unique_oid(puboids, + GetSchemaPublications(schemaid)); } } puboids = list_concat_unique_oid(puboids, GetAllTablesPublications()); diff --git a/src/backend/utils/cache/syscache.c b/src/backend/utils/cache/syscache.c index d6cb78dea8d..56870b46e45 100644 --- a/src/backend/utils/cache/syscache.c +++ b/src/backend/utils/cache/syscache.c @@ -50,6 +50,7 @@ #include "catalog/pg_partitioned_table.h" #include "catalog/pg_proc.h" #include "catalog/pg_publication.h" +#include "catalog/pg_publication_namespace.h" #include "catalog/pg_publication_rel.h" #include "catalog/pg_range.h" #include "catalog/pg_replication_origin.h" @@ -617,6 +618,28 @@ static const struct cachedesc cacheinfo[] = { }, 8 }, + {PublicationNamespaceRelationId, /* PUBLICATIONNAMESPACE */ + PublicationNamespaceObjectIndexId, + 1, + { + Anum_pg_publication_namespace_oid, + 0, + 0, + 0 + }, + 64 + }, + {PublicationNamespaceRelationId, /* PUBLICATIONNAMESPACEMAP */ + PublicationNamespacePnnspidPnpubidIndexId, + 2, + { + Anum_pg_publication_namespace_pnnspid, + Anum_pg_publication_namespace_pnpubid, + 0, + 0 + }, + 64 + }, {PublicationRelationId, /* PUBLICATIONOID */ PublicationObjectIndexId, 1, |