diff options
Diffstat (limited to 'src/backend/executor/execParallel.c')
-rw-r--r-- | src/backend/executor/execParallel.c | 36 |
1 files changed, 27 insertions, 9 deletions
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index a753d6efa01..b7d07199538 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -12,7 +12,7 @@ * workers and ensuring that their state generally matches that of the * leader; see src/backend/access/transam/README.parallel for details. * However, we must save and restore relevant executor state, such as - * any ParamListInfo associated with the query, buffer usage info, and + * any ParamListInfo associated with the query, buffer/WAL usage info, and * the actual plan to be passed down to the worker. * * IDENTIFICATION @@ -62,6 +62,7 @@ #define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000007) #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008) #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009) +#define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xE00000000000000A) #define PARALLEL_TUPLE_QUEUE_SIZE 65536 @@ -573,6 +574,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, char *pstmt_space; char *paramlistinfo_space; BufferUsage *bufusage_space; + WalUsage *walusage_space; SharedExecutorInstrumentation *instrumentation = NULL; SharedJitInstrumentation *jit_instrumentation = NULL; int pstmt_len; @@ -646,6 +648,13 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, mul_size(sizeof(BufferUsage), pcxt->nworkers)); shm_toc_estimate_keys(&pcxt->estimator, 1); + /* + * Same thing for WalUsage. + */ + shm_toc_estimate_chunk(&pcxt->estimator, + mul_size(sizeof(WalUsage), pcxt->nworkers)); + shm_toc_estimate_keys(&pcxt->estimator, 1); + /* Estimate space for tuple queues. */ shm_toc_estimate_chunk(&pcxt->estimator, mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers)); @@ -728,6 +737,12 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space); pei->buffer_usage = bufusage_space; + /* Same for WalUsage. */ + walusage_space = shm_toc_allocate(pcxt->toc, + mul_size(sizeof(WalUsage), pcxt->nworkers)); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage_space); + pei->wal_usage = walusage_space; + /* Set up the tuple queues that the workers will write into. */ pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false); @@ -1069,7 +1084,7 @@ ExecParallelRetrieveJitInstrumentation(PlanState *planstate, /* * Finish parallel execution. We wait for parallel workers to finish, and - * accumulate their buffer usage. + * accumulate their buffer/WAL usage. */ void ExecParallelFinish(ParallelExecutorInfo *pei) @@ -1109,11 +1124,11 @@ ExecParallelFinish(ParallelExecutorInfo *pei) WaitForParallelWorkersToFinish(pei->pcxt); /* - * Next, accumulate buffer usage. (This must wait for the workers to + * Next, accumulate buffer/WAL usage. (This must wait for the workers to * finish, or we might get incomplete data.) */ for (i = 0; i < nworkers; i++) - InstrAccumParallelQuery(&pei->buffer_usage[i]); + InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i]); pei->finished = true; } @@ -1333,6 +1348,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) { FixedParallelExecutorState *fpes; BufferUsage *buffer_usage; + WalUsage *wal_usage; DestReceiver *receiver; QueryDesc *queryDesc; SharedExecutorInstrumentation *instrumentation; @@ -1386,11 +1402,11 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate); /* - * Prepare to track buffer usage during query execution. + * Prepare to track buffer/WAL usage during query execution. * * We do this after starting up the executor to match what happens in the - * leader, which also doesn't count buffer accesses that occur during - * executor startup. + * leader, which also doesn't count buffer accesses and WAL activity that + * occur during executor startup. */ InstrStartParallelQuery(); @@ -1406,9 +1422,11 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) /* Shut down the executor */ ExecutorFinish(queryDesc); - /* Report buffer usage during parallel execution. */ + /* Report buffer/WAL usage during parallel execution. */ buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false); - InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber]); + wal_usage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false); + InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber], + &wal_usage[ParallelWorkerNumber]); /* Report instrumentation data if any instrumentation options are set. */ if (instrumentation != NULL) |