diff options
Diffstat (limited to 'src/backend/replication')
-rw-r--r-- | src/backend/replication/logical/logicalfuncs.c | 8 | ||||
-rw-r--r-- | src/backend/replication/walsender.c | 11 |
2 files changed, 13 insertions, 6 deletions
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 41c50005d7f..c251b92f57b 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -235,11 +235,14 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin rsinfo->setResult = p->tupstore; rsinfo->setDesc = p->tupdesc; - /* compute the current end-of-wal */ + /* + * Compute the current end-of-wal and maintain ThisTimeLineID. + * RecoveryInProgress() will update ThisTimeLineID on promotion. + */ if (!RecoveryInProgress()) end_of_wal = GetFlushRecPtr(); else - end_of_wal = GetXLogReplayRecPtr(NULL); + end_of_wal = GetXLogReplayRecPtr(&ThisTimeLineID); ReplicationSlotAcquire(NameStr(*name)); @@ -280,6 +283,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin /* invalidate non-timetravel entries */ InvalidateSystemCaches(); + /* Decode until we run out of records */ while ((startptr != InvalidXLogRecPtr && startptr < end_of_wal) || (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < end_of_wal)) { diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 0f6b828336f..75617709ecf 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -48,6 +48,7 @@ #include "access/transam.h" #include "access/xact.h" #include "access/xlog_internal.h" +#include "access/xlogutils.h" #include "catalog/pg_type.h" #include "commands/dbcommands.h" @@ -721,6 +722,12 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req XLogRecPtr flushptr; int count; + XLogReadDetermineTimeline(state, targetPagePtr, reqLen); + sendTimeLineIsHistoric = (state->currTLI != ThisTimeLineID); + sendTimeLine = state->currTLI; + sendTimeLineValidUpto = state->currTLIValidUntil; + sendTimeLineNextTLI = state->nextTLI; + /* make sure we have enough WAL available */ flushptr = WalSndWaitForWal(targetPagePtr + reqLen); @@ -974,10 +981,6 @@ StartLogicalReplication(StartReplicationCmd *cmd) pq_endmessage(&buf); pq_flush(); - /* setup state for XLogReadPage */ - sendTimeLineIsHistoric = false; - sendTimeLine = ThisTimeLineID; - /* * Initialize position to the last ack'ed one, then the xlog records begin * to be shipped from that position. |