aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/spgist/spgxlog.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/spgist/spgxlog.c')
-rw-r--r--src/backend/access/spgist/spgxlog.c883
1 files changed, 401 insertions, 482 deletions
diff --git a/src/backend/access/spgist/spgxlog.c b/src/backend/access/spgist/spgxlog.c
index e4f7fbb53a5..911a14828e3 100644
--- a/src/backend/access/spgist/spgxlog.c
+++ b/src/backend/access/spgist/spgxlog.c
@@ -113,6 +113,7 @@ spgRedoAddLeaf(XLogRecPtr lsn, XLogRecord *record)
SpGistLeafTupleData leafTupleHdr;
Buffer buffer;
Page page;
+ XLogRedoAction action;
ptr += sizeof(spgxlogAddLeaf);
leafTuple = ptr;
@@ -124,82 +125,78 @@ spgRedoAddLeaf(XLogRecPtr lsn, XLogRecord *record)
* simultaneously; but in WAL replay it should be safe to update the leaf
* page before updating the parent.
*/
- if (record->xl_info & XLR_BKP_BLOCK(0))
- (void) RestoreBackupBlock(lsn, record, 0, false, false);
+ if (xldata->newPage)
+ {
+ buffer = XLogReadBuffer(xldata->node, xldata->blknoLeaf, true);
+ SpGistInitBuffer(buffer,
+ SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
+ action = BLK_NEEDS_REDO;
+ }
else
+ action = XLogReadBufferForRedo(lsn, record, 0,
+ xldata->node, xldata->blknoLeaf,
+ &buffer);
+
+ if (action == BLK_NEEDS_REDO)
{
- buffer = XLogReadBuffer(xldata->node, xldata->blknoLeaf,
- xldata->newPage);
- if (BufferIsValid(buffer))
- {
- page = BufferGetPage(buffer);
+ page = BufferGetPage(buffer);
- if (xldata->newPage)
- SpGistInitBuffer(buffer,
- SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
+ /* insert new tuple */
+ if (xldata->offnumLeaf != xldata->offnumHeadLeaf)
+ {
+ /* normal cases, tuple was added by SpGistPageAddNewItem */
+ addOrReplaceTuple(page, (Item) leafTuple, leafTupleHdr.size,
+ xldata->offnumLeaf);
- if (lsn > PageGetLSN(page))
+ /* update head tuple's chain link if needed */
+ if (xldata->offnumHeadLeaf != InvalidOffsetNumber)
{
- /* insert new tuple */
- if (xldata->offnumLeaf != xldata->offnumHeadLeaf)
- {
- /* normal cases, tuple was added by SpGistPageAddNewItem */
- addOrReplaceTuple(page, (Item) leafTuple, leafTupleHdr.size,
- xldata->offnumLeaf);
-
- /* update head tuple's chain link if needed */
- if (xldata->offnumHeadLeaf != InvalidOffsetNumber)
- {
- SpGistLeafTuple head;
-
- head = (SpGistLeafTuple) PageGetItem(page,
- PageGetItemId(page, xldata->offnumHeadLeaf));
- Assert(head->nextOffset == leafTupleHdr.nextOffset);
- head->nextOffset = xldata->offnumLeaf;
- }
- }
- else
- {
- /* replacing a DEAD tuple */
- PageIndexTupleDelete(page, xldata->offnumLeaf);
- if (PageAddItem(page,
- (Item) leafTuple, leafTupleHdr.size,
- xldata->offnumLeaf, false, false) != xldata->offnumLeaf)
- elog(ERROR, "failed to add item of size %u to SPGiST index page",
- leafTupleHdr.size);
- }
+ SpGistLeafTuple head;
- PageSetLSN(page, lsn);
- MarkBufferDirty(buffer);
+ head = (SpGistLeafTuple) PageGetItem(page,
+ PageGetItemId(page, xldata->offnumHeadLeaf));
+ Assert(head->nextOffset == leafTupleHdr.nextOffset);
+ head->nextOffset = xldata->offnumLeaf;
}
- UnlockReleaseBuffer(buffer);
}
+ else
+ {
+ /* replacing a DEAD tuple */
+ PageIndexTupleDelete(page, xldata->offnumLeaf);
+ if (PageAddItem(page, (Item) leafTuple, leafTupleHdr.size,
+ xldata->offnumLeaf, false, false) != xldata->offnumLeaf)
+ elog(ERROR, "failed to add item of size %u to SPGiST index page",
+ leafTupleHdr.size);
+ }
+
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(buffer);
}
+ if (BufferIsValid(buffer))
+ UnlockReleaseBuffer(buffer);
/* update parent downlink if necessary */
- if (record->xl_info & XLR_BKP_BLOCK(1))
- (void) RestoreBackupBlock(lsn, record, 1, false, false);
- else if (xldata->blknoParent != InvalidBlockNumber)
+ if (xldata->blknoParent != InvalidBlockNumber)
{
- buffer = XLogReadBuffer(xldata->node, xldata->blknoParent, false);
- if (BufferIsValid(buffer))
+ if (XLogReadBufferForRedo(lsn, record, 1,
+ xldata->node, xldata->blknoParent,
+ &buffer) == BLK_NEEDS_REDO)
{
+ SpGistInnerTuple tuple;
+
page = BufferGetPage(buffer);
- if (lsn > PageGetLSN(page))
- {
- SpGistInnerTuple tuple;
- tuple = (SpGistInnerTuple) PageGetItem(page,
+ tuple = (SpGistInnerTuple) PageGetItem(page,
PageGetItemId(page, xldata->offnumParent));
- spgUpdateNodeLink(tuple, xldata->nodeI,
- xldata->blknoLeaf, xldata->offnumLeaf);
+ spgUpdateNodeLink(tuple, xldata->nodeI,
+ xldata->blknoLeaf, xldata->offnumLeaf);
- PageSetLSN(page, lsn);
- MarkBufferDirty(buffer);
- }
- UnlockReleaseBuffer(buffer);
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(buffer);
}
+ if (BufferIsValid(buffer))
+ UnlockReleaseBuffer(buffer);
}
}
@@ -214,6 +211,7 @@ spgRedoMoveLeafs(XLogRecPtr lsn, XLogRecord *record)
int nInsert;
Buffer buffer;
Page page;
+ XLogRedoAction action;
fillFakeState(&state, xldata->stateSrc);
@@ -234,98 +232,82 @@ spgRedoMoveLeafs(XLogRecPtr lsn, XLogRecord *record)
*/
/* Insert tuples on the dest page (do first, so redirect is valid) */
- if (record->xl_info & XLR_BKP_BLOCK(1))
- (void) RestoreBackupBlock(lsn, record, 1, false, false);
- else
+ if (xldata->newPage)
{
- buffer = XLogReadBuffer(xldata->node, xldata->blknoDst,
- xldata->newPage);
- if (BufferIsValid(buffer))
- {
- page = BufferGetPage(buffer);
-
- if (xldata->newPage)
- SpGistInitBuffer(buffer,
+ buffer = XLogReadBuffer(xldata->node, xldata->blknoDst, true);
+ SpGistInitBuffer(buffer,
SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
+ action = BLK_NEEDS_REDO;
+ }
+ else
+ action = XLogReadBufferForRedo(lsn, record, 1,
+ xldata->node, xldata->blknoDst,
+ &buffer);
+ if (action == BLK_NEEDS_REDO)
+ {
+ int i;
- if (lsn > PageGetLSN(page))
- {
- int i;
-
- for (i = 0; i < nInsert; i++)
- {
- char *leafTuple;
- SpGistLeafTupleData leafTupleHdr;
-
- /*
- * the tuples are not aligned, so must copy to access
- * the size field.
- */
- leafTuple = ptr;
- memcpy(&leafTupleHdr, leafTuple,
- sizeof(SpGistLeafTupleData));
-
- addOrReplaceTuple(page, (Item) leafTuple,
- leafTupleHdr.size, toInsert[i]);
- ptr += leafTupleHdr.size;
- }
+ page = BufferGetPage(buffer);
- PageSetLSN(page, lsn);
- MarkBufferDirty(buffer);
- }
- UnlockReleaseBuffer(buffer);
+ for (i = 0; i < nInsert; i++)
+ {
+ char *leafTuple;
+ SpGistLeafTupleData leafTupleHdr;
+
+ /*
+ * the tuples are not aligned, so must copy to access the size
+ * field.
+ */
+ leafTuple = ptr;
+ memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData));
+
+ addOrReplaceTuple(page, (Item) leafTuple,
+ leafTupleHdr.size, toInsert[i]);
+ ptr += leafTupleHdr.size;
}
+
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(buffer);
}
+ if (BufferIsValid(buffer))
+ UnlockReleaseBuffer(buffer);
/* Delete tuples from the source page, inserting a redirection pointer */
- if (record->xl_info & XLR_BKP_BLOCK(0))
- (void) RestoreBackupBlock(lsn, record, 0, false, false);
- else
+ if (XLogReadBufferForRedo(lsn, record, 0, xldata->node, xldata->blknoSrc,
+ &buffer) == BLK_NEEDS_REDO)
{
- buffer = XLogReadBuffer(xldata->node, xldata->blknoSrc, false);
- if (BufferIsValid(buffer))
- {
- page = BufferGetPage(buffer);
- if (lsn > PageGetLSN(page))
- {
- spgPageIndexMultiDelete(&state, page, toDelete, xldata->nMoves,
+ page = BufferGetPage(buffer);
+ spgPageIndexMultiDelete(&state, page, toDelete, xldata->nMoves,
state.isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT,
- SPGIST_PLACEHOLDER,
- xldata->blknoDst,
- toInsert[nInsert - 1]);
+ SPGIST_PLACEHOLDER,
+ xldata->blknoDst,
+ toInsert[nInsert - 1]);
- PageSetLSN(page, lsn);
- MarkBufferDirty(buffer);
- }
- UnlockReleaseBuffer(buffer);
- }
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(buffer);
}
+ if (BufferIsValid(buffer))
+ UnlockReleaseBuffer(buffer);
/* And update the parent downlink */
- if (record->xl_info & XLR_BKP_BLOCK(2))
- (void) RestoreBackupBlock(lsn, record, 2, false, false);
- else
+ if (XLogReadBufferForRedo(lsn, record, 2, xldata->node, xldata->blknoParent,
+ &buffer) == BLK_NEEDS_REDO)
{
- buffer = XLogReadBuffer(xldata->node, xldata->blknoParent, false);
- if (BufferIsValid(buffer))
- {
- page = BufferGetPage(buffer);
- if (lsn > PageGetLSN(page))
- {
- SpGistInnerTuple tuple;
+ SpGistInnerTuple tuple;
+
+ page = BufferGetPage(buffer);
- tuple = (SpGistInnerTuple) PageGetItem(page,
+ tuple = (SpGistInnerTuple) PageGetItem(page,
PageGetItemId(page, xldata->offnumParent));
- spgUpdateNodeLink(tuple, xldata->nodeI,
- xldata->blknoDst, toInsert[nInsert - 1]);
+ spgUpdateNodeLink(tuple, xldata->nodeI,
+ xldata->blknoDst, toInsert[nInsert - 1]);
- PageSetLSN(page, lsn);
- MarkBufferDirty(buffer);
- }
- UnlockReleaseBuffer(buffer);
- }
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(buffer);
}
+ if (BufferIsValid(buffer))
+ UnlockReleaseBuffer(buffer);
}
static void
@@ -339,6 +321,7 @@ spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record)
Buffer buffer;
Page page;
int bbi;
+ XLogRedoAction action;
ptr += sizeof(spgxlogAddNode);
innerTuple = ptr;
@@ -351,29 +334,21 @@ spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record)
{
/* update in place */
Assert(xldata->blknoParent == InvalidBlockNumber);
- if (record->xl_info & XLR_BKP_BLOCK(0))
- (void) RestoreBackupBlock(lsn, record, 0, false, false);
- else
+ if (XLogReadBufferForRedo(lsn, record, 0, xldata->node, xldata->blkno,
+ &buffer) == BLK_NEEDS_REDO)
{
- buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
- if (BufferIsValid(buffer))
- {
- page = BufferGetPage(buffer);
- if (lsn > PageGetLSN(page))
- {
- PageIndexTupleDelete(page, xldata->offnum);
- if (PageAddItem(page, (Item) innerTuple, innerTupleHdr.size,
- xldata->offnum,
- false, false) != xldata->offnum)
- elog(ERROR, "failed to add item of size %u to SPGiST index page",
- innerTupleHdr.size);
-
- PageSetLSN(page, lsn);
- MarkBufferDirty(buffer);
- }
- UnlockReleaseBuffer(buffer);
- }
+ page = BufferGetPage(buffer);
+ PageIndexTupleDelete(page, xldata->offnum);
+ if (PageAddItem(page, (Item) innerTuple, innerTupleHdr.size,
+ xldata->offnum, false, false) != xldata->offnum)
+ elog(ERROR, "failed to add item of size %u to SPGiST index page",
+ innerTupleHdr.size);
+
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(buffer);
}
+ if (BufferIsValid(buffer))
+ UnlockReleaseBuffer(buffer);
}
else
{
@@ -390,90 +365,79 @@ spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record)
Assert(xldata->blkno != xldata->blknoNew);
/* Install new tuple first so redirect is valid */
- if (record->xl_info & XLR_BKP_BLOCK(1))
- (void) RestoreBackupBlock(lsn, record, 1, false, false);
+ if (xldata->newPage)
+ {
+ buffer = XLogReadBuffer(xldata->node, xldata->blknoNew, true);
+ /* AddNode is not used for nulls pages */
+ SpGistInitBuffer(buffer, 0);
+ action = BLK_NEEDS_REDO;
+ }
else
+ action = XLogReadBufferForRedo(lsn, record, 1,
+ xldata->node, xldata->blknoNew,
+ &buffer);
+ if (action == BLK_NEEDS_REDO)
{
- buffer = XLogReadBuffer(xldata->node, xldata->blknoNew,
- xldata->newPage);
- if (BufferIsValid(buffer))
+ page = BufferGetPage(buffer);
+
+ addOrReplaceTuple(page, (Item) innerTuple,
+ innerTupleHdr.size, xldata->offnumNew);
+
+ /*
+ * If parent is in this same page, don't advance LSN; doing so
+ * would fool us into not applying the parent downlink update
+ * below. We'll update the LSN when we fix the parent downlink.
+ */
+ if (xldata->blknoParent != xldata->blknoNew)
{
- page = BufferGetPage(buffer);
-
- /* AddNode is not used for nulls pages */
- if (xldata->newPage)
- SpGistInitBuffer(buffer, 0);
-
- if (lsn > PageGetLSN(page))
- {
- addOrReplaceTuple(page, (Item) innerTuple,
- innerTupleHdr.size, xldata->offnumNew);
-
- /*
- * If parent is in this same page, don't advance LSN;
- * doing so would fool us into not applying the parent
- * downlink update below. We'll update the LSN when we
- * fix the parent downlink.
- */
- if (xldata->blknoParent != xldata->blknoNew)
- {
- PageSetLSN(page, lsn);
- }
- MarkBufferDirty(buffer);
- }
- UnlockReleaseBuffer(buffer);
+ PageSetLSN(page, lsn);
}
+ MarkBufferDirty(buffer);
}
+ if (BufferIsValid(buffer))
+ UnlockReleaseBuffer(buffer);
/* Delete old tuple, replacing it with redirect or placeholder tuple */
- if (record->xl_info & XLR_BKP_BLOCK(0))
- (void) RestoreBackupBlock(lsn, record, 0, false, false);
- else
+ if (XLogReadBufferForRedo(lsn, record, 0, xldata->node, xldata->blkno,
+ &buffer) == BLK_NEEDS_REDO)
{
- buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
- if (BufferIsValid(buffer))
+ SpGistDeadTuple dt;
+
+ page = BufferGetPage(buffer);
+
+ if (state.isBuild)
+ dt = spgFormDeadTuple(&state, SPGIST_PLACEHOLDER,
+ InvalidBlockNumber,
+ InvalidOffsetNumber);
+ else
+ dt = spgFormDeadTuple(&state, SPGIST_REDIRECT,
+ xldata->blknoNew,
+ xldata->offnumNew);
+
+ PageIndexTupleDelete(page, xldata->offnum);
+ if (PageAddItem(page, (Item) dt, dt->size, xldata->offnum,
+ false, false) != xldata->offnum)
+ elog(ERROR, "failed to add item of size %u to SPGiST index page",
+ dt->size);
+
+ if (state.isBuild)
+ SpGistPageGetOpaque(page)->nPlaceholder++;
+ else
+ SpGistPageGetOpaque(page)->nRedirection++;
+
+ /*
+ * If parent is in this same page, don't advance LSN; doing so
+ * would fool us into not applying the parent downlink update
+ * below. We'll update the LSN when we fix the parent downlink.
+ */
+ if (xldata->blknoParent != xldata->blkno)
{
- page = BufferGetPage(buffer);
- if (lsn > PageGetLSN(page))
- {
- SpGistDeadTuple dt;
-
- if (state.isBuild)
- dt = spgFormDeadTuple(&state, SPGIST_PLACEHOLDER,
- InvalidBlockNumber,
- InvalidOffsetNumber);
- else
- dt = spgFormDeadTuple(&state, SPGIST_REDIRECT,
- xldata->blknoNew,
- xldata->offnumNew);
-
- PageIndexTupleDelete(page, xldata->offnum);
- if (PageAddItem(page, (Item) dt, dt->size,
- xldata->offnum,
- false, false) != xldata->offnum)
- elog(ERROR, "failed to add item of size %u to SPGiST index page",
- dt->size);
-
- if (state.isBuild)
- SpGistPageGetOpaque(page)->nPlaceholder++;
- else
- SpGistPageGetOpaque(page)->nRedirection++;
-
- /*
- * If parent is in this same page, don't advance LSN;
- * doing so would fool us into not applying the parent
- * downlink update below. We'll update the LSN when we
- * fix the parent downlink.
- */
- if (xldata->blknoParent != xldata->blkno)
- {
- PageSetLSN(page, lsn);
- }
- MarkBufferDirty(buffer);
- }
- UnlockReleaseBuffer(buffer);
+ PageSetLSN(page, lsn);
}
+ MarkBufferDirty(buffer);
}
+ if (BufferIsValid(buffer))
+ UnlockReleaseBuffer(buffer);
/*
* Update parent downlink. Since parent could be in either of the
@@ -491,29 +455,32 @@ spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record)
{
if (bbi == 2) /* else we already did it */
(void) RestoreBackupBlock(lsn, record, bbi, false, false);
+ action = BLK_RESTORED;
+ buffer = InvalidBuffer;
}
else
{
- buffer = XLogReadBuffer(xldata->node, xldata->blknoParent, false);
- if (BufferIsValid(buffer))
- {
- page = BufferGetPage(buffer);
- if (lsn > PageGetLSN(page))
- {
- SpGistInnerTuple innerTuple;
+ action = XLogReadBufferForRedo(lsn, record, bbi, xldata->node,
+ xldata->blknoParent, &buffer);
+ Assert(action != BLK_RESTORED);
+ }
+ if (action == BLK_NEEDS_REDO)
+ {
+ SpGistInnerTuple innerTuple;
+
+ page = BufferGetPage(buffer);
- innerTuple = (SpGistInnerTuple) PageGetItem(page,
+ innerTuple = (SpGistInnerTuple) PageGetItem(page,
PageGetItemId(page, xldata->offnumParent));
- spgUpdateNodeLink(innerTuple, xldata->nodeI,
- xldata->blknoNew, xldata->offnumNew);
+ spgUpdateNodeLink(innerTuple, xldata->nodeI,
+ xldata->blknoNew, xldata->offnumNew);
- PageSetLSN(page, lsn);
- MarkBufferDirty(buffer);
- }
- UnlockReleaseBuffer(buffer);
- }
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(buffer);
}
+ if (BufferIsValid(buffer))
+ UnlockReleaseBuffer(buffer);
}
}
@@ -545,60 +512,56 @@ spgRedoSplitTuple(XLogRecPtr lsn, XLogRecord *record)
*/
/* insert postfix tuple first to avoid dangling link */
- if (record->xl_info & XLR_BKP_BLOCK(1))
- (void) RestoreBackupBlock(lsn, record, 1, false, false);
- else if (xldata->blknoPostfix != xldata->blknoPrefix)
+ if (xldata->blknoPostfix != xldata->blknoPrefix)
{
- buffer = XLogReadBuffer(xldata->node, xldata->blknoPostfix,
- xldata->newPage);
- if (BufferIsValid(buffer))
- {
- page = BufferGetPage(buffer);
+ XLogRedoAction action;
+ if (xldata->newPage)
+ {
+ buffer = XLogReadBuffer(xldata->node, xldata->blknoPostfix, true);
/* SplitTuple is not used for nulls pages */
- if (xldata->newPage)
- SpGistInitBuffer(buffer, 0);
+ SpGistInitBuffer(buffer, 0);
+ action = BLK_NEEDS_REDO;
+ }
+ else
+ action = XLogReadBufferForRedo(lsn, record, 1,
+ xldata->node, xldata->blknoPostfix,
+ &buffer);
- if (lsn > PageGetLSN(page))
- {
- addOrReplaceTuple(page, (Item) postfixTuple,
- postfixTupleHdr.size, xldata->offnumPostfix);
+ if (action == BLK_NEEDS_REDO)
+ {
+ page = BufferGetPage(buffer);
- PageSetLSN(page, lsn);
- MarkBufferDirty(buffer);
- }
- UnlockReleaseBuffer(buffer);
+ addOrReplaceTuple(page, (Item) postfixTuple,
+ postfixTupleHdr.size, xldata->offnumPostfix);
+
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(buffer);
}
+ if (BufferIsValid(buffer))
+ UnlockReleaseBuffer(buffer);
}
/* now handle the original page */
- if (record->xl_info & XLR_BKP_BLOCK(0))
- (void) RestoreBackupBlock(lsn, record, 0, false, false);
- else
+ if (XLogReadBufferForRedo(lsn, record, 0, xldata->node, xldata->blknoPrefix,
+ &buffer) == BLK_NEEDS_REDO)
{
- buffer = XLogReadBuffer(xldata->node, xldata->blknoPrefix, false);
- if (BufferIsValid(buffer))
- {
- page = BufferGetPage(buffer);
- if (lsn > PageGetLSN(page))
- {
- PageIndexTupleDelete(page, xldata->offnumPrefix);
- if (PageAddItem(page, (Item) prefixTuple, prefixTupleHdr.size,
+ page = BufferGetPage(buffer);
+ PageIndexTupleDelete(page, xldata->offnumPrefix);
+ if (PageAddItem(page, (Item) prefixTuple, prefixTupleHdr.size,
xldata->offnumPrefix, false, false) != xldata->offnumPrefix)
- elog(ERROR, "failed to add item of size %u to SPGiST index page",
- prefixTupleHdr.size);
+ elog(ERROR, "failed to add item of size %u to SPGiST index page",
+ prefixTupleHdr.size);
- if (xldata->blknoPostfix == xldata->blknoPrefix)
- addOrReplaceTuple(page, (Item) postfixTuple,
- postfixTupleHdr.size,
- xldata->offnumPostfix);
+ if (xldata->blknoPostfix == xldata->blknoPrefix)
+ addOrReplaceTuple(page, (Item) postfixTuple, postfixTupleHdr.size,
+ xldata->offnumPostfix);
- PageSetLSN(page, lsn);
- MarkBufferDirty(buffer);
- }
- UnlockReleaseBuffer(buffer);
- }
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(buffer);
}
+ if (BufferIsValid(buffer))
+ UnlockReleaseBuffer(buffer);
}
static void
@@ -616,9 +579,11 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record)
Buffer destBuffer;
Page srcPage;
Page destPage;
+ Buffer innerBuffer;
Page page;
int bbi;
int i;
+ XLogRedoAction action;
fillFakeState(&state, xldata->stateSrc);
@@ -668,46 +633,37 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record)
* inserting leaf tuples and the new inner tuple, else the added
* redirect tuple will be a dangling link.)
*/
- if (record->xl_info & XLR_BKP_BLOCK(bbi))
+ if (XLogReadBufferForRedo(lsn, record, bbi,
+ xldata->node, xldata->blknoSrc,
+ &srcBuffer) == BLK_NEEDS_REDO)
{
- srcBuffer = RestoreBackupBlock(lsn, record, bbi, false, true);
- srcPage = NULL; /* don't need to do any page updates */
+ srcPage = BufferGetPage(srcBuffer);
+
+ /*
+ * We have it a bit easier here than in doPickSplit(), because we
+ * know the inner tuple's location already, so we can inject the
+ * correct redirection tuple now.
+ */
+ if (!state.isBuild)
+ spgPageIndexMultiDelete(&state, srcPage,
+ toDelete, xldata->nDelete,
+ SPGIST_REDIRECT,
+ SPGIST_PLACEHOLDER,
+ xldata->blknoInner,
+ xldata->offnumInner);
+ else
+ spgPageIndexMultiDelete(&state, srcPage,
+ toDelete, xldata->nDelete,
+ SPGIST_PLACEHOLDER,
+ SPGIST_PLACEHOLDER,
+ InvalidBlockNumber,
+ InvalidOffsetNumber);
+
+ /* don't update LSN etc till we're done with it */
}
else
{
- srcBuffer = XLogReadBuffer(xldata->node, xldata->blknoSrc, false);
- if (BufferIsValid(srcBuffer))
- {
- srcPage = BufferGetPage(srcBuffer);
- if (lsn > PageGetLSN(srcPage))
- {
- /*
- * We have it a bit easier here than in doPickSplit(),
- * because we know the inner tuple's location already, so
- * we can inject the correct redirection tuple now.
- */
- if (!state.isBuild)
- spgPageIndexMultiDelete(&state, srcPage,
- toDelete, xldata->nDelete,
- SPGIST_REDIRECT,
- SPGIST_PLACEHOLDER,
- xldata->blknoInner,
- xldata->offnumInner);
- else
- spgPageIndexMultiDelete(&state, srcPage,
- toDelete, xldata->nDelete,
- SPGIST_PLACEHOLDER,
- SPGIST_PLACEHOLDER,
- InvalidBlockNumber,
- InvalidOffsetNumber);
-
- /* don't update LSN etc till we're done with it */
- }
- else
- srcPage = NULL; /* don't do any page updates */
- }
- else
- srcPage = NULL;
+ srcPage = NULL; /* don't do any page updates */
}
bbi++;
}
@@ -735,22 +691,15 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record)
* We could probably release the page lock immediately in the
* full-page-image case, but for safety let's hold it till later.
*/
- if (record->xl_info & XLR_BKP_BLOCK(bbi))
+ if (XLogReadBufferForRedo(lsn, record, bbi,
+ xldata->node, xldata->blknoDest,
+ &destBuffer) == BLK_NEEDS_REDO)
{
- destBuffer = RestoreBackupBlock(lsn, record, bbi, false, true);
- destPage = NULL; /* don't need to do any page updates */
+ destPage = (Page) BufferGetPage(destBuffer);
}
else
{
- destBuffer = XLogReadBuffer(xldata->node, xldata->blknoDest, false);
- if (BufferIsValid(destBuffer))
- {
- destPage = (Page) BufferGetPage(destBuffer);
- if (lsn <= PageGetLSN(destPage))
- destPage = NULL; /* don't do any page updates */
- }
- else
- destPage = NULL;
+ destPage = NULL; /* don't do any page updates */
}
bbi++;
}
@@ -787,43 +736,40 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record)
}
/* restore new inner tuple */
- if (record->xl_info & XLR_BKP_BLOCK(bbi))
- (void) RestoreBackupBlock(lsn, record, bbi, false, false);
- else
+ if (xldata->initInner)
{
- Buffer buffer = XLogReadBuffer(xldata->node, xldata->blknoInner,
- xldata->initInner);
-
- if (BufferIsValid(buffer))
- {
- page = BufferGetPage(buffer);
+ innerBuffer = XLogReadBuffer(xldata->node, xldata->blknoInner, true);
+ SpGistInitBuffer(innerBuffer,
+ (xldata->storesNulls ? SPGIST_NULLS : 0));
+ action = BLK_NEEDS_REDO;
+ }
+ else
+ action = XLogReadBufferForRedo(lsn, record, bbi, xldata->node,
+ xldata->blknoInner, &innerBuffer);
- if (xldata->initInner)
- SpGistInitBuffer(buffer,
- (xldata->storesNulls ? SPGIST_NULLS : 0));
+ if (action == BLK_NEEDS_REDO)
+ {
+ page = BufferGetPage(innerBuffer);
- if (lsn > PageGetLSN(page))
- {
- addOrReplaceTuple(page, (Item) innerTuple, innerTupleHdr.size,
- xldata->offnumInner);
+ addOrReplaceTuple(page, (Item) innerTuple, innerTupleHdr.size,
+ xldata->offnumInner);
- /* if inner is also parent, update link while we're here */
- if (xldata->blknoInner == xldata->blknoParent)
- {
- SpGistInnerTuple parent;
+ /* if inner is also parent, update link while we're here */
+ if (xldata->blknoInner == xldata->blknoParent)
+ {
+ SpGistInnerTuple parent;
- parent = (SpGistInnerTuple) PageGetItem(page,
+ parent = (SpGistInnerTuple) PageGetItem(page,
PageGetItemId(page, xldata->offnumParent));
- spgUpdateNodeLink(parent, xldata->nodeI,
- xldata->blknoInner, xldata->offnumInner);
- }
-
- PageSetLSN(page, lsn);
- MarkBufferDirty(buffer);
- }
- UnlockReleaseBuffer(buffer);
+ spgUpdateNodeLink(parent, xldata->nodeI,
+ xldata->blknoInner, xldata->offnumInner);
}
+
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(innerBuffer);
}
+ if (BufferIsValid(innerBuffer))
+ UnlockReleaseBuffer(innerBuffer);
bbi++;
/*
@@ -843,31 +789,26 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record)
}
else if (xldata->blknoInner != xldata->blknoParent)
{
- if (record->xl_info & XLR_BKP_BLOCK(bbi))
- (void) RestoreBackupBlock(lsn, record, bbi, false, false);
- else
- {
- Buffer buffer = XLogReadBuffer(xldata->node, xldata->blknoParent, false);
+ Buffer parentBuffer;
- if (BufferIsValid(buffer))
- {
- page = BufferGetPage(buffer);
+ if (XLogReadBufferForRedo(lsn, record, bbi,
+ xldata->node, xldata->blknoParent,
+ &parentBuffer) == BLK_NEEDS_REDO)
+ {
+ SpGistInnerTuple parent;
- if (lsn > PageGetLSN(page))
- {
- SpGistInnerTuple parent;
+ page = BufferGetPage(parentBuffer);
- parent = (SpGistInnerTuple) PageGetItem(page,
+ parent = (SpGistInnerTuple) PageGetItem(page,
PageGetItemId(page, xldata->offnumParent));
- spgUpdateNodeLink(parent, xldata->nodeI,
- xldata->blknoInner, xldata->offnumInner);
+ spgUpdateNodeLink(parent, xldata->nodeI,
+ xldata->blknoInner, xldata->offnumInner);
- PageSetLSN(page, lsn);
- MarkBufferDirty(buffer);
- }
- UnlockReleaseBuffer(buffer);
- }
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(parentBuffer);
}
+ if (BufferIsValid(parentBuffer))
+ UnlockReleaseBuffer(parentBuffer);
}
}
@@ -902,62 +843,56 @@ spgRedoVacuumLeaf(XLogRecPtr lsn, XLogRecord *record)
ptr += sizeof(OffsetNumber) * xldata->nChain;
chainDest = (OffsetNumber *) ptr;
- if (record->xl_info & XLR_BKP_BLOCK(0))
- (void) RestoreBackupBlock(lsn, record, 0, false, false);
- else
+ if (XLogReadBufferForRedo(lsn, record, 0, xldata->node, xldata->blkno,
+ &buffer) == BLK_NEEDS_REDO)
{
- buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
- if (BufferIsValid(buffer))
+ page = BufferGetPage(buffer);
+
+ spgPageIndexMultiDelete(&state, page,
+ toDead, xldata->nDead,
+ SPGIST_DEAD, SPGIST_DEAD,
+ InvalidBlockNumber,
+ InvalidOffsetNumber);
+
+ spgPageIndexMultiDelete(&state, page,
+ toPlaceholder, xldata->nPlaceholder,
+ SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
+ InvalidBlockNumber,
+ InvalidOffsetNumber);
+
+ /* see comments in vacuumLeafPage() */
+ for (i = 0; i < xldata->nMove; i++)
{
- page = BufferGetPage(buffer);
- if (lsn > PageGetLSN(page))
- {
- spgPageIndexMultiDelete(&state, page,
- toDead, xldata->nDead,
- SPGIST_DEAD, SPGIST_DEAD,
- InvalidBlockNumber,
- InvalidOffsetNumber);
+ ItemId idSrc = PageGetItemId(page, moveSrc[i]);
+ ItemId idDest = PageGetItemId(page, moveDest[i]);
+ ItemIdData tmp;
- spgPageIndexMultiDelete(&state, page,
- toPlaceholder, xldata->nPlaceholder,
- SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
- InvalidBlockNumber,
- InvalidOffsetNumber);
+ tmp = *idSrc;
+ *idSrc = *idDest;
+ *idDest = tmp;
+ }
- /* see comments in vacuumLeafPage() */
- for (i = 0; i < xldata->nMove; i++)
- {
- ItemId idSrc = PageGetItemId(page, moveSrc[i]);
- ItemId idDest = PageGetItemId(page, moveDest[i]);
- ItemIdData tmp;
-
- tmp = *idSrc;
- *idSrc = *idDest;
- *idDest = tmp;
- }
-
- spgPageIndexMultiDelete(&state, page,
- moveSrc, xldata->nMove,
- SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
- InvalidBlockNumber,
- InvalidOffsetNumber);
+ spgPageIndexMultiDelete(&state, page,
+ moveSrc, xldata->nMove,
+ SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
+ InvalidBlockNumber,
+ InvalidOffsetNumber);
- for (i = 0; i < xldata->nChain; i++)
- {
- SpGistLeafTuple lt;
+ for (i = 0; i < xldata->nChain; i++)
+ {
+ SpGistLeafTuple lt;
- lt = (SpGistLeafTuple) PageGetItem(page,
+ lt = (SpGistLeafTuple) PageGetItem(page,
PageGetItemId(page, chainSrc[i]));
- Assert(lt->tupstate == SPGIST_LIVE);
- lt->nextOffset = chainDest[i];
- }
-
- PageSetLSN(page, lsn);
- MarkBufferDirty(buffer);
- }
- UnlockReleaseBuffer(buffer);
+ Assert(lt->tupstate == SPGIST_LIVE);
+ lt->nextOffset = chainDest[i];
}
+
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(buffer);
}
+ if (BufferIsValid(buffer))
+ UnlockReleaseBuffer(buffer);
}
static void
@@ -971,25 +906,19 @@ spgRedoVacuumRoot(XLogRecPtr lsn, XLogRecord *record)
toDelete = xldata->offsets;
- if (record->xl_info & XLR_BKP_BLOCK(0))
- (void) RestoreBackupBlock(lsn, record, 0, false, false);
- else
+ if (XLogReadBufferForRedo(lsn, record, 0, xldata->node, xldata->blkno,
+ &buffer) == BLK_NEEDS_REDO)
{
- buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
- if (BufferIsValid(buffer))
- {
- page = BufferGetPage(buffer);
- if (lsn > PageGetLSN(page))
- {
- /* The tuple numbers are in order */
- PageIndexMultiDelete(page, toDelete, xldata->nDelete);
+ page = BufferGetPage(buffer);
- PageSetLSN(page, lsn);
- MarkBufferDirty(buffer);
- }
- UnlockReleaseBuffer(buffer);
- }
+ /* The tuple numbers are in order */
+ PageIndexMultiDelete(page, toDelete, xldata->nDelete);
+
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(buffer);
}
+ if (BufferIsValid(buffer))
+ UnlockReleaseBuffer(buffer);
}
static void
@@ -999,7 +928,6 @@ spgRedoVacuumRedirect(XLogRecPtr lsn, XLogRecord *record)
spgxlogVacuumRedirect *xldata = (spgxlogVacuumRedirect *) ptr;
OffsetNumber *itemToPlaceholder;
Buffer buffer;
- Page page;
itemToPlaceholder = xldata->offsets;
@@ -1014,64 +942,55 @@ spgRedoVacuumRedirect(XLogRecPtr lsn, XLogRecord *record)
xldata->node);
}
- if (record->xl_info & XLR_BKP_BLOCK(0))
- (void) RestoreBackupBlock(lsn, record, 0, false, false);
- else
+ if (XLogReadBufferForRedo(lsn, record, 0, xldata->node, xldata->blkno,
+ &buffer) == BLK_NEEDS_REDO)
{
- buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
+ Page page = BufferGetPage(buffer);
+ SpGistPageOpaque opaque = SpGistPageGetOpaque(page);
+ int i;
- if (BufferIsValid(buffer))
+ /* Convert redirect pointers to plain placeholders */
+ for (i = 0; i < xldata->nToPlaceholder; i++)
{
- page = BufferGetPage(buffer);
- if (lsn > PageGetLSN(page))
- {
- SpGistPageOpaque opaque = SpGistPageGetOpaque(page);
- int i;
+ SpGistDeadTuple dt;
- /* Convert redirect pointers to plain placeholders */
- for (i = 0; i < xldata->nToPlaceholder; i++)
- {
- SpGistDeadTuple dt;
-
- dt = (SpGistDeadTuple) PageGetItem(page,
+ dt = (SpGistDeadTuple) PageGetItem(page,
PageGetItemId(page, itemToPlaceholder[i]));
- Assert(dt->tupstate == SPGIST_REDIRECT);
- dt->tupstate = SPGIST_PLACEHOLDER;
- ItemPointerSetInvalid(&dt->pointer);
- }
-
- Assert(opaque->nRedirection >= xldata->nToPlaceholder);
- opaque->nRedirection -= xldata->nToPlaceholder;
- opaque->nPlaceholder += xldata->nToPlaceholder;
-
- /* Remove placeholder tuples at end of page */
- if (xldata->firstPlaceholder != InvalidOffsetNumber)
- {
- int max = PageGetMaxOffsetNumber(page);
- OffsetNumber *toDelete;
+ Assert(dt->tupstate == SPGIST_REDIRECT);
+ dt->tupstate = SPGIST_PLACEHOLDER;
+ ItemPointerSetInvalid(&dt->pointer);
+ }
- toDelete = palloc(sizeof(OffsetNumber) * max);
+ Assert(opaque->nRedirection >= xldata->nToPlaceholder);
+ opaque->nRedirection -= xldata->nToPlaceholder;
+ opaque->nPlaceholder += xldata->nToPlaceholder;
- for (i = xldata->firstPlaceholder; i <= max; i++)
- toDelete[i - xldata->firstPlaceholder] = i;
+ /* Remove placeholder tuples at end of page */
+ if (xldata->firstPlaceholder != InvalidOffsetNumber)
+ {
+ int max = PageGetMaxOffsetNumber(page);
+ OffsetNumber *toDelete;
- i = max - xldata->firstPlaceholder + 1;
- Assert(opaque->nPlaceholder >= i);
- opaque->nPlaceholder -= i;
+ toDelete = palloc(sizeof(OffsetNumber) * max);
- /* The array is sorted, so can use PageIndexMultiDelete */
- PageIndexMultiDelete(page, toDelete, i);
+ for (i = xldata->firstPlaceholder; i <= max; i++)
+ toDelete[i - xldata->firstPlaceholder] = i;
- pfree(toDelete);
- }
+ i = max - xldata->firstPlaceholder + 1;
+ Assert(opaque->nPlaceholder >= i);
+ opaque->nPlaceholder -= i;
- PageSetLSN(page, lsn);
- MarkBufferDirty(buffer);
- }
+ /* The array is sorted, so can use PageIndexMultiDelete */
+ PageIndexMultiDelete(page, toDelete, i);
- UnlockReleaseBuffer(buffer);
+ pfree(toDelete);
}
+
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(buffer);
}
+ if (BufferIsValid(buffer))
+ UnlockReleaseBuffer(buffer);
}
void