diff options
Diffstat (limited to 'src/backend/replication/walsender.c')
-rw-r--r-- | src/backend/replication/walsender.c | 587 |
1 files changed, 463 insertions, 124 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 8774d7e8229..aec57f5535f 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -7,10 +7,15 @@ * (Note that there can be more than one walsender process concurrently.) * It is started by the postmaster when the walreceiver of a standby server * connects to the primary server and requests XLOG streaming replication. - * It attempts to keep reading XLOG records from the disk and sending them - * to the standby server, as long as the connection is alive (i.e., like - * any backend, there is a one-to-one relationship between a connection - * and a walsender process). + * + * A walsender is similar to a regular backend, ie. there is a one-to-one + * relationship between a connection and a walsender process, but instead + * of processing SQL queries, it understands a small set of special + * replication-mode commands. The START_REPLICATION command begins streaming + * WAL to the client. While streaming, the walsender keeps reading XLOG + * records from the disk and sends them to the standby server over the + * COPY protocol, until the either side ends the replication by exiting COPY + * mode (or until the connection is closed). * * Normal termination is by SIGTERM, which instructs the walsender to * close the connection and exit(0) at next convenient moment. Emergency @@ -37,6 +42,7 @@ #include <signal.h> #include <unistd.h> +#include "access/timeline.h" #include "access/transam.h" #include "access/xlog_internal.h" #include "catalog/pg_type.h" @@ -87,8 +93,6 @@ bool am_walsender = false; /* Am I a walsender process ? */ bool am_cascading_walsender = false; /* Am I cascading WAL to * another standby ? */ -static bool replication_started = false; /* Started streaming yet? 
*/ - /* User-settable parameters for walsender */ int max_wal_senders = 0; /* the maximum number of concurrent walsenders */ int wal_sender_timeout = 60 * 1000; /* maximum time to send one @@ -107,6 +111,16 @@ static XLogSegNo sendSegNo = 0; static uint32 sendOff = 0; /* + * These variables keep track of the state of the timeline we're currently + * sending. sendTimeLine identifies the timeline. If sendTimeLineIsHistoric, + * the timeline is not the latest timeline on this server, and the server's + * history forked off from that timeline at sendTimeLineValidUpto. + */ +static TimeLineID sendTimeLine = 0; +static bool sendTimeLineIsHistoric = false; +static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr; + +/* * How far have we sent WAL already? This is also advertised in * MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.) */ @@ -124,9 +138,26 @@ static TimestampTz last_reply_timestamp; /* Have we sent a heartbeat message asking for reply, since last reply? */ static bool ping_sent = false; +/* + * While streaming WAL in Copy mode, streamingDoneSending is set to true + * after we have sent CopyDone. We should not send any more CopyData messages + * after that. streamingDoneReceiving is set to true when we receive CopyDone + * from the other end. When both become true, it's time to exit Copy mode. + */ +static bool streamingDoneSending; +static bool streamingDoneReceiving; + /* Flags set by signal handlers for later service in main loop */ static volatile sig_atomic_t got_SIGHUP = false; -volatile sig_atomic_t walsender_ready_to_stop = false; +static volatile sig_atomic_t walsender_ready_to_stop = false; + +/* + * This is set while we are streaming. When not set, SIGUSR2 signal will be + * handled like SIGTERM. When set, the main loop is responsible for checking + * walsender_ready_to_stop and terminating when it's set (after streaming any + * remaining WAL). 
+ */ +static volatile sig_atomic_t replication_active = false; /* Signal handlers */ static void WalSndSigHupHandler(SIGNAL_ARGS); @@ -134,7 +165,7 @@ static void WalSndXLogSendHandler(SIGNAL_ARGS); static void WalSndLastCycleHandler(SIGNAL_ARGS); /* Prototypes for private functions */ -static void WalSndLoop(void) __attribute__((noreturn)); +static void WalSndLoop(void); static void InitWalSenderSlot(void); static void WalSndKill(int code, Datum arg); static void XLogSend(bool *caughtup); @@ -164,6 +195,16 @@ InitWalSender(void) */ if (am_cascading_walsender) ThisTimeLineID = GetRecoveryTargetTLI(); + + /* + * Let postmaster know that we're a WAL sender. Once we've declared us as + * a WAL sender process, postmaster will let us outlive the bgwriter and + * kill us last in the shutdown sequence, so we get a chance to stream all + * remaining WAL at shutdown, including the shutdown checkpoint. Note that + * there's no going back, and we mustn't write any WAL records after this. + */ + MarkPostmasterChildWalSender(); + SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE); } /* @@ -182,17 +223,16 @@ WalSndErrorCleanup() sendFile = -1; } - /* - * Don't return back to the command loop after we've started replicating. - * We've already marked us as an actively streaming WAL sender in the - * PMSignal slot, and there's currently no way to undo that. - */ - if (replication_started) + replication_active = false; + if (walsender_ready_to_stop) proc_exit(0); + + /* Revert back to startup state */ + WalSndSetState(WALSNDSTATE_STARTUP); } /* - * IDENTIFY_SYSTEM + * Handle the IDENTIFY_SYSTEM command. */ static void IdentifySystem(void) @@ -210,9 +250,17 @@ IdentifySystem(void) snprintf(sysid, sizeof(sysid), UINT64_FORMAT, GetSystemIdentifier()); - snprintf(tli, sizeof(tli), "%u", ThisTimeLineID); - logptr = am_cascading_walsender ? 
GetStandbyFlushRecPtr(NULL) : GetInsertRecPtr(); + am_cascading_walsender = RecoveryInProgress(); + if (am_cascading_walsender) + { + logptr = GetStandbyFlushRecPtr(); + ThisTimeLineID = GetRecoveryTargetTLI(); + } + else + logptr = GetInsertRecPtr(); + + snprintf(tli, sizeof(tli), "%u", ThisTimeLineID); snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr); @@ -261,56 +309,106 @@ IdentifySystem(void) pq_endmessage(&buf); } + /* - * Handle START_REPLICATION command. - * - * At the moment, this never returns, but an ereport(ERROR) will take us back - * to the main loop. + * Handle TIMELINE_HISTORY command. */ static void -StartReplication(StartReplicationCmd *cmd) +SendTimeLineHistory(TimeLineHistoryCmd *cmd) { StringInfoData buf; + char histfname[MAXFNAMELEN]; + char path[MAXPGPATH]; + int fd; + size_t histfilelen; + size_t bytesleft; /* - * Let postmaster know that we're streaming. Once we've declared us as a - * WAL sender process, postmaster will let us outlive the bgwriter and - * kill us last in the shutdown sequence, so we get a chance to stream all - * remaining WAL at shutdown, including the shutdown checkpoint. Note that - * there's no going back, and we mustn't write any WAL records after this. + * Reply with a result set with one row, and two columns. The first col + * is the name of the history file, 2nd is the contents. */ - MarkPostmasterChildWalSender(); - SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE); - replication_started = true; - /* - * When promoting a cascading standby, postmaster sends SIGUSR2 to any - * cascading walsenders to kill them. But there is a corner-case where - * such walsender fails to receive SIGUSR2 and survives a standby - * promotion unexpectedly. This happens when postmaster sends SIGUSR2 - * before the walsender marks itself as a WAL sender, because postmaster - * sends SIGUSR2 to only the processes marked as a WAL sender. 
- * - * To avoid this corner-case, if recovery is NOT in progress even though - * the walsender is cascading one, we do the same thing as SIGUSR2 signal - * handler does, i.e., set walsender_ready_to_stop to true. Which causes - * the walsender to end later. - * - * When terminating cascading walsenders, usually postmaster writes the - * log message announcing the terminations. But there is a race condition - * here. If there is no walsender except this process before reaching - * here, postmaster thinks that there is no walsender and suppresses that - * log message. To handle this case, we always emit that log message here. - * This might cause duplicate log messages, but which is less likely to - * happen, so it's not worth writing some code to suppress them. - */ - if (am_cascading_walsender && !RecoveryInProgress()) + TLHistoryFileName(histfname, cmd->timeline); + TLHistoryFilePath(path, cmd->timeline); + + /* Send a RowDescription message */ + pq_beginmessage(&buf, 'T'); + pq_sendint(&buf, 2, 2); /* 2 fields */ + + /* first field */ + pq_sendstring(&buf, "filename"); /* col name */ + pq_sendint(&buf, 0, 4); /* table oid */ + pq_sendint(&buf, 0, 2); /* attnum */ + pq_sendint(&buf, TEXTOID, 4); /* type oid */ + pq_sendint(&buf, -1, 2); /* typlen */ + pq_sendint(&buf, 0, 4); /* typmod */ + pq_sendint(&buf, 0, 2); /* format code */ + + /* second field */ + pq_sendstring(&buf, "content"); /* col name */ + pq_sendint(&buf, 0, 4); /* table oid */ + pq_sendint(&buf, 0, 2); /* attnum */ + pq_sendint(&buf, BYTEAOID, 4); /* type oid */ + pq_sendint(&buf, -1, 2); /* typlen */ + pq_sendint(&buf, 0, 4); /* typmod */ + pq_sendint(&buf, 0, 2); /* format code */ + pq_endmessage(&buf); + + /* Send a DataRow message */ + pq_beginmessage(&buf, 'D'); + pq_sendint(&buf, 2, 2); /* # of columns */ + pq_sendint(&buf, strlen(histfname), 4); /* col1 len */ + pq_sendbytes(&buf, histfname, strlen(histfname)); + + fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0666); + if (fd < 0) + 
ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", path))); + + /* Determine file length and send it to client */ + histfilelen = lseek(fd, 0, SEEK_END); + if (histfilelen < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not seek to end of file \"%s\": %m", path))); + if (lseek(fd, 0, SEEK_SET) != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not seek to beginning of file \"%s\": %m", path))); + + pq_sendint(&buf, histfilelen, 4); /* col2 len */ + + bytesleft = histfilelen; + while (bytesleft > 0) { - ereport(LOG, - (errmsg("terminating walsender process to force cascaded standby " - "to update timeline and reconnect"))); - walsender_ready_to_stop = true; + char rbuf[BLCKSZ]; + int nread; + + nread = read(fd, rbuf, sizeof(rbuf)); + if (nread <= 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read file \"%s\": %m", + path))); + pq_sendbytes(&buf, rbuf, nread); + bytesleft -= nread; } + CloseTransientFile(fd); + + pq_endmessage(&buf); +} + +/* + * Handle START_REPLICATION command. + * + * At the moment, this never returns, but an ereport(ERROR) will take us back + * to the main loop. + */ +static void +StartReplication(StartReplicationCmd *cmd) +{ + StringInfoData buf; /* * We assume here that we're logging enough information in the WAL for @@ -322,42 +420,144 @@ StartReplication(StartReplicationCmd *cmd) */ /* - * When we first start replication the standby will be behind the primary. - * For some applications, for example, synchronous replication, it is - * important to have a clear state for this initial catchup mode, so we - * can trigger actions when we change streaming state later. We may stay - * in this state for a long time, which is exactly why we want to be able - * to monitor whether or not we are still here. + * Select the timeline. If it was given explicitly by the client, use + * that. Otherwise use the current ThisTimeLineID. 
*/ - WalSndSetState(WALSNDSTATE_CATCHUP); + if (cmd->timeline != 0) + { + XLogRecPtr switchpoint; - /* Send a CopyBothResponse message, and start streaming */ - pq_beginmessage(&buf, 'W'); - pq_sendbyte(&buf, 0); - pq_sendint(&buf, 0, 2); - pq_endmessage(&buf); - pq_flush(); + sendTimeLine = cmd->timeline; + if (sendTimeLine == ThisTimeLineID) + { + sendTimeLineIsHistoric = false; + sendTimeLineValidUpto = InvalidXLogRecPtr; + } + else + { + List *timeLineHistory; - /* - * Initialize position to the received one, then the xlog records begin to - * be shipped from that position - */ - sentPtr = cmd->startpoint; + sendTimeLineIsHistoric = true; - /* Also update the start position status in shared memory */ - { - /* use volatile pointer to prevent code rearrangement */ - volatile WalSnd *walsnd = MyWalSnd; + /* + * Check that the timeline the client requested for exists, and the + * requested start location is on that timeline. + */ + timeLineHistory = readTimeLineHistory(ThisTimeLineID); + switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory); + list_free_deep(timeLineHistory); - SpinLockAcquire(&walsnd->mutex); - walsnd->sentPtr = sentPtr; - SpinLockRelease(&walsnd->mutex); + /* + * Found the requested timeline in the history. Check that + * requested startpoint is on that timeline in our history. + * + * This is quite loose on purpose. We only check that we didn't + * fork off the requested timeline before the switchpoint. We don't + * check that we switched *to* it before the requested starting + * point. This is because the client can legitimately request to + * start replication from the beginning of the WAL segment that + * contains switchpoint, but on the new timeline, so that it + * doesn't end up with a partial segment. If you ask for a too old + * starting point, you'll get an error later when we fail to find + * the requested WAL segment in pg_xlog. 
+ * + * XXX: we could be more strict here and only allow a startpoint + * that's older than the switchpoint, if it it's still in the same + * WAL segment. + */ + if (!XLogRecPtrIsInvalid(switchpoint) && + XLByteLT(switchpoint, cmd->startpoint)) + { + ereport(ERROR, + (errmsg("requested starting point %X/%X on timeline %u is not in this server's history", + (uint32) (cmd->startpoint >> 32), + (uint32) (cmd->startpoint), + cmd->timeline), + errdetail("This server's history forked from timeline %u at %X/%X", + cmd->timeline, + (uint32) (switchpoint >> 32), + (uint32) (switchpoint)))); + } + sendTimeLineValidUpto = switchpoint; + } + } + else + { + sendTimeLine = ThisTimeLineID; + sendTimeLineValidUpto = InvalidXLogRecPtr; + sendTimeLineIsHistoric = false; } - SyncRepInitConfig(); + streamingDoneSending = streamingDoneReceiving = false; + + /* If there is nothing to stream, don't even enter COPY mode */ + if (!sendTimeLineIsHistoric || + XLByteLT(cmd->startpoint, sendTimeLineValidUpto)) + { + XLogRecPtr FlushPtr; + /* + * When we first start replication the standby will be behind the primary. + * For some applications, for example, synchronous replication, it is + * important to have a clear state for this initial catchup mode, so we + * can trigger actions when we change streaming state later. We may stay + * in this state for a long time, which is exactly why we want to be able + * to monitor whether or not we are still here. + */ + WalSndSetState(WALSNDSTATE_CATCHUP); + + /* Send a CopyBothResponse message, and start streaming */ + pq_beginmessage(&buf, 'W'); + pq_sendbyte(&buf, 0); + pq_sendint(&buf, 0, 2); + pq_endmessage(&buf); + pq_flush(); + + /* + * Don't allow a request to stream from a future point in WAL that + * hasn't been flushed to disk in this server yet. 
+ */ + if (am_cascading_walsender) + FlushPtr = GetStandbyFlushRecPtr(); + else + FlushPtr = GetFlushRecPtr(); + if (XLByteLT(FlushPtr, cmd->startpoint)) + { + ereport(ERROR, + (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X", + (uint32) (cmd->startpoint >> 32), + (uint32) (cmd->startpoint), + (uint32) (FlushPtr >> 32), + (uint32) (FlushPtr)))); + } + + /* Start streaming from the requested point */ + sentPtr = cmd->startpoint; - /* Main loop of walsender */ - WalSndLoop(); + /* Initialize shared memory status, too */ + { + /* use volatile pointer to prevent code rearrangement */ + volatile WalSnd *walsnd = MyWalSnd; + + SpinLockAcquire(&walsnd->mutex); + walsnd->sentPtr = sentPtr; + SpinLockRelease(&walsnd->mutex); + } + + SyncRepInitConfig(); + + /* Main loop of walsender */ + replication_active = true; + + WalSndLoop(); + + replication_active = false; + if (walsender_ready_to_stop) + proc_exit(0); + WalSndSetState(WALSNDSTATE_STARTUP); + } + + /* Get out of COPY mode (CommandComplete). */ + EndCommand("COPY 0", DestRemote); } /* @@ -406,10 +606,13 @@ exec_replication_command(const char *cmd_string) SendBaseBackup((BaseBackupCmd *) cmd_node); break; + case T_TimeLineHistoryCmd: + SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node); + break; + default: - ereport(FATAL, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg("invalid standby query string: %s", cmd_string))); + elog(ERROR, "unrecognized replication command node tag: %u", + cmd_node->type); } /* done */ @@ -421,7 +624,8 @@ exec_replication_command(const char *cmd_string) } /* - * Check if the remote end has closed the connection. + * Process any incoming messages while streaming. Also checks if the remote + * end has closed the connection. 
*/ static void ProcessRepliesIfAny(void) @@ -430,7 +634,12 @@ ProcessRepliesIfAny(void) int r; bool received = false; - for (;;) + /* + * If we already received a CopyDone from the frontend, any subsequent + * message is the beginning of a new command, and should be processed in + * the main processing loop. + */ + while (!streamingDoneReceiving) { r = pq_getbyte_if_available(&firstchar); if (r < 0) @@ -459,6 +668,31 @@ ProcessRepliesIfAny(void) break; /* + * CopyDone means the standby requested to finish streaming. + * Reply with CopyDone, if we had not sent that already. + */ + case 'c': + if (!streamingDoneSending) + { + pq_putmessage_noblock('c', NULL, 0); + streamingDoneSending = true; + } + + /* consume the CopyData message */ + resetStringInfo(&reply_message); + if (pq_getmessage(&reply_message, 0)) + { + ereport(COMMERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("unexpected EOF on standby connection"))); + proc_exit(0); + } + + streamingDoneReceiving = true; + received = true; + break; + + /* * 'X' means that the standby is closing down the socket. */ case 'X': @@ -666,7 +900,10 @@ WalSndLoop(void) last_reply_timestamp = GetCurrentTimestamp(); ping_sent = false; - /* Loop forever, unless we get an error */ + /* + * Loop until we reach the end of this timeline or the client requests + * to stop streaming. + */ for (;;) { /* Clear any already-pending wakeups */ @@ -693,6 +930,14 @@ WalSndLoop(void) ProcessRepliesIfAny(); /* + * If we have received CopyDone from the client, sent CopyDone + * ourselves, and the output buffer is empty, it's time to exit + * streaming. + */ + if (!pq_is_send_pending() && streamingDoneSending && streamingDoneReceiving) + break; + + /* * If we don't have any pending data in the output buffer, try to send * some more. If there is some, we don't bother to call XLogSend * again until we've flushed it ... 
but we'd better assume we are not @@ -705,7 +950,7 @@ WalSndLoop(void) /* Try to flush pending output to the client */ if (pq_flush_if_writable() != 0) - break; + goto send_failure; /* If nothing remains to be sent right now ... */ if (caughtup && !pq_is_send_pending()) @@ -739,7 +984,7 @@ WalSndLoop(void) if (caughtup && !pq_is_send_pending()) { /* Inform the standby that XLOG streaming is done */ - pq_puttextmessage('C', "COPY 0"); + EndCommand("COPY 0", DestRemote); pq_flush(); proc_exit(0); @@ -754,14 +999,16 @@ WalSndLoop(void) * loaded a subset of the available data but then pq_flush_if_writable * flushed it all --- we should immediately try to send more. */ - if (caughtup || pq_is_send_pending()) + if ((caughtup && !streamingDoneSending) || pq_is_send_pending()) { TimestampTz timeout = 0; long sleeptime = 10000; /* 10 s */ int wakeEvents; - wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | - WL_SOCKET_READABLE | WL_TIMEOUT; + wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT; + + if (!streamingDoneReceiving) + wakeEvents |= WL_SOCKET_READABLE; if (pq_is_send_pending()) wakeEvents |= WL_SOCKET_WRITEABLE; @@ -813,11 +1060,13 @@ WalSndLoop(void) */ ereport(COMMERROR, (errmsg("terminating walsender process due to replication timeout"))); - break; + goto send_failure; } } } + return; +send_failure: /* * Get here on send failure. Clean up and exit. * @@ -916,7 +1165,7 @@ WalSndKill(int code, Datum arg) * more than one. 
*/ void -XLogRead(char *buf, XLogRecPtr startptr, Size count) +XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count) { char *p; XLogRecPtr recptr; @@ -937,7 +1186,7 @@ retry: startoff = recptr % XLogSegSize; - if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo)) + if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo) || sendTimeLine != tli) { char path[MAXPGPATH]; @@ -945,8 +1194,9 @@ retry: if (sendFile >= 0) close(sendFile); + sendTimeLine = tli; XLByteToSeg(recptr, sendSegNo); - XLogFilePath(path, ThisTimeLineID, sendSegNo); + XLogFilePath(path, sendTimeLine, sendSegNo); sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0); if (sendFile < 0) @@ -960,7 +1210,7 @@ retry: ereport(ERROR, (errcode_for_file_access(), errmsg("requested WAL segment %s has already been removed", - XLogFileNameP(ThisTimeLineID, sendSegNo)))); + XLogFileNameP(sendTimeLine, sendSegNo)))); else ereport(ERROR, (errcode_for_file_access(), @@ -977,7 +1227,7 @@ retry: ereport(ERROR, (errcode_for_file_access(), errmsg("could not seek in log segment %s to offset %u: %m", - XLogFileNameP(ThisTimeLineID, sendSegNo), + XLogFileNameP(sendTimeLine, sendSegNo), startoff))); sendOff = startoff; } @@ -994,7 +1244,7 @@ retry: ereport(ERROR, (errcode_for_file_access(), errmsg("could not read from log segment %s, offset %u, length %lu: %m", - XLogFileNameP(ThisTimeLineID, sendSegNo), + XLogFileNameP(sendTimeLine, sendSegNo), sendOff, (unsigned long) segbytes))); } @@ -1019,7 +1269,7 @@ retry: ereport(ERROR, (errcode_for_file_access(), errmsg("requested WAL segment %s has already been removed", - XLogFileNameP(ThisTimeLineID, segno)))); + XLogFileNameP(sendTimeLine, segno)))); /* * During recovery, the currently-open WAL file might be replaced with the @@ -1060,10 +1310,17 @@ static void XLogSend(bool *caughtup) { XLogRecPtr SendRqstPtr; + XLogRecPtr FlushPtr; XLogRecPtr startptr; XLogRecPtr endptr; Size nbytes; + if (streamingDoneSending) + { + *caughtup = true; + return; + } + /* * Attempt 
to send all data that's already been written out and fsync'd to * disk. We cannot go further than what's been written out given the @@ -1073,32 +1330,103 @@ XLogSend(bool *caughtup) * that gets lost on the master. */ if (am_cascading_walsender) + FlushPtr = GetStandbyFlushRecPtr(); + else + FlushPtr = GetFlushRecPtr(); + + /* + * In a cascading standby, the current recovery target timeline can + * change, or we can be promoted. In either case, the current timeline + * becomes historic. We need to detect that so that we don't try to stream + * past the point where we switched to another timeline. It's checked + * after calculating FlushPtr, to avoid a race condition: if the timeline + * becomes historic just after we checked that it was still current, it + * should still be OK to stream it up to the FlushPtr that was calculated + * before it became historic. + */ + if (!sendTimeLineIsHistoric && am_cascading_walsender) { - TimeLineID currentTargetTLI; - SendRqstPtr = GetStandbyFlushRecPtr(&currentTargetTLI); + bool becameHistoric = false; + TimeLineID targetTLI; - /* - * If the recovery target timeline changed, bail out. It's a bit - * unfortunate that we have to just disconnect, but there is no way - * to tell the client that the timeline changed. We also don't know - * exactly where the switch happened, so we cannot safely try to send - * up to the switchover point before disconnecting. - */ - if (currentTargetTLI != ThisTimeLineID) + if (!RecoveryInProgress()) { - if (!walsender_ready_to_stop) - ereport(LOG, - (errmsg("terminating walsender process to force cascaded standby " - "to update timeline and reconnect"))); - walsender_ready_to_stop = true; - *caughtup = true; - return; + /* + * We have been promoted. RecoveryInProgress() updated + * ThisTimeLineID to the new current timeline. + */ + targetTLI = ThisTimeLineID; + am_cascading_walsender = false; + becameHistoric = true; + } + else + { + /* + * Still a cascading standby. 
But is the timeline we're sending + * still the recovery target timeline? + */ + targetTLI = GetRecoveryTargetTLI(); + + if (targetTLI != sendTimeLine) + becameHistoric = true; + } + + if (becameHistoric) + { + /* + * The timeline we were sending has become historic. Read the + * timeline history file of the new timeline to see where exactly + * we forked off from the timeline we were sending. + */ + List *history; + + history = readTimeLineHistory(targetTLI); + sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history); + Assert(XLByteLE(sentPtr, sendTimeLineValidUpto)); + list_free_deep(history); + + /* the switchpoint should be >= current send pointer */ + if (!XLByteLE(sentPtr, sendTimeLineValidUpto)) + elog(ERROR, "server switched off timeline %u at %X/%X, but walsender already streamed up to %X/%X", + sendTimeLine, + (uint32) (sendTimeLineValidUpto >> 32), + (uint32) sendTimeLineValidUpto, + (uint32) (sentPtr >> 32), + (uint32) sentPtr); + + sendTimeLineIsHistoric = true; } } + + /* + * If this is a historic timeline and we've reached the point where we + * forked to the next timeline, stop streaming. + */ + if (sendTimeLineIsHistoric && XLByteLE(sendTimeLineValidUpto, sentPtr)) + { + /* close the current file. */ + if (sendFile >= 0) + close(sendFile); + sendFile = -1; + + /* Send CopyDone */ + pq_putmessage_noblock('c', NULL, 0); + streamingDoneSending = true; + + *caughtup = true; + return; + } + + /* + * Stream up to the point known to be flushed to disk, or to the end of + * this timeline, whichever comes first. 
+ */ + if (sendTimeLineIsHistoric && XLByteLT(sendTimeLineValidUpto, FlushPtr)) + SendRqstPtr = sendTimeLineValidUpto; else - SendRqstPtr = GetFlushRecPtr(); + SendRqstPtr = FlushPtr; - /* Quick exit if nothing to do */ + Assert(XLByteLE(sentPtr, SendRqstPtr)); if (XLByteLE(SendRqstPtr, sentPtr)) { *caughtup = true; @@ -1124,7 +1452,10 @@ XLogSend(bool *caughtup) if (XLByteLE(SendRqstPtr, endptr)) { endptr = SendRqstPtr; - *caughtup = true; + if (sendTimeLineIsHistoric) + *caughtup = false; + else + *caughtup = true; } else { @@ -1151,7 +1482,7 @@ XLogSend(bool *caughtup) * calls. */ enlargeStringInfo(&output_message, nbytes); - XLogRead(&output_message.data[output_message.len], startptr, nbytes); + XLogRead(&output_message.data[output_message.len], sendTimeLine, startptr, nbytes); output_message.len += nbytes; output_message.data[output_message.len] = '\0'; @@ -1242,6 +1573,14 @@ WalSndLastCycleHandler(SIGNAL_ARGS) { int save_errno = errno; + /* + * If replication has not yet started, die like with SIGTERM. If + * replication is active, only set a flag and wake up the main loop. It + * will send any outstanding WAL, and then exit gracefully. + */ + if (!replication_active) + kill(MyProcPid, SIGTERM); + walsender_ready_to_stop = true; if (MyWalSnd) SetLatch(&MyWalSnd->latch); |