aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/rtree/rtree.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/rtree/rtree.c')
-rw-r--r--src/backend/access/rtree/rtree.c1298
1 files changed, 0 insertions, 1298 deletions
diff --git a/src/backend/access/rtree/rtree.c b/src/backend/access/rtree/rtree.c
deleted file mode 100644
index d684101d261..00000000000
--- a/src/backend/access/rtree/rtree.c
+++ /dev/null
@@ -1,1298 +0,0 @@
-/*-------------------------------------------------------------------------
- *
- * rtree.c
- * interface routines for the postgres rtree indexed access method.
- *
- * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
- * Portions Copyright (c) 1994, Regents of the University of California
- *
- *
- * IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/rtree/rtree.c,v 1.92 2005/10/15 02:49:09 momjian Exp $
- *
- *-------------------------------------------------------------------------
- */
-
-#include "postgres.h"
-
-#include "access/genam.h"
-#include "access/heapam.h"
-#include "access/rtree.h"
-#include "access/xlogutils.h"
-#include "catalog/index.h"
-#include "commands/vacuum.h"
-#include "executor/executor.h"
-#include "miscadmin.h"
-
-
-/*
- * XXX We assume that all datatypes indexable in rtrees are pass-by-reference.
- * To fix this, you'd need to improve the IndexTupleGetDatum() macro, and
- * do something with the various datum-pfreeing code. However, it's not that
- * unreasonable an assumption in practice.
- */
-#define IndexTupleGetDatum(itup) \
- PointerGetDatum(((char *) (itup)) + sizeof(IndexTupleData))
-
-/*
- * Space-allocation macros. Note we count the item's line pointer in its size.
- */
-#define RTPageAvailSpace \
- (BLCKSZ - (sizeof(PageHeaderData) - sizeof(ItemIdData)) \
- - MAXALIGN(sizeof(RTreePageOpaqueData)))
-#define IndexTupleTotalSize(itup) \
- (MAXALIGN(IndexTupleSize(itup)) + sizeof(ItemIdData))
-#define IndexTupleAttSize(itup) \
- (IndexTupleSize(itup) - sizeof(IndexTupleData))
-
-/* results of rtpicksplit() */
-typedef struct SPLITVEC
-{
- OffsetNumber *spl_left;
- int spl_nleft;
- Datum spl_ldatum;
- OffsetNumber *spl_right;
- int spl_nright;
- Datum spl_rdatum;
-} SPLITVEC;
-
-/* for sorting tuples by cost, for picking split */
-typedef struct SPLITCOST
-{
- OffsetNumber offset_number;
- float cost_differential;
- bool choose_left;
-} SPLITCOST;
-
-typedef struct RTSTATE
-{
- FmgrInfo unionFn; /* union function */
- FmgrInfo sizeFn; /* size function */
- FmgrInfo interFn; /* intersection function */
-} RTSTATE;
-
-/* Working state for rtbuild and its callback */
-typedef struct
-{
- RTSTATE rtState;
- double indtuples;
-} RTBuildState;
-
-/* non-export function prototypes */
-static void rtbuildCallback(Relation index,
- HeapTuple htup,
- Datum *values,
- bool *isnull,
- bool tupleIsAlive,
- void *state);
-static void rtdoinsert(Relation r, IndexTuple itup, RTSTATE *rtstate);
-static void rttighten(Relation r, RTSTACK *stk, Datum datum, int att_size,
- RTSTATE *rtstate);
-static void rtdosplit(Relation r, Buffer buffer, RTSTACK *stack,
- IndexTuple itup, RTSTATE *rtstate);
-static void rtintinsert(Relation r, RTSTACK *stk, IndexTuple ltup,
- IndexTuple rtup, RTSTATE *rtstate);
-static void rtnewroot(Relation r, IndexTuple lt, IndexTuple rt);
-static void rtpicksplit(Relation r, Page page, SPLITVEC *v, IndexTuple itup,
- RTSTATE *rtstate);
-static void RTInitBuffer(Buffer b, uint32 f);
-static OffsetNumber choose(Relation r, Page p, IndexTuple it,
- RTSTATE *rtstate);
-static int nospace(Page p, IndexTuple it);
-static void initRtstate(RTSTATE *rtstate, Relation index);
-static int qsort_comp_splitcost(const void *a, const void *b);
-
-
-/*
- * routine to build an index. Basically calls insert over and over
- */
-Datum
-rtbuild(PG_FUNCTION_ARGS)
-{
- Relation heap = (Relation) PG_GETARG_POINTER(0);
- Relation index = (Relation) PG_GETARG_POINTER(1);
- IndexInfo *indexInfo = (IndexInfo *) PG_GETARG_POINTER(2);
- double reltuples;
- RTBuildState buildstate;
- Buffer buffer;
-
- /* no locking is needed */
-
- initRtstate(&buildstate.rtState, index);
-
- /*
- * We expect to be called exactly once for any index relation. If that's
- * not the case, big trouble's what we have.
- */
- if (RelationGetNumberOfBlocks(index) != 0)
- elog(ERROR, "index \"%s\" already contains data",
- RelationGetRelationName(index));
-
- /* initialize the root page */
- buffer = ReadBuffer(index, P_NEW);
- RTInitBuffer(buffer, F_LEAF);
- WriteBuffer(buffer);
-
- /* build the index */
- buildstate.indtuples = 0;
-
- /* do the heap scan */
- reltuples = IndexBuildHeapScan(heap, index, indexInfo,
- rtbuildCallback, (void *) &buildstate);
-
- /* okay, all heap tuples are indexed */
-
- /* since we just counted the # of tuples, may as well update stats */
- IndexCloseAndUpdateStats(heap, reltuples, index, buildstate.indtuples);
-
- PG_RETURN_VOID();
-}
-
-/*
- * Per-tuple callback from IndexBuildHeapScan
- */
-static void
-rtbuildCallback(Relation index,
- HeapTuple htup,
- Datum *values,
- bool *isnull,
- bool tupleIsAlive,
- void *state)
-{
- RTBuildState *buildstate = (RTBuildState *) state;
- IndexTuple itup;
-
- /* form an index tuple and point it at the heap tuple */
- itup = index_form_tuple(RelationGetDescr(index), values, isnull);
- itup->t_tid = htup->t_self;
-
- /* rtree indexes don't index nulls, see notes in rtinsert */
- if (IndexTupleHasNulls(itup))
- {
- pfree(itup);
- return;
- }
-
- /*
- * Since we already have the index relation locked, we call rtdoinsert
- * directly. Normal access method calls dispatch through rtinsert, which
- * locks the relation for write. This is the right thing to do if you're
- * inserting single tups, but not when you're initializing the whole index
- * at once.
- */
- rtdoinsert(index, itup, &buildstate->rtState);
-
- buildstate->indtuples += 1;
-
- pfree(itup);
-}
-
-/*
- * rtinsert -- wrapper for rtree tuple insertion.
- *
- * This is the public interface routine for tuple insertion in rtrees.
- * It doesn't do any work; just locks the relation and passes the buck.
- */
-Datum
-rtinsert(PG_FUNCTION_ARGS)
-{
- Relation r = (Relation) PG_GETARG_POINTER(0);
- Datum *values = (Datum *) PG_GETARG_POINTER(1);
- bool *isnull = (bool *) PG_GETARG_POINTER(2);
- ItemPointer ht_ctid = (ItemPointer) PG_GETARG_POINTER(3);
-
-#ifdef NOT_USED
- Relation heapRel = (Relation) PG_GETARG_POINTER(4);
- bool checkUnique = PG_GETARG_BOOL(5);
-#endif
- IndexTuple itup;
- RTSTATE rtState;
-
- /* generate an index tuple */
- itup = index_form_tuple(RelationGetDescr(r), values, isnull);
- itup->t_tid = *ht_ctid;
-
- /*
- * Currently, rtrees do not support indexing NULLs; considerable
- * infrastructure work would have to be done to do anything reasonable
- * with a NULL.
- */
- if (IndexTupleHasNulls(itup))
- {
- pfree(itup);
- PG_RETURN_BOOL(false);
- }
-
- initRtstate(&rtState, r);
-
- /*
- * Since rtree is not marked "amconcurrent" in pg_am, caller should have
- * acquired exclusive lock on index relation. We need no locking here.
- */
- rtdoinsert(r, itup, &rtState);
-
- PG_RETURN_BOOL(true);
-}
-
-static void
-rtdoinsert(Relation r, IndexTuple itup, RTSTATE *rtstate)
-{
- Page page;
- Buffer buffer;
- BlockNumber blk;
- IndexTuple which;
- OffsetNumber l;
- RTSTACK *stack;
- RTreePageOpaque opaque;
- Datum datum;
-
- blk = P_ROOT;
- buffer = InvalidBuffer;
- stack = NULL;
-
- do
- {
- /* release the current buffer, read in the next one */
- buffer = ReleaseAndReadBuffer(buffer, r, blk);
- page = (Page) BufferGetPage(buffer);
-
- opaque = (RTreePageOpaque) PageGetSpecialPointer(page);
- if (!(opaque->flags & F_LEAF))
- {
- RTSTACK *n;
- ItemId iid;
-
- n = (RTSTACK *) palloc(sizeof(RTSTACK));
- n->rts_parent = stack;
- n->rts_blk = blk;
- n->rts_child = choose(r, page, itup, rtstate);
- stack = n;
-
- iid = PageGetItemId(page, n->rts_child);
- which = (IndexTuple) PageGetItem(page, iid);
- blk = ItemPointerGetBlockNumber(&(which->t_tid));
- }
- } while (!(opaque->flags & F_LEAF));
-
- if (nospace(page, itup))
- {
- /* need to do a split */
- rtdosplit(r, buffer, stack, itup, rtstate);
- freestack(stack);
- WriteBuffer(buffer); /* don't forget to release buffer! */
- return;
- }
-
- /* add the item and write the buffer */
- if (PageIsEmpty(page))
- {
- l = PageAddItem(page, (Item) itup, IndexTupleSize(itup),
- FirstOffsetNumber,
- LP_USED);
- }
- else
- {
- l = PageAddItem(page, (Item) itup, IndexTupleSize(itup),
- OffsetNumberNext(PageGetMaxOffsetNumber(page)),
- LP_USED);
- }
- if (l == InvalidOffsetNumber)
- elog(ERROR, "failed to add index item to \"%s\"",
- RelationGetRelationName(r));
-
- WriteBuffer(buffer);
-
- datum = IndexTupleGetDatum(itup);
-
- /* now expand the page boundary in the parent to include the new child */
- rttighten(r, stack, datum, IndexTupleAttSize(itup), rtstate);
- freestack(stack);
-}
-
-static void
-rttighten(Relation r,
- RTSTACK *stk,
- Datum datum,
- int att_size,
- RTSTATE *rtstate)
-{
- Datum oldud;
- Datum tdatum;
- Page p;
- float old_size,
- newd_size;
- Buffer b;
-
- if (stk == NULL)
- return;
-
- b = ReadBuffer(r, stk->rts_blk);
- p = BufferGetPage(b);
-
- oldud = IndexTupleGetDatum(PageGetItem(p,
- PageGetItemId(p, stk->rts_child)));
-
- FunctionCall2(&rtstate->sizeFn, oldud,
- PointerGetDatum(&old_size));
-
- datum = FunctionCall2(&rtstate->unionFn, oldud, datum);
-
- FunctionCall2(&rtstate->sizeFn, datum,
- PointerGetDatum(&newd_size));
-
- /*
- * If newd_size == 0 we have degenerate rectangles, so we don't know if
- * there was any change, so we have to assume there was.
- */
- if ((newd_size == 0) || (newd_size != old_size))
- {
- TupleDesc td = RelationGetDescr(r);
-
- if (td->attrs[0]->attlen < 0)
- {
- /*
- * This is an internal page, so 'oldud' had better be a union
- * (constant-length) key, too. (See comment below.)
- */
- Assert(VARSIZE(DatumGetPointer(datum)) ==
- VARSIZE(DatumGetPointer(oldud)));
- memmove(DatumGetPointer(oldud), DatumGetPointer(datum),
- VARSIZE(DatumGetPointer(datum)));
- }
- else
- {
- memmove(DatumGetPointer(oldud), DatumGetPointer(datum),
- att_size);
- }
- WriteBuffer(b);
-
- /*
- * The user may be defining an index on variable-sized data (like
- * polygons). If so, we need to get a constant-sized datum for
- * insertion on the internal page. We do this by calling the union
- * proc, which is required to return a rectangle.
- */
- tdatum = FunctionCall2(&rtstate->unionFn, datum, datum);
-
- rttighten(r, stk->rts_parent, tdatum, att_size, rtstate);
- pfree(DatumGetPointer(tdatum));
- }
- else
- ReleaseBuffer(b);
- pfree(DatumGetPointer(datum));
-}
-
-/*
- * rtdosplit -- split a page in the tree.
- *
- * rtpicksplit does the interesting work of choosing the split.
- * This routine just does the bit-pushing.
- */
-static void
-rtdosplit(Relation r,
- Buffer buffer,
- RTSTACK *stack,
- IndexTuple itup,
- RTSTATE *rtstate)
-{
- Page p;
- Buffer leftbuf,
- rightbuf;
- Page left,
- right;
- ItemId itemid;
- IndexTuple item;
- IndexTuple ltup,
- rtup;
- OffsetNumber maxoff;
- OffsetNumber i;
- OffsetNumber leftoff,
- rightoff;
- BlockNumber lbknum,
- rbknum;
- BlockNumber bufblock;
- RTreePageOpaque opaque;
- bool *isnull;
- SPLITVEC v;
- OffsetNumber *spl_left,
- *spl_right;
- TupleDesc tupDesc;
- int n;
- OffsetNumber newitemoff;
-
- p = (Page) BufferGetPage(buffer);
- opaque = (RTreePageOpaque) PageGetSpecialPointer(p);
-
- rtpicksplit(r, p, &v, itup, rtstate);
-
- /*
- * The root of the tree is the first block in the relation. If we're
- * about to split the root, we need to do some hocus-pocus to enforce this
- * guarantee.
- */
-
- if (BufferGetBlockNumber(buffer) == P_ROOT)
- {
- leftbuf = ReadBuffer(r, P_NEW);
- RTInitBuffer(leftbuf, opaque->flags);
- lbknum = BufferGetBlockNumber(leftbuf);
- left = (Page) BufferGetPage(leftbuf);
- }
- else
- {
- leftbuf = buffer;
- IncrBufferRefCount(buffer);
- lbknum = BufferGetBlockNumber(buffer);
- left = (Page) PageGetTempPage(p, sizeof(RTreePageOpaqueData));
- }
-
- rightbuf = ReadBuffer(r, P_NEW);
- RTInitBuffer(rightbuf, opaque->flags);
- rbknum = BufferGetBlockNumber(rightbuf);
- right = (Page) BufferGetPage(rightbuf);
-
- spl_left = v.spl_left;
- spl_right = v.spl_right;
- leftoff = rightoff = FirstOffsetNumber;
- maxoff = PageGetMaxOffsetNumber(p);
- newitemoff = OffsetNumberNext(maxoff);
-
- /*
- * spl_left contains a list of the offset numbers of the tuples that will
- * go to the left page. For each offset number, get the tuple item, then
- * add the item to the left page. Similarly for the right side.
- */
-
- /* fill left node */
- for (n = 0; n < v.spl_nleft; n++)
- {
- i = *spl_left;
- if (i == newitemoff)
- item = itup;
- else
- {
- itemid = PageGetItemId(p, i);
- item = (IndexTuple) PageGetItem(p, itemid);
- }
-
- if (PageAddItem(left, (Item) item, IndexTupleSize(item),
- leftoff, LP_USED) == InvalidOffsetNumber)
- elog(ERROR, "failed to add index item to \"%s\"",
- RelationGetRelationName(r));
- leftoff = OffsetNumberNext(leftoff);
-
- spl_left++; /* advance in left split vector */
- }
-
- /* fill right node */
- for (n = 0; n < v.spl_nright; n++)
- {
- i = *spl_right;
- if (i == newitemoff)
- item = itup;
- else
- {
- itemid = PageGetItemId(p, i);
- item = (IndexTuple) PageGetItem(p, itemid);
- }
-
- if (PageAddItem(right, (Item) item, IndexTupleSize(item),
- rightoff, LP_USED) == InvalidOffsetNumber)
- elog(ERROR, "failed to add index item to \"%s\"",
- RelationGetRelationName(r));
- rightoff = OffsetNumberNext(rightoff);
-
- spl_right++; /* advance in right split vector */
- }
-
- /* Make sure we consumed all of the split vectors, and release 'em */
- Assert(*spl_left == InvalidOffsetNumber);
- Assert(*spl_right == InvalidOffsetNumber);
- pfree(v.spl_left);
- pfree(v.spl_right);
-
- if ((bufblock = BufferGetBlockNumber(buffer)) != P_ROOT)
- PageRestoreTempPage(left, p);
- WriteBuffer(leftbuf);
- WriteBuffer(rightbuf);
-
- /*
- * Okay, the page is split. We have three things left to do:
- *
- * 1) Adjust any active scans on this index to cope with changes we
- * introduced in its structure by splitting this page.
- *
- * 2) "Tighten" the bounding box of the pointer to the left page in the
- * parent node in the tree, if any. Since we moved a bunch of stuff off
- * the left page, we expect it to get smaller. This happens in the
- * internal insertion routine.
- *
- * 3) Insert a pointer to the right page in the parent. This may cause the
- * parent to split. If it does, we need to repeat steps one and two for
- * each split node in the tree.
- */
-
- /* adjust active scans */
- rtadjscans(r, RTOP_SPLIT, bufblock, FirstOffsetNumber);
-
- tupDesc = r->rd_att;
- isnull = (bool *) palloc(r->rd_rel->relnatts * sizeof(bool));
- memset(isnull, false, r->rd_rel->relnatts * sizeof(bool));
-
- ltup = index_form_tuple(tupDesc, &(v.spl_ldatum), isnull);
- rtup = index_form_tuple(tupDesc, &(v.spl_rdatum), isnull);
-
- pfree(isnull);
- pfree(DatumGetPointer(v.spl_ldatum));
- pfree(DatumGetPointer(v.spl_rdatum));
-
- /* set pointers to new child pages in the internal index tuples */
- ItemPointerSet(&(ltup->t_tid), lbknum, 1);
- ItemPointerSet(&(rtup->t_tid), rbknum, 1);
-
- rtintinsert(r, stack, ltup, rtup, rtstate);
-
- pfree(ltup);
- pfree(rtup);
-}
-
-static void
-rtintinsert(Relation r,
- RTSTACK *stk,
- IndexTuple ltup,
- IndexTuple rtup,
- RTSTATE *rtstate)
-{
- IndexTuple old;
- Buffer b;
- Page p;
- Datum ldatum,
- rdatum,
- newdatum;
-
- if (stk == NULL)
- {
- rtnewroot(r, ltup, rtup);
- return;
- }
-
- b = ReadBuffer(r, stk->rts_blk);
- p = BufferGetPage(b);
- old = (IndexTuple) PageGetItem(p, PageGetItemId(p, stk->rts_child));
-
- /*
- * This is a hack. Right now, we force rtree internal keys to be constant
- * size. To fix this, need delete the old key and add both left and right
- * for the two new pages. The insertion of left may force a split if the
- * new left key is bigger than the old key.
- */
-
- if (IndexTupleSize(old) != IndexTupleSize(ltup))
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("variable-length rtree keys are not supported")));
-
- /* install pointer to left child */
- memmove(old, ltup, IndexTupleSize(ltup));
-
- if (nospace(p, rtup))
- {
- newdatum = IndexTupleGetDatum(ltup);
- rttighten(r, stk->rts_parent, newdatum,
- IndexTupleAttSize(ltup), rtstate);
- rtdosplit(r, b, stk->rts_parent, rtup, rtstate);
- WriteBuffer(b); /* don't forget to release buffer! - 01/31/94 */
- }
- else
- {
- if (PageAddItem(p, (Item) rtup, IndexTupleSize(rtup),
- PageGetMaxOffsetNumber(p),
- LP_USED) == InvalidOffsetNumber)
- elog(ERROR, "failed to add index item to \"%s\"",
- RelationGetRelationName(r));
- WriteBuffer(b);
- ldatum = IndexTupleGetDatum(ltup);
- rdatum = IndexTupleGetDatum(rtup);
- newdatum = FunctionCall2(&rtstate->unionFn, ldatum, rdatum);
-
- rttighten(r, stk->rts_parent, newdatum,
- IndexTupleAttSize(rtup), rtstate);
-
- pfree(DatumGetPointer(newdatum));
- }
-}
-
-static void
-rtnewroot(Relation r, IndexTuple lt, IndexTuple rt)
-{
- Buffer b;
- Page p;
-
- b = ReadBuffer(r, P_ROOT);
- RTInitBuffer(b, 0);
- p = BufferGetPage(b);
- if (PageAddItem(p, (Item) lt, IndexTupleSize(lt),
- FirstOffsetNumber,
- LP_USED) == InvalidOffsetNumber)
- elog(ERROR, "failed to add index item to \"%s\"",
- RelationGetRelationName(r));
- if (PageAddItem(p, (Item) rt, IndexTupleSize(rt),
- OffsetNumberNext(FirstOffsetNumber),
- LP_USED) == InvalidOffsetNumber)
- elog(ERROR, "failed to add index item to \"%s\"",
- RelationGetRelationName(r));
- WriteBuffer(b);
-}
-
-/*
- * Choose how to split an rtree page into two pages.
- *
- * We return two vectors of index item numbers, one for the items to be
- * put on the left page, one for the items to be put on the right page.
- * In addition, the item to be added (itup) is listed in the appropriate
- * vector. It is represented by item number N+1 (N = # of items on page).
- *
- * Both vectors have a terminating sentinel value of InvalidOffsetNumber,
- * but the sentinal value is no longer used, because the SPLITVEC
- * vector also contains the length of each vector, and that information
- * is now used to iterate over them in rtdosplit(). --kbb, 21 Sept 2001
- *
- * The bounding-box datums for the two new pages are also returned in *v.
- *
- * This is the quadratic-cost split algorithm Guttman describes in
- * his paper. The reason we chose it is that you can implement this
- * with less information about the data types on which you're operating.
- *
- * We must also deal with a consideration not found in Guttman's algorithm:
- * variable-length data. In particular, the incoming item might be
- * large enough that not just any split will work. In the worst case,
- * our "split" may have to be the new item on one page and all the existing
- * items on the other. Short of that, we have to take care that we do not
- * make a split that leaves both pages too full for the new item.
- */
-static void
-rtpicksplit(Relation r,
- Page page,
- SPLITVEC *v,
- IndexTuple itup,
- RTSTATE *rtstate)
-{
- OffsetNumber maxoff,
- newitemoff;
- OffsetNumber i,
- j;
- IndexTuple item_1,
- item_2;
- Datum datum_alpha,
- datum_beta;
- Datum datum_l,
- datum_r;
- Datum union_d,
- union_dl,
- union_dr;
- Datum inter_d;
- bool firsttime;
- float size_alpha,
- size_beta,
- size_union,
- size_inter;
- float size_waste,
- waste;
- float size_l,
- size_r;
- int nbytes;
- OffsetNumber seed_1 = 0,
- seed_2 = 0;
- OffsetNumber *left,
- *right;
- Size newitemsz,
- item_1_sz,
- item_2_sz,
- left_avail_space,
- right_avail_space;
- int total_num_tuples,
- num_tuples_without_seeds,
- max_after_split; /* in Guttman's lingo, (M - m) */
- float diff; /* diff between cost of putting tuple left or
- * right */
- SPLITCOST *cost_vector;
- int n;
-
- /*
- * First, make sure the new item is not so large that we can't possibly
- * fit it on a page, even by itself. (It's sufficient to make this test
- * here, since any oversize tuple must lead to a page split attempt.)
- */
- newitemsz = IndexTupleTotalSize(itup);
- if (newitemsz > RTPageAvailSpace)
- ereport(ERROR,
- (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
- errmsg("index row size %lu exceeds rtree maximum, %lu",
- (unsigned long) newitemsz,
- (unsigned long) RTPageAvailSpace),
- errhint("Values larger than a buffer page cannot be indexed.")));
-
- maxoff = PageGetMaxOffsetNumber(page);
- newitemoff = OffsetNumberNext(maxoff); /* phony index for new item */
- total_num_tuples = newitemoff;
- num_tuples_without_seeds = total_num_tuples - 2;
- max_after_split = total_num_tuples / 2; /* works for m = M/2 */
-
- /* Make arrays big enough for worst case, including sentinel */
- nbytes = (maxoff + 2) * sizeof(OffsetNumber);
- v->spl_left = (OffsetNumber *) palloc(nbytes);
- v->spl_right = (OffsetNumber *) palloc(nbytes);
-
- firsttime = true;
- waste = 0.0;
-
- for (i = FirstOffsetNumber; i < maxoff; i = OffsetNumberNext(i))
- {
- item_1 = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
- datum_alpha = IndexTupleGetDatum(item_1);
- item_1_sz = IndexTupleTotalSize(item_1);
-
- for (j = OffsetNumberNext(i); j <= maxoff; j = OffsetNumberNext(j))
- {
- item_2 = (IndexTuple) PageGetItem(page, PageGetItemId(page, j));
- datum_beta = IndexTupleGetDatum(item_2);
- item_2_sz = IndexTupleTotalSize(item_2);
-
- /*
- * Ignore seed pairs that don't leave room for the new item on
- * either split page.
- */
- if (newitemsz + item_1_sz > RTPageAvailSpace &&
- newitemsz + item_2_sz > RTPageAvailSpace)
- continue;
-
- /* compute the wasted space by unioning these guys */
- union_d = FunctionCall2(&rtstate->unionFn,
- datum_alpha, datum_beta);
- FunctionCall2(&rtstate->sizeFn, union_d,
- PointerGetDatum(&size_union));
- inter_d = FunctionCall2(&rtstate->interFn,
- datum_alpha, datum_beta);
-
- /*
- * The interFn may return a NULL pointer (not an SQL null!) to
- * indicate no intersection. sizeFn must cope with this.
- */
- FunctionCall2(&rtstate->sizeFn, inter_d,
- PointerGetDatum(&size_inter));
- size_waste = size_union - size_inter;
-
- if (DatumGetPointer(union_d) != NULL)
- pfree(DatumGetPointer(union_d));
- if (DatumGetPointer(inter_d) != NULL)
- pfree(DatumGetPointer(inter_d));
-
- /*
- * are these a more promising split that what we've already seen?
- */
- if (size_waste > waste || firsttime)
- {
- waste = size_waste;
- seed_1 = i;
- seed_2 = j;
- firsttime = false;
- }
- }
- }
-
- if (firsttime)
- {
- /*
- * There is no possible split except to put the new item on its own
- * page. Since we still have to compute the union rectangles, we play
- * dumb and run through the split algorithm anyway, setting seed_1 =
- * first item on page and seed_2 = new item.
- */
- seed_1 = FirstOffsetNumber;
- seed_2 = newitemoff;
- }
-
- item_1 = (IndexTuple) PageGetItem(page, PageGetItemId(page, seed_1));
- datum_alpha = IndexTupleGetDatum(item_1);
- datum_l = FunctionCall2(&rtstate->unionFn, datum_alpha, datum_alpha);
- FunctionCall2(&rtstate->sizeFn, datum_l, PointerGetDatum(&size_l));
- left_avail_space = RTPageAvailSpace - IndexTupleTotalSize(item_1);
-
- if (seed_2 == newitemoff)
- {
- item_2 = itup;
- /* Needn't leave room for new item in calculations below */
- newitemsz = 0;
- }
- else
- item_2 = (IndexTuple) PageGetItem(page, PageGetItemId(page, seed_2));
- datum_beta = IndexTupleGetDatum(item_2);
- datum_r = FunctionCall2(&rtstate->unionFn, datum_beta, datum_beta);
- FunctionCall2(&rtstate->sizeFn, datum_r, PointerGetDatum(&size_r));
- right_avail_space = RTPageAvailSpace - IndexTupleTotalSize(item_2);
-
- /*
- * Now split up the regions between the two seeds.
- *
- * The cost_vector array will contain hints for determining where each tuple
- * should go. Each record in the array will contain a boolean,
- * choose_left, that indicates which node the tuple prefers to be on, and
- * the absolute difference in cost between putting the tuple in its
- * favored node and in the other node.
- *
- * Later, we will sort the cost_vector in descending order by cost
- * difference, and consider the tuples in that order for placement. That
- * way, the tuples that *really* want to be in one node or the other get
- * to choose first, and the tuples that don't really care choose last.
- *
- * First, build the cost_vector array. The new index tuple will also be
- * handled in this loop, and represented in the array, with i==newitemoff.
- *
- * In the case of variable size tuples it is possible that we only have the
- * two seeds and no other tuples, in which case we don't do any of this
- * cost_vector stuff.
- */
-
- /* to keep compiler quiet */
- cost_vector = NULL;
-
- if (num_tuples_without_seeds > 0)
- {
- cost_vector =
- (SPLITCOST *) palloc(num_tuples_without_seeds * sizeof(SPLITCOST));
- n = 0;
- for (i = FirstOffsetNumber; i <= newitemoff; i = OffsetNumberNext(i))
- {
- /* Compute new union datums and sizes for both choices */
-
- if ((i == seed_1) || (i == seed_2))
- continue;
- else if (i == newitemoff)
- item_1 = itup;
- else
- item_1 = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
-
- datum_alpha = IndexTupleGetDatum(item_1);
- union_dl = FunctionCall2(&rtstate->unionFn, datum_l, datum_alpha);
- union_dr = FunctionCall2(&rtstate->unionFn, datum_r, datum_alpha);
- FunctionCall2(&rtstate->sizeFn, union_dl,
- PointerGetDatum(&size_alpha));
- FunctionCall2(&rtstate->sizeFn, union_dr,
- PointerGetDatum(&size_beta));
- pfree(DatumGetPointer(union_dl));
- pfree(DatumGetPointer(union_dr));
-
- diff = (size_alpha - size_l) - (size_beta - size_r);
-
- cost_vector[n].offset_number = i;
- cost_vector[n].cost_differential = fabs(diff);
- cost_vector[n].choose_left = (diff < 0);
-
- n++;
- }
-
- /*
- * Sort the array. The function qsort_comp_splitcost is set up
- * "backwards", to provided descending order.
- */
- qsort(cost_vector, num_tuples_without_seeds, sizeof(SPLITCOST),
- &qsort_comp_splitcost);
- }
-
- /*
- * Now make the final decisions about where each tuple will go, and build
- * the vectors to return in the SPLITVEC record.
- *
- * The cost_vector array contains (descriptions of) all the tuples, in the
- * order that we want to consider them, so we we just iterate through it
- * and place each tuple in left or right nodes, according to the criteria
- * described below.
- */
-
- left = v->spl_left;
- v->spl_nleft = 0;
- right = v->spl_right;
- v->spl_nright = 0;
-
- /*
- * Place the seeds first. left avail space, left union, right avail space,
- * and right union have already been adjusted for the seeds.
- */
-
- *left++ = seed_1;
- v->spl_nleft++;
-
- *right++ = seed_2;
- v->spl_nright++;
-
- for (n = 0; n < num_tuples_without_seeds; n++)
- {
- bool left_feasible,
- right_feasible,
- choose_left;
-
- /*
- * We need to figure out which page needs the least enlargement in
- * order to store the item.
- */
-
- i = cost_vector[n].offset_number;
-
- /* Compute new union datums and sizes for both possible additions */
- if (i == newitemoff)
- {
- item_1 = itup;
- /* Needn't leave room for new item anymore */
- newitemsz = 0;
- }
- else
- item_1 = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
- item_1_sz = IndexTupleTotalSize(item_1);
-
- datum_alpha = IndexTupleGetDatum(item_1);
- union_dl = FunctionCall2(&rtstate->unionFn, datum_l, datum_alpha);
- union_dr = FunctionCall2(&rtstate->unionFn, datum_r, datum_alpha);
- FunctionCall2(&rtstate->sizeFn, union_dl,
- PointerGetDatum(&size_alpha));
- FunctionCall2(&rtstate->sizeFn, union_dr,
- PointerGetDatum(&size_beta));
-
- /*
- * We prefer the page that shows smaller enlargement of its union area
- * (Guttman's algorithm), but we must take care that at least one page
- * will still have room for the new item after this one is added.
- *
- * (We know that all the old items together can fit on one page, so we
- * need not worry about any other problem than failing to fit the new
- * item.)
- *
- * Guttman's algorithm actually has two factors to consider (in order):
- * 1. if one node has so many tuples already assigned to it that the
- * other needs all the rest in order to satisfy the condition that
- * neither node has fewer than m tuples, then that is decisive; 2.
- * otherwise, choose the page that shows the smaller enlargement of
- * its union area.
- *
- * I have chosen m = M/2, where M is the maximum number of tuples on a
- * page. (Actually, this is only strictly true for fixed size tuples.
- * For variable size tuples, there still might have to be only one
- * tuple on a page, if it is really big. But even with variable size
- * tuples we still try to get m as close as possible to M/2.)
- *
- * The question of which page shows the smaller enlargement of its union
- * area has already been answered, and the answer stored in the
- * choose_left field of the SPLITCOST record.
- */
- left_feasible = (left_avail_space >= item_1_sz &&
- ((left_avail_space - item_1_sz) >= newitemsz ||
- right_avail_space >= newitemsz));
- right_feasible = (right_avail_space >= item_1_sz &&
- ((right_avail_space - item_1_sz) >= newitemsz ||
- left_avail_space >= newitemsz));
- if (left_feasible && right_feasible)
- {
- /*
- * Both feasible, use Guttman's algorithm. First check the m
- * condition described above, and if that doesn't apply, choose
- * the page with the smaller enlargement of its union area.
- */
- if (v->spl_nleft > max_after_split)
- choose_left = false;
- else if (v->spl_nright > max_after_split)
- choose_left = true;
- else
- choose_left = cost_vector[n].choose_left;
- }
- else if (left_feasible)
- choose_left = true;
- else if (right_feasible)
- choose_left = false;
- else
- {
- elog(ERROR, "failed to find a workable rtree page split");
- choose_left = false; /* keep compiler quiet */
- }
-
- if (choose_left)
- {
- pfree(DatumGetPointer(datum_l));
- pfree(DatumGetPointer(union_dr));
- datum_l = union_dl;
- size_l = size_alpha;
- left_avail_space -= item_1_sz;
- *left++ = i;
- v->spl_nleft++;
- }
- else
- {
- pfree(DatumGetPointer(datum_r));
- pfree(DatumGetPointer(union_dl));
- datum_r = union_dr;
- size_r = size_beta;
- right_avail_space -= item_1_sz;
- *right++ = i;
- v->spl_nright++;
- }
- }
-
- if (num_tuples_without_seeds > 0)
- pfree(cost_vector);
-
- *left = *right = InvalidOffsetNumber; /* add ending sentinels */
-
- v->spl_ldatum = datum_l;
- v->spl_rdatum = datum_r;
-}
-
-static void
-RTInitBuffer(Buffer b, uint32 f)
-{
- RTreePageOpaque opaque;
- Page page;
- Size pageSize;
-
- pageSize = BufferGetPageSize(b);
-
- page = BufferGetPage(b);
-
- PageInit(page, pageSize, sizeof(RTreePageOpaqueData));
-
- opaque = (RTreePageOpaque) PageGetSpecialPointer(page);
- opaque->flags = f;
-}
-
-static OffsetNumber
-choose(Relation r, Page p, IndexTuple it, RTSTATE *rtstate)
-{
- OffsetNumber maxoff;
- OffsetNumber i;
- Datum ud,
- id;
- Datum datum;
- float usize,
- dsize;
- OffsetNumber which;
- float which_grow;
-
- id = IndexTupleGetDatum(it);
- maxoff = PageGetMaxOffsetNumber(p);
- which_grow = -1.0;
- which = -1;
-
- for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
- {
- datum = IndexTupleGetDatum(PageGetItem(p, PageGetItemId(p, i)));
- FunctionCall2(&rtstate->sizeFn, datum,
- PointerGetDatum(&dsize));
- ud = FunctionCall2(&rtstate->unionFn, datum, id);
- FunctionCall2(&rtstate->sizeFn, ud,
- PointerGetDatum(&usize));
- pfree(DatumGetPointer(ud));
- if (which_grow < 0 || usize - dsize < which_grow)
- {
- which = i;
- which_grow = usize - dsize;
- if (which_grow == 0)
- break;
- }
- }
-
- return which;
-}
-
-static int
-nospace(Page p, IndexTuple it)
-{
- return PageGetFreeSpace(p) < IndexTupleSize(it);
-}
-
-void
-freestack(RTSTACK *s)
-{
- RTSTACK *p;
-
- while (s != NULL)
- {
- p = s->rts_parent;
- pfree(s);
- s = p;
- }
-}
-
-/*
- * Bulk deletion of all index entries pointing to a set of heap tuples.
- * The set of target tuples is specified via a callback routine that tells
- * whether any given heap tuple (identified by ItemPointer) is being deleted.
- *
- * Result: a palloc'd struct containing statistical info for VACUUM displays.
- */
Datum
rtbulkdelete(PG_FUNCTION_ARGS)
{
	Relation	rel = (Relation) PG_GETARG_POINTER(0);
	IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(1);
	void	   *callback_state = (void *) PG_GETARG_POINTER(2);
	IndexBulkDeleteResult *result;
	BlockNumber num_pages;
	double		tuples_removed;		/* count of entries deleted below */
	double		num_index_tuples;	/* count of entries retained */
	IndexScanDesc iscan;

	tuples_removed = 0;
	num_index_tuples = 0;

	/*
	 * Since rtree is not marked "amconcurrent" in pg_am, caller should have
	 * acquired exclusive lock on index relation.  We need no locking here.
	 */

	/*
	 * XXX generic implementation --- should be improved!
	 */

	/* walk through the entire index */
	iscan = index_beginscan(NULL, rel, SnapshotAny, 0, NULL);
	/* including killed tuples */
	iscan->ignore_killed_tuples = false;

	while (index_getnext_indexitem(iscan, ForwardScanDirection))
	{
		/* allow a vacuum-cost or cancel interrupt between tuples */
		vacuum_delay_point();

		/* ask the callback whether this heap tuple is being vacuumed away */
		if (callback(&iscan->xs_ctup.t_self, callback_state))
		{
			/* currentItemData is the index entry's own TID within the index */
			ItemPointerData indextup = iscan->currentItemData;
			BlockNumber blkno;
			OffsetNumber offnum;
			Buffer		buf;
			Page		page;

			blkno = ItemPointerGetBlockNumber(&indextup);
			offnum = ItemPointerGetOffsetNumber(&indextup);

			/* adjust any scans that will be affected by this deletion */
			/* (namely, my own scan) */
			rtadjscans(rel, RTOP_DEL, blkno, offnum);

			/* delete the index tuple */
			buf = ReadBuffer(rel, blkno);
			page = BufferGetPage(buf);

			PageIndexTupleDelete(page, offnum);

			/* mark the page dirty and release our pin */
			WriteBuffer(buf);

			tuples_removed += 1;
		}
		else
			num_index_tuples += 1;
	}

	index_endscan(iscan);

	/* return statistics */
	num_pages = RelationGetNumberOfBlocks(rel);

	/* palloc0 zeroes the struct, so unset fields report as 0 */
	result = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
	result->num_pages = num_pages;
	result->num_index_tuples = num_index_tuples;
	result->tuples_removed = tuples_removed;

	PG_RETURN_POINTER(result);
}
-
-
-static void
-initRtstate(RTSTATE *rtstate, Relation index)
-{
- fmgr_info_copy(&rtstate->unionFn,
- index_getprocinfo(index, 1, RT_UNION_PROC),
- CurrentMemoryContext);
- fmgr_info_copy(&rtstate->sizeFn,
- index_getprocinfo(index, 1, RT_SIZE_PROC),
- CurrentMemoryContext);
- fmgr_info_copy(&rtstate->interFn,
- index_getprocinfo(index, 1, RT_INTER_PROC),
- CurrentMemoryContext);
-}
-
-/* for sorting SPLITCOST records in descending order */
-static int
-qsort_comp_splitcost(const void *a, const void *b)
-{
- float diff =
- ((SPLITCOST *) a)->cost_differential -
- ((SPLITCOST *) b)->cost_differential;
-
- if (diff < 0)
- return 1;
- else if (diff > 0)
- return -1;
- else
- return 0;
-}
-
-#ifdef RTDEBUG
-
-void
-_rtdump(Relation r)
-{
- Buffer buf;
- Page page;
- OffsetNumber offnum,
- maxoff;
- BlockNumber blkno;
- BlockNumber nblocks;
- RTreePageOpaque po;
- IndexTuple itup;
- BlockNumber itblkno;
- OffsetNumber itoffno;
- Datum datum;
- char *itkey;
-
- nblocks = RelationGetNumberOfBlocks(r);
- for (blkno = 0; blkno < nblocks; blkno++)
- {
- buf = ReadBuffer(r, blkno);
- page = BufferGetPage(buf);
- po = (RTreePageOpaque) PageGetSpecialPointer(page);
- maxoff = PageGetMaxOffsetNumber(page);
- printf("Page %d maxoff %d <%s>\n", blkno, maxoff,
- (po->flags & F_LEAF ? "LEAF" : "INTERNAL"));
-
- if (PageIsEmpty(page))
- {
- ReleaseBuffer(buf);
- continue;
- }
-
- for (offnum = FirstOffsetNumber;
- offnum <= maxoff;
- offnum = OffsetNumberNext(offnum))
- {
- itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));
- itblkno = ItemPointerGetBlockNumber(&(itup->t_tid));
- itoffno = ItemPointerGetOffsetNumber(&(itup->t_tid));
- datum = IndexTupleGetDatum(itup);
- itkey = DatumGetCString(DirectFunctionCall1(box_out,
- datum));
- printf("\t[%d] size %d heap <%d,%d> key:%s\n",
- offnum, IndexTupleSize(itup), itblkno, itoffno, itkey);
- pfree(itkey);
- }
-
- ReleaseBuffer(buf);
- }
-}
-#endif /* defined RTDEBUG */
-
/*
 * rtree_redo -- WAL redo entry point for the rtree access method.
 *
 * rtree emits no WAL records (see also rtree_desc), so replay should be
 * impossible; PANIC if this is ever reached.
 */
void
rtree_redo(XLogRecPtr lsn, XLogRecord *record)
{
	elog(PANIC, "rtree_redo: unimplemented");
}
-
/*
 * rtree_desc -- WAL record description entry point for rtree.
 *
 * No rtree WAL record types exist (see rtree_redo), so there is nothing
 * to format into buf; intentionally a no-op.
 */
void
rtree_desc(char *buf, uint8 xl_info, char *rec)
{
}