aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access')
-rw-r--r--src/backend/access/transam/twophase.c3
-rw-r--r--src/backend/access/transam/xlog.c36
-rw-r--r--src/backend/access/transam/xlogreader.c79
3 files changed, 71 insertions, 47 deletions
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 6f7ee0c947d..5adf956f413 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -1338,7 +1338,8 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
errmsg("out of memory"),
errdetail("Failed while allocating a WAL reading processor.")));
- record = XLogReadRecord(xlogreader, lsn, &errormsg);
+ XLogBeginRead(xlogreader, lsn);
+ record = XLogReadRecord(xlogreader, &errormsg);
if (record == NULL)
ereport(ERROR,
(errcode_for_file_access(),
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 7f4f784c0eb..882d5e8a73f 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -897,7 +897,7 @@ static void UpdateLastRemovedPtr(char *filename);
static void ValidateXLOGDirectoryStructure(void);
static void CleanupBackupHistory(void);
static void UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force);
-static XLogRecord *ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr,
+static XLogRecord *ReadRecord(XLogReaderState *xlogreader,
int emode, bool fetching_ckpt);
static void CheckRecoveryConsistency(void);
static XLogRecord *ReadCheckpointRecord(XLogReaderState *xlogreader,
@@ -4246,17 +4246,17 @@ CleanupBackupHistory(void)
}
/*
- * Attempt to read an XLOG record.
+ * Attempt to read the next XLOG record.
*
- * If RecPtr is valid, try to read a record at that position. Otherwise
- * try to read a record just after the last one previously read.
+ * Before first call, the reader needs to be positioned to the first record
+ * by calling XLogBeginRead().
*
* If no valid record is available, returns NULL, or fails if emode is PANIC.
* (emode must be either PANIC, LOG). In standby mode, retries until a valid
* record is available.
*/
static XLogRecord *
-ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
+ReadRecord(XLogReaderState *xlogreader, int emode,
bool fetching_ckpt)
{
XLogRecord *record;
@@ -4265,7 +4265,7 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
/* Pass through parameters to XLogPageRead */
private->fetching_ckpt = fetching_ckpt;
private->emode = emode;
- private->randAccess = (RecPtr != InvalidXLogRecPtr);
+ private->randAccess = (xlogreader->ReadRecPtr != InvalidXLogRecPtr);
/* This is the first attempt to read this page. */
lastSourceFailed = false;
@@ -4274,7 +4274,7 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
{
char *errormsg;
- record = XLogReadRecord(xlogreader, RecPtr, &errormsg);
+ record = XLogReadRecord(xlogreader, &errormsg);
ReadRecPtr = xlogreader->ReadRecPtr;
EndRecPtr = xlogreader->EndRecPtr;
if (record == NULL)
@@ -4292,8 +4292,7 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
* shouldn't loop anymore in that case.
*/
if (errormsg)
- ereport(emode_for_corrupt_record(emode,
- RecPtr ? RecPtr : EndRecPtr),
+ ereport(emode_for_corrupt_record(emode, EndRecPtr),
(errmsg_internal("%s", errormsg) /* already translated */ ));
}
@@ -4311,8 +4310,7 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
wal_segment_size);
XLogFileName(fname, xlogreader->seg.ws_tli, segno,
wal_segment_size);
- ereport(emode_for_corrupt_record(emode,
- RecPtr ? RecPtr : EndRecPtr),
+ ereport(emode_for_corrupt_record(emode, EndRecPtr),
(errmsg("unexpected timeline ID %u in log segment %s, offset %u",
xlogreader->latestPageTLI,
fname,
@@ -6427,7 +6425,8 @@ StartupXLOG(void)
*/
if (checkPoint.redo < checkPointLoc)
{
- if (!ReadRecord(xlogreader, checkPoint.redo, LOG, false))
+ XLogBeginRead(xlogreader, checkPoint.redo);
+ if (!ReadRecord(xlogreader, LOG, false))
ereport(FATAL,
(errmsg("could not find redo location referenced by checkpoint record"),
errhint("If you are restoring from a backup, touch \"%s/recovery.signal\" and add required recovery options.\n"
@@ -7034,12 +7033,13 @@ StartupXLOG(void)
if (checkPoint.redo < RecPtr)
{
/* back up to find the record */
- record = ReadRecord(xlogreader, checkPoint.redo, PANIC, false);
+ XLogBeginRead(xlogreader, checkPoint.redo);
+ record = ReadRecord(xlogreader, PANIC, false);
}
else
{
/* just have to read next record after CheckPoint */
- record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false);
+ record = ReadRecord(xlogreader, LOG, false);
}
if (record != NULL)
@@ -7263,7 +7263,7 @@ StartupXLOG(void)
}
/* Else, try to fetch the next WAL record */
- record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false);
+ record = ReadRecord(xlogreader, LOG, false);
} while (record != NULL);
/*
@@ -7365,7 +7365,8 @@ StartupXLOG(void)
* Re-fetch the last valid or last applied record, so we can identify the
* exact endpoint of what we consider the valid portion of WAL.
*/
- record = ReadRecord(xlogreader, LastRec, PANIC, false);
+ XLogBeginRead(xlogreader, LastRec);
+ record = ReadRecord(xlogreader, PANIC, false);
EndOfLog = EndRecPtr;
/*
@@ -8094,7 +8095,8 @@ ReadCheckpointRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr,
return NULL;
}
- record = ReadRecord(xlogreader, RecPtr, LOG, true);
+ XLogBeginRead(xlogreader, RecPtr);
+ record = ReadRecord(xlogreader, LOG, true);
if (record == NULL)
{
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 3aa68127a3e..32f02256edb 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -219,10 +219,33 @@ WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
}
/*
+ * Begin reading WAL at 'RecPtr'.
+ *
+ * 'RecPtr' should point to the beginnning of a valid WAL record. Pointing at
+ * the beginning of a page is also OK, if there is a new record right after
+ * the page header, i.e. not a continuation.
+ *
+ * This does not make any attempt to read the WAL yet, and hence cannot fail.
+ * If the starting address is not correct, the first call to XLogReadRecord()
+ * will error out.
+ */
+void
+XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
+{
+ Assert(!XLogRecPtrIsInvalid(RecPtr));
+
+ ResetDecoder(state);
+
+ /* Begin at the passed-in record pointer. */
+ state->EndRecPtr = RecPtr;
+ state->ReadRecPtr = InvalidXLogRecPtr;
+}
+
+/*
* Attempt to read an XLOG record.
*
- * If RecPtr is valid, try to read a record at that position. Otherwise
- * try to read a record just after the last one previously read.
+ * XLogBeginRead() or XLogFindNextRecord() must be called before the first call
+ * to XLogReadRecord().
*
* If the read_page callback fails to read the requested data, NULL is
* returned. The callback is expected to have reported the error; errormsg
@@ -235,8 +258,9 @@ WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
* valid until the next call to XLogReadRecord.
*/
XLogRecord *
-XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
+XLogReadRecord(XLogReaderState *state, char **errormsg)
{
+ XLogRecPtr RecPtr;
XLogRecord *record;
XLogRecPtr targetPagePtr;
bool randAccess;
@@ -260,19 +284,17 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
ResetDecoder(state);
- if (RecPtr == InvalidXLogRecPtr)
- {
- /* No explicit start point; read the record after the one we just read */
- RecPtr = state->EndRecPtr;
+ RecPtr = state->EndRecPtr;
- if (state->ReadRecPtr == InvalidXLogRecPtr)
- randAccess = true;
+ if (state->ReadRecPtr != InvalidXLogRecPtr)
+ {
+ /* read the record after the one we just read */
/*
- * RecPtr is pointing to end+1 of the previous WAL record. If we're
- * at a page boundary, no more records can fit on the current page. We
- * must skip over the page header, but we can't do that until we've
- * read in the page, since the header size is variable.
+ * EndRecPtr is pointing to end+1 of the previous WAL record. If
+ * we're at a page boundary, no more records can fit on the current
+ * page. We must skip over the page header, but we can't do that until
+ * we've read in the page, since the header size is variable.
*/
}
else
@@ -280,8 +302,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
/*
* Caller supplied a position to start at.
*
- * In this case, the passed-in record pointer should already be
- * pointing to a valid record starting position.
+ * In this case, EndRecPtr should already be pointing to a valid
+ * record starting position.
*/
Assert(XRecOffIsValid(RecPtr));
randAccess = true;
@@ -899,14 +921,17 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
/*
* Find the first record with an lsn >= RecPtr.
*
- * Useful for checking whether RecPtr is a valid xlog address for reading, and
- * to find the first valid address after some address when dumping records for
- * debugging purposes.
+ * This is different from XLogBeginRead() in that RecPtr doesn't need to point
+ * to a valid record boundary. Useful for checking whether RecPtr is a valid
+ * xlog address for reading, and to find the first valid address after some
+ * address when dumping records for debugging purposes.
+ *
+ * This positions the reader, like XLogBeginRead(), so that the next call to
+ * XLogReadRecord() will read the next valid record.
*/
XLogRecPtr
XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
{
- XLogReaderState saved_state = *state;
XLogRecPtr tmpRecPtr;
XLogRecPtr found = InvalidXLogRecPtr;
XLogPageHeader header;
@@ -991,27 +1016,23 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
* because either we're at the first record after the beginning of a page
* or we just jumped over the remaining data of a continuation.
*/
- while (XLogReadRecord(state, tmpRecPtr, &errormsg) != NULL)
+ XLogBeginRead(state, tmpRecPtr);
+ while (XLogReadRecord(state, &errormsg) != NULL)
{
- /* continue after the record */
- tmpRecPtr = InvalidXLogRecPtr;
-
/* past the record we've found, break out */
if (RecPtr <= state->ReadRecPtr)
{
+ /* Rewind the reader to the beginning of the last record. */
found = state->ReadRecPtr;
- goto out;
+ XLogBeginRead(state, found);
+ return found;
}
}
err:
-out:
- /* Reset state to what we had before finding the record */
- state->ReadRecPtr = saved_state.ReadRecPtr;
- state->EndRecPtr = saved_state.EndRecPtr;
XLogReaderInvalReadState(state);
- return found;
+ return InvalidXLogRecPtr;
}
#endif /* FRONTEND */