aboutsummaryrefslogtreecommitdiff
path: root/src/backend/postmaster/pgstat.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/postmaster/pgstat.c')
-rw-r--r--src/backend/postmaster/pgstat.c4513
1 files changed, 955 insertions, 3558 deletions
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 20c4629e55c..a9f3a7ef492 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -1,100 +1,161 @@
/* ----------
* pgstat.c
+ * Infrastructure for the cumulative statistics system.
*
- * All the statistics collector stuff hacked up in one big, ugly file.
+ * The cumulative statistics system accumulates statistics for different kinds
+ * of objects. Some kinds of statistics are collected for a fixed number of
+ * objects (most commonly 1), e.g., checkpointer statistics. Other kinds of
+ * statistics are collected for a varying number of objects
+ * (e.g. relations). See PgStat_KindInfo for a list of currently handled
+ * statistics.
*
- * TODO: - Separate collector, postmaster and backend stuff
- * into different files.
+ * Statistics are loaded from the filesystem during startup (by the startup
+ * process), unless preceded by a crash, in which case all stats are
+ * discarded. They are written out by the checkpointer process just before
+ * shutting down, except when shutting down in immediate mode.
*
- * - Add some automatic call for pgstat vacuuming.
+ * Fixed-numbered stats are stored in plain (non-dynamic) shared memory.
*
- * - Add a pgstat config column to pg_database, so this
- * entire thing can be enabled/disabled on a per db basis.
+ * Statistics for variable-numbered objects are stored in dynamic shared
+ * memory and can be found via a dshash hashtable. The statistics counters are
+ * not part of the dshash entry (PgStatShared_HashEntry) directly, but are
+ * separately allocated (PgStatShared_HashEntry->body). The separate
+ * allocation allows different kinds of statistics to be stored in the same
+ * hashtable without wasting space in PgStatShared_HashEntry.
*
- * Copyright (c) 2001-2022, PostgreSQL Global Development Group
+ * Variable-numbered stats are addressed by PgStat_HashKey while running. It
+ * is not possible to have statistics for an object that cannot be addressed
+ * that way at runtime. A wider identifier can be used when serializing to
+ * disk (used for replication slot stats).
*
- * src/backend/postmaster/pgstat.c
+ * To avoid contention on the shared hashtable, each backend has a
+ * backend-local hashtable (pgStatEntryRefHash) in front of the shared
+ * hashtable, containing references (PgStat_EntryRef) to shared hashtable
+ * entries. The shared hashtable only needs to be accessed when no prior
+ * reference is found in the local hashtable. The shared hashtable only
+ * needs to be accessed when no prior reference is found in the local
+ * hashtable. Besides pointing to the shared hashtable entry
+ * (PgStatShared_HashEntry), PgStat_EntryRef also contains a pointer to
+ * the shared statistics data, as a process-local
+ * address, to reduce access costs.
+ *
+ * The names for structs stored in shared memory are prefixed with
+ * PgStatShared instead of PgStat. Each stats entry in shared memory is
+ * protected by a dedicated lwlock.
+ *
+ * Most stats updates are first accumulated locally in each process as pending
+ * entries, then later flushed to shared memory (just after commit, or by
+ * idle-timeout). This practically eliminates contention on individual stats
+ * entries. For most kinds of variable-numbered pending stats data is stored
+ * in PgStat_EntryRef->pending. All entries with pending data are in the
+ * pgStatPending list. Pending statistics updates are flushed out by
+ * pgstat_report_stat().
+ *
+ * The behavior of different kinds of statistics is determined by the kind's
+ * entry in pgstat_kind_infos, see PgStat_KindInfo for details.
+ *
+ * The consistency of read accesses to statistics can be configured using the
+ * stats_fetch_consistency GUC (see config.sgml and monitoring.sgml for the
+ * settings). When using PGSTAT_FETCH_CONSISTENCY_CACHE or
+ * PGSTAT_FETCH_CONSISTENCY_SNAPSHOT statistics are stored in
+ * pgStatLocal.snapshot.
+ *
+ * To keep things manageable, stats handling is split across several
+ * files. Infrastructure pieces are in:
+ * - pgstat.c - this file, to tie it all together
+ * - pgstat_shmem.c - nearly everything dealing with shared memory, including
+ * the maintenance of hashtable entries
+ * - pgstat_xact.c - transactional integration, including the transactional
+ * creation and dropping of stats entries
+ *
+ * Each statistics kind is handled in a dedicated file:
+ * - pgstat_archiver.c
+ * - pgstat_bgwriter.c
+ * - pgstat_checkpointer.c
+ * - pgstat_database.c
+ * - pgstat_function.c
+ * - pgstat_relation.c
+ * - pgstat_slru.c
+ * - pgstat_subscription.c
+ * - pgstat_wal.c
+ *
+ * Whenever possible infrastructure files should not contain code related to
+ * specific kinds of stats.
+ *
+ *
+ * Copyright (c) 2001-2022, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/postmaster/pgstat.c
* ----------
*/
#include "postgres.h"
#include <unistd.h>
-#include <fcntl.h>
-#include <sys/param.h>
-#include <sys/time.h>
-#include <sys/socket.h>
-#include <netdb.h>
-#include <netinet/in.h>
-#include <arpa/inet.h>
-#include <signal.h>
-#include <time.h>
-#ifdef HAVE_SYS_SELECT_H
-#include <sys/select.h>
-#endif
-#include "access/heapam.h"
-#include "access/htup_details.h"
-#include "access/tableam.h"
#include "access/transam.h"
#include "access/xact.h"
-#include "catalog/catalog.h"
-#include "catalog/pg_database.h"
-#include "catalog/pg_proc.h"
-#include "catalog/pg_subscription.h"
-#include "common/ip.h"
-#include "libpq/libpq.h"
-#include "libpq/pqsignal.h"
-#include "mb/pg_wchar.h"
-#include "miscadmin.h"
+#include "lib/dshash.h"
#include "pgstat.h"
-#include "postmaster/autovacuum.h"
-#include "postmaster/fork_process.h"
-#include "postmaster/interrupt.h"
-#include "postmaster/postmaster.h"
-#include "replication/slot.h"
-#include "replication/walsender.h"
-#include "storage/backendid.h"
-#include "storage/dsm.h"
+#include "port/atomics.h"
#include "storage/fd.h"
#include "storage/ipc.h"
-#include "storage/latch.h"
-#include "storage/lmgr.h"
+#include "storage/lwlock.h"
#include "storage/pg_shmem.h"
-#include "storage/proc.h"
-#include "storage/procsignal.h"
-#include "utils/builtins.h"
+#include "storage/shmem.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/pgstat_internal.h"
-#include "utils/ps_status.h"
-#include "utils/rel.h"
-#include "utils/snapmgr.h"
#include "utils/timestamp.h"
/* ----------
* Timer definitions.
+ *
+ * In milliseconds.
* ----------
*/
-#define PGSTAT_RETRY_DELAY 10 /* How long to wait between checks for a
- * new file; in milliseconds. */
+/* minimum interval between non-forced stats flushes */
+#define PGSTAT_MIN_INTERVAL 1000
+/* how long to wait before blocking to flush pending stats updates */
+#define PGSTAT_MAX_INTERVAL 60000
+/* when to call pgstat_report_stat() again, even when idle */
+#define PGSTAT_IDLE_INTERVAL 10000
-#define PGSTAT_MAX_WAIT_TIME 10000 /* Maximum time to wait for a stats
- * file update; in milliseconds. */
+/* ----------
+ * Initial size hints for the hash tables used in statistics.
+ * ----------
+ */
-#define PGSTAT_INQ_INTERVAL 640 /* How often to ping the collector for a
- * new file; in milliseconds. */
+#define PGSTAT_SNAPSHOT_HASH_SIZE 512
-#define PGSTAT_RESTART_INTERVAL 60 /* How often to attempt to restart a
- * failed statistics collector; in
- * seconds. */
-#define PGSTAT_POLL_LOOP_COUNT (PGSTAT_MAX_WAIT_TIME / PGSTAT_RETRY_DELAY)
-#define PGSTAT_INQ_LOOP_COUNT (PGSTAT_INQ_INTERVAL / PGSTAT_RETRY_DELAY)
+/* hash table entry for statistics snapshots */
+typedef struct PgStat_SnapshotEntry
+{
+ PgStat_HashKey key;
+ char status; /* for simplehash use */
+ void *data; /* the stats data itself */
+} PgStat_SnapshotEntry;
-/* Minimum receive buffer size for the collector's socket. */
-#define PGSTAT_MIN_RCVBUF (100 * 1024)
+
+/* ----------
+ * Backend-local Hash Table Definitions
+ * ----------
+ */
+
+/* for stats snapshot entries */
+#define SH_PREFIX pgstat_snapshot
+#define SH_ELEMENT_TYPE PgStat_SnapshotEntry
+#define SH_KEY_TYPE PgStat_HashKey
+#define SH_KEY key
+#define SH_HASH_KEY(tb, key) \
+ pgstat_hash_hash_key(&key, sizeof(PgStat_HashKey), NULL)
+#define SH_EQUAL(tb, a, b) \
+ pgstat_cmp_hash_key(&a, &b, sizeof(PgStat_HashKey), NULL) == 0
+#define SH_SCOPE static inline
+#define SH_DEFINE
+#define SH_DECLARE
+#include "lib/simplehash.h"
/* ----------
@@ -102,63 +163,18 @@
* ----------
*/
-#ifdef EXEC_BACKEND
-static pid_t pgstat_forkexec(void);
-#endif
+static void pgstat_write_statsfile(void);
+static void pgstat_read_statsfile(void);
-NON_EXEC_STATIC void PgstatCollectorMain(int argc, char *argv[]) pg_attribute_noreturn();
-
-static PgStat_StatDBEntry *pgstat_get_db_entry(Oid databaseid, bool create);
-static PgStat_StatTabEntry *pgstat_get_tab_entry(PgStat_StatDBEntry *dbentry,
- Oid tableoid, bool create);
-static PgStat_StatSubEntry *pgstat_get_subscription_entry(Oid subid, bool create);
-static void pgstat_reset_subscription(PgStat_StatSubEntry *subentry, TimestampTz ts);
-static void pgstat_write_statsfiles(bool permanent, bool allDbs);
-static void pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent);
-static HTAB *pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep);
-static void pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash,
- bool permanent);
-static void backend_read_statsfile(void);
-
-static bool pgstat_write_statsfile_needed(void);
-static bool pgstat_db_requested(Oid databaseid);
-
-static PgStat_StatReplSlotEntry *pgstat_get_replslot_entry(NameData name, bool create_it);
-static void pgstat_reset_replslot_entry(PgStat_StatReplSlotEntry *slotstats, TimestampTz ts);
-
-static HTAB *pgstat_collect_oids(Oid catalogid, AttrNumber anum_oid);
-
-static void pgstat_setup_memcxt(void);
-
-static void pgstat_recv_inquiry(PgStat_MsgInquiry *msg, int len);
-static void pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len);
-static void pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len);
-static void pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len);
-static void pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len);
-static void pgstat_recv_resetsharedcounter(PgStat_MsgResetsharedcounter *msg, int len);
-static void pgstat_recv_resetsinglecounter(PgStat_MsgResetsinglecounter *msg, int len);
-static void pgstat_recv_resetslrucounter(PgStat_MsgResetslrucounter *msg, int len);
-static void pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg, int len);
-static void pgstat_recv_resetsubcounter(PgStat_MsgResetsubcounter *msg, int len);
-static void pgstat_recv_autovac(PgStat_MsgAutovacStart *msg, int len);
-static void pgstat_recv_vacuum(PgStat_MsgVacuum *msg, int len);
-static void pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len);
-static void pgstat_recv_archiver(PgStat_MsgArchiver *msg, int len);
-static void pgstat_recv_bgwriter(PgStat_MsgBgWriter *msg, int len);
-static void pgstat_recv_checkpointer(PgStat_MsgCheckpointer *msg, int len);
-static void pgstat_recv_wal(PgStat_MsgWal *msg, int len);
-static void pgstat_recv_slru(PgStat_MsgSLRU *msg, int len);
-static void pgstat_recv_funcstat(PgStat_MsgFuncstat *msg, int len);
-static void pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len);
-static void pgstat_recv_recoveryconflict(PgStat_MsgRecoveryConflict *msg, int len);
-static void pgstat_recv_deadlock(PgStat_MsgDeadlock *msg, int len);
-static void pgstat_recv_checksum_failure(PgStat_MsgChecksumFailure *msg, int len);
-static void pgstat_recv_connect(PgStat_MsgConnect *msg, int len);
-static void pgstat_recv_disconnect(PgStat_MsgDisconnect *msg, int len);
-static void pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len);
-static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len);
-static void pgstat_recv_subscription_drop(PgStat_MsgSubscriptionDrop *msg, int len);
-static void pgstat_recv_subscription_error(PgStat_MsgSubscriptionError *msg, int len);
+static void pgstat_reset_after_failure(TimestampTz ts);
+
+static bool pgstat_flush_pending_entries(bool nowait);
+
+static void pgstat_prep_snapshot(void);
+static void pgstat_build_snapshot(void);
+static void pgstat_build_snapshot_fixed(PgStat_Kind kind);
+
+static inline bool pgstat_is_kind_valid(int ikind);
/* ----------
@@ -167,6 +183,7 @@ static void pgstat_recv_subscription_error(PgStat_MsgSubscriptionError *msg, int
*/
bool pgstat_track_counts = false;
+int pgstat_fetch_consistency = PGSTAT_FETCH_CONSISTENCY_NONE;
/* ----------
@@ -184,44 +201,33 @@ char *pgstat_stat_tmpname = NULL;
* ----------
*/
-pgsocket pgStatSock = PGINVALID_SOCKET;
+PgStat_LocalState pgStatLocal;
/* ----------
* Local data
+ *
+ * NB: There should be only variables related to stats infrastructure here,
+ * not for specific kinds of stats.
* ----------
*/
-static struct sockaddr_storage pgStatAddr;
-
-static time_t last_pgstat_start_time;
-
-static bool pgStatRunningInCollector = false;
-
/*
- * Info about current "snapshot" of stats file
+ * Memory contexts containing the pgStatEntryRefHash table, the
+ * pgStatSharedRef entries, and pending data respectively. Mostly to make it
+ * easier to track / attribute memory usage.
*/
-static MemoryContext pgStatLocalContext = NULL;
-static HTAB *pgStatDBHash = NULL;
-/*
- * Cluster wide statistics, kept in the stats collector.
- * Contains statistics that are not collected per database
- * or per table.
- */
-static PgStat_ArchiverStats archiverStats;
-static PgStat_GlobalStats globalStats;
-static PgStat_WalStats walStats;
-static PgStat_SLRUStats slruStats[SLRU_NUM_ELEMENTS];
-static HTAB *replSlotStatHash = NULL;
-static HTAB *subscriptionStatHash = NULL;
+static MemoryContext pgStatPendingContext = NULL;
/*
- * List of OIDs of databases we need to write out. If an entry is InvalidOid,
- * it means to write only the shared-catalog stats ("DB 0"); otherwise, we
- * will write both that DB's data and the shared stats.
+ * Backend local list of PgStat_EntryRef with unflushed pending stats.
+ *
+ * Newly pending entries should only ever be added to the end of the list,
+ * otherwise pgstat_flush_pending_entries() might not see them immediately.
*/
-static List *pending_write_requests = NIL;
+static dlist_head pgStatPending = DLIST_STATIC_INIT(pgStatPending);
+
/*
* For assertions that check pgstat is not used before initialization / after
@@ -233,455 +239,234 @@ static bool pgstat_is_shutdown = false;
#endif
-/* ------------------------------------------------------------
- * Public functions called from postmaster follow
- * ------------------------------------------------------------
- */
-
/*
- * Called from postmaster at startup. Create the resources required
- * by the statistics collector process. If unable to do so, do not
- * fail --- better to let the postmaster start with stats collection
- * disabled.
+ * The different kinds of statistics.
+ *
+ * If reasonably possible, handling specific to one kind of stats should go
+ * through this abstraction, rather than making more of pgstat.c aware.
+ *
+ * See comments for struct PgStat_KindInfo for details about the individual
+ * fields.
+ *
+ * XXX: It'd be nicer to define this outside of this file. But there doesn't
+ * seem to be a great way of doing that, given the split across multiple
+ * files.
*/
-void
-pgstat_init(void)
-{
- socklen_t alen;
- struct addrinfo *addrs = NULL,
- *addr,
- hints;
- int ret;
- fd_set rset;
- struct timeval tv;
- char test_byte;
- int sel_res;
- int tries = 0;
+static const PgStat_KindInfo pgstat_kind_infos[PGSTAT_NUM_KINDS] = {
-#define TESTBYTEVAL ((char) 199)
+ /* stats kinds for variable-numbered objects */
- /*
- * This static assertion verifies that we didn't mess up the calculations
- * involved in selecting maximum payload sizes for our UDP messages.
- * Because the only consequence of overrunning PGSTAT_MAX_MSG_SIZE would
- * be silent performance loss from fragmentation, it seems worth having a
- * compile-time cross-check that we didn't.
- */
- StaticAssertStmt(sizeof(PgStat_Msg) <= PGSTAT_MAX_MSG_SIZE,
- "maximum stats message size exceeds PGSTAT_MAX_MSG_SIZE");
+ [PGSTAT_KIND_DATABASE] = {
+ .name = "database",
- /*
- * Create the UDP socket for sending and receiving statistic messages
- */
- hints.ai_flags = AI_PASSIVE;
- hints.ai_family = AF_UNSPEC;
- hints.ai_socktype = SOCK_DGRAM;
- hints.ai_protocol = 0;
- hints.ai_addrlen = 0;
- hints.ai_addr = NULL;
- hints.ai_canonname = NULL;
- hints.ai_next = NULL;
- ret = pg_getaddrinfo_all("localhost", NULL, &hints, &addrs);
- if (ret || !addrs)
- {
- ereport(LOG,
- (errmsg("could not resolve \"localhost\": %s",
- gai_strerror(ret))));
- goto startup_failed;
- }
+ .fixed_amount = false,
+ /* so pg_stat_database entries can be seen in all databases */
+ .accessed_across_databases = true,
- /*
- * On some platforms, pg_getaddrinfo_all() may return multiple addresses
- * only one of which will actually work (eg, both IPv6 and IPv4 addresses
- * when kernel will reject IPv6). Worse, the failure may occur at the
- * bind() or perhaps even connect() stage. So we must loop through the
- * results till we find a working combination. We will generate LOG
- * messages, but no error, for bogus combinations.
- */
- for (addr = addrs; addr; addr = addr->ai_next)
- {
-#ifdef HAVE_UNIX_SOCKETS
- /* Ignore AF_UNIX sockets, if any are returned. */
- if (addr->ai_family == AF_UNIX)
- continue;
-#endif
+ .shared_size = sizeof(PgStatShared_Database),
+ .shared_data_off = offsetof(PgStatShared_Database, stats),
+ .shared_data_len = sizeof(((PgStatShared_Database *) 0)->stats),
+ .pending_size = sizeof(PgStat_StatDBEntry),
- if (++tries > 1)
- ereport(LOG,
- (errmsg("trying another address for the statistics collector")));
+ .flush_pending_cb = pgstat_database_flush_cb,
+ .reset_timestamp_cb = pgstat_database_reset_timestamp_cb,
+ },
- /*
- * Create the socket.
- */
- if ((pgStatSock = socket(addr->ai_family, SOCK_DGRAM, 0)) == PGINVALID_SOCKET)
- {
- ereport(LOG,
- (errcode_for_socket_access(),
- errmsg("could not create socket for statistics collector: %m")));
- continue;
- }
+ [PGSTAT_KIND_RELATION] = {
+ .name = "relation",
- /*
- * Bind it to a kernel assigned port on localhost and get the assigned
- * port via getsockname().
- */
- if (bind(pgStatSock, addr->ai_addr, addr->ai_addrlen) < 0)
- {
- ereport(LOG,
- (errcode_for_socket_access(),
- errmsg("could not bind socket for statistics collector: %m")));
- closesocket(pgStatSock);
- pgStatSock = PGINVALID_SOCKET;
- continue;
- }
+ .fixed_amount = false,
- alen = sizeof(pgStatAddr);
- if (getsockname(pgStatSock, (struct sockaddr *) &pgStatAddr, &alen) < 0)
- {
- ereport(LOG,
- (errcode_for_socket_access(),
- errmsg("could not get address of socket for statistics collector: %m")));
- closesocket(pgStatSock);
- pgStatSock = PGINVALID_SOCKET;
- continue;
- }
+ .shared_size = sizeof(PgStatShared_Relation),
+ .shared_data_off = offsetof(PgStatShared_Relation, stats),
+ .shared_data_len = sizeof(((PgStatShared_Relation *) 0)->stats),
+ .pending_size = sizeof(PgStat_TableStatus),
- /*
- * Connect the socket to its own address. This saves a few cycles by
- * not having to respecify the target address on every send. This also
- * provides a kernel-level check that only packets from this same
- * address will be received.
- */
- if (connect(pgStatSock, (struct sockaddr *) &pgStatAddr, alen) < 0)
- {
- ereport(LOG,
- (errcode_for_socket_access(),
- errmsg("could not connect socket for statistics collector: %m")));
- closesocket(pgStatSock);
- pgStatSock = PGINVALID_SOCKET;
- continue;
- }
+ .flush_pending_cb = pgstat_relation_flush_cb,
+ .delete_pending_cb = pgstat_relation_delete_pending_cb,
+ },
- /*
- * Try to send and receive a one-byte test message on the socket. This
- * is to catch situations where the socket can be created but will not
- * actually pass data (for instance, because kernel packet filtering
- * rules prevent it).
- */
- test_byte = TESTBYTEVAL;
+ [PGSTAT_KIND_FUNCTION] = {
+ .name = "function",
-retry1:
- if (send(pgStatSock, &test_byte, 1, 0) != 1)
- {
- if (errno == EINTR)
- goto retry1; /* if interrupted, just retry */
- ereport(LOG,
- (errcode_for_socket_access(),
- errmsg("could not send test message on socket for statistics collector: %m")));
- closesocket(pgStatSock);
- pgStatSock = PGINVALID_SOCKET;
- continue;
- }
+ .fixed_amount = false,
- /*
- * There could possibly be a little delay before the message can be
- * received. We arbitrarily allow up to half a second before deciding
- * it's broken.
- */
- for (;;) /* need a loop to handle EINTR */
- {
- FD_ZERO(&rset);
- FD_SET(pgStatSock, &rset);
-
- tv.tv_sec = 0;
- tv.tv_usec = 500000;
- sel_res = select(pgStatSock + 1, &rset, NULL, NULL, &tv);
- if (sel_res >= 0 || errno != EINTR)
- break;
- }
- if (sel_res < 0)
- {
- ereport(LOG,
- (errcode_for_socket_access(),
- errmsg("select() failed in statistics collector: %m")));
- closesocket(pgStatSock);
- pgStatSock = PGINVALID_SOCKET;
- continue;
- }
- if (sel_res == 0 || !FD_ISSET(pgStatSock, &rset))
- {
- /*
- * This is the case we actually think is likely, so take pains to
- * give a specific message for it.
- *
- * errno will not be set meaningfully here, so don't use it.
- */
- ereport(LOG,
- (errcode(ERRCODE_CONNECTION_FAILURE),
- errmsg("test message did not get through on socket for statistics collector")));
- closesocket(pgStatSock);
- pgStatSock = PGINVALID_SOCKET;
- continue;
- }
+ .shared_size = sizeof(PgStatShared_Function),
+ .shared_data_off = offsetof(PgStatShared_Function, stats),
+ .shared_data_len = sizeof(((PgStatShared_Function *) 0)->stats),
+ .pending_size = sizeof(PgStat_BackendFunctionEntry),
- test_byte++; /* just make sure variable is changed */
+ .flush_pending_cb = pgstat_function_flush_cb,
+ },
-retry2:
- if (recv(pgStatSock, &test_byte, 1, 0) != 1)
- {
- if (errno == EINTR)
- goto retry2; /* if interrupted, just retry */
- ereport(LOG,
- (errcode_for_socket_access(),
- errmsg("could not receive test message on socket for statistics collector: %m")));
- closesocket(pgStatSock);
- pgStatSock = PGINVALID_SOCKET;
- continue;
- }
+ [PGSTAT_KIND_REPLSLOT] = {
+ .name = "replslot",
- if (test_byte != TESTBYTEVAL) /* strictly paranoia ... */
- {
- ereport(LOG,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("incorrect test message transmission on socket for statistics collector")));
- closesocket(pgStatSock);
- pgStatSock = PGINVALID_SOCKET;
- continue;
- }
+ .fixed_amount = false,
- /* If we get here, we have a working socket */
- break;
- }
+ .accessed_across_databases = true,
+ .named_on_disk = true,
- /* Did we find a working address? */
- if (!addr || pgStatSock == PGINVALID_SOCKET)
- goto startup_failed;
+ .shared_size = sizeof(PgStatShared_ReplSlot),
+ .shared_data_off = offsetof(PgStatShared_ReplSlot, stats),
+ .shared_data_len = sizeof(((PgStatShared_ReplSlot *) 0)->stats),
- /*
- * Set the socket to non-blocking IO. This ensures that if the collector
- * falls behind, statistics messages will be discarded; backends won't
- * block waiting to send messages to the collector.
- */
- if (!pg_set_noblock(pgStatSock))
- {
- ereport(LOG,
- (errcode_for_socket_access(),
- errmsg("could not set statistics collector socket to nonblocking mode: %m")));
- goto startup_failed;
- }
+ .reset_timestamp_cb = pgstat_replslot_reset_timestamp_cb,
+ .to_serialized_name = pgstat_replslot_to_serialized_name_cb,
+ .from_serialized_name = pgstat_replslot_from_serialized_name_cb,
+ },
- /*
- * Try to ensure that the socket's receive buffer is at least
- * PGSTAT_MIN_RCVBUF bytes, so that it won't easily overflow and lose
- * data. Use of UDP protocol means that we are willing to lose data under
- * heavy load, but we don't want it to happen just because of ridiculously
- * small default buffer sizes (such as 8KB on older Windows versions).
- */
- {
- int old_rcvbuf;
- int new_rcvbuf;
- socklen_t rcvbufsize = sizeof(old_rcvbuf);
+ [PGSTAT_KIND_SUBSCRIPTION] = {
+ .name = "subscription",
- if (getsockopt(pgStatSock, SOL_SOCKET, SO_RCVBUF,
- (char *) &old_rcvbuf, &rcvbufsize) < 0)
- {
- ereport(LOG,
- (errmsg("%s(%s) failed: %m", "getsockopt", "SO_RCVBUF")));
- /* if we can't get existing size, always try to set it */
- old_rcvbuf = 0;
- }
+ .fixed_amount = false,
+ /* so pg_stat_subscription_stats entries can be seen in all databases */
+ .accessed_across_databases = true,
- new_rcvbuf = PGSTAT_MIN_RCVBUF;
- if (old_rcvbuf < new_rcvbuf)
- {
- if (setsockopt(pgStatSock, SOL_SOCKET, SO_RCVBUF,
- (char *) &new_rcvbuf, sizeof(new_rcvbuf)) < 0)
- ereport(LOG,
- (errmsg("%s(%s) failed: %m", "setsockopt", "SO_RCVBUF")));
- }
- }
+ .shared_size = sizeof(PgStatShared_Subscription),
+ .shared_data_off = offsetof(PgStatShared_Subscription, stats),
+ .shared_data_len = sizeof(((PgStatShared_Subscription *) 0)->stats),
+ .pending_size = sizeof(PgStat_BackendSubEntry),
- pg_freeaddrinfo_all(hints.ai_family, addrs);
+ .flush_pending_cb = pgstat_subscription_flush_cb,
+ .reset_timestamp_cb = pgstat_subscription_reset_timestamp_cb,
+ },
- /* Now that we have a long-lived socket, tell fd.c about it. */
- ReserveExternalFD();
- return;
+ /* stats for fixed-numbered (mostly 1) objects */
-startup_failed:
- ereport(LOG,
- (errmsg("disabling statistics collector for lack of working socket")));
+ [PGSTAT_KIND_ARCHIVER] = {
+ .name = "archiver",
- if (addrs)
- pg_freeaddrinfo_all(hints.ai_family, addrs);
+ .fixed_amount = true,
- if (pgStatSock != PGINVALID_SOCKET)
- closesocket(pgStatSock);
- pgStatSock = PGINVALID_SOCKET;
+ .reset_all_cb = pgstat_archiver_reset_all_cb,
+ .snapshot_cb = pgstat_archiver_snapshot_cb,
+ },
- /*
- * Adjust GUC variables to suppress useless activity, and for debugging
- * purposes (seeing track_counts off is a clue that we failed here). We
- * use PGC_S_OVERRIDE because there is no point in trying to turn it back
- * on from postgresql.conf without a restart.
- */
- SetConfigOption("track_counts", "off", PGC_INTERNAL, PGC_S_OVERRIDE);
-}
+ [PGSTAT_KIND_BGWRITER] = {
+ .name = "bgwriter",
-/*
- * subroutine for pgstat_reset_all
- */
-static void
-pgstat_reset_remove_files(const char *directory)
-{
- DIR *dir;
- struct dirent *entry;
- char fname[MAXPGPATH * 2];
+ .fixed_amount = true,
- dir = AllocateDir(directory);
- while ((entry = ReadDir(dir, directory)) != NULL)
- {
- int nchars;
- Oid tmp_oid;
+ .reset_all_cb = pgstat_bgwriter_reset_all_cb,
+ .snapshot_cb = pgstat_bgwriter_snapshot_cb,
+ },
- /*
- * Skip directory entries that don't match the file names we write.
- * See get_dbstat_filename for the database-specific pattern.
- */
- if (strncmp(entry->d_name, "global.", 7) == 0)
- nchars = 7;
- else
- {
- nchars = 0;
- (void) sscanf(entry->d_name, "db_%u.%n",
- &tmp_oid, &nchars);
- if (nchars <= 0)
- continue;
- /* %u allows leading whitespace, so reject that */
- if (strchr("0123456789", entry->d_name[3]) == NULL)
- continue;
- }
+ [PGSTAT_KIND_CHECKPOINTER] = {
+ .name = "checkpointer",
- if (strcmp(entry->d_name + nchars, "tmp") != 0 &&
- strcmp(entry->d_name + nchars, "stat") != 0)
- continue;
+ .fixed_amount = true,
- snprintf(fname, sizeof(fname), "%s/%s", directory,
- entry->d_name);
- unlink(fname);
- }
- FreeDir(dir);
-}
+ .reset_all_cb = pgstat_checkpointer_reset_all_cb,
+ .snapshot_cb = pgstat_checkpointer_snapshot_cb,
+ },
+
+ [PGSTAT_KIND_SLRU] = {
+ .name = "slru",
+
+ .fixed_amount = true,
+
+ .reset_all_cb = pgstat_slru_reset_all_cb,
+ .snapshot_cb = pgstat_slru_snapshot_cb,
+ },
+
+ [PGSTAT_KIND_WAL] = {
+ .name = "wal",
+
+ .fixed_amount = true,
+
+ .reset_all_cb = pgstat_wal_reset_all_cb,
+ .snapshot_cb = pgstat_wal_snapshot_cb,
+ },
+};
+
+
+/* ------------------------------------------------------------
+ * Functions managing the state of the stats system for all backends.
+ * ------------------------------------------------------------
+ */
/*
- * Remove the stats files. This is currently used only if WAL
- * recovery is needed after a crash.
+ * Read on-disk stats into memory at server start.
+ *
+ * Should only be called by the startup process or in single user mode.
*/
void
-pgstat_reset_all(void)
+pgstat_restore_stats(void)
{
- pgstat_reset_remove_files(pgstat_stat_directory);
- pgstat_reset_remove_files(PGSTAT_STAT_PERMANENT_DIRECTORY);
+ pgstat_read_statsfile();
}
-#ifdef EXEC_BACKEND
-
/*
- * Format up the arglist for, then fork and exec, statistics collector process
+ * Remove the stats file. This is currently used only if WAL recovery is
+ * needed after a crash.
+ *
+ * Should only be called by the startup process or in single user mode.
*/
-static pid_t
-pgstat_forkexec(void)
+void
+pgstat_discard_stats(void)
{
- char *av[10];
- int ac = 0;
-
- av[ac++] = "postgres";
- av[ac++] = "--forkcol";
- av[ac++] = NULL; /* filled in by postmaster_forkexec */
+ int ret;
- av[ac] = NULL;
- Assert(ac < lengthof(av));
+ /* NB: this needs to be done even in single user mode */
- return postmaster_forkexec(ac, av);
+ ret = unlink(PGSTAT_STAT_PERMANENT_FILENAME);
+ if (ret != 0)
+ {
+ if (errno == ENOENT)
+ elog(DEBUG2,
+ "didn't need to unlink permanent stats file \"%s\" - didn't exist",
+ PGSTAT_STAT_PERMANENT_FILENAME);
+ else
+ ereport(LOG,
+ (errcode_for_file_access(),
+ errmsg("could not unlink permanent statistics file \"%s\": %m",
+ PGSTAT_STAT_PERMANENT_FILENAME)));
+ }
+ else
+ {
+ ereport(DEBUG2,
+ (errcode_for_file_access(),
+ errmsg("unlinked permanent statistics file \"%s\"",
+ PGSTAT_STAT_PERMANENT_FILENAME)));
+ }
}
-#endif /* EXEC_BACKEND */
-
/*
- * Called from postmaster at startup or after an existing collector
- * died. Attempt to fire up a fresh statistics collector.
- *
- * Returns PID of child process, or 0 if fail.
+ * pgstat_before_server_shutdown() needs to be called by exactly one process
+ * during regular server shutdowns. Otherwise all stats will be lost.
*
- * Note: if fail, we will be called again from the postmaster main loop.
+ * We currently only write out stats for proc_exit(0). We might want to change
+ * that at some point... But right now pgstat_discard_stats() would be called
+ * during the start after a disorderly shutdown, anyway.
*/
-int
-pgstat_start(void)
+void
+pgstat_before_server_shutdown(int code, Datum arg)
{
- time_t curtime;
- pid_t pgStatPid;
+ Assert(pgStatLocal.shmem != NULL);
+ Assert(!pgStatLocal.shmem->is_shutdown);
/*
- * Check that the socket is there, else pgstat_init failed and we can do
- * nothing useful.
+ * Stats should only be reported after pgstat_initialize() and before
+ * pgstat_shutdown(). This is a convenient point to catch most violations
+ * of this rule.
*/
- if (pgStatSock == PGINVALID_SOCKET)
- return 0;
+ Assert(pgstat_is_initialized && !pgstat_is_shutdown);
- /*
- * Do nothing if too soon since last collector start. This is a safety
- * valve to protect against continuous respawn attempts if the collector
- * is dying immediately at launch. Note that since we will be re-called
- * from the postmaster main loop, we will get another chance later.
- */
- curtime = time(NULL);
- if ((unsigned int) (curtime - last_pgstat_start_time) <
- (unsigned int) PGSTAT_RESTART_INTERVAL)
- return 0;
- last_pgstat_start_time = curtime;
+ /* flush out our own pending changes before writing out */
+ pgstat_report_stat(true);
/*
- * Okay, fork off the collector.
+ * Only write out file during normal shutdown. Don't even signal that
+ * we've shutdown during irregular shutdowns, because the shutdown
+ * sequence isn't coordinated to ensure this backend shuts down last.
*/
-#ifdef EXEC_BACKEND
- switch ((pgStatPid = pgstat_forkexec()))
-#else
- switch ((pgStatPid = fork_process()))
-#endif
+ if (code == 0)
{
- case -1:
- ereport(LOG,
- (errmsg("could not fork statistics collector: %m")));
- return 0;
-
-#ifndef EXEC_BACKEND
- case 0:
- /* in postmaster child ... */
- InitPostmasterChild();
-
- /* Close the postmaster's sockets */
- ClosePostmasterPorts(false);
-
- /* Drop our connection to postmaster's shared memory, as well */
- dsm_detach_all();
- PGSharedMemoryDetach();
-
- PgstatCollectorMain(0, NULL);
- break;
-#endif
-
- default:
- return (int) pgStatPid;
+ pgStatLocal.shmem->is_shutdown = true;
+ pgstat_write_statsfile();
}
-
- /* shouldn't get here */
- return 0;
-}
-
-void
-allow_immediate_pgstat_restart(void)
-{
- last_pgstat_start_time = 0;
}
@@ -701,6 +486,7 @@ static void
pgstat_shutdown_hook(int code, Datum arg)
{
Assert(!pgstat_is_shutdown);
+ Assert(IsUnderPostmaster || !IsPostmasterEnvironment);
/*
* If we got as far as discovering our own database ID, we can flush out
@@ -709,7 +495,15 @@ pgstat_shutdown_hook(int code, Datum arg)
* failed backend starts might never get counted.)
*/
if (OidIsValid(MyDatabaseId))
- pgstat_report_stat(true);
+ pgstat_report_disconnect(MyDatabaseId);
+
+ pgstat_report_stat(true);
+
+ /* there shouldn't be any pending changes left */
+ Assert(dlist_is_empty(&pgStatPending));
+ dlist_init(&pgStatPending);
+
+ pgstat_detach_shmem();
#ifdef USE_ASSERT_CHECKING
pgstat_is_shutdown = true;
@@ -727,6 +521,8 @@ pgstat_initialize(void)
{
Assert(!pgstat_is_initialized);
+ pgstat_attach_shmem();
+
pgstat_init_wal();
/* Set up a process-exit hook to clean up */
@@ -745,331 +541,119 @@ pgstat_initialize(void)
/*
* Must be called by processes that performs DML: tcop/postgres.c, logical
- * receiver processes, SPI worker, etc. to send the so far collected
- * per-table and function usage statistics to the collector. Note that this
- * is called only when not within a transaction, so it is fair to use
- * transaction stop time as an approximation of current time.
+ * receiver processes, SPI worker, etc. to flush pending statistics updates to
+ * shared memory.
*
- * "disconnect" is "true" only for the last call before the backend
- * exits. This makes sure that no data is lost and that interrupted
- * sessions are reported correctly.
+ * Unless called with 'force', pending stats updates are flushed at most once
+ * per PGSTAT_MIN_INTERVAL (1000ms). When not forced, stats flushes do not
+ * block on lock acquisition, except if stats updates have been pending for
+ * longer than PGSTAT_MAX_INTERVAL (60000ms).
+ *
+ * Whenever pending stats updates remain at the end of pgstat_report_stat() a
+ * suggested idle timeout is returned. Currently this is always
+ * PGSTAT_IDLE_INTERVAL (10000ms). Callers can use the returned time to set up
+ * a timeout after which to call pgstat_report_stat(true), but are not
+ * required to do so.
+ *
+ * Note that this is called only when not within a transaction, so it is fair
+ * to use transaction stop time as an approximation of current time.
*/
-void
-pgstat_report_stat(bool disconnect)
+long
+pgstat_report_stat(bool force)
{
- static TimestampTz last_report = 0;
-
+ static TimestampTz pending_since = 0;
+ static TimestampTz last_flush = 0;
+ bool partial_flush;
TimestampTz now;
+ bool nowait;
pgstat_assert_is_up();
+ Assert(!IsTransactionBlock());
- /*
- * Don't expend a clock check if nothing to do.
- */
- if (!have_relation_stats &&
- pgStatXactCommit == 0 && pgStatXactRollback == 0 &&
- !pgstat_have_pending_wal() &&
- !have_function_stats && !disconnect)
- return;
-
- /*
- * Don't send a message unless it's been at least PGSTAT_STAT_INTERVAL
- * msec since we last sent one, or the backend is about to exit.
- */
- now = GetCurrentTransactionStopTimestamp();
- if (!disconnect &&
- !TimestampDifferenceExceeds(last_report, now, PGSTAT_STAT_INTERVAL))
- return;
-
- last_report = now;
-
- if (disconnect)
- pgstat_report_disconnect(MyDatabaseId);
-
- /* First, send relation statistics */
- pgstat_send_tabstats(now, disconnect);
-
- /* Now, send function statistics */
- pgstat_send_funcstats();
-
- /* Send WAL statistics */
- pgstat_report_wal(true);
-
- /* Finally send SLRU statistics */
- pgstat_send_slru();
-}
-
-/*
- * Will tell the collector about objects he can get rid of.
- */
-void
-pgstat_vacuum_stat(void)
-{
- HTAB *htab;
- PgStat_MsgTabpurge msg;
- PgStat_MsgFuncpurge f_msg;
- HASH_SEQ_STATUS hstat;
- PgStat_StatDBEntry *dbentry;
- PgStat_StatTabEntry *tabentry;
- PgStat_StatFuncEntry *funcentry;
- int len;
-
- if (pgStatSock == PGINVALID_SOCKET)
- return;
-
- /*
- * If not done for this transaction, read the statistics collector stats
- * file into some hash tables.
- */
- backend_read_statsfile();
-
- /*
- * Read pg_database and make a list of OIDs of all existing databases
- */
- htab = pgstat_collect_oids(DatabaseRelationId, Anum_pg_database_oid);
-
- /*
- * Search the database hash table for dead databases and tell the
- * collector to drop them.
- */
- hash_seq_init(&hstat, pgStatDBHash);
- while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL)
+ /* Don't expend a clock check if nothing to do */
+ if (dlist_is_empty(&pgStatPending) &&
+ !have_slrustats &&
+ !pgstat_have_pending_wal())
{
- Oid dbid = dbentry->databaseid;
-
- CHECK_FOR_INTERRUPTS();
-
- /* the DB entry for shared tables (with InvalidOid) is never dropped */
- if (OidIsValid(dbid) &&
- hash_search(htab, (void *) &dbid, HASH_FIND, NULL) == NULL)
- pgstat_drop_database(dbid);
+ Assert(pending_since == 0);
+ return 0;
}
- /* Clean up */
- hash_destroy(htab);
-
/*
- * Search for all the dead replication slots in stats hashtable and tell
- * the stats collector to drop them.
+ * There should never be stats to report once stats are shut down. Can't
+ * assert that before the checks above, as there is an unconditional
+ * pgstat_report_stat() call in pgstat_shutdown_hook() - which at least
+ * the process that ran pgstat_before_server_shutdown() will still call.
*/
- if (replSlotStatHash)
- {
- PgStat_StatReplSlotEntry *slotentry;
+ Assert(!pgStatLocal.shmem->is_shutdown);
- hash_seq_init(&hstat, replSlotStatHash);
- while ((slotentry = (PgStat_StatReplSlotEntry *) hash_seq_search(&hstat)) != NULL)
- {
- CHECK_FOR_INTERRUPTS();
-
- if (SearchNamedReplicationSlot(NameStr(slotentry->slotname), true) == NULL)
- {
- PgStat_MsgReplSlot msg;
-
- pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT);
- namestrcpy(&msg.m_slotname, NameStr(slotentry->slotname));
- msg.m_create = false;
- msg.m_drop = true;
- pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
- }
- }
- }
+ now = GetCurrentTransactionStopTimestamp();
- /*
- * Repeat the above steps for subscriptions, if subscription stats are
- * being collected.
- */
- if (subscriptionStatHash)
+ if (!force)
{
- PgStat_StatSubEntry *subentry;
-
- /*
- * Read pg_subscription and make a list of OIDs of all existing
- * subscriptions.
- */
- htab = pgstat_collect_oids(SubscriptionRelationId, Anum_pg_subscription_oid);
-
- hash_seq_init(&hstat, subscriptionStatHash);
- while ((subentry = (PgStat_StatSubEntry *) hash_seq_search(&hstat)) != NULL)
+ if (pending_since > 0 &&
+ TimestampDifferenceExceeds(pending_since, now, PGSTAT_MAX_INTERVAL))
{
- CHECK_FOR_INTERRUPTS();
-
- if (hash_search(htab, (void *) &(subentry->subid), HASH_FIND, NULL) == NULL)
- pgstat_drop_subscription(subentry->subid);
+ /* don't keep pending updates longer than PGSTAT_MAX_INTERVAL */
+ force = true;
}
+ else if (last_flush > 0 &&
+ !TimestampDifferenceExceeds(last_flush, now, PGSTAT_MIN_INTERVAL))
+ {
+ /* don't flush too frequently */
+ if (pending_since == 0)
+ pending_since = now;
- hash_destroy(htab);
+ return PGSTAT_IDLE_INTERVAL;
+ }
}
- /*
- * Lookup our own database entry; if not found, nothing more to do.
- */
- dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
- (void *) &MyDatabaseId,
- HASH_FIND, NULL);
- if (dbentry == NULL || dbentry->tables == NULL)
- return;
-
- /*
- * Similarly to above, make a list of all known relations in this DB.
- */
- htab = pgstat_collect_oids(RelationRelationId, Anum_pg_class_oid);
-
- /*
- * Initialize our messages table counter to zero
- */
- msg.m_nentries = 0;
+ pgstat_update_dbstats(now);
- /*
- * Check for all tables listed in stats hashtable if they still exist.
- */
- hash_seq_init(&hstat, dbentry->tables);
- while ((tabentry = (PgStat_StatTabEntry *) hash_seq_search(&hstat)) != NULL)
- {
- Oid tabid = tabentry->tableid;
+ /* don't wait for lock acquisition when !force */
+ nowait = !force;
- CHECK_FOR_INTERRUPTS();
+ partial_flush = false;
- if (hash_search(htab, (void *) &tabid, HASH_FIND, NULL) != NULL)
- continue;
+ /* flush database / relation / function / ... stats */
+ partial_flush |= pgstat_flush_pending_entries(nowait);
- /*
- * Not there, so add this table's Oid to the message
- */
- msg.m_tableid[msg.m_nentries++] = tabid;
+ /* flush wal stats */
+ partial_flush |= pgstat_flush_wal(nowait);
- /*
- * If the message is full, send it out and reinitialize to empty
- */
- if (msg.m_nentries >= PGSTAT_NUM_TABPURGE)
- {
- len = offsetof(PgStat_MsgTabpurge, m_tableid[0])
- + msg.m_nentries * sizeof(Oid);
+ /* flush SLRU stats */
+ partial_flush |= pgstat_slru_flush(nowait);
- pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
- msg.m_databaseid = MyDatabaseId;
- pgstat_send(&msg, len);
-
- msg.m_nentries = 0;
- }
- }
+ last_flush = now;
/*
- * Send the rest
+ * If some of the pending stats could not be flushed due to lock
+ * contention, let the caller know when to retry.
*/
- if (msg.m_nentries > 0)
+ if (partial_flush)
{
- len = offsetof(PgStat_MsgTabpurge, m_tableid[0])
- + msg.m_nentries * sizeof(Oid);
-
- pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
- msg.m_databaseid = MyDatabaseId;
- pgstat_send(&msg, len);
- }
-
- /* Clean up */
- hash_destroy(htab);
-
- /*
- * Now repeat the above steps for functions. However, we needn't bother
- * in the common case where no function stats are being collected.
- */
- if (dbentry->functions != NULL &&
- hash_get_num_entries(dbentry->functions) > 0)
- {
- htab = pgstat_collect_oids(ProcedureRelationId, Anum_pg_proc_oid);
-
- pgstat_setheader(&f_msg.m_hdr, PGSTAT_MTYPE_FUNCPURGE);
- f_msg.m_databaseid = MyDatabaseId;
- f_msg.m_nentries = 0;
-
- hash_seq_init(&hstat, dbentry->functions);
- while ((funcentry = (PgStat_StatFuncEntry *) hash_seq_search(&hstat)) != NULL)
- {
- Oid funcid = funcentry->functionid;
-
- CHECK_FOR_INTERRUPTS();
-
- if (hash_search(htab, (void *) &funcid, HASH_FIND, NULL) != NULL)
- continue;
+ /* force should have prevented us from getting here */
+ Assert(!force);
- /*
- * Not there, so add this function's Oid to the message
- */
- f_msg.m_functionid[f_msg.m_nentries++] = funcid;
+ /* remember since when stats have been pending */
+ if (pending_since == 0)
+ pending_since = now;
- /*
- * If the message is full, send it out and reinitialize to empty
- */
- if (f_msg.m_nentries >= PGSTAT_NUM_FUNCPURGE)
- {
- len = offsetof(PgStat_MsgFuncpurge, m_functionid[0])
- + f_msg.m_nentries * sizeof(Oid);
-
- pgstat_send(&f_msg, len);
-
- f_msg.m_nentries = 0;
- }
- }
-
- /*
- * Send the rest
- */
- if (f_msg.m_nentries > 0)
- {
- len = offsetof(PgStat_MsgFuncpurge, m_functionid[0])
- + f_msg.m_nentries * sizeof(Oid);
+ return PGSTAT_IDLE_INTERVAL;
+ }
- pgstat_send(&f_msg, len);
- }
+ pending_since = 0;
- hash_destroy(htab);
- }
+ return 0;
}
/*
- * Collect the OIDs of all objects listed in the specified system catalog
- * into a temporary hash table. Caller should hash_destroy the result
- * when done with it. (However, we make the table in CurrentMemoryContext
- * so that it will be freed properly in event of an error.)
+ * Only for use by pgstat_reset_counters()
*/
-static HTAB *
-pgstat_collect_oids(Oid catalogid, AttrNumber anum_oid)
+static bool
+match_db_entries(PgStatShared_HashEntry *entry, Datum match_data)
{
- HTAB *htab;
- HASHCTL hash_ctl;
- Relation rel;
- TableScanDesc scan;
- HeapTuple tup;
- Snapshot snapshot;
-
- hash_ctl.keysize = sizeof(Oid);
- hash_ctl.entrysize = sizeof(Oid);
- hash_ctl.hcxt = CurrentMemoryContext;
- htab = hash_create("Temporary table of OIDs",
- PGSTAT_TAB_HASH_SIZE,
- &hash_ctl,
- HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
-
- rel = table_open(catalogid, AccessShareLock);
- snapshot = RegisterSnapshot(GetLatestSnapshot());
- scan = table_beginscan(rel, snapshot, 0, NULL);
- while ((tup = heap_getnext(scan, ForwardScanDirection)) != NULL)
- {
- Oid thisoid;
- bool isnull;
-
- thisoid = heap_getattr(tup, anum_oid, RelationGetDescr(rel), &isnull);
- Assert(!isnull);
-
- CHECK_FOR_INTERRUPTS();
-
- (void) hash_search(htab, (void *) &thisoid, HASH_ENTER, NULL);
- }
- table_endscan(scan);
- UnregisterSnapshot(snapshot);
- table_close(rel, AccessShareLock);
-
- return htab;
+ return entry->key.dboid == DatumGetObjectId(MyDatabaseId);
}
/*
@@ -1081,14 +665,11 @@ pgstat_collect_oids(Oid catalogid, AttrNumber anum_oid)
void
pgstat_reset_counters(void)
{
- PgStat_MsgResetcounter msg;
-
- if (pgStatSock == PGINVALID_SOCKET)
- return;
+ TimestampTz ts = GetCurrentTimestamp();
- pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETCOUNTER);
- msg.m_databaseid = MyDatabaseId;
- pgstat_send(&msg, sizeof(msg));
+ pgstat_reset_matching_entries(match_db_entries,
+ ObjectIdGetDatum(MyDatabaseId),
+ ts);
}
/*
@@ -1103,38 +684,17 @@ pgstat_reset_counters(void)
void
pgstat_reset(PgStat_Kind kind, Oid dboid, Oid objoid)
{
+ const PgStat_KindInfo *kind_info = pgstat_get_kind_info(kind);
+ TimestampTz ts = GetCurrentTimestamp();
- if (pgStatSock == PGINVALID_SOCKET)
- return;
+ /* not needed atm, and doesn't make sense with the current signature */
+ Assert(!pgstat_get_kind_info(kind)->fixed_amount);
- switch (kind)
- {
- case PGSTAT_KIND_FUNCTION:
- case PGSTAT_KIND_RELATION:
- {
- PgStat_MsgResetsinglecounter msg;
-
- pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETSINGLECOUNTER);
- msg.m_databaseid = dboid;
- msg.m_resettype = kind;
- msg.m_objectid = objoid;
- pgstat_send(&msg, sizeof(msg));
- }
- break;
-
- case PGSTAT_KIND_SUBSCRIPTION:
- {
- PgStat_MsgResetsubcounter msg;
-
- Assert(dboid == InvalidOid);
- msg.m_subid = objoid;
- pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETSUBCOUNTER);
- }
- break;
-
- default:
- elog(ERROR, "unexpected");
- }
+ /* reset the "single counter" */
+ pgstat_reset_entry(kind, dboid, objoid, ts);
+
+ if (!kind_info->accessed_across_databases)
+ pgstat_reset_database_timestamp(dboid, ts);
}
/*
@@ -1146,87 +706,20 @@ pgstat_reset(PgStat_Kind kind, Oid dboid, Oid objoid)
void
pgstat_reset_of_kind(PgStat_Kind kind)
{
- if (pgStatSock == PGINVALID_SOCKET)
- return;
+ const PgStat_KindInfo *kind_info = pgstat_get_kind_info(kind);
+ TimestampTz ts = GetCurrentTimestamp();
- switch (kind)
- {
- case PGSTAT_KIND_ARCHIVER:
- case PGSTAT_KIND_BGWRITER:
- case PGSTAT_KIND_CHECKPOINTER:
- case PGSTAT_KIND_WAL:
- {
- PgStat_MsgResetsharedcounter msg;
-
- pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETSHAREDCOUNTER);
- msg.m_resettarget = kind;
- pgstat_send(&msg, sizeof(msg));
- }
- break;
- case PGSTAT_KIND_SLRU:
- {
- PgStat_MsgResetslrucounter msg;
-
- pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETSLRUCOUNTER);
- msg.m_index = -1;
- pgstat_send(&msg, sizeof(msg));
- }
- break;
- case PGSTAT_KIND_REPLSLOT:
- {
- PgStat_MsgResetreplslotcounter msg;
-
- pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETREPLSLOTCOUNTER);
- msg.clearall = true;
- pgstat_send(&msg, sizeof(msg));
- }
- break;
-
- case PGSTAT_KIND_SUBSCRIPTION:
- {
- PgStat_MsgResetsubcounter msg;
-
- msg.m_subid = InvalidOid;
- pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETSUBCOUNTER);
-
- pgstat_send(&msg, sizeof(msg));
- }
- break;
-
- default:
- elog(ERROR, "unexpected");
- }
+ if (kind_info->fixed_amount)
+ kind_info->reset_all_cb(ts);
+ else
+ pgstat_reset_entries_of_kind(kind, ts);
}
-/*
- * Send some junk data to the collector to increase traffic.
- */
-void
-pgstat_ping(void)
-{
- PgStat_MsgDummy msg;
- if (pgStatSock == PGINVALID_SOCKET)
- return;
-
- pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_DUMMY);
- pgstat_send(&msg, sizeof(msg));
-}
-
-/*
- * Notify collector that we need fresh data.
+/* ------------------------------------------------------------
+ * Fetching of stats
+ * ------------------------------------------------------------
*/
-static void
-pgstat_send_inquiry(TimestampTz clock_time, TimestampTz cutoff_time, Oid databaseid)
-{
- PgStat_MsgInquiry msg;
-
- pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_INQUIRY);
- msg.clock_time = clock_time;
- msg.cutoff_time = cutoff_time;
- msg.databaseid = databaseid;
- pgstat_send(&msg, sizeof(msg));
-}
/*
* Discard any data collected in the current transaction. Any subsequent
@@ -1240,15 +733,19 @@ pgstat_clear_snapshot(void)
{
pgstat_assert_is_up();
+ memset(&pgStatLocal.snapshot.fixed_valid, 0,
+ sizeof(pgStatLocal.snapshot.fixed_valid));
+ pgStatLocal.snapshot.stats = NULL;
+ pgStatLocal.snapshot.mode = PGSTAT_FETCH_CONSISTENCY_NONE;
+
/* Release memory, if any was allocated */
- if (pgStatLocalContext)
- MemoryContextDelete(pgStatLocalContext);
+ if (pgStatLocal.snapshot.context)
+ {
+ MemoryContextDelete(pgStatLocal.snapshot.context);
- /* Reset variables */
- pgStatLocalContext = NULL;
- pgStatDBHash = NULL;
- replSlotStatHash = NULL;
- subscriptionStatHash = NULL;
+ /* Reset variables */
+ pgStatLocal.snapshot.context = NULL;
+ }
/*
* Historically the backend_status.c facilities lived in this file, and
@@ -1258,844 +755,448 @@ pgstat_clear_snapshot(void)
pgstat_clear_backend_activity_snapshot();
}
-/*
- * Support function for the SQL-callable pgstat* functions. Returns
- * the collected statistics for one database or NULL. NULL doesn't mean
- * that the database doesn't exist, just that there are no statistics, so the
- * caller is better off to report ZERO instead.
- */
-PgStat_StatDBEntry *
-pgstat_fetch_stat_dbentry(Oid dbid)
+void *
+pgstat_fetch_entry(PgStat_Kind kind, Oid dboid, Oid objoid)
{
- /*
- * If not done for this transaction, read the statistics collector stats
- * file into some hash tables.
- */
- backend_read_statsfile();
+ PgStat_HashKey key;
+ PgStat_EntryRef *entry_ref;
+ void *stats_data;
+ const PgStat_KindInfo *kind_info = pgstat_get_kind_info(kind);
- /*
- * Lookup the requested database; return NULL if not found
- */
- return (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
- (void *) &dbid,
- HASH_FIND, NULL);
-}
+ /* should be called from backends */
+ Assert(IsUnderPostmaster || !IsPostmasterEnvironment);
+ AssertArg(!kind_info->fixed_amount);
-/*
- * Support function for the SQL-callable pgstat* functions. Returns
- * a pointer to the global statistics struct.
- */
-PgStat_GlobalStats *
-pgstat_fetch_global(void)
-{
- backend_read_statsfile();
+ pgstat_prep_snapshot();
- return &globalStats;
-}
+ key.kind = kind;
+ key.dboid = dboid;
+ key.objoid = objoid;
-/*
- * Support function for the SQL-callable pgstat* functions. Returns
- * the collected statistics for one table or NULL. NULL doesn't mean
- * that the table doesn't exist, just that there are no statistics, so the
- * caller is better off to report ZERO instead.
- */
-PgStat_StatTabEntry *
-pgstat_fetch_stat_tabentry(Oid relid)
-{
- Oid dbid;
- PgStat_StatDBEntry *dbentry;
- PgStat_StatTabEntry *tabentry;
+ /* if we need to build a full snapshot, do so */
+ if (pgstat_fetch_consistency == PGSTAT_FETCH_CONSISTENCY_SNAPSHOT)
+ pgstat_build_snapshot();
- /*
- * If not done for this transaction, read the statistics collector stats
- * file into some hash tables.
- */
- backend_read_statsfile();
-
- /*
- * Lookup our database, then look in its table hash table.
- */
- dbid = MyDatabaseId;
- dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
- (void *) &dbid,
- HASH_FIND, NULL);
- if (dbentry != NULL && dbentry->tables != NULL)
+ /* if caching is desired, look up in cache */
+ if (pgstat_fetch_consistency > PGSTAT_FETCH_CONSISTENCY_NONE)
{
- tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
- (void *) &relid,
- HASH_FIND, NULL);
- if (tabentry)
- return tabentry;
- }
+ PgStat_SnapshotEntry *entry = NULL;
- /*
- * If we didn't find it, maybe it's a shared table.
- */
- dbid = InvalidOid;
- dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
- (void *) &dbid,
- HASH_FIND, NULL);
- if (dbentry != NULL && dbentry->tables != NULL)
- {
- tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
- (void *) &relid,
- HASH_FIND, NULL);
- if (tabentry)
- return tabentry;
- }
+ entry = pgstat_snapshot_lookup(pgStatLocal.snapshot.stats, key);
- return NULL;
-}
+ if (entry)
+ return entry->data;
+ /*
+ * If we built a full snapshot and the key is not in
+ * pgStatLocal.snapshot.stats, there are no matching stats.
+ */
+ if (pgstat_fetch_consistency == PGSTAT_FETCH_CONSISTENCY_SNAPSHOT)
+ return NULL;
+ }
-/*
- * Support function for the SQL-callable pgstat* functions. Returns
- * the collected statistics for one function or NULL.
- */
-PgStat_StatFuncEntry *
-pgstat_fetch_stat_funcentry(Oid func_id)
-{
- PgStat_StatDBEntry *dbentry;
- PgStat_StatFuncEntry *funcentry = NULL;
+ pgStatLocal.snapshot.mode = pgstat_fetch_consistency;
- /* load the stats file if needed */
- backend_read_statsfile();
+ entry_ref = pgstat_get_entry_ref(kind, dboid, objoid, false, NULL);
- /* Lookup our database, then find the requested function. */
- dbentry = pgstat_fetch_stat_dbentry(MyDatabaseId);
- if (dbentry != NULL && dbentry->functions != NULL)
+ if (entry_ref == NULL || entry_ref->shared_entry->dropped)
{
- funcentry = (PgStat_StatFuncEntry *) hash_search(dbentry->functions,
- (void *) &func_id,
- HASH_FIND, NULL);
- }
-
- return funcentry;
-}
-
-/*
- * Support function for the SQL-callable pgstat* functions. Returns
- * a pointer to the archiver statistics struct.
- */
-PgStat_ArchiverStats *
-pgstat_fetch_stat_archiver(void)
-{
- backend_read_statsfile();
+ /* create empty entry when using PGSTAT_FETCH_CONSISTENCY_CACHE */
+ if (pgstat_fetch_consistency == PGSTAT_FETCH_CONSISTENCY_CACHE)
+ {
+ PgStat_SnapshotEntry *entry = NULL;
+ bool found;
- return &archiverStats;
-}
+ entry = pgstat_snapshot_insert(pgStatLocal.snapshot.stats, key, &found);
+ Assert(!found);
+ entry->data = NULL;
+ }
+ return NULL;
+ }
-/*
- * Support function for the SQL-callable pgstat* functions. Returns
- * a pointer to the bgwriter statistics struct.
- */
-PgStat_BgWriterStats *
-pgstat_fetch_stat_bgwriter(void)
-{
- backend_read_statsfile();
+ /*
+ * Allocate in caller's context for PGSTAT_FETCH_CONSISTENCY_NONE,
+ * otherwise we could quickly end up with a fair bit of memory used due to
+ * repeated accesses.
+ */
+ if (pgstat_fetch_consistency == PGSTAT_FETCH_CONSISTENCY_NONE)
+ stats_data = palloc(kind_info->shared_data_len);
+ else
+ stats_data = MemoryContextAlloc(pgStatLocal.snapshot.context,
+ kind_info->shared_data_len);
+ memcpy(stats_data,
+ pgstat_get_entry_data(kind, entry_ref->shared_stats),
+ kind_info->shared_data_len);
- return &globalStats.bgwriter;
-}
+ if (pgstat_fetch_consistency > PGSTAT_FETCH_CONSISTENCY_NONE)
+ {
+ PgStat_SnapshotEntry *entry = NULL;
+ bool found;
-/*
- * Support function for the SQL-callable pgstat* functions. Returns
- * a pointer to the checkpointer statistics struct.
- */
-PgStat_CheckpointerStats *
-pgstat_fetch_stat_checkpointer(void)
-{
- backend_read_statsfile();
+ entry = pgstat_snapshot_insert(pgStatLocal.snapshot.stats, key, &found);
+ entry->data = stats_data;
+ }
- return &globalStats.checkpointer;
+ return stats_data;
}
/*
- * Support function for the SQL-callable pgstat* functions. Returns
- * a pointer to the WAL statistics struct.
+ * If a stats snapshot has been taken, return the timestamp at which that was
+ * done, and set *have_snapshot to true. Otherwise *have_snapshot is set to
+ * false.
*/
-PgStat_WalStats *
-pgstat_fetch_stat_wal(void)
+TimestampTz
+pgstat_get_stat_snapshot_timestamp(bool *have_snapshot)
{
- backend_read_statsfile();
+ if (pgStatLocal.snapshot.mode == PGSTAT_FETCH_CONSISTENCY_SNAPSHOT)
+ {
+ *have_snapshot = true;
+ return pgStatLocal.snapshot.snapshot_timestamp;
+ }
- return &walStats;
-}
+ *have_snapshot = false;
-/*
- * Support function for the SQL-callable pgstat* functions. Returns
- * a pointer to the slru statistics struct.
- */
-PgStat_SLRUStats *
-pgstat_fetch_slru(void)
-{
- backend_read_statsfile();
-
- return slruStats;
+ return 0;
}
/*
- * Support function for the SQL-callable pgstat* functions. Returns
- * a pointer to the replication slot statistics struct.
+ * Ensure snapshot for fixed-numbered 'kind' exists.
+ *
+ * Typically used by the pgstat_fetch_* functions for a kind of stats, before
+ * massaging the data into the desired format.
*/
-PgStat_StatReplSlotEntry *
-pgstat_fetch_replslot(NameData slotname)
+void
+pgstat_snapshot_fixed(PgStat_Kind kind)
{
- backend_read_statsfile();
-
- return pgstat_get_replslot_entry(slotname, false);
-}
+ AssertArg(pgstat_is_kind_valid(kind));
+ AssertArg(pgstat_get_kind_info(kind)->fixed_amount);
-/*
- * Support function for the SQL-callable pgstat* functions. Returns
- * the collected statistics for one subscription or NULL.
- */
-PgStat_StatSubEntry *
-pgstat_fetch_stat_subscription(Oid subid)
-{
- /* Load the stats file if needed */
- backend_read_statsfile();
+ if (pgstat_fetch_consistency == PGSTAT_FETCH_CONSISTENCY_SNAPSHOT)
+ pgstat_build_snapshot();
+ else
+ pgstat_build_snapshot_fixed(kind);
- return pgstat_get_subscription_entry(subid, false);
+ Assert(pgStatLocal.snapshot.fixed_valid[kind]);
}
-
-/* ------------------------------------------------------------
- * Helper / infrastructure functions
- * ------------------------------------------------------------
- */
-
-/*
- * Create pgStatLocalContext, if not already done.
- */
static void
-pgstat_setup_memcxt(void)
+pgstat_prep_snapshot(void)
{
- if (!pgStatLocalContext)
- pgStatLocalContext = AllocSetContextCreate(TopMemoryContext,
- "Statistics snapshot",
- ALLOCSET_SMALL_SIZES);
-}
+ if (pgstat_fetch_consistency == PGSTAT_FETCH_CONSISTENCY_NONE ||
+ pgStatLocal.snapshot.stats != NULL)
+ return;
-/*
- * Stats should only be reported after pgstat_initialize() and before
- * pgstat_shutdown(). This check is put in a few central places to catch
- * violations of this rule more easily.
- */
-#ifdef USE_ASSERT_CHECKING
-void
-pgstat_assert_is_up(void)
-{
- Assert(pgstat_is_initialized && !pgstat_is_shutdown);
-}
-#endif
+ if (!pgStatLocal.snapshot.context)
+ pgStatLocal.snapshot.context = AllocSetContextCreate(TopMemoryContext,
+ "PgStat Snapshot",
+ ALLOCSET_SMALL_SIZES);
-/*
- * Set common header fields in a statistics message
- */
-void
-pgstat_setheader(PgStat_MsgHdr *hdr, StatMsgType mtype)
-{
- hdr->m_type = mtype;
+ pgStatLocal.snapshot.stats =
+ pgstat_snapshot_create(pgStatLocal.snapshot.context,
+ PGSTAT_SNAPSHOT_HASH_SIZE,
+ NULL);
}
-
-/*
- * Send out one statistics message to the collector
- */
-void
-pgstat_send(void *msg, int len)
+static void
+pgstat_build_snapshot(void)
{
- int rc;
+ dshash_seq_status hstat;
+ PgStatShared_HashEntry *p;
- pgstat_assert_is_up();
+ /* should only be called when we need a snapshot */
+ Assert(pgstat_fetch_consistency == PGSTAT_FETCH_CONSISTENCY_SNAPSHOT);
- if (pgStatSock == PGINVALID_SOCKET)
+ /* snapshot already built */
+ if (pgStatLocal.snapshot.mode == PGSTAT_FETCH_CONSISTENCY_SNAPSHOT)
return;
- ((PgStat_MsgHdr *) msg)->m_size = len;
+ pgstat_prep_snapshot();
- /* We'll retry after EINTR, but ignore all other failures */
- do
- {
- rc = send(pgStatSock, msg, len, 0);
- } while (rc < 0 && errno == EINTR);
+ Assert(pgStatLocal.snapshot.stats->members == 0);
-#ifdef USE_ASSERT_CHECKING
- /* In debug builds, log send failures ... */
- if (rc < 0)
- elog(LOG, "could not send to statistics collector: %m");
-#endif
-}
-
-/*
- * Start up the statistics collector process. This is the body of the
- * postmaster child process.
- *
- * The argc/argv parameters are valid only in EXEC_BACKEND case.
- */
-NON_EXEC_STATIC void
-PgstatCollectorMain(int argc, char *argv[])
-{
- int len;
- PgStat_Msg msg;
- int wr;
- WaitEvent event;
- WaitEventSet *wes;
-
- /*
- * Ignore all signals usually bound to some action in the postmaster,
- * except SIGHUP and SIGQUIT. Note we don't need a SIGUSR1 handler to
- * support latch operations, because we only use a local latch.
- */
- pqsignal(SIGHUP, SignalHandlerForConfigReload);
- pqsignal(SIGINT, SIG_IGN);
- pqsignal(SIGTERM, SIG_IGN);
- pqsignal(SIGQUIT, SignalHandlerForShutdownRequest);
- pqsignal(SIGALRM, SIG_IGN);
- pqsignal(SIGPIPE, SIG_IGN);
- pqsignal(SIGUSR1, SIG_IGN);
- pqsignal(SIGUSR2, SIG_IGN);
- /* Reset some signals that are accepted by postmaster but not here */
- pqsignal(SIGCHLD, SIG_DFL);
- PG_SETMASK(&UnBlockSig);
-
- MyBackendType = B_STATS_COLLECTOR;
- init_ps_display(NULL);
-
- /*
- * Read in existing stats files or initialize the stats to zero.
- */
- pgStatRunningInCollector = true;
- pgStatDBHash = pgstat_read_statsfiles(InvalidOid, true, true);
-
- /* Prepare to wait for our latch or data in our socket. */
- wes = CreateWaitEventSet(CurrentMemoryContext, 3);
- AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL);
- AddWaitEventToSet(wes, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, NULL);
- AddWaitEventToSet(wes, WL_SOCKET_READABLE, pgStatSock, NULL, NULL);
+ pgStatLocal.snapshot.snapshot_timestamp = GetCurrentTimestamp();
/*
- * Loop to process messages until we get SIGQUIT or detect ungraceful
- * death of our parent postmaster.
- *
- * For performance reasons, we don't want to do ResetLatch/WaitLatch after
- * every message; instead, do that only after a recv() fails to obtain a
- * message. (This effectively means that if backends are sending us stuff
- * like mad, we won't notice postmaster death until things slack off a
- * bit; which seems fine.) To do that, we have an inner loop that
- * iterates as long as recv() succeeds. We do check ConfigReloadPending
- * inside the inner loop, which means that such interrupts will get
- * serviced but the latch won't get cleared until next time there is a
- * break in the action.
+ * Snapshot all variable stats.
*/
- for (;;)
+ dshash_seq_init(&hstat, pgStatLocal.shared_hash, false);
+ while ((p = dshash_seq_next(&hstat)) != NULL)
{
- /* Clear any already-pending wakeups */
- ResetLatch(MyLatch);
-
- /*
- * Quit if we get SIGQUIT from the postmaster.
- */
- if (ShutdownRequestPending)
- break;
+ PgStat_Kind kind = p->key.kind;
+ const PgStat_KindInfo *kind_info = pgstat_get_kind_info(kind);
+ bool found;
+ PgStat_SnapshotEntry *entry;
+ PgStatShared_Common *stats_data;
/*
- * Inner loop iterates as long as we keep getting messages, or until
- * ShutdownRequestPending becomes set.
+ * Check if the stats object should be included in the snapshot.
+ * Unless the stats kind can be accessed from all databases (e.g.,
+ * database stats themselves), we only include stats for the current
+ * database or objects not associated with a database (e.g. shared
+ * relations).
*/
- while (!ShutdownRequestPending)
- {
- /*
- * Reload configuration if we got SIGHUP from the postmaster.
- */
- if (ConfigReloadPending)
- {
- ConfigReloadPending = false;
- ProcessConfigFile(PGC_SIGHUP);
- }
-
- /*
- * Write the stats file(s) if a new request has arrived that is
- * not satisfied by existing file(s).
- */
- if (pgstat_write_statsfile_needed())
- pgstat_write_statsfiles(false, false);
-
- /*
- * Try to receive and process a message. This will not block,
- * since the socket is set to non-blocking mode.
- *
- * XXX On Windows, we have to force pgwin32_recv to cooperate,
- * despite the previous use of pg_set_noblock() on the socket.
- * This is extremely broken and should be fixed someday.
- */
-#ifdef WIN32
- pgwin32_noblock = 1;
-#endif
-
- len = recv(pgStatSock, (char *) &msg,
- sizeof(PgStat_Msg), 0);
-
-#ifdef WIN32
- pgwin32_noblock = 0;
-#endif
-
- if (len < 0)
- {
- if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
- break; /* out of inner loop */
- ereport(ERROR,
- (errcode_for_socket_access(),
- errmsg("could not read statistics message: %m")));
- }
-
- /*
- * We ignore messages that are smaller than our common header
- */
- if (len < sizeof(PgStat_MsgHdr))
- continue;
-
- /*
- * The received length must match the length in the header
- */
- if (msg.msg_hdr.m_size != len)
- continue;
-
- /*
- * O.K. - we accept this message. Process it.
- */
- switch (msg.msg_hdr.m_type)
- {
- case PGSTAT_MTYPE_DUMMY:
- break;
-
- case PGSTAT_MTYPE_INQUIRY:
- pgstat_recv_inquiry(&msg.msg_inquiry, len);
- break;
-
- case PGSTAT_MTYPE_TABSTAT:
- pgstat_recv_tabstat(&msg.msg_tabstat, len);
- break;
-
- case PGSTAT_MTYPE_TABPURGE:
- pgstat_recv_tabpurge(&msg.msg_tabpurge, len);
- break;
-
- case PGSTAT_MTYPE_DROPDB:
- pgstat_recv_dropdb(&msg.msg_dropdb, len);
- break;
-
- case PGSTAT_MTYPE_RESETCOUNTER:
- pgstat_recv_resetcounter(&msg.msg_resetcounter, len);
- break;
-
- case PGSTAT_MTYPE_RESETSHAREDCOUNTER:
- pgstat_recv_resetsharedcounter(&msg.msg_resetsharedcounter,
- len);
- break;
-
- case PGSTAT_MTYPE_RESETSINGLECOUNTER:
- pgstat_recv_resetsinglecounter(&msg.msg_resetsinglecounter,
- len);
- break;
-
- case PGSTAT_MTYPE_RESETSLRUCOUNTER:
- pgstat_recv_resetslrucounter(&msg.msg_resetslrucounter,
- len);
- break;
-
- case PGSTAT_MTYPE_RESETREPLSLOTCOUNTER:
- pgstat_recv_resetreplslotcounter(&msg.msg_resetreplslotcounter,
- len);
- break;
-
- case PGSTAT_MTYPE_RESETSUBCOUNTER:
- pgstat_recv_resetsubcounter(&msg.msg_resetsubcounter, len);
- break;
-
- case PGSTAT_MTYPE_AUTOVAC_START:
- pgstat_recv_autovac(&msg.msg_autovacuum_start, len);
- break;
+ if (p->key.dboid != MyDatabaseId &&
+ p->key.dboid != InvalidOid &&
+ !kind_info->accessed_across_databases)
+ continue;
- case PGSTAT_MTYPE_VACUUM:
- pgstat_recv_vacuum(&msg.msg_vacuum, len);
- break;
+ if (p->dropped)
+ continue;
- case PGSTAT_MTYPE_ANALYZE:
- pgstat_recv_analyze(&msg.msg_analyze, len);
- break;
+ Assert(pg_atomic_read_u32(&p->refcount) > 0);
- case PGSTAT_MTYPE_ARCHIVER:
- pgstat_recv_archiver(&msg.msg_archiver, len);
- break;
+ stats_data = dsa_get_address(pgStatLocal.dsa, p->body);
+ Assert(stats_data);
- case PGSTAT_MTYPE_BGWRITER:
- pgstat_recv_bgwriter(&msg.msg_bgwriter, len);
- break;
+ entry = pgstat_snapshot_insert(pgStatLocal.snapshot.stats, p->key, &found);
+ Assert(!found);
- case PGSTAT_MTYPE_CHECKPOINTER:
- pgstat_recv_checkpointer(&msg.msg_checkpointer, len);
- break;
+ entry->data = MemoryContextAlloc(pgStatLocal.snapshot.context,
+ kind_info->shared_size);
+ memcpy(entry->data,
+ pgstat_get_entry_data(kind, stats_data),
+ kind_info->shared_size);
+ }
+ dshash_seq_term(&hstat);
- case PGSTAT_MTYPE_WAL:
- pgstat_recv_wal(&msg.msg_wal, len);
- break;
+ /*
+ * Build snapshot of all fixed-numbered stats.
+ */
+ for (int kind = PGSTAT_KIND_FIRST_VALID; kind <= PGSTAT_KIND_LAST; kind++)
+ {
+ const PgStat_KindInfo *kind_info = pgstat_get_kind_info(kind);
- case PGSTAT_MTYPE_SLRU:
- pgstat_recv_slru(&msg.msg_slru, len);
- break;
+ if (!kind_info->fixed_amount)
+ {
+ Assert(kind_info->snapshot_cb == NULL);
+ continue;
+ }
- case PGSTAT_MTYPE_FUNCSTAT:
- pgstat_recv_funcstat(&msg.msg_funcstat, len);
- break;
+ pgstat_build_snapshot_fixed(kind);
+ }
- case PGSTAT_MTYPE_FUNCPURGE:
- pgstat_recv_funcpurge(&msg.msg_funcpurge, len);
- break;
+ pgStatLocal.snapshot.mode = PGSTAT_FETCH_CONSISTENCY_SNAPSHOT;
+}
- case PGSTAT_MTYPE_RECOVERYCONFLICT:
- pgstat_recv_recoveryconflict(&msg.msg_recoveryconflict,
- len);
- break;
+static void
+pgstat_build_snapshot_fixed(PgStat_Kind kind)
+{
+ const PgStat_KindInfo *kind_info = pgstat_get_kind_info(kind);
- case PGSTAT_MTYPE_DEADLOCK:
- pgstat_recv_deadlock(&msg.msg_deadlock, len);
- break;
+ Assert(kind_info->fixed_amount);
+ Assert(kind_info->snapshot_cb != NULL);
- case PGSTAT_MTYPE_TEMPFILE:
- pgstat_recv_tempfile(&msg.msg_tempfile, len);
- break;
+ if (pgstat_fetch_consistency == PGSTAT_FETCH_CONSISTENCY_NONE)
+ {
+ /* rebuild every time */
+ pgStatLocal.snapshot.fixed_valid[kind] = false;
+ }
+ else if (pgStatLocal.snapshot.fixed_valid[kind])
+ {
+ /* in snapshot mode we shouldn't get called again */
+ Assert(pgstat_fetch_consistency == PGSTAT_FETCH_CONSISTENCY_CACHE);
+ return;
+ }
- case PGSTAT_MTYPE_CHECKSUMFAILURE:
- pgstat_recv_checksum_failure(&msg.msg_checksumfailure,
- len);
- break;
+ Assert(!pgStatLocal.snapshot.fixed_valid[kind]);
- case PGSTAT_MTYPE_REPLSLOT:
- pgstat_recv_replslot(&msg.msg_replslot, len);
- break;
+ kind_info->snapshot_cb();
- case PGSTAT_MTYPE_CONNECT:
- pgstat_recv_connect(&msg.msg_connect, len);
- break;
+ Assert(!pgStatLocal.snapshot.fixed_valid[kind]);
+ pgStatLocal.snapshot.fixed_valid[kind] = true;
+}
- case PGSTAT_MTYPE_DISCONNECT:
- pgstat_recv_disconnect(&msg.msg_disconnect, len);
- break;
- case PGSTAT_MTYPE_SUBSCRIPTIONDROP:
- pgstat_recv_subscription_drop(&msg.msg_subscriptiondrop, len);
- break;
+/* ------------------------------------------------------------
+ * Backend-local pending stats infrastructure
+ * ------------------------------------------------------------
+ */
- case PGSTAT_MTYPE_SUBSCRIPTIONERROR:
- pgstat_recv_subscription_error(&msg.msg_subscriptionerror, len);
- break;
+/*
+ * Returns the appropriate PgStat_EntryRef, preparing it to receive pending
+ * stats if not already done.
+ *
+ * If created_entry is non-NULL, it'll be set to true if the entry is newly
+ * created, false otherwise.
+ */
+PgStat_EntryRef *
+pgstat_prep_pending_entry(PgStat_Kind kind, Oid dboid, Oid objoid, bool *created_entry)
+{
+ PgStat_EntryRef *entry_ref;
- default:
- break;
- }
- } /* end of inner message-processing loop */
+ /* need to be able to flush out */
+ Assert(pgstat_get_kind_info(kind)->flush_pending_cb != NULL);
- /* Sleep until there's something to do */
-#ifndef WIN32
- wr = WaitEventSetWait(wes, -1L, &event, 1, WAIT_EVENT_PGSTAT_MAIN);
-#else
+ if (unlikely(!pgStatPendingContext))
+ {
+ pgStatPendingContext =
+ AllocSetContextCreate(CacheMemoryContext,
+ "PgStat Pending",
+ ALLOCSET_SMALL_SIZES);
+ }
- /*
- * Windows, at least in its Windows Server 2003 R2 incarnation,
- * sometimes loses FD_READ events. Waking up and retrying the recv()
- * fixes that, so don't sleep indefinitely. This is a crock of the
- * first water, but until somebody wants to debug exactly what's
- * happening there, this is the best we can do. The two-second
- * timeout matches our pre-9.2 behavior, and needs to be short enough
- * to not provoke "using stale statistics" complaints from
- * backend_read_statsfile.
- */
- wr = WaitEventSetWait(wes, 2 * 1000L /* msec */ , &event, 1,
- WAIT_EVENT_PGSTAT_MAIN);
-#endif
+ entry_ref = pgstat_get_entry_ref(kind, dboid, objoid,
+ true, created_entry);
- /*
- * Emergency bailout if postmaster has died. This is to avoid the
- * necessity for manual cleanup of all postmaster children.
- */
- if (wr == 1 && event.events == WL_POSTMASTER_DEATH)
- break;
- } /* end of outer loop */
+ if (entry_ref->pending == NULL)
+ {
+ size_t entrysize = pgstat_get_kind_info(kind)->pending_size;
- /*
- * Save the final stats to reuse at next startup.
- */
- pgstat_write_statsfiles(true, true);
+ Assert(entrysize != (size_t) -1);
- FreeWaitEventSet(wes);
+ entry_ref->pending = MemoryContextAllocZero(pgStatPendingContext, entrysize);
+ dlist_push_tail(&pgStatPending, &entry_ref->pending_node);
+ }
- exit(0);
+ return entry_ref;
}
/*
- * Subroutine to clear stats in a database entry
+ * Return an existing stats entry, or NULL.
*
- * Tables and functions hashes are initialized to empty.
+ * This should only be used as a helper function for pgstatfuncs.c - outside of
+ * that it shouldn't be needed.
*/
-static void
-reset_dbentry_counters(PgStat_StatDBEntry *dbentry)
+PgStat_EntryRef *
+pgstat_fetch_pending_entry(PgStat_Kind kind, Oid dboid, Oid objoid)
{
- HASHCTL hash_ctl;
-
- dbentry->n_xact_commit = 0;
- dbentry->n_xact_rollback = 0;
- dbentry->n_blocks_fetched = 0;
- dbentry->n_blocks_hit = 0;
- dbentry->n_tuples_returned = 0;
- dbentry->n_tuples_fetched = 0;
- dbentry->n_tuples_inserted = 0;
- dbentry->n_tuples_updated = 0;
- dbentry->n_tuples_deleted = 0;
- dbentry->last_autovac_time = 0;
- dbentry->n_conflict_tablespace = 0;
- dbentry->n_conflict_lock = 0;
- dbentry->n_conflict_snapshot = 0;
- dbentry->n_conflict_bufferpin = 0;
- dbentry->n_conflict_startup_deadlock = 0;
- dbentry->n_temp_files = 0;
- dbentry->n_temp_bytes = 0;
- dbentry->n_deadlocks = 0;
- dbentry->n_checksum_failures = 0;
- dbentry->last_checksum_failure = 0;
- dbentry->n_block_read_time = 0;
- dbentry->n_block_write_time = 0;
- dbentry->n_sessions = 0;
- dbentry->total_session_time = 0;
- dbentry->total_active_time = 0;
- dbentry->total_idle_in_xact_time = 0;
- dbentry->n_sessions_abandoned = 0;
- dbentry->n_sessions_fatal = 0;
- dbentry->n_sessions_killed = 0;
-
- dbentry->stat_reset_timestamp = GetCurrentTimestamp();
- dbentry->stats_timestamp = 0;
-
- hash_ctl.keysize = sizeof(Oid);
- hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
- dbentry->tables = hash_create("Per-database table",
- PGSTAT_TAB_HASH_SIZE,
- &hash_ctl,
- HASH_ELEM | HASH_BLOBS);
-
- hash_ctl.keysize = sizeof(Oid);
- hash_ctl.entrysize = sizeof(PgStat_StatFuncEntry);
- dbentry->functions = hash_create("Per-database function",
- PGSTAT_FUNCTION_HASH_SIZE,
- &hash_ctl,
- HASH_ELEM | HASH_BLOBS);
-}
+ PgStat_EntryRef *entry_ref;
-/*
- * Lookup the hash table entry for the specified database. If no hash
- * table entry exists, initialize it, if the create parameter is true.
- * Else, return NULL.
- */
-static PgStat_StatDBEntry *
-pgstat_get_db_entry(Oid databaseid, bool create)
-{
- PgStat_StatDBEntry *result;
- bool found;
- HASHACTION action = (create ? HASH_ENTER : HASH_FIND);
+ entry_ref = pgstat_get_entry_ref(kind, dboid, objoid, false, NULL);
- /* Lookup or create the hash table entry for this database */
- result = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
- &databaseid,
- action, &found);
-
- if (!create && !found)
+ if (entry_ref == NULL || entry_ref->pending == NULL)
return NULL;
- /*
- * If not found, initialize the new one. This creates empty hash tables
- * for tables and functions, too.
- */
- if (!found)
- reset_dbentry_counters(result);
-
- return result;
+ return entry_ref;
}
-/*
- * Lookup the hash table entry for the specified table. If no hash
- * table entry exists, initialize it, if the create parameter is true.
- * Else, return NULL.
- */
-static PgStat_StatTabEntry *
-pgstat_get_tab_entry(PgStat_StatDBEntry *dbentry, Oid tableoid, bool create)
+void
+pgstat_delete_pending_entry(PgStat_EntryRef *entry_ref)
{
- PgStat_StatTabEntry *result;
- bool found;
- HASHACTION action = (create ? HASH_ENTER : HASH_FIND);
+ PgStat_Kind kind = entry_ref->shared_entry->key.kind;
+ const PgStat_KindInfo *kind_info = pgstat_get_kind_info(kind);
+ void *pending_data = entry_ref->pending;
- /* Lookup or create the hash table entry for this table */
- result = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
- &tableoid,
- action, &found);
+ Assert(pending_data != NULL);
+ /* !fixed_amount stats should be handled explicitly */
+ Assert(!pgstat_get_kind_info(kind)->fixed_amount);
- if (!create && !found)
- return NULL;
+ if (kind_info->delete_pending_cb)
+ kind_info->delete_pending_cb(entry_ref);
- /* If not found, initialize the new one. */
- if (!found)
- {
- result->numscans = 0;
- result->tuples_returned = 0;
- result->tuples_fetched = 0;
- result->tuples_inserted = 0;
- result->tuples_updated = 0;
- result->tuples_deleted = 0;
- result->tuples_hot_updated = 0;
- result->n_live_tuples = 0;
- result->n_dead_tuples = 0;
- result->changes_since_analyze = 0;
- result->inserts_since_vacuum = 0;
- result->blocks_fetched = 0;
- result->blocks_hit = 0;
- result->vacuum_timestamp = 0;
- result->vacuum_count = 0;
- result->autovac_vacuum_timestamp = 0;
- result->autovac_vacuum_count = 0;
- result->analyze_timestamp = 0;
- result->analyze_count = 0;
- result->autovac_analyze_timestamp = 0;
- result->autovac_analyze_count = 0;
- }
+ pfree(pending_data);
+ entry_ref->pending = NULL;
- return result;
+ dlist_delete(&entry_ref->pending_node);
}
/*
- * Return the entry of replication slot stats with the given name. Return
- * NULL if not found and the caller didn't request to create it.
- *
- * create tells whether to create the new slot entry if it is not found.
+ * Flush out pending stats for database objects (databases, relations,
+ * functions).
*/
-static PgStat_StatReplSlotEntry *
-pgstat_get_replslot_entry(NameData name, bool create)
+static bool
+pgstat_flush_pending_entries(bool nowait)
{
- PgStat_StatReplSlotEntry *slotent;
- bool found;
+ bool have_pending = false;
+ dlist_node *cur = NULL;
+
+ /*
+ * Need to be a bit careful iterating over the list of pending entries.
+ * Processing a pending entry may queue further pending entries to the end
+ * of the list that we want to process, so a simple iteration won't do.
+ * Further complicating matters is that we want to delete the current
+ * entry in each iteration from the list if we flushed successfully.
+ *
+ * So we just keep track of the next pointer in each loop iteration.
+ */
+ if (!dlist_is_empty(&pgStatPending))
+ cur = dlist_head_node(&pgStatPending);
- if (replSlotStatHash == NULL)
+ while (cur)
{
- HASHCTL hash_ctl;
+ PgStat_EntryRef *entry_ref =
+ dlist_container(PgStat_EntryRef, pending_node, cur);
+ PgStat_HashKey key = entry_ref->shared_entry->key;
+ PgStat_Kind kind = key.kind;
+ const PgStat_KindInfo *kind_info = pgstat_get_kind_info(kind);
+ bool did_flush;
+ dlist_node *next;
- /*
- * Quick return NULL if the hash table is empty and the caller didn't
- * request to create the entry.
- */
- if (!create)
- return NULL;
+ Assert(!kind_info->fixed_amount);
+ Assert(kind_info->flush_pending_cb != NULL);
- hash_ctl.keysize = sizeof(NameData);
- hash_ctl.entrysize = sizeof(PgStat_StatReplSlotEntry);
- replSlotStatHash = hash_create("Replication slots hash",
- PGSTAT_REPLSLOT_HASH_SIZE,
- &hash_ctl,
- HASH_ELEM | HASH_BLOBS);
- }
+ /* flush the stats, if possible */
+ did_flush = kind_info->flush_pending_cb(entry_ref, nowait);
- slotent = (PgStat_StatReplSlotEntry *) hash_search(replSlotStatHash,
- (void *) &name,
- create ? HASH_ENTER : HASH_FIND,
- &found);
+ Assert(did_flush || nowait);
- if (!slotent)
- {
- /* not found */
- Assert(!create && !found);
- return NULL;
- }
+ /* determine next entry, before deleting the pending entry */
+ if (dlist_has_next(&pgStatPending, cur))
+ next = dlist_next_node(&pgStatPending, cur);
+ else
+ next = NULL;
- /* initialize the entry */
- if (create && !found)
- {
- namestrcpy(&(slotent->slotname), NameStr(name));
- pgstat_reset_replslot_entry(slotent, 0);
+ /* if successfully flushed, remove entry */
+ if (did_flush)
+ pgstat_delete_pending_entry(entry_ref);
+ else
+ have_pending = true;
+
+ cur = next;
}
- return slotent;
-}
+ Assert(dlist_is_empty(&pgStatPending) == !have_pending);
-/*
- * Reset the given replication slot stats.
- */
-static void
-pgstat_reset_replslot_entry(PgStat_StatReplSlotEntry *slotent, TimestampTz ts)
-{
- /* reset only counters. Don't clear slot name */
- slotent->spill_txns = 0;
- slotent->spill_count = 0;
- slotent->spill_bytes = 0;
- slotent->stream_txns = 0;
- slotent->stream_count = 0;
- slotent->stream_bytes = 0;
- slotent->total_txns = 0;
- slotent->total_bytes = 0;
- slotent->stat_reset_timestamp = ts;
+ return have_pending;
}
-/*
- * Return the subscription statistics entry with the given subscription OID.
- * If no subscription entry exists, initialize it, if the create parameter is
- * true. Else, return NULL.
+
+/* ------------------------------------------------------------
+ * Helper / infrastructure functions
+ * ------------------------------------------------------------
*/
-static PgStat_StatSubEntry *
-pgstat_get_subscription_entry(Oid subid, bool create)
-{
- PgStat_StatSubEntry *subentry;
- bool found;
- HASHACTION action = (create ? HASH_ENTER : HASH_FIND);
- if (subscriptionStatHash == NULL)
+PgStat_Kind
+pgstat_get_kind_from_str(char *kind_str)
+{
+ for (int kind = PGSTAT_KIND_FIRST_VALID; kind <= PGSTAT_KIND_LAST; kind++)
{
- HASHCTL hash_ctl;
-
- /*
- * Quick return NULL if the hash table is empty and the caller didn't
- * request to create the entry.
- */
- if (!create)
- return NULL;
-
- hash_ctl.keysize = sizeof(Oid);
- hash_ctl.entrysize = sizeof(PgStat_StatSubEntry);
- subscriptionStatHash = hash_create("Subscription hash",
- PGSTAT_SUBSCRIPTION_HASH_SIZE,
- &hash_ctl,
- HASH_ELEM | HASH_BLOBS);
+ if (pg_strcasecmp(kind_str, pgstat_kind_infos[kind].name) == 0)
+ return kind;
}
- subentry = (PgStat_StatSubEntry *) hash_search(subscriptionStatHash,
- (void *) &subid,
- action, &found);
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("invalid statistics kind: \"%s\"", kind_str)));
+ return PGSTAT_KIND_DATABASE; /* avoid compiler warnings */
+}
- if (!create && !found)
- return NULL;
+static inline bool
+pgstat_is_kind_valid(int ikind)
+{
+ return ikind >= PGSTAT_KIND_FIRST_VALID && ikind <= PGSTAT_KIND_LAST;
+}
- /* If not found, initialize the new one */
- if (!found)
- pgstat_reset_subscription(subentry, 0);
+const PgStat_KindInfo *
+pgstat_get_kind_info(PgStat_Kind kind)
+{
+ AssertArg(pgstat_is_kind_valid(kind));
- return subentry;
+ return &pgstat_kind_infos[kind];
}
/*
- * Reset the given subscription stats.
+ * Stats should only be reported after pgstat_initialize() and before
+ * pgstat_shutdown(). This check is put in a few central places to catch
+ * violations of this rule more easily.
*/
-static void
-pgstat_reset_subscription(PgStat_StatSubEntry *subentry, TimestampTz ts)
+#ifdef USE_ASSERT_CHECKING
+void
+pgstat_assert_is_up(void)
{
- subentry->apply_error_count = 0;
- subentry->sync_error_count = 0;
- subentry->stat_reset_timestamp = ts;
+ Assert(pgstat_is_initialized && !pgstat_is_shutdown);
}
+#endif
/* ------------------------------------------------------------
@@ -2103,28 +1204,38 @@ pgstat_reset_subscription(PgStat_StatSubEntry *subentry, TimestampTz ts)
* ------------------------------------------------------------
*/
+/* helpers for pgstat_write_statsfile() */
+static void
+write_chunk(FILE *fpout, void *ptr, size_t len)
+{
+ int rc;
+
+ rc = fwrite(ptr, len, 1, fpout);
+
+ /* we'll check for errors with ferror once at the end */
+ (void) rc;
+}
+
+#define write_chunk_s(fpout, ptr) write_chunk(fpout, ptr, sizeof(*ptr))
+
/*
- * Write the global statistics file, as well as requested DB files.
- *
- * 'permanent' specifies writing to the permanent files not temporary ones.
- * When true (happens only when the collector is shutting down), also remove
- * the temporary files so that backends starting up under a new postmaster
- * can't read old data before the new collector is ready.
- *
- * When 'allDbs' is false, only the requested databases (listed in
- * pending_write_requests) will be written; otherwise, all databases
- * will be written.
+ * This function is called in the last process that is accessing the shared
+ * stats so locking is not required.
*/
static void
-pgstat_write_statsfiles(bool permanent, bool allDbs)
+pgstat_write_statsfile(void)
{
- HASH_SEQ_STATUS hstat;
- PgStat_StatDBEntry *dbentry;
FILE *fpout;
int32 format_id;
- const char *tmpfile = permanent ? PGSTAT_STAT_PERMANENT_TMPFILE : pgstat_stat_tmpname;
- const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
- int rc;
+ const char *tmpfile = PGSTAT_STAT_PERMANENT_TMPFILE;
+ const char *statfile = PGSTAT_STAT_PERMANENT_FILENAME;
+ dshash_seq_status hstat;
+ PgStatShared_HashEntry *ps;
+
+ pgstat_assert_is_up();
+
+ /* we're shutting down, so it's ok to just override this */
+ pgstat_fetch_consistency = PGSTAT_FETCH_CONSISTENCY_NONE;
elog(DEBUG2, "writing stats file \"%s\"", statfile);
@@ -2142,237 +1253,99 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
}
/*
- * Set the timestamp of the stats file.
- */
- globalStats.stats_timestamp = GetCurrentTimestamp();
-
- /*
* Write the file header --- currently just a format ID.
*/
format_id = PGSTAT_FILE_FORMAT_ID;
- rc = fwrite(&format_id, sizeof(format_id), 1, fpout);
- (void) rc; /* we'll check for error with ferror */
+ write_chunk_s(fpout, &format_id);
/*
- * Write global stats struct
+ * XXX: The following could now be generalized to just iterate over
+ * pgstat_kind_infos instead of knowing about the different kinds of
+ * stats.
*/
- rc = fwrite(&globalStats, sizeof(globalStats), 1, fpout);
- (void) rc; /* we'll check for error with ferror */
/*
* Write archiver stats struct
*/
- rc = fwrite(&archiverStats, sizeof(archiverStats), 1, fpout);
- (void) rc; /* we'll check for error with ferror */
+ pgstat_build_snapshot_fixed(PGSTAT_KIND_ARCHIVER);
+ write_chunk_s(fpout, &pgStatLocal.snapshot.archiver);
/*
- * Write WAL stats struct
+ * Write bgwriter stats struct
*/
- rc = fwrite(&walStats, sizeof(walStats), 1, fpout);
- (void) rc; /* we'll check for error with ferror */
+ pgstat_build_snapshot_fixed(PGSTAT_KIND_BGWRITER);
+ write_chunk_s(fpout, &pgStatLocal.snapshot.bgwriter);
/*
- * Write SLRU stats struct
+ * Write checkpointer stats struct
*/
- rc = fwrite(slruStats, sizeof(slruStats), 1, fpout);
- (void) rc; /* we'll check for error with ferror */
+ pgstat_build_snapshot_fixed(PGSTAT_KIND_CHECKPOINTER);
+ write_chunk_s(fpout, &pgStatLocal.snapshot.checkpointer);
/*
- * Walk through the database table.
+ * Write SLRU stats struct
*/
- hash_seq_init(&hstat, pgStatDBHash);
- while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL)
- {
- /*
- * Write out the table and function stats for this DB into the
- * appropriate per-DB stat file, if required.
- */
- if (allDbs || pgstat_db_requested(dbentry->databaseid))
- {
- /* Make DB's timestamp consistent with the global stats */
- dbentry->stats_timestamp = globalStats.stats_timestamp;
-
- pgstat_write_db_statsfile(dbentry, permanent);
- }
-
- /*
- * Write out the DB entry. We don't write the tables or functions
- * pointers, since they're of no use to any other process.
- */
- fputc('D', fpout);
- rc = fwrite(dbentry, offsetof(PgStat_StatDBEntry, tables), 1, fpout);
- (void) rc; /* we'll check for error with ferror */
- }
+ pgstat_build_snapshot_fixed(PGSTAT_KIND_SLRU);
+ write_chunk_s(fpout, &pgStatLocal.snapshot.slru);
/*
- * Write replication slot stats struct
+ * Write WAL stats struct
*/
- if (replSlotStatHash)
- {
- PgStat_StatReplSlotEntry *slotent;
-
- hash_seq_init(&hstat, replSlotStatHash);
- while ((slotent = (PgStat_StatReplSlotEntry *) hash_seq_search(&hstat)) != NULL)
- {
- fputc('R', fpout);
- rc = fwrite(slotent, sizeof(PgStat_StatReplSlotEntry), 1, fpout);
- (void) rc; /* we'll check for error with ferror */
- }
- }
+ pgstat_build_snapshot_fixed(PGSTAT_KIND_WAL);
+ write_chunk_s(fpout, &pgStatLocal.snapshot.wal);
/*
- * Write subscription stats struct
+ * Walk through the stats entries
*/
- if (subscriptionStatHash)
+ dshash_seq_init(&hstat, pgStatLocal.shared_hash, false);
+ while ((ps = dshash_seq_next(&hstat)) != NULL)
{
- PgStat_StatSubEntry *subentry;
-
- hash_seq_init(&hstat, subscriptionStatHash);
- while ((subentry = (PgStat_StatSubEntry *) hash_seq_search(&hstat)) != NULL)
- {
- fputc('S', fpout);
- rc = fwrite(subentry, sizeof(PgStat_StatSubEntry), 1, fpout);
- (void) rc; /* we'll check for error with ferror */
- }
- }
+ PgStatShared_Common *shstats;
+ const PgStat_KindInfo *kind_info = NULL;
- /*
- * No more output to be done. Close the temp file and replace the old
- * pgstat.stat with it. The ferror() check replaces testing for error
- * after each individual fputc or fwrite above.
- */
- fputc('E', fpout);
-
- if (ferror(fpout))
- {
- ereport(LOG,
- (errcode_for_file_access(),
- errmsg("could not write temporary statistics file \"%s\": %m",
- tmpfile)));
- FreeFile(fpout);
- unlink(tmpfile);
- }
- else if (FreeFile(fpout) < 0)
- {
- ereport(LOG,
- (errcode_for_file_access(),
- errmsg("could not close temporary statistics file \"%s\": %m",
- tmpfile)));
- unlink(tmpfile);
- }
- else if (rename(tmpfile, statfile) < 0)
- {
- ereport(LOG,
- (errcode_for_file_access(),
- errmsg("could not rename temporary statistics file \"%s\" to \"%s\": %m",
- tmpfile, statfile)));
- unlink(tmpfile);
- }
-
- if (permanent)
- unlink(pgstat_stat_filename);
+ CHECK_FOR_INTERRUPTS();
- /*
- * Now throw away the list of requests. Note that requests sent after we
- * started the write are still waiting on the network socket.
- */
- list_free(pending_write_requests);
- pending_write_requests = NIL;
-}
+ /* we may have some "dropped" entries not yet removed, skip them */
+ Assert(!ps->dropped);
+ if (ps->dropped)
+ continue;
-/*
- * return the filename for a DB stat file; filename is the output buffer,
- * of length len.
- */
-static void
-get_dbstat_filename(bool permanent, bool tempname, Oid databaseid,
- char *filename, int len)
-{
- int printed;
-
- /* NB -- pgstat_reset_remove_files knows about the pattern this uses */
- printed = snprintf(filename, len, "%s/db_%u.%s",
- permanent ? PGSTAT_STAT_PERMANENT_DIRECTORY :
- pgstat_stat_directory,
- databaseid,
- tempname ? "tmp" : "stat");
- if (printed >= len)
- elog(ERROR, "overlength pgstat path");
-}
+ shstats = (PgStatShared_Common *) dsa_get_address(pgStatLocal.dsa, ps->body);
-/*
- * Write the stat file for a single database.
- *
- * If writing to the permanent file (happens when the collector is
- * shutting down only), remove the temporary file so that backends
- * starting up under a new postmaster can't read the old data before
- * the new collector is ready.
- */
-static void
-pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent)
-{
- HASH_SEQ_STATUS tstat;
- HASH_SEQ_STATUS fstat;
- PgStat_StatTabEntry *tabentry;
- PgStat_StatFuncEntry *funcentry;
- FILE *fpout;
- int32 format_id;
- Oid dbid = dbentry->databaseid;
- int rc;
- char tmpfile[MAXPGPATH];
- char statfile[MAXPGPATH];
+ kind_info = pgstat_get_kind_info(ps->key.kind);
- get_dbstat_filename(permanent, true, dbid, tmpfile, MAXPGPATH);
- get_dbstat_filename(permanent, false, dbid, statfile, MAXPGPATH);
+ /* if not dropped the valid-entry refcount should exist */
+ Assert(pg_atomic_read_u32(&ps->refcount) > 0);
- elog(DEBUG2, "writing stats file \"%s\"", statfile);
-
- /*
- * Open the statistics temp file to write out the current values.
- */
- fpout = AllocateFile(tmpfile, PG_BINARY_W);
- if (fpout == NULL)
- {
- ereport(LOG,
- (errcode_for_file_access(),
- errmsg("could not open temporary statistics file \"%s\": %m",
- tmpfile)));
- return;
- }
+ if (!kind_info->to_serialized_name)
+ {
+ /* normal stats entry, identified by PgStat_HashKey */
+ fputc('S', fpout);
+ write_chunk_s(fpout, &ps->key);
+ }
+ else
+ {
+ /* stats entry identified by name on disk (e.g. slots) */
+ NameData name;
- /*
- * Write the file header --- currently just a format ID.
- */
- format_id = PGSTAT_FILE_FORMAT_ID;
- rc = fwrite(&format_id, sizeof(format_id), 1, fpout);
- (void) rc; /* we'll check for error with ferror */
+ kind_info->to_serialized_name(shstats, &name);
- /*
- * Walk through the database's access stats per table.
- */
- hash_seq_init(&tstat, dbentry->tables);
- while ((tabentry = (PgStat_StatTabEntry *) hash_seq_search(&tstat)) != NULL)
- {
- fputc('T', fpout);
- rc = fwrite(tabentry, sizeof(PgStat_StatTabEntry), 1, fpout);
- (void) rc; /* we'll check for error with ferror */
- }
+ fputc('N', fpout);
+ write_chunk_s(fpout, &ps->key.kind);
+ write_chunk_s(fpout, &name);
+ }
- /*
- * Walk through the database's function stats table.
- */
- hash_seq_init(&fstat, dbentry->functions);
- while ((funcentry = (PgStat_StatFuncEntry *) hash_seq_search(&fstat)) != NULL)
- {
- fputc('F', fpout);
- rc = fwrite(funcentry, sizeof(PgStat_StatFuncEntry), 1, fpout);
- (void) rc; /* we'll check for error with ferror */
+ /* Write the entry's data, except for its header part */
+ write_chunk(fpout,
+ pgstat_get_entry_data(ps->key.kind, shstats),
+ pgstat_get_entry_len(ps->key.kind));
}
+ dshash_seq_term(&hstat);
/*
* No more output to be done. Close the temp file and replace the old
* pgstat.stat with it. The ferror() check replaces testing for error
- * after each individual fputc or fwrite above.
+ * after each individual fputc or fwrite (in write_chunk()) above.
*/
fputc('E', fpout);
@@ -2401,1806 +1374,230 @@ pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent)
tmpfile, statfile)));
unlink(tmpfile);
}
+}
- if (permanent)
- {
- get_dbstat_filename(false, false, dbid, statfile, MAXPGPATH);
-
- elog(DEBUG2, "removing temporary stats file \"%s\"", statfile);
- unlink(statfile);
- }
+/* helpers for pgstat_read_statsfile() */
+static bool
+read_chunk(FILE *fpin, void *ptr, size_t len)
+{
+ return fread(ptr, 1, len, fpin) == len;
}
+#define read_chunk_s(fpin, ptr) read_chunk(fpin, ptr, sizeof(*ptr))
+
/*
- * Reads in some existing statistics collector files and returns the
- * databases hash table that is the top level of the data.
- *
- * If 'onlydb' is not InvalidOid, it means we only want data for that DB
- * plus the shared catalogs ("DB 0"). We'll still populate the DB hash
- * table for all databases, but we don't bother even creating table/function
- * hash tables for other databases.
+ * Reads in existing statistics file into the shared stats hash.
*
- * 'permanent' specifies reading from the permanent files not temporary ones.
- * When true (happens only when the collector is starting up), remove the
- * files after reading; the in-memory status is now authoritative, and the
- * files would be out of date in case somebody else reads them.
- *
- * If a 'deep' read is requested, table/function stats are read, otherwise
- * the table/function hash tables remain empty.
+ * This function is called in the only process that is accessing the shared
+ * stats so locking is not required.
*/
-static HTAB *
-pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
+static void
+pgstat_read_statsfile(void)
{
- PgStat_StatDBEntry *dbentry;
- PgStat_StatDBEntry dbbuf;
- HASHCTL hash_ctl;
- HTAB *dbhash;
FILE *fpin;
int32 format_id;
bool found;
- const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
- int i;
- TimestampTz ts;
-
- /*
- * The tables will live in pgStatLocalContext.
- */
- pgstat_setup_memcxt();
-
- /*
- * Create the DB hashtable
- */
- hash_ctl.keysize = sizeof(Oid);
- hash_ctl.entrysize = sizeof(PgStat_StatDBEntry);
- hash_ctl.hcxt = pgStatLocalContext;
- dbhash = hash_create("Databases hash", PGSTAT_DB_HASH_SIZE, &hash_ctl,
- HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+ const char *statfile = PGSTAT_STAT_PERMANENT_FILENAME;
+ PgStat_ShmemControl *shmem = pgStatLocal.shmem;
+ TimestampTz ts = GetCurrentTimestamp();
- /*
- * Clear out global, archiver, WAL and SLRU statistics so they start from
- * zero in case we can't load an existing statsfile.
- */
- memset(&globalStats, 0, sizeof(globalStats));
- memset(&archiverStats, 0, sizeof(archiverStats));
- memset(&walStats, 0, sizeof(walStats));
- memset(&slruStats, 0, sizeof(slruStats));
+ /* shouldn't be called from postmaster */
+ Assert(IsUnderPostmaster || !IsPostmasterEnvironment);
- /*
- * Set the current timestamp (will be kept only in case we can't load an
- * existing statsfile).
- */
- ts = GetCurrentTimestamp();
- globalStats.bgwriter.stat_reset_timestamp = ts;
- archiverStats.stat_reset_timestamp = ts;
- walStats.stat_reset_timestamp = ts;
-
- /*
- * Set the same reset timestamp for all SLRU items too.
- */
- for (i = 0; i < SLRU_NUM_ELEMENTS; i++)
- slruStats[i].stat_reset_timestamp = ts;
+ elog(DEBUG2, "reading stats file \"%s\"", statfile);
/*
* Try to open the stats file. If it doesn't exist, the backends simply
- * return zero for anything and the collector simply starts from scratch
+ * return zero for anything and statistics simply start from scratch
* with empty counters.
*
- * ENOENT is a possibility if the stats collector is not running or has
- * not yet written the stats file the first time. Any other failure
- * condition is suspicious.
+ * ENOENT is a possibility if stats collection was previously disabled or
+ * the stats file has not yet been written for the first time. Any other
+ * failure condition is suspicious.
*/
if ((fpin = AllocateFile(statfile, PG_BINARY_R)) == NULL)
{
if (errno != ENOENT)
- ereport(pgStatRunningInCollector ? LOG : WARNING,
+ ereport(LOG,
(errcode_for_file_access(),
errmsg("could not open statistics file \"%s\": %m",
statfile)));
- return dbhash;
+ pgstat_reset_after_failure(ts);
+ return;
}
/*
* Verify it's of the expected format.
*/
- if (fread(&format_id, 1, sizeof(format_id), fpin) != sizeof(format_id) ||
+ if (!read_chunk_s(fpin, &format_id) ||
format_id != PGSTAT_FILE_FORMAT_ID)
- {
- ereport(pgStatRunningInCollector ? LOG : WARNING,
- (errmsg("corrupted statistics file \"%s\"", statfile)));
- goto done;
- }
+ goto error;
/*
- * Read global stats struct
+ * XXX: The following could now be generalized to just iterate over
+ * pgstat_kind_infos instead of knowing about the different kinds of
+ * stats.
*/
- if (fread(&globalStats, 1, sizeof(globalStats), fpin) != sizeof(globalStats))
- {
- ereport(pgStatRunningInCollector ? LOG : WARNING,
- (errmsg("corrupted statistics file \"%s\"", statfile)));
- memset(&globalStats, 0, sizeof(globalStats));
- goto done;
- }
/*
- * In the collector, disregard the timestamp we read from the permanent
- * stats file; we should be willing to write a temp stats file immediately
- * upon the first request from any backend. This only matters if the old
- * file's timestamp is less than PGSTAT_STAT_INTERVAL ago, but that's not
- * an unusual scenario.
+ * Read archiver stats struct
*/
- if (pgStatRunningInCollector)
- globalStats.stats_timestamp = 0;
+ if (!read_chunk_s(fpin, &shmem->archiver.stats))
+ goto error;
/*
- * Read archiver stats struct
+ * Read bgwriter stats struct
*/
- if (fread(&archiverStats, 1, sizeof(archiverStats), fpin) != sizeof(archiverStats))
- {
- ereport(pgStatRunningInCollector ? LOG : WARNING,
- (errmsg("corrupted statistics file \"%s\"", statfile)));
- memset(&archiverStats, 0, sizeof(archiverStats));
- goto done;
- }
+ if (!read_chunk_s(fpin, &shmem->bgwriter.stats))
+ goto error;
/*
- * Read WAL stats struct
+ * Read checkpointer stats struct
*/
- if (fread(&walStats, 1, sizeof(walStats), fpin) != sizeof(walStats))
- {
- ereport(pgStatRunningInCollector ? LOG : WARNING,
- (errmsg("corrupted statistics file \"%s\"", statfile)));
- memset(&walStats, 0, sizeof(walStats));
- goto done;
- }
+ if (!read_chunk_s(fpin, &shmem->checkpointer.stats))
+ goto error;
/*
* Read SLRU stats struct
*/
- if (fread(slruStats, 1, sizeof(slruStats), fpin) != sizeof(slruStats))
- {
- ereport(pgStatRunningInCollector ? LOG : WARNING,
- (errmsg("corrupted statistics file \"%s\"", statfile)));
- memset(&slruStats, 0, sizeof(slruStats));
- goto done;
- }
+ if (!read_chunk_s(fpin, &shmem->slru.stats))
+ goto error;
+
+ /*
+ * Read WAL stats struct
+ */
+ if (!read_chunk_s(fpin, &shmem->wal.stats))
+ goto error;
/*
- * We found an existing collector stats file. Read it and put all the
- * hashtable entries into place.
+ * We found an existing statistics file. Read it and put all the hash
+ * table entries into place.
*/
for (;;)
{
- switch (fgetc(fpin))
- {
- /*
- * 'D' A PgStat_StatDBEntry struct describing a database
- * follows.
- */
- case 'D':
- if (fread(&dbbuf, 1, offsetof(PgStat_StatDBEntry, tables),
- fpin) != offsetof(PgStat_StatDBEntry, tables))
- {
- ereport(pgStatRunningInCollector ? LOG : WARNING,
- (errmsg("corrupted statistics file \"%s\"",
- statfile)));
- goto done;
- }
-
- /*
- * Add to the DB hash
- */
- dbentry = (PgStat_StatDBEntry *) hash_search(dbhash,
- (void *) &dbbuf.databaseid,
- HASH_ENTER,
- &found);
- if (found)
- {
- ereport(pgStatRunningInCollector ? LOG : WARNING,
- (errmsg("corrupted statistics file \"%s\"",
- statfile)));
- goto done;
- }
+ char t = fgetc(fpin);
- memcpy(dbentry, &dbbuf, sizeof(PgStat_StatDBEntry));
- dbentry->tables = NULL;
- dbentry->functions = NULL;
-
- /*
- * In the collector, disregard the timestamp we read from the
- * permanent stats file; we should be willing to write a temp
- * stats file immediately upon the first request from any
- * backend.
- */
- if (pgStatRunningInCollector)
- dbentry->stats_timestamp = 0;
-
- /*
- * Don't create tables/functions hashtables for uninteresting
- * databases.
- */
- if (onlydb != InvalidOid)
+ switch (t)
+ {
+ case 'S':
+ case 'N':
{
- if (dbbuf.databaseid != onlydb &&
- dbbuf.databaseid != InvalidOid)
- break;
- }
+ PgStat_HashKey key;
+ PgStatShared_HashEntry *p;
+ PgStatShared_Common *header;
- hash_ctl.keysize = sizeof(Oid);
- hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
- hash_ctl.hcxt = pgStatLocalContext;
- dbentry->tables = hash_create("Per-database table",
- PGSTAT_TAB_HASH_SIZE,
- &hash_ctl,
- HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
-
- hash_ctl.keysize = sizeof(Oid);
- hash_ctl.entrysize = sizeof(PgStat_StatFuncEntry);
- hash_ctl.hcxt = pgStatLocalContext;
- dbentry->functions = hash_create("Per-database function",
- PGSTAT_FUNCTION_HASH_SIZE,
- &hash_ctl,
- HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
-
- /*
- * If requested, read the data from the database-specific
- * file. Otherwise we just leave the hashtables empty.
- */
- if (deep)
- pgstat_read_db_statsfile(dbentry->databaseid,
- dbentry->tables,
- dbentry->functions,
- permanent);
-
- break;
-
- /*
- * 'R' A PgStat_StatReplSlotEntry struct describing a
- * replication slot follows.
- */
- case 'R':
- {
- PgStat_StatReplSlotEntry slotbuf;
- PgStat_StatReplSlotEntry *slotent;
+ CHECK_FOR_INTERRUPTS();
- if (fread(&slotbuf, 1, sizeof(PgStat_StatReplSlotEntry), fpin)
- != sizeof(PgStat_StatReplSlotEntry))
+ if (t == 'S')
{
- ereport(pgStatRunningInCollector ? LOG : WARNING,
- (errmsg("corrupted statistics file \"%s\"",
- statfile)));
- goto done;
- }
+ /* normal stats entry, identified by PgStat_HashKey */
+ if (!read_chunk_s(fpin, &key))
+ goto error;
- /* Create hash table if we don't have it already. */
- if (replSlotStatHash == NULL)
- {
- HASHCTL hash_ctl;
-
- hash_ctl.keysize = sizeof(NameData);
- hash_ctl.entrysize = sizeof(PgStat_StatReplSlotEntry);
- hash_ctl.hcxt = pgStatLocalContext;
- replSlotStatHash = hash_create("Replication slots hash",
- PGSTAT_REPLSLOT_HASH_SIZE,
- &hash_ctl,
- HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+ if (!pgstat_is_kind_valid(key.kind))
+ goto error;
}
-
- slotent = (PgStat_StatReplSlotEntry *) hash_search(replSlotStatHash,
- (void *) &slotbuf.slotname,
- HASH_ENTER, NULL);
- memcpy(slotent, &slotbuf, sizeof(PgStat_StatReplSlotEntry));
- break;
- }
-
- /*
- * 'S' A PgStat_StatSubEntry struct describing subscription
- * statistics.
- */
- case 'S':
- {
- PgStat_StatSubEntry subbuf;
- PgStat_StatSubEntry *subentry;
-
- if (fread(&subbuf, 1, sizeof(PgStat_StatSubEntry), fpin)
- != sizeof(PgStat_StatSubEntry))
- {
- ereport(pgStatRunningInCollector ? LOG : WARNING,
- (errmsg("corrupted statistics file \"%s\"",
- statfile)));
- goto done;
- }
-
- if (subscriptionStatHash == NULL)
+ else
{
- HASHCTL hash_ctl;
-
- hash_ctl.keysize = sizeof(Oid);
- hash_ctl.entrysize = sizeof(PgStat_StatSubEntry);
- hash_ctl.hcxt = pgStatLocalContext;
- subscriptionStatHash = hash_create("Subscription hash",
- PGSTAT_SUBSCRIPTION_HASH_SIZE,
- &hash_ctl,
- HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
- }
-
- subentry = (PgStat_StatSubEntry *) hash_search(subscriptionStatHash,
- (void *) &subbuf.subid,
- HASH_ENTER, NULL);
-
- memcpy(subentry, &subbuf, sizeof(subbuf));
- break;
- }
-
- case 'E':
- goto done;
+ /* stats entry identified by name on disk (e.g. slots) */
+ const PgStat_KindInfo *kind_info = NULL;
+ PgStat_Kind kind;
+ NameData name;
- default:
- ereport(pgStatRunningInCollector ? LOG : WARNING,
- (errmsg("corrupted statistics file \"%s\"",
- statfile)));
- goto done;
- }
- }
-
-done:
- FreeFile(fpin);
-
- /* If requested to read the permanent file, also get rid of it. */
- if (permanent)
- {
- elog(DEBUG2, "removing permanent stats file \"%s\"", statfile);
- unlink(statfile);
- }
+ if (!read_chunk_s(fpin, &kind))
+ goto error;
+ if (!read_chunk_s(fpin, &name))
+ goto error;
+ if (!pgstat_is_kind_valid(kind))
+ goto error;
- return dbhash;
-}
+ kind_info = pgstat_get_kind_info(kind);
+ if (!kind_info->from_serialized_name)
+ goto error;
-/*
- * Reads in the existing statistics collector file for the given database,
- * filling the passed-in tables and functions hash tables.
- *
- * As in pgstat_read_statsfiles, if the permanent file is requested, it is
- * removed after reading.
- *
- * Note: this code has the ability to skip storing per-table or per-function
- * data, if NULL is passed for the corresponding hashtable. That's not used
- * at the moment though.
- */
-static void
-pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash,
- bool permanent)
-{
- PgStat_StatTabEntry *tabentry;
- PgStat_StatTabEntry tabbuf;
- PgStat_StatFuncEntry funcbuf;
- PgStat_StatFuncEntry *funcentry;
- FILE *fpin;
- int32 format_id;
- bool found;
- char statfile[MAXPGPATH];
+ if (!kind_info->from_serialized_name(&name, &key))
+ {
+ /* skip over data for entry we don't care about */
+ if (fseek(fpin, pgstat_get_entry_len(kind), SEEK_CUR) != 0)
+ goto error;
- get_dbstat_filename(permanent, false, databaseid, statfile, MAXPGPATH);
+ continue;
+ }
- /*
- * Try to open the stats file. If it doesn't exist, the backends simply
- * return zero for anything and the collector simply starts from scratch
- * with empty counters.
- *
- * ENOENT is a possibility if the stats collector is not running or has
- * not yet written the stats file the first time. Any other failure
- * condition is suspicious.
- */
- if ((fpin = AllocateFile(statfile, PG_BINARY_R)) == NULL)
- {
- if (errno != ENOENT)
- ereport(pgStatRunningInCollector ? LOG : WARNING,
- (errcode_for_file_access(),
- errmsg("could not open statistics file \"%s\": %m",
- statfile)));
- return;
- }
-
- /*
- * Verify it's of the expected format.
- */
- if (fread(&format_id, 1, sizeof(format_id), fpin) != sizeof(format_id) ||
- format_id != PGSTAT_FILE_FORMAT_ID)
- {
- ereport(pgStatRunningInCollector ? LOG : WARNING,
- (errmsg("corrupted statistics file \"%s\"", statfile)));
- goto done;
- }
-
- /*
- * We found an existing collector stats file. Read it and put all the
- * hashtable entries into place.
- */
- for (;;)
- {
- switch (fgetc(fpin))
- {
- /*
- * 'T' A PgStat_StatTabEntry follows.
- */
- case 'T':
- if (fread(&tabbuf, 1, sizeof(PgStat_StatTabEntry),
- fpin) != sizeof(PgStat_StatTabEntry))
- {
- ereport(pgStatRunningInCollector ? LOG : WARNING,
- (errmsg("corrupted statistics file \"%s\"",
- statfile)));
- goto done;
- }
-
- /*
- * Skip if table data not wanted.
- */
- if (tabhash == NULL)
- break;
+ Assert(key.kind == kind);
+ }
- tabentry = (PgStat_StatTabEntry *) hash_search(tabhash,
- (void *) &tabbuf.tableid,
- HASH_ENTER, &found);
+ /*
+ * This intentionally doesn't use pgstat_get_entry_ref() -
+ * putting all stats into checkpointer's
+ * pgStatEntryRefHash would be wasted effort and memory.
+ */
+ p = dshash_find_or_insert(pgStatLocal.shared_hash, &key, &found);
- if (found)
- {
- ereport(pgStatRunningInCollector ? LOG : WARNING,
- (errmsg("corrupted statistics file \"%s\"",
- statfile)));
- goto done;
- }
+ /* don't allow duplicate entries */
+ if (found)
+ {
+ dshash_release_lock(pgStatLocal.shared_hash, p);
+ elog(WARNING, "found duplicate stats entry %d/%u/%u",
+ key.kind, key.dboid, key.objoid);
+ goto error;
+ }
- memcpy(tabentry, &tabbuf, sizeof(tabbuf));
- break;
+ header = pgstat_init_entry(key.kind, p);
+ dshash_release_lock(pgStatLocal.shared_hash, p);
- /*
- * 'F' A PgStat_StatFuncEntry follows.
- */
- case 'F':
- if (fread(&funcbuf, 1, sizeof(PgStat_StatFuncEntry),
- fpin) != sizeof(PgStat_StatFuncEntry))
- {
- ereport(pgStatRunningInCollector ? LOG : WARNING,
- (errmsg("corrupted statistics file \"%s\"",
- statfile)));
- goto done;
- }
+ if (!read_chunk(fpin,
+ pgstat_get_entry_data(key.kind, header),
+ pgstat_get_entry_len(key.kind)))
+ goto error;
- /*
- * Skip if function data not wanted.
- */
- if (funchash == NULL)
break;
-
- funcentry = (PgStat_StatFuncEntry *) hash_search(funchash,
- (void *) &funcbuf.functionid,
- HASH_ENTER, &found);
-
- if (found)
- {
- ereport(pgStatRunningInCollector ? LOG : WARNING,
- (errmsg("corrupted statistics file \"%s\"",
- statfile)));
- goto done;
- }
-
- memcpy(funcentry, &funcbuf, sizeof(funcbuf));
- break;
-
- /*
- * 'E' The EOF marker of a complete stats file.
- */
- case 'E':
- goto done;
-
- default:
- ereport(pgStatRunningInCollector ? LOG : WARNING,
- (errmsg("corrupted statistics file \"%s\"",
- statfile)));
- goto done;
- }
- }
-
-done:
- FreeFile(fpin);
-
- if (permanent)
- {
- elog(DEBUG2, "removing permanent stats file \"%s\"", statfile);
- unlink(statfile);
- }
-}
-
-/*
- * Attempt to determine the timestamp of the last db statfile write.
- * Returns true if successful; the timestamp is stored in *ts. The caller must
- * rely on timestamp stored in *ts iff the function returns true.
- *
- * This needs to be careful about handling databases for which no stats file
- * exists, such as databases without a stat entry or those not yet written:
- *
- * - if there's a database entry in the global file, return the corresponding
- * stats_timestamp value.
- *
- * - if there's no db stat entry (e.g. for a new or inactive database),
- * there's no stats_timestamp value, but also nothing to write so we return
- * the timestamp of the global statfile.
- */
-static bool
-pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
- TimestampTz *ts)
-{
- PgStat_StatDBEntry dbentry;
- PgStat_GlobalStats myGlobalStats;
- PgStat_ArchiverStats myArchiverStats;
- PgStat_WalStats myWalStats;
- PgStat_SLRUStats mySLRUStats[SLRU_NUM_ELEMENTS];
- PgStat_StatReplSlotEntry myReplSlotStats;
- PgStat_StatSubEntry mySubStats;
- FILE *fpin;
- int32 format_id;
- const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
-
- /*
- * Try to open the stats file. As above, anything but ENOENT is worthy of
- * complaining about.
- */
- if ((fpin = AllocateFile(statfile, PG_BINARY_R)) == NULL)
- {
- if (errno != ENOENT)
- ereport(pgStatRunningInCollector ? LOG : WARNING,
- (errcode_for_file_access(),
- errmsg("could not open statistics file \"%s\": %m",
- statfile)));
- return false;
- }
-
- /*
- * Verify it's of the expected format.
- */
- if (fread(&format_id, 1, sizeof(format_id), fpin) != sizeof(format_id) ||
- format_id != PGSTAT_FILE_FORMAT_ID)
- {
- ereport(pgStatRunningInCollector ? LOG : WARNING,
- (errmsg("corrupted statistics file \"%s\"", statfile)));
- FreeFile(fpin);
- return false;
- }
-
- /*
- * Read global stats struct
- */
- if (fread(&myGlobalStats, 1, sizeof(myGlobalStats),
- fpin) != sizeof(myGlobalStats))
- {
- ereport(pgStatRunningInCollector ? LOG : WARNING,
- (errmsg("corrupted statistics file \"%s\"", statfile)));
- FreeFile(fpin);
- return false;
- }
-
- /*
- * Read archiver stats struct
- */
- if (fread(&myArchiverStats, 1, sizeof(myArchiverStats),
- fpin) != sizeof(myArchiverStats))
- {
- ereport(pgStatRunningInCollector ? LOG : WARNING,
- (errmsg("corrupted statistics file \"%s\"", statfile)));
- FreeFile(fpin);
- return false;
- }
-
- /*
- * Read WAL stats struct
- */
- if (fread(&myWalStats, 1, sizeof(myWalStats), fpin) != sizeof(myWalStats))
- {
- ereport(pgStatRunningInCollector ? LOG : WARNING,
- (errmsg("corrupted statistics file \"%s\"", statfile)));
- FreeFile(fpin);
- return false;
- }
-
- /*
- * Read SLRU stats struct
- */
- if (fread(mySLRUStats, 1, sizeof(mySLRUStats), fpin) != sizeof(mySLRUStats))
- {
- ereport(pgStatRunningInCollector ? LOG : WARNING,
- (errmsg("corrupted statistics file \"%s\"", statfile)));
- FreeFile(fpin);
- return false;
- }
-
- /* By default, we're going to return the timestamp of the global file. */
- *ts = myGlobalStats.stats_timestamp;
-
- /*
- * We found an existing collector stats file. Read it and look for a
- * record for the requested database. If found, use its timestamp.
- */
- for (;;)
- {
- switch (fgetc(fpin))
- {
- /*
- * 'D' A PgStat_StatDBEntry struct describing a database
- * follows.
- */
- case 'D':
- if (fread(&dbentry, 1, offsetof(PgStat_StatDBEntry, tables),
- fpin) != offsetof(PgStat_StatDBEntry, tables))
- {
- ereport(pgStatRunningInCollector ? LOG : WARNING,
- (errmsg("corrupted statistics file \"%s\"",
- statfile)));
- FreeFile(fpin);
- return false;
- }
-
- /*
- * If this is the DB we're looking for, save its timestamp and
- * we're done.
- */
- if (dbentry.databaseid == databaseid)
- {
- *ts = dbentry.stats_timestamp;
- goto done;
- }
-
- break;
-
- /*
- * 'R' A PgStat_StatReplSlotEntry struct describing a
- * replication slot follows.
- */
- case 'R':
- if (fread(&myReplSlotStats, 1, sizeof(PgStat_StatReplSlotEntry), fpin)
- != sizeof(PgStat_StatReplSlotEntry))
- {
- ereport(pgStatRunningInCollector ? LOG : WARNING,
- (errmsg("corrupted statistics file \"%s\"",
- statfile)));
- FreeFile(fpin);
- return false;
- }
- break;
-
- /*
- * 'S' A PgStat_StatSubEntry struct describing subscription
- * statistics follows.
- */
- case 'S':
- if (fread(&mySubStats, 1, sizeof(PgStat_StatSubEntry), fpin)
- != sizeof(PgStat_StatSubEntry))
- {
- ereport(pgStatRunningInCollector ? LOG : WARNING,
- (errmsg("corrupted statistics file \"%s\"",
- statfile)));
- FreeFile(fpin);
- return false;
}
- break;
-
case 'E':
goto done;
default:
- {
- ereport(pgStatRunningInCollector ? LOG : WARNING,
- (errmsg("corrupted statistics file \"%s\"",
- statfile)));
- FreeFile(fpin);
- return false;
- }
+ goto error;
}
}
done:
FreeFile(fpin);
- return true;
-}
-
-/*
- * If not already done, read the statistics collector stats file into
- * some hash tables. The results will be kept until pgstat_clear_snapshot()
- * is called (typically, at end of transaction).
- */
-static void
-backend_read_statsfile(void)
-{
- TimestampTz min_ts = 0;
- TimestampTz ref_ts = 0;
- Oid inquiry_db;
- int count;
-
- pgstat_assert_is_up();
-
- /* already read it? */
- if (pgStatDBHash)
- return;
- Assert(!pgStatRunningInCollector);
-
- /*
- * In a normal backend, we check staleness of the data for our own DB, and
- * so we send MyDatabaseId in inquiry messages. In the autovac launcher,
- * check staleness of the shared-catalog data, and send InvalidOid in
- * inquiry messages so as not to force writing unnecessary data.
- */
- if (IsAutoVacuumLauncherProcess())
- inquiry_db = InvalidOid;
- else
- inquiry_db = MyDatabaseId;
-
- /*
- * Loop until fresh enough stats file is available or we ran out of time.
- * The stats inquiry message is sent repeatedly in case collector drops
- * it; but not every single time, as that just swamps the collector.
- */
- for (count = 0; count < PGSTAT_POLL_LOOP_COUNT; count++)
- {
- bool ok;
- TimestampTz file_ts = 0;
- TimestampTz cur_ts;
-
- CHECK_FOR_INTERRUPTS();
-
- ok = pgstat_read_db_statsfile_timestamp(inquiry_db, false, &file_ts);
-
- cur_ts = GetCurrentTimestamp();
- /* Calculate min acceptable timestamp, if we didn't already */
- if (count == 0 || cur_ts < ref_ts)
- {
- /*
- * We set the minimum acceptable timestamp to PGSTAT_STAT_INTERVAL
- * msec before now. This indirectly ensures that the collector
- * needn't write the file more often than PGSTAT_STAT_INTERVAL. In
- * an autovacuum worker, however, we want a lower delay to avoid
- * using stale data, so we use PGSTAT_RETRY_DELAY (since the
- * number of workers is low, this shouldn't be a problem).
- *
- * We don't recompute min_ts after sleeping, except in the
- * unlikely case that cur_ts went backwards. So we might end up
- * accepting a file a bit older than PGSTAT_STAT_INTERVAL. In
- * practice that shouldn't happen, though, as long as the sleep
- * time is less than PGSTAT_STAT_INTERVAL; and we don't want to
- * tell the collector that our cutoff time is less than what we'd
- * actually accept.
- */
- ref_ts = cur_ts;
- if (IsAutoVacuumWorkerProcess())
- min_ts = TimestampTzPlusMilliseconds(ref_ts,
- -PGSTAT_RETRY_DELAY);
- else
- min_ts = TimestampTzPlusMilliseconds(ref_ts,
- -PGSTAT_STAT_INTERVAL);
- }
-
- /*
- * If the file timestamp is actually newer than cur_ts, we must have
- * had a clock glitch (system time went backwards) or there is clock
- * skew between our processor and the stats collector's processor.
- * Accept the file, but send an inquiry message anyway to make
- * pgstat_recv_inquiry do a sanity check on the collector's time.
- */
- if (ok && file_ts > cur_ts)
- {
- /*
- * A small amount of clock skew between processors isn't terribly
- * surprising, but a large difference is worth logging. We
- * arbitrarily define "large" as 1000 msec.
- */
- if (file_ts >= TimestampTzPlusMilliseconds(cur_ts, 1000))
- {
- char *filetime;
- char *mytime;
-
- /* Copy because timestamptz_to_str returns a static buffer */
- filetime = pstrdup(timestamptz_to_str(file_ts));
- mytime = pstrdup(timestamptz_to_str(cur_ts));
- ereport(LOG,
- (errmsg("statistics collector's time %s is later than backend local time %s",
- filetime, mytime)));
- pfree(filetime);
- pfree(mytime);
- }
-
- pgstat_send_inquiry(cur_ts, min_ts, inquiry_db);
- break;
- }
-
- /* Normal acceptance case: file is not older than cutoff time */
- if (ok && file_ts >= min_ts)
- break;
-
- /* Not there or too old, so kick the collector and wait a bit */
- if ((count % PGSTAT_INQ_LOOP_COUNT) == 0)
- pgstat_send_inquiry(cur_ts, min_ts, inquiry_db);
-
- pg_usleep(PGSTAT_RETRY_DELAY * 1000L);
- }
-
- if (count >= PGSTAT_POLL_LOOP_COUNT)
- ereport(LOG,
- (errmsg("using stale statistics instead of current ones "
- "because stats collector is not responding")));
-
- /*
- * Autovacuum launcher wants stats about all databases, but a shallow read
- * is sufficient. Regular backends want a deep read for just the tables
- * they can see (MyDatabaseId + shared catalogs).
- */
- if (IsAutoVacuumLauncherProcess())
- pgStatDBHash = pgstat_read_statsfiles(InvalidOid, false, false);
- else
- pgStatDBHash = pgstat_read_statsfiles(MyDatabaseId, false, true);
-}
-
-/*
- * Do we need to write out any stats files?
- */
-static bool
-pgstat_write_statsfile_needed(void)
-{
- if (pending_write_requests != NIL)
- return true;
-
- /* Everything was written recently */
- return false;
-}
-
-/*
- * Checks whether stats for a particular DB need to be written to a file.
- */
-static bool
-pgstat_db_requested(Oid databaseid)
-{
- /*
- * If any requests are outstanding at all, we should write the stats for
- * shared catalogs (the "database" with OID 0). This ensures that
- * backends will see up-to-date stats for shared catalogs, even though
- * they send inquiry messages mentioning only their own DB.
- */
- if (databaseid == InvalidOid && pending_write_requests != NIL)
- return true;
-
- /* Search to see if there's an open request to write this database. */
- if (list_member_oid(pending_write_requests, databaseid))
- return true;
-
- return false;
-}
-
-
-/* ------------------------------------------------------------
- * stats collector message processing functions
- * ------------------------------------------------------------
- */
-
-/*
- * Process stat inquiry requests.
- */
-static void
-pgstat_recv_inquiry(PgStat_MsgInquiry *msg, int len)
-{
- PgStat_StatDBEntry *dbentry;
-
- elog(DEBUG2, "received inquiry for database %u", msg->databaseid);
-
- /*
- * If there's already a write request for this DB, there's nothing to do.
- *
- * Note that if a request is found, we return early and skip the below
- * check for clock skew. This is okay, since the only way for a DB
- * request to be present in the list is that we have been here since the
- * last write round. It seems sufficient to check for clock skew once per
- * write round.
- */
- if (list_member_oid(pending_write_requests, msg->databaseid))
- return;
-
- /*
- * Check to see if we last wrote this database at a time >= the requested
- * cutoff time. If so, this is a stale request that was generated before
- * we updated the DB file, and we don't need to do so again.
- *
- * If the requestor's local clock time is older than stats_timestamp, we
- * should suspect a clock glitch, ie system time going backwards; though
- * the more likely explanation is just delayed message receipt. It is
- * worth expending a GetCurrentTimestamp call to be sure, since a large
- * retreat in the system clock reading could otherwise cause us to neglect
- * to update the stats file for a long time.
- */
- dbentry = pgstat_get_db_entry(msg->databaseid, false);
- if (dbentry == NULL)
- {
- /*
- * We have no data for this DB. Enter a write request anyway so that
- * the global stats will get updated. This is needed to prevent
- * backend_read_statsfile from waiting for data that we cannot supply,
- * in the case of a new DB that nobody has yet reported any stats for.
- * See the behavior of pgstat_read_db_statsfile_timestamp.
- */
- }
- else if (msg->clock_time < dbentry->stats_timestamp)
- {
- TimestampTz cur_ts = GetCurrentTimestamp();
-
- if (cur_ts < dbentry->stats_timestamp)
- {
- /*
- * Sure enough, time went backwards. Force a new stats file write
- * to get back in sync; but first, log a complaint.
- */
- char *writetime;
- char *mytime;
-
- /* Copy because timestamptz_to_str returns a static buffer */
- writetime = pstrdup(timestamptz_to_str(dbentry->stats_timestamp));
- mytime = pstrdup(timestamptz_to_str(cur_ts));
- ereport(LOG,
- (errmsg("stats_timestamp %s is later than collector's time %s for database %u",
- writetime, mytime, dbentry->databaseid)));
- pfree(writetime);
- pfree(mytime);
- }
- else
- {
- /*
- * Nope, it's just an old request. Assuming msg's clock_time is
- * >= its cutoff_time, it must be stale, so we can ignore it.
- */
- return;
- }
- }
- else if (msg->cutoff_time <= dbentry->stats_timestamp)
- {
- /* Stale request, ignore it */
- return;
- }
-
- /*
- * We need to write this DB, so create a request.
- */
- pending_write_requests = lappend_oid(pending_write_requests,
- msg->databaseid);
-}
-
-/*
- * Count what the backend has done.
- */
-static void
-pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len)
-{
- PgStat_StatDBEntry *dbentry;
- PgStat_StatTabEntry *tabentry;
- int i;
- bool found;
-
- dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
-
- /*
- * Update database-wide stats.
- */
- dbentry->n_xact_commit += (PgStat_Counter) (msg->m_xact_commit);
- dbentry->n_xact_rollback += (PgStat_Counter) (msg->m_xact_rollback);
- dbentry->n_block_read_time += msg->m_block_read_time;
- dbentry->n_block_write_time += msg->m_block_write_time;
-
- dbentry->total_session_time += msg->m_session_time;
- dbentry->total_active_time += msg->m_active_time;
- dbentry->total_idle_in_xact_time += msg->m_idle_in_xact_time;
-
- /*
- * Process all table entries in the message.
- */
- for (i = 0; i < msg->m_nentries; i++)
- {
- PgStat_TableEntry *tabmsg = &(msg->m_entry[i]);
-
- tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
- (void *) &(tabmsg->t_id),
- HASH_ENTER, &found);
-
- if (!found)
- {
- /*
- * If it's a new table entry, initialize counters to the values we
- * just got.
- */
- tabentry->numscans = tabmsg->t_counts.t_numscans;
- tabentry->tuples_returned = tabmsg->t_counts.t_tuples_returned;
- tabentry->tuples_fetched = tabmsg->t_counts.t_tuples_fetched;
- tabentry->tuples_inserted = tabmsg->t_counts.t_tuples_inserted;
- tabentry->tuples_updated = tabmsg->t_counts.t_tuples_updated;
- tabentry->tuples_deleted = tabmsg->t_counts.t_tuples_deleted;
- tabentry->tuples_hot_updated = tabmsg->t_counts.t_tuples_hot_updated;
- tabentry->n_live_tuples = tabmsg->t_counts.t_delta_live_tuples;
- tabentry->n_dead_tuples = tabmsg->t_counts.t_delta_dead_tuples;
- tabentry->changes_since_analyze = tabmsg->t_counts.t_changed_tuples;
- tabentry->inserts_since_vacuum = tabmsg->t_counts.t_tuples_inserted;
- tabentry->blocks_fetched = tabmsg->t_counts.t_blocks_fetched;
- tabentry->blocks_hit = tabmsg->t_counts.t_blocks_hit;
-
- tabentry->vacuum_timestamp = 0;
- tabentry->vacuum_count = 0;
- tabentry->autovac_vacuum_timestamp = 0;
- tabentry->autovac_vacuum_count = 0;
- tabentry->analyze_timestamp = 0;
- tabentry->analyze_count = 0;
- tabentry->autovac_analyze_timestamp = 0;
- tabentry->autovac_analyze_count = 0;
- }
- else
- {
- /*
- * Otherwise add the values to the existing entry.
- */
- tabentry->numscans += tabmsg->t_counts.t_numscans;
- tabentry->tuples_returned += tabmsg->t_counts.t_tuples_returned;
- tabentry->tuples_fetched += tabmsg->t_counts.t_tuples_fetched;
- tabentry->tuples_inserted += tabmsg->t_counts.t_tuples_inserted;
- tabentry->tuples_updated += tabmsg->t_counts.t_tuples_updated;
- tabentry->tuples_deleted += tabmsg->t_counts.t_tuples_deleted;
- tabentry->tuples_hot_updated += tabmsg->t_counts.t_tuples_hot_updated;
-
- /*
- * If table was truncated/dropped, first reset the live/dead
- * counters.
- */
- if (tabmsg->t_counts.t_truncdropped)
- {
- tabentry->n_live_tuples = 0;
- tabentry->n_dead_tuples = 0;
- tabentry->inserts_since_vacuum = 0;
- }
- tabentry->n_live_tuples += tabmsg->t_counts.t_delta_live_tuples;
- tabentry->n_dead_tuples += tabmsg->t_counts.t_delta_dead_tuples;
- tabentry->changes_since_analyze += tabmsg->t_counts.t_changed_tuples;
- tabentry->inserts_since_vacuum += tabmsg->t_counts.t_tuples_inserted;
- tabentry->blocks_fetched += tabmsg->t_counts.t_blocks_fetched;
- tabentry->blocks_hit += tabmsg->t_counts.t_blocks_hit;
- }
-
- /* Clamp n_live_tuples in case of negative delta_live_tuples */
- tabentry->n_live_tuples = Max(tabentry->n_live_tuples, 0);
- /* Likewise for n_dead_tuples */
- tabentry->n_dead_tuples = Max(tabentry->n_dead_tuples, 0);
-
- /*
- * Add per-table stats to the per-database entry, too.
- */
- dbentry->n_tuples_returned += tabmsg->t_counts.t_tuples_returned;
- dbentry->n_tuples_fetched += tabmsg->t_counts.t_tuples_fetched;
- dbentry->n_tuples_inserted += tabmsg->t_counts.t_tuples_inserted;
- dbentry->n_tuples_updated += tabmsg->t_counts.t_tuples_updated;
- dbentry->n_tuples_deleted += tabmsg->t_counts.t_tuples_deleted;
- dbentry->n_blocks_fetched += tabmsg->t_counts.t_blocks_fetched;
- dbentry->n_blocks_hit += tabmsg->t_counts.t_blocks_hit;
- }
-}
-
-/*
- * Arrange for dead table removal.
- */
-static void
-pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len)
-{
- PgStat_StatDBEntry *dbentry;
- int i;
-
- dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
-
- /*
- * No need to purge if we don't even know the database.
- */
- if (!dbentry || !dbentry->tables)
- return;
-
- /*
- * Process all table entries in the message.
- */
- for (i = 0; i < msg->m_nentries; i++)
- {
- /* Remove from hashtable if present; we don't care if it's not. */
- (void) hash_search(dbentry->tables,
- (void *) &(msg->m_tableid[i]),
- HASH_REMOVE, NULL);
- }
-}
-
-/*
- * Arrange for dead database removal
- */
-static void
-pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len)
-{
- Oid dbid = msg->m_databaseid;
- PgStat_StatDBEntry *dbentry;
-
- /*
- * Lookup the database in the hashtable.
- */
- dbentry = pgstat_get_db_entry(dbid, false);
-
- /*
- * If found, remove it (along with the db statfile).
- */
- if (dbentry)
- {
- char statfile[MAXPGPATH];
-
- get_dbstat_filename(false, false, dbid, statfile, MAXPGPATH);
-
- elog(DEBUG2, "removing stats file \"%s\"", statfile);
- unlink(statfile);
-
- if (dbentry->tables != NULL)
- hash_destroy(dbentry->tables);
- if (dbentry->functions != NULL)
- hash_destroy(dbentry->functions);
-
- if (hash_search(pgStatDBHash,
- (void *) &dbid,
- HASH_REMOVE, NULL) == NULL)
- ereport(ERROR,
- (errmsg("database hash table corrupted during cleanup --- abort")));
- }
-}
-
-/*
- * Reset the statistics for the specified database.
- */
-static void
-pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len)
-{
- PgStat_StatDBEntry *dbentry;
-
- /*
- * Lookup the database in the hashtable. Nothing to do if not there.
- */
- dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
-
- if (!dbentry)
- return;
-
- /*
- * We simply throw away all the database's table entries by recreating a
- * new hash table for them.
- */
- if (dbentry->tables != NULL)
- hash_destroy(dbentry->tables);
- if (dbentry->functions != NULL)
- hash_destroy(dbentry->functions);
-
- dbentry->tables = NULL;
- dbentry->functions = NULL;
-
- /*
- * Reset database-level stats, too. This creates empty hash tables for
- * tables and functions.
- */
- reset_dbentry_counters(dbentry);
-}
-
-/*
- * Reset some shared statistics of the cluster.
- */
-static void
-pgstat_recv_resetsharedcounter(PgStat_MsgResetsharedcounter *msg, int len)
-{
- if (msg->m_resettarget == PGSTAT_KIND_BGWRITER ||
- msg->m_resettarget == PGSTAT_KIND_CHECKPOINTER)
- {
- /*
- * Reset the global, bgwriter and checkpointer statistics for the
- * cluster.
- */
- memset(&globalStats, 0, sizeof(globalStats));
- globalStats.bgwriter.stat_reset_timestamp = GetCurrentTimestamp();
- }
- else if (msg->m_resettarget == PGSTAT_KIND_ARCHIVER)
- {
- /* Reset the archiver statistics for the cluster. */
- memset(&archiverStats, 0, sizeof(archiverStats));
- archiverStats.stat_reset_timestamp = GetCurrentTimestamp();
- }
- else if (msg->m_resettarget == PGSTAT_KIND_WAL)
- {
- /* Reset the WAL statistics for the cluster. */
- memset(&walStats, 0, sizeof(walStats));
- walStats.stat_reset_timestamp = GetCurrentTimestamp();
- }
-
- /*
- * Presumably the sender of this message validated the target, don't
- * complain here if it's not valid
- */
-}
-
-/*
- * Reset a statistics for a single object, which may be of current
- * database or shared across all databases in the cluster.
- */
-static void
-pgstat_recv_resetsinglecounter(PgStat_MsgResetsinglecounter *msg, int len)
-{
- PgStat_StatDBEntry *dbentry;
-
- if (IsSharedRelation(msg->m_objectid))
- dbentry = pgstat_get_db_entry(InvalidOid, false);
- else
- dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
-
- if (!dbentry)
- return;
-
- /* Set the reset timestamp for the whole database */
- dbentry->stat_reset_timestamp = GetCurrentTimestamp();
-
- /* Remove object if it exists, ignore it if not */
- if (msg->m_resettype == PGSTAT_KIND_RELATION)
- (void) hash_search(dbentry->tables, (void *) &(msg->m_objectid),
- HASH_REMOVE, NULL);
- else if (msg->m_resettype == PGSTAT_KIND_FUNCTION)
- (void) hash_search(dbentry->functions, (void *) &(msg->m_objectid),
- HASH_REMOVE, NULL);
-}
-
-/*
- * Reset some SLRU statistics of the cluster.
- */
-static void
-pgstat_recv_resetslrucounter(PgStat_MsgResetslrucounter *msg, int len)
-{
- int i;
- TimestampTz ts = GetCurrentTimestamp();
-
- for (i = 0; i < SLRU_NUM_ELEMENTS; i++)
- {
- /* reset entry with the given index, or all entries (index is -1) */
- if ((msg->m_index == -1) || (msg->m_index == i))
- {
- memset(&slruStats[i], 0, sizeof(slruStats[i]));
- slruStats[i].stat_reset_timestamp = ts;
- }
- }
-}
-
-/*
- * Reset some replication slot statistics of the cluster.
- */
-static void
-pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg,
- int len)
-{
- PgStat_StatReplSlotEntry *slotent;
- TimestampTz ts;
-
- /* Return if we don't have replication slot statistics */
- if (replSlotStatHash == NULL)
- return;
-
- ts = GetCurrentTimestamp();
- if (msg->clearall)
- {
- HASH_SEQ_STATUS sstat;
-
- hash_seq_init(&sstat, replSlotStatHash);
- while ((slotent = (PgStat_StatReplSlotEntry *) hash_seq_search(&sstat)) != NULL)
- pgstat_reset_replslot_entry(slotent, ts);
- }
- else
- {
- /* Get the slot statistics to reset */
- slotent = pgstat_get_replslot_entry(msg->m_slotname, false);
-
- /*
- * Nothing to do if the given slot entry is not found. This could
- * happen when the slot with the given name is removed and the
- * corresponding statistics entry is also removed before receiving the
- * reset message.
- */
- if (!slotent)
- return;
-
- /* Reset the stats for the requested replication slot */
- pgstat_reset_replslot_entry(slotent, ts);
- }
-}
-
-/*
- * Reset some subscription statistics of the cluster.
- */
-static void
-pgstat_recv_resetsubcounter(PgStat_MsgResetsubcounter *msg, int len)
-{
- PgStat_StatSubEntry *subentry;
- TimestampTz ts;
-
- /* Return if we don't have replication subscription statistics */
- if (subscriptionStatHash == NULL)
- return;
-
- ts = GetCurrentTimestamp();
- if (!OidIsValid(msg->m_subid))
- {
- HASH_SEQ_STATUS sstat;
-
- /* Clear all subscription counters */
- hash_seq_init(&sstat, subscriptionStatHash);
- while ((subentry = (PgStat_StatSubEntry *) hash_seq_search(&sstat)) != NULL)
- pgstat_reset_subscription(subentry, ts);
- }
- else
- {
- /* Get the subscription statistics to reset */
- subentry = pgstat_get_subscription_entry(msg->m_subid, false);
-
- /*
- * Nothing to do if the given subscription entry is not found. This
- * could happen when the subscription with the subid is removed and
- * the corresponding statistics entry is also removed before receiving
- * the reset message.
- */
- if (!subentry)
- return;
-
- /* Reset the stats for the requested subscription */
- pgstat_reset_subscription(subentry, ts);
- }
-}
-
-/*
- * Process an autovacuum signaling message.
- */
-static void
-pgstat_recv_autovac(PgStat_MsgAutovacStart *msg, int len)
-{
- PgStat_StatDBEntry *dbentry;
-
- /*
- * Store the last autovacuum time in the database's hashtable entry.
- */
- dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
-
- dbentry->last_autovac_time = msg->m_start_time;
-}
-
-/*
- * Process a VACUUM message.
- */
-static void
-pgstat_recv_vacuum(PgStat_MsgVacuum *msg, int len)
-{
- PgStat_StatDBEntry *dbentry;
- PgStat_StatTabEntry *tabentry;
-
- /*
- * Store the data in the table's hashtable entry.
- */
- dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
-
- tabentry = pgstat_get_tab_entry(dbentry, msg->m_tableoid, true);
- tabentry->n_live_tuples = msg->m_live_tuples;
- tabentry->n_dead_tuples = msg->m_dead_tuples;
+ elog(DEBUG2, "removing permanent stats file \"%s\"", statfile);
+ unlink(statfile);
- /*
- * It is quite possible that a non-aggressive VACUUM ended up skipping
- * various pages, however, we'll zero the insert counter here regardless.
- * It's currently used only to track when we need to perform an "insert"
- * autovacuum, which are mainly intended to freeze newly inserted tuples.
- * Zeroing this may just mean we'll not try to vacuum the table again
- * until enough tuples have been inserted to trigger another insert
- * autovacuum. An anti-wraparound autovacuum will catch any persistent
- * stragglers.
- */
- tabentry->inserts_since_vacuum = 0;
-
- if (msg->m_autovacuum)
- {
- tabentry->autovac_vacuum_timestamp = msg->m_vacuumtime;
- tabentry->autovac_vacuum_count++;
- }
- else
- {
- tabentry->vacuum_timestamp = msg->m_vacuumtime;
- tabentry->vacuum_count++;
- }
-}
-
-/*
- * Process an ANALYZE message.
- */
-static void
-pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len)
-{
- PgStat_StatDBEntry *dbentry;
- PgStat_StatTabEntry *tabentry;
-
- /*
- * Store the data in the table's hashtable entry.
- */
- dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
-
- tabentry = pgstat_get_tab_entry(dbentry, msg->m_tableoid, true);
-
- tabentry->n_live_tuples = msg->m_live_tuples;
- tabentry->n_dead_tuples = msg->m_dead_tuples;
-
- /*
- * If commanded, reset changes_since_analyze to zero. This forgets any
- * changes that were committed while the ANALYZE was in progress, but we
- * have no good way to estimate how many of those there were.
- */
- if (msg->m_resetcounter)
- tabentry->changes_since_analyze = 0;
-
- if (msg->m_autovacuum)
- {
- tabentry->autovac_analyze_timestamp = msg->m_analyzetime;
- tabentry->autovac_analyze_count++;
- }
- else
- {
- tabentry->analyze_timestamp = msg->m_analyzetime;
- tabentry->analyze_count++;
- }
-}
-
-/*
- * Process a ARCHIVER message.
- */
-static void
-pgstat_recv_archiver(PgStat_MsgArchiver *msg, int len)
-{
- if (msg->m_failed)
- {
- /* Failed archival attempt */
- ++archiverStats.failed_count;
- memcpy(archiverStats.last_failed_wal, msg->m_xlog,
- sizeof(archiverStats.last_failed_wal));
- archiverStats.last_failed_timestamp = msg->m_timestamp;
- }
- else
- {
- /* Successful archival operation */
- ++archiverStats.archived_count;
- memcpy(archiverStats.last_archived_wal, msg->m_xlog,
- sizeof(archiverStats.last_archived_wal));
- archiverStats.last_archived_timestamp = msg->m_timestamp;
- }
-}
-
-/*
- * Process a BGWRITER message.
- */
-static void
-pgstat_recv_bgwriter(PgStat_MsgBgWriter *msg, int len)
-{
- globalStats.bgwriter.buf_written_clean += msg->m_buf_written_clean;
- globalStats.bgwriter.maxwritten_clean += msg->m_maxwritten_clean;
- globalStats.bgwriter.buf_alloc += msg->m_buf_alloc;
-}
-
-/*
- * Process a CHECKPOINTER message.
- */
-static void
-pgstat_recv_checkpointer(PgStat_MsgCheckpointer *msg, int len)
-{
- globalStats.checkpointer.timed_checkpoints += msg->m_timed_checkpoints;
- globalStats.checkpointer.requested_checkpoints += msg->m_requested_checkpoints;
- globalStats.checkpointer.checkpoint_write_time += msg->m_checkpoint_write_time;
- globalStats.checkpointer.checkpoint_sync_time += msg->m_checkpoint_sync_time;
- globalStats.checkpointer.buf_written_checkpoints += msg->m_buf_written_checkpoints;
- globalStats.checkpointer.buf_written_backend += msg->m_buf_written_backend;
- globalStats.checkpointer.buf_fsync_backend += msg->m_buf_fsync_backend;
-}
-
-/*
- * Process a WAL message.
- */
-static void
-pgstat_recv_wal(PgStat_MsgWal *msg, int len)
-{
- walStats.wal_records += msg->m_wal_records;
- walStats.wal_fpi += msg->m_wal_fpi;
- walStats.wal_bytes += msg->m_wal_bytes;
- walStats.wal_buffers_full += msg->m_wal_buffers_full;
- walStats.wal_write += msg->m_wal_write;
- walStats.wal_sync += msg->m_wal_sync;
- walStats.wal_write_time += msg->m_wal_write_time;
- walStats.wal_sync_time += msg->m_wal_sync_time;
-}
-
-/*
- * Process a SLRU message.
- */
-static void
-pgstat_recv_slru(PgStat_MsgSLRU *msg, int len)
-{
- slruStats[msg->m_index].blocks_zeroed += msg->m_blocks_zeroed;
- slruStats[msg->m_index].blocks_hit += msg->m_blocks_hit;
- slruStats[msg->m_index].blocks_read += msg->m_blocks_read;
- slruStats[msg->m_index].blocks_written += msg->m_blocks_written;
- slruStats[msg->m_index].blocks_exists += msg->m_blocks_exists;
- slruStats[msg->m_index].flush += msg->m_flush;
- slruStats[msg->m_index].truncate += msg->m_truncate;
-}
-
-/*
- * Process a RECOVERYCONFLICT message.
- */
-static void
-pgstat_recv_recoveryconflict(PgStat_MsgRecoveryConflict *msg, int len)
-{
- PgStat_StatDBEntry *dbentry;
-
- dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
-
- switch (msg->m_reason)
- {
- case PROCSIG_RECOVERY_CONFLICT_DATABASE:
-
- /*
- * Since we drop the information about the database as soon as it
- * replicates, there is no point in counting these conflicts.
- */
- break;
- case PROCSIG_RECOVERY_CONFLICT_TABLESPACE:
- dbentry->n_conflict_tablespace++;
- break;
- case PROCSIG_RECOVERY_CONFLICT_LOCK:
- dbentry->n_conflict_lock++;
- break;
- case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
- dbentry->n_conflict_snapshot++;
- break;
- case PROCSIG_RECOVERY_CONFLICT_BUFFERPIN:
- dbentry->n_conflict_bufferpin++;
- break;
- case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
- dbentry->n_conflict_startup_deadlock++;
- break;
- }
-}
-
-/*
- * Process a DEADLOCK message.
- */
-static void
-pgstat_recv_deadlock(PgStat_MsgDeadlock *msg, int len)
-{
- PgStat_StatDBEntry *dbentry;
-
- dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
-
- dbentry->n_deadlocks++;
-}
-
-/*
- * Process a CHECKSUMFAILURE message.
- */
-static void
-pgstat_recv_checksum_failure(PgStat_MsgChecksumFailure *msg, int len)
-{
- PgStat_StatDBEntry *dbentry;
-
- dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
-
- dbentry->n_checksum_failures += msg->m_failurecount;
- dbentry->last_checksum_failure = msg->m_failure_time;
-}
-
-/*
- * Process a REPLSLOT message.
- */
-static void
-pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len)
-{
- if (msg->m_drop)
- {
- Assert(!msg->m_create);
-
- /* Remove the replication slot statistics with the given name */
- if (replSlotStatHash != NULL)
- (void) hash_search(replSlotStatHash,
- (void *) &(msg->m_slotname),
- HASH_REMOVE,
- NULL);
- }
- else
- {
- PgStat_StatReplSlotEntry *slotent;
-
- slotent = pgstat_get_replslot_entry(msg->m_slotname, true);
- Assert(slotent);
-
- if (msg->m_create)
- {
- /*
- * If the message for dropping the slot with the same name gets
- * lost, slotent has stats for the old slot. So we initialize all
- * counters at slot creation.
- */
- pgstat_reset_replslot_entry(slotent, 0);
- }
- else
- {
- /* Update the replication slot statistics */
- slotent->spill_txns += msg->m_spill_txns;
- slotent->spill_count += msg->m_spill_count;
- slotent->spill_bytes += msg->m_spill_bytes;
- slotent->stream_txns += msg->m_stream_txns;
- slotent->stream_count += msg->m_stream_count;
- slotent->stream_bytes += msg->m_stream_bytes;
- slotent->total_txns += msg->m_total_txns;
- slotent->total_bytes += msg->m_total_bytes;
- }
- }
-}
-
-/*
- * Process a CONNECT message.
- */
-static void
-pgstat_recv_connect(PgStat_MsgConnect *msg, int len)
-{
- PgStat_StatDBEntry *dbentry;
-
- dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
- dbentry->n_sessions++;
-}
-
-/*
- * Process a DISCONNECT message.
- */
-static void
-pgstat_recv_disconnect(PgStat_MsgDisconnect *msg, int len)
-{
- PgStat_StatDBEntry *dbentry;
-
- dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
-
- switch (msg->m_cause)
- {
- case DISCONNECT_NOT_YET:
- case DISCONNECT_NORMAL:
- /* we don't collect these */
- break;
- case DISCONNECT_CLIENT_EOF:
- dbentry->n_sessions_abandoned++;
- break;
- case DISCONNECT_FATAL:
- dbentry->n_sessions_fatal++;
- break;
- case DISCONNECT_KILLED:
- dbentry->n_sessions_killed++;
- break;
- }
-}
+ return;
-/*
- * Process a TEMPFILE message.
- */
-static void
-pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len)
-{
- PgStat_StatDBEntry *dbentry;
+error:
+ ereport(LOG,
+ (errmsg("corrupted statistics file \"%s\"", statfile)));
- dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
+ /* Set the current timestamp as reset timestamp */
+ pgstat_reset_after_failure(ts);
- dbentry->n_temp_bytes += msg->m_filesize;
- dbentry->n_temp_files += 1;
+ goto done;
}
/*
- * Count what the backend has done.
+ * Helper to reset / drop stats after restoring stats from disk failed,
+ * potentially after already loading parts.
*/
static void
-pgstat_recv_funcstat(PgStat_MsgFuncstat *msg, int len)
+pgstat_reset_after_failure(TimestampTz ts)
{
- PgStat_FunctionEntry *funcmsg = &(msg->m_entry[0]);
- PgStat_StatDBEntry *dbentry;
- PgStat_StatFuncEntry *funcentry;
- int i;
- bool found;
-
- dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
-
- /*
- * Process all function entries in the message.
- */
- for (i = 0; i < msg->m_nentries; i++, funcmsg++)
+ /* reset fixed-numbered stats */
+ for (int kind = PGSTAT_KIND_FIRST_VALID; kind <= PGSTAT_KIND_LAST; kind++)
{
- funcentry = (PgStat_StatFuncEntry *) hash_search(dbentry->functions,
- (void *) &(funcmsg->f_id),
- HASH_ENTER, &found);
-
- if (!found)
- {
- /*
- * If it's a new function entry, initialize counters to the values
- * we just got.
- */
- funcentry->f_numcalls = funcmsg->f_numcalls;
- funcentry->f_total_time = funcmsg->f_total_time;
- funcentry->f_self_time = funcmsg->f_self_time;
- }
- else
- {
- /*
- * Otherwise add the values to the existing entry.
- */
- funcentry->f_numcalls += funcmsg->f_numcalls;
- funcentry->f_total_time += funcmsg->f_total_time;
- funcentry->f_self_time += funcmsg->f_self_time;
- }
- }
-}
+ const PgStat_KindInfo *kind_info = pgstat_get_kind_info(kind);
-/*
- * Arrange for dead function removal.
- */
-static void
-pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len)
-{
- PgStat_StatDBEntry *dbentry;
- int i;
-
- dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
-
- /*
- * No need to purge if we don't even know the database.
- */
- if (!dbentry || !dbentry->functions)
- return;
+ if (!kind_info->fixed_amount)
+ continue;
- /*
- * Process all function entries in the message.
- */
- for (i = 0; i < msg->m_nentries; i++)
- {
- /* Remove from hashtable if present; we don't care if it's not. */
- (void) hash_search(dbentry->functions,
- (void *) &(msg->m_functionid[i]),
- HASH_REMOVE, NULL);
+ kind_info->reset_all_cb(ts);
}
-}
-/*
- * Process a SUBSCRIPTIONDROP message.
- */
-static void
-pgstat_recv_subscription_drop(PgStat_MsgSubscriptionDrop *msg, int len)
-{
- /* Return if we don't have replication subscription statistics */
- if (subscriptionStatHash == NULL)
- return;
-
- /* Remove from hashtable if present; we don't care if it's not */
- (void) hash_search(subscriptionStatHash, (void *) &(msg->m_subid),
- HASH_REMOVE, NULL);
-}
-
-/*
- * Process a SUBSCRIPTIONERROR message.
- */
-static void
-pgstat_recv_subscription_error(PgStat_MsgSubscriptionError *msg, int len)
-{
- PgStat_StatSubEntry *subentry;
-
- /* Get the subscription stats */
- subentry = pgstat_get_subscription_entry(msg->m_subid, true);
- Assert(subentry);
-
- if (msg->m_is_apply_error)
- subentry->apply_error_count++;
- else
- subentry->sync_error_count++;
+ /* and drop variable-numbered ones */
+ pgstat_drop_all_entries();
}