aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/nbtree/nbtinsert.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/nbtree/nbtinsert.c')
-rw-r--r--src/backend/access/nbtree/nbtinsert.c147
1 files changed, 102 insertions, 45 deletions
diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c
index f339a9cfcd1..6082af2fffe 100644
--- a/src/backend/access/nbtree/nbtinsert.c
+++ b/src/backend/access/nbtree/nbtinsert.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.146.2.3 2008/06/11 08:42:35 heikki Exp $
+ * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.146.2.4 2010/08/29 19:33:43 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -60,9 +60,8 @@ static OffsetNumber _bt_findsplitloc(Relation rel, Page page,
static void _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright,
int leftfree, int rightfree,
bool newitemonleft, Size firstrightitemsz);
-static void _bt_pgaddtup(Relation rel, Page page,
- Size itemsize, IndexTuple itup,
- OffsetNumber itup_off, const char *where);
+static bool _bt_pgaddtup(Page page, Size itemsize, IndexTuple itup,
+ OffsetNumber itup_off);
static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
int keysz, ScanKey scankey);
static void _bt_vacuum_one_page(Relation rel, Buffer buffer);
@@ -593,7 +592,9 @@ _bt_insertonpg(Relation rel,
/* Do the update. No ereport(ERROR) until changes are logged */
START_CRIT_SECTION();
- _bt_pgaddtup(rel, page, itemsz, itup, newitemoff, "page");
+ if (!_bt_pgaddtup(page, itemsz, itup, newitemoff))
+ elog(PANIC, "failed to add new item to block %u in index \"%s\"",
+ itup_blkno, RelationGetRelationName(rel));
MarkBufferDirty(buf);
@@ -719,6 +720,8 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
Page origpage;
Page leftpage,
rightpage;
+ BlockNumber origpagenumber,
+ rightpagenumber;
BTPageOpaque ropaque,
lopaque,
oopaque;
@@ -735,11 +738,27 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
OffsetNumber maxoff;
OffsetNumber i;
+ /* Acquire a new page to split into */
rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
+
+ /*
+ * origpage is the original page to be split. leftpage is a temporary
+ * buffer that receives the left-sibling data, which will be copied back
+ * into origpage on success. rightpage is the new page that receives
+ * the right-sibling data. If we fail before reaching the critical
+ * section, origpage hasn't been modified and leftpage is only workspace.
+ * In principle we shouldn't need to worry about rightpage either,
+ * because it hasn't been linked into the btree page structure; but to
+ * avoid leaving possibly-confusing junk behind, we are careful to rewrite
+ * rightpage as zeroes before throwing any error.
+ */
origpage = BufferGetPage(buf);
leftpage = PageGetTempPage(origpage, sizeof(BTPageOpaqueData));
rightpage = BufferGetPage(rbuf);
+ origpagenumber = BufferGetBlockNumber(buf);
+ rightpagenumber = BufferGetBlockNumber(rbuf);
+
_bt_pageinit(leftpage, BufferGetPageSize(buf));
/* rightpage was already initialized by _bt_getbuf */
@@ -754,8 +773,8 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
lopaque->btpo_flags &= ~(BTP_ROOT | BTP_SPLIT_END | BTP_HAS_GARBAGE);
ropaque->btpo_flags = lopaque->btpo_flags;
lopaque->btpo_prev = oopaque->btpo_prev;
- lopaque->btpo_next = BufferGetBlockNumber(rbuf);
- ropaque->btpo_prev = BufferGetBlockNumber(buf);
+ lopaque->btpo_next = rightpagenumber;
+ ropaque->btpo_prev = origpagenumber;
ropaque->btpo_next = oopaque->btpo_next;
lopaque->btpo.level = ropaque->btpo.level = oopaque->btpo.level;
/* Since we already have write-lock on both pages, ok to read cycleid */
@@ -778,9 +797,12 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
item = (IndexTuple) PageGetItem(origpage, itemid);
if (PageAddItem(rightpage, (Item) item, itemsz, rightoff,
LP_USED) == InvalidOffsetNumber)
- elog(PANIC, "failed to add hikey to the right sibling"
+ {
+ memset(rightpage, 0, BufferGetPageSize(rbuf));
+ elog(ERROR, "failed to add hikey to the right sibling"
" while splitting block %u of index \"%s\"",
- BufferGetBlockNumber(buf), RelationGetRelationName(rel));
+ origpagenumber, RelationGetRelationName(rel));
+ }
rightoff = OffsetNumberNext(rightoff);
}
@@ -805,9 +827,12 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
}
if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
LP_USED) == InvalidOffsetNumber)
- elog(PANIC, "failed to add hikey to the left sibling"
+ {
+ memset(rightpage, 0, BufferGetPageSize(rbuf));
+ elog(ERROR, "failed to add hikey to the left sibling"
" while splitting block %u of index \"%s\"",
- BufferGetBlockNumber(buf), RelationGetRelationName(rel));
+ origpagenumber, RelationGetRelationName(rel));
+ }
leftoff = OffsetNumberNext(leftoff);
/*
@@ -826,18 +851,28 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
{
if (newitemonleft)
{
- _bt_pgaddtup(rel, leftpage, newitemsz, newitem, leftoff,
- "left sibling");
+ if (!_bt_pgaddtup(leftpage, newitemsz, newitem, leftoff))
+ {
+ memset(rightpage, 0, BufferGetPageSize(rbuf));
+ elog(ERROR, "failed to add new item to the left sibling"
+ " while splitting block %u of index \"%s\"",
+ origpagenumber, RelationGetRelationName(rel));
+ }
itup_off = leftoff;
- itup_blkno = BufferGetBlockNumber(buf);
+ itup_blkno = origpagenumber;
leftoff = OffsetNumberNext(leftoff);
}
else
{
- _bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff,
- "right sibling");
+ if (!_bt_pgaddtup(rightpage, newitemsz, newitem, rightoff))
+ {
+ memset(rightpage, 0, BufferGetPageSize(rbuf));
+ elog(ERROR, "failed to add new item to the right sibling"
+ " while splitting block %u of index \"%s\"",
+ origpagenumber, RelationGetRelationName(rel));
+ }
itup_off = rightoff;
- itup_blkno = BufferGetBlockNumber(rbuf);
+ itup_blkno = rightpagenumber;
rightoff = OffsetNumberNext(rightoff);
}
}
@@ -845,14 +880,24 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
/* decide which page to put it on */
if (i < firstright)
{
- _bt_pgaddtup(rel, leftpage, itemsz, item, leftoff,
- "left sibling");
+ if (!_bt_pgaddtup(leftpage, itemsz, item, leftoff))
+ {
+ memset(rightpage, 0, BufferGetPageSize(rbuf));
+ elog(ERROR, "failed to add old item to the left sibling"
+ " while splitting block %u of index \"%s\"",
+ origpagenumber, RelationGetRelationName(rel));
+ }
leftoff = OffsetNumberNext(leftoff);
}
else
{
- _bt_pgaddtup(rel, rightpage, itemsz, item, rightoff,
- "right sibling");
+ if (!_bt_pgaddtup(rightpage, itemsz, item, rightoff))
+ {
+ memset(rightpage, 0, BufferGetPageSize(rbuf));
+ elog(ERROR, "failed to add old item to the right sibling"
+ " while splitting block %u of index \"%s\"",
+ origpagenumber, RelationGetRelationName(rel));
+ }
rightoff = OffsetNumberNext(rightoff);
}
}
@@ -862,18 +907,28 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
{
if (newitemonleft)
{
- _bt_pgaddtup(rel, leftpage, newitemsz, newitem, leftoff,
- "left sibling");
+ if (!_bt_pgaddtup(leftpage, newitemsz, newitem, leftoff))
+ {
+ memset(rightpage, 0, BufferGetPageSize(rbuf));
+ elog(ERROR, "failed to add new item to the left sibling"
+ " while splitting block %u of index \"%s\"",
+ origpagenumber, RelationGetRelationName(rel));
+ }
itup_off = leftoff;
- itup_blkno = BufferGetBlockNumber(buf);
+ itup_blkno = origpagenumber;
leftoff = OffsetNumberNext(leftoff);
}
else
{
- _bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff,
- "right sibling");
+ if (!_bt_pgaddtup(rightpage, newitemsz, newitem, rightoff))
+ {
+ memset(rightpage, 0, BufferGetPageSize(rbuf));
+ elog(ERROR, "failed to add new item to the right sibling"
+ " while splitting block %u of index \"%s\"",
+ origpagenumber, RelationGetRelationName(rel));
+ }
itup_off = rightoff;
- itup_blkno = BufferGetBlockNumber(rbuf);
+ itup_blkno = rightpagenumber;
rightoff = OffsetNumberNext(rightoff);
}
}
@@ -886,16 +941,19 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
* neighbors.
*/
- if (!P_RIGHTMOST(ropaque))
+ if (!P_RIGHTMOST(oopaque))
{
- sbuf = _bt_getbuf(rel, ropaque->btpo_next, BT_WRITE);
+ sbuf = _bt_getbuf(rel, oopaque->btpo_next, BT_WRITE);
spage = BufferGetPage(sbuf);
sopaque = (BTPageOpaque) PageGetSpecialPointer(spage);
- if (sopaque->btpo_prev != ropaque->btpo_prev)
- elog(PANIC, "right sibling's left-link doesn't match: "
+ if (sopaque->btpo_prev != origpagenumber)
+ {
+ memset(rightpage, 0, BufferGetPageSize(rbuf));
+ elog(ERROR, "right sibling's left-link doesn't match: "
"block %u links to %u instead of expected %u in index \"%s\"",
- ropaque->btpo_next, sopaque->btpo_prev, ropaque->btpo_prev,
+ oopaque->btpo_next, sopaque->btpo_prev, origpagenumber,
RelationGetRelationName(rel));
+ }
/*
* Check to see if we can set the SPLIT_END flag in the right-hand
@@ -920,8 +978,7 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
*
* NO EREPORT(ERROR) till right sibling is updated. We can get away with
* not starting the critical section till here because we haven't been
- * scribbling on the original page yet, and we don't care about the new
- * sibling until it's linked into the btree.
+ * scribbling on the original page yet; see comments above.
*/
START_CRIT_SECTION();
@@ -930,7 +987,7 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
if (!P_RIGHTMOST(ropaque))
{
- sopaque->btpo_prev = BufferGetBlockNumber(rbuf);
+ sopaque->btpo_prev = rightpagenumber;
MarkBufferDirty(sbuf);
}
@@ -945,9 +1002,9 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
xlrec.target.node = rel->rd_node;
ItemPointerSet(&(xlrec.target.tid), itup_blkno, itup_off);
if (newitemonleft)
- xlrec.otherblk = BufferGetBlockNumber(rbuf);
+ xlrec.otherblk = rightpagenumber;
else
- xlrec.otherblk = BufferGetBlockNumber(buf);
+ xlrec.otherblk = origpagenumber;
xlrec.leftblk = lopaque->btpo_prev;
xlrec.rightblk = ropaque->btpo_next;
xlrec.level = lopaque->btpo.level;
@@ -1014,7 +1071,8 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
* (in the page management code) that the center of a page always be
* clean, and the most efficient way to guarantee this is just to compact
* the data by reinserting it into a new left page. (XXX the latter
- * comment is probably obsolete.)
+ * comment is probably obsolete; but in any case it's good to not scribble
+ * on the original page until we enter the critical section.)
*
* It's a bit weird that we don't fill in the left page till after writing
* the XLOG entry, but not really worth changing. Note that we use the
@@ -1648,13 +1706,11 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
* we insert the tuples in order, so that the given itup_off does
* represent the final position of the tuple!
*/
-static void
-_bt_pgaddtup(Relation rel,
- Page page,
+static bool
+_bt_pgaddtup(Page page,
Size itemsize,
IndexTuple itup,
- OffsetNumber itup_off,
- const char *where)
+ OffsetNumber itup_off)
{
BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
IndexTupleData trunctuple;
@@ -1669,8 +1725,9 @@ _bt_pgaddtup(Relation rel,
if (PageAddItem(page, (Item) itup, itemsize, itup_off,
LP_USED) == InvalidOffsetNumber)
- elog(PANIC, "failed to add item to the %s in index \"%s\"",
- where, RelationGetRelationName(rel));
+ return false;
+
+ return true;
}
/*