diff options
Diffstat (limited to 'src')
17 files changed, 21 insertions, 773 deletions
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index c5a964ad8ba..c3aecc75746 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -118,11 +118,6 @@ XLogReaderAllocate(XLogPageReadCB pagereadfunc, void *private_data) return NULL; } -#ifndef FRONTEND - /* Will be loaded on first read */ - state->timelineHistory = NIL; -#endif - return state; } @@ -142,10 +137,6 @@ XLogReaderFree(XLogReaderState *state) pfree(state->errormsg_buf); if (state->readRecordBuf) pfree(state->readRecordBuf); -#ifndef FRONTEND - if (state->timelineHistory) - list_free_deep(state->timelineHistory); -#endif pfree(state->readBuf); pfree(state); } diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index f6ca2b95e51..cb4563ed731 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -19,7 +19,6 @@ #include <unistd.h> -#include "access/timeline.h" #include "access/xlog.h" #include "access/xlog_internal.h" #include "access/xlogutils.h" @@ -660,7 +659,6 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count) /* state maintained across calls */ static int sendFile = -1; static XLogSegNo sendSegNo = 0; - static TimeLineID sendTLI = 0; static uint32 sendOff = 0; p = buf; @@ -676,8 +674,7 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count) startoff = recptr % XLogSegSize; /* Do we need to switch to a different xlog segment? */ - if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo) || - sendTLI != tli) + if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo)) { char path[MAXPGPATH]; @@ -704,7 +701,6 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count) path))); } sendOff = 0; - sendTLI = tli; } /* Need to seek in the file? */ @@ -753,147 +749,6 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count) } /* - * Determine XLogReaderState->currTLI and ->currTLIValidUntil; - * XLogReaderState->EndRecPtr, ->currRecPtr and ThisTimeLineID affect the - * decision. This may later be used to determine which xlog segment file to - * open, etc. - * - * We switch to an xlog segment from the new timeline eagerly when on a - * historical timeline, as soon as we reach the start of the xlog segment - * containing the timeline switch. The server copied the segment to the new - * timeline so all the data up to the switch point is the same, but there's no - * guarantee the old segment will still exist. It may have been deleted or - * renamed with a .partial suffix so we can't necessarily keep reading from - * the old TLI even though tliSwitchPoint says it's OK. - * - * Because of this, callers MAY NOT assume that currTLI is the timeline that - * will be in a page's xlp_tli; the page may begin on an older timeline or we - * might be reading from historical timeline data on a segment that's been - * copied to a new timeline. - */ -static void -XLogReadDetermineTimeline(XLogReaderState *state) -{ - /* Read the history on first time through */ - if (state->timelineHistory == NIL) - state->timelineHistory = readTimeLineHistory(ThisTimeLineID); - - /* - * Are we reading the record immediately following the one we read last - * time? If not, then don't use the cached timeline info. - */ - if (state->currRecPtr != state->EndRecPtr) - { - state->currTLI = 0; - state->currTLIValidUntil = InvalidXLogRecPtr; - } - - /* - * Are we reading a timeline that used to be the latest one, but became - * historical? This can happen in a replica that gets promoted, and in a - * cascading replica whose upstream gets promoted. In either case, - * re-read the timeline history data. We cannot read past the timeline - * switch point, because either the records in the old timeline might be - * invalid, or worse, they may valid but *different* from the ones we - * should be reading. - */ - if (state->currTLIValidUntil == InvalidXLogRecPtr && - state->currTLI != ThisTimeLineID && - state->currTLI != 0) - { - /* re-read timeline history */ - list_free_deep(state->timelineHistory); - state->timelineHistory = readTimeLineHistory(ThisTimeLineID); - - elog(DEBUG2, "timeline %u became historical during decoding", - state->currTLI); - - /* then invalidate the cached timeline info */ - state->currTLI = 0; - state->currTLIValidUntil = InvalidXLogRecPtr; - } - - /* - * Are we reading a record immediately following a timeline switch? If - * so, we must follow the switch too. - */ - if (state->currRecPtr == state->EndRecPtr && - state->currTLI != 0 && - state->currTLIValidUntil != InvalidXLogRecPtr && - state->currRecPtr >= state->currTLIValidUntil) - { - elog(DEBUG2, - "requested record %X/%X is on segment containing end of timeline %u valid until %X/%X, switching to next timeline", - (uint32) (state->currRecPtr >> 32), - (uint32) state->currRecPtr, - state->currTLI, - (uint32) (state->currTLIValidUntil >> 32), - (uint32) (state->currTLIValidUntil)); - - /* invalidate TLI info so we look up the next TLI */ - state->currTLI = 0; - state->currTLIValidUntil = InvalidXLogRecPtr; - } - - if (state->currTLI == 0) - { - /* - * Something changed; work out what timeline this record is on. We - * might read it from the segment on this TLI or, if the segment is - * also contained by newer timelines, the copy from a newer TLI. - */ - state->currTLI = tliOfPointInHistory(state->currRecPtr, - state->timelineHistory); - - /* - * Look for the most recent timeline that's on the same xlog segment - * as this record, since that's the only one we can assume is still - * readable. - */ - while (state->currTLI != ThisTimeLineID && - state->currTLIValidUntil == InvalidXLogRecPtr) - { - XLogRecPtr tliSwitch; - TimeLineID nextTLI; - - CHECK_FOR_INTERRUPTS(); - - tliSwitch = tliSwitchPoint(state->currTLI, state->timelineHistory, - &nextTLI); - - /* round ValidUntil down to start of seg containing the switch */ - state->currTLIValidUntil = - ((tliSwitch / XLogSegSize) * XLogSegSize); - - if (state->currRecPtr >= state->currTLIValidUntil) - { - /* - * The new currTLI ends on this WAL segment so check the next - * TLI to see if it's the last one on the segment. - * - * If that's the current TLI we'll stop searching. - */ - state->currTLI = nextTLI; - state->currTLIValidUntil = InvalidXLogRecPtr; - } - } - - /* - * We're now either reading from the first xlog segment in the current - * server's timeline or the most recent historical timeline that - * exists on the target segment. - */ - elog(DEBUG2, "XLog read ptr %X/%X is on segment with TLI %u valid until %X/%X, server current TLI is %u", - (uint32) (state->currRecPtr >> 32), - (uint32) state->currRecPtr, - state->currTLI, - (uint32) (state->currTLIValidUntil >> 32), - (uint32) (state->currTLIValidUntil), - ThisTimeLineID); - } -} - -/* * read_page callback for reading local xlog files * * Public because it would likely be very helpful for someone writing another @@ -914,65 +769,28 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int count; loc = targetPagePtr + reqLen; - - /* Make sure enough xlog is available... */ while (1) { /* - * Check which timeline to get the record from. - * - * We have to do it each time through the loop because if we're in - * recovery as a cascading standby, the current timeline might've - * become historical. + * TODO: we're going to have to do something more intelligent about + * timelines on standbys. Use readTimeLineHistory() and + * tliOfPointInHistory() to get the proper LSN? For now we'll catch + * that case earlier, but the code and TODO is left in here for when + * that changes. */ - XLogReadDetermineTimeline(state); - - if (state->currTLI == ThisTimeLineID) + if (!RecoveryInProgress()) { - /* - * We're reading from the current timeline so we might have to - * wait for the desired record to be generated (or, for a standby, - * received & replayed) - */ - if (!RecoveryInProgress()) - { - *pageTLI = ThisTimeLineID; - read_upto = GetFlushRecPtr(); - } - else - read_upto = GetXLogReplayRecPtr(pageTLI); - - if (loc <= read_upto) - break; - - CHECK_FOR_INTERRUPTS(); - pg_usleep(1000L); + *pageTLI = ThisTimeLineID; + read_upto = GetFlushRecPtr(); } else - { - /* - * We're on a historical timeline, so limit reading to the switch - * point where we moved to the next timeline. - * - * We don't need to GetFlushRecPtr or GetXLogReplayRecPtr. We know - * about the new timeline, so we must've received past the end of - * it. - */ - read_upto = state->currTLIValidUntil; - - /* - * Setting pageTLI to our wanted record's TLI is slightly wrong; - * the page might begin on an older timeline if it contains a - * timeline switch, since its xlog segment will have been copied - * from the prior timeline. This is pretty harmless though, as - * nothing cares so long as the timeline doesn't go backwards. We - * should read the page header instead; FIXME someday. - */ - *pageTLI = state->currTLI; - - /* No need to wait on a historical timeline */ + read_upto = GetXLogReplayRecPtr(pageTLI); + + if (loc <= read_upto) break; - } + + CHECK_FOR_INTERRUPTS(); + pg_usleep(1000L); } if (targetPagePtr + XLOG_BLCKSZ <= read_upto) diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 99112ac1b4d..4e4c8cdaeb2 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -234,6 +234,12 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin rsinfo->setResult = p->tupstore; rsinfo->setDesc = p->tupdesc; + /* compute the current end-of-wal */ + if (!RecoveryInProgress()) + end_of_wal = GetFlushRecPtr(); + else + end_of_wal = GetXLogReplayRecPtr(NULL); + ReplicationSlotAcquire(NameStr(*name)); PG_TRY(); @@ -273,12 +279,6 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin /* invalidate non-timetravel entries */ InvalidateSystemCaches(); - if (!RecoveryInProgress()) - end_of_wal = GetFlushRecPtr(); - else - end_of_wal = GetXLogReplayRecPtr(NULL); - - /* Decode until we run out of records */ while ((startptr != InvalidXLogRecPtr && startptr < end_of_wal) || (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < end_of_wal)) { diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index 300747dbf76..deaa7f5128b 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -27,10 +27,6 @@ #include "access/xlogrecord.h" -#ifndef FRONTEND -#include "nodes/pg_list.h" -#endif - typedef struct XLogReaderState XLogReaderState; /* Function type definition for the read_page callback */ @@ -164,25 +160,11 @@ struct XLogReaderState /* beginning of the WAL record being read. */ XLogRecPtr currRecPtr; - /* timeline to read it from, 0 if a lookup is required */ - TimeLineID currTLI; - /* - * Safe point to read to in currTLI. If currTLI is historical, then this - * is set to the end of the last whole segment that contains that TLI; - * if currTLI is ThisTimeLineID, this is InvalidXLogRecPtr. This is *not* - * the tliSwitchPoint. - */ - XLogRecPtr currTLIValidUntil; /* Buffer for current ReadRecord result (expandable) */ char *readRecordBuf; uint32 readRecordBufSize; -#ifndef FRONTEND - /* cached timeline history, only available in backend */ - List *timelineHistory; -#endif - /* Buffer to hold error message */ char *errormsg_buf; }; diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile index d086163fabc..9b935521f47 100644 --- a/src/test/modules/Makefile +++ b/src/test/modules/Makefile @@ -14,7 +14,6 @@ SUBDIRS = \ test_parser \ test_rls_hooks \ test_shm_mq \ - test_slot_timelines \ worker_spi all: submake-errcodes diff --git a/src/test/modules/test_slot_timelines/.gitignore b/src/test/modules/test_slot_timelines/.gitignore deleted file mode 100644 index 543c50d1bbd..00000000000 --- a/src/test/modules/test_slot_timelines/.gitignore +++ /dev/null @@ -1,3 +0,0 @@ -results/ -tmp_check/ -log/ diff --git a/src/test/modules/test_slot_timelines/Makefile b/src/test/modules/test_slot_timelines/Makefile deleted file mode 100644 index 21757c5bcb7..00000000000 --- a/src/test/modules/test_slot_timelines/Makefile +++ /dev/null @@ -1,22 +0,0 @@ -# src/test/modules/test_slot_timelines/Makefile - -MODULES = test_slot_timelines -PGFILEDESC = "test_slot_timelines - test utility for slot timeline following" - -EXTENSION = test_slot_timelines -DATA = test_slot_timelines--1.0.sql - -EXTRA_INSTALL=contrib/test_decoding -REGRESS=load_extension -REGRESS_OPTS = --temp-config=$(top_srcdir)/src/test/modules/test_slot_timelines/test_slot_timelines.conf - -ifdef USE_PGXS -PG_CONFIG = pg_config -PGXS := $(shell $(PG_CONFIG) --pgxs) -include $(PGXS) -else -subdir = src/test/modules/test_slot_timelines -top_builddir = ../../../.. -include $(top_builddir)/src/Makefile.global -include $(top_srcdir)/contrib/contrib-global.mk -endif diff --git a/src/test/modules/test_slot_timelines/README b/src/test/modules/test_slot_timelines/README deleted file mode 100644 index 585f02f1e6e..00000000000 --- a/src/test/modules/test_slot_timelines/README +++ /dev/null @@ -1,19 +0,0 @@ -A test module for logical decoding failover and timeline following. - -This module provides a minimal way to maintain logical slots on replicas -that mirror the state on the master. It doesn't make decoding possible, -just tracking slot state so that a decoding client that's using the master -can follow a physical failover to the standby. The master doesn't know -about the slots on the standby, they're synced by a client that connects -to both. - -This is intentionally not part of the test_decoding module because that's meant -to serve as example code, where this module exercises internal server features -by unsafely exposing internal state to SQL. It's not the right way to do -failover, it's just a simple way to test it from the perl TAP framework to -prove the feature works. - -In a practical implementation of this approach a bgworker on the master would -monitor slot positions and relay them to a bgworker on the standby that applies -the position updates without exposing slot internals to SQL. That's too complex -for this test framework though. diff --git a/src/test/modules/test_slot_timelines/expected/load_extension.out b/src/test/modules/test_slot_timelines/expected/load_extension.out deleted file mode 100644 index 7c2ad9d7c19..00000000000 --- a/src/test/modules/test_slot_timelines/expected/load_extension.out +++ /dev/null @@ -1,19 +0,0 @@ -CREATE EXTENSION test_slot_timelines; -SELECT test_slot_timelines_create_logical_slot('test_slot', 'test_decoding'); - test_slot_timelines_create_logical_slot ------------------------------------------ - -(1 row) - -SELECT test_slot_timelines_advance_logical_slot('test_slot', txid_current()::text::xid, txid_current()::text::xid, pg_current_xlog_location(), pg_current_xlog_location()); - test_slot_timelines_advance_logical_slot ------------------------------------------- - -(1 row) - -SELECT pg_drop_replication_slot('test_slot'); - pg_drop_replication_slot --------------------------- - -(1 row) - diff --git a/src/test/modules/test_slot_timelines/expected/load_extension_1.out b/src/test/modules/test_slot_timelines/expected/load_extension_1.out deleted file mode 100644 index 0db21e46d90..00000000000 --- a/src/test/modules/test_slot_timelines/expected/load_extension_1.out +++ /dev/null @@ -1,7 +0,0 @@ -CREATE EXTENSION test_slot_timelines; -SELECT test_slot_timelines_create_logical_slot('test_slot', 'test_decoding'); -ERROR: replication slots can only be used if max_replication_slots > 0 -SELECT test_slot_timelines_advance_logical_slot('test_slot', txid_current()::text::xid, txid_current()::text::xid, pg_current_xlog_location(), pg_current_xlog_location()); -ERROR: replication slots can only be used if max_replication_slots > 0 -SELECT pg_drop_replication_slot('test_slot'); -ERROR: replication slots can only be used if max_replication_slots > 0 diff --git a/src/test/modules/test_slot_timelines/sql/load_extension.sql b/src/test/modules/test_slot_timelines/sql/load_extension.sql deleted file mode 100644 index 24403552464..00000000000 --- a/src/test/modules/test_slot_timelines/sql/load_extension.sql +++ /dev/null @@ -1,7 +0,0 @@ -CREATE EXTENSION test_slot_timelines; - -SELECT test_slot_timelines_create_logical_slot('test_slot', 'test_decoding'); - -SELECT test_slot_timelines_advance_logical_slot('test_slot', txid_current()::text::xid, txid_current()::text::xid, pg_current_xlog_location(), pg_current_xlog_location()); - -SELECT pg_drop_replication_slot('test_slot'); diff --git a/src/test/modules/test_slot_timelines/test_slot_timelines--1.0.sql b/src/test/modules/test_slot_timelines/test_slot_timelines--1.0.sql deleted file mode 100644 index a1886f732f1..00000000000 --- a/src/test/modules/test_slot_timelines/test_slot_timelines--1.0.sql +++ /dev/null @@ -1,16 +0,0 @@ --- complain if script is sourced in psql, rather than via CREATE EXTENSION -\echo Use "CREATE EXTENSION test_slot_timelines" to load this file. \quit - -CREATE OR REPLACE FUNCTION test_slot_timelines_create_logical_slot(slot_name text, plugin text) -RETURNS void -STRICT LANGUAGE c AS 'MODULE_PATHNAME'; - -COMMENT ON FUNCTION test_slot_timelines_create_logical_slot(text, text) -IS 'Create a logical slot at a particular lsn and xid. Do not use in production servers, it is not safe. The slot is created with an invalid xmin and lsn.'; - -CREATE OR REPLACE FUNCTION test_slot_timelines_advance_logical_slot(slot_name text, new_xmin xid, new_catalog_xmin xid, new_restart_lsn pg_lsn, new_confirmed_lsn pg_lsn) -RETURNS void -STRICT LANGUAGE c AS 'MODULE_PATHNAME'; - -COMMENT ON FUNCTION test_slot_timelines_advance_logical_slot(text, xid, xid, pg_lsn, pg_lsn) -IS 'Advance a logical slot directly. Do not use this in production servers, it is not safe.'; diff --git a/src/test/modules/test_slot_timelines/test_slot_timelines.c b/src/test/modules/test_slot_timelines/test_slot_timelines.c deleted file mode 100644 index 1f074881d2f..00000000000 --- a/src/test/modules/test_slot_timelines/test_slot_timelines.c +++ /dev/null @@ -1,133 +0,0 @@ -/*-------------------------------------------------------------------------- - * - * test_slot_timelines.c - * Test harness code for slot timeline following - * - * Copyright (c) 2016, PostgreSQL Global Development Group - * - * IDENTIFICATION - * src/test/modules/test_slot_timelines/test_slot_timelines.c - * - * ------------------------------------------------------------------------- - */ -#include "postgres.h" - -#include "access/transam.h" -#include "fmgr.h" -#include "miscadmin.h" -#include "replication/slot.h" -#include "utils/builtins.h" -#include "utils/pg_lsn.h" - -PG_MODULE_MAGIC; - -PG_FUNCTION_INFO_V1(test_slot_timelines_create_logical_slot); -PG_FUNCTION_INFO_V1(test_slot_timelines_advance_logical_slot); - -static void clear_slot_transient_state(void); - -/* - * Create a new logical slot, with invalid LSN and xid, directly. This does not - * use the snapshot builder or logical decoding machinery. It's only intended - * for creating a slot on a replica that mirrors the state of a slot on an - * upstream master. - * - * Note that this is test harness code. You shouldn't expose slot internals - * to SQL like this for any real world usage. See the README. - */ -Datum -test_slot_timelines_create_logical_slot(PG_FUNCTION_ARGS) -{ - char *slotname = text_to_cstring(PG_GETARG_TEXT_P(0)); - char *plugin = text_to_cstring(PG_GETARG_TEXT_P(1)); - - CheckSlotRequirements(); - - ReplicationSlotCreate(slotname, true, RS_PERSISTENT); - - /* register the plugin name with the slot */ - StrNCpy(NameStr(MyReplicationSlot->data.plugin), plugin, NAMEDATALEN); - - /* - * Initialize persistent state to placeholders to be set by - * test_slot_timelines_advance_logical_slot . - */ - MyReplicationSlot->data.xmin = InvalidTransactionId; - MyReplicationSlot->data.catalog_xmin = InvalidTransactionId; - MyReplicationSlot->data.restart_lsn = InvalidXLogRecPtr; - MyReplicationSlot->data.confirmed_flush = InvalidXLogRecPtr; - - clear_slot_transient_state(); - - ReplicationSlotRelease(); - - PG_RETURN_VOID(); -} - -/* - * Set the state of a slot. - * - * This doesn't maintain the non-persistent state at all, - * but since the slot isn't in use that's OK. - * - * There's intentionally no check to prevent slots going backwards - * because they can actually go backwards if the master crashes when - * it hasn't yet flushed slot state to disk then we copy the older - * slot state after recovery. - * - * There's no checking done for xmin or catalog xmin either, since - * we can't really do anything useful that accounts for xid wrap-around. - * - * Note that this is test harness code. You shouldn't expose slot internals - * to SQL like this for any real world usage. See the README. - */ -Datum -test_slot_timelines_advance_logical_slot(PG_FUNCTION_ARGS) -{ - char *slotname = text_to_cstring(PG_GETARG_TEXT_P(0)); - TransactionId new_xmin = DatumGetTransactionId(PG_GETARG_DATUM(1)); - TransactionId new_catalog_xmin = DatumGetTransactionId(PG_GETARG_DATUM(2)); - XLogRecPtr restart_lsn = PG_GETARG_LSN(3); - XLogRecPtr confirmed_lsn = PG_GETARG_LSN(4); - - CheckSlotRequirements(); - - ReplicationSlotAcquire(slotname); - - if (MyReplicationSlot->data.database != MyDatabaseId) - elog(ERROR, "trying to update a slot on a different database"); - - MyReplicationSlot->data.xmin = new_xmin; - MyReplicationSlot->data.catalog_xmin = new_catalog_xmin; - MyReplicationSlot->data.restart_lsn = restart_lsn; - MyReplicationSlot->data.confirmed_flush = confirmed_lsn; - - clear_slot_transient_state(); - - ReplicationSlotMarkDirty(); - ReplicationSlotSave(); - ReplicationSlotRelease(); - - ReplicationSlotsComputeRequiredXmin(false); - ReplicationSlotsComputeRequiredLSN(); - - PG_RETURN_VOID(); -} - -static void -clear_slot_transient_state(void) -{ - Assert(MyReplicationSlot != NULL); - - /* - * Make sure the slot state is the same as if it were newly loaded from - * disk on recovery. - */ - MyReplicationSlot->effective_xmin = MyReplicationSlot->data.xmin; - MyReplicationSlot->effective_catalog_xmin = MyReplicationSlot->data.catalog_xmin; - - MyReplicationSlot->candidate_catalog_xmin = InvalidTransactionId; - MyReplicationSlot->candidate_xmin_lsn = InvalidXLogRecPtr; - MyReplicationSlot->candidate_restart_lsn = InvalidXLogRecPtr; - MyReplicationSlot->candidate_restart_valid = InvalidXLogRecPtr; -} diff --git a/src/test/modules/test_slot_timelines/test_slot_timelines.conf b/src/test/modules/test_slot_timelines/test_slot_timelines.conf deleted file mode 100644 index 56b46d7e275..00000000000 --- a/src/test/modules/test_slot_timelines/test_slot_timelines.conf +++ /dev/null @@ -1,2 +0,0 @@ -max_replication_slots=2 -wal_level=logical diff --git a/src/test/modules/test_slot_timelines/test_slot_timelines.control b/src/test/modules/test_slot_timelines/test_slot_timelines.control deleted file mode 100644 index dcee1a72576..00000000000 --- a/src/test/modules/test_slot_timelines/test_slot_timelines.control +++ /dev/null @@ -1,5 +0,0 @@ -# test_slot_timelines extension -comment = 'Test utility for slot timeline following and logical decoding' -default_version = '1.0' -module_pathname = '$libdir/test_slot_timelines' -relocatable = true diff --git a/src/test/recovery/Makefile b/src/test/recovery/Makefile index 78570dd1566..929071909ad 100644 --- a/src/test/recovery/Makefile +++ b/src/test/recovery/Makefile @@ -9,8 +9,6 @@ # #------------------------------------------------------------------------- -EXTRA_INSTALL=contrib/test_decoding src/test/modules/test_slot_timelines - subdir = src/test/recovery top_builddir = ../../.. include $(top_builddir)/src/Makefile.global diff --git a/src/test/recovery/t/006_logical_decoding_timelines.pl b/src/test/recovery/t/006_logical_decoding_timelines.pl deleted file mode 100644 index a3a4b6150ae..00000000000 --- a/src/test/recovery/t/006_logical_decoding_timelines.pl +++ /dev/null @@ -1,307 +0,0 @@ -# Demonstrate that logical can follow timeline switches. -# -# Logical replication slots can follow timeline switches but it's -# normally not possible to have a logical slot on a replica where -# promotion and a timeline switch can occur. The only ways -# we can create that circumstance are: -# -# * By doing a filesystem-level copy of the DB, since pg_basebackup -# excludes pg_replslot but we can copy it directly; or -# -# * by creating a slot directly at the C level on the replica and -# advancing it as we go using the low level APIs. It can't be done -# from SQL since logical decoding isn't allowed on replicas. -# -# This module uses the first approach to show that timeline following -# on a logical slot works. -# -use strict; -use warnings; - -use PostgresNode; -use TestLib; -use Test::More tests => 20; -use RecursiveCopy; -use File::Copy; - -my ($stdout, $stderr, $ret); - -# Initialize master node -my $node_master = get_new_node('master'); -$node_master->init(allows_streaming => 1, has_archiving => 1); -$node_master->append_conf('postgresql.conf', "wal_level = 'logical'\n"); -$node_master->append_conf('postgresql.conf', "max_replication_slots = 2\n"); -$node_master->append_conf('postgresql.conf', "max_wal_senders = 2\n"); -$node_master->append_conf('postgresql.conf', "log_min_messages = 'debug2'\n"); -$node_master->dump_info; -$node_master->start; - -diag "Testing logical timeline following with a filesystem-level copy"; - -$node_master->safe_psql('postgres', -"SELECT pg_create_logical_replication_slot('before_basebackup', 'test_decoding');" -); -$node_master->safe_psql('postgres', "CREATE TABLE decoding(blah text);"); -$node_master->safe_psql('postgres', - "INSERT INTO decoding(blah) VALUES ('beforebb');"); -$node_master->safe_psql('postgres', 'CHECKPOINT;'); - -my $backup_name = 'b1'; -$node_master->backup_fs_hot($backup_name); - -my $node_replica = get_new_node('replica'); -$node_replica->init_from_backup( - $node_master, $backup_name, - has_streaming => 1, - has_restoring => 1); -$node_replica->start; - -$node_master->safe_psql('postgres', -"SELECT pg_create_logical_replication_slot('after_basebackup', 'test_decoding');" -); -$node_master->safe_psql('postgres', - "INSERT INTO decoding(blah) VALUES ('afterbb');"); -$node_master->safe_psql('postgres', 'CHECKPOINT;'); - -# Verify that only the before base_backup slot is on the replica -$stdout = $node_replica->safe_psql('postgres', - 'SELECT slot_name FROM pg_replication_slots ORDER BY slot_name'); -is($stdout, 'before_basebackup', - 'Expected to find only slot before_basebackup on replica'); - -# Boom, crash -$node_master->stop('immediate'); - -$node_replica->promote; -$node_replica->poll_query_until('postgres', - "SELECT NOT pg_is_in_recovery();"); - -$node_replica->safe_psql('postgres', - "INSERT INTO decoding(blah) VALUES ('after failover');"); - -# Shouldn't be able to read from slot created after base backup -($ret, $stdout, $stderr) = $node_replica->psql('postgres', -"SELECT data FROM pg_logical_slot_peek_changes('after_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');" -); -is($ret, 3, 'replaying from after_basebackup slot fails'); -like( - $stderr, - qr/replication slot "after_basebackup" does not exist/, - 'after_basebackup slot missing'); - -# Should be able to read from slot created before base backup -($ret, $stdout, $stderr) = $node_replica->psql( - 'postgres', -"SELECT data FROM pg_logical_slot_peek_changes('before_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');", - timeout => 30); -is($ret, 0, 'replay from slot before_basebackup succeeds'); -is( $stdout, q(BEGIN -table public.decoding: INSERT: blah[text]:'beforebb' -COMMIT -BEGIN -table public.decoding: INSERT: blah[text]:'afterbb' -COMMIT -BEGIN -table public.decoding: INSERT: blah[text]:'after failover' -COMMIT), 'decoded expected data from slot before_basebackup'); -is($stderr, '', 'replay from slot before_basebackup produces no stderr'); - -# We don't need the standby anymore -$node_replica->teardown_node(); - - -# OK, time to try the same thing again, but this time we'll be using slot -# mirroring on the standby and a pg_basebackup of the master. - -diag "Testing logical timeline following with test_slot_timelines module"; - -$node_master->start(); - -# Clean up after the last test -$node_master->safe_psql('postgres', 'DELETE FROM decoding;'); -is( $node_master->psql( - 'postgres', -'SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots;'), - 0, - 'dropping slots succeeds via pg_drop_replication_slot'); - -# Same as before, we'll make one slot before basebackup, one after. This time -# the basebackup will be with pg_basebackup so it'll omit both slots, then -# we'll use SQL functions provided by the test_slot_timelines test module to sync -# them to the replica, do some work, sync them and fail over then test again. -# This time we should have both the before- and after-basebackup slots working. - -is( $node_master->psql( - 'postgres', -"SELECT pg_create_logical_replication_slot('before_basebackup', 'test_decoding');" - ), - 0, - 'creating slot before_basebackup succeeds'); - -$node_master->safe_psql('postgres', - "INSERT INTO decoding(blah) VALUES ('beforebb');"); - -$backup_name = 'b2'; -$node_master->backup($backup_name); - -is( $node_master->psql( - 'postgres', -"SELECT pg_create_logical_replication_slot('after_basebackup', 'test_decoding');" - ), - 0, - 'creating slot after_basebackup succeeds'); - -$node_master->safe_psql('postgres', - "INSERT INTO decoding(blah) VALUES ('afterbb');"); - -$node_replica = get_new_node('replica2'); -$node_replica->init_from_backup( - $node_master, $backup_name, - has_streaming => 1, - has_restoring => 1); - -$node_replica->start; - -# Verify the slots are both absent on the replica -$stdout = $node_replica->safe_psql('postgres', - 'SELECT slot_name FROM pg_replication_slots ORDER BY slot_name'); -is($stdout, '', 'No slots exist on the replica'); - -# Now do our magic to sync the slot states across. Normally -# this would be being done continuously by a bgworker but -# we're just doing it by hand for this test. This is exposing -# postgres innards to SQL so it's unsafe except for testing. -$node_master->safe_psql('postgres', 'CREATE EXTENSION test_slot_timelines;'); - -my $slotinfo = $node_master->safe_psql( - 'postgres', - qq{SELECT slot_name, plugin, - COALESCE(xmin, '0'), catalog_xmin, - restart_lsn, confirmed_flush_lsn - FROM pg_replication_slots ORDER BY slot_name} -); -diag "Copying slots to replica"; -open my $fh, '<', \$slotinfo or die $!; -while (<$fh>) -{ - print $_; - chomp $_; - my ($slot_name, $plugin, $xmin, $catalog_xmin, $restart_lsn, - $confirmed_flush_lsn) - = map { "'$_'" } split qr/\|/, $_; - - print -"# Copying slot $slot_name,$plugin,$xmin,$catalog_xmin,$restart_lsn,$confirmed_flush_lsn\n"; - $node_replica->safe_psql('postgres', - "SELECT test_slot_timelines_create_logical_slot($slot_name, $plugin);" - ); - $node_replica->safe_psql('postgres', -"SELECT test_slot_timelines_advance_logical_slot($slot_name, $xmin, $catalog_xmin, $restart_lsn, $confirmed_flush_lsn);" - ); -} -close $fh or die $!; - -# Now both slots are present on the replica and exactly match the master -$stdout = $node_replica->safe_psql('postgres', - 'SELECT slot_name FROM pg_replication_slots ORDER BY slot_name'); -is( $stdout, - "after_basebackup\nbefore_basebackup", - 'both slots now exist on replica'); - -$stdout = $node_replica->safe_psql( - 'postgres', - qq{SELECT slot_name, plugin, COALESCE(xmin, '0'), catalog_xmin, - restart_lsn, confirmed_flush_lsn - FROM pg_replication_slots - ORDER BY slot_name}); -is($stdout, $slotinfo, - "slot data read back from replica matches slot data on master"); - -# We now have to copy some extra WAL to satisfy the requirements of the oldest -# replication slot. pg_basebackup doesn't know to copy the extra WAL for slots -# so we have to help out. We know the WAL is still retained on the master -# because we haven't advanced the slots there. -# -# Figure out what the oldest segment we need is by looking at the restart_lsn -# of the oldest slot. -# -# It only makes sense to do this once the slots are created on the replica, -# otherwise it might just delete the segments again. - -my $oldest_needed_segment = $node_master->safe_psql( - 'postgres', - qq{SELECT pg_xlogfile_name(( - SELECT restart_lsn - FROM pg_replication_slots - ORDER BY restart_lsn ASC - LIMIT 1 - ));} -); - -diag "oldest needed xlog seg is $oldest_needed_segment "; - -# WAL segment names sort lexically so we can just grab everything > than this -# segment. -opendir(my $pg_xlog, $node_master->data_dir . "/pg_xlog") or die $!; -while (my $seg = readdir $pg_xlog) -{ - next if $seg eq '.' or $seg eq '..'; - next unless $seg >= $oldest_needed_segment && $seg =~ /^[0-9]{24}/; - diag "copying xlog seg $seg"; - copy( - $node_master->data_dir . "/pg_xlog/" . $seg, - $node_replica->data_dir . "/pg_xlog/" . $seg - ) or die "copy of xlog seg $seg failed: $!"; -} -closedir $pg_xlog; - -# Boom, crash the master -$node_master->stop('immediate'); - -$node_replica->promote; -$node_replica->poll_query_until('postgres', - "SELECT NOT pg_is_in_recovery();"); - -$node_replica->safe_psql('postgres', - "INSERT INTO decoding(blah) VALUES ('after failover');"); - -# This time we can read from both slots -($ret, $stdout, $stderr) = $node_replica->psql( - 'postgres', -"SELECT data FROM pg_logical_slot_peek_changes('after_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');", - timeout => 30); -is($ret, 0, 'replay from slot after_basebackup succeeds'); -is( $stdout, q(BEGIN -table public.decoding: INSERT: blah[text]:'afterbb' -COMMIT -BEGIN -table public.decoding: INSERT: blah[text]:'after failover' -COMMIT), 'decoded expected data from slot after_basebackup'); -is($stderr, '', 'replay from slot after_basebackup produces no stderr'); - -# Should be able to read from slot created before base backup -# -# This would fail with an error about missing WAL segments if we hadn't -# copied extra WAL earlier. -($ret, $stdout, $stderr) = $node_replica->psql( - 'postgres', -"SELECT data FROM pg_logical_slot_peek_changes('before_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');", - timeout => 30); -is($ret, 0, 'replay from slot before_basebackup succeeds'); -is( $stdout, q(BEGIN -table public.decoding: INSERT: blah[text]:'beforebb' -COMMIT -BEGIN -table public.decoding: INSERT: blah[text]:'afterbb' -COMMIT -BEGIN -table public.decoding: INSERT: blah[text]:'after failover' -COMMIT), 'decoded expected data from slot before_basebackup'); -is($stderr, '', 'replay from slot before_basebackup produces no stderr'); - -($ret, $stdout, $stderr) = $node_replica->psql('postgres', - 'SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots;'); -is($ret, 0, 'dropping slots succeeds via pg_drop_replication_slot'); -is($stderr, '', 'dropping slots produces no stderr output'); - -1; |