diff options
author | Robert Haas <rhaas@postgresql.org> | 2017-03-08 12:05:43 -0500 |
---|---|---|
committer | Robert Haas <rhaas@postgresql.org> | 2017-03-08 12:05:43 -0500 |
commit | f35742ccb7aa53ee3ed8416bbb378b0c3eeb6bb9 (patch) | |
tree | 66c88951dd3b1546159869072e8ff3e3b506568b /src/backend/executor/nodeBitmapHeapscan.c | |
parent | 4eafdcc27608dfb8a3afa3155d6ae07f66179782 (diff) | |
download | postgresql-f35742ccb7aa53ee3ed8416bbb378b0c3eeb6bb9.tar.gz postgresql-f35742ccb7aa53ee3ed8416bbb378b0c3eeb6bb9.zip |
Support parallel bitmap heap scans.
The index is scanned by a single process, but then all cooperating
processes can iterate jointly over the resulting set of heap blocks.
In the future, we might also want to support using a parallel bitmap
index scan to set up for a parallel bitmap heap scan, but that's a
job for another day.
Dilip Kumar, with some corrections and cosmetic changes by me. The
larger patch set of which this is a part has been reviewed and tested
by (at least) Andres Freund, Amit Khandekar, Tushar Ahuja, Rafia
Sabih, Haribabu Kommi, Thomas Munro, and me.
Discussion: http://postgr.es/m/CAFiTN-uc4=0WxRGfCzs-xfkMYcSEWUC-Fon6thkJGjkh9i=13A@mail.gmail.com
Diffstat (limited to 'src/backend/executor/nodeBitmapHeapscan.c')
-rw-r--r-- | src/backend/executor/nodeBitmapHeapscan.c | 422 |
1 files changed, 378 insertions, 44 deletions
diff --git a/src/backend/executor/nodeBitmapHeapscan.c b/src/backend/executor/nodeBitmapHeapscan.c index c1aa9f13bdf..833a93e1b7d 100644 --- a/src/backend/executor/nodeBitmapHeapscan.c +++ b/src/backend/executor/nodeBitmapHeapscan.c @@ -53,11 +53,15 @@ static TupleTableSlot *BitmapHeapNext(BitmapHeapScanState *node); static void bitgetpage(HeapScanDesc scan, TBMIterateResult *tbmres); +static inline void BitmapDoneInitializingSharedState( + ParallelBitmapHeapState *pstate); static inline void BitmapAdjustPrefetchIterator(BitmapHeapScanState *node, TBMIterateResult *tbmres); static inline void BitmapAdjustPrefetchTarget(BitmapHeapScanState *node); static inline void BitmapPrefetch(BitmapHeapScanState *node, HeapScanDesc scan); +static bool BitmapShouldInitializeSharedState( + ParallelBitmapHeapState *pstate); /* ---------------------------------------------------------------- @@ -73,9 +77,12 @@ BitmapHeapNext(BitmapHeapScanState *node) HeapScanDesc scan; TIDBitmap *tbm; TBMIterator *tbmiterator; + TBMSharedIterator *shared_tbmiterator; TBMIterateResult *tbmres; OffsetNumber targoffset; TupleTableSlot *slot; + ParallelBitmapHeapState *pstate = node->pstate; + dsa_area *dsa = node->ss.ps.state->es_query_dsa; /* * extract necessary information from index scan node @@ -84,7 +91,10 @@ BitmapHeapNext(BitmapHeapScanState *node) slot = node->ss.ss_ScanTupleSlot; scan = node->ss.ss_currentScanDesc; tbm = node->tbm; - tbmiterator = node->tbmiterator; + if (pstate == NULL) + tbmiterator = node->tbmiterator; + else + shared_tbmiterator = node->shared_tbmiterator; tbmres = node->tbmres; /* @@ -99,25 +109,82 @@ BitmapHeapNext(BitmapHeapScanState *node) * node->prefetch_maximum. This is to avoid doing a lot of prefetching in * a scan that stops after a few tuples because of a LIMIT. */ - if (tbm == NULL) + if (!node->initialized) { - tbm = (TIDBitmap *) MultiExecProcNode(outerPlanState(node)); + if (!pstate) + { + tbm = (TIDBitmap *) MultiExecProcNode(outerPlanState(node)); - if (!tbm || !IsA(tbm, TIDBitmap)) - elog(ERROR, "unrecognized result from subplan"); + if (!tbm || !IsA(tbm, TIDBitmap)) + elog(ERROR, "unrecognized result from subplan"); - node->tbm = tbm; - node->tbmiterator = tbmiterator = tbm_begin_iterate(tbm); - node->tbmres = tbmres = NULL; + node->tbm = tbm; + node->tbmiterator = tbmiterator = tbm_begin_iterate(tbm); + node->tbmres = tbmres = NULL; #ifdef USE_PREFETCH - if (node->prefetch_maximum > 0) - { - node->prefetch_iterator = tbm_begin_iterate(tbm); - node->prefetch_pages = 0; - node->prefetch_target = -1; + if (node->prefetch_maximum > 0) + { + node->prefetch_iterator = tbm_begin_iterate(tbm); + node->prefetch_pages = 0; + node->prefetch_target = -1; + } +#endif /* USE_PREFETCH */ } + else + { + /* + * The leader will immediately come out of the function, but + * others will be blocked until leader populates the TBM and wakes + * them up. + */ + if (BitmapShouldInitializeSharedState(pstate)) + { + tbm = (TIDBitmap *) MultiExecProcNode(outerPlanState(node)); + if (!tbm || !IsA(tbm, TIDBitmap)) + elog(ERROR, "unrecognized result from subplan"); + + node->tbm = tbm; + + /* + * Prepare to iterate over the TBM. This will return the + * dsa_pointer of the iterator state which will be used by + * multiple processes to iterate jointly. + */ + pstate->tbmiterator = tbm_prepare_shared_iterate(tbm); +#ifdef USE_PREFETCH + if (node->prefetch_maximum > 0) + { + pstate->prefetch_iterator = + tbm_prepare_shared_iterate(tbm); + + /* + * We don't need the mutex here as we haven't yet woke up + * others. + */ + pstate->prefetch_pages = 0; + pstate->prefetch_target = -1; + } +#endif + + /* We have initialized the shared state so wake up others. */ + BitmapDoneInitializingSharedState(pstate); + } + + /* Allocate a private iterator and attach the shared state to it */ + node->shared_tbmiterator = shared_tbmiterator = + tbm_attach_shared_iterate(dsa, pstate->tbmiterator); + node->tbmres = tbmres = NULL; + +#ifdef USE_PREFETCH + if (node->prefetch_maximum > 0) + { + node->shared_prefetch_iterator = + tbm_attach_shared_iterate(dsa, pstate->prefetch_iterator); + } #endif /* USE_PREFETCH */ + } + node->initialized = true; } for (;;) @@ -130,7 +197,10 @@ BitmapHeapNext(BitmapHeapScanState *node) */ if (tbmres == NULL) { - node->tbmres = tbmres = tbm_iterate(tbmiterator); + if (!pstate) + node->tbmres = tbmres = tbm_iterate(tbmiterator); + else + node->tbmres = tbmres = tbm_shared_iterate(shared_tbmiterator); if (tbmres == NULL) { /* no more entries in the bitmap */ @@ -182,8 +252,19 @@ BitmapHeapNext(BitmapHeapScanState *node) * Try to prefetch at least a few pages even before we get to the * second page if we don't stop reading after the first tuple. */ - if (node->prefetch_target < node->prefetch_maximum) - node->prefetch_target++; + if (!pstate) + { + if (node->prefetch_target < node->prefetch_maximum) + node->prefetch_target++; + } + else if (pstate->prefetch_target < node->prefetch_maximum) + { + /* take spinlock while updating shared state */ + SpinLockAcquire(&pstate->mutex); + if (pstate->prefetch_target < node->prefetch_maximum) + pstate->prefetch_target++; + SpinLockRelease(&pstate->mutex); + } #endif /* USE_PREFETCH */ } @@ -362,6 +443,21 @@ bitgetpage(HeapScanDesc scan, TBMIterateResult *tbmres) } /* + * BitmapDoneInitializingSharedState - Shared state is initialized + * + * By this time the leader has already populated the TBM and initialized the + * shared state so wake up other processes. + */ +static inline void +BitmapDoneInitializingSharedState(ParallelBitmapHeapState *pstate) +{ + SpinLockAcquire(&pstate->mutex); + pstate->state = BM_FINISHED; + SpinLockRelease(&pstate->mutex); + ConditionVariableBroadcast(&pstate->cv); +} + +/* * BitmapAdjustPrefetchIterator - Adjust the prefetch iterator */ static inline void @@ -369,20 +465,53 @@ BitmapAdjustPrefetchIterator(BitmapHeapScanState *node, TBMIterateResult *tbmres) { #ifdef USE_PREFETCH - TBMIterator *prefetch_iterator = node->prefetch_iterator; + ParallelBitmapHeapState *pstate = node->pstate; - if (node->prefetch_pages > 0) + if (pstate == NULL) { - /* The main iterator has closed the distance by one page */ - node->prefetch_pages--; + TBMIterator *prefetch_iterator = node->prefetch_iterator; + + if (node->prefetch_pages > 0) + { + /* The main iterator has closed the distance by one page */ + node->prefetch_pages--; + } + else if (prefetch_iterator) + { + /* Do not let the prefetch iterator get behind the main one */ + TBMIterateResult *tbmpre = tbm_iterate(prefetch_iterator); + + if (tbmpre == NULL || tbmpre->blockno != tbmres->blockno) + elog(ERROR, "prefetch and main iterators are out of sync"); + } + return; } - else if (prefetch_iterator) + + if (node->prefetch_maximum > 0) { - /* Do not let the prefetch iterator get behind the main one */ - TBMIterateResult *tbmpre = tbm_iterate(prefetch_iterator); + TBMSharedIterator *prefetch_iterator = node->shared_prefetch_iterator; + + SpinLockAcquire(&pstate->mutex); + if (pstate->prefetch_pages > 0) + { + node->prefetch_pages--; + SpinLockRelease(&pstate->mutex); + } + else + { + /* Release the mutex before iterating */ + SpinLockRelease(&pstate->mutex); - if (tbmpre == NULL || tbmpre->blockno != tbmres->blockno) - elog(ERROR, "prefetch and main iterators are out of sync"); + /* + * In case of shared mode, we can not ensure that the current + * blockno of the main iterator and that of the prefetch iterator + * are same. It's possible that whatever blockno we are + * prefetching will be processed by another process. Therefore, we + * don't validate the blockno here as we do in non-parallel case. + */ + if (prefetch_iterator) + tbm_shared_iterate(prefetch_iterator); + } } #endif /* USE_PREFETCH */ } @@ -399,14 +528,35 @@ static inline void BitmapAdjustPrefetchTarget(BitmapHeapScanState *node) { #ifdef USE_PREFETCH - if (node->prefetch_target >= node->prefetch_maximum) - /* don't increase any further */ ; - else if (node->prefetch_target >= node->prefetch_maximum / 2) - node->prefetch_target = node->prefetch_maximum; - else if (node->prefetch_target > 0) - node->prefetch_target *= 2; - else - node->prefetch_target++; + ParallelBitmapHeapState *pstate = node->pstate; + + if (pstate == NULL) + { + if (node->prefetch_target >= node->prefetch_maximum) + /* don't increase any further */ ; + else if (node->prefetch_target >= node->prefetch_maximum / 2) + node->prefetch_target = node->prefetch_maximum; + else if (node->prefetch_target > 0) + node->prefetch_target *= 2; + else + node->prefetch_target++; + return; + } + + /* Do an unlocked check first to save spinlock acquisitions. */ + if (pstate->prefetch_target < node->prefetch_maximum) + { + SpinLockAcquire(&pstate->mutex); + if (pstate->prefetch_target >= node->prefetch_maximum) + /* don't increase any further */ ; + else if (pstate->prefetch_target >= node->prefetch_maximum / 2) + pstate->prefetch_target = node->prefetch_maximum; + else if (pstate->prefetch_target > 0) + pstate->prefetch_target *= 2; + else + pstate->prefetch_target++; + SpinLockRelease(&pstate->mutex); + } #endif /* USE_PREFETCH */ } @@ -417,23 +567,70 @@ static inline void BitmapPrefetch(BitmapHeapScanState *node, HeapScanDesc scan) { #ifdef USE_PREFETCH - TBMIterator *prefetch_iterator = node->prefetch_iterator; + ParallelBitmapHeapState *pstate = node->pstate; - if (prefetch_iterator) + if (pstate == NULL) { - while (node->prefetch_pages < node->prefetch_target) + TBMIterator *prefetch_iterator = node->prefetch_iterator; + + if (prefetch_iterator) { - TBMIterateResult *tbmpre = tbm_iterate(prefetch_iterator); + while (node->prefetch_pages < node->prefetch_target) + { + TBMIterateResult *tbmpre = tbm_iterate(prefetch_iterator); + + if (tbmpre == NULL) + { + /* No more pages to prefetch */ + tbm_end_iterate(prefetch_iterator); + node->prefetch_iterator = NULL; + break; + } + node->prefetch_pages++; + PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, tbmpre->blockno); + } + } + + return; + } - if (tbmpre == NULL) + if (pstate->prefetch_pages < pstate->prefetch_target) + { + TBMSharedIterator *prefetch_iterator = node->shared_prefetch_iterator; + + if (prefetch_iterator) + { + while (1) { - /* No more pages to prefetch */ - tbm_end_iterate(prefetch_iterator); - node->prefetch_iterator = NULL; - break; + TBMIterateResult *tbmpre; + bool do_prefetch = false; + + /* + * Recheck under the mutex. If some other process has already + * done enough prefetching then we need not to do anything. + */ + SpinLockAcquire(&pstate->mutex); + if (pstate->prefetch_pages < pstate->prefetch_target) + { + pstate->prefetch_pages++; + do_prefetch = true; + } + SpinLockRelease(&pstate->mutex); + + if (!do_prefetch) + return; + + tbmpre = tbm_shared_iterate(prefetch_iterator); + if (tbmpre == NULL) + { + /* No more pages to prefetch */ + tbm_end_shared_iterate(prefetch_iterator); + node->shared_prefetch_iterator = NULL; + break; + } + + PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, tbmpre->blockno); } - node->prefetch_pages++; - PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, tbmpre->blockno); } } #endif /* USE_PREFETCH */ @@ -488,12 +685,36 @@ ExecReScanBitmapHeapScan(BitmapHeapScanState *node) tbm_end_iterate(node->tbmiterator); if (node->prefetch_iterator) tbm_end_iterate(node->prefetch_iterator); + if (node->shared_tbmiterator) + tbm_end_shared_iterate(node->shared_tbmiterator); + if (node->shared_prefetch_iterator) + tbm_end_shared_iterate(node->shared_prefetch_iterator); if (node->tbm) tbm_free(node->tbm); node->tbm = NULL; node->tbmiterator = NULL; node->tbmres = NULL; node->prefetch_iterator = NULL; + node->initialized = false; + node->shared_tbmiterator = NULL; + node->shared_prefetch_iterator = NULL; + + /* Reset parallel bitmap state, if present */ + if (node->pstate) + { + dsa_area *dsa = node->ss.ps.state->es_query_dsa; + + node->pstate->state = BM_INITIAL; + + if (DsaPointerIsValid(node->pstate->tbmiterator)) + tbm_free_shared_area(dsa, node->pstate->tbmiterator); + + if (DsaPointerIsValid(node->pstate->prefetch_iterator)) + tbm_free_shared_area(dsa, node->pstate->prefetch_iterator); + + node->pstate->tbmiterator = InvalidDsaPointer; + node->pstate->prefetch_iterator = InvalidDsaPointer; + } ExecScanReScan(&node->ss); @@ -546,6 +767,10 @@ ExecEndBitmapHeapScan(BitmapHeapScanState *node) tbm_end_iterate(node->prefetch_iterator); if (node->tbm) tbm_free(node->tbm); + if (node->shared_tbmiterator) + tbm_end_shared_iterate(node->shared_tbmiterator); + if (node->shared_prefetch_iterator) + tbm_end_shared_iterate(node->shared_prefetch_iterator); /* * close heap scan @@ -597,6 +822,10 @@ ExecInitBitmapHeapScan(BitmapHeapScan *node, EState *estate, int eflags) scanstate->prefetch_target = 0; /* may be updated below */ scanstate->prefetch_maximum = target_prefetch_pages; + scanstate->pscan_len = 0; + scanstate->initialized = false; + scanstate->shared_tbmiterator = NULL; + scanstate->pstate = NULL; /* * Miscellaneous initialization @@ -681,3 +910,108 @@ ExecInitBitmapHeapScan(BitmapHeapScan *node, EState *estate, int eflags) */ return scanstate; } + +/*---------------- + * BitmapShouldInitializeSharedState + * + * The first process to come here and see the state to the BM_INITIAL + * will become the leader for the parallel bitmap scan and will be + * responsible for populating the TIDBitmap. The other processes will + * be blocked by the condition variable until the leader wakes them up. + * --------------- + */ +static bool +BitmapShouldInitializeSharedState(ParallelBitmapHeapState *pstate) +{ + SharedBitmapState state; + + while (1) + { + SpinLockAcquire(&pstate->mutex); + state = pstate->state; + if (pstate->state == BM_INITIAL) + pstate->state = BM_INPROGRESS; + SpinLockRelease(&pstate->mutex); + + /* Exit if bitmap is done, or if we're the leader. */ + if (state != BM_INPROGRESS) + break; + + /* Wait for the leader to wake us up. */ + ConditionVariableSleep(&pstate->cv, WAIT_EVENT_PARALLEL_BITMAP_SCAN); + } + + ConditionVariableCancelSleep(); + + return (state == BM_INITIAL); +} + +/* ---------------------------------------------------------------- + * ExecBitmapHeapEstimate + * + * estimates the space required to serialize bitmap scan node. + * ---------------------------------------------------------------- + */ +void +ExecBitmapHeapEstimate(BitmapHeapScanState *node, + ParallelContext *pcxt) +{ + EState *estate = node->ss.ps.state; + + node->pscan_len = add_size(offsetof(ParallelBitmapHeapState, + phs_snapshot_data), + EstimateSnapshotSpace(estate->es_snapshot)); + + shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); +} + +/* ---------------------------------------------------------------- + * ExecBitmapHeapInitializeDSM + * + * Set up a parallel bitmap heap scan descriptor. + * ---------------------------------------------------------------- + */ +void +ExecBitmapHeapInitializeDSM(BitmapHeapScanState *node, + ParallelContext *pcxt) +{ + ParallelBitmapHeapState *pstate; + EState *estate = node->ss.ps.state; + + pstate = shm_toc_allocate(pcxt->toc, node->pscan_len); + + pstate->tbmiterator = 0; + pstate->prefetch_iterator = 0; + + /* Initialize the mutex */ + SpinLockInit(&pstate->mutex); + pstate->prefetch_pages = 0; + pstate->prefetch_target = 0; + pstate->state = BM_INITIAL; + + ConditionVariableInit(&pstate->cv); + SerializeSnapshot(estate->es_snapshot, pstate->phs_snapshot_data); + + shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pstate); + node->pstate = pstate; +} + +/* ---------------------------------------------------------------- + * ExecBitmapHeapInitializeWorker + * + * Copy relevant information from TOC into planstate. + * ---------------------------------------------------------------- + */ +void +ExecBitmapHeapInitializeWorker(BitmapHeapScanState *node, shm_toc *toc) +{ + ParallelBitmapHeapState *pstate; + Snapshot snapshot; + + pstate = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id); + node->pstate = pstate; + + snapshot = RestoreSnapshot(pstate->phs_snapshot_data); + heap_update_snapshot(node->ss.ss_currentScanDesc, snapshot); +} |