aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/syncrep.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/syncrep.c')
-rw-r--r--src/backend/replication/syncrep.c549
1 files changed, 418 insertions, 131 deletions
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index 12c0c3bee3e..d454e7f3683 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -4,8 +4,8 @@
*
* Synchronous replication is new as of PostgreSQL 9.1.
*
- * If requested, transaction commits wait until their commit LSN is
- * acknowledged by the synchronous standby.
+ * If requested, transaction commits wait until their commit LSNs are
+ * acknowledged by the synchronous standbys.
*
* This module contains the code for waiting and release of backends.
* All code in this module executes on the primary. The core streaming
@@ -13,7 +13,7 @@
*
* The essence of this design is that it isolates all logic about
* waiting/releasing onto the primary. The primary defines which standbys
- * it wishes to wait for. The standby is completely unaware of the
+ * it wishes to wait for. The standbys are completely unaware of the
* durability requirements of transactions on the primary, reducing the
* complexity of the code and streamlining both standby operations and
* network bandwidth because there is no requirement to ship
@@ -21,19 +21,32 @@
*
* Replication is either synchronous or not synchronous (async). If it is
* async, we just fastpath out of here. If it is sync, then we wait for
- * the write or flush location on the standby before releasing the waiting
- * backend. Further complexity in that interaction is expected in later
- * releases.
+ * the write, flush or apply location on the standby before releasing
+ * the waiting backend. Further complexity in that interaction is
+ * expected in later releases.
*
* The best performing way to manage the waiting backends is to have a
* single ordered queue of waiting backends, so that we can avoid
* searching the through all waiters each time we receive a reply.
*
- * In 9.1 we support only a single synchronous standby, chosen from a
- * priority list of synchronous_standby_names. Before it can become the
- * synchronous standby it must have caught up with the primary; that may
- * take some time. Once caught up, the current highest priority standby
- * will release waiters from the queue.
+ * In 9.5 and earlier, only a single standby could be considered
+ * synchronous. In 9.6 we support multiple synchronous standbys.
+ * The number of synchronous standbys that transactions must wait for
+ * replies from is specified in synchronous_standby_names.
+ * This parameter also specifies a list of standby names,
+ * which determines the priority of each standby for being chosen as
+ * a synchronous standby. The standbys whose names appear earlier
+ * in the list are given higher priority and will be considered as
+ * synchronous. Other standby servers appearing later in this list
+ * represent potential synchronous standbys. If any of the current
+ * synchronous standbys disconnects for whatever reason, it will be
+ * replaced immediately with the next-highest-priority standby.
+ *
+ * Before the standbys chosen from synchronous_standby_names can
+ * become the synchronous standbys they must have caught up with
+ * the primary; that may take some time. Once caught up,
+ * the current higher priority standbys which are considered as
+ * synchronous at that moment will release waiters from the queue.
*
* Portions Copyright (c) 2010-2016, PostgreSQL Global Development Group
*
@@ -65,12 +78,17 @@ char *SyncRepStandbyNames;
static bool announce_next_takeover = true;
+SyncRepConfigData *SyncRepConfig;
static int SyncRepWaitMode = SYNC_REP_NO_WAIT;
static void SyncRepQueueInsert(int mode);
static void SyncRepCancelWait(void);
static int SyncRepWakeQueue(bool all, int mode);
+static bool SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
+ XLogRecPtr *flushPtr,
+ XLogRecPtr *applyPtr,
+ bool *am_sync);
static int SyncRepGetStandbyPriority(void);
#ifdef USE_ASSERT_CHECKING
@@ -343,6 +361,11 @@ SyncRepInitConfig(void)
{
int priority;
+ /* Update the config data of synchronous replication */
+ SyncRepFreeConfig(SyncRepConfig);
+ SyncRepConfig = NULL;
+ SyncRepUpdateConfig();
+
/*
* Determine if we are a potential sync standby and remember the result
* for handling replies from standby.
@@ -360,62 +383,8 @@ SyncRepInitConfig(void)
}
/*
- * Find the WAL sender servicing the synchronous standby with the lowest
- * priority value, or NULL if no synchronous standby is connected. If there
- * are multiple standbys with the same lowest priority value, the first one
- * found is selected. The caller must hold SyncRepLock.
- */
-WalSnd *
-SyncRepGetSynchronousStandby(void)
-{
- WalSnd *result = NULL;
- int result_priority = 0;
- int i;
-
- for (i = 0; i < max_wal_senders; i++)
- {
- /* Use volatile pointer to prevent code rearrangement */
- volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
- int this_priority;
-
- /* Must be active */
- if (walsnd->pid == 0)
- continue;
-
- /* Must be streaming */
- if (walsnd->state != WALSNDSTATE_STREAMING)
- continue;
-
- /* Must be synchronous */
- this_priority = walsnd->sync_standby_priority;
- if (this_priority == 0)
- continue;
-
- /* Must have a lower priority value than any previous ones */
- if (result != NULL && result_priority <= this_priority)
- continue;
-
- /* Must have a valid flush position */
- if (XLogRecPtrIsInvalid(walsnd->flush))
- continue;
-
- result = (WalSnd *) walsnd;
- result_priority = this_priority;
-
- /*
- * If priority is equal to 1, there cannot be any other WAL senders
- * with a lower priority, so we're done.
- */
- if (this_priority == 1)
- return result;
- }
-
- return result;
-}
-
-/*
* Update the LSNs on each queue based upon our latest state. This
- * implements a simple policy of first-valid-standby-releases-waiter.
+ * implements a simple policy of first-valid-sync-standby-releases-waiter.
*
* Other policies are possible, which would change what we do here and
* perhaps also which information we store as well.
@@ -424,7 +393,11 @@ void
SyncRepReleaseWaiters(void)
{
volatile WalSndCtlData *walsndctl = WalSndCtl;
- WalSnd *syncWalSnd;
+ XLogRecPtr writePtr;
+ XLogRecPtr flushPtr;
+ XLogRecPtr applyPtr;
+ bool got_oldest;
+ bool am_sync;
int numwrite = 0;
int numflush = 0;
int numapply = 0;
@@ -438,25 +411,44 @@ SyncRepReleaseWaiters(void)
if (MyWalSnd->sync_standby_priority == 0 ||
MyWalSnd->state < WALSNDSTATE_STREAMING ||
XLogRecPtrIsInvalid(MyWalSnd->flush))
+ {
+ announce_next_takeover = true;
return;
+ }
/*
- * We're a potential sync standby. Release waiters if we are the highest
- * priority standby.
+ * We're a potential sync standby. Release waiters if there are
+ * enough sync standbys and we are considered as sync.
*/
LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
- syncWalSnd = SyncRepGetSynchronousStandby();
- /* We should have found ourselves at least */
- Assert(syncWalSnd != NULL);
+ /*
+ * Check whether we are a sync standby or not, and calculate
+ * the oldest positions among all sync standbys.
+ */
+ got_oldest = SyncRepGetOldestSyncRecPtr(&writePtr, &flushPtr,
+ &applyPtr, &am_sync);
+
+ /*
+ * If we are managing a sync standby, though we weren't
+ * prior to this, then announce we are now a sync standby.
+ */
+ if (announce_next_takeover && am_sync)
+ {
+ announce_next_takeover = false;
+ ereport(LOG,
+ (errmsg("standby \"%s\" is now a synchronous standby with priority %u",
+ application_name, MyWalSnd->sync_standby_priority)));
+ }
/*
- * If we aren't managing the highest priority standby then just leave.
+ * If the number of sync standbys is less than requested or we aren't
+ * managing a sync standby then just leave.
*/
- if (syncWalSnd != MyWalSnd)
+ if (!got_oldest || !am_sync)
{
LWLockRelease(SyncRepLock);
- announce_next_takeover = true;
+ announce_next_takeover = !am_sync;
return;
}
@@ -464,40 +456,267 @@ SyncRepReleaseWaiters(void)
* Set the lsn first so that when we wake backends they will release up to
* this location.
*/
- if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < MyWalSnd->write)
+ if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < writePtr)
{
- walsndctl->lsn[SYNC_REP_WAIT_WRITE] = MyWalSnd->write;
+ walsndctl->lsn[SYNC_REP_WAIT_WRITE] = writePtr;
numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
}
- if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < MyWalSnd->flush)
+ if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < flushPtr)
{
- walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush;
+ walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = flushPtr;
numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
}
- if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < MyWalSnd->apply)
+ if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < applyPtr)
{
- walsndctl->lsn[SYNC_REP_WAIT_APPLY] = MyWalSnd->apply;
+ walsndctl->lsn[SYNC_REP_WAIT_APPLY] = applyPtr;
numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY);
}
LWLockRelease(SyncRepLock);
- elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X",
- numwrite, (uint32) (MyWalSnd->write >> 32), (uint32) MyWalSnd->write,
- numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush,
- numapply, (uint32) (MyWalSnd->apply >> 32), (uint32) MyWalSnd->apply);
+ elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X",
+ numwrite, (uint32) (writePtr >> 32), (uint32) writePtr,
+ numflush, (uint32) (flushPtr >> 32), (uint32) flushPtr,
+ numapply, (uint32) (applyPtr >> 32), (uint32) applyPtr);
+}
+
+/*
+ * Calculate the oldest Write, Flush and Apply positions among sync standbys.
+ *
+ * Return false if this walsender is not serving a sync standby, or if the
+ * number of sync standbys is less than synchronous_standby_names
+ * specifies; in that case *writePtr, *flushPtr and *applyPtr are left
+ * set to InvalidXLogRecPtr. Otherwise return true and store the oldest
+ * positions into *writePtr, *flushPtr and *applyPtr.
+ *
+ * On return, *am_sync is set to true if this walsender is connected to
+ * a sync standby. Otherwise it's set to false.
+ */
+static bool
+SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
+ XLogRecPtr *applyPtr, bool *am_sync)
+{
+ List *sync_standbys;
+ ListCell *cell;
+
+ *writePtr = InvalidXLogRecPtr;
+ *flushPtr = InvalidXLogRecPtr;
+ *applyPtr = InvalidXLogRecPtr;
+ *am_sync = false;
+
+ /* Get standbys that are considered as synchronous at this moment */
+ sync_standbys = SyncRepGetSyncStandbys(am_sync);
/*
- * If we are managing the highest priority standby, though we weren't
- * prior to this, then announce we are now the sync standby.
+ * Quick exit if we are not managing a sync standby or there are not
+ * enough synchronous standbys.
*/
- if (announce_next_takeover)
+ if (!(*am_sync) || list_length(sync_standbys) < SyncRepConfig->num_sync)
{
- announce_next_takeover = false;
- ereport(LOG,
- (errmsg("standby \"%s\" is now the synchronous standby with priority %u",
- application_name, MyWalSnd->sync_standby_priority)));
+ list_free(sync_standbys);
+ return false;
+ }
+
+ /*
+ * Scan through all sync standbys and calculate the oldest
+ * Write, Flush and Apply positions.
+ *
+ * Each walsender's three positions are copied while holding that
+ * walsender's mutex. For each position we keep the smallest value
+ * seen so far; an output that is still invalid simply takes the
+ * first value read.
+ */
+ foreach (cell, sync_standbys)
+ {
+ WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
+ XLogRecPtr write;
+ XLogRecPtr flush;
+ XLogRecPtr apply;
+
+ SpinLockAcquire(&walsnd->mutex);
+ write = walsnd->write;
+ flush = walsnd->flush;
+ apply = walsnd->apply;
+ SpinLockRelease(&walsnd->mutex);
+
+ if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write)
+ *writePtr = write;
+ if (XLogRecPtrIsInvalid(*flushPtr) || *flushPtr > flush)
+ *flushPtr = flush;
+ if (XLogRecPtrIsInvalid(*applyPtr) || *applyPtr > apply)
+ *applyPtr = apply;
+ }
+
+ list_free(sync_standbys);
+ return true;
+}
+
+/*
+ * Return the list of sync standbys, or NIL if no sync standby is connected.
+ *
+ * Each element of the returned integer list is the index of a walsender
+ * slot in WalSndCtl->walsnds.
+ *
+ * If there are multiple standbys with the same priority,
+ * the first one found is selected preferentially.
+ * The caller must hold SyncRepLock.
+ *
+ * If am_sync is not NULL, *am_sync is always assigned before returning:
+ * true if this walsender is connected to a sync standby, false otherwise
+ * (including when synchronous replication is not configured at all).
+ */
+List *
+SyncRepGetSyncStandbys(bool *am_sync)
+{
+ List *result = NIL;
+ List *pending = NIL;
+ int lowest_priority;
+ int next_highest_priority;
+ int this_priority;
+ int priority;
+ int i;
+ bool am_in_pending = false;
+ volatile WalSnd *walsnd; /* Use volatile pointer to prevent
+ * code rearrangement */
+
+ /*
+ * Set the default result first, so that *am_sync is well defined on
+ * every return path, including the quick exit just below.
+ */
+ if (am_sync != NULL)
+ *am_sync = false;
+
+ /* Quick exit if sync replication is not requested */
+ if (SyncRepConfig == NULL)
+ return NIL;
+
+ lowest_priority = list_length(SyncRepConfig->members);
+ next_highest_priority = lowest_priority + 1;
+
+ /*
+ * Find the sync standbys which have the highest priority (i.e., 1).
+ * Also store all the other potential sync standbys into the pending list,
+ * in order to scan it later and find other sync standbys from it quickly.
+ */
+ for (i = 0; i < max_wal_senders; i++)
+ {
+ walsnd = &WalSndCtl->walsnds[i];
+
+ /* Must be active */
+ if (walsnd->pid == 0)
+ continue;
+
+ /* Must be streaming */
+ if (walsnd->state != WALSNDSTATE_STREAMING)
+ continue;
+
+ /* Must be synchronous */
+ this_priority = walsnd->sync_standby_priority;
+ if (this_priority == 0)
+ continue;
+
+ /* Must have a valid flush position */
+ if (XLogRecPtrIsInvalid(walsnd->flush))
+ continue;
+
+ /*
+ * If the priority is equal to 1, consider this standby as sync
+ * and append it to the result. Otherwise append this standby
+ * to the pending list to check if it's actually sync or not later.
+ */
+ if (this_priority == 1)
+ {
+ result = lappend_int(result, i);
+ if (am_sync != NULL && walsnd == MyWalSnd)
+ *am_sync = true;
+ if (list_length(result) == SyncRepConfig->num_sync)
+ {
+ list_free(pending);
+ return result; /* Exit if got enough sync standbys */
+ }
+ }
+ else
+ {
+ pending = lappend_int(pending, i);
+ if (am_sync != NULL && walsnd == MyWalSnd)
+ am_in_pending = true;
+
+ /*
+ * Track the highest priority among the standbys in the pending
+ * list, in order to use it as the starting priority for later scan
+ * of the list. This is useful to find quickly the sync standbys
+ * from the pending list later because we can skip unnecessary
+ * scans for the unused priorities.
+ */
+ if (this_priority < next_highest_priority)
+ next_highest_priority = this_priority;
+ }
+ }
+
+ /*
+ * Consider all pending standbys as sync if the number of them plus
+ * already-found sync ones does not exceed what the configuration
+ * requests.
+ */
+ if (list_length(result) + list_length(pending) <= SyncRepConfig->num_sync)
+ {
+ bool needfree = (result != NIL && pending != NIL);
+
+ /*
+ * Set *am_sync to true if this walsender is in the pending list
+ * because all pending standbys are considered as sync.
+ */
+ if (am_sync != NULL && !(*am_sync))
+ *am_sync = am_in_pending;
+
+ result = list_concat(result, pending);
+ if (needfree)
+ pfree(pending);
+ return result;
+ }
+
+ /*
+ * Find the sync standbys from the pending list, visiting priorities in
+ * increasing numeric order and skipping priority values that no pending
+ * standby uses.
+ */
+ priority = next_highest_priority;
+ while (priority <= lowest_priority)
+ {
+ ListCell *cell;
+ ListCell *prev = NULL;
+ ListCell *next;
+
+ next_highest_priority = lowest_priority + 1;
+
+ for (cell = list_head(pending); cell != NULL; cell = next)
+ {
+ i = lfirst_int(cell);
+ walsnd = &WalSndCtl->walsnds[i];
+
+ next = lnext(cell);
+
+ this_priority = walsnd->sync_standby_priority;
+ if (this_priority == priority)
+ {
+ result = lappend_int(result, i);
+ if (am_sync != NULL && walsnd == MyWalSnd)
+ *am_sync = true;
+
+ /*
+ * We should always exit here after the scan of pending list
+ * starts because we know that the list has enough elements
+ * to reach SyncRepConfig->num_sync.
+ */
+ if (list_length(result) == SyncRepConfig->num_sync)
+ {
+ list_free(pending);
+ return result; /* Exit if got enough sync standbys */
+ }
+
+ /*
+ * Remove the entry for this sync standby from the list
+ * to prevent us from looking at the same entry again.
+ */
+ pending = list_delete_cell(pending, cell, prev);
+
+ continue;
+ }
+
+ if (this_priority < next_highest_priority)
+ next_highest_priority = this_priority;
+
+ prev = cell;
+ }
+
+ priority = next_highest_priority;
}
+
+ /* never reached, but keep compiler quiet */
+ Assert(false);
+ return result;
}
/*
@@ -511,8 +730,7 @@ SyncRepReleaseWaiters(void)
static int
SyncRepGetStandbyPriority(void)
{
- char *rawstring;
- List *elemlist;
+ List *members;
ListCell *l;
int priority = 0;
bool found = false;
@@ -524,20 +742,11 @@ SyncRepGetStandbyPriority(void)
if (am_cascading_walsender)
return 0;
- /* Need a modifiable copy of string */
- rawstring = pstrdup(SyncRepStandbyNames);
-
- /* Parse string into list of identifiers */
- if (!SplitIdentifierString(rawstring, ',', &elemlist))
- {
- /* syntax error in list */
- pfree(rawstring);
- list_free(elemlist);
- /* GUC machinery will have already complained - no need to do again */
+ if (!SyncStandbysDefined())
return 0;
- }
- foreach(l, elemlist)
+ members = SyncRepConfig->members;
+ foreach(l, members)
{
char *standby_name = (char *) lfirst(l);
@@ -551,9 +760,6 @@ SyncRepGetStandbyPriority(void)
}
}
- pfree(rawstring);
- list_free(elemlist);
-
return (found ? priority : 0);
}
@@ -661,6 +867,45 @@ SyncRepUpdateSyncStandbysDefined(void)
}
}
+/*
+ * Parse synchronous_standby_names and update the config data
+ * of synchronous standbys.
+ *
+ * Does nothing, leaving SyncRepConfig untouched, when
+ * SyncStandbysDefined() reports false. Otherwise the parser's output
+ * (syncrep_parse_result) is moved into SyncRepConfig and
+ * syncrep_parse_result is reset to NULL.
+ */
+void
+SyncRepUpdateConfig(void)
+{
+ int parse_rc;
+
+ if (!SyncStandbysDefined())
+ return;
+
+ /*
+ * check_synchronous_standby_names() verifies the setting value of
+ * synchronous_standby_names before this function is called. So
+ * syncrep_yyparse() must not cause an error here.
+ */
+ syncrep_scanner_init(SyncRepStandbyNames);
+ parse_rc = syncrep_yyparse();
+ Assert(parse_rc == 0);
+ syncrep_scanner_finish();
+
+ SyncRepConfig = syncrep_parse_result;
+ syncrep_parse_result = NULL;
+}
+
+/*
+ * Free a previously-allocated config data of synchronous replication.
+ *
+ * A NULL config is accepted and ignored. Otherwise the members list and
+ * its elements are released with list_free_deep() before the struct
+ * itself is pfree'd.
+ */
+void
+SyncRepFreeConfig(SyncRepConfigData *config)
+{
+ if (!config)
+ return;
+
+ list_free_deep(config->members);
+ pfree(config);
+}
+
#ifdef USE_ASSERT_CHECKING
static bool
SyncRepQueueIsOrderedByLSN(int mode)
@@ -705,32 +950,74 @@ SyncRepQueueIsOrderedByLSN(int mode)
bool
check_synchronous_standby_names(char **newval, void **extra, GucSource source)
{
- char *rawstring;
- List *elemlist;
-
- /* Need a modifiable copy of string */
- rawstring = pstrdup(*newval);
+ int parse_rc;
- /* Parse string into list of identifiers */
- if (!SplitIdentifierString(rawstring, ',', &elemlist))
+ if (*newval != NULL && (*newval)[0] != '\0')
{
- /* syntax error in list */
- GUC_check_errdetail("List syntax is invalid.");
- pfree(rawstring);
- list_free(elemlist);
- return false;
- }
+ syncrep_scanner_init(*newval);
+ parse_rc = syncrep_yyparse();
+ syncrep_scanner_finish();
- /*
- * Any additional validation of standby names should go here.
- *
- * Don't attempt to set WALSender priority because this is executed by
- * postmaster at startup, not WALSender, so the application_name is not
- * yet correctly set.
- */
+ if (parse_rc != 0)
+ {
+ GUC_check_errcode(ERRCODE_SYNTAX_ERROR);
+ GUC_check_errdetail("synchronous_standby_names parser returned %d",
+ parse_rc);
+ return false;
+ }
+
+ /*
+ * Warn if num_sync exceeds the number of names of potential sync
+ * standbys. This setting doesn't make sense in most cases because
+ * it implies that enough sync standbys will never appear, which
+ * makes transaction commits wait for sync replication
+ * indefinitely.
+ *
+ * If more than one standby has the same name and priority, we can
+ * still see enough sync standbys to complete transaction
+ * commits. However, it's not recommended to run multiple standbys
+ * with the same priority because we cannot gain full control of
+ * the selection of sync standbys from them.
+ *
+ * OTOH, that setting is OK if we understand the above problem
+ * regarding the selection of sync standbys and intentionally
+ * specify * to match all the standbys.
+ */
+ if (syncrep_parse_result->num_sync >
+ list_length(syncrep_parse_result->members))
+ {
+ ListCell *l;
+ bool has_asterisk = false;
+
+ foreach(l, syncrep_parse_result->members)
+ {
+ char *standby_name = (char *) lfirst(l);
+
+ if (pg_strcasecmp(standby_name, "*") == 0)
+ {
+ has_asterisk = true;
+ break;
+ }
+ }
+
+ /*
+ * Only the postmaster warns about this inappropriate setting,
+ * to avoid cluttering the log.
+ */
+ if (!has_asterisk && !IsUnderPostmaster)
+ ereport(WARNING,
+ (errmsg("The configured number of synchronous standbys (%d) exceeds the number of names of potential synchronous ones (%d)",
+ syncrep_parse_result->num_sync, list_length(syncrep_parse_result->members)),
+ errhint("Specify more names of potential synchronous standbys in synchronous_standby_names.")));
+ }
- pfree(rawstring);
- list_free(elemlist);
+ /*
+ * syncrep_yyparse sets the global syncrep_parse_result as a side
+ * effect. This function is only required to check the value, so
+ * free the result after parsing the parameter.
+ */
+ SyncRepFreeConfig(syncrep_parse_result);
+ }
return true;
}