aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/catalog/system_views.sql5
-rw-r--r--src/backend/replication/logical/reorderbuffer.c12
-rw-r--r--src/backend/replication/walsender.c42
-rw-r--r--src/include/catalog/catversion.h2
-rw-r--r--src/include/catalog/pg_proc.dat6
-rw-r--r--src/include/replication/reorderbuffer.h11
-rw-r--r--src/include/replication/walsender_private.h5
-rw-r--r--src/test/regress/expected/rules.out7
8 files changed, 81 insertions, 9 deletions
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 4456fefb38d..f7800f01a6b 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -776,7 +776,10 @@ CREATE VIEW pg_stat_replication AS
W.replay_lag,
W.sync_priority,
W.sync_state,
- W.reply_time
+ W.reply_time,
+ W.spill_txns,
+ W.spill_count,
+ W.spill_bytes
FROM pg_stat_get_activity(NULL) AS S
JOIN pg_stat_get_wal_senders() AS W ON (S.pid = W.pid)
LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid);
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index d82a5f18b0a..53affeb8772 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -308,6 +308,10 @@ ReorderBufferAllocate(void)
buffer->outbufsize = 0;
buffer->size = 0;
+ buffer->spillCount = 0;
+ buffer->spillTxns = 0;
+ buffer->spillBytes = 0;
+
buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
dlist_init(&buffer->toplevel_by_lsn);
@@ -2415,6 +2419,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
int fd = -1;
XLogSegNo curOpenSegNo = 0;
Size spilled = 0;
+ Size size = txn->size;
elog(DEBUG2, "spill %u changes in XID %u to disk",
(uint32) txn->nentries_mem, txn->xid);
@@ -2473,6 +2478,13 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
spilled++;
}
+ /* update the statistics */
+ rb->spillCount += 1;
+ rb->spillBytes += size;
+
+ /* Don't consider already serialized transaction. */
+ rb->spillTxns += txn->serialized ? 0 : 1;
+
Assert(spilled == txn->nentries_mem);
Assert(dlist_is_empty(&txn->changes));
txn->nentries_mem = 0;
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 7f5671504f7..fa75872877e 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -248,6 +248,7 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
+static void UpdateSpillStats(LogicalDecodingContext *ctx);
static void XLogRead(WALSegmentContext *segcxt, char *buf, XLogRecPtr startptr, Size count);
@@ -1261,7 +1262,8 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
/*
* LogicalDecodingContext 'update_progress' callback.
*
- * Write the current position to the lag tracker (see XLogSendPhysical).
+ * Write the current position to the lag tracker (see XLogSendPhysical),
+ * and update the spill statistics.
*/
static void
WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
@@ -1280,6 +1282,11 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId
LagTrackerWrite(lsn, now);
sendTime = now;
+
+ /*
+ * Update statistics about transactions that spilled to disk.
+ */
+ UpdateSpillStats(ctx);
}
/*
@@ -2318,6 +2325,9 @@ InitWalSenderSlot(void)
walsnd->state = WALSNDSTATE_STARTUP;
walsnd->latch = &MyProc->procLatch;
walsnd->replyTime = 0;
+ walsnd->spillTxns = 0;
+ walsnd->spillCount = 0;
+ walsnd->spillBytes = 0;
SpinLockRelease(&walsnd->mutex);
/* don't need the lock anymore */
MyWalSnd = (WalSnd *) walsnd;
@@ -3219,7 +3229,7 @@ offset_to_interval(TimeOffset offset)
Datum
pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
{
-#define PG_STAT_GET_WAL_SENDERS_COLS 12
+#define PG_STAT_GET_WAL_SENDERS_COLS 15
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc;
Tuplestorestate *tupstore;
@@ -3274,6 +3284,9 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
int pid;
WalSndState state;
TimestampTz replyTime;
+ int64 spillTxns;
+ int64 spillCount;
+ int64 spillBytes;
Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
@@ -3294,6 +3307,9 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
applyLag = walsnd->applyLag;
priority = walsnd->sync_standby_priority;
replyTime = walsnd->replyTime;
+ spillTxns = walsnd->spillTxns;
+ spillCount = walsnd->spillCount;
+ spillBytes = walsnd->spillBytes;
SpinLockRelease(&walsnd->mutex);
memset(nulls, 0, sizeof(nulls));
@@ -3375,6 +3391,11 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
nulls[11] = true;
else
values[11] = TimestampTzGetDatum(replyTime);
+
+ /* spill to disk */
+ values[12] = Int64GetDatum(spillTxns);
+ values[13] = Int64GetDatum(spillCount);
+ values[14] = Int64GetDatum(spillBytes);
}
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
@@ -3611,3 +3632,20 @@ LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Assert(time != 0);
return now - time;
}
+
+static void
+UpdateSpillStats(LogicalDecodingContext *ctx)
+{
+ ReorderBuffer *rb = ctx->reorder;
+
+ SpinLockAcquire(&MyWalSnd->mutex);
+
+ MyWalSnd->spillTxns = rb->spillTxns;
+ MyWalSnd->spillCount = rb->spillCount;
+ MyWalSnd->spillBytes = rb->spillBytes;
+
+ elog(DEBUG2, "UpdateSpillStats: updating stats %p %ld %ld %ld",
+ rb, rb->spillTxns, rb->spillCount, rb->spillBytes);
+
+ SpinLockRelease(&MyWalSnd->mutex);
+}
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index e3996590201..555384cd2bf 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -53,6 +53,6 @@
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 201911121
+#define CATALOG_VERSION_NO 201911211
#endif
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 58ea5b982b3..fa0a2a10021 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5166,9 +5166,9 @@
proname => 'pg_stat_get_wal_senders', prorows => '10', proisstrict => 'f',
proretset => 't', provolatile => 's', proparallel => 'r',
prorettype => 'record', proargtypes => '',
- proallargtypes => '{int4,text,pg_lsn,pg_lsn,pg_lsn,pg_lsn,interval,interval,interval,int4,text,timestamptz}',
- proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o}',
- proargnames => '{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state,reply_time}',
+ proallargtypes => '{int4,text,pg_lsn,pg_lsn,pg_lsn,pg_lsn,interval,interval,interval,int4,text,timestamptz,int8,int8,int8}',
+ proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+ proargnames => '{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state,reply_time,spill_txns,spill_count,spill_bytes}',
prosrc => 'pg_stat_get_wal_senders' },
{ oid => '3317', descr => 'statistics: information about WAL receiver',
proname => 'pg_stat_get_wal_receiver', proisstrict => 'f', provolatile => 's',
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 7c94d920fe9..0867ee9e636 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -402,6 +402,17 @@ struct ReorderBuffer
/* memory accounting */
Size size;
+
+ /*
+ * Statistics about transactions spilled to disk.
+ *
+ * A single transaction may be spilled repeatedly, which is why we keep
+ * two different counters. For spilling, the transaction counter includes
+ * both toplevel transactions and subtransactions.
+ */
+ int64 spillCount; /* spill-to-disk invocation counter */
+ int64 spillTxns; /* number of transactions spilled to disk */
+ int64 spillBytes; /* amount of data spilled to disk */
};
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 0dd6d1cf808..a6b32051ac4 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -80,6 +80,11 @@ typedef struct WalSnd
* Timestamp of the last message received from standby.
*/
TimestampTz replyTime;
+
+ /* Statistics for transactions spilled to disk. */
+ int64 spillTxns;
+ int64 spillCount;
+ int64 spillBytes;
} WalSnd;
extern WalSnd *MyWalSnd;
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index abe3a43cd27..c9cc5694048 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1952,9 +1952,12 @@ pg_stat_replication| SELECT s.pid,
w.replay_lag,
w.sync_priority,
w.sync_state,
- w.reply_time
+ w.reply_time,
+ w.spill_txns,
+ w.spill_count,
+ w.spill_bytes
FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc)
- JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time) ON ((s.pid = w.pid)))
+ JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time, spill_txns, spill_count, spill_bytes) ON ((s.pid = w.pid)))
LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
pg_stat_ssl| SELECT s.pid,
s.ssl,