diff options
Diffstat (limited to 'src/backend/commands/subscriptioncmds.c')
-rw-r--r-- | src/backend/commands/subscriptioncmds.c | 28 |
1 files changed, 24 insertions, 4 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 6dc3f6ee000..87824b8fec3 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -597,7 +597,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) RemoveSubscriptionRel(sub->oid, relid); - logicalrep_worker_stop(sub->oid, relid); + logicalrep_worker_stop_at_commit(sub->oid, relid); namespace = get_namespace_name(get_rel_namespace(relid)); ereport(NOTICE, @@ -819,6 +819,8 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) char *subname; char *conninfo; char *slotname; + List *subworkers; + ListCell *lc; char originname[NAMEDATALEN]; char *err = NULL; RepOriginId originid; @@ -909,15 +911,33 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) ReleaseSysCache(tup); + /* + * If we are dropping the replication slot, stop all the subscription + * workers immediately, so that the slot becomes accessible. Otherwise + * just schedule the stopping for the end of the transaction. + * + * New workers won't be started because we hold an exclusive lock on the + * subscription till the end of the transaction. + */ + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + subworkers = logicalrep_workers_find(subid, false); + LWLockRelease(LogicalRepWorkerLock); + foreach (lc, subworkers) + { + LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc); + if (slotname) + logicalrep_worker_stop(w->subid, w->relid); + else + logicalrep_worker_stop_at_commit(w->subid, w->relid); + } + list_free(subworkers); + /* Clean up dependencies */ deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0); /* Remove any associated relation synchronization states. */ RemoveSubscriptionRel(subid, InvalidOid); - /* Kill the apply worker so that the slot becomes accessible. */ - logicalrep_worker_stop(subid, InvalidOid); - /* Remove the origin tracking if exists. */ snprintf(originname, sizeof(originname), "pg_%u", subid); originid = replorigin_by_name(originname, true); |