aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/access/brin/brin.c57
1 files changed, 49 insertions, 8 deletions
diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c
index 2622a7efb13..795354ea1b7 100644
--- a/src/backend/access/brin/brin.c
+++ b/src/backend/access/brin/brin.c
@@ -957,7 +957,8 @@ terminate_brin_buildstate(BrinBuildState *state)
}
/*
- * Summarize the given page range of the given index.
+ * On the given BRIN index, summarize the heap page range that corresponds
+ * to the heap block number given.
*
* This routine can run in parallel with insertions into the heap. To avoid
* missing those values from the summary tuple, we first insert a placeholder
@@ -967,6 +968,12 @@ terminate_brin_buildstate(BrinBuildState *state)
* update of the index value happens in a loop, so that if somebody updates
* the placeholder tuple after we read it, we detect the case and try again.
* This ensures that the concurrently inserted tuples are not lost.
+ *
+ * A further corner case is this routine being asked to summarize the partial
+ * range at the end of the table. heapNumBlocks is the (possibly outdated)
+ * table size; if we notice that the requested range lies beyond that size,
+ * we re-compute the table size after inserting the placeholder tuple, to
+ * avoid missing pages that were appended recently.
*/
static void
summarize_range(IndexInfo *indexInfo, BrinBuildState *state, Relation heapRel,
@@ -988,6 +995,33 @@ summarize_range(IndexInfo *indexInfo, BrinBuildState *state, Relation heapRel,
heapBlk, phtup, phsz);
/*
+ * Compute range end. We hold ShareUpdateExclusive lock on table, so it
+ * cannot shrink concurrently (but it can grow).
+ */
+ Assert(heapBlk % state->bs_pagesPerRange == 0);
+ if (heapBlk + state->bs_pagesPerRange > heapNumBlks)
+ {
+ /*
+ * If we're asked to scan what we believe to be the final range on the
+ * table (i.e. a range that might be partial) we need to recompute our
+ * idea of what the latest page is after inserting the placeholder
+ * tuple. Anyone that grows the table later will update the
+ * placeholder tuple, so it doesn't matter that we won't scan these
+ * pages ourselves. Careful: the table might have been extended
+ * beyond the current range, so clamp our result.
+ *
+ * Fortunately, this should occur infrequently.
+ */
+ scanNumBlks = Min(RelationGetNumberOfBlocks(heapRel) - heapBlk,
+ state->bs_pagesPerRange);
+ }
+ else
+ {
+ /* Easy case: range is known to be complete */
+ scanNumBlks = state->bs_pagesPerRange;
+ }
+
+ /*
* Execute the partial heap scan covering the heap blocks in the specified
* page range, summarizing the heap tuples in it. This scan stops just
* short of brinbuildCallback creating the new index entry.
@@ -997,8 +1031,6 @@ summarize_range(IndexInfo *indexInfo, BrinBuildState *state, Relation heapRel,
* by transactions that are still in progress, among other corner cases.
*/
state->bs_currRangeStart = heapBlk;
- scanNumBlks = heapBlk + state->bs_pagesPerRange <= heapNumBlks ?
- state->bs_pagesPerRange : heapNumBlks - heapBlk;
IndexBuildHeapRangeScan(heapRel, state->bs_irel, indexInfo, false, true,
heapBlk, scanNumBlks,
brinbuildCallback, (void *) state);
@@ -1074,25 +1106,34 @@ brinsummarize(Relation index, Relation heapRel, double *numSummarized,
BrinBuildState *state = NULL;
IndexInfo *indexInfo = NULL;
BlockNumber heapNumBlocks;
- BlockNumber heapBlk;
BlockNumber pagesPerRange;
Buffer buf;
+ BlockNumber startBlk;
revmap = brinRevmapInitialize(index, &pagesPerRange);
+ /* determine range of pages to process: always start from the beginning */
+ heapNumBlocks = RelationGetNumberOfBlocks(heapRel);
+ startBlk = 0;
+
/*
* Scan the revmap to find unsummarized items.
*/
buf = InvalidBuffer;
- heapNumBlocks = RelationGetNumberOfBlocks(heapRel);
- for (heapBlk = 0; heapBlk < heapNumBlocks; heapBlk += pagesPerRange)
+ for (; startBlk < heapNumBlocks; startBlk += pagesPerRange)
{
BrinTuple *tup;
OffsetNumber off;
+ /*
+ * Go away now if we think the next range is partial.
+ */
+ if (startBlk + pagesPerRange > heapNumBlocks)
+ break;
+
CHECK_FOR_INTERRUPTS();
- tup = brinGetTupleForHeapBlock(revmap, heapBlk, &buf, &off, NULL,
+ tup = brinGetTupleForHeapBlock(revmap, startBlk, &buf, &off, NULL,
BUFFER_LOCK_SHARE);
if (tup == NULL)
{
@@ -1105,7 +1146,7 @@ brinsummarize(Relation index, Relation heapRel, double *numSummarized,
pagesPerRange);
indexInfo = BuildIndexInfo(index);
}
- summarize_range(indexInfo, state, heapRel, heapBlk, heapNumBlocks);
+ summarize_range(indexInfo, state, heapRel, startBlk, heapNumBlocks);
/* and re-initialize state for the next range */
brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);