Diffstat (limited to 'src/backend/access/brin/brin.c')
-rw-r--r-- | src/backend/access/brin/brin.c | 886
1 file changed, 871 insertions, 15 deletions
diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c
index 14be939ad82..23f081389b2 100644
--- a/src/backend/access/brin/brin.c
+++ b/src/backend/access/brin/brin.c
@@ -33,6 +33,7 @@
 #include "postmaster/autovacuum.h"
 #include "storage/bufmgr.h"
 #include "storage/freespace.h"
+#include "tcop/tcopprot.h"      /* pgrminclude ignore */
 #include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/datum.h"
@@ -40,7 +41,119 @@
 #include "utils/index_selfuncs.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
+#include "utils/tuplesort.h"
+
+/* Magic numbers for parallel state sharing */
+#define PARALLEL_KEY_BRIN_SHARED    UINT64CONST(0xB000000000000001)
+#define PARALLEL_KEY_TUPLESORT      UINT64CONST(0xB000000000000002)
+#define PARALLEL_KEY_QUERY_TEXT     UINT64CONST(0xB000000000000003)
+#define PARALLEL_KEY_WAL_USAGE      UINT64CONST(0xB000000000000004)
+#define PARALLEL_KEY_BUFFER_USAGE   UINT64CONST(0xB000000000000005)
+
+/*
+ * Status record for spooling/sorting phase.
+ */
+typedef struct BrinSpool
+{
+    Tuplesortstate *sortstate;  /* state data for tuplesort.c */
+    Relation    heap;
+    Relation    index;
+} BrinSpool;
+
+/*
+ * Status for index builds performed in parallel.  This is allocated in a
+ * dynamic shared memory segment.
+ */
+typedef struct BrinShared
+{
+    /*
+     * These fields are not modified during the build.  They primarily exist
+     * for the benefit of worker processes that need to create state
+     * corresponding to that used by the leader.
+     */
+    Oid         heaprelid;
+    Oid         indexrelid;
+    bool        isconcurrent;
+    BlockNumber pagesPerRange;
+    int         scantuplesortstates;
+
+    /*
+     * workersdonecv is used to monitor the progress of workers.  All
+     * parallel participants must indicate that they are done before the
+     * leader can use results built by the workers (and before the leader
+     * can write the data into the index).
+     */
+    ConditionVariable workersdonecv;
+
+    /*
+     * mutex protects all following fields.
+     *
+     * These fields contain status information of interest to BRIN index
+     * builds that must work just the same when an index is built in
+     * parallel.
+     */
+    slock_t     mutex;
+
+    /*
+     * Mutable state that is maintained by workers, and reported back to
+     * leader at end of the scans.
+     *
+     * nparticipantsdone is the number of worker processes finished.
+     *
+     * reltuples is the total number of input heap tuples.
+     *
+     * indtuples is the total number of tuples that made it into the index.
+     */
+    int         nparticipantsdone;
+    double      reltuples;
+    double      indtuples;
+
+    /*
+     * ParallelTableScanDescData data follows.  Can't directly embed here, as
+     * implementations of the parallel table scan desc interface might need
+     * stronger alignment.
+     */
+} BrinShared;
+
+/*
+ * Return pointer to a BrinShared's parallel table scan.
+ *
+ * c.f. shm_toc_allocate as to why BUFFERALIGN is used, rather than just
+ * MAXALIGN.
+ */
+#define ParallelTableScanFromBrinShared(shared) \
+    (ParallelTableScanDesc) ((char *) (shared) + BUFFERALIGN(sizeof(BrinShared)))
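The ParallelTableScanFromBrinShared() macro works because shm_toc_allocate() hands out BUFFERALIGN'ed chunks, so the fixed-size BrinShared header can be followed by the scan descriptor at the next buffer-aligned offset. A minimal standalone sketch of that layout rule, with a simplified stand-in for the real BUFFERALIGN macro (the real one, in c.h, aligns to ALIGNOF_BUFFER, 32 on common builds):

    #include <stdio.h>
    #include <stdlib.h>

    /* Simplified stand-ins for PostgreSQL's alignment macros; assumption:
     * alignment boundary of 32 bytes, matching typical ALIGNOF_BUFFER. */
    #define BUFFERALIGN_OF 32
    #define BUFFERALIGN(LEN) \
        (((LEN) + (BUFFERALIGN_OF - 1)) & ~(BUFFERALIGN_OF - 1))

    typedef struct FixedHeader { int a; double b; } FixedHeader;

    int main(void)
    {
        /* One allocation holds the fixed header plus trailing scan data;
         * the trailing part starts at the next aligned offset, which is
         * why the size estimate and the pointer math must use the same
         * macro. */
        size_t  trailing = 128;
        char   *chunk = malloc(BUFFERALIGN(sizeof(FixedHeader)) + trailing);
        void   *scan = chunk + BUFFERALIGN(sizeof(FixedHeader));

        printf("header=%zu aligned=%zu scan at offset %zu\n",
               sizeof(FixedHeader),
               (size_t) BUFFERALIGN(sizeof(FixedHeader)),
               (size_t) ((char *) scan - chunk));
        free(chunk);
        return 0;
    }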
+/*
+ * Status for leader in parallel index build.
+ */
+typedef struct BrinLeader
+{
+    /* parallel context itself */
+    ParallelContext *pcxt;
+
+    /*
+     * nparticipanttuplesorts is the exact number of worker processes
+     * successfully launched, plus one leader process if it participates as
+     * a worker (only DISABLE_LEADER_PARTICIPATION builds avoid the leader
+     * participating as a worker).
+     */
+    int         nparticipanttuplesorts;
+
+    /*
+     * Leader process convenience pointers to shared state (leader avoids
+     * TOC lookups).
+     *
+     * brinshared is the shared state for the entire build.  sharedsort is
+     * the shared, tuplesort-managed state passed to each process tuplesort.
+     * snapshot is the snapshot used by the scan iff an MVCC snapshot is
+     * required.
+     */
+    BrinShared *brinshared;
+    Sharedsort *sharedsort;
+    Snapshot    snapshot;
+    WalUsage   *walusage;
+    BufferUsage *bufferusage;
+} BrinLeader;
 
 /*
  * We use a BrinBuildState during initial construction of a BRIN index.
@@ -49,7 +162,8 @@
 typedef struct BrinBuildState
 {
     Relation    bs_irel;
-    int         bs_numtuples;
+    double      bs_numtuples;
+    double      bs_reltuples;
     Buffer      bs_currentInsertBuf;
     BlockNumber bs_pagesPerRange;
     BlockNumber bs_currRangeStart;
@@ -57,9 +171,19 @@ typedef struct BrinBuildState
     BrinRevmap *bs_rmAccess;
     BrinDesc   *bs_bdesc;
     BrinMemTuple *bs_dtuple;
+
     BrinTuple  *bs_emptyTuple;
     Size        bs_emptyTupleLen;
     MemoryContext bs_context;
+
+    /*
+     * bs_leader is only present when a parallel index build is performed,
+     * and only in the leader process.
+     */
+    BrinLeader *bs_leader;
+    int         bs_worker_id;
+    BrinSpool  *bs_spool;
 } BrinBuildState;
 
 /*
@@ -94,6 +218,7 @@ static void terminate_brin_buildstate(BrinBuildState *state);
 static void brinsummarize(Relation index, Relation heapRel,
                           BlockNumber pageRange, bool include_partial,
                           double *numSummarized, double *numExisting);
 static void form_and_insert_tuple(BrinBuildState *state);
+static void form_and_spill_tuple(BrinBuildState *state);
 static void union_tuples(BrinDesc *bdesc, BrinMemTuple *a,
                          BrinTuple *b);
 static void brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy);
@@ -103,6 +228,20 @@ static bool check_null_keys(BrinValues *bval, ScanKey *nullkeys, int nnullkeys);
 static void brin_fill_empty_ranges(BrinBuildState *state,
                                    BlockNumber prevRange, BlockNumber maxRange);
 
+/* parallel index builds */
+static void _brin_begin_parallel(BrinBuildState *buildstate, Relation heap,
+                                 Relation index, bool isconcurrent,
+                                 int request);
+static void _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state);
+static Size _brin_parallel_estimate_shared(Relation heap, Snapshot snapshot);
+static void _brin_leader_participate_as_worker(BrinBuildState *buildstate,
+                                               Relation heap, Relation index);
+static void _brin_parallel_scan_and_build(BrinBuildState *buildstate,
+                                          BrinSpool *brinspool,
+                                          BrinShared *brinshared,
+                                          Sharedsort *sharedsort,
+                                          Relation heap, Relation index,
+                                          int sortmem, bool progress);
+
 /*
  * BRIN handler function: return IndexAmRoutine with access method parameters
  * and callbacks.
@@ -127,6 +266,7 @@ brinhandler(PG_FUNCTION_ARGS)
     amroutine->amclusterable = false;
     amroutine->ampredlocks = false;
     amroutine->amcanparallel = false;
+    amroutine->amcanbuildparallel = true;
     amroutine->amcaninclude = false;
     amroutine->amusemaintenanceworkmem = false;
     amroutine->amsummarizing = true;
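BrinShared's mutex/workersdonecv pair is the whole coordination protocol: participants update the shared counters under the spinlock and then signal the condition variable. A sketch of the reporting side, using the same PostgreSQL primitives this patch uses further down in _brin_parallel_scan_and_build (the helper itself is illustrative, not part of the patch):

    #include "postgres.h"
    #include "storage/condition_variable.h"
    #include "storage/spin.h"

    /* BrinShared as defined above in this diff */
    static void
    report_participant_done(BrinShared *brinshared,
                            double reltuples, double indtuples)
    {
        /* update shared counters atomically w.r.t. other participants */
        SpinLockAcquire(&brinshared->mutex);
        brinshared->nparticipantsdone++;
        brinshared->reltuples += reltuples;
        brinshared->indtuples += indtuples;
        SpinLockRelease(&brinshared->mutex);

        /* wake the leader, which may be sleeping on workersdonecv */
        ConditionVariableSignal(&brinshared->workersdonecv);
    }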
@@ -883,6 +1023,65 @@ brinbuildCallback(Relation index,
 }
 
 /*
+ * Per-heap-tuple callback for table_index_build_scan with parallelism.
+ *
+ * A version of the callback used by parallel index builds.  The main
+ * difference is that instead of writing the BRIN tuples into the index, we
+ * write them into a shared tuplesort, and leave the insertion up to the
+ * leader (which may reorder them a bit etc.).  The callback also does not
+ * generate empty ranges; those will be added by the leader when merging
+ * results from workers.
+ */
+static void
+brinbuildCallbackParallel(Relation index,
+                          ItemPointer tid,
+                          Datum *values,
+                          bool *isnull,
+                          bool tupleIsAlive,
+                          void *brstate)
+{
+    BrinBuildState *state = (BrinBuildState *) brstate;
+    BlockNumber thisblock;
+
+    thisblock = ItemPointerGetBlockNumber(tid);
+
+    /*
+     * If we're in a block that belongs to a future range, summarize what
+     * we've got and start afresh.  Note the scan might have skipped many
+     * pages, if they were devoid of live tuples; we do not create empty
+     * BRIN ranges here - the leader is responsible for filling them in.
+     */
+    if (thisblock > state->bs_currRangeStart + state->bs_pagesPerRange - 1)
+    {
+        BRIN_elog((DEBUG2,
+                   "brinbuildCallbackParallel: completed a range: %u--%u",
+                   state->bs_currRangeStart,
+                   state->bs_currRangeStart + state->bs_pagesPerRange));
+
+        /* create the index tuple and write it into the tuplesort */
+        form_and_spill_tuple(state);
+
+        /*
+         * Set state to correspond to the next range (for this block).
+         *
+         * This skips ranges that are either empty (and so we don't get any
+         * tuples to summarize), or processed by other workers.  We can't
+         * differentiate those cases here easily, so we leave it up to the
+         * leader to fill empty ranges where needed.
+         */
+        state->bs_currRangeStart
+            = state->bs_pagesPerRange * (thisblock / state->bs_pagesPerRange);
+
+        /* re-initialize state for it */
+        brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
+    }
+
+    /* Accumulate the current tuple into the running state */
+    (void) add_values_to_range(index, state->bs_bdesc, state->bs_dtuple,
+                               values, isnull);
+}
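The callback's range arithmetic rounds the current heap block down to the first block of its range with integer division. A standalone illustration with the default pages_per_range of 128:

    #include <stdio.h>

    typedef unsigned int BlockNumber;

    /* Round a heap block number down to the first block of its range,
     * as brinbuildCallbackParallel does above. */
    static BlockNumber
    range_start(BlockNumber blkno, BlockNumber pagesPerRange)
    {
        return pagesPerRange * (blkno / pagesPerRange);
    }

    int main(void)
    {
        printf("%u\n", range_start(0, 128));    /* -> 0   */
        printf("%u\n", range_start(127, 128));  /* -> 0   */
        printf("%u\n", range_start(128, 128));  /* -> 128 */
        printf("%u\n", range_start(300, 128));  /* -> 256 */
        return 0;
    }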
+
+/*
  * brinbuild() -- build a new BRIN index.
  */
 IndexBuildResult *
@@ -944,29 +1143,105 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 
     state = initialize_brin_buildstate(index, revmap, pagesPerRange,
                                        RelationGetNumberOfBlocks(heap));
 
+    state->bs_spool = (BrinSpool *) palloc0(sizeof(BrinSpool));
+    state->bs_spool->heap = heap;
+    state->bs_spool->index = index;
+
     /*
-     * Now scan the relation.  No syncscan allowed here because we want the
-     * heap blocks in physical order.
+     * Attempt to launch parallel worker scan when required.
+     *
+     * XXX plan_create_index_workers makes the number of workers dependent
+     * on maintenance_work_mem, requiring 32MB for each worker.  That makes
+     * sense for btree, but not for BRIN, which can make do with much less
+     * memory.  So maybe make that somehow less strict, optionally?
     */
-    reltuples = table_index_build_scan(heap, index, indexInfo, false, true,
-                                       brinbuildCallback, (void *) state, NULL);
-
-    /* process the final batch */
-    form_and_insert_tuple(state);
+    if (indexInfo->ii_ParallelWorkers > 0)
+        _brin_begin_parallel(state, heap, index, indexInfo->ii_Concurrent,
+                             indexInfo->ii_ParallelWorkers);
 
     /*
-     * Backfill the final ranges with empty data.
+     * Now scan the relation.  No syncscan allowed here because we want the
+     * heap blocks in physical order.
      *
-     * This saves us from doing what amounts to full table scans when the
-     * index with a predicate like WHERE (nonnull_column IS NULL), or other
-     * very selective predicates.
+     * If a parallel build was requested and at least one worker process was
+     * successfully launched, set up coordination state.
      */
-    brin_fill_empty_ranges(state,
-                           state->bs_currRangeStart,
-                           state->bs_maxRangeStart);
+    if (state->bs_leader)
+    {
+        SortCoordinate coordinate;
+
+        coordinate = (SortCoordinate) palloc0(sizeof(SortCoordinateData));
+        coordinate->isWorker = false;
+        coordinate->nParticipants =
+            state->bs_leader->nparticipanttuplesorts;
+        coordinate->sharedsort = state->bs_leader->sharedsort;
+
+        /*
+         * Begin serial/leader tuplesort.
+         *
+         * In cases where parallelism is involved, the leader receives the
+         * same share of maintenance_work_mem as a serial sort (it is
+         * generally treated in the same way as a serial sort once we
+         * return).  Parallel worker Tuplesortstates will have received
+         * only a fraction of maintenance_work_mem, though.
+         *
+         * We rely on the lifetime of the Leader Tuplesortstate almost not
+         * overlapping with any worker Tuplesortstate's lifetime.  There may
+         * be some small overlap, but that's okay because we rely on leader
+         * Tuplesortstate only allocating a small, fixed amount of memory
+         * here.  When its tuplesort_performsort() is called (by our
+         * caller), and significant amounts of memory are likely to be
+         * used, all workers must have already freed almost all memory held
+         * by their Tuplesortstates (they are about to go away completely,
+         * too).  The overall effect is that maintenance_work_mem always
+         * represents an absolute high watermark on the amount of memory
+         * used by a CREATE INDEX operation, regardless of the use of
+         * parallelism or any other factor.
+         */
+        state->bs_spool->sortstate =
+            tuplesort_begin_index_brin(heap, index,
+                                       maintenance_work_mem, coordinate,
+                                       TUPLESORT_NONE);
+
+        /*
+         * In parallel mode, wait for workers to complete, and then read
+         * all tuples from the shared tuplesort and insert them into the
+         * index.
+         */
+        _brin_end_parallel(state->bs_leader, state);
+    }
+    else                        /* no parallel index build */
+    {
+        reltuples = table_index_build_scan(heap, index, indexInfo, false, true,
+                                           brinbuildCallback, (void *) state, NULL);
+
+        /*
+         * Process the final batch.
+         *
+         * XXX Note this does not update state->bs_currRangeStart, i.e. it
+         * stays set to the last range added to the index.  This is OK,
+         * because that's what brin_fill_empty_ranges expects.
+         */
+        form_and_insert_tuple(state);
+
+        /*
+         * Backfill the final ranges with empty data.
+         *
+         * This saves us from doing what amounts to full table scans when
+         * building an index with a predicate like WHERE (nonnull_column IS
+         * NULL), or other very selective predicates.
+         */
+        brin_fill_empty_ranges(state,
+                               state->bs_currRangeStart,
+                               state->bs_maxRangeStart);
+
+        /* track the number of relation tuples */
+        state->bs_reltuples = reltuples;
+    }
 
     /* release resources */
     idxtuples = state->bs_numtuples;
+    reltuples = state->bs_reltuples;
     brinRevmapTerminate(state->bs_rmAccess);
     terminate_brin_buildstate(state);
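The leader's SortCoordinate differs from a worker's only in isWorker and nParticipants: the leader knows the final participant count, workers pass -1. A hedged sketch contrasting the two setups (make_coordinate is a hypothetical helper, not part of the patch; the types come from utils/tuplesort.h):

    #include "postgres.h"
    #include "utils/tuplesort.h"

    static SortCoordinate
    make_coordinate(bool leader, int nparticipants, Sharedsort *shared)
    {
        SortCoordinate coordinate = palloc0(sizeof(SortCoordinateData));

        coordinate->isWorker = !leader;
        /* workers don't know the final participant count; they pass -1 */
        coordinate->nParticipants = leader ? nparticipants : -1;
        coordinate->sharedsort = shared;

        return coordinate;
    }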
@@ -1387,12 +1662,19 @@ initialize_brin_buildstate(Relation idxRel, BrinRevmap *revmap,
 
     state->bs_irel = idxRel;
     state->bs_numtuples = 0;
+    state->bs_reltuples = 0;
     state->bs_currentInsertBuf = InvalidBuffer;
     state->bs_pagesPerRange = pagesPerRange;
     state->bs_currRangeStart = 0;
     state->bs_rmAccess = revmap;
     state->bs_bdesc = brin_build_desc(idxRel);
     state->bs_dtuple = brin_new_memtuple(state->bs_bdesc);
+    state->bs_leader = NULL;
+    state->bs_worker_id = 0;
+    state->bs_spool = NULL;
+    state->bs_emptyTuple = NULL;
+    state->bs_emptyTupleLen = 0;
 
     /* Remember the memory context to use for an empty tuple, if needed. */
     state->bs_context = CurrentMemoryContext;
@@ -1702,6 +1984,32 @@ form_and_insert_tuple(BrinBuildState *state)
 }
 
 /*
+ * Given a deformed tuple in the build state, convert it into the on-disk
+ * format and write it to a (shared) tuplesort (the leader will insert it
+ * into the index later).
+ */
+static void
+form_and_spill_tuple(BrinBuildState *state)
+{
+    BrinTuple  *tup;
+    Size        size;
+
+    /* don't insert empty tuples in parallel build */
+    if (state->bs_dtuple->bt_empty_range)
+        return;
+
+    tup = brin_form_tuple(state->bs_bdesc, state->bs_currRangeStart,
+                          state->bs_dtuple, &size);
+
+    /* write the BRIN tuple to the tuplesort */
+    tuplesort_putbrintuple(state->bs_spool->sortstate, tup, size);
+
+    state->bs_numtuples++;
+
+    pfree(tup);
+}
+
+/*
  * Given two deformed tuples, adjust the first one so that it's consistent
  * with the summary values in both.
  */
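form_and_spill_tuple() is the producer half of the spool; the leader later drains the sort in block-number order. A sketch of the consumer half, using only the tuplesort calls this patch series introduces (the union/insert work the leader performs on each tuple is elided):

    #include "postgres.h"
    #include "access/brin_tuple.h"
    #include "utils/tuplesort.h"

    /* Sketch: drain a finished BRIN tuplesort the way the leader does.
     * Assumes tuplesort_performsort() was already called on sortstate. */
    static void
    drain_brin_sort(Tuplesortstate *sortstate)
    {
        BrinTuple  *btup;
        Size        tuplen;

        /* tuples come back ordered by bt_blkno, i.e. by page range */
        while ((btup = tuplesort_getbrintuple(sortstate, &tuplen, true)) != NULL)
        {
            /* here the leader unions duplicates and inserts into the index */
        }

        tuplesort_end(sortstate);
    }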
@@ -2021,6 +2329,554 @@ check_null_keys(BrinValues *bval, ScanKey *nullkeys, int nnullkeys)
     return true;
 }
 
+/*
+ * Create the parallel context, and launch workers for the leader.
+ */
+static void
+_brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
+                     bool isconcurrent, int request)
+{
+    ParallelContext *pcxt;
+    int         scantuplesortstates;
+    Snapshot    snapshot;
+    Size        estbrinshared;
+    Size        estsort;
+    BrinShared *brinshared;
+    Sharedsort *sharedsort;
+    BrinLeader *brinleader = (BrinLeader *) palloc0(sizeof(BrinLeader));
+    WalUsage   *walusage;
+    BufferUsage *bufferusage;
+    bool        leaderparticipates = true;
+    int         querylen;
+
+#ifdef DISABLE_LEADER_PARTICIPATION
+    leaderparticipates = false;
+#endif
+
+    /*
+     * Enter parallel mode, and create context for parallel build of brin
+     * index.
+     */
+    EnterParallelMode();
+    Assert(request > 0);
+    pcxt = CreateParallelContext("postgres", "_brin_parallel_build_main",
+                                 request);
+
+    scantuplesortstates = leaderparticipates ? request + 1 : request;
+
+    /*
+     * Prepare for scan of the base relation.  In a normal index build, we
+     * use SnapshotAny because we must retrieve all tuples and do our own
+     * time qual checks (because we have to index RECENTLY_DEAD tuples).  In
+     * a concurrent build, we take a regular MVCC snapshot and index
+     * whatever's live according to that.
+     */
+    if (!isconcurrent)
+        snapshot = SnapshotAny;
+    else
+        snapshot = RegisterSnapshot(GetTransactionSnapshot());
+
+    /*
+     * Estimate size for our own PARALLEL_KEY_BRIN_SHARED workspace.
+     */
+    estbrinshared = _brin_parallel_estimate_shared(heap, snapshot);
+    shm_toc_estimate_chunk(&pcxt->estimator, estbrinshared);
+    estsort = tuplesort_estimate_shared(scantuplesortstates);
+    shm_toc_estimate_chunk(&pcxt->estimator, estsort);
+
+    shm_toc_estimate_keys(&pcxt->estimator, 2);
+
+    /*
+     * Estimate space for WalUsage and BufferUsage -- PARALLEL_KEY_WAL_USAGE
+     * and PARALLEL_KEY_BUFFER_USAGE.
+     *
+     * If there are no extensions loaded that care, we could skip this.  We
+     * have no way of knowing whether anyone's looking at pgWalUsage or
+     * pgBufferUsage, so do it unconditionally.
+     */
+    shm_toc_estimate_chunk(&pcxt->estimator,
+                           mul_size(sizeof(WalUsage), pcxt->nworkers));
+    shm_toc_estimate_keys(&pcxt->estimator, 1);
+    shm_toc_estimate_chunk(&pcxt->estimator,
+                           mul_size(sizeof(BufferUsage), pcxt->nworkers));
+    shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+    /* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */
+    if (debug_query_string)
+    {
+        querylen = strlen(debug_query_string);
+        shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
+        shm_toc_estimate_keys(&pcxt->estimator, 1);
+    }
+    else
+        querylen = 0;           /* keep compiler quiet */
+
+    /* Everyone's had a chance to ask for space, so now create the DSM */
+    InitializeParallelDSM(pcxt);
+
+    /* If no DSM segment was available, back out (do serial build) */
+    if (pcxt->seg == NULL)
+    {
+        if (IsMVCCSnapshot(snapshot))
+            UnregisterSnapshot(snapshot);
+        DestroyParallelContext(pcxt);
+        ExitParallelMode();
+        return;
+    }
+
+    /* Store shared build state, for which we reserved space */
+    brinshared = (BrinShared *) shm_toc_allocate(pcxt->toc, estbrinshared);
+
+    /* Initialize immutable state */
+    brinshared->heaprelid = RelationGetRelid(heap);
+    brinshared->indexrelid = RelationGetRelid(index);
+    brinshared->isconcurrent = isconcurrent;
+    brinshared->scantuplesortstates = scantuplesortstates;
+    brinshared->pagesPerRange = buildstate->bs_pagesPerRange;
+    ConditionVariableInit(&brinshared->workersdonecv);
+    SpinLockInit(&brinshared->mutex);
+
+    /* Initialize mutable state */
+    brinshared->nparticipantsdone = 0;
+    brinshared->reltuples = 0.0;
+    brinshared->indtuples = 0.0;
+
+    table_parallelscan_initialize(heap,
+                                  ParallelTableScanFromBrinShared(brinshared),
+                                  snapshot);
+
+    /*
+     * Store shared tuplesort-private state, for which we reserved space.
+     * Then, initialize opaque state using tuplesort routine.
+     */
+    sharedsort = (Sharedsort *) shm_toc_allocate(pcxt->toc, estsort);
+    tuplesort_initialize_shared(sharedsort, scantuplesortstates,
+                                pcxt->seg);
+
+    /* Insert both shared state chunks into the DSM's table of contents */
+    shm_toc_insert(pcxt->toc, PARALLEL_KEY_BRIN_SHARED, brinshared);
+    shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLESORT, sharedsort);
+
+    /* Store query string for workers */
+    if (debug_query_string)
+    {
+        char       *sharedquery;
+
+        sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
+        memcpy(sharedquery, debug_query_string, querylen + 1);
+        shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery);
+    }
+
+    /*
+     * Allocate space for each worker's WalUsage and BufferUsage; no need to
+     * initialize.
+     */
+    walusage = shm_toc_allocate(pcxt->toc,
+                                mul_size(sizeof(WalUsage), pcxt->nworkers));
+    shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage);
+    bufferusage = shm_toc_allocate(pcxt->toc,
+                                   mul_size(sizeof(BufferUsage), pcxt->nworkers));
+    shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufferusage);
+
+    /* Launch workers, saving status for leader/caller */
+    LaunchParallelWorkers(pcxt);
+    brinleader->pcxt = pcxt;
+    brinleader->nparticipanttuplesorts = pcxt->nworkers_launched;
+    if (leaderparticipates)
+        brinleader->nparticipanttuplesorts++;
+    brinleader->brinshared = brinshared;
+    brinleader->sharedsort = sharedsort;
+    brinleader->snapshot = snapshot;
+    brinleader->walusage = walusage;
+    brinleader->bufferusage = bufferusage;
+
+    /* If no workers were successfully launched, back out (do serial build) */
+    if (pcxt->nworkers_launched == 0)
+    {
+        _brin_end_parallel(brinleader, NULL);
+        return;
+    }
+
+    /* Save leader state now that it's clear build will be parallel */
+    buildstate->bs_leader = brinleader;
+
+    /* Join heap scan ourselves */
+    if (leaderparticipates)
+        _brin_leader_participate_as_worker(buildstate, heap, index);
+
+    /*
+     * Caller needs to wait for all launched workers when we return.  Make
+     * sure that the failure-to-start case will not hang forever.
+     */
+    WaitForParallelWorkersToAttach(pcxt);
+}
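Every shm_toc_estimate_chunk()/shm_toc_estimate_keys() call made before InitializeParallelDSM() must be paired with a shm_toc_allocate()/shm_toc_insert() afterwards, as _brin_begin_parallel does five times over. The shape of one such pairing, with a hypothetical key following the patch's 0xB... convention:

    #include "postgres.h"
    #include "access/parallel.h"
    #include "storage/shm_toc.h"

    /* Hypothetical key, for illustration only */
    #define PARALLEL_KEY_EXAMPLE    UINT64CONST(0xB0000000000000FF)

    static void
    reserve_and_publish(ParallelContext *pcxt, Size nbytes)
    {
        char       *space;

        /* phase 1: before InitializeParallelDSM(), reserve space + one key */
        shm_toc_estimate_chunk(&pcxt->estimator, nbytes);
        shm_toc_estimate_keys(&pcxt->estimator, 1);

        /* ... InitializeParallelDSM(pcxt) happens between the phases ... */

        /* phase 2: carve the chunk out of the DSM and publish it in the TOC */
        space = shm_toc_allocate(pcxt->toc, nbytes);
        shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXAMPLE, space);
    }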
+
+/*
+ * Shut down workers, destroy parallel context, and end parallel mode.
+ */
+static void
+_brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
+{
+    int         i;
+    BrinTuple  *btup;
+    BrinMemTuple *memtuple = NULL;
+    Size        tuplen;
+    BrinShared *brinshared = brinleader->brinshared;
+    BlockNumber prevblkno = InvalidBlockNumber;
+    BrinSpool  *spool;
+    MemoryContext rangeCxt,
+                oldCxt;
+
+    /* Shutdown worker processes */
+    WaitForParallelWorkersToFinish(brinleader->pcxt);
+
+    /*
+     * If we didn't actually launch workers, we still have to make sure to
+     * exit parallel mode.
+     */
+    if (!state)
+        goto cleanup;
+
+    /* copy the data into leader state (the workers are done by now) */
+    state->bs_reltuples = brinshared->reltuples;
+    state->bs_numtuples = brinshared->indtuples;
+
+    /* do the actual sort in the leader */
+    spool = state->bs_spool;
+    tuplesort_performsort(spool->sortstate);
+
+    /*
+     * Initialize the BrinMemTuple we'll use to union summaries from workers
+     * (in case they happened to produce parts of the same page range).
+     */
+    memtuple = brin_new_memtuple(state->bs_bdesc);
+
+    /*
+     * Create a memory context we'll reset to combine results for a single
+     * page range (received from the workers).  We don't expect a huge
+     * number of overlaps under regular circumstances (for large tables the
+     * parallel chunk size is likely larger than the BRIN page range), but
+     * it can happen, and the union functions may do all kinds of stuff.  So
+     * we'd better reset the context once in a while.
+     */
+    rangeCxt = AllocSetContextCreate(CurrentMemoryContext,
+                                     "brin union",
+                                     ALLOCSET_DEFAULT_SIZES);
+    oldCxt = MemoryContextSwitchTo(rangeCxt);
+
+    /*
+     * Read the BRIN tuples from the shared tuplesort, sorted by block
+     * number.  That probably gives us an index that is cheaper to scan,
+     * thanks to mostly getting data from the same index page as before.
+     */
+    while ((btup = tuplesort_getbrintuple(spool->sortstate, &tuplen, true)) != NULL)
+    {
+        /* Ranges should be multiples of pages_per_range for the index. */
+        Assert(btup->bt_blkno % brinshared->pagesPerRange == 0);
+
+        /*
+         * Do we need to union summaries for the same page range?
+         *
+         * If this is the first brin tuple we read, then just deform it into
+         * the memtuple, and continue with the next one from the tuplesort.
+         * We however may need to insert empty summaries into the index.
+         *
+         * If it's the same block as the last we saw, we simply union the
+         * brin tuple into it, and we're done - we don't even need to insert
+         * empty ranges, because that was done earlier when we saw the first
+         * brin tuple (for this range).
+         *
+         * Finally, if it's not the first brin tuple, and it's not the same
+         * page range, we need to do the insert and then deform the tuple
+         * into the memtuple.  Then we'll insert empty ranges before the new
+         * brin tuple, if needed.
+         */
+        if (prevblkno == InvalidBlockNumber)
+        {
+            /* First brin tuple: just deform it into the memtuple. */
+            memtuple = brin_deform_tuple(state->bs_bdesc, btup, memtuple);
+
+            /* fall through to insert empty ranges before this block */
+        }
+        else if (memtuple->bt_blkno == btup->bt_blkno)
+        {
+            /*
+             * Not the first brin tuple, but the same page range as the
+             * previous one, so we can merge it into the memtuple.
+             */
+            union_tuples(state->bs_bdesc, memtuple, btup);
+            continue;
+        }
+        else
+        {
+            BrinTuple  *tmp;
+            Size        len;
+
+            /*
+             * We got a brin tuple for a different page range, so form a
+             * brin tuple from the memtuple, insert it, and re-init the
+             * memtuple from the new brin tuple.
+             */
+            tmp = brin_form_tuple(state->bs_bdesc, memtuple->bt_blkno,
+                                  memtuple, &len);
+
+            brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
+                          &state->bs_currentInsertBuf, tmp->bt_blkno, tmp, len);
+
+            /*
+             * Reset the per-output-range context.  This frees all the
+             * memory possibly allocated by the union functions, and also
+             * the BRIN tuple we just formed and inserted.
+             */
+            MemoryContextReset(rangeCxt);
+
+            memtuple = brin_deform_tuple(state->bs_bdesc, btup, memtuple);
+
+            /* fall through to insert empty ranges before this block */
+        }
+
+        /* Fill empty ranges for all ranges missing in the tuplesort. */
+        brin_fill_empty_ranges(state, prevblkno, btup->bt_blkno);
+
+        prevblkno = btup->bt_blkno;
+    }
+
+    tuplesort_end(spool->sortstate);
+
+    /* Form and insert the BRIN tuple for the last page range. */
+    if (prevblkno != InvalidBlockNumber)
+    {
+        BrinTuple  *tmp;
+        Size        len;
+
+        tmp = brin_form_tuple(state->bs_bdesc, memtuple->bt_blkno,
+                              memtuple, &len);
+
+        brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
+                      &state->bs_currentInsertBuf, tmp->bt_blkno, tmp, len);
+
+        pfree(tmp);
+    }
+
+    /* Fill empty ranges at the end, for all ranges missing in the tuplesort. */
+    brin_fill_empty_ranges(state, prevblkno, state->bs_maxRangeStart);
+
+    /*
+     * Switch back to the original memory context, and destroy the one we
+     * created to isolate the union_tuple calls.
+     */
+    MemoryContextSwitchTo(oldCxt);
+    MemoryContextDelete(rangeCxt);
+
+    /*
+     * Next, accumulate WAL usage.  (This must wait for the workers to
+     * finish, or we might get incomplete data.)
+     */
+    for (i = 0; i < brinleader->pcxt->nworkers_launched; i++)
+        InstrAccumParallelQuery(&brinleader->bufferusage[i], &brinleader->walusage[i]);
+
+cleanup:
+
+    /* Free last reference to MVCC snapshot, if one was used */
+    if (IsMVCCSnapshot(brinleader->snapshot))
+        UnregisterSnapshot(brinleader->snapshot);
+    DestroyParallelContext(brinleader->pcxt);
+    ExitParallelMode();
+}
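Stripped of the BRIN specifics, _brin_end_parallel's loop is the classic merge over a sorted stream with duplicate keys: remember the first key, union runs that share it, emit on key change, and flush the final pending key. A self-contained sketch of that control flow, with plain ints standing in for BRIN tuples:

    #include <stdio.h>

    int main(void)
    {
        int     stream[] = {0, 0, 128, 384, 384, 384};  /* sorted bt_blkno values */
        int     n = 6;
        int     cur = -1;       /* -1 plays the role of InvalidBlockNumber */

        for (int i = 0; i < n; i++)
        {
            if (cur == -1)
                cur = stream[i];        /* first tuple: just remember it */
            else if (cur == stream[i])
                continue;               /* same range: union, emit nothing */
            else
            {
                printf("insert range %d\n", cur);   /* key change: emit */
                cur = stream[i];
            }
        }
        if (cur != -1)
            printf("insert range %d\n", cur);       /* final pending range */
        return 0;
    }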
+
+/*
+ * Returns size of shared memory required to store state for a parallel
+ * brin index build, based on the snapshot its parallel scan will use.
+ */
+static Size
+_brin_parallel_estimate_shared(Relation heap, Snapshot snapshot)
+{
+    /* c.f. shm_toc_allocate as to why BUFFERALIGN is used */
+    return add_size(BUFFERALIGN(sizeof(BrinShared)),
+                    table_parallelscan_estimate(heap, snapshot));
+}
+
+/*
+ * Within leader, participate as a parallel worker.
+ */
+static void
+_brin_leader_participate_as_worker(BrinBuildState *buildstate, Relation heap,
+                                   Relation index)
+{
+    BrinLeader *brinleader = buildstate->bs_leader;
+    int         sortmem;
+
+    /* Allocate memory and initialize private spool */
+    buildstate->bs_spool = (BrinSpool *) palloc0(sizeof(BrinSpool));
+    buildstate->bs_spool->heap = heap;
+    buildstate->bs_spool->index = index;
+
+    /*
+     * Might as well use a reliable figure when doling out
+     * maintenance_work_mem (when the requested number of workers was not
+     * launched, this will be somewhat higher than it is for other workers).
+     */
+    sortmem = maintenance_work_mem / brinleader->nparticipanttuplesorts;
+
+    /* Perform work common to all participants */
+    _brin_parallel_scan_and_build(buildstate, buildstate->bs_spool,
+                                  brinleader->brinshared,
+                                  brinleader->sharedsort, heap, index,
+                                  sortmem, true);
+}
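maintenance_work_mem is measured in kilobytes, so with the 64MB default and two launched workers plus a participating leader, each participant's tuplesort gets 65536/3 kB. A worked example of the division performed above:

    #include <stdio.h>

    int main(void)
    {
        int     maintenance_work_mem = 65536;   /* kB, the 64MB default */
        int     nparticipanttuplesorts = 3;     /* 2 workers + leader */

        /* same integer division each participant performs */
        int     sortmem = maintenance_work_mem / nparticipanttuplesorts;

        printf("each participant sorts with %d kB\n", sortmem); /* 21845 */
        return 0;
    }

Note the asymmetry the comment above alludes to: the leader divides by the actual participant count, while workers (below, in _brin_parallel_build_main) divide by the planned scantuplesortstates, so when fewer workers launch than requested the leader's share comes out somewhat higher.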
+
+/*
+ * Perform a worker's portion of a parallel sort.
+ *
+ * This generates a tuplesort for the passed-in brinspool.  All other spool
+ * fields should already be set when this is called.
+ *
+ * sortmem is the amount of working memory to use within each worker,
+ * expressed in KBs.
+ *
+ * When this returns, workers are done, and need only release resources.
+ */
+static void
+_brin_parallel_scan_and_build(BrinBuildState *state, BrinSpool *brinspool,
+                              BrinShared *brinshared, Sharedsort *sharedsort,
+                              Relation heap, Relation index, int sortmem,
+                              bool progress)
+{
+    SortCoordinate coordinate;
+    TableScanDesc scan;
+    double      reltuples;
+    IndexInfo  *indexInfo;
+
+    /* Initialize local tuplesort coordination state */
+    coordinate = palloc0(sizeof(SortCoordinateData));
+    coordinate->isWorker = true;
+    coordinate->nParticipants = -1;
+    coordinate->sharedsort = sharedsort;
+
+    /* Begin "partial" tuplesort */
+    brinspool->sortstate = tuplesort_begin_index_brin(brinspool->heap,
+                                                      brinspool->index,
+                                                      sortmem, coordinate,
+                                                      TUPLESORT_NONE);
+
+    /* Join parallel scan */
+    indexInfo = BuildIndexInfo(index);
+    indexInfo->ii_Concurrent = brinshared->isconcurrent;
+
+    scan = table_beginscan_parallel(heap,
+                                    ParallelTableScanFromBrinShared(brinshared));
+
+    reltuples = table_index_build_scan(heap, index, indexInfo, true, true,
+                                       brinbuildCallbackParallel, state, scan);
+
+    /* spill the last item */
+    form_and_spill_tuple(state);
+
+    /* sort the BRIN ranges built by this worker */
+    tuplesort_performsort(brinspool->sortstate);
+
+    state->bs_reltuples += reltuples;
+
+    /*
+     * Done.  Record ambuild statistics.
+     */
+    SpinLockAcquire(&brinshared->mutex);
+    brinshared->nparticipantsdone++;
+    brinshared->reltuples += state->bs_reltuples;
+    brinshared->indtuples += state->bs_numtuples;
+    SpinLockRelease(&brinshared->mutex);
+
+    /* Notify leader */
+    ConditionVariableSignal(&brinshared->workersdonecv);
+
+    tuplesort_end(brinspool->sortstate);
+}
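_brin_parallel_scan_and_build ends by signaling workersdonecv, although the leader in this patch waits via WaitForParallelWorkersToFinish() rather than sleeping on the condition variable. For illustration only, the conventional wait-side loop that workersdonecv would support, mirroring _bt_parallel_heapscan in nbtsort.c (this function is hypothetical, not part of the patch):

    #include "postgres.h"
    #include "storage/condition_variable.h"
    #include "storage/spin.h"
    #include "utils/wait_event.h"

    static void
    wait_for_participants(BrinShared *brinshared, int nparticipants)
    {
        for (;;)
        {
            int         done;

            /* read the counter under the same mutex the writers use */
            SpinLockAcquire(&brinshared->mutex);
            done = brinshared->nparticipantsdone;
            SpinLockRelease(&brinshared->mutex);

            if (done == nparticipants)
                break;

            ConditionVariableSleep(&brinshared->workersdonecv,
                                   WAIT_EVENT_PARALLEL_CREATE_INDEX_SCAN);
        }

        ConditionVariableCancelSleep();
    }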
+
+/*
+ * Perform work within a launched parallel process.
+ */
+void
+_brin_parallel_build_main(dsm_segment *seg, shm_toc *toc)
+{
+    char       *sharedquery;
+    BrinShared *brinshared;
+    Sharedsort *sharedsort;
+    BrinBuildState *buildstate;
+    Relation    heapRel;
+    Relation    indexRel;
+    LOCKMODE    heapLockmode;
+    LOCKMODE    indexLockmode;
+    WalUsage   *walusage;
+    BufferUsage *bufferusage;
+    int         sortmem;
+
+    /*
+     * The only possible status flag that can be set for the parallel worker
+     * is PROC_IN_SAFE_IC.
+     */
+    Assert((MyProc->statusFlags == 0) ||
+           (MyProc->statusFlags == PROC_IN_SAFE_IC));
+
+    /* Set debug_query_string for individual workers first */
+    sharedquery = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, true);
+    debug_query_string = sharedquery;
+
+    /* Report the query string from leader */
+    pgstat_report_activity(STATE_RUNNING, debug_query_string);
+
+    /* Look up brin shared state */
+    brinshared = shm_toc_lookup(toc, PARALLEL_KEY_BRIN_SHARED, false);
+
+    /* Open relations using lock modes known to be obtained by index.c */
+    if (!brinshared->isconcurrent)
+    {
+        heapLockmode = ShareLock;
+        indexLockmode = AccessExclusiveLock;
+    }
+    else
+    {
+        heapLockmode = ShareUpdateExclusiveLock;
+        indexLockmode = RowExclusiveLock;
+    }
+
+    /* Open relations within worker */
+    heapRel = table_open(brinshared->heaprelid, heapLockmode);
+    indexRel = index_open(brinshared->indexrelid, indexLockmode);
+
+    buildstate = initialize_brin_buildstate(indexRel, NULL,
+                                            brinshared->pagesPerRange,
+                                            InvalidBlockNumber);
+
+    /* Initialize worker's own spool */
+    buildstate->bs_spool = (BrinSpool *) palloc0(sizeof(BrinSpool));
+    buildstate->bs_spool->heap = heapRel;
+    buildstate->bs_spool->index = indexRel;
+
+    /* Look up shared state private to tuplesort.c */
+    sharedsort = shm_toc_lookup(toc, PARALLEL_KEY_TUPLESORT, false);
+    tuplesort_attach_shared(sharedsort, seg);
+
+    /* Prepare to track buffer usage during parallel execution */
+    InstrStartParallelQuery();
+
+    /*
+     * Might as well use a reliable figure when doling out
+     * maintenance_work_mem (when the requested number of workers was not
+     * launched, this will be somewhat higher than it is for other workers).
+     */
+    sortmem = maintenance_work_mem / brinshared->scantuplesortstates;
+
+    _brin_parallel_scan_and_build(buildstate, buildstate->bs_spool,
+                                  brinshared, sharedsort,
+                                  heapRel, indexRel, sortmem, false);
+
+    /* Report WAL/buffer usage during parallel execution */
+    bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
+    walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
+    InstrEndParallelQuery(&bufferusage[ParallelWorkerNumber],
+                          &walusage[ParallelWorkerNumber]);
+
+    index_close(indexRel, indexLockmode);
+    table_close(heapRel, heapLockmode);
+}
+
 /*
  * brin_build_empty_tuple
  *      Maybe initialize a BRIN tuple representing empty range.
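Finally, each shm_toc_insert() made by the leader in _brin_begin_parallel has a matching shm_toc_lookup() in the worker above; the third argument says whether a missing entry is tolerated, which is true only for the optional query text. A condensed view of that worker-side contract (lookup_build_state is a hypothetical wrapper; the keys are the PARALLEL_KEY_* constants defined at the top of this diff):

    #include "postgres.h"
    #include "storage/shm_toc.h"

    static void
    lookup_build_state(shm_toc *toc)
    {
        char       *sharedquery;
        void       *brinshared;
        void       *sharedsort;

        /* noError = true: entry is optional, NULL is a valid result */
        sharedquery = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, true);

        /* noError = false: these must exist, lookup errors out otherwise */
        brinshared = shm_toc_lookup(toc, PARALLEL_KEY_BRIN_SHARED, false);
        sharedsort = shm_toc_lookup(toc, PARALLEL_KEY_TUPLESORT, false);

        (void) sharedquery;
        (void) brinshared;
        (void) sharedsort;
    }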