diff options
Diffstat (limited to 'src/backend/access/nbtree/nbtxlog.c')
-rw-r--r-- | src/backend/access/nbtree/nbtxlog.c | 268 |
1 files changed, 256 insertions, 12 deletions
diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c index 2e5202c2d6e..99d0914e724 100644 --- a/src/backend/access/nbtree/nbtxlog.c +++ b/src/backend/access/nbtree/nbtxlog.c @@ -22,6 +22,9 @@ #include "access/xlogutils.h" #include "miscadmin.h" #include "storage/procarray.h" +#include "utils/memutils.h" + +static MemoryContext opCtx; /* working memory for operations */ /* * _bt_restore_page -- re-enter all the index tuples on a page @@ -111,6 +114,7 @@ _bt_restore_meta(XLogReaderState *record, uint8 block_id) Assert(md->btm_version >= BTREE_NOVAC_VERSION); md->btm_oldest_btpo_xact = xlrec->oldest_btpo_xact; md->btm_last_cleanup_num_heap_tuples = xlrec->last_cleanup_num_heap_tuples; + md->btm_allequalimage = xlrec->allequalimage; pageop = (BTPageOpaque) PageGetSpecialPointer(metapg); pageop->btpo_flags = BTP_META; @@ -156,7 +160,8 @@ _bt_clear_incomplete_split(XLogReaderState *record, uint8 block_id) } static void -btree_xlog_insert(bool isleaf, bool ismeta, XLogReaderState *record) +btree_xlog_insert(bool isleaf, bool ismeta, bool posting, + XLogReaderState *record) { XLogRecPtr lsn = record->EndRecPtr; xl_btree_insert *xlrec = (xl_btree_insert *) XLogRecGetData(record); @@ -181,9 +186,52 @@ btree_xlog_insert(bool isleaf, bool ismeta, XLogReaderState *record) page = BufferGetPage(buffer); - if (PageAddItem(page, (Item) datapos, datalen, xlrec->offnum, - false, false) == InvalidOffsetNumber) - elog(PANIC, "btree_xlog_insert: failed to add item"); + if (!posting) + { + /* Simple retail insertion */ + if (PageAddItem(page, (Item) datapos, datalen, xlrec->offnum, + false, false) == InvalidOffsetNumber) + elog(PANIC, "failed to add new item"); + } + else + { + ItemId itemid; + IndexTuple oposting, + newitem, + nposting; + uint16 postingoff; + + /* + * A posting list split occurred during leaf page insertion. WAL + * record data will start with an offset number representing the + * point in an existing posting list that a split occurs at. + * + * Use _bt_swap_posting() to repeat posting list split steps from + * primary. Note that newitem from WAL record is 'orignewitem', + * not the final version of newitem that is actually inserted on + * page. + */ + postingoff = *((uint16 *) datapos); + datapos += sizeof(uint16); + datalen -= sizeof(uint16); + + itemid = PageGetItemId(page, OffsetNumberPrev(xlrec->offnum)); + oposting = (IndexTuple) PageGetItem(page, itemid); + + /* Use mutable, aligned newitem copy in _bt_swap_posting() */ + Assert(isleaf && postingoff > 0); + newitem = CopyIndexTuple((IndexTuple) datapos); + nposting = _bt_swap_posting(newitem, oposting, postingoff); + + /* Replace existing posting list with post-split version */ + memcpy(oposting, nposting, MAXALIGN(IndexTupleSize(nposting))); + + /* Insert "final" new item (not orignewitem from WAL stream) */ + Assert(IndexTupleSize(newitem) == datalen); + if (PageAddItem(page, (Item) newitem, datalen, xlrec->offnum, + false, false) == InvalidOffsetNumber) + elog(PANIC, "failed to add posting split new item"); + } PageSetLSN(page, lsn); MarkBufferDirty(buffer); @@ -265,20 +313,38 @@ btree_xlog_split(bool onleft, XLogReaderState *record) BTPageOpaque lopaque = (BTPageOpaque) PageGetSpecialPointer(lpage); OffsetNumber off; IndexTuple newitem = NULL, - left_hikey = NULL; + left_hikey = NULL, + nposting = NULL; Size newitemsz = 0, left_hikeysz = 0; Page newlpage; - OffsetNumber leftoff; + OffsetNumber leftoff, + replacepostingoff = InvalidOffsetNumber; datapos = XLogRecGetBlockData(record, 0, &datalen); - if (onleft) + if (onleft || xlrec->postingoff != 0) { newitem = (IndexTuple) datapos; newitemsz = MAXALIGN(IndexTupleSize(newitem)); datapos += newitemsz; datalen -= newitemsz; + + if (xlrec->postingoff != 0) + { + ItemId itemid; + IndexTuple oposting; + + /* Posting list must be at offset number before new item's */ + replacepostingoff = OffsetNumberPrev(xlrec->newitemoff); + + /* Use mutable, aligned newitem copy in _bt_swap_posting() */ + newitem = CopyIndexTuple(newitem); + itemid = PageGetItemId(lpage, replacepostingoff); + oposting = (IndexTuple) PageGetItem(lpage, itemid); + nposting = _bt_swap_posting(newitem, oposting, + xlrec->postingoff); + } } /* @@ -308,8 +374,20 @@ btree_xlog_split(bool onleft, XLogReaderState *record) Size itemsz; IndexTuple item; + /* Add replacement posting list when required */ + if (off == replacepostingoff) + { + Assert(onleft || xlrec->firstright == xlrec->newitemoff); + if (PageAddItem(newlpage, (Item) nposting, + MAXALIGN(IndexTupleSize(nposting)), leftoff, + false, false) == InvalidOffsetNumber) + elog(ERROR, "failed to add new posting list item to left page after split"); + leftoff = OffsetNumberNext(leftoff); + continue; /* don't insert oposting */ + } + /* add the new item if it was inserted on left page */ - if (onleft && off == xlrec->newitemoff) + else if (onleft && off == xlrec->newitemoff) { if (PageAddItem(newlpage, (Item) newitem, newitemsz, leftoff, false, false) == InvalidOffsetNumber) @@ -384,6 +462,98 @@ btree_xlog_split(bool onleft, XLogReaderState *record) } static void +btree_xlog_dedup(XLogReaderState *record) +{ + XLogRecPtr lsn = record->EndRecPtr; + xl_btree_dedup *xlrec = (xl_btree_dedup *) XLogRecGetData(record); + Buffer buf; + + if (XLogReadBufferForRedo(record, 0, &buf) == BLK_NEEDS_REDO) + { + char *ptr = XLogRecGetBlockData(record, 0, NULL); + Page page = (Page) BufferGetPage(buf); + BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page); + OffsetNumber offnum, + minoff, + maxoff; + BTDedupState state; + BTDedupInterval *intervals; + Page newpage; + + state = (BTDedupState) palloc(sizeof(BTDedupStateData)); + state->deduplicate = true; /* unused */ + /* Conservatively use larger maxpostingsize than primary */ + state->maxpostingsize = BTMaxItemSize(page); + state->base = NULL; + state->baseoff = InvalidOffsetNumber; + state->basetupsize = 0; + state->htids = palloc(state->maxpostingsize); + state->nhtids = 0; + state->nitems = 0; + state->phystupsize = 0; + state->nintervals = 0; + + minoff = P_FIRSTDATAKEY(opaque); + maxoff = PageGetMaxOffsetNumber(page); + newpage = PageGetTempPageCopySpecial(page); + + if (!P_RIGHTMOST(opaque)) + { + ItemId itemid = PageGetItemId(page, P_HIKEY); + Size itemsz = ItemIdGetLength(itemid); + IndexTuple item = (IndexTuple) PageGetItem(page, itemid); + + if (PageAddItem(newpage, (Item) item, itemsz, P_HIKEY, + false, false) == InvalidOffsetNumber) + elog(ERROR, "deduplication failed to add highkey"); + } + + intervals = (BTDedupInterval *) ptr; + for (offnum = minoff; + offnum <= maxoff; + offnum = OffsetNumberNext(offnum)) + { + ItemId itemid = PageGetItemId(page, offnum); + IndexTuple itup = (IndexTuple) PageGetItem(page, itemid); + + if (offnum == minoff) + _bt_dedup_start_pending(state, itup, offnum); + else if (state->nintervals < xlrec->nintervals && + state->baseoff == intervals[state->nintervals].baseoff && + state->nitems < intervals[state->nintervals].nitems) + { + if (!_bt_dedup_save_htid(state, itup)) + elog(ERROR, "deduplication failed to add heap tid to pending posting list"); + } + else + { + _bt_dedup_finish_pending(newpage, state); + _bt_dedup_start_pending(state, itup, offnum); + } + } + + _bt_dedup_finish_pending(newpage, state); + Assert(state->nintervals == xlrec->nintervals); + Assert(memcmp(state->intervals, intervals, + state->nintervals * sizeof(BTDedupInterval)) == 0); + + if (P_HAS_GARBAGE(opaque)) + { + BTPageOpaque nopaque = (BTPageOpaque) PageGetSpecialPointer(newpage); + + nopaque->btpo_flags &= ~BTP_HAS_GARBAGE; + } + + PageRestoreTempPage(newpage, page); + PageSetLSN(page, lsn); + MarkBufferDirty(buf); + } + + if (BufferIsValid(buf)) + UnlockReleaseBuffer(buf); +} + +static void btree_xlog_vacuum(XLogReaderState *record) { XLogRecPtr lsn = record->EndRecPtr; @@ -405,7 +575,56 @@ btree_xlog_vacuum(XLogReaderState *record) page = (Page) BufferGetPage(buffer); - PageIndexMultiDelete(page, (OffsetNumber *) ptr, xlrec->ndeleted); + if (xlrec->nupdated > 0) + { + OffsetNumber *updatedoffsets; + xl_btree_update *updates; + + updatedoffsets = (OffsetNumber *) + (ptr + xlrec->ndeleted * sizeof(OffsetNumber)); + updates = (xl_btree_update *) ((char *) updatedoffsets + + xlrec->nupdated * + sizeof(OffsetNumber)); + + for (int i = 0; i < xlrec->nupdated; i++) + { + BTVacuumPosting vacposting; + IndexTuple origtuple; + ItemId itemid; + Size itemsz; + + itemid = PageGetItemId(page, updatedoffsets[i]); + origtuple = (IndexTuple) PageGetItem(page, itemid); + + vacposting = palloc(offsetof(BTVacuumPostingData, deletetids) + + updates->ndeletedtids * sizeof(uint16)); + vacposting->updatedoffset = updatedoffsets[i]; + vacposting->itup = origtuple; + vacposting->ndeletedtids = updates->ndeletedtids; + memcpy(vacposting->deletetids, + (char *) updates + SizeOfBtreeUpdate, + updates->ndeletedtids * sizeof(uint16)); + + _bt_update_posting(vacposting); + + /* Overwrite updated version of tuple */ + itemsz = MAXALIGN(IndexTupleSize(vacposting->itup)); + if (!PageIndexTupleOverwrite(page, updatedoffsets[i], + (Item) vacposting->itup, itemsz)) + elog(PANIC, "failed to update partially dead item"); + + pfree(vacposting->itup); + pfree(vacposting); + + /* advance to next xl_btree_update from array */ + updates = (xl_btree_update *) + ((char *) updates + SizeOfBtreeUpdate + + updates->ndeletedtids * sizeof(uint16)); + } + } + + if (xlrec->ndeleted > 0) + PageIndexMultiDelete(page, (OffsetNumber *) ptr, xlrec->ndeleted); /* * Mark the page as not containing any LP_DEAD items --- see comments @@ -724,17 +943,19 @@ void btree_redo(XLogReaderState *record) { uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + MemoryContext oldCtx; + oldCtx = MemoryContextSwitchTo(opCtx); switch (info) { case XLOG_BTREE_INSERT_LEAF: - btree_xlog_insert(true, false, record); + btree_xlog_insert(true, false, false, record); break; case XLOG_BTREE_INSERT_UPPER: - btree_xlog_insert(false, false, record); + btree_xlog_insert(false, false, false, record); break; case XLOG_BTREE_INSERT_META: - btree_xlog_insert(false, true, record); + btree_xlog_insert(false, true, false, record); break; case XLOG_BTREE_SPLIT_L: btree_xlog_split(true, record); @@ -742,6 +963,12 @@ btree_redo(XLogReaderState *record) case XLOG_BTREE_SPLIT_R: btree_xlog_split(false, record); break; + case XLOG_BTREE_INSERT_POST: + btree_xlog_insert(true, false, true, record); + break; + case XLOG_BTREE_DEDUP: + btree_xlog_dedup(record); + break; case XLOG_BTREE_VACUUM: btree_xlog_vacuum(record); break; @@ -767,6 +994,23 @@ btree_redo(XLogReaderState *record) default: elog(PANIC, "btree_redo: unknown op code %u", info); } + MemoryContextSwitchTo(oldCtx); + MemoryContextReset(opCtx); +} + +void +btree_xlog_startup(void) +{ + opCtx = AllocSetContextCreate(CurrentMemoryContext, + "Btree recovery temporary context", + ALLOCSET_DEFAULT_SIZES); +} + +void +btree_xlog_cleanup(void) +{ + MemoryContextDelete(opCtx); + opCtx = NULL; } /* |