Diffstat (limited to 'src')
-rw-r--r--  src/backend/access/transam/xlogrecovery.c                    15
-rw-r--r--  src/backend/postmaster/postmaster.c                          93
-rw-r--r--  src/backend/replication/libpqwalreceiver/libpqwalreceiver.c   3
-rw-r--r--  src/backend/replication/logical/slotsync.c                  702
-rw-r--r--  src/backend/replication/slot.c                               14
-rw-r--r--  src/backend/replication/slotfuncs.c                           4
-rw-r--r--  src/backend/replication/walsender.c                           2
-rw-r--r--  src/backend/storage/lmgr/proc.c                              13
-rw-r--r--  src/backend/utils/activity/pgstat_io.c                        1
-rw-r--r--  src/backend/utils/activity/wait_event_names.txt               2
-rw-r--r--  src/backend/utils/init/miscinit.c                             9
-rw-r--r--  src/backend/utils/init/postinit.c                             8
-rw-r--r--  src/backend/utils/misc/guc_tables.c                          10
-rw-r--r--  src/backend/utils/misc/postgresql.conf.sample                 1
-rw-r--r--  src/include/miscadmin.h                                       1
-rw-r--r--  src/include/replication/slotsync.h                           21
-rw-r--r--  src/test/recovery/t/040_standby_failover_slots_sync.pl      120
17 files changed, 944 insertions, 75 deletions
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 0bb472da278..d73a49b3e81 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -49,6 +49,7 @@
#include "postmaster/bgwriter.h"
#include "postmaster/startup.h"
#include "replication/slot.h"
+#include "replication/slotsync.h"
#include "replication/walreceiver.h"
#include "storage/fd.h"
#include "storage/ipc.h"
@@ -1468,6 +1469,20 @@ FinishWalRecovery(void)
XLogShutdownWalRcv();
/*
+ * Shut down the slot sync worker to drop any temporary slots acquired by
+ * it and to prevent it from continuing to fetch the failover slots.
+ *
+ * We do not update the 'synced' column from true to false here, as any
+ * failed update could leave the 'synced' column false for some slots. This
+ * could cause issues during slot sync after restarting the server as a
+ * standby. While updating the 'synced' column after switching to the new
+ * timeline is an option, it does not simplify the handling for the
+ * 'synced' column. Therefore, we retain the 'synced' column as true after
+ * promotion as it may provide useful information about the slot origin.
+ */
+ ShutDownSlotSync();
+
+ /*
* We are now done reading the xlog from stream. Turn off streaming
* recovery to force fetching the files (which would be required at end of
* recovery, e.g., timeline history file) from archive or pg_wal.
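For illustration only (not part of the patch): the retained 'synced' marking can be observed on the promoted standby with a query like the one the new TAP test below uses; 'lsub1_slot' is the slot name from that test.

    -- On the promoted standby, synced slots are retained with synced = true.
    SELECT slot_name, synced, temporary
    FROM pg_replication_slots
    WHERE slot_name = 'lsub1_slot';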
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index df945a5ac4d..da0c627107e 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -115,6 +115,7 @@
#include "postmaster/syslogger.h"
#include "postmaster/walsummarizer.h"
#include "replication/logicallauncher.h"
+#include "replication/slotsync.h"
#include "replication/walsender.h"
#include "storage/fd.h"
#include "storage/ipc.h"
@@ -167,11 +168,11 @@
* they will never become live backends. dead_end children are not assigned a
* PMChildSlot. dead_end children have bkend_type NORMAL.
*
- * "Special" children such as the startup, bgwriter and autovacuum launcher
- * tasks are not in this list. They are tracked via StartupPID and other
- * pid_t variables below. (Thus, there can't be more than one of any given
- * "special" child process type. We use BackendList entries for any child
- * process there can be more than one of.)
+ * "Special" children such as the startup, bgwriter, autovacuum launcher, and
+ * slot sync worker tasks are not in this list. They are tracked via StartupPID
+ * and other pid_t variables below. (Thus, there can't be more than one of any
+ * given "special" child process type. We use BackendList entries for any
+ * child process there can be more than one of.)
*/
typedef struct bkend
{
@@ -254,7 +255,8 @@ static pid_t StartupPID = 0,
WalSummarizerPID = 0,
AutoVacPID = 0,
PgArchPID = 0,
- SysLoggerPID = 0;
+ SysLoggerPID = 0,
+ SlotSyncWorkerPID = 0;
/* Startup process's status */
typedef enum
@@ -445,6 +447,7 @@ static void StartAutovacuumWorker(void);
static void MaybeStartWalReceiver(void);
static void MaybeStartWalSummarizer(void);
static void InitPostmasterDeathWatchHandle(void);
+static void MaybeStartSlotSyncWorker(void);
/*
* Archiver is allowed to start up at the current postmaster state?
@@ -1822,6 +1825,9 @@ ServerLoop(void)
if (PgArchPID == 0 && PgArchStartupAllowed())
PgArchPID = StartChildProcess(ArchiverProcess);
+ /* If we need to start a slot sync worker, try to do that now */
+ MaybeStartSlotSyncWorker();
+
/* If we need to signal the autovacuum launcher, do so now */
if (avlauncher_needs_signal)
{
@@ -2661,6 +2667,8 @@ process_pm_reload_request(void)
signal_child(PgArchPID, SIGHUP);
if (SysLoggerPID != 0)
signal_child(SysLoggerPID, SIGHUP);
+ if (SlotSyncWorkerPID != 0)
+ signal_child(SlotSyncWorkerPID, SIGHUP);
/* Reload authentication config files too */
if (!load_hba())
@@ -3010,6 +3018,7 @@ process_pm_child_exit(void)
AutoVacPID = StartAutoVacLauncher();
if (PgArchStartupAllowed() && PgArchPID == 0)
PgArchPID = StartChildProcess(ArchiverProcess);
+ MaybeStartSlotSyncWorker();
/* workers may be scheduled to start now */
maybe_start_bgworkers();
@@ -3180,6 +3189,22 @@ process_pm_child_exit(void)
continue;
}
+ /*
+ * Was it the slot sync worker? Normal exit or FATAL exit can be
+ * ignored (FATAL can be caused by libpqwalreceiver on receiving a
+ * shutdown request from the startup process during promotion); we'll
+ * start a new one at the next iteration of the postmaster's main
+ * loop, if necessary. Any other exit condition is treated as a crash.
+ */
+ if (pid == SlotSyncWorkerPID)
+ {
+ SlotSyncWorkerPID = 0;
+ if (!EXIT_STATUS_0(exitstatus) && !EXIT_STATUS_1(exitstatus))
+ HandleChildCrash(pid, exitstatus,
+ _("slot sync worker process"));
+ continue;
+ }
+
/* Was it one of our background workers? */
if (CleanupBackgroundWorker(pid, exitstatus))
{
@@ -3384,7 +3409,7 @@ CleanupBackend(int pid,
/*
* HandleChildCrash -- cleanup after failed backend, bgwriter, checkpointer,
- * walwriter, autovacuum, archiver or background worker.
+ * walwriter, autovacuum, archiver, slot sync worker, or background worker.
*
* The objectives here are to clean up our local state about the child
* process, and to signal all other remaining children to quickdie.
@@ -3546,6 +3571,12 @@ HandleChildCrash(int pid, int exitstatus, const char *procname)
else if (PgArchPID != 0 && take_action)
sigquit_child(PgArchPID);
+ /* Take care of the slot sync worker too */
+ if (pid == SlotSyncWorkerPID)
+ SlotSyncWorkerPID = 0;
+ else if (SlotSyncWorkerPID != 0 && take_action)
+ sigquit_child(SlotSyncWorkerPID);
+
/* We do NOT restart the syslogger */
if (Shutdown != ImmediateShutdown)
@@ -3686,6 +3717,8 @@ PostmasterStateMachine(void)
signal_child(WalReceiverPID, SIGTERM);
if (WalSummarizerPID != 0)
signal_child(WalSummarizerPID, SIGTERM);
+ if (SlotSyncWorkerPID != 0)
+ signal_child(SlotSyncWorkerPID, SIGTERM);
/* checkpointer, archiver, stats, and syslogger may continue for now */
/* Now transition to PM_WAIT_BACKENDS state to wait for them to die */
@@ -3701,13 +3734,13 @@ PostmasterStateMachine(void)
/*
* PM_WAIT_BACKENDS state ends when we have no regular backends
* (including autovac workers), no bgworkers (including unconnected
- * ones), and no walwriter, autovac launcher or bgwriter. If we are
- * doing crash recovery or an immediate shutdown then we expect the
- * checkpointer to exit as well, otherwise not. The stats and
- * syslogger processes are disregarded since they are not connected to
- * shared memory; we also disregard dead_end children here. Walsenders
- * and archiver are also disregarded, they will be terminated later
- * after writing the checkpoint record.
+ * ones), and no walwriter, autovac launcher, bgwriter or slot sync
+ * worker. If we are doing crash recovery or an immediate shutdown
+ * then we expect the checkpointer to exit as well, otherwise not. The
+ * stats and syslogger processes are disregarded since they are not
+ * connected to shared memory; we also disregard dead_end children
+ * here. Walsenders and archiver are also disregarded, they will be
+ * terminated later after writing the checkpoint record.
*/
if (CountChildren(BACKEND_TYPE_ALL - BACKEND_TYPE_WALSND) == 0 &&
StartupPID == 0 &&
@@ -3717,7 +3750,8 @@ PostmasterStateMachine(void)
(CheckpointerPID == 0 ||
(!FatalError && Shutdown < ImmediateShutdown)) &&
WalWriterPID == 0 &&
- AutoVacPID == 0)
+ AutoVacPID == 0 &&
+ SlotSyncWorkerPID == 0)
{
if (Shutdown >= ImmediateShutdown || FatalError)
{
@@ -3815,6 +3849,7 @@ PostmasterStateMachine(void)
Assert(CheckpointerPID == 0);
Assert(WalWriterPID == 0);
Assert(AutoVacPID == 0);
+ Assert(SlotSyncWorkerPID == 0);
/* syslogger is not considered here */
pmState = PM_NO_CHILDREN;
}
@@ -4038,6 +4073,8 @@ TerminateChildren(int signal)
signal_child(AutoVacPID, signal);
if (PgArchPID != 0)
signal_child(PgArchPID, signal);
+ if (SlotSyncWorkerPID != 0)
+ signal_child(SlotSyncWorkerPID, signal);
}
/*
@@ -4850,6 +4887,7 @@ SubPostmasterMain(int argc, char *argv[])
*/
if (strcmp(argv[1], "--forkbackend") == 0 ||
strcmp(argv[1], "--forkavlauncher") == 0 ||
+ strcmp(argv[1], "--forkssworker") == 0 ||
strcmp(argv[1], "--forkavworker") == 0 ||
strcmp(argv[1], "--forkaux") == 0 ||
strcmp(argv[1], "--forkbgworker") == 0)
@@ -4953,6 +4991,13 @@ SubPostmasterMain(int argc, char *argv[])
AutoVacWorkerMain(argc - 2, argv + 2); /* does not return */
}
+ if (strcmp(argv[1], "--forkssworker") == 0)
+ {
+ /* Restore basic shared memory pointers */
+ InitShmemAccess(UsedShmemSegAddr);
+
+ ReplSlotSyncWorkerMain(argc - 2, argv + 2); /* does not return */
+ }
if (strcmp(argv[1], "--forkbgworker") == 0)
{
/* do this as early as possible; in particular, before InitProcess() */
@@ -5499,6 +5544,24 @@ MaybeStartWalSummarizer(void)
/*
+ * MaybeStartSlotSyncWorker
+ * Start the slot sync worker, if not running and our state allows.
+ *
+ * The worker is started if we are on a hot standby, no fast or immediate
+ * shutdown is in progress, the slot sync parameters are configured
+ * correctly, and either this is the worker's first launch or enough time
+ * has passed since it was last launched.
+ */
+static void
+MaybeStartSlotSyncWorker(void)
+{
+ if (SlotSyncWorkerPID == 0 && pmState == PM_HOT_STANDBY &&
+ Shutdown <= SmartShutdown && sync_replication_slots &&
+ ValidateSlotSyncParams(LOG) && SlotSyncWorkerCanRestart())
+ SlotSyncWorkerPID = StartSlotSyncWorker();
+}
+
+/*
* Create the opts file
*/
static bool
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 9270d7b855b..04271ee7032 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -6,6 +6,9 @@
* loaded as a dynamic module to avoid linking the main server binary with
* libpq.
*
+ * Apart from walreceiver, the libpq-specific routines are now also used by
+ * logical replication workers and slot synchronization.
+ *
* Portions Copyright (c) 2010-2024, PostgreSQL Global Development Group
*
*
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 4cc9148c572..36773cfe73f 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -10,18 +10,25 @@
*
* This file contains the code for slot synchronization on a physical standby
* to fetch logical failover slots information from the primary server, create
- * the slots on the standby and synchronize them. This is done by a call to SQL
- * function pg_sync_replication_slots.
+ * the slots on the standby and synchronize them periodically.
*
- * If on physical standby, the WAL corresponding to the remote's restart_lsn
- * is not available or the remote's catalog_xmin precedes the oldest xid for which
- * it is guaranteed that rows wouldn't have been removed then we cannot create
- * the local standby slot because that would mean moving the local slot
+ * Slot synchronization can be performed either automatically by enabling slot
+ * sync worker or manually by calling SQL function pg_sync_replication_slots().
+ *
+ * If the WAL corresponding to the remote's restart_lsn is not available on the
+ * physical standby or the remote's catalog_xmin precedes the oldest xid for
+ * which it is guaranteed that rows wouldn't have been removed then we cannot
+ * create the local standby slot because that would mean moving the local slot
* backward and decoding won't be possible via such a slot. In this case, the
* slot will be marked as RS_TEMPORARY. Once the primary server catches up,
* the slot will be marked as RS_PERSISTENT (which means sync-ready) after
- * which we can call pg_sync_replication_slots() periodically to perform
- * syncs.
+ * which slot sync worker can perform the sync periodically or user can call
+ * pg_sync_replication_slots() periodically to perform the syncs.
+ *
+ * The slot sync worker waits for some time before the next synchronization,
+ * with the duration varying based on whether any slots were updated during
+ * the last cycle. Refer to the comments above wait_for_slot_activity() for
+ * more details.
*
* Any standby synchronized slots will be dropped if they no longer need
* to be synchronized. See comment atop drop_local_obsolete_slots() for more
@@ -31,28 +38,84 @@
#include "postgres.h"
+#include <time.h>
+
#include "access/xlog_internal.h"
#include "access/xlogrecovery.h"
#include "catalog/pg_database.h"
#include "commands/dbcommands.h"
+#include "libpq/pqsignal.h"
+#include "pgstat.h"
+#include "postmaster/fork_process.h"
+#include "postmaster/interrupt.h"
+#include "postmaster/postmaster.h"
#include "replication/slot.h"
#include "replication/slotsync.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
+#include "storage/proc.h"
#include "storage/procarray.h"
+#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/pg_lsn.h"
+#include "utils/ps_status.h"
+#include "utils/timeout.h"
-/* Struct for sharing information to control slot synchronization. */
+/*
+ * Struct for sharing information to control slot synchronization.
+ *
+ * The slot sync worker's pid is needed by the startup process to shut it
+ * down during promotion. The startup process shuts down the slot sync worker
+ * and also sets stopSignaled=true to handle the race condition when the
+ * postmaster has not noticed the promotion yet and thus may end up restarting
+ * the slot sync worker. If stopSignaled is set, the worker will exit in such a
+ * case. Note that we don't need to reset this variable as after promotion the
+ * slot sync worker won't be restarted because the pmState changes to PM_RUN from
+ * PM_HOT_STANDBY and we don't support demoting the primary without restarting
+ * the server. See MaybeStartSlotSyncWorker.
+ *
+ * The 'syncing' flag is needed to prevent concurrent slot syncs to avoid slot
+ * overwrites.
+ *
+ * The 'last_start_time' is needed by the postmaster to start the slot sync
+ * worker once per SLOTSYNC_RESTART_INTERVAL_SEC. In cases where an immediate
+ * restart is expected (e.g., slot sync GUCs change), the slot sync worker
+ * will reset last_start_time before exiting, so that the postmaster can start
+ * the worker without waiting for SLOTSYNC_RESTART_INTERVAL_SEC.
+ *
+ * All the fields except 'syncing' are used only by the slot sync worker.
+ * 'syncing' is used both by the worker and by the SQL function
+ * pg_sync_replication_slots.
+ */
typedef struct SlotSyncCtxStruct
{
- /* prevents concurrent slot syncs to avoid slot overwrites */
+ pid_t pid;
+ bool stopSignaled;
bool syncing;
+ time_t last_start_time;
slock_t mutex;
} SlotSyncCtxStruct;
SlotSyncCtxStruct *SlotSyncCtx = NULL;
+/* GUC variable */
+bool sync_replication_slots = false;
+
+/*
+ * The sleep time (ms) between slot-sync cycles varies dynamically
+ * (within a MIN/MAX range) according to slot activity. See
+ * wait_for_slot_activity() for details.
+ */
+#define MIN_WORKER_NAPTIME_MS 200
+#define MAX_WORKER_NAPTIME_MS 30000 /* 30s */
+
+static long sleep_ms = MIN_WORKER_NAPTIME_MS;
+
+/* The restart interval for the slot sync worker, used by the postmaster */
+#define SLOTSYNC_RESTART_INTERVAL_SEC 10
+
+/* Flag to tell if we are in a slot sync worker process */
+static bool am_slotsync_worker = false;
+
/*
* Flag to tell if we are syncing replication slots. Unlike the 'syncing' flag
* in SlotSyncCtxStruct, this flag is true only if the current process is
@@ -79,6 +142,13 @@ typedef struct RemoteSlot
ReplicationSlotInvalidationCause invalidated;
} RemoteSlot;
+#ifdef EXEC_BACKEND
+static pid_t slotsyncworker_forkexec(void);
+#endif
+NON_EXEC_STATIC void ReplSlotSyncWorkerMain(int argc, char *argv[]) pg_attribute_noreturn();
+
+static void slotsync_failure_callback(int code, Datum arg);
+
/*
* If necessary, update the local synced slot's metadata based on the data
* from the remote slot.
@@ -343,8 +413,11 @@ reserve_wal_for_local_slot(XLogRecPtr restart_lsn)
* If the remote restart_lsn and catalog_xmin have caught up with the
* local ones, then update the LSNs and persist the local synced slot for
* future synchronization; otherwise, do nothing.
+ *
+ * Return true if the slot is marked as RS_PERSISTENT (sync-ready), otherwise
+ * false.
*/
-static void
+static bool
update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
{
ReplicationSlot *slot = MyReplicationSlot;
@@ -375,7 +448,7 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
remote_slot->catalog_xmin,
LSN_FORMAT_ARGS(slot->data.restart_lsn),
slot->data.catalog_xmin));
- return;
+ return false;
}
/* First time slot update, the function must return true */
@@ -387,6 +460,8 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
ereport(LOG,
errmsg("newly created slot \"%s\" is sync-ready now",
remote_slot->name));
+
+ return true;
}
/*
@@ -399,12 +474,15 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
* the remote_slot catches up with locally reserved position and local slot is
* updated. The slot is then persisted and is considered as sync-ready for
* periodic syncs.
+ *
+ * Returns TRUE if the local slot is updated.
*/
-static void
+static bool
synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
{
ReplicationSlot *slot;
XLogRecPtr latestFlushPtr;
+ bool slot_updated = false;
/*
* Make sure that concerned WAL is received and flushed before syncing
@@ -412,12 +490,17 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
*/
latestFlushPtr = GetStandbyFlushRecPtr(NULL);
if (remote_slot->confirmed_lsn > latestFlushPtr)
- elog(ERROR,
- "skipping slot synchronization as the received slot sync"
- " LSN %X/%X for slot \"%s\" is ahead of the standby position %X/%X",
- LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
- remote_slot->name,
- LSN_FORMAT_ARGS(latestFlushPtr));
+ {
+ ereport(am_slotsync_worker ? LOG : ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("skipping slot synchronization as the received slot sync"
+ " LSN %X/%X for slot \"%s\" is ahead of the standby position %X/%X",
+ LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
+ remote_slot->name,
+ LSN_FORMAT_ARGS(latestFlushPtr)));
+
+ return false;
+ }
/* Search for the named slot */
if ((slot = SearchNamedReplicationSlot(remote_slot->name, true)))
@@ -465,19 +548,22 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
/* Make sure the invalidated state persists across server restart */
ReplicationSlotMarkDirty();
ReplicationSlotSave();
+
+ slot_updated = true;
}
/* Skip the sync of an invalidated slot */
if (slot->data.invalidated != RS_INVAL_NONE)
{
ReplicationSlotRelease();
- return;
+ return slot_updated;
}
/* Slot not ready yet, let's attempt to make it sync-ready now. */
if (slot->data.persistency == RS_TEMPORARY)
{
- update_and_persist_local_synced_slot(remote_slot, remote_dbid);
+ slot_updated = update_and_persist_local_synced_slot(remote_slot,
+ remote_dbid);
}
/* Slot ready for sync, so sync it. */
@@ -500,6 +586,8 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
{
ReplicationSlotMarkDirty();
ReplicationSlotSave();
+
+ slot_updated = true;
}
}
}
@@ -511,7 +599,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
/* Skip creating the local slot if remote_slot is invalidated already */
if (remote_slot->invalidated != RS_INVAL_NONE)
- return;
+ return false;
/*
* We create temporary slots instead of ephemeral slots here because
@@ -548,9 +636,13 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
LWLockRelease(ProcArrayLock);
update_and_persist_local_synced_slot(remote_slot, remote_dbid);
+
+ slot_updated = true;
}
ReplicationSlotRelease();
+
+ return slot_updated;
}
/*
@@ -558,8 +650,10 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
*
* Gets the failover logical slots info from the primary server and updates
* the slots locally. Creates the slots if not present on the standby.
+ *
+ * Returns TRUE if any of the slots gets updated in this sync-cycle.
*/
-static void
+static bool
synchronize_slots(WalReceiverConn *wrconn)
{
#define SLOTSYNC_COLUMN_COUNT 9
@@ -569,6 +663,8 @@ synchronize_slots(WalReceiverConn *wrconn)
WalRcvExecResult *res;
TupleTableSlot *tupslot;
List *remote_slot_list = NIL;
+ bool some_slot_updated = false;
+ bool started_tx = false;
const char *query = "SELECT slot_name, plugin, confirmed_flush_lsn,"
" restart_lsn, catalog_xmin, two_phase, failover,"
" database, conflict_reason"
@@ -589,9 +685,15 @@ synchronize_slots(WalReceiverConn *wrconn)
syncing_slots = true;
+ /* The syscache access in walrcv_exec() needs a transaction env. */
+ if (!IsTransactionState())
+ {
+ StartTransactionCommand();
+ started_tx = true;
+ }
+
/* Execute the query */
res = walrcv_exec(wrconn, query, SLOTSYNC_COLUMN_COUNT, slotRow);
-
if (res->status != WALRCV_OK_TUPLES)
ereport(ERROR,
errmsg("could not fetch failover logical slots info from the primary server: %s",
@@ -686,7 +788,7 @@ synchronize_slots(WalReceiverConn *wrconn)
*/
LockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
- synchronize_one_slot(remote_slot, remote_dbid);
+ some_slot_updated |= synchronize_one_slot(remote_slot, remote_dbid);
UnlockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
}
@@ -696,11 +798,16 @@ synchronize_slots(WalReceiverConn *wrconn)
walrcv_clear_result(res);
+ if (started_tx)
+ CommitTransactionCommand();
+
SpinLockAcquire(&SlotSyncCtx->mutex);
SlotSyncCtx->syncing = false;
SpinLockRelease(&SlotSyncCtx->mutex);
syncing_slots = false;
+
+ return some_slot_updated;
}
/*
@@ -720,6 +827,7 @@ validate_remote_info(WalReceiverConn *wrconn)
TupleTableSlot *tupslot;
bool remote_in_recovery;
bool primary_slot_valid;
+ bool started_tx = false;
initStringInfo(&cmd);
appendStringInfo(&cmd,
@@ -728,6 +836,13 @@ validate_remote_info(WalReceiverConn *wrconn)
" WHERE slot_type='physical' AND slot_name=%s",
quote_literal_cstr(PrimarySlotName));
+ /* The syscache access in walrcv_exec() needs a transaction env. */
+ if (!IsTransactionState())
+ {
+ StartTransactionCommand();
+ started_tx = true;
+ }
+
res = walrcv_exec(wrconn, cmd.data, PRIMARY_INFO_OUTPUT_COL_COUNT, slotRow);
pfree(cmd.data);
@@ -763,28 +878,73 @@ validate_remote_info(WalReceiverConn *wrconn)
ExecClearTuple(tupslot);
walrcv_clear_result(res);
+
+ if (started_tx)
+ CommitTransactionCommand();
}
/*
- * Check all necessary GUCs for slot synchronization are set
- * appropriately, otherwise, raise ERROR.
+ * Checks if dbname is specified in 'primary_conninfo'.
+ *
+ * Error out if it is not specified, otherwise return it.
*/
-void
-ValidateSlotSyncParams(void)
+char *
+CheckAndGetDbnameFromConninfo(void)
{
char *dbname;
/*
+ * The slot synchronization needs a database connection for walrcv_exec to
+ * work.
+ */
+ dbname = walrcv_get_dbname_from_conninfo(PrimaryConnInfo);
+ if (dbname == NULL)
+ ereport(ERROR,
+
+ /*
+ * translator: dbname is a specific option; %s is a GUC variable name
+ */
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("slot synchronization requires dbname to be specified in %s",
+ "primary_conninfo"));
+ return dbname;
+}
+
+/*
+ * Return true if all necessary GUCs for slot synchronization are set
+ * appropriately, otherwise, return false.
+ */
+bool
+ValidateSlotSyncParams(int elevel)
+{
+ /*
+ * Logical slot sync/creation requires wal_level >= logical.
+ *
+ * Since altering wal_level requires a server restart, error out in this
+ * case regardless of the elevel provided by the caller.
+ */
+ if (wal_level < WAL_LEVEL_LOGICAL)
+ {
+ ereport(ERROR,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("slot synchronization requires wal_level >= \"logical\""));
+ return false;
+ }
+
+ /*
* A physical replication slot(primary_slot_name) is required on the
* primary to ensure that the rows needed by the standby are not removed
* after restarting, so that the synchronized slot on the standby will not
* be invalidated.
*/
if (PrimarySlotName == NULL || *PrimarySlotName == '\0')
- ereport(ERROR,
+ {
+ ereport(elevel,
/* translator: %s is a GUC variable name */
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("slot synchronization requires %s to be defined", "primary_slot_name"));
+ return false;
+ }
/*
* hot_standby_feedback must be enabled to cooperate with the physical
@@ -792,47 +952,478 @@ ValidateSlotSyncParams(void)
* catalog_xmin values on the standby.
*/
if (!hot_standby_feedback)
- ereport(ERROR,
+ {
+ ereport(elevel,
/* translator: %s is a GUC variable name */
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("slot synchronization requires %s to be enabled",
"hot_standby_feedback"));
-
- /* Logical slot sync/creation requires wal_level >= logical. */
- if (wal_level < WAL_LEVEL_LOGICAL)
- ereport(ERROR,
- errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("slot synchronization requires wal_level >= \"logical\""));
+ return false;
+ }
/*
* The primary_conninfo is required to make connection to primary for
* getting slots information.
*/
if (PrimaryConnInfo == NULL || *PrimaryConnInfo == '\0')
- ereport(ERROR,
+ {
+ ereport(elevel,
/* translator: %s is a GUC variable name */
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("slot synchronization requires %s to be defined",
"primary_conninfo"));
+ return false;
+ }
+
+ return true;
+}
+
+/*
+ * Re-read the config file.
+ *
+ * Exit if any of the slot sync GUCs have changed. The postmaster will
+ * restart it.
+ */
+static void
+slotsync_reread_config(void)
+{
+ char *old_primary_conninfo = pstrdup(PrimaryConnInfo);
+ char *old_primary_slotname = pstrdup(PrimarySlotName);
+ bool old_sync_replication_slots = sync_replication_slots;
+ bool old_hot_standby_feedback = hot_standby_feedback;
+ bool conninfo_changed;
+ bool primary_slotname_changed;
+
+ Assert(sync_replication_slots);
+
+ ConfigReloadPending = false;
+ ProcessConfigFile(PGC_SIGHUP);
+
+ conninfo_changed = strcmp(old_primary_conninfo, PrimaryConnInfo) != 0;
+ primary_slotname_changed = strcmp(old_primary_slotname, PrimarySlotName) != 0;
+ pfree(old_primary_conninfo);
+ pfree(old_primary_slotname);
+
+ if (old_sync_replication_slots != sync_replication_slots)
+ {
+ ereport(LOG,
+ /* translator: %s is a GUC variable name */
+ errmsg("slot sync worker will shut down because %s is disabled", "sync_replication_slots"));
+ proc_exit(0);
+ }
+
+ if (conninfo_changed ||
+ primary_slotname_changed ||
+ (old_hot_standby_feedback != hot_standby_feedback))
+ {
+ ereport(LOG,
+ errmsg("slot sync worker will restart because of a parameter change"));
+
+ /*
+ * Reset the last-start time for this worker so that the postmaster
+ * can restart it without waiting for SLOTSYNC_RESTART_INTERVAL_SEC.
+ */
+ SlotSyncCtx->last_start_time = 0;
+
+ proc_exit(0);
+ }
+}
+
+/*
+ * Interrupt handler for main loop of slot sync worker.
+ */
+static void
+ProcessSlotSyncInterrupts(WalReceiverConn *wrconn)
+{
+ CHECK_FOR_INTERRUPTS();
+
+ if (ShutdownRequestPending)
+ {
+ ereport(LOG,
+ errmsg("slot sync worker is shutting down on receiving SIGINT"));
+
+ proc_exit(0);
+ }
+
+ if (ConfigReloadPending)
+ slotsync_reread_config();
+}
+
+/*
+ * Cleanup function for slotsync worker.
+ *
+ * Called on slotsync worker exit.
+ */
+static void
+slotsync_worker_onexit(int code, Datum arg)
+{
+ SpinLockAcquire(&SlotSyncCtx->mutex);
+ SlotSyncCtx->pid = InvalidPid;
+ SpinLockRelease(&SlotSyncCtx->mutex);
+}
+
+/*
+ * Sleep for long enough that we believe it's likely that the slots on the
+ * primary get updated.
+ *
+ * If there is no slot activity the wait time between sync-cycles will double
+ * (to a maximum of 30s). If there is some slot activity the wait time between
+ * sync-cycles is reset to the minimum (200ms).
+ */
+static void
+wait_for_slot_activity(bool some_slot_updated)
+{
+ int rc;
+
+ if (!some_slot_updated)
+ {
+ /*
+ * No slots were updated, so double the sleep time, but not beyond the
+ * maximum allowable value.
+ */
+ sleep_ms = Min(sleep_ms * 2, MAX_WORKER_NAPTIME_MS);
+ }
+ else
+ {
+ /*
+ * Some slots were updated since the last sleep, so reset the sleep
+ * time.
+ */
+ sleep_ms = MIN_WORKER_NAPTIME_MS;
+ }
+
+ rc = WaitLatch(MyLatch,
+ WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+ sleep_ms,
+ WAIT_EVENT_REPLICATION_SLOTSYNC_MAIN);
+
+ if (rc & WL_LATCH_SET)
+ ResetLatch(MyLatch);
+}
+
+/*
+ * The main loop of our worker process.
+ *
+ * It connects to the primary server, fetches logical failover slots
+ * information periodically in order to create and sync the slots.
+ */
+NON_EXEC_STATIC void
+ReplSlotSyncWorkerMain(int argc, char *argv[])
+{
+ WalReceiverConn *wrconn = NULL;
+ char *dbname;
+ char *err;
+ sigjmp_buf local_sigjmp_buf;
+ StringInfoData app_name;
+
+ am_slotsync_worker = true;
+
+ MyBackendType = B_SLOTSYNC_WORKER;
+
+ init_ps_display(NULL);
+
+ SetProcessingMode(InitProcessing);
/*
- * The slot synchronization needs a database connection for walrcv_exec to
- * work.
+ * Create a per-backend PGPROC struct in shared memory. We must do this
+ * before we access any shared memory.
*/
- dbname = walrcv_get_dbname_from_conninfo(PrimaryConnInfo);
- if (dbname == NULL)
- ereport(ERROR,
+ InitProcess();
+
+ /*
+ * Early initialization.
+ */
+ BaseInit();
+
+ Assert(SlotSyncCtx != NULL);
+
+ SpinLockAcquire(&SlotSyncCtx->mutex);
+ Assert(SlotSyncCtx->pid == InvalidPid);
+
+ /*
+ * The startup process signaled the slot sync worker to stop, so if the
+ * postmaster has meanwhile started the worker again, exit.
+ */
+ if (SlotSyncCtx->stopSignaled)
+ {
+ SpinLockRelease(&SlotSyncCtx->mutex);
+ proc_exit(0);
+ }
+
+ /* Advertise our PID so that the startup process can kill us on promotion */
+ SlotSyncCtx->pid = MyProcPid;
+ SpinLockRelease(&SlotSyncCtx->mutex);
+
+ ereport(LOG, errmsg("slot sync worker started"));
+
+ /* Register it as soon as SlotSyncCtx->pid is initialized. */
+ before_shmem_exit(slotsync_worker_onexit, (Datum) 0);
+
+ /* Setup signal handling */
+ pqsignal(SIGHUP, SignalHandlerForConfigReload);
+ pqsignal(SIGINT, SignalHandlerForShutdownRequest);
+ pqsignal(SIGTERM, die);
+ pqsignal(SIGFPE, FloatExceptionHandler);
+ pqsignal(SIGUSR1, procsignal_sigusr1_handler);
+ pqsignal(SIGUSR2, SIG_IGN);
+ pqsignal(SIGPIPE, SIG_IGN);
+ pqsignal(SIGCHLD, SIG_DFL);
+
+ /*
+ * Establish the SIGALRM handler and initialize the timeout module. This is
+ * needed by InitPostgres to register different timeouts.
+ */
+ InitializeTimeouts();
+
+ /* Load the libpq-specific functions */
+ load_file("libpqwalreceiver", false);
+
+ /*
+ * If an exception is encountered, processing resumes here.
+ *
+ * We just need to clean up, report the error, and go away.
+ *
+ * If we do not have this handling here, then since this worker process
+ * operates at the bottom of the exception stack, ERRORs turn into FATALs.
+ * Therefore, we create our own exception handler to catch ERRORs.
+ */
+ if (sigsetjmp(local_sigjmp_buf, 1) != 0)
+ {
+ /* since not using PG_TRY, must reset error stack by hand */
+ error_context_stack = NULL;
+
+ /* Prevent interrupts while cleaning up */
+ HOLD_INTERRUPTS();
+
+ /* Report the error to the server log */
+ EmitErrorReport();
/*
- * translator: dbname is a specific option; %s is a GUC variable name
+ * We can now go away. Note that because we called InitProcess, a
+ * callback was registered to do ProcKill, which will clean up
+ * necessary state.
*/
- errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("slot synchronization requires dbname to be specified in %s",
- "primary_conninfo"));
+ proc_exit(0);
+ }
+
+ /* We can now handle ereport(ERROR) */
+ PG_exception_stack = &local_sigjmp_buf;
+
+ /*
+ * Unblock signals (they were blocked when the postmaster forked us)
+ */
+ sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
+
+ dbname = CheckAndGetDbnameFromConninfo();
+
+ /*
+ * Connect to the database specified by the user in primary_conninfo. We
+ * need a database connection for walrcv_exec to work which we use to
+ * fetch slot information from the remote node. See comments atop
+ * libpqrcv_exec.
+ *
+ * We do not specify a specific user here since the slot sync worker will
+ * operate as a superuser. This is safe because the slot sync worker does
+ * not interact with user tables, eliminating the risk of executing
+ * arbitrary code within triggers.
+ */
+ InitPostgres(dbname, InvalidOid, NULL, InvalidOid, 0, NULL);
+
+ SetProcessingMode(NormalProcessing);
+
+ initStringInfo(&app_name);
+ if (cluster_name[0])
+ appendStringInfo(&app_name, "%s_%s", cluster_name, "slotsync worker");
+ else
+ appendStringInfo(&app_name, "%s", "slotsync worker");
+
+ /*
+ * Establish the connection to the primary server for slot
+ * synchronization.
+ */
+ wrconn = walrcv_connect(PrimaryConnInfo, false, false, false,
+ app_name.data, &err);
+ pfree(app_name.data);
+
+ if (!wrconn)
+ ereport(ERROR,
+ errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not connect to the primary server: %s", err));
+
+ /*
+ * Register the failure callback once we have the connection.
+ *
+ * XXX: This can be combined with previous such cleanup registration of
+ * slotsync_worker_onexit() but that will need the connection to be made
+ * global and we want to avoid introducing a global for this purpose.
+ */
+ before_shmem_exit(slotsync_failure_callback, PointerGetDatum(wrconn));
+
+ /*
+ * Using the specified primary server connection, check that we are not a
+ * cascading standby and that the slot configured in 'primary_slot_name'
+ * exists on the primary server.
+ */
+ validate_remote_info(wrconn);
+
+ /* Main loop to synchronize slots */
+ for (;;)
+ {
+ bool some_slot_updated = false;
+
+ ProcessSlotSyncInterrupts(wrconn);
+
+ some_slot_updated = synchronize_slots(wrconn);
+
+ wait_for_slot_activity(some_slot_updated);
+ }
+
+ /*
+ * The slot sync worker can't get here because it will only stop when it
+ * receives a SIGINT from the startup process, or when there is an error.
+ */
+ Assert(false);
+}
+
+/*
+ * Main entry point for slot sync worker process, to be called from the
+ * postmaster.
+ */
+int
+StartSlotSyncWorker(void)
+{
+ pid_t pid;
+
+#ifdef EXEC_BACKEND
+ switch ((pid = slotsyncworker_forkexec()))
+ {
+#else
+ switch ((pid = fork_process()))
+ {
+ case 0:
+ /* in postmaster child ... */
+ InitPostmasterChild();
+
+ /* Close the postmaster's sockets */
+ ClosePostmasterPorts(false);
+
+ ReplSlotSyncWorkerMain(0, NULL);
+ break;
+#endif
+ case -1:
+ ereport(LOG,
+ (errmsg("could not fork slot sync worker process: %m")));
+ return 0;
+
+ default:
+ return (int) pid;
+ }
+
+ /* shouldn't get here */
+ return 0;
+}
+
+#ifdef EXEC_BACKEND
+/*
+ * The forkexec routine for the slot sync worker process.
+ *
+ * Format up the arglist, then fork and exec.
+ */
+static pid_t
+slotsyncworker_forkexec(void)
+{
+ char *av[10];
+ int ac = 0;
+
+ av[ac++] = "postgres";
+ av[ac++] = "--forkssworker";
+ av[ac++] = NULL; /* filled in by postmaster_forkexec */
+ av[ac] = NULL;
+
+ Assert(ac < lengthof(av));
+
+ return postmaster_forkexec(ac, av);
+}
+#endif
+
+/*
+ * Shut down the slot sync worker.
+ */
+void
+ShutDownSlotSync(void)
+{
+ SpinLockAcquire(&SlotSyncCtx->mutex);
+
+ SlotSyncCtx->stopSignaled = true;
+
+ if (SlotSyncCtx->pid == InvalidPid)
+ {
+ SpinLockRelease(&SlotSyncCtx->mutex);
+ return;
+ }
+ SpinLockRelease(&SlotSyncCtx->mutex);
+
+ kill(SlotSyncCtx->pid, SIGINT);
+
+ /* Wait for it to die */
+ for (;;)
+ {
+ int rc;
+
+ /* Wait a bit, we don't expect to have to wait long */
+ rc = WaitLatch(MyLatch,
+ WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+ 10L, WAIT_EVENT_REPLICATION_SLOTSYNC_SHUTDOWN);
+
+ if (rc & WL_LATCH_SET)
+ {
+ ResetLatch(MyLatch);
+ CHECK_FOR_INTERRUPTS();
+ }
+
+ SpinLockAcquire(&SlotSyncCtx->mutex);
+
+ /* Is it gone? */
+ if (SlotSyncCtx->pid == InvalidPid)
+ break;
+
+ SpinLockRelease(&SlotSyncCtx->mutex);
+ }
+
+ SpinLockRelease(&SlotSyncCtx->mutex);
+}
+
+/*
+ * SlotSyncWorkerCanRestart
+ *
+ * Returns true if enough time (SLOTSYNC_RESTART_INTERVAL_SEC) has passed
+ * since it was launched last. Otherwise returns false.
+ *
+ * This is a safety valve to protect against continuous respawn attempts if the
+ * worker is dying immediately at launch. Note that since we retry launching
+ * the worker from the postmaster main loop, we will get another chance
+ * later.
+ */
+bool
+SlotSyncWorkerCanRestart(void)
+{
+ time_t curtime = time(NULL);
+
+ /* Return false if too soon since last start. */
+ if ((unsigned int) (curtime - SlotSyncCtx->last_start_time) <
+ (unsigned int) SLOTSYNC_RESTART_INTERVAL_SEC)
+ return false;
+
+ SlotSyncCtx->last_start_time = curtime;
+
+ return true;
}
/*
- * Is current process syncing replication slots ?
+ * Is current process syncing replication slots?
+ *
+ * Could be either a backend executing the SQL function or the slot sync worker.
*/
bool
IsSyncingReplicationSlots(void)
@@ -841,6 +1432,15 @@ IsSyncingReplicationSlots(void)
}
/*
+ * Is current process a slot sync worker?
+ */
+bool
+IsLogicalSlotSyncWorker(void)
+{
+ return am_slotsync_worker;
+}
+
+/*
* Amount of shared memory required for slot synchronization.
*/
Size
@@ -855,14 +1455,16 @@ SlotSyncShmemSize(void)
void
SlotSyncShmemInit(void)
{
+ Size size = SlotSyncShmemSize();
bool found;
SlotSyncCtx = (SlotSyncCtxStruct *)
- ShmemInitStruct("Slot Sync Data", SlotSyncShmemSize(), &found);
+ ShmemInitStruct("Slot Sync Data", size, &found);
if (!found)
{
- SlotSyncCtx->syncing = false;
+ memset(SlotSyncCtx, 0, size);
+ SlotSyncCtx->pid = InvalidPid;
SpinLockInit(&SlotSyncCtx->mutex);
}
}
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 233652b4799..033b4ce0971 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1252,6 +1252,20 @@ restart:
* concurrently being dropped by a backend connected to another DB.
*
* That's fairly unlikely in practice, so we'll just bail out.
+ *
+ * The slot sync worker holds a shared lock on the database before
+ * operating on synced logical slots to avoid conflict with the drop
+ * happening here. The persistent synced slots are thus safe but there
+ * is a possibility that the slot sync worker has created a temporary
+ * slot (which stays active even on release) and we are trying to drop
+ * that here. In practice, the chances of hitting this scenario are
+ * less as during slot synchronization, the temporary slot is
+ * immediately converted to persistent and thus is safe due to the
+ * shared lock taken on the database. So, we'll just bail out in such
+ * a case.
+ *
+ * XXX: We can consider shutting down the slot sync worker before
+ * trying to drop synced temporary slots here.
*/
if (active_pid)
ereport(ERROR,
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index c108bf9608f..768a304723b 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -960,10 +960,12 @@ pg_sync_replication_slots(PG_FUNCTION_ARGS)
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("replication slots can only be synchronized to a standby server"));
+ ValidateSlotSyncParams(ERROR);
+
/* Load the libpq-specific functions */
load_file("libpqwalreceiver", false);
- ValidateSlotSyncParams();
+ (void) CheckAndGetDbnameFromConninfo();
initStringInfo(&app_name);
if (cluster_name[0])
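For illustration only (not part of the patch): with the worker disabled, manual synchronization via the SQL function remains available on the standby, and the parameter checks above now run with elevel ERROR in that path.

    -- One-shot manual sync on the standby (sync_replication_slots = off).
    SELECT pg_sync_replication_slots();

    -- Inspect the result; slots that are still temporary are not yet sync-ready.
    SELECT slot_name, synced, temporary FROM pg_replication_slots;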
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 631d1e0c9fd..13bc3e0aee4 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -3389,7 +3389,7 @@ WalSndDone(WalSndSendDataCallback send_data)
* This should only be called when in recovery.
*
* This is called either by a cascading walsender to find the WAL position to be sent
- * to a cascaded standby or by slot synchronization function to validate remote
+ * to a cascaded standby or by a slot synchronization operation to validate a remote
* slot's lsn before syncing it locally.
*
* As a side-effect, *tli is updated to the TLI of the last
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 4aec4a3c5f4..6e334971dc9 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -40,6 +40,7 @@
#include "pgstat.h"
#include "postmaster/autovacuum.h"
#include "replication/slot.h"
+#include "replication/slotsync.h"
#include "replication/syncrep.h"
#include "replication/walsender.h"
#include "storage/condition_variable.h"
@@ -366,8 +367,12 @@ InitProcess(void)
* child; this is so that the postmaster can detect it if we exit without
* cleaning up. (XXX autovac launcher currently doesn't participate in
* this; it probably should.)
+ *
+ * The slot sync worker also does not participate in it; see the comments
+ * atop 'struct bkend' in postmaster.c.
*/
- if (IsUnderPostmaster && !IsAutoVacuumLauncherProcess())
+ if (IsUnderPostmaster && !IsAutoVacuumLauncherProcess() &&
+ !IsLogicalSlotSyncWorker())
MarkPostmasterChildActive();
/*
@@ -939,8 +944,12 @@ ProcKill(int code, Datum arg)
* This process is no longer present in shared memory in any meaningful
* way, so tell the postmaster we've cleaned up acceptably well. (XXX
* autovac launcher should be included here someday)
+ *
+ * The slot sync worker likewise does not participate in this accounting,
+ * so skip this shared-memory-related processing here.
*/
- if (IsUnderPostmaster && !IsAutoVacuumLauncherProcess())
+ if (IsUnderPostmaster && !IsAutoVacuumLauncherProcess() &&
+ !IsLogicalSlotSyncWorker())
MarkPostmasterChildInactive();
/* wake autovac launcher if needed -- see comments in FreeWorkerInfo */
diff --git a/src/backend/utils/activity/pgstat_io.c b/src/backend/utils/activity/pgstat_io.c
index 43c393d6fe8..9d6e0673827 100644
--- a/src/backend/utils/activity/pgstat_io.c
+++ b/src/backend/utils/activity/pgstat_io.c
@@ -338,6 +338,7 @@ pgstat_tracks_io_bktype(BackendType bktype)
case B_BG_WORKER:
case B_BG_WRITER:
case B_CHECKPOINTER:
+ case B_SLOTSYNC_WORKER:
case B_STANDALONE_BACKEND:
case B_STARTUP:
case B_WAL_SENDER:
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 6464386b779..4fffb466255 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -53,6 +53,8 @@ LOGICAL_APPLY_MAIN "Waiting in main loop of logical replication apply process."
LOGICAL_LAUNCHER_MAIN "Waiting in main loop of logical replication launcher process."
LOGICAL_PARALLEL_APPLY_MAIN "Waiting in main loop of logical replication parallel apply process."
RECOVERY_WAL_STREAM "Waiting in main loop of startup process for WAL to arrive, during streaming recovery."
+REPLICATION_SLOTSYNC_MAIN "Waiting in main loop of slot sync worker."
+REPLICATION_SLOTSYNC_SHUTDOWN "Waiting for slot sync worker to shut down."
SYSLOGGER_MAIN "Waiting in main loop of syslogger process."
WAL_RECEIVER_MAIN "Waiting in main loop of WAL receiver process."
WAL_SENDER_MAIN "Waiting in main loop of WAL sender process."
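For illustration only (not part of the patch): the new wait events surface in pg_stat_activity, so the napping worker can be observed with a query such as:

    -- On the standby, watch the worker sleeping between sync cycles.
    SELECT pid, wait_event_type, wait_event
    FROM pg_stat_activity
    WHERE backend_type = 'slotsync worker';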
diff --git a/src/backend/utils/init/miscinit.c b/src/backend/utils/init/miscinit.c
index 23f77a59e58..77fd8047563 100644
--- a/src/backend/utils/init/miscinit.c
+++ b/src/backend/utils/init/miscinit.c
@@ -40,6 +40,7 @@
#include "postmaster/interrupt.h"
#include "postmaster/pgarch.h"
#include "postmaster/postmaster.h"
+#include "replication/slotsync.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/latch.h"
@@ -293,6 +294,9 @@ GetBackendTypeDesc(BackendType backendType)
case B_LOGGER:
backendDesc = "logger";
break;
+ case B_SLOTSYNC_WORKER:
+ backendDesc = "slotsync worker";
+ break;
case B_STANDALONE_BACKEND:
backendDesc = "standalone backend";
break;
@@ -835,9 +839,10 @@ InitializeSessionUserIdStandalone(void)
{
/*
* This function should only be called in single-user mode, in autovacuum
- * workers, and in background workers.
+ * workers, in the slot sync worker, and in background workers.
*/
- Assert(!IsUnderPostmaster || IsAutoVacuumWorkerProcess() || IsBackgroundWorker);
+ Assert(!IsUnderPostmaster || IsAutoVacuumWorkerProcess() ||
+ IsLogicalSlotSyncWorker() || IsBackgroundWorker);
/* call only once */
Assert(!OidIsValid(AuthenticatedUserId));
diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c
index 7797876d008..5ffe9bdd987 100644
--- a/src/backend/utils/init/postinit.c
+++ b/src/backend/utils/init/postinit.c
@@ -43,6 +43,7 @@
#include "postmaster/autovacuum.h"
#include "postmaster/postmaster.h"
#include "replication/slot.h"
+#include "replication/slotsync.h"
#include "replication/walsender.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
@@ -876,10 +877,11 @@ InitPostgres(const char *in_dbname, Oid dboid,
* Perform client authentication if necessary, then figure out our
* postgres user ID, and see if we are a superuser.
*
- * In standalone mode and in autovacuum worker processes, we use a fixed
- * ID, otherwise we figure it out from the authenticated user name.
+ * In standalone mode, autovacuum worker processes, and the slot sync
+ * worker process, we use a fixed ID; otherwise we figure it out from the
+ * authenticated user name.
*/
- if (bootstrap || IsAutoVacuumWorkerProcess())
+ if (bootstrap || IsAutoVacuumWorkerProcess() || IsLogicalSlotSyncWorker())
{
InitializeSessionUserIdStandalone();
am_superuser = true;
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 70652f0a3fc..37be0669bba 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -67,6 +67,7 @@
#include "postmaster/walwriter.h"
#include "replication/logicallauncher.h"
#include "replication/slot.h"
+#include "replication/slotsync.h"
#include "replication/syncrep.h"
#include "storage/bufmgr.h"
#include "storage/large_object.h"
@@ -2054,6 +2055,15 @@ struct config_bool ConfigureNamesBool[] =
NULL, NULL, NULL
},
+ {
+ {"sync_replication_slots", PGC_SIGHUP, REPLICATION_STANDBY,
+ gettext_noop("Enables a physical standby to synchronize logical failover slots from the primary server."),
+ },
+ &sync_replication_slots,
+ false,
+ NULL, NULL, NULL
+ },
+
/* End-of-list marker */
{
{NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index e10755972ae..c97f9a25f05 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -361,6 +361,7 @@
#wal_retrieve_retry_interval = 5s # time to wait before retrying to
# retrieve WAL after a failed attempt
#recovery_min_apply_delay = 0 # minimum delay for applying changes during recovery
+#sync_replication_slots = off # enables slot synchronization on the physical standby from the primary
# - Subscribers -
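For illustration only (not part of the patch): taken together, the checks in ValidateSlotSyncParams() and CheckAndGetDbnameFromConninfo() imply a standby configuration along these lines; host, user, and slot names below are placeholders.

    # Illustrative standby postgresql.conf for slot synchronization
    sync_replication_slots = on
    wal_level = logical                  # needed to create logical slots
    hot_standby_feedback = on            # required by ValidateSlotSyncParams()
    primary_slot_name = 'phys_slot'      # physical slot on the primary
    primary_conninfo = 'host=primary user=repluser dbname=postgres'   # dbname is required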
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index 0445fbf61d7..612fb5f42e0 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -333,6 +333,7 @@ typedef enum BackendType
B_BG_WRITER,
B_CHECKPOINTER,
B_LOGGER,
+ B_SLOTSYNC_WORKER,
B_STANDALONE_BACKEND,
B_STARTUP,
B_WAL_RECEIVER,
diff --git a/src/include/replication/slotsync.h b/src/include/replication/slotsync.h
index e86d8a47b85..726d65f9b62 100644
--- a/src/include/replication/slotsync.h
+++ b/src/include/replication/slotsync.h
@@ -14,8 +14,27 @@
#include "replication/walreceiver.h"
-extern void ValidateSlotSyncParams(void);
+extern PGDLLIMPORT bool sync_replication_slots;
+
+/*
+ * GUCs needed by the slot sync worker to connect to the primary
+ * server and carry on with slot synchronization.
+ */
+extern PGDLLIMPORT char *PrimaryConnInfo;
+extern PGDLLIMPORT char *PrimarySlotName;
+
+extern char *CheckAndGetDbnameFromConninfo(void);
+extern bool ValidateSlotSyncParams(int elevel);
+
+#ifdef EXEC_BACKEND
+extern void ReplSlotSyncWorkerMain(int argc, char *argv[]) pg_attribute_noreturn();
+#endif
+extern int StartSlotSyncWorker(void);
+
+extern void ShutDownSlotSync(void);
+extern bool SlotSyncWorkerCanRestart(void);
extern bool IsSyncingReplicationSlots(void);
+extern bool IsLogicalSlotSyncWorker(void);
extern Size SlotSyncShmemSize(void);
extern void SlotSyncShmemInit(void);
extern void SyncReplicationSlots(WalReceiverConn *wrconn);
diff --git a/src/test/recovery/t/040_standby_failover_slots_sync.pl b/src/test/recovery/t/040_standby_failover_slots_sync.pl
index 0f2f819f53b..e24009610ad 100644
--- a/src/test/recovery/t/040_standby_failover_slots_sync.pl
+++ b/src/test/recovery/t/040_standby_failover_slots_sync.pl
@@ -322,6 +322,10 @@ ok( $stderr =~
/ERROR: slot synchronization requires dbname to be specified in primary_conninfo/,
"cannot sync slots if dbname is not specified in primary_conninfo");
+# Add the dbname back to the primary_conninfo for further tests
+$standby1->append_conf('postgresql.conf', "primary_conninfo = '$connstr_1 dbname=postgres'");
+$standby1->reload;
+
##################################################
# Test that we cannot synchronize slots to a cascading standby server.
##################################################
@@ -355,4 +359,120 @@ ok( $stderr =~
/ERROR: cannot synchronize replication slots from a standby server/,
"cannot sync slots to a cascading standby server");
+$cascading_standby->stop;
+
+##################################################
+# Test to confirm that the slot sync worker exits on invalid GUC(s) and
+# gets started again on valid GUC(s).
+##################################################
+
+$log_offset = -s $standby1->logfile;
+
+# Enable slot sync worker.
+$standby1->append_conf('postgresql.conf', qq(sync_replication_slots = on));
+$standby1->reload;
+
+# Confirm that the slot sync worker is able to start.
+$standby1->wait_for_log(qr/LOG: slot sync worker started/,
+ $log_offset);
+
+$log_offset = -s $standby1->logfile;
+
+# Disable another GUC required for slot sync.
+$standby1->append_conf( 'postgresql.conf', qq(hot_standby_feedback = off));
+$standby1->reload;
+
+# Confirm that the slot sync worker acknowledges the GUC change and logs a
+# message about the wrong configuration.
+$standby1->wait_for_log(qr/LOG: slot sync worker will restart because of a parameter change/,
+ $log_offset);
+$standby1->wait_for_log(qr/LOG: slot synchronization requires hot_standby_feedback to be enabled/,
+ $log_offset);
+
+$log_offset = -s $standby1->logfile;
+
+# Re-enable the required GUC
+$standby1->append_conf('postgresql.conf', "hot_standby_feedback = on");
+$standby1->reload;
+
+# Confirm that the slot sync worker is able to start now.
+$standby1->wait_for_log(qr/LOG: slot sync worker started/,
+ $log_offset);
+
+##################################################
+# Test to confirm that restart_lsn and confirmed_flush_lsn of the logical slot
+# on the primary are synced to the standby via the slot sync worker.
+##################################################
+
+# Insert data on the primary
+$primary->safe_psql(
+ 'postgres', qq[
+ CREATE TABLE tab_int (a int PRIMARY KEY);
+ INSERT INTO tab_int SELECT generate_series(1, 10);
+]);
+
+# Subscribe to the new table data and wait for it to arrive
+$subscriber1->safe_psql(
+ 'postgres', qq[
+ CREATE TABLE tab_int (a int PRIMARY KEY);
+ ALTER SUBSCRIPTION regress_mysub1 ENABLE;
+ ALTER SUBSCRIPTION regress_mysub1 REFRESH PUBLICATION;
+]);
+
+$subscriber1->wait_for_subscription_sync;
+
+# Do not allow any further advancement of the restart_lsn and
+# confirmed_flush_lsn for the lsub1_slot.
+$subscriber1->safe_psql('postgres', "ALTER SUBSCRIPTION regress_mysub1 DISABLE");
+
+# Wait for the replication slot to become inactive on the publisher
+$primary->poll_query_until(
+ 'postgres',
+ "SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'lsub1_slot' AND active='f'",
+ 1);
+
+# Get the restart_lsn for the logical slot lsub1_slot on the primary
+my $primary_restart_lsn = $primary->safe_psql('postgres',
+ "SELECT restart_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot';");
+
+# Get the confirmed_flush_lsn for the logical slot lsub1_slot on the primary
+my $primary_flush_lsn = $primary->safe_psql('postgres',
+ "SELECT confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot';");
+
+# Confirm that restart_lsn and confirmed_flush_lsn of slot lsub1_slot are synced
+# to the standby
+ok( $standby1->poll_query_until(
+ 'postgres',
+ "SELECT '$primary_restart_lsn' = restart_lsn AND '$primary_flush_lsn' = confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot' AND synced AND NOT temporary;"),
+ 'restart_lsn and confirmed_flush_lsn of slot lsub1_slot synced to standby');
+
+##################################################
+# Promote the standby1 to primary. Confirm that:
+# a) the slot 'lsub1_slot' is retained on the new primary
+# b) logical replication for regress_mysub1 is resumed successfully after failover
+##################################################
+$standby1->promote;
+
+# Update subscription with the new primary's connection info
+my $standby1_conninfo = $standby1->connstr . ' dbname=postgres';
+$subscriber1->safe_psql('postgres',
+ "ALTER SUBSCRIPTION regress_mysub1 CONNECTION '$standby1_conninfo';
+ ALTER SUBSCRIPTION regress_mysub1 ENABLE; ");
+
+# Confirm the synced slot 'lsub1_slot' is retained on the new primary
+is($standby1->safe_psql('postgres',
+ q{SELECT slot_name FROM pg_replication_slots WHERE slot_name = 'lsub1_slot' AND synced AND NOT temporary;}),
+ 'lsub1_slot',
+ 'synced slot retained on the new primary');
+
+# Insert data on the new primary
+$standby1->safe_psql('postgres',
+ "INSERT INTO tab_int SELECT generate_series(11, 20);");
+$standby1->wait_for_catchup('regress_mysub1');
+
+# Confirm that the data in tab_int is replicated to the subscriber
+is( $subscriber1->safe_psql('postgres', q{SELECT count(*) FROM tab_int;}),
+ "20",
+ 'data replicated from the new primary');
+
done_testing();