about | summary | refs | log | tree | commit | diff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r-- src/backend/access/transam/xact.c | 11
-rw-r--r-- src/backend/catalog/pg_subscription.c | 38
-rw-r--r-- src/backend/commands/subscriptioncmds.c | 467
-rw-r--r-- src/backend/replication/libpqwalreceiver/libpqwalreceiver.c | 8
-rw-r--r-- src/backend/replication/logical/launcher.c | 147
-rw-r--r-- src/backend/replication/logical/tablesync.c | 236
-rw-r--r-- src/backend/replication/logical/worker.c | 18
-rw-r--r-- src/backend/tcop/utility.c | 3
-rw-r--r-- src/include/catalog/catversion.h | 2
-rw-r--r-- src/include/catalog/pg_subscription_rel.h | 2
-rw-r--r-- src/include/commands/subscriptioncmds.h | 2
-rw-r--r-- src/include/replication/logicallauncher.h | 2
-rw-r--r-- src/include/replication/slot.h | 3
-rw-r--r-- src/include/replication/walreceiver.h | 1
-rw-r--r-- src/include/replication/worker_internal.h | 3
-rw-r--r-- src/test/regress/expected/subscription.out | 21
-rw-r--r-- src/test/regress/sql/subscription.sql | 22
-rw-r--r-- src/test/subscription/t/004_sync.pl | 21
-rw-r--r-- src/tools/pgindent/typedefs.list | 2
19 files changed, 707 insertions, 302 deletions
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index a2068e3fd45..3c8b4eb3622 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -2432,15 +2432,6 @@ PrepareTransaction(void)
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot PREPARE a transaction that has exported snapshots")));
- /*
- * Don't allow PREPARE but for transaction that has/might kill logical
- * replication workers.
- */
- if (XactManipulatesLogicalReplicationWorkers())
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("cannot PREPARE a transaction that has manipulated logical replication workers")));
-
/* Prevent cancel/die interrupt while cleaning up */
HOLD_INTERRUPTS();
@@ -4899,7 +4890,6 @@ CommitSubTransaction(void)
AtEOSubXact_HashTables(true, s->nestingLevel);
AtEOSubXact_PgStat(true, s->nestingLevel);
AtSubCommit_Snapshot(s->nestingLevel);
- AtEOSubXact_ApplyLauncher(true, s->nestingLevel);
/*
* We need to restore the upper transaction's read-only state, in case the
@@ -5059,7 +5049,6 @@ AbortSubTransaction(void)
AtEOSubXact_HashTables(false, s->nestingLevel);
AtEOSubXact_PgStat(false, s->nestingLevel);
AtSubAbort_Snapshot(s->nestingLevel);
- AtEOSubXact_ApplyLauncher(false, s->nestingLevel);
}
/*
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 44cb285b686..c32fc8137d8 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -29,6 +29,7 @@
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
+#include "utils/lsyscache.h"
#include "utils/pg_lsn.h"
#include "utils/rel.h"
#include "utils/syscache.h"
@@ -337,6 +338,13 @@ GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
char substate;
bool isnull;
Datum d;
+ Relation rel;
+
+ /*
+ * This is to avoid the race condition with AlterSubscription which tries
+ * to remove this relstate.
+ */
+ rel = table_open(SubscriptionRelRelationId, AccessShareLock);
/* Try finding the mapping. */
tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
@@ -363,6 +371,8 @@ GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
/* Cleanup */
ReleaseSysCache(tup);
+ table_close(rel, AccessShareLock);
+
return substate;
}
@@ -403,6 +413,34 @@ RemoveSubscriptionRel(Oid subid, Oid relid)
scan = table_beginscan_catalog(rel, nkeys, skey);
while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
{
+ Form_pg_subscription_rel subrel;
+
+ subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
+
+ /*
+ * We don't allow to drop the relation mapping when the table
+ * synchronization is in progress unless the caller updates the
+ * corresponding subscription as well. This is to ensure that we don't
+ * leave tablesync slots or origins in the system when the
+ * corresponding table is dropped.
+ */
+ if (!OidIsValid(subid) && subrel->srsubstate != SUBREL_STATE_READY)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("could not drop relation mapping for subscription \"%s\"",
+ get_subscription_name(subrel->srsubid, false)),
+ errdetail("Table synchronization for relation \"%s\" is in progress and is in state \"%c\".",
+ get_rel_name(relid), subrel->srsubstate),
+ /*
+ * translator: first %s is a SQL ALTER command and second %s is a
+ * SQL DROP command
+ */
+ errhint("Use %s to enable subscription if not already enabled or use %s to drop the subscription.",
+ "ALTER SUBSCRIPTION ... ENABLE",
+ "DROP SUBSCRIPTION ...")));
+ }
+
CatalogTupleDelete(rel, &tup->t_self);
}
table_endscan(scan);
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 5ccbc9dd50f..5cf874e0b46 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -34,6 +34,7 @@
#include "nodes/makefuncs.h"
#include "replication/logicallauncher.h"
#include "replication/origin.h"
+#include "replication/slot.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/worker_internal.h"
@@ -46,6 +47,8 @@
#include "utils/syscache.h"
static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
+static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
+
/*
* Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
@@ -566,107 +569,207 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
Oid *pubrel_local_oids;
ListCell *lc;
int off;
+ int remove_rel_len;
+ Relation rel = NULL;
+ typedef struct SubRemoveRels
+ {
+ Oid relid;
+ char state;
+ } SubRemoveRels;
+ SubRemoveRels *sub_remove_rels;
/* Load the library providing us libpq calls. */
load_file("libpqwalreceiver", false);
- /* Try to connect to the publisher. */
- wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
- if (!wrconn)
- ereport(ERROR,
- (errmsg("could not connect to the publisher: %s", err)));
-
- /* Get the table list from publisher. */
- pubrel_names = fetch_table_list(wrconn, sub->publications);
+ PG_TRY();
+ {
+ /* Try to connect to the publisher. */
+ wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
+ if (!wrconn)
+ ereport(ERROR,
+ (errmsg("could not connect to the publisher: %s", err)));
- /* We are done with the remote side, close connection. */
- walrcv_disconnect(wrconn);
+ /* Get the table list from publisher. */
+ pubrel_names = fetch_table_list(wrconn, sub->publications);
- /* Get local table list. */
- subrel_states = GetSubscriptionRelations(sub->oid);
+ /* Get local table list. */
+ subrel_states = GetSubscriptionRelations(sub->oid);
- /*
- * Build qsorted array of local table oids for faster lookup. This can
- * potentially contain all tables in the database so speed of lookup is
- * important.
- */
- subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid));
- off = 0;
- foreach(lc, subrel_states)
- {
- SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
+ /*
+ * Build qsorted array of local table oids for faster lookup. This can
+ * potentially contain all tables in the database so speed of lookup
+ * is important.
+ */
+ subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid));
+ off = 0;
+ foreach(lc, subrel_states)
+ {
+ SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
- subrel_local_oids[off++] = relstate->relid;
- }
- qsort(subrel_local_oids, list_length(subrel_states),
- sizeof(Oid), oid_cmp);
+ subrel_local_oids[off++] = relstate->relid;
+ }
+ qsort(subrel_local_oids, list_length(subrel_states),
+ sizeof(Oid), oid_cmp);
+
+ /*
+ * Rels that we want to remove from subscription and drop any slots
+ * and origins corresponding to them.
+ */
+ sub_remove_rels = palloc(list_length(subrel_states) * sizeof(SubRemoveRels));
+
+ /*
+ * Walk over the remote tables and try to match them to locally known
+ * tables. If the table is not known locally create a new state for
+ * it.
+ *
+ * Also builds array of local oids of remote tables for the next step.
+ */
+ off = 0;
+ pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid));
+
+ foreach(lc, pubrel_names)
+ {
+ RangeVar *rv = (RangeVar *) lfirst(lc);
+ Oid relid;
- /*
- * Walk over the remote tables and try to match them to locally known
- * tables. If the table is not known locally create a new state for it.
- *
- * Also builds array of local oids of remote tables for the next step.
- */
- off = 0;
- pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid));
+ relid = RangeVarGetRelid(rv, AccessShareLock, false);
- foreach(lc, pubrel_names)
- {
- RangeVar *rv = (RangeVar *) lfirst(lc);
- Oid relid;
+ /* Check for supported relkind. */
+ CheckSubscriptionRelkind(get_rel_relkind(relid),
+ rv->schemaname, rv->relname);
- relid = RangeVarGetRelid(rv, AccessShareLock, false);
+ pubrel_local_oids[off++] = relid;
- /* Check for supported relkind. */
- CheckSubscriptionRelkind(get_rel_relkind(relid),
- rv->schemaname, rv->relname);
+ if (!bsearch(&relid, subrel_local_oids,
+ list_length(subrel_states), sizeof(Oid), oid_cmp))
+ {
+ AddSubscriptionRelState(sub->oid, relid,
+ copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
+ InvalidXLogRecPtr);
+ ereport(DEBUG1,
+ (errmsg("table \"%s.%s\" added to subscription \"%s\"",
+ rv->schemaname, rv->relname, sub->name)));
+ }
+ }
- pubrel_local_oids[off++] = relid;
+ /*
+ * Next remove state for tables we should not care about anymore using
+ * the data we collected above
+ */
+ qsort(pubrel_local_oids, list_length(pubrel_names),
+ sizeof(Oid), oid_cmp);
- if (!bsearch(&relid, subrel_local_oids,
- list_length(subrel_states), sizeof(Oid), oid_cmp))
+ remove_rel_len = 0;
+ for (off = 0; off < list_length(subrel_states); off++)
{
- AddSubscriptionRelState(sub->oid, relid,
- copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
- InvalidXLogRecPtr);
- ereport(DEBUG1,
- (errmsg("table \"%s.%s\" added to subscription \"%s\"",
- rv->schemaname, rv->relname, sub->name)));
- }
- }
+ Oid relid = subrel_local_oids[off];
- /*
- * Next remove state for tables we should not care about anymore using the
- * data we collected above
- */
- qsort(pubrel_local_oids, list_length(pubrel_names),
- sizeof(Oid), oid_cmp);
+ if (!bsearch(&relid, pubrel_local_oids,
+ list_length(pubrel_names), sizeof(Oid), oid_cmp))
+ {
+ char state;
+ XLogRecPtr statelsn;
+
+ /*
+ * Lock pg_subscription_rel with AccessExclusiveLock to
+ * prevent any race conditions with the apply worker
+ * re-launching workers at the same time this code is trying
+ * to remove those tables.
+ *
+ * Even if new worker for this particular rel is restarted it
+ * won't be able to make any progress as we hold exclusive
+ * lock on subscription_rel till the transaction end. It will
+ * simply exit as there is no corresponding rel entry.
+ *
+ * This locking also ensures that the state of rels won't
+ * change till we are done with this refresh operation.
+ */
+ if (!rel)
+ rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
+
+ /* Last known rel state. */
+ state = GetSubscriptionRelState(sub->oid, relid, &statelsn);
+
+ sub_remove_rels[remove_rel_len].relid = relid;
+ sub_remove_rels[remove_rel_len++].state = state;
+
+ RemoveSubscriptionRel(sub->oid, relid);
+
+ logicalrep_worker_stop(sub->oid, relid);
+
+ /*
+ * For READY state, we would have already dropped the
+ * tablesync origin.
+ */
+ if (state != SUBREL_STATE_READY)
+ {
+ char originname[NAMEDATALEN];
+
+ /*
+ * Drop the tablesync's origin tracking if exists.
+ *
+ * It is possible that the origin is not yet created for
+ * tablesync worker, this can happen for the states before
+ * SUBREL_STATE_FINISHEDCOPY. The apply worker can also
+ * concurrently try to drop the origin and by this time
+ * the origin might be already removed. For these reasons,
+ * passing missing_ok = true.
+ */
+ ReplicationOriginNameForTablesync(sub->oid, relid, originname);
+ replorigin_drop_by_name(originname, true, false);
+ }
- for (off = 0; off < list_length(subrel_states); off++)
- {
- Oid relid = subrel_local_oids[off];
+ ereport(DEBUG1,
+ (errmsg("table \"%s.%s\" removed from subscription \"%s\"",
+ get_namespace_name(get_rel_namespace(relid)),
+ get_rel_name(relid),
+ sub->name)));
+ }
+ }
- if (!bsearch(&relid, pubrel_local_oids,
- list_length(pubrel_names), sizeof(Oid), oid_cmp))
+ /*
+ * Drop the tablesync slots associated with removed tables. This has
+ * to be at the end because otherwise if there is an error while doing
+ * the database operations we won't be able to rollback dropped slots.
+ */
+ for (off = 0; off < remove_rel_len; off++)
{
- RemoveSubscriptionRel(sub->oid, relid);
-
- logicalrep_worker_stop_at_commit(sub->oid, relid);
-
- ereport(DEBUG1,
- (errmsg("table \"%s.%s\" removed from subscription \"%s\"",
- get_namespace_name(get_rel_namespace(relid)),
- get_rel_name(relid),
- sub->name)));
+ if (sub_remove_rels[off].state != SUBREL_STATE_READY &&
+ sub_remove_rels[off].state != SUBREL_STATE_SYNCDONE)
+ {
+ char syncslotname[NAMEDATALEN] = {0};
+
+ /*
+ * For READY/SYNCDONE states we know the tablesync slot has
+ * already been dropped by the tablesync worker.
+ *
+ * For other states, there is no certainty, maybe the slot
+ * does not exist yet. Also, if we fail after removing some of
+ * the slots, next time, it will again try to drop already
+ * dropped slots and fail. For these reasons, we allow
+ * missing_ok = true for the drop.
+ */
+ ReplicationSlotNameForTablesync(sub->oid, sub_remove_rels[off].relid, syncslotname);
+ ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
+ }
}
}
+ PG_FINALLY();
+ {
+ if (wrconn)
+ walrcv_disconnect(wrconn);
+ }
+ PG_END_TRY();
+
+ if (rel)
+ table_close(rel, NoLock);
}
/*
* Alter the existing subscription.
*/
ObjectAddress
-AlterSubscription(AlterSubscriptionStmt *stmt)
+AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
{
Relation rel;
ObjectAddress myself;
@@ -848,6 +951,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
+ PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
+
/* Make sure refresh sees the new list of publications. */
sub->publications = stmt->publication;
@@ -877,6 +982,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
NULL, NULL, /* no "binary" */
NULL, NULL); /* no "streaming" */
+ PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH");
+
AlterSubscription_refresh(sub, copy_data);
break;
@@ -927,8 +1034,8 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
char originname[NAMEDATALEN];
char *err = NULL;
WalReceiverConn *wrconn = NULL;
- StringInfoData cmd;
Form_pg_subscription form;
+ List *rstates;
/*
* Lock pg_subscription with AccessExclusiveLock to ensure that the
@@ -1041,6 +1148,36 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
}
list_free(subworkers);
+ /*
+ * Cleanup of tablesync replication origins.
+ *
+ * Any READY-state relations would already have dealt with clean-ups.
+ *
+ * Note that the state can't change because we have already stopped both
+ * the apply and tablesync workers and they can't restart because of
+ * exclusive lock on the subscription.
+ */
+ rstates = GetSubscriptionNotReadyRelations(subid);
+ foreach(lc, rstates)
+ {
+ SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
+ Oid relid = rstate->relid;
+
+ /* Only cleanup resources of tablesync workers */
+ if (!OidIsValid(relid))
+ continue;
+
+ /*
+ * Drop the tablesync's origin tracking if exists.
+ *
+ * It is possible that the origin is not yet created for tablesync
+ * worker so passing missing_ok = true. This can happen for the states
+ * before SUBREL_STATE_FINISHEDCOPY.
+ */
+ ReplicationOriginNameForTablesync(subid, relid, originname);
+ replorigin_drop_by_name(originname, true, false);
+ }
+
/* Clean up dependencies */
deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
@@ -1055,30 +1192,110 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
* If there is no slot associated with the subscription, we can finish
* here.
*/
- if (!slotname)
+ if (!slotname && rstates == NIL)
{
table_close(rel, NoLock);
return;
}
/*
- * Otherwise drop the replication slot at the publisher node using the
- * replication connection.
+ * Try to acquire the connection necessary for dropping slots.
+ *
+ * Note: If the slotname is NONE/NULL then we allow the command to finish
+ * and users need to manually cleanup the apply and tablesync worker slots
+ * later.
+ *
+ * This has to be at the end because otherwise if there is an error while
+ * doing the database operations we won't be able to rollback dropped
+ * slot.
*/
load_file("libpqwalreceiver", false);
- initStringInfo(&cmd);
- appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
-
wrconn = walrcv_connect(conninfo, true, subname, &err);
if (wrconn == NULL)
- ereport(ERROR,
- (errmsg("could not connect to publisher when attempting to "
- "drop the replication slot \"%s\"", slotname),
- errdetail("The error was: %s", err),
- /* translator: %s is an SQL ALTER command */
- errhint("Use %s to disassociate the subscription from the slot.",
- "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
+ {
+ if (!slotname)
+ {
+ /* be tidy */
+ list_free(rstates);
+ table_close(rel, NoLock);
+ return;
+ }
+ else
+ {
+ ReportSlotConnectionError(rstates, subid, slotname, err);
+ }
+ }
+
+ PG_TRY();
+ {
+ foreach(lc, rstates)
+ {
+ SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
+ Oid relid = rstate->relid;
+
+ /* Only cleanup resources of tablesync workers */
+ if (!OidIsValid(relid))
+ continue;
+
+ /*
+ * Drop the tablesync slots associated with removed tables.
+ *
+ * For SYNCDONE/READY states, the tablesync slot is known to have
+ * already been dropped by the tablesync worker.
+ *
+ * For other states, there is no certainty, maybe the slot does
+ * not exist yet. Also, if we fail after removing some of the
+ * slots, next time, it will again try to drop already dropped
+ * slots and fail. For these reasons, we allow missing_ok = true
+ * for the drop.
+ */
+ if (rstate->state != SUBREL_STATE_SYNCDONE)
+ {
+ char syncslotname[NAMEDATALEN] = {0};
+
+ ReplicationSlotNameForTablesync(subid, relid, syncslotname);
+ ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
+ }
+ }
+
+ list_free(rstates);
+
+ /*
+ * If there is a slot associated with the subscription, then drop the
+ * replication slot at the publisher.
+ */
+ if (slotname)
+ ReplicationSlotDropAtPubNode(wrconn, slotname, false);
+
+ }
+ PG_FINALLY();
+ {
+ walrcv_disconnect(wrconn);
+ }
+ PG_END_TRY();
+
+ table_close(rel, NoLock);
+}
+
+/*
+ * Drop the replication slot at the publisher node using the replication
+ * connection.
+ *
+ * missing_ok - if true then only issue a LOG message if the slot doesn't
+ * exist.
+ */
+void
+ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
+{
+ StringInfoData cmd;
+
+ Assert(wrconn);
+
+ load_file("libpqwalreceiver", false);
+
+ initStringInfo(&cmd);
+ appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
PG_TRY();
{
@@ -1086,27 +1303,39 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
res = walrcv_exec(wrconn, cmd.data, 0, NULL);
- if (res->status != WALRCV_OK_COMMAND)
- ereport(ERROR,
+ if (res->status == WALRCV_OK_COMMAND)
+ {
+ /* NOTICE. Success. */
+ ereport(NOTICE,
+ (errmsg("dropped replication slot \"%s\" on publisher",
+ slotname)));
+ }
+ else if (res->status == WALRCV_ERROR &&
+ missing_ok &&
+ res->sqlstate == ERRCODE_UNDEFINED_OBJECT)
+ {
+ /* LOG. Error, but missing_ok = true. */
+ ereport(LOG,
(errmsg("could not drop the replication slot \"%s\" on publisher",
slotname),
errdetail("The error was: %s", res->err)));
+ }
else
- ereport(NOTICE,
- (errmsg("dropped replication slot \"%s\" on publisher",
- slotname)));
+ {
+ /* ERROR. */
+ ereport(ERROR,
+ (errmsg("could not drop the replication slot \"%s\" on publisher",
+ slotname),
+ errdetail("The error was: %s", res->err)));
+ }
walrcv_clear_result(res);
}
PG_FINALLY();
{
- walrcv_disconnect(wrconn);
+ pfree(cmd.data);
}
PG_END_TRY();
-
- pfree(cmd.data);
-
- table_close(rel, NoLock);
}
/*
@@ -1275,3 +1504,45 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
return tablelist;
}
+
+/*
+ * This is to report the connection failure while dropping replication slots.
+ * Here, we report the WARNING for all tablesync slots so that user can drop
+ * them manually, if required.
+ */
+static void
+ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
+{
+ ListCell *lc;
+
+ foreach(lc, rstates)
+ {
+ SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
+ Oid relid = rstate->relid;
+
+ /* Only cleanup resources of tablesync workers */
+ if (!OidIsValid(relid))
+ continue;
+
+ /*
+ * Caller needs to ensure that relstate doesn't change underneath us.
+ * See DropSubscription where we get the relstates.
+ */
+ if (rstate->state != SUBREL_STATE_SYNCDONE)
+ {
+ char syncslotname[NAMEDATALEN] = {0};
+
+ ReplicationSlotNameForTablesync(subid, relid, syncslotname);
+ elog(WARNING, "could not drop tablesync replication slot \"%s\"",
+ syncslotname);
+ }
+ }
+
+ ereport(ERROR,
+ (errmsg("could not connect to publisher when attempting to "
+ "drop the replication slot \"%s\"", slotname),
+ errdetail("The error was: %s", err),
+ /* translator: %s is an SQL ALTER command */
+ errhint("Use %s to disassociate the subscription from the slot.",
+ "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
+}
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index e9582748617..77146961408 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -982,6 +982,7 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query,
{
PGresult *pgres = NULL;
WalRcvExecResult *walres = palloc0(sizeof(WalRcvExecResult));
+ char *diag_sqlstate;
if (MyDatabaseId == InvalidOid)
ereport(ERROR,
@@ -1025,6 +1026,13 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query,
case PGRES_BAD_RESPONSE:
walres->status = WALRCV_ERROR;
walres->err = pchomp(PQerrorMessage(conn->streamConn));
+ diag_sqlstate = PQresultErrorField(pgres, PG_DIAG_SQLSTATE);
+ if (diag_sqlstate)
+ walres->sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
+ diag_sqlstate[1],
+ diag_sqlstate[2],
+ diag_sqlstate[3],
+ diag_sqlstate[4]);
break;
}
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 186514cd9ed..58082dde186 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -73,20 +73,6 @@ typedef struct LogicalRepWorkerId
Oid relid;
} LogicalRepWorkerId;
-typedef struct StopWorkersData
-{
- int nestDepth; /* Sub-transaction nest level */
- List *workers; /* List of LogicalRepWorkerId */
- struct StopWorkersData *parent; /* This need not be an immediate
- * subtransaction parent */
-} StopWorkersData;
-
-/*
- * Stack of StopWorkersData elements. Each stack element contains the workers
- * to be stopped for that subtransaction.
- */
-static StopWorkersData *on_commit_stop_workers = NULL;
-
static void ApplyLauncherWakeup(void);
static void logicalrep_launcher_onexit(int code, Datum arg);
static void logicalrep_worker_onexit(int code, Datum arg);
@@ -547,51 +533,6 @@ logicalrep_worker_stop(Oid subid, Oid relid)
}
/*
- * Request worker for specified sub/rel to be stopped on commit.
- */
-void
-logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
-{
- int nestDepth = GetCurrentTransactionNestLevel();
- LogicalRepWorkerId *wid;
- MemoryContext oldctx;
-
- /* Make sure we store the info in context that survives until commit. */
- oldctx = MemoryContextSwitchTo(TopTransactionContext);
-
- /* Check that previous transactions were properly cleaned up. */
- Assert(on_commit_stop_workers == NULL ||
- nestDepth >= on_commit_stop_workers->nestDepth);
-
- /*
- * Push a new stack element if we don't already have one for the current
- * nestDepth.
- */
- if (on_commit_stop_workers == NULL ||
- nestDepth > on_commit_stop_workers->nestDepth)
- {
- StopWorkersData *newdata = palloc(sizeof(StopWorkersData));
-
- newdata->nestDepth = nestDepth;
- newdata->workers = NIL;
- newdata->parent = on_commit_stop_workers;
- on_commit_stop_workers = newdata;
- }
-
- /*
- * Finally add a new worker into the worker list of the current
- * subtransaction.
- */
- wid = palloc(sizeof(LogicalRepWorkerId));
- wid->subid = subid;
- wid->relid = relid;
- on_commit_stop_workers->workers =
- lappend(on_commit_stop_workers->workers, wid);
-
- MemoryContextSwitchTo(oldctx);
-}
-
-/*
* Wake up (using latch) any logical replication worker for specified sub/rel.
*/
void
@@ -820,109 +761,21 @@ ApplyLauncherShmemInit(void)
}
/*
- * Check whether current transaction has manipulated logical replication
- * workers.
- */
-bool
-XactManipulatesLogicalReplicationWorkers(void)
-{
- return (on_commit_stop_workers != NULL);
-}
-
-/*
* Wakeup the launcher on commit if requested.
*/
void
AtEOXact_ApplyLauncher(bool isCommit)
{
-
- Assert(on_commit_stop_workers == NULL ||
- (on_commit_stop_workers->nestDepth == 1 &&
- on_commit_stop_workers->parent == NULL));
-
if (isCommit)
{
- ListCell *lc;
-
- if (on_commit_stop_workers != NULL)
- {
- List *workers = on_commit_stop_workers->workers;
-
- foreach(lc, workers)
- {
- LogicalRepWorkerId *wid = lfirst(lc);
-
- logicalrep_worker_stop(wid->subid, wid->relid);
- }
- }
-
if (on_commit_launcher_wakeup)
ApplyLauncherWakeup();
}
- /*
- * No need to pfree on_commit_stop_workers. It was allocated in
- * transaction memory context, which is going to be cleaned soon.
- */
- on_commit_stop_workers = NULL;
on_commit_launcher_wakeup = false;
}
/*
- * On commit, merge the current on_commit_stop_workers list into the
- * immediate parent, if present.
- * On rollback, discard the current on_commit_stop_workers list.
- * Pop out the stack.
- */
-void
-AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth)
-{
- StopWorkersData *parent;
-
- /* Exit immediately if there's no work to do at this level. */
- if (on_commit_stop_workers == NULL ||
- on_commit_stop_workers->nestDepth < nestDepth)
- return;
-
- Assert(on_commit_stop_workers->nestDepth == nestDepth);
-
- parent = on_commit_stop_workers->parent;
-
- if (isCommit)
- {
- /*
- * If the upper stack element is not an immediate parent
- * subtransaction, just decrement the notional nesting depth without
- * doing any real work. Else, we need to merge the current workers
- * list into the parent.
- */
- if (!parent || parent->nestDepth < nestDepth - 1)
- {
- on_commit_stop_workers->nestDepth--;
- return;
- }
-
- parent->workers =
- list_concat(parent->workers, on_commit_stop_workers->workers);
- }
- else
- {
- /*
- * Abandon everything that was done at this nesting level. Explicitly
- * free memory to avoid a transaction-lifespan leak.
- */
- list_free_deep(on_commit_stop_workers->workers);
- }
-
- /*
- * We have taken care of the current subtransaction workers list for both
- * abort or commit. So we are ready to pop the stack.
- */
- pfree(on_commit_stop_workers);
- on_commit_stop_workers = parent;
-}
-
-/*
* Request wakeup of the launcher on commit of the transaction.
*
* This is used to send launcher signal to stop sleeping and process the
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index ccbdbcf08f9..19cc8046786 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -31,8 +31,11 @@
* table state to INIT.
* - Tablesync worker starts; changes table state from INIT to DATASYNC while
* copying.
- * - Tablesync worker finishes the copy and sets table state to SYNCWAIT;
- * waits for state change.
+ * - Tablesync worker does initial table copy; there is a FINISHEDCOPY (sync
+ * worker specific) state to indicate when the copy phase has completed, so
+ * if the worker crashes with this (non-memory) state then the copy will not
+ * be re-attempted.
+ * - Tablesync worker then sets table state to SYNCWAIT; waits for state change.
* - Apply worker periodically checks for tables in SYNCWAIT state. When
* any appear, it sets the table state to CATCHUP and starts loop-waiting
* until either the table state is set to SYNCDONE or the sync worker
@@ -48,8 +51,8 @@
* point it sets state to READY and stops tracking. Again, there might
* be zero changes in between.
*
- * So the state progression is always: INIT -> DATASYNC -> SYNCWAIT ->
- * CATCHUP -> SYNCDONE -> READY.
+ * So the state progression is always: INIT -> DATASYNC -> FINISHEDCOPY
+ * -> SYNCWAIT -> CATCHUP -> SYNCDONE -> READY.
*
* The catalog pg_subscription_rel is used to keep information about
* subscribed tables and their state. The catalog holds all states
@@ -58,6 +61,7 @@
* Example flows look like this:
* - Apply is in front:
* sync:8
+ * -> set in catalog FINISHEDCOPY
* -> set in memory SYNCWAIT
* apply:10
* -> set in memory CATCHUP
@@ -73,6 +77,7 @@
*
* - Sync is in front:
* sync:10
+ * -> set in catalog FINISHEDCOPY
* -> set in memory SYNCWAIT
* apply:8
* -> set in memory CATCHUP
@@ -101,7 +106,10 @@
#include "replication/logicalrelation.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
+#include "replication/slot.h"
+#include "replication/origin.h"
#include "storage/ipc.h"
+#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
@@ -269,26 +277,52 @@ invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
static void
process_syncing_tables_for_sync(XLogRecPtr current_lsn)
{
- Assert(IsTransactionState());
-
SpinLockAcquire(&MyLogicalRepWorker->relmutex);
if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
current_lsn >= MyLogicalRepWorker->relstate_lsn)
{
TimeLineID tli;
+ char syncslotname[NAMEDATALEN] = {0};
MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
MyLogicalRepWorker->relstate_lsn = current_lsn;
SpinLockRelease(&MyLogicalRepWorker->relmutex);
+ /*
+ * UpdateSubscriptionRelState must be called within a transaction.
+ * That transaction will be ended within the finish_sync_worker().
+ */
+ if (!IsTransactionState())
+ StartTransactionCommand();
+
UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
MyLogicalRepWorker->relid,
MyLogicalRepWorker->relstate,
MyLogicalRepWorker->relstate_lsn);
+ /* End wal streaming so wrconn can be re-used to drop the slot. */
walrcv_endstreaming(wrconn, &tli);
+
+ /*
+ * Cleanup the tablesync slot.
+ *
+ * This has to be done after updating the state because otherwise if
+ * there is an error while doing the database operations we won't be
+ * able to rollback dropped slot.
+ */
+ ReplicationSlotNameForTablesync(MyLogicalRepWorker->subid,
+ MyLogicalRepWorker->relid,
+ syncslotname);
+
+ /*
+ * It is important to give an error if we are unable to drop the slot,
+ * otherwise, it won't be dropped till the corresponding subscription
+ * is dropped. So passing missing_ok = false.
+ */
+ ReplicationSlotDropAtPubNode(wrconn, syncslotname, false);
+
finish_sync_worker();
}
else
@@ -403,6 +437,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
*/
if (current_lsn >= rstate->lsn)
{
+ char originname[NAMEDATALEN];
+
rstate->state = SUBREL_STATE_READY;
rstate->lsn = current_lsn;
if (!started_tx)
@@ -411,6 +447,27 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
started_tx = true;
}
+ /*
+ * Remove the tablesync origin tracking if it exists.
+ *
+ * The normal case origin drop is done here instead of in the
+ * process_syncing_tables_for_sync function because the origin
+ * cannot be dropped while the process owning the origin is
+ * still alive.
+ *
+ * There is a chance that the user is concurrently performing
+ * refresh for the subscription where we remove the table
+ * state and its origin and by this time the origin might be
+ * already removed. So passing missing_ok = true.
+ */
+ ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid,
+ rstate->relid,
+ originname);
+ replorigin_drop_by_name(originname, true, false);
+
+ /*
+ * Update the state to READY only after the origin cleanup.
+ */
UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
rstate->relid, rstate->state,
rstate->lsn);
@@ -806,6 +863,50 @@ copy_table(Relation rel)
}
/*
+ * Determine the tablesync slot name.
+ *
+ * The name must not exceed NAMEDATALEN - 1 because of remote node constraints
+ * on slot name length. We append system_identifier to avoid slot_name
+ * collision with subscriptions in other clusters. With the current scheme
+ * pg_%u_sync_%u_UINT64_FORMAT (3 + 10 + 6 + 10 + 1 + 20), the maximum
+ * length of slot_name will be 50, excluding the terminating '\0'.
+ *
+ * The returned slot name is either:
+ * - stored in the supplied buffer (syncslotname), or
+ * - palloc'ed in current memory context (if syncslotname = NULL).
+ *
+ * Note: We don't use the subscription slot name as part of tablesync slot name
+ * because we are responsible for cleaning up these slots and it could become
+ * impossible to recalculate what name to cleanup if the subscription slot name
+ * had changed.
+ */
+char *
+ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
+								char syncslotname[NAMEDATALEN])
+{
+	if (syncslotname)
+	{
+		/*
+		 * Caller-supplied buffer: bound the write to NAMEDATALEN bytes so
+		 * the buffer can never be overrun, matching how the tablesync
+		 * origin name is formed.
+		 */
+		snprintf(syncslotname, NAMEDATALEN, "pg_%u_sync_%u_" UINT64_FORMAT,
+				 suboid, relid, GetSystemIdentifier());
+	}
+	else
+	{
+		/* No buffer supplied: hand back a palloc'ed string instead. */
+		syncslotname = psprintf("pg_%u_sync_%u_" UINT64_FORMAT,
+								suboid, relid, GetSystemIdentifier());
+	}
+
+	return syncslotname;
+}
+
+/*
+ * Form the origin name for tablesync.
+ *
+ * The name has the form "pg_<suboid>_<relid>" and is written into the
+ * supplied buffer (originname) with snprintf, bounded by NAMEDATALEN.
+ * Nothing is returned; the caller reads the result from the buffer.
+ */
+void
+ReplicationOriginNameForTablesync(Oid suboid, Oid relid,
+ char originname[NAMEDATALEN])
+{
+ snprintf(originname, NAMEDATALEN, "pg_%u_%u", suboid, relid);
+}
+
+/*
* Start syncing the table in the sync worker.
*
* If nothing needs to be done to sync the table, we exit the worker without
@@ -822,6 +923,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
XLogRecPtr relstate_lsn;
Relation rel;
WalRcvExecResult *res;
+ char originname[NAMEDATALEN];
+ RepOriginId originid;
/* Check the state of the table synchronization. */
StartTransactionCommand();
@@ -847,19 +950,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
finish_sync_worker(); /* doesn't return */
}
- /*
- * To build a slot name for the sync work, we are limited to NAMEDATALEN -
- * 1 characters. We cut the original slot name to NAMEDATALEN - 28 chars
- * and append _%u_sync_%u (1 + 10 + 6 + 10 + '\0'). (It's actually the
- * NAMEDATALEN on the remote that matters, but this scheme will also work
- * reasonably if that is different.)
- */
- StaticAssertStmt(NAMEDATALEN >= 32, "NAMEDATALEN too small"); /* for sanity */
- slotname = psprintf("%.*s_%u_sync_%u",
- NAMEDATALEN - 28,
- MySubscription->slotname,
- MySubscription->oid,
- MyLogicalRepWorker->relid);
+ /* Calculate the name of the tablesync slot. */
+ slotname = ReplicationSlotNameForTablesync(MySubscription->oid,
+ MyLogicalRepWorker->relid,
+ NULL /* use palloc */ );
/*
* Here we use the slot name instead of the subscription name as the
@@ -872,7 +966,50 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
(errmsg("could not connect to the publisher: %s", err)));
Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
- MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC);
+ MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
+ MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY);
+
+ /* Assign the origin tracking record name. */
+ ReplicationOriginNameForTablesync(MySubscription->oid,
+ MyLogicalRepWorker->relid,
+ originname);
+
+ if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC)
+ {
+ /*
+ * We have previously errored out before finishing the copy so the
+ * replication slot might exist. We want to remove the slot if it
+ * already exists and proceed.
+ *
+ * XXX We could instead try to drop the slot at the time of the
+ * previous failure, but for that we might need to clean up the copy
+ * state, as it might be in the middle of fetching the rows. Also, if
+ * there was a network breakdown the drop wouldn't have succeeded, so
+ * trying it on the next start seems like a better bet.
+ */
+ ReplicationSlotDropAtPubNode(wrconn, slotname, true);
+ }
+ else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
+ {
+ /*
+ * The COPY phase was previously done, but tablesync then crashed
+ * before it was able to finish normally.
+ */
+ StartTransactionCommand();
+
+ /*
+ * The origin tracking name must already exist. It was created first
+ * time this tablesync was launched.
+ */
+ originid = replorigin_by_name(originname, false);
+ replorigin_session_setup(originid);
+ replorigin_session_origin = originid;
+ *origin_startpos = replorigin_session_get_progress(false);
+
+ CommitTransactionCommand();
+
+ goto copy_table_done;
+ }
SpinLockAcquire(&MyLogicalRepWorker->relmutex);
MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
@@ -888,9 +1025,6 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
CommitTransactionCommand();
pgstat_report_stat(false);
- /*
- * We want to do the table data sync in a single transaction.
- */
StartTransactionCommand();
/*
@@ -916,13 +1050,46 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
walrcv_clear_result(res);
/*
- * Create a new temporary logical decoding slot. This slot will be used
+ * Create a new permanent logical decoding slot. This slot will be used
* for the catchup phase after COPY is done, so tell it to use the
* snapshot to make the final data consistent.
*/
- walrcv_create_slot(wrconn, slotname, true,
+ walrcv_create_slot(wrconn, slotname, false /* permanent */ ,
CRS_USE_SNAPSHOT, origin_startpos);
+ /*
+ * Setup replication origin tracking. The purpose of doing this before the
+ * copy is to avoid doing the copy again due to any error in setting up
+ * origin tracking.
+ */
+ originid = replorigin_by_name(originname, true);
+ if (!OidIsValid(originid))
+ {
+ /*
+ * Origin tracking does not exist, so create it now.
+ *
+ * Then advance to the LSN got from walrcv_create_slot. This is WAL
+ * logged for the purpose of recovery. Locks are to prevent the
+ * replication origin from vanishing while advancing.
+ */
+ originid = replorigin_create(originname);
+
+ LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
+ replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
+ true /* go backward */ , true /* WAL log */ );
+ UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
+
+ replorigin_session_setup(originid);
+ replorigin_session_origin = originid;
+ }
+ else
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_DUPLICATE_OBJECT),
+ errmsg("replication origin \"%s\" already exists",
+ originname)));
+ }
+
/* Now do the initial data copy */
PushActiveSnapshot(GetTransactionSnapshot());
copy_table(rel);
@@ -941,6 +1108,25 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
CommandCounterIncrement();
/*
+ * Update the persisted state to indicate the COPY phase is done; make it
+ * visible to others.
+ */
+ UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+ MyLogicalRepWorker->relid,
+ SUBREL_STATE_FINISHEDCOPY,
+ MyLogicalRepWorker->relstate_lsn);
+
+ CommitTransactionCommand();
+
+copy_table_done:
+
+ elog(DEBUG1,
+ "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X",
+ originname,
+ (uint32) (*origin_startpos >> 32),
+ (uint32) *origin_startpos);
+
+ /*
* We are done with the initial data synchronization, update the state.
*/
SpinLockAcquire(&MyLogicalRepWorker->relmutex);
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index eb7db89cef7..cfc924cd893 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -807,12 +807,8 @@ apply_handle_stream_stop(StringInfo s)
/* We must be in a valid transaction state */
Assert(IsTransactionState());
- /* The synchronization worker runs in single transaction. */
- if (!am_tablesync_worker())
- {
- /* Commit the per-stream transaction */
- CommitTransactionCommand();
- }
+ /* Commit the per-stream transaction */
+ CommitTransactionCommand();
in_streamed_transaction = false;
@@ -889,9 +885,7 @@ apply_handle_stream_abort(StringInfo s)
/* Cleanup the subxact info */
cleanup_subxact_info();
- /* The synchronization worker runs in single transaction */
- if (!am_tablesync_worker())
- CommitTransactionCommand();
+ CommitTransactionCommand();
return;
}
@@ -918,8 +912,7 @@ apply_handle_stream_abort(StringInfo s)
/* write the updated subxact list */
subxact_info_write(MyLogicalRepWorker->subid, xid);
- if (!am_tablesync_worker())
- CommitTransactionCommand();
+ CommitTransactionCommand();
}
}
@@ -1062,8 +1055,7 @@ apply_handle_stream_commit(StringInfo s)
static void
apply_handle_commit_internal(StringInfo s, LogicalRepCommitData *commit_data)
{
- /* The synchronization worker runs in single transaction. */
- if (IsTransactionState() && !am_tablesync_worker())
+ if (IsTransactionState())
{
/*
* Update origin state so we can restart streaming from correct
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 1d81071c357..05bb698cf45 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -1786,7 +1786,8 @@ ProcessUtilitySlow(ParseState *pstate,
break;
case T_AlterSubscriptionStmt:
- address = AlterSubscription((AlterSubscriptionStmt *) parsetree);
+ address = AlterSubscription((AlterSubscriptionStmt *) parsetree,
+ isTopLevel);
break;
case T_DropSubscriptionStmt:
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index 638830aaac1..2efd937e12e 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -53,6 +53,6 @@
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 202102021
+#define CATALOG_VERSION_NO 202102121
#endif
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index 2bea2c52aa7..ed94f57baa1 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -61,6 +61,8 @@ DECLARE_UNIQUE_INDEX_PKEY(pg_subscription_rel_srrelid_srsubid_index, 6117, on pg
#define SUBREL_STATE_INIT 'i' /* initializing (sublsn NULL) */
#define SUBREL_STATE_DATASYNC 'd' /* data is being synchronized (sublsn
* NULL) */
+#define SUBREL_STATE_FINISHEDCOPY 'f' /* tablesync copy phase is completed
+ * (sublsn NULL) */
#define SUBREL_STATE_SYNCDONE 's' /* synchronization finished in front of
* apply (sublsn set) */
#define SUBREL_STATE_READY 'r' /* ready (sublsn set) */
diff --git a/src/include/commands/subscriptioncmds.h b/src/include/commands/subscriptioncmds.h
index a81865079d1..3b926f35d76 100644
--- a/src/include/commands/subscriptioncmds.h
+++ b/src/include/commands/subscriptioncmds.h
@@ -20,7 +20,7 @@
extern ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt,
bool isTopLevel);
-extern ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt);
+extern ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel);
extern void DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel);
extern ObjectAddress AlterSubscriptionOwner(const char *name, Oid newOwnerId);
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index 421ec1580d8..301e494f7ba 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -22,9 +22,7 @@ extern Size ApplyLauncherShmemSize(void);
extern void ApplyLauncherShmemInit(void);
extern void ApplyLauncherWakeupAtCommit(void);
-extern bool XactManipulatesLogicalReplicationWorkers(void);
extern void AtEOXact_ApplyLauncher(bool isCommit);
-extern void AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth);
extern bool IsLogicalLauncher(void);
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 53f636c56f5..5f52335f15f 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -15,6 +15,7 @@
#include "storage/lwlock.h"
#include "storage/shmem.h"
#include "storage/spin.h"
+#include "replication/walreceiver.h"
/*
* Behaviour of replication slots, upon release or crash.
@@ -211,6 +212,8 @@ extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
extern void ReplicationSlotsDropDBSlots(Oid dboid);
extern void InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
extern ReplicationSlot *SearchNamedReplicationSlot(const char *name);
+extern char *ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname);
+extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok);
extern void StartupReplicationSlots(void);
extern void CheckPointReplicationSlots(void);
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 4313f516d35..a97a59a6a30 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -210,6 +210,7 @@ typedef enum
typedef struct WalRcvExecResult
{
WalRcvExecStatus status;
+ int sqlstate;
char *err;
Tuplestorestate *tuplestore;
TupleDesc tupledesc;
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index d046022e49c..4a5adc2fdac 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -77,13 +77,14 @@ extern List *logicalrep_workers_find(Oid subid, bool only_running);
extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
Oid userid, Oid relid);
extern void logicalrep_worker_stop(Oid subid, Oid relid);
-extern void logicalrep_worker_stop_at_commit(Oid subid, Oid relid);
extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
extern int logicalrep_sync_worker_count(Oid subid);
+extern void ReplicationOriginNameForTablesync(Oid suboid, Oid relid, char *originname);
extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos);
+
void process_syncing_tables(XLogRecPtr current_lsn);
void invalidate_syncing_table_states(Datum arg, int cacheid,
uint32 hashvalue);
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 2fa9bce66a4..7802279cb2e 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -201,6 +201,27 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
(1 row)
DROP SUBSCRIPTION regress_testsub;
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=postgres' PUBLICATION mypub
+ WITH (enabled = true, create_slot = false, copy_data = false);
+-- fail - ALTER SUBSCRIPTION with refresh is not allowed in a transaction
+-- block or function
+BEGIN;
+ALTER SUBSCRIPTION regress_testsub SET PUBLICATION mypub WITH (refresh = true);
+ERROR: ALTER SUBSCRIPTION with refresh cannot run inside a transaction block
+END;
+BEGIN;
+ALTER SUBSCRIPTION regress_testsub REFRESH PUBLICATION;
+ERROR: ALTER SUBSCRIPTION ... REFRESH cannot run inside a transaction block
+END;
+CREATE FUNCTION func() RETURNS VOID AS
+$$ ALTER SUBSCRIPTION regress_testsub SET PUBLICATION mypub WITH (refresh = true) $$ LANGUAGE SQL;
+SELECT func();
+ERROR: ALTER SUBSCRIPTION with refresh cannot be executed from a function
+CONTEXT: SQL function "func" statement 1
+ALTER SUBSCRIPTION regress_testsub DISABLE;
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
+DROP FUNCTION func;
RESET SESSION AUTHORIZATION;
DROP ROLE regress_subscription_user;
DROP ROLE regress_subscription_user2;
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index 14fa0b247e1..ca0d7827429 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -147,6 +147,28 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
DROP SUBSCRIPTION regress_testsub;
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=postgres' PUBLICATION mypub
+ WITH (enabled = true, create_slot = false, copy_data = false);
+
+-- fail - ALTER SUBSCRIPTION with refresh is not allowed in a transaction
+-- block or function
+BEGIN;
+ALTER SUBSCRIPTION regress_testsub SET PUBLICATION mypub WITH (refresh = true);
+END;
+
+BEGIN;
+ALTER SUBSCRIPTION regress_testsub REFRESH PUBLICATION;
+END;
+
+CREATE FUNCTION func() RETURNS VOID AS
+$$ ALTER SUBSCRIPTION regress_testsub SET PUBLICATION mypub WITH (refresh = true) $$ LANGUAGE SQL;
+SELECT func();
+
+ALTER SUBSCRIPTION regress_testsub DISABLE;
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
+DROP FUNCTION func;
+
RESET SESSION AUTHORIZATION;
DROP ROLE regress_subscription_user;
DROP ROLE regress_subscription_user2;
diff --git a/src/test/subscription/t/004_sync.pl b/src/test/subscription/t/004_sync.pl
index e111ab91810..c7926681b66 100644
--- a/src/test/subscription/t/004_sync.pl
+++ b/src/test/subscription/t/004_sync.pl
@@ -3,7 +3,7 @@ use strict;
use warnings;
use PostgresNode;
use TestLib;
-use Test::More tests => 7;
+use Test::More tests => 8;
# Initialize publisher node
my $node_publisher = get_new_node('publisher');
@@ -149,7 +149,26 @@ $result = $node_subscriber->safe_psql('postgres',
is($result, qq(20),
'changes for table added after subscription initialized replicated');
+# clean up
+$node_publisher->safe_psql('postgres', "DROP TABLE tab_rep_next");
+$node_subscriber->safe_psql('postgres', "DROP TABLE tab_rep_next");
$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
+# Table tab_rep already has the same records on both publisher and subscriber
+# at this time. Recreate the subscription, which will do the initial copy of
+# the table again and fail due to a unique constraint violation.
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub");
+
+$result = $node_subscriber->poll_query_until('postgres', $started_query)
+ or die "Timed out while waiting for subscriber to start sync";
+
+# DROP SUBSCRIPTION must clean up slots on the publisher side when the
+# subscriber is stuck on data copy for constraint violation.
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
+
+$result = $node_publisher->safe_psql('postgres', "SELECT count(*) FROM pg_replication_slots");
+is($result, qq(0), 'DROP SUBSCRIPTION during error can clean up the slots on the publisher');
+
$node_subscriber->stop('fast');
$node_publisher->stop('fast');
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 1d540fe489f..bab4f3adb3b 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2397,7 +2397,6 @@ StdAnalyzeData
StdRdOptions
Step
StopList
-StopWorkersData
StrategyNumber
StreamCtl
StreamXidHash
@@ -2408,6 +2407,7 @@ SubLink
SubLinkType
SubPlan
SubPlanState
+SubRemoveRels
SubTransactionId
SubXactCallback
SubXactCallbackItem