aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/gin/ginxlog.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/gin/ginxlog.c')
-rw-r--r--src/backend/access/gin/ginxlog.c184
1 files changed, 116 insertions, 68 deletions
diff --git a/src/backend/access/gin/ginxlog.c b/src/backend/access/gin/ginxlog.c
index c13e01a3c6a..9a978ce2bdb 100644
--- a/src/backend/access/gin/ginxlog.c
+++ b/src/backend/access/gin/ginxlog.c
@@ -78,7 +78,7 @@ static void
ginRedoCreatePTree(XLogRecPtr lsn, XLogRecord *record)
{
ginxlogCreatePostingTree *data = (ginxlogCreatePostingTree *) XLogRecGetData(record);
- ItemPointerData *items = (ItemPointerData *) (XLogRecGetData(record) + sizeof(ginxlogCreatePostingTree));
+ char *ptr;
Buffer buffer;
Page page;
@@ -89,9 +89,14 @@ ginRedoCreatePTree(XLogRecPtr lsn, XLogRecord *record)
Assert(BufferIsValid(buffer));
page = (Page) BufferGetPage(buffer);
- GinInitBuffer(buffer, GIN_DATA | GIN_LEAF);
- memcpy(GinDataPageGetData(page), items, sizeof(ItemPointerData) * data->nitem);
- GinPageGetOpaque(page)->maxoff = data->nitem;
+ GinInitBuffer(buffer, GIN_DATA | GIN_LEAF | GIN_COMPRESSED);
+
+ ptr = XLogRecGetData(record) + sizeof(ginxlogCreatePostingTree);
+
+ /* Place page data */
+ memcpy(GinDataLeafPageGetPostingList(page), ptr, data->size);
+
+ GinDataLeafPageSetPostingListSize(page, data->size);
PageSetLSN(page, lsn);
@@ -100,11 +105,11 @@ ginRedoCreatePTree(XLogRecPtr lsn, XLogRecord *record)
}
static void
-ginRedoInsertEntry(Buffer buffer, OffsetNumber offset, BlockNumber rightblkno,
- void *rdata)
+ginRedoInsertEntry(Buffer buffer, bool isLeaf, BlockNumber rightblkno, void *rdata)
{
Page page = BufferGetPage(buffer);
ginxlogInsertEntry *data = (ginxlogInsertEntry *) rdata;
+ OffsetNumber offset = data->offset;
IndexTuple itup;
if (rightblkno != InvalidBlockNumber)
@@ -138,30 +143,43 @@ ginRedoInsertEntry(Buffer buffer, OffsetNumber offset, BlockNumber rightblkno,
}
static void
-ginRedoInsertData(Buffer buffer, OffsetNumber offset, BlockNumber rightblkno,
- void *rdata)
+ginRedoRecompress(Page page, ginxlogRecompressDataLeaf *data)
+{
+ Pointer segment;
+
+ /* Copy the new data to the right place */
+ segment = ((Pointer) GinDataLeafPageGetPostingList(page))
+ + data->unmodifiedsize;
+ memcpy(segment, data->newdata, data->length - data->unmodifiedsize);
+ GinDataLeafPageSetPostingListSize(page, data->length);
+ GinPageSetCompressed(page);
+}
+
+static void
+ginRedoInsertData(Buffer buffer, bool isLeaf, BlockNumber rightblkno, void *rdata)
{
Page page = BufferGetPage(buffer);
- if (GinPageIsLeaf(page))
+ if (isLeaf)
{
- ginxlogInsertDataLeaf *data = (ginxlogInsertDataLeaf *) rdata;
- ItemPointerData *items = data->items;
- OffsetNumber i;
+ ginxlogRecompressDataLeaf *data = (ginxlogRecompressDataLeaf *) rdata;
- for (i = 0; i < data->nitem; i++)
- GinDataPageAddItemPointer(page, &items[i], offset + i);
+ Assert(GinPageIsLeaf(page));
+
+ ginRedoRecompress(page, data);
}
else
{
- PostingItem *pitem = (PostingItem *) rdata;
+ ginxlogInsertDataInternal *data = (ginxlogInsertDataInternal *) rdata;
PostingItem *oldpitem;
+ Assert(!GinPageIsLeaf(page));
+
/* update link to right page after split */
- oldpitem = GinDataPageGetPostingItem(page, offset);
+ oldpitem = GinDataPageGetPostingItem(page, data->offset);
PostingItemSetBlockNumber(oldpitem, rightblkno);
- GinDataPageAddPostingItem(page, pitem, offset);
+ GinDataPageAddPostingItem(page, &data->newitem, data->offset);
}
}
@@ -213,12 +231,12 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
if (data->flags & GIN_INSERT_ISDATA)
{
Assert(GinPageIsData(page));
- ginRedoInsertData(buffer, data->offset, rightChildBlkno, payload);
+ ginRedoInsertData(buffer, isLeaf, rightChildBlkno, payload);
}
else
{
Assert(!GinPageIsData(page));
- ginRedoInsertEntry(buffer, data->offset, rightChildBlkno, payload);
+ ginRedoInsertEntry(buffer, isLeaf, rightChildBlkno, payload);
}
PageSetLSN(page, lsn);
@@ -253,38 +271,42 @@ ginRedoSplitEntry(Page lpage, Page rpage, void *rdata)
static void
ginRedoSplitData(Page lpage, Page rpage, void *rdata)
{
- ginxlogSplitData *data = (ginxlogSplitData *) rdata;
bool isleaf = GinPageIsLeaf(lpage);
- char *ptr = (char *) rdata + sizeof(ginxlogSplitData);
- OffsetNumber i;
- ItemPointer bound;
if (isleaf)
{
- ItemPointer items = (ItemPointer) ptr;
- for (i = 0; i < data->separator; i++)
- GinDataPageAddItemPointer(lpage, &items[i], InvalidOffsetNumber);
- for (i = data->separator; i < data->nitem; i++)
- GinDataPageAddItemPointer(rpage, &items[i], InvalidOffsetNumber);
+ ginxlogSplitDataLeaf *data = (ginxlogSplitDataLeaf *) rdata;
+ Pointer lptr = (Pointer) rdata + sizeof(ginxlogSplitDataLeaf);
+ Pointer rptr = lptr + data->lsize;
+
+ Assert(data->lsize > 0 && data->lsize <= GinDataLeafMaxContentSize);
+ Assert(data->rsize > 0 && data->rsize <= GinDataLeafMaxContentSize);
+
+ memcpy(GinDataLeafPageGetPostingList(lpage), lptr, data->lsize);
+ memcpy(GinDataLeafPageGetPostingList(rpage), rptr, data->rsize);
+
+ GinDataLeafPageSetPostingListSize(lpage, data->lsize);
+ GinDataLeafPageSetPostingListSize(rpage, data->rsize);
+ *GinDataPageGetRightBound(lpage) = data->lrightbound;
+ *GinDataPageGetRightBound(rpage) = data->rrightbound;
}
else
{
- PostingItem *items = (PostingItem *) ptr;
+ ginxlogSplitDataInternal *data = (ginxlogSplitDataInternal *) rdata;
+ PostingItem *items = (PostingItem *) ((char *) rdata + sizeof(ginxlogSplitDataInternal));
+ OffsetNumber i;
+ OffsetNumber maxoff;
+
for (i = 0; i < data->separator; i++)
GinDataPageAddPostingItem(lpage, &items[i], InvalidOffsetNumber);
for (i = data->separator; i < data->nitem; i++)
GinDataPageAddPostingItem(rpage, &items[i], InvalidOffsetNumber);
- }
-
- /* set up right key */
- bound = GinDataPageGetRightBound(lpage);
- if (isleaf)
- *bound = *GinDataPageGetItemPointer(lpage, GinPageGetOpaque(lpage)->maxoff);
- else
- *bound = GinDataPageGetPostingItem(lpage, GinPageGetOpaque(lpage)->maxoff)->key;
- bound = GinDataPageGetRightBound(rpage);
- *bound = data->rightbound;
+ /* set up right key */
+ maxoff = GinPageGetOpaque(lpage)->maxoff;
+ *GinDataPageGetRightBound(lpage) = GinDataPageGetPostingItem(lpage, maxoff)->key;
+ *GinDataPageGetRightBound(rpage) = data->rightbound;
+ }
}
static void
@@ -317,9 +339,10 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
if (isLeaf)
flags |= GIN_LEAF;
-
if (isData)
flags |= GIN_DATA;
+ if (isLeaf && isData)
+ flags |= GIN_COMPRESSED;
lbuffer = XLogReadBuffer(data->node, data->lblkno, true);
Assert(BufferIsValid(lbuffer));
@@ -352,7 +375,7 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
Buffer rootBuf = XLogReadBuffer(data->node, rootBlkno, true);
Page rootPage = BufferGetPage(rootBuf);
- GinInitBuffer(rootBuf, flags & ~GIN_LEAF);
+ GinInitBuffer(rootBuf, flags & ~GIN_LEAF & ~GIN_COMPRESSED);
if (isData)
{
@@ -383,13 +406,20 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
UnlockReleaseBuffer(lbuffer);
}
+/*
+ * This is functionally the same as heap_xlog_newpage.
+ */
static void
ginRedoVacuumPage(XLogRecPtr lsn, XLogRecord *record)
{
- ginxlogVacuumPage *data = (ginxlogVacuumPage *) XLogRecGetData(record);
+ ginxlogVacuumPage *xlrec = (ginxlogVacuumPage *) XLogRecGetData(record);
+ char *blk = ((char *) xlrec) + sizeof(ginxlogVacuumPage);
Buffer buffer;
Page page;
+ Assert(xlrec->hole_offset < BLCKSZ);
+ Assert(xlrec->hole_length < BLCKSZ);
+
/* If we have a full-page image, restore it and we're done */
if (record->xl_info & XLR_BKP_BLOCK(0))
{
@@ -397,41 +427,56 @@ ginRedoVacuumPage(XLogRecPtr lsn, XLogRecord *record)
return;
}
- buffer = XLogReadBuffer(data->node, data->blkno, false);
+ buffer = XLogReadBuffer(xlrec->node, xlrec->blkno, true);
if (!BufferIsValid(buffer))
return;
page = (Page) BufferGetPage(buffer);
- if (lsn > PageGetLSN(page))
+ if (xlrec->hole_length == 0)
{
- if (GinPageIsData(page))
- {
- memcpy(GinDataPageGetData(page),
- XLogRecGetData(record) + sizeof(ginxlogVacuumPage),
- data->nitem * GinSizeOfDataPageItem(page));
- GinPageGetOpaque(page)->maxoff = data->nitem;
- }
- else
- {
- OffsetNumber i,
- *tod;
- IndexTuple itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogVacuumPage));
+ memcpy((char *) page, blk, BLCKSZ);
+ }
+ else
+ {
+ memcpy((char *) page, blk, xlrec->hole_offset);
+ /* must zero-fill the hole */
+ MemSet((char *) page + xlrec->hole_offset, 0, xlrec->hole_length);
+ memcpy((char *) page + (xlrec->hole_offset + xlrec->hole_length),
+ blk + xlrec->hole_offset,
+ BLCKSZ - (xlrec->hole_offset + xlrec->hole_length));
+ }
- tod = (OffsetNumber *) palloc(sizeof(OffsetNumber) * PageGetMaxOffsetNumber(page));
- for (i = FirstOffsetNumber; i <= PageGetMaxOffsetNumber(page); i++)
- tod[i - 1] = i;
+ PageSetLSN(page, lsn);
- PageIndexMultiDelete(page, tod, PageGetMaxOffsetNumber(page));
+ MarkBufferDirty(buffer);
+ UnlockReleaseBuffer(buffer);
+}
- for (i = 0; i < data->nitem; i++)
- {
- if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
- elog(ERROR, "failed to add item to index page in %u/%u/%u",
- data->node.spcNode, data->node.dbNode, data->node.relNode);
- itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
- }
- }
+static void
+ginRedoVacuumDataLeafPage(XLogRecPtr lsn, XLogRecord *record)
+{
+ ginxlogVacuumDataLeafPage *xlrec = (ginxlogVacuumDataLeafPage *) XLogRecGetData(record);
+ Buffer buffer;
+ Page page;
+
+ /* If we have a full-page image, restore it and we're done */
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ {
+ (void) RestoreBackupBlock(lsn, record, 0, false, false);
+ return;
+ }
+ buffer = XLogReadBuffer(xlrec->node, xlrec->blkno, false);
+ if (!BufferIsValid(buffer))
+ return;
+ page = (Page) BufferGetPage(buffer);
+
+ Assert(GinPageIsLeaf(page));
+ Assert(GinPageIsData(page));
+
+ if (lsn > PageGetLSN(page))
+ {
+ ginRedoRecompress(page, &xlrec->data);
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
@@ -747,6 +792,9 @@ gin_redo(XLogRecPtr lsn, XLogRecord *record)
case XLOG_GIN_VACUUM_PAGE:
ginRedoVacuumPage(lsn, record);
break;
+ case XLOG_GIN_VACUUM_DATA_LEAF_PAGE:
+ ginRedoVacuumDataLeafPage(lsn, record);
+ break;
case XLOG_GIN_DELETE_PAGE:
ginRedoDeletePage(lsn, record);
break;