diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/access/transam/xact.c | 11 | ||||
-rw-r--r-- | src/backend/catalog/pg_subscription.c | 38 | ||||
-rw-r--r-- | src/backend/commands/subscriptioncmds.c | 467 | ||||
-rw-r--r-- | src/backend/replication/libpqwalreceiver/libpqwalreceiver.c | 8 | ||||
-rw-r--r-- | src/backend/replication/logical/launcher.c | 147 | ||||
-rw-r--r-- | src/backend/replication/logical/tablesync.c | 236 | ||||
-rw-r--r-- | src/backend/replication/logical/worker.c | 18 | ||||
-rw-r--r-- | src/backend/tcop/utility.c | 3 | ||||
-rw-r--r-- | src/include/catalog/catversion.h | 2 | ||||
-rw-r--r-- | src/include/catalog/pg_subscription_rel.h | 2 | ||||
-rw-r--r-- | src/include/commands/subscriptioncmds.h | 2 | ||||
-rw-r--r-- | src/include/replication/logicallauncher.h | 2 | ||||
-rw-r--r-- | src/include/replication/slot.h | 3 | ||||
-rw-r--r-- | src/include/replication/walreceiver.h | 1 | ||||
-rw-r--r-- | src/include/replication/worker_internal.h | 3 | ||||
-rw-r--r-- | src/test/regress/expected/subscription.out | 21 | ||||
-rw-r--r-- | src/test/regress/sql/subscription.sql | 22 | ||||
-rw-r--r-- | src/test/subscription/t/004_sync.pl | 21 | ||||
-rw-r--r-- | src/tools/pgindent/typedefs.list | 2 |
19 files changed, 707 insertions, 302 deletions
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index a2068e3fd45..3c8b4eb3622 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -2432,15 +2432,6 @@ PrepareTransaction(void) (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot PREPARE a transaction that has exported snapshots"))); - /* - * Don't allow PREPARE but for transaction that has/might kill logical - * replication workers. - */ - if (XactManipulatesLogicalReplicationWorkers()) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot PREPARE a transaction that has manipulated logical replication workers"))); - /* Prevent cancel/die interrupt while cleaning up */ HOLD_INTERRUPTS(); @@ -4899,7 +4890,6 @@ CommitSubTransaction(void) AtEOSubXact_HashTables(true, s->nestingLevel); AtEOSubXact_PgStat(true, s->nestingLevel); AtSubCommit_Snapshot(s->nestingLevel); - AtEOSubXact_ApplyLauncher(true, s->nestingLevel); /* * We need to restore the upper transaction's read-only state, in case the @@ -5059,7 +5049,6 @@ AbortSubTransaction(void) AtEOSubXact_HashTables(false, s->nestingLevel); AtEOSubXact_PgStat(false, s->nestingLevel); AtSubAbort_Snapshot(s->nestingLevel); - AtEOSubXact_ApplyLauncher(false, s->nestingLevel); } /* diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 44cb285b686..c32fc8137d8 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -29,6 +29,7 @@ #include "utils/array.h" #include "utils/builtins.h" #include "utils/fmgroids.h" +#include "utils/lsyscache.h" #include "utils/pg_lsn.h" #include "utils/rel.h" #include "utils/syscache.h" @@ -337,6 +338,13 @@ GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn) char substate; bool isnull; Datum d; + Relation rel; + + /* + * This is to avoid the race condition with AlterSubscription which tries + * to remove this relstate. + */ + rel = table_open(SubscriptionRelRelationId, AccessShareLock); /* Try finding the mapping. */ tup = SearchSysCache2(SUBSCRIPTIONRELMAP, @@ -363,6 +371,8 @@ GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn) /* Cleanup */ ReleaseSysCache(tup); + table_close(rel, AccessShareLock); + return substate; } @@ -403,6 +413,34 @@ RemoveSubscriptionRel(Oid subid, Oid relid) scan = table_beginscan_catalog(rel, nkeys, skey); while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection))) { + Form_pg_subscription_rel subrel; + + subrel = (Form_pg_subscription_rel) GETSTRUCT(tup); + + /* + * We don't allow to drop the relation mapping when the table + * synchronization is in progress unless the caller updates the + * corresponding subscription as well. This is to ensure that we don't + * leave tablesync slots or origins in the system when the + * corresponding table is dropped. + */ + if (!OidIsValid(subid) && subrel->srsubstate != SUBREL_STATE_READY) + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("could not drop relation mapping for subscription \"%s\"", + get_subscription_name(subrel->srsubid, false)), + errdetail("Table synchronization for relation \"%s\" is in progress and is in state \"%c\".", + get_rel_name(relid), subrel->srsubstate), + /* + * translator: first %s is a SQL ALTER command and second %s is a + * SQL DROP command + */ + errhint("Use %s to enable subscription if not already enabled or use %s to drop the subscription.", + "ALTER SUBSCRIPTION ... ENABLE", + "DROP SUBSCRIPTION ..."))); + } + CatalogTupleDelete(rel, &tup->t_self); } table_endscan(scan); diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 5ccbc9dd50f..5cf874e0b46 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -34,6 +34,7 @@ #include "nodes/makefuncs.h" #include "replication/logicallauncher.h" #include "replication/origin.h" +#include "replication/slot.h" #include "replication/walreceiver.h" #include "replication/walsender.h" #include "replication/worker_internal.h" @@ -46,6 +47,8 @@ #include "utils/syscache.h" static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); +static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err); + /* * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands. @@ -566,107 +569,207 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) Oid *pubrel_local_oids; ListCell *lc; int off; + int remove_rel_len; + Relation rel = NULL; + typedef struct SubRemoveRels + { + Oid relid; + char state; + } SubRemoveRels; + SubRemoveRels *sub_remove_rels; /* Load the library providing us libpq calls. */ load_file("libpqwalreceiver", false); - /* Try to connect to the publisher. */ - wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err); - if (!wrconn) - ereport(ERROR, - (errmsg("could not connect to the publisher: %s", err))); - - /* Get the table list from publisher. */ - pubrel_names = fetch_table_list(wrconn, sub->publications); + PG_TRY(); + { + /* Try to connect to the publisher. */ + wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err); + if (!wrconn) + ereport(ERROR, + (errmsg("could not connect to the publisher: %s", err))); - /* We are done with the remote side, close connection. */ - walrcv_disconnect(wrconn); + /* Get the table list from publisher. */ + pubrel_names = fetch_table_list(wrconn, sub->publications); - /* Get local table list. */ - subrel_states = GetSubscriptionRelations(sub->oid); + /* Get local table list. */ + subrel_states = GetSubscriptionRelations(sub->oid); - /* - * Build qsorted array of local table oids for faster lookup. This can - * potentially contain all tables in the database so speed of lookup is - * important. - */ - subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid)); - off = 0; - foreach(lc, subrel_states) - { - SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc); + /* + * Build qsorted array of local table oids for faster lookup. This can + * potentially contain all tables in the database so speed of lookup + * is important. + */ + subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid)); + off = 0; + foreach(lc, subrel_states) + { + SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc); - subrel_local_oids[off++] = relstate->relid; - } - qsort(subrel_local_oids, list_length(subrel_states), - sizeof(Oid), oid_cmp); + subrel_local_oids[off++] = relstate->relid; + } + qsort(subrel_local_oids, list_length(subrel_states), + sizeof(Oid), oid_cmp); + + /* + * Rels that we want to remove from subscription and drop any slots + * and origins corresponding to them. + */ + sub_remove_rels = palloc(list_length(subrel_states) * sizeof(SubRemoveRels)); + + /* + * Walk over the remote tables and try to match them to locally known + * tables. If the table is not known locally create a new state for + * it. + * + * Also builds array of local oids of remote tables for the next step. + */ + off = 0; + pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid)); + + foreach(lc, pubrel_names) + { + RangeVar *rv = (RangeVar *) lfirst(lc); + Oid relid; - /* - * Walk over the remote tables and try to match them to locally known - * tables. If the table is not known locally create a new state for it. - * - * Also builds array of local oids of remote tables for the next step. - */ - off = 0; - pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid)); + relid = RangeVarGetRelid(rv, AccessShareLock, false); - foreach(lc, pubrel_names) - { - RangeVar *rv = (RangeVar *) lfirst(lc); - Oid relid; + /* Check for supported relkind. */ + CheckSubscriptionRelkind(get_rel_relkind(relid), + rv->schemaname, rv->relname); - relid = RangeVarGetRelid(rv, AccessShareLock, false); + pubrel_local_oids[off++] = relid; - /* Check for supported relkind. */ - CheckSubscriptionRelkind(get_rel_relkind(relid), - rv->schemaname, rv->relname); + if (!bsearch(&relid, subrel_local_oids, + list_length(subrel_states), sizeof(Oid), oid_cmp)) + { + AddSubscriptionRelState(sub->oid, relid, + copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY, + InvalidXLogRecPtr); + ereport(DEBUG1, + (errmsg("table \"%s.%s\" added to subscription \"%s\"", + rv->schemaname, rv->relname, sub->name))); + } + } - pubrel_local_oids[off++] = relid; + /* + * Next remove state for tables we should not care about anymore using + * the data we collected above + */ + qsort(pubrel_local_oids, list_length(pubrel_names), + sizeof(Oid), oid_cmp); - if (!bsearch(&relid, subrel_local_oids, - list_length(subrel_states), sizeof(Oid), oid_cmp)) + remove_rel_len = 0; + for (off = 0; off < list_length(subrel_states); off++) { - AddSubscriptionRelState(sub->oid, relid, - copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY, - InvalidXLogRecPtr); - ereport(DEBUG1, - (errmsg("table \"%s.%s\" added to subscription \"%s\"", - rv->schemaname, rv->relname, sub->name))); - } - } + Oid relid = subrel_local_oids[off]; - /* - * Next remove state for tables we should not care about anymore using the - * data we collected above - */ - qsort(pubrel_local_oids, list_length(pubrel_names), - sizeof(Oid), oid_cmp); + if (!bsearch(&relid, pubrel_local_oids, + list_length(pubrel_names), sizeof(Oid), oid_cmp)) + { + char state; + XLogRecPtr statelsn; + + /* + * Lock pg_subscription_rel with AccessExclusiveLock to + * prevent any race conditions with the apply worker + * re-launching workers at the same time this code is trying + * to remove those tables. + * + * Even if new worker for this particular rel is restarted it + * won't be able to make any progress as we hold exclusive + * lock on subscription_rel till the transaction end. It will + * simply exit as there is no corresponding rel entry. + * + * This locking also ensures that the state of rels won't + * change till we are done with this refresh operation. + */ + if (!rel) + rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock); + + /* Last known rel state. */ + state = GetSubscriptionRelState(sub->oid, relid, &statelsn); + + sub_remove_rels[remove_rel_len].relid = relid; + sub_remove_rels[remove_rel_len++].state = state; + + RemoveSubscriptionRel(sub->oid, relid); + + logicalrep_worker_stop(sub->oid, relid); + + /* + * For READY state, we would have already dropped the + * tablesync origin. + */ + if (state != SUBREL_STATE_READY) + { + char originname[NAMEDATALEN]; + + /* + * Drop the tablesync's origin tracking if exists. + * + * It is possible that the origin is not yet created for + * tablesync worker, this can happen for the states before + * SUBREL_STATE_FINISHEDCOPY. The apply worker can also + * concurrently try to drop the origin and by this time + * the origin might be already removed. For these reasons, + * passing missing_ok = true. + */ + ReplicationOriginNameForTablesync(sub->oid, relid, originname); + replorigin_drop_by_name(originname, true, false); + } - for (off = 0; off < list_length(subrel_states); off++) - { - Oid relid = subrel_local_oids[off]; + ereport(DEBUG1, + (errmsg("table \"%s.%s\" removed from subscription \"%s\"", + get_namespace_name(get_rel_namespace(relid)), + get_rel_name(relid), + sub->name))); + } + } - if (!bsearch(&relid, pubrel_local_oids, - list_length(pubrel_names), sizeof(Oid), oid_cmp)) + /* + * Drop the tablesync slots associated with removed tables. This has + * to be at the end because otherwise if there is an error while doing + * the database operations we won't be able to rollback dropped slots. + */ + for (off = 0; off < remove_rel_len; off++) { - RemoveSubscriptionRel(sub->oid, relid); - - logicalrep_worker_stop_at_commit(sub->oid, relid); - - ereport(DEBUG1, - (errmsg("table \"%s.%s\" removed from subscription \"%s\"", - get_namespace_name(get_rel_namespace(relid)), - get_rel_name(relid), - sub->name))); + if (sub_remove_rels[off].state != SUBREL_STATE_READY && + sub_remove_rels[off].state != SUBREL_STATE_SYNCDONE) + { + char syncslotname[NAMEDATALEN] = {0}; + + /* + * For READY/SYNCDONE states we know the tablesync slot has + * already been dropped by the tablesync worker. + * + * For other states, there is no certainty, maybe the slot + * does not exist yet. Also, if we fail after removing some of + * the slots, next time, it will again try to drop already + * dropped slots and fail. For these reasons, we allow + * missing_ok = true for the drop. + */ + ReplicationSlotNameForTablesync(sub->oid, sub_remove_rels[off].relid, syncslotname); + ReplicationSlotDropAtPubNode(wrconn, syncslotname, true); + } } } + PG_FINALLY(); + { + if (wrconn) + walrcv_disconnect(wrconn); + } + PG_END_TRY(); + + if (rel) + table_close(rel, NoLock); } /* * Alter the existing subscription. */ ObjectAddress -AlterSubscription(AlterSubscriptionStmt *stmt) +AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) { Relation rel; ObjectAddress myself; @@ -848,6 +951,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt) errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"), errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false)."))); + PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh"); + /* Make sure refresh sees the new list of publications. */ sub->publications = stmt->publication; @@ -877,6 +982,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt) NULL, NULL, /* no "binary" */ NULL, NULL); /* no "streaming" */ + PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH"); + AlterSubscription_refresh(sub, copy_data); break; @@ -927,8 +1034,8 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) char originname[NAMEDATALEN]; char *err = NULL; WalReceiverConn *wrconn = NULL; - StringInfoData cmd; Form_pg_subscription form; + List *rstates; /* * Lock pg_subscription with AccessExclusiveLock to ensure that the @@ -1041,6 +1148,36 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) } list_free(subworkers); + /* + * Cleanup of tablesync replication origins. + * + * Any READY-state relations would already have dealt with clean-ups. + * + * Note that the state can't change because we have already stopped both + * the apply and tablesync workers and they can't restart because of + * exclusive lock on the subscription. + */ + rstates = GetSubscriptionNotReadyRelations(subid); + foreach(lc, rstates) + { + SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc); + Oid relid = rstate->relid; + + /* Only cleanup resources of tablesync workers */ + if (!OidIsValid(relid)) + continue; + + /* + * Drop the tablesync's origin tracking if exists. + * + * It is possible that the origin is not yet created for tablesync + * worker so passing missing_ok = true. This can happen for the states + * before SUBREL_STATE_FINISHEDCOPY. + */ + ReplicationOriginNameForTablesync(subid, relid, originname); + replorigin_drop_by_name(originname, true, false); + } + /* Clean up dependencies */ deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0); @@ -1055,30 +1192,110 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) * If there is no slot associated with the subscription, we can finish * here. */ - if (!slotname) + if (!slotname && rstates == NIL) { table_close(rel, NoLock); return; } /* - * Otherwise drop the replication slot at the publisher node using the - * replication connection. + * Try to acquire the connection necessary for dropping slots. + * + * Note: If the slotname is NONE/NULL then we allow the command to finish + * and users need to manually cleanup the apply and tablesync worker slots + * later. + * + * This has to be at the end because otherwise if there is an error while + * doing the database operations we won't be able to rollback dropped + * slot. */ load_file("libpqwalreceiver", false); - initStringInfo(&cmd); - appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname)); - wrconn = walrcv_connect(conninfo, true, subname, &err); if (wrconn == NULL) - ereport(ERROR, - (errmsg("could not connect to publisher when attempting to " - "drop the replication slot \"%s\"", slotname), - errdetail("The error was: %s", err), - /* translator: %s is an SQL ALTER command */ - errhint("Use %s to disassociate the subscription from the slot.", - "ALTER SUBSCRIPTION ... SET (slot_name = NONE)"))); + { + if (!slotname) + { + /* be tidy */ + list_free(rstates); + table_close(rel, NoLock); + return; + } + else + { + ReportSlotConnectionError(rstates, subid, slotname, err); + } + } + + PG_TRY(); + { + foreach(lc, rstates) + { + SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc); + Oid relid = rstate->relid; + + /* Only cleanup resources of tablesync workers */ + if (!OidIsValid(relid)) + continue; + + /* + * Drop the tablesync slots associated with removed tables. + * + * For SYNCDONE/READY states, the tablesync slot is known to have + * already been dropped by the tablesync worker. + * + * For other states, there is no certainty, maybe the slot does + * not exist yet. Also, if we fail after removing some of the + * slots, next time, it will again try to drop already dropped + * slots and fail. For these reasons, we allow missing_ok = true + * for the drop. + */ + if (rstate->state != SUBREL_STATE_SYNCDONE) + { + char syncslotname[NAMEDATALEN] = {0}; + + ReplicationSlotNameForTablesync(subid, relid, syncslotname); + ReplicationSlotDropAtPubNode(wrconn, syncslotname, true); + } + } + + list_free(rstates); + + /* + * If there is a slot associated with the subscription, then drop the + * replication slot at the publisher. + */ + if (slotname) + ReplicationSlotDropAtPubNode(wrconn, slotname, false); + + } + PG_FINALLY(); + { + walrcv_disconnect(wrconn); + } + PG_END_TRY(); + + table_close(rel, NoLock); +} + +/* + * Drop the replication slot at the publisher node using the replication + * connection. + * + * missing_ok - if true then only issue a LOG message if the slot doesn't + * exist. + */ +void +ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok) +{ + StringInfoData cmd; + + Assert(wrconn); + + load_file("libpqwalreceiver", false); + + initStringInfo(&cmd); + appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname)); PG_TRY(); { @@ -1086,27 +1303,39 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) res = walrcv_exec(wrconn, cmd.data, 0, NULL); - if (res->status != WALRCV_OK_COMMAND) - ereport(ERROR, + if (res->status == WALRCV_OK_COMMAND) + { + /* NOTICE. Success. */ + ereport(NOTICE, + (errmsg("dropped replication slot \"%s\" on publisher", + slotname))); + } + else if (res->status == WALRCV_ERROR && + missing_ok && + res->sqlstate == ERRCODE_UNDEFINED_OBJECT) + { + /* LOG. Error, but missing_ok = true. */ + ereport(LOG, (errmsg("could not drop the replication slot \"%s\" on publisher", slotname), errdetail("The error was: %s", res->err))); + } else - ereport(NOTICE, - (errmsg("dropped replication slot \"%s\" on publisher", - slotname))); + { + /* ERROR. */ + ereport(ERROR, + (errmsg("could not drop the replication slot \"%s\" on publisher", + slotname), + errdetail("The error was: %s", res->err))); + } walrcv_clear_result(res); } PG_FINALLY(); { - walrcv_disconnect(wrconn); + pfree(cmd.data); } PG_END_TRY(); - - pfree(cmd.data); - - table_close(rel, NoLock); } /* @@ -1275,3 +1504,45 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) return tablelist; } + +/* + * This is to report the connection failure while dropping replication slots. + * Here, we report the WARNING for all tablesync slots so that user can drop + * them manually, if required. + */ +static void +ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err) +{ + ListCell *lc; + + foreach(lc, rstates) + { + SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc); + Oid relid = rstate->relid; + + /* Only cleanup resources of tablesync workers */ + if (!OidIsValid(relid)) + continue; + + /* + * Caller needs to ensure that relstate doesn't change underneath us. + * See DropSubscription where we get the relstates. + */ + if (rstate->state != SUBREL_STATE_SYNCDONE) + { + char syncslotname[NAMEDATALEN] = {0}; + + ReplicationSlotNameForTablesync(subid, relid, syncslotname); + elog(WARNING, "could not drop tablesync replication slot \"%s\"", + syncslotname); + } + } + + ereport(ERROR, + (errmsg("could not connect to publisher when attempting to " + "drop the replication slot \"%s\"", slotname), + errdetail("The error was: %s", err), + /* translator: %s is an SQL ALTER command */ + errhint("Use %s to disassociate the subscription from the slot.", + "ALTER SUBSCRIPTION ... SET (slot_name = NONE)"))); +} diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index e9582748617..77146961408 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -982,6 +982,7 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query, { PGresult *pgres = NULL; WalRcvExecResult *walres = palloc0(sizeof(WalRcvExecResult)); + char *diag_sqlstate; if (MyDatabaseId == InvalidOid) ereport(ERROR, @@ -1025,6 +1026,13 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query, case PGRES_BAD_RESPONSE: walres->status = WALRCV_ERROR; walres->err = pchomp(PQerrorMessage(conn->streamConn)); + diag_sqlstate = PQresultErrorField(pgres, PG_DIAG_SQLSTATE); + if (diag_sqlstate) + walres->sqlstate = MAKE_SQLSTATE(diag_sqlstate[0], + diag_sqlstate[1], + diag_sqlstate[2], + diag_sqlstate[3], + diag_sqlstate[4]); break; } diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 186514cd9ed..58082dde186 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -73,20 +73,6 @@ typedef struct LogicalRepWorkerId Oid relid; } LogicalRepWorkerId; -typedef struct StopWorkersData -{ - int nestDepth; /* Sub-transaction nest level */ - List *workers; /* List of LogicalRepWorkerId */ - struct StopWorkersData *parent; /* This need not be an immediate - * subtransaction parent */ -} StopWorkersData; - -/* - * Stack of StopWorkersData elements. Each stack element contains the workers - * to be stopped for that subtransaction. - */ -static StopWorkersData *on_commit_stop_workers = NULL; - static void ApplyLauncherWakeup(void); static void logicalrep_launcher_onexit(int code, Datum arg); static void logicalrep_worker_onexit(int code, Datum arg); @@ -547,51 +533,6 @@ logicalrep_worker_stop(Oid subid, Oid relid) } /* - * Request worker for specified sub/rel to be stopped on commit. - */ -void -logicalrep_worker_stop_at_commit(Oid subid, Oid relid) -{ - int nestDepth = GetCurrentTransactionNestLevel(); - LogicalRepWorkerId *wid; - MemoryContext oldctx; - - /* Make sure we store the info in context that survives until commit. */ - oldctx = MemoryContextSwitchTo(TopTransactionContext); - - /* Check that previous transactions were properly cleaned up. */ - Assert(on_commit_stop_workers == NULL || - nestDepth >= on_commit_stop_workers->nestDepth); - - /* - * Push a new stack element if we don't already have one for the current - * nestDepth. - */ - if (on_commit_stop_workers == NULL || - nestDepth > on_commit_stop_workers->nestDepth) - { - StopWorkersData *newdata = palloc(sizeof(StopWorkersData)); - - newdata->nestDepth = nestDepth; - newdata->workers = NIL; - newdata->parent = on_commit_stop_workers; - on_commit_stop_workers = newdata; - } - - /* - * Finally add a new worker into the worker list of the current - * subtransaction. - */ - wid = palloc(sizeof(LogicalRepWorkerId)); - wid->subid = subid; - wid->relid = relid; - on_commit_stop_workers->workers = - lappend(on_commit_stop_workers->workers, wid); - - MemoryContextSwitchTo(oldctx); -} - -/* * Wake up (using latch) any logical replication worker for specified sub/rel. */ void @@ -820,109 +761,21 @@ ApplyLauncherShmemInit(void) } /* - * Check whether current transaction has manipulated logical replication - * workers. - */ -bool -XactManipulatesLogicalReplicationWorkers(void) -{ - return (on_commit_stop_workers != NULL); -} - -/* * Wakeup the launcher on commit if requested. */ void AtEOXact_ApplyLauncher(bool isCommit) { - - Assert(on_commit_stop_workers == NULL || - (on_commit_stop_workers->nestDepth == 1 && - on_commit_stop_workers->parent == NULL)); - if (isCommit) { - ListCell *lc; - - if (on_commit_stop_workers != NULL) - { - List *workers = on_commit_stop_workers->workers; - - foreach(lc, workers) - { - LogicalRepWorkerId *wid = lfirst(lc); - - logicalrep_worker_stop(wid->subid, wid->relid); - } - } - if (on_commit_launcher_wakeup) ApplyLauncherWakeup(); } - /* - * No need to pfree on_commit_stop_workers. It was allocated in - * transaction memory context, which is going to be cleaned soon. - */ - on_commit_stop_workers = NULL; on_commit_launcher_wakeup = false; } /* - * On commit, merge the current on_commit_stop_workers list into the - * immediate parent, if present. - * On rollback, discard the current on_commit_stop_workers list. - * Pop out the stack. - */ -void -AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth) -{ - StopWorkersData *parent; - - /* Exit immediately if there's no work to do at this level. */ - if (on_commit_stop_workers == NULL || - on_commit_stop_workers->nestDepth < nestDepth) - return; - - Assert(on_commit_stop_workers->nestDepth == nestDepth); - - parent = on_commit_stop_workers->parent; - - if (isCommit) - { - /* - * If the upper stack element is not an immediate parent - * subtransaction, just decrement the notional nesting depth without - * doing any real work. Else, we need to merge the current workers - * list into the parent. - */ - if (!parent || parent->nestDepth < nestDepth - 1) - { - on_commit_stop_workers->nestDepth--; - return; - } - - parent->workers = - list_concat(parent->workers, on_commit_stop_workers->workers); - } - else - { - /* - * Abandon everything that was done at this nesting level. Explicitly - * free memory to avoid a transaction-lifespan leak. - */ - list_free_deep(on_commit_stop_workers->workers); - } - - /* - * We have taken care of the current subtransaction workers list for both - * abort or commit. So we are ready to pop the stack. - */ - pfree(on_commit_stop_workers); - on_commit_stop_workers = parent; -} - -/* * Request wakeup of the launcher on commit of the transaction. * * This is used to send launcher signal to stop sleeping and process the diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index ccbdbcf08f9..19cc8046786 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -31,8 +31,11 @@ * table state to INIT. * - Tablesync worker starts; changes table state from INIT to DATASYNC while * copying. - * - Tablesync worker finishes the copy and sets table state to SYNCWAIT; - * waits for state change. + * - Tablesync worker does initial table copy; there is a FINISHEDCOPY (sync + * worker specific) state to indicate when the copy phase has completed, so + * if the worker crashes with this (non-memory) state then the copy will not + * be re-attempted. + * - Tablesync worker then sets table state to SYNCWAIT; waits for state change. * - Apply worker periodically checks for tables in SYNCWAIT state. When * any appear, it sets the table state to CATCHUP and starts loop-waiting * until either the table state is set to SYNCDONE or the sync worker @@ -48,8 +51,8 @@ * point it sets state to READY and stops tracking. Again, there might * be zero changes in between. * - * So the state progression is always: INIT -> DATASYNC -> SYNCWAIT -> - * CATCHUP -> SYNCDONE -> READY. + * So the state progression is always: INIT -> DATASYNC -> FINISHEDCOPY + * -> SYNCWAIT -> CATCHUP -> SYNCDONE -> READY. * * The catalog pg_subscription_rel is used to keep information about * subscribed tables and their state. The catalog holds all states @@ -58,6 +61,7 @@ * Example flows look like this: * - Apply is in front: * sync:8 + * -> set in catalog FINISHEDCOPY * -> set in memory SYNCWAIT * apply:10 * -> set in memory CATCHUP @@ -73,6 +77,7 @@ * * - Sync is in front: * sync:10 + * -> set in catalog FINISHEDCOPY * -> set in memory SYNCWAIT * apply:8 * -> set in memory CATCHUP @@ -101,7 +106,10 @@ #include "replication/logicalrelation.h" #include "replication/walreceiver.h" #include "replication/worker_internal.h" +#include "replication/slot.h" +#include "replication/origin.h" #include "storage/ipc.h" +#include "storage/lmgr.h" #include "utils/builtins.h" #include "utils/lsyscache.h" #include "utils/memutils.h" @@ -269,26 +277,52 @@ invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue) static void process_syncing_tables_for_sync(XLogRecPtr current_lsn) { - Assert(IsTransactionState()); - SpinLockAcquire(&MyLogicalRepWorker->relmutex); if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP && current_lsn >= MyLogicalRepWorker->relstate_lsn) { TimeLineID tli; + char syncslotname[NAMEDATALEN] = {0}; MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE; MyLogicalRepWorker->relstate_lsn = current_lsn; SpinLockRelease(&MyLogicalRepWorker->relmutex); + /* + * UpdateSubscriptionRelState must be called within a transaction. + * That transaction will be ended within the finish_sync_worker(). + */ + if (!IsTransactionState()) + StartTransactionCommand(); + UpdateSubscriptionRelState(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid, MyLogicalRepWorker->relstate, MyLogicalRepWorker->relstate_lsn); + /* End wal streaming so wrconn can be re-used to drop the slot. */ walrcv_endstreaming(wrconn, &tli); + + /* + * Cleanup the tablesync slot. + * + * This has to be done after updating the state because otherwise if + * there is an error while doing the database operations we won't be + * able to rollback dropped slot. + */ + ReplicationSlotNameForTablesync(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + syncslotname); + + /* + * It is important to give an error if we are unable to drop the slot, + * otherwise, it won't be dropped till the corresponding subscription + * is dropped. So passing missing_ok = false. + */ + ReplicationSlotDropAtPubNode(wrconn, syncslotname, false); + finish_sync_worker(); } else @@ -403,6 +437,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) */ if (current_lsn >= rstate->lsn) { + char originname[NAMEDATALEN]; + rstate->state = SUBREL_STATE_READY; rstate->lsn = current_lsn; if (!started_tx) @@ -411,6 +447,27 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) started_tx = true; } + /* + * Remove the tablesync origin tracking if exists. + * + * The normal case origin drop is done here instead of in the + * process_syncing_tables_for_sync function because we don't + * allow to drop the origin till the process owning the origin + * is alive. + * + * There is a chance that the user is concurrently performing + * refresh for the subscription where we remove the table + * state and its origin and by this time the origin might be + * already removed. So passing missing_ok = true. + */ + ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid, + rstate->relid, + originname); + replorigin_drop_by_name(originname, true, false); + + /* + * Update the state to READY only after the origin cleanup. + */ UpdateSubscriptionRelState(MyLogicalRepWorker->subid, rstate->relid, rstate->state, rstate->lsn); @@ -806,6 +863,50 @@ copy_table(Relation rel) } /* + * Determine the tablesync slot name. + * + * The name must not exceed NAMEDATALEN - 1 because of remote node constraints + * on slot name length. We append system_identifier to avoid slot_name + * collision with subscriptions in other clusters. With the current scheme + * pg_%u_sync_%u_UINT64_FORMAT (3 + 10 + 6 + 10 + 20 + '\0'), the maximum + * length of slot_name will be 50. + * + * The returned slot name is either: + * - stored in the supplied buffer (syncslotname), or + * - palloc'ed in current memory context (if syncslotname = NULL). + * + * Note: We don't use the subscription slot name as part of tablesync slot name + * because we are responsible for cleaning up these slots and it could become + * impossible to recalculate what name to cleanup if the subscription slot name + * had changed. + */ +char * +ReplicationSlotNameForTablesync(Oid suboid, Oid relid, + char syncslotname[NAMEDATALEN]) +{ + if (syncslotname) + sprintf(syncslotname, "pg_%u_sync_%u_" UINT64_FORMAT, suboid, relid, + GetSystemIdentifier()); + else + syncslotname = psprintf("pg_%u_sync_%u_" UINT64_FORMAT, suboid, relid, + GetSystemIdentifier()); + + return syncslotname; +} + +/* + * Form the origin name for tablesync. + * + * Return the name in the supplied buffer. + */ +void +ReplicationOriginNameForTablesync(Oid suboid, Oid relid, + char originname[NAMEDATALEN]) +{ + snprintf(originname, NAMEDATALEN, "pg_%u_%u", suboid, relid); +} + +/* * Start syncing the table in the sync worker. * * If nothing needs to be done to sync the table, we exit the worker without @@ -822,6 +923,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) XLogRecPtr relstate_lsn; Relation rel; WalRcvExecResult *res; + char originname[NAMEDATALEN]; + RepOriginId originid; /* Check the state of the table synchronization. */ StartTransactionCommand(); @@ -847,19 +950,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) finish_sync_worker(); /* doesn't return */ } - /* - * To build a slot name for the sync work, we are limited to NAMEDATALEN - - * 1 characters. We cut the original slot name to NAMEDATALEN - 28 chars - * and append _%u_sync_%u (1 + 10 + 6 + 10 + '\0'). (It's actually the - * NAMEDATALEN on the remote that matters, but this scheme will also work - * reasonably if that is different.) - */ - StaticAssertStmt(NAMEDATALEN >= 32, "NAMEDATALEN too small"); /* for sanity */ - slotname = psprintf("%.*s_%u_sync_%u", - NAMEDATALEN - 28, - MySubscription->slotname, - MySubscription->oid, - MyLogicalRepWorker->relid); + /* Calculate the name of the tablesync slot. */ + slotname = ReplicationSlotNameForTablesync(MySubscription->oid, + MyLogicalRepWorker->relid, + NULL /* use palloc */ ); /* * Here we use the slot name instead of the subscription name as the @@ -872,7 +966,50 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) (errmsg("could not connect to the publisher: %s", err))); Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT || - MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC); + MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC || + MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY); + + /* Assign the origin tracking record name. */ + ReplicationOriginNameForTablesync(MySubscription->oid, + MyLogicalRepWorker->relid, + originname); + + if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC) + { + /* + * We have previously errored out before finishing the copy so the + * replication slot might exist. We want to remove the slot if it + * already exists and proceed. + * + * XXX We could also instead try to drop the slot, last time we failed + * but for that, we might need to clean up the copy state as it might + * be in the middle of fetching the rows. Also, if there is a network + * breakdown then it wouldn't have succeeded so trying it next time + * seems like a better bet. + */ + ReplicationSlotDropAtPubNode(wrconn, slotname, true); + } + else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY) + { + /* + * The COPY phase was previously done, but tablesync then crashed + * before it was able to finish normally. + */ + StartTransactionCommand(); + + /* + * The origin tracking name must already exist. It was created first + * time this tablesync was launched. + */ + originid = replorigin_by_name(originname, false); + replorigin_session_setup(originid); + replorigin_session_origin = originid; + *origin_startpos = replorigin_session_get_progress(false); + + CommitTransactionCommand(); + + goto copy_table_done; + } SpinLockAcquire(&MyLogicalRepWorker->relmutex); MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC; @@ -888,9 +1025,6 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) CommitTransactionCommand(); pgstat_report_stat(false); - /* - * We want to do the table data sync in a single transaction. - */ StartTransactionCommand(); /* @@ -916,13 +1050,46 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) walrcv_clear_result(res); /* - * Create a new temporary logical decoding slot. This slot will be used + * Create a new permanent logical decoding slot. This slot will be used * for the catchup phase after COPY is done, so tell it to use the * snapshot to make the final data consistent. */ - walrcv_create_slot(wrconn, slotname, true, + walrcv_create_slot(wrconn, slotname, false /* permanent */ , CRS_USE_SNAPSHOT, origin_startpos); + /* + * Setup replication origin tracking. The purpose of doing this before the + * copy is to avoid doing the copy again due to any error in setting up + * origin tracking. + */ + originid = replorigin_by_name(originname, true); + if (!OidIsValid(originid)) + { + /* + * Origin tracking does not exist, so create it now. + * + * Then advance to the LSN got from walrcv_create_slot. This is WAL + * logged for the purpose of recovery. Locks are to prevent the + * replication origin from vanishing while advancing. + */ + originid = replorigin_create(originname); + + LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock); + replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr, + true /* go backward */ , true /* WAL log */ ); + UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock); + + replorigin_session_setup(originid); + replorigin_session_origin = originid; + } + else + { + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("replication origin \"%s\" already exists", + originname))); + } + /* Now do the initial data copy */ PushActiveSnapshot(GetTransactionSnapshot()); copy_table(rel); @@ -941,6 +1108,25 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) CommandCounterIncrement(); /* + * Update the persisted state to indicate the COPY phase is done; make it + * visible to others. + */ + UpdateSubscriptionRelState(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + SUBREL_STATE_FINISHEDCOPY, + MyLogicalRepWorker->relstate_lsn); + + CommitTransactionCommand(); + +copy_table_done: + + elog(DEBUG1, + "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X", + originname, + (uint32) (*origin_startpos >> 32), + (uint32) *origin_startpos); + + /* * We are done with the initial data synchronization, update the state. */ SpinLockAcquire(&MyLogicalRepWorker->relmutex); diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index eb7db89cef7..cfc924cd893 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -807,12 +807,8 @@ apply_handle_stream_stop(StringInfo s) /* We must be in a valid transaction state */ Assert(IsTransactionState()); - /* The synchronization worker runs in single transaction. */ - if (!am_tablesync_worker()) - { - /* Commit the per-stream transaction */ - CommitTransactionCommand(); - } + /* Commit the per-stream transaction */ + CommitTransactionCommand(); in_streamed_transaction = false; @@ -889,9 +885,7 @@ apply_handle_stream_abort(StringInfo s) /* Cleanup the subxact info */ cleanup_subxact_info(); - /* The synchronization worker runs in single transaction */ - if (!am_tablesync_worker()) - CommitTransactionCommand(); + CommitTransactionCommand(); return; } @@ -918,8 +912,7 @@ apply_handle_stream_abort(StringInfo s) /* write the updated subxact list */ subxact_info_write(MyLogicalRepWorker->subid, xid); - if (!am_tablesync_worker()) - CommitTransactionCommand(); + CommitTransactionCommand(); } } @@ -1062,8 +1055,7 @@ apply_handle_stream_commit(StringInfo s) static void apply_handle_commit_internal(StringInfo s, LogicalRepCommitData *commit_data) { - /* The synchronization worker runs in single transaction. */ - if (IsTransactionState() && !am_tablesync_worker()) + if (IsTransactionState()) { /* * Update origin state so we can restart streaming from correct diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index 1d81071c357..05bb698cf45 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -1786,7 +1786,8 @@ ProcessUtilitySlow(ParseState *pstate, break; case T_AlterSubscriptionStmt: - address = AlterSubscription((AlterSubscriptionStmt *) parsetree); + address = AlterSubscription((AlterSubscriptionStmt *) parsetree, + isTopLevel); break; case T_DropSubscriptionStmt: diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index 638830aaac1..2efd937e12e 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -53,6 +53,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 202102021 +#define CATALOG_VERSION_NO 202102121 #endif diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h index 2bea2c52aa7..ed94f57baa1 100644 --- a/src/include/catalog/pg_subscription_rel.h +++ b/src/include/catalog/pg_subscription_rel.h @@ -61,6 +61,8 @@ DECLARE_UNIQUE_INDEX_PKEY(pg_subscription_rel_srrelid_srsubid_index, 6117, on pg #define SUBREL_STATE_INIT 'i' /* initializing (sublsn NULL) */ #define SUBREL_STATE_DATASYNC 'd' /* data is being synchronized (sublsn * NULL) */ +#define SUBREL_STATE_FINISHEDCOPY 'f' /* tablesync copy phase is completed + * (sublsn NULL) */ #define SUBREL_STATE_SYNCDONE 's' /* synchronization finished in front of * apply (sublsn set) */ #define SUBREL_STATE_READY 'r' /* ready (sublsn set) */ diff --git a/src/include/commands/subscriptioncmds.h b/src/include/commands/subscriptioncmds.h index a81865079d1..3b926f35d76 100644 --- a/src/include/commands/subscriptioncmds.h +++ b/src/include/commands/subscriptioncmds.h @@ -20,7 +20,7 @@ extern ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel); -extern ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt); +extern ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel); extern void DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel); extern ObjectAddress AlterSubscriptionOwner(const char *name, Oid newOwnerId); diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h index 421ec1580d8..301e494f7ba 100644 --- a/src/include/replication/logicallauncher.h +++ b/src/include/replication/logicallauncher.h @@ -22,9 +22,7 @@ extern Size ApplyLauncherShmemSize(void); extern void ApplyLauncherShmemInit(void); extern void ApplyLauncherWakeupAtCommit(void); -extern bool XactManipulatesLogicalReplicationWorkers(void); extern void AtEOXact_ApplyLauncher(bool isCommit); -extern void AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth); extern bool IsLogicalLauncher(void); diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 53f636c56f5..5f52335f15f 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -15,6 +15,7 @@ #include "storage/lwlock.h" #include "storage/shmem.h" #include "storage/spin.h" +#include "replication/walreceiver.h" /* * Behaviour of replication slots, upon release or crash. @@ -211,6 +212,8 @@ extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive); extern void ReplicationSlotsDropDBSlots(Oid dboid); extern void InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno); extern ReplicationSlot *SearchNamedReplicationSlot(const char *name); +extern char *ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname); +extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok); extern void StartupReplicationSlots(void); extern void CheckPointReplicationSlots(void); diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 4313f516d35..a97a59a6a30 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -210,6 +210,7 @@ typedef enum typedef struct WalRcvExecResult { WalRcvExecStatus status; + int sqlstate; char *err; Tuplestorestate *tuplestore; TupleDesc tupledesc; diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index d046022e49c..4a5adc2fdac 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -77,13 +77,14 @@ extern List *logicalrep_workers_find(Oid subid, bool only_running); extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid); extern void logicalrep_worker_stop(Oid subid, Oid relid); -extern void logicalrep_worker_stop_at_commit(Oid subid, Oid relid); extern void logicalrep_worker_wakeup(Oid subid, Oid relid); extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker); extern int logicalrep_sync_worker_count(Oid subid); +extern void ReplicationOriginNameForTablesync(Oid suboid, Oid relid, char *originname); extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos); + void process_syncing_tables(XLogRecPtr current_lsn); void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue); diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index 2fa9bce66a4..7802279cb2e 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -201,6 +201,27 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); (1 row) DROP SUBSCRIPTION regress_testsub; +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=postgres' PUBLICATION mypub + WITH (enabled = true, create_slot = false, copy_data = false); +-- fail - ALTER SUBSCRIPTION with refresh is not allowed in a transaction +-- block or function +BEGIN; +ALTER SUBSCRIPTION regress_testsub SET PUBLICATION mypub WITH (refresh = true); +ERROR: ALTER SUBSCRIPTION with refresh cannot run inside a transaction block +END; +BEGIN; +ALTER SUBSCRIPTION regress_testsub REFRESH PUBLICATION; +ERROR: ALTER SUBSCRIPTION ... REFRESH cannot run inside a transaction block +END; +CREATE FUNCTION func() RETURNS VOID AS +$$ ALTER SUBSCRIPTION regress_testsub SET PUBLICATION mypub WITH (refresh = true) $$ LANGUAGE SQL; +SELECT func(); +ERROR: ALTER SUBSCRIPTION with refresh cannot be executed from a function +CONTEXT: SQL function "func" statement 1 +ALTER SUBSCRIPTION regress_testsub DISABLE; +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub; +DROP FUNCTION func; RESET SESSION AUTHORIZATION; DROP ROLE regress_subscription_user; DROP ROLE regress_subscription_user2; diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index 14fa0b247e1..ca0d7827429 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -147,6 +147,28 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); DROP SUBSCRIPTION regress_testsub; +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=postgres' PUBLICATION mypub + WITH (enabled = true, create_slot = false, copy_data = false); + +-- fail - ALTER SUBSCRIPTION with refresh is not allowed in a transaction +-- block or function +BEGIN; +ALTER SUBSCRIPTION regress_testsub SET PUBLICATION mypub WITH (refresh = true); +END; + +BEGIN; +ALTER SUBSCRIPTION regress_testsub REFRESH PUBLICATION; +END; + +CREATE FUNCTION func() RETURNS VOID AS +$$ ALTER SUBSCRIPTION regress_testsub SET PUBLICATION mypub WITH (refresh = true) $$ LANGUAGE SQL; +SELECT func(); + +ALTER SUBSCRIPTION regress_testsub DISABLE; +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub; +DROP FUNCTION func; + RESET SESSION AUTHORIZATION; DROP ROLE regress_subscription_user; DROP ROLE regress_subscription_user2; diff --git a/src/test/subscription/t/004_sync.pl b/src/test/subscription/t/004_sync.pl index e111ab91810..c7926681b66 100644 --- a/src/test/subscription/t/004_sync.pl +++ b/src/test/subscription/t/004_sync.pl @@ -3,7 +3,7 @@ use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 7; +use Test::More tests => 8; # Initialize publisher node my $node_publisher = get_new_node('publisher'); @@ -149,7 +149,26 @@ $result = $node_subscriber->safe_psql('postgres', is($result, qq(20), 'changes for table added after subscription initialized replicated'); +# clean up +$node_publisher->safe_psql('postgres', "DROP TABLE tab_rep_next"); +$node_subscriber->safe_psql('postgres', "DROP TABLE tab_rep_next"); $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub"); +# Table tap_rep already has the same records on both publisher and subscriber +# at this time. Recreate the subscription which will do the initial copy of +# the table again and fails due to unique constraint violation. +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"); + +$result = $node_subscriber->poll_query_until('postgres', $started_query) + or die "Timed out while waiting for subscriber to start sync"; + +# DROP SUBSCRIPTION must clean up slots on the publisher side when the +# subscriber is stuck on data copy for constraint violation. +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub"); + +$result = $node_publisher->safe_psql('postgres', "SELECT count(*) FROM pg_replication_slots"); +is($result, qq(0), 'DROP SUBSCRIPTION during error can clean up the slots on the publisher'); + $node_subscriber->stop('fast'); $node_publisher->stop('fast'); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 1d540fe489f..bab4f3adb3b 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2397,7 +2397,6 @@ StdAnalyzeData StdRdOptions Step StopList -StopWorkersData StrategyNumber StreamCtl StreamXidHash @@ -2408,6 +2407,7 @@ SubLink SubLinkType SubPlan SubPlanState +SubRemoveRels SubTransactionId SubXactCallback SubXactCallbackItem |