diff options
author | Teodor Sigaev <teodor@sigaev.ru> | 2006-06-28 12:00:14 +0000 |
---|---|---|
committer | Teodor Sigaev <teodor@sigaev.ru> | 2006-06-28 12:00:14 +0000 |
commit | 1f7ef548ec2e594fa8766781c490fb5b998ea46b (patch) | |
tree | a552894bd93658d85c7136d00042c4b05e19a9a6 /src/backend/access/gist | |
parent | a1dc5c60bcd4c458e160bf0e355bed083a1cab57 (diff) | |
download | postgresql-1f7ef548ec2e594fa8766781c490fb5b998ea46b.tar.gz postgresql-1f7ef548ec2e594fa8766781c490fb5b998ea46b.zip |
Changes
* new split algorithm (as proposed in http://archives.postgresql.org/pgsql-hackers/2006-06/msg00254.php)
* possible call pickSplit() for second and below columns
* add spl_(l|r)datum_exists to GIST_SPLITVEC -
pickSplit should check its values to use already defined
spl_(l|r)datum for splitting. pickSplit should set
spl_(l|r)datum_exists to 'false' (if they was 'true') to
signal to caller about using spl_(l|r)datum.
* support for old pickSplit(): not very optimal
but correct split
* remove 'bytes' field from GISTENTRY: in any case size of
value is defined by it's type.
* split GIST_SPLITVEC to two structures: one for using in picksplit
and second - for internal use.
* some code refactoring
* support of subsplit to rtree opclasses
TODO: add support of subsplit to contrib modules
Diffstat (limited to 'src/backend/access/gist')
-rw-r--r-- | src/backend/access/gist/Makefile | 4 | ||||
-rw-r--r-- | src/backend/access/gist/gist.c | 182 | ||||
-rw-r--r-- | src/backend/access/gist/gistget.c | 3 | ||||
-rw-r--r-- | src/backend/access/gist/gistproc.c | 139 | ||||
-rw-r--r-- | src/backend/access/gist/gistutil.c | 421 |
5 files changed, 185 insertions, 564 deletions
diff --git a/src/backend/access/gist/Makefile b/src/backend/access/gist/Makefile index 10b091e1706..174185c80b1 100644 --- a/src/backend/access/gist/Makefile +++ b/src/backend/access/gist/Makefile @@ -4,7 +4,7 @@ # Makefile for access/gist # # IDENTIFICATION -# $PostgreSQL: pgsql/src/backend/access/gist/Makefile,v 1.15 2005/07/01 19:19:02 tgl Exp $ +# $PostgreSQL: pgsql/src/backend/access/gist/Makefile,v 1.16 2006/06/28 12:00:13 teodor Exp $ # #------------------------------------------------------------------------- @@ -13,7 +13,7 @@ top_builddir = ../../../.. include $(top_builddir)/src/Makefile.global OBJS = gist.o gistutil.o gistxlog.o gistvacuum.o gistget.o gistscan.o \ - gistproc.o + gistproc.o gistsplit.o all: SUBSYS.o diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c index 326e87e7889..39ff702c3d1 100644 --- a/src/backend/access/gist/gist.c +++ b/src/backend/access/gist/gist.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.138 2006/05/29 12:50:06 teodor Exp $ + * $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.139 2006/06/28 12:00:14 teodor Exp $ * *------------------------------------------------------------------------- */ @@ -187,7 +187,7 @@ gistbuildCallback(Relation index, /* form an index tuple and point it at the heap tuple */ itup = gistFormTuple(&buildstate->giststate, index, - values, NULL /* size is currently bogus */, isnull); + values, isnull, true /* size is currently bogus */); itup->t_tid = htup->t_self; /* @@ -233,7 +233,7 @@ gistinsert(PG_FUNCTION_ARGS) initGISTstate(&giststate, r); itup = gistFormTuple(&giststate, r, - values, NULL /* size is currently bogus */, isnull); + values, isnull, true /* size is currently bogus */); itup->t_tid = *ht_ctid; gistdoinsert(r, itup, &giststate); @@ -900,150 +900,6 @@ gistmakedeal(GISTInsertState *state, GISTSTATE *giststate) } /* - * simple split page - */ -static void -gistSplitHalf(GIST_SPLITVEC *v, int len) { - int i; - - v->spl_nright = v->spl_nleft = 0; - v->spl_left = (OffsetNumber *) palloc(len * sizeof(OffsetNumber)); - v->spl_right= (OffsetNumber *) palloc(len * sizeof(OffsetNumber)); - for(i = 1; i <= len; i++) - if ( i<len/2 ) - v->spl_right[ v->spl_nright++ ] = i; - else - v->spl_left[ v->spl_nleft++ ] = i; -} - -/* - * if it was invalid tuple then we need special processing. - * We move all invalid tuples on right page. - * - * if there is no place on left page, gistSplit will be called one more - * time for left page. - * - * Normally, we never exec this code, but after crash replay it's possible - * to get 'invalid' tuples (probability is low enough) - */ -static void -gistSplitByInvalid(GISTSTATE *giststate, GIST_SPLITVEC *v, IndexTuple *itup, int len) { - int i; - static OffsetNumber offInvTuples[ MaxOffsetNumber ]; - int nOffInvTuples = 0; - - for (i = 1; i <= len; i++) - if ( GistTupleIsInvalid(itup[i - 1]) ) - offInvTuples[ nOffInvTuples++ ] = i; - - if ( nOffInvTuples == len ) { - /* corner case, all tuples are invalid */ - v->spl_rightvalid= v->spl_leftvalid = false; - gistSplitHalf( v, len ); - } else { - GistSplitVec gsvp; - - v->spl_right = offInvTuples; - v->spl_nright = nOffInvTuples; - v->spl_rightvalid = false; - - v->spl_left = (OffsetNumber *) palloc(len * sizeof(OffsetNumber)); - v->spl_nleft = 0; - for(i = 1; i <= len; i++) - if ( !GistTupleIsInvalid(itup[i - 1]) ) - v->spl_left[ v->spl_nleft++ ] = i; - v->spl_leftvalid = true; - - gsvp.idgrp = NULL; - gsvp.attrsize = v->spl_lattrsize; - gsvp.attr = v->spl_lattr; - gsvp.len = v->spl_nleft; - gsvp.entries = v->spl_left; - gsvp.isnull = v->spl_lisnull; - - gistunionsubkeyvec(giststate, itup, &gsvp, 0); - } -} - -/* - * trys to split page by attno key, in a case of null - * values move its to separate page. - */ -static void -gistSplitByKey(Relation r, Page page, IndexTuple *itup, int len, GISTSTATE *giststate, - GIST_SPLITVEC *v, GistEntryVector *entryvec, int attno) { - int i; - static OffsetNumber offNullTuples[ MaxOffsetNumber ]; - int nOffNullTuples = 0; - - - for (i = 1; i <= len; i++) { - Datum datum; - bool IsNull; - - if (!GistPageIsLeaf(page) && GistTupleIsInvalid(itup[i - 1])) { - gistSplitByInvalid(giststate, v, itup, len); - return; - } - - datum = index_getattr(itup[i - 1], attno+1, giststate->tupdesc, &IsNull); - gistdentryinit(giststate, attno, &(entryvec->vector[i]), - datum, r, page, i, - ATTSIZE(datum, giststate->tupdesc, attno+1, IsNull), - FALSE, IsNull); - if ( IsNull ) - offNullTuples[ nOffNullTuples++ ] = i; - } - - v->spl_leftvalid = v->spl_rightvalid = true; - - if ( nOffNullTuples == len ) { - /* - * Corner case: All keys in attno column are null, we should try to - * by keys in next column. It all keys in all columns - * are NULL just split page half by half - */ - v->spl_risnull[attno] = v->spl_lisnull[attno] = TRUE; - if ( attno+1 == r->rd_att->natts ) - gistSplitHalf( v, len ); - else - gistSplitByKey(r, page, itup, len, giststate, v, entryvec, attno+1); - } else if ( nOffNullTuples > 0 ) { - int j=0; - - /* - * We don't want to mix NULLs and not-NULLs keys - * on one page, so move nulls to right page - */ - v->spl_right = offNullTuples; - v->spl_nright = nOffNullTuples; - v->spl_risnull[attno] = TRUE; - - v->spl_left = (OffsetNumber *) palloc(len * sizeof(OffsetNumber)); - v->spl_nleft = 0; - for(i = 1; i <= len; i++) - if ( j<v->spl_nright && offNullTuples[j] == i ) - j++; - else - v->spl_left[ v->spl_nleft++ ] = i; - - v->spl_idgrp = NULL; - gistunionsubkey(giststate, itup, v, 0); - } else { - /* - * all keys are not-null - */ - if ( gistUserPicksplit(r, entryvec, attno, v, itup, len, giststate) && attno+1 != r->rd_att->natts ) - /* - * Splitting on attno column is not optimized: unions of left and right - * page are the same, we will try to split page by - * following columns - */ - gistSplitByKey(r, page, itup, len, giststate, v, entryvec, attno+1); - } -} - -/* * gistSplit -- split a page in the tree and fill struct * used for XLOG and real writes buffers. Function is recursive, ie * it will split page until keys will fit in every page. @@ -1057,7 +913,7 @@ gistSplit(Relation r, { IndexTuple *lvectup, *rvectup; - GIST_SPLITVEC v; + GistSplitVector v; GistEntryVector *entryvec; int i; SplitedPageLayout *res = NULL; @@ -1066,6 +922,8 @@ gistSplit(Relation r, entryvec = palloc(GEVHDRSZ + (len + 1) * sizeof(GISTENTRY)); entryvec->n = len + 1; + memset( v.spl_lisnull, TRUE, sizeof(bool) * giststate->tupdesc->natts ); + memset( v.spl_risnull, TRUE, sizeof(bool) * giststate->tupdesc->natts ); gistSplitByKey(r, page, itup, len, giststate, &v, entryvec, 0); @@ -1073,31 +931,31 @@ gistSplit(Relation r, lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1)); rvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1)); - for (i = 0; i < v.spl_nleft; i++) - lvectup[i] = itup[v.spl_left[i] - 1]; + for (i = 0; i < v.splitVector.spl_nleft; i++) + lvectup[i] = itup[v.splitVector.spl_left[i] - 1]; - for (i = 0; i < v.spl_nright; i++) - rvectup[i] = itup[v.spl_right[i] - 1]; + for (i = 0; i < v.splitVector.spl_nright; i++) + rvectup[i] = itup[v.splitVector.spl_right[i] - 1]; /* finalyze splitting (may need another split) */ - if (!gistfitpage(rvectup, v.spl_nright)) + if (!gistfitpage(rvectup, v.splitVector.spl_nright)) { - res = gistSplit(r, page, rvectup, v.spl_nright, giststate); + res = gistSplit(r, page, rvectup, v.splitVector.spl_nright, giststate); } else { ROTATEDIST(res); - res->block.num = v.spl_nright; - res->list = gistfillitupvec(rvectup, v.spl_nright, &( res->lenlist ) ); - res->itup = (v.spl_rightvalid) ? gistFormTuple(giststate, r, v.spl_rattr, v.spl_rattrsize, v.spl_risnull) + res->block.num = v.splitVector.spl_nright; + res->list = gistfillitupvec(rvectup, v.splitVector.spl_nright, &( res->lenlist ) ); + res->itup = (v.spl_rightvalid) ? gistFormTuple(giststate, r, v.spl_rattr, v.spl_risnull, false) : gist_form_invalid_tuple(GIST_ROOT_BLKNO); } - if (!gistfitpage(lvectup, v.spl_nleft)) + if (!gistfitpage(lvectup, v.splitVector.spl_nleft)) { SplitedPageLayout *resptr, *subres; - resptr = subres = gistSplit(r, page, lvectup, v.spl_nleft, giststate); + resptr = subres = gistSplit(r, page, lvectup, v.splitVector.spl_nleft, giststate); /* install on list's tail */ while( resptr->next ) @@ -1109,9 +967,9 @@ gistSplit(Relation r, else { ROTATEDIST(res); - res->block.num = v.spl_nleft; - res->list = gistfillitupvec(lvectup, v.spl_nleft, &( res->lenlist ) ); - res->itup = (v.spl_leftvalid) ? gistFormTuple(giststate, r, v.spl_lattr, v.spl_lattrsize, v.spl_lisnull) + res->block.num = v.splitVector.spl_nleft; + res->list = gistfillitupvec(lvectup, v.splitVector.spl_nleft, &( res->lenlist ) ); + res->itup = (v.spl_leftvalid) ? gistFormTuple(giststate, r, v.spl_lattr, v.spl_lisnull, false) : gist_form_invalid_tuple(GIST_ROOT_BLKNO); } diff --git a/src/backend/access/gist/gistget.c b/src/backend/access/gist/gistget.c index dcb8f2da54b..c69f2887b89 100644 --- a/src/backend/access/gist/gistget.c +++ b/src/backend/access/gist/gistget.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gist/gistget.c,v 1.57 2006/05/24 11:01:39 teodor Exp $ + * $PostgreSQL: pgsql/src/backend/access/gist/gistget.c,v 1.58 2006/06/28 12:00:14 teodor Exp $ * *------------------------------------------------------------------------- */ @@ -391,7 +391,6 @@ gistindex_keytest(IndexTuple tuple, gistdentryinit(giststate, key->sk_attno - 1, &de, datum, r, p, offset, - IndexTupleSize(tuple) - sizeof(IndexTupleData), FALSE, isNull); /* diff --git a/src/backend/access/gist/gistproc.c b/src/backend/access/gist/gistproc.c index 8589ed0a8ff..0e3bd53d7db 100644 --- a/src/backend/access/gist/gistproc.c +++ b/src/backend/access/gist/gistproc.c @@ -10,7 +10,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gist/gistproc.c,v 1.5 2006/03/05 15:58:20 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/access/gist/gistproc.c,v 1.6 2006/06/28 12:00:14 teodor Exp $ * *------------------------------------------------------------------------- */ @@ -112,6 +112,17 @@ gist_box_consistent(PG_FUNCTION_ARGS) strategy)); } +static void +adjustBox( BOX *b, BOX *addon ) { + if (b->high.x < addon->high.x) + b->high.x = addon->high.x; + if (b->low.x > addon->low.x) + b->low.x = addon->low.x; + if (b->high.y < addon->high.y) + b->high.y = addon->high.y; + if (b->low.y > addon->low.y) + b->low.y = addon->low.y; +} /* * The GiST Union method for boxes @@ -136,14 +147,7 @@ gist_box_union(PG_FUNCTION_ARGS) for (i = 1; i < numranges; i++) { cur = DatumGetBoxP(entryvec->vector[i].key); - if (pageunion->high.x < cur->high.x) - pageunion->high.x = cur->high.x; - if (pageunion->low.x > cur->low.x) - pageunion->low.x = cur->low.x; - if (pageunion->high.y < cur->high.y) - pageunion->high.y = cur->high.y; - if (pageunion->low.y > cur->low.y) - pageunion->low.y = cur->low.y; + adjustBox( pageunion, cur ); } *sizep = sizeof(BOX); @@ -206,6 +210,74 @@ compare_KB(const void *a, const void *b) return (sa > sb) ? 1 : -1; } +static void +chooseLR( GIST_SPLITVEC *v, + OffsetNumber *list1, int nlist1, BOX *union1, + OffsetNumber *list2, int nlist2, BOX *union2 ) +{ + bool firstToLeft = true; + + if ( v->spl_ldatum_exists || v->spl_rdatum_exists ) { + if ( v->spl_ldatum_exists && v->spl_rdatum_exists ) { + BOX LRl = *union1, LRr = *union2; + BOX RLl = *union2, RLr = *union1; + double sizeLR, sizeRL; + + adjustBox( &LRl, DatumGetBoxP( v->spl_ldatum ) ); + adjustBox( &LRr, DatumGetBoxP( v->spl_rdatum ) ); + adjustBox( &RLl, DatumGetBoxP( v->spl_ldatum ) ); + adjustBox( &RLr, DatumGetBoxP( v->spl_rdatum ) ); + + sizeLR = size_box( DirectFunctionCall2(rt_box_inter, BoxPGetDatum(&LRl), BoxPGetDatum(&LRr)) ); + sizeRL = size_box( DirectFunctionCall2(rt_box_inter, BoxPGetDatum(&RLl), BoxPGetDatum(&RLr)) ); + + if ( sizeLR > sizeRL ) + firstToLeft = false; + + } else { + float p1, p2; + GISTENTRY oldUnion, addon; + + gistentryinit(oldUnion, ( v->spl_ldatum_exists ) ? v->spl_ldatum : v->spl_rdatum, + NULL, NULL, InvalidOffsetNumber, FALSE); + + gistentryinit(addon, BoxPGetDatum(union1), NULL, NULL, InvalidOffsetNumber, FALSE); + DirectFunctionCall3(gist_box_penalty, PointerGetDatum(&oldUnion), PointerGetDatum(&union1), PointerGetDatum(&p1)); + gistentryinit(addon, BoxPGetDatum(union2), NULL, NULL, InvalidOffsetNumber, FALSE); + DirectFunctionCall3(gist_box_penalty, PointerGetDatum(&oldUnion), PointerGetDatum(&union2), PointerGetDatum(&p2)); + + if ( (v->spl_ldatum_exists && p1 > p2) || (v->spl_rdatum_exists && p1 < p2) ) + firstToLeft = false; + } + } + + if ( firstToLeft ) { + v->spl_left = list1; + v->spl_right = list2; + v->spl_nleft = nlist1; + v->spl_nright = nlist2; + if ( v->spl_ldatum_exists ) + adjustBox(union1, DatumGetBoxP( v->spl_ldatum ) ); + v->spl_ldatum = BoxPGetDatum(union1); + if ( v->spl_rdatum_exists ) + adjustBox(union2, DatumGetBoxP( v->spl_rdatum ) ); + v->spl_rdatum = BoxPGetDatum(union2); + } else { + v->spl_left = list2; + v->spl_right = list1; + v->spl_nleft = nlist2; + v->spl_nright = nlist1; + if ( v->spl_ldatum_exists ) + adjustBox(union2, DatumGetBoxP( v->spl_ldatum ) ); + v->spl_ldatum = BoxPGetDatum(union2); + if ( v->spl_rdatum_exists ) + adjustBox(union1, DatumGetBoxP( v->spl_rdatum ) ); + v->spl_rdatum = BoxPGetDatum(union1); + } + + v->spl_ldatum_exists = v->spl_rdatum_exists = false; +} + /* * The GiST PickSplit method * @@ -255,14 +327,7 @@ gist_box_picksplit(PG_FUNCTION_ARGS) )) allisequal = false; - if (pageunion.high.x < cur->high.x) - pageunion.high.x = cur->high.x; - if (pageunion.low.x > cur->low.x) - pageunion.low.x = cur->low.x; - if (pageunion.high.y < cur->high.y) - pageunion.high.y = cur->high.y; - if (pageunion.low.y > cur->low.y) - pageunion.low.y = cur->low.y; + adjustBox( &pageunion, cur ); } nbytes = (maxoff + 2) * sizeof(OffsetNumber); @@ -294,9 +359,17 @@ gist_box_picksplit(PG_FUNCTION_ARGS) v->spl_nright++; } } + + if ( v->spl_ldatum_exists ) + adjustBox( unionL, DatumGetBoxP( v->spl_ldatum ) ); v->spl_ldatum = BoxPGetDatum(unionL); + + if ( v->spl_rdatum_exists ) + adjustBox( unionR, DatumGetBoxP( v->spl_rdatum ) ); v->spl_rdatum = BoxPGetDatum(unionR); + v->spl_ldatum_exists = v->spl_rdatum_exists = false; + PG_RETURN_POINTER(v); } } @@ -399,23 +472,13 @@ gist_box_picksplit(PG_FUNCTION_ARGS) } if (direction == 'x') - { - v->spl_left = listL; - v->spl_right = listR; - v->spl_nleft = posL; - v->spl_nright = posR; - v->spl_ldatum = BoxPGetDatum(unionL); - v->spl_rdatum = BoxPGetDatum(unionR); - } - else - { - v->spl_left = listB; - v->spl_right = listT; - v->spl_nleft = posB; - v->spl_nright = posT; - v->spl_ldatum = BoxPGetDatum(unionB); - v->spl_rdatum = BoxPGetDatum(unionT); - } + chooseLR( v, + listL, posL, unionL, + listR, posR, unionR ); + else + chooseLR( v, + listB, posB, unionB, + listT, posT, unionT ); PG_RETURN_POINTER(v); } @@ -629,14 +692,14 @@ gist_poly_compress(PG_FUNCTION_ARGS) memcpy((void *) r, (void *) &(in->boundbox), sizeof(BOX)); gistentryinit(*retval, PointerGetDatum(r), entry->rel, entry->page, - entry->offset, sizeof(BOX), FALSE); + entry->offset, FALSE); } else { gistentryinit(*retval, (Datum) 0, entry->rel, entry->page, - entry->offset, 0, FALSE); + entry->offset, FALSE); } } else @@ -700,14 +763,14 @@ gist_circle_compress(PG_FUNCTION_ARGS) r->low.y = in->center.y - in->radius; gistentryinit(*retval, PointerGetDatum(r), entry->rel, entry->page, - entry->offset, sizeof(BOX), FALSE); + entry->offset, FALSE); } else { gistentryinit(*retval, (Datum) 0, entry->rel, entry->page, - entry->offset, 0, FALSE); + entry->offset, FALSE); } } else diff --git a/src/backend/access/gist/gistutil.c b/src/backend/access/gist/gistutil.c index 987aed89f71..3be4fd31f58 100644 --- a/src/backend/access/gist/gistutil.c +++ b/src/backend/access/gist/gistutil.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.15 2006/05/29 12:50:06 teodor Exp $ + * $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.16 2006/06/28 12:00:14 teodor Exp $ *------------------------------------------------------------------------- */ #include "postgres.h" @@ -21,23 +21,12 @@ #include "miscadmin.h" #include "storage/freespace.h" -/* group flags ( in gistadjsubkey ) */ -#define LEFT_ADDED 0x01 -#define RIGHT_ADDED 0x02 -#define BOTH_ADDED ( LEFT_ADDED | RIGHT_ADDED ) - -static float gistpenalty(GISTSTATE *giststate, int attno, - GISTENTRY *key1, bool isNull1, - GISTENTRY *key2, bool isNull2); - - /* * static *S used for temrorary storage (saves stack and palloc() call) */ -static int attrsizeS[INDEX_MAX_KEYS]; -static Datum attrS[INDEX_MAX_KEYS]; -static bool isnullS[INDEX_MAX_KEYS]; +static Datum attrS[INDEX_MAX_KEYS]; +static bool isnullS[INDEX_MAX_KEYS]; /* * Write itup vector to page, has no control of free space @@ -156,24 +145,31 @@ gistfillitupvec(IndexTuple *vec, int veclen, int *memlen) { * invalid tuple. Resulting Datums aren't compressed. */ -static bool +bool gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startkey, - Datum *attr, bool *isnull, int *attrsize ) { + Datum *attr, bool *isnull ) { int i; GistEntryVector *evec; + int attrsize; - evec = (GistEntryVector *) palloc(((len == 1) ? 2 : len) * sizeof(GISTENTRY) + GEVHDRSZ); + evec = (GistEntryVector *) palloc( ( len + 2 ) * sizeof(GISTENTRY) + GEVHDRSZ); for (i = startkey; i < giststate->tupdesc->natts; i++) { int j; evec->n = 0; + if ( !isnull[i] ) { + gistentryinit( evec->vector[evec->n], attr[i], + NULL, NULL, (OffsetNumber) 0, + FALSE); + evec->n++; + } for (j = 0; j < len; j++) { Datum datum; bool IsNull; - if (GistTupleIsInvalid(itvec[j])) + if (GistTupleIsInvalid(itvec[j])) return FALSE; /* signals that union with invalid tuple => result is invalid */ datum = index_getattr(itvec[j], i + 1, giststate->tupdesc, &IsNull); @@ -184,7 +180,6 @@ gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startke evec->vector + evec->n, datum, NULL, NULL, (OffsetNumber) 0, - ATTSIZE(datum, giststate->tupdesc, i + 1, IsNull), FALSE, IsNull); evec->n++; } @@ -192,7 +187,6 @@ gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startke /* If this tuple vector was all NULLs, the union is NULL */ if ( evec->n == 0 ) { attr[i] = (Datum) 0; - attrsize[i] = (Datum) 0; isnull[i] = TRUE; } else { if (evec->n == 1) { @@ -203,7 +197,7 @@ gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startke /* Make union and store in attr array */ attr[i] = FunctionCall2(&giststate->unionFn[i], PointerGetDatum(evec), - PointerGetDatum(attrsize + i)); + PointerGetDatum(&attrsize)); isnull[i] = FALSE; } @@ -219,20 +213,24 @@ gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startke IndexTuple gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate) { - if ( !gistMakeUnionItVec(giststate, itvec, len, 0, attrS, isnullS, attrsizeS) ) + memset(isnullS, TRUE, sizeof(bool) * giststate->tupdesc->natts); + + if ( !gistMakeUnionItVec(giststate, itvec, len, 0, attrS, isnullS ) ) return gist_form_invalid_tuple(InvalidBlockNumber); - return gistFormTuple(giststate, r, attrS, attrsizeS, isnullS); + return gistFormTuple(giststate, r, attrS, isnullS, false); } /* * makes union of two key */ -static void +void gistMakeUnionKey( GISTSTATE *giststate, int attno, GISTENTRY *entry1, bool isnull1, GISTENTRY *entry2, bool isnull2, - Datum *dst, int *dstsize, bool *dstisnull ) { + Datum *dst, bool *dstisnull ) { + + int dstsize; static char storage[ 2 * sizeof(GISTENTRY) + GEVHDRSZ ]; GistEntryVector *evec = (GistEntryVector*)storage; @@ -242,7 +240,6 @@ gistMakeUnionKey( GISTSTATE *giststate, int attno, if ( isnull1 && isnull2 ) { *dstisnull = TRUE; *dst = (Datum)0; - *dstsize = 0; } else { if ( isnull1 == FALSE && isnull2 == FALSE ) { evec->vector[0] = *entry1; @@ -258,11 +255,11 @@ gistMakeUnionKey( GISTSTATE *giststate, int attno, *dstisnull = FALSE; *dst = FunctionCall2(&giststate->unionFn[attno], PointerGetDatum(evec), - PointerGetDatum(dstsize)); + PointerGetDatum(&dstsize)); } } -static bool +bool gistKeyIsEQ(GISTSTATE *giststate, int attno, Datum a, Datum b) { bool result; @@ -273,6 +270,25 @@ gistKeyIsEQ(GISTSTATE *giststate, int attno, Datum a, Datum b) { } /* + * Decompress all keys in tuple + */ +void +gistDeCompressAtt(GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p, + OffsetNumber o, GISTENTRY *attdata, bool *isnull) +{ + int i; + + for (i = 0; i < r->rd_att->natts; i++) + { + Datum datum = index_getattr(tuple, i + 1, giststate->tupdesc, &isnull[i]); + + gistdentryinit(giststate, i, &attdata[i], + datum, r, p, o, + FALSE, isnull[i]); + } +} + +/* * Forms union of oldtup and addtup, if union == oldtup then return NULL */ IndexTuple @@ -299,7 +315,7 @@ gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *gis gistMakeUnionKey( giststate, i, oldentries + i, oldisnull[i], addentries + i, addisnull[i], - attrS + i, attrsizeS + i, isnullS + i ); + attrS + i, isnullS + i ); if ( neednew ) /* we already need new key, so we can skip check */ @@ -318,7 +334,7 @@ gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *gis if (neednew) { /* need to update key */ - newtup = gistFormTuple(giststate, r, attrS, attrsizeS, isnullS); + newtup = gistFormTuple(giststate, r, attrS, isnullS, false); newtup->t_tid = oldtup->t_tid; } @@ -326,237 +342,6 @@ gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *gis } /* - * Forms unions of subkeys after page split, but - * uses only tuples aren't in groups of equalent tuples - */ -void -gistunionsubkeyvec(GISTSTATE *giststate, IndexTuple *itvec, - GistSplitVec *gsvp, int startkey) { - IndexTuple *cleanedItVec; - int i, cleanedLen=0; - - cleanedItVec = (IndexTuple*)palloc(sizeof(IndexTuple) * gsvp->len); - - for(i=0;i<gsvp->len;i++) { - if ( gsvp->idgrp && gsvp->idgrp[gsvp->entries[i]]) - continue; - - cleanedItVec[cleanedLen++] = itvec[gsvp->entries[i] - 1]; - } - - gistMakeUnionItVec(giststate, cleanedItVec, cleanedLen, startkey, - gsvp->attr, gsvp->isnull, gsvp->attrsize); - - pfree( cleanedItVec ); -} - -/* - * unions subkeys for after user picksplit over attno-1 column - */ -void -gistunionsubkey(GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITVEC *spl, int attno) -{ - GistSplitVec gsvp; - - gsvp.idgrp = spl->spl_idgrp; - - gsvp.attrsize = spl->spl_lattrsize; - gsvp.attr = spl->spl_lattr; - gsvp.len = spl->spl_nleft; - gsvp.entries = spl->spl_left; - gsvp.isnull = spl->spl_lisnull; - - gistunionsubkeyvec(giststate, itvec, &gsvp, attno); - - gsvp.attrsize = spl->spl_rattrsize; - gsvp.attr = spl->spl_rattr; - gsvp.len = spl->spl_nright; - gsvp.entries = spl->spl_right; - gsvp.isnull = spl->spl_risnull; - - gistunionsubkeyvec(giststate, itvec, &gsvp, attno); -} - -/* - * find group in vector with equal value - */ -static int -gistfindgroup(GISTSTATE *giststate, GISTENTRY *valvec, GIST_SPLITVEC *spl, int attno) -{ - int i; - int curid = 1; - - /* - * attno key is always not null (see gistSplitByKey), so we may not check for - * nulls - */ - for (i = 0; i < spl->spl_nleft; i++) - { - int j; - int len; - - if (spl->spl_idgrp[spl->spl_left[i]]) - continue; - len = 0; - /* find all equal value in right part */ - for (j = 0; j < spl->spl_nright; j++) - { - if (spl->spl_idgrp[spl->spl_right[j]]) - continue; - if (gistKeyIsEQ(giststate, attno, valvec[spl->spl_left[i]].key, valvec[spl->spl_right[j]].key)) - { - spl->spl_idgrp[spl->spl_right[j]] = curid; - len++; - } - } - /* find all other equal value in left part */ - if (len) - { - /* add current val to list of equal values */ - spl->spl_idgrp[spl->spl_left[i]] = curid; - /* searching .. */ - for (j = i + 1; j < spl->spl_nleft; j++) - { - if (spl->spl_idgrp[spl->spl_left[j]]) - continue; - if (gistKeyIsEQ(giststate, attno, valvec[spl->spl_left[i]].key, valvec[spl->spl_left[j]].key)) - { - spl->spl_idgrp[spl->spl_left[j]] = curid; - len++; - } - } - spl->spl_ngrp[curid] = len + 1; - curid++; - } - } - - return curid; -} - -/* - * Insert equivalent tuples to left or right page with minimum - * penalty - */ -static void -gistadjsubkey(Relation r, - IndexTuple *itup, /* contains compressed entry */ - int len, - GIST_SPLITVEC *v, - GISTSTATE *giststate, - int attno) -{ - int curlen; - OffsetNumber *curwpos; - GISTENTRY entry, - identry[INDEX_MAX_KEYS]; - float lpenalty = 0, - rpenalty = 0; - bool isnull[INDEX_MAX_KEYS]; - int i, - j; - - /* clear vectors */ - curlen = v->spl_nleft; - curwpos = v->spl_left; - for (i = 0; i < v->spl_nleft; i++) - { - if (v->spl_idgrp[v->spl_left[i]] == 0) - { - *curwpos = v->spl_left[i]; - curwpos++; - } - else - curlen--; - } - v->spl_nleft = curlen; - - curlen = v->spl_nright; - curwpos = v->spl_right; - for (i = 0; i < v->spl_nright; i++) - { - if (v->spl_idgrp[v->spl_right[i]] == 0) - { - *curwpos = v->spl_right[i]; - curwpos++; - } - else - curlen--; - } - v->spl_nright = curlen; - - /* add equivalent tuple */ - for (i = 0; i < len; i++) - { - if (v->spl_idgrp[i + 1] == 0) /* already inserted */ - continue; - gistDeCompressAtt(giststate, r, itup[i], NULL, (OffsetNumber) 0, - identry, isnull); - - v->spl_ngrp[v->spl_idgrp[i + 1]]--; - if (v->spl_ngrp[v->spl_idgrp[i + 1]] == 0 && - (v->spl_grpflag[v->spl_idgrp[i + 1]] & BOTH_ADDED) != BOTH_ADDED) - { - /* force last in group */ - rpenalty = 1.0; - lpenalty = (v->spl_grpflag[v->spl_idgrp[i + 1]] & LEFT_ADDED) ? 2.0 : 0.0; - } - else - { - /* where? */ - for (j = attno+1; j < r->rd_att->natts; j++) - { - gistentryinit(entry, v->spl_lattr[j], r, NULL, - (OffsetNumber) 0, v->spl_lattrsize[j], FALSE); - lpenalty = gistpenalty(giststate, j, &entry, v->spl_lisnull[j], - &identry[j], isnull[j]); - - gistentryinit(entry, v->spl_rattr[j], r, NULL, - (OffsetNumber) 0, v->spl_rattrsize[j], FALSE); - rpenalty = gistpenalty(giststate, j, &entry, v->spl_risnull[j], - &identry[j], isnull[j]); - - if (lpenalty != rpenalty) - break; - } - } - - /* - * add XXX: refactor this to avoid duplicating code - */ - if (lpenalty < rpenalty) - { - v->spl_grpflag[v->spl_idgrp[i + 1]] |= LEFT_ADDED; - v->spl_left[v->spl_nleft++] = i + 1; - - for (j = attno+1; j < r->rd_att->natts; j++) - { - gistentryinit(entry, v->spl_lattr[j], r, NULL, - (OffsetNumber) 0, v->spl_lattrsize[j], FALSE); - gistMakeUnionKey( giststate, j, - &entry, v->spl_lisnull[j], - identry + j, isnull[j], - v->spl_lattr + j, v->spl_lattrsize + j, v->spl_lisnull + j ); - } - } - else - { - v->spl_grpflag[v->spl_idgrp[i + 1]] |= RIGHT_ADDED; - v->spl_right[v->spl_nright++] = i + 1; - - for (j = attno+1; j < r->rd_att->natts; j++) - { - gistentryinit(entry, v->spl_rattr[j], r, NULL, - (OffsetNumber) 0, v->spl_rattrsize[j], FALSE); - gistMakeUnionKey( giststate, j, - &entry, v->spl_risnull[j], - identry + j, isnull[j], - v->spl_rattr + j, v->spl_rattrsize + j, v->spl_risnull + j ); - } - } - } -} - -/* * find entry with lowest penalty */ OffsetNumber @@ -580,6 +365,9 @@ gistchoose(Relation r, Page p, IndexTuple it, /* it has compressed entry */ it, NULL, (OffsetNumber) 0, identry, isnull); + Assert( maxoff >= FirstOffsetNumber ); + Assert( !GistPageIsLeaf(p) ); + for (i = FirstOffsetNumber; i <= maxoff && sum_grow; i = OffsetNumberNext(i)) { int j; @@ -602,7 +390,6 @@ gistchoose(Relation r, Page p, IndexTuple it, /* it has compressed entry */ datum = index_getattr(itup, j + 1, giststate->tupdesc, &IsNull); gistdentryinit(giststate, j, &entry, datum, r, p, i, - ATTSIZE(datum, giststate->tupdesc, j + 1, IsNull), FALSE, IsNull); usize = gistpenalty(giststate, j, &entry, IsNull, &identry[j], isnull[j]); @@ -637,23 +424,23 @@ gistchoose(Relation r, Page p, IndexTuple it, /* it has compressed entry */ void gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e, Datum k, Relation r, Page pg, OffsetNumber o, - int b, bool l, bool isNull) + bool l, bool isNull) { - if (b && !isNull) + if (!isNull) { GISTENTRY *dep; - gistentryinit(*e, k, r, pg, o, b, l); + gistentryinit(*e, k, r, pg, o, l); dep = (GISTENTRY *) DatumGetPointer(FunctionCall1(&giststate->decompressFn[nkey], PointerGetDatum(e))); /* decompressFn may just return the given pointer */ if (dep != e) gistentryinit(*e, dep->key, dep->rel, dep->page, dep->offset, - dep->bytes, dep->leafkey); + dep->leafkey); } else - gistentryinit(*e, (Datum) 0, r, pg, o, 0, l); + gistentryinit(*e, (Datum) 0, r, pg, o, l); } @@ -663,28 +450,28 @@ gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e, void gistcentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e, Datum k, Relation r, - Page pg, OffsetNumber o, int b, bool l, bool isNull) + Page pg, OffsetNumber o, bool l, bool isNull) { if (!isNull) { GISTENTRY *cep; - gistentryinit(*e, k, r, pg, o, b, l); + gistentryinit(*e, k, r, pg, o, l); cep = (GISTENTRY *) DatumGetPointer(FunctionCall1(&giststate->compressFn[nkey], PointerGetDatum(e))); /* compressFn may just return the given pointer */ if (cep != e) gistentryinit(*e, cep->key, cep->rel, cep->page, cep->offset, - cep->bytes, cep->leafkey); + cep->leafkey); } else - gistentryinit(*e, (Datum) 0, r, pg, o, 0, l); + gistentryinit(*e, (Datum) 0, r, pg, o, l); } IndexTuple gistFormTuple(GISTSTATE *giststate, Relation r, - Datum attdata[], int datumsize[], bool isnull[]) + Datum attdata[], bool isnull[], bool newValues) { GISTENTRY centry[INDEX_MAX_KEYS]; Datum compatt[INDEX_MAX_KEYS]; @@ -699,8 +486,7 @@ gistFormTuple(GISTSTATE *giststate, Relation r, { gistcentryinit(giststate, i, ¢ry[i], attdata[i], r, NULL, (OffsetNumber) 0, - (datumsize) ? datumsize[i] : -1, - (datumsize) ? FALSE : TRUE, + newValues, FALSE); compatt[i] = centry[i].key; } @@ -711,24 +497,7 @@ gistFormTuple(GISTSTATE *giststate, Relation r, return res; } -void -gistDeCompressAtt(GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p, - OffsetNumber o, GISTENTRY *attdata, bool *isnull) -{ - int i; - - for (i = 0; i < r->rd_att->natts; i++) - { - Datum datum = index_getattr(tuple, i + 1, giststate->tupdesc, &isnull[i]); - - gistdentryinit(giststate, i, &attdata[i], - datum, r, p, o, - ATTSIZE(datum, giststate->tupdesc, i + 1, isnull[i]), - FALSE, isnull[i]); - } -} - -static float +float gistpenalty(GISTSTATE *giststate, int attno, GISTENTRY *orig, bool isNullOrig, GISTENTRY *add, bool isNullAdd) @@ -749,74 +518,6 @@ gistpenalty(GISTSTATE *giststate, int attno, } /* - * Calls user picksplit method for attno columns to split vector to - * two vectors. May use attno+n columns data to - * get better split. - * Returns TRUE if left and right unions of attno columns are the same, - * so caller may find better split - */ -bool -gistUserPicksplit(Relation r, GistEntryVector *entryvec, int attno, GIST_SPLITVEC *v, - IndexTuple *itup, int len, GISTSTATE *giststate) -{ - /* - * now let the user-defined picksplit function set up the split vector; in - * entryvec have no null value!! - */ - FunctionCall2(&giststate->picksplitFn[attno], - PointerGetDatum(entryvec), - PointerGetDatum(v)); - - /* compatibility with old code */ - if (v->spl_left[v->spl_nleft - 1] == InvalidOffsetNumber) - v->spl_left[v->spl_nleft - 1] = (OffsetNumber) (entryvec->n - 1); - if (v->spl_right[v->spl_nright - 1] == InvalidOffsetNumber) - v->spl_right[v->spl_nright - 1] = (OffsetNumber) (entryvec->n - 1); - - v->spl_lattr[attno] = v->spl_ldatum; - v->spl_rattr[attno] = v->spl_rdatum; - v->spl_lisnull[attno] = false; - v->spl_risnull[attno] = false; - - /* - * if index is multikey, then we must to try get smaller bounding box for - * subkey(s) - */ - if (giststate->tupdesc->natts > 1 && attno+1 != giststate->tupdesc->natts) - { - if ( gistKeyIsEQ(giststate, attno, v->spl_ldatum, v->spl_rdatum) ) { - /* - * Left and right key's unions are equial, so - * we can get better split by following columns. Note, - * uninons for attno columns are already done. - */ - - return true; - } else { - int MaxGrpId; - - v->spl_idgrp = (int *) palloc0(sizeof(int) * entryvec->n); - v->spl_grpflag = (char *) palloc0(sizeof(char) * entryvec->n); - v->spl_ngrp = (int *) palloc(sizeof(int) * entryvec->n); - - MaxGrpId = gistfindgroup(giststate, entryvec->vector, v, attno); - - /* form union of sub keys for each page (l,p) */ - gistunionsubkey(giststate, itup, v, attno + 1); - - /* - * if possible, we insert equivalent tuples with control by penalty - * for a subkey(s) - */ - if (MaxGrpId > 1) - gistadjsubkey(r, itup, len, v, giststate, attno); - } - } - - return false; -} - -/* * Initialize a new index page */ void |