diff options
Diffstat (limited to 'src/backend/postmaster/pgstat.c')
-rw-r--r-- | src/backend/postmaster/pgstat.c | 656 |
1 files changed, 307 insertions, 349 deletions
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 0646f530985..53ddd930e6e 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -106,7 +106,7 @@ #define PGSTAT_DB_HASH_SIZE 16 #define PGSTAT_TAB_HASH_SIZE 512 #define PGSTAT_FUNCTION_HASH_SIZE 512 -#define PGSTAT_SUBWORKER_HASH_SIZE 32 +#define PGSTAT_SUBSCRIPTION_HASH_SIZE 32 #define PGSTAT_REPLSLOT_HASH_SIZE 32 @@ -284,6 +284,7 @@ static PgStat_GlobalStats globalStats; static PgStat_WalStats walStats; static PgStat_SLRUStats slruStats[SLRU_NUM_ELEMENTS]; static HTAB *replSlotStatHash = NULL; +static HTAB *subscriptionStatHash = NULL; /* * List of OIDs of databases we need to write out. If an entry is InvalidOid, @@ -322,14 +323,13 @@ NON_EXEC_STATIC void PgstatCollectorMain(int argc, char *argv[]) pg_attribute_no static PgStat_StatDBEntry *pgstat_get_db_entry(Oid databaseid, bool create); static PgStat_StatTabEntry *pgstat_get_tab_entry(PgStat_StatDBEntry *dbentry, Oid tableoid, bool create); -static PgStat_StatSubWorkerEntry *pgstat_get_subworker_entry(PgStat_StatDBEntry *dbentry, - Oid subid, Oid subrelid, - bool create); +static PgStat_StatSubEntry *pgstat_get_subscription_entry(Oid subid, bool create); +static void pgstat_reset_subscription(PgStat_StatSubEntry *subentry, TimestampTz ts); static void pgstat_write_statsfiles(bool permanent, bool allDbs); static void pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent); static HTAB *pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep); static void pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash, - HTAB *subworkerhash, bool permanent); + bool permanent); static void backend_read_statsfile(void); static bool pgstat_write_statsfile_needed(void); @@ -341,7 +341,6 @@ static void pgstat_reset_replslot(PgStat_StatReplSlotEntry *slotstats, Timestamp static void pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg, TimestampTz now); static void pgstat_send_funcstats(void); static void pgstat_send_slru(void); -static void pgstat_send_subscription_purge(PgStat_MsgSubscriptionPurge *msg); static HTAB *pgstat_collect_oids(Oid catalogid, AttrNumber anum_oid); static bool pgstat_should_report_connstat(void); static void pgstat_report_disconnect(Oid dboid); @@ -363,6 +362,7 @@ static void pgstat_recv_resetsharedcounter(PgStat_MsgResetsharedcounter *msg, in static void pgstat_recv_resetsinglecounter(PgStat_MsgResetsinglecounter *msg, int len); static void pgstat_recv_resetslrucounter(PgStat_MsgResetslrucounter *msg, int len); static void pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg, int len); +static void pgstat_recv_resetsubcounter(PgStat_MsgResetsubcounter *msg, int len); static void pgstat_recv_autovac(PgStat_MsgAutovacStart *msg, int len); static void pgstat_recv_vacuum(PgStat_MsgVacuum *msg, int len); static void pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len); @@ -380,8 +380,8 @@ static void pgstat_recv_connect(PgStat_MsgConnect *msg, int len); static void pgstat_recv_disconnect(PgStat_MsgDisconnect *msg, int len); static void pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len); static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len); -static void pgstat_recv_subscription_purge(PgStat_MsgSubscriptionPurge *msg, int len); -static void pgstat_recv_subworker_error(PgStat_MsgSubWorkerError *msg, int len); +static void pgstat_recv_subscription_drop(PgStat_MsgSubscriptionDrop *msg, int len); +static void pgstat_recv_subscription_error(PgStat_MsgSubscriptionError *msg, int len); /* ------------------------------------------------------------ * Public functions called from postmaster follow @@ -1188,6 +1188,32 @@ pgstat_vacuum_stat(void) } /* + * Repeat the above steps for subscriptions, if subscription stats are + * being collected. + */ + if (subscriptionStatHash) + { + PgStat_StatSubEntry *subentry; + + /* + * Read pg_subscription and make a list of OIDs of all existing + * subscriptions. + */ + htab = pgstat_collect_oids(SubscriptionRelationId, Anum_pg_subscription_oid); + + hash_seq_init(&hstat, subscriptionStatHash); + while ((subentry = (PgStat_StatSubEntry *) hash_seq_search(&hstat)) != NULL) + { + CHECK_FOR_INTERRUPTS(); + + if (hash_search(htab, (void *) &(subentry->subid), HASH_FIND, NULL) == NULL) + pgstat_report_subscription_drop(subentry->subid); + } + + hash_destroy(htab); + } + + /* * Lookup our own database entry; if not found, nothing more to do. */ dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash, @@ -1311,74 +1337,6 @@ pgstat_vacuum_stat(void) hash_destroy(htab); } - - /* - * Repeat for subscription workers. Similarly, we needn't bother in the - * common case where no subscription workers' stats are being collected. - */ - if (dbentry->subworkers != NULL && - hash_get_num_entries(dbentry->subworkers) > 0) - { - PgStat_StatSubWorkerEntry *subwentry; - PgStat_MsgSubscriptionPurge spmsg; - - /* - * Read pg_subscription and make a list of OIDs of all existing - * subscriptions - */ - htab = pgstat_collect_oids(SubscriptionRelationId, Anum_pg_subscription_oid); - - spmsg.m_databaseid = MyDatabaseId; - spmsg.m_nentries = 0; - - hash_seq_init(&hstat, dbentry->subworkers); - while ((subwentry = (PgStat_StatSubWorkerEntry *) hash_seq_search(&hstat)) != NULL) - { - bool exists = false; - Oid subid = subwentry->key.subid; - - CHECK_FOR_INTERRUPTS(); - - if (hash_search(htab, (void *) &subid, HASH_FIND, NULL) != NULL) - continue; - - /* - * It is possible that we have multiple entries for the - * subscription corresponding to apply worker and tablesync - * workers. In such cases, we don't need to add the same subid - * again. - */ - for (int i = 0; i < spmsg.m_nentries; i++) - { - if (spmsg.m_subids[i] == subid) - { - exists = true; - break; - } - } - - if (exists) - continue; - - /* This subscription is dead, add the subid to the message */ - spmsg.m_subids[spmsg.m_nentries++] = subid; - - /* - * If the message is full, send it out and reinitialize to empty - */ - if (spmsg.m_nentries >= PGSTAT_NUM_SUBSCRIPTIONPURGE) - { - pgstat_send_subscription_purge(&spmsg); - spmsg.m_nentries = 0; - } - } - - /* Send the rest of dead subscriptions */ - if (spmsg.m_nentries > 0) - pgstat_send_subscription_purge(&spmsg); - - hash_destroy(htab); - } } @@ -1551,8 +1509,7 @@ pgstat_reset_shared_counters(const char *target) * ---------- */ void -pgstat_reset_single_counter(Oid objoid, Oid subobjoid, - PgStat_Single_Reset_Type type) +pgstat_reset_single_counter(Oid objoid, PgStat_Single_Reset_Type type) { PgStat_MsgResetsinglecounter msg; @@ -1563,7 +1520,6 @@ pgstat_reset_single_counter(Oid objoid, Oid subobjoid, msg.m_databaseid = MyDatabaseId; msg.m_resettype = type; msg.m_objectid = objoid; - msg.m_subobjectid = subobjoid; pgstat_send(&msg, sizeof(msg)); } @@ -1624,6 +1580,30 @@ pgstat_reset_replslot_counter(const char *name) } /* ---------- + * pgstat_reset_subscription_counter() - + * + * Tell the statistics collector to reset a single subscription + * counter, or all subscription counters (when subid is InvalidOid). + * + * Permission checking for this function is managed through the normal + * GRANT system. + * ---------- + */ +void +pgstat_reset_subscription_counter(Oid subid) +{ + PgStat_MsgResetsubcounter msg; + + if (pgStatSock == PGINVALID_SOCKET) + return; + + msg.m_subid = subid; + pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETSUBCOUNTER); + + pgstat_send(&msg, sizeof(msg)); +} + +/* ---------- * pgstat_report_autovac() - * * Called from autovacuum.c to report startup of an autovacuum process. @@ -1949,31 +1929,20 @@ pgstat_report_replslot_drop(const char *slotname) } /* ---------- - * pgstat_report_subworker_error() - + * pgstat_report_subscription_error() - * - * Tell the collector about the subscription worker error. + * Tell the collector about the subscription error. * ---------- */ void -pgstat_report_subworker_error(Oid subid, Oid subrelid, Oid relid, - LogicalRepMsgType command, TransactionId xid, - const char *errmsg) +pgstat_report_subscription_error(Oid subid, bool is_apply_error) { - PgStat_MsgSubWorkerError msg; - int len; + PgStat_MsgSubscriptionError msg; - pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBWORKERERROR); - msg.m_databaseid = MyDatabaseId; + pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBSCRIPTIONERROR); msg.m_subid = subid; - msg.m_subrelid = subrelid; - msg.m_relid = relid; - msg.m_command = command; - msg.m_xid = xid; - msg.m_timestamp = GetCurrentTimestamp(); - strlcpy(msg.m_message, errmsg, PGSTAT_SUBWORKERERROR_MSGLEN); - - len = offsetof(PgStat_MsgSubWorkerError, m_message) + strlen(msg.m_message) + 1; - pgstat_send(&msg, len); + msg.m_is_apply_error = is_apply_error; + pgstat_send(&msg, sizeof(PgStat_MsgSubscriptionError)); } /* ---------- @@ -1985,12 +1954,11 @@ pgstat_report_subworker_error(Oid subid, Oid subrelid, Oid relid, void pgstat_report_subscription_drop(Oid subid) { - PgStat_MsgSubscriptionPurge msg; + PgStat_MsgSubscriptionDrop msg; - msg.m_databaseid = MyDatabaseId; - msg.m_subids[0] = subid; - msg.m_nentries = 1; - pgstat_send_subscription_purge(&msg); + pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBSCRIPTIONDROP); + msg.m_subid = subid; + pgstat_send(&msg, sizeof(PgStat_MsgSubscriptionDrop)); } /* ---------- @@ -3000,36 +2968,6 @@ pgstat_fetch_stat_funcentry(Oid func_id) /* * --------- - * pgstat_fetch_stat_subworker_entry() - - * - * Support function for the SQL-callable pgstat* functions. Returns - * the collected statistics for subscription worker or NULL. - * --------- - */ -PgStat_StatSubWorkerEntry * -pgstat_fetch_stat_subworker_entry(Oid subid, Oid subrelid) -{ - PgStat_StatDBEntry *dbentry; - PgStat_StatSubWorkerEntry *wentry = NULL; - - /* Load the stats file if needed */ - backend_read_statsfile(); - - /* - * Lookup our database, then find the requested subscription worker stats. - */ - dbentry = pgstat_fetch_stat_dbentry(MyDatabaseId); - if (dbentry != NULL && dbentry->subworkers != NULL) - { - wentry = pgstat_get_subworker_entry(dbentry, subid, subrelid, - false); - } - - return wentry; -} - -/* - * --------- * pgstat_fetch_stat_archiver() - * * Support function for the SQL-callable pgstat* functions. Returns @@ -3141,6 +3079,23 @@ pgstat_fetch_replslot(NameData slotname) } /* + * --------- + * pgstat_fetch_stat_subscription() - + * + * Support function for the SQL-callable pgstat* functions. Returns + * the collected statistics for one subscription or NULL. + * --------- + */ +PgStat_StatSubEntry * +pgstat_fetch_stat_subscription(Oid subid) +{ + /* Load the stats file if needed */ + backend_read_statsfile(); + + return pgstat_get_subscription_entry(subid, false); +} + +/* * Shut down a single backend's statistics reporting at process exit. * * Flush any remaining statistics counts out to the collector. @@ -3465,24 +3420,6 @@ pgstat_send_slru(void) } } -/* -------- - * pgstat_send_subscription_purge() - - * - * Send a subscription purge message to the collector - * -------- - */ -static void -pgstat_send_subscription_purge(PgStat_MsgSubscriptionPurge *msg) -{ - int len; - - len = offsetof(PgStat_MsgSubscriptionPurge, m_subids[0]) - + msg->m_nentries * sizeof(Oid); - - pgstat_setheader(&msg->m_hdr, PGSTAT_MTYPE_SUBSCRIPTIONPURGE); - pgstat_send(msg, len); -} - /* ---------- * PgstatCollectorMain() - * @@ -3668,6 +3605,10 @@ PgstatCollectorMain(int argc, char *argv[]) len); break; + case PGSTAT_MTYPE_RESETSUBCOUNTER: + pgstat_recv_resetsubcounter(&msg.msg_resetsubcounter, len); + break; + case PGSTAT_MTYPE_AUTOVAC_START: pgstat_recv_autovac(&msg.msg_autovacuum_start, len); break; @@ -3738,12 +3679,12 @@ PgstatCollectorMain(int argc, char *argv[]) pgstat_recv_disconnect(&msg.msg_disconnect, len); break; - case PGSTAT_MTYPE_SUBSCRIPTIONPURGE: - pgstat_recv_subscription_purge(&msg.msg_subscriptionpurge, len); + case PGSTAT_MTYPE_SUBSCRIPTIONDROP: + pgstat_recv_subscription_drop(&msg.msg_subscriptiondrop, len); break; - case PGSTAT_MTYPE_SUBWORKERERROR: - pgstat_recv_subworker_error(&msg.msg_subworkererror, len); + case PGSTAT_MTYPE_SUBSCRIPTIONERROR: + pgstat_recv_subscription_error(&msg.msg_subscriptionerror, len); break; default: @@ -3791,8 +3732,7 @@ PgstatCollectorMain(int argc, char *argv[]) /* * Subroutine to clear stats in a database entry * - * Tables, functions, and subscription workers hashes are initialized - * to empty. + * Tables and functions hashes are initialized to empty. */ static void reset_dbentry_counters(PgStat_StatDBEntry *dbentry) @@ -3845,13 +3785,6 @@ reset_dbentry_counters(PgStat_StatDBEntry *dbentry) PGSTAT_FUNCTION_HASH_SIZE, &hash_ctl, HASH_ELEM | HASH_BLOBS); - - hash_ctl.keysize = sizeof(PgStat_StatSubWorkerKey); - hash_ctl.entrysize = sizeof(PgStat_StatSubWorkerEntry); - dbentry->subworkers = hash_create("Per-database subscription worker", - PGSTAT_SUBWORKER_HASH_SIZE, - &hash_ctl, - HASH_ELEM | HASH_BLOBS); } /* @@ -3876,7 +3809,7 @@ pgstat_get_db_entry(Oid databaseid, bool create) /* * If not found, initialize the new one. This creates empty hash tables - * for tables, functions, and subscription workers, too. + * for tables and functions, too. */ if (!found) reset_dbentry_counters(result); @@ -3935,48 +3868,6 @@ pgstat_get_tab_entry(PgStat_StatDBEntry *dbentry, Oid tableoid, bool create) } /* ---------- - * pgstat_get_subworker_entry - * - * Return subscription worker entry with the given subscription OID and - * relation OID. If subrelid is InvalidOid, it returns an entry of the - * apply worker otherwise returns an entry of the table sync worker - * associated with subrelid. If no subscription worker entry exists, - * initialize it, if the create parameter is true. Else, return NULL. - * ---------- - */ -static PgStat_StatSubWorkerEntry * -pgstat_get_subworker_entry(PgStat_StatDBEntry *dbentry, Oid subid, Oid subrelid, - bool create) -{ - PgStat_StatSubWorkerEntry *subwentry; - PgStat_StatSubWorkerKey key; - bool found; - HASHACTION action = (create ? HASH_ENTER : HASH_FIND); - - key.subid = subid; - key.subrelid = subrelid; - subwentry = (PgStat_StatSubWorkerEntry *) hash_search(dbentry->subworkers, - (void *) &key, - action, &found); - - if (!create && !found) - return NULL; - - /* If not found, initialize the new one */ - if (!found) - { - subwentry->last_error_relid = InvalidOid; - subwentry->last_error_command = 0; - subwentry->last_error_xid = InvalidTransactionId; - subwentry->last_error_count = 0; - subwentry->last_error_time = 0; - subwentry->last_error_message[0] = '\0'; - } - - return subwentry; -} - -/* ---------- * pgstat_write_statsfiles() - * Write the global statistics file, as well as requested DB files. * @@ -4059,8 +3950,8 @@ pgstat_write_statsfiles(bool permanent, bool allDbs) while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL) { /* - * Write out the table, function, and subscription-worker stats for - * this DB into the appropriate per-DB stat file, if required. + * Write out the table and function stats for this DB into the + * appropriate per-DB stat file, if required. */ if (allDbs || pgstat_db_requested(dbentry->databaseid)) { @@ -4096,6 +3987,22 @@ pgstat_write_statsfiles(bool permanent, bool allDbs) } /* + * Write subscription stats struct + */ + if (subscriptionStatHash) + { + PgStat_StatSubEntry *subentry; + + hash_seq_init(&hstat, subscriptionStatHash); + while ((subentry = (PgStat_StatSubEntry *) hash_seq_search(&hstat)) != NULL) + { + fputc('S', fpout); + rc = fwrite(subentry, sizeof(PgStat_StatSubEntry), 1, fpout); + (void) rc; /* we'll check for error with ferror */ + } + } + + /* * No more output to be done. Close the temp file and replace the old * pgstat.stat with it. The ferror() check replaces testing for error * after each individual fputc or fwrite above. @@ -4174,10 +4081,8 @@ pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent) { HASH_SEQ_STATUS tstat; HASH_SEQ_STATUS fstat; - HASH_SEQ_STATUS sstat; PgStat_StatTabEntry *tabentry; PgStat_StatFuncEntry *funcentry; - PgStat_StatSubWorkerEntry *subwentry; FILE *fpout; int32 format_id; Oid dbid = dbentry->databaseid; @@ -4233,17 +4138,6 @@ pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent) } /* - * Walk through the database's subscription worker stats table. - */ - hash_seq_init(&sstat, dbentry->subworkers); - while ((subwentry = (PgStat_StatSubWorkerEntry *) hash_seq_search(&sstat)) != NULL) - { - fputc('S', fpout); - rc = fwrite(subwentry, sizeof(PgStat_StatSubWorkerEntry), 1, fpout); - (void) rc; /* we'll check for error with ferror */ - } - - /* * No more output to be done. Close the temp file and replace the old * pgstat.stat with it. The ferror() check replaces testing for error * after each individual fputc or fwrite above. @@ -4301,9 +4195,8 @@ pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent) * files after reading; the in-memory status is now authoritative, and the * files would be out of date in case somebody else reads them. * - * If a 'deep' read is requested, table/function/subscription-worker stats are - * read, otherwise the table/function/subscription-worker hash tables remain - * empty. + * If a 'deep' read is requested, table/function stats are read, otherwise + * the table/function hash tables remain empty. * ---------- */ static HTAB * @@ -4482,7 +4375,6 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) memcpy(dbentry, &dbbuf, sizeof(PgStat_StatDBEntry)); dbentry->tables = NULL; dbentry->functions = NULL; - dbentry->subworkers = NULL; /* * In the collector, disregard the timestamp we read from the @@ -4494,8 +4386,8 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) dbentry->stats_timestamp = 0; /* - * Don't create tables/functions/subworkers hashtables for - * uninteresting databases. + * Don't create tables/functions hashtables for uninteresting + * databases. */ if (onlydb != InvalidOid) { @@ -4520,14 +4412,6 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) &hash_ctl, HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); - hash_ctl.keysize = sizeof(PgStat_StatSubWorkerKey); - hash_ctl.entrysize = sizeof(PgStat_StatSubWorkerEntry); - hash_ctl.hcxt = pgStatLocalContext; - dbentry->subworkers = hash_create("Per-database subscription worker", - PGSTAT_SUBWORKER_HASH_SIZE, - &hash_ctl, - HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); - /* * If requested, read the data from the database-specific * file. Otherwise we just leave the hashtables empty. @@ -4536,7 +4420,6 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) pgstat_read_db_statsfile(dbentry->databaseid, dbentry->tables, dbentry->functions, - dbentry->subworkers, permanent); break; @@ -4580,6 +4463,45 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) break; } + /* + * 'S' A PgStat_StatSubEntry struct describing subscription + * statistics. + */ + case 'S': + { + PgStat_StatSubEntry subbuf; + PgStat_StatSubEntry *subentry; + + if (fread(&subbuf, 1, sizeof(PgStat_StatSubEntry), fpin) + != sizeof(PgStat_StatSubEntry)) + { + ereport(pgStatRunningInCollector ? LOG : WARNING, + (errmsg("corrupted statistics file \"%s\"", + statfile))); + goto done; + } + + if (subscriptionStatHash == NULL) + { + HASHCTL hash_ctl; + + hash_ctl.keysize = sizeof(Oid); + hash_ctl.entrysize = sizeof(PgStat_StatSubEntry); + hash_ctl.hcxt = pgStatLocalContext; + subscriptionStatHash = hash_create("Subscription hash", + PGSTAT_SUBSCRIPTION_HASH_SIZE, + &hash_ctl, + HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + } + + subentry = (PgStat_StatSubEntry *) hash_search(subscriptionStatHash, + (void *) &subbuf.subid, + HASH_ENTER, NULL); + + memcpy(subentry, &subbuf, sizeof(subbuf)); + break; + } + case 'E': goto done; @@ -4614,21 +4536,19 @@ done: * As in pgstat_read_statsfiles, if the permanent file is requested, it is * removed after reading. * - * Note: this code has the ability to skip storing per-table, per-function, or - * per-subscription-worker data, if NULL is passed for the corresponding hashtable. - * That's not used at the moment though. + * Note: this code has the ability to skip storing per-table or per-function + * data, if NULL is passed for the corresponding hashtable. That's not used + * at the moment though. * ---------- */ static void pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash, - HTAB *subworkerhash, bool permanent) + bool permanent) { PgStat_StatTabEntry *tabentry; PgStat_StatTabEntry tabbuf; PgStat_StatFuncEntry funcbuf; PgStat_StatFuncEntry *funcentry; - PgStat_StatSubWorkerEntry subwbuf; - PgStat_StatSubWorkerEntry *subwentry; FILE *fpin; int32 format_id; bool found; @@ -4743,41 +4663,6 @@ pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash, break; /* - * 'S' A PgStat_StatSubWorkerEntry struct describing - * subscription worker statistics. - */ - case 'S': - if (fread(&subwbuf, 1, sizeof(PgStat_StatSubWorkerEntry), - fpin) != sizeof(PgStat_StatSubWorkerEntry)) - { - ereport(pgStatRunningInCollector ? LOG : WARNING, - (errmsg("corrupted statistics file \"%s\"", - statfile))); - goto done; - } - - /* - * Skip if subscription worker data not wanted. - */ - if (subworkerhash == NULL) - break; - - subwentry = (PgStat_StatSubWorkerEntry *) hash_search(subworkerhash, - (void *) &subwbuf.key, - HASH_ENTER, &found); - - if (found) - { - ereport(pgStatRunningInCollector ? LOG : WARNING, - (errmsg("corrupted statistics file \"%s\"", - statfile))); - goto done; - } - - memcpy(subwentry, &subwbuf, sizeof(subwbuf)); - break; - - /* * 'E' The EOF marker of a complete stats file. */ case 'E': @@ -4829,6 +4714,7 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent, PgStat_WalStats myWalStats; PgStat_SLRUStats mySLRUStats[SLRU_NUM_ELEMENTS]; PgStat_StatReplSlotEntry myReplSlotStats; + PgStat_StatSubEntry mySubStats; FILE *fpin; int32 format_id; const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename; @@ -4959,6 +4845,22 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent, } break; + /* + * 'S' A PgStat_StatSubEntry struct describing subscription + * statistics follows. + */ + case 'S': + if (fread(&mySubStats, 1, sizeof(PgStat_StatSubEntry), fpin) + != sizeof(PgStat_StatSubEntry)) + { + ereport(pgStatRunningInCollector ? LOG : WARNING, + (errmsg("corrupted statistics file \"%s\"", + statfile))); + FreeFile(fpin); + return false; + } + break; + case 'E': goto done; @@ -5164,6 +5066,7 @@ pgstat_clear_snapshot(void) pgStatLocalContext = NULL; pgStatDBHash = NULL; replSlotStatHash = NULL; + subscriptionStatHash = NULL; /* * Historically the backend_status.c facilities lived in this file, and @@ -5450,8 +5353,6 @@ pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len) hash_destroy(dbentry->tables); if (dbentry->functions != NULL) hash_destroy(dbentry->functions); - if (dbentry->subworkers != NULL) - hash_destroy(dbentry->subworkers); if (hash_search(pgStatDBHash, (void *) &dbid, @@ -5489,16 +5390,13 @@ pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len) hash_destroy(dbentry->tables); if (dbentry->functions != NULL) hash_destroy(dbentry->functions); - if (dbentry->subworkers != NULL) - hash_destroy(dbentry->subworkers); dbentry->tables = NULL; dbentry->functions = NULL; - dbentry->subworkers = NULL; /* * Reset database-level stats, too. This creates empty hash tables for - * tables, functions, and subscription workers. + * tables and functions. */ reset_dbentry_counters(dbentry); } @@ -5567,14 +5465,6 @@ pgstat_recv_resetsinglecounter(PgStat_MsgResetsinglecounter *msg, int len) else if (msg->m_resettype == RESET_FUNCTION) (void) hash_search(dbentry->functions, (void *) &(msg->m_objectid), HASH_REMOVE, NULL); - else if (msg->m_resettype == RESET_SUBWORKER) - { - PgStat_StatSubWorkerKey key; - - key.subid = msg->m_objectid; - key.subrelid = msg->m_subobjectid; - (void) hash_search(dbentry->subworkers, (void *) &key, HASH_REMOVE, NULL); - } } /* ---------- @@ -5645,6 +5535,51 @@ pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg, } } +/* ---------- + * pgstat_recv_resetsubcounter() - + * + * Reset some subscription statistics of the cluster. + * ---------- + */ +static void +pgstat_recv_resetsubcounter(PgStat_MsgResetsubcounter *msg, int len) +{ + PgStat_StatSubEntry *subentry; + TimestampTz ts; + + /* Return if we don't have replication subscription statistics */ + if (subscriptionStatHash == NULL) + return; + + ts = GetCurrentTimestamp(); + if (!OidIsValid(msg->m_subid)) + { + HASH_SEQ_STATUS sstat; + + /* Clear all subscription counters */ + hash_seq_init(&sstat, subscriptionStatHash); + while ((subentry = (PgStat_StatSubEntry *) hash_seq_search(&sstat)) != NULL) + pgstat_reset_subscription(subentry, ts); + } + else + { + /* Get the subscription statistics to reset */ + subentry = pgstat_get_subscription_entry(msg->m_subid, false); + + /* + * Nothing to do if the given subscription entry is not found. This + * could happen when the subscription with the subid is removed and + * the corresponding statistics entry is also removed before receiving + * the reset message. + */ + if (!subentry) + return; + + /* Reset the stats for the requested subscription */ + pgstat_reset_subscription(subentry, ts); + } +} + /* ---------- * pgstat_recv_autovac() - @@ -6118,81 +6053,42 @@ pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len) } /* ---------- - * pgstat_recv_subscription_purge() - + * pgstat_recv_subscription_drop() - * - * Process a SUBSCRIPTIONPURGE message. + * Process a SUBSCRIPTIONDROP message. * ---------- */ static void -pgstat_recv_subscription_purge(PgStat_MsgSubscriptionPurge *msg, int len) +pgstat_recv_subscription_drop(PgStat_MsgSubscriptionDrop *msg, int len) { - HASH_SEQ_STATUS hstat; - PgStat_StatDBEntry *dbentry; - PgStat_StatSubWorkerEntry *subwentry; - - dbentry = pgstat_get_db_entry(msg->m_databaseid, false); - - /* No need to purge if we don't even know the database */ - if (!dbentry || !dbentry->subworkers) + /* Return if we don't have replication subscription statistics */ + if (subscriptionStatHash == NULL) return; - /* Remove all subscription worker statistics for the given subscriptions */ - hash_seq_init(&hstat, dbentry->subworkers); - while ((subwentry = (PgStat_StatSubWorkerEntry *) hash_seq_search(&hstat)) != NULL) - { - for (int i = 0; i < msg->m_nentries; i++) - { - if (subwentry->key.subid == msg->m_subids[i]) - { - (void) hash_search(dbentry->subworkers, (void *) &(subwentry->key), - HASH_REMOVE, NULL); - break; - } - } - } + /* Remove from hashtable if present; we don't care if it's not */ + (void) hash_search(subscriptionStatHash, (void *) &(msg->m_subid), + HASH_REMOVE, NULL); } /* ---------- - * pgstat_recv_subworker_error() - + * pgstat_recv_subscription_error() - * - * Process a SUBWORKERERROR message. + * Process a SUBSCRIPTIONERROR message. * ---------- */ static void -pgstat_recv_subworker_error(PgStat_MsgSubWorkerError *msg, int len) +pgstat_recv_subscription_error(PgStat_MsgSubscriptionError *msg, int len) { - PgStat_StatDBEntry *dbentry; - PgStat_StatSubWorkerEntry *subwentry; - - dbentry = pgstat_get_db_entry(msg->m_databaseid, true); + PgStat_StatSubEntry *subentry; - /* Get the subscription worker stats */ - subwentry = pgstat_get_subworker_entry(dbentry, msg->m_subid, - msg->m_subrelid, true); - Assert(subwentry); - - if (subwentry->last_error_relid == msg->m_relid && - subwentry->last_error_command == msg->m_command && - subwentry->last_error_xid == msg->m_xid && - strcmp(subwentry->last_error_message, msg->m_message) == 0) - { - /* - * The same error occurred again in succession, just update its - * timestamp and count. - */ - subwentry->last_error_count++; - subwentry->last_error_time = msg->m_timestamp; - return; - } + /* Get the subscription stats */ + subentry = pgstat_get_subscription_entry(msg->m_subid, true); + Assert(subentry); - /* Otherwise, update the error information */ - subwentry->last_error_relid = msg->m_relid; - subwentry->last_error_command = msg->m_command; - subwentry->last_error_xid = msg->m_xid; - subwentry->last_error_count = 1; - subwentry->last_error_time = msg->m_timestamp; - strlcpy(subwentry->last_error_message, msg->m_message, - PGSTAT_SUBWORKERERROR_MSGLEN); + if (msg->m_is_apply_error) + subentry->apply_error_count++; + else + subentry->sync_error_count++; } /* ---------- @@ -6313,6 +6209,68 @@ pgstat_reset_replslot(PgStat_StatReplSlotEntry *slotent, TimestampTz ts) slotent->stat_reset_timestamp = ts; } +/* ---------- + * pgstat_get_subscription_entry + * + * Return the subscription statistics entry with the given subscription OID. + * If no subscription entry exists, initialize it, if the create parameter is + * true. Else, return NULL. + * ---------- + */ +static PgStat_StatSubEntry * +pgstat_get_subscription_entry(Oid subid, bool create) +{ + PgStat_StatSubEntry *subentry; + bool found; + HASHACTION action = (create ? HASH_ENTER : HASH_FIND); + + if (subscriptionStatHash == NULL) + { + HASHCTL hash_ctl; + + /* + * Quick return NULL if the hash table is empty and the caller didn't + * request to create the entry. + */ + if (!create) + return NULL; + + hash_ctl.keysize = sizeof(Oid); + hash_ctl.entrysize = sizeof(PgStat_StatSubEntry); + subscriptionStatHash = hash_create("Subscription hash", + PGSTAT_SUBSCRIPTION_HASH_SIZE, + &hash_ctl, + HASH_ELEM | HASH_BLOBS); + } + + subentry = (PgStat_StatSubEntry *) hash_search(subscriptionStatHash, + (void *) &subid, + action, &found); + + if (!create && !found) + return NULL; + + /* If not found, initialize the new one */ + if (!found) + pgstat_reset_subscription(subentry, 0); + + return subentry; +} + +/* ---------- + * pgstat_reset_subscription + * + * Reset the given subscription stats. + * ---------- + */ +static void +pgstat_reset_subscription(PgStat_StatSubEntry *subentry, TimestampTz ts) +{ + subentry->apply_error_count = 0; + subentry->sync_error_count = 0; + subentry->stat_reset_timestamp = ts; +} + /* * pgstat_slru_index * |