aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/syncrep.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/syncrep.c')
-rw-r--r--src/backend/replication/syncrep.c88
1 files changed, 60 insertions, 28 deletions
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index aa54bfba6cf..a2b17f15163 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -5,7 +5,7 @@
* Synchronous replication is new as of PostgreSQL 9.1.
*
* If requested, transaction commits wait until their commit LSN is
- * acknowledged by the sync standby.
+ * acknowledged by the synchronous standby.
*
* This module contains the code for waiting and release of backends.
* All code in this module executes on the primary. The core streaming
@@ -358,6 +358,60 @@ SyncRepInitConfig(void)
}
/*
+ * Find the WAL sender servicing the synchronous standby with the lowest
+ * priority value, or NULL if no synchronous standby is connected. If there
+ * are multiple standbys with the same lowest priority value, the first one
+ * found is selected. The caller must hold SyncRepLock.
+ */
+WalSnd *
+SyncRepGetSynchronousStandby(void)
+{
+ WalSnd *result = NULL;
+ int result_priority = 0;
+ int i;
+
+ for (i = 0; i < max_wal_senders; i++)
+ {
+ /* Use volatile pointer to prevent code rearrangement */
+ volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
+ int this_priority;
+
+ /* Must be active */
+ if (walsnd->pid == 0)
+ continue;
+
+ /* Must be streaming */
+ if (walsnd->state != WALSNDSTATE_STREAMING)
+ continue;
+
+ /* Must be synchronous */
+ this_priority = walsnd->sync_standby_priority;
+ if (this_priority == 0)
+ continue;
+
+ /* Must have a lower priority value than any previous ones */
+ if (result != NULL && result_priority <= this_priority)
+ continue;
+
+ /* Must have a valid flush position */
+ if (XLogRecPtrIsInvalid(walsnd->flush))
+ continue;
+
+ result = (WalSnd *) walsnd;
+ result_priority = this_priority;
+
+ /*
+ * If priority is equal to 1, there cannot be any other WAL senders
+ * with a lower priority, so we're done.
+ */
+ if (this_priority == 1)
+ return result;
+ }
+
+ return result;
+}
+
+/*
* Update the LSNs on each queue based upon our latest state. This
* implements a simple policy of first-valid-standby-releases-waiter.
*
@@ -368,11 +422,9 @@ void
SyncRepReleaseWaiters(void)
{
volatile WalSndCtlData *walsndctl = WalSndCtl;
- volatile WalSnd *syncWalSnd = NULL;
+ WalSnd *syncWalSnd;
int numwrite = 0;
int numflush = 0;
- int priority = 0;
- int i;
/*
* If this WALSender is serving a standby that is not on the list of
@@ -387,33 +439,13 @@ SyncRepReleaseWaiters(void)
/*
* We're a potential sync standby. Release waiters if we are the highest
- * priority standby. If there are multiple standbys with same priorities
- * then we use the first mentioned standby. If you change this, also
- * change pg_stat_get_wal_senders().
+ * priority standby.
*/
LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
+ syncWalSnd = SyncRepGetSynchronousStandby();
- for (i = 0; i < max_wal_senders; i++)
- {
- /* use volatile pointer to prevent code rearrangement */
- volatile WalSnd *walsnd = &walsndctl->walsnds[i];
-
- if (walsnd->pid != 0 &&
- walsnd->state == WALSNDSTATE_STREAMING &&
- walsnd->sync_standby_priority > 0 &&
- (priority == 0 ||
- priority > walsnd->sync_standby_priority) &&
- !XLogRecPtrIsInvalid(walsnd->flush))
- {
- priority = walsnd->sync_standby_priority;
- syncWalSnd = walsnd;
- }
- }
-
- /*
- * We should have found ourselves at least.
- */
- Assert(syncWalSnd);
+ /* We should have found ourselves at least */
+ Assert(syncWalSnd != NULL);
/*
* If we aren't managing the highest priority standby then just leave.