diff options
author | Tom Lane <tgl@sss.pgh.pa.us> | 2021-04-05 18:41:09 -0400 |
---|---|---|
committer | Tom Lane <tgl@sss.pgh.pa.us> | 2021-04-05 18:41:21 -0400 |
commit | 09c1c6ab4bc5764dd69c53ccfd43b2060b1fd090 (patch) | |
tree | 0b5eacefe5007d52388f475499b018cccd228c0e /src/backend/access/spgist/spgdoinsert.c | |
parent | 49f49defe7c0a330cca084de5da14ccdfdafc6a3 (diff) | |
download | postgresql-09c1c6ab4bc5764dd69c53ccfd43b2060b1fd090.tar.gz postgresql-09c1c6ab4bc5764dd69c53ccfd43b2060b1fd090.zip |
Support INCLUDE'd columns in SP-GiST.
Not much to say here: does what it says on the tin.
We steal a previously-always-zero bit from the nextOffset
field of leaf index tuples in order to track whether there
is a nulls bitmap. Otherwise it works about like included
columns in other index types.
Pavel Borisov, reviewed by Andrey Borodin and Anastasia Lubennikova,
and rather heavily editorialized on by me
Discussion: https://postgr.es/m/CALT9ZEFi-vMp4faht9f9Junb1nO3NOSjhpxTmbm1UGLMsLqiEQ@mail.gmail.com
Diffstat (limited to 'src/backend/access/spgist/spgdoinsert.c')
-rw-r--r-- | src/backend/access/spgist/spgdoinsert.c | 134 |
1 files changed, 86 insertions, 48 deletions
diff --git a/src/backend/access/spgist/spgdoinsert.c b/src/backend/access/spgist/spgdoinsert.c index e14c71abd07..4d380c99f06 100644 --- a/src/backend/access/spgist/spgdoinsert.c +++ b/src/backend/access/spgist/spgdoinsert.c @@ -220,7 +220,7 @@ addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple, SpGistBlockIsRoot(current->blkno)) { /* Tuple is not part of a chain */ - leafTuple->nextOffset = InvalidOffsetNumber; + SGLT_SET_NEXTOFFSET(leafTuple, InvalidOffsetNumber); current->offnum = SpGistPageAddNewItem(state, current->page, (Item) leafTuple, leafTuple->size, NULL, false); @@ -253,7 +253,7 @@ addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple, PageGetItemId(current->page, current->offnum)); if (head->tupstate == SPGIST_LIVE) { - leafTuple->nextOffset = head->nextOffset; + SGLT_SET_NEXTOFFSET(leafTuple, SGLT_GET_NEXTOFFSET(head)); offnum = SpGistPageAddNewItem(state, current->page, (Item) leafTuple, leafTuple->size, NULL, false); @@ -264,14 +264,14 @@ addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple, */ head = (SpGistLeafTuple) PageGetItem(current->page, PageGetItemId(current->page, current->offnum)); - head->nextOffset = offnum; + SGLT_SET_NEXTOFFSET(head, offnum); xlrec.offnumLeaf = offnum; xlrec.offnumHeadLeaf = current->offnum; } else if (head->tupstate == SPGIST_DEAD) { - leafTuple->nextOffset = InvalidOffsetNumber; + SGLT_SET_NEXTOFFSET(leafTuple, InvalidOffsetNumber); PageIndexTupleDelete(current->page, current->offnum); if (PageAddItem(current->page, (Item) leafTuple, leafTuple->size, @@ -362,13 +362,13 @@ checkSplitConditions(Relation index, SpGistState *state, { /* We could see a DEAD tuple as first/only chain item */ Assert(i == current->offnum); - Assert(it->nextOffset == InvalidOffsetNumber); + Assert(SGLT_GET_NEXTOFFSET(it) == InvalidOffsetNumber); /* Don't count it in result, because it won't go to other page */ } else elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate); - i = it->nextOffset; + i = SGLT_GET_NEXTOFFSET(it); } *nToSplit = n; @@ -437,7 +437,7 @@ moveLeafs(Relation index, SpGistState *state, { /* We could see a DEAD tuple as first/only chain item */ Assert(i == current->offnum); - Assert(it->nextOffset == InvalidOffsetNumber); + Assert(SGLT_GET_NEXTOFFSET(it) == InvalidOffsetNumber); /* We don't want to move it, so don't count it in size */ toDelete[nDelete] = i; nDelete++; @@ -446,7 +446,7 @@ moveLeafs(Relation index, SpGistState *state, else elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate); - i = it->nextOffset; + i = SGLT_GET_NEXTOFFSET(it); } /* Find a leaf page that will hold them */ @@ -475,7 +475,7 @@ moveLeafs(Relation index, SpGistState *state, * don't care). We're modifying the tuple on the source page * here, but it's okay since we're about to delete it. */ - it->nextOffset = r; + SGLT_SET_NEXTOFFSET(it, r); r = SpGistPageAddNewItem(state, npage, (Item) it, it->size, &startOffset, false); @@ -490,7 +490,7 @@ moveLeafs(Relation index, SpGistState *state, } /* add the new tuple as well */ - newLeafTuple->nextOffset = r; + SGLT_SET_NEXTOFFSET(newLeafTuple, r); r = SpGistPageAddNewItem(state, npage, (Item) newLeafTuple, newLeafTuple->size, &startOffset, false); @@ -690,14 +690,16 @@ doPickSplit(Relation index, SpGistState *state, *nodes; Buffer newInnerBuffer, newLeafBuffer; - ItemPointerData *heapPtrs; uint8 *leafPageSelect; int *leafSizes; OffsetNumber *toDelete; OffsetNumber *toInsert; OffsetNumber redirectTuplePos = InvalidOffsetNumber; OffsetNumber startOffsets[2]; + SpGistLeafTuple *oldLeafs; SpGistLeafTuple *newLeafs; + Datum leafDatums[INDEX_MAX_KEYS]; + bool leafIsnulls[INDEX_MAX_KEYS]; int spaceToDelete; int currentFreeSpace; int totalLeafSizes; @@ -718,9 +720,9 @@ doPickSplit(Relation index, SpGistState *state, max = PageGetMaxOffsetNumber(current->page); n = max + 1; in.datums = (Datum *) palloc(sizeof(Datum) * n); - heapPtrs = (ItemPointerData *) palloc(sizeof(ItemPointerData) * n); toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n); toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n); + oldLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n); newLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n); leafPageSelect = (uint8 *) palloc(sizeof(uint8) * n); @@ -752,7 +754,7 @@ doPickSplit(Relation index, SpGistState *state, { in.datums[nToInsert] = isNulls ? (Datum) 0 : SGLTDATUM(it, state); - heapPtrs[nToInsert] = it->heapPtr; + oldLeafs[nToInsert] = it; nToInsert++; toDelete[nToDelete] = i; nToDelete++; @@ -778,7 +780,7 @@ doPickSplit(Relation index, SpGistState *state, { in.datums[nToInsert] = isNulls ? (Datum) 0 : SGLTDATUM(it, state); - heapPtrs[nToInsert] = it->heapPtr; + oldLeafs[nToInsert] = it; nToInsert++; toDelete[nToDelete] = i; nToDelete++; @@ -790,7 +792,7 @@ doPickSplit(Relation index, SpGistState *state, { /* We could see a DEAD tuple as first/only chain item */ Assert(i == current->offnum); - Assert(it->nextOffset == InvalidOffsetNumber); + Assert(SGLT_GET_NEXTOFFSET(it) == InvalidOffsetNumber); toDelete[nToDelete] = i; nToDelete++; /* replacing it with redirect will save no space */ @@ -798,7 +800,7 @@ doPickSplit(Relation index, SpGistState *state, else elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate); - i = it->nextOffset; + i = SGLT_GET_NEXTOFFSET(it); } } in.nTuples = nToInsert; @@ -811,7 +813,7 @@ doPickSplit(Relation index, SpGistState *state, */ in.datums[in.nTuples] = isNulls ? (Datum) 0 : SGLTDATUM(newLeafTuple, state); - heapPtrs[in.nTuples] = newLeafTuple->heapPtr; + oldLeafs[in.nTuples] = newLeafTuple; in.nTuples++; memset(&out, 0, sizeof(out)); @@ -833,9 +835,19 @@ doPickSplit(Relation index, SpGistState *state, totalLeafSizes = 0; for (i = 0; i < in.nTuples; i++) { - newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i, - out.leafTupleDatums[i], - false); + if (state->leafTupDesc->natts > 1) + spgDeformLeafTuple(oldLeafs[i], + state->leafTupDesc, + leafDatums, + leafIsnulls, + isNulls); + + leafDatums[spgKeyColumn] = out.leafTupleDatums[i]; + leafIsnulls[spgKeyColumn] = false; + + newLeafs[i] = spgFormLeafTuple(state, &oldLeafs[i]->heapPtr, + leafDatums, + leafIsnulls); totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData); } } @@ -856,9 +868,22 @@ doPickSplit(Relation index, SpGistState *state, totalLeafSizes = 0; for (i = 0; i < in.nTuples; i++) { - newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i, - (Datum) 0, - true); + if (state->leafTupDesc->natts > 1) + spgDeformLeafTuple(oldLeafs[i], + state->leafTupDesc, + leafDatums, + leafIsnulls, + isNulls); + + /* + * Nulls tree can contain only null key values. + */ + leafDatums[spgKeyColumn] = (Datum) 0; + leafIsnulls[spgKeyColumn] = true; + + newLeafs[i] = spgFormLeafTuple(state, &oldLeafs[i]->heapPtr, + leafDatums, + leafIsnulls); totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData); } } @@ -1192,10 +1217,10 @@ doPickSplit(Relation index, SpGistState *state, if (ItemPointerIsValid(&nodes[n]->t_tid)) { Assert(ItemPointerGetBlockNumber(&nodes[n]->t_tid) == leafBlock); - it->nextOffset = ItemPointerGetOffsetNumber(&nodes[n]->t_tid); + SGLT_SET_NEXTOFFSET(it, ItemPointerGetOffsetNumber(&nodes[n]->t_tid)); } else - it->nextOffset = InvalidOffsetNumber; + SGLT_SET_NEXTOFFSET(it, InvalidOffsetNumber); /* Insert it on page */ newoffset = SpGistPageAddNewItem(state, BufferGetPage(leafBuffer), @@ -1885,10 +1910,12 @@ spgSplitNodeAction(Relation index, SpGistState *state, */ bool spgdoinsert(Relation index, SpGistState *state, - ItemPointer heapPtr, Datum datum, bool isnull) + ItemPointer heapPtr, Datum *datums, bool *isnulls) { + TupleDesc leafDescriptor = state->leafTupDesc; + bool isnull = isnulls[spgKeyColumn]; int level = 0; - Datum leafDatum; + Datum leafDatums[INDEX_MAX_KEYS]; int leafSize; SPPageDesc current, parent; @@ -1905,8 +1932,8 @@ spgdoinsert(Relation index, SpGistState *state, * Prepare the leaf datum to insert. * * If an optional "compress" method is provided, then call it to form the - * leaf datum from the input datum. Otherwise store the input datum as - * is. Since we don't use index_form_tuple in this AM, we have to make + * leaf key datum from the input datum. Otherwise, store the input datum + * as is. Since we don't use index_form_tuple in this AM, we have to make * sure value to be inserted is not toasted; FormIndexDatum doesn't * guarantee that. But we assume the "compress" method to return an * untoasted value. @@ -1918,32 +1945,43 @@ spgdoinsert(Relation index, SpGistState *state, FmgrInfo *compressProcinfo = NULL; compressProcinfo = index_getprocinfo(index, 1, SPGIST_COMPRESS_PROC); - leafDatum = FunctionCall1Coll(compressProcinfo, - index->rd_indcollation[0], - datum); + leafDatums[spgKeyColumn] = + FunctionCall1Coll(compressProcinfo, + index->rd_indcollation[spgKeyColumn], + datums[spgKeyColumn]); } else { Assert(state->attLeafType.type == state->attType.type); if (state->attType.attlen == -1) - leafDatum = PointerGetDatum(PG_DETOAST_DATUM(datum)); + leafDatums[spgKeyColumn] = + PointerGetDatum(PG_DETOAST_DATUM(datums[spgKeyColumn])); else - leafDatum = datum; + leafDatums[spgKeyColumn] = datums[spgKeyColumn]; } } else - leafDatum = (Datum) 0; + leafDatums[spgKeyColumn] = (Datum) 0; + + /* Likewise, ensure that any INCLUDE values are not toasted */ + for (int i = spgFirstIncludeColumn; i < leafDescriptor->natts; i++) + { + if (!isnulls[i]) + { + if (TupleDescAttr(leafDescriptor, i)->attlen == -1) + leafDatums[i] = PointerGetDatum(PG_DETOAST_DATUM(datums[i])); + else + leafDatums[i] = datums[i]; + } + else + leafDatums[i] = (Datum) 0; + } /* - * Compute space needed for a leaf tuple containing the given datum. This - * must match spgFormLeafTuple. + * Compute space needed for a leaf tuple containing the given data. */ - leafSize = SGLTHDRSZ; - if (!isnull) - leafSize += SpGistGetLeafTypeSize(&state->attLeafType, leafDatum); - if (leafSize < SGDTSIZE) - leafSize = SGDTSIZE; + leafSize = SpGistGetLeafTupleSize(leafDescriptor, leafDatums, isnulls); /* Account for an item pointer, too */ leafSize += sizeof(ItemIdData); @@ -2048,7 +2086,7 @@ spgdoinsert(Relation index, SpGistState *state, int nToSplit, sizeToSplit; - leafTuple = spgFormLeafTuple(state, heapPtr, leafDatum, isnull); + leafTuple = spgFormLeafTuple(state, heapPtr, leafDatums, isnulls); if (leafTuple->size + sizeof(ItemIdData) <= SpGistPageGetFreeSpace(current.page, 1)) { @@ -2110,8 +2148,8 @@ spgdoinsert(Relation index, SpGistState *state, innerTuple = (SpGistInnerTuple) PageGetItem(current.page, PageGetItemId(current.page, current.offnum)); - in.datum = datum; - in.leafDatum = leafDatum; + in.datum = datums[spgKeyColumn]; + in.leafDatum = leafDatums[spgKeyColumn]; in.level = level; in.allTheSame = innerTuple->allTheSame; in.hasPrefix = (innerTuple->prefixSize > 0); @@ -2160,9 +2198,9 @@ spgdoinsert(Relation index, SpGistState *state, /* Replace leafDatum and recompute leafSize */ if (!isnull) { - leafDatum = out.result.matchNode.restDatum; - leafSize = SGLTHDRSZ + - SpGistGetLeafTypeSize(&state->attLeafType, leafDatum); + leafDatums[spgKeyColumn] = out.result.matchNode.restDatum; + leafSize = SpGistGetLeafTupleSize(leafDescriptor, + leafDatums, isnulls); leafSize += sizeof(ItemIdData); } |