aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/walsender.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/walsender.c')
-rw-r--r--src/backend/replication/walsender.c587
1 files changed, 463 insertions, 124 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 8774d7e8229..aec57f5535f 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -7,10 +7,15 @@
* (Note that there can be more than one walsender process concurrently.)
* It is started by the postmaster when the walreceiver of a standby server
* connects to the primary server and requests XLOG streaming replication.
- * It attempts to keep reading XLOG records from the disk and sending them
- * to the standby server, as long as the connection is alive (i.e., like
- * any backend, there is a one-to-one relationship between a connection
- * and a walsender process).
+ *
+ * A walsender is similar to a regular backend, ie. there is a one-to-one
+ * relationship between a connection and a walsender process, but instead
+ * of processing SQL queries, it understands a small set of special
+ * replication-mode commands. The START_REPLICATION command begins streaming
+ * WAL to the client. While streaming, the walsender keeps reading XLOG
+ * records from the disk and sends them to the standby server over the
+ * COPY protocol, until the either side ends the replication by exiting COPY
+ * mode (or until the connection is closed).
*
* Normal termination is by SIGTERM, which instructs the walsender to
* close the connection and exit(0) at next convenient moment. Emergency
@@ -37,6 +42,7 @@
#include <signal.h>
#include <unistd.h>
+#include "access/timeline.h"
#include "access/transam.h"
#include "access/xlog_internal.h"
#include "catalog/pg_type.h"
@@ -87,8 +93,6 @@ bool am_walsender = false; /* Am I a walsender process ? */
bool am_cascading_walsender = false; /* Am I cascading WAL to
* another standby ? */
-static bool replication_started = false; /* Started streaming yet? */
-
/* User-settable parameters for walsender */
int max_wal_senders = 0; /* the maximum number of concurrent walsenders */
int wal_sender_timeout = 60 * 1000; /* maximum time to send one
@@ -107,6 +111,16 @@ static XLogSegNo sendSegNo = 0;
static uint32 sendOff = 0;
/*
+ * These variables keep track of the state of the timeline we're currently
+ * sending. sendTimeLine identifies the timeline. If sendTimeLineIsHistoric,
+ * the timeline is not the latest timeline on this server, and the server's
+ * history forked off from that timeline at sendTimeLineValidUpto.
+ */
+static TimeLineID sendTimeLine = 0;
+static bool sendTimeLineIsHistoric = false;
+static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr;
+
+/*
* How far have we sent WAL already? This is also advertised in
* MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.)
*/
@@ -124,9 +138,26 @@ static TimestampTz last_reply_timestamp;
/* Have we sent a heartbeat message asking for reply, since last reply? */
static bool ping_sent = false;
+/*
+ * While streaming WAL in Copy mode, streamingDoneSending is set to true
+ * after we have sent CopyDone. We should not send any more CopyData messages
+ * after that. streamingDoneReceiving is set to true when we receive CopyDone
+ * from the other end. When both become true, it's time to exit Copy mode.
+ */
+static bool streamingDoneSending;
+static bool streamingDoneReceiving;
+
/* Flags set by signal handlers for later service in main loop */
static volatile sig_atomic_t got_SIGHUP = false;
-volatile sig_atomic_t walsender_ready_to_stop = false;
+static volatile sig_atomic_t walsender_ready_to_stop = false;
+
+/*
+ * This is set while we are streaming. When not set, SIGUSR2 signal will be
+ * handled like SIGTERM. When set, the main loop is responsible for checking
+ * walsender_ready_to_stop and terminating when it's set (after streaming any
+ * remaining WAL).
+ */
+static volatile sig_atomic_t replication_active = false;
/* Signal handlers */
static void WalSndSigHupHandler(SIGNAL_ARGS);
@@ -134,7 +165,7 @@ static void WalSndXLogSendHandler(SIGNAL_ARGS);
static void WalSndLastCycleHandler(SIGNAL_ARGS);
/* Prototypes for private functions */
-static void WalSndLoop(void) __attribute__((noreturn));
+static void WalSndLoop(void);
static void InitWalSenderSlot(void);
static void WalSndKill(int code, Datum arg);
static void XLogSend(bool *caughtup);
@@ -164,6 +195,16 @@ InitWalSender(void)
*/
if (am_cascading_walsender)
ThisTimeLineID = GetRecoveryTargetTLI();
+
+ /*
+ * Let postmaster know that we're a WAL sender. Once we've declared us as
+ * a WAL sender process, postmaster will let us outlive the bgwriter and
+ * kill us last in the shutdown sequence, so we get a chance to stream all
+ * remaining WAL at shutdown, including the shutdown checkpoint. Note that
+ * there's no going back, and we mustn't write any WAL records after this.
+ */
+ MarkPostmasterChildWalSender();
+ SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
}
/*
@@ -182,17 +223,16 @@ WalSndErrorCleanup()
sendFile = -1;
}
- /*
- * Don't return back to the command loop after we've started replicating.
- * We've already marked us as an actively streaming WAL sender in the
- * PMSignal slot, and there's currently no way to undo that.
- */
- if (replication_started)
+ replication_active = false;
+ if (walsender_ready_to_stop)
proc_exit(0);
+
+ /* Revert back to startup state */
+ WalSndSetState(WALSNDSTATE_STARTUP);
}
/*
- * IDENTIFY_SYSTEM
+ * Handle the IDENTIFY_SYSTEM command.
*/
static void
IdentifySystem(void)
@@ -210,9 +250,17 @@ IdentifySystem(void)
snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
GetSystemIdentifier());
- snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
- logptr = am_cascading_walsender ? GetStandbyFlushRecPtr(NULL) : GetInsertRecPtr();
+ am_cascading_walsender = RecoveryInProgress();
+ if (am_cascading_walsender)
+ {
+ logptr = GetStandbyFlushRecPtr();
+ ThisTimeLineID = GetRecoveryTargetTLI();
+ }
+ else
+ logptr = GetInsertRecPtr();
+
+ snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
@@ -261,56 +309,106 @@ IdentifySystem(void)
pq_endmessage(&buf);
}
+
/*
- * Handle START_REPLICATION command.
- *
- * At the moment, this never returns, but an ereport(ERROR) will take us back
- * to the main loop.
+ * Handle TIMELINE_HISTORY command.
*/
static void
-StartReplication(StartReplicationCmd *cmd)
+SendTimeLineHistory(TimeLineHistoryCmd *cmd)
{
StringInfoData buf;
+ char histfname[MAXFNAMELEN];
+ char path[MAXPGPATH];
+ int fd;
+ size_t histfilelen;
+ size_t bytesleft;
/*
- * Let postmaster know that we're streaming. Once we've declared us as a
- * WAL sender process, postmaster will let us outlive the bgwriter and
- * kill us last in the shutdown sequence, so we get a chance to stream all
- * remaining WAL at shutdown, including the shutdown checkpoint. Note that
- * there's no going back, and we mustn't write any WAL records after this.
+ * Reply with a result set with one row, and two columns. The first col
+ * is the name of the history file, 2nd is the contents.
*/
- MarkPostmasterChildWalSender();
- SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
- replication_started = true;
- /*
- * When promoting a cascading standby, postmaster sends SIGUSR2 to any
- * cascading walsenders to kill them. But there is a corner-case where
- * such walsender fails to receive SIGUSR2 and survives a standby
- * promotion unexpectedly. This happens when postmaster sends SIGUSR2
- * before the walsender marks itself as a WAL sender, because postmaster
- * sends SIGUSR2 to only the processes marked as a WAL sender.
- *
- * To avoid this corner-case, if recovery is NOT in progress even though
- * the walsender is cascading one, we do the same thing as SIGUSR2 signal
- * handler does, i.e., set walsender_ready_to_stop to true. Which causes
- * the walsender to end later.
- *
- * When terminating cascading walsenders, usually postmaster writes the
- * log message announcing the terminations. But there is a race condition
- * here. If there is no walsender except this process before reaching
- * here, postmaster thinks that there is no walsender and suppresses that
- * log message. To handle this case, we always emit that log message here.
- * This might cause duplicate log messages, but which is less likely to
- * happen, so it's not worth writing some code to suppress them.
- */
- if (am_cascading_walsender && !RecoveryInProgress())
+ TLHistoryFileName(histfname, cmd->timeline);
+ TLHistoryFilePath(path, cmd->timeline);
+
+ /* Send a RowDescription message */
+ pq_beginmessage(&buf, 'T');
+ pq_sendint(&buf, 2, 2); /* 2 fields */
+
+ /* first field */
+ pq_sendstring(&buf, "filename"); /* col name */
+ pq_sendint(&buf, 0, 4); /* table oid */
+ pq_sendint(&buf, 0, 2); /* attnum */
+ pq_sendint(&buf, TEXTOID, 4); /* type oid */
+ pq_sendint(&buf, -1, 2); /* typlen */
+ pq_sendint(&buf, 0, 4); /* typmod */
+ pq_sendint(&buf, 0, 2); /* format code */
+
+ /* second field */
+ pq_sendstring(&buf, "content"); /* col name */
+ pq_sendint(&buf, 0, 4); /* table oid */
+ pq_sendint(&buf, 0, 2); /* attnum */
+ pq_sendint(&buf, BYTEAOID, 4); /* type oid */
+ pq_sendint(&buf, -1, 2); /* typlen */
+ pq_sendint(&buf, 0, 4); /* typmod */
+ pq_sendint(&buf, 0, 2); /* format code */
+ pq_endmessage(&buf);
+
+ /* Send a DataRow message */
+ pq_beginmessage(&buf, 'D');
+ pq_sendint(&buf, 2, 2); /* # of columns */
+ pq_sendint(&buf, strlen(histfname), 4); /* col1 len */
+ pq_sendbytes(&buf, histfname, strlen(histfname));
+
+ fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0666);
+ if (fd < 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not open file \"%s\": %m", path)));
+
+ /* Determine file length and send it to client */
+ histfilelen = lseek(fd, 0, SEEK_END);
+ if (histfilelen < 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not seek to end of file \"%s\": %m", path)));
+ if (lseek(fd, 0, SEEK_SET) != 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not seek to beginning of file \"%s\": %m", path)));
+
+ pq_sendint(&buf, histfilelen, 4); /* col2 len */
+
+ bytesleft = histfilelen;
+ while (bytesleft > 0)
{
- ereport(LOG,
- (errmsg("terminating walsender process to force cascaded standby "
- "to update timeline and reconnect")));
- walsender_ready_to_stop = true;
+ char rbuf[BLCKSZ];
+ int nread;
+
+ nread = read(fd, rbuf, sizeof(rbuf));
+ if (nread <= 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read file \"%s\": %m",
+ path)));
+ pq_sendbytes(&buf, rbuf, nread);
+ bytesleft -= nread;
}
+ CloseTransientFile(fd);
+
+ pq_endmessage(&buf);
+}
+
+/*
+ * Handle START_REPLICATION command.
+ *
+ * At the moment, this never returns, but an ereport(ERROR) will take us back
+ * to the main loop.
+ */
+static void
+StartReplication(StartReplicationCmd *cmd)
+{
+ StringInfoData buf;
/*
* We assume here that we're logging enough information in the WAL for
@@ -322,42 +420,144 @@ StartReplication(StartReplicationCmd *cmd)
*/
/*
- * When we first start replication the standby will be behind the primary.
- * For some applications, for example, synchronous replication, it is
- * important to have a clear state for this initial catchup mode, so we
- * can trigger actions when we change streaming state later. We may stay
- * in this state for a long time, which is exactly why we want to be able
- * to monitor whether or not we are still here.
+ * Select the timeline. If it was given explicitly by the client, use
+ * that. Otherwise use the current ThisTimeLineID.
*/
- WalSndSetState(WALSNDSTATE_CATCHUP);
+ if (cmd->timeline != 0)
+ {
+ XLogRecPtr switchpoint;
- /* Send a CopyBothResponse message, and start streaming */
- pq_beginmessage(&buf, 'W');
- pq_sendbyte(&buf, 0);
- pq_sendint(&buf, 0, 2);
- pq_endmessage(&buf);
- pq_flush();
+ sendTimeLine = cmd->timeline;
+ if (sendTimeLine == ThisTimeLineID)
+ {
+ sendTimeLineIsHistoric = false;
+ sendTimeLineValidUpto = InvalidXLogRecPtr;
+ }
+ else
+ {
+ List *timeLineHistory;
- /*
- * Initialize position to the received one, then the xlog records begin to
- * be shipped from that position
- */
- sentPtr = cmd->startpoint;
+ sendTimeLineIsHistoric = true;
- /* Also update the start position status in shared memory */
- {
- /* use volatile pointer to prevent code rearrangement */
- volatile WalSnd *walsnd = MyWalSnd;
+ /*
+ * Check that the timeline the client requested for exists, and the
+ * requested start location is on that timeline.
+ */
+ timeLineHistory = readTimeLineHistory(ThisTimeLineID);
+ switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory);
+ list_free_deep(timeLineHistory);
- SpinLockAcquire(&walsnd->mutex);
- walsnd->sentPtr = sentPtr;
- SpinLockRelease(&walsnd->mutex);
+ /*
+ * Found the requested timeline in the history. Check that
+ * requested startpoint is on that timeline in our history.
+ *
+ * This is quite loose on purpose. We only check that we didn't
+ * fork off the requested timeline before the switchpoint. We don't
+ * check that we switched *to* it before the requested starting
+ * point. This is because the client can legitimately request to
+ * start replication from the beginning of the WAL segment that
+ * contains switchpoint, but on the new timeline, so that it
+ * doesn't end up with a partial segment. If you ask for a too old
+ * starting point, you'll get an error later when we fail to find
+ * the requested WAL segment in pg_xlog.
+ *
+ * XXX: we could be more strict here and only allow a startpoint
+ * that's older than the switchpoint, if it it's still in the same
+ * WAL segment.
+ */
+ if (!XLogRecPtrIsInvalid(switchpoint) &&
+ XLByteLT(switchpoint, cmd->startpoint))
+ {
+ ereport(ERROR,
+ (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
+ (uint32) (cmd->startpoint >> 32),
+ (uint32) (cmd->startpoint),
+ cmd->timeline),
+ errdetail("This server's history forked from timeline %u at %X/%X",
+ cmd->timeline,
+ (uint32) (switchpoint >> 32),
+ (uint32) (switchpoint))));
+ }
+ sendTimeLineValidUpto = switchpoint;
+ }
+ }
+ else
+ {
+ sendTimeLine = ThisTimeLineID;
+ sendTimeLineValidUpto = InvalidXLogRecPtr;
+ sendTimeLineIsHistoric = false;
}
- SyncRepInitConfig();
+ streamingDoneSending = streamingDoneReceiving = false;
+
+ /* If there is nothing to stream, don't even enter COPY mode */
+ if (!sendTimeLineIsHistoric ||
+ XLByteLT(cmd->startpoint, sendTimeLineValidUpto))
+ {
+ XLogRecPtr FlushPtr;
+ /*
+ * When we first start replication the standby will be behind the primary.
+ * For some applications, for example, synchronous replication, it is
+ * important to have a clear state for this initial catchup mode, so we
+ * can trigger actions when we change streaming state later. We may stay
+ * in this state for a long time, which is exactly why we want to be able
+ * to monitor whether or not we are still here.
+ */
+ WalSndSetState(WALSNDSTATE_CATCHUP);
+
+ /* Send a CopyBothResponse message, and start streaming */
+ pq_beginmessage(&buf, 'W');
+ pq_sendbyte(&buf, 0);
+ pq_sendint(&buf, 0, 2);
+ pq_endmessage(&buf);
+ pq_flush();
+
+ /*
+ * Don't allow a request to stream from a future point in WAL that
+ * hasn't been flushed to disk in this server yet.
+ */
+ if (am_cascading_walsender)
+ FlushPtr = GetStandbyFlushRecPtr();
+ else
+ FlushPtr = GetFlushRecPtr();
+ if (XLByteLT(FlushPtr, cmd->startpoint))
+ {
+ ereport(ERROR,
+ (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
+ (uint32) (cmd->startpoint >> 32),
+ (uint32) (cmd->startpoint),
+ (uint32) (FlushPtr >> 32),
+ (uint32) (FlushPtr))));
+ }
+
+ /* Start streaming from the requested point */
+ sentPtr = cmd->startpoint;
- /* Main loop of walsender */
- WalSndLoop();
+ /* Initialize shared memory status, too */
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalSnd *walsnd = MyWalSnd;
+
+ SpinLockAcquire(&walsnd->mutex);
+ walsnd->sentPtr = sentPtr;
+ SpinLockRelease(&walsnd->mutex);
+ }
+
+ SyncRepInitConfig();
+
+ /* Main loop of walsender */
+ replication_active = true;
+
+ WalSndLoop();
+
+ replication_active = false;
+ if (walsender_ready_to_stop)
+ proc_exit(0);
+ WalSndSetState(WALSNDSTATE_STARTUP);
+ }
+
+ /* Get out of COPY mode (CommandComplete). */
+ EndCommand("COPY 0", DestRemote);
}
/*
@@ -406,10 +606,13 @@ exec_replication_command(const char *cmd_string)
SendBaseBackup((BaseBackupCmd *) cmd_node);
break;
+ case T_TimeLineHistoryCmd:
+ SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
+ break;
+
default:
- ereport(FATAL,
- (errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg("invalid standby query string: %s", cmd_string)));
+ elog(ERROR, "unrecognized replication command node tag: %u",
+ cmd_node->type);
}
/* done */
@@ -421,7 +624,8 @@ exec_replication_command(const char *cmd_string)
}
/*
- * Check if the remote end has closed the connection.
+ * Process any incoming messages while streaming. Also checks if the remote
+ * end has closed the connection.
*/
static void
ProcessRepliesIfAny(void)
@@ -430,7 +634,12 @@ ProcessRepliesIfAny(void)
int r;
bool received = false;
- for (;;)
+ /*
+ * If we already received a CopyDone from the frontend, any subsequent
+ * message is the beginning of a new command, and should be processed in
+ * the main processing loop.
+ */
+ while (!streamingDoneReceiving)
{
r = pq_getbyte_if_available(&firstchar);
if (r < 0)
@@ -459,6 +668,31 @@ ProcessRepliesIfAny(void)
break;
/*
+ * CopyDone means the standby requested to finish streaming.
+ * Reply with CopyDone, if we had not sent that already.
+ */
+ case 'c':
+ if (!streamingDoneSending)
+ {
+ pq_putmessage_noblock('c', NULL, 0);
+ streamingDoneSending = true;
+ }
+
+ /* consume the CopyData message */
+ resetStringInfo(&reply_message);
+ if (pq_getmessage(&reply_message, 0))
+ {
+ ereport(COMMERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("unexpected EOF on standby connection")));
+ proc_exit(0);
+ }
+
+ streamingDoneReceiving = true;
+ received = true;
+ break;
+
+ /*
* 'X' means that the standby is closing down the socket.
*/
case 'X':
@@ -666,7 +900,10 @@ WalSndLoop(void)
last_reply_timestamp = GetCurrentTimestamp();
ping_sent = false;
- /* Loop forever, unless we get an error */
+ /*
+ * Loop until we reach the end of this timeline or the client requests
+ * to stop streaming.
+ */
for (;;)
{
/* Clear any already-pending wakeups */
@@ -693,6 +930,14 @@ WalSndLoop(void)
ProcessRepliesIfAny();
/*
+ * If we have received CopyDone from the client, sent CopyDone
+ * ourselves, and the output buffer is empty, it's time to exit
+ * streaming.
+ */
+ if (!pq_is_send_pending() && streamingDoneSending && streamingDoneReceiving)
+ break;
+
+ /*
* If we don't have any pending data in the output buffer, try to send
* some more. If there is some, we don't bother to call XLogSend
* again until we've flushed it ... but we'd better assume we are not
@@ -705,7 +950,7 @@ WalSndLoop(void)
/* Try to flush pending output to the client */
if (pq_flush_if_writable() != 0)
- break;
+ goto send_failure;
/* If nothing remains to be sent right now ... */
if (caughtup && !pq_is_send_pending())
@@ -739,7 +984,7 @@ WalSndLoop(void)
if (caughtup && !pq_is_send_pending())
{
/* Inform the standby that XLOG streaming is done */
- pq_puttextmessage('C', "COPY 0");
+ EndCommand("COPY 0", DestRemote);
pq_flush();
proc_exit(0);
@@ -754,14 +999,16 @@ WalSndLoop(void)
* loaded a subset of the available data but then pq_flush_if_writable
* flushed it all --- we should immediately try to send more.
*/
- if (caughtup || pq_is_send_pending())
+ if ((caughtup && !streamingDoneSending) || pq_is_send_pending())
{
TimestampTz timeout = 0;
long sleeptime = 10000; /* 10 s */
int wakeEvents;
- wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
- WL_SOCKET_READABLE | WL_TIMEOUT;
+ wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT;
+
+ if (!streamingDoneReceiving)
+ wakeEvents |= WL_SOCKET_READABLE;
if (pq_is_send_pending())
wakeEvents |= WL_SOCKET_WRITEABLE;
@@ -813,11 +1060,13 @@ WalSndLoop(void)
*/
ereport(COMMERROR,
(errmsg("terminating walsender process due to replication timeout")));
- break;
+ goto send_failure;
}
}
}
+ return;
+send_failure:
/*
* Get here on send failure. Clean up and exit.
*
@@ -916,7 +1165,7 @@ WalSndKill(int code, Datum arg)
* more than one.
*/
void
-XLogRead(char *buf, XLogRecPtr startptr, Size count)
+XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
{
char *p;
XLogRecPtr recptr;
@@ -937,7 +1186,7 @@ retry:
startoff = recptr % XLogSegSize;
- if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
+ if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo) || sendTimeLine != tli)
{
char path[MAXPGPATH];
@@ -945,8 +1194,9 @@ retry:
if (sendFile >= 0)
close(sendFile);
+ sendTimeLine = tli;
XLByteToSeg(recptr, sendSegNo);
- XLogFilePath(path, ThisTimeLineID, sendSegNo);
+ XLogFilePath(path, sendTimeLine, sendSegNo);
sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
if (sendFile < 0)
@@ -960,7 +1210,7 @@ retry:
ereport(ERROR,
(errcode_for_file_access(),
errmsg("requested WAL segment %s has already been removed",
- XLogFileNameP(ThisTimeLineID, sendSegNo))));
+ XLogFileNameP(sendTimeLine, sendSegNo))));
else
ereport(ERROR,
(errcode_for_file_access(),
@@ -977,7 +1227,7 @@ retry:
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not seek in log segment %s to offset %u: %m",
- XLogFileNameP(ThisTimeLineID, sendSegNo),
+ XLogFileNameP(sendTimeLine, sendSegNo),
startoff)));
sendOff = startoff;
}
@@ -994,7 +1244,7 @@ retry:
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not read from log segment %s, offset %u, length %lu: %m",
- XLogFileNameP(ThisTimeLineID, sendSegNo),
+ XLogFileNameP(sendTimeLine, sendSegNo),
sendOff, (unsigned long) segbytes)));
}
@@ -1019,7 +1269,7 @@ retry:
ereport(ERROR,
(errcode_for_file_access(),
errmsg("requested WAL segment %s has already been removed",
- XLogFileNameP(ThisTimeLineID, segno))));
+ XLogFileNameP(sendTimeLine, segno))));
/*
* During recovery, the currently-open WAL file might be replaced with the
@@ -1060,10 +1310,17 @@ static void
XLogSend(bool *caughtup)
{
XLogRecPtr SendRqstPtr;
+ XLogRecPtr FlushPtr;
XLogRecPtr startptr;
XLogRecPtr endptr;
Size nbytes;
+ if (streamingDoneSending)
+ {
+ *caughtup = true;
+ return;
+ }
+
/*
* Attempt to send all data that's already been written out and fsync'd to
* disk. We cannot go further than what's been written out given the
@@ -1073,32 +1330,103 @@ XLogSend(bool *caughtup)
* that gets lost on the master.
*/
if (am_cascading_walsender)
+ FlushPtr = GetStandbyFlushRecPtr();
+ else
+ FlushPtr = GetFlushRecPtr();
+
+ /*
+ * In a cascading standby, the current recovery target timeline can
+ * change, or we can be promoted. In either case, the current timeline
+ * becomes historic. We need to detect that so that we don't try to stream
+ * past the point where we switched to another timeline. It's checked
+ * after calculating FlushPtr, to avoid a race condition: if the timeline
+ * becomes historic just after we checked that it was still current, it
+ * should still be OK to stream it up to the FlushPtr that was calculated
+ * before it became historic.
+ */
+ if (!sendTimeLineIsHistoric && am_cascading_walsender)
{
- TimeLineID currentTargetTLI;
- SendRqstPtr = GetStandbyFlushRecPtr(&currentTargetTLI);
+ bool becameHistoric = false;
+ TimeLineID targetTLI;
- /*
- * If the recovery target timeline changed, bail out. It's a bit
- * unfortunate that we have to just disconnect, but there is no way
- * to tell the client that the timeline changed. We also don't know
- * exactly where the switch happened, so we cannot safely try to send
- * up to the switchover point before disconnecting.
- */
- if (currentTargetTLI != ThisTimeLineID)
+ if (!RecoveryInProgress())
{
- if (!walsender_ready_to_stop)
- ereport(LOG,
- (errmsg("terminating walsender process to force cascaded standby "
- "to update timeline and reconnect")));
- walsender_ready_to_stop = true;
- *caughtup = true;
- return;
+ /*
+ * We have been promoted. RecoveryInProgress() updated
+ * ThisTimeLineID to the new current timeline.
+ */
+ targetTLI = ThisTimeLineID;
+ am_cascading_walsender = false;
+ becameHistoric = true;
+ }
+ else
+ {
+ /*
+ * Still a cascading standby. But is the timeline we're sending
+ * still the recovery target timeline?
+ */
+ targetTLI = GetRecoveryTargetTLI();
+
+ if (targetTLI != sendTimeLine)
+ becameHistoric = true;
+ }
+
+ if (becameHistoric)
+ {
+ /*
+ * The timeline we were sending has become historic. Read the
+ * timeline history file of the new timeline to see where exactly
+ * we forked off from the timeline we were sending.
+ */
+ List *history;
+
+ history = readTimeLineHistory(targetTLI);
+ sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history);
+ Assert(XLByteLE(sentPtr, sendTimeLineValidUpto));
+ list_free_deep(history);
+
+ /* the switchpoint should be >= current send pointer */
+ if (!XLByteLE(sentPtr, sendTimeLineValidUpto))
+ elog(ERROR, "server switched off timeline %u at %X/%X, but walsender already streamed up to %X/%X",
+ sendTimeLine,
+ (uint32) (sendTimeLineValidUpto >> 32),
+ (uint32) sendTimeLineValidUpto,
+ (uint32) (sentPtr >> 32),
+ (uint32) sentPtr);
+
+ sendTimeLineIsHistoric = true;
}
}
+
+ /*
+ * If this is a historic timeline and we've reached the point where we
+ * forked to the next timeline, stop streaming.
+ */
+ if (sendTimeLineIsHistoric && XLByteLE(sendTimeLineValidUpto, sentPtr))
+ {
+ /* close the current file. */
+ if (sendFile >= 0)
+ close(sendFile);
+ sendFile = -1;
+
+ /* Send CopyDone */
+ pq_putmessage_noblock('c', NULL, 0);
+ streamingDoneSending = true;
+
+ *caughtup = true;
+ return;
+ }
+
+ /*
+ * Stream up to the point known to be flushed to disk, or to the end of
+ * this timeline, whichever comes first.
+ */
+ if (sendTimeLineIsHistoric && XLByteLT(sendTimeLineValidUpto, FlushPtr))
+ SendRqstPtr = sendTimeLineValidUpto;
else
- SendRqstPtr = GetFlushRecPtr();
+ SendRqstPtr = FlushPtr;
- /* Quick exit if nothing to do */
+ Assert(XLByteLE(sentPtr, SendRqstPtr));
if (XLByteLE(SendRqstPtr, sentPtr))
{
*caughtup = true;
@@ -1124,7 +1452,10 @@ XLogSend(bool *caughtup)
if (XLByteLE(SendRqstPtr, endptr))
{
endptr = SendRqstPtr;
- *caughtup = true;
+ if (sendTimeLineIsHistoric)
+ *caughtup = false;
+ else
+ *caughtup = true;
}
else
{
@@ -1151,7 +1482,7 @@ XLogSend(bool *caughtup)
* calls.
*/
enlargeStringInfo(&output_message, nbytes);
- XLogRead(&output_message.data[output_message.len], startptr, nbytes);
+ XLogRead(&output_message.data[output_message.len], sendTimeLine, startptr, nbytes);
output_message.len += nbytes;
output_message.data[output_message.len] = '\0';
@@ -1242,6 +1573,14 @@ WalSndLastCycleHandler(SIGNAL_ARGS)
{
int save_errno = errno;
+ /*
+ * If replication has not yet started, die like with SIGTERM. If
+ * replication is active, only set a flag and wake up the main loop. It
+ * will send any outstanding WAL, and then exit gracefully.
+ */
+ if (!replication_active)
+ kill(MyProcPid, SIGTERM);
+
walsender_ready_to_stop = true;
if (MyWalSnd)
SetLatch(&MyWalSnd->latch);