diff options
author | Thomas Munro <tmunro@postgresql.org> | 2021-04-08 23:03:23 +1200 |
---|---|---|
committer | Thomas Munro <tmunro@postgresql.org> | 2021-04-08 23:20:42 +1200 |
commit | 323cbe7c7ddcf18aaf24b7f6d682a45a61d4e31b (patch) | |
tree | 5290af3834511b9bd1773841b1068e485ba52fe6 /src/backend/access/transam/xlog.c | |
parent | 5ac9c4307337313bedeafc21dbbab93ba809241c (diff) | |
download | postgresql-323cbe7c7ddcf18aaf24b7f6d682a45a61d4e31b.tar.gz postgresql-323cbe7c7ddcf18aaf24b7f6d682a45a61d4e31b.zip |
Remove read_page callback from XLogReader.
Previously, the XLogReader module would fetch new input data using a
callback function. Redesign the interface so that it tells the caller
to insert more data with a special return value instead. This API suits
later patches for prefetching, encryption and maybe other future
projects that would otherwise require continually extending the callback
interface.
As incidental cleanup work, move global variables readOff, readLen and
readSegNo inside XlogReaderState.
Author: Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp>
Author: Heikki Linnakangas <hlinnaka@iki.fi> (parts of earlier version)
Reviewed-by: Antonin Houska <ah@cybertec.at>
Reviewed-by: Alvaro Herrera <alvherre@2ndquadrant.com>
Reviewed-by: Takashi Menjo <takashi.menjo@gmail.com>
Reviewed-by: Andres Freund <andres@anarazel.de>
Reviewed-by: Thomas Munro <thomas.munro@gmail.com>
Discussion: https://postgr.es/m/20190418.210257.43726183.horiguchi.kyotaro%40lab.ntt.co.jp
Diffstat (limited to 'src/backend/access/transam/xlog.c')
-rw-r--r-- | src/backend/access/transam/xlog.c | 126 |
1 files changed, 57 insertions, 69 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index c1d4415a433..7faac01bf24 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -811,17 +811,13 @@ static XLogSegNo openLogSegNo = 0; * These variables are used similarly to the ones above, but for reading * the XLOG. Note, however, that readOff generally represents the offset * of the page just read, not the seek position of the FD itself, which - * will be just past that page. readLen indicates how much of the current - * page has been read into readBuf, and readSource indicates where we got - * the currently open file from. + * will be just past that page. readSource indicates where we got the + * currently open file from. * Note: we could use Reserve/ReleaseExternalFD to track consumption of * this FD too; but it doesn't currently seem worthwhile, since the XLOG is * not read by general-purpose sessions. */ static int readFile = -1; -static XLogSegNo readSegNo = 0; -static uint32 readOff = 0; -static uint32 readLen = 0; static XLogSource readSource = XLOG_FROM_ANY; /* @@ -838,13 +834,6 @@ static XLogSource currentSource = XLOG_FROM_ANY; static bool lastSourceFailed = false; static bool pendingWalRcvRestart = false; -typedef struct XLogPageReadPrivate -{ - int emode; - bool fetching_ckpt; /* are we fetching a checkpoint record? */ - bool randAccess; -} XLogPageReadPrivate; - /* * These variables track when we last obtained some WAL data to process, * and where we got it from. (XLogReceiptSource is initially the same as @@ -920,10 +909,12 @@ static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath, static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli, XLogSource source, bool notfoundOk); static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, XLogSource source); -static int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, - int reqLen, XLogRecPtr targetRecPtr, char *readBuf); +static bool XLogPageRead(XLogReaderState *state, + bool fetching_ckpt, int emode, bool randAccess); static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, - bool fetching_ckpt, XLogRecPtr tliRecPtr); + bool fetching_ckpt, + XLogRecPtr tliRecPtr, + XLogSegNo readSegNo); static int emode_for_corrupt_record(int emode, XLogRecPtr RecPtr); static void XLogFileClose(void); static void PreallocXlogFiles(XLogRecPtr endptr); @@ -1234,8 +1225,7 @@ XLogInsertRecord(XLogRecData *rdata, appendBinaryStringInfo(&recordBuf, rdata->data, rdata->len); if (!debug_reader) - debug_reader = XLogReaderAllocate(wal_segment_size, NULL, - XL_ROUTINE(), NULL); + debug_reader = XLogReaderAllocate(wal_segment_size, NULL, NULL); if (!debug_reader) { @@ -4373,12 +4363,7 @@ ReadRecord(XLogReaderState *xlogreader, int emode, bool fetching_ckpt) { XLogRecord *record; - XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data; - - /* Pass through parameters to XLogPageRead */ - private->fetching_ckpt = fetching_ckpt; - private->emode = emode; - private->randAccess = (xlogreader->ReadRecPtr == InvalidXLogRecPtr); + bool randAccess = (xlogreader->ReadRecPtr == InvalidXLogRecPtr); /* This is the first attempt to read this page. */ lastSourceFailed = false; @@ -4386,8 +4371,16 @@ ReadRecord(XLogReaderState *xlogreader, int emode, for (;;) { char *errormsg; + XLogReadRecordResult result; + + while ((result = XLogReadRecord(xlogreader, &record, &errormsg)) + == XLREAD_NEED_DATA) + { + if (!XLogPageRead(xlogreader, fetching_ckpt, emode, randAccess)) + break; + + } - record = XLogReadRecord(xlogreader, &errormsg); ReadRecPtr = xlogreader->ReadRecPtr; EndRecPtr = xlogreader->EndRecPtr; if (record == NULL) @@ -6457,7 +6450,6 @@ StartupXLOG(void) bool backupFromStandby = false; DBState dbstate_at_startup; XLogReaderState *xlogreader; - XLogPageReadPrivate private; bool promoted = false; struct stat st; @@ -6616,13 +6608,9 @@ StartupXLOG(void) OwnLatch(&XLogCtl->recoveryWakeupLatch); /* Set up XLOG reader facility */ - MemSet(&private, 0, sizeof(XLogPageReadPrivate)); xlogreader = - XLogReaderAllocate(wal_segment_size, NULL, - XL_ROUTINE(.page_read = &XLogPageRead, - .segment_open = NULL, - .segment_close = wal_segment_close), - &private); + XLogReaderAllocate(wal_segment_size, NULL, wal_segment_close); + if (!xlogreader) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), @@ -7819,7 +7807,8 @@ StartupXLOG(void) XLogRecPtr pageBeginPtr; pageBeginPtr = EndOfLog - (EndOfLog % XLOG_BLCKSZ); - Assert(readOff == XLogSegmentOffset(pageBeginPtr, wal_segment_size)); + Assert(XLogSegmentOffset(xlogreader->readPagePtr, wal_segment_size) == + XLogSegmentOffset(pageBeginPtr, wal_segment_size)); firstIdx = XLogRecPtrToBufIdx(EndOfLog); @@ -12107,13 +12096,15 @@ CancelBackup(void) * XLogPageRead() to try fetching the record from another source, or to * sleep and retry. */ -static int -XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, - XLogRecPtr targetRecPtr, char *readBuf) -{ - XLogPageReadPrivate *private = - (XLogPageReadPrivate *) xlogreader->private_data; - int emode = private->emode; +static bool +XLogPageRead(XLogReaderState *state, + bool fetching_ckpt, int emode, bool randAccess) +{ + char *readBuf = state->readBuf; + XLogRecPtr targetPagePtr = state->readPagePtr; + int reqLen = state->reqLen; + int readLen = 0; + XLogRecPtr targetRecPtr = state->ReadRecPtr; uint32 targetPageOff; XLogSegNo targetSegNo PG_USED_FOR_ASSERTS_ONLY; int r; @@ -12126,7 +12117,7 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, * is not in the currently open one. */ if (readFile >= 0 && - !XLByteInSeg(targetPagePtr, readSegNo, wal_segment_size)) + !XLByteInSeg(targetPagePtr, state->seg.ws_segno, wal_segment_size)) { /* * Request a restartpoint if we've replayed too much xlog since the @@ -12134,10 +12125,10 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, */ if (bgwriterLaunched) { - if (XLogCheckpointNeeded(readSegNo)) + if (XLogCheckpointNeeded(state->seg.ws_segno)) { (void) GetRedoRecPtr(); - if (XLogCheckpointNeeded(readSegNo)) + if (XLogCheckpointNeeded(state->seg.ws_segno)) RequestCheckpoint(CHECKPOINT_CAUSE_XLOG); } } @@ -12147,7 +12138,7 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, readSource = XLOG_FROM_ANY; } - XLByteToSeg(targetPagePtr, readSegNo, wal_segment_size); + XLByteToSeg(targetPagePtr, state->seg.ws_segno, wal_segment_size); retry: /* See if we need to retrieve more data */ @@ -12156,17 +12147,15 @@ retry: flushedUpto < targetPagePtr + reqLen)) { if (!WaitForWALToBecomeAvailable(targetPagePtr + reqLen, - private->randAccess, - private->fetching_ckpt, - targetRecPtr)) + randAccess, fetching_ckpt, + targetRecPtr, state->seg.ws_segno)) { if (readFile >= 0) close(readFile); readFile = -1; - readLen = 0; readSource = XLOG_FROM_ANY; - - return -1; + XLogReaderSetInputData(state, -1); + return false; } } @@ -12193,40 +12182,36 @@ retry: else readLen = XLOG_BLCKSZ; - /* Read the requested page */ - readOff = targetPageOff; - pgstat_report_wait_start(WAIT_EVENT_WAL_READ); - r = pg_pread(readFile, readBuf, XLOG_BLCKSZ, (off_t) readOff); + r = pg_pread(readFile, readBuf, XLOG_BLCKSZ, (off_t) targetPageOff); if (r != XLOG_BLCKSZ) { char fname[MAXFNAMELEN]; int save_errno = errno; pgstat_report_wait_end(); - XLogFileName(fname, curFileTLI, readSegNo, wal_segment_size); + XLogFileName(fname, curFileTLI, state->seg.ws_segno, wal_segment_size); if (r < 0) { errno = save_errno; ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen), (errcode_for_file_access(), errmsg("could not read from log segment %s, offset %u: %m", - fname, readOff))); + fname, targetPageOff))); } else ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen), (errcode(ERRCODE_DATA_CORRUPTED), errmsg("could not read from log segment %s, offset %u: read %d of %zu", - fname, readOff, r, (Size) XLOG_BLCKSZ))); + fname, targetPageOff, r, (Size) XLOG_BLCKSZ))); goto next_record_is_invalid; } pgstat_report_wait_end(); - Assert(targetSegNo == readSegNo); - Assert(targetPageOff == readOff); - Assert(reqLen <= readLen); + Assert(targetSegNo == state->seg.ws_segno); + Assert(readLen >= reqLen); - xlogreader->seg.ws_tli = curFileTLI; + state->seg.ws_tli = curFileTLI; /* * Check the page header immediately, so that we can retry immediately if @@ -12254,14 +12239,16 @@ retry: * Validating the page header is cheap enough that doing it twice * shouldn't be a big deal from a performance point of view. */ - if (!XLogReaderValidatePageHeader(xlogreader, targetPagePtr, readBuf)) + if (!XLogReaderValidatePageHeader(state, targetPagePtr, readBuf)) { - /* reset any error XLogReaderValidatePageHeader() might have set */ - xlogreader->errormsg_buf[0] = '\0'; + /* reset any error StateValidatePageHeader() might have set */ + state->errormsg_buf[0] = '\0'; goto next_record_is_invalid; } - return readLen; + Assert(state->readPagePtr == targetPagePtr); + XLogReaderSetInputData(state, readLen); + return true; next_record_is_invalid: lastSourceFailed = true; @@ -12269,14 +12256,14 @@ next_record_is_invalid: if (readFile >= 0) close(readFile); readFile = -1; - readLen = 0; readSource = XLOG_FROM_ANY; /* In standby-mode, keep trying */ if (StandbyMode) goto retry; - else - return -1; + + XLogReaderSetInputData(state, -1); + return false; } /* @@ -12307,7 +12294,8 @@ next_record_is_invalid: */ static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, - bool fetching_ckpt, XLogRecPtr tliRecPtr) + bool fetching_ckpt, XLogRecPtr tliRecPtr, + XLogSegNo readSegNo) { static TimestampTz last_fail_time = 0; TimestampTz now; |