aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/gin/ginbtree.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/gin/ginbtree.c')
-rw-r--r--src/backend/access/gin/ginbtree.c111
1 files changed, 59 insertions, 52 deletions
diff --git a/src/backend/access/gin/ginbtree.c b/src/backend/access/gin/ginbtree.c
index 5365477000a..99f40a871f0 100644
--- a/src/backend/access/gin/ginbtree.c
+++ b/src/backend/access/gin/ginbtree.c
@@ -326,7 +326,6 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
Buffer childbuf, GinStatsData *buildStats)
{
Page page = BufferGetPage(stack->buffer);
- XLogRecData *payloadrdata;
GinPlaceToPageRC rc;
uint16 xlflags = 0;
Page childpage = NULL;
@@ -351,12 +350,36 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
/*
* Try to put the incoming tuple on the page. placeToPage will decide if
* the page needs to be split.
+ *
+ * WAL-logging this operation is a bit funny:
+ *
+ * We're responsible for calling XLogBeginInsert() and XLogInsert().
+ * XLogBeginInsert() must be called before placeToPage, because
+ * placeToPage can register some data to the WAL record.
+ *
+ * If placeToPage returns INSERTED, placeToPage has already called
+ * START_CRIT_SECTION(), and we're responsible for calling
+ * END_CRIT_SECTION. When it returns INSERTED, it is also responsible for
+ * registering any data required to replay the operation with
+ * XLogRegisterData(0, ...). It may only add data to block index 0; the
+ * main data of the WAL record is reserved for this function.
+ *
+ * If placeToPage returns SPLIT, we're wholly responsible for WAL logging.
+ * Splits happen infrequently, so we just make a full-page image of all
+ * the pages involved.
*/
+
+ if (RelationNeedsWAL(btree->index))
+ XLogBeginInsert();
+
rc = btree->placeToPage(btree, stack->buffer, stack,
insertdata, updateblkno,
- &payloadrdata, &newlpage, &newrpage);
+ &newlpage, &newrpage);
if (rc == UNMODIFIED)
+ {
+ XLogResetInsertion();
return true;
+ }
else if (rc == INSERTED)
{
/* placeToPage did START_CRIT_SECTION() */
@@ -372,17 +395,18 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
if (RelationNeedsWAL(btree->index))
{
XLogRecPtr recptr;
- XLogRecData rdata[3];
ginxlogInsert xlrec;
BlockIdData childblknos[2];
- xlrec.node = btree->index->rd_node;
- xlrec.blkno = BufferGetBlockNumber(stack->buffer);
+ /*
+ * placetopage already registered stack->buffer as block 0.
+ */
xlrec.flags = xlflags;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = sizeof(ginxlogInsert);
+ if (childbuf != InvalidBuffer)
+ XLogRegisterBuffer(1, childbuf, REGBUF_STANDARD);
+
+ XLogRegisterData((char *) &xlrec, sizeof(ginxlogInsert));
/*
* Log information about child if this was an insertion of a
@@ -390,26 +414,13 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
*/
if (childbuf != InvalidBuffer)
{
- rdata[0].next = &rdata[1];
-
BlockIdSet(&childblknos[0], BufferGetBlockNumber(childbuf));
BlockIdSet(&childblknos[1], GinPageGetOpaque(childpage)->rightlink);
-
- rdata[1].buffer = InvalidBuffer;
- rdata[1].data = (char *) childblknos;
- rdata[1].len = sizeof(BlockIdData) * 2;
- rdata[1].next = &rdata[2];
-
- rdata[2].buffer = childbuf;
- rdata[2].buffer_std = false;
- rdata[2].data = NULL;
- rdata[2].len = 0;
- rdata[2].next = payloadrdata;
+ XLogRegisterData((char *) childblknos,
+ sizeof(BlockIdData) * 2);
}
- else
- rdata[0].next = payloadrdata;
- recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT, rdata);
+ recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT);
PageSetLSN(page, recptr);
if (childbuf != InvalidBuffer)
PageSetLSN(childpage, recptr);
@@ -421,10 +432,9 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
}
else if (rc == SPLIT)
{
- /* Didn't fit, have to split */
+ /* Didn't fit, had to split */
Buffer rbuffer;
BlockNumber savedRightLink;
- XLogRecData rdata[2];
ginxlogSplit data;
Buffer lbuffer = InvalidBuffer;
Page newrootpg = NULL;
@@ -448,7 +458,6 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
*/
data.node = btree->index->rd_node;
- data.rblkno = BufferGetBlockNumber(rbuffer);
data.flags = xlflags;
if (childbuf != InvalidBuffer)
{
@@ -462,23 +471,6 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
else
data.leftChildBlkno = data.rightChildBlkno = InvalidBlockNumber;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].data = (char *) &data;
- rdata[0].len = sizeof(ginxlogSplit);
-
- if (childbuf != InvalidBuffer)
- {
- rdata[0].next = &rdata[1];
-
- rdata[1].buffer = childbuf;
- rdata[1].buffer_std = false;
- rdata[1].data = NULL;
- rdata[1].len = 0;
- rdata[1].next = payloadrdata;
- }
- else
- rdata[0].next = payloadrdata;
-
if (stack->parent == NULL)
{
/*
@@ -496,12 +488,7 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
buildStats->nEntryPages++;
}
- /*
- * root never has a right-link, so we borrow the rrlink field to
- * store the root block number.
- */
- data.rrlink = BufferGetBlockNumber(stack->buffer);
- data.lblkno = BufferGetBlockNumber(lbuffer);
+ data.rrlink = InvalidBlockNumber;
data.flags |= GIN_SPLIT_ROOT;
GinPageGetOpaque(newrpage)->rightlink = InvalidBlockNumber;
@@ -524,7 +511,6 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
{
/* split non-root page */
data.rrlink = savedRightLink;
- data.lblkno = BufferGetBlockNumber(stack->buffer);
GinPageGetOpaque(newrpage)->rightlink = savedRightLink;
GinPageGetOpaque(newlpage)->flags |= GIN_INCOMPLETE_SPLIT;
@@ -572,7 +558,28 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
{
XLogRecPtr recptr;
- recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT, rdata);
+ /*
+ * We just take full page images of all the split pages. Splits
+ * are uncommon enough that it's not worth complicating the code
+ * to be more efficient.
+ */
+ if (stack->parent == NULL)
+ {
+ XLogRegisterBuffer(0, lbuffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
+ XLogRegisterBuffer(1, rbuffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
+ XLogRegisterBuffer(2, stack->buffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
+ }
+ else
+ {
+ XLogRegisterBuffer(0, stack->buffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
+ XLogRegisterBuffer(1, rbuffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
+ }
+ if (BufferIsValid(childbuf))
+ XLogRegisterBuffer(3, childbuf, 0);
+
+ XLogRegisterData((char *) &data, sizeof(ginxlogSplit));
+
+ recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT);
PageSetLSN(BufferGetPage(stack->buffer), recptr);
PageSetLSN(BufferGetPage(rbuffer), recptr);
if (stack->parent == NULL)