aboutsummaryrefslogtreecommitdiff
path: root/src/backend/executor
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/executor')
-rw-r--r--src/backend/executor/execParallel.c43
-rw-r--r--src/backend/executor/execProcnode.c3
-rw-r--r--src/backend/executor/nodeHash.c104
3 files changed, 140 insertions, 10 deletions
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index 53c5254be13..0aca00b0e68 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -29,6 +29,7 @@
#include "executor/nodeBitmapHeapscan.h"
#include "executor/nodeCustom.h"
#include "executor/nodeForeignscan.h"
+#include "executor/nodeHash.h"
#include "executor/nodeIndexscan.h"
#include "executor/nodeIndexonlyscan.h"
#include "executor/nodeSeqscan.h"
@@ -259,8 +260,12 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate,
e->pcxt);
break;
+ case T_HashState:
+ /* even when not parallel-aware, for EXPLAIN ANALYZE */
+ ExecHashEstimate((HashState *) planstate, e->pcxt);
+ break;
case T_SortState:
- /* even when not parallel-aware */
+ /* even when not parallel-aware, for EXPLAIN ANALYZE */
ExecSortEstimate((SortState *) planstate, e->pcxt);
break;
@@ -458,8 +463,12 @@ ExecParallelInitializeDSM(PlanState *planstate,
ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate,
d->pcxt);
break;
+ case T_HashState:
+ /* even when not parallel-aware, for EXPLAIN ANALYZE */
+ ExecHashInitializeDSM((HashState *) planstate, d->pcxt);
+ break;
case T_SortState:
- /* even when not parallel-aware */
+ /* even when not parallel-aware, for EXPLAIN ANALYZE */
ExecSortInitializeDSM((SortState *) planstate, d->pcxt);
break;
@@ -872,8 +881,12 @@ ExecParallelReInitializeDSM(PlanState *planstate,
ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate,
pcxt);
break;
+ case T_HashState:
+ /* even when not parallel-aware, for EXPLAIN ANALYZE */
+ ExecHashReInitializeDSM((HashState *) planstate, pcxt);
+ break;
case T_SortState:
- /* even when not parallel-aware */
+ /* even when not parallel-aware, for EXPLAIN ANALYZE */
ExecSortReInitializeDSM((SortState *) planstate, pcxt);
break;
@@ -928,12 +941,18 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate,
planstate->worker_instrument->num_workers = instrumentation->num_workers;
memcpy(&planstate->worker_instrument->instrument, instrument, ibytes);
- /*
- * Perform any node-type-specific work that needs to be done. Currently,
- * only Sort nodes need to do anything here.
- */
- if (IsA(planstate, SortState))
- ExecSortRetrieveInstrumentation((SortState *) planstate);
+ /* Perform any node-type-specific work that needs to be done. */
+ switch (nodeTag(planstate))
+ {
+ case T_SortState:
+ ExecSortRetrieveInstrumentation((SortState *) planstate);
+ break;
+ case T_HashState:
+ ExecHashRetrieveInstrumentation((HashState *) planstate);
+ break;
+ default:
+ break;
+ }
return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation,
instrumentation);
@@ -1160,8 +1179,12 @@ ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate,
pwcxt);
break;
+ case T_HashState:
+ /* even when not parallel-aware, for EXPLAIN ANALYZE */
+ ExecHashInitializeWorker((HashState *) planstate, pwcxt);
+ break;
case T_SortState:
- /* even when not parallel-aware */
+ /* even when not parallel-aware, for EXPLAIN ANALYZE */
ExecSortInitializeWorker((SortState *) planstate, pwcxt);
break;
diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index c1aa5064c90..9befca90161 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -751,6 +751,9 @@ ExecShutdownNode(PlanState *node)
case T_GatherMergeState:
ExecShutdownGatherMerge((GatherMergeState *) node);
break;
+ case T_HashState:
+ ExecShutdownHash((HashState *) node);
+ break;
default:
break;
}
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index f7cd8fb3472..6fe5d69d558 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -1638,6 +1638,110 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
}
/*
+ * Reserve space in the DSM segment for instrumentation data.
+ */
+void
+ExecHashEstimate(HashState *node, ParallelContext *pcxt)
+{
+ size_t size;
+
+ size = mul_size(pcxt->nworkers, sizeof(HashInstrumentation));
+ size = add_size(size, offsetof(SharedHashInfo, hinstrument));
+ shm_toc_estimate_chunk(&pcxt->estimator, size);
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
+
+/*
+ * Set up a space in the DSM for all workers to record instrumentation data
+ * about their hash table.
+ */
+void
+ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt)
+{
+ size_t size;
+
+ size = offsetof(SharedHashInfo, hinstrument) +
+ pcxt->nworkers * sizeof(HashInstrumentation);
+ node->shared_info = (SharedHashInfo *) shm_toc_allocate(pcxt->toc, size);
+ memset(node->shared_info, 0, size);
+ node->shared_info->num_workers = pcxt->nworkers;
+ shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id,
+ node->shared_info);
+}
+
+/*
+ * Reset shared state before beginning a fresh scan.
+ */
+void
+ExecHashReInitializeDSM(HashState *node, ParallelContext *pcxt)
+{
+ if (node->shared_info != NULL)
+ {
+ memset(node->shared_info->hinstrument, 0,
+ node->shared_info->num_workers * sizeof(HashInstrumentation));
+ }
+}
+
+/*
+ * Locate the DSM space for hash table instrumentation data that we'll write
+ * to at shutdown time.
+ */
+void
+ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwcxt)
+{
+ SharedHashInfo *shared_info;
+
+ shared_info = (SharedHashInfo *)
+ shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, true);
+ node->hinstrument = &shared_info->hinstrument[ParallelWorkerNumber];
+}
+
+/*
+ * Copy instrumentation data from this worker's hash table (if it built one)
+ * to DSM memory so the leader can retrieve it. This must be done in an
+ * ExecShutdownHash() rather than ExecEndHash() because the latter runs after
+ * we've detached from the DSM segment.
+ */
+void
+ExecShutdownHash(HashState *node)
+{
+ if (node->hinstrument && node->hashtable)
+ ExecHashGetInstrumentation(node->hinstrument, node->hashtable);
+}
+
+/*
+ * Retrieve instrumentation data from workers before the DSM segment is
+ * detached, so that EXPLAIN can access it.
+ */
+void
+ExecHashRetrieveInstrumentation(HashState *node)
+{
+ SharedHashInfo *shared_info = node->shared_info;
+ size_t size;
+
+ /* Replace node->shared_info with a copy in backend-local memory. */
+ size = offsetof(SharedHashInfo, hinstrument) +
+ shared_info->num_workers * sizeof(HashInstrumentation);
+ node->shared_info = palloc(size);
+ memcpy(node->shared_info, shared_info, size);
+}
+
+/*
+ * Copy the instrumentation data from 'hashtable' into a HashInstrumentation
+ * struct.
+ */
+void
+ExecHashGetInstrumentation(HashInstrumentation *instrument,
+ HashJoinTable hashtable)
+{
+ instrument->nbuckets = hashtable->nbuckets;
+ instrument->nbuckets_original = hashtable->nbuckets_original;
+ instrument->nbatch = hashtable->nbatch;
+ instrument->nbatch_original = hashtable->nbatch_original;
+ instrument->space_peak = hashtable->spacePeak;
+}
+
+/*
* Allocate 'size' bytes from the currently active HashMemoryChunk
*/
static void *