aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication')
-rw-r--r--src/backend/replication/logical/logical.c26
-rw-r--r--src/backend/replication/logical/logicalfuncs.c13
-rw-r--r--src/backend/replication/slotfuncs.c18
-rw-r--r--src/backend/replication/walsender.c42
4 files changed, 62 insertions, 37 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 2f6803637bf..4f6e87f18d3 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -148,7 +148,8 @@ StartupDecodingContext(List *output_plugin_options,
TransactionId xmin_horizon,
bool need_full_snapshot,
bool fast_forward,
- XLogReaderRoutine *xl_routine,
+ LogicalDecodingXLogPageReadCB page_read,
+ WALSegmentCleanupCB cleanup_cb,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress)
@@ -198,11 +199,12 @@ StartupDecodingContext(List *output_plugin_options,
ctx->slot = slot;
- ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, xl_routine, ctx);
+ ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, cleanup_cb);
if (!ctx->reader)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of memory")));
+ ctx->page_read = page_read;
ctx->reorder = ReorderBufferAllocate();
ctx->snapshot_builder =
@@ -319,7 +321,8 @@ CreateInitDecodingContext(const char *plugin,
List *output_plugin_options,
bool need_full_snapshot,
XLogRecPtr restart_lsn,
- XLogReaderRoutine *xl_routine,
+ LogicalDecodingXLogPageReadCB page_read,
+ WALSegmentCleanupCB cleanup_cb,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress)
@@ -422,7 +425,7 @@ CreateInitDecodingContext(const char *plugin,
ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon,
need_full_snapshot, false,
- xl_routine, prepare_write, do_write,
+ page_read, cleanup_cb, prepare_write, do_write,
update_progress);
/* call output plugin initialization callback */
@@ -476,7 +479,8 @@ LogicalDecodingContext *
CreateDecodingContext(XLogRecPtr start_lsn,
List *output_plugin_options,
bool fast_forward,
- XLogReaderRoutine *xl_routine,
+ LogicalDecodingXLogPageReadCB page_read,
+ WALSegmentCleanupCB cleanup_cb,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress)
@@ -528,8 +532,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
ctx = StartupDecodingContext(output_plugin_options,
start_lsn, InvalidTransactionId, false,
- fast_forward, xl_routine, prepare_write,
- do_write, update_progress);
+ fast_forward, page_read, cleanup_cb,
+ prepare_write, do_write, update_progress);
/* call output plugin initialization callback */
old_context = MemoryContextSwitchTo(ctx->context);
@@ -585,7 +589,13 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
char *err = NULL;
/* the read_page callback waits for new WAL */
- record = XLogReadRecord(ctx->reader, &err);
+ while (XLogReadRecord(ctx->reader, &record, &err) ==
+ XLREAD_NEED_DATA)
+ {
+ if (!ctx->page_read(ctx->reader))
+ break;
+ }
+
if (err)
elog(ERROR, "%s", err);
if (!record)
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 01d354829b9..8f8c129620f 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -233,9 +233,8 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
ctx = CreateDecodingContext(InvalidXLogRecPtr,
options,
false,
- XL_ROUTINE(.page_read = read_local_xlog_page,
- .segment_open = wal_segment_open,
- .segment_close = wal_segment_close),
+ read_local_xlog_page,
+ wal_segment_close,
LogicalOutputPrepareWrite,
LogicalOutputWrite, NULL);
@@ -284,7 +283,13 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
XLogRecord *record;
char *errm = NULL;
- record = XLogReadRecord(ctx->reader, &errm);
+ while (XLogReadRecord(ctx->reader, &record, &errm) ==
+ XLREAD_NEED_DATA)
+ {
+ if (!ctx->page_read(ctx->reader))
+ break;
+ }
+
if (errm)
elog(ERROR, "%s", errm);
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index d9d36879ed7..7ab0b804e4c 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -153,9 +153,8 @@ create_logical_replication_slot(char *name, char *plugin,
ctx = CreateInitDecodingContext(plugin, NIL,
false, /* just catalogs is OK */
restart_lsn,
- XL_ROUTINE(.page_read = read_local_xlog_page,
- .segment_open = wal_segment_open,
- .segment_close = wal_segment_close),
+ read_local_xlog_page,
+ wal_segment_close,
NULL, NULL, NULL);
/*
@@ -512,9 +511,8 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
ctx = CreateDecodingContext(InvalidXLogRecPtr,
NIL,
true, /* fast_forward */
- XL_ROUTINE(.page_read = read_local_xlog_page,
- .segment_open = wal_segment_open,
- .segment_close = wal_segment_close),
+ read_local_xlog_page,
+ wal_segment_close,
NULL, NULL, NULL);
/*
@@ -536,7 +534,13 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
* Read records. No changes are generated in fast_forward mode,
* but snapbuilder/slot statuses are updated properly.
*/
- record = XLogReadRecord(ctx->reader, &errm);
+ while (XLogReadRecord(ctx->reader, &record, &errm) ==
+ XLREAD_NEED_DATA)
+ {
+ if (!ctx->page_read(ctx->reader))
+ break;
+ }
+
if (errm)
elog(ERROR, "%s", errm);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 4bf8a18e01e..52fe9aba660 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -580,10 +580,7 @@ StartReplication(StartReplicationCmd *cmd)
/* create xlogreader for physical replication */
xlogreader =
- XLogReaderAllocate(wal_segment_size, NULL,
- XL_ROUTINE(.segment_open = WalSndSegmentOpen,
- .segment_close = wal_segment_close),
- NULL);
+ XLogReaderAllocate(wal_segment_size, NULL, wal_segment_close);
if (!xlogreader)
ereport(ERROR,
@@ -806,10 +803,12 @@ StartReplication(StartReplicationCmd *cmd)
* which has to do a plain sleep/busy loop, because the walsender's latch gets
* set every time WAL is flushed.
*/
-static int
-logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
- XLogRecPtr targetRecPtr, char *cur_page)
+static bool
+logical_read_xlog_page(XLogReaderState *state)
{
+ XLogRecPtr targetPagePtr = state->readPagePtr;
+ int reqLen = state->reqLen;
+ char *cur_page = state->readBuf;
XLogRecPtr flushptr;
int count;
WALReadError errinfo;
@@ -826,7 +825,10 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
/* fail if not (implies we are going to shut down) */
if (flushptr < targetPagePtr + reqLen)
- return -1;
+ {
+ XLogReaderSetInputData(state, -1);
+ return false;
+ }
if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
count = XLOG_BLCKSZ; /* more than one block available */
@@ -834,7 +836,7 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
count = flushptr - targetPagePtr; /* part of the page available */
/* now actually read the data, we know it's there */
- if (!WALRead(state,
+ if (!WALRead(state, WalSndSegmentOpen, wal_segment_close,
cur_page,
targetPagePtr,
XLOG_BLCKSZ,
@@ -854,7 +856,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize);
CheckXLogRemoved(segno, state->seg.ws_tli);
- return count;
+ XLogReaderSetInputData(state, count);
+ return true;
}
/*
@@ -1007,9 +1010,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
InvalidXLogRecPtr,
- XL_ROUTINE(.page_read = logical_read_xlog_page,
- .segment_open = WalSndSegmentOpen,
- .segment_close = wal_segment_close),
+ logical_read_xlog_page,
+ wal_segment_close,
WalSndPrepareWrite, WalSndWriteData,
WalSndUpdateProgress);
@@ -1167,9 +1169,8 @@ StartLogicalReplication(StartReplicationCmd *cmd)
*/
logical_decoding_ctx =
CreateDecodingContext(cmd->startpoint, cmd->options, false,
- XL_ROUTINE(.page_read = logical_read_xlog_page,
- .segment_open = WalSndSegmentOpen,
- .segment_close = wal_segment_close),
+ logical_read_xlog_page,
+ wal_segment_close,
WalSndPrepareWrite, WalSndWriteData,
WalSndUpdateProgress);
xlogreader = logical_decoding_ctx->reader;
@@ -2745,7 +2746,7 @@ XLogSendPhysical(void)
enlargeStringInfo(&output_message, nbytes);
retry:
- if (!WALRead(xlogreader,
+ if (!WALRead(xlogreader, WalSndSegmentOpen, wal_segment_close,
&output_message.data[output_message.len],
startptr,
nbytes,
@@ -2843,7 +2844,12 @@ XLogSendLogical(void)
*/
WalSndCaughtUp = false;
- record = XLogReadRecord(logical_decoding_ctx->reader, &errm);
+ while (XLogReadRecord(logical_decoding_ctx->reader, &record, &errm) ==
+ XLREAD_NEED_DATA)
+ {
+ if (!logical_decoding_ctx->page_read(logical_decoding_ctx->reader))
+ break;
+ }
/* xlog record was invalid */
if (errm != NULL)