diff options
author | Tomas Vondra <tomas.vondra@postgresql.org> | 2022-04-07 18:13:13 +0200 |
---|---|---|
committer | Tomas Vondra <tomas.vondra@postgresql.org> | 2022-04-07 20:06:36 +0200 |
commit | 2c7ea57e56ca5f668c32d4266e0a3e45b455bef5 (patch) | |
tree | c4b80357147f2212e571dd1a4522c2b73068a783 /src/backend/commands/subscriptioncmds.c | |
parent | d7ab2a9a3c0a2800ab36bb48d1cc97370067777e (diff) | |
download | postgresql-2c7ea57e56ca5f668c32d4266e0a3e45b455bef5.tar.gz postgresql-2c7ea57e56ca5f668c32d4266e0a3e45b455bef5.zip |
Revert "Logical decoding of sequences"
This reverts a sequence of commits, implementing features related to
logical decoding and replication of sequences:
- 0da92dc530c9251735fc70b20cd004d9630a1266
- 80901b32913ffa59bf157a4d88284b2b3a7511d9
- b779d7d8fdae088d70da5ed9fcd8205035676df3
- d5ed9da41d96988d905b49bebb273a9b2d6e2915
- a180c2b34de0989269fdb819bff241a249bf5380
- 75b1521dae1ff1fde17fda2e30e591f2e5d64b6a
- 2d2232933b02d9396113662e44dca5f120d6830e
- 002c9dd97a0c874fd1693a570383e2dd38cd40d5
- 05843b1aa49df2ecc9b97c693b755bd1b6f856a9
The implementation has issues, mostly due to combining transactional and
non-transactional behavior of sequences. It's not clear how this could
be fixed, but it'll require reworking significant part of the patch.
Discussion: https://postgr.es/m/95345a19-d508-63d1-860a-f5c2f41e8d40@enterprisedb.com
Diffstat (limited to 'src/backend/commands/subscriptioncmds.c')
-rw-r--r-- | src/backend/commands/subscriptioncmds.c | 101 |
1 files changed, 13 insertions, 88 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 057ab4b6a3f..2e8d8afead8 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -90,7 +90,6 @@ typedef struct SubOpts } SubOpts; static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); -static List *fetch_sequence_list(WalReceiverConn *wrconn, List *publications); static void check_duplicates_in_publist(List *publist, Datum *datums); static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname); static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err); @@ -639,9 +638,9 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, { char *err; WalReceiverConn *wrconn; - List *relations; + List *tables; ListCell *lc; - char sync_state; + char table_state; /* Try to connect to the publisher. */ wrconn = walrcv_connect(conninfo, true, stmt->subname, &err); @@ -658,17 +657,14 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, * Set sync state based on if we were asked to do data copy or * not. */ - sync_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY; + table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY; /* - * Get the table and sequence list from publisher and build - * local relation sync status info. + * Get the table list from publisher and build local table status + * info. */ - relations = fetch_table_list(wrconn, publications); - relations = list_concat(relations, - fetch_sequence_list(wrconn, publications)); - - foreach(lc, relations) + tables = fetch_table_list(wrconn, publications); + foreach(lc, tables) { RangeVar *rv = (RangeVar *) lfirst(lc); Oid relid; @@ -679,7 +675,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, CheckSubscriptionRelkind(get_rel_relkind(relid), rv->schemaname, rv->relname); - AddSubscriptionRelState(subid, relid, sync_state, + AddSubscriptionRelState(subid, relid, table_state, InvalidXLogRecPtr); } @@ -705,12 +701,12 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, * * Note that if tables were specified but copy_data is false * then it is safe to enable two_phase up-front because those - * relations are already initially in READY state. When the - * subscription has no relations, we leave the twophase state - * as PENDING, to allow ALTER SUBSCRIPTION ... REFRESH + * tables are already initially in READY state. When the + * subscription has no tables, we leave the twophase state as + * PENDING, to allow ALTER SUBSCRIPTION ... REFRESH * PUBLICATION to work. */ - if (opts.twophase && !opts.copy_data && relations != NIL) + if (opts.twophase && !opts.copy_data && tables != NIL) twophase_enabled = true; walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled, @@ -786,10 +782,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, if (validate_publications) check_publications(wrconn, validate_publications); - /* Get the list of relations from publisher. */ + /* Get the table list from publisher. */ pubrel_names = fetch_table_list(wrconn, sub->publications); - pubrel_names = list_concat(pubrel_names, - fetch_sequence_list(wrconn, sub->publications)); /* Get local table list. */ subrel_states = GetSubscriptionRelations(sub->oid); @@ -1814,75 +1808,6 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) } /* - * Get the list of sequences which belong to specified publications on the - * publisher connection. - */ -static List * -fetch_sequence_list(WalReceiverConn *wrconn, List *publications) -{ - WalRcvExecResult *res; - StringInfoData cmd; - TupleTableSlot *slot; - Oid tableRow[2] = {TEXTOID, TEXTOID}; - ListCell *lc; - bool first; - List *tablelist = NIL; - - Assert(list_length(publications) > 0); - - initStringInfo(&cmd); - appendStringInfoString(&cmd, "SELECT DISTINCT s.schemaname, s.sequencename\n" - " FROM pg_catalog.pg_publication_sequences s\n" - " WHERE s.pubname IN ("); - first = true; - foreach(lc, publications) - { - char *pubname = strVal(lfirst(lc)); - - if (first) - first = false; - else - appendStringInfoString(&cmd, ", "); - - appendStringInfoString(&cmd, quote_literal_cstr(pubname)); - } - appendStringInfoChar(&cmd, ')'); - - res = walrcv_exec(wrconn, cmd.data, 2, tableRow); - pfree(cmd.data); - - if (res->status != WALRCV_OK_TUPLES) - ereport(ERROR, - (errmsg("could not receive list of replicated sequences from the publisher: %s", - res->err))); - - /* Process sequences. */ - slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); - while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) - { - char *nspname; - char *relname; - bool isnull; - RangeVar *rv; - - nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull)); - Assert(!isnull); - relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull)); - Assert(!isnull); - - rv = makeRangeVar(nspname, relname, -1); - tablelist = lappend(tablelist, rv); - - ExecClearTuple(slot); - } - ExecDropSingleTupleTableSlot(slot); - - walrcv_clear_result(res); - - return tablelist; -} - -/* * This is to report the connection failure while dropping replication slots. * Here, we report the WARNING for all tablesync slots so that user can drop * them manually, if required. |