aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/transam/xlogrecovery.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/transam/xlogrecovery.c')
-rw-r--r--src/backend/access/transam/xlogrecovery.c179
1 files changed, 119 insertions, 60 deletions
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 79d38a837c4..54fd10475a7 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -36,6 +36,7 @@
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "access/xlogarchive.h"
+#include "access/xlogprefetcher.h"
#include "access/xlogreader.h"
#include "access/xlogrecovery.h"
#include "access/xlogutils.h"
@@ -183,6 +184,9 @@ static bool doRequestWalReceiverReply;
/* XLogReader object used to parse the WAL records */
static XLogReaderState *xlogreader = NULL;
+/* XLogPrefetcher object used to consume WAL records with read-ahead */
+static XLogPrefetcher *xlogprefetcher = NULL;
+
/* Parameters passed down from ReadRecord to the XLogPageRead callback. */
typedef struct XLogPageReadPrivate
{
@@ -404,18 +408,21 @@ static void recoveryPausesHere(bool endOfRecovery);
static bool recoveryApplyDelay(XLogReaderState *record);
static void ConfirmRecoveryPaused(void);
-static XLogRecord *ReadRecord(XLogReaderState *xlogreader,
- int emode, bool fetching_ckpt, TimeLineID replayTLI);
+static XLogRecord *ReadRecord(XLogPrefetcher *xlogprefetcher,
+ int emode, bool fetching_ckpt,
+ TimeLineID replayTLI);
static int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
int reqLen, XLogRecPtr targetRecPtr, char *readBuf);
-static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
- bool fetching_ckpt,
- XLogRecPtr tliRecPtr,
- TimeLineID replayTLI,
- XLogRecPtr replayLSN);
+static XLogPageReadResult WaitForWALToBecomeAvailable(XLogRecPtr RecPtr,
+ bool randAccess,
+ bool fetching_ckpt,
+ XLogRecPtr tliRecPtr,
+ TimeLineID replayTLI,
+ XLogRecPtr replayLSN,
+ bool nonblocking);
static int emode_for_corrupt_record(int emode, XLogRecPtr RecPtr);
-static XLogRecord *ReadCheckpointRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr,
+static XLogRecord *ReadCheckpointRecord(XLogPrefetcher *xlogprefetcher, XLogRecPtr RecPtr,
int whichChkpt, bool report, TimeLineID replayTLI);
static bool rescanLatestTimeLine(TimeLineID replayTLI, XLogRecPtr replayLSN);
static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
@@ -562,6 +569,15 @@ InitWalRecovery(ControlFileData *ControlFile, bool *wasShutdown_ptr,
xlogreader->system_identifier = ControlFile->system_identifier;
/*
+ * Set the WAL decode buffer size. This limits how far ahead we can read
+ * in the WAL.
+ */
+ XLogReaderSetDecodeBuffer(xlogreader, NULL, wal_decode_buffer_size);
+
+ /* Create a WAL prefetcher. */
+ xlogprefetcher = XLogPrefetcherAllocate(xlogreader);
+
+ /*
* Allocate two page buffers dedicated to WAL consistency checks. We do
* it this way, rather than just making static arrays, for two reasons:
* (1) no need to waste the storage in most instantiations of the backend;
@@ -589,7 +605,8 @@ InitWalRecovery(ControlFileData *ControlFile, bool *wasShutdown_ptr,
* When a backup_label file is present, we want to roll forward from
* the checkpoint it identifies, rather than using pg_control.
*/
- record = ReadCheckpointRecord(xlogreader, CheckPointLoc, 0, true, CheckPointTLI);
+ record = ReadCheckpointRecord(xlogprefetcher, CheckPointLoc, 0, true,
+ CheckPointTLI);
if (record != NULL)
{
memcpy(&checkPoint, XLogRecGetData(xlogreader), sizeof(CheckPoint));
@@ -607,8 +624,8 @@ InitWalRecovery(ControlFileData *ControlFile, bool *wasShutdown_ptr,
*/
if (checkPoint.redo < CheckPointLoc)
{
- XLogBeginRead(xlogreader, checkPoint.redo);
- if (!ReadRecord(xlogreader, LOG, false,
+ XLogPrefetcherBeginRead(xlogprefetcher, checkPoint.redo);
+ if (!ReadRecord(xlogprefetcher, LOG, false,
checkPoint.ThisTimeLineID))
ereport(FATAL,
(errmsg("could not find redo location referenced by checkpoint record"),
@@ -727,7 +744,7 @@ InitWalRecovery(ControlFileData *ControlFile, bool *wasShutdown_ptr,
CheckPointTLI = ControlFile->checkPointCopy.ThisTimeLineID;
RedoStartLSN = ControlFile->checkPointCopy.redo;
RedoStartTLI = ControlFile->checkPointCopy.ThisTimeLineID;
- record = ReadCheckpointRecord(xlogreader, CheckPointLoc, 1, true,
+ record = ReadCheckpointRecord(xlogprefetcher, CheckPointLoc, 1, true,
CheckPointTLI);
if (record != NULL)
{
@@ -1413,8 +1430,8 @@ FinishWalRecovery(void)
lastRec = XLogRecoveryCtl->lastReplayedReadRecPtr;
lastRecTLI = XLogRecoveryCtl->lastReplayedTLI;
}
- XLogBeginRead(xlogreader, lastRec);
- (void) ReadRecord(xlogreader, PANIC, false, lastRecTLI);
+ XLogPrefetcherBeginRead(xlogprefetcher, lastRec);
+ (void) ReadRecord(xlogprefetcher, PANIC, false, lastRecTLI);
endOfLog = xlogreader->EndRecPtr;
/*
@@ -1503,6 +1520,9 @@ ShutdownWalRecovery(void)
{
char recoveryPath[MAXPGPATH];
+ /* Final update of pg_stat_recovery_prefetch. */
+ XLogPrefetcherComputeStats(xlogprefetcher);
+
/* Shut down xlogreader */
if (readFile >= 0)
{
@@ -1510,6 +1530,7 @@ ShutdownWalRecovery(void)
readFile = -1;
}
XLogReaderFree(xlogreader);
+ XLogPrefetcherFree(xlogprefetcher);
if (ArchiveRecoveryRequested)
{
@@ -1593,15 +1614,15 @@ PerformWalRecovery(void)
{
/* back up to find the record */
replayTLI = RedoStartTLI;
- XLogBeginRead(xlogreader, RedoStartLSN);
- record = ReadRecord(xlogreader, PANIC, false, replayTLI);
+ XLogPrefetcherBeginRead(xlogprefetcher, RedoStartLSN);
+ record = ReadRecord(xlogprefetcher, PANIC, false, replayTLI);
}
else
{
/* just have to read next record after CheckPoint */
Assert(xlogreader->ReadRecPtr == CheckPointLoc);
replayTLI = CheckPointTLI;
- record = ReadRecord(xlogreader, LOG, false, replayTLI);
+ record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);
}
if (record != NULL)
@@ -1710,7 +1731,7 @@ PerformWalRecovery(void)
}
/* Else, try to fetch the next WAL record */
- record = ReadRecord(xlogreader, LOG, false, replayTLI);
+ record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);
} while (record != NULL);
/*
@@ -1921,6 +1942,9 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
*/
if (AllowCascadeReplication())
WalSndWakeup();
+
+ /* Reset the prefetcher. */
+ XLogPrefetchReconfigure();
}
}
@@ -2305,7 +2329,8 @@ verifyBackupPageConsistency(XLogReaderState *record)
* temporary page.
*/
buf = XLogReadBufferExtended(rnode, forknum, blkno,
- RBM_NORMAL_NO_LOG);
+ RBM_NORMAL_NO_LOG,
+ InvalidBuffer);
if (!BufferIsValid(buf))
continue;
@@ -2917,17 +2942,18 @@ ConfirmRecoveryPaused(void)
* Attempt to read the next XLOG record.
*
* Before first call, the reader needs to be positioned to the first record
- * by calling XLogBeginRead().
+ * by calling XLogPrefetcherBeginRead().
*
* If no valid record is available, returns NULL, or fails if emode is PANIC.
* (emode must be either PANIC, LOG). In standby mode, retries until a valid
* record is available.
*/
static XLogRecord *
-ReadRecord(XLogReaderState *xlogreader, int emode,
+ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
bool fetching_ckpt, TimeLineID replayTLI)
{
XLogRecord *record;
+ XLogReaderState *xlogreader = XLogPrefetcherGetReader(xlogprefetcher);
XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data;
/* Pass through parameters to XLogPageRead */
@@ -2943,7 +2969,7 @@ ReadRecord(XLogReaderState *xlogreader, int emode,
{
char *errormsg;
- record = XLogReadRecord(xlogreader, &errormsg);
+ record = XLogPrefetcherReadRecord(xlogprefetcher, &errormsg);
if (record == NULL)
{
/*
@@ -3056,9 +3082,12 @@ ReadRecord(XLogReaderState *xlogreader, int emode,
/*
* Read the XLOG page containing RecPtr into readBuf (if not read already).
- * Returns number of bytes read, if the page is read successfully, or -1
- * in case of errors. When errors occur, they are ereport'ed, but only
- * if they have not been previously reported.
+ * Returns number of bytes read, if the page is read successfully, or
+ * XLREAD_FAIL in case of errors. When errors occur, they are ereport'ed, but
+ * only if they have not been previously reported.
+ *
+ * While prefetching, xlogreader->nonblocking may be set. In that case,
+ * returns XLREAD_WOULDBLOCK if we'd otherwise have to wait for more WAL.
*
* This is responsible for restoring files from archive as needed, as well
* as for waiting for the requested WAL record to arrive in standby mode.
@@ -3066,7 +3095,7 @@ ReadRecord(XLogReaderState *xlogreader, int emode,
* 'emode' specifies the log level used for reporting "file not found" or
* "end of WAL" situations in archive recovery, or in standby mode when a
* trigger file is found. If set to WARNING or below, XLogPageRead() returns
- * false in those situations, on higher log levels the ereport() won't
+ * XLREAD_FAIL in those situations, on higher log levels the ereport() won't
* return.
*
* In standby mode, if after a successful return of XLogPageRead() the
@@ -3125,20 +3154,31 @@ retry:
(readSource == XLOG_FROM_STREAM &&
flushedUpto < targetPagePtr + reqLen))
{
- if (!WaitForWALToBecomeAvailable(targetPagePtr + reqLen,
- private->randAccess,
- private->fetching_ckpt,
- targetRecPtr,
- private->replayTLI,
- xlogreader->EndRecPtr))
+ if (readFile >= 0 &&
+ xlogreader->nonblocking &&
+ readSource == XLOG_FROM_STREAM &&
+ flushedUpto < targetPagePtr + reqLen)
+ return XLREAD_WOULDBLOCK;
+
+ switch (WaitForWALToBecomeAvailable(targetPagePtr + reqLen,
+ private->randAccess,
+ private->fetching_ckpt,
+ targetRecPtr,
+ private->replayTLI,
+ xlogreader->EndRecPtr,
+ xlogreader->nonblocking))
{
- if (readFile >= 0)
- close(readFile);
- readFile = -1;
- readLen = 0;
- readSource = XLOG_FROM_ANY;
-
- return -1;
+ case XLREAD_WOULDBLOCK:
+ return XLREAD_WOULDBLOCK;
+ case XLREAD_FAIL:
+ if (readFile >= 0)
+ close(readFile);
+ readFile = -1;
+ readLen = 0;
+ readSource = XLOG_FROM_ANY;
+ return XLREAD_FAIL;
+ case XLREAD_SUCCESS:
+ break;
}
}
@@ -3263,7 +3303,7 @@ next_record_is_invalid:
if (StandbyMode)
goto retry;
else
- return -1;
+ return XLREAD_FAIL;
}
/*
@@ -3292,14 +3332,18 @@ next_record_is_invalid:
* available.
*
* When the requested record becomes available, the function opens the file
- * containing it (if not open already), and returns true. When end of standby
- * mode is triggered by the user, and there is no more WAL available, returns
- * false.
+ * containing it (if not open already), and returns XLREAD_SUCCESS. When end
+ * of standby mode is triggered by the user, and there is no more WAL
+ * available, returns XLREAD_FAIL.
+ *
+ * If nonblocking is true, then give up immediately if we can't satisfy the
+ * request, returning XLREAD_WOULDBLOCK instead of waiting.
*/
-static bool
+static XLogPageReadResult
WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
bool fetching_ckpt, XLogRecPtr tliRecPtr,
- TimeLineID replayTLI, XLogRecPtr replayLSN)
+ TimeLineID replayTLI, XLogRecPtr replayLSN,
+ bool nonblocking)
{
static TimestampTz last_fail_time = 0;
TimestampTz now;
@@ -3353,6 +3397,14 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
*/
if (lastSourceFailed)
{
+ /*
+ * Don't allow any retry loops to occur during nonblocking
+ * readahead. Let the caller process everything that has been
+ * decoded already first.
+ */
+ if (nonblocking)
+ return XLREAD_WOULDBLOCK;
+
switch (currentSource)
{
case XLOG_FROM_ARCHIVE:
@@ -3367,7 +3419,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
if (StandbyMode && CheckForStandbyTrigger())
{
XLogShutdownWalRcv();
- return false;
+ return XLREAD_FAIL;
}
/*
@@ -3375,7 +3427,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
* and pg_wal.
*/
if (!StandbyMode)
- return false;
+ return XLREAD_FAIL;
/*
* Move to XLOG_FROM_STREAM state, and set to start a
@@ -3519,7 +3571,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
currentSource == XLOG_FROM_ARCHIVE ? XLOG_FROM_ANY :
currentSource);
if (readFile >= 0)
- return true; /* success! */
+ return XLREAD_SUCCESS; /* success! */
/*
* Nope, not found in archive or pg_wal.
@@ -3674,11 +3726,15 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
/* just make sure source info is correct... */
readSource = XLOG_FROM_STREAM;
XLogReceiptSource = XLOG_FROM_STREAM;
- return true;
+ return XLREAD_SUCCESS;
}
break;
}
+ /* In nonblocking mode, return rather than sleeping. */
+ if (nonblocking)
+ return XLREAD_WOULDBLOCK;
+
/*
* Data not here yet. Check for trigger, then wait for
* walreceiver to wake us up when new WAL arrives.
@@ -3686,13 +3742,13 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
if (CheckForStandbyTrigger())
{
/*
- * Note that we don't "return false" immediately here.
- * After being triggered, we still want to replay all
- * the WAL that was already streamed. It's in pg_wal
- * now, so we just treat this as a failure, and the
- * state machine will move on to replay the streamed
- * WAL from pg_wal, and then recheck the trigger and
- * exit replay.
+ * Note that we don't return XLREAD_FAIL immediately
+ * here. After being triggered, we still want to
+ * replay all the WAL that was already streamed. It's
+ * in pg_wal now, so we just treat this as a failure,
+ * and the state machine will move on to replay the
+ * streamed WAL from pg_wal, and then recheck the
+ * trigger and exit replay.
*/
lastSourceFailed = true;
break;
@@ -3711,6 +3767,9 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
streaming_reply_sent = true;
}
+ /* Update pg_stat_recovery_prefetch before sleeping. */
+ XLogPrefetcherComputeStats(xlogprefetcher);
+
/*
* Wait for more WAL to arrive. Time out after 5 seconds
* to react to a trigger file promptly and to check if the
@@ -3743,7 +3802,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
HandleStartupProcInterrupts();
}
- return false; /* not reached */
+ return XLREAD_FAIL; /* not reached */
}
@@ -3788,7 +3847,7 @@ emode_for_corrupt_record(int emode, XLogRecPtr RecPtr)
* 1 for "primary", 0 for "other" (backup_label)
*/
static XLogRecord *
-ReadCheckpointRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr,
+ReadCheckpointRecord(XLogPrefetcher *xlogprefetcher, XLogRecPtr RecPtr,
int whichChkpt, bool report, TimeLineID replayTLI)
{
XLogRecord *record;
@@ -3815,8 +3874,8 @@ ReadCheckpointRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr,
return NULL;
}
- XLogBeginRead(xlogreader, RecPtr);
- record = ReadRecord(xlogreader, LOG, true, replayTLI);
+ XLogPrefetcherBeginRead(xlogprefetcher, RecPtr);
+ record = ReadRecord(xlogprefetcher, LOG, true, replayTLI);
if (record == NULL)
{