diff options
-rw-r--r-- | src/backend/replication/logical/applyparallelworker.c | 4 | ||||
-rw-r--r-- | src/backend/replication/logical/launcher.c | 5 | ||||
-rw-r--r-- | src/backend/replication/logical/worker.c | 7 | ||||
-rw-r--r-- | src/include/replication/worker_internal.h | 2 |
4 files changed, 17 insertions, 1 deletions
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index 45186837795..ee7a18137fc 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -873,6 +873,8 @@ ParallelApplyWorkerMain(Datum main_arg) int worker_slot = DatumGetInt32(main_arg); char originname[NAMEDATALEN]; + InitializingApplyWorker = true; + /* Setup signal handling. */ pqsignal(SIGHUP, SignalHandlerForConfigReload); pqsignal(SIGINT, SignalHandlerForShutdownRequest); @@ -940,6 +942,8 @@ ParallelApplyWorkerMain(Datum main_arg) InitializeApplyWorker(); + InitializingApplyWorker = false; + /* Setup replication origin tracking. */ StartTransactionCommand(); ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid, diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 970d170e73a..ceea1262315 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -797,8 +797,11 @@ logicalrep_worker_onexit(int code, Datum arg) * Session level locks may be acquired outside of a transaction in * parallel apply mode and will not be released when the worker * terminates, so manually release all locks before the worker exits. + * + * The locks will be acquired once the worker is initialized. */ - LockReleaseAll(DEFAULT_LOCKMETHOD, true); + if (!InitializingApplyWorker) + LockReleaseAll(DEFAULT_LOCKMETHOD, true); ApplyLauncherWakeup(); } diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index dbf88c95531..879309b316c 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -331,6 +331,9 @@ static TransactionId stream_xid = InvalidTransactionId; */ static uint32 parallel_stream_nchanges = 0; +/* Are we initializing a apply worker? */ +bool InitializingApplyWorker = false; + /* * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for * the subscription if the remote transaction's finish LSN matches the subskiplsn. @@ -4526,6 +4529,8 @@ ApplyWorkerMain(Datum main_arg) WalRcvStreamOptions options; int server_version; + InitializingApplyWorker = true; + /* Attach to slot */ logicalrep_worker_attach(worker_slot); @@ -4548,6 +4553,8 @@ ApplyWorkerMain(Datum main_arg) InitializeApplyWorker(); + InitializingApplyWorker = false; + /* Connect to the origin and start the replication. */ elog(DEBUG1, "connecting to publisher using connection string \"%s\"", MySubscription->conninfo); diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index dce71d2c501..b57eed052f6 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -225,6 +225,8 @@ extern PGDLLIMPORT LogicalRepWorker *MyLogicalRepWorker; extern PGDLLIMPORT bool in_remote_transaction; +extern PGDLLIMPORT bool InitializingApplyWorker; + extern void logicalrep_worker_attach(int slot); extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid, bool only_running); |