diff options
Diffstat (limited to 'src/backend/access/gist/gist.c')
-rw-r--r-- | src/backend/access/gist/gist.c | 1530 |
1 files changed, 457 insertions, 1073 deletions
diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c index 2a55c91f0df..4e3faccdf92 100644 --- a/src/backend/access/gist/gist.c +++ b/src/backend/access/gist/gist.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.118 2005/06/06 17:01:21 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.119 2005/06/14 11:45:13 teodor Exp $ * *------------------------------------------------------------------------- */ @@ -23,43 +23,6 @@ #include "miscadmin.h" #include "utils/memutils.h" - -#undef GIST_PAGEADDITEM - -#define ATTSIZE(datum, tupdesc, i, isnull) \ - ( \ - (isnull) ? 0 : \ - att_addlength(0, (tupdesc)->attrs[(i)-1]->attlen, (datum)) \ - ) - -/* result's status */ -#define INSERTED 0x01 -#define SPLITED 0x02 - -/* group flags ( in gistSplit ) */ -#define LEFT_ADDED 0x01 -#define RIGHT_ADDED 0x02 -#define BOTH_ADDED ( LEFT_ADDED | RIGHT_ADDED ) - -/* - * This defines only for shorter code, used in gistgetadjusted - * and gistadjsubkey only - */ -#define FILLITEM(evp, isnullkey, okey, okeyb, rkey, rkeyb) do { \ - if (isnullkey) { \ - gistentryinit((evp), rkey, r, NULL, \ - (OffsetNumber) 0, rkeyb, FALSE); \ - } else { \ - gistentryinit((evp), okey, r, NULL, \ - (OffsetNumber) 0, okeyb, FALSE); \ - } \ -} while(0) - -#define FILLEV(isnull1, key1, key1b, isnull2, key2, key2b) do { \ - FILLITEM(*ev0p, isnull1, key1, key1b, key2, key2b); \ - FILLITEM(*ev1p, isnull2, key2, key2b, key1, key1b); \ -} while(0); - /* Working state for gistbuild and its callback */ typedef struct { @@ -80,62 +43,34 @@ static void gistbuildCallback(Relation index, static void gistdoinsert(Relation r, IndexTuple itup, GISTSTATE *GISTstate); -static int gistlayerinsert(Relation r, BlockNumber blkno, - IndexTuple **itup, - int *len, - GISTSTATE *giststate); -static OffsetNumber gistwritebuffer(Relation r, - Page page, - IndexTuple *itup, - int len, - OffsetNumber off); -static bool gistnospace(Page page, IndexTuple *itvec, int len); -static IndexTuple *gistreadbuffer(Buffer buffer, int *len); -static IndexTuple *gistjoinvector( - IndexTuple *itvec, int *len, - IndexTuple *additvec, int addlen); -static IndexTuple gistunion(Relation r, IndexTuple *itvec, - int len, GISTSTATE *giststate); - -static IndexTuple gistgetadjusted(Relation r, - IndexTuple oldtup, - IndexTuple addtup, +static void gistfindleaf(GISTInsertState *state, GISTSTATE *giststate); -static int gistfindgroup(GISTSTATE *giststate, - GISTENTRY *valvec, GIST_SPLITVEC *spl); -static void gistadjsubkey(Relation r, - IndexTuple *itup, int *len, - GIST_SPLITVEC *v, - GISTSTATE *giststate); -static IndexTuple gistFormTuple(GISTSTATE *giststate, - Relation r, Datum *attdata, int *datumsize, bool *isnull); + + +typedef struct PageLayout { + gistxlogPage block; + OffsetNumber *list; + Buffer buffer; /* to write after all proceed */ + + struct PageLayout *next; +} PageLayout; + + +#define ROTATEDIST(d) do { \ + PageLayout *tmp=(PageLayout*)palloc(sizeof(PageLayout)); \ + memset(tmp,0,sizeof(PageLayout)); \ + tmp->next = (d); \ + (d)=tmp; \ +} while(0) + + static IndexTuple *gistSplit(Relation r, Buffer buffer, IndexTuple *itup, int *len, + PageLayout **dist, GISTSTATE *giststate); -static void gistnewroot(Relation r, IndexTuple *itup, int len); -static void GISTInitBuffer(Buffer b, uint32 f); -static OffsetNumber gistchoose(Relation r, Page p, - IndexTuple it, - GISTSTATE *giststate); -static void gistdelete(Relation r, ItemPointer tid); - -#ifdef GIST_PAGEADDITEM -static IndexTuple gist_tuple_replacekey(Relation r, - GISTENTRY entry, IndexTuple t); -#endif -static void gistcentryinit(GISTSTATE *giststate, int nkey, - GISTENTRY *e, Datum k, - Relation r, Page pg, - OffsetNumber o, int b, bool l, bool isNull); -static void gistDeCompressAtt(GISTSTATE *giststate, Relation r, - IndexTuple tuple, Page p, OffsetNumber o, - GISTENTRY *attdata, bool *isnull); -static void gistpenalty(GISTSTATE *giststate, int attno, - GISTENTRY *key1, bool isNull1, - GISTENTRY *key2, bool isNull2, - float *penalty); + #undef GISTDEBUG @@ -191,6 +126,26 @@ gistbuild(PG_FUNCTION_ARGS) /* initialize the root page */ buffer = ReadBuffer(index, P_NEW); GISTInitBuffer(buffer, F_LEAF); + if ( !index->rd_istemp ) { + XLogRecPtr recptr; + XLogRecData rdata; + Page page; + + rdata.buffer = InvalidBuffer; + rdata.data = (char*)&(index->rd_node); + rdata.len = sizeof(RelFileNode); + rdata.next = NULL; + + page = BufferGetPage(buffer); + + START_CRIT_SECTION(); + + recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_CREATE_INDEX, &rdata); + PageSetLSN(page, recptr); + PageSetTLI(page, ThisTimeLineID); + + END_CRIT_SECTION(); + } WriteBuffer(buffer); /* build the index */ @@ -340,51 +295,6 @@ gistinsert(PG_FUNCTION_ARGS) PG_RETURN_BOOL(true); } -#ifdef GIST_PAGEADDITEM -/* - * Take a compressed entry, and install it on a page. Since we now know - * where the entry will live, we decompress it and recompress it using - * that knowledge (some compression routines may want to fish around - * on the page, for example, or do something special for leaf nodes.) - */ -static OffsetNumber -gistPageAddItem(GISTSTATE *giststate, - Relation r, - Page page, - Item item, - Size size, - OffsetNumber offsetNumber, - ItemIdFlags flags, - GISTENTRY *dentry, - IndexTuple *newtup) -{ - GISTENTRY tmpcentry; - IndexTuple itup = (IndexTuple) item; - OffsetNumber retval; - Datum datum; - bool IsNull; - - /* - * recompress the item given that we now know the exact page and - * offset for insertion - */ - datum = index_getattr(itup, 1, r->rd_att, &IsNull); - gistdentryinit(giststate, 0, dentry, datum, - (Relation) 0, (Page) 0, - (OffsetNumber) InvalidOffsetNumber, - ATTSIZE(datum, r, 1, IsNull), - FALSE, IsNull); - gistcentryinit(giststate, 0, &tmpcentry, dentry->key, r, page, - offsetNumber, dentry->bytes, FALSE); - *newtup = gist_tuple_replacekey(r, tmpcentry, itup); - retval = PageAddItem(page, (Item) *newtup, IndexTupleSize(*newtup), - offsetNumber, flags); - if (retval == InvalidOffsetNumber) - elog(ERROR, "failed to add index item to \"%s\"", - RelationGetRelationName(r)); - return retval; -} -#endif /* * Workhouse routine for doing insertion into a GiST index. Note that @@ -394,706 +304,365 @@ gistPageAddItem(GISTSTATE *giststate, static void gistdoinsert(Relation r, IndexTuple itup, GISTSTATE *giststate) { - IndexTuple *instup; - int ret, - len = 1; + GISTInsertState state; - instup = (IndexTuple *) palloc(sizeof(IndexTuple)); - instup[0] = (IndexTuple) palloc(IndexTupleSize(itup)); - memcpy(instup[0], itup, IndexTupleSize(itup)); + memset(&state, 0, sizeof(GISTInsertState)); - ret = gistlayerinsert(r, GIST_ROOT_BLKNO, &instup, &len, giststate); - if (ret & SPLITED) - gistnewroot(r, instup, len); -} + state.itup = (IndexTuple *) palloc(sizeof(IndexTuple)); + state.itup[0] = (IndexTuple) palloc(IndexTupleSize(itup)); + memcpy(state.itup[0], itup, IndexTupleSize(itup)); + state.ituplen=1; + state.r = r; + state.key = itup->t_tid; + state.needInsertComplete = true; + state.xlog_mode = false; -static int -gistlayerinsert(Relation r, BlockNumber blkno, - IndexTuple **itup, /* in - out, has compressed entry */ - int *len, /* in - out */ - GISTSTATE *giststate) -{ - Buffer buffer; - Page page; - int ret; - GISTPageOpaque opaque; + state.stack = (GISTInsertStack*)palloc(sizeof(GISTInsertStack)); + memset( state.stack, 0, sizeof(GISTInsertStack)); + state.stack->blkno=GIST_ROOT_BLKNO; - buffer = ReadBuffer(r, blkno); - page = (Page) BufferGetPage(buffer); - opaque = (GISTPageOpaque) PageGetSpecialPointer(page); + gistfindleaf(&state, giststate); + gistmakedeal(&state, giststate); +} - if (!(opaque->flags & F_LEAF)) +static bool +gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) { + bool is_splitted = false; + + if (gistnospace(state->stack->page, state->itup, state->ituplen)) { - /* - * This is an internal page, so continue to walk down the - * tree. We find the child node that has the minimum insertion - * penalty and recursively invoke ourselves to modify that - * node. Once the recursive call returns, we may need to - * adjust the parent node for two reasons: the child node - * split, or the key in this node needs to be adjusted for the - * newly inserted key below us. - */ - ItemId iid; - BlockNumber nblkno; - ItemPointerData oldtid; - IndexTuple oldtup; - OffsetNumber child; - - child = gistchoose(r, page, *(*itup), giststate); - iid = PageGetItemId(page, child); - oldtup = (IndexTuple) PageGetItem(page, iid); - nblkno = ItemPointerGetBlockNumber(&(oldtup->t_tid)); + /* no space for insertion */ + IndexTuple *itvec, + *newitup; + int tlen,olen; + PageLayout *dist=NULL, *ptr; + + memset(&dist, 0, sizeof(PageLayout)); + is_splitted = true; + itvec = gistextractbuffer(state->stack->buffer, &tlen); + olen=tlen; + itvec = gistjoinvector(itvec, &tlen, state->itup, state->ituplen); + newitup = gistSplit(state->r, state->stack->buffer, itvec, &tlen, &dist, giststate); + + if ( !state->r->rd_istemp && !state->xlog_mode) { + gistxlogPageSplit xlrec; + XLogRecPtr recptr; + XLogRecData *rdata; + int i, npage = 0, cur=1; + + ptr=dist; + while( ptr ) { + npage++; + ptr=ptr->next; + } - /* - * After this call: 1. if child page was splited, then itup - * contains keys for each page 2. if child page wasn't splited, - * then itup contains additional for adjustment of current key - */ - ret = gistlayerinsert(r, nblkno, itup, len, giststate); + rdata = (XLogRecData*)palloc(sizeof(XLogRecData)*(npage*2 + state->ituplen + 2)); + + xlrec.node = state->r->rd_node; + xlrec.origblkno = state->stack->blkno; + xlrec.npage = npage; + xlrec.nitup = state->ituplen; + xlrec.todeleteoffnum = ( state->stack->todelete ) ? state->stack->childoffnum : InvalidOffsetNumber; + xlrec.key = state->key; + xlrec.pathlen = (uint16)state->pathlen; + + rdata[0].buffer = InvalidBuffer; + rdata[0].data = (char *) &xlrec; + rdata[0].len = sizeof( gistxlogPageSplit ); + rdata[0].next = NULL; + + if ( state->pathlen>=0 ) { + rdata[0].next = &(rdata[1]); + rdata[1].buffer = InvalidBuffer; + rdata[1].data = (char *) (state->path); + rdata[1].len = sizeof( BlockNumber ) * state->pathlen; + rdata[1].next = NULL; + cur++; + } + + /* new tuples */ + for(i=0;i<state->ituplen;i++) { + rdata[cur].buffer = InvalidBuffer; + rdata[cur].data = (char*)(state->itup[i]); + rdata[cur].len = IndexTupleSize(state->itup[i]); + rdata[cur-1].next = &(rdata[cur]); + cur++; + } - /* nothing inserted in child */ - if (!(ret & INSERTED)) - { - ReleaseBuffer(buffer); - return 0x00; - } + /* new page layout */ + ptr=dist; + while(ptr) { + rdata[cur].buffer = InvalidBuffer; + rdata[cur].data = (char*)&(ptr->block); + rdata[cur].len = sizeof(gistxlogPage); + rdata[cur-1].next = &(rdata[cur]); + cur++; + + rdata[cur].buffer = InvalidBuffer; + rdata[cur].data = (char*)(ptr->list); + rdata[cur].len = MAXALIGN(sizeof(OffsetNumber)*ptr->block.num); + if ( rdata[cur].len > sizeof(OffsetNumber)*ptr->block.num ) + rdata[cur].data = repalloc( rdata[cur].data, rdata[cur].len ); + rdata[cur-1].next = &(rdata[cur]); + rdata[cur].next=NULL; + cur++; + + ptr=ptr->next; + } - /* child did not split */ - if (!(ret & SPLITED)) - { - IndexTuple newtup = gistgetadjusted(r, oldtup, (*itup)[0], giststate); + START_CRIT_SECTION(); - if (!newtup) - { - /* not need to update key */ - ReleaseBuffer(buffer); - return 0x00; + recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata); + ptr = dist; + while(ptr) { + PageSetLSN(BufferGetPage(ptr->buffer), recptr); + PageSetTLI(BufferGetPage(ptr->buffer), ThisTimeLineID); + ptr=ptr->next; } - (*itup)[0] = newtup; + END_CRIT_SECTION(); } - /* - * This node's key has been modified, either because a child - * split occurred or because we needed to adjust our key for - * an insert in a child node. Therefore, remove the old - * version of this node's key. - */ - ItemPointerSet(&oldtid, blkno, child); - gistdelete(r, &oldtid); - - /* - * if child was splitted, new key for child will be inserted in - * the end list of child, so we must say to any scans that page is - * changed beginning from 'child' offset - */ - if (ret & SPLITED) - gistadjscans(r, GISTOP_SPLIT, blkno, child); - } + ptr = dist; + while(ptr) { + WriteBuffer(ptr->buffer); + ptr=ptr->next; + } - ret = INSERTED; + state->itup = newitup; + state->ituplen = tlen; /* now tlen >= 2 */ - if (gistnospace(page, (*itup), *len)) - { - /* no space for insertion */ - IndexTuple *itvec, - *newitup; - int tlen, - oldlen; - - ret |= SPLITED; - itvec = gistreadbuffer(buffer, &tlen); - itvec = gistjoinvector(itvec, &tlen, (*itup), *len); - oldlen = *len; - newitup = gistSplit(r, buffer, itvec, &tlen, giststate); - ReleaseBuffer(buffer); - *itup = newitup; - *len = tlen; /* now tlen >= 2 */ + if ( state->stack->blkno == GIST_ROOT_BLKNO ) { + gistnewroot(state->r, state->itup, state->ituplen, &(state->key), state->xlog_mode); + state->needInsertComplete=false; + } + if ( state->xlog_mode ) + LockBuffer(state->stack->buffer, BUFFER_LOCK_UNLOCK); + ReleaseBuffer(state->stack->buffer); } else { /* enough space */ - OffsetNumber off, - l; + OffsetNumber off, l; - off = (PageIsEmpty(page)) ? + off = (PageIsEmpty(state->stack->page)) ? FirstOffsetNumber : - OffsetNumberNext(PageGetMaxOffsetNumber(page)); - l = gistwritebuffer(r, page, *itup, *len, off); - WriteBuffer(buffer); + OffsetNumberNext(PageGetMaxOffsetNumber(state->stack->page)); + l = gistfillbuffer(state->r, state->stack->page, state->itup, state->ituplen, off); + if ( !state->r->rd_istemp && !state->xlog_mode) { + gistxlogEntryUpdate xlrec; + XLogRecPtr recptr; + XLogRecData *rdata = (XLogRecData*)palloc( sizeof(XLogRecData) * ( state->ituplen + 2 ) ); + int i, cur=0; + + xlrec.node = state->r->rd_node; + xlrec.blkno = state->stack->blkno; + xlrec.todeleteoffnum = ( state->stack->todelete ) ? state->stack->childoffnum : InvalidOffsetNumber; + xlrec.key = state->key; + xlrec.pathlen = (uint16)state->pathlen; + + rdata[0].buffer = InvalidBuffer; + rdata[0].data = (char *) &xlrec; + rdata[0].len = sizeof( gistxlogEntryUpdate ); + rdata[0].next = NULL; + + if ( state->pathlen>=0 ) { + rdata[0].next = &(rdata[1]); + rdata[1].buffer = InvalidBuffer; + rdata[1].data = (char *) (state->path); + rdata[1].len = sizeof( BlockNumber ) * state->pathlen; + rdata[1].next = NULL; + cur++; + } + + for(i=1; i<=state->ituplen; i++) { /* adding tuples */ + rdata[i+cur].buffer = InvalidBuffer; + rdata[i+cur].data = (char*)(state->itup[i-1]); + rdata[i+cur].len = IndexTupleSize(state->itup[i-1]); + rdata[i+cur].next = NULL; + rdata[i-1+cur].next = &(rdata[i+cur]); + } + + START_CRIT_SECTION(); + + recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_UPDATE, rdata); + PageSetLSN(state->stack->page, recptr); + PageSetTLI(state->stack->page, ThisTimeLineID); + + END_CRIT_SECTION(); + } + + if ( state->stack->blkno == GIST_ROOT_BLKNO ) + state->needInsertComplete=false; - if (*len > 1) - { /* previous insert ret & SPLITED != 0 */ + if ( state->xlog_mode ) + LockBuffer(state->stack->buffer, BUFFER_LOCK_UNLOCK); + WriteBuffer(state->stack->buffer); + + if (state->ituplen > 1) + { /* previous is_splitted==true */ /* * child was splited, so we must form union for insertion in * parent */ - IndexTuple newtup = gistunion(r, (*itup), *len, giststate); - ItemPointerSet(&(newtup->t_tid), blkno, 1); - (*itup)[0] = newtup; - *len = 1; + IndexTuple newtup = gistunion(state->r, state->itup, state->ituplen, giststate); + ItemPointerSet(&(newtup->t_tid), state->stack->blkno, FirstOffsetNumber); + state->itup[0] = newtup; + state->ituplen = 1; } } - - return ret; -} - -/* - * Write itup vector to page, has no control of free space - */ -static OffsetNumber -gistwritebuffer(Relation r, Page page, IndexTuple *itup, - int len, OffsetNumber off) -{ - OffsetNumber l = InvalidOffsetNumber; - int i; - - for (i = 0; i < len; i++) - { -#ifdef GIST_PAGEADDITEM - GISTENTRY tmpdentry; - IndexTuple newtup; - bool IsNull; - - l = gistPageAddItem(giststate, r, page, - (Item) itup[i], IndexTupleSize(itup[i]), - off, LP_USED, &tmpdentry, &newtup); - off = OffsetNumberNext(off); -#else - l = PageAddItem(page, (Item) itup[i], IndexTupleSize(itup[i]), - off, LP_USED); - if (l == InvalidOffsetNumber) - elog(ERROR, "failed to add index item to \"%s\"", - RelationGetRelationName(r)); -#endif - } - return l; + return is_splitted; } -/* - * Check space for itup vector on page - */ -static bool -gistnospace(Page page, IndexTuple *itvec, int len) -{ - unsigned int size = 0; - int i; - - for (i = 0; i < len; i++) - size += IndexTupleSize(itvec[i]) + sizeof(ItemIdData); - - return (PageGetFreeSpace(page) < size); -} - -/* - * Read buffer into itup vector - */ -static IndexTuple * -gistreadbuffer(Buffer buffer, int *len /* out */ ) -{ - OffsetNumber i, - maxoff; - IndexTuple *itvec; - Page p = (Page) BufferGetPage(buffer); - - maxoff = PageGetMaxOffsetNumber(p); - *len = maxoff; - itvec = palloc(sizeof(IndexTuple) * maxoff); - for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) - itvec[i - 1] = (IndexTuple) PageGetItem(p, PageGetItemId(p, i)); - - return itvec; -} - -/* - * join two vectors into one - */ -static IndexTuple * -gistjoinvector(IndexTuple *itvec, int *len, IndexTuple *additvec, int addlen) -{ - itvec = (IndexTuple *) repalloc((void *) itvec, sizeof(IndexTuple) * ((*len) + addlen)); - memmove(&itvec[*len], additvec, sizeof(IndexTuple) * addlen); - *len += addlen; - return itvec; -} - -/* - * Return an IndexTuple containing the result of applying the "union" - * method to the specified IndexTuple vector. - */ -static IndexTuple -gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate) +static void +gistfindleaf(GISTInsertState *state, GISTSTATE *giststate) { - Datum attr[INDEX_MAX_KEYS]; - bool isnull[INDEX_MAX_KEYS]; - GistEntryVector *evec; - int i; - GISTENTRY centry[INDEX_MAX_KEYS]; - - evec = (GistEntryVector *) palloc(((len == 1) ? 2 : len) * sizeof(GISTENTRY) + GEVHDRSZ); - - for (i = 0; i < r->rd_att->natts; i++) - { - Datum datum; - int j; - int real_len; + ItemId iid; + IndexTuple oldtup; + GISTInsertStack *ptr; - real_len = 0; - for (j = 0; j < len; j++) - { - bool IsNull; - datum = index_getattr(itvec[j], i + 1, giststate->tupdesc, &IsNull); - if (IsNull) - continue; - - gistdentryinit(giststate, i, - &(evec->vector[real_len]), - datum, - NULL, NULL, (OffsetNumber) 0, - ATTSIZE(datum, giststate->tupdesc, i + 1, IsNull), - FALSE, IsNull); - real_len++; - } + /* walk down */ + while( true ) { + GISTPageOpaque opaque; - /* If this tuple vector was all NULLs, the union is NULL */ - if (real_len == 0) - { - attr[i] = (Datum) 0; - isnull[i] = TRUE; - } - else + state->stack->buffer = ReadBuffer(state->r, state->stack->blkno); + state->stack->page = (Page) BufferGetPage(state->stack->buffer); + opaque = (GISTPageOpaque) PageGetSpecialPointer(state->stack->page); + + if (!(opaque->flags & F_LEAF)) { - int datumsize; - - if (real_len == 1) - { - evec->n = 2; - gistentryinit(evec->vector[1], - evec->vector[0].key, r, NULL, - (OffsetNumber) 0, evec->vector[0].bytes, FALSE); - } - else - evec->n = real_len; - - /* Compress the result of the union and store in attr array */ - datum = FunctionCall2(&giststate->unionFn[i], - PointerGetDatum(evec), - PointerGetDatum(&datumsize)); - - gistcentryinit(giststate, i, ¢ry[i], datum, - NULL, NULL, (OffsetNumber) 0, - datumsize, FALSE, FALSE); - isnull[i] = FALSE; - attr[i] = centry[i].key; - } + /* + * This is an internal page, so continue to walk down the + * tree. We find the child node that has the minimum insertion + * penalty and recursively invoke ourselves to modify that + * node. Once the recursive call returns, we may need to + * adjust the parent node for two reasons: the child node + * split, or the key in this node needs to be adjusted for the + * newly inserted key below us. + */ + GISTInsertStack *item=(GISTInsertStack*)palloc(sizeof(GISTInsertStack)); + + state->stack->childoffnum = gistchoose(state->r, state->stack->page, state->itup[0], giststate); + + iid = PageGetItemId(state->stack->page, state->stack->childoffnum); + oldtup = (IndexTuple) PageGetItem(state->stack->page, iid); + item->blkno = ItemPointerGetBlockNumber(&(oldtup->t_tid)); + item->parent = state->stack; + item->todelete = false; + state->stack = item; + } else + break; } - return index_form_tuple(giststate->tupdesc, attr, isnull); -} - - -/* - * Forms union of oldtup and addtup, if union == oldtup then return NULL - */ -static IndexTuple -gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *giststate) -{ - GistEntryVector *evec; - bool neednew = false; - bool isnull[INDEX_MAX_KEYS]; - Datum attr[INDEX_MAX_KEYS]; - GISTENTRY centry[INDEX_MAX_KEYS], - oldatt[INDEX_MAX_KEYS], - addatt[INDEX_MAX_KEYS], - *ev0p, - *ev1p; - bool oldisnull[INDEX_MAX_KEYS], - addisnull[INDEX_MAX_KEYS]; - IndexTuple newtup = NULL; - int i; - - evec = palloc(2 * sizeof(GISTENTRY) + GEVHDRSZ); - evec->n = 2; - ev0p = &(evec->vector[0]); - ev1p = &(evec->vector[1]); - - gistDeCompressAtt(giststate, r, oldtup, NULL, - (OffsetNumber) 0, oldatt, oldisnull); - - gistDeCompressAtt(giststate, r, addtup, NULL, - (OffsetNumber) 0, addatt, addisnull); - - for (i = 0; i < r->rd_att->natts; i++) - { - if (oldisnull[i] && addisnull[i]) - { - attr[i] = (Datum) 0; - isnull[i] = TRUE; - } - else - { - Datum datum; - int datumsize; - - FILLEV(oldisnull[i], oldatt[i].key, oldatt[i].bytes, - addisnull[i], addatt[i].key, addatt[i].bytes); - - datum = FunctionCall2(&giststate->unionFn[i], - PointerGetDatum(evec), - PointerGetDatum(&datumsize)); - - if (oldisnull[i] || addisnull[i]) - { - if (oldisnull[i]) - neednew = true; - } - else - { - bool result; - - FunctionCall3(&giststate->equalFn[i], - ev0p->key, - datum, - PointerGetDatum(&result)); + /* now state->stack->(page, buffer and blkno) points to leaf page, so insert */ - if (!result) - neednew = true; - } - - gistcentryinit(giststate, i, ¢ry[i], datum, - NULL, NULL, (OffsetNumber) 0, - datumsize, FALSE, FALSE); - - attr[i] = centry[i].key; - isnull[i] = FALSE; - } + /* form state->path to work xlog */ + ptr = state->stack; + state->pathlen=1; + while( ptr ) { + state->pathlen++; + ptr=ptr->parent; } - - if (neednew) - { - /* need to update key */ - newtup = index_form_tuple(giststate->tupdesc, attr, isnull); - newtup->t_tid = oldtup->t_tid; + state->path=(BlockNumber*)palloc(sizeof(BlockNumber)*state->pathlen); + ptr = state->stack; + state->pathlen=0; + while( ptr ) { + state->path[ state->pathlen ] = ptr->blkno; + state->pathlen++; + ptr=ptr->parent; } - - return newtup; + state->pathlen--; + state->path++; } -static void -gistunionsubkey(Relation r, GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITVEC *spl) -{ - int lr; - - for (lr = 0; lr < 2; lr++) - { - OffsetNumber *entries; - int i; - Datum *attr; - int len, - *attrsize; - bool *isnull; - GistEntryVector *evec; - - if (lr) - { - attrsize = spl->spl_lattrsize; - attr = spl->spl_lattr; - len = spl->spl_nleft; - entries = spl->spl_left; - isnull = spl->spl_lisnull; - } - else - { - attrsize = spl->spl_rattrsize; - attr = spl->spl_rattr; - len = spl->spl_nright; - entries = spl->spl_right; - isnull = spl->spl_risnull; - } - - evec = palloc(((len == 1) ? 2 : len) * sizeof(GISTENTRY) + GEVHDRSZ); - - for (i = 1; i < r->rd_att->natts; i++) - { - int j; - Datum datum; - int datumsize; - int real_len; - - real_len = 0; - for (j = 0; j < len; j++) - { - bool IsNull; - - if (spl->spl_idgrp[entries[j]]) - continue; - datum = index_getattr(itvec[entries[j] - 1], i + 1, - giststate->tupdesc, &IsNull); - if (IsNull) - continue; - gistdentryinit(giststate, i, - &(evec->vector[real_len]), - datum, - NULL, NULL, (OffsetNumber) 0, - ATTSIZE(datum, giststate->tupdesc, i + 1, IsNull), - FALSE, IsNull); - real_len++; - - } - if (real_len == 0) - { - datum = (Datum) 0; - datumsize = 0; - isnull[i] = true; - } - else - { - /* - * evec->vector[0].bytes may be not defined, so form union - * with itself - */ - if (real_len == 1) - { - evec->n = 2; - memcpy(&(evec->vector[1]), &(evec->vector[0]), - sizeof(GISTENTRY)); - } - else - evec->n = real_len; - datum = FunctionCall2(&giststate->unionFn[i], - PointerGetDatum(evec), - PointerGetDatum(&datumsize)); - isnull[i] = false; - } +void +gistmakedeal(GISTInsertState *state, GISTSTATE *giststate) { + int is_splitted; + ItemId iid; + IndexTuple oldtup, newtup; - attr[i] = datum; - attrsize[i] = datumsize; - } - } -} + /* walk up */ + while( true ) { + /* + * After this call: 1. if child page was splited, then itup + * contains keys for each page 2. if child page wasn't splited, + * then itup contains additional for adjustment of current key + */ -/* - * find group in vector with equal value - */ -static int -gistfindgroup(GISTSTATE *giststate, GISTENTRY *valvec, GIST_SPLITVEC *spl) -{ - int i; - int curid = 1; - - /* - * first key is always not null (see gistinsert), so we may not check - * for nulls - */ - for (i = 0; i < spl->spl_nleft; i++) - { - int j; - int len; - bool result; - - if (spl->spl_idgrp[spl->spl_left[i]]) - continue; - len = 0; - /* find all equal value in right part */ - for (j = 0; j < spl->spl_nright; j++) - { - if (spl->spl_idgrp[spl->spl_right[j]]) - continue; - FunctionCall3(&giststate->equalFn[0], - valvec[spl->spl_left[i]].key, - valvec[spl->spl_right[j]].key, - PointerGetDatum(&result)); - if (result) - { - spl->spl_idgrp[spl->spl_right[j]] = curid; - len++; - } - } - /* find all other equal value in left part */ - if (len) - { - /* add current val to list of equal values */ - spl->spl_idgrp[spl->spl_left[i]] = curid; - /* searching .. */ - for (j = i + 1; j < spl->spl_nleft; j++) - { - if (spl->spl_idgrp[spl->spl_left[j]]) - continue; - FunctionCall3(&giststate->equalFn[0], - valvec[spl->spl_left[i]].key, - valvec[spl->spl_left[j]].key, - PointerGetDatum(&result)); - if (result) - { - spl->spl_idgrp[spl->spl_left[j]] = curid; - len++; - } - } - spl->spl_ngrp[curid] = len + 1; - curid++; - } - } + is_splitted = gistplacetopage(state, giststate ); - return curid; -} + /* pop page from stack */ + state->stack = state->stack->parent; + state->pathlen--; + state->path++; + + /* stack is void */ + if ( ! state->stack ) + break; -/* - * Insert equivalent tuples to left or right page with minimum - * penalty - */ -static void -gistadjsubkey(Relation r, - IndexTuple *itup, /* contains compressed entry */ - int *len, - GIST_SPLITVEC *v, - GISTSTATE *giststate) -{ - int curlen; - OffsetNumber *curwpos; - GISTENTRY entry, - identry[INDEX_MAX_KEYS], - *ev0p, - *ev1p; - float lpenalty, - rpenalty; - GistEntryVector *evec; - int datumsize; - bool isnull[INDEX_MAX_KEYS]; - int i, - j; - /* clear vectors */ - curlen = v->spl_nleft; - curwpos = v->spl_left; - for (i = 0; i < v->spl_nleft; i++) - { - if (v->spl_idgrp[v->spl_left[i]] == 0) + /* child did not split */ + if (!is_splitted) { - *curwpos = v->spl_left[i]; - curwpos++; - } - else - curlen--; - } - v->spl_nleft = curlen; + /* parent's tuple */ + iid = PageGetItemId(state->stack->page, state->stack->childoffnum); + oldtup = (IndexTuple) PageGetItem(state->stack->page, iid); + newtup = gistgetadjusted(state->r, oldtup, state->itup[0], giststate); + + if (!newtup) /* not need to update key */ + break; - curlen = v->spl_nright; - curwpos = v->spl_right; - for (i = 0; i < v->spl_nright; i++) - { - if (v->spl_idgrp[v->spl_right[i]] == 0) - { - *curwpos = v->spl_right[i]; - curwpos++; + state->itup[0] = newtup; } - else - curlen--; + + /* + * This node's key has been modified, either because a child + * split occurred or because we needed to adjust our key for + * an insert in a child node. Therefore, remove the old + * version of this node's key. + */ + gistadjscans(state->r, GISTOP_DEL, state->stack->blkno, state->stack->childoffnum); + PageIndexTupleDelete(state->stack->page, state->stack->childoffnum); + if ( !state->r->rd_istemp ) + state->stack->todelete = true; + + /* + * if child was splitted, new key for child will be inserted in + * the end list of child, so we must say to any scans that page is + * changed beginning from 'child' offset + */ + if (is_splitted) + gistadjscans(state->r, GISTOP_SPLIT, state->stack->blkno, state->stack->childoffnum); + } /* while */ + + /* release all buffers */ + while( state->stack ) { + if ( state->xlog_mode ) + LockBuffer(state->stack->buffer, BUFFER_LOCK_UNLOCK); + ReleaseBuffer(state->stack->buffer); + state->stack = state->stack->parent; } - v->spl_nright = curlen; - evec = palloc(2 * sizeof(GISTENTRY) + GEVHDRSZ); - evec->n = 2; - ev0p = &(evec->vector[0]); - ev1p = &(evec->vector[1]); + /* say to xlog that insert is completed */ + if ( !state->xlog_mode && state->needInsertComplete && !state->r->rd_istemp ) { + gistxlogInsertComplete xlrec; + XLogRecData rdata; + + xlrec.node = state->r->rd_node; + xlrec.key = state->key; + + rdata.buffer = InvalidBuffer; + rdata.data = (char *) &xlrec; + rdata.len = sizeof( gistxlogInsertComplete ); + rdata.next = NULL; - /* add equivalent tuple */ - for (i = 0; i < *len; i++) - { - Datum datum; + START_CRIT_SECTION(); - if (v->spl_idgrp[i + 1] == 0) /* already inserted */ - continue; - gistDeCompressAtt(giststate, r, itup[i], NULL, (OffsetNumber) 0, - identry, isnull); + XLogInsert(RM_GIST_ID, XLOG_GIST_INSERT_COMPLETE, &rdata); - v->spl_ngrp[v->spl_idgrp[i + 1]]--; - if (v->spl_ngrp[v->spl_idgrp[i + 1]] == 0 && - (v->spl_grpflag[v->spl_idgrp[i + 1]] & BOTH_ADDED) != BOTH_ADDED) - { - /* force last in group */ - rpenalty = 1.0; - lpenalty = (v->spl_grpflag[v->spl_idgrp[i + 1]] & LEFT_ADDED) ? 2.0 : 0.0; - } - else - { - /* where? */ - for (j = 1; j < r->rd_att->natts; j++) - { - gistentryinit(entry, v->spl_lattr[j], r, NULL, - (OffsetNumber) 0, v->spl_lattrsize[j], FALSE); - gistpenalty(giststate, j, &entry, v->spl_lisnull[j], - &identry[j], isnull[j], &lpenalty); - - gistentryinit(entry, v->spl_rattr[j], r, NULL, - (OffsetNumber) 0, v->spl_rattrsize[j], FALSE); - gistpenalty(giststate, j, &entry, v->spl_risnull[j], - &identry[j], isnull[j], &rpenalty); - - if (lpenalty != rpenalty) - break; - } - } - - /* - * add - * XXX: refactor this to avoid duplicating code - */ - if (lpenalty < rpenalty) - { - v->spl_grpflag[v->spl_idgrp[i + 1]] |= LEFT_ADDED; - v->spl_left[v->spl_nleft] = i + 1; - v->spl_nleft++; - for (j = 1; j < r->rd_att->natts; j++) - { - if (isnull[j] && v->spl_lisnull[j]) - { - v->spl_lattr[j] = (Datum) 0; - v->spl_lattrsize[j] = 0; - } - else - { - FILLEV(v->spl_lisnull[j], v->spl_lattr[j], v->spl_lattrsize[j], - isnull[j], identry[j].key, identry[j].bytes); - - datum = FunctionCall2(&giststate->unionFn[j], - PointerGetDatum(evec), - PointerGetDatum(&datumsize)); - - v->spl_lattr[j] = datum; - v->spl_lattrsize[j] = datumsize; - v->spl_lisnull[j] = false; - } - } - } - else - { - v->spl_grpflag[v->spl_idgrp[i + 1]] |= RIGHT_ADDED; - v->spl_right[v->spl_nright] = i + 1; - v->spl_nright++; - for (j = 1; j < r->rd_att->natts; j++) - { - if (isnull[j] && v->spl_risnull[j]) - { - v->spl_rattr[j] = (Datum) 0; - v->spl_rattrsize[j] = 0; - } - else - { - FILLEV(v->spl_risnull[j], v->spl_rattr[j], v->spl_rattrsize[j], - isnull[j], identry[j].key, identry[j].bytes); - - datum = FunctionCall2(&giststate->unionFn[j], - PointerGetDatum(evec), - PointerGetDatum(&datumsize)); - - v->spl_rattr[j] = datum; - v->spl_rattrsize[j] = datumsize; - v->spl_risnull[j] = false; - } - } - } + END_CRIT_SECTION(); } } @@ -1105,6 +674,7 @@ gistSplit(Relation r, Buffer buffer, IndexTuple *itup, /* contains compressed entry */ int *len, + PageLayout **dist, GISTSTATE *giststate) { Page p; @@ -1225,29 +795,57 @@ gistSplit(Relation r, /* write on disk (may need another split) */ if (gistnospace(right, rvectup, v.spl_nright)) { + int i; + PageLayout *d, *origd=*dist; + nlen = v.spl_nright; - newtup = gistSplit(r, rightbuf, rvectup, &nlen, giststate); + newtup = gistSplit(r, rightbuf, rvectup, &nlen, dist, giststate); + /* XLOG stuff */ + d=*dist; + /* translate offsetnumbers to our */ + while( d && d!=origd ) { + for(i=0;i<d->block.num;i++) + d->list[i] = v.spl_right[ d->list[i]-1 ]; + d=d->next; + } ReleaseBuffer(rightbuf); } else { OffsetNumber l; - l = gistwritebuffer(r, right, rvectup, v.spl_nright, FirstOffsetNumber); - WriteBuffer(rightbuf); - + l = gistfillbuffer(r, right, rvectup, v.spl_nright, FirstOffsetNumber); + /* XLOG stuff */ + ROTATEDIST(*dist); + (*dist)->block.blkno = BufferGetBlockNumber(rightbuf); + (*dist)->block.num = v.spl_nright; + (*dist)->list = v.spl_right; + (*dist)->buffer = rightbuf; + nlen = 1; newtup = (IndexTuple *) palloc(sizeof(IndexTuple) * 1); newtup[0] = gistFormTuple(giststate, r, v.spl_rattr, v.spl_rattrsize, v.spl_risnull); - ItemPointerSet(&(newtup[0]->t_tid), rbknum, 1); + ItemPointerSet(&(newtup[0]->t_tid), rbknum, FirstOffsetNumber); } if (gistnospace(left, lvectup, v.spl_nleft)) { int llen = v.spl_nleft; IndexTuple *lntup; - - lntup = gistSplit(r, leftbuf, lvectup, &llen, giststate); + int i; + PageLayout *d, *origd=*dist; + + lntup = gistSplit(r, leftbuf, lvectup, &llen, dist, giststate); + + /* XLOG stuff */ + d=*dist; + /* translate offsetnumbers to our */ + while( d && d!=origd ) { + for(i=0;i<d->block.num;i++) + d->list[i] = v.spl_left[ d->list[i]-1 ]; + d=d->next; + } + ReleaseBuffer(leftbuf); newtup = gistjoinvector(newtup, &nlen, lntup, llen); @@ -1256,151 +854,76 @@ gistSplit(Relation r, { OffsetNumber l; - l = gistwritebuffer(r, left, lvectup, v.spl_nleft, FirstOffsetNumber); + l = gistfillbuffer(r, left, lvectup, v.spl_nleft, FirstOffsetNumber); if (BufferGetBlockNumber(buffer) != GIST_ROOT_BLKNO) PageRestoreTempPage(left, p); - WriteBuffer(leftbuf); - + /* XLOG stuff */ + ROTATEDIST(*dist); + (*dist)->block.blkno = BufferGetBlockNumber(leftbuf); + (*dist)->block.num = v.spl_nleft; + (*dist)->list = v.spl_left; + (*dist)->buffer = leftbuf; + nlen += 1; newtup = (IndexTuple *) repalloc(newtup, sizeof(IndexTuple) * nlen); newtup[nlen - 1] = gistFormTuple(giststate, r, v.spl_lattr, v.spl_lattrsize, v.spl_lisnull); - ItemPointerSet(&(newtup[nlen - 1]->t_tid), lbknum, 1); + ItemPointerSet(&(newtup[nlen - 1]->t_tid), lbknum, FirstOffsetNumber); } *len = nlen; return newtup; } -static void -gistnewroot(Relation r, IndexTuple *itup, int len) -{ - Buffer b; - Page p; - - b = ReadBuffer(r, GIST_ROOT_BLKNO); - GISTInitBuffer(b, 0); - p = BufferGetPage(b); - - gistwritebuffer(r, p, itup, len, FirstOffsetNumber); - WriteBuffer(b); -} - -static void -GISTInitBuffer(Buffer b, uint32 f) +void +gistnewroot(Relation r, IndexTuple *itup, int len, ItemPointer key, bool xlog_mode) { - GISTPageOpaque opaque; + Buffer buffer; Page page; - Size pageSize; - - pageSize = BufferGetPageSize(b); - page = BufferGetPage(b); - PageInit(page, pageSize, sizeof(GISTPageOpaqueData)); - - opaque = (GISTPageOpaque) PageGetSpecialPointer(page); - opaque->flags = f; -} - -/* - * find entry with lowest penalty - */ -static OffsetNumber -gistchoose(Relation r, Page p, IndexTuple it, /* it has compressed entry */ - GISTSTATE *giststate) -{ - OffsetNumber maxoff; - OffsetNumber i; - OffsetNumber which; - float sum_grow, - which_grow[INDEX_MAX_KEYS]; - GISTENTRY entry, - identry[INDEX_MAX_KEYS]; - bool isnull[INDEX_MAX_KEYS]; - - maxoff = PageGetMaxOffsetNumber(p); - *which_grow = -1.0; - which = -1; - sum_grow = 1; - gistDeCompressAtt(giststate, r, - it, NULL, (OffsetNumber) 0, - identry, isnull); - - for (i = FirstOffsetNumber; i <= maxoff && sum_grow; i = OffsetNumberNext(i)) - { - int j; - IndexTuple itup = (IndexTuple) PageGetItem(p, PageGetItemId(p, i)); - sum_grow = 0; - for (j = 0; j < r->rd_att->natts; j++) - { - Datum datum; - float usize; - bool IsNull; - - datum = index_getattr(itup, j + 1, giststate->tupdesc, &IsNull); - gistdentryinit(giststate, j, &entry, datum, r, p, i, - ATTSIZE(datum, giststate->tupdesc, j + 1, IsNull), - FALSE, IsNull); - gistpenalty(giststate, j, &entry, IsNull, - &identry[j], isnull[j], &usize); - - if (which_grow[j] < 0 || usize < which_grow[j]) - { - which = i; - which_grow[j] = usize; - if (j < r->rd_att->natts - 1 && i == FirstOffsetNumber) - which_grow[j + 1] = -1; - sum_grow += which_grow[j]; - } - else if (which_grow[j] == usize) - sum_grow += usize; - else - { - sum_grow = 1; - break; - } - } + buffer = (xlog_mode) ? XLogReadBuffer(false, r, GIST_ROOT_BLKNO) : ReadBuffer(r, GIST_ROOT_BLKNO); + GISTInitBuffer(buffer, 0); + page = BufferGetPage(buffer); + + gistfillbuffer(r, page, itup, len, FirstOffsetNumber); + if ( !xlog_mode && !r->rd_istemp ) { + gistxlogEntryUpdate xlrec; + XLogRecPtr recptr; + XLogRecData *rdata = (XLogRecData*)palloc( sizeof(XLogRecData) * ( len + 1 ) ); + int i; + + xlrec.node = r->rd_node; + xlrec.blkno = GIST_ROOT_BLKNO; + xlrec.todeleteoffnum = InvalidOffsetNumber; + xlrec.key = *key; + xlrec.pathlen=0; + + rdata[0].buffer = InvalidBuffer; + rdata[0].data = (char *) &xlrec; + rdata[0].len = sizeof( gistxlogEntryUpdate ); + rdata[0].next = NULL; + + for(i=1; i<=len; i++) { + rdata[i].buffer = InvalidBuffer; + rdata[i].data = (char*)(itup[i-1]); + rdata[i].len = IndexTupleSize(itup[i-1]); + rdata[i].next = NULL; + rdata[i-1].next = &(rdata[i]); + } + + START_CRIT_SECTION(); + + recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_NEW_ROOT, rdata); + PageSetLSN(page, recptr); + PageSetTLI(page, ThisTimeLineID); + + END_CRIT_SECTION(); } - - return which; + if ( xlog_mode ) + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + WriteBuffer(buffer); } -/* - * Retail deletion of a single tuple. - * - * NB: this is no longer called externally, but is still needed by - * gistlayerinsert(). That dependency will have to be fixed if GIST - * is ever going to allow concurrent insertions. - */ -static void -gistdelete(Relation r, ItemPointer tid) -{ - BlockNumber blkno; - OffsetNumber offnum; - Buffer buf; - Page page; - - /* - * Since GIST is not marked "amconcurrent" in pg_am, caller should - * have acquired exclusive lock on index relation. We need no locking - * here. - */ - - blkno = ItemPointerGetBlockNumber(tid); - offnum = ItemPointerGetOffsetNumber(tid); - - /* adjust any scans that will be affected by this deletion */ - /* NB: this works only for scans in *this* backend! */ - gistadjscans(r, GISTOP_DEL, blkno, offnum); - - /* delete the index tuple */ - buf = ReadBuffer(r, blkno); - page = BufferGetPage(buf); - - PageIndexTupleDelete(page, offnum); - - WriteBuffer(buf); -} /* * Bulk deletion of all index entries pointing to a set of heap tuples. @@ -1462,6 +985,30 @@ gistbulkdelete(PG_FUNCTION_ARGS) page = BufferGetPage(buf); PageIndexTupleDelete(page, offnum); + if ( !rel->rd_istemp ) { + gistxlogEntryUpdate xlrec; + XLogRecPtr recptr; + XLogRecData rdata; + + xlrec.node = rel->rd_node; + xlrec.blkno = blkno; + xlrec.todeleteoffnum = offnum; + xlrec.pathlen=0; + ItemPointerSetInvalid( &(xlrec.key) ); + + rdata.buffer = InvalidBuffer; + rdata.data = (char *) &xlrec; + rdata.len = sizeof( gistxlogEntryUpdate ); + rdata.next = NULL; + + START_CRIT_SECTION(); + + recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_DELETE, &rdata); + PageSetLSN(page, recptr); + PageSetTLI(page, ThisTimeLineID); + + END_CRIT_SECTION(); + } WriteBuffer(buf); @@ -1527,159 +1074,6 @@ freeGISTstate(GISTSTATE *giststate) /* no work */ } -#ifdef GIST_PAGEADDITEM -/* - * Given an IndexTuple to be inserted on a page, this routine replaces - * the key with another key, which may involve generating a new IndexTuple - * if the sizes don't match or if the null status changes. - * - * XXX this only works for a single-column index tuple! - */ -static IndexTuple -gist_tuple_replacekey(Relation r, GISTENTRY entry, IndexTuple t) -{ - bool IsNull; - Datum datum = index_getattr(t, 1, r->rd_att, &IsNull); - - /* - * If new entry fits in index tuple, copy it in. To avoid worrying - * about null-value bitmask, pass it off to the general - * index_form_tuple routine if either the previous or new value is - * NULL. - */ - if (!IsNull && DatumGetPointer(entry.key) != NULL && - (Size) entry.bytes <= ATTSIZE(datum, r, 1, IsNull)) - { - memcpy(DatumGetPointer(datum), - DatumGetPointer(entry.key), - entry.bytes); - /* clear out old size */ - t->t_info &= ~INDEX_SIZE_MASK; - /* or in new size */ - t->t_info |= MAXALIGN(entry.bytes + sizeof(IndexTupleData)); - - return t; - } - else - { - /* generate a new index tuple for the compressed entry */ - TupleDesc tupDesc = r->rd_att; - IndexTuple newtup; - bool isnull; - - isnull = (DatumGetPointer(entry.key) == NULL); - newtup = index_form_tuple(tupDesc, &(entry.key), &isnull); - newtup->t_tid = t->t_tid; - return newtup; - } -} -#endif - -/* - * initialize a GiST entry with a decompressed version of key - */ -void -gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e, - Datum k, Relation r, Page pg, OffsetNumber o, - int b, bool l, bool isNull) -{ - if (b && !isNull) - { - GISTENTRY *dep; - - gistentryinit(*e, k, r, pg, o, b, l); - dep = (GISTENTRY *) - DatumGetPointer(FunctionCall1(&giststate->decompressFn[nkey], - PointerGetDatum(e))); - /* decompressFn may just return the given pointer */ - if (dep != e) - gistentryinit(*e, dep->key, dep->rel, dep->page, dep->offset, - dep->bytes, dep->leafkey); - } - else - gistentryinit(*e, (Datum) 0, r, pg, o, 0, l); -} - - -/* - * initialize a GiST entry with a compressed version of key - */ -static void -gistcentryinit(GISTSTATE *giststate, int nkey, - GISTENTRY *e, Datum k, Relation r, - Page pg, OffsetNumber o, int b, bool l, bool isNull) -{ - if (!isNull) - { - GISTENTRY *cep; - - gistentryinit(*e, k, r, pg, o, b, l); - cep = (GISTENTRY *) - DatumGetPointer(FunctionCall1(&giststate->compressFn[nkey], - PointerGetDatum(e))); - /* compressFn may just return the given pointer */ - if (cep != e) - gistentryinit(*e, cep->key, cep->rel, cep->page, cep->offset, - cep->bytes, cep->leafkey); - } - else - gistentryinit(*e, (Datum) 0, r, pg, o, 0, l); -} - -static IndexTuple -gistFormTuple(GISTSTATE *giststate, Relation r, - Datum attdata[], int datumsize[], bool isnull[]) -{ - GISTENTRY centry[INDEX_MAX_KEYS]; - Datum compatt[INDEX_MAX_KEYS]; - int i; - - for (i = 0; i < r->rd_att->natts; i++) - { - if (isnull[i]) - compatt[i] = (Datum) 0; - else - { - gistcentryinit(giststate, i, ¢ry[i], attdata[i], - NULL, NULL, (OffsetNumber) 0, - datumsize[i], FALSE, FALSE); - compatt[i] = centry[i].key; - } - } - - return index_form_tuple(giststate->tupdesc, compatt, isnull); -} - -static void -gistDeCompressAtt(GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p, - OffsetNumber o, GISTENTRY *attdata, bool *isnull) -{ - int i; - - for (i = 0; i < r->rd_att->natts; i++) - { - Datum datum = index_getattr(tuple, i + 1, giststate->tupdesc, &isnull[i]); - gistdentryinit(giststate, i, &attdata[i], - datum, r, p, o, - ATTSIZE(datum, giststate->tupdesc, i + 1, isnull[i]), - FALSE, isnull[i]); - } -} - -static void -gistpenalty(GISTSTATE *giststate, int attno, - GISTENTRY *key1, bool isNull1, - GISTENTRY *key2, bool isNull2, float *penalty) -{ - if (giststate->penaltyFn[attno].fn_strict && (isNull1 || isNull2)) - *penalty = 0.0; - else - FunctionCall3(&giststate->penaltyFn[attno], - PointerGetDatum(key1), - PointerGetDatum(key2), - PointerGetDatum(penalty)); -} - #ifdef GISTDEBUG static void gist_dumptree(Relation r, int level, BlockNumber blk, OffsetNumber coff) @@ -1726,13 +1120,3 @@ gist_dumptree(Relation r, int level, BlockNumber blk, OffsetNumber coff) } #endif /* defined GISTDEBUG */ -void -gist_redo(XLogRecPtr lsn, XLogRecord *record) -{ - elog(PANIC, "gist_redo: unimplemented"); -} - -void -gist_desc(char *buf, uint8 xl_info, char *rec) -{ -} |