diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/replication/logical/logicalfuncs.c | 12 | ||||
-rw-r--r-- | src/backend/replication/logical/slotsync.c | 11 | ||||
-rw-r--r-- | src/backend/replication/slot.c | 403 | ||||
-rw-r--r-- | src/backend/replication/slotfuncs.c | 12 | ||||
-rw-r--r-- | src/backend/replication/walsender.c | 159 | ||||
-rw-r--r-- | src/backend/utils/activity/wait_event_names.txt | 1 | ||||
-rw-r--r-- | src/backend/utils/misc/guc_tables.c | 14 | ||||
-rw-r--r-- | src/backend/utils/misc/postgresql.conf.sample | 2 | ||||
-rw-r--r-- | src/include/replication/slot.h | 5 | ||||
-rw-r--r-- | src/include/replication/walsender.h | 1 | ||||
-rw-r--r-- | src/include/replication/walsender_private.h | 7 | ||||
-rw-r--r-- | src/include/utils/guc_hooks.h | 3 | ||||
-rw-r--r-- | src/test/recovery/t/006_logical_decoding.pl | 3 | ||||
-rw-r--r-- | src/test/recovery/t/040_standby_failover_slots_sync.pl | 244 | ||||
-rw-r--r-- | src/tools/pgindent/typedefs.list | 1 |
15 files changed, 855 insertions, 23 deletions
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index a1ff631e5ed..b4dd5cce75b 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -105,6 +105,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin MemoryContext per_query_ctx; MemoryContext oldcontext; XLogRecPtr end_of_wal; + XLogRecPtr wait_for_wal_lsn; LogicalDecodingContext *ctx; ResourceOwner old_resowner = CurrentResourceOwner; ArrayType *arr; @@ -224,6 +225,17 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin NameStr(MyReplicationSlot->data.plugin), format_procedure(fcinfo->flinfo->fn_oid)))); + /* + * Wait for specified streaming replication standby servers (if any) + * to confirm receipt of WAL up to wait_for_wal_lsn. + */ + if (XLogRecPtrIsInvalid(upto_lsn)) + wait_for_wal_lsn = end_of_wal; + else + wait_for_wal_lsn = Min(upto_lsn, end_of_wal); + + WaitForStandbyConfirmation(wait_for_wal_lsn); + ctx->output_writer_private = p; /* diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index ad0fc6a04b6..5074c8409f7 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -488,6 +488,10 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) latestFlushPtr = GetStandbyFlushRecPtr(NULL); if (remote_slot->confirmed_lsn > latestFlushPtr) { + /* + * Can get here only if GUC 'standby_slot_names' on the primary server + * was not configured correctly. + */ ereport(AmLogicalSlotSyncWorkerProcess() ? 
LOG : ERROR, errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("skipping slot synchronization as the received slot sync" @@ -857,6 +861,13 @@ validate_remote_info(WalReceiverConn *wrconn) remote_in_recovery = DatumGetBool(slot_getattr(tupslot, 1, &isnull)); Assert(!isnull); + /* + * Slot sync is currently not supported on a cascading standby. This is + * because if we allow it, the primary server needs to wait for all the + * cascading standbys, otherwise, logical subscribers can still be ahead + * of one of the cascading standbys which we plan to promote. Thus, to + * avoid this additional complexity, we restrict it for the time being. + */ if (remote_in_recovery) ereport(ERROR, errcode(ERRCODE_FEATURE_NOT_SUPPORTED), diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 2614f98ddd2..b8bf98b1822 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -46,13 +46,17 @@ #include "common/string.h" #include "miscadmin.h" #include "pgstat.h" +#include "postmaster/interrupt.h" #include "replication/slotsync.h" #include "replication/slot.h" +#include "replication/walsender_private.h" #include "storage/fd.h" #include "storage/ipc.h" #include "storage/proc.h" #include "storage/procarray.h" #include "utils/builtins.h" +#include "utils/guc_hooks.h" +#include "utils/varlena.h" /* * Replication slot on-disk data structure. @@ -78,6 +82,24 @@ typedef struct ReplicationSlotOnDisk } ReplicationSlotOnDisk; /* + * Struct for the configuration of standby_slot_names. + * + * Note: this must be a flat representation that can be held in a single chunk + * of guc_malloc'd memory, so that it can be stored as the "extra" data for the + * standby_slot_names GUC. + */ +typedef struct +{ + /* Number of slot names in the slot_names[] */ + int nslotnames; + + /* + * slot_names contains 'nslotnames' consecutive null-terminated C strings. 
+ */ + char slot_names[FLEXIBLE_ARRAY_MEMBER]; +} StandbySlotNamesConfigData; + +/* * Lookup table for slot invalidation causes. */ const char *const SlotInvalidationCauses[] = { @@ -115,10 +137,25 @@ ReplicationSlotCtlData *ReplicationSlotCtl = NULL; /* My backend's replication slot in the shared memory array */ ReplicationSlot *MyReplicationSlot = NULL; -/* GUC variable */ +/* GUC variables */ int max_replication_slots = 10; /* the maximum number of replication * slots */ +/* + * This GUC lists streaming replication standby server slot names that + * logical WAL sender processes will wait for. + */ +char *standby_slot_names; + +/* This is the parsed and cached configuration for standby_slot_names */ +static StandbySlotNamesConfigData *standby_slot_names_config; + +/* + * Oldest LSN that has been confirmed to be flushed to the standbys + * corresponding to the physical slots specified in the standby_slot_names GUC. + */ +static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr; + static void ReplicationSlotShmemExit(int code, Datum arg); static void ReplicationSlotDropPtr(ReplicationSlot *slot); @@ -2345,3 +2382,367 @@ GetSlotInvalidationCause(const char *conflict_reason) Assert(found); return result; } + +/* + * A helper function to validate slots specified in GUC standby_slot_names. + * + * The rawname will be parsed, and the result will be saved into *elemlist. + */ +static bool +validate_standby_slots(char *rawname, List **elemlist) +{ + bool ok; + + /* Verify syntax and parse string into a list of identifiers */ + ok = SplitIdentifierString(rawname, ',', elemlist); + + if (!ok) + { + GUC_check_errdetail("List syntax is invalid."); + } + else if (!ReplicationSlotCtl) + { + /* + * We cannot validate the replication slot if the replication slots' + * data has not been initialized. This is ok as we will anyway + * validate the specified slot when waiting for them to catch up. See + * StandbySlotsHaveCaughtup() for details. 
+ */ + } + else + { + /* Check that the specified slots exist and are physical slots */ + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + + foreach_ptr(char, name, *elemlist) + { + ReplicationSlot *slot; + + slot = SearchNamedReplicationSlot(name, false); + + if (!slot) + { + GUC_check_errdetail("replication slot \"%s\" does not exist", + name); + ok = false; + break; + } + + if (!SlotIsPhysical(slot)) + { + GUC_check_errdetail("\"%s\" is not a physical replication slot", + name); + ok = false; + break; + } + } + + LWLockRelease(ReplicationSlotControlLock); + } + + return ok; +} + +/* + * GUC check_hook for standby_slot_names + */ +bool +check_standby_slot_names(char **newval, void **extra, GucSource source) +{ + char *rawname; + char *ptr; + List *elemlist; + int size; + bool ok; + StandbySlotNamesConfigData *config; + + if ((*newval)[0] == '\0') + return true; + + /* Need a modifiable copy of the GUC string */ + rawname = pstrdup(*newval); + + /* Now verify if the specified slots exist and have correct type */ + ok = validate_standby_slots(rawname, &elemlist); + + if (!ok || elemlist == NIL) + { + pfree(rawname); + list_free(elemlist); + return ok; + } + + /* Compute the size required for the StandbySlotNamesConfigData struct */ + size = offsetof(StandbySlotNamesConfigData, slot_names); + foreach_ptr(char, slot_name, elemlist) + size += strlen(slot_name) + 1; + + /* GUC extra value must be guc_malloc'd, not palloc'd */ + config = (StandbySlotNamesConfigData *) guc_malloc(LOG, size); + + /* Transform the data into StandbySlotNamesConfigData */ + config->nslotnames = list_length(elemlist); + + ptr = config->slot_names; + foreach_ptr(char, slot_name, elemlist) + { + strcpy(ptr, slot_name); + ptr += strlen(slot_name) + 1; + } + + *extra = (void *) config; + + pfree(rawname); + list_free(elemlist); + return true; +} + +/* + * GUC assign_hook for standby_slot_names + */ +void +assign_standby_slot_names(const char *newval, void *extra) +{ + /* + * The standby 
slots may have changed, so we must recompute the oldest + * LSN. + */ + ss_oldest_flush_lsn = InvalidXLogRecPtr; + + standby_slot_names_config = (StandbySlotNamesConfigData *) extra; +} + +/* + * Check if the passed slot_name is specified in the standby_slot_names GUC. + */ +bool +SlotExistsInStandbySlotNames(const char *slot_name) +{ + const char *standby_slot_name; + + /* Return false if there is no value in standby_slot_names */ + if (standby_slot_names_config == NULL) + return false; + + /* + * XXX: We are not expecting this list to be long so a linear search + * shouldn't hurt but if that turns out not to be true then we can cache + * this information for each WalSender as well. + */ + standby_slot_name = standby_slot_names_config->slot_names; + for (int i = 0; i < standby_slot_names_config->nslotnames; i++) + { + if (strcmp(standby_slot_name, slot_name) == 0) + return true; + + standby_slot_name += strlen(standby_slot_name) + 1; + } + + return false; +} + +/* + * Return true if the slots specified in standby_slot_names have caught up to + * the given WAL location, false otherwise. + * + * The elevel parameter specifies the error level used for logging messages + * related to slots that do not exist, are invalidated, or are inactive. + */ +bool +StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel) +{ + const char *name; + int caught_up_slot_num = 0; + XLogRecPtr min_restart_lsn = InvalidXLogRecPtr; + + /* + * Don't need to wait for the standbys to catch up if there is no value in + * standby_slot_names. + */ + if (standby_slot_names_config == NULL) + return true; + + /* + * Don't need to wait for the standbys to catch up if we are on a standby + * server, since we do not support syncing slots to cascading standbys. + */ + if (RecoveryInProgress()) + return true; + + /* + * Don't need to wait for the standbys to catch up if they are already + * beyond the specified WAL location. 
+ */ + if (!XLogRecPtrIsInvalid(ss_oldest_flush_lsn) && + ss_oldest_flush_lsn >= wait_for_lsn) + return true; + + /* + * To prevent concurrent slot dropping and creation while filtering the + * slots, take the ReplicationSlotControlLock outside of the loop. + */ + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + + name = standby_slot_names_config->slot_names; + for (int i = 0; i < standby_slot_names_config->nslotnames; i++) + { + XLogRecPtr restart_lsn; + bool invalidated; + bool inactive; + ReplicationSlot *slot; + + slot = SearchNamedReplicationSlot(name, false); + + if (!slot) + { + /* + * If a slot name provided in standby_slot_names does not exist, + * report a message and exit the loop. A user can specify a slot + * name that does not exist just before the server startup. The + * GUC check_hook(validate_standby_slots) cannot validate such a + * slot during startup as the ReplicationSlotCtl shared memory is + * not initialized at that time. It is also possible for a user to + * drop the slot in standby_slot_names afterwards. + */ + ereport(elevel, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("replication slot \"%s\" specified in parameter %s does not exist", + name, "standby_slot_names"), + errdetail("Logical replication is waiting on the standby associated with \"%s\".", + name), + errhint("Consider creating the slot \"%s\" or amend parameter %s.", + name, "standby_slot_names")); + break; + } + + if (SlotIsLogical(slot)) + { + /* + * If a logical slot name is provided in standby_slot_names, + * report a message and exit the loop. Similar to the non-existent + * case, a user can specify a logical slot name in + * standby_slot_names before the server startup, or drop an + * existing physical slot and recreate a logical slot with the + * same name. 
+ */ + ereport(elevel, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot have logical replication slot \"%s\" in parameter %s", + name, "standby_slot_names"), + errdetail("Logical replication is waiting for correction on \"%s\".", + name), + errhint("Consider removing logical slot \"%s\" from parameter %s.", + name, "standby_slot_names")); + break; + } + + SpinLockAcquire(&slot->mutex); + restart_lsn = slot->data.restart_lsn; + invalidated = slot->data.invalidated != RS_INVAL_NONE; + inactive = slot->active_pid == 0; + SpinLockRelease(&slot->mutex); + + if (invalidated) + { + /* Specified physical slot has been invalidated */ + ereport(elevel, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("physical slot \"%s\" specified in parameter %s has been invalidated", + name, "standby_slot_names"), + errdetail("Logical replication is waiting on the standby associated with \"%s\".", + name), + errhint("Consider dropping and recreating the slot \"%s\" or amend parameter %s.", + name, "standby_slot_names")); + break; + } + + if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn < wait_for_lsn) + { + /* Log a message if no active_pid for this physical slot */ + if (inactive) + ereport(elevel, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("replication slot \"%s\" specified in parameter %s does not have active_pid", + name, "standby_slot_names"), + errdetail("Logical replication is waiting on the standby associated with \"%s\".", + name), + errhint("Consider starting standby associated with \"%s\" or amend parameter %s.", + name, "standby_slot_names")); + + /* Stop checking if the current slot hasn't caught up. 
*/ + break; + } + + Assert(restart_lsn >= wait_for_lsn); + + if (XLogRecPtrIsInvalid(min_restart_lsn) || + min_restart_lsn > restart_lsn) + min_restart_lsn = restart_lsn; + + caught_up_slot_num++; + + name += strlen(name) + 1; + } + + LWLockRelease(ReplicationSlotControlLock); + + /* + * Return false if not all the standbys have caught up to the specified + * WAL location. + */ + if (caught_up_slot_num != standby_slot_names_config->nslotnames) + return false; + + /* The ss_oldest_flush_lsn must not retreat. */ + Assert(XLogRecPtrIsInvalid(ss_oldest_flush_lsn) || + min_restart_lsn >= ss_oldest_flush_lsn); + + ss_oldest_flush_lsn = min_restart_lsn; + + return true; +} + +/* + * Wait for physical standbys to confirm receiving the given lsn. + * + * Used by logical decoding SQL functions. It waits for physical standbys + * corresponding to the physical slots specified in the standby_slot_names GUC. + */ +void +WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn) +{ + /* + * Don't need to wait for the standby to catch up if the current acquired + * slot is not a logical failover slot, or there is no value in + * standby_slot_names. + */ + if (!MyReplicationSlot->data.failover || !standby_slot_names_config) + return; + + ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv); + + for (;;) + { + CHECK_FOR_INTERRUPTS(); + + if (ConfigReloadPending) + { + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + } + + /* Exit if done waiting for every slot. */ + if (StandbySlotsHaveCaughtup(wait_for_lsn, WARNING)) + break; + + /* + * Wait for the slots in the standby_slot_names to catch up, but use a + * timeout (1s) so we can also check if the standby_slot_names has + * been changed. 
+ */ + ConditionVariableTimedSleep(&WalSndCtl->wal_confirm_rcv_cv, 1000, + WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION); + } + + ConditionVariableCancelSleep(); +} diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 768a304723b..ad79e1fccd6 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -464,6 +464,12 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto) * crash, but this makes the data consistent after a clean shutdown. */ ReplicationSlotMarkDirty(); + + /* + * Wake up logical walsenders holding logical failover slots after + * updating the restart_lsn of the physical slot. + */ + PhysicalWakeupLogicalWalSnd(); } return retlsn; @@ -505,6 +511,12 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto) NULL, NULL, NULL); /* + * Wait for specified streaming replication standby servers (if any) + * to confirm receipt of WAL up to moveto lsn. + */ + WaitForStandbyConfirmation(moveto); + + /* * Start reading at the slot's restart_lsn, which we know to point to * a valid record. */ diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 0f1047179cb..25edb5e1412 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1726,25 +1726,109 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId } /* + * Wake up the logical walsender processes with logical failover slots if the + * currently acquired physical slot is specified in standby_slot_names GUC. + */ +void +PhysicalWakeupLogicalWalSnd(void) +{ + Assert(MyReplicationSlot && SlotIsPhysical(MyReplicationSlot)); + + /* + * If we are running in a standby, there is no need to wake up walsenders. + * This is because we do not support syncing slots to cascading standbys, + * so, there are no walsenders waiting for standbys to catch up. 
+ */ + if (RecoveryInProgress()) + return; + + if (SlotExistsInStandbySlotNames(NameStr(MyReplicationSlot->data.name))) + ConditionVariableBroadcast(&WalSndCtl->wal_confirm_rcv_cv); +} + +/* + * Returns true if not all standbys have caught up to the flushed position + * (flushed_lsn) when the current acquired slot is a logical failover + * slot and we are streaming; otherwise, returns false. + * + * If returning true, the function sets the appropriate wait event in + * wait_event; otherwise, wait_event is set to 0. + */ +static bool +NeedToWaitForStandbys(XLogRecPtr flushed_lsn, uint32 *wait_event) +{ + int elevel = got_STOPPING ? ERROR : WARNING; + bool failover_slot; + + failover_slot = (replication_active && MyReplicationSlot->data.failover); + + /* + * Note that after receiving the shutdown signal, an ERROR is reported if + * any slots are dropped, invalidated, or inactive. This measure is taken + * to prevent the walsender from waiting indefinitely. + */ + if (failover_slot && !StandbySlotsHaveCaughtup(flushed_lsn, elevel)) + { + *wait_event = WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION; + return true; + } + + *wait_event = 0; + return false; +} + +/* + * Returns true if we need to wait for WALs to be flushed to disk, or if not + * all standbys have caught up to the flushed position (flushed_lsn) when the + * current acquired slot is a logical failover slot and we are + * streaming; otherwise, returns false. + * + * If returning true, the function sets the appropriate wait event in + * wait_event; otherwise, wait_event is set to 0. 
+ */ +static bool +NeedToWaitForWal(XLogRecPtr target_lsn, XLogRecPtr flushed_lsn, + uint32 *wait_event) +{ + /* Check if we need to wait for WALs to be flushed to disk */ + if (target_lsn > flushed_lsn) + { + *wait_event = WAIT_EVENT_WAL_SENDER_WAIT_FOR_WAL; + return true; + } + + /* Check if the standby slots have caught up to the flushed position */ + return NeedToWaitForStandbys(flushed_lsn, wait_event); +} + +/* * Wait till WAL < loc is flushed to disk so it can be safely sent to client. * - * Returns end LSN of flushed WAL. Normally this will be >= loc, but - * if we detect a shutdown request (either from postmaster or client) - * we will return early, so caller must always check. + * If the walsender holds a logical failover slot, we also wait for all the + * specified streaming replication standby servers to confirm receipt of WAL + * up to RecentFlushPtr. It is beneficial to wait here for the confirmation + * up to RecentFlushPtr rather than waiting before transmitting each change + * to logical subscribers, which is already covered by RecentFlushPtr. + * + * Returns end LSN of flushed WAL. Normally this will be >= loc, but if we + * detect a shutdown request (either from postmaster or client) we will return + * early, so caller must always check. */ static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc) { int wakeEvents; + uint32 wait_event = 0; static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr; /* * Fast path to avoid acquiring the spinlock in case we already know we - * have enough WAL available. This is particularly interesting if we're - * far behind. + * have enough WAL available and all the standby servers have confirmed + * receipt of WAL up to RecentFlushPtr. This is particularly interesting + * if we're far behind. */ - if (RecentFlushPtr != InvalidXLogRecPtr && - loc <= RecentFlushPtr) + if (!XLogRecPtrIsInvalid(RecentFlushPtr) && + !NeedToWaitForWal(loc, RecentFlushPtr, &wait_event)) return RecentFlushPtr; /* Get a more recent flush pointer. 
*/ @@ -1753,8 +1837,14 @@ WalSndWaitForWal(XLogRecPtr loc) else RecentFlushPtr = GetXLogReplayRecPtr(NULL); + /* + * Within the loop, we wait for the necessary WALs to be flushed to disk + * first, followed by waiting for standbys to catch up if there are enough + * WALs (see NeedToWaitForWal()) or upon receiving the shutdown signal. + */ for (;;) { + bool wait_for_standby_at_stop = false; long sleeptime; /* Clear any already-pending wakeups */ @@ -1781,21 +1871,35 @@ WalSndWaitForWal(XLogRecPtr loc) if (got_STOPPING) XLogBackgroundFlush(); - /* Update our idea of the currently flushed position. */ - if (!RecoveryInProgress()) - RecentFlushPtr = GetFlushRecPtr(NULL); - else - RecentFlushPtr = GetXLogReplayRecPtr(NULL); + /* + * To avoid the scenario where standbys need to catch up to a newer + * WAL location in each iteration, we update our idea of the currently + * flushed position only if we are not waiting for standbys to catch + * up. + */ + if (wait_event != WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION) + { + if (!RecoveryInProgress()) + RecentFlushPtr = GetFlushRecPtr(NULL); + else + RecentFlushPtr = GetXLogReplayRecPtr(NULL); + } /* - * If postmaster asked us to stop, don't wait anymore. + * If postmaster asked us to stop and the standby slots have caught up + * to the flushed position, don't wait anymore. * * It's important to do this check after the recomputation of * RecentFlushPtr, so we can send all remaining data before shutting * down. */ if (got_STOPPING) - break; + { + if (NeedToWaitForStandbys(RecentFlushPtr, &wait_event)) + wait_for_standby_at_stop = true; + else + break; + } /* * We only send regular messages to the client for full decoded @@ -1810,11 +1914,18 @@ WalSndWaitForWal(XLogRecPtr loc) !waiting_for_ping_response) WalSndKeepalive(false, InvalidXLogRecPtr); - /* check whether we're done */ - if (loc <= RecentFlushPtr) + /* + * Exit the loop if already caught up and doesn't need to wait for + * standby slots. 
+ */ + if (!wait_for_standby_at_stop && + !NeedToWaitForWal(loc, RecentFlushPtr, &wait_event)) break; - /* Waiting for new WAL. Since we need to wait, we're now caught up. */ + /* + * Waiting for new WAL or waiting for standbys to catch up. Since we + * need to wait, we're now caught up. + */ WalSndCaughtUp = true; /* @@ -1852,7 +1963,9 @@ WalSndWaitForWal(XLogRecPtr loc) if (pq_is_send_pending()) wakeEvents |= WL_SOCKET_WRITEABLE; - WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_FOR_WAL); + Assert(wait_event != 0); + + WalSndWait(wakeEvents, sleeptime, wait_event); } /* reactivate latch so WalSndLoop knows to continue */ @@ -2262,6 +2375,7 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn) { ReplicationSlotMarkDirty(); ReplicationSlotsComputeRequiredLSN(); + PhysicalWakeupLogicalWalSnd(); } /* @@ -3535,6 +3649,7 @@ WalSndShmemInit(void) ConditionVariableInit(&WalSndCtl->wal_flush_cv); ConditionVariableInit(&WalSndCtl->wal_replay_cv); + ConditionVariableInit(&WalSndCtl->wal_confirm_rcv_cv); } } @@ -3604,8 +3719,14 @@ WalSndWait(uint32 socket_events, long timeout, uint32 wait_event) * * And, we use separate shared memory CVs for physical and logical * walsenders for selective wake ups, see WalSndWakeup() for more details. + * + * If the wait event is WAIT_FOR_STANDBY_CONFIRMATION, wait on another CV + * until awakened by physical walsenders after the walreceiver confirms + * the receipt of the LSN. 
*/ - if (MyWalSnd->kind == REPLICATION_KIND_PHYSICAL) + if (wait_event == WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION) + ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv); + else if (MyWalSnd->kind == REPLICATION_KIND_PHYSICAL) ConditionVariablePrepareToSleep(&WalSndCtl->wal_flush_cv); else if (MyWalSnd->kind == REPLICATION_KIND_LOGICAL) ConditionVariablePrepareToSleep(&WalSndCtl->wal_replay_cv); diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index ec2f31f82af..c08e00d1d6a 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -78,6 +78,7 @@ GSS_OPEN_SERVER "Waiting to read data from the client while establishing a GSSAP LIBPQWALRECEIVER_CONNECT "Waiting in WAL receiver to establish connection to remote server." LIBPQWALRECEIVER_RECEIVE "Waiting in WAL receiver to receive data from remote server." SSL_OPEN_SERVER "Waiting for SSL while attempting connection." +WAIT_FOR_STANDBY_CONFIRMATION "Waiting for WAL to be received and flushed by the physical standby." WAL_SENDER_WAIT_FOR_WAL "Waiting for WAL to be flushed in WAL sender process." WAL_SENDER_WRITE_DATA "Waiting for any activity when processing replies from WAL receiver in WAL sender process." 
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 45013582a74..d77214795de 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -4670,6 +4670,20 @@ struct config_string ConfigureNamesString[] = check_debug_io_direct, assign_debug_io_direct, NULL }, + { + {"standby_slot_names", PGC_SIGHUP, REPLICATION_PRIMARY, + gettext_noop("Lists streaming replication standby server slot " + "names that logical WAL sender processes will wait for."), + gettext_noop("Logical WAL sender processes will send decoded " + "changes to plugins only after the specified " + "replication slots confirm receiving WAL."), + GUC_LIST_INPUT + }, + &standby_slot_names, + "", + check_standby_slot_names, assign_standby_slot_names, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, NULL, NULL, NULL, NULL diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index edcc0282b2d..2244ee52f79 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -343,6 +343,8 @@ # method to choose sync standbys, number of sync standbys, # and comma-separated list of application_name # from standby(s); '*' = all +#standby_slot_names = '' # streaming replication standby server slot names that + # logical walsender processes will wait for # - Standby Servers - diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index acbf567150e..425effad210 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -226,6 +226,7 @@ extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot; /* GUCs */ extern PGDLLIMPORT int max_replication_slots; +extern PGDLLIMPORT char *standby_slot_names; /* shmem initialization functions */ extern Size ReplicationSlotsShmemSize(void); @@ -274,4 +275,8 @@ extern void CheckSlotPermissions(void); extern ReplicationSlotInvalidationCause 
GetSlotInvalidationCause(const char *conflict_reason); +extern bool SlotExistsInStandbySlotNames(const char *slot_name); +extern bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel); +extern void WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn); + #endif /* SLOT_H */ diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h index 0c3996e9263..f2d8297f016 100644 --- a/src/include/replication/walsender.h +++ b/src/include/replication/walsender.h @@ -39,6 +39,7 @@ extern void InitWalSender(void); extern bool exec_replication_command(const char *cmd_string); extern void WalSndErrorCleanup(void); extern void WalSndResourceCleanup(bool isCommit); +extern void PhysicalWakeupLogicalWalSnd(void); extern XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli); extern void WalSndSignals(void); extern Size WalSndShmemSize(void); diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index 3113e9ea470..109924ffcdc 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -113,6 +113,13 @@ typedef struct ConditionVariable wal_flush_cv; ConditionVariable wal_replay_cv; + /* + * Used by physical walsenders holding slots specified in + * standby_slot_names to wake up logical walsenders holding logical + * failover slots when a walreceiver confirms the receipt of LSN. 
+ */ + ConditionVariable wal_confirm_rcv_cv; + WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]; } WalSndCtlData; diff --git a/src/include/utils/guc_hooks.h b/src/include/utils/guc_hooks.h index c8a7aa9a112..d64dc5fcdb0 100644 --- a/src/include/utils/guc_hooks.h +++ b/src/include/utils/guc_hooks.h @@ -174,5 +174,8 @@ extern bool check_wal_consistency_checking(char **newval, void **extra, extern void assign_wal_consistency_checking(const char *newval, void *extra); extern bool check_wal_segment_size(int *newval, void **extra, GucSource source); extern void assign_wal_sync_method(int new_wal_sync_method, void *extra); +extern bool check_standby_slot_names(char **newval, void **extra, + GucSource source); +extern void assign_standby_slot_names(const char *newval, void *extra); #endif /* GUC_HOOKS_H */ diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl index 5c7b4ca5e36..b95d95c06f9 100644 --- a/src/test/recovery/t/006_logical_decoding.pl +++ b/src/test/recovery/t/006_logical_decoding.pl @@ -172,9 +172,10 @@ is($node_primary->slot('otherdb_slot')->{'slot_name'}, undef, 'logical slot was actually dropped with DB'); # Test logical slot advancing and its durability. +# Passing failover=true (last arg) should not have any impact on advancing. 
my $logical_slot = 'logical_slot'; $node_primary->safe_psql('postgres', - "SELECT pg_create_logical_replication_slot('$logical_slot', 'test_decoding', false);" + "SELECT pg_create_logical_replication_slot('$logical_slot', 'test_decoding', false, false, true);" ); $node_primary->psql( 'postgres', " diff --git a/src/test/recovery/t/040_standby_failover_slots_sync.pl b/src/test/recovery/t/040_standby_failover_slots_sync.pl index 021c58f621c..0ea1f3d323c 100644 --- a/src/test/recovery/t/040_standby_failover_slots_sync.pl +++ b/src/test/recovery/t/040_standby_failover_slots_sync.pl @@ -505,17 +505,257 @@ ok( $standby1->poll_query_until( 'restart_lsn and confirmed_flush_lsn of slot lsub1_slot synced to standby'); ################################################## +# Test that logical failover replication slots wait for the specified +# physical replication slots to receive the changes first. It uses the +# following set up: +# +# (physical standbys) +# | ----> standby1 (primary_slot_name = sb1_slot) +# | ----> standby2 (primary_slot_name = sb2_slot) +# primary ----- | +# (logical replication) +# | ----> subscriber1 (failover = true, slot_name = lsub1_slot) +# | ----> subscriber2 (failover = false, slot_name = lsub2_slot) +# +# standby_slot_names = 'sb1_slot' +# +# The setup is configured in such a way that the logical slot of subscriber1 is +# enabled for failover, and thus the subscriber1 will wait for the physical +# slot of standby1(sb1_slot) to catch up before receiving the decoded changes. 
+################################################## + +$backup_name = 'backup3'; + +$primary->psql('postgres', + q{SELECT pg_create_physical_replication_slot('sb2_slot');}); + +$primary->backup($backup_name); + +# Create another standby +my $standby2 = PostgreSQL::Test::Cluster->new('standby2'); +$standby2->init_from_backup( + $primary, $backup_name, + has_streaming => 1, + has_restoring => 1); +$standby2->append_conf( + 'postgresql.conf', qq( +primary_slot_name = 'sb2_slot' +)); +$standby2->start; +$primary->wait_for_replay_catchup($standby2); + +# Configure primary to disallow any logical slots that have enabled failover +# from getting ahead of the specified physical replication slot (sb1_slot). +$primary->append_conf( + 'postgresql.conf', qq( +standby_slot_names = 'sb1_slot' +)); +$primary->reload; + +# Create another subscriber node without enabling failover, wait for sync to +# complete +my $subscriber2 = PostgreSQL::Test::Cluster->new('subscriber2'); +$subscriber2->init; +$subscriber2->start; +$subscriber2->safe_psql( + 'postgres', qq[ + CREATE TABLE tab_int (a int PRIMARY KEY); + CREATE SUBSCRIPTION regress_mysub2 CONNECTION '$publisher_connstr' PUBLICATION regress_mypub WITH (slot_name = lsub2_slot); +]); + +$subscriber2->wait_for_subscription_sync; + +$subscriber1->safe_psql('postgres', "ALTER SUBSCRIPTION regress_mysub1 ENABLE"); + +my $offset = -s $primary->logfile; + +# Stop the standby associated with the specified physical replication slot +# (sb1_slot) so that the logical replication slot (lsub1_slot) won't receive +# changes until the standby comes up. 
+$standby1->stop; + +# Create some data on the primary +my $primary_row_count = 20; +$primary->safe_psql('postgres', + "INSERT INTO tab_int SELECT generate_series(11, $primary_row_count);"); + +# Wait until the standby2 that's still running gets the data from the primary +$primary->wait_for_replay_catchup($standby2); +$result = $standby2->safe_psql('postgres', + "SELECT count(*) = $primary_row_count FROM tab_int;"); +is($result, 't', "standby2 gets data from primary"); + +# Wait for regress_mysub2 to get the data from the primary. This subscription +# was not enabled for failover so it gets the data without waiting for any +# standbys. +$primary->wait_for_catchup('regress_mysub2'); +$result = $subscriber2->safe_psql('postgres', + "SELECT count(*) = $primary_row_count FROM tab_int;"); +is($result, 't', "subscriber2 gets data from primary"); + +# Wait until the primary server logs a warning indicating that it is waiting +# for the sb1_slot to catch up. +$primary->wait_for_log( + qr/replication slot \"sb1_slot\" specified in parameter standby_slot_names does not have active_pid/, + $offset); + +# The regress_mysub1 was enabled for failover so it doesn't get the data from +# primary and keeps waiting for the standby specified in standby_slot_names +# (sb1_slot aka standby1). +$result = + $subscriber1->safe_psql('postgres', "SELECT count(*) <> $primary_row_count FROM tab_int;"); +is($result, 't', + "subscriber1 doesn't get data from primary until standby1 acknowledges changes" +); + +# Start the standby specified in standby_slot_names (sb1_slot aka standby1) and +# wait for it to catch up with the primary. 
+$standby1->start; +$primary->wait_for_replay_catchup($standby1); +$result = $standby1->safe_psql('postgres', + "SELECT count(*) = $primary_row_count FROM tab_int;"); +is($result, 't', "standby1 gets data from primary"); + +# Now that the standby specified in standby_slot_names is up and running, the +# primary can send the decoded changes to the subscription enabled for failover +# (i.e. regress_mysub1). While the standby was down, regress_mysub1 didn't +# receive any data from the primary, i.e. the primary didn't allow it to get +# ahead of the standby. +$primary->wait_for_catchup('regress_mysub1'); +$result = $subscriber1->safe_psql('postgres', + "SELECT count(*) = $primary_row_count FROM tab_int;"); +is($result, 't', + "subscriber1 gets data from primary after standby1 acknowledges changes"); + +################################################## +# Verify that when using pg_logical_slot_get_changes to consume changes from a +# logical failover slot, it will also wait for the slots specified in +# standby_slot_names to catch up. +################################################## + +# Stop the standby associated with the specified physical replication slot so +# that the logical replication slot won't receive changes until the standby +# slot's restart_lsn is advanced or the slot is removed from the +# standby_slot_names list. +$primary->safe_psql('postgres', "TRUNCATE tab_int;"); +$primary->wait_for_catchup('regress_mysub1'); +$standby1->stop; + +# Disable the regress_mysub1 to prevent the logical walsender from generating +# more warnings. 
+$subscriber1->safe_psql('postgres', "ALTER SUBSCRIPTION regress_mysub1 DISABLE"); + +# Wait for the replication slot to become inactive on the publisher +$primary->poll_query_until( + 'postgres', + "SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'lsub1_slot' AND active = 'f'", + 1); + +# Create a logical 'test_decoding' replication slot with failover enabled +$primary->safe_psql('postgres', + "SELECT pg_create_logical_replication_slot('test_slot', 'test_decoding', false, false, true);" +); + +my $back_q = $primary->background_psql( + 'postgres', + on_error_stop => 0, + timeout => $PostgreSQL::Test::Utils::timeout_default); + +# pg_logical_slot_get_changes will be blocked until the standby catches up, +# hence it needs to be executed in a background session. +$offset = -s $primary->logfile; +$back_q->query_until( + qr/logical_slot_get_changes/, q( + \echo logical_slot_get_changes + SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL); +)); + +# Wait until the primary server logs a warning indicating that it is waiting +# for the sb1_slot to catch up. +$primary->wait_for_log( + qr/replication slot \"sb1_slot\" specified in parameter standby_slot_names does not have active_pid/, + $offset); + +# Remove the standby from the standby_slot_names list and reload the +# configuration. +$primary->adjust_conf('postgresql.conf', 'standby_slot_names', "''"); +$primary->reload; + +# Since there are no slots in standby_slot_names, the function +# pg_logical_slot_get_changes should now return, and the session can be +# stopped. +$back_q->quit; + +$primary->safe_psql('postgres', + "SELECT pg_drop_replication_slot('test_slot');" +); + +# Add the physical slot (sb1_slot) back to the standby_slot_names for further +# tests. 
+$primary->adjust_conf('postgresql.conf', 'standby_slot_names', "'sb1_slot'"); +$primary->reload; + +# Enable the regress_mysub1 for further tests +$subscriber1->safe_psql('postgres', "ALTER SUBSCRIPTION regress_mysub1 ENABLE"); + +################################################## +# Test that logical replication will wait for the user-created inactive +# physical slot to catch up until we remove the slot from standby_slot_names. +################################################## + +$offset = -s $primary->logfile; + +# Create some data on the primary +$primary_row_count = 10; +$primary->safe_psql('postgres', + "INSERT INTO tab_int SELECT generate_series(1, $primary_row_count);"); + +# Wait until the primary server logs a warning indicating that it is waiting +# for the sb1_slot to catch up. +$primary->wait_for_log( + qr/replication slot \"sb1_slot\" specified in parameter standby_slot_names does not have active_pid/, + $offset); + +# The regress_mysub1 doesn't get the data from primary because the specified +# standby slot (sb1_slot) in standby_slot_names is inactive. +$result = + $subscriber1->safe_psql('postgres', "SELECT count(*) = 0 FROM tab_int;"); +is($result, 't', + "subscriber1 doesn't get data as the sb1_slot doesn't catch up"); + +# Remove the standby from the standby_slot_names list and reload the +# configuration. +$primary->adjust_conf('postgresql.conf', 'standby_slot_names', "''"); +$primary->reload; + +# Since there are no slots in standby_slot_names, the primary server should now +# send the decoded changes to the subscription. +$primary->wait_for_catchup('regress_mysub1'); +$result = $subscriber1->safe_psql('postgres', + "SELECT count(*) = $primary_row_count FROM tab_int;"); +is($result, 't', + "subscriber1 gets data from primary after standby1 is removed from the standby_slot_names list" +); + +# Add the physical slot (sb1_slot) back to the standby_slot_names for further +# tests. 
+$primary->adjust_conf('postgresql.conf', 'standby_slot_names', "'sb1_slot'"); +$primary->reload; + +################################################## # Promote the standby1 to primary. Confirm that: # a) the slot 'lsub1_slot' is retained on the new primary # b) logical replication for regress_mysub1 is resumed successfully after failover ################################################## +$standby1->start; +$primary->wait_for_replay_catchup($standby1); + $standby1->promote; # Update subscription with the new primary's connection info my $standby1_conninfo = $standby1->connstr . ' dbname=postgres'; $subscriber1->safe_psql('postgres', - "ALTER SUBSCRIPTION regress_mysub1 CONNECTION '$standby1_conninfo'; - ALTER SUBSCRIPTION regress_mysub1 ENABLE; "); + "ALTER SUBSCRIPTION regress_mysub1 CONNECTION '$standby1_conninfo';"); # Confirm the synced slot 'lsub1_slot' is retained on the new primary is($standby1->safe_psql('postgres', diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index cc3611e6068..d3a7f75b080 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2650,6 +2650,7 @@ SplitPoint SplitTextOutputData SplitVar StackElem +StandbySlotNamesConfigData StartDataPtrType StartLOPtrType StartLOsPtrType |