aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/replication/logical/applyparallelworker.c4
-rw-r--r--src/backend/replication/logical/launcher.c5
-rw-r--r--src/backend/replication/logical/worker.c7
-rw-r--r--src/include/replication/worker_internal.h2
4 files changed, 17 insertions, 1 deletions
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index 45186837795..ee7a18137fc 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -873,6 +873,8 @@ ParallelApplyWorkerMain(Datum main_arg)
int worker_slot = DatumGetInt32(main_arg);
char originname[NAMEDATALEN];
+ InitializingApplyWorker = true;
+
/* Setup signal handling. */
pqsignal(SIGHUP, SignalHandlerForConfigReload);
pqsignal(SIGINT, SignalHandlerForShutdownRequest);
@@ -940,6 +942,8 @@ ParallelApplyWorkerMain(Datum main_arg)
InitializeApplyWorker();
+ InitializingApplyWorker = false;
+
/* Setup replication origin tracking. */
StartTransactionCommand();
ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 970d170e73a..ceea1262315 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -797,8 +797,11 @@ logicalrep_worker_onexit(int code, Datum arg)
* Session level locks may be acquired outside of a transaction in
* parallel apply mode and will not be released when the worker
* terminates, so manually release all locks before the worker exits.
+ *
+ * The locks will be acquired once the worker is initialized.
*/
- LockReleaseAll(DEFAULT_LOCKMETHOD, true);
+ if (!InitializingApplyWorker)
+ LockReleaseAll(DEFAULT_LOCKMETHOD, true);
ApplyLauncherWakeup();
}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index dbf88c95531..879309b316c 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -331,6 +331,9 @@ static TransactionId stream_xid = InvalidTransactionId;
*/
static uint32 parallel_stream_nchanges = 0;
+/* Are we initializing a apply worker? */
+bool InitializingApplyWorker = false;
+
/*
* We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
* the subscription if the remote transaction's finish LSN matches the subskiplsn.
@@ -4526,6 +4529,8 @@ ApplyWorkerMain(Datum main_arg)
WalRcvStreamOptions options;
int server_version;
+ InitializingApplyWorker = true;
+
/* Attach to slot */
logicalrep_worker_attach(worker_slot);
@@ -4548,6 +4553,8 @@ ApplyWorkerMain(Datum main_arg)
InitializeApplyWorker();
+ InitializingApplyWorker = false;
+
/* Connect to the origin and start the replication. */
elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
MySubscription->conninfo);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index dce71d2c501..b57eed052f6 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -225,6 +225,8 @@ extern PGDLLIMPORT LogicalRepWorker *MyLogicalRepWorker;
extern PGDLLIMPORT bool in_remote_transaction;
+extern PGDLLIMPORT bool InitializingApplyWorker;
+
extern void logicalrep_worker_attach(int slot);
extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
bool only_running);