diff options
Diffstat (limited to 'src/backend/access')
-rw-r--r-- | src/backend/access/transam/twophase.c | 3 | ||||
-rw-r--r-- | src/backend/access/transam/xlog.c | 36 | ||||
-rw-r--r-- | src/backend/access/transam/xlogreader.c | 79 |
3 files changed, 71 insertions, 47 deletions
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 6f7ee0c947d..5adf956f413 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -1338,7 +1338,8 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len) errmsg("out of memory"), errdetail("Failed while allocating a WAL reading processor."))); - record = XLogReadRecord(xlogreader, lsn, &errormsg); + XLogBeginRead(xlogreader, lsn); + record = XLogReadRecord(xlogreader, &errormsg); if (record == NULL) ereport(ERROR, (errcode_for_file_access(), diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 7f4f784c0eb..882d5e8a73f 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -897,7 +897,7 @@ static void UpdateLastRemovedPtr(char *filename); static void ValidateXLOGDirectoryStructure(void); static void CleanupBackupHistory(void); static void UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force); -static XLogRecord *ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, +static XLogRecord *ReadRecord(XLogReaderState *xlogreader, int emode, bool fetching_ckpt); static void CheckRecoveryConsistency(void); static XLogRecord *ReadCheckpointRecord(XLogReaderState *xlogreader, @@ -4246,17 +4246,17 @@ CleanupBackupHistory(void) } /* - * Attempt to read an XLOG record. + * Attempt to read the next XLOG record. * - * If RecPtr is valid, try to read a record at that position. Otherwise - * try to read a record just after the last one previously read. + * Before first call, the reader needs to be positioned to the first record + * by calling XLogBeginRead(). * * If no valid record is available, returns NULL, or fails if emode is PANIC. * (emode must be either PANIC, LOG). In standby mode, retries until a valid * record is available. */ static XLogRecord * -ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode, +ReadRecord(XLogReaderState *xlogreader, int emode, bool fetching_ckpt) { XLogRecord *record; @@ -4265,7 +4265,7 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode, /* Pass through parameters to XLogPageRead */ private->fetching_ckpt = fetching_ckpt; private->emode = emode; - private->randAccess = (RecPtr != InvalidXLogRecPtr); + private->randAccess = (xlogreader->ReadRecPtr != InvalidXLogRecPtr); /* This is the first attempt to read this page. */ lastSourceFailed = false; @@ -4274,7 +4274,7 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode, { char *errormsg; - record = XLogReadRecord(xlogreader, RecPtr, &errormsg); + record = XLogReadRecord(xlogreader, &errormsg); ReadRecPtr = xlogreader->ReadRecPtr; EndRecPtr = xlogreader->EndRecPtr; if (record == NULL) @@ -4292,8 +4292,7 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode, * shouldn't loop anymore in that case. */ if (errormsg) - ereport(emode_for_corrupt_record(emode, - RecPtr ? RecPtr : EndRecPtr), + ereport(emode_for_corrupt_record(emode, EndRecPtr), (errmsg_internal("%s", errormsg) /* already translated */ )); } @@ -4311,8 +4310,7 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode, wal_segment_size); XLogFileName(fname, xlogreader->seg.ws_tli, segno, wal_segment_size); - ereport(emode_for_corrupt_record(emode, - RecPtr ? RecPtr : EndRecPtr), + ereport(emode_for_corrupt_record(emode, EndRecPtr), (errmsg("unexpected timeline ID %u in log segment %s, offset %u", xlogreader->latestPageTLI, fname, @@ -6427,7 +6425,8 @@ StartupXLOG(void) */ if (checkPoint.redo < checkPointLoc) { - if (!ReadRecord(xlogreader, checkPoint.redo, LOG, false)) + XLogBeginRead(xlogreader, checkPoint.redo); + if (!ReadRecord(xlogreader, LOG, false)) ereport(FATAL, (errmsg("could not find redo location referenced by checkpoint record"), errhint("If you are restoring from a backup, touch \"%s/recovery.signal\" and add required recovery options.\n" @@ -7034,12 +7033,13 @@ StartupXLOG(void) if (checkPoint.redo < RecPtr) { /* back up to find the record */ - record = ReadRecord(xlogreader, checkPoint.redo, PANIC, false); + XLogBeginRead(xlogreader, checkPoint.redo); + record = ReadRecord(xlogreader, PANIC, false); } else { /* just have to read next record after CheckPoint */ - record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false); + record = ReadRecord(xlogreader, LOG, false); } if (record != NULL) @@ -7263,7 +7263,7 @@ StartupXLOG(void) } /* Else, try to fetch the next WAL record */ - record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false); + record = ReadRecord(xlogreader, LOG, false); } while (record != NULL); /* @@ -7365,7 +7365,8 @@ StartupXLOG(void) * Re-fetch the last valid or last applied record, so we can identify the * exact endpoint of what we consider the valid portion of WAL. */ - record = ReadRecord(xlogreader, LastRec, PANIC, false); + XLogBeginRead(xlogreader, LastRec); + record = ReadRecord(xlogreader, PANIC, false); EndOfLog = EndRecPtr; /* @@ -8094,7 +8095,8 @@ ReadCheckpointRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, return NULL; } - record = ReadRecord(xlogreader, RecPtr, LOG, true); + XLogBeginRead(xlogreader, RecPtr); + record = ReadRecord(xlogreader, LOG, true); if (record == NULL) { diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 3aa68127a3e..32f02256edb 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -219,10 +219,33 @@ WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt, } /* + * Begin reading WAL at 'RecPtr'. + * + * 'RecPtr' should point to the beginnning of a valid WAL record. Pointing at + * the beginning of a page is also OK, if there is a new record right after + * the page header, i.e. not a continuation. + * + * This does not make any attempt to read the WAL yet, and hence cannot fail. + * If the starting address is not correct, the first call to XLogReadRecord() + * will error out. + */ +void +XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr) +{ + Assert(!XLogRecPtrIsInvalid(RecPtr)); + + ResetDecoder(state); + + /* Begin at the passed-in record pointer. */ + state->EndRecPtr = RecPtr; + state->ReadRecPtr = InvalidXLogRecPtr; +} + +/* * Attempt to read an XLOG record. * - * If RecPtr is valid, try to read a record at that position. Otherwise - * try to read a record just after the last one previously read. + * XLogBeginRead() or XLogFindNextRecord() must be called before the first call + * to XLogReadRecord(). * * If the read_page callback fails to read the requested data, NULL is * returned. The callback is expected to have reported the error; errormsg @@ -235,8 +258,9 @@ WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt, * valid until the next call to XLogReadRecord. */ XLogRecord * -XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg) +XLogReadRecord(XLogReaderState *state, char **errormsg) { + XLogRecPtr RecPtr; XLogRecord *record; XLogRecPtr targetPagePtr; bool randAccess; @@ -260,19 +284,17 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg) ResetDecoder(state); - if (RecPtr == InvalidXLogRecPtr) - { - /* No explicit start point; read the record after the one we just read */ - RecPtr = state->EndRecPtr; + RecPtr = state->EndRecPtr; - if (state->ReadRecPtr == InvalidXLogRecPtr) - randAccess = true; + if (state->ReadRecPtr != InvalidXLogRecPtr) + { + /* read the record after the one we just read */ /* - * RecPtr is pointing to end+1 of the previous WAL record. If we're - * at a page boundary, no more records can fit on the current page. We - * must skip over the page header, but we can't do that until we've - * read in the page, since the header size is variable. + * EndRecPtr is pointing to end+1 of the previous WAL record. If + * we're at a page boundary, no more records can fit on the current + * page. We must skip over the page header, but we can't do that until + * we've read in the page, since the header size is variable. */ } else @@ -280,8 +302,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg) /* * Caller supplied a position to start at. * - * In this case, the passed-in record pointer should already be - * pointing to a valid record starting position. + * In this case, EndRecPtr should already be pointing to a valid + * record starting position. */ Assert(XRecOffIsValid(RecPtr)); randAccess = true; @@ -899,14 +921,17 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, /* * Find the first record with an lsn >= RecPtr. * - * Useful for checking whether RecPtr is a valid xlog address for reading, and - * to find the first valid address after some address when dumping records for - * debugging purposes. + * This is different from XLogBeginRead() in that RecPtr doesn't need to point + * to a valid record boundary. Useful for checking whether RecPtr is a valid + * xlog address for reading, and to find the first valid address after some + * address when dumping records for debugging purposes. + * + * This positions the reader, like XLogBeginRead(), so that the next call to + * XLogReadRecord() will read the next valid record. */ XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) { - XLogReaderState saved_state = *state; XLogRecPtr tmpRecPtr; XLogRecPtr found = InvalidXLogRecPtr; XLogPageHeader header; @@ -991,27 +1016,23 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) * because either we're at the first record after the beginning of a page * or we just jumped over the remaining data of a continuation. */ - while (XLogReadRecord(state, tmpRecPtr, &errormsg) != NULL) + XLogBeginRead(state, tmpRecPtr); + while (XLogReadRecord(state, &errormsg) != NULL) { - /* continue after the record */ - tmpRecPtr = InvalidXLogRecPtr; - /* past the record we've found, break out */ if (RecPtr <= state->ReadRecPtr) { + /* Rewind the reader to the beginning of the last record. */ found = state->ReadRecPtr; - goto out; + XLogBeginRead(state, found); + return found; } } err: -out: - /* Reset state to what we had before finding the record */ - state->ReadRecPtr = saved_state.ReadRecPtr; - state->EndRecPtr = saved_state.EndRecPtr; XLogReaderInvalReadState(state); - return found; + return InvalidXLogRecPtr; } #endif /* FRONTEND */ |