aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/spgist/spgdoinsert.c
diff options
context:
space:
mode:
authorTom Lane <tgl@sss.pgh.pa.us>2012-03-11 16:29:04 -0400
committerTom Lane <tgl@sss.pgh.pa.us>2012-03-11 16:29:59 -0400
commitc6a11b89e48dfb47b305cea405924333dabc20b6 (patch)
tree1ef16196fa824d0515789c59f34e46e829a43966 /src/backend/access/spgist/spgdoinsert.c
parentfc227a4e3b84f7bc243c4606780dde28aea257ee (diff)
downloadpostgresql-c6a11b89e48dfb47b305cea405924333dabc20b6.tar.gz
postgresql-c6a11b89e48dfb47b305cea405924333dabc20b6.zip
Teach SPGiST to store nulls and do whole-index scans.
This patch fixes the other major compatibility-breaking limitation of SPGiST, that it didn't store anything for null values of the indexed column, and so could not support whole-index scans or "x IS NULL" tests. The approach is to create a wholly separate search tree for the null entries, and use fixed "allTheSame" insertion and search rules when processing this tree, instead of calling the index opclass methods. This way the opclass methods do not need to worry about dealing with nulls. Catversion bump is for pg_am updates as well as the change in on-disk format of SPGiST indexes; there are some tweaks in SPGiST WAL records as well. Heavily rewritten version of a patch by Oleg Bartunov and Teodor Sigaev. (The original also stored nulls separately, but it reused GIN code to do so; which required undesirable compromises in the on-disk format, and would likely lead to bugs due to the GIN code being required to work in two very different contexts.)
Diffstat (limited to 'src/backend/access/spgist/spgdoinsert.c')
-rw-r--r--src/backend/access/spgist/spgdoinsert.c191
1 files changed, 131 insertions, 60 deletions
diff --git a/src/backend/access/spgist/spgdoinsert.c b/src/backend/access/spgist/spgdoinsert.c
index 85704762a6f..5ddb6672c5c 100644
--- a/src/backend/access/spgist/spgdoinsert.c
+++ b/src/backend/access/spgist/spgdoinsert.c
@@ -200,7 +200,7 @@ saveNodeLink(Relation index, SPPageDesc *parent,
*/
static void
addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
- SPPageDesc *current, SPPageDesc *parent, bool isNew)
+ SPPageDesc *current, SPPageDesc *parent, bool isNulls, bool isNew)
{
XLogRecData rdata[4];
spgxlogAddLeaf xlrec;
@@ -208,6 +208,7 @@ addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
xlrec.node = index->rd_node;
xlrec.blknoLeaf = current->blkno;
xlrec.newPage = isNew;
+ xlrec.storesNulls = isNulls;
/* these will be filled below as needed */
xlrec.offnumLeaf = InvalidOffsetNumber;
@@ -224,7 +225,7 @@ addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
START_CRIT_SECTION();
if (current->offnum == InvalidOffsetNumber ||
- current->blkno == SPGIST_HEAD_BLKNO)
+ SpGistBlockIsRoot(current->blkno))
{
/* Tuple is not part of a chain */
leafTuple->nextOffset = InvalidOffsetNumber;
@@ -337,7 +338,7 @@ checkSplitConditions(Relation index, SpGistState *state,
n = 0,
totalSize = 0;
- if (current->blkno == SPGIST_HEAD_BLKNO)
+ if (SpGistBlockIsRoot(current->blkno))
{
/* return impossible values to force split */
*nToSplit = BLCKSZ;
@@ -386,7 +387,7 @@ checkSplitConditions(Relation index, SpGistState *state,
static void
moveLeafs(Relation index, SpGistState *state,
SPPageDesc *current, SPPageDesc *parent,
- SpGistLeafTuple newLeafTuple)
+ SpGistLeafTuple newLeafTuple, bool isNulls)
{
int i,
nDelete,
@@ -451,7 +452,8 @@ moveLeafs(Relation index, SpGistState *state,
}
/* Find a leaf page that will hold them */
- nbuf = SpGistGetBuffer(index, GBUF_LEAF, size, &xlrec.newPage);
+ nbuf = SpGistGetBuffer(index, GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
+ size, &xlrec.newPage);
npage = BufferGetPage(nbuf);
nblkno = BufferGetBlockNumber(nbuf);
Assert(nblkno != current->blkno);
@@ -464,6 +466,7 @@ moveLeafs(Relation index, SpGistState *state,
xlrec.blknoDst = nblkno;
xlrec.nMoves = nDelete;
xlrec.replaceDead = replaceDead;
+ xlrec.storesNulls = isNulls;
xlrec.blknoParent = parent->blkno;
xlrec.offnumParent = parent->offnum;
@@ -584,6 +587,8 @@ setRedirectionTuple(SPPageDesc *current, OffsetNumber position,
* If so, randomly divide the tuples into several nodes (all with the same
* label) and return TRUE to select allTheSame mode for this inner tuple.
*
+ * (This code is also used to forcibly select allTheSame mode for nulls.)
+ *
* If we know that the leaf tuples wouldn't all fit on one page, then we
* exclude the last tuple (which is the incoming new tuple that forced a split)
* from the check to see if more than one node is used. The reason for this
@@ -674,7 +679,8 @@ checkAllTheSame(spgPickSplitIn *in, spgPickSplitOut *out, bool tooBig,
static bool
doPickSplit(Relation index, SpGistState *state,
SPPageDesc *current, SPPageDesc *parent,
- SpGistLeafTuple newLeafTuple, int level, bool isNew)
+ SpGistLeafTuple newLeafTuple,
+ int level, bool isNulls, bool isNew)
{
bool insertedNew = false;
spgPickSplitIn in;
@@ -733,11 +739,18 @@ doPickSplit(Relation index, SpGistState *state,
* also, count up the amount of space that will be freed from current.
* (Note that in the non-root case, we won't actually delete the old
* tuples, only replace them with redirects or placeholders.)
+ *
+ * Note: the SGLTDATUM calls here are safe even when dealing with a nulls
+ * page. For a pass-by-value data type we will fetch a word that must
+ * exist even though it may contain garbage (because of the fact that leaf
+ * tuples must have size at least SGDTSIZE). For a pass-by-reference type
+ * we are just computing a pointer that isn't going to get dereferenced.
+ * So it's not worth guarding the calls with isNulls checks.
*/
nToInsert = 0;
nToDelete = 0;
spaceToDelete = 0;
- if (current->blkno == SPGIST_HEAD_BLKNO)
+ if (SpGistBlockIsRoot(current->blkno))
{
/*
* We are splitting the root (which up to now is also a leaf page).
@@ -813,26 +826,53 @@ doPickSplit(Relation index, SpGistState *state,
heapPtrs[in.nTuples] = newLeafTuple->heapPtr;
in.nTuples++;
- /*
- * Perform split using user-defined method.
- */
memset(&out, 0, sizeof(out));
- procinfo = index_getprocinfo(index, 1, SPGIST_PICKSPLIT_PROC);
- FunctionCall2Coll(procinfo,
- index->rd_indcollation[0],
- PointerGetDatum(&in),
- PointerGetDatum(&out));
+ if (!isNulls)
+ {
+ /*
+ * Perform split using user-defined method.
+ */
+ procinfo = index_getprocinfo(index, 1, SPGIST_PICKSPLIT_PROC);
+ FunctionCall2Coll(procinfo,
+ index->rd_indcollation[0],
+ PointerGetDatum(&in),
+ PointerGetDatum(&out));
- /*
- * Form new leaf tuples and count up the total space needed.
- */
- totalLeafSizes = 0;
- for (i = 0; i < in.nTuples; i++)
+ /*
+ * Form new leaf tuples and count up the total space needed.
+ */
+ totalLeafSizes = 0;
+ for (i = 0; i < in.nTuples; i++)
+ {
+ newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
+ out.leafTupleDatums[i],
+ false);
+ totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
+ }
+ }
+ else
{
- newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
- out.leafTupleDatums[i]);
- totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
+ /*
+ * Perform dummy split that puts all tuples into one node.
+ * checkAllTheSame will override this and force allTheSame mode.
+ */
+ out.hasPrefix = false;
+ out.nNodes = 1;
+ out.nodeLabels = NULL;
+ out.mapTuplesToNodes = palloc0(sizeof(int) * in.nTuples);
+
+ /*
+ * Form new leaf tuples and count up the total space needed.
+ */
+ totalLeafSizes = 0;
+ for (i = 0; i < in.nTuples; i++)
+ {
+ newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
+ (Datum) 0,
+ true);
+ totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
+ }
}
/*
@@ -872,11 +912,11 @@ doPickSplit(Relation index, SpGistState *state,
for (i = 0; i < out.nNodes; i++)
{
Datum label = (Datum) 0;
- bool isnull = (out.nodeLabels == NULL);
+ bool labelisnull = (out.nodeLabels == NULL);
- if (!isnull)
+ if (!labelisnull)
label = out.nodeLabels[i];
- nodes[i] = spgFormNodeTuple(state, label, isnull);
+ nodes[i] = spgFormNodeTuple(state, label, labelisnull);
}
innerTuple = spgFormInnerTuple(state,
out.hasPrefix, out.prefixDatum,
@@ -914,7 +954,7 @@ doPickSplit(Relation index, SpGistState *state,
*/
xlrec.initInner = false;
if (parent->buffer != InvalidBuffer &&
- parent->blkno != SPGIST_HEAD_BLKNO &&
+ !SpGistBlockIsRoot(parent->blkno) &&
(SpGistPageGetFreeSpace(parent->page, 1) >=
innerTuple->size + sizeof(ItemIdData)))
{
@@ -925,7 +965,8 @@ doPickSplit(Relation index, SpGistState *state,
{
/* Send tuple to page with next triple parity (see README) */
newInnerBuffer = SpGistGetBuffer(index,
- GBUF_INNER_PARITY(parent->blkno + 1),
+ GBUF_INNER_PARITY(parent->blkno + 1) |
+ (isNulls ? GBUF_NULLS : 0),
innerTuple->size + sizeof(ItemIdData),
&xlrec.initInner);
}
@@ -935,7 +976,7 @@ doPickSplit(Relation index, SpGistState *state,
newInnerBuffer = InvalidBuffer;
}
- /*----------
+ /*
* Because a WAL record can't involve more than four buffers, we can
* only afford to deal with two leaf pages in each picksplit action,
* ie the current page and at most one other.
@@ -956,9 +997,8 @@ doPickSplit(Relation index, SpGistState *state,
* If we are splitting the root page (turning it from a leaf page into an
* inner page), then no leaf tuples can go back to the current page; they
* must all go somewhere else.
- *----------
*/
- if (current->blkno != SPGIST_HEAD_BLKNO)
+ if (!SpGistBlockIsRoot(current->blkno))
currentFreeSpace = PageGetExactFreeSpace(current->page) + spaceToDelete;
else
currentFreeSpace = 0; /* prevent assigning any tuples to current */
@@ -996,7 +1036,8 @@ doPickSplit(Relation index, SpGistState *state,
int curspace;
int newspace;
- newLeafBuffer = SpGistGetBuffer(index, GBUF_LEAF,
+ newLeafBuffer = SpGistGetBuffer(index,
+ GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
Min(totalLeafSizes,
SPGIST_PAGE_CAPACITY),
&xlrec.initDest);
@@ -1076,6 +1117,7 @@ doPickSplit(Relation index, SpGistState *state,
xlrec.blknoDest = InvalidBlockNumber;
xlrec.nDelete = 0;
xlrec.initSrc = isNew;
+ xlrec.storesNulls = isNulls;
leafdata = leafptr = (char *) palloc(totalLeafSizes);
@@ -1091,7 +1133,7 @@ doPickSplit(Relation index, SpGistState *state,
* the root; in that case there's no need because we'll re-init the page
* below. We do this first to make room for reinserting new leaf tuples.
*/
- if (current->blkno != SPGIST_HEAD_BLKNO)
+ if (!SpGistBlockIsRoot(current->blkno))
{
/*
* Init buffer instead of deleting individual tuples, but only if
@@ -1102,7 +1144,8 @@ doPickSplit(Relation index, SpGistState *state,
nToDelete + SpGistPageGetOpaque(current->page)->nPlaceholder ==
PageGetMaxOffsetNumber(current->page))
{
- SpGistInitBuffer(current->buffer, SPGIST_LEAF);
+ SpGistInitBuffer(current->buffer,
+ SPGIST_LEAF | (isNulls ? SPGIST_NULLS : 0));
xlrec.initSrc = true;
}
else if (isNew)
@@ -1317,10 +1360,10 @@ doPickSplit(Relation index, SpGistState *state,
* Splitting root page, which was a leaf but now becomes inner page
* (and so "current" continues to point at it)
*/
- Assert(current->blkno == SPGIST_HEAD_BLKNO);
+ Assert(SpGistBlockIsRoot(current->blkno));
Assert(redirectTuplePos == InvalidOffsetNumber);
- SpGistInitBuffer(current->buffer, 0);
+ SpGistInitBuffer(current->buffer, (isNulls ? SPGIST_NULLS : 0));
xlrec.initInner = true;
xlrec.blknoInner = current->blkno;
@@ -1461,6 +1504,9 @@ spgAddNodeAction(Relation index, SpGistState *state,
XLogRecData rdata[5];
spgxlogAddNode xlrec;
+ /* Should not be applied to nulls */
+ Assert(!SpGistPageStoresNulls(current->page));
+
/* Construct new inner tuple with additional node */
newInnerTuple = addNode(state, innerTuple, nodeLabel, nodeN);
@@ -1527,7 +1573,7 @@ spgAddNodeAction(Relation index, SpGistState *state,
* allow only one inner tuple on the root page, and spgFormInnerTuple
* always checks that inner tuples don't exceed the size of a page.
*/
- if (current->blkno == SPGIST_HEAD_BLKNO)
+ if (SpGistBlockIsRoot(current->blkno))
elog(ERROR, "cannot enlarge root tuple any more");
Assert(parent->buffer != InvalidBuffer);
@@ -1657,6 +1703,9 @@ spgSplitNodeAction(Relation index, SpGistState *state,
spgxlogSplitTuple xlrec;
Buffer newBuffer = InvalidBuffer;
+ /* Should not be applied to nulls */
+ Assert(!SpGistPageStoresNulls(current->page));
+
/*
* Construct new prefix tuple, containing a single node with the
* specified label. (We'll update the node's downlink to point to the
@@ -1709,7 +1758,7 @@ spgSplitNodeAction(Relation index, SpGistState *state,
* For the space calculation, note that prefixTuple replaces innerTuple
* but postfixTuple will be a new entry.
*/
- if (current->blkno == SPGIST_HEAD_BLKNO ||
+ if (SpGistBlockIsRoot(current->blkno) ||
SpGistPageGetFreeSpace(current->page, 1) + innerTuple->size <
prefixTuple->size + postfixTuple->size + sizeof(ItemIdData))
{
@@ -1804,7 +1853,7 @@ spgSplitNodeAction(Relation index, SpGistState *state,
*/
void
spgdoinsert(Relation index, SpGistState *state,
- ItemPointer heapPtr, Datum datum)
+ ItemPointer heapPtr, Datum datum, bool isnull)
{
int level = 0;
Datum leafDatum;
@@ -1817,7 +1866,7 @@ spgdoinsert(Relation index, SpGistState *state,
* value to be inserted is not toasted; FormIndexDatum doesn't guarantee
* that.
*/
- if (state->attType.attlen == -1)
+ if (!isnull && state->attType.attlen == -1)
datum = PointerGetDatum(PG_DETOAST_DATUM(datum));
leafDatum = datum;
@@ -1828,8 +1877,11 @@ spgdoinsert(Relation index, SpGistState *state,
* If it isn't gonna fit, and the opclass can't reduce the datum size by
* suffixing, bail out now rather than getting into an endless loop.
*/
- leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
- SpGistGetTypeSize(&state->attType, leafDatum);
+ if (!isnull)
+ leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
+ SpGistGetTypeSize(&state->attType, leafDatum);
+ else
+ leafSize = SGDTSIZE + sizeof(ItemIdData);
if (leafSize > SPGIST_PAGE_CAPACITY && !state->config.longValuesOK)
ereport(ERROR,
@@ -1840,8 +1892,8 @@ spgdoinsert(Relation index, SpGistState *state,
RelationGetRelationName(index)),
errhint("Values larger than a buffer page cannot be indexed.")));
- /* Initialize "current" to the root page */
- current.blkno = SPGIST_HEAD_BLKNO;
+ /* Initialize "current" to the appropriate root page */
+ current.blkno = isnull ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO;
current.buffer = InvalidBuffer;
current.page = NULL;
current.offnum = FirstOffsetNumber;
@@ -1873,10 +1925,11 @@ spgdoinsert(Relation index, SpGistState *state,
* for doPickSplit to always have a leaf page at hand; so just
* quietly limit our request to a page size.
*/
- current.buffer = SpGistGetBuffer(index, GBUF_LEAF,
- Min(leafSize,
- SPGIST_PAGE_CAPACITY),
- &isNew);
+ current.buffer =
+ SpGistGetBuffer(index,
+ GBUF_LEAF | (isnull ? GBUF_NULLS : 0),
+ Min(leafSize, SPGIST_PAGE_CAPACITY),
+ &isNew);
current.blkno = BufferGetBlockNumber(current.buffer);
}
else if (parent.buffer == InvalidBuffer ||
@@ -1892,19 +1945,25 @@ spgdoinsert(Relation index, SpGistState *state,
}
current.page = BufferGetPage(current.buffer);
+ /* should not arrive at a page of the wrong type */
+ if (isnull ? !SpGistPageStoresNulls(current.page) :
+ SpGistPageStoresNulls(current.page))
+ elog(ERROR, "SPGiST index page %u has wrong nulls flag",
+ current.blkno);
+
if (SpGistPageIsLeaf(current.page))
{
SpGistLeafTuple leafTuple;
int nToSplit,
sizeToSplit;
- leafTuple = spgFormLeafTuple(state, heapPtr, leafDatum);
+ leafTuple = spgFormLeafTuple(state, heapPtr, leafDatum, isnull);
if (leafTuple->size + sizeof(ItemIdData) <=
SpGistPageGetFreeSpace(current.page, 1))
{
/* it fits on page, so insert it and we're done */
addLeafTuple(index, state, leafTuple,
- &current, &parent, isNew);
+ &current, &parent, isnull, isNew);
break;
}
else if ((sizeToSplit =
@@ -1918,14 +1977,14 @@ spgdoinsert(Relation index, SpGistState *state,
* chain to another leaf page rather than splitting it.
*/
Assert(!isNew);
- moveLeafs(index, state, &current, &parent, leafTuple);
+ moveLeafs(index, state, &current, &parent, leafTuple, isnull);
break; /* we're done */
}
else
{
/* picksplit */
if (doPickSplit(index, state, &current, &parent,
- leafTuple, level, isNew))
+ leafTuple, level, isnull, isNew))
break; /* doPickSplit installed new tuples */
/* leaf tuple will not be inserted yet */
@@ -1972,11 +2031,20 @@ spgdoinsert(Relation index, SpGistState *state,
memset(&out, 0, sizeof(out));
- procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC);
- FunctionCall2Coll(procinfo,
- index->rd_indcollation[0],
- PointerGetDatum(&in),
- PointerGetDatum(&out));
+ if (!isnull)
+ {
+ /* use user-defined choose method */
+ procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC);
+ FunctionCall2Coll(procinfo,
+ index->rd_indcollation[0],
+ PointerGetDatum(&in),
+ PointerGetDatum(&out));
+ }
+ else
+ {
+ /* force "match" action (to insert to random subnode) */
+ out.resultType = spgMatchNode;
+ }
if (innerTuple->allTheSame)
{
@@ -2001,9 +2069,12 @@ spgdoinsert(Relation index, SpGistState *state,
/* Adjust level as per opclass request */
level += out.result.matchNode.levelAdd;
/* Replace leafDatum and recompute leafSize */
- leafDatum = out.result.matchNode.restDatum;
- leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
- SpGistGetTypeSize(&state->attType, leafDatum);
+ if (!isnull)
+ {
+ leafDatum = out.result.matchNode.restDatum;
+ leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
+ SpGistGetTypeSize(&state->attType, leafDatum);
+ }
/*
* Loop around and attempt to insert the new leafDatum