aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVadim B. Mikheev <vadim4o@yahoo.com>1997-03-24 08:48:16 +0000
committerVadim B. Mikheev <vadim4o@yahoo.com>1997-03-24 08:48:16 +0000
commit14f6b387b11841b18db12dd89983c4b98dd38ee4 (patch)
tree68c8a8b6e50f83d6e8d8031b99fb1786723c404c
parent427a87911d5cfc008caae84bb9d082a0aa05f021 (diff)
downloadpostgresql-14f6b387b11841b18db12dd89983c4b98dd38ee4.tar.gz
postgresql-14f6b387b11841b18db12dd89983c4b98dd38ee4.zip
+ NULLs handling
Actually required by multi-column indices support. We still don't use btree for 'A is (not) null', but now btree keep items with NULL attrs using single rule for placing/finding items on pages: NULLs greater NOT_NULLs and NULL = NULL. + Bulkload code (nbtsort.c) support for multi-column indices building and NULLs. + Fix for btendscan()->pfree(scanopaque) from Chris Dunlop.
-rw-r--r--src/backend/access/nbtree/nbtinsert.c120
-rw-r--r--src/backend/access/nbtree/nbtree.c28
-rw-r--r--src/backend/access/nbtree/nbtsearch.c131
-rw-r--r--src/backend/access/nbtree/nbtsort.c117
-rw-r--r--src/backend/access/nbtree/nbtutils.c143
5 files changed, 432 insertions, 107 deletions
diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c
index 89a80c23970..06c54a456dc 100644
--- a/src/backend/access/nbtree/nbtinsert.c
+++ b/src/backend/access/nbtree/nbtinsert.c
@@ -7,7 +7,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.10 1997/01/25 21:08:09 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.11 1997/03/24 08:48:09 vadim Exp $
*
*-------------------------------------------------------------------------
*/
@@ -19,6 +19,7 @@
#include <access/nbtree.h>
#include <access/heapam.h>
#include <storage/bufmgr.h>
+#include <fmgr.h>
#ifndef HAVE_MEMMOVE
# include <regex/utils.h>
@@ -33,6 +34,7 @@ static void _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);
static OffsetNumber _bt_pgaddtup(Relation rel, Buffer buf, int keysz, ScanKey itup_scankey, Size itemsize, BTItem btitem, BTItem afteritem);
static bool _bt_goesonpg(Relation rel, Buffer buf, Size keysz, ScanKey scankey, BTItem afteritem);
static void _bt_updateitem(Relation rel, Size keysz, Buffer buf, Oid bti_oid, BTItem newItem);
+static bool _bt_isequal (TupleDesc itupdesc, Page page, OffsetNumber offnum, int keysz, ScanKey scankey);
/*
* _bt_doinsert() -- Handle insertion of a single btitem in the tree.
@@ -104,8 +106,16 @@ _bt_doinsert(Relation rel, BTItem btitem, bool index_is_unique, Relation heapRel
itupdesc = RelationGetTupleDescriptor(rel);
nbuf = InvalidBuffer;
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+ /*
+ * _bt_compare returns 0 for (1,NULL) and (1,NULL) -
+ * this's how we handling NULLs - and so we must not use
+ * _bt_compare in real comparison, but only for
+ * ordering/finding items on pages. - vadim 03/24/97
+
while ( !_bt_compare (rel, itupdesc, page,
natts, itup_scankey, offset) )
+ */
+ while ( _bt_isequal (itupdesc, page, offset, natts, itup_scankey) )
{ /* they're equal */
btitem = (BTItem) PageGetItem(page, PageGetItemId(page, offset));
itup = &(btitem->bti_itup);
@@ -123,8 +133,8 @@ _bt_doinsert(Relation rel, BTItem btitem, bool index_is_unique, Relation heapRel
{ /* move right ? */
if ( P_RIGHTMOST (opaque) )
break;
- if ( _bt_compare (rel, itupdesc, page,
- natts, itup_scankey, P_HIKEY) )
+ if ( !_bt_isequal (itupdesc, page, P_HIKEY,
+ natts, itup_scankey) )
break;
/*
* min key of the right page is the same,
@@ -939,18 +949,70 @@ _bt_itemcmp(Relation rel,
IndexTuple indexTuple1, indexTuple2;
Datum attrDatum1, attrDatum2;
int i;
- bool isNull;
+ bool isFirstNull, isSecondNull;
bool compare;
+ bool useEqual = false;
+
+ if ( strat == BTLessEqualStrategyNumber )
+ {
+ useEqual = true;
+ strat = BTLessStrategyNumber;
+ }
+ else if ( strat == BTGreaterEqualStrategyNumber )
+ {
+ useEqual = true;
+ strat = BTGreaterStrategyNumber;
+ }
tupDes = RelationGetTupleDescriptor(rel);
indexTuple1 = &(item1->bti_itup);
indexTuple2 = &(item2->bti_itup);
for (i = 1; i <= keysz; i++) {
- attrDatum1 = index_getattr(indexTuple1, i, tupDes, &isNull);
- attrDatum2 = index_getattr(indexTuple2, i, tupDes, &isNull);
- compare = _bt_invokestrat(rel, i, strat, attrDatum1, attrDatum2);
- if (!compare) {
+ attrDatum1 = index_getattr(indexTuple1, i, tupDes, &isFirstNull);
+ attrDatum2 = index_getattr(indexTuple2, i, tupDes, &isSecondNull);
+
+ /* see comments about NULLs handling in btbuild */
+ if ( isFirstNull ) /* attr in item1 is NULL */
+ {
+ if ( isSecondNull ) /* attr in item2 is NULL too */
+ compare = ( strat == BTEqualStrategyNumber ) ? true : false;
+ else
+ compare = ( strat == BTGreaterStrategyNumber ) ? true : false;
+ }
+ else if ( isSecondNull ) /* attr in item1 is NOT_NULL and */
+ { /* and attr in item2 is NULL */
+ compare = ( strat == BTLessStrategyNumber ) ? true : false;
+ }
+ else
+ {
+ compare = _bt_invokestrat(rel, i, strat, attrDatum1, attrDatum2);
+ }
+
+ if ( compare ) /* true for one of ">, <, =" */
+ {
+ if ( strat != BTEqualStrategyNumber )
+ return (true);
+ }
+ else /* false for one of ">, <, =" */
+ {
+ if ( strat == BTEqualStrategyNumber )
+ return (false);
+ /*
+ * if original strat was "<=, >=" OR
+ * "<, >" but some attribute(s) left
+ * - need to test for Equality
+ */
+ if ( useEqual || i < keysz )
+ {
+ if ( isFirstNull || isSecondNull )
+ compare = ( isFirstNull && isSecondNull ) ? true : false;
+ else
+ compare = _bt_invokestrat(rel, i, BTEqualStrategyNumber,
+ attrDatum1, attrDatum2);
+ if ( compare ) /* item1' and item2' attributes are equal */
+ continue; /* - try to compare next attributes */
+ }
return (false);
}
}
@@ -1015,3 +1077,45 @@ _bt_updateitem(Relation rel,
ItemPointerCopy(&itemPtrData, &(oldIndexTuple->t_tid));
}
+
+/*
+ * _bt_isequal - used in _bt_doinsert in check for duplicates.
+ *
+ * Rule is simple: NOT_NULL not equal NULL, NULL not_equal NULL too.
+ */
+static bool
+_bt_isequal (TupleDesc itupdesc, Page page, OffsetNumber offnum,
+ int keysz, ScanKey scankey)
+{
+ Datum datum;
+ BTItem btitem;
+ IndexTuple itup;
+ ScanKey entry;
+ AttrNumber attno;
+ long result;
+ int i;
+ bool null;
+
+ btitem = (BTItem) PageGetItem(page, PageGetItemId(page, offnum));
+ itup = &(btitem->bti_itup);
+
+ for (i = 1; i <= keysz; i++)
+ {
+ entry = &scankey[i - 1];
+ attno = entry->sk_attno;
+ Assert (attno == i);
+ datum = index_getattr(itup, attno, itupdesc, &null);
+
+ /* NULLs are not equal */
+ if ( entry->sk_flags & SK_ISNULL || null )
+ return (false);
+
+ result = (long) FMGR_PTR2(entry->sk_func, entry->sk_procedure,
+ entry->sk_argument, datum);
+ if (result != 0)
+ return (false);
+ }
+
+ /* by here, the keys are equal */
+ return (true);
+}
diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c
index 0fe6787c010..e75814dd8e8 100644
--- a/src/backend/access/nbtree/nbtree.c
+++ b/src/backend/access/nbtree/nbtree.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.16 1997/03/18 18:38:35 scrappy Exp $
+ * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.17 1997/03/24 08:48:11 vadim Exp $
*
* NOTES
* This file contains only the public interface routines.
@@ -219,11 +219,21 @@ btbuild(Relation heap,
* artifact of the strategy map architecture chosen in 1986, not
* of the way nulls are handled here.
*/
-
+ /*
+ * New comments: NULLs handling.
+ * While we can't do NULL comparison, we can follow simple
+ * rule for ordering items on btree pages - NULLs greater
+ * NOT_NULLs and NULL = NULL is TRUE. Sure, it's just rule
+ * for placing/finding items and no more - keytest'll return
+ * FALSE for a = 5 for items having 'a' isNULL.
+ * Look at _bt_skeycmp, _bt_compare and _bt_itemcmp for
+ * how it works. - vadim 03/23/97
+
if (itup->t_info & INDEX_NULL_MASK) {
pfree(itup);
continue;
}
+ */
itup->t_tid = htup->t_ctid;
btitem = _bt_formitem(itup);
@@ -328,8 +338,12 @@ btinsert(Relation rel, Datum *datum, char *nulls, ItemPointer ht_ctid, Relation
itup = index_formtuple(RelationGetTupleDescriptor(rel), datum, nulls);
itup->t_tid = *ht_ctid;
+ /*
+ * See comments in btbuild.
+
if (itup->t_info & INDEX_NULL_MASK)
return ((InsertIndexResult) NULL);
+ */
btitem = _bt_formitem(itup);
@@ -423,7 +437,7 @@ btrescan(IndexScanDesc scan, bool fromEnd, ScanKey scankey)
/* reset the scan key */
so->numberOfKeys = scan->numberOfKeys;
- so->numberOfFirstKeys = 0;
+ so->numberOfFirstKeys = 0; /* may be changed by _bt_orderkeys */
so->qual_ok = 1; /* may be changed by _bt_orderkeys */
if (scan->numberOfKeys > 0) {
memmove(scan->keyData,
@@ -433,10 +447,7 @@ btrescan(IndexScanDesc scan, bool fromEnd, ScanKey scankey)
scankey,
so->numberOfKeys * sizeof(ScanKeyData));
/* order the keys in the qualification */
- if (so->numberOfKeys > 1)
- _bt_orderkeys(scan->relation, so);
- else
- so->numberOfFirstKeys = 1;
+ _bt_orderkeys(scan->relation, so);
}
/* finally, be sure that the scan exploits the tree order */
@@ -499,9 +510,10 @@ btendscan(IndexScanDesc scan)
ItemPointerSetInvalid(iptr);
}
- pfree (scan->opaque);
if ( so->keyData != (ScanKey) NULL )
pfree (so->keyData);
+ pfree (so);
+
_bt_dropscan(scan);
}
diff --git a/src/backend/access/nbtree/nbtsearch.c b/src/backend/access/nbtree/nbtsearch.c
index 2e802ee8527..99fb38f18ce 100644
--- a/src/backend/access/nbtree/nbtsearch.c
+++ b/src/backend/access/nbtree/nbtsearch.c
@@ -7,7 +7,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtsearch.c,v 1.15 1997/03/18 18:38:41 scrappy Exp $
+ * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtsearch.c,v 1.16 1997/03/24 08:48:12 vadim Exp $
*
*-------------------------------------------------------------------------
*/
@@ -19,6 +19,7 @@
#include <storage/bufpage.h>
#include <storage/bufmgr.h>
#include <access/nbtree.h>
+#include <catalog/pg_proc.h>
#ifndef HAVE_MEMMOVE
# include <regex/utils.h>
@@ -238,7 +239,20 @@ _bt_skeycmp(Relation rel,
Datum keyDatum;
bool compare;
bool isNull;
+ bool useEqual = false;
+ bool keyNull;
+ if ( strat == BTLessEqualStrategyNumber )
+ {
+ useEqual = true;
+ strat = BTLessStrategyNumber;
+ }
+ else if ( strat == BTGreaterEqualStrategyNumber )
+ {
+ useEqual = true;
+ strat = BTGreaterStrategyNumber;
+ }
+
item = (BTItem) PageGetItem(page, itemid);
indexTuple = &(item->bti_itup);
@@ -248,27 +262,60 @@ _bt_skeycmp(Relation rel,
for (i=1; i <= keysz; i++) {
entry = &scankey[i-1];
+ Assert ( entry->sk_attno == i );
attrDatum = index_getattr(indexTuple,
entry->sk_attno,
tupDes,
&isNull);
keyDatum = entry->sk_argument;
-
- /*
- * This may happen in a nested loop if an attribute used
- * as scan key is null. DZ 29-10-1996
- */
- if ((entry->sk_flags & SK_ISNULL) || (isNull)) {
- if ((entry->sk_flags & SK_ISNULL) && (isNull)) {
- return (true);
- } else {
- return (false);
- }
+
+ /* see comments about NULLs handling in btbuild */
+ if ( entry->sk_flags & SK_ISNULL ) /* key is NULL */
+ {
+ Assert ( entry->sk_procedure == NullValueRegProcedure );
+ keyNull = true;
+ if ( isNull )
+ compare = ( strat == BTEqualStrategyNumber ) ? true : false;
+ else
+ compare = ( strat == BTGreaterStrategyNumber ) ? true : false;
+ }
+ else if ( isNull ) /* key is NOT_NULL and item is NULL */
+ {
+ keyNull = false;
+ compare = ( strat == BTLessStrategyNumber ) ? true : false;
+ }
+ else
+ {
+ keyNull = false;
+ compare = _bt_invokestrat(rel, i, strat, keyDatum, attrDatum);
}
- compare = _bt_invokestrat(rel, i, strat, keyDatum, attrDatum);
- if (!compare)
+ if ( compare ) /* true for one of ">, <, =" */
+ {
+ if ( strat != BTEqualStrategyNumber )
+ return (true);
+ }
+ else /* false for one of ">, <, =" */
+ {
+ if ( strat == BTEqualStrategyNumber )
+ return (false);
+ /*
+ * if original strat was "<=, >=" OR
+ * "<, >" but some attribute(s) left
+ * - need to test for Equality
+ */
+ if ( useEqual || i < keysz )
+ {
+ if ( keyNull || isNull )
+ compare = ( keyNull && isNull ) ? true : false;
+ else
+ compare = _bt_invokestrat(rel, i, BTEqualStrategyNumber,
+ keyDatum, attrDatum);
+ if ( compare ) /* key' and item' attributes are equal */
+ continue; /* - try to compare next attributes */
+ }
return (false);
+ }
}
return (true);
@@ -520,20 +567,24 @@ _bt_compare(Relation rel,
attno = entry->sk_attno;
datum = index_getattr(itup, attno, itupdesc, &null);
- /*
- * This may happen in a nested loop if an attribute used
- * as scan key is null. DZ 29-10-1996
- */
- if ((entry->sk_flags & SK_ISNULL) || (null)) {
- if ((entry->sk_flags & SK_ISNULL) && (null)) {
- return (0);
- } else {
- return (null ? +1 : -1);
- }
+ /* see comments about NULLs handling in btbuild */
+ if ( entry->sk_flags & SK_ISNULL ) /* key is NULL */
+ {
+ Assert ( entry->sk_procedure == NullValueRegProcedure );
+ if ( null )
+ tmpres = (long) 0; /* NULL "=" NULL */
+ else
+ tmpres = (long) 1; /* NULL ">" NOT_NULL */
+ }
+ else if ( null ) /* key is NOT_NULL and item is NULL */
+ {
+ tmpres = (long) -1; /* NOT_NULL "<" NULL */
+ }
+ else
+ {
+ tmpres = (long) FMGR_PTR2(entry->sk_func, entry->sk_procedure,
+ entry->sk_argument, datum);
}
-
- tmpres = (long) FMGR_PTR2(entry->sk_func, entry->sk_procedure,
- entry->sk_argument, datum);
result = tmpres;
/* if the keys are unequal, return the difference */
@@ -566,6 +617,7 @@ _bt_next(IndexScanDesc scan, ScanDirection dir)
BTItem btitem;
IndexTuple itup;
BTScanOpaque so;
+ Size keysok;
rel = scan->relation;
so = (BTScanOpaque) scan->opaque;
@@ -596,8 +648,9 @@ _bt_next(IndexScanDesc scan, ScanDirection dir)
btitem = (BTItem) PageGetItem(page, PageGetItemId(page, offnum));
itup = &btitem->bti_itup;
- if (_bt_checkqual(scan, itup))
+ if ( _bt_checkkeys (scan, itup, &keysok) )
{
+ Assert (keysok == so->numberOfKeys);
res = FormRetrieveIndexResult(current, &(itup->t_tid));
/* remember which buffer we have pinned and locked */
@@ -605,7 +658,7 @@ _bt_next(IndexScanDesc scan, ScanDirection dir)
return (res);
}
- } while ( _bt_checkforkeys (scan, itup, so->numberOfFirstKeys) );
+ } while ( keysok >= so->numberOfFirstKeys );
ItemPointerSetInvalid(current);
so->btso_curbuf = InvalidBuffer;
@@ -644,6 +697,7 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
int result;
BTScanOpaque so;
ScanKeyData skdata;
+ Size keysok;
so = (BTScanOpaque) scan->opaque;
if ( so->qual_ok == 0 ) /* may be set by _bt_orderkeys */
@@ -663,6 +717,12 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
* ordered to take advantage of index ordering) to position ourselves
* at the right place in the scan.
*/
+ /* _bt_orderkeys disallows it, but it's place to add some code latter */
+ if ( so->keyData[0].sk_flags & SK_ISNULL )
+ {
+ elog (WARN, "_bt_first: btree doesn't support is(not)null, yet");
+ return ((RetrieveIndexResult) NULL);
+ }
proc = index_getprocid(rel, 1, BTORDER_PROC);
ScanKeyEntryInitialize(&skdata, so->keyData[0].sk_flags, 1, proc,
so->keyData[0].sk_argument);
@@ -706,6 +766,9 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
*/
result = _bt_compare(rel, itupdesc, page, 1, &skdata, offnum);
+
+ /* it's yet other place to add some code latter for is(not)null */
+
strat = _bt_getstrat(rel, 1, so->keyData[0].sk_procedure);
switch (strat) {
@@ -798,14 +861,14 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
btitem = (BTItem) PageGetItem(page, PageGetItemId(page, offnum));
itup = &btitem->bti_itup;
- if ( _bt_checkqual(scan, itup) )
+ if ( _bt_checkkeys (scan, itup, &keysok) )
{
res = FormRetrieveIndexResult(current, &(itup->t_tid));
/* remember which buffer we have pinned */
so->btso_curbuf = buf;
}
- else if ( _bt_checkforkeys (scan, itup, so->numberOfFirstKeys) )
+ else if ( keysok >= so->numberOfFirstKeys )
{
so->btso_curbuf = buf;
return (_bt_next (scan, dir));
@@ -1081,6 +1144,7 @@ _bt_endpoint(IndexScanDesc scan, ScanDirection dir)
IndexTuple itup;
BTScanOpaque so;
RetrieveIndexResult res;
+ Size keysok;
rel = scan->relation;
current = &(scan->currentItemData);
@@ -1223,13 +1287,14 @@ _bt_endpoint(IndexScanDesc scan, ScanDirection dir)
itup = &(btitem->bti_itup);
/* see if we picked a winner */
- if (_bt_checkqual(scan, itup)) {
+ if ( _bt_checkkeys (scan, itup, &keysok) )
+ {
res = FormRetrieveIndexResult(current, &(itup->t_tid));
/* remember which buffer we have pinned */
so->btso_curbuf = buf;
}
- else if ( _bt_checkforkeys (scan, itup, so->numberOfFirstKeys) )
+ else if ( keysok >= so->numberOfFirstKeys )
{
so->btso_curbuf = buf;
return (_bt_next (scan, dir));
diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c
index 7c3d5ba5639..7ec926f9e24 100644
--- a/src/backend/access/nbtree/nbtsort.c
+++ b/src/backend/access/nbtree/nbtsort.c
@@ -5,7 +5,7 @@
*
*
* IDENTIFICATION
- * $Id: nbtsort.c,v 1.12 1997/02/25 03:38:23 scrappy Exp $
+ * $Id: nbtsort.c,v 1.13 1997/03/24 08:48:15 vadim Exp $
*
* NOTES
*
@@ -137,11 +137,13 @@ typedef struct {
* *-------------------------------------------------------------------------
*/
typedef struct {
- Datum btsk_datum;
+ Datum *btsk_datum;
+ char *btsk_nulls;
BTItem btsk_item;
} BTSortKey;
static Relation _bt_sortrel;
+static int _bt_nattr;
static BTSpool * _bt_inspool;
static void
@@ -149,26 +151,51 @@ _bt_isortcmpinit(Relation index, BTSpool *spool)
{
_bt_sortrel = index;
_bt_inspool = spool;
+ _bt_nattr = index->rd_att->natts;
}
static int
_bt_isortcmp(BTSortKey *k1, BTSortKey *k2)
{
- if (k1->btsk_item == (BTItem) NULL) {
- if (k2->btsk_item == (BTItem) NULL) {
+ Datum *k1_datum = k1->btsk_datum;
+ Datum *k2_datum = k2->btsk_datum;
+ char *k1_nulls = k1->btsk_nulls;
+ char *k2_nulls = k2->btsk_nulls;
+ bool equal_isnull = false;
+ int i;
+
+ if (k1->btsk_item == (BTItem) NULL)
+ {
+ if (k2->btsk_item == (BTItem) NULL)
return(0); /* 1 = 2 */
- }
return(1); /* 1 > 2 */
- } else if (k2->btsk_item == (BTItem) NULL) {
- return(-1); /* 1 < 2 */
- } else if (_bt_invokestrat(_bt_sortrel, 1, BTGreaterStrategyNumber,
- k1->btsk_datum, k2->btsk_datum)) {
- return(1); /* 1 > 2 */
- } else if (_bt_invokestrat(_bt_sortrel, 1, BTGreaterStrategyNumber,
- k2->btsk_datum, k1->btsk_datum)) {
+ }
+ else if (k2->btsk_item == (BTItem) NULL)
return(-1); /* 1 < 2 */
+
+ for (i = 0; i < _bt_nattr; i++)
+ {
+ if ( k1_nulls[i] != ' ' ) /* k1 attr is NULL */
+ {
+ if ( k2_nulls[i] != ' ' ) /* the same for k2 */
+ {
+ equal_isnull = true;
+ continue;
+ }
+ return (1); /* NULL ">" NOT_NULL */
+ }
+ else if ( k2_nulls[i] != ' ' ) /* k2 attr is NULL */
+ return (-1); /* NOT_NULL "<" NULL */
+
+ if (_bt_invokestrat(_bt_sortrel, i+1, BTGreaterStrategyNumber,
+ k1_datum[i], k2_datum[i]))
+ return(1); /* 1 > 2 */
+ else if (_bt_invokestrat(_bt_sortrel, i+1, BTGreaterStrategyNumber,
+ k2_datum[i], k1_datum[i]))
+ return(-1); /* 1 < 2 */
}
- if ( _bt_inspool->isunique )
+
+ if ( _bt_inspool->isunique && !equal_isnull )
{
_bt_spooldestroy ((void*)_bt_inspool);
elog (WARN, "Cannot create unique index. Table contains non-unique values");
@@ -180,15 +207,29 @@ static void
_bt_setsortkey(Relation index, BTItem bti, BTSortKey *sk)
{
sk->btsk_item = (BTItem) NULL;
- sk->btsk_datum = (Datum) NULL;
- if (bti != (BTItem) NULL) {
+ sk->btsk_datum = (Datum*) NULL;
+ sk->btsk_nulls = (char*) NULL;
+
+ if (bti != (BTItem) NULL)
+ {
+ IndexTuple it = &(bti->bti_itup);
+ TupleDesc itdesc = index->rd_att;
+ Datum *dp = (Datum*) palloc (_bt_nattr * sizeof (Datum));
+ char *np = (char*) palloc (_bt_nattr * sizeof (char));
bool isnull;
- Datum d = index_getattr(&(bti->bti_itup), 1, index->rd_att, &isnull);
-
- if (!isnull) {
- sk->btsk_item = bti;
- sk->btsk_datum = d;
+ int i;
+
+ for (i = 0; i < _bt_nattr; i++)
+ {
+ dp[i] = index_getattr(it, i+1, itdesc, &isnull);
+ if ( isnull )
+ np[i] = 'n';
+ else
+ np[i] = ' ';
}
+ sk->btsk_item = bti;
+ sk->btsk_datum = dp;
+ sk->btsk_nulls = np;
}
}
@@ -622,27 +663,25 @@ _bt_spool(Relation index, BTItem btitem, void *spool)
BTItem bti;
char *pos;
int btisz;
+ int it_ntup = itape->bttb_ntup;
int i;
/*
* build an array of pointers to the BTItemDatas on the input
* block.
*/
- if (itape->bttb_ntup > 0) {
+ if (it_ntup > 0) {
parray =
- (BTSortKey *) palloc(itape->bttb_ntup * sizeof(BTSortKey));
- if (parray == (BTSortKey *) NULL) {
- elog(WARN, "_bt_spool: out of memory");
- }
+ (BTSortKey *) palloc(it_ntup * sizeof(BTSortKey));
pos = itape->bttb_data;
- for (i = 0; i < itape->bttb_ntup; ++i) {
+ for (i = 0; i < it_ntup; ++i) {
_bt_setsortkey(index, _bt_tapenext(itape, &pos), &(parray[i]));
}
/*
* qsort the pointer array.
*/
- qsort((void *) parray, itape->bttb_ntup, sizeof(BTSortKey),
+ qsort((void *) parray, it_ntup, sizeof(BTSortKey),
(int (*)(const void *,const void *))_bt_isortcmp);
}
@@ -656,7 +695,7 @@ _bt_spool(Relation index, BTItem btitem, void *spool)
* block..)
*/
otape = btspool->bts_otape[btspool->bts_tape];
- for (i = 0; i < itape->bttb_ntup; ++i) {
+ for (i = 0; i < it_ntup; ++i) {
bti = parray[i].btsk_item;
btisz = BTITEMSZ(bti);
btisz = DOUBLEALIGN(btisz);
@@ -694,7 +733,15 @@ _bt_spool(Relation index, BTItem btitem, void *spool)
/*
* destroy the pointer array.
*/
- if (parray != (BTSortKey *) NULL) {
+ if (parray != (BTSortKey *) NULL)
+ {
+ for (i = 0; i < it_ntup; i++)
+ {
+ if ( parray[i].btsk_datum != (Datum*) NULL )
+ pfree ((void*)(parray[i].btsk_datum));
+ if ( parray[i].btsk_nulls != (char*) NULL )
+ pfree ((void*)(parray[i].btsk_nulls));
+ }
pfree((void *) parray);
}
}
@@ -976,7 +1023,7 @@ _bt_buildadd(Relation index, void *pstate, BTItem bti, int flags)
#endif
if (last_bti == (BTItem) NULL) {
first_off = P_FIRSTKEY;
- } else if (!_bt_itemcmp(index, 1, bti, last_bti, BTEqualStrategyNumber)) {
+ } else if (!_bt_itemcmp(index, _bt_nattr, bti, last_bti, BTEqualStrategyNumber)) {
first_off = off;
}
last_off = off;
@@ -1044,6 +1091,7 @@ _bt_merge(Relation index, BTSpool *btspool)
BTPageState *state;
BTPriQueue q;
BTPriQueueElem e;
+ BTSortKey btsk;
BTItem bti;
BTTapeBlock *itape;
BTTapeBlock *otape;
@@ -1136,7 +1184,8 @@ _bt_merge(Relation index, BTSpool *btspool)
* if it hits either End-Of-Run or EOF.
*/
t = e.btpqe_tape;
- bti = e.btpqe_item.btsk_item;
+ btsk = e.btpqe_item;
+ bti = btsk.btsk_item;
if (bti != (BTItem) NULL) {
btisz = BTITEMSZ(bti);
btisz = DOUBLEALIGN(btisz);
@@ -1177,6 +1226,12 @@ _bt_merge(Relation index, BTSpool *btspool)
}
#endif /* FASTBUILD_DEBUG && FASTBUILD_MERGE */
}
+
+ if ( btsk.btsk_datum != (Datum*) NULL )
+ pfree ((void*)(btsk.btsk_datum));
+ if ( btsk.btsk_nulls != (char*) NULL )
+ pfree ((void*)(btsk.btsk_nulls));
+
}
itape = btspool->bts_itape[t];
if (!tapedone[t]) {
diff --git a/src/backend/access/nbtree/nbtutils.c b/src/backend/access/nbtree/nbtutils.c
index 6d0a40ef132..fa2ff890fe9 100644
--- a/src/backend/access/nbtree/nbtutils.c
+++ b/src/backend/access/nbtree/nbtutils.c
@@ -7,7 +7,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtutils.c,v 1.8 1997/03/18 18:38:46 scrappy Exp $
+ * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtutils.c,v 1.9 1997/03/24 08:48:16 vadim Exp $
*
*-------------------------------------------------------------------------
*/
@@ -20,6 +20,11 @@
#include <access/nbtree.h>
#include <access/istrat.h>
#include <access/iqual.h>
+#include <catalog/pg_proc.h>
+#include <executor/execdebug.h>
+
+extern int NIndexTupleProcessed;
+
#ifndef HAVE_MEMMOVE
# include <regex/utils.h>
@@ -37,6 +42,7 @@ _bt_mkscankey(Relation rel, IndexTuple itup)
Datum arg;
RegProcedure proc;
bool null;
+ bits16 flag;
natts = rel->rd_rel->relnatts;
itupdesc = RelationGetTupleDescriptor(rel);
@@ -45,9 +51,18 @@ _bt_mkscankey(Relation rel, IndexTuple itup)
for (i = 0; i < natts; i++) {
arg = index_getattr(itup, i + 1, itupdesc, &null);
- proc = index_getprocid(rel, i + 1, BTORDER_PROC);
+ if ( null )
+ {
+ proc = NullValueRegProcedure;
+ flag = SK_ISNULL;
+ }
+ else
+ {
+ proc = index_getprocid(rel, i + 1, BTORDER_PROC);
+ flag = 0x0;
+ }
ScanKeyEntryInitialize(&skey[i],
- 0x0, (AttrNumber) (i + 1), proc, arg);
+ flag, (AttrNumber) (i + 1), proc, arg);
}
return (skey);
@@ -90,22 +105,35 @@ _bt_orderkeys(Relation relation, BTScanOpaque so)
int i, j;
int init[BTMaxStrategyNumber+1];
ScanKey key;
- uint16 numberOfKeys, new_numberOfKeys = 0;
+ uint16 numberOfKeys = so->numberOfKeys;
+ uint16 new_numberOfKeys = 0;
AttrNumber attno = 1;
- numberOfKeys = so->numberOfKeys;
+ if ( numberOfKeys < 1 )
+ return;
+
key = so->keyData;
- if ( numberOfKeys <= 1 )
+ cur = &key[0];
+ if ( cur->sk_attno != 1 )
+ elog (WARN, "_bt_orderkeys: key(s) for attribute 1 missed");
+
+ if ( numberOfKeys == 1 )
+ {
+ /*
+ * We don't use indices for 'A is null' and 'A is not null'
+ * currently and 'A < = > <> NULL' is non-sense' - so
+ * qual is not Ok. - vadim 03/21/97
+ */
+ if ( cur->sk_flags & SK_ISNULL )
+ so->qual_ok = 0;
+ so->numberOfFirstKeys = 1;
return;
+ }
/* get space for the modified array of keys */
nbytes = BTMaxStrategyNumber * sizeof(ScanKeyData);
xform = (ScanKey) palloc(nbytes);
-
- cur = &key[0];
- if ( cur->sk_attno != 1 )
- elog (WARN, "_bt_orderkeys: key(s) for attribute 1 missed");
memset(xform, 0, nbytes);
map = IndexStrategyGetStrategyMap(RelationGetIndexStrategy(relation),
@@ -119,6 +147,10 @@ _bt_orderkeys(Relation relation, BTScanOpaque so)
{
if ( i < numberOfKeys )
cur = &key[i];
+
+ if ( cur->sk_flags & SK_ISNULL ) /* see comments above */
+ so->qual_ok = 0;
+
if ( i == numberOfKeys || cur->sk_attno != attno )
{
if ( cur->sk_attno != attno + 1 && i < numberOfKeys )
@@ -243,6 +275,32 @@ _bt_orderkeys(Relation relation, BTScanOpaque so)
pfree(xform);
}
+BTItem
+_bt_formitem(IndexTuple itup)
+{
+ int nbytes_btitem;
+ BTItem btitem;
+ Size tuplen;
+ extern Oid newoid();
+
+ /* see comments in btbuild
+
+ if (itup->t_info & INDEX_NULL_MASK)
+ elog(WARN, "btree indices cannot include null keys");
+ */
+
+ /* make a copy of the index tuple with room for the sequence number */
+ tuplen = IndexTupleSize(itup);
+ nbytes_btitem = tuplen +
+ (sizeof(BTItemData) - sizeof(IndexTupleData));
+
+ btitem = (BTItem) palloc(nbytes_btitem);
+ memmove((char *) &(btitem->bti_itup), (char *) itup, tuplen);
+
+ btitem->bti_oid = newoid();
+ return (btitem);
+}
+
bool
_bt_checkqual(IndexScanDesc scan, IndexTuple itup)
{
@@ -269,26 +327,57 @@ _bt_checkforkeys(IndexScanDesc scan, IndexTuple itup, Size keysz)
return (true);
}
-BTItem
-_bt_formitem(IndexTuple itup)
+bool
+_bt_checkkeys (IndexScanDesc scan, IndexTuple tuple, Size *keysok)
{
- int nbytes_btitem;
- BTItem btitem;
- Size tuplen;
- extern Oid newoid();
+ BTScanOpaque so = (BTScanOpaque) scan->opaque;
+ Size keysz = so->numberOfKeys;
+ TupleDesc tupdesc;
+ ScanKey key;
+ Datum datum;
+ bool isNull;
+ int test;
- /* disallow nulls in btree keys */
- if (itup->t_info & INDEX_NULL_MASK)
- elog(WARN, "btree indices cannot include null keys");
+ *keysok = 0;
+ if ( keysz == 0 )
+ return (true);
- /* make a copy of the index tuple with room for the sequence number */
- tuplen = IndexTupleSize(itup);
- nbytes_btitem = tuplen +
- (sizeof(BTItemData) - sizeof(IndexTupleData));
+ key = so->keyData;
+ tupdesc = RelationGetTupleDescriptor(scan->relation);
- btitem = (BTItem) palloc(nbytes_btitem);
- memmove((char *) &(btitem->bti_itup), (char *) itup, tuplen);
+ IncrIndexProcessed();
- btitem->bti_oid = newoid();
- return (btitem);
+ while (keysz > 0)
+ {
+ datum = index_getattr(tuple,
+ key[0].sk_attno,
+ tupdesc,
+ &isNull);
+
+ /* btree doesn't support 'A is null' clauses, yet */
+ if ( isNull || key[0].sk_flags & SK_ISNULL )
+ {
+ return (false);
+ }
+
+ if (key[0].sk_flags & SK_COMMUTE) {
+ test = (int) (*(key[0].sk_func))
+ (DatumGetPointer(key[0].sk_argument),
+ datum);
+ } else {
+ test = (int) (*(key[0].sk_func))
+ (datum,
+ DatumGetPointer(key[0].sk_argument));
+ }
+
+ if (!test == !(key[0].sk_flags & SK_NEGATE)) {
+ return (false);
+ }
+
+ keysz -= 1;
+ key++;
+ (*keysok)++;
+ }
+
+ return (true);
}