 src/backend/access/brin/brin.c | 130 +++++++++++++++++++++++++++++----------
 1 file changed, 97 insertions(+), 33 deletions(-)
diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c
index 041415a40e7..32722f0961b 100644
--- a/src/backend/access/brin/brin.c
+++ b/src/backend/access/brin/brin.c
@@ -229,6 +229,8 @@ static void _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Rela
 										  bool isconcurrent, int request);
 static void _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state);
 static Size _brin_parallel_estimate_shared(Relation heap, Snapshot snapshot);
+static double _brin_parallel_heapscan(BrinBuildState *buildstate);
+static double _brin_parallel_merge(BrinBuildState *buildstate);
 static void _brin_leader_participate_as_worker(BrinBuildState *buildstate,
 											   Relation heap, Relation index);
 static void _brin_parallel_scan_and_build(BrinBuildState *buildstate,
@@ -1201,6 +1203,9 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 			tuplesort_begin_index_brin(maintenance_work_mem, coordinate,
 									   TUPLESORT_NONE);
 
+		/* scan the relation and merge per-worker results */
+		reltuples = _brin_parallel_merge(state);
+
 		_brin_end_parallel(state->bs_leader, state);
 	}
 	else						/* no parallel index build */
@@ -1233,14 +1238,10 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 		brin_fill_empty_ranges(state,
 							   state->bs_currRangeStart,
 							   state->bs_maxRangeStart);
-
-		/* track the number of relation tuples */
-		state->bs_reltuples = reltuples;
 	}
 
 	/* release resources */
 	idxtuples = state->bs_numtuples;
-	reltuples = state->bs_reltuples;
 	brinRevmapTerminate(state->bs_rmAccess);
 	terminate_brin_buildstate(state);
 
@@ -2329,6 +2330,22 @@ check_null_keys(BrinValues *bval, ScanKey *nullkeys, int nnullkeys)
 	return true;
 }
 
+/*
+ * Create parallel context, and launch workers for leader.
+ *
+ * buildstate argument should be initialized (with the exception of the
+ * tuplesort states, which may later be created based on shared
+ * state initially set up here).
+ *
+ * isconcurrent indicates if operation is CREATE INDEX CONCURRENTLY.
+ *
+ * request is the target number of parallel worker processes to launch.
+ *
+ * Sets buildstate's BrinLeader, which caller must use to shut down parallel
+ * mode by passing it to _brin_end_parallel() at the very end of its index
+ * build. If not even a single worker process can be launched, this is
+ * never set, and caller should proceed with a serial index build.
+ */
 static void
 _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
 					 bool isconcurrent, int request)
@@ -2517,27 +2534,87 @@
 static void
 _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
 {
 	int			i;
-	BrinTuple  *btup;
-	BrinMemTuple *memtuple = NULL;
-	Size		tuplen;
-	BrinShared *brinshared = brinleader->brinshared;
-	BlockNumber prevblkno = InvalidBlockNumber;
-	MemoryContext rangeCxt,
-				oldCxt;
 
 	/* Shutdown worker processes */
 	WaitForParallelWorkersToFinish(brinleader->pcxt);
 
 	/*
-	 * If we didn't actually launch workers, we still have to make sure to
-	 * exit parallel mode.
+	 * Next, accumulate WAL usage. (This must wait for the workers to finish,
+	 * or we might get incomplete data.)
 	 */
-	if (!state)
-		goto cleanup;
+	for (i = 0; i < brinleader->pcxt->nworkers_launched; i++)
+		InstrAccumParallelQuery(&brinleader->bufferusage[i], &brinleader->walusage[i]);
+
+	/* Free last reference to MVCC snapshot, if one was used */
+	if (IsMVCCSnapshot(brinleader->snapshot))
+		UnregisterSnapshot(brinleader->snapshot);
+	DestroyParallelContext(brinleader->pcxt);
+	ExitParallelMode();
+}
+
+/*
+ * Within leader, wait for end of heap scan.
+ *
+ * When called, parallel heap scan started by _brin_begin_parallel() will
+ * already be underway within worker processes (when leader participates
+ * as a worker, we should end up here just as workers are finishing).
+ *
+ * Returns the total number of heap tuples scanned.
+ */
+static double
+_brin_parallel_heapscan(BrinBuildState *state)
+{
+	BrinShared *brinshared = state->bs_leader->brinshared;
+	int			nparticipanttuplesorts;
+
+	nparticipanttuplesorts = state->bs_leader->nparticipanttuplesorts;
+	for (;;)
+	{
+		SpinLockAcquire(&brinshared->mutex);
+		if (brinshared->nparticipantsdone == nparticipanttuplesorts)
+		{
+			/* copy the data into leader state */
+			state->bs_reltuples = brinshared->reltuples;
+			state->bs_numtuples = brinshared->indtuples;
 
-	/* copy the data into leader state (we have to wait for the workers ) */
-	state->bs_reltuples = brinshared->reltuples;
-	state->bs_numtuples = brinshared->indtuples;
+			SpinLockRelease(&brinshared->mutex);
+			break;
+		}
+		SpinLockRelease(&brinshared->mutex);
+
+		ConditionVariableSleep(&brinshared->workersdonecv,
+							   WAIT_EVENT_PARALLEL_CREATE_INDEX_SCAN);
+	}
+
+	ConditionVariableCancelSleep();
+
+	return state->bs_reltuples;
+}
+
+/*
+ * Within leader, wait for end of heap scan and merge per-worker results.
+ *
+ * After waiting for all workers to finish, merge the per-worker results into
+ * the complete index. The results from each worker are sorted by block number
+ * (start of the page range). While combining the per-worker results we merge
+ * summaries for the same page range, and also fill in empty summaries for
+ * ranges without any tuples.
+ *
+ * Returns the total number of heap tuples scanned.
+ */
+static double
+_brin_parallel_merge(BrinBuildState *state)
+{
+	BrinTuple  *btup;
+	BrinMemTuple *memtuple = NULL;
+	Size		tuplen;
+	BlockNumber prevblkno = InvalidBlockNumber;
+	MemoryContext rangeCxt,
+				oldCxt;
+	double		reltuples;
+
+	/* wait for workers to scan table and produce partial results */
+	reltuples = _brin_parallel_heapscan(state);
 
 	/* do the actual sort in the leader */
 	tuplesort_performsort(state->bs_sortstate);
@@ -2569,7 +2646,7 @@ _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
 	while ((btup = tuplesort_getbrintuple(state->bs_sortstate, &tuplen, true)) != NULL)
 	{
 		/* Ranges should be multiples of pages_per_range for the index. */
-		Assert(btup->bt_blkno % brinshared->pagesPerRange == 0);
+		Assert(btup->bt_blkno % state->bs_leader->brinshared->pagesPerRange == 0);
 
 		/*
 		 * Do we need to union summaries for the same page range?
@@ -2665,20 +2742,7 @@ _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
 	MemoryContextSwitchTo(oldCxt);
 	MemoryContextDelete(rangeCxt);
 
-	/*
-	 * Next, accumulate WAL usage. (This must wait for the workers to finish,
-	 * or we might get incomplete data.)
-	 */
-	for (i = 0; i < brinleader->pcxt->nworkers_launched; i++)
-		InstrAccumParallelQuery(&brinleader->bufferusage[i], &brinleader->walusage[i]);
-
-cleanup:
-
-	/* Free last reference to MVCC snapshot, if one was used */
-	if (IsMVCCSnapshot(brinleader->snapshot))
-		UnregisterSnapshot(brinleader->snapshot);
-	DestroyParallelContext(brinleader->pcxt);
-	ExitParallelMode();
+	return reltuples;
 }
 
 /*
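The new _brin_parallel_heapscan() is a standard "wait until all participants report done" loop: each worker bumps a counter in shared memory under a lock and signals a condition variable; the leader sleeps on that condition variable and re-checks the counter each time it wakes. Below is a minimal standalone sketch of the same pattern using POSIX threads. It is an illustration only: the names (SharedScanState, worker_main, leader_wait_for_workers, NPARTICIPANTS) are invented, and the real code uses PostgreSQL's spinlocks and condition variables in dynamic shared memory, not pthreads.

/*
 * Sketch of the leader-wait pattern: workers increment a shared "done"
 * counter under a lock and signal a condition variable; the leader sleeps
 * until every participant has finished its scan. Compile with -pthread.
 */
#include <pthread.h>
#include <stdio.h>

#define NPARTICIPANTS 4

typedef struct SharedScanState
{
	pthread_mutex_t mutex;
	pthread_cond_t	workersdonecv;
	int				nparticipantsdone;
	double			reltuples;		/* accumulated tuple count */
} SharedScanState;

static SharedScanState shared = {
	PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, 0, 0.0
};

/* Each worker "scans" some tuples, then reports completion. */
static void *
worker_main(void *arg)
{
	double		scanned = 1000.0;	/* pretend we scanned 1000 tuples */

	(void) arg;
	pthread_mutex_lock(&shared.mutex);
	shared.reltuples += scanned;
	shared.nparticipantsdone++;
	pthread_cond_signal(&shared.workersdonecv);
	pthread_mutex_unlock(&shared.mutex);
	return NULL;
}

/* Leader-side wait, mirroring the loop in _brin_parallel_heapscan(). */
static double
leader_wait_for_workers(void)
{
	double		reltuples;

	pthread_mutex_lock(&shared.mutex);
	while (shared.nparticipantsdone < NPARTICIPANTS)
		pthread_cond_wait(&shared.workersdonecv, &shared.mutex);
	reltuples = shared.reltuples;	/* copy result while still locked */
	pthread_mutex_unlock(&shared.mutex);

	return reltuples;
}

int
main(void)
{
	pthread_t	workers[NPARTICIPANTS];

	for (int i = 0; i < NPARTICIPANTS; i++)
		pthread_create(&workers[i], NULL, worker_main, NULL);

	printf("total heap tuples scanned: %.0f\n", leader_wait_for_workers());

	for (int i = 0; i < NPARTICIPANTS; i++)
		pthread_join(workers[i], NULL);
	return 0;
}

One difference worth noting: pthread_cond_wait() atomically releases the mutex while sleeping, whereas a PostgreSQL spinlock must never be held across a sleep. That is why the loop in the patch explicitly releases the spinlock before ConditionVariableSleep() and re-acquires it to re-check the counter on each wakeup.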
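The merge phase in _brin_parallel_merge() consumes a tuplesort stream ordered by range start block, unions consecutive summaries that describe the same page range, and emits empty summaries for ranges no worker produced (the job brin_fill_empty_ranges() does in the serial path). The toy program below shows that same discipline over a plain sorted array; the ToySummary type, its min/max "summary", PAGES_PER_RANGE, and MAX_BLOCK are all invented stand-ins, not the real BRIN data structures.

/*
 * Sketch of the merge discipline: walk a stream sorted by range start,
 * union entries with equal block numbers, and fill gaps between ranges
 * with "empty" summaries.
 */
#include <stdbool.h>
#include <stdio.h>

#define PAGES_PER_RANGE 128
#define MAX_BLOCK		1024	/* end of the (toy) relation */

typedef struct ToySummary
{
	unsigned	blkno;			/* start of page range */
	int			min;			/* toy summary: min/max of values seen */
	int			max;
} ToySummary;

/* union two summaries for the same page range */
static void
summary_union(ToySummary *into, const ToySummary *from)
{
	if (from->min < into->min)
		into->min = from->min;
	if (from->max > into->max)
		into->max = from->max;
}

int
main(void)
{
	/* per-worker results, already sorted by range start; 128 appears twice */
	ToySummary	input[] = {
		{0, 5, 9}, {128, 1, 3}, {128, 2, 7}, {512, 4, 8}
	};
	int			ninput = sizeof(input) / sizeof(input[0]);
	ToySummary	cur = {0, 0, 0};
	bool		have_cur = false;
	unsigned	nextblkno = 0;	/* next range start we expect to emit */

	for (int i = 0; i < ninput; i++)
	{
		if (have_cur && input[i].blkno == cur.blkno)
		{
			/* duplicate range from another worker: union the summaries */
			summary_union(&cur, &input[i]);
			continue;
		}

		if (have_cur)
		{
			printf("range %u: [%d, %d]\n", cur.blkno, cur.min, cur.max);
			nextblkno = cur.blkno + PAGES_PER_RANGE;
		}

		/* fill in empty summaries for ranges no worker produced */
		for (; nextblkno < input[i].blkno; nextblkno += PAGES_PER_RANGE)
			printf("range %u: empty\n", nextblkno);

		cur = input[i];
		have_cur = true;
	}

	if (have_cur)
	{
		printf("range %u: [%d, %d]\n", cur.blkno, cur.min, cur.max);
		nextblkno = cur.blkno + PAGES_PER_RANGE;
	}

	/* fill trailing empty ranges up to the end of the relation */
	for (; nextblkno < MAX_BLOCK; nextblkno += PAGES_PER_RANGE)
		printf("range %u: empty\n", nextblkno);

	return 0;
}

In the patch itself, the equal-block case corresponds to the "Do we need to union summaries for the same page range?" branch, which tracks the previous block number in prevblkno, while the gap filling matches the Assert() that every emitted range starts at a multiple of pagesPerRange.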