diff options
Diffstat (limited to 'src/backend/replication/walreceiver.c')
-rw-r--r-- | src/backend/replication/walreceiver.c | 472 |
1 files changed, 380 insertions, 92 deletions
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 62135037f10..303edb75a32 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -15,6 +15,14 @@ * WalRcv->receivedUpto variable in shared memory, to inform the startup * process of how far it can proceed with XLOG replay. * + * If the primary server ends streaming, but doesn't disconnect, walreceiver + * goes into "waiting" mode, and waits for the startup process to give new + * instructions. The startup process will treat that the same as + * disconnection, and will rescan the archive/pg_xlog directory. But when the + * startup process wants to try streaming replication again, it will just + * nudge the existing walreceiver process that's waiting, instead of launching + * a new one. + * * Normal termination is by SIGTERM, which instructs the walreceiver to * exit(0). Emergency termination is by SIGQUIT; like any postmaster child * process, the walreceiver will simply abort and exit on SIGQUIT. A close @@ -38,6 +46,7 @@ #include <signal.h> #include <unistd.h> +#include "access/timeline.h" #include "access/xlog_internal.h" #include "libpq/pqformat.h" #include "libpq/pqsignal.h" @@ -60,6 +69,10 @@ bool hot_standby_feedback; /* libpqreceiver hooks to these when loaded */ walrcv_connect_type walrcv_connect = NULL; +walrcv_identify_system_type walrcv_identify_system = NULL; +walrcv_startstreaming_type walrcv_startstreaming = NULL; +walrcv_endstreaming_type walrcv_endstreaming = NULL; +walrcv_readtimelinehistoryfile_type walrcv_readtimelinehistoryfile = NULL; walrcv_receive_type walrcv_receive = NULL; walrcv_send_type walrcv_send = NULL; walrcv_disconnect_type walrcv_disconnect = NULL; @@ -118,6 +131,8 @@ static volatile bool WalRcvImmediateInterruptOK = false; static void ProcessWalRcvInterrupts(void); static void EnableWalRcvImmediateExit(void); static void DisableWalRcvImmediateExit(void); +static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last); +static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI); static void WalRcvDie(int code, Datum arg); static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len); static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr); @@ -128,6 +143,7 @@ static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime); /* Signal handlers */ static void WalRcvSigHupHandler(SIGNAL_ARGS); +static void WalRcvSigUsr1Handler(SIGNAL_ARGS); static void WalRcvShutdownHandler(SIGNAL_ARGS); static void WalRcvQuickDieHandler(SIGNAL_ARGS); @@ -171,6 +187,10 @@ WalReceiverMain(void) { char conninfo[MAXCONNINFO]; XLogRecPtr startpoint; + TimeLineID startpointTLI; + TimeLineID primaryTLI; + bool first_stream; + /* use volatile pointer to prevent code rearrangement */ volatile WalRcvData *walrcv = WalRcv; TimestampTz last_recv_timestamp; @@ -207,17 +227,21 @@ WalReceiverMain(void) /* The usual case */ break; - case WALRCV_RUNNING: + case WALRCV_WAITING: + case WALRCV_STREAMING: + case WALRCV_RESTARTING: + default: /* Shouldn't happen */ elog(PANIC, "walreceiver still running according to shared memory state"); } /* Advertise our PID so that the startup process can kill us */ walrcv->pid = MyProcPid; - walrcv->walRcvState = WALRCV_RUNNING; + walrcv->walRcvState = WALRCV_STREAMING; /* Fetch information required to start streaming */ strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO); startpoint = walrcv->receiveStart; + startpointTLI = walrcv->receiveStartTLI; /* Initialise to a sanish value */ walrcv->lastMsgSendTime = walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = GetCurrentTimestamp(); @@ -227,6 +251,8 @@ WalReceiverMain(void) /* Arrange to clean up at walreceiver exit */ on_shmem_exit(WalRcvDie, 0); + OwnLatch(&walrcv->latch); + /* * If possible, make this process a group leader, so that the postmaster * can signal any child processes too. (walreceiver probably never has @@ -246,7 +272,7 @@ WalReceiverMain(void) pqsignal(SIGQUIT, WalRcvQuickDieHandler); /* hard crash time */ pqsignal(SIGALRM, SIG_IGN); pqsignal(SIGPIPE, SIG_IGN); - pqsignal(SIGUSR1, SIG_IGN); + pqsignal(SIGUSR1, WalRcvSigUsr1Handler); pqsignal(SIGUSR2, SIG_IGN); /* Reset some signals that are accepted by postmaster but not here */ @@ -261,8 +287,12 @@ WalReceiverMain(void) /* Load the libpq-specific functions */ load_file("libpqwalreceiver", false); - if (walrcv_connect == NULL || walrcv_receive == NULL || - walrcv_send == NULL || walrcv_disconnect == NULL) + if (walrcv_connect == NULL || walrcv_startstreaming == NULL || + walrcv_endstreaming == NULL || + walrcv_identify_system == NULL || + walrcv_readtimelinehistoryfile == NULL || + walrcv_receive == NULL || walrcv_send == NULL || + walrcv_disconnect == NULL) elog(ERROR, "libpqwalreceiver didn't initialize correctly"); /* @@ -276,122 +306,360 @@ WalReceiverMain(void) /* Establish the connection to the primary for XLOG streaming */ EnableWalRcvImmediateExit(); - walrcv_connect(conninfo, startpoint); + walrcv_connect(conninfo); DisableWalRcvImmediateExit(); - /* Initialize LogstreamResult and buffers for processing messages */ - LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL); - initStringInfo(&reply_message); - initStringInfo(&incoming_message); - - /* Initialize the last recv timestamp */ - last_recv_timestamp = GetCurrentTimestamp(); - ping_sent = false; - - /* Loop until end-of-streaming or error */ + first_stream = true; for (;;) { - unsigned char type; - char *buf; - int len; - /* - * Emergency bailout if postmaster has died. This is to avoid the - * necessity for manual cleanup of all postmaster children. + * Check that we're connected to a valid server using the + * IDENTIFY_SYSTEM replication command, */ - if (!PostmasterIsAlive()) - exit(1); + EnableWalRcvImmediateExit(); + walrcv_identify_system(&primaryTLI); + DisableWalRcvImmediateExit(); /* - * Exit walreceiver if we're not in recovery. This should not happen, - * but cross-check the status here. + * Confirm that the current timeline of the primary is the same or + * ahead of ours. */ - if (!RecoveryInProgress()) - ereport(FATAL, - (errmsg("cannot continue WAL streaming, recovery has already ended"))); - - /* Process any requests or signals received recently */ - ProcessWalRcvInterrupts(); + if (primaryTLI < startpointTLI) + ereport(ERROR, + (errmsg("highest timeline %u of the primary is behind recovery timeline %u", + primaryTLI, startpointTLI))); - if (got_SIGHUP) - { - got_SIGHUP = false; - ProcessConfigFile(PGC_SIGHUP); - } + /* + * Get any missing history files. We do this always, even when we're + * not interested in that timeline, so that if we're promoted to become + * the master later on, we don't select the same timeline that was + * already used in the current master. This isn't bullet-proof - you'll + * need some external software to manage your cluster if you need to + * ensure that a unique timeline id is chosen in every case, but let's + * avoid the confusion of timeline id collisions where we can. + */ + WalRcvFetchTimeLineHistoryFiles(startpointTLI + 1, primaryTLI); - /* Wait a while for data to arrive */ - if (walrcv_receive(NAPTIME_PER_CYCLE, &type, &buf, &len)) + /* + * Start streaming. + * + * We'll try to start at the requested starting point and timeline, + * even if it's different from the server's latest timeline. In case + * we've already reached the end of the old timeline, the server will + * finish the streaming immediately, and we will go back to await + * orders from the startup process. If recovery_target_timeline is + * 'latest', the startup process will scan pg_xlog and find the new + * history file, bump recovery target timeline, and ask us to restart + * on the new timeline. + */ + ThisTimeLineID = startpointTLI; + if (walrcv_startstreaming(startpointTLI, startpoint)) { - /* Something was received from master, so reset timeout */ + bool endofwal = false; + + if (first_stream) + ereport(LOG, + (errmsg("started streaming WAL from primary at %X/%X on timeline %u", + (uint32) (startpoint >> 32), (uint32) startpoint, + startpointTLI))); + else + ereport(LOG, + (errmsg("restarted WAL streaming at %X/%X on timeline %u", + (uint32) (startpoint >> 32), (uint32) startpoint, + startpointTLI))); + first_stream = false; + + /* Initialize LogstreamResult and buffers for processing messages */ + LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(); + initStringInfo(&reply_message); + initStringInfo(&incoming_message); + + /* Initialize the last recv timestamp */ last_recv_timestamp = GetCurrentTimestamp(); ping_sent = false; - /* Accept the received data, and process it */ - XLogWalRcvProcessMsg(type, buf, len); - - /* Receive any more data we can without sleeping */ - while (walrcv_receive(0, &type, &buf, &len)) + /* Loop until end-of-streaming or error */ + while (!endofwal) { - last_recv_timestamp = GetCurrentTimestamp(); - ping_sent = false; - XLogWalRcvProcessMsg(type, buf, len); - } + char *buf; + int len; - /* Let the master know that we received some data. */ - XLogWalRcvSendReply(false, false); + /* + * Emergency bailout if postmaster has died. This is to avoid + * the necessity for manual cleanup of all postmaster children. + */ + if (!PostmasterIsAlive()) + exit(1); + + /* + * Exit walreceiver if we're not in recovery. This should not + * happen, but cross-check the status here. + */ + if (!RecoveryInProgress()) + ereport(FATAL, + (errmsg("cannot continue WAL streaming, recovery has already ended"))); + + /* Process any requests or signals received recently */ + ProcessWalRcvInterrupts(); + + if (got_SIGHUP) + { + got_SIGHUP = false; + ProcessConfigFile(PGC_SIGHUP); + } + + /* Wait a while for data to arrive */ + len = walrcv_receive(NAPTIME_PER_CYCLE, &buf); + if (len != 0) + { + /* + * Process the received data, and any subsequent data we + * can read without blocking. + */ + for (;;) + { + if (len > 0) + { + /* Something was received from master, so reset timeout */ + last_recv_timestamp = GetCurrentTimestamp(); + ping_sent = false; + XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1); + } + else if (len == 0) + break; + else if (len < 0) + { + ereport(LOG, + (errmsg("replication terminated by primary server"), + errdetail("End of WAL reached on timeline %u", startpointTLI))); + endofwal = true; + break; + } + len = walrcv_receive(0, &buf); + } + + /* Let the master know that we received some data. */ + XLogWalRcvSendReply(false, false); + + /* + * If we've written some records, flush them to disk and + * let the startup process and primary server know about + * them. + */ + XLogWalRcvFlush(false); + } + else + { + /* + * We didn't receive anything new. If we haven't heard + * anything from the server for more than + * wal_receiver_timeout / 2, ping the server. Also, if it's + * been longer than wal_receiver_status_interval since the + * last update we sent, send a status update to the master + * anyway, to report any progress in applying WAL. + */ + bool requestReply = false; + + /* + * Check if time since last receive from standby has + * reached the configured limit. + */ + if (wal_receiver_timeout > 0) + { + TimestampTz now = GetCurrentTimestamp(); + TimestampTz timeout; + + timeout = + TimestampTzPlusMilliseconds(last_recv_timestamp, + wal_receiver_timeout); + + if (now >= timeout) + ereport(ERROR, + (errmsg("terminating walreceiver due to timeout"))); + + /* + * We didn't receive anything new, for half of receiver + * replication timeout. Ping the server. + */ + if (!ping_sent) + { + timeout = TimestampTzPlusMilliseconds(last_recv_timestamp, + (wal_receiver_timeout/2)); + if (now >= timeout) + { + requestReply = true; + ping_sent = true; + } + } + } + + XLogWalRcvSendReply(requestReply, requestReply); + XLogWalRcvSendHSFeedback(); + } + } /* - * If we've written some records, flush them to disk and let the - * startup process and primary server know about them. + * The backend finished streaming. Exit streaming COPY-mode from + * our side, too. */ + EnableWalRcvImmediateExit(); + walrcv_endstreaming(); + DisableWalRcvImmediateExit(); + } + else + ereport(LOG, + (errmsg("primary server contains no more WAL on requested timeline %u", + startpointTLI))); + + /* + * End of WAL reached on the requested timeline. Close the last + * segment, and await for new orders from the startup process. + */ + if (recvFile >= 0) + { XLogWalRcvFlush(false); + if (close(recvFile) != 0) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not close log segment %s: %m", + XLogFileNameP(recvFileTLI, recvSegNo)))); } + recvFile = -1; + + elog(DEBUG1, "walreceiver ended streaming and awaits new instructions"); + WalRcvWaitForStartPosition(&startpoint, &startpointTLI); + } + /* not reached */ +} + +/* + * Wait for startup process to set receiveStart and receiveStartTLI. + */ +static void +WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI) +{ + /* use volatile pointer to prevent code rearrangement */ + volatile WalRcvData *walrcv = WalRcv; + int state; + + SpinLockAcquire(&walrcv->mutex); + state = walrcv->walRcvState; + if (state != WALRCV_STREAMING) + { + SpinLockRelease(&walrcv->mutex); + if (state == WALRCV_STOPPING) + proc_exit(0); else + elog(FATAL, "unexpected walreceiver state"); + } + walrcv->walRcvState = WALRCV_WAITING; + walrcv->receiveStart = InvalidXLogRecPtr; + walrcv->receiveStartTLI = 0; + SpinLockRelease(&walrcv->mutex); + + if (update_process_title) + set_ps_display("idle", false); + + /* + * nudge startup process to notice that we've stopped streaming and are + * now waiting for instructions. + */ + WakeupRecovery(); + for (;;) + { + ResetLatch(&walrcv->latch); + + /* + * Emergency bailout if postmaster has died. This is to avoid the + * necessity for manual cleanup of all postmaster children. + */ + if (!PostmasterIsAlive()) + exit(1); + + ProcessWalRcvInterrupts(); + + SpinLockAcquire(&walrcv->mutex); + Assert(walrcv->walRcvState == WALRCV_RESTARTING || + walrcv->walRcvState == WALRCV_WAITING || + walrcv->walRcvState == WALRCV_STOPPING); + if (walrcv->walRcvState == WALRCV_RESTARTING) + { + /* we don't expect primary_conninfo to change */ + *startpoint = walrcv->receiveStart; + *startpointTLI = walrcv->receiveStartTLI; + walrcv->walRcvState = WALRCV_STREAMING; + SpinLockRelease(&walrcv->mutex); + break; + } + if (walrcv->walRcvState == WALRCV_STOPPING) { /* - * We didn't receive anything new. If we haven't heard anything - * from the server for more than wal_receiver_timeout / 2, - * ping the server. Also, if it's been longer than - * wal_receiver_status_interval since the last update we sent, - * send a status update to the master anyway, to report any - * progress in applying WAL. + * We should've received SIGTERM if the startup process wants + * us to die, but might as well check it here too. */ - bool requestReply = false; + SpinLockRelease(&walrcv->mutex); + exit(1); + } + SpinLockRelease(&walrcv->mutex); - /* - * Check if time since last receive from standby has reached the - * configured limit. - */ - if (wal_receiver_timeout > 0) - { - TimestampTz now = GetCurrentTimestamp(); - TimestampTz timeout; + WaitLatch(&walrcv->latch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0); + } + + if (update_process_title) + { + char activitymsg[50]; - timeout = TimestampTzPlusMilliseconds(last_recv_timestamp, - wal_receiver_timeout); + snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%X", + (uint32) (*startpoint >> 32), + (uint32) *startpoint); + set_ps_display(activitymsg, false); + } +} - if (now >= timeout) - ereport(ERROR, - (errmsg("terminating walreceiver due to timeout"))); +/* + * Fetch any missing timeline history files between 'first' and 'last' + * (inclusive) from the server. + */ +static void +WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last) +{ + TimeLineID tli; - /* - * We didn't receive anything new, for half of receiver - * replication timeout. Ping the server. - */ - if (!ping_sent) - { - timeout = TimestampTzPlusMilliseconds(last_recv_timestamp, - (wal_receiver_timeout/2)); - if (now >= timeout) - { - requestReply = true; - ping_sent = true; - } - } - } + for (tli = first; tli <= last; tli++) + { + if (!existsTimeLineHistory(tli)) + { + char *fname; + char *content; + int len; + char expectedfname[MAXFNAMELEN]; - XLogWalRcvSendReply(requestReply, requestReply); - XLogWalRcvSendHSFeedback(); + ereport(LOG, + (errmsg("fetching timeline history file for timeline %u from primary server", + tli))); + + EnableWalRcvImmediateExit(); + walrcv_readtimelinehistoryfile(tli, &fname, &content, &len); + DisableWalRcvImmediateExit(); + + /* + * Check that the filename on the master matches what we calculated + * ourselves. This is just a sanity check, it should always match. + */ + TLHistoryFileName(expectedfname, tli); + if (strcmp(fname, expectedfname) != 0) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("primary reported unexpected filename for timeline history file of timeline %u", + tli))); + + /* + * Write the file to pg_xlog. + */ + writeTimeLineHistoryFile(tli, content, len); + + pfree(fname); + pfree(content); } } } @@ -408,9 +676,15 @@ WalRcvDie(int code, Datum arg) /* Ensure that all WAL records received are flushed to disk */ XLogWalRcvFlush(true); + DisownLatch(&walrcv->latch); + SpinLockAcquire(&walrcv->mutex); - Assert(walrcv->walRcvState == WALRCV_RUNNING || + Assert(walrcv->walRcvState == WALRCV_STREAMING || + walrcv->walRcvState == WALRCV_RESTARTING || + walrcv->walRcvState == WALRCV_STARTING || + walrcv->walRcvState == WALRCV_WAITING || walrcv->walRcvState == WALRCV_STOPPING); + Assert(walrcv->pid == MyProcPid); walrcv->walRcvState = WALRCV_STOPPED; walrcv->pid = 0; SpinLockRelease(&walrcv->mutex); @@ -418,6 +692,9 @@ WalRcvDie(int code, Datum arg) /* Terminate the connection gracefully. */ if (walrcv_disconnect != NULL) walrcv_disconnect(); + + /* Wake up the startup process to notice promptly that we're gone */ + WakeupRecovery(); } /* SIGHUP: set flag to re-read config file at next convenient time */ @@ -427,6 +704,14 @@ WalRcvSigHupHandler(SIGNAL_ARGS) got_SIGHUP = true; } + +/* SIGUSR1: used by latch mechanism */ +static void +WalRcvSigUsr1Handler(SIGNAL_ARGS) +{ + latch_sigusr1_handler(); +} + /* SIGTERM: set flag for main loop, or shutdown immediately if safe */ static void WalRcvShutdownHandler(SIGNAL_ARGS) @@ -435,6 +720,8 @@ WalRcvShutdownHandler(SIGNAL_ARGS) got_SIGTERM = true; + SetLatch(&WalRcv->latch); + /* Don't joggle the elbow of proc_exit */ if (!proc_exit_inprogress && WalRcvImmediateInterruptOK) ProcessWalRcvInterrupts(); @@ -661,6 +948,7 @@ XLogWalRcvFlush(bool dying) { walrcv->latestChunkStart = walrcv->receivedUpto; walrcv->receivedUpto = LogstreamResult.Flush; + walrcv->receivedTLI = ThisTimeLineID; } SpinLockRelease(&walrcv->mutex); @@ -738,7 +1026,7 @@ XLogWalRcvSendReply(bool force, bool requestReply) /* Construct a new message */ writePtr = LogstreamResult.Write; flushPtr = LogstreamResult.Flush; - applyPtr = GetXLogReplayRecPtr(NULL); + applyPtr = GetXLogReplayRecPtr(); resetStringInfo(&reply_message); pq_sendbyte(&reply_message, 'r'); |