aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/launcher.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/launcher.c')
-rw-r--r--src/backend/replication/logical/launcher.c83
1 files changed, 81 insertions, 2 deletions
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index d165d518e1b..0f9e5755b9e 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -73,6 +73,14 @@ typedef struct LogicalRepCtxStruct
LogicalRepCtxStruct *LogicalRepCtx;
+typedef struct LogicalRepWorkerId
+{
+ Oid subid;
+ Oid relid;
+} LogicalRepWorkerId;
+
+static List *on_commit_stop_workers = NIL;
+
static void ApplyLauncherWakeup(void);
static void logicalrep_launcher_onexit(int code, Datum arg);
static void logicalrep_worker_onexit(int code, Datum arg);
@@ -250,6 +258,30 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
}
/*
+ * Similar to logicalrep_worker_find(), but returns list of all workers for
+ * the subscription, instead just one.
+ */
+List *
+logicalrep_workers_find(Oid subid, bool only_running)
+{
+ int i;
+ List *res = NIL;
+
+ Assert(LWLockHeldByMe(LogicalRepWorkerLock));
+
+ /* Search for attached worker for a given subscription id. */
+ for (i = 0; i < max_logical_replication_workers; i++)
+ {
+ LogicalRepWorker *w = &LogicalRepCtx->workers[i];
+
+ if (w->in_use && w->subid == subid && (!only_running || w->proc))
+ res = lappend(res, w);
+ }
+
+ return res;
+}
+
+/*
* Start new apply background worker.
*/
void
@@ -514,6 +546,27 @@ logicalrep_worker_stop(Oid subid, Oid relid)
}
/*
+ * Request worker for specified sub/rel to be stopped on commit.
+ */
+void
+logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
+{
+ LogicalRepWorkerId *wid;
+ MemoryContext oldctx;
+
+ /* Make sure we store the info in context that survives until commit. */
+ oldctx = MemoryContextSwitchTo(TopTransactionContext);
+
+ wid = palloc(sizeof(LogicalRepWorkerId));
+ wid->subid = subid;
+ wid->relid = relid;
+
+ on_commit_stop_workers = lappend(on_commit_stop_workers, wid);
+
+ MemoryContextSwitchTo(oldctx);
+}
+
+/*
* Wake up (using latch) any logical replication worker for specified sub/rel.
*/
void
@@ -754,14 +807,40 @@ ApplyLauncherShmemInit(void)
}
/*
+ * Check whether current transaction has manipulated logical replication
+ * workers.
+ */
+bool
+XactManipulatesLogicalReplicationWorkers(void)
+{
+ return (on_commit_stop_workers != NIL);
+}
+
+/*
* Wakeup the launcher on commit if requested.
*/
void
AtEOXact_ApplyLauncher(bool isCommit)
{
- if (isCommit && on_commit_launcher_wakeup)
- ApplyLauncherWakeup();
+ if (isCommit)
+ {
+ ListCell *lc;
+ foreach (lc, on_commit_stop_workers)
+ {
+ LogicalRepWorkerId *wid = lfirst(lc);
+ logicalrep_worker_stop(wid->subid, wid->relid);
+ }
+
+ if (on_commit_launcher_wakeup)
+ ApplyLauncherWakeup();
+ }
+
+ /*
+ * No need to pfree on_commit_stop_workers. It was allocated in
+ * transaction memory context, which is going to be cleaned soon.
+ */
+ on_commit_stop_workers = NIL;
on_commit_launcher_wakeup = false;
}