diff options
Diffstat (limited to 'src/backend/access/nbtree/nbtxlog.c')
-rw-r--r-- | src/backend/access/nbtree/nbtxlog.c | 175 |
1 files changed, 172 insertions, 3 deletions
diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c index 87a0aaaa7a4..058b13b6a43 100644 --- a/src/backend/access/nbtree/nbtxlog.c +++ b/src/backend/access/nbtree/nbtxlog.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtxlog.c,v 1.1 2003/02/21 00:06:21 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtxlog.c,v 1.2 2003/02/23 06:17:13 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -404,6 +404,171 @@ btree_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record) } static void +btree_xlog_delete_page(bool redo, bool ismeta, + XLogRecPtr lsn, XLogRecord *record) +{ + xl_btree_delete_page *xlrec = (xl_btree_delete_page *) XLogRecGetData(record); + Relation reln; + BlockNumber parent; + BlockNumber target; + BlockNumber leftsib; + BlockNumber rightsib; + Buffer buffer; + Page page; + BTPageOpaque pageop; + char *op = (redo) ? "redo" : "undo"; + + reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node); + if (!RelationIsValid(reln)) + return; + + parent = ItemPointerGetBlockNumber(&(xlrec->target.tid)); + target = xlrec->deadblk; + leftsib = xlrec->leftblk; + rightsib = xlrec->rightblk; + + /* parent page */ + if (redo && !(record->xl_info & XLR_BKP_BLOCK_1)) + { + buffer = XLogReadBuffer(false, reln, parent); + if (!BufferIsValid(buffer)) + elog(PANIC, "btree_delete_page_redo: parent block unfound"); + page = (Page) BufferGetPage(buffer); + pageop = (BTPageOpaque) PageGetSpecialPointer(page); + if (PageIsNew((PageHeader) page)) + elog(PANIC, "btree_delete_page_redo: uninitialized parent page"); + if (XLByteLE(lsn, PageGetLSN(page))) + { + UnlockAndReleaseBuffer(buffer); + } + else + { + OffsetNumber poffset; + + poffset = ItemPointerGetOffsetNumber(&(xlrec->target.tid)); + if (poffset >= PageGetMaxOffsetNumber(page)) + { + Assert(poffset == P_FIRSTDATAKEY(pageop)); + PageIndexTupleDelete(page, poffset); + pageop->btpo_flags |= BTP_HALF_DEAD; + } + else + { + ItemId itemid; + BTItem btitem; + OffsetNumber nextoffset; + + itemid = PageGetItemId(page, poffset); + btitem = (BTItem) PageGetItem(page, itemid); + ItemPointerSet(&(btitem->bti_itup.t_tid), rightsib, P_HIKEY); + nextoffset = OffsetNumberNext(poffset); + PageIndexTupleDelete(page, nextoffset); + } + + PageSetLSN(page, lsn); + PageSetSUI(page, ThisStartUpID); + UnlockAndWriteBuffer(buffer); + } + } + + /* Fix left-link of right sibling */ + if (redo && !(record->xl_info & XLR_BKP_BLOCK_2)) + { + buffer = XLogReadBuffer(false, reln, rightsib); + if (!BufferIsValid(buffer)) + elog(PANIC, "btree_delete_page_redo: lost right sibling"); + page = (Page) BufferGetPage(buffer); + if (PageIsNew((PageHeader) page)) + elog(PANIC, "btree_delete_page_redo: uninitialized right sibling"); + if (XLByteLE(lsn, PageGetLSN(page))) + { + UnlockAndReleaseBuffer(buffer); + } + else + { + pageop = (BTPageOpaque) PageGetSpecialPointer(page); + pageop->btpo_prev = leftsib; + + PageSetLSN(page, lsn); + PageSetSUI(page, ThisStartUpID); + UnlockAndWriteBuffer(buffer); + } + } + + /* Fix right-link of left sibling, if any */ + if (redo && !(record->xl_info & XLR_BKP_BLOCK_3)) + { + if (leftsib != P_NONE) + { + buffer = XLogReadBuffer(false, reln, leftsib); + if (!BufferIsValid(buffer)) + elog(PANIC, "btree_delete_page_redo: lost left sibling"); + page = (Page) BufferGetPage(buffer); + if (PageIsNew((PageHeader) page)) + elog(PANIC, "btree_delete_page_redo: uninitialized left sibling"); + if (XLByteLE(lsn, PageGetLSN(page))) + { + UnlockAndReleaseBuffer(buffer); + } + else + { + pageop = (BTPageOpaque) PageGetSpecialPointer(page); + pageop->btpo_next = rightsib; + + PageSetLSN(page, lsn); + PageSetSUI(page, ThisStartUpID); + UnlockAndWriteBuffer(buffer); + } + } + } + + /* Rewrite target page as empty deleted page */ + buffer = XLogReadBuffer(false, reln, target); + if (!BufferIsValid(buffer)) + elog(PANIC, "btree_delete_page_%s: lost target page", op); + page = (Page) BufferGetPage(buffer); + if (redo) + _bt_pageinit(page, BufferGetPageSize(buffer)); + else if (PageIsNew((PageHeader) page)) + elog(PANIC, "btree_delete_page_undo: uninitialized target page"); + pageop = (BTPageOpaque) PageGetSpecialPointer(page); + + if (redo) + { + pageop->btpo_prev = leftsib; + pageop->btpo_next = rightsib; + pageop->btpo.xact = FrozenTransactionId; + pageop->btpo_flags = BTP_DELETED; + + PageSetLSN(page, lsn); + PageSetSUI(page, ThisStartUpID); + UnlockAndWriteBuffer(buffer); + } + else + { + /* undo */ + if (XLByteLT(PageGetLSN(page), lsn)) + elog(PANIC, "btree_delete_page_undo: bad left sibling LSN"); + elog(PANIC, "btree_delete_page_undo: unimplemented"); + } + + /* Update metapage if needed */ + if (redo) /* metapage changes not undoable */ + { + if (ismeta) + { + xl_btree_metadata md; + + memcpy(&md, (char *) xlrec + SizeOfBtreeDeletePage, + sizeof(xl_btree_metadata)); + _bt_restore_meta(reln, lsn, + md.root, md.level, + md.fastroot, md.fastlevel); + } + } +} + +static void btree_xlog_newroot(bool redo, XLogRecPtr lsn, XLogRecord *record) { xl_btree_newroot *xlrec = (xl_btree_newroot *) XLogRecGetData(record); @@ -534,8 +699,10 @@ btree_redo(XLogRecPtr lsn, XLogRecord *record) btree_xlog_delete(true, lsn, record); break; case XLOG_BTREE_DELETE_PAGE: + btree_xlog_delete_page(true, false, lsn, record); + break; case XLOG_BTREE_DELETE_PAGE_META: - // ??? + btree_xlog_delete_page(true, true, lsn, record); break; case XLOG_BTREE_NEWROOT: btree_xlog_newroot(true, lsn, record); @@ -583,8 +750,10 @@ btree_undo(XLogRecPtr lsn, XLogRecord *record) btree_xlog_delete(false, lsn, record); break; case XLOG_BTREE_DELETE_PAGE: + btree_xlog_delete_page(false, false, lsn, record); + break; case XLOG_BTREE_DELETE_PAGE_META: - // ??? + btree_xlog_delete_page(false, true, lsn, record); break; case XLOG_BTREE_NEWROOT: btree_xlog_newroot(false, lsn, record); |