aboutsummaryrefslogtreecommitdiff
path: root/src/backend/utils/activity/pgstat_subscription.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/utils/activity/pgstat_subscription.c')
-rw-r--r--src/backend/utils/activity/pgstat_subscription.c67
1 files changed, 56 insertions, 11 deletions
diff --git a/src/backend/utils/activity/pgstat_subscription.c b/src/backend/utils/activity/pgstat_subscription.c
index 689029b30af..e1072bd5bae 100644
--- a/src/backend/utils/activity/pgstat_subscription.c
+++ b/src/backend/utils/activity/pgstat_subscription.c
@@ -26,12 +26,17 @@
void
pgstat_report_subscription_error(Oid subid, bool is_apply_error)
{
- PgStat_MsgSubscriptionError msg;
+ PgStat_EntryRef *entry_ref;
+ PgStat_BackendSubEntry *pending;
- pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBSCRIPTIONERROR);
- msg.m_subid = subid;
- msg.m_is_apply_error = is_apply_error;
- pgstat_send(&msg, sizeof(PgStat_MsgSubscriptionError));
+ entry_ref = pgstat_prep_pending_entry(PGSTAT_KIND_SUBSCRIPTION,
+ InvalidOid, subid, NULL);
+ pending = entry_ref->pending;
+
+ if (is_apply_error)
+ pending->apply_error_count++;
+ else
+ pending->sync_error_count++;
}
/*
@@ -54,12 +59,52 @@ pgstat_create_subscription(Oid subid)
void
pgstat_drop_subscription(Oid subid)
{
- PgStat_MsgSubscriptionDrop msg;
-
- pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBSCRIPTIONDROP);
- msg.m_subid = subid;
- pgstat_send(&msg, sizeof(PgStat_MsgSubscriptionDrop));
-
pgstat_drop_transactional(PGSTAT_KIND_SUBSCRIPTION,
InvalidOid, subid);
}
+
+/*
+ * Support function for the SQL-callable pgstat* functions. Returns
+ * the collected statistics for one subscription or NULL.
+ */
+PgStat_StatSubEntry *
+pgstat_fetch_stat_subscription(Oid subid)
+{
+ return (PgStat_StatSubEntry *)
+ pgstat_fetch_entry(PGSTAT_KIND_SUBSCRIPTION, InvalidOid, subid);
+}
+
+/*
+ * Flush out pending stats for the entry
+ *
+ * If nowait is true, this function returns false if lock could not
+ * immediately acquired, otherwise true is returned.
+ */
+bool
+pgstat_subscription_flush_cb(PgStat_EntryRef *entry_ref, bool nowait)
+{
+ PgStat_BackendSubEntry *localent;
+ PgStatShared_Subscription *shsubent;
+
+ localent = (PgStat_BackendSubEntry *) entry_ref->pending;
+ shsubent = (PgStatShared_Subscription *) entry_ref->shared_stats;
+
+ /* localent always has non-zero content */
+
+ if (!pgstat_lock_entry(entry_ref, nowait))
+ return false;
+
+#define SUB_ACC(fld) shsubent->stats.fld += localent->fld
+ SUB_ACC(apply_error_count);
+ SUB_ACC(sync_error_count);
+#undef SUB_ACC
+
+ pgstat_unlock_entry(entry_ref);
+ return true;
+}
+
+void
+pgstat_subscription_reset_timestamp_cb(PgStatShared_Common *header, TimestampTz ts)
+{
+ ((PgStatShared_Subscription *) header)->stats.stat_reset_timestamp = ts;
+}