diff options
Diffstat (limited to 'src/backend/executor')
-rw-r--r-- | src/backend/executor/execParallel.c | 19 | ||||
-rw-r--r-- | src/backend/executor/nodeAgg.c | 103 |
2 files changed, 119 insertions, 3 deletions
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index 41cb41481df..382e78fb7fe 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -25,6 +25,7 @@ #include "executor/execParallel.h" #include "executor/executor.h" +#include "executor/nodeAgg.h" #include "executor/nodeAppend.h" #include "executor/nodeBitmapHeapscan.h" #include "executor/nodeCustom.h" @@ -288,7 +289,10 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e) /* even when not parallel-aware, for EXPLAIN ANALYZE */ ExecIncrementalSortEstimate((IncrementalSortState *) planstate, e->pcxt); break; - + case T_AggState: + /* even when not parallel-aware, for EXPLAIN ANALYZE */ + ExecAggEstimate((AggState *) planstate, e->pcxt); + break; default: break; } @@ -505,7 +509,10 @@ ExecParallelInitializeDSM(PlanState *planstate, /* even when not parallel-aware, for EXPLAIN ANALYZE */ ExecIncrementalSortInitializeDSM((IncrementalSortState *) planstate, d->pcxt); break; - + case T_AggState: + /* even when not parallel-aware, for EXPLAIN ANALYZE */ + ExecAggInitializeDSM((AggState *) planstate, d->pcxt); + break; default: break; } @@ -1048,6 +1055,9 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate, case T_HashState: ExecHashRetrieveInstrumentation((HashState *) planstate); break; + case T_AggState: + ExecAggRetrieveInstrumentation((AggState *) planstate); + break; default: break; } @@ -1336,7 +1346,10 @@ ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt) ExecIncrementalSortInitializeWorker((IncrementalSortState *) planstate, pwcxt); break; - + case T_AggState: + /* even when not parallel-aware, for EXPLAIN ANALYZE */ + ExecAggInitializeWorker((AggState *) planstate, pwcxt); + break; default: break; } diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c index 331acee2814..a20554ae65a 100644 --- a/src/backend/executor/nodeAgg.c +++ b/src/backend/executor/nodeAgg.c @@ -240,6 +240,7 @@ #include "postgres.h" #include "access/htup_details.h" +#include "access/parallel.h" #include "catalog/objectaccess.h" #include "catalog/pg_aggregate.h" #include "catalog/pg_proc.h" @@ -4483,6 +4484,22 @@ ExecEndAgg(AggState *node) int numGroupingSets = Max(node->maxsets, 1); int setno; + /* + * When ending a parallel worker, copy the statistics gathered by the + * worker back into shared memory so that it can be picked up by the main + * process to report in EXPLAIN ANALYZE. + */ + if (node->shared_info && IsParallelWorker()) + { + AggregateInstrumentation *si; + + Assert(ParallelWorkerNumber <= node->shared_info->num_workers); + si = &node->shared_info->sinstrument[ParallelWorkerNumber]; + si->hash_batches_used = node->hash_batches_used; + si->hash_disk_used = node->hash_disk_used; + si->hash_mem_peak = node->hash_mem_peak; + } + /* Make sure we have closed any open tuplesorts */ if (node->sort_in) @@ -4854,3 +4871,89 @@ aggregate_dummy(PG_FUNCTION_ARGS) fcinfo->flinfo->fn_oid); return (Datum) 0; /* keep compiler quiet */ } + +/* ---------------------------------------------------------------- + * Parallel Query Support + * ---------------------------------------------------------------- + */ + + /* ---------------------------------------------------------------- + * ExecAggEstimate + * + * Estimate space required to propagate aggregate statistics. + * ---------------------------------------------------------------- + */ +void +ExecAggEstimate(AggState *node, ParallelContext *pcxt) +{ + Size size; + + /* don't need this if not instrumenting or no workers */ + if (!node->ss.ps.instrument || pcxt->nworkers == 0) + return; + + size = mul_size(pcxt->nworkers, sizeof(AggregateInstrumentation)); + size = add_size(size, offsetof(SharedAggInfo, sinstrument)); + shm_toc_estimate_chunk(&pcxt->estimator, size); + shm_toc_estimate_keys(&pcxt->estimator, 1); +} + +/* ---------------------------------------------------------------- + * ExecAggInitializeDSM + * + * Initialize DSM space for aggregate statistics. + * ---------------------------------------------------------------- + */ +void +ExecAggInitializeDSM(AggState *node, ParallelContext *pcxt) +{ + Size size; + + /* don't need this if not instrumenting or no workers */ + if (!node->ss.ps.instrument || pcxt->nworkers == 0) + return; + + size = offsetof(SharedAggInfo, sinstrument) + + pcxt->nworkers * sizeof(AggregateInstrumentation); + node->shared_info = shm_toc_allocate(pcxt->toc, size); + /* ensure any unfilled slots will contain zeroes */ + memset(node->shared_info, 0, size); + node->shared_info->num_workers = pcxt->nworkers; + shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, + node->shared_info); +} + +/* ---------------------------------------------------------------- + * ExecAggInitializeWorker + * + * Attach worker to DSM space for aggregate statistics. + * ---------------------------------------------------------------- + */ +void +ExecAggInitializeWorker(AggState *node, ParallelWorkerContext *pwcxt) +{ + node->shared_info = + shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, true); +} + +/* ---------------------------------------------------------------- + * ExecAggRetrieveInstrumentation + * + * Transfer aggregate statistics from DSM to private memory. + * ---------------------------------------------------------------- + */ +void +ExecAggRetrieveInstrumentation(AggState *node) +{ + Size size; + SharedAggInfo *si; + + if (node->shared_info == NULL) + return; + + size = offsetof(SharedAggInfo, sinstrument) + + node->shared_info->num_workers * sizeof(AggregateInstrumentation); + si = palloc(size); + memcpy(si, node->shared_info, size); + node->shared_info = si; +} |