diff options
Diffstat (limited to 'src/backend')
25 files changed, 594 insertions, 75 deletions
diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c index 6f6f1b1b415..e64c94d3569 100644 --- a/src/backend/access/brin/brin.c +++ b/src/backend/access/brin/brin.c @@ -135,7 +135,7 @@ brininsert(Relation idxRel, Datum *values, bool *nulls, MemoryContext tupcxt = NULL; MemoryContext oldcxt = NULL; - revmap = brinRevmapInitialize(idxRel, &pagesPerRange); + revmap = brinRevmapInitialize(idxRel, &pagesPerRange, NULL); for (;;) { @@ -152,7 +152,7 @@ brininsert(Relation idxRel, Datum *values, bool *nulls, /* normalize the block number to be the first block in the range */ heapBlk = (heapBlk / pagesPerRange) * pagesPerRange; brtup = brinGetTupleForHeapBlock(revmap, heapBlk, &buf, &off, NULL, - BUFFER_LOCK_SHARE); + BUFFER_LOCK_SHARE, NULL); /* if range is unsummarized, there's nothing to do */ if (!brtup) @@ -285,7 +285,8 @@ brinbeginscan(Relation r, int nkeys, int norderbys) scan = RelationGetIndexScan(r, nkeys, norderbys); opaque = (BrinOpaque *) palloc(sizeof(BrinOpaque)); - opaque->bo_rmAccess = brinRevmapInitialize(r, &opaque->bo_pagesPerRange); + opaque->bo_rmAccess = brinRevmapInitialize(r, &opaque->bo_pagesPerRange, + scan->xs_snapshot); opaque->bo_bdesc = brin_build_desc(r); scan->opaque = opaque; @@ -368,7 +369,8 @@ bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm) MemoryContextResetAndDeleteChildren(perRangeCxt); tup = brinGetTupleForHeapBlock(opaque->bo_rmAccess, heapBlk, &buf, - &off, &size, BUFFER_LOCK_SHARE); + &off, &size, BUFFER_LOCK_SHARE, + scan->xs_snapshot); if (tup) { tup = brin_copy_tuple(tup, size); @@ -647,7 +649,7 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo) /* * Initialize our state, including the deformed tuple state. 
*/ - revmap = brinRevmapInitialize(index, &pagesPerRange); + revmap = brinRevmapInitialize(index, &pagesPerRange, NULL); state = initialize_brin_buildstate(index, revmap, pagesPerRange); /* @@ -1045,7 +1047,8 @@ summarize_range(IndexInfo *indexInfo, BrinBuildState *state, Relation heapRel, * the same.) */ phtup = brinGetTupleForHeapBlock(state->bs_rmAccess, heapBlk, &phbuf, - &offset, &phsz, BUFFER_LOCK_SHARE); + &offset, &phsz, BUFFER_LOCK_SHARE, + NULL); /* the placeholder tuple must exist */ if (phtup == NULL) elog(ERROR, "missing placeholder tuple"); @@ -1080,7 +1083,7 @@ brinsummarize(Relation index, Relation heapRel, double *numSummarized, BlockNumber pagesPerRange; Buffer buf; - revmap = brinRevmapInitialize(index, &pagesPerRange); + revmap = brinRevmapInitialize(index, &pagesPerRange, NULL); /* * Scan the revmap to find unsummarized items. @@ -1095,7 +1098,7 @@ brinsummarize(Relation index, Relation heapRel, double *numSummarized, CHECK_FOR_INTERRUPTS(); tup = brinGetTupleForHeapBlock(revmap, heapBlk, &buf, &off, NULL, - BUFFER_LOCK_SHARE); + BUFFER_LOCK_SHARE, NULL); if (tup == NULL) { /* no revmap entry for this heap range. Summarize it. */ diff --git a/src/backend/access/brin/brin_revmap.c b/src/backend/access/brin/brin_revmap.c index ce21cbabb7a..5ed867cf30b 100644 --- a/src/backend/access/brin/brin_revmap.c +++ b/src/backend/access/brin/brin_revmap.c @@ -68,7 +68,8 @@ static void revmap_physical_extend(BrinRevmap *revmap); * brinRevmapTerminate when caller is done with it. 
*/ BrinRevmap * -brinRevmapInitialize(Relation idxrel, BlockNumber *pagesPerRange) +brinRevmapInitialize(Relation idxrel, BlockNumber *pagesPerRange, + Snapshot snapshot) { BrinRevmap *revmap; Buffer meta; @@ -77,7 +78,7 @@ brinRevmapInitialize(Relation idxrel, BlockNumber *pagesPerRange) meta = ReadBuffer(idxrel, BRIN_METAPAGE_BLKNO); LockBuffer(meta, BUFFER_LOCK_SHARE); - page = BufferGetPage(meta, NULL, NULL, BGP_NO_SNAPSHOT_TEST); + page = BufferGetPage(meta, snapshot, idxrel, BGP_TEST_FOR_OLD_SNAPSHOT); metadata = (BrinMetaPageData *) PageGetContents(page); revmap = palloc(sizeof(BrinRevmap)); @@ -187,7 +188,8 @@ brinSetHeapBlockItemptr(Buffer buf, BlockNumber pagesPerRange, */ BrinTuple * brinGetTupleForHeapBlock(BrinRevmap *revmap, BlockNumber heapBlk, - Buffer *buf, OffsetNumber *off, Size *size, int mode) + Buffer *buf, OffsetNumber *off, Size *size, int mode, + Snapshot snapshot) { Relation idxRel = revmap->rm_irel; BlockNumber mapBlk; @@ -264,7 +266,8 @@ brinGetTupleForHeapBlock(BrinRevmap *revmap, BlockNumber heapBlk, *buf = ReadBuffer(idxRel, blk); } LockBuffer(*buf, mode); - page = BufferGetPage(*buf, NULL, NULL, BGP_NO_SNAPSHOT_TEST); + page = BufferGetPage(*buf, snapshot, idxRel, + BGP_TEST_FOR_OLD_SNAPSHOT); /* If we land on a revmap page, start over */ if (BRIN_IS_REGULAR_PAGE(page)) diff --git a/src/backend/access/gin/ginbtree.c b/src/backend/access/gin/ginbtree.c index 13258cca0ea..e593b2bbe99 100644 --- a/src/backend/access/gin/ginbtree.c +++ b/src/backend/access/gin/ginbtree.c @@ -71,7 +71,7 @@ ginTraverseLock(Buffer buffer, bool searchMode) * is share-locked, and stack->parent is NULL. 
*/ GinBtreeStack * -ginFindLeafPage(GinBtree btree, bool searchMode) +ginFindLeafPage(GinBtree btree, bool searchMode, Snapshot snapshot) { GinBtreeStack *stack; @@ -89,7 +89,8 @@ ginFindLeafPage(GinBtree btree, bool searchMode) stack->off = InvalidOffsetNumber; - page = BufferGetPage(stack->buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST); + page = BufferGetPage(stack->buffer, snapshot, btree->index, + BGP_TEST_FOR_OLD_SNAPSHOT); access = ginTraverseLock(stack->buffer, searchMode); @@ -115,8 +116,8 @@ ginFindLeafPage(GinBtree btree, bool searchMode) stack->buffer = ginStepRight(stack->buffer, btree->index, access); stack->blkno = rightlink; - page = BufferGetPage(stack->buffer, NULL, NULL, - BGP_NO_SNAPSHOT_TEST); + page = BufferGetPage(stack->buffer, snapshot, btree->index, + BGP_TEST_FOR_OLD_SNAPSHOT); if (!searchMode && GinPageIsIncompleteSplit(page)) ginFinishSplit(btree, stack, false, NULL); diff --git a/src/backend/access/gin/gindatapage.c b/src/backend/access/gin/gindatapage.c index 9c501a1af5f..ed3d9174f65 100644 --- a/src/backend/access/gin/gindatapage.c +++ b/src/backend/access/gin/gindatapage.c @@ -1820,7 +1820,7 @@ ginInsertItemPointers(Relation index, BlockNumber rootBlkno, { /* search for the leaf page where the first item should go to */ btree.itemptr = insertdata.items[insertdata.curitem]; - stack = ginFindLeafPage(&btree, false); + stack = ginFindLeafPage(&btree, false, NULL); ginInsertValue(&btree, stack, &insertdata, buildStats); } @@ -1830,7 +1830,8 @@ ginInsertItemPointers(Relation index, BlockNumber rootBlkno, * Starts a new scan on a posting tree. 
*/ GinBtreeStack * -ginScanBeginPostingTree(GinBtree btree, Relation index, BlockNumber rootBlkno) +ginScanBeginPostingTree(GinBtree btree, Relation index, BlockNumber rootBlkno, + Snapshot snapshot) { GinBtreeStack *stack; @@ -1838,7 +1839,7 @@ ginScanBeginPostingTree(GinBtree btree, Relation index, BlockNumber rootBlkno) btree->fullScan = TRUE; - stack = ginFindLeafPage(btree, TRUE); + stack = ginFindLeafPage(btree, TRUE, snapshot); return stack; } diff --git a/src/backend/access/gin/ginget.c b/src/backend/access/gin/ginget.c index 33683278e10..b79ba1e62af 100644 --- a/src/backend/access/gin/ginget.c +++ b/src/backend/access/gin/ginget.c @@ -73,7 +73,7 @@ scanPostingTree(Relation index, GinScanEntry scanEntry, Page page; /* Descend to the leftmost leaf page */ - stack = ginScanBeginPostingTree(&btree, index, rootPostingTree); + stack = ginScanBeginPostingTree(&btree, index, rootPostingTree, snapshot); buffer = stack->buffer; IncrBufferRefCount(buffer); /* prevent unpin in freeGinBtreeStack */ @@ -146,7 +146,8 @@ collectMatchBitmap(GinBtreeData *btree, GinBtreeStack *stack, if (moveRightIfItNeeded(btree, stack) == false) return true; - page = BufferGetPage(stack->buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST); + page = BufferGetPage(stack->buffer, snapshot, btree->index, + BGP_TEST_FOR_OLD_SNAPSHOT); itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, stack->off)); /* @@ -320,7 +321,7 @@ restartScanEntry: ginPrepareEntryScan(&btreeEntry, entry->attnum, entry->queryKey, entry->queryCategory, ginstate); - stackEntry = ginFindLeafPage(&btreeEntry, true); + stackEntry = ginFindLeafPage(&btreeEntry, true, snapshot); page = BufferGetPage(stackEntry->buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST); needUnlock = TRUE; @@ -385,7 +386,7 @@ restartScanEntry: needUnlock = FALSE; stack = ginScanBeginPostingTree(&entry->btree, ginstate->index, - rootPostingTree); + rootPostingTree, snapshot); entry->buffer = stack->buffer; /* @@ -627,7 +628,7 @@ entryLoadMoreItems(GinState 
*ginstate, GinScanEntry entry, entry->btree.itemptr.ip_posid++; } entry->btree.fullScan = false; - stack = ginFindLeafPage(&entry->btree, true); + stack = ginFindLeafPage(&entry->btree, true, snapshot); /* we don't need the stack, just the buffer. */ entry->buffer = stack->buffer; @@ -1335,8 +1336,8 @@ scanGetCandidate(IndexScanDesc scan, pendingPosition *pos) ItemPointerSetInvalid(&pos->item); for (;;) { - page = BufferGetPage(pos->pendingBuffer, NULL, - NULL, BGP_NO_SNAPSHOT_TEST); + page = BufferGetPage(pos->pendingBuffer, scan->xs_snapshot, + scan->indexRelation, BGP_TEST_FOR_OLD_SNAPSHOT); maxoff = PageGetMaxOffsetNumber(page); if (pos->firstOffset > maxoff) @@ -1516,8 +1517,8 @@ collectMatchesForHeapRow(IndexScanDesc scan, pendingPosition *pos) memset(datumExtracted + pos->firstOffset - 1, 0, sizeof(bool) * (pos->lastOffset - pos->firstOffset)); - page = BufferGetPage(pos->pendingBuffer, NULL, - NULL, BGP_NO_SNAPSHOT_TEST); + page = BufferGetPage(pos->pendingBuffer, scan->xs_snapshot, + scan->indexRelation, BGP_TEST_FOR_OLD_SNAPSHOT); for (i = 0; i < so->nkeys; i++) { @@ -1710,7 +1711,8 @@ scanPendingInsert(IndexScanDesc scan, TIDBitmap *tbm, int64 *ntids) *ntids = 0; LockBuffer(metabuffer, GIN_SHARE); - page = BufferGetPage(metabuffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST); + page = BufferGetPage(metabuffer, scan->xs_snapshot, scan->indexRelation, + BGP_TEST_FOR_OLD_SNAPSHOT); blkno = GinPageGetMeta(page)->head; /* diff --git a/src/backend/access/gin/gininsert.c b/src/backend/access/gin/gininsert.c index 126501149d2..d4bfed06bcc 100644 --- a/src/backend/access/gin/gininsert.c +++ b/src/backend/access/gin/gininsert.c @@ -192,7 +192,7 @@ ginEntryInsert(GinState *ginstate, ginPrepareEntryScan(&btree, attnum, key, category, ginstate); - stack = ginFindLeafPage(&btree, false); + stack = ginFindLeafPage(&btree, false, NULL); page = BufferGetPage(stack->buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST); if (btree.findItem(&btree, stack)) diff --git 
a/src/backend/access/gist/gistget.c b/src/backend/access/gist/gistget.c index 13a039947ba..24af868466d 100644 --- a/src/backend/access/gist/gistget.c +++ b/src/backend/access/gist/gistget.c @@ -336,7 +336,7 @@ gistScanPage(IndexScanDesc scan, GISTSearchItem *pageItem, double *myDistances, buffer = ReadBuffer(scan->indexRelation, pageItem->blkno); LockBuffer(buffer, GIST_SHARE); gistcheckpage(scan->indexRelation, buffer); - page = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST); + page = BufferGetPage(buffer, scan->xs_snapshot, r, BGP_TEST_FOR_OLD_SNAPSHOT); opaque = GistPageGetOpaque(page); /* diff --git a/src/backend/access/hash/hash.c b/src/backend/access/hash/hash.c index a5032e1251d..03cd0b006c3 100644 --- a/src/backend/access/hash/hash.c +++ b/src/backend/access/hash/hash.c @@ -278,7 +278,8 @@ hashgettuple(IndexScanDesc scan, ScanDirection dir) buf = so->hashso_curbuf; Assert(BufferIsValid(buf)); - page = BufferGetPage(buf, NULL, NULL, BGP_NO_SNAPSHOT_TEST); + page = BufferGetPage(buf, scan->xs_snapshot, rel, + BGP_TEST_FOR_OLD_SNAPSHOT); maxoffnum = PageGetMaxOffsetNumber(page); for (offnum = ItemPointerGetOffsetNumber(current); offnum <= maxoffnum; diff --git a/src/backend/access/hash/hashsearch.c b/src/backend/access/hash/hashsearch.c index dd1f464e53a..4c14362c6fe 100644 --- a/src/backend/access/hash/hashsearch.c +++ b/src/backend/access/hash/hashsearch.c @@ -188,8 +188,8 @@ _hash_first(IndexScanDesc scan, ScanDirection dir) /* Read the metapage */ metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ, LH_META_PAGE); - page = BufferGetPage(metabuf, NULL, NULL, - BGP_NO_SNAPSHOT_TEST); + page = BufferGetPage(metabuf, scan->xs_snapshot, rel, + BGP_TEST_FOR_OLD_SNAPSHOT); metap = HashPageGetMeta(page); /* @@ -242,8 +242,8 @@ _hash_first(IndexScanDesc scan, ScanDirection dir) /* Fetch the primary bucket page for the bucket */ buf = _hash_getbuf(rel, blkno, HASH_READ, LH_BUCKET_PAGE); - page = BufferGetPage(buf, NULL, NULL, - BGP_NO_SNAPSHOT_TEST); + 
page = BufferGetPage(buf, scan->xs_snapshot, rel, + BGP_TEST_FOR_OLD_SNAPSHOT); opaque = (HashPageOpaque) PageGetSpecialPointer(page); Assert(opaque->hasho_bucket == bucket); @@ -350,6 +350,7 @@ _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir) _hash_readnext(rel, &buf, &page, &opaque); if (BufferIsValid(buf)) { + TestForOldSnapshot(scan->xs_snapshot, rel, page); maxoff = PageGetMaxOffsetNumber(page); offnum = _hash_binsearch(page, so->hashso_sk_hash); } @@ -391,6 +392,7 @@ _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir) _hash_readprev(rel, &buf, &page, &opaque); if (BufferIsValid(buf)) { + TestForOldSnapshot(scan->xs_snapshot, rel, page); maxoff = PageGetMaxOffsetNumber(page); offnum = _hash_binsearch_last(page, so->hashso_sk_hash); } diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 66b23540fe2..29fd31a819d 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -394,7 +394,8 @@ heapgetpage(HeapScanDesc scan, BlockNumber page) */ LockBuffer(buffer, BUFFER_LOCK_SHARE); - dp = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST); + dp = BufferGetPage(buffer, snapshot, scan->rs_rd, + BGP_TEST_FOR_OLD_SNAPSHOT); lines = PageGetMaxOffsetNumber(dp); ntup = 0; @@ -537,7 +538,7 @@ heapgettup(HeapScanDesc scan, LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE); - dp = BufferGetPage(scan->rs_cbuf, NULL, NULL, BGP_NO_SNAPSHOT_TEST); + dp = BufferGetPage(scan->rs_cbuf, snapshot, scan->rs_rd, BGP_TEST_FOR_OLD_SNAPSHOT); lines = PageGetMaxOffsetNumber(dp); /* page and lineoff now reference the physically next tid */ @@ -582,7 +583,8 @@ heapgettup(HeapScanDesc scan, LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE); - dp = BufferGetPage(scan->rs_cbuf, NULL, NULL, BGP_NO_SNAPSHOT_TEST); + dp = BufferGetPage(scan->rs_cbuf, snapshot, scan->rs_rd, + BGP_TEST_FOR_OLD_SNAPSHOT); lines = PageGetMaxOffsetNumber(dp); if (!scan->rs_inited) @@ -616,7 +618,8 @@ heapgettup(HeapScanDesc scan, 
heapgetpage(scan, page); /* Since the tuple was previously fetched, needn't lock page here */ - dp = BufferGetPage(scan->rs_cbuf, NULL, NULL, BGP_NO_SNAPSHOT_TEST); + dp = BufferGetPage(scan->rs_cbuf, snapshot, scan->rs_rd, + BGP_TEST_FOR_OLD_SNAPSHOT); lineoff = ItemPointerGetOffsetNumber(&(tuple->t_self)); lpp = PageGetItemId(dp, lineoff); Assert(ItemIdIsNormal(lpp)); @@ -745,7 +748,8 @@ heapgettup(HeapScanDesc scan, LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE); - dp = BufferGetPage(scan->rs_cbuf, NULL, NULL, BGP_NO_SNAPSHOT_TEST); + dp = BufferGetPage(scan->rs_cbuf, snapshot, scan->rs_rd, + BGP_TEST_FOR_OLD_SNAPSHOT); lines = PageGetMaxOffsetNumber((Page) dp); linesleft = lines; if (backward) @@ -832,7 +836,8 @@ heapgettup_pagemode(HeapScanDesc scan, lineindex = scan->rs_cindex + 1; } - dp = BufferGetPage(scan->rs_cbuf, NULL, NULL, BGP_NO_SNAPSHOT_TEST); + dp = BufferGetPage(scan->rs_cbuf, scan->rs_snapshot, scan->rs_rd, + BGP_TEST_FOR_OLD_SNAPSHOT); lines = scan->rs_ntuples; /* page and lineindex now reference the next visible tid */ @@ -875,7 +880,8 @@ heapgettup_pagemode(HeapScanDesc scan, page = scan->rs_cblock; /* current page */ } - dp = BufferGetPage(scan->rs_cbuf, NULL, NULL, BGP_NO_SNAPSHOT_TEST); + dp = BufferGetPage(scan->rs_cbuf, scan->rs_snapshot, scan->rs_rd, + BGP_TEST_FOR_OLD_SNAPSHOT); lines = scan->rs_ntuples; if (!scan->rs_inited) @@ -908,7 +914,8 @@ heapgettup_pagemode(HeapScanDesc scan, heapgetpage(scan, page); /* Since the tuple was previously fetched, needn't lock page here */ - dp = BufferGetPage(scan->rs_cbuf, NULL, NULL, BGP_NO_SNAPSHOT_TEST); + dp = BufferGetPage(scan->rs_cbuf, scan->rs_snapshot, scan->rs_rd, + BGP_TEST_FOR_OLD_SNAPSHOT); lineoff = ItemPointerGetOffsetNumber(&(tuple->t_self)); lpp = PageGetItemId(dp, lineoff); Assert(ItemIdIsNormal(lpp)); @@ -1027,7 +1034,8 @@ heapgettup_pagemode(HeapScanDesc scan, heapgetpage(scan, page); - dp = BufferGetPage(scan->rs_cbuf, NULL, NULL, BGP_NO_SNAPSHOT_TEST); + dp = 
BufferGetPage(scan->rs_cbuf, scan->rs_snapshot, scan->rs_rd, + BGP_TEST_FOR_OLD_SNAPSHOT); lines = scan->rs_ntuples; linesleft = lines; if (backward) @@ -1871,7 +1879,7 @@ heap_fetch(Relation relation, * Need share lock on buffer to examine tuple commit status. */ LockBuffer(buffer, BUFFER_LOCK_SHARE); - page = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST); + page = BufferGetPage(buffer, snapshot, relation, BGP_TEST_FOR_OLD_SNAPSHOT); /* * We'd better check for out-of-range offnum in case of VACUUM since the @@ -2200,7 +2208,8 @@ heap_get_latest_tid(Relation relation, */ buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(&ctid)); LockBuffer(buffer, BUFFER_LOCK_SHARE); - page = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST); + page = BufferGetPage(buffer, snapshot, relation, + BGP_TEST_FOR_OLD_SNAPSHOT); /* * Check for bogus item number. This is not treated as an error diff --git a/src/backend/access/heap/pruneheap.c b/src/backend/access/heap/pruneheap.c index 19201b0bca5..ce073ccdc23 100644 --- a/src/backend/access/heap/pruneheap.c +++ b/src/backend/access/heap/pruneheap.c @@ -92,12 +92,21 @@ heap_page_prune_opt(Relation relation, Buffer buffer) * need to use the horizon that includes slots, otherwise the data-only * horizon can be used. Note that the toast relation of user defined * relations are *not* considered catalog relations. + * + * It is OK to apply the old snapshot limit before acquiring the cleanup + * lock because the worst that can happen is that we are not quite as + * aggressive about the cleanup (by however many transaction IDs are + * consumed between this point and acquiring the lock). This allows us to + * save significant overhead in the case where the page is found not to be + * prunable. 
*/ if (IsCatalogRelation(relation) || RelationIsAccessibleInLogicalDecoding(relation)) OldestXmin = RecentGlobalXmin; else - OldestXmin = RecentGlobalDataXmin; + OldestXmin = + TransactionIdLimitedForOldSnapshots(RecentGlobalDataXmin, + relation); Assert(TransactionIdIsValid(OldestXmin)); diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c index bf7a8175517..3796656e177 100644 --- a/src/backend/access/nbtree/nbtinsert.c +++ b/src/backend/access/nbtree/nbtinsert.c @@ -119,7 +119,7 @@ _bt_doinsert(Relation rel, IndexTuple itup, top: /* find the first page containing this key */ - stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE); + stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE, NULL); offset = InvalidOffsetNumber; @@ -135,7 +135,7 @@ top: * precise description. */ buf = _bt_moveright(rel, buf, natts, itup_scankey, false, - true, stack, BT_WRITE); + true, stack, BT_WRITE, NULL); /* * If we're not allowing duplicates, make sure the key isn't already in @@ -1682,7 +1682,8 @@ _bt_insert_parent(Relation rel, elog(DEBUG2, "concurrent ROOT page split"); lpageop = (BTPageOpaque) PageGetSpecialPointer(page); /* Find the leftmost page at the next level up */ - pbuf = _bt_get_endpoint(rel, lpageop->btpo.level + 1, false); + pbuf = _bt_get_endpoint(rel, lpageop->btpo.level + 1, false, + NULL); /* Set up a phony stack entry pointing there */ stack = &fakestack; stack->bts_blkno = BufferGetBlockNumber(pbuf); diff --git a/src/backend/access/nbtree/nbtpage.c b/src/backend/access/nbtree/nbtpage.c index 36b18047615..9ba61d5fe13 100644 --- a/src/backend/access/nbtree/nbtpage.c +++ b/src/backend/access/nbtree/nbtpage.c @@ -1255,7 +1255,7 @@ _bt_pagedel(Relation rel, Buffer buf) itup_scankey = _bt_mkscankey(rel, targetkey); /* find the leftmost leaf page containing this key */ stack = _bt_search(rel, rel->rd_rel->relnatts, itup_scankey, - false, &lbuf, BT_READ); + false, &lbuf, BT_READ, NULL); /* don't need a 
pin on the page */ _bt_relbuf(rel, lbuf); diff --git a/src/backend/access/nbtree/nbtsearch.c b/src/backend/access/nbtree/nbtsearch.c index 83f790f7917..470bab0c521 100644 --- a/src/backend/access/nbtree/nbtsearch.c +++ b/src/backend/access/nbtree/nbtsearch.c @@ -79,6 +79,10 @@ _bt_drop_lock_and_maybe_pin(IndexScanDesc scan, BTScanPos sp) * address of the leaf-page buffer, which is read-locked and pinned. * No locks are held on the parent pages, however! * + * If the snapshot parameter is not NULL, "old snapshot" checking will take + * place during the descent through the tree. This is not needed when + * positioning for an insert or delete, so NULL is used for those cases. + * * NOTE that the returned buffer is read-locked regardless of the access * parameter. However, access = BT_WRITE will allow an empty root page * to be created and returned. When access = BT_READ, an empty index @@ -87,7 +91,7 @@ _bt_drop_lock_and_maybe_pin(IndexScanDesc scan, BTScanPos sp) */ BTStack _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey, - Buffer *bufP, int access) + Buffer *bufP, int access, Snapshot snapshot) { BTStack stack_in = NULL; @@ -126,7 +130,7 @@ _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey, */ *bufP = _bt_moveright(rel, *bufP, keysz, scankey, nextkey, (access == BT_WRITE), stack_in, - BT_READ); + BT_READ, snapshot); /* if this is a leaf page, we're done */ page = BufferGetPage(*bufP, NULL, NULL, BGP_NO_SNAPSHOT_TEST); @@ -199,6 +203,10 @@ _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey, * On entry, we have the buffer pinned and a lock of the type specified by * 'access'. If we move right, we release the buffer and lock and acquire * the same on the right sibling. Return value is the buffer we stop at. + * + * If the snapshot parameter is not NULL, "old snapshot" checking will take + * place during the descent through the tree. 
This is not needed when + * positioning for an insert or delete, so NULL is used for those cases. */ Buffer _bt_moveright(Relation rel, @@ -208,7 +216,8 @@ _bt_moveright(Relation rel, bool nextkey, bool forupdate, BTStack stack, - int access) + int access, + Snapshot snapshot) { Page page; BTPageOpaque opaque; @@ -233,7 +242,7 @@ _bt_moveright(Relation rel, for (;;) { - page = BufferGetPage(buf, NULL, NULL, BGP_NO_SNAPSHOT_TEST); + page = BufferGetPage(buf, snapshot, rel, BGP_TEST_FOR_OLD_SNAPSHOT); opaque = (BTPageOpaque) PageGetSpecialPointer(page); if (P_RIGHTMOST(opaque)) @@ -972,7 +981,8 @@ _bt_first(IndexScanDesc scan, ScanDirection dir) * Use the manufactured insertion scan key to descend the tree and * position ourselves on the target leaf page. */ - stack = _bt_search(rel, keysCount, scankeys, nextkey, &buf, BT_READ); + stack = _bt_search(rel, keysCount, scankeys, nextkey, &buf, BT_READ, + scan->xs_snapshot); /* don't need to keep the stack around... */ _bt_freestack(stack); @@ -1337,8 +1347,8 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir) /* step right one page */ so->currPos.buf = _bt_getbuf(rel, blkno, BT_READ); /* check for deleted page */ - page = BufferGetPage(so->currPos.buf, NULL, NULL, - BGP_NO_SNAPSHOT_TEST); + page = BufferGetPage(so->currPos.buf, scan->xs_snapshot, rel, + BGP_TEST_FOR_OLD_SNAPSHOT); opaque = (BTPageOpaque) PageGetSpecialPointer(page); if (!P_IGNORE(opaque)) { @@ -1412,8 +1422,8 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir) * it's not half-dead and contains matching tuples. Else loop back * and do it all again. 
*/ - page = BufferGetPage(so->currPos.buf, NULL, NULL, - BGP_NO_SNAPSHOT_TEST); + page = BufferGetPage(so->currPos.buf, scan->xs_snapshot, rel, + BGP_TEST_FOR_OLD_SNAPSHOT); opaque = (BTPageOpaque) PageGetSpecialPointer(page); if (!P_IGNORE(opaque)) { @@ -1476,7 +1486,7 @@ _bt_walk_left(Relation rel, Buffer buf, Snapshot snapshot) /* check for interrupts while we're not holding any buffer lock */ CHECK_FOR_INTERRUPTS(); buf = _bt_getbuf(rel, blkno, BT_READ); - page = BufferGetPage(buf, NULL, NULL, BGP_NO_SNAPSHOT_TEST); + page = BufferGetPage(buf, snapshot, rel, BGP_TEST_FOR_OLD_SNAPSHOT); opaque = (BTPageOpaque) PageGetSpecialPointer(page); /* @@ -1502,14 +1512,14 @@ _bt_walk_left(Relation rel, Buffer buf, Snapshot snapshot) break; blkno = opaque->btpo_next; buf = _bt_relandgetbuf(rel, buf, blkno, BT_READ); - page = BufferGetPage(buf, NULL, NULL, - BGP_NO_SNAPSHOT_TEST); + page = BufferGetPage(buf, snapshot, rel, + BGP_TEST_FOR_OLD_SNAPSHOT); opaque = (BTPageOpaque) PageGetSpecialPointer(page); } /* Return to the original page to see what's up */ buf = _bt_relandgetbuf(rel, buf, obknum, BT_READ); - page = BufferGetPage(buf, NULL, NULL, BGP_NO_SNAPSHOT_TEST); + page = BufferGetPage(buf, snapshot, rel, BGP_TEST_FOR_OLD_SNAPSHOT); opaque = (BTPageOpaque) PageGetSpecialPointer(page); if (P_ISDELETED(opaque)) { @@ -1526,8 +1536,8 @@ _bt_walk_left(Relation rel, Buffer buf, Snapshot snapshot) RelationGetRelationName(rel)); blkno = opaque->btpo_next; buf = _bt_relandgetbuf(rel, buf, blkno, BT_READ); - page = BufferGetPage(buf, NULL, NULL, - BGP_NO_SNAPSHOT_TEST); + page = BufferGetPage(buf, snapshot, rel, + BGP_TEST_FOR_OLD_SNAPSHOT); opaque = (BTPageOpaque) PageGetSpecialPointer(page); if (!P_ISDELETED(opaque)) break; @@ -1564,7 +1574,8 @@ _bt_walk_left(Relation rel, Buffer buf, Snapshot snapshot) * The returned buffer is pinned and read-locked. 
*/ Buffer -_bt_get_endpoint(Relation rel, uint32 level, bool rightmost) +_bt_get_endpoint(Relation rel, uint32 level, bool rightmost, + Snapshot snapshot) { Buffer buf; Page page; @@ -1586,7 +1597,7 @@ _bt_get_endpoint(Relation rel, uint32 level, bool rightmost) if (!BufferIsValid(buf)) return InvalidBuffer; - page = BufferGetPage(buf, NULL, NULL, BGP_NO_SNAPSHOT_TEST); + page = BufferGetPage(buf, snapshot, rel, BGP_TEST_FOR_OLD_SNAPSHOT); opaque = (BTPageOpaque) PageGetSpecialPointer(page); for (;;) @@ -1605,8 +1616,8 @@ _bt_get_endpoint(Relation rel, uint32 level, bool rightmost) elog(ERROR, "fell off the end of index \"%s\"", RelationGetRelationName(rel)); buf = _bt_relandgetbuf(rel, buf, blkno, BT_READ); - page = BufferGetPage(buf, NULL, NULL, - BGP_NO_SNAPSHOT_TEST); + page = BufferGetPage(buf, snapshot, rel, + BGP_TEST_FOR_OLD_SNAPSHOT); opaque = (BTPageOpaque) PageGetSpecialPointer(page); } @@ -1659,7 +1670,7 @@ _bt_endpoint(IndexScanDesc scan, ScanDirection dir) * version of _bt_search(). We don't maintain a stack since we know we * won't need it. */ - buf = _bt_get_endpoint(rel, 0, ScanDirectionIsBackward(dir)); + buf = _bt_get_endpoint(rel, 0, ScanDirectionIsBackward(dir), scan->xs_snapshot); if (!BufferIsValid(buf)) { diff --git a/src/backend/access/spgist/spgscan.c b/src/backend/access/spgist/spgscan.c index fafdca31f39..7acd71a2911 100644 --- a/src/backend/access/spgist/spgscan.c +++ b/src/backend/access/spgist/spgscan.c @@ -341,7 +341,7 @@ redirect: } /* else new pointer points to the same page, no work needed */ - page = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST); + page = BufferGetPage(buffer, snapshot, index, BGP_TEST_FOR_OLD_SNAPSHOT); isnull = SpGistPageStoresNulls(page) ? 
true : false; diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c index 4cb4acf33a5..93361a0c99c 100644 --- a/src/backend/commands/vacuum.c +++ b/src/backend/commands/vacuum.c @@ -489,7 +489,8 @@ vacuum_set_xid_limits(Relation rel, * working on a particular table at any time, and that each vacuum is * always an independent transaction. */ - *oldestXmin = GetOldestXmin(rel, true); + *oldestXmin = + TransactionIdLimitedForOldSnapshots(GetOldestXmin(rel, true), rel); Assert(TransactionIdIsNormal(*oldestXmin)); diff --git a/src/backend/commands/vacuumlazy.c b/src/backend/commands/vacuumlazy.c index 3f48ef40701..d0e92b33658 100644 --- a/src/backend/commands/vacuumlazy.c +++ b/src/backend/commands/vacuumlazy.c @@ -1660,7 +1660,8 @@ should_attempt_truncation(LVRelStats *vacrelstats) possibly_freeable = vacrelstats->rel_pages - vacrelstats->nonempty_pages; if (possibly_freeable > 0 && (possibly_freeable >= REL_TRUNCATE_MINIMUM || - possibly_freeable >= vacrelstats->rel_pages / REL_TRUNCATE_FRACTION)) + possibly_freeable >= vacrelstats->rel_pages / REL_TRUNCATE_FRACTION) && + old_snapshot_threshold < 0) return true; else return false; diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 9874c3eaa04..c664984d0a1 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -4114,3 +4114,43 @@ IssuePendingWritebacks(WritebackContext *context) context->nr_pending = 0; } + + +/* + * Check whether the given snapshot is too old to have safely read the given + * page from the given table. If so, throw a "snapshot too old" error. + * + * This test generally needs to be performed after every BufferGetPage() call + * that is executed as part of a scan. It is not needed for calls made for + * modifying the page (for example, to position to the right place to insert a + * new index tuple or for vacuuming). 
To minimize errors of omission, the + * BufferGetPage() macro accepts parameters to specify whether the test should + * be run, and supply the necessary snapshot and relation parameters. See the + * declaration of BufferGetPage() for more details. + * + * Note that a NULL snapshot argument is allowed and causes a fast return + * without error; this is to support call sites which can be called from + * either scans or index modification areas. + * + * For best performance, keep the tests that are fastest and/or most likely to + * exclude a page from old snapshot testing near the front. + */ +extern Page +TestForOldSnapshot(Snapshot snapshot, Relation relation, Page page) +{ + Assert(relation != NULL); + + if (old_snapshot_threshold >= 0 + && (snapshot) != NULL + && (snapshot)->satisfies == HeapTupleSatisfiesMVCC + && !XLogRecPtrIsInvalid((snapshot)->lsn) + && PageGetLSN(page) > (snapshot)->lsn + && !IsCatalogRelation(relation) + && !RelationIsAccessibleInLogicalDecoding(relation) + && (snapshot)->whenTaken < GetOldSnapshotThresholdTimestamp()) + ereport(ERROR, + (errcode(ERRCODE_SNAPSHOT_TOO_OLD), + errmsg("snapshot too old"))); + + return page; +} diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index 36a04fc5708..c04b17fa8ea 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -43,6 +43,7 @@ #include "storage/procsignal.h" #include "storage/sinvaladt.h" #include "storage/spin.h" +#include "utils/snapmgr.h" shmem_startup_hook_type shmem_startup_hook = NULL; @@ -136,6 +137,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) size = add_size(size, ReplicationOriginShmemSize()); size = add_size(size, WalSndShmemSize()); size = add_size(size, WalRcvShmemSize()); + size = add_size(size, SnapMgrShmemSize()); size = add_size(size, BTreeShmemSize()); size = add_size(size, SyncScanShmemSize()); size = add_size(size, AsyncShmemSize()); @@ -247,6 +249,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, 
int port) /* * Set up other modules that need some shared memory space */ + SnapMgrInit(); BTreeShmemInit(); SyncScanShmemInit(); AsyncShmemInit(); diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index 01cfa9d5f90..5bc9fd6595e 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -1759,6 +1759,15 @@ GetSnapshotData(Snapshot snapshot) snapshot->regd_count = 0; snapshot->copied = false; + /* + * Capture the current time and WAL stream location in case this snapshot + * becomes old enough to need to fall back on the special "old snapshot" + * logic. + */ + snapshot->lsn = GetXLogInsertRecPtr(); + snapshot->whenTaken = GetSnapshotCurrentTimestamp(); + MaintainOldSnapshotTimeMapping(snapshot->whenTaken, xmin); + return snapshot; } diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt index c557cb68d0b..f8996cd21a5 100644 --- a/src/backend/storage/lmgr/lwlocknames.txt +++ b/src/backend/storage/lmgr/lwlocknames.txt @@ -46,3 +46,4 @@ CommitTsControlLock 38 CommitTsLock 39 ReplicationOriginLock 40 MultiXactTruncationLock 41 +OldSnapshotTimeMapLock 42 diff --git a/src/backend/utils/errcodes.txt b/src/backend/utils/errcodes.txt index 49494f9cd31..be924d58bd5 100644 --- a/src/backend/utils/errcodes.txt +++ b/src/backend/utils/errcodes.txt @@ -417,6 +417,10 @@ Section: Class 58 - System Error (errors external to PostgreSQL itself) 58P01 E ERRCODE_UNDEFINED_FILE undefined_file 58P02 E ERRCODE_DUPLICATE_FILE duplicate_file +Section: Class 72 - Snapshot Failure +# (class borrowed from Oracle) +72000 E ERRCODE_SNAPSHOT_TOO_OLD snapshot_too_old + Section: Class F0 - Configuration File Error # (PostgreSQL-specific error class) diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index f7ed167d7f8..fb091bc4a0b 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -2678,6 +2678,17 @@ static struct config_int 
ConfigureNamesInt[] = }, { + {"old_snapshot_threshold", PGC_POSTMASTER, RESOURCES_ASYNCHRONOUS, + gettext_noop("Time before a snapshot is too old to read pages changed after the snapshot was taken."), + gettext_noop("A value of -1 disables this feature."), + GUC_UNIT_MIN + }, + &old_snapshot_threshold, + -1, -1, MINS_PER_HOUR * HOURS_PER_DAY * 60, + NULL, NULL, NULL + }, + + { {"tcp_keepalives_idle", PGC_USERSET, CLIENT_CONN_OTHER, gettext_noop("Time between issuing TCP keepalives."), gettext_noop("A value of 0 uses the system default."), diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index bcc86e29d27..d4dd285ef0a 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -166,6 +166,8 @@ #effective_io_concurrency = 1 # 1-1000; 0 disables prefetching #max_worker_processes = 8 #max_parallel_degree = 0 # max number of worker processes per node +#old_snapshot_threshold = -1 # 1min-60d; -1 disables; 0 is immediate + # (change requires restart) #------------------------------------------------------------------------------ diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c index b88e0120041..19504c35987 100644 --- a/src/backend/utils/time/snapmgr.c +++ b/src/backend/utils/time/snapmgr.c @@ -46,14 +46,18 @@ #include "access/transam.h" #include "access/xact.h" +#include "access/xlog.h" +#include "catalog/catalog.h" #include "lib/pairingheap.h" #include "miscadmin.h" #include "storage/predicate.h" #include "storage/proc.h" #include "storage/procarray.h" #include "storage/sinval.h" +#include "storage/spin.h" #include "utils/builtins.h" #include "utils/memutils.h" +#include "utils/rel.h" #include "utils/resowner_private.h" #include "utils/snapmgr.h" #include "utils/syscache.h" @@ -61,6 +65,64 @@ /* + * GUC parameters + */ +int old_snapshot_threshold; /* number of minutes, -1 disables */ + +/* + * Structure for dealing with 
old_snapshot_threshold implementation. + */ +typedef struct OldSnapshotControlData +{ + /* + * Variables for old snapshot handling are shared among processes and are + * only allowed to move forward. + */ + slock_t mutex_current; /* protect current timestamp */ + int64 current_timestamp; /* latest snapshot timestamp */ + slock_t mutex_latest_xmin; /* protect latest snapshot xmin */ + TransactionId latest_xmin; /* latest snapshot xmin */ + slock_t mutex_threshold; /* protect threshold fields */ + int64 threshold_timestamp; /* earlier snapshot is old */ + TransactionId threshold_xid; /* earlier xid may be gone */ + + /* + * Keep one xid per minute for old snapshot error handling. + * + * Use a circular buffer with a head offset, a count of entries currently + * used, and a timestamp corresponding to the xid at the head offset. A + * count_used value of zero means that there are no times stored; a + * count_used value of old_snapshot_threshold means that the buffer is + * full and the head must be advanced to add new entries. Use timestamps + * aligned to minute boundaries, since that seems less surprising than + * aligning based on the first usage timestamp. + * + * It is OK if the xid for a given time slot is from earlier than + * calculated by adding the number of minutes corresponding to the + * (possibly wrapped) distance from the head offset to the time of the + * head entry, since that just results in the vacuuming of old tuples + * being slightly less aggressive. It would not be OK for it to be off in + * the other direction, since it might result in vacuuming tuples that are + * still expected to be there. + * + * Use of an SLRU was considered but not chosen because it is more + * heavyweight than is needed for this, and would probably not be any less + * code to implement. + * + * Persistence is not needed. 
+ */ + int head_offset; /* subscript of oldest tracked time */ + int64 head_timestamp; /* time corresponding to head xid */ + int count_used; /* how many slots are in use */ + TransactionId xid_by_minute[FLEXIBLE_ARRAY_MEMBER]; +} OldSnapshotControlData; + +typedef struct OldSnapshotControlData *OldSnapshotControl; + +static volatile OldSnapshotControl oldSnapshotControl; + + +/* * CurrentSnapshot points to the only snapshot taken in transaction-snapshot * mode, and to the latest one taken in a read-committed transaction. * SecondarySnapshot is a snapshot that's always up-to-date as of the current @@ -153,6 +215,7 @@ static Snapshot FirstXactSnapshot = NULL; static List *exportedSnapshots = NIL; /* Prototypes for local functions */ +static int64 AlignTimestampToMinuteBoundary(int64 ts); static Snapshot CopySnapshot(Snapshot snapshot); static void FreeSnapshot(Snapshot snapshot); static void SnapshotResetXmin(void); @@ -174,6 +237,49 @@ typedef struct SerializedSnapshotData CommandId curcid; } SerializedSnapshotData; +Size +SnapMgrShmemSize(void) +{ + Size size; + + size = offsetof(OldSnapshotControlData, xid_by_minute); + if (old_snapshot_threshold > 0) + size = add_size(size, mul_size(sizeof(TransactionId), + old_snapshot_threshold)); + + return size; +} + +/* + * Initialize for managing old snapshot detection. + */ +void +SnapMgrInit(void) +{ + bool found; + + /* + * Create or attach to the OldSnapshotControl structure. 
+ */ + oldSnapshotControl = (OldSnapshotControl) + ShmemInitStruct("OldSnapshotControlData", + SnapMgrShmemSize(), &found); + + if (!found) + { + SpinLockInit(&oldSnapshotControl->mutex_current); + oldSnapshotControl->current_timestamp = 0; + SpinLockInit(&oldSnapshotControl->mutex_latest_xmin); + oldSnapshotControl->latest_xmin = InvalidTransactionId; + SpinLockInit(&oldSnapshotControl->mutex_threshold); + oldSnapshotControl->threshold_timestamp = 0; + oldSnapshotControl->threshold_xid = InvalidTransactionId; + oldSnapshotControl->head_offset = 0; + oldSnapshotControl->head_timestamp = 0; + oldSnapshotControl->count_used = 0; + } +} + /* * GetTransactionSnapshot * Get the appropriate snapshot for a new query in a transaction. @@ -1405,6 +1511,304 @@ ThereAreNoPriorRegisteredSnapshots(void) return false; } + +/* + * Return an int64 timestamp which is exactly on a minute boundary. + * + * If the argument is already aligned, return that value, otherwise move to + * the next minute boundary following the given time. + */ +static int64 +AlignTimestampToMinuteBoundary(int64 ts) +{ + int64 retval = ts + (USECS_PER_MINUTE - 1); + + return retval - (retval % USECS_PER_MINUTE); +} + +/* + * Get current timestamp for snapshots as int64 that never moves backward. + */ +int64 +GetSnapshotCurrentTimestamp(void) +{ + int64 now = GetCurrentIntegerTimestamp(); + + /* + * Don't let time move backward; if it hasn't advanced, use the old value. + */ + SpinLockAcquire(&oldSnapshotControl->mutex_current); + if (now <= oldSnapshotControl->current_timestamp) + now = oldSnapshotControl->current_timestamp; + else + oldSnapshotControl->current_timestamp = now; + SpinLockRelease(&oldSnapshotControl->mutex_current); + + return now; +} + +/* + * Get timestamp through which vacuum may have processed based on last stored + * value for threshold_timestamp. 
+ * + * XXX: So far, we never trust that a 64-bit value can be read atomically; if + * that ever changes, we could get rid of the spinlock here. + */ +int64 +GetOldSnapshotThresholdTimestamp(void) +{ + int64 threshold_timestamp; + + SpinLockAcquire(&oldSnapshotControl->mutex_threshold); + threshold_timestamp = oldSnapshotControl->threshold_timestamp; + SpinLockRelease(&oldSnapshotControl->mutex_threshold); + + return threshold_timestamp; +} + +static void +SetOldSnapshotThresholdTimestamp(int64 ts, TransactionId xlimit) +{ + SpinLockAcquire(&oldSnapshotControl->mutex_threshold); + oldSnapshotControl->threshold_timestamp = ts; + oldSnapshotControl->threshold_xid = xlimit; + SpinLockRelease(&oldSnapshotControl->mutex_threshold); +} + +/* + * TransactionIdLimitedForOldSnapshots + * + * Apply old snapshot limit, if any. This is intended to be called for page + * pruning and table vacuuming, to allow old_snapshot_threshold to override + * the normal global xmin value. Actual testing for snapshot too old will be + * based on whether a snapshot timestamp is prior to the threshold timestamp + * set in this function. + */ +TransactionId +TransactionIdLimitedForOldSnapshots(TransactionId recentXmin, + Relation relation) +{ + if (TransactionIdIsNormal(recentXmin) + && old_snapshot_threshold >= 0 + && RelationNeedsWAL(relation) + && !IsCatalogRelation(relation) + && !RelationIsAccessibleInLogicalDecoding(relation)) + { + int64 ts = GetSnapshotCurrentTimestamp(); + TransactionId xlimit = recentXmin; + TransactionId latest_xmin = oldSnapshotControl->latest_xmin; + bool same_ts_as_threshold = false; + + /* + * Zero threshold always overrides to latest xmin, if valid. Without + * some heuristic it will find its own snapshot too old on, for + * example, a simple UPDATE -- which would make it useless for most + * testing, but there is no principled way to ensure that it doesn't + * fail in this way. 
Use a five-second delay to try to get useful + * testing behavior, but this may need adjustment. + */ + if (old_snapshot_threshold == 0) + { + if (TransactionIdPrecedes(latest_xmin, MyPgXact->xmin) + && TransactionIdFollows(latest_xmin, xlimit)) + xlimit = latest_xmin; + + ts -= 5 * USECS_PER_SEC; + SetOldSnapshotThresholdTimestamp(ts, xlimit); + + return xlimit; + } + + ts = AlignTimestampToMinuteBoundary(ts) + - (old_snapshot_threshold * USECS_PER_MINUTE); + + /* Check for fast exit without LW locking. */ + SpinLockAcquire(&oldSnapshotControl->mutex_threshold); + if (ts == oldSnapshotControl->threshold_timestamp) + { + xlimit = oldSnapshotControl->threshold_xid; + same_ts_as_threshold = true; + } + SpinLockRelease(&oldSnapshotControl->mutex_threshold); + + if (!same_ts_as_threshold) + { + LWLockAcquire(OldSnapshotTimeMapLock, LW_SHARED); + + if (oldSnapshotControl->count_used > 0 + && ts >= oldSnapshotControl->head_timestamp) + { + int offset; + + offset = ((ts - oldSnapshotControl->head_timestamp) + / USECS_PER_MINUTE); + if (offset > oldSnapshotControl->count_used - 1) + offset = oldSnapshotControl->count_used - 1; + offset = (oldSnapshotControl->head_offset + offset) + % old_snapshot_threshold; + xlimit = oldSnapshotControl->xid_by_minute[offset]; + + if (NormalTransactionIdFollows(xlimit, recentXmin)) + SetOldSnapshotThresholdTimestamp(ts, xlimit); + } + + LWLockRelease(OldSnapshotTimeMapLock); + } + + /* + * Failsafe protection against vacuuming work of active transaction. + * + * This is not an assertion because we avoid the spinlock for + * performance, leaving open the possibility that xlimit could advance + * and be more current; but it seems prudent to apply this limit. It + * might make pruning a tiny bit less aggressive than it could be, but + * protects against data loss bugs. 
+ */ + if (TransactionIdIsNormal(latest_xmin) + && TransactionIdPrecedes(latest_xmin, xlimit)) + xlimit = latest_xmin; + + if (NormalTransactionIdFollows(xlimit, recentXmin)) + return xlimit; + } + + return recentXmin; +} + +/* + * Take care of the circular buffer that maps time to xid. + */ +void +MaintainOldSnapshotTimeMapping(int64 whenTaken, TransactionId xmin) +{ + int64 ts; + + /* Fast exit when old_snapshot_threshold is not used. */ + if (old_snapshot_threshold < 0) + return; + + /* Keep track of the latest xmin seen by any process. */ + SpinLockAcquire(&oldSnapshotControl->mutex_latest_xmin); + if (TransactionIdFollows(xmin, oldSnapshotControl->latest_xmin)) + oldSnapshotControl->latest_xmin = xmin; + SpinLockRelease(&oldSnapshotControl->mutex_latest_xmin); + + /* No further tracking needed for 0 (used for testing). */ + if (old_snapshot_threshold == 0) + return; + + /* + * We don't want to do something stupid with unusual values, but we don't + * want to litter the log with warnings or break otherwise normal + * processing for this feature; so if something seems unreasonable, just + * log at DEBUG level and return without doing anything. 
+ */ + if (whenTaken < 0) + { + elog(DEBUG1, + "MaintainOldSnapshotTimeMapping called with negative whenTaken = %ld", + (long) whenTaken); + return; + } + if (!TransactionIdIsNormal(xmin)) + { + elog(DEBUG1, + "MaintainOldSnapshotTimeMapping called with xmin = %lu", + (unsigned long) xmin); + return; + } + + ts = AlignTimestampToMinuteBoundary(whenTaken); + + LWLockAcquire(OldSnapshotTimeMapLock, LW_EXCLUSIVE); + + Assert(oldSnapshotControl->head_offset >= 0); + Assert(oldSnapshotControl->head_offset < old_snapshot_threshold); + Assert((oldSnapshotControl->head_timestamp % USECS_PER_MINUTE) == 0); + Assert(oldSnapshotControl->count_used >= 0); + Assert(oldSnapshotControl->count_used <= old_snapshot_threshold); + + if (oldSnapshotControl->count_used == 0) + { + /* set up first entry for empty mapping */ + oldSnapshotControl->head_offset = 0; + oldSnapshotControl->head_timestamp = ts; + oldSnapshotControl->count_used = 1; + oldSnapshotControl->xid_by_minute[0] = xmin; + } + else if (ts < oldSnapshotControl->head_timestamp) + { + /* old ts; log it at DEBUG */ + LWLockRelease(OldSnapshotTimeMapLock); + elog(DEBUG1, + "MaintainOldSnapshotTimeMapping called with old whenTaken = %ld", + (long) whenTaken); + return; + } + else if (ts <= (oldSnapshotControl->head_timestamp + + ((oldSnapshotControl->count_used - 1) + * USECS_PER_MINUTE))) + { + /* existing mapping; advance xid if possible */ + int bucket = (oldSnapshotControl->head_offset + + ((ts - oldSnapshotControl->head_timestamp) + / USECS_PER_MINUTE)) + % old_snapshot_threshold; + + if (TransactionIdPrecedes(oldSnapshotControl->xid_by_minute[bucket], xmin)) + oldSnapshotControl->xid_by_minute[bucket] = xmin; + } + else + { + /* We need a new bucket, but it might not be the very next one. 
*/ + int advance = ((ts - oldSnapshotControl->head_timestamp) + / USECS_PER_MINUTE); + + oldSnapshotControl->head_timestamp = ts; + + if (advance >= old_snapshot_threshold) + { + /* Advance is so far that all old data is junk; start over. */ + oldSnapshotControl->head_offset = 0; + oldSnapshotControl->count_used = 1; + oldSnapshotControl->xid_by_minute[0] = xmin; + } + else + { + /* Store the new value in one or more buckets. */ + int i; + + for (i = 0; i < advance; i++) + { + if (oldSnapshotControl->count_used == old_snapshot_threshold) + { + /* Map full and new value replaces old head. */ + int old_head = oldSnapshotControl->head_offset; + + if (old_head == (old_snapshot_threshold - 1)) + oldSnapshotControl->head_offset = 0; + else + oldSnapshotControl->head_offset = old_head + 1; + oldSnapshotControl->xid_by_minute[old_head] = xmin; + } + else + { + /* Extend map to unused entry. */ + int new_tail = (oldSnapshotControl->head_offset + + oldSnapshotControl->count_used) + % old_snapshot_threshold; + + oldSnapshotControl->count_used++; + oldSnapshotControl->xid_by_minute[new_tail] = xmin; + } + } + } + } + + LWLockRelease(OldSnapshotTimeMapLock); +} + + /* * Setup a snapshot that replaces normal catalog snapshots that allows catalog * access to behave just like it did at a certain point in the past. |