diff options
Diffstat (limited to 'src/backend/access/transam/xlog.c')
-rw-r--r-- | src/backend/access/transam/xlog.c | 836 |
1 files changed, 135 insertions, 701 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 51a515a5552..70cfabc2367 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -30,6 +30,7 @@ #include "access/twophase.h" #include "access/xact.h" #include "access/xlog_internal.h" +#include "access/xlogreader.h" #include "access/xlogutils.h" #include "catalog/catversion.h" #include "catalog/pg_control.h" @@ -548,7 +549,6 @@ static int readFile = -1; static XLogSegNo readSegNo = 0; static uint32 readOff = 0; static uint32 readLen = 0; -static bool readFileHeaderValidated = false; static XLogSource readSource = 0; /* XLOG_FROM_* code */ /* @@ -561,6 +561,13 @@ static XLogSource readSource = 0; /* XLOG_FROM_* code */ static XLogSource currentSource = 0; /* XLOG_FROM_* code */ static bool lastSourceFailed = false; +typedef struct XLogPageReadPrivate +{ + int emode; + bool fetching_ckpt; /* are we fetching a checkpoint record? */ + bool randAccess; +} XLogPageReadPrivate; + /* * These variables track when we last obtained some WAL data to process, * and where we got it from. (XLogReceiptSource is initially the same as @@ -572,18 +579,9 @@ static bool lastSourceFailed = false; static TimestampTz XLogReceiptTime = 0; static XLogSource XLogReceiptSource = 0; /* XLOG_FROM_* code */ -/* Buffer for currently read page (XLOG_BLCKSZ bytes) */ -static char *readBuf = NULL; - -/* Buffer for current ReadRecord result (expandable) */ -static char *readRecordBuf = NULL; -static uint32 readRecordBufSize = 0; - /* State information for XLOG reading */ static XLogRecPtr ReadRecPtr; /* start of last record read */ static XLogRecPtr EndRecPtr; /* end+1 of last record read */ -static TimeLineID lastPageTLI = 0; -static TimeLineID lastSegmentTLI = 0; static XLogRecPtr minRecoveryPoint; /* local copy of * ControlFile->minRecoveryPoint */ @@ -627,8 +625,8 @@ static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath, static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli, int source, bool notexistOk); static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source); -static bool XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt, - bool randAccess); +static int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, + int reqLen, char *readBuf, TimeLineID *readTLI); static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, bool fetching_ckpt); static int emode_for_corrupt_record(int emode, XLogRecPtr RecPtr); @@ -639,12 +637,11 @@ static void UpdateLastRemovedPtr(char *filename); static void ValidateXLOGDirectoryStructure(void); static void CleanupBackupHistory(void); static void UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force); -static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt); +static XLogRecord *ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, + int emode, bool fetching_ckpt); static void CheckRecoveryConsistency(void); -static bool ValidXLogPageHeader(XLogPageHeader hdr, int emode, bool segmentonly); -static bool ValidXLogRecordHeader(XLogRecPtr *RecPtr, XLogRecord *record, - int emode, bool randAccess); -static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt); +static XLogRecord *ReadCheckpointRecord(XLogReaderState *xlogreader, + XLogRecPtr RecPtr, int whichChkpt); static bool rescanLatestTimeLine(void); static void WriteControlFile(void); static void ReadControlFile(void); @@ -2652,9 +2649,6 @@ XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli, if (source != XLOG_FROM_STREAM) XLogReceiptTime = GetCurrentTimestamp(); - /* The file header needs to be validated on first access */ - readFileHeaderValidated = false; - return fd; } if (errno != ENOENT || !notfoundOk) /* unexpected failure? */ @@ -2709,7 +2703,8 @@ XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source) if (source == XLOG_FROM_ANY || source == XLOG_FROM_ARCHIVE) { - fd = XLogFileRead(segno, emode, tli, XLOG_FROM_ARCHIVE, true); + fd = XLogFileRead(segno, emode, tli, + XLOG_FROM_ARCHIVE, true); if (fd != -1) { elog(DEBUG1, "got WAL segment from archive"); @@ -2721,7 +2716,8 @@ XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source) if (source == XLOG_FROM_ANY || source == XLOG_FROM_PG_XLOG) { - fd = XLogFileRead(segno, emode, tli, XLOG_FROM_PG_XLOG, true); + fd = XLogFileRead(segno, emode, tli, + XLOG_FROM_PG_XLOG, true); if (fd != -1) { if (!expectedTLEs) @@ -3178,102 +3174,6 @@ RestoreBackupBlock(XLogRecPtr lsn, XLogRecord *record, int block_index, } /* - * CRC-check an XLOG record. We do not believe the contents of an XLOG - * record (other than to the minimal extent of computing the amount of - * data to read in) until we've checked the CRCs. - * - * We assume all of the record (that is, xl_tot_len bytes) has been read - * into memory at *record. Also, ValidXLogRecordHeader() has accepted the - * record's header, which means in particular that xl_tot_len is at least - * SizeOfXlogRecord, so it is safe to fetch xl_len. - */ -static bool -RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode) -{ - pg_crc32 crc; - int i; - uint32 len = record->xl_len; - BkpBlock bkpb; - char *blk; - size_t remaining = record->xl_tot_len; - - /* First the rmgr data */ - if (remaining < SizeOfXLogRecord + len) - { - /* ValidXLogRecordHeader() should've caught this already... */ - ereport(emode_for_corrupt_record(emode, recptr), - (errmsg("invalid record length at %X/%X", - (uint32) (recptr >> 32), (uint32) recptr))); - return false; - } - remaining -= SizeOfXLogRecord + len; - INIT_CRC32(crc); - COMP_CRC32(crc, XLogRecGetData(record), len); - - /* Add in the backup blocks, if any */ - blk = (char *) XLogRecGetData(record) + len; - for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) - { - uint32 blen; - - if (!(record->xl_info & XLR_BKP_BLOCK(i))) - continue; - - if (remaining < sizeof(BkpBlock)) - { - ereport(emode_for_corrupt_record(emode, recptr), - (errmsg("invalid backup block size in record at %X/%X", - (uint32) (recptr >> 32), (uint32) recptr))); - return false; - } - memcpy(&bkpb, blk, sizeof(BkpBlock)); - - if (bkpb.hole_offset + bkpb.hole_length > BLCKSZ) - { - ereport(emode_for_corrupt_record(emode, recptr), - (errmsg("incorrect hole size in record at %X/%X", - (uint32) (recptr >> 32), (uint32) recptr))); - return false; - } - blen = sizeof(BkpBlock) + BLCKSZ - bkpb.hole_length; - - if (remaining < blen) - { - ereport(emode_for_corrupt_record(emode, recptr), - (errmsg("invalid backup block size in record at %X/%X", - (uint32) (recptr >> 32), (uint32) recptr))); - return false; - } - remaining -= blen; - COMP_CRC32(crc, blk, blen); - blk += blen; - } - - /* Check that xl_tot_len agrees with our calculation */ - if (remaining != 0) - { - ereport(emode_for_corrupt_record(emode, recptr), - (errmsg("incorrect total length in record at %X/%X", - (uint32) (recptr >> 32), (uint32) recptr))); - return false; - } - - /* Finally include the record header */ - COMP_CRC32(crc, (char *) record, offsetof(XLogRecord, xl_crc)); - FIN_CRC32(crc); - - if (!EQ_CRC32(record->xl_crc, crc)) - { - ereport(emode_for_corrupt_record(emode, recptr), - (errmsg("incorrect resource manager data checksum in record at %X/%X", - (uint32) (recptr >> 32), (uint32) recptr))); - return false; - } - - return true; -} - -/* * Attempt to read an XLOG record. * * If RecPtr is not NULL, try to read a record at that position. Otherwise @@ -3286,511 +3186,68 @@ RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode) * the returned record pointer always points there. */ static XLogRecord * -ReadRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt) +ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode, + bool fetching_ckpt) { XLogRecord *record; - XLogRecPtr tmpRecPtr = EndRecPtr; - bool randAccess = false; - uint32 len, - total_len; - uint32 targetRecOff; - uint32 pageHeaderSize; - bool gotheader; - - if (readBuf == NULL) - { - /* - * First time through, permanently allocate readBuf. We do it this - * way, rather than just making a static array, for two reasons: (1) - * no need to waste the storage in most instantiations of the backend; - * (2) a static char array isn't guaranteed to have any particular - * alignment, whereas malloc() will provide MAXALIGN'd storage. - */ - readBuf = (char *) malloc(XLOG_BLCKSZ); - Assert(readBuf != NULL); - } - - if (RecPtr == NULL) - { - RecPtr = &tmpRecPtr; + XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data; - /* - * RecPtr is pointing to end+1 of the previous WAL record. If - * we're at a page boundary, no more records can fit on the current - * page. We must skip over the page header, but we can't do that - * until we've read in the page, since the header size is variable. - */ - } - else - { - /* - * In this case, the passed-in record pointer should already be - * pointing to a valid record starting position. - */ - if (!XRecOffIsValid(*RecPtr)) - ereport(PANIC, - (errmsg("invalid record offset at %X/%X", - (uint32) (*RecPtr >> 32), (uint32) *RecPtr))); - - /* - * Since we are going to a random position in WAL, forget any prior - * state about what timeline we were in, and allow it to be any - * timeline in expectedTLEs. We also set a flag to allow curFileTLI - * to go backwards (but we can't reset that variable right here, since - * we might not change files at all). - */ - /* see comment in ValidXLogPageHeader */ - lastPageTLI = lastSegmentTLI = 0; - randAccess = true; /* allow curFileTLI to go backwards too */ - } + /* Pass through parameters to XLogPageRead */ + private->fetching_ckpt = fetching_ckpt; + private->emode = emode; + private->randAccess = (RecPtr != InvalidXLogRecPtr); /* This is the first try to read this page. */ lastSourceFailed = false; -retry: - /* Read the page containing the record */ - if (!XLogPageRead(RecPtr, emode, fetching_ckpt, randAccess)) - return NULL; - pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf); - targetRecOff = (*RecPtr) % XLOG_BLCKSZ; - if (targetRecOff == 0) - { - /* - * At page start, so skip over page header. The Assert checks that - * we're not scribbling on caller's record pointer; it's OK because we - * can only get here in the continuing-from-prev-record case, since - * XRecOffIsValid rejected the zero-page-offset case otherwise. - */ - Assert(RecPtr == &tmpRecPtr); - (*RecPtr) += pageHeaderSize; - targetRecOff = pageHeaderSize; - } - else if (targetRecOff < pageHeaderSize) + do { - ereport(emode_for_corrupt_record(emode, *RecPtr), - (errmsg("invalid record offset at %X/%X", - (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr))); - goto next_record_is_invalid; - } - if ((((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) && - targetRecOff == pageHeaderSize) - { - ereport(emode_for_corrupt_record(emode, *RecPtr), - (errmsg("contrecord is requested by %X/%X", - (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr))); - goto next_record_is_invalid; - } + char *errormsg; - /* - * Read the record length. - * - * NB: Even though we use an XLogRecord pointer here, the whole record - * header might not fit on this page. xl_tot_len is the first field of - * the struct, so it must be on this page (the records are MAXALIGNed), - * but we cannot access any other fields until we've verified that we - * got the whole header. - */ - record = (XLogRecord *) (readBuf + (*RecPtr) % XLOG_BLCKSZ); - total_len = record->xl_tot_len; - - /* - * If the whole record header is on this page, validate it immediately. - * Otherwise do just a basic sanity check on xl_tot_len, and validate the - * rest of the header after reading it from the next page. The xl_tot_len - * check is necessary here to ensure that we enter the "Need to reassemble - * record" code path below; otherwise we might fail to apply - * ValidXLogRecordHeader at all. - */ - if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord) - { - if (!ValidXLogRecordHeader(RecPtr, record, emode, randAccess)) - goto next_record_is_invalid; - gotheader = true; - } - else - { - if (total_len < SizeOfXLogRecord) + record = XLogReadRecord(xlogreader, RecPtr, &errormsg); + ReadRecPtr = xlogreader->ReadRecPtr; + EndRecPtr = xlogreader->EndRecPtr; + if (record == NULL) { - ereport(emode_for_corrupt_record(emode, *RecPtr), - (errmsg("invalid record length at %X/%X", - (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr))); - goto next_record_is_invalid; - } - gotheader = false; - } + /* not all failures fill errormsg; report those that do */ + if (errormsg && errormsg[0] != '\0') + ereport(emode_for_corrupt_record(emode, + RecPtr ? RecPtr : EndRecPtr), + (errmsg_internal("%s", errormsg) /* already translated */)); - /* - * Allocate or enlarge readRecordBuf as needed. To avoid useless small - * increases, round its size to a multiple of XLOG_BLCKSZ, and make sure - * it's at least 4*Max(BLCKSZ, XLOG_BLCKSZ) to start with. (That is - * enough for all "normal" records, but very large commit or abort records - * might need more space.) - */ - if (total_len > readRecordBufSize) - { - uint32 newSize = total_len; - - newSize += XLOG_BLCKSZ - (newSize % XLOG_BLCKSZ); - newSize = Max(newSize, 4 * Max(BLCKSZ, XLOG_BLCKSZ)); - if (readRecordBuf) - free(readRecordBuf); - readRecordBuf = (char *) malloc(newSize); - if (!readRecordBuf) - { - readRecordBufSize = 0; - /* We treat this as a "bogus data" condition */ - ereport(emode_for_corrupt_record(emode, *RecPtr), - (errmsg("record length %u at %X/%X too long", - total_len, (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr))); - goto next_record_is_invalid; - } - readRecordBufSize = newSize; - } + lastSourceFailed = true; - len = XLOG_BLCKSZ - (*RecPtr) % XLOG_BLCKSZ; - if (total_len > len) - { - /* Need to reassemble record */ - char *contrecord; - XLogPageHeader pageHeader; - XLogRecPtr pagelsn; - char *buffer; - uint32 gotlen; - - /* Initialize pagelsn to the beginning of the page this record is on */ - pagelsn = ((*RecPtr) / XLOG_BLCKSZ) * XLOG_BLCKSZ; - - /* Copy the first fragment of the record from the first page. */ - memcpy(readRecordBuf, readBuf + (*RecPtr) % XLOG_BLCKSZ, len); - buffer = readRecordBuf + len; - gotlen = len; - - do - { - /* Calculate pointer to beginning of next page */ - pagelsn += XLOG_BLCKSZ; - /* Wait for the next page to become available */ - if (!XLogPageRead(&pagelsn, emode, false, false)) - return NULL; - - /* Check that the continuation on next page looks valid */ - pageHeader = (XLogPageHeader) readBuf; - if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD)) - { - ereport(emode_for_corrupt_record(emode, *RecPtr), - (errmsg("there is no contrecord flag in log segment %s, offset %u", - XLogFileNameP(curFileTLI, readSegNo), - readOff))); - goto next_record_is_invalid; - } - /* - * Cross-check that xlp_rem_len agrees with how much of the record - * we expect there to be left. - */ - if (pageHeader->xlp_rem_len == 0 || - total_len != (pageHeader->xlp_rem_len + gotlen)) + if (readFile >= 0) { - ereport(emode_for_corrupt_record(emode, *RecPtr), - (errmsg("invalid contrecord length %u in log segment %s, offset %u", - pageHeader->xlp_rem_len, - XLogFileNameP(curFileTLI, readSegNo), - readOff))); - goto next_record_is_invalid; + close(readFile); + readFile = -1; } - - /* Append the continuation from this page to the buffer */ - pageHeaderSize = XLogPageHeaderSize(pageHeader); - contrecord = (char *) readBuf + pageHeaderSize; - len = XLOG_BLCKSZ - pageHeaderSize; - if (pageHeader->xlp_rem_len < len) - len = pageHeader->xlp_rem_len; - memcpy(buffer, (char *) contrecord, len); - buffer += len; - gotlen += len; - - /* If we just reassembled the record header, validate it. */ - if (!gotheader) - { - record = (XLogRecord *) readRecordBuf; - if (!ValidXLogRecordHeader(RecPtr, record, emode, randAccess)) - goto next_record_is_invalid; - gotheader = true; - } - } while (pageHeader->xlp_rem_len > len); - - record = (XLogRecord *) readRecordBuf; - if (!RecordIsValid(record, *RecPtr, emode)) - goto next_record_is_invalid; - pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf); - XLogSegNoOffsetToRecPtr( - readSegNo, - readOff + pageHeaderSize + MAXALIGN(pageHeader->xlp_rem_len), - EndRecPtr); - ReadRecPtr = *RecPtr; - } - else - { - /* Record does not cross a page boundary */ - if (!RecordIsValid(record, *RecPtr, emode)) - goto next_record_is_invalid; - EndRecPtr = *RecPtr + MAXALIGN(total_len); - - ReadRecPtr = *RecPtr; - memcpy(readRecordBuf, record, total_len); - } - - /* - * Special processing if it's an XLOG SWITCH record - */ - if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH) - { - /* Pretend it extends to end of segment */ - EndRecPtr += XLogSegSize - 1; - EndRecPtr -= EndRecPtr % XLogSegSize; - - /* - * Pretend that readBuf contains the last page of the segment. This is - * just to avoid Assert failure in StartupXLOG if XLOG ends with this - * segment. - */ - readOff = XLogSegSize - XLOG_BLCKSZ; - } - return record; - -next_record_is_invalid: - lastSourceFailed = true; - - if (readFile >= 0) - { - close(readFile); - readFile = -1; - } - - /* In standby-mode, keep trying */ - if (StandbyMode) - goto retry; - else - return NULL; -} - -/* - * Check whether the xlog header of a page just read in looks valid. - * - * This is just a convenience subroutine to avoid duplicated code in - * ReadRecord. It's not intended for use from anywhere else. - */ -static bool -ValidXLogPageHeader(XLogPageHeader hdr, int emode, bool segmentonly) -{ - XLogRecPtr recaddr; - - XLogSegNoOffsetToRecPtr(readSegNo, readOff, recaddr); - - if (hdr->xlp_magic != XLOG_PAGE_MAGIC) - { - ereport(emode_for_corrupt_record(emode, recaddr), - (errmsg("invalid magic number %04X in log segment %s, offset %u", - hdr->xlp_magic, - XLogFileNameP(curFileTLI, readSegNo), - readOff))); - return false; - } - if ((hdr->xlp_info & ~XLP_ALL_FLAGS) != 0) - { - ereport(emode_for_corrupt_record(emode, recaddr), - (errmsg("invalid info bits %04X in log segment %s, offset %u", - hdr->xlp_info, - XLogFileNameP(curFileTLI, readSegNo), - readOff))); - return false; - } - if (hdr->xlp_info & XLP_LONG_HEADER) - { - XLogLongPageHeader longhdr = (XLogLongPageHeader) hdr; - - if (longhdr->xlp_sysid != ControlFile->system_identifier) - { - char fhdrident_str[32]; - char sysident_str[32]; - - /* - * Format sysids separately to keep platform-dependent format code - * out of the translatable message string. - */ - snprintf(fhdrident_str, sizeof(fhdrident_str), UINT64_FORMAT, - longhdr->xlp_sysid); - snprintf(sysident_str, sizeof(sysident_str), UINT64_FORMAT, - ControlFile->system_identifier); - ereport(emode_for_corrupt_record(emode, recaddr), - (errmsg("WAL file is from different database system"), - errdetail("WAL file database system identifier is %s, pg_control database system identifier is %s.", - fhdrident_str, sysident_str))); - return false; - } - if (longhdr->xlp_seg_size != XLogSegSize) - { - ereport(emode_for_corrupt_record(emode, recaddr), - (errmsg("WAL file is from different database system"), - errdetail("Incorrect XLOG_SEG_SIZE in page header."))); - return false; - } - if (longhdr->xlp_xlog_blcksz != XLOG_BLCKSZ) - { - ereport(emode_for_corrupt_record(emode, recaddr), - (errmsg("WAL file is from different database system"), - errdetail("Incorrect XLOG_BLCKSZ in page header."))); - return false; + break; } - } - else if (readOff == 0) - { - /* hmm, first page of file doesn't have a long header? */ - ereport(emode_for_corrupt_record(emode, recaddr), - (errmsg("invalid info bits %04X in log segment %s, offset %u", - hdr->xlp_info, - XLogFileNameP(curFileTLI, readSegNo), - readOff))); - return false; - } - - if (hdr->xlp_pageaddr != recaddr) - { - ereport(emode_for_corrupt_record(emode, recaddr), - (errmsg("unexpected pageaddr %X/%X in log segment %s, offset %u", - (uint32) (hdr->xlp_pageaddr >> 32), (uint32) hdr->xlp_pageaddr, - XLogFileNameP(curFileTLI, readSegNo), - readOff))); - return false; - } - /* - * Check page TLI is one of the expected values. - */ - if (!tliInHistory(hdr->xlp_tli, expectedTLEs)) - { - ereport(emode_for_corrupt_record(emode, recaddr), - (errmsg("unexpected timeline ID %u in log segment %s, offset %u", - hdr->xlp_tli, - XLogFileNameP(curFileTLI, readSegNo), - readOff))); - return false; - } - - /* - * Since child timelines are always assigned a TLI greater than their - * immediate parent's TLI, we should never see TLI go backwards across - * successive pages of a consistent WAL sequence. - * - * Of course this check should only be applied when advancing sequentially - * across pages; therefore ReadRecord resets lastPageTLI and - * lastSegmentTLI to zero when going to a random page. - * - * Sometimes we re-open a segment that's already been partially replayed. - * In that case we cannot perform the normal TLI check: if there is a - * timeline switch within the segment, the first page has a smaller TLI - * than later pages following the timeline switch, and we might've read - * them already. As a weaker test, we still check that it's not smaller - * than the TLI we last saw at the beginning of a segment. Pass - * segmentonly = true when re-validating the first page like that, and the - * page you're actually interested in comes later. - */ - if (hdr->xlp_tli < (segmentonly ? lastSegmentTLI : lastPageTLI)) - { - ereport(emode_for_corrupt_record(emode, recaddr), - (errmsg("out-of-sequence timeline ID %u (after %u) in log segment %s, offset %u", - hdr->xlp_tli, - segmentonly ? lastSegmentTLI : lastPageTLI, - XLogFileNameP(curFileTLI, readSegNo), - readOff))); - return false; - } - lastPageTLI = hdr->xlp_tli; - if (readOff == 0) - lastSegmentTLI = hdr->xlp_tli; - - return true; -} - -/* - * Validate an XLOG record header. - * - * This is just a convenience subroutine to avoid duplicated code in - * ReadRecord. It's not intended for use from anywhere else. - */ -static bool -ValidXLogRecordHeader(XLogRecPtr *RecPtr, XLogRecord *record, int emode, - bool randAccess) -{ - /* - * xl_len == 0 is bad data for everything except XLOG SWITCH, where it is - * required. - */ - if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH) - { - if (record->xl_len != 0) - { - ereport(emode_for_corrupt_record(emode, *RecPtr), - (errmsg("invalid xlog switch record at %X/%X", - (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr))); - return false; - } - } - else if (record->xl_len == 0) - { - ereport(emode_for_corrupt_record(emode, *RecPtr), - (errmsg("record with zero length at %X/%X", - (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr))); - return false; - } - if (record->xl_tot_len < SizeOfXLogRecord + record->xl_len || - record->xl_tot_len > SizeOfXLogRecord + record->xl_len + - XLR_MAX_BKP_BLOCKS * (sizeof(BkpBlock) + BLCKSZ)) - { - ereport(emode_for_corrupt_record(emode, *RecPtr), - (errmsg("invalid record length at %X/%X", - (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr))); - return false; - } - if (record->xl_rmid > RM_MAX_ID) - { - ereport(emode_for_corrupt_record(emode, *RecPtr), - (errmsg("invalid resource manager ID %u at %X/%X", - record->xl_rmid, (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr))); - return false; - } - if (randAccess) - { /* - * We can't exactly verify the prev-link, but surely it should be less - * than the record's own address. + * Check page TLI is one of the expected values. */ - if (!(record->xl_prev < *RecPtr)) + if (!tliInHistory(xlogreader->latestPageTLI, expectedTLEs)) { - ereport(emode_for_corrupt_record(emode, *RecPtr), - (errmsg("record with incorrect prev-link %X/%X at %X/%X", - (uint32) (record->xl_prev >> 32), (uint32) record->xl_prev, - (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr))); + char fname[MAXFNAMELEN]; + XLogSegNo segno; + int32 offset; + + XLByteToSeg(xlogreader->latestPagePtr, segno); + offset = xlogreader->latestPagePtr % XLogSegSize; + XLogFileName(fname, xlogreader->readPageTLI, segno); + ereport(emode_for_corrupt_record(emode, + RecPtr ? RecPtr : EndRecPtr), + (errmsg("unexpected timeline ID %u in log segment %s, offset %u", + xlogreader->latestPageTLI, + fname, + offset))); return false; } - } - else - { - /* - * Record's prev-link should exactly match our previous location. This - * check guards against torn WAL pages where a stale but valid-looking - * WAL record starts on a sector boundary. - */ - if (record->xl_prev != ReadRecPtr) - { - ereport(emode_for_corrupt_record(emode, *RecPtr), - (errmsg("record with incorrect prev-link %X/%X at %X/%X", - (uint32) (record->xl_prev >> 32), (uint32) record->xl_prev, - (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr))); - return false; - } - } + } while (StandbyMode && record == NULL); - return true; + return record; } /* @@ -5235,6 +4692,8 @@ StartupXLOG(void) bool backupEndRequired = false; bool backupFromStandby = false; DBState dbstate_at_startup; + XLogReaderState *xlogreader; + XLogPageReadPrivate private; /* * Read control file and check XLOG status looks valid. @@ -5351,6 +4810,16 @@ StartupXLOG(void) if (StandbyMode) OwnLatch(&XLogCtl->recoveryWakeupLatch); + /* Set up XLOG reader facility */ + MemSet(&private, 0, sizeof(XLogPageReadPrivate)); + xlogreader = XLogReaderAllocate(&XLogPageRead, &private); + if (!xlogreader) + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"), + errdetail("Failed while allocating an XLog reading processor"))); + xlogreader->system_identifier = ControlFile->system_identifier; + if (read_backup_label(&checkPointLoc, &backupEndRequired, &backupFromStandby)) { @@ -5358,7 +4827,7 @@ StartupXLOG(void) * When a backup_label file is present, we want to roll forward from * the checkpoint it identifies, rather than using pg_control. */ - record = ReadCheckpointRecord(checkPointLoc, 0); + record = ReadCheckpointRecord(xlogreader, checkPointLoc, 0); if (record != NULL) { memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint)); @@ -5376,7 +4845,7 @@ StartupXLOG(void) */ if (checkPoint.redo < checkPointLoc) { - if (!ReadRecord(&(checkPoint.redo), LOG, false)) + if (!ReadRecord(xlogreader, checkPoint.redo, LOG, false)) ereport(FATAL, (errmsg("could not find redo location referenced by checkpoint record"), errhint("If you are not restoring from a backup, try removing the file \"%s/backup_label\".", DataDir))); @@ -5400,7 +4869,7 @@ StartupXLOG(void) */ checkPointLoc = ControlFile->checkPoint; RedoStartLSN = ControlFile->checkPointCopy.redo; - record = ReadCheckpointRecord(checkPointLoc, 1); + record = ReadCheckpointRecord(xlogreader, checkPointLoc, 1); if (record != NULL) { ereport(DEBUG1, @@ -5419,7 +4888,7 @@ StartupXLOG(void) else { checkPointLoc = ControlFile->prevCheckPoint; - record = ReadCheckpointRecord(checkPointLoc, 2); + record = ReadCheckpointRecord(xlogreader, checkPointLoc, 2); if (record != NULL) { ereport(LOG, @@ -5777,12 +5246,12 @@ StartupXLOG(void) if (checkPoint.redo < RecPtr) { /* back up to find the record */ - record = ReadRecord(&(checkPoint.redo), PANIC, false); + record = ReadRecord(xlogreader, checkPoint.redo, PANIC, false); } else { /* just have to read next record after CheckPoint */ - record = ReadRecord(NULL, LOG, false); + record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false); } if (record != NULL) @@ -5963,7 +5432,7 @@ StartupXLOG(void) break; /* Else, try to fetch the next WAL record */ - record = ReadRecord(NULL, LOG, false); + record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false); } while (record != NULL); /* @@ -6013,7 +5482,7 @@ StartupXLOG(void) * Re-fetch the last valid or last applied record, so we can identify the * exact endpoint of what we consider the valid portion of WAL. */ - record = ReadRecord(&LastRec, PANIC, false); + record = ReadRecord(xlogreader, LastRec, PANIC, false); EndOfLog = EndRecPtr; XLByteToPrevSeg(EndOfLog, endLogSegNo); @@ -6117,7 +5586,7 @@ StartupXLOG(void) * we will use that below.) */ if (InArchiveRecovery) - exitArchiveRecovery(curFileTLI, endLogSegNo); + exitArchiveRecovery(xlogreader->readPageTLI, endLogSegNo); /* * Prepare to write WAL starting at EndOfLog position, and init xlog @@ -6136,8 +5605,15 @@ StartupXLOG(void) * record spans, not the one it starts in. The last block is indeed the * one we want to use. */ - Assert(readOff == (XLogCtl->xlblocks[0] - XLOG_BLCKSZ) % XLogSegSize); - memcpy((char *) Insert->currpage, readBuf, XLOG_BLCKSZ); + if (EndOfLog % XLOG_BLCKSZ == 0) + { + memset(Insert->currpage, 0, XLOG_BLCKSZ); + } + else + { + Assert(readOff == (XLogCtl->xlblocks[0] - XLOG_BLCKSZ) % XLogSegSize); + memcpy((char *) Insert->currpage, xlogreader->readBuf, XLOG_BLCKSZ); + } Insert->currpos = (char *) Insert->currpage + (EndOfLog + XLOG_BLCKSZ - XLogCtl->xlblocks[0]); @@ -6288,23 +5764,13 @@ StartupXLOG(void) if (standbyState != STANDBY_DISABLED) ShutdownRecoveryTransactionEnvironment(); - /* Shut down readFile facility, free space */ + /* Shut down xlogreader */ if (readFile >= 0) { close(readFile); readFile = -1; } - if (readBuf) - { - free(readBuf); - readBuf = NULL; - } - if (readRecordBuf) - { - free(readRecordBuf); - readRecordBuf = NULL; - readRecordBufSize = 0; - } + XLogReaderFree(xlogreader); /* * If any of the critical GUCs have changed, log them before we allow @@ -6554,7 +6020,8 @@ LocalSetXLogInsertAllowed(void) * 1 for "primary", 2 for "secondary", 0 for "other" (backup_label) */ static XLogRecord * -ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt) +ReadCheckpointRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, + int whichChkpt) { XLogRecord *record; @@ -6578,7 +6045,7 @@ ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt) return NULL; } - record = ReadRecord(&RecPtr, LOG, true); + record = ReadRecord(xlogreader, RecPtr, LOG, true); if (record == NULL) { @@ -9313,7 +8780,9 @@ CancelBackup(void) /* * Read the XLOG page containing RecPtr into readBuf (if not read already). - * Returns true if the page is read successfully. + * Returns number of bytes read, if the page is read successfully, or -1 + * in case of errors. When errors occur, they are ereport'ed, but only + * if they have not been previously reported. * * This is responsible for restoring files from archive as needed, as well * as for waiting for the requested WAL record to arrive in standby mode. @@ -9332,28 +8801,24 @@ CancelBackup(void) * XLogPageRead() to try fetching the record from another source, or to * sleep and retry. */ -static bool -XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt, - bool randAccess) +static int +XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, + char *readBuf, TimeLineID *readTLI) { + XLogPageReadPrivate *private = + (XLogPageReadPrivate *) xlogreader->private_data; + int emode = private->emode; uint32 targetPageOff; - uint32 targetRecOff; - XLogSegNo targetSegNo; - - XLByteToSeg(*RecPtr, targetSegNo); - targetPageOff = (((*RecPtr) % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ; - targetRecOff = (*RecPtr) % XLOG_BLCKSZ; + XLogSegNo targetSegNo PG_USED_FOR_ASSERTS_ONLY; - /* Fast exit if we have read the record in the current buffer already */ - if (!lastSourceFailed && targetSegNo == readSegNo && - targetPageOff == readOff && targetRecOff < readLen) - return true; + XLByteToSeg(targetPagePtr, targetSegNo); + targetPageOff = targetPagePtr % XLogSegSize; /* * See if we need to switch to a new segment because the requested record * is not in the currently open one. */ - if (readFile >= 0 && !XLByteInSeg(*RecPtr, readSegNo)) + if (readFile >= 0 && !XLByteInSeg(targetPagePtr, readSegNo)) { /* * Request a restartpoint if we've replayed too much xlog since the @@ -9374,39 +8839,34 @@ XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt, readSource = 0; } - XLByteToSeg(*RecPtr, readSegNo); + XLByteToSeg(targetPagePtr, readSegNo); retry: /* See if we need to retrieve more data */ if (readFile < 0 || - (readSource == XLOG_FROM_STREAM && receivedUpto <= *RecPtr)) + (readSource == XLOG_FROM_STREAM && + receivedUpto <= targetPagePtr + reqLen)) { if (StandbyMode) { - if (!WaitForWALToBecomeAvailable(*RecPtr, randAccess, - fetching_ckpt)) + if (!WaitForWALToBecomeAvailable(targetPagePtr + reqLen, + private->randAccess, + private->fetching_ckpt)) goto triggered; } - else + /* In archive or crash recovery. */ + else if (readFile < 0) { - /* In archive or crash recovery. */ - if (readFile < 0) - { - int source; + int source; - /* Reset curFileTLI if random fetch. */ - if (randAccess) - curFileTLI = 0; - - if (InArchiveRecovery) - source = XLOG_FROM_ANY; - else - source = XLOG_FROM_PG_XLOG; + if (InArchiveRecovery) + source = XLOG_FROM_ANY; + else + source = XLOG_FROM_PG_XLOG; - readFile = XLogFileReadAnyTLI(readSegNo, emode, source); - if (readFile < 0) - return false; - } + readFile = XLogFileReadAnyTLI(readSegNo, emode, source); + if (readFile < 0) + return -1; } } @@ -9424,72 +8884,46 @@ retry: */ if (readSource == XLOG_FROM_STREAM) { - if (((*RecPtr) / XLOG_BLCKSZ) != (receivedUpto / XLOG_BLCKSZ)) - { + if (((targetPagePtr) / XLOG_BLCKSZ) != (receivedUpto / XLOG_BLCKSZ)) readLen = XLOG_BLCKSZ; - } else readLen = receivedUpto % XLogSegSize - targetPageOff; } else readLen = XLOG_BLCKSZ; - if (!readFileHeaderValidated && targetPageOff != 0) - { - /* - * Whenever switching to a new WAL segment, we read the first page of - * the file and validate its header, even if that's not where the - * target record is. This is so that we can check the additional - * identification info that is present in the first page's "long" - * header. - */ - readOff = 0; - if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ) - { - char fname[MAXFNAMELEN]; - XLogFileName(fname, curFileTLI, readSegNo); - ereport(emode_for_corrupt_record(emode, *RecPtr), - (errcode_for_file_access(), - errmsg("could not read from log segment %s, offset %u: %m", - fname, readOff))); - goto next_record_is_invalid; - } - if (!ValidXLogPageHeader((XLogPageHeader) readBuf, emode, true)) - goto next_record_is_invalid; - } - /* Read the requested page */ readOff = targetPageOff; if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0) { char fname[MAXFNAMELEN]; + XLogFileName(fname, curFileTLI, readSegNo); - ereport(emode_for_corrupt_record(emode, *RecPtr), + ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen), (errcode_for_file_access(), errmsg("could not seek in log segment %s to offset %u: %m", - fname, readOff))); + fname, readOff))); goto next_record_is_invalid; } + if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ) { char fname[MAXFNAMELEN]; + XLogFileName(fname, curFileTLI, readSegNo); - ereport(emode_for_corrupt_record(emode, *RecPtr), + ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen), (errcode_for_file_access(), errmsg("could not read from log segment %s, offset %u: %m", - fname, readOff))); + fname, readOff))); goto next_record_is_invalid; } - if (!ValidXLogPageHeader((XLogPageHeader) readBuf, emode, false)) - goto next_record_is_invalid; - - readFileHeaderValidated = true; Assert(targetSegNo == readSegNo); Assert(targetPageOff == readOff); - Assert(targetRecOff < readLen); + Assert(reqLen <= readLen); - return true; + *readTLI = curFileTLI; + return readLen; next_record_is_invalid: lastSourceFailed = true; @@ -9504,7 +8938,7 @@ next_record_is_invalid: if (StandbyMode) goto retry; else - return false; + return -1; triggered: if (readFile >= 0) @@ -9513,7 +8947,7 @@ triggered: readLen = 0; readSource = 0; - return false; + return -1; } /* |