diff options
author | Teodor Sigaev <teodor@sigaev.ru> | 2005-06-20 15:22:38 +0000 |
---|---|---|
committer | Teodor Sigaev <teodor@sigaev.ru> | 2005-06-20 15:22:38 +0000 |
commit | 1bfdd1a89321c390201ebe15fe47571f54f9c80a (patch) | |
tree | 5a329310ca2e125ac8fe3ada4200e4705f0b78e8 /src/backend/access/gist/gistxlog.c | |
parent | 3f6a094be1c03336072cc737038b048e77d78bc4 (diff) | |
download | postgresql-1bfdd1a89321c390201ebe15fe47571f54f9c80a.tar.gz postgresql-1bfdd1a89321c390201ebe15fe47571f54f9c80a.zip |
fix founded hole in recovery after crash, add vacuum_delay_point()
Diffstat (limited to 'src/backend/access/gist/gistxlog.c')
-rw-r--r-- | src/backend/access/gist/gistxlog.c | 176 |
1 files changed, 49 insertions, 127 deletions
diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c index b6c0696e1af..d2f2697affa 100644 --- a/src/backend/access/gist/gistxlog.c +++ b/src/backend/access/gist/gistxlog.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.2 2005/06/20 10:29:36 teodor Exp $ + * $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.3 2005/06/20 15:22:37 teodor Exp $ *------------------------------------------------------------------------- */ #include "postgres.h" @@ -33,20 +33,13 @@ typedef struct { typedef struct { gistxlogPage *header; - OffsetNumber *offnum; - - /* to work with */ - Page page; - Buffer buffer; - bool is_ok; + IndexTuple *itup; } NewPage; typedef struct { gistxlogPageSplit *data; NewPage *page; - IndexTuple *itup; BlockNumber *path; - OffsetNumber *todelete; } PageSplitRecord; /* track for incomplete inserts, idea was taken from nbtxlog.c */ @@ -259,11 +252,10 @@ gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) { static void decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record) { char *begin = XLogRecGetData(record), *ptr; - int i=0, addpath = 0; + int j,i=0, addpath = 0; decoded->data = (gistxlogPageSplit*)begin; decoded->page = (NewPage*)palloc( sizeof(NewPage) * decoded->data->npage ); - decoded->itup = (IndexTuple*)palloc( sizeof(IndexTuple) * decoded->data->nitup ); if ( decoded->data->pathlen ) { addpath = MAXALIGN( sizeof(BlockNumber) * decoded->data->pathlen ); @@ -271,27 +263,21 @@ decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record) { } else decoded->path = NULL; - if ( decoded->data->ntodelete ) { - decoded->todelete = (OffsetNumber*)(begin + sizeof( gistxlogPageSplit ) + addpath); - addpath += MAXALIGN( sizeof(OffsetNumber) * decoded->data->ntodelete ); - } else - decoded->todelete = NULL; - ptr=begin+sizeof( gistxlogPageSplit ) + addpath; - for(i=0;i<decoded->data->nitup;i++) { - Assert( ptr - begin < record->xl_len ); - decoded->itup[i] = (IndexTuple)ptr; - ptr += IndexTupleSize( decoded->itup[i] ); - } - for(i=0;i<decoded->data->npage;i++) { Assert( ptr - begin < record->xl_len ); decoded->page[i].header = (gistxlogPage*)ptr; ptr += sizeof(gistxlogPage); - Assert( ptr - begin < record->xl_len ); - decoded->page[i].offnum = (OffsetNumber*)ptr; - ptr += MAXALIGN( sizeof(OffsetNumber) * decoded->page[i].header->num ); + decoded->page[i].itup = (IndexTuple*) + palloc( sizeof(IndexTuple) * decoded->page[i].header->num ); + j=0; + while(j<decoded->page[i].header->num) { + Assert( ptr - begin < record->xl_len ); + decoded->page[i].itup[j] = (IndexTuple)ptr; + ptr += IndexTupleSize((IndexTuple)ptr); + j++; + } } } @@ -301,92 +287,53 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record ) { Relation reln; Buffer buffer; Page page; - int i, len=0; - IndexTuple *itup, *institup; - GISTPageOpaque opaque; - bool release=true; + int i; + int flags=0; decodePageSplitRecord( &xlrec, record ); - reln = XLogOpenRelation(xlrec.data->node); if (!RelationIsValid(reln)) return; + + /* first of all wee need get F_LEAF flag from original page */ buffer = XLogReadBuffer( false, reln, xlrec.data->origblkno); if (!BufferIsValid(buffer)) - elog(PANIC, "gistRedoEntryUpdateRecord: block unfound"); + elog(PANIC, "gistRedoEntryUpdateRecord: block %u unfound", xlrec.data->origblkno); page = (Page) BufferGetPage(buffer); - if (PageIsNew((PageHeader) page)) - elog(PANIC, "gistRedoEntryUpdateRecord: uninitialized page"); - - if (XLByteLE(lsn, PageGetLSN(page))) { - LockBuffer(buffer, BUFFER_LOCK_UNLOCK); - ReleaseBuffer(buffer); - return; - } - - if ( xlrec.data->ntodelete ) { - int i; - for(i=0; i < xlrec.data->ntodelete ; i++) - PageIndexTupleDelete(page, xlrec.todelete[i]); - } - - itup = gistextractbuffer(buffer, &len); - itup = gistjoinvector(itup, &len, xlrec.itup, xlrec.data->nitup); - institup = (IndexTuple*)palloc( sizeof(IndexTuple) * len ); - opaque = (GISTPageOpaque) PageGetSpecialPointer(page); + if ( PageIsNew((PageHeader) page) ) + elog(PANIC, "gistRedoEntryUpdateRecord: uninitialized page blkno %u", + xlrec.data->origblkno); - /* read and fill all pages */ - for(i=0;i<xlrec.data->npage;i++) { - int j; - NewPage *newpage = xlrec.page + i; - - /* prepare itup vector per page */ - for(j=0;j<newpage->header->num;j++) - institup[j] = itup[ newpage->offnum[j] - 1 ]; - - if ( newpage->header->blkno == xlrec.data->origblkno ) { - /* IncrBufferRefCount(buffer); */ - newpage->page = (Page) PageGetTempPage(page, sizeof(GISTPageOpaqueData)); - newpage->buffer = buffer; - newpage->is_ok=false; - } else { - newpage->buffer = XLogReadBuffer(true, reln, newpage->header->blkno); - if (!BufferIsValid(newpage->buffer)) - elog(PANIC, "gistRedoPageSplitRecord: lost page"); - newpage->page = (Page) BufferGetPage(newpage->buffer); - if (!PageIsNew((PageHeader) newpage->page) && XLByteLE(lsn, PageGetLSN(newpage->page))) { - LockBuffer(newpage->buffer, BUFFER_LOCK_UNLOCK); - ReleaseBuffer(newpage->buffer); - newpage->is_ok=true; - continue; /* good page */ - } else { - newpage->is_ok=false; - GISTInitBuffer(newpage->buffer, opaque->flags & F_LEAF); - } - } - gistfillbuffer(reln, newpage->page, institup, newpage->header->num, FirstOffsetNumber); - } + flags = ( GistPageIsLeaf(page) ) ? F_LEAF : 0; + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + ReleaseBuffer(buffer); + /* loop around all pages */ for(i=0;i<xlrec.data->npage;i++) { NewPage *newpage = xlrec.page + i; - - if ( newpage->is_ok ) + bool isorigpage = (xlrec.data->origblkno == newpage->header->blkno) ? true : false; + + buffer = XLogReadBuffer( !isorigpage, reln, newpage->header->blkno); + if (!BufferIsValid(buffer)) + elog(PANIC, "gistRedoEntryUpdateRecord: block %u unfound", newpage->header->blkno); + page = (Page) BufferGetPage(buffer); + + if (XLByteLE(lsn, PageGetLSN(page))) { + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + ReleaseBuffer(buffer); continue; - - if ( newpage->header->blkno == xlrec.data->origblkno ) { - PageRestoreTempPage(newpage->page, page); - release = false; } - PageSetLSN(newpage->page, lsn); - PageSetTLI(newpage->page, ThisTimeLineID); - LockBuffer(newpage->buffer, BUFFER_LOCK_UNLOCK); - WriteBuffer(newpage->buffer); - } - - if ( release ) { + /* ok, clear buffer */ + GISTInitBuffer(buffer, flags); + + /* and fill it */ + gistfillbuffer(reln, page, newpage->itup, newpage->header->num, FirstOffsetNumber); + + PageSetLSN(page, lsn); + PageSetTLI(page, ThisTimeLineID); LockBuffer(buffer, BUFFER_LOCK_UNLOCK); - ReleaseBuffer(buffer); + WriteBuffer(buffer); } if ( ItemPointerIsValid( &(xlrec.data->key) ) ) { @@ -496,9 +443,8 @@ static void out_gistxlogPageSplit(char *buf, gistxlogPageSplit *xlrec) { strcat(buf, "page_split: "); out_target(buf, xlrec->node, xlrec->key); - sprintf(buf + strlen(buf), "; block number %u; add %d tuples; split to %d pages", - xlrec->origblkno, - xlrec->nitup, xlrec->npage); + sprintf(buf + strlen(buf), "; block number %u splits to %d pages", + xlrec->origblkno, xlrec->npage); } void @@ -716,14 +662,13 @@ gist_xlog_cleanup(void) { XLogRecData * formSplitRdata(RelFileNode node, BlockNumber blkno, - OffsetNumber *todelete, int ntodelete, - IndexTuple *itup, int ituplen, ItemPointer key, + ItemPointer key, BlockNumber *path, int pathlen, SplitedPageLayout *dist ) { XLogRecData *rdata; gistxlogPageSplit *xlrec = (gistxlogPageSplit*)palloc(sizeof(gistxlogPageSplit)); SplitedPageLayout *ptr; - int npage = 0, cur=1, i; + int npage = 0, cur=1; ptr=dist; while( ptr ) { @@ -731,13 +676,11 @@ formSplitRdata(RelFileNode node, BlockNumber blkno, ptr=ptr->next; } - rdata = (XLogRecData*)palloc(sizeof(XLogRecData)*(npage*2 + ituplen + 3)); + rdata = (XLogRecData*)palloc(sizeof(XLogRecData)*(npage*2 + 2)); xlrec->node = node; xlrec->origblkno = blkno; xlrec->npage = (uint16)npage; - xlrec->nitup = (uint16)ituplen; - xlrec->ntodelete = (uint16)ntodelete; xlrec->pathlen = (uint16)pathlen; if ( key ) xlrec->key = *key; @@ -758,25 +701,6 @@ formSplitRdata(RelFileNode node, BlockNumber blkno, cur++; } - if ( ntodelete ) { - rdata[cur-1].next = &(rdata[cur]); - rdata[cur].buffer = InvalidBuffer; - rdata[cur].data = (char*)todelete; - rdata[cur].len = MAXALIGN(sizeof(OffsetNumber)*ntodelete); - rdata[cur].next = NULL; - cur++; - } - - /* new tuples */ - for(i=0;i<ituplen;i++) { - rdata[cur].buffer = InvalidBuffer; - rdata[cur].data = (char*)(itup[i]); - rdata[cur].len = IndexTupleSize(itup[i]); - rdata[cur].next = NULL; - rdata[cur-1].next = &(rdata[cur]); - cur++; - } - ptr=dist; while(ptr) { rdata[cur].buffer = InvalidBuffer; @@ -787,9 +711,7 @@ formSplitRdata(RelFileNode node, BlockNumber blkno, rdata[cur].buffer = InvalidBuffer; rdata[cur].data = (char*)(ptr->list); - rdata[cur].len = MAXALIGN(sizeof(OffsetNumber)*ptr->block.num); - if ( rdata[cur].len > sizeof(OffsetNumber)*ptr->block.num ) - rdata[cur].data = repalloc( rdata[cur].data, rdata[cur].len ); + rdata[cur].len = ptr->lenlist; rdata[cur-1].next = &(rdata[cur]); rdata[cur].next=NULL; cur++; |