aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/logicalfuncs.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/logicalfuncs.c')
-rw-r--r--src/backend/replication/logical/logicalfuncs.c18
1 files changed, 17 insertions, 1 deletions
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index f789fc127d0..3853ab4cf5f 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -115,7 +115,7 @@ logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
{
return read_local_xlog_page(state, targetPagePtr, reqLen,
- targetRecPtr, cur_page, pageTLI);
+ targetRecPtr, cur_page, pageTLI);
}
/*
@@ -241,6 +241,10 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
PG_TRY();
{
+ /*
+ * Passing InvalidXLogRecPtr here causes replay to start at the slot's
+ * confirmed_flush.
+ */
ctx = CreateDecodingContext(InvalidXLogRecPtr,
options,
logical_read_local_xlog_page,
@@ -263,6 +267,14 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
ctx->output_writer_private = p;
+ /*
+ * We start reading xlog from the restart lsn, even though in
+ * CreateDecodingContext we set the snapshot builder up using the
+ * slot's confirmed_flush. This means we might read xlog we don't
+ * actually decode rows from, but the snapshot builder might need it
+ * to get to a consistent point. The point we start returning data to
+ * *users* at is the candidate restart lsn from the decoding context.
+ */
startptr = MyReplicationSlot->data.restart_lsn;
CurrentResourceOwner = ResourceOwnerCreate(CurrentResourceOwner, "logical decoding");
@@ -280,6 +292,10 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
if (errm)
elog(ERROR, "%s", errm);
+ /*
+ * Now that we've set up the xlog reader state, subsequent calls
+ * pass InvalidXLogRecPtr to say "continue from last record"
+ */
startptr = InvalidXLogRecPtr;
/*