aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/gist
diff options
context:
space:
mode:
authorTeodor Sigaev <teodor@sigaev.ru>2005-06-20 15:22:38 +0000
committerTeodor Sigaev <teodor@sigaev.ru>2005-06-20 15:22:38 +0000
commit1bfdd1a89321c390201ebe15fe47571f54f9c80a (patch)
tree5a329310ca2e125ac8fe3ada4200e4705f0b78e8 /src/backend/access/gist
parent3f6a094be1c03336072cc737038b048e77d78bc4 (diff)
downloadpostgresql-1bfdd1a89321c390201ebe15fe47571f54f9c80a.tar.gz
postgresql-1bfdd1a89321c390201ebe15fe47571f54f9c80a.zip
fix founded hole in recovery after crash, add vacuum_delay_point()
Diffstat (limited to 'src/backend/access/gist')
-rw-r--r--src/backend/access/gist/gist.c56
-rw-r--r--src/backend/access/gist/gistvacuum.c5
-rw-r--r--src/backend/access/gist/gistxlog.c176
3 files changed, 72 insertions, 165 deletions
diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c
index 340f6b9b4f1..89ba0713554 100644
--- a/src/backend/access/gist/gist.c
+++ b/src/backend/access/gist/gist.c
@@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.120 2005/06/20 10:29:36 teodor Exp $
+ * $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.121 2005/06/20 15:22:37 teodor Exp $
*
*-------------------------------------------------------------------------
*/
@@ -317,17 +317,10 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) {
newitup = gistSplit(state->r, state->stack->buffer, itvec, &tlen, &dist, giststate);
if ( !state->r->rd_istemp ) {
- OffsetNumber noffs=0, offs[ MAXALIGN( sizeof(OffsetNumber) ) / sizeof(OffsetNumber) ];
XLogRecPtr recptr;
XLogRecData *rdata;
- if ( state->stack->todelete ) {
- offs[0] = state->stack->childoffnum;
- noffs=1;
- }
-
rdata = formSplitRdata(state->r->rd_node, state->stack->blkno,
- offs, noffs, state->itup, state->ituplen,
&(state->key), state->path, state->pathlen, dist);
START_CRIT_SECTION();
@@ -716,31 +709,27 @@ gistSplit(Relation r,
/* write on disk (may need another split) */
if (gistnospace(right, rvectup, v.spl_nright))
{
- int i;
- SplitedPageLayout *d, *origd=*dist;
-
nlen = v.spl_nright;
newtup = gistSplit(r, rightbuf, rvectup, &nlen, dist, giststate);
- /* XLOG stuff */
- d=*dist;
- /* translate offsetnumbers to our */
- while( d && d!=origd ) {
- for(i=0;i<d->block.num;i++)
- d->list[i] = v.spl_right[ d->list[i]-1 ];
- d=d->next;
- }
ReleaseBuffer(rightbuf);
}
else
{
OffsetNumber l;
+ char *ptr;
l = gistfillbuffer(r, right, rvectup, v.spl_nright, FirstOffsetNumber);
/* XLOG stuff */
ROTATEDIST(*dist);
(*dist)->block.blkno = BufferGetBlockNumber(rightbuf);
(*dist)->block.num = v.spl_nright;
- (*dist)->list = v.spl_right;
+ (*dist)->list = (IndexTupleData*)palloc( BLCKSZ );
+ ptr = (char*) ( (*dist)->list );
+ for(i=0;i<v.spl_nright;i++) {
+ memcpy( ptr, rvectup[i], IndexTupleSize( rvectup[i] ) );
+ ptr += IndexTupleSize( rvectup[i] );
+ }
+ (*dist)->lenlist = ptr - ( (char*) ( (*dist)->list ) );
(*dist)->buffer = rightbuf;
nlen = 1;
@@ -754,20 +743,8 @@ gistSplit(Relation r,
{
int llen = v.spl_nleft;
IndexTuple *lntup;
- int i;
- SplitedPageLayout *d, *origd=*dist;
lntup = gistSplit(r, leftbuf, lvectup, &llen, dist, giststate);
-
- /* XLOG stuff */
- d=*dist;
- /* translate offsetnumbers to our */
- while( d && d!=origd ) {
- for(i=0;i<d->block.num;i++)
- d->list[i] = v.spl_left[ d->list[i]-1 ];
- d=d->next;
- }
-
ReleaseBuffer(leftbuf);
newtup = gistjoinvector(newtup, &nlen, lntup, llen);
@@ -775,18 +752,25 @@ gistSplit(Relation r,
else
{
OffsetNumber l;
+ char *ptr;
l = gistfillbuffer(r, left, lvectup, v.spl_nleft, FirstOffsetNumber);
- if (BufferGetBlockNumber(buffer) != GIST_ROOT_BLKNO)
- PageRestoreTempPage(left, p);
-
/* XLOG stuff */
ROTATEDIST(*dist);
(*dist)->block.blkno = BufferGetBlockNumber(leftbuf);
(*dist)->block.num = v.spl_nleft;
- (*dist)->list = v.spl_left;
+ (*dist)->list = (IndexTupleData*)palloc( BLCKSZ );
+ ptr = (char*) ( (*dist)->list );
+ for(i=0;i<v.spl_nleft;i++) {
+ memcpy( ptr, lvectup[i], IndexTupleSize( lvectup[i] ) );
+ ptr += IndexTupleSize( lvectup[i] );
+ }
+ (*dist)->lenlist = ptr - ( (char*) ( (*dist)->list ) );
(*dist)->buffer = leftbuf;
+ if (BufferGetBlockNumber(buffer) != GIST_ROOT_BLKNO)
+ PageRestoreTempPage(left, p);
+
nlen += 1;
newtup = (IndexTuple *) repalloc(newtup, sizeof(IndexTuple) * nlen);
newtup[nlen - 1] = ( v.spl_leftvalid ) ? gistFormTuple(giststate, r, v.spl_lattr, v.spl_lattrsize, v.spl_lisnull)
diff --git a/src/backend/access/gist/gistvacuum.c b/src/backend/access/gist/gistvacuum.c
index 8f8e7f75070..e462d2af596 100644
--- a/src/backend/access/gist/gistvacuum.c
+++ b/src/backend/access/gist/gistvacuum.c
@@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.1 2005/06/20 10:29:36 teodor Exp $
+ * $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.2 2005/06/20 15:22:37 teodor Exp $
*
*-------------------------------------------------------------------------
*/
@@ -183,7 +183,6 @@ gistVacuumUpdate( GistVacuum *gv, BlockNumber blkno, bool needunion ) {
/* path is need to recovery because there is new pages, in a case of
crash it's needed to add inner tuple pointers on parent page */
rdata = formSplitRdata(gv->index->rd_node, blkno,
- todelete, ntodelete, addon, curlenaddon,
&key, gv->path, gv->curpathlen, dist);
MemoryContextSwitchTo(oldCtx);
@@ -507,6 +506,8 @@ gistbulkdelete(PG_FUNCTION_ARGS) {
ptr = stack->next;
pfree( stack );
stack = ptr;
+
+ vacuum_delay_point();
}
MemoryContextDelete( opCtx );
diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c
index b6c0696e1af..d2f2697affa 100644
--- a/src/backend/access/gist/gistxlog.c
+++ b/src/backend/access/gist/gistxlog.c
@@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.2 2005/06/20 10:29:36 teodor Exp $
+ * $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.3 2005/06/20 15:22:37 teodor Exp $
*-------------------------------------------------------------------------
*/
#include "postgres.h"
@@ -33,20 +33,13 @@ typedef struct {
typedef struct {
gistxlogPage *header;
- OffsetNumber *offnum;
-
- /* to work with */
- Page page;
- Buffer buffer;
- bool is_ok;
+ IndexTuple *itup;
} NewPage;
typedef struct {
gistxlogPageSplit *data;
NewPage *page;
- IndexTuple *itup;
BlockNumber *path;
- OffsetNumber *todelete;
} PageSplitRecord;
/* track for incomplete inserts, idea was taken from nbtxlog.c */
@@ -259,11 +252,10 @@ gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) {
static void
decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record) {
char *begin = XLogRecGetData(record), *ptr;
- int i=0, addpath = 0;
+ int j,i=0, addpath = 0;
decoded->data = (gistxlogPageSplit*)begin;
decoded->page = (NewPage*)palloc( sizeof(NewPage) * decoded->data->npage );
- decoded->itup = (IndexTuple*)palloc( sizeof(IndexTuple) * decoded->data->nitup );
if ( decoded->data->pathlen ) {
addpath = MAXALIGN( sizeof(BlockNumber) * decoded->data->pathlen );
@@ -271,27 +263,21 @@ decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record) {
} else
decoded->path = NULL;
- if ( decoded->data->ntodelete ) {
- decoded->todelete = (OffsetNumber*)(begin + sizeof( gistxlogPageSplit ) + addpath);
- addpath += MAXALIGN( sizeof(OffsetNumber) * decoded->data->ntodelete );
- } else
- decoded->todelete = NULL;
-
ptr=begin+sizeof( gistxlogPageSplit ) + addpath;
- for(i=0;i<decoded->data->nitup;i++) {
- Assert( ptr - begin < record->xl_len );
- decoded->itup[i] = (IndexTuple)ptr;
- ptr += IndexTupleSize( decoded->itup[i] );
- }
-
for(i=0;i<decoded->data->npage;i++) {
Assert( ptr - begin < record->xl_len );
decoded->page[i].header = (gistxlogPage*)ptr;
ptr += sizeof(gistxlogPage);
- Assert( ptr - begin < record->xl_len );
- decoded->page[i].offnum = (OffsetNumber*)ptr;
- ptr += MAXALIGN( sizeof(OffsetNumber) * decoded->page[i].header->num );
+ decoded->page[i].itup = (IndexTuple*)
+ palloc( sizeof(IndexTuple) * decoded->page[i].header->num );
+ j=0;
+ while(j<decoded->page[i].header->num) {
+ Assert( ptr - begin < record->xl_len );
+ decoded->page[i].itup[j] = (IndexTuple)ptr;
+ ptr += IndexTupleSize((IndexTuple)ptr);
+ j++;
+ }
}
}
@@ -301,92 +287,53 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record ) {
Relation reln;
Buffer buffer;
Page page;
- int i, len=0;
- IndexTuple *itup, *institup;
- GISTPageOpaque opaque;
- bool release=true;
+ int i;
+ int flags=0;
decodePageSplitRecord( &xlrec, record );
-
reln = XLogOpenRelation(xlrec.data->node);
if (!RelationIsValid(reln))
return;
+
+ /* first of all wee need get F_LEAF flag from original page */
buffer = XLogReadBuffer( false, reln, xlrec.data->origblkno);
if (!BufferIsValid(buffer))
- elog(PANIC, "gistRedoEntryUpdateRecord: block unfound");
+ elog(PANIC, "gistRedoEntryUpdateRecord: block %u unfound", xlrec.data->origblkno);
page = (Page) BufferGetPage(buffer);
- if (PageIsNew((PageHeader) page))
- elog(PANIC, "gistRedoEntryUpdateRecord: uninitialized page");
-
- if (XLByteLE(lsn, PageGetLSN(page))) {
- LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
- ReleaseBuffer(buffer);
- return;
- }
-
- if ( xlrec.data->ntodelete ) {
- int i;
- for(i=0; i < xlrec.data->ntodelete ; i++)
- PageIndexTupleDelete(page, xlrec.todelete[i]);
- }
-
- itup = gistextractbuffer(buffer, &len);
- itup = gistjoinvector(itup, &len, xlrec.itup, xlrec.data->nitup);
- institup = (IndexTuple*)palloc( sizeof(IndexTuple) * len );
- opaque = (GISTPageOpaque) PageGetSpecialPointer(page);
+ if ( PageIsNew((PageHeader) page) )
+ elog(PANIC, "gistRedoEntryUpdateRecord: uninitialized page blkno %u",
+ xlrec.data->origblkno);
- /* read and fill all pages */
- for(i=0;i<xlrec.data->npage;i++) {
- int j;
- NewPage *newpage = xlrec.page + i;
-
- /* prepare itup vector per page */
- for(j=0;j<newpage->header->num;j++)
- institup[j] = itup[ newpage->offnum[j] - 1 ];
-
- if ( newpage->header->blkno == xlrec.data->origblkno ) {
- /* IncrBufferRefCount(buffer); */
- newpage->page = (Page) PageGetTempPage(page, sizeof(GISTPageOpaqueData));
- newpage->buffer = buffer;
- newpage->is_ok=false;
- } else {
- newpage->buffer = XLogReadBuffer(true, reln, newpage->header->blkno);
- if (!BufferIsValid(newpage->buffer))
- elog(PANIC, "gistRedoPageSplitRecord: lost page");
- newpage->page = (Page) BufferGetPage(newpage->buffer);
- if (!PageIsNew((PageHeader) newpage->page) && XLByteLE(lsn, PageGetLSN(newpage->page))) {
- LockBuffer(newpage->buffer, BUFFER_LOCK_UNLOCK);
- ReleaseBuffer(newpage->buffer);
- newpage->is_ok=true;
- continue; /* good page */
- } else {
- newpage->is_ok=false;
- GISTInitBuffer(newpage->buffer, opaque->flags & F_LEAF);
- }
- }
- gistfillbuffer(reln, newpage->page, institup, newpage->header->num, FirstOffsetNumber);
- }
+ flags = ( GistPageIsLeaf(page) ) ? F_LEAF : 0;
+ LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+ ReleaseBuffer(buffer);
+ /* loop around all pages */
for(i=0;i<xlrec.data->npage;i++) {
NewPage *newpage = xlrec.page + i;
-
- if ( newpage->is_ok )
+ bool isorigpage = (xlrec.data->origblkno == newpage->header->blkno) ? true : false;
+
+ buffer = XLogReadBuffer( !isorigpage, reln, newpage->header->blkno);
+ if (!BufferIsValid(buffer))
+ elog(PANIC, "gistRedoEntryUpdateRecord: block %u unfound", newpage->header->blkno);
+ page = (Page) BufferGetPage(buffer);
+
+ if (XLByteLE(lsn, PageGetLSN(page))) {
+ LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+ ReleaseBuffer(buffer);
continue;
-
- if ( newpage->header->blkno == xlrec.data->origblkno ) {
- PageRestoreTempPage(newpage->page, page);
- release = false;
}
- PageSetLSN(newpage->page, lsn);
- PageSetTLI(newpage->page, ThisTimeLineID);
- LockBuffer(newpage->buffer, BUFFER_LOCK_UNLOCK);
- WriteBuffer(newpage->buffer);
- }
-
- if ( release ) {
+ /* ok, clear buffer */
+ GISTInitBuffer(buffer, flags);
+
+ /* and fill it */
+ gistfillbuffer(reln, page, newpage->itup, newpage->header->num, FirstOffsetNumber);
+
+ PageSetLSN(page, lsn);
+ PageSetTLI(page, ThisTimeLineID);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
- ReleaseBuffer(buffer);
+ WriteBuffer(buffer);
}
if ( ItemPointerIsValid( &(xlrec.data->key) ) ) {
@@ -496,9 +443,8 @@ static void
out_gistxlogPageSplit(char *buf, gistxlogPageSplit *xlrec) {
strcat(buf, "page_split: ");
out_target(buf, xlrec->node, xlrec->key);
- sprintf(buf + strlen(buf), "; block number %u; add %d tuples; split to %d pages",
- xlrec->origblkno,
- xlrec->nitup, xlrec->npage);
+ sprintf(buf + strlen(buf), "; block number %u splits to %d pages",
+ xlrec->origblkno, xlrec->npage);
}
void
@@ -716,14 +662,13 @@ gist_xlog_cleanup(void) {
XLogRecData *
formSplitRdata(RelFileNode node, BlockNumber blkno,
- OffsetNumber *todelete, int ntodelete,
- IndexTuple *itup, int ituplen, ItemPointer key,
+ ItemPointer key,
BlockNumber *path, int pathlen, SplitedPageLayout *dist ) {
XLogRecData *rdata;
gistxlogPageSplit *xlrec = (gistxlogPageSplit*)palloc(sizeof(gistxlogPageSplit));
SplitedPageLayout *ptr;
- int npage = 0, cur=1, i;
+ int npage = 0, cur=1;
ptr=dist;
while( ptr ) {
@@ -731,13 +676,11 @@ formSplitRdata(RelFileNode node, BlockNumber blkno,
ptr=ptr->next;
}
- rdata = (XLogRecData*)palloc(sizeof(XLogRecData)*(npage*2 + ituplen + 3));
+ rdata = (XLogRecData*)palloc(sizeof(XLogRecData)*(npage*2 + 2));
xlrec->node = node;
xlrec->origblkno = blkno;
xlrec->npage = (uint16)npage;
- xlrec->nitup = (uint16)ituplen;
- xlrec->ntodelete = (uint16)ntodelete;
xlrec->pathlen = (uint16)pathlen;
if ( key )
xlrec->key = *key;
@@ -758,25 +701,6 @@ formSplitRdata(RelFileNode node, BlockNumber blkno,
cur++;
}
- if ( ntodelete ) {
- rdata[cur-1].next = &(rdata[cur]);
- rdata[cur].buffer = InvalidBuffer;
- rdata[cur].data = (char*)todelete;
- rdata[cur].len = MAXALIGN(sizeof(OffsetNumber)*ntodelete);
- rdata[cur].next = NULL;
- cur++;
- }
-
- /* new tuples */
- for(i=0;i<ituplen;i++) {
- rdata[cur].buffer = InvalidBuffer;
- rdata[cur].data = (char*)(itup[i]);
- rdata[cur].len = IndexTupleSize(itup[i]);
- rdata[cur].next = NULL;
- rdata[cur-1].next = &(rdata[cur]);
- cur++;
- }
-
ptr=dist;
while(ptr) {
rdata[cur].buffer = InvalidBuffer;
@@ -787,9 +711,7 @@ formSplitRdata(RelFileNode node, BlockNumber blkno,
rdata[cur].buffer = InvalidBuffer;
rdata[cur].data = (char*)(ptr->list);
- rdata[cur].len = MAXALIGN(sizeof(OffsetNumber)*ptr->block.num);
- if ( rdata[cur].len > sizeof(OffsetNumber)*ptr->block.num )
- rdata[cur].data = repalloc( rdata[cur].data, rdata[cur].len );
+ rdata[cur].len = ptr->lenlist;
rdata[cur-1].next = &(rdata[cur]);
rdata[cur].next=NULL;
cur++;