aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/walreceiver.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/walreceiver.c')
-rw-r--r--src/backend/replication/walreceiver.c566
1 files changed, 566 insertions, 0 deletions
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
new file mode 100644
index 00000000000..f805e673e11
--- /dev/null
+++ b/src/backend/replication/walreceiver.c
@@ -0,0 +1,566 @@
+/*-------------------------------------------------------------------------
+ *
+ * walreceiver.c
+ *
+ * The WAL receiver process (walreceiver) is new as of Postgres 8.5. It
+ * is the process in the standby server that takes charge of receiving
+ * XLOG records from a primary server during streaming replication.
+ *
+ * When the startup process determines that it's time to start streaming,
+ * it instructs postmaster to start walreceiver. Walreceiver first connects
+ * connects to the primary server (it will be served by a walsender process
+ * in the primary server), and then keeps receiving XLOG records and
+ * writing them to the disk as long as the connection is alive. As XLOG
+ * records are received and flushed to disk, it updates the
+ * WalRcv->receivedUpTo variable in shared memory, to inform the startup
+ * process of how far it can proceed with XLOG replay.
+ *
+ * Normal termination is by SIGTERM, which instructs the walreceiver to
+ * exit(0). Emergency termination is by SIGQUIT; like any postmaster child
+ * process, the walreceiver will simply abort and exit on SIGQUIT. A close
+ * of the connection and a FATAL error are treated not as a crash but as
+ * normal operation.
+ *
+ * This file contains the server-facing parts of walreceiver. The libpq-
+ * specific parts are in the libpqwalreceiver module. It's loaded
+ * dynamically to avoid linking the server with libpq.
+ *
+ * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.1 2010/01/20 09:16:24 heikki Exp $
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <signal.h>
+#include <unistd.h>
+
+#include "access/xlog_internal.h"
+#include "libpq/pqsignal.h"
+#include "miscadmin.h"
+#include "replication/walreceiver.h"
+#include "storage/ipc.h"
+#include "storage/pmsignal.h"
+#include "utils/builtins.h"
+#include "utils/guc.h"
+#include "utils/memutils.h"
+#include "utils/ps_status.h"
+#include "utils/resowner.h"
+
+/* libpqreceiver hooks to these when loaded */
+walrcv_connect_type walrcv_connect = NULL;
+walrcv_receive_type walrcv_receive = NULL;
+walrcv_disconnect_type walrcv_disconnect = NULL;
+
+#define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */
+
+/*
+ * These variables are used similarly to openLogFile/Id/Seg/Off,
+ * but for walreceiver to write the XLOG.
+ */
+static int recvFile = -1;
+static uint32 recvId = 0;
+static uint32 recvSeg = 0;
+static uint32 recvOff = 0;
+
+/*
+ * Flags set by interrupt handlers of walreceiver for later service in the
+ * main loop.
+ */
+static volatile sig_atomic_t got_SIGHUP = false;
+static volatile sig_atomic_t got_SIGTERM = false;
+
+static void ProcessWalRcvInterrupts(void);
+static void EnableWalRcvImmediateExit(void);
+static void DisableWalRcvImmediateExit(void);
+
+/*
+ * About SIGTERM handling:
+ *
+ * We can't just exit(1) within SIGTERM signal handler, because the signal
+ * might arrive in the middle of some critical operation, like while we're
+ * holding a spinlock. We also can't just set a flag in signal handler and
+ * check it in the main loop, because we perform some blocking libpq
+ * operations like PQexec(), which can take a long time to finish.
+ *
+ * We use a combined approach: When WalRcvImmediateInterruptOK is true, it's
+ * safe for the signal handler to elog(FATAL) immediately. Otherwise it just
+ * sets got_SIGTERM flag, which is checked in the main loop when convenient.
+ *
+ * This is very much like what regular backends do with ImmediateInterruptOK,
+ * ProcessInterrupts() etc.
+ */
+static volatile bool WalRcvImmediateInterruptOK = false;
+
+static void
+ProcessWalRcvInterrupts(void)
+{
+ /*
+ * Although walreceiver interrupt handling doesn't use the same scheme
+ * as regular backends, call CHECK_FOR_INTERRUPTS() to make sure we
+ * receive any incoming signals on Win32.
+ */
+ CHECK_FOR_INTERRUPTS();
+
+ if (got_SIGTERM)
+ {
+ WalRcvImmediateInterruptOK = false;
+ ereport(FATAL,
+ (errcode(ERRCODE_ADMIN_SHUTDOWN),
+ errmsg("terminating walreceiver process due to administrator command")));
+ }
+}
+
+static void
+EnableWalRcvImmediateExit()
+{
+ WalRcvImmediateInterruptOK = true;
+ ProcessWalRcvInterrupts();
+}
+
+static void
+DisableWalRcvImmediateExit()
+{
+ WalRcvImmediateInterruptOK = false;
+ ProcessWalRcvInterrupts();
+}
+
+/* Signal handlers */
+static void WalRcvSigHupHandler(SIGNAL_ARGS);
+static void WalRcvShutdownHandler(SIGNAL_ARGS);
+static void WalRcvQuickDieHandler(SIGNAL_ARGS);
+
+/* Prototypes for private functions */
+static void InitWalRcv(void);
+static void WalRcvKill(int code, Datum arg);
+static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
+static void XLogWalRcvFlush(void);
+
+/*
+ * LogstreamResult indicates the byte positions that we have already
+ * written/fsynced.
+ */
+static struct
+{
+ XLogRecPtr Write; /* last byte + 1 written out in the standby */
+ XLogRecPtr Flush; /* last byte + 1 flushed in the standby */
+} LogstreamResult;
+
+/* Main entry point for walreceiver process */
+void
+WalReceiverMain(void)
+{
+ sigjmp_buf local_sigjmp_buf;
+ MemoryContext walrcv_context;
+ char conninfo[MAXCONNINFO];
+ XLogRecPtr startpoint;
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalRcvData *walrcv = WalRcv;
+
+ /* Load the libpq-specific functions */
+ load_file("libpqwalreceiver", false);
+ if (walrcv_connect == NULL || walrcv_receive == NULL ||
+ walrcv_disconnect == NULL)
+ elog(ERROR, "libpqwalreceiver didn't initialize correctly");
+
+ /* Mark walreceiver in progress */
+ InitWalRcv();
+
+ /*
+ * If possible, make this process a group leader, so that the postmaster
+ * can signal any child processes too. (walreceiver probably never has
+ * any child processes, but for consistency we make all postmaster child
+ * processes do this.)
+ */
+#ifdef HAVE_SETSID
+ if (setsid() < 0)
+ elog(FATAL, "setsid() failed: %m");
+#endif
+
+ /* Properly accept or ignore signals the postmaster might send us */
+ pqsignal(SIGHUP, WalRcvSigHupHandler); /* set flag to read config file */
+ pqsignal(SIGINT, SIG_IGN);
+ pqsignal(SIGTERM, WalRcvShutdownHandler); /* request shutdown */
+ pqsignal(SIGQUIT, WalRcvQuickDieHandler); /* hard crash time */
+ pqsignal(SIGALRM, SIG_IGN);
+ pqsignal(SIGPIPE, SIG_IGN);
+ pqsignal(SIGUSR1, SIG_IGN);
+ pqsignal(SIGUSR2, SIG_IGN);
+
+ /* Reset some signals that are accepted by postmaster but not here */
+ pqsignal(SIGCHLD, SIG_DFL);
+ pqsignal(SIGTTIN, SIG_DFL);
+ pqsignal(SIGTTOU, SIG_DFL);
+ pqsignal(SIGCONT, SIG_DFL);
+ pqsignal(SIGWINCH, SIG_DFL);
+
+ /* We allow SIGQUIT (quickdie) at all times */
+ sigdelset(&BlockSig, SIGQUIT);
+
+ /*
+ * Create a resource owner to keep track of our resources (not clear that
+ * we need this, but may as well have one).
+ */
+ CurrentResourceOwner = ResourceOwnerCreate(NULL, "Wal Receiver");
+
+ /*
+ * Create a memory context that we will do all our work in. We do this so
+ * that we can reset the context during error recovery and thereby avoid
+ * possible memory leaks.
+ */
+ walrcv_context = AllocSetContextCreate(TopMemoryContext,
+ "Wal Receiver",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+ MemoryContextSwitchTo(walrcv_context);
+
+ /*
+ * If an exception is encountered, processing resumes here.
+ *
+ * This code is heavily based on bgwriter.c, q.v.
+ */
+ if (sigsetjmp(local_sigjmp_buf, 1) != 0)
+ {
+ /* Since not using PG_TRY, must reset error stack by hand */
+ error_context_stack = NULL;
+
+ /* Reset WalRcvImmediateInterruptOK */
+ DisableWalRcvImmediateExit();
+
+ /* Prevent interrupts while cleaning up */
+ HOLD_INTERRUPTS();
+
+ /* Report the error to the server log */
+ EmitErrorReport();
+
+ /* Disconnect any previous connection. */
+ EnableWalRcvImmediateExit();
+ walrcv_disconnect();
+ DisableWalRcvImmediateExit();
+
+ /*
+ * Now return to normal top-level context and clear ErrorContext for
+ * next time.
+ */
+ MemoryContextSwitchTo(walrcv_context);
+ FlushErrorState();
+
+ /* Flush any leaked data in the top-level context */
+ MemoryContextResetAndDeleteChildren(walrcv_context);
+
+ /* Now we can allow interrupts again */
+ RESUME_INTERRUPTS();
+
+ /*
+ * Sleep at least 1 second after any error. A write error is likely
+ * to be repeated, and we don't want to be filling the error logs as
+ * fast as we can.
+ */
+ pg_usleep(1000000L);
+ }
+
+ /* We can now handle ereport(ERROR) */
+ PG_exception_stack = &local_sigjmp_buf;
+
+ /* Unblock signals (they were blocked when the postmaster forked us) */
+ PG_SETMASK(&UnBlockSig);
+
+ /* Fetch connection information from shared memory */
+ SpinLockAcquire(&walrcv->mutex);
+ strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
+ startpoint = walrcv->receivedUpto;
+ SpinLockRelease(&walrcv->mutex);
+
+ /* Establish the connection to the primary for XLOG streaming */
+ EnableWalRcvImmediateExit();
+ walrcv_connect(conninfo, startpoint);
+ DisableWalRcvImmediateExit();
+
+ /* Loop until end-of-streaming or error */
+ for (;;)
+ {
+ XLogRecPtr recptr;
+ char *buf;
+ int len;
+
+ /*
+ * Emergency bailout if postmaster has died. This is to avoid the
+ * necessity for manual cleanup of all postmaster children.
+ */
+ if (!PostmasterIsAlive(true))
+ exit(1);
+
+ /*
+ * Exit walreceiver if we're not in recovery. This should not happen,
+ * but cross-check the status here.
+ */
+ if (!RecoveryInProgress())
+ ereport(FATAL,
+ (errmsg("cannot continue XLOG streaming, recovery has already ended")));
+
+ /* Process any requests or signals received recently */
+ ProcessWalRcvInterrupts();
+
+ if (got_SIGHUP)
+ {
+ got_SIGHUP = false;
+ ProcessConfigFile(PGC_SIGHUP);
+ }
+
+ /* Wait a while for data to arrive */
+ if (walrcv_receive(NAPTIME_PER_CYCLE, &recptr, &buf, &len))
+ {
+ /* Write received WAL records to disk */
+ XLogWalRcvWrite(buf, len, recptr);
+
+ /* Receive any more WAL records we can without sleeping */
+ while(walrcv_receive(0, &recptr, &buf, &len))
+ XLogWalRcvWrite(buf, len, recptr);
+
+ /*
+ * Now that we've written some records, flush them to disk and
+ * let the startup process know about them.
+ */
+ XLogWalRcvFlush();
+ }
+ }
+}
+
+/* Advertise our pid in shared memory, so that startup process can kill us. */
+static void
+InitWalRcv(void)
+{
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalRcvData *walrcv = WalRcv;
+
+ /*
+ * WalRcv should be set up already (if we are a backend, we inherit
+ * this by fork() or EXEC_BACKEND mechanism from the postmaster).
+ */
+ if (walrcv == NULL)
+ elog(PANIC, "walreceiver control data uninitialized");
+
+ /* If we've already been requested to stop, don't start up */
+ SpinLockAcquire(&walrcv->mutex);
+ Assert(walrcv->pid == 0);
+ if (walrcv->walRcvState == WALRCV_STOPPED ||
+ walrcv->walRcvState == WALRCV_STOPPING)
+ {
+ walrcv->walRcvState = WALRCV_STOPPED;
+ SpinLockRelease(&walrcv->mutex);
+ proc_exit(1);
+ }
+ walrcv->pid = MyProcPid;
+ SpinLockRelease(&walrcv->mutex);
+
+ /* Arrange to clean up at walreceiver exit */
+ on_shmem_exit(WalRcvKill, 0);
+}
+
+/*
+ * Clear our pid from shared memory at exit.
+ */
+static void
+WalRcvKill(int code, Datum arg)
+{
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalRcvData *walrcv = WalRcv;
+ bool stopped = false;
+
+ SpinLockAcquire(&walrcv->mutex);
+ if (walrcv->walRcvState == WALRCV_STOPPING ||
+ walrcv->walRcvState == WALRCV_STOPPED)
+ {
+ walrcv->walRcvState = WALRCV_STOPPED;
+ stopped = true;
+ elog(LOG, "walreceiver stopped");
+ }
+ walrcv->pid = 0;
+ SpinLockRelease(&walrcv->mutex);
+
+ walrcv_disconnect();
+
+ /* If requested to stop, tell postmaster to not restart us. */
+ if (stopped)
+ SendPostmasterSignal(PMSIGNAL_SHUTDOWN_WALRECEIVER);
+}
+
+/* SIGHUP: set flag to re-read config file at next convenient time */
+static void
+WalRcvSigHupHandler(SIGNAL_ARGS)
+{
+ got_SIGHUP = true;
+}
+
+/* SIGTERM: set flag for main loop, or shutdown immediately if safe */
+static void
+WalRcvShutdownHandler(SIGNAL_ARGS)
+{
+ got_SIGTERM = true;
+
+ /* Don't joggle the elbow of proc_exit */
+ if (!proc_exit_inprogress && WalRcvImmediateInterruptOK)
+ ProcessWalRcvInterrupts();
+}
+
+/*
+ * WalRcvQuickDieHandler() occurs when signalled SIGQUIT by the postmaster.
+ *
+ * Some backend has bought the farm, so we need to stop what we're doing and
+ * exit.
+ */
+static void
+WalRcvQuickDieHandler(SIGNAL_ARGS)
+{
+ PG_SETMASK(&BlockSig);
+
+ /*
+ * We DO NOT want to run proc_exit() callbacks -- we're here because
+ * shared memory may be corrupted, so we don't want to try to clean up our
+ * transaction. Just nail the windows shut and get out of town. Now that
+ * there's an atexit callback to prevent third-party code from breaking
+ * things by calling exit() directly, we have to reset the callbacks
+ * explicitly to make this work as intended.
+ */
+ on_exit_reset();
+
+ /*
+ * Note we do exit(2) not exit(0). This is to force the postmaster into a
+ * system reset cycle if some idiot DBA sends a manual SIGQUIT to a random
+ * backend. This is necessary precisely because we don't clean up our
+ * shared memory state. (The "dead man switch" mechanism in pmsignal.c
+ * should ensure the postmaster sees this as a crash, too, but no harm
+ * in being doubly sure.)
+ */
+ exit(2);
+}
+
+/*
+ * Write XLOG data to disk.
+ */
+static void
+XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
+{
+ int startoff;
+ int byteswritten;
+
+ while (nbytes > 0)
+ {
+ int segbytes;
+
+ if (recvFile < 0 || !XLByteInSeg(recptr, recvId, recvSeg))
+ {
+ bool use_existent;
+
+ /*
+ * XLOG segment files will be re-read in recovery operation soon,
+ * so we don't need to advise the OS to release any cache page.
+ */
+ if (recvFile >= 0)
+ {
+ /*
+ * fsync() before we switch to next file. We would otherwise
+ * have to reopen this file to fsync it later
+ */
+ XLogWalRcvFlush();
+ if (close(recvFile) != 0)
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not close log file %u, segment %u: %m",
+ recvId, recvSeg)));
+ }
+ recvFile = -1;
+
+ /* Create/use new log file */
+ XLByteToSeg(recptr, recvId, recvSeg);
+ use_existent = true;
+ recvFile = XLogFileInit(recvId, recvSeg,
+ &use_existent, true);
+ recvOff = 0;
+ }
+
+ /* Calculate the start offset of the received logs */
+ startoff = recptr.xrecoff % XLogSegSize;
+
+ if (startoff + nbytes > XLogSegSize)
+ segbytes = XLogSegSize - startoff;
+ else
+ segbytes = nbytes;
+
+ /* Need to seek in the file? */
+ if (recvOff != startoff)
+ {
+ if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0)
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not seek in log file %u, "
+ "segment %u to offset %u: %m",
+ recvId, recvSeg, startoff)));
+ recvOff = startoff;
+ }
+
+ /* OK to write the logs */
+ errno = 0;
+
+ byteswritten = write(recvFile, buf, segbytes);
+ if (byteswritten <= 0)
+ {
+ /* if write didn't set errno, assume no disk space */
+ if (errno == 0)
+ errno = ENOSPC;
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not write to log file %u, segment %u "
+ "at offset %u, length %lu: %m",
+ recvId, recvSeg,
+ recvOff, (unsigned long) segbytes)));
+ }
+
+ /* Update state for write */
+ XLByteAdvance(recptr, byteswritten);
+
+ recvOff += byteswritten;
+ nbytes -= byteswritten;
+ buf += byteswritten;
+
+ LogstreamResult.Write = recptr;
+
+ /*
+ * XXX: Should we signal bgwriter to start a restartpoint
+ * if we've consumed too much xlog since the last one, like
+ * in normal processing? But this is not worth doing unless
+ * a restartpoint can be created independently from a
+ * checkpoint record.
+ */
+ }
+}
+
+/* Flush the log to disk */
+static void
+XLogWalRcvFlush(void)
+{
+ if (XLByteLT(LogstreamResult.Flush, LogstreamResult.Write))
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalRcvData *walrcv = WalRcv;
+ char activitymsg[50];
+
+ issue_xlog_fsync(recvFile, recvId, recvSeg);
+
+ LogstreamResult.Flush = LogstreamResult.Write;
+
+ /* Update shared-memory status */
+ SpinLockAcquire(&walrcv->mutex);
+ walrcv->receivedUpto = LogstreamResult.Flush;
+ SpinLockRelease(&walrcv->mutex);
+
+ /* Report XLOG streaming progress in PS display */
+ snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
+ LogstreamResult.Write.xlogid, LogstreamResult.Write.xrecoff);
+ set_ps_display(activitymsg, false);
+ }
+}