aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/gist/gist.c
diff options
context:
space:
mode:
authorTom Lane <tgl@sss.pgh.pa.us>2001-05-31 18:16:55 +0000
committerTom Lane <tgl@sss.pgh.pa.us>2001-05-31 18:16:55 +0000
commit3043810d977b8197f9671c98439104db80b8e914 (patch)
tree99be1d86e7f18cf24e2016b5c021c2748dc1a8af /src/backend/access/gist/gist.c
parente1107fc28536bf5f24f388bd701bffb98c09cd06 (diff)
downloadpostgresql-3043810d977b8197f9671c98439104db80b8e914.tar.gz
postgresql-3043810d977b8197f9671c98439104db80b8e914.zip
Updates to make GIST work with multi-key indexes (from Oleg Bartunov
and Teodor Sigaev). Declare key values as Datum where appropriate, rather than char* (Tom Lane).
Diffstat (limited to 'src/backend/access/gist/gist.c')
-rw-r--r--src/backend/access/gist/gist.c903
1 files changed, 674 insertions, 229 deletions
diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c
index 8f23d14d79d..8f0cfbbb499 100644
--- a/src/backend/access/gist/gist.c
+++ b/src/backend/access/gist/gist.c
@@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/access/gist/gist.c,v 1.77 2001/05/30 19:53:39 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/access/gist/gist.c,v 1.78 2001/05/31 18:16:54 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -26,10 +26,23 @@
#include "access/xlogutils.h"
+#undef GIST_PAGEADDITEM
+
+#define ATTSIZE( datum, Rel, i, isnull ) \
+ ( \
+ ( isnull ) ? 0 : \
+ att_addlength(0, (Rel)->rd_att->attrs[(i)-1]->attlen, (datum)) \
+ )
+
/* result's status */
#define INSERTED 0x01
#define SPLITED 0x02
+/* group flags ( in gistSplit ) */
+#define LEFT_ADDED 0x01
+#define RIGHT_ADDED 0x02
+#define BOTH_ADDED ( LEFT_ADDED | RIGHT_ADDED )
+
/* non-export function prototypes */
static void gistdoinsert(Relation r,
IndexTuple itup,
@@ -55,10 +68,15 @@ static IndexTuple *gistjoinvector(
IndexTuple *additvec, int addlen);
static IndexTuple gistunion(Relation r, IndexTuple *itvec,
int len, GISTSTATE *giststate);
+
static IndexTuple gistgetadjusted(Relation r,
IndexTuple oldtup,
IndexTuple addtup,
GISTSTATE *giststate);
+static int gistfindgroup( GISTSTATE *giststate,
+ GISTENTRY *valvec, GIST_SPLITVEC * spl );
+static IndexTuple gistFormTuple( GISTSTATE *giststate,
+ Relation r, Datum attdata[], int datumsize[] );
static IndexTuple *gistSplit(Relation r,
Buffer buffer,
IndexTuple *itup,
@@ -71,13 +89,18 @@ static void GISTInitBuffer(Buffer b, uint32 f);
static OffsetNumber gistchoose(Relation r, Page p,
IndexTuple it,
GISTSTATE *giststate);
+#ifdef GIST_PAGEADDITEM
static IndexTuple gist_tuple_replacekey(Relation r,
GISTENTRY entry, IndexTuple t);
-static void gistcentryinit(GISTSTATE *giststate,
- GISTENTRY *e, char *pr,
+#endif
+static void gistcentryinit(GISTSTATE *giststate, int nkey,
+ GISTENTRY *e, Datum k,
Relation r, Page pg,
OffsetNumber o, int b, bool l);
-
+static bool gistDeCompressAtt( GISTSTATE *giststate, Relation r,
+ IndexTuple tuple, Page p, OffsetNumber o,
+ GISTENTRY attdata[], bool decompvec[] );
+static void gistFreeAtt( Relation r, GISTENTRY attdata[], bool decompvec[] );
#undef GISTDEBUG
#ifdef GISTDEBUG
@@ -230,15 +253,15 @@ gistbuild(PG_FUNCTION_ARGS)
/* immediately compress keys to normalize */
for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
{
- gistcentryinit(&giststate, &tmpcentry, (char *) attdata[i],
+ gistcentryinit(&giststate, i, &tmpcentry, attdata[i],
(Relation) NULL, (Page) NULL, (OffsetNumber) 0,
-1 /* size is currently bogus */ , TRUE);
- if (attdata[i] != (Datum) tmpcentry.pred &&
+ if (attdata[i] != tmpcentry.key &&
!(giststate.keytypbyval))
compvec[i] = TRUE;
else
compvec[i] = FALSE;
- attdata[i] = (Datum) tmpcentry.pred;
+ attdata[i] = tmpcentry.key;
}
/* form an index tuple and point it at the heap tuple */
@@ -252,7 +275,6 @@ gistbuild(PG_FUNCTION_ARGS)
* is the right thing to do if you're inserting single tups, but
* not when you're initializing the whole index at once.
*/
-
gistdoinsert(index, itup, NULL, &giststate);
for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
@@ -339,14 +361,14 @@ gistinsert(PG_FUNCTION_ARGS)
compvec = (bool *) palloc(sizeof(bool) * r->rd_att->natts);
for (i = 0; i < r->rd_att->natts; i++)
{
- gistcentryinit(&giststate, &tmpentry, (char *) datum[i],
+ gistcentryinit(&giststate, i,&tmpentry, datum[i],
(Relation) NULL, (Page) NULL, (OffsetNumber) 0,
-1 /* size is currently bogus */ , TRUE);
- if (datum[i] != (Datum) tmpentry.pred && !(giststate.keytypbyval))
+ if (datum[i] != tmpentry.key && !(giststate.keytypbyval))
compvec[i] = TRUE;
else
compvec[i] = FALSE;
- datum[i] = (Datum) tmpentry.pred;
+ datum[i] = tmpentry.key;
}
itup = index_formtuple(RelationGetDescr(r), datum, nulls);
itup->t_tid = *ht_ctid;
@@ -361,13 +383,14 @@ gistinsert(PG_FUNCTION_ARGS)
gistdoinsert(r, itup, &res, &giststate);
for (i = 0; i < r->rd_att->natts; i++)
if (compvec[i] == TRUE)
- pfree((char *) datum[i]);
+ pfree(DatumGetPointer(datum[i]));
pfree(itup);
pfree(compvec);
PG_RETURN_POINTER(res);
}
+#ifdef GIST_PAGEADDITEM
/*
** Take a compressed entry, and install it on a page. Since we now know
** where the entry will live, we decompress it and recompress it using
@@ -388,16 +411,20 @@ gistPageAddItem(GISTSTATE *giststate,
GISTENTRY tmpcentry;
IndexTuple itup = (IndexTuple) item;
OffsetNumber retval;
+ Datum datum;
+ bool IsNull;
/*
* recompress the item given that we now know the exact page and
* offset for insertion
*/
- gistdentryinit(giststate, dentry,
- (((char *) itup) + sizeof(IndexTupleData)),
- (Relation) 0, (Page) 0, (OffsetNumber) InvalidOffsetNumber,
- IndexTupleSize(itup) - sizeof(IndexTupleData), FALSE);
- gistcentryinit(giststate, &tmpcentry, dentry->pred, r, page,
+ datum = index_getattr(itup, 1, r->rd_att, &IsNull);
+ gistdentryinit(giststate, 0,dentry, datum,
+ (Relation) 0, (Page) 0,
+ (OffsetNumber) InvalidOffsetNumber,
+ ATTSIZE( datum, r, 1, IsNull ),
+ FALSE);
+ gistcentryinit(giststate, 0,&tmpcentry, dentry->key, r, page,
offsetNumber, dentry->bytes, FALSE);
*newtup = gist_tuple_replacekey(r, tmpcentry, itup);
retval = PageAddItem(page, (Item) *newtup, IndexTupleSize(*newtup),
@@ -406,11 +433,13 @@ gistPageAddItem(GISTSTATE *giststate,
elog(ERROR, "gist: failed to add index item to %s",
RelationGetRelationName(r));
/* be tidy */
- if (tmpcentry.pred && tmpcentry.pred != dentry->pred
- && tmpcentry.pred != (((char *) itup) + sizeof(IndexTupleData)))
- pfree(tmpcentry.pred);
+ if (DatumGetPointer(tmpcentry.key) != NULL &&
+ tmpcentry.key != dentry->key &&
+ tmpcentry.key != datum )
+ pfree(DatumGetPointer(tmpcentry.key));
return (retval);
}
+#endif
static void
gistdoinsert(Relation r,
@@ -485,7 +514,6 @@ gistlayerinsert(Relation r, BlockNumber blkno,
if (!(ret & SPLITED))
{
IndexTuple newtup = gistgetadjusted(r, oldtup, (*itup)[0], giststate);
-
if (!newtup)
{
/* not need to update key */
@@ -509,18 +537,23 @@ gistlayerinsert(Relation r, BlockNumber blkno,
if (gistnospace(page, (*itup), *len))
{
/* no space for insertion */
- IndexTuple *itvec;
- int tlen;
+ IndexTuple *itvec, *newitup;
+ int tlen,oldlen;
ret |= SPLITED;
itvec = gistreadbuffer(r, buffer, &tlen);
itvec = gistjoinvector(itvec, &tlen, (*itup), *len);
- pfree((*itup));
- (*itup) = gistSplit(r, buffer, itvec, &tlen, giststate,
+ oldlen = *len;
+ newitup = gistSplit(r, buffer, itvec, &tlen, giststate,
(opaque->flags & F_LEAF) ? res : NULL); /* res only for
* inserting in leaf */
ReleaseBuffer(buffer);
+ do
+ pfree( (*itup)[ oldlen-1 ] );
+ while ( (--oldlen) > 0 );
+ pfree((*itup));
pfree(itvec);
+ *itup = newitup;
*len = tlen; /* now tlen >= 2 */
}
else
@@ -551,6 +584,7 @@ gistlayerinsert(Relation r, BlockNumber blkno,
* parent
*/
IndexTuple newtup = gistunion(r, (*itup), *len, giststate);
+ ItemPointerSet(&(newtup->t_tid), blkno, 1);
for (i = 0; i < *len; i++)
pfree((*itup)[i]);
@@ -571,19 +605,30 @@ gistwritebuffer(Relation r, Page page, IndexTuple *itup,
{
OffsetNumber l = InvalidOffsetNumber;
int i;
+#ifdef GIST_PAGEADDITEM
GISTENTRY tmpdentry;
IndexTuple newtup;
-
+ bool IsNull;
+#endif
for (i = 0; i < len; i++)
{
+#ifdef GIST_PAGEADDITEM
l = gistPageAddItem(giststate, r, page,
(Item) itup[i], IndexTupleSize(itup[i]),
off, LP_USED, &tmpdentry, &newtup);
off = OffsetNumberNext(off);
- if (tmpdentry.pred != (((char *) itup[i]) + sizeof(IndexTupleData)) && tmpdentry.pred)
- pfree(tmpdentry.pred);
+ if (DatumGetPointer(tmpdentry.key) != NULL &&
+ tmpdentry.key != index_getattr(itup[i], 1, r->rd_att, &IsNull))
+ pfree(DatumGetPointer(tmpdentry.key));
if (itup[i] != newtup)
pfree(newtup);
+#else
+ l = PageAddItem(page, (Item) itup[i], IndexTupleSize(itup[i]),
+ off, LP_USED);
+ if (l == InvalidOffsetNumber)
+ elog(ERROR, "gist: failed to add index item to %s",
+ RelationGetRelationName(r));
+#endif
}
return l;
}
@@ -598,7 +643,7 @@ gistnospace(Page page, IndexTuple *itvec, int len)
int i;
for (i = 0; i < len; i++)
- size += IndexTupleSize(itvec[i]) + 4; /* ??? */
+ size += IndexTupleSize(itvec[i]) + sizeof(ItemIdData);
return (PageGetFreeSpace(page) < size);
}
@@ -614,11 +659,11 @@ gistreadbuffer(Relation r, Buffer buffer, int *len /* out */ )
IndexTuple *itvec;
Page p = (Page) BufferGetPage(buffer);
- *len = 0;
maxoff = PageGetMaxOffsetNumber(p);
+ *len = maxoff;
itvec = palloc(sizeof(IndexTuple) * maxoff);
for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
- itvec[(*len)++] = (IndexTuple) PageGetItem(p, PageGetItemId(p, i));
+ itvec[i-1] = (IndexTuple) PageGetItem(p, PageGetItemId(p, i));
return itvec;
}
@@ -639,46 +684,66 @@ gistjoinvector(IndexTuple *itvec, int *len, IndexTuple *additvec, int addlen)
* return union of itup vector
*/
static IndexTuple
-gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate)
-{
+gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate) {
+ Datum attr[INDEX_MAX_KEYS];
+ bool whatfree[INDEX_MAX_KEYS];
+ char isnull[INDEX_MAX_KEYS];
bytea *evec;
- char *datum;
+ Datum datum;
int datumsize,
- i;
- GISTENTRY centry;
- char isnull;
- IndexTuple newtup;
+ i,j;
+ GISTENTRY centry[INDEX_MAX_KEYS];
+ bool *needfree;
+ IndexTuple newtup;
+ bool IsNull;
+ needfree = (bool *) palloc(len * sizeof(bool));
evec = (bytea *) palloc(len * sizeof(GISTENTRY) + VARHDRSZ);
VARATT_SIZEP(evec) = len * sizeof(GISTENTRY) + VARHDRSZ;
- for (i = 0; i < len; i++)
- gistdentryinit(giststate, &((GISTENTRY *) VARDATA(evec))[i],
- (char *) itvec[i] + sizeof(IndexTupleData),
- (Relation) NULL, (Page) NULL, (OffsetNumber) NULL,
- IndexTupleSize((IndexTuple) itvec[i]) - sizeof(IndexTupleData), FALSE);
-
- datum = (char *)
- DatumGetPointer(FunctionCall2(&giststate->unionFn,
- PointerGetDatum(evec),
- PointerGetDatum(&datumsize)));
+ for (j = 0; j < r->rd_att->natts; j++) {
+ for (i = 0; i < len; i++) {
+ datum = index_getattr(itvec[i], j+1, r->rd_att, &IsNull);
+ gistdentryinit(giststate, j,
+ &((GISTENTRY *) VARDATA(evec))[i],
+ datum,
+ (Relation) NULL, (Page) NULL, (OffsetNumber) NULL,
+ ATTSIZE( datum, r, j+1, IsNull ), FALSE);
+ if ( DatumGetPointer(((GISTENTRY *) VARDATA(evec))[i].key) != NULL
+ && ((GISTENTRY *) VARDATA(evec))[i].key != datum )
+ needfree[i] = TRUE;
+ else
+ needfree[i] = FALSE;
+ }
- for (i = 0; i < len; i++)
- if (((GISTENTRY *) VARDATA(evec))[i].pred &&
- ((GISTENTRY *) VARDATA(evec))[i].pred !=
- ((char *) (itvec[i]) + sizeof(IndexTupleData)))
- pfree(((GISTENTRY *) VARDATA(evec))[i].pred);
+ datum = FunctionCall2(&giststate->unionFn[j],
+ PointerGetDatum(evec),
+ PointerGetDatum(&datumsize));
+
+ for (i = 0; i < len; i++)
+ if ( needfree[i] )
+ pfree(DatumGetPointer(((GISTENTRY *) VARDATA(evec))[i].key));
+
+ gistcentryinit(giststate, j, &centry[j], datum,
+ (Relation) NULL, (Page) NULL, (OffsetNumber) NULL,
+ datumsize, FALSE);
+ isnull[j] = DatumGetPointer(centry[j].key) != NULL ? ' ' : 'n';
+ attr[j] = centry[j].key;
+ if ( DatumGetPointer(centry[j].key) != NULL ) {
+ whatfree[j] = TRUE;
+ if ( centry[j].key != datum )
+ pfree(DatumGetPointer(datum));
+ } else
+ whatfree[j] = FALSE;
+ }
pfree(evec);
+ pfree(needfree);
- gistcentryinit(giststate, &centry, datum,
- (Relation) NULL, (Page) NULL, (OffsetNumber) NULL,
- datumsize, FALSE);
-
- isnull = (centry.pred) ? ' ' : 'n';
- newtup = (IndexTuple) index_formtuple(r->rd_att, (Datum *) &centry.pred, &isnull);
- if (centry.pred != datum)
- pfree(datum);
+ newtup = (IndexTuple) index_formtuple(r->rd_att, attr, isnull);
+ for (j = 0; j < r->rd_att->natts; j++)
+ if ( whatfree[j] )
+ pfree(DatumGetPointer(attr[j]));
return newtup;
}
@@ -690,71 +755,232 @@ static IndexTuple
gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *giststate)
{
bytea *evec;
- char *datum;
+ Datum datum;
int datumsize;
- bool result;
- char isnull;
- GISTENTRY centry,
+ bool result, neednew = false;
+ char isnull[INDEX_MAX_KEYS],
+ whatfree[INDEX_MAX_KEYS];
+ Datum attr[INDEX_MAX_KEYS];
+ GISTENTRY centry[INDEX_MAX_KEYS],
+ oldatt[INDEX_MAX_KEYS],
+ addatt[INDEX_MAX_KEYS],
*ev0p,
*ev1p;
+ bool olddec[INDEX_MAX_KEYS],
+ adddec[INDEX_MAX_KEYS];
+
IndexTuple newtup = NULL;
+ int j;
evec = (bytea *) palloc(2 * sizeof(GISTENTRY) + VARHDRSZ);
VARATT_SIZEP(evec) = 2 * sizeof(GISTENTRY) + VARHDRSZ;
-
- gistdentryinit(giststate, &((GISTENTRY *) VARDATA(evec))[0],
- (char *) oldtup + sizeof(IndexTupleData), (Relation) NULL,
- (Page) NULL, (OffsetNumber) 0,
- IndexTupleSize((IndexTuple) oldtup) - sizeof(IndexTupleData), FALSE);
ev0p = &((GISTENTRY *) VARDATA(evec))[0];
-
- gistdentryinit(giststate, &((GISTENTRY *) VARDATA(evec))[1],
- (char *) addtup + sizeof(IndexTupleData), (Relation) NULL,
- (Page) NULL, (OffsetNumber) 0,
- IndexTupleSize((IndexTuple) addtup) - sizeof(IndexTupleData), FALSE);
ev1p = &((GISTENTRY *) VARDATA(evec))[1];
+
+ gistDeCompressAtt( giststate, r, oldtup, (Page) NULL,
+ (OffsetNumber) 0, oldatt, olddec);
+
+ gistDeCompressAtt( giststate, r, addtup, (Page) NULL,
+ (OffsetNumber) 0, addatt, adddec);
+
+
+ for( j=0; j<r->rd_att->natts; j++ ) {
+ gistentryinit(*ev0p, oldatt[j].key, r, (Page) NULL,
+ (OffsetNumber) 0, oldatt[j].bytes, FALSE);
+ gistentryinit(*ev1p, addatt[j].key, r, (Page) NULL,
+ (OffsetNumber) 0, addatt[j].bytes, FALSE);
+
+ datum = FunctionCall2(&giststate->unionFn[j],
+ PointerGetDatum(evec),
+ PointerGetDatum(&datumsize));
+
+ if (!(DatumGetPointer(ev0p->key) != NULL &&
+ DatumGetPointer(ev1p->key) != NULL))
+ result = (DatumGetPointer(ev0p->key) == NULL &&
+ DatumGetPointer(ev1p->key) == NULL);
+ else
+ {
+ FunctionCall3(&giststate->equalFn[j],
+ ev0p->key,
+ datum,
+ PointerGetDatum(&result));
+ }
+ if ( !result )
+ neednew = true;
+
+ if ( olddec[j] && DatumGetPointer(oldatt[j].key) != NULL )
+ pfree( DatumGetPointer(oldatt[j].key) );
+ if ( adddec[j] && DatumGetPointer(addatt[j].key) != NULL )
+ pfree( DatumGetPointer(addatt[j].key) );
+
+ gistcentryinit(giststate, j, &centry[j], datum,
+ (Relation) NULL, (Page) NULL, (OffsetNumber) NULL,
+ datumsize, FALSE);
+
+ isnull[j] = DatumGetPointer(centry[j].key) != NULL ? ' ' : 'n';
+ attr[j] = centry[j].key;
+ if ( DatumGetPointer(centry[j].key) != NULL ) {
+ whatfree[j] = TRUE;
+ if ( centry[j].key != datum )
+ pfree(DatumGetPointer(datum));
+ } else
+ whatfree[j] = FALSE;
+
+ }
+ pfree(evec);
- datum = (char *)
- DatumGetPointer(FunctionCall2(&giststate->unionFn,
- PointerGetDatum(evec),
- PointerGetDatum(&datumsize)));
-
- if (!(ev0p->pred && ev1p->pred))
- result = (ev0p->pred == NULL && ev1p->pred == NULL);
- else
- {
- FunctionCall3(&giststate->equalFn,
- PointerGetDatum(ev0p->pred),
- PointerGetDatum(datum),
- PointerGetDatum(&result));
+ if (neednew) {
+ /* need to update key */
+ newtup = (IndexTuple) index_formtuple(r->rd_att, attr, isnull);
+ newtup->t_tid = oldtup->t_tid;
}
+
+ for (j = 0; j < r->rd_att->natts; j++)
+ if ( whatfree[j] )
+ pfree(DatumGetPointer(attr[j]));
- if (result)
- {
- /* not need to update key */
- pfree(datum);
- }
- else
- {
- gistcentryinit(giststate, &centry, datum, ev0p->rel, ev0p->page,
- ev0p->offset, datumsize, FALSE);
+ return newtup;
+}
- isnull = (centry.pred) ? ' ' : 'n';
- newtup = (IndexTuple) index_formtuple(r->rd_att, (Datum *) &centry.pred, &isnull);
- newtup->t_tid = oldtup->t_tid;
- if (centry.pred != datum)
- pfree(datum);
+static void
+gistunionsubkey( Relation r, GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITVEC * spl ) {
+ int i,j,lr;
+ Datum *attr;
+ bool *needfree, IsNull;
+ int len, *attrsize;
+ OffsetNumber *entries;
+ bytea *evec;
+ Datum datum;
+ int datumsize;
+
+ for(lr=0;lr<=1;lr++) {
+ if ( lr ) {
+ attrsize = spl->spl_lattrsize;
+ attr = spl->spl_lattr;
+ len = spl->spl_nleft;
+ entries = spl->spl_left;
+ } else {
+ attrsize = spl->spl_rattrsize;
+ attr = spl->spl_rattr;
+ len = spl->spl_nright;
+ entries = spl->spl_right;
+ }
+
+ needfree = (bool *) palloc( (( len==1 ) ? 2 : len ) * sizeof(bool));
+ evec = (bytea *) palloc( (( len==1 ) ? 2 : len ) * sizeof(GISTENTRY) + VARHDRSZ);
+ VARATT_SIZEP(evec) = (( len==1 ) ? 2 : len ) * sizeof(GISTENTRY) + VARHDRSZ;
+ for (j = 1; j < r->rd_att->natts; j++) {
+ for (i = 0; i < len; i++) {
+ if ( spl->spl_idgrp[ entries[i] ] )
+ {
+ datum = (Datum) 0;
+ IsNull = true;
+ } else
+ datum = index_getattr(itvec[ entries[i]-1 ], j+1,
+ r->rd_att, &IsNull);
+ gistdentryinit(giststate, j,
+ &((GISTENTRY *) VARDATA(evec))[i],
+ datum,
+ (Relation) NULL, (Page) NULL,
+ (OffsetNumber) NULL,
+ ATTSIZE( datum, r, j+1, IsNull ), FALSE);
+ if ( DatumGetPointer(((GISTENTRY *) VARDATA(evec))[i].key) != NULL &&
+ ((GISTENTRY *) VARDATA(evec))[i].key != datum )
+ needfree[i] = TRUE;
+ else
+ needfree[i] = FALSE;
+
+ }
+ if ( len == 1 &&
+ DatumGetPointer(((GISTENTRY *) VARDATA(evec))[0].key) == NULL)
+ {
+ datum = (Datum) 0;
+ datumsize = 0;
+ } else {
+ /*
+ * ((GISTENTRY *) VARDATA(evec))[0].bytes may be not defined,
+ * so form union with itself
+ */
+ if ( len == 1 ) {
+ memcpy( (void*) &((GISTENTRY *) VARDATA(evec))[1],
+ (void*) &((GISTENTRY *) VARDATA(evec))[0],
+ sizeof( GISTENTRY ) );
+ }
+ datum = FunctionCall2(&giststate->unionFn[j],
+ PointerGetDatum(evec),
+ PointerGetDatum(&datumsize));
+ }
+
+ for (i = 0; i < len; i++)
+ if ( needfree[i] )
+ pfree(DatumGetPointer(((GISTENTRY *) VARDATA(evec))[i].key));
+
+ attr[j] = datum;
+ attrsize[j] = datumsize;
+ }
+ pfree(evec);
+ pfree(needfree);
}
+}
- if (ev0p->pred &&
- ev0p->pred != (char *) oldtup + sizeof(IndexTupleData))
- pfree(ev0p->pred);
- if (ev1p->pred &&
- ev1p->pred != (char *) addtup + sizeof(IndexTupleData))
- pfree(ev1p->pred);
- pfree(evec);
+/*
+ * find group in vector with equial value
+ */
+static int
+gistfindgroup( GISTSTATE *giststate, GISTENTRY *valvec, GIST_SPLITVEC * spl ) {
+ int i,j,len;
+ int curid = 1;
+ bool result;
+
+ for(i=0; i<spl->spl_nleft; i++) {
+ if ( spl->spl_idgrp[ spl->spl_left[i] ]) continue;
+ len = 0;
+ /* find all equal value in right part */
+ for(j=0; j < spl->spl_nright; j++) {
+ if ( spl->spl_idgrp[ spl->spl_right[j] ]) continue;
+ if (!(DatumGetPointer(valvec[ spl->spl_left[i] ].key) != NULL &&
+ DatumGetPointer(valvec[ spl->spl_right[j]].key) != NULL))
+ result =
+ DatumGetPointer(valvec[ spl->spl_left[i] ].key) == NULL &&
+ DatumGetPointer(valvec[ spl->spl_right[j]].key) == NULL;
+ else
+ FunctionCall3(&giststate->equalFn[0],
+ valvec[ spl->spl_left[i] ].key,
+ valvec[ spl->spl_right[j] ].key,
+ PointerGetDatum(&result));
+ if ( result ) {
+ spl->spl_idgrp[ spl->spl_right[j] ] = curid;
+ len++;
+ }
+ }
+ /* find all other equal value in left part */
+ if ( len ) {
+ /* add current val to list of equial values*/
+ spl->spl_idgrp[ spl->spl_left[i] ]=curid;
+ /* searching .. */
+ for(j=i+1; j < spl->spl_nleft; j++) {
+ if ( spl->spl_idgrp[ spl->spl_left[j] ]) continue;
+ if (!(DatumGetPointer(valvec[ spl->spl_left[i]].key) != NULL &&
+ DatumGetPointer(valvec[ spl->spl_left[j]].key) != NULL))
+ result =
+ DatumGetPointer(valvec[ spl->spl_left[i]].key) == NULL &&
+ DatumGetPointer(valvec[ spl->spl_left[j]].key) == NULL;
+ else
+ FunctionCall3(&giststate->equalFn[0],
+ valvec[ spl->spl_left[i] ].key,
+ valvec[ spl->spl_left[j] ].key,
+ PointerGetDatum(&result));
+ if ( result ) {
+ spl->spl_idgrp[ spl->spl_left[j] ] = curid;
+ len++;
+ }
+ }
+ spl->spl_ngrp[curid] = len+1;
+ curid++;
+ }
+ }
- return newtup;
+ return curid;
}
/*
@@ -773,23 +999,20 @@ gistSplit(Relation r,
rightbuf;
Page left,
right;
- OffsetNumber *spl_left,
- *spl_right;
IndexTuple *lvectup,
*rvectup,
*newtup;
- int leftoff,
- rightoff;
BlockNumber lbknum,
rbknum;
GISTPageOpaque opaque;
- char isnull;
GIST_SPLITVEC v;
bytea *entryvec;
bool *decompvec;
- GISTENTRY tmpentry;
- int i,
+ int i,j,
nlen;
+ int MaxGrpId = 1;
+ Datum datum;
+ bool IsNull;
p = (Page) BufferGetPage(buffer);
opaque = (GISTPageOpaque) PageGetSpecialPointer(p);
@@ -827,49 +1050,187 @@ gistSplit(Relation r,
VARATT_SIZEP(entryvec) = (*len + 1) * sizeof(GISTENTRY) + VARHDRSZ;
for (i = 1; i <= *len; i++)
{
- gistdentryinit(giststate, &((GISTENTRY *) VARDATA(entryvec))[i],
- (((char *) itup[i - 1]) + sizeof(IndexTupleData)),
- r, p, i,
- IndexTupleSize(itup[i - 1]) - sizeof(IndexTupleData), FALSE);
- if ((char *) (((GISTENTRY *) VARDATA(entryvec))[i].pred)
- == (((char *) itup[i - 1]) + sizeof(IndexTupleData)))
+ datum = index_getattr(itup[i - 1], 1, r->rd_att, &IsNull);
+ gistdentryinit(giststate, 0,&((GISTENTRY *) VARDATA(entryvec))[i],
+ datum, r, p, i,
+ ATTSIZE( datum, r, 1, IsNull ), FALSE);
+ if (((GISTENTRY *) VARDATA(entryvec))[i].key == datum ||
+ DatumGetPointer(((GISTENTRY *) VARDATA(entryvec))[i].key) == NULL)
decompvec[i] = FALSE;
else
decompvec[i] = TRUE;
}
/* now let the user-defined picksplit function set up the split vector */
- FunctionCall2(&giststate->picksplitFn,
+ FunctionCall2(&giststate->picksplitFn[0],
PointerGetDatum(entryvec),
PointerGetDatum(&v));
- /* clean up the entry vector: its preds need to be deleted, too */
+ /* compatibility with old code */
+ if ( v.spl_left[ v.spl_nleft-1 ] == InvalidOffsetNumber )
+ v.spl_left[ v.spl_nleft-1 ] = (OffsetNumber)*len;
+ if ( v.spl_right[ v.spl_nright-1 ] == InvalidOffsetNumber )
+ v.spl_right[ v.spl_nright-1 ] = (OffsetNumber)*len;
+
+ v.spl_lattr[0] = v.spl_ldatum;
+ v.spl_rattr[0] = v.spl_rdatum;
+
+ /* if index is multikey, then we must to try get smaller
+ * bounding box for subkey(s)
+ */
+ if ( r->rd_att->natts > 1 ) {
+ v.spl_idgrp = (int*) palloc( sizeof(int) * (*len + 1) );
+ MemSet((void*)v.spl_idgrp, 0, sizeof(int) * (*len + 1) );
+ v.spl_grpflag = (char*) palloc( sizeof(char) * (*len + 1) );
+ MemSet((void*)v.spl_grpflag, 0, sizeof(char) * (*len + 1) );
+ v.spl_ngrp = (int*) palloc( sizeof(int) * (*len + 1) );
+
+ MaxGrpId = gistfindgroup( giststate, (GISTENTRY *) VARDATA(entryvec), &v );
+
+ /* form union of sub keys for each page (l,p) */
+ gistunionsubkey( r, giststate, itup, &v );
+
+ /* if possible, we insert equivalrnt tuples
+ * with control by penalty for a subkey(s)
+ */
+ if ( MaxGrpId > 1 ) {
+ int curlen;
+ OffsetNumber *curwpos;
+ bool decfree[INDEX_MAX_KEYS];
+ GISTENTRY entry,identry[INDEX_MAX_KEYS], *ev0p, *ev1p;
+ float lpenalty, rpenalty;
+ bytea *evec;
+ int datumsize;
+
+ /* clear vectors */
+ curlen = v.spl_nleft;
+ curwpos = v.spl_left;
+ for( i=0; i<v.spl_nleft;i++ )
+ if ( v.spl_idgrp[ v.spl_left[i] ] == 0 ) {
+ *curwpos = v.spl_left[i];
+ curwpos++;
+ } else
+ curlen--;
+ v.spl_nleft = curlen;
+
+ curlen = v.spl_nright;
+ curwpos = v.spl_right;
+ for( i=0; i<v.spl_nright;i++ )
+ if ( v.spl_idgrp[ v.spl_right[i] ] == 0 ) {
+ *curwpos = v.spl_right[i];
+ curwpos++;
+ } else
+ curlen--;
+ v.spl_nright = curlen;
+
+ evec = (bytea *) palloc(2 * sizeof(GISTENTRY) + VARHDRSZ);
+ VARATT_SIZEP(evec) = 2 * sizeof(GISTENTRY) + VARHDRSZ;
+ ev0p = &((GISTENTRY *) VARDATA(evec))[0];
+ ev1p = &((GISTENTRY *) VARDATA(evec))[1];
+
+ /* add equivalent tuple */
+ for(i = 0; i< *len; i++) {
+ if ( v.spl_idgrp[ i+1 ]==0 ) /* already inserted */
+ continue;
+ gistDeCompressAtt( giststate, r, itup[i], (Page) NULL, (OffsetNumber) 0,
+ identry, decfree);
+
+ v.spl_ngrp[ v.spl_idgrp[ i+1 ] ]--;
+ if ( v.spl_ngrp[ v.spl_idgrp[ i+1 ] ] == 0 &&
+ (v.spl_grpflag[ v.spl_idgrp[ i+1 ] ] & BOTH_ADDED) != BOTH_ADDED ) {
+
+ /* force last in group */
+ rpenalty = 1.0;
+ lpenalty = ( v.spl_grpflag[ v.spl_idgrp[ i+1 ] ] & LEFT_ADDED ) ? 2.0 : 0.0;
+ } else {
+ /*where?*/
+ for( j=1; j<r->rd_att->natts; j++ ) {
+ gistentryinit(entry,v.spl_lattr[j], r, (Page) NULL,
+ (OffsetNumber) 0, v.spl_lattrsize[j], FALSE);
+ FunctionCall3(&giststate->penaltyFn[j],
+ PointerGetDatum(&entry),
+ PointerGetDatum(&identry[j]),
+ PointerGetDatum(&lpenalty));
+
+ gistentryinit(entry,v.spl_rattr[j], r, (Page) NULL,
+ (OffsetNumber) 0, v.spl_rattrsize[j], FALSE);
+ FunctionCall3(&giststate->penaltyFn[j],
+ PointerGetDatum(&entry),
+ PointerGetDatum(&identry[j]),
+ PointerGetDatum(&rpenalty));
+
+ if ( lpenalty != rpenalty )
+ break;
+ }
+ }
+ /* add */
+ if ( lpenalty < rpenalty ) {
+ v.spl_grpflag[ v.spl_idgrp[ i+1 ] ] |= LEFT_ADDED;
+ v.spl_left[ v.spl_nleft ] = i+1;
+ v.spl_nleft++;
+ for( j=1; j<r->rd_att->natts; j++ ) {
+ gistentryinit(*ev0p, v.spl_lattr[j], r, (Page) NULL,
+ (OffsetNumber) 0, v.spl_lattrsize[j], FALSE);
+ gistentryinit(*ev1p, identry[j].key, r, (Page) NULL,
+ (OffsetNumber) 0, identry[j].bytes, FALSE);
+
+ datum = FunctionCall2(&giststate->unionFn[j],
+ PointerGetDatum(evec),
+ PointerGetDatum(&datumsize));
+
+ if ( DatumGetPointer(v.spl_lattr[j]) != NULL )
+ pfree( DatumGetPointer(v.spl_lattr[j]) );
+
+ v.spl_lattr[j] = datum;
+ v.spl_lattrsize[j] = datumsize;
+ }
+ } else {
+ v.spl_grpflag[ v.spl_idgrp[ i+1 ] ] |= RIGHT_ADDED;
+ v.spl_right[ v.spl_nright ] = i+1;
+ v.spl_nright++;
+ for( j=1; j<r->rd_att->natts; j++ ) {
+ gistentryinit(*ev0p, v.spl_rattr[j], r, (Page) NULL,
+ (OffsetNumber) 0, v.spl_rattrsize[j], FALSE);
+ gistentryinit(*ev1p, identry[j].key, r, (Page) NULL,
+ (OffsetNumber) 0, identry[j].bytes, FALSE);
+
+ datum = FunctionCall2(&giststate->unionFn[j],
+ PointerGetDatum(evec),
+ PointerGetDatum(&datumsize));
+
+ if ( DatumGetPointer(v.spl_rattr[j]) != NULL )
+ pfree( DatumGetPointer(v.spl_rattr[j]) );
+
+ v.spl_rattr[j] = datum;
+ v.spl_rattrsize[j] = datumsize;
+ }
+
+ }
+ gistFreeAtt( r, identry, decfree );
+ }
+ pfree(evec);
+ }
+ pfree( v.spl_idgrp );
+ pfree( v.spl_grpflag );
+ pfree( v.spl_ngrp );
+ }
+
+ /* clean up the entry vector: its keys need to be deleted, too */
for (i = 1; i <= *len; i++)
- if (decompvec[i] && ((GISTENTRY *) VARDATA(entryvec))[i].pred)
- pfree(((GISTENTRY *) VARDATA(entryvec))[i].pred);
+ if (decompvec[i])
+ pfree(DatumGetPointer(((GISTENTRY *) VARDATA(entryvec))[i].key));
pfree(entryvec);
pfree(decompvec);
- spl_left = v.spl_left;
- spl_right = v.spl_right;
-
/* form left and right vector */
lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * v.spl_nleft);
rvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * v.spl_nright);
- leftoff = rightoff = 0;
- for (i = 1; i <= *len; i++)
- {
- if (i == *(spl_left) || (i == *len && *(spl_left) != FirstOffsetNumber))
- {
- lvectup[leftoff++] = itup[i - 1];
- spl_left++;
- }
- else
- {
- rvectup[rightoff++] = itup[i - 1];
- spl_right++;
- }
- }
+
+ for(i=0; i<v.spl_nleft;i++)
+ lvectup[i] = itup[ v.spl_left[i] - 1 ];
+
+ for(i=0; i<v.spl_nright;i++)
+ rvectup[i] = itup[ v.spl_right[i] - 1 ];
/* write on disk (may be need another split) */
if (gistnospace(right, rvectup, v.spl_nright))
@@ -878,6 +1239,9 @@ gistSplit(Relation r,
newtup = gistSplit(r, rightbuf, rvectup, &nlen, giststate,
(res && rvectup[nlen - 1] == itup[*len - 1]) ? res : NULL);
ReleaseBuffer(rightbuf);
+ for( j=1; j<r->rd_att->natts; j++ )
+ if ( DatumGetPointer(v.spl_rattr[j]) != NULL )
+ pfree( DatumGetPointer(v.spl_rattr[j]) );
}
else
{
@@ -888,20 +1252,14 @@ gistSplit(Relation r,
if (res)
ItemPointerSet(&((*res)->pointerData), rbknum, l);
- gistcentryinit(giststate, &tmpentry, v.spl_rdatum, (Relation) NULL,
- (Page) NULL, (OffsetNumber) 0,
- -1, FALSE);
- if (v.spl_rdatum != tmpentry.pred)
- pfree(v.spl_rdatum);
- v.spl_rdatum = tmpentry.pred;
nlen = 1;
newtup = (IndexTuple *) palloc(sizeof(IndexTuple) * 1);
- isnull = (v.spl_rdatum) ? ' ' : 'n';
- newtup[0] = (IndexTuple) index_formtuple(r->rd_att, (Datum *) &(v.spl_rdatum), &isnull);
+ newtup[0] = gistFormTuple( giststate, r, v.spl_rattr, v.spl_rattrsize );
ItemPointerSet(&(newtup[0]->t_tid), rbknum, 1);
}
+
if (gistnospace(left, lvectup, v.spl_nleft))
{
int llen = v.spl_nleft;
@@ -911,6 +1269,10 @@ gistSplit(Relation r,
(res && lvectup[llen - 1] == itup[*len - 1]) ? res : NULL);
ReleaseBuffer(leftbuf);
+ for( j=1; j<r->rd_att->natts; j++ )
+ if ( DatumGetPointer(v.spl_lattr[j]) != NULL )
+ pfree( DatumGetPointer(v.spl_lattr[j]) );
+
newtup = gistjoinvector(newtup, &nlen, lntup, llen);
pfree(lntup);
}
@@ -926,17 +1288,10 @@ gistSplit(Relation r,
if (res)
ItemPointerSet(&((*res)->pointerData), lbknum, l);
- gistcentryinit(giststate, &tmpentry, v.spl_ldatum, (Relation) NULL,
- (Page) NULL, (OffsetNumber) 0,
- -1, FALSE);
- if (v.spl_ldatum != tmpentry.pred)
- pfree(v.spl_ldatum);
- v.spl_ldatum = tmpentry.pred;
nlen += 1;
newtup = (IndexTuple *) repalloc((void *) newtup, sizeof(IndexTuple) * nlen);
- isnull = (v.spl_ldatum) ? ' ' : 'n';
- newtup[nlen - 1] = (IndexTuple) index_formtuple(r->rd_att, (Datum *) &(v.spl_ldatum), &isnull);
+ newtup[nlen - 1] = gistFormTuple( giststate, r, v.spl_lattr, v.spl_lattrsize );
ItemPointerSet(&(newtup[nlen - 1]->t_tid), lbknum, 1);
}
@@ -995,48 +1350,52 @@ gistchoose(Relation r, Page p, IndexTuple it, /* it has compressed entry */
{
OffsetNumber maxoff;
OffsetNumber i;
- char *id;
- char *datum;
+ Datum datum;
float usize;
OffsetNumber which;
- float which_grow;
+ float sum_grow, which_grow[INDEX_MAX_KEYS];
GISTENTRY entry,
- identry;
- int size,
- idsize;
+ identry[INDEX_MAX_KEYS];
+ bool IsNull, decompvec[INDEX_MAX_KEYS];
+ int j;
- idsize = IndexTupleSize(it) - sizeof(IndexTupleData);
- id = ((char *) it) + sizeof(IndexTupleData);
maxoff = PageGetMaxOffsetNumber(p);
- which_grow = -1.0;
+ *which_grow = -1.0;
which = -1;
+ sum_grow=1;
+ gistDeCompressAtt( giststate, r,
+ it, (Page) NULL, (OffsetNumber) 0,
+ identry, decompvec );
- gistdentryinit(giststate, &identry, id, (Relation) NULL, (Page) NULL,
- (OffsetNumber) 0, idsize, FALSE);
-
- for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
+ for (i = FirstOffsetNumber; i <= maxoff && sum_grow; i = OffsetNumberNext(i))
{
- datum = (char *) PageGetItem(p, PageGetItemId(p, i));
- size = IndexTupleSize(datum) - sizeof(IndexTupleData);
- datum += sizeof(IndexTupleData);
- gistdentryinit(giststate, &entry, datum, r, p, i, size, FALSE);
- FunctionCall3(&giststate->penaltyFn,
- PointerGetDatum(&entry),
- PointerGetDatum(&identry),
- PointerGetDatum(&usize));
- if (which_grow < 0 || usize < which_grow)
- {
- which = i;
- which_grow = usize;
- if (which_grow == 0)
+ sum_grow=0;
+ for( j=0; j<r->rd_att->natts; j++ ) {
+ datum = index_getattr( (IndexTuple)PageGetItem(p, PageGetItemId(p, i)), j+1, r->rd_att, &IsNull);
+ gistdentryinit(giststate, j, &entry, datum, r, p, i, ATTSIZE( datum, r, j+1, IsNull ), FALSE);
+ FunctionCall3(&giststate->penaltyFn[j],
+ PointerGetDatum(&entry),
+ PointerGetDatum(&identry[j]),
+ PointerGetDatum(&usize));
+
+ if (DatumGetPointer(entry.key) != NULL && entry.key != datum)
+ pfree(DatumGetPointer(entry.key));
+
+ if ( which_grow[j]<0 || usize < which_grow[j] ) {
+ which = i;
+ which_grow[j] = usize;
+ if ( j<r->rd_att->natts-1 && i==FirstOffsetNumber ) which_grow[j+1]=-1;
+ sum_grow += which_grow[j];
+ } else if ( which_grow[j] == usize ) {
+ sum_grow += usize;
+ } else {
+ sum_grow=1;
break;
+ }
}
- if (entry.pred && entry.pred != datum)
- pfree(entry.pred);
}
- if (identry.pred && identry.pred != id)
- pfree(identry.pred);
+ gistFreeAtt( r, identry, decompvec );
return which;
}
@@ -1104,21 +1463,28 @@ initGISTstate(GISTSTATE *giststate, Relation index)
HeapTuple htup;
Form_pg_index itupform;
Oid indexrelid;
-
- consistent_proc = index_getprocid(index, 1, GIST_CONSISTENT_PROC);
- union_proc = index_getprocid(index, 1, GIST_UNION_PROC);
- compress_proc = index_getprocid(index, 1, GIST_COMPRESS_PROC);
- decompress_proc = index_getprocid(index, 1, GIST_DECOMPRESS_PROC);
- penalty_proc = index_getprocid(index, 1, GIST_PENALTY_PROC);
- picksplit_proc = index_getprocid(index, 1, GIST_PICKSPLIT_PROC);
- equal_proc = index_getprocid(index, 1, GIST_EQUAL_PROC);
- fmgr_info(consistent_proc, &giststate->consistentFn);
- fmgr_info(union_proc, &giststate->unionFn);
- fmgr_info(compress_proc, &giststate->compressFn);
- fmgr_info(decompress_proc, &giststate->decompressFn);
- fmgr_info(penalty_proc, &giststate->penaltyFn);
- fmgr_info(picksplit_proc, &giststate->picksplitFn);
- fmgr_info(equal_proc, &giststate->equalFn);
+ int i;
+
+ if (index->rd_att->natts >= INDEX_MAX_KEYS)
+ elog(ERROR, "initGISTstate: numberOfAttributes %d > %d",
+ index->rd_att->natts, INDEX_MAX_KEYS);
+
+ for(i=0; i<index->rd_att->natts; i++) {
+ consistent_proc = index_getprocid(index, i+1, GIST_CONSISTENT_PROC );
+ union_proc = index_getprocid(index, i+1, GIST_UNION_PROC );
+ compress_proc = index_getprocid(index, i+1, GIST_COMPRESS_PROC );
+ decompress_proc = index_getprocid(index, i+1, GIST_DECOMPRESS_PROC );
+ penalty_proc = index_getprocid(index, i+1, GIST_PENALTY_PROC );
+ picksplit_proc = index_getprocid(index, i+1, GIST_PICKSPLIT_PROC );
+ equal_proc = index_getprocid(index, i+1, GIST_EQUAL_PROC );
+ fmgr_info(consistent_proc, &((giststate->consistentFn)[i]) );
+ fmgr_info(union_proc, &((giststate->unionFn)[i]) );
+ fmgr_info(compress_proc, &((giststate->compressFn)[i]) );
+ fmgr_info(decompress_proc, &((giststate->decompressFn)[i]) );
+ fmgr_info(penalty_proc, &((giststate->penaltyFn)[i]) );
+ fmgr_info(picksplit_proc, &((giststate->picksplitFn)[i]) );
+ fmgr_info(equal_proc, &((giststate->equalFn)[i]) );
+ }
/* see if key type is different from type of attribute being indexed */
htup = SearchSysCache(INDEXRELID,
@@ -1149,21 +1515,31 @@ initGISTstate(GISTSTATE *giststate, Relation index)
giststate->keytypbyval = FALSE;
}
-
+#ifdef GIST_PAGEADDITEM
/*
** Given an IndexTuple to be inserted on a page, this routine replaces
** the key with another key, which may involve generating a new IndexTuple
-** if the sizes don't match
+** if the sizes don't match or if the null status changes.
+**
+** XXX this only works for a single-column index tuple!
*/
static IndexTuple
gist_tuple_replacekey(Relation r, GISTENTRY entry, IndexTuple t)
{
- char *datum = (((char *) t) + sizeof(IndexTupleData));
+ bool IsNull;
+ Datum datum = index_getattr(t, 1, r->rd_att, &IsNull);
- /* if new entry fits in index tuple, copy it in */
- if ((Size) entry.bytes < IndexTupleSize(t) - sizeof(IndexTupleData) || (Size) entry.bytes == 0)
+ /*
+ * If new entry fits in index tuple, copy it in. To avoid worrying
+ * about null-value bitmask, pass it off to the general index_formtuple
+ * routine if either the previous or new value is NULL.
+ */
+ if (!IsNull && DatumGetPointer(entry.key) != NULL &&
+ (Size) entry.bytes <= ATTSIZE(datum, r, 1, IsNull))
{
- memcpy(datum, entry.pred, entry.bytes);
+ memcpy(DatumGetPointer(datum),
+ DatumGetPointer(entry.key),
+ entry.bytes);
/* clear out old size */
t->t_info &= ~INDEX_SIZE_MASK;
/* or in new size */
@@ -1178,65 +1554,131 @@ gist_tuple_replacekey(Relation r, GISTENTRY entry, IndexTuple t)
IndexTuple newtup;
char isnull;
- isnull = (entry.pred) ? ' ' : 'n';
+ isnull = DatumGetPointer(entry.key) != NULL ? ' ' : 'n';
newtup = (IndexTuple) index_formtuple(tupDesc,
- (Datum *) &(entry.pred),
+ &(entry.key),
&isnull);
newtup->t_tid = t->t_tid;
return newtup;
}
}
-
+#endif
/*
-** initialize a GiST entry with a decompressed version of pred
+** initialize a GiST entry with a decompressed version of key
*/
void
-gistdentryinit(GISTSTATE *giststate, GISTENTRY *e, char *pr, Relation r,
- Page pg, OffsetNumber o, int b, bool l)
+gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e,
+ Datum k, Relation r, Page pg, OffsetNumber o,
+ int b, bool l)
{
GISTENTRY *dep;
- gistentryinit(*e, pr, r, pg, o, b, l);
+ gistentryinit(*e, k, r, pg, o, b, l);
if (giststate->haskeytype)
{
if ( b ) {
dep = (GISTENTRY *)
- DatumGetPointer(FunctionCall1(&giststate->decompressFn,
+ DatumGetPointer(FunctionCall1(&giststate->decompressFn[nkey],
PointerGetDatum(e)));
- gistentryinit(*e, dep->pred, dep->rel, dep->page, dep->offset, dep->bytes,
+ gistentryinit(*e, dep->key, dep->rel, dep->page, dep->offset, dep->bytes,
dep->leafkey);
if (dep != e)
pfree(dep);
} else {
- gistentryinit(*e, (char*)NULL, r, pg, o, 0, l);
+ gistentryinit(*e, (Datum) 0, r, pg, o, 0, l);
}
}
}
/*
-** initialize a GiST entry with a compressed version of pred
+** initialize a GiST entry with a compressed version of key
*/
static void
-gistcentryinit(GISTSTATE *giststate, GISTENTRY *e, char *pr, Relation r,
+gistcentryinit(GISTSTATE *giststate, int nkey,
+ GISTENTRY *e, Datum k, Relation r,
Page pg, OffsetNumber o, int b, bool l)
{
GISTENTRY *cep;
- gistentryinit(*e, pr, r, pg, o, b, l);
+ gistentryinit(*e, k, r, pg, o, b, l);
if (giststate->haskeytype)
{
cep = (GISTENTRY *)
- DatumGetPointer(FunctionCall1(&giststate->compressFn,
+ DatumGetPointer(FunctionCall1(&giststate->compressFn[nkey],
PointerGetDatum(e)));
- gistentryinit(*e, cep->pred, cep->rel, cep->page, cep->offset, cep->bytes,
- cep->leafkey);
+ gistentryinit(*e, cep->key, cep->rel, cep->page, cep->offset,
+ cep->bytes, cep->leafkey);
if (cep != e)
pfree(cep);
}
}
+static IndexTuple
+gistFormTuple( GISTSTATE *giststate, Relation r,
+ Datum attdata[], int datumsize[] )
+{
+ IndexTuple tup;
+ char isnull[INDEX_MAX_KEYS];
+ bool whatfree[INDEX_MAX_KEYS];
+ GISTENTRY centry[INDEX_MAX_KEYS];
+ Datum compatt[INDEX_MAX_KEYS];
+ int j;
+
+ for (j = 0; j < r->rd_att->natts; j++) {
+ gistcentryinit(giststate, j, &centry[j], attdata[j],
+ (Relation) NULL, (Page) NULL, (OffsetNumber) NULL,
+ datumsize[j], FALSE);
+ isnull[j] = DatumGetPointer(centry[j].key) != NULL ? ' ' : 'n';
+ compatt[j] = centry[j].key;
+ if ( DatumGetPointer(centry[j].key) != NULL ) {
+ whatfree[j] = TRUE;
+ if ( centry[j].key != attdata[j] )
+ pfree(DatumGetPointer(attdata[j]));
+ } else
+ whatfree[j] = FALSE;
+ }
+
+ tup = (IndexTuple) index_formtuple(r->rd_att, compatt, isnull);
+ for (j = 0; j < r->rd_att->natts; j++)
+ if ( whatfree[j] ) pfree(DatumGetPointer(compatt[j]));
+
+ return tup;
+}
+
+static bool
+gistDeCompressAtt( GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p,
+ OffsetNumber o, GISTENTRY attdata[], bool decompvec[] ) {
+ bool allIsNull=true;
+ bool IsNull;
+ int i;
+ Datum datum;
+
+ for(i=0; i < r->rd_att->natts; i++ ) {
+ datum = index_getattr(tuple, i+1, r->rd_att, &IsNull);
+ if ( ! IsNull ) allIsNull = false;
+ gistdentryinit(giststate, i, &attdata[i],
+ datum, r, p, o,
+ ATTSIZE( datum, r, i+1, IsNull ), FALSE);
+ if (attdata[i].key == datum ||
+ DatumGetPointer(attdata[i].key) == NULL )
+ decompvec[i] = FALSE;
+ else
+ decompvec[i] = TRUE;
+ }
+
+ return allIsNull;
+}
+
+static void
+gistFreeAtt( Relation r, GISTENTRY attdata[], bool decompvec[] ) {
+ int i;
+ for(i=0; i < r->rd_att->natts; i++ )
+ if ( decompvec[i] && DatumGetPointer(attdata[i].key) != NULL )
+ pfree( DatumGetPointer(attdata[i].key) );
+}
+
#ifdef GISTDEBUG
static void
gist_dumptree(Relation r, int level, BlockNumber blk, OffsetNumber coff)
@@ -1261,7 +1703,9 @@ gist_dumptree(Relation r, int level, BlockNumber blk, OffsetNumber coff)
maxoff = PageGetMaxOffsetNumber(page);
- elog(NOTICE, "%sPage: %d %s blk: %d maxoff: %d free: %d", pred, coff, (opaque->flags & F_LEAF) ? "LEAF" : "INTE", (int) blk, (int) maxoff, PageGetFreeSpace(page));
+ elog(NOTICE, "%sPage: %d %s blk: %d maxoff: %d free: %d", pred,
+ coff, (opaque->flags & F_LEAF) ? "LEAF" : "INTE", (int) blk,
+ (int) maxoff, PageGetFreeSpace(page));
for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
{
@@ -1269,7 +1713,8 @@ gist_dumptree(Relation r, int level, BlockNumber blk, OffsetNumber coff)
which = (IndexTuple) PageGetItem(page, iid);
cblk = ItemPointerGetBlockNumber(&(which->t_tid));
#ifdef PRINTTUPLE
- elog(NOTICE, "%s Tuple. blk: %d size: %d", pred, (int) cblk, IndexTupleSize(which));
+ elog(NOTICE, "%s Tuple. blk: %d size: %d", pred, (int) cblk,
+ IndexTupleSize(which));
#endif
if (!(opaque->flags & F_LEAF))