Diffstat (limited to 'src/backend/access/heap/heapam.c')
-rw-r--r-- | src/backend/access/heap/heapam.c | 473
1 file changed, 112 insertions, 361 deletions
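Editor's note: this commit folds the dedicated XLOG_HEAP2_FREEZE_PAGE record (and the heap_log_freeze_plan() deduplication machinery removed below) into a combined prune/freeze record written via log_heap_prune_and_freeze(). The grouped representation survives in the new format: tuples with identical freeze plans are sorted together and stored as one plan header plus a run of page offsets, presumably now built inside the new logging helper, which lives outside this file. As a rough, standalone illustration of that sort-and-group step only (TupleFreeze, FreezePlan, and dedup_freeze_plans are simplified stand-ins invented for this sketch, not the PostgreSQL structs, which also carry t_infomask, t_infomask2, and frzflags):

#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

/* Simplified stand-ins for HeapTupleFreeze and xlhp_freeze_plan */
typedef struct
{
    uint32_t    xmax;
    uint16_t    flags;
    uint16_t    offset;         /* page offset number of the tuple */
} TupleFreeze;

typedef struct
{
    uint32_t    xmax;
    uint16_t    flags;
    uint16_t    ntuples;        /* how many offsets belong to this plan */
} FreezePlan;

/* Order by plan fields first; tiebreak on offset so each group stays sorted */
static int
freeze_cmp(const void *a, const void *b)
{
    const TupleFreeze *x = a;
    const TupleFreeze *y = b;

    if (x->xmax != y->xmax)
        return x->xmax < y->xmax ? -1 : 1;
    if (x->flags != y->flags)
        return x->flags < y->flags ? -1 : 1;
    return x->offset < y->offset ? -1 : 1;
}

/*
 * Sort, then emit one FreezePlan per distinct (xmax, flags) pair and a flat
 * offsets[] array grouped by plan.  Returns the number of plans.
 */
static int
dedup_freeze_plans(TupleFreeze *tups, int n,
                   FreezePlan *plans, uint16_t *offsets)
{
    int         nplans = 0;

    qsort(tups, n, sizeof(TupleFreeze), freeze_cmp);

    for (int i = 0; i < n; i++)
    {
        if (nplans == 0 ||
            plans[nplans - 1].xmax != tups[i].xmax ||
            plans[nplans - 1].flags != tups[i].flags)
        {
            /* start a new canonical plan for this distinct set of actions */
            plans[nplans].xmax = tups[i].xmax;
            plans[nplans].flags = tups[i].flags;
            plans[nplans].ntuples = 0;
            nplans++;
        }
        plans[nplans - 1].ntuples++;
        offsets[i] = tups[i].offset;    /* grouped by plan, ascending */
    }
    return nplans;
}

int
main(void)
{
    TupleFreeze tups[] = {{10, 1, 5}, {20, 1, 2}, {10, 1, 3}};
    FreezePlan  plans[3];
    uint16_t    offsets[3];
    int         nplans = dedup_freeze_plans(tups, 3, plans, offsets);

    /* prints: 2 plans; offsets 3 5 | 2 */
    printf("%d plans; offsets %u %u | %u\n", nplans,
           (unsigned) offsets[0], (unsigned) offsets[1],
           (unsigned) offsets[2]);
    return 0;
}

A record then stores the plan headers followed by all offsets grouped by plan, which is exactly the layout the replay loop in heap_xlog_prune_freeze() below consumes.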
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 34bc60f625f..cc67dd813d2 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -91,9 +91,6 @@ static void compute_new_xmax_infomask(TransactionId xmax, uint16 old_infomask,
 static TM_Result heap_lock_updated_tuple(Relation rel, HeapTuple tuple,
                                          ItemPointer ctid, TransactionId xid,
                                          LockTupleMode mode);
-static int  heap_log_freeze_plan(HeapTupleFreeze *tuples, int ntuples,
-                                 xl_heap_freeze_plan *plans_out,
-                                 OffsetNumber *offsets_out);
 static void GetMultiXactIdHintBits(MultiXactId multi, uint16 *new_infomask,
                                    uint16 *new_infomask2);
 static TransactionId MultiXactIdGetUpdateXid(TransactionId xmax,
@@ -6746,180 +6743,19 @@ heap_freeze_execute_prepared(Relation rel, Buffer buffer,
     /* Now WAL-log freezing if necessary */
     if (RelationNeedsWAL(rel))
     {
-        xl_heap_freeze_plan plans[MaxHeapTuplesPerPage];
-        OffsetNumber offsets[MaxHeapTuplesPerPage];
-        int         nplans;
-        xl_heap_freeze_page xlrec;
-        XLogRecPtr  recptr;
-
-        /* Prepare deduplicated representation for use in WAL record */
-        nplans = heap_log_freeze_plan(tuples, ntuples, plans, offsets);
-
-        xlrec.snapshotConflictHorizon = snapshotConflictHorizon;
-        xlrec.isCatalogRel = RelationIsAccessibleInLogicalDecoding(rel);
-        xlrec.nplans = nplans;
-
-        XLogBeginInsert();
-        XLogRegisterData((char *) &xlrec, SizeOfHeapFreezePage);
-
-        /*
-         * The freeze plan array and offset array are not actually in the
-         * buffer, but pretend that they are.  When XLogInsert stores the
-         * whole buffer, the arrays need not be stored too.
-         */
-        XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
-        XLogRegisterBufData(0, (char *) plans,
-                            nplans * sizeof(xl_heap_freeze_plan));
-        XLogRegisterBufData(0, (char *) offsets,
-                            ntuples * sizeof(OffsetNumber));
-
-        recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_FREEZE_PAGE);
-
-        PageSetLSN(page, recptr);
+        log_heap_prune_and_freeze(rel, buffer, snapshotConflictHorizon,
+                                  false,    /* no cleanup lock required */
+                                  PRUNE_VACUUM_SCAN,
+                                  tuples, ntuples,
+                                  NULL, 0,  /* redirected */
+                                  NULL, 0,  /* dead */
+                                  NULL, 0); /* unused */
     }
 
     END_CRIT_SECTION();
 }
 
 /*
- * Comparator used to deduplicate XLOG_HEAP2_FREEZE_PAGE freeze plans
- */
-static int
-heap_log_freeze_cmp(const void *arg1, const void *arg2)
-{
-    HeapTupleFreeze *frz1 = (HeapTupleFreeze *) arg1;
-    HeapTupleFreeze *frz2 = (HeapTupleFreeze *) arg2;
-
-    if (frz1->xmax < frz2->xmax)
-        return -1;
-    else if (frz1->xmax > frz2->xmax)
-        return 1;
-
-    if (frz1->t_infomask2 < frz2->t_infomask2)
-        return -1;
-    else if (frz1->t_infomask2 > frz2->t_infomask2)
-        return 1;
-
-    if (frz1->t_infomask < frz2->t_infomask)
-        return -1;
-    else if (frz1->t_infomask > frz2->t_infomask)
-        return 1;
-
-    if (frz1->frzflags < frz2->frzflags)
-        return -1;
-    else if (frz1->frzflags > frz2->frzflags)
-        return 1;
-
-    /*
-     * heap_log_freeze_eq would consider these tuple-wise plans to be equal.
-     * (So the tuples will share a single canonical freeze plan.)
-     *
-     * We tiebreak on page offset number to keep each freeze plan's page
-     * offset number array individually sorted.  (Unnecessary, but be tidy.)
-     */
-    if (frz1->offset < frz2->offset)
-        return -1;
-    else if (frz1->offset > frz2->offset)
-        return 1;
-
-    Assert(false);
-    return 0;
-}
-
-/*
- * Compare fields that describe actions required to freeze tuple with caller's
- * open plan.  If everything matches then the frz tuple plan is equivalent to
- * caller's plan.
- */
-static inline bool
-heap_log_freeze_eq(xl_heap_freeze_plan *plan, HeapTupleFreeze *frz)
-{
-    if (plan->xmax == frz->xmax &&
-        plan->t_infomask2 == frz->t_infomask2 &&
-        plan->t_infomask == frz->t_infomask &&
-        plan->frzflags == frz->frzflags)
-        return true;
-
-    /* Caller must call heap_log_freeze_new_plan again for frz */
-    return false;
-}
-
-/*
- * Start new plan initialized using tuple-level actions.  At least one tuple
- * will have steps required to freeze described by caller's plan during REDO.
- */
-static inline void
-heap_log_freeze_new_plan(xl_heap_freeze_plan *plan, HeapTupleFreeze *frz)
-{
-    plan->xmax = frz->xmax;
-    plan->t_infomask2 = frz->t_infomask2;
-    plan->t_infomask = frz->t_infomask;
-    plan->frzflags = frz->frzflags;
-    plan->ntuples = 1;          /* for now */
-}
-
-/*
- * Deduplicate tuple-based freeze plans so that each distinct set of
- * processing steps is only stored once in XLOG_HEAP2_FREEZE_PAGE records.
- * Called during original execution of freezing (for logged relations).
- *
- * Return value is number of plans set in *plans_out for caller.  Also writes
- * an array of offset numbers into *offsets_out output argument for caller
- * (actually there is one array per freeze plan, but that's not of immediate
- * concern to our caller).
- */
-static int
-heap_log_freeze_plan(HeapTupleFreeze *tuples, int ntuples,
-                     xl_heap_freeze_plan *plans_out,
-                     OffsetNumber *offsets_out)
-{
-    int         nplans = 0;
-
-    /* Sort tuple-based freeze plans in the order required to deduplicate */
-    qsort(tuples, ntuples, sizeof(HeapTupleFreeze), heap_log_freeze_cmp);
-
-    for (int i = 0; i < ntuples; i++)
-    {
-        HeapTupleFreeze *frz = tuples + i;
-
-        if (i == 0)
-        {
-            /* New canonical freeze plan starting with first tup */
-            heap_log_freeze_new_plan(plans_out, frz);
-            nplans++;
-        }
-        else if (heap_log_freeze_eq(plans_out, frz))
-        {
-            /* tup matches open canonical plan -- include tup in it */
-            Assert(offsets_out[i - 1] < frz->offset);
-            plans_out->ntuples++;
-        }
-        else
-        {
-            /* Tup doesn't match current plan -- done with it now */
-            plans_out++;
-
-            /* New canonical freeze plan starting with this tup */
-            heap_log_freeze_new_plan(plans_out, frz);
-            nplans++;
-        }
-
-        /*
-         * Save page offset number in dedicated buffer in passing.
-         *
-         * REDO routine relies on the record's offset numbers array grouping
-         * offset numbers by freeze plan.  The sort order within each grouping
-         * is ascending offset number order, just to keep things tidy.
-         */
-        offsets_out[i] = frz->offset;
-    }
-
-    Assert(nplans > 0 && nplans <= ntuples);
-
-    return nplans;
-}
-
-/*
  * heap_freeze_tuple
  *      Freeze tuple in place, without WAL logging.
  *
@@ -7892,10 +7728,10 @@ heap_index_delete_tuples(Relation rel, TM_IndexDeleteOp *delstate)
                  * must have considered the original tuple header as part of
                  * generating its own snapshotConflictHorizon value.
                  *
-                 * Relying on XLOG_HEAP2_PRUNE records like this is the same
-                 * strategy that index vacuuming uses in all cases. Index VACUUM
-                 * WAL records don't even have a snapshotConflictHorizon field of
-                 * their own for this reason.
+                 * Relying on XLOG_HEAP2_PRUNE_VACUUM_SCAN records like this is
+                 * the same strategy that index vacuuming uses in all cases. Index
+                 * VACUUM WAL records don't even have a snapshotConflictHorizon
+                 * field of their own for this reason.
                  */
                 if (!ItemIdIsNormal(lp))
                     break;
@@ -8753,162 +8589,149 @@ ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required,
 }
 
 /*
- * Handles XLOG_HEAP2_PRUNE record type.
- *
- * Acquires a full cleanup lock.
+ * Replay XLOG_HEAP2_PRUNE_* records.
  */
 static void
-heap_xlog_prune(XLogReaderState *record)
+heap_xlog_prune_freeze(XLogReaderState *record)
 {
     XLogRecPtr  lsn = record->EndRecPtr;
-    xl_heap_prune *xlrec = (xl_heap_prune *) XLogRecGetData(record);
+    char       *maindataptr = XLogRecGetData(record);
+    xl_heap_prune xlrec;
     Buffer      buffer;
     RelFileLocator rlocator;
     BlockNumber blkno;
     XLogRedoAction action;
 
     XLogRecGetBlockTag(record, 0, &rlocator, NULL, &blkno);
+    memcpy(&xlrec, maindataptr, SizeOfHeapPrune);
+    maindataptr += SizeOfHeapPrune;
 
     /*
-     * We're about to remove tuples. In Hot Standby mode, ensure that there's
-     * no queries running for which the removed tuples are still visible.
+     * We will take an ordinary exclusive lock or a cleanup lock depending on
+     * whether the XLHP_CLEANUP_LOCK flag is set.  With an ordinary exclusive
+     * lock, we better not be doing anything that requires moving existing
+     * tuple data.
      */
-    if (InHotStandby)
-        ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
-                                            xlrec->isCatalogRel,
-                                            rlocator);
+    Assert((xlrec.flags & XLHP_CLEANUP_LOCK) != 0 ||
+           (xlrec.flags & (XLHP_HAS_REDIRECTIONS | XLHP_HAS_DEAD_ITEMS)) == 0);
 
     /*
-     * If we have a full-page image, restore it (using a cleanup lock) and
-     * we're done.
+     * We are about to remove and/or freeze tuples.  In Hot Standby mode,
+     * ensure that there are no queries running for which the removed tuples
+     * are still visible or which still consider the frozen xids as running.
+     * The conflict horizon XID comes after xl_heap_prune.
      */
-    action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true,
+    if ((xlrec.flags & XLHP_HAS_CONFLICT_HORIZON) != 0)
+    {
+        TransactionId snapshot_conflict_horizon;
+
+        /* memcpy() because snapshot_conflict_horizon is stored unaligned */
+        memcpy(&snapshot_conflict_horizon, maindataptr, sizeof(TransactionId));
+        maindataptr += sizeof(TransactionId);
+
+        if (InHotStandby)
+            ResolveRecoveryConflictWithSnapshot(snapshot_conflict_horizon,
+                                                (xlrec.flags & XLHP_IS_CATALOG_REL) != 0,
+                                                rlocator);
+    }
+
+    /*
+     * If we have a full-page image, restore it and we're done.
+     */
+    action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL,
+                                           (xlrec.flags & XLHP_CLEANUP_LOCK) != 0,
                                            &buffer);
     if (action == BLK_NEEDS_REDO)
     {
         Page        page = (Page) BufferGetPage(buffer);
-        OffsetNumber *end;
         OffsetNumber *redirected;
         OffsetNumber *nowdead;
         OffsetNumber *nowunused;
         int         nredirected;
         int         ndead;
         int         nunused;
+        int         nplans;
         Size        datalen;
+        xlhp_freeze_plan *plans;
+        OffsetNumber *frz_offsets;
+        char       *dataptr = XLogRecGetBlockData(record, 0, &datalen);
 
-        redirected = (OffsetNumber *) XLogRecGetBlockData(record, 0, &datalen);
-
-        nredirected = xlrec->nredirected;
-        ndead = xlrec->ndead;
-        end = (OffsetNumber *) ((char *) redirected + datalen);
-        nowdead = redirected + (nredirected * 2);
-        nowunused = nowdead + ndead;
-        nunused = (end - nowunused);
-        Assert(nunused >= 0);
-
-        /* Update all line pointers per the record, and repair fragmentation */
-        heap_page_prune_execute(buffer,
-                                redirected, nredirected,
-                                nowdead, ndead,
-                                nowunused, nunused);
+        heap_xlog_deserialize_prune_and_freeze(dataptr, xlrec.flags,
                                               &nplans, &plans, &frz_offsets,
                                               &nredirected, &redirected,
                                               &ndead, &nowdead,
                                               &nunused, &nowunused);
 
         /*
-         * Note: we don't worry about updating the page's prunability hints.
-         * At worst this will cause an extra prune cycle to occur soon.
+         * Update all line pointers per the record, and repair fragmentation
+         * if needed.
         */
+        if (nredirected > 0 || ndead > 0 || nunused > 0)
+            heap_page_prune_execute(buffer,
+                                    (xlrec.flags & XLHP_CLEANUP_LOCK) == 0,
+                                    redirected, nredirected,
+                                    nowdead, ndead,
+                                    nowunused, nunused);
+
+        /* Freeze tuples */
+        for (int p = 0; p < nplans; p++)
+        {
+            HeapTupleFreeze frz;
 
-        PageSetLSN(page, lsn);
-        MarkBufferDirty(buffer);
-    }
+            /*
+             * Convert freeze plan representation from WAL record into
+             * per-tuple format used by heap_execute_freeze_tuple
+             */
+            frz.xmax = plans[p].xmax;
+            frz.t_infomask2 = plans[p].t_infomask2;
+            frz.t_infomask = plans[p].t_infomask;
+            frz.frzflags = plans[p].frzflags;
+            frz.offset = InvalidOffsetNumber;   /* unused, but be tidy */
 
-    if (BufferIsValid(buffer))
-    {
-        Size        freespace = PageGetHeapFreeSpace(BufferGetPage(buffer));
+            for (int i = 0; i < plans[p].ntuples; i++)
+            {
+                OffsetNumber offset = *(frz_offsets++);
+                ItemId      lp;
+                HeapTupleHeader tuple;
 
-        UnlockReleaseBuffer(buffer);
+                lp = PageGetItemId(page, offset);
+                tuple = (HeapTupleHeader) PageGetItem(page, lp);
+                heap_execute_freeze_tuple(tuple, &frz);
+            }
+        }
+
+        /* There should be no more data */
+        Assert((char *) frz_offsets == dataptr + datalen);
 
         /*
-         * After pruning records from a page, it's useful to update the FSM
-         * about it, as it may cause the page become target for insertions
-         * later even if vacuum decides not to visit it (which is possible if
-         * gets marked all-visible.)
-         *
-         * Do this regardless of a full-page image being applied, since the
-         * FSM data is not in the page anyway.
+         * Note: we don't worry about updating the page's prunability hints.
+         * At worst this will cause an extra prune cycle to occur soon.
         */
-        XLogRecordPageWithFreeSpace(rlocator, blkno, freespace);
-    }
-}
-
-/*
- * Handles XLOG_HEAP2_VACUUM record type.
- *
- * Acquires an ordinary exclusive lock only.
- */
-static void
-heap_xlog_vacuum(XLogReaderState *record)
-{
-    XLogRecPtr  lsn = record->EndRecPtr;
-    xl_heap_vacuum *xlrec = (xl_heap_vacuum *) XLogRecGetData(record);
-    Buffer      buffer;
-    BlockNumber blkno;
-    XLogRedoAction action;
-
-    /*
-     * If we have a full-page image, restore it (without using a cleanup lock)
-     * and we're done.
-     */
-    action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, false,
-                                           &buffer);
-    if (action == BLK_NEEDS_REDO)
-    {
-        Page        page = (Page) BufferGetPage(buffer);
-        OffsetNumber *nowunused;
-        Size        datalen;
-        OffsetNumber *offnum;
-
-        nowunused = (OffsetNumber *) XLogRecGetBlockData(record, 0, &datalen);
-
-        /* Shouldn't be a record unless there's something to do */
-        Assert(xlrec->nunused > 0);
-
-        /* Update all now-unused line pointers */
-        offnum = nowunused;
-        for (int i = 0; i < xlrec->nunused; i++)
-        {
-            OffsetNumber off = *offnum++;
-            ItemId      lp = PageGetItemId(page, off);
-
-            Assert(ItemIdIsDead(lp) && !ItemIdHasStorage(lp));
-            ItemIdSetUnused(lp);
-        }
-
-        /* Attempt to truncate line pointer array now */
-        PageTruncateLinePointerArray(page);
 
         PageSetLSN(page, lsn);
         MarkBufferDirty(buffer);
     }
 
+    /*
+     * If we released any space or line pointers, update the free space map.
+     *
+     * Do this regardless of a full-page image being applied, since the FSM
+     * data is not in the page anyway.
+     */
    if (BufferIsValid(buffer))
    {
-        Size        freespace = PageGetHeapFreeSpace(BufferGetPage(buffer));
-        RelFileLocator rlocator;
-
-        XLogRecGetBlockTag(record, 0, &rlocator, NULL, &blkno);
+        if (xlrec.flags & (XLHP_HAS_REDIRECTIONS |
+                           XLHP_HAS_DEAD_ITEMS |
+                           XLHP_HAS_NOW_UNUSED_ITEMS))
+        {
+            Size        freespace = PageGetHeapFreeSpace(BufferGetPage(buffer));
 
-        UnlockReleaseBuffer(buffer);
+            UnlockReleaseBuffer(buffer);
 
-        /*
-         * After vacuuming LP_DEAD items from a page, it's useful to update
-         * the FSM about it, as it may cause the page become target for
-         * insertions later even if vacuum decides not to visit it (which is
-         * possible if gets marked all-visible.)
-         *
-         * Do this regardless of a full-page image being applied, since the
-         * FSM data is not in the page anyway.
-         */
-        XLogRecordPageWithFreeSpace(rlocator, blkno, freespace);
+            XLogRecordPageWithFreeSpace(rlocator, blkno, freespace);
+        }
+        else
+            UnlockReleaseBuffer(buffer);
    }
 }
 
@@ -9050,74 +8873,6 @@ heap_xlog_visible(XLogReaderState *record)
 }
 
 /*
- * Replay XLOG_HEAP2_FREEZE_PAGE records
- */
-static void
-heap_xlog_freeze_page(XLogReaderState *record)
-{
-    XLogRecPtr  lsn = record->EndRecPtr;
-    xl_heap_freeze_page *xlrec = (xl_heap_freeze_page *) XLogRecGetData(record);
-    Buffer      buffer;
-
-    /*
-     * In Hot Standby mode, ensure that there's no queries running which still
-     * consider the frozen xids as running.
-     */
-    if (InHotStandby)
-    {
-        RelFileLocator rlocator;
-
-        XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
-        ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
-                                            xlrec->isCatalogRel,
-                                            rlocator);
-    }
-
-    if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
-    {
-        Page        page = BufferGetPage(buffer);
-        xl_heap_freeze_plan *plans;
-        OffsetNumber *offsets;
-        int         curoff = 0;
-
-        plans = (xl_heap_freeze_plan *) XLogRecGetBlockData(record, 0, NULL);
-        offsets = (OffsetNumber *) ((char *) plans +
-                                    (xlrec->nplans *
-                                     sizeof(xl_heap_freeze_plan)));
-        for (int p = 0; p < xlrec->nplans; p++)
-        {
-            HeapTupleFreeze frz;
-
-            /*
-             * Convert freeze plan representation from WAL record into
-             * per-tuple format used by heap_execute_freeze_tuple
-             */
-            frz.xmax = plans[p].xmax;
-            frz.t_infomask2 = plans[p].t_infomask2;
-            frz.t_infomask = plans[p].t_infomask;
-            frz.frzflags = plans[p].frzflags;
-            frz.offset = InvalidOffsetNumber;   /* unused, but be tidy */
-
-            for (int i = 0; i < plans[p].ntuples; i++)
-            {
-                OffsetNumber offset = offsets[curoff++];
-                ItemId      lp;
-                HeapTupleHeader tuple;
-
-                lp = PageGetItemId(page, offset);
-                tuple = (HeapTupleHeader) PageGetItem(page, lp);
-                heap_execute_freeze_tuple(tuple, &frz);
-            }
-        }
-
-        PageSetLSN(page, lsn);
-        MarkBufferDirty(buffer);
-    }
-    if (BufferIsValid(buffer))
-        UnlockReleaseBuffer(buffer);
-}
-
-/*
  * Given an "infobits" field from an XLog record, set the correct bits in the
  * given infomask and infomask2 for the tuple touched by the record.
  *
@@ -10017,14 +9772,10 @@ heap2_redo(XLogReaderState *record)
 
     switch (info & XLOG_HEAP_OPMASK)
     {
-        case XLOG_HEAP2_PRUNE:
-            heap_xlog_prune(record);
-            break;
-        case XLOG_HEAP2_VACUUM:
-            heap_xlog_vacuum(record);
-            break;
-        case XLOG_HEAP2_FREEZE_PAGE:
-            heap_xlog_freeze_page(record);
+        case XLOG_HEAP2_PRUNE_ON_ACCESS:
+        case XLOG_HEAP2_PRUNE_VACUUM_SCAN:
+        case XLOG_HEAP2_PRUNE_VACUUM_CLEANUP:
+            heap_xlog_prune_freeze(record);
             break;
        case XLOG_HEAP2_VISIBLE:
            heap_xlog_visible(record);
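Editor's note: on the replay side, heap_xlog_prune_freeze() above walks the grouped layout in the order it was written: an outer loop over the deduplicated plans and an inner loop that advances one shared cursor through the offsets array by each plan's ntuples. A minimal standalone sketch of just that traversal shape, using the same simplified stand-in types as the sketch before the diff, with a hypothetical apply_freeze() in place of heap_execute_freeze_tuple():

#include <stdint.h>
#include <stdio.h>

/* Simplified stand-in for xlhp_freeze_plan */
typedef struct
{
    uint32_t    xmax;
    uint16_t    ntuples;        /* how many offsets belong to this plan */
} FreezePlan;

/* Placeholder for heap_execute_freeze_tuple() acting on the tuple at 'off' */
static void
apply_freeze(uint16_t off, const FreezePlan *plan)
{
    printf("freeze offset %u using xmax %u\n",
           (unsigned) off, (unsigned) plan->xmax);
}

/* Offsets are stored grouped by plan, so one cursor visits each exactly once */
static void
replay_freeze_plans(const FreezePlan *plans, int nplans,
                    const uint16_t *offsets)
{
    for (int p = 0; p < nplans; p++)
        for (int i = 0; i < plans[p].ntuples; i++)
            apply_freeze(*(offsets++), &plans[p]);
}

int
main(void)
{
    FreezePlan  plans[] = {{10, 2}, {20, 1}};
    uint16_t    offsets[] = {3, 5, 2};  /* matches the dedup example above */

    replay_freeze_plans(plans, 2, offsets);
    return 0;
}

After the loops finish, the cursor should sit exactly at the end of the registered block data, which is what the Assert((char *) frz_offsets == dataptr + datalen) in the real redo routine verifies.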