aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands/subscriptioncmds.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands/subscriptioncmds.c')
-rw-r--r--src/backend/commands/subscriptioncmds.c101
1 files changed, 13 insertions, 88 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 057ab4b6a3f..2e8d8afead8 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -90,7 +90,6 @@ typedef struct SubOpts
} SubOpts;
static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
-static List *fetch_sequence_list(WalReceiverConn *wrconn, List *publications);
static void check_duplicates_in_publist(List *publist, Datum *datums);
static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
@@ -639,9 +638,9 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
{
char *err;
WalReceiverConn *wrconn;
- List *relations;
+ List *tables;
ListCell *lc;
- char sync_state;
+ char table_state;
/* Try to connect to the publisher. */
wrconn = walrcv_connect(conninfo, true, stmt->subname, &err);
@@ -658,17 +657,14 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
* Set sync state based on if we were asked to do data copy or
* not.
*/
- sync_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
+ table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
/*
- * Get the table and sequence list from publisher and build
- * local relation sync status info.
+ * Get the table list from publisher and build local table status
+ * info.
*/
- relations = fetch_table_list(wrconn, publications);
- relations = list_concat(relations,
- fetch_sequence_list(wrconn, publications));
-
- foreach(lc, relations)
+ tables = fetch_table_list(wrconn, publications);
+ foreach(lc, tables)
{
RangeVar *rv = (RangeVar *) lfirst(lc);
Oid relid;
@@ -679,7 +675,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
CheckSubscriptionRelkind(get_rel_relkind(relid),
rv->schemaname, rv->relname);
- AddSubscriptionRelState(subid, relid, sync_state,
+ AddSubscriptionRelState(subid, relid, table_state,
InvalidXLogRecPtr);
}
@@ -705,12 +701,12 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
*
* Note that if tables were specified but copy_data is false
* then it is safe to enable two_phase up-front because those
- * relations are already initially in READY state. When the
- * subscription has no relations, we leave the twophase state
- * as PENDING, to allow ALTER SUBSCRIPTION ... REFRESH
+ * tables are already initially in READY state. When the
+ * subscription has no tables, we leave the twophase state as
+ * PENDING, to allow ALTER SUBSCRIPTION ... REFRESH
* PUBLICATION to work.
*/
- if (opts.twophase && !opts.copy_data && relations != NIL)
+ if (opts.twophase && !opts.copy_data && tables != NIL)
twophase_enabled = true;
walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled,
@@ -786,10 +782,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
if (validate_publications)
check_publications(wrconn, validate_publications);
- /* Get the list of relations from publisher. */
+ /* Get the table list from publisher. */
pubrel_names = fetch_table_list(wrconn, sub->publications);
- pubrel_names = list_concat(pubrel_names,
- fetch_sequence_list(wrconn, sub->publications));
/* Get local table list. */
subrel_states = GetSubscriptionRelations(sub->oid);
@@ -1814,75 +1808,6 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
}
/*
- * Get the list of sequences which belong to specified publications on the
- * publisher connection.
- */
-static List *
-fetch_sequence_list(WalReceiverConn *wrconn, List *publications)
-{
- WalRcvExecResult *res;
- StringInfoData cmd;
- TupleTableSlot *slot;
- Oid tableRow[2] = {TEXTOID, TEXTOID};
- ListCell *lc;
- bool first;
- List *tablelist = NIL;
-
- Assert(list_length(publications) > 0);
-
- initStringInfo(&cmd);
- appendStringInfoString(&cmd, "SELECT DISTINCT s.schemaname, s.sequencename\n"
- " FROM pg_catalog.pg_publication_sequences s\n"
- " WHERE s.pubname IN (");
- first = true;
- foreach(lc, publications)
- {
- char *pubname = strVal(lfirst(lc));
-
- if (first)
- first = false;
- else
- appendStringInfoString(&cmd, ", ");
-
- appendStringInfoString(&cmd, quote_literal_cstr(pubname));
- }
- appendStringInfoChar(&cmd, ')');
-
- res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
- pfree(cmd.data);
-
- if (res->status != WALRCV_OK_TUPLES)
- ereport(ERROR,
- (errmsg("could not receive list of replicated sequences from the publisher: %s",
- res->err)));
-
- /* Process sequences. */
- slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
- while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
- {
- char *nspname;
- char *relname;
- bool isnull;
- RangeVar *rv;
-
- nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
- Assert(!isnull);
- relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
- Assert(!isnull);
-
- rv = makeRangeVar(nspname, relname, -1);
- tablelist = lappend(tablelist, rv);
-
- ExecClearTuple(slot);
- }
- ExecDropSingleTupleTableSlot(slot);
-
- walrcv_clear_result(res);
-
- return tablelist;
-}
-
-/*
* This is to report the connection failure while dropping replication slots.
* Here, we report the WARNING for all tablesync slots so that user can drop
* them manually, if required.