aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/commands/explain.c110
-rw-r--r--src/backend/executor/execParallel.c19
-rw-r--r--src/backend/executor/nodeAgg.c103
-rw-r--r--src/include/executor/nodeAgg.h7
-rw-r--r--src/include/nodes/execnodes.h22
5 files changed, 244 insertions, 17 deletions
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 9092b4b3094..67bdcb2b278 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -3051,29 +3051,111 @@ show_hashagg_info(AggState *aggstate, ExplainState *es)
Agg *agg = (Agg *) aggstate->ss.ps.plan;
int64 memPeakKb = (aggstate->hash_mem_peak + 1023) / 1024;
- Assert(IsA(aggstate, AggState));
-
if (agg->aggstrategy != AGG_HASHED &&
agg->aggstrategy != AGG_MIXED)
return;
- if (es->costs && aggstate->hash_planned_partitions > 0)
+ if (es->format != EXPLAIN_FORMAT_TEXT)
{
- ExplainPropertyInteger("Planned Partitions", NULL,
- aggstate->hash_planned_partitions, es);
+
+ if (es->costs && aggstate->hash_planned_partitions > 0)
+ {
+ ExplainPropertyInteger("Planned Partitions", NULL,
+ aggstate->hash_planned_partitions, es);
+ }
+
+ if (!es->analyze)
+ return;
+
+ /* EXPLAIN ANALYZE */
+ ExplainPropertyInteger("Peak Memory Usage", "kB", memPeakKb, es);
+ if (aggstate->hash_batches_used > 0)
+ {
+ ExplainPropertyInteger("Disk Usage", "kB",
+ aggstate->hash_disk_used, es);
+ ExplainPropertyInteger("HashAgg Batches", NULL,
+ aggstate->hash_batches_used, es);
+ }
}
+ else
+ {
+ bool gotone = false;
- if (!es->analyze)
- return;
+ if (es->costs && aggstate->hash_planned_partitions > 0)
+ {
+ ExplainIndentText(es);
+ appendStringInfo(es->str, "Planned Partitions: %d",
+ aggstate->hash_planned_partitions);
+ gotone = true;
+ }
+
+ if (!es->analyze)
+ {
+ if (gotone)
+ appendStringInfoChar(es->str, '\n');
+ return;
+ }
+
+ if (!gotone)
+ ExplainIndentText(es);
+ else
+ appendStringInfoString(es->str, " ");
+
+ appendStringInfo(es->str, "Peak Memory Usage: " INT64_FORMAT " kB",
+ memPeakKb);
- /* EXPLAIN ANALYZE */
- ExplainPropertyInteger("Peak Memory Usage", "kB", memPeakKb, es);
- if (aggstate->hash_batches_used > 0)
+ if (aggstate->hash_batches_used > 0)
+ appendStringInfo(es->str, " Disk Usage: " UINT64_FORMAT " kB HashAgg Batches: %d",
+ aggstate->hash_disk_used,
+ aggstate->hash_batches_used);
+ appendStringInfoChar(es->str, '\n');
+ }
+
+ /* Display stats for each parallel worker */
+ if (es->analyze && aggstate->shared_info != NULL)
{
- ExplainPropertyInteger("Disk Usage", "kB",
- aggstate->hash_disk_used, es);
- ExplainPropertyInteger("HashAgg Batches", NULL,
- aggstate->hash_batches_used, es);
+ for (int n = 0; n < aggstate->shared_info->num_workers; n++)
+ {
+ AggregateInstrumentation *sinstrument;
+ uint64 hash_disk_used;
+ int hash_batches_used;
+
+ sinstrument = &aggstate->shared_info->sinstrument[n];
+ hash_disk_used = sinstrument->hash_disk_used;
+ hash_batches_used = sinstrument->hash_batches_used;
+ memPeakKb = (sinstrument->hash_mem_peak + 1023) / 1024;
+
+ if (es->workers_state)
+ ExplainOpenWorker(n, es);
+
+ if (es->format == EXPLAIN_FORMAT_TEXT)
+ {
+ ExplainIndentText(es);
+
+ appendStringInfo(es->str, "Peak Memory Usage: " INT64_FORMAT " kB",
+ memPeakKb);
+
+ if (hash_batches_used > 0)
+ appendStringInfo(es->str, " Disk Usage: " UINT64_FORMAT " kB HashAgg Batches: %d",
+ hash_disk_used, hash_batches_used);
+ appendStringInfoChar(es->str, '\n');
+ }
+ else
+ {
+ ExplainPropertyInteger("Peak Memory Usage", "kB", memPeakKb,
+ es);
+ if (hash_batches_used > 0)
+ {
+ ExplainPropertyInteger("Disk Usage", "kB", hash_disk_used,
+ es);
+ ExplainPropertyInteger("HashAgg Batches", NULL,
+ hash_batches_used, es);
+ }
+ }
+
+ if (es->workers_state)
+ ExplainCloseWorker(n, es);
+ }
}
}
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index 41cb41481df..382e78fb7fe 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -25,6 +25,7 @@
#include "executor/execParallel.h"
#include "executor/executor.h"
+#include "executor/nodeAgg.h"
#include "executor/nodeAppend.h"
#include "executor/nodeBitmapHeapscan.h"
#include "executor/nodeCustom.h"
@@ -288,7 +289,10 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
/* even when not parallel-aware, for EXPLAIN ANALYZE */
ExecIncrementalSortEstimate((IncrementalSortState *) planstate, e->pcxt);
break;
-
+ case T_AggState:
+ /* even when not parallel-aware, for EXPLAIN ANALYZE */
+ ExecAggEstimate((AggState *) planstate, e->pcxt);
+ break;
default:
break;
}
@@ -505,7 +509,10 @@ ExecParallelInitializeDSM(PlanState *planstate,
/* even when not parallel-aware, for EXPLAIN ANALYZE */
ExecIncrementalSortInitializeDSM((IncrementalSortState *) planstate, d->pcxt);
break;
-
+ case T_AggState:
+ /* even when not parallel-aware, for EXPLAIN ANALYZE */
+ ExecAggInitializeDSM((AggState *) planstate, d->pcxt);
+ break;
default:
break;
}
@@ -1048,6 +1055,9 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate,
case T_HashState:
ExecHashRetrieveInstrumentation((HashState *) planstate);
break;
+ case T_AggState:
+ ExecAggRetrieveInstrumentation((AggState *) planstate);
+ break;
default:
break;
}
@@ -1336,7 +1346,10 @@ ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
ExecIncrementalSortInitializeWorker((IncrementalSortState *) planstate,
pwcxt);
break;
-
+ case T_AggState:
+ /* even when not parallel-aware, for EXPLAIN ANALYZE */
+ ExecAggInitializeWorker((AggState *) planstate, pwcxt);
+ break;
default:
break;
}
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index 331acee2814..a20554ae65a 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -240,6 +240,7 @@
#include "postgres.h"
#include "access/htup_details.h"
+#include "access/parallel.h"
#include "catalog/objectaccess.h"
#include "catalog/pg_aggregate.h"
#include "catalog/pg_proc.h"
@@ -4483,6 +4484,22 @@ ExecEndAgg(AggState *node)
int numGroupingSets = Max(node->maxsets, 1);
int setno;
+	/*
+	 * When ending a parallel worker, copy the statistics gathered by the
+	 * worker back into shared memory so that it can be picked up by the main
+	 * process to report in EXPLAIN ANALYZE.
+	 *
+	 * shared_info is only set when the leader allocated the statistics area,
+	 * so a NULL pointer simply means there is nothing to report.
+	 */
+	if (node->shared_info && IsParallelWorker())
+	{
+		AggregateInstrumentation *si;
+
+		/*
+		 * sinstrument[] holds exactly num_workers entries, so the worker
+		 * number must be strictly less than num_workers.
+		 */
+		Assert(ParallelWorkerNumber < node->shared_info->num_workers);
+		si = &node->shared_info->sinstrument[ParallelWorkerNumber];
+		si->hash_batches_used = node->hash_batches_used;
+		si->hash_disk_used = node->hash_disk_used;
+		si->hash_mem_peak = node->hash_mem_peak;
+	}
+
/* Make sure we have closed any open tuplesorts */
if (node->sort_in)
@@ -4854,3 +4871,89 @@ aggregate_dummy(PG_FUNCTION_ARGS)
fcinfo->flinfo->fn_oid);
return (Datum) 0; /* keep compiler quiet */
}
+
+/* ----------------------------------------------------------------
+ * Parallel Query Support
+ * ----------------------------------------------------------------
+ */
+
+/* ----------------------------------------------------------------
+ *		ExecAggEstimate
+ *
+ *		Reserve space in the parallel context's estimator for the
+ *		aggregate statistics area: the SharedAggInfo header followed by
+ *		one AggregateInstrumentation entry per worker, plus one TOC key.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAggEstimate(AggState *node, ParallelContext *pcxt)
+{
+	Size		stats_bytes;
+
+	/* nothing to reserve unless instrumenting with at least one worker */
+	if (node->ss.ps.instrument == NULL || pcxt->nworkers == 0)
+		return;
+
+	/* header + per-worker array, using overflow-checked arithmetic */
+	stats_bytes = add_size(offsetof(SharedAggInfo, sinstrument),
+						   mul_size(pcxt->nworkers,
+									sizeof(AggregateInstrumentation)));
+
+	shm_toc_estimate_chunk(&pcxt->estimator, stats_bytes);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
+
+/* ----------------------------------------------------------------
+ *		ExecAggInitializeDSM
+ *
+ *		Initialize DSM space for aggregate statistics.
+ *
+ *		Allocates a zeroed SharedAggInfo with one sinstrument[] entry per
+ *		worker, records it in node->shared_info, and publishes it in the
+ *		TOC under the plan node id.  Does nothing when the node is not
+ *		being instrumented or no workers are planned.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAggInitializeDSM(AggState *node, ParallelContext *pcxt)
+{
+	Size		size;
+
+	/* don't need this if not instrumenting or no workers */
+	if (!node->ss.ps.instrument || pcxt->nworkers == 0)
+		return;
+
+	/*
+	 * Compute the size exactly as ExecAggEstimate does, with
+	 * overflow-checked arithmetic, so the chunk allocated here always
+	 * matches what was reserved.
+	 */
+	size = mul_size(pcxt->nworkers, sizeof(AggregateInstrumentation));
+	size = add_size(size, offsetof(SharedAggInfo, sinstrument));
+	node->shared_info = shm_toc_allocate(pcxt->toc, size);
+	/* ensure any unfilled slots will contain zeroes */
+	memset(node->shared_info, 0, size);
+	node->shared_info->num_workers = pcxt->nworkers;
+	shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id,
+				   node->shared_info);
+}
+
+/* ----------------------------------------------------------------
+ *		ExecAggInitializeWorker
+ *
+ *		Point the worker's node->shared_info at the statistics area the
+ *		leader published in the TOC under this plan node's id.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAggInitializeWorker(AggState *node, ParallelWorkerContext *pwcxt)
+{
+	int			plan_node_id = node->ss.ps.plan->plan_node_id;
+
+	/*
+	 * The final "true" argument presumably asks for a NULL result instead
+	 * of an error when no entry exists (the leader skips allocation when
+	 * not instrumenting) -- see shm_toc_lookup.
+	 */
+	node->shared_info = shm_toc_lookup(pwcxt->toc, plan_node_id, true);
+}
+
+/* ----------------------------------------------------------------
+ *		ExecAggRetrieveInstrumentation
+ *
+ *		Replace node->shared_info with a palloc'd copy of the statistics
+ *		area (header plus all per-worker entries).  No-op when no
+ *		statistics area is attached to the node.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAggRetrieveInstrumentation(AggState *node)
+{
+	SharedAggInfo *shared = node->shared_info;
+	SharedAggInfo *local_copy;
+	Size		nbytes;
+
+	/* no statistics area was set up for this node */
+	if (shared == NULL)
+		return;
+
+	nbytes = offsetof(SharedAggInfo, sinstrument)
+		+ shared->num_workers * sizeof(AggregateInstrumentation);
+
+	local_copy = palloc(nbytes);
+	memcpy(local_copy, shared, nbytes);
+	node->shared_info = local_copy;
+}
diff --git a/src/include/executor/nodeAgg.h b/src/include/executor/nodeAgg.h
index 92c2337fd3a..bb0805abe09 100644
--- a/src/include/executor/nodeAgg.h
+++ b/src/include/executor/nodeAgg.h
@@ -14,6 +14,7 @@
#ifndef NODEAGG_H
#define NODEAGG_H
+#include "access/parallel.h"
#include "nodes/execnodes.h"
@@ -323,4 +324,10 @@ extern void hash_agg_set_limits(double hashentrysize, uint64 input_groups,
int used_bits, Size *mem_limit,
uint64 *ngroups_limit, int *num_partitions);
+/*
+ * parallel instrumentation support
+ *
+ * Estimate/InitializeDSM reserve and set up the per-worker statistics area,
+ * InitializeWorker attaches a worker to it, and RetrieveInstrumentation
+ * copies it into backend-local memory.
+ */
+extern void ExecAggEstimate(AggState *node, ParallelContext *pcxt);
+extern void ExecAggInitializeDSM(AggState *node, ParallelContext *pcxt);
+extern void ExecAggInitializeWorker(AggState *node, ParallelWorkerContext *pwcxt);
+extern void ExecAggRetrieveInstrumentation(AggState *node);
+
#endif /* NODEAGG_H */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 98e0072b8ad..f5dfa32d55c 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -2102,6 +2102,27 @@ typedef struct GroupState
} GroupState;
/* ---------------------
+ * per-worker aggregate information
+ *
+ * Hash aggregation statistics for one parallel worker.  A worker copies
+ * its AggState counters into its own entry in ExecEndAgg, and EXPLAIN
+ * ANALYZE reports one set of values per worker.
+ * ---------------------
+ */
+typedef struct AggregateInstrumentation
+{
+ Size hash_mem_peak; /* peak hash table memory usage, in bytes
+ * (EXPLAIN rounds it up to kB) */
+ uint64 hash_disk_used; /* kB of disk space used */
+ int hash_batches_used; /* batches used during entire execution */
+} AggregateInstrumentation;
+
+/* ----------------
+ * Shared memory container for per-worker aggregate information
+ *
+ * Allocated by ExecAggInitializeDSM with room for one sinstrument[]
+ * entry per worker; each worker writes the entry at its own
+ * ParallelWorkerNumber.
+ * ----------------
+ */
+typedef struct SharedAggInfo
+{
+ int num_workers; /* number of entries in sinstrument[] */
+ AggregateInstrumentation sinstrument[FLEXIBLE_ARRAY_MEMBER];
+} SharedAggInfo;
+
+/* ---------------------
* AggState information
*
* ss.ss_ScanTupleSlot refers to output of underlying plan.
@@ -2190,6 +2211,7 @@ typedef struct AggState
AggStatePerGroup *all_pergroups; /* array of first ->pergroups, than
* ->hash_pergroup */
ProjectionInfo *combinedproj; /* projection machinery */
+ SharedAggInfo *shared_info; /* one entry per worker */
} AggState;
/* ----------------