diff options
author | Simon Riggs <simon@2ndQuadrant.com> | 2012-01-24 20:22:37 +0000 |
---|---|---|
committer | Simon Riggs <simon@2ndQuadrant.com> | 2012-01-24 20:22:37 +0000 |
commit | 443b4821f1649bc617c5ce1f6f3ffc65842a8930 (patch) | |
tree | edbbc03b007575020733aff01cc137b62b3b5509 /src/backend | |
parent | 89dda5f2979fbe277809369ff88832ab39e83ff0 (diff) | |
download | postgresql-443b4821f1649bc617c5ce1f6f3ffc65842a8930.tar.gz postgresql-443b4821f1649bc617c5ce1f6f3ffc65842a8930.zip |
Add new replication mode synchronous_commit = 'write'.
Replication occurs only to memory on the standby, not to disk,
so it provides additional performance if the user is willing to
reduce the durability level slightly. Adds the concept of multiple
independent sync rep queues.
Fujii Masao and Simon Riggs
Diffstat (limited to 'src/backend')
-rw-r--r-- | src/backend/replication/syncrep.c | 112 | ||||
-rw-r--r-- | src/backend/replication/walsender.c | 3 | ||||
-rw-r--r-- | src/backend/utils/misc/guc.c | 5 |
3 files changed, 80 insertions, 40 deletions
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index 6bf69f0d35b..1273a8b9ebf 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -20,8 +20,8 @@ * per-transaction state information. * * Replication is either synchronous or not synchronous (async). If it is - * async, we just fastpath out of here. If it is sync, then in 9.1 we wait - * for the flush location on the standby before releasing the waiting backend. + * async, we just fastpath out of here. If it is sync, then we wait for + * the write or flush location on the standby before releasing the waiting backend. * Further complexity in that interaction is expected in later releases. * * The best performing way to manage the waiting backends is to have a @@ -67,13 +67,15 @@ char *SyncRepStandbyNames; static bool announce_next_takeover = true; -static void SyncRepQueueInsert(void); +static int SyncRepWaitMode = SYNC_REP_NO_WAIT; + +static void SyncRepQueueInsert(int mode); static void SyncRepCancelWait(void); static int SyncRepGetStandbyPriority(void); #ifdef USE_ASSERT_CHECKING -static bool SyncRepQueueIsOrderedByLSN(void); +static bool SyncRepQueueIsOrderedByLSN(int mode); #endif /* @@ -120,7 +122,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) * be a low cost check. */ if (!WalSndCtl->sync_standbys_defined || - XLByteLE(XactCommitLSN, WalSndCtl->lsn)) + XLByteLE(XactCommitLSN, WalSndCtl->lsn[SyncRepWaitMode])) { LWLockRelease(SyncRepLock); return; @@ -132,8 +134,8 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) */ MyProc->waitLSN = XactCommitLSN; MyProc->syncRepState = SYNC_REP_WAITING; - SyncRepQueueInsert(); - Assert(SyncRepQueueIsOrderedByLSN()); + SyncRepQueueInsert(SyncRepWaitMode); + Assert(SyncRepQueueIsOrderedByLSN(SyncRepWaitMode)); LWLockRelease(SyncRepLock); /* Alter ps display to show waiting for sync rep. 
*/ @@ -267,18 +269,19 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) } /* - * Insert MyProc into SyncRepQueue, maintaining sorted invariant. + * Insert MyProc into the specified SyncRepQueue, maintaining sorted invariant. * * Usually we will go at tail of queue, though it's possible that we arrive * here out of order, so start at tail and work back to insertion point. */ static void -SyncRepQueueInsert(void) +SyncRepQueueInsert(int mode) { PGPROC *proc; - proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue), - &(WalSndCtl->SyncRepQueue), + Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE); + proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]), + &(WalSndCtl->SyncRepQueue[mode]), offsetof(PGPROC, syncRepLinks)); while (proc) @@ -290,7 +293,7 @@ SyncRepQueueInsert(void) if (XLByteLT(proc->waitLSN, MyProc->waitLSN)) break; - proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue), + proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]), &(proc->syncRepLinks), offsetof(PGPROC, syncRepLinks)); } @@ -298,7 +301,7 @@ SyncRepQueueInsert(void) if (proc) SHMQueueInsertAfter(&(proc->syncRepLinks), &(MyProc->syncRepLinks)); else - SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue), &(MyProc->syncRepLinks)); + SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue[mode]), &(MyProc->syncRepLinks)); } /* @@ -368,7 +371,8 @@ SyncRepReleaseWaiters(void) { volatile WalSndCtlData *walsndctl = WalSndCtl; volatile WalSnd *syncWalSnd = NULL; - int numprocs = 0; + int numwrite = 0; + int numflush = 0; int priority = 0; int i; @@ -419,20 +423,28 @@ SyncRepReleaseWaiters(void) return; } - if (XLByteLT(walsndctl->lsn, MyWalSnd->flush)) + /* + * Set the lsn first so that when we wake backends they will release + * up to this location. + */ + if (XLByteLT(walsndctl->lsn[SYNC_REP_WAIT_WRITE], MyWalSnd->write)) { - /* - * Set the lsn first so that when we wake backends they will release - * up to this location. 
- */ - walsndctl->lsn = MyWalSnd->flush; - numprocs = SyncRepWakeQueue(false); + walsndctl->lsn[SYNC_REP_WAIT_WRITE] = MyWalSnd->write; + numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE); + } + if (XLByteLT(walsndctl->lsn[SYNC_REP_WAIT_FLUSH], MyWalSnd->flush)) + { + walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush; + numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH); } LWLockRelease(SyncRepLock); - elog(DEBUG3, "released %d procs up to %X/%X", - numprocs, + elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X", + numwrite, + MyWalSnd->write.xlogid, + MyWalSnd->write.xrecoff, + numflush, MyWalSnd->flush.xlogid, MyWalSnd->flush.xrecoff); @@ -507,24 +519,26 @@ SyncRepGetStandbyPriority(void) } /* - * Walk queue from head. Set the state of any backends that need to be woken, - * remove them from the queue, and then wake them. Pass all = true to wake - * whole queue; otherwise, just wake up to the walsender's LSN. + * Walk the specified queue from head. Set the state of any backends that + * need to be woken, remove them from the queue, and then wake them. + * Pass all = true to wake whole queue; otherwise, just wake up to + * the walsender's LSN. * * Must hold SyncRepLock. 
*/ int -SyncRepWakeQueue(bool all) +SyncRepWakeQueue(bool all, int mode) { volatile WalSndCtlData *walsndctl = WalSndCtl; PGPROC *proc = NULL; PGPROC *thisproc = NULL; int numprocs = 0; - Assert(SyncRepQueueIsOrderedByLSN()); + Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE); + Assert(SyncRepQueueIsOrderedByLSN(mode)); - proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue), - &(WalSndCtl->SyncRepQueue), + proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]), + &(WalSndCtl->SyncRepQueue[mode]), offsetof(PGPROC, syncRepLinks)); while (proc) @@ -532,7 +546,7 @@ SyncRepWakeQueue(bool all) /* * Assume the queue is ordered by LSN */ - if (!all && XLByteLT(walsndctl->lsn, proc->waitLSN)) + if (!all && XLByteLT(walsndctl->lsn[mode], proc->waitLSN)) return numprocs; /* @@ -540,7 +554,7 @@ SyncRepWakeQueue(bool all) * thisproc is valid, proc may be NULL after this. */ thisproc = proc; - proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue), + proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]), &(proc->syncRepLinks), offsetof(PGPROC, syncRepLinks)); @@ -588,7 +602,12 @@ SyncRepUpdateSyncStandbysDefined(void) * wants synchronous replication, we'd better wake them up. 
*/ if (!sync_standbys_defined) - SyncRepWakeQueue(true); + { + int i; + + for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++) + SyncRepWakeQueue(true, i); + } /* * Only allow people to join the queue when there are synchronous @@ -605,16 +624,18 @@ SyncRepUpdateSyncStandbysDefined(void) #ifdef USE_ASSERT_CHECKING static bool -SyncRepQueueIsOrderedByLSN(void) +SyncRepQueueIsOrderedByLSN(int mode) { PGPROC *proc = NULL; XLogRecPtr lastLSN; + Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE); + lastLSN.xlogid = 0; lastLSN.xrecoff = 0; - proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue), - &(WalSndCtl->SyncRepQueue), + proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]), + &(WalSndCtl->SyncRepQueue[mode]), offsetof(PGPROC, syncRepLinks)); while (proc) @@ -628,7 +649,7 @@ SyncRepQueueIsOrderedByLSN(void) lastLSN = proc->waitLSN; - proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue), + proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]), &(proc->syncRepLinks), offsetof(PGPROC, syncRepLinks)); } @@ -675,3 +696,20 @@ check_synchronous_standby_names(char **newval, void **extra, GucSource source) return true; } + +void +assign_synchronous_commit(int newval, void *extra) +{ + switch (newval) + { + case SYNCHRONOUS_COMMIT_REMOTE_WRITE: + SyncRepWaitMode = SYNC_REP_WAIT_WRITE; + break; + case SYNCHRONOUS_COMMIT_REMOTE_FLUSH: + SyncRepWaitMode = SYNC_REP_WAIT_FLUSH; + break; + default: + SyncRepWaitMode = SYNC_REP_NO_WAIT; + break; + } +} diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 3611713434a..5f938124e72 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1410,7 +1410,8 @@ WalSndShmemInit(void) /* First time through, so initialize */ MemSet(WalSndCtl, 0, WalSndShmemSize()); - SHMQueueInit(&(WalSndCtl->SyncRepQueue)); + for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++) + SHMQueueInit(&(WalSndCtl->SyncRepQueue[i])); for (i = 0; i < max_wal_senders; i++) { diff 
--git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 9fc96b2126a..ec8f2f2309b 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -370,11 +370,12 @@ static const struct config_enum_entry constraint_exclusion_options[] = { }; /* - * Although only "on", "off", and "local" are documented, we + * Although only "on", "off", "write", and "local" are documented, we * accept all the likely variants of "on" and "off". */ static const struct config_enum_entry synchronous_commit_options[] = { {"local", SYNCHRONOUS_COMMIT_LOCAL_FLUSH, false}, + {"write", SYNCHRONOUS_COMMIT_REMOTE_WRITE, false}, {"on", SYNCHRONOUS_COMMIT_ON, false}, {"off", SYNCHRONOUS_COMMIT_OFF, false}, {"true", SYNCHRONOUS_COMMIT_ON, true}, @@ -3164,7 +3165,7 @@ static struct config_enum ConfigureNamesEnum[] = }, &synchronous_commit, SYNCHRONOUS_COMMIT_ON, synchronous_commit_options, - NULL, NULL, NULL + NULL, assign_synchronous_commit, NULL }, { |