aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/access/gist/gist.c256
-rw-r--r--src/backend/access/gist/gistget.c19
-rw-r--r--src/backend/access/gist/gistutil.c476
-rw-r--r--src/include/access/gist_private.h25
-rw-r--r--src/include/catalog/catversion.h4
-rw-r--r--src/include/catalog/pg_am.h4
6 files changed, 365 insertions, 419 deletions
diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c
index cb10cbc35bd..54ac45ee2fe 100644
--- a/src/backend/access/gist/gist.c
+++ b/src/backend/access/gist/gist.c
@@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.136 2006/05/19 16:15:17 teodor Exp $
+ * $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.137 2006/05/24 11:01:39 teodor Exp $
*
*-------------------------------------------------------------------------
*/
@@ -181,32 +181,13 @@ gistbuildCallback(Relation index,
{
GISTBuildState *buildstate = (GISTBuildState *) state;
IndexTuple itup;
- GISTENTRY tmpcentry;
- int i;
MemoryContext oldCtx;
- /* GiST cannot index tuples with leading NULLs */
- if (isnull[0])
- return;
-
oldCtx = MemoryContextSwitchTo(buildstate->tmpCtx);
- /* immediately compress keys to normalize */
- for (i = 0; i < buildstate->numindexattrs; i++)
- {
- if (isnull[i])
- values[i] = (Datum) 0;
- else
- {
- gistcentryinit(&buildstate->giststate, i, &tmpcentry, values[i],
- NULL, NULL, (OffsetNumber) 0,
- -1 /* size is currently bogus */ , TRUE, FALSE);
- values[i] = tmpcentry.key;
- }
- }
-
/* form an index tuple and point it at the heap tuple */
- itup = index_form_tuple(buildstate->giststate.tupdesc, values, isnull);
+ itup = gistFormTuple(&buildstate->giststate, index,
+ values, NULL /* size is currently bogus */, isnull);
itup->t_tid = htup->t_self;
/*
@@ -243,34 +224,16 @@ gistinsert(PG_FUNCTION_ARGS)
#endif
IndexTuple itup;
GISTSTATE giststate;
- GISTENTRY tmpentry;
- int i;
MemoryContext oldCtx;
MemoryContext insertCtx;
- /* GiST cannot index tuples with leading NULLs */
- if (isnull[0])
- PG_RETURN_BOOL(false);
-
insertCtx = createTempGistContext();
oldCtx = MemoryContextSwitchTo(insertCtx);
initGISTstate(&giststate, r);
- /* immediately compress keys to normalize */
- for (i = 0; i < r->rd_att->natts; i++)
- {
- if (isnull[i])
- values[i] = (Datum) 0;
- else
- {
- gistcentryinit(&giststate, i, &tmpentry, values[i],
- NULL, NULL, (OffsetNumber) 0,
- -1 /* size is currently bogus */ , TRUE, FALSE);
- values[i] = tmpentry.key;
- }
- }
- itup = index_form_tuple(giststate.tupdesc, values, isnull);
+ itup = gistFormTuple(&giststate, r,
+ values, NULL /* size is currently bogus */, isnull);
itup->t_tid = *ht_ctid;
gistdoinsert(r, itup, &giststate);
@@ -937,7 +900,147 @@ gistmakedeal(GISTInsertState *state, GISTSTATE *giststate)
}
/*
- * gistSplit -- split a page in the tree.
+ * simple split page
+ */
+static void
+gistSplitHalf(GIST_SPLITVEC *v, int len) {
+ int i;
+
+ v->spl_nright = v->spl_nleft = 0;
+ v->spl_left = (OffsetNumber *) palloc(len * sizeof(OffsetNumber));
+ v->spl_right= (OffsetNumber *) palloc(len * sizeof(OffsetNumber));
+ for(i = 1; i <= len; i++)
+ if ( i<len/2 )
+ v->spl_right[ v->spl_nright++ ] = i;
+ else
+ v->spl_left[ v->spl_nleft++ ] = i;
+}
+
+/*
+ * if it was invalid tuple then we need special processing.
+ * We move all invalid tuples on right page.
+ *
+ * if there is no place on left page, gistSplit will be called one more
+ * time for left page.
+ *
+ * Normally, we never exec this code, but after crash replay it's possible
+ * to get 'invalid' tuples (probability is low enough)
+ */
+static void
+gistSplitByInvalid(GISTSTATE *giststate, GIST_SPLITVEC *v, IndexTuple *itup, int len) {
+ int i;
+ static OffsetNumber offInvTuples[ MaxOffsetNumber ];
+ int nOffInvTuples = 0;
+
+ for (i = 1; i <= len; i++)
+ if ( GistTupleIsInvalid(itup[i - 1]) )
+ offInvTuples[ nOffInvTuples++ ] = i;
+
+ if ( nOffInvTuples == len ) {
+ /* corner case, all tuples are invalid */
+ v->spl_rightvalid= v->spl_leftvalid = false;
+ gistSplitHalf( v, len );
+ } else {
+ GistSplitVec gsvp;
+
+ v->spl_right = offInvTuples;
+ v->spl_nright = nOffInvTuples;
+ v->spl_rightvalid = false;
+
+ v->spl_left = (OffsetNumber *) palloc(len * sizeof(OffsetNumber));
+ v->spl_nleft = 0;
+ for(i = 1; i <= len; i++)
+ if ( !GistTupleIsInvalid(itup[i - 1]) )
+ v->spl_left[ v->spl_nleft++ ] = i;
+ v->spl_leftvalid = true;
+
+ gsvp.idgrp = NULL;
+ gsvp.attrsize = v->spl_lattrsize;
+ gsvp.attr = v->spl_lattr;
+ gsvp.len = v->spl_nleft;
+ gsvp.entries = v->spl_left;
+ gsvp.isnull = v->spl_lisnull;
+
+ gistunionsubkeyvec(giststate, itup, &gsvp, 0);
+ }
+}
+
+/*
+ * trys to split page by attno key, in a case of null
+ * values move its to separate page.
+ */
+static void
+gistSplitByKey(Relation r, Page page, IndexTuple *itup, int len, GISTSTATE *giststate,
+ GIST_SPLITVEC *v, GistEntryVector *entryvec, int attno) {
+ int i;
+ static OffsetNumber offNullTuples[ MaxOffsetNumber ];
+ int nOffNullTuples = 0;
+
+
+ for (i = 1; i <= len; i++) {
+ Datum datum;
+ bool IsNull;
+
+ if (!GistPageIsLeaf(page) && GistTupleIsInvalid(itup[i - 1])) {
+ gistSplitByInvalid(giststate, v, itup, len);
+ return;
+ }
+
+ datum = index_getattr(itup[i - 1], attno+1, giststate->tupdesc, &IsNull);
+ gistdentryinit(giststate, attno, &(entryvec->vector[i]),
+ datum, r, page, i,
+ ATTSIZE(datum, giststate->tupdesc, attno+1, IsNull),
+ FALSE, IsNull);
+ if ( IsNull )
+ offNullTuples[ nOffNullTuples++ ] = i;
+ }
+
+ v->spl_leftvalid = v->spl_rightvalid = true;
+
+ if ( nOffNullTuples == len ) {
+ /*
+ * Corner case: All keys in attno column are null, we should try to
+ * by keys in next column. It all keys in all columns
+ * are NULL just split page half by half
+ */
+ v->spl_risnull[attno] = v->spl_lisnull[attno] = TRUE;
+ if ( attno+1 == r->rd_att->natts )
+ gistSplitHalf( v, len );
+ else
+ gistSplitByKey(r, page, itup, len, giststate, v, entryvec, attno+1);
+ } else if ( nOffNullTuples > 0 ) {
+ int j=0;
+
+ /*
+ * We don't want to mix NULLs and not-NULLs keys
+ * on one page, so move nulls to right page
+ */
+ v->spl_right = offNullTuples;
+ v->spl_nright = nOffNullTuples;
+ v->spl_risnull[attno] = TRUE;
+
+ v->spl_left = (OffsetNumber *) palloc(len * sizeof(OffsetNumber));
+ v->spl_nleft = 0;
+ for(i = 1; i <= len; i++)
+ if ( j<v->spl_nright && offNullTuples[j] == i )
+ j++;
+ else
+ v->spl_left[ v->spl_nleft++ ] = i;
+
+ v->spl_idgrp = NULL;
+ gistunionsubkey(giststate, itup, v, 0);
+ } else {
+ /*
+ * all keys are not-null
+ */
+ gistUserPicksplit(r, entryvec, attno, v, itup, len, giststate);
+ }
+}
+
+/*
+ * gistSplit -- split a page in the tree and fill struct
+ * used for XLOG and real writes buffers. Function is recursive, ie
+ * it will split page until keys will fit in every page.
*/
SplitedPageLayout *
gistSplit(Relation r,
@@ -951,77 +1054,14 @@ gistSplit(Relation r,
GIST_SPLITVEC v;
GistEntryVector *entryvec;
int i;
- OffsetNumber offInvTuples[ MaxOffsetNumber ];
- int nOffInvTuples = 0;
SplitedPageLayout *res = NULL;
/* generate the item array */
entryvec = palloc(GEVHDRSZ + (len + 1) * sizeof(GISTENTRY));
entryvec->n = len + 1;
- for (i = 1; i <= len; i++)
- {
- Datum datum;
- bool IsNull;
-
- if (!GistPageIsLeaf(page) && GistTupleIsInvalid(itup[i - 1]))
- /* remember position of invalid tuple */
- offInvTuples[ nOffInvTuples++ ] = i;
-
- if ( nOffInvTuples > 0 )
- /* we can safely do not decompress other keys, because
- we will do splecial processing, but
- it's needed to find another invalid tuples */
- continue;
-
- datum = index_getattr(itup[i - 1], 1, giststate->tupdesc, &IsNull);
- gistdentryinit(giststate, 0, &(entryvec->vector[i]),
- datum, r, page, i,
- ATTSIZE(datum, giststate->tupdesc, 1, IsNull),
- FALSE, IsNull);
- }
-
- /*
- * if it was invalid tuple then we need special processing.
- * We move all invalid tuples on right page.
- *
- * if there is no place on left page, gistSplit will be called one more
- * time for left page.
- *
- * Normally, we never exec this code, but after crash replay it's possible
- * to get 'invalid' tuples (probability is low enough)
- */
- if (nOffInvTuples > 0)
- {
- GistSplitVec gsvp;
-
- v.spl_right = offInvTuples;
- v.spl_nright = nOffInvTuples;
- v.spl_rightvalid = false;
-
- v.spl_left = (OffsetNumber *) palloc(entryvec->n * sizeof(OffsetNumber));
- v.spl_nleft = 0;
- for(i = 1; i <= len; i++)
- if ( !GistTupleIsInvalid(itup[i - 1]) )
- v.spl_left[ v.spl_nleft++ ] = i;
- v.spl_leftvalid = true;
-
- gsvp.idgrp = NULL;
- gsvp.attrsize = v.spl_lattrsize;
- gsvp.attr = v.spl_lattr;
- gsvp.len = v.spl_nleft;
- gsvp.entries = v.spl_left;
- gsvp.isnull = v.spl_lisnull;
-
- gistunionsubkeyvec(giststate, itup, &gsvp, true);
- }
- else
- {
- /* there is no invalid tuples, so usial processing */
- gistUserPicksplit(r, entryvec, &v, itup, len, giststate);
- v.spl_leftvalid = v.spl_rightvalid = true;
- }
-
+ gistSplitByKey(r, page, itup, len, giststate,
+ &v, entryvec, 0);
/* form left and right vector */
lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
diff --git a/src/backend/access/gist/gistget.c b/src/backend/access/gist/gistget.c
index 55d45b4a038..dcb8f2da54b 100644
--- a/src/backend/access/gist/gistget.c
+++ b/src/backend/access/gist/gistget.c
@@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/gist/gistget.c,v 1.56 2006/03/05 15:58:20 momjian Exp $
+ * $PostgreSQL: pgsql/src/backend/access/gist/gistget.c,v 1.57 2006/05/24 11:01:39 teodor Exp $
*
*-------------------------------------------------------------------------
*/
@@ -361,7 +361,7 @@ gistindex_keytest(IndexTuple tuple,
IncrIndexProcessed();
/*
- * Tuple doesn't restore after crash recovery because of inclomplete
+ * Tuple doesn't restore after crash recovery because of incomplete
* insert
*/
if (!GistPageIsLeaf(p) && GistTupleIsInvalid(tuple))
@@ -378,14 +378,15 @@ gistindex_keytest(IndexTuple tuple,
key->sk_attno,
giststate->tupdesc,
&isNull);
- /* is the index entry NULL? */
- if (isNull)
- {
- /* XXX eventually should check if SK_ISNULL */
+
+ if ( key->sk_flags & SK_ISNULL ) {
+ /* is the compared-to datum NULL? on non-leaf page it's possible
+ to have nulls in childs :( */
+
+ if ( isNull || !GistPageIsLeaf(p) )
+ return true;
return false;
- }
- /* is the compared-to datum NULL? */
- if (key->sk_flags & SK_ISNULL)
+ } else if ( isNull )
return false;
gistdentryinit(giststate, key->sk_attno - 1, &de,
diff --git a/src/backend/access/gist/gistutil.c b/src/backend/access/gist/gistutil.c
index 92798a27d30..3db72aa199d 100644
--- a/src/backend/access/gist/gistutil.c
+++ b/src/backend/access/gist/gistutil.c
@@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.13 2006/05/19 16:15:17 teodor Exp $
+ * $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.14 2006/05/24 11:01:39 teodor Exp $
*-------------------------------------------------------------------------
*/
#include "postgres.h"
@@ -26,30 +26,18 @@
#define RIGHT_ADDED 0x02
#define BOTH_ADDED ( LEFT_ADDED | RIGHT_ADDED )
+static float gistpenalty(GISTSTATE *giststate, int attno,
+ GISTENTRY *key1, bool isNull1,
+ GISTENTRY *key2, bool isNull2);
+
/*
- * This defines is only for shorter code, used in gistgetadjusted
- * and gistadjsubkey only
+ * static *S used for temrorary storage (saves stack and palloc() call)
*/
-#define FILLITEM(evp, isnullkey, okey, okeyb, rkey, rkeyb) do { \
- if (isnullkey) { \
- gistentryinit((evp), rkey, r, NULL, \
- (OffsetNumber) 0, rkeyb, FALSE); \
- } else { \
- gistentryinit((evp), okey, r, NULL, \
- (OffsetNumber) 0, okeyb, FALSE); \
- } \
-} while(0)
-
-#define FILLEV(isnull1, key1, key1b, isnull2, key2, key2b) do { \
- FILLITEM(*ev0p, isnull1, key1, key1b, key2, key2b); \
- FILLITEM(*ev1p, isnull2, key2, key2b, key1, key1b); \
-} while(0);
-
-
-static void gistpenalty(GISTSTATE *giststate, int attno,
- GISTENTRY *key1, bool isNull1,
- GISTENTRY *key2, bool isNull2, float *penalty);
+
+static int attrsizeS[INDEX_MAX_KEYS];
+static Datum attrS[INDEX_MAX_KEYS];
+static bool isnullS[INDEX_MAX_KEYS];
/*
* Write itup vector to page, has no control of free space
@@ -164,87 +152,115 @@ gistfillitupvec(IndexTuple *vec, int veclen, int *memlen) {
}
/*
- * Return an IndexTuple containing the result of applying the "union"
- * method to the specified IndexTuple vector.
+ * Make unions of keys in IndexTuple vector, return FALSE if itvec contains
+ * invalid tuple. Resulting Datums aren't compressed.
*/
-IndexTuple
-gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate)
-{
- Datum attr[INDEX_MAX_KEYS];
- bool isnull[INDEX_MAX_KEYS];
- GistEntryVector *evec;
+
+static bool
+gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startkey,
+ Datum *attr, bool *isnull, int *attrsize ) {
int i;
- GISTENTRY centry[INDEX_MAX_KEYS];
- IndexTuple res;
+ GistEntryVector *evec;
evec = (GistEntryVector *) palloc(((len == 1) ? 2 : len) * sizeof(GISTENTRY) + GEVHDRSZ);
- for (i = 0; i < len; i++)
- if (GistTupleIsInvalid(itvec[i]))
- return gist_form_invalid_tuple(InvalidBlockNumber);
+ for (i = startkey; i < giststate->tupdesc->natts; i++) {
+ int j;
- for (i = 0; i < r->rd_att->natts; i++)
- {
- Datum datum;
- int j;
- int real_len;
+ evec->n = 0;
- real_len = 0;
- for (j = 0; j < len; j++)
- {
- bool IsNull;
+ for (j = 0; j < len; j++) {
+ Datum datum;
+ bool IsNull;
+
+ if (GistTupleIsInvalid(itvec[j]))
+ return FALSE; /* signals that union with invalid tuple => result is invalid */
datum = index_getattr(itvec[j], i + 1, giststate->tupdesc, &IsNull);
if (IsNull)
continue;
gistdentryinit(giststate, i,
- &(evec->vector[real_len]),
+ evec->vector + evec->n,
datum,
NULL, NULL, (OffsetNumber) 0,
ATTSIZE(datum, giststate->tupdesc, i + 1, IsNull),
FALSE, IsNull);
- real_len++;
+ evec->n++;
}
/* If this tuple vector was all NULLs, the union is NULL */
- if (real_len == 0)
- {
+ if ( evec->n == 0 ) {
attr[i] = (Datum) 0;
+ attrsize[i] = (Datum) 0;
isnull[i] = TRUE;
- }
- else
- {
- int datumsize;
-
- if (real_len == 1)
- {
+ } else {
+ if (evec->n == 1) {
evec->n = 2;
- gistentryinit(evec->vector[1],
- evec->vector[0].key, r, NULL,
- (OffsetNumber) 0, evec->vector[0].bytes, FALSE);
- }
- else
- evec->n = real_len;
+ evec->vector[1] = evec->vector[0];
+ }
- /* Compress the result of the union and store in attr array */
- datum = FunctionCall2(&giststate->unionFn[i],
+ /* Make union and store in attr array */
+ attr[i] = FunctionCall2(&giststate->unionFn[i],
PointerGetDatum(evec),
- PointerGetDatum(&datumsize));
+ PointerGetDatum(attrsize + i));
- gistcentryinit(giststate, i, &centry[i], datum,
- NULL, NULL, (OffsetNumber) 0,
- datumsize, FALSE, FALSE);
isnull[i] = FALSE;
- attr[i] = centry[i].key;
}
}
- res = index_form_tuple(giststate->tupdesc, attr, isnull);
- GistTupleSetValid(res);
- return res;
+ return TRUE;
+}
+
+/*
+ * Return an IndexTuple containing the result of applying the "union"
+ * method to the specified IndexTuple vector.
+ */
+IndexTuple
+gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate)
+{
+ if ( !gistMakeUnionItVec(giststate, itvec, len, 0, attrS, isnullS, attrsizeS) )
+ return gist_form_invalid_tuple(InvalidBlockNumber);
+
+ return gistFormTuple(giststate, r, attrS, attrsizeS, isnullS);
}
+/*
+ * makes union of two key
+ */
+static void
+gistMakeUnionKey( GISTSTATE *giststate, int attno,
+ GISTENTRY *entry1, bool isnull1,
+ GISTENTRY *entry2, bool isnull2,
+ Datum *dst, int *dstsize, bool *dstisnull ) {
+
+ static char storage[ 2 * sizeof(GISTENTRY) + GEVHDRSZ ];
+ GistEntryVector *evec = (GistEntryVector*)storage;
+
+ evec->n = 2;
+
+ if ( isnull1 && isnull2 ) {
+ *dstisnull = TRUE;
+ *dst = (Datum)0;
+ *dstsize = 0;
+ } else {
+ if ( isnull1 == FALSE && isnull2 == FALSE ) {
+ evec->vector[0] = *entry1;
+ evec->vector[1] = *entry2;
+ } else if ( isnull1 == FALSE ) {
+ evec->vector[0] = *entry1;
+ evec->vector[1] = *entry1;
+ } else {
+ evec->vector[0] = *entry2;
+ evec->vector[1] = *entry2;
+ }
+
+ *dstisnull = FALSE;
+ *dst = FunctionCall2(&giststate->unionFn[attno],
+ PointerGetDatum(evec),
+ PointerGetDatum(dstsize));
+ }
+}
/*
* Forms union of oldtup and addtup, if union == oldtup then return NULL
@@ -252,15 +268,9 @@ gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate)
IndexTuple
gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *giststate)
{
- GistEntryVector *evec;
- bool neednew = false;
- bool isnull[INDEX_MAX_KEYS];
- Datum attr[INDEX_MAX_KEYS];
- GISTENTRY centry[INDEX_MAX_KEYS],
- oldatt[INDEX_MAX_KEYS],
- addatt[INDEX_MAX_KEYS],
- *ev0p,
- *ev1p;
+ bool neednew = FALSE;
+ GISTENTRY oldentries[INDEX_MAX_KEYS],
+ addentries[INDEX_MAX_KEYS];
bool oldisnull[INDEX_MAX_KEYS],
addisnull[INDEX_MAX_KEYS];
IndexTuple newtup = NULL;
@@ -269,147 +279,83 @@ gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *gis
if (GistTupleIsInvalid(oldtup) || GistTupleIsInvalid(addtup))
return gist_form_invalid_tuple(ItemPointerGetBlockNumber(&(oldtup->t_tid)));
- evec = palloc(2 * sizeof(GISTENTRY) + GEVHDRSZ);
- evec->n = 2;
- ev0p = &(evec->vector[0]);
- ev1p = &(evec->vector[1]);
-
-
gistDeCompressAtt(giststate, r, oldtup, NULL,
- (OffsetNumber) 0, oldatt, oldisnull);
+ (OffsetNumber) 0, oldentries, oldisnull);
gistDeCompressAtt(giststate, r, addtup, NULL,
- (OffsetNumber) 0, addatt, addisnull);
+ (OffsetNumber) 0, addentries, addisnull);
- for (i = 0; i < r->rd_att->natts; i++)
- {
- if (oldisnull[i] && addisnull[i])
- {
- attr[i] = (Datum) 0;
- isnull[i] = TRUE;
- }
- else
- {
- Datum datum;
- int datumsize;
+ for(i = 0; i < r->rd_att->natts; i++) {
+ gistMakeUnionKey( giststate, i,
+ oldentries + i, oldisnull[i],
+ addentries + i, addisnull[i],
+ attrS + i, attrsizeS + i, isnullS + i );
- FILLEV(oldisnull[i], oldatt[i].key, oldatt[i].bytes,
- addisnull[i], addatt[i].key, addatt[i].bytes);
+ if ( neednew )
+ /* we already need new key, so we can skip check */
+ continue;
- datum = FunctionCall2(&giststate->unionFn[i],
- PointerGetDatum(evec),
- PointerGetDatum(&datumsize));
+ if ( isnullS[i] )
+ /* union of key may be NULL if and only if both keys are NULL */
+ continue;
- if (oldisnull[i] || addisnull[i])
- {
- if (oldisnull[i])
- neednew = true;
- }
- else
- {
- bool result;
+ if ( !addisnull[i] ) {
+ if ( oldisnull[i] )
+ neednew = true;
+ else {
+ bool result;
FunctionCall3(&giststate->equalFn[i],
- ev0p->key,
- datum,
- PointerGetDatum(&result));
+ oldentries[i].key,
+ attrS[i],
+ PointerGetDatum(&result));
if (!result)
neednew = true;
}
-
- gistcentryinit(giststate, i, &centry[i], datum,
- NULL, NULL, (OffsetNumber) 0,
- datumsize, FALSE, FALSE);
-
- attr[i] = centry[i].key;
- isnull[i] = FALSE;
}
}
if (neednew)
{
/* need to update key */
- newtup = index_form_tuple(giststate->tupdesc, attr, isnull);
+ newtup = gistFormTuple(giststate, r, attrS, attrsizeS, isnullS);
newtup->t_tid = oldtup->t_tid;
}
return newtup;
}
+/*
+ * Forms unions of subkeys after page split, but
+ * uses only tuples aren't in groups of equalent tuples
+ */
void
gistunionsubkeyvec(GISTSTATE *giststate, IndexTuple *itvec,
- GistSplitVec *gsvp, bool isall) {
- int i;
- GistEntryVector *evec;
-
- evec = palloc(((gsvp->len < 2) ? 2 : gsvp->len) * sizeof(GISTENTRY) + GEVHDRSZ);
+ GistSplitVec *gsvp, int startkey) {
+ IndexTuple *cleanedItVec;
+ int i, cleanedLen=0;
- for (i = (isall) ? 0 : 1; i < giststate->tupdesc->natts; i++)
- {
- int j;
- Datum datum;
- int datumsize;
- int real_len;
+ cleanedItVec = (IndexTuple*)palloc(sizeof(IndexTuple) * gsvp->len);
- real_len = 0;
- for (j = 0; j < gsvp->len; j++)
- {
- bool IsNull;
-
- if ( gsvp->idgrp && gsvp->idgrp[gsvp->entries[j]])
- continue;
-
- datum = index_getattr(itvec[gsvp->entries[j] - 1], i + 1,
- giststate->tupdesc, &IsNull);
- if (IsNull)
- continue;
- gistdentryinit(giststate, i,
- &(evec->vector[real_len]),
- datum,
- NULL, NULL, (OffsetNumber) 0,
- ATTSIZE(datum, giststate->tupdesc, i + 1, IsNull),
- FALSE, IsNull);
- real_len++;
+ for(i=0;i<gsvp->len;i++) {
+ if ( gsvp->idgrp && gsvp->idgrp[gsvp->entries[i]])
+ continue;
- }
+ cleanedItVec[cleanedLen++] = itvec[gsvp->entries[i] - 1];
+ }
- if (real_len == 0)
- {
- datum = (Datum) 0;
- datumsize = 0;
- gsvp->isnull[i] = true;
- }
- else
- {
- /*
- * evec->vector[0].bytes may be not defined, so form union
- * with itself
- */
- if (real_len == 1)
- {
- evec->n = 2;
- memcpy(&(evec->vector[1]), &(evec->vector[0]),
- sizeof(GISTENTRY));
- }
- else
- evec->n = real_len;
- datum = FunctionCall2(&giststate->unionFn[i],
- PointerGetDatum(evec),
- PointerGetDatum(&datumsize));
- gsvp->isnull[i] = false;
- }
+ gistMakeUnionItVec(giststate, cleanedItVec, cleanedLen, startkey,
+ gsvp->attr, gsvp->isnull, gsvp->attrsize);
- gsvp->attr[i] = datum;
- gsvp->attrsize[i] = datumsize;
- }
+ pfree( cleanedItVec );
}
/*
- * unions subkey for after user picksplit over first column
+ * unions subkeys for after user picksplit over attno-1 column
*/
-static void
-gistunionsubkey(GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITVEC *spl)
+void
+gistunionsubkey(GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITVEC *spl, int attno)
{
GistSplitVec gsvp;
@@ -421,7 +367,7 @@ gistunionsubkey(GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITVEC *spl)
gsvp.entries = spl->spl_left;
gsvp.isnull = spl->spl_lisnull;
- gistunionsubkeyvec(giststate, itvec, &gsvp, false);
+ gistunionsubkeyvec(giststate, itvec, &gsvp, attno);
gsvp.attrsize = spl->spl_rattrsize;
gsvp.attr = spl->spl_rattr;
@@ -429,20 +375,20 @@ gistunionsubkey(GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITVEC *spl)
gsvp.entries = spl->spl_right;
gsvp.isnull = spl->spl_risnull;
- gistunionsubkeyvec(giststate, itvec, &gsvp, false);
+ gistunionsubkeyvec(giststate, itvec, &gsvp, attno);
}
/*
* find group in vector with equal value
*/
-int
-gistfindgroup(GISTSTATE *giststate, GISTENTRY *valvec, GIST_SPLITVEC *spl)
+static int
+gistfindgroup(GISTSTATE *giststate, GISTENTRY *valvec, GIST_SPLITVEC *spl, int attno)
{
int i;
int curid = 1;
/*
- * first key is always not null (see gistinsert), so we may not check for
+ * attno key is always not null (see gistSplitByKey), so we may not check for
* nulls
*/
for (i = 0; i < spl->spl_nleft; i++)
@@ -459,7 +405,7 @@ gistfindgroup(GISTSTATE *giststate, GISTENTRY *valvec, GIST_SPLITVEC *spl)
{
if (spl->spl_idgrp[spl->spl_right[j]])
continue;
- FunctionCall3(&giststate->equalFn[0],
+ FunctionCall3(&giststate->equalFn[attno],
valvec[spl->spl_left[i]].key,
valvec[spl->spl_right[j]].key,
PointerGetDatum(&result));
@@ -479,7 +425,7 @@ gistfindgroup(GISTSTATE *giststate, GISTENTRY *valvec, GIST_SPLITVEC *spl)
{
if (spl->spl_idgrp[spl->spl_left[j]])
continue;
- FunctionCall3(&giststate->equalFn[0],
+ FunctionCall3(&giststate->equalFn[attno],
valvec[spl->spl_left[i]].key,
valvec[spl->spl_left[j]].key,
PointerGetDatum(&result));
@@ -501,23 +447,20 @@ gistfindgroup(GISTSTATE *giststate, GISTENTRY *valvec, GIST_SPLITVEC *spl)
* Insert equivalent tuples to left or right page with minimum
* penalty
*/
-void
+static void
gistadjsubkey(Relation r,
IndexTuple *itup, /* contains compressed entry */
int len,
GIST_SPLITVEC *v,
- GISTSTATE *giststate)
+ GISTSTATE *giststate,
+ int attno)
{
int curlen;
OffsetNumber *curwpos;
GISTENTRY entry,
- identry[INDEX_MAX_KEYS],
- *ev0p,
- *ev1p;
- float lpenalty,
- rpenalty;
- GistEntryVector *evec;
- int datumsize;
+ identry[INDEX_MAX_KEYS];
+ float lpenalty = 0,
+ rpenalty = 0;
bool isnull[INDEX_MAX_KEYS];
int i,
j;
@@ -551,16 +494,9 @@ gistadjsubkey(Relation r,
}
v->spl_nright = curlen;
- evec = palloc(2 * sizeof(GISTENTRY) + GEVHDRSZ);
- evec->n = 2;
- ev0p = &(evec->vector[0]);
- ev1p = &(evec->vector[1]);
-
/* add equivalent tuple */
for (i = 0; i < len; i++)
{
- Datum datum;
-
if (v->spl_idgrp[i + 1] == 0) /* already inserted */
continue;
gistDeCompressAtt(giststate, r, itup[i], NULL, (OffsetNumber) 0,
@@ -577,17 +513,17 @@ gistadjsubkey(Relation r,
else
{
/* where? */
- for (j = 1; j < r->rd_att->natts; j++)
+ for (j = attno+1; j < r->rd_att->natts; j++)
{
gistentryinit(entry, v->spl_lattr[j], r, NULL,
(OffsetNumber) 0, v->spl_lattrsize[j], FALSE);
- gistpenalty(giststate, j, &entry, v->spl_lisnull[j],
- &identry[j], isnull[j], &lpenalty);
+ lpenalty = gistpenalty(giststate, j, &entry, v->spl_lisnull[j],
+ &identry[j], isnull[j]);
gistentryinit(entry, v->spl_rattr[j], r, NULL,
(OffsetNumber) 0, v->spl_rattrsize[j], FALSE);
- gistpenalty(giststate, j, &entry, v->spl_risnull[j],
- &identry[j], isnull[j], &rpenalty);
+ rpenalty = gistpenalty(giststate, j, &entry, v->spl_risnull[j],
+ &identry[j], isnull[j]);
if (lpenalty != rpenalty)
break;
@@ -600,55 +536,31 @@ gistadjsubkey(Relation r,
if (lpenalty < rpenalty)
{
v->spl_grpflag[v->spl_idgrp[i + 1]] |= LEFT_ADDED;
- v->spl_left[v->spl_nleft] = i + 1;
- v->spl_nleft++;
- for (j = 1; j < r->rd_att->natts; j++)
- {
- if (isnull[j] && v->spl_lisnull[j])
- {
- v->spl_lattr[j] = (Datum) 0;
- v->spl_lattrsize[j] = 0;
- }
- else
- {
- FILLEV(v->spl_lisnull[j], v->spl_lattr[j], v->spl_lattrsize[j],
- isnull[j], identry[j].key, identry[j].bytes);
+ v->spl_left[v->spl_nleft++] = i + 1;
- datum = FunctionCall2(&giststate->unionFn[j],
- PointerGetDatum(evec),
- PointerGetDatum(&datumsize));
-
- v->spl_lattr[j] = datum;
- v->spl_lattrsize[j] = datumsize;
- v->spl_lisnull[j] = false;
- }
+ for (j = attno+1; j < r->rd_att->natts; j++)
+ {
+ gistentryinit(entry, v->spl_lattr[j], r, NULL,
+ (OffsetNumber) 0, v->spl_lattrsize[j], FALSE);
+ gistMakeUnionKey( giststate, j,
+ &entry, v->spl_lisnull[j],
+ identry + j, isnull[j],
+ v->spl_lattr + j, v->spl_lattrsize + j, v->spl_lisnull + j );
}
}
else
{
v->spl_grpflag[v->spl_idgrp[i + 1]] |= RIGHT_ADDED;
- v->spl_right[v->spl_nright] = i + 1;
- v->spl_nright++;
- for (j = 1; j < r->rd_att->natts; j++)
- {
- if (isnull[j] && v->spl_risnull[j])
- {
- v->spl_rattr[j] = (Datum) 0;
- v->spl_rattrsize[j] = 0;
- }
- else
- {
- FILLEV(v->spl_risnull[j], v->spl_rattr[j], v->spl_rattrsize[j],
- isnull[j], identry[j].key, identry[j].bytes);
+ v->spl_right[v->spl_nright++] = i + 1;
- datum = FunctionCall2(&giststate->unionFn[j],
- PointerGetDatum(evec),
- PointerGetDatum(&datumsize));
-
- v->spl_rattr[j] = datum;
- v->spl_rattrsize[j] = datumsize;
- v->spl_risnull[j] = false;
- }
+ for (j = attno+1; j < r->rd_att->natts; j++)
+ {
+ gistentryinit(entry, v->spl_rattr[j], r, NULL,
+ (OffsetNumber) 0, v->spl_rattrsize[j], FALSE);
+ gistMakeUnionKey( giststate, j,
+ &entry, v->spl_risnull[j],
+ identry + j, isnull[j],
+ v->spl_rattr + j, v->spl_rattrsize + j, v->spl_risnull + j );
}
}
}
@@ -702,8 +614,8 @@ gistchoose(Relation r, Page p, IndexTuple it, /* it has compressed entry */
gistdentryinit(giststate, j, &entry, datum, r, p, i,
ATTSIZE(datum, giststate->tupdesc, j + 1, IsNull),
FALSE, IsNull);
- gistpenalty(giststate, j, &entry, IsNull,
- &identry[j], isnull[j], &usize);
+ usize = gistpenalty(giststate, j, &entry, IsNull,
+ &identry[j], isnull[j]);
if (which_grow[j] < 0 || usize < which_grow[j])
{
@@ -796,8 +708,10 @@ gistFormTuple(GISTSTATE *giststate, Relation r,
else
{
gistcentryinit(giststate, i, &centry[i], attdata[i],
- NULL, NULL, (OffsetNumber) 0,
- datumsize[i], FALSE, FALSE);
+ r, NULL, (OffsetNumber) 0,
+ (datumsize) ? datumsize[i] : -1,
+ (datumsize) ? FALSE : TRUE,
+ FALSE);
compatt[i] = centry[i].key;
}
}
@@ -824,29 +738,35 @@ gistDeCompressAtt(GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p,
}
}
-static void
+static float
gistpenalty(GISTSTATE *giststate, int attno,
- GISTENTRY *key1, bool isNull1,
- GISTENTRY *key2, bool isNull2, float *penalty)
+ GISTENTRY *orig, bool isNullOrig,
+ GISTENTRY *add, bool isNullAdd)
{
- if (giststate->penaltyFn[attno].fn_strict && (isNull1 || isNull2))
- *penalty = 0.0;
- else
+ float penalty = 0.0;
+
+ if ( giststate->penaltyFn[attno].fn_strict==FALSE || ( isNullOrig == FALSE && isNullAdd == FALSE ) )
FunctionCall3(&giststate->penaltyFn[attno],
- PointerGetDatum(key1),
- PointerGetDatum(key2),
- PointerGetDatum(penalty));
+ PointerGetDatum(orig),
+ PointerGetDatum(add),
+ PointerGetDatum(&penalty));
+ else if ( isNullOrig && isNullAdd )
+ penalty = 0.0;
+ else
+ penalty = 1e10; /* try to prevent to mix null and non-null value */
+
+ return penalty;
}
void
-gistUserPicksplit(Relation r, GistEntryVector *entryvec, GIST_SPLITVEC *v,
+gistUserPicksplit(Relation r, GistEntryVector *entryvec, int attno, GIST_SPLITVEC *v,
IndexTuple *itup, int len, GISTSTATE *giststate)
{
/*
* now let the user-defined picksplit function set up the split vector; in
* entryvec have no null value!!
*/
- FunctionCall2(&giststate->picksplitFn[0],
+ FunctionCall2(&giststate->picksplitFn[attno],
PointerGetDatum(entryvec),
PointerGetDatum(v));
@@ -856,16 +776,16 @@ gistUserPicksplit(Relation r, GistEntryVector *entryvec, GIST_SPLITVEC *v,
if (v->spl_right[v->spl_nright - 1] == InvalidOffsetNumber)
v->spl_right[v->spl_nright - 1] = (OffsetNumber) (entryvec->n - 1);
- v->spl_lattr[0] = v->spl_ldatum;
- v->spl_rattr[0] = v->spl_rdatum;
- v->spl_lisnull[0] = false;
- v->spl_risnull[0] = false;
+ v->spl_lattr[attno] = v->spl_ldatum;
+ v->spl_rattr[attno] = v->spl_rdatum;
+ v->spl_lisnull[attno] = false;
+ v->spl_risnull[attno] = false;
/*
* if index is multikey, then we must to try get smaller bounding box for
* subkey(s)
*/
- if (giststate->tupdesc->natts > 1)
+ if (giststate->tupdesc->natts > 1 && attno+1 != giststate->tupdesc->natts)
{
int MaxGrpId;
@@ -873,17 +793,17 @@ gistUserPicksplit(Relation r, GistEntryVector *entryvec, GIST_SPLITVEC *v,
v->spl_grpflag = (char *) palloc0(sizeof(char) * entryvec->n);
v->spl_ngrp = (int *) palloc(sizeof(int) * entryvec->n);
- MaxGrpId = gistfindgroup(giststate, entryvec->vector, v);
+ MaxGrpId = gistfindgroup(giststate, entryvec->vector, v, attno);
/* form union of sub keys for each page (l,p) */
- gistunionsubkey(giststate, itup, v);
+ gistunionsubkey(giststate, itup, v, attno + 1);
/*
* if possible, we insert equivalent tuples with control by penalty
* for a subkey(s)
*/
if (MaxGrpId > 1)
- gistadjsubkey(r, itup, len, v, giststate);
+ gistadjsubkey(r, itup, len, v, giststate, attno);
}
}
diff --git a/src/include/access/gist_private.h b/src/include/access/gist_private.h
index f08d49dbf90..43a6f62943a 100644
--- a/src/include/access/gist_private.h
+++ b/src/include/access/gist_private.h
@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.15 2006/05/19 16:15:17 teodor Exp $
+ * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.16 2006/05/24 11:01:39 teodor Exp $
*
*-------------------------------------------------------------------------
*/
@@ -206,17 +206,6 @@ typedef struct
/* root page of a gist index */
#define GIST_ROOT_BLKNO 0
-/*
- * When we update a relation on which we're doing a scan, we need to
- * check the scan and fix it if the update affected any of the pages
- * it touches. Otherwise, we can miss records that we should see.
- * The only times we need to do this are for deletions and splits. See
- * the code in gistscan.c for how the scan is fixed. These two
- * constants tell us what sort of operation changed the index.
- */
-#define GISTOP_DEL 0
-/* #define GISTOP_SPLIT 1 */
-
#define ATTSIZE(datum, tupdesc, i, isnull) \
( \
(isnull) ? 0 : \
@@ -291,12 +280,6 @@ extern IndexTuple gistgetadjusted(Relation r,
IndexTuple oldtup,
IndexTuple addtup,
GISTSTATE *giststate);
-extern int gistfindgroup(GISTSTATE *giststate,
- GISTENTRY *valvec, GIST_SPLITVEC *spl);
-extern void gistadjsubkey(Relation r,
- IndexTuple *itup, int len,
- GIST_SPLITVEC *v,
- GISTSTATE *giststate);
extern IndexTuple gistFormTuple(GISTSTATE *giststate,
Relation r, Datum *attdata, int *datumsize, bool *isnull);
@@ -321,13 +304,15 @@ typedef struct {
} GistSplitVec;
extern void gistunionsubkeyvec(GISTSTATE *giststate,
- IndexTuple *itvec, GistSplitVec *gsvp, bool isall);
+ IndexTuple *itvec, GistSplitVec *gsvp, int startkey);
+extern void gistunionsubkey(GISTSTATE *giststate, IndexTuple *itvec,
+ GIST_SPLITVEC *spl, int attno);
extern void GISTInitBuffer(Buffer b, uint32 f);
extern void gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e,
Datum k, Relation r, Page pg, OffsetNumber o,
int b, bool l, bool isNull);
-void gistUserPicksplit(Relation r, GistEntryVector *entryvec, GIST_SPLITVEC *v,
+void gistUserPicksplit(Relation r, GistEntryVector *entryvec, int attno, GIST_SPLITVEC *v,
IndexTuple *itup, int len, GISTSTATE *giststate);
/* gistvacuum.c */
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index 17ce73d015f..06f0a125bfd 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -37,7 +37,7 @@
* Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $PostgreSQL: pgsql/src/include/catalog/catversion.h,v 1.333 2006/05/19 19:08:26 alvherre Exp $
+ * $PostgreSQL: pgsql/src/include/catalog/catversion.h,v 1.334 2006/05/24 11:01:39 teodor Exp $
*
*-------------------------------------------------------------------------
*/
@@ -53,6 +53,6 @@
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 200605191
+#define CATALOG_VERSION_NO 200605241
#endif
diff --git a/src/include/catalog/pg_am.h b/src/include/catalog/pg_am.h
index 07a74c892d1..141b42e02d7 100644
--- a/src/include/catalog/pg_am.h
+++ b/src/include/catalog/pg_am.h
@@ -8,7 +8,7 @@
* Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $PostgreSQL: pgsql/src/include/catalog/pg_am.h,v 1.42 2006/05/02 22:25:10 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/catalog/pg_am.h,v 1.43 2006/05/24 11:01:39 teodor Exp $
*
* NOTES
* the genbki.sh script reads this file and generates .bki
@@ -114,7 +114,7 @@ DESCR("b-tree index access method");
DATA(insert OID = 405 ( hash 1 1 0 f f f f f t f hashinsert hashbeginscan hashgettuple hashgetmulti hashrescan hashendscan hashmarkpos hashrestrpos hashbuild hashbulkdelete hashvacuumcleanup hashcostestimate ));
DESCR("hash index access method");
#define HASH_AM_OID 405
-DATA(insert OID = 783 ( gist 100 7 0 f t f f t t t gistinsert gistbeginscan gistgettuple gistgetmulti gistrescan gistendscan gistmarkpos gistrestrpos gistbuild gistbulkdelete gistvacuumcleanup gistcostestimate ));
+DATA(insert OID = 783 ( gist 100 7 0 f t t t t t t gistinsert gistbeginscan gistgettuple gistgetmulti gistrescan gistendscan gistmarkpos gistrestrpos gistbuild gistbulkdelete gistvacuumcleanup gistcostestimate ));
DESCR("GiST index access method");
#define GIST_AM_OID 783
DATA(insert OID = 2742 ( gin 100 4 0 f f f f t t f gininsert ginbeginscan gingettuple gingetmulti ginrescan ginendscan ginmarkpos ginrestrpos ginbuild ginbulkdelete ginvacuumcleanup gincostestimate ));