/* -------------------------------------------------------------------------
 *
 * pgstat_subscription.c
 *	  Implementation of subscription statistics.
 *
 * This file contains the implementation of subscription statistics. It is kept
 * separate from pgstat.c to enforce the line between the statistics access /
 * storage implementation and the details about individual types of
 * statistics.
 *
 * Copyright (c) 2001-2025, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *	  src/backend/utils/activity/pgstat_subscription.c
 * -------------------------------------------------------------------------
 */
#include "postgres.h"
#include "utils/pgstat_internal.h"
/*
 * Count one error against the subscription identified by subid.
 *
 * When is_apply_error is true the apply error counter is incremented,
 * otherwise the sync error counter is.  The increment is made on the pending
 * entry obtained from pgstat_prep_pending_entry().
 */
void
pgstat_report_subscription_error(Oid subid, bool is_apply_error)
{
	PgStat_EntryRef *ref;
	PgStat_BackendSubEntry *sub_pending;

	ref = pgstat_prep_pending_entry(PGSTAT_KIND_SUBSCRIPTION,
									InvalidOid, subid, NULL);
	sub_pending = (PgStat_BackendSubEntry *) ref->pending;

	/* Anything that is not an apply error is accounted as a sync error. */
	if (!is_apply_error)
	{
		sub_pending->sync_error_count++;
		return;
	}

	sub_pending->apply_error_count++;
}
/*
 * Count one conflict of the given type against the subscription identified
 * by subid.
 *
 * The conflict type is used directly as the index into the pending entry's
 * conflict_count array.
 */
void
pgstat_report_subscription_conflict(Oid subid, ConflictType type)
{
	PgStat_EntryRef *ref;
	PgStat_BackendSubEntry *sub_pending;

	ref = pgstat_prep_pending_entry(PGSTAT_KIND_SUBSCRIPTION,
									InvalidOid, subid, NULL);
	sub_pending = (PgStat_BackendSubEntry *) ref->pending;

	sub_pending->conflict_count[type] += 1;
}
/*
 * Report that the subscription identified by subid has been created.
 */
void
pgstat_create_subscription(Oid subid)
{
	/* Make sure the stats are dropped again if the transaction rolls back. */
	pgstat_create_transactional(PGSTAT_KIND_SUBSCRIPTION, InvalidOid, subid);

	/*
	 * Create and initialize the subscription stats entry.  The entry
	 * reference returned by the lookup is not needed here.
	 */
	(void) pgstat_get_entry_ref(PGSTAT_KIND_SUBSCRIPTION, InvalidOid, subid,
								true, NULL);
	pgstat_reset_entry(PGSTAT_KIND_SUBSCRIPTION, InvalidOid, subid, 0);
}
/*
 * Report that the subscription identified by subid is being dropped.
 *
 * The drop is registered transactionally, so the stats are dropped only if
 * the transaction commits.
 */
void
pgstat_drop_subscription(Oid subid)
{
	pgstat_drop_transactional(PGSTAT_KIND_SUBSCRIPTION, InvalidOid, subid);
}
/*
 * Support function for the SQL-callable pgstat* functions.
 *
 * Returns the collected statistics for the subscription identified by subid,
 * or NULL.
 */
PgStat_StatSubEntry *
pgstat_fetch_stat_subscription(Oid subid)
{
	void	   *entry;

	entry = pgstat_fetch_entry(PGSTAT_KIND_SUBSCRIPTION, InvalidOid, subid);

	return (PgStat_StatSubEntry *) entry;
}
/*
 * Flush the pending subscription stats of an entry into its shared stats.
 *
 * Returns false without flushing anything if nowait is true and the entry
 * lock could not be acquired immediately; returns true otherwise.
 */
bool
pgstat_subscription_flush_cb(PgStat_EntryRef *entry_ref, bool nowait)
{
	PgStat_BackendSubEntry *pending_stats =
		(PgStat_BackendSubEntry *) entry_ref->pending;
	PgStatShared_Subscription *shared_sub =
		(PgStatShared_Subscription *) entry_ref->shared_stats;
	int			conflict;

	/*
	 * The pending entry always has non-zero content, so there is no
	 * "nothing to do" shortcut before taking the lock.
	 */
	if (!pgstat_lock_entry(entry_ref, nowait))
		return false;

	/* Add each pending counter onto its shared counterpart. */
	shared_sub->stats.apply_error_count += pending_stats->apply_error_count;
	shared_sub->stats.sync_error_count += pending_stats->sync_error_count;

	for (conflict = 0; conflict < CONFLICT_NUM_TYPES; conflict++)
	{
		shared_sub->stats.conflict_count[conflict] +=
			pending_stats->conflict_count[conflict];
	}

	pgstat_unlock_entry(entry_ref);

	return true;
}
/*
 * Store ts as the reset timestamp of a subscription stats entry.
 *
 * header is cast to PgStatShared_Subscription, whose stats.stat_reset_timestamp
 * field receives the value.
 */
void
pgstat_subscription_reset_timestamp_cb(PgStatShared_Common *header, TimestampTz ts)
{
	PgStatShared_Subscription *shared_sub = (PgStatShared_Subscription *) header;

	shared_sub->stats.stat_reset_timestamp = ts;
}
|