aboutsummaryrefslogtreecommitdiff
path: root/src/backend/utils/activity/pgstat_replslot.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/utils/activity/pgstat_replslot.c')
-rw-r--r--src/backend/utils/activity/pgstat_replslot.c183
1 files changed, 143 insertions, 40 deletions
diff --git a/src/backend/utils/activity/pgstat_replslot.c b/src/backend/utils/activity/pgstat_replslot.c
index ceefc5d59b3..b77c05ab5fa 100644
--- a/src/backend/utils/activity/pgstat_replslot.c
+++ b/src/backend/utils/activity/pgstat_replslot.c
@@ -8,6 +8,14 @@
* storage implementation and the details about individual types of
* statistics.
*
+ * Replication slot stats work a bit different than other other
+ * variable-numbered stats. Slots do not have oids (so they can be created on
+ * physical replicas). Use the slot index as object id while running. However,
+ * the slot index can change when restarting. That is addressed by using the
+ * name when (de-)serializing. After a restart it is possible for slots to
+ * have been dropped while shut down, which is addressed by not restoring
+ * stats for slots that cannot be found by name when starting up.
+ *
* Copyright (c) 2001-2022, PostgreSQL Global Development Group
*
* IDENTIFICATION
@@ -22,6 +30,9 @@
#include "utils/pgstat_internal.h"
+static int get_replslot_index(const char *name);
+
+
/*
* Reset counters for a single replication slot.
*
@@ -32,18 +43,10 @@ void
pgstat_reset_replslot(const char *name)
{
ReplicationSlot *slot;
- PgStat_MsgResetreplslotcounter msg;
AssertArg(name != NULL);
- if (pgStatSock == PGINVALID_SOCKET)
- return;
-
- /*
- * Check if the slot exists with the given name. It is possible that by
- * the time this message is executed the slot is dropped but at least this
- * check will ensure that the given name is for a valid slot.
- */
+ /* Check if the slot exits with the given name. */
slot = SearchNamedReplicationSlot(name, true);
if (!slot)
@@ -59,10 +62,9 @@ pgstat_reset_replslot(const char *name)
if (SlotIsPhysical(slot))
return;
- pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETREPLSLOTCOUNTER);
- namestrcpy(&msg.m_slotname, name);
- msg.clearall = false;
- pgstat_send(&msg, sizeof(msg));
+ /* reset this one entry */
+ pgstat_reset(PGSTAT_KIND_REPLSLOT, InvalidOid,
+ ReplicationSlotIndex(slot));
}
/*
@@ -71,24 +73,34 @@ pgstat_reset_replslot(const char *name)
void
pgstat_report_replslot(ReplicationSlot *slot, const PgStat_StatReplSlotEntry *repSlotStat)
{
- PgStat_MsgReplSlot msg;
+ PgStat_EntryRef *entry_ref;
+ PgStatShared_ReplSlot *shstatent;
+ PgStat_StatReplSlotEntry *statent;
+
+ entry_ref = pgstat_get_entry_ref_locked(PGSTAT_KIND_REPLSLOT, InvalidOid,
+ ReplicationSlotIndex(slot), false);
+ shstatent = (PgStatShared_ReplSlot *) entry_ref->shared_stats;
+ statent = &shstatent->stats;
/*
- * Prepare and send the message
+ * Any mismatch should have been fixed in pgstat_create_replslot() or
+ * pgstat_acquire_replslot().
*/
- pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT);
- namestrcpy(&msg.m_slotname, NameStr(repSlotStat->slotname));
- msg.m_create = false;
- msg.m_drop = false;
- msg.m_spill_txns = repSlotStat->spill_txns;
- msg.m_spill_count = repSlotStat->spill_count;
- msg.m_spill_bytes = repSlotStat->spill_bytes;
- msg.m_stream_txns = repSlotStat->stream_txns;
- msg.m_stream_count = repSlotStat->stream_count;
- msg.m_stream_bytes = repSlotStat->stream_bytes;
- msg.m_total_txns = repSlotStat->total_txns;
- msg.m_total_bytes = repSlotStat->total_bytes;
- pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
+ Assert(namestrcmp(&statent->slotname, NameStr(slot->data.name)) == 0);
+
+ /* Update the replication slot statistics */
+#define REPLSLOT_ACC(fld) statent->fld += repSlotStat->fld
+ REPLSLOT_ACC(spill_txns);
+ REPLSLOT_ACC(spill_count);
+ REPLSLOT_ACC(spill_bytes);
+ REPLSLOT_ACC(stream_txns);
+ REPLSLOT_ACC(stream_count);
+ REPLSLOT_ACC(stream_bytes);
+ REPLSLOT_ACC(total_txns);
+ REPLSLOT_ACC(total_bytes);
+#undef REPLSLOT_ACC
+
+ pgstat_unlock_entry(entry_ref);
}
/*
@@ -100,13 +112,50 @@ pgstat_report_replslot(ReplicationSlot *slot, const PgStat_StatReplSlotEntry *re
void
pgstat_create_replslot(ReplicationSlot *slot)
{
- PgStat_MsgReplSlot msg;
+ PgStat_EntryRef *entry_ref;
+ PgStatShared_ReplSlot *shstatent;
+
+ entry_ref = pgstat_get_entry_ref_locked(PGSTAT_KIND_REPLSLOT, InvalidOid,
+ ReplicationSlotIndex(slot), false);
+ shstatent = (PgStatShared_ReplSlot *) entry_ref->shared_stats;
- pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT);
- namestrcpy(&msg.m_slotname, NameStr(slot->data.name));
- msg.m_create = true;
- msg.m_drop = false;
- pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
+ /*
+ * NB: need to accept that there might be stats from an older slot, e.g.
+ * if we previously crashed after dropping a slot.
+ */
+ memset(&shstatent->stats, 0, sizeof(shstatent->stats));
+ namestrcpy(&shstatent->stats.slotname, NameStr(slot->data.name));
+
+ pgstat_unlock_entry(entry_ref);
+}
+
+/*
+ * Report replication slot has been acquired.
+ */
+void
+pgstat_acquire_replslot(ReplicationSlot *slot)
+{
+ PgStat_EntryRef *entry_ref;
+ PgStatShared_ReplSlot *shstatent;
+ PgStat_StatReplSlotEntry *statent;
+
+ entry_ref = pgstat_get_entry_ref_locked(PGSTAT_KIND_REPLSLOT, InvalidOid,
+ ReplicationSlotIndex(slot), false);
+ shstatent = (PgStatShared_ReplSlot *) entry_ref->shared_stats;
+ statent = &shstatent->stats;
+
+ /*
+ * NB: need to accept that there might be stats from an older slot, e.g.
+ * if we previously crashed after dropping a slot.
+ */
+ if (NameStr(statent->slotname)[0] == 0 ||
+ namestrcmp(&statent->slotname, NameStr(slot->data.name)) != 0)
+ {
+ memset(statent, 0, sizeof(*statent));
+ namestrcpy(&statent->slotname, NameStr(slot->data.name));
+ }
+
+ pgstat_unlock_entry(entry_ref);
}
/*
@@ -115,11 +164,65 @@ pgstat_create_replslot(ReplicationSlot *slot)
void
pgstat_drop_replslot(ReplicationSlot *slot)
{
- PgStat_MsgReplSlot msg;
+ pgstat_drop_entry(PGSTAT_KIND_REPLSLOT, InvalidOid,
+ ReplicationSlotIndex(slot));
+}
+
+/*
+ * Support function for the SQL-callable pgstat* functions. Returns
+ * a pointer to the replication slot statistics struct.
+ */
+PgStat_StatReplSlotEntry *
+pgstat_fetch_replslot(NameData slotname)
+{
+ int idx = get_replslot_index(NameStr(slotname));
+
+ if (idx == -1)
+ return NULL;
+
+ return (PgStat_StatReplSlotEntry *)
+ pgstat_fetch_entry(PGSTAT_KIND_REPLSLOT, InvalidOid, idx);
+}
+
+void
+pgstat_replslot_to_serialized_name_cb(const PgStatShared_Common *header, NameData *name)
+{
+ namestrcpy(name, NameStr(((PgStatShared_ReplSlot *) header)->stats.slotname));
+}
+
+bool
+pgstat_replslot_from_serialized_name_cb(const NameData *name, PgStat_HashKey *key)
+{
+ int idx = get_replslot_index(NameStr(*name));
+
+ /* slot might have been deleted */
+ if (idx == -1)
+ return false;
+
+ key->kind = PGSTAT_KIND_REPLSLOT;
+ key->dboid = InvalidOid;
+ key->objoid = idx;
+
+ return true;
+}
+
+void
+pgstat_replslot_reset_timestamp_cb(PgStatShared_Common *header, TimestampTz ts)
+{
+ ((PgStatShared_ReplSlot *) header)->stats.stat_reset_timestamp = ts;
+}
+
+static int
+get_replslot_index(const char *name)
+{
+ ReplicationSlot *slot;
+
+ AssertArg(name != NULL);
+
+ slot = SearchNamedReplicationSlot(name, true);
+
+ if (!slot)
+ return -1;
- pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT);
- namestrcpy(&msg.m_slotname, NameStr(slot->data.name));
- msg.m_create = false;
- msg.m_drop = true;
- pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
+ return ReplicationSlotIndex(slot);
}