aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication')
-rw-r--r--src/backend/replication/logical/logicalfuncs.c8
-rw-r--r--src/backend/replication/walsender.c11
2 files changed, 13 insertions, 6 deletions
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 41c50005d7f..c251b92f57b 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -235,11 +235,14 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
rsinfo->setResult = p->tupstore;
rsinfo->setDesc = p->tupdesc;
- /* compute the current end-of-wal */
+ /*
+ * Compute the current end-of-wal and maintain ThisTimeLineID.
+ * RecoveryInProgress() will update ThisTimeLineID on promotion.
+ */
if (!RecoveryInProgress())
end_of_wal = GetFlushRecPtr();
else
- end_of_wal = GetXLogReplayRecPtr(NULL);
+ end_of_wal = GetXLogReplayRecPtr(&ThisTimeLineID);
ReplicationSlotAcquire(NameStr(*name));
@@ -280,6 +283,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
/* invalidate non-timetravel entries */
InvalidateSystemCaches();
+ /* Decode until we run out of records */
while ((startptr != InvalidXLogRecPtr && startptr < end_of_wal) ||
(ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < end_of_wal))
{
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 0f6b828336f..75617709ecf 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -48,6 +48,7 @@
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
+#include "access/xlogutils.h"
#include "catalog/pg_type.h"
#include "commands/dbcommands.h"
@@ -721,6 +722,12 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
XLogRecPtr flushptr;
int count;
+ XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
+ sendTimeLineIsHistoric = (state->currTLI != ThisTimeLineID);
+ sendTimeLine = state->currTLI;
+ sendTimeLineValidUpto = state->currTLIValidUntil;
+ sendTimeLineNextTLI = state->nextTLI;
+
/* make sure we have enough WAL available */
flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
@@ -974,10 +981,6 @@ StartLogicalReplication(StartReplicationCmd *cmd)
pq_endmessage(&buf);
pq_flush();
- /* setup state for XLogReadPage */
- sendTimeLineIsHistoric = false;
- sendTimeLine = ThisTimeLineID;
-
/*
* Initialize position to the last ack'ed one, then the xlog records begin
* to be shipped from that position.