aboutsummaryrefslogtreecommitdiff
path: root/src/backend
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend')
-rw-r--r--src/backend/access/Makefile4
-rw-r--r--src/backend/access/gin/Makefile32
-rw-r--r--src/backend/access/gin/README153
-rw-r--r--src/backend/access/gin/ginarrayproc.c261
-rw-r--r--src/backend/access/gin/ginbtree.c394
-rw-r--r--src/backend/access/gin/ginbulk.c155
-rw-r--r--src/backend/access/gin/gindatapage.c569
-rw-r--r--src/backend/access/gin/ginentrypage.c532
-rw-r--r--src/backend/access/gin/ginget.c412
-rw-r--r--src/backend/access/gin/gininsert.c374
-rw-r--r--src/backend/access/gin/ginscan.c256
-rw-r--r--src/backend/access/gin/ginutil.c203
-rw-r--r--src/backend/access/gin/ginvacuum.c647
-rw-r--r--src/backend/access/gin/ginxlog.c544
-rw-r--r--src/backend/access/transam/rmgr.c5
-rw-r--r--src/backend/commands/cluster.c9
-rw-r--r--src/backend/commands/opclasscmds.c6
-rw-r--r--src/backend/commands/vacuum.c20
-rw-r--r--src/backend/utils/adt/selfuncs.c20
-rw-r--r--src/backend/utils/init/globals.c4
-rw-r--r--src/backend/utils/misc/guc.c14
21 files changed, 4601 insertions, 13 deletions
diff --git a/src/backend/access/Makefile b/src/backend/access/Makefile
index 82cd7a90ed5..8807fae4f14 100644
--- a/src/backend/access/Makefile
+++ b/src/backend/access/Makefile
@@ -1,14 +1,14 @@
#
# Makefile for the access methods module
#
-# $PostgreSQL: pgsql/src/backend/access/Makefile,v 1.10 2005/11/07 17:36:44 tgl Exp $
+# $PostgreSQL: pgsql/src/backend/access/Makefile,v 1.11 2006/05/02 11:28:54 teodor Exp $
#
subdir = src/backend/access
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
-SUBDIRS := common gist hash heap index nbtree transam
+SUBDIRS := common gist hash heap index nbtree transam gin
SUBDIROBJS := $(SUBDIRS:%=%/SUBSYS.o)
all: SUBSYS.o
diff --git a/src/backend/access/gin/Makefile b/src/backend/access/gin/Makefile
new file mode 100644
index 00000000000..ac2271e3f55
--- /dev/null
+++ b/src/backend/access/gin/Makefile
@@ -0,0 +1,32 @@
+#-------------------------------------------------------------------------
+#
+# Makefile--
+# Makefile for access/gin
+#
+# IDENTIFICATION
+# $PostgreSQL: pgsql/src/backend/access/gin/Makefile,v 1.1 2006/05/02 11:28:54 teodor Exp $
+#
+#-------------------------------------------------------------------------
+
+subdir = src/backend/access/gin
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+
+OBJS = ginutil.o gininsert.o ginxlog.o ginentrypage.o gindatapage.o \
+ ginbtree.o ginscan.o ginget.o ginvacuum.o ginarrayproc.o \
+ ginbulk.o
+
+all: SUBSYS.o
+
+SUBSYS.o: $(OBJS)
+ $(LD) $(LDREL) $(LDOUT) SUBSYS.o $(OBJS)
+
+depend dep:
+ $(CC) -MM $(CFLAGS) *.c >depend
+
+clean:
+ rm -f SUBSYS.o $(OBJS)
+
+ifeq (depend,$(wildcard depend))
+include depend
+endif
diff --git a/src/backend/access/gin/README b/src/backend/access/gin/README
new file mode 100644
index 00000000000..2827f672689
--- /dev/null
+++ b/src/backend/access/gin/README
@@ -0,0 +1,153 @@
+Gin for PostgreSQL
+==================
+
+Gin was sponsored by jfg://networks (http://www.jfg-networks.com/)
+
+Gin stands for Generalized Inverted Index and should be considered as a genie,
+not a drink.
+
+Generalized means that the index does not know which operation it accelerates.
+It instead works with custom strategies, defined for specific data types (read
+"Index Method Strategies" in the PostgreSQL documentation). In that sense, Gin
+is similar to GiST and differs from btree indices, which have predefined,
+comparison-based operations.
+
+An inverted index is an index structure storing a set of (key, posting list)
+pairs, where 'posting list' is a set of documents in which the key occurs.
+(A text document would usually contain many keys.) The primary goal of
+Gin indices is support for highly scalable, full-text search in PostgreSQL.
+
+Gin consists of a B-tree index constructed over entries (ET, entries tree),
+where each entry is an element of the indexed value (element of array, lexeme
+for tsvector) and where each tuple in a leaf page is either a pointer to a
+B-tree over item pointers (PT, posting tree), or a list of item pointers
+(PL, posting list) if the tuple is small enough.
+
+Note: There is no delete operation for ET. The reason for this is that from
+our experience, a set of unique words over a large collection change very
+rarely. This greatly simplifies the code and concurrency algorithms.
+
+Gin comes with built-in support for one-dimensional arrays (eg. integer[],
+text[]), but no support for NULL elements. The following operations are
+available:
+
+ * contains: value_array @ query_array
+ * overlap: value_array && query_array
+ * contained: value_array ~ query_array
+
+Synopsis
+--------
+
+=# create index txt_idx on aa using gin(a);
+
+Features
+--------
+
+ * Concurrency
+ * Write-Ahead Logging (WAL). (Recoverability from crashes.)
+ * User-defined opclasses. (The scheme is similar to GiST.)
+ * Optimized index creation (Makes use of maintenance_work_mem to accumulate
+ postings in memory.)
+ * Tsearch2 support via an opclass
+ * Soft upper limit on the returned results set using a GUC variable:
+ gin_fuzzy_search_limit
+
+Gin Fuzzy Limit
+---------------
+
+There are often situations when a full-text search returns a very large set of
+results. Since reading tuples from the disk and sorting them could take a
+lot of time, this is unacceptable for production. (Note that the search
+itself is very fast.)
+
+Such queries usually contain very frequent lexemes, so the results are not
+very helpful. To facilitate execution of such queries Gin has a configurable
+soft upper limit of the size of the returned set, determined by the
+'gin_fuzzy_search_limit' GUC variable. This is set to 0 by default (no
+limit).
+
+If a non-zero search limit is set, then the returned set is a subset of the
+whole result set, chosen at random.
+
+"Soft" means that the actual number of returned results could slightly differ
+from the specified limit, depending on the query and the quality of the
+system's random number generator.
+
+From experience, a value of 'gin_fuzzy_search_limit' in the thousands
+(eg. 5000-20000) works well. This means that 'gin_fuzzy_search_limit' will
+have no effect for queries returning a result set with less tuples than this
+number.
+
+Limitations
+-----------
+
+ * No support for multicolumn indices
+ * Gin doesn't uses scan->kill_prior_tuple & scan->ignore_killed_tuples
+ * Gin searches entries only by equality matching. This may be improved in
+ future.
+ * Gin doesn't support full scans of indices.
+ * Gin doesn't index NULL values.
+
+Gin Interface
+-------------
+
+Opclass interface pseudocode. An example for a Gin opclass can be found in
+ginarayproc.c.
+
+Datum* extractValue(Datum inputValue, uint32* nentries)
+
+ Returns an array of Datum of entries of the value to be indexed. nentries
+ should contain the number of returned entries.
+
+int compareEntry(Datum a, Datum b)
+
+ Compares two entries (not the indexing values)
+
+Datum* extractQuery(Datum query, uint32* nentries, StrategyNumber n)
+
+ Returns an array of Datum of entries of the query to be executed.
+ n contains the strategy number of the operation.
+
+bool consistent(bool[] check, StrategyNumber n, Datum query)
+
+ The size of the check array is the same as sizeof of the array returned by
+ extractQuery. Each element of the check array is true if the indexed value
+ has a corresponding entry in the query. i.e. if (check[i] == TRUE) then
+ the i-th entry of the query is present in the indexed value. The Function
+ should return true if the indexed value matches by StrategyNumber and
+ the query.
+
+Open Items
+----------
+
+We appreciate any comments, help and suggestions.
+
+ * Teach optimizer/executor that GIN is intrinsically clustered. i.e., it
+ always returns ItemPointer in ascending order.
+ * Tweak gincostestimate.
+ * GIN stores several ItemPointer to heap tuple, so VACUUM FULL produces
+ this warning message:
+
+ WARNING: index "idx" contains 88395 row versions, but table contains
+ 51812 row versions
+ HINT: Rebuild the index with REINDEX.
+ **** Workaround added
+
+TODO
+----
+
+Nearest future:
+
+ * Opclasses for all types (no programming, just many catalog changes).
+
+Distant future:
+
+ * Replace B-tree of entries to something like GiST
+ * Add multicolumn support
+ * Optimize insert operations (background index insertion)
+
+Authors
+-------
+
+All work was done by Teodor Sigaev (teodor@sigaev.ru) and Oleg Bartunov
+(oleg@sai.msu.su).
diff --git a/src/backend/access/gin/ginarrayproc.c b/src/backend/access/gin/ginarrayproc.c
new file mode 100644
index 00000000000..0fe213d2a8f
--- /dev/null
+++ b/src/backend/access/gin/ginarrayproc.c
@@ -0,0 +1,261 @@
+/*-------------------------------------------------------------------------
+ *
+ * ginvacuum.c
+ * support function for GIN's indexing of any array
+ *
+ *
+ * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * $PostgreSQL: pgsql/src/backend/access/gin/ginarrayproc.c,v 1.1 2006/05/02 11:28:54 teodor Exp $
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+#include "access/genam.h"
+#include "access/heapam.h"
+#include "catalog/index.h"
+#include "miscadmin.h"
+#include "storage/freespace.h"
+#include "utils/array.h"
+#include "utils/lsyscache.h"
+#include "utils/syscache.h"
+#include "utils/typcache.h"
+#include "utils/builtins.h"
+#include "access/gin.h"
+
+#define GinOverlapStrategy 1
+#define GinContainsStrategy 2
+#define GinContainedStrategy 3
+
+#define ARRAYCHECK(x) do { \
+ if ( ARR_HASNULL(x) ) \
+ ereport(ERROR, \
+ (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), \
+ errmsg("array must not contain nulls"))); \
+ \
+ if ( ARR_NDIM(x) != 1 && ARR_NDIM(x) != 0 ) \
+ ereport(ERROR, \
+ (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR), \
+ errmsg("array must be one-dimensional"))); \
+} while(0)
+
+/*
+ * Function used as extractValue and extractQuery both
+ */
+Datum
+ginarrayextract(PG_FUNCTION_ARGS) {
+ ArrayType *array;
+ uint32 *nentries = (uint32*)PG_GETARG_POINTER(1);
+ Datum *entries = NULL;
+ int16 elmlen;
+ bool elmbyval;
+ char elmalign;
+
+ /* we should guarantee that array will not be destroyed during all operation */
+ array = PG_GETARG_ARRAYTYPE_P_COPY(0);
+
+ ARRAYCHECK(array);
+
+ get_typlenbyvalalign(ARR_ELEMTYPE(array),
+ &elmlen, &elmbyval, &elmalign);
+
+ deconstruct_array(array,
+ ARR_ELEMTYPE(array),
+ elmlen, elmbyval, elmalign,
+ &entries, NULL, (int*)nentries);
+
+ /* we should not free array, entries[i] points into it */
+ PG_RETURN_POINTER(entries);
+}
+
+Datum
+ginarrayconsistent(PG_FUNCTION_ARGS) {
+ bool *check = (bool*)PG_GETARG_POINTER(0);
+ StrategyNumber strategy = PG_GETARG_UINT16(1);
+ ArrayType *query = PG_GETARG_ARRAYTYPE_P(2);
+ int res=FALSE, i, nentries=ArrayGetNItems(ARR_NDIM(query), ARR_DIMS(query));
+
+ /* we can do not check array carefully, it's done by previous ginarrayextract call */
+
+ switch( strategy ) {
+ case GinOverlapStrategy:
+ case GinContainedStrategy:
+ /* at least one element in check[] is true, so result = true */
+ res = TRUE;
+ break;
+ case GinContainsStrategy:
+ res = TRUE;
+ for(i=0;i<nentries;i++)
+ if ( !check[i] ) {
+ res = FALSE;
+ break;
+ }
+ break;
+ default:
+ elog(ERROR, "ginarrayconsistent: unknown strategy number: %d", strategy);
+ }
+
+ PG_RETURN_BOOL(res);
+}
+
+static TypeCacheEntry*
+fillTypeCacheEntry( TypeCacheEntry *typentry, Oid element_type ) {
+ if ( typentry && typentry->type_id == element_type )
+ return typentry;
+
+ typentry = lookup_type_cache(element_type, TYPECACHE_EQ_OPR_FINFO);
+ if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_FUNCTION),
+ errmsg("could not identify an equality operator for type %s", format_type_be(element_type))));
+
+ return typentry;
+}
+
+static bool
+typeEQ(FunctionCallInfoData *locfcinfo, Datum a, Datum b) {
+ locfcinfo->arg[0] = a;
+ locfcinfo->arg[1] = b;
+ locfcinfo->argnull[0] = false;
+ locfcinfo->argnull[1] = false;
+ locfcinfo->isnull = false;
+
+ return DatumGetBool(FunctionCallInvoke(locfcinfo));
+}
+
+static bool
+ginArrayOverlap(TypeCacheEntry *typentry, ArrayType *a, ArrayType *b) {
+ Datum *da, *db;
+ int na, nb, j, i;
+ FunctionCallInfoData locfcinfo;
+
+ if ( ARR_ELEMTYPE(a) != ARR_ELEMTYPE(b) )
+ ereport(ERROR,
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("cannot compare arrays of different element types")));
+
+ ARRAYCHECK(a);
+ ARRAYCHECK(b);
+
+ deconstruct_array(a,
+ ARR_ELEMTYPE(a),
+ typentry->typlen, typentry->typbyval, typentry->typalign,
+ &da, NULL, &na);
+ deconstruct_array(b,
+ ARR_ELEMTYPE(b),
+ typentry->typlen, typentry->typbyval, typentry->typalign,
+ &db, NULL, &nb);
+
+ InitFunctionCallInfoData(locfcinfo, &typentry->eq_opr_finfo, 2,
+ NULL, NULL);
+
+ for(i=0;i<na;i++) {
+ for(j=0;j<nb;j++) {
+ if ( typeEQ(&locfcinfo, da[i], db[j]) ) {
+ pfree( da );
+ pfree( db );
+ return TRUE;
+ }
+ }
+ }
+
+ pfree( da );
+ pfree( db );
+
+ return FALSE;
+}
+
+static bool
+ginArrayContains(TypeCacheEntry *typentry, ArrayType *a, ArrayType *b) {
+ Datum *da, *db;
+ int na, nb, j, i, n = 0;
+ FunctionCallInfoData locfcinfo;
+
+ if ( ARR_ELEMTYPE(a) != ARR_ELEMTYPE(b) )
+ ereport(ERROR,
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("cannot compare arrays of different element types")));
+
+ ARRAYCHECK(a);
+ ARRAYCHECK(b);
+
+ deconstruct_array(a,
+ ARR_ELEMTYPE(a),
+ typentry->typlen, typentry->typbyval, typentry->typalign,
+ &da, NULL, &na);
+ deconstruct_array(b,
+ ARR_ELEMTYPE(b),
+ typentry->typlen, typentry->typbyval, typentry->typalign,
+ &db, NULL, &nb);
+
+ InitFunctionCallInfoData(locfcinfo, &typentry->eq_opr_finfo, 2,
+ NULL, NULL);
+
+ for(i=0;i<nb;i++) {
+ for(j=0;j<na;j++) {
+ if ( typeEQ(&locfcinfo, db[i], da[j]) ) {
+ n++;
+ break;
+ }
+ }
+ }
+
+ pfree( da );
+ pfree( db );
+
+ return ( n==nb ) ? TRUE : FALSE;
+}
+
+Datum
+arrayoverlap(PG_FUNCTION_ARGS) {
+ ArrayType *a = PG_GETARG_ARRAYTYPE_P(0);
+ ArrayType *b = PG_GETARG_ARRAYTYPE_P(1);
+ TypeCacheEntry *typentry = fillTypeCacheEntry( fcinfo->flinfo->fn_extra, ARR_ELEMTYPE(a) );
+ bool res;
+
+ fcinfo->flinfo->fn_extra = (void*)typentry;
+
+ res = ginArrayOverlap( typentry, a, b );
+
+ PG_FREE_IF_COPY(a,0);
+ PG_FREE_IF_COPY(b,1);
+
+ PG_RETURN_BOOL(res);
+}
+
+Datum
+arraycontains(PG_FUNCTION_ARGS) {
+ ArrayType *a = PG_GETARG_ARRAYTYPE_P(0);
+ ArrayType *b = PG_GETARG_ARRAYTYPE_P(1);
+ TypeCacheEntry *typentry = fillTypeCacheEntry( fcinfo->flinfo->fn_extra, ARR_ELEMTYPE(a) );
+ bool res;
+
+ fcinfo->flinfo->fn_extra = (void*)typentry;
+
+ res = ginArrayContains( typentry, a, b );
+
+ PG_FREE_IF_COPY(a,0);
+ PG_FREE_IF_COPY(b,1);
+
+ PG_RETURN_BOOL(res);
+}
+
+Datum
+arraycontained(PG_FUNCTION_ARGS) {
+ ArrayType *a = PG_GETARG_ARRAYTYPE_P(0);
+ ArrayType *b = PG_GETARG_ARRAYTYPE_P(1);
+ TypeCacheEntry *typentry = fillTypeCacheEntry( fcinfo->flinfo->fn_extra, ARR_ELEMTYPE(a) );
+ bool res;
+
+ fcinfo->flinfo->fn_extra = (void*)typentry;
+
+ res = ginArrayContains( typentry, b, a );
+
+ PG_FREE_IF_COPY(a,0);
+ PG_FREE_IF_COPY(b,1);
+
+ PG_RETURN_BOOL(res);
+}
+
diff --git a/src/backend/access/gin/ginbtree.c b/src/backend/access/gin/ginbtree.c
new file mode 100644
index 00000000000..821822c8a94
--- /dev/null
+++ b/src/backend/access/gin/ginbtree.c
@@ -0,0 +1,394 @@
+/*-------------------------------------------------------------------------
+ *
+ * ginbtree.c
+ * page utilities routines for the postgres inverted index access method.
+ *
+ *
+ * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * $PostgreSQL: pgsql/src/backend/access/gin/ginbtree.c,v 1.1 2006/05/02 11:28:54 teodor Exp $
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+#include "access/genam.h"
+#include "access/gin.h"
+#include "access/heapam.h"
+#include "catalog/index.h"
+#include "miscadmin.h"
+#include "storage/freespace.h"
+
+/*
+ * Locks buffer by needed method for search.
+ */
+static int
+ginTraverseLock(Buffer buffer, bool searchMode) {
+ Page page;
+ int access=GIN_SHARE;
+
+ LockBuffer(buffer, GIN_SHARE);
+ page = BufferGetPage( buffer );
+ if ( GinPageIsLeaf(page) ) {
+ if ( searchMode == FALSE ) {
+ /* we should relock our page */
+ LockBuffer(buffer, GIN_UNLOCK);
+ LockBuffer(buffer, GIN_EXCLUSIVE);
+
+ /* But root can become non-leaf during relock */
+ if ( !GinPageIsLeaf(page) ) {
+ /* resore old lock type (very rare) */
+ LockBuffer(buffer, GIN_UNLOCK);
+ LockBuffer(buffer, GIN_SHARE);
+ } else
+ access = GIN_EXCLUSIVE;
+ }
+ }
+
+ return access;
+}
+
+GinBtreeStack*
+ginPrepareFindLeafPage(GinBtree btree, BlockNumber blkno) {
+ GinBtreeStack *stack = (GinBtreeStack*)palloc(sizeof(GinBtreeStack));
+
+ stack->blkno = blkno;
+ stack->buffer = ReadBuffer(btree->index, stack->blkno);
+ stack->parent = NULL;
+ stack->predictNumber = 1;
+
+ ginTraverseLock(stack->buffer, btree->searchMode);
+
+ return stack;
+}
+
+/*
+ * Locates leaf page contained tuple
+ */
+GinBtreeStack*
+ginFindLeafPage(GinBtree btree, GinBtreeStack *stack) {
+ bool isfirst=TRUE;
+ BlockNumber rootBlkno;
+
+ if ( !stack )
+ stack = ginPrepareFindLeafPage(btree, GIN_ROOT_BLKNO);
+ rootBlkno = stack->blkno;
+
+ for(;;) {
+ Page page;
+ BlockNumber child;
+ int access=GIN_SHARE;
+
+ stack->off = InvalidOffsetNumber;
+
+ page = BufferGetPage( stack->buffer );
+
+ if ( isfirst ) {
+ if ( GinPageIsLeaf(page) && !btree->searchMode )
+ access = GIN_EXCLUSIVE;
+ isfirst = FALSE;
+ } else
+ access = ginTraverseLock(stack->buffer, btree->searchMode);
+
+ /* ok, page is correctly locked, we should check to move right ..,
+ root never has a right link, so small optimization */
+ while( btree->fullScan==FALSE && stack->blkno != rootBlkno && btree->isMoveRight(btree, page) ) {
+ BlockNumber rightlink = GinPageGetOpaque(page)->rightlink;
+
+ if ( rightlink==InvalidBlockNumber )
+ /* rightmost page */
+ break;
+
+ stack->blkno = rightlink;
+ LockBuffer(stack->buffer, GIN_UNLOCK);
+ stack->buffer = ReleaseAndReadBuffer(stack->buffer, btree->index, stack->blkno);
+ LockBuffer(stack->buffer, access);
+ page = BufferGetPage( stack->buffer );
+ }
+
+ if ( GinPageIsLeaf(page) ) /* we found, return locked page */
+ return stack;
+
+ /* now we have correct buffer, try to find child */
+ child = btree->findChildPage(btree, stack);
+
+ LockBuffer(stack->buffer, GIN_UNLOCK);
+ Assert( child != InvalidBlockNumber );
+ Assert( stack->blkno != child );
+
+ if ( btree->searchMode ) {
+ /* in search mode we may forget path to leaf */
+ stack->blkno = child;
+ stack->buffer = ReleaseAndReadBuffer( stack->buffer, btree->index, stack->blkno );
+ } else {
+ GinBtreeStack *ptr = (GinBtreeStack*)palloc(sizeof(GinBtreeStack));
+
+ ptr->parent = stack;
+ stack = ptr;
+ stack->blkno = child;
+ stack->buffer = ReadBuffer(btree->index, stack->blkno);
+ stack->predictNumber = 1;
+ }
+ }
+
+ /* keep compiler happy */
+ return NULL;
+}
+
+void
+freeGinBtreeStack( GinBtreeStack *stack ) {
+ while(stack) {
+ GinBtreeStack *tmp = stack->parent;
+ if ( stack->buffer != InvalidBuffer )
+ ReleaseBuffer(stack->buffer);
+
+ pfree( stack );
+ stack = tmp;
+ }
+}
+
+/*
+ * Try to find parent for current stack position, returns correct
+ * parent and child's offset in stack->parent.
+ * Function should never release root page to prevent conflicts
+ * with vacuum process
+ */
+void
+findParents( GinBtree btree, GinBtreeStack *stack,
+ BlockNumber rootBlkno) {
+
+ Page page;
+ Buffer buffer;
+ BlockNumber blkno, leftmostBlkno;
+ OffsetNumber offset;
+ GinBtreeStack *root = stack->parent;
+ GinBtreeStack *ptr;
+
+ if ( !root ) {
+ /* XLog mode... */
+ root = (GinBtreeStack*)palloc(sizeof(GinBtreeStack));
+ root->blkno = rootBlkno;
+ root->buffer = ReadBuffer(btree->index, rootBlkno);
+ LockBuffer(root->buffer, GIN_EXCLUSIVE);
+ root->parent = NULL;
+ } else {
+ /* find root, we should not release root page until update is finished!! */
+ while( root->parent ) {
+ ReleaseBuffer( root->buffer );
+ root = root->parent;
+ }
+
+ Assert( root->blkno == rootBlkno );
+ Assert( BufferGetBlockNumber(root->buffer) == rootBlkno );
+ LockBuffer(root->buffer, GIN_EXCLUSIVE);
+ }
+ root->off = InvalidOffsetNumber;
+
+ page = BufferGetPage(root->buffer);
+ Assert( !GinPageIsLeaf(page) );
+
+ /* check trivial case */
+ if ( (root->off != btree->findChildPtr(btree, page, stack->blkno, InvalidOffsetNumber)) != InvalidBuffer ) {
+ stack->parent = root;
+ return;
+ }
+
+ leftmostBlkno = blkno = btree->getLeftMostPage(btree, page);
+ LockBuffer(root->buffer, GIN_UNLOCK );
+ Assert( blkno!=InvalidBlockNumber );
+
+
+ for(;;) {
+ buffer = ReadBuffer(btree->index, blkno);
+ LockBuffer(buffer, GIN_EXCLUSIVE);
+ page = BufferGetPage(root->buffer);
+ if ( GinPageIsLeaf(page) )
+ elog(ERROR, "Lost path");
+
+ leftmostBlkno = btree->getLeftMostPage(btree, page);
+
+ while( (offset = btree->findChildPtr(btree, page, stack->blkno, InvalidOffsetNumber))==InvalidOffsetNumber ) {
+ blkno = GinPageGetOpaque(page)->rightlink;
+ LockBuffer(buffer,GIN_UNLOCK);
+ ReleaseBuffer(buffer);
+ if ( blkno == InvalidBlockNumber )
+ break;
+ buffer = ReadBuffer(btree->index, blkno);
+ LockBuffer(buffer, GIN_EXCLUSIVE);
+ page = BufferGetPage(buffer);
+ }
+
+ if ( blkno != InvalidBlockNumber ) {
+ ptr = (GinBtreeStack*)palloc(sizeof(GinBtreeStack));
+ ptr->blkno = blkno;
+ ptr->buffer = buffer;
+ ptr->parent = root; /* it's may be wrong, but in next call we will correct */
+ stack->parent = ptr;
+ return;
+ }
+
+ blkno = leftmostBlkno;
+ }
+}
+
+/*
+ * Insert value (stored in GinBtree) to tree descibed by stack
+ */
+void
+ginInsertValue(GinBtree btree, GinBtreeStack *stack) {
+ GinBtreeStack *parent = stack;
+ BlockNumber rootBlkno = InvalidBuffer;
+ Page page, rpage, lpage;
+
+ /* remember root BlockNumber */
+ while( parent ) {
+ rootBlkno = parent->blkno;
+ parent = parent->parent;
+ }
+
+ while( stack ) {
+ XLogRecData *rdata;
+ BlockNumber savedRightLink;
+
+ page = BufferGetPage( stack->buffer );
+ savedRightLink = GinPageGetOpaque(page)->rightlink;
+
+ if ( btree->isEnoughSpace( btree, stack->buffer, stack->off ) ) {
+ START_CRIT_SECTION();
+ btree->placeToPage( btree, stack->buffer, stack->off, &rdata );
+
+ if (!btree->index->rd_istemp) {
+ XLogRecPtr recptr;
+
+ recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT, rdata);
+ PageSetLSN(page, recptr);
+ PageSetTLI(page, ThisTimeLineID);
+ }
+
+ MarkBufferDirty( stack->buffer );
+ UnlockReleaseBuffer(stack->buffer);
+ END_CRIT_SECTION();
+
+ freeGinBtreeStack(stack->parent);
+ return;
+ } else {
+ Buffer rbuffer = GinNewBuffer(btree->index);
+ Page newlpage;
+
+ /* newlpage is a pointer to memory page, it does'nt assosiates with buffer,
+ stack->buffer shoud be untouched */
+ newlpage = btree->splitPage( btree, stack->buffer, rbuffer, stack->off, &rdata );
+
+
+ ((ginxlogSplit*)(rdata->data))->rootBlkno = rootBlkno;
+
+ parent = stack->parent;
+
+ if ( parent == NULL ) {
+ /* split root, so we need to allocate new left page and
+ place pointer on root to left and right page */
+ Buffer lbuffer = GinNewBuffer(btree->index);
+
+ ((ginxlogSplit*)(rdata->data))->isRootSplit = TRUE;
+ ((ginxlogSplit*)(rdata->data))->rrlink = InvalidBlockNumber;
+
+
+ page = BufferGetPage( stack->buffer );
+ lpage = BufferGetPage( lbuffer );
+ rpage = BufferGetPage( rbuffer );
+
+ GinPageGetOpaque(rpage)->rightlink = InvalidBlockNumber;
+ GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
+ ((ginxlogSplit*)(rdata->data))->lblkno = BufferGetBlockNumber(lbuffer);
+
+ START_CRIT_SECTION();
+
+ GinInitBuffer( stack->buffer, GinPageGetOpaque(newlpage)->flags & ~GIN_LEAF );
+ PageRestoreTempPage( newlpage, lpage );
+ btree->fillRoot( btree, stack->buffer, lbuffer, rbuffer );
+ if (!btree->index->rd_istemp) {
+ XLogRecPtr recptr;
+
+ recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT, rdata);
+ PageSetLSN(page, recptr);
+ PageSetTLI(page, ThisTimeLineID);
+ PageSetLSN(lpage, recptr);
+ PageSetTLI(lpage, ThisTimeLineID);
+ PageSetLSN(rpage, recptr);
+ PageSetTLI(rpage, ThisTimeLineID);
+ }
+
+ MarkBufferDirty(rbuffer);
+ UnlockReleaseBuffer(rbuffer);
+ MarkBufferDirty(lbuffer);
+ UnlockReleaseBuffer(lbuffer);
+ MarkBufferDirty(stack->buffer);
+ UnlockReleaseBuffer(stack->buffer);
+
+ END_CRIT_SECTION();
+
+ return;
+ } else {
+ /* split non-root page */
+ ((ginxlogSplit*)(rdata->data))->isRootSplit = FALSE;
+ ((ginxlogSplit*)(rdata->data))->rrlink = savedRightLink;
+
+ lpage = BufferGetPage( stack->buffer );
+ rpage = BufferGetPage( rbuffer );
+
+ GinPageGetOpaque(rpage)->rightlink = savedRightLink;
+ GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
+
+ START_CRIT_SECTION();
+ PageRestoreTempPage( newlpage, lpage );
+ if (!btree->index->rd_istemp) {
+ XLogRecPtr recptr;
+
+ recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT, rdata);
+ PageSetLSN(lpage, recptr);
+ PageSetTLI(lpage, ThisTimeLineID);
+ PageSetLSN(rpage, recptr);
+ PageSetTLI(rpage, ThisTimeLineID);
+ }
+ MarkBufferDirty(rbuffer);
+ UnlockReleaseBuffer(rbuffer);
+ MarkBufferDirty( stack->buffer );
+ END_CRIT_SECTION();
+ }
+ }
+
+ btree->isDelete = FALSE;
+
+ /* search parent to lock */
+ LockBuffer(parent->buffer, GIN_EXCLUSIVE);
+
+ /* move right if it's needed */
+ page = BufferGetPage( parent->buffer );
+ while( (parent->off=btree->findChildPtr(btree, page, stack->blkno, parent->off)) == InvalidOffsetNumber ) {
+ BlockNumber rightlink = GinPageGetOpaque(page)->rightlink;
+
+ LockBuffer(parent->buffer, GIN_UNLOCK);
+
+ if ( rightlink==InvalidBlockNumber ) {
+ /* rightmost page, but we don't find parent, we should
+ use plain search... */
+ findParents(btree, stack, rootBlkno);
+ parent=stack->parent;
+ page = BufferGetPage( parent->buffer );
+ break;
+ }
+
+ parent->blkno = rightlink;
+ parent->buffer = ReleaseAndReadBuffer(parent->buffer, btree->index, parent->blkno);
+ LockBuffer(parent->buffer, GIN_EXCLUSIVE);
+ page = BufferGetPage( parent->buffer );
+ }
+
+ UnlockReleaseBuffer(stack->buffer);
+ pfree( stack );
+ stack = parent;
+ }
+}
+
+
diff --git a/src/backend/access/gin/ginbulk.c b/src/backend/access/gin/ginbulk.c
new file mode 100644
index 00000000000..6d73f070ba2
--- /dev/null
+++ b/src/backend/access/gin/ginbulk.c
@@ -0,0 +1,155 @@
+/*-------------------------------------------------------------------------
+ *
+ * ginbulk.c
+ * routines for fast build of inverted index
+ *
+ *
+ * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * $PostgreSQL: pgsql/src/backend/access/gin/ginbulk.c,v 1.1 2006/05/02 11:28:54 teodor Exp $
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+#include "access/genam.h"
+#include "access/gin.h"
+#include "access/heapam.h"
+#include "catalog/index.h"
+#include "miscadmin.h"
+#include "storage/freespace.h"
+#include "utils/memutils.h"
+#include "access/tuptoaster.h"
+
+#define DEF_NENTRY 128
+#define DEF_NPTR 4
+
+void
+ginInitBA(BuildAccumulator *accum) {
+
+ accum->number = 0;
+ accum->curget = 0;
+ accum->length = DEF_NENTRY;
+ accum->entries = (EntryAccumulator*)palloc0( sizeof(EntryAccumulator) * DEF_NENTRY );
+ accum->allocatedMemory = sizeof(EntryAccumulator) * DEF_NENTRY;
+}
+
+/*
+ * Stores heap item pointer. For robust, it checks that
+ * item pointer are ordered
+ */
+static void
+ginInsertData(BuildAccumulator *accum, EntryAccumulator *entry, ItemPointer heapptr) {
+ if ( entry->number >= entry->length ) {
+ accum->allocatedMemory += sizeof(ItemPointerData) * entry->length;
+ entry->length *= 2;
+ entry->list = (ItemPointerData*)repalloc(entry->list,
+ sizeof(ItemPointerData)*entry->length);
+ }
+
+ if ( entry->shouldSort==FALSE ) {
+ int res = compareItemPointers( entry->list + entry->number - 1, heapptr );
+
+ Assert( res != 0 );
+
+ if ( res > 0 )
+ entry->shouldSort=TRUE;
+ }
+
+ entry->list[ entry->number ] = *heapptr;
+ entry->number++;
+}
+
+/*
+ * Find/store one entry from indexed value.
+ * It supposes, that entry should be located between low and end of array of
+ * entries. Returns position of found/inserted entry
+ */
+static uint32
+ginInsertEntry(BuildAccumulator *accum, ItemPointer heapptr, Datum entry, uint32 low) {
+ uint32 high = accum->number, mid;
+ int res;
+
+ while(high>low) {
+ mid = low + ((high - low) / 2);
+
+ res = compareEntries(accum->ginstate, entry, accum->entries[mid].value);
+
+ if ( res == 0 ) {
+ ginInsertData( accum, accum->entries+mid, heapptr );
+ return mid;
+ } else if ( res > 0 )
+ low = mid + 1;
+ else
+ high = mid;
+ }
+
+ /* did not find an entry, insert */
+ if ( accum->number >= accum->length ) {
+ accum->allocatedMemory += sizeof(EntryAccumulator) * accum->length;
+ accum->length *= 2;
+ accum->entries = (EntryAccumulator*)repalloc( accum->entries,
+ sizeof(EntryAccumulator) * accum->length );
+ }
+
+ if ( high != accum->number )
+ memmove( accum->entries+high+1, accum->entries+high, sizeof(EntryAccumulator) * (accum->number-high) );
+
+ accum->entries[high].value = entry;
+ accum->entries[high].length = DEF_NPTR;
+ accum->entries[high].number = 1;
+ accum->entries[high].shouldSort = FALSE;
+ accum->entries[high].list = (ItemPointerData*)palloc(sizeof(ItemPointerData)*DEF_NPTR);
+ accum->entries[high].list[0] = *heapptr;
+
+ accum->allocatedMemory += sizeof(ItemPointerData)*DEF_NPTR;
+ accum->number++;
+
+ return high;
+}
+
+
+/*
+ * Insert one heap pointer. Requires entries to be sorted!
+ */
+void
+ginInsertRecordBA( BuildAccumulator *accum, ItemPointer heapptr, Datum *entries, uint32 nentry ) {
+ uint32 start=0,i;
+
+ for(i=0;i<nentry;i++)
+ start = ginInsertEntry( accum, heapptr, entries[i], start);
+}
+
+static int
+qsortCompareItemPointers( const void *a, const void *b ) {
+ int res = compareItemPointers( (ItemPointer)a, (ItemPointer)b );
+ Assert( res!=0 );
+ return res;
+}
+
+ItemPointerData*
+ginGetEntry(BuildAccumulator *accum, Datum *value, uint32 *n) {
+ EntryAccumulator *entry;
+
+ ItemPointerData *list;
+ if ( accum->curget >= accum->number )
+ return NULL;
+ else if ( accum->curget > 0 )
+ pfree( accum->entries[ accum->curget-1 ].list );
+
+ entry = accum->entries + accum->curget;
+ *n = entry->number;
+ *value = entry->value;
+ list = entry->list;
+ accum->curget++;
+
+ if ( entry->shouldSort && entry->number > 1 )
+ qsort(list, *n, sizeof(ItemPointerData), qsortCompareItemPointers);
+
+
+ return list;
+}
+
+
+
diff --git a/src/backend/access/gin/gindatapage.c b/src/backend/access/gin/gindatapage.c
new file mode 100644
index 00000000000..1069b19b92c
--- /dev/null
+++ b/src/backend/access/gin/gindatapage.c
@@ -0,0 +1,569 @@
+/*-------------------------------------------------------------------------
+ *
+ * gindatapage.c
+ * page utilities routines for the postgres inverted index access method.
+ *
+ *
+ * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * $PostgreSQL: pgsql/src/backend/access/gin/gindatapage.c,v 1.1 2006/05/02 11:28:54 teodor Exp $
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+#include "access/genam.h"
+#include "access/gin.h"
+#include "access/heapam.h"
+#include "catalog/index.h"
+#include "miscadmin.h"
+#include "storage/freespace.h"
+
+int
+compareItemPointers( ItemPointer a, ItemPointer b ) {
+ if ( GinItemPointerGetBlockNumber(a) == GinItemPointerGetBlockNumber(b) ) {
+ if ( GinItemPointerGetOffsetNumber(a) == GinItemPointerGetOffsetNumber(b) )
+ return 0;
+ return ( GinItemPointerGetOffsetNumber(a) > GinItemPointerGetOffsetNumber(b) ) ? 1 : -1;
+ }
+
+ return ( GinItemPointerGetBlockNumber(a) > GinItemPointerGetBlockNumber(b) ) ? 1 : -1;
+}
+
+/*
+ * Merge two ordered array of itempointer
+ */
+void
+MergeItemPointers(ItemPointerData *dst, ItemPointerData *a, uint32 na, ItemPointerData *b, uint32 nb) {
+ ItemPointerData *dptr = dst;
+ ItemPointerData *aptr = a, *bptr = b;
+
+ while( aptr - a < na && bptr - b < nb ) {
+ if ( compareItemPointers(aptr, bptr) > 0 )
+ *dptr++ = *bptr++;
+ else
+ *dptr++ = *aptr++;
+ }
+
+ while( aptr - a < na )
+ *dptr++ = *aptr++;
+
+ while( bptr - b < nb )
+ *dptr++ = *bptr++;
+}
+
+/*
+ * Checks, should we move to right link...
+ * Compares inserting itemp pointer with right bound of current page
+ */
+static bool
+dataIsMoveRight(GinBtree btree, Page page) {
+ ItemPointer iptr = GinDataPageGetRightBound(page);
+
+ if ( GinPageRightMost(page) )
+ return FALSE;
+
+ return ( compareItemPointers( btree->items + btree->curitem, iptr ) > 0 ) ? TRUE : FALSE;
+}
+
+/*
+ * Find correct PostingItem in non-leaf page. It supposed that
+ * page correctly choosen and searching value SHOULD be on page
+ */
+static BlockNumber
+dataLocateItem(GinBtree btree, GinBtreeStack *stack) {
+ OffsetNumber low, high, maxoff;
+ PostingItem *pitem=NULL;
+ int result;
+ Page page = BufferGetPage( stack->buffer );
+
+ Assert( !GinPageIsLeaf(page) );
+ Assert( GinPageIsData(page) );
+
+ if ( btree->fullScan ) {
+ stack->off = FirstOffsetNumber;
+ stack->predictNumber *= GinPageGetOpaque(page)->maxoff;
+ return btree->getLeftMostPage(btree, page);
+ }
+
+ low = FirstOffsetNumber;
+ maxoff = high = GinPageGetOpaque(page)->maxoff;
+ Assert( high >= low );
+
+ high++;
+
+ while (high > low) {
+ OffsetNumber mid = low + ((high - low) / 2);
+ pitem = (PostingItem*)GinDataPageGetItem(page,mid);
+
+ if ( mid == maxoff )
+ /* Right infinity, page already correctly choosen
+ with a help of dataIsMoveRight */
+ result = -1;
+ else {
+ pitem = (PostingItem*)GinDataPageGetItem(page,mid);
+ result = compareItemPointers( btree->items + btree->curitem, &( pitem->key ) );
+ }
+
+ if ( result == 0 ) {
+ stack->off = mid;
+ return PostingItemGetBlockNumber(pitem);
+ } else if ( result > 0 )
+ low = mid + 1;
+ else
+ high = mid;
+ }
+
+ Assert( high>=FirstOffsetNumber && high <= maxoff );
+
+ stack->off = high;
+ pitem = (PostingItem*)GinDataPageGetItem(page,high);
+ return PostingItemGetBlockNumber(pitem);
+}
+
+/*
+ * Searches correct position for value on leaf page.
+ * Page should be corrrectly choosen.
+ * Returns true if value found on page.
+ */
+static bool
+dataLocateLeafItem(GinBtree btree, GinBtreeStack *stack) {
+ Page page = BufferGetPage( stack->buffer );
+ OffsetNumber low, high;
+ int result;
+
+ Assert( GinPageIsLeaf(page) );
+ Assert( GinPageIsData(page) );
+
+ if ( btree->fullScan ) {
+ stack->off = FirstOffsetNumber;
+ return TRUE;
+ }
+
+ low=FirstOffsetNumber;
+ high = GinPageGetOpaque(page)->maxoff;
+
+ if ( high < low ) {
+ stack->off = FirstOffsetNumber;
+ return false;
+ }
+
+ high++;
+
+ while (high > low) {
+ OffsetNumber mid = low + ((high - low) / 2);
+
+ result = compareItemPointers( btree->items + btree->curitem, (ItemPointer)GinDataPageGetItem(page,mid) );
+
+ if ( result == 0 ) {
+ stack->off = mid;
+ return true;
+ } else if ( result > 0 )
+ low = mid + 1;
+ else
+ high = mid;
+ }
+
+ stack->off = high;
+ return false;
+}
+
+/*
+ * Finds links to blkno on non-leaf page, retuns
+ * offset of PostingItem
+ */
+static OffsetNumber
+dataFindChildPtr(GinBtree btree, Page page, BlockNumber blkno, OffsetNumber storedOff) {
+ OffsetNumber i, maxoff = GinPageGetOpaque(page)->maxoff;
+ PostingItem *pitem;
+
+ Assert( !GinPageIsLeaf(page) );
+ Assert( GinPageIsData(page) );
+
+ /* if page isn't changed, we returns storedOff */
+ if ( storedOff>= FirstOffsetNumber && storedOff<=maxoff) {
+ pitem = (PostingItem*)GinDataPageGetItem(page, storedOff);
+ if ( PostingItemGetBlockNumber(pitem) == blkno )
+ return storedOff;
+
+ /* we hope, that needed pointer goes to right. It's true
+ if there wasn't a deletion */
+ for( i=storedOff+1 ; i <= maxoff ; i++ ) {
+ pitem = (PostingItem*)GinDataPageGetItem(page, i);
+ if ( PostingItemGetBlockNumber(pitem) == blkno )
+ return i;
+ }
+
+ maxoff = storedOff-1;
+ }
+
+ /* last chance */
+ for( i=FirstOffsetNumber; i <= maxoff ; i++ ) {
+ pitem = (PostingItem*)GinDataPageGetItem(page, i);
+ if ( PostingItemGetBlockNumber(pitem) == blkno )
+ return i;
+ }
+
+ return InvalidOffsetNumber;
+}
+
+/*
+ * retunrs blkno of lefmost child
+ */
+static BlockNumber
+dataGetLeftMostPage(GinBtree btree, Page page) {
+ PostingItem *pitem;
+
+ Assert( !GinPageIsLeaf(page) );
+ Assert( GinPageIsData(page) );
+ Assert( GinPageGetOpaque(page)->maxoff >= FirstOffsetNumber );
+
+ pitem = (PostingItem*)GinDataPageGetItem(page, FirstOffsetNumber);
+ return PostingItemGetBlockNumber(pitem);
+}
+
+/*
+ * add ItemPointer or PostingItem to page. data should points to
+ * correct value! depending on leaf or non-leaf page
+ */
+void
+GinDataPageAddItem( Page page, void *data, OffsetNumber offset ) {
+ OffsetNumber maxoff = GinPageGetOpaque(page)->maxoff;
+ char *ptr;
+
+ if ( offset == InvalidOffsetNumber ) {
+ ptr = GinDataPageGetItem(page,maxoff+1);
+ } else {
+ ptr = GinDataPageGetItem(page,offset);
+ if ( maxoff+1-offset != 0 )
+ memmove( ptr+GinSizeOfItem(page), ptr, (maxoff-offset+1) * GinSizeOfItem(page) );
+ }
+ memcpy( ptr, data, GinSizeOfItem(page) );
+
+ GinPageGetOpaque(page)->maxoff++;
+}
+
+/*
+ * Deletes posting item from non-leaf page
+ */
+void
+PageDeletePostingItem(Page page, OffsetNumber offset) {
+ OffsetNumber maxoff = GinPageGetOpaque(page)->maxoff;
+
+ Assert( !GinPageIsLeaf(page) );
+ Assert( offset>=FirstOffsetNumber && offset <= maxoff );
+
+ if ( offset != maxoff )
+ memmove( GinDataPageGetItem(page,offset), GinDataPageGetItem(page,offset+1),
+ sizeof(PostingItem) * (maxoff-offset) );
+
+ GinPageGetOpaque(page)->maxoff--;
+}
+
+/*
+ * checks space to install new value,
+ * item pointer never deletes!
+ */
+static bool
+dataIsEnoughSpace( GinBtree btree, Buffer buf, OffsetNumber off ) {
+ Page page = BufferGetPage(buf);
+
+ Assert( GinPageIsData(page) );
+ Assert( !btree->isDelete );
+
+ if ( GinPageIsLeaf(page) ) {
+ if ( GinPageRightMost(page) && off > GinPageGetOpaque(page)->maxoff ) {
+ if ( (btree->nitem - btree->curitem) * sizeof(ItemPointerData) <= GinDataPageGetFreeSpace(page) )
+ return true;
+ } else if ( sizeof(ItemPointerData) <= GinDataPageGetFreeSpace(page) )
+ return true;
+ } else if ( sizeof(PostingItem) <= GinDataPageGetFreeSpace(page) )
+ return true;
+
+ return false;
+}
+
+/*
+ * In case of previous split update old child blkno to
+ * new right page
+ * item pointer never deletes!
+ */
+static BlockNumber
+dataPrepareData( GinBtree btree, Page page, OffsetNumber off) {
+ BlockNumber ret = InvalidBlockNumber;
+
+ Assert( GinPageIsData(page) );
+
+ if ( !GinPageIsLeaf(page) && btree->rightblkno != InvalidBlockNumber ) {
+ PostingItem *pitem = (PostingItem*)GinDataPageGetItem(page,off);
+ PostingItemSetBlockNumber( pitem, btree->rightblkno );
+ ret = btree->rightblkno;
+ }
+
+ btree->rightblkno = InvalidBlockNumber;
+
+ return ret;
+}
+
+/*
+ * Places keys to page and fills WAL record. In case leaf page and
+ * build mode puts all ItemPointers to page.
+ */
+static void
+dataPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off, XLogRecData **prdata) {
+ Page page = BufferGetPage(buf);
+ static XLogRecData rdata[3];
+ int sizeofitem = GinSizeOfItem(page);
+ static ginxlogInsert data;
+
+ *prdata = rdata;
+ Assert( GinPageIsData(page) );
+
+ data.updateBlkno = dataPrepareData( btree, page, off );
+
+ data.node = btree->index->rd_node;
+ data.blkno = BufferGetBlockNumber( buf );
+ data.offset = off;
+ data.nitem = 1;
+ data.isDelete = FALSE;
+ data.isData = TRUE;
+ data.isLeaf = GinPageIsLeaf(page) ? TRUE : FALSE;
+
+ rdata[0].buffer = buf;
+ rdata[0].buffer_std = FALSE;
+ rdata[0].data = NULL;
+ rdata[0].len = 0;
+ rdata[0].next = &rdata[1];
+
+ rdata[1].buffer = InvalidBuffer;
+ rdata[1].data = (char *) &data;
+ rdata[1].len = sizeof(ginxlogInsert);
+ rdata[1].next = &rdata[2];
+
+ rdata[2].buffer = InvalidBuffer;
+ rdata[2].data = (GinPageIsLeaf(page)) ? ((char*)(btree->items+btree->curitem)) : ((char*)&(btree->pitem));
+ rdata[2].len = sizeofitem;
+ rdata[2].next = NULL;
+
+ if ( GinPageIsLeaf(page) ) {
+ if ( GinPageRightMost(page) && off > GinPageGetOpaque(page)->maxoff ) {
+ /* usually, create index... */
+ uint32 savedPos = btree->curitem;
+
+ while( btree->curitem < btree->nitem ) {
+ GinDataPageAddItem(page, btree->items+btree->curitem, off);
+ off++;
+ btree->curitem++;
+ }
+ data.nitem = btree->curitem-savedPos;
+ rdata[2].len = sizeofitem * data.nitem;
+ } else {
+ GinDataPageAddItem(page, btree->items+btree->curitem, off);
+ btree->curitem++;
+ }
+ } else
+ GinDataPageAddItem(page, &(btree->pitem), off);
+}
+
+/*
+ * split page and fills WAL record. original buffer(lbuf) leaves untouched,
+ * returns shadow page of lbuf filled new data. In leaf page and build mode puts all
+ * ItemPointers to pages. Also, in build mode splits data by way to full fulled
+ * left page
+ */
+static Page
+dataSplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogRecData **prdata) {
+ static ginxlogSplit data;
+ static XLogRecData rdata[4];
+ static char vector[2*BLCKSZ];
+ char *ptr;
+ OffsetNumber separator;
+ ItemPointer bound;
+ Page lpage = GinPageGetCopyPage( BufferGetPage( lbuf ) );
+ ItemPointerData oldbound = *GinDataPageGetRightBound(lpage);
+ int sizeofitem = GinSizeOfItem(lpage);
+ OffsetNumber maxoff = GinPageGetOpaque(lpage)->maxoff;
+ Page rpage = BufferGetPage( rbuf );
+ Size pageSize = PageGetPageSize( lpage );
+ Size freeSpace;
+ uint32 nCopied = 1;
+
+ GinInitPage( rpage, GinPageGetOpaque(lpage)->flags, pageSize );
+ freeSpace = GinDataPageGetFreeSpace(rpage);
+
+ *prdata = rdata;
+ data.leftChildBlkno = ( GinPageIsLeaf(lpage) ) ?
+ InvalidOffsetNumber : PostingItemGetBlockNumber( &(btree->pitem) );
+ data.updateBlkno = dataPrepareData( btree, lpage, off );
+
+ memcpy(vector, GinDataPageGetItem(lpage, FirstOffsetNumber),
+ maxoff*sizeofitem);
+
+ if ( GinPageIsLeaf(lpage) && GinPageRightMost(lpage) && off > GinPageGetOpaque(lpage)->maxoff ) {
+ nCopied = 0;
+ while( btree->curitem < btree->nitem && maxoff*sizeof(ItemPointerData) < 2*(freeSpace - sizeof(ItemPointerData)) ) {
+ memcpy( vector + maxoff*sizeof(ItemPointerData), btree->items+btree->curitem,
+ sizeof(ItemPointerData) );
+ maxoff++;
+ nCopied++;
+ btree->curitem++;
+ }
+ } else {
+ ptr = vector + (off-1)*sizeofitem;
+ if ( maxoff+1-off != 0 )
+ memmove( ptr+sizeofitem, ptr, (maxoff-off+1) * sizeofitem );
+ if ( GinPageIsLeaf(lpage) ) {
+ memcpy(ptr, btree->items+btree->curitem, sizeofitem );
+ btree->curitem++;
+ } else
+ memcpy(ptr, &(btree->pitem), sizeofitem );
+
+ maxoff++;
+ }
+
+ /* we suppose that during index creation table scaned from
+ begin to end, so ItemPointers are monotonically increased.. */
+ if ( btree->isBuild && GinPageRightMost(lpage) )
+ separator=freeSpace/sizeofitem;
+ else
+ separator=maxoff/2;
+
+ GinInitPage( rpage, GinPageGetOpaque(lpage)->flags, pageSize );
+ GinInitPage( lpage, GinPageGetOpaque(rpage)->flags, pageSize );
+
+ memcpy( GinDataPageGetItem(lpage, FirstOffsetNumber), vector, separator * sizeofitem );
+ GinPageGetOpaque(lpage)->maxoff = separator;
+ memcpy( GinDataPageGetItem(rpage, FirstOffsetNumber),
+ vector + separator * sizeofitem, (maxoff-separator) * sizeofitem );
+ GinPageGetOpaque(rpage)->maxoff = maxoff-separator;
+
+ PostingItemSetBlockNumber( &(btree->pitem), BufferGetBlockNumber(lbuf) );
+ if ( GinPageIsLeaf(lpage) )
+ btree->pitem.key = *(ItemPointerData*)GinDataPageGetItem(lpage,
+ GinPageGetOpaque(lpage)->maxoff);
+ else
+ btree->pitem.key = ((PostingItem*)GinDataPageGetItem(lpage,
+ GinPageGetOpaque(lpage)->maxoff))->key;
+ btree->rightblkno = BufferGetBlockNumber(rbuf);
+
+ /* set up right bound for left page */
+ bound = GinDataPageGetRightBound(lpage);
+ *bound = btree->pitem.key;
+
+ /* set up right bound for right page */
+ bound = GinDataPageGetRightBound(rpage);
+ *bound = oldbound;
+
+ data.node = btree->index->rd_node;
+ data.rootBlkno = InvalidBlockNumber;
+ data.lblkno = BufferGetBlockNumber( lbuf );
+ data.rblkno = BufferGetBlockNumber( rbuf );
+ data.separator = separator;
+ data.nitem = maxoff;
+ data.isData = TRUE;
+ data.isLeaf = GinPageIsLeaf(lpage) ? TRUE : FALSE;
+ data.isRootSplit = FALSE;
+ data.rightbound = oldbound;
+
+ rdata[0].buffer = InvalidBuffer;
+ rdata[0].data = (char *) &data;
+ rdata[0].len = sizeof(ginxlogSplit);
+ rdata[0].next = &rdata[1];
+
+ rdata[1].buffer = InvalidBuffer;
+ rdata[1].data = vector;
+ rdata[1].len = MAXALIGN( maxoff * sizeofitem );
+ rdata[1].next = NULL;
+
+ return lpage;
+}
+
+/*
+ * Fills new root by right bound values from child.
+ * Also called from ginxlog, should not use btree
+ */
+void
+dataFillRoot(GinBtree btree, Buffer root, Buffer lbuf, Buffer rbuf) {
+ Page page = BufferGetPage(root),
+ lpage = BufferGetPage(lbuf),
+ rpage = BufferGetPage(rbuf);
+ PostingItem li, ri;
+
+ li.key = *GinDataPageGetRightBound(lpage);
+ PostingItemSetBlockNumber( &li, BufferGetBlockNumber(lbuf) );
+ GinDataPageAddItem(page, &li, InvalidOffsetNumber );
+
+ ri.key = *GinDataPageGetRightBound(rpage);
+ PostingItemSetBlockNumber( &ri, BufferGetBlockNumber(rbuf) );
+ GinDataPageAddItem(page, &ri, InvalidOffsetNumber );
+}
+
+void
+prepareDataScan( GinBtree btree, Relation index) {
+ memset(btree, 0, sizeof(GinBtreeData));
+ btree->index = index;
+ btree->isMoveRight = dataIsMoveRight;
+ btree->findChildPage = dataLocateItem;
+ btree->findItem = dataLocateLeafItem;
+ btree->findChildPtr = dataFindChildPtr;
+ btree->getLeftMostPage = dataGetLeftMostPage;
+ btree->isEnoughSpace = dataIsEnoughSpace;
+ btree->placeToPage = dataPlaceToPage;
+ btree->splitPage = dataSplitPage;
+ btree->fillRoot = dataFillRoot;
+
+ btree->searchMode = FALSE;
+ btree->isDelete = FALSE;
+ btree->fullScan = FALSE;
+ btree->isBuild= FALSE;
+}
+
+GinPostingTreeScan*
+prepareScanPostingTree( Relation index, BlockNumber rootBlkno, bool searchMode) {
+ GinPostingTreeScan *gdi = (GinPostingTreeScan*)palloc0( sizeof(GinPostingTreeScan) );
+
+ prepareDataScan( &gdi->btree, index );
+
+ gdi->btree.searchMode = searchMode;
+ gdi->btree.fullScan = searchMode;
+
+ gdi->stack = ginPrepareFindLeafPage( &gdi->btree, rootBlkno );
+
+ return gdi;
+}
+
+/*
+ * Inserts array of item pointers, may execute several tree scan (very rare)
+ */
+void
+insertItemPointer(GinPostingTreeScan *gdi, ItemPointerData *items, uint32 nitem) {
+ BlockNumber rootBlkno = gdi->stack->blkno;
+
+ gdi->btree.items = items;
+ gdi->btree.nitem = nitem;
+ gdi->btree.curitem = 0;
+
+ while( gdi->btree.curitem < gdi->btree.nitem ) {
+ if (!gdi->stack)
+ gdi->stack = ginPrepareFindLeafPage( &gdi->btree, rootBlkno );
+
+ gdi->stack = ginFindLeafPage( &gdi->btree, gdi->stack );
+
+ if ( gdi->btree.findItem( &(gdi->btree), gdi->stack ) )
+ elog(ERROR,"Item pointer(%d:%d) is already exists",
+ ItemPointerGetBlockNumber(gdi->btree.items + gdi->btree.curitem),
+ ItemPointerGetOffsetNumber(gdi->btree.items + gdi->btree.curitem));
+
+ ginInsertValue(&(gdi->btree), gdi->stack);
+
+ gdi->stack=NULL;
+ }
+}
+
+Buffer
+scanBeginPostingTree( GinPostingTreeScan *gdi ) {
+ gdi->stack = ginFindLeafPage( &gdi->btree, gdi->stack );
+ return gdi->stack->buffer;
+}
+
diff --git a/src/backend/access/gin/ginentrypage.c b/src/backend/access/gin/ginentrypage.c
new file mode 100644
index 00000000000..81c109d6c4d
--- /dev/null
+++ b/src/backend/access/gin/ginentrypage.c
@@ -0,0 +1,532 @@
+/*-------------------------------------------------------------------------
+ *
+ * ginentrypage.c
+ * page utilities routines for the postgres inverted index access method.
+ *
+ *
+ * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * $PostgreSQL: pgsql/src/backend/access/gin/ginentrypage.c,v 1.1 2006/05/02 11:28:54 teodor Exp $
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+#include "access/genam.h"
+#include "access/gin.h"
+#include "access/heapam.h"
+#include "catalog/index.h"
+#include "miscadmin.h"
+#include "storage/freespace.h"
+#include "access/tuptoaster.h"
+
+/*
+ * forms tuple for entry tree. On leaf page, Index tuple has
+ * non-traditional layout. Tuple may contain posting list or
+ * root blocknumber of posting tree. Macros GinIsPostingTre: (itup) / GinSetPostingTree(itup, blkno)
+ * 1) Posting list
+ * - itup->t_info & INDEX_SIZE_MASK contains size of tuple as usial
+ * - ItemPointerGetBlockNumber(&itup->t_tid) contains original
+ * size of tuple (without posting list).
+ * Macroses: GinGetOrigSizePosting(itup) / GinSetOrigSizePosting(itup,n)
+ * - ItemPointerGetOffsetNumber(&itup->t_tid) contains number
+ * of elements in posting list (number of heap itempointer)
+ * Macroses: GinGetNPosting(itup) / GinSetNPosting(itup,n)
+ * - After usial part of tuple there is a posting list
+ * Macros: GinGetPosting(itup)
+ * 2) Posting tree
+ * - itup->t_info & INDEX_SIZE_MASK contains size of tuple as usial
+ * - ItemPointerGetBlockNumber(&itup->t_tid) contains block number of
+ * root of posting tree
+ * - ItemPointerGetOffsetNumber(&itup->t_tid) contains magick number GIN_TREE_POSTING
+ */
+IndexTuple
+GinFormTuple(GinState *ginstate, Datum key, ItemPointerData *ipd, uint32 nipd) {
+ bool isnull=FALSE;
+ IndexTuple itup;
+
+ itup = index_form_tuple(ginstate->tupdesc, &key, &isnull);
+
+ GinSetOrigSizePosting( itup, IndexTupleSize(itup) );
+
+ if ( nipd > 0 ) {
+ uint32 newsize = MAXALIGN(SHORTALIGN(IndexTupleSize(itup)) + sizeof(ItemPointerData)*nipd);
+
+ if ( newsize >= INDEX_SIZE_MASK )
+ return NULL;
+
+ if ( newsize > TOAST_INDEX_TARGET && nipd > 1 )
+ return NULL;
+
+ itup = repalloc( itup, newsize );
+
+ /* set new size */
+ itup->t_info &= ~INDEX_SIZE_MASK;
+ itup->t_info |= newsize;
+
+ if ( ipd )
+ memcpy( GinGetPosting(itup), ipd, sizeof(ItemPointerData)*nipd );
+ GinSetNPosting(itup, nipd);
+ } else {
+ GinSetNPosting(itup, 0);
+ }
+ return itup;
+}
+
+/*
+ * Entry tree is a "static", ie tuple never deletes from it,
+ * so we don't use right bound, we use rightest key instead.
+ */
+static IndexTuple
+getRightMostTuple(Page page) {
+ OffsetNumber maxoff = PageGetMaxOffsetNumber(page);
+ return (IndexTuple) PageGetItem(page, PageGetItemId(page, maxoff));
+}
+
+Datum
+ginGetHighKey(GinState *ginstate, Page page) {
+ IndexTuple itup;
+ bool isnull;
+
+ itup = getRightMostTuple(page);
+
+ return index_getattr(itup, FirstOffsetNumber, ginstate->tupdesc, &isnull);
+}
+
+static bool
+entryIsMoveRight(GinBtree btree, Page page) {
+ Datum highkey;
+
+ if ( GinPageRightMost(page) )
+ return FALSE;
+
+ highkey = ginGetHighKey(btree->ginstate, page);
+
+ if ( compareEntries(btree->ginstate, btree->entryValue, highkey) > 0 )
+ return TRUE;
+
+ return FALSE;
+}
+
+/*
+ * Find correct tuple in non-leaf page. It supposed that
+ * page correctly choosen and searching value SHOULD be on page
+ */
+static BlockNumber
+entryLocateEntry(GinBtree btree, GinBtreeStack *stack) {
+ OffsetNumber low, high, maxoff;
+ IndexTuple itup;
+ int result;
+ Page page = BufferGetPage( stack->buffer );
+
+ Assert( !GinPageIsLeaf(page) );
+ Assert( !GinPageIsData(page) );
+
+ if ( btree->fullScan ) {
+ stack->off = FirstOffsetNumber;
+ stack->predictNumber *= PageGetMaxOffsetNumber(page);
+ return btree->getLeftMostPage(btree, page);
+ }
+
+ low = FirstOffsetNumber;
+ maxoff = high = PageGetMaxOffsetNumber(page);
+ Assert( high >= low );
+
+ high++;
+
+ while (high > low) {
+ OffsetNumber mid = low + ((high - low) / 2);
+
+ if ( mid == maxoff && GinPageRightMost(page) )
+ /* Right infinity */
+ result = -1;
+ else {
+ bool isnull;
+
+ itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, mid));
+ result = compareEntries(btree->ginstate, btree->entryValue,
+ index_getattr(itup, FirstOffsetNumber, btree->ginstate->tupdesc, &isnull) );
+ }
+
+ if ( result == 0 ) {
+ stack->off = mid;
+ Assert( GinItemPointerGetBlockNumber(&(itup)->t_tid) != GIN_ROOT_BLKNO );
+ return GinItemPointerGetBlockNumber(&(itup)->t_tid);
+ } else if ( result > 0 )
+ low = mid + 1;
+ else
+ high = mid;
+ }
+
+ Assert( high>=FirstOffsetNumber && high <= maxoff );
+
+ stack->off = high;
+ itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, high));
+ Assert( GinItemPointerGetBlockNumber(&(itup)->t_tid) != GIN_ROOT_BLKNO );
+ return GinItemPointerGetBlockNumber(&(itup)->t_tid);
+}
+
+/*
+ * Searches correct position for value on leaf page.
+ * Page should be corrrectly choosen.
+ * Returns true if value found on page.
+ */
+static bool
+entryLocateLeafEntry(GinBtree btree, GinBtreeStack *stack) {
+ Page page = BufferGetPage( stack->buffer );
+ OffsetNumber low, high;
+ IndexTuple itup;
+
+ Assert( GinPageIsLeaf(page) );
+ Assert( !GinPageIsData(page) );
+
+ if ( btree->fullScan ) {
+ stack->off = FirstOffsetNumber;
+ return TRUE;
+ }
+
+ low = FirstOffsetNumber;
+ high = PageGetMaxOffsetNumber(page);
+
+ if ( high < low ) {
+ stack->off = FirstOffsetNumber;
+ return false;
+ }
+
+ high++;
+
+ while (high > low) {
+ OffsetNumber mid = low + ((high - low) / 2);
+ bool isnull;
+ int result;
+
+ itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, mid));
+ result = compareEntries(btree->ginstate, btree->entryValue,
+ index_getattr(itup, FirstOffsetNumber, btree->ginstate->tupdesc, &isnull) );
+
+ if ( result == 0 ) {
+ stack->off = mid;
+ return true;
+ } else if ( result > 0 )
+ low = mid + 1;
+ else
+ high = mid;
+ }
+
+ stack->off = high;
+ return false;
+}
+
+static OffsetNumber
+entryFindChildPtr(GinBtree btree, Page page, BlockNumber blkno, OffsetNumber storedOff) {
+ OffsetNumber i, maxoff = PageGetMaxOffsetNumber(page);
+ IndexTuple itup;
+
+ Assert( !GinPageIsLeaf(page) );
+ Assert( !GinPageIsData(page) );
+
+ /* if page isn't changed, we returns storedOff */
+ if ( storedOff>= FirstOffsetNumber && storedOff<=maxoff) {
+ itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, storedOff));
+ if ( GinItemPointerGetBlockNumber(&(itup)->t_tid) == blkno )
+ return storedOff;
+
+ /* we hope, that needed pointer goes to right. It's true
+ if there wasn't a deletion */
+ for( i=storedOff+1 ; i <= maxoff ; i++ ) {
+ itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
+ if ( GinItemPointerGetBlockNumber(&(itup)->t_tid) == blkno )
+ return i;
+ }
+ maxoff = storedOff-1;
+ }
+
+ /* last chance */
+ for( i=FirstOffsetNumber; i <= maxoff ; i++ ) {
+ itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
+ if ( GinItemPointerGetBlockNumber(&(itup)->t_tid) == blkno )
+ return i;
+ }
+
+ return InvalidOffsetNumber;
+}
+
+static BlockNumber
+entryGetLeftMostPage(GinBtree btree, Page page) {
+ IndexTuple itup;
+
+ Assert( !GinPageIsLeaf(page) );
+ Assert( !GinPageIsData(page) );
+ Assert( PageGetMaxOffsetNumber(page) >= FirstOffsetNumber );
+
+ itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, FirstOffsetNumber));
+ return GinItemPointerGetBlockNumber(&(itup)->t_tid);
+}
+
+static bool
+entryIsEnoughSpace( GinBtree btree, Buffer buf, OffsetNumber off ) {
+ Size itupsz = 0;
+ Page page = BufferGetPage(buf);
+
+ Assert( btree->entry );
+ Assert( !GinPageIsData(page) );
+
+ if ( btree->isDelete ) {
+ IndexTuple itup = (IndexTuple)PageGetItem(page, PageGetItemId(page, off));
+ itupsz = MAXALIGN( IndexTupleSize( itup ) ) + sizeof(ItemIdData);
+ }
+
+ if ( PageGetFreeSpace(page) + itupsz >= MAXALIGN(IndexTupleSize(btree->entry)) + sizeof(ItemIdData) )
+ return true;
+
+ return false;
+}
+
+/*
+ * Delete tuple on leaf page if tuples was existed and we
+ * should update it, update old child blkno to new right page
+ * if child split is occured
+ */
+static BlockNumber
+entryPreparePage( GinBtree btree, Page page, OffsetNumber off) {
+ BlockNumber ret = InvalidBlockNumber;
+
+ Assert( btree->entry );
+ Assert( !GinPageIsData(page) );
+
+ if ( btree->isDelete ) {
+ Assert( GinPageIsLeaf(page) );
+ PageIndexTupleDelete(page, off);
+ }
+
+ if ( !GinPageIsLeaf(page) && btree->rightblkno != InvalidBlockNumber ) {
+ IndexTuple itup = (IndexTuple)PageGetItem(page, PageGetItemId(page, off));
+ ItemPointerSet(&itup->t_tid, btree->rightblkno, InvalidOffsetNumber);
+ ret = btree->rightblkno;
+ }
+
+ btree->rightblkno = InvalidBlockNumber;
+
+ return ret;
+}
+
+/*
+ * Place tuple on page and fills WAL record
+ */
+static void
+entryPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off, XLogRecData **prdata) {
+ Page page = BufferGetPage(buf);
+ static XLogRecData rdata[3];
+ OffsetNumber placed;
+ static ginxlogInsert data;
+
+ *prdata = rdata;
+ data.updateBlkno = entryPreparePage( btree, page, off );
+
+ placed = PageAddItem( page, (Item)btree->entry, IndexTupleSize(btree->entry), off, LP_USED);
+ if ( placed != off )
+ elog(ERROR, "failed to add item to index page in \"%s\"",
+ RelationGetRelationName(btree->index));
+
+ data.node = btree->index->rd_node;
+ data.blkno = BufferGetBlockNumber( buf );
+ data.offset = off;
+ data.nitem = 1;
+ data.isDelete = btree->isDelete;
+ data.isData = false;
+ data.isLeaf = GinPageIsLeaf(page) ? TRUE : FALSE;
+
+ rdata[0].buffer = buf;
+ rdata[0].buffer_std = TRUE;
+ rdata[0].data = NULL;
+ rdata[0].len = 0;
+ rdata[0].next = &rdata[1];
+
+ rdata[1].buffer = InvalidBuffer;
+ rdata[1].data = (char *) &data;
+ rdata[1].len = sizeof(ginxlogInsert);
+ rdata[1].next = &rdata[2];
+
+ rdata[2].buffer = InvalidBuffer;
+ rdata[2].data = (char *) btree->entry;
+ rdata[2].len = IndexTupleSize(btree->entry);
+ rdata[2].next = NULL;
+
+ btree->entry = NULL;
+}
+
+/*
+ * Place tuple and split page, original buffer(lbuf) leaves untouched,
+ * returns shadow page of lbuf filled new data.
+ * Tuples are distributed between pages by equal size on its, not
+ * an equal number!
+ */
+static Page
+entrySplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogRecData **prdata) {
+ static XLogRecData rdata[2];
+ OffsetNumber i, maxoff, separator=InvalidOffsetNumber;
+ Size totalsize=0;
+ Size lsize = 0, size;
+ static char tupstore[ 2*BLCKSZ ];
+ char *ptr;
+ IndexTuple itup, leftrightmost=NULL;
+ static ginxlogSplit data;
+ Datum value;
+ bool isnull;
+ Page page;
+ Page lpage = GinPageGetCopyPage( BufferGetPage( lbuf ) );
+ Page rpage = BufferGetPage( rbuf );
+ Size pageSize = PageGetPageSize( lpage );
+
+ *prdata = rdata;
+ data.leftChildBlkno = ( GinPageIsLeaf(lpage) ) ?
+ InvalidOffsetNumber : GinItemPointerGetBlockNumber( &(btree->entry->t_tid) );
+ data.updateBlkno = entryPreparePage( btree, lpage, off );
+
+ maxoff = PageGetMaxOffsetNumber(lpage);
+ ptr = tupstore;
+
+ for(i=FirstOffsetNumber; i<=maxoff; i++) {
+ if ( i==off ) {
+ size = MAXALIGN( IndexTupleSize(btree->entry) );
+ memcpy(ptr, btree->entry, size);
+ ptr+=size;
+ totalsize += size + sizeof(ItemIdData);
+ }
+
+ itup = (IndexTuple)PageGetItem(lpage, PageGetItemId(lpage, i));
+ size = MAXALIGN( IndexTupleSize(itup) );
+ memcpy(ptr, itup, size);
+ ptr+=size;
+ totalsize += size + sizeof(ItemIdData);
+ }
+
+ if ( off==maxoff+1 ) {
+ size = MAXALIGN( IndexTupleSize(btree->entry) );
+ memcpy(ptr, btree->entry, size);
+ ptr+=size;
+ totalsize += size + sizeof(ItemIdData);
+ }
+
+ GinInitPage( rpage, GinPageGetOpaque(lpage)->flags, pageSize );
+ GinInitPage( lpage, GinPageGetOpaque(rpage)->flags, pageSize );
+
+ ptr = tupstore;
+ maxoff++;
+ lsize = 0;
+
+ page = lpage;
+ for(i=FirstOffsetNumber; i<=maxoff; i++) {
+ itup = (IndexTuple)ptr;
+
+ if ( lsize > totalsize/2 ) {
+ if ( separator==InvalidOffsetNumber )
+ separator = i-1;
+ page = rpage;
+ } else {
+ leftrightmost = itup;
+ lsize += MAXALIGN( IndexTupleSize(itup) ) + sizeof(ItemIdData);
+ }
+
+ if ( PageAddItem( page, (Item)itup, IndexTupleSize(itup), InvalidOffsetNumber, LP_USED) == InvalidOffsetNumber )
+ elog(ERROR, "failed to add item to index page in \"%s\"",
+ RelationGetRelationName(btree->index));
+ ptr += MAXALIGN( IndexTupleSize(itup) );
+ }
+
+ value = index_getattr(leftrightmost, FirstOffsetNumber, btree->ginstate->tupdesc, &isnull);
+ btree->entry = GinFormTuple( btree->ginstate, value, NULL, 0);
+ ItemPointerSet(&(btree->entry)->t_tid, BufferGetBlockNumber( lbuf ), InvalidOffsetNumber);
+ btree->rightblkno = BufferGetBlockNumber( rbuf );
+
+ data.node = btree->index->rd_node;
+ data.rootBlkno = InvalidBlockNumber;
+ data.lblkno = BufferGetBlockNumber( lbuf );
+ data.rblkno = BufferGetBlockNumber( rbuf );
+ data.separator = separator;
+ data.nitem = maxoff;
+ data.isData = FALSE;
+ data.isLeaf = GinPageIsLeaf(lpage) ? TRUE : FALSE;
+ data.isRootSplit = FALSE;
+
+ rdata[0].buffer = InvalidBuffer;
+ rdata[0].data = (char *) &data;
+ rdata[0].len = sizeof(ginxlogSplit);
+ rdata[0].next = &rdata[1];
+
+ rdata[1].buffer = InvalidBuffer;
+ rdata[1].data = tupstore;
+ rdata[1].len = MAXALIGN(totalsize);
+ rdata[1].next = NULL;
+
+ return lpage;
+}
+
+/*
+ * return newly allocate rightmost tuple
+ */
+IndexTuple
+ginPageGetLinkItup(Buffer buf) {
+ IndexTuple itup, nitup;
+ Page page = BufferGetPage(buf);
+
+ itup = getRightMostTuple( page );
+ if ( GinPageIsLeaf(page) && !GinIsPostingTree(itup) ) {
+ nitup = (IndexTuple)palloc( MAXALIGN(GinGetOrigSizePosting(itup)) );
+ memcpy( nitup, itup, GinGetOrigSizePosting(itup) );
+ nitup->t_info &= ~INDEX_SIZE_MASK;
+ nitup->t_info |= GinGetOrigSizePosting(itup);
+ } else {
+ nitup = (IndexTuple)palloc( MAXALIGN(IndexTupleSize(itup)) );
+ memcpy( nitup, itup, IndexTupleSize(itup) );
+ }
+
+ ItemPointerSet(&nitup->t_tid, BufferGetBlockNumber(buf), InvalidOffsetNumber);
+ return nitup;
+}
+
+/*
+ * Fills new root by rightest values from child.
+ * Also called from ginxlog, should not use btree
+ */
+void
+entryFillRoot(GinBtree btree, Buffer root, Buffer lbuf, Buffer rbuf) {
+ Page page;
+ IndexTuple itup;
+
+ page = BufferGetPage(root);
+
+ itup = ginPageGetLinkItup( lbuf );
+ if ( PageAddItem( page, (Item)itup, IndexTupleSize(itup), InvalidOffsetNumber, LP_USED) == InvalidOffsetNumber )
+ elog(ERROR, "failed to add item to index root page");
+
+ itup = ginPageGetLinkItup( rbuf );
+ if ( PageAddItem( page, (Item)itup, IndexTupleSize(itup), InvalidOffsetNumber, LP_USED) == InvalidOffsetNumber )
+ elog(ERROR, "failed to add item to index root page");
+}
+
+void
+prepareEntryScan( GinBtree btree, Relation index, Datum value, GinState *ginstate) {
+ memset(btree, 0, sizeof(GinBtreeData));
+
+ btree->isMoveRight = entryIsMoveRight;
+ btree->findChildPage = entryLocateEntry;
+ btree->findItem = entryLocateLeafEntry;
+ btree->findChildPtr = entryFindChildPtr;
+ btree->getLeftMostPage = entryGetLeftMostPage;
+ btree->isEnoughSpace = entryIsEnoughSpace;
+ btree->placeToPage = entryPlaceToPage;
+ btree->splitPage = entrySplitPage;
+ btree->fillRoot = entryFillRoot;
+
+ btree->index = index;
+ btree->ginstate = ginstate;
+ btree->entryValue = value;
+
+ btree->isDelete = FALSE;
+ btree->searchMode = FALSE;
+ btree->fullScan = FALSE;
+ btree->isBuild = FALSE;
+}
+
diff --git a/src/backend/access/gin/ginget.c b/src/backend/access/gin/ginget.c
new file mode 100644
index 00000000000..e160197e0bb
--- /dev/null
+++ b/src/backend/access/gin/ginget.c
@@ -0,0 +1,412 @@
+/*-------------------------------------------------------------------------
+ *
+ * ginget.c
+ * fetch tuples from a GIN scan.
+ *
+ *
+ * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * $PostgreSQL: pgsql/src/backend/access/gin/ginget.c,v 1.1 2006/05/02 11:28:54 teodor Exp $
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+#include "access/genam.h"
+#include "access/gin.h"
+#include "access/heapam.h"
+#include "catalog/index.h"
+#include "miscadmin.h"
+#include "storage/freespace.h"
+#include "utils/memutils.h"
+
+static OffsetNumber
+findItemInPage( Page page, ItemPointer item, OffsetNumber off ) {
+ OffsetNumber maxoff = GinPageGetOpaque(page)->maxoff;
+ int res;
+
+ for(; off<=maxoff; off++) {
+ res = compareItemPointers( item, (ItemPointer)GinDataPageGetItem(page, off) );
+ Assert( res>= 0 );
+
+ if ( res == 0 )
+ return off;
+ }
+
+ return InvalidOffsetNumber;
+}
+
+/*
+ * Start* functions setup state of searches: find correct buffer and locks it,
+ * Stop* functions unlock buffer (but don't release!)
+ */
+static void
+startScanEntry( Relation index, GinState *ginstate, GinScanEntry entry, bool firstCall ) {
+ if ( entry->master != NULL ) {
+ entry->isFinished = entry->master->isFinished;
+ return;
+ }
+
+ if ( firstCall ) {
+ /* at first call we should find entry, and
+ begin scan of posting tree or just store posting list in memory */
+ GinBtreeData btreeEntry;
+ GinBtreeStack *stackEntry;
+ Page page;
+ bool needUnlock = TRUE;
+
+ prepareEntryScan( &btreeEntry, index, entry->entry, ginstate );
+ btreeEntry.searchMode = TRUE;
+ stackEntry = ginFindLeafPage(&btreeEntry, NULL);
+ page = BufferGetPage( stackEntry->buffer );
+
+ entry->isFinished = TRUE;
+ entry->buffer = InvalidBuffer;
+ entry->offset = InvalidOffsetNumber;
+ entry->list = NULL;
+ entry->nlist = 0;
+ entry->reduceResult = FALSE;
+ entry->predictNumberResult = 0;
+
+ if ( btreeEntry.findItem( &btreeEntry, stackEntry ) ) {
+ IndexTuple itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, stackEntry->off));
+
+ if ( GinIsPostingTree(itup) ) {
+ BlockNumber rootPostingTree = GinGetPostingTree(itup);
+ GinPostingTreeScan *gdi;
+ Page page;
+
+ LockBuffer(stackEntry->buffer, GIN_UNLOCK);
+ needUnlock = FALSE;
+ gdi = prepareScanPostingTree( index, rootPostingTree, TRUE );
+
+ entry->buffer = scanBeginPostingTree( gdi );
+ IncrBufferRefCount( entry->buffer );
+
+ page = BufferGetPage( entry->buffer );
+ entry->predictNumberResult = gdi->stack->predictNumber * GinPageGetOpaque(page)->maxoff;
+
+ freeGinBtreeStack( gdi->stack );
+ pfree( gdi );
+ entry->isFinished = FALSE;
+ } else if ( GinGetNPosting(itup) > 0 ) {
+ entry->nlist = GinGetNPosting(itup);
+ entry->list = (ItemPointerData*)palloc( sizeof(ItemPointerData) * entry->nlist );
+ memcpy( entry->list, GinGetPosting(itup), sizeof(ItemPointerData) * entry->nlist );
+ entry->isFinished = FALSE;
+ }
+ }
+
+ if ( needUnlock )
+ LockBuffer(stackEntry->buffer, GIN_UNLOCK);
+ freeGinBtreeStack( stackEntry );
+ } else if ( entry->buffer != InvalidBuffer ) {
+ /* we should find place were we was stopped */
+ BlockNumber blkno;
+ Page page;
+
+ LockBuffer( entry->buffer, GIN_SHARE );
+
+ if ( !ItemPointerIsValid( &entry->curItem ) )
+ /* start position */
+ return;
+ Assert( entry->offset!=InvalidOffsetNumber );
+
+ page = BufferGetPage( entry->buffer );
+
+ /* try to find curItem in current buffer */
+ if ( (entry->offset=findItemInPage(page , &entry->curItem, entry->offset))!=InvalidOffsetNumber )
+ return;
+
+ /* walk to right */
+ while( (blkno = GinPageGetOpaque( page )->rightlink)!=InvalidBlockNumber ) {
+ LockBuffer( entry->buffer, GIN_UNLOCK );
+ entry->buffer = ReleaseAndReadBuffer( entry->buffer, index, blkno );
+ LockBuffer( entry->buffer, GIN_SHARE );
+ page = BufferGetPage( entry->buffer );
+
+ if ( (entry->offset=findItemInPage(page , &entry->curItem, FirstOffsetNumber))!=InvalidOffsetNumber )
+ return;
+ }
+
+ elog(ERROR,"Logic error: lost previously founded ItemId");
+ }
+}
+
+static void
+stopScanEntry( GinScanEntry entry ) {
+ if ( entry->buffer != InvalidBuffer )
+ LockBuffer( entry->buffer, GIN_UNLOCK );
+}
+
+static void
+startScanKey( Relation index, GinState *ginstate, GinScanKey key ) {
+ uint32 i;
+
+ for(i=0;i<key->nentries;i++)
+ startScanEntry( index, ginstate, key->scanEntry+i, key->firstCall );
+
+ if ( key->firstCall ) {
+ memset( key->entryRes, TRUE, sizeof(bool) * key->nentries );
+ key->isFinished = FALSE;
+ key->firstCall = FALSE;
+
+ if ( GinFuzzySearchLimit > 0 ) {
+ /*
+ * If all of keys more than treshold we will try to reduce
+ * result, we hope (and only hope, for intersection operation of array
+ * our supposition isn't true), that total result will not more
+ * than minimal predictNumberResult.
+ */
+
+ for(i=0;i<key->nentries;i++)
+ if ( key->scanEntry[i].predictNumberResult <= key->nentries * GinFuzzySearchLimit )
+ return;
+
+ for(i=0;i<key->nentries;i++)
+ if ( key->scanEntry[i].predictNumberResult > key->nentries * GinFuzzySearchLimit ) {
+ key->scanEntry[i].predictNumberResult /= key->nentries;
+ key->scanEntry[i].reduceResult = TRUE;
+ }
+ }
+ }
+}
+
+static void
+stopScanKey( GinScanKey key ) {
+ uint32 i;
+
+ for(i=0;i<key->nentries;i++)
+ stopScanEntry( key->scanEntry+i );
+}
+
+static void
+startScan( IndexScanDesc scan ) {
+ uint32 i;
+ GinScanOpaque so = (GinScanOpaque) scan->opaque;
+
+ for(i=0; i<so->nkeys; i++)
+ startScanKey( scan->indexRelation, &so->ginstate, so->keys + i );
+}
+
+static void
+stopScan( IndexScanDesc scan ) {
+ uint32 i;
+ GinScanOpaque so = (GinScanOpaque) scan->opaque;
+
+ for(i=0; i<so->nkeys; i++)
+ stopScanKey( so->keys + i );
+}
+
+
+static void
+entryGetNextItem( Relation index, GinScanEntry entry ) {
+ Page page = BufferGetPage( entry->buffer );
+
+ entry->offset++;
+ if ( entry->offset <= GinPageGetOpaque( page )->maxoff && GinPageGetOpaque( page )->maxoff >= FirstOffsetNumber ) {
+ entry->curItem = *(ItemPointerData*)GinDataPageGetItem(page, entry->offset);
+ } else {
+ BlockNumber blkno = GinPageGetOpaque( page )->rightlink;
+
+ LockBuffer( entry->buffer, GIN_UNLOCK );
+ if ( blkno == InvalidBlockNumber ) {
+ ReleaseBuffer( entry->buffer );
+ entry->buffer = InvalidBuffer;
+ entry->isFinished = TRUE;
+ } else {
+ entry->buffer = ReleaseAndReadBuffer( entry->buffer, index, blkno );
+ LockBuffer( entry->buffer, GIN_SHARE );
+ entry->offset = InvalidOffsetNumber;
+ entryGetNextItem(index, entry);
+ }
+ }
+}
+
+#define gin_rand() (((double) random()) / ((double) MAX_RANDOM_VALUE))
+#define dropItem(e) ( gin_rand() > ((double)GinFuzzySearchLimit)/((double)((e)->predictNumberResult)) )
+
+/*
+ * Sets entry->curItem to new found heap item pointer for one
+ * entry of one scan key
+ */
+static bool
+entryGetItem( Relation index, GinScanEntry entry ) {
+ if ( entry->master ) {
+ entry->isFinished = entry->master->isFinished;
+ entry->curItem = entry->master->curItem;
+ } else if ( entry->list ) {
+ entry->offset++;
+ if ( entry->offset <= entry->nlist )
+ entry->curItem = entry->list[ entry->offset - 1 ];
+ else {
+ ItemPointerSet( &entry->curItem, InvalidBlockNumber, InvalidOffsetNumber );
+ entry->isFinished = TRUE;
+ }
+ } else {
+ do {
+ entryGetNextItem(index, entry);
+ } while ( entry->isFinished == FALSE && entry->reduceResult == TRUE && dropItem(entry) );
+ }
+
+ return entry->isFinished;
+}
+
+/*
+ * Sets key->curItem to new found heap item pointer for one scan key
+ * returns isFinished!
+ */
+static bool
+keyGetItem( Relation index, GinState *ginstate, MemoryContext tempCtx, GinScanKey key ) {
+ uint32 i;
+ GinScanEntry entry;
+ bool res;
+ MemoryContext oldCtx;
+
+ if ( key->isFinished )
+ return TRUE;
+
+ do {
+ /* move forward from previously value and set new curItem,
+ which is minimal from entries->curItems */
+ ItemPointerSetMax( &key->curItem );
+ for(i=0;i<key->nentries;i++) {
+ entry = key->scanEntry+i;
+
+ if ( key->entryRes[i] ) {
+ if ( entry->isFinished == FALSE && entryGetItem(index, entry) == FALSE ) {
+ if (compareItemPointers( &entry->curItem, &key->curItem ) < 0)
+ key->curItem = entry->curItem;
+ } else
+ key->entryRes[i] = FALSE;
+ } else if ( entry->isFinished == FALSE ) {
+ if (compareItemPointers( &entry->curItem, &key->curItem ) < 0)
+ key->curItem = entry->curItem;
+ }
+ }
+
+ if ( ItemPointerIsMax( &key->curItem ) ) {
+ /* all entries are finished */
+ key->isFinished = TRUE;
+ return TRUE;
+ }
+
+ if ( key->nentries == 1 ) {
+ /* we can do not call consistentFn !! */
+ key->entryRes[0] = TRUE;
+ return FALSE;
+ }
+
+ /* setting up array for consistentFn */
+ for(i=0;i<key->nentries;i++) {
+ entry = key->scanEntry+i;
+
+ if ( entry->isFinished == FALSE && compareItemPointers( &entry->curItem, &key->curItem )==0 )
+ key->entryRes[i] = TRUE;
+ else
+ key->entryRes[i] = FALSE;
+ }
+
+ oldCtx = MemoryContextSwitchTo(tempCtx);
+ res = DatumGetBool( FunctionCall3(
+ &ginstate->consistentFn,
+ PointerGetDatum( key->entryRes ),
+ UInt16GetDatum( key->strategy ),
+ key->query
+ ));
+ MemoryContextSwitchTo(oldCtx);
+ MemoryContextReset(tempCtx);
+ } while( !res );
+
+ return FALSE;
+}
+
+/*
+ * Get heap item pointer from scan
+ * returns true if found
+ */
+static bool
+scanGetItem( IndexScanDesc scan, ItemPointerData *item ) {
+ uint32 i;
+ GinScanOpaque so = (GinScanOpaque) scan->opaque;
+
+ ItemPointerSetMin( item );
+ for(i=0;i<so->nkeys;i++) {
+ GinScanKey key = so->keys+i;
+
+ if ( keyGetItem( scan->indexRelation, &so->ginstate, so->tempCtx, key )==FALSE ) {
+ if ( compareItemPointers( item, &key->curItem ) < 0 )
+ *item = key->curItem;
+ } else
+ return FALSE; /* finshed one of keys */
+ }
+
+ for(i=1;i<=so->nkeys;i++) {
+ GinScanKey key = so->keys+i-1;
+
+ for(;;) {
+ int cmp = compareItemPointers( item, &key->curItem );
+
+ if ( cmp == 0 )
+ break;
+ else if ( cmp > 0 ) {
+ if ( keyGetItem( scan->indexRelation, &so->ginstate, so->tempCtx, key )==TRUE )
+ return FALSE; /* finshed one of keys */
+ } else { /* returns to begin */
+ *item = key->curItem;
+ i=0;
+ break;
+ }
+ }
+ }
+
+ return TRUE;
+}
+
+#define GinIsNewKey(s) ( ((GinScanOpaque) scan->opaque)->keys == NULL )
+
+Datum
+gingetmulti(PG_FUNCTION_ARGS) {
+ IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
+ ItemPointer tids = (ItemPointer) PG_GETARG_POINTER(1);
+ int32 max_tids = PG_GETARG_INT32(2);
+ int32 *returned_tids = (int32 *) PG_GETARG_POINTER(3);
+
+ if ( GinIsNewKey(scan) )
+ newScanKey( scan );
+
+ startScan( scan );
+
+ *returned_tids = 0;
+
+ do {
+ if ( scanGetItem( scan, tids + *returned_tids ) )
+ (*returned_tids)++;
+ else
+ break;
+ } while ( *returned_tids < max_tids );
+
+ stopScan( scan );
+
+ PG_RETURN_BOOL(*returned_tids == max_tids);
+}
+
+Datum
+gingettuple(PG_FUNCTION_ARGS) {
+ IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
+ ScanDirection dir = (ScanDirection) PG_GETARG_INT32(1);
+ bool res;
+
+ if ( dir != ForwardScanDirection )
+ elog(ERROR, "Gin doesn't support other scan directions than forward");
+
+ if ( GinIsNewKey(scan) )
+ newScanKey( scan );
+
+ startScan( scan );
+ res = scanGetItem(scan, &scan->xs_ctup.t_self);
+ stopScan( scan );
+
+ PG_RETURN_BOOL(res);
+}
diff --git a/src/backend/access/gin/gininsert.c b/src/backend/access/gin/gininsert.c
new file mode 100644
index 00000000000..fd7fc9f823b
--- /dev/null
+++ b/src/backend/access/gin/gininsert.c
@@ -0,0 +1,374 @@
+/*-------------------------------------------------------------------------
+ *
+ * gininsert.c
+ * insert routines for the postgres inverted index access method.
+ *
+ *
+ * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * $PostgreSQL: pgsql/src/backend/access/gin/gininsert.c,v 1.1 2006/05/02 11:28:54 teodor Exp $
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+#include "access/genam.h"
+#include "access/gin.h"
+#include "access/heapam.h"
+#include "catalog/index.h"
+#include "miscadmin.h"
+#include "storage/freespace.h"
+#include "utils/memutils.h"
+#include "access/tuptoaster.h"
+
+typedef struct {
+ GinState ginstate;
+ double indtuples;
+ MemoryContext tmpCtx;
+ BuildAccumulator accum;
+} GinBuildState;
+
+/*
+ * Creates posting tree with one page. Function
+ * suppose that items[] fits to page
+ */
+static BlockNumber
+createPostingTree( Relation index, ItemPointerData *items, uint32 nitems ) {
+ BlockNumber blkno;
+ Buffer buffer = GinNewBuffer(index);
+ Page page;
+
+ START_CRIT_SECTION();
+
+ GinInitBuffer( buffer, GIN_DATA|GIN_LEAF );
+ page = BufferGetPage(buffer);
+ blkno = BufferGetBlockNumber(buffer);
+
+ memcpy( GinDataPageGetData(page), items, sizeof(ItemPointerData) * nitems );
+ GinPageGetOpaque(page)->maxoff = nitems;
+
+ if (!index->rd_istemp) {
+ XLogRecPtr recptr;
+ XLogRecData rdata[2];
+ ginxlogCreatePostingTree data;
+
+ data.node = index->rd_node;
+ data.blkno = blkno;
+ data.nitem = nitems;
+
+ rdata[0].buffer = InvalidBuffer;
+ rdata[0].data = (char *) &data;
+ rdata[0].len = sizeof(ginxlogCreatePostingTree);
+ rdata[0].next = &rdata[1];
+
+ rdata[1].buffer = InvalidBuffer;
+ rdata[1].data = (char *) items;
+ rdata[1].len = sizeof(ItemPointerData) * nitems;
+ rdata[1].next = NULL;
+
+
+
+ recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_CREATE_PTREE, rdata);
+ PageSetLSN(page, recptr);
+ PageSetTLI(page, ThisTimeLineID);
+
+ }
+
+ MarkBufferDirty(buffer);
+ UnlockReleaseBuffer(buffer);
+
+ END_CRIT_SECTION();
+
+ return blkno;
+}
+
+
+/*
+ * Adds array of item pointers to tuple's posting list or
+ * creates posting tree and tuple pointed to tree in a case
+ * of not enough space. Max size of tuple is defined in
+ * GinFormTuple().
+ */
+static IndexTuple
+addItemPointersToTuple(Relation index, GinState *ginstate, GinBtreeStack *stack,
+ IndexTuple old, ItemPointerData *items, uint32 nitem, bool isBuild) {
+ bool isnull;
+ Datum key = index_getattr(old, FirstOffsetNumber, ginstate->tupdesc, &isnull);
+ IndexTuple res = GinFormTuple(ginstate, key, NULL, nitem + GinGetNPosting(old));
+
+ if ( res ) {
+ /* good, small enough */
+ MergeItemPointers( GinGetPosting(res),
+ GinGetPosting(old), GinGetNPosting(old),
+ items, nitem
+ );
+
+ GinSetNPosting(res, nitem + GinGetNPosting(old));
+ } else {
+ BlockNumber postingRoot;
+ GinPostingTreeScan *gdi;
+
+ /* posting list becomes big, so we need to make posting's tree */
+ res = GinFormTuple(ginstate, key, NULL, 0);
+ postingRoot = createPostingTree(index, GinGetPosting(old), GinGetNPosting(old));
+ GinSetPostingTree(res, postingRoot);
+
+ gdi = prepareScanPostingTree(index, postingRoot, FALSE);
+ gdi->btree.isBuild = isBuild;
+
+ insertItemPointer(gdi, items, nitem);
+
+ pfree(gdi);
+ }
+
+ return res;
+}
+
+/*
+ * Inserts only one entry to the index, but it can adds more that 1
+ * ItemPointer.
+ */
+static void
+ginEntryInsert( Relation index, GinState *ginstate, Datum value, ItemPointerData *items, uint32 nitem, bool isBuild) {
+ GinBtreeData btree;
+ GinBtreeStack *stack;
+ IndexTuple itup;
+ Page page;
+
+ prepareEntryScan( &btree, index, value, ginstate );
+
+ stack = ginFindLeafPage(&btree, NULL);
+ page = BufferGetPage( stack->buffer );
+
+ if ( btree.findItem( &btree, stack ) ) {
+ /* found entry */
+ itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, stack->off));
+
+ if ( GinIsPostingTree(itup) ) {
+ /* lock root of posting tree */
+ GinPostingTreeScan *gdi;
+ BlockNumber rootPostingTree = GinGetPostingTree(itup);
+
+ /* release all stack */
+ LockBuffer(stack->buffer, GIN_UNLOCK);
+ freeGinBtreeStack( stack );
+
+ /* insert into posting tree */
+ gdi = prepareScanPostingTree( index, rootPostingTree, FALSE );
+ gdi->btree.isBuild = isBuild;
+ insertItemPointer(gdi, items, nitem);
+
+ return;
+ }
+
+ itup = addItemPointersToTuple(index, ginstate, stack, itup, items, nitem, isBuild);
+
+ btree.isDelete = TRUE;
+ } else {
+ /* We suppose, that tuple can store at list one itempointer */
+ itup = GinFormTuple( ginstate, value, items, 1);
+ if ( itup==NULL || IndexTupleSize(itup) >= GinMaxItemSize )
+ elog(ERROR, "huge tuple");
+
+ if ( nitem>1 ) {
+ IndexTuple previtup = itup;
+
+ itup = addItemPointersToTuple(index, ginstate, stack, previtup, items+1, nitem-1, isBuild);
+ pfree(previtup);
+ }
+ }
+
+ btree.entry = itup;
+ ginInsertValue(&btree, stack);
+ pfree( itup );
+}
+
+/*
+ * Saves indexed value in memory accumulator during index creation
+ * Function isnt use during normal insert
+ */
+static uint32
+ginHeapTupleBulkInsert(BuildAccumulator *accum, Datum value, ItemPointer heapptr) {
+ Datum *entries;
+ uint32 nentries;
+
+ entries = extractEntriesSU( accum->ginstate, value, &nentries);
+
+ if ( nentries==0 )
+ /* nothing to insert */
+ return 0;
+
+ ginInsertRecordBA( accum, heapptr, entries, nentries);
+
+ pfree( entries );
+
+ return nentries;
+}
+
+static void
+ginBuildCallback(Relation index, HeapTuple htup, Datum *values,
+ bool *isnull, bool tupleIsAlive, void *state) {
+
+ GinBuildState *buildstate = (GinBuildState*)state;
+ MemoryContext oldCtx;
+
+ if ( *isnull )
+ return;
+
+ oldCtx = MemoryContextSwitchTo(buildstate->tmpCtx);
+
+ buildstate->indtuples += ginHeapTupleBulkInsert(&buildstate->accum, *values, &htup->t_self);
+
+ /* we use only half maintenance_work_mem, because there is some leaks
+ during insertion and extract values */
+ if ( buildstate->accum.allocatedMemory >= maintenance_work_mem*1024L/2L ) {
+ ItemPointerData *list;
+ Datum entry;
+ uint32 nlist;
+
+ while( (list=ginGetEntry(&buildstate->accum, &entry, &nlist)) != NULL )
+ ginEntryInsert(index, &buildstate->ginstate, entry, list, nlist, TRUE);
+
+ MemoryContextReset(buildstate->tmpCtx);
+ ginInitBA(&buildstate->accum);
+ }
+
+ MemoryContextSwitchTo(oldCtx);
+}
+
+Datum
+ginbuild(PG_FUNCTION_ARGS) {
+ Relation heap = (Relation) PG_GETARG_POINTER(0);
+ Relation index = (Relation) PG_GETARG_POINTER(1);
+ IndexInfo *indexInfo = (IndexInfo *) PG_GETARG_POINTER(2);
+ double reltuples;
+ GinBuildState buildstate;
+ Buffer buffer;
+ ItemPointerData *list;
+ Datum entry;
+ uint32 nlist;
+ MemoryContext oldCtx;
+
+ if (RelationGetNumberOfBlocks(index) != 0)
+ elog(ERROR, "index \"%s\" already contains data",
+ RelationGetRelationName(index));
+
+ initGinState(&buildstate.ginstate, index);
+
+ /* initialize the root page */
+ buffer = GinNewBuffer(index);
+ START_CRIT_SECTION();
+ GinInitBuffer(buffer, GIN_LEAF);
+ if (!index->rd_istemp) {
+ XLogRecPtr recptr;
+ XLogRecData rdata;
+ Page page;
+
+ rdata.buffer = InvalidBuffer;
+ rdata.data = (char *) &(index->rd_node);
+ rdata.len = sizeof(RelFileNode);
+ rdata.next = NULL;
+
+ page = BufferGetPage(buffer);
+
+
+ recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_CREATE_INDEX, &rdata);
+ PageSetLSN(page, recptr);
+ PageSetTLI(page, ThisTimeLineID);
+
+ }
+
+ MarkBufferDirty(buffer);
+ UnlockReleaseBuffer(buffer);
+ END_CRIT_SECTION();
+
+ /* build the index */
+ buildstate.indtuples = 0;
+
+ /*
+ * create a temporary memory context that is reset once for each tuple
+ * inserted into the index
+ */
+ buildstate.tmpCtx = AllocSetContextCreate(CurrentMemoryContext,
+ "Gin build temporary context",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+
+ buildstate.accum.ginstate = &buildstate.ginstate;
+ ginInitBA( &buildstate.accum );
+
+ /* do the heap scan */
+ reltuples = IndexBuildHeapScan(heap, index, indexInfo,
+ ginBuildCallback, (void *) &buildstate);
+
+ oldCtx = MemoryContextSwitchTo(buildstate.tmpCtx);
+ while( (list=ginGetEntry(&buildstate.accum, &entry, &nlist)) != NULL )
+ ginEntryInsert(index, &buildstate.ginstate, entry, list, nlist, TRUE);
+ MemoryContextSwitchTo(oldCtx);
+
+ MemoryContextDelete(buildstate.tmpCtx);
+
+ /* since we just counted the # of tuples, may as well update stats */
+ IndexCloseAndUpdateStats(heap, reltuples, index, buildstate.indtuples);
+
+ PG_RETURN_VOID();
+}
+
+/*
+ * Inserts value during normal insertion
+ */
+static uint32
+ginHeapTupleInsert( Relation index, GinState *ginstate, Datum value, ItemPointer item) {
+ Datum *entries;
+ uint32 i,nentries;
+
+ entries = extractEntriesSU( ginstate, value, &nentries);
+
+ if ( nentries==0 )
+ /* nothing to insert */
+ return 0;
+
+ for(i=0;i<nentries;i++)
+ ginEntryInsert(index, ginstate, entries[i], item, 1, FALSE);
+
+ return nentries;
+}
+
+Datum
+gininsert(PG_FUNCTION_ARGS) {
+ Relation index = (Relation) PG_GETARG_POINTER(0);
+ Datum *values = (Datum *) PG_GETARG_POINTER(1);
+ bool *isnull = (bool *) PG_GETARG_POINTER(2);
+ ItemPointer ht_ctid = (ItemPointer) PG_GETARG_POINTER(3);
+#ifdef NOT_USED
+ Relation heapRel = (Relation) PG_GETARG_POINTER(4);
+ bool checkUnique = PG_GETARG_BOOL(5);
+#endif
+ GinState ginstate;
+ MemoryContext oldCtx;
+ MemoryContext insertCtx;
+ uint32 res;
+
+ if ( *isnull )
+ PG_RETURN_BOOL(false);
+
+ insertCtx = AllocSetContextCreate(CurrentMemoryContext,
+ "Gin insert temporary context",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+
+ oldCtx = MemoryContextSwitchTo(insertCtx);
+
+ initGinState(&ginstate, index);
+
+ res = ginHeapTupleInsert(index, &ginstate, *values, ht_ctid);
+
+ MemoryContextSwitchTo(oldCtx);
+ MemoryContextDelete(insertCtx);
+
+ PG_RETURN_BOOL(res>0);
+}
+
diff --git a/src/backend/access/gin/ginscan.c b/src/backend/access/gin/ginscan.c
new file mode 100644
index 00000000000..b641b82e662
--- /dev/null
+++ b/src/backend/access/gin/ginscan.c
@@ -0,0 +1,256 @@
+/*-------------------------------------------------------------------------
+ *
+ * ginscan.c
+ * routines to manage scans inverted index relations
+ *
+ *
+ * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * $PostgreSQL: pgsql/src/backend/access/gin/ginscan.c,v 1.1 2006/05/02 11:28:54 teodor Exp $
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+#include "access/genam.h"
+#include "access/gin.h"
+#include "access/heapam.h"
+#include "catalog/index.h"
+#include "miscadmin.h"
+#include "storage/freespace.h"
+#include "utils/memutils.h"
+
+
+Datum
+ginbeginscan(PG_FUNCTION_ARGS) {
+ Relation rel = (Relation) PG_GETARG_POINTER(0);
+ int keysz = PG_GETARG_INT32(1);
+ ScanKey scankey = (ScanKey) PG_GETARG_POINTER(2);
+ IndexScanDesc scan;
+
+ scan = RelationGetIndexScan(rel, keysz, scankey);
+
+ PG_RETURN_POINTER(scan);
+}
+
+static void
+fillScanKey( GinState *ginstate, GinScanKey key, Datum query,
+ Datum *entryValues, uint32 nEntryValues, StrategyNumber strategy ) {
+ uint32 i,j;
+
+ key->nentries = nEntryValues;
+ key->entryRes = (bool*)palloc0( sizeof(bool) * nEntryValues );
+ key->scanEntry = (GinScanEntry) palloc( sizeof(GinScanEntryData) * nEntryValues );
+ key->strategy = strategy;
+ key->query = query;
+ key->firstCall= TRUE;
+ ItemPointerSet( &(key->curItem), InvalidBlockNumber, InvalidOffsetNumber );
+
+ for(i=0; i<nEntryValues; i++) {
+ key->scanEntry[i].pval = key->entryRes + i;
+ key->scanEntry[i].entry = entryValues[i];
+ ItemPointerSet( &(key->scanEntry[i].curItem), InvalidBlockNumber, InvalidOffsetNumber );
+ key->scanEntry[i].offset = InvalidOffsetNumber;
+ key->scanEntry[i].buffer = InvalidBuffer;
+ key->scanEntry[i].list = NULL;
+ key->scanEntry[i].nlist = 0;
+
+ /* link to the equals entry in current scan key */
+ key->scanEntry[i].master = NULL;
+ for( j=0; j<i; j++)
+ if ( compareEntries( ginstate, entryValues[i], entryValues[j] ) == 0 ) {
+ key->scanEntry[i].master = key->scanEntry + j;
+ break;
+ }
+ }
+}
+
+static void
+resetScanKeys(GinScanKey keys, uint32 nkeys) {
+ uint32 i, j;
+
+ if ( keys == NULL )
+ return;
+
+ for(i=0;i<nkeys;i++) {
+ GinScanKey key = keys + i;
+
+ key->firstCall = TRUE;
+ ItemPointerSet( &(key->curItem), InvalidBlockNumber, InvalidOffsetNumber );
+
+ for(j=0;j<key->nentries;j++) {
+ if ( key->scanEntry[j].buffer != InvalidBuffer )
+ ReleaseBuffer( key->scanEntry[i].buffer );
+
+ ItemPointerSet( &(key->scanEntry[j].curItem), InvalidBlockNumber, InvalidOffsetNumber );
+ key->scanEntry[j].offset = InvalidOffsetNumber;
+ key->scanEntry[j].buffer = InvalidBuffer;
+ key->scanEntry[j].list = NULL;
+ key->scanEntry[j].nlist = 0;
+ }
+ }
+}
+
+static void
+freeScanKeys(GinScanKey keys, uint32 nkeys, bool removeRes) {
+ uint32 i, j;
+
+ if ( keys == NULL )
+ return;
+
+ for(i=0;i<nkeys;i++) {
+ GinScanKey key = keys + i;
+
+ for(j=0;j<key->nentries;j++) {
+ if ( key->scanEntry[j].buffer != InvalidBuffer )
+ ReleaseBuffer( key->scanEntry[j].buffer );
+ if ( removeRes && key->scanEntry[j].list )
+ pfree(key->scanEntry[j].list);
+ }
+
+ if ( removeRes )
+ pfree(key->entryRes);
+ pfree(key->scanEntry);
+ }
+
+ pfree(keys);
+}
+
+void
+newScanKey( IndexScanDesc scan ) {
+ ScanKey scankey = scan->keyData;
+ GinScanOpaque so = (GinScanOpaque) scan->opaque;
+ int i;
+ uint32 nkeys = 0;
+
+ so->keys = (GinScanKey) palloc( scan->numberOfKeys * sizeof(GinScanKeyData) );
+
+ for(i=0; i<scan->numberOfKeys; i++) {
+ Datum* entryValues;
+ uint32 nEntryValues;
+
+ if ( scankey[i].sk_flags & SK_ISNULL )
+ elog(ERROR, "Gin doesn't support NULL as scan key");
+ Assert( scankey[i].sk_attno == 1 );
+
+ entryValues = (Datum*)DatumGetPointer(
+ FunctionCall3(
+ &so->ginstate.extractQueryFn,
+ scankey[i].sk_argument,
+ PointerGetDatum( &nEntryValues ),
+ UInt16GetDatum(scankey[i].sk_strategy)
+ )
+ );
+ if ( entryValues==NULL || nEntryValues == 0 )
+ /* full scan... */
+ continue;
+
+ fillScanKey( &so->ginstate, &(so->keys[nkeys]), scankey[i].sk_argument,
+ entryValues, nEntryValues, scankey[i].sk_strategy );
+ nkeys++;
+ }
+
+ so->nkeys = nkeys;
+
+ if ( so->nkeys == 0 )
+ elog(ERROR, "Gin doesn't support full scan due to it's awful inefficiency");
+}
+
+Datum
+ginrescan(PG_FUNCTION_ARGS) {
+ IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
+ ScanKey scankey = (ScanKey) PG_GETARG_POINTER(1);
+ GinScanOpaque so;
+
+ so = (GinScanOpaque) scan->opaque;
+
+ if ( so == NULL ) {
+ /* if called from ginbeginscan */
+ so = (GinScanOpaque)palloc( sizeof(GinScanOpaqueData) );
+ so->tempCtx = AllocSetContextCreate(CurrentMemoryContext,
+ "Gin scan temporary context",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+ initGinState(&so->ginstate, scan->indexRelation);
+ scan->opaque = so;
+ } else {
+ freeScanKeys(so->keys, so->nkeys, TRUE);
+ freeScanKeys(so->markPos, so->nkeys, FALSE);
+ }
+
+ so->markPos=so->keys=NULL;
+
+ if ( scankey && scan->numberOfKeys > 0 ) {
+ memmove(scan->keyData, scankey,
+ scan->numberOfKeys * sizeof(ScanKeyData));
+ }
+
+ PG_RETURN_VOID();
+}
+
+
+Datum
+ginendscan(PG_FUNCTION_ARGS) {
+ IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
+ GinScanOpaque so = (GinScanOpaque) scan->opaque;
+
+ if ( so != NULL ) {
+ freeScanKeys(so->keys, so->nkeys, TRUE);
+ freeScanKeys(so->markPos, so->nkeys, FALSE);
+
+ MemoryContextDelete(so->tempCtx);
+
+ pfree(so);
+ }
+
+ PG_RETURN_VOID();
+}
+
+static GinScanKey
+copyScanKeys( GinScanKey keys, uint32 nkeys ) {
+ GinScanKey newkeys;
+ uint32 i, j;
+
+ newkeys = (GinScanKey)palloc( sizeof(GinScanKeyData) * nkeys );
+ memcpy( newkeys, keys, sizeof(GinScanKeyData) * nkeys );
+
+ for(i=0;i<nkeys;i++) {
+ newkeys[i].scanEntry = (GinScanEntry)palloc(sizeof(GinScanEntryData) * keys[i].nentries );
+ memcpy( newkeys[i].scanEntry, keys[i].scanEntry, sizeof(GinScanEntryData) * keys[i].nentries );
+
+ for(j=0;j<keys[i].nentries; j++) {
+ if ( keys[i].scanEntry[j].buffer != InvalidBuffer )
+ IncrBufferRefCount( keys[i].scanEntry[j].buffer );
+ if ( keys[i].scanEntry[j].master ) {
+ int masterN = keys[i].scanEntry[j].master - keys[i].scanEntry;
+ newkeys[i].scanEntry[j].master = newkeys[i].scanEntry + masterN;
+ }
+ }
+ }
+
+ return newkeys;
+}
+
+Datum
+ginmarkpos(PG_FUNCTION_ARGS) {
+ IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
+ GinScanOpaque so = (GinScanOpaque) scan->opaque;
+
+ freeScanKeys(so->markPos, so->nkeys, FALSE);
+ so->markPos = copyScanKeys( so->keys, so->nkeys );
+
+ PG_RETURN_VOID();
+}
+
+Datum
+ginrestrpos(PG_FUNCTION_ARGS) {
+ IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
+ GinScanOpaque so = (GinScanOpaque) scan->opaque;
+
+ freeScanKeys(so->keys, so->nkeys, FALSE);
+ so->keys = copyScanKeys( so->markPos, so->nkeys );
+
+ PG_RETURN_VOID();
+}
diff --git a/src/backend/access/gin/ginutil.c b/src/backend/access/gin/ginutil.c
new file mode 100644
index 00000000000..b9cb80a6cf1
--- /dev/null
+++ b/src/backend/access/gin/ginutil.c
@@ -0,0 +1,203 @@
+/*-------------------------------------------------------------------------
+ *
+ * ginutil.c
+ * utilities routines for the postgres inverted index access method.
+ *
+ *
+ * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * $PostgreSQL: pgsql/src/backend/access/gin/ginutil.c,v 1.1 2006/05/02 11:28:54 teodor Exp $
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+#include "access/genam.h"
+#include "access/gin.h"
+#include "access/heapam.h"
+#include "catalog/index.h"
+#include "miscadmin.h"
+#include "storage/freespace.h"
+
+void
+initGinState( GinState *state, Relation index ) {
+ if ( index->rd_att->natts != 1 )
+ elog(ERROR, "numberOfAttributes %d != 1",
+ index->rd_att->natts);
+
+ state->tupdesc = index->rd_att;
+
+ fmgr_info_copy(&(state->compareFn),
+ index_getprocinfo(index, 1, GIN_COMPARE_PROC),
+ CurrentMemoryContext);
+ fmgr_info_copy(&(state->extractValueFn),
+ index_getprocinfo(index, 1, GIN_EXTRACTVALUE_PROC),
+ CurrentMemoryContext);
+ fmgr_info_copy(&(state->extractQueryFn),
+ index_getprocinfo(index, 1, GIN_EXTRACTQUERY_PROC),
+ CurrentMemoryContext);
+ fmgr_info_copy(&(state->consistentFn),
+ index_getprocinfo(index, 1, GIN_CONSISTENT_PROC),
+ CurrentMemoryContext);
+}
+
+/*
+ * Allocate a new page (either by recycling, or by extending the index file)
+ * The returned buffer is already pinned and exclusive-locked
+ * Caller is responsible for initializing the page by calling GinInitBuffer
+ */
+
+Buffer
+GinNewBuffer(Relation index) {
+ Buffer buffer;
+ bool needLock;
+
+ /* First, try to get a page from FSM */
+ for(;;) {
+ BlockNumber blkno = GetFreeIndexPage(&index->rd_node);
+ if (blkno == InvalidBlockNumber)
+ break;
+
+ buffer = ReadBuffer(index, blkno);
+
+ /*
+ * We have to guard against the possibility that someone else already
+ * recycled this page; the buffer may be locked if so.
+ */
+ if (ConditionalLockBuffer(buffer)) {
+ Page page = BufferGetPage(buffer);
+
+ if (PageIsNew(page))
+ return buffer; /* OK to use, if never initialized */
+
+ if (GinPageIsDeleted(page))
+ return buffer; /* OK to use */
+
+ LockBuffer(buffer, GIN_UNLOCK);
+ }
+
+ /* Can't use it, so release buffer and try again */
+ ReleaseBuffer(buffer);
+ }
+
+ /* Must extend the file */
+ needLock = !RELATION_IS_LOCAL(index);
+ if (needLock)
+ LockRelationForExtension(index, ExclusiveLock);
+
+ buffer = ReadBuffer(index, P_NEW);
+ LockBuffer(buffer, GIN_EXCLUSIVE);
+
+ if (needLock)
+ UnlockRelationForExtension(index, ExclusiveLock);
+
+ return buffer;
+}
+
+void
+GinInitPage(Page page, uint32 f, Size pageSize) {
+ GinPageOpaque opaque;
+
+ PageInit(page, pageSize, sizeof(GinPageOpaqueData));
+
+ opaque = GinPageGetOpaque(page);
+ memset( opaque, 0, sizeof(GinPageOpaqueData) );
+ opaque->flags = f;
+ opaque->rightlink = InvalidBlockNumber;
+}
+
+void
+GinInitBuffer(Buffer b, uint32 f) {
+ GinInitPage( BufferGetPage(b), f, BufferGetPageSize(b) );
+}
+
+int
+compareEntries(GinState *ginstate, Datum a, Datum b) {
+ return DatumGetInt32(
+ FunctionCall2(
+ &ginstate->compareFn,
+ a, b
+ )
+ );
+}
+
+static FmgrInfo* cmpDatumPtr=NULL;
+static bool needUnique = FALSE;
+
+static int
+cmpEntries(const void * a, const void * b) {
+ int res = DatumGetInt32(
+ FunctionCall2(
+ cmpDatumPtr,
+ *(Datum*)a,
+ *(Datum*)b
+ )
+ );
+
+ if ( res == 0 )
+ needUnique = TRUE;
+
+ return res;
+}
+
+Datum*
+extractEntriesS(GinState *ginstate, Datum value, uint32 *nentries) {
+ Datum *entries;
+
+ entries = (Datum*)DatumGetPointer(
+ FunctionCall2(
+ &ginstate->extractValueFn,
+ value,
+ PointerGetDatum( nentries )
+ )
+ );
+
+ if ( entries == NULL )
+ *nentries = 0;
+
+ if ( *nentries > 1 ) {
+ cmpDatumPtr = &ginstate->compareFn;
+ needUnique = FALSE;
+ qsort(entries, *nentries, sizeof(Datum), cmpEntries);
+ }
+
+ return entries;
+}
+
+
+Datum*
+extractEntriesSU(GinState *ginstate, Datum value, uint32 *nentries) {
+ Datum *entries = extractEntriesS(ginstate, value, nentries);
+
+ if ( *nentries>1 && needUnique ) {
+ Datum *ptr, *res;
+
+ ptr = res = entries;
+
+ while( ptr - entries < *nentries ) {
+ if ( compareEntries(ginstate, *ptr, *res ) != 0 )
+ *(++res) = *ptr++;
+ else
+ ptr++;
+ }
+
+ *nentries = res + 1 - entries;
+ }
+
+ return entries;
+}
+
+/*
+ * It's analog of PageGetTempPage(), but copies whole page
+ */
+Page
+GinPageGetCopyPage( Page page ) {
+ Size pageSize = PageGetPageSize( page );
+ Page tmppage;
+
+ tmppage=(Page)palloc( pageSize );
+ memcpy( tmppage, page, pageSize );
+
+ return tmppage;
+}
diff --git a/src/backend/access/gin/ginvacuum.c b/src/backend/access/gin/ginvacuum.c
new file mode 100644
index 00000000000..ab1861fdaa9
--- /dev/null
+++ b/src/backend/access/gin/ginvacuum.c
@@ -0,0 +1,647 @@
+/*-------------------------------------------------------------------------
+ *
+ * ginvacuum.c
+ * delete & vacuum routines for the postgres GIN
+ *
+ *
+ * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * $PostgreSQL: pgsql/src/backend/access/gin/ginvacuum.c,v 1.1 2006/05/02 11:28:54 teodor Exp $
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+#include "access/genam.h"
+#include "access/gin.h"
+#include "access/heapam.h"
+#include "catalog/index.h"
+#include "miscadmin.h"
+#include "storage/freespace.h"
+#include "utils/memutils.h"
+#include "storage/freespace.h"
+#include "storage/smgr.h"
+#include "commands/vacuum.h"
+
+typedef struct {
+ Relation index;
+ IndexBulkDeleteResult *result;
+ IndexBulkDeleteCallback callback;
+ void *callback_state;
+ GinState ginstate;
+} GinVacuumState;
+
+
+/*
+ * Cleans array of ItemPointer (removes dead pointers)
+ * Results are always stored in *cleaned, which will be allocated
+ * if its needed. In case of *cleaned!=NULL caller is resposible to
+ * enough space. *cleaned and items may point to the same
+ * memory addres.
+ */
+
+static uint32
+ginVacuumPostingList( GinVacuumState *gvs, ItemPointerData *items, uint32 nitem, ItemPointerData **cleaned ) {
+ uint32 i,j=0;
+
+ /*
+ * just scan over ItemPointer array
+ */
+
+ for(i=0;i<nitem;i++) {
+ if ( gvs->callback(items+i, gvs->callback_state) ) {
+ gvs->result->tuples_removed += 1;
+ if ( !*cleaned ) {
+ *cleaned = (ItemPointerData*)palloc(sizeof(ItemPointerData)*nitem);
+ if ( i!=0 )
+ memcpy( *cleaned, items, sizeof(ItemPointerData)*i);
+ }
+ } else {
+ gvs->result->num_index_tuples += 1;
+ if (i!=j)
+ (*cleaned)[j] = items[i];
+ j++;
+ }
+ }
+
+ return j;
+}
+
+/*
+ * fills WAL record for vacuum leaf page
+ */
+static void
+xlogVacuumPage(Relation index, Buffer buffer) {
+ Page page = BufferGetPage( buffer );
+ XLogRecPtr recptr;
+ XLogRecData rdata[3];
+ ginxlogVacuumPage data;
+ char *backup;
+ char itups[BLCKSZ];
+ uint32 len=0;
+
+ Assert( GinPageIsLeaf( page ) );
+
+ if (index->rd_istemp)
+ return;
+
+ data.node = index->rd_node;
+ data.blkno = BufferGetBlockNumber(buffer);
+
+ if ( GinPageIsData( page ) ) {
+ backup = GinDataPageGetData( page );
+ data.nitem = GinPageGetOpaque( page )->maxoff;
+ if ( data.nitem )
+ len = MAXALIGN( sizeof(ItemPointerData)*data.nitem );
+ } else {
+ char *ptr;
+ OffsetNumber i;
+
+ ptr = backup = itups;
+ for(i=FirstOffsetNumber;i<=PageGetMaxOffsetNumber(page);i++) {
+ IndexTuple itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
+ memcpy( ptr, itup, IndexTupleSize( itup ) );
+ ptr += MAXALIGN( IndexTupleSize( itup ) );
+ }
+
+ data.nitem = PageGetMaxOffsetNumber(page);
+ len = ptr-backup;
+ }
+
+ rdata[0].buffer = buffer;
+ rdata[0].buffer_std = ( GinPageIsData( page ) ) ? FALSE : TRUE;
+ rdata[0].len = 0;
+ rdata[0].data = NULL;
+ rdata[0].next = rdata + 1;
+
+ rdata[1].buffer = InvalidBuffer;
+ rdata[1].len = sizeof(ginxlogVacuumPage);
+ rdata[1].data = (char*)&data;
+
+ if ( len == 0 ) {
+ rdata[1].next = NULL;
+ } else {
+ rdata[1].next = rdata + 2;
+
+ rdata[2].buffer = InvalidBuffer;
+ rdata[2].len = len;
+ rdata[2].data = backup;
+ rdata[2].next = NULL;
+ }
+
+ recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_VACUUM_PAGE, rdata);
+ PageSetLSN(page, recptr);
+ PageSetTLI(page, ThisTimeLineID);
+}
+
+static bool
+ginVacuumPostingTreeLeaves( GinVacuumState *gvs, BlockNumber blkno, bool isRoot, Buffer *rootBuffer ) {
+ Buffer buffer = ReadBuffer( gvs->index, blkno );
+ Page page = BufferGetPage( buffer );
+ bool hasVoidPage = FALSE;
+
+ /*
+ * We should be sure that we don't concurrent with inserts, insert process
+ * never release root page until end (but it can unlock it and lock again).
+ * If we lock root with with LockBufferForCleanup, new scan process can't begin,
+ * but previous may run.
+ * ginmarkpos/start* keeps buffer pinned, so we will wait for it.
+ * We lock only one posting tree in whole index, so, it's concurrent enough..
+ * Side effect: after this is full complete, tree is unused by any other process
+ */
+
+ LockBufferForCleanup( buffer );
+
+ Assert( GinPageIsData(page) );
+
+ if ( GinPageIsLeaf(page) ) {
+ OffsetNumber newMaxOff, oldMaxOff = GinPageGetOpaque(page)->maxoff;
+ ItemPointerData *cleaned = NULL;
+
+ newMaxOff = ginVacuumPostingList( gvs,
+ (ItemPointer)GinDataPageGetData(page), oldMaxOff, &cleaned );
+
+ /* saves changes about deleted tuple ... */
+ if ( oldMaxOff != newMaxOff ) {
+
+ START_CRIT_SECTION();
+
+ if ( newMaxOff > 0 )
+ memcpy( GinDataPageGetData(page), cleaned, sizeof(ItemPointerData) * newMaxOff );
+ pfree( cleaned );
+ GinPageGetOpaque(page)->maxoff = newMaxOff;
+
+ xlogVacuumPage(gvs->index, buffer);
+
+ MarkBufferDirty( buffer );
+ END_CRIT_SECTION();
+
+ /* if root is a leaf page, we don't desire futher processing */
+ if ( !isRoot && GinPageGetOpaque(page)->maxoff < FirstOffsetNumber )
+ hasVoidPage = TRUE;
+ }
+ } else {
+ OffsetNumber i;
+ bool isChildHasVoid = FALSE;
+
+ for( i=FirstOffsetNumber ; i <= GinPageGetOpaque(page)->maxoff ; i++ ) {
+ PostingItem *pitem = (PostingItem*)GinDataPageGetItem(page, i);
+ if ( ginVacuumPostingTreeLeaves( gvs, PostingItemGetBlockNumber(pitem), FALSE, NULL ) )
+ isChildHasVoid = TRUE;
+ }
+
+ if ( isChildHasVoid )
+ hasVoidPage = TRUE;
+ }
+
+ /* if we have root and theres void pages in tree, then we don't release lock
+ to go further processing and guarantee that tree is unused */
+ if ( !(isRoot && hasVoidPage) ) {
+ UnlockReleaseBuffer( buffer );
+ } else {
+ Assert( rootBuffer );
+ *rootBuffer = buffer;
+ }
+
+ return hasVoidPage;
+}
+
+static void
+ginDeletePage(GinVacuumState *gvs, BlockNumber deleteBlkno, BlockNumber leftBlkno,
+ BlockNumber parentBlkno, OffsetNumber myoff, bool isParentRoot ) {
+ Buffer dBuffer = ReadBuffer( gvs->index, deleteBlkno );
+ Buffer lBuffer = (leftBlkno==InvalidBlockNumber) ? InvalidBuffer : ReadBuffer( gvs->index, leftBlkno );
+ Buffer pBuffer = ReadBuffer( gvs->index, parentBlkno );
+ Page page, parentPage;
+
+ LockBuffer( dBuffer, GIN_EXCLUSIVE );
+ if ( !isParentRoot ) /* parent is already locked by LockBufferForCleanup() */
+ LockBuffer( pBuffer, GIN_EXCLUSIVE );
+
+ START_CRIT_SECTION();
+
+ if ( leftBlkno!= InvalidBlockNumber ) {
+ BlockNumber rightlink;
+
+ LockBuffer( lBuffer, GIN_EXCLUSIVE );
+
+ page = BufferGetPage( dBuffer );
+ rightlink = GinPageGetOpaque(page)->rightlink;
+
+ page = BufferGetPage( lBuffer );
+ GinPageGetOpaque(page)->rightlink = rightlink;
+ }
+
+ parentPage = BufferGetPage( pBuffer );
+ PageDeletePostingItem(parentPage, myoff);
+
+ page = BufferGetPage( dBuffer );
+ GinPageGetOpaque(page)->flags = GIN_DELETED;
+
+ if (!gvs->index->rd_istemp) {
+ XLogRecPtr recptr;
+ XLogRecData rdata[4];
+ ginxlogDeletePage data;
+ int n;
+
+ data.node = gvs->index->rd_node;
+ data.blkno = deleteBlkno;
+ data.parentBlkno = parentBlkno;
+ data.parentOffset = myoff;
+ data.leftBlkno = leftBlkno;
+ data.rightLink = GinPageGetOpaque(page)->rightlink;
+
+ rdata[0].buffer = dBuffer;
+ rdata[0].buffer_std = FALSE;
+ rdata[0].data = NULL;
+ rdata[0].len = 0;
+ rdata[0].next = rdata + 1;
+
+ rdata[1].buffer = pBuffer;
+ rdata[1].buffer_std = FALSE;
+ rdata[1].data = NULL;
+ rdata[1].len = 0;
+ rdata[1].next = rdata + 2;
+
+ if ( leftBlkno!= InvalidBlockNumber ) {
+ rdata[2].buffer = lBuffer;
+ rdata[2].buffer_std = FALSE;
+ rdata[2].data = NULL;
+ rdata[2].len = 0;
+ rdata[2].next = rdata + 3;
+ n = 3;
+ } else
+ n = 2;
+
+ rdata[n].buffer = InvalidBuffer;
+ rdata[n].buffer_std = FALSE;
+ rdata[n].len = sizeof(ginxlogDeletePage);
+ rdata[n].data = (char*)&data;
+ rdata[n].next = NULL;
+
+ recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_DELETE_PAGE, rdata);
+ PageSetLSN(page, recptr);
+ PageSetTLI(page, ThisTimeLineID);
+ PageSetLSN(parentPage, recptr);
+ PageSetTLI(parentPage, ThisTimeLineID);
+ if ( leftBlkno!= InvalidBlockNumber ) {
+ page = BufferGetPage( lBuffer );
+ PageSetLSN(page, recptr);
+ PageSetTLI(page, ThisTimeLineID);
+ }
+ }
+
+ MarkBufferDirty( pBuffer );
+ if ( !isParentRoot )
+ LockBuffer( pBuffer, GIN_UNLOCK );
+ ReleaseBuffer( pBuffer );
+
+ if ( leftBlkno!= InvalidBlockNumber ) {
+ MarkBufferDirty( lBuffer );
+ UnlockReleaseBuffer( lBuffer );
+ }
+
+ MarkBufferDirty( dBuffer );
+ UnlockReleaseBuffer( dBuffer );
+
+ END_CRIT_SECTION();
+
+ gvs->result->pages_deleted++;
+}
+
+typedef struct DataPageDeleteStack {
+ struct DataPageDeleteStack *child;
+ struct DataPageDeleteStack *parent;
+
+ BlockNumber blkno;
+ bool isRoot;
+} DataPageDeleteStack;
+
+/*
+ * scans posting tree and deletes empty pages
+ */
+static bool
+ginScanToDelete( GinVacuumState *gvs, BlockNumber blkno, bool isRoot, DataPageDeleteStack *parent, OffsetNumber myoff ) {
+ DataPageDeleteStack *me;
+ Buffer buffer;
+ Page page;
+ bool meDelete = FALSE;
+
+ if ( isRoot ) {
+ me = parent;
+ } else {
+ if ( ! parent->child ) {
+ me = (DataPageDeleteStack*)palloc0(sizeof(DataPageDeleteStack));
+ me->parent=parent;
+ parent->child = me;
+ me->blkno = InvalidBlockNumber;
+ } else
+ me = parent->child;
+ }
+
+ buffer = ReadBuffer( gvs->index, blkno );
+ page = BufferGetPage( buffer );
+
+ Assert( GinPageIsData(page) );
+
+ if ( !GinPageIsLeaf(page) ) {
+ OffsetNumber i;
+
+ for(i=FirstOffsetNumber;i<=GinPageGetOpaque(page)->maxoff;i++) {
+ PostingItem *pitem = (PostingItem*)GinDataPageGetItem(page, i);
+
+ if ( ginScanToDelete( gvs, PostingItemGetBlockNumber(pitem), FALSE, me, i ) )
+ i--;
+ }
+ }
+
+ if ( GinPageGetOpaque(page)->maxoff < FirstOffsetNumber ) {
+ if ( !( me->blkno == InvalidBlockNumber && GinPageRightMost(page) ) ) {
+ /* we never delete right most branch */
+ Assert( !isRoot );
+ if ( GinPageGetOpaque(page)->maxoff < FirstOffsetNumber ) {
+ ginDeletePage( gvs, blkno, me->blkno, me->parent->blkno, myoff, me->parent->isRoot );
+ meDelete = TRUE;
+ }
+ }
+ }
+
+ ReleaseBuffer( buffer );
+
+ if ( !meDelete )
+ me->blkno = blkno;
+
+ return meDelete;
+}
+
+static void
+ginVacuumPostingTree( GinVacuumState *gvs, BlockNumber rootBlkno ) {
+ Buffer rootBuffer = InvalidBuffer;
+ DataPageDeleteStack root, *ptr, *tmp;
+
+ if ( ginVacuumPostingTreeLeaves(gvs, rootBlkno, TRUE, &rootBuffer)==FALSE ) {
+ Assert( rootBuffer == InvalidBuffer );
+ return;
+ }
+
+ memset(&root,0,sizeof(DataPageDeleteStack));
+ root.blkno = rootBlkno;
+ root.isRoot = TRUE;
+
+ vacuum_delay_point();
+
+ ginScanToDelete( gvs, rootBlkno, TRUE, &root, InvalidOffsetNumber );
+
+ ptr = root.child;
+ while( ptr ) {
+ tmp = ptr->child;
+ pfree( ptr );
+ ptr = tmp;
+ }
+
+ UnlockReleaseBuffer( rootBuffer );
+}
+
+/*
+ * returns modified page or NULL if page isn't modified.
+ * Function works with original page until first change is occured,
+ * then page is copied into temprorary one.
+ */
+static Page
+ginVacuumEntryPage(GinVacuumState *gvs, Buffer buffer, BlockNumber *roots, uint32 *nroot) {
+ Page origpage = BufferGetPage( buffer ), tmppage;
+ OffsetNumber i, maxoff = PageGetMaxOffsetNumber( origpage );
+
+ tmppage = origpage;
+
+ *nroot=0;
+
+ for(i=FirstOffsetNumber; i<= maxoff; i++) {
+ IndexTuple itup = (IndexTuple) PageGetItem(tmppage, PageGetItemId(tmppage, i));
+
+ if ( GinIsPostingTree(itup) ) {
+ /* store posting tree's roots for further processing,
+ we can't vacuum it just now due to risk of deadlocks with scans/inserts */
+ roots[ *nroot ] = GinItemPointerGetBlockNumber(&itup->t_tid);
+ (*nroot)++;
+ } else if ( GinGetNPosting(itup) > 0 ) {
+ /* if we already create temrorary page, we will make changes in place */
+ ItemPointerData *cleaned = (tmppage==origpage) ? NULL : GinGetPosting(itup );
+ uint32 newN = ginVacuumPostingList( gvs, GinGetPosting(itup), GinGetNPosting(itup), &cleaned );
+
+ if ( GinGetNPosting(itup) != newN ) {
+ bool isnull;
+ Datum value;
+
+ /*
+ * Some ItemPointers was deleted, so we should remake our tuple
+ */
+
+ if ( tmppage==origpage ) {
+ /*
+ * On first difference we create temprorary page in memory
+ * and copies content in to it.
+ */
+ tmppage=GinPageGetCopyPage ( origpage );
+
+ if ( newN > 0 ) {
+ Size pos = ((char*)GinGetPosting(itup)) - ((char*)origpage);
+ memcpy( tmppage+pos, cleaned, sizeof(ItemPointerData)*newN );
+ }
+
+ pfree( cleaned );
+
+ /* set itup pointer to new page */
+ itup = (IndexTuple) PageGetItem(tmppage, PageGetItemId(tmppage, i));
+ }
+
+ value = index_getattr(itup, FirstOffsetNumber, gvs->ginstate.tupdesc, &isnull);
+ itup = GinFormTuple(&gvs->ginstate, value, GinGetPosting(itup), newN);
+ PageIndexTupleDelete(tmppage, i);
+
+ if ( PageAddItem( tmppage, (Item)itup, IndexTupleSize(itup), i, LP_USED ) != i )
+ elog(ERROR, "failed to add item to index page in \"%s\"",
+ RelationGetRelationName(gvs->index));
+
+ pfree( itup );
+ }
+ }
+ }
+
+ return ( tmppage==origpage ) ? NULL : tmppage;
+}
+
+Datum
+ginbulkdelete(PG_FUNCTION_ARGS) {
+ Relation index = (Relation) PG_GETARG_POINTER(0);
+ IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(1);
+ void *callback_state = (void *) PG_GETARG_POINTER(2);
+ BlockNumber blkno = GIN_ROOT_BLKNO;
+ GinVacuumState gvs;
+ Buffer buffer;
+ BlockNumber rootOfPostingTree[ BLCKSZ/ (sizeof(IndexTupleData)+sizeof(ItemId)) ];
+ uint32 nRoot;
+
+ gvs.index = index;
+ gvs.result = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
+ gvs.callback = callback;
+ gvs.callback_state = callback_state;
+ initGinState(&gvs.ginstate, index);
+
+ buffer = ReadBuffer( index, blkno );
+
+ /* find leaf page */
+ for(;;) {
+ Page page = BufferGetPage( buffer );
+ IndexTuple itup;
+
+ LockBuffer(buffer,GIN_SHARE);
+
+ Assert( !GinPageIsData(page) );
+
+ if ( GinPageIsLeaf(page) ) {
+ LockBuffer(buffer,GIN_UNLOCK);
+ LockBuffer(buffer,GIN_EXCLUSIVE);
+
+ if ( blkno==GIN_ROOT_BLKNO && !GinPageIsLeaf(page) ) {
+ LockBuffer(buffer,GIN_UNLOCK);
+ continue; /* check it one more */
+ }
+ break;
+ }
+
+ Assert( PageGetMaxOffsetNumber(page) >= FirstOffsetNumber );
+
+ itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, FirstOffsetNumber));
+ blkno = GinItemPointerGetBlockNumber(&(itup)->t_tid);
+ Assert( blkno!= InvalidBlockNumber );
+
+ LockBuffer(buffer,GIN_UNLOCK);
+ buffer = ReleaseAndReadBuffer( buffer, index, blkno );
+ }
+
+ /* right now we found leftmost page in entry's BTree */
+
+ for(;;) {
+ Page page = BufferGetPage( buffer );
+ Page resPage;
+ uint32 i;
+
+ Assert( !GinPageIsData(page) );
+
+ resPage = ginVacuumEntryPage(&gvs, buffer, rootOfPostingTree, &nRoot);
+
+ blkno = GinPageGetOpaque( page )->rightlink;
+
+ if ( resPage ) {
+ START_CRIT_SECTION();
+ PageRestoreTempPage( resPage, page );
+ xlogVacuumPage(gvs.index, buffer);
+ MarkBufferDirty( buffer );
+ UnlockReleaseBuffer(buffer);
+ END_CRIT_SECTION();
+ } else {
+ UnlockReleaseBuffer(buffer);
+ }
+
+ vacuum_delay_point();
+
+ for(i=0; i<nRoot; i++) {
+ ginVacuumPostingTree( &gvs, rootOfPostingTree[i] );
+ vacuum_delay_point();
+ }
+
+ if ( blkno==InvalidBlockNumber ) /*rightmost page*/
+ break;
+
+ buffer = ReadBuffer( index, blkno );
+ LockBuffer(buffer,GIN_EXCLUSIVE);
+ }
+
+ PG_RETURN_POINTER(gvs.result);
+}
+
+Datum
+ginvacuumcleanup(PG_FUNCTION_ARGS) {
+ Relation index = (Relation) PG_GETARG_POINTER(0);
+ IndexVacuumCleanupInfo *info = (IndexVacuumCleanupInfo *) PG_GETARG_POINTER(1);
+ IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(2);
+ bool needLock = !RELATION_IS_LOCAL(index);
+ BlockNumber npages,
+ blkno;
+ BlockNumber nFreePages,
+ *freePages,
+ maxFreePages;
+ BlockNumber lastBlock = GIN_ROOT_BLKNO,
+ lastFilledBlock = GIN_ROOT_BLKNO;
+
+
+ if (info->vacuum_full) {
+ LockRelation(index, AccessExclusiveLock);
+ needLock = false;
+ }
+
+ if (needLock)
+ LockRelationForExtension(index, ExclusiveLock);
+ npages = RelationGetNumberOfBlocks(index);
+ if (needLock)
+ UnlockRelationForExtension(index, ExclusiveLock);
+
+ maxFreePages = npages;
+ if (maxFreePages > MaxFSMPages)
+ maxFreePages = MaxFSMPages;
+
+ nFreePages = 0;
+ freePages = (BlockNumber *) palloc(sizeof(BlockNumber) * maxFreePages);
+
+ for (blkno = GIN_ROOT_BLKNO + 1; blkno < npages; blkno++) {
+ Buffer buffer;
+ Page page;
+
+ vacuum_delay_point();
+
+ buffer = ReadBuffer(index, blkno);
+ LockBuffer(buffer, GIN_SHARE);
+ page = (Page) BufferGetPage(buffer);
+
+ if ( GinPageIsDeleted(page) ) {
+ if (nFreePages < maxFreePages)
+ freePages[nFreePages++] = blkno;
+ } else
+ lastFilledBlock = blkno;
+
+ UnlockReleaseBuffer(buffer);
+ }
+ lastBlock = npages - 1;
+
+ if (info->vacuum_full && nFreePages > 0) {
+ /* try to truncate index */
+ int i;
+ for (i = 0; i < nFreePages; i++)
+ if (freePages[i] >= lastFilledBlock) {
+ nFreePages = i;
+ break;
+ }
+
+ if (lastBlock > lastFilledBlock)
+ RelationTruncate(index, lastFilledBlock + 1);
+
+ stats->pages_removed = lastBlock - lastFilledBlock;
+ }
+
+ RecordIndexFreeSpace(&index->rd_node, nFreePages, freePages);
+ stats->pages_free = nFreePages;
+
+ if (needLock)
+ LockRelationForExtension(index, ExclusiveLock);
+ stats->num_pages = RelationGetNumberOfBlocks(index);
+ if (needLock)
+ UnlockRelationForExtension(index, ExclusiveLock);
+
+ if (info->vacuum_full)
+ UnlockRelation(index, AccessExclusiveLock);
+
+ PG_RETURN_POINTER(stats);
+}
+
diff --git a/src/backend/access/gin/ginxlog.c b/src/backend/access/gin/ginxlog.c
new file mode 100644
index 00000000000..bc6a458e5fc
--- /dev/null
+++ b/src/backend/access/gin/ginxlog.c
@@ -0,0 +1,544 @@
+/*-------------------------------------------------------------------------
+ *
+ * ginxlog.c
+ * WAL replay logic for inverted index.
+ *
+ *
+ * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * $PostgreSQL: pgsql/src/backend/access/gin/ginxlog.c,v 1.1 2006/05/02 11:28:54 teodor Exp $
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/genam.h"
+#include "access/gin.h"
+#include "access/heapam.h"
+#include "catalog/index.h"
+#include "commands/vacuum.h"
+#include "miscadmin.h"
+#include "utils/memutils.h"
+
+static MemoryContext opCtx; /* working memory for operations */
+static MemoryContext topCtx;
+
+typedef struct ginIncompleteSplit {
+ RelFileNode node;
+ BlockNumber leftBlkno;
+ BlockNumber rightBlkno;
+ BlockNumber rootBlkno;
+} ginIncompleteSplit;
+
+static List *incomplete_splits;
+
+static void
+pushIncompleteSplit(RelFileNode node, BlockNumber leftBlkno, BlockNumber rightBlkno, BlockNumber rootBlkno) {
+ ginIncompleteSplit *split;
+
+ MemoryContextSwitchTo( topCtx );
+
+ split = palloc(sizeof(ginIncompleteSplit));
+
+ split->node = node;
+ split->leftBlkno = leftBlkno;
+ split->rightBlkno = rightBlkno;
+ split->rootBlkno = rootBlkno;
+
+ incomplete_splits = lappend(incomplete_splits, split);
+
+ MemoryContextSwitchTo( opCtx );
+}
+
+static void
+forgetIncompleteSplit(RelFileNode node, BlockNumber leftBlkno, BlockNumber updateBlkno) {
+ ListCell *l;
+
+ foreach(l, incomplete_splits) {
+ ginIncompleteSplit *split = (ginIncompleteSplit *) lfirst(l);
+
+ if ( RelFileNodeEquals(node, split->node) && leftBlkno == split->leftBlkno && updateBlkno == split->rightBlkno ) {
+ incomplete_splits = list_delete_ptr(incomplete_splits, split);
+ break;
+ }
+ }
+}
+
+static void
+ginRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record) {
+ RelFileNode *node = (RelFileNode *) XLogRecGetData(record);
+ Relation reln;
+ Buffer buffer;
+ Page page;
+
+ reln = XLogOpenRelation(*node);
+ buffer = XLogReadBuffer(reln, GIN_ROOT_BLKNO, true);
+ Assert(BufferIsValid(buffer));
+ page = (Page) BufferGetPage(buffer);
+
+ GinInitBuffer(buffer, GIN_LEAF);
+
+ PageSetLSN(page, lsn);
+ PageSetTLI(page, ThisTimeLineID);
+
+ MarkBufferDirty(buffer);
+ UnlockReleaseBuffer(buffer);
+}
+
+static void
+ginRedoCreatePTree(XLogRecPtr lsn, XLogRecord *record) {
+ ginxlogCreatePostingTree *data = (ginxlogCreatePostingTree*)XLogRecGetData(record);
+ ItemPointerData *items = (ItemPointerData*)(XLogRecGetData(record) + sizeof(ginxlogCreatePostingTree));
+ Relation reln;
+ Buffer buffer;
+ Page page;
+
+ reln = XLogOpenRelation(data->node);
+ buffer = XLogReadBuffer(reln, data->blkno, true);
+ Assert(BufferIsValid(buffer));
+ page = (Page) BufferGetPage(buffer);
+
+ GinInitBuffer(buffer, GIN_DATA|GIN_LEAF);
+ memcpy( GinDataPageGetData(page), items, sizeof(ItemPointerData) * data->nitem );
+ GinPageGetOpaque(page)->maxoff = data->nitem;
+
+ PageSetLSN(page, lsn);
+ PageSetTLI(page, ThisTimeLineID);
+
+ MarkBufferDirty(buffer);
+ UnlockReleaseBuffer(buffer);
+}
+
+static void
+ginRedoInsert(XLogRecPtr lsn, XLogRecord *record) {
+ ginxlogInsert *data = (ginxlogInsert*)XLogRecGetData(record);
+ Relation reln;
+ Buffer buffer;
+ Page page;
+
+ /* nothing else to do if page was backed up (and no info to do it with) */
+ if (record->xl_info & XLR_BKP_BLOCK_1)
+ return;
+
+ reln = XLogOpenRelation(data->node);
+ buffer = XLogReadBuffer(reln, data->blkno, false);
+ Assert(BufferIsValid(buffer));
+ page = (Page) BufferGetPage(buffer);
+
+ if ( data->isData ) {
+ Assert( data->isDelete == FALSE );
+ Assert( GinPageIsData( page ) );
+
+ if ( data->isLeaf ) {
+ OffsetNumber i;
+ ItemPointerData *items = (ItemPointerData*)( XLogRecGetData(record) + sizeof(ginxlogInsert) );
+
+ Assert( GinPageIsLeaf( page ) );
+ Assert( data->updateBlkno == InvalidBlockNumber );
+
+ for(i=0;i<data->nitem;i++)
+ GinDataPageAddItem( page, items+i, data->offset + i );
+ } else {
+ PostingItem *pitem;
+
+ Assert( !GinPageIsLeaf( page ) );
+
+ if ( data->updateBlkno != InvalidBlockNumber ) {
+ /* update link to right page after split */
+ pitem = (PostingItem*)GinDataPageGetItem(page, data->offset);
+ PostingItemSetBlockNumber( pitem, data->updateBlkno );
+ }
+
+ pitem = (PostingItem*)( XLogRecGetData(record) + sizeof(ginxlogInsert) );
+
+ GinDataPageAddItem( page, pitem, data->offset );
+
+ if ( data->updateBlkno != InvalidBlockNumber )
+ forgetIncompleteSplit(data->node, PostingItemGetBlockNumber( pitem ), data->updateBlkno);
+ }
+ } else {
+ IndexTuple itup;
+
+ Assert( !GinPageIsData( page ) );
+
+ if ( data->updateBlkno != InvalidBlockNumber ) {
+ /* update link to right page after split */
+ Assert( !GinPageIsLeaf( page ) );
+ Assert( data->offset>=FirstOffsetNumber && data->offset<=PageGetMaxOffsetNumber(page) );
+ itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, data->offset));
+ ItemPointerSet(&itup->t_tid, data->updateBlkno, InvalidOffsetNumber);
+ }
+
+ if ( data->isDelete ) {
+ Assert( GinPageIsLeaf( page ) );
+ Assert( data->offset>=FirstOffsetNumber && data->offset<=PageGetMaxOffsetNumber(page) );
+ PageIndexTupleDelete(page, data->offset);
+ }
+
+ itup = (IndexTuple)( XLogRecGetData(record) + sizeof(ginxlogInsert) );
+
+ if ( PageAddItem( page, (Item)itup, IndexTupleSize(itup), data->offset, LP_USED) == InvalidOffsetNumber )
+ elog(ERROR, "failed to add item to index page in %u/%u/%u",
+ data->node.spcNode, data->node.dbNode, data->node.relNode );
+
+ if ( !data->isLeaf && data->updateBlkno != InvalidBlockNumber )
+ forgetIncompleteSplit(data->node, GinItemPointerGetBlockNumber( &itup->t_tid ), data->updateBlkno);
+ }
+
+ PageSetLSN(page, lsn);
+ PageSetTLI(page, ThisTimeLineID);
+
+ MarkBufferDirty(buffer);
+ UnlockReleaseBuffer(buffer);
+}
+
+static void
+ginRedoSplit(XLogRecPtr lsn, XLogRecord *record) {
+ ginxlogSplit *data = (ginxlogSplit*)XLogRecGetData(record);
+ Relation reln;
+ Buffer lbuffer, rbuffer;
+ Page lpage, rpage;
+ uint32 flags = 0;
+
+ reln = XLogOpenRelation(data->node);
+
+ if ( data->isLeaf )
+ flags |= GIN_LEAF;
+ if ( data->isData )
+ flags |= GIN_DATA;
+
+ lbuffer = XLogReadBuffer(reln, data->lblkno, data->isRootSplit);
+ Assert(BufferIsValid(lbuffer));
+ lpage = (Page) BufferGetPage(lbuffer);
+ GinInitBuffer(lbuffer, flags);
+
+ rbuffer = XLogReadBuffer(reln, data->rblkno, true);
+ Assert(BufferIsValid(rbuffer));
+ rpage = (Page) BufferGetPage(rbuffer);
+ GinInitBuffer(rbuffer, flags);
+
+ GinPageGetOpaque(lpage)->rightlink = BufferGetBlockNumber( rbuffer );
+ GinPageGetOpaque(rpage)->rightlink = data->rrlink;
+
+ if ( data->isData ) {
+ char *ptr = XLogRecGetData(record) + sizeof(ginxlogSplit);
+ Size sizeofitem = GinSizeOfItem(lpage);
+ OffsetNumber i;
+ ItemPointer bound;
+
+ for(i=0;i<data->separator;i++) {
+ GinDataPageAddItem( lpage, ptr, InvalidOffsetNumber );
+ ptr += sizeofitem;
+ }
+
+ for(i=data->separator;i<data->nitem;i++) {
+ GinDataPageAddItem( rpage, ptr, InvalidOffsetNumber );
+ ptr += sizeofitem;
+ }
+
+ /* set up right key */
+ bound = GinDataPageGetRightBound(lpage);
+ if ( data->isLeaf )
+ *bound = *(ItemPointerData*)GinDataPageGetItem(lpage, GinPageGetOpaque(lpage)->maxoff);
+ else
+ *bound = ((PostingItem*)GinDataPageGetItem(lpage, GinPageGetOpaque(lpage)->maxoff))->key;
+
+ bound = GinDataPageGetRightBound(rpage);
+ *bound = data->rightbound;
+ } else {
+ IndexTuple itup = (IndexTuple)( XLogRecGetData(record) + sizeof(ginxlogSplit) );
+ OffsetNumber i;
+
+ for(i=0;i<data->separator;i++) {
+ if ( PageAddItem( lpage, (Item)itup, IndexTupleSize(itup), InvalidOffsetNumber, LP_USED) == InvalidOffsetNumber )
+ elog(ERROR, "failed to add item to index page in %u/%u/%u",
+ data->node.spcNode, data->node.dbNode, data->node.relNode );
+ itup = (IndexTuple)( ((char*)itup) + MAXALIGN( IndexTupleSize(itup) ) );
+ }
+
+ for(i=data->separator;i<data->nitem;i++) {
+ if ( PageAddItem( rpage, (Item)itup, IndexTupleSize(itup), InvalidOffsetNumber, LP_USED) == InvalidOffsetNumber )
+ elog(ERROR, "failed to add item to index page in %u/%u/%u",
+ data->node.spcNode, data->node.dbNode, data->node.relNode );
+ itup = (IndexTuple)( ((char*)itup) + MAXALIGN( IndexTupleSize(itup) ) );
+ }
+ }
+
+ PageSetLSN(rpage, lsn);
+ PageSetTLI(lpage, ThisTimeLineID);
+ MarkBufferDirty(rbuffer);
+
+ PageSetLSN(lpage, lsn);
+ PageSetTLI(lpage, ThisTimeLineID);
+ MarkBufferDirty(lbuffer);
+
+ if ( !data->isLeaf && data->updateBlkno != InvalidBlockNumber )
+ forgetIncompleteSplit(data->node, data->leftChildBlkno, data->updateBlkno);
+
+ if ( data->isRootSplit ) {
+ Buffer rootBuf = XLogReadBuffer(reln, data->rootBlkno, false);
+ Page rootPage = BufferGetPage( rootBuf );
+
+ GinInitBuffer( rootBuf, flags & ~GIN_LEAF );
+
+ if ( data->isData ) {
+ Assert( data->rootBlkno != GIN_ROOT_BLKNO );
+ dataFillRoot(NULL, rootBuf, lbuffer, rbuffer);
+ } else {
+ Assert( data->rootBlkno == GIN_ROOT_BLKNO );
+ entryFillRoot(NULL, rootBuf, lbuffer, rbuffer);
+ }
+
+ PageSetLSN(rootPage, lsn);
+ PageSetTLI(rootPage, ThisTimeLineID);
+
+ MarkBufferDirty(rootBuf);
+ UnlockReleaseBuffer(rootBuf);
+ } else
+ pushIncompleteSplit(data->node, data->lblkno, data->rblkno, data->rootBlkno);
+
+ UnlockReleaseBuffer(rbuffer);
+ UnlockReleaseBuffer(lbuffer);
+}
+
+static void
+ginRedoVacuumPage(XLogRecPtr lsn, XLogRecord *record) {
+ ginxlogVacuumPage *data = (ginxlogVacuumPage*)XLogRecGetData(record);
+ Relation reln;
+ Buffer buffer;
+ Page page;
+
+ /* nothing else to do if page was backed up (and no info to do it with) */
+ if (record->xl_info & XLR_BKP_BLOCK_1)
+ return;
+
+ reln = XLogOpenRelation(data->node);
+ buffer = XLogReadBuffer(reln, data->blkno, false);
+ Assert(BufferIsValid(buffer));
+ page = (Page) BufferGetPage(buffer);
+
+ if ( GinPageIsData( page ) ) {
+ memcpy( GinDataPageGetData(page), XLogRecGetData(record) + sizeof(ginxlogVacuumPage),
+ GinSizeOfItem(page) * data->nitem );
+ GinPageGetOpaque(page)->maxoff = data->nitem;
+ } else {
+ OffsetNumber i, *tod;
+ IndexTuple itup = (IndexTuple)( XLogRecGetData(record) + sizeof(ginxlogVacuumPage) );
+
+ tod = (OffsetNumber*)palloc( sizeof(OffsetNumber) * PageGetMaxOffsetNumber(page) );
+ for(i=FirstOffsetNumber;i<=PageGetMaxOffsetNumber(page);i++)
+ tod[i-1] = i;
+
+ PageIndexMultiDelete(page, tod, PageGetMaxOffsetNumber(page));
+
+ for(i=0;i<data->nitem;i++) {
+ if ( PageAddItem( page, (Item)itup, IndexTupleSize(itup), InvalidOffsetNumber, LP_USED) == InvalidOffsetNumber )
+ elog(ERROR, "failed to add item to index page in %u/%u/%u",
+ data->node.spcNode, data->node.dbNode, data->node.relNode );
+ itup = (IndexTuple)( ((char*)itup) + MAXALIGN( IndexTupleSize(itup) ) );
+ }
+ }
+
+ PageSetLSN(page, lsn);
+ PageSetTLI(page, ThisTimeLineID);
+
+ MarkBufferDirty(buffer);
+ UnlockReleaseBuffer(buffer);
+}
+
+static void
+ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record) {
+ ginxlogDeletePage *data = (ginxlogDeletePage*)XLogRecGetData(record);
+ Relation reln;
+ Buffer buffer;
+ Page page;
+
+ reln = XLogOpenRelation(data->node);
+
+ if ( !( record->xl_info & XLR_BKP_BLOCK_1) ) {
+ buffer = XLogReadBuffer(reln, data->blkno, false);
+ page = BufferGetPage( buffer );
+ Assert(GinPageIsData(page));
+ GinPageGetOpaque(page)->flags = GIN_DELETED;
+ PageSetLSN(page, lsn);
+ PageSetTLI(page, ThisTimeLineID);
+ MarkBufferDirty(buffer);
+ UnlockReleaseBuffer(buffer);
+ }
+
+ if ( !( record->xl_info & XLR_BKP_BLOCK_2) ) {
+ buffer = XLogReadBuffer(reln, data->parentBlkno, false);
+ page = BufferGetPage( buffer );
+ Assert(GinPageIsData(page));
+ Assert(!GinPageIsLeaf(page));
+ PageDeletePostingItem(page, data->parentOffset);
+ PageSetLSN(page, lsn);
+ PageSetTLI(page, ThisTimeLineID);
+ MarkBufferDirty(buffer);
+ UnlockReleaseBuffer(buffer);
+ }
+
+ if ( !( record->xl_info & XLR_BKP_BLOCK_2) && data->leftBlkno != InvalidBlockNumber ) {
+ buffer = XLogReadBuffer(reln, data->leftBlkno, false);
+ page = BufferGetPage( buffer );
+ Assert(GinPageIsData(page));
+ GinPageGetOpaque(page)->rightlink = data->rightLink;
+ PageSetLSN(page, lsn);
+ PageSetTLI(page, ThisTimeLineID);
+ MarkBufferDirty(buffer);
+ UnlockReleaseBuffer(buffer);
+ }
+}
+
+void
+gin_redo(XLogRecPtr lsn, XLogRecord *record) {
+ uint8 info = record->xl_info & ~XLR_INFO_MASK;
+
+ topCtx = MemoryContextSwitchTo(opCtx);
+ switch (info) {
+ case XLOG_GIN_CREATE_INDEX:
+ ginRedoCreateIndex(lsn, record);
+ break;
+ case XLOG_GIN_CREATE_PTREE:
+ ginRedoCreatePTree(lsn, record);
+ break;
+ case XLOG_GIN_INSERT:
+ ginRedoInsert(lsn, record);
+ break;
+ case XLOG_GIN_SPLIT:
+ ginRedoSplit(lsn, record);
+ break;
+ case XLOG_GIN_VACUUM_PAGE:
+ ginRedoVacuumPage(lsn, record);
+ break;
+ case XLOG_GIN_DELETE_PAGE:
+ ginRedoDeletePage(lsn, record);
+ break;
+ default:
+ elog(PANIC, "gin_redo: unknown op code %u", info);
+ }
+ MemoryContextSwitchTo(topCtx);
+ MemoryContextReset(opCtx);
+}
+
+static void
+desc_node( StringInfo buf, RelFileNode node, BlockNumber blkno ) {
+ appendStringInfo(buf,"node: %u/%u/%u blkno: %u",
+ node.spcNode, node.dbNode, node.relNode, blkno);
+}
+
+void
+gin_desc(StringInfo buf, uint8 xl_info, char *rec) {
+ uint8 info = xl_info & ~XLR_INFO_MASK;
+
+ switch (info) {
+ case XLOG_GIN_CREATE_INDEX:
+ appendStringInfo(buf,"Create index, ");
+ desc_node(buf, *(RelFileNode*)rec, GIN_ROOT_BLKNO );
+ break;
+ case XLOG_GIN_CREATE_PTREE:
+ appendStringInfo(buf,"Create posting tree, ");
+ desc_node(buf, ((ginxlogCreatePostingTree*)rec)->node, ((ginxlogCreatePostingTree*)rec)->blkno );
+ break;
+ case XLOG_GIN_INSERT:
+ appendStringInfo(buf,"Insert item, ");
+ desc_node(buf, ((ginxlogInsert*)rec)->node, ((ginxlogInsert*)rec)->blkno );
+ appendStringInfo(buf," offset: %u nitem: %u isdata: %c isleaf %c isdelete %c updateBlkno:%u",
+ ((ginxlogInsert*)rec)->offset,
+ ((ginxlogInsert*)rec)->nitem,
+ ( ((ginxlogInsert*)rec)->isData ) ? 'T' : 'F',
+ ( ((ginxlogInsert*)rec)->isLeaf ) ? 'T' : 'F',
+ ( ((ginxlogInsert*)rec)->isDelete ) ? 'T' : 'F',
+ ((ginxlogInsert*)rec)->updateBlkno
+ );
+
+ break;
+ case XLOG_GIN_SPLIT:
+ appendStringInfo(buf,"Page split, ");
+ desc_node(buf, ((ginxlogSplit*)rec)->node, ((ginxlogSplit*)rec)->lblkno );
+ appendStringInfo(buf," isrootsplit: %c", ( ((ginxlogSplit*)rec)->isRootSplit ) ? 'T' : 'F');
+ break;
+ case XLOG_GIN_VACUUM_PAGE:
+ appendStringInfo(buf,"Vacuum page, ");
+ desc_node(buf, ((ginxlogVacuumPage*)rec)->node, ((ginxlogVacuumPage*)rec)->blkno );
+ break;
+ case XLOG_GIN_DELETE_PAGE:
+ appendStringInfo(buf,"Delete page, ");
+ desc_node(buf, ((ginxlogDeletePage*)rec)->node, ((ginxlogDeletePage*)rec)->blkno );
+ break;
+ default:
+ elog(PANIC, "gin_desc: unknown op code %u", info);
+ }
+}
+
+void
+gin_xlog_startup(void) {
+ incomplete_splits = NIL;
+
+ opCtx = AllocSetContextCreate(CurrentMemoryContext,
+ "GIN recovery temporary context",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+}
+
+static void
+ginContinueSplit( ginIncompleteSplit *split ) {
+ GinBtreeData btree;
+ Relation reln;
+ Buffer buffer;
+ GinBtreeStack stack;
+
+ /* elog(NOTICE,"ginContinueSplit root:%u l:%u r:%u", split->rootBlkno, split->leftBlkno, split->rightBlkno); */
+ reln = XLogOpenRelation(split->node);
+
+ buffer = XLogReadBuffer(reln, split->leftBlkno, false);
+
+ if ( split->rootBlkno == GIN_ROOT_BLKNO ) {
+ prepareEntryScan( &btree, reln, (Datum)0, NULL );
+ btree.entry = ginPageGetLinkItup( buffer );
+ } else {
+ Page page = BufferGetPage( buffer );
+
+ prepareDataScan( &btree, reln );
+
+ PostingItemSetBlockNumber( &(btree.pitem), split->leftBlkno );
+ if ( GinPageIsLeaf(page) )
+ btree.pitem.key = *(ItemPointerData*)GinDataPageGetItem(page,
+ GinPageGetOpaque(page)->maxoff);
+ else
+ btree.pitem.key = ((PostingItem*)GinDataPageGetItem(page,
+ GinPageGetOpaque(page)->maxoff))->key;
+ }
+
+ btree.rightblkno = split->rightBlkno;
+
+ stack.blkno = split->leftBlkno;
+ stack.buffer = buffer;
+ stack.off = InvalidOffsetNumber;
+ stack.parent = NULL;
+
+ findParents( &btree, &stack, split->rootBlkno);
+ ginInsertValue( &btree, stack.parent );
+
+ UnlockReleaseBuffer( buffer );
+}
+
+void
+gin_xlog_cleanup(void) {
+ ListCell *l;
+ MemoryContext topCtx;
+
+ topCtx = MemoryContextSwitchTo(opCtx);
+
+ foreach(l, incomplete_splits) {
+ ginIncompleteSplit *split = (ginIncompleteSplit *) lfirst(l);
+ ginContinueSplit( split );
+ MemoryContextReset( opCtx );
+ }
+
+ MemoryContextSwitchTo(topCtx);
+ MemoryContextDelete(opCtx);
+}
+
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index f7b48df1263..f65e9374c5d 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -3,7 +3,7 @@
*
* Resource managers definition
*
- * $PostgreSQL: pgsql/src/backend/access/transam/rmgr.c,v 1.21 2005/11/07 17:36:45 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/access/transam/rmgr.c,v 1.22 2006/05/02 11:28:54 teodor Exp $
*/
#include "postgres.h"
@@ -13,6 +13,7 @@
#include "access/heapam.h"
#include "access/multixact.h"
#include "access/nbtree.h"
+#include "access/gin.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "commands/dbcommands.h"
@@ -35,7 +36,7 @@ const RmgrData RmgrTable[RM_MAX_ID + 1] = {
{"Heap", heap_redo, heap_desc, NULL, NULL},
{"Btree", btree_redo, btree_desc, btree_xlog_startup, btree_xlog_cleanup},
{"Hash", hash_redo, hash_desc, NULL, NULL},
- {"Reserved 13", NULL, NULL, NULL, NULL},
+ {"Gin", gin_redo, gin_desc, gin_xlog_startup, gin_xlog_cleanup},
{"Gist", gist_redo, gist_desc, gist_xlog_startup, gist_xlog_cleanup},
{"Sequence", seq_redo, seq_desc, NULL, NULL}
};
diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c
index ce85d077a34..18845eecea2 100644
--- a/src/backend/commands/cluster.c
+++ b/src/backend/commands/cluster.c
@@ -11,7 +11,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/commands/cluster.c,v 1.144 2006/03/05 15:58:23 momjian Exp $
+ * $PostgreSQL: pgsql/src/backend/commands/cluster.c,v 1.145 2006/05/02 11:28:54 teodor Exp $
*
*-------------------------------------------------------------------------
*/
@@ -376,6 +376,13 @@ check_index_is_clusterable(Relation OldHeap, Oid indexOid, bool recheck)
RelationGetRelationName(OldIndex))));
}
+ if (!OldIndex->rd_am->amclusterable)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot cluster on index \"%s\" because access method does not clusterable",
+ RelationGetRelationName(OldIndex))));
+
+
/*
* Disallow clustering system relations. This will definitely NOT work
* for shared relations (we have no way to update pg_class rows in other
diff --git a/src/backend/commands/opclasscmds.c b/src/backend/commands/opclasscmds.c
index d66c403e515..c493cc8e0be 100644
--- a/src/backend/commands/opclasscmds.c
+++ b/src/backend/commands/opclasscmds.c
@@ -9,7 +9,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/commands/opclasscmds.c,v 1.43 2006/03/14 22:48:18 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/commands/opclasscmds.c,v 1.44 2006/05/02 11:28:54 teodor Exp $
*
*-------------------------------------------------------------------------
*/
@@ -273,11 +273,11 @@ DefineOpClass(CreateOpClassStmt *stmt)
else
{
/*
- * Currently, only GiST allows storagetype different from
+ * Currently, only GiST and GIN allows storagetype different from
* datatype. This hardcoded test should be eliminated in favor of
* adding another boolean column to pg_am ...
*/
- if (amoid != GIST_AM_OID)
+ if (!(amoid == GIST_AM_OID || amoid == GIN_AM_OID))
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("storage type may not be different from data type for access method \"%s\"",
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index c6ebdd5770b..ececa0c0732 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -13,7 +13,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/commands/vacuum.c,v 1.326 2006/03/31 23:32:06 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/commands/vacuum.c,v 1.327 2006/05/02 11:28:54 teodor Exp $
*
*-------------------------------------------------------------------------
*/
@@ -2982,7 +2982,16 @@ scan_index(Relation indrel, double num_tuples)
/*
* Check for tuple count mismatch. If the index is partial, then it's OK
* for it to have fewer tuples than the heap; else we got trouble.
+ *
+ * XXX Hack. Since GIN stores every pointer to heap several times and
+ * counting num_index_tuples during vacuum is very comlpex and slow
+ * we just copy num_tuples to num_index_tuples as upper limit to avoid
+ * WARNING and optimizer mistakes.
*/
+ if ( indrel->rd_rel->relam == GIN_AM_OID )
+ {
+ stats->num_index_tuples = num_tuples;
+ } else
if (stats->num_index_tuples != num_tuples)
{
if (stats->num_index_tuples > num_tuples ||
@@ -3052,7 +3061,16 @@ vacuum_index(VacPageList vacpagelist, Relation indrel,
/*
* Check for tuple count mismatch. If the index is partial, then it's OK
* for it to have fewer tuples than the heap; else we got trouble.
+ *
+ * XXX Hack. Since GIN stores every pointer to heap several times and
+ * counting num_index_tuples during vacuum is very comlpex and slow
+ * we just copy num_tuples to num_index_tuples as upper limit to avoid
+ * WARNING and optimizer mistakes.
*/
+ if ( indrel->rd_rel->relam == GIN_AM_OID )
+ {
+ stats->num_index_tuples = num_tuples;
+ } else
if (stats->num_index_tuples != num_tuples + keep_tuples)
{
if (stats->num_index_tuples > num_tuples + keep_tuples ||
diff --git a/src/backend/utils/adt/selfuncs.c b/src/backend/utils/adt/selfuncs.c
index 5f830ef0cd3..5aeadce0917 100644
--- a/src/backend/utils/adt/selfuncs.c
+++ b/src/backend/utils/adt/selfuncs.c
@@ -15,7 +15,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/utils/adt/selfuncs.c,v 1.204 2006/05/02 04:34:18 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/utils/adt/selfuncs.c,v 1.205 2006/05/02 11:28:55 teodor Exp $
*
*-------------------------------------------------------------------------
*/
@@ -4829,3 +4829,21 @@ gistcostestimate(PG_FUNCTION_ARGS)
PG_RETURN_VOID();
}
+
+Datum
+gincostestimate(PG_FUNCTION_ARGS)
+{
+ PlannerInfo *root = (PlannerInfo *) PG_GETARG_POINTER(0);
+ IndexOptInfo *index = (IndexOptInfo *) PG_GETARG_POINTER(1);
+ List *indexQuals = (List *) PG_GETARG_POINTER(2);
+ Cost *indexStartupCost = (Cost *) PG_GETARG_POINTER(3);
+ Cost *indexTotalCost = (Cost *) PG_GETARG_POINTER(4);
+ Selectivity *indexSelectivity = (Selectivity *) PG_GETARG_POINTER(5);
+ double *indexCorrelation = (double *) PG_GETARG_POINTER(6);
+
+ genericcostestimate(root, index, indexQuals, 0.0,
+ indexStartupCost, indexTotalCost,
+ indexSelectivity, indexCorrelation);
+
+ PG_RETURN_VOID();
+}
diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c
index 27f31c40fb1..1295288b89c 100644
--- a/src/backend/utils/init/globals.c
+++ b/src/backend/utils/init/globals.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/utils/init/globals.c,v 1.97 2006/03/05 15:58:46 momjian Exp $
+ * $PostgreSQL: pgsql/src/backend/utils/init/globals.c,v 1.98 2006/05/02 11:28:55 teodor Exp $
*
* NOTES
* Globals used all over the place should be declared here and not
@@ -107,3 +107,5 @@ int VacuumCostDelay = 0;
int VacuumCostBalance = 0; /* working state for vacuum */
bool VacuumCostActive = false;
+
+int GinFuzzySearchLimit = 0;
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 465e77c4885..c2c1318b4cb 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -10,7 +10,7 @@
* Written by Peter Eisentraut <peter_e@gmx.net>.
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/utils/misc/guc.c,v 1.317 2006/04/25 14:11:56 momjian Exp $
+ * $PostgreSQL: pgsql/src/backend/utils/misc/guc.c,v 1.318 2006/05/02 11:28:55 teodor Exp $
*
*--------------------------------------------------------------------
*/
@@ -64,7 +64,7 @@
#include "utils/memutils.h"
#include "utils/pg_locale.h"
#include "pgstat.h"
-
+#include "access/gin.h"
#ifndef PG_KRB_SRVTAB
#define PG_KRB_SRVTAB ""
@@ -1572,6 +1572,16 @@ static struct config_int ConfigureNamesInt[] =
0, 0, INT_MAX, assign_tcp_keepalives_count, show_tcp_keepalives_count
},
+ {
+ {"gin_fuzzy_search_limit", PGC_USERSET, UNGROUPED,
+ gettext_noop("Sets the maximum allowed result for exact search by gin."),
+ NULL,
+ 0
+ },
+ &GinFuzzySearchLimit,
+ 0, 0, INT_MAX, NULL, NULL
+ },
+
/* End-of-list marker */
{
{NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL