aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/access/nbtree/nbtinsert.c271
-rw-r--r--src/backend/access/nbtree/nbtpage.c57
-rw-r--r--src/backend/access/nbtree/nbtree.c15
3 files changed, 282 insertions, 61 deletions
diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c
index dc17ceab111..8f23e16992a 100644
--- a/src/backend/access/nbtree/nbtinsert.c
+++ b/src/backend/access/nbtree/nbtinsert.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.76 2001/01/24 19:42:48 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.77 2001/01/26 01:24:31 vadim Exp $
*
*-------------------------------------------------------------------------
*/
@@ -34,7 +34,9 @@ typedef struct
int best_delta; /* best size delta so far */
} FindSplitData;
-void _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);
+Buffer _bt_fixroot(Relation rel, Buffer oldrootbuf, bool release);
+
+static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);
static TransactionId _bt_check_unique(Relation rel, BTItem btitem,
Relation heapRel, Buffer buf,
@@ -44,6 +46,8 @@ static InsertIndexResult _bt_insertonpg(Relation rel, Buffer buf,
int keysz, ScanKey scankey,
BTItem btitem,
OffsetNumber afteritem);
+static void _bt_insertuple(Relation rel, Buffer buf,
+ Size itemsz, BTItem btitem, OffsetNumber newitemoff);
static Buffer _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
OffsetNumber newitemoff, Size newitemsz,
BTItem newitem, bool newitemonleft,
@@ -456,9 +460,14 @@ _bt_insertonpg(Relation rel,
if (is_root)
{
+ Buffer rootbuf;
+
Assert(stack == (BTStack) NULL);
/* create a new root node and release the split buffers */
- _bt_newroot(rel, buf, rbuf);
+ rootbuf = _bt_newroot(rel, buf, rbuf);
+ _bt_wrtbuf(rel, rootbuf);
+ _bt_wrtbuf(rel, rbuf);
+ _bt_wrtbuf(rel, buf);
}
else
{
@@ -519,52 +528,11 @@ _bt_insertonpg(Relation rel,
}
else
{
- START_CRIT_SECTION();
- _bt_pgaddtup(rel, page, itemsz, btitem, newitemoff, "page");
itup_off = newitemoff;
itup_blkno = BufferGetBlockNumber(buf);
- /* XLOG stuff */
- {
- xl_btree_insert xlrec;
- uint8 flag = XLOG_BTREE_INSERT;
- XLogRecPtr recptr;
- XLogRecData rdata[2];
- BTItemData truncitem;
- xlrec.target.node = rel->rd_node;
- ItemPointerSet(&(xlrec.target.tid), BufferGetBlockNumber(buf), newitemoff);
- rdata[0].buffer = InvalidBuffer;
- rdata[0].data = (char*)&xlrec;
- rdata[0].len = SizeOfBtreeInsert;
- rdata[0].next = &(rdata[1]);
-
- /* Read comments in _bt_pgaddtup */
- if (!(P_ISLEAF(lpageop)) && newitemoff == P_FIRSTDATAKEY(lpageop))
- {
- truncitem = *btitem;
- truncitem.bti_itup.t_info = sizeof(BTItemData);
- rdata[1].data = (char*)&truncitem;
- rdata[1].len = sizeof(BTItemData);
- }
- else
- {
- rdata[1].data = (char*)btitem;
- rdata[1].len = IndexTupleDSize(btitem->bti_itup) +
- (sizeof(BTItemData) - sizeof(IndexTupleData));
- }
- rdata[1].buffer = buf;
- rdata[1].next = NULL;
+ _bt_insertuple(rel, buf, itemsz, btitem, newitemoff);
- if (P_ISLEAF(lpageop))
- flag |= XLOG_BTREE_LEAF;
-
- recptr = XLogInsert(RM_BTREE_ID, flag, rdata);
-
- PageSetLSN(page, recptr);
- PageSetSUI(page, ThisStartUpID);
- }
-
- END_CRIT_SECTION();
/* Write out the updated page and release pin/lock */
_bt_wrtbuf(rel, buf);
}
@@ -576,6 +544,57 @@ _bt_insertonpg(Relation rel,
return res;
}
+static void
+_bt_insertuple(Relation rel, Buffer buf,
+ Size itemsz, BTItem btitem, OffsetNumber newitemoff)
+{
+ Page page = BufferGetPage(buf);
+ BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);
+
+ START_CRIT_SECTION();
+ _bt_pgaddtup(rel, page, itemsz, btitem, newitemoff, "page");
+ /* XLOG stuff */
+ {
+ xl_btree_insert xlrec;
+ uint8 flag = XLOG_BTREE_INSERT;
+ XLogRecPtr recptr;
+ XLogRecData rdata[2];
+ BTItemData truncitem;
+ xlrec.target.node = rel->rd_node;
+ ItemPointerSet(&(xlrec.target.tid), BufferGetBlockNumber(buf), newitemoff);
+ rdata[0].buffer = InvalidBuffer;
+ rdata[0].data = (char*)&xlrec;
+ rdata[0].len = SizeOfBtreeInsert;
+ rdata[0].next = &(rdata[1]);
+
+ /* Read comments in _bt_pgaddtup */
+ if (!(P_ISLEAF(pageop)) && newitemoff == P_FIRSTDATAKEY(pageop))
+ {
+ truncitem = *btitem;
+ truncitem.bti_itup.t_info = sizeof(BTItemData);
+ rdata[1].data = (char*)&truncitem;
+ rdata[1].len = sizeof(BTItemData);
+ }
+ else
+ {
+ rdata[1].data = (char*)btitem;
+ rdata[1].len = IndexTupleDSize(btitem->bti_itup) +
+ (sizeof(BTItemData) - sizeof(IndexTupleData));
+ }
+ rdata[1].buffer = buf;
+ rdata[1].next = NULL;
+ if (P_ISLEAF(pageop))
+ flag |= XLOG_BTREE_LEAF;
+
+ recptr = XLogInsert(RM_BTREE_ID, flag, rdata);
+
+ PageSetLSN(page, recptr);
+ PageSetSUI(page, ThisStartUpID);
+ }
+
+ END_CRIT_SECTION();
+}
+
/*
* _bt_split() -- split a page in the btree.
*
@@ -1130,11 +1149,12 @@ _bt_getstackbuf(Relation rel, BTStack stack)
* graph.
*
* On entry, lbuf (the old root) and rbuf (its new peer) are write-
- * locked. On exit, a new root page exists with entries for the
- * two new children. The new root page is neither pinned nor locked, and
- * we have also written out lbuf and rbuf and dropped their pins/locks.
+ * locked. On exit, a new root page exists with entries for the
+ * two new children, metapage is updated and unlocked/unpinned.
+ * The new root buffer is returned to caller which has to unlock/unpin
+ * lbuf, rbuf & rootbuf.
*/
-void
+static Buffer
_bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
{
Buffer rootbuf;
@@ -1257,13 +1277,156 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
}
END_CRIT_SECTION();
- /* write and let go of the new root buffer */
- _bt_wrtbuf(rel, rootbuf);
+ /* write and let go of metapage buffer */
_bt_wrtbuf(rel, metabuf);
- /* update and release new sibling, and finally the old root */
- _bt_wrtbuf(rel, rbuf);
- _bt_wrtbuf(rel, lbuf);
+ return(rootbuf);
+}
+
+/*
+ * In the event old root page was splitted but no new one was created we
+ * build required parent levels keeping write lock on old root page.
+ * Note: it's assumed that old root page' btpo_parent points to meta page,
+ * ie not to parent page. On exit, new root page buffer is write locked.
+ * If "release" is TRUE then oldrootbuf will be released immediately
+ * after upper level is builded.
+ */
+Buffer
+_bt_fixroot(Relation rel, Buffer oldrootbuf, bool release)
+{
+ Buffer rootbuf;
+ BlockNumber rootblk;
+ Page rootpage;
+ XLogRecPtr rootLSN;
+ Page oldrootpage = BufferGetPage(oldrootbuf);
+ BTPageOpaque oldrootopaque = (BTPageOpaque)
+ PageGetSpecialPointer(oldrootpage);
+ Buffer buf, leftbuf, rightbuf;
+ Page page, leftpage, rightpage;
+ BTPageOpaque opaque, leftopaque, rightopaque;
+ OffsetNumber newitemoff;
+ BTItem btitem, ritem;
+ Size itemsz;
+
+ if (! P_LEFTMOST(oldrootopaque) || P_RIGHTMOST(oldrootopaque))
+ elog(ERROR, "bt_fixroot: not valid old root page");
+
+ /* Read right neighbor and create new root page*/
+ leftbuf = _bt_getbuf(rel, oldrootopaque->btpo_next, BT_WRITE);
+ leftpage = BufferGetPage(leftbuf);
+ leftopaque = (BTPageOpaque) PageGetSpecialPointer(leftpage);
+ rootbuf = _bt_newroot(rel, oldrootbuf, leftbuf);
+ rootpage = BufferGetPage(rootbuf);
+ rootLSN = PageGetLSN(rootpage);
+ rootblk = BufferGetBlockNumber(rootbuf);
+
+ /*
+ * Update LSN & StartUpID of old root buffer and its neighbor to
+ * ensure that they will be written on disk after logging new
+ * root creation. Unfortunately, for the moment (?) we do not
+ * log this operation and so possibly break our rule to log entire
+ * page content of first after checkpoint modification.
+ */
+ HOLD_INTERRUPTS();
+ oldrootopaque->btpo_parent = rootblk;
+ leftopaque->btpo_parent = rootblk;
+ PageSetLSN(oldrootpage, rootLSN);
+ PageSetSUI(oldrootpage, ThisStartUpID);
+ PageSetLSN(leftpage, rootLSN);
+ PageSetSUI(leftpage, ThisStartUpID);
+ RESUME_INTERRUPTS();
+
+ /* parent page where to insert pointers */
+ buf = rootbuf;
+ page = BufferGetPage(buf);
+ opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+
+ /*
+ * Now read other pages (if any) on level and add them to new root.
+ * If concurrent process will split one of pages on this level then it
+ * will notice either btpo_parent == metablock or btpo_parent == rootblk.
+ * In first case it will give up its locks and try to lock leftmost page
+ * buffer (oldrootbuf) to fix root - ie it will wait for us and let us
+ * continue. In second case it will try to lock rootbuf keeping its locks
+ * on buffers we already passed, also waiting for us. If we'll have to
+ * unlock rootbuf (split it) and that process will have to split page
+ * of new level we created (level of rootbuf) then it will wait while
+ * we create upper level. Etc.
+ */
+ while(! P_RIGHTMOST(leftopaque))
+ {
+ rightbuf = _bt_getbuf(rel, leftopaque->btpo_next, BT_WRITE);
+ rightpage = BufferGetPage(rightbuf);
+ rightopaque = (BTPageOpaque) PageGetSpecialPointer(rightpage);
+
+ /* Update LSN & StartUpID (see comments above) */
+ HOLD_INTERRUPTS();
+ rightopaque->btpo_parent = rootblk;
+ if (XLByteLT(PageGetLSN(rightpage), rootLSN))
+ PageSetLSN(rightpage, rootLSN);
+ PageSetSUI(rightpage, ThisStartUpID);
+ RESUME_INTERRUPTS();
+
+ ritem = (BTItem) PageGetItem(leftpage, PageGetItemId(leftpage, P_HIKEY));
+ btitem = _bt_formitem(&(ritem->bti_itup));
+ ItemPointerSet(&(btitem->bti_itup.t_tid), leftopaque->btpo_next, P_HIKEY);
+ itemsz = IndexTupleDSize(btitem->bti_itup)
+ + (sizeof(BTItemData) - sizeof(IndexTupleData));
+ itemsz = MAXALIGN(itemsz);
+
+ newitemoff = OffsetNumberNext(PageGetMaxOffsetNumber(page));
+
+ if (PageGetFreeSpace(page) < itemsz)
+ {
+ Buffer newbuf;
+ OffsetNumber firstright;
+ OffsetNumber itup_off;
+ BlockNumber itup_blkno;
+ bool newitemonleft;
+
+ firstright = _bt_findsplitloc(rel, page,
+ newitemoff, itemsz, &newitemonleft);
+ newbuf = _bt_split(rel, buf, firstright,
+ newitemoff, itemsz, btitem, newitemonleft,
+ &itup_off, &itup_blkno);
+ /* Keep lock on new "root" buffer ! */
+ if (buf != rootbuf)
+ _bt_relbuf(rel, buf, BT_WRITE);
+ buf = newbuf;
+ page = BufferGetPage(buf);
+ opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+ }
+ else
+ _bt_insertuple(rel, buf, itemsz, btitem, newitemoff);
+
+ /* give up left buffer */
+ _bt_relbuf(rel, leftbuf, BT_WRITE);
+ leftbuf = rightbuf;
+ leftpage = rightpage;
+ leftopaque = rightopaque;
+ }
+
+ /* give up rightmost page buffer */
+ _bt_relbuf(rel, leftbuf, BT_WRITE);
+
+ /*
+ * Here we hold locks on old root buffer, new root buffer we've
+ * created with _bt_newroot() - rootbuf, - and buf we've used
+ * for last insert ops - buf. If rootbuf != buf then we have to
+ * create at least one more level. And if "release" is TRUE
+ * (ie we've already created some levels) then we give up
+ * oldrootbuf.
+ */
+ if (release)
+ _bt_relbuf(rel, oldrootbuf, BT_WRITE);
+
+ if (rootbuf != buf)
+ {
+ _bt_relbuf(rel, buf, BT_WRITE);
+ return(_bt_fixroot(rel, rootbuf, true));
+ }
+
+ return(rootbuf);
}
/*
diff --git a/src/backend/access/nbtree/nbtpage.c b/src/backend/access/nbtree/nbtpage.c
index ad9d69e13d3..0f68a066dc7 100644
--- a/src/backend/access/nbtree/nbtpage.c
+++ b/src/backend/access/nbtree/nbtpage.c
@@ -9,7 +9,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtpage.c,v 1.47 2001/01/24 19:42:48 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtpage.c,v 1.48 2001/01/26 01:24:31 vadim Exp $
*
* NOTES
* Postgres btree pages look like ordinary relation pages. The opaque
@@ -28,6 +28,8 @@
#include "miscadmin.h"
#include "storage/lmgr.h"
+extern bool FixBTree; /* comments in nbtree.c */
+extern Buffer _bt_fixroot(Relation rel, Buffer oldrootbuf, bool release);
/*
* We use high-concurrency locking on btrees. There are two cases in
@@ -237,7 +239,58 @@ _bt_getroot(Relation rel, int access)
if (! P_ISROOT(rootopaque))
{
- /* it happened, try again */
+ /*
+ * It happened, but if root page splitter failed to create
+ * new root page then we'll go in loop trying to call
+ * _bt_getroot again and again.
+ */
+ if (FixBTree)
+ {
+ Buffer newrootbuf;
+
+check_parent:;
+ if (rootopaque->btpo_parent == BTREE_METAPAGE) /* unupdated! */
+ {
+ LockBuffer(rootbuf, BUFFER_LOCK_UNLOCK);
+ LockBuffer(rootbuf, BT_WRITE);
+
+ /* handle concurrent fix of root page */
+ if (rootopaque->btpo_parent == BTREE_METAPAGE) /* unupdated! */
+ {
+ newrootbuf = _bt_fixroot(rel, rootbuf, true);
+ LockBuffer(newrootbuf, BUFFER_LOCK_UNLOCK);
+ LockBuffer(newrootbuf, BT_READ);
+ rootbuf = newrootbuf;
+ rootpage = BufferGetPage(rootbuf);
+ rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
+ /* New root might be splitted while changing lock */
+ if (P_ISROOT(rootopaque))
+ return(rootbuf);
+ /* rootbuf is read locked */
+ goto check_parent;
+ }
+ else /* someone else already fixed root */
+ {
+ LockBuffer(rootbuf, BUFFER_LOCK_UNLOCK);
+ LockBuffer(rootbuf, BT_READ);
+ }
+ }
+ /*
+ * Ok, here we have old root page with btpo_parent pointing
+ * to upper level - check parent page because of there is
+ * good chance that parent is root page.
+ */
+ newrootbuf = _bt_getbuf(rel, rootopaque->btpo_parent, BT_READ);
+ _bt_relbuf(rel, rootbuf, BT_READ);
+ rootbuf = newrootbuf;
+ rootpage = BufferGetPage(rootbuf);
+ rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
+ if (P_ISROOT(rootopaque))
+ return(rootbuf);
+ /* no luck -:( */
+ }
+
+ /* try again */
_bt_relbuf(rel, rootbuf, BT_READ);
return _bt_getroot(rel, access);
}
diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c
index 7b2f7fa7d98..8685975edf5 100644
--- a/src/backend/access/nbtree/nbtree.c
+++ b/src/backend/access/nbtree/nbtree.c
@@ -12,7 +12,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.75 2001/01/24 19:42:48 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.76 2001/01/26 01:24:31 vadim Exp $
*
*-------------------------------------------------------------------------
*/
@@ -26,13 +26,18 @@
#include "executor/executor.h"
#include "miscadmin.h"
#include "storage/sinval.h"
-
+#include "access/xlogutils.h"
bool BuildingBtree = false; /* see comment in btbuild() */
-bool FastBuild = true; /* use sort/build instead of insertion
- * build */
+bool FastBuild = true; /* use sort/build instead */
+ /* of insertion build */
-#include "access/xlogutils.h"
+
+/*
+ * TEMPORARY FLAG FOR TESTING NEW FIX TREE
+ * CODE WITHOUT AFFECTING ANYONE ELSE
+ */
+bool FixBTree = false;
static void _bt_restscan(IndexScanDesc scan);