-rw-r--r--  doc/src/sgml/config.sgml                        4
-rw-r--r--  doc/src/sgml/monitoring.sgml                   10
-rw-r--r--  src/backend/commands/subscriptioncmds.c        10
-rw-r--r--  src/backend/replication/logical/launcher.c    232
-rw-r--r--  src/backend/replication/logical/tablesync.c     8
-rw-r--r--  src/backend/replication/logical/worker.c       20
-rw-r--r--  src/backend/storage/lmgr/lwlock.c               4
-rw-r--r--  src/include/replication/logicallauncher.h       2
-rw-r--r--  src/include/storage/lwlock.h                    2
9 files changed, 243 insertions, 49 deletions
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index dc9b78b0b7d..f985afc009d 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -4877,6 +4877,10 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
environments where the number of times an infrastructure is accessed
is taken into account.
</para>
+ <para>
+ In logical replication, this parameter also limits how often a failing
+ replication apply worker will be respawned.
+ </para>
</listitem>
</varlistentry>
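[Annotation] The throttle this documentation change describes reduces to a per-subscription
timestamp test against the GUC. A minimal sketch of the eligibility test, using the existing
timestamp helpers; worker_restart_allowed is a hypothetical name, not a function from this patch:

#include "postgres.h"

#include "utils/timestamp.h"

extern int	wal_retrieve_retry_interval;	/* GUC, in milliseconds */

/*
 * Hypothetical helper: may a failed apply worker be respawned yet?
 * A last_start of 0 means the worker has never been started.
 */
static bool
worker_restart_allowed(TimestampTz last_start)
{
	return last_start == 0 ||
		TimestampDifferenceExceeds(last_start, GetCurrentTimestamp(),
								   wal_retrieve_retry_interval);
}

The patch itself uses TimestampDifferenceMilliseconds() instead, so the remaining interval can
also feed the launcher's wait_time; see the launcher.c hunk below.
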
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index e3a783abd0f..1756f1a4b67 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -2009,6 +2009,16 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
about <quote>heavyweight</quote> locks.</entry>
</row>
<row>
+ <entry><literal>LogicalRepLauncherDSA</literal></entry>
+ <entry>Waiting to access logical replication launcher's dynamic shared
+ memory allocator.</entry>
+ </row>
+ <row>
+ <entry><literal>LogicalRepLauncherHash</literal></entry>
+ <entry>Waiting to access logical replication launcher's shared
+ hash table.</entry>
+ </row>
+ <row>
<entry><literal>LogicalRepWorker</literal></entry>
<entry>Waiting to read or update the state of logical replication
workers.</entry>
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index baff00dd74e..464db6d247f 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -1505,6 +1505,16 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
list_free(subworkers);
/*
+ * Remove the no-longer-useful entry in the launcher's table of apply
+ * worker start times.
+ *
+ * If this transaction rolls back, the launcher might restart a failed
+ * apply worker before wal_retrieve_retry_interval milliseconds have
+ * elapsed, but that's pretty harmless.
+ */
+ ApplyLauncherForgetWorkerStartTime(subid);
+
+ /*
* Cleanup of tablesync replication origins.
*
* Any READY-state relations would already have dealt with clean-ups.
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 27e58566cec..564bffe5caf 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -25,6 +25,7 @@
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "funcapi.h"
+#include "lib/dshash.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "pgstat.h"
@@ -64,20 +65,47 @@ typedef struct LogicalRepCtxStruct
/* Supervisor process. */
pid_t launcher_pid;
+ /* Hash table holding last start times of subscriptions' apply workers. */
+ dsa_handle last_start_dsa;
+ dshash_table_handle last_start_dsh;
+
/* Background workers. */
LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
} LogicalRepCtxStruct;
static LogicalRepCtxStruct *LogicalRepCtx;
+/* an entry in the last-start-times shared hash table */
+typedef struct LauncherLastStartTimesEntry
+{
+ Oid subid; /* OID of logrep subscription (hash key) */
+ TimestampTz last_start_time; /* last time its apply worker was started */
+} LauncherLastStartTimesEntry;
+
+/* parameters for the last-start-times shared hash table */
+static const dshash_parameters dsh_params = {
+ sizeof(Oid),
+ sizeof(LauncherLastStartTimesEntry),
+ dshash_memcmp,
+ dshash_memhash,
+ LWTRANCHE_LAUNCHER_HASH
+};
+
+static dsa_area *last_start_times_dsa = NULL;
+static dshash_table *last_start_times = NULL;
+
+static bool on_commit_launcher_wakeup = false;
+
+
static void ApplyLauncherWakeup(void);
static void logicalrep_launcher_onexit(int code, Datum arg);
static void logicalrep_worker_onexit(int code, Datum arg);
static void logicalrep_worker_detach(void);
static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
static int logicalrep_pa_worker_count(Oid subid);
-
-static bool on_commit_launcher_wakeup = false;
+static void logicalrep_launcher_attach_dshmem(void);
+static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
+static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
/*
@@ -894,6 +922,9 @@ ApplyLauncherShmemInit(void)
memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
+ LogicalRepCtx->last_start_dsa = DSM_HANDLE_INVALID;
+ LogicalRepCtx->last_start_dsh = DSM_HANDLE_INVALID;
+
/* Initialize memory and spin locks for each worker slot. */
for (slot = 0; slot < max_logical_replication_workers; slot++)
{
@@ -906,6 +937,105 @@ ApplyLauncherShmemInit(void)
}
/*
+ * Initialize or attach to the dynamic shared hash table that stores the
+ * last-start times, if not already done.
+ * This must be called before accessing the table.
+ */
+static void
+logicalrep_launcher_attach_dshmem(void)
+{
+ MemoryContext oldcontext;
+
+ /* Quick exit if we already did this. */
+ if (LogicalRepCtx->last_start_dsh != DSM_HANDLE_INVALID &&
+ last_start_times != NULL)
+ return;
+
+ /* Otherwise, use a lock to ensure only one process creates the table. */
+ LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
+
+ /* Be sure any local memory allocated by DSA routines is persistent. */
+ oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+
+ if (LogicalRepCtx->last_start_dsh == DSM_HANDLE_INVALID)
+ {
+ /* Initialize dynamic shared hash table for last-start times. */
+ last_start_times_dsa = dsa_create(LWTRANCHE_LAUNCHER_DSA);
+ dsa_pin(last_start_times_dsa);
+ dsa_pin_mapping(last_start_times_dsa);
+ last_start_times = dshash_create(last_start_times_dsa, &dsh_params, 0);
+
+ /* Store handles in shared memory for other backends to use. */
+ LogicalRepCtx->last_start_dsa = dsa_get_handle(last_start_times_dsa);
+ LogicalRepCtx->last_start_dsh = dshash_get_hash_table_handle(last_start_times);
+ }
+ else if (!last_start_times)
+ {
+ /* Attach to existing dynamic shared hash table. */
+ last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa);
+ dsa_pin_mapping(last_start_times_dsa);
+ last_start_times = dshash_attach(last_start_times_dsa, &dsh_params,
+ LogicalRepCtx->last_start_dsh, 0);
+ }
+
+ MemoryContextSwitchTo(oldcontext);
+ LWLockRelease(LogicalRepWorkerLock);
+}
+
+/*
+ * Set the last-start time for the subscription.
+ */
+static void
+ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time)
+{
+ LauncherLastStartTimesEntry *entry;
+ bool found;
+
+ logicalrep_launcher_attach_dshmem();
+
+ entry = dshash_find_or_insert(last_start_times, &subid, &found);
+ entry->last_start_time = start_time;
+ dshash_release_lock(last_start_times, entry);
+}
+
+/*
+ * Return the last-start time for the subscription, or 0 if there isn't one.
+ */
+static TimestampTz
+ApplyLauncherGetWorkerStartTime(Oid subid)
+{
+ LauncherLastStartTimesEntry *entry;
+ TimestampTz ret;
+
+ logicalrep_launcher_attach_dshmem();
+
+ entry = dshash_find(last_start_times, &subid, false);
+ if (entry == NULL)
+ return 0;
+
+ ret = entry->last_start_time;
+ dshash_release_lock(last_start_times, entry);
+
+ return ret;
+}
+
+/*
+ * Remove the last-start-time entry for the subscription, if one exists.
+ *
+ * This has two use-cases: to remove the entry related to a subscription
+ * that's been deleted or disabled (just to avoid leaking shared memory),
+ * and to allow immediate restart of an apply worker that has exited
+ * due to subscription parameter changes.
+ */
+void
+ApplyLauncherForgetWorkerStartTime(Oid subid)
+{
+ logicalrep_launcher_attach_dshmem();
+
+ (void) dshash_delete_key(last_start_times, &subid);
+}
+
+/*
* Wakeup the launcher on commit if requested.
*/
void
@@ -947,8 +1077,6 @@ ApplyLauncherWakeup(void)
void
ApplyLauncherMain(Datum main_arg)
{
- TimestampTz last_start_time = 0;
-
ereport(DEBUG1,
(errmsg_internal("logical replication launcher started")));
@@ -976,65 +1104,71 @@ ApplyLauncherMain(Datum main_arg)
ListCell *lc;
MemoryContext subctx;
MemoryContext oldctx;
- TimestampTz now;
long wait_time = DEFAULT_NAPTIME_PER_CYCLE;
CHECK_FOR_INTERRUPTS();
- now = GetCurrentTimestamp();
+ /* Use temporary context to avoid leaking memory across cycles. */
+ subctx = AllocSetContextCreate(TopMemoryContext,
+ "Logical Replication Launcher sublist",
+ ALLOCSET_DEFAULT_SIZES);
+ oldctx = MemoryContextSwitchTo(subctx);
- /* Limit the start retry to once a wal_retrieve_retry_interval */
- if (TimestampDifferenceExceeds(last_start_time, now,
- wal_retrieve_retry_interval))
+ /* Start any missing workers for enabled subscriptions. */
+ sublist = get_subscription_list();
+ foreach(lc, sublist)
{
- /* Use temporary context for the database list and worker info. */
- subctx = AllocSetContextCreate(TopMemoryContext,
- "Logical Replication Launcher sublist",
- ALLOCSET_DEFAULT_SIZES);
- oldctx = MemoryContextSwitchTo(subctx);
+ Subscription *sub = (Subscription *) lfirst(lc);
+ LogicalRepWorker *w;
+ TimestampTz last_start;
+ TimestampTz now;
+ long elapsed;
- /* search for subscriptions to start or stop. */
- sublist = get_subscription_list();
-
- /* Start the missing workers for enabled subscriptions. */
- foreach(lc, sublist)
- {
- Subscription *sub = (Subscription *) lfirst(lc);
- LogicalRepWorker *w;
+ if (!sub->enabled)
+ continue;
- if (!sub->enabled)
- continue;
-
- LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
- w = logicalrep_worker_find(sub->oid, InvalidOid, false);
- LWLockRelease(LogicalRepWorkerLock);
-
- if (w == NULL)
- {
- last_start_time = now;
- wait_time = wal_retrieve_retry_interval;
+ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+ w = logicalrep_worker_find(sub->oid, InvalidOid, false);
+ LWLockRelease(LogicalRepWorkerLock);
- logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
- sub->owner, InvalidOid, DSM_HANDLE_INVALID);
- }
- }
+ if (w != NULL)
+ continue; /* worker is running already */
- /* Switch back to original memory context. */
- MemoryContextSwitchTo(oldctx);
- /* Clean the temporary memory. */
- MemoryContextDelete(subctx);
- }
- else
- {
/*
- * The wait in previous cycle was interrupted in less than
- * wal_retrieve_retry_interval since last worker was started, this
- * usually means crash of the worker, so we should retry in
- * wal_retrieve_retry_interval again.
+ * If the worker is eligible to start now, launch it. Otherwise,
+ * adjust wait_time so that we'll wake up as soon as it can be
+ * started.
+ *
+ * Each subscription's apply worker can only be restarted once per
+ * wal_retrieve_retry_interval, so that errors do not cause us to
+ * repeatedly restart the worker as fast as possible. In cases
+ * where a restart is expected (e.g., subscription parameter
+ * changes), another process should remove the last-start entry
+ * for the subscription so that the worker can be restarted
+ * without waiting for wal_retrieve_retry_interval to elapse.
*/
- wait_time = wal_retrieve_retry_interval;
+ last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
+ now = GetCurrentTimestamp();
+ if (last_start == 0 ||
+ (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
+ {
+ ApplyLauncherSetWorkerStartTime(sub->oid, now);
+ logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
+ sub->owner, InvalidOid,
+ DSM_HANDLE_INVALID);
+ }
+ else
+ {
+ wait_time = Min(wait_time,
+ wal_retrieve_retry_interval - elapsed);
+ }
}
+ /* Switch back to original memory context. */
+ MemoryContextSwitchTo(oldctx);
+ /* Clean the temporary memory. */
+ MemoryContextDelete(subctx);
+
/* Wait for more work. */
rc = WaitLatch(MyLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
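
[Annotation] logicalrep_launcher_attach_dshmem() above is an instance of the standard
create-or-attach idiom for a DSA-backed dshash table. A condensed sketch of that idiom follows;
MySharedState, my_shared_state, my_dsh_params, my_dsa_tranche, and my_creation_lock are all
hypothetical stand-ins for a subsystem's own state (the patch reuses LogicalRepWorkerLock and
the LWTRANCHE_LAUNCHER_* tranches):

#include "postgres.h"

#include "lib/dshash.h"
#include "storage/lwlock.h"
#include "utils/dsa.h"
#include "utils/memutils.h"

/* Hypothetical shared-memory struct holding the published handles. */
typedef struct MySharedState
{
	dsa_handle	dsa;			/* DSM_HANDLE_INVALID until created */
	dshash_table_handle dsh;
} MySharedState;

extern MySharedState *my_shared_state;	/* set up at shmem init */
extern const dshash_parameters my_dsh_params;	/* key size, entry size, tranche */
extern int	my_dsa_tranche;		/* e.g., a registered LWLock tranche ID */
extern LWLock *my_creation_lock;	/* serializes first-time creation */

static dsa_area *my_dsa = NULL;
static dshash_table *my_hash = NULL;

/*
 * Create-or-attach: the first backend to get here creates the DSA area and
 * hash table and publishes their handles; everyone else attaches via those
 * handles.  Per-backend bookkeeping is allocated in TopMemoryContext so it
 * survives for the life of the process.
 */
static void
my_attach_shared_hash(void)
{
	MemoryContext oldcontext;

	/* Quick exit if this backend is already attached. */
	if (my_shared_state->dsh != DSM_HANDLE_INVALID && my_hash != NULL)
		return;

	LWLockAcquire(my_creation_lock, LW_EXCLUSIVE);
	oldcontext = MemoryContextSwitchTo(TopMemoryContext);

	if (my_shared_state->dsh == DSM_HANDLE_INVALID)
	{
		/* Nobody has created the table yet; do so and publish handles. */
		my_dsa = dsa_create(my_dsa_tranche);
		dsa_pin(my_dsa);		/* survive even with zero attachments */
		dsa_pin_mapping(my_dsa);	/* keep this backend's mapping forever */
		my_hash = dshash_create(my_dsa, &my_dsh_params, NULL);

		my_shared_state->dsa = dsa_get_handle(my_dsa);
		my_shared_state->dsh = dshash_get_hash_table_handle(my_hash);
	}
	else if (my_hash == NULL)
	{
		/* Table exists; attach this backend to it. */
		my_dsa = dsa_attach(my_shared_state->dsa);
		dsa_pin_mapping(my_dsa);
		my_hash = dshash_attach(my_dsa, &my_dsh_params,
								my_shared_state->dsh, NULL);
	}

	MemoryContextSwitchTo(oldcontext);
	LWLockRelease(my_creation_lock);
}

One subtlety: dsa_pin() keeps the area alive while no backend is attached, whereas
dsa_pin_mapping() keeps the current backend's mapping until process exit; the patch needs both
because the launcher process itself can exit and restart.
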
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 38dfce71296..4647837b823 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -628,7 +628,15 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
}
if (should_exit)
+ {
+ /*
+ * Reset the last-start time for this worker so that the launcher will
+ * restart it without waiting for wal_retrieve_retry_interval.
+ */
+ ApplyLauncherForgetWorkerStartTime(MySubscription->oid);
+
proc_exit(0);
+ }
}
/*
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index a0084c7ef69..cfb2ab62481 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -174,6 +174,7 @@
#include "postmaster/walwriter.h"
#include "replication/decode.h"
#include "replication/logical.h"
+#include "replication/logicallauncher.h"
#include "replication/logicalproto.h"
#include "replication/logicalrelation.h"
#include "replication/logicalworker.h"
@@ -3811,6 +3812,15 @@ apply_worker_exit(void)
return;
}
+ /*
+ * Reset the last-start time for this apply worker so that the launcher
+ * will restart it without waiting for wal_retrieve_retry_interval if the
+ * subscription is still active, and so that we won't leak that hash table
+ * entry if it isn't.
+ */
+ if (!am_tablesync_worker())
+ ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
+
proc_exit(0);
}
@@ -3851,6 +3861,9 @@ maybe_reread_subscription(void)
(errmsg("%s for subscription \"%s\" will stop because the subscription was removed",
get_worker_name(), MySubscription->name)));
+ /* Ensure we remove no-longer-useful entry for worker's start time */
+ if (!am_tablesync_worker() && !am_parallel_apply_worker())
+ ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
proc_exit(0);
}
@@ -4421,6 +4434,9 @@ InitializeApplyWorker(void)
(errmsg("%s for subscription %u will not start because the subscription was removed during startup",
get_worker_name(), MyLogicalRepWorker->subid)));
+ /* Ensure we remove no-longer-useful entry for worker's start time */
+ if (!am_tablesync_worker() && !am_parallel_apply_worker())
+ ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
proc_exit(0);
}
@@ -4678,6 +4694,10 @@ DisableSubscriptionAndExit(void)
DisableSubscription(MySubscription->oid);
CommitTransactionCommand();
+ /* Ensure we remove no-longer-useful entry for worker's start time */
+ if (!am_tablesync_worker() && !am_parallel_apply_worker())
+ ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
+
/* Notify the subscription has been disabled and exit */
ereport(LOG,
errmsg("subscription \"%s\" has been disabled because of an error",
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 196bece0a3d..d2ec3960451 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -186,6 +186,10 @@ static const char *const BuiltinTrancheNames[] = {
"PgStatsHash",
/* LWTRANCHE_PGSTATS_DATA: */
"PgStatsData",
+ /* LWTRANCHE_LAUNCHER_DSA: */
+ "LogicalRepLauncherDSA",
+ /* LWTRANCHE_LAUNCHER_HASH: */
+ "LogicalRepLauncherHash",
};
StaticAssertDecl(lengthof(BuiltinTrancheNames) ==
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index 360e98702a8..a07c9cb311a 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -22,6 +22,8 @@ extern void ApplyLauncherMain(Datum main_arg);
extern Size ApplyLauncherShmemSize(void);
extern void ApplyLauncherShmemInit(void);
+extern void ApplyLauncherForgetWorkerStartTime(Oid subid);
+
extern void ApplyLauncherWakeupAtCommit(void);
extern void AtEOXact_ApplyLauncher(bool isCommit);
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index e4162db613c..d2c7afb8f40 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -204,6 +204,8 @@ typedef enum BuiltinTrancheIds
LWTRANCHE_PGSTATS_DSA,
LWTRANCHE_PGSTATS_HASH,
LWTRANCHE_PGSTATS_DATA,
+ LWTRANCHE_LAUNCHER_DSA,
+ LWTRANCHE_LAUNCHER_HASH,
LWTRANCHE_FIRST_USER_DEFINED
} BuiltinTrancheIds;