Diffstat (limited to 'src/backend')
-rw-r--r--  src/backend/access/Makefile               |    2
-rw-r--r--  src/backend/access/brin/Makefile          |   18
-rw-r--r--  src/backend/access/brin/README            |  189
-rw-r--r--  src/backend/access/brin/brin.c            | 1228
-rw-r--r--  src/backend/access/brin/brin_minmax.c     |  341
-rw-r--r--  src/backend/access/brin/brin_pageops.c    |  723
-rw-r--r--  src/backend/access/brin/brin_revmap.c     |  510
-rw-r--r--  src/backend/access/brin/brin_tuple.c      |  554
-rw-r--r--  src/backend/access/brin/brin_xlog.c       |  291
-rw-r--r--  src/backend/access/common/reloptions.c    |    7
-rw-r--r--  src/backend/access/heap/heapam.c          |   22
-rw-r--r--  src/backend/access/rmgrdesc/Makefile      |    3
-rw-r--r--  src/backend/access/rmgrdesc/brindesc.c    |  112
-rw-r--r--  src/backend/access/transam/rmgr.c         |    1
-rw-r--r--  src/backend/catalog/index.c               |   24
-rw-r--r--  src/backend/replication/logical/decode.c  |    1
-rw-r--r--  src/backend/storage/page/bufpage.c        |  179
-rw-r--r--  src/backend/utils/adt/selfuncs.c          |   74
18 files changed, 4270 insertions, 9 deletions
diff --git a/src/backend/access/Makefile b/src/backend/access/Makefile
index c32088f81df..21721b48f04 100644
--- a/src/backend/access/Makefile
+++ b/src/backend/access/Makefile
@@ -8,6 +8,6 @@ subdir = src/backend/access
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
-SUBDIRS = common gin gist hash heap index nbtree rmgrdesc spgist transam
+SUBDIRS = brin common gin gist hash heap index nbtree rmgrdesc spgist transam
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/brin/Makefile b/src/backend/access/brin/Makefile
new file mode 100644
index 00000000000..ac44fcdee39
--- /dev/null
+++ b/src/backend/access/brin/Makefile
@@ -0,0 +1,18 @@
+#-------------------------------------------------------------------------
+#
+# Makefile--
+# Makefile for access/brin
+#
+# IDENTIFICATION
+# src/backend/access/brin/Makefile
+#
+#-------------------------------------------------------------------------
+
+subdir = src/backend/access/brin
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+
+OBJS = brin.o brin_pageops.o brin_revmap.o brin_tuple.o brin_xlog.o \
+ brin_minmax.o
+
+include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/brin/README b/src/backend/access/brin/README
new file mode 100644
index 00000000000..2619be8db56
--- /dev/null
+++ b/src/backend/access/brin/README
@@ -0,0 +1,189 @@
+Block Range Indexes (BRIN)
+==========================
+
+BRIN indexes are designed to enable very fast scanning of extremely large
+tables.
+
+The essential idea of a BRIN index is to keep track of summarizing values in
+consecutive groups of heap pages (page ranges); for example, the minimum and
+maximum values for datatypes with a btree opclass, or the bounding box for
+geometric types. These values can be used to avoid scanning such pages
+during a table scan, depending on query quals.
+
+The cost of this is having to update the stored summary values of each page
+range as tuples are inserted into them.
+
+
+Access Method Design
+--------------------
+
+Since item pointers are not stored inside indexes of this type, it is not
+possible to support the amgettuple interface. Instead, we only provide
+amgetbitmap support. The amgetbitmap routine returns a lossy TIDBitmap
+comprising all pages in those page ranges that match the query
+qualifications. The recheck step in the BitmapHeapScan node prunes tuples
+that are not visible according to the query qualifications.
+
+An operator class must have the following entries:
+
+- generic support procedures (pg_amproc), identical to all opclasses:
+ * "opcinfo" (BRIN_PROCNUM_OPCINFO) initializes a structure for index
+ creation or scanning
+ * "addValue" (BRIN_PROCNUM_ADDVALUE) takes an index tuple and a heap item,
+ and possibly changes the index tuple so that it includes the heap item
+ values
+ * "consistent" (BRIN_PROCNUM_CONSISTENT) takes an index tuple and query
+ quals, and returns whether the index tuple values match the query quals.
+ * "union" (BRIN_PROCNUM_UNION) takes two index tuples and modifies the first
+ one so that it represents the union of the two.
+Procedure numbers up to 10 are reserved for future expansion.
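+
+For illustration only, the corresponding procedure-number constants might look
+like this in brin_internal.h (that header is not part of this diff, so the
+exact values shown here are an assumption):
+
+    #define BRIN_PROCNUM_OPCINFO      1
+    #define BRIN_PROCNUM_ADDVALUE     2
+    #define BRIN_PROCNUM_CONSISTENT   3
+    #define BRIN_PROCNUM_UNION        4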
+
+Additionally, each opclass needs further support functions:
+- Minmax-style operator classes:
+ * Proc numbers 11-14 are used for the functions implementing inequality
+ operators for the type, in this order: less than, less or equal,
+ greater or equal, greater than.
+
+Opclasses using a different design will require different additional procedure
+numbers.
+
+Operator classes also need to have operator (pg_amop) entries so that the
+optimizer can choose the index when planning queries.
+- Minmax-style operator classes:
+ * The same operators as btree (<=, <, =, >=, >)
+
+Each index tuple stores some NULL bits and some opclass-specified values; the
+NULL bits are kept in a single null bitmask whose length is twice the number
+of columns.  The generic NULL bits indicate, for each column:
+ * bt_hasnulls: Whether there's any NULL value at all in the page range
+ * bt_allnulls: Whether all values are NULLs in the page range
+
+The opclass-specified values are:
+- Minmax-style operator classes
+ * minimum value across all tuples in the range
+ * maximum value across all tuples in the range
+
+Note that the addValue and Union support procedures must be careful to
+datumCopy() the values they want to store in the in-memory BRIN tuple, and
+must pfree() the old copies when replacing older ones. Since some values
+referenced from the tuple persist and others go away, there is no
+well-defined lifetime for a memory context that would make this automatic.
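+
+As one concrete instance of this pattern, the minmax addValue procedure
+(minmaxAddValue in brin_minmax.c, below) replaces a stored minimum roughly
+like this:
+
+    if (!attr->attbyval)
+        pfree(DatumGetPointer(column->bv_values[0]));
+    column->bv_values[0] = datumCopy(newval, attr->attbyval, attr->attlen);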
+
+
+The Range Map
+-------------
+
+To find the index tuple for a particular page range, we have an internal
+structure we call the range map, or "revmap" for short. This stores one TID
+per page range, which is the address of the index tuple summarizing that
+range. Since the map entries are fixed size, it is possible to compute the
+address of the range map entry for any given heap page by simple arithmetic.
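+
+As a sketch of that arithmetic: assuming the revmap begins on the block just
+after the metapage, and given a constant REVMAP_PAGE_MAXITEMS (a hypothetical
+name for the number of TIDs that fit on one revmap page, defined outside this
+excerpt), the entry for a given heap block would be found with
+
+    rangeno   = heapBlk / pagesPerRange;
+    revmapBlk = 1 + (rangeno / REVMAP_PAGE_MAXITEMS);
+    revmapOff = 1 + (rangeno % REVMAP_PAGE_MAXITEMS);  /* offsets are 1-based */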
+
+When a new heap tuple is inserted in a summarized page range, we compare the
+existing index tuple with the new heap tuple. If the heap tuple is outside
+the summarization data given by the index tuple for any indexed column (or
+if the new heap tuple contains null values but the index tuple indicates
+there are no nulls), the index is updated with the new values. In many
+cases it is possible to update the index tuple in-place, but if the new
+index tuple is larger than the old one and there's not enough space in the
+page, it is necessary to create a new index tuple with the new values. The
+range map can be updated quickly to point to it; the old index tuple is
+removed.
+
+If the range map points to an invalid TID, the corresponding page range is
+considered to be not summarized. When tuples are added to unsummarized
+pages, nothing needs to happen.
+
+To scan a table following a BRIN index, we scan the range map sequentially.
+This yields index tuples in ascending page range order. Query quals are
+matched to each index tuple; if they match, each page within the page range
+is returned as part of the output TID bitmap. If there's no match, they are
+skipped.  Page ranges whose range map entries contain an invalid index TID,
+that is, unsummarized page ranges, are also returned in the TID bitmap.
+
+The revmap is stored in the first few blocks of the index main fork,
+immediately following the metapage. Whenever the revmap needs to be
+extended by another page, existing tuples in that page are moved to some
+other page.
+
+Heap tuples can be removed from anywhere without restriction. It might be
+useful to mark the corresponding index tuple somehow, if the heap tuple is
+one of the constraining values of the summary data (i.e. either min or max
+in the case of a btree-opclass-bearing datatype), so that in the future we
+are aware of the need to re-execute summarization on that range, leading to
+a possible tightening of the summary values.
+
+Summarization
+-------------
+
+At index creation time, the whole table is scanned; for each page range the
+summarizing values of each indexed column and nulls bitmap are collected and
+stored in the index. The partially-filled page range at the end of the
+table is also summarized.
+
+As new tuples get inserted at the end of the table, they may update the
+index tuple that summarizes the partial page range at the end. Eventually
+that page range is complete and new tuples belong in a new page range that
+hasn't yet been summarized. Those insertions do not create a new index
+entry; instead, the page range remains unsummarized until later.
+
+When VACUUM is run on the table, all unsummarized page ranges are
+summarized. This action can also be invoked by the user via
+brin_summarize_new_values(). Both these procedures scan all the
+unsummarized ranges, and create a summary tuple. Again, this includes the
+partially-filled page range at the end of the table.
+
+Vacuuming
+---------
+
+Since no heap TIDs are stored in a BRIN index, it's not necessary to scan the
+index when heap tuples are removed. It might be that some summary values can
+be tightened if heap tuples have been deleted; but this would represent an
+optimization opportunity only, not a correctness issue. It's simpler to
+represent this as the need to re-run summarization on the affected page range
+rather than "subtracting" values from the existing one. This is not
+currently implemented.
+
+Note that if there are no indexes on the table other than the BRIN index,
+usage of maintenance_work_mem by vacuum can be decreased significantly, because
+no detailed index scan needs to take place (and thus it's not necessary for
+vacuum to save TIDs to remove).  It's unlikely that BRIN would be the only
+index on a table, though, because primary keys can only be implemented with
+btrees, and so we don't implement this optimization.
+
+
+Optimizer
+---------
+
+The optimizer selects the index based on the operator class' pg_amop
+entries for the column.
+
+
+Future improvements
+-------------------
+
+* Different-size page ranges?
+ In the current design, each "index entry" in a BRIN index covers the same
+ number of pages. There's no hard reason for this; it might make sense to
+ allow the index to self-tune so that some index entries cover smaller page
+ ranges, if this allows the summary values to be more compact. This would incur
+ larger BRIN overhead for the index itself, but might allow better pruning of
+ page ranges during scan. In the limit of one index tuple per page, the index
+ itself would occupy too much space, even though we would be able to skip
+ reading most heap pages, because the summary values are tight; in the
+ opposite limit of a single tuple that summarizes the whole table, we wouldn't
+ be able to prune anything even though the index is very small. This can
+ probably be made to work by using the range map as an index in itself.
+
+* More compact representation for TIDBitmap?
+ TIDBitmap is the structure used to represent bitmap scans. The
+ representation of lossy page ranges is not optimal for our purposes, because
+ it uses a Bitmapset to represent pages in the range; since we're going to return
+ all pages in a large range, it might be more convenient to allow for a
+ struct that uses start and end page numbers to represent the range, instead.
+
+* Better vacuuming?
+ It might be worthwhile to pass more detailed info to BRIN indexes during
+ vacuuming about tuples that are deleted, i.e. not require the callback to
+ pass each tuple's TID. For instance we might need a callback that passes a
+ block number instead of a TID. That would help determine when to re-run
+ summarization on blocks that have seen lots of tuple deletions.
diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c
new file mode 100644
index 00000000000..76cc36c3469
--- /dev/null
+++ b/src/backend/access/brin/brin.c
@@ -0,0 +1,1228 @@
+/*
+ * brin.c
+ * Implementation of BRIN indexes for Postgres
+ *
+ * See src/backend/access/brin/README for details.
+ *
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/access/brin/brin.c
+ *
+ * TODO
+ * * ScalarArrayOpExpr (amsearcharray -> SK_SEARCHARRAY)
+ */
+#include "postgres.h"
+
+#include "access/brin.h"
+#include "access/brin_internal.h"
+#include "access/brin_page.h"
+#include "access/brin_pageops.h"
+#include "access/brin_xlog.h"
+#include "access/reloptions.h"
+#include "access/relscan.h"
+#include "access/xact.h"
+#include "access/xloginsert.h"
+#include "catalog/index.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "storage/bufmgr.h"
+#include "storage/freespace.h"
+#include "utils/memutils.h"
+#include "utils/rel.h"
+#include "utils/snapmgr.h"
+
+
+/*
+ * We use a BrinBuildState during initial construction of a BRIN index.
+ * The running state is kept in a BrinMemTuple.
+ */
+typedef struct BrinBuildState
+{
+ Relation bs_irel;
+ int bs_numtuples;
+ Buffer bs_currentInsertBuf;
+ BlockNumber bs_pagesPerRange;
+ BlockNumber bs_currRangeStart;
+ BrinRevmap *bs_rmAccess;
+ BrinDesc *bs_bdesc;
+ BrinMemTuple *bs_dtuple;
+} BrinBuildState;
+
+/*
+ * Struct used as "opaque" during index scans
+ */
+typedef struct BrinOpaque
+{
+ BlockNumber bo_pagesPerRange;
+ BrinRevmap *bo_rmAccess;
+ BrinDesc *bo_bdesc;
+} BrinOpaque;
+
+PG_FUNCTION_INFO_V1(brin_summarize_new_values);
+
+static BrinBuildState *initialize_brin_buildstate(Relation idxRel,
+ BrinRevmap *revmap, BlockNumber pagesPerRange);
+static void terminate_brin_buildstate(BrinBuildState *state);
+static void brinsummarize(Relation index, Relation heapRel,
+ double *numSummarized, double *numExisting);
+static void form_and_insert_tuple(BrinBuildState *state);
+static void union_tuples(BrinDesc *bdesc, BrinMemTuple *a,
+ BrinTuple *b);
+
+
+/*
+ * A tuple in the heap is being inserted. To keep a brin index up to date,
+ * we need to obtain the relevant index tuple and compare its stored values
+ * with those of the new tuple. If the tuple values are not consistent with
+ * the summary tuple, we need to update the index tuple.
+ *
+ * If the range is not currently summarized (i.e. the revmap returns NULL for
+ * it), there's nothing to do.
+ */
+Datum
+brininsert(PG_FUNCTION_ARGS)
+{
+ Relation idxRel = (Relation) PG_GETARG_POINTER(0);
+ Datum *values = (Datum *) PG_GETARG_POINTER(1);
+ bool *nulls = (bool *) PG_GETARG_POINTER(2);
+ ItemPointer heaptid = (ItemPointer) PG_GETARG_POINTER(3);
+
+ /* we ignore the rest of our arguments */
+ BlockNumber pagesPerRange;
+ BrinDesc *bdesc = NULL;
+ BrinRevmap *revmap;
+ Buffer buf = InvalidBuffer;
+ MemoryContext tupcxt = NULL;
+ MemoryContext oldcxt = NULL;
+
+ revmap = brinRevmapInitialize(idxRel, &pagesPerRange);
+
+ for (;;)
+ {
+ bool need_insert = false;
+ OffsetNumber off;
+ BrinTuple *brtup;
+ BrinMemTuple *dtup;
+ BlockNumber heapBlk;
+ int keyno;
+ BrinTuple *tmptup PG_USED_FOR_ASSERTS_ONLY;
+ BrinMemTuple *tmpdtup PG_USED_FOR_ASSERTS_ONLY;
+ Size tmpsiz PG_USED_FOR_ASSERTS_ONLY;
+
+ CHECK_FOR_INTERRUPTS();
+
+ heapBlk = ItemPointerGetBlockNumber(heaptid);
+ /* normalize the block number to be the first block in the range */
+ heapBlk = (heapBlk / pagesPerRange) * pagesPerRange;
+ brtup = brinGetTupleForHeapBlock(revmap, heapBlk, &buf, &off, NULL,
+ BUFFER_LOCK_SHARE);
+
+ /* if range is unsummarized, there's nothing to do */
+ if (!brtup)
+ break;
+
+ /* First time through? */
+ if (bdesc == NULL)
+ {
+ bdesc = brin_build_desc(idxRel);
+ tupcxt = AllocSetContextCreate(CurrentMemoryContext,
+ "brininsert cxt",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+ oldcxt = MemoryContextSwitchTo(tupcxt);
+ }
+
+ dtup = brin_deform_tuple(bdesc, brtup);
+
+#ifdef USE_ASSERT_CHECKING
+ {
+ /*
+ * When assertions are enabled, we use this as an opportunity to
+ * test the "union" method, which would otherwise be used very
+ * rarely: first create a placeholder tuple, and addValue the
+ * value we just got into it. Then union the existing index tuple
+ * with the updated placeholder tuple. The tuple resulting from
+ * that union should be identical to the one resulting from the
+ * regular operation (straight addValue) below.
+ *
+ * Here we create the tuple to compare with; the actual comparison
+ * is below.
+ */
+ tmptup = brin_form_placeholder_tuple(bdesc, heapBlk, &tmpsiz);
+ tmpdtup = brin_deform_tuple(bdesc, tmptup);
+ for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
+ {
+ BrinValues *bval;
+ FmgrInfo *addValue;
+
+ bval = &tmpdtup->bt_columns[keyno];
+ addValue = index_getprocinfo(idxRel, keyno + 1,
+ BRIN_PROCNUM_ADDVALUE);
+ FunctionCall4Coll(addValue,
+ idxRel->rd_indcollation[keyno],
+ PointerGetDatum(bdesc),
+ PointerGetDatum(bval),
+ values[keyno],
+ nulls[keyno]);
+ }
+
+ union_tuples(bdesc, tmpdtup, brtup);
+
+ tmpdtup->bt_placeholder = dtup->bt_placeholder;
+ tmptup = brin_form_tuple(bdesc, heapBlk, tmpdtup, &tmpsiz);
+ }
+#endif
+
+ /*
+ * Compare the key values of the new tuple to the stored index values;
+ * our deformed tuple will get updated if the new tuple doesn't fit
+ * the original range (note this means we can't break out of the loop
+ * early). Make a note of whether this happens, so that we know to
+ * insert the modified tuple later.
+ */
+ for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
+ {
+ Datum result;
+ BrinValues *bval;
+ FmgrInfo *addValue;
+
+ bval = &dtup->bt_columns[keyno];
+ addValue = index_getprocinfo(idxRel, keyno + 1,
+ BRIN_PROCNUM_ADDVALUE);
+ result = FunctionCall4Coll(addValue,
+ idxRel->rd_indcollation[keyno],
+ PointerGetDatum(bdesc),
+ PointerGetDatum(bval),
+ values[keyno],
+ nulls[keyno]);
+ /* if that returned true, we need to insert the updated tuple */
+ need_insert |= DatumGetBool(result);
+ }
+
+#ifdef USE_ASSERT_CHECKING
+ {
+ /*
+ * Now we can compare the tuple produced by the union function
+ * with the one from plain addValue.
+ */
+ BrinTuple *cmptup;
+ Size cmpsz;
+
+ cmptup = brin_form_tuple(bdesc, heapBlk, dtup, &cmpsz);
+ Assert(brin_tuples_equal(tmptup, tmpsiz, cmptup, cmpsz));
+ }
+#endif
+
+ if (!need_insert)
+ {
+ /*
+ * The tuple is consistent with the new values, so there's nothing
+ * to do.
+ */
+ LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+ }
+ else
+ {
+ Page page = BufferGetPage(buf);
+ ItemId lp = PageGetItemId(page, off);
+ Size origsz;
+ BrinTuple *origtup;
+ Size newsz;
+ BrinTuple *newtup;
+ bool samepage;
+
+ /*
+ * Make a copy of the old tuple, so that we can compare it after
+ * re-acquiring the lock.
+ */
+ origsz = ItemIdGetLength(lp);
+ origtup = brin_copy_tuple(brtup, origsz);
+
+ /*
+ * Before releasing the lock, check if we can attempt a same-page
+ * update. Another process could insert a tuple concurrently in
+ * the same page though, so downstream we must be prepared to cope
+ * if this turns out to not be possible after all.
+ */
+			newtup = brin_form_tuple(bdesc, heapBlk, dtup, &newsz);
+			samepage = brin_can_do_samepage_update(buf, origsz, newsz);
+
+			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+
+ /*
+ * Try to update the tuple. If this doesn't work for whatever
+ * reason, we need to restart from the top; the revmap might be
+ * pointing at a different tuple for this block now, so we need to
+ * recompute to ensure both our new heap tuple and the other
+ * inserter's are covered by the combined tuple. It might be that
+ * we don't need to update at all.
+ */
+ if (!brin_doupdate(idxRel, pagesPerRange, revmap, heapBlk,
+ buf, off, origtup, origsz, newtup, newsz,
+ samepage))
+ {
+ /* no luck; start over */
+ MemoryContextResetAndDeleteChildren(tupcxt);
+ continue;
+ }
+ }
+
+ /* success! */
+ break;
+ }
+
+ brinRevmapTerminate(revmap);
+ if (BufferIsValid(buf))
+ ReleaseBuffer(buf);
+ if (bdesc != NULL)
+ {
+ brin_free_desc(bdesc);
+ MemoryContextSwitchTo(oldcxt);
+ MemoryContextDelete(tupcxt);
+ }
+
+ return BoolGetDatum(false);
+}
+
+/*
+ * Initialize state for a BRIN index scan.
+ *
+ * We read the metapage here to determine the pages-per-range number that this
+ * index was built with. Note that since this cannot be changed while we're
+ * holding lock on index, it's not necessary to recompute it during brinrescan.
+ */
+Datum
+brinbeginscan(PG_FUNCTION_ARGS)
+{
+ Relation r = (Relation) PG_GETARG_POINTER(0);
+ int nkeys = PG_GETARG_INT32(1);
+ int norderbys = PG_GETARG_INT32(2);
+ IndexScanDesc scan;
+ BrinOpaque *opaque;
+
+ scan = RelationGetIndexScan(r, nkeys, norderbys);
+
+ opaque = (BrinOpaque *) palloc(sizeof(BrinOpaque));
+ opaque->bo_rmAccess = brinRevmapInitialize(r, &opaque->bo_pagesPerRange);
+ opaque->bo_bdesc = brin_build_desc(r);
+ scan->opaque = opaque;
+
+ PG_RETURN_POINTER(scan);
+}
+
+/*
+ * Execute the index scan.
+ *
+ * This works by reading index TIDs from the revmap, and obtaining the index
+ * tuples pointed to by them; the summary values in the index tuples are
+ * compared to the scan keys. We return into the TID bitmap all the pages in
+ * ranges corresponding to index tuples that match the scan keys.
+ *
+ * If a TID from the revmap is read as InvalidTID, we know that range is
+ * unsummarized. Pages in those ranges need to be returned regardless of scan
+ * keys.
+ *
+ * XXX see _bt_first on what to do about sk_subtype.
+ */
+Datum
+bringetbitmap(PG_FUNCTION_ARGS)
+{
+ IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
+ TIDBitmap *tbm = (TIDBitmap *) PG_GETARG_POINTER(1);
+ Relation idxRel = scan->indexRelation;
+ Buffer buf = InvalidBuffer;
+ BrinDesc *bdesc;
+ Oid heapOid;
+ Relation heapRel;
+ BrinOpaque *opaque;
+ BlockNumber nblocks;
+ BlockNumber heapBlk;
+ int totalpages = 0;
+ int keyno;
+ FmgrInfo *consistentFn;
+ MemoryContext oldcxt;
+ MemoryContext perRangeCxt;
+
+ opaque = (BrinOpaque *) scan->opaque;
+ bdesc = opaque->bo_bdesc;
+ pgstat_count_index_scan(idxRel);
+
+ /*
+ * We need to know the size of the table so that we know how long to
+ * iterate on the revmap.
+ */
+ heapOid = IndexGetRelation(RelationGetRelid(idxRel), false);
+ heapRel = heap_open(heapOid, AccessShareLock);
+ nblocks = RelationGetNumberOfBlocks(heapRel);
+ heap_close(heapRel, AccessShareLock);
+
+ /*
+	 * Obtain consistent functions for all indexed columns.  Maybe it'd be
+ * possible to do this lazily only the first time we see a scan key that
+ * involves each particular attribute.
+ */
+ consistentFn = palloc(sizeof(FmgrInfo) * bdesc->bd_tupdesc->natts);
+ for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
+ {
+ FmgrInfo *tmp;
+
+ tmp = index_getprocinfo(idxRel, keyno + 1, BRIN_PROCNUM_CONSISTENT);
+ fmgr_info_copy(&consistentFn[keyno], tmp, CurrentMemoryContext);
+ }
+
+ /*
+	 * Set up and use a per-range memory context, which is reset every time we
+ * loop below. This avoids having to free the tuples within the loop.
+ */
+ perRangeCxt = AllocSetContextCreate(CurrentMemoryContext,
+ "bringetbitmap cxt",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+ oldcxt = MemoryContextSwitchTo(perRangeCxt);
+
+ /*
+ * Now scan the revmap. We start by querying for heap page 0,
+ * incrementing by the number of pages per range; this gives us a full
+ * view of the table.
+ */
+ for (heapBlk = 0; heapBlk < nblocks; heapBlk += opaque->bo_pagesPerRange)
+ {
+ bool addrange;
+ BrinTuple *tup;
+ OffsetNumber off;
+ Size size;
+
+ CHECK_FOR_INTERRUPTS();
+
+ MemoryContextResetAndDeleteChildren(perRangeCxt);
+
+ tup = brinGetTupleForHeapBlock(opaque->bo_rmAccess, heapBlk, &buf,
+ &off, &size, BUFFER_LOCK_SHARE);
+ if (tup)
+ {
+ tup = brin_copy_tuple(tup, size);
+ LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+ }
+
+ /*
+ * For page ranges with no indexed tuple, we must return the whole
+ * range; otherwise, compare it to the scan keys.
+ */
+ if (tup == NULL)
+ {
+ addrange = true;
+ }
+ else
+ {
+ BrinMemTuple *dtup;
+ int keyno;
+
+ dtup = brin_deform_tuple(bdesc, tup);
+ if (dtup->bt_placeholder)
+ {
+ /*
+ * Placeholder tuples are always returned, regardless of the
+ * values stored in them.
+ */
+ addrange = true;
+ }
+ else
+ {
+ /*
+ * Compare scan keys with summary values stored for the range.
+ * If scan keys are matched, the page range must be added to
+ * the bitmap. We initially assume the range needs to be
+ * added; in particular this serves the case where there are
+ * no keys.
+ */
+ addrange = true;
+ for (keyno = 0; keyno < scan->numberOfKeys; keyno++)
+ {
+ ScanKey key = &scan->keyData[keyno];
+ AttrNumber keyattno = key->sk_attno;
+ BrinValues *bval = &dtup->bt_columns[keyattno - 1];
+ Datum add;
+
+ /*
+ * The collation of the scan key must match the collation
+ * used in the index column (but only if the search is not
+					 * an IS NULL / IS NOT NULL test).  Otherwise we shouldn't be using
+ * this index ...
+ */
+ Assert((key->sk_flags & SK_ISNULL) ||
+ (key->sk_collation ==
+ bdesc->bd_tupdesc->attrs[keyattno - 1]->attcollation));
+
+ /*
+ * Check whether the scan key is consistent with the page
+ * range values; if so, have the pages in the range added
+ * to the output bitmap.
+ *
+ * When there are multiple scan keys, failure to meet the
+ * criteria for a single one of them is enough to discard
+ * the range as a whole, so break out of the loop as soon
+ * as a false return value is obtained.
+ */
+ add = FunctionCall3Coll(&consistentFn[keyattno - 1],
+ key->sk_collation,
+ PointerGetDatum(bdesc),
+ PointerGetDatum(bval),
+ PointerGetDatum(key));
+ addrange = DatumGetBool(add);
+ if (!addrange)
+ break;
+ }
+ }
+ }
+
+ /* add the pages in the range to the output bitmap, if needed */
+ if (addrange)
+ {
+ BlockNumber pageno;
+
+ for (pageno = heapBlk;
+ pageno <= heapBlk + opaque->bo_pagesPerRange - 1;
+ pageno++)
+ {
+ MemoryContextSwitchTo(oldcxt);
+ tbm_add_page(tbm, pageno);
+ totalpages++;
+ MemoryContextSwitchTo(perRangeCxt);
+ }
+ }
+ }
+
+ MemoryContextSwitchTo(oldcxt);
+ MemoryContextDelete(perRangeCxt);
+
+ if (buf != InvalidBuffer)
+ ReleaseBuffer(buf);
+
+ /*
+ * XXX We have an approximation of the number of *pages* that our scan
+ * returns, but we don't have a precise idea of the number of heap tuples
+ * involved.
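+	 * The factor of ten below is merely a crude guess at the average number
+	 * of heap tuples per page.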
+ */
+ PG_RETURN_INT64(totalpages * 10);
+}
+
+/*
+ * Re-initialize state for a BRIN index scan
+ */
+Datum
+brinrescan(PG_FUNCTION_ARGS)
+{
+ IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
+ ScanKey scankey = (ScanKey) PG_GETARG_POINTER(1);
+
+ /* other arguments ignored */
+
+ if (scankey && scan->numberOfKeys > 0)
+ memmove(scan->keyData, scankey,
+ scan->numberOfKeys * sizeof(ScanKeyData));
+
+ PG_RETURN_VOID();
+}
+
+/*
+ * Close down a BRIN index scan
+ */
+Datum
+brinendscan(PG_FUNCTION_ARGS)
+{
+ IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
+ BrinOpaque *opaque = (BrinOpaque *) scan->opaque;
+
+ brinRevmapTerminate(opaque->bo_rmAccess);
+ brin_free_desc(opaque->bo_bdesc);
+ pfree(opaque);
+
+ PG_RETURN_VOID();
+}
+
+Datum
+brinmarkpos(PG_FUNCTION_ARGS)
+{
+ elog(ERROR, "BRIN does not support mark/restore");
+ PG_RETURN_VOID();
+}
+
+Datum
+brinrestrpos(PG_FUNCTION_ARGS)
+{
+ elog(ERROR, "BRIN does not support mark/restore");
+ PG_RETURN_VOID();
+}
+
+/*
+ * Per-heap-tuple callback for IndexBuildHeapScan.
+ *
+ * Note we don't worry about the page range at the end of the table here; it is
+ * present in the build state struct after we're called the last time, but not
+ * inserted into the index.  The caller must insert it, if appropriate.
+ */
+static void
+brinbuildCallback(Relation index,
+ HeapTuple htup,
+ Datum *values,
+ bool *isnull,
+ bool tupleIsAlive,
+ void *brstate)
+{
+ BrinBuildState *state = (BrinBuildState *) brstate;
+ BlockNumber thisblock;
+ int i;
+
+ thisblock = ItemPointerGetBlockNumber(&htup->t_self);
+
+ /*
+ * If we're in a block that belongs to a future range, summarize what we've
+ * got and start afresh. Note the scan might have skipped many pages,
+ * if they were devoid of live tuples; make sure to insert index tuples
+ * for those too.
+ */
+ while (thisblock > state->bs_currRangeStart + state->bs_pagesPerRange - 1)
+ {
+ BRIN_elog(DEBUG2, "brinbuildCallback: completed a range: %u--%u",
+ state->bs_currRangeStart,
+ state->bs_currRangeStart + state->bs_pagesPerRange);
+
+ /* create the index tuple and insert it */
+ form_and_insert_tuple(state);
+
+ /* set state to correspond to the next range */
+ state->bs_currRangeStart += state->bs_pagesPerRange;
+
+ /* re-initialize state for it */
+ brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
+ }
+
+ /* Accumulate the current tuple into the running state */
+ for (i = 0; i < state->bs_bdesc->bd_tupdesc->natts; i++)
+ {
+ FmgrInfo *addValue;
+ BrinValues *col;
+
+ col = &state->bs_dtuple->bt_columns[i];
+ addValue = index_getprocinfo(index, i + 1,
+ BRIN_PROCNUM_ADDVALUE);
+
+ /*
+ * Update dtuple state, if and as necessary.
+ */
+ FunctionCall4Coll(addValue,
+ state->bs_bdesc->bd_tupdesc->attrs[i]->attcollation,
+ PointerGetDatum(state->bs_bdesc),
+ PointerGetDatum(col),
+ values[i], isnull[i]);
+ }
+}
+
+/*
+ * brinbuild() -- build a new BRIN index.
+ */
+Datum
+brinbuild(PG_FUNCTION_ARGS)
+{
+ Relation heap = (Relation) PG_GETARG_POINTER(0);
+ Relation index = (Relation) PG_GETARG_POINTER(1);
+ IndexInfo *indexInfo = (IndexInfo *) PG_GETARG_POINTER(2);
+ IndexBuildResult *result;
+ double reltuples;
+ double idxtuples;
+ BrinRevmap *revmap;
+ BrinBuildState *state;
+ Buffer meta;
+ BlockNumber pagesPerRange;
+
+ /*
+ * We expect to be called exactly once for any index relation.
+ */
+ if (RelationGetNumberOfBlocks(index) != 0)
+ elog(ERROR, "index \"%s\" already contains data",
+ RelationGetRelationName(index));
+
+ /*
+ * Critical section not required, because on error the creation of the
+ * whole relation will be rolled back.
+ */
+
+ meta = ReadBuffer(index, P_NEW);
+ Assert(BufferGetBlockNumber(meta) == BRIN_METAPAGE_BLKNO);
+ LockBuffer(meta, BUFFER_LOCK_EXCLUSIVE);
+
+ brin_metapage_init(BufferGetPage(meta), BrinGetPagesPerRange(index),
+ BRIN_CURRENT_VERSION);
+ MarkBufferDirty(meta);
+
+ if (RelationNeedsWAL(index))
+ {
+ xl_brin_createidx xlrec;
+ XLogRecPtr recptr;
+ XLogRecData rdata;
+ Page page;
+
+ xlrec.node = index->rd_node;
+ xlrec.version = BRIN_CURRENT_VERSION;
+ xlrec.pagesPerRange = BrinGetPagesPerRange(index);
+
+ rdata.buffer = InvalidBuffer;
+ rdata.data = (char *) &xlrec;
+ rdata.len = SizeOfBrinCreateIdx;
+ rdata.next = NULL;
+
+ recptr = XLogInsert(RM_BRIN_ID, XLOG_BRIN_CREATE_INDEX, &rdata);
+
+ page = BufferGetPage(meta);
+ PageSetLSN(page, recptr);
+ }
+
+ UnlockReleaseBuffer(meta);
+
+ /*
+ * Initialize our state, including the deformed tuple state.
+ */
+ revmap = brinRevmapInitialize(index, &pagesPerRange);
+ state = initialize_brin_buildstate(index, revmap, pagesPerRange);
+
+ /*
+ * Now scan the relation. No syncscan allowed here because we want the
+ * heap blocks in physical order.
+ */
+ reltuples = IndexBuildHeapScan(heap, index, indexInfo, false,
+ brinbuildCallback, (void *) state);
+
+ /* process the final batch */
+ form_and_insert_tuple(state);
+
+ /* release resources */
+ idxtuples = state->bs_numtuples;
+ brinRevmapTerminate(state->bs_rmAccess);
+ terminate_brin_buildstate(state);
+
+ /*
+ * Return statistics
+ */
+ result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));
+
+ result->heap_tuples = reltuples;
+ result->index_tuples = idxtuples;
+
+ PG_RETURN_POINTER(result);
+}
+
+Datum
+brinbuildempty(PG_FUNCTION_ARGS)
+{
+ Relation index = (Relation) PG_GETARG_POINTER(0);
+ Buffer metabuf;
+
+ /* An empty BRIN index has a metapage only. */
+ metabuf =
+ ReadBufferExtended(index, INIT_FORKNUM, P_NEW, RBM_NORMAL, NULL);
+ LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
+
+ /* Initialize and xlog metabuffer. */
+ START_CRIT_SECTION();
+ brin_metapage_init(BufferGetPage(metabuf), BrinGetPagesPerRange(index),
+ BRIN_CURRENT_VERSION);
+ MarkBufferDirty(metabuf);
+ log_newpage_buffer(metabuf, false);
+ END_CRIT_SECTION();
+
+ UnlockReleaseBuffer(metabuf);
+
+ PG_RETURN_VOID();
+}
+
+/*
+ * brinbulkdelete
+ * Since there are no per-heap-tuple index tuples in BRIN indexes,
+ * there's not a lot we can do here.
+ *
+ * XXX we could mark item tuples as "dirty" (when a minimum or maximum heap
+ * tuple is deleted), signalling the need to re-run summarization on the
+ * affected range.  That would require an extra flag in mmtuples.
+ */
+Datum
+brinbulkdelete(PG_FUNCTION_ARGS)
+{
+ /* other arguments are not currently used */
+ IndexBulkDeleteResult *stats =
+ (IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
+
+ /* allocate stats if first time through, else re-use existing struct */
+ if (stats == NULL)
+ stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
+
+ PG_RETURN_POINTER(stats);
+}
+
+/*
+ * This routine is in charge of "vacuuming" a BRIN index: we just summarize
+ * ranges that are currently unsummarized.
+ */
+Datum
+brinvacuumcleanup(PG_FUNCTION_ARGS)
+{
+ IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
+ IndexBulkDeleteResult *stats =
+ (IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
+ Relation heapRel;
+
+ /* No-op in ANALYZE ONLY mode */
+ if (info->analyze_only)
+ PG_RETURN_POINTER(stats);
+
+ if (!stats)
+ stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
+ stats->num_pages = RelationGetNumberOfBlocks(info->index);
+ /* rest of stats is initialized by zeroing */
+
+ heapRel = heap_open(IndexGetRelation(RelationGetRelid(info->index), false),
+ AccessShareLock);
+
+ brinsummarize(info->index, heapRel,
+ &stats->num_index_tuples, &stats->num_index_tuples);
+
+ heap_close(heapRel, AccessShareLock);
+
+ PG_RETURN_POINTER(stats);
+}
+
+/*
+ * reloptions processor for BRIN indexes
+ */
+Datum
+brinoptions(PG_FUNCTION_ARGS)
+{
+ Datum reloptions = PG_GETARG_DATUM(0);
+ bool validate = PG_GETARG_BOOL(1);
+ relopt_value *options;
+ BrinOptions *rdopts;
+ int numoptions;
+ static const relopt_parse_elt tab[] = {
+ {"pages_per_range", RELOPT_TYPE_INT, offsetof(BrinOptions, pagesPerRange)}
+ };
+
+ options = parseRelOptions(reloptions, validate, RELOPT_KIND_BRIN,
+ &numoptions);
+
+ /* if none set, we're done */
+ if (numoptions == 0)
+ PG_RETURN_NULL();
+
+ rdopts = allocateReloptStruct(sizeof(BrinOptions), options, numoptions);
+
+ fillRelOptions((void *) rdopts, sizeof(BrinOptions), options, numoptions,
+ validate, tab, lengthof(tab));
+
+ pfree(options);
+
+ PG_RETURN_BYTEA_P(rdopts);
+}
+
+/*
+ * SQL-callable function to scan through an index and summarize all ranges
+ * that are not currently summarized.
+ */
+Datum
+brin_summarize_new_values(PG_FUNCTION_ARGS)
+{
+ Oid indexoid = PG_GETARG_OID(0);
+ Relation indexRel;
+ Relation heapRel;
+ double numSummarized = 0;
+
+ heapRel = heap_open(IndexGetRelation(indexoid, false),
+ ShareUpdateExclusiveLock);
+ indexRel = index_open(indexoid, ShareUpdateExclusiveLock);
+
+ brinsummarize(indexRel, heapRel, &numSummarized, NULL);
+
+ relation_close(indexRel, ShareUpdateExclusiveLock);
+ relation_close(heapRel, ShareUpdateExclusiveLock);
+
+ PG_RETURN_INT32((int32) numSummarized);
+}
+
+/*
+ * Build a BrinDesc used to create or scan a BRIN index
+ */
+BrinDesc *
+brin_build_desc(Relation rel)
+{
+ BrinOpcInfo **opcinfo;
+ BrinDesc *bdesc;
+ TupleDesc tupdesc;
+ int totalstored = 0;
+ int keyno;
+ long totalsize;
+ MemoryContext cxt;
+ MemoryContext oldcxt;
+
+ cxt = AllocSetContextCreate(CurrentMemoryContext,
+ "brin desc cxt",
+								ALLOCSET_SMALL_MINSIZE,
+								ALLOCSET_SMALL_INITSIZE,
+								ALLOCSET_SMALL_MAXSIZE);
+ oldcxt = MemoryContextSwitchTo(cxt);
+ tupdesc = RelationGetDescr(rel);
+
+ /*
+ * Obtain BrinOpcInfo for each indexed column. While at it, accumulate
+ * the number of columns stored, since the number is opclass-defined.
+ */
+ opcinfo = (BrinOpcInfo **) palloc(sizeof(BrinOpcInfo *) * tupdesc->natts);
+ for (keyno = 0; keyno < tupdesc->natts; keyno++)
+ {
+ FmgrInfo *opcInfoFn;
+
+ opcInfoFn = index_getprocinfo(rel, keyno + 1, BRIN_PROCNUM_OPCINFO);
+
+ opcinfo[keyno] = (BrinOpcInfo *)
+ DatumGetPointer(FunctionCall1(opcInfoFn,
+ tupdesc->attrs[keyno]->atttypid));
+ totalstored += opcinfo[keyno]->oi_nstored;
+ }
+
+ /* Allocate our result struct and fill it in */
+ totalsize = offsetof(BrinDesc, bd_info) +
+ sizeof(BrinOpcInfo *) * tupdesc->natts;
+
+ bdesc = palloc(totalsize);
+ bdesc->bd_context = cxt;
+ bdesc->bd_index = rel;
+ bdesc->bd_tupdesc = tupdesc;
+ bdesc->bd_disktdesc = NULL; /* generated lazily */
+ bdesc->bd_totalstored = totalstored;
+
+ for (keyno = 0; keyno < tupdesc->natts; keyno++)
+ bdesc->bd_info[keyno] = opcinfo[keyno];
+ pfree(opcinfo);
+
+ MemoryContextSwitchTo(oldcxt);
+
+ return bdesc;
+}
+
+void
+brin_free_desc(BrinDesc *bdesc)
+{
+ /* make sure the tupdesc is still valid */
+ Assert(bdesc->bd_tupdesc->tdrefcount >= 1);
+ /* no need for retail pfree */
+ MemoryContextDelete(bdesc->bd_context);
+}
+
+/*
+ * Initialize a BrinBuildState appropriate to create tuples on the given index.
+ */
+static BrinBuildState *
+initialize_brin_buildstate(Relation idxRel, BrinRevmap *revmap,
+ BlockNumber pagesPerRange)
+{
+ BrinBuildState *state;
+
+ state = palloc(sizeof(BrinBuildState));
+
+ state->bs_irel = idxRel;
+ state->bs_numtuples = 0;
+ state->bs_currentInsertBuf = InvalidBuffer;
+ state->bs_pagesPerRange = pagesPerRange;
+ state->bs_currRangeStart = 0;
+ state->bs_rmAccess = revmap;
+ state->bs_bdesc = brin_build_desc(idxRel);
+ state->bs_dtuple = brin_new_memtuple(state->bs_bdesc);
+
+ brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
+
+ return state;
+}
+
+/*
+ * Release resources associated with a BrinBuildState.
+ */
+static void
+terminate_brin_buildstate(BrinBuildState *state)
+{
+ /* release the last index buffer used */
+ if (!BufferIsInvalid(state->bs_currentInsertBuf))
+ {
+ Page page;
+
+ page = BufferGetPage(state->bs_currentInsertBuf);
+ RecordPageWithFreeSpace(state->bs_irel,
+ BufferGetBlockNumber(state->bs_currentInsertBuf),
+ PageGetFreeSpace(page));
+ ReleaseBuffer(state->bs_currentInsertBuf);
+ }
+
+ brin_free_desc(state->bs_bdesc);
+ pfree(state->bs_dtuple);
+ pfree(state);
+}
+
+/*
+ * Summarize the given page range of the given index.
+ *
+ * This routine can run in parallel with insertions into the heap. To avoid
+ * missing those values from the summary tuple, we first insert a placeholder
+ * index tuple into the index, then execute the heap scan; transactions
+ * concurrent with the scan update the placeholder tuple. After the scan, we
+ * union the placeholder tuple with the one computed by this routine. The
+ * update of the index value happens in a loop, so that if somebody updates
+ * the placeholder tuple after we read it, we detect the case and try again.
+ * This ensures that the concurrently inserted tuples are not lost.
+ */
+static void
+summarize_range(IndexInfo *indexInfo, BrinBuildState *state, Relation heapRel,
+ BlockNumber heapBlk)
+{
+ Buffer phbuf;
+ BrinTuple *phtup;
+ Size phsz;
+ OffsetNumber offset;
+
+ /*
+ * Insert the placeholder tuple
+ */
+ phbuf = InvalidBuffer;
+ phtup = brin_form_placeholder_tuple(state->bs_bdesc, heapBlk, &phsz);
+ offset = brin_doinsert(state->bs_irel, state->bs_pagesPerRange,
+ state->bs_rmAccess, &phbuf,
+ heapBlk, phtup, phsz);
+
+ /*
+ * Execute the partial heap scan covering the heap blocks in the specified
+ * page range, summarizing the heap tuples in it. This scan stops just
+ * short of brinbuildCallback creating the new index entry.
+ */
+ state->bs_currRangeStart = heapBlk;
+ IndexBuildHeapRangeScan(heapRel, state->bs_irel, indexInfo, false,
+ heapBlk, state->bs_pagesPerRange,
+ brinbuildCallback, (void *) state);
+
+ /*
+ * Now we update the values obtained by the scan with the placeholder
+ * tuple. We do this in a loop which only terminates if we're able to
+ * update the placeholder tuple successfully; if we are not, this means
+ * somebody else modified the placeholder tuple after we read it.
+ */
+ for (;;)
+ {
+ BrinTuple *newtup;
+ Size newsize;
+ bool didupdate;
+ bool samepage;
+
+ CHECK_FOR_INTERRUPTS();
+
+ /*
+		 * Form a new index tuple from the updated summary values and try to
+		 * replace the placeholder tuple with it.
+ */
+ newtup = brin_form_tuple(state->bs_bdesc,
+ heapBlk, state->bs_dtuple, &newsize);
+ samepage = brin_can_do_samepage_update(phbuf, phsz, newsize);
+ didupdate =
+ brin_doupdate(state->bs_irel, state->bs_pagesPerRange,
+ state->bs_rmAccess, heapBlk, phbuf, offset,
+ phtup, phsz, newtup, newsize, samepage);
+ brin_free_tuple(phtup);
+ brin_free_tuple(newtup);
+
+ /* If the update succeeded, we're done. */
+ if (didupdate)
+ break;
+
+ /*
+ * If the update didn't work, it might be because somebody updated the
+ * placeholder tuple concurrently. Extract the new version, union it
+ * with the values we have from the scan, and start over. (There are
+ * other reasons for the update to fail, but it's simple to treat them
+ * the same.)
+ */
+ phtup = brinGetTupleForHeapBlock(state->bs_rmAccess, heapBlk, &phbuf,
+ &offset, &phsz, BUFFER_LOCK_SHARE);
+ /* the placeholder tuple must exist */
+ if (phtup == NULL)
+ elog(ERROR, "missing placeholder tuple");
+ phtup = brin_copy_tuple(phtup, phsz);
+ LockBuffer(phbuf, BUFFER_LOCK_UNLOCK);
+
+ /* merge it into the tuple from the heap scan */
+ union_tuples(state->bs_bdesc, state->bs_dtuple, phtup);
+ }
+
+ ReleaseBuffer(phbuf);
+}
+
+/*
+ * Scan a complete BRIN index, and summarize each page range that's not already
+ * summarized. The index and heap must have been locked by caller in at
+ * least ShareUpdateExclusiveLock mode.
+ *
+ * For each new index tuple inserted, *numSummarized (if not NULL) is
+ * incremented; for each existing tuple, numExisting (if not NULL) is
+ * incremented.
+ */
+static void
+brinsummarize(Relation index, Relation heapRel, double *numSummarized,
+ double *numExisting)
+{
+ BrinRevmap *revmap;
+ BrinBuildState *state = NULL;
+ IndexInfo *indexInfo = NULL;
+ BlockNumber heapNumBlocks;
+ BlockNumber heapBlk;
+ BlockNumber pagesPerRange;
+ Buffer buf;
+
+ revmap = brinRevmapInitialize(index, &pagesPerRange);
+
+ /*
+ * Scan the revmap to find unsummarized items.
+ */
+ buf = InvalidBuffer;
+ heapNumBlocks = RelationGetNumberOfBlocks(heapRel);
+ for (heapBlk = 0; heapBlk < heapNumBlocks; heapBlk += pagesPerRange)
+ {
+ BrinTuple *tup;
+ OffsetNumber off;
+
+ CHECK_FOR_INTERRUPTS();
+
+ tup = brinGetTupleForHeapBlock(revmap, heapBlk, &buf, &off, NULL,
+ BUFFER_LOCK_SHARE);
+ if (tup == NULL)
+ {
+ /* no revmap entry for this heap range. Summarize it. */
+ if (state == NULL)
+ {
+ /* first time through */
+ Assert(!indexInfo);
+ state = initialize_brin_buildstate(index, revmap,
+ pagesPerRange);
+ indexInfo = BuildIndexInfo(index);
+
+ /*
+ * We only have ShareUpdateExclusiveLock on the table, and
+ * therefore other sessions may insert tuples into the range
+ * we're going to scan. This is okay, because we take
+ * additional precautions to avoid losing the additional
+ * tuples; see comments in summarize_range. Set the
+ * concurrent flag, which causes IndexBuildHeapRangeScan to
+ * use a snapshot other than SnapshotAny, and silences
+ * warnings emitted there.
+ */
+ indexInfo->ii_Concurrent = true;
+
+ /*
+				 * In transaction-snapshot mode, if we have already acquired
+				 * a snapshot, another transaction could insert tuples that
+				 * are invisible to that snapshot; therefore, disallow this
+				 * from running in such a transaction unless no snapshot has
+				 * been acquired yet.
+ *
+ * This code is called by VACUUM and
+ * brin_summarize_new_values. Have the error message mention
+ * the latter because VACUUM cannot run in a transaction and
+ * thus cannot cause this issue.
+ */
+ if (IsolationUsesXactSnapshot() && FirstSnapshotSet)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+ errmsg("brin_summarize_new_values() cannot run in a transaction that has already obtained a snapshot")));
+ }
+ summarize_range(indexInfo, state, heapRel, heapBlk);
+
+ /* and re-initialize state for the next range */
+ brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
+
+ if (numSummarized)
+ *numSummarized += 1.0;
+ }
+ else
+ {
+ if (numExisting)
+ *numExisting += 1.0;
+ LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+ }
+ }
+
+ if (BufferIsValid(buf))
+ ReleaseBuffer(buf);
+
+ /* free resources */
+ brinRevmapTerminate(revmap);
+ if (state)
+ terminate_brin_buildstate(state);
+}
+
+/*
+ * Given a deformed tuple in the build state, convert it into the on-disk
+ * format and insert it into the index, making the revmap point to it.
+ */
+static void
+form_and_insert_tuple(BrinBuildState *state)
+{
+ BrinTuple *tup;
+ Size size;
+
+ tup = brin_form_tuple(state->bs_bdesc, state->bs_currRangeStart,
+ state->bs_dtuple, &size);
+ brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
+ &state->bs_currentInsertBuf, state->bs_currRangeStart,
+ tup, size);
+ state->bs_numtuples++;
+
+ pfree(tup);
+}
+
+/*
+ * Given a deformed tuple "a" and an on-disk tuple "b", adjust "a" so that it
+ * is consistent with the summary values of both.
+ */
+static void
+union_tuples(BrinDesc *bdesc, BrinMemTuple *a, BrinTuple *b)
+{
+ int keyno;
+ BrinMemTuple *db;
+ MemoryContext cxt;
+ MemoryContext oldcxt;
+
+ /* Use our own memory context to avoid retail pfree */
+ cxt = AllocSetContextCreate(CurrentMemoryContext,
+ "brin union",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+ oldcxt = MemoryContextSwitchTo(cxt);
+ db = brin_deform_tuple(bdesc, b);
+ MemoryContextSwitchTo(oldcxt);
+
+ for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
+ {
+ FmgrInfo *unionFn;
+ BrinValues *col_a = &a->bt_columns[keyno];
+ BrinValues *col_b = &db->bt_columns[keyno];
+
+ unionFn = index_getprocinfo(bdesc->bd_index, keyno + 1,
+ BRIN_PROCNUM_UNION);
+ FunctionCall3Coll(unionFn,
+ bdesc->bd_index->rd_indcollation[keyno],
+ PointerGetDatum(bdesc),
+ PointerGetDatum(col_a),
+ PointerGetDatum(col_b));
+ }
+
+ MemoryContextDelete(cxt);
+}
diff --git a/src/backend/access/brin/brin_minmax.c b/src/backend/access/brin/brin_minmax.c
new file mode 100644
index 00000000000..3a2bee2649e
--- /dev/null
+++ b/src/backend/access/brin/brin_minmax.c
@@ -0,0 +1,341 @@
+/*
+ * brin_minmax.c
+ * Implementation of Min/Max opclass for BRIN
+ *
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/access/brin/brin_minmax.c
+ */
+#include "postgres.h"
+
+#include "access/genam.h"
+#include "access/brin_internal.h"
+#include "access/brin_tuple.h"
+#include "access/skey.h"
+#include "catalog/pg_type.h"
+#include "utils/datum.h"
+#include "utils/lsyscache.h"
+#include "utils/syscache.h"
+
+
+/*
+ * Procedure numbers must not collide with BRIN_PROCNUM defines in
+ * brin_internal.h. Note we only need inequality functions.
+ */
+#define MINMAX_NUM_PROCNUMS 4 /* # support procs we need */
+#define PROCNUM_LESS 11
+#define PROCNUM_LESSEQUAL 12
+#define PROCNUM_GREATEREQUAL 13
+#define PROCNUM_GREATER 14
+
+/*
+ * Subtract this from procnum to obtain index in MinmaxOpaque arrays
+ * (Must be equal to minimum of private procnums)
+ */
+#define PROCNUM_BASE 11
+
+static FmgrInfo *minmax_get_procinfo(BrinDesc *bdesc, uint16 attno,
+ uint16 procnum);
+
+PG_FUNCTION_INFO_V1(minmaxOpcInfo);
+PG_FUNCTION_INFO_V1(minmaxAddValue);
+PG_FUNCTION_INFO_V1(minmaxConsistent);
+PG_FUNCTION_INFO_V1(minmaxUnion);
+
+
+typedef struct MinmaxOpaque
+{
+ FmgrInfo operators[MINMAX_NUM_PROCNUMS];
+ bool inited[MINMAX_NUM_PROCNUMS];
+} MinmaxOpaque;
+
+Datum
+minmaxOpcInfo(PG_FUNCTION_ARGS)
+{
+ Oid typoid = PG_GETARG_OID(0);
+ BrinOpcInfo *result;
+
+ /*
+ * opaque->operators is initialized lazily, as indicated by 'inited' which
+ * is initialized to all false by palloc0.
+ */
+
+ result = palloc0(MAXALIGN(SizeofBrinOpcInfo(2)) +
+ sizeof(MinmaxOpaque));
+ result->oi_nstored = 2;
+ result->oi_opaque = (MinmaxOpaque *)
+ MAXALIGN((char *) result + SizeofBrinOpcInfo(2));
+ result->oi_typids[0] = typoid;
+ result->oi_typids[1] = typoid;
+
+ PG_RETURN_POINTER(result);
+}
+
+/*
+ * Examine the given index tuple (which contains partial status of a certain
+ * page range) by comparing it to the given value that comes from another heap
+ * tuple. If the new value is outside the min/max range specified by the
+ * existing tuple values, update the index tuple and return true. Otherwise,
+ * return false and leave the index tuple unmodified.
+ */
+Datum
+minmaxAddValue(PG_FUNCTION_ARGS)
+{
+ BrinDesc *bdesc = (BrinDesc *) PG_GETARG_POINTER(0);
+ BrinValues *column = (BrinValues *) PG_GETARG_POINTER(1);
+ Datum newval = PG_GETARG_DATUM(2);
+	bool		isnull = PG_GETARG_BOOL(3);
+ Oid colloid = PG_GET_COLLATION();
+ FmgrInfo *cmpFn;
+ Datum compar;
+ bool updated = false;
+ Form_pg_attribute attr;
+ AttrNumber attno;
+
+ /*
+ * If the new value is null, we record that we saw it if it's the first
+ * one; otherwise, there's nothing to do.
+ */
+ if (isnull)
+ {
+ if (column->bv_hasnulls)
+ PG_RETURN_BOOL(false);
+
+ column->bv_hasnulls = true;
+ PG_RETURN_BOOL(true);
+ }
+
+ attno = column->bv_attno;
+ attr = bdesc->bd_tupdesc->attrs[attno - 1];
+
+ /*
+ * If the recorded value is null, store the new value (which we know to be
+ * not null) as both minimum and maximum, and we're done.
+ */
+ if (column->bv_allnulls)
+ {
+ column->bv_values[0] = datumCopy(newval, attr->attbyval, attr->attlen);
+ column->bv_values[1] = datumCopy(newval, attr->attbyval, attr->attlen);
+ column->bv_allnulls = false;
+ PG_RETURN_BOOL(true);
+ }
+
+ /*
+ * Otherwise, need to compare the new value with the existing boundaries
+ * and update them accordingly. First check if it's less than the
+ * existing minimum.
+ */
+ cmpFn = minmax_get_procinfo(bdesc, attno, PROCNUM_LESS);
+ compar = FunctionCall2Coll(cmpFn, colloid, newval, column->bv_values[0]);
+ if (DatumGetBool(compar))
+ {
+ if (!attr->attbyval)
+ pfree(DatumGetPointer(column->bv_values[0]));
+ column->bv_values[0] = datumCopy(newval, attr->attbyval, attr->attlen);
+ updated = true;
+ }
+
+ /*
+ * And now compare it to the existing maximum.
+ */
+ cmpFn = minmax_get_procinfo(bdesc, attno, PROCNUM_GREATER);
+ compar = FunctionCall2Coll(cmpFn, colloid, newval, column->bv_values[1]);
+ if (DatumGetBool(compar))
+ {
+ if (!attr->attbyval)
+ pfree(DatumGetPointer(column->bv_values[1]));
+ column->bv_values[1] = datumCopy(newval, attr->attbyval, attr->attlen);
+ updated = true;
+ }
+
+ PG_RETURN_BOOL(updated);
+}
+
+/*
+ * Given an index tuple corresponding to a certain page range and a scan key,
+ * return whether the scan key is consistent with the index tuple's min/max
+ * values. Return true if so, false otherwise.
+ */
+Datum
+minmaxConsistent(PG_FUNCTION_ARGS)
+{
+ BrinDesc *bdesc = (BrinDesc *) PG_GETARG_POINTER(0);
+ BrinValues *column = (BrinValues *) PG_GETARG_POINTER(1);
+ ScanKey key = (ScanKey) PG_GETARG_POINTER(2);
+ Oid colloid = PG_GET_COLLATION();
+ AttrNumber attno;
+ Datum value;
+ Datum matches;
+
+ Assert(key->sk_attno == column->bv_attno);
+
+ /* handle IS NULL/IS NOT NULL tests */
+ if (key->sk_flags & SK_ISNULL)
+ {
+ if (key->sk_flags & SK_SEARCHNULL)
+ {
+ if (column->bv_allnulls || column->bv_hasnulls)
+ PG_RETURN_BOOL(true);
+ PG_RETURN_BOOL(false);
+ }
+
+ /*
+ * For IS NOT NULL, we can only skip ranges that are known to have
+ * only nulls.
+ */
+ Assert(key->sk_flags & SK_SEARCHNOTNULL);
+ PG_RETURN_BOOL(!column->bv_allnulls);
+ }
+
+	/* if the range contains only nulls, it cannot match any scan key */
+ if (column->bv_allnulls)
+ PG_RETURN_BOOL(false);
+
+ attno = key->sk_attno;
+ value = key->sk_argument;
+ switch (key->sk_strategy)
+ {
+ case BTLessStrategyNumber:
+ matches = FunctionCall2Coll(minmax_get_procinfo(bdesc, attno,
+ PROCNUM_LESS),
+ colloid, column->bv_values[0], value);
+ break;
+ case BTLessEqualStrategyNumber:
+ matches = FunctionCall2Coll(minmax_get_procinfo(bdesc, attno,
+ PROCNUM_LESSEQUAL),
+ colloid, column->bv_values[0], value);
+ break;
+ case BTEqualStrategyNumber:
+
+ /*
+ * In the equality case (WHERE col = someval), we want to return
+ * the current page range if the minimum value in the range <=
+ * scan key, and the maximum value >= scan key.
+ */
+ matches = FunctionCall2Coll(minmax_get_procinfo(bdesc, attno,
+ PROCNUM_LESSEQUAL),
+ colloid, column->bv_values[0], value);
+ if (!DatumGetBool(matches))
+ break;
+ /* max() >= scankey */
+ matches = FunctionCall2Coll(minmax_get_procinfo(bdesc, attno,
+ PROCNUM_GREATEREQUAL),
+ colloid, column->bv_values[1], value);
+ break;
+ case BTGreaterEqualStrategyNumber:
+ matches = FunctionCall2Coll(minmax_get_procinfo(bdesc, attno,
+ PROCNUM_GREATEREQUAL),
+ colloid, column->bv_values[1], value);
+ break;
+ case BTGreaterStrategyNumber:
+ matches = FunctionCall2Coll(minmax_get_procinfo(bdesc, attno,
+ PROCNUM_GREATER),
+ colloid, column->bv_values[1], value);
+ break;
+ default:
+ /* shouldn't happen */
+ elog(ERROR, "invalid strategy number %d", key->sk_strategy);
+ matches = 0;
+ break;
+ }
+
+ PG_RETURN_DATUM(matches);
+}
+
+/*
+ * Given two BrinValues, update the first of them as a union of the summary
+ * values contained in both. The second one is untouched.
+ */
+Datum
+minmaxUnion(PG_FUNCTION_ARGS)
+{
+ BrinDesc *bdesc = (BrinDesc *) PG_GETARG_POINTER(0);
+ BrinValues *col_a = (BrinValues *) PG_GETARG_POINTER(1);
+ BrinValues *col_b = (BrinValues *) PG_GETARG_POINTER(2);
+ Oid colloid = PG_GET_COLLATION();
+ AttrNumber attno;
+ Form_pg_attribute attr;
+ bool needsadj;
+
+ Assert(col_a->bv_attno == col_b->bv_attno);
+
+ /* If there are no values in B, there's nothing to do */
+ if (col_b->bv_allnulls)
+ PG_RETURN_VOID();
+
+ attno = col_a->bv_attno;
+ attr = bdesc->bd_tupdesc->attrs[attno - 1];
+
+ /* Adjust "hasnulls" */
+ if (col_b->bv_hasnulls && !col_a->bv_hasnulls)
+ col_a->bv_hasnulls = true;
+
+ /*
+ * Adjust "allnulls". If B has values but A doesn't, just copy the values
+ * from B into A, and we're done. (We cannot run the operators in this
+ * case, because values in A might contain garbage.)
+ */
+ if (!col_b->bv_allnulls && col_a->bv_allnulls)
+ {
+ col_a->bv_allnulls = false;
+ col_a->bv_values[0] = datumCopy(col_b->bv_values[0],
+ attr->attbyval, attr->attlen);
+ col_a->bv_values[1] = datumCopy(col_b->bv_values[1],
+ attr->attbyval, attr->attlen);
+ PG_RETURN_VOID();
+ }
+
+ /* Adjust minimum, if B's min is less than A's min */
+ needsadj = FunctionCall2Coll(minmax_get_procinfo(bdesc, attno,
+ PROCNUM_LESS),
+ colloid, col_b->bv_values[0], col_a->bv_values[0]);
+ if (needsadj)
+ {
+ if (!attr->attbyval)
+ pfree(DatumGetPointer(col_a->bv_values[0]));
+ col_a->bv_values[0] = datumCopy(col_b->bv_values[0],
+ attr->attbyval, attr->attlen);
+ }
+
+ /* Adjust maximum, if B's max is greater than A's max */
+ needsadj = FunctionCall2Coll(minmax_get_procinfo(bdesc, attno,
+ PROCNUM_GREATER),
+ colloid, col_b->bv_values[1], col_a->bv_values[1]);
+ if (needsadj)
+ {
+ if (!attr->attbyval)
+ pfree(DatumGetPointer(col_a->bv_values[1]));
+ col_a->bv_values[1] = datumCopy(col_b->bv_values[1],
+ attr->attbyval, attr->attlen);
+ }
+
+ PG_RETURN_VOID();
+}
+
+/*
+ * Return the procedure corresponding to the given function support number.
+ */
+static FmgrInfo *
+minmax_get_procinfo(BrinDesc *bdesc, uint16 attno, uint16 procnum)
+{
+ MinmaxOpaque *opaque;
+ uint16 basenum = procnum - PROCNUM_BASE;
+
+ opaque = (MinmaxOpaque *) bdesc->bd_info[attno - 1]->oi_opaque;
+
+ /*
+ * We cache these in the opaque struct, to avoid repetitive syscache
+ * lookups.
+ */
+ if (!opaque->inited[basenum])
+ {
+ fmgr_info_copy(&opaque->operators[basenum],
+ index_getprocinfo(bdesc->bd_index, attno, procnum),
+ bdesc->bd_context);
+ opaque->inited[basenum] = true;
+ }
+
+ return &opaque->operators[basenum];
+}
diff --git a/src/backend/access/brin/brin_pageops.c b/src/backend/access/brin/brin_pageops.c
new file mode 100644
index 00000000000..c34b86c94c7
--- /dev/null
+++ b/src/backend/access/brin/brin_pageops.c
@@ -0,0 +1,723 @@
+/*
+ * brin_pageops.c
+ * Page-handling routines for BRIN indexes
+ *
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/access/brin/brin_pageops.c
+ */
+#include "postgres.h"
+
+#include "access/brin_pageops.h"
+#include "access/brin_page.h"
+#include "access/brin_revmap.h"
+#include "access/brin_xlog.h"
+#include "access/xloginsert.h"
+#include "miscadmin.h"
+#include "storage/bufmgr.h"
+#include "storage/freespace.h"
+#include "storage/lmgr.h"
+#include "storage/smgr.h"
+#include "utils/rel.h"
+
+
+static Buffer brin_getinsertbuffer(Relation irel, Buffer oldbuf, Size itemsz,
+ bool *was_extended);
+static Size br_page_get_freespace(Page page);
+
+
+/*
+ * Update tuple origtup (size origsz), located in offset oldoff of buffer
+ * oldbuf, to newtup (size newsz) as summary tuple for the page range starting
+ * at heapBlk. oldbuf must not be locked on entry, and is not locked at exit.
+ *
+ * If samepage is true, attempt to put the new tuple in the same page, but if
+ * there's no room, use some other one.
+ *
+ * If the update is successful, return true; the revmap is updated to point to
+ * the new tuple. If the update is not done for whatever reason, return false.
+ * Caller may retry the update if this happens.
+ */
+bool
+brin_doupdate(Relation idxrel, BlockNumber pagesPerRange,
+ BrinRevmap *revmap, BlockNumber heapBlk,
+ Buffer oldbuf, OffsetNumber oldoff,
+ const BrinTuple *origtup, Size origsz,
+ const BrinTuple *newtup, Size newsz,
+ bool samepage)
+{
+ Page oldpage;
+ ItemId oldlp;
+ BrinTuple *oldtup;
+ Size oldsz;
+ Buffer newbuf;
+ BrinSpecialSpace *special;
+ bool extended = false;
+
+ newsz = MAXALIGN(newsz);
+
+ /* make sure the revmap is long enough to contain the entry we need */
+ brinRevmapExtend(revmap, heapBlk);
+
+ if (!samepage)
+ {
+ /* need a page on which to put the item */
+ newbuf = brin_getinsertbuffer(idxrel, oldbuf, newsz, &extended);
+ /* XXX delay vacuuming FSM until locks are released? */
+ if (extended)
+ FreeSpaceMapVacuum(idxrel);
+ if (!BufferIsValid(newbuf))
+ return false;
+
+ /*
+ * Note: it's possible (though unlikely) that the returned newbuf is
+ * the same as oldbuf, if brin_getinsertbuffer determined that the old
+ * buffer does in fact have enough space.
+ */
+ if (newbuf == oldbuf)
+ newbuf = InvalidBuffer;
+ }
+ else
+ {
+ LockBuffer(oldbuf, BUFFER_LOCK_EXCLUSIVE);
+ newbuf = InvalidBuffer;
+ }
+ oldpage = BufferGetPage(oldbuf);
+ oldlp = PageGetItemId(oldpage, oldoff);
+
+ /*
+ * Check that the old tuple wasn't updated concurrently: it might have
+ * moved someplace else entirely ...
+ */
+ if (!ItemIdIsNormal(oldlp))
+ {
+ LockBuffer(oldbuf, BUFFER_LOCK_UNLOCK);
+ if (BufferIsValid(newbuf))
+ UnlockReleaseBuffer(newbuf);
+ return false;
+ }
+
+ oldsz = ItemIdGetLength(oldlp);
+ oldtup = (BrinTuple *) PageGetItem(oldpage, oldlp);
+
+ /*
+ * ... or it might have been updated in place to different contents.
+ */
+ if (!brin_tuples_equal(oldtup, oldsz, origtup, origsz))
+ {
+ LockBuffer(oldbuf, BUFFER_LOCK_UNLOCK);
+ if (BufferIsValid(newbuf))
+ UnlockReleaseBuffer(newbuf);
+ return false;
+ }
+
+ special = (BrinSpecialSpace *) PageGetSpecialPointer(oldpage);
+
+ /*
+ * Great, the old tuple is intact. We can proceed with the update.
+ *
+ * If there's enough room in the old page for the new tuple, replace it.
+ *
+ * Note that there might now be enough space on the page even though the
+ * caller told us there isn't, if a concurrent update moved another tuple
+ * elsewhere or replaced a tuple with a smaller one.
+ */
+ if (((special->flags & BRIN_EVACUATE_PAGE) == 0) &&
+ brin_can_do_samepage_update(oldbuf, origsz, newsz))
+ {
+ if (BufferIsValid(newbuf))
+ UnlockReleaseBuffer(newbuf);
+
+ START_CRIT_SECTION();
+ PageIndexDeleteNoCompact(oldpage, &oldoff, 1);
+ if (PageAddItem(oldpage, (Item) newtup, newsz, oldoff, true,
+ false) == InvalidOffsetNumber)
+ elog(ERROR, "failed to add BRIN tuple");
+ MarkBufferDirty(oldbuf);
+
+ /* XLOG stuff */
+ if (RelationNeedsWAL(idxrel))
+ {
+ BlockNumber blk = BufferGetBlockNumber(oldbuf);
+ xl_brin_samepage_update xlrec;
+ XLogRecPtr recptr;
+ XLogRecData rdata[2];
+ uint8 info = XLOG_BRIN_SAMEPAGE_UPDATE;
+
+ xlrec.node = idxrel->rd_node;
+ ItemPointerSetBlockNumber(&xlrec.tid, blk);
+ ItemPointerSetOffsetNumber(&xlrec.tid, oldoff);
+ rdata[0].data = (char *) &xlrec;
+ rdata[0].len = SizeOfBrinSamepageUpdate;
+ rdata[0].buffer = InvalidBuffer;
+ rdata[0].next = &(rdata[1]);
+
+ rdata[1].data = (char *) newtup;
+ rdata[1].len = newsz;
+ rdata[1].buffer = oldbuf;
+ rdata[1].buffer_std = true;
+ rdata[1].next = NULL;
+
+ recptr = XLogInsert(RM_BRIN_ID, info, rdata);
+
+ PageSetLSN(oldpage, recptr);
+ }
+
+ END_CRIT_SECTION();
+
+ LockBuffer(oldbuf, BUFFER_LOCK_UNLOCK);
+ return true;
+ }
+ else if (newbuf == InvalidBuffer)
+ {
+ /*
+ * Not enough space, but caller said that there was. Tell them to
+ * start over.
+ */
+ LockBuffer(oldbuf, BUFFER_LOCK_UNLOCK);
+ return false;
+ }
+ else
+ {
+ /*
+ * Not enough free space on the oldpage. Put the new tuple on the new
+ * page, and update the revmap.
+ */
+ Page newpage = BufferGetPage(newbuf);
+ Buffer revmapbuf;
+ ItemPointerData newtid;
+ OffsetNumber newoff;
+
+ revmapbuf = brinLockRevmapPageForUpdate(revmap, heapBlk);
+
+ START_CRIT_SECTION();
+
+ PageIndexDeleteNoCompact(oldpage, &oldoff, 1);
+ newoff = PageAddItem(newpage, (Item) newtup, newsz,
+ InvalidOffsetNumber, false, false);
+ if (newoff == InvalidOffsetNumber)
+ elog(ERROR, "failed to add BRIN tuple to new page");
+ MarkBufferDirty(oldbuf);
+ MarkBufferDirty(newbuf);
+
+ ItemPointerSet(&newtid, BufferGetBlockNumber(newbuf), newoff);
+ brinSetHeapBlockItemptr(revmapbuf, pagesPerRange, heapBlk, newtid);
+ MarkBufferDirty(revmapbuf);
+
+ /* XLOG stuff */
+ if (RelationNeedsWAL(idxrel))
+ {
+ xl_brin_update xlrec;
+ XLogRecPtr recptr;
+ XLogRecData rdata[4];
+ uint8 info;
+
+ info = XLOG_BRIN_UPDATE | (extended ? XLOG_BRIN_INIT_PAGE : 0);
+
+ xlrec.new.node = idxrel->rd_node;
+ ItemPointerSet(&xlrec.new.tid, BufferGetBlockNumber(newbuf), newoff);
+ xlrec.new.heapBlk = heapBlk;
+ xlrec.new.tuplen = newsz;
+ xlrec.new.revmapBlk = BufferGetBlockNumber(revmapbuf);
+ xlrec.new.pagesPerRange = pagesPerRange;
+ ItemPointerSet(&xlrec.oldtid, BufferGetBlockNumber(oldbuf), oldoff);
+
+ rdata[0].data = (char *) &xlrec;
+ rdata[0].len = SizeOfBrinUpdate;
+ rdata[0].buffer = InvalidBuffer;
+ rdata[0].next = &(rdata[1]);
+
+ rdata[1].data = (char *) newtup;
+ rdata[1].len = newsz;
+ rdata[1].buffer = extended ? InvalidBuffer : newbuf;
+ rdata[1].buffer_std = true;
+ rdata[1].next = &(rdata[2]);
+
+ rdata[2].data = (char *) NULL;
+ rdata[2].len = 0;
+ rdata[2].buffer = revmapbuf;
+ rdata[2].buffer_std = true;
+ rdata[2].next = &(rdata[3]);
+
+ rdata[3].data = (char *) NULL;
+ rdata[3].len = 0;
+ rdata[3].buffer = oldbuf;
+ rdata[3].buffer_std = true;
+ rdata[3].next = NULL;
+
+ recptr = XLogInsert(RM_BRIN_ID, info, rdata);
+
+ PageSetLSN(oldpage, recptr);
+ PageSetLSN(newpage, recptr);
+ PageSetLSN(BufferGetPage(revmapbuf), recptr);
+ }
+
+ END_CRIT_SECTION();
+
+ LockBuffer(revmapbuf, BUFFER_LOCK_UNLOCK);
+ LockBuffer(oldbuf, BUFFER_LOCK_UNLOCK);
+ UnlockReleaseBuffer(newbuf);
+ return true;
+ }
+}
+
+/*
+ * Return whether brin_doupdate can do a samepage update.
+ */
+bool
+brin_can_do_samepage_update(Buffer buffer, Size origsz, Size newsz)
+{
+ return
+ ((newsz <= origsz) ||
+ PageGetExactFreeSpace(BufferGetPage(buffer)) >= (newsz - origsz));
+}
+
+/*
+ * Insert an index tuple into the index relation. The revmap is updated to
+ * mark the range containing the given page as pointing to the inserted entry.
+ * A WAL record is written.
+ *
+ * The buffer, if valid, is first checked for free space to insert the new
+ * entry; if there isn't enough, a new buffer is obtained and pinned. The
+ * caller must not hold a buffer lock on entry, and none is held on exit.
+ *
+ * Return value is the offset number where the tuple was inserted.
+ */
+OffsetNumber
+brin_doinsert(Relation idxrel, BlockNumber pagesPerRange,
+ BrinRevmap *revmap, Buffer *buffer, BlockNumber heapBlk,
+ BrinTuple *tup, Size itemsz)
+{
+ Page page;
+ BlockNumber blk;
+ OffsetNumber off;
+ Buffer revmapbuf;
+ ItemPointerData tid;
+ bool extended = false;
+
+ itemsz = MAXALIGN(itemsz);
+
+ /* Make sure the revmap is long enough to contain the entry we need */
+ brinRevmapExtend(revmap, heapBlk);
+
+ /*
+ * Obtain a locked buffer to insert the new tuple. Note
+ * brin_getinsertbuffer ensures there's enough space in the returned
+ * buffer.
+ */
+ if (BufferIsValid(*buffer))
+ {
+ /*
+ * It's possible that another backend (or ourselves!) extended the
+ * revmap over the page we held a pin on, so we cannot assume that
+ * it's still a regular page.
+ */
+ LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+ if (br_page_get_freespace(BufferGetPage(*buffer)) < itemsz)
+ {
+ UnlockReleaseBuffer(*buffer);
+ *buffer = InvalidBuffer;
+ }
+ }
+
+ if (!BufferIsValid(*buffer))
+ {
+ *buffer = brin_getinsertbuffer(idxrel, InvalidBuffer, itemsz, &extended);
+ Assert(BufferIsValid(*buffer));
+ Assert(br_page_get_freespace(BufferGetPage(*buffer)) >= itemsz);
+ }
+
+ /* Now obtain lock on revmap buffer */
+ revmapbuf = brinLockRevmapPageForUpdate(revmap, heapBlk);
+
+ page = BufferGetPage(*buffer);
+ blk = BufferGetBlockNumber(*buffer);
+
+ START_CRIT_SECTION();
+ off = PageAddItem(page, (Item) tup, itemsz, InvalidOffsetNumber,
+ false, false);
+ if (off == InvalidOffsetNumber)
+ elog(ERROR, "could not insert new index tuple to page");
+ MarkBufferDirty(*buffer);
+
+ BRIN_elog(DEBUG2, "inserted tuple (%u,%u) for range starting at %u",
+ blk, off, heapBlk);
+
+ ItemPointerSet(&tid, blk, off);
+ brinSetHeapBlockItemptr(revmapbuf, pagesPerRange, heapBlk, tid);
+ MarkBufferDirty(revmapbuf);
+
+ /* XLOG stuff */
+ if (RelationNeedsWAL(idxrel))
+ {
+ xl_brin_insert xlrec;
+ XLogRecPtr recptr;
+ XLogRecData rdata[3];
+ uint8 info;
+
+ info = XLOG_BRIN_INSERT | (extended ? XLOG_BRIN_INIT_PAGE : 0);
+ xlrec.node = idxrel->rd_node;
+ xlrec.heapBlk = heapBlk;
+ xlrec.pagesPerRange = pagesPerRange;
+ xlrec.revmapBlk = BufferGetBlockNumber(revmapbuf);
+ xlrec.tuplen = itemsz;
+ ItemPointerSet(&xlrec.tid, blk, off);
+
+ rdata[0].data = (char *) &xlrec;
+ rdata[0].len = SizeOfBrinInsert;
+ rdata[0].buffer = InvalidBuffer;
+ rdata[0].buffer_std = false;
+ rdata[0].next = &(rdata[1]);
+
+ rdata[1].data = (char *) tup;
+ rdata[1].len = itemsz;
+ rdata[1].buffer = extended ? InvalidBuffer : *buffer;
+ rdata[1].buffer_std = true;
+ rdata[1].next = &(rdata[2]);
+
+ rdata[2].data = (char *) NULL;
+ rdata[2].len = 0;
+ rdata[2].buffer = revmapbuf;
+ rdata[2].buffer_std = false;
+ rdata[2].next = NULL;
+
+ recptr = XLogInsert(RM_BRIN_ID, info, rdata);
+
+ PageSetLSN(page, recptr);
+ PageSetLSN(BufferGetPage(revmapbuf), recptr);
+ }
+
+ END_CRIT_SECTION();
+
+ /* Tuple is firmly on buffer; we can release our locks */
+ LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
+ LockBuffer(revmapbuf, BUFFER_LOCK_UNLOCK);
+
+ if (extended)
+ FreeSpaceMapVacuum(idxrel);
+
+ return off;
+}
+
+/*
+ * Initialize a page with the given type.
+ *
+ * Caller is responsible for marking it dirty, as appropriate.
+ */
+void
+brin_page_init(Page page, uint16 type)
+{
+ BrinSpecialSpace *special;
+
+ PageInit(page, BLCKSZ, sizeof(BrinSpecialSpace));
+
+ special = (BrinSpecialSpace *) PageGetSpecialPointer(page);
+ special->type = type;
+}
+
+/*
+ * Initialize a new BRIN index's metapage.
+ */
+void
+brin_metapage_init(Page page, BlockNumber pagesPerRange, uint16 version)
+{
+ BrinMetaPageData *metadata;
+
+ brin_page_init(page, BRIN_PAGETYPE_META);
+
+ metadata = (BrinMetaPageData *) PageGetContents(page);
+
+ metadata->brinMagic = BRIN_META_MAGIC;
+ metadata->brinVersion = version;
+ metadata->pagesPerRange = pagesPerRange;
+
+ /*
+ * Note we cheat here a little. 0 is not a valid revmap block number
+	 * (because block 0 holds the metapage), but doing this enables the first
+ * revmap page to be created when the index is.
+ */
+ metadata->lastRevmapPage = 0;
+}
+
+/*
+ * Initiate page evacuation protocol.
+ *
+ * The page must be locked in exclusive mode by the caller.
+ *
+ * If the page is not yet initialized or empty, return false without doing
+ * anything; it can be used for revmap without any further changes. If it
+ * contains tuples, mark it for evacuation and return true.
+ */
+bool
+brin_start_evacuating_page(Relation idxRel, Buffer buf)
+{
+ OffsetNumber off;
+ OffsetNumber maxoff;
+ BrinSpecialSpace *special;
+ Page page;
+
+ page = BufferGetPage(buf);
+
+ if (PageIsNew(page))
+ return false;
+
+ special = (BrinSpecialSpace *) PageGetSpecialPointer(page);
+
+ maxoff = PageGetMaxOffsetNumber(page);
+ for (off = FirstOffsetNumber; off <= maxoff; off++)
+ {
+ ItemId lp;
+
+ lp = PageGetItemId(page, off);
+ if (ItemIdIsUsed(lp))
+ {
+ /* prevent other backends from adding more stuff to this page */
+ special->flags |= BRIN_EVACUATE_PAGE;
+ MarkBufferDirtyHint(buf, true);
+
+ return true;
+ }
+ }
+ return false;
+}
+
+/*
+ * Move all tuples out of a page.
+ *
+ * The caller must hold lock on the page. The lock and pin are released.
+ */
+void
+brin_evacuate_page(Relation idxRel, BlockNumber pagesPerRange,
+ BrinRevmap *revmap, Buffer buf)
+{
+ OffsetNumber off;
+ OffsetNumber maxoff;
+ Page page;
+
+ page = BufferGetPage(buf);
+
+ Assert(((BrinSpecialSpace *)
+ PageGetSpecialPointer(page))->flags & BRIN_EVACUATE_PAGE);
+
+ maxoff = PageGetMaxOffsetNumber(page);
+ for (off = FirstOffsetNumber; off <= maxoff; off++)
+ {
+ BrinTuple *tup;
+ Size sz;
+ ItemId lp;
+
+ CHECK_FOR_INTERRUPTS();
+
+ lp = PageGetItemId(page, off);
+ if (ItemIdIsUsed(lp))
+ {
+ sz = ItemIdGetLength(lp);
+ tup = (BrinTuple *) PageGetItem(page, lp);
+ tup = brin_copy_tuple(tup, sz);
+
+ LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+
+ if (!brin_doupdate(idxRel, pagesPerRange, revmap, tup->bt_blkno,
+ buf, off, tup, sz, tup, sz, false))
+ off--; /* retry */
+
+ LockBuffer(buf, BUFFER_LOCK_SHARE);
+
+ /* It's possible that someone extended the revmap over this page */
+ if (!BRIN_IS_REGULAR_PAGE(page))
+ break;
+ }
+ }
+
+ UnlockReleaseBuffer(buf);
+}
+
+/*
+ * Return a pinned and exclusively locked buffer which can be used to insert an
+ * index item of size itemsz. If oldbuf is a valid buffer, it is also locked
+ * (in an order chosen to avoid deadlocks).
+ *
+ * If there's no existing page with enough free space to accommodate the new
+ * item, the relation is extended. If this happens, *was_extended is set to true.
+ *
+ * If we find that the old page is no longer a regular index page (because
+ * of a revmap extension), the old buffer is unlocked and we return
+ * InvalidBuffer.
+ */
+static Buffer
+brin_getinsertbuffer(Relation irel, Buffer oldbuf, Size itemsz,
+ bool *was_extended)
+{
+ BlockNumber oldblk;
+ BlockNumber newblk;
+ Page page;
+ int freespace;
+
+ if (BufferIsValid(oldbuf))
+ oldblk = BufferGetBlockNumber(oldbuf);
+ else
+ oldblk = InvalidBlockNumber;
+
+ /*
+ * Loop until we find a page with sufficient free space. By the time we
+ * return to caller out of this loop, both buffers are valid and locked;
+ * if we have to restart here, neither buffer is locked and buf is not a
+ * pinned buffer.
+ */
+ newblk = RelationGetTargetBlock(irel);
+ if (newblk == InvalidBlockNumber)
+ newblk = GetPageWithFreeSpace(irel, itemsz);
+ for (;;)
+ {
+ Buffer buf;
+ bool extensionLockHeld = false;
+ bool extended = false;
+
+ CHECK_FOR_INTERRUPTS();
+
+ if (newblk == InvalidBlockNumber)
+ {
+ /*
+ * There's not enough free space in any existing index page,
+ * according to the FSM: extend the relation to obtain a shiny new
+ * page.
+ */
+ if (!RELATION_IS_LOCAL(irel))
+ {
+ LockRelationForExtension(irel, ExclusiveLock);
+ extensionLockHeld = true;
+ }
+ buf = ReadBuffer(irel, P_NEW);
+ newblk = BufferGetBlockNumber(buf);
+ *was_extended = extended = true;
+
+ BRIN_elog(DEBUG2, "brin_getinsertbuffer: extending to page %u",
+ BufferGetBlockNumber(buf));
+ }
+ else if (newblk == oldblk)
+ {
+ /*
+			 * There's an odd corner case here: the FSM can be out of date
+			 * and hand us back the old page.
+ */
+ buf = oldbuf;
+ }
+ else
+ {
+ buf = ReadBuffer(irel, newblk);
+ }
+
+ /*
+ * We lock the old buffer first, if it's earlier than the new one; but
+ * before we do, we need to check that it hasn't been turned into a
+ * revmap page concurrently; if we detect that it happened, give up
+ * and tell caller to start over.
+ */
+ if (BufferIsValid(oldbuf) && oldblk < newblk)
+ {
+ LockBuffer(oldbuf, BUFFER_LOCK_EXCLUSIVE);
+ if (!BRIN_IS_REGULAR_PAGE(BufferGetPage(oldbuf)))
+ {
+ LockBuffer(oldbuf, BUFFER_LOCK_UNLOCK);
+ ReleaseBuffer(buf);
+ return InvalidBuffer;
+ }
+ }
+
+ LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
+
+ if (extensionLockHeld)
+ UnlockRelationForExtension(irel, ExclusiveLock);
+
+ page = BufferGetPage(buf);
+
+ if (extended)
+ brin_page_init(page, BRIN_PAGETYPE_REGULAR);
+
+ /*
+ * We have a new buffer to insert into. Check that the new page has
+ * enough free space, and return it if it does; otherwise start over.
+ * Note that we allow for the FSM to be out of date here, and in that
+ * case we update it and move on.
+ *
+ * (br_page_get_freespace also checks that the FSM didn't hand us a
+ * page that has since been repurposed for the revmap.)
+ */
+ freespace = br_page_get_freespace(page);
+ if (freespace >= itemsz)
+ {
+ RelationSetTargetBlock(irel, BufferGetBlockNumber(buf));
+
+ /*
+ * Since the target block specification can get lost on cache
+ * invalidations, make sure we update the more permanent FSM with
+ * data about it before going away.
+ */
+ if (extended)
+ RecordPageWithFreeSpace(irel, BufferGetBlockNumber(buf),
+ freespace);
+
+ /*
+ * Lock the old buffer if not locked already. Note that in this
+ * case we know for sure it's a regular page: it's later than the
+ * new page we just got, which is not a revmap page, and revmap
+ * pages are always consecutive.
+ */
+ if (BufferIsValid(oldbuf) && oldblk > newblk)
+ {
+ LockBuffer(oldbuf, BUFFER_LOCK_EXCLUSIVE);
+ Assert(BRIN_IS_REGULAR_PAGE(BufferGetPage(oldbuf)));
+ }
+
+ return buf;
+ }
+
+ /* This page is no good. */
+
+ /*
+ * If an entirely new page does not contain enough free space for the
+ * new item, then surely that item is oversized. Complain loudly; but
+ * first make sure we record the page as free, for next time.
+ */
+ if (extended)
+ {
+ RecordPageWithFreeSpace(irel, BufferGetBlockNumber(buf),
+ freespace);
+ ereport(ERROR,
+ (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+ errmsg("index row size %lu exceeds maximum %lu for index \"%s\"",
+ (unsigned long) itemsz,
+ (unsigned long) freespace,
+ RelationGetRelationName(irel))));
+ return InvalidBuffer; /* keep compiler quiet */
+ }
+
+ if (newblk != oldblk)
+ UnlockReleaseBuffer(buf);
+ if (BufferIsValid(oldbuf) && oldblk <= newblk)
+ LockBuffer(oldbuf, BUFFER_LOCK_UNLOCK);
+
+ newblk = RecordAndGetPageWithFreeSpace(irel, newblk, freespace, itemsz);
+ }
+}
+
+/*
+ * Return the amount of free space on a regular BRIN index page.
+ *
+ * If the page is not a regular page, or has been marked with the
+ * BRIN_EVACUATE_PAGE flag, returns 0.
+ */
+static Size
+br_page_get_freespace(Page page)
+{
+ BrinSpecialSpace *special;
+
+ special = (BrinSpecialSpace *) PageGetSpecialPointer(page);
+ if (!BRIN_IS_REGULAR_PAGE(page) ||
+ (special->flags & BRIN_EVACUATE_PAGE) != 0)
+ return 0;
+ else
+ return PageGetFreeSpace(page);
+}
diff --git a/src/backend/access/brin/brin_revmap.c b/src/backend/access/brin/brin_revmap.c
new file mode 100644
index 00000000000..b08a94b742d
--- /dev/null
+++ b/src/backend/access/brin/brin_revmap.c
@@ -0,0 +1,510 @@
+/*
+ * brin_revmap.c
+ * Range map for BRIN indexes
+ *
+ * The range map (revmap) is a translation structure for BRIN indexes: for each
+ * page range there is one summary tuple, and its location is tracked by the
+ * revmap. Whenever a tuple inserted into the table falls outside the
+ * previously recorded summary values, a new summary tuple is inserted into
+ * the index and the revmap is updated to point to it.
+ *
+ * The revmap is stored in the first pages of the index, immediately following
+ * the metapage. When the revmap needs to be expanded, all tuples on the
+ * regular BRIN page at that block (if any) are moved out of the way.
+ *
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/access/brin/brin_revmap.c
+ */
+#include "postgres.h"
+
+#include "access/brin_page.h"
+#include "access/brin_pageops.h"
+#include "access/brin_revmap.h"
+#include "access/brin_tuple.h"
+#include "access/brin_xlog.h"
+#include "access/rmgr.h"
+#include "access/xloginsert.h"
+#include "miscadmin.h"
+#include "storage/bufmgr.h"
+#include "storage/lmgr.h"
+#include "utils/rel.h"
+
+
+/*
+ * In revmap pages, each item stores an ItemPointerData. These defines let one
+ * find the logical revmap page number and index number of the revmap item for
+ * the given heap block number.
+ */
+#define HEAPBLK_TO_REVMAP_BLK(pagesPerRange, heapBlk) \
+ ((heapBlk / pagesPerRange) / REVMAP_PAGE_MAXITEMS)
+#define HEAPBLK_TO_REVMAP_INDEX(pagesPerRange, heapBlk) \
+ ((heapBlk / pagesPerRange) % REVMAP_PAGE_MAXITEMS)
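+
+/*
+ * Worked example (illustrative numbers only, not taken from any particular
+ * build): assuming pagesPerRange = 128 and REVMAP_PAGE_MAXITEMS = 1360,
+ * heap block 200000 belongs to range number 200000 / 128 = 1562; its revmap
+ * entry lives at item index 1562 % 1360 = 202 of logical revmap page
+ * 1562 / 1360 = 1.
+ */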
+
+
+struct BrinRevmap
+{
+ Relation rm_irel;
+ BlockNumber rm_pagesPerRange;
+ BlockNumber rm_lastRevmapPage; /* cached from the metapage */
+ Buffer rm_metaBuf;
+ Buffer rm_currBuf;
+};
+
+/* typedef appears in brin_revmap.h */
+
+
+static BlockNumber revmap_get_blkno(BrinRevmap *revmap,
+ BlockNumber heapBlk);
+static Buffer revmap_get_buffer(BrinRevmap *revmap, BlockNumber heapBlk);
+static BlockNumber revmap_extend_and_get_blkno(BrinRevmap *revmap,
+ BlockNumber heapBlk);
+static void revmap_physical_extend(BrinRevmap *revmap);
+
+/*
+ * Initialize an access object for a range map. This must be freed by
+ * brinRevmapTerminate when caller is done with it.
+ */
+BrinRevmap *
+brinRevmapInitialize(Relation idxrel, BlockNumber *pagesPerRange)
+{
+ BrinRevmap *revmap;
+ Buffer meta;
+ BrinMetaPageData *metadata;
+
+ meta = ReadBuffer(idxrel, BRIN_METAPAGE_BLKNO);
+ LockBuffer(meta, BUFFER_LOCK_SHARE);
+ metadata = (BrinMetaPageData *) PageGetContents(BufferGetPage(meta));
+
+ revmap = palloc(sizeof(BrinRevmap));
+ revmap->rm_irel = idxrel;
+ revmap->rm_pagesPerRange = metadata->pagesPerRange;
+ revmap->rm_lastRevmapPage = metadata->lastRevmapPage;
+ revmap->rm_metaBuf = meta;
+ revmap->rm_currBuf = InvalidBuffer;
+
+ *pagesPerRange = metadata->pagesPerRange;
+
+ LockBuffer(meta, BUFFER_LOCK_UNLOCK);
+
+ return revmap;
+}
+
+/*
+ * Release resources associated with a revmap access object.
+ */
+void
+brinRevmapTerminate(BrinRevmap *revmap)
+{
+ ReleaseBuffer(revmap->rm_metaBuf);
+ if (revmap->rm_currBuf != InvalidBuffer)
+ ReleaseBuffer(revmap->rm_currBuf);
+ pfree(revmap);
+}
+
+/*
+ * Extend the revmap to cover the given heap block number.
+ */
+void
+brinRevmapExtend(BrinRevmap *revmap, BlockNumber heapBlk)
+{
+ BlockNumber mapBlk;
+
+ mapBlk = revmap_extend_and_get_blkno(revmap, heapBlk);
+
+ /* Ensure the buffer we got is in the expected range */
+ Assert(mapBlk != InvalidBlockNumber &&
+ mapBlk != BRIN_METAPAGE_BLKNO &&
+ mapBlk <= revmap->rm_lastRevmapPage);
+}
+
+/*
+ * Prepare to insert an entry into the revmap; the revmap buffer in which the
+ * entry is to reside is locked and returned. Most callers should call
+ * brinRevmapExtend beforehand, as this routine does not extend the revmap if
+ * it's not long enough.
+ *
+ * The returned buffer is also recorded in the revmap struct; terminating the
+ * revmap access releases the buffer, so the caller needn't do it explicitly.
+ */
+Buffer
+brinLockRevmapPageForUpdate(BrinRevmap *revmap, BlockNumber heapBlk)
+{
+ Buffer rmBuf;
+
+ rmBuf = revmap_get_buffer(revmap, heapBlk);
+ LockBuffer(rmBuf, BUFFER_LOCK_EXCLUSIVE);
+
+ return rmBuf;
+}
+
+/*
+ * In the given revmap buffer (locked appropriately by caller), which is used
+ * in a BRIN index of pagesPerRange pages per range, set the element
+ * corresponding to heap block number heapBlk to the given TID.
+ *
+ * Once the operation is complete, the caller must update the LSN on the
+ * returned buffer.
+ *
+ * This is used both in regular operation and during WAL replay.
+ */
+void
+brinSetHeapBlockItemptr(Buffer buf, BlockNumber pagesPerRange,
+ BlockNumber heapBlk, ItemPointerData tid)
+{
+ RevmapContents *contents;
+ ItemPointerData *iptr;
+ Page page;
+
+ /* The correct page should already be pinned and locked */
+ page = BufferGetPage(buf);
+ contents = (RevmapContents *) PageGetContents(page);
+ iptr = (ItemPointerData *) contents->rm_tids;
+ iptr += HEAPBLK_TO_REVMAP_INDEX(pagesPerRange, heapBlk);
+
+ ItemPointerSet(iptr,
+ ItemPointerGetBlockNumber(&tid),
+ ItemPointerGetOffsetNumber(&tid));
+}
+
+/*
+ * Fetch the BrinTuple for a given heap block.
+ *
+ * The buffer containing the tuple is locked, and returned in *buf. As an
+ * optimization, the caller can pass a pinned buffer *buf on entry, which will
+ * avoid a pin-unpin cycle when the next tuple is on the same page as a
+ * previous one.
+ *
+ * If no tuple is found for the given heap range, returns NULL. In that case,
+ * *buf might still be updated, but it's not locked.
+ *
+ * The output tuple offset within the buffer is returned in *off, and its size
+ * is returned in *size.
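+ *
+ * Typical call pattern (a sketch only; actual callers live outside this file):
+ *
+ *		buf = InvalidBuffer;
+ *		tup = brinGetTupleForHeapBlock(revmap, heapBlk, &buf, &off, &size,
+ *									   BUFFER_LOCK_SHARE);
+ *		if (tup != NULL)
+ *		{
+ *			tup = brin_copy_tuple(tup, size);	/* private copy */
+ *			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+ *		}
+ *		if (BufferIsValid(buf))
+ *			ReleaseBuffer(buf);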
+ */
+BrinTuple *
+brinGetTupleForHeapBlock(BrinRevmap *revmap, BlockNumber heapBlk,
+ Buffer *buf, OffsetNumber *off, Size *size, int mode)
+{
+ Relation idxRel = revmap->rm_irel;
+ BlockNumber mapBlk;
+ RevmapContents *contents;
+ ItemPointerData *iptr;
+ BlockNumber blk;
+ Page page;
+ ItemId lp;
+ BrinTuple *tup;
+ ItemPointerData previptr;
+
+ /* normalize the heap block number to be the first page in the range */
+ heapBlk = (heapBlk / revmap->rm_pagesPerRange) * revmap->rm_pagesPerRange;
+
+ /* Compute the revmap page number we need */
+ mapBlk = revmap_get_blkno(revmap, heapBlk);
+ if (mapBlk == InvalidBlockNumber)
+ {
+ *off = InvalidOffsetNumber;
+ return NULL;
+ }
+
+ ItemPointerSetInvalid(&previptr);
+ for (;;)
+ {
+ CHECK_FOR_INTERRUPTS();
+
+ if (revmap->rm_currBuf == InvalidBuffer ||
+ BufferGetBlockNumber(revmap->rm_currBuf) != mapBlk)
+ {
+ if (revmap->rm_currBuf != InvalidBuffer)
+ ReleaseBuffer(revmap->rm_currBuf);
+
+ Assert(mapBlk != InvalidBlockNumber);
+ revmap->rm_currBuf = ReadBuffer(revmap->rm_irel, mapBlk);
+ }
+
+ LockBuffer(revmap->rm_currBuf, BUFFER_LOCK_SHARE);
+
+ contents = (RevmapContents *)
+ PageGetContents(BufferGetPage(revmap->rm_currBuf));
+ iptr = contents->rm_tids;
+ iptr += HEAPBLK_TO_REVMAP_INDEX(revmap->rm_pagesPerRange, heapBlk);
+
+ if (!ItemPointerIsValid(iptr))
+ {
+ LockBuffer(revmap->rm_currBuf, BUFFER_LOCK_UNLOCK);
+ return NULL;
+ }
+
+ /*
+ * Check the TID we got in a previous iteration, if any, and save the
+ * current TID we got from the revmap; if we loop, we can sanity-check
+ * that the next one we get is different. Otherwise we might be stuck
+ * looping forever if the revmap is somehow badly broken.
+ */
+ if (ItemPointerIsValid(&previptr) && ItemPointerEquals(&previptr, iptr))
+ ereport(ERROR,
+ (errcode(ERRCODE_INDEX_CORRUPTED),
+ errmsg_internal("corrupted BRIN index: inconsistent range map")));
+ previptr = *iptr;
+
+ blk = ItemPointerGetBlockNumber(iptr);
+ *off = ItemPointerGetOffsetNumber(iptr);
+
+ LockBuffer(revmap->rm_currBuf, BUFFER_LOCK_UNLOCK);
+
+ /* Ok, got a pointer to where the BrinTuple should be. Fetch it. */
+ if (!BufferIsValid(*buf) || BufferGetBlockNumber(*buf) != blk)
+ {
+ if (BufferIsValid(*buf))
+ ReleaseBuffer(*buf);
+ *buf = ReadBuffer(idxRel, blk);
+ }
+ LockBuffer(*buf, mode);
+ page = BufferGetPage(*buf);
+
+		/*
+		 * Fetch the tuple only if this is still a regular page; if the block
+		 * was turned into a revmap page, start over.
+		 */
+ if (BRIN_IS_REGULAR_PAGE(page))
+ {
+ lp = PageGetItemId(page, *off);
+ if (ItemIdIsUsed(lp))
+ {
+ tup = (BrinTuple *) PageGetItem(page, lp);
+
+ if (tup->bt_blkno == heapBlk)
+ {
+ if (size)
+ *size = ItemIdGetLength(lp);
+ /* found it! */
+ return tup;
+ }
+ }
+ }
+
+ /*
+ * No luck. Assume that the revmap was updated concurrently.
+ */
+ LockBuffer(*buf, BUFFER_LOCK_UNLOCK);
+ }
+ /* not reached, but keep compiler quiet */
+ return NULL;
+}
+
+/*
+ * Given a heap block number, find the corresponding physical revmap block
+ * number and return it. If the revmap page hasn't been allocated yet, return
+ * InvalidBlockNumber.
+ */
+static BlockNumber
+revmap_get_blkno(BrinRevmap *revmap, BlockNumber heapBlk)
+{
+ BlockNumber targetblk;
+
+ /* obtain revmap block number, skip 1 for metapage block */
+ targetblk = HEAPBLK_TO_REVMAP_BLK(revmap->rm_pagesPerRange, heapBlk) + 1;
+
+ /* Normal case: the revmap page is already allocated */
+ if (targetblk <= revmap->rm_lastRevmapPage)
+ return targetblk;
+
+ return InvalidBlockNumber;
+}
+
+/*
+ * Obtain and return a buffer containing the revmap page for the given heap
+ * page. The revmap must have been previously extended to cover that page.
+ * The returned buffer is also recorded in the revmap struct; finishing that
+ * releases the buffer, therefore the caller needn't do it explicitely.
+ */
+static Buffer
+revmap_get_buffer(BrinRevmap *revmap, BlockNumber heapBlk)
+{
+ BlockNumber mapBlk;
+
+ /* Translate the heap block number to physical index location. */
+ mapBlk = revmap_get_blkno(revmap, heapBlk);
+
+ if (mapBlk == InvalidBlockNumber)
+ elog(ERROR, "revmap does not cover heap block %u", heapBlk);
+
+ /* Ensure the buffer we got is in the expected range */
+ Assert(mapBlk != BRIN_METAPAGE_BLKNO &&
+ mapBlk <= revmap->rm_lastRevmapPage);
+
+ BRIN_elog(DEBUG2, "getting revmap page for logical page %lu (physical %u) for heap %u",
+ HEAPBLK_TO_REVMAP_BLK(revmap->rm_pagesPerRange, heapBlk),
+ mapBlk, heapBlk);
+
+ /*
+ * Obtain the buffer from which we need to read. If we already have the
+	 * correct buffer in our access struct, use that; otherwise, release the
+	 * old one (if valid) and read the one we need.
+ */
+ if (revmap->rm_currBuf == InvalidBuffer ||
+ mapBlk != BufferGetBlockNumber(revmap->rm_currBuf))
+ {
+ if (revmap->rm_currBuf != InvalidBuffer)
+ ReleaseBuffer(revmap->rm_currBuf);
+
+ revmap->rm_currBuf = ReadBuffer(revmap->rm_irel, mapBlk);
+ }
+
+ return revmap->rm_currBuf;
+}
+
+/*
+ * Given a heap block number, find the corresponding physical revmap block
+ * number and return it. If the revmap page hasn't been allocated yet, extend
+ * the revmap until it is.
+ */
+static BlockNumber
+revmap_extend_and_get_blkno(BrinRevmap *revmap, BlockNumber heapBlk)
+{
+ BlockNumber targetblk;
+
+ /* obtain revmap block number, skip 1 for metapage block */
+ targetblk = HEAPBLK_TO_REVMAP_BLK(revmap->rm_pagesPerRange, heapBlk) + 1;
+
+ /* Extend the revmap, if necessary */
+ while (targetblk > revmap->rm_lastRevmapPage)
+ {
+ CHECK_FOR_INTERRUPTS();
+ revmap_physical_extend(revmap);
+ }
+
+ return targetblk;
+}
+
+/*
+ * Try to extend the revmap by one page. This might not happen for a number of
+ * reasons; the caller is expected to retry until the desired outcome is obtained.
+ */
+static void
+revmap_physical_extend(BrinRevmap *revmap)
+{
+ Buffer buf;
+ Page page;
+ Page metapage;
+ BrinMetaPageData *metadata;
+ BlockNumber mapBlk;
+ BlockNumber nblocks;
+ Relation irel = revmap->rm_irel;
+ bool needLock = !RELATION_IS_LOCAL(irel);
+
+ /*
+ * Lock the metapage. This locks out concurrent extensions of the revmap,
+ * but note that we still need to grab the relation extension lock because
+ * another backend can extend the index with regular BRIN pages.
+ */
+ LockBuffer(revmap->rm_metaBuf, BUFFER_LOCK_EXCLUSIVE);
+ metapage = BufferGetPage(revmap->rm_metaBuf);
+ metadata = (BrinMetaPageData *) PageGetContents(metapage);
+
+ /*
+ * Check that our cached lastRevmapPage value was up-to-date; if it
+ * wasn't, update the cached copy and have caller start over.
+ */
+ if (metadata->lastRevmapPage != revmap->rm_lastRevmapPage)
+ {
+ revmap->rm_lastRevmapPage = metadata->lastRevmapPage;
+ LockBuffer(revmap->rm_metaBuf, BUFFER_LOCK_UNLOCK);
+ return;
+ }
+ mapBlk = metadata->lastRevmapPage + 1;
+
+ nblocks = RelationGetNumberOfBlocks(irel);
+ if (mapBlk < nblocks)
+ {
+ buf = ReadBuffer(irel, mapBlk);
+ LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
+ page = BufferGetPage(buf);
+ }
+ else
+ {
+ if (needLock)
+ LockRelationForExtension(irel, ExclusiveLock);
+
+ buf = ReadBuffer(irel, P_NEW);
+ if (BufferGetBlockNumber(buf) != mapBlk)
+ {
+ /*
+ * Very rare corner case: somebody extended the relation
+ * concurrently after we read its length. If this happens, give
+ * up and have caller start over. We will have to evacuate that
+ * page from under whoever is using it.
+ */
+ if (needLock)
+ UnlockRelationForExtension(irel, ExclusiveLock);
+ LockBuffer(revmap->rm_metaBuf, BUFFER_LOCK_UNLOCK);
+ return;
+ }
+ LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
+ page = BufferGetPage(buf);
+
+ if (needLock)
+ UnlockRelationForExtension(irel, ExclusiveLock);
+ }
+
+ /* Check that it's a regular block (or an empty page) */
+ if (!PageIsNew(page) && !BRIN_IS_REGULAR_PAGE(page))
+ ereport(ERROR,
+ (errcode(ERRCODE_INDEX_CORRUPTED),
+ errmsg("unexpected page type 0x%04X in BRIN index \"%s\" block %u",
+ BRIN_PAGE_TYPE(page),
+ RelationGetRelationName(irel),
+ BufferGetBlockNumber(buf))));
+
+ /* If the page is in use, evacuate it and restart */
+ if (brin_start_evacuating_page(irel, buf))
+ {
+ LockBuffer(revmap->rm_metaBuf, BUFFER_LOCK_UNLOCK);
+ brin_evacuate_page(irel, revmap->rm_pagesPerRange, revmap, buf);
+
+ /* have caller start over */
+ return;
+ }
+
+ /*
+ * Ok, we have now locked the metapage and the target block. Re-initialize
+	 * the target block as a revmap page.
+ */
+ START_CRIT_SECTION();
+
+ /* the rm_tids array is initialized to all invalid by PageInit */
+ brin_page_init(page, BRIN_PAGETYPE_REVMAP);
+ MarkBufferDirty(buf);
+
+ metadata->lastRevmapPage = mapBlk;
+ MarkBufferDirty(revmap->rm_metaBuf);
+
+ if (RelationNeedsWAL(revmap->rm_irel))
+ {
+ xl_brin_revmap_extend xlrec;
+ XLogRecPtr recptr;
+ XLogRecData rdata[2];
+
+ xlrec.node = revmap->rm_irel->rd_node;
+ xlrec.targetBlk = mapBlk;
+ rdata[0].data = (char *) &xlrec;
+ rdata[0].len = SizeOfBrinRevmapExtend;
+ rdata[0].buffer = InvalidBuffer;
+ rdata[0].buffer_std = false;
+ rdata[0].next = &(rdata[1]);
+
+ rdata[1].data = (char *) NULL;
+ rdata[1].len = 0;
+ rdata[1].buffer = revmap->rm_metaBuf;
+ rdata[1].buffer_std = false;
+ rdata[1].next = NULL;
+
+ recptr = XLogInsert(RM_BRIN_ID, XLOG_BRIN_REVMAP_EXTEND, rdata);
+ PageSetLSN(metapage, recptr);
+ PageSetLSN(page, recptr);
+ }
+
+ END_CRIT_SECTION();
+
+ LockBuffer(revmap->rm_metaBuf, BUFFER_LOCK_UNLOCK);
+
+ UnlockReleaseBuffer(buf);
+}
diff --git a/src/backend/access/brin/brin_tuple.c b/src/backend/access/brin/brin_tuple.c
new file mode 100644
index 00000000000..d895cb715cb
--- /dev/null
+++ b/src/backend/access/brin/brin_tuple.c
@@ -0,0 +1,554 @@
+/*
+ *		brin_tuple.c
+ * Method implementations for tuples in BRIN indexes.
+ *
+ * Intended usage is that code outside this file only deals with
+ * BrinMemTuples, and converts to and from the on-disk representation through
+ * functions in this file.
+ *
+ * NOTES
+ *
+ * A BRIN tuple is similar to a heap tuple, with a few key differences. The
+ * first interesting difference is that the tuple header is much simpler, only
+ * containing its total length and a small area for flags. Also, the stored
+ * data does not match the relation tuple descriptor exactly: for each
+ * attribute in the descriptor, the index tuple carries an arbitrary number
+ * of values, depending on the opclass.
+ *
+ * Also, for each column of the index relation there are two null bits: one
+ * (hasnulls) stores whether any tuple within the page range has that column
+ * set to null; the other one (allnulls) stores whether the column values are
+ * all null. If allnulls is true, then the tuple data area does not contain
+ * values for that column at all, whereas it does if only hasnulls is set.
+ * Note the size of the null bitmask may not be the same as that of the
+ * datum array.
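+ *
+ * As an illustration (hypothetical two-column index): if column 1 is all
+ * nulls within the page range and column 2 merely contains some nulls, the
+ * allnulls half of the bitmask reads (1,0), the hasnulls half reads (0,1),
+ * and the data area carries stored values for column 2 only.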
+ *
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/access/brin/brin_tuple.c
+ */
+#include "postgres.h"
+
+#include "access/htup_details.h"
+#include "access/brin_tuple.h"
+#include "access/tupdesc.h"
+#include "access/tupmacs.h"
+#include "utils/datum.h"
+#include "utils/memutils.h"
+
+
+static inline void brin_deconstruct_tuple(BrinDesc *brdesc,
+ char *tp, bits8 *nullbits, bool nulls,
+ Datum *values, bool *allnulls, bool *hasnulls);
+
+
+/*
+ * Return a tuple descriptor used for on-disk storage of BRIN tuples.
+ */
+static TupleDesc
+brtuple_disk_tupdesc(BrinDesc *brdesc)
+{
+ /* We cache these in the BrinDesc */
+ if (brdesc->bd_disktdesc == NULL)
+ {
+ int i;
+ int j;
+ AttrNumber attno = 1;
+ TupleDesc tupdesc;
+ MemoryContext oldcxt;
+
+ /* make sure it's in the bdesc's context */
+ oldcxt = MemoryContextSwitchTo(brdesc->bd_context);
+
+ tupdesc = CreateTemplateTupleDesc(brdesc->bd_totalstored, false);
+
+ for (i = 0; i < brdesc->bd_tupdesc->natts; i++)
+ {
+ for (j = 0; j < brdesc->bd_info[i]->oi_nstored; j++)
+ TupleDescInitEntry(tupdesc, attno++, NULL,
+ brdesc->bd_info[i]->oi_typids[j],
+ -1, 0);
+ }
+
+ MemoryContextSwitchTo(oldcxt);
+
+ brdesc->bd_disktdesc = tupdesc;
+ }
+
+ return brdesc->bd_disktdesc;
+}
+
+/*
+ * Generate a new on-disk tuple to be inserted in a BRIN index.
+ *
+ * See brin_form_placeholder_tuple if you touch this.
+ */
+BrinTuple *
+brin_form_tuple(BrinDesc *brdesc, BlockNumber blkno, BrinMemTuple *tuple,
+ Size *size)
+{
+ Datum *values;
+ bool *nulls;
+ bool anynulls = false;
+ BrinTuple *rettuple;
+ int keyno;
+ int idxattno;
+ uint16 phony_infomask;
+ bits8 *phony_nullbitmap;
+ Size len,
+ hoff,
+ data_len;
+
+ Assert(brdesc->bd_totalstored > 0);
+
+ values = palloc(sizeof(Datum) * brdesc->bd_totalstored);
+ nulls = palloc0(sizeof(bool) * brdesc->bd_totalstored);
+ phony_nullbitmap = palloc(sizeof(bits8) * BITMAPLEN(brdesc->bd_totalstored));
+
+ /*
+ * Set up the values/nulls arrays for heap_fill_tuple
+ */
+ idxattno = 0;
+ for (keyno = 0; keyno < brdesc->bd_tupdesc->natts; keyno++)
+ {
+ int datumno;
+
+ /*
+ * "allnulls" is set when there's no nonnull value in any row in the
+ * column; when this happens, there is no data to store. Thus set the
+		 * null bits for all data elements of this column and we're done.
+ */
+ if (tuple->bt_columns[keyno].bv_allnulls)
+ {
+ for (datumno = 0;
+ datumno < brdesc->bd_info[keyno]->oi_nstored;
+ datumno++)
+ nulls[idxattno++] = true;
+ anynulls = true;
+ continue;
+ }
+
+ /*
+ * The "hasnulls" bit is set when there are some null values in the
+ * data. We still need to store a real value, but the presence of
+ * this means we need a null bitmap.
+ */
+ if (tuple->bt_columns[keyno].bv_hasnulls)
+ anynulls = true;
+
+ for (datumno = 0;
+ datumno < brdesc->bd_info[keyno]->oi_nstored;
+ datumno++)
+ values[idxattno++] = tuple->bt_columns[keyno].bv_values[datumno];
+ }
+
+ /* compute total space needed */
+ len = SizeOfBrinTuple;
+ if (anynulls)
+ {
+ /*
+ * We need a double-length bitmap on an on-disk BRIN index tuple; the
+ * first half stores the "allnulls" bits, the second stores
+ * "hasnulls".
+ */
+ len += BITMAPLEN(brdesc->bd_tupdesc->natts * 2);
+ }
+
+ len = hoff = MAXALIGN(len);
+
+ data_len = heap_compute_data_size(brtuple_disk_tupdesc(brdesc),
+ values, nulls);
+
+ len += data_len;
+
+ rettuple = palloc0(len);
+ rettuple->bt_blkno = blkno;
+ rettuple->bt_info = hoff;
+ Assert((rettuple->bt_info & BRIN_OFFSET_MASK) == hoff);
+
+ /*
+ * The infomask and null bitmap as computed by heap_fill_tuple are useless
+ * to us. However, that function will not accept a null infomask; and we
+ * need to pass a valid null bitmap so that it will correctly skip
+ * outputting null attributes in the data area.
+ */
+ heap_fill_tuple(brtuple_disk_tupdesc(brdesc),
+ values,
+ nulls,
+ (char *) rettuple + hoff,
+ data_len,
+ &phony_infomask,
+ phony_nullbitmap);
+
+ /* done with these */
+ pfree(values);
+ pfree(nulls);
+ pfree(phony_nullbitmap);
+
+ /*
+ * Now fill in the real null bitmasks. allnulls first.
+ */
+ if (anynulls)
+ {
+ bits8 *bitP;
+ int bitmask;
+
+ rettuple->bt_info |= BRIN_NULLS_MASK;
+
+ /*
+ * Note that we reverse the sense of null bits in this module: we
+ * store a 1 for a null attribute rather than a 0. So we must reverse
+	 * the sense of the att_isnull test in brin_deconstruct_tuple as well.
+ */
+ bitP = ((bits8 *) ((char *) rettuple + SizeOfBrinTuple)) - 1;
+ bitmask = HIGHBIT;
+ for (keyno = 0; keyno < brdesc->bd_tupdesc->natts; keyno++)
+ {
+ if (bitmask != HIGHBIT)
+ bitmask <<= 1;
+ else
+ {
+ bitP += 1;
+ *bitP = 0x0;
+ bitmask = 1;
+ }
+
+ if (!tuple->bt_columns[keyno].bv_allnulls)
+ continue;
+
+ *bitP |= bitmask;
+ }
+ /* hasnulls bits follow */
+ for (keyno = 0; keyno < brdesc->bd_tupdesc->natts; keyno++)
+ {
+ if (bitmask != HIGHBIT)
+ bitmask <<= 1;
+ else
+ {
+ bitP += 1;
+ *bitP = 0x0;
+ bitmask = 1;
+ }
+
+ if (!tuple->bt_columns[keyno].bv_hasnulls)
+ continue;
+
+ *bitP |= bitmask;
+ }
+ }
+
+ if (tuple->bt_placeholder)
+ rettuple->bt_info |= BRIN_PLACEHOLDER_MASK;
+
+ *size = len;
+ return rettuple;
+}
+
+/*
+ * Generate a new on-disk tuple with no data values, marked as placeholder.
+ *
+ * This is a cut-down version of brin_form_tuple.
+ */
+BrinTuple *
+brin_form_placeholder_tuple(BrinDesc *brdesc, BlockNumber blkno, Size *size)
+{
+ Size len;
+ Size hoff;
+ BrinTuple *rettuple;
+ int keyno;
+ bits8 *bitP;
+ int bitmask;
+
+ /* compute total space needed: always add nulls */
+ len = SizeOfBrinTuple;
+ len += BITMAPLEN(brdesc->bd_tupdesc->natts * 2);
+ len = hoff = MAXALIGN(len);
+
+ rettuple = palloc0(len);
+ rettuple->bt_blkno = blkno;
+ rettuple->bt_info = hoff;
+ rettuple->bt_info |= BRIN_NULLS_MASK | BRIN_PLACEHOLDER_MASK;
+
+ bitP = ((bits8 *) ((char *) rettuple + SizeOfBrinTuple)) - 1;
+ bitmask = HIGHBIT;
+ /* set allnulls true for all attributes */
+ for (keyno = 0; keyno < brdesc->bd_tupdesc->natts; keyno++)
+ {
+ if (bitmask != HIGHBIT)
+ bitmask <<= 1;
+ else
+ {
+ bitP += 1;
+ *bitP = 0x0;
+ bitmask = 1;
+ }
+
+ *bitP |= bitmask;
+ }
+ /* no need to set hasnulls */
+
+ *size = len;
+ return rettuple;
+}
+
+/*
+ * Free a tuple created by brin_form_tuple
+ */
+void
+brin_free_tuple(BrinTuple *tuple)
+{
+ pfree(tuple);
+}
+
+/*
+ * Create a palloc'd copy of a BrinTuple.
+ */
+BrinTuple *
+brin_copy_tuple(BrinTuple *tuple, Size len)
+{
+ BrinTuple *newtup;
+
+ newtup = palloc(len);
+ memcpy(newtup, tuple, len);
+
+ return newtup;
+}
+
+/*
+ * Return whether two BrinTuples are bitwise identical.
+ */
+bool
+brin_tuples_equal(const BrinTuple *a, Size alen, const BrinTuple *b, Size blen)
+{
+ if (alen != blen)
+ return false;
+ if (memcmp(a, b, alen) != 0)
+ return false;
+ return true;
+}
+
+/*
+ * Create a new BrinMemTuple from scratch, and initialize it to an empty
+ * state.
+ *
+ * Note: we don't provide any means to free a deformed tuple, so make sure to
+ * use a temporary memory context.
+ */
+BrinMemTuple *
+brin_new_memtuple(BrinDesc *brdesc)
+{
+ BrinMemTuple *dtup;
+ char *currdatum;
+ long basesize;
+ int i;
+
+ basesize = MAXALIGN(sizeof(BrinMemTuple) +
+ sizeof(BrinValues) * brdesc->bd_tupdesc->natts);
+ dtup = palloc0(basesize + sizeof(Datum) * brdesc->bd_totalstored);
+ currdatum = (char *) dtup + basesize;
+ for (i = 0; i < brdesc->bd_tupdesc->natts; i++)
+ {
+ dtup->bt_columns[i].bv_attno = i + 1;
+ dtup->bt_columns[i].bv_allnulls = true;
+ dtup->bt_columns[i].bv_hasnulls = false;
+ dtup->bt_columns[i].bv_values = (Datum *) currdatum;
+ currdatum += sizeof(Datum) * brdesc->bd_info[i]->oi_nstored;
+ }
+
+ dtup->bt_context = AllocSetContextCreate(CurrentMemoryContext,
+ "brin dtuple",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+ return dtup;
+}
+
+/*
+ * Reset a BrinMemTuple to initial state
+ */
+void
+brin_memtuple_initialize(BrinMemTuple *dtuple, BrinDesc *brdesc)
+{
+ int i;
+
+ MemoryContextReset(dtuple->bt_context);
+ for (i = 0; i < brdesc->bd_tupdesc->natts; i++)
+ {
+ dtuple->bt_columns[i].bv_allnulls = true;
+ dtuple->bt_columns[i].bv_hasnulls = false;
+ }
+}
+
+/*
+ * Convert a BrinTuple back to a BrinMemTuple. This is the reverse of
+ * brin_form_tuple.
+ *
+ * Note we don't need the "on disk tupdesc" here; we rely on our own routine to
+ * deconstruct the tuple from the on-disk format.
+ */
+BrinMemTuple *
+brin_deform_tuple(BrinDesc *brdesc, BrinTuple *tuple)
+{
+ BrinMemTuple *dtup;
+ Datum *values;
+ bool *allnulls;
+ bool *hasnulls;
+ char *tp;
+ bits8 *nullbits;
+ int keyno;
+ int valueno;
+ MemoryContext oldcxt;
+
+ dtup = brin_new_memtuple(brdesc);
+
+ if (BrinTupleIsPlaceholder(tuple))
+ dtup->bt_placeholder = true;
+ dtup->bt_blkno = tuple->bt_blkno;
+
+ values = palloc(sizeof(Datum) * brdesc->bd_totalstored);
+ allnulls = palloc(sizeof(bool) * brdesc->bd_tupdesc->natts);
+ hasnulls = palloc(sizeof(bool) * brdesc->bd_tupdesc->natts);
+
+ tp = (char *) tuple + BrinTupleDataOffset(tuple);
+
+ if (BrinTupleHasNulls(tuple))
+ nullbits = (bits8 *) ((char *) tuple + SizeOfBrinTuple);
+ else
+ nullbits = NULL;
+ brin_deconstruct_tuple(brdesc,
+ tp, nullbits, BrinTupleHasNulls(tuple),
+ values, allnulls, hasnulls);
+
+ /*
+ * Iterate to assign each of the values to the corresponding item in the
+ * values array of each column. The copies occur in the tuple's context.
+ */
+ oldcxt = MemoryContextSwitchTo(dtup->bt_context);
+ for (valueno = 0, keyno = 0; keyno < brdesc->bd_tupdesc->natts; keyno++)
+ {
+ int i;
+
+ if (allnulls[keyno])
+ {
+ valueno += brdesc->bd_info[keyno]->oi_nstored;
+ continue;
+ }
+
+ /*
+ * We would like to skip datumCopy'ing the values datum in some cases,
+ * caller permitting ...
+ */
+ for (i = 0; i < brdesc->bd_info[keyno]->oi_nstored; i++)
+ dtup->bt_columns[keyno].bv_values[i] =
+ datumCopy(values[valueno++],
+ brdesc->bd_tupdesc->attrs[keyno]->attbyval,
+ brdesc->bd_tupdesc->attrs[keyno]->attlen);
+
+ dtup->bt_columns[keyno].bv_hasnulls = hasnulls[keyno];
+ dtup->bt_columns[keyno].bv_allnulls = false;
+ }
+
+ MemoryContextSwitchTo(oldcxt);
+
+ pfree(values);
+ pfree(allnulls);
+ pfree(hasnulls);
+
+ return dtup;
+}
+
+/*
+ * brin_deconstruct_tuple
+ * Guts of attribute extraction from an on-disk BRIN tuple.
+ *
+ * Its arguments are:
+ * brdesc BRIN descriptor for the stored tuple
+ * tp pointer to the tuple data area
+ * nullbits pointer to the tuple nulls bitmask
+ * nulls "has nulls" bit in tuple infomask
+ * values output values, array of size brdesc->bd_totalstored
+ * allnulls output "allnulls", size brdesc->bd_tupdesc->natts
+ * hasnulls output "hasnulls", size brdesc->bd_tupdesc->natts
+ *
+ * Output arrays must have been allocated by caller.
+ */
+static inline void
+brin_deconstruct_tuple(BrinDesc *brdesc,
+ char *tp, bits8 *nullbits, bool nulls,
+ Datum *values, bool *allnulls, bool *hasnulls)
+{
+ int attnum;
+ int stored;
+ TupleDesc diskdsc;
+ long off;
+
+ /*
+	 * First iterate over the natts attributes to obtain both null flags for
+	 * each one. Note that we reverse the sense of the att_isnull test,
+	 * because we store 1 for a null value (rather than 1 for a not-null
+	 * value, which is the att_isnull convention used elsewhere). See
+	 * brin_form_tuple.
+ */
+ for (attnum = 0; attnum < brdesc->bd_tupdesc->natts; attnum++)
+ {
+ /*
+ * the "all nulls" bit means that all values in the page range for
+ * this column are nulls. Therefore there are no values in the tuple
+ * data area.
+ */
+ allnulls[attnum] = nulls && !att_isnull(attnum, nullbits);
+
+ /*
+ * the "has nulls" bit means that some tuples have nulls, but others
+ * have not-null values. Therefore we know the tuple contains data
+ * for this column.
+ *
+ * The hasnulls bits follow the allnulls bits in the same bitmask.
+ */
+ hasnulls[attnum] =
+ nulls && !att_isnull(brdesc->bd_tupdesc->natts + attnum, nullbits);
+ }
+
+ /*
+ * Iterate to obtain each attribute's stored values. Note that since we
+ * may reuse attribute entries for more than one column, we cannot cache
+ * offsets here.
+ */
+ diskdsc = brtuple_disk_tupdesc(brdesc);
+ stored = 0;
+ off = 0;
+ for (attnum = 0; attnum < brdesc->bd_tupdesc->natts; attnum++)
+ {
+ int datumno;
+
+ if (allnulls[attnum])
+ {
+ stored += brdesc->bd_info[attnum]->oi_nstored;
+ continue;
+ }
+
+ for (datumno = 0;
+ datumno < brdesc->bd_info[attnum]->oi_nstored;
+ datumno++)
+ {
+ Form_pg_attribute thisatt = diskdsc->attrs[stored];
+
+ if (thisatt->attlen == -1)
+ {
+ off = att_align_pointer(off, thisatt->attalign, -1,
+ tp + off);
+ }
+ else
+ {
+ /* not varlena, so safe to use att_align_nominal */
+ off = att_align_nominal(off, thisatt->attalign);
+ }
+
+ values[stored++] = fetchatt(thisatt, tp + off);
+
+ off = att_addlength_pointer(off, thisatt->attlen, tp + off);
+ }
+ }
+}
diff --git a/src/backend/access/brin/brin_xlog.c b/src/backend/access/brin/brin_xlog.c
new file mode 100644
index 00000000000..8dc80ad1e52
--- /dev/null
+++ b/src/backend/access/brin/brin_xlog.c
@@ -0,0 +1,291 @@
+/*
+ * brin_xlog.c
+ * XLog replay routines for BRIN indexes
+ *
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/access/brin/brin_xlog.c
+ */
+#include "postgres.h"
+
+#include "access/brin_page.h"
+#include "access/brin_pageops.h"
+#include "access/brin_xlog.h"
+#include "access/xlogutils.h"
+
+
+/*
+ * xlog replay routines
+ */
+static void
+brin_xlog_createidx(XLogRecPtr lsn, XLogRecord *record)
+{
+ xl_brin_createidx *xlrec = (xl_brin_createidx *) XLogRecGetData(record);
+ Buffer buf;
+ Page page;
+
+ /* Backup blocks are not used in create_index records */
+ Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
+
+	/* create the index's metapage */
+ buf = XLogReadBuffer(xlrec->node, BRIN_METAPAGE_BLKNO, true);
+ Assert(BufferIsValid(buf));
+ page = (Page) BufferGetPage(buf);
+ brin_metapage_init(page, xlrec->pagesPerRange, xlrec->version);
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(buf);
+ UnlockReleaseBuffer(buf);
+}
+
+/*
+ * Common part of an insert or update. Inserts the new tuple and updates the
+ * revmap.
+ */
+static void
+brin_xlog_insert_update(XLogRecPtr lsn, XLogRecord *record,
+ xl_brin_insert *xlrec, BrinTuple *tuple)
+{
+ BlockNumber blkno;
+ Buffer buffer;
+ Page page;
+ XLogRedoAction action;
+
+ blkno = ItemPointerGetBlockNumber(&xlrec->tid);
+
+ /*
+ * If we inserted the first and only tuple on the page, re-initialize the
+ * page from scratch.
+ */
+ if (record->xl_info & XLOG_BRIN_INIT_PAGE)
+ {
+ XLogReadBufferForRedoExtended(lsn, record, 0,
+ xlrec->node, MAIN_FORKNUM, blkno,
+ RBM_ZERO, false, &buffer);
+ page = BufferGetPage(buffer);
+ brin_page_init(page, BRIN_PAGETYPE_REGULAR);
+ action = BLK_NEEDS_REDO;
+ }
+ else
+ {
+ action = XLogReadBufferForRedo(lsn, record, 0,
+ xlrec->node, blkno, &buffer);
+ }
+
+ /* insert the index item into the page */
+ if (action == BLK_NEEDS_REDO)
+ {
+ OffsetNumber offnum;
+
+ Assert(tuple->bt_blkno == xlrec->heapBlk);
+
+ page = (Page) BufferGetPage(buffer);
+ offnum = ItemPointerGetOffsetNumber(&(xlrec->tid));
+ if (PageGetMaxOffsetNumber(page) + 1 < offnum)
+ elog(PANIC, "brin_xlog_insert_update: invalid max offset number");
+
+ offnum = PageAddItem(page, (Item) tuple, xlrec->tuplen, offnum, true,
+ false);
+ if (offnum == InvalidOffsetNumber)
+ elog(PANIC, "brin_xlog_insert_update: failed to add tuple");
+
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(buffer);
+ }
+ if (BufferIsValid(buffer))
+ UnlockReleaseBuffer(buffer);
+
+ /* update the revmap */
+ action = XLogReadBufferForRedo(lsn, record, 1, xlrec->node,
+ xlrec->revmapBlk, &buffer);
+ if (action == BLK_NEEDS_REDO)
+ {
+ page = (Page) BufferGetPage(buffer);
+
+ brinSetHeapBlockItemptr(buffer, xlrec->pagesPerRange, xlrec->heapBlk,
+ xlrec->tid);
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(buffer);
+ }
+ if (BufferIsValid(buffer))
+ UnlockReleaseBuffer(buffer);
+
+ /* XXX no FSM updates here ... */
+}
+
+/*
+ * replay a BRIN index insertion
+ */
+static void
+brin_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
+{
+ xl_brin_insert *xlrec = (xl_brin_insert *) XLogRecGetData(record);
+ BrinTuple *newtup;
+
+ newtup = (BrinTuple *) ((char *) xlrec + SizeOfBrinInsert);
+
+ brin_xlog_insert_update(lsn, record, xlrec, newtup);
+}
+
+/*
+ * replay a BRIN index update
+ */
+static void
+brin_xlog_update(XLogRecPtr lsn, XLogRecord *record)
+{
+ xl_brin_update *xlrec = (xl_brin_update *) XLogRecGetData(record);
+ BlockNumber blkno;
+ Buffer buffer;
+ BrinTuple *newtup;
+ XLogRedoAction action;
+
+ newtup = (BrinTuple *) ((char *) xlrec + SizeOfBrinUpdate);
+
+ /* First remove the old tuple */
+ blkno = ItemPointerGetBlockNumber(&(xlrec->oldtid));
+ action = XLogReadBufferForRedo(lsn, record, 2, xlrec->new.node,
+ blkno, &buffer);
+ if (action == BLK_NEEDS_REDO)
+ {
+ Page page;
+ OffsetNumber offnum;
+
+ page = (Page) BufferGetPage(buffer);
+
+ offnum = ItemPointerGetOffsetNumber(&(xlrec->oldtid));
+ if (PageGetMaxOffsetNumber(page) + 1 < offnum)
+ elog(PANIC, "brin_xlog_update: invalid max offset number");
+
+ PageIndexDeleteNoCompact(page, &offnum, 1);
+
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(buffer);
+ }
+
+ /* Then insert the new tuple and update revmap, like in an insertion. */
+ brin_xlog_insert_update(lsn, record, &xlrec->new, newtup);
+
+ if (BufferIsValid(buffer))
+ UnlockReleaseBuffer(buffer);
+}
+
+/*
+ * Update a tuple on a single page.
+ */
+static void
+brin_xlog_samepage_update(XLogRecPtr lsn, XLogRecord *record)
+{
+ xl_brin_samepage_update *xlrec;
+ BlockNumber blkno;
+ Buffer buffer;
+ XLogRedoAction action;
+
+ xlrec = (xl_brin_samepage_update *) XLogRecGetData(record);
+ blkno = ItemPointerGetBlockNumber(&(xlrec->tid));
+ action = XLogReadBufferForRedo(lsn, record, 0, xlrec->node, blkno,
+ &buffer);
+ if (action == BLK_NEEDS_REDO)
+ {
+ int tuplen;
+ BrinTuple *mmtuple;
+ Page page;
+ OffsetNumber offnum;
+
+ tuplen = record->xl_len - SizeOfBrinSamepageUpdate;
+ mmtuple = (BrinTuple *) ((char *) xlrec + SizeOfBrinSamepageUpdate);
+
+ page = (Page) BufferGetPage(buffer);
+
+ offnum = ItemPointerGetOffsetNumber(&(xlrec->tid));
+ if (PageGetMaxOffsetNumber(page) + 1 < offnum)
+ elog(PANIC, "brin_xlog_samepage_update: invalid max offset number");
+
+ PageIndexDeleteNoCompact(page, &offnum, 1);
+ offnum = PageAddItem(page, (Item) mmtuple, tuplen, offnum, true, false);
+ if (offnum == InvalidOffsetNumber)
+ elog(PANIC, "brin_xlog_samepage_update: failed to add tuple");
+
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(buffer);
+ }
+ if (BufferIsValid(buffer))
+ UnlockReleaseBuffer(buffer);
+
+ /* XXX no FSM updates here ... */
+}
+
+/*
+ * Replay a revmap page extension
+ */
+static void
+brin_xlog_revmap_extend(XLogRecPtr lsn, XLogRecord *record)
+{
+ xl_brin_revmap_extend *xlrec;
+ Buffer metabuf;
+ Buffer buf;
+ Page page;
+ XLogRedoAction action;
+
+ xlrec = (xl_brin_revmap_extend *) XLogRecGetData(record);
+ /* Update the metapage */
+ action = XLogReadBufferForRedo(lsn, record, 0, xlrec->node,
+ BRIN_METAPAGE_BLKNO, &metabuf);
+ if (action == BLK_NEEDS_REDO)
+ {
+ Page metapg;
+ BrinMetaPageData *metadata;
+
+ metapg = BufferGetPage(metabuf);
+ metadata = (BrinMetaPageData *) PageGetContents(metapg);
+
+ Assert(metadata->lastRevmapPage == xlrec->targetBlk - 1);
+ metadata->lastRevmapPage = xlrec->targetBlk;
+
+ PageSetLSN(metapg, lsn);
+ MarkBufferDirty(metabuf);
+ }
+
+ /*
+	 * Re-init the target block as a revmap page. There's never a full-page
+ * image here.
+ */
+
+ buf = XLogReadBuffer(xlrec->node, xlrec->targetBlk, true);
+ page = (Page) BufferGetPage(buf);
+ brin_page_init(page, BRIN_PAGETYPE_REVMAP);
+
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(buf);
+
+ UnlockReleaseBuffer(buf);
+ if (BufferIsValid(metabuf))
+ UnlockReleaseBuffer(metabuf);
+}
+
+void
+brin_redo(XLogRecPtr lsn, XLogRecord *record)
+{
+ uint8 info = record->xl_info & ~XLR_INFO_MASK;
+
+ switch (info & XLOG_BRIN_OPMASK)
+ {
+ case XLOG_BRIN_CREATE_INDEX:
+ brin_xlog_createidx(lsn, record);
+ break;
+ case XLOG_BRIN_INSERT:
+ brin_xlog_insert(lsn, record);
+ break;
+ case XLOG_BRIN_UPDATE:
+ brin_xlog_update(lsn, record);
+ break;
+ case XLOG_BRIN_SAMEPAGE_UPDATE:
+ brin_xlog_samepage_update(lsn, record);
+ break;
+ case XLOG_BRIN_REVMAP_EXTEND:
+ brin_xlog_revmap_extend(lsn, record);
+ break;
+ default:
+ elog(PANIC, "brin_redo: unknown op code %u", info);
+ }
+}
diff --git a/src/backend/access/common/reloptions.c b/src/backend/access/common/reloptions.c
index e0b81b9eb51..c55a7758273 100644
--- a/src/backend/access/common/reloptions.c
+++ b/src/backend/access/common/reloptions.c
@@ -209,6 +209,13 @@ static relopt_int intRelOpts[] =
RELOPT_KIND_HEAP | RELOPT_KIND_TOAST
}, -1, 0, 2000000000
},
+ {
+ {
+ "pages_per_range",
+ "Number of pages that each page range covers in a BRIN index",
+ RELOPT_KIND_BRIN
+ }, 128, 1, 131072
+ },
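+	/*
+	 * Illustrative SQL usage (hypothetical table and column names):
+	 *	 CREATE INDEX ON t USING brin (col) WITH (pages_per_range = 32);
+	 */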
/* list terminator */
{{NULL}}
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 8f671ac4342..43098f44422 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -272,6 +272,8 @@ initscan(HeapScanDesc scan, ScanKey key, bool is_rescan)
scan->rs_startblock = 0;
}
+ scan->rs_initblock = 0;
+ scan->rs_numblocks = InvalidBlockNumber;
scan->rs_inited = false;
scan->rs_ctup.t_data = NULL;
ItemPointerSetInvalid(&scan->rs_ctup.t_self);
@@ -297,6 +299,14 @@ initscan(HeapScanDesc scan, ScanKey key, bool is_rescan)
pgstat_count_heap_scan(scan->rs_rd);
}
+void
+heap_setscanlimits(HeapScanDesc scan, BlockNumber startBlk, BlockNumber numBlks)
+{
+ scan->rs_startblock = startBlk;
+ scan->rs_initblock = startBlk;
+ scan->rs_numblocks = numBlks;
+}
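A hypothetical caller, to make the intended use concrete: restrict a heap scan to the blocks covered by a single page range. heapRel, heapBlk, and pagesPerRange are illustrative names; the heapam calls themselves are existing API:

    HeapScanDesc scan;
    HeapTuple   tuple;

    /* syncscan would relocate the start block, so don't allow it */
    scan = heap_beginscan_strat(heapRel, SnapshotAny, 0, NULL,
                                true,   /* buffer access strategy OK */
                                false); /* syncscan not OK */
    heap_setscanlimits(scan, heapBlk, pagesPerRange);

    while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
    {
        /* ... summarize the tuple into the page range's index entry ... */
    }
    heap_endscan(scan);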
+
/*
* heapgetpage - subroutine for heapgettup()
*
@@ -637,7 +647,8 @@ heapgettup(HeapScanDesc scan,
*/
if (backward)
{
- finished = (page == scan->rs_startblock);
+ finished = (page == scan->rs_startblock) ||
+ (scan->rs_numblocks != InvalidBlockNumber ? --scan->rs_numblocks <= 0 : false);
if (page == 0)
page = scan->rs_nblocks;
page--;
@@ -647,7 +658,8 @@ heapgettup(HeapScanDesc scan,
page++;
if (page >= scan->rs_nblocks)
page = 0;
- finished = (page == scan->rs_startblock);
+ finished = (page == scan->rs_startblock) ||
+ (scan->rs_numblocks != InvalidBlockNumber ? --scan->rs_numblocks <= 0 : false);
/*
* Report our new scan position for synchronization purposes. We
@@ -898,7 +910,8 @@ heapgettup_pagemode(HeapScanDesc scan,
*/
if (backward)
{
- finished = (page == scan->rs_startblock);
+ finished = (page == scan->rs_startblock) ||
+ (scan->rs_numblocks != InvalidBlockNumber ? --scan->rs_numblocks <= 0 : false);
if (page == 0)
page = scan->rs_nblocks;
page--;
@@ -908,7 +921,8 @@ heapgettup_pagemode(HeapScanDesc scan,
page++;
if (page >= scan->rs_nblocks)
page = 0;
- finished = (page == scan->rs_startblock);
+ finished = (page == scan->rs_startblock) ||
+ (scan->rs_numblocks != InvalidBlockNumber ? --scan->rs_numblocks <= 0 : false);
/*
* Report our new scan position for synchronization purposes. We
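Tracing the forward case through the new termination test: after heap_setscanlimits(scan, 10, 2), the scan starts at block 10; advancing past it sets page to 11 (not rs_startblock) and decrements rs_numblocks from 2 to 1, so block 11 is scanned; advancing past block 11 brings the counter to 0 and finished becomes true, after exactly the two requested blocks. An unlimited scan keeps rs_numblocks at InvalidBlockNumber and terminates only by wrapping around to rs_startblock, as before.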
diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile
index 7d092d205d6..32cb985036c 100644
--- a/src/backend/access/rmgrdesc/Makefile
+++ b/src/backend/access/rmgrdesc/Makefile
@@ -8,7 +8,8 @@ subdir = src/backend/access/rmgrdesc
top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
-OBJS = clogdesc.o dbasedesc.o gindesc.o gistdesc.o hashdesc.o heapdesc.o \
+OBJS = brindesc.o clogdesc.o dbasedesc.o gindesc.o gistdesc.o \
+ hashdesc.o heapdesc.o \
mxactdesc.o nbtdesc.o relmapdesc.o seqdesc.o smgrdesc.o spgdesc.o \
standbydesc.o tblspcdesc.o xactdesc.o xlogdesc.o
diff --git a/src/backend/access/rmgrdesc/brindesc.c b/src/backend/access/rmgrdesc/brindesc.c
new file mode 100644
index 00000000000..39135bf52e7
--- /dev/null
+++ b/src/backend/access/rmgrdesc/brindesc.c
@@ -0,0 +1,112 @@
+/*-------------------------------------------------------------------------
+ *
+ * brindesc.c
+ * rmgr descriptor routines for BRIN indexes
+ *
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/access/rmgrdesc/brindesc.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/brin_xlog.h"
+
+void
+brin_desc(StringInfo buf, XLogRecord *record)
+{
+ char *rec = XLogRecGetData(record);
+ uint8 info = record->xl_info & ~XLR_INFO_MASK;
+
+ info &= XLOG_BRIN_OPMASK;
+ if (info == XLOG_BRIN_CREATE_INDEX)
+ {
+ xl_brin_createidx *xlrec = (xl_brin_createidx *) rec;
+
+ appendStringInfo(buf, "v%d pagesPerRange %u rel %u/%u/%u",
+ xlrec->version, xlrec->pagesPerRange,
+ xlrec->node.spcNode, xlrec->node.dbNode,
+ xlrec->node.relNode);
+ }
+ else if (info == XLOG_BRIN_INSERT)
+ {
+ xl_brin_insert *xlrec = (xl_brin_insert *) rec;
+
+ appendStringInfo(buf, "rel %u/%u/%u heapBlk %u revmapBlk %u pagesPerRange %u TID (%u,%u)",
+ xlrec->node.spcNode, xlrec->node.dbNode,
+ xlrec->node.relNode,
+ xlrec->heapBlk, xlrec->revmapBlk,
+ xlrec->pagesPerRange,
+ ItemPointerGetBlockNumber(&xlrec->tid),
+ ItemPointerGetOffsetNumber(&xlrec->tid));
+ }
+ else if (info == XLOG_BRIN_UPDATE)
+ {
+ xl_brin_update *xlrec = (xl_brin_update *) rec;
+
+ appendStringInfo(buf, "rel %u/%u/%u heapBlk %u revmapBlk %u pagesPerRange %u old TID (%u,%u) TID (%u,%u)",
+ xlrec->new.node.spcNode, xlrec->new.node.dbNode,
+ xlrec->new.node.relNode,
+ xlrec->new.heapBlk, xlrec->new.revmapBlk,
+ xlrec->new.pagesPerRange,
+ ItemPointerGetBlockNumber(&xlrec->oldtid),
+ ItemPointerGetOffsetNumber(&xlrec->oldtid),
+ ItemPointerGetBlockNumber(&xlrec->new.tid),
+ ItemPointerGetOffsetNumber(&xlrec->new.tid));
+ }
+ else if (info == XLOG_BRIN_SAMEPAGE_UPDATE)
+ {
+ xl_brin_samepage_update *xlrec = (xl_brin_samepage_update *) rec;
+
+ appendStringInfo(buf, "rel %u/%u/%u TID (%u,%u)",
+ xlrec->node.spcNode, xlrec->node.dbNode,
+ xlrec->node.relNode,
+ ItemPointerGetBlockNumber(&xlrec->tid),
+ ItemPointerGetOffsetNumber(&xlrec->tid));
+ }
+ else if (info == XLOG_BRIN_REVMAP_EXTEND)
+ {
+ xl_brin_revmap_extend *xlrec = (xl_brin_revmap_extend *) rec;
+
+ appendStringInfo(buf, "rel %u/%u/%u targetBlk %u",
+ xlrec->node.spcNode, xlrec->node.dbNode,
+ xlrec->node.relNode, xlrec->targetBlk);
+ }
+}
+
+const char *
+brin_identify(uint8 info)
+{
+ const char *id = NULL;
+
+ switch (info & ~XLR_INFO_MASK)
+ {
+ case XLOG_BRIN_CREATE_INDEX:
+ id = "CREATE_INDEX";
+ break;
+ case XLOG_BRIN_INSERT:
+ id = "INSERT";
+ break;
+ case XLOG_BRIN_INSERT | XLOG_BRIN_INIT_PAGE:
+ id = "INSERT+INIT";
+ break;
+ case XLOG_BRIN_UPDATE:
+ id = "UPDATE";
+ break;
+ case XLOG_BRIN_UPDATE | XLOG_BRIN_INIT_PAGE:
+ id = "UPDATE+INIT";
+ break;
+ case XLOG_BRIN_SAMEPAGE_UPDATE:
+ id = "SAMEPAGE_UPDATE";
+ break;
+ case XLOG_BRIN_REVMAP_EXTEND:
+ id = "REVMAP_EXTEND";
+ break;
+ }
+
+ return id;
+}
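Together with brin_identify, these format strings determine what a WAL inspection tool such as pg_xlogdump prints for BRIN records. For a revmap extension of a hypothetical relation 1663/16384/30000 out to block 4, the payload text produced by brin_desc would be:

    rel 1663/16384/30000 targetBlk 4

with REVMAP_EXTEND from brin_identify framing it, in whatever layout the calling tool uses.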
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index 2645a7a3685..befd60f2d37 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -12,6 +12,7 @@
#include "access/gist_private.h"
#include "access/hash.h"
#include "access/heapam_xlog.h"
+#include "access/brin_xlog.h"
#include "access/multixact.h"
#include "access/nbtree.h"
#include "access/spgist.h"
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index 0c31aa95d70..912038a712e 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -2104,6 +2104,27 @@ IndexBuildHeapScan(Relation heapRelation,
IndexBuildCallback callback,
void *callback_state)
{
+ return IndexBuildHeapRangeScan(heapRelation, indexRelation,
+ indexInfo, allow_sync,
+ 0, InvalidBlockNumber,
+ callback, callback_state);
+}
+
+/*
+ * As above, except that instead of scanning the complete heap, only the given
+ * number of blocks are scanned. Scan to end-of-rel can be signalled by
+ * passing InvalidBlockNumber as numblocks.
+ */
+double
+IndexBuildHeapRangeScan(Relation heapRelation,
+ Relation indexRelation,
+ IndexInfo *indexInfo,
+ bool allow_sync,
+ BlockNumber start_blockno,
+ BlockNumber numblocks,
+ IndexBuildCallback callback,
+ void *callback_state)
+{
bool is_system_catalog;
bool checking_uniqueness;
HeapScanDesc scan;
@@ -2174,6 +2195,9 @@ IndexBuildHeapScan(Relation heapRelation,
true, /* buffer access strategy OK */
allow_sync); /* syncscan OK? */
+ /* set our scan endpoints */
+ heap_setscanlimits(scan, start_blockno, numblocks);
+
reltuples = 0;
/*
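To sketch how a caller would drive the new entry point, here is a hypothetical invocation that summarizes a single page range; heapBlk, pagesPerRange, brinbuildCallback, and state stand in for the caller's own names:

    reltuples = IndexBuildHeapRangeScan(heapRelation, indexRelation,
                                        indexInfo,
                                        false,  /* syncscan would move the
                                                 * start block; disallow it */
                                        heapBlk, pagesPerRange,
                                        brinbuildCallback,
                                        (void *) state);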
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 9f1b20e04ab..8e78aafda7c 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -132,6 +132,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogRecord *record)
case RM_GIST_ID:
case RM_SEQ_ID:
case RM_SPGIST_ID:
+ case RM_BRIN_ID:
break;
case RM_NEXT_ID:
elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) buf.record.xl_rmid);
diff --git a/src/backend/storage/page/bufpage.c b/src/backend/storage/page/bufpage.c
index 6351a9bea47..2b858c82719 100644
--- a/src/backend/storage/page/bufpage.c
+++ b/src/backend/storage/page/bufpage.c
@@ -399,7 +399,8 @@ PageRestoreTempPage(Page tempPage, Page oldPage)
}
/*
- * sorting support for PageRepairFragmentation and PageIndexMultiDelete
+ * sorting support for PageRepairFragmentation, PageIndexMultiDelete,
+ * and PageIndexDeleteNoCompact
*/
typedef struct itemIdSortData
{
@@ -896,6 +897,182 @@ PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
phdr->pd_upper = upper;
}
+/*
+ * PageIndexDeleteNoCompact
+ * Delete the given items from an index page, and defragment the resulting
+ * free space, but do not compact the item pointers array.
+ *
+ * itemnos is the array of offset numbers of the items to delete, in
+ * ascending order; nitems is its size.
+ *
+ * This is used by index AMs that require the TIDs of existing live tuples
+ * to remain unchanged: deleted items are merely marked unused, so every
+ * surviving item keeps its offset number. For example, deleting item 2
+ * from a page holding items 1..3 leaves item 3 at offset 3, whereas
+ * PageIndexMultiDelete would renumber it to 2, changing its TID.
+ */
+void
+PageIndexDeleteNoCompact(Page page, OffsetNumber *itemnos, int nitems)
+{
+ PageHeader phdr = (PageHeader) page;
+ LocationIndex pd_lower = phdr->pd_lower;
+ LocationIndex pd_upper = phdr->pd_upper;
+ LocationIndex pd_special = phdr->pd_special;
+ int nline;
+ bool empty;
+ OffsetNumber offnum;
+ int nextitm;
+
+ /*
+ * As with PageRepairFragmentation, paranoia seems justified.
+ */
+ if (pd_lower < SizeOfPageHeaderData ||
+ pd_lower > pd_upper ||
+ pd_upper > pd_special ||
+ pd_special > BLCKSZ ||
+ pd_special != MAXALIGN(pd_special))
+ ereport(ERROR,
+ (errcode(ERRCODE_DATA_CORRUPTED),
+ errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u",
+ pd_lower, pd_upper, pd_special)));
+
+ /*
+ * Scan the existing item pointer array and mark as unused those that are
+ * in our kill-list; also mark unused any used entries that have no
+ * storage, since keeping them serves no purpose.
+ */
+ nline = PageGetMaxOffsetNumber(page);
+ empty = true;
+ nextitm = 0;
+ for (offnum = FirstOffsetNumber; offnum <= nline; offnum = OffsetNumberNext(offnum))
+ {
+ ItemId lp;
+ ItemLength itemlen;
+ ItemOffset offset;
+
+ lp = PageGetItemId(page, offnum);
+
+ itemlen = ItemIdGetLength(lp);
+ offset = ItemIdGetOffset(lp);
+
+ if (ItemIdIsUsed(lp))
+ {
+ if (offset < pd_upper ||
+ (offset + itemlen) > pd_special ||
+ offset != MAXALIGN(offset))
+ ereport(ERROR,
+ (errcode(ERRCODE_DATA_CORRUPTED),
+ errmsg("corrupted item pointer: offset = %u, length = %u",
+ offset, (unsigned int) itemlen)));
+
+ if (nextitm < nitems && offnum == itemnos[nextitm])
+ {
+ /* this one is on our list to delete, so mark it unused */
+ ItemIdSetUnused(lp);
+ nextitm++;
+ }
+ else if (ItemIdHasStorage(lp))
+ {
+ /* This one's live -- must do the compaction dance */
+ empty = false;
+ }
+ else
+ {
+ /* get rid of this one too */
+ ItemIdSetUnused(lp);
+ }
+ }
+ }
+
+ /* this will catch invalid or out-of-order itemnos[] */
+ if (nextitm != nitems)
+ elog(ERROR, "incorrect index offsets supplied");
+
+ if (empty)
+ {
+ /* Page is completely empty, so just reset it quickly */
+ phdr->pd_lower = SizeOfPageHeaderData;
+ phdr->pd_upper = pd_special;
+ }
+ else
+ {
+ /* There are live items: need to compact the page the hard way */
+ itemIdSortData itemidbase[MaxOffsetNumber];
+ itemIdSort itemidptr;
+ int i;
+ Size totallen;
+ Offset upper;
+
+ /*
+ * Scan the page taking note of each item that we need to preserve.
+ * This includes both live items (those that contain data) and
+ * interspersed unused ones. It's critical to preserve these unused
+ * items, because otherwise the offset numbers for later live items
+ * would change, which is not acceptable. Unused items might get used
+ * again later; that is fine.
+ */
+ itemidptr = itemidbase;
+ totallen = 0;
+ for (i = 0; i < nline; i++, itemidptr++)
+ {
+ ItemId lp;
+
+ itemidptr->offsetindex = i;
+
+ lp = PageGetItemId(page, i + 1);
+ if (ItemIdHasStorage(lp))
+ {
+ itemidptr->itemoff = ItemIdGetOffset(lp);
+ itemidptr->alignedlen = MAXALIGN(ItemIdGetLength(lp));
+ totallen += itemidptr->alignedlen;
+ }
+ else
+ {
+ itemidptr->itemoff = 0;
+ itemidptr->alignedlen = 0;
+ }
+ }
+ /* By here, there are exactly nline elements in itemidbase array */
+
+ if (totallen > (Size) (pd_special - pd_lower))
+ ereport(ERROR,
+ (errcode(ERRCODE_DATA_CORRUPTED),
+ errmsg("corrupted item lengths: total %u, available space %u",
+ (unsigned int) totallen, pd_special - pd_lower)));
+
+ /* sort itemIdSortData array into decreasing itemoff order */
+ qsort((char *) itemidbase, nline, sizeof(itemIdSortData),
+ itemoffcompare);
+
+ /*
+ * Defragment the data areas of each tuple, being careful to preserve
+ * each item's position in the linp array.
+ */
+ upper = pd_special;
+ PageClearHasFreeLinePointers(page);
+ for (i = 0, itemidptr = itemidbase; i < nline; i++, itemidptr++)
+ {
+ ItemId lp;
+
+ lp = PageGetItemId(page, itemidptr->offsetindex + 1);
+ if (itemidptr->alignedlen == 0)
+ {
+ PageSetHasFreeLinePointers(page);
+ ItemIdSetUnused(lp);
+ continue;
+ }
+ upper -= itemidptr->alignedlen;
+ memmove((char *) page + upper,
+ (char *) page + itemidptr->itemoff,
+ itemidptr->alignedlen);
+ lp->lp_off = upper;
+ /* lp_flags and lp_len remain the same as originally */
+ }
+
+ /* Set the new page limits */
+ phdr->pd_upper = upper;
+ phdr->pd_lower = SizeOfPageHeaderData + i * sizeof(ItemIdData);
+ }
+}
/*
* Set checksum for a page in shared buffers.
diff --git a/src/backend/utils/adt/selfuncs.c b/src/backend/utils/adt/selfuncs.c
index e932ccf0da5..ea9150b23f0 100644
--- a/src/backend/utils/adt/selfuncs.c
+++ b/src/backend/utils/adt/selfuncs.c
@@ -6081,7 +6081,7 @@ genericcostestimate(PlannerInfo *root,
else
numIndexPages = 1.0;
- /* fetch estimated page cost for schema containing index */
+ /* fetch estimated page cost for tablespace containing index */
get_tablespace_page_costs(index->reltablespace,
&spc_random_page_cost,
NULL);
@@ -7162,7 +7162,7 @@ gincostestimate(PG_FUNCTION_ARGS)
JOIN_INNER,
NULL);
- /* fetch estimated page cost for schema containing index */
+ /* fetch estimated page cost for tablespace containing index */
get_tablespace_page_costs(index->reltablespace,
&spc_random_page_cost,
NULL);
@@ -7349,3 +7349,73 @@ gincostestimate(PG_FUNCTION_ARGS)
PG_RETURN_VOID();
}
+
+/*
+ * BRIN has search behavior completely different from that of other index
+ * types, so it gets its own cost estimator instead of genericcostestimate.
+ */
+Datum
+brincostestimate(PG_FUNCTION_ARGS)
+{
+ PlannerInfo *root = (PlannerInfo *) PG_GETARG_POINTER(0);
+ IndexPath *path = (IndexPath *) PG_GETARG_POINTER(1);
+ double loop_count = PG_GETARG_FLOAT8(2);
+ Cost *indexStartupCost = (Cost *) PG_GETARG_POINTER(3);
+ Cost *indexTotalCost = (Cost *) PG_GETARG_POINTER(4);
+ Selectivity *indexSelectivity = (Selectivity *) PG_GETARG_POINTER(5);
+ double *indexCorrelation = (double *) PG_GETARG_POINTER(6);
+ IndexOptInfo *index = path->indexinfo;
+ List *indexQuals = path->indexquals;
+ List *indexOrderBys = path->indexorderbys;
+ double numPages = index->pages;
+ double numTuples = index->tuples;
+ Cost spc_seq_page_cost;
+ Cost spc_random_page_cost;
+ QualCost index_qual_cost;
+ double qual_op_cost;
+ double qual_arg_cost;
+
+ /* fetch estimated page cost for tablespace containing index */
+ get_tablespace_page_costs(index->reltablespace,
+ &spc_random_page_cost,
+ &spc_seq_page_cost);
+
+ /*
+ * BRIN indexes are always read in full; use that as startup cost.
+ * XXX maybe only include revmap pages here?
+ */
+ *indexStartupCost = spc_seq_page_cost * numPages * loop_count;
+
+ /*
+ * Reading a BRIN index can involve some back and forth over the regular
+ * pages, since the revmap may point to them out of sequential order;
+ * account for that by charging for reading the whole index in random
+ * order.
+ */
+ *indexTotalCost = spc_random_page_cost * numPages * loop_count;
+
+ *indexSelectivity =
+ clauselist_selectivity(root, path->indexquals,
+ path->indexinfo->rel->relid,
+ JOIN_INNER, NULL);
+ *indexCorrelation = 1;
+
+ /*
+ * Add on index qual eval costs, much as in genericcostestimate.
+ */
+ cost_qual_eval(&index_qual_cost, indexQuals, root);
+ qual_arg_cost = index_qual_cost.startup + index_qual_cost.per_tuple;
+ cost_qual_eval(&index_qual_cost, indexOrderBys, root);
+ qual_arg_cost += index_qual_cost.startup + index_qual_cost.per_tuple;
+ qual_op_cost = cpu_operator_cost *
+ (list_length(indexQuals) + list_length(indexOrderBys));
+ qual_arg_cost -= qual_op_cost;
+ if (qual_arg_cost < 0) /* just in case... */
+ qual_arg_cost = 0;
+
+ *indexStartupCost += qual_arg_cost;
+ *indexTotalCost += qual_arg_cost;
+ *indexTotalCost += (numTuples * *indexSelectivity) * (cpu_index_tuple_cost + qual_op_cost);
+
+ /* XXX what about pages_per_range? */
+
+ PG_RETURN_VOID();
+}
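Plugging numbers into the formulas above: a 10-page BRIN index in the default tablespace (spc_seq_page_cost = 1.0, spc_random_page_cost = 4.0) with loop_count = 1 gives *indexStartupCost = 1.0 * 10 = 10.0 and, before the qual-evaluation charges, *indexTotalCost = 4.0 * 10 = 40.0; the final term then adds CPU cost proportional to numTuples times the estimated selectivity.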