about | summary | refs | log | tree | commit | diff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--  src/backend/access/transam/xlog.c  11
-rw-r--r--  src/backend/replication/walsender.c  187
-rw-r--r--  src/backend/storage/ipc/procsignal.c  4
-rw-r--r--  src/include/replication/walsender.h  3
-rw-r--r--  src/include/replication/walsender_private.h  3
-rw-r--r--  src/include/storage/procsignal.h  1
6 files changed, 181 insertions, 28 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 35ee7d1cb6a..70d2570dc2c 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -8324,6 +8324,17 @@ ShutdownXLOG(int code, Datum arg)
ereport(IsPostmasterEnvironment ? LOG : NOTICE,
(errmsg("shutting down")));
+ /*
+ * Signal walsenders to move to stopping state.
+ */
+ WalSndInitStopping();
+
+ /*
+ * Wait for WAL senders to be in stopping state. This prevents commands
+ * from writing new WAL.
+ */
+ WalSndWaitStopping();
+
if (RecoveryInProgress())
CreateRestartPoint(CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_IMMEDIATE);
else
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 27aa3e6bc78..c9a6f7019d8 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -24,11 +24,17 @@
* are treated as not a crash but approximately normal termination;
* the walsender will exit quickly without sending any more XLOG records.
*
- * If the server is shut down, postmaster sends us SIGUSR2 after all
- * regular backends have exited and the shutdown checkpoint has been written.
- * This instructs walsender to send any outstanding WAL, including the
- * shutdown checkpoint record, wait for it to be replicated to the standby,
- * and then exit.
+ * If the server is shut down, checkpointer sends us
+ * PROCSIG_WALSND_INIT_STOPPING after all regular backends have exited. If
+ * the backend is idle or runs an SQL query, this causes the backend to
+ * shut down; if logical replication is in progress, all existing WAL records
+ * are processed, followed by a shutdown. Otherwise, this causes the walsender
+ * to switch to the "stopping" state. In this state, the walsender will reject
+ * any further replication commands. The checkpointer begins the shutdown
+ * checkpoint once all walsenders are confirmed as stopping. When the shutdown
+ * checkpoint finishes, the postmaster sends us SIGUSR2. This instructs
+ * walsender to send any outstanding WAL, including the shutdown checkpoint
+ * record, wait for it to be replicated to the standby, and then exit.
*
*
* Portions Copyright (c) 2010-2017, PostgreSQL Global Development Group
@@ -177,13 +183,14 @@ static bool WalSndCaughtUp = false;
/* Flags set by signal handlers for later service in main loop */
static volatile sig_atomic_t got_SIGHUP = false;
-static volatile sig_atomic_t walsender_ready_to_stop = false;
+static volatile sig_atomic_t got_SIGUSR2 = false;
+static volatile sig_atomic_t got_STOPPING = false;
/*
- * This is set while we are streaming. When not set, SIGUSR2 signal will be
- * handled like SIGTERM. When set, the main loop is responsible for checking
- * walsender_ready_to_stop and terminating when it's set (after streaming any
- * remaining WAL).
+ * This is set while we are streaming. When not set, the
+ * PROCSIG_WALSND_INIT_STOPPING signal will be handled like SIGTERM. When set,
+ * the main loop is responsible for checking got_STOPPING and terminating when
+ * it's set (after streaming any remaining WAL).
*/
static volatile sig_atomic_t replication_active = false;
@@ -300,7 +307,8 @@ WalSndErrorCleanup(void)
ReplicationSlotCleanup();
replication_active = false;
- if (walsender_ready_to_stop)
+
+ if (got_STOPPING || got_SIGUSR2)
proc_exit(0);
/* Revert back to startup state */
@@ -677,7 +685,7 @@ StartReplication(StartReplicationCmd *cmd)
WalSndLoop(XLogSendPhysical);
replication_active = false;
- if (walsender_ready_to_stop)
+ if (got_STOPPING)
proc_exit(0);
WalSndSetState(WALSNDSTATE_STARTUP);
@@ -1055,7 +1063,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
{
ereport(LOG,
(errmsg("terminating walsender process after promotion")));
- walsender_ready_to_stop = true;
+ got_STOPPING = true;
}
WalSndSetState(WALSNDSTATE_CATCHUP);
@@ -1106,7 +1114,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
ReplicationSlotRelease();
replication_active = false;
- if (walsender_ready_to_stop)
+ if (got_STOPPING)
proc_exit(0);
WalSndSetState(WALSNDSTATE_STARTUP);
@@ -1311,6 +1319,14 @@ WalSndWaitForWal(XLogRecPtr loc)
/* Check for input from the client */
ProcessRepliesIfAny();
+ /*
+ * If we're shutting down, trigger pending WAL to be written out,
+ * otherwise we'd possibly end up waiting for WAL that never gets
+ * written, because walwriter has shut down already.
+ */
+ if (got_STOPPING)
+ XLogBackgroundFlush();
+
/* Update our idea of the currently flushed position. */
if (!RecoveryInProgress())
RecentFlushPtr = GetFlushRecPtr();
@@ -1326,7 +1342,7 @@ WalSndWaitForWal(XLogRecPtr loc)
* RecentFlushPtr, so we can send all remaining data before shutting
* down.
*/
- if (walsender_ready_to_stop)
+ if (got_STOPPING)
break;
/*
@@ -1401,6 +1417,22 @@ exec_replication_command(const char *cmd_string)
MemoryContext old_context;
/*
+ * If WAL sender has been told that shutdown is getting close, switch its
+ * status accordingly to handle the next replication commands correctly.
+ */
+ if (got_STOPPING)
+ WalSndSetState(WALSNDSTATE_STOPPING);
+
+ /*
+ * Throw error if in stopping mode. We need to prevent commands that could
+ * generate WAL while the shutdown checkpoint is being written. To be
+ * safe, we just prohibit all new commands.
+ */
+ if (MyWalSnd->state == WALSNDSTATE_STOPPING)
+ ereport(ERROR,
+ (errmsg("cannot execute new commands while WAL sender is in stopping mode")));
+
+ /*
* CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
* command arrives. Clean up the old stuff if there's anything.
*/
@@ -2128,7 +2160,7 @@ WalSndLoop(WalSndSendDataCallback send_data)
* normal termination at shutdown, or a promotion, the walsender
* is not sure which.
*/
- if (walsender_ready_to_stop)
+ if (got_SIGUSR2)
WalSndDone(send_data);
}
@@ -2443,6 +2475,10 @@ XLogSendPhysical(void)
XLogRecPtr endptr;
Size nbytes;
+ /* If requested, switch the WAL sender to the stopping state. */
+ if (got_STOPPING)
+ WalSndSetState(WALSNDSTATE_STOPPING);
+
if (streamingDoneSending)
{
WalSndCaughtUp = true;
@@ -2733,7 +2769,16 @@ XLogSendLogical(void)
* point, then we're caught up.
*/
if (logical_decoding_ctx->reader->EndRecPtr >= GetFlushRecPtr())
+ {
WalSndCaughtUp = true;
+
+ /*
+ * Have WalSndLoop() terminate the connection in an orderly
+ * manner, after writing out all the pending data.
+ */
+ if (got_STOPPING)
+ got_SIGUSR2 = true;
+ }
}
/* Update shared memory status */
@@ -2843,6 +2888,26 @@ WalSndRqstFileReload(void)
}
}
+/*
+ * Handle PROCSIG_WALSND_INIT_STOPPING signal.
+ */
+void
+HandleWalSndInitStopping(void)
+{
+ Assert(am_walsender);
+
+ /*
+ * If replication has not yet started, die like with SIGTERM. If
+ * replication is active, only set a flag and wake up the main loop. It
+ * will send any outstanding WAL, wait for it to be replicated to the
+ * standby, and then exit gracefully.
+ */
+ if (!replication_active)
+ kill(MyProcPid, SIGTERM);
+ else
+ got_STOPPING = true;
+}
+
/* SIGHUP: set flag to re-read config file at next convenient time */
static void
WalSndSigHupHandler(SIGNAL_ARGS)
@@ -2856,22 +2921,17 @@ WalSndSigHupHandler(SIGNAL_ARGS)
errno = save_errno;
}
-/* SIGUSR2: set flag to do a last cycle and shut down afterwards */
+/*
+ * SIGUSR2: set flag to do a last cycle and shut down afterwards. The WAL
+ * sender should already have been switched to WALSNDSTATE_STOPPING at
+ * this point.
+ */
static void
WalSndLastCycleHandler(SIGNAL_ARGS)
{
int save_errno = errno;
- /*
- * If replication has not yet started, die like with SIGTERM. If
- * replication is active, only set a flag and wake up the main loop. It
- * will send any outstanding WAL, wait for it to be replicated to the
- * standby, and then exit gracefully.
- */
- if (!replication_active)
- kill(MyProcPid, SIGTERM);
-
- walsender_ready_to_stop = true;
+ got_SIGUSR2 = true;
SetLatch(MyLatch);
errno = save_errno;
@@ -2969,6 +3029,77 @@ WalSndWakeup(void)
}
}
+/*
+ * Signal all walsenders to move to stopping state.
+ *
+ * This will trigger walsenders to move to a state where no further WAL can be
+ * generated. See this file's header for details.
+ */
+void
+WalSndInitStopping(void)
+{
+ int i;
+
+ for (i = 0; i < max_wal_senders; i++)
+ {
+ WalSnd *walsnd = &WalSndCtl->walsnds[i];
+ pid_t pid;
+
+ SpinLockAcquire(&walsnd->mutex);
+ pid = walsnd->pid;
+ SpinLockRelease(&walsnd->mutex);
+
+ if (pid == 0)
+ continue;
+
+ SendProcSignal(pid, PROCSIG_WALSND_INIT_STOPPING, InvalidBackendId);
+ }
+}
+
+/*
+ * Wait until all WAL senders have quit or reached the stopping state. This
+ * is used by the checkpointer to control when the shutdown checkpoint can
+ * safely be performed.
+ */
+void
+WalSndWaitStopping(void)
+{
+ for (;;)
+ {
+ int i;
+ bool all_stopped = true;
+
+ for (i = 0; i < max_wal_senders; i++)
+ {
+ WalSndState state;
+ WalSnd *walsnd = &WalSndCtl->walsnds[i];
+
+ SpinLockAcquire(&walsnd->mutex);
+
+ if (walsnd->pid == 0)
+ {
+ SpinLockRelease(&walsnd->mutex);
+ continue;
+ }
+
+ state = walsnd->state;
+ SpinLockRelease(&walsnd->mutex);
+
+ if (state != WALSNDSTATE_STOPPING)
+ {
+ all_stopped = false;
+ break;
+ }
+ }
+
+ /* safe to leave if confirmation is done for all WAL senders */
+ if (all_stopped)
+ return;
+
+ pg_usleep(10000L); /* wait for 10 msec */
+ }
+}
+
/* Set state for current walsender (only called in walsender) */
void
WalSndSetState(WalSndState state)
@@ -3002,6 +3133,8 @@ WalSndGetStateString(WalSndState state)
return "catchup";
case WALSNDSTATE_STREAMING:
return "streaming";
+ case WALSNDSTATE_STOPPING:
+ return "stopping";
}
return "UNKNOWN";
}
diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c
index 4a21d5512d2..b9302ac630c 100644
--- a/src/backend/storage/ipc/procsignal.c
+++ b/src/backend/storage/ipc/procsignal.c
@@ -20,6 +20,7 @@
#include "access/parallel.h"
#include "commands/async.h"
#include "miscadmin.h"
+#include "replication/walsender.h"
#include "storage/latch.h"
#include "storage/ipc.h"
#include "storage/proc.h"
@@ -270,6 +271,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
if (CheckProcSignal(PROCSIG_PARALLEL_MESSAGE))
HandleParallelMessageInterrupt();
+ if (CheckProcSignal(PROCSIG_WALSND_INIT_STOPPING))
+ HandleWalSndInitStopping();
+
if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_DATABASE))
RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_DATABASE);
diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h
index 2ca903872e4..c50e450ec2a 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -44,6 +44,9 @@ extern void WalSndSignals(void);
extern Size WalSndShmemSize(void);
extern void WalSndShmemInit(void);
extern void WalSndWakeup(void);
+extern void WalSndInitStopping(void);
+extern void WalSndWaitStopping(void);
+extern void HandleWalSndInitStopping(void);
extern void WalSndRqstFileReload(void);
/*
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 2c59056cefd..36311e124c4 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -24,7 +24,8 @@ typedef enum WalSndState
WALSNDSTATE_STARTUP = 0,
WALSNDSTATE_BACKUP,
WALSNDSTATE_CATCHUP,
- WALSNDSTATE_STREAMING
+ WALSNDSTATE_STREAMING,
+ WALSNDSTATE_STOPPING
} WalSndState;
/*
diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h
index d068dde5d76..553f0f43f7e 100644
--- a/src/include/storage/procsignal.h
+++ b/src/include/storage/procsignal.h
@@ -32,6 +32,7 @@ typedef enum
PROCSIG_CATCHUP_INTERRUPT, /* sinval catchup interrupt */
PROCSIG_NOTIFY_INTERRUPT, /* listen/notify interrupt */
PROCSIG_PARALLEL_MESSAGE, /* message from cooperating parallel backend */
+ PROCSIG_WALSND_INIT_STOPPING, /* ask walsenders to prepare for shutdown */
/* Recovery conflict reasons */
PROCSIG_RECOVERY_CONFLICT_DATABASE,