aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/logical/logicalfuncs.c12
-rw-r--r--src/backend/replication/logical/slotsync.c11
-rw-r--r--src/backend/replication/slot.c403
-rw-r--r--src/backend/replication/slotfuncs.c12
-rw-r--r--src/backend/replication/walsender.c159
-rw-r--r--src/backend/utils/activity/wait_event_names.txt1
-rw-r--r--src/backend/utils/misc/guc_tables.c14
-rw-r--r--src/backend/utils/misc/postgresql.conf.sample2
-rw-r--r--src/include/replication/slot.h5
-rw-r--r--src/include/replication/walsender.h1
-rw-r--r--src/include/replication/walsender_private.h7
-rw-r--r--src/include/utils/guc_hooks.h3
-rw-r--r--src/test/recovery/t/006_logical_decoding.pl3
-rw-r--r--src/test/recovery/t/040_standby_failover_slots_sync.pl244
-rw-r--r--src/tools/pgindent/typedefs.list1
15 files changed, 855 insertions, 23 deletions
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index a1ff631e5ed..b4dd5cce75b 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -105,6 +105,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
MemoryContext per_query_ctx;
MemoryContext oldcontext;
XLogRecPtr end_of_wal;
+ XLogRecPtr wait_for_wal_lsn;
LogicalDecodingContext *ctx;
ResourceOwner old_resowner = CurrentResourceOwner;
ArrayType *arr;
@@ -224,6 +225,17 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
NameStr(MyReplicationSlot->data.plugin),
format_procedure(fcinfo->flinfo->fn_oid))));
+ /*
+ * Wait for specified streaming replication standby servers (if any)
+ * to confirm receipt of WAL up to wait_for_wal_lsn.
+ */
+ if (XLogRecPtrIsInvalid(upto_lsn))
+ wait_for_wal_lsn = end_of_wal;
+ else
+ wait_for_wal_lsn = Min(upto_lsn, end_of_wal);
+
+ WaitForStandbyConfirmation(wait_for_wal_lsn);
+
ctx->output_writer_private = p;
/*
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index ad0fc6a04b6..5074c8409f7 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -488,6 +488,10 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
latestFlushPtr = GetStandbyFlushRecPtr(NULL);
if (remote_slot->confirmed_lsn > latestFlushPtr)
{
+ /*
+ * Can get here only if GUC 'standby_slot_names' on the primary server
+ * was not configured correctly.
+ */
ereport(AmLogicalSlotSyncWorkerProcess() ? LOG : ERROR,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("skipping slot synchronization as the received slot sync"
@@ -857,6 +861,13 @@ validate_remote_info(WalReceiverConn *wrconn)
remote_in_recovery = DatumGetBool(slot_getattr(tupslot, 1, &isnull));
Assert(!isnull);
+ /*
+ * Slot sync is currently not supported on a cascading standby. This is
+ * because if we allow it, the primary server needs to wait for all the
+ * cascading standbys, otherwise, logical subscribers can still be ahead
+ * of one of the cascading standbys which we plan to promote. Thus, to
+ * avoid this additional complexity, we restrict it for the time being.
+ */
if (remote_in_recovery)
ereport(ERROR,
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 2614f98ddd2..b8bf98b1822 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -46,13 +46,17 @@
#include "common/string.h"
#include "miscadmin.h"
#include "pgstat.h"
+#include "postmaster/interrupt.h"
#include "replication/slotsync.h"
#include "replication/slot.h"
+#include "replication/walsender_private.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "utils/builtins.h"
+#include "utils/guc_hooks.h"
+#include "utils/varlena.h"
/*
* Replication slot on-disk data structure.
@@ -78,6 +82,24 @@ typedef struct ReplicationSlotOnDisk
} ReplicationSlotOnDisk;
/*
+ * Struct for the configuration of standby_slot_names.
+ *
+ * Note: this must be a flat representation that can be held in a single chunk
+ * of guc_malloc'd memory, so that it can be stored as the "extra" data for the
+ * standby_slot_names GUC.
+ */
+typedef struct
+{
+ /* Number of slot names in the slot_names[] */
+ int nslotnames;
+
+ /*
+ * slot_names contains 'nslotnames' consecutive null-terminated C strings.
+ */
+ char slot_names[FLEXIBLE_ARRAY_MEMBER];
+} StandbySlotNamesConfigData;
+
+/*
* Lookup table for slot invalidation causes.
*/
const char *const SlotInvalidationCauses[] = {
@@ -115,10 +137,25 @@ ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
/* My backend's replication slot in the shared memory array */
ReplicationSlot *MyReplicationSlot = NULL;
-/* GUC variable */
+/* GUC variables */
int max_replication_slots = 10; /* the maximum number of replication
* slots */
+/*
+ * This GUC lists streaming replication standby server slot names that
+ * logical WAL sender processes will wait for.
+ */
+char *standby_slot_names;
+
+/* This is the parsed and cached configuration for standby_slot_names */
+static StandbySlotNamesConfigData *standby_slot_names_config;
+
+/*
+ * Oldest LSN that has been confirmed to be flushed to the standbys
+ * corresponding to the physical slots specified in the standby_slot_names GUC.
+ */
+static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr;
+
static void ReplicationSlotShmemExit(int code, Datum arg);
static void ReplicationSlotDropPtr(ReplicationSlot *slot);
@@ -2345,3 +2382,367 @@ GetSlotInvalidationCause(const char *conflict_reason)
Assert(found);
return result;
}
+
+/*
+ * A helper function to validate slots specified in GUC standby_slot_names.
+ *
+ * The rawname will be parsed, and the result will be saved into *elemlist.
+ */
+static bool
+validate_standby_slots(char *rawname, List **elemlist)
+{
+ bool ok;
+
+ /* Verify syntax and parse string into a list of identifiers */
+ ok = SplitIdentifierString(rawname, ',', elemlist);
+
+ if (!ok)
+ {
+ GUC_check_errdetail("List syntax is invalid.");
+ }
+ else if (!ReplicationSlotCtl)
+ {
+ /*
+ * We cannot validate the replication slot if the replication slots'
+ * data has not been initialized. This is ok as we will anyway
+ * validate the specified slot when waiting for them to catch up. See
+ * StandbySlotsHaveCaughtup() for details.
+ */
+ }
+ else
+ {
+ /* Check that the specified slots exist and are logical slots */
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+
+ foreach_ptr(char, name, *elemlist)
+ {
+ ReplicationSlot *slot;
+
+ slot = SearchNamedReplicationSlot(name, false);
+
+ if (!slot)
+ {
+ GUC_check_errdetail("replication slot \"%s\" does not exist",
+ name);
+ ok = false;
+ break;
+ }
+
+ if (!SlotIsPhysical(slot))
+ {
+ GUC_check_errdetail("\"%s\" is not a physical replication slot",
+ name);
+ ok = false;
+ break;
+ }
+ }
+
+ LWLockRelease(ReplicationSlotControlLock);
+ }
+
+ return ok;
+}
+
+/*
+ * GUC check_hook for standby_slot_names
+ */
+bool
+check_standby_slot_names(char **newval, void **extra, GucSource source)
+{
+ char *rawname;
+ char *ptr;
+ List *elemlist;
+ int size;
+ bool ok;
+ StandbySlotNamesConfigData *config;
+
+ if ((*newval)[0] == '\0')
+ return true;
+
+ /* Need a modifiable copy of the GUC string */
+ rawname = pstrdup(*newval);
+
+ /* Now verify if the specified slots exist and have correct type */
+ ok = validate_standby_slots(rawname, &elemlist);
+
+ if (!ok || elemlist == NIL)
+ {
+ pfree(rawname);
+ list_free(elemlist);
+ return ok;
+ }
+
+ /* Compute the size required for the StandbySlotNamesConfigData struct */
+ size = offsetof(StandbySlotNamesConfigData, slot_names);
+ foreach_ptr(char, slot_name, elemlist)
+ size += strlen(slot_name) + 1;
+
+ /* GUC extra value must be guc_malloc'd, not palloc'd */
+ config = (StandbySlotNamesConfigData *) guc_malloc(LOG, size);
+
+ /* Transform the data into StandbySlotNamesConfigData */
+ config->nslotnames = list_length(elemlist);
+
+ ptr = config->slot_names;
+ foreach_ptr(char, slot_name, elemlist)
+ {
+ strcpy(ptr, slot_name);
+ ptr += strlen(slot_name) + 1;
+ }
+
+ *extra = (void *) config;
+
+ pfree(rawname);
+ list_free(elemlist);
+ return true;
+}
+
+/*
+ * GUC assign_hook for standby_slot_names
+ */
+void
+assign_standby_slot_names(const char *newval, void *extra)
+{
+ /*
+ * The standby slots may have changed, so we must recompute the oldest
+ * LSN.
+ */
+ ss_oldest_flush_lsn = InvalidXLogRecPtr;
+
+ standby_slot_names_config = (StandbySlotNamesConfigData *) extra;
+}
+
+/*
+ * Check if the passed slot_name is specified in the standby_slot_names GUC.
+ */
+bool
+SlotExistsInStandbySlotNames(const char *slot_name)
+{
+ const char *standby_slot_name;
+
+ /* Return false if there is no value in standby_slot_names */
+ if (standby_slot_names_config == NULL)
+ return false;
+
+ /*
+ * XXX: We are not expecting this list to be long so a linear search
+ * shouldn't hurt but if that turns out not to be true then we can cache
+ * this information for each WalSender as well.
+ */
+ standby_slot_name = standby_slot_names_config->slot_names;
+ for (int i = 0; i < standby_slot_names_config->nslotnames; i++)
+ {
+ if (strcmp(standby_slot_name, slot_name) == 0)
+ return true;
+
+ standby_slot_name += strlen(standby_slot_name) + 1;
+ }
+
+ return false;
+}
+
+/*
+ * Return true if the slots specified in standby_slot_names have caught up to
+ * the given WAL location, false otherwise.
+ *
+ * The elevel parameter specifies the error level used for logging messages
+ * related to slots that do not exist, are invalidated, or are inactive.
+ */
+bool
+StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
+{
+ const char *name;
+ int caught_up_slot_num = 0;
+ XLogRecPtr min_restart_lsn = InvalidXLogRecPtr;
+
+ /*
+ * Don't need to wait for the standbys to catch up if there is no value in
+ * standby_slot_names.
+ */
+ if (standby_slot_names_config == NULL)
+ return true;
+
+ /*
+ * Don't need to wait for the standbys to catch up if we are on a standby
+ * server, since we do not support syncing slots to cascading standbys.
+ */
+ if (RecoveryInProgress())
+ return true;
+
+ /*
+ * Don't need to wait for the standbys to catch up if they are already
+ * beyond the specified WAL location.
+ */
+ if (!XLogRecPtrIsInvalid(ss_oldest_flush_lsn) &&
+ ss_oldest_flush_lsn >= wait_for_lsn)
+ return true;
+
+ /*
+ * To prevent concurrent slot dropping and creation while filtering the
+ * slots, take the ReplicationSlotControlLock outside of the loop.
+ */
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+
+ name = standby_slot_names_config->slot_names;
+ for (int i = 0; i < standby_slot_names_config->nslotnames; i++)
+ {
+ XLogRecPtr restart_lsn;
+ bool invalidated;
+ bool inactive;
+ ReplicationSlot *slot;
+
+ slot = SearchNamedReplicationSlot(name, false);
+
+ if (!slot)
+ {
+ /*
+ * If a slot name provided in standby_slot_names does not exist,
+ * report a message and exit the loop. A user can specify a slot
+ * name that does not exist just before the server startup. The
+ * GUC check_hook(validate_standby_slots) cannot validate such a
+ * slot during startup as the ReplicationSlotCtl shared memory is
+ * not initialized at that time. It is also possible for a user to
+ * drop the slot in standby_slot_names afterwards.
+ */
+ ereport(elevel,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("replication slot \"%s\" specified in parameter %s does not exist",
+ name, "standby_slot_names"),
+ errdetail("Logical replication is waiting on the standby associated with \"%s\".",
+ name),
+ errhint("Consider creating the slot \"%s\" or amend parameter %s.",
+ name, "standby_slot_names"));
+ break;
+ }
+
+ if (SlotIsLogical(slot))
+ {
+ /*
+ * If a logical slot name is provided in standby_slot_names,
+ * report a message and exit the loop. Similar to the non-existent
+ * case, a user can specify a logical slot name in
+ * standby_slot_names before the server startup, or drop an
+ * existing physical slot and recreate a logical slot with the
+ * same name.
+ */
+ ereport(elevel,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("cannot have logical replication slot \"%s\" in parameter %s",
+ name, "standby_slot_names"),
+ errdetail("Logical replication is waiting for correction on \"%s\".",
+ name),
+ errhint("Consider removing logical slot \"%s\" from parameter %s.",
+ name, "standby_slot_names"));
+ break;
+ }
+
+ SpinLockAcquire(&slot->mutex);
+ restart_lsn = slot->data.restart_lsn;
+ invalidated = slot->data.invalidated != RS_INVAL_NONE;
+ inactive = slot->active_pid == 0;
+ SpinLockRelease(&slot->mutex);
+
+ if (invalidated)
+ {
+ /* Specified physical slot has been invalidated */
+ ereport(elevel,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("physical slot \"%s\" specified in parameter %s has been invalidated",
+ name, "standby_slot_names"),
+ errdetail("Logical replication is waiting on the standby associated with \"%s\".",
+ name),
+ errhint("Consider dropping and recreating the slot \"%s\" or amend parameter %s.",
+ name, "standby_slot_names"));
+ break;
+ }
+
+ if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn < wait_for_lsn)
+ {
+ /* Log a message if no active_pid for this physical slot */
+ if (inactive)
+ ereport(elevel,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("replication slot \"%s\" specified in parameter %s does not have active_pid",
+ name, "standby_slot_names"),
+ errdetail("Logical replication is waiting on the standby associated with \"%s\".",
+ name),
+ errhint("Consider starting standby associated with \"%s\" or amend parameter %s.",
+ name, "standby_slot_names"));
+
+ /* Continue if the current slot hasn't caught up. */
+ break;
+ }
+
+ Assert(restart_lsn >= wait_for_lsn);
+
+ if (XLogRecPtrIsInvalid(min_restart_lsn) ||
+ min_restart_lsn > restart_lsn)
+ min_restart_lsn = restart_lsn;
+
+ caught_up_slot_num++;
+
+ name += strlen(name) + 1;
+ }
+
+ LWLockRelease(ReplicationSlotControlLock);
+
+ /*
+ * Return false if not all the standbys have caught up to the specified
+ * WAL location.
+ */
+ if (caught_up_slot_num != standby_slot_names_config->nslotnames)
+ return false;
+
+ /* The ss_oldest_flush_lsn must not retreat. */
+ Assert(XLogRecPtrIsInvalid(ss_oldest_flush_lsn) ||
+ min_restart_lsn >= ss_oldest_flush_lsn);
+
+ ss_oldest_flush_lsn = min_restart_lsn;
+
+ return true;
+}
+
+/*
+ * Wait for physical standbys to confirm receiving the given lsn.
+ *
+ * Used by logical decoding SQL functions. It waits for physical standbys
+ * corresponding to the physical slots specified in the standby_slot_names GUC.
+ */
+void
+WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
+{
+ /*
+ * Don't need to wait for the standby to catch up if the current acquired
+ * slot is not a logical failover slot, or there is no value in
+ * standby_slot_names.
+ */
+ if (!MyReplicationSlot->data.failover || !standby_slot_names_config)
+ return;
+
+ ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
+
+ for (;;)
+ {
+ CHECK_FOR_INTERRUPTS();
+
+ if (ConfigReloadPending)
+ {
+ ConfigReloadPending = false;
+ ProcessConfigFile(PGC_SIGHUP);
+ }
+
+ /* Exit if done waiting for every slot. */
+ if (StandbySlotsHaveCaughtup(wait_for_lsn, WARNING))
+ break;
+
+ /*
+ * Wait for the slots in the standby_slot_names to catch up, but use a
+ * timeout (1s) so we can also check if the standby_slot_names has
+ * been changed.
+ */
+ ConditionVariableTimedSleep(&WalSndCtl->wal_confirm_rcv_cv, 1000,
+ WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION);
+ }
+
+ ConditionVariableCancelSleep();
+}
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 768a304723b..ad79e1fccd6 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -464,6 +464,12 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto)
* crash, but this makes the data consistent after a clean shutdown.
*/
ReplicationSlotMarkDirty();
+
+ /*
+ * Wake up logical walsenders holding logical failover slots after
+ * updating the restart_lsn of the physical slot.
+ */
+ PhysicalWakeupLogicalWalSnd();
}
return retlsn;
@@ -505,6 +511,12 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
NULL, NULL, NULL);
/*
+ * Wait for specified streaming replication standby servers (if any)
+ * to confirm receipt of WAL up to moveto lsn.
+ */
+ WaitForStandbyConfirmation(moveto);
+
+ /*
* Start reading at the slot's restart_lsn, which we know to point to
* a valid record.
*/
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 0f1047179cb..25edb5e1412 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1726,25 +1726,109 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId
}
/*
+ * Wake up the logical walsender processes with logical failover slots if the
+ * currently acquired physical slot is specified in standby_slot_names GUC.
+ */
+void
+PhysicalWakeupLogicalWalSnd(void)
+{
+ Assert(MyReplicationSlot && SlotIsPhysical(MyReplicationSlot));
+
+ /*
+ * If we are running in a standby, there is no need to wake up walsenders.
+ * This is because we do not support syncing slots to cascading standbys,
+ * so, there are no walsenders waiting for standbys to catch up.
+ */
+ if (RecoveryInProgress())
+ return;
+
+ if (SlotExistsInStandbySlotNames(NameStr(MyReplicationSlot->data.name)))
+ ConditionVariableBroadcast(&WalSndCtl->wal_confirm_rcv_cv);
+}
+
+/*
+ * Returns true if not all standbys have caught up to the flushed position
+ * (flushed_lsn) when the current acquired slot is a logical failover
+ * slot and we are streaming; otherwise, returns false.
+ *
+ * If returning true, the function sets the appropriate wait event in
+ * wait_event; otherwise, wait_event is set to 0.
+ */
+static bool
+NeedToWaitForStandbys(XLogRecPtr flushed_lsn, uint32 *wait_event)
+{
+ int elevel = got_STOPPING ? ERROR : WARNING;
+ bool failover_slot;
+
+ failover_slot = (replication_active && MyReplicationSlot->data.failover);
+
+ /*
+ * Note that after receiving the shutdown signal, an ERROR is reported if
+ * any slots are dropped, invalidated, or inactive. This measure is taken
+ * to prevent the walsender from waiting indefinitely.
+ */
+ if (failover_slot && !StandbySlotsHaveCaughtup(flushed_lsn, elevel))
+ {
+ *wait_event = WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION;
+ return true;
+ }
+
+ *wait_event = 0;
+ return false;
+}
+
+/*
+ * Returns true if we need to wait for WALs to be flushed to disk, or if not
+ * all standbys have caught up to the flushed position (flushed_lsn) when the
+ * current acquired slot is a logical failover slot and we are
+ * streaming; otherwise, returns false.
+ *
+ * If returning true, the function sets the appropriate wait event in
+ * wait_event; otherwise, wait_event is set to 0.
+ */
+static bool
+NeedToWaitForWal(XLogRecPtr target_lsn, XLogRecPtr flushed_lsn,
+ uint32 *wait_event)
+{
+ /* Check if we need to wait for WALs to be flushed to disk */
+ if (target_lsn > flushed_lsn)
+ {
+ *wait_event = WAIT_EVENT_WAL_SENDER_WAIT_FOR_WAL;
+ return true;
+ }
+
+ /* Check if the standby slots have caught up to the flushed position */
+ return NeedToWaitForStandbys(flushed_lsn, wait_event);
+}
+
+/*
* Wait till WAL < loc is flushed to disk so it can be safely sent to client.
*
- * Returns end LSN of flushed WAL. Normally this will be >= loc, but
- * if we detect a shutdown request (either from postmaster or client)
- * we will return early, so caller must always check.
+ * If the walsender holds a logical failover slot, we also wait for all the
+ * specified streaming replication standby servers to confirm receipt of WAL
+ * up to RecentFlushPtr. It is beneficial to wait here for the confirmation
+ * up to RecentFlushPtr rather than waiting before transmitting each change
+ * to logical subscribers, which is already covered by RecentFlushPtr.
+ *
+ * Returns end LSN of flushed WAL. Normally this will be >= loc, but if we
+ * detect a shutdown request (either from postmaster or client) we will return
+ * early, so caller must always check.
*/
static XLogRecPtr
WalSndWaitForWal(XLogRecPtr loc)
{
int wakeEvents;
+ uint32 wait_event = 0;
static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
/*
* Fast path to avoid acquiring the spinlock in case we already know we
- * have enough WAL available. This is particularly interesting if we're
- * far behind.
+ * have enough WAL available and all the standby servers have confirmed
+ * receipt of WAL up to RecentFlushPtr. This is particularly interesting
+ * if we're far behind.
*/
- if (RecentFlushPtr != InvalidXLogRecPtr &&
- loc <= RecentFlushPtr)
+ if (!XLogRecPtrIsInvalid(RecentFlushPtr) &&
+ !NeedToWaitForWal(loc, RecentFlushPtr, &wait_event))
return RecentFlushPtr;
/* Get a more recent flush pointer. */
@@ -1753,8 +1837,14 @@ WalSndWaitForWal(XLogRecPtr loc)
else
RecentFlushPtr = GetXLogReplayRecPtr(NULL);
+ /*
+ * Within the loop, we wait for the necessary WALs to be flushed to disk
+ * first, followed by waiting for standbys to catch up if there are enough
+ * WALs (see NeedToWaitForWal()) or upon receiving the shutdown signal.
+ */
for (;;)
{
+ bool wait_for_standby_at_stop = false;
long sleeptime;
/* Clear any already-pending wakeups */
@@ -1781,21 +1871,35 @@ WalSndWaitForWal(XLogRecPtr loc)
if (got_STOPPING)
XLogBackgroundFlush();
- /* Update our idea of the currently flushed position. */
- if (!RecoveryInProgress())
- RecentFlushPtr = GetFlushRecPtr(NULL);
- else
- RecentFlushPtr = GetXLogReplayRecPtr(NULL);
+ /*
+ * To avoid the scenario where standbys need to catch up to a newer
+ * WAL location in each iteration, we update our idea of the currently
+ * flushed position only if we are not waiting for standbys to catch
+ * up.
+ */
+ if (wait_event != WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION)
+ {
+ if (!RecoveryInProgress())
+ RecentFlushPtr = GetFlushRecPtr(NULL);
+ else
+ RecentFlushPtr = GetXLogReplayRecPtr(NULL);
+ }
/*
- * If postmaster asked us to stop, don't wait anymore.
+ * If postmaster asked us to stop and the standby slots have caught up
+ * to the flushed position, don't wait anymore.
*
* It's important to do this check after the recomputation of
* RecentFlushPtr, so we can send all remaining data before shutting
* down.
*/
if (got_STOPPING)
- break;
+ {
+ if (NeedToWaitForStandbys(RecentFlushPtr, &wait_event))
+ wait_for_standby_at_stop = true;
+ else
+ break;
+ }
/*
* We only send regular messages to the client for full decoded
@@ -1810,11 +1914,18 @@ WalSndWaitForWal(XLogRecPtr loc)
!waiting_for_ping_response)
WalSndKeepalive(false, InvalidXLogRecPtr);
- /* check whether we're done */
- if (loc <= RecentFlushPtr)
+ /*
+ * Exit the loop if already caught up and doesn't need to wait for
+ * standby slots.
+ */
+ if (!wait_for_standby_at_stop &&
+ !NeedToWaitForWal(loc, RecentFlushPtr, &wait_event))
break;
- /* Waiting for new WAL. Since we need to wait, we're now caught up. */
+ /*
+ * Waiting for new WAL or waiting for standbys to catch up. Since we
+ * need to wait, we're now caught up.
+ */
WalSndCaughtUp = true;
/*
@@ -1852,7 +1963,9 @@ WalSndWaitForWal(XLogRecPtr loc)
if (pq_is_send_pending())
wakeEvents |= WL_SOCKET_WRITEABLE;
- WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_FOR_WAL);
+ Assert(wait_event != 0);
+
+ WalSndWait(wakeEvents, sleeptime, wait_event);
}
/* reactivate latch so WalSndLoop knows to continue */
@@ -2262,6 +2375,7 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
{
ReplicationSlotMarkDirty();
ReplicationSlotsComputeRequiredLSN();
+ PhysicalWakeupLogicalWalSnd();
}
/*
@@ -3535,6 +3649,7 @@ WalSndShmemInit(void)
ConditionVariableInit(&WalSndCtl->wal_flush_cv);
ConditionVariableInit(&WalSndCtl->wal_replay_cv);
+ ConditionVariableInit(&WalSndCtl->wal_confirm_rcv_cv);
}
}
@@ -3604,8 +3719,14 @@ WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
*
* And, we use separate shared memory CVs for physical and logical
* walsenders for selective wake ups, see WalSndWakeup() for more details.
+ *
+ * If the wait event is WAIT_FOR_STANDBY_CONFIRMATION, wait on another CV
+ * until awakened by physical walsenders after the walreceiver confirms
+ * the receipt of the LSN.
*/
- if (MyWalSnd->kind == REPLICATION_KIND_PHYSICAL)
+ if (wait_event == WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION)
+ ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
+ else if (MyWalSnd->kind == REPLICATION_KIND_PHYSICAL)
ConditionVariablePrepareToSleep(&WalSndCtl->wal_flush_cv);
else if (MyWalSnd->kind == REPLICATION_KIND_LOGICAL)
ConditionVariablePrepareToSleep(&WalSndCtl->wal_replay_cv);
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index ec2f31f82af..c08e00d1d6a 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -78,6 +78,7 @@ GSS_OPEN_SERVER "Waiting to read data from the client while establishing a GSSAP
LIBPQWALRECEIVER_CONNECT "Waiting in WAL receiver to establish connection to remote server."
LIBPQWALRECEIVER_RECEIVE "Waiting in WAL receiver to receive data from remote server."
SSL_OPEN_SERVER "Waiting for SSL while attempting connection."
+WAIT_FOR_STANDBY_CONFIRMATION "Waiting for WAL to be received and flushed by the physical standby."
WAL_SENDER_WAIT_FOR_WAL "Waiting for WAL to be flushed in WAL sender process."
WAL_SENDER_WRITE_DATA "Waiting for any activity when processing replies from WAL receiver in WAL sender process."
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 45013582a74..d77214795de 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -4670,6 +4670,20 @@ struct config_string ConfigureNamesString[] =
check_debug_io_direct, assign_debug_io_direct, NULL
},
+ {
+ {"standby_slot_names", PGC_SIGHUP, REPLICATION_PRIMARY,
+ gettext_noop("Lists streaming replication standby server slot "
+ "names that logical WAL sender processes will wait for."),
+ gettext_noop("Logical WAL sender processes will send decoded "
+ "changes to plugins only after the specified "
+ "replication slots confirm receiving WAL."),
+ GUC_LIST_INPUT
+ },
+ &standby_slot_names,
+ "",
+ check_standby_slot_names, assign_standby_slot_names, NULL
+ },
+
/* End-of-list marker */
{
{NULL, 0, 0, NULL, NULL}, NULL, NULL, NULL, NULL, NULL
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index edcc0282b2d..2244ee52f79 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -343,6 +343,8 @@
# method to choose sync standbys, number of sync standbys,
# and comma-separated list of application_name
# from standby(s); '*' = all
+#standby_slot_names = '' # streaming replication standby server slot names that
+ # logical walsender processes will wait for
# - Standby Servers -
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index acbf567150e..425effad210 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -226,6 +226,7 @@ extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot;
/* GUCs */
extern PGDLLIMPORT int max_replication_slots;
+extern PGDLLIMPORT char *standby_slot_names;
/* shmem initialization functions */
extern Size ReplicationSlotsShmemSize(void);
@@ -274,4 +275,8 @@ extern void CheckSlotPermissions(void);
extern ReplicationSlotInvalidationCause
GetSlotInvalidationCause(const char *conflict_reason);
+extern bool SlotExistsInStandbySlotNames(const char *slot_name);
+extern bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel);
+extern void WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn);
+
#endif /* SLOT_H */
diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h
index 0c3996e9263..f2d8297f016 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -39,6 +39,7 @@ extern void InitWalSender(void);
extern bool exec_replication_command(const char *cmd_string);
extern void WalSndErrorCleanup(void);
extern void WalSndResourceCleanup(bool isCommit);
+extern void PhysicalWakeupLogicalWalSnd(void);
extern XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli);
extern void WalSndSignals(void);
extern Size WalSndShmemSize(void);
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 3113e9ea470..109924ffcdc 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -113,6 +113,13 @@ typedef struct
ConditionVariable wal_flush_cv;
ConditionVariable wal_replay_cv;
+ /*
+ * Used by physical walsenders holding slots specified in
+ * standby_slot_names to wake up logical walsenders holding logical
+ * failover slots when a walreceiver confirms the receipt of LSN.
+ */
+ ConditionVariable wal_confirm_rcv_cv;
+
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER];
} WalSndCtlData;
diff --git a/src/include/utils/guc_hooks.h b/src/include/utils/guc_hooks.h
index c8a7aa9a112..d64dc5fcdb0 100644
--- a/src/include/utils/guc_hooks.h
+++ b/src/include/utils/guc_hooks.h
@@ -174,5 +174,8 @@ extern bool check_wal_consistency_checking(char **newval, void **extra,
extern void assign_wal_consistency_checking(const char *newval, void *extra);
extern bool check_wal_segment_size(int *newval, void **extra, GucSource source);
extern void assign_wal_sync_method(int new_wal_sync_method, void *extra);
+extern bool check_standby_slot_names(char **newval, void **extra,
+ GucSource source);
+extern void assign_standby_slot_names(const char *newval, void *extra);
#endif /* GUC_HOOKS_H */
diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl
index 5c7b4ca5e36..b95d95c06f9 100644
--- a/src/test/recovery/t/006_logical_decoding.pl
+++ b/src/test/recovery/t/006_logical_decoding.pl
@@ -172,9 +172,10 @@ is($node_primary->slot('otherdb_slot')->{'slot_name'},
undef, 'logical slot was actually dropped with DB');
# Test logical slot advancing and its durability.
+# Passing failover=true (last arg) should not have any impact on advancing.
my $logical_slot = 'logical_slot';
$node_primary->safe_psql('postgres',
- "SELECT pg_create_logical_replication_slot('$logical_slot', 'test_decoding', false);"
+ "SELECT pg_create_logical_replication_slot('$logical_slot', 'test_decoding', false, false, true);"
);
$node_primary->psql(
'postgres', "
diff --git a/src/test/recovery/t/040_standby_failover_slots_sync.pl b/src/test/recovery/t/040_standby_failover_slots_sync.pl
index 021c58f621c..0ea1f3d323c 100644
--- a/src/test/recovery/t/040_standby_failover_slots_sync.pl
+++ b/src/test/recovery/t/040_standby_failover_slots_sync.pl
@@ -505,17 +505,257 @@ ok( $standby1->poll_query_until(
'restart_lsn and confirmed_flush_lsn of slot lsub1_slot synced to standby');
##################################################
+# Test that logical failover replication slots wait for the specified
+# physical replication slots to receive the changes first. It uses the
+# following set up:
+#
+# (physical standbys)
+# | ----> standby1 (primary_slot_name = sb1_slot)
+# | ----> standby2 (primary_slot_name = sb2_slot)
+# primary ----- |
+# (logical replication)
+# | ----> subscriber1 (failover = true, slot_name = lsub1_slot)
+# | ----> subscriber2 (failover = false, slot_name = lsub2_slot)
+#
+# standby_slot_names = 'sb1_slot'
+#
+# The setup is configured in such a way that the logical slot of subscriber1 is
+# enabled for failover, and thus the subscriber1 will wait for the physical
+# slot of standby1(sb1_slot) to catch up before receiving the decoded changes.
+##################################################
+
+$backup_name = 'backup3';
+
+$primary->psql('postgres',
+ q{SELECT pg_create_physical_replication_slot('sb2_slot');});
+
+$primary->backup($backup_name);
+
+# Create another standby
+my $standby2 = PostgreSQL::Test::Cluster->new('standby2');
+$standby2->init_from_backup(
+ $primary, $backup_name,
+ has_streaming => 1,
+ has_restoring => 1);
+$standby2->append_conf(
+ 'postgresql.conf', qq(
+primary_slot_name = 'sb2_slot'
+));
+$standby2->start;
+$primary->wait_for_replay_catchup($standby2);
+
+# Configure primary to disallow any logical slots that have enabled failover
+# from getting ahead of the specified physical replication slot (sb1_slot).
+$primary->append_conf(
+ 'postgresql.conf', qq(
+standby_slot_names = 'sb1_slot'
+));
+$primary->reload;
+
+# Create another subscriber node without enabling failover, wait for sync to
+# complete
+my $subscriber2 = PostgreSQL::Test::Cluster->new('subscriber2');
+$subscriber2->init;
+$subscriber2->start;
+$subscriber2->safe_psql(
+ 'postgres', qq[
+ CREATE TABLE tab_int (a int PRIMARY KEY);
+ CREATE SUBSCRIPTION regress_mysub2 CONNECTION '$publisher_connstr' PUBLICATION regress_mypub WITH (slot_name = lsub2_slot);
+]);
+
+$subscriber2->wait_for_subscription_sync;
+
+$subscriber1->safe_psql('postgres', "ALTER SUBSCRIPTION regress_mysub1 ENABLE");
+
+my $offset = -s $primary->logfile;
+
+# Stop the standby associated with the specified physical replication slot
+# (sb1_slot) so that the logical replication slot (lsub1_slot) won't receive
+# changes until the standby comes up.
+$standby1->stop;
+
+# Create some data on the primary
+my $primary_row_count = 20;
+$primary->safe_psql('postgres',
+ "INSERT INTO tab_int SELECT generate_series(11, $primary_row_count);");
+
+# Wait until the standby2 that's still running gets the data from the primary
+$primary->wait_for_replay_catchup($standby2);
+$result = $standby2->safe_psql('postgres',
+ "SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 't', "standby2 gets data from primary");
+
+# Wait for regress_mysub2 to get the data from the primary. This subscription
+# was not enabled for failover so it gets the data without waiting for any
+# standbys.
+$primary->wait_for_catchup('regress_mysub2');
+$result = $subscriber2->safe_psql('postgres',
+ "SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 't', "subscriber2 gets data from primary");
+
+# Wait until the primary server logs a warning indicating that it is waiting
+# for the sb1_slot to catch up.
+$primary->wait_for_log(
+ qr/replication slot \"sb1_slot\" specified in parameter standby_slot_names does not have active_pid/,
+ $offset);
+
+# The regress_mysub1 was enabled for failover so it doesn't get the data from
+# primary and keeps waiting for the standby specified in standby_slot_names
+# (sb1_slot aka standby1).
+$result =
+ $subscriber1->safe_psql('postgres', "SELECT count(*) <> $primary_row_count FROM tab_int;");
+is($result, 't',
+ "subscriber1 doesn't get data from primary until standby1 acknowledges changes"
+);
+
+# Start the standby specified in standby_slot_names (sb1_slot aka standby1) and
+# wait for it to catch up with the primary.
+$standby1->start;
+$primary->wait_for_replay_catchup($standby1);
+$result = $standby1->safe_psql('postgres',
+ "SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 't', "standby1 gets data from primary");
+
+# Now that the standby specified in standby_slot_names is up and running, the
+# primary can send the decoded changes to the subscription enabled for failover
+# (i.e. regress_mysub1). While the standby was down, regress_mysub1 didn't
+# receive any data from the primary. i.e. the primary didn't allow it to go
+# ahead of standby.
+$primary->wait_for_catchup('regress_mysub1');
+$result = $subscriber1->safe_psql('postgres',
+ "SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 't',
+ "subscriber1 gets data from primary after standby1 acknowledges changes");
+
+##################################################
+# Verify that when using pg_logical_slot_get_changes to consume changes from a
+# logical failover slot, it will also wait for the slots specified in
+# standby_slot_names to catch up.
+##################################################
+
+# Stop the standby associated with the specified physical replication slot so
+# that the logical replication slot won't receive changes until the standby
+# slot's restart_lsn is advanced or the slot is removed from the
+# standby_slot_names list.
+$primary->safe_psql('postgres', "TRUNCATE tab_int;");
+$primary->wait_for_catchup('regress_mysub1');
+$standby1->stop;
+
+# Disable the regress_mysub1 to prevent the logical walsender from generating
+# more warnings.
+$subscriber1->safe_psql('postgres', "ALTER SUBSCRIPTION regress_mysub1 DISABLE");
+
+# Wait for the replication slot to become inactive on the publisher
+$primary->poll_query_until(
+ 'postgres',
+ "SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'lsub1_slot' AND active = 'f'",
+ 1);
+
+# Create a logical 'test_decoding' replication slot with failover enabled
+$primary->safe_psql('postgres',
+ "SELECT pg_create_logical_replication_slot('test_slot', 'test_decoding', false, false, true);"
+);
+
+my $back_q = $primary->background_psql(
+ 'postgres',
+ on_error_stop => 0,
+ timeout => $PostgreSQL::Test::Utils::timeout_default);
+
+# pg_logical_slot_get_changes will be blocked until the standby catches up,
+# hence it needs to be executed in a background session.
+$offset = -s $primary->logfile;
+$back_q->query_until(
+ qr/logical_slot_get_changes/, q(
+ \echo logical_slot_get_changes
+ SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);
+));
+
+# Wait until the primary server logs a warning indicating that it is waiting
+# for the sb1_slot to catch up.
+$primary->wait_for_log(
+ qr/replication slot \"sb1_slot\" specified in parameter standby_slot_names does not have active_pid/,
+ $offset);
+
+# Remove the standby from the standby_slot_names list and reload the
+# configuration.
+$primary->adjust_conf('postgresql.conf', 'standby_slot_names', "''");
+$primary->reload;
+
+# Since there are no slots in standby_slot_names, the function
+# pg_logical_slot_get_changes should now return, and the session can be
+# stopped.
+$back_q->quit;
+
+$primary->safe_psql('postgres',
+ "SELECT pg_drop_replication_slot('test_slot');"
+);
+
+# Add the physical slot (sb1_slot) back to the standby_slot_names for further
+# tests.
+$primary->adjust_conf('postgresql.conf', 'standby_slot_names', "'sb1_slot'");
+$primary->reload;
+
+# Enable the regress_mysub1 for further tests
+$subscriber1->safe_psql('postgres', "ALTER SUBSCRIPTION regress_mysub1 ENABLE");
+
+##################################################
+# Test that logical replication will wait for the user-created inactive
+# physical slot to catch up until we remove the slot from standby_slot_names.
+##################################################
+
+$offset = -s $primary->logfile;
+
+# Create some data on the primary
+$primary_row_count = 10;
+$primary->safe_psql('postgres',
+ "INSERT INTO tab_int SELECT generate_series(1, $primary_row_count);");
+
+# Wait until the primary server logs a warning indicating that it is waiting
+# for the sb1_slot to catch up.
+$primary->wait_for_log(
+ qr/replication slot \"sb1_slot\" specified in parameter standby_slot_names does not have active_pid/,
+ $offset);
+
+# The regress_mysub1 doesn't get the data from primary because the specified
+# standby slot (sb1_slot) in standby_slot_names is inactive.
+$result =
+ $subscriber1->safe_psql('postgres', "SELECT count(*) = 0 FROM tab_int;");
+is($result, 't',
+ "subscriber1 doesn't get data as the sb1_slot doesn't catch up");
+
+# Remove the standby from the standby_slot_names list and reload the
+# configuration.
+$primary->adjust_conf('postgresql.conf', 'standby_slot_names', "''");
+$primary->reload;
+
+# Since there are no slots in standby_slot_names, the primary server should now
+# send the decoded changes to the subscription.
+$primary->wait_for_catchup('regress_mysub1');
+$result = $subscriber1->safe_psql('postgres',
+ "SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 't',
+ "subscriber1 gets data from primary after standby1 is removed from the standby_slot_names list"
+);
+
+# Add the physical slot (sb1_slot) back to the standby_slot_names for further
+# tests.
+$primary->adjust_conf('postgresql.conf', 'standby_slot_names', "'sb1_slot'");
+$primary->reload;
+
+##################################################
# Promote the standby1 to primary. Confirm that:
# a) the slot 'lsub1_slot' is retained on the new primary
# b) logical replication for regress_mysub1 is resumed successfully after failover
##################################################
+$standby1->start;
+$primary->wait_for_replay_catchup($standby1);
+
$standby1->promote;
# Update subscription with the new primary's connection info
my $standby1_conninfo = $standby1->connstr . ' dbname=postgres';
$subscriber1->safe_psql('postgres',
- "ALTER SUBSCRIPTION regress_mysub1 CONNECTION '$standby1_conninfo';
- ALTER SUBSCRIPTION regress_mysub1 ENABLE; ");
+ "ALTER SUBSCRIPTION regress_mysub1 CONNECTION '$standby1_conninfo';");
# Confirm the synced slot 'lsub1_slot' is retained on the new primary
is($standby1->safe_psql('postgres',
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index cc3611e6068..d3a7f75b080 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2650,6 +2650,7 @@ SplitPoint
SplitTextOutputData
SplitVar
StackElem
+StandbySlotNamesConfigData
StartDataPtrType
StartLOPtrType
StartLOsPtrType