diff options
author | Heikki Linnakangas <heikki.linnakangas@iki.fi> | 2014-08-13 15:39:08 +0300 |
---|---|---|
committer | Heikki Linnakangas <heikki.linnakangas@iki.fi> | 2014-09-02 15:10:28 +0300 |
commit | f8f4227976a2cdb8ac7c611e49da03aa9e65e0d2 (patch) | |
tree | 92a7aa95ce72fbac48325761761821f8d7a742ed /src/backend/access/spgist/spgxlog.c | |
parent | 26f8b99b248aae989e63ca0969a746f30b0c8c21 (diff) | |
download | postgresql-f8f4227976a2cdb8ac7c611e49da03aa9e65e0d2.tar.gz postgresql-f8f4227976a2cdb8ac7c611e49da03aa9e65e0d2.zip |
Refactor per-page logic common to all redo routines to a new function.
Every redo routine uses the same idiom to determine what to do to a page:
check if there's a backup block for it, and if not read, the buffer if the
block exists, and check its LSN. Refactor that into a common function,
XLogReadBufferForRedo, making all the redo routines shorter and more
readable.
This has no user-visible effect, and makes no changes to the WAL format.
Reviewed by Andres Freund, Alvaro Herrera, Michael Paquier.
Diffstat (limited to 'src/backend/access/spgist/spgxlog.c')
-rw-r--r-- | src/backend/access/spgist/spgxlog.c | 883 |
1 files changed, 401 insertions, 482 deletions
diff --git a/src/backend/access/spgist/spgxlog.c b/src/backend/access/spgist/spgxlog.c index e4f7fbb53a5..911a14828e3 100644 --- a/src/backend/access/spgist/spgxlog.c +++ b/src/backend/access/spgist/spgxlog.c @@ -113,6 +113,7 @@ spgRedoAddLeaf(XLogRecPtr lsn, XLogRecord *record) SpGistLeafTupleData leafTupleHdr; Buffer buffer; Page page; + XLogRedoAction action; ptr += sizeof(spgxlogAddLeaf); leafTuple = ptr; @@ -124,82 +125,78 @@ spgRedoAddLeaf(XLogRecPtr lsn, XLogRecord *record) * simultaneously; but in WAL replay it should be safe to update the leaf * page before updating the parent. */ - if (record->xl_info & XLR_BKP_BLOCK(0)) - (void) RestoreBackupBlock(lsn, record, 0, false, false); + if (xldata->newPage) + { + buffer = XLogReadBuffer(xldata->node, xldata->blknoLeaf, true); + SpGistInitBuffer(buffer, + SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0)); + action = BLK_NEEDS_REDO; + } else + action = XLogReadBufferForRedo(lsn, record, 0, + xldata->node, xldata->blknoLeaf, + &buffer); + + if (action == BLK_NEEDS_REDO) { - buffer = XLogReadBuffer(xldata->node, xldata->blknoLeaf, - xldata->newPage); - if (BufferIsValid(buffer)) - { - page = BufferGetPage(buffer); + page = BufferGetPage(buffer); - if (xldata->newPage) - SpGistInitBuffer(buffer, - SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0)); + /* insert new tuple */ + if (xldata->offnumLeaf != xldata->offnumHeadLeaf) + { + /* normal cases, tuple was added by SpGistPageAddNewItem */ + addOrReplaceTuple(page, (Item) leafTuple, leafTupleHdr.size, + xldata->offnumLeaf); - if (lsn > PageGetLSN(page)) + /* update head tuple's chain link if needed */ + if (xldata->offnumHeadLeaf != InvalidOffsetNumber) { - /* insert new tuple */ - if (xldata->offnumLeaf != xldata->offnumHeadLeaf) - { - /* normal cases, tuple was added by SpGistPageAddNewItem */ - addOrReplaceTuple(page, (Item) leafTuple, leafTupleHdr.size, - xldata->offnumLeaf); - - /* update head tuple's chain link if needed */ - if (xldata->offnumHeadLeaf != InvalidOffsetNumber) - { - SpGistLeafTuple head; - - head = (SpGistLeafTuple) PageGetItem(page, - PageGetItemId(page, xldata->offnumHeadLeaf)); - Assert(head->nextOffset == leafTupleHdr.nextOffset); - head->nextOffset = xldata->offnumLeaf; - } - } - else - { - /* replacing a DEAD tuple */ - PageIndexTupleDelete(page, xldata->offnumLeaf); - if (PageAddItem(page, - (Item) leafTuple, leafTupleHdr.size, - xldata->offnumLeaf, false, false) != xldata->offnumLeaf) - elog(ERROR, "failed to add item of size %u to SPGiST index page", - leafTupleHdr.size); - } + SpGistLeafTuple head; - PageSetLSN(page, lsn); - MarkBufferDirty(buffer); + head = (SpGistLeafTuple) PageGetItem(page, + PageGetItemId(page, xldata->offnumHeadLeaf)); + Assert(head->nextOffset == leafTupleHdr.nextOffset); + head->nextOffset = xldata->offnumLeaf; } - UnlockReleaseBuffer(buffer); } + else + { + /* replacing a DEAD tuple */ + PageIndexTupleDelete(page, xldata->offnumLeaf); + if (PageAddItem(page, (Item) leafTuple, leafTupleHdr.size, + xldata->offnumLeaf, false, false) != xldata->offnumLeaf) + elog(ERROR, "failed to add item of size %u to SPGiST index page", + leafTupleHdr.size); + } + + PageSetLSN(page, lsn); + MarkBufferDirty(buffer); } + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); /* update parent downlink if necessary */ - if (record->xl_info & XLR_BKP_BLOCK(1)) - (void) RestoreBackupBlock(lsn, record, 1, false, false); - else if (xldata->blknoParent != InvalidBlockNumber) + if (xldata->blknoParent != InvalidBlockNumber) { - buffer = XLogReadBuffer(xldata->node, xldata->blknoParent, false); - if (BufferIsValid(buffer)) + if (XLogReadBufferForRedo(lsn, record, 1, + xldata->node, xldata->blknoParent, + &buffer) == BLK_NEEDS_REDO) { + SpGistInnerTuple tuple; + page = BufferGetPage(buffer); - if (lsn > PageGetLSN(page)) - { - SpGistInnerTuple tuple; - tuple = (SpGistInnerTuple) PageGetItem(page, + tuple = (SpGistInnerTuple) PageGetItem(page, PageGetItemId(page, xldata->offnumParent)); - spgUpdateNodeLink(tuple, xldata->nodeI, - xldata->blknoLeaf, xldata->offnumLeaf); + spgUpdateNodeLink(tuple, xldata->nodeI, + xldata->blknoLeaf, xldata->offnumLeaf); - PageSetLSN(page, lsn); - MarkBufferDirty(buffer); - } - UnlockReleaseBuffer(buffer); + PageSetLSN(page, lsn); + MarkBufferDirty(buffer); } + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); } } @@ -214,6 +211,7 @@ spgRedoMoveLeafs(XLogRecPtr lsn, XLogRecord *record) int nInsert; Buffer buffer; Page page; + XLogRedoAction action; fillFakeState(&state, xldata->stateSrc); @@ -234,98 +232,82 @@ spgRedoMoveLeafs(XLogRecPtr lsn, XLogRecord *record) */ /* Insert tuples on the dest page (do first, so redirect is valid) */ - if (record->xl_info & XLR_BKP_BLOCK(1)) - (void) RestoreBackupBlock(lsn, record, 1, false, false); - else + if (xldata->newPage) { - buffer = XLogReadBuffer(xldata->node, xldata->blknoDst, - xldata->newPage); - if (BufferIsValid(buffer)) - { - page = BufferGetPage(buffer); - - if (xldata->newPage) - SpGistInitBuffer(buffer, + buffer = XLogReadBuffer(xldata->node, xldata->blknoDst, true); + SpGistInitBuffer(buffer, SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0)); + action = BLK_NEEDS_REDO; + } + else + action = XLogReadBufferForRedo(lsn, record, 1, + xldata->node, xldata->blknoDst, + &buffer); + if (action == BLK_NEEDS_REDO) + { + int i; - if (lsn > PageGetLSN(page)) - { - int i; - - for (i = 0; i < nInsert; i++) - { - char *leafTuple; - SpGistLeafTupleData leafTupleHdr; - - /* - * the tuples are not aligned, so must copy to access - * the size field. - */ - leafTuple = ptr; - memcpy(&leafTupleHdr, leafTuple, - sizeof(SpGistLeafTupleData)); - - addOrReplaceTuple(page, (Item) leafTuple, - leafTupleHdr.size, toInsert[i]); - ptr += leafTupleHdr.size; - } + page = BufferGetPage(buffer); - PageSetLSN(page, lsn); - MarkBufferDirty(buffer); - } - UnlockReleaseBuffer(buffer); + for (i = 0; i < nInsert; i++) + { + char *leafTuple; + SpGistLeafTupleData leafTupleHdr; + + /* + * the tuples are not aligned, so must copy to access the size + * field. + */ + leafTuple = ptr; + memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData)); + + addOrReplaceTuple(page, (Item) leafTuple, + leafTupleHdr.size, toInsert[i]); + ptr += leafTupleHdr.size; } + + PageSetLSN(page, lsn); + MarkBufferDirty(buffer); } + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); /* Delete tuples from the source page, inserting a redirection pointer */ - if (record->xl_info & XLR_BKP_BLOCK(0)) - (void) RestoreBackupBlock(lsn, record, 0, false, false); - else + if (XLogReadBufferForRedo(lsn, record, 0, xldata->node, xldata->blknoSrc, + &buffer) == BLK_NEEDS_REDO) { - buffer = XLogReadBuffer(xldata->node, xldata->blknoSrc, false); - if (BufferIsValid(buffer)) - { - page = BufferGetPage(buffer); - if (lsn > PageGetLSN(page)) - { - spgPageIndexMultiDelete(&state, page, toDelete, xldata->nMoves, + page = BufferGetPage(buffer); + spgPageIndexMultiDelete(&state, page, toDelete, xldata->nMoves, state.isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT, - SPGIST_PLACEHOLDER, - xldata->blknoDst, - toInsert[nInsert - 1]); + SPGIST_PLACEHOLDER, + xldata->blknoDst, + toInsert[nInsert - 1]); - PageSetLSN(page, lsn); - MarkBufferDirty(buffer); - } - UnlockReleaseBuffer(buffer); - } + PageSetLSN(page, lsn); + MarkBufferDirty(buffer); } + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); /* And update the parent downlink */ - if (record->xl_info & XLR_BKP_BLOCK(2)) - (void) RestoreBackupBlock(lsn, record, 2, false, false); - else + if (XLogReadBufferForRedo(lsn, record, 2, xldata->node, xldata->blknoParent, + &buffer) == BLK_NEEDS_REDO) { - buffer = XLogReadBuffer(xldata->node, xldata->blknoParent, false); - if (BufferIsValid(buffer)) - { - page = BufferGetPage(buffer); - if (lsn > PageGetLSN(page)) - { - SpGistInnerTuple tuple; + SpGistInnerTuple tuple; + + page = BufferGetPage(buffer); - tuple = (SpGistInnerTuple) PageGetItem(page, + tuple = (SpGistInnerTuple) PageGetItem(page, PageGetItemId(page, xldata->offnumParent)); - spgUpdateNodeLink(tuple, xldata->nodeI, - xldata->blknoDst, toInsert[nInsert - 1]); + spgUpdateNodeLink(tuple, xldata->nodeI, + xldata->blknoDst, toInsert[nInsert - 1]); - PageSetLSN(page, lsn); - MarkBufferDirty(buffer); - } - UnlockReleaseBuffer(buffer); - } + PageSetLSN(page, lsn); + MarkBufferDirty(buffer); } + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); } static void @@ -339,6 +321,7 @@ spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record) Buffer buffer; Page page; int bbi; + XLogRedoAction action; ptr += sizeof(spgxlogAddNode); innerTuple = ptr; @@ -351,29 +334,21 @@ spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record) { /* update in place */ Assert(xldata->blknoParent == InvalidBlockNumber); - if (record->xl_info & XLR_BKP_BLOCK(0)) - (void) RestoreBackupBlock(lsn, record, 0, false, false); - else + if (XLogReadBufferForRedo(lsn, record, 0, xldata->node, xldata->blkno, + &buffer) == BLK_NEEDS_REDO) { - buffer = XLogReadBuffer(xldata->node, xldata->blkno, false); - if (BufferIsValid(buffer)) - { - page = BufferGetPage(buffer); - if (lsn > PageGetLSN(page)) - { - PageIndexTupleDelete(page, xldata->offnum); - if (PageAddItem(page, (Item) innerTuple, innerTupleHdr.size, - xldata->offnum, - false, false) != xldata->offnum) - elog(ERROR, "failed to add item of size %u to SPGiST index page", - innerTupleHdr.size); - - PageSetLSN(page, lsn); - MarkBufferDirty(buffer); - } - UnlockReleaseBuffer(buffer); - } + page = BufferGetPage(buffer); + PageIndexTupleDelete(page, xldata->offnum); + if (PageAddItem(page, (Item) innerTuple, innerTupleHdr.size, + xldata->offnum, false, false) != xldata->offnum) + elog(ERROR, "failed to add item of size %u to SPGiST index page", + innerTupleHdr.size); + + PageSetLSN(page, lsn); + MarkBufferDirty(buffer); } + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); } else { @@ -390,90 +365,79 @@ spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record) Assert(xldata->blkno != xldata->blknoNew); /* Install new tuple first so redirect is valid */ - if (record->xl_info & XLR_BKP_BLOCK(1)) - (void) RestoreBackupBlock(lsn, record, 1, false, false); + if (xldata->newPage) + { + buffer = XLogReadBuffer(xldata->node, xldata->blknoNew, true); + /* AddNode is not used for nulls pages */ + SpGistInitBuffer(buffer, 0); + action = BLK_NEEDS_REDO; + } else + action = XLogReadBufferForRedo(lsn, record, 1, + xldata->node, xldata->blknoNew, + &buffer); + if (action == BLK_NEEDS_REDO) { - buffer = XLogReadBuffer(xldata->node, xldata->blknoNew, - xldata->newPage); - if (BufferIsValid(buffer)) + page = BufferGetPage(buffer); + + addOrReplaceTuple(page, (Item) innerTuple, + innerTupleHdr.size, xldata->offnumNew); + + /* + * If parent is in this same page, don't advance LSN; doing so + * would fool us into not applying the parent downlink update + * below. We'll update the LSN when we fix the parent downlink. + */ + if (xldata->blknoParent != xldata->blknoNew) { - page = BufferGetPage(buffer); - - /* AddNode is not used for nulls pages */ - if (xldata->newPage) - SpGistInitBuffer(buffer, 0); - - if (lsn > PageGetLSN(page)) - { - addOrReplaceTuple(page, (Item) innerTuple, - innerTupleHdr.size, xldata->offnumNew); - - /* - * If parent is in this same page, don't advance LSN; - * doing so would fool us into not applying the parent - * downlink update below. We'll update the LSN when we - * fix the parent downlink. - */ - if (xldata->blknoParent != xldata->blknoNew) - { - PageSetLSN(page, lsn); - } - MarkBufferDirty(buffer); - } - UnlockReleaseBuffer(buffer); + PageSetLSN(page, lsn); } + MarkBufferDirty(buffer); } + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); /* Delete old tuple, replacing it with redirect or placeholder tuple */ - if (record->xl_info & XLR_BKP_BLOCK(0)) - (void) RestoreBackupBlock(lsn, record, 0, false, false); - else + if (XLogReadBufferForRedo(lsn, record, 0, xldata->node, xldata->blkno, + &buffer) == BLK_NEEDS_REDO) { - buffer = XLogReadBuffer(xldata->node, xldata->blkno, false); - if (BufferIsValid(buffer)) + SpGistDeadTuple dt; + + page = BufferGetPage(buffer); + + if (state.isBuild) + dt = spgFormDeadTuple(&state, SPGIST_PLACEHOLDER, + InvalidBlockNumber, + InvalidOffsetNumber); + else + dt = spgFormDeadTuple(&state, SPGIST_REDIRECT, + xldata->blknoNew, + xldata->offnumNew); + + PageIndexTupleDelete(page, xldata->offnum); + if (PageAddItem(page, (Item) dt, dt->size, xldata->offnum, + false, false) != xldata->offnum) + elog(ERROR, "failed to add item of size %u to SPGiST index page", + dt->size); + + if (state.isBuild) + SpGistPageGetOpaque(page)->nPlaceholder++; + else + SpGistPageGetOpaque(page)->nRedirection++; + + /* + * If parent is in this same page, don't advance LSN; doing so + * would fool us into not applying the parent downlink update + * below. We'll update the LSN when we fix the parent downlink. + */ + if (xldata->blknoParent != xldata->blkno) { - page = BufferGetPage(buffer); - if (lsn > PageGetLSN(page)) - { - SpGistDeadTuple dt; - - if (state.isBuild) - dt = spgFormDeadTuple(&state, SPGIST_PLACEHOLDER, - InvalidBlockNumber, - InvalidOffsetNumber); - else - dt = spgFormDeadTuple(&state, SPGIST_REDIRECT, - xldata->blknoNew, - xldata->offnumNew); - - PageIndexTupleDelete(page, xldata->offnum); - if (PageAddItem(page, (Item) dt, dt->size, - xldata->offnum, - false, false) != xldata->offnum) - elog(ERROR, "failed to add item of size %u to SPGiST index page", - dt->size); - - if (state.isBuild) - SpGistPageGetOpaque(page)->nPlaceholder++; - else - SpGistPageGetOpaque(page)->nRedirection++; - - /* - * If parent is in this same page, don't advance LSN; - * doing so would fool us into not applying the parent - * downlink update below. We'll update the LSN when we - * fix the parent downlink. - */ - if (xldata->blknoParent != xldata->blkno) - { - PageSetLSN(page, lsn); - } - MarkBufferDirty(buffer); - } - UnlockReleaseBuffer(buffer); + PageSetLSN(page, lsn); } + MarkBufferDirty(buffer); } + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); /* * Update parent downlink. Since parent could be in either of the @@ -491,29 +455,32 @@ spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record) { if (bbi == 2) /* else we already did it */ (void) RestoreBackupBlock(lsn, record, bbi, false, false); + action = BLK_RESTORED; + buffer = InvalidBuffer; } else { - buffer = XLogReadBuffer(xldata->node, xldata->blknoParent, false); - if (BufferIsValid(buffer)) - { - page = BufferGetPage(buffer); - if (lsn > PageGetLSN(page)) - { - SpGistInnerTuple innerTuple; + action = XLogReadBufferForRedo(lsn, record, bbi, xldata->node, + xldata->blknoParent, &buffer); + Assert(action != BLK_RESTORED); + } + if (action == BLK_NEEDS_REDO) + { + SpGistInnerTuple innerTuple; + + page = BufferGetPage(buffer); - innerTuple = (SpGistInnerTuple) PageGetItem(page, + innerTuple = (SpGistInnerTuple) PageGetItem(page, PageGetItemId(page, xldata->offnumParent)); - spgUpdateNodeLink(innerTuple, xldata->nodeI, - xldata->blknoNew, xldata->offnumNew); + spgUpdateNodeLink(innerTuple, xldata->nodeI, + xldata->blknoNew, xldata->offnumNew); - PageSetLSN(page, lsn); - MarkBufferDirty(buffer); - } - UnlockReleaseBuffer(buffer); - } + PageSetLSN(page, lsn); + MarkBufferDirty(buffer); } + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); } } @@ -545,60 +512,56 @@ spgRedoSplitTuple(XLogRecPtr lsn, XLogRecord *record) */ /* insert postfix tuple first to avoid dangling link */ - if (record->xl_info & XLR_BKP_BLOCK(1)) - (void) RestoreBackupBlock(lsn, record, 1, false, false); - else if (xldata->blknoPostfix != xldata->blknoPrefix) + if (xldata->blknoPostfix != xldata->blknoPrefix) { - buffer = XLogReadBuffer(xldata->node, xldata->blknoPostfix, - xldata->newPage); - if (BufferIsValid(buffer)) - { - page = BufferGetPage(buffer); + XLogRedoAction action; + if (xldata->newPage) + { + buffer = XLogReadBuffer(xldata->node, xldata->blknoPostfix, true); /* SplitTuple is not used for nulls pages */ - if (xldata->newPage) - SpGistInitBuffer(buffer, 0); + SpGistInitBuffer(buffer, 0); + action = BLK_NEEDS_REDO; + } + else + action = XLogReadBufferForRedo(lsn, record, 1, + xldata->node, xldata->blknoPostfix, + &buffer); - if (lsn > PageGetLSN(page)) - { - addOrReplaceTuple(page, (Item) postfixTuple, - postfixTupleHdr.size, xldata->offnumPostfix); + if (action == BLK_NEEDS_REDO) + { + page = BufferGetPage(buffer); - PageSetLSN(page, lsn); - MarkBufferDirty(buffer); - } - UnlockReleaseBuffer(buffer); + addOrReplaceTuple(page, (Item) postfixTuple, + postfixTupleHdr.size, xldata->offnumPostfix); + + PageSetLSN(page, lsn); + MarkBufferDirty(buffer); } + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); } /* now handle the original page */ - if (record->xl_info & XLR_BKP_BLOCK(0)) - (void) RestoreBackupBlock(lsn, record, 0, false, false); - else + if (XLogReadBufferForRedo(lsn, record, 0, xldata->node, xldata->blknoPrefix, + &buffer) == BLK_NEEDS_REDO) { - buffer = XLogReadBuffer(xldata->node, xldata->blknoPrefix, false); - if (BufferIsValid(buffer)) - { - page = BufferGetPage(buffer); - if (lsn > PageGetLSN(page)) - { - PageIndexTupleDelete(page, xldata->offnumPrefix); - if (PageAddItem(page, (Item) prefixTuple, prefixTupleHdr.size, + page = BufferGetPage(buffer); + PageIndexTupleDelete(page, xldata->offnumPrefix); + if (PageAddItem(page, (Item) prefixTuple, prefixTupleHdr.size, xldata->offnumPrefix, false, false) != xldata->offnumPrefix) - elog(ERROR, "failed to add item of size %u to SPGiST index page", - prefixTupleHdr.size); + elog(ERROR, "failed to add item of size %u to SPGiST index page", + prefixTupleHdr.size); - if (xldata->blknoPostfix == xldata->blknoPrefix) - addOrReplaceTuple(page, (Item) postfixTuple, - postfixTupleHdr.size, - xldata->offnumPostfix); + if (xldata->blknoPostfix == xldata->blknoPrefix) + addOrReplaceTuple(page, (Item) postfixTuple, postfixTupleHdr.size, + xldata->offnumPostfix); - PageSetLSN(page, lsn); - MarkBufferDirty(buffer); - } - UnlockReleaseBuffer(buffer); - } + PageSetLSN(page, lsn); + MarkBufferDirty(buffer); } + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); } static void @@ -616,9 +579,11 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) Buffer destBuffer; Page srcPage; Page destPage; + Buffer innerBuffer; Page page; int bbi; int i; + XLogRedoAction action; fillFakeState(&state, xldata->stateSrc); @@ -668,46 +633,37 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) * inserting leaf tuples and the new inner tuple, else the added * redirect tuple will be a dangling link.) */ - if (record->xl_info & XLR_BKP_BLOCK(bbi)) + if (XLogReadBufferForRedo(lsn, record, bbi, + xldata->node, xldata->blknoSrc, + &srcBuffer) == BLK_NEEDS_REDO) { - srcBuffer = RestoreBackupBlock(lsn, record, bbi, false, true); - srcPage = NULL; /* don't need to do any page updates */ + srcPage = BufferGetPage(srcBuffer); + + /* + * We have it a bit easier here than in doPickSplit(), because we + * know the inner tuple's location already, so we can inject the + * correct redirection tuple now. + */ + if (!state.isBuild) + spgPageIndexMultiDelete(&state, srcPage, + toDelete, xldata->nDelete, + SPGIST_REDIRECT, + SPGIST_PLACEHOLDER, + xldata->blknoInner, + xldata->offnumInner); + else + spgPageIndexMultiDelete(&state, srcPage, + toDelete, xldata->nDelete, + SPGIST_PLACEHOLDER, + SPGIST_PLACEHOLDER, + InvalidBlockNumber, + InvalidOffsetNumber); + + /* don't update LSN etc till we're done with it */ } else { - srcBuffer = XLogReadBuffer(xldata->node, xldata->blknoSrc, false); - if (BufferIsValid(srcBuffer)) - { - srcPage = BufferGetPage(srcBuffer); - if (lsn > PageGetLSN(srcPage)) - { - /* - * We have it a bit easier here than in doPickSplit(), - * because we know the inner tuple's location already, so - * we can inject the correct redirection tuple now. - */ - if (!state.isBuild) - spgPageIndexMultiDelete(&state, srcPage, - toDelete, xldata->nDelete, - SPGIST_REDIRECT, - SPGIST_PLACEHOLDER, - xldata->blknoInner, - xldata->offnumInner); - else - spgPageIndexMultiDelete(&state, srcPage, - toDelete, xldata->nDelete, - SPGIST_PLACEHOLDER, - SPGIST_PLACEHOLDER, - InvalidBlockNumber, - InvalidOffsetNumber); - - /* don't update LSN etc till we're done with it */ - } - else - srcPage = NULL; /* don't do any page updates */ - } - else - srcPage = NULL; + srcPage = NULL; /* don't do any page updates */ } bbi++; } @@ -735,22 +691,15 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) * We could probably release the page lock immediately in the * full-page-image case, but for safety let's hold it till later. */ - if (record->xl_info & XLR_BKP_BLOCK(bbi)) + if (XLogReadBufferForRedo(lsn, record, bbi, + xldata->node, xldata->blknoDest, + &destBuffer) == BLK_NEEDS_REDO) { - destBuffer = RestoreBackupBlock(lsn, record, bbi, false, true); - destPage = NULL; /* don't need to do any page updates */ + destPage = (Page) BufferGetPage(destBuffer); } else { - destBuffer = XLogReadBuffer(xldata->node, xldata->blknoDest, false); - if (BufferIsValid(destBuffer)) - { - destPage = (Page) BufferGetPage(destBuffer); - if (lsn <= PageGetLSN(destPage)) - destPage = NULL; /* don't do any page updates */ - } - else - destPage = NULL; + destPage = NULL; /* don't do any page updates */ } bbi++; } @@ -787,43 +736,40 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) } /* restore new inner tuple */ - if (record->xl_info & XLR_BKP_BLOCK(bbi)) - (void) RestoreBackupBlock(lsn, record, bbi, false, false); - else + if (xldata->initInner) { - Buffer buffer = XLogReadBuffer(xldata->node, xldata->blknoInner, - xldata->initInner); - - if (BufferIsValid(buffer)) - { - page = BufferGetPage(buffer); + innerBuffer = XLogReadBuffer(xldata->node, xldata->blknoInner, true); + SpGistInitBuffer(innerBuffer, + (xldata->storesNulls ? SPGIST_NULLS : 0)); + action = BLK_NEEDS_REDO; + } + else + action = XLogReadBufferForRedo(lsn, record, bbi, xldata->node, + xldata->blknoInner, &innerBuffer); - if (xldata->initInner) - SpGistInitBuffer(buffer, - (xldata->storesNulls ? SPGIST_NULLS : 0)); + if (action == BLK_NEEDS_REDO) + { + page = BufferGetPage(innerBuffer); - if (lsn > PageGetLSN(page)) - { - addOrReplaceTuple(page, (Item) innerTuple, innerTupleHdr.size, - xldata->offnumInner); + addOrReplaceTuple(page, (Item) innerTuple, innerTupleHdr.size, + xldata->offnumInner); - /* if inner is also parent, update link while we're here */ - if (xldata->blknoInner == xldata->blknoParent) - { - SpGistInnerTuple parent; + /* if inner is also parent, update link while we're here */ + if (xldata->blknoInner == xldata->blknoParent) + { + SpGistInnerTuple parent; - parent = (SpGistInnerTuple) PageGetItem(page, + parent = (SpGistInnerTuple) PageGetItem(page, PageGetItemId(page, xldata->offnumParent)); - spgUpdateNodeLink(parent, xldata->nodeI, - xldata->blknoInner, xldata->offnumInner); - } - - PageSetLSN(page, lsn); - MarkBufferDirty(buffer); - } - UnlockReleaseBuffer(buffer); + spgUpdateNodeLink(parent, xldata->nodeI, + xldata->blknoInner, xldata->offnumInner); } + + PageSetLSN(page, lsn); + MarkBufferDirty(innerBuffer); } + if (BufferIsValid(innerBuffer)) + UnlockReleaseBuffer(innerBuffer); bbi++; /* @@ -843,31 +789,26 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) } else if (xldata->blknoInner != xldata->blknoParent) { - if (record->xl_info & XLR_BKP_BLOCK(bbi)) - (void) RestoreBackupBlock(lsn, record, bbi, false, false); - else - { - Buffer buffer = XLogReadBuffer(xldata->node, xldata->blknoParent, false); + Buffer parentBuffer; - if (BufferIsValid(buffer)) - { - page = BufferGetPage(buffer); + if (XLogReadBufferForRedo(lsn, record, bbi, + xldata->node, xldata->blknoParent, + &parentBuffer) == BLK_NEEDS_REDO) + { + SpGistInnerTuple parent; - if (lsn > PageGetLSN(page)) - { - SpGistInnerTuple parent; + page = BufferGetPage(parentBuffer); - parent = (SpGistInnerTuple) PageGetItem(page, + parent = (SpGistInnerTuple) PageGetItem(page, PageGetItemId(page, xldata->offnumParent)); - spgUpdateNodeLink(parent, xldata->nodeI, - xldata->blknoInner, xldata->offnumInner); + spgUpdateNodeLink(parent, xldata->nodeI, + xldata->blknoInner, xldata->offnumInner); - PageSetLSN(page, lsn); - MarkBufferDirty(buffer); - } - UnlockReleaseBuffer(buffer); - } + PageSetLSN(page, lsn); + MarkBufferDirty(parentBuffer); } + if (BufferIsValid(parentBuffer)) + UnlockReleaseBuffer(parentBuffer); } } @@ -902,62 +843,56 @@ spgRedoVacuumLeaf(XLogRecPtr lsn, XLogRecord *record) ptr += sizeof(OffsetNumber) * xldata->nChain; chainDest = (OffsetNumber *) ptr; - if (record->xl_info & XLR_BKP_BLOCK(0)) - (void) RestoreBackupBlock(lsn, record, 0, false, false); - else + if (XLogReadBufferForRedo(lsn, record, 0, xldata->node, xldata->blkno, + &buffer) == BLK_NEEDS_REDO) { - buffer = XLogReadBuffer(xldata->node, xldata->blkno, false); - if (BufferIsValid(buffer)) + page = BufferGetPage(buffer); + + spgPageIndexMultiDelete(&state, page, + toDead, xldata->nDead, + SPGIST_DEAD, SPGIST_DEAD, + InvalidBlockNumber, + InvalidOffsetNumber); + + spgPageIndexMultiDelete(&state, page, + toPlaceholder, xldata->nPlaceholder, + SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER, + InvalidBlockNumber, + InvalidOffsetNumber); + + /* see comments in vacuumLeafPage() */ + for (i = 0; i < xldata->nMove; i++) { - page = BufferGetPage(buffer); - if (lsn > PageGetLSN(page)) - { - spgPageIndexMultiDelete(&state, page, - toDead, xldata->nDead, - SPGIST_DEAD, SPGIST_DEAD, - InvalidBlockNumber, - InvalidOffsetNumber); + ItemId idSrc = PageGetItemId(page, moveSrc[i]); + ItemId idDest = PageGetItemId(page, moveDest[i]); + ItemIdData tmp; - spgPageIndexMultiDelete(&state, page, - toPlaceholder, xldata->nPlaceholder, - SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER, - InvalidBlockNumber, - InvalidOffsetNumber); + tmp = *idSrc; + *idSrc = *idDest; + *idDest = tmp; + } - /* see comments in vacuumLeafPage() */ - for (i = 0; i < xldata->nMove; i++) - { - ItemId idSrc = PageGetItemId(page, moveSrc[i]); - ItemId idDest = PageGetItemId(page, moveDest[i]); - ItemIdData tmp; - - tmp = *idSrc; - *idSrc = *idDest; - *idDest = tmp; - } - - spgPageIndexMultiDelete(&state, page, - moveSrc, xldata->nMove, - SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER, - InvalidBlockNumber, - InvalidOffsetNumber); + spgPageIndexMultiDelete(&state, page, + moveSrc, xldata->nMove, + SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER, + InvalidBlockNumber, + InvalidOffsetNumber); - for (i = 0; i < xldata->nChain; i++) - { - SpGistLeafTuple lt; + for (i = 0; i < xldata->nChain; i++) + { + SpGistLeafTuple lt; - lt = (SpGistLeafTuple) PageGetItem(page, + lt = (SpGistLeafTuple) PageGetItem(page, PageGetItemId(page, chainSrc[i])); - Assert(lt->tupstate == SPGIST_LIVE); - lt->nextOffset = chainDest[i]; - } - - PageSetLSN(page, lsn); - MarkBufferDirty(buffer); - } - UnlockReleaseBuffer(buffer); + Assert(lt->tupstate == SPGIST_LIVE); + lt->nextOffset = chainDest[i]; } + + PageSetLSN(page, lsn); + MarkBufferDirty(buffer); } + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); } static void @@ -971,25 +906,19 @@ spgRedoVacuumRoot(XLogRecPtr lsn, XLogRecord *record) toDelete = xldata->offsets; - if (record->xl_info & XLR_BKP_BLOCK(0)) - (void) RestoreBackupBlock(lsn, record, 0, false, false); - else + if (XLogReadBufferForRedo(lsn, record, 0, xldata->node, xldata->blkno, + &buffer) == BLK_NEEDS_REDO) { - buffer = XLogReadBuffer(xldata->node, xldata->blkno, false); - if (BufferIsValid(buffer)) - { - page = BufferGetPage(buffer); - if (lsn > PageGetLSN(page)) - { - /* The tuple numbers are in order */ - PageIndexMultiDelete(page, toDelete, xldata->nDelete); + page = BufferGetPage(buffer); - PageSetLSN(page, lsn); - MarkBufferDirty(buffer); - } - UnlockReleaseBuffer(buffer); - } + /* The tuple numbers are in order */ + PageIndexMultiDelete(page, toDelete, xldata->nDelete); + + PageSetLSN(page, lsn); + MarkBufferDirty(buffer); } + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); } static void @@ -999,7 +928,6 @@ spgRedoVacuumRedirect(XLogRecPtr lsn, XLogRecord *record) spgxlogVacuumRedirect *xldata = (spgxlogVacuumRedirect *) ptr; OffsetNumber *itemToPlaceholder; Buffer buffer; - Page page; itemToPlaceholder = xldata->offsets; @@ -1014,64 +942,55 @@ spgRedoVacuumRedirect(XLogRecPtr lsn, XLogRecord *record) xldata->node); } - if (record->xl_info & XLR_BKP_BLOCK(0)) - (void) RestoreBackupBlock(lsn, record, 0, false, false); - else + if (XLogReadBufferForRedo(lsn, record, 0, xldata->node, xldata->blkno, + &buffer) == BLK_NEEDS_REDO) { - buffer = XLogReadBuffer(xldata->node, xldata->blkno, false); + Page page = BufferGetPage(buffer); + SpGistPageOpaque opaque = SpGistPageGetOpaque(page); + int i; - if (BufferIsValid(buffer)) + /* Convert redirect pointers to plain placeholders */ + for (i = 0; i < xldata->nToPlaceholder; i++) { - page = BufferGetPage(buffer); - if (lsn > PageGetLSN(page)) - { - SpGistPageOpaque opaque = SpGistPageGetOpaque(page); - int i; + SpGistDeadTuple dt; - /* Convert redirect pointers to plain placeholders */ - for (i = 0; i < xldata->nToPlaceholder; i++) - { - SpGistDeadTuple dt; - - dt = (SpGistDeadTuple) PageGetItem(page, + dt = (SpGistDeadTuple) PageGetItem(page, PageGetItemId(page, itemToPlaceholder[i])); - Assert(dt->tupstate == SPGIST_REDIRECT); - dt->tupstate = SPGIST_PLACEHOLDER; - ItemPointerSetInvalid(&dt->pointer); - } - - Assert(opaque->nRedirection >= xldata->nToPlaceholder); - opaque->nRedirection -= xldata->nToPlaceholder; - opaque->nPlaceholder += xldata->nToPlaceholder; - - /* Remove placeholder tuples at end of page */ - if (xldata->firstPlaceholder != InvalidOffsetNumber) - { - int max = PageGetMaxOffsetNumber(page); - OffsetNumber *toDelete; + Assert(dt->tupstate == SPGIST_REDIRECT); + dt->tupstate = SPGIST_PLACEHOLDER; + ItemPointerSetInvalid(&dt->pointer); + } - toDelete = palloc(sizeof(OffsetNumber) * max); + Assert(opaque->nRedirection >= xldata->nToPlaceholder); + opaque->nRedirection -= xldata->nToPlaceholder; + opaque->nPlaceholder += xldata->nToPlaceholder; - for (i = xldata->firstPlaceholder; i <= max; i++) - toDelete[i - xldata->firstPlaceholder] = i; + /* Remove placeholder tuples at end of page */ + if (xldata->firstPlaceholder != InvalidOffsetNumber) + { + int max = PageGetMaxOffsetNumber(page); + OffsetNumber *toDelete; - i = max - xldata->firstPlaceholder + 1; - Assert(opaque->nPlaceholder >= i); - opaque->nPlaceholder -= i; + toDelete = palloc(sizeof(OffsetNumber) * max); - /* The array is sorted, so can use PageIndexMultiDelete */ - PageIndexMultiDelete(page, toDelete, i); + for (i = xldata->firstPlaceholder; i <= max; i++) + toDelete[i - xldata->firstPlaceholder] = i; - pfree(toDelete); - } + i = max - xldata->firstPlaceholder + 1; + Assert(opaque->nPlaceholder >= i); + opaque->nPlaceholder -= i; - PageSetLSN(page, lsn); - MarkBufferDirty(buffer); - } + /* The array is sorted, so can use PageIndexMultiDelete */ + PageIndexMultiDelete(page, toDelete, i); - UnlockReleaseBuffer(buffer); + pfree(toDelete); } + + PageSetLSN(page, lsn); + MarkBufferDirty(buffer); } + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); } void |