aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/logicalfuncs.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/logicalfuncs.c')
-rw-r--r--src/backend/replication/logical/logicalfuncs.c17
1 files changed, 9 insertions, 8 deletions
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 3853ab4cf5f..dd6cd62ccd1 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -231,12 +231,6 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
rsinfo->setResult = p->tupstore;
rsinfo->setDesc = p->tupdesc;
- /* compute the current end-of-wal */
- if (!RecoveryInProgress())
- end_of_wal = GetFlushRecPtr();
- else
- end_of_wal = GetXLogReplayRecPtr(NULL);
-
ReplicationSlotAcquire(NameStr(*name));
PG_TRY();
@@ -273,7 +267,8 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
* slot's confirmed_flush. This means we might read xlog we don't
* actually decode rows from, but the snapshot builder might need it
* to get to a consistent point. The point we start returning data to
- * *users* at is the candidate restart lsn from the decoding context.
+ * *users* at is the confirmed_flush lsn set up in the decoding
+ * context.
*/
startptr = MyReplicationSlot->data.restart_lsn;
@@ -282,8 +277,14 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
/* invalidate non-timetravel entries */
InvalidateSystemCaches();
+ if (!RecoveryInProgress())
+ end_of_wal = GetFlushRecPtr();
+ else
+ end_of_wal = GetXLogReplayRecPtr(NULL);
+
+ /* Decode until we run out of records */
while ((startptr != InvalidXLogRecPtr && startptr < end_of_wal) ||
- (ctx->reader->EndRecPtr && ctx->reader->EndRecPtr < end_of_wal))
+ (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < end_of_wal))
{
XLogRecord *record;
char *errm = NULL;