aboutsummaryrefslogtreecommitdiff
path: root/src/backend
diff options
context:
space:
mode:
authorAmit Kapila <akapila@postgresql.org>2024-03-25 16:34:33 +0530
committerAmit Kapila <akapila@postgresql.org>2024-03-25 16:34:33 +0530
commita11f330b5584f2430371d68871e00f5c63735299 (patch)
treeb53cafab5f97bb5440db906e20ca0f38e46a0fd9 /src/backend
parent0f7863afef67e462574fe5af6317e26a2f2d47fb (diff)
downloadpostgresql-a11f330b5584f2430371d68871e00f5c63735299.tar.gz
postgresql-a11f330b5584f2430371d68871e00f5c63735299.zip
Track last_inactive_time in pg_replication_slots.
This commit adds a new property called last_inactive_time for slots. It is set to 0 whenever a slot is made active/acquired and set to the current timestamp whenever the slot is inactive/released or restored from the disk. Note that we don't set the last_inactive_time for the slots currently being synced from the primary to the standby because such slots are typically inactive as decoding is not allowed on those. The 'last_inactive_time' will be useful on production servers to debug and analyze inactive replication slots. It will also help to know the lifetime of a replication slot - one can know how long a streaming standby, logical subscriber, or replication slot consumer is down. The 'last_inactive_time' will also be useful to implement inactive timeout-based replication slot invalidation in a future commit. Author: Bharath Rupireddy Reviewed-by: Bertrand Drouvot, Amit Kapila, Shveta Malik Discussion: https://www.postgresql.org/message-id/CALj2ACW4aUe-_uFQOjdWCEN-xXoLGhmvRFnL8SNw_TZ5nJe+aw@mail.gmail.com
Diffstat (limited to 'src/backend')
-rw-r--r--src/backend/catalog/system_views.sql1
-rw-r--r--src/backend/replication/slot.c35
-rw-r--r--src/backend/replication/slotfuncs.c7
3 files changed, 42 insertions, 1 deletions
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index f69b7f55801..bc70ff193e3 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1023,6 +1023,7 @@ CREATE VIEW pg_replication_slots AS
L.wal_status,
L.safe_wal_size,
L.two_phase,
+ L.last_inactive_time,
L.conflicting,
L.invalidation_reason,
L.failover,
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index cdf0c450c59..eaa74925691 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -409,6 +409,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
slot->candidate_restart_valid = InvalidXLogRecPtr;
slot->candidate_restart_lsn = InvalidXLogRecPtr;
slot->last_saved_confirmed_flush = InvalidXLogRecPtr;
+ slot->last_inactive_time = 0;
/*
* Create the slot on disk. We haven't actually marked the slot allocated
@@ -622,6 +623,11 @@ retry:
if (SlotIsLogical(s))
pgstat_acquire_replslot(s);
+ /* Reset the last inactive time as the slot is active now. */
+ SpinLockAcquire(&s->mutex);
+ s->last_inactive_time = 0;
+ SpinLockRelease(&s->mutex);
+
if (am_walsender)
{
ereport(log_replication_commands ? LOG : DEBUG1,
@@ -645,6 +651,7 @@ ReplicationSlotRelease(void)
ReplicationSlot *slot = MyReplicationSlot;
char *slotname = NULL; /* keep compiler quiet */
bool is_logical = false; /* keep compiler quiet */
+ TimestampTz now = 0;
Assert(slot != NULL && slot->active_pid != 0);
@@ -679,6 +686,15 @@ ReplicationSlotRelease(void)
ReplicationSlotsComputeRequiredXmin(false);
}
+ /*
+ * Set the last inactive time after marking the slot inactive. We don't set
+ * it for the slots currently being synced from the primary to the standby
+ * because such slots are typically inactive as decoding is not allowed on
+ * those.
+ */
+ if (!(RecoveryInProgress() && slot->data.synced))
+ now = GetCurrentTimestamp();
+
if (slot->data.persistency == RS_PERSISTENT)
{
/*
@@ -687,9 +703,16 @@ ReplicationSlotRelease(void)
*/
SpinLockAcquire(&slot->mutex);
slot->active_pid = 0;
+ slot->last_inactive_time = now;
SpinLockRelease(&slot->mutex);
ConditionVariableBroadcast(&slot->active_cv);
}
+ else
+ {
+ SpinLockAcquire(&slot->mutex);
+ slot->last_inactive_time = now;
+ SpinLockRelease(&slot->mutex);
+ }
MyReplicationSlot = NULL;
@@ -2342,6 +2365,18 @@ RestoreSlotFromDisk(const char *name)
slot->in_use = true;
slot->active_pid = 0;
+ /*
+ * We set the last inactive time after loading the slot from the disk
+ * into memory. Whoever acquires the slot i.e. makes the slot active
+ * will reset it. We don't set it for the slots currently being synced
+ * from the primary to the standby because such slots are typically
+ * inactive as decoding is not allowed on those.
+ */
+ if (!(RecoveryInProgress() && slot->data.synced))
+ slot->last_inactive_time = GetCurrentTimestamp();
+ else
+ slot->last_inactive_time = 0;
+
restored = true;
break;
}
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 4232c1e52e5..24f5e6d90a5 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -239,7 +239,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
Datum
pg_get_replication_slots(PG_FUNCTION_ARGS)
{
-#define PG_GET_REPLICATION_SLOTS_COLS 18
+#define PG_GET_REPLICATION_SLOTS_COLS 19
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
XLogRecPtr currlsn;
int slotno;
@@ -410,6 +410,11 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
values[i++] = BoolGetDatum(slot_contents.data.two_phase);
+ if (slot_contents.last_inactive_time > 0)
+ values[i++] = TimestampTzGetDatum(slot_contents.last_inactive_time);
+ else
+ nulls[i++] = true;
+
cause = slot_contents.data.invalidated;
if (SlotIsPhysical(&slot_contents))