about | summary | refs | log | tree | commit | diff
path: root/src/backend/access/nbtree/nbtxlog.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/nbtree/nbtxlog.c')
-rw-r--r-- src/backend/access/nbtree/nbtxlog.c 268
1 files changed, 256 insertions, 12 deletions
diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c
index 2e5202c2d6e..99d0914e724 100644
--- a/src/backend/access/nbtree/nbtxlog.c
+++ b/src/backend/access/nbtree/nbtxlog.c
@@ -22,6 +22,9 @@
#include "access/xlogutils.h"
#include "miscadmin.h"
#include "storage/procarray.h"
+#include "utils/memutils.h"
+
+static MemoryContext opCtx; /* working memory for operations */
/*
* _bt_restore_page -- re-enter all the index tuples on a page
@@ -111,6 +114,7 @@ _bt_restore_meta(XLogReaderState *record, uint8 block_id)
Assert(md->btm_version >= BTREE_NOVAC_VERSION);
md->btm_oldest_btpo_xact = xlrec->oldest_btpo_xact;
md->btm_last_cleanup_num_heap_tuples = xlrec->last_cleanup_num_heap_tuples;
+ md->btm_allequalimage = xlrec->allequalimage;
pageop = (BTPageOpaque) PageGetSpecialPointer(metapg);
pageop->btpo_flags = BTP_META;
@@ -156,7 +160,8 @@ _bt_clear_incomplete_split(XLogReaderState *record, uint8 block_id)
}
static void
-btree_xlog_insert(bool isleaf, bool ismeta, XLogReaderState *record)
+btree_xlog_insert(bool isleaf, bool ismeta, bool posting,
+ XLogReaderState *record)
{
XLogRecPtr lsn = record->EndRecPtr;
xl_btree_insert *xlrec = (xl_btree_insert *) XLogRecGetData(record);
@@ -181,9 +186,52 @@ btree_xlog_insert(bool isleaf, bool ismeta, XLogReaderState *record)
page = BufferGetPage(buffer);
- if (PageAddItem(page, (Item) datapos, datalen, xlrec->offnum,
- false, false) == InvalidOffsetNumber)
- elog(PANIC, "btree_xlog_insert: failed to add item");
+ if (!posting)
+ {
+ /* Simple retail insertion */
+ if (PageAddItem(page, (Item) datapos, datalen, xlrec->offnum,
+ false, false) == InvalidOffsetNumber)
+ elog(PANIC, "failed to add new item");
+ }
+ else
+ {
+ ItemId itemid;
+ IndexTuple oposting,
+ newitem,
+ nposting;
+ uint16 postingoff;
+
+ /*
+ * A posting list split occurred during leaf page insertion. WAL
+ * record data will start with an offset number representing the
+ * point in an existing posting list that a split occurs at.
+ *
+ * Use _bt_swap_posting() to repeat posting list split steps from
+ * primary. Note that newitem from WAL record is 'orignewitem',
+ * not the final version of newitem that is actually inserted on
+ * page.
+ */
+ postingoff = *((uint16 *) datapos);
+ datapos += sizeof(uint16);
+ datalen -= sizeof(uint16);
+
+ itemid = PageGetItemId(page, OffsetNumberPrev(xlrec->offnum));
+ oposting = (IndexTuple) PageGetItem(page, itemid);
+
+ /* Use mutable, aligned newitem copy in _bt_swap_posting() */
+ Assert(isleaf && postingoff > 0);
+ newitem = CopyIndexTuple((IndexTuple) datapos);
+ nposting = _bt_swap_posting(newitem, oposting, postingoff);
+
+ /* Replace existing posting list with post-split version */
+ memcpy(oposting, nposting, MAXALIGN(IndexTupleSize(nposting)));
+
+ /* Insert "final" new item (not orignewitem from WAL stream) */
+ Assert(IndexTupleSize(newitem) == datalen);
+ if (PageAddItem(page, (Item) newitem, datalen, xlrec->offnum,
+ false, false) == InvalidOffsetNumber)
+ elog(PANIC, "failed to add posting split new item");
+ }
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
@@ -265,20 +313,38 @@ btree_xlog_split(bool onleft, XLogReaderState *record)
BTPageOpaque lopaque = (BTPageOpaque) PageGetSpecialPointer(lpage);
OffsetNumber off;
IndexTuple newitem = NULL,
- left_hikey = NULL;
+ left_hikey = NULL,
+ nposting = NULL;
Size newitemsz = 0,
left_hikeysz = 0;
Page newlpage;
- OffsetNumber leftoff;
+ OffsetNumber leftoff,
+ replacepostingoff = InvalidOffsetNumber;
datapos = XLogRecGetBlockData(record, 0, &datalen);
- if (onleft)
+ if (onleft || xlrec->postingoff != 0)
{
newitem = (IndexTuple) datapos;
newitemsz = MAXALIGN(IndexTupleSize(newitem));
datapos += newitemsz;
datalen -= newitemsz;
+
+ if (xlrec->postingoff != 0)
+ {
+ ItemId itemid;
+ IndexTuple oposting;
+
+ /* Posting list must be at offset number before new item's */
+ replacepostingoff = OffsetNumberPrev(xlrec->newitemoff);
+
+ /* Use mutable, aligned newitem copy in _bt_swap_posting() */
+ newitem = CopyIndexTuple(newitem);
+ itemid = PageGetItemId(lpage, replacepostingoff);
+ oposting = (IndexTuple) PageGetItem(lpage, itemid);
+ nposting = _bt_swap_posting(newitem, oposting,
+ xlrec->postingoff);
+ }
}
/*
@@ -308,8 +374,20 @@ btree_xlog_split(bool onleft, XLogReaderState *record)
Size itemsz;
IndexTuple item;
+ /* Add replacement posting list when required */
+ if (off == replacepostingoff)
+ {
+ Assert(onleft || xlrec->firstright == xlrec->newitemoff);
+ if (PageAddItem(newlpage, (Item) nposting,
+ MAXALIGN(IndexTupleSize(nposting)), leftoff,
+ false, false) == InvalidOffsetNumber)
+ elog(ERROR, "failed to add new posting list item to left page after split");
+ leftoff = OffsetNumberNext(leftoff);
+ continue; /* don't insert oposting */
+ }
+
/* add the new item if it was inserted on left page */
- if (onleft && off == xlrec->newitemoff)
+ else if (onleft && off == xlrec->newitemoff)
{
if (PageAddItem(newlpage, (Item) newitem, newitemsz, leftoff,
false, false) == InvalidOffsetNumber)
@@ -384,6 +462,98 @@ btree_xlog_split(bool onleft, XLogReaderState *record)
}
static void
+btree_xlog_dedup(XLogReaderState *record) /* redo routine for XLOG_BTREE_DEDUP records; called from btree_redo() */
+{
+ XLogRecPtr lsn = record->EndRecPtr; /* LSN stamped on the page once it has been rebuilt */
+ xl_btree_dedup *xlrec = (xl_btree_dedup *) XLogRecGetData(record); /* main record data; supplies nintervals */
+ Buffer buf;
+
+ if (XLogReadBufferForRedo(record, 0, &buf) == BLK_NEEDS_REDO) /* rebuild the page only when block 0 needs redo */
+ {
+ char *ptr = XLogRecGetBlockData(record, 0, NULL); /* block 0 payload; read below as a BTDedupInterval array */
+ Page page = (Page) BufferGetPage(buf);
+ BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+ OffsetNumber offnum,
+ minoff,
+ maxoff;
+ BTDedupState state;
+ BTDedupInterval *intervals;
+ Page newpage;
+
+ state = (BTDedupState) palloc(sizeof(BTDedupStateData)); /* never pfree'd here; btree_redo() resets opCtx after each record */
+ state->deduplicate = true; /* unused */
+ /* Conservatively use larger maxpostingsize than primary */
+ state->maxpostingsize = BTMaxItemSize(page);
+ state->base = NULL;
+ state->baseoff = InvalidOffsetNumber;
+ state->basetupsize = 0;
+ state->htids = palloc(state->maxpostingsize); /* buffer sized by maxpostingsize bytes */
+ state->nhtids = 0;
+ state->nitems = 0;
+ state->phystupsize = 0;
+ state->nintervals = 0; /* compared against xlrec->nintervals in the loop and in the Assert below */
+
+ minoff = P_FIRSTDATAKEY(opaque);
+ maxoff = PageGetMaxOffsetNumber(page);
+ newpage = PageGetTempPageCopySpecial(page); /* items are re-added to this temp page, which replaces 'page' below */
+
+ if (!P_RIGHTMOST(opaque)) /* non-rightmost page: carry the P_HIKEY item over before any data items */
+ {
+ ItemId itemid = PageGetItemId(page, P_HIKEY);
+ Size itemsz = ItemIdGetLength(itemid);
+ IndexTuple item = (IndexTuple) PageGetItem(page, itemid);
+
+ if (PageAddItem(newpage, (Item) item, itemsz, P_HIKEY,
+ false, false) == InvalidOffsetNumber)
+ elog(ERROR, "deduplication failed to add highkey");
+ }
+
+ intervals = (BTDedupInterval *) ptr; /* logged intervals; indexed by state->nintervals in the loop */
+ for (offnum = minoff; /* walk every data item of the old page in offset order */
+ offnum <= maxoff;
+ offnum = OffsetNumberNext(offnum))
+ {
+ ItemId itemid = PageGetItemId(page, offnum);
+ IndexTuple itup = (IndexTuple) PageGetItem(page, itemid);
+
+ if (offnum == minoff) /* first data item always opens the first pending group */
+ _bt_dedup_start_pending(state, itup, offnum);
+ else if (state->nintervals < xlrec->nintervals &&
+ state->baseoff == intervals[state->nintervals].baseoff &&
+ state->nitems < intervals[state->nintervals].nitems) /* pending group matches the next logged interval and is not yet full */
+ {
+ if (!_bt_dedup_save_htid(state, itup)) /* a false return is treated as a replay failure */
+ elog(ERROR, "deduplication failed to add heap tid to pending posting list");
+ }
+ else /* item is outside the current logged interval: finish the pending group, start a new one */
+ {
+ _bt_dedup_finish_pending(newpage, state);
+ _bt_dedup_start_pending(state, itup, offnum);
+ }
+ }
+
+ _bt_dedup_finish_pending(newpage, state); /* finish the group still pending after the last item */
+ Assert(state->nintervals == xlrec->nintervals); /* replay must yield exactly the logged number of intervals */
+ Assert(memcmp(state->intervals, intervals,
+ state->nintervals * sizeof(BTDedupInterval)) == 0);
+
+ if (P_HAS_GARBAGE(opaque)) /* old page had BTP_HAS_GARBAGE set: clear it on the rebuilt page */
+ {
+ BTPageOpaque nopaque = (BTPageOpaque) PageGetSpecialPointer(newpage);
+
+ nopaque->btpo_flags &= ~BTP_HAS_GARBAGE;
+ }
+
+ PageRestoreTempPage(newpage, page); /* install the rebuilt contents into 'page' */
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(buf);
+ }
+
+ if (BufferIsValid(buf)) /* checked outside the redo branch, so a valid buffer is released either way */
+ UnlockReleaseBuffer(buf);
+}
+
+static void
btree_xlog_vacuum(XLogReaderState *record)
{
XLogRecPtr lsn = record->EndRecPtr;
@@ -405,7 +575,56 @@ btree_xlog_vacuum(XLogReaderState *record)
page = (Page) BufferGetPage(buffer);
- PageIndexMultiDelete(page, (OffsetNumber *) ptr, xlrec->ndeleted);
+ if (xlrec->nupdated > 0)
+ {
+ OffsetNumber *updatedoffsets;
+ xl_btree_update *updates;
+
+ updatedoffsets = (OffsetNumber *)
+ (ptr + xlrec->ndeleted * sizeof(OffsetNumber));
+ updates = (xl_btree_update *) ((char *) updatedoffsets +
+ xlrec->nupdated *
+ sizeof(OffsetNumber));
+
+ for (int i = 0; i < xlrec->nupdated; i++)
+ {
+ BTVacuumPosting vacposting;
+ IndexTuple origtuple;
+ ItemId itemid;
+ Size itemsz;
+
+ itemid = PageGetItemId(page, updatedoffsets[i]);
+ origtuple = (IndexTuple) PageGetItem(page, itemid);
+
+ vacposting = palloc(offsetof(BTVacuumPostingData, deletetids) +
+ updates->ndeletedtids * sizeof(uint16));
+ vacposting->updatedoffset = updatedoffsets[i];
+ vacposting->itup = origtuple;
+ vacposting->ndeletedtids = updates->ndeletedtids;
+ memcpy(vacposting->deletetids,
+ (char *) updates + SizeOfBtreeUpdate,
+ updates->ndeletedtids * sizeof(uint16));
+
+ _bt_update_posting(vacposting);
+
+ /* Overwrite updated version of tuple */
+ itemsz = MAXALIGN(IndexTupleSize(vacposting->itup));
+ if (!PageIndexTupleOverwrite(page, updatedoffsets[i],
+ (Item) vacposting->itup, itemsz))
+ elog(PANIC, "failed to update partially dead item");
+
+ pfree(vacposting->itup);
+ pfree(vacposting);
+
+ /* advance to next xl_btree_update from array */
+ updates = (xl_btree_update *)
+ ((char *) updates + SizeOfBtreeUpdate +
+ updates->ndeletedtids * sizeof(uint16));
+ }
+ }
+
+ if (xlrec->ndeleted > 0)
+ PageIndexMultiDelete(page, (OffsetNumber *) ptr, xlrec->ndeleted);
/*
* Mark the page as not containing any LP_DEAD items --- see comments
@@ -724,17 +943,19 @@ void
btree_redo(XLogReaderState *record)
{
uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+ MemoryContext oldCtx;
+ oldCtx = MemoryContextSwitchTo(opCtx);
switch (info)
{
case XLOG_BTREE_INSERT_LEAF:
- btree_xlog_insert(true, false, record);
+ btree_xlog_insert(true, false, false, record);
break;
case XLOG_BTREE_INSERT_UPPER:
- btree_xlog_insert(false, false, record);
+ btree_xlog_insert(false, false, false, record);
break;
case XLOG_BTREE_INSERT_META:
- btree_xlog_insert(false, true, record);
+ btree_xlog_insert(false, true, false, record);
break;
case XLOG_BTREE_SPLIT_L:
btree_xlog_split(true, record);
@@ -742,6 +963,12 @@ btree_redo(XLogReaderState *record)
case XLOG_BTREE_SPLIT_R:
btree_xlog_split(false, record);
break;
+ case XLOG_BTREE_INSERT_POST:
+ btree_xlog_insert(true, false, true, record);
+ break;
+ case XLOG_BTREE_DEDUP:
+ btree_xlog_dedup(record);
+ break;
case XLOG_BTREE_VACUUM:
btree_xlog_vacuum(record);
break;
@@ -767,6 +994,23 @@ btree_redo(XLogReaderState *record)
default:
elog(PANIC, "btree_redo: unknown op code %u", info);
}
+ MemoryContextSwitchTo(oldCtx);
+ MemoryContextReset(opCtx);
+}
+
+void
+btree_xlog_startup(void) /* create opCtx, the working context btree_redo() switches into for each record */
+{
+ opCtx = AllocSetContextCreate(CurrentMemoryContext, /* parent is whichever context is current at call time */
+ "Btree recovery temporary context",
+ ALLOCSET_DEFAULT_SIZES);
+}
+
+void
+btree_xlog_cleanup(void) /* counterpart of btree_xlog_startup(): delete the opCtx working context */
+{
+ MemoryContextDelete(opCtx);
+ opCtx = NULL; /* do not leave a pointer to the deleted context behind */
}
/*