diff options
-rw-r--r-- | doc/src/sgml/monitoring.sgml | 38 | ||||
-rw-r--r-- | src/backend/catalog/system_views.sql | 3 | ||||
-rw-r--r-- | src/backend/postmaster/pgstat.c | 11 | ||||
-rw-r--r-- | src/backend/replication/logical/logical.c | 19 | ||||
-rw-r--r-- | src/backend/replication/logical/reorderbuffer.c | 20 | ||||
-rw-r--r-- | src/backend/replication/slot.c | 2 | ||||
-rw-r--r-- | src/backend/utils/adt/pgstatfuncs.c | 9 | ||||
-rw-r--r-- | src/include/catalog/catversion.h | 2 | ||||
-rw-r--r-- | src/include/catalog/pg_proc.dat | 6 | ||||
-rw-r--r-- | src/include/pgstat.h | 8 | ||||
-rw-r--r-- | src/include/replication/reorderbuffer.h | 5 | ||||
-rw-r--r-- | src/test/regress/expected/rules.out | 5 |
12 files changed, 111 insertions, 17 deletions
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 313e44ed549..98e19954538 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -2634,6 +2634,44 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i <row> <entry role="catalog_table_entry"><para role="column_definition"> + <structfield>stream_txns</structfield> <type>bigint</type> + </para> + <para> + Number of in-progress transactions streamed to the decoding output plugin + after the memory used by logical decoding of changes from WAL for this + slot exceeds <literal>logical_decoding_work_mem</literal>. Streaming only + works with toplevel transactions (subtransactions can't be streamed + independently), so the counter does not get incremented for subtransactions. + </para></entry> + </row> + + <row> + <entry role="catalog_table_entry"><para role="column_definition"> + <structfield>stream_count</structfield><type>bigint</type> + </para> + <para> + Number of times in-progress transactions were streamed to the decoding + output plugin while decoding changes from WAL for this slot. Transactions + may get streamed repeatedly, and this counter gets incremented on every + such invocation. + </para></entry> + </row> + + <row> + <entry role="catalog_table_entry"><para role="column_definition"> + <structfield>stream_bytes</structfield><type>bigint</type> + </para> + <para> + Amount of decoded in-progress transaction data streamed to the decoding + output plugin while decoding changes from WAL for this slot. This and other + streaming counters for this slot can be used to gauge the network I/O which + occurred during logical decoding and allow tuning <literal>logical_decoding_work_mem</literal>. + </para> + </entry> + </row> + + <row> + <entry role="catalog_table_entry"><para role="column_definition"> <structfield>stats_reset</structfield> <type>timestamp with time zone</type> </para> <para> diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index c6dd084fbcc..5171ea05c7e 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -802,6 +802,9 @@ CREATE VIEW pg_stat_replication_slots AS s.spill_txns, s.spill_count, s.spill_bytes, + s.stream_txns, + s.stream_count, + s.stream_bytes, s.stats_reset FROM pg_stat_get_replication_slots() AS s; diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 822f0ebc628..f1dca2f25b7 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -1708,7 +1708,7 @@ pgstat_report_tempfile(size_t filesize) */ void pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount, - int spillbytes) + int spillbytes, int streamtxns, int streamcount, int streambytes) { PgStat_MsgReplSlot msg; @@ -1721,6 +1721,9 @@ pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount, msg.m_spill_txns = spilltxns; msg.m_spill_count = spillcount; msg.m_spill_bytes = spillbytes; + msg.m_stream_txns = streamtxns; + msg.m_stream_count = streamcount; + msg.m_stream_bytes = streambytes; pgstat_send(&msg, sizeof(PgStat_MsgReplSlot)); } @@ -6892,6 +6895,9 @@ pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len) replSlotStats[idx].spill_txns += msg->m_spill_txns; replSlotStats[idx].spill_count += msg->m_spill_count; replSlotStats[idx].spill_bytes += msg->m_spill_bytes; + replSlotStats[idx].stream_txns += msg->m_stream_txns; + replSlotStats[idx].stream_count += msg->m_stream_count; + replSlotStats[idx].stream_bytes += msg->m_stream_bytes; } } @@ -7125,6 +7131,9 @@ pgstat_reset_replslot(int i, TimestampTz ts) replSlotStats[i].spill_txns = 0; replSlotStats[i].spill_count = 0; replSlotStats[i].spill_bytes = 0; + replSlotStats[i].stream_txns = 0; + replSlotStats[i].stream_count = 0; + replSlotStats[i].stream_bytes = 0; replSlotStats[i].stat_reset_timestamp = ts; } diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 8675832f4d6..d5cfbeaa4af 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -1471,21 +1471,28 @@ UpdateDecodingStats(LogicalDecodingContext *ctx) ReorderBuffer *rb = ctx->reorder; /* - * Nothing to do if we haven't spilled anything since the last time the - * stats has been sent. + * Nothing to do if we haven't spilled or streamed anything since the last + * time the stats has been sent. */ - if (rb->spillBytes <= 0) + if (rb->spillBytes <= 0 && rb->streamBytes <= 0) return; - elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld", + elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld %lld %lld %lld", rb, (long long) rb->spillTxns, (long long) rb->spillCount, - (long long) rb->spillBytes); + (long long) rb->spillBytes, + (long long) rb->streamTxns, + (long long) rb->streamCount, + (long long) rb->streamBytes); pgstat_report_replslot(NameStr(ctx->slot->data.name), - rb->spillTxns, rb->spillCount, rb->spillBytes); + rb->spillTxns, rb->spillCount, rb->spillBytes, + rb->streamTxns, rb->streamCount, rb->streamBytes); rb->spillTxns = 0; rb->spillCount = 0; rb->spillBytes = 0; + rb->streamTxns = 0; + rb->streamCount = 0; + rb->streamBytes = 0; } diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 7a8bf760791..c1bd68011c5 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -346,6 +346,9 @@ ReorderBufferAllocate(void) buffer->spillTxns = 0; buffer->spillCount = 0; buffer->spillBytes = 0; + buffer->streamTxns = 0; + buffer->streamCount = 0; + buffer->streamBytes = 0; buffer->current_restart_decoding_lsn = InvalidXLogRecPtr; @@ -3482,6 +3485,8 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) { Snapshot snapshot_now; CommandId command_id; + Size stream_bytes; + bool txn_is_streamed; /* We can never reach here for a subtransaction. */ Assert(txn->toptxn == NULL); @@ -3562,10 +3567,25 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) txn->snapshot_now = NULL; } + /* + * Remember this information to be used later to update stats. We can't + * update the stats here as an error while processing the changes would + * lead to the accumulation of stats even though we haven't streamed all + * the changes. + */ + txn_is_streamed = rbtxn_is_streamed(txn); + stream_bytes = txn->total_size; + /* Process and send the changes to output plugin. */ ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now, command_id, true); + rb->streamCount += 1; + rb->streamBytes += stream_bytes; + + /* Don't consider already streamed transaction. */ + rb->streamTxns += (txn_is_streamed) ? 0 : 1; + Assert(dlist_is_empty(&txn->changes)); Assert(txn->nentries == 0); Assert(txn->nentries_mem == 0); diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 220b4cd6e99..09be1d8c485 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -320,7 +320,7 @@ ReplicationSlotCreate(const char *name, bool db_specific, * ReplicationSlotAllocationLock. */ if (SlotIsLogical(slot)) - pgstat_report_replslot(NameStr(slot->data.name), 0, 0, 0); + pgstat_report_replslot(NameStr(slot->data.name), 0, 0, 0, 0, 0, 0); /* * Now that the slot has been marked as in_use and active, it's safe to diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index 472fa596e1f..a210fc93b41 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -2153,7 +2153,7 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS) Datum pg_stat_get_replication_slots(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_REPLICATION_SLOT_COLS 5 +#define PG_STAT_GET_REPLICATION_SLOT_COLS 8 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate *tupstore; @@ -2201,11 +2201,14 @@ pg_stat_get_replication_slots(PG_FUNCTION_ARGS) values[1] = Int64GetDatum(s->spill_txns); values[2] = Int64GetDatum(s->spill_count); values[3] = Int64GetDatum(s->spill_bytes); + values[4] = Int64GetDatum(s->stream_txns); + values[5] = Int64GetDatum(s->stream_count); + values[6] = Int64GetDatum(s->stream_bytes); if (s->stat_reset_timestamp == 0) - nulls[4] = true; + nulls[7] = true; else - values[4] = TimestampTzGetDatum(s->stat_reset_timestamp); + values[7] = TimestampTzGetDatum(s->stat_reset_timestamp); tuplestore_putvalues(tupstore, tupdesc, values, nulls); } diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index 8cf02fc0d85..73650f88e94 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -53,6 +53,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 202010281 +#define CATALOG_VERSION_NO 202010291 #endif diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 24ec2cfed6a..d9770bbadd8 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5260,9 +5260,9 @@ proname => 'pg_stat_get_replication_slots', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r', prorettype => 'record', proargtypes => '', - proallargtypes => '{text,int8,int8,int8,timestamptz}', - proargmodes => '{o,o,o,o,o}', - proargnames => '{slot_name,spill_txns,spill_count,spill_bytes,stats_reset}', + proallargtypes => '{text,int8,int8,int8,int8,int8,int8,timestamptz}', + proargmodes => '{o,o,o,o,o,o,o,o}', + proargnames => '{slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,stats_reset}', prosrc => 'pg_stat_get_replication_slots' }, { oid => '6118', descr => 'statistics: information about subscription', proname => 'pg_stat_get_subscription', proisstrict => 'f', provolatile => 's', diff --git a/src/include/pgstat.h b/src/include/pgstat.h index a821ff4f158..257e515bfe7 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -492,6 +492,9 @@ typedef struct PgStat_MsgReplSlot PgStat_Counter m_spill_txns; PgStat_Counter m_spill_count; PgStat_Counter m_spill_bytes; + PgStat_Counter m_stream_txns; + PgStat_Counter m_stream_count; + PgStat_Counter m_stream_bytes; } PgStat_MsgReplSlot; @@ -823,6 +826,9 @@ typedef struct PgStat_ReplSlotStats PgStat_Counter spill_txns; PgStat_Counter spill_count; PgStat_Counter spill_bytes; + PgStat_Counter stream_txns; + PgStat_Counter stream_count; + PgStat_Counter stream_bytes; TimestampTz stat_reset_timestamp; } PgStat_ReplSlotStats; @@ -1387,7 +1393,7 @@ extern void pgstat_report_deadlock(void); extern void pgstat_report_checksum_failures_in_db(Oid dboid, int failurecount); extern void pgstat_report_checksum_failure(void); extern void pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount, - int spillbytes); + int spillbytes, int streamtxns, int streamcount, int streambytes); extern void pgstat_report_replslot_drop(const char *slotname); extern void pgstat_initialize(void); diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 1c77819aad2..dfdda938b2a 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -551,6 +551,11 @@ struct ReorderBuffer int64 spillTxns; /* number of transactions spilled to disk */ int64 spillCount; /* spill-to-disk invocation counter */ int64 spillBytes; /* amount of data spilled to disk */ + + /* Statistics about transactions streamed to the decoding output plugin */ + int64 streamTxns; /* number of transactions streamed */ + int64 streamCount; /* streaming invocation counter */ + int64 streamBytes; /* amount of data streamed */ }; diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 492cdcf74c3..097ff5d111f 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2022,8 +2022,11 @@ pg_stat_replication_slots| SELECT s.slot_name, s.spill_txns, s.spill_count, s.spill_bytes, + s.stream_txns, + s.stream_count, + s.stream_bytes, s.stats_reset - FROM pg_stat_get_replication_slots() s(slot_name, spill_txns, spill_count, spill_bytes, stats_reset); + FROM pg_stat_get_replication_slots() s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, stats_reset); pg_stat_slru| SELECT s.name, s.blks_zeroed, s.blks_hit, |