diff options
Diffstat (limited to 'src/backend/replication/logical/launcher.c')
-rw-r--r-- | src/backend/replication/logical/launcher.c | 83 |
1 files changed, 81 insertions, 2 deletions
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index d165d518e1b..0f9e5755b9e 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -73,6 +73,14 @@ typedef struct LogicalRepCtxStruct LogicalRepCtxStruct *LogicalRepCtx; +typedef struct LogicalRepWorkerId +{ + Oid subid; + Oid relid; +} LogicalRepWorkerId; + +static List *on_commit_stop_workers = NIL; + static void ApplyLauncherWakeup(void); static void logicalrep_launcher_onexit(int code, Datum arg); static void logicalrep_worker_onexit(int code, Datum arg); @@ -250,6 +258,30 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running) } /* + * Similar to logicalrep_worker_find(), but returns list of all workers for + * the subscription, instead just one. + */ +List * +logicalrep_workers_find(Oid subid, bool only_running) +{ + int i; + List *res = NIL; + + Assert(LWLockHeldByMe(LogicalRepWorkerLock)); + + /* Search for attached worker for a given subscription id. */ + for (i = 0; i < max_logical_replication_workers; i++) + { + LogicalRepWorker *w = &LogicalRepCtx->workers[i]; + + if (w->in_use && w->subid == subid && (!only_running || w->proc)) + res = lappend(res, w); + } + + return res; +} + +/* * Start new apply background worker. */ void @@ -514,6 +546,27 @@ logicalrep_worker_stop(Oid subid, Oid relid) } /* + * Request worker for specified sub/rel to be stopped on commit. + */ +void +logicalrep_worker_stop_at_commit(Oid subid, Oid relid) +{ + LogicalRepWorkerId *wid; + MemoryContext oldctx; + + /* Make sure we store the info in context that survives until commit. */ + oldctx = MemoryContextSwitchTo(TopTransactionContext); + + wid = palloc(sizeof(LogicalRepWorkerId)); + wid->subid = subid; + wid->relid = relid; + + on_commit_stop_workers = lappend(on_commit_stop_workers, wid); + + MemoryContextSwitchTo(oldctx); +} + +/* * Wake up (using latch) any logical replication worker for specified sub/rel. */ void @@ -754,14 +807,40 @@ ApplyLauncherShmemInit(void) } /* + * Check whether current transaction has manipulated logical replication + * workers. + */ +bool +XactManipulatesLogicalReplicationWorkers(void) +{ + return (on_commit_stop_workers != NIL); +} + +/* * Wakeup the launcher on commit if requested. */ void AtEOXact_ApplyLauncher(bool isCommit) { - if (isCommit && on_commit_launcher_wakeup) - ApplyLauncherWakeup(); + if (isCommit) + { + ListCell *lc; + foreach (lc, on_commit_stop_workers) + { + LogicalRepWorkerId *wid = lfirst(lc); + logicalrep_worker_stop(wid->subid, wid->relid); + } + + if (on_commit_launcher_wakeup) + ApplyLauncherWakeup(); + } + + /* + * No need to pfree on_commit_stop_workers. It was allocated in + * transaction memory context, which is going to be cleaned soon. + */ + on_commit_stop_workers = NIL; on_commit_launcher_wakeup = false; } |