Diffstat (limited to 'src/backend/access/heap/heapam.c')
-rw-r--r--  src/backend/access/heap/heapam.c  473
1 file changed, 112 insertions, 361 deletions
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 34bc60f625f..cc67dd813d2 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -91,9 +91,6 @@ static void compute_new_xmax_infomask(TransactionId xmax, uint16 old_infomask,
static TM_Result heap_lock_updated_tuple(Relation rel, HeapTuple tuple,
ItemPointer ctid, TransactionId xid,
LockTupleMode mode);
-static int heap_log_freeze_plan(HeapTupleFreeze *tuples, int ntuples,
- xl_heap_freeze_plan *plans_out,
- OffsetNumber *offsets_out);
static void GetMultiXactIdHintBits(MultiXactId multi, uint16 *new_infomask,
uint16 *new_infomask2);
static TransactionId MultiXactIdGetUpdateXid(TransactionId xmax,
@@ -6746,180 +6743,19 @@ heap_freeze_execute_prepared(Relation rel, Buffer buffer,
/* Now WAL-log freezing if necessary */
if (RelationNeedsWAL(rel))
{
- xl_heap_freeze_plan plans[MaxHeapTuplesPerPage];
- OffsetNumber offsets[MaxHeapTuplesPerPage];
- int nplans;
- xl_heap_freeze_page xlrec;
- XLogRecPtr recptr;
-
- /* Prepare deduplicated representation for use in WAL record */
- nplans = heap_log_freeze_plan(tuples, ntuples, plans, offsets);
-
- xlrec.snapshotConflictHorizon = snapshotConflictHorizon;
- xlrec.isCatalogRel = RelationIsAccessibleInLogicalDecoding(rel);
- xlrec.nplans = nplans;
-
- XLogBeginInsert();
- XLogRegisterData((char *) &xlrec, SizeOfHeapFreezePage);
-
- /*
- * The freeze plan array and offset array are not actually in the
- * buffer, but pretend that they are. When XLogInsert stores the
- * whole buffer, the arrays need not be stored too.
- */
- XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
- XLogRegisterBufData(0, (char *) plans,
- nplans * sizeof(xl_heap_freeze_plan));
- XLogRegisterBufData(0, (char *) offsets,
- ntuples * sizeof(OffsetNumber));
-
- recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_FREEZE_PAGE);
-
- PageSetLSN(page, recptr);
+ log_heap_prune_and_freeze(rel, buffer, snapshotConflictHorizon,
+ false, /* no cleanup lock required */
+ PRUNE_VACUUM_SCAN,
+ tuples, ntuples,
+ NULL, 0, /* redirected */
+ NULL, 0, /* dead */
+ NULL, 0); /* unused */
}
END_CRIT_SECTION();
}
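
The hand-rolled record construction above collapses into this single call. For
reference, a sketch of log_heap_prune_and_freeze's signature, reconstructed
from the call site (the routine itself is added by this patch alongside the
pruning code in pruneheap.c; treat the exact parameter names as assumptions):

    void
    log_heap_prune_and_freeze(Relation relation, Buffer buffer,
                              TransactionId conflict_xid,
                              bool cleanup_lock,
                              PruneReason reason,
                              HeapTupleFreeze *frozen, int nfrozen,
                              OffsetNumber *redirected, int nredirected,
                              OffsetNumber *dead, int ndead,
                              OffsetNumber *unused, int nunused);

Passing NULL/0 for the redirected, dead, and unused arrays, as this caller
does, yields a freeze-only record. The freeze-plan deduplication removed below
presumably moves into that shared routine rather than disappearing.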
/*
- * Comparator used to deduplicate XLOG_HEAP2_FREEZE_PAGE freeze plans
- */
-static int
-heap_log_freeze_cmp(const void *arg1, const void *arg2)
-{
- HeapTupleFreeze *frz1 = (HeapTupleFreeze *) arg1;
- HeapTupleFreeze *frz2 = (HeapTupleFreeze *) arg2;
-
- if (frz1->xmax < frz2->xmax)
- return -1;
- else if (frz1->xmax > frz2->xmax)
- return 1;
-
- if (frz1->t_infomask2 < frz2->t_infomask2)
- return -1;
- else if (frz1->t_infomask2 > frz2->t_infomask2)
- return 1;
-
- if (frz1->t_infomask < frz2->t_infomask)
- return -1;
- else if (frz1->t_infomask > frz2->t_infomask)
- return 1;
-
- if (frz1->frzflags < frz2->frzflags)
- return -1;
- else if (frz1->frzflags > frz2->frzflags)
- return 1;
-
- /*
- * heap_log_freeze_eq would consider these tuple-wise plans to be equal.
- * (So the tuples will share a single canonical freeze plan.)
- *
- * We tiebreak on page offset number to keep each freeze plan's page
- * offset number array individually sorted. (Unnecessary, but be tidy.)
- */
- if (frz1->offset < frz2->offset)
- return -1;
- else if (frz1->offset > frz2->offset)
- return 1;
-
- Assert(false);
- return 0;
-}
-
-/*
- * Compare fields that describe actions required to freeze tuple with caller's
- * open plan. If everything matches then the frz tuple plan is equivalent to
- * caller's plan.
- */
-static inline bool
-heap_log_freeze_eq(xl_heap_freeze_plan *plan, HeapTupleFreeze *frz)
-{
- if (plan->xmax == frz->xmax &&
- plan->t_infomask2 == frz->t_infomask2 &&
- plan->t_infomask == frz->t_infomask &&
- plan->frzflags == frz->frzflags)
- return true;
-
- /* Caller must call heap_log_freeze_new_plan again for frz */
- return false;
-}
-
-/*
- * Start new plan initialized using tuple-level actions. At least one tuple
- * will have steps required to freeze described by caller's plan during REDO.
- */
-static inline void
-heap_log_freeze_new_plan(xl_heap_freeze_plan *plan, HeapTupleFreeze *frz)
-{
- plan->xmax = frz->xmax;
- plan->t_infomask2 = frz->t_infomask2;
- plan->t_infomask = frz->t_infomask;
- plan->frzflags = frz->frzflags;
- plan->ntuples = 1; /* for now */
-}
-
-/*
- * Deduplicate tuple-based freeze plans so that each distinct set of
- * processing steps is only stored once in XLOG_HEAP2_FREEZE_PAGE records.
- * Called during original execution of freezing (for logged relations).
- *
- * Return value is number of plans set in *plans_out for caller. Also writes
- * an array of offset numbers into *offsets_out output argument for caller
- * (actually there is one array per freeze plan, but that's not of immediate
- * concern to our caller).
- */
-static int
-heap_log_freeze_plan(HeapTupleFreeze *tuples, int ntuples,
- xl_heap_freeze_plan *plans_out,
- OffsetNumber *offsets_out)
-{
- int nplans = 0;
-
- /* Sort tuple-based freeze plans in the order required to deduplicate */
- qsort(tuples, ntuples, sizeof(HeapTupleFreeze), heap_log_freeze_cmp);
-
- for (int i = 0; i < ntuples; i++)
- {
- HeapTupleFreeze *frz = tuples + i;
-
- if (i == 0)
- {
- /* New canonical freeze plan starting with first tup */
- heap_log_freeze_new_plan(plans_out, frz);
- nplans++;
- }
- else if (heap_log_freeze_eq(plans_out, frz))
- {
- /* tup matches open canonical plan -- include tup in it */
- Assert(offsets_out[i - 1] < frz->offset);
- plans_out->ntuples++;
- }
- else
- {
- /* Tup doesn't match current plan -- done with it now */
- plans_out++;
-
- /* New canonical freeze plan starting with this tup */
- heap_log_freeze_new_plan(plans_out, frz);
- nplans++;
- }
-
- /*
- * Save page offset number in dedicated buffer in passing.
- *
- * REDO routine relies on the record's offset numbers array grouping
- * offset numbers by freeze plan. The sort order within each grouping
- * is ascending offset number order, just to keep things tidy.
- */
- offsets_out[i] = frz->offset;
- }
-
- Assert(nplans > 0 && nplans <= ntuples);
-
- return nplans;
-}
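
To make the removed deduplication concrete, here is a small worked example
(all values invented for illustration). Five tuples sharing two distinct
freeze plans are sorted, then collapsed:

    /*
     * tuples, after qsort:  {xmax=500, ..., offset=2} {xmax=500, ..., offset=5}
     *                       {xmax=500, ..., offset=9} {xmax=610, ..., offset=3}
     *                       {xmax=610, ..., offset=7}
     *
     * heap_log_freeze_plan returns nplans = 2 and fills:
     *
     *   plans_out:    [{xmax=500, ..., ntuples=3}, {xmax=610, ..., ntuples=2}]
     *   offsets_out:  [2, 5, 9, 3, 7]
     *
     * REDO walks plans_out in order, consuming ntuples offset numbers from
     * offsets_out for each plan.
     */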
-
-/*
* heap_freeze_tuple
* Freeze tuple in place, without WAL logging.
*
@@ -7892,10 +7728,10 @@ heap_index_delete_tuples(Relation rel, TM_IndexDeleteOp *delstate)
* must have considered the original tuple header as part of
* generating its own snapshotConflictHorizon value.
*
- * Relying on XLOG_HEAP2_PRUNE records like this is the same
- * strategy that index vacuuming uses in all cases. Index VACUUM
- * WAL records don't even have a snapshotConflictHorizon field of
- * their own for this reason.
+ * Relying on XLOG_HEAP2_PRUNE_VACUUM_SCAN records like this is
+ * the same strategy that index vacuuming uses in all cases. Index
+ * VACUUM WAL records don't even have a snapshotConflictHorizon
+ * field of their own for this reason.
*/
if (!ItemIdIsNormal(lp))
break;
@@ -8753,162 +8589,149 @@ ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required,
}
/*
- * Handles XLOG_HEAP2_PRUNE record type.
- *
- * Acquires a full cleanup lock.
+ * Replay XLOG_HEAP2_PRUNE_* records.
*/
static void
-heap_xlog_prune(XLogReaderState *record)
+heap_xlog_prune_freeze(XLogReaderState *record)
{
XLogRecPtr lsn = record->EndRecPtr;
- xl_heap_prune *xlrec = (xl_heap_prune *) XLogRecGetData(record);
+ char *maindataptr = XLogRecGetData(record);
+ xl_heap_prune xlrec;
Buffer buffer;
RelFileLocator rlocator;
BlockNumber blkno;
XLogRedoAction action;
XLogRecGetBlockTag(record, 0, &rlocator, NULL, &blkno);
+ memcpy(&xlrec, maindataptr, SizeOfHeapPrune);
+ maindataptr += SizeOfHeapPrune;
/*
- * We're about to remove tuples. In Hot Standby mode, ensure that there's
- * no queries running for which the removed tuples are still visible.
+ * We will take an ordinary exclusive lock or a cleanup lock depending on
+ * whether the XLHP_CLEANUP_LOCK flag is set. With an ordinary exclusive
+ * lock, we better not be doing anything that requires moving existing
+ * tuple data.
*/
- if (InHotStandby)
- ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
- xlrec->isCatalogRel,
- rlocator);
+ Assert((xlrec.flags & XLHP_CLEANUP_LOCK) != 0 ||
+ (xlrec.flags & (XLHP_HAS_REDIRECTIONS | XLHP_HAS_DEAD_ITEMS)) == 0);
/*
- * If we have a full-page image, restore it (using a cleanup lock) and
- * we're done.
+ * We are about to remove and/or freeze tuples. In Hot Standby mode,
+ * ensure that there are no queries running for which the removed tuples
+ * are still visible or which still consider the frozen xids as running.
+ * The conflict horizon XID comes after xl_heap_prune.
*/
- action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true,
+ if ((xlrec.flags & XLHP_HAS_CONFLICT_HORIZON) != 0)
+ {
+ TransactionId snapshot_conflict_horizon;
+
+ /* memcpy() because snapshot_conflict_horizon is stored unaligned */
+ memcpy(&snapshot_conflict_horizon, maindataptr, sizeof(TransactionId));
+ maindataptr += sizeof(TransactionId);
+
+ if (InHotStandby)
+ ResolveRecoveryConflictWithSnapshot(snapshot_conflict_horizon,
+ (xlrec.flags & XLHP_IS_CATALOG_REL) != 0,
+ rlocator);
+ }
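
The memcpy() calls above reflect the main-data layout of the combined record:
a fixed xl_heap_prune header followed, only when XLHP_HAS_CONFLICT_HORIZON is
set, by an unaligned TransactionId. A sketch of that layout as implied by this
replay code (inferred, not copied from heapam_xlog.h; the real struct may
carry more):

    typedef struct xl_heap_prune
    {
        uint8       flags;      /* XLHP_* bits tested throughout this routine */

        /*
         * If XLHP_HAS_CONFLICT_HORIZON is set, a TransactionId follows,
         * stored unaligned; hence the memcpy() rather than a direct pointer
         * dereference.
         */
    } xl_heap_prune;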
+
+ /*
+ * If we have a full-page image, restore it and we're done.
+ */
+ action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL,
+ (xlrec.flags & XLHP_CLEANUP_LOCK) != 0,
&buffer);
if (action == BLK_NEEDS_REDO)
{
Page page = (Page) BufferGetPage(buffer);
- OffsetNumber *end;
OffsetNumber *redirected;
OffsetNumber *nowdead;
OffsetNumber *nowunused;
int nredirected;
int ndead;
int nunused;
+ int nplans;
Size datalen;
+ xlhp_freeze_plan *plans;
+ OffsetNumber *frz_offsets;
+ char *dataptr = XLogRecGetBlockData(record, 0, &datalen);
- redirected = (OffsetNumber *) XLogRecGetBlockData(record, 0, &datalen);
-
- nredirected = xlrec->nredirected;
- ndead = xlrec->ndead;
- end = (OffsetNumber *) ((char *) redirected + datalen);
- nowdead = redirected + (nredirected * 2);
- nowunused = nowdead + ndead;
- nunused = (end - nowunused);
- Assert(nunused >= 0);
-
- /* Update all line pointers per the record, and repair fragmentation */
- heap_page_prune_execute(buffer,
- redirected, nredirected,
- nowdead, ndead,
- nowunused, nunused);
+ heap_xlog_deserialize_prune_and_freeze(dataptr, xlrec.flags,
+ &nplans, &plans, &frz_offsets,
+ &nredirected, &redirected,
+ &ndead, &nowdead,
+ &nunused, &nowunused);
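
The old code carved the redirected/dead/unused arrays out of the block data by
hand; the new helper unpacks all of the optional sub-arrays, including the
freeze plans, in one place. Its signature as reconstructed from this call site
(storage class and exact parameter names are assumptions):

    void
    heap_xlog_deserialize_prune_and_freeze(char *cursor, uint8 flags,
                                           int *nplans,
                                           xlhp_freeze_plan **plans,
                                           OffsetNumber **frz_offsets,
                                           int *nredirected,
                                           OffsetNumber **redirected,
                                           int *ndead,
                                           OffsetNumber **nowdead,
                                           int *nunused,
                                           OffsetNumber **nowunused);

Each output is presumably filled only when the corresponding XLHP_HAS_* flag
is set in the record, which is why the FSM update at the bottom of this
function can test the same flags.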
/*
- * Note: we don't worry about updating the page's prunability hints.
- * At worst this will cause an extra prune cycle to occur soon.
+ * Update all line pointers per the record, and repair fragmentation
+ * if needed.
*/
+ if (nredirected > 0 || ndead > 0 || nunused > 0)
+ heap_page_prune_execute(buffer,
+ (xlrec.flags & XLHP_CLEANUP_LOCK) == 0,
+ redirected, nredirected,
+ nowdead, ndead,
+ nowunused, nunused);
+
+ /* Freeze tuples */
+ for (int p = 0; p < nplans; p++)
+ {
+ HeapTupleFreeze frz;
- PageSetLSN(page, lsn);
- MarkBufferDirty(buffer);
- }
+ /*
+ * Convert freeze plan representation from WAL record into
+ * per-tuple format used by heap_execute_freeze_tuple
+ */
+ frz.xmax = plans[p].xmax;
+ frz.t_infomask2 = plans[p].t_infomask2;
+ frz.t_infomask = plans[p].t_infomask;
+ frz.frzflags = plans[p].frzflags;
+ frz.offset = InvalidOffsetNumber; /* unused, but be tidy */
- if (BufferIsValid(buffer))
- {
- Size freespace = PageGetHeapFreeSpace(BufferGetPage(buffer));
+ for (int i = 0; i < plans[p].ntuples; i++)
+ {
+ OffsetNumber offset = *(frz_offsets++);
+ ItemId lp;
+ HeapTupleHeader tuple;
- UnlockReleaseBuffer(buffer);
+ lp = PageGetItemId(page, offset);
+ tuple = (HeapTupleHeader) PageGetItem(page, lp);
+ heap_execute_freeze_tuple(tuple, &frz);
+ }
+ }
+
+ /* There should be no more data */
+ Assert((char *) frz_offsets == dataptr + datalen);
/*
- * After pruning records from a page, it's useful to update the FSM
- * about it, as it may cause the page become target for insertions
- * later even if vacuum decides not to visit it (which is possible if
- * gets marked all-visible.)
- *
- * Do this regardless of a full-page image being applied, since the
- * FSM data is not in the page anyway.
+ * Note: we don't worry about updating the page's prunability hints.
+ * At worst this will cause an extra prune cycle to occur soon.
*/
- XLogRecordPageWithFreeSpace(rlocator, blkno, freespace);
- }
-}
-
-/*
- * Handles XLOG_HEAP2_VACUUM record type.
- *
- * Acquires an ordinary exclusive lock only.
- */
-static void
-heap_xlog_vacuum(XLogReaderState *record)
-{
- XLogRecPtr lsn = record->EndRecPtr;
- xl_heap_vacuum *xlrec = (xl_heap_vacuum *) XLogRecGetData(record);
- Buffer buffer;
- BlockNumber blkno;
- XLogRedoAction action;
-
- /*
- * If we have a full-page image, restore it (without using a cleanup lock)
- * and we're done.
- */
- action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, false,
- &buffer);
- if (action == BLK_NEEDS_REDO)
- {
- Page page = (Page) BufferGetPage(buffer);
- OffsetNumber *nowunused;
- Size datalen;
- OffsetNumber *offnum;
-
- nowunused = (OffsetNumber *) XLogRecGetBlockData(record, 0, &datalen);
-
- /* Shouldn't be a record unless there's something to do */
- Assert(xlrec->nunused > 0);
-
- /* Update all now-unused line pointers */
- offnum = nowunused;
- for (int i = 0; i < xlrec->nunused; i++)
- {
- OffsetNumber off = *offnum++;
- ItemId lp = PageGetItemId(page, off);
-
- Assert(ItemIdIsDead(lp) && !ItemIdHasStorage(lp));
- ItemIdSetUnused(lp);
- }
-
- /* Attempt to truncate line pointer array now */
- PageTruncateLinePointerArray(page);
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
+ /*
+ * If we released any space or line pointers, update the free space map.
+ *
+ * Do this regardless of a full-page image being applied, since the FSM
+ * data is not in the page anyway.
+ */
if (BufferIsValid(buffer))
{
- Size freespace = PageGetHeapFreeSpace(BufferGetPage(buffer));
- RelFileLocator rlocator;
-
- XLogRecGetBlockTag(record, 0, &rlocator, NULL, &blkno);
+ if (xlrec.flags & (XLHP_HAS_REDIRECTIONS |
+ XLHP_HAS_DEAD_ITEMS |
+ XLHP_HAS_NOW_UNUSED_ITEMS))
+ {
+ Size freespace = PageGetHeapFreeSpace(BufferGetPage(buffer));
- UnlockReleaseBuffer(buffer);
+ UnlockReleaseBuffer(buffer);
- /*
- * After vacuuming LP_DEAD items from a page, it's useful to update
- * the FSM about it, as it may cause the page become target for
- * insertions later even if vacuum decides not to visit it (which is
- * possible if gets marked all-visible.)
- *
- * Do this regardless of a full-page image being applied, since the
- * FSM data is not in the page anyway.
- */
- XLogRecordPageWithFreeSpace(rlocator, blkno, freespace);
+ XLogRecordPageWithFreeSpace(rlocator, blkno, freespace);
+ }
+ else
+ UnlockReleaseBuffer(buffer);
}
}
@@ -9050,74 +8873,6 @@ heap_xlog_visible(XLogReaderState *record)
}
/*
- * Replay XLOG_HEAP2_FREEZE_PAGE records
- */
-static void
-heap_xlog_freeze_page(XLogReaderState *record)
-{
- XLogRecPtr lsn = record->EndRecPtr;
- xl_heap_freeze_page *xlrec = (xl_heap_freeze_page *) XLogRecGetData(record);
- Buffer buffer;
-
- /*
- * In Hot Standby mode, ensure that there's no queries running which still
- * consider the frozen xids as running.
- */
- if (InHotStandby)
- {
- RelFileLocator rlocator;
-
- XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
- ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
- xlrec->isCatalogRel,
- rlocator);
- }
-
- if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
- {
- Page page = BufferGetPage(buffer);
- xl_heap_freeze_plan *plans;
- OffsetNumber *offsets;
- int curoff = 0;
-
- plans = (xl_heap_freeze_plan *) XLogRecGetBlockData(record, 0, NULL);
- offsets = (OffsetNumber *) ((char *) plans +
- (xlrec->nplans *
- sizeof(xl_heap_freeze_plan)));
- for (int p = 0; p < xlrec->nplans; p++)
- {
- HeapTupleFreeze frz;
-
- /*
- * Convert freeze plan representation from WAL record into
- * per-tuple format used by heap_execute_freeze_tuple
- */
- frz.xmax = plans[p].xmax;
- frz.t_infomask2 = plans[p].t_infomask2;
- frz.t_infomask = plans[p].t_infomask;
- frz.frzflags = plans[p].frzflags;
- frz.offset = InvalidOffsetNumber; /* unused, but be tidy */
-
- for (int i = 0; i < plans[p].ntuples; i++)
- {
- OffsetNumber offset = offsets[curoff++];
- ItemId lp;
- HeapTupleHeader tuple;
-
- lp = PageGetItemId(page, offset);
- tuple = (HeapTupleHeader) PageGetItem(page, lp);
- heap_execute_freeze_tuple(tuple, &frz);
- }
- }
-
- PageSetLSN(page, lsn);
- MarkBufferDirty(buffer);
- }
- if (BufferIsValid(buffer))
- UnlockReleaseBuffer(buffer);
-}
-
-/*
* Given an "infobits" field from an XLog record, set the correct bits in the
* given infomask and infomask2 for the tuple touched by the record.
*
@@ -10017,14 +9772,10 @@ heap2_redo(XLogReaderState *record)
switch (info & XLOG_HEAP_OPMASK)
{
- case XLOG_HEAP2_PRUNE:
- heap_xlog_prune(record);
- break;
- case XLOG_HEAP2_VACUUM:
- heap_xlog_vacuum(record);
- break;
- case XLOG_HEAP2_FREEZE_PAGE:
- heap_xlog_freeze_page(record);
+ case XLOG_HEAP2_PRUNE_ON_ACCESS:
+ case XLOG_HEAP2_PRUNE_VACUUM_SCAN:
+ case XLOG_HEAP2_PRUNE_VACUUM_CLEANUP:
+ heap_xlog_prune_freeze(record);
break;
case XLOG_HEAP2_VISIBLE:
heap_xlog_visible(record);
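
Where the old dispatch mapped three record types to three replay routines, the
new one maps three opcodes to a single routine. The opcodes correspond to the
PruneReason passed to log_heap_prune_and_freeze; only PRUNE_VACUUM_SCAN
appears in this diff, so the other names in this sketch are assumed by analogy
with the opcode names:

    /* Hypothetical sketch of the reason-to-opcode correspondence: */
    typedef enum
    {
        PRUNE_ON_ACCESS,        /* opportunistic pruning during normal access */
        PRUNE_VACUUM_SCAN,      /* VACUUM's first heap pass */
        PRUNE_VACUUM_CLEANUP,   /* VACUUM's second heap pass */
    } PruneReason;

    /* ...emitted as XLOG_HEAP2_PRUNE_ON_ACCESS, XLOG_HEAP2_PRUNE_VACUUM_SCAN,
     * and XLOG_HEAP2_PRUNE_VACUUM_CLEANUP respectively, so tools such as
     * pg_waldump can still report why a page was pruned even though the
     * record format is shared. */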