Diffstat (limited to 'src/backend/access/heap/heapam.c')
 src/backend/access/heap/heapam.c | 640 ++++++++++++++++++++++++++++++-----
 1 file changed, 571 insertions(+), 69 deletions(-)
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 09a70d813f7..d5a2f9a43d1 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *	  $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.240 2007/09/12 22:10:26 tgl Exp $
+ *	  $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.241 2007/09/20 17:56:30 tgl Exp $
  *
  *
  * INTERFACE ROUTINES
@@ -52,6 +52,7 @@
 #include "pgstat.h"
 #include "storage/procarray.h"
 #include "storage/smgr.h"
+#include "utils/datum.h"
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
 #include "utils/relcache.h"
@@ -64,6 +65,8 @@ static HeapScanDesc heap_beginscan_internal(Relation relation,
 				bool is_bitmapscan);
 static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
 		ItemPointerData from, Buffer newbuf, HeapTuple newtup, bool move);
+static bool HeapSatisfiesHOTUpdate(Relation relation, Bitmapset *hot_attrs,
+					   HeapTuple oldtup, HeapTuple newtup);
 
 
 /* ----------------------------------------------------------------
@@ -184,6 +187,11 @@ heapgetpage(HeapScanDesc scan, BlockNumber page)
 	snapshot = scan->rs_snapshot;
 
 	/*
+	 * Prune and repair fragmentation for the whole page, if possible.
+	 */
+	heap_page_prune_opt(scan->rs_rd, buffer, RecentGlobalXmin);
+
+	/*
 	 * We must hold share lock on the buffer content while examining tuple
 	 * visibility.	Afterwards, however, the tuples we have found to be
 	 * visible are guaranteed good as long as we hold the buffer pin.
@@ -316,7 +324,7 @@ heapgettup(HeapScanDesc scan,
 			 * forward scanners.
 			 */
 			scan->rs_syncscan = false;
-			/* start from last page of the scan */
+			/* start from last page of the scan */
 			if (scan->rs_startblock > 0)
 				page = scan->rs_startblock - 1;
 			else
@@ -368,6 +376,7 @@ heapgettup(HeapScanDesc scan,
 		dp = (Page) BufferGetPage(scan->rs_cbuf);
 		lineoff = ItemPointerGetOffsetNumber(&(tuple->t_self));
 		lpp = PageGetItemId(dp, lineoff);
+		Assert(ItemIdIsNormal(lpp));
 		tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
 		tuple->t_len = ItemIdGetLength(lpp);
 
@@ -583,7 +592,7 @@ heapgettup_pagemode(HeapScanDesc scan,
 			 * forward scanners.
 			 */
 			scan->rs_syncscan = false;
-			/* start from last page of the scan */
+			/* start from last page of the scan */
 			if (scan->rs_startblock > 0)
 				page = scan->rs_startblock - 1;
 			else
@@ -632,6 +641,7 @@ heapgettup_pagemode(HeapScanDesc scan,
 		dp = (Page) BufferGetPage(scan->rs_cbuf);
 		lineoff = ItemPointerGetOffsetNumber(&(tuple->t_self));
 		lpp = PageGetItemId(dp, lineoff);
+		Assert(ItemIdIsNormal(lpp));
 		tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
 		tuple->t_len = ItemIdGetLength(lpp);
 
@@ -1246,6 +1256,9 @@ heap_getnext(HeapScanDesc scan, ScanDirection direction)
  * for statistical purposes.  (This could be the heap rel itself, an
  * associated index, or NULL to not count the fetch at all.)
  *
+ * heap_fetch does not follow HOT chains: only the exact TID requested will
+ * be fetched.
+ *
  * It is somewhat inconsistent that we ereport() on invalid block number but
  * return false on invalid item number.  There are a couple of reasons though.
  * One is that the caller can relatively easily check the block number for
@@ -1390,6 +1403,143 @@ heap_release_fetch(Relation relation,
 }
 
 /*
+ * heap_hot_search_buffer	- search HOT chain for tuple satisfying snapshot
+ *
+ * On entry, *tid is the TID of a tuple (either a simple tuple, or the root
+ * of a HOT chain), and buffer is the buffer holding this tuple.  We search
+ * for the first chain member satisfying the given snapshot.  If one is
+ * found, we update *tid to reference that tuple's offset number, and
+ * return TRUE.  If no match, return FALSE without modifying *tid.
+ *
+ * If all_dead is not NULL, we check non-visible tuples to see if they are
+ * globally dead; *all_dead is set TRUE if all members of the HOT chain
+ * are vacuumable, FALSE if not.
+ *
+ * Unlike heap_fetch, the caller must already have pin and (at least) share
+ * lock on the buffer; it is still pinned/locked at exit.  Also unlike
+ * heap_fetch, we do not report any pgstats count; caller may do so if wanted.
+ */
+bool
+heap_hot_search_buffer(ItemPointer tid, Buffer buffer, Snapshot snapshot,
+					   bool *all_dead)
+{
+	Page		dp = (Page) BufferGetPage(buffer);
+	TransactionId prev_xmax = InvalidTransactionId;
+	OffsetNumber offnum;
+	bool		at_chain_start;
+
+	if (all_dead)
+		*all_dead = true;
+
+	Assert(ItemPointerGetBlockNumber(tid) == BufferGetBlockNumber(buffer));
+	offnum = ItemPointerGetOffsetNumber(tid);
+	at_chain_start = true;
+
+	/* Scan through possible multiple members of HOT-chain */
+	for (;;)
+	{
+		ItemId		lp;
+		HeapTupleData heapTuple;
+
+		/* check for bogus TID */
+		if (offnum < FirstOffsetNumber || offnum > PageGetMaxOffsetNumber(dp))
+			break;
+
+		lp = PageGetItemId(dp, offnum);
+
+		/* check for unused, dead, or redirected items */
+		if (!ItemIdIsNormal(lp))
+		{
+			/* We should only see a redirect at start of chain */
+			if (ItemIdIsRedirected(lp) && at_chain_start)
+			{
+				/* Follow the redirect */
+				offnum = ItemIdGetRedirect(lp);
+				at_chain_start = false;
+				continue;
+			}
+			/* else must be end of chain */
+			break;
+		}
+
+		heapTuple.t_data = (HeapTupleHeader) PageGetItem(dp, lp);
+		heapTuple.t_len = ItemIdGetLength(lp);
+
+		/*
+		 * Shouldn't see a HEAP_ONLY tuple at chain start.
+		 */
+		if (at_chain_start && HeapTupleIsHeapOnly(&heapTuple))
+			break;
+
+		/*
+		 * The xmin should match the previous xmax value, else chain is
+		 * broken.
+		 */
+		if (TransactionIdIsValid(prev_xmax) &&
+			!TransactionIdEquals(prev_xmax,
+								 HeapTupleHeaderGetXmin(heapTuple.t_data)))
+			break;
+
+		/* If it's visible per the snapshot, we must return it */
+		if (HeapTupleSatisfiesVisibility(&heapTuple, snapshot, buffer))
+		{
+			ItemPointerSetOffsetNumber(tid, offnum);
+			if (all_dead)
+				*all_dead = false;
+			return true;
+		}
+
+		/*
+		 * If we can't see it, maybe no one else can either.  At caller
+		 * request, check whether all chain members are dead to all
+		 * transactions.
+		 */
+		if (all_dead && *all_dead &&
+			HeapTupleSatisfiesVacuum(heapTuple.t_data, RecentGlobalXmin,
+									 buffer) != HEAPTUPLE_DEAD)
+			*all_dead = false;
+
+		/*
+		 * Check to see if HOT chain continues past this tuple; if so fetch
+		 * the next offnum and loop around.
+		 */
+		if (HeapTupleIsHotUpdated(&heapTuple))
+		{
+			Assert(ItemPointerGetBlockNumber(&heapTuple.t_data->t_ctid) ==
+				   ItemPointerGetBlockNumber(tid));
+			offnum = ItemPointerGetOffsetNumber(&heapTuple.t_data->t_ctid);
+			at_chain_start = false;
+			prev_xmax = HeapTupleHeaderGetXmax(heapTuple.t_data);
+		}
+		else
+			break;				/* end of chain */
+	}
+
+	return false;
+}
+
+/*
+ * heap_hot_search		- search HOT chain for tuple satisfying snapshot
+ *
+ * This has the same API as heap_hot_search_buffer, except that the caller
+ * does not provide the buffer containing the page, rather we access it
+ * locally.
+ */
+bool
+heap_hot_search(ItemPointer tid, Relation relation, Snapshot snapshot,
+				bool *all_dead)
+{
+	bool		result;
+	Buffer		buffer;
+
+	buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
+	LockBuffer(buffer, BUFFER_LOCK_SHARE);
+	result = heap_hot_search_buffer(tid, buffer, snapshot, all_dead);
+	LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+	ReleaseBuffer(buffer);
+	return result;
+}
+
+/*
  * heap_get_latest_tid -  get the latest tid of a specified tuple
  *
  * Actually, this gets the latest version that is visible according to
@@ -1594,6 +1744,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 	}
 
 	tup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
+	tup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK);
 	tup->t_data->t_infomask |= HEAP_XMAX_INVALID;
 	HeapTupleHeaderSetXmin(tup->t_data, xid);
 	HeapTupleHeaderSetCmin(tup->t_data, cid);
@@ -1628,6 +1779,17 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 
 	RelationPutHeapTuple(relation, buffer, heaptup);
 
+	/*
+	 * XXX Should we set PageSetPrunable on this page ?
+	 *
+	 * The inserting transaction may eventually abort thus making this tuple
+	 * DEAD and hence available for pruning. Though we don't want to optimize
+	 * for aborts, if no other tuple in this page is UPDATEd/DELETEd, the
+	 * aborted tuple will never be pruned until next vacuum is triggered.
+	 *
+	 * If you do add PageSetPrunable here, add it in heap_xlog_insert too.
+	 */
+
 	MarkBufferDirty(buffer);
 
 	/* XLOG stuff */
@@ -1904,12 +2066,21 @@ l1:
 
 	START_CRIT_SECTION();
 
+	/*
+	 * If this transaction commits, the tuple will become DEAD sooner or
+	 * later.  Set hint bit that this page is a candidate for pruning.  If
+	 * the transaction finally aborts, the subsequent page pruning will be
+	 * a no-op and the hint will be cleared.
+	 */
+	PageSetPrunable((Page) dp);
+
 	/* store transaction information of xact deleting the tuple */
 	tp.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
 							   HEAP_XMAX_INVALID |
 							   HEAP_XMAX_IS_MULTI |
 							   HEAP_IS_LOCKED |
 							   HEAP_MOVED);
+	HeapTupleHeaderClearHotUpdated(tp.t_data);
 	HeapTupleHeaderSetXmax(tp.t_data, xid);
 	HeapTupleHeaderSetCmax(tp.t_data, cid, iscombo);
 	/* Make sure there is no forward chain link in t_ctid */
@@ -2045,7 +2216,8 @@ simple_heap_delete(Relation relation, ItemPointer tid)
  *
  * On success, the header fields of *newtup are updated to match the new
  * stored tuple; in particular, newtup->t_self is set to the TID where the
- * new tuple was inserted.	However, any TOAST changes in the new tuple's
+ * new tuple was inserted, and its HEAP_ONLY_TUPLE flag is set iff a HOT
+ * update was done.  However, any TOAST changes in the new tuple's
  * data are not reflected into *newtup.
  *
  * In the failure cases, the routine returns the tuple's t_ctid and t_xmax.
@@ -2060,6 +2232,7 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
 {
 	HTSU_Result result;
 	TransactionId xid = GetCurrentTransactionId();
+	Bitmapset  *hot_attrs;
 	ItemId		lp;
 	HeapTupleData oldtup;
 	HeapTuple	heaptup;
@@ -2072,9 +2245,24 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
 				pagefree;
 	bool		have_tuple_lock = false;
 	bool		iscombo;
+	bool		use_hot_update = false;
 
 	Assert(ItemPointerIsValid(otid));
 
+	/*
+	 * Fetch the list of attributes to be checked for HOT update.  This is
+	 * wasted effort if we fail to update or have to put the new tuple on
+	 * a different page.  But we must compute the list before obtaining
+	 * buffer lock --- in the worst case, if we are doing an update on one
+	 * of the relevant system catalogs, we could deadlock if we try to
+	 * fetch the list later.  In any case, the relcache caches the data
+	 * so this is usually pretty cheap.
+	 *
+	 * Note that we get a copy here, so we need not worry about relcache
+	 * flush happening midway through.
+	 */
+	hot_attrs = RelationGetIndexAttrBitmap(relation);
+
 	buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(otid));
 	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 
@@ -2208,6 +2396,7 @@ l2:
 		UnlockReleaseBuffer(buffer);
 		if (have_tuple_lock)
 			UnlockTuple(relation, &(oldtup.t_self), ExclusiveLock);
+		bms_free(hot_attrs);
 		return result;
 	}
 
@@ -2227,6 +2416,7 @@ l2:
 	}
 
 	newtup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
+	newtup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK);
 	newtup->t_data->t_infomask |= (HEAP_XMAX_INVALID | HEAP_UPDATED);
 	HeapTupleHeaderSetXmin(newtup->t_data, xid);
 	HeapTupleHeaderSetCmin(newtup->t_data, cid);
@@ -2261,17 +2451,20 @@ l2:
 						  HeapTupleHasExternal(newtup) ||
 						  newtup->t_len > TOAST_TUPLE_THRESHOLD);
 
-	pagefree = PageGetFreeSpace((Page) dp);
+	pagefree = PageGetHeapFreeSpace((Page) dp);
 
 	newtupsize = MAXALIGN(newtup->t_len);
 
 	if (need_toast || newtupsize > pagefree)
 	{
+		/* Clear obsolete visibility flags ... */
 		oldtup.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
 									   HEAP_XMAX_INVALID |
 									   HEAP_XMAX_IS_MULTI |
 									   HEAP_IS_LOCKED |
 									   HEAP_MOVED);
+		HeapTupleClearHotUpdated(&oldtup);
+		/* ... and store info about transaction updating this tuple */
 		HeapTupleHeaderSetXmax(oldtup.t_data, xid);
 		HeapTupleHeaderSetCmax(oldtup.t_data, cid, iscombo);
 		/* temporarily make it look not-updated */
@@ -2324,7 +2517,7 @@ l2:
 			/* Re-acquire the lock on the old tuple's page. */
 			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 			/* Re-check using the up-to-date free space */
-			pagefree = PageGetFreeSpace((Page) dp);
+			pagefree = PageGetHeapFreeSpace((Page) dp);
 			if (newtupsize > pagefree)
 			{
 				/*
@@ -2357,18 +2550,66 @@ l2:
 	 * one pin is held.
 	 */
 
+	if (newbuf == buffer)
+	{
+		/*
+		 * Since the new tuple is going into the same page, we might be able
+		 * to do a HOT update.  Check if any of the index columns have been
+		 * changed.  If not, then HOT update is possible.
+		 */
+		if (HeapSatisfiesHOTUpdate(relation, hot_attrs, &oldtup, heaptup))
+			use_hot_update = true;
+	}
+	else
+	{
+		/* Set a hint that the old page could use prune/defrag */
+		PageSetFull(dp);
+	}
+
 	/* NO EREPORT(ERROR) from here till changes are logged */
 	START_CRIT_SECTION();
 
+	/*
+	 * If this transaction commits, the old tuple will become DEAD sooner or
+	 * later.  Set hint bit that this page is a candidate for pruning.  If
+	 * the transaction finally aborts, the subsequent page pruning will be
+	 * a no-op and the hint will be cleared.
+	 *
+	 * XXX Should we set hint on newbuf as well?  If the transaction
+	 * aborts, there would be a prunable tuple in the newbuf; but for now
+	 * we choose not to optimize for aborts.  Note that heap_xlog_update
+	 * must be kept in sync if this changes.
+	 */
+	PageSetPrunable(dp);
+
+	if (use_hot_update)
+	{
+		/* Mark the old tuple as HOT-updated */
+		HeapTupleSetHotUpdated(&oldtup);
+		/* And mark the new tuple as heap-only */
+		HeapTupleSetHeapOnly(heaptup);
+		/* Mark the caller's copy too, in case different from heaptup */
+		HeapTupleSetHeapOnly(newtup);
+	}
+	else
+	{
+		/* Make sure tuples are correctly marked as not-HOT */
+		HeapTupleClearHotUpdated(&oldtup);
+		HeapTupleClearHeapOnly(heaptup);
+		HeapTupleClearHeapOnly(newtup);
+	}
+
 	RelationPutHeapTuple(relation, newbuf, heaptup); /* insert new tuple */
 
 	if (!already_marked)
 	{
+		/* Clear obsolete visibility flags ... */
 		oldtup.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
 									   HEAP_XMAX_INVALID |
 									   HEAP_XMAX_IS_MULTI |
 									   HEAP_IS_LOCKED |
 									   HEAP_MOVED);
+		/* ... and store info about transaction updating this tuple */
 		HeapTupleHeaderSetXmax(oldtup.t_data, xid);
 		HeapTupleHeaderSetCmax(oldtup.t_data, cid, iscombo);
 	}
@@ -2427,7 +2668,7 @@ l2:
 	if (have_tuple_lock)
 		UnlockTuple(relation, &(oldtup.t_self), ExclusiveLock);
 
-	pgstat_count_heap_update(relation);
+	pgstat_count_heap_update(relation, use_hot_update);
 
 	/*
 	 * If heaptup is a private copy, release it.  Don't forget to copy t_self
@@ -2439,10 +2680,120 @@ l2:
 		heap_freetuple(heaptup);
 	}
 
+	bms_free(hot_attrs);
+
 	return HeapTupleMayBeUpdated;
 }
 
 /*
+ * Check if the specified attribute's value is same in both given tuples.
+ * Subroutine for HeapSatisfiesHOTUpdate.
+ */
+static bool
+heap_tuple_attr_equals(TupleDesc tupdesc, int attrnum,
+					   HeapTuple tup1, HeapTuple tup2)
+{
+	Datum		value1,
+				value2;
+	bool		isnull1,
+				isnull2;
+	Form_pg_attribute att;
+
+	/*
+	 * If it's a whole-tuple reference, say "not equal".  It's not really
+	 * worth supporting this case, since it could only succeed after a
+	 * no-op update, which is hardly a case worth optimizing for.
+	 */
+	if (attrnum == 0)
+		return false;
+
+	/*
+	 * Likewise, automatically say "not equal" for any system attribute
+	 * other than OID and tableOID; we cannot expect these to be consistent
+	 * in a HOT chain, or even to be set correctly yet in the new tuple.
+	 */
+	if (attrnum < 0)
+	{
+		if (attrnum != ObjectIdAttributeNumber &&
+			attrnum != TableOidAttributeNumber)
+			return false;
+	}
+
+	/*
+	 * Extract the corresponding values.  XXX this is pretty inefficient
+	 * if there are many indexed columns.  Should HeapSatisfiesHOTUpdate
+	 * do a single heap_deform_tuple call on each tuple, instead?  But
+	 * that doesn't work for system columns ...
+	 */
+	value1 = heap_getattr(tup1, attrnum, tupdesc, &isnull1);
+	value2 = heap_getattr(tup2, attrnum, tupdesc, &isnull2);
+
+	/*
+	 * If one value is NULL and other is not, then they are certainly
+	 * not equal
+	 */
+	if (isnull1 != isnull2)
+		return false;
+
+	/*
+	 * If both are NULL, they can be considered equal.
+	 */
+	if (isnull1)
+		return true;
+
+	/*
+	 * We do simple binary comparison of the two datums.  This may be overly
+	 * strict because there can be multiple binary representations for the
+	 * same logical value.  But we should be OK as long as there are no false
+	 * positives.  Using a type-specific equality operator is messy because
+	 * there could be multiple notions of equality in different operator
+	 * classes; furthermore, we cannot safely invoke user-defined functions
+	 * while holding exclusive buffer lock.
+	 */
+	if (attrnum <= 0)
+	{
+		/* The only allowed system columns are OIDs, so do this */
+		return (DatumGetObjectId(value1) == DatumGetObjectId(value2));
+	}
+	else
+	{
+		Assert(attrnum <= tupdesc->natts);
+		att = tupdesc->attrs[attrnum - 1];
+		return datumIsEqual(value1, value2, att->attbyval, att->attlen);
+	}
+}
+
+/*
+ * Check if the old and new tuples represent a HOT-safe update.  To be able
+ * to do a HOT update, we must not have changed any columns used in index
+ * definitions.
+ *
+ * The set of attributes to be checked is passed in (we dare not try to
+ * compute it while holding exclusive buffer lock...)  NOTE that hot_attrs
+ * is destructively modified!  That is OK since this is invoked at most once
+ * by heap_update().
+ *
+ * Returns true if safe to do HOT update.
+ */
+static bool
+HeapSatisfiesHOTUpdate(Relation relation, Bitmapset *hot_attrs,
+					   HeapTuple oldtup, HeapTuple newtup)
+{
+	int			attrnum;
+
+	while ((attrnum = bms_first_member(hot_attrs)) >= 0)
+	{
+		/* Adjust for system attributes */
+		attrnum += FirstLowInvalidHeapAttributeNumber;
+
+		/* If the attribute value has changed, we can't do HOT update */
+		if (!heap_tuple_attr_equals(RelationGetDescr(relation), attrnum,
+									oldtup, newtup))
+			return false;
+	}
+
+	return true;
+}
+
+/*
  * simple_heap_update - replace a tuple
  *
  * This routine may be used to update a tuple when concurrent updates of
@@ -2865,6 +3216,7 @@ l3:
 	 * avoids possibly generating a useless combo CID.
	 */
 	tuple->t_data->t_infomask = new_infomask;
+	HeapTupleHeaderClearHotUpdated(tuple->t_data);
 	HeapTupleHeaderSetXmax(tuple->t_data, xid);
 	/* Make sure there is no forward chain link in t_ctid */
 	tuple->t_data->t_ctid = *tid;
@@ -3110,6 +3462,7 @@ recheck_xmax:
 			 */
 			tuple->t_infomask &= ~HEAP_XMAX_COMMITTED;
 			tuple->t_infomask |= HEAP_XMAX_INVALID;
+			HeapTupleHeaderClearHotUpdated(tuple);
 			changed = true;
 		}
 	}
@@ -3245,21 +3598,29 @@ heap_restrpos(HeapScanDesc scan)
  * Perform XLogInsert for a heap-clean operation.  Caller must already
  * have modified the buffer and marked it dirty.
  *
- * Note: for historical reasons, the entries in the unused[] array should
- * be zero-based tuple indexes, not one-based.
+ * Note: prior to Postgres 8.3, the entries in the nowunused[] array were
+ * zero-based tuple indexes.  Now they are one-based like other uses
+ * of OffsetNumber.
  */
 XLogRecPtr
-log_heap_clean(Relation reln, Buffer buffer, OffsetNumber *unused, int uncnt)
+log_heap_clean(Relation reln, Buffer buffer,
+			   OffsetNumber *redirected, int nredirected,
+			   OffsetNumber *nowdead, int ndead,
+			   OffsetNumber *nowunused, int nunused,
+			   bool redirect_move)
 {
 	xl_heap_clean xlrec;
+	uint8		info;
 	XLogRecPtr	recptr;
-	XLogRecData rdata[2];
+	XLogRecData rdata[4];
 
 	/* Caller should not call me on a temp relation */
 	Assert(!reln->rd_istemp);
 
 	xlrec.node = reln->rd_node;
 	xlrec.block = BufferGetBlockNumber(buffer);
+	xlrec.nredirected = nredirected;
+	xlrec.ndead = ndead;
 
 	rdata[0].data = (char *) &xlrec;
 	rdata[0].len = SizeOfHeapClean;
@@ -3267,14 +3628,17 @@ log_heap_clean(Relation reln, Buffer buffer, OffsetNumber *unused, int uncnt)
 	rdata[0].next = &(rdata[1]);
 
 	/*
-	 * The unused-offsets array is not actually in the buffer, but pretend
-	 * that it is.	When XLogInsert stores the whole buffer, the offsets array
-	 * need not be stored too.
+	 * The OffsetNumber arrays are not actually in the buffer, but we pretend
+	 * that they are.  When XLogInsert stores the whole buffer, the offset
+	 * arrays need not be stored too.  Note that even if all three arrays
+	 * are empty, we want to expose the buffer as a candidate for whole-page
+	 * storage, since this record type implies a defragmentation operation
+	 * even if no item pointers changed state.
 	 */
-	if (uncnt > 0)
+	if (nredirected > 0)
 	{
-		rdata[1].data = (char *) unused;
-		rdata[1].len = uncnt * sizeof(OffsetNumber);
+		rdata[1].data = (char *) redirected;
+		rdata[1].len = nredirected * sizeof(OffsetNumber) * 2;
 	}
 	else
 	{
@@ -3283,9 +3647,38 @@ log_heap_clean(Relation reln, Buffer buffer, OffsetNumber *unused, int uncnt)
 		rdata[1].data = NULL;
 		rdata[1].len = 0;
 	}
 	rdata[1].buffer = buffer;
 	rdata[1].buffer_std = true;
-	rdata[1].next = NULL;
+	rdata[1].next = &(rdata[2]);
+
+	if (ndead > 0)
+	{
+		rdata[2].data = (char *) nowdead;
+		rdata[2].len = ndead * sizeof(OffsetNumber);
+	}
+	else
+	{
+		rdata[2].data = NULL;
+		rdata[2].len = 0;
+	}
+	rdata[2].buffer = buffer;
+	rdata[2].buffer_std = true;
+	rdata[2].next = &(rdata[3]);
+
+	if (nunused > 0)
+	{
+		rdata[3].data = (char *) nowunused;
+		rdata[3].len = nunused * sizeof(OffsetNumber);
+	}
+	else
+	{
+		rdata[3].data = NULL;
+		rdata[3].len = 0;
+	}
+	rdata[3].buffer = buffer;
+	rdata[3].buffer_std = true;
+	rdata[3].next = NULL;
 
-	recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_CLEAN, rdata);
+	info = redirect_move ? XLOG_HEAP2_CLEAN_MOVE : XLOG_HEAP2_CLEAN;
+	recptr = XLogInsert(RM_HEAP2_ID, info, rdata);
 
 	return recptr;
 }
@@ -3293,8 +3686,6 @@ log_heap_clean(Relation reln, Buffer buffer, OffsetNumber *unused, int uncnt)
 /*
  * Perform XLogInsert for a heap-freeze operation.  Caller must already
  * have modified the buffer and marked it dirty.
- *
- * Unlike log_heap_clean(), the offsets[] entries are one-based.
 */
 XLogRecPtr
 log_heap_freeze(Relation reln, Buffer buffer,
@@ -3363,17 +3754,28 @@ log_heap_update(Relation reln, Buffer oldbuf, ItemPointerData from,
 	}			xlhdr;
 	int			hsize = SizeOfHeapHeader;
 	xl_heap_update xlrec;
+	uint8		info;
 	XLogRecPtr	recptr;
 	XLogRecData rdata[4];
 	Page		page = BufferGetPage(newbuf);
-	uint8		info = (move) ? XLOG_HEAP_MOVE : XLOG_HEAP_UPDATE;
 
 	/* Caller should not call me on a temp relation */
 	Assert(!reln->rd_istemp);
 
+	if (move)
+	{
+		Assert(!HeapTupleIsHeapOnly(newtup));
+		info = XLOG_HEAP_MOVE;
+	}
+	else if (HeapTupleIsHeapOnly(newtup))
+		info = XLOG_HEAP_HOT_UPDATE;
+	else
+		info = XLOG_HEAP_UPDATE;
+
 	xlrec.target.node = reln->rd_node;
 	xlrec.target.tid = from;
 	xlrec.newtid = newtup->t_self;
+
 	rdata[0].data = (char *) &xlrec;
 	rdata[0].len = SizeOfHeapUpdate;
 	rdata[0].buffer = InvalidBuffer;
@@ -3489,13 +3891,21 @@ log_newpage(RelFileNode *rnode, BlockNumber blkno, Page page)
 	return recptr;
 }
 
+/*
+ * Handles CLEAN and CLEAN_MOVE record types
+ */
 static void
-heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record)
+heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record, bool clean_move)
 {
 	xl_heap_clean *xlrec = (xl_heap_clean *) XLogRecGetData(record);
 	Relation	reln;
 	Buffer		buffer;
 	Page		page;
+	OffsetNumber *offnum;
+	OffsetNumber *end;
+	int			nredirected;
+	int			ndead;
+	int			i;
 
 	if (record->xl_info & XLR_BKP_BLOCK_1)
 		return;
@@ -3512,25 +3922,63 @@ heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record)
 		return;
 	}
 
-	if (record->xl_len > SizeOfHeapClean)
-	{
-		OffsetNumber *unused;
-		OffsetNumber *unend;
-		ItemId		lp;
+	nredirected = xlrec->nredirected;
+	ndead = xlrec->ndead;
+	offnum = (OffsetNumber *) ((char *) xlrec + SizeOfHeapClean);
+	end = (OffsetNumber *) ((char *) xlrec + record->xl_len);
 
-		unused = (OffsetNumber *) ((char *) xlrec + SizeOfHeapClean);
-		unend = (OffsetNumber *) ((char *) xlrec + record->xl_len);
+	/* Update all redirected or moved line pointers */
+	for (i = 0; i < nredirected; i++)
+	{
+		OffsetNumber fromoff = *offnum++;
+		OffsetNumber tooff = *offnum++;
+		ItemId		fromlp = PageGetItemId(page, fromoff);
 
-		while (unused < unend)
+		if (clean_move)
 		{
-			/* unused[] entries are zero-based */
-			lp = PageGetItemId(page, *unused + 1);
-			ItemIdSetUnused(lp);
-			unused++;
+			/* Physically move the "to" item to the "from" slot */
+			ItemId		tolp = PageGetItemId(page, tooff);
+			HeapTupleHeader htup;
+
+			*fromlp = *tolp;
+			ItemIdSetUnused(tolp);
+
+			/* We also have to clear the tuple's heap-only bit */
+			Assert(ItemIdIsNormal(fromlp));
+			htup = (HeapTupleHeader) PageGetItem(page, fromlp);
+			Assert(HeapTupleHeaderIsHeapOnly(htup));
+			HeapTupleHeaderClearHeapOnly(htup);
+		}
+		else
+		{
+			/* Just insert a REDIRECT link at fromoff */
+			ItemIdSetRedirect(fromlp, tooff);
 		}
 	}
 
-	PageRepairFragmentation(page, NULL);
+	/* Update all now-dead line pointers */
+	for (i = 0; i < ndead; i++)
+	{
+		OffsetNumber off = *offnum++;
+		ItemId		lp = PageGetItemId(page, off);
+
+		ItemIdSetDead(lp);
+	}
+
+	/* Update all now-unused line pointers */
+	while (offnum < end)
+	{
+		OffsetNumber off = *offnum++;
+		ItemId		lp = PageGetItemId(page, off);
+
+		ItemIdSetUnused(lp);
+	}
+
+	/*
+	 * Finally, repair any fragmentation, and update the page's hint bit
+	 * about whether it has free pointers.
+	 */
+	PageRepairFragmentation(page);
 
 	PageSetLSN(page, lsn);
 	PageSetTLI(page, ThisTimeLineID);
@@ -3655,8 +4103,13 @@ heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
 					  HEAP_XMAX_IS_MULTI |
 					  HEAP_IS_LOCKED |
 					  HEAP_MOVED);
+	HeapTupleHeaderClearHotUpdated(htup);
 	HeapTupleHeaderSetXmax(htup, record->xl_xid);
 	HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
+
+	/* Mark the page as a candidate for pruning */
+	PageSetPrunable(page);
+
 	/* Make sure there is no forward chain link in t_ctid */
 	htup->t_ctid = xlrec->target.tid;
 	PageSetLSN(page, lsn);
@@ -3736,7 +4189,7 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
 	HeapTupleHeaderSetCmin(htup, FirstCommandId);
 	htup->t_ctid = xlrec->target.tid;
 
-	offnum = PageAddItem(page, (Item) htup, newlen, offnum, true);
+	offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true);
 	if (offnum == InvalidOffsetNumber)
 		elog(PANIC, "heap_insert_redo: failed to add tuple");
 	PageSetLSN(page, lsn);
@@ -3746,10 +4199,10 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
 }
 
 /*
- * Handles UPDATE & MOVE
+ * Handles UPDATE, HOT_UPDATE & MOVE
 */
 static void
-heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move)
+heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move, bool hot_update)
 {
 	xl_heap_update *xlrec = (xl_heap_update *) XLogRecGetData(record);
 	Relation	reln = XLogOpenRelation(xlrec->target.node);
@@ -3808,6 +4261,7 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move)
 						  HEAP_XMIN_INVALID |
 						  HEAP_MOVED_IN);
 		htup->t_infomask |= HEAP_MOVED_OFF;
+		HeapTupleHeaderClearHotUpdated(htup);
 		HeapTupleHeaderSetXvac(htup, record->xl_xid);
 		/* Make sure there is no forward chain link in t_ctid */
 		htup->t_ctid = xlrec->target.tid;
@@ -3819,12 +4273,19 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move)
 						  HEAP_XMAX_IS_MULTI |
 						  HEAP_IS_LOCKED |
 						  HEAP_MOVED);
+		if (hot_update)
+			HeapTupleHeaderSetHotUpdated(htup);
+		else
+			HeapTupleHeaderClearHotUpdated(htup);
 		HeapTupleHeaderSetXmax(htup, record->xl_xid);
 		HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
 		/* Set forward chain link in t_ctid */
 		htup->t_ctid = xlrec->newtid;
 	}
 
+	/* Mark the page as a candidate for pruning */
+	PageSetPrunable(page);
+
 	/*
 	 * this test is ugly, but necessary to avoid thinking that insert change
 	 * is already applied
@@ -3914,7 +4375,7 @@ newsame:;
 	/* Make sure there is no forward chain link in t_ctid */
 	htup->t_ctid = xlrec->newtid;
 
-	offnum = PageAddItem(page, (Item) htup, newlen, offnum, true);
+	offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true);
 	if (offnum == InvalidOffsetNumber)
 		elog(PANIC, "heap_update_redo: failed to add tuple");
 	PageSetLSN(page, lsn);
@@ -3971,6 +4432,7 @@ heap_xlog_lock(XLogRecPtr lsn, XLogRecord *record)
 		htup->t_infomask |= HEAP_XMAX_SHARED_LOCK;
 	else
 		htup->t_infomask |= HEAP_XMAX_EXCL_LOCK;
+	HeapTupleHeaderClearHotUpdated(htup);
 	HeapTupleHeaderSetXmax(htup, xlrec->locking_xid);
 	HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
 	/* Make sure there is no forward chain link in t_ctid */
@@ -4039,25 +4501,35 @@ heap_redo(XLogRecPtr lsn, XLogRecord *record)
 {
 	uint8		info = record->xl_info & ~XLR_INFO_MASK;
 
-	info &= XLOG_HEAP_OPMASK;
-	if (info == XLOG_HEAP_INSERT)
-		heap_xlog_insert(lsn, record);
-	else if (info == XLOG_HEAP_DELETE)
-		heap_xlog_delete(lsn, record);
-	else if (info == XLOG_HEAP_UPDATE)
-		heap_xlog_update(lsn, record, false);
-	else if (info == XLOG_HEAP_MOVE)
-		heap_xlog_update(lsn, record, true);
-	else if (info == XLOG_HEAP_CLEAN)
-		heap_xlog_clean(lsn, record);
-	else if (info == XLOG_HEAP_NEWPAGE)
-		heap_xlog_newpage(lsn, record);
-	else if (info == XLOG_HEAP_LOCK)
-		heap_xlog_lock(lsn, record);
-	else if (info == XLOG_HEAP_INPLACE)
-		heap_xlog_inplace(lsn, record);
-	else
-		elog(PANIC, "heap_redo: unknown op code %u", info);
+	switch (info & XLOG_HEAP_OPMASK)
+	{
+		case XLOG_HEAP_INSERT:
+			heap_xlog_insert(lsn, record);
+			break;
+		case XLOG_HEAP_DELETE:
+			heap_xlog_delete(lsn, record);
+			break;
+		case XLOG_HEAP_UPDATE:
+			heap_xlog_update(lsn, record, false, false);
+			break;
+		case XLOG_HEAP_MOVE:
+			heap_xlog_update(lsn, record, true, false);
+			break;
+		case XLOG_HEAP_HOT_UPDATE:
+			heap_xlog_update(lsn, record, false, true);
+			break;
+		case XLOG_HEAP_NEWPAGE:
+			heap_xlog_newpage(lsn, record);
+			break;
+		case XLOG_HEAP_LOCK:
+			heap_xlog_lock(lsn, record);
+			break;
+		case XLOG_HEAP_INPLACE:
+			heap_xlog_inplace(lsn, record);
+			break;
+		default:
+			elog(PANIC, "heap_redo: unknown op code %u", info);
+	}
 }
 
 void
@@ -4065,11 +4537,20 @@ heap2_redo(XLogRecPtr lsn, XLogRecord *record)
 {
 	uint8		info = record->xl_info & ~XLR_INFO_MASK;
 
-	info &= XLOG_HEAP_OPMASK;
-	if (info == XLOG_HEAP2_FREEZE)
-		heap_xlog_freeze(lsn, record);
-	else
-		elog(PANIC, "heap2_redo: unknown op code %u", info);
+	switch (info & XLOG_HEAP_OPMASK)
+	{
+		case XLOG_HEAP2_FREEZE:
+			heap_xlog_freeze(lsn, record);
+			break;
+		case XLOG_HEAP2_CLEAN:
+			heap_xlog_clean(lsn, record, false);
+			break;
+		case XLOG_HEAP2_CLEAN_MOVE:
+			heap_xlog_clean(lsn, record, true);
+			break;
+		default:
+			elog(PANIC, "heap2_redo: unknown op code %u", info);
+	}
 }
 
 static void
@@ -4130,13 +4611,18 @@ heap_desc(StringInfo buf, uint8 xl_info, char *rec)
 						 ItemPointerGetBlockNumber(&(xlrec->newtid)),
 						 ItemPointerGetOffsetNumber(&(xlrec->newtid)));
 	}
-	else if (info == XLOG_HEAP_CLEAN)
+	else if (info == XLOG_HEAP_HOT_UPDATE)
 	{
-		xl_heap_clean *xlrec = (xl_heap_clean *) rec;
+		xl_heap_update *xlrec = (xl_heap_update *) rec;
 
-		appendStringInfo(buf, "clean: rel %u/%u/%u; blk %u",
-						 xlrec->node.spcNode, xlrec->node.dbNode,
-						 xlrec->node.relNode, xlrec->block);
+		if (xl_info & XLOG_HEAP_INIT_PAGE)		/* can this case happen? */
+			appendStringInfo(buf, "hot_update(init): ");
+		else
+			appendStringInfo(buf, "hot_update: ");
+		out_target(buf, &(xlrec->target));
+		appendStringInfo(buf, "; new %u/%u",
+						 ItemPointerGetBlockNumber(&(xlrec->newtid)),
+						 ItemPointerGetOffsetNumber(&(xlrec->newtid)));
 	}
 	else if (info == XLOG_HEAP_NEWPAGE)
 	{
@@ -4187,6 +4673,22 @@ heap2_desc(StringInfo buf, uint8 xl_info, char *rec)
 						 xlrec->node.relNode, xlrec->block,
 						 xlrec->cutoff_xid);
 	}
+	else if (info == XLOG_HEAP2_CLEAN)
+	{
+		xl_heap_clean *xlrec = (xl_heap_clean *) rec;
+
+		appendStringInfo(buf, "clean: rel %u/%u/%u; blk %u",
+						 xlrec->node.spcNode, xlrec->node.dbNode,
+						 xlrec->node.relNode, xlrec->block);
+	}
+	else if (info == XLOG_HEAP2_CLEAN_MOVE)
+	{
+		xl_heap_clean *xlrec = (xl_heap_clean *) rec;
+
+		appendStringInfo(buf, "clean_move: rel %u/%u/%u; blk %u",
+						 xlrec->node.spcNode, xlrec->node.dbNode,
+						 xlrec->node.relNode, xlrec->block);
+	}
 	else
 		appendStringInfo(buf, "UNKNOWN");
 }
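As a reading aid, not part of the commit: a minimal sketch of how a caller, for
example an index fetch, might use the new heap_hot_search() API added above.
Everything here except heap_hot_search itself is hypothetical, and error
handling is omitted; 8.3-era headers are assumed.

#include "postgres.h"
#include "access/heapam.h"

/*
 * Hypothetical caller: resolve the chain member visible to 'snapshot',
 * starting from a root TID taken from an index entry.
 */
static bool
example_hot_fetch(Relation heapRel, Snapshot snapshot, ItemPointer tid)
{
	bool		all_dead;

	/* On success, *tid is overwritten with the visible member's offset */
	if (heap_hot_search(tid, heapRel, snapshot, &all_dead))
		return true;

	/*
	 * No chain member is visible.  If all_dead is set, every member of the
	 * chain is dead to all transactions, so the caller could mark its index
	 * entry killed and avoid revisiting this chain.
	 */
	return false;
}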
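A note on the attribute numbers that HeapSatisfiesHOTUpdate() recovers with
bms_first_member(): bitmapsets cannot hold negative members, so the bitmap
returned by RelationGetIndexAttrBitmap() stores attnum -
FirstLowInvalidHeapAttributeNumber, and the loop adds the constant back.  A
worked illustration follows; it is not from the commit, and it assumes the
8.3-era value FirstLowInvalidHeapAttributeNumber = -8 and the 8.3-era header
locations (these constants lived in access/htup.h at the time).

#include "postgres.h"
#include "access/htup.h"
#include "nodes/bitmapset.h"

static void
offset_example(void)
{
	Bitmapset  *attrs = NULL;
	int			member;

	/* user column 3 is stored as bit 11; system column OID (-2) as bit 6 */
	attrs = bms_add_member(attrs, 3 - FirstLowInvalidHeapAttributeNumber);
	attrs = bms_add_member(attrs, ObjectIdAttributeNumber -
						   FirstLowInvalidHeapAttributeNumber);

	/* bms_first_member() removes members as it returns them (smallest first) */
	while ((member = bms_first_member(attrs)) >= 0)
	{
		int			attrnum = member + FirstLowInvalidHeapAttributeNumber;

		/* attrnum is -2 on the first iteration, 3 on the second */
		(void) attrnum;
	}
	bms_free(attrs);
}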
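Finally, a note on the record format built by log_heap_clean() above: the body
of an XLOG_HEAP2_CLEAN or XLOG_HEAP2_CLEAN_MOVE record is the xl_heap_clean
header followed by three back-to-back OffsetNumber arrays, and only the first
two have explicit counts.  A sketch, not from the commit, of the arithmetic
heap_xlog_clean() uses to recover the array boundaries (the walker function
name is hypothetical; xl_heap_clean is assumed to be in access/htup.h as in
the 8.3-era tree):

#include "postgres.h"
#include "access/htup.h"

/* Hypothetical walker mirroring the pointer arithmetic in heap_xlog_clean() */
static void
walk_clean_payload(xl_heap_clean *xlrec, Size xl_len)
{
	OffsetNumber *offnum = (OffsetNumber *) ((char *) xlrec + SizeOfHeapClean);
	OffsetNumber *end = (OffsetNumber *) ((char *) xlrec + xl_len);
	int			i;

	/*
	 * nredirected (from, to) pairs: two OffsetNumbers per entry, which is
	 * why rdata[1].len above is nredirected * sizeof(OffsetNumber) * 2.
	 */
	for (i = 0; i < xlrec->nredirected; i++)
	{
		OffsetNumber from = *offnum++;
		OffsetNumber to = *offnum++;

		(void) from;			/* redo sets a REDIRECT here, or ... */
		(void) to;				/* ... physically moves 'to' into 'from' */
	}

	/* ndead entries that redo marks LP_DEAD */
	for (i = 0; i < xlrec->ndead; i++)
	{
		OffsetNumber dead = *offnum++;

		(void) dead;
	}

	/* the now-unused array carries no count; it runs to the end of record */
	while (offnum < end)
	{
		OffsetNumber unused = *offnum++;

		(void) unused;			/* redo marks these LP_UNUSED */
	}
}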