diff options
-rw-r--r-- doc/src/sgml/config.sgml | 4
-rw-r--r-- doc/src/sgml/monitoring.sgml | 10
-rw-r--r-- src/backend/commands/subscriptioncmds.c | 10
-rw-r--r-- src/backend/replication/logical/launcher.c | 232
-rw-r--r-- src/backend/replication/logical/tablesync.c | 8
-rw-r--r-- src/backend/replication/logical/worker.c | 20
-rw-r--r-- src/backend/storage/lmgr/lwlock.c | 4
-rw-r--r-- src/include/replication/logicallauncher.h | 2
-rw-r--r-- src/include/storage/lwlock.h | 2
9 files changed, 243 insertions, 49 deletions
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index dc9b78b0b7d..f985afc009d 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -4877,6 +4877,10 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class=" environments where the number of times an infrastructure is accessed is taken into account. </para> + <para> + In logical replication, this parameter also limits how often a failing + replication apply worker will be respawned. + </para> </listitem> </varlistentry> diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index e3a783abd0f..1756f1a4b67 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -2009,6 +2009,16 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser about <quote>heavyweight</quote> locks.</entry> </row> <row> + <entry><literal>LogicalRepLauncherDSA</literal></entry> + <entry>Waiting to access logical replication launcher's dynamic shared + memory allocator.</entry> + </row> + <row> + <entry><literal>LogicalRepLauncherHash</literal></entry> + <entry>Waiting to access logical replication launcher's shared + hash table.</entry> + </row> + <row> <entry><literal>LogicalRepWorker</literal></entry> <entry>Waiting to read or update the state of logical replication workers.</entry> diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index baff00dd74e..464db6d247f 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -1505,6 +1505,16 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) list_free(subworkers); /* + * Remove the no-longer-useful entry in the launcher's table of apply + * worker start times. + * + * If this transaction rolls back, the launcher might restart a failed + * apply worker before wal_retrieve_retry_interval milliseconds have + * elapsed, but that's pretty harmless. 
+ */ + ApplyLauncherForgetWorkerStartTime(subid); + + /* * Cleanup of tablesync replication origins. * * Any READY-state relations would already have dealt with clean-ups. diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 27e58566cec..564bffe5caf 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -25,6 +25,7 @@ #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" #include "funcapi.h" +#include "lib/dshash.h" #include "libpq/pqsignal.h" #include "miscadmin.h" #include "pgstat.h" @@ -64,20 +65,47 @@ typedef struct LogicalRepCtxStruct /* Supervisor process. */ pid_t launcher_pid; + /* Hash table holding last start times of subscriptions' apply workers. */ + dsa_handle last_start_dsa; + dshash_table_handle last_start_dsh; + /* Background workers. */ LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]; } LogicalRepCtxStruct; static LogicalRepCtxStruct *LogicalRepCtx; +/* an entry in the last-start-times shared hash table */ +typedef struct LauncherLastStartTimesEntry +{ + Oid subid; /* OID of logrep subscription (hash key) */ + TimestampTz last_start_time; /* last time its apply worker was started */ +} LauncherLastStartTimesEntry; + +/* parameters for the last-start-times shared hash table */ +static const dshash_parameters dsh_params = { + sizeof(Oid), + sizeof(LauncherLastStartTimesEntry), + dshash_memcmp, + dshash_memhash, + LWTRANCHE_LAUNCHER_HASH +}; + +static dsa_area *last_start_times_dsa = NULL; +static dshash_table *last_start_times = NULL; + +static bool on_commit_launcher_wakeup = false; + + static void ApplyLauncherWakeup(void); static void logicalrep_launcher_onexit(int code, Datum arg); static void logicalrep_worker_onexit(int code, Datum arg); static void logicalrep_worker_detach(void); static void logicalrep_worker_cleanup(LogicalRepWorker *worker); static int logicalrep_pa_worker_count(Oid subid); - -static bool 
on_commit_launcher_wakeup = false; +static void logicalrep_launcher_attach_dshmem(void); +static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time); +static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid); /* @@ -894,6 +922,9 @@ ApplyLauncherShmemInit(void) memset(LogicalRepCtx, 0, ApplyLauncherShmemSize()); + LogicalRepCtx->last_start_dsa = DSM_HANDLE_INVALID; + LogicalRepCtx->last_start_dsh = DSM_HANDLE_INVALID; + /* Initialize memory and spin locks for each worker slot. */ for (slot = 0; slot < max_logical_replication_workers; slot++) { @@ -906,6 +937,105 @@ ApplyLauncherShmemInit(void) } /* + * Initialize or attach to the dynamic shared hash table that stores the + * last-start times, if not already done. + * This must be called before accessing the table. + */ +static void +logicalrep_launcher_attach_dshmem(void) +{ + MemoryContext oldcontext; + + /* Quick exit if we already did this. */ + if (LogicalRepCtx->last_start_dsh != DSM_HANDLE_INVALID && + last_start_times != NULL) + return; + + /* Otherwise, use a lock to ensure only one process creates the table. */ + LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE); + + /* Be sure any local memory allocated by DSA routines is persistent. */ + oldcontext = MemoryContextSwitchTo(TopMemoryContext); + + if (LogicalRepCtx->last_start_dsh == DSM_HANDLE_INVALID) + { + /* Initialize dynamic shared hash table for last-start times. */ + last_start_times_dsa = dsa_create(LWTRANCHE_LAUNCHER_DSA); + dsa_pin(last_start_times_dsa); + dsa_pin_mapping(last_start_times_dsa); + last_start_times = dshash_create(last_start_times_dsa, &dsh_params, 0); + + /* Store handles in shared memory for other backends to use. */ + LogicalRepCtx->last_start_dsa = dsa_get_handle(last_start_times_dsa); + LogicalRepCtx->last_start_dsh = dshash_get_hash_table_handle(last_start_times); + } + else if (!last_start_times) + { + /* Attach to existing dynamic shared hash table. 
*/ + last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa); + dsa_pin_mapping(last_start_times_dsa); + last_start_times = dshash_attach(last_start_times_dsa, &dsh_params, + LogicalRepCtx->last_start_dsh, 0); + } + + MemoryContextSwitchTo(oldcontext); + LWLockRelease(LogicalRepWorkerLock); +} + +/* + * Set the last-start time for the subscription. + */ +static void +ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time) +{ + LauncherLastStartTimesEntry *entry; + bool found; + + logicalrep_launcher_attach_dshmem(); + + entry = dshash_find_or_insert(last_start_times, &subid, &found); + entry->last_start_time = start_time; + dshash_release_lock(last_start_times, entry); +} + +/* + * Return the last-start time for the subscription, or 0 if there isn't one. + */ +static TimestampTz +ApplyLauncherGetWorkerStartTime(Oid subid) +{ + LauncherLastStartTimesEntry *entry; + TimestampTz ret; + + logicalrep_launcher_attach_dshmem(); + + entry = dshash_find(last_start_times, &subid, false); + if (entry == NULL) + return 0; + + ret = entry->last_start_time; + dshash_release_lock(last_start_times, entry); + + return ret; +} + +/* + * Remove the last-start-time entry for the subscription, if one exists. + * + * This has two use-cases: to remove the entry related to a subscription + * that's been deleted or disabled (just to avoid leaking shared memory), + * and to allow immediate restart of an apply worker that has exited + * due to subscription parameter changes. + */ +void +ApplyLauncherForgetWorkerStartTime(Oid subid) +{ + logicalrep_launcher_attach_dshmem(); + + (void) dshash_delete_key(last_start_times, &subid); +} + +/* * Wakeup the launcher on commit if requested. 
*/ void @@ -947,8 +1077,6 @@ ApplyLauncherWakeup(void) void ApplyLauncherMain(Datum main_arg) { - TimestampTz last_start_time = 0; - ereport(DEBUG1, (errmsg_internal("logical replication launcher started"))); @@ -976,65 +1104,71 @@ ApplyLauncherMain(Datum main_arg) ListCell *lc; MemoryContext subctx; MemoryContext oldctx; - TimestampTz now; long wait_time = DEFAULT_NAPTIME_PER_CYCLE; CHECK_FOR_INTERRUPTS(); - now = GetCurrentTimestamp(); + /* Use temporary context to avoid leaking memory across cycles. */ + subctx = AllocSetContextCreate(TopMemoryContext, + "Logical Replication Launcher sublist", + ALLOCSET_DEFAULT_SIZES); + oldctx = MemoryContextSwitchTo(subctx); - /* Limit the start retry to once a wal_retrieve_retry_interval */ - if (TimestampDifferenceExceeds(last_start_time, now, - wal_retrieve_retry_interval)) + /* Start any missing workers for enabled subscriptions. */ + sublist = get_subscription_list(); + foreach(lc, sublist) { - /* Use temporary context for the database list and worker info. */ - subctx = AllocSetContextCreate(TopMemoryContext, - "Logical Replication Launcher sublist", - ALLOCSET_DEFAULT_SIZES); - oldctx = MemoryContextSwitchTo(subctx); + Subscription *sub = (Subscription *) lfirst(lc); + LogicalRepWorker *w; + TimestampTz last_start; + TimestampTz now; + long elapsed; - /* search for subscriptions to start or stop. */ - sublist = get_subscription_list(); - - /* Start the missing workers for enabled subscriptions. 
*/ - foreach(lc, sublist) - { - Subscription *sub = (Subscription *) lfirst(lc); - LogicalRepWorker *w; + if (!sub->enabled) + continue; - if (!sub->enabled) - continue; - - LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - w = logicalrep_worker_find(sub->oid, InvalidOid, false); - LWLockRelease(LogicalRepWorkerLock); - - if (w == NULL) - { - last_start_time = now; - wait_time = wal_retrieve_retry_interval; + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + w = logicalrep_worker_find(sub->oid, InvalidOid, false); + LWLockRelease(LogicalRepWorkerLock); - logicalrep_worker_launch(sub->dbid, sub->oid, sub->name, - sub->owner, InvalidOid, DSM_HANDLE_INVALID); - } - } + if (w != NULL) + continue; /* worker is running already */ - /* Switch back to original memory context. */ - MemoryContextSwitchTo(oldctx); - /* Clean the temporary memory. */ - MemoryContextDelete(subctx); - } - else - { /* - * The wait in previous cycle was interrupted in less than - * wal_retrieve_retry_interval since last worker was started, this - * usually means crash of the worker, so we should retry in - * wal_retrieve_retry_interval again. + * If the worker is eligible to start now, launch it. Otherwise, + * adjust wait_time so that we'll wake up as soon as it can be + * started. + * + * Each subscription's apply worker can only be restarted once per + * wal_retrieve_retry_interval, so that errors do not cause us to + * repeatedly restart the worker as fast as possible. In cases + * where a restart is expected (e.g., subscription parameter + * changes), another process should remove the last-start entry + * for the subscription so that the worker can be restarted + * without waiting for wal_retrieve_retry_interval to elapse. 
*/ - wait_time = wal_retrieve_retry_interval; + last_start = ApplyLauncherGetWorkerStartTime(sub->oid); + now = GetCurrentTimestamp(); + if (last_start == 0 || + (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval) + { + ApplyLauncherSetWorkerStartTime(sub->oid, now); + logicalrep_worker_launch(sub->dbid, sub->oid, sub->name, + sub->owner, InvalidOid, + DSM_HANDLE_INVALID); + } + else + { + wait_time = Min(wait_time, + wal_retrieve_retry_interval - elapsed); + } } + /* Switch back to original memory context. */ + MemoryContextSwitchTo(oldctx); + /* Clean the temporary memory. */ + MemoryContextDelete(subctx); + /* Wait for more work. */ rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 38dfce71296..4647837b823 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -628,7 +628,15 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) } if (should_exit) + { + /* + * Reset the last-start time for this worker so that the launcher will + * restart it without waiting for wal_retrieve_retry_interval. 
+ */ + ApplyLauncherForgetWorkerStartTime(MySubscription->oid); + proc_exit(0); + } } /* diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index a0084c7ef69..cfb2ab62481 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -174,6 +174,7 @@ #include "postmaster/walwriter.h" #include "replication/decode.h" #include "replication/logical.h" +#include "replication/logicallauncher.h" #include "replication/logicalproto.h" #include "replication/logicalrelation.h" #include "replication/logicalworker.h" @@ -3811,6 +3812,15 @@ apply_worker_exit(void) return; } + /* + * Reset the last-start time for this apply worker so that the launcher + * will restart it without waiting for wal_retrieve_retry_interval if the + * subscription is still active, and so that we won't leak that hash table + * entry if it isn't. + */ + if (!am_tablesync_worker()) + ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid); + proc_exit(0); } @@ -3851,6 +3861,9 @@ maybe_reread_subscription(void) (errmsg("%s for subscription \"%s\" will stop because the subscription was removed", get_worker_name(), MySubscription->name))); + /* Ensure we remove no-longer-useful entry for worker's start time */ + if (!am_tablesync_worker() && !am_parallel_apply_worker()) + ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid); proc_exit(0); } @@ -4421,6 +4434,9 @@ InitializeApplyWorker(void) (errmsg("%s for subscription %u will not start because the subscription was removed during startup", get_worker_name(), MyLogicalRepWorker->subid))); + /* Ensure we remove no-longer-useful entry for worker's start time */ + if (!am_tablesync_worker() && !am_parallel_apply_worker()) + ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid); proc_exit(0); } @@ -4678,6 +4694,10 @@ DisableSubscriptionAndExit(void) DisableSubscription(MySubscription->oid); CommitTransactionCommand(); + /* Ensure we remove no-longer-useful 
entry for worker's start time */ + if (!am_tablesync_worker() && !am_parallel_apply_worker()) + ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid); + /* Notify the subscription has been disabled and exit */ ereport(LOG, errmsg("subscription \"%s\" has been disabled because of an error", diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c index 196bece0a3d..d2ec3960451 100644 --- a/src/backend/storage/lmgr/lwlock.c +++ b/src/backend/storage/lmgr/lwlock.c @@ -186,6 +186,10 @@ static const char *const BuiltinTrancheNames[] = { "PgStatsHash", /* LWTRANCHE_PGSTATS_DATA: */ "PgStatsData", + /* LWTRANCHE_LAUNCHER_DSA: */ + "LogicalRepLauncherDSA", + /* LWTRANCHE_LAUNCHER_HASH: */ + "LogicalRepLauncherHash", }; StaticAssertDecl(lengthof(BuiltinTrancheNames) == diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h index 360e98702a8..a07c9cb311a 100644 --- a/src/include/replication/logicallauncher.h +++ b/src/include/replication/logicallauncher.h @@ -22,6 +22,8 @@ extern void ApplyLauncherMain(Datum main_arg); extern Size ApplyLauncherShmemSize(void); extern void ApplyLauncherShmemInit(void); +extern void ApplyLauncherForgetWorkerStartTime(Oid subid); + extern void ApplyLauncherWakeupAtCommit(void); extern void AtEOXact_ApplyLauncher(bool isCommit); diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h index e4162db613c..d2c7afb8f40 100644 --- a/src/include/storage/lwlock.h +++ b/src/include/storage/lwlock.h @@ -204,6 +204,8 @@ typedef enum BuiltinTrancheIds LWTRANCHE_PGSTATS_DSA, LWTRANCHE_PGSTATS_HASH, LWTRANCHE_PGSTATS_DATA, + LWTRANCHE_LAUNCHER_DSA, + LWTRANCHE_LAUNCHER_HASH, LWTRANCHE_FIRST_USER_DEFINED } BuiltinTrancheIds; |