diff options
Diffstat (limited to 'src/backend/replication/logical/launcher.c')
-rw-r--r-- | src/backend/replication/logical/launcher.c | 44 |
1 files changed, 34 insertions, 10 deletions
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 10677da56b2..4aed0dfcebb 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -175,12 +175,14 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker, uint16 generation, BackgroundWorkerHandle *handle) { - BgwHandleStatus status; - int rc; + bool result = false; + bool dropped_latch = false; for (;;) { + BgwHandleStatus status; pid_t pid; + int rc; CHECK_FOR_INTERRUPTS(); @@ -189,8 +191,9 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker, /* Worker either died or has started. Return false if died. */ if (!worker->in_use || worker->proc) { + result = worker->in_use; LWLockRelease(LogicalRepWorkerLock); - return worker->in_use; + break; } LWLockRelease(LogicalRepWorkerLock); @@ -205,7 +208,7 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker, if (generation == worker->generation) logicalrep_worker_cleanup(worker); LWLockRelease(LogicalRepWorkerLock); - return false; + break; /* result is already false */ } /* @@ -220,8 +223,18 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker, { ResetLatch(MyLatch); CHECK_FOR_INTERRUPTS(); + dropped_latch = true; } } + + /* + * If we had to clear a latch event in order to wait, be sure to restore + * it before exiting. Otherwise caller may miss events. + */ + if (dropped_latch) + SetLatch(MyLatch); + + return result; } /* @@ -328,7 +341,7 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype, if (max_active_replication_origins == 0) ereport(ERROR, (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), - errmsg("cannot start logical replication workers when \"max_active_replication_origins\"=0"))); + errmsg("cannot start logical replication workers when \"max_active_replication_origins\" is 0"))); /* * We need to do the modification of the shared memory under lock so that @@ -1016,7 +1029,7 @@ logicalrep_launcher_attach_dshmem(void) last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa); dsa_pin_mapping(last_start_times_dsa); last_start_times = dshash_attach(last_start_times_dsa, &dsh_params, - LogicalRepCtx->last_start_dsh, 0); + LogicalRepCtx->last_start_dsh, NULL); } MemoryContextSwitchTo(oldcontext); @@ -1194,10 +1207,21 @@ ApplyLauncherMain(Datum main_arg) (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval) { ApplyLauncherSetWorkerStartTime(sub->oid, now); - logicalrep_worker_launch(WORKERTYPE_APPLY, - sub->dbid, sub->oid, sub->name, - sub->owner, InvalidOid, - DSM_HANDLE_INVALID); + if (!logicalrep_worker_launch(WORKERTYPE_APPLY, + sub->dbid, sub->oid, sub->name, + sub->owner, InvalidOid, + DSM_HANDLE_INVALID)) + { + /* + * We get here either if we failed to launch a worker + * (perhaps for resource-exhaustion reasons) or if we + * launched one but it immediately quit. Either way, it + * seems appropriate to try again after + * wal_retrieve_retry_interval. + */ + wait_time = Min(wait_time, + wal_retrieve_retry_interval); + } } else { |