From 7e174fa793a2df89fe03d002a5087ef67abcdde8 Mon Sep 17 00:00:00 2001 From: Peter Eisentraut Date: Fri, 4 Aug 2017 21:14:35 -0400 Subject: Only kill sync workers at commit time in subscription DDL This allows a transaction abort to avoid killing those workers. Author: Petr Jelinek --- src/backend/commands/subscriptioncmds.c | 28 ++++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) (limited to 'src/backend/commands') diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 6dc3f6ee000..87824b8fec3 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -597,7 +597,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) RemoveSubscriptionRel(sub->oid, relid); - logicalrep_worker_stop(sub->oid, relid); + logicalrep_worker_stop_at_commit(sub->oid, relid); namespace = get_namespace_name(get_rel_namespace(relid)); ereport(NOTICE, @@ -819,6 +819,8 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) char *subname; char *conninfo; char *slotname; + List *subworkers; + ListCell *lc; char originname[NAMEDATALEN]; char *err = NULL; RepOriginId originid; @@ -909,15 +911,33 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) ReleaseSysCache(tup); + /* + * If we are dropping the replication slot, stop all the subscription + * workers immediately, so that the slot becomes accessible. Otherwise + * just schedule the stopping for the end of the transaction. + * + * New workers won't be started because we hold an exclusive lock on the + * subscription till the end of the transaction. + */ + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + subworkers = logicalrep_workers_find(subid, false); + LWLockRelease(LogicalRepWorkerLock); + foreach (lc, subworkers) + { + LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc); + if (slotname) + logicalrep_worker_stop(w->subid, w->relid); + else + logicalrep_worker_stop_at_commit(w->subid, w->relid); + } + list_free(subworkers); + /* Clean up dependencies */ deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0); /* Remove any associated relation synchronization states. */ RemoveSubscriptionRel(subid, InvalidOid); - /* Kill the apply worker so that the slot becomes accessible. */ - logicalrep_worker_stop(subid, InvalidOid); - /* Remove the origin tracking if exists. */ snprintf(originname, sizeof(originname), "pg_%u", subid); originid = replorigin_by_name(originname, true); -- cgit v1.2.3