about summary refs log tree commit diff
path: root/src/backend
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend')
-rw-r--r--  src/backend/replication/logical/launcher.c   12
-rw-r--r--  src/backend/replication/logical/tablesync.c  156
2 files changed, 100 insertions(+), 68 deletions(-)
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 86a2b14807f..961110c94be 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -515,7 +515,7 @@ logicalrep_worker_stop(Oid subid, Oid relid)
}
/*
- * Wake up (using latch) the logical replication worker.
+ * Wake up (using latch) any logical replication worker for specified sub/rel.
*/
void
logicalrep_worker_wakeup(Oid subid, Oid relid)
@@ -523,19 +523,25 @@ logicalrep_worker_wakeup(Oid subid, Oid relid)
LogicalRepWorker *worker;
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+
worker = logicalrep_worker_find(subid, relid, true);
- LWLockRelease(LogicalRepWorkerLock);
if (worker)
logicalrep_worker_wakeup_ptr(worker);
+
+ LWLockRelease(LogicalRepWorkerLock);
}
/*
- * Wake up (using latch) the logical replication worker.
+ * Wake up (using latch) the specified logical replication worker.
+ *
+ * Caller must hold lock, else worker->proc could change under us.
*/
void
logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
{
+ Assert(LWLockHeldByMe(LogicalRepWorkerLock));
+
SetLatch(&worker->proc->procLatch);
}
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 3ef12dfd26a..32abf5b368a 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -212,8 +212,7 @@ wait_for_relation_state_change(Oid relid, char expected_state)
*
* Used when transitioning from SYNCWAIT state to CATCHUP.
*
- * Returns false if the apply worker has disappeared or the table state has been
- * reset.
+ * Returns false if the apply worker has disappeared.
*/
static bool
wait_for_worker_state_change(char expected_state)
@@ -226,17 +225,30 @@ wait_for_worker_state_change(char expected_state)
CHECK_FOR_INTERRUPTS();
- /* Bail if the apply has died. */
+ /*
+ * Done if already in correct state. (We assume this fetch is atomic
+ * enough to not give a misleading answer if we do it with no lock.)
+ */
+ if (MyLogicalRepWorker->relstate == expected_state)
+ return true;
+
+ /*
+ * Bail out if the apply worker has died, else signal it we're
+ * waiting.
+ */
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
InvalidOid, false);
+ if (worker && worker->proc)
+ logicalrep_worker_wakeup_ptr(worker);
LWLockRelease(LogicalRepWorkerLock);
if (!worker)
- return false;
-
- if (MyLogicalRepWorker->relstate == expected_state)
- return true;
+ break;
+ /*
+ * Wait. We expect to get a latch signal back from the apply worker,
+ * but use a timeout in case it dies without sending one.
+ */
rc = WaitLatch(MyLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
@@ -245,7 +257,8 @@ wait_for_worker_state_change(char expected_state)
if (rc & WL_POSTMASTER_DEATH)
proc_exit(1);
- ResetLatch(MyLatch);
+ if (rc & WL_LATCH_SET)
+ ResetLatch(MyLatch);
}
return false;
@@ -422,83 +435,96 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
else
{
LogicalRepWorker *syncworker;
- int nsyncworkers = 0;
+ /*
+ * Look for a sync worker for this relation.
+ */
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+
syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid,
rstate->relid, false);
+
if (syncworker)
{
+ /* Found one, update our copy of its state */
SpinLockAcquire(&syncworker->relmutex);
rstate->state = syncworker->relstate;
rstate->lsn = syncworker->relstate_lsn;
+ if (rstate->state == SUBREL_STATE_SYNCWAIT)
+ {
+ /*
+ * Sync worker is waiting for apply. Tell sync worker it
+ * can catchup now.
+ */
+ syncworker->relstate = SUBREL_STATE_CATCHUP;
+ syncworker->relstate_lsn =
+ Max(syncworker->relstate_lsn, current_lsn);
+ }
SpinLockRelease(&syncworker->relmutex);
+
+ /* If we told worker to catch up, wait for it. */
+ if (rstate->state == SUBREL_STATE_SYNCWAIT)
+ {
+ /* Signal the sync worker, as it may be waiting for us. */
+ if (syncworker->proc)
+ logicalrep_worker_wakeup_ptr(syncworker);
+
+ /* Now safe to release the LWLock */
+ LWLockRelease(LogicalRepWorkerLock);
+
+ /*
+ * Enter busy loop and wait for synchronization worker to
+ * reach expected state (or die trying).
+ */
+ if (!started_tx)
+ {
+ StartTransactionCommand();
+ started_tx = true;
+ }
+
+ wait_for_relation_state_change(rstate->relid,
+ SUBREL_STATE_SYNCDONE);
+ }
+ else
+ LWLockRelease(LogicalRepWorkerLock);
}
else
-
+ {
/*
* If there is no sync worker for this table yet, count
* running sync workers for this subscription, while we have
- * the lock, for later.
+ * the lock.
*/
- nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
- LWLockRelease(LogicalRepWorkerLock);
-
- /*
- * There is a worker synchronizing the relation and waiting for
- * apply to do something.
- */
- if (syncworker && rstate->state == SUBREL_STATE_SYNCWAIT)
- {
- /*
- * Tell sync worker it can catchup now. We'll wait for it so
- * it does not get lost.
- */
- SpinLockAcquire(&syncworker->relmutex);
- syncworker->relstate = SUBREL_STATE_CATCHUP;
- syncworker->relstate_lsn =
- Max(syncworker->relstate_lsn, current_lsn);
- SpinLockRelease(&syncworker->relmutex);
+ int nsyncworkers =
+ logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
- /* Signal the sync worker, as it may be waiting for us. */
- logicalrep_worker_wakeup_ptr(syncworker);
+ /* Now safe to release the LWLock */
+ LWLockRelease(LogicalRepWorkerLock);
/*
- * Enter busy loop and wait for synchronization worker to
- * reach expected state (or die trying).
+ * If there are free sync worker slot(s), start a new sync
+ * worker for the table.
*/
- if (!started_tx)
- {
- StartTransactionCommand();
- started_tx = true;
- }
- wait_for_relation_state_change(rstate->relid,
- SUBREL_STATE_SYNCDONE);
- }
-
- /*
- * If there is no sync worker registered for the table and there
- * is some free sync worker slot, start a new sync worker for the
- * table.
- */
- else if (!syncworker && nsyncworkers < max_sync_workers_per_subscription)
- {
- TimestampTz now = GetCurrentTimestamp();
- struct tablesync_start_time_mapping *hentry;
- bool found;
-
- hentry = hash_search(last_start_times, &rstate->relid, HASH_ENTER, &found);
-
- if (!found ||
- TimestampDifferenceExceeds(hentry->last_start_time, now,
- wal_retrieve_retry_interval))
+ if (nsyncworkers < max_sync_workers_per_subscription)
{
- logicalrep_worker_launch(MyLogicalRepWorker->dbid,
- MySubscription->oid,
- MySubscription->name,
- MyLogicalRepWorker->userid,
- rstate->relid);
- hentry->last_start_time = now;
+ TimestampTz now = GetCurrentTimestamp();
+ struct tablesync_start_time_mapping *hentry;
+ bool found;
+
+ hentry = hash_search(last_start_times, &rstate->relid,
+ HASH_ENTER, &found);
+
+ if (!found ||
+ TimestampDifferenceExceeds(hentry->last_start_time, now,
+ wal_retrieve_retry_interval))
+ {
+ logicalrep_worker_launch(MyLogicalRepWorker->dbid,
+ MySubscription->oid,
+ MySubscription->name,
+ MyLogicalRepWorker->userid,
+ rstate->relid);
+ hentry->last_start_time = now;
+ }
}
}
}
@@ -512,7 +538,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
}
/*
- * Process state possible change(s) of tables that are being synchronized.
+ * Process possible state change(s) of tables that are being synchronized.
*/
void
process_syncing_tables(XLogRecPtr current_lsn)