diff options
Diffstat (limited to 'src/backend/access/heap/heapam.c')
-rw-r--r-- | src/backend/access/heap/heapam.c | 153 |
1 files changed, 142 insertions, 11 deletions
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index f6d75c6e2b2..c561e8f960b 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.270 2008/11/19 10:34:50 heikki Exp $ + * $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.271 2008/12/03 13:05:22 heikki Exp $ * * * INTERFACE ROUTINES @@ -47,6 +47,7 @@ #include "access/transam.h" #include "access/tuptoaster.h" #include "access/valid.h" +#include "access/visibilitymap.h" #include "access/xact.h" #include "access/xlogutils.h" #include "catalog/catalog.h" @@ -195,6 +196,7 @@ heapgetpage(HeapScanDesc scan, BlockNumber page) int ntup; OffsetNumber lineoff; ItemId lpp; + bool all_visible; Assert(page < scan->rs_nblocks); @@ -233,20 +235,32 @@ heapgetpage(HeapScanDesc scan, BlockNumber page) lines = PageGetMaxOffsetNumber(dp); ntup = 0; + /* + * If the all-visible flag indicates that all tuples on the page are + * visible to everyone, we can skip the per-tuple visibility tests. + */ + all_visible = PageIsAllVisible(dp); + for (lineoff = FirstOffsetNumber, lpp = PageGetItemId(dp, lineoff); lineoff <= lines; lineoff++, lpp++) { if (ItemIdIsNormal(lpp)) { - HeapTupleData loctup; bool valid; - loctup.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp); - loctup.t_len = ItemIdGetLength(lpp); - ItemPointerSet(&(loctup.t_self), page, lineoff); + if (all_visible) + valid = true; + else + { + HeapTupleData loctup; + + loctup.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp); + loctup.t_len = ItemIdGetLength(lpp); + ItemPointerSet(&(loctup.t_self), page, lineoff); - valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer); + valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer); + } if (valid) scan->rs_vistuples[ntup++] = lineoff; } @@ -1860,6 +1874,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid, TransactionId xid = GetCurrentTransactionId(); HeapTuple heaptup; Buffer buffer; + bool all_visible_cleared = false; if (relation->rd_rel->relhasoids) { @@ -1920,6 +1935,12 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid, RelationPutHeapTuple(relation, buffer, heaptup); + if (PageIsAllVisible(BufferGetPage(buffer))) + { + all_visible_cleared = true; + PageClearAllVisible(BufferGetPage(buffer)); + } + /* * XXX Should we set PageSetPrunable on this page ? * @@ -1943,6 +1964,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid, Page page = BufferGetPage(buffer); uint8 info = XLOG_HEAP_INSERT; + xlrec.all_visible_cleared = all_visible_cleared; xlrec.target.node = relation->rd_node; xlrec.target.tid = heaptup->t_self; rdata[0].data = (char *) &xlrec; @@ -1994,6 +2016,11 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid, UnlockReleaseBuffer(buffer); + /* Clear the bit in the visibility map if necessary */ + if (all_visible_cleared) + visibilitymap_clear(relation, + ItemPointerGetBlockNumber(&(heaptup->t_self))); + /* * If tuple is cachable, mark it for invalidation from the caches in case * we abort. Note it is OK to do this after releasing the buffer, because @@ -2070,6 +2097,7 @@ heap_delete(Relation relation, ItemPointer tid, Buffer buffer; bool have_tuple_lock = false; bool iscombo; + bool all_visible_cleared = false; Assert(ItemPointerIsValid(tid)); @@ -2216,6 +2244,12 @@ l1: */ PageSetPrunable(page, xid); + if (PageIsAllVisible(page)) + { + all_visible_cleared = true; + PageClearAllVisible(page); + } + /* store transaction information of xact deleting the tuple */ tp.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED | HEAP_XMAX_INVALID | @@ -2237,6 +2271,7 @@ l1: XLogRecPtr recptr; XLogRecData rdata[2]; + xlrec.all_visible_cleared = all_visible_cleared; xlrec.target.node = relation->rd_node; xlrec.target.tid = tp.t_self; rdata[0].data = (char *) &xlrec; @@ -2281,6 +2316,10 @@ l1: */ CacheInvalidateHeapTuple(relation, &tp); + /* Clear the bit in the visibility map if necessary */ + if (all_visible_cleared) + visibilitymap_clear(relation, BufferGetBlockNumber(buffer)); + /* Now we can release the buffer */ ReleaseBuffer(buffer); @@ -2388,6 +2427,8 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup, bool have_tuple_lock = false; bool iscombo; bool use_hot_update = false; + bool all_visible_cleared = false; + bool all_visible_cleared_new = false; Assert(ItemPointerIsValid(otid)); @@ -2763,6 +2804,12 @@ l2: MarkBufferDirty(newbuf); MarkBufferDirty(buffer); + /* + * Note: we mustn't clear PD_ALL_VISIBLE flags before writing the WAL + * record, because log_heap_update looks at those flags to set the + * corresponding flags in the WAL record. + */ + /* XLOG stuff */ if (!relation->rd_istemp) { @@ -2778,6 +2825,18 @@ l2: PageSetTLI(BufferGetPage(buffer), ThisTimeLineID); } + /* Clear PD_ALL_VISIBLE flags */ + if (PageIsAllVisible(BufferGetPage(buffer))) + { + all_visible_cleared = true; + PageClearAllVisible(BufferGetPage(buffer)); + } + if (newbuf != buffer && PageIsAllVisible(BufferGetPage(newbuf))) + { + all_visible_cleared_new = true; + PageClearAllVisible(BufferGetPage(newbuf)); + } + END_CRIT_SECTION(); if (newbuf != buffer) @@ -2791,6 +2850,12 @@ l2: */ CacheInvalidateHeapTuple(relation, &oldtup); + /* Clear bits in visibility map */ + if (all_visible_cleared) + visibilitymap_clear(relation, BufferGetBlockNumber(buffer)); + if (all_visible_cleared_new) + visibilitymap_clear(relation, BufferGetBlockNumber(newbuf)); + /* Now we can release the buffer(s) */ if (newbuf != buffer) ReleaseBuffer(newbuf); @@ -3412,6 +3477,11 @@ l3: LockBuffer(*buffer, BUFFER_LOCK_UNLOCK); /* + * Don't update the visibility map here. Locking a tuple doesn't + * change visibility info. + */ + + /* * Now that we have successfully marked the tuple as locked, we can * release the lmgr tuple lock, if we had it. */ @@ -3916,7 +3986,9 @@ log_heap_update(Relation reln, Buffer oldbuf, ItemPointerData from, xlrec.target.node = reln->rd_node; xlrec.target.tid = from; + xlrec.all_visible_cleared = PageIsAllVisible(BufferGetPage(oldbuf)); xlrec.newtid = newtup->t_self; + xlrec.new_all_visible_cleared = PageIsAllVisible(BufferGetPage(newbuf)); rdata[0].data = (char *) &xlrec; rdata[0].len = SizeOfHeapUpdate; @@ -4185,13 +4257,25 @@ heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record) OffsetNumber offnum; ItemId lp = NULL; HeapTupleHeader htup; + BlockNumber blkno; + + blkno = ItemPointerGetBlockNumber(&(xlrec->target.tid)); + + /* + * The visibility map always needs to be updated, even if the heap page + * is already up-to-date. + */ + if (xlrec->all_visible_cleared) + { + Relation reln = CreateFakeRelcacheEntry(xlrec->target.node); + visibilitymap_clear(reln, blkno); + FreeFakeRelcacheEntry(reln); + } if (record->xl_info & XLR_BKP_BLOCK_1) return; - buffer = XLogReadBuffer(xlrec->target.node, - ItemPointerGetBlockNumber(&(xlrec->target.tid)), - false); + buffer = XLogReadBuffer(xlrec->target.node, blkno, false); if (!BufferIsValid(buffer)) return; page = (Page) BufferGetPage(buffer); @@ -4223,6 +4307,9 @@ heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record) /* Mark the page as a candidate for pruning */ PageSetPrunable(page, record->xl_xid); + if (xlrec->all_visible_cleared) + PageClearAllVisible(page); + /* Make sure there is no forward chain link in t_ctid */ htup->t_ctid = xlrec->target.tid; PageSetLSN(page, lsn); @@ -4249,11 +4336,22 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record) Size freespace; BlockNumber blkno; + blkno = ItemPointerGetBlockNumber(&(xlrec->target.tid)); + + /* + * The visibility map always needs to be updated, even if the heap page + * is already up-to-date. + */ + if (xlrec->all_visible_cleared) + { + Relation reln = CreateFakeRelcacheEntry(xlrec->target.node); + visibilitymap_clear(reln, blkno); + FreeFakeRelcacheEntry(reln); + } + if (record->xl_info & XLR_BKP_BLOCK_1) return; - blkno = ItemPointerGetBlockNumber(&(xlrec->target.tid)); - if (record->xl_info & XLOG_HEAP_INIT_PAGE) { buffer = XLogReadBuffer(xlrec->target.node, blkno, true); @@ -4307,6 +4405,10 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record) PageSetLSN(page, lsn); PageSetTLI(page, ThisTimeLineID); + + if (xlrec->all_visible_cleared) + PageClearAllVisible(page); + MarkBufferDirty(buffer); UnlockReleaseBuffer(buffer); @@ -4347,6 +4449,18 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move, bool hot_update) uint32 newlen; Size freespace; + /* + * The visibility map always needs to be updated, even if the heap page + * is already up-to-date. + */ + if (xlrec->all_visible_cleared) + { + Relation reln = CreateFakeRelcacheEntry(xlrec->target.node); + visibilitymap_clear(reln, + ItemPointerGetBlockNumber(&xlrec->target.tid)); + FreeFakeRelcacheEntry(reln); + } + if (record->xl_info & XLR_BKP_BLOCK_1) { if (samepage) @@ -4411,6 +4525,9 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move, bool hot_update) /* Mark the page as a candidate for pruning */ PageSetPrunable(page, record->xl_xid); + if (xlrec->all_visible_cleared) + PageClearAllVisible(page); + /* * this test is ugly, but necessary to avoid thinking that insert change * is already applied @@ -4426,6 +4543,17 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move, bool hot_update) newt:; + /* + * The visibility map always needs to be updated, even if the heap page + * is already up-to-date. + */ + if (xlrec->new_all_visible_cleared) + { + Relation reln = CreateFakeRelcacheEntry(xlrec->target.node); + visibilitymap_clear(reln, ItemPointerGetBlockNumber(&xlrec->newtid)); + FreeFakeRelcacheEntry(reln); + } + if (record->xl_info & XLR_BKP_BLOCK_2) return; @@ -4504,6 +4632,9 @@ newsame:; if (offnum == InvalidOffsetNumber) elog(PANIC, "heap_update_redo: failed to add tuple"); + if (xlrec->new_all_visible_cleared) + PageClearAllVisible(page); + freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */ PageSetLSN(page, lsn); |