aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/nbtree/nbtxlog.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/nbtree/nbtxlog.c')
-rw-r--r--src/backend/access/nbtree/nbtxlog.c323
1 files changed, 136 insertions, 187 deletions
diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c
index bc376fd4094..665e60b7bd2 100644
--- a/src/backend/access/nbtree/nbtxlog.c
+++ b/src/backend/access/nbtree/nbtxlog.c
@@ -21,61 +21,6 @@
#include "miscadmin.h"
/*
- * We must keep track of expected insertions due to page splits, and apply
- * them manually if they are not seen in the WAL log during replay. This
- * makes it safe for page insertion to be a multiple-WAL-action process.
- *
- * The data structure is a simple linked list --- this should be good enough,
- * since we don't expect a page split to remain incomplete for long. In any
- * case we need to respect the order of operations.
- */
-typedef struct bt_incomplete_split
-{
- RelFileNode node; /* the index */
- bool is_root; /* we split the root */
- BlockNumber leftblk; /* left half of split */
- BlockNumber rightblk; /* right half of split */
-} bt_incomplete_split;
-
-static List *incomplete_splits;
-
-
-static void
-log_incomplete_split(RelFileNode node, BlockNumber leftblk,
- BlockNumber rightblk, bool is_root)
-{
- bt_incomplete_split *action = palloc(sizeof(bt_incomplete_split));
-
- action->node = node;
- action->is_root = is_root;
- action->leftblk = leftblk;
- action->rightblk = rightblk;
- incomplete_splits = lappend(incomplete_splits, action);
-}
-
-static void
-forget_matching_split(RelFileNode node, BlockNumber downlink, bool is_root)
-{
- ListCell *l;
-
- foreach(l, incomplete_splits)
- {
- bt_incomplete_split *action = (bt_incomplete_split *) lfirst(l);
-
- if (RelFileNodeEquals(node, action->node) &&
- downlink == action->rightblk)
- {
- if (is_root != action->is_root)
- elog(LOG, "forget_matching_split: fishy is_root data (expected %d, got %d)",
- action->is_root, is_root);
- incomplete_splits = list_delete_ptr(incomplete_splits, action);
- pfree(action);
- break; /* need not look further */
- }
- }
-}
-
-/*
* _bt_restore_page -- re-enter all the index tuples on a page
*
* The page is freshly init'd, and *from (length len) is a copy of what
@@ -149,23 +94,60 @@ _bt_restore_meta(RelFileNode rnode, XLogRecPtr lsn,
UnlockReleaseBuffer(metabuf);
}
+/*
+ * _bt_clear_incomplete_split -- clear INCOMPLETE_SPLIT flag on a page
+ *
+ * This is a common subroutine of the redo functions of all the WAL record
+ * types that can insert a downlink: insert, split, and newroot.
+ */
+static void
+_bt_clear_incomplete_split(XLogRecPtr lsn, XLogRecord *record,
+ RelFileNode rnode, BlockNumber cblock)
+{
+ Buffer buf;
+
+ buf = XLogReadBuffer(rnode, cblock, false);
+ if (BufferIsValid(buf))
+ {
+ Page page = (Page) BufferGetPage(buf);
+
+ if (lsn > PageGetLSN(page))
+ {
+ BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);
+ Assert((pageop->btpo_flags & BTP_INCOMPLETE_SPLIT) != 0);
+ pageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
+
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(buf);
+ }
+ UnlockReleaseBuffer(buf);
+ }
+}
+
static void
btree_xlog_insert(bool isleaf, bool ismeta,
XLogRecPtr lsn, XLogRecord *record)
{
xl_btree_insert *xlrec = (xl_btree_insert *) XLogRecGetData(record);
Buffer buffer;
+ Buffer cbuffer = InvalidBuffer;
Page page;
char *datapos;
int datalen;
xl_btree_metadata md;
- BlockNumber downlink = 0;
+ BlockNumber cblkno = 0;
+ int main_blk_index;
datapos = (char *) xlrec + SizeOfBtreeInsert;
datalen = record->xl_len - SizeOfBtreeInsert;
- if (!isleaf)
+ /*
+ * if this insert finishes a split at lower level, extract the block
+ * number of the (left) child.
+ */
+ if (!isleaf && (record->xl_info & XLR_BKP_BLOCK(0)) == 0)
{
- memcpy(&downlink, datapos, sizeof(BlockNumber));
+ memcpy(&cblkno, datapos, sizeof(BlockNumber));
+ Assert(cblkno != 0);
datapos += sizeof(BlockNumber);
datalen -= sizeof(BlockNumber);
}
@@ -176,8 +158,19 @@ btree_xlog_insert(bool isleaf, bool ismeta,
datalen -= sizeof(xl_btree_metadata);
}
- if (record->xl_info & XLR_BKP_BLOCK(0))
- (void) RestoreBackupBlock(lsn, record, 0, false, false);
+ if (!isleaf)
+ {
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ (void) RestoreBackupBlock(lsn, record, 0, false, false);
+ else
+ _bt_clear_incomplete_split(lsn, record, xlrec->target.node, cblkno);
+ main_blk_index = 1;
+ }
+ else
+ main_blk_index = 0;
+
+ if (record->xl_info & XLR_BKP_BLOCK(main_blk_index))
+ (void) RestoreBackupBlock(lsn, record, main_blk_index, false, false);
else
{
buffer = XLogReadBuffer(xlrec->target.node,
@@ -187,11 +180,7 @@ btree_xlog_insert(bool isleaf, bool ismeta,
{
page = (Page) BufferGetPage(buffer);
- if (lsn <= PageGetLSN(page))
- {
- UnlockReleaseBuffer(buffer);
- }
- else
+ if (lsn > PageGetLSN(page))
{
if (PageAddItem(page, (Item) datapos, datalen,
ItemPointerGetOffsetNumber(&(xlrec->target.tid)),
@@ -200,11 +189,14 @@ btree_xlog_insert(bool isleaf, bool ismeta,
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
- UnlockReleaseBuffer(buffer);
}
+ UnlockReleaseBuffer(buffer);
}
}
+ if (BufferIsValid(cbuffer))
+ UnlockReleaseBuffer(cbuffer);
+
/*
* Note: in normal operation, we'd update the metapage while still holding
* lock on the page we inserted into. But during replay it's not
@@ -216,10 +208,6 @@ btree_xlog_insert(bool isleaf, bool ismeta,
_bt_restore_meta(xlrec->target.node, lsn,
md.root, md.level,
md.fastroot, md.fastlevel);
-
- /* Forget any split this insertion completes */
- if (!isleaf)
- forget_matching_split(xlrec->target.node, downlink, false);
}
static void
@@ -227,6 +215,8 @@ btree_xlog_split(bool onleft, bool isroot,
XLogRecPtr lsn, XLogRecord *record)
{
xl_btree_split *xlrec = (xl_btree_split *) XLogRecGetData(record);
+ bool isleaf = (xlrec->level == 0);
+ Buffer lbuf;
Buffer rbuf;
Page rpage;
BTPageOpaque ropaque;
@@ -237,42 +227,18 @@ btree_xlog_split(bool onleft, bool isroot,
Size newitemsz = 0;
Item left_hikey = NULL;
Size left_hikeysz = 0;
+ BlockNumber cblkno = InvalidBlockNumber;
datapos = (char *) xlrec + SizeOfBtreeSplit;
datalen = record->xl_len - SizeOfBtreeSplit;
- /* Forget any split this insertion completes */
- if (xlrec->level > 0)
- {
- /* we assume SizeOfBtreeSplit is at least 16-bit aligned */
- BlockNumber downlink = BlockIdGetBlockNumber((BlockId) datapos);
-
- datapos += sizeof(BlockIdData);
- datalen -= sizeof(BlockIdData);
-
- forget_matching_split(xlrec->node, downlink, false);
-
- /* Extract left hikey and its size (still assuming 16-bit alignment) */
- if (!(record->xl_info & XLR_BKP_BLOCK(0)))
- {
- /* We assume 16-bit alignment is enough for IndexTupleSize */
- left_hikey = (Item) datapos;
- left_hikeysz = MAXALIGN(IndexTupleSize(left_hikey));
-
- datapos += left_hikeysz;
- datalen -= left_hikeysz;
- }
- }
-
- /* Extract newitem and newitemoff, if present */
+ /* Extract newitemoff and newitem, if present */
if (onleft)
{
- /* Extract the offset (still assuming 16-bit alignment) */
memcpy(&newitemoff, datapos, sizeof(OffsetNumber));
datapos += sizeof(OffsetNumber);
datalen -= sizeof(OffsetNumber);
}
-
if (onleft && !(record->xl_info & XLR_BKP_BLOCK(0)))
{
/*
@@ -286,6 +252,37 @@ btree_xlog_split(bool onleft, bool isroot,
datalen -= newitemsz;
}
+ /* Extract left hikey and its size (still assuming 16-bit alignment) */
+ if (!isleaf && !(record->xl_info & XLR_BKP_BLOCK(0)))
+ {
+ left_hikey = (Item) datapos;
+ left_hikeysz = MAXALIGN(IndexTupleSize(left_hikey));
+ datapos += left_hikeysz;
+ datalen -= left_hikeysz;
+ }
+ /*
+ * If this insertion finishes an incomplete split, get the block number
+ * of the child.
+ */
+ if (!isleaf && !(record->xl_info & XLR_BKP_BLOCK(1)))
+ {
+ memcpy(&cblkno, datapos, sizeof(BlockNumber));
+ datapos += sizeof(BlockNumber);
+ datalen -= sizeof(BlockNumber);
+ }
+
+ /*
+ * Clear the incomplete split flag on the left sibling of the child page
+ * this is a downlink for.
+ */
+ if (!isleaf)
+ {
+ if (record->xl_info & XLR_BKP_BLOCK(2))
+ (void) RestoreBackupBlock(lsn, record, 2, false, false);
+ else
+ _bt_clear_incomplete_split(lsn, record, xlrec->node, cblkno);
+ }
+
/* Reconstruct right (new) sibling page from scratch */
rbuf = XLogReadBuffer(xlrec->node, xlrec->rightsib, true);
Assert(BufferIsValid(rbuf));
@@ -297,7 +294,7 @@ btree_xlog_split(bool onleft, bool isroot,
ropaque->btpo_prev = xlrec->leftsib;
ropaque->btpo_next = xlrec->rnext;
ropaque->btpo.level = xlrec->level;
- ropaque->btpo_flags = (xlrec->level == 0) ? BTP_LEAF : 0;
+ ropaque->btpo_flags = isleaf ? BTP_LEAF : 0;
ropaque->btpo_cycleid = 0;
_bt_restore_page(rpage, datapos, datalen);
@@ -306,7 +303,7 @@ btree_xlog_split(bool onleft, bool isroot,
* On leaf level, the high key of the left page is equal to the first key
* on the right page.
*/
- if (xlrec->level == 0)
+ if (isleaf)
{
ItemId hiItemId = PageGetItemId(rpage, P_FIRSTDATAKEY(ropaque));
@@ -321,10 +318,10 @@ btree_xlog_split(bool onleft, bool isroot,
/* Now reconstruct left (original) sibling page */
if (record->xl_info & XLR_BKP_BLOCK(0))
- (void) RestoreBackupBlock(lsn, record, 0, false, false);
+ lbuf = RestoreBackupBlock(lsn, record, 0, false, true);
else
{
- Buffer lbuf = XLogReadBuffer(xlrec->node, xlrec->leftsib, false);
+ lbuf = XLogReadBuffer(xlrec->node, xlrec->leftsib, false);
if (BufferIsValid(lbuf))
{
@@ -381,19 +378,21 @@ btree_xlog_split(bool onleft, bool isroot,
elog(PANIC, "failed to add high key to left page after split");
/* Fix opaque fields */
- lopaque->btpo_flags = (xlrec->level == 0) ? BTP_LEAF : 0;
+ lopaque->btpo_flags = BTP_INCOMPLETE_SPLIT;
+ if (isleaf)
+ lopaque->btpo_flags |= BTP_LEAF;
lopaque->btpo_next = xlrec->rightsib;
lopaque->btpo_cycleid = 0;
PageSetLSN(lpage, lsn);
MarkBufferDirty(lbuf);
}
-
- UnlockReleaseBuffer(lbuf);
}
}
- /* We no longer need the right buffer */
+ /* We no longer need the buffers */
+ if (BufferIsValid(lbuf))
+ UnlockReleaseBuffer(lbuf);
UnlockReleaseBuffer(rbuf);
/*
@@ -404,32 +403,39 @@ btree_xlog_split(bool onleft, bool isroot,
* replay, because no other index update can be in progress, and readers
* will cope properly when following an obsolete left-link.
*/
- if (record->xl_info & XLR_BKP_BLOCK(1))
- (void) RestoreBackupBlock(lsn, record, 1, false, false);
- else if (xlrec->rnext != P_NONE)
+ if (xlrec->rnext != P_NONE)
{
- Buffer buffer = XLogReadBuffer(xlrec->node, xlrec->rnext, false);
+ /*
+ * the backup block containing right sibling is 2 or 3, depending
+ * whether this was a leaf or internal page.
+ */
+ int rnext_index = isleaf ? 2 : 3;
- if (BufferIsValid(buffer))
+ if (record->xl_info & XLR_BKP_BLOCK(rnext_index))
+ (void) RestoreBackupBlock(lsn, record, rnext_index, false, false);
+ else
{
- Page page = (Page) BufferGetPage(buffer);
+ Buffer buffer;
- if (lsn > PageGetLSN(page))
+ buffer = XLogReadBuffer(xlrec->node, xlrec->rnext, false);
+
+ if (BufferIsValid(buffer))
{
- BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);
+ Page page = (Page) BufferGetPage(buffer);
- pageop->btpo_prev = xlrec->rightsib;
+ if (lsn > PageGetLSN(page))
+ {
+ BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);
- PageSetLSN(page, lsn);
- MarkBufferDirty(buffer);
+ pageop->btpo_prev = xlrec->rightsib;
+
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(buffer);
+ }
+ UnlockReleaseBuffer(buffer);
}
- UnlockReleaseBuffer(buffer);
}
}
-
- /* The job ain't done till the parent link is inserted... */
- log_incomplete_split(xlrec->node,
- xlrec->leftsib, xlrec->rightsib, isroot);
}
static void
@@ -1003,10 +1009,6 @@ btree_xlog_newroot(XLogRecPtr lsn, XLogRecord *record)
Buffer buffer;
Page page;
BTPageOpaque pageop;
- BlockNumber downlink = 0;
-
- /* Backup blocks are not used in newroot records */
- Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
buffer = XLogReadBuffer(xlrec->node, xlrec->rootblk, true);
Assert(BufferIsValid(buffer));
@@ -1025,14 +1027,21 @@ btree_xlog_newroot(XLogRecPtr lsn, XLogRecord *record)
if (record->xl_len > SizeOfBtreeNewroot)
{
IndexTuple itup;
+ BlockNumber cblkno;
_bt_restore_page(page,
(char *) xlrec + SizeOfBtreeNewroot,
record->xl_len - SizeOfBtreeNewroot);
- /* extract downlink to the right-hand split page */
- itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, P_FIRSTKEY));
- downlink = ItemPointerGetBlockNumber(&(itup->t_tid));
+ /* extract block number of the left-hand split page */
+ itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, P_HIKEY));
+ cblkno = ItemPointerGetBlockNumber(&(itup->t_tid));
Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY);
+
+ /* Clear the incomplete-split flag in left child */
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ (void) RestoreBackupBlock(lsn, record, 0, false, false);
+ else
+ _bt_clear_incomplete_split(lsn, record, xlrec->node, cblkno);
}
PageSetLSN(page, lsn);
@@ -1042,10 +1051,6 @@ btree_xlog_newroot(XLogRecPtr lsn, XLogRecord *record)
_bt_restore_meta(xlrec->node, lsn,
xlrec->rootblk, xlrec->level,
xlrec->rootblk, xlrec->level);
-
- /* Check to see if this satisfies any incomplete insertions */
- if (record->xl_len > SizeOfBtreeNewroot)
- forget_matching_split(xlrec->node, downlink, true);
}
static void
@@ -1125,59 +1130,3 @@ btree_redo(XLogRecPtr lsn, XLogRecord *record)
elog(PANIC, "btree_redo: unknown op code %u", info);
}
}
-
-void
-btree_xlog_startup(void)
-{
- incomplete_splits = NIL;
-}
-
-void
-btree_xlog_cleanup(void)
-{
- ListCell *l;
-
- foreach(l, incomplete_splits)
- {
- /* finish an incomplete split */
- bt_incomplete_split *action = (bt_incomplete_split *) lfirst(l);
- Buffer lbuf,
- rbuf;
- Page lpage,
- rpage;
- BTPageOpaque lpageop,
- rpageop;
- bool is_only;
- Relation reln;
-
- lbuf = XLogReadBuffer(action->node, action->leftblk, false);
- /* failure is impossible because we wrote this page earlier */
- if (!BufferIsValid(lbuf))
- elog(PANIC, "btree_xlog_cleanup: left block unfound");
- lpage = (Page) BufferGetPage(lbuf);
- lpageop = (BTPageOpaque) PageGetSpecialPointer(lpage);
- rbuf = XLogReadBuffer(action->node, action->rightblk, false);
- /* failure is impossible because we wrote this page earlier */
- if (!BufferIsValid(rbuf))
- elog(PANIC, "btree_xlog_cleanup: right block unfound");
- rpage = (Page) BufferGetPage(rbuf);
- rpageop = (BTPageOpaque) PageGetSpecialPointer(rpage);
-
- /* if the pages are all of their level, it's a only-page split */
- is_only = P_LEFTMOST(lpageop) && P_RIGHTMOST(rpageop);
-
- reln = CreateFakeRelcacheEntry(action->node);
- _bt_insert_parent(reln, lbuf, rbuf, NULL,
- action->is_root, is_only);
- FreeFakeRelcacheEntry(reln);
- }
- incomplete_splits = NIL;
-}
-
-bool
-btree_safe_restartpoint(void)
-{
- if (incomplete_splits)
- return false;
- return true;
-}