diff options
Diffstat (limited to 'src/backend/access/nbtree/nbtinsert.c')
-rw-r--r-- | src/backend/access/nbtree/nbtinsert.c | 147 |
1 files changed, 102 insertions, 45 deletions
diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c index f339a9cfcd1..6082af2fffe 100644 --- a/src/backend/access/nbtree/nbtinsert.c +++ b/src/backend/access/nbtree/nbtinsert.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.146.2.3 2008/06/11 08:42:35 heikki Exp $ + * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.146.2.4 2010/08/29 19:33:43 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -60,9 +60,8 @@ static OffsetNumber _bt_findsplitloc(Relation rel, Page page, static void _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright, int leftfree, int rightfree, bool newitemonleft, Size firstrightitemsz); -static void _bt_pgaddtup(Relation rel, Page page, - Size itemsize, IndexTuple itup, - OffsetNumber itup_off, const char *where); +static bool _bt_pgaddtup(Page page, Size itemsize, IndexTuple itup, + OffsetNumber itup_off); static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum, int keysz, ScanKey scankey); static void _bt_vacuum_one_page(Relation rel, Buffer buffer); @@ -593,7 +592,9 @@ _bt_insertonpg(Relation rel, /* Do the update. No ereport(ERROR) until changes are logged */ START_CRIT_SECTION(); - _bt_pgaddtup(rel, page, itemsz, itup, newitemoff, "page"); + if (!_bt_pgaddtup(page, itemsz, itup, newitemoff)) + elog(PANIC, "failed to add new item to block %u in index \"%s\"", + itup_blkno, RelationGetRelationName(rel)); MarkBufferDirty(buf); @@ -719,6 +720,8 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, Page origpage; Page leftpage, rightpage; + BlockNumber origpagenumber, + rightpagenumber; BTPageOpaque ropaque, lopaque, oopaque; @@ -735,11 +738,27 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, OffsetNumber maxoff; OffsetNumber i; + /* Acquire a new page to split into */ rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE); + + /* + * origpage is the original page to be split. leftpage is a temporary + * buffer that receives the left-sibling data, which will be copied back + * into origpage on success. rightpage is the new page that receives + * the right-sibling data. If we fail before reaching the critical + * section, origpage hasn't been modified and leftpage is only workspace. + * In principle we shouldn't need to worry about rightpage either, + * because it hasn't been linked into the btree page structure; but to + * avoid leaving possibly-confusing junk behind, we are careful to rewrite + * rightpage as zeroes before throwing any error. + */ origpage = BufferGetPage(buf); leftpage = PageGetTempPage(origpage, sizeof(BTPageOpaqueData)); rightpage = BufferGetPage(rbuf); + origpagenumber = BufferGetBlockNumber(buf); + rightpagenumber = BufferGetBlockNumber(rbuf); + _bt_pageinit(leftpage, BufferGetPageSize(buf)); /* rightpage was already initialized by _bt_getbuf */ @@ -754,8 +773,8 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, lopaque->btpo_flags &= ~(BTP_ROOT | BTP_SPLIT_END | BTP_HAS_GARBAGE); ropaque->btpo_flags = lopaque->btpo_flags; lopaque->btpo_prev = oopaque->btpo_prev; - lopaque->btpo_next = BufferGetBlockNumber(rbuf); - ropaque->btpo_prev = BufferGetBlockNumber(buf); + lopaque->btpo_next = rightpagenumber; + ropaque->btpo_prev = origpagenumber; ropaque->btpo_next = oopaque->btpo_next; lopaque->btpo.level = ropaque->btpo.level = oopaque->btpo.level; /* Since we already have write-lock on both pages, ok to read cycleid */ @@ -778,9 +797,12 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, item = (IndexTuple) PageGetItem(origpage, itemid); if (PageAddItem(rightpage, (Item) item, itemsz, rightoff, LP_USED) == InvalidOffsetNumber) - elog(PANIC, "failed to add hikey to the right sibling" + { + memset(rightpage, 0, BufferGetPageSize(rbuf)); + elog(ERROR, "failed to add hikey to the right sibling" " while splitting block %u of index \"%s\"", - BufferGetBlockNumber(buf), RelationGetRelationName(rel)); + origpagenumber, RelationGetRelationName(rel)); + } rightoff = OffsetNumberNext(rightoff); } @@ -805,9 +827,12 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, } if (PageAddItem(leftpage, (Item) item, itemsz, leftoff, LP_USED) == InvalidOffsetNumber) - elog(PANIC, "failed to add hikey to the left sibling" + { + memset(rightpage, 0, BufferGetPageSize(rbuf)); + elog(ERROR, "failed to add hikey to the left sibling" " while splitting block %u of index \"%s\"", - BufferGetBlockNumber(buf), RelationGetRelationName(rel)); + origpagenumber, RelationGetRelationName(rel)); + } leftoff = OffsetNumberNext(leftoff); /* @@ -826,18 +851,28 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, { if (newitemonleft) { - _bt_pgaddtup(rel, leftpage, newitemsz, newitem, leftoff, - "left sibling"); + if (!_bt_pgaddtup(leftpage, newitemsz, newitem, leftoff)) + { + memset(rightpage, 0, BufferGetPageSize(rbuf)); + elog(ERROR, "failed to add new item to the left sibling" + " while splitting block %u of index \"%s\"", + origpagenumber, RelationGetRelationName(rel)); + } itup_off = leftoff; - itup_blkno = BufferGetBlockNumber(buf); + itup_blkno = origpagenumber; leftoff = OffsetNumberNext(leftoff); } else { - _bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff, - "right sibling"); + if (!_bt_pgaddtup(rightpage, newitemsz, newitem, rightoff)) + { + memset(rightpage, 0, BufferGetPageSize(rbuf)); + elog(ERROR, "failed to add new item to the right sibling" + " while splitting block %u of index \"%s\"", + origpagenumber, RelationGetRelationName(rel)); + } itup_off = rightoff; - itup_blkno = BufferGetBlockNumber(rbuf); + itup_blkno = rightpagenumber; rightoff = OffsetNumberNext(rightoff); } } @@ -845,14 +880,24 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, /* decide which page to put it on */ if (i < firstright) { - _bt_pgaddtup(rel, leftpage, itemsz, item, leftoff, - "left sibling"); + if (!_bt_pgaddtup(leftpage, itemsz, item, leftoff)) + { + memset(rightpage, 0, BufferGetPageSize(rbuf)); + elog(ERROR, "failed to add old item to the left sibling" + " while splitting block %u of index \"%s\"", + origpagenumber, RelationGetRelationName(rel)); + } leftoff = OffsetNumberNext(leftoff); } else { - _bt_pgaddtup(rel, rightpage, itemsz, item, rightoff, - "right sibling"); + if (!_bt_pgaddtup(rightpage, itemsz, item, rightoff)) + { + memset(rightpage, 0, BufferGetPageSize(rbuf)); + elog(ERROR, "failed to add old item to the right sibling" + " while splitting block %u of index \"%s\"", + origpagenumber, RelationGetRelationName(rel)); + } rightoff = OffsetNumberNext(rightoff); } } @@ -862,18 +907,28 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, { if (newitemonleft) { - _bt_pgaddtup(rel, leftpage, newitemsz, newitem, leftoff, - "left sibling"); + if (!_bt_pgaddtup(leftpage, newitemsz, newitem, leftoff)) + { + memset(rightpage, 0, BufferGetPageSize(rbuf)); + elog(ERROR, "failed to add new item to the left sibling" + " while splitting block %u of index \"%s\"", + origpagenumber, RelationGetRelationName(rel)); + } itup_off = leftoff; - itup_blkno = BufferGetBlockNumber(buf); + itup_blkno = origpagenumber; leftoff = OffsetNumberNext(leftoff); } else { - _bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff, - "right sibling"); + if (!_bt_pgaddtup(rightpage, newitemsz, newitem, rightoff)) + { + memset(rightpage, 0, BufferGetPageSize(rbuf)); + elog(ERROR, "failed to add new item to the right sibling" + " while splitting block %u of index \"%s\"", + origpagenumber, RelationGetRelationName(rel)); + } itup_off = rightoff; - itup_blkno = BufferGetBlockNumber(rbuf); + itup_blkno = rightpagenumber; rightoff = OffsetNumberNext(rightoff); } } @@ -886,16 +941,19 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, * neighbors. */ - if (!P_RIGHTMOST(ropaque)) + if (!P_RIGHTMOST(oopaque)) { - sbuf = _bt_getbuf(rel, ropaque->btpo_next, BT_WRITE); + sbuf = _bt_getbuf(rel, oopaque->btpo_next, BT_WRITE); spage = BufferGetPage(sbuf); sopaque = (BTPageOpaque) PageGetSpecialPointer(spage); - if (sopaque->btpo_prev != ropaque->btpo_prev) - elog(PANIC, "right sibling's left-link doesn't match: " + if (sopaque->btpo_prev != origpagenumber) + { + memset(rightpage, 0, BufferGetPageSize(rbuf)); + elog(ERROR, "right sibling's left-link doesn't match: " "block %u links to %u instead of expected %u in index \"%s\"", - ropaque->btpo_next, sopaque->btpo_prev, ropaque->btpo_prev, + oopaque->btpo_next, sopaque->btpo_prev, origpagenumber, RelationGetRelationName(rel)); + } /* * Check to see if we can set the SPLIT_END flag in the right-hand @@ -920,8 +978,7 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, * * NO EREPORT(ERROR) till right sibling is updated. We can get away with * not starting the critical section till here because we haven't been - * scribbling on the original page yet, and we don't care about the new - * sibling until it's linked into the btree. + * scribbling on the original page yet; see comments above. */ START_CRIT_SECTION(); @@ -930,7 +987,7 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, if (!P_RIGHTMOST(ropaque)) { - sopaque->btpo_prev = BufferGetBlockNumber(rbuf); + sopaque->btpo_prev = rightpagenumber; MarkBufferDirty(sbuf); } @@ -945,9 +1002,9 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, xlrec.target.node = rel->rd_node; ItemPointerSet(&(xlrec.target.tid), itup_blkno, itup_off); if (newitemonleft) - xlrec.otherblk = BufferGetBlockNumber(rbuf); + xlrec.otherblk = rightpagenumber; else - xlrec.otherblk = BufferGetBlockNumber(buf); + xlrec.otherblk = origpagenumber; xlrec.leftblk = lopaque->btpo_prev; xlrec.rightblk = ropaque->btpo_next; xlrec.level = lopaque->btpo.level; @@ -1014,7 +1071,8 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, * (in the page management code) that the center of a page always be * clean, and the most efficient way to guarantee this is just to compact * the data by reinserting it into a new left page. (XXX the latter - * comment is probably obsolete.) + * comment is probably obsolete; but in any case it's good to not scribble + * on the original page until we enter the critical section.) * * It's a bit weird that we don't fill in the left page till after writing * the XLOG entry, but not really worth changing. Note that we use the @@ -1648,13 +1706,11 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf) * we insert the tuples in order, so that the given itup_off does * represent the final position of the tuple! */ -static void -_bt_pgaddtup(Relation rel, - Page page, +static bool +_bt_pgaddtup(Page page, Size itemsize, IndexTuple itup, - OffsetNumber itup_off, - const char *where) + OffsetNumber itup_off) { BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page); IndexTupleData trunctuple; @@ -1669,8 +1725,9 @@ _bt_pgaddtup(Relation rel, if (PageAddItem(page, (Item) itup, itemsize, itup_off, LP_USED) == InvalidOffsetNumber) - elog(PANIC, "failed to add item to the %s in index \"%s\"", - where, RelationGetRelationName(rel)); + return false; + + return true; } /* |