aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands/subscriptioncmds.c
diff options
context:
space:
mode:
authorTomas Vondra <tomas.vondra@postgresql.org>2022-04-07 18:13:13 +0200
committerTomas Vondra <tomas.vondra@postgresql.org>2022-04-07 20:06:36 +0200
commit2c7ea57e56ca5f668c32d4266e0a3e45b455bef5 (patch)
treec4b80357147f2212e571dd1a4522c2b73068a783 /src/backend/commands/subscriptioncmds.c
parentd7ab2a9a3c0a2800ab36bb48d1cc97370067777e (diff)
downloadpostgresql-2c7ea57e56ca5f668c32d4266e0a3e45b455bef5.tar.gz
postgresql-2c7ea57e56ca5f668c32d4266e0a3e45b455bef5.zip
Revert "Logical decoding of sequences"
This reverts a sequence of commits, implementing features related to logical decoding and replication of sequences: - 0da92dc530c9251735fc70b20cd004d9630a1266 - 80901b32913ffa59bf157a4d88284b2b3a7511d9 - b779d7d8fdae088d70da5ed9fcd8205035676df3 - d5ed9da41d96988d905b49bebb273a9b2d6e2915 - a180c2b34de0989269fdb819bff241a249bf5380 - 75b1521dae1ff1fde17fda2e30e591f2e5d64b6a - 2d2232933b02d9396113662e44dca5f120d6830e - 002c9dd97a0c874fd1693a570383e2dd38cd40d5 - 05843b1aa49df2ecc9b97c693b755bd1b6f856a9 The implementation has issues, mostly due to combining transactional and non-transactional behavior of sequences. It's not clear how this could be fixed, but it'll require reworking significant part of the patch. Discussion: https://postgr.es/m/95345a19-d508-63d1-860a-f5c2f41e8d40@enterprisedb.com
Diffstat (limited to 'src/backend/commands/subscriptioncmds.c')
-rw-r--r--src/backend/commands/subscriptioncmds.c101
1 files changed, 13 insertions, 88 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 057ab4b6a3f..2e8d8afead8 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -90,7 +90,6 @@ typedef struct SubOpts
} SubOpts;
static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
-static List *fetch_sequence_list(WalReceiverConn *wrconn, List *publications);
static void check_duplicates_in_publist(List *publist, Datum *datums);
static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
@@ -639,9 +638,9 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
{
char *err;
WalReceiverConn *wrconn;
- List *relations;
+ List *tables;
ListCell *lc;
- char sync_state;
+ char table_state;
/* Try to connect to the publisher. */
wrconn = walrcv_connect(conninfo, true, stmt->subname, &err);
@@ -658,17 +657,14 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
* Set sync state based on if we were asked to do data copy or
* not.
*/
- sync_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
+ table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
/*
- * Get the table and sequence list from publisher and build
- * local relation sync status info.
+ * Get the table list from publisher and build local table status
+ * info.
*/
- relations = fetch_table_list(wrconn, publications);
- relations = list_concat(relations,
- fetch_sequence_list(wrconn, publications));
-
- foreach(lc, relations)
+ tables = fetch_table_list(wrconn, publications);
+ foreach(lc, tables)
{
RangeVar *rv = (RangeVar *) lfirst(lc);
Oid relid;
@@ -679,7 +675,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
CheckSubscriptionRelkind(get_rel_relkind(relid),
rv->schemaname, rv->relname);
- AddSubscriptionRelState(subid, relid, sync_state,
+ AddSubscriptionRelState(subid, relid, table_state,
InvalidXLogRecPtr);
}
@@ -705,12 +701,12 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
*
* Note that if tables were specified but copy_data is false
* then it is safe to enable two_phase up-front because those
- * relations are already initially in READY state. When the
- * subscription has no relations, we leave the twophase state
- * as PENDING, to allow ALTER SUBSCRIPTION ... REFRESH
+ * tables are already initially in READY state. When the
+ * subscription has no tables, we leave the twophase state as
+ * PENDING, to allow ALTER SUBSCRIPTION ... REFRESH
* PUBLICATION to work.
*/
- if (opts.twophase && !opts.copy_data && relations != NIL)
+ if (opts.twophase && !opts.copy_data && tables != NIL)
twophase_enabled = true;
walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled,
@@ -786,10 +782,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
if (validate_publications)
check_publications(wrconn, validate_publications);
- /* Get the list of relations from publisher. */
+ /* Get the table list from publisher. */
pubrel_names = fetch_table_list(wrconn, sub->publications);
- pubrel_names = list_concat(pubrel_names,
- fetch_sequence_list(wrconn, sub->publications));
/* Get local table list. */
subrel_states = GetSubscriptionRelations(sub->oid);
@@ -1814,75 +1808,6 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
}
/*
- * Get the list of sequences which belong to specified publications on the
- * publisher connection.
- */
-static List *
-fetch_sequence_list(WalReceiverConn *wrconn, List *publications)
-{
- WalRcvExecResult *res;
- StringInfoData cmd;
- TupleTableSlot *slot;
- Oid tableRow[2] = {TEXTOID, TEXTOID};
- ListCell *lc;
- bool first;
- List *tablelist = NIL;
-
- Assert(list_length(publications) > 0);
-
- initStringInfo(&cmd);
- appendStringInfoString(&cmd, "SELECT DISTINCT s.schemaname, s.sequencename\n"
- " FROM pg_catalog.pg_publication_sequences s\n"
- " WHERE s.pubname IN (");
- first = true;
- foreach(lc, publications)
- {
- char *pubname = strVal(lfirst(lc));
-
- if (first)
- first = false;
- else
- appendStringInfoString(&cmd, ", ");
-
- appendStringInfoString(&cmd, quote_literal_cstr(pubname));
- }
- appendStringInfoChar(&cmd, ')');
-
- res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
- pfree(cmd.data);
-
- if (res->status != WALRCV_OK_TUPLES)
- ereport(ERROR,
- (errmsg("could not receive list of replicated sequences from the publisher: %s",
- res->err)));
-
- /* Process sequences. */
- slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
- while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
- {
- char *nspname;
- char *relname;
- bool isnull;
- RangeVar *rv;
-
- nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
- Assert(!isnull);
- relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
- Assert(!isnull);
-
- rv = makeRangeVar(nspname, relname, -1);
- tablelist = lappend(tablelist, rv);
-
- ExecClearTuple(slot);
- }
- ExecDropSingleTupleTableSlot(slot);
-
- walrcv_clear_result(res);
-
- return tablelist;
-}
-
-/*
* This is to report the connection failure while dropping replication slots.
* Here, we report the WARNING for all tablesync slots so that user can drop
* them manually, if required.