diff options
author | Thomas Munro <tmunro@postgresql.org> | 2021-04-08 23:03:23 +1200 |
---|---|---|
committer | Thomas Munro <tmunro@postgresql.org> | 2021-04-08 23:20:42 +1200 |
commit | 323cbe7c7ddcf18aaf24b7f6d682a45a61d4e31b (patch) | |
tree | 5290af3834511b9bd1773841b1068e485ba52fe6 /src/backend/replication | |
parent | 5ac9c4307337313bedeafc21dbbab93ba809241c (diff) | |
download | postgresql-323cbe7c7ddcf18aaf24b7f6d682a45a61d4e31b.tar.gz postgresql-323cbe7c7ddcf18aaf24b7f6d682a45a61d4e31b.zip |
Remove read_page callback from XLogReader.
Previously, the XLogReader module would fetch new input data using a
callback function. Redesign the interface so that it tells the caller
to insert more data with a special return value instead. This API suits
later patches for prefetching, encryption and maybe other future
projects that would otherwise require continually extending the callback
interface.
As incidental cleanup work, move global variables readOff, readLen and
readSegNo inside XlogReaderState.
Author: Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp>
Author: Heikki Linnakangas <hlinnaka@iki.fi> (parts of earlier version)
Reviewed-by: Antonin Houska <ah@cybertec.at>
Reviewed-by: Alvaro Herrera <alvherre@2ndquadrant.com>
Reviewed-by: Takashi Menjo <takashi.menjo@gmail.com>
Reviewed-by: Andres Freund <andres@anarazel.de>
Reviewed-by: Thomas Munro <thomas.munro@gmail.com>
Discussion: https://postgr.es/m/20190418.210257.43726183.horiguchi.kyotaro%40lab.ntt.co.jp
Diffstat (limited to 'src/backend/replication')
-rw-r--r-- | src/backend/replication/logical/logical.c | 26 | ||||
-rw-r--r-- | src/backend/replication/logical/logicalfuncs.c | 13 | ||||
-rw-r--r-- | src/backend/replication/slotfuncs.c | 18 | ||||
-rw-r--r-- | src/backend/replication/walsender.c | 42 |
4 files changed, 62 insertions, 37 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 2f6803637bf..4f6e87f18d3 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -148,7 +148,8 @@ StartupDecodingContext(List *output_plugin_options, TransactionId xmin_horizon, bool need_full_snapshot, bool fast_forward, - XLogReaderRoutine *xl_routine, + LogicalDecodingXLogPageReadCB page_read, + WALSegmentCleanupCB cleanup_cb, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress) @@ -198,11 +199,12 @@ StartupDecodingContext(List *output_plugin_options, ctx->slot = slot; - ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, xl_routine, ctx); + ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, cleanup_cb); if (!ctx->reader) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of memory"))); + ctx->page_read = page_read; ctx->reorder = ReorderBufferAllocate(); ctx->snapshot_builder = @@ -319,7 +321,8 @@ CreateInitDecodingContext(const char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, - XLogReaderRoutine *xl_routine, + LogicalDecodingXLogPageReadCB page_read, + WALSegmentCleanupCB cleanup_cb, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress) @@ -422,7 +425,7 @@ CreateInitDecodingContext(const char *plugin, ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon, need_full_snapshot, false, - xl_routine, prepare_write, do_write, + page_read, cleanup_cb, prepare_write, do_write, update_progress); /* call output plugin initialization callback */ @@ -476,7 +479,8 @@ LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, - XLogReaderRoutine *xl_routine, + LogicalDecodingXLogPageReadCB page_read, + WALSegmentCleanupCB cleanup_cb, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress) @@ -528,8 +532,8 @@ CreateDecodingContext(XLogRecPtr start_lsn, ctx = StartupDecodingContext(output_plugin_options, start_lsn, InvalidTransactionId, false, - fast_forward, xl_routine, prepare_write, - do_write, update_progress); + fast_forward, page_read, cleanup_cb, + prepare_write, do_write, update_progress); /* call output plugin initialization callback */ old_context = MemoryContextSwitchTo(ctx->context); @@ -585,7 +589,13 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx) char *err = NULL; /* the read_page callback waits for new WAL */ - record = XLogReadRecord(ctx->reader, &err); + while (XLogReadRecord(ctx->reader, &record, &err) == + XLREAD_NEED_DATA) + { + if (!ctx->page_read(ctx->reader)) + break; + } + if (err) elog(ERROR, "%s", err); if (!record) diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 01d354829b9..8f8c129620f 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -233,9 +233,8 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin ctx = CreateDecodingContext(InvalidXLogRecPtr, options, false, - XL_ROUTINE(.page_read = read_local_xlog_page, - .segment_open = wal_segment_open, - .segment_close = wal_segment_close), + read_local_xlog_page, + wal_segment_close, LogicalOutputPrepareWrite, LogicalOutputWrite, NULL); @@ -284,7 +283,13 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin XLogRecord *record; char *errm = NULL; - record = XLogReadRecord(ctx->reader, &errm); + while (XLogReadRecord(ctx->reader, &record, &errm) == + XLREAD_NEED_DATA) + { + if (!ctx->page_read(ctx->reader)) + break; + } + if (errm) elog(ERROR, "%s", errm); diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index d9d36879ed7..7ab0b804e4c 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -153,9 +153,8 @@ create_logical_replication_slot(char *name, char *plugin, ctx = CreateInitDecodingContext(plugin, NIL, false, /* just catalogs is OK */ restart_lsn, - XL_ROUTINE(.page_read = read_local_xlog_page, - .segment_open = wal_segment_open, - .segment_close = wal_segment_close), + read_local_xlog_page, + wal_segment_close, NULL, NULL, NULL); /* @@ -512,9 +511,8 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto) ctx = CreateDecodingContext(InvalidXLogRecPtr, NIL, true, /* fast_forward */ - XL_ROUTINE(.page_read = read_local_xlog_page, - .segment_open = wal_segment_open, - .segment_close = wal_segment_close), + read_local_xlog_page, + wal_segment_close, NULL, NULL, NULL); /* @@ -536,7 +534,13 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto) * Read records. No changes are generated in fast_forward mode, * but snapbuilder/slot statuses are updated properly. */ - record = XLogReadRecord(ctx->reader, &errm); + while (XLogReadRecord(ctx->reader, &record, &errm) == + XLREAD_NEED_DATA) + { + if (!ctx->page_read(ctx->reader)) + break; + } + if (errm) elog(ERROR, "%s", errm); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 4bf8a18e01e..52fe9aba660 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -580,10 +580,7 @@ StartReplication(StartReplicationCmd *cmd) /* create xlogreader for physical replication */ xlogreader = - XLogReaderAllocate(wal_segment_size, NULL, - XL_ROUTINE(.segment_open = WalSndSegmentOpen, - .segment_close = wal_segment_close), - NULL); + XLogReaderAllocate(wal_segment_size, NULL, wal_segment_close); if (!xlogreader) ereport(ERROR, @@ -806,10 +803,12 @@ StartReplication(StartReplicationCmd *cmd) * which has to do a plain sleep/busy loop, because the walsender's latch gets * set every time WAL is flushed. */ -static int -logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, - XLogRecPtr targetRecPtr, char *cur_page) +static bool +logical_read_xlog_page(XLogReaderState *state) { + XLogRecPtr targetPagePtr = state->readPagePtr; + int reqLen = state->reqLen; + char *cur_page = state->readBuf; XLogRecPtr flushptr; int count; WALReadError errinfo; @@ -826,7 +825,10 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req /* fail if not (implies we are going to shut down) */ if (flushptr < targetPagePtr + reqLen) - return -1; + { + XLogReaderSetInputData(state, -1); + return false; + } if (targetPagePtr + XLOG_BLCKSZ <= flushptr) count = XLOG_BLCKSZ; /* more than one block available */ @@ -834,7 +836,7 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req count = flushptr - targetPagePtr; /* part of the page available */ /* now actually read the data, we know it's there */ - if (!WALRead(state, + if (!WALRead(state, WalSndSegmentOpen, wal_segment_close, cur_page, targetPagePtr, XLOG_BLCKSZ, @@ -854,7 +856,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize); CheckXLogRemoved(segno, state->seg.ws_tli); - return count; + XLogReaderSetInputData(state, count); + return true; } /* @@ -1007,9 +1010,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot, InvalidXLogRecPtr, - XL_ROUTINE(.page_read = logical_read_xlog_page, - .segment_open = WalSndSegmentOpen, - .segment_close = wal_segment_close), + logical_read_xlog_page, + wal_segment_close, WalSndPrepareWrite, WalSndWriteData, WalSndUpdateProgress); @@ -1167,9 +1169,8 @@ StartLogicalReplication(StartReplicationCmd *cmd) */ logical_decoding_ctx = CreateDecodingContext(cmd->startpoint, cmd->options, false, - XL_ROUTINE(.page_read = logical_read_xlog_page, - .segment_open = WalSndSegmentOpen, - .segment_close = wal_segment_close), + logical_read_xlog_page, + wal_segment_close, WalSndPrepareWrite, WalSndWriteData, WalSndUpdateProgress); xlogreader = logical_decoding_ctx->reader; @@ -2745,7 +2746,7 @@ XLogSendPhysical(void) enlargeStringInfo(&output_message, nbytes); retry: - if (!WALRead(xlogreader, + if (!WALRead(xlogreader, WalSndSegmentOpen, wal_segment_close, &output_message.data[output_message.len], startptr, nbytes, @@ -2843,7 +2844,12 @@ XLogSendLogical(void) */ WalSndCaughtUp = false; - record = XLogReadRecord(logical_decoding_ctx->reader, &errm); + while (XLogReadRecord(logical_decoding_ctx->reader, &record, &errm) == + XLREAD_NEED_DATA) + { + if (!logical_decoding_ctx->page_read(logical_decoding_ctx->reader)) + break; + } /* xlog record was invalid */ if (errm != NULL) |