diff options
Diffstat (limited to 'src/backend/postmaster/pgstat.c')
-rw-r--r-- | src/backend/postmaster/pgstat.c | 4513 |
1 files changed, 955 insertions, 3558 deletions
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 20c4629e55c..a9f3a7ef492 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -1,100 +1,161 @@ /* ---------- * pgstat.c + * Infrastructure for the cumulative statistics system. * - * All the statistics collector stuff hacked up in one big, ugly file. + * The cumulative statistics system accumulates statistics for different kinds + * of objects. Some kinds of statistics are collected for a fixed number of + * objects (most commonly 1), e.g., checkpointer statistics. Other kinds of + * statistics are collected for a varying number of objects + * (e.g. relations). See PgStat_KindInfo for a list of currently handled + * statistics. * - * TODO: - Separate collector, postmaster and backend stuff - * into different files. + * Statistics are loaded from the filesystem during startup (by the startup + * process), unless preceded by a crash, in which case all stats are + * discarded. They are written out by the checkpointer process just before + * shutting down, except when shutting down in immediate mode. * - * - Add some automatic call for pgstat vacuuming. + * Fixed-numbered stats are stored in plain (non-dynamic) shared memory. * - * - Add a pgstat config column to pg_database, so this - * entire thing can be enabled/disabled on a per db basis. + * Statistics for variable-numbered objects are stored in dynamic shared + * memory and can be found via a dshash hashtable. The statistics counters are + * not part of the dshash entry (PgStatShared_HashEntry) directly, but are + * separately allocated (PgStatShared_HashEntry->body). The separate + * allocation allows different kinds of statistics to be stored in the same + * hashtable without wasting space in PgStatShared_HashEntry. * - * Copyright (c) 2001-2022, PostgreSQL Global Development Group + * Variable-numbered stats are addressed by PgStat_HashKey while running. It + * is not possible to have statistics for an object that cannot be addressed + * that way at runtime. A wider identifier can be used when serializing to + * disk (used for replication slot stats). * - * src/backend/postmaster/pgstat.c + * To avoid contention on the shared hashtable, each backend has a + * backend-local hashtable (pgStatEntryRefHash) in front of the shared + * hashtable, containing references (PgStat_EntryRef) to shared hashtable + * entries. The shared hashtable only needs to be accessed when no prior + * reference is found in the local hashtable. Besides pointing to the the + * shared hashtable entry (PgStatShared_HashEntry) PgStat_EntryRef also + * contains a pointer to the the shared statistics data, as a process-local + * address, to reduce access costs. + * + * The names for structs stored in shared memory are prefixed with + * PgStatShared instead of PgStat. Each stats entry in shared memory is + * protected by a dedicated lwlock. + * + * Most stats updates are first accumulated locally in each process as pending + * entries, then later flushed to shared memory (just after commit, or by + * idle-timeout). This practically eliminates contention on individual stats + * entries. For most kinds of variable-numbered pending stats data is stored + * in PgStat_EntryRef->pending. All entries with pending data are in the + * pgStatPending list. Pending statistics updates are flushed out by + * pgstat_report_stat(). + * + * The behavior of different kinds of statistics is determined by the kind's + * entry in pgstat_kind_infos, see PgStat_KindInfo for details. + * + * The consistency of read accesses to statistics can be configured using the + * stats_fetch_consistency GUC (see config.sgml and monitoring.sgml for the + * settings). When using PGSTAT_FETCH_CONSISTENCY_CACHE or + * PGSTAT_FETCH_CONSISTENCY_SNAPSHOT statistics are stored in + * pgStatLocal.snapshot. + * + * To keep things manageable, stats handling is split across several + * files. Infrastructure pieces are in: + * - pgstat.c - this file, to tie it all together + * - pgstat_shmem.c - nearly everything dealing with shared memory, including + * the maintenance of hashtable entries + * - pgstat_xact.c - transactional integration, including the transactional + * creation and dropping of stats entries + * + * Each statistics kind is handled in a dedicated file: + * - pgstat_archiver.c + * - pgstat_bgwriter.c + * - pgstat_checkpointer.c + * - pgstat_database.c + * - pgstat_function.c + * - pgstat_relation.c + * - pgstat_slru.c + * - pgstat_subscription.c + * - pgstat_wal.c + * + * Whenever possible infrastructure files should not contain code related to + * specific kinds of stats. + * + * + * Copyright (c) 2001-2022, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/postmaster/pgstat.c * ---------- */ #include "postgres.h" #include <unistd.h> -#include <fcntl.h> -#include <sys/param.h> -#include <sys/time.h> -#include <sys/socket.h> -#include <netdb.h> -#include <netinet/in.h> -#include <arpa/inet.h> -#include <signal.h> -#include <time.h> -#ifdef HAVE_SYS_SELECT_H -#include <sys/select.h> -#endif -#include "access/heapam.h" -#include "access/htup_details.h" -#include "access/tableam.h" #include "access/transam.h" #include "access/xact.h" -#include "catalog/catalog.h" -#include "catalog/pg_database.h" -#include "catalog/pg_proc.h" -#include "catalog/pg_subscription.h" -#include "common/ip.h" -#include "libpq/libpq.h" -#include "libpq/pqsignal.h" -#include "mb/pg_wchar.h" -#include "miscadmin.h" +#include "lib/dshash.h" #include "pgstat.h" -#include "postmaster/autovacuum.h" -#include "postmaster/fork_process.h" -#include "postmaster/interrupt.h" -#include "postmaster/postmaster.h" -#include "replication/slot.h" -#include "replication/walsender.h" -#include "storage/backendid.h" -#include "storage/dsm.h" +#include "port/atomics.h" #include "storage/fd.h" #include "storage/ipc.h" -#include "storage/latch.h" -#include "storage/lmgr.h" +#include "storage/lwlock.h" #include "storage/pg_shmem.h" -#include "storage/proc.h" -#include "storage/procsignal.h" -#include "utils/builtins.h" +#include "storage/shmem.h" #include "utils/guc.h" #include "utils/memutils.h" #include "utils/pgstat_internal.h" -#include "utils/ps_status.h" -#include "utils/rel.h" -#include "utils/snapmgr.h" #include "utils/timestamp.h" /* ---------- * Timer definitions. + * + * In milliseconds. * ---------- */ -#define PGSTAT_RETRY_DELAY 10 /* How long to wait between checks for a - * new file; in milliseconds. */ +/* minimum interval non-forced stats flushes.*/ +#define PGSTAT_MIN_INTERVAL 1000 +/* how long until to block flushing pending stats updates */ +#define PGSTAT_MAX_INTERVAL 60000 +/* when to call pgstat_report_stat() again, even when idle */ +#define PGSTAT_IDLE_INTERVAL 10000 -#define PGSTAT_MAX_WAIT_TIME 10000 /* Maximum time to wait for a stats - * file update; in milliseconds. */ +/* ---------- + * Initial size hints for the hash tables used in statistics. + * ---------- + */ -#define PGSTAT_INQ_INTERVAL 640 /* How often to ping the collector for a - * new file; in milliseconds. */ +#define PGSTAT_SNAPSHOT_HASH_SIZE 512 -#define PGSTAT_RESTART_INTERVAL 60 /* How often to attempt to restart a - * failed statistics collector; in - * seconds. */ -#define PGSTAT_POLL_LOOP_COUNT (PGSTAT_MAX_WAIT_TIME / PGSTAT_RETRY_DELAY) -#define PGSTAT_INQ_LOOP_COUNT (PGSTAT_INQ_INTERVAL / PGSTAT_RETRY_DELAY) +/* hash table for statistics snapshots entry */ +typedef struct PgStat_SnapshotEntry +{ + PgStat_HashKey key; + char status; /* for simplehash use */ + void *data; /* the stats data itself */ +} PgStat_SnapshotEntry; -/* Minimum receive buffer size for the collector's socket. */ -#define PGSTAT_MIN_RCVBUF (100 * 1024) + +/* ---------- + * Backend-local Hash Table Definitions + * ---------- + */ + +/* for stats snapshot entries */ +#define SH_PREFIX pgstat_snapshot +#define SH_ELEMENT_TYPE PgStat_SnapshotEntry +#define SH_KEY_TYPE PgStat_HashKey +#define SH_KEY key +#define SH_HASH_KEY(tb, key) \ + pgstat_hash_hash_key(&key, sizeof(PgStat_HashKey), NULL) +#define SH_EQUAL(tb, a, b) \ + pgstat_cmp_hash_key(&a, &b, sizeof(PgStat_HashKey), NULL) == 0 +#define SH_SCOPE static inline +#define SH_DEFINE +#define SH_DECLARE +#include "lib/simplehash.h" /* ---------- @@ -102,63 +163,18 @@ * ---------- */ -#ifdef EXEC_BACKEND -static pid_t pgstat_forkexec(void); -#endif +static void pgstat_write_statsfile(void); +static void pgstat_read_statsfile(void); -NON_EXEC_STATIC void PgstatCollectorMain(int argc, char *argv[]) pg_attribute_noreturn(); - -static PgStat_StatDBEntry *pgstat_get_db_entry(Oid databaseid, bool create); -static PgStat_StatTabEntry *pgstat_get_tab_entry(PgStat_StatDBEntry *dbentry, - Oid tableoid, bool create); -static PgStat_StatSubEntry *pgstat_get_subscription_entry(Oid subid, bool create); -static void pgstat_reset_subscription(PgStat_StatSubEntry *subentry, TimestampTz ts); -static void pgstat_write_statsfiles(bool permanent, bool allDbs); -static void pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent); -static HTAB *pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep); -static void pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash, - bool permanent); -static void backend_read_statsfile(void); - -static bool pgstat_write_statsfile_needed(void); -static bool pgstat_db_requested(Oid databaseid); - -static PgStat_StatReplSlotEntry *pgstat_get_replslot_entry(NameData name, bool create_it); -static void pgstat_reset_replslot_entry(PgStat_StatReplSlotEntry *slotstats, TimestampTz ts); - -static HTAB *pgstat_collect_oids(Oid catalogid, AttrNumber anum_oid); - -static void pgstat_setup_memcxt(void); - -static void pgstat_recv_inquiry(PgStat_MsgInquiry *msg, int len); -static void pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len); -static void pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len); -static void pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len); -static void pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len); -static void pgstat_recv_resetsharedcounter(PgStat_MsgResetsharedcounter *msg, int len); -static void pgstat_recv_resetsinglecounter(PgStat_MsgResetsinglecounter *msg, int len); -static void pgstat_recv_resetslrucounter(PgStat_MsgResetslrucounter *msg, int len); -static void pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg, int len); -static void pgstat_recv_resetsubcounter(PgStat_MsgResetsubcounter *msg, int len); -static void pgstat_recv_autovac(PgStat_MsgAutovacStart *msg, int len); -static void pgstat_recv_vacuum(PgStat_MsgVacuum *msg, int len); -static void pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len); -static void pgstat_recv_archiver(PgStat_MsgArchiver *msg, int len); -static void pgstat_recv_bgwriter(PgStat_MsgBgWriter *msg, int len); -static void pgstat_recv_checkpointer(PgStat_MsgCheckpointer *msg, int len); -static void pgstat_recv_wal(PgStat_MsgWal *msg, int len); -static void pgstat_recv_slru(PgStat_MsgSLRU *msg, int len); -static void pgstat_recv_funcstat(PgStat_MsgFuncstat *msg, int len); -static void pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len); -static void pgstat_recv_recoveryconflict(PgStat_MsgRecoveryConflict *msg, int len); -static void pgstat_recv_deadlock(PgStat_MsgDeadlock *msg, int len); -static void pgstat_recv_checksum_failure(PgStat_MsgChecksumFailure *msg, int len); -static void pgstat_recv_connect(PgStat_MsgConnect *msg, int len); -static void pgstat_recv_disconnect(PgStat_MsgDisconnect *msg, int len); -static void pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len); -static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len); -static void pgstat_recv_subscription_drop(PgStat_MsgSubscriptionDrop *msg, int len); -static void pgstat_recv_subscription_error(PgStat_MsgSubscriptionError *msg, int len); +static void pgstat_reset_after_failure(TimestampTz ts); + +static bool pgstat_flush_pending_entries(bool nowait); + +static void pgstat_prep_snapshot(void); +static void pgstat_build_snapshot(void); +static void pgstat_build_snapshot_fixed(PgStat_Kind kind); + +static inline bool pgstat_is_kind_valid(int ikind); /* ---------- @@ -167,6 +183,7 @@ static void pgstat_recv_subscription_error(PgStat_MsgSubscriptionError *msg, int */ bool pgstat_track_counts = false; +int pgstat_fetch_consistency = PGSTAT_FETCH_CONSISTENCY_NONE; /* ---------- @@ -184,44 +201,33 @@ char *pgstat_stat_tmpname = NULL; * ---------- */ -pgsocket pgStatSock = PGINVALID_SOCKET; +PgStat_LocalState pgStatLocal; /* ---------- * Local data + * + * NB: There should be only variables related to stats infrastructure here, + * not for specific kinds of stats. * ---------- */ -static struct sockaddr_storage pgStatAddr; - -static time_t last_pgstat_start_time; - -static bool pgStatRunningInCollector = false; - /* - * Info about current "snapshot" of stats file + * Memory contexts containing the pgStatEntryRefHash table, the + * pgStatSharedRef entries, and pending data respectively. Mostly to make it + * easier to track / attribute memory usage. */ -static MemoryContext pgStatLocalContext = NULL; -static HTAB *pgStatDBHash = NULL; -/* - * Cluster wide statistics, kept in the stats collector. - * Contains statistics that are not collected per database - * or per table. - */ -static PgStat_ArchiverStats archiverStats; -static PgStat_GlobalStats globalStats; -static PgStat_WalStats walStats; -static PgStat_SLRUStats slruStats[SLRU_NUM_ELEMENTS]; -static HTAB *replSlotStatHash = NULL; -static HTAB *subscriptionStatHash = NULL; +static MemoryContext pgStatPendingContext = NULL; /* - * List of OIDs of databases we need to write out. If an entry is InvalidOid, - * it means to write only the shared-catalog stats ("DB 0"); otherwise, we - * will write both that DB's data and the shared stats. + * Backend local list of PgStat_EntryRef with unflushed pending stats. + * + * Newly pending entries should only ever be added to the end of the list, + * otherwise pgstat_flush_pending_entries() might not see them immediately. */ -static List *pending_write_requests = NIL; +static dlist_head pgStatPending = DLIST_STATIC_INIT(pgStatPending); + /* * For assertions that check pgstat is not used before initialization / after @@ -233,455 +239,234 @@ static bool pgstat_is_shutdown = false; #endif -/* ------------------------------------------------------------ - * Public functions called from postmaster follow - * ------------------------------------------------------------ - */ - /* - * Called from postmaster at startup. Create the resources required - * by the statistics collector process. If unable to do so, do not - * fail --- better to let the postmaster start with stats collection - * disabled. + * The different kinds of statistics. + * + * If reasonably possible, handling specific to one kind of stats should go + * through this abstraction, rather than making more of pgstat.c aware. + * + * See comments for struct PgStat_KindInfo for details about the individual + * fields. + * + * XXX: It'd be nicer to define this outside of this file. But there doesn't + * seem to be a great way of doing that, given the split across multiple + * files. */ -void -pgstat_init(void) -{ - socklen_t alen; - struct addrinfo *addrs = NULL, - *addr, - hints; - int ret; - fd_set rset; - struct timeval tv; - char test_byte; - int sel_res; - int tries = 0; +static const PgStat_KindInfo pgstat_kind_infos[PGSTAT_NUM_KINDS] = { -#define TESTBYTEVAL ((char) 199) + /* stats kinds for variable-numbered objects */ - /* - * This static assertion verifies that we didn't mess up the calculations - * involved in selecting maximum payload sizes for our UDP messages. - * Because the only consequence of overrunning PGSTAT_MAX_MSG_SIZE would - * be silent performance loss from fragmentation, it seems worth having a - * compile-time cross-check that we didn't. - */ - StaticAssertStmt(sizeof(PgStat_Msg) <= PGSTAT_MAX_MSG_SIZE, - "maximum stats message size exceeds PGSTAT_MAX_MSG_SIZE"); + [PGSTAT_KIND_DATABASE] = { + .name = "database", - /* - * Create the UDP socket for sending and receiving statistic messages - */ - hints.ai_flags = AI_PASSIVE; - hints.ai_family = AF_UNSPEC; - hints.ai_socktype = SOCK_DGRAM; - hints.ai_protocol = 0; - hints.ai_addrlen = 0; - hints.ai_addr = NULL; - hints.ai_canonname = NULL; - hints.ai_next = NULL; - ret = pg_getaddrinfo_all("localhost", NULL, &hints, &addrs); - if (ret || !addrs) - { - ereport(LOG, - (errmsg("could not resolve \"localhost\": %s", - gai_strerror(ret)))); - goto startup_failed; - } + .fixed_amount = false, + /* so pg_stat_database entries can be seen in all databases */ + .accessed_across_databases = true, - /* - * On some platforms, pg_getaddrinfo_all() may return multiple addresses - * only one of which will actually work (eg, both IPv6 and IPv4 addresses - * when kernel will reject IPv6). Worse, the failure may occur at the - * bind() or perhaps even connect() stage. So we must loop through the - * results till we find a working combination. We will generate LOG - * messages, but no error, for bogus combinations. - */ - for (addr = addrs; addr; addr = addr->ai_next) - { -#ifdef HAVE_UNIX_SOCKETS - /* Ignore AF_UNIX sockets, if any are returned. */ - if (addr->ai_family == AF_UNIX) - continue; -#endif + .shared_size = sizeof(PgStatShared_Database), + .shared_data_off = offsetof(PgStatShared_Database, stats), + .shared_data_len = sizeof(((PgStatShared_Database *) 0)->stats), + .pending_size = sizeof(PgStat_StatDBEntry), - if (++tries > 1) - ereport(LOG, - (errmsg("trying another address for the statistics collector"))); + .flush_pending_cb = pgstat_database_flush_cb, + .reset_timestamp_cb = pgstat_database_reset_timestamp_cb, + }, - /* - * Create the socket. - */ - if ((pgStatSock = socket(addr->ai_family, SOCK_DGRAM, 0)) == PGINVALID_SOCKET) - { - ereport(LOG, - (errcode_for_socket_access(), - errmsg("could not create socket for statistics collector: %m"))); - continue; - } + [PGSTAT_KIND_RELATION] = { + .name = "relation", - /* - * Bind it to a kernel assigned port on localhost and get the assigned - * port via getsockname(). - */ - if (bind(pgStatSock, addr->ai_addr, addr->ai_addrlen) < 0) - { - ereport(LOG, - (errcode_for_socket_access(), - errmsg("could not bind socket for statistics collector: %m"))); - closesocket(pgStatSock); - pgStatSock = PGINVALID_SOCKET; - continue; - } + .fixed_amount = false, - alen = sizeof(pgStatAddr); - if (getsockname(pgStatSock, (struct sockaddr *) &pgStatAddr, &alen) < 0) - { - ereport(LOG, - (errcode_for_socket_access(), - errmsg("could not get address of socket for statistics collector: %m"))); - closesocket(pgStatSock); - pgStatSock = PGINVALID_SOCKET; - continue; - } + .shared_size = sizeof(PgStatShared_Relation), + .shared_data_off = offsetof(PgStatShared_Relation, stats), + .shared_data_len = sizeof(((PgStatShared_Relation *) 0)->stats), + .pending_size = sizeof(PgStat_TableStatus), - /* - * Connect the socket to its own address. This saves a few cycles by - * not having to respecify the target address on every send. This also - * provides a kernel-level check that only packets from this same - * address will be received. - */ - if (connect(pgStatSock, (struct sockaddr *) &pgStatAddr, alen) < 0) - { - ereport(LOG, - (errcode_for_socket_access(), - errmsg("could not connect socket for statistics collector: %m"))); - closesocket(pgStatSock); - pgStatSock = PGINVALID_SOCKET; - continue; - } + .flush_pending_cb = pgstat_relation_flush_cb, + .delete_pending_cb = pgstat_relation_delete_pending_cb, + }, - /* - * Try to send and receive a one-byte test message on the socket. This - * is to catch situations where the socket can be created but will not - * actually pass data (for instance, because kernel packet filtering - * rules prevent it). - */ - test_byte = TESTBYTEVAL; + [PGSTAT_KIND_FUNCTION] = { + .name = "function", -retry1: - if (send(pgStatSock, &test_byte, 1, 0) != 1) - { - if (errno == EINTR) - goto retry1; /* if interrupted, just retry */ - ereport(LOG, - (errcode_for_socket_access(), - errmsg("could not send test message on socket for statistics collector: %m"))); - closesocket(pgStatSock); - pgStatSock = PGINVALID_SOCKET; - continue; - } + .fixed_amount = false, - /* - * There could possibly be a little delay before the message can be - * received. We arbitrarily allow up to half a second before deciding - * it's broken. - */ - for (;;) /* need a loop to handle EINTR */ - { - FD_ZERO(&rset); - FD_SET(pgStatSock, &rset); - - tv.tv_sec = 0; - tv.tv_usec = 500000; - sel_res = select(pgStatSock + 1, &rset, NULL, NULL, &tv); - if (sel_res >= 0 || errno != EINTR) - break; - } - if (sel_res < 0) - { - ereport(LOG, - (errcode_for_socket_access(), - errmsg("select() failed in statistics collector: %m"))); - closesocket(pgStatSock); - pgStatSock = PGINVALID_SOCKET; - continue; - } - if (sel_res == 0 || !FD_ISSET(pgStatSock, &rset)) - { - /* - * This is the case we actually think is likely, so take pains to - * give a specific message for it. - * - * errno will not be set meaningfully here, so don't use it. - */ - ereport(LOG, - (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("test message did not get through on socket for statistics collector"))); - closesocket(pgStatSock); - pgStatSock = PGINVALID_SOCKET; - continue; - } + .shared_size = sizeof(PgStatShared_Function), + .shared_data_off = offsetof(PgStatShared_Function, stats), + .shared_data_len = sizeof(((PgStatShared_Function *) 0)->stats), + .pending_size = sizeof(PgStat_BackendFunctionEntry), - test_byte++; /* just make sure variable is changed */ + .flush_pending_cb = pgstat_function_flush_cb, + }, -retry2: - if (recv(pgStatSock, &test_byte, 1, 0) != 1) - { - if (errno == EINTR) - goto retry2; /* if interrupted, just retry */ - ereport(LOG, - (errcode_for_socket_access(), - errmsg("could not receive test message on socket for statistics collector: %m"))); - closesocket(pgStatSock); - pgStatSock = PGINVALID_SOCKET; - continue; - } + [PGSTAT_KIND_REPLSLOT] = { + .name = "replslot", - if (test_byte != TESTBYTEVAL) /* strictly paranoia ... */ - { - ereport(LOG, - (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("incorrect test message transmission on socket for statistics collector"))); - closesocket(pgStatSock); - pgStatSock = PGINVALID_SOCKET; - continue; - } + .fixed_amount = false, - /* If we get here, we have a working socket */ - break; - } + .accessed_across_databases = true, + .named_on_disk = true, - /* Did we find a working address? */ - if (!addr || pgStatSock == PGINVALID_SOCKET) - goto startup_failed; + .shared_size = sizeof(PgStatShared_ReplSlot), + .shared_data_off = offsetof(PgStatShared_ReplSlot, stats), + .shared_data_len = sizeof(((PgStatShared_ReplSlot *) 0)->stats), - /* - * Set the socket to non-blocking IO. This ensures that if the collector - * falls behind, statistics messages will be discarded; backends won't - * block waiting to send messages to the collector. - */ - if (!pg_set_noblock(pgStatSock)) - { - ereport(LOG, - (errcode_for_socket_access(), - errmsg("could not set statistics collector socket to nonblocking mode: %m"))); - goto startup_failed; - } + .reset_timestamp_cb = pgstat_replslot_reset_timestamp_cb, + .to_serialized_name = pgstat_replslot_to_serialized_name_cb, + .from_serialized_name = pgstat_replslot_from_serialized_name_cb, + }, - /* - * Try to ensure that the socket's receive buffer is at least - * PGSTAT_MIN_RCVBUF bytes, so that it won't easily overflow and lose - * data. Use of UDP protocol means that we are willing to lose data under - * heavy load, but we don't want it to happen just because of ridiculously - * small default buffer sizes (such as 8KB on older Windows versions). - */ - { - int old_rcvbuf; - int new_rcvbuf; - socklen_t rcvbufsize = sizeof(old_rcvbuf); + [PGSTAT_KIND_SUBSCRIPTION] = { + .name = "subscription", - if (getsockopt(pgStatSock, SOL_SOCKET, SO_RCVBUF, - (char *) &old_rcvbuf, &rcvbufsize) < 0) - { - ereport(LOG, - (errmsg("%s(%s) failed: %m", "getsockopt", "SO_RCVBUF"))); - /* if we can't get existing size, always try to set it */ - old_rcvbuf = 0; - } + .fixed_amount = false, + /* so pg_stat_subscription_stats entries can be seen in all databases */ + .accessed_across_databases = true, - new_rcvbuf = PGSTAT_MIN_RCVBUF; - if (old_rcvbuf < new_rcvbuf) - { - if (setsockopt(pgStatSock, SOL_SOCKET, SO_RCVBUF, - (char *) &new_rcvbuf, sizeof(new_rcvbuf)) < 0) - ereport(LOG, - (errmsg("%s(%s) failed: %m", "setsockopt", "SO_RCVBUF"))); - } - } + .shared_size = sizeof(PgStatShared_Subscription), + .shared_data_off = offsetof(PgStatShared_Subscription, stats), + .shared_data_len = sizeof(((PgStatShared_Subscription *) 0)->stats), + .pending_size = sizeof(PgStat_BackendSubEntry), - pg_freeaddrinfo_all(hints.ai_family, addrs); + .flush_pending_cb = pgstat_subscription_flush_cb, + .reset_timestamp_cb = pgstat_subscription_reset_timestamp_cb, + }, - /* Now that we have a long-lived socket, tell fd.c about it. */ - ReserveExternalFD(); - return; + /* stats for fixed-numbered (mostly 1) objects */ -startup_failed: - ereport(LOG, - (errmsg("disabling statistics collector for lack of working socket"))); + [PGSTAT_KIND_ARCHIVER] = { + .name = "archiver", - if (addrs) - pg_freeaddrinfo_all(hints.ai_family, addrs); + .fixed_amount = true, - if (pgStatSock != PGINVALID_SOCKET) - closesocket(pgStatSock); - pgStatSock = PGINVALID_SOCKET; + .reset_all_cb = pgstat_archiver_reset_all_cb, + .snapshot_cb = pgstat_archiver_snapshot_cb, + }, - /* - * Adjust GUC variables to suppress useless activity, and for debugging - * purposes (seeing track_counts off is a clue that we failed here). We - * use PGC_S_OVERRIDE because there is no point in trying to turn it back - * on from postgresql.conf without a restart. - */ - SetConfigOption("track_counts", "off", PGC_INTERNAL, PGC_S_OVERRIDE); -} + [PGSTAT_KIND_BGWRITER] = { + .name = "bgwriter", -/* - * subroutine for pgstat_reset_all - */ -static void -pgstat_reset_remove_files(const char *directory) -{ - DIR *dir; - struct dirent *entry; - char fname[MAXPGPATH * 2]; + .fixed_amount = true, - dir = AllocateDir(directory); - while ((entry = ReadDir(dir, directory)) != NULL) - { - int nchars; - Oid tmp_oid; + .reset_all_cb = pgstat_bgwriter_reset_all_cb, + .snapshot_cb = pgstat_bgwriter_snapshot_cb, + }, - /* - * Skip directory entries that don't match the file names we write. - * See get_dbstat_filename for the database-specific pattern. - */ - if (strncmp(entry->d_name, "global.", 7) == 0) - nchars = 7; - else - { - nchars = 0; - (void) sscanf(entry->d_name, "db_%u.%n", - &tmp_oid, &nchars); - if (nchars <= 0) - continue; - /* %u allows leading whitespace, so reject that */ - if (strchr("0123456789", entry->d_name[3]) == NULL) - continue; - } + [PGSTAT_KIND_CHECKPOINTER] = { + .name = "checkpointer", - if (strcmp(entry->d_name + nchars, "tmp") != 0 && - strcmp(entry->d_name + nchars, "stat") != 0) - continue; + .fixed_amount = true, - snprintf(fname, sizeof(fname), "%s/%s", directory, - entry->d_name); - unlink(fname); - } - FreeDir(dir); -} + .reset_all_cb = pgstat_checkpointer_reset_all_cb, + .snapshot_cb = pgstat_checkpointer_snapshot_cb, + }, + + [PGSTAT_KIND_SLRU] = { + .name = "slru", + + .fixed_amount = true, + + .reset_all_cb = pgstat_slru_reset_all_cb, + .snapshot_cb = pgstat_slru_snapshot_cb, + }, + + [PGSTAT_KIND_WAL] = { + .name = "wal", + + .fixed_amount = true, + + .reset_all_cb = pgstat_wal_reset_all_cb, + .snapshot_cb = pgstat_wal_snapshot_cb, + }, +}; + + +/* ------------------------------------------------------------ + * Functions managing the state of the stats system for all backends. + * ------------------------------------------------------------ + */ /* - * Remove the stats files. This is currently used only if WAL - * recovery is needed after a crash. + * Read on-disk stats into memory at server start. + * + * Should only be called by the startup process or in single user mode. */ void -pgstat_reset_all(void) +pgstat_restore_stats(void) { - pgstat_reset_remove_files(pgstat_stat_directory); - pgstat_reset_remove_files(PGSTAT_STAT_PERMANENT_DIRECTORY); + pgstat_read_statsfile(); } -#ifdef EXEC_BACKEND - /* - * Format up the arglist for, then fork and exec, statistics collector process + * Remove the stats file. This is currently used only if WAL recovery is + * needed after a crash. + * + * Should only be called by the startup process or in single user mode. */ -static pid_t -pgstat_forkexec(void) +void +pgstat_discard_stats(void) { - char *av[10]; - int ac = 0; - - av[ac++] = "postgres"; - av[ac++] = "--forkcol"; - av[ac++] = NULL; /* filled in by postmaster_forkexec */ + int ret; - av[ac] = NULL; - Assert(ac < lengthof(av)); + /* NB: this needs to be done even in single user mode */ - return postmaster_forkexec(ac, av); + ret = unlink(PGSTAT_STAT_PERMANENT_FILENAME); + if (ret != 0) + { + if (errno == ENOENT) + elog(DEBUG2, + "didn't need to unlink permanent stats file \"%s\" - didn't exist", + PGSTAT_STAT_PERMANENT_FILENAME); + else + ereport(LOG, + (errcode_for_file_access(), + errmsg("could not unlink permanent statistics file \"%s\": %m", + PGSTAT_STAT_PERMANENT_FILENAME))); + } + else + { + ereport(DEBUG2, + (errcode_for_file_access(), + errmsg("unlinked permanent statistics file \"%s\"", + PGSTAT_STAT_PERMANENT_FILENAME))); + } } -#endif /* EXEC_BACKEND */ - /* - * Called from postmaster at startup or after an existing collector - * died. Attempt to fire up a fresh statistics collector. - * - * Returns PID of child process, or 0 if fail. + * pgstat_before_server_shutdown() needs to be called by exactly one process + * during regular server shutdowns. Otherwise all stats will be lost. * - * Note: if fail, we will be called again from the postmaster main loop. + * We currently only write out stats for proc_exit(0). We might want to change + * that at some point... But right now pgstat_discard_stats() would be called + * during the start after a disorderly shutdown, anyway. */ -int -pgstat_start(void) +void +pgstat_before_server_shutdown(int code, Datum arg) { - time_t curtime; - pid_t pgStatPid; + Assert(pgStatLocal.shmem != NULL); + Assert(!pgStatLocal.shmem->is_shutdown); /* - * Check that the socket is there, else pgstat_init failed and we can do - * nothing useful. + * Stats should only be reported after pgstat_initialize() and before + * pgstat_shutdown(). This is a convenient point to catch most violations + * of this rule. */ - if (pgStatSock == PGINVALID_SOCKET) - return 0; + Assert(pgstat_is_initialized && !pgstat_is_shutdown); - /* - * Do nothing if too soon since last collector start. This is a safety - * valve to protect against continuous respawn attempts if the collector - * is dying immediately at launch. Note that since we will be re-called - * from the postmaster main loop, we will get another chance later. - */ - curtime = time(NULL); - if ((unsigned int) (curtime - last_pgstat_start_time) < - (unsigned int) PGSTAT_RESTART_INTERVAL) - return 0; - last_pgstat_start_time = curtime; + /* flush out our own pending changes before writing out */ + pgstat_report_stat(true); /* - * Okay, fork off the collector. + * Only write out file during normal shutdown. Don't even signal that + * we've shutdown during irregular shutdowns, because the shutdown + * sequence isn't coordinated to ensure this backend shuts down last. */ -#ifdef EXEC_BACKEND - switch ((pgStatPid = pgstat_forkexec())) -#else - switch ((pgStatPid = fork_process())) -#endif + if (code == 0) { - case -1: - ereport(LOG, - (errmsg("could not fork statistics collector: %m"))); - return 0; - -#ifndef EXEC_BACKEND - case 0: - /* in postmaster child ... */ - InitPostmasterChild(); - - /* Close the postmaster's sockets */ - ClosePostmasterPorts(false); - - /* Drop our connection to postmaster's shared memory, as well */ - dsm_detach_all(); - PGSharedMemoryDetach(); - - PgstatCollectorMain(0, NULL); - break; -#endif - - default: - return (int) pgStatPid; + pgStatLocal.shmem->is_shutdown = true; + pgstat_write_statsfile(); } - - /* shouldn't get here */ - return 0; -} - -void -allow_immediate_pgstat_restart(void) -{ - last_pgstat_start_time = 0; } @@ -701,6 +486,7 @@ static void pgstat_shutdown_hook(int code, Datum arg) { Assert(!pgstat_is_shutdown); + Assert(IsUnderPostmaster || !IsPostmasterEnvironment); /* * If we got as far as discovering our own database ID, we can flush out @@ -709,7 +495,15 @@ pgstat_shutdown_hook(int code, Datum arg) * failed backend starts might never get counted.) */ if (OidIsValid(MyDatabaseId)) - pgstat_report_stat(true); + pgstat_report_disconnect(MyDatabaseId); + + pgstat_report_stat(true); + + /* there shouldn't be any pending changes left */ + Assert(dlist_is_empty(&pgStatPending)); + dlist_init(&pgStatPending); + + pgstat_detach_shmem(); #ifdef USE_ASSERT_CHECKING pgstat_is_shutdown = true; @@ -727,6 +521,8 @@ pgstat_initialize(void) { Assert(!pgstat_is_initialized); + pgstat_attach_shmem(); + pgstat_init_wal(); /* Set up a process-exit hook to clean up */ @@ -745,331 +541,119 @@ pgstat_initialize(void) /* * Must be called by processes that performs DML: tcop/postgres.c, logical - * receiver processes, SPI worker, etc. to send the so far collected - * per-table and function usage statistics to the collector. Note that this - * is called only when not within a transaction, so it is fair to use - * transaction stop time as an approximation of current time. + * receiver processes, SPI worker, etc. to flush pending statistics updates to + * shared memory. * - * "disconnect" is "true" only for the last call before the backend - * exits. This makes sure that no data is lost and that interrupted - * sessions are reported correctly. + * Unless called with 'force', pending stats updates are flushed happen once + * per PGSTAT_MIN_INTERVAL (1000ms). When not forced, stats flushes do not + * block on lock acquisition, except if stats updates have been pending for + * longer than PGSTAT_MAX_INTERVAL (60000ms). + * + * Whenever pending stats updates remain at the end of pgstat_report_stat() a + * suggested idle timeout is returned. Currently this is always + * PGSTAT_IDLE_INTERVAL (10000ms). Callers can use the returned time to set up + * a timeout after which to call pgstat_report_stat(true), but are not + * required to to do so. + * + * Note that this is called only when not within a transaction, so it is fair + * to use transaction stop time as an approximation of current time. */ -void -pgstat_report_stat(bool disconnect) +long +pgstat_report_stat(bool force) { - static TimestampTz last_report = 0; - + static TimestampTz pending_since = 0; + static TimestampTz last_flush = 0; + bool partial_flush; TimestampTz now; + bool nowait; pgstat_assert_is_up(); + Assert(!IsTransactionBlock()); - /* - * Don't expend a clock check if nothing to do. - */ - if (!have_relation_stats && - pgStatXactCommit == 0 && pgStatXactRollback == 0 && - !pgstat_have_pending_wal() && - !have_function_stats && !disconnect) - return; - - /* - * Don't send a message unless it's been at least PGSTAT_STAT_INTERVAL - * msec since we last sent one, or the backend is about to exit. - */ - now = GetCurrentTransactionStopTimestamp(); - if (!disconnect && - !TimestampDifferenceExceeds(last_report, now, PGSTAT_STAT_INTERVAL)) - return; - - last_report = now; - - if (disconnect) - pgstat_report_disconnect(MyDatabaseId); - - /* First, send relation statistics */ - pgstat_send_tabstats(now, disconnect); - - /* Now, send function statistics */ - pgstat_send_funcstats(); - - /* Send WAL statistics */ - pgstat_report_wal(true); - - /* Finally send SLRU statistics */ - pgstat_send_slru(); -} - -/* - * Will tell the collector about objects he can get rid of. - */ -void -pgstat_vacuum_stat(void) -{ - HTAB *htab; - PgStat_MsgTabpurge msg; - PgStat_MsgFuncpurge f_msg; - HASH_SEQ_STATUS hstat; - PgStat_StatDBEntry *dbentry; - PgStat_StatTabEntry *tabentry; - PgStat_StatFuncEntry *funcentry; - int len; - - if (pgStatSock == PGINVALID_SOCKET) - return; - - /* - * If not done for this transaction, read the statistics collector stats - * file into some hash tables. - */ - backend_read_statsfile(); - - /* - * Read pg_database and make a list of OIDs of all existing databases - */ - htab = pgstat_collect_oids(DatabaseRelationId, Anum_pg_database_oid); - - /* - * Search the database hash table for dead databases and tell the - * collector to drop them. - */ - hash_seq_init(&hstat, pgStatDBHash); - while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL) + /* Don't expend a clock check if nothing to do */ + if (dlist_is_empty(&pgStatPending) && + !have_slrustats && + !pgstat_have_pending_wal()) { - Oid dbid = dbentry->databaseid; - - CHECK_FOR_INTERRUPTS(); - - /* the DB entry for shared tables (with InvalidOid) is never dropped */ - if (OidIsValid(dbid) && - hash_search(htab, (void *) &dbid, HASH_FIND, NULL) == NULL) - pgstat_drop_database(dbid); + Assert(pending_since == 0); + return 0; } - /* Clean up */ - hash_destroy(htab); - /* - * Search for all the dead replication slots in stats hashtable and tell - * the stats collector to drop them. + * There should never be stats to report once stats are shut down. Can't + * assert that before the checks above, as there is an unconditional + * pgstat_report_stat() call in pgstat_shutdown_hook() - which at least + * the process that ran pgstat_before_server_shutdown() will still call. */ - if (replSlotStatHash) - { - PgStat_StatReplSlotEntry *slotentry; + Assert(!pgStatLocal.shmem->is_shutdown); - hash_seq_init(&hstat, replSlotStatHash); - while ((slotentry = (PgStat_StatReplSlotEntry *) hash_seq_search(&hstat)) != NULL) - { - CHECK_FOR_INTERRUPTS(); - - if (SearchNamedReplicationSlot(NameStr(slotentry->slotname), true) == NULL) - { - PgStat_MsgReplSlot msg; - - pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT); - namestrcpy(&msg.m_slotname, NameStr(slotentry->slotname)); - msg.m_create = false; - msg.m_drop = true; - pgstat_send(&msg, sizeof(PgStat_MsgReplSlot)); - } - } - } + now = GetCurrentTransactionStopTimestamp(); - /* - * Repeat the above steps for subscriptions, if subscription stats are - * being collected. - */ - if (subscriptionStatHash) + if (!force) { - PgStat_StatSubEntry *subentry; - - /* - * Read pg_subscription and make a list of OIDs of all existing - * subscriptions. - */ - htab = pgstat_collect_oids(SubscriptionRelationId, Anum_pg_subscription_oid); - - hash_seq_init(&hstat, subscriptionStatHash); - while ((subentry = (PgStat_StatSubEntry *) hash_seq_search(&hstat)) != NULL) + if (pending_since > 0 && + TimestampDifferenceExceeds(pending_since, now, PGSTAT_MAX_INTERVAL)) { - CHECK_FOR_INTERRUPTS(); - - if (hash_search(htab, (void *) &(subentry->subid), HASH_FIND, NULL) == NULL) - pgstat_drop_subscription(subentry->subid); + /* don't keep pending updates longer than PGSTAT_MAX_INTERVAL */ + force = true; } + else if (last_flush > 0 && + !TimestampDifferenceExceeds(last_flush, now, PGSTAT_MIN_INTERVAL)) + { + /* don't flush too frequently */ + if (pending_since == 0) + pending_since = now; - hash_destroy(htab); + return PGSTAT_IDLE_INTERVAL; + } } - /* - * Lookup our own database entry; if not found, nothing more to do. - */ - dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash, - (void *) &MyDatabaseId, - HASH_FIND, NULL); - if (dbentry == NULL || dbentry->tables == NULL) - return; - - /* - * Similarly to above, make a list of all known relations in this DB. - */ - htab = pgstat_collect_oids(RelationRelationId, Anum_pg_class_oid); - - /* - * Initialize our messages table counter to zero - */ - msg.m_nentries = 0; + pgstat_update_dbstats(now); - /* - * Check for all tables listed in stats hashtable if they still exist. - */ - hash_seq_init(&hstat, dbentry->tables); - while ((tabentry = (PgStat_StatTabEntry *) hash_seq_search(&hstat)) != NULL) - { - Oid tabid = tabentry->tableid; + /* don't wait for lock acquisition when !force */ + nowait = !force; - CHECK_FOR_INTERRUPTS(); + partial_flush = false; - if (hash_search(htab, (void *) &tabid, HASH_FIND, NULL) != NULL) - continue; + /* flush database / relation / function / ... stats */ + partial_flush |= pgstat_flush_pending_entries(nowait); - /* - * Not there, so add this table's Oid to the message - */ - msg.m_tableid[msg.m_nentries++] = tabid; + /* flush wal stats */ + partial_flush |= pgstat_flush_wal(nowait); - /* - * If the message is full, send it out and reinitialize to empty - */ - if (msg.m_nentries >= PGSTAT_NUM_TABPURGE) - { - len = offsetof(PgStat_MsgTabpurge, m_tableid[0]) - + msg.m_nentries * sizeof(Oid); + /* flush SLRU stats */ + partial_flush |= pgstat_slru_flush(nowait); - pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE); - msg.m_databaseid = MyDatabaseId; - pgstat_send(&msg, len); - - msg.m_nentries = 0; - } - } + last_flush = now; /* - * Send the rest + * If some of the pending stats could not be flushed due to lock + * contention, let the caller know when to retry. */ - if (msg.m_nentries > 0) + if (partial_flush) { - len = offsetof(PgStat_MsgTabpurge, m_tableid[0]) - + msg.m_nentries * sizeof(Oid); - - pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE); - msg.m_databaseid = MyDatabaseId; - pgstat_send(&msg, len); - } - - /* Clean up */ - hash_destroy(htab); - - /* - * Now repeat the above steps for functions. However, we needn't bother - * in the common case where no function stats are being collected. - */ - if (dbentry->functions != NULL && - hash_get_num_entries(dbentry->functions) > 0) - { - htab = pgstat_collect_oids(ProcedureRelationId, Anum_pg_proc_oid); - - pgstat_setheader(&f_msg.m_hdr, PGSTAT_MTYPE_FUNCPURGE); - f_msg.m_databaseid = MyDatabaseId; - f_msg.m_nentries = 0; - - hash_seq_init(&hstat, dbentry->functions); - while ((funcentry = (PgStat_StatFuncEntry *) hash_seq_search(&hstat)) != NULL) - { - Oid funcid = funcentry->functionid; - - CHECK_FOR_INTERRUPTS(); - - if (hash_search(htab, (void *) &funcid, HASH_FIND, NULL) != NULL) - continue; + /* force should have prevented us from getting here */ + Assert(!force); - /* - * Not there, so add this function's Oid to the message - */ - f_msg.m_functionid[f_msg.m_nentries++] = funcid; + /* remember since when stats have been pending */ + if (pending_since == 0) + pending_since = now; - /* - * If the message is full, send it out and reinitialize to empty - */ - if (f_msg.m_nentries >= PGSTAT_NUM_FUNCPURGE) - { - len = offsetof(PgStat_MsgFuncpurge, m_functionid[0]) - + f_msg.m_nentries * sizeof(Oid); - - pgstat_send(&f_msg, len); - - f_msg.m_nentries = 0; - } - } - - /* - * Send the rest - */ - if (f_msg.m_nentries > 0) - { - len = offsetof(PgStat_MsgFuncpurge, m_functionid[0]) - + f_msg.m_nentries * sizeof(Oid); + return PGSTAT_IDLE_INTERVAL; + } - pgstat_send(&f_msg, len); - } + pending_since = 0; - hash_destroy(htab); - } + return 0; } /* - * Collect the OIDs of all objects listed in the specified system catalog - * into a temporary hash table. Caller should hash_destroy the result - * when done with it. (However, we make the table in CurrentMemoryContext - * so that it will be freed properly in event of an error.) + * Only for use by pgstat_reset_counters() */ -static HTAB * -pgstat_collect_oids(Oid catalogid, AttrNumber anum_oid) +static bool +match_db_entries(PgStatShared_HashEntry *entry, Datum match_data) { - HTAB *htab; - HASHCTL hash_ctl; - Relation rel; - TableScanDesc scan; - HeapTuple tup; - Snapshot snapshot; - - hash_ctl.keysize = sizeof(Oid); - hash_ctl.entrysize = sizeof(Oid); - hash_ctl.hcxt = CurrentMemoryContext; - htab = hash_create("Temporary table of OIDs", - PGSTAT_TAB_HASH_SIZE, - &hash_ctl, - HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); - - rel = table_open(catalogid, AccessShareLock); - snapshot = RegisterSnapshot(GetLatestSnapshot()); - scan = table_beginscan(rel, snapshot, 0, NULL); - while ((tup = heap_getnext(scan, ForwardScanDirection)) != NULL) - { - Oid thisoid; - bool isnull; - - thisoid = heap_getattr(tup, anum_oid, RelationGetDescr(rel), &isnull); - Assert(!isnull); - - CHECK_FOR_INTERRUPTS(); - - (void) hash_search(htab, (void *) &thisoid, HASH_ENTER, NULL); - } - table_endscan(scan); - UnregisterSnapshot(snapshot); - table_close(rel, AccessShareLock); - - return htab; + return entry->key.dboid == DatumGetObjectId(MyDatabaseId); } /* @@ -1081,14 +665,11 @@ pgstat_collect_oids(Oid catalogid, AttrNumber anum_oid) void pgstat_reset_counters(void) { - PgStat_MsgResetcounter msg; - - if (pgStatSock == PGINVALID_SOCKET) - return; + TimestampTz ts = GetCurrentTimestamp(); - pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETCOUNTER); - msg.m_databaseid = MyDatabaseId; - pgstat_send(&msg, sizeof(msg)); + pgstat_reset_matching_entries(match_db_entries, + ObjectIdGetDatum(MyDatabaseId), + ts); } /* @@ -1103,38 +684,17 @@ pgstat_reset_counters(void) void pgstat_reset(PgStat_Kind kind, Oid dboid, Oid objoid) { + const PgStat_KindInfo *kind_info = pgstat_get_kind_info(kind); + TimestampTz ts = GetCurrentTimestamp(); - if (pgStatSock == PGINVALID_SOCKET) - return; + /* not needed atm, and doesn't make sense with the current signature */ + Assert(!pgstat_get_kind_info(kind)->fixed_amount); - switch (kind) - { - case PGSTAT_KIND_FUNCTION: - case PGSTAT_KIND_RELATION: - { - PgStat_MsgResetsinglecounter msg; - - pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETSINGLECOUNTER); - msg.m_databaseid = dboid; - msg.m_resettype = kind; - msg.m_objectid = objoid; - pgstat_send(&msg, sizeof(msg)); - } - break; - - case PGSTAT_KIND_SUBSCRIPTION: - { - PgStat_MsgResetsubcounter msg; - - Assert(dboid == InvalidOid); - msg.m_subid = objoid; - pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETSUBCOUNTER); - } - break; - - default: - elog(ERROR, "unexpected"); - } + /* reset the "single counter" */ + pgstat_reset_entry(kind, dboid, objoid, ts); + + if (!kind_info->accessed_across_databases) + pgstat_reset_database_timestamp(dboid, ts); } /* @@ -1146,87 +706,20 @@ pgstat_reset(PgStat_Kind kind, Oid dboid, Oid objoid) void pgstat_reset_of_kind(PgStat_Kind kind) { - if (pgStatSock == PGINVALID_SOCKET) - return; + const PgStat_KindInfo *kind_info = pgstat_get_kind_info(kind); + TimestampTz ts = GetCurrentTimestamp(); - switch (kind) - { - case PGSTAT_KIND_ARCHIVER: - case PGSTAT_KIND_BGWRITER: - case PGSTAT_KIND_CHECKPOINTER: - case PGSTAT_KIND_WAL: - { - PgStat_MsgResetsharedcounter msg; - - pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETSHAREDCOUNTER); - msg.m_resettarget = kind; - pgstat_send(&msg, sizeof(msg)); - } - break; - case PGSTAT_KIND_SLRU: - { - PgStat_MsgResetslrucounter msg; - - pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETSLRUCOUNTER); - msg.m_index = -1; - pgstat_send(&msg, sizeof(msg)); - } - break; - case PGSTAT_KIND_REPLSLOT: - { - PgStat_MsgResetreplslotcounter msg; - - pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETREPLSLOTCOUNTER); - msg.clearall = true; - pgstat_send(&msg, sizeof(msg)); - } - break; - - case PGSTAT_KIND_SUBSCRIPTION: - { - PgStat_MsgResetsubcounter msg; - - msg.m_subid = InvalidOid; - pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETSUBCOUNTER); - - pgstat_send(&msg, sizeof(msg)); - } - break; - - default: - elog(ERROR, "unexpected"); - } + if (kind_info->fixed_amount) + kind_info->reset_all_cb(ts); + else + pgstat_reset_entries_of_kind(kind, ts); } -/* - * Send some junk data to the collector to increase traffic. - */ -void -pgstat_ping(void) -{ - PgStat_MsgDummy msg; - if (pgStatSock == PGINVALID_SOCKET) - return; - - pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_DUMMY); - pgstat_send(&msg, sizeof(msg)); -} - -/* - * Notify collector that we need fresh data. +/* ------------------------------------------------------------ + * Fetching of stats + * ------------------------------------------------------------ */ -static void -pgstat_send_inquiry(TimestampTz clock_time, TimestampTz cutoff_time, Oid databaseid) -{ - PgStat_MsgInquiry msg; - - pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_INQUIRY); - msg.clock_time = clock_time; - msg.cutoff_time = cutoff_time; - msg.databaseid = databaseid; - pgstat_send(&msg, sizeof(msg)); -} /* * Discard any data collected in the current transaction. Any subsequent @@ -1240,15 +733,19 @@ pgstat_clear_snapshot(void) { pgstat_assert_is_up(); + memset(&pgStatLocal.snapshot.fixed_valid, 0, + sizeof(pgStatLocal.snapshot.fixed_valid)); + pgStatLocal.snapshot.stats = NULL; + pgStatLocal.snapshot.mode = PGSTAT_FETCH_CONSISTENCY_NONE; + /* Release memory, if any was allocated */ - if (pgStatLocalContext) - MemoryContextDelete(pgStatLocalContext); + if (pgStatLocal.snapshot.context) + { + MemoryContextDelete(pgStatLocal.snapshot.context); - /* Reset variables */ - pgStatLocalContext = NULL; - pgStatDBHash = NULL; - replSlotStatHash = NULL; - subscriptionStatHash = NULL; + /* Reset variables */ + pgStatLocal.snapshot.context = NULL; + } /* * Historically the backend_status.c facilities lived in this file, and @@ -1258,844 +755,448 @@ pgstat_clear_snapshot(void) pgstat_clear_backend_activity_snapshot(); } -/* - * Support function for the SQL-callable pgstat* functions. Returns - * the collected statistics for one database or NULL. NULL doesn't mean - * that the database doesn't exist, just that there are no statistics, so the - * caller is better off to report ZERO instead. - */ -PgStat_StatDBEntry * -pgstat_fetch_stat_dbentry(Oid dbid) +void * +pgstat_fetch_entry(PgStat_Kind kind, Oid dboid, Oid objoid) { - /* - * If not done for this transaction, read the statistics collector stats - * file into some hash tables. - */ - backend_read_statsfile(); + PgStat_HashKey key; + PgStat_EntryRef *entry_ref; + void *stats_data; + const PgStat_KindInfo *kind_info = pgstat_get_kind_info(kind); - /* - * Lookup the requested database; return NULL if not found - */ - return (PgStat_StatDBEntry *) hash_search(pgStatDBHash, - (void *) &dbid, - HASH_FIND, NULL); -} + /* should be called from backends */ + Assert(IsUnderPostmaster || !IsPostmasterEnvironment); + AssertArg(!kind_info->fixed_amount); -/* - * Support function for the SQL-callable pgstat* functions. Returns - * a pointer to the global statistics struct. - */ -PgStat_GlobalStats * -pgstat_fetch_global(void) -{ - backend_read_statsfile(); + pgstat_prep_snapshot(); - return &globalStats; -} + key.kind = kind; + key.dboid = dboid; + key.objoid = objoid; -/* - * Support function for the SQL-callable pgstat* functions. Returns - * the collected statistics for one table or NULL. NULL doesn't mean - * that the table doesn't exist, just that there are no statistics, so the - * caller is better off to report ZERO instead. - */ -PgStat_StatTabEntry * -pgstat_fetch_stat_tabentry(Oid relid) -{ - Oid dbid; - PgStat_StatDBEntry *dbentry; - PgStat_StatTabEntry *tabentry; + /* if we need to build a full snapshot, do so */ + if (pgstat_fetch_consistency == PGSTAT_FETCH_CONSISTENCY_SNAPSHOT) + pgstat_build_snapshot(); - /* - * If not done for this transaction, read the statistics collector stats - * file into some hash tables. - */ - backend_read_statsfile(); - - /* - * Lookup our database, then look in its table hash table. - */ - dbid = MyDatabaseId; - dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash, - (void *) &dbid, - HASH_FIND, NULL); - if (dbentry != NULL && dbentry->tables != NULL) + /* if caching is desired, look up in cache */ + if (pgstat_fetch_consistency > PGSTAT_FETCH_CONSISTENCY_NONE) { - tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables, - (void *) &relid, - HASH_FIND, NULL); - if (tabentry) - return tabentry; - } + PgStat_SnapshotEntry *entry = NULL; - /* - * If we didn't find it, maybe it's a shared table. - */ - dbid = InvalidOid; - dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash, - (void *) &dbid, - HASH_FIND, NULL); - if (dbentry != NULL && dbentry->tables != NULL) - { - tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables, - (void *) &relid, - HASH_FIND, NULL); - if (tabentry) - return tabentry; - } + entry = pgstat_snapshot_lookup(pgStatLocal.snapshot.stats, key); - return NULL; -} + if (entry) + return entry->data; + /* + * If we built a full snapshot and the key is not in + * pgStatLocal.snapshot.stats, there are no matching stats. + */ + if (pgstat_fetch_consistency == PGSTAT_FETCH_CONSISTENCY_SNAPSHOT) + return NULL; + } -/* - * Support function for the SQL-callable pgstat* functions. Returns - * the collected statistics for one function or NULL. - */ -PgStat_StatFuncEntry * -pgstat_fetch_stat_funcentry(Oid func_id) -{ - PgStat_StatDBEntry *dbentry; - PgStat_StatFuncEntry *funcentry = NULL; + pgStatLocal.snapshot.mode = pgstat_fetch_consistency; - /* load the stats file if needed */ - backend_read_statsfile(); + entry_ref = pgstat_get_entry_ref(kind, dboid, objoid, false, NULL); - /* Lookup our database, then find the requested function. */ - dbentry = pgstat_fetch_stat_dbentry(MyDatabaseId); - if (dbentry != NULL && dbentry->functions != NULL) + if (entry_ref == NULL || entry_ref->shared_entry->dropped) { - funcentry = (PgStat_StatFuncEntry *) hash_search(dbentry->functions, - (void *) &func_id, - HASH_FIND, NULL); - } - - return funcentry; -} - -/* - * Support function for the SQL-callable pgstat* functions. Returns - * a pointer to the archiver statistics struct. - */ -PgStat_ArchiverStats * -pgstat_fetch_stat_archiver(void) -{ - backend_read_statsfile(); + /* create empty entry when using PGSTAT_FETCH_CONSISTENCY_CACHE */ + if (pgstat_fetch_consistency == PGSTAT_FETCH_CONSISTENCY_CACHE) + { + PgStat_SnapshotEntry *entry = NULL; + bool found; - return &archiverStats; -} + entry = pgstat_snapshot_insert(pgStatLocal.snapshot.stats, key, &found); + Assert(!found); + entry->data = NULL; + } + return NULL; + } -/* - * Support function for the SQL-callable pgstat* functions. Returns - * a pointer to the bgwriter statistics struct. - */ -PgStat_BgWriterStats * -pgstat_fetch_stat_bgwriter(void) -{ - backend_read_statsfile(); + /* + * Allocate in caller's context for PGSTAT_FETCH_CONSISTENCY_NONE, + * otherwise we could quickly end up with a fair bit of memory used due to + * repeated accesses. + */ + if (pgstat_fetch_consistency == PGSTAT_FETCH_CONSISTENCY_NONE) + stats_data = palloc(kind_info->shared_data_len); + else + stats_data = MemoryContextAlloc(pgStatLocal.snapshot.context, + kind_info->shared_data_len); + memcpy(stats_data, + pgstat_get_entry_data(kind, entry_ref->shared_stats), + kind_info->shared_data_len); - return &globalStats.bgwriter; -} + if (pgstat_fetch_consistency > PGSTAT_FETCH_CONSISTENCY_NONE) + { + PgStat_SnapshotEntry *entry = NULL; + bool found; -/* - * Support function for the SQL-callable pgstat* functions. Returns - * a pointer to the checkpointer statistics struct. - */ -PgStat_CheckpointerStats * -pgstat_fetch_stat_checkpointer(void) -{ - backend_read_statsfile(); + entry = pgstat_snapshot_insert(pgStatLocal.snapshot.stats, key, &found); + entry->data = stats_data; + } - return &globalStats.checkpointer; + return stats_data; } /* - * Support function for the SQL-callable pgstat* functions. Returns - * a pointer to the WAL statistics struct. + * If a stats snapshot has been taken, return the timestamp at which that was + * done, and set *have_snapshot to true. Otherwise *have_snapshot is set to + * false. */ -PgStat_WalStats * -pgstat_fetch_stat_wal(void) +TimestampTz +pgstat_get_stat_snapshot_timestamp(bool *have_snapshot) { - backend_read_statsfile(); + if (pgStatLocal.snapshot.mode == PGSTAT_FETCH_CONSISTENCY_SNAPSHOT) + { + *have_snapshot = true; + return pgStatLocal.snapshot.snapshot_timestamp; + } - return &walStats; -} + *have_snapshot = false; -/* - * Support function for the SQL-callable pgstat* functions. Returns - * a pointer to the slru statistics struct. - */ -PgStat_SLRUStats * -pgstat_fetch_slru(void) -{ - backend_read_statsfile(); - - return slruStats; + return 0; } /* - * Support function for the SQL-callable pgstat* functions. Returns - * a pointer to the replication slot statistics struct. + * Ensure snapshot for fixed-numbered 'kind' exists. + * + * Typically used by the pgstat_fetch_* functions for a kind of stats, before + * massaging the data into the desired format. */ -PgStat_StatReplSlotEntry * -pgstat_fetch_replslot(NameData slotname) +void +pgstat_snapshot_fixed(PgStat_Kind kind) { - backend_read_statsfile(); - - return pgstat_get_replslot_entry(slotname, false); -} + AssertArg(pgstat_is_kind_valid(kind)); + AssertArg(pgstat_get_kind_info(kind)->fixed_amount); -/* - * Support function for the SQL-callable pgstat* functions. Returns - * the collected statistics for one subscription or NULL. - */ -PgStat_StatSubEntry * -pgstat_fetch_stat_subscription(Oid subid) -{ - /* Load the stats file if needed */ - backend_read_statsfile(); + if (pgstat_fetch_consistency == PGSTAT_FETCH_CONSISTENCY_SNAPSHOT) + pgstat_build_snapshot(); + else + pgstat_build_snapshot_fixed(kind); - return pgstat_get_subscription_entry(subid, false); + Assert(pgStatLocal.snapshot.fixed_valid[kind]); } - -/* ------------------------------------------------------------ - * Helper / infrastructure functions - * ------------------------------------------------------------ - */ - -/* - * Create pgStatLocalContext, if not already done. - */ static void -pgstat_setup_memcxt(void) +pgstat_prep_snapshot(void) { - if (!pgStatLocalContext) - pgStatLocalContext = AllocSetContextCreate(TopMemoryContext, - "Statistics snapshot", - ALLOCSET_SMALL_SIZES); -} + if (pgstat_fetch_consistency == PGSTAT_FETCH_CONSISTENCY_NONE || + pgStatLocal.snapshot.stats != NULL) + return; -/* - * Stats should only be reported after pgstat_initialize() and before - * pgstat_shutdown(). This check is put in a few central places to catch - * violations of this rule more easily. - */ -#ifdef USE_ASSERT_CHECKING -void -pgstat_assert_is_up(void) -{ - Assert(pgstat_is_initialized && !pgstat_is_shutdown); -} -#endif + if (!pgStatLocal.snapshot.context) + pgStatLocal.snapshot.context = AllocSetContextCreate(TopMemoryContext, + "PgStat Snapshot", + ALLOCSET_SMALL_SIZES); -/* - * Set common header fields in a statistics message - */ -void -pgstat_setheader(PgStat_MsgHdr *hdr, StatMsgType mtype) -{ - hdr->m_type = mtype; + pgStatLocal.snapshot.stats = + pgstat_snapshot_create(pgStatLocal.snapshot.context, + PGSTAT_SNAPSHOT_HASH_SIZE, + NULL); } - -/* - * Send out one statistics message to the collector - */ -void -pgstat_send(void *msg, int len) +static void +pgstat_build_snapshot(void) { - int rc; + dshash_seq_status hstat; + PgStatShared_HashEntry *p; - pgstat_assert_is_up(); + /* should only be called when we need a snapshot */ + Assert(pgstat_fetch_consistency == PGSTAT_FETCH_CONSISTENCY_SNAPSHOT); - if (pgStatSock == PGINVALID_SOCKET) + /* snapshot already built */ + if (pgStatLocal.snapshot.mode == PGSTAT_FETCH_CONSISTENCY_SNAPSHOT) return; - ((PgStat_MsgHdr *) msg)->m_size = len; + pgstat_prep_snapshot(); - /* We'll retry after EINTR, but ignore all other failures */ - do - { - rc = send(pgStatSock, msg, len, 0); - } while (rc < 0 && errno == EINTR); + Assert(pgStatLocal.snapshot.stats->members == 0); -#ifdef USE_ASSERT_CHECKING - /* In debug builds, log send failures ... */ - if (rc < 0) - elog(LOG, "could not send to statistics collector: %m"); -#endif -} - -/* - * Start up the statistics collector process. This is the body of the - * postmaster child process. - * - * The argc/argv parameters are valid only in EXEC_BACKEND case. - */ -NON_EXEC_STATIC void -PgstatCollectorMain(int argc, char *argv[]) -{ - int len; - PgStat_Msg msg; - int wr; - WaitEvent event; - WaitEventSet *wes; - - /* - * Ignore all signals usually bound to some action in the postmaster, - * except SIGHUP and SIGQUIT. Note we don't need a SIGUSR1 handler to - * support latch operations, because we only use a local latch. - */ - pqsignal(SIGHUP, SignalHandlerForConfigReload); - pqsignal(SIGINT, SIG_IGN); - pqsignal(SIGTERM, SIG_IGN); - pqsignal(SIGQUIT, SignalHandlerForShutdownRequest); - pqsignal(SIGALRM, SIG_IGN); - pqsignal(SIGPIPE, SIG_IGN); - pqsignal(SIGUSR1, SIG_IGN); - pqsignal(SIGUSR2, SIG_IGN); - /* Reset some signals that are accepted by postmaster but not here */ - pqsignal(SIGCHLD, SIG_DFL); - PG_SETMASK(&UnBlockSig); - - MyBackendType = B_STATS_COLLECTOR; - init_ps_display(NULL); - - /* - * Read in existing stats files or initialize the stats to zero. - */ - pgStatRunningInCollector = true; - pgStatDBHash = pgstat_read_statsfiles(InvalidOid, true, true); - - /* Prepare to wait for our latch or data in our socket. */ - wes = CreateWaitEventSet(CurrentMemoryContext, 3); - AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL); - AddWaitEventToSet(wes, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, NULL); - AddWaitEventToSet(wes, WL_SOCKET_READABLE, pgStatSock, NULL, NULL); + pgStatLocal.snapshot.snapshot_timestamp = GetCurrentTimestamp(); /* - * Loop to process messages until we get SIGQUIT or detect ungraceful - * death of our parent postmaster. - * - * For performance reasons, we don't want to do ResetLatch/WaitLatch after - * every message; instead, do that only after a recv() fails to obtain a - * message. (This effectively means that if backends are sending us stuff - * like mad, we won't notice postmaster death until things slack off a - * bit; which seems fine.) To do that, we have an inner loop that - * iterates as long as recv() succeeds. We do check ConfigReloadPending - * inside the inner loop, which means that such interrupts will get - * serviced but the latch won't get cleared until next time there is a - * break in the action. + * Snapshot all variable stats. */ - for (;;) + dshash_seq_init(&hstat, pgStatLocal.shared_hash, false); + while ((p = dshash_seq_next(&hstat)) != NULL) { - /* Clear any already-pending wakeups */ - ResetLatch(MyLatch); - - /* - * Quit if we get SIGQUIT from the postmaster. - */ - if (ShutdownRequestPending) - break; + PgStat_Kind kind = p->key.kind; + const PgStat_KindInfo *kind_info = pgstat_get_kind_info(kind); + bool found; + PgStat_SnapshotEntry *entry; + PgStatShared_Common *stats_data; /* - * Inner loop iterates as long as we keep getting messages, or until - * ShutdownRequestPending becomes set. + * Check if the stats object should be included in the snapshot. + * Unless the stats kind can be accessed from all databases (e.g., + * database stats themselves), we only include stats for the current + * database or objects not associated with a database (e.g. shared + * relations). */ - while (!ShutdownRequestPending) - { - /* - * Reload configuration if we got SIGHUP from the postmaster. - */ - if (ConfigReloadPending) - { - ConfigReloadPending = false; - ProcessConfigFile(PGC_SIGHUP); - } - - /* - * Write the stats file(s) if a new request has arrived that is - * not satisfied by existing file(s). - */ - if (pgstat_write_statsfile_needed()) - pgstat_write_statsfiles(false, false); - - /* - * Try to receive and process a message. This will not block, - * since the socket is set to non-blocking mode. - * - * XXX On Windows, we have to force pgwin32_recv to cooperate, - * despite the previous use of pg_set_noblock() on the socket. - * This is extremely broken and should be fixed someday. - */ -#ifdef WIN32 - pgwin32_noblock = 1; -#endif - - len = recv(pgStatSock, (char *) &msg, - sizeof(PgStat_Msg), 0); - -#ifdef WIN32 - pgwin32_noblock = 0; -#endif - - if (len < 0) - { - if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) - break; /* out of inner loop */ - ereport(ERROR, - (errcode_for_socket_access(), - errmsg("could not read statistics message: %m"))); - } - - /* - * We ignore messages that are smaller than our common header - */ - if (len < sizeof(PgStat_MsgHdr)) - continue; - - /* - * The received length must match the length in the header - */ - if (msg.msg_hdr.m_size != len) - continue; - - /* - * O.K. - we accept this message. Process it. - */ - switch (msg.msg_hdr.m_type) - { - case PGSTAT_MTYPE_DUMMY: - break; - - case PGSTAT_MTYPE_INQUIRY: - pgstat_recv_inquiry(&msg.msg_inquiry, len); - break; - - case PGSTAT_MTYPE_TABSTAT: - pgstat_recv_tabstat(&msg.msg_tabstat, len); - break; - - case PGSTAT_MTYPE_TABPURGE: - pgstat_recv_tabpurge(&msg.msg_tabpurge, len); - break; - - case PGSTAT_MTYPE_DROPDB: - pgstat_recv_dropdb(&msg.msg_dropdb, len); - break; - - case PGSTAT_MTYPE_RESETCOUNTER: - pgstat_recv_resetcounter(&msg.msg_resetcounter, len); - break; - - case PGSTAT_MTYPE_RESETSHAREDCOUNTER: - pgstat_recv_resetsharedcounter(&msg.msg_resetsharedcounter, - len); - break; - - case PGSTAT_MTYPE_RESETSINGLECOUNTER: - pgstat_recv_resetsinglecounter(&msg.msg_resetsinglecounter, - len); - break; - - case PGSTAT_MTYPE_RESETSLRUCOUNTER: - pgstat_recv_resetslrucounter(&msg.msg_resetslrucounter, - len); - break; - - case PGSTAT_MTYPE_RESETREPLSLOTCOUNTER: - pgstat_recv_resetreplslotcounter(&msg.msg_resetreplslotcounter, - len); - break; - - case PGSTAT_MTYPE_RESETSUBCOUNTER: - pgstat_recv_resetsubcounter(&msg.msg_resetsubcounter, len); - break; - - case PGSTAT_MTYPE_AUTOVAC_START: - pgstat_recv_autovac(&msg.msg_autovacuum_start, len); - break; + if (p->key.dboid != MyDatabaseId && + p->key.dboid != InvalidOid && + !kind_info->accessed_across_databases) + continue; - case PGSTAT_MTYPE_VACUUM: - pgstat_recv_vacuum(&msg.msg_vacuum, len); - break; + if (p->dropped) + continue; - case PGSTAT_MTYPE_ANALYZE: - pgstat_recv_analyze(&msg.msg_analyze, len); - break; + Assert(pg_atomic_read_u32(&p->refcount) > 0); - case PGSTAT_MTYPE_ARCHIVER: - pgstat_recv_archiver(&msg.msg_archiver, len); - break; + stats_data = dsa_get_address(pgStatLocal.dsa, p->body); + Assert(stats_data); - case PGSTAT_MTYPE_BGWRITER: - pgstat_recv_bgwriter(&msg.msg_bgwriter, len); - break; + entry = pgstat_snapshot_insert(pgStatLocal.snapshot.stats, p->key, &found); + Assert(!found); - case PGSTAT_MTYPE_CHECKPOINTER: - pgstat_recv_checkpointer(&msg.msg_checkpointer, len); - break; + entry->data = MemoryContextAlloc(pgStatLocal.snapshot.context, + kind_info->shared_size); + memcpy(entry->data, + pgstat_get_entry_data(kind, stats_data), + kind_info->shared_size); + } + dshash_seq_term(&hstat); - case PGSTAT_MTYPE_WAL: - pgstat_recv_wal(&msg.msg_wal, len); - break; + /* + * Build snapshot of all fixed-numbered stats. + */ + for (int kind = PGSTAT_KIND_FIRST_VALID; kind <= PGSTAT_KIND_LAST; kind++) + { + const PgStat_KindInfo *kind_info = pgstat_get_kind_info(kind); - case PGSTAT_MTYPE_SLRU: - pgstat_recv_slru(&msg.msg_slru, len); - break; + if (!kind_info->fixed_amount) + { + Assert(kind_info->snapshot_cb == NULL); + continue; + } - case PGSTAT_MTYPE_FUNCSTAT: - pgstat_recv_funcstat(&msg.msg_funcstat, len); - break; + pgstat_build_snapshot_fixed(kind); + } - case PGSTAT_MTYPE_FUNCPURGE: - pgstat_recv_funcpurge(&msg.msg_funcpurge, len); - break; + pgStatLocal.snapshot.mode = PGSTAT_FETCH_CONSISTENCY_SNAPSHOT; +} - case PGSTAT_MTYPE_RECOVERYCONFLICT: - pgstat_recv_recoveryconflict(&msg.msg_recoveryconflict, - len); - break; +static void +pgstat_build_snapshot_fixed(PgStat_Kind kind) +{ + const PgStat_KindInfo *kind_info = pgstat_get_kind_info(kind); - case PGSTAT_MTYPE_DEADLOCK: - pgstat_recv_deadlock(&msg.msg_deadlock, len); - break; + Assert(kind_info->fixed_amount); + Assert(kind_info->snapshot_cb != NULL); - case PGSTAT_MTYPE_TEMPFILE: - pgstat_recv_tempfile(&msg.msg_tempfile, len); - break; + if (pgstat_fetch_consistency == PGSTAT_FETCH_CONSISTENCY_NONE) + { + /* rebuild every time */ + pgStatLocal.snapshot.fixed_valid[kind] = false; + } + else if (pgStatLocal.snapshot.fixed_valid[kind]) + { + /* in snapshot mode we shouldn't get called again */ + Assert(pgstat_fetch_consistency == PGSTAT_FETCH_CONSISTENCY_CACHE); + return; + } - case PGSTAT_MTYPE_CHECKSUMFAILURE: - pgstat_recv_checksum_failure(&msg.msg_checksumfailure, - len); - break; + Assert(!pgStatLocal.snapshot.fixed_valid[kind]); - case PGSTAT_MTYPE_REPLSLOT: - pgstat_recv_replslot(&msg.msg_replslot, len); - break; + kind_info->snapshot_cb(); - case PGSTAT_MTYPE_CONNECT: - pgstat_recv_connect(&msg.msg_connect, len); - break; + Assert(!pgStatLocal.snapshot.fixed_valid[kind]); + pgStatLocal.snapshot.fixed_valid[kind] = true; +} - case PGSTAT_MTYPE_DISCONNECT: - pgstat_recv_disconnect(&msg.msg_disconnect, len); - break; - case PGSTAT_MTYPE_SUBSCRIPTIONDROP: - pgstat_recv_subscription_drop(&msg.msg_subscriptiondrop, len); - break; +/* ------------------------------------------------------------ + * Backend-local pending stats infrastructure + * ------------------------------------------------------------ + */ - case PGSTAT_MTYPE_SUBSCRIPTIONERROR: - pgstat_recv_subscription_error(&msg.msg_subscriptionerror, len); - break; +/* + * Returns the appropriate PgStat_EntryRef, preparing it to receive pending + * stats if not already done. + * + * If created_entry is non-NULL, it'll be set to true if the entry is newly + * created, false otherwise. + */ +PgStat_EntryRef * +pgstat_prep_pending_entry(PgStat_Kind kind, Oid dboid, Oid objoid, bool *created_entry) +{ + PgStat_EntryRef *entry_ref; - default: - break; - } - } /* end of inner message-processing loop */ + /* need to be able to flush out */ + Assert(pgstat_get_kind_info(kind)->flush_pending_cb != NULL); - /* Sleep until there's something to do */ -#ifndef WIN32 - wr = WaitEventSetWait(wes, -1L, &event, 1, WAIT_EVENT_PGSTAT_MAIN); -#else + if (unlikely(!pgStatPendingContext)) + { + pgStatPendingContext = + AllocSetContextCreate(CacheMemoryContext, + "PgStat Pending", + ALLOCSET_SMALL_SIZES); + } - /* - * Windows, at least in its Windows Server 2003 R2 incarnation, - * sometimes loses FD_READ events. Waking up and retrying the recv() - * fixes that, so don't sleep indefinitely. This is a crock of the - * first water, but until somebody wants to debug exactly what's - * happening there, this is the best we can do. The two-second - * timeout matches our pre-9.2 behavior, and needs to be short enough - * to not provoke "using stale statistics" complaints from - * backend_read_statsfile. - */ - wr = WaitEventSetWait(wes, 2 * 1000L /* msec */ , &event, 1, - WAIT_EVENT_PGSTAT_MAIN); -#endif + entry_ref = pgstat_get_entry_ref(kind, dboid, objoid, + true, created_entry); - /* - * Emergency bailout if postmaster has died. This is to avoid the - * necessity for manual cleanup of all postmaster children. - */ - if (wr == 1 && event.events == WL_POSTMASTER_DEATH) - break; - } /* end of outer loop */ + if (entry_ref->pending == NULL) + { + size_t entrysize = pgstat_get_kind_info(kind)->pending_size; - /* - * Save the final stats to reuse at next startup. - */ - pgstat_write_statsfiles(true, true); + Assert(entrysize != (size_t) -1); - FreeWaitEventSet(wes); + entry_ref->pending = MemoryContextAllocZero(pgStatPendingContext, entrysize); + dlist_push_tail(&pgStatPending, &entry_ref->pending_node); + } - exit(0); + return entry_ref; } /* - * Subroutine to clear stats in a database entry + * Return an existing stats entry, or NULL. * - * Tables and functions hashes are initialized to empty. + * This should only be used for helper function for pgstatfuncs.c - outside of + * that it shouldn't be needed. */ -static void -reset_dbentry_counters(PgStat_StatDBEntry *dbentry) +PgStat_EntryRef * +pgstat_fetch_pending_entry(PgStat_Kind kind, Oid dboid, Oid objoid) { - HASHCTL hash_ctl; - - dbentry->n_xact_commit = 0; - dbentry->n_xact_rollback = 0; - dbentry->n_blocks_fetched = 0; - dbentry->n_blocks_hit = 0; - dbentry->n_tuples_returned = 0; - dbentry->n_tuples_fetched = 0; - dbentry->n_tuples_inserted = 0; - dbentry->n_tuples_updated = 0; - dbentry->n_tuples_deleted = 0; - dbentry->last_autovac_time = 0; - dbentry->n_conflict_tablespace = 0; - dbentry->n_conflict_lock = 0; - dbentry->n_conflict_snapshot = 0; - dbentry->n_conflict_bufferpin = 0; - dbentry->n_conflict_startup_deadlock = 0; - dbentry->n_temp_files = 0; - dbentry->n_temp_bytes = 0; - dbentry->n_deadlocks = 0; - dbentry->n_checksum_failures = 0; - dbentry->last_checksum_failure = 0; - dbentry->n_block_read_time = 0; - dbentry->n_block_write_time = 0; - dbentry->n_sessions = 0; - dbentry->total_session_time = 0; - dbentry->total_active_time = 0; - dbentry->total_idle_in_xact_time = 0; - dbentry->n_sessions_abandoned = 0; - dbentry->n_sessions_fatal = 0; - dbentry->n_sessions_killed = 0; - - dbentry->stat_reset_timestamp = GetCurrentTimestamp(); - dbentry->stats_timestamp = 0; - - hash_ctl.keysize = sizeof(Oid); - hash_ctl.entrysize = sizeof(PgStat_StatTabEntry); - dbentry->tables = hash_create("Per-database table", - PGSTAT_TAB_HASH_SIZE, - &hash_ctl, - HASH_ELEM | HASH_BLOBS); - - hash_ctl.keysize = sizeof(Oid); - hash_ctl.entrysize = sizeof(PgStat_StatFuncEntry); - dbentry->functions = hash_create("Per-database function", - PGSTAT_FUNCTION_HASH_SIZE, - &hash_ctl, - HASH_ELEM | HASH_BLOBS); -} + PgStat_EntryRef *entry_ref; -/* - * Lookup the hash table entry for the specified database. If no hash - * table entry exists, initialize it, if the create parameter is true. - * Else, return NULL. - */ -static PgStat_StatDBEntry * -pgstat_get_db_entry(Oid databaseid, bool create) -{ - PgStat_StatDBEntry *result; - bool found; - HASHACTION action = (create ? HASH_ENTER : HASH_FIND); + entry_ref = pgstat_get_entry_ref(kind, dboid, objoid, false, NULL); - /* Lookup or create the hash table entry for this database */ - result = (PgStat_StatDBEntry *) hash_search(pgStatDBHash, - &databaseid, - action, &found); - - if (!create && !found) + if (entry_ref == NULL || entry_ref->pending == NULL) return NULL; - /* - * If not found, initialize the new one. This creates empty hash tables - * for tables and functions, too. - */ - if (!found) - reset_dbentry_counters(result); - - return result; + return entry_ref; } -/* - * Lookup the hash table entry for the specified table. If no hash - * table entry exists, initialize it, if the create parameter is true. - * Else, return NULL. - */ -static PgStat_StatTabEntry * -pgstat_get_tab_entry(PgStat_StatDBEntry *dbentry, Oid tableoid, bool create) +void +pgstat_delete_pending_entry(PgStat_EntryRef *entry_ref) { - PgStat_StatTabEntry *result; - bool found; - HASHACTION action = (create ? HASH_ENTER : HASH_FIND); + PgStat_Kind kind = entry_ref->shared_entry->key.kind; + const PgStat_KindInfo *kind_info = pgstat_get_kind_info(kind); + void *pending_data = entry_ref->pending; - /* Lookup or create the hash table entry for this table */ - result = (PgStat_StatTabEntry *) hash_search(dbentry->tables, - &tableoid, - action, &found); + Assert(pending_data != NULL); + /* !fixed_amount stats should be handled explicitly */ + Assert(!pgstat_get_kind_info(kind)->fixed_amount); - if (!create && !found) - return NULL; + if (kind_info->delete_pending_cb) + kind_info->delete_pending_cb(entry_ref); - /* If not found, initialize the new one. */ - if (!found) - { - result->numscans = 0; - result->tuples_returned = 0; - result->tuples_fetched = 0; - result->tuples_inserted = 0; - result->tuples_updated = 0; - result->tuples_deleted = 0; - result->tuples_hot_updated = 0; - result->n_live_tuples = 0; - result->n_dead_tuples = 0; - result->changes_since_analyze = 0; - result->inserts_since_vacuum = 0; - result->blocks_fetched = 0; - result->blocks_hit = 0; - result->vacuum_timestamp = 0; - result->vacuum_count = 0; - result->autovac_vacuum_timestamp = 0; - result->autovac_vacuum_count = 0; - result->analyze_timestamp = 0; - result->analyze_count = 0; - result->autovac_analyze_timestamp = 0; - result->autovac_analyze_count = 0; - } + pfree(pending_data); + entry_ref->pending = NULL; - return result; + dlist_delete(&entry_ref->pending_node); } /* - * Return the entry of replication slot stats with the given name. Return - * NULL if not found and the caller didn't request to create it. - * - * create tells whether to create the new slot entry if it is not found. + * Flush out pending stats for database objects (databases, relations, + * functions). */ -static PgStat_StatReplSlotEntry * -pgstat_get_replslot_entry(NameData name, bool create) +static bool +pgstat_flush_pending_entries(bool nowait) { - PgStat_StatReplSlotEntry *slotent; - bool found; + bool have_pending = false; + dlist_node *cur = NULL; + + /* + * Need to be a bit careful iterating over the list of pending entries. + * Processing a pending entry may queue further pending entries to the end + * of the list that we want to process, so a simple iteration won't do. + * Further complicating matters is that we want to delete the current + * entry in each iteration from the list if we flushed successfully. + * + * So we just keep track of the next pointer in each loop iteration. + */ + if (!dlist_is_empty(&pgStatPending)) + cur = dlist_head_node(&pgStatPending); - if (replSlotStatHash == NULL) + while (cur) { - HASHCTL hash_ctl; + PgStat_EntryRef *entry_ref = + dlist_container(PgStat_EntryRef, pending_node, cur); + PgStat_HashKey key = entry_ref->shared_entry->key; + PgStat_Kind kind = key.kind; + const PgStat_KindInfo *kind_info = pgstat_get_kind_info(kind); + bool did_flush; + dlist_node *next; - /* - * Quick return NULL if the hash table is empty and the caller didn't - * request to create the entry. - */ - if (!create) - return NULL; + Assert(!kind_info->fixed_amount); + Assert(kind_info->flush_pending_cb != NULL); - hash_ctl.keysize = sizeof(NameData); - hash_ctl.entrysize = sizeof(PgStat_StatReplSlotEntry); - replSlotStatHash = hash_create("Replication slots hash", - PGSTAT_REPLSLOT_HASH_SIZE, - &hash_ctl, - HASH_ELEM | HASH_BLOBS); - } + /* flush the stats, if possible */ + did_flush = kind_info->flush_pending_cb(entry_ref, nowait); - slotent = (PgStat_StatReplSlotEntry *) hash_search(replSlotStatHash, - (void *) &name, - create ? HASH_ENTER : HASH_FIND, - &found); + Assert(did_flush || nowait); - if (!slotent) - { - /* not found */ - Assert(!create && !found); - return NULL; - } + /* determine next entry, before deleting the pending entry */ + if (dlist_has_next(&pgStatPending, cur)) + next = dlist_next_node(&pgStatPending, cur); + else + next = NULL; - /* initialize the entry */ - if (create && !found) - { - namestrcpy(&(slotent->slotname), NameStr(name)); - pgstat_reset_replslot_entry(slotent, 0); + /* if successfully flushed, remove entry */ + if (did_flush) + pgstat_delete_pending_entry(entry_ref); + else + have_pending = true; + + cur = next; } - return slotent; -} + Assert(dlist_is_empty(&pgStatPending) == !have_pending); -/* - * Reset the given replication slot stats. - */ -static void -pgstat_reset_replslot_entry(PgStat_StatReplSlotEntry *slotent, TimestampTz ts) -{ - /* reset only counters. Don't clear slot name */ - slotent->spill_txns = 0; - slotent->spill_count = 0; - slotent->spill_bytes = 0; - slotent->stream_txns = 0; - slotent->stream_count = 0; - slotent->stream_bytes = 0; - slotent->total_txns = 0; - slotent->total_bytes = 0; - slotent->stat_reset_timestamp = ts; + return have_pending; } -/* - * Return the subscription statistics entry with the given subscription OID. - * If no subscription entry exists, initialize it, if the create parameter is - * true. Else, return NULL. + +/* ------------------------------------------------------------ + * Helper / infrastructure functions + * ------------------------------------------------------------ */ -static PgStat_StatSubEntry * -pgstat_get_subscription_entry(Oid subid, bool create) -{ - PgStat_StatSubEntry *subentry; - bool found; - HASHACTION action = (create ? HASH_ENTER : HASH_FIND); - if (subscriptionStatHash == NULL) +PgStat_Kind +pgstat_get_kind_from_str(char *kind_str) +{ + for (int kind = PGSTAT_KIND_FIRST_VALID; kind <= PGSTAT_KIND_LAST; kind++) { - HASHCTL hash_ctl; - - /* - * Quick return NULL if the hash table is empty and the caller didn't - * request to create the entry. - */ - if (!create) - return NULL; - - hash_ctl.keysize = sizeof(Oid); - hash_ctl.entrysize = sizeof(PgStat_StatSubEntry); - subscriptionStatHash = hash_create("Subscription hash", - PGSTAT_SUBSCRIPTION_HASH_SIZE, - &hash_ctl, - HASH_ELEM | HASH_BLOBS); + if (pg_strcasecmp(kind_str, pgstat_kind_infos[kind].name) == 0) + return kind; } - subentry = (PgStat_StatSubEntry *) hash_search(subscriptionStatHash, - (void *) &subid, - action, &found); + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid statistics kind: \"%s\"", kind_str))); + return PGSTAT_KIND_DATABASE; /* avoid compiler warnings */ +} - if (!create && !found) - return NULL; +static inline bool +pgstat_is_kind_valid(int ikind) +{ + return ikind >= PGSTAT_KIND_FIRST_VALID && ikind <= PGSTAT_KIND_LAST; +} - /* If not found, initialize the new one */ - if (!found) - pgstat_reset_subscription(subentry, 0); +const PgStat_KindInfo * +pgstat_get_kind_info(PgStat_Kind kind) +{ + AssertArg(pgstat_is_kind_valid(kind)); - return subentry; + return &pgstat_kind_infos[kind]; } /* - * Reset the given subscription stats. + * Stats should only be reported after pgstat_initialize() and before + * pgstat_shutdown(). This check is put in a few central places to catch + * violations of this rule more easily. */ -static void -pgstat_reset_subscription(PgStat_StatSubEntry *subentry, TimestampTz ts) +#ifdef USE_ASSERT_CHECKING +void +pgstat_assert_is_up(void) { - subentry->apply_error_count = 0; - subentry->sync_error_count = 0; - subentry->stat_reset_timestamp = ts; + Assert(pgstat_is_initialized && !pgstat_is_shutdown); } +#endif /* ------------------------------------------------------------ @@ -2103,28 +1204,38 @@ pgstat_reset_subscription(PgStat_StatSubEntry *subentry, TimestampTz ts) * ------------------------------------------------------------ */ +/* helpers for pgstat_write_statsfile() */ +static void +write_chunk(FILE *fpout, void *ptr, size_t len) +{ + int rc; + + rc = fwrite(ptr, len, 1, fpout); + + /* we'll check for errors with ferror once at the end */ + (void) rc; +} + +#define write_chunk_s(fpout, ptr) write_chunk(fpout, ptr, sizeof(*ptr)) + /* - * Write the global statistics file, as well as requested DB files. - * - * 'permanent' specifies writing to the permanent files not temporary ones. - * When true (happens only when the collector is shutting down), also remove - * the temporary files so that backends starting up under a new postmaster - * can't read old data before the new collector is ready. - * - * When 'allDbs' is false, only the requested databases (listed in - * pending_write_requests) will be written; otherwise, all databases - * will be written. + * This function is called in the last process that is accessing the shared + * stats so locking is not required. */ static void -pgstat_write_statsfiles(bool permanent, bool allDbs) +pgstat_write_statsfile(void) { - HASH_SEQ_STATUS hstat; - PgStat_StatDBEntry *dbentry; FILE *fpout; int32 format_id; - const char *tmpfile = permanent ? PGSTAT_STAT_PERMANENT_TMPFILE : pgstat_stat_tmpname; - const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename; - int rc; + const char *tmpfile = PGSTAT_STAT_PERMANENT_TMPFILE; + const char *statfile = PGSTAT_STAT_PERMANENT_FILENAME; + dshash_seq_status hstat; + PgStatShared_HashEntry *ps; + + pgstat_assert_is_up(); + + /* we're shutting down, so it's ok to just override this */ + pgstat_fetch_consistency = PGSTAT_FETCH_CONSISTENCY_NONE; elog(DEBUG2, "writing stats file \"%s\"", statfile); @@ -2142,237 +1253,99 @@ pgstat_write_statsfiles(bool permanent, bool allDbs) } /* - * Set the timestamp of the stats file. - */ - globalStats.stats_timestamp = GetCurrentTimestamp(); - - /* * Write the file header --- currently just a format ID. */ format_id = PGSTAT_FILE_FORMAT_ID; - rc = fwrite(&format_id, sizeof(format_id), 1, fpout); - (void) rc; /* we'll check for error with ferror */ + write_chunk_s(fpout, &format_id); /* - * Write global stats struct + * XXX: The following could now be generalized to just iterate over + * pgstat_kind_infos instead of knowing about the different kinds of + * stats. */ - rc = fwrite(&globalStats, sizeof(globalStats), 1, fpout); - (void) rc; /* we'll check for error with ferror */ /* * Write archiver stats struct */ - rc = fwrite(&archiverStats, sizeof(archiverStats), 1, fpout); - (void) rc; /* we'll check for error with ferror */ + pgstat_build_snapshot_fixed(PGSTAT_KIND_ARCHIVER); + write_chunk_s(fpout, &pgStatLocal.snapshot.archiver); /* - * Write WAL stats struct + * Write bgwriter stats struct */ - rc = fwrite(&walStats, sizeof(walStats), 1, fpout); - (void) rc; /* we'll check for error with ferror */ + pgstat_build_snapshot_fixed(PGSTAT_KIND_BGWRITER); + write_chunk_s(fpout, &pgStatLocal.snapshot.bgwriter); /* - * Write SLRU stats struct + * Write checkpointer stats struct */ - rc = fwrite(slruStats, sizeof(slruStats), 1, fpout); - (void) rc; /* we'll check for error with ferror */ + pgstat_build_snapshot_fixed(PGSTAT_KIND_CHECKPOINTER); + write_chunk_s(fpout, &pgStatLocal.snapshot.checkpointer); /* - * Walk through the database table. + * Write SLRU stats struct */ - hash_seq_init(&hstat, pgStatDBHash); - while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL) - { - /* - * Write out the table and function stats for this DB into the - * appropriate per-DB stat file, if required. - */ - if (allDbs || pgstat_db_requested(dbentry->databaseid)) - { - /* Make DB's timestamp consistent with the global stats */ - dbentry->stats_timestamp = globalStats.stats_timestamp; - - pgstat_write_db_statsfile(dbentry, permanent); - } - - /* - * Write out the DB entry. We don't write the tables or functions - * pointers, since they're of no use to any other process. - */ - fputc('D', fpout); - rc = fwrite(dbentry, offsetof(PgStat_StatDBEntry, tables), 1, fpout); - (void) rc; /* we'll check for error with ferror */ - } + pgstat_build_snapshot_fixed(PGSTAT_KIND_SLRU); + write_chunk_s(fpout, &pgStatLocal.snapshot.slru); /* - * Write replication slot stats struct + * Write WAL stats struct */ - if (replSlotStatHash) - { - PgStat_StatReplSlotEntry *slotent; - - hash_seq_init(&hstat, replSlotStatHash); - while ((slotent = (PgStat_StatReplSlotEntry *) hash_seq_search(&hstat)) != NULL) - { - fputc('R', fpout); - rc = fwrite(slotent, sizeof(PgStat_StatReplSlotEntry), 1, fpout); - (void) rc; /* we'll check for error with ferror */ - } - } + pgstat_build_snapshot_fixed(PGSTAT_KIND_WAL); + write_chunk_s(fpout, &pgStatLocal.snapshot.wal); /* - * Write subscription stats struct + * Walk through the stats entries */ - if (subscriptionStatHash) + dshash_seq_init(&hstat, pgStatLocal.shared_hash, false); + while ((ps = dshash_seq_next(&hstat)) != NULL) { - PgStat_StatSubEntry *subentry; - - hash_seq_init(&hstat, subscriptionStatHash); - while ((subentry = (PgStat_StatSubEntry *) hash_seq_search(&hstat)) != NULL) - { - fputc('S', fpout); - rc = fwrite(subentry, sizeof(PgStat_StatSubEntry), 1, fpout); - (void) rc; /* we'll check for error with ferror */ - } - } + PgStatShared_Common *shstats; + const PgStat_KindInfo *kind_info = NULL; - /* - * No more output to be done. Close the temp file and replace the old - * pgstat.stat with it. The ferror() check replaces testing for error - * after each individual fputc or fwrite above. - */ - fputc('E', fpout); - - if (ferror(fpout)) - { - ereport(LOG, - (errcode_for_file_access(), - errmsg("could not write temporary statistics file \"%s\": %m", - tmpfile))); - FreeFile(fpout); - unlink(tmpfile); - } - else if (FreeFile(fpout) < 0) - { - ereport(LOG, - (errcode_for_file_access(), - errmsg("could not close temporary statistics file \"%s\": %m", - tmpfile))); - unlink(tmpfile); - } - else if (rename(tmpfile, statfile) < 0) - { - ereport(LOG, - (errcode_for_file_access(), - errmsg("could not rename temporary statistics file \"%s\" to \"%s\": %m", - tmpfile, statfile))); - unlink(tmpfile); - } - - if (permanent) - unlink(pgstat_stat_filename); + CHECK_FOR_INTERRUPTS(); - /* - * Now throw away the list of requests. Note that requests sent after we - * started the write are still waiting on the network socket. - */ - list_free(pending_write_requests); - pending_write_requests = NIL; -} + /* we may have some "dropped" entries not yet removed, skip them */ + Assert(!ps->dropped); + if (ps->dropped) + continue; -/* - * return the filename for a DB stat file; filename is the output buffer, - * of length len. - */ -static void -get_dbstat_filename(bool permanent, bool tempname, Oid databaseid, - char *filename, int len) -{ - int printed; - - /* NB -- pgstat_reset_remove_files knows about the pattern this uses */ - printed = snprintf(filename, len, "%s/db_%u.%s", - permanent ? PGSTAT_STAT_PERMANENT_DIRECTORY : - pgstat_stat_directory, - databaseid, - tempname ? "tmp" : "stat"); - if (printed >= len) - elog(ERROR, "overlength pgstat path"); -} + shstats = (PgStatShared_Common *) dsa_get_address(pgStatLocal.dsa, ps->body); -/* - * Write the stat file for a single database. - * - * If writing to the permanent file (happens when the collector is - * shutting down only), remove the temporary file so that backends - * starting up under a new postmaster can't read the old data before - * the new collector is ready. - */ -static void -pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent) -{ - HASH_SEQ_STATUS tstat; - HASH_SEQ_STATUS fstat; - PgStat_StatTabEntry *tabentry; - PgStat_StatFuncEntry *funcentry; - FILE *fpout; - int32 format_id; - Oid dbid = dbentry->databaseid; - int rc; - char tmpfile[MAXPGPATH]; - char statfile[MAXPGPATH]; + kind_info = pgstat_get_kind_info(ps->key.kind); - get_dbstat_filename(permanent, true, dbid, tmpfile, MAXPGPATH); - get_dbstat_filename(permanent, false, dbid, statfile, MAXPGPATH); + /* if not dropped the valid-entry refcount should exist */ + Assert(pg_atomic_read_u32(&ps->refcount) > 0); - elog(DEBUG2, "writing stats file \"%s\"", statfile); - - /* - * Open the statistics temp file to write out the current values. - */ - fpout = AllocateFile(tmpfile, PG_BINARY_W); - if (fpout == NULL) - { - ereport(LOG, - (errcode_for_file_access(), - errmsg("could not open temporary statistics file \"%s\": %m", - tmpfile))); - return; - } + if (!kind_info->to_serialized_name) + { + /* normal stats entry, identified by PgStat_HashKey */ + fputc('S', fpout); + write_chunk_s(fpout, &ps->key); + } + else + { + /* stats entry identified by name on disk (e.g. slots) */ + NameData name; - /* - * Write the file header --- currently just a format ID. - */ - format_id = PGSTAT_FILE_FORMAT_ID; - rc = fwrite(&format_id, sizeof(format_id), 1, fpout); - (void) rc; /* we'll check for error with ferror */ + kind_info->to_serialized_name(shstats, &name); - /* - * Walk through the database's access stats per table. - */ - hash_seq_init(&tstat, dbentry->tables); - while ((tabentry = (PgStat_StatTabEntry *) hash_seq_search(&tstat)) != NULL) - { - fputc('T', fpout); - rc = fwrite(tabentry, sizeof(PgStat_StatTabEntry), 1, fpout); - (void) rc; /* we'll check for error with ferror */ - } + fputc('N', fpout); + write_chunk_s(fpout, &ps->key.kind); + write_chunk_s(fpout, &name); + } - /* - * Walk through the database's function stats table. - */ - hash_seq_init(&fstat, dbentry->functions); - while ((funcentry = (PgStat_StatFuncEntry *) hash_seq_search(&fstat)) != NULL) - { - fputc('F', fpout); - rc = fwrite(funcentry, sizeof(PgStat_StatFuncEntry), 1, fpout); - (void) rc; /* we'll check for error with ferror */ + /* Write except the header part of the entry */ + write_chunk(fpout, + pgstat_get_entry_data(ps->key.kind, shstats), + pgstat_get_entry_len(ps->key.kind)); } + dshash_seq_term(&hstat); /* * No more output to be done. Close the temp file and replace the old * pgstat.stat with it. The ferror() check replaces testing for error - * after each individual fputc or fwrite above. + * after each individual fputc or fwrite (in write_chunk()) above. */ fputc('E', fpout); @@ -2401,1806 +1374,230 @@ pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent) tmpfile, statfile))); unlink(tmpfile); } +} - if (permanent) - { - get_dbstat_filename(false, false, dbid, statfile, MAXPGPATH); - - elog(DEBUG2, "removing temporary stats file \"%s\"", statfile); - unlink(statfile); - } +/* helpers for pgstat_read_statsfile() */ +static bool +read_chunk(FILE *fpin, void *ptr, size_t len) +{ + return fread(ptr, 1, len, fpin) == len; } +#define read_chunk_s(fpin, ptr) read_chunk(fpin, ptr, sizeof(*ptr)) + /* - * Reads in some existing statistics collector files and returns the - * databases hash table that is the top level of the data. - * - * If 'onlydb' is not InvalidOid, it means we only want data for that DB - * plus the shared catalogs ("DB 0"). We'll still populate the DB hash - * table for all databases, but we don't bother even creating table/function - * hash tables for other databases. + * Reads in existing statistics file into the shared stats hash. * - * 'permanent' specifies reading from the permanent files not temporary ones. - * When true (happens only when the collector is starting up), remove the - * files after reading; the in-memory status is now authoritative, and the - * files would be out of date in case somebody else reads them. - * - * If a 'deep' read is requested, table/function stats are read, otherwise - * the table/function hash tables remain empty. + * This function is called in the only process that is accessing the shared + * stats so locking is not required. */ -static HTAB * -pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) +static void +pgstat_read_statsfile(void) { - PgStat_StatDBEntry *dbentry; - PgStat_StatDBEntry dbbuf; - HASHCTL hash_ctl; - HTAB *dbhash; FILE *fpin; int32 format_id; bool found; - const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename; - int i; - TimestampTz ts; - - /* - * The tables will live in pgStatLocalContext. - */ - pgstat_setup_memcxt(); - - /* - * Create the DB hashtable - */ - hash_ctl.keysize = sizeof(Oid); - hash_ctl.entrysize = sizeof(PgStat_StatDBEntry); - hash_ctl.hcxt = pgStatLocalContext; - dbhash = hash_create("Databases hash", PGSTAT_DB_HASH_SIZE, &hash_ctl, - HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + const char *statfile = PGSTAT_STAT_PERMANENT_FILENAME; + PgStat_ShmemControl *shmem = pgStatLocal.shmem; + TimestampTz ts = GetCurrentTimestamp(); - /* - * Clear out global, archiver, WAL and SLRU statistics so they start from - * zero in case we can't load an existing statsfile. - */ - memset(&globalStats, 0, sizeof(globalStats)); - memset(&archiverStats, 0, sizeof(archiverStats)); - memset(&walStats, 0, sizeof(walStats)); - memset(&slruStats, 0, sizeof(slruStats)); + /* shouldn't be called from postmaster */ + Assert(IsUnderPostmaster || !IsPostmasterEnvironment); - /* - * Set the current timestamp (will be kept only in case we can't load an - * existing statsfile). - */ - ts = GetCurrentTimestamp(); - globalStats.bgwriter.stat_reset_timestamp = ts; - archiverStats.stat_reset_timestamp = ts; - walStats.stat_reset_timestamp = ts; - - /* - * Set the same reset timestamp for all SLRU items too. - */ - for (i = 0; i < SLRU_NUM_ELEMENTS; i++) - slruStats[i].stat_reset_timestamp = ts; + elog(DEBUG2, "reading stats file \"%s\"", statfile); /* * Try to open the stats file. If it doesn't exist, the backends simply - * return zero for anything and the collector simply starts from scratch + * returns zero for anything and statistics simply starts from scratch * with empty counters. * - * ENOENT is a possibility if the stats collector is not running or has - * not yet written the stats file the first time. Any other failure - * condition is suspicious. + * ENOENT is a possibility if stats collection was previously disabled or + * has not yet written the stats file for the first time. Any other + * failure condition is suspicious. */ if ((fpin = AllocateFile(statfile, PG_BINARY_R)) == NULL) { if (errno != ENOENT) - ereport(pgStatRunningInCollector ? LOG : WARNING, + ereport(LOG, (errcode_for_file_access(), errmsg("could not open statistics file \"%s\": %m", statfile))); - return dbhash; + pgstat_reset_after_failure(ts); + return; } /* * Verify it's of the expected format. */ - if (fread(&format_id, 1, sizeof(format_id), fpin) != sizeof(format_id) || + if (!read_chunk_s(fpin, &format_id) || format_id != PGSTAT_FILE_FORMAT_ID) - { - ereport(pgStatRunningInCollector ? LOG : WARNING, - (errmsg("corrupted statistics file \"%s\"", statfile))); - goto done; - } + goto error; /* - * Read global stats struct + * XXX: The following could now be generalized to just iterate over + * pgstat_kind_infos instead of knowing about the different kinds of + * stats. */ - if (fread(&globalStats, 1, sizeof(globalStats), fpin) != sizeof(globalStats)) - { - ereport(pgStatRunningInCollector ? LOG : WARNING, - (errmsg("corrupted statistics file \"%s\"", statfile))); - memset(&globalStats, 0, sizeof(globalStats)); - goto done; - } /* - * In the collector, disregard the timestamp we read from the permanent - * stats file; we should be willing to write a temp stats file immediately - * upon the first request from any backend. This only matters if the old - * file's timestamp is less than PGSTAT_STAT_INTERVAL ago, but that's not - * an unusual scenario. + * Read archiver stats struct */ - if (pgStatRunningInCollector) - globalStats.stats_timestamp = 0; + if (!read_chunk_s(fpin, &shmem->archiver.stats)) + goto error; /* - * Read archiver stats struct + * Read bgwriter stats struct */ - if (fread(&archiverStats, 1, sizeof(archiverStats), fpin) != sizeof(archiverStats)) - { - ereport(pgStatRunningInCollector ? LOG : WARNING, - (errmsg("corrupted statistics file \"%s\"", statfile))); - memset(&archiverStats, 0, sizeof(archiverStats)); - goto done; - } + if (!read_chunk_s(fpin, &shmem->bgwriter.stats)) + goto error; /* - * Read WAL stats struct + * Read checkpointer stats struct */ - if (fread(&walStats, 1, sizeof(walStats), fpin) != sizeof(walStats)) - { - ereport(pgStatRunningInCollector ? LOG : WARNING, - (errmsg("corrupted statistics file \"%s\"", statfile))); - memset(&walStats, 0, sizeof(walStats)); - goto done; - } + if (!read_chunk_s(fpin, &shmem->checkpointer.stats)) + goto error; /* * Read SLRU stats struct */ - if (fread(slruStats, 1, sizeof(slruStats), fpin) != sizeof(slruStats)) - { - ereport(pgStatRunningInCollector ? LOG : WARNING, - (errmsg("corrupted statistics file \"%s\"", statfile))); - memset(&slruStats, 0, sizeof(slruStats)); - goto done; - } + if (!read_chunk_s(fpin, &shmem->slru.stats)) + goto error; + + /* + * Read WAL stats struct + */ + if (!read_chunk_s(fpin, &shmem->wal.stats)) + goto error; /* - * We found an existing collector stats file. Read it and put all the - * hashtable entries into place. + * We found an existing statistics file. Read it and put all the hash + * table entries into place. */ for (;;) { - switch (fgetc(fpin)) - { - /* - * 'D' A PgStat_StatDBEntry struct describing a database - * follows. - */ - case 'D': - if (fread(&dbbuf, 1, offsetof(PgStat_StatDBEntry, tables), - fpin) != offsetof(PgStat_StatDBEntry, tables)) - { - ereport(pgStatRunningInCollector ? LOG : WARNING, - (errmsg("corrupted statistics file \"%s\"", - statfile))); - goto done; - } - - /* - * Add to the DB hash - */ - dbentry = (PgStat_StatDBEntry *) hash_search(dbhash, - (void *) &dbbuf.databaseid, - HASH_ENTER, - &found); - if (found) - { - ereport(pgStatRunningInCollector ? LOG : WARNING, - (errmsg("corrupted statistics file \"%s\"", - statfile))); - goto done; - } + char t = fgetc(fpin); - memcpy(dbentry, &dbbuf, sizeof(PgStat_StatDBEntry)); - dbentry->tables = NULL; - dbentry->functions = NULL; - - /* - * In the collector, disregard the timestamp we read from the - * permanent stats file; we should be willing to write a temp - * stats file immediately upon the first request from any - * backend. - */ - if (pgStatRunningInCollector) - dbentry->stats_timestamp = 0; - - /* - * Don't create tables/functions hashtables for uninteresting - * databases. - */ - if (onlydb != InvalidOid) + switch (t) + { + case 'S': + case 'N': { - if (dbbuf.databaseid != onlydb && - dbbuf.databaseid != InvalidOid) - break; - } + PgStat_HashKey key; + PgStatShared_HashEntry *p; + PgStatShared_Common *header; - hash_ctl.keysize = sizeof(Oid); - hash_ctl.entrysize = sizeof(PgStat_StatTabEntry); - hash_ctl.hcxt = pgStatLocalContext; - dbentry->tables = hash_create("Per-database table", - PGSTAT_TAB_HASH_SIZE, - &hash_ctl, - HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); - - hash_ctl.keysize = sizeof(Oid); - hash_ctl.entrysize = sizeof(PgStat_StatFuncEntry); - hash_ctl.hcxt = pgStatLocalContext; - dbentry->functions = hash_create("Per-database function", - PGSTAT_FUNCTION_HASH_SIZE, - &hash_ctl, - HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); - - /* - * If requested, read the data from the database-specific - * file. Otherwise we just leave the hashtables empty. - */ - if (deep) - pgstat_read_db_statsfile(dbentry->databaseid, - dbentry->tables, - dbentry->functions, - permanent); - - break; - - /* - * 'R' A PgStat_StatReplSlotEntry struct describing a - * replication slot follows. - */ - case 'R': - { - PgStat_StatReplSlotEntry slotbuf; - PgStat_StatReplSlotEntry *slotent; + CHECK_FOR_INTERRUPTS(); - if (fread(&slotbuf, 1, sizeof(PgStat_StatReplSlotEntry), fpin) - != sizeof(PgStat_StatReplSlotEntry)) + if (t == 'S') { - ereport(pgStatRunningInCollector ? LOG : WARNING, - (errmsg("corrupted statistics file \"%s\"", - statfile))); - goto done; - } + /* normal stats entry, identified by PgStat_HashKey */ + if (!read_chunk_s(fpin, &key)) + goto error; - /* Create hash table if we don't have it already. */ - if (replSlotStatHash == NULL) - { - HASHCTL hash_ctl; - - hash_ctl.keysize = sizeof(NameData); - hash_ctl.entrysize = sizeof(PgStat_StatReplSlotEntry); - hash_ctl.hcxt = pgStatLocalContext; - replSlotStatHash = hash_create("Replication slots hash", - PGSTAT_REPLSLOT_HASH_SIZE, - &hash_ctl, - HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + if (!pgstat_is_kind_valid(key.kind)) + goto error; } - - slotent = (PgStat_StatReplSlotEntry *) hash_search(replSlotStatHash, - (void *) &slotbuf.slotname, - HASH_ENTER, NULL); - memcpy(slotent, &slotbuf, sizeof(PgStat_StatReplSlotEntry)); - break; - } - - /* - * 'S' A PgStat_StatSubEntry struct describing subscription - * statistics. - */ - case 'S': - { - PgStat_StatSubEntry subbuf; - PgStat_StatSubEntry *subentry; - - if (fread(&subbuf, 1, sizeof(PgStat_StatSubEntry), fpin) - != sizeof(PgStat_StatSubEntry)) - { - ereport(pgStatRunningInCollector ? LOG : WARNING, - (errmsg("corrupted statistics file \"%s\"", - statfile))); - goto done; - } - - if (subscriptionStatHash == NULL) + else { - HASHCTL hash_ctl; - - hash_ctl.keysize = sizeof(Oid); - hash_ctl.entrysize = sizeof(PgStat_StatSubEntry); - hash_ctl.hcxt = pgStatLocalContext; - subscriptionStatHash = hash_create("Subscription hash", - PGSTAT_SUBSCRIPTION_HASH_SIZE, - &hash_ctl, - HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); - } - - subentry = (PgStat_StatSubEntry *) hash_search(subscriptionStatHash, - (void *) &subbuf.subid, - HASH_ENTER, NULL); - - memcpy(subentry, &subbuf, sizeof(subbuf)); - break; - } - - case 'E': - goto done; + /* stats entry identified by name on disk (e.g. slots) */ + const PgStat_KindInfo *kind_info = NULL; + PgStat_Kind kind; + NameData name; - default: - ereport(pgStatRunningInCollector ? LOG : WARNING, - (errmsg("corrupted statistics file \"%s\"", - statfile))); - goto done; - } - } - -done: - FreeFile(fpin); - - /* If requested to read the permanent file, also get rid of it. */ - if (permanent) - { - elog(DEBUG2, "removing permanent stats file \"%s\"", statfile); - unlink(statfile); - } + if (!read_chunk_s(fpin, &kind)) + goto error; + if (!read_chunk_s(fpin, &name)) + goto error; + if (!pgstat_is_kind_valid(kind)) + goto error; - return dbhash; -} + kind_info = pgstat_get_kind_info(kind); + if (!kind_info->from_serialized_name) + goto error; -/* - * Reads in the existing statistics collector file for the given database, - * filling the passed-in tables and functions hash tables. - * - * As in pgstat_read_statsfiles, if the permanent file is requested, it is - * removed after reading. - * - * Note: this code has the ability to skip storing per-table or per-function - * data, if NULL is passed for the corresponding hashtable. That's not used - * at the moment though. - */ -static void -pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash, - bool permanent) -{ - PgStat_StatTabEntry *tabentry; - PgStat_StatTabEntry tabbuf; - PgStat_StatFuncEntry funcbuf; - PgStat_StatFuncEntry *funcentry; - FILE *fpin; - int32 format_id; - bool found; - char statfile[MAXPGPATH]; + if (!kind_info->from_serialized_name(&name, &key)) + { + /* skip over data for entry we don't care about */ + if (fseek(fpin, pgstat_get_entry_len(kind), SEEK_CUR) != 0) + goto error; - get_dbstat_filename(permanent, false, databaseid, statfile, MAXPGPATH); + continue; + } - /* - * Try to open the stats file. If it doesn't exist, the backends simply - * return zero for anything and the collector simply starts from scratch - * with empty counters. - * - * ENOENT is a possibility if the stats collector is not running or has - * not yet written the stats file the first time. Any other failure - * condition is suspicious. - */ - if ((fpin = AllocateFile(statfile, PG_BINARY_R)) == NULL) - { - if (errno != ENOENT) - ereport(pgStatRunningInCollector ? LOG : WARNING, - (errcode_for_file_access(), - errmsg("could not open statistics file \"%s\": %m", - statfile))); - return; - } - - /* - * Verify it's of the expected format. - */ - if (fread(&format_id, 1, sizeof(format_id), fpin) != sizeof(format_id) || - format_id != PGSTAT_FILE_FORMAT_ID) - { - ereport(pgStatRunningInCollector ? LOG : WARNING, - (errmsg("corrupted statistics file \"%s\"", statfile))); - goto done; - } - - /* - * We found an existing collector stats file. Read it and put all the - * hashtable entries into place. - */ - for (;;) - { - switch (fgetc(fpin)) - { - /* - * 'T' A PgStat_StatTabEntry follows. - */ - case 'T': - if (fread(&tabbuf, 1, sizeof(PgStat_StatTabEntry), - fpin) != sizeof(PgStat_StatTabEntry)) - { - ereport(pgStatRunningInCollector ? LOG : WARNING, - (errmsg("corrupted statistics file \"%s\"", - statfile))); - goto done; - } - - /* - * Skip if table data not wanted. - */ - if (tabhash == NULL) - break; + Assert(key.kind == kind); + } - tabentry = (PgStat_StatTabEntry *) hash_search(tabhash, - (void *) &tabbuf.tableid, - HASH_ENTER, &found); + /* + * This intentionally doesn't use pgstat_get_entry_ref() - + * putting all stats into checkpointer's + * pgStatEntryRefHash would be wasted effort and memory. + */ + p = dshash_find_or_insert(pgStatLocal.shared_hash, &key, &found); - if (found) - { - ereport(pgStatRunningInCollector ? LOG : WARNING, - (errmsg("corrupted statistics file \"%s\"", - statfile))); - goto done; - } + /* don't allow duplicate entries */ + if (found) + { + dshash_release_lock(pgStatLocal.shared_hash, p); + elog(WARNING, "found duplicate stats entry %d/%u/%u", + key.kind, key.dboid, key.objoid); + goto error; + } - memcpy(tabentry, &tabbuf, sizeof(tabbuf)); - break; + header = pgstat_init_entry(key.kind, p); + dshash_release_lock(pgStatLocal.shared_hash, p); - /* - * 'F' A PgStat_StatFuncEntry follows. - */ - case 'F': - if (fread(&funcbuf, 1, sizeof(PgStat_StatFuncEntry), - fpin) != sizeof(PgStat_StatFuncEntry)) - { - ereport(pgStatRunningInCollector ? LOG : WARNING, - (errmsg("corrupted statistics file \"%s\"", - statfile))); - goto done; - } + if (!read_chunk(fpin, + pgstat_get_entry_data(key.kind, header), + pgstat_get_entry_len(key.kind))) + goto error; - /* - * Skip if function data not wanted. - */ - if (funchash == NULL) break; - - funcentry = (PgStat_StatFuncEntry *) hash_search(funchash, - (void *) &funcbuf.functionid, - HASH_ENTER, &found); - - if (found) - { - ereport(pgStatRunningInCollector ? LOG : WARNING, - (errmsg("corrupted statistics file \"%s\"", - statfile))); - goto done; - } - - memcpy(funcentry, &funcbuf, sizeof(funcbuf)); - break; - - /* - * 'E' The EOF marker of a complete stats file. - */ - case 'E': - goto done; - - default: - ereport(pgStatRunningInCollector ? LOG : WARNING, - (errmsg("corrupted statistics file \"%s\"", - statfile))); - goto done; - } - } - -done: - FreeFile(fpin); - - if (permanent) - { - elog(DEBUG2, "removing permanent stats file \"%s\"", statfile); - unlink(statfile); - } -} - -/* - * Attempt to determine the timestamp of the last db statfile write. - * Returns true if successful; the timestamp is stored in *ts. The caller must - * rely on timestamp stored in *ts iff the function returns true. - * - * This needs to be careful about handling databases for which no stats file - * exists, such as databases without a stat entry or those not yet written: - * - * - if there's a database entry in the global file, return the corresponding - * stats_timestamp value. - * - * - if there's no db stat entry (e.g. for a new or inactive database), - * there's no stats_timestamp value, but also nothing to write so we return - * the timestamp of the global statfile. - */ -static bool -pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent, - TimestampTz *ts) -{ - PgStat_StatDBEntry dbentry; - PgStat_GlobalStats myGlobalStats; - PgStat_ArchiverStats myArchiverStats; - PgStat_WalStats myWalStats; - PgStat_SLRUStats mySLRUStats[SLRU_NUM_ELEMENTS]; - PgStat_StatReplSlotEntry myReplSlotStats; - PgStat_StatSubEntry mySubStats; - FILE *fpin; - int32 format_id; - const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename; - - /* - * Try to open the stats file. As above, anything but ENOENT is worthy of - * complaining about. - */ - if ((fpin = AllocateFile(statfile, PG_BINARY_R)) == NULL) - { - if (errno != ENOENT) - ereport(pgStatRunningInCollector ? LOG : WARNING, - (errcode_for_file_access(), - errmsg("could not open statistics file \"%s\": %m", - statfile))); - return false; - } - - /* - * Verify it's of the expected format. - */ - if (fread(&format_id, 1, sizeof(format_id), fpin) != sizeof(format_id) || - format_id != PGSTAT_FILE_FORMAT_ID) - { - ereport(pgStatRunningInCollector ? LOG : WARNING, - (errmsg("corrupted statistics file \"%s\"", statfile))); - FreeFile(fpin); - return false; - } - - /* - * Read global stats struct - */ - if (fread(&myGlobalStats, 1, sizeof(myGlobalStats), - fpin) != sizeof(myGlobalStats)) - { - ereport(pgStatRunningInCollector ? LOG : WARNING, - (errmsg("corrupted statistics file \"%s\"", statfile))); - FreeFile(fpin); - return false; - } - - /* - * Read archiver stats struct - */ - if (fread(&myArchiverStats, 1, sizeof(myArchiverStats), - fpin) != sizeof(myArchiverStats)) - { - ereport(pgStatRunningInCollector ? LOG : WARNING, - (errmsg("corrupted statistics file \"%s\"", statfile))); - FreeFile(fpin); - return false; - } - - /* - * Read WAL stats struct - */ - if (fread(&myWalStats, 1, sizeof(myWalStats), fpin) != sizeof(myWalStats)) - { - ereport(pgStatRunningInCollector ? LOG : WARNING, - (errmsg("corrupted statistics file \"%s\"", statfile))); - FreeFile(fpin); - return false; - } - - /* - * Read SLRU stats struct - */ - if (fread(mySLRUStats, 1, sizeof(mySLRUStats), fpin) != sizeof(mySLRUStats)) - { - ereport(pgStatRunningInCollector ? LOG : WARNING, - (errmsg("corrupted statistics file \"%s\"", statfile))); - FreeFile(fpin); - return false; - } - - /* By default, we're going to return the timestamp of the global file. */ - *ts = myGlobalStats.stats_timestamp; - - /* - * We found an existing collector stats file. Read it and look for a - * record for the requested database. If found, use its timestamp. - */ - for (;;) - { - switch (fgetc(fpin)) - { - /* - * 'D' A PgStat_StatDBEntry struct describing a database - * follows. - */ - case 'D': - if (fread(&dbentry, 1, offsetof(PgStat_StatDBEntry, tables), - fpin) != offsetof(PgStat_StatDBEntry, tables)) - { - ereport(pgStatRunningInCollector ? LOG : WARNING, - (errmsg("corrupted statistics file \"%s\"", - statfile))); - FreeFile(fpin); - return false; - } - - /* - * If this is the DB we're looking for, save its timestamp and - * we're done. - */ - if (dbentry.databaseid == databaseid) - { - *ts = dbentry.stats_timestamp; - goto done; - } - - break; - - /* - * 'R' A PgStat_StatReplSlotEntry struct describing a - * replication slot follows. - */ - case 'R': - if (fread(&myReplSlotStats, 1, sizeof(PgStat_StatReplSlotEntry), fpin) - != sizeof(PgStat_StatReplSlotEntry)) - { - ereport(pgStatRunningInCollector ? LOG : WARNING, - (errmsg("corrupted statistics file \"%s\"", - statfile))); - FreeFile(fpin); - return false; - } - break; - - /* - * 'S' A PgStat_StatSubEntry struct describing subscription - * statistics follows. - */ - case 'S': - if (fread(&mySubStats, 1, sizeof(PgStat_StatSubEntry), fpin) - != sizeof(PgStat_StatSubEntry)) - { - ereport(pgStatRunningInCollector ? LOG : WARNING, - (errmsg("corrupted statistics file \"%s\"", - statfile))); - FreeFile(fpin); - return false; } - break; - case 'E': goto done; default: - { - ereport(pgStatRunningInCollector ? LOG : WARNING, - (errmsg("corrupted statistics file \"%s\"", - statfile))); - FreeFile(fpin); - return false; - } + goto error; } } done: FreeFile(fpin); - return true; -} - -/* - * If not already done, read the statistics collector stats file into - * some hash tables. The results will be kept until pgstat_clear_snapshot() - * is called (typically, at end of transaction). - */ -static void -backend_read_statsfile(void) -{ - TimestampTz min_ts = 0; - TimestampTz ref_ts = 0; - Oid inquiry_db; - int count; - - pgstat_assert_is_up(); - - /* already read it? */ - if (pgStatDBHash) - return; - Assert(!pgStatRunningInCollector); - - /* - * In a normal backend, we check staleness of the data for our own DB, and - * so we send MyDatabaseId in inquiry messages. In the autovac launcher, - * check staleness of the shared-catalog data, and send InvalidOid in - * inquiry messages so as not to force writing unnecessary data. - */ - if (IsAutoVacuumLauncherProcess()) - inquiry_db = InvalidOid; - else - inquiry_db = MyDatabaseId; - - /* - * Loop until fresh enough stats file is available or we ran out of time. - * The stats inquiry message is sent repeatedly in case collector drops - * it; but not every single time, as that just swamps the collector. - */ - for (count = 0; count < PGSTAT_POLL_LOOP_COUNT; count++) - { - bool ok; - TimestampTz file_ts = 0; - TimestampTz cur_ts; - - CHECK_FOR_INTERRUPTS(); - - ok = pgstat_read_db_statsfile_timestamp(inquiry_db, false, &file_ts); - - cur_ts = GetCurrentTimestamp(); - /* Calculate min acceptable timestamp, if we didn't already */ - if (count == 0 || cur_ts < ref_ts) - { - /* - * We set the minimum acceptable timestamp to PGSTAT_STAT_INTERVAL - * msec before now. This indirectly ensures that the collector - * needn't write the file more often than PGSTAT_STAT_INTERVAL. In - * an autovacuum worker, however, we want a lower delay to avoid - * using stale data, so we use PGSTAT_RETRY_DELAY (since the - * number of workers is low, this shouldn't be a problem). - * - * We don't recompute min_ts after sleeping, except in the - * unlikely case that cur_ts went backwards. So we might end up - * accepting a file a bit older than PGSTAT_STAT_INTERVAL. In - * practice that shouldn't happen, though, as long as the sleep - * time is less than PGSTAT_STAT_INTERVAL; and we don't want to - * tell the collector that our cutoff time is less than what we'd - * actually accept. - */ - ref_ts = cur_ts; - if (IsAutoVacuumWorkerProcess()) - min_ts = TimestampTzPlusMilliseconds(ref_ts, - -PGSTAT_RETRY_DELAY); - else - min_ts = TimestampTzPlusMilliseconds(ref_ts, - -PGSTAT_STAT_INTERVAL); - } - - /* - * If the file timestamp is actually newer than cur_ts, we must have - * had a clock glitch (system time went backwards) or there is clock - * skew between our processor and the stats collector's processor. - * Accept the file, but send an inquiry message anyway to make - * pgstat_recv_inquiry do a sanity check on the collector's time. - */ - if (ok && file_ts > cur_ts) - { - /* - * A small amount of clock skew between processors isn't terribly - * surprising, but a large difference is worth logging. We - * arbitrarily define "large" as 1000 msec. - */ - if (file_ts >= TimestampTzPlusMilliseconds(cur_ts, 1000)) - { - char *filetime; - char *mytime; - - /* Copy because timestamptz_to_str returns a static buffer */ - filetime = pstrdup(timestamptz_to_str(file_ts)); - mytime = pstrdup(timestamptz_to_str(cur_ts)); - ereport(LOG, - (errmsg("statistics collector's time %s is later than backend local time %s", - filetime, mytime))); - pfree(filetime); - pfree(mytime); - } - - pgstat_send_inquiry(cur_ts, min_ts, inquiry_db); - break; - } - - /* Normal acceptance case: file is not older than cutoff time */ - if (ok && file_ts >= min_ts) - break; - - /* Not there or too old, so kick the collector and wait a bit */ - if ((count % PGSTAT_INQ_LOOP_COUNT) == 0) - pgstat_send_inquiry(cur_ts, min_ts, inquiry_db); - - pg_usleep(PGSTAT_RETRY_DELAY * 1000L); - } - - if (count >= PGSTAT_POLL_LOOP_COUNT) - ereport(LOG, - (errmsg("using stale statistics instead of current ones " - "because stats collector is not responding"))); - - /* - * Autovacuum launcher wants stats about all databases, but a shallow read - * is sufficient. Regular backends want a deep read for just the tables - * they can see (MyDatabaseId + shared catalogs). - */ - if (IsAutoVacuumLauncherProcess()) - pgStatDBHash = pgstat_read_statsfiles(InvalidOid, false, false); - else - pgStatDBHash = pgstat_read_statsfiles(MyDatabaseId, false, true); -} - -/* - * Do we need to write out any stats files? - */ -static bool -pgstat_write_statsfile_needed(void) -{ - if (pending_write_requests != NIL) - return true; - - /* Everything was written recently */ - return false; -} - -/* - * Checks whether stats for a particular DB need to be written to a file. - */ -static bool -pgstat_db_requested(Oid databaseid) -{ - /* - * If any requests are outstanding at all, we should write the stats for - * shared catalogs (the "database" with OID 0). This ensures that - * backends will see up-to-date stats for shared catalogs, even though - * they send inquiry messages mentioning only their own DB. - */ - if (databaseid == InvalidOid && pending_write_requests != NIL) - return true; - - /* Search to see if there's an open request to write this database. */ - if (list_member_oid(pending_write_requests, databaseid)) - return true; - - return false; -} - - -/* ------------------------------------------------------------ - * stats collector message processing functions - * ------------------------------------------------------------ - */ - -/* - * Process stat inquiry requests. - */ -static void -pgstat_recv_inquiry(PgStat_MsgInquiry *msg, int len) -{ - PgStat_StatDBEntry *dbentry; - - elog(DEBUG2, "received inquiry for database %u", msg->databaseid); - - /* - * If there's already a write request for this DB, there's nothing to do. - * - * Note that if a request is found, we return early and skip the below - * check for clock skew. This is okay, since the only way for a DB - * request to be present in the list is that we have been here since the - * last write round. It seems sufficient to check for clock skew once per - * write round. - */ - if (list_member_oid(pending_write_requests, msg->databaseid)) - return; - - /* - * Check to see if we last wrote this database at a time >= the requested - * cutoff time. If so, this is a stale request that was generated before - * we updated the DB file, and we don't need to do so again. - * - * If the requestor's local clock time is older than stats_timestamp, we - * should suspect a clock glitch, ie system time going backwards; though - * the more likely explanation is just delayed message receipt. It is - * worth expending a GetCurrentTimestamp call to be sure, since a large - * retreat in the system clock reading could otherwise cause us to neglect - * to update the stats file for a long time. - */ - dbentry = pgstat_get_db_entry(msg->databaseid, false); - if (dbentry == NULL) - { - /* - * We have no data for this DB. Enter a write request anyway so that - * the global stats will get updated. This is needed to prevent - * backend_read_statsfile from waiting for data that we cannot supply, - * in the case of a new DB that nobody has yet reported any stats for. - * See the behavior of pgstat_read_db_statsfile_timestamp. - */ - } - else if (msg->clock_time < dbentry->stats_timestamp) - { - TimestampTz cur_ts = GetCurrentTimestamp(); - - if (cur_ts < dbentry->stats_timestamp) - { - /* - * Sure enough, time went backwards. Force a new stats file write - * to get back in sync; but first, log a complaint. - */ - char *writetime; - char *mytime; - - /* Copy because timestamptz_to_str returns a static buffer */ - writetime = pstrdup(timestamptz_to_str(dbentry->stats_timestamp)); - mytime = pstrdup(timestamptz_to_str(cur_ts)); - ereport(LOG, - (errmsg("stats_timestamp %s is later than collector's time %s for database %u", - writetime, mytime, dbentry->databaseid))); - pfree(writetime); - pfree(mytime); - } - else - { - /* - * Nope, it's just an old request. Assuming msg's clock_time is - * >= its cutoff_time, it must be stale, so we can ignore it. - */ - return; - } - } - else if (msg->cutoff_time <= dbentry->stats_timestamp) - { - /* Stale request, ignore it */ - return; - } - - /* - * We need to write this DB, so create a request. - */ - pending_write_requests = lappend_oid(pending_write_requests, - msg->databaseid); -} - -/* - * Count what the backend has done. - */ -static void -pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len) -{ - PgStat_StatDBEntry *dbentry; - PgStat_StatTabEntry *tabentry; - int i; - bool found; - - dbentry = pgstat_get_db_entry(msg->m_databaseid, true); - - /* - * Update database-wide stats. - */ - dbentry->n_xact_commit += (PgStat_Counter) (msg->m_xact_commit); - dbentry->n_xact_rollback += (PgStat_Counter) (msg->m_xact_rollback); - dbentry->n_block_read_time += msg->m_block_read_time; - dbentry->n_block_write_time += msg->m_block_write_time; - - dbentry->total_session_time += msg->m_session_time; - dbentry->total_active_time += msg->m_active_time; - dbentry->total_idle_in_xact_time += msg->m_idle_in_xact_time; - - /* - * Process all table entries in the message. - */ - for (i = 0; i < msg->m_nentries; i++) - { - PgStat_TableEntry *tabmsg = &(msg->m_entry[i]); - - tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables, - (void *) &(tabmsg->t_id), - HASH_ENTER, &found); - - if (!found) - { - /* - * If it's a new table entry, initialize counters to the values we - * just got. - */ - tabentry->numscans = tabmsg->t_counts.t_numscans; - tabentry->tuples_returned = tabmsg->t_counts.t_tuples_returned; - tabentry->tuples_fetched = tabmsg->t_counts.t_tuples_fetched; - tabentry->tuples_inserted = tabmsg->t_counts.t_tuples_inserted; - tabentry->tuples_updated = tabmsg->t_counts.t_tuples_updated; - tabentry->tuples_deleted = tabmsg->t_counts.t_tuples_deleted; - tabentry->tuples_hot_updated = tabmsg->t_counts.t_tuples_hot_updated; - tabentry->n_live_tuples = tabmsg->t_counts.t_delta_live_tuples; - tabentry->n_dead_tuples = tabmsg->t_counts.t_delta_dead_tuples; - tabentry->changes_since_analyze = tabmsg->t_counts.t_changed_tuples; - tabentry->inserts_since_vacuum = tabmsg->t_counts.t_tuples_inserted; - tabentry->blocks_fetched = tabmsg->t_counts.t_blocks_fetched; - tabentry->blocks_hit = tabmsg->t_counts.t_blocks_hit; - - tabentry->vacuum_timestamp = 0; - tabentry->vacuum_count = 0; - tabentry->autovac_vacuum_timestamp = 0; - tabentry->autovac_vacuum_count = 0; - tabentry->analyze_timestamp = 0; - tabentry->analyze_count = 0; - tabentry->autovac_analyze_timestamp = 0; - tabentry->autovac_analyze_count = 0; - } - else - { - /* - * Otherwise add the values to the existing entry. - */ - tabentry->numscans += tabmsg->t_counts.t_numscans; - tabentry->tuples_returned += tabmsg->t_counts.t_tuples_returned; - tabentry->tuples_fetched += tabmsg->t_counts.t_tuples_fetched; - tabentry->tuples_inserted += tabmsg->t_counts.t_tuples_inserted; - tabentry->tuples_updated += tabmsg->t_counts.t_tuples_updated; - tabentry->tuples_deleted += tabmsg->t_counts.t_tuples_deleted; - tabentry->tuples_hot_updated += tabmsg->t_counts.t_tuples_hot_updated; - - /* - * If table was truncated/dropped, first reset the live/dead - * counters. - */ - if (tabmsg->t_counts.t_truncdropped) - { - tabentry->n_live_tuples = 0; - tabentry->n_dead_tuples = 0; - tabentry->inserts_since_vacuum = 0; - } - tabentry->n_live_tuples += tabmsg->t_counts.t_delta_live_tuples; - tabentry->n_dead_tuples += tabmsg->t_counts.t_delta_dead_tuples; - tabentry->changes_since_analyze += tabmsg->t_counts.t_changed_tuples; - tabentry->inserts_since_vacuum += tabmsg->t_counts.t_tuples_inserted; - tabentry->blocks_fetched += tabmsg->t_counts.t_blocks_fetched; - tabentry->blocks_hit += tabmsg->t_counts.t_blocks_hit; - } - - /* Clamp n_live_tuples in case of negative delta_live_tuples */ - tabentry->n_live_tuples = Max(tabentry->n_live_tuples, 0); - /* Likewise for n_dead_tuples */ - tabentry->n_dead_tuples = Max(tabentry->n_dead_tuples, 0); - - /* - * Add per-table stats to the per-database entry, too. - */ - dbentry->n_tuples_returned += tabmsg->t_counts.t_tuples_returned; - dbentry->n_tuples_fetched += tabmsg->t_counts.t_tuples_fetched; - dbentry->n_tuples_inserted += tabmsg->t_counts.t_tuples_inserted; - dbentry->n_tuples_updated += tabmsg->t_counts.t_tuples_updated; - dbentry->n_tuples_deleted += tabmsg->t_counts.t_tuples_deleted; - dbentry->n_blocks_fetched += tabmsg->t_counts.t_blocks_fetched; - dbentry->n_blocks_hit += tabmsg->t_counts.t_blocks_hit; - } -} - -/* - * Arrange for dead table removal. - */ -static void -pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len) -{ - PgStat_StatDBEntry *dbentry; - int i; - - dbentry = pgstat_get_db_entry(msg->m_databaseid, false); - - /* - * No need to purge if we don't even know the database. - */ - if (!dbentry || !dbentry->tables) - return; - - /* - * Process all table entries in the message. - */ - for (i = 0; i < msg->m_nentries; i++) - { - /* Remove from hashtable if present; we don't care if it's not. */ - (void) hash_search(dbentry->tables, - (void *) &(msg->m_tableid[i]), - HASH_REMOVE, NULL); - } -} - -/* - * Arrange for dead database removal - */ -static void -pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len) -{ - Oid dbid = msg->m_databaseid; - PgStat_StatDBEntry *dbentry; - - /* - * Lookup the database in the hashtable. - */ - dbentry = pgstat_get_db_entry(dbid, false); - - /* - * If found, remove it (along with the db statfile). - */ - if (dbentry) - { - char statfile[MAXPGPATH]; - - get_dbstat_filename(false, false, dbid, statfile, MAXPGPATH); - - elog(DEBUG2, "removing stats file \"%s\"", statfile); - unlink(statfile); - - if (dbentry->tables != NULL) - hash_destroy(dbentry->tables); - if (dbentry->functions != NULL) - hash_destroy(dbentry->functions); - - if (hash_search(pgStatDBHash, - (void *) &dbid, - HASH_REMOVE, NULL) == NULL) - ereport(ERROR, - (errmsg("database hash table corrupted during cleanup --- abort"))); - } -} - -/* - * Reset the statistics for the specified database. - */ -static void -pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len) -{ - PgStat_StatDBEntry *dbentry; - - /* - * Lookup the database in the hashtable. Nothing to do if not there. - */ - dbentry = pgstat_get_db_entry(msg->m_databaseid, false); - - if (!dbentry) - return; - - /* - * We simply throw away all the database's table entries by recreating a - * new hash table for them. - */ - if (dbentry->tables != NULL) - hash_destroy(dbentry->tables); - if (dbentry->functions != NULL) - hash_destroy(dbentry->functions); - - dbentry->tables = NULL; - dbentry->functions = NULL; - - /* - * Reset database-level stats, too. This creates empty hash tables for - * tables and functions. - */ - reset_dbentry_counters(dbentry); -} - -/* - * Reset some shared statistics of the cluster. - */ -static void -pgstat_recv_resetsharedcounter(PgStat_MsgResetsharedcounter *msg, int len) -{ - if (msg->m_resettarget == PGSTAT_KIND_BGWRITER || - msg->m_resettarget == PGSTAT_KIND_CHECKPOINTER) - { - /* - * Reset the global, bgwriter and checkpointer statistics for the - * cluster. - */ - memset(&globalStats, 0, sizeof(globalStats)); - globalStats.bgwriter.stat_reset_timestamp = GetCurrentTimestamp(); - } - else if (msg->m_resettarget == PGSTAT_KIND_ARCHIVER) - { - /* Reset the archiver statistics for the cluster. */ - memset(&archiverStats, 0, sizeof(archiverStats)); - archiverStats.stat_reset_timestamp = GetCurrentTimestamp(); - } - else if (msg->m_resettarget == PGSTAT_KIND_WAL) - { - /* Reset the WAL statistics for the cluster. */ - memset(&walStats, 0, sizeof(walStats)); - walStats.stat_reset_timestamp = GetCurrentTimestamp(); - } - - /* - * Presumably the sender of this message validated the target, don't - * complain here if it's not valid - */ -} - -/* - * Reset a statistics for a single object, which may be of current - * database or shared across all databases in the cluster. - */ -static void -pgstat_recv_resetsinglecounter(PgStat_MsgResetsinglecounter *msg, int len) -{ - PgStat_StatDBEntry *dbentry; - - if (IsSharedRelation(msg->m_objectid)) - dbentry = pgstat_get_db_entry(InvalidOid, false); - else - dbentry = pgstat_get_db_entry(msg->m_databaseid, false); - - if (!dbentry) - return; - - /* Set the reset timestamp for the whole database */ - dbentry->stat_reset_timestamp = GetCurrentTimestamp(); - - /* Remove object if it exists, ignore it if not */ - if (msg->m_resettype == PGSTAT_KIND_RELATION) - (void) hash_search(dbentry->tables, (void *) &(msg->m_objectid), - HASH_REMOVE, NULL); - else if (msg->m_resettype == PGSTAT_KIND_FUNCTION) - (void) hash_search(dbentry->functions, (void *) &(msg->m_objectid), - HASH_REMOVE, NULL); -} - -/* - * Reset some SLRU statistics of the cluster. - */ -static void -pgstat_recv_resetslrucounter(PgStat_MsgResetslrucounter *msg, int len) -{ - int i; - TimestampTz ts = GetCurrentTimestamp(); - - for (i = 0; i < SLRU_NUM_ELEMENTS; i++) - { - /* reset entry with the given index, or all entries (index is -1) */ - if ((msg->m_index == -1) || (msg->m_index == i)) - { - memset(&slruStats[i], 0, sizeof(slruStats[i])); - slruStats[i].stat_reset_timestamp = ts; - } - } -} - -/* - * Reset some replication slot statistics of the cluster. - */ -static void -pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg, - int len) -{ - PgStat_StatReplSlotEntry *slotent; - TimestampTz ts; - - /* Return if we don't have replication slot statistics */ - if (replSlotStatHash == NULL) - return; - - ts = GetCurrentTimestamp(); - if (msg->clearall) - { - HASH_SEQ_STATUS sstat; - - hash_seq_init(&sstat, replSlotStatHash); - while ((slotent = (PgStat_StatReplSlotEntry *) hash_seq_search(&sstat)) != NULL) - pgstat_reset_replslot_entry(slotent, ts); - } - else - { - /* Get the slot statistics to reset */ - slotent = pgstat_get_replslot_entry(msg->m_slotname, false); - - /* - * Nothing to do if the given slot entry is not found. This could - * happen when the slot with the given name is removed and the - * corresponding statistics entry is also removed before receiving the - * reset message. - */ - if (!slotent) - return; - - /* Reset the stats for the requested replication slot */ - pgstat_reset_replslot_entry(slotent, ts); - } -} - -/* - * Reset some subscription statistics of the cluster. - */ -static void -pgstat_recv_resetsubcounter(PgStat_MsgResetsubcounter *msg, int len) -{ - PgStat_StatSubEntry *subentry; - TimestampTz ts; - - /* Return if we don't have replication subscription statistics */ - if (subscriptionStatHash == NULL) - return; - - ts = GetCurrentTimestamp(); - if (!OidIsValid(msg->m_subid)) - { - HASH_SEQ_STATUS sstat; - - /* Clear all subscription counters */ - hash_seq_init(&sstat, subscriptionStatHash); - while ((subentry = (PgStat_StatSubEntry *) hash_seq_search(&sstat)) != NULL) - pgstat_reset_subscription(subentry, ts); - } - else - { - /* Get the subscription statistics to reset */ - subentry = pgstat_get_subscription_entry(msg->m_subid, false); - - /* - * Nothing to do if the given subscription entry is not found. This - * could happen when the subscription with the subid is removed and - * the corresponding statistics entry is also removed before receiving - * the reset message. - */ - if (!subentry) - return; - - /* Reset the stats for the requested subscription */ - pgstat_reset_subscription(subentry, ts); - } -} - -/* - * Process an autovacuum signaling message. - */ -static void -pgstat_recv_autovac(PgStat_MsgAutovacStart *msg, int len) -{ - PgStat_StatDBEntry *dbentry; - - /* - * Store the last autovacuum time in the database's hashtable entry. - */ - dbentry = pgstat_get_db_entry(msg->m_databaseid, true); - - dbentry->last_autovac_time = msg->m_start_time; -} - -/* - * Process a VACUUM message. - */ -static void -pgstat_recv_vacuum(PgStat_MsgVacuum *msg, int len) -{ - PgStat_StatDBEntry *dbentry; - PgStat_StatTabEntry *tabentry; - - /* - * Store the data in the table's hashtable entry. - */ - dbentry = pgstat_get_db_entry(msg->m_databaseid, true); - - tabentry = pgstat_get_tab_entry(dbentry, msg->m_tableoid, true); - tabentry->n_live_tuples = msg->m_live_tuples; - tabentry->n_dead_tuples = msg->m_dead_tuples; + elog(DEBUG2, "removing permanent stats file \"%s\"", statfile); + unlink(statfile); - /* - * It is quite possible that a non-aggressive VACUUM ended up skipping - * various pages, however, we'll zero the insert counter here regardless. - * It's currently used only to track when we need to perform an "insert" - * autovacuum, which are mainly intended to freeze newly inserted tuples. - * Zeroing this may just mean we'll not try to vacuum the table again - * until enough tuples have been inserted to trigger another insert - * autovacuum. An anti-wraparound autovacuum will catch any persistent - * stragglers. - */ - tabentry->inserts_since_vacuum = 0; - - if (msg->m_autovacuum) - { - tabentry->autovac_vacuum_timestamp = msg->m_vacuumtime; - tabentry->autovac_vacuum_count++; - } - else - { - tabentry->vacuum_timestamp = msg->m_vacuumtime; - tabentry->vacuum_count++; - } -} - -/* - * Process an ANALYZE message. - */ -static void -pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len) -{ - PgStat_StatDBEntry *dbentry; - PgStat_StatTabEntry *tabentry; - - /* - * Store the data in the table's hashtable entry. - */ - dbentry = pgstat_get_db_entry(msg->m_databaseid, true); - - tabentry = pgstat_get_tab_entry(dbentry, msg->m_tableoid, true); - - tabentry->n_live_tuples = msg->m_live_tuples; - tabentry->n_dead_tuples = msg->m_dead_tuples; - - /* - * If commanded, reset changes_since_analyze to zero. This forgets any - * changes that were committed while the ANALYZE was in progress, but we - * have no good way to estimate how many of those there were. - */ - if (msg->m_resetcounter) - tabentry->changes_since_analyze = 0; - - if (msg->m_autovacuum) - { - tabentry->autovac_analyze_timestamp = msg->m_analyzetime; - tabentry->autovac_analyze_count++; - } - else - { - tabentry->analyze_timestamp = msg->m_analyzetime; - tabentry->analyze_count++; - } -} - -/* - * Process a ARCHIVER message. - */ -static void -pgstat_recv_archiver(PgStat_MsgArchiver *msg, int len) -{ - if (msg->m_failed) - { - /* Failed archival attempt */ - ++archiverStats.failed_count; - memcpy(archiverStats.last_failed_wal, msg->m_xlog, - sizeof(archiverStats.last_failed_wal)); - archiverStats.last_failed_timestamp = msg->m_timestamp; - } - else - { - /* Successful archival operation */ - ++archiverStats.archived_count; - memcpy(archiverStats.last_archived_wal, msg->m_xlog, - sizeof(archiverStats.last_archived_wal)); - archiverStats.last_archived_timestamp = msg->m_timestamp; - } -} - -/* - * Process a BGWRITER message. - */ -static void -pgstat_recv_bgwriter(PgStat_MsgBgWriter *msg, int len) -{ - globalStats.bgwriter.buf_written_clean += msg->m_buf_written_clean; - globalStats.bgwriter.maxwritten_clean += msg->m_maxwritten_clean; - globalStats.bgwriter.buf_alloc += msg->m_buf_alloc; -} - -/* - * Process a CHECKPOINTER message. - */ -static void -pgstat_recv_checkpointer(PgStat_MsgCheckpointer *msg, int len) -{ - globalStats.checkpointer.timed_checkpoints += msg->m_timed_checkpoints; - globalStats.checkpointer.requested_checkpoints += msg->m_requested_checkpoints; - globalStats.checkpointer.checkpoint_write_time += msg->m_checkpoint_write_time; - globalStats.checkpointer.checkpoint_sync_time += msg->m_checkpoint_sync_time; - globalStats.checkpointer.buf_written_checkpoints += msg->m_buf_written_checkpoints; - globalStats.checkpointer.buf_written_backend += msg->m_buf_written_backend; - globalStats.checkpointer.buf_fsync_backend += msg->m_buf_fsync_backend; -} - -/* - * Process a WAL message. - */ -static void -pgstat_recv_wal(PgStat_MsgWal *msg, int len) -{ - walStats.wal_records += msg->m_wal_records; - walStats.wal_fpi += msg->m_wal_fpi; - walStats.wal_bytes += msg->m_wal_bytes; - walStats.wal_buffers_full += msg->m_wal_buffers_full; - walStats.wal_write += msg->m_wal_write; - walStats.wal_sync += msg->m_wal_sync; - walStats.wal_write_time += msg->m_wal_write_time; - walStats.wal_sync_time += msg->m_wal_sync_time; -} - -/* - * Process a SLRU message. - */ -static void -pgstat_recv_slru(PgStat_MsgSLRU *msg, int len) -{ - slruStats[msg->m_index].blocks_zeroed += msg->m_blocks_zeroed; - slruStats[msg->m_index].blocks_hit += msg->m_blocks_hit; - slruStats[msg->m_index].blocks_read += msg->m_blocks_read; - slruStats[msg->m_index].blocks_written += msg->m_blocks_written; - slruStats[msg->m_index].blocks_exists += msg->m_blocks_exists; - slruStats[msg->m_index].flush += msg->m_flush; - slruStats[msg->m_index].truncate += msg->m_truncate; -} - -/* - * Process a RECOVERYCONFLICT message. - */ -static void -pgstat_recv_recoveryconflict(PgStat_MsgRecoveryConflict *msg, int len) -{ - PgStat_StatDBEntry *dbentry; - - dbentry = pgstat_get_db_entry(msg->m_databaseid, true); - - switch (msg->m_reason) - { - case PROCSIG_RECOVERY_CONFLICT_DATABASE: - - /* - * Since we drop the information about the database as soon as it - * replicates, there is no point in counting these conflicts. - */ - break; - case PROCSIG_RECOVERY_CONFLICT_TABLESPACE: - dbentry->n_conflict_tablespace++; - break; - case PROCSIG_RECOVERY_CONFLICT_LOCK: - dbentry->n_conflict_lock++; - break; - case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT: - dbentry->n_conflict_snapshot++; - break; - case PROCSIG_RECOVERY_CONFLICT_BUFFERPIN: - dbentry->n_conflict_bufferpin++; - break; - case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK: - dbentry->n_conflict_startup_deadlock++; - break; - } -} - -/* - * Process a DEADLOCK message. - */ -static void -pgstat_recv_deadlock(PgStat_MsgDeadlock *msg, int len) -{ - PgStat_StatDBEntry *dbentry; - - dbentry = pgstat_get_db_entry(msg->m_databaseid, true); - - dbentry->n_deadlocks++; -} - -/* - * Process a CHECKSUMFAILURE message. - */ -static void -pgstat_recv_checksum_failure(PgStat_MsgChecksumFailure *msg, int len) -{ - PgStat_StatDBEntry *dbentry; - - dbentry = pgstat_get_db_entry(msg->m_databaseid, true); - - dbentry->n_checksum_failures += msg->m_failurecount; - dbentry->last_checksum_failure = msg->m_failure_time; -} - -/* - * Process a REPLSLOT message. - */ -static void -pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len) -{ - if (msg->m_drop) - { - Assert(!msg->m_create); - - /* Remove the replication slot statistics with the given name */ - if (replSlotStatHash != NULL) - (void) hash_search(replSlotStatHash, - (void *) &(msg->m_slotname), - HASH_REMOVE, - NULL); - } - else - { - PgStat_StatReplSlotEntry *slotent; - - slotent = pgstat_get_replslot_entry(msg->m_slotname, true); - Assert(slotent); - - if (msg->m_create) - { - /* - * If the message for dropping the slot with the same name gets - * lost, slotent has stats for the old slot. So we initialize all - * counters at slot creation. - */ - pgstat_reset_replslot_entry(slotent, 0); - } - else - { - /* Update the replication slot statistics */ - slotent->spill_txns += msg->m_spill_txns; - slotent->spill_count += msg->m_spill_count; - slotent->spill_bytes += msg->m_spill_bytes; - slotent->stream_txns += msg->m_stream_txns; - slotent->stream_count += msg->m_stream_count; - slotent->stream_bytes += msg->m_stream_bytes; - slotent->total_txns += msg->m_total_txns; - slotent->total_bytes += msg->m_total_bytes; - } - } -} - -/* - * Process a CONNECT message. - */ -static void -pgstat_recv_connect(PgStat_MsgConnect *msg, int len) -{ - PgStat_StatDBEntry *dbentry; - - dbentry = pgstat_get_db_entry(msg->m_databaseid, true); - dbentry->n_sessions++; -} - -/* - * Process a DISCONNECT message. - */ -static void -pgstat_recv_disconnect(PgStat_MsgDisconnect *msg, int len) -{ - PgStat_StatDBEntry *dbentry; - - dbentry = pgstat_get_db_entry(msg->m_databaseid, true); - - switch (msg->m_cause) - { - case DISCONNECT_NOT_YET: - case DISCONNECT_NORMAL: - /* we don't collect these */ - break; - case DISCONNECT_CLIENT_EOF: - dbentry->n_sessions_abandoned++; - break; - case DISCONNECT_FATAL: - dbentry->n_sessions_fatal++; - break; - case DISCONNECT_KILLED: - dbentry->n_sessions_killed++; - break; - } -} + return; -/* - * Process a TEMPFILE message. - */ -static void -pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len) -{ - PgStat_StatDBEntry *dbentry; +error: + ereport(LOG, + (errmsg("corrupted statistics file \"%s\"", statfile))); - dbentry = pgstat_get_db_entry(msg->m_databaseid, true); + /* Set the current timestamp as reset timestamp */ + pgstat_reset_after_failure(ts); - dbentry->n_temp_bytes += msg->m_filesize; - dbentry->n_temp_files += 1; + goto done; } /* - * Count what the backend has done. + * Helper to reset / drop stats after restoring stats from disk failed, + * potentially after already loading parts. */ static void -pgstat_recv_funcstat(PgStat_MsgFuncstat *msg, int len) +pgstat_reset_after_failure(TimestampTz ts) { - PgStat_FunctionEntry *funcmsg = &(msg->m_entry[0]); - PgStat_StatDBEntry *dbentry; - PgStat_StatFuncEntry *funcentry; - int i; - bool found; - - dbentry = pgstat_get_db_entry(msg->m_databaseid, true); - - /* - * Process all function entries in the message. - */ - for (i = 0; i < msg->m_nentries; i++, funcmsg++) + /* reset fixed-numbered stats */ + for (int kind = PGSTAT_KIND_FIRST_VALID; kind <= PGSTAT_KIND_LAST; kind++) { - funcentry = (PgStat_StatFuncEntry *) hash_search(dbentry->functions, - (void *) &(funcmsg->f_id), - HASH_ENTER, &found); - - if (!found) - { - /* - * If it's a new function entry, initialize counters to the values - * we just got. - */ - funcentry->f_numcalls = funcmsg->f_numcalls; - funcentry->f_total_time = funcmsg->f_total_time; - funcentry->f_self_time = funcmsg->f_self_time; - } - else - { - /* - * Otherwise add the values to the existing entry. - */ - funcentry->f_numcalls += funcmsg->f_numcalls; - funcentry->f_total_time += funcmsg->f_total_time; - funcentry->f_self_time += funcmsg->f_self_time; - } - } -} + const PgStat_KindInfo *kind_info = pgstat_get_kind_info(kind); -/* - * Arrange for dead function removal. - */ -static void -pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len) -{ - PgStat_StatDBEntry *dbentry; - int i; - - dbentry = pgstat_get_db_entry(msg->m_databaseid, false); - - /* - * No need to purge if we don't even know the database. - */ - if (!dbentry || !dbentry->functions) - return; + if (!kind_info->fixed_amount) + continue; - /* - * Process all function entries in the message. - */ - for (i = 0; i < msg->m_nentries; i++) - { - /* Remove from hashtable if present; we don't care if it's not. */ - (void) hash_search(dbentry->functions, - (void *) &(msg->m_functionid[i]), - HASH_REMOVE, NULL); + kind_info->reset_all_cb(ts); } -} -/* - * Process a SUBSCRIPTIONDROP message. - */ -static void -pgstat_recv_subscription_drop(PgStat_MsgSubscriptionDrop *msg, int len) -{ - /* Return if we don't have replication subscription statistics */ - if (subscriptionStatHash == NULL) - return; - - /* Remove from hashtable if present; we don't care if it's not */ - (void) hash_search(subscriptionStatHash, (void *) &(msg->m_subid), - HASH_REMOVE, NULL); -} - -/* - * Process a SUBSCRIPTIONERROR message. - */ -static void -pgstat_recv_subscription_error(PgStat_MsgSubscriptionError *msg, int len) -{ - PgStat_StatSubEntry *subentry; - - /* Get the subscription stats */ - subentry = pgstat_get_subscription_entry(msg->m_subid, true); - Assert(subentry); - - if (msg->m_is_apply_error) - subentry->apply_error_count++; - else - subentry->sync_error_count++; + /* and drop variable-numbered ones */ + pgstat_drop_all_entries(); } |