aboutsummaryrefslogtreecommitdiff
path: root/src/backend
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend')
-rw-r--r--src/backend/catalog/pg_publication.c211
-rw-r--r--src/backend/commands/subscriptioncmds.c59
2 files changed, 189 insertions, 81 deletions
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index a98fcad421f..47637f28ab6 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -45,6 +45,14 @@
#include "utils/rel.h"
#include "utils/syscache.h"
+/* Records association between publication and published table */
+typedef struct
+{
+ Oid relid; /* OID of published table */
+ Oid pubid; /* OID of publication that publishes this
+ * table. */
+} published_rel;
+
static void publication_translate_columns(Relation targetrel, List *columns,
int *natts, AttrNumber **attrs);
@@ -172,42 +180,57 @@ pg_relation_is_publishable(PG_FUNCTION_ARGS)
}
/*
- * Filter out the partitions whose parent tables were also specified in
- * the publication.
+ * Returns true if the ancestor is in the list of published relations.
+ * Otherwise, returns false.
*/
-static List *
-filter_partitions(List *relids)
+static bool
+is_ancestor_member_tableinfos(Oid ancestor, List *table_infos)
+{
+ ListCell *lc;
+
+ foreach(lc, table_infos)
+ {
+ Oid relid = ((published_rel *) lfirst(lc))->relid;
+
+ if (relid == ancestor)
+ return true;
+ }
+
+ return false;
+}
+
+/*
+ * Filter out the partitions whose parent tables are also present in the list.
+ */
+static void
+filter_partitions(List *table_infos)
{
- List *result = NIL;
ListCell *lc;
- ListCell *lc2;
- foreach(lc, relids)
+ foreach(lc, table_infos)
{
bool skip = false;
List *ancestors = NIL;
- Oid relid = lfirst_oid(lc);
+ ListCell *lc2;
+ published_rel *table_info = (published_rel *) lfirst(lc);
- if (get_rel_relispartition(relid))
- ancestors = get_partition_ancestors(relid);
+ if (get_rel_relispartition(table_info->relid))
+ ancestors = get_partition_ancestors(table_info->relid);
foreach(lc2, ancestors)
{
Oid ancestor = lfirst_oid(lc2);
- /* Check if the parent table exists in the published table list. */
- if (list_member_oid(relids, ancestor))
+ if (is_ancestor_member_tableinfos(ancestor, table_infos))
{
skip = true;
break;
}
}
- if (!skip)
- result = lappend_oid(result, relid);
+ if (skip)
+ table_infos = foreach_delete_current(table_infos, lc);
}
-
- return result;
}
/*
@@ -1026,22 +1049,27 @@ GetPublicationByName(const char *pubname, bool missing_ok)
}
/*
- * Returns information of tables in a publication.
+ * Get information of the tables in the given publication array.
+ *
+ * Returns pubid, relid, column list, row filter for each table.
*/
Datum
pg_get_publication_tables(PG_FUNCTION_ARGS)
{
-#define NUM_PUBLICATION_TABLES_ELEM 3
+#define NUM_PUBLICATION_TABLES_ELEM 4
FuncCallContext *funcctx;
- char *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0));
- Publication *publication;
- List *tables;
+ List *table_infos = NIL;
/* stuff done only on the first call of the function */
if (SRF_IS_FIRSTCALL())
{
TupleDesc tupdesc;
MemoryContext oldcontext;
+ ArrayType *arr;
+ Datum *elems;
+ int nelems,
+ i;
+ bool viaroot = false;
/* create a function context for cross-call persistence */
funcctx = SRF_FIRSTCALL_INIT();
@@ -1049,68 +1077,108 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
/* switch to memory context appropriate for multiple function calls */
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
- publication = GetPublicationByName(pubname, false);
-
/*
- * Publications support partitioned tables, although all changes are
- * replicated using leaf partition identity and schema, so we only
- * need those.
+ * Deconstruct the parameter into elements where each element is a
+ * publication name.
*/
- if (publication->alltables)
- {
- tables = GetAllTablesPublicationRelations(publication->pubviaroot);
- }
- else
+ arr = PG_GETARG_ARRAYTYPE_P(0);
+ deconstruct_array(arr, TEXTOID, -1, false, TYPALIGN_INT,
+ &elems, NULL, &nelems);
+
+ /* Get Oids of tables from each publication. */
+ for (i = 0; i < nelems; i++)
{
- List *relids,
- *schemarelids;
-
- relids = GetPublicationRelations(publication->oid,
- publication->pubviaroot ?
- PUBLICATION_PART_ROOT :
- PUBLICATION_PART_LEAF);
- schemarelids = GetAllSchemaPublicationRelations(publication->oid,
- publication->pubviaroot ?
- PUBLICATION_PART_ROOT :
- PUBLICATION_PART_LEAF);
- tables = list_concat_unique_oid(relids, schemarelids);
+ Publication *pub_elem;
+ List *pub_elem_tables = NIL;
+ ListCell *lc;
+
+ pub_elem = GetPublicationByName(TextDatumGetCString(elems[i]), false);
/*
- * If the publication publishes partition changes via their
- * respective root partitioned tables, we must exclude partitions
- * in favor of including the root partitioned tables. Otherwise,
- * the function could return both the child and parent tables
- * which could cause data of the child table to be
- * double-published on the subscriber side.
+ * Publications support partitioned tables. If
+ * publish_via_partition_root is false, all changes are replicated
+ * using leaf partition identity and schema, so we only need
+ * those. Otherwise, get the partitioned table itself.
*/
- if (publication->pubviaroot)
- tables = filter_partitions(tables);
+ if (pub_elem->alltables)
+ pub_elem_tables = GetAllTablesPublicationRelations(pub_elem->pubviaroot);
+ else
+ {
+ List *relids,
+ *schemarelids;
+
+ relids = GetPublicationRelations(pub_elem->oid,
+ pub_elem->pubviaroot ?
+ PUBLICATION_PART_ROOT :
+ PUBLICATION_PART_LEAF);
+ schemarelids = GetAllSchemaPublicationRelations(pub_elem->oid,
+ pub_elem->pubviaroot ?
+ PUBLICATION_PART_ROOT :
+ PUBLICATION_PART_LEAF);
+ pub_elem_tables = list_concat_unique_oid(relids, schemarelids);
+ }
+
+ /*
+ * Record the published table and the corresponding publication so
+ * that we can get row filters and column lists later.
+ *
+ * When a table is published by multiple publications, to obtain
+ * all row filters and column lists, the structure related to this
+ * table will be recorded multiple times.
+ */
+ foreach(lc, pub_elem_tables)
+ {
+ published_rel *table_info = (published_rel *) palloc(sizeof(published_rel));
+
+ table_info->relid = lfirst_oid(lc);
+ table_info->pubid = pub_elem->oid;
+ table_infos = lappend(table_infos, table_info);
+ }
+
+ /* At least one publication is using publish_via_partition_root. */
+ if (pub_elem->pubviaroot)
+ viaroot = true;
}
+ /*
+ * If the publication publishes partition changes via their respective
+ * root partitioned tables, we must exclude partitions in favor of
+ * including the root partitioned tables. Otherwise, the function
+ * could return both the child and parent tables which could cause
+ * data of the child table to be double-published on the subscriber
+ * side.
+ */
+ if (viaroot)
+ filter_partitions(table_infos);
+
/* Construct a tuple descriptor for the result rows. */
tupdesc = CreateTemplateTupleDesc(NUM_PUBLICATION_TABLES_ELEM);
- TupleDescInitEntry(tupdesc, (AttrNumber) 1, "relid",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 1, "pubid",
OIDOID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 2, "attrs",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 2, "relid",
+ OIDOID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 3, "attrs",
INT2VECTOROID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 3, "qual",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 4, "qual",
PG_NODE_TREEOID, -1, 0);
funcctx->tuple_desc = BlessTupleDesc(tupdesc);
- funcctx->user_fctx = (void *) tables;
+ funcctx->user_fctx = (void *) table_infos;
MemoryContextSwitchTo(oldcontext);
}
/* stuff done on every call of the function */
funcctx = SRF_PERCALL_SETUP();
- tables = (List *) funcctx->user_fctx;
+ table_infos = (List *) funcctx->user_fctx;
- if (funcctx->call_cntr < list_length(tables))
+ if (funcctx->call_cntr < list_length(table_infos))
{
HeapTuple pubtuple = NULL;
HeapTuple rettuple;
- Oid relid = list_nth_oid(tables, funcctx->call_cntr);
+ Publication *pub;
+ published_rel *table_info = (published_rel *) list_nth(table_infos, funcctx->call_cntr);
+ Oid relid = table_info->relid;
Oid schemaid = get_rel_namespace(relid);
Datum values[NUM_PUBLICATION_TABLES_ELEM] = {0};
bool nulls[NUM_PUBLICATION_TABLES_ELEM] = {0};
@@ -1119,42 +1187,43 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
* Form tuple with appropriate data.
*/
- publication = GetPublicationByName(pubname, false);
+ pub = GetPublication(table_info->pubid);
- values[0] = ObjectIdGetDatum(relid);
+ values[0] = ObjectIdGetDatum(pub->oid);
+ values[1] = ObjectIdGetDatum(relid);
/*
* We don't consider row filters or column lists for FOR ALL TABLES or
* FOR TABLES IN SCHEMA publications.
*/
- if (!publication->alltables &&
+ if (!pub->alltables &&
!SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
ObjectIdGetDatum(schemaid),
- ObjectIdGetDatum(publication->oid)))
+ ObjectIdGetDatum(pub->oid)))
pubtuple = SearchSysCacheCopy2(PUBLICATIONRELMAP,
ObjectIdGetDatum(relid),
- ObjectIdGetDatum(publication->oid));
+ ObjectIdGetDatum(pub->oid));
if (HeapTupleIsValid(pubtuple))
{
/* Lookup the column list attribute. */
- values[1] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
+ values[2] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
Anum_pg_publication_rel_prattrs,
- &(nulls[1]));
+ &(nulls[2]));
/* Null indicates no filter. */
- values[2] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
+ values[3] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
Anum_pg_publication_rel_prqual,
- &(nulls[2]));
+ &(nulls[3]));
}
else
{
- nulls[1] = true;
nulls[2] = true;
+ nulls[3] = true;
}
/* Show all columns when the column list is not specified. */
- if (nulls[1] == true)
+ if (nulls[2])
{
Relation rel = table_open(relid, AccessShareLock);
int nattnums = 0;
@@ -1176,8 +1245,8 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
if (nattnums > 0)
{
- values[1] = PointerGetDatum(buildint2vector(attnums, nattnums));
- nulls[1] = false;
+ values[2] = PointerGetDatum(buildint2vector(attnums, nattnums));
+ nulls[2] = false;
}
table_close(rel, AccessShareLock);
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 8a26ddab1c7..93a238412aa 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -1936,21 +1936,60 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
WalRcvExecResult *res;
StringInfoData cmd;
TupleTableSlot *slot;
- Oid tableRow[3] = {TEXTOID, TEXTOID, NAMEARRAYOID};
+ Oid tableRow[3] = {TEXTOID, TEXTOID, InvalidOid};
List *tablelist = NIL;
- bool check_columnlist = (walrcv_server_version(wrconn) >= 150000);
+ int server_version = walrcv_server_version(wrconn);
+ bool check_columnlist = (server_version >= 150000);
initStringInfo(&cmd);
- appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename \n");
- /* Get column lists for each relation if the publisher supports it */
- if (check_columnlist)
- appendStringInfoString(&cmd, ", t.attnames\n");
+ /* Get the list of tables from the publisher. */
+ if (server_version >= 160000)
+ {
+ StringInfoData pub_names;
- appendStringInfoString(&cmd, "FROM pg_catalog.pg_publication_tables t\n"
- " WHERE t.pubname IN (");
- get_publications_str(publications, &cmd, true);
- appendStringInfoChar(&cmd, ')');
+ tableRow[2] = INT2VECTOROID;
+ initStringInfo(&pub_names);
+ get_publications_str(publications, &pub_names, true);
+
+ /*
+ * From version 16, we allowed passing multiple publications to the
+ * function pg_get_publication_tables. This helped to filter out the
+ * partition table whose ancestor is also published in this
+ * publication array.
+ *
+ * Join pg_get_publication_tables with pg_publication to exclude
+ * non-existing publications.
+ *
+ * Note that attrs are always stored in sorted order so we don't need
+ * to worry if different publications have specified them in a
+ * different order. See publication_translate_columns.
+ */
+ appendStringInfo(&cmd, "SELECT DISTINCT n.nspname, c.relname, gpt.attrs\n"
+ " FROM pg_class c\n"
+ " JOIN pg_namespace n ON n.oid = c.relnamespace\n"
+ " JOIN ( SELECT (pg_get_publication_tables(VARIADIC array_agg(pubname::text))).*\n"
+ " FROM pg_publication\n"
+ " WHERE pubname IN ( %s )) AS gpt\n"
+ " ON gpt.relid = c.oid\n",
+ pub_names.data);
+
+ pfree(pub_names.data);
+ }
+ else
+ {
+ tableRow[2] = NAMEARRAYOID;
+ appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename \n");
+
+ /* Get column lists for each relation if the publisher supports it */
+ if (check_columnlist)
+ appendStringInfoString(&cmd, ", t.attnames\n");
+
+ appendStringInfoString(&cmd, "FROM pg_catalog.pg_publication_tables t\n"
+ " WHERE t.pubname IN (");
+ get_publications_str(publications, &cmd, true);
+ appendStringInfoChar(&cmd, ')');
+ }
res = walrcv_exec(wrconn, cmd.data, check_columnlist ? 3 : 2, tableRow);
pfree(cmd.data);