aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands/subscriptioncmds.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands/subscriptioncmds.c')
-rw-r--r--src/backend/commands/subscriptioncmds.c28
1 files changed, 24 insertions, 4 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 6dc3f6ee000..87824b8fec3 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -597,7 +597,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
RemoveSubscriptionRel(sub->oid, relid);
- logicalrep_worker_stop(sub->oid, relid);
+ logicalrep_worker_stop_at_commit(sub->oid, relid);
namespace = get_namespace_name(get_rel_namespace(relid));
ereport(NOTICE,
@@ -819,6 +819,8 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
char *subname;
char *conninfo;
char *slotname;
+ List *subworkers;
+ ListCell *lc;
char originname[NAMEDATALEN];
char *err = NULL;
RepOriginId originid;
@@ -909,15 +911,33 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
ReleaseSysCache(tup);
+ /*
+ * If we are dropping the replication slot, stop all the subscription
+ * workers immediately, so that the slot becomes accessible. Otherwise
+ * just schedule the stopping for the end of the transaction.
+ *
+ * New workers won't be started because we hold an exclusive lock on the
+ * subscription till the end of the transaction.
+ */
+ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+ subworkers = logicalrep_workers_find(subid, false);
+ LWLockRelease(LogicalRepWorkerLock);
+ foreach (lc, subworkers)
+ {
+ LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
+ if (slotname)
+ logicalrep_worker_stop(w->subid, w->relid);
+ else
+ logicalrep_worker_stop_at_commit(w->subid, w->relid);
+ }
+ list_free(subworkers);
+
/* Clean up dependencies */
deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
/* Remove any associated relation synchronization states. */
RemoveSubscriptionRel(subid, InvalidOid);
- /* Kill the apply worker so that the slot becomes accessible. */
- logicalrep_worker_stop(subid, InvalidOid);
-
/* Remove the origin tracking if exists. */
snprintf(originname, sizeof(originname), "pg_%u", subid);
originid = replorigin_by_name(originname, true);