aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/access/transam/xact.c9
-rw-r--r--src/backend/commands/subscriptioncmds.c28
-rw-r--r--src/backend/replication/logical/launcher.c83
-rw-r--r--src/include/replication/logicallauncher.h1
-rw-r--r--src/include/replication/worker_internal.h2
5 files changed, 117 insertions, 6 deletions
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index b0aa69fe4b4..50c3c3b5e5e 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -2277,6 +2277,15 @@ PrepareTransaction(void)
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot PREPARE a transaction that has exported snapshots")));
+ /*
+ * Don't allow PREPARE but for transaction that has/might kill logical
+ * replication workers.
+ */
+ if (XactManipulatesLogicalReplicationWorkers())
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot PREPARE a transaction that has manipulated logical replication workers")));
+
/* Prevent cancel/die interrupt while cleaning up */
HOLD_INTERRUPTS();
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 6dc3f6ee000..87824b8fec3 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -597,7 +597,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
RemoveSubscriptionRel(sub->oid, relid);
- logicalrep_worker_stop(sub->oid, relid);
+ logicalrep_worker_stop_at_commit(sub->oid, relid);
namespace = get_namespace_name(get_rel_namespace(relid));
ereport(NOTICE,
@@ -819,6 +819,8 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
char *subname;
char *conninfo;
char *slotname;
+ List *subworkers;
+ ListCell *lc;
char originname[NAMEDATALEN];
char *err = NULL;
RepOriginId originid;
@@ -909,15 +911,33 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
ReleaseSysCache(tup);
+ /*
+ * If we are dropping the replication slot, stop all the subscription
+ * workers immediately, so that the slot becomes accessible. Otherwise
+ * just schedule the stopping for the end of the transaction.
+ *
+ * New workers won't be started because we hold an exclusive lock on the
+ * subscription till the end of the transaction.
+ */
+ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+ subworkers = logicalrep_workers_find(subid, false);
+ LWLockRelease(LogicalRepWorkerLock);
+ foreach (lc, subworkers)
+ {
+ LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
+ if (slotname)
+ logicalrep_worker_stop(w->subid, w->relid);
+ else
+ logicalrep_worker_stop_at_commit(w->subid, w->relid);
+ }
+ list_free(subworkers);
+
/* Clean up dependencies */
deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
/* Remove any associated relation synchronization states. */
RemoveSubscriptionRel(subid, InvalidOid);
- /* Kill the apply worker so that the slot becomes accessible. */
- logicalrep_worker_stop(subid, InvalidOid);
-
/* Remove the origin tracking if exists. */
snprintf(originname, sizeof(originname), "pg_%u", subid);
originid = replorigin_by_name(originname, true);
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index d165d518e1b..0f9e5755b9e 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -73,6 +73,14 @@ typedef struct LogicalRepCtxStruct
LogicalRepCtxStruct *LogicalRepCtx;
+typedef struct LogicalRepWorkerId
+{
+ Oid subid;
+ Oid relid;
+} LogicalRepWorkerId;
+
+static List *on_commit_stop_workers = NIL;
+
static void ApplyLauncherWakeup(void);
static void logicalrep_launcher_onexit(int code, Datum arg);
static void logicalrep_worker_onexit(int code, Datum arg);
@@ -250,6 +258,30 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
}
/*
+ * Similar to logicalrep_worker_find(), but returns list of all workers for
+ * the subscription, instead just one.
+ */
+List *
+logicalrep_workers_find(Oid subid, bool only_running)
+{
+ int i;
+ List *res = NIL;
+
+ Assert(LWLockHeldByMe(LogicalRepWorkerLock));
+
+ /* Search for attached worker for a given subscription id. */
+ for (i = 0; i < max_logical_replication_workers; i++)
+ {
+ LogicalRepWorker *w = &LogicalRepCtx->workers[i];
+
+ if (w->in_use && w->subid == subid && (!only_running || w->proc))
+ res = lappend(res, w);
+ }
+
+ return res;
+}
+
+/*
* Start new apply background worker.
*/
void
@@ -514,6 +546,27 @@ logicalrep_worker_stop(Oid subid, Oid relid)
}
/*
+ * Request worker for specified sub/rel to be stopped on commit.
+ */
+void
+logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
+{
+ LogicalRepWorkerId *wid;
+ MemoryContext oldctx;
+
+ /* Make sure we store the info in context that survives until commit. */
+ oldctx = MemoryContextSwitchTo(TopTransactionContext);
+
+ wid = palloc(sizeof(LogicalRepWorkerId));
+ wid->subid = subid;
+ wid->relid = relid;
+
+ on_commit_stop_workers = lappend(on_commit_stop_workers, wid);
+
+ MemoryContextSwitchTo(oldctx);
+}
+
+/*
* Wake up (using latch) any logical replication worker for specified sub/rel.
*/
void
@@ -754,14 +807,40 @@ ApplyLauncherShmemInit(void)
}
/*
+ * Check whether current transaction has manipulated logical replication
+ * workers.
+ */
+bool
+XactManipulatesLogicalReplicationWorkers(void)
+{
+ return (on_commit_stop_workers != NIL);
+}
+
+/*
* Wakeup the launcher on commit if requested.
*/
void
AtEOXact_ApplyLauncher(bool isCommit)
{
- if (isCommit && on_commit_launcher_wakeup)
- ApplyLauncherWakeup();
+ if (isCommit)
+ {
+ ListCell *lc;
+ foreach (lc, on_commit_stop_workers)
+ {
+ LogicalRepWorkerId *wid = lfirst(lc);
+ logicalrep_worker_stop(wid->subid, wid->relid);
+ }
+
+ if (on_commit_launcher_wakeup)
+ ApplyLauncherWakeup();
+ }
+
+ /*
+ * No need to pfree on_commit_stop_workers. It was allocated in
+ * transaction memory context, which is going to be cleaned soon.
+ */
+ on_commit_stop_workers = NIL;
on_commit_launcher_wakeup = false;
}
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index aac7d326e22..78016c448f3 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -22,6 +22,7 @@ extern Size ApplyLauncherShmemSize(void);
extern void ApplyLauncherShmemInit(void);
extern void ApplyLauncherWakeupAtCommit(void);
+extern bool XactManipulatesLogicalReplicationWorkers(void);
extern void AtEOXact_ApplyLauncher(bool isCommit);
extern bool IsLogicalLauncher(void);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 494a3a3d087..7b8728cced0 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -71,9 +71,11 @@ extern bool in_remote_transaction;
extern void logicalrep_worker_attach(int slot);
extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
bool only_running);
+extern List *logicalrep_workers_find(Oid subid, bool only_running);
extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
Oid userid, Oid relid);
extern void logicalrep_worker_stop(Oid subid, Oid relid);
+extern void logicalrep_worker_stop_at_commit(Oid subid, Oid relid);
extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);