diff options
Diffstat (limited to 'src/backend')
-rw-r--r-- | src/backend/access/transam/twophase.c | 5 | ||||
-rw-r--r-- | src/backend/access/transam/xlog.c | 10 | ||||
-rw-r--r-- | src/backend/access/transam/xlogreader.c | 51 | ||||
-rw-r--r-- | src/backend/access/transam/xlogutils.c | 24 | ||||
-rw-r--r-- | src/backend/replication/logical/logical.c | 20 | ||||
-rw-r--r-- | src/backend/replication/logical/logicalfuncs.c | 4 | ||||
-rw-r--r-- | src/backend/replication/slotfuncs.c | 10 | ||||
-rw-r--r-- | src/backend/replication/walsender.c | 36 |
8 files changed, 103 insertions, 57 deletions
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 2f7d4ed59a8..e1904877faa 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -1331,7 +1331,10 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len) char *errormsg; xlogreader = XLogReaderAllocate(wal_segment_size, NULL, - &read_local_xlog_page, NULL); + XL_ROUTINE(.page_read = &read_local_xlog_page, + .segment_open = &wal_segment_open, + .segment_close = &wal_segment_close), + NULL); if (!xlogreader) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 0d3d6709284..a53e6d96334 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -1223,7 +1223,7 @@ XLogInsertRecord(XLogRecData *rdata, if (!debug_reader) debug_reader = XLogReaderAllocate(wal_segment_size, NULL, - NULL, NULL); + XL_ROUTINE(), NULL); if (!debug_reader) { @@ -6478,8 +6478,12 @@ StartupXLOG(void) /* Set up XLOG reader facility */ MemSet(&private, 0, sizeof(XLogPageReadPrivate)); - xlogreader = XLogReaderAllocate(wal_segment_size, NULL, - &XLogPageRead, &private); + xlogreader = + XLogReaderAllocate(wal_segment_size, NULL, + XL_ROUTINE(.page_read = &XLogPageRead, + .segment_open = NULL, + .segment_close = wal_segment_close), + &private); if (!xlogreader) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 79ff976474c..7cee8b92c90 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -71,7 +71,7 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...) */ XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, - XLogPageReadCB pagereadfunc, void *private_data) + XLogReaderRoutine *routine, void *private_data) { XLogReaderState *state; @@ -81,6 +81,9 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir, if (!state) return NULL; + /* initialize caller-provided support functions */ + state->routine = *routine; + state->max_block_id = -1; /* @@ -102,7 +105,6 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir, WALOpenSegmentInit(&state->seg, &state->segcxt, wal_segment_size, waldir); - state->read_page = pagereadfunc; /* system_identifier initialized to zeroes above */ state->private_data = private_data; /* ReadRecPtr, EndRecPtr and readLen initialized to zeroes above */ @@ -137,7 +139,7 @@ XLogReaderFree(XLogReaderState *state) int block_id; if (state->seg.ws_file != -1) - close(state->seg.ws_file); + state->routine.segment_close(state); for (block_id = 0; block_id <= XLR_MAX_BLOCK_ID; block_id++) { @@ -250,7 +252,7 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr) * XLogBeginRead() or XLogFindNextRecord() must be called before the first call * to XLogReadRecord(). * - * If the read_page callback fails to read the requested data, NULL is + * If the page_read callback fails to read the requested data, NULL is * returned. The callback is expected to have reported the error; errormsg * is set to NULL. * @@ -559,10 +561,10 @@ err: /* * Read a single xlog page including at least [pageptr, reqLen] of valid data - * via the read_page() callback. + * via the page_read() callback. * * Returns -1 if the required page cannot be read for some reason; errormsg_buf - * is set in that case (unless the error occurs in the read_page callback). + * is set in that case (unless the error occurs in the page_read callback). * * We fetch the page from a reader-local cache if we know we have the required * data and if there hasn't been any error since caching the data. @@ -589,7 +591,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) * Data is not in our buffer. * * Every time we actually read the segment, even if we looked at parts of - * it before, we need to do verification as the read_page callback might + * it before, we need to do verification as the page_read callback might * now be rereading data from a different source. * * Whenever switching to a new WAL segment, we read the first page of the @@ -601,9 +603,9 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) { XLogRecPtr targetSegmentPtr = pageptr - targetPageOff; - readLen = state->read_page(state, targetSegmentPtr, XLOG_BLCKSZ, - state->currRecPtr, - state->readBuf); + readLen = state->routine.page_read(state, targetSegmentPtr, XLOG_BLCKSZ, + state->currRecPtr, + state->readBuf); if (readLen < 0) goto err; @@ -619,9 +621,9 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) * First, read the requested data length, but at least a short page header * so that we can validate it. */ - readLen = state->read_page(state, pageptr, Max(reqLen, SizeOfXLogShortPHD), - state->currRecPtr, - state->readBuf); + readLen = state->routine.page_read(state, pageptr, Max(reqLen, SizeOfXLogShortPHD), + state->currRecPtr, + state->readBuf); if (readLen < 0) goto err; @@ -638,9 +640,9 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) /* still not enough */ if (readLen < XLogPageHeaderSize(hdr)) { - readLen = state->read_page(state, pageptr, XLogPageHeaderSize(hdr), - state->currRecPtr, - state->readBuf); + readLen = state->routine.page_read(state, pageptr, XLogPageHeaderSize(hdr), + state->currRecPtr, + state->readBuf); if (readLen < 0) goto err; } @@ -1041,11 +1043,14 @@ err: #endif /* FRONTEND */ /* + * Helper function to ease writing of XLogRoutine->page_read callbacks. + * If this function is used, caller must supply an open_segment callback in + * 'state', as that is used here. + * * Read 'count' bytes into 'buf', starting at location 'startptr', from WAL * fetched from timeline 'tli'. * - * 'seg/segcxt' identify the last segment used. 'openSegment' is a callback - * to open the next segment, if necessary. + * 'seg/segcxt' identify the last segment used. * * Returns true if succeeded, false if an error occurs, in which case * 'errinfo' receives error details. @@ -1054,9 +1059,10 @@ err: * WAL buffers when possible. */ bool -WALRead(char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, +WALRead(XLogReaderState *state, + char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALOpenSegment *seg, WALSegmentContext *segcxt, - WALSegmentOpen openSegment, WALReadError *errinfo) + WALReadError *errinfo) { char *p; XLogRecPtr recptr; @@ -1086,10 +1092,11 @@ WALRead(char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, XLogSegNo nextSegNo; if (seg->ws_file >= 0) - close(seg->ws_file); + state->routine.segment_close(state); XLByteToSeg(recptr, nextSegNo, segcxt->ws_segsize); - seg->ws_file = openSegment(nextSegNo, segcxt, &tli); + seg->ws_file = state->routine.segment_open(state, nextSegNo, + segcxt, &tli); /* Update the current segment info. */ seg->ws_tli = tli; diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index 6cb143e161d..bbd801513a8 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -783,10 +783,10 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa } } -/* openSegment callback for WALRead */ -static int -wal_segment_open(XLogSegNo nextSegNo, WALSegmentContext * segcxt, - TimeLineID *tli_p) +/* XLogReaderRoutine->segment_open callback for local pg_wal files */ +int +wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo, + WALSegmentContext *segcxt, TimeLineID *tli_p) { TimeLineID tli = *tli_p; char path[MAXPGPATH]; @@ -811,8 +811,17 @@ wal_segment_open(XLogSegNo nextSegNo, WALSegmentContext * segcxt, return -1; /* keep compiler quiet */ } +/* stock XLogReaderRoutine->segment_close callback */ +void +wal_segment_close(XLogReaderState *state) +{ + close(state->seg.ws_file); + /* need to check errno? */ + state->seg.ws_file = -1; +} + /* - * read_page callback for reading local xlog files + * XLogReaderRoutine->page_read callback for reading local xlog files * * Public because it would likely be very helpful for someone writing another * output method outside walsender, e.g. in a bgworker. @@ -937,8 +946,9 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, * as 'count', read the whole page anyway. It's guaranteed to be * zero-padded up to the page boundary if it's incomplete. */ - if (!WALRead(cur_page, targetPagePtr, XLOG_BLCKSZ, tli, &state->seg, - &state->segcxt, wal_segment_open, &errinfo)) + if (!WALRead(state, cur_page, targetPagePtr, XLOG_BLCKSZ, tli, + &state->seg, &state->segcxt, + &errinfo)) WALReadRaiseError(&errinfo); /* number of valid bytes in the buffer */ diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 5adf253583b..dc69e5ce5f3 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -120,7 +120,7 @@ StartupDecodingContext(List *output_plugin_options, TransactionId xmin_horizon, bool need_full_snapshot, bool fast_forward, - XLogPageReadCB read_page, + XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress) @@ -169,7 +169,7 @@ StartupDecodingContext(List *output_plugin_options, ctx->slot = slot; - ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, read_page, ctx); + ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, xl_routine, ctx); if (!ctx->reader) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), @@ -215,7 +215,8 @@ StartupDecodingContext(List *output_plugin_options, * Otherwise, we set for decoding to start from the given LSN without * marking WAL reserved beforehand. In that scenario, it's up to the * caller to guarantee that WAL remains available. - * read_page, prepare_write, do_write, update_progress -- + * xl_routine -- XLogReaderRoutine for underlying XLogReader + * prepare_write, do_write, update_progress -- * callbacks that perform the use-case dependent, actual, work. * * Needs to be called while in a memory context that's at least as long lived @@ -230,7 +231,7 @@ CreateInitDecodingContext(char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, - XLogPageReadCB read_page, + XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress) @@ -327,7 +328,7 @@ CreateInitDecodingContext(char *plugin, ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon, need_full_snapshot, false, - read_page, prepare_write, do_write, + xl_routine, prepare_write, do_write, update_progress); /* call output plugin initialization callback */ @@ -357,7 +358,10 @@ CreateInitDecodingContext(char *plugin, * fast_forward * bypass the generation of logical changes. * - * read_page, prepare_write, do_write, update_progress + * xl_routine + * XLogReaderRoutine used by underlying xlogreader + * + * prepare_write, do_write, update_progress * callbacks that have to be filled to perform the use-case dependent, * actual work. * @@ -372,7 +376,7 @@ LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, - XLogPageReadCB read_page, + XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress) @@ -425,7 +429,7 @@ CreateDecodingContext(XLogRecPtr start_lsn, ctx = StartupDecodingContext(output_plugin_options, start_lsn, InvalidTransactionId, false, - fast_forward, read_page, prepare_write, + fast_forward, xl_routine, prepare_write, do_write, update_progress); /* call output plugin initialization callback */ diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index fded8e82908..b99c94e8489 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -233,7 +233,9 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin ctx = CreateDecodingContext(InvalidXLogRecPtr, options, false, - read_local_xlog_page, + XL_ROUTINE(.page_read = read_local_xlog_page, + .segment_open = wal_segment_open, + .segment_close = wal_segment_close), LogicalOutputPrepareWrite, LogicalOutputWrite, NULL); diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index ae751e94e76..26890dffb45 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -152,8 +152,10 @@ create_logical_replication_slot(char *name, char *plugin, ctx = CreateInitDecodingContext(plugin, NIL, false, /* just catalogs is OK */ restart_lsn, - read_local_xlog_page, NULL, NULL, - NULL); + XL_ROUTINE(.page_read = read_local_xlog_page, + .segment_open = wal_segment_open, + .segment_close = wal_segment_close), + NULL, NULL, NULL); /* * If caller needs us to determine the decoding start point, do so now. @@ -464,7 +466,9 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto) ctx = CreateDecodingContext(InvalidXLogRecPtr, NIL, true, /* fast_forward */ - read_local_xlog_page, + XL_ROUTINE(.page_read = read_local_xlog_page, + .segment_open = wal_segment_open, + .segment_close = wal_segment_close), NULL, NULL, NULL); /* diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 8b55bbfcb2e..d18475b8540 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -54,8 +54,8 @@ #include "access/transam.h" #include "access/xact.h" #include "access/xlog_internal.h" +#include "access/xlogreader.h" #include "access/xlogutils.h" - #include "catalog/pg_authid.h" #include "catalog/pg_type.h" #include "commands/dbcommands.h" @@ -248,8 +248,8 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time); static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now); static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch); -static int WalSndSegmentOpen(XLogSegNo nextSegNo, WALSegmentContext *segcxt, - TimeLineID *tli_p); +static int WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, + WALSegmentContext *segcxt, TimeLineID *tli_p); static void UpdateSpillStats(LogicalDecodingContext *ctx); @@ -798,7 +798,8 @@ StartReplication(StartReplicationCmd *cmd) } /* - * read_page callback for logical decoding contexts, as a walsender process. + * XLogReaderRoutine->page_read callback for logical decoding contexts, as a + * walsender process. * * Inside the walsender we can do better than read_local_xlog_page, * which has to do a plain sleep/busy loop, because the walsender's latch gets @@ -832,7 +833,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req count = flushptr - targetPagePtr; /* part of the page available */ /* now actually read the data, we know it's there */ - if (!WALRead(cur_page, + if (!WALRead(state, + cur_page, targetPagePtr, XLOG_BLCKSZ, sendSeg->ws_tli, /* Pass the current TLI because only @@ -840,7 +842,6 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req * TLI is needed. */ sendSeg, sendCxt, - WalSndSegmentOpen, &errinfo)) WALReadRaiseError(&errinfo); @@ -1005,7 +1006,9 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot, InvalidXLogRecPtr, - logical_read_xlog_page, + XL_ROUTINE(.page_read = logical_read_xlog_page, + .segment_open = WalSndSegmentOpen, + .segment_close = wal_segment_close), WalSndPrepareWrite, WalSndWriteData, WalSndUpdateProgress); @@ -1168,7 +1171,9 @@ StartLogicalReplication(StartReplicationCmd *cmd) */ logical_decoding_ctx = CreateDecodingContext(cmd->startpoint, cmd->options, false, - logical_read_xlog_page, + XL_ROUTINE(.page_read = logical_read_xlog_page, + .segment_open = WalSndSegmentOpen, + .segment_close = wal_segment_close), WalSndPrepareWrite, WalSndWriteData, WalSndUpdateProgress); @@ -2441,9 +2446,10 @@ WalSndKill(int code, Datum arg) SpinLockRelease(&walsnd->mutex); } -/* walsender's openSegment callback for WALRead */ +/* XLogReaderRoutine->segment_open callback */ static int -WalSndSegmentOpen(XLogSegNo nextSegNo, WALSegmentContext *segcxt, +WalSndSegmentOpen(XLogReaderState *state, + XLogSegNo nextSegNo, WALSegmentContext *segcxt, TimeLineID *tli_p) { char path[MAXPGPATH]; @@ -2531,6 +2537,12 @@ XLogSendPhysical(void) Size nbytes; XLogSegNo segno; WALReadError errinfo; + static XLogReaderState fake_xlogreader = + { + /* Fake xlogreader state for WALRead */ + .routine.segment_open = WalSndSegmentOpen, + .routine.segment_close = wal_segment_close + }; /* If requested switch the WAL sender to the stopping state. */ if (got_STOPPING) @@ -2748,7 +2760,8 @@ XLogSendPhysical(void) enlargeStringInfo(&output_message, nbytes); retry: - if (!WALRead(&output_message.data[output_message.len], + if (!WALRead(&fake_xlogreader, + &output_message.data[output_message.len], startptr, nbytes, sendSeg->ws_tli, /* Pass the current TLI because only @@ -2756,7 +2769,6 @@ retry: * TLI is needed. */ sendSeg, sendCxt, - WalSndSegmentOpen, &errinfo)) WALReadRaiseError(&errinfo); |