about summary refs log tree commit diff
path: root/src/backend/replication/walreceiver.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/walreceiver.c')
-rw-r--r-- src/backend/replication/walreceiver.c 472
1 files changed, 380 insertions, 92 deletions
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 62135037f10..303edb75a32 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -15,6 +15,14 @@
* WalRcv->receivedUpto variable in shared memory, to inform the startup
* process of how far it can proceed with XLOG replay.
*
+ * If the primary server ends streaming, but doesn't disconnect, walreceiver
+ * goes into "waiting" mode, and waits for the startup process to give new
+ * instructions. The startup process will treat that the same as
+ * disconnection, and will rescan the archive/pg_xlog directory. But when the
+ * startup process wants to try streaming replication again, it will just
+ * nudge the existing walreceiver process that's waiting, instead of launching
+ * a new one.
+ *
* Normal termination is by SIGTERM, which instructs the walreceiver to
* exit(0). Emergency termination is by SIGQUIT; like any postmaster child
* process, the walreceiver will simply abort and exit on SIGQUIT. A close
@@ -38,6 +46,7 @@
#include <signal.h>
#include <unistd.h>
+#include "access/timeline.h"
#include "access/xlog_internal.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
@@ -60,6 +69,10 @@ bool hot_standby_feedback;
/* libpqreceiver hooks to these when loaded */
walrcv_connect_type walrcv_connect = NULL;
+walrcv_identify_system_type walrcv_identify_system = NULL;
+walrcv_startstreaming_type walrcv_startstreaming = NULL;
+walrcv_endstreaming_type walrcv_endstreaming = NULL;
+walrcv_readtimelinehistoryfile_type walrcv_readtimelinehistoryfile = NULL;
walrcv_receive_type walrcv_receive = NULL;
walrcv_send_type walrcv_send = NULL;
walrcv_disconnect_type walrcv_disconnect = NULL;
@@ -118,6 +131,8 @@ static volatile bool WalRcvImmediateInterruptOK = false;
static void ProcessWalRcvInterrupts(void);
static void EnableWalRcvImmediateExit(void);
static void DisableWalRcvImmediateExit(void);
+static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last);
+static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI);
static void WalRcvDie(int code, Datum arg);
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
@@ -128,6 +143,7 @@ static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
/* Signal handlers */
static void WalRcvSigHupHandler(SIGNAL_ARGS);
+static void WalRcvSigUsr1Handler(SIGNAL_ARGS);
static void WalRcvShutdownHandler(SIGNAL_ARGS);
static void WalRcvQuickDieHandler(SIGNAL_ARGS);
@@ -171,6 +187,10 @@ WalReceiverMain(void)
{
char conninfo[MAXCONNINFO];
XLogRecPtr startpoint;
+ TimeLineID startpointTLI;
+ TimeLineID primaryTLI;
+ bool first_stream;
+
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
TimestampTz last_recv_timestamp;
@@ -207,17 +227,21 @@ WalReceiverMain(void)
/* The usual case */
break;
- case WALRCV_RUNNING:
+ case WALRCV_WAITING:
+ case WALRCV_STREAMING:
+ case WALRCV_RESTARTING:
+ default:
/* Shouldn't happen */
elog(PANIC, "walreceiver still running according to shared memory state");
}
/* Advertise our PID so that the startup process can kill us */
walrcv->pid = MyProcPid;
- walrcv->walRcvState = WALRCV_RUNNING;
+ walrcv->walRcvState = WALRCV_STREAMING;
/* Fetch information required to start streaming */
strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
startpoint = walrcv->receiveStart;
+ startpointTLI = walrcv->receiveStartTLI;
/* Initialise to a sanish value */
walrcv->lastMsgSendTime = walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = GetCurrentTimestamp();
@@ -227,6 +251,8 @@ WalReceiverMain(void)
/* Arrange to clean up at walreceiver exit */
on_shmem_exit(WalRcvDie, 0);
+ OwnLatch(&walrcv->latch);
+
/*
* If possible, make this process a group leader, so that the postmaster
* can signal any child processes too. (walreceiver probably never has
@@ -246,7 +272,7 @@ WalReceiverMain(void)
pqsignal(SIGQUIT, WalRcvQuickDieHandler); /* hard crash time */
pqsignal(SIGALRM, SIG_IGN);
pqsignal(SIGPIPE, SIG_IGN);
- pqsignal(SIGUSR1, SIG_IGN);
+ pqsignal(SIGUSR1, WalRcvSigUsr1Handler);
pqsignal(SIGUSR2, SIG_IGN);
/* Reset some signals that are accepted by postmaster but not here */
@@ -261,8 +287,12 @@ WalReceiverMain(void)
/* Load the libpq-specific functions */
load_file("libpqwalreceiver", false);
- if (walrcv_connect == NULL || walrcv_receive == NULL ||
- walrcv_send == NULL || walrcv_disconnect == NULL)
+ if (walrcv_connect == NULL || walrcv_startstreaming == NULL ||
+ walrcv_endstreaming == NULL ||
+ walrcv_identify_system == NULL ||
+ walrcv_readtimelinehistoryfile == NULL ||
+ walrcv_receive == NULL || walrcv_send == NULL ||
+ walrcv_disconnect == NULL)
elog(ERROR, "libpqwalreceiver didn't initialize correctly");
/*
@@ -276,122 +306,360 @@ WalReceiverMain(void)
/* Establish the connection to the primary for XLOG streaming */
EnableWalRcvImmediateExit();
- walrcv_connect(conninfo, startpoint);
+ walrcv_connect(conninfo);
DisableWalRcvImmediateExit();
- /* Initialize LogstreamResult and buffers for processing messages */
- LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL);
- initStringInfo(&reply_message);
- initStringInfo(&incoming_message);
-
- /* Initialize the last recv timestamp */
- last_recv_timestamp = GetCurrentTimestamp();
- ping_sent = false;
-
- /* Loop until end-of-streaming or error */
+ first_stream = true;
for (;;)
{
- unsigned char type;
- char *buf;
- int len;
-
/*
- * Emergency bailout if postmaster has died. This is to avoid the
- * necessity for manual cleanup of all postmaster children.
+ * Check that we're connected to a valid server using the
+ * IDENTIFY_SYSTEM replication command.
*/
- if (!PostmasterIsAlive())
- exit(1);
+ EnableWalRcvImmediateExit();
+ walrcv_identify_system(&primaryTLI);
+ DisableWalRcvImmediateExit();
/*
- * Exit walreceiver if we're not in recovery. This should not happen,
- * but cross-check the status here.
+ * Confirm that the current timeline of the primary is the same or
+ * ahead of ours.
*/
- if (!RecoveryInProgress())
- ereport(FATAL,
- (errmsg("cannot continue WAL streaming, recovery has already ended")));
-
- /* Process any requests or signals received recently */
- ProcessWalRcvInterrupts();
+ if (primaryTLI < startpointTLI)
+ ereport(ERROR,
+ (errmsg("highest timeline %u of the primary is behind recovery timeline %u",
+ primaryTLI, startpointTLI)));
- if (got_SIGHUP)
- {
- got_SIGHUP = false;
- ProcessConfigFile(PGC_SIGHUP);
- }
+ /*
+ * Get any missing history files. We do this always, even when we're
+ * not interested in that timeline, so that if we're promoted to become
+ * the master later on, we don't select the same timeline that was
+ * already used in the current master. This isn't bullet-proof - you'll
+ * need some external software to manage your cluster if you need to
+ * ensure that a unique timeline id is chosen in every case, but let's
+ * avoid the confusion of timeline id collisions where we can.
+ */
+ WalRcvFetchTimeLineHistoryFiles(startpointTLI + 1, primaryTLI);
- /* Wait a while for data to arrive */
- if (walrcv_receive(NAPTIME_PER_CYCLE, &type, &buf, &len))
+ /*
+ * Start streaming.
+ *
+ * We'll try to start at the requested starting point and timeline,
+ * even if it's different from the server's latest timeline. In case
+ * we've already reached the end of the old timeline, the server will
+ * finish the streaming immediately, and we will go back to await
+ * orders from the startup process. If recovery_target_timeline is
+ * 'latest', the startup process will scan pg_xlog and find the new
+ * history file, bump recovery target timeline, and ask us to restart
+ * on the new timeline.
+ */
+ ThisTimeLineID = startpointTLI;
+ if (walrcv_startstreaming(startpointTLI, startpoint))
{
- /* Something was received from master, so reset timeout */
+ bool endofwal = false;
+
+ if (first_stream)
+ ereport(LOG,
+ (errmsg("started streaming WAL from primary at %X/%X on timeline %u",
+ (uint32) (startpoint >> 32), (uint32) startpoint,
+ startpointTLI)));
+ else
+ ereport(LOG,
+ (errmsg("restarted WAL streaming at %X/%X on timeline %u",
+ (uint32) (startpoint >> 32), (uint32) startpoint,
+ startpointTLI)));
+ first_stream = false;
+
+ /* Initialize LogstreamResult and buffers for processing messages */
+ LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr();
+ initStringInfo(&reply_message);
+ initStringInfo(&incoming_message);
+
+ /* Initialize the last recv timestamp */
last_recv_timestamp = GetCurrentTimestamp();
ping_sent = false;
- /* Accept the received data, and process it */
- XLogWalRcvProcessMsg(type, buf, len);
-
- /* Receive any more data we can without sleeping */
- while (walrcv_receive(0, &type, &buf, &len))
+ /* Loop until end-of-streaming or error */
+ while (!endofwal)
{
- last_recv_timestamp = GetCurrentTimestamp();
- ping_sent = false;
- XLogWalRcvProcessMsg(type, buf, len);
- }
+ char *buf;
+ int len;
- /* Let the master know that we received some data. */
- XLogWalRcvSendReply(false, false);
+ /*
+ * Emergency bailout if postmaster has died. This is to avoid
+ * the necessity for manual cleanup of all postmaster children.
+ */
+ if (!PostmasterIsAlive())
+ exit(1);
+
+ /*
+ * Exit walreceiver if we're not in recovery. This should not
+ * happen, but cross-check the status here.
+ */
+ if (!RecoveryInProgress())
+ ereport(FATAL,
+ (errmsg("cannot continue WAL streaming, recovery has already ended")));
+
+ /* Process any requests or signals received recently */
+ ProcessWalRcvInterrupts();
+
+ if (got_SIGHUP)
+ {
+ got_SIGHUP = false;
+ ProcessConfigFile(PGC_SIGHUP);
+ }
+
+ /* Wait a while for data to arrive */
+ len = walrcv_receive(NAPTIME_PER_CYCLE, &buf);
+ if (len != 0)
+ {
+ /*
+ * Process the received data, and any subsequent data we
+ * can read without blocking.
+ */
+ for (;;)
+ {
+ if (len > 0)
+ {
+ /* Something was received from master, so reset timeout */
+ last_recv_timestamp = GetCurrentTimestamp();
+ ping_sent = false;
+ XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);
+ }
+ else if (len == 0)
+ break;
+ else if (len < 0)
+ {
+ ereport(LOG,
+ (errmsg("replication terminated by primary server"),
+ errdetail("End of WAL reached on timeline %u", startpointTLI)));
+ endofwal = true;
+ break;
+ }
+ len = walrcv_receive(0, &buf);
+ }
+
+ /* Let the master know that we received some data. */
+ XLogWalRcvSendReply(false, false);
+
+ /*
+ * If we've written some records, flush them to disk and
+ * let the startup process and primary server know about
+ * them.
+ */
+ XLogWalRcvFlush(false);
+ }
+ else
+ {
+ /*
+ * We didn't receive anything new. If we haven't heard
+ * anything from the server for more than
+ * wal_receiver_timeout / 2, ping the server. Also, if it's
+ * been longer than wal_receiver_status_interval since the
+ * last update we sent, send a status update to the master
+ * anyway, to report any progress in applying WAL.
+ */
+ bool requestReply = false;
+
+ /*
+ * Check if time since last receive from primary has
+ * reached the configured limit.
+ */
+ if (wal_receiver_timeout > 0)
+ {
+ TimestampTz now = GetCurrentTimestamp();
+ TimestampTz timeout;
+
+ timeout =
+ TimestampTzPlusMilliseconds(last_recv_timestamp,
+ wal_receiver_timeout);
+
+ if (now >= timeout)
+ ereport(ERROR,
+ (errmsg("terminating walreceiver due to timeout")));
+
+ /*
+ * We haven't received anything new for half of
+ * wal_receiver_timeout. Ping the server.
+ */
+ if (!ping_sent)
+ {
+ timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
+ (wal_receiver_timeout/2));
+ if (now >= timeout)
+ {
+ requestReply = true;
+ ping_sent = true;
+ }
+ }
+ }
+
+ XLogWalRcvSendReply(requestReply, requestReply);
+ XLogWalRcvSendHSFeedback();
+ }
+ }
/*
- * If we've written some records, flush them to disk and let the
- * startup process and primary server know about them.
+ * The backend finished streaming. Exit streaming COPY-mode from
+ * our side, too.
*/
+ EnableWalRcvImmediateExit();
+ walrcv_endstreaming();
+ DisableWalRcvImmediateExit();
+ }
+ else
+ ereport(LOG,
+ (errmsg("primary server contains no more WAL on requested timeline %u",
+ startpointTLI)));
+
+ /*
+ * End of WAL reached on the requested timeline. Close the last
+ * segment, and wait for new orders from the startup process.
+ */
+ if (recvFile >= 0)
+ {
XLogWalRcvFlush(false);
+ if (close(recvFile) != 0)
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not close log segment %s: %m",
+ XLogFileNameP(recvFileTLI, recvSegNo))));
}
+ recvFile = -1;
+
+ elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
+ WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
+ }
+ /* not reached */
+}
+
+/*
+ * Wait for startup process to set receiveStart and receiveStartTLI.
+ */
+static void
+WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
+{
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalRcvData *walrcv = WalRcv;
+ int state;
+
+ SpinLockAcquire(&walrcv->mutex);
+ state = walrcv->walRcvState;
+ if (state != WALRCV_STREAMING)
+ {
+ SpinLockRelease(&walrcv->mutex);
+ if (state == WALRCV_STOPPING)
+ proc_exit(0);
else
+ elog(FATAL, "unexpected walreceiver state");
+ }
+ walrcv->walRcvState = WALRCV_WAITING;
+ walrcv->receiveStart = InvalidXLogRecPtr;
+ walrcv->receiveStartTLI = 0;
+ SpinLockRelease(&walrcv->mutex);
+
+ if (update_process_title)
+ set_ps_display("idle", false);
+
+ /*
+ * Nudge the startup process to notice that we've stopped streaming and are
+ * now waiting for instructions.
+ */
+ WakeupRecovery();
+ for (;;)
+ {
+ ResetLatch(&walrcv->latch);
+
+ /*
+ * Emergency bailout if postmaster has died. This is to avoid the
+ * necessity for manual cleanup of all postmaster children.
+ */
+ if (!PostmasterIsAlive())
+ exit(1);
+
+ ProcessWalRcvInterrupts();
+
+ SpinLockAcquire(&walrcv->mutex);
+ Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
+ walrcv->walRcvState == WALRCV_WAITING ||
+ walrcv->walRcvState == WALRCV_STOPPING);
+ if (walrcv->walRcvState == WALRCV_RESTARTING)
+ {
+ /* we don't expect primary_conninfo to change */
+ *startpoint = walrcv->receiveStart;
+ *startpointTLI = walrcv->receiveStartTLI;
+ walrcv->walRcvState = WALRCV_STREAMING;
+ SpinLockRelease(&walrcv->mutex);
+ break;
+ }
+ if (walrcv->walRcvState == WALRCV_STOPPING)
{
/*
- * We didn't receive anything new. If we haven't heard anything
- * from the server for more than wal_receiver_timeout / 2,
- * ping the server. Also, if it's been longer than
- * wal_receiver_status_interval since the last update we sent,
- * send a status update to the master anyway, to report any
- * progress in applying WAL.
+ * We should've received SIGTERM if the startup process wants
+ * us to die, but might as well check it here too.
*/
- bool requestReply = false;
+ SpinLockRelease(&walrcv->mutex);
+ exit(1);
+ }
+ SpinLockRelease(&walrcv->mutex);
- /*
- * Check if time since last receive from standby has reached the
- * configured limit.
- */
- if (wal_receiver_timeout > 0)
- {
- TimestampTz now = GetCurrentTimestamp();
- TimestampTz timeout;
+ WaitLatch(&walrcv->latch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0);
+ }
+
+ if (update_process_title)
+ {
+ char activitymsg[50];
- timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
- wal_receiver_timeout);
+ snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%X",
+ (uint32) (*startpoint >> 32),
+ (uint32) *startpoint);
+ set_ps_display(activitymsg, false);
+ }
+}
- if (now >= timeout)
- ereport(ERROR,
- (errmsg("terminating walreceiver due to timeout")));
+/*
+ * Fetch any missing timeline history files between 'first' and 'last'
+ * (inclusive) from the server.
+ */
+static void
+WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
+{
+ TimeLineID tli;
- /*
- * We didn't receive anything new, for half of receiver
- * replication timeout. Ping the server.
- */
- if (!ping_sent)
- {
- timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
- (wal_receiver_timeout/2));
- if (now >= timeout)
- {
- requestReply = true;
- ping_sent = true;
- }
- }
- }
+ for (tli = first; tli <= last; tli++)
+ {
+ if (!existsTimeLineHistory(tli))
+ {
+ char *fname;
+ char *content;
+ int len;
+ char expectedfname[MAXFNAMELEN];
- XLogWalRcvSendReply(requestReply, requestReply);
- XLogWalRcvSendHSFeedback();
+ ereport(LOG,
+ (errmsg("fetching timeline history file for timeline %u from primary server",
+ tli)));
+
+ EnableWalRcvImmediateExit();
+ walrcv_readtimelinehistoryfile(tli, &fname, &content, &len);
+ DisableWalRcvImmediateExit();
+
+ /*
+ * Check that the filename on the master matches what we calculated
+ * ourselves. This is just a sanity check, it should always match.
+ */
+ TLHistoryFileName(expectedfname, tli);
+ if (strcmp(fname, expectedfname) != 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("primary reported unexpected filename for timeline history file of timeline %u",
+ tli)));
+
+ /*
+ * Write the file to pg_xlog.
+ */
+ writeTimeLineHistoryFile(tli, content, len);
+
+ pfree(fname);
+ pfree(content);
}
}
}
@@ -408,9 +676,15 @@ WalRcvDie(int code, Datum arg)
/* Ensure that all WAL records received are flushed to disk */
XLogWalRcvFlush(true);
+ DisownLatch(&walrcv->latch);
+
SpinLockAcquire(&walrcv->mutex);
- Assert(walrcv->walRcvState == WALRCV_RUNNING ||
+ Assert(walrcv->walRcvState == WALRCV_STREAMING ||
+ walrcv->walRcvState == WALRCV_RESTARTING ||
+ walrcv->walRcvState == WALRCV_STARTING ||
+ walrcv->walRcvState == WALRCV_WAITING ||
walrcv->walRcvState == WALRCV_STOPPING);
+ Assert(walrcv->pid == MyProcPid);
walrcv->walRcvState = WALRCV_STOPPED;
walrcv->pid = 0;
SpinLockRelease(&walrcv->mutex);
@@ -418,6 +692,9 @@ WalRcvDie(int code, Datum arg)
/* Terminate the connection gracefully. */
if (walrcv_disconnect != NULL)
walrcv_disconnect();
+
+ /* Wake up the startup process to notice promptly that we're gone */
+ WakeupRecovery();
}
/* SIGHUP: set flag to re-read config file at next convenient time */
@@ -427,6 +704,14 @@ WalRcvSigHupHandler(SIGNAL_ARGS)
got_SIGHUP = true;
}
+
+/* SIGUSR1: used by latch mechanism */
+static void
+WalRcvSigUsr1Handler(SIGNAL_ARGS)
+{
+ latch_sigusr1_handler();
+}
+
/* SIGTERM: set flag for main loop, or shutdown immediately if safe */
static void
WalRcvShutdownHandler(SIGNAL_ARGS)
@@ -435,6 +720,8 @@ WalRcvShutdownHandler(SIGNAL_ARGS)
got_SIGTERM = true;
+ SetLatch(&WalRcv->latch);
+
/* Don't joggle the elbow of proc_exit */
if (!proc_exit_inprogress && WalRcvImmediateInterruptOK)
ProcessWalRcvInterrupts();
@@ -661,6 +948,7 @@ XLogWalRcvFlush(bool dying)
{
walrcv->latestChunkStart = walrcv->receivedUpto;
walrcv->receivedUpto = LogstreamResult.Flush;
+ walrcv->receivedTLI = ThisTimeLineID;
}
SpinLockRelease(&walrcv->mutex);
@@ -738,7 +1026,7 @@ XLogWalRcvSendReply(bool force, bool requestReply)
/* Construct a new message */
writePtr = LogstreamResult.Write;
flushPtr = LogstreamResult.Flush;
- applyPtr = GetXLogReplayRecPtr(NULL);
+ applyPtr = GetXLogReplayRecPtr();
resetStringInfo(&reply_message);
pq_sendbyte(&reply_message, 'r');