aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/spgist/spgdoinsert.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/spgist/spgdoinsert.c')
-rw-r--r--src/backend/access/spgist/spgdoinsert.c134
1 files changed, 86 insertions, 48 deletions
diff --git a/src/backend/access/spgist/spgdoinsert.c b/src/backend/access/spgist/spgdoinsert.c
index e14c71abd07..4d380c99f06 100644
--- a/src/backend/access/spgist/spgdoinsert.c
+++ b/src/backend/access/spgist/spgdoinsert.c
@@ -220,7 +220,7 @@ addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
SpGistBlockIsRoot(current->blkno))
{
/* Tuple is not part of a chain */
- leafTuple->nextOffset = InvalidOffsetNumber;
+ SGLT_SET_NEXTOFFSET(leafTuple, InvalidOffsetNumber);
current->offnum = SpGistPageAddNewItem(state, current->page,
(Item) leafTuple, leafTuple->size,
NULL, false);
@@ -253,7 +253,7 @@ addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
PageGetItemId(current->page, current->offnum));
if (head->tupstate == SPGIST_LIVE)
{
- leafTuple->nextOffset = head->nextOffset;
+ SGLT_SET_NEXTOFFSET(leafTuple, SGLT_GET_NEXTOFFSET(head));
offnum = SpGistPageAddNewItem(state, current->page,
(Item) leafTuple, leafTuple->size,
NULL, false);
@@ -264,14 +264,14 @@ addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
*/
head = (SpGistLeafTuple) PageGetItem(current->page,
PageGetItemId(current->page, current->offnum));
- head->nextOffset = offnum;
+ SGLT_SET_NEXTOFFSET(head, offnum);
xlrec.offnumLeaf = offnum;
xlrec.offnumHeadLeaf = current->offnum;
}
else if (head->tupstate == SPGIST_DEAD)
{
- leafTuple->nextOffset = InvalidOffsetNumber;
+ SGLT_SET_NEXTOFFSET(leafTuple, InvalidOffsetNumber);
PageIndexTupleDelete(current->page, current->offnum);
if (PageAddItem(current->page,
(Item) leafTuple, leafTuple->size,
@@ -362,13 +362,13 @@ checkSplitConditions(Relation index, SpGistState *state,
{
/* We could see a DEAD tuple as first/only chain item */
Assert(i == current->offnum);
- Assert(it->nextOffset == InvalidOffsetNumber);
+ Assert(SGLT_GET_NEXTOFFSET(it) == InvalidOffsetNumber);
/* Don't count it in result, because it won't go to other page */
}
else
elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
- i = it->nextOffset;
+ i = SGLT_GET_NEXTOFFSET(it);
}
*nToSplit = n;
@@ -437,7 +437,7 @@ moveLeafs(Relation index, SpGistState *state,
{
/* We could see a DEAD tuple as first/only chain item */
Assert(i == current->offnum);
- Assert(it->nextOffset == InvalidOffsetNumber);
+ Assert(SGLT_GET_NEXTOFFSET(it) == InvalidOffsetNumber);
/* We don't want to move it, so don't count it in size */
toDelete[nDelete] = i;
nDelete++;
@@ -446,7 +446,7 @@ moveLeafs(Relation index, SpGistState *state,
else
elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
- i = it->nextOffset;
+ i = SGLT_GET_NEXTOFFSET(it);
}
/* Find a leaf page that will hold them */
@@ -475,7 +475,7 @@ moveLeafs(Relation index, SpGistState *state,
* don't care). We're modifying the tuple on the source page
* here, but it's okay since we're about to delete it.
*/
- it->nextOffset = r;
+ SGLT_SET_NEXTOFFSET(it, r);
r = SpGistPageAddNewItem(state, npage, (Item) it, it->size,
&startOffset, false);
@@ -490,7 +490,7 @@ moveLeafs(Relation index, SpGistState *state,
}
/* add the new tuple as well */
- newLeafTuple->nextOffset = r;
+ SGLT_SET_NEXTOFFSET(newLeafTuple, r);
r = SpGistPageAddNewItem(state, npage,
(Item) newLeafTuple, newLeafTuple->size,
&startOffset, false);
@@ -690,14 +690,16 @@ doPickSplit(Relation index, SpGistState *state,
*nodes;
Buffer newInnerBuffer,
newLeafBuffer;
- ItemPointerData *heapPtrs;
uint8 *leafPageSelect;
int *leafSizes;
OffsetNumber *toDelete;
OffsetNumber *toInsert;
OffsetNumber redirectTuplePos = InvalidOffsetNumber;
OffsetNumber startOffsets[2];
+ SpGistLeafTuple *oldLeafs;
SpGistLeafTuple *newLeafs;
+ Datum leafDatums[INDEX_MAX_KEYS];
+ bool leafIsnulls[INDEX_MAX_KEYS];
int spaceToDelete;
int currentFreeSpace;
int totalLeafSizes;
@@ -718,9 +720,9 @@ doPickSplit(Relation index, SpGistState *state,
max = PageGetMaxOffsetNumber(current->page);
n = max + 1;
in.datums = (Datum *) palloc(sizeof(Datum) * n);
- heapPtrs = (ItemPointerData *) palloc(sizeof(ItemPointerData) * n);
toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
+ oldLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n);
newLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n);
leafPageSelect = (uint8 *) palloc(sizeof(uint8) * n);
@@ -752,7 +754,7 @@ doPickSplit(Relation index, SpGistState *state,
{
in.datums[nToInsert] =
isNulls ? (Datum) 0 : SGLTDATUM(it, state);
- heapPtrs[nToInsert] = it->heapPtr;
+ oldLeafs[nToInsert] = it;
nToInsert++;
toDelete[nToDelete] = i;
nToDelete++;
@@ -778,7 +780,7 @@ doPickSplit(Relation index, SpGistState *state,
{
in.datums[nToInsert] =
isNulls ? (Datum) 0 : SGLTDATUM(it, state);
- heapPtrs[nToInsert] = it->heapPtr;
+ oldLeafs[nToInsert] = it;
nToInsert++;
toDelete[nToDelete] = i;
nToDelete++;
@@ -790,7 +792,7 @@ doPickSplit(Relation index, SpGistState *state,
{
/* We could see a DEAD tuple as first/only chain item */
Assert(i == current->offnum);
- Assert(it->nextOffset == InvalidOffsetNumber);
+ Assert(SGLT_GET_NEXTOFFSET(it) == InvalidOffsetNumber);
toDelete[nToDelete] = i;
nToDelete++;
/* replacing it with redirect will save no space */
@@ -798,7 +800,7 @@ doPickSplit(Relation index, SpGistState *state,
else
elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
- i = it->nextOffset;
+ i = SGLT_GET_NEXTOFFSET(it);
}
}
in.nTuples = nToInsert;
@@ -811,7 +813,7 @@ doPickSplit(Relation index, SpGistState *state,
*/
in.datums[in.nTuples] =
isNulls ? (Datum) 0 : SGLTDATUM(newLeafTuple, state);
- heapPtrs[in.nTuples] = newLeafTuple->heapPtr;
+ oldLeafs[in.nTuples] = newLeafTuple;
in.nTuples++;
memset(&out, 0, sizeof(out));
@@ -833,9 +835,19 @@ doPickSplit(Relation index, SpGistState *state,
totalLeafSizes = 0;
for (i = 0; i < in.nTuples; i++)
{
- newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
- out.leafTupleDatums[i],
- false);
+ if (state->leafTupDesc->natts > 1)
+ spgDeformLeafTuple(oldLeafs[i],
+ state->leafTupDesc,
+ leafDatums,
+ leafIsnulls,
+ isNulls);
+
+ leafDatums[spgKeyColumn] = out.leafTupleDatums[i];
+ leafIsnulls[spgKeyColumn] = false;
+
+ newLeafs[i] = spgFormLeafTuple(state, &oldLeafs[i]->heapPtr,
+ leafDatums,
+ leafIsnulls);
totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
}
}
@@ -856,9 +868,22 @@ doPickSplit(Relation index, SpGistState *state,
totalLeafSizes = 0;
for (i = 0; i < in.nTuples; i++)
{
- newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
- (Datum) 0,
- true);
+ if (state->leafTupDesc->natts > 1)
+ spgDeformLeafTuple(oldLeafs[i],
+ state->leafTupDesc,
+ leafDatums,
+ leafIsnulls,
+ isNulls);
+
+ /*
+ * Nulls tree can contain only null key values.
+ */
+ leafDatums[spgKeyColumn] = (Datum) 0;
+ leafIsnulls[spgKeyColumn] = true;
+
+ newLeafs[i] = spgFormLeafTuple(state, &oldLeafs[i]->heapPtr,
+ leafDatums,
+ leafIsnulls);
totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
}
}
@@ -1192,10 +1217,10 @@ doPickSplit(Relation index, SpGistState *state,
if (ItemPointerIsValid(&nodes[n]->t_tid))
{
Assert(ItemPointerGetBlockNumber(&nodes[n]->t_tid) == leafBlock);
- it->nextOffset = ItemPointerGetOffsetNumber(&nodes[n]->t_tid);
+ SGLT_SET_NEXTOFFSET(it, ItemPointerGetOffsetNumber(&nodes[n]->t_tid));
}
else
- it->nextOffset = InvalidOffsetNumber;
+ SGLT_SET_NEXTOFFSET(it, InvalidOffsetNumber);
/* Insert it on page */
newoffset = SpGistPageAddNewItem(state, BufferGetPage(leafBuffer),
@@ -1885,10 +1910,12 @@ spgSplitNodeAction(Relation index, SpGistState *state,
*/
bool
spgdoinsert(Relation index, SpGistState *state,
- ItemPointer heapPtr, Datum datum, bool isnull)
+ ItemPointer heapPtr, Datum *datums, bool *isnulls)
{
+ TupleDesc leafDescriptor = state->leafTupDesc;
+ bool isnull = isnulls[spgKeyColumn];
int level = 0;
- Datum leafDatum;
+ Datum leafDatums[INDEX_MAX_KEYS];
int leafSize;
SPPageDesc current,
parent;
@@ -1905,8 +1932,8 @@ spgdoinsert(Relation index, SpGistState *state,
* Prepare the leaf datum to insert.
*
* If an optional "compress" method is provided, then call it to form the
- * leaf datum from the input datum. Otherwise store the input datum as
- * is. Since we don't use index_form_tuple in this AM, we have to make
+ * leaf key datum from the input datum. Otherwise, store the input datum
+ * as is. Since we don't use index_form_tuple in this AM, we have to make
* sure value to be inserted is not toasted; FormIndexDatum doesn't
* guarantee that. But we assume the "compress" method to return an
* untoasted value.
@@ -1918,32 +1945,43 @@ spgdoinsert(Relation index, SpGistState *state,
FmgrInfo *compressProcinfo = NULL;
compressProcinfo = index_getprocinfo(index, 1, SPGIST_COMPRESS_PROC);
- leafDatum = FunctionCall1Coll(compressProcinfo,
- index->rd_indcollation[0],
- datum);
+ leafDatums[spgKeyColumn] =
+ FunctionCall1Coll(compressProcinfo,
+ index->rd_indcollation[spgKeyColumn],
+ datums[spgKeyColumn]);
}
else
{
Assert(state->attLeafType.type == state->attType.type);
if (state->attType.attlen == -1)
- leafDatum = PointerGetDatum(PG_DETOAST_DATUM(datum));
+ leafDatums[spgKeyColumn] =
+ PointerGetDatum(PG_DETOAST_DATUM(datums[spgKeyColumn]));
else
- leafDatum = datum;
+ leafDatums[spgKeyColumn] = datums[spgKeyColumn];
}
}
else
- leafDatum = (Datum) 0;
+ leafDatums[spgKeyColumn] = (Datum) 0;
+
+ /* Likewise, ensure that any INCLUDE values are not toasted */
+ for (int i = spgFirstIncludeColumn; i < leafDescriptor->natts; i++)
+ {
+ if (!isnulls[i])
+ {
+ if (TupleDescAttr(leafDescriptor, i)->attlen == -1)
+ leafDatums[i] = PointerGetDatum(PG_DETOAST_DATUM(datums[i]));
+ else
+ leafDatums[i] = datums[i];
+ }
+ else
+ leafDatums[i] = (Datum) 0;
+ }
/*
- * Compute space needed for a leaf tuple containing the given datum. This
- * must match spgFormLeafTuple.
+ * Compute space needed for a leaf tuple containing the given data.
*/
- leafSize = SGLTHDRSZ;
- if (!isnull)
- leafSize += SpGistGetLeafTypeSize(&state->attLeafType, leafDatum);
- if (leafSize < SGDTSIZE)
- leafSize = SGDTSIZE;
+ leafSize = SpGistGetLeafTupleSize(leafDescriptor, leafDatums, isnulls);
/* Account for an item pointer, too */
leafSize += sizeof(ItemIdData);
@@ -2048,7 +2086,7 @@ spgdoinsert(Relation index, SpGistState *state,
int nToSplit,
sizeToSplit;
- leafTuple = spgFormLeafTuple(state, heapPtr, leafDatum, isnull);
+ leafTuple = spgFormLeafTuple(state, heapPtr, leafDatums, isnulls);
if (leafTuple->size + sizeof(ItemIdData) <=
SpGistPageGetFreeSpace(current.page, 1))
{
@@ -2110,8 +2148,8 @@ spgdoinsert(Relation index, SpGistState *state,
innerTuple = (SpGistInnerTuple) PageGetItem(current.page,
PageGetItemId(current.page, current.offnum));
- in.datum = datum;
- in.leafDatum = leafDatum;
+ in.datum = datums[spgKeyColumn];
+ in.leafDatum = leafDatums[spgKeyColumn];
in.level = level;
in.allTheSame = innerTuple->allTheSame;
in.hasPrefix = (innerTuple->prefixSize > 0);
@@ -2160,9 +2198,9 @@ spgdoinsert(Relation index, SpGistState *state,
/* Replace leafDatum and recompute leafSize */
if (!isnull)
{
- leafDatum = out.result.matchNode.restDatum;
- leafSize = SGLTHDRSZ +
- SpGistGetLeafTypeSize(&state->attLeafType, leafDatum);
+ leafDatums[spgKeyColumn] = out.result.matchNode.restDatum;
+ leafSize = SpGistGetLeafTupleSize(leafDescriptor,
+ leafDatums, isnulls);
leafSize += sizeof(ItemIdData);
}