/*------------------------------------------------------------------------- * * gistsplit.c * Split page algorithm * * * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION * $PostgreSQL: pgsql/src/backend/access/gist/gistsplit.c,v 1.1 2006/06/28 12:08:35 teodor Exp $ * *------------------------------------------------------------------------- */ #include "postgres.h" #include "access/genam.h" #include "access/gist_private.h" #include "access/gistscan.h" #include "access/heapam.h" #include "catalog/index.h" #include "commands/vacuum.h" #include "miscadmin.h" #include "utils/memutils.h" typedef struct { Datum *attr; int len; OffsetNumber *entries; bool *isnull; bool *equiv; } GistSplitUnion; /* * Forms unions of subkeys after page split, but * uses only tuples aren't in groups of equalent tuples */ static void gistunionsubkeyvec(GISTSTATE *giststate, IndexTuple *itvec, GistSplitUnion *gsvp, int startkey) { IndexTuple *cleanedItVec; int i, cleanedLen=0; cleanedItVec = (IndexTuple*)palloc(sizeof(IndexTuple) * gsvp->len); for(i=0;ilen;i++) { if ( gsvp->equiv && gsvp->equiv[gsvp->entries[i]]) continue; cleanedItVec[cleanedLen++] = itvec[gsvp->entries[i] - 1]; } gistMakeUnionItVec(giststate, cleanedItVec, cleanedLen, startkey, gsvp->attr, gsvp->isnull); pfree( cleanedItVec ); } /* * unions subkeys for after user picksplit over attno-1 column */ static void gistunionsubkey(GISTSTATE *giststate, IndexTuple *itvec, GistSplitVector *spl, int attno) { GistSplitUnion gsvp; gsvp.equiv = spl->spl_equiv; gsvp.attr = spl->spl_lattr; gsvp.len = spl->splitVector.spl_nleft; gsvp.entries = spl->splitVector.spl_left; gsvp.isnull = spl->spl_lisnull; gistunionsubkeyvec(giststate, itvec, &gsvp, attno); gsvp.attr = spl->spl_rattr; gsvp.len = spl->splitVector.spl_nright; gsvp.entries = spl->splitVector.spl_right; gsvp.isnull = spl->spl_risnull; gistunionsubkeyvec(giststate, itvec, &gsvp, attno); } /* * find group in vector with equivalent value */ static int gistfindgroup(Relation r, GISTSTATE *giststate, GISTENTRY *valvec, GistSplitVector *spl, int attno) { int i; GISTENTRY entry; int len=0; /* * attno key is always not null (see gistSplitByKey), so we may not check for * nulls */ gistentryinit(entry, spl->splitVector.spl_rdatum, r, NULL, (OffsetNumber) 0, FALSE); for (i = 0; i < spl->splitVector.spl_nleft; i++) { float penalty = gistpenalty(giststate, attno, &entry, false, &valvec[spl->splitVector.spl_left[i]], false); if ( penalty == 0.0 ) { spl->spl_equiv[spl->splitVector.spl_left[i]] = true; len++; } } gistentryinit(entry, spl->splitVector.spl_ldatum, r, NULL, (OffsetNumber) 0, FALSE); for (i = 0; i < spl->splitVector.spl_nright; i++) { float penalty = gistpenalty(giststate, attno, &entry, false, &valvec[spl->splitVector.spl_right[i]], false); if ( penalty == 0.0 ) { spl->spl_equiv[spl->splitVector.spl_right[i]] = true; len++; } } return len; } static void cleanupOffsets( OffsetNumber *a, int *len, bool *equiv, int *LenEquiv ) { int curlen,i; OffsetNumber *curwpos; curlen = *len; curwpos = a; for (i = 0; i < *len; i++) { if ( equiv[ a[i] ] == FALSE ) { *curwpos = a[i]; curwpos++; } else { /* corner case: we shouldn't make void array */ if ( curlen==1 ) { equiv[ a[i] ] = FALSE; /* mark item as non-equivalent */ i--; /* redo the same */ *LenEquiv -= 1; continue; } else curlen--; } } *len = curlen; } static void placeOne( Relation r, GISTSTATE *giststate, GistSplitVector *v, IndexTuple itup, OffsetNumber off, int attno ) { GISTENTRY identry[INDEX_MAX_KEYS]; bool isnull[INDEX_MAX_KEYS]; bool toLeft = true; gistDeCompressAtt(giststate, r, itup, NULL, (OffsetNumber) 0, identry, isnull); for(;attnotupdesc->natts;attno++) { float lpenalty, rpenalty; GISTENTRY entry; gistentryinit(entry, v->spl_lattr[attno], r, NULL, 0, FALSE); lpenalty = gistpenalty(giststate, attno, &entry, v->spl_lisnull[attno], identry+attno, isnull[ attno ]); gistentryinit(entry, v->spl_rattr[attno], r, NULL, 0, FALSE); rpenalty = gistpenalty(giststate, attno, &entry, v->spl_risnull[attno], identry+attno, isnull[ attno ]); if ( lpenalty != rpenalty ) { if ( lpenalty > rpenalty ) toLeft = false; break; } } if ( toLeft ) v->splitVector.spl_left[ v->splitVector.spl_nleft++ ] = off; else v->splitVector.spl_right[ v->splitVector.spl_nright++ ] = off; } #define SWAPVAR( s, d, t ) \ do { \ (t) = (s); \ (s) = (d); \ (d) = (t); \ } while(0) /* * adjust left and right unions according to splits by previous * split by firsts columns. This function is called only in case * when pickSplit doesn't support subspplit. */ static void supportSecondarySplit( Relation r, GISTSTATE *giststate, int attno, GIST_SPLITVEC *sv, Datum oldL, Datum oldR ) { bool leaveOnLeft = true, tmpBool; GISTENTRY entryL, entryR, entrySL, entrySR; gistentryinit(entryL, oldL, r, NULL, 0, FALSE); gistentryinit(entryR, oldR, r, NULL, 0, FALSE); gistentryinit(entrySL, sv->spl_ldatum , r, NULL, 0, FALSE); gistentryinit(entrySR, sv->spl_rdatum , r, NULL, 0, FALSE); if ( sv->spl_ldatum_exists && sv->spl_rdatum_exists ) { float penalty1, penalty2; penalty1 = gistpenalty(giststate, attno, &entryL, false, &entrySL, false) + gistpenalty(giststate, attno, &entryR, false, &entrySR, false); penalty2 = gistpenalty(giststate, attno, &entryL, false, &entrySR, false) + gistpenalty(giststate, attno, &entryR, false, &entrySL, false); if ( penalty1 > penalty2 ) leaveOnLeft = false; } else { GISTENTRY *entry1 = (sv->spl_ldatum_exists) ? &entryL : &entryR; float penalty1, penalty2; /* * there is only one previously defined union, * so we just choose swap or not by lowest penalty */ penalty1 = gistpenalty(giststate, attno, entry1, false, &entrySL, false); penalty2 = gistpenalty(giststate, attno, entry1, false, &entrySR, false); if ( penalty1 < penalty2 ) leaveOnLeft = ( sv->spl_ldatum_exists ) ? true : false; else leaveOnLeft = ( sv->spl_rdatum_exists ) ? true : false; } if ( leaveOnLeft == false ) { /* * swap left and right */ OffsetNumber *off, noff; Datum datum; SWAPVAR( sv->spl_left, sv->spl_right, off ); SWAPVAR( sv->spl_nleft, sv->spl_nright, noff ); SWAPVAR( sv->spl_ldatum, sv->spl_rdatum, datum ); gistentryinit(entrySL, sv->spl_ldatum , r, NULL, 0, FALSE); gistentryinit(entrySR, sv->spl_rdatum , r, NULL, 0, FALSE); } if ( sv->spl_ldatum_exists ) gistMakeUnionKey(giststate, attno, &entryL, false, &entrySL, false, &sv->spl_ldatum, &tmpBool); if ( sv->spl_rdatum_exists ) gistMakeUnionKey(giststate, attno, &entryR, false, &entrySR, false, &sv->spl_rdatum, &tmpBool); sv->spl_ldatum_exists = sv->spl_rdatum_exists = false; } /* * Calls user picksplit method for attno columns to split vector to * two vectors. May use attno+n columns data to * get better split. * Returns TRUE and v->spl_equiv = NULL if left and right unions of attno columns are the same, * so caller may find better split * Returns TRUE and v->spl_equiv != NULL if there is tuples which may be freely moved */ static bool gistUserPicksplit(Relation r, GistEntryVector *entryvec, int attno, GistSplitVector *v, IndexTuple *itup, int len, GISTSTATE *giststate) { GIST_SPLITVEC *sv = &v->splitVector; /* * now let the user-defined picksplit function set up the split vector; in * entryvec have no null value!! */ sv->spl_ldatum_exists = ( v->spl_lisnull[ attno ] ) ? false : true; sv->spl_rdatum_exists = ( v->spl_risnull[ attno ] ) ? false : true; sv->spl_ldatum = v->spl_lattr[attno]; sv->spl_rdatum = v->spl_rattr[attno]; FunctionCall2(&giststate->picksplitFn[attno], PointerGetDatum(entryvec), PointerGetDatum(sv)); /* compatibility with old code */ if (sv->spl_left[sv->spl_nleft - 1] == InvalidOffsetNumber) sv->spl_left[sv->spl_nleft - 1] = (OffsetNumber) (entryvec->n - 1); if (sv->spl_right[sv->spl_nright - 1] == InvalidOffsetNumber) sv->spl_right[sv->spl_nright - 1] = (OffsetNumber) (entryvec->n - 1); if( sv->spl_ldatum_exists || sv->spl_rdatum_exists ) { elog(LOG,"PickSplit method of %d columns of index '%s' doesn't support secondary split", attno + 1, RelationGetRelationName(r) ); supportSecondarySplit( r, giststate, attno, sv, v->spl_lattr[attno], v->spl_rattr[attno] ); } v->spl_lattr[attno] = sv->spl_ldatum; v->spl_rattr[attno] = sv->spl_rdatum; v->spl_lisnull[attno] = false; v->spl_risnull[attno] = false; /* * if index is multikey, then we must to try get smaller bounding box for * subkey(s) */ v->spl_equiv = NULL; if (giststate->tupdesc->natts > 1 && attno+1 != giststate->tupdesc->natts) { if ( gistKeyIsEQ(giststate, attno, sv->spl_ldatum, sv->spl_rdatum) ) { /* * Left and right key's unions are equial, so * we can get better split by following columns. Note, * unions for attno columns are already done. */ return true; } else { int LenEquiv; v->spl_equiv = (bool *) palloc0(sizeof(bool) * (entryvec->n+1)); LenEquiv = gistfindgroup(r, giststate, entryvec->vector, v, attno); /* * if possible, we should distribute equivalent tuples */ if (LenEquiv == 0 ) { gistunionsubkey(giststate, itup, v, attno + 1); } else { cleanupOffsets( sv->spl_left, &sv->spl_nleft, v->spl_equiv, &LenEquiv ); cleanupOffsets( sv->spl_right, &sv->spl_nright, v->spl_equiv, &LenEquiv ); gistunionsubkey(giststate, itup, v, attno + 1); if (LenEquiv == 1 ) { /* * In case with one tuple we just choose left-right * by penalty. It's simplify user-defined pickSplit */ OffsetNumber toMove = InvalidOffsetNumber; for(toMove=FirstOffsetNumber;toMoven;toMove++) if ( v->spl_equiv[ toMove ] ) break; Assert( toMove < entryvec->n ); placeOne( r, giststate, v, itup[ toMove-1 ], toMove, attno+1 ); /* redo gistunionsubkey(): it will not degradate performance, * because it's very rarely */ v->spl_equiv = NULL; gistunionsubkey(giststate, itup, v, attno + 1); return false; } else if ( LenEquiv > 1 ) return true; } } } return false; } /* * simple split page */ static void gistSplitHalf(GIST_SPLITVEC *v, int len) { int i; v->spl_nright = v->spl_nleft = 0; v->spl_left = (OffsetNumber *) palloc(len * sizeof(OffsetNumber)); v->spl_right= (OffsetNumber *) palloc(len * sizeof(OffsetNumber)); for(i = 1; i <= len; i++) if ( ispl_right[ v->spl_nright++ ] = i; else v->spl_left[ v->spl_nleft++ ] = i; } /* * if it was invalid tuple then we need special processing. * We move all invalid tuples on right page. * * if there is no place on left page, gistSplit will be called one more * time for left page. * * Normally, we never exec this code, but after crash replay it's possible * to get 'invalid' tuples (probability is low enough) */ static void gistSplitByInvalid(GISTSTATE *giststate, GistSplitVector *v, IndexTuple *itup, int len) { int i; static OffsetNumber offInvTuples[ MaxOffsetNumber ]; int nOffInvTuples = 0; for (i = 1; i <= len; i++) if ( GistTupleIsInvalid(itup[i - 1]) ) offInvTuples[ nOffInvTuples++ ] = i; if ( nOffInvTuples == len ) { /* corner case, all tuples are invalid */ v->spl_rightvalid= v->spl_leftvalid = false; gistSplitHalf( &v->splitVector, len ); } else { GistSplitUnion gsvp; v->splitVector.spl_right = offInvTuples; v->splitVector.spl_nright = nOffInvTuples; v->spl_rightvalid = false; v->splitVector.spl_left = (OffsetNumber *) palloc(len * sizeof(OffsetNumber)); v->splitVector.spl_nleft = 0; for(i = 1; i <= len; i++) if ( !GistTupleIsInvalid(itup[i - 1]) ) v->splitVector.spl_left[ v->splitVector.spl_nleft++ ] = i; v->spl_leftvalid = true; gsvp.equiv = NULL; gsvp.attr = v->spl_lattr; gsvp.len = v->splitVector.spl_nleft; gsvp.entries = v->splitVector.spl_left; gsvp.isnull = v->spl_lisnull; gistunionsubkeyvec(giststate, itup, &gsvp, 0); } } /* * trys to split page by attno key, in a case of null * values move its to separate page. */ void gistSplitByKey(Relation r, Page page, IndexTuple *itup, int len, GISTSTATE *giststate, GistSplitVector *v, GistEntryVector *entryvec, int attno) { int i; static OffsetNumber offNullTuples[ MaxOffsetNumber ]; int nOffNullTuples = 0; for (i = 1; i <= len; i++) { Datum datum; bool IsNull; if (!GistPageIsLeaf(page) && GistTupleIsInvalid(itup[i - 1])) { gistSplitByInvalid(giststate, v, itup, len); return; } datum = index_getattr(itup[i - 1], attno+1, giststate->tupdesc, &IsNull); gistdentryinit(giststate, attno, &(entryvec->vector[i]), datum, r, page, i, FALSE, IsNull); if ( IsNull ) offNullTuples[ nOffNullTuples++ ] = i; } v->spl_leftvalid = v->spl_rightvalid = true; if ( nOffNullTuples == len ) { /* * Corner case: All keys in attno column are null, we should try to * split by keys in next column. It all keys in all columns * are NULL just split page half by half */ v->spl_risnull[attno] = v->spl_lisnull[attno] = TRUE; if ( attno+1 == r->rd_att->natts ) gistSplitHalf( &v->splitVector, len ); else gistSplitByKey(r, page, itup, len, giststate, v, entryvec, attno+1); } else if ( nOffNullTuples > 0 ) { int j=0; /* * We don't want to mix NULLs and not-NULLs keys * on one page, so move nulls to right page */ v->splitVector.spl_right = offNullTuples; v->splitVector.spl_nright = nOffNullTuples; v->spl_risnull[attno] = TRUE; v->splitVector.spl_left = (OffsetNumber *) palloc(len * sizeof(OffsetNumber)); v->splitVector.spl_nleft = 0; for(i = 1; i <= len; i++) if ( jsplitVector.spl_nright && offNullTuples[j] == i ) j++; else v->splitVector.spl_left[ v->splitVector.spl_nleft++ ] = i; v->spl_equiv = NULL; gistunionsubkey(giststate, itup, v, attno); } else { /* * all keys are not-null */ entryvec->n = len+1; if ( gistUserPicksplit(r, entryvec, attno, v, itup, len, giststate) && attno+1 != r->rd_att->natts ) { /* * Splitting on attno column is not optimized: there is a tuples which can be freely * left or right page, we will try to split page by * following columns */ if ( v->spl_equiv == NULL ) { /* simple case: left and right keys for attno column are equial */ gistSplitByKey(r, page, itup, len, giststate, v, entryvec, attno+1); } else { /* we should clean up vector from already distributed tuples */ IndexTuple *newitup = (IndexTuple*)palloc((len + 1) * sizeof(IndexTuple)); OffsetNumber *map = (OffsetNumber*)palloc((len + 1) * sizeof(IndexTuple)); int newlen = 0; GIST_SPLITVEC backupSplit = v->splitVector; for(i=0; ispl_equiv[i+1] ) { map[ newlen ] = i+1; newitup[ newlen++ ] = itup[i]; } Assert( newlen>0 ); backupSplit.spl_left = (OffsetNumber*)palloc(sizeof(OffsetNumber)*len); memcpy( backupSplit.spl_left, v->splitVector.spl_left, sizeof(OffsetNumber)*v->splitVector.spl_nleft); backupSplit.spl_right = (OffsetNumber*)palloc(sizeof(OffsetNumber)*len); memcpy( backupSplit.spl_right, v->splitVector.spl_right, sizeof(OffsetNumber)*v->splitVector.spl_nright); gistSplitByKey(r, page, newitup, newlen, giststate, v, entryvec, attno+1); /* merge result of subsplit */ for(i=0;isplitVector.spl_nleft;i++) backupSplit.spl_left[ backupSplit.spl_nleft++ ] = map[ v->splitVector.spl_left[i]-1 ]; for(i=0;isplitVector.spl_nright;i++) backupSplit.spl_right[ backupSplit.spl_nright++ ] = map[ v->splitVector.spl_right[i]-1 ]; v->splitVector = backupSplit; /* reunion left and right datums */ gistunionsubkey(giststate, itup, v, attno); } } } }