diff options
Diffstat (limited to 'src/backend/replication/syncrep.c')
-rw-r--r-- | src/backend/replication/syncrep.c | 549 |
1 file changed, 418 insertions, 131 deletions
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index 12c0c3bee3e..d454e7f3683 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -4,8 +4,8 @@ * * Synchronous replication is new as of PostgreSQL 9.1. * - * If requested, transaction commits wait until their commit LSN is - * acknowledged by the synchronous standby. + * If requested, transaction commits wait until their commit LSN is + * acknowledged by the synchronous standbys. * * This module contains the code for waiting and release of backends. * All code in this module executes on the primary. The core streaming @@ -13,7 +13,7 @@ * * The essence of this design is that it isolates all logic about * waiting/releasing onto the primary. The primary defines which standbys - * it wishes to wait for. The standby is completely unaware of the + * it wishes to wait for. The standbys are completely unaware of the * durability requirements of transactions on the primary, reducing the * complexity of the code and streamlining both standby operations and * network bandwidth because there is no requirement to ship @@ -21,19 +21,32 @@ * * Replication is either synchronous or not synchronous (async). If it is * async, we just fastpath out of here. If it is sync, then we wait for - * the write or flush location on the standby before releasing the waiting - * backend. Further complexity in that interaction is expected in later - * releases. + * the write, flush or apply location on the standby before releasing + * the waiting backend. Further complexity in that interaction is + * expected in later releases. * * The best performing way to manage the waiting backends is to have a * single ordered queue of waiting backends, so that we can avoid * searching through all waiters each time we receive a reply. * - * In 9.1 we support only a single synchronous standby, chosen from a - * priority list of synchronous_standby_names. 
Before it can become the - * synchronous standby it must have caught up with the primary; that may - * take some time. Once caught up, the current highest priority standby - * will release waiters from the queue. + * In 9.5 or before only a single standby could be considered as + * synchronous. In 9.6 we support multiple synchronous standbys. + * The number of synchronous standbys that transactions must wait for + * replies from is specified in synchronous_standby_names. + * This parameter also specifies a list of standby names, + * which determines the priority of each standby for being chosen as + * a synchronous standby. The standbys whose names appear earlier + * in the list are given higher priority and will be considered as + * synchronous. Other standby servers appearing later in this list + * represent potential synchronous standbys. If any of the current + * synchronous standbys disconnects for whatever reason, it will be + * replaced immediately with the next-highest-priority standby. + * + * Before the standbys chosen from synchronous_standby_names can + * become the synchronous standbys they must have caught up with + * the primary; that may take some time. Once caught up, + * the current higher priority standbys which are considered as + * synchronous at that moment will release waiters from the queue. 
* * Portions Copyright (c) 2010-2016, PostgreSQL Global Development Group * @@ -65,12 +78,17 @@ char *SyncRepStandbyNames; static bool announce_next_takeover = true; +SyncRepConfigData *SyncRepConfig; static int SyncRepWaitMode = SYNC_REP_NO_WAIT; static void SyncRepQueueInsert(int mode); static void SyncRepCancelWait(void); static int SyncRepWakeQueue(bool all, int mode); +static bool SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, + XLogRecPtr *flushPtr, + XLogRecPtr *applyPtr, + bool *am_sync); static int SyncRepGetStandbyPriority(void); #ifdef USE_ASSERT_CHECKING @@ -343,6 +361,11 @@ SyncRepInitConfig(void) { int priority; + /* Update the config data of synchronous replication */ + SyncRepFreeConfig(SyncRepConfig); + SyncRepConfig = NULL; + SyncRepUpdateConfig(); + /* * Determine if we are a potential sync standby and remember the result * for handling replies from standby. @@ -360,62 +383,8 @@ SyncRepInitConfig(void) } /* - * Find the WAL sender servicing the synchronous standby with the lowest - * priority value, or NULL if no synchronous standby is connected. If there - * are multiple standbys with the same lowest priority value, the first one - * found is selected. The caller must hold SyncRepLock. 
- */ -WalSnd * -SyncRepGetSynchronousStandby(void) -{ - WalSnd *result = NULL; - int result_priority = 0; - int i; - - for (i = 0; i < max_wal_senders; i++) - { - /* Use volatile pointer to prevent code rearrangement */ - volatile WalSnd *walsnd = &WalSndCtl->walsnds[i]; - int this_priority; - - /* Must be active */ - if (walsnd->pid == 0) - continue; - - /* Must be streaming */ - if (walsnd->state != WALSNDSTATE_STREAMING) - continue; - - /* Must be synchronous */ - this_priority = walsnd->sync_standby_priority; - if (this_priority == 0) - continue; - - /* Must have a lower priority value than any previous ones */ - if (result != NULL && result_priority <= this_priority) - continue; - - /* Must have a valid flush position */ - if (XLogRecPtrIsInvalid(walsnd->flush)) - continue; - - result = (WalSnd *) walsnd; - result_priority = this_priority; - - /* - * If priority is equal to 1, there cannot be any other WAL senders - * with a lower priority, so we're done. - */ - if (this_priority == 1) - return result; - } - - return result; -} - -/* * Update the LSNs on each queue based upon our latest state. This - * implements a simple policy of first-valid-standby-releases-waiter. + * implements a simple policy of first-valid-sync-standby-releases-waiter. * * Other policies are possible, which would change what we do here and * perhaps also which information we store as well. @@ -424,7 +393,11 @@ void SyncRepReleaseWaiters(void) { volatile WalSndCtlData *walsndctl = WalSndCtl; - WalSnd *syncWalSnd; + XLogRecPtr writePtr; + XLogRecPtr flushPtr; + XLogRecPtr applyPtr; + bool got_oldest; + bool am_sync; int numwrite = 0; int numflush = 0; int numapply = 0; @@ -438,25 +411,44 @@ SyncRepReleaseWaiters(void) if (MyWalSnd->sync_standby_priority == 0 || MyWalSnd->state < WALSNDSTATE_STREAMING || XLogRecPtrIsInvalid(MyWalSnd->flush)) + { + announce_next_takeover = true; return; + } /* - * We're a potential sync standby. Release waiters if we are the highest - * priority standby. 
+ * We're a potential sync standby. Release waiters if there are + * enough sync standbys and we are considered as sync. */ LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); - syncWalSnd = SyncRepGetSynchronousStandby(); - /* We should have found ourselves at least */ - Assert(syncWalSnd != NULL); + /* + * Check whether we are a sync standby or not, and calculate + * the oldest positions among all sync standbys. + */ + got_oldest = SyncRepGetOldestSyncRecPtr(&writePtr, &flushPtr, + &applyPtr, &am_sync); + + /* + * If we are managing a sync standby, though we weren't + * prior to this, then announce we are now a sync standby. + */ + if (announce_next_takeover && am_sync) + { + announce_next_takeover = false; + ereport(LOG, + (errmsg("standby \"%s\" is now a synchronous standby with priority %u", + application_name, MyWalSnd->sync_standby_priority))); + } /* - * If we aren't managing the highest priority standby then just leave. + * If the number of sync standbys is less than requested or we aren't + * managing a sync standby then just leave. */ - if (syncWalSnd != MyWalSnd) + if (!got_oldest || !am_sync) { LWLockRelease(SyncRepLock); - announce_next_takeover = true; + announce_next_takeover = !am_sync; return; } @@ -464,40 +456,267 @@ SyncRepReleaseWaiters(void) * Set the lsn first so that when we wake backends they will release up to * this location. 
*/ - if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < MyWalSnd->write) + if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < writePtr) { - walsndctl->lsn[SYNC_REP_WAIT_WRITE] = MyWalSnd->write; + walsndctl->lsn[SYNC_REP_WAIT_WRITE] = writePtr; numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE); } - if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < MyWalSnd->flush) + if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < flushPtr) { - walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush; + walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = flushPtr; numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH); } - if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < MyWalSnd->apply) + if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < applyPtr) { - walsndctl->lsn[SYNC_REP_WAIT_APPLY] = MyWalSnd->apply; + walsndctl->lsn[SYNC_REP_WAIT_APPLY] = applyPtr; numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY); } LWLockRelease(SyncRepLock); - elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X", - numwrite, (uint32) (MyWalSnd->write >> 32), (uint32) MyWalSnd->write, - numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush, - numapply, (uint32) (MyWalSnd->apply >> 32), (uint32) MyWalSnd->apply); + elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%x", + numwrite, (uint32) (writePtr >> 32), (uint32) writePtr, + numflush, (uint32) (flushPtr >> 32), (uint32) flushPtr, + numapply, (uint32) (applyPtr >> 32), (uint32) applyPtr); +} + +/* + * Calculate the oldest Write, Flush and Apply positions among sync standbys. + * + * Return false if the number of sync standbys is less than + * synchronous_standby_names specifies. Otherwise return true and + * store the oldest positions into *writePtr, *flushPtr and *applyPtr. + * + * On return, *am_sync is set to true if this walsender is connecting to + * sync standby. Otherwise it's set to false. 
+ */ +static bool +SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, + XLogRecPtr *applyPtr, bool *am_sync) +{ + List *sync_standbys; + ListCell *cell; + + *writePtr = InvalidXLogRecPtr; + *flushPtr = InvalidXLogRecPtr; + *applyPtr = InvalidXLogRecPtr; + *am_sync = false; + + /* Get standbys that are considered as synchronous at this moment */ + sync_standbys = SyncRepGetSyncStandbys(am_sync); /* - * If we are managing the highest priority standby, though we weren't - * prior to this, then announce we are now the sync standby. + * Quick exit if we are not managing a sync standby or there are not + * enough synchronous standbys. */ - if (announce_next_takeover) + if (!(*am_sync) || list_length(sync_standbys) < SyncRepConfig->num_sync) { - announce_next_takeover = false; - ereport(LOG, - (errmsg("standby \"%s\" is now the synchronous standby with priority %u", - application_name, MyWalSnd->sync_standby_priority))); + list_free(sync_standbys); + return false; + } + + /* + * Scan through all sync standbys and calculate the oldest + * Write, Flush and Apply positions. + */ + foreach (cell, sync_standbys) + { + WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)]; + XLogRecPtr write; + XLogRecPtr flush; + XLogRecPtr apply; + + SpinLockAcquire(&walsnd->mutex); + write = walsnd->write; + flush = walsnd->flush; + apply = walsnd->apply; + SpinLockRelease(&walsnd->mutex); + + if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write) + *writePtr = write; + if (XLogRecPtrIsInvalid(*flushPtr) || *flushPtr > flush) + *flushPtr = flush; + if (XLogRecPtrIsInvalid(*applyPtr) || *applyPtr > apply) + *applyPtr = apply; + } + + list_free(sync_standbys); + return true; +} + +/* + * Return the list of sync standbys, or NIL if no sync standby is connected. + * + * If there are multiple standbys with the same priority, + * the first one found is selected preferentially. + * The caller must hold SyncRepLock. 
+ * + * On return, *am_sync is set to true if this walsender is connecting to + * sync standby. Otherwise it's set to false. + */ +List * +SyncRepGetSyncStandbys(bool *am_sync) +{ + List *result = NIL; + List *pending = NIL; + int lowest_priority; + int next_highest_priority; + int this_priority; + int priority; + int i; + bool am_in_pending = false; + volatile WalSnd *walsnd; /* Use volatile pointer to prevent + * code rearrangement */ + + /* Quick exit if sync replication is not requested */ + if (SyncRepConfig == NULL) + return NIL; + + if (am_sync != NULL) + *am_sync = false; + + lowest_priority = list_length(SyncRepConfig->members); + next_highest_priority = lowest_priority + 1; + + /* + * Find the sync standbys which have the highest priority (i.e, 1). + * Also store all the other potential sync standbys into the pending list, + * in order to scan it later and find other sync standbys from it quickly. + */ + for (i = 0; i < max_wal_senders; i++) + { + walsnd = &WalSndCtl->walsnds[i]; + + /* Must be active */ + if (walsnd->pid == 0) + continue; + + /* Must be streaming */ + if (walsnd->state != WALSNDSTATE_STREAMING) + continue; + + /* Must be synchronous */ + this_priority = walsnd->sync_standby_priority; + if (this_priority == 0) + continue; + + /* Must have a valid flush position */ + if (XLogRecPtrIsInvalid(walsnd->flush)) + continue; + + /* + * If the priority is equal to 1, consider this standby as sync + * and append it to the result. Otherwise append this standby + * to the pending list to check if it's actually sync or not later. 
+ */ + if (this_priority == 1) + { + result = lappend_int(result, i); + if (am_sync != NULL && walsnd == MyWalSnd) + *am_sync = true; + if (list_length(result) == SyncRepConfig->num_sync) + { + list_free(pending); + return result; /* Exit if got enough sync standbys */ + } + } + else + { + pending = lappend_int(pending, i); + if (am_sync != NULL && walsnd == MyWalSnd) + am_in_pending = true; + + /* + * Track the highest priority among the standbys in the pending + * list, in order to use it as the starting priority for later scan + * of the list. This is useful to find quickly the sync standbys + * from the pending list later because we can skip unnecessary + * scans for the unused priorities. + */ + if (this_priority < next_highest_priority) + next_highest_priority = this_priority; + } + } + + /* + * Consider all pending standbys as sync if the number of them plus + * already-found sync ones is lower than the configuration requests. + */ + if (list_length(result) + list_length(pending) <= SyncRepConfig->num_sync) + { + bool needfree = (result != NIL && pending != NIL); + + /* + * Set *am_sync to true if this walsender is in the pending list + * because all pending standbys are considered as sync. + */ + if (am_sync != NULL && !(*am_sync)) + *am_sync = am_in_pending; + + result = list_concat(result, pending); + if (needfree) + pfree(pending); + return result; + } + + /* + * Find the sync standbys from the pending list. 
+ */ + priority = next_highest_priority; + while (priority <= lowest_priority) + { + ListCell *cell; + ListCell *prev = NULL; + ListCell *next; + + next_highest_priority = lowest_priority + 1; + + for (cell = list_head(pending); cell != NULL; cell = next) + { + i = lfirst_int(cell); + walsnd = &WalSndCtl->walsnds[i]; + + next = lnext(cell); + + this_priority = walsnd->sync_standby_priority; + if (this_priority == priority) + { + result = lappend_int(result, i); + if (am_sync != NULL && walsnd == MyWalSnd) + *am_sync = true; + + /* + * We should always exit here after the scan of pending list + * starts because we know that the list has enough elements + * to reach SyncRepConfig->num_sync. + */ + if (list_length(result) == SyncRepConfig->num_sync) + { + list_free(pending); + return result; /* Exit if got enough sync standbys */ + } + + /* + * Remove the entry for this sync standby from the list + * to prevent us from looking at the same entry again. + */ + pending = list_delete_cell(pending, cell, prev); + + continue; + } + + if (this_priority < next_highest_priority) + next_highest_priority = this_priority; + + prev = cell; + } + + priority = next_highest_priority; } + + /* never reached, but keep compiler quiet */ + Assert(false); + return result; } /* @@ -511,8 +730,7 @@ SyncRepReleaseWaiters(void) static int SyncRepGetStandbyPriority(void) { - char *rawstring; - List *elemlist; + List *members; ListCell *l; int priority = 0; bool found = false; @@ -524,20 +742,11 @@ SyncRepGetStandbyPriority(void) if (am_cascading_walsender) return 0; - /* Need a modifiable copy of string */ - rawstring = pstrdup(SyncRepStandbyNames); - - /* Parse string into list of identifiers */ - if (!SplitIdentifierString(rawstring, ',', &elemlist)) - { - /* syntax error in list */ - pfree(rawstring); - list_free(elemlist); - /* GUC machinery will have already complained - no need to do again */ + if (!SyncStandbysDefined()) return 0; - } - foreach(l, elemlist) + members = 
SyncRepConfig->members; + foreach(l, members) { char *standby_name = (char *) lfirst(l); @@ -551,9 +760,6 @@ SyncRepGetStandbyPriority(void) } } - pfree(rawstring); - list_free(elemlist); - return (found ? priority : 0); } @@ -661,6 +867,45 @@ SyncRepUpdateSyncStandbysDefined(void) } } +/* + * Parse synchronous_standby_names and update the config data + * of synchronous standbys. + */ +void +SyncRepUpdateConfig(void) +{ + int parse_rc; + + if (!SyncStandbysDefined()) + return; + + /* + * check_synchronous_standby_names() verifies the setting value of + * synchronous_standby_names before this function is called. So + * syncrep_yyparse() must not cause an error here. + */ + syncrep_scanner_init(SyncRepStandbyNames); + parse_rc = syncrep_yyparse(); + Assert(parse_rc == 0); + syncrep_scanner_finish(); + + SyncRepConfig = syncrep_parse_result; + syncrep_parse_result = NULL; +} + +/* + * Free a previously-allocated config data of synchronous replication. + */ +void +SyncRepFreeConfig(SyncRepConfigData *config) +{ + if (!config) + return; + + list_free_deep(config->members); + pfree(config); +} + #ifdef USE_ASSERT_CHECKING static bool SyncRepQueueIsOrderedByLSN(int mode) @@ -705,32 +950,74 @@ SyncRepQueueIsOrderedByLSN(int mode) bool check_synchronous_standby_names(char **newval, void **extra, GucSource source) { - char *rawstring; - List *elemlist; - - /* Need a modifiable copy of string */ - rawstring = pstrdup(*newval); + int parse_rc; - /* Parse string into list of identifiers */ - if (!SplitIdentifierString(rawstring, ',', &elemlist)) + if (*newval != NULL && (*newval)[0] != '\0') { - /* syntax error in list */ - GUC_check_errdetail("List syntax is invalid."); - pfree(rawstring); - list_free(elemlist); - return false; - } + syncrep_scanner_init(*newval); + parse_rc = syncrep_yyparse(); + syncrep_scanner_finish(); - /* - * Any additional validation of standby names should go here. 
- * - * Don't attempt to set WALSender priority because this is executed by - * postmaster at startup, not WALSender, so the application_name is not - * yet correctly set. - */ + if (parse_rc != 0) + { + GUC_check_errcode(ERRCODE_SYNTAX_ERROR); + GUC_check_errdetail("synchronous_standby_names parser returned %d", + parse_rc); + return false; + } + + /* + * Warn if num_sync exceeds the number of names of potential sync + * standbys. This setting doesn't make sense in most cases because + * it implies that a sufficient number of sync standbys will never + * appear, which makes transaction commits wait for sync replication + * indefinitely. + * + * If more than one standby has the same name and + * priority, we can see enough sync standbys to complete transaction + * commits. However it's not recommended to run multiple standbys + * with the same priority because we cannot gain full control of + * the selection of sync standbys from them. + * + * OTOH, that setting is OK if we understand the above problem + * regarding the selection of sync standbys and intentionally + * specify * to match all the standbys. + */ + if (syncrep_parse_result->num_sync > + list_length(syncrep_parse_result->members)) + { + ListCell *l; + bool has_asterisk = false; + + foreach(l, syncrep_parse_result->members) + { + char *standby_name = (char *) lfirst(l); + + if (pg_strcasecmp(standby_name, "*") == 0) + { + has_asterisk = true; + break; + } + } + + /* + * Only the postmaster warns about this inappropriate setting + * to avoid cluttering the log. 
+ */ + if (!has_asterisk && !IsUnderPostmaster) + ereport(WARNING, + (errmsg("The configured number of synchronous standbys (%d) exceeds the number of names of potential synchronous ones (%d)", + syncrep_parse_result->num_sync, list_length(syncrep_parse_result->members)), + errhint("Specify more names of potential synchronous standbys in synchronous_standby_names."))); + } - pfree(rawstring); - list_free(elemlist); + /* + * syncrep_yyparse sets the global syncrep_parse_result as side effect. + * But this function is required to just check, so frees it + * after parsing the parameter. + */ + SyncRepFreeConfig(syncrep_parse_result); + } return true; } |