From 32bc08b1d45d898aabdeeed09bcf1c2a407bbbed Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Wed, 20 Jan 2010 09:16:24 +0000 Subject: Rethink the way walreceiver is linked into the backend. Instead than shoving walreceiver as whole into a dynamically loaded module, split the libpq-specific parts of it into dynamically loaded module and keep the rest in the main backend binary. Although Tom fixed the Windows compilation problems with the old walreceiver module already, this is a cleaner division of labour and makes the code more readable. There's also the prospect of adding new transport methods as pluggable modules in the future, which this patch makes easier, though for now the API between libpqwalreceiver and walreceiver process should be considered private. The libpq-specific module is now in src/backend/replication/libpqwalreceiver, and the part linked with postgres binary is in src/backend/replication/walreceiver.c. --- src/Makefile | 10 +- src/backend/bootstrap/bootstrap.c | 17 +- src/backend/replication/Makefile | 4 +- src/backend/replication/README | 34 +- src/backend/replication/libpqwalreceiver/Makefile | 32 + .../libpqwalreceiver/libpqwalreceiver.c | 317 ++++++++ src/backend/replication/walreceiver.c | 566 +++++++++++++++ src/backend/replication/walreceiver/Makefile | 34 - src/backend/replication/walreceiver/walreceiver.c | 797 --------------------- src/backend/replication/walreceiverfuncs.c | 4 +- src/include/replication/walreceiver.h | 14 +- 11 files changed, 972 insertions(+), 857 deletions(-) create mode 100644 src/backend/replication/libpqwalreceiver/Makefile create mode 100644 src/backend/replication/libpqwalreceiver/libpqwalreceiver.c create mode 100644 src/backend/replication/walreceiver.c delete mode 100644 src/backend/replication/walreceiver/Makefile delete mode 100644 src/backend/replication/walreceiver/walreceiver.c (limited to 'src') diff --git a/src/Makefile b/src/Makefile index f4d726e7694..93b2d17a822 100644 --- a/src/Makefile +++ b/src/Makefile @@ -4,7 +4,7 @@ # # Copyright (c) 1994, Regents of the University of California # -# $PostgreSQL: pgsql/src/Makefile,v 1.49 2010/01/15 17:01:06 heikki Exp $ +# $PostgreSQL: pgsql/src/Makefile,v 1.50 2010/01/20 09:16:23 heikki Exp $ # #------------------------------------------------------------------------- @@ -21,7 +21,7 @@ all install installdirs uninstall distprep: $(MAKE) -C backend/snowball $@ $(MAKE) -C include $@ $(MAKE) -C interfaces $@ - $(MAKE) -C backend/replication/walreceiver $@ + $(MAKE) -C backend/replication/libpqwalreceiver $@ $(MAKE) -C bin $@ $(MAKE) -C pl $@ $(MAKE) -C makefiles $@ @@ -52,7 +52,7 @@ clean: $(MAKE) -C backend/snowball $@ $(MAKE) -C include $@ $(MAKE) -C interfaces $@ - $(MAKE) -C backend/replication/walreceiver $@ + $(MAKE) -C backend/replication/libpqwalreceiver $@ $(MAKE) -C bin $@ $(MAKE) -C pl $@ $(MAKE) -C makefiles $@ @@ -67,7 +67,7 @@ distclean maintainer-clean: $(MAKE) -C backend/snowball $@ $(MAKE) -C include $@ $(MAKE) -C interfaces $@ - $(MAKE) -C backend/replication/walreceiver $@ + $(MAKE) -C backend/replication/libpqwalreceiver $@ $(MAKE) -C bin $@ $(MAKE) -C pl $@ $(MAKE) -C makefiles $@ @@ -82,7 +82,7 @@ coverage: $(MAKE) -C backend/utils/mb/conversion_procs $@ $(MAKE) -C backend/snowball $@ $(MAKE) -C interfaces $@ - $(MAKE) -C backend/replication/walreceiver $@ + $(MAKE) -C backend/replication/libpqwalreceiver $@ $(MAKE) -C bin $@ $(MAKE) -C pl $@ diff --git a/src/backend/bootstrap/bootstrap.c b/src/backend/bootstrap/bootstrap.c index 84dd6638efc..39658966085 100644 --- a/src/backend/bootstrap/bootstrap.c +++ b/src/backend/bootstrap/bootstrap.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/bootstrap/bootstrap.c,v 1.256 2010/01/15 09:19:00 heikki Exp $ + * $PostgreSQL: pgsql/src/backend/bootstrap/bootstrap.c,v 1.257 2010/01/20 09:16:23 heikki Exp $ * *------------------------------------------------------------------------- */ @@ -425,20 +425,7 @@ AuxiliaryProcessMain(int argc, char *argv[]) case WalReceiverProcess: /* don't set signals, walreceiver has its own agenda */ - { - PGFunction WalReceiverMain; - - /* - * Walreceiver is not linked directly into the server - * binary because we would then need to link the server - * with libpq. It's compiled as a dynamically loaded module - * to avoid that. - */ - WalReceiverMain = load_external_function("walreceiver", - "WalReceiverMain", - true, NULL); - WalReceiverMain(NULL); - } + WalReceiverMain(); proc_exit(1); /* should never return */ default: diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile index 7903c1ac5e4..64a966b1cc7 100644 --- a/src/backend/replication/Makefile +++ b/src/backend/replication/Makefile @@ -4,7 +4,7 @@ # Makefile for src/backend/replication # # IDENTIFICATION -# $PostgreSQL: pgsql/src/backend/replication/Makefile,v 1.1 2010/01/15 09:19:03 heikki Exp $ +# $PostgreSQL: pgsql/src/backend/replication/Makefile,v 1.2 2010/01/20 09:16:24 heikki Exp $ # #------------------------------------------------------------------------- @@ -12,6 +12,6 @@ subdir = src/backend/replication top_builddir = ../../.. include $(top_builddir)/src/Makefile.global -OBJS = walsender.o walreceiverfuncs.o +OBJS = walsender.o walreceiverfuncs.o walreceiver.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/replication/README b/src/backend/replication/README index 0f40dc79e90..8b15dea58eb 100644 --- a/src/backend/replication/README +++ b/src/backend/replication/README @@ -1,4 +1,36 @@ -$PostgreSQL: pgsql/src/backend/replication/README,v 1.1 2010/01/15 09:19:03 heikki Exp $ +$PostgreSQL: pgsql/src/backend/replication/README,v 1.2 2010/01/20 09:16:24 heikki Exp $ + +Walreceiver - libpqwalreceiver API +---------------------------------- + +The transport-specific part of walreceiver, responsible for connecting to +the primary server and receiving WAL files, is loaded dynamically to avoid +having to link the main server binary with libpq. The dynamically loaded +module is in libpqwalreceiver subdirectory. + +The dynamically loaded module implements three functions: + + +bool walrcv_connect(char *conninfo, XLogRecPtr startpoint) + +Establish connection to the primary, and starts streaming from 'startpoint'. +Returns true on success. + + +bool walrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer, int *len) + +Retrieve any WAL record available through the connection, blocking for +maximum of 'timeout' ms. + + +void walrcv_disconnect(void); + +Disconnect. + + +This API should be considered internal at the moment, but we could open it +up for 3rd party replacements of libpqwalreceiver in the future, allowing +pluggable methods for receiveing WAL. Walreceiver IPC --------------- diff --git a/src/backend/replication/libpqwalreceiver/Makefile b/src/backend/replication/libpqwalreceiver/Makefile new file mode 100644 index 00000000000..df28b90c4ca --- /dev/null +++ b/src/backend/replication/libpqwalreceiver/Makefile @@ -0,0 +1,32 @@ +#------------------------------------------------------------------------- +# +# Makefile-- +# Makefile for src/backend/replication/libpqwalreceiver +# +# IDENTIFICATION +# $PostgreSQL: pgsql/src/backend/replication/libpqwalreceiver/Makefile,v 1.1 2010/01/20 09:16:24 heikki Exp $ +# +#------------------------------------------------------------------------- + +subdir = src/backend/postmaster/libpqwalreceiver +top_builddir = ../../../.. +include $(top_builddir)/src/Makefile.global + +override CPPFLAGS := -I$(srcdir) -I$(libpq_srcdir) $(CPPFLAGS) + +OBJS = libpqwalreceiver.o +SHLIB_LINK = $(libpq) +NAME = libpqwalreceiver + +all: submake-libpq all-shared-lib + +include $(top_srcdir)/src/Makefile.shlib + +install: all installdirs install-lib + +installdirs: installdirs-lib + +uninstall: uninstall-lib + +clean distclean maintainer-clean: clean-lib + rm -f $(OBJS) diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c new file mode 100644 index 00000000000..54b86fd1350 --- /dev/null +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -0,0 +1,317 @@ +/*------------------------------------------------------------------------- + * + * libpqwalreceiver.c + * + * This file contains the libpq-specific parts of walreceiver. It's + * loaded as a dynamic module to avoid linking the main server binary with + * libpq. + * + * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group + * + * + * IDENTIFICATION + * $PostgreSQL: pgsql/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c,v 1.1 2010/01/20 09:16:24 heikki Exp $ + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include +#include + +#include "libpq-fe.h" +#include "access/xlog.h" +#include "miscadmin.h" +#include "replication/walreceiver.h" +#include "utils/builtins.h" + +#ifdef HAVE_POLL_H +#include +#endif +#ifdef HAVE_SYS_POLL_H +#include +#endif +#ifdef HAVE_SYS_SELECT_H +#include +#endif + +PG_MODULE_MAGIC; + +void _PG_init(void); + +/* Current connection to the primary, if any */ +static PGconn *streamConn = NULL; +static bool justconnected = false; + +/* Buffer for currently read records */ +static char *recvBuf = NULL; + +/* Prototypes for interface functions */ +static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint); +static bool libpqrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer, + int *len); +static void libpqrcv_disconnect(void); + +/* Prototypes for private functions */ +static bool libpq_select(int timeout_ms); + +/* + * Module load callback + */ +void +_PG_init(void) +{ + /* Tell walreceiver how to reach us */ + if (walrcv_connect != NULL || walrcv_receive != NULL || walrcv_disconnect) + elog(ERROR, "libpqwalreceiver already loaded"); + walrcv_connect = libpqrcv_connect; + walrcv_receive = libpqrcv_receive; + walrcv_disconnect = libpqrcv_disconnect; +} + +/* + * Establish the connection to the primary server for XLOG streaming + */ +static bool +libpqrcv_connect(char *conninfo, XLogRecPtr startpoint) +{ + char conninfo_repl[MAXCONNINFO + 14]; + char *primary_sysid; + char standby_sysid[32]; + TimeLineID primary_tli; + TimeLineID standby_tli; + PGresult *res; + char cmd[64]; + + Assert(startpoint.xlogid != 0 || startpoint.xrecoff != 0); + + /* Connect */ + snprintf(conninfo_repl, sizeof(conninfo_repl), "%s replication=true", conninfo); + + streamConn = PQconnectdb(conninfo_repl); + if (PQstatus(streamConn) != CONNECTION_OK) + ereport(ERROR, + (errmsg("could not connect to the primary server : %s", + PQerrorMessage(streamConn)))); + + /* + * Get the system identifier and timeline ID as a DataRow message + * from the primary server. + */ + res = PQexec(streamConn, "IDENTIFY_SYSTEM"); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + PQclear(res); + ereport(ERROR, + (errmsg("could not receive the SYSID and timeline ID from " + "the primary server: %s", + PQerrorMessage(streamConn)))); + } + if (PQnfields(res) != 2 || PQntuples(res) != 1) + { + int ntuples = PQntuples(res); + int nfields = PQnfields(res); + PQclear(res); + ereport(ERROR, + (errmsg("invalid response from primary server"), + errdetail("expected 1 tuple with 2 fields, got %d tuples with %d fields", + ntuples, nfields))); + } + primary_sysid = PQgetvalue(res, 0, 0); + primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0); + + /* + * Confirm that the system identifier of the primary is the same + * as ours. + */ + snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT, + GetSystemIdentifier()); + if (strcmp(primary_sysid, standby_sysid) != 0) + { + PQclear(res); + ereport(ERROR, + (errmsg("system differs between the primary and standby"), + errdetail("the primary SYSID is %s, standby SYSID is %s", + primary_sysid, standby_sysid))); + } + + /* + * Confirm that the current timeline of the primary is the same + * as the recovery target timeline. + */ + standby_tli = GetRecoveryTargetTLI(); + PQclear(res); + if (primary_tli != standby_tli) + ereport(ERROR, + (errmsg("timeline %u of the primary does not match recovery target timeline %u", + primary_tli, standby_tli))); + ThisTimeLineID = primary_tli; + + /* Start streaming from the point requested by startup process */ + snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X", + startpoint.xlogid, startpoint.xrecoff); + res = PQexec(streamConn, cmd); + if (PQresultStatus(res) != PGRES_COPY_OUT) + ereport(ERROR, + (errmsg("could not start XLOG streaming: %s", + PQerrorMessage(streamConn)))); + PQclear(res); + + justconnected = true; + + return true; +} + +/* + * Wait until we can read WAL stream, or timeout. + * + * Returns true if data has become available for reading, false if timed out + * or interrupted by signal. + * + * This is based on pqSocketCheck. + */ +static bool +libpq_select(int timeout_ms) +{ + int ret; + + Assert(streamConn != NULL); + if (PQsocket(streamConn) < 0) + ereport(ERROR, + (errcode_for_socket_access(), + errmsg("socket not open"))); + + /* We use poll(2) if available, otherwise select(2) */ + { +#ifdef HAVE_POLL + struct pollfd input_fd; + + input_fd.fd = PQsocket(streamConn); + input_fd.events = POLLIN | POLLERR; + input_fd.revents = 0; + + ret = poll(&input_fd, 1, timeout_ms); +#else /* !HAVE_POLL */ + + fd_set input_mask; + struct timeval timeout; + struct timeval *ptr_timeout; + + FD_ZERO(&input_mask); + FD_SET(PQsocket(streamConn), &input_mask); + + if (timeout_ms < 0) + ptr_timeout = NULL; + else + { + timeout.tv_sec = timeout_ms / 1000; + timeout.tv_usec = (timeout_ms % 1000) * 1000; + ptr_timeout = &timeout; + } + + ret = select(PQsocket(streamConn) + 1, &input_mask, + NULL, NULL, ptr_timeout); +#endif /* HAVE_POLL */ + } + + if (ret == 0 || (ret < 0 && errno == EINTR)) + return false; + if (ret < 0) + ereport(ERROR, + (errcode_for_socket_access(), + errmsg("select() failed: %m"))); + return true; +} + +/* + * Disconnect connection to primary, if any. + */ +static void +libpqrcv_disconnect(void) +{ + PQfinish(streamConn); + streamConn = NULL; + justconnected = false; +} + +/* + * Receive any WAL records available from XLOG stream, blocking for + * maximum of 'timeout' ms. + * + * Returns: + * + * True if data was received. *recptr, *buffer and *len are set to + * the WAL location of the received data, buffer holding it, and length, + * respectively. + * + * False if no data was available within timeout, or wait was interrupted + * by signal. + * + * The buffer returned is only valid until the next call of this function or + * libpq_connect/disconnect. + * + * ereports on error. + */ +static bool +libpqrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer, int *len) +{ + int rawlen; + + if (recvBuf != NULL) + PQfreemem(recvBuf); + recvBuf = NULL; + + /* + * If the caller requested to block, wait for data to arrive. But if + * this is the first call after connecting, don't wait, because + * there might already be some data in libpq buffer that we haven't + * returned to caller. + */ + if (timeout > 0 && !justconnected) + { + if (!libpq_select(timeout)) + return false; + + if (PQconsumeInput(streamConn) == 0) + ereport(ERROR, + (errmsg("could not read xlog records: %s", + PQerrorMessage(streamConn)))); + } + justconnected = false; + + /* Receive CopyData message */ + rawlen = PQgetCopyData(streamConn, &recvBuf, 1); + if (rawlen == 0) /* no records available yet, then return */ + return false; + if (rawlen == -1) /* end-of-streaming or error */ + { + PGresult *res; + + res = PQgetResult(streamConn); + if (PQresultStatus(res) == PGRES_COMMAND_OK) + { + PQclear(res); + ereport(ERROR, + (errmsg("replication terminated by primary server"))); + } + PQclear(res); + ereport(ERROR, + (errmsg("could not read xlog records: %s", + PQerrorMessage(streamConn)))); + } + if (rawlen < -1) + ereport(ERROR, + (errmsg("could not read xlog records: %s", + PQerrorMessage(streamConn)))); + + if (rawlen < sizeof(XLogRecPtr)) + ereport(ERROR, + (errmsg("invalid WAL message received from primary"))); + + /* Return received WAL records to caller */ + *recptr = *((XLogRecPtr *) recvBuf); + *buffer = recvBuf + sizeof(XLogRecPtr); + *len = rawlen - sizeof(XLogRecPtr); + + return true; +} diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c new file mode 100644 index 00000000000..f805e673e11 --- /dev/null +++ b/src/backend/replication/walreceiver.c @@ -0,0 +1,566 @@ +/*------------------------------------------------------------------------- + * + * walreceiver.c + * + * The WAL receiver process (walreceiver) is new as of Postgres 8.5. It + * is the process in the standby server that takes charge of receiving + * XLOG records from a primary server during streaming replication. + * + * When the startup process determines that it's time to start streaming, + * it instructs postmaster to start walreceiver. Walreceiver first connects + * connects to the primary server (it will be served by a walsender process + * in the primary server), and then keeps receiving XLOG records and + * writing them to the disk as long as the connection is alive. As XLOG + * records are received and flushed to disk, it updates the + * WalRcv->receivedUpTo variable in shared memory, to inform the startup + * process of how far it can proceed with XLOG replay. + * + * Normal termination is by SIGTERM, which instructs the walreceiver to + * exit(0). Emergency termination is by SIGQUIT; like any postmaster child + * process, the walreceiver will simply abort and exit on SIGQUIT. A close + * of the connection and a FATAL error are treated not as a crash but as + * normal operation. + * + * This file contains the server-facing parts of walreceiver. The libpq- + * specific parts are in the libpqwalreceiver module. It's loaded + * dynamically to avoid linking the server with libpq. + * + * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group + * + * + * IDENTIFICATION + * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.1 2010/01/20 09:16:24 heikki Exp $ + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include +#include + +#include "access/xlog_internal.h" +#include "libpq/pqsignal.h" +#include "miscadmin.h" +#include "replication/walreceiver.h" +#include "storage/ipc.h" +#include "storage/pmsignal.h" +#include "utils/builtins.h" +#include "utils/guc.h" +#include "utils/memutils.h" +#include "utils/ps_status.h" +#include "utils/resowner.h" + +/* libpqreceiver hooks to these when loaded */ +walrcv_connect_type walrcv_connect = NULL; +walrcv_receive_type walrcv_receive = NULL; +walrcv_disconnect_type walrcv_disconnect = NULL; + +#define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */ + +/* + * These variables are used similarly to openLogFile/Id/Seg/Off, + * but for walreceiver to write the XLOG. + */ +static int recvFile = -1; +static uint32 recvId = 0; +static uint32 recvSeg = 0; +static uint32 recvOff = 0; + +/* + * Flags set by interrupt handlers of walreceiver for later service in the + * main loop. + */ +static volatile sig_atomic_t got_SIGHUP = false; +static volatile sig_atomic_t got_SIGTERM = false; + +static void ProcessWalRcvInterrupts(void); +static void EnableWalRcvImmediateExit(void); +static void DisableWalRcvImmediateExit(void); + +/* + * About SIGTERM handling: + * + * We can't just exit(1) within SIGTERM signal handler, because the signal + * might arrive in the middle of some critical operation, like while we're + * holding a spinlock. We also can't just set a flag in signal handler and + * check it in the main loop, because we perform some blocking libpq + * operations like PQexec(), which can take a long time to finish. + * + * We use a combined approach: When WalRcvImmediateInterruptOK is true, it's + * safe for the signal handler to elog(FATAL) immediately. Otherwise it just + * sets got_SIGTERM flag, which is checked in the main loop when convenient. + * + * This is very much like what regular backends do with ImmediateInterruptOK, + * ProcessInterrupts() etc. + */ +static volatile bool WalRcvImmediateInterruptOK = false; + +static void +ProcessWalRcvInterrupts(void) +{ + /* + * Although walreceiver interrupt handling doesn't use the same scheme + * as regular backends, call CHECK_FOR_INTERRUPTS() to make sure we + * receive any incoming signals on Win32. + */ + CHECK_FOR_INTERRUPTS(); + + if (got_SIGTERM) + { + WalRcvImmediateInterruptOK = false; + ereport(FATAL, + (errcode(ERRCODE_ADMIN_SHUTDOWN), + errmsg("terminating walreceiver process due to administrator command"))); + } +} + +static void +EnableWalRcvImmediateExit() +{ + WalRcvImmediateInterruptOK = true; + ProcessWalRcvInterrupts(); +} + +static void +DisableWalRcvImmediateExit() +{ + WalRcvImmediateInterruptOK = false; + ProcessWalRcvInterrupts(); +} + +/* Signal handlers */ +static void WalRcvSigHupHandler(SIGNAL_ARGS); +static void WalRcvShutdownHandler(SIGNAL_ARGS); +static void WalRcvQuickDieHandler(SIGNAL_ARGS); + +/* Prototypes for private functions */ +static void InitWalRcv(void); +static void WalRcvKill(int code, Datum arg); +static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr); +static void XLogWalRcvFlush(void); + +/* + * LogstreamResult indicates the byte positions that we have already + * written/fsynced. + */ +static struct +{ + XLogRecPtr Write; /* last byte + 1 written out in the standby */ + XLogRecPtr Flush; /* last byte + 1 flushed in the standby */ +} LogstreamResult; + +/* Main entry point for walreceiver process */ +void +WalReceiverMain(void) +{ + sigjmp_buf local_sigjmp_buf; + MemoryContext walrcv_context; + char conninfo[MAXCONNINFO]; + XLogRecPtr startpoint; + /* use volatile pointer to prevent code rearrangement */ + volatile WalRcvData *walrcv = WalRcv; + + /* Load the libpq-specific functions */ + load_file("libpqwalreceiver", false); + if (walrcv_connect == NULL || walrcv_receive == NULL || + walrcv_disconnect == NULL) + elog(ERROR, "libpqwalreceiver didn't initialize correctly"); + + /* Mark walreceiver in progress */ + InitWalRcv(); + + /* + * If possible, make this process a group leader, so that the postmaster + * can signal any child processes too. (walreceiver probably never has + * any child processes, but for consistency we make all postmaster child + * processes do this.) + */ +#ifdef HAVE_SETSID + if (setsid() < 0) + elog(FATAL, "setsid() failed: %m"); +#endif + + /* Properly accept or ignore signals the postmaster might send us */ + pqsignal(SIGHUP, WalRcvSigHupHandler); /* set flag to read config file */ + pqsignal(SIGINT, SIG_IGN); + pqsignal(SIGTERM, WalRcvShutdownHandler); /* request shutdown */ + pqsignal(SIGQUIT, WalRcvQuickDieHandler); /* hard crash time */ + pqsignal(SIGALRM, SIG_IGN); + pqsignal(SIGPIPE, SIG_IGN); + pqsignal(SIGUSR1, SIG_IGN); + pqsignal(SIGUSR2, SIG_IGN); + + /* Reset some signals that are accepted by postmaster but not here */ + pqsignal(SIGCHLD, SIG_DFL); + pqsignal(SIGTTIN, SIG_DFL); + pqsignal(SIGTTOU, SIG_DFL); + pqsignal(SIGCONT, SIG_DFL); + pqsignal(SIGWINCH, SIG_DFL); + + /* We allow SIGQUIT (quickdie) at all times */ + sigdelset(&BlockSig, SIGQUIT); + + /* + * Create a resource owner to keep track of our resources (not clear that + * we need this, but may as well have one). + */ + CurrentResourceOwner = ResourceOwnerCreate(NULL, "Wal Receiver"); + + /* + * Create a memory context that we will do all our work in. We do this so + * that we can reset the context during error recovery and thereby avoid + * possible memory leaks. + */ + walrcv_context = AllocSetContextCreate(TopMemoryContext, + "Wal Receiver", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + MemoryContextSwitchTo(walrcv_context); + + /* + * If an exception is encountered, processing resumes here. + * + * This code is heavily based on bgwriter.c, q.v. + */ + if (sigsetjmp(local_sigjmp_buf, 1) != 0) + { + /* Since not using PG_TRY, must reset error stack by hand */ + error_context_stack = NULL; + + /* Reset WalRcvImmediateInterruptOK */ + DisableWalRcvImmediateExit(); + + /* Prevent interrupts while cleaning up */ + HOLD_INTERRUPTS(); + + /* Report the error to the server log */ + EmitErrorReport(); + + /* Disconnect any previous connection. */ + EnableWalRcvImmediateExit(); + walrcv_disconnect(); + DisableWalRcvImmediateExit(); + + /* + * Now return to normal top-level context and clear ErrorContext for + * next time. + */ + MemoryContextSwitchTo(walrcv_context); + FlushErrorState(); + + /* Flush any leaked data in the top-level context */ + MemoryContextResetAndDeleteChildren(walrcv_context); + + /* Now we can allow interrupts again */ + RESUME_INTERRUPTS(); + + /* + * Sleep at least 1 second after any error. A write error is likely + * to be repeated, and we don't want to be filling the error logs as + * fast as we can. + */ + pg_usleep(1000000L); + } + + /* We can now handle ereport(ERROR) */ + PG_exception_stack = &local_sigjmp_buf; + + /* Unblock signals (they were blocked when the postmaster forked us) */ + PG_SETMASK(&UnBlockSig); + + /* Fetch connection information from shared memory */ + SpinLockAcquire(&walrcv->mutex); + strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO); + startpoint = walrcv->receivedUpto; + SpinLockRelease(&walrcv->mutex); + + /* Establish the connection to the primary for XLOG streaming */ + EnableWalRcvImmediateExit(); + walrcv_connect(conninfo, startpoint); + DisableWalRcvImmediateExit(); + + /* Loop until end-of-streaming or error */ + for (;;) + { + XLogRecPtr recptr; + char *buf; + int len; + + /* + * Emergency bailout if postmaster has died. This is to avoid the + * necessity for manual cleanup of all postmaster children. + */ + if (!PostmasterIsAlive(true)) + exit(1); + + /* + * Exit walreceiver if we're not in recovery. This should not happen, + * but cross-check the status here. + */ + if (!RecoveryInProgress()) + ereport(FATAL, + (errmsg("cannot continue XLOG streaming, recovery has already ended"))); + + /* Process any requests or signals received recently */ + ProcessWalRcvInterrupts(); + + if (got_SIGHUP) + { + got_SIGHUP = false; + ProcessConfigFile(PGC_SIGHUP); + } + + /* Wait a while for data to arrive */ + if (walrcv_receive(NAPTIME_PER_CYCLE, &recptr, &buf, &len)) + { + /* Write received WAL records to disk */ + XLogWalRcvWrite(buf, len, recptr); + + /* Receive any more WAL records we can without sleeping */ + while(walrcv_receive(0, &recptr, &buf, &len)) + XLogWalRcvWrite(buf, len, recptr); + + /* + * Now that we've written some records, flush them to disk and + * let the startup process know about them. + */ + XLogWalRcvFlush(); + } + } +} + +/* Advertise our pid in shared memory, so that startup process can kill us. */ +static void +InitWalRcv(void) +{ + /* use volatile pointer to prevent code rearrangement */ + volatile WalRcvData *walrcv = WalRcv; + + /* + * WalRcv should be set up already (if we are a backend, we inherit + * this by fork() or EXEC_BACKEND mechanism from the postmaster). + */ + if (walrcv == NULL) + elog(PANIC, "walreceiver control data uninitialized"); + + /* If we've already been requested to stop, don't start up */ + SpinLockAcquire(&walrcv->mutex); + Assert(walrcv->pid == 0); + if (walrcv->walRcvState == WALRCV_STOPPED || + walrcv->walRcvState == WALRCV_STOPPING) + { + walrcv->walRcvState = WALRCV_STOPPED; + SpinLockRelease(&walrcv->mutex); + proc_exit(1); + } + walrcv->pid = MyProcPid; + SpinLockRelease(&walrcv->mutex); + + /* Arrange to clean up at walreceiver exit */ + on_shmem_exit(WalRcvKill, 0); +} + +/* + * Clear our pid from shared memory at exit. + */ +static void +WalRcvKill(int code, Datum arg) +{ + /* use volatile pointer to prevent code rearrangement */ + volatile WalRcvData *walrcv = WalRcv; + bool stopped = false; + + SpinLockAcquire(&walrcv->mutex); + if (walrcv->walRcvState == WALRCV_STOPPING || + walrcv->walRcvState == WALRCV_STOPPED) + { + walrcv->walRcvState = WALRCV_STOPPED; + stopped = true; + elog(LOG, "walreceiver stopped"); + } + walrcv->pid = 0; + SpinLockRelease(&walrcv->mutex); + + walrcv_disconnect(); + + /* If requested to stop, tell postmaster to not restart us. */ + if (stopped) + SendPostmasterSignal(PMSIGNAL_SHUTDOWN_WALRECEIVER); +} + +/* SIGHUP: set flag to re-read config file at next convenient time */ +static void +WalRcvSigHupHandler(SIGNAL_ARGS) +{ + got_SIGHUP = true; +} + +/* SIGTERM: set flag for main loop, or shutdown immediately if safe */ +static void +WalRcvShutdownHandler(SIGNAL_ARGS) +{ + got_SIGTERM = true; + + /* Don't joggle the elbow of proc_exit */ + if (!proc_exit_inprogress && WalRcvImmediateInterruptOK) + ProcessWalRcvInterrupts(); +} + +/* + * WalRcvQuickDieHandler() occurs when signalled SIGQUIT by the postmaster. + * + * Some backend has bought the farm, so we need to stop what we're doing and + * exit. + */ +static void +WalRcvQuickDieHandler(SIGNAL_ARGS) +{ + PG_SETMASK(&BlockSig); + + /* + * We DO NOT want to run proc_exit() callbacks -- we're here because + * shared memory may be corrupted, so we don't want to try to clean up our + * transaction. Just nail the windows shut and get out of town. Now that + * there's an atexit callback to prevent third-party code from breaking + * things by calling exit() directly, we have to reset the callbacks + * explicitly to make this work as intended. + */ + on_exit_reset(); + + /* + * Note we do exit(2) not exit(0). This is to force the postmaster into a + * system reset cycle if some idiot DBA sends a manual SIGQUIT to a random + * backend. This is necessary precisely because we don't clean up our + * shared memory state. (The "dead man switch" mechanism in pmsignal.c + * should ensure the postmaster sees this as a crash, too, but no harm + * in being doubly sure.) + */ + exit(2); +} + +/* + * Write XLOG data to disk. + */ +static void +XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) +{ + int startoff; + int byteswritten; + + while (nbytes > 0) + { + int segbytes; + + if (recvFile < 0 || !XLByteInSeg(recptr, recvId, recvSeg)) + { + bool use_existent; + + /* + * XLOG segment files will be re-read in recovery operation soon, + * so we don't need to advise the OS to release any cache page. + */ + if (recvFile >= 0) + { + /* + * fsync() before we switch to next file. We would otherwise + * have to reopen this file to fsync it later + */ + XLogWalRcvFlush(); + if (close(recvFile) != 0) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not close log file %u, segment %u: %m", + recvId, recvSeg))); + } + recvFile = -1; + + /* Create/use new log file */ + XLByteToSeg(recptr, recvId, recvSeg); + use_existent = true; + recvFile = XLogFileInit(recvId, recvSeg, + &use_existent, true); + recvOff = 0; + } + + /* Calculate the start offset of the received logs */ + startoff = recptr.xrecoff % XLogSegSize; + + if (startoff + nbytes > XLogSegSize) + segbytes = XLogSegSize - startoff; + else + segbytes = nbytes; + + /* Need to seek in the file? */ + if (recvOff != startoff) + { + if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not seek in log file %u, " + "segment %u to offset %u: %m", + recvId, recvSeg, startoff))); + recvOff = startoff; + } + + /* OK to write the logs */ + errno = 0; + + byteswritten = write(recvFile, buf, segbytes); + if (byteswritten <= 0) + { + /* if write didn't set errno, assume no disk space */ + if (errno == 0) + errno = ENOSPC; + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not write to log file %u, segment %u " + "at offset %u, length %lu: %m", + recvId, recvSeg, + recvOff, (unsigned long) segbytes))); + } + + /* Update state for write */ + XLByteAdvance(recptr, byteswritten); + + recvOff += byteswritten; + nbytes -= byteswritten; + buf += byteswritten; + + LogstreamResult.Write = recptr; + + /* + * XXX: Should we signal bgwriter to start a restartpoint + * if we've consumed too much xlog since the last one, like + * in normal processing? But this is not worth doing unless + * a restartpoint can be created independently from a + * checkpoint record. + */ + } +} + +/* Flush the log to disk */ +static void +XLogWalRcvFlush(void) +{ + if (XLByteLT(LogstreamResult.Flush, LogstreamResult.Write)) + { + /* use volatile pointer to prevent code rearrangement */ + volatile WalRcvData *walrcv = WalRcv; + char activitymsg[50]; + + issue_xlog_fsync(recvFile, recvId, recvSeg); + + LogstreamResult.Flush = LogstreamResult.Write; + + /* Update shared-memory status */ + SpinLockAcquire(&walrcv->mutex); + walrcv->receivedUpto = LogstreamResult.Flush; + SpinLockRelease(&walrcv->mutex); + + /* Report XLOG streaming progress in PS display */ + snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X", + LogstreamResult.Write.xlogid, LogstreamResult.Write.xrecoff); + set_ps_display(activitymsg, false); + } +} diff --git a/src/backend/replication/walreceiver/Makefile b/src/backend/replication/walreceiver/Makefile deleted file mode 100644 index 3376ba6ec87..00000000000 --- a/src/backend/replication/walreceiver/Makefile +++ /dev/null @@ -1,34 +0,0 @@ -#------------------------------------------------------------------------- -# -# Makefile-- -# Makefile for src/backend/replication/walreceiver -# -# IDENTIFICATION -# $PostgreSQL: pgsql/src/backend/replication/walreceiver/Makefile,v 1.4 2010/01/15 21:06:26 tgl Exp $ -# -#------------------------------------------------------------------------- - -subdir = src/backend/replication/walreceiver -top_builddir = ../../../.. -include $(top_builddir)/src/Makefile.global - -override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS) - -OBJS = walreceiver.o - -SHLIB_LINK = $(libpq) - -NAME := walreceiver - -all: submake-libpq all-shared-lib - -include $(top_srcdir)/src/Makefile.shlib - -install: all installdirs install-lib - -installdirs: installdirs-lib - -uninstall: uninstall-lib - -clean distclean maintainer-clean: clean-lib - rm -f $(OBJS) diff --git a/src/backend/replication/walreceiver/walreceiver.c b/src/backend/replication/walreceiver/walreceiver.c deleted file mode 100644 index 65b1dfe1e6b..00000000000 --- a/src/backend/replication/walreceiver/walreceiver.c +++ /dev/null @@ -1,797 +0,0 @@ -/*------------------------------------------------------------------------- - * - * walreceiver.c - * - * The WAL receiver process (walreceiver) is new as of Postgres 8.5. It - * is the process in the standby server that takes charge of receiving - * XLOG records from a primary server during streaming replication. - * - * When the startup process determines that it's time to start streaming, - * it instructs postmaster to start walreceiver. Walreceiver first connects - * connects to the primary server (it will be served by a walsender process - * in the primary server), and then keeps receiving XLOG records and - * writing them to the disk as long as the connection is alive. As XLOG - * records are received and flushed to disk, it updates the - * WalRcv->receivedUpTo variable in shared memory, to inform the startup - * process of how far it can proceed with XLOG replay. - * - * Normal termination is by SIGTERM, which instructs the walreceiver to - * exit(0). Emergency termination is by SIGQUIT; like any postmaster child - * process, the walreceiver will simply abort and exit on SIGQUIT. A close - * of the connection and a FATAL error are treated not as a crash but as - * normal operation. - * - * Walreceiver is a postmaster child process like others, but it's compiled - * as a dynamic module to avoid linking libpq with the main server binary. - * - * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group - * - * - * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/replication/walreceiver/walreceiver.c,v 1.2 2010/01/16 01:55:28 momjian Exp $ - * - *------------------------------------------------------------------------- - */ -#include "postgres.h" - -#include -#include - -#include "access/xlog_internal.h" -#include "libpq-fe.h" -#include "libpq/pqsignal.h" -#include "miscadmin.h" -#include "replication/walreceiver.h" -#include "storage/ipc.h" -#include "storage/pmsignal.h" -#include "utils/builtins.h" -#include "utils/guc.h" -#include "utils/memutils.h" -#include "utils/ps_status.h" -#include "utils/resowner.h" - -#ifdef HAVE_POLL_H -#include -#endif -#ifdef HAVE_SYS_POLL_H -#include -#endif -#ifdef HAVE_SYS_SELECT_H -#include -#endif - -PG_MODULE_MAGIC; - -PG_FUNCTION_INFO_V1(WalReceiverMain); -Datum WalReceiverMain(PG_FUNCTION_ARGS); - -/* streamConn is a PGconn object of a connection to walsender from walreceiver */ -static PGconn *streamConn = NULL; - -#define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */ - -/* - * These variables are used similarly to openLogFile/Id/Seg/Off, - * but for walreceiver to write the XLOG. - */ -static int recvFile = -1; -static uint32 recvId = 0; -static uint32 recvSeg = 0; -static uint32 recvOff = 0; - -/* Buffer for currently read records */ -static char *recvBuf = NULL; - -/* Flags set by interrupt handlers of walreceiver for later service in the main loop */ -static volatile sig_atomic_t got_SIGHUP = false; -static volatile sig_atomic_t got_SIGTERM = false; - -static void ProcessWalRcvInterrupts(void); -static void EnableImmediateExit(void); -static void DisableImmediateExit(void); - -/* - * About SIGTERM handling: - * - * We can't just exit(1) within SIGTERM signal handler, because the signal - * might arrive in the middle of some critical operation, like while we're - * holding a spinlock. We also can't just set a flag in signal handler and - * check it in the main loop, because we perform some blocking libpq - * operations like PQexec(), which can take a long time to finish. - * - * We use a combined approach: When WalRcvImmediateInterruptOK is true, it's - * safe for the signal handler to elog(FATAL) immediately. Otherwise it just - * sets got_SIGTERM flag, which is checked in the main loop when convenient. - * - * This is very much like what regular backends do with ImmediateInterruptOK, - * ProcessInterrupts() etc. - */ -static volatile bool WalRcvImmediateInterruptOK = false; - -static void -ProcessWalRcvInterrupts(void) -{ - /* - * Although walreceiver interrupt handling doesn't use the same scheme - * as regular backends, call CHECK_FOR_INTERRUPTS() to make sure we - * receive any incoming signals on Win32. - */ - CHECK_FOR_INTERRUPTS(); - - if (got_SIGTERM) - { - WalRcvImmediateInterruptOK = false; - ereport(FATAL, - (errcode(ERRCODE_ADMIN_SHUTDOWN), - errmsg("terminating walreceiver process due to administrator command"))); - } -} - -static void -EnableImmediateExit() -{ - WalRcvImmediateInterruptOK = true; - ProcessWalRcvInterrupts(); -} - -static void -DisableImmediateExit() -{ - WalRcvImmediateInterruptOK = false; - ProcessWalRcvInterrupts(); -} - -/* Signal handlers */ -static void WalRcvSigHupHandler(SIGNAL_ARGS); -static void WalRcvShutdownHandler(SIGNAL_ARGS); -static void WalRcvQuickDieHandler(SIGNAL_ARGS); - -/* Prototypes for private functions */ -static void WalRcvLoop(void); -static void InitWalRcv(void); -static void WalRcvConnect(void); -static bool WalRcvWait(int timeout_ms); -static void WalRcvKill(int code, Datum arg); -static void XLogRecv(void); -static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr); -static void XLogWalRcvFlush(void); - -/* - * LogstreamResult indicates the byte positions that we have already - * written/fsynced. - */ -static struct -{ - XLogRecPtr Write; /* last byte + 1 written out in the standby */ - XLogRecPtr Flush; /* last byte + 1 flushed in the standby */ -} LogstreamResult; - -/* Main entry point for walreceiver process */ -Datum -WalReceiverMain(PG_FUNCTION_ARGS) -{ - sigjmp_buf local_sigjmp_buf; - MemoryContext walrcv_context; - - /* Mark walreceiver in progress */ - InitWalRcv(); - - /* - * If possible, make this process a group leader, so that the postmaster - * can signal any child processes too. (walreceiver probably never has - * any child processes, but for consistency we make all postmaster child - * processes do this.) - */ -#ifdef HAVE_SETSID - if (setsid() < 0) - elog(FATAL, "setsid() failed: %m"); -#endif - - /* Properly accept or ignore signals the postmaster might send us */ - pqsignal(SIGHUP, WalRcvSigHupHandler); /* set flag to read config file */ - pqsignal(SIGINT, SIG_IGN); - pqsignal(SIGTERM, WalRcvShutdownHandler); /* request shutdown */ - pqsignal(SIGQUIT, WalRcvQuickDieHandler); /* hard crash time */ - pqsignal(SIGALRM, SIG_IGN); - pqsignal(SIGPIPE, SIG_IGN); - pqsignal(SIGUSR1, SIG_IGN); - pqsignal(SIGUSR2, SIG_IGN); - - /* Reset some signals that are accepted by postmaster but not here */ - pqsignal(SIGCHLD, SIG_DFL); - pqsignal(SIGTTIN, SIG_DFL); - pqsignal(SIGTTOU, SIG_DFL); - pqsignal(SIGCONT, SIG_DFL); - pqsignal(SIGWINCH, SIG_DFL); - - /* We allow SIGQUIT (quickdie) at all times */ - sigdelset(&BlockSig, SIGQUIT); - - /* - * Create a resource owner to keep track of our resources (not clear that - * we need this, but may as well have one). - */ - CurrentResourceOwner = ResourceOwnerCreate(NULL, "Wal Receiver"); - - /* - * Create a memory context that we will do all our work in. We do this so - * that we can reset the context during error recovery and thereby avoid - * possible memory leaks. - */ - walrcv_context = AllocSetContextCreate(TopMemoryContext, - "Wal Receiver", - ALLOCSET_DEFAULT_MINSIZE, - ALLOCSET_DEFAULT_INITSIZE, - ALLOCSET_DEFAULT_MAXSIZE); - MemoryContextSwitchTo(walrcv_context); - - /* - * If an exception is encountered, processing resumes here. - * - * This code is heavily based on bgwriter.c, q.v. - */ - if (sigsetjmp(local_sigjmp_buf, 1) != 0) - { - /* Since not using PG_TRY, must reset error stack by hand */ - error_context_stack = NULL; - - /* Reset WalRcvImmediateInterruptOK */ - DisableImmediateExit(); - - /* Prevent interrupts while cleaning up */ - HOLD_INTERRUPTS(); - - /* Report the error to the server log */ - EmitErrorReport(); - - /* Free the data structure related to a connection */ - PQfinish(streamConn); - streamConn = NULL; - if (recvBuf != NULL) - PQfreemem(recvBuf); - recvBuf = NULL; - - /* - * Now return to normal top-level context and clear ErrorContext for - * next time. - */ - MemoryContextSwitchTo(walrcv_context); - FlushErrorState(); - - /* Flush any leaked data in the top-level context */ - MemoryContextResetAndDeleteChildren(walrcv_context); - - /* Now we can allow interrupts again */ - RESUME_INTERRUPTS(); - - /* - * Sleep at least 1 second after any error. A write error is likely - * to be repeated, and we don't want to be filling the error logs as - * fast as we can. - */ - pg_usleep(1000000L); - } - - /* We can now handle ereport(ERROR) */ - PG_exception_stack = &local_sigjmp_buf; - - /* Unblock signals (they were blocked when the postmaster forked us) */ - PG_SETMASK(&UnBlockSig); - - /* Establish the connection to the primary for XLOG streaming */ - WalRcvConnect(); - - /* Main loop of walreceiver */ - WalRcvLoop(); - - PG_RETURN_VOID(); /* WalRcvLoop() never returns, but keep compiler quiet */ -} - -/* Main loop of walreceiver process */ -static void -WalRcvLoop(void) -{ - /* Loop until end-of-streaming or error */ - for (;;) - { - /* - * Emergency bailout if postmaster has died. This is to avoid the - * necessity for manual cleanup of all postmaster children. - */ - if (!PostmasterIsAlive(true)) - exit(1); - - /* - * Exit walreceiver if we're not in recovery. This should not happen, - * but cross-check the status here. - */ - if (!RecoveryInProgress()) - ereport(FATAL, - (errmsg("cannot continue XLOG streaming, recovery has already ended"))); - - /* Process any requests or signals received recently */ - ProcessWalRcvInterrupts(); - - if (got_SIGHUP) - { - got_SIGHUP = false; - ProcessConfigFile(PGC_SIGHUP); - } - - /* Wait a while for data to arrive */ - if (WalRcvWait(NAPTIME_PER_CYCLE)) - { - /* data has arrived. Process it */ - if (PQconsumeInput(streamConn) == 0) - ereport(ERROR, - (errmsg("could not read xlog records: %s", - PQerrorMessage(streamConn)))); - XLogRecv(); - } - } -} - -/* Advertise our pid in shared memory, so that startup process can kill us. */ -static void -InitWalRcv(void) -{ - /* use volatile pointer to prevent code rearrangement */ - volatile WalRcvData *walrcv = WalRcv; - - /* - * WalRcv should be set up already (if we are a backend, we inherit - * this by fork() or EXEC_BACKEND mechanism from the postmaster). - */ - if (walrcv == NULL) - elog(PANIC, "walreceiver control data uninitialized"); - - /* If we've already been requested to stop, don't start up */ - SpinLockAcquire(&walrcv->mutex); - Assert(walrcv->pid == 0); - if (walrcv->walRcvState == WALRCV_STOPPED || - walrcv->walRcvState == WALRCV_STOPPING) - { - walrcv->walRcvState = WALRCV_STOPPED; - SpinLockRelease(&walrcv->mutex); - proc_exit(1); - } - walrcv->pid = MyProcPid; - SpinLockRelease(&walrcv->mutex); - - /* Arrange to clean up at walreceiver exit */ - on_shmem_exit(WalRcvKill, 0); -} - -/* - * Establish the connection to the primary server for XLOG streaming - */ -static void -WalRcvConnect(void) -{ - char conninfo[MAXCONNINFO + 14]; - char *primary_sysid; - char standby_sysid[32]; - TimeLineID primary_tli; - TimeLineID standby_tli; - PGresult *res; - XLogRecPtr recptr; - char cmd[64]; - /* use volatile pointer to prevent code rearrangement */ - volatile WalRcvData *walrcv = WalRcv; - - /* - * Set up a connection for XLOG streaming - */ - SpinLockAcquire(&walrcv->mutex); - snprintf(conninfo, sizeof(conninfo), "%s replication=true", walrcv->conninfo); - recptr = walrcv->receivedUpto; - SpinLockRelease(&walrcv->mutex); - - /* initialize local XLOG pointers */ - LogstreamResult.Write = LogstreamResult.Flush = recptr; - - Assert(recptr.xlogid != 0 || recptr.xrecoff != 0); - - EnableImmediateExit(); - streamConn = PQconnectdb(conninfo); - DisableImmediateExit(); - if (PQstatus(streamConn) != CONNECTION_OK) - ereport(ERROR, - (errmsg("could not connect to the primary server : %s", - PQerrorMessage(streamConn)))); - - /* - * Get the system identifier and timeline ID as a DataRow message - * from the primary server. - */ - EnableImmediateExit(); - res = PQexec(streamConn, "IDENTIFY_SYSTEM"); - DisableImmediateExit(); - if (PQresultStatus(res) != PGRES_TUPLES_OK) - { - PQclear(res); - ereport(ERROR, - (errmsg("could not receive the SYSID and timeline ID from " - "the primary server: %s", - PQerrorMessage(streamConn)))); - } - if (PQnfields(res) != 2 || PQntuples(res) != 1) - { - int ntuples = PQntuples(res); - int nfields = PQnfields(res); - PQclear(res); - ereport(ERROR, - (errmsg("invalid response from primary server"), - errdetail("expected 1 tuple with 2 fields, got %d tuples with %d fields", - ntuples, nfields))); - } - primary_sysid = PQgetvalue(res, 0, 0); - primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0); - - /* - * Confirm that the system identifier of the primary is the same - * as ours. - */ - snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT, - GetSystemIdentifier()); - if (strcmp(primary_sysid, standby_sysid) != 0) - { - PQclear(res); - ereport(ERROR, - (errmsg("system differs between the primary and standby"), - errdetail("the primary SYSID is %s, standby SYSID is %s", - primary_sysid, standby_sysid))); - } - - /* - * Confirm that the current timeline of the primary is the same - * as the recovery target timeline. - */ - standby_tli = GetRecoveryTargetTLI(); - PQclear(res); - if (primary_tli != standby_tli) - ereport(ERROR, - (errmsg("timeline %u of the primary does not match recovery target timeline %u", - primary_tli, standby_tli))); - ThisTimeLineID = primary_tli; - - /* Start streaming from the point requested by startup process */ - snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X", recptr.xlogid, recptr.xrecoff); - EnableImmediateExit(); - res = PQexec(streamConn, cmd); - DisableImmediateExit(); - if (PQresultStatus(res) != PGRES_COPY_OUT) - ereport(ERROR, - (errmsg("could not start XLOG streaming: %s", - PQerrorMessage(streamConn)))); - PQclear(res); - - /* - * Process the outstanding messages before beginning to wait for - * new message to arrive. - */ - XLogRecv(); -} - -/* - * Wait until we can read WAL stream, or timeout. - * - * Returns true if data has become available for reading, false if timed out - * or interrupted by signal. - * - * This is based on pqSocketCheck. - */ -static bool -WalRcvWait(int timeout_ms) -{ - int ret; - - Assert(streamConn != NULL); - if (PQsocket(streamConn) < 0) - ereport(ERROR, - (errcode_for_socket_access(), - errmsg("socket not open"))); - - /* We use poll(2) if available, otherwise select(2) */ - { -#ifdef HAVE_POLL - struct pollfd input_fd; - - input_fd.fd = PQsocket(streamConn); - input_fd.events = POLLIN | POLLERR; - input_fd.revents = 0; - - ret = poll(&input_fd, 1, timeout_ms); -#else /* !HAVE_POLL */ - - fd_set input_mask; - struct timeval timeout; - struct timeval *ptr_timeout; - - FD_ZERO(&input_mask); - FD_SET(PQsocket(streamConn), &input_mask); - - if (timeout_ms < 0) - ptr_timeout = NULL; - else - { - timeout.tv_sec = timeout_ms / 1000; - timeout.tv_usec = (timeout_ms % 1000) * 1000; - ptr_timeout = &timeout; - } - - ret = select(PQsocket(streamConn) + 1, &input_mask, - NULL, NULL, ptr_timeout); -#endif /* HAVE_POLL */ - } - - if (ret == 0 || (ret < 0 && errno == EINTR)) - return false; - if (ret < 0) - ereport(ERROR, - (errcode_for_socket_access(), - errmsg("select() failed: %m"))); - return true; -} - -/* - * Clear our pid from shared memory at exit. - */ -static void -WalRcvKill(int code, Datum arg) -{ - /* use volatile pointer to prevent code rearrangement */ - volatile WalRcvData *walrcv = WalRcv; - bool stopped = false; - - SpinLockAcquire(&walrcv->mutex); - if (walrcv->walRcvState == WALRCV_STOPPING || - walrcv->walRcvState == WALRCV_STOPPED) - { - walrcv->walRcvState = WALRCV_STOPPED; - stopped = true; - elog(LOG, "walreceiver stopped"); - } - walrcv->pid = 0; - SpinLockRelease(&walrcv->mutex); - - PQfinish(streamConn); - - /* If requested to stop, tell postmaster to not restart us. */ - if (stopped) - SendPostmasterSignal(PMSIGNAL_SHUTDOWN_WALRECEIVER); -} - -/* SIGHUP: set flag to re-read config file at next convenient time */ -static void -WalRcvSigHupHandler(SIGNAL_ARGS) -{ - got_SIGHUP = true; -} - -/* SIGTERM: set flag for main loop, or shutdown immediately if safe */ -static void -WalRcvShutdownHandler(SIGNAL_ARGS) -{ - got_SIGTERM = true; - - /* Don't joggle the elbow of proc_exit */ - if (!proc_exit_inprogress && WalRcvImmediateInterruptOK) - ProcessWalRcvInterrupts(); -} - -/* - * WalRcvQuickDieHandler() occurs when signalled SIGQUIT by the postmaster. - * - * Some backend has bought the farm, so we need to stop what we're doing and - * exit. - */ -static void -WalRcvQuickDieHandler(SIGNAL_ARGS) -{ - PG_SETMASK(&BlockSig); - - /* - * We DO NOT want to run proc_exit() callbacks -- we're here because - * shared memory may be corrupted, so we don't want to try to clean up our - * transaction. Just nail the windows shut and get out of town. Now that - * there's an atexit callback to prevent third-party code from breaking - * things by calling exit() directly, we have to reset the callbacks - * explicitly to make this work as intended. - */ - on_exit_reset(); - - /* - * Note we do exit(2) not exit(0). This is to force the postmaster into a - * system reset cycle if some idiot DBA sends a manual SIGQUIT to a random - * backend. This is necessary precisely because we don't clean up our - * shared memory state. (The "dead man switch" mechanism in pmsignal.c - * should ensure the postmaster sees this as a crash, too, but no harm - * in being doubly sure.) - */ - exit(2); -} - -/* - * Receive any WAL records available without blocking from XLOG stream and - * write it to the disk. - */ -static void -XLogRecv(void) -{ - XLogRecPtr *recptr; - int len; - - for (;;) - { - /* Receive CopyData message */ - len = PQgetCopyData(streamConn, &recvBuf, 1); - if (len == 0) /* no records available yet, then return */ - break; - if (len == -1) /* end-of-streaming or error */ - { - PGresult *res; - - res = PQgetResult(streamConn); - if (PQresultStatus(res) == PGRES_COMMAND_OK) - { - PQclear(res); - ereport(ERROR, - (errmsg("replication terminated by primary server"))); - } - PQclear(res); - ereport(ERROR, - (errmsg("could not read xlog records: %s", - PQerrorMessage(streamConn)))); - } - if (len < -1) - ereport(ERROR, - (errmsg("could not read xlog records: %s", - PQerrorMessage(streamConn)))); - - if (len < sizeof(XLogRecPtr)) - ereport(ERROR, - (errmsg("invalid WAL message received from primary"))); - - /* Write received WAL records to disk */ - recptr = (XLogRecPtr *) recvBuf; - XLogWalRcvWrite(recvBuf + sizeof(XLogRecPtr), - len - sizeof(XLogRecPtr), *recptr); - - if (recvBuf != NULL) - PQfreemem(recvBuf); - recvBuf = NULL; - } - - /* - * Now that we've written some records, flush them to disk and let the - * startup process know about them. - */ - XLogWalRcvFlush(); -} - -/* - * Write XLOG data to disk. - */ -static void -XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) -{ - int startoff; - int byteswritten; - - while (nbytes > 0) - { - int segbytes; - - if (recvFile < 0 || !XLByteInSeg(recptr, recvId, recvSeg)) - { - bool use_existent; - - /* - * XLOG segment files will be re-read in recovery operation soon, - * so we don't need to advise the OS to release any cache page. - */ - if (recvFile >= 0) - { - /* - * fsync() before we switch to next file. We would otherwise - * have to reopen this file to fsync it later - */ - XLogWalRcvFlush(); - if (close(recvFile) != 0) - ereport(PANIC, - (errcode_for_file_access(), - errmsg("could not close log file %u, segment %u: %m", - recvId, recvSeg))); - } - recvFile = -1; - - /* Create/use new log file */ - XLByteToSeg(recptr, recvId, recvSeg); - use_existent = true; - recvFile = XLogFileInit(recvId, recvSeg, - &use_existent, true); - recvOff = 0; - } - - /* Calculate the start offset of the received logs */ - startoff = recptr.xrecoff % XLogSegSize; - - if (startoff + nbytes > XLogSegSize) - segbytes = XLogSegSize - startoff; - else - segbytes = nbytes; - - /* Need to seek in the file? */ - if (recvOff != startoff) - { - if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0) - ereport(PANIC, - (errcode_for_file_access(), - errmsg("could not seek in log file %u, " - "segment %u to offset %u: %m", - recvId, recvSeg, startoff))); - recvOff = startoff; - } - - /* OK to write the logs */ - errno = 0; - - byteswritten = write(recvFile, buf, segbytes); - if (byteswritten <= 0) - { - /* if write didn't set errno, assume no disk space */ - if (errno == 0) - errno = ENOSPC; - ereport(PANIC, - (errcode_for_file_access(), - errmsg("could not write to log file %u, segment %u " - "at offset %u, length %lu: %m", - recvId, recvSeg, - recvOff, (unsigned long) segbytes))); - } - - /* Update state for write */ - XLByteAdvance(recptr, byteswritten); - - recvOff += byteswritten; - nbytes -= byteswritten; - buf += byteswritten; - - LogstreamResult.Write = recptr; - - /* - * XXX: Should we signal bgwriter to start a restartpoint - * if we've consumed too much xlog since the last one, like - * in normal processing? But this is not worth doing unless - * a restartpoint can be created independently from a - * checkpoint record. - */ - } -} - -/* Flush the log to disk */ -static void -XLogWalRcvFlush(void) -{ - if (XLByteLT(LogstreamResult.Flush, LogstreamResult.Write)) - { - /* use volatile pointer to prevent code rearrangement */ - volatile WalRcvData *walrcv = WalRcv; - char activitymsg[50]; - - issue_xlog_fsync(recvFile, recvId, recvSeg); - - LogstreamResult.Flush = LogstreamResult.Write; - - /* Update shared-memory status */ - SpinLockAcquire(&walrcv->mutex); - walrcv->receivedUpto = LogstreamResult.Flush; - SpinLockRelease(&walrcv->mutex); - - /* Report XLOG streaming progress in PS display */ - snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X", - LogstreamResult.Write.xlogid, LogstreamResult.Write.xrecoff); - set_ps_display(activitymsg, false); - } -} diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c index 4342e252d65..c1d7b558874 100644 --- a/src/backend/replication/walreceiverfuncs.c +++ b/src/backend/replication/walreceiverfuncs.c @@ -4,13 +4,13 @@ * * This file contains functions used by the startup process to communicate * with the walreceiver process. Functions implementing walreceiver itself - * are in src/backend/replication/walreceiver subdirectory. + * are in walreceiver.c. * * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/replication/walreceiverfuncs.c,v 1.1 2010/01/15 09:19:03 heikki Exp $ + * $PostgreSQL: pgsql/src/backend/replication/walreceiverfuncs.c,v 1.2 2010/01/20 09:16:24 heikki Exp $ * *------------------------------------------------------------------------- */ diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index f848a9e509c..57de368d41f 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -5,13 +5,14 @@ * * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group * - * $PostgreSQL: pgsql/src/include/replication/walreceiver.h,v 1.2 2010/01/16 00:04:41 tgl Exp $ + * $PostgreSQL: pgsql/src/include/replication/walreceiver.h,v 1.3 2010/01/20 09:16:24 heikki Exp $ * *------------------------------------------------------------------------- */ #ifndef _WALRECEIVER_H #define _WALRECEIVER_H +#include "access/xlogdefs.h" #include "storage/spin.h" /* @@ -60,6 +61,17 @@ typedef struct extern PGDLLIMPORT WalRcvData *WalRcv; +/* libpqwalreceiver hooks */ +typedef bool (*walrcv_connect_type) (char *conninfo, XLogRecPtr startpoint); +extern PGDLLIMPORT walrcv_connect_type walrcv_connect; + +typedef bool (*walrcv_receive_type) (int timeout, XLogRecPtr *recptr, char **buffer, int *len); +extern PGDLLIMPORT walrcv_receive_type walrcv_receive; + +typedef void (*walrcv_disconnect_type) (void); +extern PGDLLIMPORT walrcv_disconnect_type walrcv_disconnect; + +extern void WalReceiverMain(void); extern Size WalRcvShmemSize(void); extern void WalRcvShmemInit(void); extern bool WalRcvInProgress(void); -- cgit v1.2.3