diff options
Diffstat (limited to 'src/backend/access/spgist/spgdoinsert.c')
-rw-r--r-- | src/backend/access/spgist/spgdoinsert.c | 191 |
1 files changed, 131 insertions, 60 deletions
diff --git a/src/backend/access/spgist/spgdoinsert.c b/src/backend/access/spgist/spgdoinsert.c index 85704762a6f..5ddb6672c5c 100644 --- a/src/backend/access/spgist/spgdoinsert.c +++ b/src/backend/access/spgist/spgdoinsert.c @@ -200,7 +200,7 @@ saveNodeLink(Relation index, SPPageDesc *parent, */ static void addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple, - SPPageDesc *current, SPPageDesc *parent, bool isNew) + SPPageDesc *current, SPPageDesc *parent, bool isNulls, bool isNew) { XLogRecData rdata[4]; spgxlogAddLeaf xlrec; @@ -208,6 +208,7 @@ addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple, xlrec.node = index->rd_node; xlrec.blknoLeaf = current->blkno; xlrec.newPage = isNew; + xlrec.storesNulls = isNulls; /* these will be filled below as needed */ xlrec.offnumLeaf = InvalidOffsetNumber; @@ -224,7 +225,7 @@ addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple, START_CRIT_SECTION(); if (current->offnum == InvalidOffsetNumber || - current->blkno == SPGIST_HEAD_BLKNO) + SpGistBlockIsRoot(current->blkno)) { /* Tuple is not part of a chain */ leafTuple->nextOffset = InvalidOffsetNumber; @@ -337,7 +338,7 @@ checkSplitConditions(Relation index, SpGistState *state, n = 0, totalSize = 0; - if (current->blkno == SPGIST_HEAD_BLKNO) + if (SpGistBlockIsRoot(current->blkno)) { /* return impossible values to force split */ *nToSplit = BLCKSZ; @@ -386,7 +387,7 @@ checkSplitConditions(Relation index, SpGistState *state, static void moveLeafs(Relation index, SpGistState *state, SPPageDesc *current, SPPageDesc *parent, - SpGistLeafTuple newLeafTuple) + SpGistLeafTuple newLeafTuple, bool isNulls) { int i, nDelete, @@ -451,7 +452,8 @@ moveLeafs(Relation index, SpGistState *state, } /* Find a leaf page that will hold them */ - nbuf = SpGistGetBuffer(index, GBUF_LEAF, size, &xlrec.newPage); + nbuf = SpGistGetBuffer(index, GBUF_LEAF | (isNulls ? GBUF_NULLS : 0), + size, &xlrec.newPage); npage = BufferGetPage(nbuf); nblkno = BufferGetBlockNumber(nbuf); Assert(nblkno != current->blkno); @@ -464,6 +466,7 @@ moveLeafs(Relation index, SpGistState *state, xlrec.blknoDst = nblkno; xlrec.nMoves = nDelete; xlrec.replaceDead = replaceDead; + xlrec.storesNulls = isNulls; xlrec.blknoParent = parent->blkno; xlrec.offnumParent = parent->offnum; @@ -584,6 +587,8 @@ setRedirectionTuple(SPPageDesc *current, OffsetNumber position, * If so, randomly divide the tuples into several nodes (all with the same * label) and return TRUE to select allTheSame mode for this inner tuple. * + * (This code is also used to forcibly select allTheSame mode for nulls.) + * * If we know that the leaf tuples wouldn't all fit on one page, then we * exclude the last tuple (which is the incoming new tuple that forced a split) * from the check to see if more than one node is used. The reason for this @@ -674,7 +679,8 @@ checkAllTheSame(spgPickSplitIn *in, spgPickSplitOut *out, bool tooBig, static bool doPickSplit(Relation index, SpGistState *state, SPPageDesc *current, SPPageDesc *parent, - SpGistLeafTuple newLeafTuple, int level, bool isNew) + SpGistLeafTuple newLeafTuple, + int level, bool isNulls, bool isNew) { bool insertedNew = false; spgPickSplitIn in; @@ -733,11 +739,18 @@ doPickSplit(Relation index, SpGistState *state, * also, count up the amount of space that will be freed from current. * (Note that in the non-root case, we won't actually delete the old * tuples, only replace them with redirects or placeholders.) + * + * Note: the SGLTDATUM calls here are safe even when dealing with a nulls + * page. For a pass-by-value data type we will fetch a word that must + * exist even though it may contain garbage (because of the fact that leaf + * tuples must have size at least SGDTSIZE). For a pass-by-reference type + * we are just computing a pointer that isn't going to get dereferenced. + * So it's not worth guarding the calls with isNulls checks. */ nToInsert = 0; nToDelete = 0; spaceToDelete = 0; - if (current->blkno == SPGIST_HEAD_BLKNO) + if (SpGistBlockIsRoot(current->blkno)) { /* * We are splitting the root (which up to now is also a leaf page). @@ -813,26 +826,53 @@ doPickSplit(Relation index, SpGistState *state, heapPtrs[in.nTuples] = newLeafTuple->heapPtr; in.nTuples++; - /* - * Perform split using user-defined method. - */ memset(&out, 0, sizeof(out)); - procinfo = index_getprocinfo(index, 1, SPGIST_PICKSPLIT_PROC); - FunctionCall2Coll(procinfo, - index->rd_indcollation[0], - PointerGetDatum(&in), - PointerGetDatum(&out)); + if (!isNulls) + { + /* + * Perform split using user-defined method. + */ + procinfo = index_getprocinfo(index, 1, SPGIST_PICKSPLIT_PROC); + FunctionCall2Coll(procinfo, + index->rd_indcollation[0], + PointerGetDatum(&in), + PointerGetDatum(&out)); - /* - * Form new leaf tuples and count up the total space needed. - */ - totalLeafSizes = 0; - for (i = 0; i < in.nTuples; i++) + /* + * Form new leaf tuples and count up the total space needed. + */ + totalLeafSizes = 0; + for (i = 0; i < in.nTuples; i++) + { + newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i, + out.leafTupleDatums[i], + false); + totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData); + } + } + else { - newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i, - out.leafTupleDatums[i]); - totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData); + /* + * Perform dummy split that puts all tuples into one node. + * checkAllTheSame will override this and force allTheSame mode. + */ + out.hasPrefix = false; + out.nNodes = 1; + out.nodeLabels = NULL; + out.mapTuplesToNodes = palloc0(sizeof(int) * in.nTuples); + + /* + * Form new leaf tuples and count up the total space needed. + */ + totalLeafSizes = 0; + for (i = 0; i < in.nTuples; i++) + { + newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i, + (Datum) 0, + true); + totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData); + } } /* @@ -872,11 +912,11 @@ doPickSplit(Relation index, SpGistState *state, for (i = 0; i < out.nNodes; i++) { Datum label = (Datum) 0; - bool isnull = (out.nodeLabels == NULL); + bool labelisnull = (out.nodeLabels == NULL); - if (!isnull) + if (!labelisnull) label = out.nodeLabels[i]; - nodes[i] = spgFormNodeTuple(state, label, isnull); + nodes[i] = spgFormNodeTuple(state, label, labelisnull); } innerTuple = spgFormInnerTuple(state, out.hasPrefix, out.prefixDatum, @@ -914,7 +954,7 @@ doPickSplit(Relation index, SpGistState *state, */ xlrec.initInner = false; if (parent->buffer != InvalidBuffer && - parent->blkno != SPGIST_HEAD_BLKNO && + !SpGistBlockIsRoot(parent->blkno) && (SpGistPageGetFreeSpace(parent->page, 1) >= innerTuple->size + sizeof(ItemIdData))) { @@ -925,7 +965,8 @@ doPickSplit(Relation index, SpGistState *state, { /* Send tuple to page with next triple parity (see README) */ newInnerBuffer = SpGistGetBuffer(index, - GBUF_INNER_PARITY(parent->blkno + 1), + GBUF_INNER_PARITY(parent->blkno + 1) | + (isNulls ? GBUF_NULLS : 0), innerTuple->size + sizeof(ItemIdData), &xlrec.initInner); } @@ -935,7 +976,7 @@ doPickSplit(Relation index, SpGistState *state, newInnerBuffer = InvalidBuffer; } - /*---------- + /* * Because a WAL record can't involve more than four buffers, we can * only afford to deal with two leaf pages in each picksplit action, * ie the current page and at most one other. @@ -956,9 +997,8 @@ doPickSplit(Relation index, SpGistState *state, * If we are splitting the root page (turning it from a leaf page into an * inner page), then no leaf tuples can go back to the current page; they * must all go somewhere else. - *---------- */ - if (current->blkno != SPGIST_HEAD_BLKNO) + if (!SpGistBlockIsRoot(current->blkno)) currentFreeSpace = PageGetExactFreeSpace(current->page) + spaceToDelete; else currentFreeSpace = 0; /* prevent assigning any tuples to current */ @@ -996,7 +1036,8 @@ doPickSplit(Relation index, SpGistState *state, int curspace; int newspace; - newLeafBuffer = SpGistGetBuffer(index, GBUF_LEAF, + newLeafBuffer = SpGistGetBuffer(index, + GBUF_LEAF | (isNulls ? GBUF_NULLS : 0), Min(totalLeafSizes, SPGIST_PAGE_CAPACITY), &xlrec.initDest); @@ -1076,6 +1117,7 @@ doPickSplit(Relation index, SpGistState *state, xlrec.blknoDest = InvalidBlockNumber; xlrec.nDelete = 0; xlrec.initSrc = isNew; + xlrec.storesNulls = isNulls; leafdata = leafptr = (char *) palloc(totalLeafSizes); @@ -1091,7 +1133,7 @@ doPickSplit(Relation index, SpGistState *state, * the root; in that case there's no need because we'll re-init the page * below. We do this first to make room for reinserting new leaf tuples. */ - if (current->blkno != SPGIST_HEAD_BLKNO) + if (!SpGistBlockIsRoot(current->blkno)) { /* * Init buffer instead of deleting individual tuples, but only if @@ -1102,7 +1144,8 @@ doPickSplit(Relation index, SpGistState *state, nToDelete + SpGistPageGetOpaque(current->page)->nPlaceholder == PageGetMaxOffsetNumber(current->page)) { - SpGistInitBuffer(current->buffer, SPGIST_LEAF); + SpGistInitBuffer(current->buffer, + SPGIST_LEAF | (isNulls ? SPGIST_NULLS : 0)); xlrec.initSrc = true; } else if (isNew) @@ -1317,10 +1360,10 @@ doPickSplit(Relation index, SpGistState *state, * Splitting root page, which was a leaf but now becomes inner page * (and so "current" continues to point at it) */ - Assert(current->blkno == SPGIST_HEAD_BLKNO); + Assert(SpGistBlockIsRoot(current->blkno)); Assert(redirectTuplePos == InvalidOffsetNumber); - SpGistInitBuffer(current->buffer, 0); + SpGistInitBuffer(current->buffer, (isNulls ? SPGIST_NULLS : 0)); xlrec.initInner = true; xlrec.blknoInner = current->blkno; @@ -1461,6 +1504,9 @@ spgAddNodeAction(Relation index, SpGistState *state, XLogRecData rdata[5]; spgxlogAddNode xlrec; + /* Should not be applied to nulls */ + Assert(!SpGistPageStoresNulls(current->page)); + /* Construct new inner tuple with additional node */ newInnerTuple = addNode(state, innerTuple, nodeLabel, nodeN); @@ -1527,7 +1573,7 @@ spgAddNodeAction(Relation index, SpGistState *state, * allow only one inner tuple on the root page, and spgFormInnerTuple * always checks that inner tuples don't exceed the size of a page. */ - if (current->blkno == SPGIST_HEAD_BLKNO) + if (SpGistBlockIsRoot(current->blkno)) elog(ERROR, "cannot enlarge root tuple any more"); Assert(parent->buffer != InvalidBuffer); @@ -1657,6 +1703,9 @@ spgSplitNodeAction(Relation index, SpGistState *state, spgxlogSplitTuple xlrec; Buffer newBuffer = InvalidBuffer; + /* Should not be applied to nulls */ + Assert(!SpGistPageStoresNulls(current->page)); + /* * Construct new prefix tuple, containing a single node with the * specified label. (We'll update the node's downlink to point to the @@ -1709,7 +1758,7 @@ spgSplitNodeAction(Relation index, SpGistState *state, * For the space calculation, note that prefixTuple replaces innerTuple * but postfixTuple will be a new entry. */ - if (current->blkno == SPGIST_HEAD_BLKNO || + if (SpGistBlockIsRoot(current->blkno) || SpGistPageGetFreeSpace(current->page, 1) + innerTuple->size < prefixTuple->size + postfixTuple->size + sizeof(ItemIdData)) { @@ -1804,7 +1853,7 @@ spgSplitNodeAction(Relation index, SpGistState *state, */ void spgdoinsert(Relation index, SpGistState *state, - ItemPointer heapPtr, Datum datum) + ItemPointer heapPtr, Datum datum, bool isnull) { int level = 0; Datum leafDatum; @@ -1817,7 +1866,7 @@ spgdoinsert(Relation index, SpGistState *state, * value to be inserted is not toasted; FormIndexDatum doesn't guarantee * that. */ - if (state->attType.attlen == -1) + if (!isnull && state->attType.attlen == -1) datum = PointerGetDatum(PG_DETOAST_DATUM(datum)); leafDatum = datum; @@ -1828,8 +1877,11 @@ spgdoinsert(Relation index, SpGistState *state, * If it isn't gonna fit, and the opclass can't reduce the datum size by * suffixing, bail out now rather than getting into an endless loop. */ - leafSize = SGLTHDRSZ + sizeof(ItemIdData) + - SpGistGetTypeSize(&state->attType, leafDatum); + if (!isnull) + leafSize = SGLTHDRSZ + sizeof(ItemIdData) + + SpGistGetTypeSize(&state->attType, leafDatum); + else + leafSize = SGDTSIZE + sizeof(ItemIdData); if (leafSize > SPGIST_PAGE_CAPACITY && !state->config.longValuesOK) ereport(ERROR, @@ -1840,8 +1892,8 @@ spgdoinsert(Relation index, SpGistState *state, RelationGetRelationName(index)), errhint("Values larger than a buffer page cannot be indexed."))); - /* Initialize "current" to the root page */ - current.blkno = SPGIST_HEAD_BLKNO; + /* Initialize "current" to the appropriate root page */ + current.blkno = isnull ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO; current.buffer = InvalidBuffer; current.page = NULL; current.offnum = FirstOffsetNumber; @@ -1873,10 +1925,11 @@ spgdoinsert(Relation index, SpGistState *state, * for doPickSplit to always have a leaf page at hand; so just * quietly limit our request to a page size. */ - current.buffer = SpGistGetBuffer(index, GBUF_LEAF, - Min(leafSize, - SPGIST_PAGE_CAPACITY), - &isNew); + current.buffer = + SpGistGetBuffer(index, + GBUF_LEAF | (isnull ? GBUF_NULLS : 0), + Min(leafSize, SPGIST_PAGE_CAPACITY), + &isNew); current.blkno = BufferGetBlockNumber(current.buffer); } else if (parent.buffer == InvalidBuffer || @@ -1892,19 +1945,25 @@ spgdoinsert(Relation index, SpGistState *state, } current.page = BufferGetPage(current.buffer); + /* should not arrive at a page of the wrong type */ + if (isnull ? !SpGistPageStoresNulls(current.page) : + SpGistPageStoresNulls(current.page)) + elog(ERROR, "SPGiST index page %u has wrong nulls flag", + current.blkno); + if (SpGistPageIsLeaf(current.page)) { SpGistLeafTuple leafTuple; int nToSplit, sizeToSplit; - leafTuple = spgFormLeafTuple(state, heapPtr, leafDatum); + leafTuple = spgFormLeafTuple(state, heapPtr, leafDatum, isnull); if (leafTuple->size + sizeof(ItemIdData) <= SpGistPageGetFreeSpace(current.page, 1)) { /* it fits on page, so insert it and we're done */ addLeafTuple(index, state, leafTuple, - ¤t, &parent, isNew); + ¤t, &parent, isnull, isNew); break; } else if ((sizeToSplit = @@ -1918,14 +1977,14 @@ spgdoinsert(Relation index, SpGistState *state, * chain to another leaf page rather than splitting it. */ Assert(!isNew); - moveLeafs(index, state, ¤t, &parent, leafTuple); + moveLeafs(index, state, ¤t, &parent, leafTuple, isnull); break; /* we're done */ } else { /* picksplit */ if (doPickSplit(index, state, ¤t, &parent, - leafTuple, level, isNew)) + leafTuple, level, isnull, isNew)) break; /* doPickSplit installed new tuples */ /* leaf tuple will not be inserted yet */ @@ -1972,11 +2031,20 @@ spgdoinsert(Relation index, SpGistState *state, memset(&out, 0, sizeof(out)); - procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC); - FunctionCall2Coll(procinfo, - index->rd_indcollation[0], - PointerGetDatum(&in), - PointerGetDatum(&out)); + if (!isnull) + { + /* use user-defined choose method */ + procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC); + FunctionCall2Coll(procinfo, + index->rd_indcollation[0], + PointerGetDatum(&in), + PointerGetDatum(&out)); + } + else + { + /* force "match" action (to insert to random subnode) */ + out.resultType = spgMatchNode; + } if (innerTuple->allTheSame) { @@ -2001,9 +2069,12 @@ spgdoinsert(Relation index, SpGistState *state, /* Adjust level as per opclass request */ level += out.result.matchNode.levelAdd; /* Replace leafDatum and recompute leafSize */ - leafDatum = out.result.matchNode.restDatum; - leafSize = SGLTHDRSZ + sizeof(ItemIdData) + - SpGistGetTypeSize(&state->attType, leafDatum); + if (!isnull) + { + leafDatum = out.result.matchNode.restDatum; + leafSize = SGLTHDRSZ + sizeof(ItemIdData) + + SpGistGetTypeSize(&state->attType, leafDatum); + } /* * Loop around and attempt to insert the new leafDatum |