aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlvaro Herrera <alvherre@alvh.no-ip.org>2016-03-30 20:07:05 -0300
committerAlvaro Herrera <alvherre@alvh.no-ip.org>2016-03-30 20:07:05 -0300
commit24c5f1a103ce6656a5cb430d9a996c34e61ab2a5 (patch)
treec55425aba03671efba7b59497f9fad55d021411a
parent3b02ea4f0780ccce7dc116010201dad7ee50a401 (diff)
downloadpostgresql-24c5f1a103ce6656a5cb430d9a996c34e61ab2a5.tar.gz
postgresql-24c5f1a103ce6656a5cb430d9a996c34e61ab2a5.zip
Enable logical slots to follow timeline switches
When decoding from a logical slot, it's necessary for xlog reading to be able to read xlog from historical (i.e. not current) timelines; otherwise, decoding fails after failover, because the archives are in the historical timeline. This is required to make "failover logical slots" possible; it currently has no other use, although theoretically it could be used by an extension that creates a slot on a standby and continues to replay from the slot when the standby is promoted. This commit includes a module in src/test/modules with functions to manipulate the slots (which is not otherwise possible in SQL code) in order to enable testing, and a new test in src/test/recovery to ensure that the behavior is as expected. Author: Craig Ringer Reviewed-By: Oleksii Kliukin, Andres Freund, Petr JelĂ­nek
-rw-r--r--src/backend/access/transam/xlogreader.c9
-rw-r--r--src/backend/access/transam/xlogutils.c244
-rw-r--r--src/backend/replication/logical/logicalfuncs.c17
-rw-r--r--src/include/access/xlogreader.h18
-rw-r--r--src/test/modules/Makefile1
-rw-r--r--src/test/modules/test_slot_timelines/.gitignore3
-rw-r--r--src/test/modules/test_slot_timelines/Makefile22
-rw-r--r--src/test/modules/test_slot_timelines/README19
-rw-r--r--src/test/modules/test_slot_timelines/expected/load_extension.out19
-rw-r--r--src/test/modules/test_slot_timelines/sql/load_extension.sql7
-rw-r--r--src/test/modules/test_slot_timelines/test_slot_timelines--1.0.sql16
-rw-r--r--src/test/modules/test_slot_timelines/test_slot_timelines.c133
-rw-r--r--src/test/modules/test_slot_timelines/test_slot_timelines.conf2
-rw-r--r--src/test/modules/test_slot_timelines/test_slot_timelines.control5
-rw-r--r--src/test/recovery/Makefile2
-rw-r--r--src/test/recovery/t/006_logical_decoding_timelines.pl304
16 files changed, 790 insertions, 31 deletions
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 018fdf3d34e..b67da2ee925 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -118,6 +118,11 @@ XLogReaderAllocate(XLogPageReadCB pagereadfunc, void *private_data)
return NULL;
}
+#ifndef FRONTEND
+ /* Will be loaded on first read */
+ state->timelineHistory = NIL;
+#endif
+
return state;
}
@@ -137,6 +142,10 @@ XLogReaderFree(XLogReaderState *state)
pfree(state->errormsg_buf);
if (state->readRecordBuf)
pfree(state->readRecordBuf);
+#ifndef FRONTEND
+ if (state->timelineHistory)
+ list_free_deep(state->timelineHistory);
+#endif
pfree(state->readBuf);
pfree(state);
}
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 2635d80dc0c..f6ca2b95e51 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -19,6 +19,7 @@
#include <unistd.h>
+#include "access/timeline.h"
#include "access/xlog.h"
#include "access/xlog_internal.h"
#include "access/xlogutils.h"
@@ -659,6 +660,7 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
/* state maintained across calls */
static int sendFile = -1;
static XLogSegNo sendSegNo = 0;
+ static TimeLineID sendTLI = 0;
static uint32 sendOff = 0;
p = buf;
@@ -674,7 +676,8 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
startoff = recptr % XLogSegSize;
/* Do we need to switch to a different xlog segment? */
- if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
+ if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo) ||
+ sendTLI != tli)
{
char path[MAXPGPATH];
@@ -701,6 +704,7 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
path)));
}
sendOff = 0;
+ sendTLI = tli;
}
/* Need to seek in the file? */
@@ -749,6 +753,147 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
}
/*
+ * Determine XLogReaderState->currTLI and ->currTLIValidUntil;
+ * XLogReaderState->EndRecPtr, ->currRecPtr and ThisTimeLineID affect the
+ * decision. This may later be used to determine which xlog segment file to
+ * open, etc.
+ *
+ * We switch to an xlog segment from the new timeline eagerly when on a
+ * historical timeline, as soon as we reach the start of the xlog segment
+ * containing the timeline switch. The server copied the segment to the new
+ * timeline so all the data up to the switch point is the same, but there's no
+ * guarantee the old segment will still exist. It may have been deleted or
+ * renamed with a .partial suffix so we can't necessarily keep reading from
+ * the old TLI even though tliSwitchPoint says it's OK.
+ *
+ * Because of this, callers MAY NOT assume that currTLI is the timeline that
+ * will be in a page's xlp_tli; the page may begin on an older timeline or we
+ * might be reading from historical timeline data on a segment that's been
+ * copied to a new timeline.
+ */
+static void
+XLogReadDetermineTimeline(XLogReaderState *state)
+{
+ /* Read the history on first time through */
+ if (state->timelineHistory == NIL)
+ state->timelineHistory = readTimeLineHistory(ThisTimeLineID);
+
+ /*
+ * Are we reading the record immediately following the one we read last
+ * time? If not, then don't use the cached timeline info.
+ */
+ if (state->currRecPtr != state->EndRecPtr)
+ {
+ state->currTLI = 0;
+ state->currTLIValidUntil = InvalidXLogRecPtr;
+ }
+
+ /*
+ * Are we reading a timeline that used to be the latest one, but became
+ * historical? This can happen in a replica that gets promoted, and in a
+ * cascading replica whose upstream gets promoted. In either case,
+ * re-read the timeline history data. We cannot read past the timeline
+ * switch point, because either the records in the old timeline might be
+ * invalid, or worse, they may valid but *different* from the ones we
+ * should be reading.
+ */
+ if (state->currTLIValidUntil == InvalidXLogRecPtr &&
+ state->currTLI != ThisTimeLineID &&
+ state->currTLI != 0)
+ {
+ /* re-read timeline history */
+ list_free_deep(state->timelineHistory);
+ state->timelineHistory = readTimeLineHistory(ThisTimeLineID);
+
+ elog(DEBUG2, "timeline %u became historical during decoding",
+ state->currTLI);
+
+ /* then invalidate the cached timeline info */
+ state->currTLI = 0;
+ state->currTLIValidUntil = InvalidXLogRecPtr;
+ }
+
+ /*
+ * Are we reading a record immediately following a timeline switch? If
+ * so, we must follow the switch too.
+ */
+ if (state->currRecPtr == state->EndRecPtr &&
+ state->currTLI != 0 &&
+ state->currTLIValidUntil != InvalidXLogRecPtr &&
+ state->currRecPtr >= state->currTLIValidUntil)
+ {
+ elog(DEBUG2,
+ "requested record %X/%X is on segment containing end of timeline %u valid until %X/%X, switching to next timeline",
+ (uint32) (state->currRecPtr >> 32),
+ (uint32) state->currRecPtr,
+ state->currTLI,
+ (uint32) (state->currTLIValidUntil >> 32),
+ (uint32) (state->currTLIValidUntil));
+
+ /* invalidate TLI info so we look up the next TLI */
+ state->currTLI = 0;
+ state->currTLIValidUntil = InvalidXLogRecPtr;
+ }
+
+ if (state->currTLI == 0)
+ {
+ /*
+ * Something changed; work out what timeline this record is on. We
+ * might read it from the segment on this TLI or, if the segment is
+ * also contained by newer timelines, the copy from a newer TLI.
+ */
+ state->currTLI = tliOfPointInHistory(state->currRecPtr,
+ state->timelineHistory);
+
+ /*
+ * Look for the most recent timeline that's on the same xlog segment
+ * as this record, since that's the only one we can assume is still
+ * readable.
+ */
+ while (state->currTLI != ThisTimeLineID &&
+ state->currTLIValidUntil == InvalidXLogRecPtr)
+ {
+ XLogRecPtr tliSwitch;
+ TimeLineID nextTLI;
+
+ CHECK_FOR_INTERRUPTS();
+
+ tliSwitch = tliSwitchPoint(state->currTLI, state->timelineHistory,
+ &nextTLI);
+
+ /* round ValidUntil down to start of seg containing the switch */
+ state->currTLIValidUntil =
+ ((tliSwitch / XLogSegSize) * XLogSegSize);
+
+ if (state->currRecPtr >= state->currTLIValidUntil)
+ {
+ /*
+ * The new currTLI ends on this WAL segment so check the next
+ * TLI to see if it's the last one on the segment.
+ *
+ * If that's the current TLI we'll stop searching.
+ */
+ state->currTLI = nextTLI;
+ state->currTLIValidUntil = InvalidXLogRecPtr;
+ }
+ }
+
+ /*
+ * We're now either reading from the first xlog segment in the current
+ * server's timeline or the most recent historical timeline that
+ * exists on the target segment.
+ */
+ elog(DEBUG2, "XLog read ptr %X/%X is on segment with TLI %u valid until %X/%X, server current TLI is %u",
+ (uint32) (state->currRecPtr >> 32),
+ (uint32) state->currRecPtr,
+ state->currTLI,
+ (uint32) (state->currTLIValidUntil >> 32),
+ (uint32) (state->currTLIValidUntil),
+ ThisTimeLineID);
+ }
+}
+
+/*
* read_page callback for reading local xlog files
*
* Public because it would likely be very helpful for someone writing another
@@ -761,48 +906,101 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
*/
int
read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
- int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
+ int reqLen, XLogRecPtr targetRecPtr, char *cur_page,
+ TimeLineID *pageTLI)
{
- XLogRecPtr flushptr,
+ XLogRecPtr read_upto,
loc;
int count;
loc = targetPagePtr + reqLen;
+
+ /* Make sure enough xlog is available... */
while (1)
{
/*
- * TODO: we're going to have to do something more intelligent about
- * timelines on standbys. Use readTimeLineHistory() and
- * tliOfPointInHistory() to get the proper LSN? For now we'll catch
- * that case earlier, but the code and TODO is left in here for when
- * that changes.
+ * Check which timeline to get the record from.
+ *
+ * We have to do it each time through the loop because if we're in
+ * recovery as a cascading standby, the current timeline might've
+ * become historical.
*/
- if (!RecoveryInProgress())
+ XLogReadDetermineTimeline(state);
+
+ if (state->currTLI == ThisTimeLineID)
{
- *pageTLI = ThisTimeLineID;
- flushptr = GetFlushRecPtr();
+ /*
+ * We're reading from the current timeline so we might have to
+ * wait for the desired record to be generated (or, for a standby,
+ * received & replayed)
+ */
+ if (!RecoveryInProgress())
+ {
+ *pageTLI = ThisTimeLineID;
+ read_upto = GetFlushRecPtr();
+ }
+ else
+ read_upto = GetXLogReplayRecPtr(pageTLI);
+
+ if (loc <= read_upto)
+ break;
+
+ CHECK_FOR_INTERRUPTS();
+ pg_usleep(1000L);
}
else
- flushptr = GetXLogReplayRecPtr(pageTLI);
-
- if (loc <= flushptr)
+ {
+ /*
+ * We're on a historical timeline, so limit reading to the switch
+ * point where we moved to the next timeline.
+ *
+ * We don't need to GetFlushRecPtr or GetXLogReplayRecPtr. We know
+ * about the new timeline, so we must've received past the end of
+ * it.
+ */
+ read_upto = state->currTLIValidUntil;
+
+ /*
+ * Setting pageTLI to our wanted record's TLI is slightly wrong;
+ * the page might begin on an older timeline if it contains a
+ * timeline switch, since its xlog segment will have been copied
+ * from the prior timeline. This is pretty harmless though, as
+ * nothing cares so long as the timeline doesn't go backwards. We
+ * should read the page header instead; FIXME someday.
+ */
+ *pageTLI = state->currTLI;
+
+ /* No need to wait on a historical timeline */
break;
-
- CHECK_FOR_INTERRUPTS();
- pg_usleep(1000L);
+ }
}
- /* more than one block available */
- if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
+ if (targetPagePtr + XLOG_BLCKSZ <= read_upto)
+ {
+ /*
+ * more than one block available; read only that block, have caller
+ * come back if they need more.
+ */
count = XLOG_BLCKSZ;
- /* not enough data there */
- else if (targetPagePtr + reqLen > flushptr)
+ }
+ else if (targetPagePtr + reqLen > read_upto)
+ {
+ /* not enough data there */
return -1;
- /* part of the page available */
+ }
else
- count = flushptr - targetPagePtr;
+ {
+ /* enough bytes available to satisfy the request */
+ count = read_upto - targetPagePtr;
+ }
+ /*
+ * Even though we just determined how much of the page can be validly read
+ * as 'count', read the whole page anyway. It's guaranteed to be
+ * zero-padded up to the page boundary if it's incomplete.
+ */
XLogRead(cur_page, *pageTLI, targetPagePtr, XLOG_BLCKSZ);
+ /* number of valid bytes in the buffer */
return count;
}
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 3853ab4cf5f..dd6cd62ccd1 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -231,12 +231,6 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
rsinfo->setResult = p->tupstore;
rsinfo->setDesc = p->tupdesc;
- /* compute the current end-of-wal */
- if (!RecoveryInProgress())
- end_of_wal = GetFlushRecPtr();
- else
- end_of_wal = GetXLogReplayRecPtr(NULL);
-
ReplicationSlotAcquire(NameStr(*name));
PG_TRY();
@@ -273,7 +267,8 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
* slot's confirmed_flush. This means we might read xlog we don't
* actually decode rows from, but the snapshot builder might need it
* to get to a consistent point. The point we start returning data to
- * *users* at is the candidate restart lsn from the decoding context.
+ * *users* at is the confirmed_flush lsn set up in the decoding
+ * context.
*/
startptr = MyReplicationSlot->data.restart_lsn;
@@ -282,8 +277,14 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
/* invalidate non-timetravel entries */
InvalidateSystemCaches();
+ if (!RecoveryInProgress())
+ end_of_wal = GetFlushRecPtr();
+ else
+ end_of_wal = GetXLogReplayRecPtr(NULL);
+
+ /* Decode until we run out of records */
while ((startptr != InvalidXLogRecPtr && startptr < end_of_wal) ||
- (ctx->reader->EndRecPtr && ctx->reader->EndRecPtr < end_of_wal))
+ (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < end_of_wal))
{
XLogRecord *record;
char *errm = NULL;
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index deaa7f5128b..300747dbf76 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -27,6 +27,10 @@
#include "access/xlogrecord.h"
+#ifndef FRONTEND
+#include "nodes/pg_list.h"
+#endif
+
typedef struct XLogReaderState XLogReaderState;
/* Function type definition for the read_page callback */
@@ -160,11 +164,25 @@ struct XLogReaderState
/* beginning of the WAL record being read. */
XLogRecPtr currRecPtr;
+ /* timeline to read it from, 0 if a lookup is required */
+ TimeLineID currTLI;
+ /*
+ * Safe point to read to in currTLI. If currTLI is historical, then this
+ * is set to the end of the last whole segment that contains that TLI;
+ * if currTLI is ThisTimeLineID, this is InvalidXLogRecPtr. This is *not*
+ * the tliSwitchPoint.
+ */
+ XLogRecPtr currTLIValidUntil;
/* Buffer for current ReadRecord result (expandable) */
char *readRecordBuf;
uint32 readRecordBufSize;
+#ifndef FRONTEND
+ /* cached timeline history, only available in backend */
+ List *timelineHistory;
+#endif
+
/* Buffer to hold error message */
char *errormsg_buf;
};
diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index 6167ec13446..ebdcdc8c2ae 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -13,6 +13,7 @@ SUBDIRS = \
test_parser \
test_rls_hooks \
test_shm_mq \
+ test_slot_timelines \
worker_spi
all: submake-errcodes
diff --git a/src/test/modules/test_slot_timelines/.gitignore b/src/test/modules/test_slot_timelines/.gitignore
new file mode 100644
index 00000000000..543c50d1bbd
--- /dev/null
+++ b/src/test/modules/test_slot_timelines/.gitignore
@@ -0,0 +1,3 @@
+results/
+tmp_check/
+log/
diff --git a/src/test/modules/test_slot_timelines/Makefile b/src/test/modules/test_slot_timelines/Makefile
new file mode 100644
index 00000000000..21757c5bcb7
--- /dev/null
+++ b/src/test/modules/test_slot_timelines/Makefile
@@ -0,0 +1,22 @@
+# src/test/modules/test_slot_timelines/Makefile
+
+MODULES = test_slot_timelines
+PGFILEDESC = "test_slot_timelines - test utility for slot timeline following"
+
+EXTENSION = test_slot_timelines
+DATA = test_slot_timelines--1.0.sql
+
+EXTRA_INSTALL=contrib/test_decoding
+REGRESS=load_extension
+REGRESS_OPTS = --temp-config=$(top_srcdir)/src/test/modules/test_slot_timelines/test_slot_timelines.conf
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/test_slot_timelines
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/test_slot_timelines/README b/src/test/modules/test_slot_timelines/README
new file mode 100644
index 00000000000..585f02f1e6e
--- /dev/null
+++ b/src/test/modules/test_slot_timelines/README
@@ -0,0 +1,19 @@
+A test module for logical decoding failover and timeline following.
+
+This module provides a minimal way to maintain logical slots on replicas
+that mirror the state on the master. It doesn't make decoding possible,
+just tracking slot state so that a decoding client that's using the master
+can follow a physical failover to the standby. The master doesn't know
+about the slots on the standby, they're synced by a client that connects
+to both.
+
+This is intentionally not part of the test_decoding module because that's meant
+to serve as example code, where this module exercises internal server features
+by unsafely exposing internal state to SQL. It's not the right way to do
+failover, it's just a simple way to test it from the perl TAP framework to
+prove the feature works.
+
+In a practical implementation of this approach a bgworker on the master would
+monitor slot positions and relay them to a bgworker on the standby that applies
+the position updates without exposing slot internals to SQL. That's too complex
+for this test framework though.
diff --git a/src/test/modules/test_slot_timelines/expected/load_extension.out b/src/test/modules/test_slot_timelines/expected/load_extension.out
new file mode 100644
index 00000000000..14a414aa7ed
--- /dev/null
+++ b/src/test/modules/test_slot_timelines/expected/load_extension.out
@@ -0,0 +1,19 @@
+CREATE EXTENSION test_slot_timelines;
+SELECT test_slot_timelines_create_logical_slot('test_slot', 'test_decoding');
+ test_slot_timelines_create_logical_slot
+-----------------------------------------
+
+(1 row)
+
+SELECT test_slot_timelines_advance_logical_slot('test_slot', txid_current(), txid_current(), pg_current_xlog_location(), pg_current_xlog_location());
+ test_slot_timelines_advance_logical_slot
+------------------------------------------
+
+(1 row)
+
+SELECT pg_drop_replication_slot('test_slot');
+ pg_drop_replication_slot
+--------------------------
+
+(1 row)
+
diff --git a/src/test/modules/test_slot_timelines/sql/load_extension.sql b/src/test/modules/test_slot_timelines/sql/load_extension.sql
new file mode 100644
index 00000000000..a71127d4716
--- /dev/null
+++ b/src/test/modules/test_slot_timelines/sql/load_extension.sql
@@ -0,0 +1,7 @@
+CREATE EXTENSION test_slot_timelines;
+
+SELECT test_slot_timelines_create_logical_slot('test_slot', 'test_decoding');
+
+SELECT test_slot_timelines_advance_logical_slot('test_slot', txid_current(), txid_current(), pg_current_xlog_location(), pg_current_xlog_location());
+
+SELECT pg_drop_replication_slot('test_slot');
diff --git a/src/test/modules/test_slot_timelines/test_slot_timelines--1.0.sql b/src/test/modules/test_slot_timelines/test_slot_timelines--1.0.sql
new file mode 100644
index 00000000000..31d7f8ef1c1
--- /dev/null
+++ b/src/test/modules/test_slot_timelines/test_slot_timelines--1.0.sql
@@ -0,0 +1,16 @@
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION test_slot_timelines" to load this file. \quit
+
+CREATE OR REPLACE FUNCTION test_slot_timelines_create_logical_slot(slot_name text, plugin text)
+RETURNS void
+LANGUAGE c AS 'MODULE_PATHNAME';
+
+COMMENT ON FUNCTION test_slot_timelines_create_logical_slot(text, text)
+IS 'Create a logical slot at a particular lsn and xid. Do not use in production servers, it is not safe. The slot is created with an invalid xmin and lsn.';
+
+CREATE OR REPLACE FUNCTION test_slot_timelines_advance_logical_slot(slot_name text, new_xmin bigint, new_catalog_xmin bigint, new_restart_lsn pg_lsn, new_confirmed_lsn pg_lsn)
+RETURNS void
+LANGUAGE c AS 'MODULE_PATHNAME';
+
+COMMENT ON FUNCTION test_slot_timelines_advance_logical_slot(text, bigint, bigint, pg_lsn, pg_lsn)
+IS 'Advance a logical slot directly. Do not use this in production servers, it is not safe.';
diff --git a/src/test/modules/test_slot_timelines/test_slot_timelines.c b/src/test/modules/test_slot_timelines/test_slot_timelines.c
new file mode 100644
index 00000000000..74dd1a041b0
--- /dev/null
+++ b/src/test/modules/test_slot_timelines/test_slot_timelines.c
@@ -0,0 +1,133 @@
+/*--------------------------------------------------------------------------
+ *
+ * test_slot_timelines.c
+ * Test harness code for slot timeline following
+ *
+ * Copyright (c) 2016, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/test/modules/test_slot_timelines/test_slot_timelines.c
+ *
+ * -------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/transam.h"
+#include "fmgr.h"
+#include "miscadmin.h"
+#include "replication/slot.h"
+#include "utils/builtins.h"
+#include "utils/pg_lsn.h"
+
+PG_MODULE_MAGIC;
+
+PG_FUNCTION_INFO_V1(test_slot_timelines_create_logical_slot);
+PG_FUNCTION_INFO_V1(test_slot_timelines_advance_logical_slot);
+
+static void clear_slot_transient_state(void);
+
+/*
+ * Create a new logical slot, with invalid LSN and xid, directly. This does not
+ * use the snapshot builder or logical decoding machinery. It's only intended
+ * for creating a slot on a replica that mirrors the state of a slot on an
+ * upstream master.
+ *
+ * Note that this is test harness code. You shouldn't expose slot internals
+ * to SQL like this for any real world usage. See the README.
+ */
+Datum
+test_slot_timelines_create_logical_slot(PG_FUNCTION_ARGS)
+{
+ char *slotname = text_to_cstring(PG_GETARG_TEXT_P(0));
+ char *plugin = text_to_cstring(PG_GETARG_TEXT_P(1));
+
+ CheckSlotRequirements();
+
+ ReplicationSlotCreate(slotname, true, RS_PERSISTENT);
+
+ /* register the plugin name with the slot */
+ StrNCpy(NameStr(MyReplicationSlot->data.plugin), plugin, NAMEDATALEN);
+
+ /*
+ * Initialize persistent state to placeholders to be set by
+ * test_slot_timelines_advance_logical_slot .
+ */
+ MyReplicationSlot->data.xmin = InvalidTransactionId;
+ MyReplicationSlot->data.catalog_xmin = InvalidTransactionId;
+ MyReplicationSlot->data.restart_lsn = InvalidXLogRecPtr;
+ MyReplicationSlot->data.confirmed_flush = InvalidXLogRecPtr;
+
+ clear_slot_transient_state();
+
+ ReplicationSlotRelease();
+
+ PG_RETURN_VOID();
+}
+
+/*
+ * Set the state of a slot.
+ *
+ * This doesn't maintain the non-persistent state at all,
+ * but since the slot isn't in use that's OK.
+ *
+ * There's intentionally no check to prevent slots going backwards
+ * because they can actually go backwards if the master crashes when
+ * it hasn't yet flushed slot state to disk then we copy the older
+ * slot state after recovery.
+ *
+ * There's no checking done for xmin or catalog xmin either, since
+ * we can't really do anything useful that accounts for xid wrap-around.
+ *
+ * Note that this is test harness code. You shouldn't expose slot internals
+ * to SQL like this for any real world usage. See the README.
+ */
+Datum
+test_slot_timelines_advance_logical_slot(PG_FUNCTION_ARGS)
+{
+ char *slotname = text_to_cstring(PG_GETARG_TEXT_P(0));
+ TransactionId new_xmin = (TransactionId) PG_GETARG_INT64(1);
+ TransactionId new_catalog_xmin = (TransactionId) PG_GETARG_INT64(2);
+ XLogRecPtr restart_lsn = PG_GETARG_LSN(3);
+ XLogRecPtr confirmed_lsn = PG_GETARG_LSN(4);
+
+ CheckSlotRequirements();
+
+ ReplicationSlotAcquire(slotname);
+
+ if (MyReplicationSlot->data.database != MyDatabaseId)
+ elog(ERROR, "Trying to update a slot on a different database");
+
+ MyReplicationSlot->data.xmin = new_xmin;
+ MyReplicationSlot->data.catalog_xmin = new_catalog_xmin;
+ MyReplicationSlot->data.restart_lsn = restart_lsn;
+ MyReplicationSlot->data.confirmed_flush = confirmed_lsn;
+
+ clear_slot_transient_state();
+
+ ReplicationSlotMarkDirty();
+ ReplicationSlotSave();
+ ReplicationSlotRelease();
+
+ ReplicationSlotsComputeRequiredXmin(false);
+ ReplicationSlotsComputeRequiredLSN();
+
+ PG_RETURN_VOID();
+}
+
+static void
+clear_slot_transient_state(void)
+{
+ Assert(MyReplicationSlot != NULL);
+
+ /*
+ * Make sure the slot state is the same as if it were newly loaded from
+ * disk on recovery.
+ */
+ MyReplicationSlot->effective_xmin = MyReplicationSlot->data.xmin;
+ MyReplicationSlot->effective_catalog_xmin = MyReplicationSlot->data.catalog_xmin;
+
+ MyReplicationSlot->candidate_catalog_xmin = InvalidTransactionId;
+ MyReplicationSlot->candidate_xmin_lsn = InvalidXLogRecPtr;
+ MyReplicationSlot->candidate_restart_lsn = InvalidXLogRecPtr;
+ MyReplicationSlot->candidate_restart_valid = InvalidXLogRecPtr;
+}
diff --git a/src/test/modules/test_slot_timelines/test_slot_timelines.conf b/src/test/modules/test_slot_timelines/test_slot_timelines.conf
new file mode 100644
index 00000000000..56b46d7e275
--- /dev/null
+++ b/src/test/modules/test_slot_timelines/test_slot_timelines.conf
@@ -0,0 +1,2 @@
+max_replication_slots=2
+wal_level=logical
diff --git a/src/test/modules/test_slot_timelines/test_slot_timelines.control b/src/test/modules/test_slot_timelines/test_slot_timelines.control
new file mode 100644
index 00000000000..dcee1a72576
--- /dev/null
+++ b/src/test/modules/test_slot_timelines/test_slot_timelines.control
@@ -0,0 +1,5 @@
+# test_slot_timelines extension
+comment = 'Test utility for slot timeline following and logical decoding'
+default_version = '1.0'
+module_pathname = '$libdir/test_slot_timelines'
+relocatable = true
diff --git a/src/test/recovery/Makefile b/src/test/recovery/Makefile
index 929071909ad..78570dd1566 100644
--- a/src/test/recovery/Makefile
+++ b/src/test/recovery/Makefile
@@ -9,6 +9,8 @@
#
#-------------------------------------------------------------------------
+EXTRA_INSTALL=contrib/test_decoding src/test/modules/test_slot_timelines
+
subdir = src/test/recovery
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
diff --git a/src/test/recovery/t/006_logical_decoding_timelines.pl b/src/test/recovery/t/006_logical_decoding_timelines.pl
new file mode 100644
index 00000000000..bc20f405d78
--- /dev/null
+++ b/src/test/recovery/t/006_logical_decoding_timelines.pl
@@ -0,0 +1,304 @@
+# Demonstrate that logical can follow timeline switches.
+#
+# Logical replication slots can follow timeline switches but it's
+# normally not possible to have a logical slot on a replica where
+# promotion and a timeline switch can occur. The only ways
+# we can create that circumstance are:
+#
+# * By doing a filesystem-level copy of the DB, since pg_basebackup
+# excludes pg_replslot but we can copy it directly; or
+#
+# * by creating a slot directly at the C level on the replica and
+# advancing it as we go using the low level APIs. It can't be done
+# from SQL since logical decoding isn't allowed on replicas.
+#
+# This module uses the first approach to show that timeline following
+# on a logical slot works.
+#
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 20;
+use RecursiveCopy;
+use File::Copy;
+
+my ($stdout, $stderr, $ret);
+
+# Initialize master node
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1, has_archiving => 1);
+$node_master->append_conf('postgresql.conf', "wal_level = 'logical'\n");
+$node_master->append_conf('postgresql.conf', "max_replication_slots = 2\n");
+$node_master->append_conf('postgresql.conf', "max_wal_senders = 2\n");
+$node_master->append_conf('postgresql.conf', "log_min_messages = 'debug2'\n");
+$node_master->dump_info;
+$node_master->start;
+
+diag "Testing logical timeline following with a filesystem-level copy";
+
+$node_master->safe_psql('postgres',
+"SELECT pg_create_logical_replication_slot('before_basebackup', 'test_decoding');"
+);
+$node_master->safe_psql('postgres', "CREATE TABLE decoding(blah text);");
+$node_master->safe_psql('postgres',
+ "INSERT INTO decoding(blah) VALUES ('beforebb');");
+$node_master->safe_psql('postgres', 'CHECKPOINT;');
+
+my $backup_name = 'b1';
+$node_master->backup_fs_hot($backup_name);
+
+my $node_replica = get_new_node('replica');
+$node_replica->init_from_backup(
+ $node_master, $backup_name,
+ has_streaming => 1,
+ has_restoring => 1);
+$node_replica->start;
+
+$node_master->safe_psql('postgres',
+"SELECT pg_create_logical_replication_slot('after_basebackup', 'test_decoding');"
+);
+$node_master->safe_psql('postgres',
+ "INSERT INTO decoding(blah) VALUES ('afterbb');");
+$node_master->safe_psql('postgres', 'CHECKPOINT;');
+
+# Verify that only the before base_backup slot is on the replica
+$stdout = $node_replica->safe_psql('postgres',
+ 'SELECT slot_name FROM pg_replication_slots ORDER BY slot_name');
+is($stdout, 'before_basebackup',
+ 'Expected to find only slot before_basebackup on replica');
+
+# Boom, crash
+$node_master->stop('immediate');
+
+$node_replica->promote;
+$node_replica->poll_query_until('postgres',
+ "SELECT NOT pg_is_in_recovery();");
+
+$node_replica->safe_psql('postgres',
+ "INSERT INTO decoding(blah) VALUES ('after failover');");
+
+# Shouldn't be able to read from slot created after base backup
+($ret, $stdout, $stderr) = $node_replica->psql('postgres',
+"SELECT data FROM pg_logical_slot_peek_changes('after_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');"
+);
+is($ret, 3, 'replaying from after_basebackup slot fails');
+like(
+ $stderr,
+ qr/replication slot "after_basebackup" does not exist/,
+ 'after_basebackup slot missing');
+
+# Should be able to read from slot created before base backup
+($ret, $stdout, $stderr) = $node_replica->psql(
+ 'postgres',
+"SELECT data FROM pg_logical_slot_peek_changes('before_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');",
+ timeout => 30);
+is($ret, 0, 'replay from slot before_basebackup succeeds');
+is( $stdout, q(BEGIN
+table public.decoding: INSERT: blah[text]:'beforebb'
+COMMIT
+BEGIN
+table public.decoding: INSERT: blah[text]:'afterbb'
+COMMIT
+BEGIN
+table public.decoding: INSERT: blah[text]:'after failover'
+COMMIT), 'decoded expected data from slot before_basebackup');
+is($stderr, '', 'replay from slot before_basebackup produces no stderr');
+
+# We don't need the standby anymore
+$node_replica->teardown_node();
+
+
+# OK, time to try the same thing again, but this time we'll be using slot
+# mirroring on the standby and a pg_basebackup of the master.
+
+diag "Testing logical timeline following with test_slot_timelines module";
+
+$node_master->start();
+
+# Clean up after the last test
+$node_master->safe_psql('postgres', 'DELETE FROM decoding;');
+is( $node_master->psql(
+ 'postgres',
+'SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots;'),
+ 0,
+ 'dropping slots succeeds via pg_drop_replication_slot');
+
+# Same as before, we'll make one slot before basebackup, one after. This time
+# the basebackup will be with pg_basebackup so it'll omit both slots, then
+# we'll use SQL functions provided by the test_slot_timelines test module to sync
+# them to the replica, do some work, sync them and fail over then test again.
+# This time we should have both the before- and after-basebackup slots working.
+
+is( $node_master->psql(
+ 'postgres',
+"SELECT pg_create_logical_replication_slot('before_basebackup', 'test_decoding');"
+ ),
+ 0,
+ 'creating slot before_basebackup succeeds');
+
+$node_master->safe_psql('postgres',
+ "INSERT INTO decoding(blah) VALUES ('beforebb');");
+
+$backup_name = 'b2';
+$node_master->backup($backup_name);
+
+is( $node_master->psql(
+ 'postgres',
+"SELECT pg_create_logical_replication_slot('after_basebackup', 'test_decoding');"
+ ),
+ 0,
+ 'creating slot after_basebackup succeeds');
+
+$node_master->safe_psql('postgres',
+ "INSERT INTO decoding(blah) VALUES ('afterbb');");
+
+$node_replica = get_new_node('replica2');
+$node_replica->init_from_backup(
+ $node_master, $backup_name,
+ has_streaming => 1,
+ has_restoring => 1);
+
+$node_replica->start;
+
+# Verify the slots are both absent on the replica
+$stdout = $node_replica->safe_psql('postgres',
+ 'SELECT slot_name FROM pg_replication_slots ORDER BY slot_name');
+is($stdout, '', 'No slots exist on the replica');
+
+# Now do our magic to sync the slot states across. Normally
+# this would be being done continuously by a bgworker but
+# we're just doing it by hand for this test. This is exposing
+# postgres innards to SQL so it's unsafe except for testing.
+$node_master->safe_psql('postgres', 'CREATE EXTENSION test_slot_timelines;');
+my $slotinfo = $node_master->safe_psql('postgres',
+'SELECT slot_name, plugin, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots ORDER BY slot_name'
+);
+diag "Copying slots to replica";
+open my $fh, '<', \$slotinfo or die $!;
+while (<$fh>)
+{
+ print $_;
+ chomp $_;
+ my ($slot_name, $plugin, $xmin, $catalog_xmin, $restart_lsn,
+ $confirmed_flush_lsn)
+ = map {
+ if ($_ ne '') { "'$_'" }
+ else { 'NULL' }
+ } split qr/\|/, $_;
+
+ print
+"# Copying slot $slot_name,$plugin,$xmin,$catalog_xmin,$restart_lsn,$confirmed_flush_lsn\n";
+ $node_replica->safe_psql('postgres',
+ "SELECT test_slot_timelines_create_logical_slot($slot_name, $plugin);"
+ );
+ $node_replica->safe_psql('postgres',
+"SELECT test_slot_timelines_advance_logical_slot($slot_name, $xmin, $catalog_xmin, $restart_lsn, $confirmed_flush_lsn);"
+ );
+}
+close $fh or die $!;
+
+# Now both slots are present on the replica and exactly match the master
+$stdout = $node_replica->safe_psql('postgres',
+ 'SELECT slot_name FROM pg_replication_slots ORDER BY slot_name');
+is( $stdout,
+ "after_basebackup\nbefore_basebackup",
+ 'both slots now exist on replica');
+
+$stdout = $node_replica->safe_psql(
+ 'postgres',
+ qq{SELECT slot_name, plugin, xmin, catalog_xmin,
+ restart_lsn, confirmed_flush_lsn
+ FROM pg_replication_slots
+ ORDER BY slot_name});
+is($stdout, $slotinfo,
+ "slot data read back from replica matches slot data on master");
+
+# We now have to copy some extra WAL to satisfy the requirements of the oldest
+# replication slot. pg_basebackup doesn't know to copy the extra WAL for slots
+# so we have to help out. We know the WAL is still retained on the master
+# because we haven't advanced the slots there.
+#
+# Figure out what the oldest segment we need is by looking at the restart_lsn
+# of the oldest slot.
+#
+# It only makes sense to do this once the slots are created on the replica,
+# otherwise it might just delete the segments again.
+
+my $oldest_needed_segment = $node_master->safe_psql(
+ 'postgres',
+ qq{SELECT pg_xlogfile_name((
+ SELECT restart_lsn
+ FROM pg_replication_slots
+ ORDER BY restart_lsn ASC
+ LIMIT 1
+ ));}
+);
+
+diag "oldest needed xlog seg is $oldest_needed_segment ";
+
+# WAL segment names sort lexically so we can just grab everything > than this
+# segment.
+opendir(my $pg_xlog, $node_master->data_dir . "/pg_xlog") or die $!;
+while (my $seg = readdir $pg_xlog)
+{
+ next unless $seg >= $oldest_needed_segment && $seg =~ /^[0-9]{24}/;
+ diag "copying xlog seg $seg";
+ copy(
+ $node_master->data_dir . "/pg_xlog/" . $seg,
+ $node_replica->data_dir . "/pg_xlog/" . $seg
+ ) or die "copy of xlog seg $seg failed: $!";
+}
+closedir $pg_xlog;
+
+# Boom, crash the master
+$node_master->stop('immediate');
+
+$node_replica->promote;
+$node_replica->poll_query_until('postgres',
+ "SELECT NOT pg_is_in_recovery();");
+
+$node_replica->safe_psql('postgres',
+ "INSERT INTO decoding(blah) VALUES ('after failover');");
+
+# This time we can read from both slots
+($ret, $stdout, $stderr) = $node_replica->psql(
+ 'postgres',
+"SELECT data FROM pg_logical_slot_peek_changes('after_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');",
+ timeout => 30);
+is($ret, 0, 'replay from slot after_basebackup succeeds');
+is( $stdout, q(BEGIN
+table public.decoding: INSERT: blah[text]:'afterbb'
+COMMIT
+BEGIN
+table public.decoding: INSERT: blah[text]:'after failover'
+COMMIT), 'decoded expected data from slot after_basebackup');
+is($stderr, '', 'replay from slot after_basebackup produces no stderr');
+
+# Should be able to read from slot created before base backup
+#
+# This would fail with an error about missing WAL segments if we hadn't
+# copied extra WAL earlier.
+($ret, $stdout, $stderr) = $node_replica->psql(
+ 'postgres',
+"SELECT data FROM pg_logical_slot_peek_changes('before_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');",
+ timeout => 30);
+is($ret, 0, 'replay from slot before_basebackup succeeds');
+is( $stdout, q(BEGIN
+table public.decoding: INSERT: blah[text]:'beforebb'
+COMMIT
+BEGIN
+table public.decoding: INSERT: blah[text]:'afterbb'
+COMMIT
+BEGIN
+table public.decoding: INSERT: blah[text]:'after failover'
+COMMIT), 'decoded expected data from slot before_basebackup');
+is($stderr, '', 'replay from slot before_basebackup produces no stderr');
+
+($ret, $stdout, $stderr) = $node_replica->psql('postgres',
+ 'SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots;');
+is($ret, 0, 'dropping slots succeeds via pg_drop_replication_slot');
+is($stderr, '', 'dropping slots produces no stderr output');
+
+1;