Diffstat (limited to 'src/backend/access/heap/heapam.c')
-rw-r--r--  src/backend/access/heap/heapam.c  |  640
1 file changed, 571 insertions, 69 deletions
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 09a70d813f7..d5a2f9a43d1 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.240 2007/09/12 22:10:26 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.241 2007/09/20 17:56:30 tgl Exp $
*
*
* INTERFACE ROUTINES
@@ -52,6 +52,7 @@
#include "pgstat.h"
#include "storage/procarray.h"
#include "storage/smgr.h"
+#include "utils/datum.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/relcache.h"
@@ -64,6 +65,8 @@ static HeapScanDesc heap_beginscan_internal(Relation relation,
bool is_bitmapscan);
static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
ItemPointerData from, Buffer newbuf, HeapTuple newtup, bool move);
+static bool HeapSatisfiesHOTUpdate(Relation relation, Bitmapset *hot_attrs,
+ HeapTuple oldtup, HeapTuple newtup);
/* ----------------------------------------------------------------
@@ -184,6 +187,11 @@ heapgetpage(HeapScanDesc scan, BlockNumber page)
snapshot = scan->rs_snapshot;
/*
+ * Prune and repair fragmentation for the whole page, if possible.
+ */
+ heap_page_prune_opt(scan->rs_rd, buffer, RecentGlobalXmin);
+
+ /*
* We must hold share lock on the buffer content while examining tuple
* visibility. Afterwards, however, the tuples we have found to be
* visible are guaranteed good as long as we hold the buffer pin.
@@ -316,7 +324,7 @@ heapgettup(HeapScanDesc scan,
* forward scanners.
*/
scan->rs_syncscan = false;
- /* start from last page of the scan */
+ /* start from last page of the scan */
if (scan->rs_startblock > 0)
page = scan->rs_startblock - 1;
else
@@ -368,6 +376,7 @@ heapgettup(HeapScanDesc scan,
dp = (Page) BufferGetPage(scan->rs_cbuf);
lineoff = ItemPointerGetOffsetNumber(&(tuple->t_self));
lpp = PageGetItemId(dp, lineoff);
+ Assert(ItemIdIsNormal(lpp));
tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
tuple->t_len = ItemIdGetLength(lpp);
@@ -583,7 +592,7 @@ heapgettup_pagemode(HeapScanDesc scan,
* forward scanners.
*/
scan->rs_syncscan = false;
- /* start from last page of the scan */
+ /* start from last page of the scan */
if (scan->rs_startblock > 0)
page = scan->rs_startblock - 1;
else
@@ -632,6 +641,7 @@ heapgettup_pagemode(HeapScanDesc scan,
dp = (Page) BufferGetPage(scan->rs_cbuf);
lineoff = ItemPointerGetOffsetNumber(&(tuple->t_self));
lpp = PageGetItemId(dp, lineoff);
+ Assert(ItemIdIsNormal(lpp));
tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
tuple->t_len = ItemIdGetLength(lpp);
@@ -1246,6 +1256,9 @@ heap_getnext(HeapScanDesc scan, ScanDirection direction)
* for statistical purposes. (This could be the heap rel itself, an
* associated index, or NULL to not count the fetch at all.)
*
+ * heap_fetch does not follow HOT chains: only the exact TID requested will
+ * be fetched.
+ *
* It is somewhat inconsistent that we ereport() on invalid block number but
* return false on invalid item number. There are a couple of reasons though.
* One is that the caller can relatively easily check the block number for
@@ -1390,6 +1403,143 @@ heap_release_fetch(Relation relation,
}
/*
+ * heap_hot_search_buffer - search HOT chain for tuple satisfying snapshot
+ *
+ * On entry, *tid is the TID of a tuple (either a simple tuple, or the root
+ * of a HOT chain), and buffer is the buffer holding this tuple. We search
+ * for the first chain member satisfying the given snapshot. If one is
+ * found, we update *tid to reference that tuple's offset number, and
+ * return TRUE. If no match, return FALSE without modifying *tid.
+ *
+ * If all_dead is not NULL, we check non-visible tuples to see if they are
+ * globally dead; *all_dead is set TRUE if all members of the HOT chain
+ * are vacuumable, FALSE if not.
+ *
+ * Unlike heap_fetch, the caller must already have pin and (at least) share
+ * lock on the buffer; it is still pinned/locked at exit. Also unlike
+ * heap_fetch, we do not report any pgstats count; caller may do so if wanted.
+ */
+bool
+heap_hot_search_buffer(ItemPointer tid, Buffer buffer, Snapshot snapshot,
+ bool *all_dead)
+{
+ Page dp = (Page) BufferGetPage(buffer);
+ TransactionId prev_xmax = InvalidTransactionId;
+ OffsetNumber offnum;
+ bool at_chain_start;
+
+ if (all_dead)
+ *all_dead = true;
+
+ Assert(ItemPointerGetBlockNumber(tid) == BufferGetBlockNumber(buffer));
+ offnum = ItemPointerGetOffsetNumber(tid);
+ at_chain_start = true;
+
+ /* Scan through possible multiple members of HOT-chain */
+ for (;;)
+ {
+ ItemId lp;
+ HeapTupleData heapTuple;
+
+ /* check for bogus TID */
+ if (offnum < FirstOffsetNumber || offnum > PageGetMaxOffsetNumber(dp))
+ break;
+
+ lp = PageGetItemId(dp, offnum);
+
+ /* check for unused, dead, or redirected items */
+ if (!ItemIdIsNormal(lp))
+ {
+ /* We should only see a redirect at start of chain */
+ if (ItemIdIsRedirected(lp) && at_chain_start)
+ {
+ /* Follow the redirect */
+ offnum = ItemIdGetRedirect(lp);
+ at_chain_start = false;
+ continue;
+ }
+ /* else must be end of chain */
+ break;
+ }
+
+ heapTuple.t_data = (HeapTupleHeader) PageGetItem(dp, lp);
+ heapTuple.t_len = ItemIdGetLength(lp);
+
+ /*
+ * Shouldn't see a HEAP_ONLY tuple at chain start.
+ */
+ if (at_chain_start && HeapTupleIsHeapOnly(&heapTuple))
+ break;
+
+ /*
+ * The xmin should match the previous xmax value, else chain is broken.
+ */
+ if (TransactionIdIsValid(prev_xmax) &&
+ !TransactionIdEquals(prev_xmax,
+ HeapTupleHeaderGetXmin(heapTuple.t_data)))
+ break;
+
+ /* If it's visible per the snapshot, we must return it */
+ if (HeapTupleSatisfiesVisibility(&heapTuple, snapshot, buffer))
+ {
+ ItemPointerSetOffsetNumber(tid, offnum);
+ if (all_dead)
+ *all_dead = false;
+ return true;
+ }
+
+ /*
+ * If we can't see it, maybe no one else can either. At caller
+ * request, check whether all chain members are dead to all
+ * transactions.
+ */
+ if (all_dead && *all_dead &&
+ HeapTupleSatisfiesVacuum(heapTuple.t_data, RecentGlobalXmin,
+ buffer) != HEAPTUPLE_DEAD)
+ *all_dead = false;
+
+ /*
+ * Check to see if HOT chain continues past this tuple; if so
+ * fetch the next offnum and loop around.
+ */
+ if (HeapTupleIsHotUpdated(&heapTuple))
+ {
+ Assert(ItemPointerGetBlockNumber(&heapTuple.t_data->t_ctid) ==
+ ItemPointerGetBlockNumber(tid));
+ offnum = ItemPointerGetOffsetNumber(&heapTuple.t_data->t_ctid);
+ at_chain_start = false;
+ prev_xmax = HeapTupleHeaderGetXmax(heapTuple.t_data);
+ }
+ else
+ break; /* end of chain */
+ }
+
+ return false;
+}
+
+/*
+ * heap_hot_search - search HOT chain for tuple satisfying snapshot
+ *
+ * This has the same API as heap_hot_search_buffer, except that the caller
+ * does not provide the buffer containing the page, rather we access it
+ * locally.
+ */
+bool
+heap_hot_search(ItemPointer tid, Relation relation, Snapshot snapshot,
+ bool *all_dead)
+{
+ bool result;
+ Buffer buffer;
+
+ buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
+ LockBuffer(buffer, BUFFER_LOCK_SHARE);
+ result = heap_hot_search_buffer(tid, buffer, snapshot, all_dead);
+ LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+ ReleaseBuffer(buffer);
+ return result;
+}
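
Editor's note: a minimal caller sketch (not part of this patch; the function name and surrounding context are invented) showing how an index-scan-style consumer could use heap_hot_search. It passes the root TID obtained from an index, gets back the visible chain member's TID on success, and can use all_dead to decide whether the index entry itself is reclaimable.

/* Hypothetical caller sketch -- illustrative only, not part of this patch */
static bool
fetch_visible_member(Relation heapRel, ItemPointer tid, Snapshot snapshot)
{
	bool		all_dead;

	/*
	 * Follows the HOT chain rooted at *tid; on success *tid is updated to
	 * point at the visible chain member.
	 */
	if (heap_hot_search(tid, heapRel, snapshot, &all_dead))
		return true;

	/*
	 * No visible member.  If the whole chain is dead to all transactions,
	 * the caller could mark its index entry as killed here.
	 */
	if (all_dead)
	{
		/* e.g. remember the TID for LP_DEAD hinting in the index */
	}
	return false;
}
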
+
+/*
* heap_get_latest_tid - get the latest tid of a specified tuple
*
* Actually, this gets the latest version that is visible according to
@@ -1594,6 +1744,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
}
tup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
+ tup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK);
tup->t_data->t_infomask |= HEAP_XMAX_INVALID;
HeapTupleHeaderSetXmin(tup->t_data, xid);
HeapTupleHeaderSetCmin(tup->t_data, cid);
@@ -1628,6 +1779,17 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
RelationPutHeapTuple(relation, buffer, heaptup);
+ /*
+ * XXX Should we set PageSetPrunable on this page ?
+ *
+ * The inserting transaction may eventually abort thus making this tuple
+ * DEAD and hence available for pruning. Though we don't want to optimize
+ * for aborts, if no other tuple in this page is UPDATEd/DELETEd, the
+ * aborted tuple will never be pruned until next vacuum is triggered.
+ *
+ * If you do add PageSetPrunable here, add it in heap_xlog_insert too.
+ */
+
MarkBufferDirty(buffer);
/* XLOG stuff */
@@ -1904,12 +2066,21 @@ l1:
START_CRIT_SECTION();
+ /*
+ * If this transaction commits, the tuple will become DEAD sooner or
+ * later. Set hint bit that this page is a candidate for pruning. If
+ * the transaction finally aborts, the subsequent page pruning will be
+ * a no-op and the hint will be cleared.
+ */
+ PageSetPrunable((Page) dp);
+
/* store transaction information of xact deleting the tuple */
tp.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
HEAP_XMAX_INVALID |
HEAP_XMAX_IS_MULTI |
HEAP_IS_LOCKED |
HEAP_MOVED);
+ HeapTupleHeaderClearHotUpdated(tp.t_data);
HeapTupleHeaderSetXmax(tp.t_data, xid);
HeapTupleHeaderSetCmax(tp.t_data, cid, iscombo);
/* Make sure there is no forward chain link in t_ctid */
@@ -2045,7 +2216,8 @@ simple_heap_delete(Relation relation, ItemPointer tid)
*
* On success, the header fields of *newtup are updated to match the new
* stored tuple; in particular, newtup->t_self is set to the TID where the
- * new tuple was inserted. However, any TOAST changes in the new tuple's
+ * new tuple was inserted, and its HEAP_ONLY_TUPLE flag is set iff a HOT
+ * update was done. However, any TOAST changes in the new tuple's
* data are not reflected into *newtup.
*
* In the failure cases, the routine returns the tuple's t_ctid and t_xmax.
@@ -2060,6 +2232,7 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
{
HTSU_Result result;
TransactionId xid = GetCurrentTransactionId();
+ Bitmapset *hot_attrs;
ItemId lp;
HeapTupleData oldtup;
HeapTuple heaptup;
@@ -2072,9 +2245,24 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
pagefree;
bool have_tuple_lock = false;
bool iscombo;
+ bool use_hot_update = false;
Assert(ItemPointerIsValid(otid));
+ /*
+ * Fetch the list of attributes to be checked for HOT update. This is
+ * wasted effort if we fail to update or have to put the new tuple on
+ * a different page. But we must compute the list before obtaining
+ * buffer lock --- in the worst case, if we are doing an update on one
+ * of the relevant system catalogs, we could deadlock if we try to
+ * fetch the list later. In any case, the relcache caches the data
+ * so this is usually pretty cheap.
+ *
+ * Note that we get a copy here, so we need not worry about relcache
+ * flush happening midway through.
+ */
+ hot_attrs = RelationGetIndexAttrBitmap(relation);
+
buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(otid));
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
@@ -2208,6 +2396,7 @@ l2:
UnlockReleaseBuffer(buffer);
if (have_tuple_lock)
UnlockTuple(relation, &(oldtup.t_self), ExclusiveLock);
+ bms_free(hot_attrs);
return result;
}
@@ -2227,6 +2416,7 @@ l2:
}
newtup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
+ newtup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK);
newtup->t_data->t_infomask |= (HEAP_XMAX_INVALID | HEAP_UPDATED);
HeapTupleHeaderSetXmin(newtup->t_data, xid);
HeapTupleHeaderSetCmin(newtup->t_data, cid);
@@ -2261,17 +2451,20 @@ l2:
HeapTupleHasExternal(newtup) ||
newtup->t_len > TOAST_TUPLE_THRESHOLD);
- pagefree = PageGetFreeSpace((Page) dp);
+ pagefree = PageGetHeapFreeSpace((Page) dp);
newtupsize = MAXALIGN(newtup->t_len);
if (need_toast || newtupsize > pagefree)
{
+ /* Clear obsolete visibility flags ... */
oldtup.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
HEAP_XMAX_INVALID |
HEAP_XMAX_IS_MULTI |
HEAP_IS_LOCKED |
HEAP_MOVED);
+ HeapTupleClearHotUpdated(&oldtup);
+ /* ... and store info about transaction updating this tuple */
HeapTupleHeaderSetXmax(oldtup.t_data, xid);
HeapTupleHeaderSetCmax(oldtup.t_data, cid, iscombo);
/* temporarily make it look not-updated */
@@ -2324,7 +2517,7 @@ l2:
/* Re-acquire the lock on the old tuple's page. */
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
/* Re-check using the up-to-date free space */
- pagefree = PageGetFreeSpace((Page) dp);
+ pagefree = PageGetHeapFreeSpace((Page) dp);
if (newtupsize > pagefree)
{
/*
@@ -2357,18 +2550,66 @@ l2:
* one pin is held.
*/
+ if (newbuf == buffer)
+ {
+ /*
+ * Since the new tuple is going into the same page, we might be able
+ * to do a HOT update. Check if any of the index columns have been
+ * changed. If not, then HOT update is possible.
+ */
+ if (HeapSatisfiesHOTUpdate(relation, hot_attrs, &oldtup, heaptup))
+ use_hot_update = true;
+ }
+ else
+ {
+ /* Set a hint that the old page could use prune/defrag */
+ PageSetFull(dp);
+ }
+
/* NO EREPORT(ERROR) from here till changes are logged */
START_CRIT_SECTION();
+ /*
+ * If this transaction commits, the old tuple will become DEAD sooner or
+ * later. Set hint bit that this page is a candidate for pruning. If
+ * the transaction finally aborts, the subsequent page pruning will be
+ * a no-op and the hint will be cleared.
+ *
+ * XXX Should we set hint on newbuf as well? If the transaction
+ * aborts, there would be a prunable tuple in the newbuf; but for now
+ * we choose not to optimize for aborts. Note that heap_xlog_update
+ * must be kept in sync if this changes.
+ */
+ PageSetPrunable(dp);
+
+ if (use_hot_update)
+ {
+ /* Mark the old tuple as HOT-updated */
+ HeapTupleSetHotUpdated(&oldtup);
+ /* And mark the new tuple as heap-only */
+ HeapTupleSetHeapOnly(heaptup);
+ /* Mark the caller's copy too, in case different from heaptup */
+ HeapTupleSetHeapOnly(newtup);
+ }
+ else
+ {
+ /* Make sure tuples are correctly marked as not-HOT */
+ HeapTupleClearHotUpdated(&oldtup);
+ HeapTupleClearHeapOnly(heaptup);
+ HeapTupleClearHeapOnly(newtup);
+ }
+
RelationPutHeapTuple(relation, newbuf, heaptup); /* insert new tuple */
if (!already_marked)
{
+ /* Clear obsolete visibility flags ... */
oldtup.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
HEAP_XMAX_INVALID |
HEAP_XMAX_IS_MULTI |
HEAP_IS_LOCKED |
HEAP_MOVED);
+ /* ... and store info about transaction updating this tuple */
HeapTupleHeaderSetXmax(oldtup.t_data, xid);
HeapTupleHeaderSetCmax(oldtup.t_data, cid, iscombo);
}
@@ -2427,7 +2668,7 @@ l2:
if (have_tuple_lock)
UnlockTuple(relation, &(oldtup.t_self), ExclusiveLock);
- pgstat_count_heap_update(relation);
+ pgstat_count_heap_update(relation, use_hot_update);
/*
* If heaptup is a private copy, release it. Don't forget to copy t_self
@@ -2439,10 +2680,120 @@ l2:
heap_freetuple(heaptup);
}
+ bms_free(hot_attrs);
+
return HeapTupleMayBeUpdated;
}
/*
+ * Check if the specified attribute's value is same in both given tuples.
+ * Subroutine for HeapSatisfiesHOTUpdate.
+ */
+static bool
+heap_tuple_attr_equals(TupleDesc tupdesc, int attrnum,
+ HeapTuple tup1, HeapTuple tup2)
+{
+ Datum value1, value2;
+ bool isnull1, isnull2;
+ Form_pg_attribute att;
+
+ /*
+ * If it's a whole-tuple reference, say "not equal". It's not really
+ * worth supporting this case, since it could only succeed after a
+ * no-op update, which is hardly a case worth optimizing for.
+ */
+ if (attrnum == 0)
+ return false;
+
+ /*
+ * Likewise, automatically say "not equal" for any system attribute
+ * other than OID and tableOID; we cannot expect these to be consistent
+ * in a HOT chain, or even to be set correctly yet in the new tuple.
+ */
+ if (attrnum < 0)
+ {
+ if (attrnum != ObjectIdAttributeNumber &&
+ attrnum != TableOidAttributeNumber)
+ return false;
+ }
+
+ /*
+ * Extract the corresponding values. XXX this is pretty inefficient
+ * if there are many indexed columns. Should HeapSatisfiesHOTUpdate
+ * do a single heap_deform_tuple call on each tuple, instead? But
+ * that doesn't work for system columns ...
+ */
+ value1 = heap_getattr(tup1, attrnum, tupdesc, &isnull1);
+ value2 = heap_getattr(tup2, attrnum, tupdesc, &isnull2);
+
+ /*
+ * If one value is NULL and the other is not, then they are certainly
+ * not equal
+ */
+ if (isnull1 != isnull2)
+ return false;
+
+ /*
+ * If both are NULL, they can be considered equal.
+ */
+ if (isnull1)
+ return true;
+
+ /*
+ * We do simple binary comparison of the two datums. This may be overly
+ * strict because there can be multiple binary representations for the
+ * same logical value. But we should be OK as long as there are no false
+ * positives. Using a type-specific equality operator is messy because
+ * there could be multiple notions of equality in different operator
+ * classes; furthermore, we cannot safely invoke user-defined functions
+ * while holding exclusive buffer lock.
+ */
+ if (attrnum <= 0)
+ {
+ /* The only allowed system columns are OIDs, so do this */
+ return (DatumGetObjectId(value1) == DatumGetObjectId(value2));
+ }
+ else
+ {
+ Assert(attrnum <= tupdesc->natts);
+ att = tupdesc->attrs[attrnum - 1];
+ return datumIsEqual(value1, value2, att->attbyval, att->attlen);
+ }
+}
+
+/*
+ * Check if the old and new tuples represent a HOT-safe update. To be able
+ * to do a HOT update, we must not have changed any columns used in index
+ * definitions.
+ *
+ * The set of attributes to be checked is passed in (we dare not try to
+ * compute it while holding exclusive buffer lock...) NOTE that hot_attrs
+ * is destructively modified! That is OK since this is invoked at most once
+ * by heap_update().
+ *
+ * Returns true if safe to do HOT update.
+ */
+static bool
+HeapSatisfiesHOTUpdate(Relation relation, Bitmapset *hot_attrs,
+ HeapTuple oldtup, HeapTuple newtup)
+{
+ int attrnum;
+
+ while ((attrnum = bms_first_member(hot_attrs)) >= 0)
+ {
+ /* Adjust for system attributes */
+ attrnum += FirstLowInvalidHeapAttributeNumber;
+
+ /* If the attribute value has changed, we can't do HOT update */
+ if (!heap_tuple_attr_equals(RelationGetDescr(relation), attrnum,
+ oldtup, newtup))
+ return false;
+ }
+
+ return true;
+}
+
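
Editor's note on the bitmap convention used above (a sketch with invented scaffolding, not code from this patch): the members of hot_attrs are attribute numbers offset by FirstLowInvalidHeapAttributeNumber, so that negative system attribute numbers fit in a Bitmapset; HeapSatisfiesHOTUpdate undoes the shift before comparing columns.

/* Illustrative sketch of the hot_attrs offset convention -- not from this patch */
static void
hot_attrs_offset_example(void)
{
	Bitmapset  *attrs = NULL;
	int			offset;

	/*
	 * Add user column 3 and the OID system column; the shift keeps negative
	 * system attribute numbers representable in the bitmap.
	 */
	attrs = bms_add_member(attrs, 3 - FirstLowInvalidHeapAttributeNumber);
	attrs = bms_add_member(attrs,
						   ObjectIdAttributeNumber - FirstLowInvalidHeapAttributeNumber);

	/* Recover the real attribute numbers, as HeapSatisfiesHOTUpdate does */
	while ((offset = bms_first_member(attrs)) >= 0)
	{
		int			attrnum = offset + FirstLowInvalidHeapAttributeNumber;

		(void) attrnum;			/* attrnum is now 3 or ObjectIdAttributeNumber */
	}
	bms_free(attrs);
}
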
+/*
* simple_heap_update - replace a tuple
*
* This routine may be used to update a tuple when concurrent updates of
@@ -2865,6 +3216,7 @@ l3:
* avoids possibly generating a useless combo CID.
*/
tuple->t_data->t_infomask = new_infomask;
+ HeapTupleHeaderClearHotUpdated(tuple->t_data);
HeapTupleHeaderSetXmax(tuple->t_data, xid);
/* Make sure there is no forward chain link in t_ctid */
tuple->t_data->t_ctid = *tid;
@@ -3110,6 +3462,7 @@ recheck_xmax:
*/
tuple->t_infomask &= ~HEAP_XMAX_COMMITTED;
tuple->t_infomask |= HEAP_XMAX_INVALID;
+ HeapTupleHeaderClearHotUpdated(tuple);
changed = true;
}
}
@@ -3245,21 +3598,29 @@ heap_restrpos(HeapScanDesc scan)
* Perform XLogInsert for a heap-clean operation. Caller must already
* have modified the buffer and marked it dirty.
*
- * Note: for historical reasons, the entries in the unused[] array should
- * be zero-based tuple indexes, not one-based.
+ * Note: prior to Postgres 8.3, the entries in the nowunused[] array were
+ * zero-based tuple indexes. Now they are one-based like other uses
+ * of OffsetNumber.
*/
XLogRecPtr
-log_heap_clean(Relation reln, Buffer buffer, OffsetNumber *unused, int uncnt)
+log_heap_clean(Relation reln, Buffer buffer,
+ OffsetNumber *redirected, int nredirected,
+ OffsetNumber *nowdead, int ndead,
+ OffsetNumber *nowunused, int nunused,
+ bool redirect_move)
{
xl_heap_clean xlrec;
+ uint8 info;
XLogRecPtr recptr;
- XLogRecData rdata[2];
+ XLogRecData rdata[4];
/* Caller should not call me on a temp relation */
Assert(!reln->rd_istemp);
xlrec.node = reln->rd_node;
xlrec.block = BufferGetBlockNumber(buffer);
+ xlrec.nredirected = nredirected;
+ xlrec.ndead = ndead;
rdata[0].data = (char *) &xlrec;
rdata[0].len = SizeOfHeapClean;
@@ -3267,14 +3628,17 @@ log_heap_clean(Relation reln, Buffer buffer, OffsetNumber *unused, int uncnt)
rdata[0].next = &(rdata[1]);
/*
- * The unused-offsets array is not actually in the buffer, but pretend
- * that it is. When XLogInsert stores the whole buffer, the offsets array
- * need not be stored too.
+ * The OffsetNumber arrays are not actually in the buffer, but we pretend
+ * that they are. When XLogInsert stores the whole buffer, the offset
+ * arrays need not be stored too. Note that even if all three arrays
+ * are empty, we want to expose the buffer as a candidate for whole-page
+ * storage, since this record type implies a defragmentation operation
+ * even if no item pointers changed state.
*/
- if (uncnt > 0)
+ if (nredirected > 0)
{
- rdata[1].data = (char *) unused;
- rdata[1].len = uncnt * sizeof(OffsetNumber);
+ rdata[1].data = (char *) redirected;
+ rdata[1].len = nredirected * sizeof(OffsetNumber) * 2;
}
else
{
@@ -3283,9 +3647,38 @@ log_heap_clean(Relation reln, Buffer buffer, OffsetNumber *unused, int uncnt)
}
rdata[1].buffer = buffer;
rdata[1].buffer_std = true;
- rdata[1].next = NULL;
+ rdata[1].next = &(rdata[2]);
+
+ if (ndead > 0)
+ {
+ rdata[2].data = (char *) nowdead;
+ rdata[2].len = ndead * sizeof(OffsetNumber);
+ }
+ else
+ {
+ rdata[2].data = NULL;
+ rdata[2].len = 0;
+ }
+ rdata[2].buffer = buffer;
+ rdata[2].buffer_std = true;
+ rdata[2].next = &(rdata[3]);
+
+ if (nunused > 0)
+ {
+ rdata[3].data = (char *) nowunused;
+ rdata[3].len = nunused * sizeof(OffsetNumber);
+ }
+ else
+ {
+ rdata[3].data = NULL;
+ rdata[3].len = 0;
+ }
+ rdata[3].buffer = buffer;
+ rdata[3].buffer_std = true;
+ rdata[3].next = NULL;
- recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_CLEAN, rdata);
+ info = redirect_move ? XLOG_HEAP2_CLEAN_MOVE : XLOG_HEAP2_CLEAN;
+ recptr = XLogInsert(RM_HEAP2_ID, info, rdata);
return recptr;
}
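
Editor's note: a descriptive sketch (comments only, not code from the patch) of the record body that log_heap_clean emits when the offset arrays are actually written out, i.e. when no full-page image replaces them.

/*
 * Sketch of the XLOG_HEAP2_CLEAN / CLEAN_MOVE record body (illustrative):
 *
 *   xl_heap_clean header      -- node, block, nredirected, ndead
 *   redirected[]              -- nredirected pairs of (from, to) OffsetNumbers
 *   nowdead[]                 -- ndead OffsetNumbers
 *   nowunused[]               -- the remaining OffsetNumbers
 *
 * The header does not store nunused: the redo routine derives it from
 * whatever is left of the record after the first two arrays, which is why
 * heap_xlog_clean below loops "while (offnum < end)" for the unused entries.
 */
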
@@ -3293,8 +3686,6 @@ log_heap_clean(Relation reln, Buffer buffer, OffsetNumber *unused, int uncnt)
/*
* Perform XLogInsert for a heap-freeze operation. Caller must already
* have modified the buffer and marked it dirty.
- *
- * Unlike log_heap_clean(), the offsets[] entries are one-based.
*/
XLogRecPtr
log_heap_freeze(Relation reln, Buffer buffer,
@@ -3363,17 +3754,28 @@ log_heap_update(Relation reln, Buffer oldbuf, ItemPointerData from,
} xlhdr;
int hsize = SizeOfHeapHeader;
xl_heap_update xlrec;
+ uint8 info;
XLogRecPtr recptr;
XLogRecData rdata[4];
Page page = BufferGetPage(newbuf);
- uint8 info = (move) ? XLOG_HEAP_MOVE : XLOG_HEAP_UPDATE;
/* Caller should not call me on a temp relation */
Assert(!reln->rd_istemp);
+ if (move)
+ {
+ Assert(!HeapTupleIsHeapOnly(newtup));
+ info = XLOG_HEAP_MOVE;
+ }
+ else if (HeapTupleIsHeapOnly(newtup))
+ info = XLOG_HEAP_HOT_UPDATE;
+ else
+ info = XLOG_HEAP_UPDATE;
+
xlrec.target.node = reln->rd_node;
xlrec.target.tid = from;
xlrec.newtid = newtup->t_self;
+
rdata[0].data = (char *) &xlrec;
rdata[0].len = SizeOfHeapUpdate;
rdata[0].buffer = InvalidBuffer;
@@ -3489,13 +3891,21 @@ log_newpage(RelFileNode *rnode, BlockNumber blkno, Page page)
return recptr;
}
+/*
+ * Handles CLEAN and CLEAN_MOVE record types
+ */
static void
-heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record)
+heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record, bool clean_move)
{
xl_heap_clean *xlrec = (xl_heap_clean *) XLogRecGetData(record);
Relation reln;
Buffer buffer;
Page page;
+ OffsetNumber *offnum;
+ OffsetNumber *end;
+ int nredirected;
+ int ndead;
+ int i;
if (record->xl_info & XLR_BKP_BLOCK_1)
return;
@@ -3512,25 +3922,63 @@ heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record)
return;
}
- if (record->xl_len > SizeOfHeapClean)
- {
- OffsetNumber *unused;
- OffsetNumber *unend;
- ItemId lp;
+ nredirected = xlrec->nredirected;
+ ndead = xlrec->ndead;
+ offnum = (OffsetNumber *) ((char *) xlrec + SizeOfHeapClean);
+ end = (OffsetNumber *) ((char *) xlrec + record->xl_len);
- unused = (OffsetNumber *) ((char *) xlrec + SizeOfHeapClean);
- unend = (OffsetNumber *) ((char *) xlrec + record->xl_len);
+ /* Update all redirected or moved line pointers */
+ for (i = 0; i < nredirected; i++)
+ {
+ OffsetNumber fromoff = *offnum++;
+ OffsetNumber tooff = *offnum++;
+ ItemId fromlp = PageGetItemId(page, fromoff);
- while (unused < unend)
+ if (clean_move)
{
- /* unused[] entries are zero-based */
- lp = PageGetItemId(page, *unused + 1);
- ItemIdSetUnused(lp);
- unused++;
+ /* Physically move the "to" item to the "from" slot */
+ ItemId tolp = PageGetItemId(page, tooff);
+ HeapTupleHeader htup;
+
+ *fromlp = *tolp;
+ ItemIdSetUnused(tolp);
+
+ /* We also have to clear the tuple's heap-only bit */
+ Assert(ItemIdIsNormal(fromlp));
+ htup = (HeapTupleHeader) PageGetItem(page, fromlp);
+ Assert(HeapTupleHeaderIsHeapOnly(htup));
+ HeapTupleHeaderClearHeapOnly(htup);
+ }
+ else
+ {
+ /* Just insert a REDIRECT link at fromoff */
+ ItemIdSetRedirect(fromlp, tooff);
}
}
- PageRepairFragmentation(page, NULL);
+ /* Update all now-dead line pointers */
+ for (i = 0; i < ndead; i++)
+ {
+ OffsetNumber off = *offnum++;
+ ItemId lp = PageGetItemId(page, off);
+
+ ItemIdSetDead(lp);
+ }
+
+ /* Update all now-unused line pointers */
+ while (offnum < end)
+ {
+ OffsetNumber off = *offnum++;
+ ItemId lp = PageGetItemId(page, off);
+
+ ItemIdSetUnused(lp);
+ }
+
+ /*
+ * Finally, repair any fragmentation, and update the page's hint bit
+ * about whether it has free pointers.
+ */
+ PageRepairFragmentation(page);
PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID);
@@ -3655,8 +4103,13 @@ heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
HEAP_XMAX_IS_MULTI |
HEAP_IS_LOCKED |
HEAP_MOVED);
+ HeapTupleHeaderClearHotUpdated(htup);
HeapTupleHeaderSetXmax(htup, record->xl_xid);
HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
+
+ /* Mark the page as a candidate for pruning */
+ PageSetPrunable(page);
+
/* Make sure there is no forward chain link in t_ctid */
htup->t_ctid = xlrec->target.tid;
PageSetLSN(page, lsn);
@@ -3736,7 +4189,7 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
HeapTupleHeaderSetCmin(htup, FirstCommandId);
htup->t_ctid = xlrec->target.tid;
- offnum = PageAddItem(page, (Item) htup, newlen, offnum, true);
+ offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true);
if (offnum == InvalidOffsetNumber)
elog(PANIC, "heap_insert_redo: failed to add tuple");
PageSetLSN(page, lsn);
@@ -3746,10 +4199,10 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
}
/*
- * Handles UPDATE & MOVE
+ * Handles UPDATE, HOT_UPDATE & MOVE
*/
static void
-heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move)
+heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move, bool hot_update)
{
xl_heap_update *xlrec = (xl_heap_update *) XLogRecGetData(record);
Relation reln = XLogOpenRelation(xlrec->target.node);
@@ -3808,6 +4261,7 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move)
HEAP_XMIN_INVALID |
HEAP_MOVED_IN);
htup->t_infomask |= HEAP_MOVED_OFF;
+ HeapTupleHeaderClearHotUpdated(htup);
HeapTupleHeaderSetXvac(htup, record->xl_xid);
/* Make sure there is no forward chain link in t_ctid */
htup->t_ctid = xlrec->target.tid;
@@ -3819,12 +4273,19 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move)
HEAP_XMAX_IS_MULTI |
HEAP_IS_LOCKED |
HEAP_MOVED);
+ if (hot_update)
+ HeapTupleHeaderSetHotUpdated(htup);
+ else
+ HeapTupleHeaderClearHotUpdated(htup);
HeapTupleHeaderSetXmax(htup, record->xl_xid);
HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
/* Set forward chain link in t_ctid */
htup->t_ctid = xlrec->newtid;
}
+ /* Mark the page as a candidate for pruning */
+ PageSetPrunable(page);
+
/*
* this test is ugly, but necessary to avoid thinking that insert change
* is already applied
@@ -3914,7 +4375,7 @@ newsame:;
/* Make sure there is no forward chain link in t_ctid */
htup->t_ctid = xlrec->newtid;
- offnum = PageAddItem(page, (Item) htup, newlen, offnum, true);
+ offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true);
if (offnum == InvalidOffsetNumber)
elog(PANIC, "heap_update_redo: failed to add tuple");
PageSetLSN(page, lsn);
@@ -3971,6 +4432,7 @@ heap_xlog_lock(XLogRecPtr lsn, XLogRecord *record)
htup->t_infomask |= HEAP_XMAX_SHARED_LOCK;
else
htup->t_infomask |= HEAP_XMAX_EXCL_LOCK;
+ HeapTupleHeaderClearHotUpdated(htup);
HeapTupleHeaderSetXmax(htup, xlrec->locking_xid);
HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
/* Make sure there is no forward chain link in t_ctid */
@@ -4039,25 +4501,35 @@ heap_redo(XLogRecPtr lsn, XLogRecord *record)
{
uint8 info = record->xl_info & ~XLR_INFO_MASK;
- info &= XLOG_HEAP_OPMASK;
- if (info == XLOG_HEAP_INSERT)
- heap_xlog_insert(lsn, record);
- else if (info == XLOG_HEAP_DELETE)
- heap_xlog_delete(lsn, record);
- else if (info == XLOG_HEAP_UPDATE)
- heap_xlog_update(lsn, record, false);
- else if (info == XLOG_HEAP_MOVE)
- heap_xlog_update(lsn, record, true);
- else if (info == XLOG_HEAP_CLEAN)
- heap_xlog_clean(lsn, record);
- else if (info == XLOG_HEAP_NEWPAGE)
- heap_xlog_newpage(lsn, record);
- else if (info == XLOG_HEAP_LOCK)
- heap_xlog_lock(lsn, record);
- else if (info == XLOG_HEAP_INPLACE)
- heap_xlog_inplace(lsn, record);
- else
- elog(PANIC, "heap_redo: unknown op code %u", info);
+ switch (info & XLOG_HEAP_OPMASK)
+ {
+ case XLOG_HEAP_INSERT:
+ heap_xlog_insert(lsn, record);
+ break;
+ case XLOG_HEAP_DELETE:
+ heap_xlog_delete(lsn, record);
+ break;
+ case XLOG_HEAP_UPDATE:
+ heap_xlog_update(lsn, record, false, false);
+ break;
+ case XLOG_HEAP_MOVE:
+ heap_xlog_update(lsn, record, true, false);
+ break;
+ case XLOG_HEAP_HOT_UPDATE:
+ heap_xlog_update(lsn, record, false, true);
+ break;
+ case XLOG_HEAP_NEWPAGE:
+ heap_xlog_newpage(lsn, record);
+ break;
+ case XLOG_HEAP_LOCK:
+ heap_xlog_lock(lsn, record);
+ break;
+ case XLOG_HEAP_INPLACE:
+ heap_xlog_inplace(lsn, record);
+ break;
+ default:
+ elog(PANIC, "heap_redo: unknown op code %u", info);
+ }
}
void
@@ -4065,11 +4537,20 @@ heap2_redo(XLogRecPtr lsn, XLogRecord *record)
{
uint8 info = record->xl_info & ~XLR_INFO_MASK;
- info &= XLOG_HEAP_OPMASK;
- if (info == XLOG_HEAP2_FREEZE)
- heap_xlog_freeze(lsn, record);
- else
- elog(PANIC, "heap2_redo: unknown op code %u", info);
+ switch (info & XLOG_HEAP_OPMASK)
+ {
+ case XLOG_HEAP2_FREEZE:
+ heap_xlog_freeze(lsn, record);
+ break;
+ case XLOG_HEAP2_CLEAN:
+ heap_xlog_clean(lsn, record, false);
+ break;
+ case XLOG_HEAP2_CLEAN_MOVE:
+ heap_xlog_clean(lsn, record, true);
+ break;
+ default:
+ elog(PANIC, "heap2_redo: unknown op code %u", info);
+ }
}
static void
@@ -4130,13 +4611,18 @@ heap_desc(StringInfo buf, uint8 xl_info, char *rec)
ItemPointerGetBlockNumber(&(xlrec->newtid)),
ItemPointerGetOffsetNumber(&(xlrec->newtid)));
}
- else if (info == XLOG_HEAP_CLEAN)
+ else if (info == XLOG_HEAP_HOT_UPDATE)
{
- xl_heap_clean *xlrec = (xl_heap_clean *) rec;
+ xl_heap_update *xlrec = (xl_heap_update *) rec;
- appendStringInfo(buf, "clean: rel %u/%u/%u; blk %u",
- xlrec->node.spcNode, xlrec->node.dbNode,
- xlrec->node.relNode, xlrec->block);
+ if (xl_info & XLOG_HEAP_INIT_PAGE) /* can this case happen? */
+ appendStringInfo(buf, "hot_update(init): ");
+ else
+ appendStringInfo(buf, "hot_update: ");
+ out_target(buf, &(xlrec->target));
+ appendStringInfo(buf, "; new %u/%u",
+ ItemPointerGetBlockNumber(&(xlrec->newtid)),
+ ItemPointerGetOffsetNumber(&(xlrec->newtid)));
}
else if (info == XLOG_HEAP_NEWPAGE)
{
@@ -4187,6 +4673,22 @@ heap2_desc(StringInfo buf, uint8 xl_info, char *rec)
xlrec->node.relNode, xlrec->block,
xlrec->cutoff_xid);
}
+ else if (info == XLOG_HEAP2_CLEAN)
+ {
+ xl_heap_clean *xlrec = (xl_heap_clean *) rec;
+
+ appendStringInfo(buf, "clean: rel %u/%u/%u; blk %u",
+ xlrec->node.spcNode, xlrec->node.dbNode,
+ xlrec->node.relNode, xlrec->block);
+ }
+ else if (info == XLOG_HEAP2_CLEAN_MOVE)
+ {
+ xl_heap_clean *xlrec = (xl_heap_clean *) rec;
+
+ appendStringInfo(buf, "clean_move: rel %u/%u/%u; blk %u",
+ xlrec->node.spcNode, xlrec->node.dbNode,
+ xlrec->node.relNode, xlrec->block);
+ }
else
appendStringInfo(buf, "UNKNOWN");
}
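
Editor's note: for illustration, with made-up relfilenode and block values, the two new heap2_desc branches above would render WAL dump lines of the form

    clean: rel 1663/16384/16385; blk 42
    clean_move: rel 1663/16384/16385; blk 42
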