aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/catalog/pg_subscription.c31
-rw-r--r--src/backend/commands/subscriptioncmds.c60
-rw-r--r--src/backend/replication/logical/tablesync.c35
-rw-r--r--src/include/catalog/pg_subscription.h5
4 files changed, 58 insertions, 73 deletions
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 9efc9159f2c..89bf5ec9337 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -35,6 +35,37 @@
static List *textarray_to_stringlist(ArrayType *textarray);
/*
+ * Add a comma-separated list of publication names to the 'dest' string.
+ */
+void
+GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)
+{
+ ListCell *lc;
+ bool first = true;
+
+ Assert(publications != NIL);
+
+ foreach(lc, publications)
+ {
+ char *pubname = strVal(lfirst(lc));
+
+ if (first)
+ first = false;
+ else
+ appendStringInfoString(dest, ", ");
+
+ if (quote_literal)
+ appendStringInfoString(dest, quote_literal_cstr(pubname));
+ else
+ {
+ appendStringInfoChar(dest, '"');
+ appendStringInfoString(dest, pubname);
+ appendStringInfoChar(dest, '"');
+ }
+ }
+}
+
+/*
* Fetch the subscription from the syscache.
*/
Subscription *
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 02ccc636b80..33575614e72 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -440,37 +440,6 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
}
/*
- * Add publication names from the list to a string.
- */
-static void
-get_publications_str(List *publications, StringInfo dest, bool quote_literal)
-{
- ListCell *lc;
- bool first = true;
-
- Assert(publications != NIL);
-
- foreach(lc, publications)
- {
- char *pubname = strVal(lfirst(lc));
-
- if (first)
- first = false;
- else
- appendStringInfoString(dest, ", ");
-
- if (quote_literal)
- appendStringInfoString(dest, quote_literal_cstr(pubname));
- else
- {
- appendStringInfoChar(dest, '"');
- appendStringInfoString(dest, pubname);
- appendStringInfoChar(dest, '"');
- }
- }
-}
-
-/*
* Check that the specified publications are present on the publisher.
*/
static void
@@ -486,7 +455,7 @@ check_publications(WalReceiverConn *wrconn, List *publications)
appendStringInfoString(cmd, "SELECT t.pubname FROM\n"
" pg_catalog.pg_publication t WHERE\n"
" t.pubname IN (");
- get_publications_str(publications, cmd, true);
+ GetPublicationsStr(publications, cmd, true);
appendStringInfoChar(cmd, ')');
res = walrcv_exec(wrconn, cmd->data, 1, tableRow);
@@ -523,7 +492,7 @@ check_publications(WalReceiverConn *wrconn, List *publications)
/* Prepare the list of non-existent publication(s) for error message. */
StringInfo pubnames = makeStringInfo();
- get_publications_str(publicationsCopy, pubnames, false);
+ GetPublicationsStr(publicationsCopy, pubnames, false);
ereport(WARNING,
errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg_plural("publication %s does not exist on the publisher",
@@ -2151,7 +2120,7 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications,
" JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid),\n"
" pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
"WHERE C.oid = GPT.relid AND P.pubname IN (");
- get_publications_str(publications, &cmd, true);
+ GetPublicationsStr(publications, &cmd, true);
appendStringInfoString(&cmd, ")\n");
/*
@@ -2208,7 +2177,7 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications,
StringInfo pubnames = makeStringInfo();
/* Prepare the list of publication(s) for warning message. */
- get_publications_str(publist, pubnames, false);
+ GetPublicationsStr(publist, pubnames, false);
ereport(WARNING,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin",
@@ -2243,17 +2212,17 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
List *tablelist = NIL;
int server_version = walrcv_server_version(wrconn);
bool check_columnlist = (server_version >= 150000);
+ StringInfo pub_names = makeStringInfo();
initStringInfo(&cmd);
+ /* Build the pub_names comma-separated string. */
+ GetPublicationsStr(publications, pub_names, true);
+
/* Get the list of tables from the publisher. */
if (server_version >= 160000)
{
- StringInfoData pub_names;
-
tableRow[2] = INT2VECTOROID;
- initStringInfo(&pub_names);
- get_publications_str(publications, &pub_names, true);
/*
* From version 16, we allowed passing multiple publications to the
@@ -2275,9 +2244,7 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
" FROM pg_publication\n"
" WHERE pubname IN ( %s )) AS gpt\n"
" ON gpt.relid = c.oid\n",
- pub_names.data);
-
- pfree(pub_names.data);
+ pub_names->data);
}
else
{
@@ -2288,12 +2255,13 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
if (check_columnlist)
appendStringInfoString(&cmd, ", t.attnames\n");
- appendStringInfoString(&cmd, "FROM pg_catalog.pg_publication_tables t\n"
- " WHERE t.pubname IN (");
- get_publications_str(publications, &cmd, true);
- appendStringInfoChar(&cmd, ')');
+ appendStringInfo(&cmd, "FROM pg_catalog.pg_publication_tables t\n"
+ " WHERE t.pubname IN ( %s )",
+ pub_names->data);
}
+ destroyStringInfo(pub_names);
+
res = walrcv_exec(wrconn, cmd.data, check_columnlist ? 3 : 2, tableRow);
pfree(cmd.data);
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index e03e7613926..d4b5d210e3e 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -802,7 +802,7 @@ fetch_remote_table_info(char *nspname, char *relname,
Oid qualRow[] = {TEXTOID};
bool isnull;
int natt;
- ListCell *lc;
+ StringInfo pub_names = NULL;
Bitmapset *included_cols = NULL;
lrel->nspname = nspname;
@@ -856,15 +856,10 @@ fetch_remote_table_info(char *nspname, char *relname,
WalRcvExecResult *pubres;
TupleTableSlot *tslot;
Oid attrsRow[] = {INT2VECTOROID};
- StringInfoData pub_names;
- initStringInfo(&pub_names);
- foreach(lc, MySubscription->publications)
- {
- if (foreach_current_index(lc) > 0)
- appendStringInfoString(&pub_names, ", ");
- appendStringInfoString(&pub_names, quote_literal_cstr(strVal(lfirst(lc))));
- }
+ /* Build the pub_names comma-separated string. */
+ pub_names = makeStringInfo();
+ GetPublicationsStr(MySubscription->publications, pub_names, true);
/*
* Fetch info about column lists for the relation (from all the
@@ -881,7 +876,7 @@ fetch_remote_table_info(char *nspname, char *relname,
" WHERE gpt.relid = %u AND c.oid = gpt.relid"
" AND p.pubname IN ( %s )",
lrel->remoteid,
- pub_names.data);
+ pub_names->data);
pubres = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
lengthof(attrsRow), attrsRow);
@@ -936,8 +931,6 @@ fetch_remote_table_info(char *nspname, char *relname,
ExecDropSingleTupleTableSlot(tslot);
walrcv_clear_result(pubres);
-
- pfree(pub_names.data);
}
/*
@@ -1039,19 +1032,8 @@ fetch_remote_table_info(char *nspname, char *relname,
*/
if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000)
{
- StringInfoData pub_names;
-
- /* Build the pubname list. */
- initStringInfo(&pub_names);
- foreach_node(String, pubstr, MySubscription->publications)
- {
- char *pubname = strVal(pubstr);
-
- if (foreach_current_index(pubstr) > 0)
- appendStringInfoString(&pub_names, ", ");
-
- appendStringInfoString(&pub_names, quote_literal_cstr(pubname));
- }
+ /* Reuse the already-built pub_names. */
+ Assert(pub_names != NULL);
/* Check for row filters. */
resetStringInfo(&cmd);
@@ -1062,7 +1044,7 @@ fetch_remote_table_info(char *nspname, char *relname,
" WHERE gpt.relid = %u"
" AND p.pubname IN ( %s )",
lrel->remoteid,
- pub_names.data);
+ pub_names->data);
res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow);
@@ -1101,6 +1083,7 @@ fetch_remote_table_info(char *nspname, char *relname,
ExecDropSingleTupleTableSlot(slot);
walrcv_clear_result(res);
+ destroyStringInfo(pub_names);
}
pfree(cmd.data);
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 0aa14ec4a27..b25f3fea566 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -20,7 +20,7 @@
#include "access/xlogdefs.h"
#include "catalog/genbki.h"
#include "catalog/pg_subscription_d.h"
-
+#include "lib/stringinfo.h"
#include "nodes/pg_list.h"
/*
@@ -180,4 +180,7 @@ extern void DisableSubscription(Oid subid);
extern int CountDBSubscriptions(Oid dbid);
+extern void GetPublicationsStr(List *publications, StringInfo dest,
+ bool quote_literal);
+
#endif /* PG_SUBSCRIPTION_H */