aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/access/transam/twophase.c5
-rw-r--r--src/backend/access/transam/xlog.c17
-rw-r--r--src/backend/access/transam/xlogreader.c73
-rw-r--r--src/backend/access/transam/xlogutils.c30
-rw-r--r--src/backend/replication/logical/logical.c2
-rw-r--r--src/backend/replication/logical/logicalfuncs.c4
-rw-r--r--src/backend/replication/walsender.c102
-rw-r--r--src/bin/pg_rewind/parsexlog.c21
-rw-r--r--src/bin/pg_waldump/pg_waldump.c68
-rw-r--r--src/include/access/xlogreader.h41
-rw-r--r--src/include/access/xlogutils.h3
-rw-r--r--src/include/replication/logicalfuncs.h2
12 files changed, 189 insertions, 179 deletions
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 477709bbc23..546bd43ce8b 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -1377,7 +1377,6 @@ ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed)
*
* Note clearly that this function can access WAL during normal operation,
* similarly to the way WALSender or Logical Decoding would do.
- *
*/
static void
XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
@@ -1386,8 +1385,8 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
XLogReaderState *xlogreader;
char *errormsg;
- xlogreader = XLogReaderAllocate(wal_segment_size, &read_local_xlog_page,
- NULL);
+ xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
+ &read_local_xlog_page, NULL);
if (!xlogreader)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 501f46fd52d..6c69eb6dd76 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -885,8 +885,7 @@ static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
int source, bool notfoundOk);
static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source);
static int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
- int reqLen, XLogRecPtr targetRecPtr, char *readBuf,
- TimeLineID *readTLI);
+ int reqLen, XLogRecPtr targetRecPtr, char *readBuf);
static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
bool fetching_ckpt, XLogRecPtr tliRecPtr);
static int emode_for_corrupt_record(int emode, XLogRecPtr RecPtr);
@@ -1195,7 +1194,8 @@ XLogInsertRecord(XLogRecData *rdata,
appendBinaryStringInfo(&recordBuf, rdata->data, rdata->len);
if (!debug_reader)
- debug_reader = XLogReaderAllocate(wal_segment_size, NULL, NULL);
+ debug_reader = XLogReaderAllocate(wal_segment_size, NULL,
+ NULL, NULL);
if (!debug_reader)
{
@@ -4296,7 +4296,7 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
XLByteToSeg(xlogreader->latestPagePtr, segno, wal_segment_size);
offset = XLogSegmentOffset(xlogreader->latestPagePtr,
wal_segment_size);
- XLogFileName(fname, xlogreader->readPageTLI, segno,
+ XLogFileName(fname, xlogreader->seg.ws_tli, segno,
wal_segment_size);
ereport(emode_for_corrupt_record(emode,
RecPtr ? RecPtr : EndRecPtr),
@@ -6353,7 +6353,8 @@ StartupXLOG(void)
/* Set up XLOG reader facility */
MemSet(&private, 0, sizeof(XLogPageReadPrivate));
- xlogreader = XLogReaderAllocate(wal_segment_size, &XLogPageRead, &private);
+ xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
+ &XLogPageRead, &private);
if (!xlogreader)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
@@ -7355,7 +7356,7 @@ StartupXLOG(void)
* and we were reading the old WAL from a segment belonging to a higher
* timeline.
*/
- EndOfLogTLI = xlogreader->readPageTLI;
+ EndOfLogTLI = xlogreader->seg.ws_tli;
/*
* Complain if we did not roll forward far enough to render the backup
@@ -11523,7 +11524,7 @@ CancelBackup(void)
*/
static int
XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
- XLogRecPtr targetRecPtr, char *readBuf, TimeLineID *readTLI)
+ XLogRecPtr targetRecPtr, char *readBuf)
{
XLogPageReadPrivate *private =
(XLogPageReadPrivate *) xlogreader->private_data;
@@ -11640,7 +11641,7 @@ retry:
Assert(targetPageOff == readOff);
Assert(reqLen <= readLen);
- *readTLI = curFileTLI;
+ xlogreader->seg.ws_tli = curFileTLI;
/*
* Check the page header immediately, so that we can retry immediately if
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index a66e3324b11..27c27303d6c 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -68,8 +68,8 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...)
* Returns NULL if the xlogreader couldn't be allocated.
*/
XLogReaderState *
-XLogReaderAllocate(int wal_segment_size, XLogPageReadCB pagereadfunc,
- void *private_data)
+XLogReaderAllocate(int wal_segment_size, const char *waldir,
+ XLogPageReadCB pagereadfunc, void *private_data)
{
XLogReaderState *state;
@@ -96,7 +96,10 @@ XLogReaderAllocate(int wal_segment_size, XLogPageReadCB pagereadfunc,
return NULL;
}
- state->wal_segment_size = wal_segment_size;
+ /* Initialize segment info. */
+ WALOpenSegmentInit(&state->seg, &state->segcxt, wal_segment_size,
+ waldir);
+
state->read_page = pagereadfunc;
/* system_identifier initialized to zeroes above */
state->private_data = private_data;
@@ -199,6 +202,23 @@ allocate_recordbuf(XLogReaderState *state, uint32 reclength)
}
/*
+ * Initialize the passed segment structs.
+ */
+void
+WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
+ int segsize, const char *waldir)
+{
+ seg->ws_file = -1;
+ seg->ws_segno = 0;
+ seg->ws_off = 0;
+ seg->ws_tli = 0;
+
+ segcxt->ws_segsize = segsize;
+ if (waldir)
+ snprintf(segcxt->ws_dir, MAXPGPATH, "%s", waldir);
+}
+
+/*
* Attempt to read an XLOG record.
*
* If RecPtr is valid, try to read a record at that position. Otherwise
@@ -490,8 +510,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
(record->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH)
{
/* Pretend it extends to end of segment */
- state->EndRecPtr += state->wal_segment_size - 1;
- state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->wal_segment_size);
+ state->EndRecPtr += state->segcxt.ws_segsize - 1;
+ state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->segcxt.ws_segsize);
}
if (DecodeXLogRecord(state, record, errormsg))
@@ -533,12 +553,12 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
Assert((pageptr % XLOG_BLCKSZ) == 0);
- XLByteToSeg(pageptr, targetSegNo, state->wal_segment_size);
- targetPageOff = XLogSegmentOffset(pageptr, state->wal_segment_size);
+ XLByteToSeg(pageptr, targetSegNo, state->segcxt.ws_segsize);
+ targetPageOff = XLogSegmentOffset(pageptr, state->segcxt.ws_segsize);
/* check whether we have all the requested data already */
- if (targetSegNo == state->readSegNo && targetPageOff == state->readOff &&
- reqLen <= state->readLen)
+ if (targetSegNo == state->seg.ws_segno &&
+ targetPageOff == state->seg.ws_off && reqLen <= state->readLen)
return state->readLen;
/*
@@ -553,13 +573,13 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
* record is. This is so that we can check the additional identification
* info that is present in the first page's "long" header.
*/
- if (targetSegNo != state->readSegNo && targetPageOff != 0)
+ if (targetSegNo != state->seg.ws_segno && targetPageOff != 0)
{
XLogRecPtr targetSegmentPtr = pageptr - targetPageOff;
readLen = state->read_page(state, targetSegmentPtr, XLOG_BLCKSZ,
state->currRecPtr,
- state->readBuf, &state->readPageTLI);
+ state->readBuf);
if (readLen < 0)
goto err;
@@ -577,7 +597,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
*/
readLen = state->read_page(state, pageptr, Max(reqLen, SizeOfXLogShortPHD),
state->currRecPtr,
- state->readBuf, &state->readPageTLI);
+ state->readBuf);
if (readLen < 0)
goto err;
@@ -596,7 +616,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
{
readLen = state->read_page(state, pageptr, XLogPageHeaderSize(hdr),
state->currRecPtr,
- state->readBuf, &state->readPageTLI);
+ state->readBuf);
if (readLen < 0)
goto err;
}
@@ -608,8 +628,8 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
goto err;
/* update read state information */
- state->readSegNo = targetSegNo;
- state->readOff = targetPageOff;
+ state->seg.ws_segno = targetSegNo;
+ state->seg.ws_off = targetPageOff;
state->readLen = readLen;
return readLen;
@@ -625,8 +645,8 @@ err:
static void
XLogReaderInvalReadState(XLogReaderState *state)
{
- state->readSegNo = 0;
- state->readOff = 0;
+ state->seg.ws_segno = 0;
+ state->seg.ws_off = 0;
state->readLen = 0;
}
@@ -745,16 +765,16 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
Assert((recptr % XLOG_BLCKSZ) == 0);
- XLByteToSeg(recptr, segno, state->wal_segment_size);
- offset = XLogSegmentOffset(recptr, state->wal_segment_size);
+ XLByteToSeg(recptr, segno, state->segcxt.ws_segsize);
+ offset = XLogSegmentOffset(recptr, state->segcxt.ws_segsize);
- XLogSegNoOffsetToRecPtr(segno, offset, state->wal_segment_size, recaddr);
+ XLogSegNoOffsetToRecPtr(segno, offset, state->segcxt.ws_segsize, recaddr);
if (hdr->xlp_magic != XLOG_PAGE_MAGIC)
{
char fname[MAXFNAMELEN];
- XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size);
+ XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
report_invalid_record(state,
"invalid magic number %04X in log segment %s, offset %u",
@@ -768,7 +788,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
{
char fname[MAXFNAMELEN];
- XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size);
+ XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
report_invalid_record(state,
"invalid info bits %04X in log segment %s, offset %u",
@@ -791,7 +811,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
(unsigned long long) state->system_identifier);
return false;
}
- else if (longhdr->xlp_seg_size != state->wal_segment_size)
+ else if (longhdr->xlp_seg_size != state->segcxt.ws_segsize)
{
report_invalid_record(state,
"WAL file is from different database system: incorrect segment size in page header");
@@ -808,7 +828,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
{
char fname[MAXFNAMELEN];
- XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size);
+ XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
/* hmm, first page of file doesn't have a long header? */
report_invalid_record(state,
@@ -828,7 +848,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
{
char fname[MAXFNAMELEN];
- XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size);
+ XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
report_invalid_record(state,
"unexpected pageaddr %X/%X in log segment %s, offset %u",
@@ -853,7 +873,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
{
char fname[MAXFNAMELEN];
- XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size);
+ XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
report_invalid_record(state,
"out-of-sequence timeline ID %u (after %u) in log segment %s, offset %u",
@@ -997,7 +1017,6 @@ out:
#endif /* FRONTEND */
-
/* ----------------------------------------
* Functions for decoding the data and block references in a record.
* ----------------------------------------
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 1fc39333f15..5f1e5ba75d5 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -802,8 +802,8 @@ XLogRead(char *buf, int segsize, TimeLineID tli, XLogRecPtr startptr,
void
XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
{
- const XLogRecPtr lastReadPage = state->readSegNo *
- state->wal_segment_size + state->readOff;
+ const XLogRecPtr lastReadPage = state->seg.ws_segno *
+ state->segcxt.ws_segsize + state->seg.ws_off;
Assert(wantPage != InvalidXLogRecPtr && wantPage % XLOG_BLCKSZ == 0);
Assert(wantLength <= XLOG_BLCKSZ);
@@ -847,8 +847,8 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
if (state->currTLIValidUntil != InvalidXLogRecPtr &&
state->currTLI != ThisTimeLineID &&
state->currTLI != 0 &&
- ((wantPage + wantLength) / state->wal_segment_size) <
- (state->currTLIValidUntil / state->wal_segment_size))
+ ((wantPage + wantLength) / state->segcxt.ws_segsize) <
+ (state->currTLIValidUntil / state->segcxt.ws_segsize))
return;
/*
@@ -869,12 +869,12 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
* by a promotion or replay from a cascaded replica.
*/
List *timelineHistory = readTimeLineHistory(ThisTimeLineID);
+ XLogRecPtr endOfSegment;
- XLogRecPtr endOfSegment = (((wantPage / state->wal_segment_size) + 1)
- * state->wal_segment_size) - 1;
-
- Assert(wantPage / state->wal_segment_size ==
- endOfSegment / state->wal_segment_size);
+ endOfSegment = ((wantPage / state->segcxt.ws_segsize) + 1) *
+ state->segcxt.ws_segsize - 1;
+ Assert(wantPage / state->segcxt.ws_segsize ==
+ endOfSegment / state->segcxt.ws_segsize);
/*
* Find the timeline of the last LSN on the segment containing
@@ -909,8 +909,7 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
*/
int
read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
- int reqLen, XLogRecPtr targetRecPtr, char *cur_page,
- TimeLineID *pageTLI)
+ int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
{
XLogRecPtr read_upto,
loc;
@@ -933,8 +932,7 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
read_upto = GetFlushRecPtr();
else
read_upto = GetXLogReplayRecPtr(&ThisTimeLineID);
-
- *pageTLI = ThisTimeLineID;
+ state->seg.ws_tli = ThisTimeLineID;
/*
* Check which timeline to get the record from.
@@ -984,14 +982,14 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
read_upto = state->currTLIValidUntil;
/*
- * Setting pageTLI to our wanted record's TLI is slightly wrong;
+ * Setting ws_tli to our wanted record's TLI is slightly wrong;
* the page might begin on an older timeline if it contains a
* timeline switch, since its xlog segment will have been copied
* from the prior timeline. This is pretty harmless though, as
* nothing cares so long as the timeline doesn't go backwards. We
* should read the page header instead; FIXME someday.
*/
- *pageTLI = state->currTLI;
+ state->seg.ws_tli = state->currTLI;
/* No need to wait on a historical timeline */
break;
@@ -1022,7 +1020,7 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
* as 'count', read the whole page anyway. It's guaranteed to be
* zero-padded up to the page boundary if it's incomplete.
*/
- XLogRead(cur_page, state->wal_segment_size, *pageTLI, targetPagePtr,
+ XLogRead(cur_page, state->segcxt.ws_segsize, state->seg.ws_tli, targetPagePtr,
XLOG_BLCKSZ);
/* number of valid bytes in the buffer */
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index f8b9020081e..da265f52940 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -173,7 +173,7 @@ StartupDecodingContext(List *output_plugin_options,
ctx->slot = slot;
- ctx->reader = XLogReaderAllocate(wal_segment_size, read_page, ctx);
+ ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, read_page, ctx);
if (!ctx->reader)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index d974400d6ef..d1cf80d4417 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -116,10 +116,10 @@ check_permissions(void)
int
logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
- int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
+ int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
{
return read_local_xlog_page(state, targetPagePtr, reqLen,
- targetRecPtr, cur_page, pageTLI);
+ targetRecPtr, cur_page);
}
/*
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 23870a25a56..eb4a98cc912 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -128,16 +128,8 @@ bool log_replication_commands = false;
*/
bool wake_wal_senders = false;
-/*
- * These variables are used similarly to openLogFile/SegNo/Off,
- * but for walsender to read the XLOG.
- */
-static int sendFile = -1;
-static XLogSegNo sendSegNo = 0;
-static uint32 sendOff = 0;
-
-/* Timeline ID of the currently open file */
-static TimeLineID curFileTimeLine = 0;
+static WALOpenSegment *sendSeg = NULL;
+static WALSegmentContext *sendCxt = NULL;
/*
* These variables keep track of the state of the timeline we're currently
@@ -256,7 +248,7 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
-static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
+static void XLogRead(WALSegmentContext *segcxt, char *buf, XLogRecPtr startptr, Size count);
/* Initialize walsender process before entering the main command loop */
@@ -285,6 +277,13 @@ InitWalSender(void)
/* Initialize empty timestamp buffer for lag tracking. */
lag_tracker = MemoryContextAllocZero(TopMemoryContext, sizeof(LagTracker));
+
+ /* Make sure we can remember the current read position in XLOG. */
+ sendSeg = (WALOpenSegment *)
+ MemoryContextAlloc(TopMemoryContext, sizeof(WALOpenSegment));
+ sendCxt = (WALSegmentContext *)
+ MemoryContextAlloc(TopMemoryContext, sizeof(WALSegmentContext));
+ WALOpenSegmentInit(sendSeg, sendCxt, wal_segment_size, NULL);
}
/*
@@ -301,10 +300,10 @@ WalSndErrorCleanup(void)
ConditionVariableCancelSleep();
pgstat_report_wait_end();
- if (sendFile >= 0)
+ if (sendSeg->ws_file >= 0)
{
- close(sendFile);
- sendFile = -1;
+ close(sendSeg->ws_file);
+ sendSeg->ws_file = -1;
}
if (MyReplicationSlot != NULL)
@@ -763,7 +762,7 @@ StartReplication(StartReplicationCmd *cmd)
*/
static int
logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
- XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
+ XLogRecPtr targetRecPtr, char *cur_page)
{
XLogRecPtr flushptr;
int count;
@@ -787,7 +786,7 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
count = flushptr - targetPagePtr; /* part of the page available */
/* now actually read the data, we know it's there */
- XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
+ XLogRead(sendCxt, cur_page, targetPagePtr, XLOG_BLCKSZ);
return count;
}
@@ -2364,7 +2363,7 @@ WalSndKill(int code, Datum arg)
* more than one.
*/
static void
-XLogRead(char *buf, XLogRecPtr startptr, Size count)
+XLogRead(WALSegmentContext *segcxt, char *buf, XLogRecPtr startptr, Size count)
{
char *p;
XLogRecPtr recptr;
@@ -2382,17 +2381,18 @@ retry:
int segbytes;
int readbytes;
- startoff = XLogSegmentOffset(recptr, wal_segment_size);
+ startoff = XLogSegmentOffset(recptr, segcxt->ws_segsize);
- if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo, wal_segment_size))
+ if (sendSeg->ws_file < 0 ||
+ !XLByteInSeg(recptr, sendSeg->ws_segno, segcxt->ws_segsize))
{
char path[MAXPGPATH];
/* Switch to another logfile segment */
- if (sendFile >= 0)
- close(sendFile);
+ if (sendSeg->ws_file >= 0)
+ close(sendSeg->ws_file);
- XLByteToSeg(recptr, sendSegNo, wal_segment_size);
+ XLByteToSeg(recptr, sendSeg->ws_segno, segcxt->ws_segsize);
/*-------
* When reading from a historic timeline, and there is a timeline
@@ -2420,20 +2420,20 @@ retry:
* used portion of the old segment is copied to the new file.
*-------
*/
- curFileTimeLine = sendTimeLine;
+ sendSeg->ws_tli = sendTimeLine;
if (sendTimeLineIsHistoric)
{
XLogSegNo endSegNo;
- XLByteToSeg(sendTimeLineValidUpto, endSegNo, wal_segment_size);
- if (sendSegNo == endSegNo)
- curFileTimeLine = sendTimeLineNextTLI;
+ XLByteToSeg(sendTimeLineValidUpto, endSegNo, segcxt->ws_segsize);
+ if (sendSeg->ws_segno == endSegNo)
+ sendSeg->ws_tli = sendTimeLineNextTLI;
}
- XLogFilePath(path, curFileTimeLine, sendSegNo, wal_segment_size);
+ XLogFilePath(path, sendSeg->ws_tli, sendSeg->ws_segno, segcxt->ws_segsize);
- sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY);
- if (sendFile < 0)
+ sendSeg->ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
+ if (sendSeg->ws_file < 0)
{
/*
* If the file is not found, assume it's because the standby
@@ -2444,58 +2444,58 @@ retry:
ereport(ERROR,
(errcode_for_file_access(),
errmsg("requested WAL segment %s has already been removed",
- XLogFileNameP(curFileTimeLine, sendSegNo))));
+ XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno))));
else
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not open file \"%s\": %m",
path)));
}
- sendOff = 0;
+ sendSeg->ws_off = 0;
}
/* Need to seek in the file? */
- if (sendOff != startoff)
+ if (sendSeg->ws_off != startoff)
{
- if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
+ if (lseek(sendSeg->ws_file, (off_t) startoff, SEEK_SET) < 0)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not seek in log segment %s to offset %u: %m",
- XLogFileNameP(curFileTimeLine, sendSegNo),
+ XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno),
startoff)));
- sendOff = startoff;
+ sendSeg->ws_off = startoff;
}
/* How many bytes are within this segment? */
- if (nbytes > (wal_segment_size - startoff))
- segbytes = wal_segment_size - startoff;
+ if (nbytes > (segcxt->ws_segsize - startoff))
+ segbytes = segcxt->ws_segsize - startoff;
else
segbytes = nbytes;
pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
- readbytes = read(sendFile, p, segbytes);
+ readbytes = read(sendSeg->ws_file, p, segbytes);
pgstat_report_wait_end();
if (readbytes < 0)
{
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not read from log segment %s, offset %u, length %zu: %m",
- XLogFileNameP(curFileTimeLine, sendSegNo),
- sendOff, (Size) segbytes)));
+ XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno),
+ sendSeg->ws_off, (Size) segbytes)));
}
else if (readbytes == 0)
{
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("could not read from log segment %s, offset %u: read %d of %zu",
- XLogFileNameP(curFileTimeLine, sendSegNo),
- sendOff, readbytes, (Size) segbytes)));
+ XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno),
+ sendSeg->ws_off, readbytes, (Size) segbytes)));
}
/* Update state for read */
recptr += readbytes;
- sendOff += readbytes;
+ sendSeg->ws_off += readbytes;
nbytes -= readbytes;
p += readbytes;
}
@@ -2507,7 +2507,7 @@ retry:
* read() succeeds in that case, but the data we tried to read might
* already have been overwritten with new WAL records.
*/
- XLByteToSeg(startptr, segno, wal_segment_size);
+ XLByteToSeg(startptr, segno, segcxt->ws_segsize);
CheckXLogRemoved(segno, ThisTimeLineID);
/*
@@ -2526,10 +2526,10 @@ retry:
walsnd->needreload = false;
SpinLockRelease(&walsnd->mutex);
- if (reload && sendFile >= 0)
+ if (reload && sendSeg->ws_file >= 0)
{
- close(sendFile);
- sendFile = -1;
+ close(sendSeg->ws_file);
+ sendSeg->ws_file = -1;
goto retry;
}
@@ -2695,9 +2695,9 @@ XLogSendPhysical(void)
if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr)
{
/* close the current file. */
- if (sendFile >= 0)
- close(sendFile);
- sendFile = -1;
+ if (sendSeg->ws_file >= 0)
+ close(sendSeg->ws_file);
+ sendSeg->ws_file = -1;
/* Send CopyDone */
pq_putmessage_noblock('c', NULL, 0);
@@ -2768,7 +2768,7 @@ XLogSendPhysical(void)
* calls.
*/
enlargeStringInfo(&output_message, nbytes);
- XLogRead(&output_message.data[output_message.len], startptr, nbytes);
+ XLogRead(sendCxt, &output_message.data[output_message.len], startptr, nbytes);
output_message.len += nbytes;
output_message.data[output_message.len] = '\0';
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 63c3879ead8..264a8f4db5f 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -43,14 +43,12 @@ static char xlogfpath[MAXPGPATH];
typedef struct XLogPageReadPrivate
{
- const char *datadir;
int tliIndex;
} XLogPageReadPrivate;
static int SimpleXLogPageRead(XLogReaderState *xlogreader,
XLogRecPtr targetPagePtr,
- int reqLen, XLogRecPtr targetRecPtr, char *readBuf,
- TimeLineID *pageTLI);
+ int reqLen, XLogRecPtr targetRecPtr, char *readBuf);
/*
* Read WAL from the datadir/pg_wal, starting from 'startpoint' on timeline
@@ -66,9 +64,8 @@ extractPageMap(const char *datadir, XLogRecPtr startpoint, int tliIndex,
char *errormsg;
XLogPageReadPrivate private;
- private.datadir = datadir;
private.tliIndex = tliIndex;
- xlogreader = XLogReaderAllocate(WalSegSz, &SimpleXLogPageRead,
+ xlogreader = XLogReaderAllocate(WalSegSz, datadir, &SimpleXLogPageRead,
&private);
if (xlogreader == NULL)
pg_fatal("out of memory");
@@ -119,9 +116,8 @@ readOneRecord(const char *datadir, XLogRecPtr ptr, int tliIndex)
XLogPageReadPrivate private;
XLogRecPtr endptr;
- private.datadir = datadir;
private.tliIndex = tliIndex;
- xlogreader = XLogReaderAllocate(WalSegSz, &SimpleXLogPageRead,
+ xlogreader = XLogReaderAllocate(WalSegSz, datadir, &SimpleXLogPageRead,
&private);
if (xlogreader == NULL)
pg_fatal("out of memory");
@@ -177,9 +173,8 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex,
forkptr += SizeOfXLogShortPHD;
}
- private.datadir = datadir;
private.tliIndex = tliIndex;
- xlogreader = XLogReaderAllocate(WalSegSz, &SimpleXLogPageRead,
+ xlogreader = XLogReaderAllocate(WalSegSz, datadir, &SimpleXLogPageRead,
&private);
if (xlogreader == NULL)
pg_fatal("out of memory");
@@ -237,8 +232,7 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex,
/* XLogReader callback function, to read a WAL page */
static int
SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
- int reqLen, XLogRecPtr targetRecPtr, char *readBuf,
- TimeLineID *pageTLI)
+ int reqLen, XLogRecPtr targetRecPtr, char *readBuf)
{
XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data;
uint32 targetPageOff;
@@ -283,7 +277,8 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
XLogFileName(xlogfname, targetHistory[private->tliIndex].tli,
xlogreadsegno, WalSegSz);
- snprintf(xlogfpath, MAXPGPATH, "%s/" XLOGDIR "/%s", private->datadir, xlogfname);
+ snprintf(xlogfpath, MAXPGPATH, "%s/" XLOGDIR "/%s",
+ xlogreader->segcxt.ws_dir, xlogfname);
xlogreadfd = open(xlogfpath, O_RDONLY | PG_BINARY, 0);
@@ -321,7 +316,7 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
Assert(targetSegNo == xlogreadsegno);
- *pageTLI = targetHistory[private->tliIndex].tli;
+ xlogreader->seg.ws_tli = targetHistory[private->tliIndex].tli;
return XLOG_BLCKSZ;
}
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index b95d467805a..b79208cd736 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -33,7 +33,6 @@ static int WalSegSz;
typedef struct XLogDumpPrivate
{
TimeLineID timeline;
- char *inpath;
XLogRecPtr startptr;
XLogRecPtr endptr;
bool endptr_reached;
@@ -224,7 +223,7 @@ search_directory(const char *directory, const char *fname)
}
/*
- * Identify the target directory and set WalSegSz.
+ * Identify the target directory.
*
* Try to find the file in several places:
* if directory != NULL:
@@ -235,29 +234,22 @@ search_directory(const char *directory, const char *fname)
* XLOGDIR /
* $PGDATA / XLOGDIR /
*
- * Set the valid target directory in private->inpath.
+ * The valid target directory is returned.
*/
-static void
-identify_target_directory(XLogDumpPrivate *private, char *directory,
- char *fname)
+static char *
+identify_target_directory(char *directory, char *fname)
{
char fpath[MAXPGPATH];
if (directory != NULL)
{
if (search_directory(directory, fname))
- {
- private->inpath = pg_strdup(directory);
- return;
- }
+ return pg_strdup(directory);
/* directory / XLOGDIR */
snprintf(fpath, MAXPGPATH, "%s/%s", directory, XLOGDIR);
if (search_directory(fpath, fname))
- {
- private->inpath = pg_strdup(fpath);
- return;
- }
+ return pg_strdup(fpath);
}
else
{
@@ -265,16 +257,10 @@ identify_target_directory(XLogDumpPrivate *private, char *directory,
/* current directory */
if (search_directory(".", fname))
- {
- private->inpath = pg_strdup(".");
- return;
- }
+ return pg_strdup(".");
/* XLOGDIR */
if (search_directory(XLOGDIR, fname))
- {
- private->inpath = pg_strdup(XLOGDIR);
- return;
- }
+ return pg_strdup(XLOGDIR);
datadir = getenv("PGDATA");
/* $PGDATA / XLOGDIR */
@@ -282,10 +268,7 @@ identify_target_directory(XLogDumpPrivate *private, char *directory,
{
snprintf(fpath, MAXPGPATH, "%s/%s", datadir, XLOGDIR);
if (search_directory(fpath, fname))
- {
- private->inpath = pg_strdup(fpath);
- return;
- }
+ return pg_strdup(fpath);
}
}
@@ -294,6 +277,8 @@ identify_target_directory(XLogDumpPrivate *private, char *directory,
fatal_error("could not locate WAL file \"%s\"", fname);
else
fatal_error("could not find any WAL file");
+
+ return NULL; /* not reached */
}
/*
@@ -423,7 +408,7 @@ XLogDumpXLogRead(const char *directory, TimeLineID timeline_id,
*/
static int
XLogDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
- XLogRecPtr targetPtr, char *readBuff, TimeLineID *curFileTLI)
+ XLogRecPtr targetPtr, char *readBuff)
{
XLogDumpPrivate *private = state->private_data;
int count = XLOG_BLCKSZ;
@@ -441,7 +426,7 @@ XLogDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
}
}
- XLogDumpXLogRead(private->inpath, private->timeline, targetPagePtr,
+ XLogDumpXLogRead(state->segcxt.ws_dir, private->timeline, targetPagePtr,
readBuff, count);
return count;
@@ -820,6 +805,7 @@ main(int argc, char **argv)
XLogDumpStats stats;
XLogRecord *record;
XLogRecPtr first_record;
+ char *waldir = NULL;
char *errormsg;
static struct option long_options[] = {
@@ -912,7 +898,7 @@ main(int argc, char **argv)
}
break;
case 'p':
- private.inpath = pg_strdup(optarg);
+ waldir = pg_strdup(optarg);
break;
case 'r':
{
@@ -994,13 +980,13 @@ main(int argc, char **argv)
goto bad_argument;
}
- if (private.inpath != NULL)
+ if (waldir != NULL)
{
/* validate path points to directory */
- if (!verify_directory(private.inpath))
+ if (!verify_directory(waldir))
{
pg_log_error("path \"%s\" could not be opened: %s",
- private.inpath, strerror(errno));
+ waldir, strerror(errno));
goto bad_argument;
}
}
@@ -1015,17 +1001,17 @@ main(int argc, char **argv)
split_path(argv[optind], &directory, &fname);
- if (private.inpath == NULL && directory != NULL)
+ if (waldir == NULL && directory != NULL)
{
- private.inpath = directory;
+ waldir = directory;
- if (!verify_directory(private.inpath))
+ if (!verify_directory(waldir))
fatal_error("could not open directory \"%s\": %s",
- private.inpath, strerror(errno));
+ waldir, strerror(errno));
}
- identify_target_directory(&private, private.inpath, fname);
- fd = open_file_in_directory(private.inpath, fname);
+ waldir = identify_target_directory(waldir, fname);
+ fd = open_file_in_directory(waldir, fname);
if (fd < 0)
fatal_error("could not open file \"%s\"", fname);
close(fd);
@@ -1056,7 +1042,7 @@ main(int argc, char **argv)
/* ignore directory, already have that */
split_path(argv[optind + 1], &directory, &fname);
- fd = open_file_in_directory(private.inpath, fname);
+ fd = open_file_in_directory(waldir, fname);
if (fd < 0)
fatal_error("could not open file \"%s\"", fname);
close(fd);
@@ -1088,7 +1074,7 @@ main(int argc, char **argv)
}
}
else
- identify_target_directory(&private, private.inpath, NULL);
+ waldir = identify_target_directory(waldir, NULL);
/* we don't know what to print */
if (XLogRecPtrIsInvalid(private.startptr))
@@ -1100,7 +1086,7 @@ main(int argc, char **argv)
/* done with argument parsing, do the actual work */
/* we have everything we need, start reading */
- xlogreader_state = XLogReaderAllocate(WalSegSz, XLogDumpReadPage,
+ xlogreader_state = XLogReaderAllocate(WalSegSz, waldir, XLogDumpReadPage,
&private);
if (!xlogreader_state)
fatal_error("out of memory");
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 735b1bd2fd6..1bbee386e8d 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -31,6 +31,22 @@
#include "access/xlogrecord.h"
+/* WALOpenSegment represents a WAL segment being read. */
+typedef struct WALOpenSegment
+{
+ int ws_file; /* segment file descriptor */
+ XLogSegNo ws_segno; /* segment number */
+ uint32 ws_off; /* offset in the segment */
+ TimeLineID ws_tli; /* timeline ID of the currently open file */
+} WALOpenSegment;
+
+/* WALSegmentContext carries context information about WAL segments to read */
+typedef struct WALSegmentContext
+{
+ char ws_dir[MAXPGPATH];
+ int ws_segsize;
+} WALSegmentContext;
+
typedef struct XLogReaderState XLogReaderState;
/* Function type definition for the read_page callback */
@@ -38,8 +54,7 @@ typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader,
XLogRecPtr targetPagePtr,
int reqLen,
XLogRecPtr targetRecPtr,
- char *readBuf,
- TimeLineID *pageTLI);
+ char *readBuf);
typedef struct
{
@@ -78,11 +93,6 @@ struct XLogReaderState
*/
/*
- * Segment size of the to-be-parsed data (mandatory).
- */
- int wal_segment_size;
-
- /*
* Data input callback (mandatory).
*
* This callback shall read at least reqLen valid bytes of the xlog page
@@ -99,9 +109,8 @@ struct XLogReaderState
* actual WAL record it's interested in. In that case, targetRecPtr can
* be used to determine which timeline to read the page from.
*
- * The callback shall set *pageTLI to the TLI of the file the page was
- * read from. It is currently used only for error reporting purposes, to
- * reconstruct the name of the WAL file where an error occurred.
+ * The callback shall set ->seg.ws_tli to the TLI of the file the page was
+ * read from.
*/
XLogPageReadCB read_page;
@@ -156,10 +165,9 @@ struct XLogReaderState
char *readBuf;
uint32 readLen;
- /* last read segment, segment offset, TLI for data currently in readBuf */
- XLogSegNo readSegNo;
- uint32 readOff;
- TimeLineID readPageTLI;
+ /* last read XLOG position for data currently in readBuf */
+ WALSegmentContext segcxt;
+ WALOpenSegment seg;
/*
* beginning of prior page read, and its TLI. Doesn't necessarily
@@ -202,12 +210,17 @@ struct XLogReaderState
/* Get a new XLogReader */
extern XLogReaderState *XLogReaderAllocate(int wal_segment_size,
+ const char *waldir,
XLogPageReadCB pagereadfunc,
void *private_data);
/* Free an XLogReader */
extern void XLogReaderFree(XLogReaderState *state);
+/* Initialize supporting structures */
+extern void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
+ int segsize, const char *waldir);
+
/* Read the next XLog record. Returns NULL on end-of-WAL or failure */
extern struct XLogRecord *XLogReadRecord(XLogReaderState *state,
XLogRecPtr recptr, char **errormsg);
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index 4105b59904b..2df98e45b20 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -49,8 +49,7 @@ extern void FreeFakeRelcacheEntry(Relation fakerel);
extern int read_local_xlog_page(XLogReaderState *state,
XLogRecPtr targetPagePtr, int reqLen,
- XLogRecPtr targetRecPtr, char *cur_page,
- TimeLineID *pageTLI);
+ XLogRecPtr targetRecPtr, char *cur_page);
extern void XLogReadDetermineTimeline(XLogReaderState *state,
XLogRecPtr wantPage, uint32 wantLength);
diff --git a/src/include/replication/logicalfuncs.h b/src/include/replication/logicalfuncs.h
index a9c178a9e68..012096f183d 100644
--- a/src/include/replication/logicalfuncs.h
+++ b/src/include/replication/logicalfuncs.h
@@ -14,6 +14,6 @@
extern int logical_read_local_xlog_page(XLogReaderState *state,
XLogRecPtr targetPagePtr,
int reqLen, XLogRecPtr targetRecPtr,
- char *cur_page, TimeLineID *pageTLI);
+ char *cur_page);
#endif