diff options
Diffstat (limited to 'src/backend/access/gin')
-rw-r--r-- | src/backend/access/gin/ginbtree.c | 111 | ||||
-rw-r--r-- | src/backend/access/gin/gindatapage.c | 162 | ||||
-rw-r--r-- | src/backend/access/gin/ginentrypage.c | 64 | ||||
-rw-r--r-- | src/backend/access/gin/ginfast.c | 92 | ||||
-rw-r--r-- | src/backend/access/gin/gininsert.c | 10 | ||||
-rw-r--r-- | src/backend/access/gin/ginutil.c | 10 | ||||
-rw-r--r-- | src/backend/access/gin/ginvacuum.c | 114 | ||||
-rw-r--r-- | src/backend/access/gin/ginxlog.c | 341 |
8 files changed, 293 insertions, 611 deletions
diff --git a/src/backend/access/gin/ginbtree.c b/src/backend/access/gin/ginbtree.c index 5365477000a..99f40a871f0 100644 --- a/src/backend/access/gin/ginbtree.c +++ b/src/backend/access/gin/ginbtree.c @@ -326,7 +326,6 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack, Buffer childbuf, GinStatsData *buildStats) { Page page = BufferGetPage(stack->buffer); - XLogRecData *payloadrdata; GinPlaceToPageRC rc; uint16 xlflags = 0; Page childpage = NULL; @@ -351,12 +350,36 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack, /* * Try to put the incoming tuple on the page. placeToPage will decide if * the page needs to be split. + * + * WAL-logging this operation is a bit funny: + * + * We're responsible for calling XLogBeginInsert() and XLogInsert(). + * XLogBeginInsert() must be called before placeToPage, because + * placeToPage can register some data to the WAL record. + * + * If placeToPage returns INSERTED, placeToPage has already called + * START_CRIT_SECTION(), and we're responsible for calling + * END_CRIT_SECTION. When it returns INSERTED, it is also responsible for + * registering any data required to replay the operation with + * XLogRegisterData(0, ...). It may only add data to block index 0; the + * main data of the WAL record is reserved for this function. + * + * If placeToPage returns SPLIT, we're wholly responsible for WAL logging. + * Splits happen infrequently, so we just make a full-page image of all + * the pages involved. */ + + if (RelationNeedsWAL(btree->index)) + XLogBeginInsert(); + rc = btree->placeToPage(btree, stack->buffer, stack, insertdata, updateblkno, - &payloadrdata, &newlpage, &newrpage); + &newlpage, &newrpage); if (rc == UNMODIFIED) + { + XLogResetInsertion(); return true; + } else if (rc == INSERTED) { /* placeToPage did START_CRIT_SECTION() */ @@ -372,17 +395,18 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack, if (RelationNeedsWAL(btree->index)) { XLogRecPtr recptr; - XLogRecData rdata[3]; ginxlogInsert xlrec; BlockIdData childblknos[2]; - xlrec.node = btree->index->rd_node; - xlrec.blkno = BufferGetBlockNumber(stack->buffer); + /* + * placetopage already registered stack->buffer as block 0. + */ xlrec.flags = xlflags; - rdata[0].buffer = InvalidBuffer; - rdata[0].data = (char *) &xlrec; - rdata[0].len = sizeof(ginxlogInsert); + if (childbuf != InvalidBuffer) + XLogRegisterBuffer(1, childbuf, REGBUF_STANDARD); + + XLogRegisterData((char *) &xlrec, sizeof(ginxlogInsert)); /* * Log information about child if this was an insertion of a @@ -390,26 +414,13 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack, */ if (childbuf != InvalidBuffer) { - rdata[0].next = &rdata[1]; - BlockIdSet(&childblknos[0], BufferGetBlockNumber(childbuf)); BlockIdSet(&childblknos[1], GinPageGetOpaque(childpage)->rightlink); - - rdata[1].buffer = InvalidBuffer; - rdata[1].data = (char *) childblknos; - rdata[1].len = sizeof(BlockIdData) * 2; - rdata[1].next = &rdata[2]; - - rdata[2].buffer = childbuf; - rdata[2].buffer_std = false; - rdata[2].data = NULL; - rdata[2].len = 0; - rdata[2].next = payloadrdata; + XLogRegisterData((char *) childblknos, + sizeof(BlockIdData) * 2); } - else - rdata[0].next = payloadrdata; - recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT, rdata); + recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT); PageSetLSN(page, recptr); if (childbuf != InvalidBuffer) PageSetLSN(childpage, recptr); @@ -421,10 +432,9 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack, } else if (rc == SPLIT) { - /* Didn't fit, have to split */ + /* Didn't fit, had to split */ Buffer rbuffer; BlockNumber savedRightLink; - XLogRecData rdata[2]; ginxlogSplit data; Buffer lbuffer = InvalidBuffer; Page newrootpg = NULL; @@ -448,7 +458,6 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack, */ data.node = btree->index->rd_node; - data.rblkno = BufferGetBlockNumber(rbuffer); data.flags = xlflags; if (childbuf != InvalidBuffer) { @@ -462,23 +471,6 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack, else data.leftChildBlkno = data.rightChildBlkno = InvalidBlockNumber; - rdata[0].buffer = InvalidBuffer; - rdata[0].data = (char *) &data; - rdata[0].len = sizeof(ginxlogSplit); - - if (childbuf != InvalidBuffer) - { - rdata[0].next = &rdata[1]; - - rdata[1].buffer = childbuf; - rdata[1].buffer_std = false; - rdata[1].data = NULL; - rdata[1].len = 0; - rdata[1].next = payloadrdata; - } - else - rdata[0].next = payloadrdata; - if (stack->parent == NULL) { /* @@ -496,12 +488,7 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack, buildStats->nEntryPages++; } - /* - * root never has a right-link, so we borrow the rrlink field to - * store the root block number. - */ - data.rrlink = BufferGetBlockNumber(stack->buffer); - data.lblkno = BufferGetBlockNumber(lbuffer); + data.rrlink = InvalidBlockNumber; data.flags |= GIN_SPLIT_ROOT; GinPageGetOpaque(newrpage)->rightlink = InvalidBlockNumber; @@ -524,7 +511,6 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack, { /* split non-root page */ data.rrlink = savedRightLink; - data.lblkno = BufferGetBlockNumber(stack->buffer); GinPageGetOpaque(newrpage)->rightlink = savedRightLink; GinPageGetOpaque(newlpage)->flags |= GIN_INCOMPLETE_SPLIT; @@ -572,7 +558,28 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack, { XLogRecPtr recptr; - recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT, rdata); + /* + * We just take full page images of all the split pages. Splits + * are uncommon enough that it's not worth complicating the code + * to be more efficient. + */ + if (stack->parent == NULL) + { + XLogRegisterBuffer(0, lbuffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD); + XLogRegisterBuffer(1, rbuffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD); + XLogRegisterBuffer(2, stack->buffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD); + } + else + { + XLogRegisterBuffer(0, stack->buffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD); + XLogRegisterBuffer(1, rbuffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD); + } + if (BufferIsValid(childbuf)) + XLogRegisterBuffer(3, childbuf, 0); + + XLogRegisterData((char *) &data, sizeof(ginxlogSplit)); + + recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT); PageSetLSN(BufferGetPage(stack->buffer), recptr); PageSetLSN(BufferGetPage(rbuffer), recptr); if (stack->parent == NULL) diff --git a/src/backend/access/gin/gindatapage.c b/src/backend/access/gin/gindatapage.c index 97cd706c08e..012225eaa35 100644 --- a/src/backend/access/gin/gindatapage.c +++ b/src/backend/access/gin/gindatapage.c @@ -98,20 +98,19 @@ static ItemPointer dataLeafPageGetUncompressed(Page page, int *nitems); static void dataSplitPageInternal(GinBtree btree, Buffer origbuf, GinBtreeStack *stack, void *insertdata, BlockNumber updateblkno, - XLogRecData **prdata, Page *newlpage, Page *newrpage); + Page *newlpage, Page *newrpage); static disassembledLeaf *disassembleLeaf(Page page); static bool leafRepackItems(disassembledLeaf *leaf, ItemPointer remaining); static bool addItemsToLeaf(disassembledLeaf *leaf, ItemPointer newItems, int nNewItems); -static XLogRecData *constructLeafRecompressWALData(Buffer buf, - disassembledLeaf *leaf); +static void registerLeafRecompressWALData(Buffer buf, disassembledLeaf *leaf); static void dataPlaceToPageLeafRecompress(Buffer buf, disassembledLeaf *leaf); static void dataPlaceToPageLeafSplit(Buffer buf, disassembledLeaf *leaf, ItemPointerData lbound, ItemPointerData rbound, - XLogRecData **prdata, Page lpage, Page rpage); + Page lpage, Page rpage); /* * Read TIDs from leaf data page to single uncompressed array. The TIDs are @@ -428,8 +427,7 @@ GinPageDeletePostingItem(Page page, OffsetNumber offset) */ static GinPlaceToPageRC dataPlaceToPageLeaf(GinBtree btree, Buffer buf, GinBtreeStack *stack, - void *insertdata, XLogRecData **prdata, - Page *newlpage, Page *newrpage) + void *insertdata, Page *newlpage, Page *newrpage) { GinBtreeDataLeafInsertData *items = insertdata; ItemPointer newItems = &items->items[items->curitem]; @@ -602,9 +600,7 @@ dataPlaceToPageLeaf(GinBtree btree, Buffer buf, GinBtreeStack *stack, */ MemoryContextSwitchTo(oldCxt); if (RelationNeedsWAL(btree->index)) - *prdata = constructLeafRecompressWALData(buf, leaf); - else - *prdata = NULL; + registerLeafRecompressWALData(buf, leaf); START_CRIT_SECTION(); dataPlaceToPageLeafRecompress(buf, leaf); @@ -685,7 +681,7 @@ dataPlaceToPageLeaf(GinBtree btree, Buffer buf, GinBtreeStack *stack, *newrpage = MemoryContextAlloc(oldCxt, BLCKSZ); dataPlaceToPageLeafSplit(buf, leaf, lbound, rbound, - prdata, *newlpage, *newrpage); + *newlpage, *newrpage); Assert(GinPageRightMost(page) || ginCompareItemPointers(GinDataPageGetRightBound(*newlpage), @@ -791,7 +787,6 @@ ginVacuumPostingTreeLeaf(Relation indexrel, Buffer buffer, GinVacuumState *gvs) */ if (removedsomething) { - XLogRecData *payloadrdata = NULL; bool modified; /* @@ -818,7 +813,10 @@ ginVacuumPostingTreeLeaf(Relation indexrel, Buffer buffer, GinVacuumState *gvs) } if (RelationNeedsWAL(indexrel)) - payloadrdata = constructLeafRecompressWALData(buffer, leaf); + { + XLogBeginInsert(); + registerLeafRecompressWALData(buffer, leaf); + } START_CRIT_SECTION(); dataPlaceToPageLeafRecompress(buffer, leaf); @@ -827,18 +825,8 @@ ginVacuumPostingTreeLeaf(Relation indexrel, Buffer buffer, GinVacuumState *gvs) if (RelationNeedsWAL(indexrel)) { XLogRecPtr recptr; - XLogRecData rdata; - ginxlogVacuumDataLeafPage xlrec; - xlrec.node = indexrel->rd_node; - xlrec.blkno = BufferGetBlockNumber(buffer); - - rdata.buffer = InvalidBuffer; - rdata.data = (char *) &xlrec; - rdata.len = offsetof(ginxlogVacuumDataLeafPage, data); - rdata.next = payloadrdata; - - recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_VACUUM_DATA_LEAF_PAGE, &rdata); + recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_VACUUM_DATA_LEAF_PAGE); PageSetLSN(page, recptr); } @@ -850,13 +838,12 @@ ginVacuumPostingTreeLeaf(Relation indexrel, Buffer buffer, GinVacuumState *gvs) * Construct a ginxlogRecompressDataLeaf record representing the changes * in *leaf. */ -static XLogRecData * -constructLeafRecompressWALData(Buffer buf, disassembledLeaf *leaf) +static void +registerLeafRecompressWALData(Buffer buf, disassembledLeaf *leaf) { int nmodified = 0; char *walbufbegin; char *walbufend; - XLogRecData *rdata; dlist_iter iter; int segno; ginxlogRecompressDataLeaf *recompress_xlog; @@ -871,12 +858,11 @@ constructLeafRecompressWALData(Buffer buf, disassembledLeaf *leaf) nmodified++; } - walbufbegin = palloc( - sizeof(ginxlogRecompressDataLeaf) + - BLCKSZ + /* max size needed to hold the segment - * data */ - nmodified * 2 + /* (segno + action) per action */ - sizeof(XLogRecData)); + walbufbegin = + palloc(sizeof(ginxlogRecompressDataLeaf) + + BLCKSZ + /* max size needed to hold the segment data */ + nmodified * 2 /* (segno + action) per action */ + ); walbufend = walbufbegin; recompress_xlog = (ginxlogRecompressDataLeaf *) walbufend; @@ -944,14 +930,10 @@ constructLeafRecompressWALData(Buffer buf, disassembledLeaf *leaf) segno++; } - rdata = (XLogRecData *) MAXALIGN(walbufend); - rdata->buffer = buf; - rdata->buffer_std = TRUE; - rdata->data = walbufbegin; - rdata->len = walbufend - walbufbegin; - rdata->next = NULL; - return rdata; + XLogRegisterBuffer(0, buf, REGBUF_STANDARD); + XLogRegisterBufData(0, walbufbegin, walbufend - walbufbegin); + } /* @@ -1024,7 +1006,7 @@ dataPlaceToPageLeafRecompress(Buffer buf, disassembledLeaf *leaf) static void dataPlaceToPageLeafSplit(Buffer buf, disassembledLeaf *leaf, ItemPointerData lbound, ItemPointerData rbound, - XLogRecData **prdata, Page lpage, Page rpage) + Page lpage, Page rpage) { char *ptr; int segsize; @@ -1034,10 +1016,6 @@ dataPlaceToPageLeafSplit(Buffer buf, disassembledLeaf *leaf, dlist_node *firstright; leafSegmentInfo *seginfo; - /* these must be static so they can be returned to caller */ - static ginxlogSplitDataLeaf split_xlog; - static XLogRecData rdata[3]; - /* Initialize temporary pages to hold the new left and right pages */ GinInitPage(lpage, GIN_DATA | GIN_LEAF | GIN_COMPRESSED, BLCKSZ); GinInitPage(rpage, GIN_DATA | GIN_LEAF | GIN_COMPRESSED, BLCKSZ); @@ -1092,29 +1070,6 @@ dataPlaceToPageLeafSplit(Buffer buf, disassembledLeaf *leaf, Assert(rsize == leaf->rsize); GinDataPageSetDataSize(rpage, rsize); *GinDataPageGetRightBound(rpage) = rbound; - - /* Create WAL record */ - split_xlog.lsize = lsize; - split_xlog.rsize = rsize; - split_xlog.lrightbound = lbound; - split_xlog.rrightbound = rbound; - - rdata[0].buffer = InvalidBuffer; - rdata[0].data = (char *) &split_xlog; - rdata[0].len = sizeof(ginxlogSplitDataLeaf); - rdata[0].next = &rdata[1]; - - rdata[1].buffer = InvalidBuffer; - rdata[1].data = (char *) GinDataLeafPageGetPostingList(lpage); - rdata[1].len = lsize; - rdata[1].next = &rdata[2]; - - rdata[2].buffer = InvalidBuffer; - rdata[2].data = (char *) GinDataLeafPageGetPostingList(rpage); - rdata[2].len = rsize; - rdata[2].next = NULL; - - *prdata = rdata; } /* @@ -1124,29 +1079,30 @@ dataPlaceToPageLeafSplit(Buffer buf, disassembledLeaf *leaf, * * In addition to inserting the given item, the downlink of the existing item * at 'off' is updated to point to 'updateblkno'. + * + * On INSERTED, registers the buffer as buffer ID 0, with data. + * On SPLIT, returns rdata that represents the split pages in *prdata. */ static GinPlaceToPageRC dataPlaceToPageInternal(GinBtree btree, Buffer buf, GinBtreeStack *stack, void *insertdata, BlockNumber updateblkno, - XLogRecData **prdata, Page *newlpage, Page *newrpage) + Page *newlpage, Page *newrpage) { Page page = BufferGetPage(buf); OffsetNumber off = stack->off; PostingItem *pitem; - /* these must be static so they can be returned to caller */ - static XLogRecData rdata; + /* this must be static so it can be returned to caller */ static ginxlogInsertDataInternal data; /* split if we have to */ if (GinNonLeafDataPageGetFreeSpace(page) < sizeof(PostingItem)) { dataSplitPageInternal(btree, buf, stack, insertdata, updateblkno, - prdata, newlpage, newrpage); + newlpage, newrpage); return SPLIT; } - *prdata = &rdata; Assert(GinPageIsData(page)); START_CRIT_SECTION(); @@ -1159,14 +1115,15 @@ dataPlaceToPageInternal(GinBtree btree, Buffer buf, GinBtreeStack *stack, pitem = (PostingItem *) insertdata; GinDataPageAddPostingItem(page, pitem, off); - data.offset = off; - data.newitem = *pitem; + if (RelationNeedsWAL(btree->index)) + { + data.offset = off; + data.newitem = *pitem; - rdata.buffer = buf; - rdata.buffer_std = TRUE; - rdata.data = (char *) &data; - rdata.len = sizeof(ginxlogInsertDataInternal); - rdata.next = NULL; + XLogRegisterBuffer(0, buf, REGBUF_STANDARD); + XLogRegisterBufData(0, (char *) &data, + sizeof(ginxlogInsertDataInternal)); + } return INSERTED; } @@ -1178,7 +1135,6 @@ dataPlaceToPageInternal(GinBtree btree, Buffer buf, GinBtreeStack *stack, static GinPlaceToPageRC dataPlaceToPage(GinBtree btree, Buffer buf, GinBtreeStack *stack, void *insertdata, BlockNumber updateblkno, - XLogRecData **prdata, Page *newlpage, Page *newrpage) { Page page = BufferGetPage(buf); @@ -1187,11 +1143,11 @@ dataPlaceToPage(GinBtree btree, Buffer buf, GinBtreeStack *stack, if (GinPageIsLeaf(page)) return dataPlaceToPageLeaf(btree, buf, stack, insertdata, - prdata, newlpage, newrpage); + newlpage, newrpage); else return dataPlaceToPageInternal(btree, buf, stack, insertdata, updateblkno, - prdata, newlpage, newrpage); + newlpage, newrpage); } /* @@ -1202,7 +1158,7 @@ static void dataSplitPageInternal(GinBtree btree, Buffer origbuf, GinBtreeStack *stack, void *insertdata, BlockNumber updateblkno, - XLogRecData **prdata, Page *newlpage, Page *newrpage) + Page *newlpage, Page *newrpage) { Page oldpage = BufferGetPage(origbuf); OffsetNumber off = stack->off; @@ -1215,19 +1171,13 @@ dataSplitPageInternal(GinBtree btree, Buffer origbuf, Page lpage; Page rpage; OffsetNumber separator; - - /* these must be static so they can be returned to caller */ - static ginxlogSplitDataInternal data; - static XLogRecData rdata[4]; - static PostingItem allitems[(BLCKSZ / sizeof(PostingItem)) + 1]; + PostingItem allitems[(BLCKSZ / sizeof(PostingItem)) + 1]; lpage = PageGetTempPage(oldpage); rpage = PageGetTempPage(oldpage); GinInitPage(lpage, GinPageGetOpaque(oldpage)->flags, pageSize); GinInitPage(rpage, GinPageGetOpaque(oldpage)->flags, pageSize); - *prdata = rdata; - /* * First construct a new list of PostingItems, which includes all the old * items, and the new item. @@ -1277,20 +1227,6 @@ dataSplitPageInternal(GinBtree btree, Buffer origbuf, /* set up right bound for right page */ *GinDataPageGetRightBound(rpage) = oldbound; - data.separator = separator; - data.nitem = nitems; - data.rightbound = oldbound; - - rdata[0].buffer = InvalidBuffer; - rdata[0].data = (char *) &data; - rdata[0].len = sizeof(ginxlogSplitDataInternal); - rdata[0].next = &rdata[1]; - - rdata[1].buffer = InvalidBuffer; - rdata[1].data = (char *) allitems; - rdata[1].len = nitems * sizeof(PostingItem); - rdata[1].next = NULL; - *newlpage = lpage; *newrpage = rpage; } @@ -1797,24 +1733,18 @@ createPostingTree(Relation index, ItemPointerData *items, uint32 nitems, if (RelationNeedsWAL(index)) { XLogRecPtr recptr; - XLogRecData rdata[2]; ginxlogCreatePostingTree data; - data.node = index->rd_node; - data.blkno = blkno; data.size = rootsize; - rdata[0].buffer = InvalidBuffer; - rdata[0].data = (char *) &data; - rdata[0].len = sizeof(ginxlogCreatePostingTree); - rdata[0].next = &rdata[1]; + XLogBeginInsert(); + XLogRegisterData((char *) &data, sizeof(ginxlogCreatePostingTree)); - rdata[1].buffer = InvalidBuffer; - rdata[1].data = (char *) GinDataLeafPageGetPostingList(page); - rdata[1].len = rootsize; - rdata[1].next = NULL; + XLogRegisterData((char *) GinDataLeafPageGetPostingList(page), + rootsize); + XLogRegisterBuffer(0, buffer, REGBUF_WILL_INIT); - recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_CREATE_PTREE, rdata); + recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_CREATE_PTREE); PageSetLSN(page, recptr); } diff --git a/src/backend/access/gin/ginentrypage.c b/src/backend/access/gin/ginentrypage.c index 84dc1e228c1..2dae7b95499 100644 --- a/src/backend/access/gin/ginentrypage.c +++ b/src/backend/access/gin/ginentrypage.c @@ -22,7 +22,7 @@ static void entrySplitPage(GinBtree btree, Buffer origbuf, GinBtreeStack *stack, void *insertPayload, - BlockNumber updateblkno, XLogRecData **prdata, + BlockNumber updateblkno, Page *newlpage, Page *newrpage); /* @@ -515,33 +515,33 @@ entryPreparePage(GinBtree btree, Page page, OffsetNumber off, * On insertion to an internal node, in addition to inserting the given item, * the downlink of the existing item at 'off' is updated to point to * 'updateblkno'. + * + * On INSERTED, registers the buffer as buffer ID 0, with data. + * On SPLIT, returns rdata that represents the split pages in *prdata. */ static GinPlaceToPageRC entryPlaceToPage(GinBtree btree, Buffer buf, GinBtreeStack *stack, void *insertPayload, BlockNumber updateblkno, - XLogRecData **prdata, Page *newlpage, Page *newrpage) + Page *newlpage, Page *newrpage) { GinBtreeEntryInsertData *insertData = insertPayload; Page page = BufferGetPage(buf); OffsetNumber off = stack->off; OffsetNumber placed; - int cnt = 0; - /* these must be static so they can be returned to caller */ - static XLogRecData rdata[3]; + /* this must be static so it can be returned to caller. */ static ginxlogInsertEntry data; /* quick exit if it doesn't fit */ if (!entryIsEnoughSpace(btree, buf, off, insertData)) { entrySplitPage(btree, buf, stack, insertPayload, updateblkno, - prdata, newlpage, newrpage); + newlpage, newrpage); return SPLIT; } START_CRIT_SECTION(); - *prdata = rdata; entryPreparePage(btree, page, off, insertData, updateblkno); placed = PageAddItem(page, @@ -552,21 +552,17 @@ entryPlaceToPage(GinBtree btree, Buffer buf, GinBtreeStack *stack, elog(ERROR, "failed to add item to index page in \"%s\"", RelationGetRelationName(btree->index)); - data.isDelete = insertData->isDelete; - data.offset = off; - - rdata[cnt].buffer = buf; - rdata[cnt].buffer_std = true; - rdata[cnt].data = (char *) &data; - rdata[cnt].len = offsetof(ginxlogInsertEntry, tuple); - rdata[cnt].next = &rdata[cnt + 1]; - cnt++; - - rdata[cnt].buffer = buf; - rdata[cnt].buffer_std = true; - rdata[cnt].data = (char *) insertData->entry; - rdata[cnt].len = IndexTupleSize(insertData->entry); - rdata[cnt].next = NULL; + if (RelationNeedsWAL(btree->index)) + { + data.isDelete = insertData->isDelete; + data.offset = off; + + XLogRegisterBuffer(0, buf, REGBUF_STANDARD); + XLogRegisterBufData(0, (char *) &data, + offsetof(ginxlogInsertEntry, tuple)); + XLogRegisterBufData(0, (char *) insertData->entry, + IndexTupleSize(insertData->entry)); + } return INSERTED; } @@ -581,7 +577,7 @@ static void entrySplitPage(GinBtree btree, Buffer origbuf, GinBtreeStack *stack, void *insertPayload, - BlockNumber updateblkno, XLogRecData **prdata, + BlockNumber updateblkno, Page *newlpage, Page *newrpage) { GinBtreeEntryInsertData *insertData = insertPayload; @@ -590,7 +586,6 @@ entrySplitPage(GinBtree btree, Buffer origbuf, maxoff, separator = InvalidOffsetNumber; Size totalsize = 0; - Size tupstoresize; Size lsize = 0, size; char *ptr; @@ -599,13 +594,8 @@ entrySplitPage(GinBtree btree, Buffer origbuf, Page lpage = PageGetTempPageCopy(BufferGetPage(origbuf)); Page rpage = PageGetTempPageCopy(BufferGetPage(origbuf)); Size pageSize = PageGetPageSize(lpage); + char tupstore[2 * BLCKSZ]; - /* these must be static so they can be returned to caller */ - static XLogRecData rdata[2]; - static ginxlogSplitEntry data; - static char tupstore[2 * BLCKSZ]; - - *prdata = rdata; entryPreparePage(btree, lpage, off, insertData, updateblkno); /* @@ -638,7 +628,6 @@ entrySplitPage(GinBtree btree, Buffer origbuf, ptr += size; totalsize += size + sizeof(ItemIdData); } - tupstoresize = ptr - tupstore; /* * Initialize the left and right pages, and copy all the tuples back to @@ -673,19 +662,6 @@ entrySplitPage(GinBtree btree, Buffer origbuf, ptr += MAXALIGN(IndexTupleSize(itup)); } - data.separator = separator; - data.nitem = maxoff; - - rdata[0].buffer = InvalidBuffer; - rdata[0].data = (char *) &data; - rdata[0].len = sizeof(ginxlogSplitEntry); - rdata[0].next = &rdata[1]; - - rdata[1].buffer = InvalidBuffer; - rdata[1].data = tupstore; - rdata[1].len = tupstoresize; - rdata[1].next = NULL; - *newlpage = lpage; *newrpage = rpage; } diff --git a/src/backend/access/gin/ginfast.c b/src/backend/access/gin/ginfast.c index 25746995b5e..fd81d675570 100644 --- a/src/backend/access/gin/ginfast.c +++ b/src/backend/access/gin/ginfast.c @@ -108,26 +108,19 @@ writeListPage(Relation index, Buffer buffer, if (RelationNeedsWAL(index)) { - XLogRecData rdata[2]; ginxlogInsertListPage data; XLogRecPtr recptr; - data.node = index->rd_node; - data.blkno = BufferGetBlockNumber(buffer); data.rightlink = rightlink; data.ntuples = ntuples; - rdata[0].buffer = InvalidBuffer; - rdata[0].data = (char *) &data; - rdata[0].len = sizeof(ginxlogInsertListPage); - rdata[0].next = rdata + 1; + XLogBeginInsert(); + XLogRegisterData((char *) &data, sizeof(ginxlogInsertListPage)); - rdata[1].buffer = InvalidBuffer; - rdata[1].data = workspace; - rdata[1].len = size; - rdata[1].next = NULL; + XLogRegisterBuffer(0, buffer, REGBUF_WILL_INIT); + XLogRegisterBufData(0, workspace, size); - recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT_LISTPAGE, rdata); + recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT_LISTPAGE); PageSetLSN(page, recptr); } @@ -224,26 +217,23 @@ ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector) Buffer metabuffer; Page metapage; GinMetaPageData *metadata = NULL; - XLogRecData rdata[2]; Buffer buffer = InvalidBuffer; Page page = NULL; ginxlogUpdateMeta data; bool separateList = false; bool needCleanup = false; int cleanupSize; + bool needWal; if (collector->ntuples == 0) return; + needWal = RelationNeedsWAL(index); + data.node = index->rd_node; data.ntuples = 0; data.newRightlink = data.prevTail = InvalidBlockNumber; - rdata[0].buffer = InvalidBuffer; - rdata[0].data = (char *) &data; - rdata[0].len = sizeof(ginxlogUpdateMeta); - rdata[0].next = NULL; - metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO); metapage = BufferGetPage(metabuffer); @@ -283,6 +273,9 @@ ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector) memset(&sublist, 0, sizeof(GinMetaPageData)); makeSublist(index, collector->tuples, collector->ntuples, &sublist); + if (needWal) + XLogBeginInsert(); + /* * metapage was unlocked, see above */ @@ -315,14 +308,6 @@ ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector) LockBuffer(buffer, GIN_EXCLUSIVE); page = BufferGetPage(buffer); - rdata[0].next = rdata + 1; - - rdata[1].buffer = buffer; - rdata[1].buffer_std = true; - rdata[1].data = NULL; - rdata[1].len = 0; - rdata[1].next = NULL; - Assert(GinPageGetOpaque(page)->rightlink == InvalidBlockNumber); START_CRIT_SECTION(); @@ -336,6 +321,9 @@ ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector) metadata->nPendingPages += sublist.nPendingPages; metadata->nPendingHeapTuples += sublist.nPendingHeapTuples; + + if (needWal) + XLogRegisterBuffer(1, buffer, REGBUF_STANDARD); } } else @@ -348,6 +336,7 @@ ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector) int i, tupsize; char *ptr; + char *collectordata; buffer = ReadBuffer(index, metadata->tail); LockBuffer(buffer, GIN_EXCLUSIVE); @@ -356,16 +345,13 @@ ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector) off = (PageIsEmpty(page)) ? FirstOffsetNumber : OffsetNumberNext(PageGetMaxOffsetNumber(page)); - rdata[0].next = rdata + 1; - - rdata[1].buffer = buffer; - rdata[1].buffer_std = true; - ptr = rdata[1].data = (char *) palloc(collector->sumsize); - rdata[1].len = collector->sumsize; - rdata[1].next = NULL; + collectordata = ptr = (char *) palloc(collector->sumsize); data.ntuples = collector->ntuples; + if (needWal) + XLogBeginInsert(); + START_CRIT_SECTION(); /* @@ -390,7 +376,12 @@ ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector) off++; } - Assert((ptr - rdata[1].data) <= collector->sumsize); + Assert((ptr - collectordata) <= collector->sumsize); + if (needWal) + { + XLogRegisterBuffer(1, buffer, REGBUF_STANDARD); + XLogRegisterBufData(1, collectordata, collector->sumsize); + } metadata->tailFreeSize = PageGetExactFreeSpace(page); @@ -402,13 +393,16 @@ ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector) */ MarkBufferDirty(metabuffer); - if (RelationNeedsWAL(index)) + if (needWal) { XLogRecPtr recptr; memcpy(&data.metadata, metadata, sizeof(GinMetaPageData)); - recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE, rdata); + XLogRegisterBuffer(0, metabuffer, REGBUF_WILL_INIT); + XLogRegisterData((char *) &data, sizeof(ginxlogUpdateMeta)); + + recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE); PageSetLSN(metapage, recptr); if (buffer != InvalidBuffer) @@ -526,20 +520,11 @@ shiftList(Relation index, Buffer metabuffer, BlockNumber newHead, int i; int64 nDeletedHeapTuples = 0; ginxlogDeleteListPages data; - XLogRecData rdata[1]; Buffer buffers[GIN_NDELETE_AT_ONCE]; - data.node = index->rd_node; - - rdata[0].buffer = InvalidBuffer; - rdata[0].data = (char *) &data; - rdata[0].len = sizeof(ginxlogDeleteListPages); - rdata[0].next = NULL; - data.ndeleted = 0; while (data.ndeleted < GIN_NDELETE_AT_ONCE && blknoToDelete != newHead) { - data.toDelete[data.ndeleted] = blknoToDelete; buffers[data.ndeleted] = ReadBuffer(index, blknoToDelete); LockBuffer(buffers[data.ndeleted], GIN_EXCLUSIVE); page = BufferGetPage(buffers[data.ndeleted]); @@ -562,6 +547,13 @@ shiftList(Relation index, Buffer metabuffer, BlockNumber newHead, if (stats) stats->pages_deleted += data.ndeleted; + /* + * This operation touches an unusually large number of pages, so + * prepare the XLogInsert machinery for that before entering the + * critical section. + */ + XLogEnsureRecordSpace(data.ndeleted, 0); + START_CRIT_SECTION(); metadata->head = blknoToDelete; @@ -592,9 +584,17 @@ shiftList(Relation index, Buffer metabuffer, BlockNumber newHead, { XLogRecPtr recptr; + XLogBeginInsert(); + XLogRegisterBuffer(0, metabuffer, REGBUF_WILL_INIT); + for (i = 0; i < data.ndeleted; i++) + XLogRegisterBuffer(i + 1, buffers[i], REGBUF_WILL_INIT); + memcpy(&data.metadata, metadata, sizeof(GinMetaPageData)); - recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_DELETE_LISTPAGE, rdata); + XLogRegisterData((char *) &data, + sizeof(ginxlogDeleteListPages)); + + recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_DELETE_LISTPAGE); PageSetLSN(metapage, recptr); for (i = 0; i < data.ndeleted; i++) diff --git a/src/backend/access/gin/gininsert.c b/src/backend/access/gin/gininsert.c index 370884ed17f..c1ad0fd8c4d 100644 --- a/src/backend/access/gin/gininsert.c +++ b/src/backend/access/gin/gininsert.c @@ -347,15 +347,13 @@ ginbuild(PG_FUNCTION_ARGS) if (RelationNeedsWAL(index)) { XLogRecPtr recptr; - XLogRecData rdata; Page page; - rdata.buffer = InvalidBuffer; - rdata.data = (char *) &(index->rd_node); - rdata.len = sizeof(RelFileNode); - rdata.next = NULL; + XLogBeginInsert(); + XLogRegisterBuffer(0, MetaBuffer, REGBUF_WILL_INIT); + XLogRegisterBuffer(1, RootBuffer, REGBUF_WILL_INIT); - recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_CREATE_INDEX, &rdata); + recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_CREATE_INDEX); page = BufferGetPage(RootBuffer); PageSetLSN(page, recptr); diff --git a/src/backend/access/gin/ginutil.c b/src/backend/access/gin/ginutil.c index d0458cfd0cf..f593a7224f2 100644 --- a/src/backend/access/gin/ginutil.c +++ b/src/backend/access/gin/ginutil.c @@ -605,19 +605,17 @@ ginUpdateStats(Relation index, const GinStatsData *stats) { XLogRecPtr recptr; ginxlogUpdateMeta data; - XLogRecData rdata; data.node = index->rd_node; data.ntuples = 0; data.newRightlink = data.prevTail = InvalidBlockNumber; memcpy(&data.metadata, metadata, sizeof(GinMetaPageData)); - rdata.buffer = InvalidBuffer; - rdata.data = (char *) &data; - rdata.len = sizeof(ginxlogUpdateMeta); - rdata.next = NULL; + XLogBeginInsert(); + XLogRegisterData((char *) &data, sizeof(ginxlogUpdateMeta)); + XLogRegisterBuffer(0, metabuffer, REGBUF_WILL_INIT); - recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE, &rdata); + recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE); PageSetLSN(metapage, recptr); } diff --git a/src/backend/access/gin/ginvacuum.c b/src/backend/access/gin/ginvacuum.c index 3a61321a835..6f32600ed79 100644 --- a/src/backend/access/gin/ginvacuum.c +++ b/src/backend/access/gin/ginvacuum.c @@ -89,10 +89,6 @@ xlogVacuumPage(Relation index, Buffer buffer) { Page page = BufferGetPage(buffer); XLogRecPtr recptr; - XLogRecData rdata[3]; - ginxlogVacuumPage xlrec; - uint16 lower; - uint16 upper; /* This is only used for entry tree leaf pages. */ Assert(!GinPageIsData(page)); @@ -101,57 +97,14 @@ xlogVacuumPage(Relation index, Buffer buffer) if (!RelationNeedsWAL(index)) return; - xlrec.node = index->rd_node; - xlrec.blkno = BufferGetBlockNumber(buffer); - - /* Assume we can omit data between pd_lower and pd_upper */ - lower = ((PageHeader) page)->pd_lower; - upper = ((PageHeader) page)->pd_upper; - - Assert(lower < BLCKSZ); - Assert(upper < BLCKSZ); - - if (lower >= SizeOfPageHeaderData && - upper > lower && - upper <= BLCKSZ) - { - xlrec.hole_offset = lower; - xlrec.hole_length = upper - lower; - } - else - { - /* No "hole" to compress out */ - xlrec.hole_offset = 0; - xlrec.hole_length = 0; - } - - rdata[0].data = (char *) &xlrec; - rdata[0].len = sizeof(ginxlogVacuumPage); - rdata[0].buffer = InvalidBuffer; - rdata[0].next = &rdata[1]; - - if (xlrec.hole_length == 0) - { - rdata[1].data = (char *) page; - rdata[1].len = BLCKSZ; - rdata[1].buffer = InvalidBuffer; - rdata[1].next = NULL; - } - else - { - /* must skip the hole */ - rdata[1].data = (char *) page; - rdata[1].len = xlrec.hole_offset; - rdata[1].buffer = InvalidBuffer; - rdata[1].next = &rdata[2]; - - rdata[2].data = (char *) page + (xlrec.hole_offset + xlrec.hole_length); - rdata[2].len = BLCKSZ - (xlrec.hole_offset + xlrec.hole_length); - rdata[2].buffer = InvalidBuffer; - rdata[2].next = NULL; - } + /* + * Always create a full image, we don't track the changes on the page at + * any more fine-grained level. This could obviously be improved... + */ + XLogBeginInsert(); + XLogRegisterBuffer(0, buffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD); - recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_VACUUM_PAGE, rdata); + recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_VACUUM_PAGE); PageSetLSN(page, recptr); } @@ -292,48 +245,27 @@ ginDeletePage(GinVacuumState *gvs, BlockNumber deleteBlkno, BlockNumber leftBlkn if (RelationNeedsWAL(gvs->index)) { XLogRecPtr recptr; - XLogRecData rdata[4]; ginxlogDeletePage data; - data.node = gvs->index->rd_node; - data.blkno = deleteBlkno; - data.parentBlkno = parentBlkno; + /* + * We can't pass REGBUF_STANDARD for the deleted page, because we + * didn't set pd_lower on pre-9.4 versions. The page might've been + * binary-upgraded from an older version, and hence not have pd_lower + * set correctly. Ditto for the left page, but removing the item from + * the parent updated its pd_lower, so we know that's OK at this + * point. + */ + XLogBeginInsert(); + XLogRegisterBuffer(0, dBuffer, 0); + XLogRegisterBuffer(1, pBuffer, REGBUF_STANDARD); + XLogRegisterBuffer(2, lBuffer, 0); + data.parentOffset = myoff; - data.leftBlkno = leftBlkno; data.rightLink = GinPageGetOpaque(page)->rightlink; - /* - * We can't pass buffer_std = TRUE, because we didn't set pd_lower on - * pre-9.4 versions. The page might've been binary-upgraded from an - * older version, and hence not have pd_lower set correctly. Ditto for - * the left page, but removing the item from the parent updated its - * pd_lower, so we know that's OK at this point. - */ - rdata[0].buffer = dBuffer; - rdata[0].buffer_std = FALSE; - rdata[0].data = NULL; - rdata[0].len = 0; - rdata[0].next = rdata + 1; - - rdata[1].buffer = pBuffer; - rdata[1].buffer_std = TRUE; - rdata[1].data = NULL; - rdata[1].len = 0; - rdata[1].next = rdata + 2; - - rdata[2].buffer = lBuffer; - rdata[2].buffer_std = FALSE; - rdata[2].data = NULL; - rdata[2].len = 0; - rdata[2].next = rdata + 3; - - rdata[3].buffer = InvalidBuffer; - rdata[3].buffer_std = FALSE; - rdata[3].len = sizeof(ginxlogDeletePage); - rdata[3].data = (char *) &data; - rdata[3].next = NULL; - - recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_DELETE_PAGE, rdata); + XLogRegisterData((char *) &data, sizeof(ginxlogDeletePage)); + + recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_DELETE_PAGE); PageSetLSN(page, recptr); PageSetLSN(parentPage, recptr); PageSetLSN(BufferGetPage(lBuffer), recptr); diff --git a/src/backend/access/gin/ginxlog.c b/src/backend/access/gin/ginxlog.c index d0553bb8f72..6c0042bd795 100644 --- a/src/backend/access/gin/ginxlog.c +++ b/src/backend/access/gin/ginxlog.c @@ -20,18 +20,15 @@ static MemoryContext opCtx; /* working memory for operations */ static void -ginRedoClearIncompleteSplit(XLogRecPtr lsn, XLogRecord *record, - int block_index, - RelFileNode node, BlockNumber blkno) +ginRedoClearIncompleteSplit(XLogReaderState *record, uint8 block_id) { + XLogRecPtr lsn = record->EndRecPtr; Buffer buffer; Page page; - if (XLogReadBufferForRedo(lsn, record, block_index, node, blkno, &buffer) - == BLK_NEEDS_REDO) + if (XLogReadBufferForRedo(record, block_id, &buffer) == BLK_NEEDS_REDO) { page = (Page) BufferGetPage(buffer); - GinPageGetOpaque(page)->flags &= ~GIN_INCOMPLETE_SPLIT; PageSetLSN(page, lsn); @@ -42,18 +39,15 @@ ginRedoClearIncompleteSplit(XLogRecPtr lsn, XLogRecord *record, } static void -ginRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record) +ginRedoCreateIndex(XLogReaderState *record) { - RelFileNode *node = (RelFileNode *) XLogRecGetData(record); + XLogRecPtr lsn = record->EndRecPtr; Buffer RootBuffer, MetaBuffer; Page page; - /* Backup blocks are not used in create_index records */ - Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK)); - - MetaBuffer = XLogReadBuffer(*node, GIN_METAPAGE_BLKNO, true); - Assert(BufferIsValid(MetaBuffer)); + MetaBuffer = XLogInitBufferForRedo(record, 0); + Assert(BufferGetBlockNumber(MetaBuffer) == GIN_METAPAGE_BLKNO); page = (Page) BufferGetPage(MetaBuffer); GinInitMetabuffer(MetaBuffer); @@ -61,8 +55,8 @@ ginRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record) PageSetLSN(page, lsn); MarkBufferDirty(MetaBuffer); - RootBuffer = XLogReadBuffer(*node, GIN_ROOT_BLKNO, true); - Assert(BufferIsValid(RootBuffer)); + RootBuffer = XLogInitBufferForRedo(record, 1); + Assert(BufferGetBlockNumber(RootBuffer) == GIN_ROOT_BLKNO); page = (Page) BufferGetPage(RootBuffer); GinInitBuffer(RootBuffer, GIN_LEAF); @@ -75,18 +69,15 @@ ginRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record) } static void -ginRedoCreatePTree(XLogRecPtr lsn, XLogRecord *record) +ginRedoCreatePTree(XLogReaderState *record) { + XLogRecPtr lsn = record->EndRecPtr; ginxlogCreatePostingTree *data = (ginxlogCreatePostingTree *) XLogRecGetData(record); char *ptr; Buffer buffer; Page page; - /* Backup blocks are not used in create_ptree records */ - Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK)); - - buffer = XLogReadBuffer(data->node, data->blkno, true); - Assert(BufferIsValid(buffer)); + buffer = XLogInitBufferForRedo(record, 0); page = (Page) BufferGetPage(buffer); GinInitBuffer(buffer, GIN_DATA | GIN_LEAF | GIN_COMPRESSED); @@ -328,35 +319,40 @@ ginRedoInsertData(Buffer buffer, bool isLeaf, BlockNumber rightblkno, void *rdat } static void -ginRedoInsert(XLogRecPtr lsn, XLogRecord *record) +ginRedoInsert(XLogReaderState *record) { + XLogRecPtr lsn = record->EndRecPtr; ginxlogInsert *data = (ginxlogInsert *) XLogRecGetData(record); Buffer buffer; - char *payload; +#ifdef NOT_USED BlockNumber leftChildBlkno = InvalidBlockNumber; +#endif BlockNumber rightChildBlkno = InvalidBlockNumber; bool isLeaf = (data->flags & GIN_INSERT_ISLEAF) != 0; - payload = XLogRecGetData(record) + sizeof(ginxlogInsert); - /* * First clear incomplete-split flag on child page if this finishes a * split. */ if (!isLeaf) { + char *payload = XLogRecGetData(record) + sizeof(ginxlogInsert); + +#ifdef NOT_USED leftChildBlkno = BlockIdGetBlockNumber((BlockId) payload); +#endif payload += sizeof(BlockIdData); rightChildBlkno = BlockIdGetBlockNumber((BlockId) payload); payload += sizeof(BlockIdData); - ginRedoClearIncompleteSplit(lsn, record, 0, data->node, leftChildBlkno); + ginRedoClearIncompleteSplit(record, 1); } - if (XLogReadBufferForRedo(lsn, record, isLeaf ? 0 : 1, data->node, - data->blkno, &buffer) == BLK_NEEDS_REDO) + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) { Page page = BufferGetPage(buffer); + Size len; + char *payload = XLogRecGetBlockData(record, 0, &len); /* How to insert the payload is tree-type specific */ if (data->flags & GIN_INSERT_ISDATA) @@ -378,161 +374,33 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record) } static void -ginRedoSplitEntry(Page lpage, Page rpage, void *rdata) -{ - ginxlogSplitEntry *data = (ginxlogSplitEntry *) rdata; - IndexTuple itup = (IndexTuple) ((char *) rdata + sizeof(ginxlogSplitEntry)); - OffsetNumber i; - - for (i = 0; i < data->separator; i++) - { - if (PageAddItem(lpage, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber) - elog(ERROR, "failed to add item to gin index page"); - itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup))); - } - - for (i = data->separator; i < data->nitem; i++) - { - if (PageAddItem(rpage, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber) - elog(ERROR, "failed to add item to gin index page"); - itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup))); - } -} - -static void -ginRedoSplitData(Page lpage, Page rpage, void *rdata) -{ - bool isleaf = GinPageIsLeaf(lpage); - - if (isleaf) - { - ginxlogSplitDataLeaf *data = (ginxlogSplitDataLeaf *) rdata; - Pointer lptr = (Pointer) rdata + sizeof(ginxlogSplitDataLeaf); - Pointer rptr = lptr + data->lsize; - - Assert(data->lsize > 0 && data->lsize <= GinDataPageMaxDataSize); - Assert(data->rsize > 0 && data->rsize <= GinDataPageMaxDataSize); - - memcpy(GinDataLeafPageGetPostingList(lpage), lptr, data->lsize); - memcpy(GinDataLeafPageGetPostingList(rpage), rptr, data->rsize); - - GinDataPageSetDataSize(lpage, data->lsize); - GinDataPageSetDataSize(rpage, data->rsize); - *GinDataPageGetRightBound(lpage) = data->lrightbound; - *GinDataPageGetRightBound(rpage) = data->rrightbound; - } - else - { - ginxlogSplitDataInternal *data = (ginxlogSplitDataInternal *) rdata; - PostingItem *items = (PostingItem *) ((char *) rdata + sizeof(ginxlogSplitDataInternal)); - OffsetNumber i; - OffsetNumber maxoff; - - for (i = 0; i < data->separator; i++) - GinDataPageAddPostingItem(lpage, &items[i], InvalidOffsetNumber); - for (i = data->separator; i < data->nitem; i++) - GinDataPageAddPostingItem(rpage, &items[i], InvalidOffsetNumber); - - /* set up right key */ - maxoff = GinPageGetOpaque(lpage)->maxoff; - *GinDataPageGetRightBound(lpage) = GinDataPageGetPostingItem(lpage, maxoff)->key; - *GinDataPageGetRightBound(rpage) = data->rightbound; - } -} - -static void -ginRedoSplit(XLogRecPtr lsn, XLogRecord *record) +ginRedoSplit(XLogReaderState *record) { ginxlogSplit *data = (ginxlogSplit *) XLogRecGetData(record); Buffer lbuffer, - rbuffer; - Page lpage, - rpage; - uint32 flags; - uint32 lflags, - rflags; - char *payload; + rbuffer, + rootbuf; bool isLeaf = (data->flags & GIN_INSERT_ISLEAF) != 0; - bool isData = (data->flags & GIN_INSERT_ISDATA) != 0; bool isRoot = (data->flags & GIN_SPLIT_ROOT) != 0; - payload = XLogRecGetData(record) + sizeof(ginxlogSplit); - /* * First clear incomplete-split flag on child page if this finishes a * split */ if (!isLeaf) - ginRedoClearIncompleteSplit(lsn, record, 0, data->node, data->leftChildBlkno); - - flags = 0; - if (isLeaf) - flags |= GIN_LEAF; - if (isData) - flags |= GIN_DATA; - if (isLeaf && isData) - flags |= GIN_COMPRESSED; - - lflags = rflags = flags; - if (!isRoot) - lflags |= GIN_INCOMPLETE_SPLIT; - - lbuffer = XLogReadBuffer(data->node, data->lblkno, true); - Assert(BufferIsValid(lbuffer)); - lpage = (Page) BufferGetPage(lbuffer); - GinInitBuffer(lbuffer, lflags); - - rbuffer = XLogReadBuffer(data->node, data->rblkno, true); - Assert(BufferIsValid(rbuffer)); - rpage = (Page) BufferGetPage(rbuffer); - GinInitBuffer(rbuffer, rflags); - - GinPageGetOpaque(lpage)->rightlink = BufferGetBlockNumber(rbuffer); - GinPageGetOpaque(rpage)->rightlink = isRoot ? InvalidBlockNumber : data->rrlink; - - /* Do the tree-type specific portion to restore the page contents */ - if (isData) - ginRedoSplitData(lpage, rpage, payload); - else - ginRedoSplitEntry(lpage, rpage, payload); + ginRedoClearIncompleteSplit(record, 3); - PageSetLSN(rpage, lsn); - MarkBufferDirty(rbuffer); + if (XLogReadBufferForRedo(record, 0, &lbuffer) != BLK_RESTORED) + elog(ERROR, "GIN split record did not contain a full-page image of left page"); - PageSetLSN(lpage, lsn); - MarkBufferDirty(lbuffer); + if (XLogReadBufferForRedo(record, 1, &rbuffer) != BLK_RESTORED) + elog(ERROR, "GIN split record did not contain a full-page image of right page"); if (isRoot) { - BlockNumber rootBlkno = data->rrlink; - Buffer rootBuf = XLogReadBuffer(data->node, rootBlkno, true); - Page rootPage = BufferGetPage(rootBuf); - - GinInitBuffer(rootBuf, flags & ~GIN_LEAF & ~GIN_COMPRESSED); - - if (isData) - { - Assert(rootBlkno != GIN_ROOT_BLKNO); - ginDataFillRoot(NULL, BufferGetPage(rootBuf), - BufferGetBlockNumber(lbuffer), - BufferGetPage(lbuffer), - BufferGetBlockNumber(rbuffer), - BufferGetPage(rbuffer)); - } - else - { - Assert(rootBlkno == GIN_ROOT_BLKNO); - ginEntryFillRoot(NULL, BufferGetPage(rootBuf), - BufferGetBlockNumber(lbuffer), - BufferGetPage(lbuffer), - BufferGetBlockNumber(rbuffer), - BufferGetPage(rbuffer)); - } - - PageSetLSN(rootPage, lsn); - - MarkBufferDirty(rootBuf); - UnlockReleaseBuffer(rootBuf); + if (XLogReadBufferForRedo(record, 2, &rootbuf) != BLK_RESTORED) + elog(ERROR, "GIN split record did not contain a full-page image of root page"); + UnlockReleaseBuffer(rootbuf); } UnlockReleaseBuffer(rbuffer); @@ -544,54 +412,30 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record) * a XLOG_FPI record. */ static void -ginRedoVacuumPage(XLogRecPtr lsn, XLogRecord *record) +ginRedoVacuumPage(XLogReaderState *record) { - ginxlogVacuumPage *xlrec = (ginxlogVacuumPage *) XLogRecGetData(record); - char *blk = ((char *) xlrec) + sizeof(ginxlogVacuumPage); Buffer buffer; - Page page; - - Assert(xlrec->hole_offset < BLCKSZ); - Assert(xlrec->hole_length < BLCKSZ); - - /* Backup blocks are not used, we'll re-initialize the page always. */ - Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK)); - buffer = XLogReadBuffer(xlrec->node, xlrec->blkno, true); - if (!BufferIsValid(buffer)) - return; - page = (Page) BufferGetPage(buffer); - - if (xlrec->hole_length == 0) + if (XLogReadBufferForRedo(record, 0, &buffer) != BLK_RESTORED) { - memcpy((char *) page, blk, BLCKSZ); + elog(ERROR, "replay of gin entry tree page vacuum did not restore the page"); } - else - { - memcpy((char *) page, blk, xlrec->hole_offset); - /* must zero-fill the hole */ - MemSet((char *) page + xlrec->hole_offset, 0, xlrec->hole_length); - memcpy((char *) page + (xlrec->hole_offset + xlrec->hole_length), - blk + xlrec->hole_offset, - BLCKSZ - (xlrec->hole_offset + xlrec->hole_length)); - } - - PageSetLSN(page, lsn); - - MarkBufferDirty(buffer); UnlockReleaseBuffer(buffer); } static void -ginRedoVacuumDataLeafPage(XLogRecPtr lsn, XLogRecord *record) +ginRedoVacuumDataLeafPage(XLogReaderState *record) { - ginxlogVacuumDataLeafPage *xlrec = (ginxlogVacuumDataLeafPage *) XLogRecGetData(record); + XLogRecPtr lsn = record->EndRecPtr; Buffer buffer; - if (XLogReadBufferForRedo(lsn, record, 0, xlrec->node, xlrec->blkno, - &buffer) == BLK_NEEDS_REDO) + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) { Page page = BufferGetPage(buffer); + Size len; + ginxlogVacuumDataLeafPage *xlrec; + + xlrec = (ginxlogVacuumDataLeafPage *) XLogRecGetBlockData(record, 0, &len); Assert(GinPageIsLeaf(page)); Assert(GinPageIsData(page)); @@ -605,30 +449,27 @@ ginRedoVacuumDataLeafPage(XLogRecPtr lsn, XLogRecord *record) } static void -ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record) +ginRedoDeletePage(XLogReaderState *record) { + XLogRecPtr lsn = record->EndRecPtr; ginxlogDeletePage *data = (ginxlogDeletePage *) XLogRecGetData(record); Buffer dbuffer; Buffer pbuffer; Buffer lbuffer; Page page; - if (XLogReadBufferForRedo(lsn, record, 0, data->node, data->blkno, &dbuffer) - == BLK_NEEDS_REDO) + if (XLogReadBufferForRedo(record, 0, &dbuffer) == BLK_NEEDS_REDO) { page = BufferGetPage(dbuffer); - Assert(GinPageIsData(page)); GinPageGetOpaque(page)->flags = GIN_DELETED; PageSetLSN(page, lsn); MarkBufferDirty(dbuffer); } - if (XLogReadBufferForRedo(lsn, record, 1, data->node, data->parentBlkno, - &pbuffer) == BLK_NEEDS_REDO) + if (XLogReadBufferForRedo(record, 1, &pbuffer) == BLK_NEEDS_REDO) { page = BufferGetPage(pbuffer); - Assert(GinPageIsData(page)); Assert(!GinPageIsLeaf(page)); GinPageDeletePostingItem(page, data->parentOffset); @@ -636,11 +477,9 @@ ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record) MarkBufferDirty(pbuffer); } - if (XLogReadBufferForRedo(lsn, record, 2, data->node, data->leftBlkno, - &lbuffer) == BLK_NEEDS_REDO) + if (XLogReadBufferForRedo(record, 2, &lbuffer) == BLK_NEEDS_REDO) { page = BufferGetPage(lbuffer); - Assert(GinPageIsData(page)); GinPageGetOpaque(page)->rightlink = data->rightLink; PageSetLSN(page, lsn); @@ -656,8 +495,9 @@ ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record) } static void -ginRedoUpdateMetapage(XLogRecPtr lsn, XLogRecord *record) +ginRedoUpdateMetapage(XLogReaderState *record) { + XLogRecPtr lsn = record->EndRecPtr; ginxlogUpdateMeta *data = (ginxlogUpdateMeta *) XLogRecGetData(record); Buffer metabuffer; Page metapage; @@ -668,9 +508,8 @@ ginRedoUpdateMetapage(XLogRecPtr lsn, XLogRecord *record) * image, so restore the metapage unconditionally without looking at the * LSN, to avoid torn page hazards. */ - metabuffer = XLogReadBuffer(data->node, GIN_METAPAGE_BLKNO, false); - if (!BufferIsValid(metabuffer)) - return; /* assume index was deleted, nothing to do */ + metabuffer = XLogInitBufferForRedo(record, 0); + Assert(BufferGetBlockNumber(metabuffer) == GIN_METAPAGE_BLKNO); metapage = BufferGetPage(metabuffer); memcpy(GinPageGetMeta(metapage), &data->metadata, sizeof(GinMetaPageData)); @@ -682,17 +521,18 @@ ginRedoUpdateMetapage(XLogRecPtr lsn, XLogRecord *record) /* * insert into tail page */ - if (XLogReadBufferForRedo(lsn, record, 0, data->node, - data->metadata.tail, &buffer) - == BLK_NEEDS_REDO) + if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO) { Page page = BufferGetPage(buffer); OffsetNumber off; int i; Size tupsize; + char *payload; IndexTuple tuples; + Size totaltupsize; - tuples = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogUpdateMeta)); + payload = XLogRecGetBlockData(record, 1, &totaltupsize); + tuples = (IndexTuple) payload; if (PageIsEmpty(page)) off = FirstOffsetNumber; @@ -711,6 +551,7 @@ ginRedoUpdateMetapage(XLogRecPtr lsn, XLogRecord *record) off++; } + Assert(payload + totaltupsize == (char *) tuples); /* * Increase counter of heap tuples @@ -728,8 +569,7 @@ ginRedoUpdateMetapage(XLogRecPtr lsn, XLogRecord *record) /* * New tail */ - if (XLogReadBufferForRedo(lsn, record, 0, data->node, data->prevTail, - &buffer) == BLK_NEEDS_REDO) + if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO) { Page page = BufferGetPage(buffer); @@ -746,8 +586,9 @@ ginRedoUpdateMetapage(XLogRecPtr lsn, XLogRecord *record) } static void -ginRedoInsertListPage(XLogRecPtr lsn, XLogRecord *record) +ginRedoInsertListPage(XLogReaderState *record) { + XLogRecPtr lsn = record->EndRecPtr; ginxlogInsertListPage *data = (ginxlogInsertListPage *) XLogRecGetData(record); Buffer buffer; Page page; @@ -755,15 +596,12 @@ ginRedoInsertListPage(XLogRecPtr lsn, XLogRecord *record) off = FirstOffsetNumber; int i, tupsize; - IndexTuple tuples = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsertListPage)); - - /* - * Backup blocks are not used, we always re-initialize the page. - */ - Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK)); + char *payload; + IndexTuple tuples; + Size totaltupsize; - buffer = XLogReadBuffer(data->node, data->blkno, true); - Assert(BufferIsValid(buffer)); + /* We always re-initialize the page. */ + buffer = XLogInitBufferForRedo(record, 0); page = BufferGetPage(buffer); GinInitBuffer(buffer, GIN_LIST); @@ -779,6 +617,9 @@ ginRedoInsertListPage(XLogRecPtr lsn, XLogRecord *record) GinPageGetOpaque(page)->maxoff = 0; } + payload = XLogRecGetBlockData(record, 0, &totaltupsize); + + tuples = (IndexTuple) payload; for (i = 0; i < data->ntuples; i++) { tupsize = IndexTupleSize(tuples); @@ -791,6 +632,7 @@ ginRedoInsertListPage(XLogRecPtr lsn, XLogRecord *record) tuples = (IndexTuple) (((char *) tuples) + tupsize); off++; } + Assert((char *) tuples == payload + totaltupsize); PageSetLSN(page, lsn); MarkBufferDirty(buffer); @@ -799,21 +641,20 @@ ginRedoInsertListPage(XLogRecPtr lsn, XLogRecord *record) } static void -ginRedoDeleteListPages(XLogRecPtr lsn, XLogRecord *record) +ginRedoDeleteListPages(XLogReaderState *record) { + XLogRecPtr lsn = record->EndRecPtr; ginxlogDeleteListPages *data = (ginxlogDeleteListPages *) XLogRecGetData(record); Buffer metabuffer; Page metapage; int i; - /* Backup blocks are not used in delete_listpage records */ - Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK)); - - metabuffer = XLogReadBuffer(data->node, GIN_METAPAGE_BLKNO, false); - if (!BufferIsValid(metabuffer)) - return; /* assume index was deleted, nothing to do */ + metabuffer = XLogInitBufferForRedo(record, 0); + Assert(BufferGetBlockNumber(metabuffer) == GIN_METAPAGE_BLKNO); metapage = BufferGetPage(metabuffer); + GinInitPage(metapage, GIN_META, BufferGetPageSize(metabuffer)); + memcpy(GinPageGetMeta(metapage), &data->metadata, sizeof(GinMetaPageData)); PageSetLSN(metapage, lsn); MarkBufferDirty(metabuffer); @@ -838,7 +679,7 @@ ginRedoDeleteListPages(XLogRecPtr lsn, XLogRecord *record) Buffer buffer; Page page; - buffer = XLogReadBuffer(data->node, data->toDelete[i], true); + buffer = XLogInitBufferForRedo(record, i + 1); page = BufferGetPage(buffer); GinInitBuffer(buffer, GIN_DELETED); @@ -851,9 +692,9 @@ ginRedoDeleteListPages(XLogRecPtr lsn, XLogRecord *record) } void -gin_redo(XLogRecPtr lsn, XLogRecord *record) +gin_redo(XLogReaderState *record) { - uint8 info = record->xl_info & ~XLR_INFO_MASK; + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; MemoryContext oldCtx; /* @@ -866,34 +707,34 @@ gin_redo(XLogRecPtr lsn, XLogRecord *record) switch (info) { case XLOG_GIN_CREATE_INDEX: - ginRedoCreateIndex(lsn, record); + ginRedoCreateIndex(record); break; case XLOG_GIN_CREATE_PTREE: - ginRedoCreatePTree(lsn, record); + ginRedoCreatePTree(record); break; case XLOG_GIN_INSERT: - ginRedoInsert(lsn, record); + ginRedoInsert(record); break; case XLOG_GIN_SPLIT: - ginRedoSplit(lsn, record); + ginRedoSplit(record); break; case XLOG_GIN_VACUUM_PAGE: - ginRedoVacuumPage(lsn, record); + ginRedoVacuumPage(record); break; case XLOG_GIN_VACUUM_DATA_LEAF_PAGE: - ginRedoVacuumDataLeafPage(lsn, record); + ginRedoVacuumDataLeafPage(record); break; case XLOG_GIN_DELETE_PAGE: - ginRedoDeletePage(lsn, record); + ginRedoDeletePage(record); break; case XLOG_GIN_UPDATE_META_PAGE: - ginRedoUpdateMetapage(lsn, record); + ginRedoUpdateMetapage(record); break; case XLOG_GIN_INSERT_LISTPAGE: - ginRedoInsertListPage(lsn, record); + ginRedoInsertListPage(record); break; case XLOG_GIN_DELETE_LISTPAGE: - ginRedoDeleteListPages(lsn, record); + ginRedoDeleteListPages(record); break; default: elog(PANIC, "gin_redo: unknown op code %u", info); |