aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/Makefile4
-rw-r--r--src/backend/replication/.gitignore2
-rw-r--r--src/backend/replication/Makefile11
-rw-r--r--src/backend/replication/syncrep.c549
-rw-r--r--src/backend/replication/syncrep_gram.y86
-rw-r--r--src/backend/replication/syncrep_scanner.l144
-rw-r--r--src/backend/replication/walsender.c19
-rw-r--r--src/backend/utils/misc/guc.c2
-rw-r--r--src/backend/utils/misc/postgresql.conf.sample2
-rw-r--r--src/include/replication/syncrep.h31
-rw-r--r--src/tools/msvc/Mkvcbuild.pm2
11 files changed, 706 insertions, 146 deletions
diff --git a/src/backend/Makefile b/src/backend/Makefile
index d22dbbf53b6..ec2dc7be404 100644
--- a/src/backend/Makefile
+++ b/src/backend/Makefile
@@ -203,7 +203,7 @@ distprep:
$(MAKE) -C parser gram.c gram.h scan.c
$(MAKE) -C bootstrap bootparse.c bootscanner.c
$(MAKE) -C catalog schemapg.h postgres.bki postgres.description postgres.shdescription
- $(MAKE) -C replication repl_gram.c repl_scanner.c
+ $(MAKE) -C replication repl_gram.c repl_scanner.c syncrep_gram.c syncrep_scanner.c
$(MAKE) -C storage/lmgr lwlocknames.h
$(MAKE) -C utils fmgrtab.c fmgroids.h errcodes.h
$(MAKE) -C utils/misc guc-file.c
@@ -320,6 +320,8 @@ maintainer-clean: distclean
catalog/postgres.shdescription \
replication/repl_gram.c \
replication/repl_scanner.c \
+ replication/syncrep_gram.c \
+ replication/syncrep_scanner.c \
storage/lmgr/lwlocknames.c \
storage/lmgr/lwlocknames.h \
utils/fmgroids.h \
diff --git a/src/backend/replication/.gitignore b/src/backend/replication/.gitignore
index 2a0491d1496..d1df6147bd0 100644
--- a/src/backend/replication/.gitignore
+++ b/src/backend/replication/.gitignore
@@ -1,2 +1,4 @@
/repl_gram.c
/repl_scanner.c
+/syncrep_gram.c
+/syncrep_scanner.c
diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile
index b73370eaa98..c99717e0aee 100644
--- a/src/backend/replication/Makefile
+++ b/src/backend/replication/Makefile
@@ -15,7 +15,7 @@ include $(top_builddir)/src/Makefile.global
override CPPFLAGS := -I. -I$(srcdir) $(CPPFLAGS)
OBJS = walsender.o walreceiverfuncs.o walreceiver.o basebackup.o \
- repl_gram.o slot.o slotfuncs.o syncrep.o
+ repl_gram.o slot.o slotfuncs.o syncrep.o syncrep_gram.o
SUBDIRS = logical
@@ -24,5 +24,10 @@ include $(top_srcdir)/src/backend/common.mk
# repl_scanner is compiled as part of repl_gram
repl_gram.o: repl_scanner.c
-# repl_gram.c and repl_scanner.c are in the distribution tarball, so
-# they are not cleaned here.
+# syncrep_scanner is complied as part of syncrep_gram
+syncrep_gram.o: syncrep_scanner.c
+syncrep_scanner.c: FLEXFLAGS = -CF -p
+syncrep_scanner.c: FLEX_NO_BACKUP=yes
+
+# repl_gram.c, repl_scanner.c, syncrep_gram.c and syncrep_scanner.c
+# are in the distribution tarball, so they are not cleaned here.
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index 12c0c3bee3e..d454e7f3683 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -4,8 +4,8 @@
*
* Synchronous replication is new as of PostgreSQL 9.1.
*
- * If requested, transaction commits wait until their commit LSN is
- * acknowledged by the synchronous standby.
+ * If requested, transaction commits wait until their commit LSN are
+ * acknowledged by the synchronous standbys.
*
* This module contains the code for waiting and release of backends.
* All code in this module executes on the primary. The core streaming
@@ -13,7 +13,7 @@
*
* The essence of this design is that it isolates all logic about
* waiting/releasing onto the primary. The primary defines which standbys
- * it wishes to wait for. The standby is completely unaware of the
+ * it wishes to wait for. The standbys are completely unaware of the
* durability requirements of transactions on the primary, reducing the
* complexity of the code and streamlining both standby operations and
* network bandwidth because there is no requirement to ship
@@ -21,19 +21,32 @@
*
* Replication is either synchronous or not synchronous (async). If it is
* async, we just fastpath out of here. If it is sync, then we wait for
- * the write or flush location on the standby before releasing the waiting
- * backend. Further complexity in that interaction is expected in later
- * releases.
+ * the write, flush or apply location on the standby before releasing
+ * the waiting backend. Further complexity in that interaction is
+ * expected in later releases.
*
* The best performing way to manage the waiting backends is to have a
* single ordered queue of waiting backends, so that we can avoid
* searching the through all waiters each time we receive a reply.
*
- * In 9.1 we support only a single synchronous standby, chosen from a
- * priority list of synchronous_standby_names. Before it can become the
- * synchronous standby it must have caught up with the primary; that may
- * take some time. Once caught up, the current highest priority standby
- * will release waiters from the queue.
+ * In 9.5 or before only a single standby could be considered as
+ * synchronous. In 9.6 we support multiple synchronous standbys.
+ * The number of synchronous standbys that transactions must wait for
+ * replies from is specified in synchronous_standby_names.
+ * This parameter also specifies a list of standby names,
+ * which determines the priority of each standby for being chosen as
+ * a synchronous standby. The standbys whose names appear earlier
+ * in the list are given higher priority and will be considered as
+ * synchronous. Other standby servers appearing later in this list
+ * represent potential synchronous standbys. If any of the current
+ * synchronous standbys disconnects for whatever reason, it will be
+ * replaced immediately with the next-highest-priority standby.
+ *
+ * Before the standbys chosen from synchronous_standby_names can
+ * become the synchronous standbys they must have caught up with
+ * the primary; that may take some time. Once caught up,
+ * the current higher priority standbys which are considered as
+ * synchronous at that moment will release waiters from the queue.
*
* Portions Copyright (c) 2010-2016, PostgreSQL Global Development Group
*
@@ -65,12 +78,17 @@ char *SyncRepStandbyNames;
static bool announce_next_takeover = true;
+SyncRepConfigData *SyncRepConfig;
static int SyncRepWaitMode = SYNC_REP_NO_WAIT;
static void SyncRepQueueInsert(int mode);
static void SyncRepCancelWait(void);
static int SyncRepWakeQueue(bool all, int mode);
+static bool SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
+ XLogRecPtr *flushPtr,
+ XLogRecPtr *applyPtr,
+ bool *am_sync);
static int SyncRepGetStandbyPriority(void);
#ifdef USE_ASSERT_CHECKING
@@ -343,6 +361,11 @@ SyncRepInitConfig(void)
{
int priority;
+ /* Update the config data of synchronous replication */
+ SyncRepFreeConfig(SyncRepConfig);
+ SyncRepConfig = NULL;
+ SyncRepUpdateConfig();
+
/*
* Determine if we are a potential sync standby and remember the result
* for handling replies from standby.
@@ -360,62 +383,8 @@ SyncRepInitConfig(void)
}
/*
- * Find the WAL sender servicing the synchronous standby with the lowest
- * priority value, or NULL if no synchronous standby is connected. If there
- * are multiple standbys with the same lowest priority value, the first one
- * found is selected. The caller must hold SyncRepLock.
- */
-WalSnd *
-SyncRepGetSynchronousStandby(void)
-{
- WalSnd *result = NULL;
- int result_priority = 0;
- int i;
-
- for (i = 0; i < max_wal_senders; i++)
- {
- /* Use volatile pointer to prevent code rearrangement */
- volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
- int this_priority;
-
- /* Must be active */
- if (walsnd->pid == 0)
- continue;
-
- /* Must be streaming */
- if (walsnd->state != WALSNDSTATE_STREAMING)
- continue;
-
- /* Must be synchronous */
- this_priority = walsnd->sync_standby_priority;
- if (this_priority == 0)
- continue;
-
- /* Must have a lower priority value than any previous ones */
- if (result != NULL && result_priority <= this_priority)
- continue;
-
- /* Must have a valid flush position */
- if (XLogRecPtrIsInvalid(walsnd->flush))
- continue;
-
- result = (WalSnd *) walsnd;
- result_priority = this_priority;
-
- /*
- * If priority is equal to 1, there cannot be any other WAL senders
- * with a lower priority, so we're done.
- */
- if (this_priority == 1)
- return result;
- }
-
- return result;
-}
-
-/*
* Update the LSNs on each queue based upon our latest state. This
- * implements a simple policy of first-valid-standby-releases-waiter.
+ * implements a simple policy of first-valid-sync-standby-releases-waiter.
*
* Other policies are possible, which would change what we do here and
* perhaps also which information we store as well.
@@ -424,7 +393,11 @@ void
SyncRepReleaseWaiters(void)
{
volatile WalSndCtlData *walsndctl = WalSndCtl;
- WalSnd *syncWalSnd;
+ XLogRecPtr writePtr;
+ XLogRecPtr flushPtr;
+ XLogRecPtr applyPtr;
+ bool got_oldest;
+ bool am_sync;
int numwrite = 0;
int numflush = 0;
int numapply = 0;
@@ -438,25 +411,44 @@ SyncRepReleaseWaiters(void)
if (MyWalSnd->sync_standby_priority == 0 ||
MyWalSnd->state < WALSNDSTATE_STREAMING ||
XLogRecPtrIsInvalid(MyWalSnd->flush))
+ {
+ announce_next_takeover = true;
return;
+ }
/*
- * We're a potential sync standby. Release waiters if we are the highest
- * priority standby.
+ * We're a potential sync standby. Release waiters if there are
+ * enough sync standbys and we are considered as sync.
*/
LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
- syncWalSnd = SyncRepGetSynchronousStandby();
- /* We should have found ourselves at least */
- Assert(syncWalSnd != NULL);
+ /*
+ * Check whether we are a sync standby or not, and calculate
+ * the oldest positions among all sync standbys.
+ */
+ got_oldest = SyncRepGetOldestSyncRecPtr(&writePtr, &flushPtr,
+ &applyPtr, &am_sync);
+
+ /*
+ * If we are managing a sync standby, though we weren't
+ * prior to this, then announce we are now a sync standby.
+ */
+ if (announce_next_takeover && am_sync)
+ {
+ announce_next_takeover = false;
+ ereport(LOG,
+ (errmsg("standby \"%s\" is now a synchronous standby with priority %u",
+ application_name, MyWalSnd->sync_standby_priority)));
+ }
/*
- * If we aren't managing the highest priority standby then just leave.
+ * If the number of sync standbys is less than requested or we aren't
+ * managing a sync standby then just leave.
*/
- if (syncWalSnd != MyWalSnd)
+ if (!got_oldest || !am_sync)
{
LWLockRelease(SyncRepLock);
- announce_next_takeover = true;
+ announce_next_takeover = !am_sync;
return;
}
@@ -464,40 +456,267 @@ SyncRepReleaseWaiters(void)
* Set the lsn first so that when we wake backends they will release up to
* this location.
*/
- if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < MyWalSnd->write)
+ if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < writePtr)
{
- walsndctl->lsn[SYNC_REP_WAIT_WRITE] = MyWalSnd->write;
+ walsndctl->lsn[SYNC_REP_WAIT_WRITE] = writePtr;
numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
}
- if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < MyWalSnd->flush)
+ if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < flushPtr)
{
- walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush;
+ walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = flushPtr;
numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
}
- if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < MyWalSnd->apply)
+ if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < applyPtr)
{
- walsndctl->lsn[SYNC_REP_WAIT_APPLY] = MyWalSnd->apply;
+ walsndctl->lsn[SYNC_REP_WAIT_APPLY] = applyPtr;
numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY);
}
LWLockRelease(SyncRepLock);
- elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X",
- numwrite, (uint32) (MyWalSnd->write >> 32), (uint32) MyWalSnd->write,
- numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush,
- numapply, (uint32) (MyWalSnd->apply >> 32), (uint32) MyWalSnd->apply);
+ elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%x",
+ numwrite, (uint32) (writePtr >> 32), (uint32) writePtr,
+ numflush, (uint32) (flushPtr >> 32), (uint32) flushPtr,
+ numapply, (uint32) (applyPtr >> 32), (uint32) applyPtr);
+}
+
+/*
+ * Calculate the oldest Write, Flush and Apply positions among sync standbys.
+ *
+ * Return false if the number of sync standbys is less than
+ * synchronous_standby_names specifies. Otherwise return true and
+ * store the oldest positions into *writePtr, *flushPtr and *applyPtr.
+ *
+ * On return, *am_sync is set to true if this walsender is connecting to
+ * sync standby. Otherwise it's set to false.
+ */
+static bool
+SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
+ XLogRecPtr *applyPtr, bool *am_sync)
+{
+ List *sync_standbys;
+ ListCell *cell;
+
+ *writePtr = InvalidXLogRecPtr;
+ *flushPtr = InvalidXLogRecPtr;
+ *applyPtr = InvalidXLogRecPtr;
+ *am_sync = false;
+
+ /* Get standbys that are considered as synchronous at this moment */
+ sync_standbys = SyncRepGetSyncStandbys(am_sync);
/*
- * If we are managing the highest priority standby, though we weren't
- * prior to this, then announce we are now the sync standby.
+ * Quick exit if we are not managing a sync standby or there are not
+ * enough synchronous standbys.
*/
- if (announce_next_takeover)
+ if (!(*am_sync) || list_length(sync_standbys) < SyncRepConfig->num_sync)
{
- announce_next_takeover = false;
- ereport(LOG,
- (errmsg("standby \"%s\" is now the synchronous standby with priority %u",
- application_name, MyWalSnd->sync_standby_priority)));
+ list_free(sync_standbys);
+ return false;
+ }
+
+ /*
+ * Scan through all sync standbys and calculate the oldest
+ * Write, Flush and Apply positions.
+ */
+ foreach (cell, sync_standbys)
+ {
+ WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
+ XLogRecPtr write;
+ XLogRecPtr flush;
+ XLogRecPtr apply;
+
+ SpinLockAcquire(&walsnd->mutex);
+ write = walsnd->write;
+ flush = walsnd->flush;
+ apply = walsnd->apply;
+ SpinLockRelease(&walsnd->mutex);
+
+ if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write)
+ *writePtr = write;
+ if (XLogRecPtrIsInvalid(*flushPtr) || *flushPtr > flush)
+ *flushPtr = flush;
+ if (XLogRecPtrIsInvalid(*applyPtr) || *applyPtr > apply)
+ *applyPtr = apply;
+ }
+
+ list_free(sync_standbys);
+ return true;
+}
+
+/*
+ * Return the list of sync standbys, or NIL if no sync standby is connected.
+ *
+ * If there are multiple standbys with the same priority,
+ * the first one found is selected preferentially.
+ * The caller must hold SyncRepLock.
+ *
+ * On return, *am_sync is set to true if this walsender is connecting to
+ * sync standby. Otherwise it's set to false.
+ */
+List *
+SyncRepGetSyncStandbys(bool *am_sync)
+{
+ List *result = NIL;
+ List *pending = NIL;
+ int lowest_priority;
+ int next_highest_priority;
+ int this_priority;
+ int priority;
+ int i;
+ bool am_in_pending = false;
+ volatile WalSnd *walsnd; /* Use volatile pointer to prevent
+ * code rearrangement */
+
+ /* Quick exit if sync replication is not requested */
+ if (SyncRepConfig == NULL)
+ return NIL;
+
+ if (am_sync != NULL)
+ *am_sync = false;
+
+ lowest_priority = list_length(SyncRepConfig->members);
+ next_highest_priority = lowest_priority + 1;
+
+ /*
+ * Find the sync standbys which have the highest priority (i.e, 1).
+ * Also store all the other potential sync standbys into the pending list,
+ * in order to scan it later and find other sync standbys from it quickly.
+ */
+ for (i = 0; i < max_wal_senders; i++)
+ {
+ walsnd = &WalSndCtl->walsnds[i];
+
+ /* Must be active */
+ if (walsnd->pid == 0)
+ continue;
+
+ /* Must be streaming */
+ if (walsnd->state != WALSNDSTATE_STREAMING)
+ continue;
+
+ /* Must be synchronous */
+ this_priority = walsnd->sync_standby_priority;
+ if (this_priority == 0)
+ continue;
+
+ /* Must have a valid flush position */
+ if (XLogRecPtrIsInvalid(walsnd->flush))
+ continue;
+
+ /*
+ * If the priority is equal to 1, consider this standby as sync
+ * and append it to the result. Otherwise append this standby
+ * to the pending list to check if it's actually sync or not later.
+ */
+ if (this_priority == 1)
+ {
+ result = lappend_int(result, i);
+ if (am_sync != NULL && walsnd == MyWalSnd)
+ *am_sync = true;
+ if (list_length(result) == SyncRepConfig->num_sync)
+ {
+ list_free(pending);
+ return result; /* Exit if got enough sync standbys */
+ }
+ }
+ else
+ {
+ pending = lappend_int(pending, i);
+ if (am_sync != NULL && walsnd == MyWalSnd)
+ am_in_pending = true;
+
+ /*
+ * Track the highest priority among the standbys in the pending
+ * list, in order to use it as the starting priority for later scan
+ * of the list. This is useful to find quickly the sync standbys
+ * from the pending list later because we can skip unnecessary
+ * scans for the unused priorities.
+ */
+ if (this_priority < next_highest_priority)
+ next_highest_priority = this_priority;
+ }
+ }
+
+ /*
+ * Consider all pending standbys as sync if the number of them plus
+ * already-found sync ones is lower than the configuration requests.
+ */
+ if (list_length(result) + list_length(pending) <= SyncRepConfig->num_sync)
+ {
+ bool needfree = (result != NIL && pending != NIL);
+
+ /*
+ * Set *am_sync to true if this walsender is in the pending list
+ * because all pending standbys are considered as sync.
+ */
+ if (am_sync != NULL && !(*am_sync))
+ *am_sync = am_in_pending;
+
+ result = list_concat(result, pending);
+ if (needfree)
+ pfree(pending);
+ return result;
+ }
+
+ /*
+ * Find the sync standbys from the pending list.
+ */
+ priority = next_highest_priority;
+ while (priority <= lowest_priority)
+ {
+ ListCell *cell;
+ ListCell *prev = NULL;
+ ListCell *next;
+
+ next_highest_priority = lowest_priority + 1;
+
+ for (cell = list_head(pending); cell != NULL; cell = next)
+ {
+ i = lfirst_int(cell);
+ walsnd = &WalSndCtl->walsnds[i];
+
+ next = lnext(cell);
+
+ this_priority = walsnd->sync_standby_priority;
+ if (this_priority == priority)
+ {
+ result = lappend_int(result, i);
+ if (am_sync != NULL && walsnd == MyWalSnd)
+ *am_sync = true;
+
+ /*
+ * We should always exit here after the scan of pending list
+ * starts because we know that the list has enough elements
+ * to reach SyncRepConfig->num_sync.
+ */
+ if (list_length(result) == SyncRepConfig->num_sync)
+ {
+ list_free(pending);
+ return result; /* Exit if got enough sync standbys */
+ }
+
+ /*
+ * Remove the entry for this sync standby from the list
+ * to prevent us from looking at the same entry again.
+ */
+ pending = list_delete_cell(pending, cell, prev);
+
+ continue;
+ }
+
+ if (this_priority < next_highest_priority)
+ next_highest_priority = this_priority;
+
+ prev = cell;
+ }
+
+ priority = next_highest_priority;
}
+
+ /* never reached, but keep compiler quiet */
+ Assert(false);
+ return result;
}
/*
@@ -511,8 +730,7 @@ SyncRepReleaseWaiters(void)
static int
SyncRepGetStandbyPriority(void)
{
- char *rawstring;
- List *elemlist;
+ List *members;
ListCell *l;
int priority = 0;
bool found = false;
@@ -524,20 +742,11 @@ SyncRepGetStandbyPriority(void)
if (am_cascading_walsender)
return 0;
- /* Need a modifiable copy of string */
- rawstring = pstrdup(SyncRepStandbyNames);
-
- /* Parse string into list of identifiers */
- if (!SplitIdentifierString(rawstring, ',', &elemlist))
- {
- /* syntax error in list */
- pfree(rawstring);
- list_free(elemlist);
- /* GUC machinery will have already complained - no need to do again */
+ if (!SyncStandbysDefined())
return 0;
- }
- foreach(l, elemlist)
+ members = SyncRepConfig->members;
+ foreach(l, members)
{
char *standby_name = (char *) lfirst(l);
@@ -551,9 +760,6 @@ SyncRepGetStandbyPriority(void)
}
}
- pfree(rawstring);
- list_free(elemlist);
-
return (found ? priority : 0);
}
@@ -661,6 +867,45 @@ SyncRepUpdateSyncStandbysDefined(void)
}
}
+/*
+ * Parse synchronous_standby_names and update the config data
+ * of synchronous standbys.
+ */
+void
+SyncRepUpdateConfig(void)
+{
+ int parse_rc;
+
+ if (!SyncStandbysDefined())
+ return;
+
+ /*
+ * check_synchronous_standby_names() verifies the setting value of
+ * synchronous_standby_names before this function is called. So
+ * syncrep_yyparse() must not cause an error here.
+ */
+ syncrep_scanner_init(SyncRepStandbyNames);
+ parse_rc = syncrep_yyparse();
+ Assert(parse_rc == 0);
+ syncrep_scanner_finish();
+
+ SyncRepConfig = syncrep_parse_result;
+ syncrep_parse_result = NULL;
+}
+
+/*
+ * Free a previously-allocated config data of synchronous replication.
+ */
+void
+SyncRepFreeConfig(SyncRepConfigData *config)
+{
+ if (!config)
+ return;
+
+ list_free_deep(config->members);
+ pfree(config);
+}
+
#ifdef USE_ASSERT_CHECKING
static bool
SyncRepQueueIsOrderedByLSN(int mode)
@@ -705,32 +950,74 @@ SyncRepQueueIsOrderedByLSN(int mode)
bool
check_synchronous_standby_names(char **newval, void **extra, GucSource source)
{
- char *rawstring;
- List *elemlist;
-
- /* Need a modifiable copy of string */
- rawstring = pstrdup(*newval);
+ int parse_rc;
- /* Parse string into list of identifiers */
- if (!SplitIdentifierString(rawstring, ',', &elemlist))
+ if (*newval != NULL && (*newval)[0] != '\0')
{
- /* syntax error in list */
- GUC_check_errdetail("List syntax is invalid.");
- pfree(rawstring);
- list_free(elemlist);
- return false;
- }
+ syncrep_scanner_init(*newval);
+ parse_rc = syncrep_yyparse();
+ syncrep_scanner_finish();
- /*
- * Any additional validation of standby names should go here.
- *
- * Don't attempt to set WALSender priority because this is executed by
- * postmaster at startup, not WALSender, so the application_name is not
- * yet correctly set.
- */
+ if (parse_rc != 0)
+ {
+ GUC_check_errcode(ERRCODE_SYNTAX_ERROR);
+ GUC_check_errdetail("synchronous_standby_names parser returned %d",
+ parse_rc);
+ return false;
+ }
+
+ /*
+ * Warn if num_sync exceeds the number of names of potential sync
+ * standbys. This setting doesn't make sense in most cases because
+ * it implies that enough number of sync standbys will not appear,
+ * which makes transaction commits wait for sync replication
+ * infinitely.
+ *
+ * If there are more than one standbys having the same name and
+ * priority, we can see enough sync standbys to complete transaction
+ * commits. However it's not recommended to run multiple standbys
+ * with the same priority because we cannot gain full control of
+ * the selection of sync standbys from them.
+ *
+ * OTOH, that setting is OK if we understand the above problem
+ * regarding the selection of sync standbys and intentionally
+ * specify * to match all the standbys.
+ */
+ if (syncrep_parse_result->num_sync >
+ list_length(syncrep_parse_result->members))
+ {
+ ListCell *l;
+ bool has_asterisk = false;
+
+ foreach(l, syncrep_parse_result->members)
+ {
+ char *standby_name = (char *) lfirst(l);
+
+ if (pg_strcasecmp(standby_name, "*") == 0)
+ {
+ has_asterisk = true;
+ break;
+ }
+ }
+
+ /*
+ * Only the postmaster warns this inappropriate setting
+ * to avoid cluttering the log.
+ */
+ if (!has_asterisk && !IsUnderPostmaster)
+ ereport(WARNING,
+ (errmsg("The configured number of synchronous standbys (%d) exceeds the number of names of potential synchronous ones (%d)",
+ syncrep_parse_result->num_sync, list_length(syncrep_parse_result->members)),
+ errhint("Specify more names of potential synchronous standbys in synchronous_standby_names.")));
+ }
- pfree(rawstring);
- list_free(elemlist);
+ /*
+ * syncrep_yyparse sets the global syncrep_parse_result as side effect.
+ * But this function is required to just check, so frees it
+ * after parsing the parameter.
+ */
+ SyncRepFreeConfig(syncrep_parse_result);
+ }
return true;
}
diff --git a/src/backend/replication/syncrep_gram.y b/src/backend/replication/syncrep_gram.y
new file mode 100644
index 00000000000..380fedc1712
--- /dev/null
+++ b/src/backend/replication/syncrep_gram.y
@@ -0,0 +1,86 @@
+%{
+/*-------------------------------------------------------------------------
+ *
+ * syncrep_gram.y - Parser for synchronous_standby_names
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/replication/syncrep_gram.y
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "replication/syncrep.h"
+#include "utils/formatting.h"
+
+/* Result of the parsing is returned here */
+SyncRepConfigData *syncrep_parse_result;
+
+static SyncRepConfigData *create_syncrep_config(char *num_sync, List *members);
+
+/*
+ * Bison doesn't allocate anything that needs to live across parser calls,
+ * so we can easily have it use palloc instead of malloc. This prevents
+ * memory leaks if we error out during parsing. Note this only works with
+ * bison >= 2.0. However, in bison 1.875 the default is to use alloca()
+ * if possible, so there's not really much problem anyhow, at least if
+ * you're building with gcc.
+ */
+#define YYMALLOC palloc
+#define YYFREE pfree
+
+%}
+
+%expect 0
+%name-prefix="syncrep_yy"
+
+%union
+{
+ char *str;
+ List *list;
+ SyncRepConfigData *config;
+}
+
+%token <str> NAME NUM
+
+%type <config> result standby_config
+%type <list> standby_list
+%type <str> standby_name
+
+%start result
+
+%%
+result:
+ standby_config { syncrep_parse_result = $1; }
+;
+standby_config:
+ standby_list { $$ = create_syncrep_config("1", $1); }
+ | NUM '(' standby_list ')' { $$ = create_syncrep_config($1, $3); }
+;
+standby_list:
+ standby_name { $$ = list_make1($1);}
+ | standby_list ',' standby_name { $$ = lappend($1, $3);}
+;
+standby_name:
+ NAME { $$ = $1; }
+ | NUM { $$ = $1; }
+;
+%%
+
+static SyncRepConfigData *
+create_syncrep_config(char *num_sync, List *members)
+{
+ SyncRepConfigData *config =
+ (SyncRepConfigData *) palloc(sizeof(SyncRepConfigData));
+
+ config->num_sync = atoi(num_sync);
+ config->members = members;
+ return config;
+}
+
+#include "syncrep_scanner.c"
diff --git a/src/backend/replication/syncrep_scanner.l b/src/backend/replication/syncrep_scanner.l
new file mode 100644
index 00000000000..968265e3bbb
--- /dev/null
+++ b/src/backend/replication/syncrep_scanner.l
@@ -0,0 +1,144 @@
+%{
+/*-------------------------------------------------------------------------
+ *
+ * syncrep_scanner.l
+ * a lexical scanner for synchronous_standby_names
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/replication/syncrep_scanner.l
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "miscadmin.h"
+#include "lib/stringinfo.h"
+
+/*
+ * flex emits a yy_fatal_error() function that it calls in response to
+ * critical errors like malloc failure, file I/O errors, and detection of
+ * internal inconsistency. That function prints a message and calls exit().
+ * Mutate it to instead call ereport(FATAL), which terminates this process.
+ *
+ * The process that causes this fatal error should be terminated.
+ * Otherwise it has to abandon the new setting value of
+ * synchronous_standby_names and keep running with the previous one
+ * while the other processes switch to the new one.
+ * This inconsistency of the setting that each process is based on
+ * can cause a serious problem. Though it's basically not good idea to
+ * use FATAL here because it can take down the postmaster,
+ * we should do that in order to avoid such an inconsistency.
+ */
+#undef fprintf
+#define fprintf(file, fmt, msg) syncrep_flex_fatal(fmt, msg)
+
+static void
+syncrep_flex_fatal(const char *fmt, const char *msg)
+{
+ ereport(FATAL, (errmsg_internal("%s", msg)));
+}
+
+/* Handles to the buffer that the lexer uses internally */
+static YY_BUFFER_STATE scanbufhandle;
+
+static StringInfoData xdbuf;
+
+%}
+
+%option 8bit
+%option never-interactive
+%option nounput
+%option noinput
+%option noyywrap
+%option warn
+%option prefix="syncrep_yy"
+
+/*
+ * <xd> delimited identifiers (double-quoted identifiers)
+ */
+%x xd
+
+space [ \t\n\r\f\v]
+
+undquoted_start [^ ,\(\)\"]
+undquoted_cont [^ ,\(\)]
+undquoted_name {undquoted_start}{undquoted_cont}*
+dquoted_name [^\"]+
+
+/* Double-quoted string */
+dquote \"
+xdstart {dquote}
+xddouble {dquote}{dquote}
+xdstop {dquote}
+xdinside {dquoted_name}
+
+%%
+{space}+ { /* ignore */ }
+{xdstart} {
+ initStringInfo(&xdbuf);
+ BEGIN(xd);
+ }
+<xd>{xddouble} {
+ appendStringInfoChar(&xdbuf, '\"');
+ }
+<xd>{xdinside} {
+ appendStringInfoString(&xdbuf, yytext);
+ }
+<xd>{xdstop} {
+ yylval.str = pstrdup(xdbuf.data);
+ pfree(xdbuf.data);
+ BEGIN(INITIAL);
+ return NAME;
+ }
+"," { return ','; }
+"(" { return '('; }
+")" { return ')'; }
+[1-9][0-9]* {
+ yylval.str = pstrdup(yytext);
+ return NUM;
+ }
+{undquoted_name} {
+ yylval.str = pstrdup(yytext);
+ return NAME;
+ }
+%%
+
+void
+yyerror(const char *message)
+{
+ ereport(IsUnderPostmaster ? DEBUG2 : LOG,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("%s at or near \"%s\"", message, yytext)));
+}
+
+void
+syncrep_scanner_init(const char *str)
+{
+ Size slen = strlen(str);
+ char *scanbuf;
+
+ /*
+ * Might be left over after ereport()
+ */
+ if (YY_CURRENT_BUFFER)
+ yy_delete_buffer(YY_CURRENT_BUFFER);
+
+ /*
+ * Make a scan buffer with special termination needed by flex.
+ */
+ scanbuf = (char *) palloc(slen + 2);
+ memcpy(scanbuf, str, slen);
+ scanbuf[slen] = scanbuf[slen + 1] = YY_END_OF_BUFFER_CHAR;
+ scanbufhandle = yy_scan_buffer(scanbuf, slen + 2);
+}
+
+void
+syncrep_scanner_finish(void)
+{
+ yy_delete_buffer(scanbufhandle);
+ scanbufhandle = NULL;
+}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index f98475cbf39..e4a0119c5e3 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2751,7 +2751,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
Tuplestorestate *tupstore;
MemoryContext per_query_ctx;
MemoryContext oldcontext;
- WalSnd *sync_standby;
+ List *sync_standbys;
int i;
/* check to see if caller supports us returning a tuplestore */
@@ -2780,12 +2780,23 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
MemoryContextSwitchTo(oldcontext);
/*
- * Get the currently active synchronous standby.
+ * Allocate and update the config data of synchronous replication,
+ * and then get the currently active synchronous standbys.
*/
+ SyncRepUpdateConfig();
LWLockAcquire(SyncRepLock, LW_SHARED);
- sync_standby = SyncRepGetSynchronousStandby();
+ sync_standbys = SyncRepGetSyncStandbys(NULL);
LWLockRelease(SyncRepLock);
+ /*
+ * Free the previously-allocated config data because a backend
+ * no longer needs it. The next call of this function needs to
+ * allocate and update the config data newly because the setting
+ * of sync replication might be changed between the calls.
+ */
+ SyncRepFreeConfig(SyncRepConfig);
+ SyncRepConfig = NULL;
+
for (i = 0; i < max_wal_senders; i++)
{
WalSnd *walsnd = &WalSndCtl->walsnds[i];
@@ -2856,7 +2867,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
*/
if (priority == 0)
values[7] = CStringGetTextDatum("async");
- else if (walsnd == sync_standby)
+ else if (list_member_int(sync_standbys, i))
values[7] = CStringGetTextDatum("sync");
else
values[7] = CStringGetTextDatum("potential");
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index e48e412b0f3..b7afe6b39c4 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -3448,7 +3448,7 @@ static struct config_string ConfigureNamesString[] =
{
{"synchronous_standby_names", PGC_SIGHUP, REPLICATION_MASTER,
- gettext_noop("List of names of potential synchronous standbys."),
+ gettext_noop("Number of synchronous standbys and list of names of potential synchronous ones."),
NULL,
GUC_LIST_INPUT
},
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index ec4427f2d88..8da3ff14c66 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -240,7 +240,7 @@
# These settings are ignored on a standby server.
#synchronous_standby_names = '' # standby servers that provide sync rep
- # comma-separated list of application_name
+ # number of sync standbys and comma-separated list of application_name
# from standby(s); '*' = all
#vacuum_defer_cleanup_age = 0 # number of xacts by which cleanup is delayed
diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h
index c005a425836..14b56649da1 100644
--- a/src/include/replication/syncrep.h
+++ b/src/include/replication/syncrep.h
@@ -32,6 +32,18 @@
#define SYNC_REP_WAITING 1
#define SYNC_REP_WAIT_COMPLETE 2
+/*
+ * Struct for the configuration of synchronous replication.
+ */
+typedef struct SyncRepConfigData
+{
+ int num_sync; /* number of sync standbys that we need to wait for */
+ List *members; /* list of names of potential sync standbys */
+} SyncRepConfigData;
+
+extern SyncRepConfigData *syncrep_parse_result;
+extern SyncRepConfigData *SyncRepConfig;
+
/* user-settable parameters for synchronous replication */
extern char *SyncRepStandbyNames;
@@ -45,14 +57,25 @@ extern void SyncRepCleanupAtProcExit(void);
extern void SyncRepInitConfig(void);
extern void SyncRepReleaseWaiters(void);
+/* called by wal sender and user backend */
+extern List *SyncRepGetSyncStandbys(bool *am_sync);
+extern void SyncRepUpdateConfig(void);
+extern void SyncRepFreeConfig(SyncRepConfigData *config);
+
/* called by checkpointer */
extern void SyncRepUpdateSyncStandbysDefined(void);
-/* forward declaration to avoid pulling in walsender_private.h */
-struct WalSnd;
-extern struct WalSnd *SyncRepGetSynchronousStandby(void);
-
extern bool check_synchronous_standby_names(char **newval, void **extra, GucSource source);
extern void assign_synchronous_commit(int newval, void *extra);
+/*
+ * Internal functions for parsing synchronous_standby_names grammar,
+ * in syncrep_gram.y and syncrep_scanner.l
+ */
+extern int syncrep_yyparse(void);
+extern int syncrep_yylex(void);
+extern void syncrep_yyerror(const char *str);
+extern void syncrep_scanner_init(const char *query_string);
+extern void syncrep_scanner_finish(void);
+
#endif /* _SYNCREP_H */
diff --git a/src/tools/msvc/Mkvcbuild.pm b/src/tools/msvc/Mkvcbuild.pm
index dbc09b80683..7c9091819c0 100644
--- a/src/tools/msvc/Mkvcbuild.pm
+++ b/src/tools/msvc/Mkvcbuild.pm
@@ -156,7 +156,7 @@ sub mkvcbuild
'bootparse.y');
$postgres->AddFiles('src/backend/utils/misc', 'guc-file.l');
$postgres->AddFiles('src/backend/replication', 'repl_scanner.l',
- 'repl_gram.y');
+ 'repl_gram.y', 'syncrep_scanner.l', 'syncrep_gram.y');
$postgres->AddDefine('BUILDING_DLL');
$postgres->AddLibrary('secur32.lib');
$postgres->AddLibrary('ws2_32.lib');