diff options
Diffstat (limited to 'src/backend/access/transam/xlogrecovery.c')
-rw-r--r-- | src/backend/access/transam/xlogrecovery.c | 179 |
1 files changed, 119 insertions, 60 deletions
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index 79d38a837c4..54fd10475a7 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -36,6 +36,7 @@ #include "access/xact.h" #include "access/xlog_internal.h" #include "access/xlogarchive.h" +#include "access/xlogprefetcher.h" #include "access/xlogreader.h" #include "access/xlogrecovery.h" #include "access/xlogutils.h" @@ -183,6 +184,9 @@ static bool doRequestWalReceiverReply; /* XLogReader object used to parse the WAL records */ static XLogReaderState *xlogreader = NULL; +/* XLogPrefetcher object used to consume WAL records with read-ahead */ +static XLogPrefetcher *xlogprefetcher = NULL; + /* Parameters passed down from ReadRecord to the XLogPageRead callback. */ typedef struct XLogPageReadPrivate { @@ -404,18 +408,21 @@ static void recoveryPausesHere(bool endOfRecovery); static bool recoveryApplyDelay(XLogReaderState *record); static void ConfirmRecoveryPaused(void); -static XLogRecord *ReadRecord(XLogReaderState *xlogreader, - int emode, bool fetching_ckpt, TimeLineID replayTLI); +static XLogRecord *ReadRecord(XLogPrefetcher *xlogprefetcher, + int emode, bool fetching_ckpt, + TimeLineID replayTLI); static int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *readBuf); -static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, - bool fetching_ckpt, - XLogRecPtr tliRecPtr, - TimeLineID replayTLI, - XLogRecPtr replayLSN); +static XLogPageReadResult WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, + bool randAccess, + bool fetching_ckpt, + XLogRecPtr tliRecPtr, + TimeLineID replayTLI, + XLogRecPtr replayLSN, + bool nonblocking); static int emode_for_corrupt_record(int emode, XLogRecPtr RecPtr); -static XLogRecord *ReadCheckpointRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, +static XLogRecord *ReadCheckpointRecord(XLogPrefetcher *xlogprefetcher, XLogRecPtr RecPtr, int whichChkpt, bool report, TimeLineID replayTLI); static bool rescanLatestTimeLine(TimeLineID replayTLI, XLogRecPtr replayLSN); static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli, @@ -562,6 +569,15 @@ InitWalRecovery(ControlFileData *ControlFile, bool *wasShutdown_ptr, xlogreader->system_identifier = ControlFile->system_identifier; /* + * Set the WAL decode buffer size. This limits how far ahead we can read + * in the WAL. + */ + XLogReaderSetDecodeBuffer(xlogreader, NULL, wal_decode_buffer_size); + + /* Create a WAL prefetcher. */ + xlogprefetcher = XLogPrefetcherAllocate(xlogreader); + + /* * Allocate two page buffers dedicated to WAL consistency checks. We do * it this way, rather than just making static arrays, for two reasons: * (1) no need to waste the storage in most instantiations of the backend; @@ -589,7 +605,8 @@ InitWalRecovery(ControlFileData *ControlFile, bool *wasShutdown_ptr, * When a backup_label file is present, we want to roll forward from * the checkpoint it identifies, rather than using pg_control. */ - record = ReadCheckpointRecord(xlogreader, CheckPointLoc, 0, true, CheckPointTLI); + record = ReadCheckpointRecord(xlogprefetcher, CheckPointLoc, 0, true, + CheckPointTLI); if (record != NULL) { memcpy(&checkPoint, XLogRecGetData(xlogreader), sizeof(CheckPoint)); @@ -607,8 +624,8 @@ InitWalRecovery(ControlFileData *ControlFile, bool *wasShutdown_ptr, */ if (checkPoint.redo < CheckPointLoc) { - XLogBeginRead(xlogreader, checkPoint.redo); - if (!ReadRecord(xlogreader, LOG, false, + XLogPrefetcherBeginRead(xlogprefetcher, checkPoint.redo); + if (!ReadRecord(xlogprefetcher, LOG, false, checkPoint.ThisTimeLineID)) ereport(FATAL, (errmsg("could not find redo location referenced by checkpoint record"), @@ -727,7 +744,7 @@ InitWalRecovery(ControlFileData *ControlFile, bool *wasShutdown_ptr, CheckPointTLI = ControlFile->checkPointCopy.ThisTimeLineID; RedoStartLSN = ControlFile->checkPointCopy.redo; RedoStartTLI = ControlFile->checkPointCopy.ThisTimeLineID; - record = ReadCheckpointRecord(xlogreader, CheckPointLoc, 1, true, + record = ReadCheckpointRecord(xlogprefetcher, CheckPointLoc, 1, true, CheckPointTLI); if (record != NULL) { @@ -1413,8 +1430,8 @@ FinishWalRecovery(void) lastRec = XLogRecoveryCtl->lastReplayedReadRecPtr; lastRecTLI = XLogRecoveryCtl->lastReplayedTLI; } - XLogBeginRead(xlogreader, lastRec); - (void) ReadRecord(xlogreader, PANIC, false, lastRecTLI); + XLogPrefetcherBeginRead(xlogprefetcher, lastRec); + (void) ReadRecord(xlogprefetcher, PANIC, false, lastRecTLI); endOfLog = xlogreader->EndRecPtr; /* @@ -1503,6 +1520,9 @@ ShutdownWalRecovery(void) { char recoveryPath[MAXPGPATH]; + /* Final update of pg_stat_recovery_prefetch. */ + XLogPrefetcherComputeStats(xlogprefetcher); + /* Shut down xlogreader */ if (readFile >= 0) { @@ -1510,6 +1530,7 @@ ShutdownWalRecovery(void) readFile = -1; } XLogReaderFree(xlogreader); + XLogPrefetcherFree(xlogprefetcher); if (ArchiveRecoveryRequested) { @@ -1593,15 +1614,15 @@ PerformWalRecovery(void) { /* back up to find the record */ replayTLI = RedoStartTLI; - XLogBeginRead(xlogreader, RedoStartLSN); - record = ReadRecord(xlogreader, PANIC, false, replayTLI); + XLogPrefetcherBeginRead(xlogprefetcher, RedoStartLSN); + record = ReadRecord(xlogprefetcher, PANIC, false, replayTLI); } else { /* just have to read next record after CheckPoint */ Assert(xlogreader->ReadRecPtr == CheckPointLoc); replayTLI = CheckPointTLI; - record = ReadRecord(xlogreader, LOG, false, replayTLI); + record = ReadRecord(xlogprefetcher, LOG, false, replayTLI); } if (record != NULL) @@ -1710,7 +1731,7 @@ PerformWalRecovery(void) } /* Else, try to fetch the next WAL record */ - record = ReadRecord(xlogreader, LOG, false, replayTLI); + record = ReadRecord(xlogprefetcher, LOG, false, replayTLI); } while (record != NULL); /* @@ -1921,6 +1942,9 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl */ if (AllowCascadeReplication()) WalSndWakeup(); + + /* Reset the prefetcher. */ + XLogPrefetchReconfigure(); } } @@ -2305,7 +2329,8 @@ verifyBackupPageConsistency(XLogReaderState *record) * temporary page. */ buf = XLogReadBufferExtended(rnode, forknum, blkno, - RBM_NORMAL_NO_LOG); + RBM_NORMAL_NO_LOG, + InvalidBuffer); if (!BufferIsValid(buf)) continue; @@ -2917,17 +2942,18 @@ ConfirmRecoveryPaused(void) * Attempt to read the next XLOG record. * * Before first call, the reader needs to be positioned to the first record - * by calling XLogBeginRead(). + * by calling XLogPrefetcherBeginRead(). * * If no valid record is available, returns NULL, or fails if emode is PANIC. * (emode must be either PANIC, LOG). In standby mode, retries until a valid * record is available. */ static XLogRecord * -ReadRecord(XLogReaderState *xlogreader, int emode, +ReadRecord(XLogPrefetcher *xlogprefetcher, int emode, bool fetching_ckpt, TimeLineID replayTLI) { XLogRecord *record; + XLogReaderState *xlogreader = XLogPrefetcherGetReader(xlogprefetcher); XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data; /* Pass through parameters to XLogPageRead */ @@ -2943,7 +2969,7 @@ ReadRecord(XLogReaderState *xlogreader, int emode, { char *errormsg; - record = XLogReadRecord(xlogreader, &errormsg); + record = XLogPrefetcherReadRecord(xlogprefetcher, &errormsg); if (record == NULL) { /* @@ -3056,9 +3082,12 @@ ReadRecord(XLogReaderState *xlogreader, int emode, /* * Read the XLOG page containing RecPtr into readBuf (if not read already). - * Returns number of bytes read, if the page is read successfully, or -1 - * in case of errors. When errors occur, they are ereport'ed, but only - * if they have not been previously reported. + * Returns number of bytes read, if the page is read successfully, or + * XLREAD_FAIL in case of errors. When errors occur, they are ereport'ed, but + * only if they have not been previously reported. + * + * While prefetching, xlogreader->nonblocking may be set. In that case, + * returns XLREAD_WOULDBLOCK if we'd otherwise have to wait for more WAL. * * This is responsible for restoring files from archive as needed, as well * as for waiting for the requested WAL record to arrive in standby mode. @@ -3066,7 +3095,7 @@ ReadRecord(XLogReaderState *xlogreader, int emode, * 'emode' specifies the log level used for reporting "file not found" or * "end of WAL" situations in archive recovery, or in standby mode when a * trigger file is found. If set to WARNING or below, XLogPageRead() returns - * false in those situations, on higher log levels the ereport() won't + * XLREAD_FAIL in those situations, on higher log levels the ereport() won't * return. * * In standby mode, if after a successful return of XLogPageRead() the @@ -3125,20 +3154,31 @@ retry: (readSource == XLOG_FROM_STREAM && flushedUpto < targetPagePtr + reqLen)) { - if (!WaitForWALToBecomeAvailable(targetPagePtr + reqLen, - private->randAccess, - private->fetching_ckpt, - targetRecPtr, - private->replayTLI, - xlogreader->EndRecPtr)) + if (readFile >= 0 && + xlogreader->nonblocking && + readSource == XLOG_FROM_STREAM && + flushedUpto < targetPagePtr + reqLen) + return XLREAD_WOULDBLOCK; + + switch (WaitForWALToBecomeAvailable(targetPagePtr + reqLen, + private->randAccess, + private->fetching_ckpt, + targetRecPtr, + private->replayTLI, + xlogreader->EndRecPtr, + xlogreader->nonblocking)) { - if (readFile >= 0) - close(readFile); - readFile = -1; - readLen = 0; - readSource = XLOG_FROM_ANY; - - return -1; + case XLREAD_WOULDBLOCK: + return XLREAD_WOULDBLOCK; + case XLREAD_FAIL: + if (readFile >= 0) + close(readFile); + readFile = -1; + readLen = 0; + readSource = XLOG_FROM_ANY; + return XLREAD_FAIL; + case XLREAD_SUCCESS: + break; } } @@ -3263,7 +3303,7 @@ next_record_is_invalid: if (StandbyMode) goto retry; else - return -1; + return XLREAD_FAIL; } /* @@ -3292,14 +3332,18 @@ next_record_is_invalid: * available. * * When the requested record becomes available, the function opens the file - * containing it (if not open already), and returns true. When end of standby - * mode is triggered by the user, and there is no more WAL available, returns - * false. + * containing it (if not open already), and returns XLREAD_SUCCESS. When end + * of standby mode is triggered by the user, and there is no more WAL + * available, returns XLREAD_FAIL. + * + * If nonblocking is true, then give up immediately if we can't satisfy the + * request, returning XLREAD_WOULDBLOCK instead of waiting. */ -static bool +static XLogPageReadResult WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, bool fetching_ckpt, XLogRecPtr tliRecPtr, - TimeLineID replayTLI, XLogRecPtr replayLSN) + TimeLineID replayTLI, XLogRecPtr replayLSN, + bool nonblocking) { static TimestampTz last_fail_time = 0; TimestampTz now; @@ -3353,6 +3397,14 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, */ if (lastSourceFailed) { + /* + * Don't allow any retry loops to occur during nonblocking + * readahead. Let the caller process everything that has been + * decoded already first. + */ + if (nonblocking) + return XLREAD_WOULDBLOCK; + switch (currentSource) { case XLOG_FROM_ARCHIVE: @@ -3367,7 +3419,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, if (StandbyMode && CheckForStandbyTrigger()) { XLogShutdownWalRcv(); - return false; + return XLREAD_FAIL; } /* @@ -3375,7 +3427,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, * and pg_wal. */ if (!StandbyMode) - return false; + return XLREAD_FAIL; /* * Move to XLOG_FROM_STREAM state, and set to start a @@ -3519,7 +3571,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, currentSource == XLOG_FROM_ARCHIVE ? XLOG_FROM_ANY : currentSource); if (readFile >= 0) - return true; /* success! */ + return XLREAD_SUCCESS; /* success! */ /* * Nope, not found in archive or pg_wal. @@ -3674,11 +3726,15 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, /* just make sure source info is correct... */ readSource = XLOG_FROM_STREAM; XLogReceiptSource = XLOG_FROM_STREAM; - return true; + return XLREAD_SUCCESS; } break; } + /* In nonblocking mode, return rather than sleeping. */ + if (nonblocking) + return XLREAD_WOULDBLOCK; + /* * Data not here yet. Check for trigger, then wait for * walreceiver to wake us up when new WAL arrives. @@ -3686,13 +3742,13 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, if (CheckForStandbyTrigger()) { /* - * Note that we don't "return false" immediately here. - * After being triggered, we still want to replay all - * the WAL that was already streamed. It's in pg_wal - * now, so we just treat this as a failure, and the - * state machine will move on to replay the streamed - * WAL from pg_wal, and then recheck the trigger and - * exit replay. + * Note that we don't return XLREAD_FAIL immediately + * here. After being triggered, we still want to + * replay all the WAL that was already streamed. It's + * in pg_wal now, so we just treat this as a failure, + * and the state machine will move on to replay the + * streamed WAL from pg_wal, and then recheck the + * trigger and exit replay. */ lastSourceFailed = true; break; @@ -3711,6 +3767,9 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, streaming_reply_sent = true; } + /* Update pg_stat_recovery_prefetch before sleeping. */ + XLogPrefetcherComputeStats(xlogprefetcher); + /* * Wait for more WAL to arrive. Time out after 5 seconds * to react to a trigger file promptly and to check if the @@ -3743,7 +3802,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, HandleStartupProcInterrupts(); } - return false; /* not reached */ + return XLREAD_FAIL; /* not reached */ } @@ -3788,7 +3847,7 @@ emode_for_corrupt_record(int emode, XLogRecPtr RecPtr) * 1 for "primary", 0 for "other" (backup_label) */ static XLogRecord * -ReadCheckpointRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, +ReadCheckpointRecord(XLogPrefetcher *xlogprefetcher, XLogRecPtr RecPtr, int whichChkpt, bool report, TimeLineID replayTLI) { XLogRecord *record; @@ -3815,8 +3874,8 @@ ReadCheckpointRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, return NULL; } - XLogBeginRead(xlogreader, RecPtr); - record = ReadRecord(xlogreader, LOG, true, replayTLI); + XLogPrefetcherBeginRead(xlogprefetcher, RecPtr); + record = ReadRecord(xlogprefetcher, LOG, true, replayTLI); if (record == NULL) { |