diff options
Diffstat (limited to 'src/backend/executor/nodeHash.c')
-rw-r--r-- | src/backend/executor/nodeHash.c | 104 |
1 files changed, 104 insertions, 0 deletions
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c index f7cd8fb3472..6fe5d69d558 100644 --- a/src/backend/executor/nodeHash.c +++ b/src/backend/executor/nodeHash.c @@ -1638,6 +1638,110 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable) } /* + * Reserve space in the DSM segment for instrumentation data. + */ +void +ExecHashEstimate(HashState *node, ParallelContext *pcxt) +{ + size_t size; + + size = mul_size(pcxt->nworkers, sizeof(HashInstrumentation)); + size = add_size(size, offsetof(SharedHashInfo, hinstrument)); + shm_toc_estimate_chunk(&pcxt->estimator, size); + shm_toc_estimate_keys(&pcxt->estimator, 1); +} + +/* + * Set up a space in the DSM for all workers to record instrumentation data + * about their hash table. + */ +void +ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt) +{ + size_t size; + + size = offsetof(SharedHashInfo, hinstrument) + + pcxt->nworkers * sizeof(HashInstrumentation); + node->shared_info = (SharedHashInfo *) shm_toc_allocate(pcxt->toc, size); + memset(node->shared_info, 0, size); + node->shared_info->num_workers = pcxt->nworkers; + shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, + node->shared_info); +} + +/* + * Reset shared state before beginning a fresh scan. + */ +void +ExecHashReInitializeDSM(HashState *node, ParallelContext *pcxt) +{ + if (node->shared_info != NULL) + { + memset(node->shared_info->hinstrument, 0, + node->shared_info->num_workers * sizeof(HashInstrumentation)); + } +} + +/* + * Locate the DSM space for hash table instrumentation data that we'll write + * to at shutdown time. + */ +void +ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwcxt) +{ + SharedHashInfo *shared_info; + + shared_info = (SharedHashInfo *) + shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, true); + node->hinstrument = &shared_info->hinstrument[ParallelWorkerNumber]; +} + +/* + * Copy instrumentation data from this worker's hash table (if it built one) + * to DSM memory so the leader can retrieve it. This must be done in an + * ExecShutdownHash() rather than ExecEndHash() because the latter runs after + * we've detached from the DSM segment. + */ +void +ExecShutdownHash(HashState *node) +{ + if (node->hinstrument && node->hashtable) + ExecHashGetInstrumentation(node->hinstrument, node->hashtable); +} + +/* + * Retrieve instrumentation data from workers before the DSM segment is + * detached, so that EXPLAIN can access it. + */ +void +ExecHashRetrieveInstrumentation(HashState *node) +{ + SharedHashInfo *shared_info = node->shared_info; + size_t size; + + /* Replace node->shared_info with a copy in backend-local memory. */ + size = offsetof(SharedHashInfo, hinstrument) + + shared_info->num_workers * sizeof(HashInstrumentation); + node->shared_info = palloc(size); + memcpy(node->shared_info, shared_info, size); +} + +/* + * Copy the instrumentation data from 'hashtable' into a HashInstrumentation + * struct. + */ +void +ExecHashGetInstrumentation(HashInstrumentation *instrument, + HashJoinTable hashtable) +{ + instrument->nbuckets = hashtable->nbuckets; + instrument->nbuckets_original = hashtable->nbuckets_original; + instrument->nbatch = hashtable->nbatch; + instrument->nbatch_original = hashtable->nbatch_original; + instrument->space_peak = hashtable->spacePeak; +} + +/* * Allocate 'size' bytes from the currently active HashMemoryChunk */ static void * |