diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/postmaster/walwriter.c | 6 | ||||
-rw-r--r-- | src/backend/replication/syncrep.c | 302 | ||||
-rw-r--r-- | src/backend/tcop/postgres.c | 6 | ||||
-rw-r--r-- | src/include/replication/syncrep.h | 4 | ||||
-rw-r--r-- | src/include/replication/walsender.h | 7 |
5 files changed, 213 insertions, 112 deletions
diff --git a/src/backend/postmaster/walwriter.c b/src/backend/postmaster/walwriter.c index d0d7c9bebf0..e97ac63ed09 100644 --- a/src/backend/postmaster/walwriter.c +++ b/src/backend/postmaster/walwriter.c @@ -49,6 +49,7 @@ #include "libpq/pqsignal.h" #include "miscadmin.h" #include "postmaster/walwriter.h" +#include "replication/syncrep.h" #include "storage/bufmgr.h" #include "storage/fd.h" #include "storage/ipc.h" @@ -216,6 +217,9 @@ WalWriterMain(void) */ PG_SETMASK(&UnBlockSig); + /* Do this once before starting the loop, then just at SIGHUP time. */ + SyncRepUpdateSyncStandbysDefined(); + /* * Loop forever */ @@ -237,6 +241,8 @@ WalWriterMain(void) { got_SIGHUP = false; ProcessConfigFile(PGC_SIGHUP); + /* update global shmem state for sync rep */ + SyncRepUpdateSyncStandbysDefined(); } if (shutdown_requested) { diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index 3ef9cdd87c3..b70de3993f2 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -55,6 +55,7 @@ #include "storage/ipc.h" #include "storage/pmsignal.h" #include "storage/proc.h" +#include "tcop/tcopprot.h" #include "utils/builtins.h" #include "utils/guc.h" #include "utils/guc_tables.h" @@ -65,10 +66,13 @@ bool synchronous_replication = false; /* Only set in user backends */ char *SyncRepStandbyNames; -static bool sync_standbys_defined = false; /* Is there at least one name? */ +#define SyncStandbysDefined() \ + (SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0') + static bool announce_next_takeover = true; static void SyncRepQueueInsert(void); +static void SyncRepCancelWait(void); static int SyncRepGetStandbyPriority(void); #ifdef USE_ASSERT_CHECKING @@ -83,6 +87,12 @@ static bool SyncRepQueueIsOrderedByLSN(void); /* * Wait for synchronous replication, if requested by user. + * + * Initially backends start in state SYNC_REP_NOT_WAITING and then + * change that state to SYNC_REP_WAITING before adding ourselves + * to the wait queue. During SyncRepWakeQueue() a WALSender changes + * the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed. + * This backend then resets its state to SYNC_REP_NOT_WAITING. */ void SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) @@ -95,10 +105,49 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) * there are no sync replication standby names defined. * Note that those standbys don't need to be connected. */ - if (!SyncRepRequested() || !sync_standbys_defined) + if (!SyncRepRequested() || !SyncStandbysDefined()) return; Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks))); + Assert(WalSndCtl != NULL); + + /* Reset the latch before adding ourselves to the queue. */ + ResetLatch(&MyProc->waitLatch); + + /* + * Set our waitLSN so WALSender will know when to wake us, and add + * ourselves to the queue. + */ + LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); + Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING); + if (!WalSndCtl->sync_standbys_defined) + { + /* + * We don't wait for sync rep if WalSndCtl->sync_standbys_defined is + * not set. See SyncRepUpdateSyncStandbysDefined. + */ + LWLockRelease(SyncRepLock); + return; + } + MyProc->waitLSN = XactCommitLSN; + MyProc->syncRepState = SYNC_REP_WAITING; + SyncRepQueueInsert(); + Assert(SyncRepQueueIsOrderedByLSN()); + LWLockRelease(SyncRepLock); + + /* Alter ps display to show waiting for sync rep. */ + if (update_process_title) + { + int len; + + old_status = get_ps_display(&len); + new_status = (char *) palloc(len + 32 + 1); + memcpy(new_status, old_status, len); + sprintf(new_status + len, " waiting for %X/%X", + XactCommitLSN.xlogid, XactCommitLSN.xrecoff); + set_ps_display(new_status, false); + new_status[len] = '\0'; /* truncate off " waiting ..." */ + } /* * Wait for specified LSN to be confirmed. @@ -108,117 +157,105 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) */ for (;;) { + int syncRepState; + + /* + * Wait on latch for up to 60 seconds. This allows us to + * check for postmaster death regularly while waiting. + * Note that timeout here does not necessarily release from loop. + */ + WaitLatch(&MyProc->waitLatch, 60000000L); + + /* Must reset the latch before testing state. */ ResetLatch(&MyProc->waitLatch); /* - * Synchronous Replication state machine within user backend - * - * Initially backends start in state SYNC_REP_NOT_WAITING and then - * change that state to SYNC_REP_WAITING before adding ourselves - * to the wait queue. During SyncRepWakeQueue() a WALSender changes - * the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed. - * This backend then resets its state to SYNC_REP_NOT_WAITING when - * we exit normally, or SYNC_REP_MUST_DISCONNECT in abnormal cases. - * - * We read MyProc->syncRepState without SyncRepLock, which - * assumes that read access is atomic. + * Try checking the state without the lock first. There's no guarantee + * that we'll read the most up-to-date value, so if it looks like we're + * still waiting, recheck while holding the lock. But if it looks like + * we're done, we must really be done, because once walsender changes + * the state to SYNC_REP_WAIT_COMPLETE, it will never update it again, + * so we can't be seeing a stale value in that case. */ - switch (MyProc->syncRepState) + syncRepState = MyProc->syncRepState; + if (syncRepState == SYNC_REP_WAITING) { - case SYNC_REP_NOT_WAITING: - /* - * Set our waitLSN so WALSender will know when to wake us. - * We set this before we add ourselves to queue, so that - * any proc on the queue can be examined freely without - * taking a lock on each process in the queue, as long as - * they hold SyncRepLock. - */ - MyProc->waitLSN = XactCommitLSN; - MyProc->syncRepState = SYNC_REP_WAITING; - - /* - * Add to queue while holding lock. - */ - LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); - SyncRepQueueInsert(); - Assert(SyncRepQueueIsOrderedByLSN()); - LWLockRelease(SyncRepLock); - - /* - * Alter ps display to show waiting for sync rep. - */ - if (update_process_title) - { - int len; - - old_status = get_ps_display(&len); - new_status = (char *) palloc(len + 32 + 1); - memcpy(new_status, old_status, len); - sprintf(new_status + len, " waiting for %X/%X", - XactCommitLSN.xlogid, XactCommitLSN.xrecoff); - set_ps_display(new_status, false); - new_status[len] = '\0'; /* truncate off " waiting ..." */ - } - - break; - - case SYNC_REP_WAITING: - /* - * Check for conditions that would cause us to leave the - * wait state before the LSN has been reached. - */ - if (!PostmasterIsAlive(true)) - { - LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); - SHMQueueDelete(&(MyProc->syncRepLinks)); - LWLockRelease(SyncRepLock); - - MyProc->syncRepState = SYNC_REP_MUST_DISCONNECT; - return; - } - - /* - * We don't receive SIGHUPs at this point, so resetting - * synchronous_standby_names has no effect on waiters. - */ - - /* Continue waiting */ - - break; - - case SYNC_REP_WAIT_COMPLETE: - /* - * WalSender has checked our LSN and has removed us from - * queue. Cleanup local state and leave. - */ - Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks))); - - MyProc->syncRepState = SYNC_REP_NOT_WAITING; - MyProc->waitLSN.xlogid = 0; - MyProc->waitLSN.xrecoff = 0; - - if (new_status) - { - /* Reset ps display */ - set_ps_display(new_status, false); - pfree(new_status); - } - - return; - - case SYNC_REP_MUST_DISCONNECT: - return; - - default: - elog(FATAL, "invalid syncRepState"); + LWLockAcquire(SyncRepLock, LW_SHARED); + syncRepState = MyProc->syncRepState; + LWLockRelease(SyncRepLock); } + if (syncRepState == SYNC_REP_WAIT_COMPLETE) + break; /* - * Wait on latch for up to 60 seconds. This allows us to - * check for postmaster death regularly while waiting. - * Note that timeout here does not necessarily release from loop. + * If a wait for synchronous replication is pending, we can neither + * acknowledge the commit nor raise ERROR or FATAL. The latter + * would lead the client to believe that that the transaction + * aborted, which is not true: it's already committed locally. + * The former is no good either: the client has requested + * synchronous replication, and is entitled to assume that an + * acknowledged commit is also replicated, which may not be true. + * So in this case we issue a WARNING (which some clients may + * be able to interpret) and shut off further output. We do NOT + * reset ProcDiePending, so that the process will die after the + * commit is cleaned up. */ - WaitLatch(&MyProc->waitLatch, 60000000L); + if (ProcDiePending) + { + ereport(WARNING, + (errcode(ERRCODE_ADMIN_SHUTDOWN), + errmsg("canceling the wait for replication and terminating connection due to administrator command"), + errdetail("The transaction has already been committed locally but might have not been replicated to the standby."))); + whereToSendOutput = DestNone; + SyncRepCancelWait(); + break; + } + + /* + * It's unclear what to do if a query cancel interrupt arrives. We + * can't actually abort at this point, but ignoring the interrupt + * altogether is not helpful, so we just terminate the wait with + * a suitable warning. + */ + if (QueryCancelPending) + { + QueryCancelPending = false; + ereport(WARNING, + (errmsg("canceling wait for synchronous replication due to user request"), + errdetail("The transaction has committed locally, but may not have replicated to the standby."))); + SyncRepCancelWait(); + break; + } + + /* + * If the postmaster dies, we'll probably never get an acknowledgement, + * because all the wal sender processes will exit. So just bail out. + */ + if (!PostmasterIsAlive(true)) + { + ProcDiePending = true; + whereToSendOutput = DestNone; + SyncRepCancelWait(); + break; + } + } + + /* + * WalSender has checked our LSN and has removed us from queue. Clean up + * state and leave. It's OK to reset these shared memory fields without + * holding SyncRepLock, because any walsenders will ignore us anyway when + * we're not on the queue. + */ + Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks))); + MyProc->syncRepState = SYNC_REP_NOT_WAITING; + MyProc->waitLSN.xlogid = 0; + MyProc->waitLSN.xrecoff = 0; + + if (new_status) + { + /* Reset ps display */ + set_ps_display(new_status, false); + pfree(new_status); } } @@ -257,6 +294,19 @@ SyncRepQueueInsert(void) SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue), &(MyProc->syncRepLinks)); } +/* + * Acquire SyncRepLock and cancel any wait currently in progress. + */ +static void +SyncRepCancelWait(void) +{ + LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); + if (!SHMQueueIsDetached(&(MyProc->syncRepLinks))) + SHMQueueDelete(&(MyProc->syncRepLinks)); + MyProc->syncRepState = SYNC_REP_NOT_WAITING; + LWLockRelease(SyncRepLock); +} + void SyncRepCleanupAtProcExit(int code, Datum arg) { @@ -506,6 +556,43 @@ SyncRepWakeQueue(bool all) return numprocs; } +/* + * WAL writer calls this as needed to update the shared sync_standbys_defined + * flag, so that backends don't remain permanently wedged if + * synchronous_standby_names is unset. It's safe to check the current value + * without the lock, because it's only ever updated by one process. But we + * must take the lock to change it. + */ +void +SyncRepUpdateSyncStandbysDefined(void) +{ + bool sync_standbys_defined = SyncStandbysDefined(); + + if (sync_standbys_defined != WalSndCtl->sync_standbys_defined) + { + LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); + + /* + * If synchronous_standby_names has been reset to empty, it's futile + * for backends to continue to waiting. Since the user no longer + * wants synchronous replication, we'd better wake them up. + */ + if (!sync_standbys_defined) + SyncRepWakeQueue(true); + + /* + * Only allow people to join the queue when there are synchronous + * standbys defined. Without this interlock, there's a race + * condition: we might wake up all the current waiters; then, some + * backend that hasn't yet reloaded its config might go to sleep on + * the queue (and never wake up). This prevents that. + */ + WalSndCtl->sync_standbys_defined = sync_standbys_defined; + + LWLockRelease(SyncRepLock); + } +} + #ifdef USE_ASSERT_CHECKING static bool SyncRepQueueIsOrderedByLSN(void) @@ -568,13 +655,6 @@ assign_synchronous_standby_names(const char *newval, bool doit, GucSource source } /* - * Is there at least one sync standby? If so cache this knowledge to - * improve performance of SyncRepWaitForLSN() for all-async configs. - */ - if (doit && list_length(elemlist) > 0) - sync_standbys_defined = true; - - /* * Any additional validation of standby names should go here. * * Don't attempt to set WALSender priority because this is executed by diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 39b7b5b678e..79c0f68966e 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -2643,6 +2643,9 @@ die(SIGNAL_ARGS) InterruptHoldoffCount--; ProcessInterrupts(); } + + /* Interrupt any sync rep wait which is currently in progress. */ + SetLatch(&(MyProc->waitLatch)); } errno = save_errno; @@ -2681,6 +2684,9 @@ StatementCancelHandler(SIGNAL_ARGS) InterruptHoldoffCount--; ProcessInterrupts(); } + + /* Interrupt any sync rep wait which is currently in progress. */ + SetLatch(&(MyProc->waitLatch)); } errno = save_errno; diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h index 9171eb61766..188ec65745c 100644 --- a/src/include/replication/syncrep.h +++ b/src/include/replication/syncrep.h @@ -26,7 +26,6 @@ #define SYNC_REP_NOT_WAITING 0 #define SYNC_REP_WAITING 1 #define SYNC_REP_WAIT_COMPLETE 2 -#define SYNC_REP_MUST_DISCONNECT 3 /* user-settable parameters for synchronous replication */ extern bool synchronous_replication; @@ -42,6 +41,9 @@ extern void SyncRepCleanupAtProcExit(int code, Datum arg); extern void SyncRepInitConfig(void); extern void SyncRepReleaseWaiters(void); +/* called by wal writer */ +extern void SyncRepUpdateSyncStandbysDefined(void); + /* called by various procs */ extern int SyncRepWakeQueue(bool all); extern const char *assign_synchronous_standby_names(const char *newval, bool doit, GucSource source); diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h index 2e5b2096ead..150a71fdddf 100644 --- a/src/include/replication/walsender.h +++ b/src/include/replication/walsender.h @@ -78,6 +78,13 @@ typedef struct */ XLogRecPtr lsn; + /* + * Are any sync standbys defined? Waiting backends can't reload the + * config file safely, so WAL writer updates this value as needed. + * Protected by SyncRepLock. + */ + bool sync_standbys_defined; + WalSnd walsnds[1]; /* VARIABLE LENGTH ARRAY */ } WalSndCtlData; |