aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/access/heap/vacuumlazy.c37
-rw-r--r--src/backend/access/nbtree/nbtsort.c40
-rw-r--r--src/backend/access/transam/xlog.c12
-rw-r--r--src/backend/access/transam/xloginsert.c13
-rw-r--r--src/backend/executor/execParallel.c36
-rw-r--r--src/backend/executor/instrument.c53
-rw-r--r--src/include/access/xlog.h3
-rw-r--r--src/include/executor/execParallel.h1
-rw-r--r--src/include/executor/instrument.h18
-rw-r--r--src/tools/pgindent/typedefs.list1
10 files changed, 182 insertions, 32 deletions
diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index 9f9596c7184..3ca7f5d1364 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -139,6 +139,7 @@
#define PARALLEL_VACUUM_KEY_DEAD_TUPLES 2
#define PARALLEL_VACUUM_KEY_QUERY_TEXT 3
#define PARALLEL_VACUUM_KEY_BUFFER_USAGE 4
+#define PARALLEL_VACUUM_KEY_WAL_USAGE 5
/*
* Macro to check if we are in a parallel vacuum. If true, we are in the
@@ -275,6 +276,9 @@ typedef struct LVParallelState
/* Points to buffer usage area in DSM */
BufferUsage *buffer_usage;
+ /* Points to WAL usage area in DSM */
+ WalUsage *wal_usage;
+
/*
* The number of indexes that support parallel index bulk-deletion and
* parallel index cleanup respectively.
@@ -2143,8 +2147,8 @@ lazy_parallel_vacuum_indexes(Relation *Irel, IndexBulkDeleteResult **stats,
vacrelstats->dead_tuples, nindexes, vacrelstats);
/*
- * Next, accumulate buffer usage. (This must wait for the workers to
- * finish, or we might get incomplete data.)
+ * Next, accumulate buffer and WAL usage. (This must wait for the workers
+ * to finish, or we might get incomplete data.)
*/
if (nworkers > 0)
{
@@ -2154,7 +2158,7 @@ lazy_parallel_vacuum_indexes(Relation *Irel, IndexBulkDeleteResult **stats,
WaitForParallelWorkersToFinish(lps->pcxt);
for (i = 0; i < lps->pcxt->nworkers_launched; i++)
- InstrAccumParallelQuery(&lps->buffer_usage[i]);
+ InstrAccumParallelQuery(&lps->buffer_usage[i], &lps->wal_usage[i]);
}
/*
@@ -3171,6 +3175,7 @@ begin_parallel_vacuum(Oid relid, Relation *Irel, LVRelStats *vacrelstats,
LVShared *shared;
LVDeadTuples *dead_tuples;
BufferUsage *buffer_usage;
+ WalUsage *wal_usage;
bool *can_parallel_vacuum;
long maxtuples;
char *sharedquery;
@@ -3255,15 +3260,19 @@ begin_parallel_vacuum(Oid relid, Relation *Irel, LVRelStats *vacrelstats,
shm_toc_estimate_keys(&pcxt->estimator, 1);
/*
- * Estimate space for BufferUsage -- PARALLEL_VACUUM_KEY_BUFFER_USAGE.
+ * Estimate space for BufferUsage and WalUsage --
+ * PARALLEL_VACUUM_KEY_BUFFER_USAGE and PARALLEL_VACUUM_KEY_WAL_USAGE.
*
* If there are no extensions loaded that care, we could skip this. We
- * have no way of knowing whether anyone's looking at pgBufferUsage, so do
- * it unconditionally.
+ * have no way of knowing whether anyone's looking at pgBufferUsage or
+ * pgWalUsage, so do it unconditionally.
*/
shm_toc_estimate_chunk(&pcxt->estimator,
mul_size(sizeof(BufferUsage), pcxt->nworkers));
shm_toc_estimate_keys(&pcxt->estimator, 1);
+ shm_toc_estimate_chunk(&pcxt->estimator,
+ mul_size(sizeof(WalUsage), pcxt->nworkers));
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
/* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */
querylen = strlen(debug_query_string);
@@ -3299,11 +3308,18 @@ begin_parallel_vacuum(Oid relid, Relation *Irel, LVRelStats *vacrelstats,
shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_DEAD_TUPLES, dead_tuples);
vacrelstats->dead_tuples = dead_tuples;
- /* Allocate space for each worker's BufferUsage; no need to initialize */
+ /*
+ * Allocate space for each worker's BufferUsage and WalUsage; no need to
+ * initialize
+ */
buffer_usage = shm_toc_allocate(pcxt->toc,
mul_size(sizeof(BufferUsage), pcxt->nworkers));
shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, buffer_usage);
lps->buffer_usage = buffer_usage;
+ wal_usage = shm_toc_allocate(pcxt->toc,
+ mul_size(sizeof(WalUsage), pcxt->nworkers));
+ shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_WAL_USAGE, wal_usage);
+ lps->wal_usage = wal_usage;
/* Store query string for workers */
sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
@@ -3435,6 +3451,7 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
LVShared *lvshared;
LVDeadTuples *dead_tuples;
BufferUsage *buffer_usage;
+ WalUsage *wal_usage;
int nindexes;
char *sharedquery;
IndexBulkDeleteResult **stats;
@@ -3511,9 +3528,11 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
parallel_vacuum_index(indrels, stats, lvshared, dead_tuples, nindexes,
&vacrelstats);
- /* Report buffer usage during parallel execution */
+ /* Report buffer/WAL usage during parallel execution */
buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
- InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber]);
+ wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false);
+ InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
+ &wal_usage[ParallelWorkerNumber]);
/* Pop the error context stack */
error_context_stack = errcallback.previous;
diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c
index 3924945664a..4a85865fc50 100644
--- a/src/backend/access/nbtree/nbtsort.c
+++ b/src/backend/access/nbtree/nbtsort.c
@@ -67,6 +67,7 @@
#include "access/xloginsert.h"
#include "catalog/index.h"
#include "commands/progress.h"
+#include "executor/instrument.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "storage/smgr.h"
@@ -81,6 +82,7 @@
#define PARALLEL_KEY_TUPLESORT UINT64CONST(0xA000000000000002)
#define PARALLEL_KEY_TUPLESORT_SPOOL2 UINT64CONST(0xA000000000000003)
#define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xA000000000000004)
+#define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xA000000000000005)
/*
* DISABLE_LEADER_PARTICIPATION disables the leader's participation in
@@ -203,6 +205,7 @@ typedef struct BTLeader
Sharedsort *sharedsort;
Sharedsort *sharedsort2;
Snapshot snapshot;
+ WalUsage *walusage;
} BTLeader;
/*
@@ -1476,6 +1479,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
Sharedsort *sharedsort2;
BTSpool *btspool = buildstate->spool;
BTLeader *btleader = (BTLeader *) palloc0(sizeof(BTLeader));
+ WalUsage *walusage;
bool leaderparticipates = true;
char *sharedquery;
int querylen;
@@ -1528,6 +1532,18 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
shm_toc_estimate_keys(&pcxt->estimator, 3);
}
+ /*
+ * Estimate space for WalUsage -- PARALLEL_KEY_WAL_USAGE
+ *
+ * WalUsage during execution of maintenance command can be used by an
+ * extension that reports the WAL usage, such as pg_stat_statements. We
+ * have no way of knowing whether anyone's looking at pgWalUsage, so do it
+ * unconditionally.
+ */
+ shm_toc_estimate_chunk(&pcxt->estimator,
+ mul_size(sizeof(WalUsage), pcxt->nworkers));
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+
/* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */
querylen = strlen(debug_query_string);
shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
@@ -1599,6 +1615,11 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
memcpy(sharedquery, debug_query_string, querylen + 1);
shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery);
+ /* Allocate space for each worker's WalUsage; no need to initialize */
+ walusage = shm_toc_allocate(pcxt->toc,
+ mul_size(sizeof(WalUsage), pcxt->nworkers));
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage);
+
/* Launch workers, saving status for leader/caller */
LaunchParallelWorkers(pcxt);
btleader->pcxt = pcxt;
@@ -1609,6 +1630,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
btleader->sharedsort = sharedsort;
btleader->sharedsort2 = sharedsort2;
btleader->snapshot = snapshot;
+ btleader->walusage = walusage;
/* If no workers were successfully launched, back out (do serial build) */
if (pcxt->nworkers_launched == 0)
@@ -1637,8 +1659,18 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
static void
_bt_end_parallel(BTLeader *btleader)
{
+ int i;
+
/* Shutdown worker processes */
WaitForParallelWorkersToFinish(btleader->pcxt);
+
+ /*
+ * Next, accumulate WAL usage. (This must wait for the workers to finish,
+ * or we might get incomplete data.)
+ */
+ for (i = 0; i < btleader->pcxt->nworkers_launched; i++)
+ InstrAccumParallelQuery(NULL, &btleader->walusage[i]);
+
/* Free last reference to MVCC snapshot, if one was used */
if (IsMVCCSnapshot(btleader->snapshot))
UnregisterSnapshot(btleader->snapshot);
@@ -1769,6 +1801,7 @@ _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc)
Relation indexRel;
LOCKMODE heapLockmode;
LOCKMODE indexLockmode;
+ WalUsage *walusage;
int sortmem;
#ifdef BTREE_BUILD_STATS
@@ -1830,11 +1863,18 @@ _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc)
tuplesort_attach_shared(sharedsort2, seg);
}
+ /* Prepare to track buffer usage during parallel execution */
+ InstrStartParallelQuery();
+
/* Perform sorting of spool, and possibly a spool2 */
sortmem = maintenance_work_mem / btshared->scantuplesortstates;
_bt_parallel_scan_and_sort(btspool, btspool2, btshared, sharedsort,
sharedsort2, sortmem, false);
+ /* Report WAL usage during parallel execution */
+ walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
+ InstrEndParallelQuery(NULL, &walusage[ParallelWorkerNumber]);
+
#ifdef BTREE_BUILD_STATS
if (log_btree_build_stats)
{
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index a29456f7890..7b70bfef1ae 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -43,6 +43,7 @@
#include "commands/progress.h"
#include "commands/tablespace.h"
#include "common/controldata_utils.h"
+#include "executor/instrument.h"
#include "miscadmin.h"
#include "pg_trace.h"
#include "pgstat.h"
@@ -996,7 +997,8 @@ static void WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt);
XLogRecPtr
XLogInsertRecord(XLogRecData *rdata,
XLogRecPtr fpw_lsn,
- uint8 flags)
+ uint8 flags,
+ int num_fpw)
{
XLogCtlInsert *Insert = &XLogCtl->Insert;
pg_crc32c rdata_crc;
@@ -1252,6 +1254,14 @@ XLogInsertRecord(XLogRecData *rdata,
ProcLastRecPtr = StartPos;
XactLastRecEnd = EndPos;
+ /* Report WAL traffic to the instrumentation. */
+ if (inserted)
+ {
+ pgWalUsage.wal_bytes += rechdr->xl_tot_len;
+ pgWalUsage.wal_records++;
+ pgWalUsage.wal_num_fpw += num_fpw;
+ }
+
return EndPos;
}
diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c
index a618dec776c..5e032e7042d 100644
--- a/src/backend/access/transam/xloginsert.c
+++ b/src/backend/access/transam/xloginsert.c
@@ -25,6 +25,7 @@
#include "access/xloginsert.h"
#include "catalog/pg_control.h"
#include "common/pg_lzcompress.h"
+#include "executor/instrument.h"
#include "miscadmin.h"
#include "pg_trace.h"
#include "replication/origin.h"
@@ -108,7 +109,7 @@ static MemoryContext xloginsert_cxt;
static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info,
XLogRecPtr RedoRecPtr, bool doPageWrites,
- XLogRecPtr *fpw_lsn);
+ XLogRecPtr *fpw_lsn, int *num_fpw);
static bool XLogCompressBackupBlock(char *page, uint16 hole_offset,
uint16 hole_length, char *dest, uint16 *dlen);
@@ -448,6 +449,7 @@ XLogInsert(RmgrId rmid, uint8 info)
bool doPageWrites;
XLogRecPtr fpw_lsn;
XLogRecData *rdt;
+ int num_fpw = 0;
/*
* Get values needed to decide whether to do full-page writes. Since
@@ -457,9 +459,9 @@ XLogInsert(RmgrId rmid, uint8 info)
GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites,
- &fpw_lsn);
+ &fpw_lsn, &num_fpw);
- EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags);
+ EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags, num_fpw);
} while (EndPos == InvalidXLogRecPtr);
XLogResetInsertion();
@@ -482,7 +484,7 @@ XLogInsert(RmgrId rmid, uint8 info)
static XLogRecData *
XLogRecordAssemble(RmgrId rmid, uint8 info,
XLogRecPtr RedoRecPtr, bool doPageWrites,
- XLogRecPtr *fpw_lsn)
+ XLogRecPtr *fpw_lsn, int *num_fpw)
{
XLogRecData *rdt;
uint32 total_len = 0;
@@ -635,6 +637,9 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
*/
bkpb.fork_flags |= BKPBLOCK_HAS_IMAGE;
+ /* Report a full page image constructed for the WAL record */
+ *num_fpw += 1;
+
/*
* Construct XLogRecData entries for the page content.
*/
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index a753d6efa01..b7d07199538 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -12,7 +12,7 @@
* workers and ensuring that their state generally matches that of the
* leader; see src/backend/access/transam/README.parallel for details.
* However, we must save and restore relevant executor state, such as
- * any ParamListInfo associated with the query, buffer usage info, and
+ * any ParamListInfo associated with the query, buffer/WAL usage info, and
* the actual plan to be passed down to the worker.
*
* IDENTIFICATION
@@ -62,6 +62,7 @@
#define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000007)
#define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008)
#define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)
+#define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xE00000000000000A)
#define PARALLEL_TUPLE_QUEUE_SIZE 65536
@@ -573,6 +574,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
char *pstmt_space;
char *paramlistinfo_space;
BufferUsage *bufusage_space;
+ WalUsage *walusage_space;
SharedExecutorInstrumentation *instrumentation = NULL;
SharedJitInstrumentation *jit_instrumentation = NULL;
int pstmt_len;
@@ -646,6 +648,13 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
mul_size(sizeof(BufferUsage), pcxt->nworkers));
shm_toc_estimate_keys(&pcxt->estimator, 1);
+ /*
+ * Same thing for WalUsage.
+ */
+ shm_toc_estimate_chunk(&pcxt->estimator,
+ mul_size(sizeof(WalUsage), pcxt->nworkers));
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+
/* Estimate space for tuple queues. */
shm_toc_estimate_chunk(&pcxt->estimator,
mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
@@ -728,6 +737,12 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space);
pei->buffer_usage = bufusage_space;
+ /* Same for WalUsage. */
+ walusage_space = shm_toc_allocate(pcxt->toc,
+ mul_size(sizeof(WalUsage), pcxt->nworkers));
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage_space);
+ pei->wal_usage = walusage_space;
+
/* Set up the tuple queues that the workers will write into. */
pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
@@ -1069,7 +1084,7 @@ ExecParallelRetrieveJitInstrumentation(PlanState *planstate,
/*
* Finish parallel execution. We wait for parallel workers to finish, and
- * accumulate their buffer usage.
+ * accumulate their buffer/WAL usage.
*/
void
ExecParallelFinish(ParallelExecutorInfo *pei)
@@ -1109,11 +1124,11 @@ ExecParallelFinish(ParallelExecutorInfo *pei)
WaitForParallelWorkersToFinish(pei->pcxt);
/*
- * Next, accumulate buffer usage. (This must wait for the workers to
+ * Next, accumulate buffer/WAL usage. (This must wait for the workers to
* finish, or we might get incomplete data.)
*/
for (i = 0; i < nworkers; i++)
- InstrAccumParallelQuery(&pei->buffer_usage[i]);
+ InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i]);
pei->finished = true;
}
@@ -1333,6 +1348,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
{
FixedParallelExecutorState *fpes;
BufferUsage *buffer_usage;
+ WalUsage *wal_usage;
DestReceiver *receiver;
QueryDesc *queryDesc;
SharedExecutorInstrumentation *instrumentation;
@@ -1386,11 +1402,11 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate);
/*
- * Prepare to track buffer usage during query execution.
+ * Prepare to track buffer/WAL usage during query execution.
*
* We do this after starting up the executor to match what happens in the
- * leader, which also doesn't count buffer accesses that occur during
- * executor startup.
+ * leader, which also doesn't count buffer accesses and WAL activity that
+ * occur during executor startup.
*/
InstrStartParallelQuery();
@@ -1406,9 +1422,11 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
/* Shut down the executor */
ExecutorFinish(queryDesc);
- /* Report buffer usage during parallel execution. */
+ /* Report buffer/WAL usage during parallel execution. */
buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
- InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber]);
+ wal_usage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
+ InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
+ &wal_usage[ParallelWorkerNumber]);
/* Report instrumentation data if any instrumentation options are set. */
if (instrumentation != NULL)
diff --git a/src/backend/executor/instrument.c b/src/backend/executor/instrument.c
index 042e10f96bc..74ee4808e36 100644
--- a/src/backend/executor/instrument.c
+++ b/src/backend/executor/instrument.c
@@ -19,8 +19,11 @@
BufferUsage pgBufferUsage;
static BufferUsage save_pgBufferUsage;
+WalUsage pgWalUsage;
+static WalUsage save_pgWalUsage;
static void BufferUsageAdd(BufferUsage *dst, const BufferUsage *add);
+static void WalUsageAdd(WalUsage *dst, WalUsage *add);
/* Allocate new instrumentation structure(s) */
@@ -31,15 +34,17 @@ InstrAlloc(int n, int instrument_options)
/* initialize all fields to zeroes, then modify as needed */
instr = palloc0(n * sizeof(Instrumentation));
- if (instrument_options & (INSTRUMENT_BUFFERS | INSTRUMENT_TIMER))
+ if (instrument_options & (INSTRUMENT_BUFFERS | INSTRUMENT_TIMER | INSTRUMENT_WAL))
{
bool need_buffers = (instrument_options & INSTRUMENT_BUFFERS) != 0;
+ bool need_wal = (instrument_options & INSTRUMENT_WAL) != 0;
bool need_timer = (instrument_options & INSTRUMENT_TIMER) != 0;
int i;
for (i = 0; i < n; i++)
{
instr[i].need_bufusage = need_buffers;
+ instr[i].need_walusage = need_wal;
instr[i].need_timer = need_timer;
}
}
@@ -53,6 +58,7 @@ InstrInit(Instrumentation *instr, int instrument_options)
{
memset(instr, 0, sizeof(Instrumentation));
instr->need_bufusage = (instrument_options & INSTRUMENT_BUFFERS) != 0;
+ instr->need_walusage = (instrument_options & INSTRUMENT_WAL) != 0;
instr->need_timer = (instrument_options & INSTRUMENT_TIMER) != 0;
}
@@ -67,6 +73,9 @@ InstrStartNode(Instrumentation *instr)
/* save buffer usage totals at node entry, if needed */
if (instr->need_bufusage)
instr->bufusage_start = pgBufferUsage;
+
+ if (instr->need_walusage)
+ instr->walusage_start = pgWalUsage;
}
/* Exit from a plan node */
@@ -95,6 +104,10 @@ InstrStopNode(Instrumentation *instr, double nTuples)
BufferUsageAccumDiff(&instr->bufusage,
&pgBufferUsage, &instr->bufusage_start);
+ if (instr->need_walusage)
+ WalUsageAccumDiff(&instr->walusage,
+ &pgWalUsage, &instr->walusage_start);
+
/* Is this the first tuple of this cycle? */
if (!instr->running)
{
@@ -158,6 +171,9 @@ InstrAggNode(Instrumentation *dst, Instrumentation *add)
/* Add delta of buffer usage since entry to node's totals */
if (dst->need_bufusage)
BufferUsageAdd(&dst->bufusage, &add->bufusage);
+
+ if (dst->need_walusage)
+ WalUsageAdd(&dst->walusage, &add->walusage);
}
/* note current values during parallel executor startup */
@@ -165,21 +181,29 @@ void
InstrStartParallelQuery(void)
{
save_pgBufferUsage = pgBufferUsage;
+ save_pgWalUsage = pgWalUsage;
}
/* report usage after parallel executor shutdown */
void
-InstrEndParallelQuery(BufferUsage *result)
+InstrEndParallelQuery(BufferUsage *bufusage, WalUsage *walusage)
{
- memset(result, 0, sizeof(BufferUsage));
- BufferUsageAccumDiff(result, &pgBufferUsage, &save_pgBufferUsage);
+ if (bufusage)
+ {
+ memset(bufusage, 0, sizeof(BufferUsage));
+ BufferUsageAccumDiff(bufusage, &pgBufferUsage, &save_pgBufferUsage);
+ }
+ memset(walusage, 0, sizeof(WalUsage));
+ WalUsageAccumDiff(walusage, &pgWalUsage, &save_pgWalUsage);
}
/* accumulate work done by workers in leader's stats */
void
-InstrAccumParallelQuery(BufferUsage *result)
+InstrAccumParallelQuery(BufferUsage *bufusage, WalUsage *walusage)
{
- BufferUsageAdd(&pgBufferUsage, result);
+ if (bufusage)
+ BufferUsageAdd(&pgBufferUsage, bufusage);
+ WalUsageAdd(&pgWalUsage, walusage);
}
/* dst += add */
@@ -221,3 +245,20 @@ BufferUsageAccumDiff(BufferUsage *dst,
INSTR_TIME_ACCUM_DIFF(dst->blk_write_time,
add->blk_write_time, sub->blk_write_time);
}
+
+/* helper functions for WAL usage accumulation */
+static void
+WalUsageAdd(WalUsage *dst, WalUsage *add)
+{
+ dst->wal_bytes += add->wal_bytes;
+ dst->wal_records += add->wal_records;
+ dst->wal_num_fpw += add->wal_num_fpw;
+}
+
+void
+WalUsageAccumDiff(WalUsage *dst, const WalUsage *add, const WalUsage *sub)
+{
+ dst->wal_bytes += add->wal_bytes - sub->wal_bytes;
+ dst->wal_records += add->wal_records - sub->wal_records;
+ dst->wal_num_fpw += add->wal_num_fpw - sub->wal_num_fpw;
+}
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 9ec7b31cce1..b91e724b2d4 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -259,7 +259,8 @@ struct XLogRecData;
extern XLogRecPtr XLogInsertRecord(struct XLogRecData *rdata,
XLogRecPtr fpw_lsn,
- uint8 flags);
+ uint8 flags,
+ int num_fpw);
extern void XLogFlush(XLogRecPtr RecPtr);
extern bool XLogBackgroundFlush(void);
extern bool XLogNeedsFlush(XLogRecPtr RecPtr);
diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h
index 17d07cf020c..5a39a5b29c4 100644
--- a/src/include/executor/execParallel.h
+++ b/src/include/executor/execParallel.h
@@ -26,6 +26,7 @@ typedef struct ParallelExecutorInfo
PlanState *planstate; /* plan subtree we're running in parallel */
ParallelContext *pcxt; /* parallel context we're using */
BufferUsage *buffer_usage; /* points to bufusage area in DSM */
+ WalUsage *wal_usage; /* walusage area in DSM */
SharedExecutorInstrumentation *instrumentation; /* optional */
struct SharedJitInstrumentation *jit_instrumentation; /* optional */
dsa_area *area; /* points to DSA area in DSM */
diff --git a/src/include/executor/instrument.h b/src/include/executor/instrument.h
index 3825a5ac1f3..64439c6819c 100644
--- a/src/include/executor/instrument.h
+++ b/src/include/executor/instrument.h
@@ -32,12 +32,20 @@ typedef struct BufferUsage
instr_time blk_write_time; /* time spent writing */
} BufferUsage;
+typedef struct WalUsage
+{
+ long wal_records; /* # of WAL records produced */
+ long wal_num_fpw; /* # of WAL full page image writes produced */
+ uint64 wal_bytes; /* size of WAL records produced */
+} WalUsage;
+
/* Flag bits included in InstrAlloc's instrument_options bitmask */
typedef enum InstrumentOption
{
INSTRUMENT_TIMER = 1 << 0, /* needs timer (and row counts) */
INSTRUMENT_BUFFERS = 1 << 1, /* needs buffer usage */
INSTRUMENT_ROWS = 1 << 2, /* needs row count */
+ INSTRUMENT_WAL = 1 << 3, /* needs WAL usage */
INSTRUMENT_ALL = PG_INT32_MAX
} InstrumentOption;
@@ -46,6 +54,7 @@ typedef struct Instrumentation
/* Parameters set at node creation: */
bool need_timer; /* true if we need timer data */
bool need_bufusage; /* true if we need buffer usage data */
+ bool need_walusage; /* true if we need WAL usage data */
/* Info about current plan cycle: */
bool running; /* true if we've completed first tuple */
instr_time starttime; /* start time of current iteration of node */
@@ -53,6 +62,7 @@ typedef struct Instrumentation
double firsttuple; /* time for first tuple of this cycle */
double tuplecount; /* # of tuples emitted so far this cycle */
BufferUsage bufusage_start; /* buffer usage at start */
+ WalUsage walusage_start; /* WAL usage at start */
/* Accumulated statistics across all completed cycles: */
double startup; /* total startup time (in seconds) */
double total; /* total time (in seconds) */
@@ -62,6 +72,7 @@ typedef struct Instrumentation
double nfiltered1; /* # of tuples removed by scanqual or joinqual */
double nfiltered2; /* # of tuples removed by "other" quals */
BufferUsage bufusage; /* total buffer usage */
+ WalUsage walusage; /* total WAL usage */
} Instrumentation;
typedef struct WorkerInstrumentation
@@ -71,6 +82,7 @@ typedef struct WorkerInstrumentation
} WorkerInstrumentation;
extern PGDLLIMPORT BufferUsage pgBufferUsage;
+extern PGDLLIMPORT WalUsage pgWalUsage;
extern Instrumentation *InstrAlloc(int n, int instrument_options);
extern void InstrInit(Instrumentation *instr, int instrument_options);
@@ -79,9 +91,11 @@ extern void InstrStopNode(Instrumentation *instr, double nTuples);
extern void InstrEndLoop(Instrumentation *instr);
extern void InstrAggNode(Instrumentation *dst, Instrumentation *add);
extern void InstrStartParallelQuery(void);
-extern void InstrEndParallelQuery(BufferUsage *result);
-extern void InstrAccumParallelQuery(BufferUsage *result);
+extern void InstrEndParallelQuery(BufferUsage *bufusage, WalUsage *walusage);
+extern void InstrAccumParallelQuery(BufferUsage *bufusage, WalUsage *walusage);
extern void BufferUsageAccumDiff(BufferUsage *dst,
const BufferUsage *add, const BufferUsage *sub);
+extern void WalUsageAccumDiff(WalUsage *dst, const WalUsage *add,
+ const WalUsage *sub);
#endif /* INSTRUMENT_H */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 939de985d32..34623523a70 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2643,6 +2643,7 @@ WalSndCtlData
WalSndSendDataCallback
WalSndState
WalTimeSample
+WalUsage
WalWriteMethod
Walfile
WindowAgg