diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/access/transam/xlogrecovery.c | 15 | ||||
-rw-r--r-- | src/backend/postmaster/postmaster.c | 93 | ||||
-rw-r--r-- | src/backend/replication/libpqwalreceiver/libpqwalreceiver.c | 3 | ||||
-rw-r--r-- | src/backend/replication/logical/slotsync.c | 702 | ||||
-rw-r--r-- | src/backend/replication/slot.c | 14 | ||||
-rw-r--r-- | src/backend/replication/slotfuncs.c | 4 | ||||
-rw-r--r-- | src/backend/replication/walsender.c | 2 | ||||
-rw-r--r-- | src/backend/storage/lmgr/proc.c | 13 | ||||
-rw-r--r-- | src/backend/utils/activity/pgstat_io.c | 1 | ||||
-rw-r--r-- | src/backend/utils/activity/wait_event_names.txt | 2 | ||||
-rw-r--r-- | src/backend/utils/init/miscinit.c | 9 | ||||
-rw-r--r-- | src/backend/utils/init/postinit.c | 8 | ||||
-rw-r--r-- | src/backend/utils/misc/guc_tables.c | 10 | ||||
-rw-r--r-- | src/backend/utils/misc/postgresql.conf.sample | 1 | ||||
-rw-r--r-- | src/include/miscadmin.h | 1 | ||||
-rw-r--r-- | src/include/replication/slotsync.h | 21 | ||||
-rw-r--r-- | src/test/recovery/t/040_standby_failover_slots_sync.pl | 120 |
17 files changed, 944 insertions, 75 deletions
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index 0bb472da278..d73a49b3e81 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -49,6 +49,7 @@ #include "postmaster/bgwriter.h" #include "postmaster/startup.h" #include "replication/slot.h" +#include "replication/slotsync.h" #include "replication/walreceiver.h" #include "storage/fd.h" #include "storage/ipc.h" @@ -1468,6 +1469,20 @@ FinishWalRecovery(void) XLogShutdownWalRcv(); /* + * Shut down the slot sync worker to drop any temporary slots acquired by + * it and to prevent it from repeatedly trying to fetch the failover slots. + * + * We do not update the 'synced' column from true to false here, as any + * failed update could leave 'synced' column false for some slots. This + * could cause issues during slot sync after restarting the server as a + * standby. While updating the 'synced' column after switching to the new + * timeline is an option, it does not simplify the handling for the + * 'synced' column. Therefore, we retain the 'synced' column as true after + * promotion as it may provide useful information about the slot origin. + */ + ShutDownSlotSync(); + + /* * We are now done reading the xlog from stream. Turn off streaming * recovery to force fetching the files (which would be required at end of * recovery, e.g., timeline history file) from archive or pg_wal. diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index df945a5ac4d..da0c627107e 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -115,6 +115,7 @@ #include "postmaster/syslogger.h" #include "postmaster/walsummarizer.h" #include "replication/logicallauncher.h" +#include "replication/slotsync.h" #include "replication/walsender.h" #include "storage/fd.h" #include "storage/ipc.h" @@ -167,11 +168,11 @@ * they will never become live backends. 
dead_end children are not assigned a * PMChildSlot. dead_end children have bkend_type NORMAL. * - * "Special" children such as the startup, bgwriter and autovacuum launcher - * tasks are not in this list. They are tracked via StartupPID and other - * pid_t variables below. (Thus, there can't be more than one of any given - * "special" child process type. We use BackendList entries for any child - * process there can be more than one of.) + * "Special" children such as the startup, bgwriter, autovacuum launcher, and + * slot sync worker tasks are not in this list. They are tracked via StartupPID + * and other pid_t variables below. (Thus, there can't be more than one of any + * given "special" child process type. We use BackendList entries for any + * child process there can be more than one of.) */ typedef struct bkend { @@ -254,7 +255,8 @@ static pid_t StartupPID = 0, WalSummarizerPID = 0, AutoVacPID = 0, PgArchPID = 0, - SysLoggerPID = 0; + SysLoggerPID = 0, + SlotSyncWorkerPID = 0; /* Startup process's status */ typedef enum @@ -445,6 +447,7 @@ static void StartAutovacuumWorker(void); static void MaybeStartWalReceiver(void); static void MaybeStartWalSummarizer(void); static void InitPostmasterDeathWatchHandle(void); +static void MaybeStartSlotSyncWorker(void); /* * Archiver is allowed to start up at the current postmaster state? 
@@ -1822,6 +1825,9 @@ ServerLoop(void) if (PgArchPID == 0 && PgArchStartupAllowed()) PgArchPID = StartChildProcess(ArchiverProcess); + /* If we need to start a slot sync worker, try to do that now */ + MaybeStartSlotSyncWorker(); + /* If we need to signal the autovacuum launcher, do so now */ if (avlauncher_needs_signal) { @@ -2661,6 +2667,8 @@ process_pm_reload_request(void) signal_child(PgArchPID, SIGHUP); if (SysLoggerPID != 0) signal_child(SysLoggerPID, SIGHUP); + if (SlotSyncWorkerPID != 0) + signal_child(SlotSyncWorkerPID, SIGHUP); /* Reload authentication config files too */ if (!load_hba()) @@ -3010,6 +3018,7 @@ process_pm_child_exit(void) AutoVacPID = StartAutoVacLauncher(); if (PgArchStartupAllowed() && PgArchPID == 0) PgArchPID = StartChildProcess(ArchiverProcess); + MaybeStartSlotSyncWorker(); /* workers may be scheduled to start now */ maybe_start_bgworkers(); @@ -3180,6 +3189,22 @@ process_pm_child_exit(void) continue; } + /* + * Was it the slot sync worker? Normal exit or FATAL exit can be + * ignored (FATAL can be caused by libpqwalreceiver on receiving + * shutdown request by the startup process during promotion); we'll + * start a new one at the next iteration of the postmaster's main + * loop, if necessary. Any other exit condition is treated as a crash. + */ + if (pid == SlotSyncWorkerPID) + { + SlotSyncWorkerPID = 0; + if (!EXIT_STATUS_0(exitstatus) && !EXIT_STATUS_1(exitstatus)) + HandleChildCrash(pid, exitstatus, + _("slot sync worker process")); + continue; + } + /* Was it one of our background workers? */ if (CleanupBackgroundWorker(pid, exitstatus)) { @@ -3384,7 +3409,7 @@ CleanupBackend(int pid, /* * HandleChildCrash -- cleanup after failed backend, bgwriter, checkpointer, - * walwriter, autovacuum, archiver or background worker. + * walwriter, autovacuum, archiver, slot sync worker, or background worker. 
* * The objectives here are to clean up our local state about the child * process, and to signal all other remaining children to quickdie. @@ -3546,6 +3571,12 @@ HandleChildCrash(int pid, int exitstatus, const char *procname) else if (PgArchPID != 0 && take_action) sigquit_child(PgArchPID); + /* Take care of the slot sync worker too */ + if (pid == SlotSyncWorkerPID) + SlotSyncWorkerPID = 0; + else if (SlotSyncWorkerPID != 0 && take_action) + sigquit_child(SlotSyncWorkerPID); + /* We do NOT restart the syslogger */ if (Shutdown != ImmediateShutdown) @@ -3686,6 +3717,8 @@ PostmasterStateMachine(void) signal_child(WalReceiverPID, SIGTERM); if (WalSummarizerPID != 0) signal_child(WalSummarizerPID, SIGTERM); + if (SlotSyncWorkerPID != 0) + signal_child(SlotSyncWorkerPID, SIGTERM); /* checkpointer, archiver, stats, and syslogger may continue for now */ /* Now transition to PM_WAIT_BACKENDS state to wait for them to die */ @@ -3701,13 +3734,13 @@ PostmasterStateMachine(void) /* * PM_WAIT_BACKENDS state ends when we have no regular backends * (including autovac workers), no bgworkers (including unconnected - * ones), and no walwriter, autovac launcher or bgwriter. If we are - * doing crash recovery or an immediate shutdown then we expect the - * checkpointer to exit as well, otherwise not. The stats and - * syslogger processes are disregarded since they are not connected to - * shared memory; we also disregard dead_end children here. Walsenders - * and archiver are also disregarded, they will be terminated later - * after writing the checkpoint record. + * ones), and no walwriter, autovac launcher, bgwriter or slot sync + * worker. If we are doing crash recovery or an immediate shutdown + * then we expect the checkpointer to exit as well, otherwise not. The + * stats and syslogger processes are disregarded since they are not + * connected to shared memory; we also disregard dead_end children + * here. 
Walsenders and archiver are also disregarded, they will be + * terminated later after writing the checkpoint record. */ if (CountChildren(BACKEND_TYPE_ALL - BACKEND_TYPE_WALSND) == 0 && StartupPID == 0 && @@ -3717,7 +3750,8 @@ PostmasterStateMachine(void) (CheckpointerPID == 0 || (!FatalError && Shutdown < ImmediateShutdown)) && WalWriterPID == 0 && - AutoVacPID == 0) + AutoVacPID == 0 && + SlotSyncWorkerPID == 0) { if (Shutdown >= ImmediateShutdown || FatalError) { @@ -3815,6 +3849,7 @@ PostmasterStateMachine(void) Assert(CheckpointerPID == 0); Assert(WalWriterPID == 0); Assert(AutoVacPID == 0); + Assert(SlotSyncWorkerPID == 0); /* syslogger is not considered here */ pmState = PM_NO_CHILDREN; } @@ -4038,6 +4073,8 @@ TerminateChildren(int signal) signal_child(AutoVacPID, signal); if (PgArchPID != 0) signal_child(PgArchPID, signal); + if (SlotSyncWorkerPID != 0) + signal_child(SlotSyncWorkerPID, signal); } /* @@ -4850,6 +4887,7 @@ SubPostmasterMain(int argc, char *argv[]) */ if (strcmp(argv[1], "--forkbackend") == 0 || strcmp(argv[1], "--forkavlauncher") == 0 || + strcmp(argv[1], "--forkssworker") == 0 || strcmp(argv[1], "--forkavworker") == 0 || strcmp(argv[1], "--forkaux") == 0 || strcmp(argv[1], "--forkbgworker") == 0) @@ -4953,6 +4991,13 @@ SubPostmasterMain(int argc, char *argv[]) AutoVacWorkerMain(argc - 2, argv + 2); /* does not return */ } + if (strcmp(argv[1], "--forkssworker") == 0) + { + /* Restore basic shared memory pointers */ + InitShmemAccess(UsedShmemSegAddr); + + ReplSlotSyncWorkerMain(argc - 2, argv + 2); /* does not return */ + } if (strcmp(argv[1], "--forkbgworker") == 0) { /* do this as early as possible; in particular, before InitProcess() */ @@ -5499,6 +5544,24 @@ MaybeStartWalSummarizer(void) /* + * MaybeStartSlotSyncWorker + * Start the slot sync worker, if not running and our state allows. 
+ * + * The slot sync worker may be started when we are on a hot standby, + * no fast or immediate shutdown is in progress, the slot sync parameters + * are configured correctly, and either this is the worker's first launch + * or enough time has passed since it was last launched. + */ +static void +MaybeStartSlotSyncWorker(void) +{ + if (SlotSyncWorkerPID == 0 && pmState == PM_HOT_STANDBY && + Shutdown <= SmartShutdown && sync_replication_slots && + ValidateSlotSyncParams(LOG) && SlotSyncWorkerCanRestart()) + SlotSyncWorkerPID = StartSlotSyncWorker(); +} + +/* * Create the opts file */ static bool diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 9270d7b855b..04271ee7032 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -6,6 +6,9 @@ * loaded as a dynamic module to avoid linking the main server binary with * libpq. * + * Apart from walreceiver, the libpq-specific routines are now being used by + * logical replication workers and slot synchronization. + * * Portions Copyright (c) 2010-2024, PostgreSQL Global Development Group * * diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index 4cc9148c572..36773cfe73f 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -10,18 +10,25 @@ * * This file contains the code for slot synchronization on a physical standby * to fetch logical failover slots information from the primary server, create - * the slots on the standby and synchronize them. This is done by a call to SQL - * function pg_sync_replication_slots. + * the slots on the standby and synchronize them periodically. 
* - * If on physical standby, the WAL corresponding to the remote's restart_lsn - * is not available or the remote's catalog_xmin precedes the oldest xid for which - * it is guaranteed that rows wouldn't have been removed then we cannot create - * the local standby slot because that would mean moving the local slot + * Slot synchronization can be performed either automatically by enabling slot + * sync worker or manually by calling SQL function pg_sync_replication_slots(). + * + * If the WAL corresponding to the remote's restart_lsn is not available on the + * physical standby or the remote's catalog_xmin precedes the oldest xid for + * which it is guaranteed that rows wouldn't have been removed then we cannot + * create the local standby slot because that would mean moving the local slot * backward and decoding won't be possible via such a slot. In this case, the * slot will be marked as RS_TEMPORARY. Once the primary server catches up, * the slot will be marked as RS_PERSISTENT (which means sync-ready) after - * which we can call pg_sync_replication_slots() periodically to perform - * syncs. + * which slot sync worker can perform the sync periodically or user can call + * pg_sync_replication_slots() periodically to perform the syncs. + * + * The slot sync worker waits for some time before the next synchronization, + * with the duration varying based on whether any slots were updated during + * the last cycle. Refer to the comments above wait_for_slot_activity() for + * more details. * * Any standby synchronized slots will be dropped if they no longer need * to be synchronized. 
See comment atop drop_local_obsolete_slots() for more @@ -31,28 +38,84 @@ #include "postgres.h" +#include <time.h> + #include "access/xlog_internal.h" #include "access/xlogrecovery.h" #include "catalog/pg_database.h" #include "commands/dbcommands.h" +#include "libpq/pqsignal.h" +#include "pgstat.h" +#include "postmaster/fork_process.h" +#include "postmaster/interrupt.h" +#include "postmaster/postmaster.h" #include "replication/slot.h" #include "replication/slotsync.h" #include "storage/ipc.h" #include "storage/lmgr.h" +#include "storage/proc.h" #include "storage/procarray.h" +#include "tcop/tcopprot.h" #include "utils/builtins.h" #include "utils/pg_lsn.h" +#include "utils/ps_status.h" +#include "utils/timeout.h" -/* Struct for sharing information to control slot synchronization. */ +/* + * Struct for sharing information to control slot synchronization. + * + * The slot sync worker's pid is needed by the startup process to shut it + * down during promotion. The startup process shuts down the slot sync worker + * and also sets stopSignaled=true to handle the race condition when the + * postmaster has not noticed the promotion yet and thus may end up restarting + * the slot sync worker. If stopSignaled is set, the worker will exit in such a + * case. Note that we don't need to reset this variable as after promotion the + * slot sync worker won't be restarted because the pmState changes to PM_RUN from + * PM_HOT_STANDBY and we don't support demoting primary without restarting the + * server. See MaybeStartSlotSyncWorker. + * + * The 'syncing' flag is needed to prevent concurrent slot syncs to avoid slot + * overwrites. + * + * The 'last_start_time' is needed by postmaster to start the slot sync worker + * once per SLOTSYNC_RESTART_INTERVAL_SEC. 
In cases where an immediate restart + * is expected (e.g., slot sync GUCs change), slot sync worker will reset + * last_start_time before exiting, so that postmaster can start the worker + * without waiting for SLOTSYNC_RESTART_INTERVAL_SEC. + * + * All the fields except 'syncing' are used only by slotsync worker. + * 'syncing' is used both by worker and SQL function pg_sync_replication_slots. + */ typedef struct SlotSyncCtxStruct { - /* prevents concurrent slot syncs to avoid slot overwrites */ + pid_t pid; + bool stopSignaled; bool syncing; + time_t last_start_time; slock_t mutex; } SlotSyncCtxStruct; SlotSyncCtxStruct *SlotSyncCtx = NULL; +/* GUC variable */ +bool sync_replication_slots = false; + +/* + * The sleep time (ms) between slot-sync cycles varies dynamically + * (within a MIN/MAX range) according to slot activity. See + * wait_for_slot_activity() for details. + */ +#define MIN_WORKER_NAPTIME_MS 200 +#define MAX_WORKER_NAPTIME_MS 30000 /* 30s */ + +static long sleep_ms = MIN_WORKER_NAPTIME_MS; + +/* The restart interval for slot sync worker used by postmaster */ +#define SLOTSYNC_RESTART_INTERVAL_SEC 10 + +/* Flag to tell if we are in a slot sync worker process */ +static bool am_slotsync_worker = false; + /* * Flag to tell if we are syncing replication slots. Unlike the 'syncing' flag * in SlotSyncCtxStruct, this flag is true only if the current process is @@ -79,6 +142,13 @@ typedef struct RemoteSlot ReplicationSlotInvalidationCause invalidated; } RemoteSlot; +#ifdef EXEC_BACKEND +static pid_t slotsyncworker_forkexec(void); +#endif +NON_EXEC_STATIC void ReplSlotSyncWorkerMain(int argc, char *argv[]) pg_attribute_noreturn(); + +static void slotsync_failure_callback(int code, Datum arg); + /* * If necessary, update the local synced slot's metadata based on the data * from the remote slot. 
@@ -343,8 +413,11 @@ reserve_wal_for_local_slot(XLogRecPtr restart_lsn) * If the remote restart_lsn and catalog_xmin have caught up with the * local ones, then update the LSNs and persist the local synced slot for * future synchronization; otherwise, do nothing. + * + * Return true if the slot is marked as RS_PERSISTENT (sync-ready), otherwise + * false. */ -static void +static bool update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) { ReplicationSlot *slot = MyReplicationSlot; @@ -375,7 +448,7 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) remote_slot->catalog_xmin, LSN_FORMAT_ARGS(slot->data.restart_lsn), slot->data.catalog_xmin)); - return; + return false; } /* First time slot update, the function must return true */ @@ -387,6 +460,8 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) ereport(LOG, errmsg("newly created slot \"%s\" is sync-ready now", remote_slot->name)); + + return true; } /* @@ -399,12 +474,15 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) * the remote_slot catches up with locally reserved position and local slot is * updated. The slot is then persisted and is considered as sync-ready for * periodic syncs. + * + * Returns TRUE if the local slot is updated. 
*/ -static void +static bool synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) { ReplicationSlot *slot; XLogRecPtr latestFlushPtr; + bool slot_updated = false; /* * Make sure that concerned WAL is received and flushed before syncing @@ -412,12 +490,17 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) */ latestFlushPtr = GetStandbyFlushRecPtr(NULL); if (remote_slot->confirmed_lsn > latestFlushPtr) - elog(ERROR, - "skipping slot synchronization as the received slot sync" - " LSN %X/%X for slot \"%s\" is ahead of the standby position %X/%X", - LSN_FORMAT_ARGS(remote_slot->confirmed_lsn), - remote_slot->name, - LSN_FORMAT_ARGS(latestFlushPtr)); + { + ereport(am_slotsync_worker ? LOG : ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("skipping slot synchronization as the received slot sync" + " LSN %X/%X for slot \"%s\" is ahead of the standby position %X/%X", + LSN_FORMAT_ARGS(remote_slot->confirmed_lsn), + remote_slot->name, + LSN_FORMAT_ARGS(latestFlushPtr))); + + return false; + } /* Search for the named slot */ if ((slot = SearchNamedReplicationSlot(remote_slot->name, true))) @@ -465,19 +548,22 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) /* Make sure the invalidated state persists across server restart */ ReplicationSlotMarkDirty(); ReplicationSlotSave(); + + slot_updated = true; } /* Skip the sync of an invalidated slot */ if (slot->data.invalidated != RS_INVAL_NONE) { ReplicationSlotRelease(); - return; + return slot_updated; } /* Slot not ready yet, let's attempt to make it sync-ready now. */ if (slot->data.persistency == RS_TEMPORARY) { - update_and_persist_local_synced_slot(remote_slot, remote_dbid); + slot_updated = update_and_persist_local_synced_slot(remote_slot, + remote_dbid); } /* Slot ready for sync, so sync it. 
*/ @@ -500,6 +586,8 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) { ReplicationSlotMarkDirty(); ReplicationSlotSave(); + + slot_updated = true; } } } @@ -511,7 +599,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) /* Skip creating the local slot if remote_slot is invalidated already */ if (remote_slot->invalidated != RS_INVAL_NONE) - return; + return false; /* * We create temporary slots instead of ephemeral slots here because @@ -548,9 +636,13 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) LWLockRelease(ProcArrayLock); update_and_persist_local_synced_slot(remote_slot, remote_dbid); + + slot_updated = true; } ReplicationSlotRelease(); + + return slot_updated; } /* @@ -558,8 +650,10 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) * * Gets the failover logical slots info from the primary server and updates * the slots locally. Creates the slots if not present on the standby. + * + * Returns TRUE if any of the slots gets updated in this sync-cycle. */ -static void +static bool synchronize_slots(WalReceiverConn *wrconn) { #define SLOTSYNC_COLUMN_COUNT 9 @@ -569,6 +663,8 @@ synchronize_slots(WalReceiverConn *wrconn) WalRcvExecResult *res; TupleTableSlot *tupslot; List *remote_slot_list = NIL; + bool some_slot_updated = false; + bool started_tx = false; const char *query = "SELECT slot_name, plugin, confirmed_flush_lsn," " restart_lsn, catalog_xmin, two_phase, failover," " database, conflict_reason" @@ -589,9 +685,15 @@ synchronize_slots(WalReceiverConn *wrconn) syncing_slots = true; + /* The syscache access in walrcv_exec() needs a transaction env. 
*/ + if (!IsTransactionState()) + { + StartTransactionCommand(); + started_tx = true; + } + /* Execute the query */ res = walrcv_exec(wrconn, query, SLOTSYNC_COLUMN_COUNT, slotRow); - if (res->status != WALRCV_OK_TUPLES) ereport(ERROR, errmsg("could not fetch failover logical slots info from the primary server: %s", @@ -686,7 +788,7 @@ synchronize_slots(WalReceiverConn *wrconn) */ LockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock); - synchronize_one_slot(remote_slot, remote_dbid); + some_slot_updated |= synchronize_one_slot(remote_slot, remote_dbid); UnlockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock); } @@ -696,11 +798,16 @@ synchronize_slots(WalReceiverConn *wrconn) walrcv_clear_result(res); + if (started_tx) + CommitTransactionCommand(); + SpinLockAcquire(&SlotSyncCtx->mutex); SlotSyncCtx->syncing = false; SpinLockRelease(&SlotSyncCtx->mutex); syncing_slots = false; + + return some_slot_updated; } /* @@ -720,6 +827,7 @@ validate_remote_info(WalReceiverConn *wrconn) TupleTableSlot *tupslot; bool remote_in_recovery; bool primary_slot_valid; + bool started_tx = false; initStringInfo(&cmd); appendStringInfo(&cmd, @@ -728,6 +836,13 @@ validate_remote_info(WalReceiverConn *wrconn) " WHERE slot_type='physical' AND slot_name=%s", quote_literal_cstr(PrimarySlotName)); + /* The syscache access in walrcv_exec() needs a transaction env. */ + if (!IsTransactionState()) + { + StartTransactionCommand(); + started_tx = true; + } + res = walrcv_exec(wrconn, cmd.data, PRIMARY_INFO_OUTPUT_COL_COUNT, slotRow); pfree(cmd.data); @@ -763,28 +878,73 @@ validate_remote_info(WalReceiverConn *wrconn) ExecClearTuple(tupslot); walrcv_clear_result(res); + + if (started_tx) + CommitTransactionCommand(); } /* - * Check all necessary GUCs for slot synchronization are set - * appropriately, otherwise, raise ERROR. + * Checks if dbname is specified in 'primary_conninfo'. + * + * Error out if not specified otherwise return it. 
*/ -void -ValidateSlotSyncParams(void) +char * +CheckAndGetDbnameFromConninfo(void) { char *dbname; /* + * The slot synchronization needs a database connection for walrcv_exec to + * work. + */ + dbname = walrcv_get_dbname_from_conninfo(PrimaryConnInfo); + if (dbname == NULL) + ereport(ERROR, + + /* + * translator: dbname is a specific option; %s is a GUC variable name + */ + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("slot synchronization requires dbname to be specified in %s", + "primary_conninfo")); + return dbname; +} + +/* + * Return true if all necessary GUCs for slot synchronization are set + * appropriately, otherwise, return false. + */ +bool +ValidateSlotSyncParams(int elevel) +{ + /* + * Logical slot sync/creation requires wal_level >= logical. + * + * Since altering the wal_level requires a server restart, error out + * in this case regardless of the elevel provided by the caller. + */ + if (wal_level < WAL_LEVEL_LOGICAL) + { + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("slot synchronization requires wal_level >= \"logical\"")); + return false; + } + + /* * A physical replication slot(primary_slot_name) is required on the * primary to ensure that the rows needed by the standby are not removed * after restarting, so that the synchronized slot on the standby will not * be invalidated. */ if (PrimarySlotName == NULL || *PrimarySlotName == '\0') - ereport(ERROR, + { + ereport(elevel, /* translator: %s is a GUC variable name */ errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("slot synchronization requires %s to be defined", "primary_slot_name")); + return false; + } /* * hot_standby_feedback must be enabled to cooperate with the physical @@ -792,47 +952,478 @@ ValidateSlotSyncParams(void) * catalog_xmin values on the standby. 
*/ if (!hot_standby_feedback) - ereport(ERROR, + { + ereport(elevel, /* translator: %s is a GUC variable name */ errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("slot synchronization requires %s to be enabled", "hot_standby_feedback")); - - /* Logical slot sync/creation requires wal_level >= logical. */ - if (wal_level < WAL_LEVEL_LOGICAL) - ereport(ERROR, - errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("slot synchronization requires wal_level >= \"logical\"")); + return false; + } /* * The primary_conninfo is required to make connection to primary for * getting slots information. */ if (PrimaryConnInfo == NULL || *PrimaryConnInfo == '\0') - ereport(ERROR, + { + ereport(elevel, /* translator: %s is a GUC variable name */ errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("slot synchronization requires %s to be defined", "primary_conninfo")); + return false; + } + + return true; +} + +/* + * Re-read the config file. + * + * Exit if any of the slot sync GUCs have changed. The postmaster will + * restart it. 
+ */ +static void +slotsync_reread_config(void) +{ + char *old_primary_conninfo = pstrdup(PrimaryConnInfo); + char *old_primary_slotname = pstrdup(PrimarySlotName); + bool old_sync_replication_slots = sync_replication_slots; + bool old_hot_standby_feedback = hot_standby_feedback; + bool conninfo_changed; + bool primary_slotname_changed; + + Assert(sync_replication_slots); + + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + + conninfo_changed = strcmp(old_primary_conninfo, PrimaryConnInfo) != 0; + primary_slotname_changed = strcmp(old_primary_slotname, PrimarySlotName) != 0; + pfree(old_primary_conninfo); + pfree(old_primary_slotname); + + if (old_sync_replication_slots != sync_replication_slots) + { + ereport(LOG, + /* translator: %s is a GUC variable name */ + errmsg("slot sync worker will shutdown because %s is disabled", "sync_replication_slots")); + proc_exit(0); + } + + if (conninfo_changed || + primary_slotname_changed || + (old_hot_standby_feedback != hot_standby_feedback)) + { + ereport(LOG, + errmsg("slot sync worker will restart because of a parameter change")); + + /* + * Reset the last-start time for this worker so that the postmaster + * can restart it without waiting for SLOTSYNC_RESTART_INTERVAL_SEC. + */ + SlotSyncCtx->last_start_time = 0; + + proc_exit(0); + } + +} + +/* + * Interrupt handler for main loop of slot sync worker. + */ +static void +ProcessSlotSyncInterrupts(WalReceiverConn *wrconn) +{ + CHECK_FOR_INTERRUPTS(); + + if (ShutdownRequestPending) + { + ereport(LOG, + errmsg("slot sync worker is shutting down on receiving SIGINT")); + + proc_exit(0); + } + + if (ConfigReloadPending) + slotsync_reread_config(); +} + +/* + * Cleanup function for slotsync worker. + * + * Called on slotsync worker exit. 
+ */ +static void +slotsync_worker_onexit(int code, Datum arg) +{ + SpinLockAcquire(&SlotSyncCtx->mutex); + SlotSyncCtx->pid = InvalidPid; + SpinLockRelease(&SlotSyncCtx->mutex); +} + +/* + * Sleep for long enough that we believe it's likely that the slots on primary + * get updated. + * + * If there is no slot activity the wait time between sync-cycles will double + * (to a maximum of 30s). If there is some slot activity the wait time between + * sync-cycles is reset to the minimum (200ms). + */ +static void +wait_for_slot_activity(bool some_slot_updated) +{ + int rc; + + if (!some_slot_updated) + { + /* + * No slots were updated, so double the sleep time, but not beyond the + * maximum allowable value. + */ + sleep_ms = Min(sleep_ms * 2, MAX_WORKER_NAPTIME_MS); + } + else + { + /* + * Some slots were updated since the last sleep, so reset the sleep + * time. + */ + sleep_ms = MIN_WORKER_NAPTIME_MS; + } + + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + sleep_ms, + WAIT_EVENT_REPLICATION_SLOTSYNC_MAIN); + + if (rc & WL_LATCH_SET) + ResetLatch(MyLatch); +} + +/* + * The main loop of our worker process. + * + * It connects to the primary server, fetches logical failover slots + * information periodically in order to create and sync the slots. + */ +NON_EXEC_STATIC void +ReplSlotSyncWorkerMain(int argc, char *argv[]) +{ + WalReceiverConn *wrconn = NULL; + char *dbname; + char *err; + sigjmp_buf local_sigjmp_buf; + StringInfoData app_name; + + am_slotsync_worker = true; + + MyBackendType = B_SLOTSYNC_WORKER; + + init_ps_display(NULL); + + SetProcessingMode(InitProcessing); /* - * The slot synchronization needs a database connection for walrcv_exec to - * work. + * Create a per-backend PGPROC struct in shared memory. We must do this + * before we access any shared memory. */ - dbname = walrcv_get_dbname_from_conninfo(PrimaryConnInfo); - if (dbname == NULL) - ereport(ERROR, + InitProcess(); + + /* + * Early initialization. 
+ */ + BaseInit(); + + Assert(SlotSyncCtx != NULL); + + SpinLockAcquire(&SlotSyncCtx->mutex); + Assert(SlotSyncCtx->pid == InvalidPid); + + /* + * Startup process signaled the slot sync worker to stop, so if meanwhile + * postmaster ended up starting the worker again, exit. + */ + if (SlotSyncCtx->stopSignaled) + { + SpinLockRelease(&SlotSyncCtx->mutex); + proc_exit(0); + } + + /* Advertise our PID so that the startup process can kill us on promotion */ + SlotSyncCtx->pid = MyProcPid; + SpinLockRelease(&SlotSyncCtx->mutex); + + ereport(LOG, errmsg("slot sync worker started")); + + /* Register it as soon as SlotSyncCtx->pid is initialized. */ + before_shmem_exit(slotsync_worker_onexit, (Datum) 0); + + /* Setup signal handling */ + pqsignal(SIGHUP, SignalHandlerForConfigReload); + pqsignal(SIGINT, SignalHandlerForShutdownRequest); + pqsignal(SIGTERM, die); + pqsignal(SIGFPE, FloatExceptionHandler); + pqsignal(SIGUSR1, procsignal_sigusr1_handler); + pqsignal(SIGUSR2, SIG_IGN); + pqsignal(SIGPIPE, SIG_IGN); + pqsignal(SIGCHLD, SIG_DFL); + + /* + * Establishes SIGALRM handler and initialize timeout module. It is needed + * by InitPostgres to register different timeouts. + */ + InitializeTimeouts(); + + /* Load the libpq-specific functions */ + load_file("libpqwalreceiver", false); + + /* + * If an exception is encountered, processing resumes here. + * + * We just need to clean up, report the error, and go away. + * + * If we do not have this handling here, then since this worker process + * operates at the bottom of the exception stack, ERRORs turn into FATALs. + * Therefore, we create our own exception handler to catch ERRORs. 
+ */ + if (sigsetjmp(local_sigjmp_buf, 1) != 0) + { + /* since not using PG_TRY, must reset error stack by hand */ + error_context_stack = NULL; + + /* Prevents interrupts while cleaning up */ + HOLD_INTERRUPTS(); + + /* Report the error to the server log */ + EmitErrorReport(); /* - * translator: dbname is a specific option; %s is a GUC variable name + * We can now go away. Note that because we called InitProcess, a + * callback was registered to do ProcKill, which will clean up + * necessary state. */ - errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("slot synchronization requires dbname to be specified in %s", - "primary_conninfo")); + proc_exit(0); + } + + /* We can now handle ereport(ERROR) */ + PG_exception_stack = &local_sigjmp_buf; + + /* + * Unblock signals (they were blocked when the postmaster forked us) + */ + sigprocmask(SIG_SETMASK, &UnBlockSig, NULL); + + dbname = CheckAndGetDbnameFromConninfo(); + + /* + * Connect to the database specified by the user in primary_conninfo. We + * need a database connection for walrcv_exec to work which we use to + * fetch slot information from the remote node. See comments atop + * libpqrcv_exec. + * + * We do not specify a specific user here since the slot sync worker will + * operate as a superuser. This is safe because the slot sync worker does + * not interact with user tables, eliminating the risk of executing + * arbitrary code within triggers. + */ + InitPostgres(dbname, InvalidOid, NULL, InvalidOid, 0, NULL); + + SetProcessingMode(NormalProcessing); + + initStringInfo(&app_name); + if (cluster_name[0]) + appendStringInfo(&app_name, "%s_%s", cluster_name, "slotsync worker"); + else + appendStringInfo(&app_name, "%s", "slotsync worker"); + + /* + * Establish the connection to the primary server for slot + * synchronization. 
+ */ + wrconn = walrcv_connect(PrimaryConnInfo, false, false, false, + app_name.data, &err); + pfree(app_name.data); + + if (!wrconn) + ereport(ERROR, + errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not connect to the primary server: %s", err)); + + /* + * Register the failure callback once we have the connection. + * + * XXX: This can be combined with previous such cleanup registration of + * slotsync_worker_onexit() but that will need the connection to be made + * global and we want to avoid introducing global for this purpose. + */ + before_shmem_exit(slotsync_failure_callback, PointerGetDatum(wrconn)); + + /* + * Using the specified primary server connection, check that we are not a + * cascading standby and slot configured in 'primary_slot_name' exists on + * the primary server. + */ + validate_remote_info(wrconn); + + /* Main loop to synchronize slots */ + for (;;) + { + bool some_slot_updated = false; + + ProcessSlotSyncInterrupts(wrconn); + + some_slot_updated = synchronize_slots(wrconn); + + wait_for_slot_activity(some_slot_updated); + } + + /* + * The slot sync worker can't get here because it will only stop when it + * receives a SIGINT from the startup process, or when there is an error. + */ + Assert(false); +} + +/* + * Main entry point for slot sync worker process, to be called from the + * postmaster. + */ +int +StartSlotSyncWorker(void) +{ + pid_t pid; + +#ifdef EXEC_BACKEND + switch ((pid = slotsyncworker_forkexec())) + { +#else + switch ((pid = fork_process())) + { + case 0: + /* in postmaster child ... */ + InitPostmasterChild(); + + /* Close the postmaster's sockets */ + ClosePostmasterPorts(false); + + ReplSlotSyncWorkerMain(0, NULL); + break; +#endif + case -1: + ereport(LOG, + (errmsg("could not fork slot sync worker process: %m"))); + return 0; + + default: + return (int) pid; + } + + /* shouldn't get here */ + return 0; +} + +#ifdef EXEC_BACKEND +/* + * The forkexec routine for the slot sync worker process. 
+ *
+ * Format up the arglist, then fork and exec.
+ */
+static pid_t
+slotsyncworker_forkexec(void)
+{
+	char *av[10];
+	int ac = 0;
+
+	av[ac++] = "postgres";
+	av[ac++] = "--forkssworker";
+	av[ac++] = NULL; /* filled in by postmaster_forkexec */
+	av[ac] = NULL;
+
+	Assert(ac < lengthof(av));
+
+	return postmaster_forkexec(ac, av);
+}
+#endif
+
+/*
+ * Shut down the slot sync worker.
+ *
+ * Sets SlotSyncCtx->stopSignaled so that a worker started after this point
+ * exits on its own, sends SIGINT to the worker whose PID is currently
+ * advertised in SlotSyncCtx (if any), and then polls until that worker has
+ * reset SlotSyncCtx->pid to InvalidPid, which it does from its
+ * slotsync_worker_onexit() callback.
+ *
+ * Returns immediately if no worker is advertised.
+ */
+void
+ShutDownSlotSync(void)
+{
+	pid_t worker_pid;
+
+	SpinLockAcquire(&SlotSyncCtx->mutex);
+
+	SlotSyncCtx->stopSignaled = true;
+
+	/*
+	 * Snapshot the PID while we still hold the spinlock. The worker resets
+	 * SlotSyncCtx->pid to InvalidPid when it exits, so re-reading the shared
+	 * field after releasing the lock could hand InvalidPid to kill(). With
+	 * InvalidPid being -1 in PostgreSQL, that would signal every process we
+	 * are permitted to signal instead of just the worker.
+	 */
+	worker_pid = SlotSyncCtx->pid;
+
+	SpinLockRelease(&SlotSyncCtx->mutex);
+
+	/* Nothing to stop if no worker is running. */
+	if (worker_pid == InvalidPid)
+		return;
+
+	kill(worker_pid, SIGINT);
+
+	/* Wait for it to die */
+	for (;;)
+	{
+		int rc;
+
+		/* Wait a bit, we don't expect to have to wait long */
+		rc = WaitLatch(MyLatch,
+					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+					   10L, WAIT_EVENT_REPLICATION_SLOTSYNC_SHUTDOWN);
+
+		if (rc & WL_LATCH_SET)
+		{
+			ResetLatch(MyLatch);
+			CHECK_FOR_INTERRUPTS();
+		}
+
+		SpinLockAcquire(&SlotSyncCtx->mutex);
+
+		/* Is it gone? (On break the spinlock stays held; released below.) */
+		if (SlotSyncCtx->pid == InvalidPid)
+			break;
+
+		SpinLockRelease(&SlotSyncCtx->mutex);
+	}
+
+	SpinLockRelease(&SlotSyncCtx->mutex);
+}
+
+/*
+ * SlotSyncWorkerCanRestart
+ *
+ * Returns true if enough time (SLOTSYNC_RESTART_INTERVAL_SEC) has passed
+ * since it was launched last. Otherwise returns false.
+ *
+ * This is a safety valve to protect against continuous respawn attempts if the
+ * worker is dying immediately at launch. Note that since we will retry to
+ * launch the worker from the postmaster main loop, we will get another
+ * chance later.
+ */
+bool
+SlotSyncWorkerCanRestart(void)
+{
+	time_t curtime = time(NULL);
+
+	/* Return false if too soon since last start.
*/ + if ((unsigned int) (curtime - SlotSyncCtx->last_start_time) < + (unsigned int) SLOTSYNC_RESTART_INTERVAL_SEC) + return false; + + SlotSyncCtx->last_start_time = curtime; + + return true; } /* - * Is current process syncing replication slots ? + * Is current process syncing replication slots? + * + * Could be either backend executing SQL function or slot sync worker. */ bool IsSyncingReplicationSlots(void) @@ -841,6 +1432,15 @@ IsSyncingReplicationSlots(void) } /* + * Is current process a slot sync worker? + */ +bool +IsLogicalSlotSyncWorker(void) +{ + return am_slotsync_worker; +} + +/* * Amount of shared memory required for slot synchronization. */ Size @@ -855,14 +1455,16 @@ SlotSyncShmemSize(void) void SlotSyncShmemInit(void) { + Size size = SlotSyncShmemSize(); bool found; SlotSyncCtx = (SlotSyncCtxStruct *) - ShmemInitStruct("Slot Sync Data", SlotSyncShmemSize(), &found); + ShmemInitStruct("Slot Sync Data", size, &found); if (!found) { - SlotSyncCtx->syncing = false; + memset(SlotSyncCtx, 0, size); + SlotSyncCtx->pid = InvalidPid; SpinLockInit(&SlotSyncCtx->mutex); } } diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 233652b4799..033b4ce0971 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -1252,6 +1252,20 @@ restart: * concurrently being dropped by a backend connected to another DB. * * That's fairly unlikely in practice, so we'll just bail out. + * + * The slot sync worker holds a shared lock on the database before + * operating on synced logical slots to avoid conflict with the drop + * happening here. The persistent synced slots are thus safe but there + * is a possibility that the slot sync worker has created a temporary + * slot (which stays active even on release) and we are trying to drop + * that here. 
In practice, the chances of hitting this scenario are + * less as during slot synchronization, the temporary slot is + * immediately converted to persistent and thus is safe due to the + * shared lock taken on the database. So, we'll just bail out in such + * a case. + * + * XXX: We can consider shutting down the slot sync worker before + * trying to drop synced temporary slots here. */ if (active_pid) ereport(ERROR, diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index c108bf9608f..768a304723b 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -960,10 +960,12 @@ pg_sync_replication_slots(PG_FUNCTION_ARGS) errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("replication slots can only be synchronized to a standby server")); + ValidateSlotSyncParams(ERROR); + /* Load the libpq-specific functions */ load_file("libpqwalreceiver", false); - ValidateSlotSyncParams(); + (void) CheckAndGetDbnameFromConninfo(); initStringInfo(&app_name); if (cluster_name[0]) diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 631d1e0c9fd..13bc3e0aee4 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -3389,7 +3389,7 @@ WalSndDone(WalSndSendDataCallback send_data) * This should only be called when in recovery. * * This is called either by cascading walsender to find WAL postion to be sent - * to a cascaded standby or by slot synchronization function to validate remote + * to a cascaded standby or by slot synchronization operation to validate remote * slot's lsn before syncing it locally. 
* * As a side-effect, *tli is updated to the TLI of the last diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index 4aec4a3c5f4..6e334971dc9 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -40,6 +40,7 @@ #include "pgstat.h" #include "postmaster/autovacuum.h" #include "replication/slot.h" +#include "replication/slotsync.h" #include "replication/syncrep.h" #include "replication/walsender.h" #include "storage/condition_variable.h" @@ -366,8 +367,12 @@ InitProcess(void) * child; this is so that the postmaster can detect it if we exit without * cleaning up. (XXX autovac launcher currently doesn't participate in * this; it probably should.) + * + * Slot sync worker also does not participate in it, see comments atop + * 'struct bkend' in postmaster.c. */ - if (IsUnderPostmaster && !IsAutoVacuumLauncherProcess()) + if (IsUnderPostmaster && !IsAutoVacuumLauncherProcess() && + !IsLogicalSlotSyncWorker()) MarkPostmasterChildActive(); /* @@ -939,8 +944,12 @@ ProcKill(int code, Datum arg) * This process is no longer present in shared memory in any meaningful * way, so tell the postmaster we've cleaned up acceptably well. (XXX * autovac launcher should be included here someday) + * + * Slot sync worker is also not a postmaster child, so skip this shared + * memory related processing here. 
*/ - if (IsUnderPostmaster && !IsAutoVacuumLauncherProcess()) + if (IsUnderPostmaster && !IsAutoVacuumLauncherProcess() && + !IsLogicalSlotSyncWorker()) MarkPostmasterChildInactive(); /* wake autovac launcher if needed -- see comments in FreeWorkerInfo */ diff --git a/src/backend/utils/activity/pgstat_io.c b/src/backend/utils/activity/pgstat_io.c index 43c393d6fe8..9d6e0673827 100644 --- a/src/backend/utils/activity/pgstat_io.c +++ b/src/backend/utils/activity/pgstat_io.c @@ -338,6 +338,7 @@ pgstat_tracks_io_bktype(BackendType bktype) case B_BG_WORKER: case B_BG_WRITER: case B_CHECKPOINTER: + case B_SLOTSYNC_WORKER: case B_STANDALONE_BACKEND: case B_STARTUP: case B_WAL_SENDER: diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index 6464386b779..4fffb466255 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -53,6 +53,8 @@ LOGICAL_APPLY_MAIN "Waiting in main loop of logical replication apply process." LOGICAL_LAUNCHER_MAIN "Waiting in main loop of logical replication launcher process." LOGICAL_PARALLEL_APPLY_MAIN "Waiting in main loop of logical replication parallel apply process." RECOVERY_WAL_STREAM "Waiting in main loop of startup process for WAL to arrive, during streaming recovery." +REPLICATION_SLOTSYNC_MAIN "Waiting in main loop of slot sync worker." +REPLICATION_SLOTSYNC_SHUTDOWN "Waiting for slot sync worker to shut down." SYSLOGGER_MAIN "Waiting in main loop of syslogger process." WAL_RECEIVER_MAIN "Waiting in main loop of WAL receiver process." WAL_SENDER_MAIN "Waiting in main loop of WAL sender process." 
diff --git a/src/backend/utils/init/miscinit.c b/src/backend/utils/init/miscinit.c index 23f77a59e58..77fd8047563 100644 --- a/src/backend/utils/init/miscinit.c +++ b/src/backend/utils/init/miscinit.c @@ -40,6 +40,7 @@ #include "postmaster/interrupt.h" #include "postmaster/pgarch.h" #include "postmaster/postmaster.h" +#include "replication/slotsync.h" #include "storage/fd.h" #include "storage/ipc.h" #include "storage/latch.h" @@ -293,6 +294,9 @@ GetBackendTypeDesc(BackendType backendType) case B_LOGGER: backendDesc = "logger"; break; + case B_SLOTSYNC_WORKER: + backendDesc = "slotsync worker"; + break; case B_STANDALONE_BACKEND: backendDesc = "standalone backend"; break; @@ -835,9 +839,10 @@ InitializeSessionUserIdStandalone(void) { /* * This function should only be called in single-user mode, in autovacuum - * workers, and in background workers. + * workers, in slot sync worker and in background workers. */ - Assert(!IsUnderPostmaster || IsAutoVacuumWorkerProcess() || IsBackgroundWorker); + Assert(!IsUnderPostmaster || IsAutoVacuumWorkerProcess() || + IsLogicalSlotSyncWorker() || IsBackgroundWorker); /* call only once */ Assert(!OidIsValid(AuthenticatedUserId)); diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c index 7797876d008..5ffe9bdd987 100644 --- a/src/backend/utils/init/postinit.c +++ b/src/backend/utils/init/postinit.c @@ -43,6 +43,7 @@ #include "postmaster/autovacuum.h" #include "postmaster/postmaster.h" #include "replication/slot.h" +#include "replication/slotsync.h" #include "replication/walsender.h" #include "storage/bufmgr.h" #include "storage/fd.h" @@ -876,10 +877,11 @@ InitPostgres(const char *in_dbname, Oid dboid, * Perform client authentication if necessary, then figure out our * postgres user ID, and see if we are a superuser. * - * In standalone mode and in autovacuum worker processes, we use a fixed - * ID, otherwise we figure it out from the authenticated user name. 
+ * In standalone mode, autovacuum worker processes and slot sync worker + * process, we use a fixed ID, otherwise we figure it out from the + * authenticated user name. */ - if (bootstrap || IsAutoVacuumWorkerProcess()) + if (bootstrap || IsAutoVacuumWorkerProcess() || IsLogicalSlotSyncWorker()) { InitializeSessionUserIdStandalone(); am_superuser = true; diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 70652f0a3fc..37be0669bba 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -67,6 +67,7 @@ #include "postmaster/walwriter.h" #include "replication/logicallauncher.h" #include "replication/slot.h" +#include "replication/slotsync.h" #include "replication/syncrep.h" #include "storage/bufmgr.h" #include "storage/large_object.h" @@ -2054,6 +2055,15 @@ struct config_bool ConfigureNamesBool[] = NULL, NULL, NULL }, + { + {"sync_replication_slots", PGC_SIGHUP, REPLICATION_STANDBY, + gettext_noop("Enables a physical standby to synchronize logical failover slots from the primary server."), + }, + &sync_replication_slots, + false, + NULL, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index e10755972ae..c97f9a25f05 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -361,6 +361,7 @@ #wal_retrieve_retry_interval = 5s # time to wait before retrying to # retrieve WAL after a failed attempt #recovery_min_apply_delay = 0 # minimum delay for applying changes during recovery +#sync_replication_slots = off # enables slot synchronization on the physical standby from the primary # - Subscribers - diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h index 0445fbf61d7..612fb5f42e0 100644 --- a/src/include/miscadmin.h +++ b/src/include/miscadmin.h @@ -333,6 +333,7 @@ typedef 
enum BackendType B_BG_WRITER, B_CHECKPOINTER, B_LOGGER, + B_SLOTSYNC_WORKER, B_STANDALONE_BACKEND, B_STARTUP, B_WAL_RECEIVER, diff --git a/src/include/replication/slotsync.h b/src/include/replication/slotsync.h index e86d8a47b85..726d65f9b62 100644 --- a/src/include/replication/slotsync.h +++ b/src/include/replication/slotsync.h @@ -14,8 +14,27 @@ #include "replication/walreceiver.h" -extern void ValidateSlotSyncParams(void); +extern PGDLLIMPORT bool sync_replication_slots; + +/* + * GUCs needed by slot sync worker to connect to the primary + * server and carry on with slots synchronization. + */ +extern PGDLLIMPORT char *PrimaryConnInfo; +extern PGDLLIMPORT char *PrimarySlotName; + +extern char *CheckAndGetDbnameFromConninfo(void); +extern bool ValidateSlotSyncParams(int elevel); + +#ifdef EXEC_BACKEND +extern void ReplSlotSyncWorkerMain(int argc, char *argv[]) pg_attribute_noreturn(); +#endif +extern int StartSlotSyncWorker(void); + +extern void ShutDownSlotSync(void); +extern bool SlotSyncWorkerCanRestart(void); extern bool IsSyncingReplicationSlots(void); +extern bool IsLogicalSlotSyncWorker(void); extern Size SlotSyncShmemSize(void); extern void SlotSyncShmemInit(void); extern void SyncReplicationSlots(WalReceiverConn *wrconn); diff --git a/src/test/recovery/t/040_standby_failover_slots_sync.pl b/src/test/recovery/t/040_standby_failover_slots_sync.pl index 0f2f819f53b..e24009610ad 100644 --- a/src/test/recovery/t/040_standby_failover_slots_sync.pl +++ b/src/test/recovery/t/040_standby_failover_slots_sync.pl @@ -322,6 +322,10 @@ ok( $stderr =~ /ERROR: slot synchronization requires dbname to be specified in primary_conninfo/, "cannot sync slots if dbname is not specified in primary_conninfo"); +# Add the dbname back to the primary_conninfo for further tests +$standby1->append_conf('postgresql.conf', "primary_conninfo = '$connstr_1 dbname=postgres'"); +$standby1->reload; + ################################################## # Test that we cannot synchronize slots 
to a cascading standby server. ################################################## @@ -355,4 +359,120 @@ ok( $stderr =~ /ERROR: cannot synchronize replication slots from a standby server/, "cannot sync slots to a cascading standby server"); +$cascading_standby->stop; + +################################################## +# Test to confirm that the slot sync worker exits on invalid GUC(s) and +# get started again on valid GUC(s). +################################################## + +$log_offset = -s $standby1->logfile; + +# Enable slot sync worker. +$standby1->append_conf('postgresql.conf', qq(sync_replication_slots = on)); +$standby1->reload; + +# Confirm that the slot sync worker is able to start. +$standby1->wait_for_log(qr/LOG: slot sync worker started/, + $log_offset); + +$log_offset = -s $standby1->logfile; + +# Disable another GUC required for slot sync. +$standby1->append_conf( 'postgresql.conf', qq(hot_standby_feedback = off)); +$standby1->reload; + +# Confirm that slot sync worker acknowledge the GUC change and logs the msg +# about wrong configuration. +$standby1->wait_for_log(qr/LOG: slot sync worker will restart because of a parameter change/, + $log_offset); +$standby1->wait_for_log(qr/LOG: slot synchronization requires hot_standby_feedback to be enabled/, + $log_offset); + +$log_offset = -s $standby1->logfile; + +# Re-enable the required GUC +$standby1->append_conf('postgresql.conf', "hot_standby_feedback = on"); +$standby1->reload; + +# Confirm that the slot sync worker is able to start now. +$standby1->wait_for_log(qr/LOG: slot sync worker started/, + $log_offset); + +################################################## +# Test to confirm that restart_lsn and confirmed_flush_lsn of the logical slot +# on the primary is synced to the standby via the slot sync worker. 
+################################################## + +# Insert data on the primary +$primary->safe_psql( + 'postgres', qq[ + CREATE TABLE tab_int (a int PRIMARY KEY); + INSERT INTO tab_int SELECT generate_series(1, 10); +]); + +# Subscribe to the new table data and wait for it to arrive +$subscriber1->safe_psql( + 'postgres', qq[ + CREATE TABLE tab_int (a int PRIMARY KEY); + ALTER SUBSCRIPTION regress_mysub1 ENABLE; + ALTER SUBSCRIPTION regress_mysub1 REFRESH PUBLICATION; +]); + +$subscriber1->wait_for_subscription_sync; + +# Do not allow any further advancement of the restart_lsn and +# confirmed_flush_lsn for the lsub1_slot. +$subscriber1->safe_psql('postgres', "ALTER SUBSCRIPTION regress_mysub1 DISABLE"); + +# Wait for the replication slot to become inactive on the publisher +$primary->poll_query_until( + 'postgres', + "SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'lsub1_slot' AND active='f'", + 1); + +# Get the restart_lsn for the logical slot lsub1_slot on the primary +my $primary_restart_lsn = $primary->safe_psql('postgres', + "SELECT restart_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot';"); + +# Get the confirmed_flush_lsn for the logical slot lsub1_slot on the primary +my $primary_flush_lsn = $primary->safe_psql('postgres', + "SELECT confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot';"); + +# Confirm that restart_lsn and confirmed_flush_lsn of lsub1_slot slot are synced +# to the standby +ok( $standby1->poll_query_until( + 'postgres', + "SELECT '$primary_restart_lsn' = restart_lsn AND '$primary_flush_lsn' = confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot' AND synced AND NOT temporary;"), + 'restart_lsn and confirmed_flush_lsn of slot lsub1_slot synced to standby'); + +################################################## +# Promote the standby1 to primary. 
Confirm that: +# a) the slot 'lsub1_slot' is retained on the new primary +# b) logical replication for regress_mysub1 is resumed successfully after failover +################################################## +$standby1->promote; + +# Update subscription with the new primary's connection info +my $standby1_conninfo = $standby1->connstr . ' dbname=postgres'; +$subscriber1->safe_psql('postgres', + "ALTER SUBSCRIPTION regress_mysub1 CONNECTION '$standby1_conninfo'; + ALTER SUBSCRIPTION regress_mysub1 ENABLE; "); + +# Confirm the synced slot 'lsub1_slot' is retained on the new primary +is($standby1->safe_psql('postgres', + q{SELECT slot_name FROM pg_replication_slots WHERE slot_name = 'lsub1_slot' AND synced AND NOT temporary;}), + 'lsub1_slot', + 'synced slot retained on the new primary'); + +# Insert data on the new primary +$standby1->safe_psql('postgres', + "INSERT INTO tab_int SELECT generate_series(11, 20);"); +$standby1->wait_for_catchup('regress_mysub1'); + +# Confirm that data in tab_int replicated on the subscriber +is( $subscriber1->safe_psql('postgres', q{SELECT count(*) FROM tab_int;}), + "20", + 'data replicated from the new primary'); + done_testing(); |