diff options
Diffstat (limited to 'src/backend')
-rw-r--r-- | src/backend/catalog/system_views.sql | 2 | ||||
-rw-r--r-- | src/backend/postmaster/pgstat.c | 6 | ||||
-rw-r--r-- | src/backend/replication/logical/logical.c | 18 | ||||
-rw-r--r-- | src/backend/replication/logical/reorderbuffer.c | 21 | ||||
-rw-r--r-- | src/backend/utils/adt/pgstatfuncs.c | 8 |
5 files changed, 45 insertions, 10 deletions
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 451db2ee0a0..6d78b335908 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -875,6 +875,8 @@ CREATE VIEW pg_stat_replication_slots AS s.stream_txns, s.stream_count, s.stream_bytes, + s.total_txns, + s.total_bytes, s.stats_reset FROM pg_stat_get_replication_slots() AS s; diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 666ce95d083..e1ec7d8b7d6 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -1829,6 +1829,8 @@ pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat) msg.m_stream_txns = repSlotStat->stream_txns; msg.m_stream_count = repSlotStat->stream_count; msg.m_stream_bytes = repSlotStat->stream_bytes; + msg.m_total_txns = repSlotStat->total_txns; + msg.m_total_bytes = repSlotStat->total_bytes; pgstat_send(&msg, sizeof(PgStat_MsgReplSlot)); } @@ -5568,6 +5570,8 @@ pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len) replSlotStats[idx].stream_txns += msg->m_stream_txns; replSlotStats[idx].stream_count += msg->m_stream_count; replSlotStats[idx].stream_bytes += msg->m_stream_bytes; + replSlotStats[idx].total_txns += msg->m_total_txns; + replSlotStats[idx].total_bytes += msg->m_total_bytes; } } @@ -5795,6 +5799,8 @@ pgstat_reset_replslot(int i, TimestampTz ts) replSlotStats[i].stream_txns = 0; replSlotStats[i].stream_count = 0; replSlotStats[i].stream_bytes = 0; + replSlotStats[i].total_txns = 0; + replSlotStats[i].total_bytes = 0; replSlotStats[i].stat_reset_timestamp = ts; } diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 68e210ce12b..35b0c676412 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -1775,21 +1775,20 @@ UpdateDecodingStats(LogicalDecodingContext *ctx) ReorderBuffer *rb = ctx->reorder; PgStat_ReplSlotStats repSlotStat; - /* - * Nothing to do if we haven't spilled or streamed anything since the last - * time the stats has been sent. - */ - if (rb->spillBytes <= 0 && rb->streamBytes <= 0) + /* Nothing to do if we don't have any replication stats to be sent. */ + if (rb->spillBytes <= 0 && rb->streamBytes <= 0 && rb->totalBytes <= 0) return; - elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld %lld %lld %lld", + elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld %lld %lld %lld %lld %lld", rb, (long long) rb->spillTxns, (long long) rb->spillCount, (long long) rb->spillBytes, (long long) rb->streamTxns, (long long) rb->streamCount, - (long long) rb->streamBytes); + (long long) rb->streamBytes, + (long long) rb->totalTxns, + (long long) rb->totalBytes); namestrcpy(&repSlotStat.slotname, NameStr(ctx->slot->data.name)); repSlotStat.spill_txns = rb->spillTxns; @@ -1798,12 +1797,17 @@ UpdateDecodingStats(LogicalDecodingContext *ctx) repSlotStat.stream_txns = rb->streamTxns; repSlotStat.stream_count = rb->streamCount; repSlotStat.stream_bytes = rb->streamBytes; + repSlotStat.total_txns = rb->totalTxns; + repSlotStat.total_bytes = rb->totalBytes; pgstat_report_replslot(&repSlotStat); + rb->spillTxns = 0; rb->spillCount = 0; rb->spillBytes = 0; rb->streamTxns = 0; rb->streamCount = 0; rb->streamBytes = 0; + rb->totalTxns = 0; + rb->totalBytes = 0; } diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 52d06285a21..5cb484f0323 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -350,6 +350,8 @@ ReorderBufferAllocate(void) buffer->streamTxns = 0; buffer->streamCount = 0; buffer->streamBytes = 0; + buffer->totalTxns = 0; + buffer->totalBytes = 0; buffer->current_restart_decoding_lsn = InvalidXLogRecPtr; @@ -1363,6 +1365,11 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state) dlist_delete(&change->node); dlist_push_tail(&state->old_change, &change->node); + /* + * Update the total bytes processed before releasing the current set + * of changes and restoring the new set of changes. + */ + rb->totalBytes += rb->size; if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file, &state->entries[off].segno)) { @@ -2364,6 +2371,20 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, iterstate = NULL; /* + * Update total transaction count and total transaction bytes + * processed. Ensure to not count the streamed transaction multiple + * times. + * + * Note that the statistics computation has to be done after + * ReorderBufferIterTXNFinish as it releases the serialized change + * which we have already accounted in ReorderBufferIterTXNNext. + */ + if (!rbtxn_is_streamed(txn)) + rb->totalTxns++; + + rb->totalBytes += rb->size; + + /* * Done with current changes, send the last message for this set of * changes depending upon streaming mode. */ diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index 521ba736143..2680190a402 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -2284,7 +2284,7 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS) Datum pg_stat_get_replication_slots(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_REPLICATION_SLOT_COLS 8 +#define PG_STAT_GET_REPLICATION_SLOT_COLS 10 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate *tupstore; @@ -2335,11 +2335,13 @@ pg_stat_get_replication_slots(PG_FUNCTION_ARGS) values[4] = Int64GetDatum(s->stream_txns); values[5] = Int64GetDatum(s->stream_count); values[6] = Int64GetDatum(s->stream_bytes); + values[7] = Int64GetDatum(s->total_txns); + values[8] = Int64GetDatum(s->total_bytes); if (s->stat_reset_timestamp == 0) - nulls[7] = true; + nulls[9] = true; else - values[7] = TimestampTzGetDatum(s->stat_reset_timestamp); + values[9] = TimestampTzGetDatum(s->stat_reset_timestamp); tuplestore_putvalues(tupstore, tupdesc, values, nulls); } |