aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/spgist
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/spgist')
-rw-r--r--src/backend/access/spgist/spgdoinsert.c243
-rw-r--r--src/backend/access/spgist/spginsert.c17
-rw-r--r--src/backend/access/spgist/spgvacuum.c72
-rw-r--r--src/backend/access/spgist/spgxlog.c335
4 files changed, 325 insertions, 342 deletions
diff --git a/src/backend/access/spgist/spgdoinsert.c b/src/backend/access/spgist/spgdoinsert.c
index 21a071ab199..1a17cc467ed 100644
--- a/src/backend/access/spgist/spgdoinsert.c
+++ b/src/backend/access/spgist/spgdoinsert.c
@@ -16,8 +16,8 @@
#include "postgres.h"
#include "access/genam.h"
-#include "access/xloginsert.h"
#include "access/spgist_private.h"
+#include "access/xloginsert.h"
#include "miscadmin.h"
#include "storage/bufmgr.h"
#include "utils/rel.h"
@@ -202,25 +202,17 @@ static void
addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
SPPageDesc *current, SPPageDesc *parent, bool isNulls, bool isNew)
{
- XLogRecData rdata[4];
spgxlogAddLeaf xlrec;
- xlrec.node = index->rd_node;
- xlrec.blknoLeaf = current->blkno;
xlrec.newPage = isNew;
xlrec.storesNulls = isNulls;
/* these will be filled below as needed */
xlrec.offnumLeaf = InvalidOffsetNumber;
xlrec.offnumHeadLeaf = InvalidOffsetNumber;
- xlrec.blknoParent = InvalidBlockNumber;
xlrec.offnumParent = InvalidOffsetNumber;
xlrec.nodeI = 0;
- ACCEPT_RDATA_DATA(&xlrec, sizeof(xlrec), 0);
- ACCEPT_RDATA_DATA(leafTuple, leafTuple->size, 1);
- ACCEPT_RDATA_BUFFER(current->buffer, 2);
-
START_CRIT_SECTION();
if (current->offnum == InvalidOffsetNumber ||
@@ -237,13 +229,10 @@ addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
/* Must update parent's downlink if any */
if (parent->buffer != InvalidBuffer)
{
- xlrec.blknoParent = parent->blkno;
xlrec.offnumParent = parent->offnum;
xlrec.nodeI = parent->node;
saveNodeLink(index, parent, current->blkno, current->offnum);
-
- ACCEPT_RDATA_BUFFER(parent->buffer, 3);
}
}
else
@@ -303,12 +292,20 @@ addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
{
XLogRecPtr recptr;
- recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_LEAF, rdata);
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+ XLogRegisterData((char *) leafTuple, leafTuple->size);
+
+ XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
+ if (xlrec.offnumParent != InvalidOffsetNumber)
+ XLogRegisterBuffer(1, parent->buffer, REGBUF_STANDARD);
+
+ recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_LEAF);
PageSetLSN(current->page, recptr);
/* update parent only if we actually changed it */
- if (xlrec.blknoParent != InvalidBlockNumber)
+ if (xlrec.offnumParent != InvalidOffsetNumber)
{
PageSetLSN(parent->page, recptr);
}
@@ -399,7 +396,6 @@ moveLeafs(Relation index, SpGistState *state,
OffsetNumber *toDelete;
OffsetNumber *toInsert;
BlockNumber nblkno;
- XLogRecData rdata[7];
spgxlogMoveLeafs xlrec;
char *leafdata,
*leafptr;
@@ -455,20 +451,6 @@ moveLeafs(Relation index, SpGistState *state,
nblkno = BufferGetBlockNumber(nbuf);
Assert(nblkno != current->blkno);
- /* prepare WAL info */
- xlrec.node = index->rd_node;
- STORE_STATE(state, xlrec.stateSrc);
-
- xlrec.blknoSrc = current->blkno;
- xlrec.blknoDst = nblkno;
- xlrec.nMoves = nDelete;
- xlrec.replaceDead = replaceDead;
- xlrec.storesNulls = isNulls;
-
- xlrec.blknoParent = parent->blkno;
- xlrec.offnumParent = parent->offnum;
- xlrec.nodeI = parent->node;
-
leafdata = leafptr = palloc(size);
START_CRIT_SECTION();
@@ -533,15 +515,29 @@ moveLeafs(Relation index, SpGistState *state,
{
XLogRecPtr recptr;
- ACCEPT_RDATA_DATA(&xlrec, SizeOfSpgxlogMoveLeafs, 0);
- ACCEPT_RDATA_DATA(toDelete, sizeof(OffsetNumber) * nDelete, 1);
- ACCEPT_RDATA_DATA(toInsert, sizeof(OffsetNumber) * nInsert, 2);
- ACCEPT_RDATA_DATA(leafdata, leafptr - leafdata, 3);
- ACCEPT_RDATA_BUFFER(current->buffer, 4);
- ACCEPT_RDATA_BUFFER(nbuf, 5);
- ACCEPT_RDATA_BUFFER(parent->buffer, 6);
+ /* prepare WAL info */
+ STORE_STATE(state, xlrec.stateSrc);
- recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_MOVE_LEAFS, rdata);
+ xlrec.nMoves = nDelete;
+ xlrec.replaceDead = replaceDead;
+ xlrec.storesNulls = isNulls;
+
+ xlrec.offnumParent = parent->offnum;
+ xlrec.nodeI = parent->node;
+
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfSpgxlogMoveLeafs);
+ XLogRegisterData((char *) toDelete,
+ sizeof(OffsetNumber) * nDelete);
+ XLogRegisterData((char *) toInsert,
+ sizeof(OffsetNumber) * nInsert);
+ XLogRegisterData((char *) leafdata, leafptr - leafdata);
+
+ XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
+ XLogRegisterBuffer(1, nbuf, REGBUF_STANDARD | (xlrec.newPage ? REGBUF_WILL_INIT : 0));
+ XLogRegisterBuffer(2, parent->buffer, REGBUF_STANDARD);
+
+ recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_MOVE_LEAFS);
PageSetLSN(current->page, recptr);
PageSetLSN(npage, recptr);
@@ -701,8 +697,6 @@ doPickSplit(Relation index, SpGistState *state,
int currentFreeSpace;
int totalLeafSizes;
bool allTheSame;
- XLogRecData rdata[10];
- int nRdata;
spgxlogPickSplit xlrec;
char *leafdata,
*leafptr;
@@ -725,7 +719,6 @@ doPickSplit(Relation index, SpGistState *state,
newLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n);
leafPageSelect = (uint8 *) palloc(sizeof(uint8) * n);
- xlrec.node = index->rd_node;
STORE_STATE(state, xlrec.stateSrc);
/*
@@ -971,10 +964,6 @@ doPickSplit(Relation index, SpGistState *state,
}
/*
- * Because a WAL record can't involve more than four buffers, we can only
- * afford to deal with two leaf pages in each picksplit action, ie the
- * current page and at most one other.
- *
* The new leaf tuples converted from the existing ones should require the
* same or less space, and therefore should all fit onto one page
* (although that's not necessarily the current page, since we can't
@@ -1108,17 +1097,13 @@ doPickSplit(Relation index, SpGistState *state,
}
/* Start preparing WAL record */
- xlrec.blknoSrc = current->blkno;
- xlrec.blknoDest = InvalidBlockNumber;
xlrec.nDelete = 0;
xlrec.initSrc = isNew;
xlrec.storesNulls = isNulls;
+ xlrec.isRootSplit = SpGistBlockIsRoot(current->blkno);
leafdata = leafptr = (char *) palloc(totalLeafSizes);
- ACCEPT_RDATA_DATA(&xlrec, SizeOfSpgxlogPickSplit, 0);
- nRdata = 1;
-
/* Here we begin making the changes to the target pages */
START_CRIT_SECTION();
@@ -1150,12 +1135,6 @@ doPickSplit(Relation index, SpGistState *state,
else
{
xlrec.nDelete = nToDelete;
- ACCEPT_RDATA_DATA(toDelete,
- sizeof(OffsetNumber) * nToDelete,
- nRdata);
- nRdata++;
- ACCEPT_RDATA_BUFFER(current->buffer, nRdata);
- nRdata++;
if (!state->isBuild)
{
@@ -1240,25 +1219,8 @@ doPickSplit(Relation index, SpGistState *state,
if (newLeafBuffer != InvalidBuffer)
{
MarkBufferDirty(newLeafBuffer);
- /* also save block number for WAL */
- xlrec.blknoDest = BufferGetBlockNumber(newLeafBuffer);
- if (!xlrec.initDest)
- {
- ACCEPT_RDATA_BUFFER(newLeafBuffer, nRdata);
- nRdata++;
- }
}
- xlrec.nInsert = nToInsert;
- ACCEPT_RDATA_DATA(toInsert, sizeof(OffsetNumber) * nToInsert, nRdata);
- nRdata++;
- ACCEPT_RDATA_DATA(leafPageSelect, sizeof(uint8) * nToInsert, nRdata);
- nRdata++;
- ACCEPT_RDATA_DATA(innerTuple, innerTuple->size, nRdata);
- nRdata++;
- ACCEPT_RDATA_DATA(leafdata, leafptr - leafdata, nRdata);
- nRdata++;
-
/* Remember current buffer, since we're about to change "current" */
saveCurrent = *current;
@@ -1276,7 +1238,6 @@ doPickSplit(Relation index, SpGistState *state,
current->blkno = parent->blkno;
current->buffer = parent->buffer;
current->page = parent->page;
- xlrec.blknoInner = current->blkno;
xlrec.offnumInner = current->offnum =
SpGistPageAddNewItem(state, current->page,
(Item) innerTuple, innerTuple->size,
@@ -1285,14 +1246,11 @@ doPickSplit(Relation index, SpGistState *state,
/*
* Update parent node link and mark parent page dirty
*/
- xlrec.blknoParent = parent->blkno;
+ xlrec.innerIsParent = true;
xlrec.offnumParent = parent->offnum;
xlrec.nodeI = parent->node;
saveNodeLink(index, parent, current->blkno, current->offnum);
- ACCEPT_RDATA_BUFFER(parent->buffer, nRdata);
- nRdata++;
-
/*
* Update redirection link (in old current buffer)
*/
@@ -1314,7 +1272,6 @@ doPickSplit(Relation index, SpGistState *state,
current->buffer = newInnerBuffer;
current->blkno = BufferGetBlockNumber(current->buffer);
current->page = BufferGetPage(current->buffer);
- xlrec.blknoInner = current->blkno;
xlrec.offnumInner = current->offnum =
SpGistPageAddNewItem(state, current->page,
(Item) innerTuple, innerTuple->size,
@@ -1326,16 +1283,11 @@ doPickSplit(Relation index, SpGistState *state,
/*
* Update parent node link and mark parent page dirty
*/
- xlrec.blknoParent = parent->blkno;
+ xlrec.innerIsParent = (parent->buffer == current->buffer);
xlrec.offnumParent = parent->offnum;
xlrec.nodeI = parent->node;
saveNodeLink(index, parent, current->blkno, current->offnum);
- ACCEPT_RDATA_BUFFER(current->buffer, nRdata);
- nRdata++;
- ACCEPT_RDATA_BUFFER(parent->buffer, nRdata);
- nRdata++;
-
/*
* Update redirection link (in old current buffer)
*/
@@ -1357,8 +1309,8 @@ doPickSplit(Relation index, SpGistState *state,
SpGistInitBuffer(current->buffer, (isNulls ? SPGIST_NULLS : 0));
xlrec.initInner = true;
+ xlrec.innerIsParent = false;
- xlrec.blknoInner = current->blkno;
xlrec.offnumInner = current->offnum =
PageAddItem(current->page, (Item) innerTuple, innerTuple->size,
InvalidOffsetNumber, false, false);
@@ -1367,7 +1319,6 @@ doPickSplit(Relation index, SpGistState *state,
innerTuple->size);
/* No parent link to update, nor redirection to do */
- xlrec.blknoParent = InvalidBlockNumber;
xlrec.offnumParent = InvalidOffsetNumber;
xlrec.nodeI = 0;
@@ -1381,9 +1332,46 @@ doPickSplit(Relation index, SpGistState *state,
if (RelationNeedsWAL(index))
{
XLogRecPtr recptr;
+ int flags;
+
+ XLogBeginInsert();
+
+ xlrec.nInsert = nToInsert;
+ XLogRegisterData((char *) &xlrec, SizeOfSpgxlogPickSplit);
+
+ XLogRegisterData((char *) toDelete,
+ sizeof(OffsetNumber) * xlrec.nDelete);
+ XLogRegisterData((char *) toInsert,
+ sizeof(OffsetNumber) * xlrec.nInsert);
+ XLogRegisterData((char *) leafPageSelect,
+ sizeof(uint8) * xlrec.nInsert);
+ XLogRegisterData((char *) innerTuple, innerTuple->size);
+ XLogRegisterData(leafdata, leafptr - leafdata);
+
+ flags = REGBUF_STANDARD;
+ if (xlrec.initSrc)
+ flags |= REGBUF_WILL_INIT;
+ if (BufferIsValid(saveCurrent.buffer))
+ XLogRegisterBuffer(0, saveCurrent.buffer, flags);
+
+ if (BufferIsValid(newLeafBuffer))
+ {
+ flags = REGBUF_STANDARD;
+ if (xlrec.initDest)
+ flags |= REGBUF_WILL_INIT;
+ XLogRegisterBuffer(1, newLeafBuffer, flags);
+ }
+ XLogRegisterBuffer(2, current->buffer, REGBUF_STANDARD);
+ if (parent->buffer != InvalidBuffer)
+ {
+ if (parent->buffer != current->buffer)
+ XLogRegisterBuffer(3, parent->buffer, REGBUF_STANDARD);
+ else
+ Assert(xlrec.innerIsParent);
+ }
/* Issue the WAL record */
- recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_PICKSPLIT, rdata);
+ recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_PICKSPLIT);
/* Update page LSNs on all affected pages */
if (newLeafBuffer != InvalidBuffer)
@@ -1489,7 +1477,6 @@ spgAddNodeAction(Relation index, SpGistState *state,
int nodeN, Datum nodeLabel)
{
SpGistInnerTuple newInnerTuple;
- XLogRecData rdata[5];
spgxlogAddNode xlrec;
/* Should not be applied to nulls */
@@ -1499,25 +1486,18 @@ spgAddNodeAction(Relation index, SpGistState *state,
newInnerTuple = addNode(state, innerTuple, nodeLabel, nodeN);
/* Prepare WAL record */
- xlrec.node = index->rd_node;
STORE_STATE(state, xlrec.stateSrc);
- xlrec.blkno = current->blkno;
xlrec.offnum = current->offnum;
/* we don't fill these unless we need to change the parent downlink */
- xlrec.blknoParent = InvalidBlockNumber;
+ xlrec.parentBlk = -1;
xlrec.offnumParent = InvalidOffsetNumber;
xlrec.nodeI = 0;
/* we don't fill these unless tuple has to be moved */
- xlrec.blknoNew = InvalidBlockNumber;
xlrec.offnumNew = InvalidOffsetNumber;
xlrec.newPage = false;
- ACCEPT_RDATA_DATA(&xlrec, sizeof(xlrec), 0);
- ACCEPT_RDATA_DATA(newInnerTuple, newInnerTuple->size, 1);
- ACCEPT_RDATA_BUFFER(current->buffer, 2);
-
if (PageGetExactFreeSpace(current->page) >=
newInnerTuple->size - innerTuple->size)
{
@@ -1539,7 +1519,13 @@ spgAddNodeAction(Relation index, SpGistState *state,
{
XLogRecPtr recptr;
- recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE, rdata);
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+ XLogRegisterData((char *) newInnerTuple, newInnerTuple->size);
+
+ XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
+
+ recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE);
PageSetLSN(current->page, recptr);
}
@@ -1565,7 +1551,6 @@ spgAddNodeAction(Relation index, SpGistState *state,
saveCurrent = *current;
- xlrec.blknoParent = parent->blkno;
xlrec.offnumParent = parent->offnum;
xlrec.nodeI = parent->node;
@@ -1580,8 +1565,6 @@ spgAddNodeAction(Relation index, SpGistState *state,
current->blkno = BufferGetBlockNumber(current->buffer);
current->page = BufferGetPage(current->buffer);
- xlrec.blknoNew = current->blkno;
-
/*
* Let's just make real sure new current isn't same as old. Right now
* that's impossible, but if SpGistGetBuffer ever got smart enough to
@@ -1590,17 +1573,19 @@ spgAddNodeAction(Relation index, SpGistState *state,
* replay would be subtly wrong, so I think a mere assert isn't enough
* here.
*/
- if (xlrec.blknoNew == xlrec.blkno)
+ if (current->blkno == saveCurrent.blkno)
elog(ERROR, "SPGiST new buffer shouldn't be same as old buffer");
/*
* New current and parent buffer will both be modified; but note that
* parent buffer could be same as either new or old current.
*/
- ACCEPT_RDATA_BUFFER(current->buffer, 3);
- if (parent->buffer != current->buffer &&
- parent->buffer != saveCurrent.buffer)
- ACCEPT_RDATA_BUFFER(parent->buffer, 4);
+ if (parent->buffer == saveCurrent.buffer)
+ xlrec.parentBlk = 0;
+ else if (parent->buffer == current->buffer)
+ xlrec.parentBlk = 1;
+ else
+ xlrec.parentBlk = 2;
START_CRIT_SECTION();
@@ -1647,7 +1632,20 @@ spgAddNodeAction(Relation index, SpGistState *state,
{
XLogRecPtr recptr;
- recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE, rdata);
+ XLogBeginInsert();
+
+ /* orig page */
+ XLogRegisterBuffer(0, saveCurrent.buffer, REGBUF_STANDARD);
+ /* new page */
+ XLogRegisterBuffer(1, current->buffer, REGBUF_STANDARD);
+ /* parent page (if different from orig and new) */
+ if (xlrec.parentBlk == 2)
+ XLogRegisterBuffer(2, parent->buffer, REGBUF_STANDARD);
+
+ XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+ XLogRegisterData((char *) newInnerTuple, newInnerTuple->size);
+
+ recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE);
/* we don't bother to check if any of these are redundant */
PageSetLSN(current->page, recptr);
@@ -1682,7 +1680,6 @@ spgSplitNodeAction(Relation index, SpGistState *state,
BlockNumber postfixBlkno;
OffsetNumber postfixOffset;
int i;
- XLogRecData rdata[5];
spgxlogSplitTuple xlrec;
Buffer newBuffer = InvalidBuffer;
@@ -1725,14 +1722,8 @@ spgSplitNodeAction(Relation index, SpGistState *state,
postfixTuple->allTheSame = innerTuple->allTheSame;
/* prep data for WAL record */
- xlrec.node = index->rd_node;
xlrec.newPage = false;
- ACCEPT_RDATA_DATA(&xlrec, sizeof(xlrec), 0);
- ACCEPT_RDATA_DATA(prefixTuple, prefixTuple->size, 1);
- ACCEPT_RDATA_DATA(postfixTuple, postfixTuple->size, 2);
- ACCEPT_RDATA_BUFFER(current->buffer, 3);
-
/*
* If we can't fit both tuples on the current page, get a new page for the
* postfix tuple. In particular, can't split to the root page.
@@ -1752,7 +1743,6 @@ spgSplitNodeAction(Relation index, SpGistState *state,
GBUF_INNER_PARITY(current->blkno + 1),
postfixTuple->size + sizeof(ItemIdData),
&xlrec.newPage);
- ACCEPT_RDATA_BUFFER(newBuffer, 4);
}
START_CRIT_SECTION();
@@ -1767,27 +1757,28 @@ spgSplitNodeAction(Relation index, SpGistState *state,
if (xlrec.offnumPrefix != current->offnum)
elog(ERROR, "failed to add item of size %u to SPGiST index page",
prefixTuple->size);
- xlrec.blknoPrefix = current->blkno;
/*
* put postfix tuple into appropriate page
*/
if (newBuffer == InvalidBuffer)
{
- xlrec.blknoPostfix = postfixBlkno = current->blkno;
+ postfixBlkno = current->blkno;
xlrec.offnumPostfix = postfixOffset =
SpGistPageAddNewItem(state, current->page,
(Item) postfixTuple, postfixTuple->size,
NULL, false);
+ xlrec.postfixBlkSame = true;
}
else
{
- xlrec.blknoPostfix = postfixBlkno = BufferGetBlockNumber(newBuffer);
+ postfixBlkno = BufferGetBlockNumber(newBuffer);
xlrec.offnumPostfix = postfixOffset =
SpGistPageAddNewItem(state, BufferGetPage(newBuffer),
(Item) postfixTuple, postfixTuple->size,
NULL, false);
MarkBufferDirty(newBuffer);
+ xlrec.postfixBlkSame = false;
}
/*
@@ -1808,7 +1799,23 @@ spgSplitNodeAction(Relation index, SpGistState *state,
{
XLogRecPtr recptr;
- recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_SPLIT_TUPLE, rdata);
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+ XLogRegisterData((char *) prefixTuple, prefixTuple->size);
+ XLogRegisterData((char *) postfixTuple, postfixTuple->size);
+
+ XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
+ if (newBuffer != InvalidBuffer)
+ {
+ int flags;
+
+ flags = REGBUF_STANDARD;
+ if (xlrec.newPage)
+ flags |= REGBUF_WILL_INIT;
+ XLogRegisterBuffer(1, newBuffer, flags);
+ }
+
+ recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_SPLIT_TUPLE);
PageSetLSN(current->page, recptr);
diff --git a/src/backend/access/spgist/spginsert.c b/src/backend/access/spgist/spginsert.c
index e1dfc8e3580..f168ac5c5cf 100644
--- a/src/backend/access/spgist/spginsert.c
+++ b/src/backend/access/spgist/spginsert.c
@@ -105,15 +105,18 @@ spgbuild(PG_FUNCTION_ARGS)
if (RelationNeedsWAL(index))
{
XLogRecPtr recptr;
- XLogRecData rdata;
- /* WAL data is just the relfilenode */
- rdata.data = (char *) &(index->rd_node);
- rdata.len = sizeof(RelFileNode);
- rdata.buffer = InvalidBuffer;
- rdata.next = NULL;
+ XLogBeginInsert();
- recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_CREATE_INDEX, &rdata);
+ /*
+ * Replay will re-initialize the pages, so don't take full pages
+ * images. No other data to log.
+ */
+ XLogRegisterBuffer(0, metabuffer, REGBUF_WILL_INIT);
+ XLogRegisterBuffer(1, rootbuffer, REGBUF_WILL_INIT | REGBUF_STANDARD);
+ XLogRegisterBuffer(2, nullbuffer, REGBUF_WILL_INIT | REGBUF_STANDARD);
+
+ recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_CREATE_INDEX);
PageSetLSN(BufferGetPage(metabuffer), recptr);
PageSetLSN(BufferGetPage(rootbuffer), recptr);
diff --git a/src/backend/access/spgist/spgvacuum.c b/src/backend/access/spgist/spgvacuum.c
index 2e05d22b749..c95b80b5c7c 100644
--- a/src/backend/access/spgist/spgvacuum.c
+++ b/src/backend/access/spgist/spgvacuum.c
@@ -127,7 +127,6 @@ vacuumLeafPage(spgBulkDeleteState *bds, Relation index, Buffer buffer,
{
Page page = BufferGetPage(buffer);
spgxlogVacuumLeaf xlrec;
- XLogRecData rdata[8];
OffsetNumber toDead[MaxIndexTuplesPerPage];
OffsetNumber toPlaceholder[MaxIndexTuplesPerPage];
OffsetNumber moveSrc[MaxIndexTuplesPerPage];
@@ -323,20 +322,6 @@ vacuumLeafPage(spgBulkDeleteState *bds, Relation index, Buffer buffer,
if (nDeletable != xlrec.nDead + xlrec.nPlaceholder + xlrec.nMove)
elog(ERROR, "inconsistent counts of deletable tuples");
- /* Prepare WAL record */
- xlrec.node = index->rd_node;
- xlrec.blkno = BufferGetBlockNumber(buffer);
- STORE_STATE(&bds->spgstate, xlrec.stateSrc);
-
- ACCEPT_RDATA_DATA(&xlrec, SizeOfSpgxlogVacuumLeaf, 0);
- ACCEPT_RDATA_DATA(toDead, sizeof(OffsetNumber) * xlrec.nDead, 1);
- ACCEPT_RDATA_DATA(toPlaceholder, sizeof(OffsetNumber) * xlrec.nPlaceholder, 2);
- ACCEPT_RDATA_DATA(moveSrc, sizeof(OffsetNumber) * xlrec.nMove, 3);
- ACCEPT_RDATA_DATA(moveDest, sizeof(OffsetNumber) * xlrec.nMove, 4);
- ACCEPT_RDATA_DATA(chainSrc, sizeof(OffsetNumber) * xlrec.nChain, 5);
- ACCEPT_RDATA_DATA(chainDest, sizeof(OffsetNumber) * xlrec.nChain, 6);
- ACCEPT_RDATA_BUFFER(buffer, 7);
-
/* Do the updates */
START_CRIT_SECTION();
@@ -389,7 +374,22 @@ vacuumLeafPage(spgBulkDeleteState *bds, Relation index, Buffer buffer,
{
XLogRecPtr recptr;
- recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_VACUUM_LEAF, rdata);
+ XLogBeginInsert();
+
+ STORE_STATE(&bds->spgstate, xlrec.stateSrc);
+
+ XLogRegisterData((char *) &xlrec, SizeOfSpgxlogVacuumLeaf);
+ /* sizeof(xlrec) should be a multiple of sizeof(OffsetNumber) */
+ XLogRegisterData((char *) toDead, sizeof(OffsetNumber) * xlrec.nDead);
+ XLogRegisterData((char *) toPlaceholder, sizeof(OffsetNumber) * xlrec.nPlaceholder);
+ XLogRegisterData((char *) moveSrc, sizeof(OffsetNumber) * xlrec.nMove);
+ XLogRegisterData((char *) moveDest, sizeof(OffsetNumber) * xlrec.nMove);
+ XLogRegisterData((char *) chainSrc, sizeof(OffsetNumber) * xlrec.nChain);
+ XLogRegisterData((char *) chainDest, sizeof(OffsetNumber) * xlrec.nChain);
+
+ XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
+
+ recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_VACUUM_LEAF);
PageSetLSN(page, recptr);
}
@@ -407,12 +407,10 @@ vacuumLeafRoot(spgBulkDeleteState *bds, Relation index, Buffer buffer)
{
Page page = BufferGetPage(buffer);
spgxlogVacuumRoot xlrec;
- XLogRecData rdata[3];
OffsetNumber toDelete[MaxIndexTuplesPerPage];
OffsetNumber i,
max = PageGetMaxOffsetNumber(page);
- xlrec.blkno = BufferGetBlockNumber(buffer);
xlrec.nDelete = 0;
/* Scan page, identify tuples to delete, accumulate stats */
@@ -448,15 +446,6 @@ vacuumLeafRoot(spgBulkDeleteState *bds, Relation index, Buffer buffer)
if (xlrec.nDelete == 0)
return; /* nothing more to do */
- /* Prepare WAL record */
- xlrec.node = index->rd_node;
- STORE_STATE(&bds->spgstate, xlrec.stateSrc);
-
- ACCEPT_RDATA_DATA(&xlrec, SizeOfSpgxlogVacuumRoot, 0);
- /* sizeof(xlrec) should be a multiple of sizeof(OffsetNumber) */
- ACCEPT_RDATA_DATA(toDelete, sizeof(OffsetNumber) * xlrec.nDelete, 1);
- ACCEPT_RDATA_BUFFER(buffer, 2);
-
/* Do the update */
START_CRIT_SECTION();
@@ -469,7 +458,19 @@ vacuumLeafRoot(spgBulkDeleteState *bds, Relation index, Buffer buffer)
{
XLogRecPtr recptr;
- recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_VACUUM_ROOT, rdata);
+ XLogBeginInsert();
+
+ /* Prepare WAL record */
+ STORE_STATE(&bds->spgstate, xlrec.stateSrc);
+
+ XLogRegisterData((char *) &xlrec, SizeOfSpgxlogVacuumRoot);
+ /* sizeof(xlrec) should be a multiple of sizeof(OffsetNumber) */
+ XLogRegisterData((char *) toDelete,
+ sizeof(OffsetNumber) * xlrec.nDelete);
+
+ XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
+
+ recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_VACUUM_ROOT);
PageSetLSN(page, recptr);
}
@@ -499,10 +500,7 @@ vacuumRedirectAndPlaceholder(Relation index, Buffer buffer)
OffsetNumber itemToPlaceholder[MaxIndexTuplesPerPage];
OffsetNumber itemnos[MaxIndexTuplesPerPage];
spgxlogVacuumRedirect xlrec;
- XLogRecData rdata[3];
- xlrec.node = index->rd_node;
- xlrec.blkno = BufferGetBlockNumber(buffer);
xlrec.nToPlaceholder = 0;
xlrec.newestRedirectXid = InvalidTransactionId;
@@ -585,11 +583,15 @@ vacuumRedirectAndPlaceholder(Relation index, Buffer buffer)
{
XLogRecPtr recptr;
- ACCEPT_RDATA_DATA(&xlrec, SizeOfSpgxlogVacuumRedirect, 0);
- ACCEPT_RDATA_DATA(itemToPlaceholder, sizeof(OffsetNumber) * xlrec.nToPlaceholder, 1);
- ACCEPT_RDATA_BUFFER(buffer, 2);
+ XLogBeginInsert();
+
+ XLogRegisterData((char *) &xlrec, SizeOfSpgxlogVacuumRedirect);
+ XLogRegisterData((char *) itemToPlaceholder,
+ sizeof(OffsetNumber) * xlrec.nToPlaceholder);
+
+ XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
- recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_VACUUM_REDIRECT, rdata);
+ recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_VACUUM_REDIRECT);
PageSetLSN(page, recptr);
}
diff --git a/src/backend/access/spgist/spgxlog.c b/src/backend/access/spgist/spgxlog.c
index 920739436ac..ac6d4bd369a 100644
--- a/src/backend/access/spgist/spgxlog.c
+++ b/src/backend/access/spgist/spgxlog.c
@@ -71,33 +71,30 @@ addOrReplaceTuple(Page page, Item tuple, int size, OffsetNumber offset)
}
static void
-spgRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
+spgRedoCreateIndex(XLogReaderState *record)
{
- RelFileNode *node = (RelFileNode *) XLogRecGetData(record);
+ XLogRecPtr lsn = record->EndRecPtr;
Buffer buffer;
Page page;
- /* Backup blocks are not used in create_index records */
- Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
-
- buffer = XLogReadBuffer(*node, SPGIST_METAPAGE_BLKNO, true);
- Assert(BufferIsValid(buffer));
+ buffer = XLogInitBufferForRedo(record, 0);
+ Assert(BufferGetBlockNumber(buffer) == SPGIST_METAPAGE_BLKNO);
page = (Page) BufferGetPage(buffer);
SpGistInitMetapage(page);
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
UnlockReleaseBuffer(buffer);
- buffer = XLogReadBuffer(*node, SPGIST_ROOT_BLKNO, true);
- Assert(BufferIsValid(buffer));
+ buffer = XLogInitBufferForRedo(record, 1);
+ Assert(BufferGetBlockNumber(buffer) == SPGIST_ROOT_BLKNO);
SpGistInitBuffer(buffer, SPGIST_LEAF);
page = (Page) BufferGetPage(buffer);
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
UnlockReleaseBuffer(buffer);
- buffer = XLogReadBuffer(*node, SPGIST_NULL_BLKNO, true);
- Assert(BufferIsValid(buffer));
+ buffer = XLogInitBufferForRedo(record, 2);
+ Assert(BufferGetBlockNumber(buffer) == SPGIST_NULL_BLKNO);
SpGistInitBuffer(buffer, SPGIST_LEAF | SPGIST_NULLS);
page = (Page) BufferGetPage(buffer);
PageSetLSN(page, lsn);
@@ -106,8 +103,9 @@ spgRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
}
static void
-spgRedoAddLeaf(XLogRecPtr lsn, XLogRecord *record)
+spgRedoAddLeaf(XLogReaderState *record)
{
+ XLogRecPtr lsn = record->EndRecPtr;
char *ptr = XLogRecGetData(record);
spgxlogAddLeaf *xldata = (spgxlogAddLeaf *) ptr;
char *leafTuple;
@@ -128,15 +126,13 @@ spgRedoAddLeaf(XLogRecPtr lsn, XLogRecord *record)
*/
if (xldata->newPage)
{
- buffer = XLogReadBuffer(xldata->node, xldata->blknoLeaf, true);
+ buffer = XLogInitBufferForRedo(record, 0);
SpGistInitBuffer(buffer,
SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
action = BLK_NEEDS_REDO;
}
else
- action = XLogReadBufferForRedo(lsn, record, 0,
- xldata->node, xldata->blknoLeaf,
- &buffer);
+ action = XLogReadBufferForRedo(record, 0, &buffer);
if (action == BLK_NEEDS_REDO)
{
@@ -164,7 +160,8 @@ spgRedoAddLeaf(XLogRecPtr lsn, XLogRecord *record)
{
/* replacing a DEAD tuple */
PageIndexTupleDelete(page, xldata->offnumLeaf);
- if (PageAddItem(page, (Item) leafTuple, leafTupleHdr.size,
+ if (PageAddItem(page,
+ (Item) leafTuple, leafTupleHdr.size,
xldata->offnumLeaf, false, false) != xldata->offnumLeaf)
elog(ERROR, "failed to add item of size %u to SPGiST index page",
leafTupleHdr.size);
@@ -177,13 +174,14 @@ spgRedoAddLeaf(XLogRecPtr lsn, XLogRecord *record)
UnlockReleaseBuffer(buffer);
/* update parent downlink if necessary */
- if (xldata->blknoParent != InvalidBlockNumber)
+ if (xldata->offnumParent != InvalidOffsetNumber)
{
- if (XLogReadBufferForRedo(lsn, record, 1,
- xldata->node, xldata->blknoParent,
- &buffer) == BLK_NEEDS_REDO)
+ if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
{
SpGistInnerTuple tuple;
+ BlockNumber blknoLeaf;
+
+ XLogRecGetBlockTag(record, 0, NULL, NULL, &blknoLeaf);
page = BufferGetPage(buffer);
@@ -191,7 +189,7 @@ spgRedoAddLeaf(XLogRecPtr lsn, XLogRecord *record)
PageGetItemId(page, xldata->offnumParent));
spgUpdateNodeLink(tuple, xldata->nodeI,
- xldata->blknoLeaf, xldata->offnumLeaf);
+ blknoLeaf, xldata->offnumLeaf);
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
@@ -202,8 +200,9 @@ spgRedoAddLeaf(XLogRecPtr lsn, XLogRecord *record)
}
static void
-spgRedoMoveLeafs(XLogRecPtr lsn, XLogRecord *record)
+spgRedoMoveLeafs(XLogReaderState *record)
{
+ XLogRecPtr lsn = record->EndRecPtr;
char *ptr = XLogRecGetData(record);
spgxlogMoveLeafs *xldata = (spgxlogMoveLeafs *) ptr;
SpGistState state;
@@ -213,6 +212,9 @@ spgRedoMoveLeafs(XLogRecPtr lsn, XLogRecord *record)
Buffer buffer;
Page page;
XLogRedoAction action;
+ BlockNumber blknoDst;
+
+ XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoDst);
fillFakeState(&state, xldata->stateSrc);
@@ -235,15 +237,14 @@ spgRedoMoveLeafs(XLogRecPtr lsn, XLogRecord *record)
/* Insert tuples on the dest page (do first, so redirect is valid) */
if (xldata->newPage)
{
- buffer = XLogReadBuffer(xldata->node, xldata->blknoDst, true);
+ buffer = XLogInitBufferForRedo(record, 1);
SpGistInitBuffer(buffer,
SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
action = BLK_NEEDS_REDO;
}
else
- action = XLogReadBufferForRedo(lsn, record, 1,
- xldata->node, xldata->blknoDst,
- &buffer);
+ action = XLogReadBufferForRedo(record, 1, &buffer);
+
if (action == BLK_NEEDS_REDO)
{
int i;
@@ -260,7 +261,8 @@ spgRedoMoveLeafs(XLogRecPtr lsn, XLogRecord *record)
* field.
*/
leafTuple = ptr;
- memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData));
+ memcpy(&leafTupleHdr, leafTuple,
+ sizeof(SpGistLeafTupleData));
addOrReplaceTuple(page, (Item) leafTuple,
leafTupleHdr.size, toInsert[i]);
@@ -274,14 +276,14 @@ spgRedoMoveLeafs(XLogRecPtr lsn, XLogRecord *record)
UnlockReleaseBuffer(buffer);
/* Delete tuples from the source page, inserting a redirection pointer */
- if (XLogReadBufferForRedo(lsn, record, 0, xldata->node, xldata->blknoSrc,
- &buffer) == BLK_NEEDS_REDO)
+ if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
{
page = BufferGetPage(buffer);
+
spgPageIndexMultiDelete(&state, page, toDelete, xldata->nMoves,
state.isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT,
SPGIST_PLACEHOLDER,
- xldata->blknoDst,
+ blknoDst,
toInsert[nInsert - 1]);
PageSetLSN(page, lsn);
@@ -291,8 +293,7 @@ spgRedoMoveLeafs(XLogRecPtr lsn, XLogRecord *record)
UnlockReleaseBuffer(buffer);
/* And update the parent downlink */
- if (XLogReadBufferForRedo(lsn, record, 2, xldata->node, xldata->blknoParent,
- &buffer) == BLK_NEEDS_REDO)
+ if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
{
SpGistInnerTuple tuple;
@@ -302,7 +303,7 @@ spgRedoMoveLeafs(XLogRecPtr lsn, XLogRecord *record)
PageGetItemId(page, xldata->offnumParent));
spgUpdateNodeLink(tuple, xldata->nodeI,
- xldata->blknoDst, toInsert[nInsert - 1]);
+ blknoDst, toInsert[nInsert - 1]);
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
@@ -312,8 +313,9 @@ spgRedoMoveLeafs(XLogRecPtr lsn, XLogRecord *record)
}
static void
-spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record)
+spgRedoAddNode(XLogReaderState *record)
{
+ XLogRecPtr lsn = record->EndRecPtr;
char *ptr = XLogRecGetData(record);
spgxlogAddNode *xldata = (spgxlogAddNode *) ptr;
char *innerTuple;
@@ -321,7 +323,6 @@ spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record)
SpGistState state;
Buffer buffer;
Page page;
- int bbi;
XLogRedoAction action;
ptr += sizeof(spgxlogAddNode);
@@ -331,17 +332,18 @@ spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record)
fillFakeState(&state, xldata->stateSrc);
- if (xldata->blknoNew == InvalidBlockNumber)
+ if (!XLogRecHasBlockRef(record, 1))
{
/* update in place */
- Assert(xldata->blknoParent == InvalidBlockNumber);
- if (XLogReadBufferForRedo(lsn, record, 0, xldata->node, xldata->blkno,
- &buffer) == BLK_NEEDS_REDO)
+ Assert(xldata->parentBlk == -1);
+ if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
{
page = BufferGetPage(buffer);
+
PageIndexTupleDelete(page, xldata->offnum);
if (PageAddItem(page, (Item) innerTuple, innerTupleHdr.size,
- xldata->offnum, false, false) != xldata->offnum)
+ xldata->offnum,
+ false, false) != xldata->offnum)
elog(ERROR, "failed to add item of size %u to SPGiST index page",
innerTupleHdr.size);
@@ -353,30 +355,30 @@ spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record)
}
else
{
+ BlockNumber blkno;
+ BlockNumber blknoNew;
+
+ XLogRecGetBlockTag(record, 0, NULL, NULL, &blkno);
+ XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoNew);
+
/*
* In normal operation we would have all three pages (source, dest,
* and parent) locked simultaneously; but in WAL replay it should be
* safe to update them one at a time, as long as we do it in the right
- * order.
- *
- * The logic here depends on the assumption that blkno != blknoNew,
- * else we can't tell which BKP bit goes with which page, and the LSN
- * checks could go wrong too.
+ * order. We must insert the new tuple before replacing the old tuple
+ * with the redirect tuple.
*/
- Assert(xldata->blkno != xldata->blknoNew);
/* Install new tuple first so redirect is valid */
if (xldata->newPage)
{
- buffer = XLogReadBuffer(xldata->node, xldata->blknoNew, true);
/* AddNode is not used for nulls pages */
+ buffer = XLogInitBufferForRedo(record, 1);
SpGistInitBuffer(buffer, 0);
action = BLK_NEEDS_REDO;
}
else
- action = XLogReadBufferForRedo(lsn, record, 1,
- xldata->node, xldata->blknoNew,
- &buffer);
+ action = XLogReadBufferForRedo(record, 1, &buffer);
if (action == BLK_NEEDS_REDO)
{
page = BufferGetPage(buffer);
@@ -385,22 +387,26 @@ spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record)
innerTupleHdr.size, xldata->offnumNew);
/*
- * If parent is in this same page, don't advance LSN; doing so
- * would fool us into not applying the parent downlink update
- * below. We'll update the LSN when we fix the parent downlink.
+ * If parent is in this same page, update it now.
*/
- if (xldata->blknoParent != xldata->blknoNew)
+ if (xldata->parentBlk == 1)
{
- PageSetLSN(page, lsn);
+ SpGistInnerTuple parentTuple;
+
+ parentTuple = (SpGistInnerTuple) PageGetItem(page,
+ PageGetItemId(page, xldata->offnumParent));
+
+ spgUpdateNodeLink(parentTuple, xldata->nodeI,
+ blknoNew, xldata->offnumNew);
}
+ PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
if (BufferIsValid(buffer))
UnlockReleaseBuffer(buffer);
/* Delete old tuple, replacing it with redirect or placeholder tuple */
- if (XLogReadBufferForRedo(lsn, record, 0, xldata->node, xldata->blkno,
- &buffer) == BLK_NEEDS_REDO)
+ if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
{
SpGistDeadTuple dt;
@@ -412,11 +418,12 @@ spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record)
InvalidOffsetNumber);
else
dt = spgFormDeadTuple(&state, SPGIST_REDIRECT,
- xldata->blknoNew,
+ blknoNew,
xldata->offnumNew);
PageIndexTupleDelete(page, xldata->offnum);
- if (PageAddItem(page, (Item) dt, dt->size, xldata->offnum,
+ if (PageAddItem(page, (Item) dt, dt->size,
+ xldata->offnum,
false, false) != xldata->offnum)
elog(ERROR, "failed to add item of size %u to SPGiST index page",
dt->size);
@@ -427,67 +434,55 @@ spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record)
SpGistPageGetOpaque(page)->nRedirection++;
/*
- * If parent is in this same page, don't advance LSN; doing so
- * would fool us into not applying the parent downlink update
- * below. We'll update the LSN when we fix the parent downlink.
+ * If parent is in this same page, update it now.
*/
- if (xldata->blknoParent != xldata->blkno)
+ if (xldata->parentBlk == 0)
{
- PageSetLSN(page, lsn);
+ SpGistInnerTuple parentTuple;
+
+ parentTuple = (SpGistInnerTuple) PageGetItem(page,
+ PageGetItemId(page, xldata->offnumParent));
+
+ spgUpdateNodeLink(parentTuple, xldata->nodeI,
+ blknoNew, xldata->offnumNew);
}
+ PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
if (BufferIsValid(buffer))
UnlockReleaseBuffer(buffer);
/*
- * Update parent downlink. Since parent could be in either of the
- * previous two buffers, it's a bit tricky to determine which BKP bit
- * applies.
+ * Update parent downlink (if we didn't do it as part of the source or
+ * destination page update already).
*/
- if (xldata->blknoParent == xldata->blkno)
- bbi = 0;
- else if (xldata->blknoParent == xldata->blknoNew)
- bbi = 1;
- else
- bbi = 2;
-
- if (record->xl_info & XLR_BKP_BLOCK(bbi))
+ if (xldata->parentBlk == 2)
{
- if (bbi == 2) /* else we already did it */
- (void) RestoreBackupBlock(lsn, record, bbi, false, false);
- action = BLK_RESTORED;
- buffer = InvalidBuffer;
- }
- else
- {
- action = XLogReadBufferForRedo(lsn, record, bbi, xldata->node,
- xldata->blknoParent, &buffer);
- Assert(action != BLK_RESTORED);
- }
- if (action == BLK_NEEDS_REDO)
- {
- SpGistInnerTuple innerTuple;
+ if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
+ {
+ SpGistInnerTuple parentTuple;
- page = BufferGetPage(buffer);
+ page = BufferGetPage(buffer);
- innerTuple = (SpGistInnerTuple) PageGetItem(page,
+ parentTuple = (SpGistInnerTuple) PageGetItem(page,
PageGetItemId(page, xldata->offnumParent));
- spgUpdateNodeLink(innerTuple, xldata->nodeI,
- xldata->blknoNew, xldata->offnumNew);
+ spgUpdateNodeLink(parentTuple, xldata->nodeI,
+ blknoNew, xldata->offnumNew);
- PageSetLSN(page, lsn);
- MarkBufferDirty(buffer);
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(buffer);
+ }
+ if (BufferIsValid(buffer))
+ UnlockReleaseBuffer(buffer);
}
- if (BufferIsValid(buffer))
- UnlockReleaseBuffer(buffer);
}
}
static void
-spgRedoSplitTuple(XLogRecPtr lsn, XLogRecord *record)
+spgRedoSplitTuple(XLogReaderState *record)
{
+ XLogRecPtr lsn = record->EndRecPtr;
char *ptr = XLogRecGetData(record);
spgxlogSplitTuple *xldata = (spgxlogSplitTuple *) ptr;
char *prefixTuple;
@@ -496,6 +491,7 @@ spgRedoSplitTuple(XLogRecPtr lsn, XLogRecord *record)
SpGistInnerTupleData postfixTupleHdr;
Buffer buffer;
Page page;
+ XLogRedoAction action;
ptr += sizeof(spgxlogSplitTuple);
prefixTuple = ptr;
@@ -513,22 +509,17 @@ spgRedoSplitTuple(XLogRecPtr lsn, XLogRecord *record)
*/
/* insert postfix tuple first to avoid dangling link */
- if (xldata->blknoPostfix != xldata->blknoPrefix)
+ if (!xldata->postfixBlkSame)
{
- XLogRedoAction action;
-
if (xldata->newPage)
{
- buffer = XLogReadBuffer(xldata->node, xldata->blknoPostfix, true);
+ buffer = XLogInitBufferForRedo(record, 1);
/* SplitTuple is not used for nulls pages */
SpGistInitBuffer(buffer, 0);
action = BLK_NEEDS_REDO;
}
else
- action = XLogReadBufferForRedo(lsn, record, 1,
- xldata->node, xldata->blknoPostfix,
- &buffer);
-
+ action = XLogReadBufferForRedo(record, 1, &buffer);
if (action == BLK_NEEDS_REDO)
{
page = BufferGetPage(buffer);
@@ -544,18 +535,19 @@ spgRedoSplitTuple(XLogRecPtr lsn, XLogRecord *record)
}
/* now handle the original page */
- if (XLogReadBufferForRedo(lsn, record, 0, xldata->node, xldata->blknoPrefix,
- &buffer) == BLK_NEEDS_REDO)
+ if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
{
page = BufferGetPage(buffer);
+
PageIndexTupleDelete(page, xldata->offnumPrefix);
if (PageAddItem(page, (Item) prefixTuple, prefixTupleHdr.size,
xldata->offnumPrefix, false, false) != xldata->offnumPrefix)
elog(ERROR, "failed to add item of size %u to SPGiST index page",
prefixTupleHdr.size);
- if (xldata->blknoPostfix == xldata->blknoPrefix)
- addOrReplaceTuple(page, (Item) postfixTuple, postfixTupleHdr.size,
+ if (xldata->postfixBlkSame)
+ addOrReplaceTuple(page, (Item) postfixTuple,
+ postfixTupleHdr.size,
xldata->offnumPostfix);
PageSetLSN(page, lsn);
@@ -566,8 +558,9 @@ spgRedoSplitTuple(XLogRecPtr lsn, XLogRecord *record)
}
static void
-spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record)
+spgRedoPickSplit(XLogReaderState *record)
{
+ XLogRecPtr lsn = record->EndRecPtr;
char *ptr = XLogRecGetData(record);
spgxlogPickSplit *xldata = (spgxlogPickSplit *) ptr;
char *innerTuple;
@@ -578,14 +571,16 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record)
uint8 *leafPageSelect;
Buffer srcBuffer;
Buffer destBuffer;
+ Buffer innerBuffer;
Page srcPage;
Page destPage;
- Buffer innerBuffer;
Page page;
- int bbi;
int i;
+ BlockNumber blknoInner;
XLogRedoAction action;
+ XLogRecGetBlockTag(record, 2, NULL, NULL, &blknoInner);
+
fillFakeState(&state, xldata->stateSrc);
ptr += SizeOfSpgxlogPickSplit;
@@ -603,13 +598,7 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record)
/* now ptr points to the list of leaf tuples */
- /*
- * It's a bit tricky to identify which pages have been handled as
- * full-page images, so we explicitly count each referenced buffer.
- */
- bbi = 0;
-
- if (SpGistBlockIsRoot(xldata->blknoSrc))
+ if (xldata->isRootSplit)
{
/* when splitting root, we touch it only in the guise of new inner */
srcBuffer = InvalidBuffer;
@@ -618,8 +607,7 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record)
else if (xldata->initSrc)
{
/* just re-init the source page */
- srcBuffer = XLogReadBuffer(xldata->node, xldata->blknoSrc, true);
- Assert(BufferIsValid(srcBuffer));
+ srcBuffer = XLogInitBufferForRedo(record, 0);
srcPage = (Page) BufferGetPage(srcBuffer);
SpGistInitBuffer(srcBuffer,
@@ -634,9 +622,8 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record)
* inserting leaf tuples and the new inner tuple, else the added
* redirect tuple will be a dangling link.)
*/
- if (XLogReadBufferForRedo(lsn, record, bbi,
- xldata->node, xldata->blknoSrc,
- &srcBuffer) == BLK_NEEDS_REDO)
+ srcPage = NULL;
+ if (XLogReadBufferForRedo(record, 0, &srcBuffer) == BLK_NEEDS_REDO)
{
srcPage = BufferGetPage(srcBuffer);
@@ -650,7 +637,7 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record)
toDelete, xldata->nDelete,
SPGIST_REDIRECT,
SPGIST_PLACEHOLDER,
- xldata->blknoInner,
+ blknoInner,
xldata->offnumInner);
else
spgPageIndexMultiDelete(&state, srcPage,
@@ -662,15 +649,10 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record)
/* don't update LSN etc till we're done with it */
}
- else
- {
- srcPage = NULL; /* don't do any page updates */
- }
- bbi++;
}
/* try to access dest page if any */
- if (xldata->blknoDest == InvalidBlockNumber)
+ if (!XLogRecHasBlockRef(record, 1))
{
destBuffer = InvalidBuffer;
destPage = NULL;
@@ -678,8 +660,7 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record)
else if (xldata->initDest)
{
/* just re-init the dest page */
- destBuffer = XLogReadBuffer(xldata->node, xldata->blknoDest, true);
- Assert(BufferIsValid(destBuffer));
+ destBuffer = XLogInitBufferForRedo(record, 1);
destPage = (Page) BufferGetPage(destBuffer);
SpGistInitBuffer(destBuffer,
@@ -692,17 +673,10 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record)
* We could probably release the page lock immediately in the
* full-page-image case, but for safety let's hold it till later.
*/
- if (XLogReadBufferForRedo(lsn, record, bbi,
- xldata->node, xldata->blknoDest,
- &destBuffer) == BLK_NEEDS_REDO)
- {
+ if (XLogReadBufferForRedo(record, 1, &destBuffer) == BLK_NEEDS_REDO)
destPage = (Page) BufferGetPage(destBuffer);
- }
else
- {
destPage = NULL; /* don't do any page updates */
- }
- bbi++;
}
/* restore leaf tuples to src and/or dest page */
@@ -739,14 +713,12 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record)
/* restore new inner tuple */
if (xldata->initInner)
{
- innerBuffer = XLogReadBuffer(xldata->node, xldata->blknoInner, true);
- SpGistInitBuffer(innerBuffer,
- (xldata->storesNulls ? SPGIST_NULLS : 0));
+ innerBuffer = XLogInitBufferForRedo(record, 2);
+ SpGistInitBuffer(innerBuffer, (xldata->storesNulls ? SPGIST_NULLS : 0));
action = BLK_NEEDS_REDO;
}
else
- action = XLogReadBufferForRedo(lsn, record, bbi, xldata->node,
- xldata->blknoInner, &innerBuffer);
+ action = XLogReadBufferForRedo(record, 2, &innerBuffer);
if (action == BLK_NEEDS_REDO)
{
@@ -756,14 +728,14 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record)
xldata->offnumInner);
/* if inner is also parent, update link while we're here */
- if (xldata->blknoInner == xldata->blknoParent)
+ if (xldata->innerIsParent)
{
SpGistInnerTuple parent;
parent = (SpGistInnerTuple) PageGetItem(page,
PageGetItemId(page, xldata->offnumParent));
spgUpdateNodeLink(parent, xldata->nodeI,
- xldata->blknoInner, xldata->offnumInner);
+ blknoInner, xldata->offnumInner);
}
PageSetLSN(page, lsn);
@@ -771,7 +743,6 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record)
}
if (BufferIsValid(innerBuffer))
UnlockReleaseBuffer(innerBuffer);
- bbi++;
/*
* Now we can release the leaf-page locks. It's okay to do this before
@@ -783,18 +754,11 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record)
UnlockReleaseBuffer(destBuffer);
/* update parent downlink, unless we did it above */
- if (xldata->blknoParent == InvalidBlockNumber)
- {
- /* no parent cause we split the root */
- Assert(SpGistBlockIsRoot(xldata->blknoInner));
- }
- else if (xldata->blknoInner != xldata->blknoParent)
+ if (XLogRecHasBlockRef(record, 3))
{
Buffer parentBuffer;
- if (XLogReadBufferForRedo(lsn, record, bbi,
- xldata->node, xldata->blknoParent,
- &parentBuffer) == BLK_NEEDS_REDO)
+ if (XLogReadBufferForRedo(record, 3, &parentBuffer) == BLK_NEEDS_REDO)
{
SpGistInnerTuple parent;
@@ -803,7 +767,7 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record)
parent = (SpGistInnerTuple) PageGetItem(page,
PageGetItemId(page, xldata->offnumParent));
spgUpdateNodeLink(parent, xldata->nodeI,
- xldata->blknoInner, xldata->offnumInner);
+ blknoInner, xldata->offnumInner);
PageSetLSN(page, lsn);
MarkBufferDirty(parentBuffer);
@@ -811,11 +775,14 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record)
if (BufferIsValid(parentBuffer))
UnlockReleaseBuffer(parentBuffer);
}
+ else
+ Assert(xldata->innerIsParent || xldata->isRootSplit);
}
static void
-spgRedoVacuumLeaf(XLogRecPtr lsn, XLogRecord *record)
+spgRedoVacuumLeaf(XLogReaderState *record)
{
+ XLogRecPtr lsn = record->EndRecPtr;
char *ptr = XLogRecGetData(record);
spgxlogVacuumLeaf *xldata = (spgxlogVacuumLeaf *) ptr;
OffsetNumber *toDead;
@@ -844,8 +811,7 @@ spgRedoVacuumLeaf(XLogRecPtr lsn, XLogRecord *record)
ptr += sizeof(OffsetNumber) * xldata->nChain;
chainDest = (OffsetNumber *) ptr;
- if (XLogReadBufferForRedo(lsn, record, 0, xldata->node, xldata->blkno,
- &buffer) == BLK_NEEDS_REDO)
+ if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
{
page = BufferGetPage(buffer);
@@ -897,8 +863,9 @@ spgRedoVacuumLeaf(XLogRecPtr lsn, XLogRecord *record)
}
static void
-spgRedoVacuumRoot(XLogRecPtr lsn, XLogRecord *record)
+spgRedoVacuumRoot(XLogReaderState *record)
{
+ XLogRecPtr lsn = record->EndRecPtr;
char *ptr = XLogRecGetData(record);
spgxlogVacuumRoot *xldata = (spgxlogVacuumRoot *) ptr;
OffsetNumber *toDelete;
@@ -907,8 +874,7 @@ spgRedoVacuumRoot(XLogRecPtr lsn, XLogRecord *record)
toDelete = xldata->offsets;
- if (XLogReadBufferForRedo(lsn, record, 0, xldata->node, xldata->blkno,
- &buffer) == BLK_NEEDS_REDO)
+ if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
{
page = BufferGetPage(buffer);
@@ -923,8 +889,9 @@ spgRedoVacuumRoot(XLogRecPtr lsn, XLogRecord *record)
}
static void
-spgRedoVacuumRedirect(XLogRecPtr lsn, XLogRecord *record)
+spgRedoVacuumRedirect(XLogReaderState *record)
{
+ XLogRecPtr lsn = record->EndRecPtr;
char *ptr = XLogRecGetData(record);
spgxlogVacuumRedirect *xldata = (spgxlogVacuumRedirect *) ptr;
OffsetNumber *itemToPlaceholder;
@@ -939,12 +906,16 @@ spgRedoVacuumRedirect(XLogRecPtr lsn, XLogRecord *record)
if (InHotStandby)
{
if (TransactionIdIsValid(xldata->newestRedirectXid))
+ {
+ RelFileNode node;
+
+ XLogRecGetBlockTag(record, 0, &node, NULL, NULL);
ResolveRecoveryConflictWithSnapshot(xldata->newestRedirectXid,
- xldata->node);
+ node);
+ }
}
- if (XLogReadBufferForRedo(lsn, record, 0, xldata->node, xldata->blkno,
- &buffer) == BLK_NEEDS_REDO)
+ if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
{
Page page = BufferGetPage(buffer);
SpGistPageOpaque opaque = SpGistPageGetOpaque(page);
@@ -995,40 +966,40 @@ spgRedoVacuumRedirect(XLogRecPtr lsn, XLogRecord *record)
}
void
-spg_redo(XLogRecPtr lsn, XLogRecord *record)
+spg_redo(XLogReaderState *record)
{
- uint8 info = record->xl_info & ~XLR_INFO_MASK;
+ uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
MemoryContext oldCxt;
oldCxt = MemoryContextSwitchTo(opCtx);
switch (info)
{
case XLOG_SPGIST_CREATE_INDEX:
- spgRedoCreateIndex(lsn, record);
+ spgRedoCreateIndex(record);
break;
case XLOG_SPGIST_ADD_LEAF:
- spgRedoAddLeaf(lsn, record);
+ spgRedoAddLeaf(record);
break;
case XLOG_SPGIST_MOVE_LEAFS:
- spgRedoMoveLeafs(lsn, record);
+ spgRedoMoveLeafs(record);
break;
case XLOG_SPGIST_ADD_NODE:
- spgRedoAddNode(lsn, record);
+ spgRedoAddNode(record);
break;
case XLOG_SPGIST_SPLIT_TUPLE:
- spgRedoSplitTuple(lsn, record);
+ spgRedoSplitTuple(record);
break;
case XLOG_SPGIST_PICKSPLIT:
- spgRedoPickSplit(lsn, record);
+ spgRedoPickSplit(record);
break;
case XLOG_SPGIST_VACUUM_LEAF:
- spgRedoVacuumLeaf(lsn, record);
+ spgRedoVacuumLeaf(record);
break;
case XLOG_SPGIST_VACUUM_ROOT:
- spgRedoVacuumRoot(lsn, record);
+ spgRedoVacuumRoot(record);
break;
case XLOG_SPGIST_VACUUM_REDIRECT:
- spgRedoVacuumRedirect(lsn, record);
+ spgRedoVacuumRedirect(record);
break;
default:
elog(PANIC, "spg_redo: unknown op code %u", info);