about summary refs log tree commit diff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--  src/backend/commands/explain.c       46
-rw-r--r--  src/backend/executor/nodeHash.c      58
-rw-r--r--  src/backend/executor/nodeHashjoin.c  10
-rw-r--r--  src/include/executor/nodeHash.h       4
-rw-r--r--  src/include/nodes/execnodes.h        18
5 files changed, 87 insertions(+), 49 deletions(-)
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 455f54ef83f..f3c8da1e01b 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -2964,22 +2964,25 @@ show_hash_info(HashState *hashstate, ExplainState *es)
HashInstrumentation hinstrument = {0};
/*
+ * Collect stats from the local process, even when it's a parallel query.
* In a parallel query, the leader process may or may not have run the
* hash join, and even if it did it may not have built a hash table due to
* timing (if it started late it might have seen no tuples in the outer
* relation and skipped building the hash table). Therefore we have to be
* prepared to get instrumentation data from all participants.
*/
- if (hashstate->hashtable)
- ExecHashGetInstrumentation(&hinstrument, hashstate->hashtable);
+ if (hashstate->hinstrument)
+ memcpy(&hinstrument, hashstate->hinstrument,
+ sizeof(HashInstrumentation));
/*
* Merge results from workers. In the parallel-oblivious case, the
* results from all participants should be identical, except where
* participants didn't run the join at all so have no data. In the
* parallel-aware case, we need to consider all the results. Each worker
- * may have seen a different subset of batches and we want to find the
- * highest memory usage for any one batch across all batches.
+ * may have seen a different subset of batches and we want to report the
+ * highest memory usage across all batches. We take the maxima of other
+ * values too, for the same reasons as in ExecHashAccumInstrumentation.
*/
if (hashstate->shared_info)
{
@@ -2990,31 +2993,16 @@ show_hash_info(HashState *hashstate, ExplainState *es)
{
HashInstrumentation *worker_hi = &shared_info->hinstrument[i];
- if (worker_hi->nbatch > 0)
- {
- /*
- * Every participant should agree on the buckets, so to be
- * sure we have a value we'll just overwrite each time.
- */
- hinstrument.nbuckets = worker_hi->nbuckets;
- hinstrument.nbuckets_original = worker_hi->nbuckets_original;
-
- /*
- * Normally every participant should agree on the number of
- * batches too, but it's possible for a backend that started
- * late and missed the whole join not to have the final nbatch
- * number. So we'll take the largest number.
- */
- hinstrument.nbatch = Max(hinstrument.nbatch, worker_hi->nbatch);
- hinstrument.nbatch_original = worker_hi->nbatch_original;
-
- /*
- * In a parallel-aware hash join, for now we report the
- * maximum peak memory reported by any worker.
- */
- hinstrument.space_peak =
- Max(hinstrument.space_peak, worker_hi->space_peak);
- }
+ hinstrument.nbuckets = Max(hinstrument.nbuckets,
+ worker_hi->nbuckets);
+ hinstrument.nbuckets_original = Max(hinstrument.nbuckets_original,
+ worker_hi->nbuckets_original);
+ hinstrument.nbatch = Max(hinstrument.nbatch,
+ worker_hi->nbatch);
+ hinstrument.nbatch_original = Max(hinstrument.nbatch_original,
+ worker_hi->nbatch_original);
+ hinstrument.space_peak = Max(hinstrument.space_peak,
+ worker_hi->space_peak);
}
}
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index c881dc1de81..5da13ada726 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -2597,7 +2597,10 @@ ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt)
size = offsetof(SharedHashInfo, hinstrument) +
pcxt->nworkers * sizeof(HashInstrumentation);
node->shared_info = (SharedHashInfo *) shm_toc_allocate(pcxt->toc, size);
+
+ /* Each per-worker area must start out as zeroes. */
memset(node->shared_info, 0, size);
+
node->shared_info->num_workers = pcxt->nworkers;
shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id,
node->shared_info);
@@ -2616,22 +2619,33 @@ ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwcxt)
if (!node->ps.instrument)
return;
+ /*
+ * Find our entry in the shared area, and set up a pointer to it so that
+ * we'll accumulate stats there when shutting down or rebuilding the hash
+ * table.
+ */
shared_info = (SharedHashInfo *)
shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false);
node->hinstrument = &shared_info->hinstrument[ParallelWorkerNumber];
}
/*
- * Copy instrumentation data from this worker's hash table (if it built one)
- * to DSM memory so the leader can retrieve it. This must be done in an
- * ExecShutdownHash() rather than ExecEndHash() because the latter runs after
- * we've detached from the DSM segment.
+ * Collect EXPLAIN stats if needed, saving them into DSM memory if
+ * ExecHashInitializeWorker was called, or local storage if not. In the
+ * parallel case, this must be done in ExecShutdownHash() rather than
+ * ExecEndHash() because the latter runs after we've detached from the DSM
+ * segment.
*/
void
ExecShutdownHash(HashState *node)
{
+ /* Allocate save space if EXPLAIN'ing and we didn't do so already */
+ if (node->ps.instrument && !node->hinstrument)
+ node->hinstrument = (HashInstrumentation *)
+ palloc0(sizeof(HashInstrumentation));
+ /* Now accumulate data for the current (final) hash table */
if (node->hinstrument && node->hashtable)
- ExecHashGetInstrumentation(node->hinstrument, node->hashtable);
+ ExecHashAccumInstrumentation(node->hinstrument, node->hashtable);
}
/*
@@ -2655,18 +2669,34 @@ ExecHashRetrieveInstrumentation(HashState *node)
}
/*
- * Copy the instrumentation data from 'hashtable' into a HashInstrumentation
- * struct.
+ * Accumulate instrumentation data from 'hashtable' into an
+ * initially-zeroed HashInstrumentation struct.
+ *
+ * This is used to merge information across successive hash table instances
+ * within a single plan node. We take the maximum values of each interesting
+ * number. The largest nbuckets and largest nbatch values might have occurred
+ * in different instances, so there's some risk of confusion from reporting
+ * unrelated numbers; but there's a bigger risk of misdiagnosing a performance
+ * issue if we don't report the largest values. Similarly, we want to report
+ * the largest spacePeak regardless of whether it happened in the same
+ * instance as the largest nbuckets or nbatch. All the instances should have
+ * the same nbuckets_original and nbatch_original; but there's little value
+ * in depending on that here, so handle them the same way.
*/
void
-ExecHashGetInstrumentation(HashInstrumentation *instrument,
- HashJoinTable hashtable)
+ExecHashAccumInstrumentation(HashInstrumentation *instrument,
+ HashJoinTable hashtable)
{
- instrument->nbuckets = hashtable->nbuckets;
- instrument->nbuckets_original = hashtable->nbuckets_original;
- instrument->nbatch = hashtable->nbatch;
- instrument->nbatch_original = hashtable->nbatch_original;
- instrument->space_peak = hashtable->spacePeak;
+ instrument->nbuckets = Max(instrument->nbuckets,
+ hashtable->nbuckets);
+ instrument->nbuckets_original = Max(instrument->nbuckets_original,
+ hashtable->nbuckets_original);
+ instrument->nbatch = Max(instrument->nbatch,
+ hashtable->nbatch);
+ instrument->nbatch_original = Max(instrument->nbatch_original,
+ hashtable->nbatch_original);
+ instrument->space_peak = Max(instrument->space_peak,
+ hashtable->spacePeak);
}
/*
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 9e28ddd8951..cc8edacdd01 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -1338,8 +1338,16 @@ ExecReScanHashJoin(HashJoinState *node)
/* must destroy and rebuild hash table */
HashState *hashNode = castNode(HashState, innerPlanState(node));
- /* for safety, be sure to clear child plan node's pointer too */
Assert(hashNode->hashtable == node->hj_HashTable);
+ /* accumulate stats from old hash table, if wanted */
+ /* (this should match ExecShutdownHash) */
+ if (hashNode->ps.instrument && !hashNode->hinstrument)
+ hashNode->hinstrument = (HashInstrumentation *)
+ palloc0(sizeof(HashInstrumentation));
+ if (hashNode->hinstrument)
+ ExecHashAccumInstrumentation(hashNode->hinstrument,
+ hashNode->hashtable);
+ /* for safety, be sure to clear child plan node's pointer too */
hashNode->hashtable = NULL;
ExecHashTableDestroy(node->hj_HashTable);
diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h
index 1336fde6b4d..64d2ce693ca 100644
--- a/src/include/executor/nodeHash.h
+++ b/src/include/executor/nodeHash.h
@@ -73,7 +73,7 @@ extern void ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt);
extern void ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwcxt);
extern void ExecHashRetrieveInstrumentation(HashState *node);
extern void ExecShutdownHash(HashState *node);
-extern void ExecHashGetInstrumentation(HashInstrumentation *instrument,
- HashJoinTable hashtable);
+extern void ExecHashAccumInstrumentation(HashInstrumentation *instrument,
+ HashJoinTable hashtable);
#endif /* NODEHASH_H */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 4c009b1a7c5..4fee043bb2b 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -2358,7 +2358,7 @@ typedef struct HashInstrumentation
int nbuckets_original; /* planned number of buckets */
int nbatch; /* number of batches at end of execution */
int nbatch_original; /* planned number of batches */
- size_t space_peak; /* peak memory usage in bytes */
+ Size space_peak; /* peak memory usage in bytes */
} HashInstrumentation;
/* ----------------
@@ -2381,8 +2381,20 @@ typedef struct HashState
HashJoinTable hashtable; /* hash table for the hashjoin */
List *hashkeys; /* list of ExprState nodes */
- SharedHashInfo *shared_info; /* one entry per worker */
- HashInstrumentation *hinstrument; /* this worker's entry */
+ /*
+ * In a parallelized hash join, the leader retains a pointer to the
+ * shared-memory stats area in its shared_info field, and then copies the
+ * shared-memory info back to local storage before DSM shutdown. The
+ * shared_info field remains NULL in workers, or in non-parallel joins.
+ */
+ SharedHashInfo *shared_info;
+
+ /*
+ * If we are collecting hash stats, this points to an initially-zeroed
+ * collection area, which could be either local storage or in shared
+ * memory; either way it's for just one process.
+ */
+ HashInstrumentation *hinstrument;
/* Parallel hash state. */
struct ParallelHashJoinState *parallel_state;