aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/access/transam/xlogreader.c106
-rw-r--r--src/backend/access/transam/xlogutils.c205
-rw-r--r--src/backend/replication/walsender.c300
-rw-r--r--src/bin/pg_waldump/pg_waldump.c168
-rw-r--r--src/include/access/xlogreader.h39
-rw-r--r--src/include/access/xlogutils.h2
6 files changed, 387 insertions, 433 deletions
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 7f24f0cb95f..67418b05f15 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -17,6 +17,8 @@
*/
#include "postgres.h"
+#include <unistd.h>
+
#include "access/transam.h"
#include "access/xlog_internal.h"
#include "access/xlogreader.h"
@@ -27,6 +29,7 @@
#ifndef FRONTEND
#include "miscadmin.h"
+#include "pgstat.h"
#include "utils/memutils.h"
#endif
@@ -208,7 +211,6 @@ WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
{
seg->ws_file = -1;
seg->ws_segno = 0;
- seg->ws_off = 0;
seg->ws_tli = 0;
segcxt->ws_segsize = segsize;
@@ -295,8 +297,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
* byte to cover the whole record header, or at least the part of it that
* fits on the same page.
*/
- readOff = ReadPageInternal(state,
- targetPagePtr,
+ readOff = ReadPageInternal(state, targetPagePtr,
Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ));
if (readOff < 0)
goto err;
@@ -556,7 +557,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
/* check whether we have all the requested data already */
if (targetSegNo == state->seg.ws_segno &&
- targetPageOff == state->seg.ws_off && reqLen <= state->readLen)
+ targetPageOff == state->segoff && reqLen <= state->readLen)
return state->readLen;
/*
@@ -627,7 +628,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
/* update read state information */
state->seg.ws_segno = targetSegNo;
- state->seg.ws_off = targetPageOff;
+ state->segoff = targetPageOff;
state->readLen = readLen;
return readLen;
@@ -644,7 +645,7 @@ static void
XLogReaderInvalReadState(XLogReaderState *state)
{
state->seg.ws_segno = 0;
- state->seg.ws_off = 0;
+ state->segoff = 0;
state->readLen = 0;
}
@@ -1015,6 +1016,99 @@ out:
#endif /* FRONTEND */
+/*
+ * Read 'count' bytes into 'buf', starting at location 'startptr', from WAL
+ * fetched from timeline 'tli'.
+ *
+ * 'seg/segcxt' identify the last segment used. 'openSegment' is a callback
+ * to open the next segment, if necessary. It is invoked when no file is
+ * open, when the read position lies outside the open segment, or when 'tli'
+ * differs from seg->ws_tli; any previously open file is closed first. The
+ * callback may change the timeline through its TLI pointer, and the result
+ * is stored in seg->ws_tli. Note that seg->ws_segno is updated only after
+ * the callback returns, so the callback must use its segment-number argument
+ * rather than seg->ws_segno. The returned descriptor is not checked here, so
+ * the callback is expected to report open failures itself.
+ *
+ * Data is read with pg_pread() at an explicit offset, so no file position is
+ * tracked in 'seg'. A request that crosses a segment boundary is satisfied
+ * by looping, and after a short (but positive) read the loop continues with
+ * the remaining bytes.
+ *
+ * Returns true if succeeded, false if an error occurs, in which case
+ * 'errinfo' receives error details: the errno value (0 if pg_pread() did
+ * not set it), the number of bytes requested and the pg_pread() result for
+ * the failed read, the offset within the segment, and a copy of '*seg'.
+ *
+ * XXX probably this should be improved to suck data directly from the
+ * WAL buffers when possible.
+ */
+bool
+WALRead(char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
+ WALOpenSegment *seg, WALSegmentContext *segcxt,
+ WALSegmentOpen openSegment, WALReadError *errinfo)
+{
+ char *p;
+ XLogRecPtr recptr;
+ Size nbytes;
+
+ p = buf;
+ recptr = startptr;
+ nbytes = count;
+
+ while (nbytes > 0)
+ {
+ uint32 startoff;
+ int segbytes;
+ int readbytes;
+
+ startoff = XLogSegmentOffset(recptr, segcxt->ws_segsize);
+
+ /*
+ * If the data we want is not in a segment we have open, close what we
+ * have (if anything) and open the next one, using the caller's
+ * provided openSegment callback.
+ */
+ if (seg->ws_file < 0 ||
+ !XLByteInSeg(recptr, seg->ws_segno, segcxt->ws_segsize) ||
+ tli != seg->ws_tli)
+ {
+ XLogSegNo nextSegNo;
+
+ if (seg->ws_file >= 0)
+ close(seg->ws_file);
+
+ XLByteToSeg(recptr, nextSegNo, segcxt->ws_segsize);
+ seg->ws_file = openSegment(nextSegNo, segcxt, &tli);
+
+ /* Update the current segment info; the callback may have changed tli. */
+ seg->ws_tli = tli;
+ seg->ws_segno = nextSegNo;
+ }
+
+ /* How many bytes are within this segment? */
+ if (nbytes > (segcxt->ws_segsize - startoff))
+ segbytes = segcxt->ws_segsize - startoff;
+ else
+ segbytes = nbytes;
+
+#ifndef FRONTEND
+ pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
+#endif
+
+ /*
+ * Reset errno first; eases reporting non-errno-affecting errors (a
+ * zero-byte read then leaves errno at 0 for the caller to inspect).
+ */
+ errno = 0;
+ readbytes = pg_pread(seg->ws_file, p, segbytes, (off_t) startoff);
+
+#ifndef FRONTEND
+ pgstat_report_wait_end();
+#endif
+
+ if (readbytes <= 0)
+ {
+ errinfo->wre_errno = errno;
+ errinfo->wre_req = segbytes;
+ errinfo->wre_read = readbytes;
+ errinfo->wre_off = startoff;
+ errinfo->wre_seg = *seg;
+ return false;
+ }
+
+ /* Advance past the bytes just read; a short read simply loops again. */
+ recptr += readbytes;
+ nbytes -= readbytes;
+ p += readbytes;
+ }
+
+ return true;
+}
+
/* ----------------------------------------
* Functions for decoding the data and block references in a record.
* ----------------------------------------
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 5f1e5ba75d5..446760ed6e7 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -640,128 +640,6 @@ XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum,
}
/*
- * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
- * in timeline 'tli'.
- *
- * Will open, and keep open, one WAL segment stored in the static file
- * descriptor 'sendFile'. This means if XLogRead is used once, there will
- * always be one descriptor left open until the process ends, but never
- * more than one.
- *
- * XXX This is very similar to pg_waldump's XLogDumpXLogRead and to XLogRead
- * in walsender.c but for small differences (such as lack of elog() in
- * frontend). Probably these should be merged at some point.
- */
-static void
-XLogRead(char *buf, int segsize, TimeLineID tli, XLogRecPtr startptr,
- Size count)
-{
- char *p;
- XLogRecPtr recptr;
- Size nbytes;
-
- /* state maintained across calls */
- static int sendFile = -1;
- static XLogSegNo sendSegNo = 0;
- static TimeLineID sendTLI = 0;
- static uint32 sendOff = 0;
-
- Assert(segsize == wal_segment_size);
-
- p = buf;
- recptr = startptr;
- nbytes = count;
-
- while (nbytes > 0)
- {
- uint32 startoff;
- int segbytes;
- int readbytes;
-
- startoff = XLogSegmentOffset(recptr, segsize);
-
- /* Do we need to switch to a different xlog segment? */
- if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo, segsize) ||
- sendTLI != tli)
- {
- char path[MAXPGPATH];
-
- if (sendFile >= 0)
- close(sendFile);
-
- XLByteToSeg(recptr, sendSegNo, segsize);
-
- XLogFilePath(path, tli, sendSegNo, segsize);
-
- sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY);
-
- if (sendFile < 0)
- {
- if (errno == ENOENT)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("requested WAL segment %s has already been removed",
- path)));
- else
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not open file \"%s\": %m",
- path)));
- }
- sendOff = 0;
- sendTLI = tli;
- }
-
- /* Need to seek in the file? */
- if (sendOff != startoff)
- {
- if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
- {
- char path[MAXPGPATH];
- int save_errno = errno;
-
- XLogFilePath(path, tli, sendSegNo, segsize);
- errno = save_errno;
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not seek in log segment %s to offset %u: %m",
- path, startoff)));
- }
- sendOff = startoff;
- }
-
- /* How many bytes are within this segment? */
- if (nbytes > (segsize - startoff))
- segbytes = segsize - startoff;
- else
- segbytes = nbytes;
-
- pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
- readbytes = read(sendFile, p, segbytes);
- pgstat_report_wait_end();
- if (readbytes <= 0)
- {
- char path[MAXPGPATH];
- int save_errno = errno;
-
- XLogFilePath(path, tli, sendSegNo, segsize);
- errno = save_errno;
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not read from log segment %s, offset %u, length %lu: %m",
- path, sendOff, (unsigned long) segbytes)));
- }
-
- /* Update state for read */
- recptr += readbytes;
-
- sendOff += readbytes;
- nbytes -= readbytes;
- p += readbytes;
- }
-}
-
-/*
* Determine which timeline to read an xlog page from and set the
* XLogReaderState's currTLI to that timeline ID.
*
@@ -802,8 +680,8 @@ XLogRead(char *buf, int segsize, TimeLineID tli, XLogRecPtr startptr,
void
XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
{
- const XLogRecPtr lastReadPage = state->seg.ws_segno *
- state->segcxt.ws_segsize + state->seg.ws_off;
+ const XLogRecPtr lastReadPage = (state->seg.ws_segno *
+ state->segcxt.ws_segsize + state->segoff);
Assert(wantPage != InvalidXLogRecPtr && wantPage % XLOG_BLCKSZ == 0);
Assert(wantLength <= XLOG_BLCKSZ);
@@ -896,6 +774,34 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
}
}
+/*
+ * openSegment callback for WALRead
+ *
+ * Opens, read-only, the file for segment 'nextSegNo' on the timeline passed
+ * in '*tli_p' and returns its descriptor. '*tli_p' is only read, never
+ * modified. If the file cannot be opened an ERROR is raised; a missing file
+ * (ENOENT) is reported as a segment that has already been removed.
+ */
+static int
+wal_segment_open(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
+ TimeLineID *tli_p)
+{
+ TimeLineID tli = *tli_p;
+ char path[MAXPGPATH];
+ int fd;
+
+ XLogFilePath(path, tli, nextSegNo, segcxt->ws_segsize);
+ fd = BasicOpenFile(path, O_RDONLY | PG_BINARY);
+ if (fd >= 0)
+ return fd;
+
+ if (errno == ENOENT)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("requested WAL segment %s has already been removed",
+ path)));
+ else
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not open file \"%s\": %m",
+ path)));
+
+ return -1; /* keep compiler quiet */
+}
+
/*
* read_page callback for reading local xlog files
*
@@ -913,7 +819,9 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
{
XLogRecPtr read_upto,
loc;
+ TimeLineID tli;
int count;
+ WALReadError errinfo;
loc = targetPagePtr + reqLen;
@@ -932,7 +840,7 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
read_upto = GetFlushRecPtr();
else
read_upto = GetXLogReplayRecPtr(&ThisTimeLineID);
- state->seg.ws_tli = ThisTimeLineID;
+ tli = ThisTimeLineID;
/*
* Check which timeline to get the record from.
@@ -982,14 +890,14 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
read_upto = state->currTLIValidUntil;
/*
- * Setting ws_tli to our wanted record's TLI is slightly wrong;
- * the page might begin on an older timeline if it contains a
- * timeline switch, since its xlog segment will have been copied
- * from the prior timeline. This is pretty harmless though, as
- * nothing cares so long as the timeline doesn't go backwards. We
- * should read the page header instead; FIXME someday.
+ * Setting tli to our wanted record's TLI is slightly wrong; the
+ * page might begin on an older timeline if it contains a timeline
+ * switch, since its xlog segment will have been copied from the
+ * prior timeline. This is pretty harmless though, as nothing
+ * cares so long as the timeline doesn't go backwards. We should
+ * read the page header instead; FIXME someday.
*/
- state->seg.ws_tli = state->currTLI;
+ tli = state->currTLI;
/* No need to wait on a historical timeline */
break;
@@ -1020,9 +928,38 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
* as 'count', read the whole page anyway. It's guaranteed to be
* zero-padded up to the page boundary if it's incomplete.
*/
- XLogRead(cur_page, state->segcxt.ws_segsize, state->seg.ws_tli, targetPagePtr,
- XLOG_BLCKSZ);
+ if (!WALRead(cur_page, targetPagePtr, XLOG_BLCKSZ, tli, &state->seg,
+ &state->segcxt, wal_segment_open, &errinfo))
+ WALReadRaiseError(&errinfo);
/* number of valid bytes in the buffer */
return count;
}
+
+/*
+ * Backend-specific convenience code to handle read errors encountered by
+ * WALRead().
+ *
+ * 'errinfo' is the error description filled in by a failed WALRead() call.
+ * A negative wre_read is reported as a file access error using the saved
+ * errno; a zero wre_read (nothing was read although data was requested) is
+ * reported as data corruption. Both reports are raised at ERROR level. If
+ * wre_read is positive, nothing is reported and the function returns.
+ */
+void
+WALReadRaiseError(WALReadError *errinfo)
+{
+ WALOpenSegment *seg = &errinfo->wre_seg;
+ char *fname = XLogFileNameP(seg->ws_tli, seg->ws_segno);
+
+ if (errinfo->wre_read < 0)
+ {
+ errno = errinfo->wre_errno; /* restore the saved errno for %m */
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read from log segment %s, offset %u: %m",
+ fname, errinfo->wre_off)));
+ }
+ else if (errinfo->wre_read == 0)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_DATA_CORRUPTED),
+ errmsg("could not read from log segment %s, offset %u: read %d of %zu",
+ fname, errinfo->wre_off, errinfo->wre_read,
+ (Size) errinfo->wre_req)));
+ }
+}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index cbc928501af..ac9209747a4 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -248,8 +248,9 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
+static int WalSndSegmentOpen(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
+ TimeLineID *tli_p);
static void UpdateSpillStats(LogicalDecodingContext *ctx);
-static void XLogRead(WALSegmentContext *segcxt, char *buf, XLogRecPtr startptr, Size count);
/* Initialize walsender process before entering the main command loop */
@@ -767,6 +768,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
{
XLogRecPtr flushptr;
int count;
+ WALReadError errinfo;
+ XLogSegNo segno;
XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
sendTimeLineIsHistoric = (state->currTLI != ThisTimeLineID);
@@ -787,7 +790,27 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
count = flushptr - targetPagePtr; /* part of the page available */
/* now actually read the data, we know it's there */
- XLogRead(sendCxt, cur_page, targetPagePtr, XLOG_BLCKSZ);
+ if (!WALRead(cur_page,
+ targetPagePtr,
+ XLOG_BLCKSZ,
+ sendSeg->ws_tli, /* Pass the current TLI because only
+ * WalSndSegmentOpen controls whether new
+ * TLI is needed. */
+ sendSeg,
+ sendCxt,
+ WalSndSegmentOpen,
+ &errinfo))
+ WALReadRaiseError(&errinfo);
+
+ /*
+ * After reading into the buffer, check that what we read was valid. We do
+ * this after reading, because even though the segment was present when we
+ * opened it, it might get recycled or removed while we read it. The
+ * read() succeeds in that case, but the data we tried to read might
+ * already have been overwritten with new WAL records.
+ */
+ XLByteToSeg(targetPagePtr, segno, sendCxt->ws_segsize);
+ CheckXLogRemoved(segno, sendSeg->ws_tli);
return count;
}
@@ -2360,189 +2383,68 @@ WalSndKill(int code, Datum arg)
SpinLockRelease(&walsnd->mutex);
}
-/*
- * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
- *
- * XXX probably this should be improved to suck data directly from the
- * WAL buffers when possible.
- *
- * Will open, and keep open, one WAL segment stored in the global file
- * descriptor sendFile. This means if XLogRead is used once, there will
- * always be one descriptor left open until the process ends, but never
- * more than one.
- */
-static void
-XLogRead(WALSegmentContext *segcxt, char *buf, XLogRecPtr startptr, Size count)
+/*
+ * walsender's openSegment callback for WALRead
+ *
+ * Opens segment 'nextSegNo' read-only and returns its file descriptor. The
+ * value found in '*tli_p' on entry is ignored; it is overwritten with the
+ * timeline whose file is actually opened: sendTimeLine, or
+ * sendTimeLineNextTLI for the segment that contains the switch point of a
+ * historic timeline. Raises ERROR if the file cannot be opened.
+ *
+ * The timeline decision must be based on 'nextSegNo': WALRead() updates
+ * sendSeg->ws_segno only after this callback returns, so inside the callback
+ * that field still describes the previously opened segment.
+ */
+static int
+WalSndSegmentOpen(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
+ TimeLineID *tli_p)
{
- char *p;
- XLogRecPtr recptr;
- Size nbytes;
- XLogSegNo segno;
-
-retry:
- p = buf;
- recptr = startptr;
- nbytes = count;
+ char path[MAXPGPATH];
+ int fd;
- while (nbytes > 0)
+ /*-------
+ * When reading from a historic timeline, and there is a timeline switch
+ * within this segment, read from the WAL segment belonging to the new
+ * timeline.
+ *
+ * For example, imagine that this server is currently on timeline 5, and
+ * we're streaming timeline 4. The switch from timeline 4 to 5 happened at
+ * 0/13002088. In pg_wal, we have these files:
+ *
+ * ...
+ * 000000040000000000000012
+ * 000000040000000000000013
+ * 000000050000000000000013
+ * 000000050000000000000014
+ * ...
+ *
+ * In this situation, when requested to send the WAL from segment 0x13, on
+ * timeline 4, we read the WAL from file 000000050000000000000013. Archive
+ * recovery prefers files from newer timelines, so if the segment was
+ * restored from the archive on this server, the file belonging to the old
+ * timeline, 000000040000000000000013, might not exist. Their contents are
+ * equal up to the switchpoint, because at a timeline switch, the used
+ * portion of the old segment is copied to the new file.
+ *-------
+ */
+ *tli_p = sendTimeLine;
+ if (sendTimeLineIsHistoric)
{
- uint32 startoff;
- int segbytes;
- int readbytes;
-
- startoff = XLogSegmentOffset(recptr, segcxt->ws_segsize);
-
- if (sendSeg->ws_file < 0 ||
- !XLByteInSeg(recptr, sendSeg->ws_segno, segcxt->ws_segsize))
- {
- char path[MAXPGPATH];
-
- /* Switch to another logfile segment */
- if (sendSeg->ws_file >= 0)
- close(sendSeg->ws_file);
-
- XLByteToSeg(recptr, sendSeg->ws_segno, segcxt->ws_segsize);
-
- /*-------
- * When reading from a historic timeline, and there is a timeline
- * switch within this segment, read from the WAL segment belonging
- * to the new timeline.
- *
- * For example, imagine that this server is currently on timeline
- * 5, and we're streaming timeline 4. The switch from timeline 4
- * to 5 happened at 0/13002088. In pg_wal, we have these files:
- *
- * ...
- * 000000040000000000000012
- * 000000040000000000000013
- * 000000050000000000000013
- * 000000050000000000000014
- * ...
- *
- * In this situation, when requested to send the WAL from
- * segment 0x13, on timeline 4, we read the WAL from file
- * 000000050000000000000013. Archive recovery prefers files from
- * newer timelines, so if the segment was restored from the
- * archive on this server, the file belonging to the old timeline,
- * 000000040000000000000013, might not exist. Their contents are
- * equal up to the switchpoint, because at a timeline switch, the
- * used portion of the old segment is copied to the new file.
- *-------
- */
- sendSeg->ws_tli = sendTimeLine;
- if (sendTimeLineIsHistoric)
- {
- XLogSegNo endSegNo;
-
- XLByteToSeg(sendTimeLineValidUpto, endSegNo, segcxt->ws_segsize);
- if (sendSeg->ws_segno == endSegNo)
- sendSeg->ws_tli = sendTimeLineNextTLI;
- }
-
- XLogFilePath(path, sendSeg->ws_tli, sendSeg->ws_segno, segcxt->ws_segsize);
-
- sendSeg->ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
- if (sendSeg->ws_file < 0)
- {
- /*
- * If the file is not found, assume it's because the standby
- * asked for a too old WAL segment that has already been
- * removed or recycled.
- */
- if (errno == ENOENT)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("requested WAL segment %s has already been removed",
- XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno))));
- else
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not open file \"%s\": %m",
- path)));
- }
- sendSeg->ws_off = 0;
- }
-
- /* Need to seek in the file? */
- if (sendSeg->ws_off != startoff)
- {
- if (lseek(sendSeg->ws_file, (off_t) startoff, SEEK_SET) < 0)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not seek in log segment %s to offset %u: %m",
- XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno),
- startoff)));
- sendSeg->ws_off = startoff;
- }
-
- /* How many bytes are within this segment? */
- if (nbytes > (segcxt->ws_segsize - startoff))
- segbytes = segcxt->ws_segsize - startoff;
- else
- segbytes = nbytes;
-
- pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
- readbytes = read(sendSeg->ws_file, p, segbytes);
- pgstat_report_wait_end();
- if (readbytes < 0)
- {
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not read from log segment %s, offset %u, length %zu: %m",
- XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno),
- sendSeg->ws_off, (Size) segbytes)));
- }
- else if (readbytes == 0)
- {
- ereport(ERROR,
- (errcode(ERRCODE_DATA_CORRUPTED),
- errmsg("could not read from log segment %s, offset %u: read %d of %zu",
- XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno),
- sendSeg->ws_off, readbytes, (Size) segbytes)));
- }
+ XLogSegNo endSegNo;
- /* Update state for read */
- recptr += readbytes;
-
- sendSeg->ws_off += readbytes;
- nbytes -= readbytes;
- p += readbytes;
+ XLByteToSeg(sendTimeLineValidUpto, endSegNo, segcxt->ws_segsize);
+ if (nextSegNo == endSegNo)
+ *tli_p = sendTimeLineNextTLI;
}
- /*
- * After reading into the buffer, check that what we read was valid. We do
- * this after reading, because even though the segment was present when we
- * opened it, it might get recycled or removed while we read it. The
- * read() succeeds in that case, but the data we tried to read might
- * already have been overwritten with new WAL records.
- */
- XLByteToSeg(startptr, segno, segcxt->ws_segsize);
- CheckXLogRemoved(segno, ThisTimeLineID);
+ XLogFilePath(path, *tli_p, nextSegNo, segcxt->ws_segsize);
+ fd = BasicOpenFile(path, O_RDONLY | PG_BINARY);
+ if (fd >= 0)
+ return fd;
/*
- * During recovery, the currently-open WAL file might be replaced with the
- * file of the same name retrieved from archive. So we always need to
- * check what we read was valid after reading into the buffer. If it's
- * invalid, we try to open and read the file again.
+ * If the file is not found, assume it's because the standby asked for a
+ * too old WAL segment that has already been removed or recycled.
*/
- if (am_cascading_walsender)
- {
- WalSnd *walsnd = MyWalSnd;
- bool reload;
-
- SpinLockAcquire(&walsnd->mutex);
- reload = walsnd->needreload;
- walsnd->needreload = false;
- SpinLockRelease(&walsnd->mutex);
-
- if (reload && sendSeg->ws_file >= 0)
- {
- close(sendSeg->ws_file);
- sendSeg->ws_file = -1;
-
- goto retry;
- }
- }
+ if (errno == ENOENT)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("requested WAL segment %s has already been removed",
+ XLogFileNameP(*tli_p, nextSegNo))));
+ else
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not open file \"%s\": %m",
+ path)));
+ return -1; /* keep compiler quiet */
}
/*
@@ -2562,6 +2464,8 @@ XLogSendPhysical(void)
XLogRecPtr startptr;
XLogRecPtr endptr;
Size nbytes;
+ XLogSegNo segno;
+ WALReadError errinfo;
/* If requested switch the WAL sender to the stopping state. */
if (got_STOPPING)
@@ -2777,7 +2681,49 @@ XLogSendPhysical(void)
* calls.
*/
enlargeStringInfo(&output_message, nbytes);
- XLogRead(sendCxt, &output_message.data[output_message.len], startptr, nbytes);
+
+retry:
+ if (!WALRead(&output_message.data[output_message.len],
+ startptr,
+ nbytes,
+ sendSeg->ws_tli, /* Pass the current TLI because only
+ * WalSndSegmentOpen controls whether new
+ * TLI is needed. */
+ sendSeg,
+ sendCxt,
+ WalSndSegmentOpen,
+ &errinfo))
+ WALReadRaiseError(&errinfo);
+
+ /* See logical_read_xlog_page(). */
+ XLByteToSeg(startptr, segno, sendCxt->ws_segsize);
+ CheckXLogRemoved(segno, sendSeg->ws_tli);
+
+ /*
+ * During recovery, the currently-open WAL file might be replaced with the
+ * file of the same name retrieved from archive. So we always need to
+ * check what we read was valid after reading into the buffer. If it's
+ * invalid, we try to open and read the file again.
+ */
+ if (am_cascading_walsender)
+ {
+ WalSnd *walsnd = MyWalSnd;
+ bool reload;
+
+ SpinLockAcquire(&walsnd->mutex);
+ reload = walsnd->needreload;
+ walsnd->needreload = false;
+ SpinLockRelease(&walsnd->mutex);
+
+ if (reload && sendSeg->ws_file >= 0)
+ {
+ close(sendSeg->ws_file);
+ sendSeg->ws_file = -1;
+
+ goto retry;
+ }
+ }
+
output_message.len += nbytes;
output_message.data[output_message.len] = '\0';
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index d6695f7196f..30a5851d87c 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -280,137 +280,57 @@ identify_target_directory(char *directory, char *fname)
return NULL; /* not reached */
}
-/*
- * Read count bytes from a segment file in the specified directory, for the
- * given timeline, containing the specified record pointer; store the data in
- * the passed buffer.
- */
-static void
-XLogDumpXLogRead(const char *directory, TimeLineID timeline_id,
- XLogRecPtr startptr, char *buf, Size count)
+/*
+ * pg_waldump's openSegment callback for WALRead
+ *
+ * Opens the file for segment 'nextSegNo' on the timeline passed in '*tli_p',
+ * looking in the directory segcxt->ws_dir, and returns its descriptor.
+ * '*tli_p' is only read, never modified. A missing file (ENOENT) is retried
+ * up to 10 times with a 500 ms sleep in between; any other open error stops
+ * the retries at once. If no descriptor was obtained, fatal_error() is
+ * called with the errno of the last attempt.
+ */
+static int
+WALDumpOpenSegment(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
+ TimeLineID *tli_p)
{
- char *p;
- XLogRecPtr recptr;
- Size nbytes;
-
- static int sendFile = -1;
- static XLogSegNo sendSegNo = 0;
- static uint32 sendOff = 0;
+ TimeLineID tli = *tli_p;
+ char fname[MAXPGPATH];
+ int fd;
+ int tries;
- p = buf;
- recptr = startptr;
- nbytes = count;
+ XLogFileName(fname, tli, nextSegNo, segcxt->ws_segsize);
- while (nbytes > 0)
+ /*
+ * In follow mode there is a short period of time after the server has
+ * written the end of the previous file before the new file is available.
+ * So we loop for 5 seconds looking for the file to appear before giving
+ * up.
+ */
+ for (tries = 0; tries < 10; tries++)
{
- uint32 startoff;
- int segbytes;
- int readbytes;
-
- startoff = XLogSegmentOffset(recptr, WalSegSz);
-
- if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo, WalSegSz))
- {
- char fname[MAXFNAMELEN];
- int tries;
-
- /* Switch to another logfile segment */
- if (sendFile >= 0)
- close(sendFile);
-
- XLByteToSeg(recptr, sendSegNo, WalSegSz);
-
- XLogFileName(fname, timeline_id, sendSegNo, WalSegSz);
-
- /*
- * In follow mode there is a short period of time after the server
- * has written the end of the previous file before the new file is
- * available. So we loop for 5 seconds looking for the file to
- * appear before giving up.
- */
- for (tries = 0; tries < 10; tries++)
- {
- sendFile = open_file_in_directory(directory, fname);
- if (sendFile >= 0)
- break;
- if (errno == ENOENT)
- {
- int save_errno = errno;
-
- /* File not there yet, try again */
- pg_usleep(500 * 1000);
-
- errno = save_errno;
- continue;
- }
- /* Any other error, fall through and fail */
- break;
- }
-
- if (sendFile < 0)
- fatal_error("could not find file \"%s\": %s",
- fname, strerror(errno));
- sendOff = 0;
- }
-
- /* Need to seek in the file? */
- if (sendOff != startoff)
- {
- if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
- {
- int err = errno;
- char fname[MAXPGPATH];
-
- XLogFileName(fname, timeline_id, sendSegNo, WalSegSz);
-
- fatal_error("could not seek in log file %s to offset %u: %s",
- fname, startoff, strerror(err));
- }
- sendOff = startoff;
- }
-
- /* How many bytes are within this segment? */
- if (nbytes > (WalSegSz - startoff))
- segbytes = WalSegSz - startoff;
- else
- segbytes = nbytes;
-
- readbytes = read(sendFile, p, segbytes);
- if (readbytes <= 0)
+ fd = open_file_in_directory(segcxt->ws_dir, fname);
+ if (fd >= 0)
+ return fd;
+ if (errno == ENOENT)
{
- int err = errno;
- char fname[MAXPGPATH];
int save_errno = errno;
- XLogFileName(fname, timeline_id, sendSegNo, WalSegSz);
- errno = save_errno;
+ /* File not there yet, try again */
+ pg_usleep(500 * 1000);
- if (readbytes < 0)
- fatal_error("could not read from log file %s, offset %u, length %d: %s",
- fname, sendOff, segbytes, strerror(err));
- else if (readbytes == 0)
- fatal_error("could not read from log file %s, offset %u: read %d of %zu",
- fname, sendOff, readbytes, (Size) segbytes);
+ errno = save_errno;
+ continue;
}
-
- /* Update state for read */
- recptr += readbytes;
-
- sendOff += readbytes;
- nbytes -= readbytes;
- p += readbytes;
+ /* Any other error, fall through and fail */
+ break;
}
+
+ fatal_error("could not find file \"%s\": %s", fname, strerror(errno));
+ return -1; /* keep compiler quiet */
}
/*
* XLogReader read_page callback
*/
static int
-XLogDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
- XLogRecPtr targetPtr, char *readBuff)
+WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
+ XLogRecPtr targetPtr, char *readBuff)
{
XLogDumpPrivate *private = state->private_data;
int count = XLOG_BLCKSZ;
+ WALReadError errinfo;
if (private->endptr != InvalidXLogRecPtr)
{
@@ -425,8 +345,26 @@ XLogDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
}
}
- XLogDumpXLogRead(state->segcxt.ws_dir, private->timeline, targetPagePtr,
- readBuff, count);
+ if (!WALRead(readBuff, targetPagePtr, count, private->timeline,
+ &state->seg, &state->segcxt, WALDumpOpenSegment, &errinfo))
+ {
+ WALOpenSegment *seg = &errinfo.wre_seg;
+ char fname[MAXPGPATH];
+
+ XLogFileName(fname, seg->ws_tli, seg->ws_segno,
+ state->segcxt.ws_segsize);
+
+ if (errinfo.wre_errno != 0)
+ {
+ errno = errinfo.wre_errno;
+ fatal_error("could not read from file %s, offset %u: %m",
+ fname, errinfo.wre_off);
+ }
+ else
+ fatal_error("could not read from file %s, offset %u: read %d of %zu",
+ fname, errinfo.wre_off, errinfo.wre_read,
+ (Size) errinfo.wre_req);
+ }
return count;
}
@@ -1089,7 +1027,7 @@ main(int argc, char **argv)
/* done with argument parsing, do the actual work */
/* we have everything we need, start reading */
- xlogreader_state = XLogReaderAllocate(WalSegSz, waldir, XLogDumpReadPage,
+ xlogreader_state = XLogReaderAllocate(WalSegSz, waldir, WALDumpReadPage,
&private);
if (!xlogreader_state)
fatal_error("out of memory");
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 1bbee386e8d..0193611b7fd 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -36,7 +36,6 @@ typedef struct WALOpenSegment
{
int ws_file; /* segment file descriptor */
XLogSegNo ws_segno; /* segment number */
- uint32 ws_off; /* offset in the segment */
TimeLineID ws_tli; /* timeline ID of the currently open file */
} WALOpenSegment;
@@ -168,6 +167,7 @@ struct XLogReaderState
/* last read XLOG position for data currently in readBuf */
WALSegmentContext segcxt;
WALOpenSegment seg;
+ uint32 segoff;
/*
* beginning of prior page read, and its TLI. Doesn't necessarily
@@ -217,6 +217,24 @@ extern XLogReaderState *XLogReaderAllocate(int wal_segment_size,
/* Free an XLogReader */
extern void XLogReaderFree(XLogReaderState *state);
+/*
+ * Callback to open the specified WAL segment for reading. Returns a valid
+ * file descriptor when the file was opened successfully.
+ *
+ * "nextSegNo" is the number of the segment to be opened.
+ *
+ * "segcxt" is additional information about the segment.
+ *
+ * "tli_p" is an input/output argument. WALRead() uses it to pass the
+ * timeline in which the new segment should be found, but the callback can use
+ * it to return the TLI that it actually opened.
+ *
+ * WALRead() does not check the returned descriptor and records the segment
+ * number in its WALOpenSegment only after the callback returns, so the
+ * callback should report open failures itself and rely on "nextSegNo".
+ *
+ * BasicOpenFile() is the preferred way to open the segment file in backend
+ * code, whereas open(2) should be used in frontend.
+ */
+typedef int (*WALSegmentOpen) (XLogSegNo nextSegNo, WALSegmentContext *segcxt,
+ TimeLineID *tli_p);
+
/* Initialize supporting structures */
extern void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
int segsize, const char *waldir);
@@ -232,6 +250,25 @@ extern bool XLogReaderValidatePageHeader(XLogReaderState *state,
#ifdef FRONTEND
extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr);
#endif /* FRONTEND */
+
+/*
+ * Error information from WALRead that both backend and frontend caller can
+ * process. Currently only errors from pg_pread can be reported. WALRead
+ * fills in every field when it returns false.
+ */
+typedef struct WALReadError
+{
+ int wre_errno; /* errno after the last pg_pread(); 0 if not set */
+ int wre_off; /* Offset in the segment we tried to read from. */
+ int wre_req; /* Bytes requested to be read. */
+ int wre_read; /* Result of the last pg_pread(): zero or negative. */
+ WALOpenSegment wre_seg; /* Segment we tried to read from. */
+} WALReadError;
+
+extern bool WALRead(char *buf, XLogRecPtr startptr, Size count,
+ TimeLineID tli, WALOpenSegment *seg,
+ WALSegmentContext *segcxt, WALSegmentOpen openSegment,
+ WALReadError *errinfo);
+
/* Functions for decoding an XLogRecord */
extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record,
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index 2df98e45b20..0572b241927 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -54,4 +54,6 @@ extern int read_local_xlog_page(XLogReaderState *state,
extern void XLogReadDetermineTimeline(XLogReaderState *state,
XLogRecPtr wantPage, uint32 wantLength);
+extern void WALReadRaiseError(WALReadError *errinfo);
+
#endif