diff options
Diffstat (limited to 'src/backend/utils/activity/pgstat_subscription.c')
-rw-r--r-- | src/backend/utils/activity/pgstat_subscription.c | 67 |
1 files changed, 56 insertions, 11 deletions
diff --git a/src/backend/utils/activity/pgstat_subscription.c b/src/backend/utils/activity/pgstat_subscription.c index 689029b30af..e1072bd5bae 100644 --- a/src/backend/utils/activity/pgstat_subscription.c +++ b/src/backend/utils/activity/pgstat_subscription.c @@ -26,12 +26,17 @@ void pgstat_report_subscription_error(Oid subid, bool is_apply_error) { - PgStat_MsgSubscriptionError msg; + PgStat_EntryRef *entry_ref; + PgStat_BackendSubEntry *pending; - pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBSCRIPTIONERROR); - msg.m_subid = subid; - msg.m_is_apply_error = is_apply_error; - pgstat_send(&msg, sizeof(PgStat_MsgSubscriptionError)); + entry_ref = pgstat_prep_pending_entry(PGSTAT_KIND_SUBSCRIPTION, + InvalidOid, subid, NULL); + pending = entry_ref->pending; + + if (is_apply_error) + pending->apply_error_count++; + else + pending->sync_error_count++; } /* @@ -54,12 +59,52 @@ pgstat_create_subscription(Oid subid) void pgstat_drop_subscription(Oid subid) { - PgStat_MsgSubscriptionDrop msg; - - pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBSCRIPTIONDROP); - msg.m_subid = subid; - pgstat_send(&msg, sizeof(PgStat_MsgSubscriptionDrop)); - pgstat_drop_transactional(PGSTAT_KIND_SUBSCRIPTION, InvalidOid, subid); } + +/* + * Support function for the SQL-callable pgstat* functions. Returns + * the collected statistics for one subscription or NULL. + */ +PgStat_StatSubEntry * +pgstat_fetch_stat_subscription(Oid subid) +{ + return (PgStat_StatSubEntry *) + pgstat_fetch_entry(PGSTAT_KIND_SUBSCRIPTION, InvalidOid, subid); +} + +/* + * Flush out pending stats for the entry + * + * If nowait is true, this function returns false if lock could not + * immediately acquired, otherwise true is returned. + */ +bool +pgstat_subscription_flush_cb(PgStat_EntryRef *entry_ref, bool nowait) +{ + PgStat_BackendSubEntry *localent; + PgStatShared_Subscription *shsubent; + + localent = (PgStat_BackendSubEntry *) entry_ref->pending; + shsubent = (PgStatShared_Subscription *) entry_ref->shared_stats; + + /* localent always has non-zero content */ + + if (!pgstat_lock_entry(entry_ref, nowait)) + return false; + +#define SUB_ACC(fld) shsubent->stats.fld += localent->fld + SUB_ACC(apply_error_count); + SUB_ACC(sync_error_count); +#undef SUB_ACC + + pgstat_unlock_entry(entry_ref); + return true; +} + +void +pgstat_subscription_reset_timestamp_cb(PgStatShared_Common *header, TimestampTz ts) +{ + ((PgStatShared_Subscription *) header)->stats.stat_reset_timestamp = ts; +} |