diff options
Diffstat (limited to 'src/backend/access/gin/ginxlog.c')
-rw-r--r-- | src/backend/access/gin/ginxlog.c | 184 |
1 files changed, 116 insertions, 68 deletions
diff --git a/src/backend/access/gin/ginxlog.c b/src/backend/access/gin/ginxlog.c index c13e01a3c6a..9a978ce2bdb 100644 --- a/src/backend/access/gin/ginxlog.c +++ b/src/backend/access/gin/ginxlog.c @@ -78,7 +78,7 @@ static void ginRedoCreatePTree(XLogRecPtr lsn, XLogRecord *record) { ginxlogCreatePostingTree *data = (ginxlogCreatePostingTree *) XLogRecGetData(record); - ItemPointerData *items = (ItemPointerData *) (XLogRecGetData(record) + sizeof(ginxlogCreatePostingTree)); + char *ptr; Buffer buffer; Page page; @@ -89,9 +89,14 @@ ginRedoCreatePTree(XLogRecPtr lsn, XLogRecord *record) Assert(BufferIsValid(buffer)); page = (Page) BufferGetPage(buffer); - GinInitBuffer(buffer, GIN_DATA | GIN_LEAF); - memcpy(GinDataPageGetData(page), items, sizeof(ItemPointerData) * data->nitem); - GinPageGetOpaque(page)->maxoff = data->nitem; + GinInitBuffer(buffer, GIN_DATA | GIN_LEAF | GIN_COMPRESSED); + + ptr = XLogRecGetData(record) + sizeof(ginxlogCreatePostingTree); + + /* Place page data */ + memcpy(GinDataLeafPageGetPostingList(page), ptr, data->size); + + GinDataLeafPageSetPostingListSize(page, data->size); PageSetLSN(page, lsn); @@ -100,11 +105,11 @@ ginRedoCreatePTree(XLogRecPtr lsn, XLogRecord *record) } static void -ginRedoInsertEntry(Buffer buffer, OffsetNumber offset, BlockNumber rightblkno, - void *rdata) +ginRedoInsertEntry(Buffer buffer, bool isLeaf, BlockNumber rightblkno, void *rdata) { Page page = BufferGetPage(buffer); ginxlogInsertEntry *data = (ginxlogInsertEntry *) rdata; + OffsetNumber offset = data->offset; IndexTuple itup; if (rightblkno != InvalidBlockNumber) @@ -138,30 +143,43 @@ ginRedoInsertEntry(Buffer buffer, OffsetNumber offset, BlockNumber rightblkno, } static void -ginRedoInsertData(Buffer buffer, OffsetNumber offset, BlockNumber rightblkno, - void *rdata) +ginRedoRecompress(Page page, ginxlogRecompressDataLeaf *data) +{ + Pointer segment; + + /* Copy the new data to the right place */ + segment = ((Pointer) GinDataLeafPageGetPostingList(page)) + + data->unmodifiedsize; + memcpy(segment, data->newdata, data->length - data->unmodifiedsize); + GinDataLeafPageSetPostingListSize(page, data->length); + GinPageSetCompressed(page); +} + +static void +ginRedoInsertData(Buffer buffer, bool isLeaf, BlockNumber rightblkno, void *rdata) { Page page = BufferGetPage(buffer); - if (GinPageIsLeaf(page)) + if (isLeaf) { - ginxlogInsertDataLeaf *data = (ginxlogInsertDataLeaf *) rdata; - ItemPointerData *items = data->items; - OffsetNumber i; + ginxlogRecompressDataLeaf *data = (ginxlogRecompressDataLeaf *) rdata; - for (i = 0; i < data->nitem; i++) - GinDataPageAddItemPointer(page, &items[i], offset + i); + Assert(GinPageIsLeaf(page)); + + ginRedoRecompress(page, data); } else { - PostingItem *pitem = (PostingItem *) rdata; + ginxlogInsertDataInternal *data = (ginxlogInsertDataInternal *) rdata; PostingItem *oldpitem; + Assert(!GinPageIsLeaf(page)); + /* update link to right page after split */ - oldpitem = GinDataPageGetPostingItem(page, offset); + oldpitem = GinDataPageGetPostingItem(page, data->offset); PostingItemSetBlockNumber(oldpitem, rightblkno); - GinDataPageAddPostingItem(page, pitem, offset); + GinDataPageAddPostingItem(page, &data->newitem, data->offset); } } @@ -213,12 +231,12 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record) if (data->flags & GIN_INSERT_ISDATA) { Assert(GinPageIsData(page)); - ginRedoInsertData(buffer, data->offset, rightChildBlkno, payload); + ginRedoInsertData(buffer, isLeaf, rightChildBlkno, payload); } else { Assert(!GinPageIsData(page)); - ginRedoInsertEntry(buffer, data->offset, rightChildBlkno, payload); + ginRedoInsertEntry(buffer, isLeaf, rightChildBlkno, payload); } PageSetLSN(page, lsn); @@ -253,38 +271,42 @@ ginRedoSplitEntry(Page lpage, Page rpage, void *rdata) static void ginRedoSplitData(Page lpage, Page rpage, void *rdata) { - ginxlogSplitData *data = (ginxlogSplitData *) rdata; bool isleaf = GinPageIsLeaf(lpage); - char *ptr = (char *) rdata + sizeof(ginxlogSplitData); - OffsetNumber i; - ItemPointer bound; if (isleaf) { - ItemPointer items = (ItemPointer) ptr; - for (i = 0; i < data->separator; i++) - GinDataPageAddItemPointer(lpage, &items[i], InvalidOffsetNumber); - for (i = data->separator; i < data->nitem; i++) - GinDataPageAddItemPointer(rpage, &items[i], InvalidOffsetNumber); + ginxlogSplitDataLeaf *data = (ginxlogSplitDataLeaf *) rdata; + Pointer lptr = (Pointer) rdata + sizeof(ginxlogSplitDataLeaf); + Pointer rptr = lptr + data->lsize; + + Assert(data->lsize > 0 && data->lsize <= GinDataLeafMaxContentSize); + Assert(data->rsize > 0 && data->rsize <= GinDataLeafMaxContentSize); + + memcpy(GinDataLeafPageGetPostingList(lpage), lptr, data->lsize); + memcpy(GinDataLeafPageGetPostingList(rpage), rptr, data->rsize); + + GinDataLeafPageSetPostingListSize(lpage, data->lsize); + GinDataLeafPageSetPostingListSize(rpage, data->rsize); + *GinDataPageGetRightBound(lpage) = data->lrightbound; + *GinDataPageGetRightBound(rpage) = data->rrightbound; } else { - PostingItem *items = (PostingItem *) ptr; + ginxlogSplitDataInternal *data = (ginxlogSplitDataInternal *) rdata; + PostingItem *items = (PostingItem *) ((char *) rdata + sizeof(ginxlogSplitDataInternal)); + OffsetNumber i; + OffsetNumber maxoff; + for (i = 0; i < data->separator; i++) GinDataPageAddPostingItem(lpage, &items[i], InvalidOffsetNumber); for (i = data->separator; i < data->nitem; i++) GinDataPageAddPostingItem(rpage, &items[i], InvalidOffsetNumber); - } - - /* set up right key */ - bound = GinDataPageGetRightBound(lpage); - if (isleaf) - *bound = *GinDataPageGetItemPointer(lpage, GinPageGetOpaque(lpage)->maxoff); - else - *bound = GinDataPageGetPostingItem(lpage, GinPageGetOpaque(lpage)->maxoff)->key; - bound = GinDataPageGetRightBound(rpage); - *bound = data->rightbound; + /* set up right key */ + maxoff = GinPageGetOpaque(lpage)->maxoff; + *GinDataPageGetRightBound(lpage) = GinDataPageGetPostingItem(lpage, maxoff)->key; + *GinDataPageGetRightBound(rpage) = data->rightbound; + } } static void @@ -317,9 +339,10 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record) if (isLeaf) flags |= GIN_LEAF; - if (isData) flags |= GIN_DATA; + if (isLeaf && isData) + flags |= GIN_COMPRESSED; lbuffer = XLogReadBuffer(data->node, data->lblkno, true); Assert(BufferIsValid(lbuffer)); @@ -352,7 +375,7 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record) Buffer rootBuf = XLogReadBuffer(data->node, rootBlkno, true); Page rootPage = BufferGetPage(rootBuf); - GinInitBuffer(rootBuf, flags & ~GIN_LEAF); + GinInitBuffer(rootBuf, flags & ~GIN_LEAF & ~GIN_COMPRESSED); if (isData) { @@ -383,13 +406,20 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record) UnlockReleaseBuffer(lbuffer); } +/* + * This is functionally the same as heap_xlog_newpage. + */ static void ginRedoVacuumPage(XLogRecPtr lsn, XLogRecord *record) { - ginxlogVacuumPage *data = (ginxlogVacuumPage *) XLogRecGetData(record); + ginxlogVacuumPage *xlrec = (ginxlogVacuumPage *) XLogRecGetData(record); + char *blk = ((char *) xlrec) + sizeof(ginxlogVacuumPage); Buffer buffer; Page page; + Assert(xlrec->hole_offset < BLCKSZ); + Assert(xlrec->hole_length < BLCKSZ); + /* If we have a full-page image, restore it and we're done */ if (record->xl_info & XLR_BKP_BLOCK(0)) { @@ -397,41 +427,56 @@ ginRedoVacuumPage(XLogRecPtr lsn, XLogRecord *record) return; } - buffer = XLogReadBuffer(data->node, data->blkno, false); + buffer = XLogReadBuffer(xlrec->node, xlrec->blkno, true); if (!BufferIsValid(buffer)) return; page = (Page) BufferGetPage(buffer); - if (lsn > PageGetLSN(page)) + if (xlrec->hole_length == 0) { - if (GinPageIsData(page)) - { - memcpy(GinDataPageGetData(page), - XLogRecGetData(record) + sizeof(ginxlogVacuumPage), - data->nitem * GinSizeOfDataPageItem(page)); - GinPageGetOpaque(page)->maxoff = data->nitem; - } - else - { - OffsetNumber i, - *tod; - IndexTuple itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogVacuumPage)); + memcpy((char *) page, blk, BLCKSZ); + } + else + { + memcpy((char *) page, blk, xlrec->hole_offset); + /* must zero-fill the hole */ + MemSet((char *) page + xlrec->hole_offset, 0, xlrec->hole_length); + memcpy((char *) page + (xlrec->hole_offset + xlrec->hole_length), + blk + xlrec->hole_offset, + BLCKSZ - (xlrec->hole_offset + xlrec->hole_length)); + } - tod = (OffsetNumber *) palloc(sizeof(OffsetNumber) * PageGetMaxOffsetNumber(page)); - for (i = FirstOffsetNumber; i <= PageGetMaxOffsetNumber(page); i++) - tod[i - 1] = i; + PageSetLSN(page, lsn); - PageIndexMultiDelete(page, tod, PageGetMaxOffsetNumber(page)); + MarkBufferDirty(buffer); + UnlockReleaseBuffer(buffer); +} - for (i = 0; i < data->nitem; i++) - { - if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber) - elog(ERROR, "failed to add item to index page in %u/%u/%u", - data->node.spcNode, data->node.dbNode, data->node.relNode); - itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup))); - } - } +static void +ginRedoVacuumDataLeafPage(XLogRecPtr lsn, XLogRecord *record) +{ + ginxlogVacuumDataLeafPage *xlrec = (ginxlogVacuumDataLeafPage *) XLogRecGetData(record); + Buffer buffer; + Page page; + + /* If we have a full-page image, restore it and we're done */ + if (record->xl_info & XLR_BKP_BLOCK(0)) + { + (void) RestoreBackupBlock(lsn, record, 0, false, false); + return; + } + buffer = XLogReadBuffer(xlrec->node, xlrec->blkno, false); + if (!BufferIsValid(buffer)) + return; + page = (Page) BufferGetPage(buffer); + + Assert(GinPageIsLeaf(page)); + Assert(GinPageIsData(page)); + + if (lsn > PageGetLSN(page)) + { + ginRedoRecompress(page, &xlrec->data); PageSetLSN(page, lsn); MarkBufferDirty(buffer); } @@ -747,6 +792,9 @@ gin_redo(XLogRecPtr lsn, XLogRecord *record) case XLOG_GIN_VACUUM_PAGE: ginRedoVacuumPage(lsn, record); break; + case XLOG_GIN_VACUUM_DATA_LEAF_PAGE: + ginRedoVacuumDataLeafPage(lsn, record); + break; case XLOG_GIN_DELETE_PAGE: ginRedoDeletePage(lsn, record); break; |