diff options
Diffstat (limited to 'src/backend/replication/syncrep.c')
-rw-r--r-- | src/backend/replication/syncrep.c | 35 |
1 files changed, 28 insertions, 7 deletions
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index 92faf4e8e0d..2da9cba5dc7 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -91,13 +91,24 @@ static bool SyncRepQueueIsOrderedByLSN(int mode); * to the wait queue. During SyncRepWakeQueue() a WALSender changes * the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed. * This backend then resets its state to SYNC_REP_NOT_WAITING. + * + * 'lsn' represents the LSN to wait for. 'commit' indicates whether this LSN + * represents a commit record. If it doesn't, then we wait only for the WAL + * to be flushed if synchronous_commit is set to the higher level of + * remote_apply, because only commit records provide apply feedback. */ void -SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) +SyncRepWaitForLSN(XLogRecPtr lsn, bool commit) { char *new_status = NULL; const char *old_status; - int mode = SyncRepWaitMode; + int mode; + + /* Cap the level for anything other than commit to remote flush only. */ + if (commit) + mode = SyncRepWaitMode; + else + mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH); /* * Fast exit if user has not requested sync replication, or there are no @@ -122,7 +133,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) * to be a low cost check. */ if (!WalSndCtl->sync_standbys_defined || - XactCommitLSN <= WalSndCtl->lsn[mode]) + lsn <= WalSndCtl->lsn[mode]) { LWLockRelease(SyncRepLock); return; @@ -132,7 +143,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) * Set our waitLSN so WALSender will know when to wake us, and add * ourselves to the queue. */ - MyProc->waitLSN = XactCommitLSN; + MyProc->waitLSN = lsn; MyProc->syncRepState = SYNC_REP_WAITING; SyncRepQueueInsert(mode); Assert(SyncRepQueueIsOrderedByLSN(mode)); @@ -147,7 +158,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) new_status = (char *) palloc(len + 32 + 1); memcpy(new_status, old_status, len); sprintf(new_status + len, " waiting for %X/%X", - (uint32) (XactCommitLSN >> 32), (uint32) XactCommitLSN); + (uint32) (lsn >> 32), (uint32) lsn); set_ps_display(new_status, false); new_status[len] = '\0'; /* truncate off " waiting ..." */ } @@ -416,6 +427,7 @@ SyncRepReleaseWaiters(void) WalSnd *syncWalSnd; int numwrite = 0; int numflush = 0; + int numapply = 0; /* * If this WALSender is serving a standby that is not on the list of @@ -462,12 +474,18 @@ SyncRepReleaseWaiters(void) walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush; numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH); } + if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < MyWalSnd->apply) + { + walsndctl->lsn[SYNC_REP_WAIT_APPLY] = MyWalSnd->apply; + numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY); + } LWLockRelease(SyncRepLock); - elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X", + elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%x", numwrite, (uint32) (MyWalSnd->write >> 32), (uint32) MyWalSnd->write, - numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush); + numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush, + numapply, (uint32) (MyWalSnd->apply >> 32), (uint32) MyWalSnd->apply); /* * If we are managing the highest priority standby, though we weren't @@ -728,6 +746,9 @@ assign_synchronous_commit(int newval, void *extra) case SYNCHRONOUS_COMMIT_REMOTE_FLUSH: SyncRepWaitMode = SYNC_REP_WAIT_FLUSH; break; + case SYNCHRONOUS_COMMIT_REMOTE_APPLY: + SyncRepWaitMode = SYNC_REP_WAIT_APPLY; + break; default: SyncRepWaitMode = SYNC_REP_NO_WAIT; break; |