diff options
Diffstat (limited to 'src/backend')
-rw-r--r-- | src/backend/catalog/pg_publication.c | 211 | ||||
-rw-r--r-- | src/backend/commands/subscriptioncmds.c | 59 |
2 files changed, 189 insertions, 81 deletions
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index a98fcad421f..47637f28ab6 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -45,6 +45,14 @@ #include "utils/rel.h" #include "utils/syscache.h" +/* Records association between publication and published table */ +typedef struct +{ + Oid relid; /* OID of published table */ + Oid pubid; /* OID of publication that publishes this + * table. */ +} published_rel; + static void publication_translate_columns(Relation targetrel, List *columns, int *natts, AttrNumber **attrs); @@ -172,42 +180,57 @@ pg_relation_is_publishable(PG_FUNCTION_ARGS) } /* - * Filter out the partitions whose parent tables were also specified in - * the publication. + * Returns true if the ancestor is in the list of published relations. + * Otherwise, returns false. */ -static List * -filter_partitions(List *relids) +static bool +is_ancestor_member_tableinfos(Oid ancestor, List *table_infos) +{ + ListCell *lc; + + foreach(lc, table_infos) + { + Oid relid = ((published_rel *) lfirst(lc))->relid; + + if (relid == ancestor) + return true; + } + + return false; +} + +/* + * Filter out the partitions whose parent tables are also present in the list. + */ +static void +filter_partitions(List *table_infos) { - List *result = NIL; ListCell *lc; - ListCell *lc2; - foreach(lc, relids) + foreach(lc, table_infos) { bool skip = false; List *ancestors = NIL; - Oid relid = lfirst_oid(lc); + ListCell *lc2; + published_rel *table_info = (published_rel *) lfirst(lc); - if (get_rel_relispartition(relid)) - ancestors = get_partition_ancestors(relid); + if (get_rel_relispartition(table_info->relid)) + ancestors = get_partition_ancestors(table_info->relid); foreach(lc2, ancestors) { Oid ancestor = lfirst_oid(lc2); - /* Check if the parent table exists in the published table list. */ - if (list_member_oid(relids, ancestor)) + if (is_ancestor_member_tableinfos(ancestor, table_infos)) { skip = true; break; } } - if (!skip) - result = lappend_oid(result, relid); + if (skip) + table_infos = foreach_delete_current(table_infos, lc); } - - return result; } /* @@ -1026,22 +1049,27 @@ GetPublicationByName(const char *pubname, bool missing_ok) } /* - * Returns information of tables in a publication. + * Get information of the tables in the given publication array. + * + * Returns pubid, relid, column list, row filter for each table. */ Datum pg_get_publication_tables(PG_FUNCTION_ARGS) { -#define NUM_PUBLICATION_TABLES_ELEM 3 +#define NUM_PUBLICATION_TABLES_ELEM 4 FuncCallContext *funcctx; - char *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0)); - Publication *publication; - List *tables; + List *table_infos = NIL; /* stuff done only on the first call of the function */ if (SRF_IS_FIRSTCALL()) { TupleDesc tupdesc; MemoryContext oldcontext; + ArrayType *arr; + Datum *elems; + int nelems, + i; + bool viaroot = false; /* create a function context for cross-call persistence */ funcctx = SRF_FIRSTCALL_INIT(); @@ -1049,68 +1077,108 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) /* switch to memory context appropriate for multiple function calls */ oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); - publication = GetPublicationByName(pubname, false); - /* - * Publications support partitioned tables, although all changes are - * replicated using leaf partition identity and schema, so we only - * need those. + * Deconstruct the parameter into elements where each element is a + * publication name. */ - if (publication->alltables) - { - tables = GetAllTablesPublicationRelations(publication->pubviaroot); - } - else + arr = PG_GETARG_ARRAYTYPE_P(0); + deconstruct_array(arr, TEXTOID, -1, false, TYPALIGN_INT, + &elems, NULL, &nelems); + + /* Get Oids of tables from each publication. */ + for (i = 0; i < nelems; i++) { - List *relids, - *schemarelids; - - relids = GetPublicationRelations(publication->oid, - publication->pubviaroot ? - PUBLICATION_PART_ROOT : - PUBLICATION_PART_LEAF); - schemarelids = GetAllSchemaPublicationRelations(publication->oid, - publication->pubviaroot ? - PUBLICATION_PART_ROOT : - PUBLICATION_PART_LEAF); - tables = list_concat_unique_oid(relids, schemarelids); + Publication *pub_elem; + List *pub_elem_tables = NIL; + ListCell *lc; + + pub_elem = GetPublicationByName(TextDatumGetCString(elems[i]), false); /* - * If the publication publishes partition changes via their - * respective root partitioned tables, we must exclude partitions - * in favor of including the root partitioned tables. Otherwise, - * the function could return both the child and parent tables - * which could cause data of the child table to be - * double-published on the subscriber side. + * Publications support partitioned tables. If + * publish_via_partition_root is false, all changes are replicated + * using leaf partition identity and schema, so we only need + * those. Otherwise, get the partitioned table itself. */ - if (publication->pubviaroot) - tables = filter_partitions(tables); + if (pub_elem->alltables) + pub_elem_tables = GetAllTablesPublicationRelations(pub_elem->pubviaroot); + else + { + List *relids, + *schemarelids; + + relids = GetPublicationRelations(pub_elem->oid, + pub_elem->pubviaroot ? + PUBLICATION_PART_ROOT : + PUBLICATION_PART_LEAF); + schemarelids = GetAllSchemaPublicationRelations(pub_elem->oid, + pub_elem->pubviaroot ? + PUBLICATION_PART_ROOT : + PUBLICATION_PART_LEAF); + pub_elem_tables = list_concat_unique_oid(relids, schemarelids); + } + + /* + * Record the published table and the corresponding publication so + * that we can get row filters and column lists later. + * + * When a table is published by multiple publications, to obtain + * all row filters and column lists, the structure related to this + * table will be recorded multiple times. + */ + foreach(lc, pub_elem_tables) + { + published_rel *table_info = (published_rel *) palloc(sizeof(published_rel)); + + table_info->relid = lfirst_oid(lc); + table_info->pubid = pub_elem->oid; + table_infos = lappend(table_infos, table_info); + } + + /* At least one publication is using publish_via_partition_root. */ + if (pub_elem->pubviaroot) + viaroot = true; } + /* + * If the publication publishes partition changes via their respective + * root partitioned tables, we must exclude partitions in favor of + * including the root partitioned tables. Otherwise, the function + * could return both the child and parent tables which could cause + * data of the child table to be double-published on the subscriber + * side. + */ + if (viaroot) + filter_partitions(table_infos); + /* Construct a tuple descriptor for the result rows. */ tupdesc = CreateTemplateTupleDesc(NUM_PUBLICATION_TABLES_ELEM); - TupleDescInitEntry(tupdesc, (AttrNumber) 1, "relid", + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "pubid", OIDOID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 2, "attrs", + TupleDescInitEntry(tupdesc, (AttrNumber) 2, "relid", + OIDOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 3, "attrs", INT2VECTOROID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 3, "qual", + TupleDescInitEntry(tupdesc, (AttrNumber) 4, "qual", PG_NODE_TREEOID, -1, 0); funcctx->tuple_desc = BlessTupleDesc(tupdesc); - funcctx->user_fctx = (void *) tables; + funcctx->user_fctx = (void *) table_infos; MemoryContextSwitchTo(oldcontext); } /* stuff done on every call of the function */ funcctx = SRF_PERCALL_SETUP(); - tables = (List *) funcctx->user_fctx; + table_infos = (List *) funcctx->user_fctx; - if (funcctx->call_cntr < list_length(tables)) + if (funcctx->call_cntr < list_length(table_infos)) { HeapTuple pubtuple = NULL; HeapTuple rettuple; - Oid relid = list_nth_oid(tables, funcctx->call_cntr); + Publication *pub; + published_rel *table_info = (published_rel *) list_nth(table_infos, funcctx->call_cntr); + Oid relid = table_info->relid; Oid schemaid = get_rel_namespace(relid); Datum values[NUM_PUBLICATION_TABLES_ELEM] = {0}; bool nulls[NUM_PUBLICATION_TABLES_ELEM] = {0}; @@ -1119,42 +1187,43 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) * Form tuple with appropriate data. */ - publication = GetPublicationByName(pubname, false); + pub = GetPublication(table_info->pubid); - values[0] = ObjectIdGetDatum(relid); + values[0] = ObjectIdGetDatum(pub->oid); + values[1] = ObjectIdGetDatum(relid); /* * We don't consider row filters or column lists for FOR ALL TABLES or * FOR TABLES IN SCHEMA publications. */ - if (!publication->alltables && + if (!pub->alltables && !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP, ObjectIdGetDatum(schemaid), - ObjectIdGetDatum(publication->oid))) + ObjectIdGetDatum(pub->oid))) pubtuple = SearchSysCacheCopy2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid), - ObjectIdGetDatum(publication->oid)); + ObjectIdGetDatum(pub->oid)); if (HeapTupleIsValid(pubtuple)) { /* Lookup the column list attribute. */ - values[1] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple, + values[2] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple, Anum_pg_publication_rel_prattrs, - &(nulls[1])); + &(nulls[2])); /* Null indicates no filter. */ - values[2] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple, + values[3] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple, Anum_pg_publication_rel_prqual, - &(nulls[2])); + &(nulls[3])); } else { - nulls[1] = true; nulls[2] = true; + nulls[3] = true; } /* Show all columns when the column list is not specified. */ - if (nulls[1] == true) + if (nulls[2]) { Relation rel = table_open(relid, AccessShareLock); int nattnums = 0; @@ -1176,8 +1245,8 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) if (nattnums > 0) { - values[1] = PointerGetDatum(buildint2vector(attnums, nattnums)); - nulls[1] = false; + values[2] = PointerGetDatum(buildint2vector(attnums, nattnums)); + nulls[2] = false; } table_close(rel, AccessShareLock); diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 8a26ddab1c7..93a238412aa 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -1936,21 +1936,60 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) WalRcvExecResult *res; StringInfoData cmd; TupleTableSlot *slot; - Oid tableRow[3] = {TEXTOID, TEXTOID, NAMEARRAYOID}; + Oid tableRow[3] = {TEXTOID, TEXTOID, InvalidOid}; List *tablelist = NIL; - bool check_columnlist = (walrcv_server_version(wrconn) >= 150000); + int server_version = walrcv_server_version(wrconn); + bool check_columnlist = (server_version >= 150000); initStringInfo(&cmd); - appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename \n"); - /* Get column lists for each relation if the publisher supports it */ - if (check_columnlist) - appendStringInfoString(&cmd, ", t.attnames\n"); + /* Get the list of tables from the publisher. */ + if (server_version >= 160000) + { + StringInfoData pub_names; - appendStringInfoString(&cmd, "FROM pg_catalog.pg_publication_tables t\n" - " WHERE t.pubname IN ("); - get_publications_str(publications, &cmd, true); - appendStringInfoChar(&cmd, ')'); + tableRow[2] = INT2VECTOROID; + initStringInfo(&pub_names); + get_publications_str(publications, &pub_names, true); + + /* + * From version 16, we allowed passing multiple publications to the + * function pg_get_publication_tables. This helped to filter out the + * partition table whose ancestor is also published in this + * publication array. + * + * Join pg_get_publication_tables with pg_publication to exclude + * non-existing publications. + * + * Note that attrs are always stored in sorted order so we don't need + * to worry if different publications have specified them in a + * different order. See publication_translate_columns. + */ + appendStringInfo(&cmd, "SELECT DISTINCT n.nspname, c.relname, gpt.attrs\n" + " FROM pg_class c\n" + " JOIN pg_namespace n ON n.oid = c.relnamespace\n" + " JOIN ( SELECT (pg_get_publication_tables(VARIADIC array_agg(pubname::text))).*\n" + " FROM pg_publication\n" + " WHERE pubname IN ( %s )) AS gpt\n" + " ON gpt.relid = c.oid\n", + pub_names.data); + + pfree(pub_names.data); + } + else + { + tableRow[2] = NAMEARRAYOID; + appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename \n"); + + /* Get column lists for each relation if the publisher supports it */ + if (check_columnlist) + appendStringInfoString(&cmd, ", t.attnames\n"); + + appendStringInfoString(&cmd, "FROM pg_catalog.pg_publication_tables t\n" + " WHERE t.pubname IN ("); + get_publications_str(publications, &cmd, true); + appendStringInfoChar(&cmd, ')'); + } res = walrcv_exec(wrconn, cmd.data, check_columnlist ? 3 : 2, tableRow); pfree(cmd.data); |