diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/Makefile | 4 | ||||
-rw-r--r-- | src/backend/replication/.gitignore | 2 | ||||
-rw-r--r-- | src/backend/replication/Makefile | 11 | ||||
-rw-r--r-- | src/backend/replication/syncrep.c | 549 | ||||
-rw-r--r-- | src/backend/replication/syncrep_gram.y | 86 | ||||
-rw-r--r-- | src/backend/replication/syncrep_scanner.l | 144 | ||||
-rw-r--r-- | src/backend/replication/walsender.c | 19 | ||||
-rw-r--r-- | src/backend/utils/misc/guc.c | 2 | ||||
-rw-r--r-- | src/backend/utils/misc/postgresql.conf.sample | 2 | ||||
-rw-r--r-- | src/include/replication/syncrep.h | 31 | ||||
-rw-r--r-- | src/tools/msvc/Mkvcbuild.pm | 2 |
11 files changed, 706 insertions, 146 deletions
diff --git a/src/backend/Makefile b/src/backend/Makefile index d22dbbf53b6..ec2dc7be404 100644 --- a/src/backend/Makefile +++ b/src/backend/Makefile @@ -203,7 +203,7 @@ distprep: $(MAKE) -C parser gram.c gram.h scan.c $(MAKE) -C bootstrap bootparse.c bootscanner.c $(MAKE) -C catalog schemapg.h postgres.bki postgres.description postgres.shdescription - $(MAKE) -C replication repl_gram.c repl_scanner.c + $(MAKE) -C replication repl_gram.c repl_scanner.c syncrep_gram.c syncrep_scanner.c $(MAKE) -C storage/lmgr lwlocknames.h $(MAKE) -C utils fmgrtab.c fmgroids.h errcodes.h $(MAKE) -C utils/misc guc-file.c @@ -320,6 +320,8 @@ maintainer-clean: distclean catalog/postgres.shdescription \ replication/repl_gram.c \ replication/repl_scanner.c \ + replication/syncrep_gram.c \ + replication/syncrep_scanner.c \ storage/lmgr/lwlocknames.c \ storage/lmgr/lwlocknames.h \ utils/fmgroids.h \ diff --git a/src/backend/replication/.gitignore b/src/backend/replication/.gitignore index 2a0491d1496..d1df6147bd0 100644 --- a/src/backend/replication/.gitignore +++ b/src/backend/replication/.gitignore @@ -1,2 +1,4 @@ /repl_gram.c /repl_scanner.c +/syncrep_gram.c +/syncrep_scanner.c diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile index b73370eaa98..c99717e0aee 100644 --- a/src/backend/replication/Makefile +++ b/src/backend/replication/Makefile @@ -15,7 +15,7 @@ include $(top_builddir)/src/Makefile.global override CPPFLAGS := -I. -I$(srcdir) $(CPPFLAGS) OBJS = walsender.o walreceiverfuncs.o walreceiver.o basebackup.o \ - repl_gram.o slot.o slotfuncs.o syncrep.o + repl_gram.o slot.o slotfuncs.o syncrep.o syncrep_gram.o SUBDIRS = logical @@ -24,5 +24,10 @@ include $(top_srcdir)/src/backend/common.mk # repl_scanner is compiled as part of repl_gram repl_gram.o: repl_scanner.c -# repl_gram.c and repl_scanner.c are in the distribution tarball, so -# they are not cleaned here. +# syncrep_scanner is complied as part of syncrep_gram +syncrep_gram.o: syncrep_scanner.c +syncrep_scanner.c: FLEXFLAGS = -CF -p +syncrep_scanner.c: FLEX_NO_BACKUP=yes + +# repl_gram.c, repl_scanner.c, syncrep_gram.c and syncrep_scanner.c +# are in the distribution tarball, so they are not cleaned here. diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index 12c0c3bee3e..d454e7f3683 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -4,8 +4,8 @@ * * Synchronous replication is new as of PostgreSQL 9.1. * - * If requested, transaction commits wait until their commit LSN is - * acknowledged by the synchronous standby. + * If requested, transaction commits wait until their commit LSN are + * acknowledged by the synchronous standbys. * * This module contains the code for waiting and release of backends. * All code in this module executes on the primary. The core streaming @@ -13,7 +13,7 @@ * * The essence of this design is that it isolates all logic about * waiting/releasing onto the primary. The primary defines which standbys - * it wishes to wait for. The standby is completely unaware of the + * it wishes to wait for. The standbys are completely unaware of the * durability requirements of transactions on the primary, reducing the * complexity of the code and streamlining both standby operations and * network bandwidth because there is no requirement to ship @@ -21,19 +21,32 @@ * * Replication is either synchronous or not synchronous (async). If it is * async, we just fastpath out of here. If it is sync, then we wait for - * the write or flush location on the standby before releasing the waiting - * backend. Further complexity in that interaction is expected in later - * releases. + * the write, flush or apply location on the standby before releasing + * the waiting backend. Further complexity in that interaction is + * expected in later releases. * * The best performing way to manage the waiting backends is to have a * single ordered queue of waiting backends, so that we can avoid * searching the through all waiters each time we receive a reply. * - * In 9.1 we support only a single synchronous standby, chosen from a - * priority list of synchronous_standby_names. Before it can become the - * synchronous standby it must have caught up with the primary; that may - * take some time. Once caught up, the current highest priority standby - * will release waiters from the queue. + * In 9.5 or before only a single standby could be considered as + * synchronous. In 9.6 we support multiple synchronous standbys. + * The number of synchronous standbys that transactions must wait for + * replies from is specified in synchronous_standby_names. + * This parameter also specifies a list of standby names, + * which determines the priority of each standby for being chosen as + * a synchronous standby. The standbys whose names appear earlier + * in the list are given higher priority and will be considered as + * synchronous. Other standby servers appearing later in this list + * represent potential synchronous standbys. If any of the current + * synchronous standbys disconnects for whatever reason, it will be + * replaced immediately with the next-highest-priority standby. + * + * Before the standbys chosen from synchronous_standby_names can + * become the synchronous standbys they must have caught up with + * the primary; that may take some time. Once caught up, + * the current higher priority standbys which are considered as + * synchronous at that moment will release waiters from the queue. * * Portions Copyright (c) 2010-2016, PostgreSQL Global Development Group * @@ -65,12 +78,17 @@ char *SyncRepStandbyNames; static bool announce_next_takeover = true; +SyncRepConfigData *SyncRepConfig; static int SyncRepWaitMode = SYNC_REP_NO_WAIT; static void SyncRepQueueInsert(int mode); static void SyncRepCancelWait(void); static int SyncRepWakeQueue(bool all, int mode); +static bool SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, + XLogRecPtr *flushPtr, + XLogRecPtr *applyPtr, + bool *am_sync); static int SyncRepGetStandbyPriority(void); #ifdef USE_ASSERT_CHECKING @@ -343,6 +361,11 @@ SyncRepInitConfig(void) { int priority; + /* Update the config data of synchronous replication */ + SyncRepFreeConfig(SyncRepConfig); + SyncRepConfig = NULL; + SyncRepUpdateConfig(); + /* * Determine if we are a potential sync standby and remember the result * for handling replies from standby. @@ -360,62 +383,8 @@ SyncRepInitConfig(void) } /* - * Find the WAL sender servicing the synchronous standby with the lowest - * priority value, or NULL if no synchronous standby is connected. If there - * are multiple standbys with the same lowest priority value, the first one - * found is selected. The caller must hold SyncRepLock. - */ -WalSnd * -SyncRepGetSynchronousStandby(void) -{ - WalSnd *result = NULL; - int result_priority = 0; - int i; - - for (i = 0; i < max_wal_senders; i++) - { - /* Use volatile pointer to prevent code rearrangement */ - volatile WalSnd *walsnd = &WalSndCtl->walsnds[i]; - int this_priority; - - /* Must be active */ - if (walsnd->pid == 0) - continue; - - /* Must be streaming */ - if (walsnd->state != WALSNDSTATE_STREAMING) - continue; - - /* Must be synchronous */ - this_priority = walsnd->sync_standby_priority; - if (this_priority == 0) - continue; - - /* Must have a lower priority value than any previous ones */ - if (result != NULL && result_priority <= this_priority) - continue; - - /* Must have a valid flush position */ - if (XLogRecPtrIsInvalid(walsnd->flush)) - continue; - - result = (WalSnd *) walsnd; - result_priority = this_priority; - - /* - * If priority is equal to 1, there cannot be any other WAL senders - * with a lower priority, so we're done. - */ - if (this_priority == 1) - return result; - } - - return result; -} - -/* * Update the LSNs on each queue based upon our latest state. This - * implements a simple policy of first-valid-standby-releases-waiter. + * implements a simple policy of first-valid-sync-standby-releases-waiter. * * Other policies are possible, which would change what we do here and * perhaps also which information we store as well. @@ -424,7 +393,11 @@ void SyncRepReleaseWaiters(void) { volatile WalSndCtlData *walsndctl = WalSndCtl; - WalSnd *syncWalSnd; + XLogRecPtr writePtr; + XLogRecPtr flushPtr; + XLogRecPtr applyPtr; + bool got_oldest; + bool am_sync; int numwrite = 0; int numflush = 0; int numapply = 0; @@ -438,25 +411,44 @@ SyncRepReleaseWaiters(void) if (MyWalSnd->sync_standby_priority == 0 || MyWalSnd->state < WALSNDSTATE_STREAMING || XLogRecPtrIsInvalid(MyWalSnd->flush)) + { + announce_next_takeover = true; return; + } /* - * We're a potential sync standby. Release waiters if we are the highest - * priority standby. + * We're a potential sync standby. Release waiters if there are + * enough sync standbys and we are considered as sync. */ LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); - syncWalSnd = SyncRepGetSynchronousStandby(); - /* We should have found ourselves at least */ - Assert(syncWalSnd != NULL); + /* + * Check whether we are a sync standby or not, and calculate + * the oldest positions among all sync standbys. + */ + got_oldest = SyncRepGetOldestSyncRecPtr(&writePtr, &flushPtr, + &applyPtr, &am_sync); + + /* + * If we are managing a sync standby, though we weren't + * prior to this, then announce we are now a sync standby. + */ + if (announce_next_takeover && am_sync) + { + announce_next_takeover = false; + ereport(LOG, + (errmsg("standby \"%s\" is now a synchronous standby with priority %u", + application_name, MyWalSnd->sync_standby_priority))); + } /* - * If we aren't managing the highest priority standby then just leave. + * If the number of sync standbys is less than requested or we aren't + * managing a sync standby then just leave. */ - if (syncWalSnd != MyWalSnd) + if (!got_oldest || !am_sync) { LWLockRelease(SyncRepLock); - announce_next_takeover = true; + announce_next_takeover = !am_sync; return; } @@ -464,40 +456,267 @@ SyncRepReleaseWaiters(void) * Set the lsn first so that when we wake backends they will release up to * this location. */ - if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < MyWalSnd->write) + if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < writePtr) { - walsndctl->lsn[SYNC_REP_WAIT_WRITE] = MyWalSnd->write; + walsndctl->lsn[SYNC_REP_WAIT_WRITE] = writePtr; numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE); } - if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < MyWalSnd->flush) + if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < flushPtr) { - walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush; + walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = flushPtr; numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH); } - if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < MyWalSnd->apply) + if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < applyPtr) { - walsndctl->lsn[SYNC_REP_WAIT_APPLY] = MyWalSnd->apply; + walsndctl->lsn[SYNC_REP_WAIT_APPLY] = applyPtr; numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY); } LWLockRelease(SyncRepLock); - elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X", - numwrite, (uint32) (MyWalSnd->write >> 32), (uint32) MyWalSnd->write, - numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush, - numapply, (uint32) (MyWalSnd->apply >> 32), (uint32) MyWalSnd->apply); + elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%x", + numwrite, (uint32) (writePtr >> 32), (uint32) writePtr, + numflush, (uint32) (flushPtr >> 32), (uint32) flushPtr, + numapply, (uint32) (applyPtr >> 32), (uint32) applyPtr); +} + +/* + * Calculate the oldest Write, Flush and Apply positions among sync standbys. + * + * Return false if the number of sync standbys is less than + * synchronous_standby_names specifies. Otherwise return true and + * store the oldest positions into *writePtr, *flushPtr and *applyPtr. + * + * On return, *am_sync is set to true if this walsender is connecting to + * sync standby. Otherwise it's set to false. + */ +static bool +SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, + XLogRecPtr *applyPtr, bool *am_sync) +{ + List *sync_standbys; + ListCell *cell; + + *writePtr = InvalidXLogRecPtr; + *flushPtr = InvalidXLogRecPtr; + *applyPtr = InvalidXLogRecPtr; + *am_sync = false; + + /* Get standbys that are considered as synchronous at this moment */ + sync_standbys = SyncRepGetSyncStandbys(am_sync); /* - * If we are managing the highest priority standby, though we weren't - * prior to this, then announce we are now the sync standby. + * Quick exit if we are not managing a sync standby or there are not + * enough synchronous standbys. */ - if (announce_next_takeover) + if (!(*am_sync) || list_length(sync_standbys) < SyncRepConfig->num_sync) { - announce_next_takeover = false; - ereport(LOG, - (errmsg("standby \"%s\" is now the synchronous standby with priority %u", - application_name, MyWalSnd->sync_standby_priority))); + list_free(sync_standbys); + return false; + } + + /* + * Scan through all sync standbys and calculate the oldest + * Write, Flush and Apply positions. + */ + foreach (cell, sync_standbys) + { + WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)]; + XLogRecPtr write; + XLogRecPtr flush; + XLogRecPtr apply; + + SpinLockAcquire(&walsnd->mutex); + write = walsnd->write; + flush = walsnd->flush; + apply = walsnd->apply; + SpinLockRelease(&walsnd->mutex); + + if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write) + *writePtr = write; + if (XLogRecPtrIsInvalid(*flushPtr) || *flushPtr > flush) + *flushPtr = flush; + if (XLogRecPtrIsInvalid(*applyPtr) || *applyPtr > apply) + *applyPtr = apply; + } + + list_free(sync_standbys); + return true; +} + +/* + * Return the list of sync standbys, or NIL if no sync standby is connected. + * + * If there are multiple standbys with the same priority, + * the first one found is selected preferentially. + * The caller must hold SyncRepLock. + * + * On return, *am_sync is set to true if this walsender is connecting to + * sync standby. Otherwise it's set to false. + */ +List * +SyncRepGetSyncStandbys(bool *am_sync) +{ + List *result = NIL; + List *pending = NIL; + int lowest_priority; + int next_highest_priority; + int this_priority; + int priority; + int i; + bool am_in_pending = false; + volatile WalSnd *walsnd; /* Use volatile pointer to prevent + * code rearrangement */ + + /* Quick exit if sync replication is not requested */ + if (SyncRepConfig == NULL) + return NIL; + + if (am_sync != NULL) + *am_sync = false; + + lowest_priority = list_length(SyncRepConfig->members); + next_highest_priority = lowest_priority + 1; + + /* + * Find the sync standbys which have the highest priority (i.e, 1). + * Also store all the other potential sync standbys into the pending list, + * in order to scan it later and find other sync standbys from it quickly. + */ + for (i = 0; i < max_wal_senders; i++) + { + walsnd = &WalSndCtl->walsnds[i]; + + /* Must be active */ + if (walsnd->pid == 0) + continue; + + /* Must be streaming */ + if (walsnd->state != WALSNDSTATE_STREAMING) + continue; + + /* Must be synchronous */ + this_priority = walsnd->sync_standby_priority; + if (this_priority == 0) + continue; + + /* Must have a valid flush position */ + if (XLogRecPtrIsInvalid(walsnd->flush)) + continue; + + /* + * If the priority is equal to 1, consider this standby as sync + * and append it to the result. Otherwise append this standby + * to the pending list to check if it's actually sync or not later. + */ + if (this_priority == 1) + { + result = lappend_int(result, i); + if (am_sync != NULL && walsnd == MyWalSnd) + *am_sync = true; + if (list_length(result) == SyncRepConfig->num_sync) + { + list_free(pending); + return result; /* Exit if got enough sync standbys */ + } + } + else + { + pending = lappend_int(pending, i); + if (am_sync != NULL && walsnd == MyWalSnd) + am_in_pending = true; + + /* + * Track the highest priority among the standbys in the pending + * list, in order to use it as the starting priority for later scan + * of the list. This is useful to find quickly the sync standbys + * from the pending list later because we can skip unnecessary + * scans for the unused priorities. + */ + if (this_priority < next_highest_priority) + next_highest_priority = this_priority; + } + } + + /* + * Consider all pending standbys as sync if the number of them plus + * already-found sync ones is lower than the configuration requests. + */ + if (list_length(result) + list_length(pending) <= SyncRepConfig->num_sync) + { + bool needfree = (result != NIL && pending != NIL); + + /* + * Set *am_sync to true if this walsender is in the pending list + * because all pending standbys are considered as sync. + */ + if (am_sync != NULL && !(*am_sync)) + *am_sync = am_in_pending; + + result = list_concat(result, pending); + if (needfree) + pfree(pending); + return result; + } + + /* + * Find the sync standbys from the pending list. + */ + priority = next_highest_priority; + while (priority <= lowest_priority) + { + ListCell *cell; + ListCell *prev = NULL; + ListCell *next; + + next_highest_priority = lowest_priority + 1; + + for (cell = list_head(pending); cell != NULL; cell = next) + { + i = lfirst_int(cell); + walsnd = &WalSndCtl->walsnds[i]; + + next = lnext(cell); + + this_priority = walsnd->sync_standby_priority; + if (this_priority == priority) + { + result = lappend_int(result, i); + if (am_sync != NULL && walsnd == MyWalSnd) + *am_sync = true; + + /* + * We should always exit here after the scan of pending list + * starts because we know that the list has enough elements + * to reach SyncRepConfig->num_sync. + */ + if (list_length(result) == SyncRepConfig->num_sync) + { + list_free(pending); + return result; /* Exit if got enough sync standbys */ + } + + /* + * Remove the entry for this sync standby from the list + * to prevent us from looking at the same entry again. + */ + pending = list_delete_cell(pending, cell, prev); + + continue; + } + + if (this_priority < next_highest_priority) + next_highest_priority = this_priority; + + prev = cell; + } + + priority = next_highest_priority; } + + /* never reached, but keep compiler quiet */ + Assert(false); + return result; } /* @@ -511,8 +730,7 @@ SyncRepReleaseWaiters(void) static int SyncRepGetStandbyPriority(void) { - char *rawstring; - List *elemlist; + List *members; ListCell *l; int priority = 0; bool found = false; @@ -524,20 +742,11 @@ SyncRepGetStandbyPriority(void) if (am_cascading_walsender) return 0; - /* Need a modifiable copy of string */ - rawstring = pstrdup(SyncRepStandbyNames); - - /* Parse string into list of identifiers */ - if (!SplitIdentifierString(rawstring, ',', &elemlist)) - { - /* syntax error in list */ - pfree(rawstring); - list_free(elemlist); - /* GUC machinery will have already complained - no need to do again */ + if (!SyncStandbysDefined()) return 0; - } - foreach(l, elemlist) + members = SyncRepConfig->members; + foreach(l, members) { char *standby_name = (char *) lfirst(l); @@ -551,9 +760,6 @@ SyncRepGetStandbyPriority(void) } } - pfree(rawstring); - list_free(elemlist); - return (found ? priority : 0); } @@ -661,6 +867,45 @@ SyncRepUpdateSyncStandbysDefined(void) } } +/* + * Parse synchronous_standby_names and update the config data + * of synchronous standbys. + */ +void +SyncRepUpdateConfig(void) +{ + int parse_rc; + + if (!SyncStandbysDefined()) + return; + + /* + * check_synchronous_standby_names() verifies the setting value of + * synchronous_standby_names before this function is called. So + * syncrep_yyparse() must not cause an error here. + */ + syncrep_scanner_init(SyncRepStandbyNames); + parse_rc = syncrep_yyparse(); + Assert(parse_rc == 0); + syncrep_scanner_finish(); + + SyncRepConfig = syncrep_parse_result; + syncrep_parse_result = NULL; +} + +/* + * Free a previously-allocated config data of synchronous replication. + */ +void +SyncRepFreeConfig(SyncRepConfigData *config) +{ + if (!config) + return; + + list_free_deep(config->members); + pfree(config); +} + #ifdef USE_ASSERT_CHECKING static bool SyncRepQueueIsOrderedByLSN(int mode) @@ -705,32 +950,74 @@ SyncRepQueueIsOrderedByLSN(int mode) bool check_synchronous_standby_names(char **newval, void **extra, GucSource source) { - char *rawstring; - List *elemlist; - - /* Need a modifiable copy of string */ - rawstring = pstrdup(*newval); + int parse_rc; - /* Parse string into list of identifiers */ - if (!SplitIdentifierString(rawstring, ',', &elemlist)) + if (*newval != NULL && (*newval)[0] != '\0') { - /* syntax error in list */ - GUC_check_errdetail("List syntax is invalid."); - pfree(rawstring); - list_free(elemlist); - return false; - } + syncrep_scanner_init(*newval); + parse_rc = syncrep_yyparse(); + syncrep_scanner_finish(); - /* - * Any additional validation of standby names should go here. - * - * Don't attempt to set WALSender priority because this is executed by - * postmaster at startup, not WALSender, so the application_name is not - * yet correctly set. - */ + if (parse_rc != 0) + { + GUC_check_errcode(ERRCODE_SYNTAX_ERROR); + GUC_check_errdetail("synchronous_standby_names parser returned %d", + parse_rc); + return false; + } + + /* + * Warn if num_sync exceeds the number of names of potential sync + * standbys. This setting doesn't make sense in most cases because + * it implies that enough number of sync standbys will not appear, + * which makes transaction commits wait for sync replication + * infinitely. + * + * If there are more than one standbys having the same name and + * priority, we can see enough sync standbys to complete transaction + * commits. However it's not recommended to run multiple standbys + * with the same priority because we cannot gain full control of + * the selection of sync standbys from them. + * + * OTOH, that setting is OK if we understand the above problem + * regarding the selection of sync standbys and intentionally + * specify * to match all the standbys. + */ + if (syncrep_parse_result->num_sync > + list_length(syncrep_parse_result->members)) + { + ListCell *l; + bool has_asterisk = false; + + foreach(l, syncrep_parse_result->members) + { + char *standby_name = (char *) lfirst(l); + + if (pg_strcasecmp(standby_name, "*") == 0) + { + has_asterisk = true; + break; + } + } + + /* + * Only the postmaster warns this inappropriate setting + * to avoid cluttering the log. + */ + if (!has_asterisk && !IsUnderPostmaster) + ereport(WARNING, + (errmsg("The configured number of synchronous standbys (%d) exceeds the number of names of potential synchronous ones (%d)", + syncrep_parse_result->num_sync, list_length(syncrep_parse_result->members)), + errhint("Specify more names of potential synchronous standbys in synchronous_standby_names."))); + } - pfree(rawstring); - list_free(elemlist); + /* + * syncrep_yyparse sets the global syncrep_parse_result as side effect. + * But this function is required to just check, so frees it + * after parsing the parameter. + */ + SyncRepFreeConfig(syncrep_parse_result); + } return true; } diff --git a/src/backend/replication/syncrep_gram.y b/src/backend/replication/syncrep_gram.y new file mode 100644 index 00000000000..380fedc1712 --- /dev/null +++ b/src/backend/replication/syncrep_gram.y @@ -0,0 +1,86 @@ +%{ +/*------------------------------------------------------------------------- + * + * syncrep_gram.y - Parser for synchronous_standby_names + * + * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/replication/syncrep_gram.y + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "replication/syncrep.h" +#include "utils/formatting.h" + +/* Result of the parsing is returned here */ +SyncRepConfigData *syncrep_parse_result; + +static SyncRepConfigData *create_syncrep_config(char *num_sync, List *members); + +/* + * Bison doesn't allocate anything that needs to live across parser calls, + * so we can easily have it use palloc instead of malloc. This prevents + * memory leaks if we error out during parsing. Note this only works with + * bison >= 2.0. However, in bison 1.875 the default is to use alloca() + * if possible, so there's not really much problem anyhow, at least if + * you're building with gcc. + */ +#define YYMALLOC palloc +#define YYFREE pfree + +%} + +%expect 0 +%name-prefix="syncrep_yy" + +%union +{ + char *str; + List *list; + SyncRepConfigData *config; +} + +%token <str> NAME NUM + +%type <config> result standby_config +%type <list> standby_list +%type <str> standby_name + +%start result + +%% +result: + standby_config { syncrep_parse_result = $1; } +; +standby_config: + standby_list { $$ = create_syncrep_config("1", $1); } + | NUM '(' standby_list ')' { $$ = create_syncrep_config($1, $3); } +; +standby_list: + standby_name { $$ = list_make1($1);} + | standby_list ',' standby_name { $$ = lappend($1, $3);} +; +standby_name: + NAME { $$ = $1; } + | NUM { $$ = $1; } +; +%% + +static SyncRepConfigData * +create_syncrep_config(char *num_sync, List *members) +{ + SyncRepConfigData *config = + (SyncRepConfigData *) palloc(sizeof(SyncRepConfigData)); + + config->num_sync = atoi(num_sync); + config->members = members; + return config; +} + +#include "syncrep_scanner.c" diff --git a/src/backend/replication/syncrep_scanner.l b/src/backend/replication/syncrep_scanner.l new file mode 100644 index 00000000000..968265e3bbb --- /dev/null +++ b/src/backend/replication/syncrep_scanner.l @@ -0,0 +1,144 @@ +%{ +/*------------------------------------------------------------------------- + * + * syncrep_scanner.l + * a lexical scanner for synchronous_standby_names + * + * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/replication/syncrep_scanner.l + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "miscadmin.h" +#include "lib/stringinfo.h" + +/* + * flex emits a yy_fatal_error() function that it calls in response to + * critical errors like malloc failure, file I/O errors, and detection of + * internal inconsistency. That function prints a message and calls exit(). + * Mutate it to instead call ereport(FATAL), which terminates this process. + * + * The process that causes this fatal error should be terminated. + * Otherwise it has to abandon the new setting value of + * synchronous_standby_names and keep running with the previous one + * while the other processes switch to the new one. + * This inconsistency of the setting that each process is based on + * can cause a serious problem. Though it's basically not good idea to + * use FATAL here because it can take down the postmaster, + * we should do that in order to avoid such an inconsistency. + */ +#undef fprintf +#define fprintf(file, fmt, msg) syncrep_flex_fatal(fmt, msg) + +static void +syncrep_flex_fatal(const char *fmt, const char *msg) +{ + ereport(FATAL, (errmsg_internal("%s", msg))); +} + +/* Handles to the buffer that the lexer uses internally */ +static YY_BUFFER_STATE scanbufhandle; + +static StringInfoData xdbuf; + +%} + +%option 8bit +%option never-interactive +%option nounput +%option noinput +%option noyywrap +%option warn +%option prefix="syncrep_yy" + +/* + * <xd> delimited identifiers (double-quoted identifiers) + */ +%x xd + +space [ \t\n\r\f\v] + +undquoted_start [^ ,\(\)\"] +undquoted_cont [^ ,\(\)] +undquoted_name {undquoted_start}{undquoted_cont}* +dquoted_name [^\"]+ + +/* Double-quoted string */ +dquote \" +xdstart {dquote} +xddouble {dquote}{dquote} +xdstop {dquote} +xdinside {dquoted_name} + +%% +{space}+ { /* ignore */ } +{xdstart} { + initStringInfo(&xdbuf); + BEGIN(xd); + } +<xd>{xddouble} { + appendStringInfoChar(&xdbuf, '\"'); + } +<xd>{xdinside} { + appendStringInfoString(&xdbuf, yytext); + } +<xd>{xdstop} { + yylval.str = pstrdup(xdbuf.data); + pfree(xdbuf.data); + BEGIN(INITIAL); + return NAME; + } +"," { return ','; } +"(" { return '('; } +")" { return ')'; } +[1-9][0-9]* { + yylval.str = pstrdup(yytext); + return NUM; + } +{undquoted_name} { + yylval.str = pstrdup(yytext); + return NAME; + } +%% + +void +yyerror(const char *message) +{ + ereport(IsUnderPostmaster ? DEBUG2 : LOG, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("%s at or near \"%s\"", message, yytext))); +} + +void +syncrep_scanner_init(const char *str) +{ + Size slen = strlen(str); + char *scanbuf; + + /* + * Might be left over after ereport() + */ + if (YY_CURRENT_BUFFER) + yy_delete_buffer(YY_CURRENT_BUFFER); + + /* + * Make a scan buffer with special termination needed by flex. + */ + scanbuf = (char *) palloc(slen + 2); + memcpy(scanbuf, str, slen); + scanbuf[slen] = scanbuf[slen + 1] = YY_END_OF_BUFFER_CHAR; + scanbufhandle = yy_scan_buffer(scanbuf, slen + 2); +} + +void +syncrep_scanner_finish(void) +{ + yy_delete_buffer(scanbufhandle); + scanbufhandle = NULL; +} diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index f98475cbf39..e4a0119c5e3 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -2751,7 +2751,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) Tuplestorestate *tupstore; MemoryContext per_query_ctx; MemoryContext oldcontext; - WalSnd *sync_standby; + List *sync_standbys; int i; /* check to see if caller supports us returning a tuplestore */ @@ -2780,12 +2780,23 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) MemoryContextSwitchTo(oldcontext); /* - * Get the currently active synchronous standby. + * Allocate and update the config data of synchronous replication, + * and then get the currently active synchronous standbys. */ + SyncRepUpdateConfig(); LWLockAcquire(SyncRepLock, LW_SHARED); - sync_standby = SyncRepGetSynchronousStandby(); + sync_standbys = SyncRepGetSyncStandbys(NULL); LWLockRelease(SyncRepLock); + /* + * Free the previously-allocated config data because a backend + * no longer needs it. The next call of this function needs to + * allocate and update the config data newly because the setting + * of sync replication might be changed between the calls. + */ + SyncRepFreeConfig(SyncRepConfig); + SyncRepConfig = NULL; + for (i = 0; i < max_wal_senders; i++) { WalSnd *walsnd = &WalSndCtl->walsnds[i]; @@ -2856,7 +2867,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) */ if (priority == 0) values[7] = CStringGetTextDatum("async"); - else if (walsnd == sync_standby) + else if (list_member_int(sync_standbys, i)) values[7] = CStringGetTextDatum("sync"); else values[7] = CStringGetTextDatum("potential"); diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index e48e412b0f3..b7afe6b39c4 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -3448,7 +3448,7 @@ static struct config_string ConfigureNamesString[] = { {"synchronous_standby_names", PGC_SIGHUP, REPLICATION_MASTER, - gettext_noop("List of names of potential synchronous standbys."), + gettext_noop("Number of synchronous standbys and list of names of potential synchronous ones."), NULL, GUC_LIST_INPUT }, diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index ec4427f2d88..8da3ff14c66 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -240,7 +240,7 @@ # These settings are ignored on a standby server. #synchronous_standby_names = '' # standby servers that provide sync rep - # comma-separated list of application_name + # number of sync standbys and comma-separated list of application_name # from standby(s); '*' = all #vacuum_defer_cleanup_age = 0 # number of xacts by which cleanup is delayed diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h index c005a425836..14b56649da1 100644 --- a/src/include/replication/syncrep.h +++ b/src/include/replication/syncrep.h @@ -32,6 +32,18 @@ #define SYNC_REP_WAITING 1 #define SYNC_REP_WAIT_COMPLETE 2 +/* + * Struct for the configuration of synchronous replication. + */ +typedef struct SyncRepConfigData +{ + int num_sync; /* number of sync standbys that we need to wait for */ + List *members; /* list of names of potential sync standbys */ +} SyncRepConfigData; + +extern SyncRepConfigData *syncrep_parse_result; +extern SyncRepConfigData *SyncRepConfig; + /* user-settable parameters for synchronous replication */ extern char *SyncRepStandbyNames; @@ -45,14 +57,25 @@ extern void SyncRepCleanupAtProcExit(void); extern void SyncRepInitConfig(void); extern void SyncRepReleaseWaiters(void); +/* called by wal sender and user backend */ +extern List *SyncRepGetSyncStandbys(bool *am_sync); +extern void SyncRepUpdateConfig(void); +extern void SyncRepFreeConfig(SyncRepConfigData *config); + /* called by checkpointer */ extern void SyncRepUpdateSyncStandbysDefined(void); -/* forward declaration to avoid pulling in walsender_private.h */ -struct WalSnd; -extern struct WalSnd *SyncRepGetSynchronousStandby(void); - extern bool check_synchronous_standby_names(char **newval, void **extra, GucSource source); extern void assign_synchronous_commit(int newval, void *extra); +/* + * Internal functions for parsing synchronous_standby_names grammar, + * in syncrep_gram.y and syncrep_scanner.l + */ +extern int syncrep_yyparse(void); +extern int syncrep_yylex(void); +extern void syncrep_yyerror(const char *str); +extern void syncrep_scanner_init(const char *query_string); +extern void syncrep_scanner_finish(void); + #endif /* _SYNCREP_H */ diff --git a/src/tools/msvc/Mkvcbuild.pm b/src/tools/msvc/Mkvcbuild.pm index dbc09b80683..7c9091819c0 100644 --- a/src/tools/msvc/Mkvcbuild.pm +++ b/src/tools/msvc/Mkvcbuild.pm @@ -156,7 +156,7 @@ sub mkvcbuild 'bootparse.y'); $postgres->AddFiles('src/backend/utils/misc', 'guc-file.l'); $postgres->AddFiles('src/backend/replication', 'repl_scanner.l', - 'repl_gram.y'); + 'repl_gram.y', 'syncrep_scanner.l', 'syncrep_gram.y'); $postgres->AddDefine('BUILDING_DLL'); $postgres->AddLibrary('secur32.lib'); $postgres->AddLibrary('ws2_32.lib'); |