diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/commands/explain.c | 46 | ||||
-rw-r--r-- | src/backend/executor/nodeHash.c | 58 | ||||
-rw-r--r-- | src/backend/executor/nodeHashjoin.c | 10 | ||||
-rw-r--r-- | src/include/executor/nodeHash.h | 4 | ||||
-rw-r--r-- | src/include/nodes/execnodes.h | 18 |
5 files changed, 87 insertions, 49 deletions
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index 455f54ef83f..f3c8da1e01b 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -2964,22 +2964,25 @@ show_hash_info(HashState *hashstate, ExplainState *es) HashInstrumentation hinstrument = {0}; /* + * Collect stats from the local process, even when it's a parallel query. * In a parallel query, the leader process may or may not have run the * hash join, and even if it did it may not have built a hash table due to * timing (if it started late it might have seen no tuples in the outer * relation and skipped building the hash table). Therefore we have to be * prepared to get instrumentation data from all participants. */ - if (hashstate->hashtable) - ExecHashGetInstrumentation(&hinstrument, hashstate->hashtable); + if (hashstate->hinstrument) + memcpy(&hinstrument, hashstate->hinstrument, + sizeof(HashInstrumentation)); /* * Merge results from workers. In the parallel-oblivious case, the * results from all participants should be identical, except where * participants didn't run the join at all so have no data. In the * parallel-aware case, we need to consider all the results. Each worker - * may have seen a different subset of batches and we want to find the - * highest memory usage for any one batch across all batches. + * may have seen a different subset of batches and we want to report the + * highest memory usage across all batches. We take the maxima of other + * values too, for the same reasons as in ExecHashAccumInstrumentation. */ if (hashstate->shared_info) { @@ -2990,31 +2993,16 @@ show_hash_info(HashState *hashstate, ExplainState *es) { HashInstrumentation *worker_hi = &shared_info->hinstrument[i]; - if (worker_hi->nbatch > 0) - { - /* - * Every participant should agree on the buckets, so to be - * sure we have a value we'll just overwrite each time. - */ - hinstrument.nbuckets = worker_hi->nbuckets; - hinstrument.nbuckets_original = worker_hi->nbuckets_original; - - /* - * Normally every participant should agree on the number of - * batches too, but it's possible for a backend that started - * late and missed the whole join not to have the final nbatch - * number. So we'll take the largest number. - */ - hinstrument.nbatch = Max(hinstrument.nbatch, worker_hi->nbatch); - hinstrument.nbatch_original = worker_hi->nbatch_original; - - /* - * In a parallel-aware hash join, for now we report the - * maximum peak memory reported by any worker. - */ - hinstrument.space_peak = - Max(hinstrument.space_peak, worker_hi->space_peak); - } + hinstrument.nbuckets = Max(hinstrument.nbuckets, + worker_hi->nbuckets); + hinstrument.nbuckets_original = Max(hinstrument.nbuckets_original, + worker_hi->nbuckets_original); + hinstrument.nbatch = Max(hinstrument.nbatch, + worker_hi->nbatch); + hinstrument.nbatch_original = Max(hinstrument.nbatch_original, + worker_hi->nbatch_original); + hinstrument.space_peak = Max(hinstrument.space_peak, + worker_hi->space_peak); } } diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c index c881dc1de81..5da13ada726 100644 --- a/src/backend/executor/nodeHash.c +++ b/src/backend/executor/nodeHash.c @@ -2597,7 +2597,10 @@ ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt) size = offsetof(SharedHashInfo, hinstrument) + pcxt->nworkers * sizeof(HashInstrumentation); node->shared_info = (SharedHashInfo *) shm_toc_allocate(pcxt->toc, size); + + /* Each per-worker area must start out as zeroes. */ memset(node->shared_info, 0, size); + node->shared_info->num_workers = pcxt->nworkers; shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, node->shared_info); @@ -2616,22 +2619,33 @@ ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwcxt) if (!node->ps.instrument) return; + /* + * Find our entry in the shared area, and set up a pointer to it so that + * we'll accumulate stats there when shutting down or rebuilding the hash + * table. + */ shared_info = (SharedHashInfo *) shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false); node->hinstrument = &shared_info->hinstrument[ParallelWorkerNumber]; } /* - * Copy instrumentation data from this worker's hash table (if it built one) - * to DSM memory so the leader can retrieve it. This must be done in an - * ExecShutdownHash() rather than ExecEndHash() because the latter runs after - * we've detached from the DSM segment. + * Collect EXPLAIN stats if needed, saving them into DSM memory if + * ExecHashInitializeWorker was called, or local storage if not. In the + * parallel case, this must be done in ExecShutdownHash() rather than + * ExecEndHash() because the latter runs after we've detached from the DSM + * segment. */ void ExecShutdownHash(HashState *node) { + /* Allocate save space if EXPLAIN'ing and we didn't do so already */ + if (node->ps.instrument && !node->hinstrument) + node->hinstrument = (HashInstrumentation *) + palloc0(sizeof(HashInstrumentation)); + /* Now accumulate data for the current (final) hash table */ if (node->hinstrument && node->hashtable) - ExecHashGetInstrumentation(node->hinstrument, node->hashtable); + ExecHashAccumInstrumentation(node->hinstrument, node->hashtable); } /* @@ -2655,18 +2669,34 @@ ExecHashRetrieveInstrumentation(HashState *node) } /* - * Copy the instrumentation data from 'hashtable' into a HashInstrumentation - * struct. + * Accumulate instrumentation data from 'hashtable' into an + * initially-zeroed HashInstrumentation struct. + * + * This is used to merge information across successive hash table instances + * within a single plan node. We take the maximum values of each interesting + * number. The largest nbuckets and largest nbatch values might have occurred + * in different instances, so there's some risk of confusion from reporting + * unrelated numbers; but there's a bigger risk of misdiagnosing a performance + * issue if we don't report the largest values. Similarly, we want to report + * the largest spacePeak regardless of whether it happened in the same + * instance as the largest nbuckets or nbatch. All the instances should have + * the same nbuckets_original and nbatch_original; but there's little value + * in depending on that here, so handle them the same way. */ void -ExecHashGetInstrumentation(HashInstrumentation *instrument, - HashJoinTable hashtable) +ExecHashAccumInstrumentation(HashInstrumentation *instrument, + HashJoinTable hashtable) { - instrument->nbuckets = hashtable->nbuckets; - instrument->nbuckets_original = hashtable->nbuckets_original; - instrument->nbatch = hashtable->nbatch; - instrument->nbatch_original = hashtable->nbatch_original; - instrument->space_peak = hashtable->spacePeak; + instrument->nbuckets = Max(instrument->nbuckets, + hashtable->nbuckets); + instrument->nbuckets_original = Max(instrument->nbuckets_original, + hashtable->nbuckets_original); + instrument->nbatch = Max(instrument->nbatch, + hashtable->nbatch); + instrument->nbatch_original = Max(instrument->nbatch_original, + hashtable->nbatch_original); + instrument->space_peak = Max(instrument->space_peak, + hashtable->spacePeak); } /* diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c index 9e28ddd8951..cc8edacdd01 100644 --- a/src/backend/executor/nodeHashjoin.c +++ b/src/backend/executor/nodeHashjoin.c @@ -1338,8 +1338,16 @@ ExecReScanHashJoin(HashJoinState *node) /* must destroy and rebuild hash table */ HashState *hashNode = castNode(HashState, innerPlanState(node)); - /* for safety, be sure to clear child plan node's pointer too */ Assert(hashNode->hashtable == node->hj_HashTable); + /* accumulate stats from old hash table, if wanted */ + /* (this should match ExecShutdownHash) */ + if (hashNode->ps.instrument && !hashNode->hinstrument) + hashNode->hinstrument = (HashInstrumentation *) + palloc0(sizeof(HashInstrumentation)); + if (hashNode->hinstrument) + ExecHashAccumInstrumentation(hashNode->hinstrument, + hashNode->hashtable); + /* for safety, be sure to clear child plan node's pointer too */ hashNode->hashtable = NULL; ExecHashTableDestroy(node->hj_HashTable); diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h index 1336fde6b4d..64d2ce693ca 100644 --- a/src/include/executor/nodeHash.h +++ b/src/include/executor/nodeHash.h @@ -73,7 +73,7 @@ extern void ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt); extern void ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwcxt); extern void ExecHashRetrieveInstrumentation(HashState *node); extern void ExecShutdownHash(HashState *node); -extern void ExecHashGetInstrumentation(HashInstrumentation *instrument, - HashJoinTable hashtable); +extern void ExecHashAccumInstrumentation(HashInstrumentation *instrument, + HashJoinTable hashtable); #endif /* NODEHASH_H */ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 4c009b1a7c5..4fee043bb2b 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -2358,7 +2358,7 @@ typedef struct HashInstrumentation int nbuckets_original; /* planned number of buckets */ int nbatch; /* number of batches at end of execution */ int nbatch_original; /* planned number of batches */ - size_t space_peak; /* peak memory usage in bytes */ + Size space_peak; /* peak memory usage in bytes */ } HashInstrumentation; /* ---------------- @@ -2381,8 +2381,20 @@ typedef struct HashState HashJoinTable hashtable; /* hash table for the hashjoin */ List *hashkeys; /* list of ExprState nodes */ - SharedHashInfo *shared_info; /* one entry per worker */ - HashInstrumentation *hinstrument; /* this worker's entry */ + /* + * In a parallelized hash join, the leader retains a pointer to the + * shared-memory stats area in its shared_info field, and then copies the + * shared-memory info back to local storage before DSM shutdown. The + * shared_info field remains NULL in workers, or in non-parallel joins. + */ + SharedHashInfo *shared_info; + + /* + * If we are collecting hash stats, this points to an initially-zeroed + * collection area, which could be either local storage or in shared + * memory; either way it's for just one process. + */ + HashInstrumentation *hinstrument; /* Parallel hash state. */ struct ParallelHashJoinState *parallel_state; |