aboutsummaryrefslogtreecommitdiff
path: root/src/backend/executor
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/executor')
-rw-r--r--src/backend/executor/execParallel.c19
-rw-r--r--src/backend/executor/nodeAgg.c103
2 files changed, 119 insertions, 3 deletions
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index 41cb41481df..382e78fb7fe 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -25,6 +25,7 @@
#include "executor/execParallel.h"
#include "executor/executor.h"
+#include "executor/nodeAgg.h"
#include "executor/nodeAppend.h"
#include "executor/nodeBitmapHeapscan.h"
#include "executor/nodeCustom.h"
@@ -288,7 +289,10 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
/* even when not parallel-aware, for EXPLAIN ANALYZE */
ExecIncrementalSortEstimate((IncrementalSortState *) planstate, e->pcxt);
break;
-
+ case T_AggState:
+ /* even when not parallel-aware, for EXPLAIN ANALYZE */
+ ExecAggEstimate((AggState *) planstate, e->pcxt);
+ break;
default:
break;
}
@@ -505,7 +509,10 @@ ExecParallelInitializeDSM(PlanState *planstate,
/* even when not parallel-aware, for EXPLAIN ANALYZE */
ExecIncrementalSortInitializeDSM((IncrementalSortState *) planstate, d->pcxt);
break;
-
+ case T_AggState:
+ /* even when not parallel-aware, for EXPLAIN ANALYZE */
+ ExecAggInitializeDSM((AggState *) planstate, d->pcxt);
+ break;
default:
break;
}
@@ -1048,6 +1055,9 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate,
case T_HashState:
ExecHashRetrieveInstrumentation((HashState *) planstate);
break;
+ case T_AggState:
+ ExecAggRetrieveInstrumentation((AggState *) planstate);
+ break;
default:
break;
}
@@ -1336,7 +1346,10 @@ ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
ExecIncrementalSortInitializeWorker((IncrementalSortState *) planstate,
pwcxt);
break;
-
+ case T_AggState:
+ /* even when not parallel-aware, for EXPLAIN ANALYZE */
+ ExecAggInitializeWorker((AggState *) planstate, pwcxt);
+ break;
default:
break;
}
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index 331acee2814..a20554ae65a 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -240,6 +240,7 @@
#include "postgres.h"
#include "access/htup_details.h"
+#include "access/parallel.h"
#include "catalog/objectaccess.h"
#include "catalog/pg_aggregate.h"
#include "catalog/pg_proc.h"
@@ -4483,6 +4484,22 @@ ExecEndAgg(AggState *node)
int numGroupingSets = Max(node->maxsets, 1);
int setno;
+ /*
+ * When ending a parallel worker, copy the statistics gathered by the
+ * worker back into shared memory so that it can be picked up by the main
+ * process to report in EXPLAIN ANALYZE.
+ */
+ if (node->shared_info && IsParallelWorker())
+ {
+ AggregateInstrumentation *si;
+
+ Assert(ParallelWorkerNumber <= node->shared_info->num_workers);
+ si = &node->shared_info->sinstrument[ParallelWorkerNumber];
+ si->hash_batches_used = node->hash_batches_used;
+ si->hash_disk_used = node->hash_disk_used;
+ si->hash_mem_peak = node->hash_mem_peak;
+ }
+
/* Make sure we have closed any open tuplesorts */
if (node->sort_in)
@@ -4854,3 +4871,89 @@ aggregate_dummy(PG_FUNCTION_ARGS)
fcinfo->flinfo->fn_oid);
return (Datum) 0; /* keep compiler quiet */
}
+
+/* ----------------------------------------------------------------
+ * Parallel Query Support
+ * ----------------------------------------------------------------
+ */
+
+ /* ----------------------------------------------------------------
+ * ExecAggEstimate
+ *
+ * Estimate space required to propagate aggregate statistics.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAggEstimate(AggState *node, ParallelContext *pcxt)
+{
+ Size size;
+
+ /* don't need this if not instrumenting or no workers */
+ if (!node->ss.ps.instrument || pcxt->nworkers == 0)
+ return;
+
+ size = mul_size(pcxt->nworkers, sizeof(AggregateInstrumentation));
+ size = add_size(size, offsetof(SharedAggInfo, sinstrument));
+ shm_toc_estimate_chunk(&pcxt->estimator, size);
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
+
+/* ----------------------------------------------------------------
+ * ExecAggInitializeDSM
+ *
+ * Initialize DSM space for aggregate statistics.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAggInitializeDSM(AggState *node, ParallelContext *pcxt)
+{
+ Size size;
+
+ /* don't need this if not instrumenting or no workers */
+ if (!node->ss.ps.instrument || pcxt->nworkers == 0)
+ return;
+
+ size = offsetof(SharedAggInfo, sinstrument)
+ + pcxt->nworkers * sizeof(AggregateInstrumentation);
+ node->shared_info = shm_toc_allocate(pcxt->toc, size);
+ /* ensure any unfilled slots will contain zeroes */
+ memset(node->shared_info, 0, size);
+ node->shared_info->num_workers = pcxt->nworkers;
+ shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id,
+ node->shared_info);
+}
+
+/* ----------------------------------------------------------------
+ * ExecAggInitializeWorker
+ *
+ * Attach worker to DSM space for aggregate statistics.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAggInitializeWorker(AggState *node, ParallelWorkerContext *pwcxt)
+{
+ node->shared_info =
+ shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, true);
+}
+
+/* ----------------------------------------------------------------
+ * ExecAggRetrieveInstrumentation
+ *
+ * Transfer aggregate statistics from DSM to private memory.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAggRetrieveInstrumentation(AggState *node)
+{
+ Size size;
+ SharedAggInfo *si;
+
+ if (node->shared_info == NULL)
+ return;
+
+ size = offsetof(SharedAggInfo, sinstrument)
+ + node->shared_info->num_workers * sizeof(AggregateInstrumentation);
+ si = palloc(size);
+ memcpy(si, node->shared_info, size);
+ node->shared_info = si;
+}