diff options
Diffstat (limited to 'src/backend')
-rw-r--r-- | src/backend/catalog/objectaddress.c | 44 | ||||
-rw-r--r-- | src/backend/catalog/pg_publication.c | 328 | ||||
-rw-r--r-- | src/backend/catalog/system_views.sql | 10 | ||||
-rw-r--r-- | src/backend/commands/publicationcmds.c | 424 | ||||
-rw-r--r-- | src/backend/commands/sequence.c | 154 | ||||
-rw-r--r-- | src/backend/commands/subscriptioncmds.c | 101 | ||||
-rw-r--r-- | src/backend/commands/tablecmds.c | 27 | ||||
-rw-r--r-- | src/backend/executor/execReplication.c | 4 | ||||
-rw-r--r-- | src/backend/nodes/copyfuncs.c | 4 | ||||
-rw-r--r-- | src/backend/nodes/equalfuncs.c | 4 | ||||
-rw-r--r-- | src/backend/parser/gram.y | 52 | ||||
-rw-r--r-- | src/backend/replication/logical/proto.c | 52 | ||||
-rw-r--r-- | src/backend/replication/logical/tablesync.c | 109 | ||||
-rw-r--r-- | src/backend/replication/logical/worker.c | 56 | ||||
-rw-r--r-- | src/backend/replication/pgoutput/pgoutput.c | 79 | ||||
-rw-r--r-- | src/backend/utils/cache/relcache.c | 28 | ||||
-rw-r--r-- | src/backend/utils/cache/syscache.c | 6 |
17 files changed, 1328 insertions, 154 deletions
diff --git a/src/backend/catalog/objectaddress.c b/src/backend/catalog/objectaddress.c index d7ce063997a..3fd17ea64f0 100644 --- a/src/backend/catalog/objectaddress.c +++ b/src/backend/catalog/objectaddress.c @@ -1930,12 +1930,14 @@ get_object_address_publication_schema(List *object, bool missing_ok) char *pubname; char *schemaname; Oid schemaid; + char *objtype; ObjectAddressSet(address, PublicationNamespaceRelationId, InvalidOid); /* Fetch schema name and publication name from input list */ schemaname = strVal(linitial(object)); pubname = strVal(lsecond(object)); + objtype = strVal(lthird(object)); schemaid = get_namespace_oid(schemaname, missing_ok); if (!OidIsValid(schemaid)) @@ -1948,10 +1950,12 @@ get_object_address_publication_schema(List *object, bool missing_ok) /* Find the publication schema mapping in syscache */ address.objectId = - GetSysCacheOid2(PUBLICATIONNAMESPACEMAP, + GetSysCacheOid3(PUBLICATIONNAMESPACEMAP, Anum_pg_publication_namespace_oid, ObjectIdGetDatum(schemaid), - ObjectIdGetDatum(pub->oid)); + ObjectIdGetDatum(pub->oid), + CharGetDatum(objtype[0])); + if (!OidIsValid(address.objectId) && !missing_ok) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), @@ -2232,7 +2236,6 @@ pg_get_object_address(PG_FUNCTION_ARGS) case OBJECT_DOMCONSTRAINT: case OBJECT_CAST: case OBJECT_USER_MAPPING: - case OBJECT_PUBLICATION_NAMESPACE: case OBJECT_PUBLICATION_REL: case OBJECT_DEFACL: case OBJECT_TRANSFORM: @@ -2257,6 +2260,7 @@ pg_get_object_address(PG_FUNCTION_ARGS) /* fall through to check args length */ /* FALLTHROUGH */ case OBJECT_OPERATOR: + case OBJECT_PUBLICATION_NAMESPACE: if (list_length(args) != 2) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), @@ -2327,6 +2331,8 @@ pg_get_object_address(PG_FUNCTION_ARGS) objnode = (Node *) list_make2(name, linitial(args)); break; case OBJECT_PUBLICATION_NAMESPACE: + objnode = (Node *) list_make3(linitial(name), linitial(args), lsecond(args)); + break; case OBJECT_USER_MAPPING: objnode = (Node *) list_make2(linitial(name), linitial(args)); break; @@ -2881,11 +2887,12 @@ get_catalog_object_by_oid(Relation catalog, AttrNumber oidcol, Oid objectId) * * Get publication name and schema name from the object address into pubname and * nspname. Both pubname and nspname are palloc'd strings which will be freed by - * the caller. + * the caller. The last parameter specifies which object type is included from + * the schema. */ static bool getPublicationSchemaInfo(const ObjectAddress *object, bool missing_ok, - char **pubname, char **nspname) + char **pubname, char **nspname, char **objtype) { HeapTuple tup; Form_pg_publication_namespace pnform; @@ -2921,6 +2928,13 @@ getPublicationSchemaInfo(const ObjectAddress *object, bool missing_ok, return false; } + /* + * The type is always a single character, but we need to pass it as a string, + * so allocate two charaters and set the first one. The second one is \0. + */ + *objtype = palloc0(2); + *objtype[0] = pnform->pntype; + ReleaseSysCache(tup); return true; } @@ -3926,15 +3940,17 @@ getObjectDescription(const ObjectAddress *object, bool missing_ok) { char *pubname; char *nspname; + char *objtype; if (!getPublicationSchemaInfo(object, missing_ok, - &pubname, &nspname)) + &pubname, &nspname, &objtype)) break; - appendStringInfo(&buffer, _("publication of schema %s in publication %s"), - nspname, pubname); + appendStringInfo(&buffer, _("publication of schema %s in publication %s type %s"), + nspname, pubname, objtype); pfree(pubname); pfree(nspname); + pfree(objtype); break; } @@ -5729,18 +5745,24 @@ getObjectIdentityParts(const ObjectAddress *object, { char *pubname; char *nspname; + char *objtype; if (!getPublicationSchemaInfo(object, missing_ok, &pubname, - &nspname)) + &nspname, &objtype)) break; - appendStringInfo(&buffer, "%s in publication %s", - nspname, pubname); + appendStringInfo(&buffer, "%s in publication %s type %s", + nspname, pubname, objtype); if (objargs) *objargs = list_make1(pubname); else pfree(pubname); + if (objargs) + *objargs = lappend(*objargs, objtype); + else + pfree(objtype); + if (objname) *objname = list_make1(nspname); else diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index 789b895db89..5bcfc94e2ba 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -52,9 +52,10 @@ static void check_publication_add_relation(Relation targetrel) { - /* Must be a regular or partitioned table */ + /* Must be a regular or partitioned table, or a sequence */ if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION && - RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE) + RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE && + RelationGetForm(targetrel)->relkind != RELKIND_SEQUENCE) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("cannot add relation \"%s\" to publication", @@ -131,7 +132,8 @@ static bool is_publishable_class(Oid relid, Form_pg_class reltuple) { return (reltuple->relkind == RELKIND_RELATION || - reltuple->relkind == RELKIND_PARTITIONED_TABLE) && + reltuple->relkind == RELKIND_PARTITIONED_TABLE || + reltuple->relkind == RELKIND_SEQUENCE) && !IsCatalogRelationOid(relid) && reltuple->relpersistence == RELPERSISTENCE_PERMANENT && relid >= FirstNormalObjectId; @@ -177,6 +179,52 @@ filter_partitions(List *relids) } /* + * Check the character is a valid object type for schema publication. + * + * This recognizes either 't' for tables or 's' for sequences. Places that + * need to handle 'u' for unsupported relkinds need to do that explicitlyl + */ +static void +AssertObjectTypeValid(char objectType) +{ +#ifdef USE_ASSERT_CHECKING + Assert(objectType == PUB_OBJTYPE_SEQUENCE || objectType == PUB_OBJTYPE_TABLE); +#endif +} + +/* + * Determine object type given the object type set for a schema. + */ +char +pub_get_object_type_for_relkind(char relkind) +{ + /* sequence maps directly to sequence relkind */ + if (relkind == RELKIND_SEQUENCE) + return PUB_OBJTYPE_SEQUENCE; + + /* for table, we match either regular or partitioned table */ + if (relkind == RELKIND_RELATION || + relkind == RELKIND_PARTITIONED_TABLE) + return PUB_OBJTYPE_TABLE; + + return PUB_OBJTYPE_UNSUPPORTED; +} + +/* + * Determine if publication object type matches the relkind. + * + * Returns true if the relation matches object type replicated by this schema, + * false otherwise. + */ +static bool +pub_object_type_matches_relkind(char objectType, char relkind) +{ + AssertObjectTypeValid(objectType); + + return (pub_get_object_type_for_relkind(relkind) == objectType); +} + +/* * Another variant of this, taking a Relation. */ bool @@ -205,7 +253,7 @@ is_schema_publication(Oid pubid) ObjectIdGetDatum(pubid)); scan = systable_beginscan(pubschsrel, - PublicationNamespacePnnspidPnpubidIndexId, + PublicationNamespacePnnspidPnpubidPntypeIndexId, true, NULL, 1, &scankey); tup = systable_getnext(scan); result = HeapTupleIsValid(tup); @@ -313,7 +361,9 @@ GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level } else { - aschemaPubids = GetSchemaPublications(get_rel_namespace(ancestor)); + /* we only search for ancestors of tables, so PUB_OBJTYPE_TABLE */ + aschemaPubids = GetSchemaPublications(get_rel_namespace(ancestor), + PUB_OBJTYPE_TABLE); if (list_member_oid(aschemaPubids, puboid)) { topmost_relid = ancestor; @@ -436,7 +486,7 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri, * Insert new publication / schema mapping. */ ObjectAddress -publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists) +publication_add_schema(Oid pubid, Oid schemaid, char objectType, bool if_not_exists) { Relation rel; HeapTuple tup; @@ -448,6 +498,8 @@ publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists) ObjectAddress myself, referenced; + AssertObjectTypeValid(objectType); + rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock); /* @@ -455,9 +507,10 @@ publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists) * duplicates, it's here just to provide nicer error message in common * case. The real protection is the unique key on the catalog. */ - if (SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP, + if (SearchSysCacheExists3(PUBLICATIONNAMESPACEMAP, ObjectIdGetDatum(schemaid), - ObjectIdGetDatum(pubid))) + ObjectIdGetDatum(pubid), + CharGetDatum(objectType))) { table_close(rel, RowExclusiveLock); @@ -483,6 +536,8 @@ publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists) ObjectIdGetDatum(pubid); values[Anum_pg_publication_namespace_pnnspid - 1] = ObjectIdGetDatum(schemaid); + values[Anum_pg_publication_namespace_pntype - 1] = + CharGetDatum(objectType); tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); @@ -508,7 +563,7 @@ publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists) * publication_add_relation for why we need to consider all the * partitions. */ - schemaRels = GetSchemaPublicationRelations(schemaid, + schemaRels = GetSchemaPublicationRelations(schemaid, objectType, PUBLICATION_PART_ALL); InvalidatePublicationRels(schemaRels); @@ -542,11 +597,14 @@ GetRelationPublications(Oid relid) /* * Gets list of relation oids for a publication. * - * This should only be used FOR TABLE publications, the FOR ALL TABLES - * should use GetAllTablesPublicationRelations(). + * This should only be used FOR TABLE / FOR SEQUENCE publications, the FOR + * ALL TABLES / SEQUENCES should use GetAllTablesPublicationRelations() + * and GetAllSequencesPublicationRelations(). + * + * XXX pub_partopt only matters for tables, not sequences. */ List * -GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt) +GetPublicationRelations(Oid pubid, char objectType, PublicationPartOpt pub_partopt) { List *result; Relation pubrelsrel; @@ -554,6 +612,8 @@ GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt) SysScanDesc scan; HeapTuple tup; + AssertObjectTypeValid(objectType); + /* Find all publications associated with the relation. */ pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock); @@ -568,11 +628,29 @@ GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt) result = NIL; while (HeapTupleIsValid(tup = systable_getnext(scan))) { + char relkind; Form_pg_publication_rel pubrel; pubrel = (Form_pg_publication_rel) GETSTRUCT(tup); - result = GetPubPartitionOptionRelations(result, pub_partopt, - pubrel->prrelid); + relkind = get_rel_relkind(pubrel->prrelid); + + /* + * If the relkind does not match the requested object type, ignore the + * relation. For example we might be interested only in sequences, so + * we ignore tables. + */ + if (!pub_object_type_matches_relkind(objectType, relkind)) + continue; + + /* + * We don't have partitioned sequences, so just add them to the list. + * Otherwise consider adding all child relations, if requested. + */ + if (relkind == RELKIND_SEQUENCE) + result = lappend_oid(result, pubrel->prrelid); + else + result = GetPubPartitionOptionRelations(result, pub_partopt, + pubrel->prrelid); } systable_endscan(scan); @@ -623,6 +701,43 @@ GetAllTablesPublications(void) } /* + * Gets list of publication oids for publications marked as FOR ALL SEQUENCES. + */ +List * +GetAllSequencesPublications(void) +{ + List *result; + Relation rel; + ScanKeyData scankey; + SysScanDesc scan; + HeapTuple tup; + + /* Find all publications that are marked as for all sequences. */ + rel = table_open(PublicationRelationId, AccessShareLock); + + ScanKeyInit(&scankey, + Anum_pg_publication_puballsequences, + BTEqualStrategyNumber, F_BOOLEQ, + BoolGetDatum(true)); + + scan = systable_beginscan(rel, InvalidOid, false, + NULL, 1, &scankey); + + result = NIL; + while (HeapTupleIsValid(tup = systable_getnext(scan))) + { + Oid oid = ((Form_pg_publication) GETSTRUCT(tup))->oid; + + result = lappend_oid(result, oid); + } + + systable_endscan(scan); + table_close(rel, AccessShareLock); + + return result; +} + +/* * Gets list of all relation published by FOR ALL TABLES publication(s). * * If the publication publishes partition changes via their respective root @@ -688,28 +803,38 @@ GetAllTablesPublicationRelations(bool pubviaroot) /* * Gets the list of schema oids for a publication. * - * This should only be used FOR ALL TABLES IN SCHEMA publications. + * This should only be used FOR ALL TABLES IN SCHEMA and FOR ALL SEQUENCES + * publications. + * + * 'objectType' determines whether to get FOR TABLE or FOR SEQUENCES schemas */ List * -GetPublicationSchemas(Oid pubid) +GetPublicationSchemas(Oid pubid, char objectType) { List *result = NIL; Relation pubschsrel; - ScanKeyData scankey; + ScanKeyData scankey[2]; SysScanDesc scan; HeapTuple tup; + AssertObjectTypeValid(objectType); + /* Find all schemas associated with the publication */ pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock); - ScanKeyInit(&scankey, + ScanKeyInit(&scankey[0], Anum_pg_publication_namespace_pnpubid, BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(pubid)); + ScanKeyInit(&scankey[1], + Anum_pg_publication_namespace_pntype, + BTEqualStrategyNumber, F_CHAREQ, + CharGetDatum(objectType)); + scan = systable_beginscan(pubschsrel, - PublicationNamespacePnnspidPnpubidIndexId, - true, NULL, 1, &scankey); + PublicationNamespacePnnspidPnpubidPntypeIndexId, + true, NULL, 2, scankey); while (HeapTupleIsValid(tup = systable_getnext(scan))) { Form_pg_publication_namespace pubsch; @@ -727,14 +852,26 @@ GetPublicationSchemas(Oid pubid) /* * Gets the list of publication oids associated with a specified schema. + * + * objectType specifies whether we're looking for schemas including tables or + * sequences. + * + * Note: relcache calls this for all object types, not just tables and sequences. + * Which is why we handle the PUB_OBJTYPE_UNSUPPORTED object type too. */ List * -GetSchemaPublications(Oid schemaid) +GetSchemaPublications(Oid schemaid, char objectType) { List *result = NIL; CatCList *pubschlist; int i; + /* unsupported object type */ + if (objectType == PUB_OBJTYPE_UNSUPPORTED) + return result; + + AssertObjectTypeValid(objectType); + /* Find all publications associated with the schema */ pubschlist = SearchSysCacheList1(PUBLICATIONNAMESPACEMAP, ObjectIdGetDatum(schemaid)); @@ -742,6 +879,11 @@ GetSchemaPublications(Oid schemaid) { HeapTuple tup = &pubschlist->members[i]->tuple; Oid pubid = ((Form_pg_publication_namespace) GETSTRUCT(tup))->pnpubid; + char pntype = ((Form_pg_publication_namespace) GETSTRUCT(tup))->pntype; + + /* Skip schemas publishing a different object type. */ + if (pntype != objectType) + continue; result = lappend_oid(result, pubid); } @@ -753,9 +895,13 @@ GetSchemaPublications(Oid schemaid) /* * Get the list of publishable relation oids for a specified schema. + * + * objectType specifies whether this is FOR ALL TABLES IN SCHEMA or FOR ALL + * SEQUENCES IN SCHEMA */ List * -GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt) +GetSchemaPublicationRelations(Oid schemaid, char objectType, + PublicationPartOpt pub_partopt) { Relation classRel; ScanKeyData key[1]; @@ -764,6 +910,7 @@ GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt) List *result = NIL; Assert(OidIsValid(schemaid)); + AssertObjectTypeValid(objectType); classRel = table_open(RelationRelationId, AccessShareLock); @@ -784,9 +931,16 @@ GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt) continue; relkind = get_rel_relkind(relid); - if (relkind == RELKIND_RELATION) - result = lappend_oid(result, relid); - else if (relkind == RELKIND_PARTITIONED_TABLE) + + /* Skip if the relkind does not match FOR ALL TABLES / SEQUENCES. */ + if (!pub_object_type_matches_relkind(objectType, relkind)) + continue; + + /* + * If the object is a partitioned table, lookup all the child relations + * (if requested). Otherwise just add the object to the list. + */ + if (relkind == RELKIND_PARTITIONED_TABLE) { List *partitionrels = NIL; @@ -799,7 +953,11 @@ GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt) pub_partopt, relForm->oid); result = list_concat_unique_oid(result, partitionrels); + continue; } + + /* non-partitioned tables and sequences */ + result = lappend_oid(result, relid); } table_endscan(scan); @@ -809,21 +967,25 @@ GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt) /* * Gets the list of all relations published by FOR ALL TABLES IN SCHEMA - * publication. + * or FOR ALL SEQUENCES IN SCHEMA publication. */ List * -GetAllSchemaPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt) +GetAllSchemaPublicationRelations(Oid pubid, char objectType, + PublicationPartOpt pub_partopt) { List *result = NIL; - List *pubschemalist = GetPublicationSchemas(pubid); + List *pubschemalist = GetPublicationSchemas(pubid, objectType); ListCell *cell; + AssertObjectTypeValid(objectType); + foreach(cell, pubschemalist) { Oid schemaid = lfirst_oid(cell); List *schemaRels = NIL; - schemaRels = GetSchemaPublicationRelations(schemaid, pub_partopt); + schemaRels = GetSchemaPublicationRelations(schemaid, objectType, + pub_partopt); result = list_concat(result, schemaRels); } @@ -831,6 +993,42 @@ GetAllSchemaPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt) } /* + * Gets list of all relation published by FOR ALL SEQUENCES publication(s). + */ +List * +GetAllSequencesPublicationRelations(void) +{ + Relation classRel; + ScanKeyData key[1]; + TableScanDesc scan; + HeapTuple tuple; + List *result = NIL; + + classRel = table_open(RelationRelationId, AccessShareLock); + + ScanKeyInit(&key[0], + Anum_pg_class_relkind, + BTEqualStrategyNumber, F_CHAREQ, + CharGetDatum(RELKIND_SEQUENCE)); + + scan = table_beginscan_catalog(classRel, 1, key); + + while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) + { + Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple); + Oid relid = relForm->oid; + + if (is_publishable_class(relid, relForm)) + result = lappend_oid(result, relid); + } + + table_endscan(scan); + + table_close(classRel, AccessShareLock); + return result; +} + +/* * Get publication using oid * * The Publication struct and its data are palloc'ed here. @@ -852,10 +1050,12 @@ GetPublication(Oid pubid) pub->oid = pubid; pub->name = pstrdup(NameStr(pubform->pubname)); pub->alltables = pubform->puballtables; + pub->allsequences = pubform->puballsequences; pub->pubactions.pubinsert = pubform->pubinsert; pub->pubactions.pubupdate = pubform->pubupdate; pub->pubactions.pubdelete = pubform->pubdelete; pub->pubactions.pubtruncate = pubform->pubtruncate; + pub->pubactions.pubsequence = pubform->pubsequence; pub->pubviaroot = pubform->pubviaroot; ReleaseSysCache(tup); @@ -966,10 +1166,12 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) *schemarelids; relids = GetPublicationRelations(publication->oid, + PUB_OBJTYPE_TABLE, publication->pubviaroot ? PUBLICATION_PART_ROOT : PUBLICATION_PART_LEAF); schemarelids = GetAllSchemaPublicationRelations(publication->oid, + PUB_OBJTYPE_TABLE, publication->pubviaroot ? PUBLICATION_PART_ROOT : PUBLICATION_PART_LEAF); @@ -1005,3 +1207,71 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) SRF_RETURN_DONE(funcctx); } + +/* + * Returns Oids of sequences in a publication. + */ +Datum +pg_get_publication_sequences(PG_FUNCTION_ARGS) +{ + FuncCallContext *funcctx; + char *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0)); + Publication *publication; + List *sequences; + + /* stuff done only on the first call of the function */ + if (SRF_IS_FIRSTCALL()) + { + MemoryContext oldcontext; + + /* create a function context for cross-call persistence */ + funcctx = SRF_FIRSTCALL_INIT(); + + /* switch to memory context appropriate for multiple function calls */ + oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); + + publication = GetPublicationByName(pubname, false); + + /* + * Publications support partitioned tables, although all changes are + * replicated using leaf partition identity and schema, so we only + * need those. + */ + if (publication->allsequences) + sequences = GetAllSequencesPublicationRelations(); + else + { + List *relids, + *schemarelids; + + relids = GetPublicationRelations(publication->oid, + PUB_OBJTYPE_SEQUENCE, + publication->pubviaroot ? + PUBLICATION_PART_ROOT : + PUBLICATION_PART_LEAF); + schemarelids = GetAllSchemaPublicationRelations(publication->oid, + PUB_OBJTYPE_SEQUENCE, + publication->pubviaroot ? + PUBLICATION_PART_ROOT : + PUBLICATION_PART_LEAF); + sequences = list_concat_unique_oid(relids, schemarelids); + } + + funcctx->user_fctx = (void *) sequences; + + MemoryContextSwitchTo(oldcontext); + } + + /* stuff done on every call of the function */ + funcctx = SRF_PERCALL_SETUP(); + sequences = (List *) funcctx->user_fctx; + + if (funcctx->call_cntr < list_length(sequences)) + { + Oid relid = list_nth_oid(sequences, funcctx->call_cntr); + + SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid)); + } + + SRF_RETURN_DONE(funcctx); +} diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index bd48ee7bd25..9ac8e9a2998 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -374,6 +374,16 @@ CREATE VIEW pg_publication_tables AS pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace) WHERE C.oid = GPT.relid; +CREATE VIEW pg_publication_sequences AS + SELECT + P.pubname AS pubname, + N.nspname AS schemaname, + C.relname AS sequencename + FROM pg_publication P, + LATERAL pg_get_publication_sequences(P.pubname) GPS, + pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace) + WHERE C.oid = GPS.relid; + CREATE VIEW pg_locks AS SELECT * FROM pg_lock_status() AS L; diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 1aad2e769cb..f890d3f0baa 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -16,6 +16,7 @@ #include "access/genam.h" #include "access/htup_details.h" +#include "access/relation.h" #include "access/table.h" #include "access/xact.h" #include "catalog/catalog.h" @@ -67,15 +68,17 @@ typedef struct rf_context } rf_context; static List *OpenRelIdList(List *relids); -static List *OpenTableList(List *tables); -static void CloseTableList(List *rels); +static List *OpenRelationList(List *rels, char objectType); +static void CloseRelationList(List *rels); static void LockSchemaList(List *schemalist); -static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, +static void PublicationAddRelations(Oid pubid, List *rels, bool if_not_exists, AlterPublicationStmt *stmt); -static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok); -static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists, - AlterPublicationStmt *stmt); -static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok); +static void PublicationDropRelations(Oid pubid, List *rels, bool missing_ok); +static void PublicationAddSchemas(Oid pubid, List *schemas, char objectType, + bool if_not_exists, AlterPublicationStmt *stmt); +static void PublicationDropSchemas(Oid pubid, List *schemas, char objectType, + bool missing_ok); + static void parse_publication_options(ParseState *pstate, @@ -95,6 +98,7 @@ parse_publication_options(ParseState *pstate, pubactions->pubupdate = true; pubactions->pubdelete = true; pubactions->pubtruncate = true; + pubactions->pubsequence = true; *publish_via_partition_root = false; /* Parse options */ @@ -119,6 +123,7 @@ parse_publication_options(ParseState *pstate, pubactions->pubupdate = false; pubactions->pubdelete = false; pubactions->pubtruncate = false; + pubactions->pubsequence = false; *publish_given = true; publish = defGetString(defel); @@ -141,6 +146,8 @@ parse_publication_options(ParseState *pstate, pubactions->pubdelete = true; else if (strcmp(publish_opt, "truncate") == 0) pubactions->pubtruncate = true; + else if (strcmp(publish_opt, "sequence") == 0) + pubactions->pubsequence = true; else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -167,7 +174,9 @@ parse_publication_options(ParseState *pstate, */ static void ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate, - List **rels, List **schemas) + List **tables, List **sequences, + List **tables_schemas, List **sequences_schemas, + List **schemas) { ListCell *cell; PublicationObjSpec *pubobj; @@ -185,12 +194,23 @@ ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate, switch (pubobj->pubobjtype) { case PUBLICATIONOBJ_TABLE: - *rels = lappend(*rels, pubobj->pubtable); + *tables = lappend(*tables, pubobj->pubtable); + break; + case PUBLICATIONOBJ_SEQUENCE: + *sequences = lappend(*sequences, pubobj->pubtable); break; case PUBLICATIONOBJ_TABLES_IN_SCHEMA: schemaid = get_namespace_oid(pubobj->name, false); /* Filter out duplicates if user specifies "sch1, sch1" */ + *tables_schemas = list_append_unique_oid(*tables_schemas, schemaid); + *schemas = list_append_unique_oid(*schemas, schemaid); + break; + case PUBLICATIONOBJ_SEQUENCES_IN_SCHEMA: + schemaid = get_namespace_oid(pubobj->name, false); + + /* Filter out duplicates if user specifies "sch1, sch1" */ + *sequences_schemas = list_append_unique_oid(*sequences_schemas, schemaid); *schemas = list_append_unique_oid(*schemas, schemaid); break; case PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA: @@ -204,6 +224,21 @@ ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate, list_free(search_path); /* Filter out duplicates if user specifies "sch1, sch1" */ + *tables_schemas = list_append_unique_oid(*tables_schemas, schemaid); + *schemas = list_append_unique_oid(*schemas, schemaid); + break; + case PUBLICATIONOBJ_SEQUENCES_IN_CUR_SCHEMA: + search_path = fetch_search_path(false); + if (search_path == NIL) /* nothing valid in search_path? */ + ereport(ERROR, + errcode(ERRCODE_UNDEFINED_SCHEMA), + errmsg("no schema has been selected for CURRENT_SCHEMA")); + + schemaid = linitial_oid(search_path); + list_free(search_path); + + /* Filter out duplicates if user specifies "sch1, sch1" */ + *sequences_schemas = list_append_unique_oid(*sequences_schemas, schemaid); *schemas = list_append_unique_oid(*schemas, schemaid); break; default: @@ -240,6 +275,14 @@ CheckObjSchemaNotAlreadyInPublication(List *rels, List *schemaidlist, errdetail("Table \"%s\" in schema \"%s\" is already part of the publication, adding the same schema is not supported.", RelationGetRelationName(rel), get_namespace_name(relSchemaId))); + else if (checkobjtype == PUBLICATIONOBJ_SEQUENCES_IN_SCHEMA) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot add schema \"%s\" to publication", + get_namespace_name(relSchemaId)), + errdetail("Sequence \"%s\" in schema \"%s\" is already part of the publication, adding the same schema is not supported.", + RelationGetRelationName(rel), + get_namespace_name(relSchemaId))); else if (checkobjtype == PUBLICATIONOBJ_TABLE) ereport(ERROR, errcode(ERRCODE_INVALID_PARAMETER_VALUE), @@ -248,6 +291,14 @@ CheckObjSchemaNotAlreadyInPublication(List *rels, List *schemaidlist, RelationGetRelationName(rel)), errdetail("Table's schema \"%s\" is already part of the publication or part of the specified schema list.", get_namespace_name(relSchemaId))); + else if (checkobjtype == PUBLICATIONOBJ_SEQUENCE) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot add relation \"%s.%s\" to publication", + get_namespace_name(relSchemaId), + RelationGetRelationName(rel)), + errdetail("Sequence's schema \"%s\" is already part of the publication or part of the specified schema list.", + get_namespace_name(relSchemaId))); } } } @@ -615,6 +666,7 @@ TransformPubWhereClauses(List *tables, const char *queryString, ObjectAddress CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) { + ListCell *lc; Relation rel; ObjectAddress myself; Oid puboid; @@ -626,9 +678,25 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) bool publish_via_partition_root_given; bool publish_via_partition_root; AclResult aclresult; - List *relations = NIL; + List *tables = NIL; + List *sequences = NIL; + List *tables_schemaidlist = NIL; + List *sequences_schemaidlist = NIL; List *schemaidlist = NIL; + bool for_all_tables = false; + bool for_all_sequences = false; + + /* Translate the list of object types (represented by strings) to bool flags. */ + foreach (lc, stmt->for_all_objects) + { + char *val = strVal(lfirst(lc)); + if (strcmp(val, "tables") == 0) + for_all_tables = true; + else if (strcmp(val, "sequences") == 0) + for_all_sequences = true; + } + /* must have CREATE privilege on database */ aclresult = pg_database_aclcheck(MyDatabaseId, GetUserId(), ACL_CREATE); if (aclresult != ACLCHECK_OK) @@ -636,7 +704,7 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) get_database_name(MyDatabaseId)); /* FOR ALL TABLES requires superuser */ - if (stmt->for_all_tables && !superuser()) + if (for_all_tables && !superuser()) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("must be superuser to create FOR ALL TABLES publication"))); @@ -672,7 +740,9 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) Anum_pg_publication_oid); values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid); values[Anum_pg_publication_puballtables - 1] = - BoolGetDatum(stmt->for_all_tables); + BoolGetDatum(for_all_tables); + values[Anum_pg_publication_puballsequences - 1] = + BoolGetDatum(for_all_sequences); values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert); values[Anum_pg_publication_pubupdate - 1] = @@ -681,6 +751,8 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) BoolGetDatum(pubactions.pubdelete); values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate); + values[Anum_pg_publication_pubsequence - 1] = + BoolGetDatum(pubactions.pubsequence); values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root); @@ -698,45 +770,88 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) CommandCounterIncrement(); /* Associate objects with the publication. */ - if (stmt->for_all_tables) + if (for_all_tables || for_all_sequences) { /* Invalidate relcache so that publication info is rebuilt. */ CacheInvalidateRelcacheAll(); } - else + + /* + * If the publication might have either tables or sequences (directly or + * through a schema), process that. + */ + if (!for_all_tables || !for_all_sequences) { - ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations, + ObjectsInPublicationToOids(stmt->pubobjects, pstate, + &tables, &sequences, + &tables_schemaidlist, + &sequences_schemaidlist, &schemaidlist); /* FOR ALL TABLES IN SCHEMA requires superuser */ - if (list_length(schemaidlist) > 0 && !superuser()) + if (list_length(tables_schemaidlist) > 0 && !superuser()) ereport(ERROR, errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("must be superuser to create FOR ALL TABLES IN SCHEMA publication")); - if (list_length(relations) > 0) + /* FOR ALL SEQUENCES IN SCHEMA requires superuser */ + if (list_length(sequences_schemaidlist) > 0 && !superuser()) + ereport(ERROR, + errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("must be superuser to create FOR ALL SEQUENCES IN SCHEMA publication")); + + /* tables added directly */ + if (list_length(tables) > 0) { List *rels; - rels = OpenTableList(relations); - CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist, + rels = OpenRelationList(tables, PUB_OBJTYPE_TABLE); + CheckObjSchemaNotAlreadyInPublication(rels, tables_schemaidlist, PUBLICATIONOBJ_TABLE); TransformPubWhereClauses(rels, pstate->p_sourcetext, publish_via_partition_root); - PublicationAddTables(puboid, rels, true, NULL); - CloseTableList(rels); + PublicationAddRelations(puboid, rels, true, NULL); + CloseRelationList(rels); + } + + /* sequences added directly */ + if (list_length(sequences) > 0) + { + List *rels; + + rels = OpenRelationList(sequences, PUB_OBJTYPE_SEQUENCE); + CheckObjSchemaNotAlreadyInPublication(rels, sequences_schemaidlist, + PUBLICATIONOBJ_SEQUENCE); + PublicationAddRelations(puboid, rels, true, NULL); + CloseRelationList(rels); + } + + /* tables added through a schema */ + if (list_length(tables_schemaidlist) > 0) + { + /* + * Schema lock is held until the publication is created to prevent + * concurrent schema deletion. + */ + LockSchemaList(tables_schemaidlist); + PublicationAddSchemas(puboid, + tables_schemaidlist, PUB_OBJTYPE_TABLE, + true, NULL); } - if (list_length(schemaidlist) > 0) + /* sequences added through a schema */ + if (list_length(sequences_schemaidlist) > 0) { /* * Schema lock is held until the publication is created to prevent * concurrent schema deletion. */ - LockSchemaList(schemaidlist); - PublicationAddSchemas(puboid, schemaidlist, true, NULL); + LockSchemaList(sequences_schemaidlist); + PublicationAddSchemas(puboid, + sequences_schemaidlist, PUB_OBJTYPE_SEQUENCE, + true, NULL); } } @@ -799,6 +914,7 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, AccessShareLock); root_relids = GetPublicationRelations(pubform->oid, + PUB_OBJTYPE_TABLE, PUBLICATION_PART_ROOT); foreach(lc, root_relids) @@ -857,6 +973,9 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate); replaces[Anum_pg_publication_pubtruncate - 1] = true; + + values[Anum_pg_publication_pubsequence - 1] = BoolGetDatum(pubactions.pubsequence); + replaces[Anum_pg_publication_pubsequence - 1] = true; } if (publish_via_partition_root_given) @@ -876,7 +995,7 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, pubform = (Form_pg_publication) GETSTRUCT(tup); /* Invalidate the relcache. */ - if (pubform->puballtables) + if (pubform->puballtables || pubform->puballsequences) { CacheInvalidateRelcacheAll(); } @@ -892,6 +1011,7 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, */ if (root_relids == NIL) relids = GetPublicationRelations(pubform->oid, + PUB_OBJTYPE_TABLE, PUBLICATION_PART_ALL); else { @@ -905,7 +1025,20 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, lfirst_oid(lc)); } + /* tables */ + schemarelids = GetAllSchemaPublicationRelations(pubform->oid, + PUB_OBJTYPE_TABLE, + PUBLICATION_PART_ALL); + relids = list_concat_unique_oid(relids, schemarelids); + + /* sequences */ + relids = list_concat_unique_oid(relids, + GetPublicationRelations(pubform->oid, + PUB_OBJTYPE_SEQUENCE, + PUBLICATION_PART_ALL)); + schemarelids = GetAllSchemaPublicationRelations(pubform->oid, + PUB_OBJTYPE_SEQUENCE, PUBLICATION_PART_ALL); relids = list_concat_unique_oid(relids, schemarelids); @@ -960,7 +1093,7 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, if (!tables && stmt->action != AP_SetObjects) return; - rels = OpenTableList(tables); + rels = OpenRelationList(tables, PUB_OBJTYPE_TABLE); if (stmt->action == AP_AddObjects) { @@ -970,19 +1103,22 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, * Check if the relation is member of the existing schema in the * publication or member of the schema list specified. */ - schemas = list_concat_copy(schemaidlist, GetPublicationSchemas(pubid)); + schemas = list_concat_copy(schemaidlist, + GetPublicationSchemas(pubid, + PUB_OBJTYPE_TABLE)); CheckObjSchemaNotAlreadyInPublication(rels, schemas, PUBLICATIONOBJ_TABLE); TransformPubWhereClauses(rels, queryString, pubform->pubviaroot); - PublicationAddTables(pubid, rels, false, stmt); + PublicationAddRelations(pubid, rels, false, stmt); } else if (stmt->action == AP_DropObjects) - PublicationDropTables(pubid, rels, false); + PublicationDropRelations(pubid, rels, false); else /* AP_SetObjects */ { List *oldrelids = GetPublicationRelations(pubid, + PUB_OBJTYPE_TABLE, PUBLICATION_PART_ROOT); List *delrels = NIL; ListCell *oldlc; @@ -1064,18 +1200,18 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, } /* And drop them. */ - PublicationDropTables(pubid, delrels, true); + PublicationDropRelations(pubid, delrels, true); /* * Don't bother calculating the difference for adding, we'll catch and * skip existing ones when doing catalog update. */ - PublicationAddTables(pubid, rels, true, stmt); + PublicationAddRelations(pubid, rels, true, stmt); - CloseTableList(delrels); + CloseRelationList(delrels); } - CloseTableList(rels); + CloseRelationList(rels); } /* @@ -1085,7 +1221,8 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, */ static void AlterPublicationSchemas(AlterPublicationStmt *stmt, - HeapTuple tup, List *schemaidlist) + HeapTuple tup, List *schemaidlist, + char objectType) { Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup); @@ -1107,20 +1244,20 @@ AlterPublicationSchemas(AlterPublicationStmt *stmt, List *rels; List *reloids; - reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT); + reloids = GetPublicationRelations(pubform->oid, objectType, PUBLICATION_PART_ROOT); rels = OpenRelIdList(reloids); CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist, PUBLICATIONOBJ_TABLES_IN_SCHEMA); - CloseTableList(rels); - PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt); + CloseRelationList(rels); + PublicationAddSchemas(pubform->oid, schemaidlist, objectType, false, stmt); } else if (stmt->action == AP_DropObjects) - PublicationDropSchemas(pubform->oid, schemaidlist, false); + PublicationDropSchemas(pubform->oid, schemaidlist, objectType, false); else /* AP_SetObjects */ { - List *oldschemaids = GetPublicationSchemas(pubform->oid); + List *oldschemaids = GetPublicationSchemas(pubform->oid, objectType); List *delschemas = NIL; /* Identify which schemas should be dropped */ @@ -1133,13 +1270,13 @@ AlterPublicationSchemas(AlterPublicationStmt *stmt, LockSchemaList(delschemas); /* And drop them */ - PublicationDropSchemas(pubform->oid, delschemas, true); + PublicationDropSchemas(pubform->oid, delschemas, objectType, true); /* * Don't bother calculating the difference for adding, we'll catch and * skip existing ones when doing catalog update. */ - PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt); + PublicationAddSchemas(pubform->oid, schemaidlist, objectType, true, stmt); } } @@ -1149,12 +1286,13 @@ AlterPublicationSchemas(AlterPublicationStmt *stmt, */ static void CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup, - List *tables, List *schemaidlist) + List *tables, List *tables_schemaidlist, + List *sequences, List *sequences_schemaidlist) { Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup); if ((stmt->action == AP_AddObjects || stmt->action == AP_SetObjects) && - schemaidlist && !superuser()) + (tables_schemaidlist || sequences_schemaidlist) && !superuser()) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("must be superuser to add or set schemas"))); @@ -1163,13 +1301,24 @@ CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup, * Check that user is allowed to manipulate the publication tables in * schema */ - if (schemaidlist && pubform->puballtables) + if (tables_schemaidlist && pubform->puballtables) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("publication \"%s\" is defined as FOR ALL TABLES", NameStr(pubform->pubname)), errdetail("Tables from schema cannot be added to, dropped from, or set on FOR ALL TABLES publications."))); + /* + * Check that user is allowed to manipulate the publication sequences in + * schema + */ + if (sequences_schemaidlist && pubform->puballsequences) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("publication \"%s\" is defined as FOR ALL SEQUENCES", + NameStr(pubform->pubname)), + errdetail("Sequences from schema cannot be added to, dropped from, or set on FOR ALL SEQUENCES publications."))); + /* Check that user is allowed to manipulate the publication tables. */ if (tables && pubform->puballtables) ereport(ERROR, @@ -1177,6 +1326,107 @@ CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup, errmsg("publication \"%s\" is defined as FOR ALL TABLES", NameStr(pubform->pubname)), errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications."))); + + /* Check that user is allowed to manipulate the publication tables. */ + if (sequences && pubform->puballsequences) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("publication \"%s\" is defined as FOR ALL SEQUENCES", + NameStr(pubform->pubname)), + errdetail("Sequences cannot be added to or dropped from FOR ALL SEQUENCES publications."))); +} + +/* + * Add or remove sequence to/from publication. + */ +static void +AlterPublicationSequences(AlterPublicationStmt *stmt, HeapTuple tup, + List *sequences, List *schemaidlist) +{ + List *rels = NIL; + Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup); + Oid pubid = pubform->oid; + + /* + * It is quite possible that for the SET case user has not specified any + * tables in which case we need to remove all the existing tables. + */ + if (!sequences && stmt->action != AP_SetObjects) + return; + + rels = OpenRelationList(sequences, PUB_OBJTYPE_SEQUENCE); + + if (stmt->action == AP_AddObjects) + { + List *schemas = NIL; + + /* + * Check if the relation is member of the existing schema in the + * publication or member of the schema list specified. + */ + schemas = list_concat_copy(schemaidlist, + GetPublicationSchemas(pubid, + PUB_OBJTYPE_SEQUENCE)); + CheckObjSchemaNotAlreadyInPublication(rels, schemas, + PUBLICATIONOBJ_SEQUENCE); + PublicationAddRelations(pubid, rels, false, stmt); + } + else if (stmt->action == AP_DropObjects) + PublicationDropRelations(pubid, rels, false); + else /* DEFELEM_SET */ + { + List *oldrelids = GetPublicationRelations(pubid, + PUB_OBJTYPE_SEQUENCE, + PUBLICATION_PART_ROOT); + List *delrels = NIL; + ListCell *oldlc; + + CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist, + PUBLICATIONOBJ_SEQUENCE); + + /* Calculate which relations to drop. */ + foreach(oldlc, oldrelids) + { + Oid oldrelid = lfirst_oid(oldlc); + ListCell *newlc; + PublicationRelInfo *oldrel; + bool found = false; + + foreach(newlc, rels) + { + PublicationRelInfo *newpubrel; + + newpubrel = (PublicationRelInfo *) lfirst(newlc); + if (RelationGetRelid(newpubrel->relation) == oldrelid) + { + found = true; + break; + } + } + /* Not yet in the list, open it and add to the list */ + if (!found) + { + oldrel = palloc(sizeof(PublicationRelInfo)); + oldrel->whereClause = NULL; + oldrel->relation = table_open(oldrelid, + ShareUpdateExclusiveLock); + delrels = lappend(delrels, oldrel); + } + } + + /* And drop them. */ + PublicationDropRelations(pubid, delrels, true); + + /* + * Don't bother calculating the difference for adding, we'll catch and + * skip existing ones when doing catalog update. + */ + PublicationAddRelations(pubid, rels, true, stmt); + + CloseRelationList(delrels); + } + + CloseRelationList(rels); } /* @@ -1214,14 +1464,22 @@ AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt) AlterPublicationOptions(pstate, stmt, rel, tup); else { - List *relations = NIL; + List *tables = NIL; + List *sequences = NIL; + List *tables_schemaidlist = NIL; + List *sequences_schemaidlist = NIL; List *schemaidlist = NIL; Oid pubid = pubform->oid; - ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations, + ObjectsInPublicationToOids(stmt->pubobjects, pstate, + &tables, &sequences, + &tables_schemaidlist, + &sequences_schemaidlist, &schemaidlist); - CheckAlterPublication(stmt, tup, relations, schemaidlist); + CheckAlterPublication(stmt, tup, + tables, tables_schemaidlist, + sequences, sequences_schemaidlist); heap_freetuple(tup); @@ -1249,9 +1507,16 @@ AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt) errmsg("publication \"%s\" does not exist", stmt->pubname)); - AlterPublicationTables(stmt, tup, relations, schemaidlist, + AlterPublicationTables(stmt, tup, tables, tables_schemaidlist, pstate->p_sourcetext); - AlterPublicationSchemas(stmt, tup, schemaidlist); + + AlterPublicationSequences(stmt, tup, sequences, sequences_schemaidlist); + + AlterPublicationSchemas(stmt, tup, tables_schemaidlist, + PUB_OBJTYPE_TABLE); + + AlterPublicationSchemas(stmt, tup, sequences_schemaidlist, + PUB_OBJTYPE_SEQUENCE); } /* Cleanup. */ @@ -1319,7 +1584,7 @@ RemovePublicationById(Oid pubid) pubform = (Form_pg_publication) GETSTRUCT(tup); /* Invalidate relcache so that publication info is rebuilt. */ - if (pubform->puballtables) + if (pubform->puballtables || pubform->puballsequences) CacheInvalidateRelcacheAll(); CatalogTupleDelete(rel, &tup->t_self); @@ -1355,6 +1620,7 @@ RemovePublicationSchemaById(Oid psoid) * partitions. */ schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid, + pubsch->pntype, PUBLICATION_PART_ALL); InvalidatePublicationRels(schemaRels); @@ -1397,29 +1663,45 @@ OpenRelIdList(List *relids) * add them to a publication. */ static List * -OpenTableList(List *tables) +OpenRelationList(List *rels, char objectType) { List *relids = NIL; - List *rels = NIL; + List *result = NIL; ListCell *lc; List *relids_with_rf = NIL; /* * Open, share-lock, and check all the explicitly-specified relations */ - foreach(lc, tables) + foreach(lc, rels) { PublicationTable *t = lfirst_node(PublicationTable, lc); bool recurse = t->relation->inh; Relation rel; Oid myrelid; PublicationRelInfo *pub_rel; + char myrelkind; /* Allow query cancel in case this takes a long time */ CHECK_FOR_INTERRUPTS(); rel = table_openrv(t->relation, ShareUpdateExclusiveLock); myrelid = RelationGetRelid(rel); + myrelkind = get_rel_relkind(myrelid); + + /* + * Make sure the relkind matches the expected object type. This may + * happen e.g. when adding a sequence using ADD TABLE or a table + * using ADD SEQUENCE). + * + * XXX We let through unsupported object types (views etc.). Those + * will be caught later in check_publication_add_relation. + */ + if (pub_get_object_type_for_relkind(myrelkind) != PUB_OBJTYPE_UNSUPPORTED && + pub_get_object_type_for_relkind(myrelkind) != objectType) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("object type does not match type expected by command")); /* * Filter out duplicates if user specifies "foo, foo". @@ -1444,7 +1726,7 @@ OpenTableList(List *tables) pub_rel = palloc(sizeof(PublicationRelInfo)); pub_rel->relation = rel; pub_rel->whereClause = t->whereClause; - rels = lappend(rels, pub_rel); + result = lappend(result, pub_rel); relids = lappend_oid(relids, myrelid); if (t->whereClause) @@ -1498,7 +1780,7 @@ OpenTableList(List *tables) pub_rel->relation = rel; /* child inherits WHERE clause from parent */ pub_rel->whereClause = t->whereClause; - rels = lappend(rels, pub_rel); + result = lappend(result, pub_rel); relids = lappend_oid(relids, childrelid); if (t->whereClause) @@ -1510,14 +1792,14 @@ OpenTableList(List *tables) list_free(relids); list_free(relids_with_rf); - return rels; + return result; } /* * Close all relations in the list. */ static void -CloseTableList(List *rels) +CloseRelationList(List *rels) { ListCell *lc; @@ -1565,12 +1847,12 @@ LockSchemaList(List *schemalist) * Add listed tables to the publication. */ static void -PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, +PublicationAddRelations(Oid pubid, List *rels, bool if_not_exists, AlterPublicationStmt *stmt) { ListCell *lc; - Assert(!stmt || !stmt->for_all_tables); + Assert(!stmt || !stmt->for_all_objects); foreach(lc, rels) { @@ -1599,7 +1881,7 @@ PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, * Remove listed tables from the publication. */ static void -PublicationDropTables(Oid pubid, List *rels, bool missing_ok) +PublicationDropRelations(Oid pubid, List *rels, bool missing_ok) { ObjectAddress obj; ListCell *lc; @@ -1639,19 +1921,19 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok) * Add listed schemas to the publication. */ static void -PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists, - AlterPublicationStmt *stmt) +PublicationAddSchemas(Oid pubid, List *schemas, char objectType, + bool if_not_exists, AlterPublicationStmt *stmt) { ListCell *lc; - Assert(!stmt || !stmt->for_all_tables); + Assert(!stmt || !stmt->for_all_objects); foreach(lc, schemas) { Oid schemaid = lfirst_oid(lc); ObjectAddress obj; - obj = publication_add_schema(pubid, schemaid, if_not_exists); + obj = publication_add_schema(pubid, schemaid, objectType, if_not_exists); if (stmt) { EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress, @@ -1667,7 +1949,7 @@ PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists, * Remove listed schemas from the publication. */ static void -PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok) +PublicationDropSchemas(Oid pubid, List *schemas, char objectType, bool missing_ok) { ObjectAddress obj; ListCell *lc; @@ -1677,10 +1959,11 @@ PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok) { Oid schemaid = lfirst_oid(lc); - psid = GetSysCacheOid2(PUBLICATIONNAMESPACEMAP, + psid = GetSysCacheOid3(PUBLICATIONNAMESPACEMAP, Anum_pg_publication_namespace_oid, ObjectIdGetDatum(schemaid), - ObjectIdGetDatum(pubid)); + ObjectIdGetDatum(pubid), + CharGetDatum(objectType)); if (!OidIsValid(psid)) { if (missing_ok) @@ -1735,6 +2018,13 @@ AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId) NameStr(form->pubname)), errhint("The owner of a FOR ALL TABLES publication must be a superuser."))); + if (form->puballsequences && !superuser_arg(newOwnerId)) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("permission denied to change owner of publication \"%s\"", + NameStr(form->pubname)), + errhint("The owner of a FOR ALL SEQUENCES publication must be a superuser."))); + if (!superuser_arg(newOwnerId) && is_schema_publication(form->oid)) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c index c13cada3bf1..717bb0b2aa9 100644 --- a/src/backend/commands/sequence.c +++ b/src/backend/commands/sequence.c @@ -337,6 +337,160 @@ ResetSequence(Oid seq_relid) } /* + * Update the sequence state by modifying the existing sequence data row. + * + * This keeps the same relfilenode, so the behavior is non-transactional. + */ +static void +SetSequence_non_transactional(Oid seqrelid, int64 last_value, int64 log_cnt, bool is_called) +{ + SeqTable elm; + Relation seqrel; + Buffer buf; + HeapTupleData seqdatatuple; + Form_pg_sequence_data seq; + + /* open and lock sequence */ + init_sequence(seqrelid, &elm, &seqrel); + + /* lock page' buffer and read tuple */ + seq = read_seq_tuple(seqrel, &buf, &seqdatatuple); + + /* check the comment above nextval_internal()'s equivalent call. */ + if (RelationNeedsWAL(seqrel)) + { + GetTopTransactionId(); + + if (XLogLogicalInfoActive()) + GetCurrentTransactionId(); + } + + /* ready to change the on-disk (or really, in-buffer) tuple */ + START_CRIT_SECTION(); + + seq->last_value = last_value; + seq->is_called = is_called; + seq->log_cnt = log_cnt; + + MarkBufferDirty(buf); + + /* XLOG stuff */ + if (RelationNeedsWAL(seqrel)) + { + xl_seq_rec xlrec; + XLogRecPtr recptr; + Page page = BufferGetPage(buf); + + XLogBeginInsert(); + XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT); + + xlrec.node = seqrel->rd_node; + xlrec.created = false; + + XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec)); + XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len); + + recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG); + + PageSetLSN(page, recptr); + } + + END_CRIT_SECTION(); + + UnlockReleaseBuffer(buf); + + /* Clear local cache so that we don't think we have cached numbers */ + /* Note that we do not change the currval() state */ + elm->cached = elm->last; + + relation_close(seqrel, NoLock); +} + +/* + * Update the sequence state by creating a new relfilenode. + * + * This creates a new relfilenode, to allow transactional behavior. + */ +static void +SetSequence_transactional(Oid seq_relid, int64 last_value, int64 log_cnt, bool is_called) +{ + SeqTable elm; + Relation seqrel; + Buffer buf; + HeapTupleData seqdatatuple; + Form_pg_sequence_data seq; + HeapTuple tuple; + + /* open and lock sequence */ + init_sequence(seq_relid, &elm, &seqrel); + + /* lock page' buffer and read tuple */ + seq = read_seq_tuple(seqrel, &buf, &seqdatatuple); + + /* Copy the existing sequence tuple. */ + tuple = heap_copytuple(&seqdatatuple); + + /* Now we're done with the old page */ + UnlockReleaseBuffer(buf); + + /* + * Modify the copied tuple to update the sequence state (similar to what + * ResetSequence does). + */ + seq = (Form_pg_sequence_data) GETSTRUCT(tuple); + seq->last_value = last_value; + seq->is_called = is_called; + seq->log_cnt = log_cnt; + + /* + * Create a new storage file for the sequence - this is needed for the + * transactional behavior. + */ + RelationSetNewRelfilenode(seqrel, seqrel->rd_rel->relpersistence); + + /* + * Ensure sequence's relfrozenxid is at 0, since it won't contain any + * unfrozen XIDs. Same with relminmxid, since a sequence will never + * contain multixacts. + */ + Assert(seqrel->rd_rel->relfrozenxid == InvalidTransactionId); + Assert(seqrel->rd_rel->relminmxid == InvalidMultiXactId); + + /* + * Insert the modified tuple into the new storage file. This does all the + * necessary WAL-logging etc. + */ + fill_seq_with_data(seqrel, tuple); + + /* Clear local cache so that we don't think we have cached numbers */ + /* Note that we do not change the currval() state */ + elm->cached = elm->last; + + relation_close(seqrel, NoLock); +} + +/* + * Set a sequence to a specified internal state. + * + * The change is made transactionally, so that on failure of the current + * transaction, the sequence will be restored to its previous state. + * We do that by creating a whole new relfilenode for the sequence; so this + * works much like the rewriting forms of ALTER TABLE. + * + * Caller is assumed to have acquired AccessExclusiveLock on the sequence, + * which must not be released until end of transaction. Caller is also + * responsible for permissions checking. + */ +void +SetSequence(Oid seq_relid, bool transactional, int64 last_value, int64 log_cnt, bool is_called) +{ + if (transactional) + SetSequence_transactional(seq_relid, last_value, log_cnt, is_called); + else + SetSequence_non_transactional(seq_relid, last_value, log_cnt, is_called); +} + +/* * Initialize a sequence's relation with the specified tuple as content */ static void diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index e16f04626de..abebffdf3bb 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -90,6 +90,7 @@ typedef struct SubOpts } SubOpts; static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); +static List *fetch_sequence_list(WalReceiverConn *wrconn, List *publications); static void check_duplicates_in_publist(List *publist, Datum *datums); static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname); static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err); @@ -541,9 +542,9 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, { char *err; WalReceiverConn *wrconn; - List *tables; + List *relations; ListCell *lc; - char table_state; + char sync_state; /* Try to connect to the publisher. */ wrconn = walrcv_connect(conninfo, true, stmt->subname, &err); @@ -558,14 +559,17 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, * Set sync state based on if we were asked to do data copy or * not. */ - table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY; + sync_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY; /* - * Get the table list from publisher and build local table status - * info. + * Get the table and sequence list from publisher and build + * local relation sync status info. */ - tables = fetch_table_list(wrconn, publications); - foreach(lc, tables) + relations = fetch_table_list(wrconn, publications); + relations = list_concat(relations, + fetch_sequence_list(wrconn, publications)); + + foreach(lc, relations) { RangeVar *rv = (RangeVar *) lfirst(lc); Oid relid; @@ -576,7 +580,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, CheckSubscriptionRelkind(get_rel_relkind(relid), rv->schemaname, rv->relname); - AddSubscriptionRelState(subid, relid, table_state, + AddSubscriptionRelState(subid, relid, sync_state, InvalidXLogRecPtr); } @@ -602,12 +606,12 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, * * Note that if tables were specified but copy_data is false * then it is safe to enable two_phase up-front because those - * tables are already initially in READY state. When the - * subscription has no tables, we leave the twophase state as - * PENDING, to allow ALTER SUBSCRIPTION ... REFRESH + * relations are already initially in READY state. When the + * subscription has no relations, we leave the twophase state + * as PENDING, to allow ALTER SUBSCRIPTION ... REFRESH * PUBLICATION to work. */ - if (opts.twophase && !opts.copy_data && tables != NIL) + if (opts.twophase && !opts.copy_data && relations != NIL) twophase_enabled = true; walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled, @@ -677,8 +681,10 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) PG_TRY(); { - /* Get the table list from publisher. */ + /* Get the list of relations from publisher. */ pubrel_names = fetch_table_list(wrconn, sub->publications); + pubrel_names = list_concat(pubrel_names, + fetch_sequence_list(wrconn, sub->publications)); /* Get local table list. */ subrel_states = GetSubscriptionRelations(sub->oid); @@ -1713,6 +1719,75 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) } /* + * Get the list of sequences which belong to specified publications on the + * publisher connection. + */ +static List * +fetch_sequence_list(WalReceiverConn *wrconn, List *publications) +{ + WalRcvExecResult *res; + StringInfoData cmd; + TupleTableSlot *slot; + Oid tableRow[2] = {TEXTOID, TEXTOID}; + ListCell *lc; + bool first; + List *tablelist = NIL; + + Assert(list_length(publications) > 0); + + initStringInfo(&cmd); + appendStringInfoString(&cmd, "SELECT DISTINCT s.schemaname, s.sequencename\n" + " FROM pg_catalog.pg_publication_sequences s\n" + " WHERE s.pubname IN ("); + first = true; + foreach(lc, publications) + { + char *pubname = strVal(lfirst(lc)); + + if (first) + first = false; + else + appendStringInfoString(&cmd, ", "); + + appendStringInfoString(&cmd, quote_literal_cstr(pubname)); + } + appendStringInfoChar(&cmd, ')'); + + res = walrcv_exec(wrconn, cmd.data, 2, tableRow); + pfree(cmd.data); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errmsg("could not receive list of replicated sequences from the publisher: %s", + res->err))); + + /* Process sequences. */ + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + char *nspname; + char *relname; + bool isnull; + RangeVar *rv; + + nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull)); + Assert(!isnull); + + rv = makeRangeVar(nspname, relname, -1); + tablelist = lappend(tablelist, rv); + + ExecClearTuple(slot); + } + ExecDropSingleTupleTableSlot(slot); + + walrcv_clear_result(res); + + return tablelist; +} + +/* * This is to report the connection failure while dropping replication slots. * Here, we report the WARNING for all tablesync slots so that user can drop * them manually, if required. diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 80faae985e9..124b9961dc9 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -42,6 +42,7 @@ #include "catalog/pg_inherits.h" #include "catalog/pg_namespace.h" #include "catalog/pg_opclass.h" +#include "catalog/pg_publication_namespace.h" #include "catalog/pg_statistic_ext.h" #include "catalog/pg_tablespace.h" #include "catalog/pg_trigger.h" @@ -16381,11 +16382,14 @@ AlterTableNamespace(AlterObjectSchemaStmt *stmt, Oid *oldschema) * Check that setting the relation to a different schema won't result in a * publication having both a schema and the same schema's table, as this * is not supported. + * + * XXX We do this for tables and sequences, but it's better to keep the two + * blocks separate, to make the strings easier to translate. */ if (stmt->objectType == OBJECT_TABLE) { ListCell *lc; - List *schemaPubids = GetSchemaPublications(nspOid); + List *schemaPubids = GetSchemaPublications(nspOid, PUB_OBJTYPE_TABLE); List *relPubids = GetRelationPublications(RelationGetRelid(rel)); foreach(lc, relPubids) @@ -16403,6 +16407,27 @@ AlterTableNamespace(AlterObjectSchemaStmt *stmt, Oid *oldschema) get_publication_name(pubid, false))); } } + else if (stmt->objectType == OBJECT_SEQUENCE) + { + ListCell *lc; + List *schemaPubids = GetSchemaPublications(nspOid, PUB_OBJTYPE_SEQUENCE); + List *relPubids = GetRelationPublications(RelationGetRelid(rel)); + + foreach(lc, relPubids) + { + Oid pubid = lfirst_oid(lc); + + if (list_member_oid(schemaPubids, pubid)) + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot move sequence \"%s\" to schema \"%s\"", + RelationGetRelationName(rel), stmt->newschema), + errdetail("The schema \"%s\" and same schema's sequence \"%s\" cannot be part of the same publication \"%s\".", + stmt->newschema, + RelationGetRelationName(rel), + get_publication_name(pubid, false))); + } + } /* common checks on switching namespaces */ CheckSetNamespace(oldNspOid, nspOid); diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c index 13328141e23..0df7cf58747 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -636,7 +636,9 @@ void CheckSubscriptionRelkind(char relkind, const char *nspname, const char *relname) { - if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE) + if (relkind != RELKIND_RELATION && + relkind != RELKIND_PARTITIONED_TABLE && + relkind != RELKIND_SEQUENCE) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot use relation \"%s.%s\" as logical replication target", diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index d4f8455a2bd..55f720a88f4 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -4862,7 +4862,7 @@ _copyCreatePublicationStmt(const CreatePublicationStmt *from) COPY_STRING_FIELD(pubname); COPY_NODE_FIELD(options); COPY_NODE_FIELD(pubobjects); - COPY_SCALAR_FIELD(for_all_tables); + COPY_NODE_FIELD(for_all_objects); return newnode; } @@ -4875,7 +4875,7 @@ _copyAlterPublicationStmt(const AlterPublicationStmt *from) COPY_STRING_FIELD(pubname); COPY_NODE_FIELD(options); COPY_NODE_FIELD(pubobjects); - COPY_SCALAR_FIELD(for_all_tables); + COPY_NODE_FIELD(for_all_objects); COPY_SCALAR_FIELD(action); return newnode; diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index f1002afe7a0..82562eb9b87 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -2333,7 +2333,7 @@ _equalCreatePublicationStmt(const CreatePublicationStmt *a, COMPARE_STRING_FIELD(pubname); COMPARE_NODE_FIELD(options); COMPARE_NODE_FIELD(pubobjects); - COMPARE_SCALAR_FIELD(for_all_tables); + COMPARE_NODE_FIELD(for_all_objects); return true; } @@ -2345,7 +2345,7 @@ _equalAlterPublicationStmt(const AlterPublicationStmt *a, COMPARE_STRING_FIELD(pubname); COMPARE_NODE_FIELD(options); COMPARE_NODE_FIELD(pubobjects); - COMPARE_SCALAR_FIELD(for_all_tables); + COMPARE_NODE_FIELD(for_all_objects); COMPARE_SCALAR_FIELD(action); return true; diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 0036c2f9e2d..e327bc735fb 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -446,7 +446,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); transform_element_list transform_type_list TriggerTransitions TriggerReferencing vacuum_relation_list opt_vacuum_relation_list - drop_option_list pub_obj_list + drop_option_list pub_obj_list pub_obj_type_list %type <node> opt_routine_body %type <groupclause> group_clause @@ -575,6 +575,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type <node> var_value zone_value %type <rolespec> auth_ident RoleSpec opt_granted_by %type <publicationobjectspec> PublicationObjSpec +%type <node> pub_obj_type %type <keyword> unreserved_keyword type_func_name_keyword %type <keyword> col_name_keyword reserved_keyword @@ -9701,12 +9702,9 @@ AlterOwnerStmt: ALTER AGGREGATE aggregate_with_argtypes OWNER TO RoleSpec * * CREATE PUBLICATION FOR ALL TABLES [WITH options] * - * CREATE PUBLICATION FOR pub_obj [, ...] [WITH options] - * - * pub_obj is one of: + * CREATE PUBLICATION FOR ALL SEQUENCES [WITH options] * - * TABLE table [, ...] - * ALL TABLES IN SCHEMA schema [, ...] + * CREATE PUBLICATION FOR pub_obj [, ...] [WITH options] * *****************************************************************************/ @@ -9718,12 +9716,12 @@ CreatePublicationStmt: n->options = $4; $$ = (Node *)n; } - | CREATE PUBLICATION name FOR ALL TABLES opt_definition + | CREATE PUBLICATION name FOR ALL pub_obj_type_list opt_definition { CreatePublicationStmt *n = makeNode(CreatePublicationStmt); n->pubname = $3; n->options = $7; - n->for_all_tables = true; + n->for_all_objects = $6; $$ = (Node *)n; } | CREATE PUBLICATION name FOR pub_obj_list opt_definition @@ -9772,6 +9770,26 @@ PublicationObjSpec: $$->pubobjtype = PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA; $$->location = @5; } + | SEQUENCE relation_expr + { + $$ = makeNode(PublicationObjSpec); + $$->pubobjtype = PUBLICATIONOBJ_SEQUENCE; + $$->pubtable = makeNode(PublicationTable); + $$->pubtable->relation = $2; + } + | ALL SEQUENCES IN_P SCHEMA ColId + { + $$ = makeNode(PublicationObjSpec); + $$->pubobjtype = PUBLICATIONOBJ_SEQUENCES_IN_SCHEMA; + $$->name = $5; + $$->location = @5; + } + | ALL SEQUENCES IN_P SCHEMA CURRENT_SCHEMA + { + $$ = makeNode(PublicationObjSpec); + $$->pubobjtype = PUBLICATIONOBJ_SEQUENCES_IN_CUR_SCHEMA; + $$->location = @5; + } | ColId OptWhereClause { $$ = makeNode(PublicationObjSpec); @@ -9826,6 +9844,19 @@ pub_obj_list: PublicationObjSpec { $$ = lappend($1, $3); } ; +pub_obj_type: TABLES + { $$ = (Node *) makeString("tables"); } + | SEQUENCES + { $$ = (Node *) makeString("sequences"); } + ; + +pub_obj_type_list: pub_obj_type + { $$ = list_make1($1); } + | pub_obj_type_list ',' pub_obj_type + { $$ = lappend($1, $3); } + ; + + /***************************************************************************** * * ALTER PUBLICATION name SET ( options ) @@ -9836,11 +9867,6 @@ pub_obj_list: PublicationObjSpec * * ALTER PUBLICATION name SET pub_obj [, ...] * - * pub_obj is one of: - * - * TABLE table_name [, ...] - * ALL TABLES IN SCHEMA schema_name [, ...] - * *****************************************************************************/ AlterPublicationStmt: diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index c9b0eeefd7e..3dbe85d61a2 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -649,6 +649,56 @@ logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, } /* + * Write SEQUENCE to stream + */ +void +logicalrep_write_sequence(StringInfo out, Relation rel, TransactionId xid, + XLogRecPtr lsn, bool transactional, + int64 last_value, int64 log_cnt, bool is_called) +{ + uint8 flags = 0; + char *relname; + + pq_sendbyte(out, LOGICAL_REP_MSG_SEQUENCE); + + /* transaction ID (if not valid, we're not streaming) */ + if (TransactionIdIsValid(xid)) + pq_sendint32(out, xid); + + pq_sendint8(out, flags); + pq_sendint64(out, lsn); + + logicalrep_write_namespace(out, RelationGetNamespace(rel)); + relname = RelationGetRelationName(rel); + pq_sendstring(out, relname); + + pq_sendint8(out, transactional); + pq_sendint64(out, last_value); + pq_sendint64(out, log_cnt); + pq_sendint8(out, is_called); +} + +/* + * Read SEQUENCE from the stream. + */ +void +logicalrep_read_sequence(StringInfo in, LogicalRepSequence *seqdata) +{ + /* XXX skipping flags and lsn */ + pq_getmsgint(in, 1); + pq_getmsgint64(in); + + /* Read relation name from stream */ + seqdata->nspname = pstrdup(logicalrep_read_namespace(in)); + seqdata->seqname = pstrdup(pq_getmsgstring(in)); + + seqdata->transactional = pq_getmsgint(in, 1); + seqdata->last_value = pq_getmsgint64(in); + seqdata->log_cnt = pq_getmsgint64(in); + seqdata->is_called = pq_getmsgint(in, 1); +} + +/* * Write relation description to the output stream. */ void @@ -1203,6 +1253,8 @@ logicalrep_message_type(LogicalRepMsgType action) return "STREAM ABORT"; case LOGICAL_REP_MSG_STREAM_PREPARE: return "STREAM PREPARE"; + case LOGICAL_REP_MSG_SEQUENCE: + return "SEQUENCE"; } elog(ERROR, "invalid logical replication message type \"%c\"", action); diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 1659964571c..d8b12d94bc3 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -100,6 +100,7 @@ #include "catalog/pg_subscription_rel.h" #include "catalog/pg_type.h" #include "commands/copy.h" +#include "commands/sequence.h" #include "miscadmin.h" #include "parser/parse_relation.h" #include "pgstat.h" @@ -1000,6 +1001,95 @@ copy_table(Relation rel) } /* + * Fetch sequence data (current state) from the remote node. + */ +static void +fetch_sequence_data(char *nspname, char *relname, + int64 *last_value, int64 *log_cnt, bool *is_called) +{ + WalRcvExecResult *res; + StringInfoData cmd; + TupleTableSlot *slot; + Oid tableRow[3] = {INT8OID, INT8OID, BOOLOID}; + + initStringInfo(&cmd); + appendStringInfo(&cmd, "SELECT last_value, log_cnt, is_called\n" + " FROM %s", quote_qualified_identifier(nspname, relname)); + + res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 3, tableRow); + pfree(cmd.data); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errmsg("could not receive list of replicated tables from the publisher: %s", + res->err))); + + /* Process the sequence. */ + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + bool isnull; + + *last_value = DatumGetInt64(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + + *log_cnt = DatumGetInt64(slot_getattr(slot, 2, &isnull)); + Assert(!isnull); + + *is_called = DatumGetBool(slot_getattr(slot, 3, &isnull)); + Assert(!isnull); + + ExecClearTuple(slot); + } + ExecDropSingleTupleTableSlot(slot); + + walrcv_clear_result(res); +} + +/* + * Copy existing data of a sequence from publisher. + * + * Caller is responsible for locking the local relation. + */ +static void +copy_sequence(Relation rel) +{ + LogicalRepRelMapEntry *relmapentry; + LogicalRepRelation lrel; + List *qual = NIL; + StringInfoData cmd; + int64 last_value = 0, + log_cnt = 0; + bool is_called = 0; + + /* Get the publisher relation info. */ + fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)), + RelationGetRelationName(rel), &lrel, &qual); + + /* sequences don't have row filters */ + Assert(!qual); + + /* Put the relation into relmap. */ + logicalrep_relmap_update(&lrel); + + /* Map the publisher relation to local one. */ + relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock); + Assert(rel == relmapentry->localrel); + + /* Start copy on the publisher. */ + initStringInfo(&cmd); + + Assert(lrel.relkind == RELKIND_SEQUENCE); + + fetch_sequence_data(lrel.nspname, lrel.relname, &last_value, &log_cnt, &is_called); + + /* tablesync sets the sequences in non-transactional way */ + SetSequence(RelationGetRelid(rel), false, last_value, log_cnt, is_called); + + logicalrep_rel_close(relmapentry, NoLock); +} + +/* * Determine the tablesync slot name. * * The name must not exceed NAMEDATALEN - 1 because of remote node constraints @@ -1260,10 +1350,21 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) originname))); } - /* Now do the initial data copy */ - PushActiveSnapshot(GetTransactionSnapshot()); - copy_table(rel); - PopActiveSnapshot(); + /* Do the right action depending on the relation kind. */ + if (get_rel_relkind(RelationGetRelid(rel)) == RELKIND_SEQUENCE) + { + /* Now do the initial sequence copy */ + PushActiveSnapshot(GetTransactionSnapshot()); + copy_sequence(rel); + PopActiveSnapshot(); + } + else + { + /* Now do the initial data copy */ + PushActiveSnapshot(GetTransactionSnapshot()); + copy_table(rel); + PopActiveSnapshot(); + } res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL); if (res->status != WALRCV_OK_COMMAND) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 82dcffc2db8..f3868b3e1f8 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -143,6 +143,7 @@ #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" #include "catalog/pg_tablespace.h" +#include "commands/sequence.h" #include "commands/tablecmds.h" #include "commands/tablespace.h" #include "commands/trigger.h" @@ -1144,6 +1145,57 @@ apply_handle_origin(StringInfo s) } /* + * Handle SEQUENCE message. + */ +static void +apply_handle_sequence(StringInfo s) +{ + LogicalRepSequence seq; + Oid relid; + + if (handle_streamed_transaction(LOGICAL_REP_MSG_SEQUENCE, s)) + return; + + logicalrep_read_sequence(s, &seq); + + /* + * Non-transactional sequence updates should not be part of a remote + * transaction. There should not be any running transaction. + */ + Assert((!seq.transactional) || in_remote_transaction); + Assert(!(!seq.transactional && in_remote_transaction)); + Assert(!(!seq.transactional && IsTransactionState())); + + /* + * Make sure we're in a transaction (needed by SetSequence). For + * non-transactional updates we're guaranteed to start a new one, + * and we'll commit it at the end. + */ + if (!IsTransactionState()) + { + StartTransactionCommand(); + maybe_reread_subscription(); + } + + relid = RangeVarGetRelid(makeRangeVar(seq.nspname, + seq.seqname, -1), + RowExclusiveLock, false); + + /* lock the sequence in AccessExclusiveLock, as expected by SetSequence */ + LockRelationOid(relid, AccessExclusiveLock); + + /* apply the sequence change */ + SetSequence(relid, seq.transactional, seq.last_value, seq.log_cnt, seq.is_called); + + /* + * Commit the per-stream transaction (we only do this when not in + * remote transaction, i.e. for non-transactional sequence updates. + */ + if (!in_remote_transaction) + CommitTransactionCommand(); +} + +/* * Handle STREAM START message. */ static void @@ -2511,6 +2563,10 @@ apply_dispatch(StringInfo s) */ break; + case LOGICAL_REP_MSG_SEQUENCE: + apply_handle_sequence(s); + return; + case LOGICAL_REP_MSG_STREAM_START: apply_handle_stream_start(s); break; diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 5fddab3a3d4..4cdc698cbb3 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -15,6 +15,7 @@ #include "access/tupconvert.h" #include "catalog/partition.h" #include "catalog/pg_publication.h" +#include "catalog/pg_publication_namespace.h" #include "catalog/pg_publication_rel.h" #include "commands/defrem.h" #include "executor/executor.h" @@ -53,6 +54,10 @@ static void pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message); +static void pgoutput_sequence(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr sequence_lsn, + Relation relation, bool transactional, + int64 last_value, int64 log_cnt, bool is_called); static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id); static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, @@ -208,6 +213,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->change_cb = pgoutput_change; cb->truncate_cb = pgoutput_truncate; cb->message_cb = pgoutput_message; + cb->sequence_cb = pgoutput_sequence; cb->commit_cb = pgoutput_commit_txn; cb->begin_prepare_cb = pgoutput_begin_prepare_txn; @@ -224,6 +230,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->stream_commit_cb = pgoutput_stream_commit; cb->stream_change_cb = pgoutput_change; cb->stream_message_cb = pgoutput_message; + cb->stream_sequence_cb = pgoutput_sequence; cb->stream_truncate_cb = pgoutput_truncate; /* transaction streaming - two-phase commit */ cb->stream_prepare_cb = pgoutput_stream_prepare_txn; @@ -237,6 +244,7 @@ parse_output_parameters(List *options, PGOutputData *data) bool publication_names_given = false; bool binary_option_given = false; bool messages_option_given = false; + bool sequences_option_given = false; bool streaming_given = false; bool two_phase_option_given = false; @@ -244,6 +252,7 @@ parse_output_parameters(List *options, PGOutputData *data) data->streaming = false; data->messages = false; data->two_phase = false; + data->sequences = true; foreach(lc, options) { @@ -312,6 +321,16 @@ parse_output_parameters(List *options, PGOutputData *data) data->messages = defGetBoolean(defel); } + else if (strcmp(defel->defname, "sequences") == 0) + { + if (sequences_option_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + sequences_option_given = true; + + data->sequences = defGetBoolean(defel); + } else if (strcmp(defel->defname, "streaming") == 0) { if (streaming_given) @@ -1440,6 +1459,51 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, OutputPluginWrite(ctx, true); } +static void +pgoutput_sequence(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr sequence_lsn, + Relation relation, bool transactional, + int64 last_value, int64 log_cnt, bool is_called) +{ + PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; + TransactionId xid = InvalidTransactionId; + RelationSyncEntry *relentry; + + if (!data->sequences) + return; + + if (!is_publishable_relation(relation)) + return; + + /* + * Remember the xid for the message in streaming mode. See + * pgoutput_change. + */ + if (in_streaming) + xid = txn->xid; + + relentry = get_rel_sync_entry(data, relation); + + /* + * First check the sequence filter. + * + * We handle just REORDER_BUFFER_CHANGE_SEQUENCE here. + */ + if (!relentry->pubactions.pubsequence) + return; + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_sequence(ctx->out, + relation, + xid, + sequence_lsn, + transactional, + last_value, + log_cnt, + is_called); + OutputPluginWrite(ctx, true); +} + /* * Currently we always forward. */ @@ -1725,7 +1789,8 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) entry->schema_sent = false; entry->streamed_txns = NIL; entry->pubactions.pubinsert = entry->pubactions.pubupdate = - entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false; + entry->pubactions.pubdelete = entry->pubactions.pubtruncate = + entry->pubactions.pubsequence = false; entry->new_slot = NULL; entry->old_slot = NULL; memset(entry->exprstate, 0, sizeof(entry->exprstate)); @@ -1739,13 +1804,13 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) { Oid schemaId = get_rel_namespace(relid); List *pubids = GetRelationPublications(relid); - + char objectType = pub_get_object_type_for_relkind(get_rel_relkind(relid)); /* * We don't acquire a lock on the namespace system table as we build * the cache entry using a historic snapshot and all the later changes * are absorbed while decoding WAL. */ - List *schemaPubids = GetSchemaPublications(schemaId); + List *schemaPubids = GetSchemaPublications(schemaId, objectType); ListCell *lc; Oid publish_as_relid = relid; int publish_ancestor_level = 0; @@ -1780,6 +1845,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) entry->pubactions.pubupdate = false; entry->pubactions.pubdelete = false; entry->pubactions.pubtruncate = false; + entry->pubactions.pubsequence = false; /* * Tuple slots cleanups. (Will be rebuilt later if needed). @@ -1826,9 +1892,11 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) /* * If this is a FOR ALL TABLES publication, pick the partition root - * and set the ancestor level accordingly. + * and set the ancestor level accordingly. If this is a FOR ALL + * SEQUENCES publication, we publish it too but we don't need to + * pick the partition root etc. */ - if (pub->alltables) + if (pub->alltables || pub->allsequences) { publish = true; if (pub->pubviaroot && am_partition) @@ -1889,6 +1957,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) entry->pubactions.pubupdate |= pub->pubactions.pubupdate; entry->pubactions.pubdelete |= pub->pubactions.pubdelete; entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate; + entry->pubactions.pubsequence |= pub->pubactions.pubsequence; /* * We want to publish the changes as the top-most ancestor diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c index 3d05297b0d9..4f3fe1118a2 100644 --- a/src/backend/utils/cache/relcache.c +++ b/src/backend/utils/cache/relcache.c @@ -56,6 +56,7 @@ #include "catalog/pg_opclass.h" #include "catalog/pg_proc.h" #include "catalog/pg_publication.h" +#include "catalog/pg_publication_namespace.h" #include "catalog/pg_rewrite.h" #include "catalog/pg_shseclabel.h" #include "catalog/pg_statistic_ext.h" @@ -5562,6 +5563,8 @@ RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc) Oid schemaid; List *ancestors = NIL; Oid relid = RelationGetRelid(relation); + char relkind = relation->rd_rel->relkind; + char objType; /* * If not publishable, it publishes no actions. (pgoutput_change() will @@ -5588,8 +5591,15 @@ RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc) /* Fetch the publication membership info. */ puboids = GetRelationPublications(relid); schemaid = RelationGetNamespace(relation); - puboids = list_concat_unique_oid(puboids, GetSchemaPublications(schemaid)); + objType = pub_get_object_type_for_relkind(relkind); + puboids = list_concat_unique_oid(puboids, + GetSchemaPublications(schemaid, objType)); + + /* + * If this is a partion (and thus a table), lookup all ancestors and track + * all publications them too. + */ if (relation->rd_rel->relispartition) { /* Add publications that the ancestors are in too. */ @@ -5601,12 +5611,23 @@ RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc) puboids = list_concat_unique_oid(puboids, GetRelationPublications(ancestor)); + + /* include all publications publishing schema of all ancestors */ schemaid = get_rel_namespace(ancestor); puboids = list_concat_unique_oid(puboids, - GetSchemaPublications(schemaid)); + GetSchemaPublications(schemaid, + PUB_OBJTYPE_TABLE)); } } - puboids = list_concat_unique_oid(puboids, GetAllTablesPublications()); + + /* + * Consider also FOR ALL TABLES and FOR ALL SEQUENCES publications, + * depending on the relkind of the relation. + */ + if (relation->rd_rel->relkind == RELKIND_SEQUENCE) + puboids = list_concat_unique_oid(puboids, GetAllSequencesPublications()); + else + puboids = list_concat_unique_oid(puboids, GetAllTablesPublications()); foreach(lc, puboids) { @@ -5625,6 +5646,7 @@ RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc) pubdesc->pubactions.pubupdate |= pubform->pubupdate; pubdesc->pubactions.pubdelete |= pubform->pubdelete; pubdesc->pubactions.pubtruncate |= pubform->pubtruncate; + pubdesc->pubactions.pubsequence |= pubform->pubsequence; /* * Check if all columns referenced in the filter expression are part of diff --git a/src/backend/utils/cache/syscache.c b/src/backend/utils/cache/syscache.c index f4e7819f1e2..a675877d195 100644 --- a/src/backend/utils/cache/syscache.c +++ b/src/backend/utils/cache/syscache.c @@ -630,12 +630,12 @@ static const struct cachedesc cacheinfo[] = { 64 }, {PublicationNamespaceRelationId, /* PUBLICATIONNAMESPACEMAP */ - PublicationNamespacePnnspidPnpubidIndexId, - 2, + PublicationNamespacePnnspidPnpubidPntypeIndexId, + 3, { Anum_pg_publication_namespace_pnnspid, Anum_pg_publication_namespace_pnpubid, - 0, + Anum_pg_publication_namespace_pntype, 0 }, 64 |