diff options
author | Amit Kapila <akapila@postgresql.org> | 2023-03-29 10:46:58 +0530 |
---|---|---|
committer | Amit Kapila <akapila@postgresql.org> | 2023-03-29 10:46:58 +0530 |
commit | 062a8444242404242f7c2b814fed37b329639408 (patch) | |
tree | a797106924ef0b7ee1779b4341f20a78fc1bd9b1 /src/backend | |
parent | de5a47af2d8003dee123815bb7e58913be9a03f3 (diff) | |
download | postgresql-062a8444242404242f7c2b814fed37b329639408.tar.gz postgresql-062a8444242404242f7c2b814fed37b329639408.zip |
Avoid syncing data twice for the 'publish_via_partition_root' option.
When there are multiple publications for a subscription and one of those
publishes via the parent table by using publish_via_partition_root and the
other one directly publishes the child table, we end up copying the same
data twice during initial synchronization. The reason for this was that we
get both the parent and child tables from the publisher and try to copy
the data for both of them.
This patch extends the function pg_get_publication_tables() to take a
publication list as its input parameter. This allows us to exclude a
partition table whose ancestor is published by the same publication list.
This problem does exist in back-branches but we decide to fix it there in
a separate commit if required. The fix for back-branches requires quite
complicated changes to fetch the required table information from the
publisher as we can't update the function pg_get_publication_tables() in
back-branches. We are not sure whether we want to deviate and complicate
the code in back-branches for this problem as there are no field reports
yet.
Author: Wang wei
Reviewed-by: Peter Smith, Jacob Champion, Kuroda Hayato, Vignesh C, Osumi Takamichi, Amit Kapila
Discussion: https://postgr.es/m/OS0PR01MB57167F45D481F78CDC5986F794B99@OS0PR01MB5716.jpnprd01.prod.outlook.com
Diffstat (limited to 'src/backend')
-rw-r--r-- | src/backend/catalog/pg_publication.c | 211 | ||||
-rw-r--r-- | src/backend/commands/subscriptioncmds.c | 59 |
2 files changed, 189 insertions, 81 deletions
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index a98fcad421f..47637f28ab6 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -45,6 +45,14 @@ #include "utils/rel.h" #include "utils/syscache.h" +/* Records association between publication and published table */ +typedef struct +{ + Oid relid; /* OID of published table */ + Oid pubid; /* OID of publication that publishes this + * table. */ +} published_rel; + static void publication_translate_columns(Relation targetrel, List *columns, int *natts, AttrNumber **attrs); @@ -172,42 +180,57 @@ pg_relation_is_publishable(PG_FUNCTION_ARGS) } /* - * Filter out the partitions whose parent tables were also specified in - * the publication. + * Returns true if the ancestor is in the list of published relations. + * Otherwise, returns false. */ -static List * -filter_partitions(List *relids) +static bool +is_ancestor_member_tableinfos(Oid ancestor, List *table_infos) +{ + ListCell *lc; + + foreach(lc, table_infos) + { + Oid relid = ((published_rel *) lfirst(lc))->relid; + + if (relid == ancestor) + return true; + } + + return false; +} + +/* + * Filter out the partitions whose parent tables are also present in the list. + */ +static void +filter_partitions(List *table_infos) { - List *result = NIL; ListCell *lc; - ListCell *lc2; - foreach(lc, relids) + foreach(lc, table_infos) { bool skip = false; List *ancestors = NIL; - Oid relid = lfirst_oid(lc); + ListCell *lc2; + published_rel *table_info = (published_rel *) lfirst(lc); - if (get_rel_relispartition(relid)) - ancestors = get_partition_ancestors(relid); + if (get_rel_relispartition(table_info->relid)) + ancestors = get_partition_ancestors(table_info->relid); foreach(lc2, ancestors) { Oid ancestor = lfirst_oid(lc2); - /* Check if the parent table exists in the published table list. */ - if (list_member_oid(relids, ancestor)) + if (is_ancestor_member_tableinfos(ancestor, table_infos)) { skip = true; break; } } - if (!skip) - result = lappend_oid(result, relid); + if (skip) + table_infos = foreach_delete_current(table_infos, lc); } - - return result; } /* @@ -1026,22 +1049,27 @@ GetPublicationByName(const char *pubname, bool missing_ok) } /* - * Returns information of tables in a publication. + * Get information of the tables in the given publication array. + * + * Returns pubid, relid, column list, row filter for each table. */ Datum pg_get_publication_tables(PG_FUNCTION_ARGS) { -#define NUM_PUBLICATION_TABLES_ELEM 3 +#define NUM_PUBLICATION_TABLES_ELEM 4 FuncCallContext *funcctx; - char *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0)); - Publication *publication; - List *tables; + List *table_infos = NIL; /* stuff done only on the first call of the function */ if (SRF_IS_FIRSTCALL()) { TupleDesc tupdesc; MemoryContext oldcontext; + ArrayType *arr; + Datum *elems; + int nelems, + i; + bool viaroot = false; /* create a function context for cross-call persistence */ funcctx = SRF_FIRSTCALL_INIT(); @@ -1049,68 +1077,108 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) /* switch to memory context appropriate for multiple function calls */ oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); - publication = GetPublicationByName(pubname, false); - /* - * Publications support partitioned tables, although all changes are - * replicated using leaf partition identity and schema, so we only - * need those. + * Deconstruct the parameter into elements where each element is a + * publication name. */ - if (publication->alltables) - { - tables = GetAllTablesPublicationRelations(publication->pubviaroot); - } - else + arr = PG_GETARG_ARRAYTYPE_P(0); + deconstruct_array(arr, TEXTOID, -1, false, TYPALIGN_INT, + &elems, NULL, &nelems); + + /* Get Oids of tables from each publication. */ + for (i = 0; i < nelems; i++) { - List *relids, - *schemarelids; - - relids = GetPublicationRelations(publication->oid, - publication->pubviaroot ? - PUBLICATION_PART_ROOT : - PUBLICATION_PART_LEAF); - schemarelids = GetAllSchemaPublicationRelations(publication->oid, - publication->pubviaroot ? - PUBLICATION_PART_ROOT : - PUBLICATION_PART_LEAF); - tables = list_concat_unique_oid(relids, schemarelids); + Publication *pub_elem; + List *pub_elem_tables = NIL; + ListCell *lc; + + pub_elem = GetPublicationByName(TextDatumGetCString(elems[i]), false); /* - * If the publication publishes partition changes via their - * respective root partitioned tables, we must exclude partitions - * in favor of including the root partitioned tables. Otherwise, - * the function could return both the child and parent tables - * which could cause data of the child table to be - * double-published on the subscriber side. + * Publications support partitioned tables. If + * publish_via_partition_root is false, all changes are replicated + * using leaf partition identity and schema, so we only need + * those. Otherwise, get the partitioned table itself. */ - if (publication->pubviaroot) - tables = filter_partitions(tables); + if (pub_elem->alltables) + pub_elem_tables = GetAllTablesPublicationRelations(pub_elem->pubviaroot); + else + { + List *relids, + *schemarelids; + + relids = GetPublicationRelations(pub_elem->oid, + pub_elem->pubviaroot ? + PUBLICATION_PART_ROOT : + PUBLICATION_PART_LEAF); + schemarelids = GetAllSchemaPublicationRelations(pub_elem->oid, + pub_elem->pubviaroot ? + PUBLICATION_PART_ROOT : + PUBLICATION_PART_LEAF); + pub_elem_tables = list_concat_unique_oid(relids, schemarelids); + } + + /* + * Record the published table and the corresponding publication so + * that we can get row filters and column lists later. + * + * When a table is published by multiple publications, to obtain + * all row filters and column lists, the structure related to this + * table will be recorded multiple times. + */ + foreach(lc, pub_elem_tables) + { + published_rel *table_info = (published_rel *) palloc(sizeof(published_rel)); + + table_info->relid = lfirst_oid(lc); + table_info->pubid = pub_elem->oid; + table_infos = lappend(table_infos, table_info); + } + + /* At least one publication is using publish_via_partition_root. */ + if (pub_elem->pubviaroot) + viaroot = true; } + /* + * If the publication publishes partition changes via their respective + * root partitioned tables, we must exclude partitions in favor of + * including the root partitioned tables. Otherwise, the function + * could return both the child and parent tables which could cause + * data of the child table to be double-published on the subscriber + * side. + */ + if (viaroot) + filter_partitions(table_infos); + /* Construct a tuple descriptor for the result rows. */ tupdesc = CreateTemplateTupleDesc(NUM_PUBLICATION_TABLES_ELEM); - TupleDescInitEntry(tupdesc, (AttrNumber) 1, "relid", + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "pubid", OIDOID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 2, "attrs", + TupleDescInitEntry(tupdesc, (AttrNumber) 2, "relid", + OIDOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 3, "attrs", INT2VECTOROID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 3, "qual", + TupleDescInitEntry(tupdesc, (AttrNumber) 4, "qual", PG_NODE_TREEOID, -1, 0); funcctx->tuple_desc = BlessTupleDesc(tupdesc); - funcctx->user_fctx = (void *) tables; + funcctx->user_fctx = (void *) table_infos; MemoryContextSwitchTo(oldcontext); } /* stuff done on every call of the function */ funcctx = SRF_PERCALL_SETUP(); - tables = (List *) funcctx->user_fctx; + table_infos = (List *) funcctx->user_fctx; - if (funcctx->call_cntr < list_length(tables)) + if (funcctx->call_cntr < list_length(table_infos)) { HeapTuple pubtuple = NULL; HeapTuple rettuple; - Oid relid = list_nth_oid(tables, funcctx->call_cntr); + Publication *pub; + published_rel *table_info = (published_rel *) list_nth(table_infos, funcctx->call_cntr); + Oid relid = table_info->relid; Oid schemaid = get_rel_namespace(relid); Datum values[NUM_PUBLICATION_TABLES_ELEM] = {0}; bool nulls[NUM_PUBLICATION_TABLES_ELEM] = {0}; @@ -1119,42 +1187,43 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) * Form tuple with appropriate data. */ - publication = GetPublicationByName(pubname, false); + pub = GetPublication(table_info->pubid); - values[0] = ObjectIdGetDatum(relid); + values[0] = ObjectIdGetDatum(pub->oid); + values[1] = ObjectIdGetDatum(relid); /* * We don't consider row filters or column lists for FOR ALL TABLES or * FOR TABLES IN SCHEMA publications. */ - if (!publication->alltables && + if (!pub->alltables && !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP, ObjectIdGetDatum(schemaid), - ObjectIdGetDatum(publication->oid))) + ObjectIdGetDatum(pub->oid))) pubtuple = SearchSysCacheCopy2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid), - ObjectIdGetDatum(publication->oid)); + ObjectIdGetDatum(pub->oid)); if (HeapTupleIsValid(pubtuple)) { /* Lookup the column list attribute. */ - values[1] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple, + values[2] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple, Anum_pg_publication_rel_prattrs, - &(nulls[1])); + &(nulls[2])); /* Null indicates no filter. */ - values[2] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple, + values[3] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple, Anum_pg_publication_rel_prqual, - &(nulls[2])); + &(nulls[3])); } else { - nulls[1] = true; nulls[2] = true; + nulls[3] = true; } /* Show all columns when the column list is not specified. */ - if (nulls[1] == true) + if (nulls[2]) { Relation rel = table_open(relid, AccessShareLock); int nattnums = 0; @@ -1176,8 +1245,8 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) if (nattnums > 0) { - values[1] = PointerGetDatum(buildint2vector(attnums, nattnums)); - nulls[1] = false; + values[2] = PointerGetDatum(buildint2vector(attnums, nattnums)); + nulls[2] = false; } table_close(rel, AccessShareLock); diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 8a26ddab1c7..93a238412aa 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -1936,21 +1936,60 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) WalRcvExecResult *res; StringInfoData cmd; TupleTableSlot *slot; - Oid tableRow[3] = {TEXTOID, TEXTOID, NAMEARRAYOID}; + Oid tableRow[3] = {TEXTOID, TEXTOID, InvalidOid}; List *tablelist = NIL; - bool check_columnlist = (walrcv_server_version(wrconn) >= 150000); + int server_version = walrcv_server_version(wrconn); + bool check_columnlist = (server_version >= 150000); initStringInfo(&cmd); - appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename \n"); - /* Get column lists for each relation if the publisher supports it */ - if (check_columnlist) - appendStringInfoString(&cmd, ", t.attnames\n"); + /* Get the list of tables from the publisher. */ + if (server_version >= 160000) + { + StringInfoData pub_names; - appendStringInfoString(&cmd, "FROM pg_catalog.pg_publication_tables t\n" - " WHERE t.pubname IN ("); - get_publications_str(publications, &cmd, true); - appendStringInfoChar(&cmd, ')'); + tableRow[2] = INT2VECTOROID; + initStringInfo(&pub_names); + get_publications_str(publications, &pub_names, true); + + /* + * From version 16, we allowed passing multiple publications to the + * function pg_get_publication_tables. This helped to filter out the + * partition table whose ancestor is also published in this + * publication array. + * + * Join pg_get_publication_tables with pg_publication to exclude + * non-existing publications. + * + * Note that attrs are always stored in sorted order so we don't need + * to worry if different publications have specified them in a + * different order. See publication_translate_columns. + */ + appendStringInfo(&cmd, "SELECT DISTINCT n.nspname, c.relname, gpt.attrs\n" + " FROM pg_class c\n" + " JOIN pg_namespace n ON n.oid = c.relnamespace\n" + " JOIN ( SELECT (pg_get_publication_tables(VARIADIC array_agg(pubname::text))).*\n" + " FROM pg_publication\n" + " WHERE pubname IN ( %s )) AS gpt\n" + " ON gpt.relid = c.oid\n", + pub_names.data); + + pfree(pub_names.data); + } + else + { + tableRow[2] = NAMEARRAYOID; + appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename \n"); + + /* Get column lists for each relation if the publisher supports it */ + if (check_columnlist) + appendStringInfoString(&cmd, ", t.attnames\n"); + + appendStringInfoString(&cmd, "FROM pg_catalog.pg_publication_tables t\n" + " WHERE t.pubname IN ("); + get_publications_str(publications, &cmd, true); + appendStringInfoChar(&cmd, ')'); + } res = walrcv_exec(wrconn, cmd.data, check_columnlist ? 3 : 2, tableRow); pfree(cmd.data); |