author     Tom Lane <tgl@sss.pgh.pa.us>  2007-09-20 17:56:33 +0000
committer  Tom Lane <tgl@sss.pgh.pa.us>  2007-09-20 17:56:33 +0000
commit     282d2a03dd30804b01f8042f640d638c2ee76604 (patch)
tree       004f08ce31f1bfb03ab55571ad7867babe5b3d7f /src/backend/access/heap/heapam.c
parent     bbf4fdc2538097bb3103806e1419ceef1f289203 (diff)
HOT updates. When we update a tuple without changing any of its indexed
columns, and the new version can be stored on the same heap page, we no longer
generate extra index entries for the new version. Instead, index searches
follow the HOT-chain links to ensure they find the correct tuple version.
In addition, this patch introduces the ability to "prune" dead tuples on a
per-page basis, without having to do a complete VACUUM pass to recover space.
VACUUM is still needed to clean up dead index entries, however.
Pavan Deolasee, with help from a bunch of other people.
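To make the chain-following behavior concrete before diving into the patch, here is a minimal, self-contained toy model of the walk that heap_hot_search_buffer (added in the diff below) performs: start at the root line pointer, then follow the forward links so long as each member's xmin matches the previous member's xmax, and return the first version visible to the caller's snapshot. All Toy* names and the single-integer snapshot "horizon" are invented for illustration; the real code also handles redirected, dead, and unused line pointers and full MVCC visibility.

#include <stdbool.h>
#include <stdio.h>

typedef unsigned int XactId;

typedef struct ToyTuple
{
	XactId		xmin;		/* inserting transaction */
	XactId		xmax;		/* deleting/updating transaction, 0 if none */
	int			next;		/* offset of next chain member, -1 at end */
	bool		heap_only;	/* HEAP_ONLY_TUPLE: has no index entry of its own */
} ToyTuple;

/* Crude stand-in for MVCC visibility: the snapshot sees transactions < horizon. */
static bool
toy_visible(const ToyTuple *tup, XactId horizon)
{
	return tup->xmin < horizon && (tup->xmax == 0 || tup->xmax >= horizon);
}

/* Walk the chain from 'root'; return the first visible offset, or -1. */
static int
toy_hot_search(const ToyTuple *page, int root, XactId horizon)
{
	XactId		prev_xmax = 0;
	int			off = root;

	while (off >= 0)
	{
		const ToyTuple *tup = &page[off];

		/* xmin must match the previous xmax, else the chain is broken */
		if (prev_xmax != 0 && tup->xmin != prev_xmax)
			break;
		if (toy_visible(tup, horizon))
			return off;
		prev_xmax = tup->xmax;
		off = tup->next;
	}
	return -1;
}

int
main(void)
{
	/* Three versions of one row, HOT-updated by transactions 100 and 200. */
	ToyTuple	page[] = {
		{50, 100, 1, false},	/* root version: owns the index entry */
		{100, 200, 2, true},	/* heap-only, reachable only via the chain */
		{200, 0, -1, true},		/* current version */
	};

	printf("horizon 150 sees offset %d\n", toy_hot_search(page, 0, 150));	/* 1 */
	printf("horizon 300 sees offset %d\n", toy_hot_search(page, 0, 300));	/* 2 */
	return 0;
}

An index search thus needs only the root TID; every later version on the page is reached through the chain instead of through extra index entries.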
Diffstat (limited to 'src/backend/access/heap/heapam.c')
-rw-r--r--  src/backend/access/heap/heapam.c | 640
1 file changed, 571 insertions(+), 69 deletions(-)
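The test that gates all of this is also worth spelling out: heap_update (in the diff below) may do a HOT update only if no indexed column changed, and "changed" is judged by plain binary comparison, so spurious inequality is tolerated but false equality is not (compare heap_tuple_attr_equals and HeapSatisfiesHOTUpdate below). This is a simplified stand-alone sketch; the ToyDatum type and helper names are invented, and the real code works on heap tuples plus a bitmapset of indexed attributes obtained from the relcache.

#include <stdbool.h>
#include <stdio.h>
#include <string.h>

typedef struct ToyDatum
{
	const void *data;
	size_t		len;
	bool		isnull;
} ToyDatum;

static bool
toy_attr_equals(ToyDatum a, ToyDatum b)
{
	if (a.isnull != b.isnull)
		return false;			/* one NULL, one not: changed */
	if (a.isnull)
		return true;			/* both NULL: treat as equal */
	/* Binary comparison, as in datumIsEqual(): strict, but never falsely equal */
	return a.len == b.len && memcmp(a.data, b.data, a.len) == 0;
}

/* HOT is allowed only if every indexed attribute is binary-identical. */
static bool
toy_satisfies_hot(const int *indexed_attrs, int nattrs,
				  const ToyDatum *oldtup, const ToyDatum *newtup)
{
	for (int i = 0; i < nattrs; i++)
	{
		int			attr = indexed_attrs[i];

		if (!toy_attr_equals(oldtup[attr], newtup[attr]))
			return false;		/* an indexed column changed: not HOT-safe */
	}
	return true;
}

int
main(void)
{
	int			id = 1, val_old = 10, val_new = 20;
	ToyDatum	oldtup[] = {{&id, sizeof(id), false}, {&val_old, sizeof(val_old), false}};
	ToyDatum	newtup[] = {{&id, sizeof(id), false}, {&val_new, sizeof(val_new), false}};
	int			indexed[] = {0};	/* only column 0 carries an index */

	/* Column 1 changed, but it is not indexed, so HOT is still possible. */
	printf("HOT-safe: %s\n",
		   toy_satisfies_hot(indexed, 1, oldtup, newtup) ? "yes" : "no");
	return 0;
}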
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 09a70d813f7..d5a2f9a43d1 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.240 2007/09/12 22:10:26 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.241 2007/09/20 17:56:30 tgl Exp $ * * * INTERFACE ROUTINES @@ -52,6 +52,7 @@ #include "pgstat.h" #include "storage/procarray.h" #include "storage/smgr.h" +#include "utils/datum.h" #include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/relcache.h" @@ -64,6 +65,8 @@ static HeapScanDesc heap_beginscan_internal(Relation relation, bool is_bitmapscan); static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf, ItemPointerData from, Buffer newbuf, HeapTuple newtup, bool move); +static bool HeapSatisfiesHOTUpdate(Relation relation, Bitmapset *hot_attrs, + HeapTuple oldtup, HeapTuple newtup); /* ---------------------------------------------------------------- @@ -184,6 +187,11 @@ heapgetpage(HeapScanDesc scan, BlockNumber page) snapshot = scan->rs_snapshot; /* + * Prune and repair fragmentation for the whole page, if possible. + */ + heap_page_prune_opt(scan->rs_rd, buffer, RecentGlobalXmin); + + /* * We must hold share lock on the buffer content while examining tuple * visibility. Afterwards, however, the tuples we have found to be * visible are guaranteed good as long as we hold the buffer pin. @@ -316,7 +324,7 @@ heapgettup(HeapScanDesc scan, * forward scanners. */ scan->rs_syncscan = false; - /* start from last page of the scan */ + /* start from last page of the scan */ if (scan->rs_startblock > 0) page = scan->rs_startblock - 1; else @@ -368,6 +376,7 @@ heapgettup(HeapScanDesc scan, dp = (Page) BufferGetPage(scan->rs_cbuf); lineoff = ItemPointerGetOffsetNumber(&(tuple->t_self)); lpp = PageGetItemId(dp, lineoff); + Assert(ItemIdIsNormal(lpp)); tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp); tuple->t_len = ItemIdGetLength(lpp); @@ -583,7 +592,7 @@ heapgettup_pagemode(HeapScanDesc scan, * forward scanners. */ scan->rs_syncscan = false; - /* start from last page of the scan */ + /* start from last page of the scan */ if (scan->rs_startblock > 0) page = scan->rs_startblock - 1; else @@ -632,6 +641,7 @@ heapgettup_pagemode(HeapScanDesc scan, dp = (Page) BufferGetPage(scan->rs_cbuf); lineoff = ItemPointerGetOffsetNumber(&(tuple->t_self)); lpp = PageGetItemId(dp, lineoff); + Assert(ItemIdIsNormal(lpp)); tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp); tuple->t_len = ItemIdGetLength(lpp); @@ -1246,6 +1256,9 @@ heap_getnext(HeapScanDesc scan, ScanDirection direction) * for statistical purposes. (This could be the heap rel itself, an * associated index, or NULL to not count the fetch at all.) * + * heap_fetch does not follow HOT chains: only the exact TID requested will + * be fetched. + * * It is somewhat inconsistent that we ereport() on invalid block number but * return false on invalid item number. There are a couple of reasons though. * One is that the caller can relatively easily check the block number for @@ -1390,6 +1403,143 @@ heap_release_fetch(Relation relation, } /* + * heap_hot_search_buffer - search HOT chain for tuple satisfying snapshot + * + * On entry, *tid is the TID of a tuple (either a simple tuple, or the root + * of a HOT chain), and buffer is the buffer holding this tuple. We search + * for the first chain member satisfying the given snapshot. 
If one is + * found, we update *tid to reference that tuple's offset number, and + * return TRUE. If no match, return FALSE without modifying *tid. + * + * If all_dead is not NULL, we check non-visible tuples to see if they are + * globally dead; *all_dead is set TRUE if all members of the HOT chain + * are vacuumable, FALSE if not. + * + * Unlike heap_fetch, the caller must already have pin and (at least) share + * lock on the buffer; it is still pinned/locked at exit. Also unlike + * heap_fetch, we do not report any pgstats count; caller may do so if wanted. + */ +bool +heap_hot_search_buffer(ItemPointer tid, Buffer buffer, Snapshot snapshot, + bool *all_dead) +{ + Page dp = (Page) BufferGetPage(buffer); + TransactionId prev_xmax = InvalidTransactionId; + OffsetNumber offnum; + bool at_chain_start; + + if (all_dead) + *all_dead = true; + + Assert(ItemPointerGetBlockNumber(tid) == BufferGetBlockNumber(buffer)); + offnum = ItemPointerGetOffsetNumber(tid); + at_chain_start = true; + + /* Scan through possible multiple members of HOT-chain */ + for (;;) + { + ItemId lp; + HeapTupleData heapTuple; + + /* check for bogus TID */ + if (offnum < FirstOffsetNumber || offnum > PageGetMaxOffsetNumber(dp)) + break; + + lp = PageGetItemId(dp, offnum); + + /* check for unused, dead, or redirected items */ + if (!ItemIdIsNormal(lp)) + { + /* We should only see a redirect at start of chain */ + if (ItemIdIsRedirected(lp) && at_chain_start) + { + /* Follow the redirect */ + offnum = ItemIdGetRedirect(lp); + at_chain_start = false; + continue; + } + /* else must be end of chain */ + break; + } + + heapTuple.t_data = (HeapTupleHeader) PageGetItem(dp, lp); + heapTuple.t_len = ItemIdGetLength(lp); + + /* + * Shouldn't see a HEAP_ONLY tuple at chain start. + */ + if (at_chain_start && HeapTupleIsHeapOnly(&heapTuple)) + break; + + /* + * The xmin should match the previous xmax value, else chain is broken. + */ + if (TransactionIdIsValid(prev_xmax) && + !TransactionIdEquals(prev_xmax, + HeapTupleHeaderGetXmin(heapTuple.t_data))) + break; + + /* If it's visible per the snapshot, we must return it */ + if (HeapTupleSatisfiesVisibility(&heapTuple, snapshot, buffer)) + { + ItemPointerSetOffsetNumber(tid, offnum); + if (all_dead) + *all_dead = false; + return true; + } + + /* + * If we can't see it, maybe no one else can either. At caller + * request, check whether all chain members are dead to all + * transactions. + */ + if (all_dead && *all_dead && + HeapTupleSatisfiesVacuum(heapTuple.t_data, RecentGlobalXmin, + buffer) != HEAPTUPLE_DEAD) + *all_dead = false; + + /* + * Check to see if HOT chain continues past this tuple; if so + * fetch the next offnum and loop around. + */ + if (HeapTupleIsHotUpdated(&heapTuple)) + { + Assert(ItemPointerGetBlockNumber(&heapTuple.t_data->t_ctid) == + ItemPointerGetBlockNumber(tid)); + offnum = ItemPointerGetOffsetNumber(&heapTuple.t_data->t_ctid); + at_chain_start = false; + prev_xmax = HeapTupleHeaderGetXmax(heapTuple.t_data); + } + else + break; /* end of chain */ + } + + return false; +} + +/* + * heap_hot_search - search HOT chain for tuple satisfying snapshot + * + * This has the same API as heap_hot_search_buffer, except that the caller + * does not provide the buffer containing the page, rather we access it + * locally. 
+ */ +bool +heap_hot_search(ItemPointer tid, Relation relation, Snapshot snapshot, + bool *all_dead) +{ + bool result; + Buffer buffer; + + buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid)); + LockBuffer(buffer, BUFFER_LOCK_SHARE); + result = heap_hot_search_buffer(tid, buffer, snapshot, all_dead); + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + ReleaseBuffer(buffer); + return result; +} + +/* * heap_get_latest_tid - get the latest tid of a specified tuple * * Actually, this gets the latest version that is visible according to @@ -1594,6 +1744,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid, } tup->t_data->t_infomask &= ~(HEAP_XACT_MASK); + tup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK); tup->t_data->t_infomask |= HEAP_XMAX_INVALID; HeapTupleHeaderSetXmin(tup->t_data, xid); HeapTupleHeaderSetCmin(tup->t_data, cid); @@ -1628,6 +1779,17 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid, RelationPutHeapTuple(relation, buffer, heaptup); + /* + * XXX Should we set PageSetPrunable on this page ? + * + * The inserting transaction may eventually abort thus making this tuple + * DEAD and hence available for pruning. Though we don't want to optimize + * for aborts, if no other tuple in this page is UPDATEd/DELETEd, the + * aborted tuple will never be pruned until next vacuum is triggered. + * + * If you do add PageSetPrunable here, add it in heap_xlog_insert too. + */ + MarkBufferDirty(buffer); /* XLOG stuff */ @@ -1904,12 +2066,21 @@ l1: START_CRIT_SECTION(); + /* + * If this transaction commits, the tuple will become DEAD sooner or + * later. Set hint bit that this page is a candidate for pruning. If + * the transaction finally aborts, the subsequent page pruning will be + * a no-op and the hint will be cleared. + */ + PageSetPrunable((Page) dp); + /* store transaction information of xact deleting the tuple */ tp.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED | HEAP_XMAX_INVALID | HEAP_XMAX_IS_MULTI | HEAP_IS_LOCKED | HEAP_MOVED); + HeapTupleHeaderClearHotUpdated(tp.t_data); HeapTupleHeaderSetXmax(tp.t_data, xid); HeapTupleHeaderSetCmax(tp.t_data, cid, iscombo); /* Make sure there is no forward chain link in t_ctid */ @@ -2045,7 +2216,8 @@ simple_heap_delete(Relation relation, ItemPointer tid) * * On success, the header fields of *newtup are updated to match the new * stored tuple; in particular, newtup->t_self is set to the TID where the - * new tuple was inserted. However, any TOAST changes in the new tuple's + * new tuple was inserted, and its HEAP_ONLY_TUPLE flag is set iff a HOT + * update was done. However, any TOAST changes in the new tuple's * data are not reflected into *newtup. * * In the failure cases, the routine returns the tuple's t_ctid and t_xmax. @@ -2060,6 +2232,7 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup, { HTSU_Result result; TransactionId xid = GetCurrentTransactionId(); + Bitmapset *hot_attrs; ItemId lp; HeapTupleData oldtup; HeapTuple heaptup; @@ -2072,9 +2245,24 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup, pagefree; bool have_tuple_lock = false; bool iscombo; + bool use_hot_update = false; Assert(ItemPointerIsValid(otid)); + /* + * Fetch the list of attributes to be checked for HOT update. This is + * wasted effort if we fail to update or have to put the new tuple on + * a different page. 
But we must compute the list before obtaining + * buffer lock --- in the worst case, if we are doing an update on one + * of the relevant system catalogs, we could deadlock if we try to + * fetch the list later. In any case, the relcache caches the data + * so this is usually pretty cheap. + * + * Note that we get a copy here, so we need not worry about relcache + * flush happening midway through. + */ + hot_attrs = RelationGetIndexAttrBitmap(relation); + buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(otid)); LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); @@ -2208,6 +2396,7 @@ l2: UnlockReleaseBuffer(buffer); if (have_tuple_lock) UnlockTuple(relation, &(oldtup.t_self), ExclusiveLock); + bms_free(hot_attrs); return result; } @@ -2227,6 +2416,7 @@ l2: } newtup->t_data->t_infomask &= ~(HEAP_XACT_MASK); + newtup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK); newtup->t_data->t_infomask |= (HEAP_XMAX_INVALID | HEAP_UPDATED); HeapTupleHeaderSetXmin(newtup->t_data, xid); HeapTupleHeaderSetCmin(newtup->t_data, cid); @@ -2261,17 +2451,20 @@ l2: HeapTupleHasExternal(newtup) || newtup->t_len > TOAST_TUPLE_THRESHOLD); - pagefree = PageGetFreeSpace((Page) dp); + pagefree = PageGetHeapFreeSpace((Page) dp); newtupsize = MAXALIGN(newtup->t_len); if (need_toast || newtupsize > pagefree) { + /* Clear obsolete visibility flags ... */ oldtup.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED | HEAP_XMAX_INVALID | HEAP_XMAX_IS_MULTI | HEAP_IS_LOCKED | HEAP_MOVED); + HeapTupleClearHotUpdated(&oldtup); + /* ... and store info about transaction updating this tuple */ HeapTupleHeaderSetXmax(oldtup.t_data, xid); HeapTupleHeaderSetCmax(oldtup.t_data, cid, iscombo); /* temporarily make it look not-updated */ @@ -2324,7 +2517,7 @@ l2: /* Re-acquire the lock on the old tuple's page. */ LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); /* Re-check using the up-to-date free space */ - pagefree = PageGetFreeSpace((Page) dp); + pagefree = PageGetHeapFreeSpace((Page) dp); if (newtupsize > pagefree) { /* @@ -2357,18 +2550,66 @@ l2: * one pin is held. */ + if (newbuf == buffer) + { + /* + * Since the new tuple is going into the same page, we might be able + * to do a HOT update. Check if any of the index columns have been + * changed. If not, then HOT update is possible. + */ + if (HeapSatisfiesHOTUpdate(relation, hot_attrs, &oldtup, heaptup)) + use_hot_update = true; + } + else + { + /* Set a hint that the old page could use prune/defrag */ + PageSetFull(dp); + } + /* NO EREPORT(ERROR) from here till changes are logged */ START_CRIT_SECTION(); + /* + * If this transaction commits, the old tuple will become DEAD sooner or + * later. Set hint bit that this page is a candidate for pruning. If + * the transaction finally aborts, the subsequent page pruning will be + * a no-op and the hint will be cleared. + * + * XXX Should we set hint on newbuf as well? If the transaction + * aborts, there would be a prunable tuple in the newbuf; but for now + * we choose not to optimize for aborts. Note that heap_xlog_update + * must be kept in sync if this changes. 
+ */ + PageSetPrunable(dp); + + if (use_hot_update) + { + /* Mark the old tuple as HOT-updated */ + HeapTupleSetHotUpdated(&oldtup); + /* And mark the new tuple as heap-only */ + HeapTupleSetHeapOnly(heaptup); + /* Mark the caller's copy too, in case different from heaptup */ + HeapTupleSetHeapOnly(newtup); + } + else + { + /* Make sure tuples are correctly marked as not-HOT */ + HeapTupleClearHotUpdated(&oldtup); + HeapTupleClearHeapOnly(heaptup); + HeapTupleClearHeapOnly(newtup); + } + RelationPutHeapTuple(relation, newbuf, heaptup); /* insert new tuple */ if (!already_marked) { + /* Clear obsolete visibility flags ... */ oldtup.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED | HEAP_XMAX_INVALID | HEAP_XMAX_IS_MULTI | HEAP_IS_LOCKED | HEAP_MOVED); + /* ... and store info about transaction updating this tuple */ HeapTupleHeaderSetXmax(oldtup.t_data, xid); HeapTupleHeaderSetCmax(oldtup.t_data, cid, iscombo); } @@ -2427,7 +2668,7 @@ l2: if (have_tuple_lock) UnlockTuple(relation, &(oldtup.t_self), ExclusiveLock); - pgstat_count_heap_update(relation); + pgstat_count_heap_update(relation, use_hot_update); /* * If heaptup is a private copy, release it. Don't forget to copy t_self @@ -2439,10 +2680,120 @@ l2: heap_freetuple(heaptup); } + bms_free(hot_attrs); + return HeapTupleMayBeUpdated; } /* + * Check if the specified attribute's value is same in both given tuples. + * Subroutine for HeapSatisfiesHOTUpdate. + */ +static bool +heap_tuple_attr_equals(TupleDesc tupdesc, int attrnum, + HeapTuple tup1, HeapTuple tup2) +{ + Datum value1, value2; + bool isnull1, isnull2; + Form_pg_attribute att; + + /* + * If it's a whole-tuple reference, say "not equal". It's not really + * worth supporting this case, since it could only succeed after a + * no-op update, which is hardly a case worth optimizing for. + */ + if (attrnum == 0) + return false; + + /* + * Likewise, automatically say "not equal" for any system attribute + * other than OID and tableOID; we cannot expect these to be consistent + * in a HOT chain, or even to be set correctly yet in the new tuple. + */ + if (attrnum < 0) + { + if (attrnum != ObjectIdAttributeNumber && + attrnum != TableOidAttributeNumber) + return false; + } + + /* + * Extract the corresponding values. XXX this is pretty inefficient + * if there are many indexed columns. Should HeapSatisfiesHOTUpdate + * do a single heap_deform_tuple call on each tuple, instead? But + * that doesn't work for system columns ... + */ + value1 = heap_getattr(tup1, attrnum, tupdesc, &isnull1); + value2 = heap_getattr(tup2, attrnum, tupdesc, &isnull2); + + /* + * If one value is NULL and other is not, then they are certainly + * not equal + */ + if (isnull1 != isnull2) + return false; + + /* + * If both are NULL, they can be considered equal. + */ + if (isnull1) + return true; + + /* + * We do simple binary comparison of the two datums. This may be overly + * strict because there can be multiple binary representations for the + * same logical value. But we should be OK as long as there are no false + * positives. Using a type-specific equality operator is messy because + * there could be multiple notions of equality in different operator + * classes; furthermore, we cannot safely invoke user-defined functions + * while holding exclusive buffer lock. 
+ */ + if (attrnum <= 0) + { + /* The only allowed system columns are OIDs, so do this */ + return (DatumGetObjectId(value1) == DatumGetObjectId(value2)); + } + else + { + Assert(attrnum <= tupdesc->natts); + att = tupdesc->attrs[attrnum - 1]; + return datumIsEqual(value1, value2, att->attbyval, att->attlen); + } +} + +/* + * Check if the old and new tuples represent a HOT-safe update. To be able + * to do a HOT update, we must not have changed any columns used in index + * definitions. + * + * The set of attributes to be checked is passed in (we dare not try to + * compute it while holding exclusive buffer lock...) NOTE that hot_attrs + * is destructively modified! That is OK since this is invoked at most once + * by heap_update(). + * + * Returns true if safe to do HOT update. + */ +static bool +HeapSatisfiesHOTUpdate(Relation relation, Bitmapset *hot_attrs, + HeapTuple oldtup, HeapTuple newtup) +{ + int attrnum; + + while ((attrnum = bms_first_member(hot_attrs)) >= 0) + { + /* Adjust for system attributes */ + attrnum += FirstLowInvalidHeapAttributeNumber; + + /* If the attribute value has changed, we can't do HOT update */ + if (!heap_tuple_attr_equals(RelationGetDescr(relation), attrnum, + oldtup, newtup)) + return false; + } + + return true; +} + +/* * simple_heap_update - replace a tuple * * This routine may be used to update a tuple when concurrent updates of @@ -2865,6 +3216,7 @@ l3: * avoids possibly generating a useless combo CID. */ tuple->t_data->t_infomask = new_infomask; + HeapTupleHeaderClearHotUpdated(tuple->t_data); HeapTupleHeaderSetXmax(tuple->t_data, xid); /* Make sure there is no forward chain link in t_ctid */ tuple->t_data->t_ctid = *tid; @@ -3110,6 +3462,7 @@ recheck_xmax: */ tuple->t_infomask &= ~HEAP_XMAX_COMMITTED; tuple->t_infomask |= HEAP_XMAX_INVALID; + HeapTupleHeaderClearHotUpdated(tuple); changed = true; } } @@ -3245,21 +3598,29 @@ heap_restrpos(HeapScanDesc scan) * Perform XLogInsert for a heap-clean operation. Caller must already * have modified the buffer and marked it dirty. * - * Note: for historical reasons, the entries in the unused[] array should - * be zero-based tuple indexes, not one-based. + * Note: prior to Postgres 8.3, the entries in the nowunused[] array were + * zero-based tuple indexes. Now they are one-based like other uses + * of OffsetNumber. */ XLogRecPtr -log_heap_clean(Relation reln, Buffer buffer, OffsetNumber *unused, int uncnt) +log_heap_clean(Relation reln, Buffer buffer, + OffsetNumber *redirected, int nredirected, + OffsetNumber *nowdead, int ndead, + OffsetNumber *nowunused, int nunused, + bool redirect_move) { xl_heap_clean xlrec; + uint8 info; XLogRecPtr recptr; - XLogRecData rdata[2]; + XLogRecData rdata[4]; /* Caller should not call me on a temp relation */ Assert(!reln->rd_istemp); xlrec.node = reln->rd_node; xlrec.block = BufferGetBlockNumber(buffer); + xlrec.nredirected = nredirected; + xlrec.ndead = ndead; rdata[0].data = (char *) &xlrec; rdata[0].len = SizeOfHeapClean; @@ -3267,14 +3628,17 @@ log_heap_clean(Relation reln, Buffer buffer, OffsetNumber *unused, int uncnt) rdata[0].next = &(rdata[1]); /* - * The unused-offsets array is not actually in the buffer, but pretend - * that it is. When XLogInsert stores the whole buffer, the offsets array - * need not be stored too. + * The OffsetNumber arrays are not actually in the buffer, but we pretend + * that they are. When XLogInsert stores the whole buffer, the offset + * arrays need not be stored too. 
Note that even if all three arrays + * are empty, we want to expose the buffer as a candidate for whole-page + * storage, since this record type implies a defragmentation operation + * even if no item pointers changed state. */ - if (uncnt > 0) + if (nredirected > 0) { - rdata[1].data = (char *) unused; - rdata[1].len = uncnt * sizeof(OffsetNumber); + rdata[1].data = (char *) redirected; + rdata[1].len = nredirected * sizeof(OffsetNumber) * 2; } else { @@ -3283,9 +3647,38 @@ log_heap_clean(Relation reln, Buffer buffer, OffsetNumber *unused, int uncnt) } rdata[1].buffer = buffer; rdata[1].buffer_std = true; - rdata[1].next = NULL; + rdata[1].next = &(rdata[2]); + + if (ndead > 0) + { + rdata[2].data = (char *) nowdead; + rdata[2].len = ndead * sizeof(OffsetNumber); + } + else + { + rdata[2].data = NULL; + rdata[2].len = 0; + } + rdata[2].buffer = buffer; + rdata[2].buffer_std = true; + rdata[2].next = &(rdata[3]); + + if (nunused > 0) + { + rdata[3].data = (char *) nowunused; + rdata[3].len = nunused * sizeof(OffsetNumber); + } + else + { + rdata[3].data = NULL; + rdata[3].len = 0; + } + rdata[3].buffer = buffer; + rdata[3].buffer_std = true; + rdata[3].next = NULL; - recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_CLEAN, rdata); + info = redirect_move ? XLOG_HEAP2_CLEAN_MOVE : XLOG_HEAP2_CLEAN; + recptr = XLogInsert(RM_HEAP2_ID, info, rdata); return recptr; } @@ -3293,8 +3686,6 @@ log_heap_clean(Relation reln, Buffer buffer, OffsetNumber *unused, int uncnt) /* * Perform XLogInsert for a heap-freeze operation. Caller must already * have modified the buffer and marked it dirty. - * - * Unlike log_heap_clean(), the offsets[] entries are one-based. */ XLogRecPtr log_heap_freeze(Relation reln, Buffer buffer, @@ -3363,17 +3754,28 @@ log_heap_update(Relation reln, Buffer oldbuf, ItemPointerData from, } xlhdr; int hsize = SizeOfHeapHeader; xl_heap_update xlrec; + uint8 info; XLogRecPtr recptr; XLogRecData rdata[4]; Page page = BufferGetPage(newbuf); - uint8 info = (move) ? 
XLOG_HEAP_MOVE : XLOG_HEAP_UPDATE; /* Caller should not call me on a temp relation */ Assert(!reln->rd_istemp); + if (move) + { + Assert(!HeapTupleIsHeapOnly(newtup)); + info = XLOG_HEAP_MOVE; + } + else if (HeapTupleIsHeapOnly(newtup)) + info = XLOG_HEAP_HOT_UPDATE; + else + info = XLOG_HEAP_UPDATE; + xlrec.target.node = reln->rd_node; xlrec.target.tid = from; xlrec.newtid = newtup->t_self; + rdata[0].data = (char *) &xlrec; rdata[0].len = SizeOfHeapUpdate; rdata[0].buffer = InvalidBuffer; @@ -3489,13 +3891,21 @@ log_newpage(RelFileNode *rnode, BlockNumber blkno, Page page) return recptr; } +/* + * Handles CLEAN and CLEAN_MOVE record types + */ static void -heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record) +heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record, bool clean_move) { xl_heap_clean *xlrec = (xl_heap_clean *) XLogRecGetData(record); Relation reln; Buffer buffer; Page page; + OffsetNumber *offnum; + OffsetNumber *end; + int nredirected; + int ndead; + int i; if (record->xl_info & XLR_BKP_BLOCK_1) return; @@ -3512,25 +3922,63 @@ heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record) return; } - if (record->xl_len > SizeOfHeapClean) - { - OffsetNumber *unused; - OffsetNumber *unend; - ItemId lp; + nredirected = xlrec->nredirected; + ndead = xlrec->ndead; + offnum = (OffsetNumber *) ((char *) xlrec + SizeOfHeapClean); + end = (OffsetNumber *) ((char *) xlrec + record->xl_len); - unused = (OffsetNumber *) ((char *) xlrec + SizeOfHeapClean); - unend = (OffsetNumber *) ((char *) xlrec + record->xl_len); + /* Update all redirected or moved line pointers */ + for (i = 0; i < nredirected; i++) + { + OffsetNumber fromoff = *offnum++; + OffsetNumber tooff = *offnum++; + ItemId fromlp = PageGetItemId(page, fromoff); - while (unused < unend) + if (clean_move) { - /* unused[] entries are zero-based */ - lp = PageGetItemId(page, *unused + 1); - ItemIdSetUnused(lp); - unused++; + /* Physically move the "to" item to the "from" slot */ + ItemId tolp = PageGetItemId(page, tooff); + HeapTupleHeader htup; + + *fromlp = *tolp; + ItemIdSetUnused(tolp); + + /* We also have to clear the tuple's heap-only bit */ + Assert(ItemIdIsNormal(fromlp)); + htup = (HeapTupleHeader) PageGetItem(page, fromlp); + Assert(HeapTupleHeaderIsHeapOnly(htup)); + HeapTupleHeaderClearHeapOnly(htup); + } + else + { + /* Just insert a REDIRECT link at fromoff */ + ItemIdSetRedirect(fromlp, tooff); } } - PageRepairFragmentation(page, NULL); + /* Update all now-dead line pointers */ + for (i = 0; i < ndead; i++) + { + OffsetNumber off = *offnum++; + ItemId lp = PageGetItemId(page, off); + + ItemIdSetDead(lp); + } + + /* Update all now-unused line pointers */ + while (offnum < end) + { + OffsetNumber off = *offnum++; + ItemId lp = PageGetItemId(page, off); + + ItemIdSetUnused(lp); + } + + /* + * Finally, repair any fragmentation, and update the page's hint bit + * about whether it has free pointers. 
+ */ + PageRepairFragmentation(page); PageSetLSN(page, lsn); PageSetTLI(page, ThisTimeLineID); @@ -3655,8 +4103,13 @@ heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record) HEAP_XMAX_IS_MULTI | HEAP_IS_LOCKED | HEAP_MOVED); + HeapTupleHeaderClearHotUpdated(htup); HeapTupleHeaderSetXmax(htup, record->xl_xid); HeapTupleHeaderSetCmax(htup, FirstCommandId, false); + + /* Mark the page as a candidate for pruning */ + PageSetPrunable(page); + /* Make sure there is no forward chain link in t_ctid */ htup->t_ctid = xlrec->target.tid; PageSetLSN(page, lsn); @@ -3736,7 +4189,7 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record) HeapTupleHeaderSetCmin(htup, FirstCommandId); htup->t_ctid = xlrec->target.tid; - offnum = PageAddItem(page, (Item) htup, newlen, offnum, true); + offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true); if (offnum == InvalidOffsetNumber) elog(PANIC, "heap_insert_redo: failed to add tuple"); PageSetLSN(page, lsn); @@ -3746,10 +4199,10 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record) } /* - * Handles UPDATE & MOVE + * Handles UPDATE, HOT_UPDATE & MOVE */ static void -heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move) +heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move, bool hot_update) { xl_heap_update *xlrec = (xl_heap_update *) XLogRecGetData(record); Relation reln = XLogOpenRelation(xlrec->target.node); @@ -3808,6 +4261,7 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move) HEAP_XMIN_INVALID | HEAP_MOVED_IN); htup->t_infomask |= HEAP_MOVED_OFF; + HeapTupleHeaderClearHotUpdated(htup); HeapTupleHeaderSetXvac(htup, record->xl_xid); /* Make sure there is no forward chain link in t_ctid */ htup->t_ctid = xlrec->target.tid; @@ -3819,12 +4273,19 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move) HEAP_XMAX_IS_MULTI | HEAP_IS_LOCKED | HEAP_MOVED); + if (hot_update) + HeapTupleHeaderSetHotUpdated(htup); + else + HeapTupleHeaderClearHotUpdated(htup); HeapTupleHeaderSetXmax(htup, record->xl_xid); HeapTupleHeaderSetCmax(htup, FirstCommandId, false); /* Set forward chain link in t_ctid */ htup->t_ctid = xlrec->newtid; } + /* Mark the page as a candidate for pruning */ + PageSetPrunable(page); + /* * this test is ugly, but necessary to avoid thinking that insert change * is already applied @@ -3914,7 +4375,7 @@ newsame:; /* Make sure there is no forward chain link in t_ctid */ htup->t_ctid = xlrec->newtid; - offnum = PageAddItem(page, (Item) htup, newlen, offnum, true); + offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true); if (offnum == InvalidOffsetNumber) elog(PANIC, "heap_update_redo: failed to add tuple"); PageSetLSN(page, lsn); @@ -3971,6 +4432,7 @@ heap_xlog_lock(XLogRecPtr lsn, XLogRecord *record) htup->t_infomask |= HEAP_XMAX_SHARED_LOCK; else htup->t_infomask |= HEAP_XMAX_EXCL_LOCK; + HeapTupleHeaderClearHotUpdated(htup); HeapTupleHeaderSetXmax(htup, xlrec->locking_xid); HeapTupleHeaderSetCmax(htup, FirstCommandId, false); /* Make sure there is no forward chain link in t_ctid */ @@ -4039,25 +4501,35 @@ heap_redo(XLogRecPtr lsn, XLogRecord *record) { uint8 info = record->xl_info & ~XLR_INFO_MASK; - info &= XLOG_HEAP_OPMASK; - if (info == XLOG_HEAP_INSERT) - heap_xlog_insert(lsn, record); - else if (info == XLOG_HEAP_DELETE) - heap_xlog_delete(lsn, record); - else if (info == XLOG_HEAP_UPDATE) - heap_xlog_update(lsn, record, false); - else if (info == XLOG_HEAP_MOVE) - heap_xlog_update(lsn, record, true); - else if (info == XLOG_HEAP_CLEAN) - heap_xlog_clean(lsn, record); - else if (info == 
XLOG_HEAP_NEWPAGE) - heap_xlog_newpage(lsn, record); - else if (info == XLOG_HEAP_LOCK) - heap_xlog_lock(lsn, record); - else if (info == XLOG_HEAP_INPLACE) - heap_xlog_inplace(lsn, record); - else - elog(PANIC, "heap_redo: unknown op code %u", info); + switch (info & XLOG_HEAP_OPMASK) + { + case XLOG_HEAP_INSERT: + heap_xlog_insert(lsn, record); + break; + case XLOG_HEAP_DELETE: + heap_xlog_delete(lsn, record); + break; + case XLOG_HEAP_UPDATE: + heap_xlog_update(lsn, record, false, false); + break; + case XLOG_HEAP_MOVE: + heap_xlog_update(lsn, record, true, false); + break; + case XLOG_HEAP_HOT_UPDATE: + heap_xlog_update(lsn, record, false, true); + break; + case XLOG_HEAP_NEWPAGE: + heap_xlog_newpage(lsn, record); + break; + case XLOG_HEAP_LOCK: + heap_xlog_lock(lsn, record); + break; + case XLOG_HEAP_INPLACE: + heap_xlog_inplace(lsn, record); + break; + default: + elog(PANIC, "heap_redo: unknown op code %u", info); + } } void @@ -4065,11 +4537,20 @@ heap2_redo(XLogRecPtr lsn, XLogRecord *record) { uint8 info = record->xl_info & ~XLR_INFO_MASK; - info &= XLOG_HEAP_OPMASK; - if (info == XLOG_HEAP2_FREEZE) - heap_xlog_freeze(lsn, record); - else - elog(PANIC, "heap2_redo: unknown op code %u", info); + switch (info & XLOG_HEAP_OPMASK) + { + case XLOG_HEAP2_FREEZE: + heap_xlog_freeze(lsn, record); + break; + case XLOG_HEAP2_CLEAN: + heap_xlog_clean(lsn, record, false); + break; + case XLOG_HEAP2_CLEAN_MOVE: + heap_xlog_clean(lsn, record, true); + break; + default: + elog(PANIC, "heap2_redo: unknown op code %u", info); + } } static void @@ -4130,13 +4611,18 @@ heap_desc(StringInfo buf, uint8 xl_info, char *rec) ItemPointerGetBlockNumber(&(xlrec->newtid)), ItemPointerGetOffsetNumber(&(xlrec->newtid))); } - else if (info == XLOG_HEAP_CLEAN) + else if (info == XLOG_HEAP_HOT_UPDATE) { - xl_heap_clean *xlrec = (xl_heap_clean *) rec; + xl_heap_update *xlrec = (xl_heap_update *) rec; - appendStringInfo(buf, "clean: rel %u/%u/%u; blk %u", - xlrec->node.spcNode, xlrec->node.dbNode, - xlrec->node.relNode, xlrec->block); + if (xl_info & XLOG_HEAP_INIT_PAGE) /* can this case happen? */ + appendStringInfo(buf, "hot_update(init): "); + else + appendStringInfo(buf, "hot_update: "); + out_target(buf, &(xlrec->target)); + appendStringInfo(buf, "; new %u/%u", + ItemPointerGetBlockNumber(&(xlrec->newtid)), + ItemPointerGetOffsetNumber(&(xlrec->newtid))); } else if (info == XLOG_HEAP_NEWPAGE) { @@ -4187,6 +4673,22 @@ heap2_desc(StringInfo buf, uint8 xl_info, char *rec) xlrec->node.relNode, xlrec->block, xlrec->cutoff_xid); } + else if (info == XLOG_HEAP2_CLEAN) + { + xl_heap_clean *xlrec = (xl_heap_clean *) rec; + + appendStringInfo(buf, "clean: rel %u/%u/%u; blk %u", + xlrec->node.spcNode, xlrec->node.dbNode, + xlrec->node.relNode, xlrec->block); + } + else if (info == XLOG_HEAP2_CLEAN_MOVE) + { + xl_heap_clean *xlrec = (xl_heap_clean *) rec; + + appendStringInfo(buf, "clean_move: rel %u/%u/%u; blk %u", + xlrec->node.spcNode, xlrec->node.dbNode, + xlrec->node.relNode, xlrec->block); + } else appendStringInfo(buf, "UNKNOWN"); } |
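Finally, a decoding sketch of the new heap-clean WAL payload as implied by log_heap_clean() and heap_xlog_clean() above: after the fixed header (which now carries nredirected and ndead), the record holds 2*nredirected offsets forming (from, to) redirect pairs, then ndead now-dead offsets, and every remaining offset up to the record length is a now-unused item; CLEAN_MOVE records replay each pair as a physical move rather than a redirect. ToyHeapClean and the function names here are illustrative stand-ins, not the actual xl_heap_clean declaration.

#include <stdint.h>
#include <stdio.h>

typedef uint16_t OffsetNumber;

typedef struct ToyHeapClean
{
	/* ... relfilenode and block number come first in the real struct ... */
	uint16_t	nredirected;	/* number of (from, to) redirect pairs */
	uint16_t	ndead;			/* number of now-dead line pointers */
	/* payload: OffsetNumber redirected[2 * nredirected], nowdead[ndead],
	 * then nowunused[] running to the end of the record */
} ToyHeapClean;

/* Consume the payload in the same order as the replay loops in heap_xlog_clean(). */
static void
toy_decode_clean(const ToyHeapClean *rec,
				 const OffsetNumber *p, const OffsetNumber *end)
{
	for (int i = 0; i < rec->nredirected; i++)
	{
		unsigned	from = *p++;
		unsigned	to = *p++;

		printf("redirect %u -> %u\n", from, to);	/* physical move if CLEAN_MOVE */
	}
	for (int i = 0; i < rec->ndead; i++)
		printf("dead %u\n", (unsigned) *p++);
	while (p < end)					/* the rest of the record is now-unused items */
		printf("unused %u\n", (unsigned) *p++);
}

int
main(void)
{
	ToyHeapClean rec = {1, 2};
	OffsetNumber payload[] = {5, 9, 3, 4, 7};	/* one pair, two dead, one unused */

	toy_decode_clean(&rec, payload, payload + 5);
	return 0;
}

Deriving the unused array's length from the record length, rather than storing a count, mirrors how the redo code computes its end pointer from xl_len.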