diff options
Diffstat (limited to 'src/backend/utils/activity/pgstat_replslot.c')
-rw-r--r-- | src/backend/utils/activity/pgstat_replslot.c | 183 |
1 files changed, 143 insertions, 40 deletions
diff --git a/src/backend/utils/activity/pgstat_replslot.c b/src/backend/utils/activity/pgstat_replslot.c index ceefc5d59b3..b77c05ab5fa 100644 --- a/src/backend/utils/activity/pgstat_replslot.c +++ b/src/backend/utils/activity/pgstat_replslot.c @@ -8,6 +8,14 @@ * storage implementation and the details about individual types of * statistics. * + * Replication slot stats work a bit different than other other + * variable-numbered stats. Slots do not have oids (so they can be created on + * physical replicas). Use the slot index as object id while running. However, + * the slot index can change when restarting. That is addressed by using the + * name when (de-)serializing. After a restart it is possible for slots to + * have been dropped while shut down, which is addressed by not restoring + * stats for slots that cannot be found by name when starting up. + * * Copyright (c) 2001-2022, PostgreSQL Global Development Group * * IDENTIFICATION @@ -22,6 +30,9 @@ #include "utils/pgstat_internal.h" +static int get_replslot_index(const char *name); + + /* * Reset counters for a single replication slot. * @@ -32,18 +43,10 @@ void pgstat_reset_replslot(const char *name) { ReplicationSlot *slot; - PgStat_MsgResetreplslotcounter msg; AssertArg(name != NULL); - if (pgStatSock == PGINVALID_SOCKET) - return; - - /* - * Check if the slot exists with the given name. It is possible that by - * the time this message is executed the slot is dropped but at least this - * check will ensure that the given name is for a valid slot. - */ + /* Check if the slot exits with the given name. */ slot = SearchNamedReplicationSlot(name, true); if (!slot) @@ -59,10 +62,9 @@ pgstat_reset_replslot(const char *name) if (SlotIsPhysical(slot)) return; - pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETREPLSLOTCOUNTER); - namestrcpy(&msg.m_slotname, name); - msg.clearall = false; - pgstat_send(&msg, sizeof(msg)); + /* reset this one entry */ + pgstat_reset(PGSTAT_KIND_REPLSLOT, InvalidOid, + ReplicationSlotIndex(slot)); } /* @@ -71,24 +73,34 @@ pgstat_reset_replslot(const char *name) void pgstat_report_replslot(ReplicationSlot *slot, const PgStat_StatReplSlotEntry *repSlotStat) { - PgStat_MsgReplSlot msg; + PgStat_EntryRef *entry_ref; + PgStatShared_ReplSlot *shstatent; + PgStat_StatReplSlotEntry *statent; + + entry_ref = pgstat_get_entry_ref_locked(PGSTAT_KIND_REPLSLOT, InvalidOid, + ReplicationSlotIndex(slot), false); + shstatent = (PgStatShared_ReplSlot *) entry_ref->shared_stats; + statent = &shstatent->stats; /* - * Prepare and send the message + * Any mismatch should have been fixed in pgstat_create_replslot() or + * pgstat_acquire_replslot(). */ - pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT); - namestrcpy(&msg.m_slotname, NameStr(repSlotStat->slotname)); - msg.m_create = false; - msg.m_drop = false; - msg.m_spill_txns = repSlotStat->spill_txns; - msg.m_spill_count = repSlotStat->spill_count; - msg.m_spill_bytes = repSlotStat->spill_bytes; - msg.m_stream_txns = repSlotStat->stream_txns; - msg.m_stream_count = repSlotStat->stream_count; - msg.m_stream_bytes = repSlotStat->stream_bytes; - msg.m_total_txns = repSlotStat->total_txns; - msg.m_total_bytes = repSlotStat->total_bytes; - pgstat_send(&msg, sizeof(PgStat_MsgReplSlot)); + Assert(namestrcmp(&statent->slotname, NameStr(slot->data.name)) == 0); + + /* Update the replication slot statistics */ +#define REPLSLOT_ACC(fld) statent->fld += repSlotStat->fld + REPLSLOT_ACC(spill_txns); + REPLSLOT_ACC(spill_count); + REPLSLOT_ACC(spill_bytes); + REPLSLOT_ACC(stream_txns); + REPLSLOT_ACC(stream_count); + REPLSLOT_ACC(stream_bytes); + REPLSLOT_ACC(total_txns); + REPLSLOT_ACC(total_bytes); +#undef REPLSLOT_ACC + + pgstat_unlock_entry(entry_ref); } /* @@ -100,13 +112,50 @@ pgstat_report_replslot(ReplicationSlot *slot, const PgStat_StatReplSlotEntry *re void pgstat_create_replslot(ReplicationSlot *slot) { - PgStat_MsgReplSlot msg; + PgStat_EntryRef *entry_ref; + PgStatShared_ReplSlot *shstatent; + + entry_ref = pgstat_get_entry_ref_locked(PGSTAT_KIND_REPLSLOT, InvalidOid, + ReplicationSlotIndex(slot), false); + shstatent = (PgStatShared_ReplSlot *) entry_ref->shared_stats; - pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT); - namestrcpy(&msg.m_slotname, NameStr(slot->data.name)); - msg.m_create = true; - msg.m_drop = false; - pgstat_send(&msg, sizeof(PgStat_MsgReplSlot)); + /* + * NB: need to accept that there might be stats from an older slot, e.g. + * if we previously crashed after dropping a slot. + */ + memset(&shstatent->stats, 0, sizeof(shstatent->stats)); + namestrcpy(&shstatent->stats.slotname, NameStr(slot->data.name)); + + pgstat_unlock_entry(entry_ref); +} + +/* + * Report replication slot has been acquired. + */ +void +pgstat_acquire_replslot(ReplicationSlot *slot) +{ + PgStat_EntryRef *entry_ref; + PgStatShared_ReplSlot *shstatent; + PgStat_StatReplSlotEntry *statent; + + entry_ref = pgstat_get_entry_ref_locked(PGSTAT_KIND_REPLSLOT, InvalidOid, + ReplicationSlotIndex(slot), false); + shstatent = (PgStatShared_ReplSlot *) entry_ref->shared_stats; + statent = &shstatent->stats; + + /* + * NB: need to accept that there might be stats from an older slot, e.g. + * if we previously crashed after dropping a slot. + */ + if (NameStr(statent->slotname)[0] == 0 || + namestrcmp(&statent->slotname, NameStr(slot->data.name)) != 0) + { + memset(statent, 0, sizeof(*statent)); + namestrcpy(&statent->slotname, NameStr(slot->data.name)); + } + + pgstat_unlock_entry(entry_ref); } /* @@ -115,11 +164,65 @@ pgstat_create_replslot(ReplicationSlot *slot) void pgstat_drop_replslot(ReplicationSlot *slot) { - PgStat_MsgReplSlot msg; + pgstat_drop_entry(PGSTAT_KIND_REPLSLOT, InvalidOid, + ReplicationSlotIndex(slot)); +} + +/* + * Support function for the SQL-callable pgstat* functions. Returns + * a pointer to the replication slot statistics struct. + */ +PgStat_StatReplSlotEntry * +pgstat_fetch_replslot(NameData slotname) +{ + int idx = get_replslot_index(NameStr(slotname)); + + if (idx == -1) + return NULL; + + return (PgStat_StatReplSlotEntry *) + pgstat_fetch_entry(PGSTAT_KIND_REPLSLOT, InvalidOid, idx); +} + +void +pgstat_replslot_to_serialized_name_cb(const PgStatShared_Common *header, NameData *name) +{ + namestrcpy(name, NameStr(((PgStatShared_ReplSlot *) header)->stats.slotname)); +} + +bool +pgstat_replslot_from_serialized_name_cb(const NameData *name, PgStat_HashKey *key) +{ + int idx = get_replslot_index(NameStr(*name)); + + /* slot might have been deleted */ + if (idx == -1) + return false; + + key->kind = PGSTAT_KIND_REPLSLOT; + key->dboid = InvalidOid; + key->objoid = idx; + + return true; +} + +void +pgstat_replslot_reset_timestamp_cb(PgStatShared_Common *header, TimestampTz ts) +{ + ((PgStatShared_ReplSlot *) header)->stats.stat_reset_timestamp = ts; +} + +static int +get_replslot_index(const char *name) +{ + ReplicationSlot *slot; + + AssertArg(name != NULL); + + slot = SearchNamedReplicationSlot(name, true); + + if (!slot) + return -1; - pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT); - namestrcpy(&msg.m_slotname, NameStr(slot->data.name)); - msg.m_create = false; - msg.m_drop = true; - pgstat_send(&msg, sizeof(PgStat_MsgReplSlot)); + return ReplicationSlotIndex(slot); } |