aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/spgist/spgxlog.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/spgist/spgxlog.c')
-rw-r--r--src/backend/access/spgist/spgxlog.c1070
1 files changed, 1070 insertions, 0 deletions
diff --git a/src/backend/access/spgist/spgxlog.c b/src/backend/access/spgist/spgxlog.c
new file mode 100644
index 00000000000..e508f09703d
--- /dev/null
+++ b/src/backend/access/spgist/spgxlog.c
@@ -0,0 +1,1070 @@
+/*-------------------------------------------------------------------------
+ *
+ * spgxlog.c
+ * WAL replay logic for SP-GiST
+ *
+ *
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/access/spgist/spgxlog.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/spgist_private.h"
+#include "access/xlogutils.h"
+#include "storage/bufmgr.h"
+#include "utils/memutils.h"
+
+
+static MemoryContext opCtx; /* working memory for operations */
+
+
+/*
+ * Prepare a dummy SpGistState, with just the minimum info needed for replay.
+ *
+ * At present, all we need is enough info to support spgFormDeadTuple(),
+ * plus the isBuild flag.
+ */
+static void
+fillFakeState(SpGistState *state, spgxlogState stateSrc)
+{
+ memset(state, 0, sizeof(*state));
+
+ state->myXid = stateSrc.myXid;
+ state->isBuild = stateSrc.isBuild;
+ state->deadTupleStorage = palloc0(SGDTSIZE);
+}
+
+/*
+ * Add a leaf tuple, or replace an existing placeholder tuple. This is used
+ * to replay SpGistPageAddNewItem() operations. If the offset points at an
+ * existing tuple, it had better be a placeholder tuple.
+ */
+static void
+addOrReplaceTuple(Page page, Item tuple, int size, OffsetNumber offset)
+{
+ if (offset <= PageGetMaxOffsetNumber(page))
+ {
+ SpGistDeadTuple dt = (SpGistDeadTuple) PageGetItem(page,
+ PageGetItemId(page, offset));
+
+ if (dt->tupstate != SPGIST_PLACEHOLDER)
+ elog(ERROR, "SPGiST tuple to be replaced is not a placeholder");
+
+ Assert(SpGistPageGetOpaque(page)->nPlaceholder > 0);
+ SpGistPageGetOpaque(page)->nPlaceholder--;
+
+ PageIndexTupleDelete(page, offset);
+ }
+
+ Assert(offset <= PageGetMaxOffsetNumber(page) + 1);
+
+ if (PageAddItem(page, tuple, size, offset, false, false) != offset)
+ elog(ERROR, "failed to add item of size %u to SPGiST index page",
+ size);
+}
+
+static void
+spgRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
+{
+ RelFileNode *node = (RelFileNode *) XLogRecGetData(record);
+ Buffer buffer;
+ Page page;
+
+ buffer = XLogReadBuffer(*node, SPGIST_METAPAGE_BLKNO, true);
+ Assert(BufferIsValid(buffer));
+ page = (Page) BufferGetPage(buffer);
+ SpGistInitMetapage(page);
+ PageSetLSN(page, lsn);
+ PageSetTLI(page, ThisTimeLineID);
+ MarkBufferDirty(buffer);
+ UnlockReleaseBuffer(buffer);
+
+ buffer = XLogReadBuffer(*node, SPGIST_HEAD_BLKNO, true);
+ Assert(BufferIsValid(buffer));
+ SpGistInitBuffer(buffer, SPGIST_LEAF);
+ page = (Page) BufferGetPage(buffer);
+ PageSetLSN(page, lsn);
+ PageSetTLI(page, ThisTimeLineID);
+ MarkBufferDirty(buffer);
+ UnlockReleaseBuffer(buffer);
+}
+
+static void
+spgRedoAddLeaf(XLogRecPtr lsn, XLogRecord *record)
+{
+ char *ptr = XLogRecGetData(record);
+ spgxlogAddLeaf *xldata = (spgxlogAddLeaf *) ptr;
+ SpGistLeafTuple leafTuple;
+ Buffer buffer;
+ Page page;
+
+ /* we assume this is adequately aligned */
+ ptr += sizeof(spgxlogAddLeaf);
+ leafTuple = (SpGistLeafTuple) ptr;
+
+ if (!(record->xl_info & XLR_BKP_BLOCK_1))
+ {
+ buffer = XLogReadBuffer(xldata->node, xldata->blknoLeaf,
+ xldata->newPage);
+ if (BufferIsValid(buffer))
+ {
+ page = BufferGetPage(buffer);
+
+ if (xldata->newPage)
+ SpGistInitBuffer(buffer, SPGIST_LEAF);
+
+ if (!XLByteLE(lsn, PageGetLSN(page)))
+ {
+ /* insert new tuple */
+ if (xldata->offnumLeaf != xldata->offnumHeadLeaf)
+ {
+ /* normal cases, tuple was added by SpGistPageAddNewItem */
+ addOrReplaceTuple(page, (Item) leafTuple, leafTuple->size,
+ xldata->offnumLeaf);
+
+ /* update head tuple's chain link if needed */
+ if (xldata->offnumHeadLeaf != InvalidOffsetNumber)
+ {
+ SpGistLeafTuple head;
+
+ head = (SpGistLeafTuple) PageGetItem(page,
+ PageGetItemId(page, xldata->offnumHeadLeaf));
+ Assert(head->nextOffset == leafTuple->nextOffset);
+ head->nextOffset = xldata->offnumLeaf;
+ }
+ }
+ else
+ {
+ /* replacing a DEAD tuple */
+ PageIndexTupleDelete(page, xldata->offnumLeaf);
+ if (PageAddItem(page,
+ (Item) leafTuple, leafTuple->size,
+ xldata->offnumLeaf, false, false) != xldata->offnumLeaf)
+ elog(ERROR, "failed to add item of size %u to SPGiST index page",
+ leafTuple->size);
+ }
+
+ PageSetLSN(page, lsn);
+ PageSetTLI(page, ThisTimeLineID);
+ MarkBufferDirty(buffer);
+ }
+ UnlockReleaseBuffer(buffer);
+ }
+ }
+
+ /* update parent downlink if necessary */
+ if (xldata->blknoParent != InvalidBlockNumber &&
+ !(record->xl_info & XLR_BKP_BLOCK_2))
+ {
+ buffer = XLogReadBuffer(xldata->node, xldata->blknoParent, false);
+ if (BufferIsValid(buffer))
+ {
+ page = BufferGetPage(buffer);
+ if (!XLByteLE(lsn, PageGetLSN(page)))
+ {
+ SpGistInnerTuple tuple;
+
+ tuple = (SpGistInnerTuple) PageGetItem(page,
+ PageGetItemId(page, xldata->offnumParent));
+
+ updateNodeLink(tuple, xldata->nodeI,
+ xldata->blknoLeaf, xldata->offnumLeaf);
+
+ PageSetLSN(page, lsn);
+ PageSetTLI(page, ThisTimeLineID);
+ MarkBufferDirty(buffer);
+ }
+ UnlockReleaseBuffer(buffer);
+ }
+ }
+}
+
+static void
+spgRedoMoveLeafs(XLogRecPtr lsn, XLogRecord *record)
+{
+ char *ptr = XLogRecGetData(record);
+ spgxlogMoveLeafs *xldata = (spgxlogMoveLeafs *) ptr;
+ SpGistState state;
+ OffsetNumber *toDelete;
+ OffsetNumber *toInsert;
+ int nInsert;
+ Buffer buffer;
+ Page page;
+
+ fillFakeState(&state, xldata->stateSrc);
+
+ nInsert = xldata->replaceDead ? 1 : xldata->nMoves + 1;
+
+ ptr += MAXALIGN(sizeof(spgxlogMoveLeafs));
+ toDelete = (OffsetNumber *) ptr;
+ ptr += MAXALIGN(sizeof(OffsetNumber) * xldata->nMoves);
+ toInsert = (OffsetNumber *) ptr;
+ ptr += MAXALIGN(sizeof(OffsetNumber) * nInsert);
+
+ /* now ptr points to the list of leaf tuples */
+
+ /* Insert tuples on the dest page (do first, so redirect is valid) */
+ if (!(record->xl_info & XLR_BKP_BLOCK_2))
+ {
+ buffer = XLogReadBuffer(xldata->node, xldata->blknoDst,
+ xldata->newPage);
+ if (BufferIsValid(buffer))
+ {
+ page = BufferGetPage(buffer);
+
+ if (xldata->newPage)
+ SpGistInitBuffer(buffer, SPGIST_LEAF);
+
+ if (!XLByteLE(lsn, PageGetLSN(page)))
+ {
+ int i;
+
+ for (i = 0; i < nInsert; i++)
+ {
+ SpGistLeafTuple lt = (SpGistLeafTuple) ptr;
+
+ addOrReplaceTuple(page, (Item) lt, lt->size, toInsert[i]);
+ ptr += lt->size;
+ }
+
+ PageSetLSN(page, lsn);
+ PageSetTLI(page, ThisTimeLineID);
+ MarkBufferDirty(buffer);
+ }
+ UnlockReleaseBuffer(buffer);
+ }
+ }
+
+ /* Delete tuples from the source page, inserting a redirection pointer */
+ if (!(record->xl_info & XLR_BKP_BLOCK_1))
+ {
+ buffer = XLogReadBuffer(xldata->node, xldata->blknoSrc, false);
+ if (BufferIsValid(buffer))
+ {
+ page = BufferGetPage(buffer);
+ if (!XLByteLE(lsn, PageGetLSN(page)))
+ {
+ spgPageIndexMultiDelete(&state, page, toDelete, xldata->nMoves,
+ state.isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT,
+ SPGIST_PLACEHOLDER,
+ xldata->blknoDst,
+ toInsert[nInsert - 1]);
+
+ PageSetLSN(page, lsn);
+ PageSetTLI(page, ThisTimeLineID);
+ MarkBufferDirty(buffer);
+ }
+ UnlockReleaseBuffer(buffer);
+ }
+ }
+
+ /* And update the parent downlink */
+ if (!(record->xl_info & XLR_BKP_BLOCK_3))
+ {
+ buffer = XLogReadBuffer(xldata->node, xldata->blknoParent, false);
+ if (BufferIsValid(buffer))
+ {
+ page = BufferGetPage(buffer);
+ if (!XLByteLE(lsn, PageGetLSN(page)))
+ {
+ SpGistInnerTuple tuple;
+
+ tuple = (SpGistInnerTuple) PageGetItem(page,
+ PageGetItemId(page, xldata->offnumParent));
+
+ updateNodeLink(tuple, xldata->nodeI,
+ xldata->blknoDst, toInsert[nInsert - 1]);
+
+ PageSetLSN(page, lsn);
+ PageSetTLI(page, ThisTimeLineID);
+ MarkBufferDirty(buffer);
+ }
+ UnlockReleaseBuffer(buffer);
+ }
+ }
+}
+
+static void
+spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record)
+{
+ char *ptr = XLogRecGetData(record);
+ spgxlogAddNode *xldata = (spgxlogAddNode *) ptr;
+ SpGistInnerTuple innerTuple;
+ SpGistState state;
+ Buffer buffer;
+ Page page;
+ int bbi;
+
+ /* we assume this is adequately aligned */
+ ptr += sizeof(spgxlogAddNode);
+ innerTuple = (SpGistInnerTuple) ptr;
+
+ fillFakeState(&state, xldata->stateSrc);
+
+ if (xldata->blknoNew == InvalidBlockNumber)
+ {
+ /* update in place */
+ Assert(xldata->blknoParent == InvalidBlockNumber);
+ if (!(record->xl_info & XLR_BKP_BLOCK_1))
+ {
+ buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
+ if (BufferIsValid(buffer))
+ {
+ page = BufferGetPage(buffer);
+ if (!XLByteLE(lsn, PageGetLSN(page)))
+ {
+ PageIndexTupleDelete(page, xldata->offnum);
+ if (PageAddItem(page, (Item) innerTuple, innerTuple->size,
+ xldata->offnum,
+ false, false) != xldata->offnum)
+ elog(ERROR, "failed to add item of size %u to SPGiST index page",
+ innerTuple->size);
+
+ PageSetLSN(page, lsn);
+ PageSetTLI(page, ThisTimeLineID);
+ MarkBufferDirty(buffer);
+ }
+ UnlockReleaseBuffer(buffer);
+ }
+ }
+ }
+ else
+ {
+ /* Install new tuple first so redirect is valid */
+ if (!(record->xl_info & XLR_BKP_BLOCK_2))
+ {
+ buffer = XLogReadBuffer(xldata->node, xldata->blknoNew,
+ xldata->newPage);
+ if (BufferIsValid(buffer))
+ {
+ page = BufferGetPage(buffer);
+
+ if (xldata->newPage)
+ SpGistInitBuffer(buffer, 0);
+
+ if (!XLByteLE(lsn, PageGetLSN(page)))
+ {
+ addOrReplaceTuple(page, (Item) innerTuple,
+ innerTuple->size, xldata->offnumNew);
+
+ PageSetLSN(page, lsn);
+ PageSetTLI(page, ThisTimeLineID);
+ MarkBufferDirty(buffer);
+ }
+ UnlockReleaseBuffer(buffer);
+ }
+ }
+
+ /* Delete old tuple, replacing it with redirect or placeholder tuple */
+ if (!(record->xl_info & XLR_BKP_BLOCK_1))
+ {
+ buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
+ if (BufferIsValid(buffer))
+ {
+ page = BufferGetPage(buffer);
+ if (!XLByteLE(lsn, PageGetLSN(page)))
+ {
+ SpGistDeadTuple dt;
+
+ if (state.isBuild)
+ dt = spgFormDeadTuple(&state, SPGIST_PLACEHOLDER,
+ InvalidBlockNumber,
+ InvalidOffsetNumber);
+ else
+ dt = spgFormDeadTuple(&state, SPGIST_REDIRECT,
+ xldata->blknoNew,
+ xldata->offnumNew);
+
+ PageIndexTupleDelete(page, xldata->offnum);
+ if (PageAddItem(page, (Item) dt, dt->size,
+ xldata->offnum,
+ false, false) != xldata->offnum)
+ elog(ERROR, "failed to add item of size %u to SPGiST index page",
+ dt->size);
+
+ if (state.isBuild)
+ SpGistPageGetOpaque(page)->nPlaceholder++;
+ else
+ SpGistPageGetOpaque(page)->nRedirection++;
+
+ PageSetLSN(page, lsn);
+ PageSetTLI(page, ThisTimeLineID);
+ MarkBufferDirty(buffer);
+ }
+ UnlockReleaseBuffer(buffer);
+ }
+ }
+
+ /*
+ * Update parent downlink. Since parent could be in either of the
+ * previous two buffers, it's a bit tricky to determine which BKP bit
+ * applies.
+ */
+ if (xldata->blknoParent == xldata->blkno)
+ bbi = 0;
+ else if (xldata->blknoParent == xldata->blknoNew)
+ bbi = 1;
+ else
+ bbi = 2;
+
+ if (!(record->xl_info & XLR_SET_BKP_BLOCK(bbi)))
+ {
+ buffer = XLogReadBuffer(xldata->node, xldata->blknoParent, false);
+ if (BufferIsValid(buffer))
+ {
+ page = BufferGetPage(buffer);
+ if (!XLByteLE(lsn, PageGetLSN(page)))
+ {
+ SpGistInnerTuple innerTuple;
+
+ innerTuple = (SpGistInnerTuple) PageGetItem(page,
+ PageGetItemId(page, xldata->offnumParent));
+
+ updateNodeLink(innerTuple, xldata->nodeI,
+ xldata->blknoNew, xldata->offnumNew);
+
+ PageSetLSN(page, lsn);
+ PageSetTLI(page, ThisTimeLineID);
+ MarkBufferDirty(buffer);
+ }
+ UnlockReleaseBuffer(buffer);
+ }
+ }
+ }
+}
+
+static void
+spgRedoSplitTuple(XLogRecPtr lsn, XLogRecord *record)
+{
+ char *ptr = XLogRecGetData(record);
+ spgxlogSplitTuple *xldata = (spgxlogSplitTuple *) ptr;
+ SpGistInnerTuple prefixTuple;
+ SpGistInnerTuple postfixTuple;
+ Buffer buffer;
+ Page page;
+
+ /* we assume this is adequately aligned */
+ ptr += sizeof(spgxlogSplitTuple);
+ prefixTuple = (SpGistInnerTuple) ptr;
+ ptr += prefixTuple->size;
+ postfixTuple = (SpGistInnerTuple) ptr;
+
+ /* insert postfix tuple first to avoid dangling link */
+ if (xldata->blknoPostfix != xldata->blknoPrefix &&
+ !(record->xl_info & XLR_BKP_BLOCK_2))
+ {
+ buffer = XLogReadBuffer(xldata->node, xldata->blknoPostfix,
+ xldata->newPage);
+ if (BufferIsValid(buffer))
+ {
+ page = BufferGetPage(buffer);
+
+ if (xldata->newPage)
+ SpGistInitBuffer(buffer, 0);
+
+ if (!XLByteLE(lsn, PageGetLSN(page)))
+ {
+ addOrReplaceTuple(page, (Item) postfixTuple,
+ postfixTuple->size, xldata->offnumPostfix);
+
+ PageSetLSN(page, lsn);
+ PageSetTLI(page, ThisTimeLineID);
+ MarkBufferDirty(buffer);
+ }
+ UnlockReleaseBuffer(buffer);
+ }
+ }
+
+ /* now handle the original page */
+ if (!(record->xl_info & XLR_BKP_BLOCK_1))
+ {
+ buffer = XLogReadBuffer(xldata->node, xldata->blknoPrefix, false);
+ if (BufferIsValid(buffer))
+ {
+ page = BufferGetPage(buffer);
+ if (!XLByteLE(lsn, PageGetLSN(page)))
+ {
+ PageIndexTupleDelete(page, xldata->offnumPrefix);
+ if (PageAddItem(page, (Item) prefixTuple, prefixTuple->size,
+ xldata->offnumPrefix, false, false) != xldata->offnumPrefix)
+ elog(ERROR, "failed to add item of size %u to SPGiST index page",
+ prefixTuple->size);
+
+ if (xldata->blknoPostfix == xldata->blknoPrefix)
+ addOrReplaceTuple(page, (Item) postfixTuple,
+ postfixTuple->size,
+ xldata->offnumPostfix);
+
+ PageSetLSN(page, lsn);
+ PageSetTLI(page, ThisTimeLineID);
+ MarkBufferDirty(buffer);
+ }
+ UnlockReleaseBuffer(buffer);
+ }
+ }
+}
+
+static void
+spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record)
+{
+ char *ptr = XLogRecGetData(record);
+ spgxlogPickSplit *xldata = (spgxlogPickSplit *) ptr;
+ SpGistInnerTuple innerTuple;
+ SpGistState state;
+ OffsetNumber *toDelete;
+ OffsetNumber *toInsert;
+ uint8 *leafPageSelect;
+ Buffer srcBuffer;
+ Buffer destBuffer;
+ Page page;
+ int bbi;
+ int i;
+
+ fillFakeState(&state, xldata->stateSrc);
+
+ ptr += MAXALIGN(sizeof(spgxlogPickSplit));
+ innerTuple = (SpGistInnerTuple) ptr;
+ ptr += innerTuple->size;
+ toDelete = (OffsetNumber *) ptr;
+ ptr += MAXALIGN(sizeof(OffsetNumber) * xldata->nDelete);
+ toInsert = (OffsetNumber *) ptr;
+ ptr += MAXALIGN(sizeof(OffsetNumber) * xldata->nInsert);
+ leafPageSelect = (uint8 *) ptr;
+ ptr += MAXALIGN(sizeof(uint8) * xldata->nInsert);
+
+ /* now ptr points to the list of leaf tuples */
+
+ /*
+ * It's a bit tricky to identify which pages have been handled as
+ * full-page images, so we explicitly count each referenced buffer.
+ */
+ bbi = 0;
+
+ if (xldata->blknoSrc == SPGIST_HEAD_BLKNO)
+ {
+ /* when splitting root, we touch it only in the guise of new inner */
+ srcBuffer = InvalidBuffer;
+ }
+ else if (xldata->initSrc)
+ {
+ /* just re-init the source page */
+ srcBuffer = XLogReadBuffer(xldata->node, xldata->blknoSrc, true);
+ Assert(BufferIsValid(srcBuffer));
+ page = (Page) BufferGetPage(srcBuffer);
+
+ SpGistInitBuffer(srcBuffer, SPGIST_LEAF);
+ /* don't update LSN etc till we're done with it */
+ }
+ else
+ {
+ /* delete the specified tuples from source page */
+ if (!(record->xl_info & XLR_SET_BKP_BLOCK(bbi)))
+ {
+ srcBuffer = XLogReadBuffer(xldata->node, xldata->blknoSrc, false);
+ if (BufferIsValid(srcBuffer))
+ {
+ page = BufferGetPage(srcBuffer);
+ if (!XLByteLE(lsn, PageGetLSN(page)))
+ {
+ /*
+ * We have it a bit easier here than in doPickSplit(),
+ * because we know the inner tuple's location already,
+ * so we can inject the correct redirection tuple now.
+ */
+ if (!state.isBuild)
+ spgPageIndexMultiDelete(&state, page,
+ toDelete, xldata->nDelete,
+ SPGIST_REDIRECT,
+ SPGIST_PLACEHOLDER,
+ xldata->blknoInner,
+ xldata->offnumInner);
+ else
+ spgPageIndexMultiDelete(&state, page,
+ toDelete, xldata->nDelete,
+ SPGIST_PLACEHOLDER,
+ SPGIST_PLACEHOLDER,
+ InvalidBlockNumber,
+ InvalidOffsetNumber);
+
+ /* don't update LSN etc till we're done with it */
+ }
+ }
+ }
+ else
+ srcBuffer = InvalidBuffer;
+ bbi++;
+ }
+
+ /* try to access dest page if any */
+ if (xldata->blknoDest == InvalidBlockNumber)
+ {
+ destBuffer = InvalidBuffer;
+ }
+ else if (xldata->initDest)
+ {
+ /* just re-init the dest page */
+ destBuffer = XLogReadBuffer(xldata->node, xldata->blknoDest, true);
+ Assert(BufferIsValid(destBuffer));
+ page = (Page) BufferGetPage(destBuffer);
+
+ SpGistInitBuffer(destBuffer, SPGIST_LEAF);
+ /* don't update LSN etc till we're done with it */
+ }
+ else
+ {
+ if (!(record->xl_info & XLR_SET_BKP_BLOCK(bbi)))
+ destBuffer = XLogReadBuffer(xldata->node, xldata->blknoDest, false);
+ else
+ destBuffer = InvalidBuffer;
+ bbi++;
+ }
+
+ /* restore leaf tuples to src and/or dest page */
+ for (i = 0; i < xldata->nInsert; i++)
+ {
+ SpGistLeafTuple lt = (SpGistLeafTuple) ptr;
+ Buffer leafBuffer;
+
+ ptr += lt->size;
+
+ leafBuffer = leafPageSelect[i] ? destBuffer : srcBuffer;
+ if (!BufferIsValid(leafBuffer))
+ continue; /* no need to touch this page */
+ page = BufferGetPage(leafBuffer);
+
+ if (!XLByteLE(lsn, PageGetLSN(page)))
+ {
+ addOrReplaceTuple(page, (Item) lt, lt->size, toInsert[i]);
+ }
+ }
+
+ /* Now update src and dest page LSNs */
+ if (BufferIsValid(srcBuffer))
+ {
+ page = BufferGetPage(srcBuffer);
+ if (!XLByteLE(lsn, PageGetLSN(page)))
+ {
+ PageSetLSN(page, lsn);
+ PageSetTLI(page, ThisTimeLineID);
+ MarkBufferDirty(srcBuffer);
+ }
+ UnlockReleaseBuffer(srcBuffer);
+ }
+ if (BufferIsValid(destBuffer))
+ {
+ page = BufferGetPage(destBuffer);
+ if (!XLByteLE(lsn, PageGetLSN(page)))
+ {
+ PageSetLSN(page, lsn);
+ PageSetTLI(page, ThisTimeLineID);
+ MarkBufferDirty(destBuffer);
+ }
+ UnlockReleaseBuffer(destBuffer);
+ }
+
+ /* restore new inner tuple */
+ if (!(record->xl_info & XLR_SET_BKP_BLOCK(bbi)))
+ {
+ Buffer buffer = XLogReadBuffer(xldata->node, xldata->blknoInner,
+ xldata->initInner);
+
+ if (BufferIsValid(buffer))
+ {
+ page = BufferGetPage(buffer);
+
+ if (xldata->initInner)
+ SpGistInitBuffer(buffer, 0);
+
+ if (!XLByteLE(lsn, PageGetLSN(page)))
+ {
+ addOrReplaceTuple(page, (Item) innerTuple, innerTuple->size,
+ xldata->offnumInner);
+
+ /* if inner is also parent, update link while we're here */
+ if (xldata->blknoInner == xldata->blknoParent)
+ {
+ SpGistInnerTuple parent;
+
+ parent = (SpGistInnerTuple) PageGetItem(page,
+ PageGetItemId(page, xldata->offnumParent));
+ updateNodeLink(parent, xldata->nodeI,
+ xldata->blknoInner, xldata->offnumInner);
+ }
+
+ PageSetLSN(page, lsn);
+ PageSetTLI(page, ThisTimeLineID);
+ MarkBufferDirty(buffer);
+ }
+ UnlockReleaseBuffer(buffer);
+ }
+ }
+ bbi++;
+
+ /* update parent downlink, unless we did it above */
+ if (xldata->blknoParent == InvalidBlockNumber)
+ {
+ /* no parent cause we split the root */
+ Assert(xldata->blknoInner == SPGIST_HEAD_BLKNO);
+ }
+ else if (xldata->blknoInner != xldata->blknoParent)
+ {
+ if (!(record->xl_info & XLR_SET_BKP_BLOCK(bbi)))
+ {
+ Buffer buffer = XLogReadBuffer(xldata->node, xldata->blknoParent, false);
+
+ if (BufferIsValid(buffer))
+ {
+ page = BufferGetPage(buffer);
+
+ if (!XLByteLE(lsn, PageGetLSN(page)))
+ {
+ SpGistInnerTuple parent;
+
+ parent = (SpGistInnerTuple) PageGetItem(page,
+ PageGetItemId(page, xldata->offnumParent));
+ updateNodeLink(parent, xldata->nodeI,
+ xldata->blknoInner, xldata->offnumInner);
+
+ PageSetLSN(page, lsn);
+ PageSetTLI(page, ThisTimeLineID);
+ MarkBufferDirty(buffer);
+ }
+ UnlockReleaseBuffer(buffer);
+ }
+ }
+ }
+}
+
+static void
+spgRedoVacuumLeaf(XLogRecPtr lsn, XLogRecord *record)
+{
+ char *ptr = XLogRecGetData(record);
+ spgxlogVacuumLeaf *xldata = (spgxlogVacuumLeaf *) ptr;
+ OffsetNumber *toDead;
+ OffsetNumber *toPlaceholder;
+ OffsetNumber *moveSrc;
+ OffsetNumber *moveDest;
+ OffsetNumber *chainSrc;
+ OffsetNumber *chainDest;
+ SpGistState state;
+ Buffer buffer;
+ Page page;
+ int i;
+
+ fillFakeState(&state, xldata->stateSrc);
+
+ ptr += sizeof(spgxlogVacuumLeaf);
+ toDead = (OffsetNumber *) ptr;
+ ptr += sizeof(OffsetNumber) * xldata->nDead;
+ toPlaceholder = (OffsetNumber *) ptr;
+ ptr += sizeof(OffsetNumber) * xldata->nPlaceholder;
+ moveSrc = (OffsetNumber *) ptr;
+ ptr += sizeof(OffsetNumber) * xldata->nMove;
+ moveDest = (OffsetNumber *) ptr;
+ ptr += sizeof(OffsetNumber) * xldata->nMove;
+ chainSrc = (OffsetNumber *) ptr;
+ ptr += sizeof(OffsetNumber) * xldata->nChain;
+ chainDest = (OffsetNumber *) ptr;
+
+ if (!(record->xl_info & XLR_BKP_BLOCK_1))
+ {
+ buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
+ if (BufferIsValid(buffer))
+ {
+ page = BufferGetPage(buffer);
+ if (!XLByteLE(lsn, PageGetLSN(page)))
+ {
+ spgPageIndexMultiDelete(&state, page,
+ toDead, xldata->nDead,
+ SPGIST_DEAD, SPGIST_DEAD,
+ InvalidBlockNumber,
+ InvalidOffsetNumber);
+
+ spgPageIndexMultiDelete(&state, page,
+ toPlaceholder, xldata->nPlaceholder,
+ SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
+ InvalidBlockNumber,
+ InvalidOffsetNumber);
+
+ /* see comments in vacuumLeafPage() */
+ for (i = 0; i < xldata->nMove; i++)
+ {
+ ItemId idSrc = PageGetItemId(page, moveSrc[i]);
+ ItemId idDest = PageGetItemId(page, moveDest[i]);
+ ItemIdData tmp;
+
+ tmp = *idSrc;
+ *idSrc = *idDest;
+ *idDest = tmp;
+ }
+
+ spgPageIndexMultiDelete(&state, page,
+ moveSrc, xldata->nMove,
+ SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
+ InvalidBlockNumber,
+ InvalidOffsetNumber);
+
+ for (i = 0; i < xldata->nChain; i++)
+ {
+ SpGistLeafTuple lt;
+
+ lt = (SpGistLeafTuple) PageGetItem(page,
+ PageGetItemId(page, chainSrc[i]));
+ Assert(lt->tupstate == SPGIST_LIVE);
+ lt->nextOffset = chainDest[i];
+ }
+
+ PageSetLSN(page, lsn);
+ PageSetTLI(page, ThisTimeLineID);
+ MarkBufferDirty(buffer);
+ }
+ UnlockReleaseBuffer(buffer);
+ }
+ }
+}
+
+static void
+spgRedoVacuumRoot(XLogRecPtr lsn, XLogRecord *record)
+{
+ char *ptr = XLogRecGetData(record);
+ spgxlogVacuumRoot *xldata = (spgxlogVacuumRoot *) ptr;
+ OffsetNumber *toDelete;
+ Buffer buffer;
+ Page page;
+
+ ptr += sizeof(spgxlogVacuumRoot);
+ toDelete = (OffsetNumber *) ptr;
+
+ if (!(record->xl_info & XLR_BKP_BLOCK_1))
+ {
+ buffer = XLogReadBuffer(xldata->node, SPGIST_HEAD_BLKNO, false);
+ if (BufferIsValid(buffer))
+ {
+ page = BufferGetPage(buffer);
+ if (!XLByteLE(lsn, PageGetLSN(page)))
+ {
+ /* The tuple numbers are in order */
+ PageIndexMultiDelete(page, toDelete, xldata->nDelete);
+
+ PageSetLSN(page, lsn);
+ PageSetTLI(page, ThisTimeLineID);
+ MarkBufferDirty(buffer);
+ }
+ UnlockReleaseBuffer(buffer);
+ }
+ }
+}
+
+static void
+spgRedoVacuumRedirect(XLogRecPtr lsn, XLogRecord *record)
+{
+ char *ptr = XLogRecGetData(record);
+ spgxlogVacuumRedirect *xldata = (spgxlogVacuumRedirect *) ptr;
+ OffsetNumber *itemToPlaceholder;
+ Buffer buffer;
+ Page page;
+
+ ptr += sizeof(spgxlogVacuumRedirect);
+ itemToPlaceholder = (OffsetNumber *) ptr;
+
+ if (!(record->xl_info & XLR_BKP_BLOCK_1))
+ {
+ buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
+
+ if (BufferIsValid(buffer))
+ {
+ page = BufferGetPage(buffer);
+ if (!XLByteLE(lsn, PageGetLSN(page)))
+ {
+ SpGistPageOpaque opaque = SpGistPageGetOpaque(page);
+ int i;
+
+ /* Convert redirect pointers to plain placeholders */
+ for (i = 0; i < xldata->nToPlaceholder; i++)
+ {
+ SpGistDeadTuple dt;
+
+ dt = (SpGistDeadTuple) PageGetItem(page,
+ PageGetItemId(page, itemToPlaceholder[i]));
+ Assert(dt->tupstate == SPGIST_REDIRECT);
+ dt->tupstate = SPGIST_PLACEHOLDER;
+ ItemPointerSetInvalid(&dt->pointer);
+ }
+
+ Assert(opaque->nRedirection >= xldata->nToPlaceholder);
+ opaque->nRedirection -= xldata->nToPlaceholder;
+ opaque->nPlaceholder += xldata->nToPlaceholder;
+
+ /* Remove placeholder tuples at end of page */
+ if (xldata->firstPlaceholder != InvalidOffsetNumber)
+ {
+ int max = PageGetMaxOffsetNumber(page);
+ OffsetNumber *toDelete;
+
+ toDelete = palloc(sizeof(OffsetNumber) * max);
+
+ for (i = xldata->firstPlaceholder; i <= max; i++)
+ toDelete[i - xldata->firstPlaceholder] = i;
+
+ i = max - xldata->firstPlaceholder + 1;
+ Assert(opaque->nPlaceholder >= i);
+ opaque->nPlaceholder -= i;
+
+ /* The array is sorted, so can use PageIndexMultiDelete */
+ PageIndexMultiDelete(page, toDelete, i);
+
+ pfree(toDelete);
+ }
+
+ PageSetLSN(page, lsn);
+ PageSetTLI(page, ThisTimeLineID);
+ MarkBufferDirty(buffer);
+ }
+
+ UnlockReleaseBuffer(buffer);
+ }
+ }
+}
+
+void
+spg_redo(XLogRecPtr lsn, XLogRecord *record)
+{
+ uint8 info = record->xl_info & ~XLR_INFO_MASK;
+ MemoryContext oldCxt;
+
+ /*
+ * SP-GiST indexes do not require any conflict processing. NB: If we ever
+ * implement a similar optimization as we have in b-tree, and remove
+ * killed tuples outside VACUUM, we'll need to handle that here.
+ */
+ RestoreBkpBlocks(lsn, record, false);
+
+ oldCxt = MemoryContextSwitchTo(opCtx);
+ switch (info)
+ {
+ case XLOG_SPGIST_CREATE_INDEX:
+ spgRedoCreateIndex(lsn, record);
+ break;
+ case XLOG_SPGIST_ADD_LEAF:
+ spgRedoAddLeaf(lsn, record);
+ break;
+ case XLOG_SPGIST_MOVE_LEAFS:
+ spgRedoMoveLeafs(lsn, record);
+ break;
+ case XLOG_SPGIST_ADD_NODE:
+ spgRedoAddNode(lsn, record);
+ break;
+ case XLOG_SPGIST_SPLIT_TUPLE:
+ spgRedoSplitTuple(lsn, record);
+ break;
+ case XLOG_SPGIST_PICKSPLIT:
+ spgRedoPickSplit(lsn, record);
+ break;
+ case XLOG_SPGIST_VACUUM_LEAF:
+ spgRedoVacuumLeaf(lsn, record);
+ break;
+ case XLOG_SPGIST_VACUUM_ROOT:
+ spgRedoVacuumRoot(lsn, record);
+ break;
+ case XLOG_SPGIST_VACUUM_REDIRECT:
+ spgRedoVacuumRedirect(lsn, record);
+ break;
+ default:
+ elog(PANIC, "spg_redo: unknown op code %u", info);
+ }
+
+ MemoryContextSwitchTo(oldCxt);
+ MemoryContextReset(opCtx);
+}
+
+static void
+out_target(StringInfo buf, RelFileNode node)
+{
+ appendStringInfo(buf, "rel %u/%u/%u ",
+ node.spcNode, node.dbNode, node.relNode);
+}
+
+void
+spg_desc(StringInfo buf, uint8 xl_info, char *rec)
+{
+ uint8 info = xl_info & ~XLR_INFO_MASK;
+
+ switch (info)
+ {
+ case XLOG_SPGIST_CREATE_INDEX:
+ appendStringInfo(buf, "create_index: rel %u/%u/%u",
+ ((RelFileNode *) rec)->spcNode,
+ ((RelFileNode *) rec)->dbNode,
+ ((RelFileNode *) rec)->relNode);
+ break;
+ case XLOG_SPGIST_ADD_LEAF:
+ out_target(buf, ((spgxlogAddLeaf *) rec)->node);
+ appendStringInfo(buf, "add leaf to page: %u",
+ ((spgxlogAddLeaf *) rec)->blknoLeaf);
+ break;
+ case XLOG_SPGIST_MOVE_LEAFS:
+ out_target(buf, ((spgxlogMoveLeafs *) rec)->node);
+ appendStringInfo(buf, "move %u leafs from page %u to page %u",
+ ((spgxlogMoveLeafs *) rec)->nMoves,
+ ((spgxlogMoveLeafs *) rec)->blknoSrc,
+ ((spgxlogMoveLeafs *) rec)->blknoDst);
+ break;
+ case XLOG_SPGIST_ADD_NODE:
+ out_target(buf, ((spgxlogAddNode *) rec)->node);
+ appendStringInfo(buf, "add node to %u:%u",
+ ((spgxlogAddNode *) rec)->blkno,
+ ((spgxlogAddNode *) rec)->offnum);
+ break;
+ case XLOG_SPGIST_SPLIT_TUPLE:
+ out_target(buf, ((spgxlogSplitTuple *) rec)->node);
+ appendStringInfo(buf, "split node %u:%u to %u:%u",
+ ((spgxlogSplitTuple *) rec)->blknoPrefix,
+ ((spgxlogSplitTuple *) rec)->offnumPrefix,
+ ((spgxlogSplitTuple *) rec)->blknoPostfix,
+ ((spgxlogSplitTuple *) rec)->offnumPostfix);
+ break;
+ case XLOG_SPGIST_PICKSPLIT:
+ out_target(buf, ((spgxlogPickSplit *) rec)->node);
+ appendStringInfo(buf, "split leaf page");
+ break;
+ case XLOG_SPGIST_VACUUM_LEAF:
+ out_target(buf, ((spgxlogVacuumLeaf *) rec)->node);
+ appendStringInfo(buf, "vacuum leaf tuples on page %u",
+ ((spgxlogVacuumLeaf *) rec)->blkno);
+ break;
+ case XLOG_SPGIST_VACUUM_ROOT:
+ out_target(buf, ((spgxlogVacuumRoot *) rec)->node);
+ appendStringInfo(buf, "vacuum leaf tuples on root page");
+ break;
+ case XLOG_SPGIST_VACUUM_REDIRECT:
+ out_target(buf, ((spgxlogVacuumRedirect *) rec)->node);
+ appendStringInfo(buf, "vacuum redirect tuples on page %u",
+ ((spgxlogVacuumRedirect *) rec)->blkno);
+ break;
+ default:
+ appendStringInfo(buf, "unknown spgist op code %u", info);
+ break;
+ }
+}
+
+void
+spg_xlog_startup(void)
+{
+ opCtx = AllocSetContextCreate(CurrentMemoryContext,
+ "SP-GiST temporary context",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+}
+
+void
+spg_xlog_cleanup(void)
+{
+ MemoryContextDelete(opCtx);
+ opCtx = NULL;
+}