aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/syncrep.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/syncrep.c')
-rw-r--r--src/backend/replication/syncrep.c35
1 files changed, 28 insertions, 7 deletions
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index 92faf4e8e0d..2da9cba5dc7 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -91,13 +91,24 @@ static bool SyncRepQueueIsOrderedByLSN(int mode);
* to the wait queue. During SyncRepWakeQueue() a WALSender changes
* the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed.
* This backend then resets its state to SYNC_REP_NOT_WAITING.
+ *
+ * 'lsn' represents the LSN to wait for. 'commit' indicates whether this LSN
+ * represents a commit record. If it doesn't, then we wait only for the WAL
+ * to be flushed if synchronous_commit is set to the higher level of
+ * remote_apply, because only commit records provide apply feedback.
*/
void
-SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
+SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
{
char *new_status = NULL;
const char *old_status;
- int mode = SyncRepWaitMode;
+ int mode;
+
+ /* Cap the level for anything other than commit to remote flush only. */
+ if (commit)
+ mode = SyncRepWaitMode;
+ else
+ mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH);
/*
* Fast exit if user has not requested sync replication, or there are no
@@ -122,7 +133,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
* to be a low cost check.
*/
if (!WalSndCtl->sync_standbys_defined ||
- XactCommitLSN <= WalSndCtl->lsn[mode])
+ lsn <= WalSndCtl->lsn[mode])
{
LWLockRelease(SyncRepLock);
return;
@@ -132,7 +143,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
* Set our waitLSN so WALSender will know when to wake us, and add
* ourselves to the queue.
*/
- MyProc->waitLSN = XactCommitLSN;
+ MyProc->waitLSN = lsn;
MyProc->syncRepState = SYNC_REP_WAITING;
SyncRepQueueInsert(mode);
Assert(SyncRepQueueIsOrderedByLSN(mode));
@@ -147,7 +158,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
new_status = (char *) palloc(len + 32 + 1);
memcpy(new_status, old_status, len);
sprintf(new_status + len, " waiting for %X/%X",
- (uint32) (XactCommitLSN >> 32), (uint32) XactCommitLSN);
+ (uint32) (lsn >> 32), (uint32) lsn);
set_ps_display(new_status, false);
new_status[len] = '\0'; /* truncate off " waiting ..." */
}
@@ -416,6 +427,7 @@ SyncRepReleaseWaiters(void)
WalSnd *syncWalSnd;
int numwrite = 0;
int numflush = 0;
+ int numapply = 0;
/*
* If this WALSender is serving a standby that is not on the list of
@@ -462,12 +474,18 @@ SyncRepReleaseWaiters(void)
walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush;
numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
}
+ if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < MyWalSnd->apply)
+ {
+ walsndctl->lsn[SYNC_REP_WAIT_APPLY] = MyWalSnd->apply;
+ numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY);
+ }
LWLockRelease(SyncRepLock);
- elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X",
+ elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%x",
numwrite, (uint32) (MyWalSnd->write >> 32), (uint32) MyWalSnd->write,
- numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush);
+ numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush,
+ numapply, (uint32) (MyWalSnd->apply >> 32), (uint32) MyWalSnd->apply);
/*
* If we are managing the highest priority standby, though we weren't
@@ -728,6 +746,9 @@ assign_synchronous_commit(int newval, void *extra)
case SYNCHRONOUS_COMMIT_REMOTE_FLUSH:
SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
break;
+ case SYNCHRONOUS_COMMIT_REMOTE_APPLY:
+ SyncRepWaitMode = SYNC_REP_WAIT_APPLY;
+ break;
default:
SyncRepWaitMode = SYNC_REP_NO_WAIT;
break;