aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/transam/xlog.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/transam/xlog.c')
-rw-r--r--src/backend/access/transam/xlog.c836
1 files changed, 135 insertions, 701 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 51a515a5552..70cfabc2367 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -30,6 +30,7 @@
#include "access/twophase.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
+#include "access/xlogreader.h"
#include "access/xlogutils.h"
#include "catalog/catversion.h"
#include "catalog/pg_control.h"
@@ -548,7 +549,6 @@ static int readFile = -1;
static XLogSegNo readSegNo = 0;
static uint32 readOff = 0;
static uint32 readLen = 0;
-static bool readFileHeaderValidated = false;
static XLogSource readSource = 0; /* XLOG_FROM_* code */
/*
@@ -561,6 +561,13 @@ static XLogSource readSource = 0; /* XLOG_FROM_* code */
static XLogSource currentSource = 0; /* XLOG_FROM_* code */
static bool lastSourceFailed = false;
+typedef struct XLogPageReadPrivate
+{
+ int emode;
+ bool fetching_ckpt; /* are we fetching a checkpoint record? */
+ bool randAccess;
+} XLogPageReadPrivate;
+
/*
* These variables track when we last obtained some WAL data to process,
* and where we got it from. (XLogReceiptSource is initially the same as
@@ -572,18 +579,9 @@ static bool lastSourceFailed = false;
static TimestampTz XLogReceiptTime = 0;
static XLogSource XLogReceiptSource = 0; /* XLOG_FROM_* code */
-/* Buffer for currently read page (XLOG_BLCKSZ bytes) */
-static char *readBuf = NULL;
-
-/* Buffer for current ReadRecord result (expandable) */
-static char *readRecordBuf = NULL;
-static uint32 readRecordBufSize = 0;
-
/* State information for XLOG reading */
static XLogRecPtr ReadRecPtr; /* start of last record read */
static XLogRecPtr EndRecPtr; /* end+1 of last record read */
-static TimeLineID lastPageTLI = 0;
-static TimeLineID lastSegmentTLI = 0;
static XLogRecPtr minRecoveryPoint; /* local copy of
* ControlFile->minRecoveryPoint */
@@ -627,8 +625,8 @@ static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,
static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
int source, bool notexistOk);
static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source);
-static bool XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
- bool randAccess);
+static int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
+ int reqLen, char *readBuf, TimeLineID *readTLI);
static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
bool fetching_ckpt);
static int emode_for_corrupt_record(int emode, XLogRecPtr RecPtr);
@@ -639,12 +637,11 @@ static void UpdateLastRemovedPtr(char *filename);
static void ValidateXLOGDirectoryStructure(void);
static void CleanupBackupHistory(void);
static void UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force);
-static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt);
+static XLogRecord *ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr,
+ int emode, bool fetching_ckpt);
static void CheckRecoveryConsistency(void);
-static bool ValidXLogPageHeader(XLogPageHeader hdr, int emode, bool segmentonly);
-static bool ValidXLogRecordHeader(XLogRecPtr *RecPtr, XLogRecord *record,
- int emode, bool randAccess);
-static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt);
+static XLogRecord *ReadCheckpointRecord(XLogReaderState *xlogreader,
+ XLogRecPtr RecPtr, int whichChkpt);
static bool rescanLatestTimeLine(void);
static void WriteControlFile(void);
static void ReadControlFile(void);
@@ -2652,9 +2649,6 @@ XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
if (source != XLOG_FROM_STREAM)
XLogReceiptTime = GetCurrentTimestamp();
- /* The file header needs to be validated on first access */
- readFileHeaderValidated = false;
-
return fd;
}
if (errno != ENOENT || !notfoundOk) /* unexpected failure? */
@@ -2709,7 +2703,8 @@ XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source)
if (source == XLOG_FROM_ANY || source == XLOG_FROM_ARCHIVE)
{
- fd = XLogFileRead(segno, emode, tli, XLOG_FROM_ARCHIVE, true);
+ fd = XLogFileRead(segno, emode, tli,
+ XLOG_FROM_ARCHIVE, true);
if (fd != -1)
{
elog(DEBUG1, "got WAL segment from archive");
@@ -2721,7 +2716,8 @@ XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source)
if (source == XLOG_FROM_ANY || source == XLOG_FROM_PG_XLOG)
{
- fd = XLogFileRead(segno, emode, tli, XLOG_FROM_PG_XLOG, true);
+ fd = XLogFileRead(segno, emode, tli,
+ XLOG_FROM_PG_XLOG, true);
if (fd != -1)
{
if (!expectedTLEs)
@@ -3178,102 +3174,6 @@ RestoreBackupBlock(XLogRecPtr lsn, XLogRecord *record, int block_index,
}
/*
- * CRC-check an XLOG record. We do not believe the contents of an XLOG
- * record (other than to the minimal extent of computing the amount of
- * data to read in) until we've checked the CRCs.
- *
- * We assume all of the record (that is, xl_tot_len bytes) has been read
- * into memory at *record. Also, ValidXLogRecordHeader() has accepted the
- * record's header, which means in particular that xl_tot_len is at least
- * SizeOfXlogRecord, so it is safe to fetch xl_len.
- */
-static bool
-RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode)
-{
- pg_crc32 crc;
- int i;
- uint32 len = record->xl_len;
- BkpBlock bkpb;
- char *blk;
- size_t remaining = record->xl_tot_len;
-
- /* First the rmgr data */
- if (remaining < SizeOfXLogRecord + len)
- {
- /* ValidXLogRecordHeader() should've caught this already... */
- ereport(emode_for_corrupt_record(emode, recptr),
- (errmsg("invalid record length at %X/%X",
- (uint32) (recptr >> 32), (uint32) recptr)));
- return false;
- }
- remaining -= SizeOfXLogRecord + len;
- INIT_CRC32(crc);
- COMP_CRC32(crc, XLogRecGetData(record), len);
-
- /* Add in the backup blocks, if any */
- blk = (char *) XLogRecGetData(record) + len;
- for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
- {
- uint32 blen;
-
- if (!(record->xl_info & XLR_BKP_BLOCK(i)))
- continue;
-
- if (remaining < sizeof(BkpBlock))
- {
- ereport(emode_for_corrupt_record(emode, recptr),
- (errmsg("invalid backup block size in record at %X/%X",
- (uint32) (recptr >> 32), (uint32) recptr)));
- return false;
- }
- memcpy(&bkpb, blk, sizeof(BkpBlock));
-
- if (bkpb.hole_offset + bkpb.hole_length > BLCKSZ)
- {
- ereport(emode_for_corrupt_record(emode, recptr),
- (errmsg("incorrect hole size in record at %X/%X",
- (uint32) (recptr >> 32), (uint32) recptr)));
- return false;
- }
- blen = sizeof(BkpBlock) + BLCKSZ - bkpb.hole_length;
-
- if (remaining < blen)
- {
- ereport(emode_for_corrupt_record(emode, recptr),
- (errmsg("invalid backup block size in record at %X/%X",
- (uint32) (recptr >> 32), (uint32) recptr)));
- return false;
- }
- remaining -= blen;
- COMP_CRC32(crc, blk, blen);
- blk += blen;
- }
-
- /* Check that xl_tot_len agrees with our calculation */
- if (remaining != 0)
- {
- ereport(emode_for_corrupt_record(emode, recptr),
- (errmsg("incorrect total length in record at %X/%X",
- (uint32) (recptr >> 32), (uint32) recptr)));
- return false;
- }
-
- /* Finally include the record header */
- COMP_CRC32(crc, (char *) record, offsetof(XLogRecord, xl_crc));
- FIN_CRC32(crc);
-
- if (!EQ_CRC32(record->xl_crc, crc))
- {
- ereport(emode_for_corrupt_record(emode, recptr),
- (errmsg("incorrect resource manager data checksum in record at %X/%X",
- (uint32) (recptr >> 32), (uint32) recptr)));
- return false;
- }
-
- return true;
-}
-
-/*
* Attempt to read an XLOG record.
*
* If RecPtr is not NULL, try to read a record at that position. Otherwise
@@ -3286,511 +3186,68 @@ RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode)
* the returned record pointer always points there.
*/
static XLogRecord *
-ReadRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt)
+ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
+ bool fetching_ckpt)
{
XLogRecord *record;
- XLogRecPtr tmpRecPtr = EndRecPtr;
- bool randAccess = false;
- uint32 len,
- total_len;
- uint32 targetRecOff;
- uint32 pageHeaderSize;
- bool gotheader;
-
- if (readBuf == NULL)
- {
- /*
- * First time through, permanently allocate readBuf. We do it this
- * way, rather than just making a static array, for two reasons: (1)
- * no need to waste the storage in most instantiations of the backend;
- * (2) a static char array isn't guaranteed to have any particular
- * alignment, whereas malloc() will provide MAXALIGN'd storage.
- */
- readBuf = (char *) malloc(XLOG_BLCKSZ);
- Assert(readBuf != NULL);
- }
-
- if (RecPtr == NULL)
- {
- RecPtr = &tmpRecPtr;
+ XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data;
- /*
- * RecPtr is pointing to end+1 of the previous WAL record. If
- * we're at a page boundary, no more records can fit on the current
- * page. We must skip over the page header, but we can't do that
- * until we've read in the page, since the header size is variable.
- */
- }
- else
- {
- /*
- * In this case, the passed-in record pointer should already be
- * pointing to a valid record starting position.
- */
- if (!XRecOffIsValid(*RecPtr))
- ereport(PANIC,
- (errmsg("invalid record offset at %X/%X",
- (uint32) (*RecPtr >> 32), (uint32) *RecPtr)));
-
- /*
- * Since we are going to a random position in WAL, forget any prior
- * state about what timeline we were in, and allow it to be any
- * timeline in expectedTLEs. We also set a flag to allow curFileTLI
- * to go backwards (but we can't reset that variable right here, since
- * we might not change files at all).
- */
- /* see comment in ValidXLogPageHeader */
- lastPageTLI = lastSegmentTLI = 0;
- randAccess = true; /* allow curFileTLI to go backwards too */
- }
+ /* Pass through parameters to XLogPageRead */
+ private->fetching_ckpt = fetching_ckpt;
+ private->emode = emode;
+ private->randAccess = (RecPtr != InvalidXLogRecPtr);
/* This is the first try to read this page. */
lastSourceFailed = false;
-retry:
- /* Read the page containing the record */
- if (!XLogPageRead(RecPtr, emode, fetching_ckpt, randAccess))
- return NULL;
- pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
- targetRecOff = (*RecPtr) % XLOG_BLCKSZ;
- if (targetRecOff == 0)
- {
- /*
- * At page start, so skip over page header. The Assert checks that
- * we're not scribbling on caller's record pointer; it's OK because we
- * can only get here in the continuing-from-prev-record case, since
- * XRecOffIsValid rejected the zero-page-offset case otherwise.
- */
- Assert(RecPtr == &tmpRecPtr);
- (*RecPtr) += pageHeaderSize;
- targetRecOff = pageHeaderSize;
- }
- else if (targetRecOff < pageHeaderSize)
+ do
{
- ereport(emode_for_corrupt_record(emode, *RecPtr),
- (errmsg("invalid record offset at %X/%X",
- (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
- goto next_record_is_invalid;
- }
- if ((((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
- targetRecOff == pageHeaderSize)
- {
- ereport(emode_for_corrupt_record(emode, *RecPtr),
- (errmsg("contrecord is requested by %X/%X",
- (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
- goto next_record_is_invalid;
- }
+ char *errormsg;
- /*
- * Read the record length.
- *
- * NB: Even though we use an XLogRecord pointer here, the whole record
- * header might not fit on this page. xl_tot_len is the first field of
- * the struct, so it must be on this page (the records are MAXALIGNed),
- * but we cannot access any other fields until we've verified that we
- * got the whole header.
- */
- record = (XLogRecord *) (readBuf + (*RecPtr) % XLOG_BLCKSZ);
- total_len = record->xl_tot_len;
-
- /*
- * If the whole record header is on this page, validate it immediately.
- * Otherwise do just a basic sanity check on xl_tot_len, and validate the
- * rest of the header after reading it from the next page. The xl_tot_len
- * check is necessary here to ensure that we enter the "Need to reassemble
- * record" code path below; otherwise we might fail to apply
- * ValidXLogRecordHeader at all.
- */
- if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord)
- {
- if (!ValidXLogRecordHeader(RecPtr, record, emode, randAccess))
- goto next_record_is_invalid;
- gotheader = true;
- }
- else
- {
- if (total_len < SizeOfXLogRecord)
+ record = XLogReadRecord(xlogreader, RecPtr, &errormsg);
+ ReadRecPtr = xlogreader->ReadRecPtr;
+ EndRecPtr = xlogreader->EndRecPtr;
+ if (record == NULL)
{
- ereport(emode_for_corrupt_record(emode, *RecPtr),
- (errmsg("invalid record length at %X/%X",
- (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
- goto next_record_is_invalid;
- }
- gotheader = false;
- }
+ /* not all failures fill errormsg; report those that do */
+ if (errormsg && errormsg[0] != '\0')
+ ereport(emode_for_corrupt_record(emode,
+ RecPtr ? RecPtr : EndRecPtr),
+ (errmsg_internal("%s", errormsg) /* already translated */));
- /*
- * Allocate or enlarge readRecordBuf as needed. To avoid useless small
- * increases, round its size to a multiple of XLOG_BLCKSZ, and make sure
- * it's at least 4*Max(BLCKSZ, XLOG_BLCKSZ) to start with. (That is
- * enough for all "normal" records, but very large commit or abort records
- * might need more space.)
- */
- if (total_len > readRecordBufSize)
- {
- uint32 newSize = total_len;
-
- newSize += XLOG_BLCKSZ - (newSize % XLOG_BLCKSZ);
- newSize = Max(newSize, 4 * Max(BLCKSZ, XLOG_BLCKSZ));
- if (readRecordBuf)
- free(readRecordBuf);
- readRecordBuf = (char *) malloc(newSize);
- if (!readRecordBuf)
- {
- readRecordBufSize = 0;
- /* We treat this as a "bogus data" condition */
- ereport(emode_for_corrupt_record(emode, *RecPtr),
- (errmsg("record length %u at %X/%X too long",
- total_len, (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
- goto next_record_is_invalid;
- }
- readRecordBufSize = newSize;
- }
+ lastSourceFailed = true;
- len = XLOG_BLCKSZ - (*RecPtr) % XLOG_BLCKSZ;
- if (total_len > len)
- {
- /* Need to reassemble record */
- char *contrecord;
- XLogPageHeader pageHeader;
- XLogRecPtr pagelsn;
- char *buffer;
- uint32 gotlen;
-
- /* Initialize pagelsn to the beginning of the page this record is on */
- pagelsn = ((*RecPtr) / XLOG_BLCKSZ) * XLOG_BLCKSZ;
-
- /* Copy the first fragment of the record from the first page. */
- memcpy(readRecordBuf, readBuf + (*RecPtr) % XLOG_BLCKSZ, len);
- buffer = readRecordBuf + len;
- gotlen = len;
-
- do
- {
- /* Calculate pointer to beginning of next page */
- pagelsn += XLOG_BLCKSZ;
- /* Wait for the next page to become available */
- if (!XLogPageRead(&pagelsn, emode, false, false))
- return NULL;
-
- /* Check that the continuation on next page looks valid */
- pageHeader = (XLogPageHeader) readBuf;
- if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
- {
- ereport(emode_for_corrupt_record(emode, *RecPtr),
- (errmsg("there is no contrecord flag in log segment %s, offset %u",
- XLogFileNameP(curFileTLI, readSegNo),
- readOff)));
- goto next_record_is_invalid;
- }
- /*
- * Cross-check that xlp_rem_len agrees with how much of the record
- * we expect there to be left.
- */
- if (pageHeader->xlp_rem_len == 0 ||
- total_len != (pageHeader->xlp_rem_len + gotlen))
+ if (readFile >= 0)
{
- ereport(emode_for_corrupt_record(emode, *RecPtr),
- (errmsg("invalid contrecord length %u in log segment %s, offset %u",
- pageHeader->xlp_rem_len,
- XLogFileNameP(curFileTLI, readSegNo),
- readOff)));
- goto next_record_is_invalid;
+ close(readFile);
+ readFile = -1;
}
-
- /* Append the continuation from this page to the buffer */
- pageHeaderSize = XLogPageHeaderSize(pageHeader);
- contrecord = (char *) readBuf + pageHeaderSize;
- len = XLOG_BLCKSZ - pageHeaderSize;
- if (pageHeader->xlp_rem_len < len)
- len = pageHeader->xlp_rem_len;
- memcpy(buffer, (char *) contrecord, len);
- buffer += len;
- gotlen += len;
-
- /* If we just reassembled the record header, validate it. */
- if (!gotheader)
- {
- record = (XLogRecord *) readRecordBuf;
- if (!ValidXLogRecordHeader(RecPtr, record, emode, randAccess))
- goto next_record_is_invalid;
- gotheader = true;
- }
- } while (pageHeader->xlp_rem_len > len);
-
- record = (XLogRecord *) readRecordBuf;
- if (!RecordIsValid(record, *RecPtr, emode))
- goto next_record_is_invalid;
- pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
- XLogSegNoOffsetToRecPtr(
- readSegNo,
- readOff + pageHeaderSize + MAXALIGN(pageHeader->xlp_rem_len),
- EndRecPtr);
- ReadRecPtr = *RecPtr;
- }
- else
- {
- /* Record does not cross a page boundary */
- if (!RecordIsValid(record, *RecPtr, emode))
- goto next_record_is_invalid;
- EndRecPtr = *RecPtr + MAXALIGN(total_len);
-
- ReadRecPtr = *RecPtr;
- memcpy(readRecordBuf, record, total_len);
- }
-
- /*
- * Special processing if it's an XLOG SWITCH record
- */
- if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH)
- {
- /* Pretend it extends to end of segment */
- EndRecPtr += XLogSegSize - 1;
- EndRecPtr -= EndRecPtr % XLogSegSize;
-
- /*
- * Pretend that readBuf contains the last page of the segment. This is
- * just to avoid Assert failure in StartupXLOG if XLOG ends with this
- * segment.
- */
- readOff = XLogSegSize - XLOG_BLCKSZ;
- }
- return record;
-
-next_record_is_invalid:
- lastSourceFailed = true;
-
- if (readFile >= 0)
- {
- close(readFile);
- readFile = -1;
- }
-
- /* In standby-mode, keep trying */
- if (StandbyMode)
- goto retry;
- else
- return NULL;
-}
-
-/*
- * Check whether the xlog header of a page just read in looks valid.
- *
- * This is just a convenience subroutine to avoid duplicated code in
- * ReadRecord. It's not intended for use from anywhere else.
- */
-static bool
-ValidXLogPageHeader(XLogPageHeader hdr, int emode, bool segmentonly)
-{
- XLogRecPtr recaddr;
-
- XLogSegNoOffsetToRecPtr(readSegNo, readOff, recaddr);
-
- if (hdr->xlp_magic != XLOG_PAGE_MAGIC)
- {
- ereport(emode_for_corrupt_record(emode, recaddr),
- (errmsg("invalid magic number %04X in log segment %s, offset %u",
- hdr->xlp_magic,
- XLogFileNameP(curFileTLI, readSegNo),
- readOff)));
- return false;
- }
- if ((hdr->xlp_info & ~XLP_ALL_FLAGS) != 0)
- {
- ereport(emode_for_corrupt_record(emode, recaddr),
- (errmsg("invalid info bits %04X in log segment %s, offset %u",
- hdr->xlp_info,
- XLogFileNameP(curFileTLI, readSegNo),
- readOff)));
- return false;
- }
- if (hdr->xlp_info & XLP_LONG_HEADER)
- {
- XLogLongPageHeader longhdr = (XLogLongPageHeader) hdr;
-
- if (longhdr->xlp_sysid != ControlFile->system_identifier)
- {
- char fhdrident_str[32];
- char sysident_str[32];
-
- /*
- * Format sysids separately to keep platform-dependent format code
- * out of the translatable message string.
- */
- snprintf(fhdrident_str, sizeof(fhdrident_str), UINT64_FORMAT,
- longhdr->xlp_sysid);
- snprintf(sysident_str, sizeof(sysident_str), UINT64_FORMAT,
- ControlFile->system_identifier);
- ereport(emode_for_corrupt_record(emode, recaddr),
- (errmsg("WAL file is from different database system"),
- errdetail("WAL file database system identifier is %s, pg_control database system identifier is %s.",
- fhdrident_str, sysident_str)));
- return false;
- }
- if (longhdr->xlp_seg_size != XLogSegSize)
- {
- ereport(emode_for_corrupt_record(emode, recaddr),
- (errmsg("WAL file is from different database system"),
- errdetail("Incorrect XLOG_SEG_SIZE in page header.")));
- return false;
- }
- if (longhdr->xlp_xlog_blcksz != XLOG_BLCKSZ)
- {
- ereport(emode_for_corrupt_record(emode, recaddr),
- (errmsg("WAL file is from different database system"),
- errdetail("Incorrect XLOG_BLCKSZ in page header.")));
- return false;
+ break;
}
- }
- else if (readOff == 0)
- {
- /* hmm, first page of file doesn't have a long header? */
- ereport(emode_for_corrupt_record(emode, recaddr),
- (errmsg("invalid info bits %04X in log segment %s, offset %u",
- hdr->xlp_info,
- XLogFileNameP(curFileTLI, readSegNo),
- readOff)));
- return false;
- }
-
- if (hdr->xlp_pageaddr != recaddr)
- {
- ereport(emode_for_corrupt_record(emode, recaddr),
- (errmsg("unexpected pageaddr %X/%X in log segment %s, offset %u",
- (uint32) (hdr->xlp_pageaddr >> 32), (uint32) hdr->xlp_pageaddr,
- XLogFileNameP(curFileTLI, readSegNo),
- readOff)));
- return false;
- }
- /*
- * Check page TLI is one of the expected values.
- */
- if (!tliInHistory(hdr->xlp_tli, expectedTLEs))
- {
- ereport(emode_for_corrupt_record(emode, recaddr),
- (errmsg("unexpected timeline ID %u in log segment %s, offset %u",
- hdr->xlp_tli,
- XLogFileNameP(curFileTLI, readSegNo),
- readOff)));
- return false;
- }
-
- /*
- * Since child timelines are always assigned a TLI greater than their
- * immediate parent's TLI, we should never see TLI go backwards across
- * successive pages of a consistent WAL sequence.
- *
- * Of course this check should only be applied when advancing sequentially
- * across pages; therefore ReadRecord resets lastPageTLI and
- * lastSegmentTLI to zero when going to a random page.
- *
- * Sometimes we re-open a segment that's already been partially replayed.
- * In that case we cannot perform the normal TLI check: if there is a
- * timeline switch within the segment, the first page has a smaller TLI
- * than later pages following the timeline switch, and we might've read
- * them already. As a weaker test, we still check that it's not smaller
- * than the TLI we last saw at the beginning of a segment. Pass
- * segmentonly = true when re-validating the first page like that, and the
- * page you're actually interested in comes later.
- */
- if (hdr->xlp_tli < (segmentonly ? lastSegmentTLI : lastPageTLI))
- {
- ereport(emode_for_corrupt_record(emode, recaddr),
- (errmsg("out-of-sequence timeline ID %u (after %u) in log segment %s, offset %u",
- hdr->xlp_tli,
- segmentonly ? lastSegmentTLI : lastPageTLI,
- XLogFileNameP(curFileTLI, readSegNo),
- readOff)));
- return false;
- }
- lastPageTLI = hdr->xlp_tli;
- if (readOff == 0)
- lastSegmentTLI = hdr->xlp_tli;
-
- return true;
-}
-
-/*
- * Validate an XLOG record header.
- *
- * This is just a convenience subroutine to avoid duplicated code in
- * ReadRecord. It's not intended for use from anywhere else.
- */
-static bool
-ValidXLogRecordHeader(XLogRecPtr *RecPtr, XLogRecord *record, int emode,
- bool randAccess)
-{
- /*
- * xl_len == 0 is bad data for everything except XLOG SWITCH, where it is
- * required.
- */
- if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH)
- {
- if (record->xl_len != 0)
- {
- ereport(emode_for_corrupt_record(emode, *RecPtr),
- (errmsg("invalid xlog switch record at %X/%X",
- (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
- return false;
- }
- }
- else if (record->xl_len == 0)
- {
- ereport(emode_for_corrupt_record(emode, *RecPtr),
- (errmsg("record with zero length at %X/%X",
- (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
- return false;
- }
- if (record->xl_tot_len < SizeOfXLogRecord + record->xl_len ||
- record->xl_tot_len > SizeOfXLogRecord + record->xl_len +
- XLR_MAX_BKP_BLOCKS * (sizeof(BkpBlock) + BLCKSZ))
- {
- ereport(emode_for_corrupt_record(emode, *RecPtr),
- (errmsg("invalid record length at %X/%X",
- (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
- return false;
- }
- if (record->xl_rmid > RM_MAX_ID)
- {
- ereport(emode_for_corrupt_record(emode, *RecPtr),
- (errmsg("invalid resource manager ID %u at %X/%X",
- record->xl_rmid, (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
- return false;
- }
- if (randAccess)
- {
/*
- * We can't exactly verify the prev-link, but surely it should be less
- * than the record's own address.
+ * Check page TLI is one of the expected values.
*/
- if (!(record->xl_prev < *RecPtr))
+ if (!tliInHistory(xlogreader->latestPageTLI, expectedTLEs))
{
- ereport(emode_for_corrupt_record(emode, *RecPtr),
- (errmsg("record with incorrect prev-link %X/%X at %X/%X",
- (uint32) (record->xl_prev >> 32), (uint32) record->xl_prev,
- (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
+ char fname[MAXFNAMELEN];
+ XLogSegNo segno;
+ int32 offset;
+
+ XLByteToSeg(xlogreader->latestPagePtr, segno);
+ offset = xlogreader->latestPagePtr % XLogSegSize;
+ XLogFileName(fname, xlogreader->readPageTLI, segno);
+ ereport(emode_for_corrupt_record(emode,
+ RecPtr ? RecPtr : EndRecPtr),
+ (errmsg("unexpected timeline ID %u in log segment %s, offset %u",
+ xlogreader->latestPageTLI,
+ fname,
+ offset)));
return false;
}
- }
- else
- {
- /*
- * Record's prev-link should exactly match our previous location. This
- * check guards against torn WAL pages where a stale but valid-looking
- * WAL record starts on a sector boundary.
- */
- if (record->xl_prev != ReadRecPtr)
- {
- ereport(emode_for_corrupt_record(emode, *RecPtr),
- (errmsg("record with incorrect prev-link %X/%X at %X/%X",
- (uint32) (record->xl_prev >> 32), (uint32) record->xl_prev,
- (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
- return false;
- }
- }
+ } while (StandbyMode && record == NULL);
- return true;
+ return record;
}
/*
@@ -5235,6 +4692,8 @@ StartupXLOG(void)
bool backupEndRequired = false;
bool backupFromStandby = false;
DBState dbstate_at_startup;
+ XLogReaderState *xlogreader;
+ XLogPageReadPrivate private;
/*
* Read control file and check XLOG status looks valid.
@@ -5351,6 +4810,16 @@ StartupXLOG(void)
if (StandbyMode)
OwnLatch(&XLogCtl->recoveryWakeupLatch);
+ /* Set up XLOG reader facility */
+ MemSet(&private, 0, sizeof(XLogPageReadPrivate));
+ xlogreader = XLogReaderAllocate(&XLogPageRead, &private);
+ if (!xlogreader)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory"),
+ errdetail("Failed while allocating an XLog reading processor")));
+ xlogreader->system_identifier = ControlFile->system_identifier;
+
if (read_backup_label(&checkPointLoc, &backupEndRequired,
&backupFromStandby))
{
@@ -5358,7 +4827,7 @@ StartupXLOG(void)
* When a backup_label file is present, we want to roll forward from
* the checkpoint it identifies, rather than using pg_control.
*/
- record = ReadCheckpointRecord(checkPointLoc, 0);
+ record = ReadCheckpointRecord(xlogreader, checkPointLoc, 0);
if (record != NULL)
{
memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
@@ -5376,7 +4845,7 @@ StartupXLOG(void)
*/
if (checkPoint.redo < checkPointLoc)
{
- if (!ReadRecord(&(checkPoint.redo), LOG, false))
+ if (!ReadRecord(xlogreader, checkPoint.redo, LOG, false))
ereport(FATAL,
(errmsg("could not find redo location referenced by checkpoint record"),
errhint("If you are not restoring from a backup, try removing the file \"%s/backup_label\".", DataDir)));
@@ -5400,7 +4869,7 @@ StartupXLOG(void)
*/
checkPointLoc = ControlFile->checkPoint;
RedoStartLSN = ControlFile->checkPointCopy.redo;
- record = ReadCheckpointRecord(checkPointLoc, 1);
+ record = ReadCheckpointRecord(xlogreader, checkPointLoc, 1);
if (record != NULL)
{
ereport(DEBUG1,
@@ -5419,7 +4888,7 @@ StartupXLOG(void)
else
{
checkPointLoc = ControlFile->prevCheckPoint;
- record = ReadCheckpointRecord(checkPointLoc, 2);
+ record = ReadCheckpointRecord(xlogreader, checkPointLoc, 2);
if (record != NULL)
{
ereport(LOG,
@@ -5777,12 +5246,12 @@ StartupXLOG(void)
if (checkPoint.redo < RecPtr)
{
/* back up to find the record */
- record = ReadRecord(&(checkPoint.redo), PANIC, false);
+ record = ReadRecord(xlogreader, checkPoint.redo, PANIC, false);
}
else
{
/* just have to read next record after CheckPoint */
- record = ReadRecord(NULL, LOG, false);
+ record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false);
}
if (record != NULL)
@@ -5963,7 +5432,7 @@ StartupXLOG(void)
break;
/* Else, try to fetch the next WAL record */
- record = ReadRecord(NULL, LOG, false);
+ record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false);
} while (record != NULL);
/*
@@ -6013,7 +5482,7 @@ StartupXLOG(void)
* Re-fetch the last valid or last applied record, so we can identify the
* exact endpoint of what we consider the valid portion of WAL.
*/
- record = ReadRecord(&LastRec, PANIC, false);
+ record = ReadRecord(xlogreader, LastRec, PANIC, false);
EndOfLog = EndRecPtr;
XLByteToPrevSeg(EndOfLog, endLogSegNo);
@@ -6117,7 +5586,7 @@ StartupXLOG(void)
* we will use that below.)
*/
if (InArchiveRecovery)
- exitArchiveRecovery(curFileTLI, endLogSegNo);
+ exitArchiveRecovery(xlogreader->readPageTLI, endLogSegNo);
/*
* Prepare to write WAL starting at EndOfLog position, and init xlog
@@ -6136,8 +5605,15 @@ StartupXLOG(void)
* record spans, not the one it starts in. The last block is indeed the
* one we want to use.
*/
- Assert(readOff == (XLogCtl->xlblocks[0] - XLOG_BLCKSZ) % XLogSegSize);
- memcpy((char *) Insert->currpage, readBuf, XLOG_BLCKSZ);
+ if (EndOfLog % XLOG_BLCKSZ == 0)
+ {
+ memset(Insert->currpage, 0, XLOG_BLCKSZ);
+ }
+ else
+ {
+ Assert(readOff == (XLogCtl->xlblocks[0] - XLOG_BLCKSZ) % XLogSegSize);
+ memcpy((char *) Insert->currpage, xlogreader->readBuf, XLOG_BLCKSZ);
+ }
Insert->currpos = (char *) Insert->currpage +
(EndOfLog + XLOG_BLCKSZ - XLogCtl->xlblocks[0]);
@@ -6288,23 +5764,13 @@ StartupXLOG(void)
if (standbyState != STANDBY_DISABLED)
ShutdownRecoveryTransactionEnvironment();
- /* Shut down readFile facility, free space */
+ /* Shut down xlogreader */
if (readFile >= 0)
{
close(readFile);
readFile = -1;
}
- if (readBuf)
- {
- free(readBuf);
- readBuf = NULL;
- }
- if (readRecordBuf)
- {
- free(readRecordBuf);
- readRecordBuf = NULL;
- readRecordBufSize = 0;
- }
+ XLogReaderFree(xlogreader);
/*
* If any of the critical GUCs have changed, log them before we allow
@@ -6554,7 +6020,8 @@ LocalSetXLogInsertAllowed(void)
* 1 for "primary", 2 for "secondary", 0 for "other" (backup_label)
*/
static XLogRecord *
-ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt)
+ReadCheckpointRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr,
+ int whichChkpt)
{
XLogRecord *record;
@@ -6578,7 +6045,7 @@ ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt)
return NULL;
}
- record = ReadRecord(&RecPtr, LOG, true);
+ record = ReadRecord(xlogreader, RecPtr, LOG, true);
if (record == NULL)
{
@@ -9313,7 +8780,9 @@ CancelBackup(void)
/*
* Read the XLOG page containing RecPtr into readBuf (if not read already).
- * Returns true if the page is read successfully.
+ * Returns number of bytes read, if the page is read successfully, or -1
+ * in case of errors. When errors occur, they are ereport'ed, but only
+ * if they have not been previously reported.
*
* This is responsible for restoring files from archive as needed, as well
* as for waiting for the requested WAL record to arrive in standby mode.
@@ -9332,28 +8801,24 @@ CancelBackup(void)
* XLogPageRead() to try fetching the record from another source, or to
* sleep and retry.
*/
-static bool
-XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
- bool randAccess)
+static int
+XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
+ char *readBuf, TimeLineID *readTLI)
{
+ XLogPageReadPrivate *private =
+ (XLogPageReadPrivate *) xlogreader->private_data;
+ int emode = private->emode;
uint32 targetPageOff;
- uint32 targetRecOff;
- XLogSegNo targetSegNo;
-
- XLByteToSeg(*RecPtr, targetSegNo);
- targetPageOff = (((*RecPtr) % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ;
- targetRecOff = (*RecPtr) % XLOG_BLCKSZ;
+ XLogSegNo targetSegNo PG_USED_FOR_ASSERTS_ONLY;
- /* Fast exit if we have read the record in the current buffer already */
- if (!lastSourceFailed && targetSegNo == readSegNo &&
- targetPageOff == readOff && targetRecOff < readLen)
- return true;
+ XLByteToSeg(targetPagePtr, targetSegNo);
+ targetPageOff = targetPagePtr % XLogSegSize;
/*
* See if we need to switch to a new segment because the requested record
* is not in the currently open one.
*/
- if (readFile >= 0 && !XLByteInSeg(*RecPtr, readSegNo))
+ if (readFile >= 0 && !XLByteInSeg(targetPagePtr, readSegNo))
{
/*
* Request a restartpoint if we've replayed too much xlog since the
@@ -9374,39 +8839,34 @@ XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
readSource = 0;
}
- XLByteToSeg(*RecPtr, readSegNo);
+ XLByteToSeg(targetPagePtr, readSegNo);
retry:
/* See if we need to retrieve more data */
if (readFile < 0 ||
- (readSource == XLOG_FROM_STREAM && receivedUpto <= *RecPtr))
+ (readSource == XLOG_FROM_STREAM &&
+ receivedUpto <= targetPagePtr + reqLen))
{
if (StandbyMode)
{
- if (!WaitForWALToBecomeAvailable(*RecPtr, randAccess,
- fetching_ckpt))
+ if (!WaitForWALToBecomeAvailable(targetPagePtr + reqLen,
+ private->randAccess,
+ private->fetching_ckpt))
goto triggered;
}
- else
+ /* In archive or crash recovery. */
+ else if (readFile < 0)
{
- /* In archive or crash recovery. */
- if (readFile < 0)
- {
- int source;
+ int source;
- /* Reset curFileTLI if random fetch. */
- if (randAccess)
- curFileTLI = 0;
-
- if (InArchiveRecovery)
- source = XLOG_FROM_ANY;
- else
- source = XLOG_FROM_PG_XLOG;
+ if (InArchiveRecovery)
+ source = XLOG_FROM_ANY;
+ else
+ source = XLOG_FROM_PG_XLOG;
- readFile = XLogFileReadAnyTLI(readSegNo, emode, source);
- if (readFile < 0)
- return false;
- }
+ readFile = XLogFileReadAnyTLI(readSegNo, emode, source);
+ if (readFile < 0)
+ return -1;
}
}
@@ -9424,72 +8884,46 @@ retry:
*/
if (readSource == XLOG_FROM_STREAM)
{
- if (((*RecPtr) / XLOG_BLCKSZ) != (receivedUpto / XLOG_BLCKSZ))
- {
+ if (((targetPagePtr) / XLOG_BLCKSZ) != (receivedUpto / XLOG_BLCKSZ))
readLen = XLOG_BLCKSZ;
- }
else
readLen = receivedUpto % XLogSegSize - targetPageOff;
}
else
readLen = XLOG_BLCKSZ;
- if (!readFileHeaderValidated && targetPageOff != 0)
- {
- /*
- * Whenever switching to a new WAL segment, we read the first page of
- * the file and validate its header, even if that's not where the
- * target record is. This is so that we can check the additional
- * identification info that is present in the first page's "long"
- * header.
- */
- readOff = 0;
- if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
- {
- char fname[MAXFNAMELEN];
- XLogFileName(fname, curFileTLI, readSegNo);
- ereport(emode_for_corrupt_record(emode, *RecPtr),
- (errcode_for_file_access(),
- errmsg("could not read from log segment %s, offset %u: %m",
- fname, readOff)));
- goto next_record_is_invalid;
- }
- if (!ValidXLogPageHeader((XLogPageHeader) readBuf, emode, true))
- goto next_record_is_invalid;
- }
-
/* Read the requested page */
readOff = targetPageOff;
if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0)
{
char fname[MAXFNAMELEN];
+
XLogFileName(fname, curFileTLI, readSegNo);
- ereport(emode_for_corrupt_record(emode, *RecPtr),
+ ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen),
(errcode_for_file_access(),
errmsg("could not seek in log segment %s to offset %u: %m",
- fname, readOff)));
+ fname, readOff)));
goto next_record_is_invalid;
}
+
if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
{
char fname[MAXFNAMELEN];
+
XLogFileName(fname, curFileTLI, readSegNo);
- ereport(emode_for_corrupt_record(emode, *RecPtr),
+ ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen),
(errcode_for_file_access(),
errmsg("could not read from log segment %s, offset %u: %m",
- fname, readOff)));
+ fname, readOff)));
goto next_record_is_invalid;
}
- if (!ValidXLogPageHeader((XLogPageHeader) readBuf, emode, false))
- goto next_record_is_invalid;
-
- readFileHeaderValidated = true;
Assert(targetSegNo == readSegNo);
Assert(targetPageOff == readOff);
- Assert(targetRecOff < readLen);
+ Assert(reqLen <= readLen);
- return true;
+ *readTLI = curFileTLI;
+ return readLen;
next_record_is_invalid:
lastSourceFailed = true;
@@ -9504,7 +8938,7 @@ next_record_is_invalid:
if (StandbyMode)
goto retry;
else
- return false;
+ return -1;
triggered:
if (readFile >= 0)
@@ -9513,7 +8947,7 @@ triggered:
readLen = 0;
readSource = 0;
- return false;
+ return -1;
}
/*