diff options
Diffstat (limited to 'src/backend/access/nbtree/nbtinsert.c')
-rw-r--r-- | src/backend/access/nbtree/nbtinsert.c | 447 |
1 files changed, 380 insertions, 67 deletions
diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c index 743583bccab..534927717fc 100644 --- a/src/backend/access/nbtree/nbtinsert.c +++ b/src/backend/access/nbtree/nbtinsert.c @@ -7,7 +7,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.12 1997/04/16 01:48:11 vadim Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.13 1997/05/30 18:35:31 vadim Exp $ * *------------------------------------------------------------------------- */ @@ -28,13 +28,14 @@ #endif static InsertIndexResult _bt_insertonpg(Relation rel, Buffer buf, BTStack stack, int keysz, ScanKey scankey, BTItem btitem, BTItem afteritem); -static Buffer _bt_split(Relation rel, Buffer buf); +static Buffer _bt_split(Relation rel, Buffer buf, BTItem hiRightItem); static OffsetNumber _bt_findsplitloc(Relation rel, Page page, OffsetNumber start, OffsetNumber maxoff, Size llimit); static void _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf); static OffsetNumber _bt_pgaddtup(Relation rel, Buffer buf, int keysz, ScanKey itup_scankey, Size itemsize, BTItem btitem, BTItem afteritem); static bool _bt_goesonpg(Relation rel, Buffer buf, Size keysz, ScanKey scankey, BTItem afteritem); static void _bt_updateitem(Relation rel, Size keysz, Buffer buf, BTItem oldItem, BTItem newItem); static bool _bt_isequal (TupleDesc itupdesc, Page page, OffsetNumber offnum, int keysz, ScanKey scankey); +static InsertIndexResult _bt_shift (Relation rel, Buffer buf, BTStack stack, int keysz, ScanKey scankey, BTItem btitem, BTItem hikey); /* * _bt_doinsert() -- Handle insertion of a single btitem in the tree. @@ -225,31 +226,152 @@ _bt_insertonpg(Relation rel, Buffer rbuf; Buffer pbuf; Page rpage; - ScanKey newskey; BTItem ritem; + BTPageOpaque lpageop; BTPageOpaque rpageop; BlockNumber rbknum, itup_blkno; OffsetNumber itup_off; int itemsz; - InsertIndexResult newres; - BTItem new_item = (BTItem) NULL; - BTItem lowLeftItem; - OffsetNumber leftmost_offset; Page ppage; BTPageOpaque ppageop; - BlockNumber bknum; page = BufferGetPage(buf); + lpageop = (BTPageOpaque) PageGetSpecialPointer(page); + itemsz = IndexTupleDSize(btitem->bti_itup) + (sizeof(BTItemData) - sizeof(IndexTupleData)); itemsz = DOUBLEALIGN(itemsz); /* be safe, PageAddItem will do this but we need to be consistent */ + /* + * If we have to insert item on the leftmost page which is the first + * page in the chain of duplicates then: + * 1. if scankey == hikey (i.e. - new duplicate item) then + * insert it here; + * 2. if scankey < hikey then we grab new page, copy current page + * content there and insert new item on the current page. + */ + if ( lpageop->btpo_flags & BTP_CHAIN ) + { + OffsetNumber maxoff = PageGetMaxOffsetNumber (page); + ItemId hitemid; + BTItem hitem; + + Assert ( !P_RIGHTMOST(lpageop) ); + hitemid = PageGetItemId(page, P_HIKEY); + hitem = (BTItem) PageGetItem(page, hitemid); + if ( maxoff > P_HIKEY && + !_bt_itemcmp (rel, keysz, hitem, + (BTItem) PageGetItem(page, PageGetItemId(page, P_FIRSTKEY)), + BTEqualStrategyNumber) ) + elog (FATAL, "btree: bad key on the page in the chain of duplicates"); + + if ( !_bt_skeycmp (rel, keysz, scankey, page, hitemid, + BTEqualStrategyNumber) ) + { + if ( !P_LEFTMOST(lpageop) ) + elog (FATAL, "btree: attempt to insert bad key on the non-leftmost page in the chain of duplicates"); + if ( !_bt_skeycmp (rel, keysz, scankey, page, hitemid, + BTLessStrategyNumber) ) + elog (FATAL, "btree: attempt to insert higher key on the leftmost page in the chain of duplicates"); + return (_bt_shift(rel, buf, stack, keysz, scankey, btitem, hitem)); + } + } + - if (PageGetFreeSpace(page) < itemsz) { + if (PageGetFreeSpace(page) < itemsz) + { + BlockNumber bknum = BufferGetBlockNumber(buf); + BTItem lowLeftItem; + BTItem hiRightItem = NULL; + + /* + * If we have to split leaf page in the chain of duplicates + * then we try to move righter to avoid splitting. + */ + if ( ( lpageop->btpo_flags & BTP_CHAIN ) && + ( lpageop->btpo_flags & BTP_LEAF ) ) + { + bool use_left = true; + + for ( ; ; ) + { + bool keys_equal = false; + + rbuf = _bt_getbuf(rel, lpageop->btpo_next, BT_WRITE); + rpage = BufferGetPage(rbuf); + rpageop = (BTPageOpaque) PageGetSpecialPointer(rpage); + if ( P_RIGHTMOST (rpageop) ) + { + Assert ( !( rpageop->btpo_flags & BTP_CHAIN ) ); + use_left = false; + break; + } + /* + * If we have the same hikey here then it's + * yet another page in chain and we may move + * even righter. + */ + if ( _bt_skeycmp (rel, keysz, scankey, rpage, + PageGetItemId(rpage, P_HIKEY), + BTEqualStrategyNumber) ) + { + if ( !( rpageop->btpo_flags & BTP_CHAIN ) ) + elog (FATAL, "btree: lost page in the chain of duplicates"); + keys_equal = true; + } + else if ( _bt_skeycmp (rel, keysz, scankey, rpage, + PageGetItemId(rpage, P_HIKEY), + BTGreaterStrategyNumber) ) + elog (FATAL, "btree: hikey is out of order"); + /* + * If hikey > scankey and BTP_CHAIN is ON + * then it's first page of the chain of higher keys: + * our left sibling hikey was lying! We can't add new + * item here, but we can turn BTP_CHAIN off on our + * left page and overwrite its hikey. + */ + if ( !keys_equal && ( rpageop->btpo_flags & BTP_CHAIN ) ) + { + BTItem tmp; + + lpageop->btpo_flags &= ~BTP_CHAIN; + tmp = (BTItem) PageGetItem(rpage, + PageGetItemId(rpage, P_HIKEY)); + hiRightItem = _bt_formitem(&(tmp->bti_itup)); + break; + } + /* + * if there is room here or hikey > scankey (so it's our + * last page in the chain and we can't move righter) + * we have to use this page . + */ + if ( PageGetFreeSpace (rpage) > itemsz || !keys_equal ) + { + use_left = false; + break; + } + /* try to move righter */ + _bt_relbuf(rel, buf, BT_WRITE); + buf = rbuf; + page = rpage; + lpageop = rpageop; + } + if ( !use_left ) /* insert on the right page */ + { + _bt_relbuf(rel, buf, BT_WRITE); + return ( _bt_insertonpg(rel, rbuf, stack, keysz, + scankey, btitem, afteritem) ); + } + _bt_relbuf(rel, rbuf, BT_WRITE); + bknum = BufferGetBlockNumber(buf); + } /* split the buffer into left and right halves */ - rbuf = _bt_split(rel, buf); + rbuf = _bt_split(rel, buf, hiRightItem); + + if ( hiRightItem != (BTItem) NULL ) + pfree (hiRightItem); /* which new page (left half or right half) gets the tuple? */ if (_bt_goesonpg(rel, buf, keysz, scankey, afteritem)) { @@ -264,6 +386,14 @@ _bt_insertonpg(Relation rel, itup_blkno = BufferGetBlockNumber(rbuf); } + lowLeftItem = (BTItem) PageGetItem(page, + PageGetItemId(page, P_FIRSTKEY)); + + if ( _bt_itemcmp (rel, keysz, lowLeftItem, + (BTItem) PageGetItem(page, PageGetItemId(page, P_HIKEY)), + BTEqualStrategyNumber) ) + lpageop->btpo_flags |= BTP_CHAIN; + /* * By here, * @@ -287,6 +417,11 @@ _bt_insertonpg(Relation rel, _bt_relbuf(rel, rbuf, BT_WRITE); } else { + ScanKey newskey; + InsertIndexResult newres; + BTItem new_item; + OffsetNumber upditem_offset = P_HIKEY; + bool do_update = false; /* form a index tuple that points at the new right page */ rbknum = BufferGetBlockNumber(rbuf); @@ -294,27 +429,43 @@ _bt_insertonpg(Relation rel, rpageop = (BTPageOpaque) PageGetSpecialPointer(rpage); /* - * By convention, the first entry (0) on every + * By convention, the first entry (1) on every * non-rightmost page is the high key for that page. In * order to get the lowest key on the new right page, we - * actually look at its second (1) entry. + * actually look at its second (2) entry. */ - if (! P_RIGHTMOST(rpageop)) { + if (! P_RIGHTMOST(rpageop)) + { ritem = (BTItem) PageGetItem(rpage, PageGetItemId(rpage, P_FIRSTKEY)); - } else { + if ( _bt_itemcmp (rel, keysz, ritem, + (BTItem) PageGetItem(rpage, + PageGetItemId(rpage, P_HIKEY)), + BTEqualStrategyNumber) ) + rpageop->btpo_flags |= BTP_CHAIN; + } + else ritem = (BTItem) PageGetItem(rpage, PageGetItemId(rpage, P_HIKEY)); - } /* get a unique btitem for this key */ new_item = _bt_formitem(&(ritem->bti_itup)); ItemPointerSet(&(new_item->bti_itup.t_tid), rbknum, P_HIKEY); - /* find the parent buffer */ + /* + * Find the parent buffer and get the parent page. + * + * Oops - if we were moved right then we need to + * change stack item! We want to find parent pointing to + * where we are, right ? - vadim 05/27/97 + */ + ItemPointerSet(&(stack->bts_btitem->bti_itup.t_tid), + bknum, P_HIKEY); pbuf = _bt_getstackbuf(rel, stack, BT_WRITE); + ppage = BufferGetPage(pbuf); + ppageop = (BTPageOpaque) PageGetSpecialPointer(ppage); /* * If the key of new_item is < than the key of the item @@ -330,29 +481,59 @@ _bt_insertonpg(Relation rel, * key spills over to our new right page, we get an * inconsistency if we don't update the left key in the * parent page. + * + * Also, new duplicates handling code require us to update + * parent item if some smaller items left on the left page + * (which is possible in splitting leftmost page) and + * current parent item == new_item. - vadim 05/27/97 */ - - if (_bt_itemcmp(rel, keysz, stack->bts_btitem, new_item, - BTGreaterStrategyNumber)) { - ppageop = (BTPageOpaque) PageGetSpecialPointer(page); - Assert (P_LEFTMOST(ppageop)); - lowLeftItem = - (BTItem) PageGetItem(page, - PageGetItemId(page, P_FIRSTKEY)); - - /* this method does not work--_bt_updateitem tries to */ - /* overwrite an entry with another entry that might be */ - /* bigger. if lowLeftItem is bigger, it corrupts the */ - /* parent page. instead, we have to delete the original */ - /* leftmost item from the parent, and insert the new one */ - /* with a regular _bt_insertonpg (it could cause a split */ - /* because it's bigger than what was there before). */ - /* --djm 8/21/96 */ - + if ( _bt_itemcmp (rel, keysz, stack->bts_btitem, new_item, + BTGreaterStrategyNumber) || + ( _bt_itemcmp(rel, keysz, stack->bts_btitem, + new_item, BTEqualStrategyNumber) && + _bt_itemcmp(rel, keysz, lowLeftItem, + new_item, BTLessStrategyNumber) ) ) + { + do_update = true; /* - * but it works for items with the same size and so why don't - * use it for them ? - vadim 12/05/96 + * figure out which key is leftmost (if the parent page + * is rightmost, too, it must be the root) */ + if(P_RIGHTMOST(ppageop)) + upditem_offset = P_HIKEY; + else + upditem_offset = P_FIRSTKEY; + if ( !P_LEFTMOST(lpageop) || + stack->bts_offset != upditem_offset ) + elog (FATAL, "btree: items are out of order"); + } + /* + * There was bug caused by deletion all minimum keys (K1) from + * an index page and insertion there (up to page splitting) + * higher duplicate keys (K2): after it parent item for left + * page contained K1 and the next item (for new right page) - K2, + * - and scan for the key = K2 lost items on the left page. + * So, we have to update parent item if its key < minimum + * key on the left and minimum keys on the left and on the right + * are equal. It would be nice to update hikey on the previous + * page of the left one too, but we may get deadlock here + * (read comments in _bt_split), so we leave previous page + * hikey _inconsistent_, but there should to be BTP_CHAIN flag + * on it, which privents _bt_moveright from dangerous movings + * from there. - vadim 05/27/97 + */ + else if ( _bt_itemcmp (rel, keysz, stack->bts_btitem, + lowLeftItem, BTLessStrategyNumber) && + _bt_itemcmp (rel, keysz, new_item, + lowLeftItem, BTEqualStrategyNumber) ) + { + do_update = true; + upditem_offset = stack->bts_offset; + } + + if ( do_update ) + { + /* Try to update in place. */ if ( DOUBLEALIGN (IndexTupleDSize (lowLeftItem->bti_itup)) == DOUBLEALIGN (IndexTupleDSize (stack->bts_btitem->bti_itup)) ) { @@ -363,33 +544,16 @@ _bt_insertonpg(Relation rel, } else { - /* get the parent page */ - ppage = BufferGetPage(pbuf); - ppageop = (BTPageOpaque) PageGetSpecialPointer(ppage); - - /* - * figure out which key is leftmost (if the parent page - * is rightmost, too, it must be the root) - */ - if(P_RIGHTMOST(ppageop)) { - leftmost_offset = P_HIKEY; - } else { - leftmost_offset = P_FIRSTKEY; - } - PageIndexTupleDelete(ppage, leftmost_offset); + PageIndexTupleDelete(ppage, upditem_offset); /* * don't write anything out yet--we still have the write * lock, and now we call another _bt_insertonpg to - * insert the correct leftmost key + * insert the correct key. + * First, make a new item, using the tuple data from + * lowLeftItem. Point it to the left child. + * Update it on the stack at the same time. */ - - /* - * make a new leftmost item, using the tuple data from - * lowLeftItem. point it to the left child. - * update it on the stack at the same time. - */ - bknum = BufferGetBlockNumber(buf); pfree(stack->bts_btitem); stack->bts_btitem = _bt_formitem(&(lowLeftItem->bti_itup)); ItemPointerSet(&(stack->bts_btitem->bti_itup.t_tid), @@ -400,9 +564,10 @@ _bt_insertonpg(Relation rel, _bt_relbuf(rel, rbuf, BT_WRITE); /* - * a regular _bt_binsrch should find the right place to - * put the new entry, since it should be lower than any - * other key on the page, therefore set afteritem to NULL + * A regular _bt_binsrch should find the right place to + * put the new entry, since it should be either lower + * than any other key on the page or unique. + * Therefore set afteritem to NULL. */ newskey = _bt_mkscankey(rel, &(stack->bts_btitem->bti_itup)); newres = _bt_insertonpg(rel, pbuf, stack->bts_parent, @@ -458,7 +623,7 @@ _bt_insertonpg(Relation rel, * pin and lock on buf are maintained. */ static Buffer -_bt_split(Relation rel, Buffer buf) +_bt_split(Relation rel, Buffer buf, BTItem hiRightItem) { Buffer rbuf; Page origpage; @@ -492,6 +657,7 @@ _bt_split(Relation rel, Buffer buf) /* if we're splitting this page, it won't be the root when we're done */ oopaque->btpo_flags &= ~BTP_ROOT; + oopaque->btpo_flags &= ~BTP_CHAIN; lopaque->btpo_flags = ropaque->btpo_flags = oopaque->btpo_flags; lopaque->btpo_prev = oopaque->btpo_prev; ropaque->btpo_prev = BufferGetBlockNumber(buf); @@ -516,10 +682,23 @@ _bt_split(Relation rel, Buffer buf) /* splitting a non-rightmost page, start at the first data item */ start = P_FIRSTKEY; - /* copy the original high key to the new page */ - itemid = PageGetItemId(origpage, P_HIKEY); - itemsz = ItemIdGetLength(itemid); - item = (BTItem) PageGetItem(origpage, itemid); + /* + * Copy the original high key to the new page if high key + * was not passed by caller. + */ + if ( hiRightItem == NULL ) + { + itemid = PageGetItemId(origpage, P_HIKEY); + itemsz = ItemIdGetLength(itemid); + item = (BTItem) PageGetItem(origpage, itemid); + } + else + { + item = hiRightItem; + itemsz = IndexTupleDSize(hiRightItem->bti_itup) + + (sizeof(BTItemData) - sizeof(IndexTupleData)); + itemsz = DOUBLEALIGN(itemsz); + } (void) PageAddItem(rightpage, (Item) item, itemsz, P_HIKEY, LP_USED); rightoff = P_FIRSTKEY; } else { @@ -744,7 +923,7 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf) itemsz = ItemIdGetLength(itemid); item = (BTItem) PageGetItem(lpage, itemid); new_item = _bt_formitem(&(item->bti_itup)); - ItemPointerSet(&(new_item->bti_itup.t_tid), lbkno, P_FIRSTKEY); + ItemPointerSet(&(new_item->bti_itup.t_tid), lbkno, P_HIKEY); /* * insert the left page pointer into the new root page. the root @@ -1098,3 +1277,137 @@ _bt_isequal (TupleDesc itupdesc, Page page, OffsetNumber offnum, /* by here, the keys are equal */ return (true); } + +/* + * _bt_shift - insert btitem on the passed page after shifting page + * to the right in the tree. + * + * NOTE: tested for shifting leftmost page only, having btitem < hikey. + */ +static InsertIndexResult +_bt_shift (Relation rel, Buffer buf, BTStack stack, int keysz, + ScanKey scankey, BTItem btitem, BTItem hikey) +{ + InsertIndexResult res; + int itemsz; + Page page; + BlockNumber bknum; + BTPageOpaque pageop; + Buffer rbuf; + Page rpage; + BTPageOpaque rpageop; + Buffer pbuf; + Page ppage; + BTPageOpaque ppageop; + Buffer nbuf; + Page npage; + BTPageOpaque npageop; + BlockNumber nbknum; + BTItem nitem; + OffsetNumber afteroff; + + btitem = _bt_formitem(&(btitem->bti_itup)); + hikey = _bt_formitem(&(hikey->bti_itup)); + + page = BufferGetPage(buf); + + /* grab new page */ + nbuf = _bt_getbuf(rel, P_NEW, BT_WRITE); + nbknum = BufferGetBlockNumber(nbuf); + npage = BufferGetPage(nbuf); + _bt_pageinit(npage, BufferGetPageSize(nbuf)); + npageop = (BTPageOpaque) PageGetSpecialPointer(npage); + + /* copy content of the passed page */ + memmove ((char *) npage, (char *) page, BufferGetPageSize(buf)); + + /* re-init old (passed) page */ + _bt_pageinit(page, BufferGetPageSize(buf)); + pageop = (BTPageOpaque) PageGetSpecialPointer(page); + + /* init old page opaque */ + pageop->btpo_flags = npageop->btpo_flags; /* restore flags */ + pageop->btpo_flags &= ~BTP_CHAIN; + if ( _bt_itemcmp (rel, keysz, hikey, btitem, BTEqualStrategyNumber) ) + pageop->btpo_flags |= BTP_CHAIN; + pageop->btpo_prev = npageop->btpo_prev; /* restore prev */ + pageop->btpo_next = nbknum; /* next points to the new page */ + + /* init shifted page opaque */ + npageop->btpo_prev = bknum = BufferGetBlockNumber(buf); + + /* shifted page is ok, populate old page */ + + /* add passed hikey */ + itemsz = IndexTupleDSize(hikey->bti_itup) + + (sizeof(BTItemData) - sizeof(IndexTupleData)); + itemsz = DOUBLEALIGN(itemsz); + (void) PageAddItem(page, (Item) hikey, itemsz, P_HIKEY, LP_USED); + pfree (hikey); + + /* add btitem */ + itemsz = IndexTupleDSize(btitem->bti_itup) + + (sizeof(BTItemData) - sizeof(IndexTupleData)); + itemsz = DOUBLEALIGN(itemsz); + (void) PageAddItem(page, (Item) btitem, itemsz, P_FIRSTKEY, LP_USED); + pfree (btitem); + nitem = (BTItem) PageGetItem(page, PageGetItemId(page, P_FIRSTKEY)); + btitem = _bt_formitem(&(nitem->bti_itup)); + ItemPointerSet(&(btitem->bti_itup.t_tid), bknum, P_HIKEY); + + /* ok, write them out */ + _bt_wrtnorelbuf(rel, nbuf); + _bt_wrtnorelbuf(rel, buf); + + /* fix btpo_prev on right sibling of old page */ + if ( !P_RIGHTMOST (npageop) ) + { + rbuf = _bt_getbuf(rel, npageop->btpo_next, BT_WRITE); + rpage = BufferGetPage(rbuf); + rpageop = (BTPageOpaque) PageGetSpecialPointer(rpage); + rpageop->btpo_prev = nbknum; + _bt_wrtbuf(rel, rbuf); + } + + /* get parent pointing to the old page */ + ItemPointerSet(&(stack->bts_btitem->bti_itup.t_tid), + bknum, P_HIKEY); + pbuf = _bt_getstackbuf(rel, stack, BT_WRITE); + ppage = BufferGetPage(pbuf); + ppageop = (BTPageOpaque) PageGetSpecialPointer(ppage); + + _bt_relbuf(rel, nbuf, BT_WRITE); + _bt_relbuf(rel, buf, BT_WRITE); + + /* re-set parent' pointer - we shifted our page to the right ! */ + nitem = (BTItem) PageGetItem (ppage, + PageGetItemId (ppage, stack->bts_offset)); + ItemPointerSet(&(nitem->bti_itup.t_tid), nbknum, P_HIKEY); + ItemPointerSet(&(stack->bts_btitem->bti_itup.t_tid), nbknum, P_HIKEY); + _bt_wrtnorelbuf(rel, pbuf); + + /* + * Now we want insert into the parent pointer to our old page. It has to + * be inserted before the pointer to new page. You may get problems here + * (in the _bt_goesonpg and/or _bt_pgaddtup), but may be not - I don't + * know. It works if old page is leftmost (nitem is NULL) and + * btitem < hikey and it's all what we need currently. - vadim 05/30/97 + */ + nitem = NULL; + afteroff = P_FIRSTKEY; + if ( !P_RIGHTMOST (ppageop) ) + afteroff = OffsetNumberNext (afteroff); + if ( stack->bts_offset >= afteroff ) + { + afteroff = OffsetNumberPrev (stack->bts_offset); + nitem = (BTItem) PageGetItem (ppage, PageGetItemId (ppage, afteroff)); + nitem = _bt_formitem(&(nitem->bti_itup)); + } + res = _bt_insertonpg(rel, pbuf, stack->bts_parent, + keysz, scankey, btitem, nitem); + pfree (btitem); + + ItemPointerSet(&(res->pointerData), nbknum, P_HIKEY); + + return (res); +} |