aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands/matview.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands/matview.c')
-rw-r--r--src/backend/commands/matview.c374
1 files changed, 374 insertions, 0 deletions
diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c
new file mode 100644
index 00000000000..e040bedb7e5
--- /dev/null
+++ b/src/backend/commands/matview.c
@@ -0,0 +1,374 @@
+/*-------------------------------------------------------------------------
+ *
+ * matview.c
+ * materialized view support
+ *
+ * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/commands/matview.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/multixact.h"
+#include "access/relscan.h"
+#include "access/xact.h"
+#include "catalog/catalog.h"
+#include "catalog/heap.h"
+#include "catalog/namespace.h"
+#include "commands/cluster.h"
+#include "commands/matview.h"
+#include "commands/tablecmds.h"
+#include "executor/executor.h"
+#include "miscadmin.h"
+#include "rewrite/rewriteHandler.h"
+#include "storage/lmgr.h"
+#include "storage/smgr.h"
+#include "tcop/tcopprot.h"
+#include "utils/snapmgr.h"
+
+
+typedef struct
+{
+ DestReceiver pub; /* publicly-known function pointers */
+ Oid transientoid; /* OID of new heap into which to store */
+ /* These fields are filled by transientrel_startup: */
+ Relation transientrel; /* relation to write to */
+ CommandId output_cid; /* cmin to insert in output tuples */
+ int hi_options; /* heap_insert performance options */
+ BulkInsertState bistate; /* bulk insert state */
+} DR_transientrel;
+
+static void transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
+static void transientrel_receive(TupleTableSlot *slot, DestReceiver *self);
+static void transientrel_shutdown(DestReceiver *self);
+static void transientrel_destroy(DestReceiver *self);
+static void refresh_matview_datafill(DestReceiver *dest, Query *query,
+ const char *queryString);
+
+/*
+ * SetRelationIsScannable
+ * Make the relation appear scannable.
+ *
+ * NOTE: This is only implemented for materialized views. The heap starts out
+ * in a state that doesn't look scannable, and can only transition from there
+ * to scannable, unless a new heap is created.
+ *
+ * NOTE: caller must be holding an appropriate lock on the relation.
+ */
+void
+SetRelationIsScannable(Relation relation)
+{
+ Page page;
+
+ Assert(relation->rd_rel->relkind == RELKIND_MATVIEW);
+ Assert(relation->rd_isscannable == false);
+
+ RelationOpenSmgr(relation);
+ page = (Page) palloc(BLCKSZ);
+ PageInit(page, BLCKSZ, 0);
+ smgrextend(relation->rd_smgr, MAIN_FORKNUM, 0, (char *) page, true);
+ pfree(page);
+
+ smgrimmedsync(relation->rd_smgr, MAIN_FORKNUM);
+
+ RelationCacheInvalidateEntry(relation->rd_id);
+}
+
+/*
+ * ExecRefreshMatView -- execute a REFRESH MATERIALIZED VIEW command
+ *
+ * This refreshes the materialized view by creating a new table and swapping
+ * the relfilenodes of the new table and the old materialized view, so the OID
+ * of the original materialized view is preserved. Thus we do not lose GRANT
+ * nor references to this materialized view.
+ *
+ * If WITH NO DATA was specified, this is effectively like a TRUNCATE;
+ * otherwise it is like a TRUNCATE followed by an INSERT using the SELECT
+ * statement associated with the materialized view. The statement node's
+ * skipData field is used to indicate that the clause was used.
+ *
+ * Indexes are rebuilt too, via REINDEX. Since we are effectively bulk-loading
+ * the new heap, it's better to create the indexes afterwards than to fill them
+ * incrementally while we load.
+ *
+ * The scannable state is changed based on whether the contents reflect the
+ * result set of the materialized view's query.
+ */
+void
+ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
+ ParamListInfo params, char *completionTag)
+{
+ Oid matviewOid;
+ Relation matviewRel;
+ RewriteRule *rule;
+ List *actions;
+ Query *dataQuery;
+ Oid tableSpace;
+ Oid OIDNewHeap;
+ DestReceiver *dest;
+
+ /*
+ * Get a lock until end of transaction.
+ */
+ matviewOid = RangeVarGetRelidExtended(stmt->relation,
+ AccessExclusiveLock, false, false,
+ RangeVarCallbackOwnsTable, NULL);
+ matviewRel = heap_open(matviewOid, NoLock);
+
+ /* Make sure it is a materialized view. */
+ if (matviewRel->rd_rel->relkind != RELKIND_MATVIEW)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("\"%s\" is not a materialized view",
+ RelationGetRelationName(matviewRel))));
+
+ /*
+ * We're not using materialized views in the system catalogs.
+ */
+ Assert(!IsSystemRelation(matviewRel));
+
+ Assert(!matviewRel->rd_rel->relhasoids);
+
+ /*
+ * Check that everything is correct for a refresh. Problems at this point
+ * are internal errors, so elog is sufficient.
+ */
+ if (matviewRel->rd_rel->relhasrules == false ||
+ matviewRel->rd_rules->numLocks < 1)
+ elog(ERROR,
+ "materialized view \"%s\" is missing rewrite information",
+ RelationGetRelationName(matviewRel));
+
+ if (matviewRel->rd_rules->numLocks > 1)
+ elog(ERROR,
+ "materialized view \"%s\" has too many rules",
+ RelationGetRelationName(matviewRel));
+
+ rule = matviewRel->rd_rules->rules[0];
+ if (rule->event != CMD_SELECT || !(rule->isInstead))
+ elog(ERROR,
+ "the rule for materialized view \"%s\" is not a SELECT INSTEAD OF rule",
+ RelationGetRelationName(matviewRel));
+
+ actions = rule->actions;
+ if (list_length(actions) != 1)
+ elog(ERROR,
+ "the rule for materialized view \"%s\" is not a single action",
+ RelationGetRelationName(matviewRel));
+
+ /*
+ * The stored query was rewritten at the time of the MV definition, but
+ * has not been scribbled on by the planner.
+ */
+ dataQuery = (Query *) linitial(actions);
+ Assert(IsA(dataQuery, Query));
+
+ /*
+ * Check for active uses of the relation in the current transaction, such
+ * as open scans.
+ *
+ * NB: We count on this to protect us against problems with refreshing the
+ * data using HEAP_INSERT_FROZEN.
+ */
+ CheckTableNotInUse(matviewRel, "REFRESH MATERIALIZED VIEW");
+
+ tableSpace = matviewRel->rd_rel->reltablespace;
+
+ heap_close(matviewRel, NoLock);
+
+ /* Create the transient table that will receive the regenerated data. */
+ OIDNewHeap = make_new_heap(matviewOid, tableSpace);
+ dest = CreateTransientRelDestReceiver(OIDNewHeap);
+
+ if (!stmt->skipData)
+ refresh_matview_datafill(dest, dataQuery, queryString);
+
+ /*
+ * Swap the physical files of the target and transient tables, then
+ * rebuild the target's indexes and throw away the transient table.
+ */
+ finish_heap_swap(matviewOid, OIDNewHeap, false, false, true, RecentXmin,
+ ReadNextMultiXactId());
+
+ RelationCacheInvalidateEntry(matviewOid);
+}
+
+/*
+ * refresh_matview_datafill
+ */
+static void
+refresh_matview_datafill(DestReceiver *dest, Query *query,
+ const char *queryString)
+{
+ List *rewritten;
+ PlannedStmt *plan;
+ QueryDesc *queryDesc;
+ List *rtable;
+ RangeTblEntry *initial_rte;
+ RangeTblEntry *second_rte;
+
+ rewritten = QueryRewrite((Query *) copyObject(query));
+
+ /* SELECT should never rewrite to more or less than one SELECT query */
+ if (list_length(rewritten) != 1)
+ elog(ERROR, "unexpected rewrite result for REFRESH MATERIALIZED VIEW");
+ query = (Query *) linitial(rewritten);
+
+ /* Check for user-requested abort. */
+ CHECK_FOR_INTERRUPTS();
+
+ /*
+ * Kludge here to allow refresh of a materialized view which is invalid
+ * (that is, it was created or refreshed WITH NO DATA. We flag the first
+ * two RangeTblEntry list elements, which were added to the front of the
+ * rewritten Query to keep the rules system happy, with the isResultRel
+ * flag to indicate that it is OK if they are flagged as invalid. See
+ * UpdateRangeTableOfViewParse() for details.
+ *
+ * NOTE: The rewrite has switched the frist two RTEs, but they are still
+ * in the first two positions. If that behavior changes, the asserts here
+ * will fail.
+ */
+ rtable = query->rtable;
+ initial_rte = ((RangeTblEntry *) linitial(rtable));
+ Assert(strcmp(initial_rte->alias->aliasname, "new"));
+ initial_rte->isResultRel = true;
+ second_rte = ((RangeTblEntry *) lsecond(rtable));
+ Assert(strcmp(second_rte->alias->aliasname, "old"));
+ second_rte->isResultRel = true;
+
+ /* Plan the query which will generate data for the refresh. */
+ plan = pg_plan_query(query, 0, NULL);
+
+ /*
+ * Use a snapshot with an updated command ID to ensure this query sees
+ * results of any previously executed queries. (This could only matter if
+ * the planner executed an allegedly-stable function that changed the
+ * database contents, but let's do it anyway to be safe.)
+ */
+ PushCopiedSnapshot(GetActiveSnapshot());
+ UpdateActiveSnapshotCommandId();
+
+ /* Create a QueryDesc, redirecting output to our tuple receiver */
+ queryDesc = CreateQueryDesc(plan, queryString,
+ GetActiveSnapshot(), InvalidSnapshot,
+ dest, NULL, 0);
+
+ /* call ExecutorStart to prepare the plan for execution */
+ ExecutorStart(queryDesc, EXEC_FLAG_WITHOUT_OIDS);
+
+ /* run the plan */
+ ExecutorRun(queryDesc, ForwardScanDirection, 0L);
+
+ /* and clean up */
+ ExecutorFinish(queryDesc);
+ ExecutorEnd(queryDesc);
+
+ FreeQueryDesc(queryDesc);
+
+ PopActiveSnapshot();
+}
+
+DestReceiver *
+CreateTransientRelDestReceiver(Oid transientoid)
+{
+ DR_transientrel *self = (DR_transientrel *) palloc0(sizeof(DR_transientrel));
+
+ self->pub.receiveSlot = transientrel_receive;
+ self->pub.rStartup = transientrel_startup;
+ self->pub.rShutdown = transientrel_shutdown;
+ self->pub.rDestroy = transientrel_destroy;
+ self->pub.mydest = DestTransientRel;
+ self->transientoid = transientoid;
+
+ return (DestReceiver *) self;
+}
+
+/*
+ * transientrel_startup --- executor startup
+ */
+static void
+transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
+{
+ DR_transientrel *myState = (DR_transientrel *) self;
+ Relation transientrel;
+
+ transientrel = heap_open(myState->transientoid, NoLock);
+
+ /*
+ * Fill private fields of myState for use by later routines
+ */
+ myState->transientrel = transientrel;
+ myState->output_cid = GetCurrentCommandId(true);
+
+ /*
+ * We can skip WAL-logging the insertions, unless PITR or streaming
+ * replication is in use. We can skip the FSM in any case.
+ */
+ myState->hi_options = HEAP_INSERT_SKIP_FSM | HEAP_INSERT_FROZEN;
+ if (!XLogIsNeeded())
+ myState->hi_options |= HEAP_INSERT_SKIP_WAL;
+ myState->bistate = GetBulkInsertState();
+
+ SetRelationIsScannable(transientrel);
+
+ /* Not using WAL requires smgr_targblock be initially invalid */
+ Assert(RelationGetTargetBlock(transientrel) == InvalidBlockNumber);
+}
+
+/*
+ * transientrel_receive --- receive one tuple
+ */
+static void
+transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
+{
+ DR_transientrel *myState = (DR_transientrel *) self;
+ HeapTuple tuple;
+
+ /*
+ * get the heap tuple out of the tuple table slot, making sure we have a
+ * writable copy
+ */
+ tuple = ExecMaterializeSlot(slot);
+
+ heap_insert(myState->transientrel,
+ tuple,
+ myState->output_cid,
+ myState->hi_options,
+ myState->bistate);
+
+ /* We know this is a newly created relation, so there are no indexes */
+}
+
+/*
+ * transientrel_shutdown --- executor end
+ */
+static void
+transientrel_shutdown(DestReceiver *self)
+{
+ DR_transientrel *myState = (DR_transientrel *) self;
+
+ FreeBulkInsertState(myState->bistate);
+
+ /* If we skipped using WAL, must heap_sync before commit */
+ if (myState->hi_options & HEAP_INSERT_SKIP_WAL)
+ heap_sync(myState->transientrel);
+
+ /* close transientrel, but keep lock until commit */
+ heap_close(myState->transientrel, NoLock);
+ myState->transientrel = NULL;
+}
+
+/*
+ * transientrel_destroy --- release DestReceiver object
+ */
+static void
+transientrel_destroy(DestReceiver *self)
+{
+ pfree(self);
+}