diff options
author | Kevin Grittner <kgrittn@postgresql.org> | 2013-07-16 12:55:44 -0500 |
---|---|---|
committer | Kevin Grittner <kgrittn@postgresql.org> | 2013-07-16 12:55:44 -0500 |
commit | cc1965a99bf87005f431804bbda0f723887a04d6 (patch) | |
tree | 694801e2e7a34a1247ad7858b9c81ff16a90ac39 /src/backend/commands/matview.c | |
parent | 7f7485a0cde92aa4ba235a1ffe4dda0ca0b6cc9a (diff) | |
download | postgresql-cc1965a99bf87005f431804bbda0f723887a04d6.tar.gz postgresql-cc1965a99bf87005f431804bbda0f723887a04d6.zip |
Add support for REFRESH MATERIALIZED VIEW CONCURRENTLY.
This allows reads to continue without any blocking while a REFRESH
runs. The new data appears atomically as part of transaction
commit.
Review questioned the Assert that a matview was not a system
relation. This will be addressed separately.
Reviewed by Hitoshi Harada, Robert Haas, Andres Freund.
Merged after review with security patch f3ab5d4.
Diffstat (limited to 'src/backend/commands/matview.c')
-rw-r--r-- | src/backend/commands/matview.c | 524 |
1 files changed, 487 insertions, 37 deletions
diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c index 1c383baf687..edd34ff1716 100644 --- a/src/backend/commands/matview.c +++ b/src/backend/commands/matview.c @@ -20,17 +20,24 @@ #include "catalog/catalog.h" #include "catalog/indexing.h" #include "catalog/namespace.h" +#include "catalog/pg_operator.h" #include "commands/cluster.h" #include "commands/matview.h" #include "commands/tablecmds.h" +#include "commands/tablespace.h" #include "executor/executor.h" +#include "executor/spi.h" #include "miscadmin.h" +#include "parser/parse_relation.h" #include "rewrite/rewriteHandler.h" #include "storage/smgr.h" #include "tcop/tcopprot.h" +#include "utils/builtins.h" +#include "utils/lsyscache.h" #include "utils/rel.h" #include "utils/snapmgr.h" #include "utils/syscache.h" +#include "utils/typcache.h" typedef struct @@ -44,12 +51,23 @@ typedef struct BulkInsertState bistate; /* bulk insert state */ } DR_transientrel; +static int matview_maintenance_depth = 0; + static void transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo); static void transientrel_receive(TupleTableSlot *slot, DestReceiver *self); static void transientrel_shutdown(DestReceiver *self); static void transientrel_destroy(DestReceiver *self); static void refresh_matview_datafill(DestReceiver *dest, Query *query, - const char *queryString); + const char *queryString, Oid relowner); + +static char *make_temptable_name_n(char *tempname, int n); +static void mv_GenerateOper(StringInfo buf, Oid opoid); + +static void refresh_by_match_merge(Oid matviewOid, Oid tempOid); +static void refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap); + +static void OpenMatViewIncrementalMaintenance(void); +static void CloseMatViewIncrementalMaintenance(void); /* * SetMatViewPopulatedState @@ -122,18 +140,21 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString, RewriteRule *rule; List *actions; Query *dataQuery; - Oid save_userid; - int save_sec_context; - int 
save_nestlevel; Oid tableSpace; Oid OIDNewHeap; DestReceiver *dest; + bool concurrent; + LOCKMODE lockmode; + + /* Determine strength of lock needed. */ + concurrent = stmt->concurrent; + lockmode = concurrent ? ExclusiveLock : AccessExclusiveLock; /* * Get a lock until end of transaction. */ matviewOid = RangeVarGetRelidExtended(stmt->relation, - AccessExclusiveLock, false, false, + lockmode, false, false, RangeVarCallbackOwnsTable, NULL); matviewRel = heap_open(matviewOid, NoLock); @@ -144,11 +165,22 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString, errmsg("\"%s\" is not a materialized view", RelationGetRelationName(matviewRel)))); - /* - * We're not using materialized views in the system catalogs. - */ + /* Check that CONCURRENTLY is not specified if not populated. */ + if (concurrent && !RelationIsPopulated(matviewRel)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("CONCURRENTLY cannot be used when the materialized view is not populated"))); + + /* Check that conflicting options have not been specified. */ + if (concurrent && stmt->skipData) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("CONCURRENTLY and WITH NO DATA options cannot be used together"))); + + /* We're not using materialized views in the system catalogs. */ Assert(!IsSystemRelation(matviewRel)); + /* We don't allow an oid column for a materialized view. */ Assert(!matviewRel->rd_rel->relhasoids); /* @@ -195,47 +227,48 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString, CheckTableNotInUse(matviewRel, "REFRESH MATERIALIZED VIEW"); /* - * Switch to the owner's userid, so that any functions are run as that - * user. Also lock down security-restricted operations and arrange to - * make GUC variable changes local to this command. 
- */ - GetUserIdAndSecContext(&save_userid, &save_sec_context); - SetUserIdAndSecContext(matviewRel->rd_rel->relowner, - save_sec_context | SECURITY_RESTRICTED_OPERATION); - save_nestlevel = NewGUCNestLevel(); - - /* * Tentatively mark the matview as populated or not (this will roll back * if we fail later). */ SetMatViewPopulatedState(matviewRel, !stmt->skipData); - tableSpace = matviewRel->rd_rel->reltablespace; + /* Concurrent refresh builds new data in temp tablespace, and does diff. */ + if (concurrent) + tableSpace = GetDefaultTablespace(RELPERSISTENCE_TEMP); + else + tableSpace = matviewRel->rd_rel->reltablespace; heap_close(matviewRel, NoLock); /* Create the transient table that will receive the regenerated data. */ - OIDNewHeap = make_new_heap(matviewOid, tableSpace); + OIDNewHeap = make_new_heap(matviewOid, tableSpace, concurrent, + ExclusiveLock); dest = CreateTransientRelDestReceiver(OIDNewHeap); /* Generate the data, if wanted. */ if (!stmt->skipData) - refresh_matview_datafill(dest, dataQuery, queryString); - - /* - * Swap the physical files of the target and transient tables, then - * rebuild the target's indexes and throw away the transient table. - */ - finish_heap_swap(matviewOid, OIDNewHeap, false, false, true, true, - RecentXmin, ReadNextMultiXactId()); - - RelationCacheInvalidateEntry(matviewOid); - - /* Roll back any GUC changes */ - AtEOXact_GUC(false, save_nestlevel); - - /* Restore userid and security context */ - SetUserIdAndSecContext(save_userid, save_sec_context); + refresh_matview_datafill(dest, dataQuery, queryString, + matviewRel->rd_rel->relowner); + + /* Make the matview match the newly generated data. 
*/ + if (concurrent) + { + int old_depth = matview_maintenance_depth; + + PG_TRY(); + { + refresh_by_match_merge(matviewOid, OIDNewHeap); + } + PG_CATCH(); + { + matview_maintenance_depth = old_depth; + PG_RE_THROW(); + } + PG_END_TRY(); + Assert(matview_maintenance_depth == old_depth); + } + else + refresh_by_heap_swap(matviewOid, OIDNewHeap); } /* @@ -243,11 +276,24 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString, */ static void refresh_matview_datafill(DestReceiver *dest, Query *query, - const char *queryString) + const char *queryString, Oid relowner) { List *rewritten; PlannedStmt *plan; QueryDesc *queryDesc; + Oid save_userid; + int save_sec_context; + int save_nestlevel; + + /* + * Switch to the owner's userid, so that any functions are run as that + * user. Also lock down security-restricted operations and arrange to + * make GUC variable changes local to this command. + */ + GetUserIdAndSecContext(&save_userid, &save_sec_context); + SetUserIdAndSecContext(relowner, + save_sec_context | SECURITY_RESTRICTED_OPERATION); + save_nestlevel = NewGUCNestLevel(); /* Rewrite, copying the given Query to make sure it's not changed */ rewritten = QueryRewrite((Query *) copyObject(query)); @@ -290,6 +336,12 @@ refresh_matview_datafill(DestReceiver *dest, Query *query, FreeQueryDesc(queryDesc); PopActiveSnapshot(); + + /* Roll back any GUC changes */ + AtEOXact_GUC(false, save_nestlevel); + + /* Restore userid and security context */ + SetUserIdAndSecContext(save_userid, save_sec_context); } DestReceiver * @@ -388,3 +440,401 @@ transientrel_destroy(DestReceiver *self) { pfree(self); } + + +/* + * Given a qualified temporary table name, append an underscore followed by + * the given integer, to make a new table name based on the old one. + * + * This leaks memory through palloc(), which won't be cleaned up until the + * current memory context is freed. 
+ */ +static char * +make_temptable_name_n(char *tempname, int n) +{ + StringInfoData namebuf; + + initStringInfo(&namebuf); + appendStringInfoString(&namebuf, tempname); + appendStringInfo(&namebuf, "_%i", n); + return namebuf.data; +} + +static void +mv_GenerateOper(StringInfo buf, Oid opoid) +{ + HeapTuple opertup; + Form_pg_operator operform; + + opertup = SearchSysCache1(OPEROID, ObjectIdGetDatum(opoid)); + if (!HeapTupleIsValid(opertup)) + elog(ERROR, "cache lookup failed for operator %u", opoid); + operform = (Form_pg_operator) GETSTRUCT(opertup); + Assert(operform->oprkind == 'b'); + + appendStringInfo(buf, "OPERATOR(%s.%s)", + quote_identifier(get_namespace_name(operform->oprnamespace)), + NameStr(operform->oprname)); + + ReleaseSysCache(opertup); +} + +/* + * refresh_by_match_merge + * + * Refresh a materialized view with transactional semantics, while allowing + * concurrent reads. + * + * This is called after a new version of the data has been created in a + * temporary table. It performs a full outer join against the old version of + * the data, producing "diff" results. This join cannot work if there are any + * duplicated rows in either the old or new versions, in the sense that every + * column would compare as equal between the two rows. It does work correctly + * in the face of rows which have at least one NULL value, with all non-NULL + * columns equal. The behavior of NULLs on equality tests and on UNIQUE + * indexes turns out to be quite convenient here; the tests we need to make + * are consistent with default behavior. If there is at least one UNIQUE + * index on the materialized view, we have exactly the guarantee we need. By + * joining based on equality on all columns which are part of any unique + * index, we identify the rows on which we can use UPDATE without any problem. 
+ * If any column is NULL in either the old or new version of a row (or both), + * we must use DELETE and INSERT, since there could be multiple rows which are + * NOT DISTINCT FROM each other, and we could otherwise end up with the wrong + * number of occurrences in the updated relation. The temporary table used to + * hold the diff results contains just the TID of the old record (if matched) + * and the ROW from the new table as a single column of complex record type + * (if matched). + * + * Once we have the diff table, we perform set-based DELETE, UPDATE, and + * INSERT operations against the materialized view, and discard both temporary + * tables. + * + * Everything from the generation of the new data to applying the differences + * takes place under cover of an ExclusiveLock, since it seems as though we + * would want to prohibit not only concurrent REFRESH operations, but also + * incremental maintenance. It also doesn't seem reasonable or safe to allow + * SELECT FOR UPDATE or SELECT FOR SHARE on rows being updated or deleted by + * this command. 
+ */ +static void +refresh_by_match_merge(Oid matviewOid, Oid tempOid) +{ + StringInfoData querybuf; + Relation matviewRel; + Relation tempRel; + char *matviewname; + char *tempname; + char *diffname; + TupleDesc tupdesc; + bool foundUniqueIndex; + List *indexoidlist; + ListCell *indexoidscan; + int16 relnatts; + bool *usedForQual; + Oid save_userid; + int save_sec_context; + int save_nestlevel; + + initStringInfo(&querybuf); + matviewRel = heap_open(matviewOid, NoLock); + matviewname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)), + RelationGetRelationName(matviewRel)); + tempRel = heap_open(tempOid, NoLock); + tempname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(tempRel)), + RelationGetRelationName(tempRel)); + diffname = make_temptable_name_n(tempname, 2); + + relnatts = matviewRel->rd_rel->relnatts; + usedForQual = (bool *) palloc0(sizeof(bool) * relnatts); + + /* Open SPI context. */ + if (SPI_connect() != SPI_OK_CONNECT) + elog(ERROR, "SPI_connect failed"); + + /* Analyze the temp table with the new contents. */ + appendStringInfo(&querybuf, "ANALYZE %s", tempname); + if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY) + elog(ERROR, "SPI_exec failed: %s", querybuf.data); + + /* + * We need to ensure that there are not duplicate rows without NULLs in + * the new data set before we can count on the "diff" results. Check for + * that in a way that allows showing the first duplicated row found. Even + * after we pass this test, a unique index on the materialized view may + * find a duplicate key problem. 
+ */ + resetStringInfo(&querybuf); + appendStringInfo(&querybuf, + "SELECT x FROM %s x WHERE x IS NOT NULL AND EXISTS " + "(SELECT * FROM %s y WHERE y IS NOT NULL " + "AND (y.*) = (x.*) AND y.ctid <> x.ctid) LIMIT 1", + tempname, tempname); + if (SPI_execute(querybuf.data, false, 1) != SPI_OK_SELECT) + elog(ERROR, "SPI_exec failed: %s", querybuf.data); + if (SPI_processed > 0) + { + ereport(ERROR, + (errcode(ERRCODE_CARDINALITY_VIOLATION), + errmsg("new data for \"%s\" contains duplicate rows without any NULL columns", + RelationGetRelationName(matviewRel)), + errdetail("Row: %s", + SPI_getvalue(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1)))); + } + + /* Start building the query for creating the diff table. */ + resetStringInfo(&querybuf); + appendStringInfo(&querybuf, + "CREATE TEMP TABLE %s AS " + "SELECT x.ctid AS tid, y FROM %s x FULL JOIN %s y ON (", + diffname, matviewname, tempname); + + /* + * Get the list of index OIDs for the table from the relcache, and look up + * each one in the pg_index syscache. We will test for equality on all + * columns present in all unique indexes which only reference columns and + * include all rows. + */ + tupdesc = matviewRel->rd_att; + foundUniqueIndex = false; + indexoidlist = RelationGetIndexList(matviewRel); + + foreach(indexoidscan, indexoidlist) + { + Oid indexoid = lfirst_oid(indexoidscan); + HeapTuple indexTuple; + Form_pg_index index; + + indexTuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexoid)); + if (!HeapTupleIsValid(indexTuple)) /* should not happen */ + elog(ERROR, "cache lookup failed for index %u", indexoid); + index = (Form_pg_index) GETSTRUCT(indexTuple); + + /* We're only interested if it is unique and valid. */ + if (index->indisunique && IndexIsValid(index)) + { + int numatts = index->indnatts; + int i; + bool expr = false; + Relation indexRel; + + /* Skip any index on an expression. 
*/ + for (i = 0; i < numatts; i++) + { + if (index->indkey.values[i] == 0) + { + expr = true; + break; + } + } + if (expr) + { + ReleaseSysCache(indexTuple); + continue; + } + + /* Skip partial indexes. */ + indexRel = index_open(index->indexrelid, RowExclusiveLock); + if (indexRel->rd_indpred != NIL) + { + index_close(indexRel, NoLock); + ReleaseSysCache(indexTuple); + continue; + } + /* Hold the locks, since we're about to run DML which needs them. */ + index_close(indexRel, NoLock); + + /* Add quals for all columns from this index. */ + for (i = 0; i < numatts; i++) + { + int attnum = index->indkey.values[i]; + Oid type; + Oid op; + const char *colname; + + /* + * Only include the column once regardless of how many times + * it shows up in how many indexes. + * + * This is also useful later to omit columns which can not + * have changed from the SET clause of the UPDATE statement. + */ + if (usedForQual[attnum - 1]) + continue; + usedForQual[attnum - 1] = true; + + /* + * Actually add the qual, ANDed with any others. + */ + if (foundUniqueIndex) + appendStringInfoString(&querybuf, " AND "); + + colname = quote_identifier(NameStr((tupdesc->attrs[attnum - 1])->attname)); + appendStringInfo(&querybuf, "y.%s ", colname); + type = attnumTypeId(matviewRel, attnum); + op = lookup_type_cache(type, TYPECACHE_EQ_OPR)->eq_opr; + mv_GenerateOper(&querybuf, op); + appendStringInfo(&querybuf, " x.%s", colname); + + foundUniqueIndex = true; + } + } + ReleaseSysCache(indexTuple); + } + + list_free(indexoidlist); + + if (!foundUniqueIndex) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot refresh materialized view \"%s\" concurrently", + matviewname), + errhint("Create a UNIQUE index with no WHERE clause on one or more columns of the materialized view."))); + + appendStringInfoString(&querybuf, + " AND y = x) WHERE (y.*) IS DISTINCT FROM (x.*)" + " ORDER BY tid"); + + /* Create the temporary "diff" table. 
*/ + if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY) + elog(ERROR, "SPI_exec failed: %s", querybuf.data); + + /* + * We have no further use for data from the "full-data" temp table, but we + * must keep it around because its type is referenced from the diff table. + */ + + /* Analyze the diff table. */ + resetStringInfo(&querybuf); + appendStringInfo(&querybuf, "ANALYZE %s", diffname); + if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY) + elog(ERROR, "SPI_exec failed: %s", querybuf.data); + + OpenMatViewIncrementalMaintenance(); + + /* + * Switch to the owner's userid, so that any functions are run as that + * user. Also lock down security-restricted operations and arrange to + * make GUC variable changes local to this command. + */ + GetUserIdAndSecContext(&save_userid, &save_sec_context); + SetUserIdAndSecContext(matviewRel->rd_rel->relowner, + save_sec_context | SECURITY_RESTRICTED_OPERATION); + save_nestlevel = NewGUCNestLevel(); + + /* Deletes must come before inserts; do them first. */ + resetStringInfo(&querybuf); + appendStringInfo(&querybuf, + "DELETE FROM %s WHERE ctid IN " + "(SELECT d.tid FROM %s d " + "WHERE d.tid IS NOT NULL " + "AND (d.y) IS NOT DISTINCT FROM NULL)", + matviewname, diffname); + if (SPI_exec(querybuf.data, 0) != SPI_OK_DELETE) + elog(ERROR, "SPI_exec failed: %s", querybuf.data); + + /* Updates before inserts gives a better chance at HOT updates. 
*/ + resetStringInfo(&querybuf); + appendStringInfo(&querybuf, "UPDATE %s x SET ", matviewname); + + { + int i; + bool targetColFound = false; + + for (i = 0; i < tupdesc->natts; i++) + { + const char *colname; + + if (tupdesc->attrs[i]->attisdropped) + continue; + + if (usedForQual[i]) + continue; + + if (targetColFound) + appendStringInfoString(&querybuf, ", "); + targetColFound = true; + + colname = quote_identifier(NameStr((tupdesc->attrs[i])->attname)); + appendStringInfo(&querybuf, "%s = (d.y).%s", colname, colname); + } + + if (targetColFound) + { + appendStringInfo(&querybuf, + " FROM %s d " + "WHERE d.tid IS NOT NULL AND x.ctid = d.tid", + diffname); + + if (SPI_exec(querybuf.data, 0) != SPI_OK_UPDATE) + elog(ERROR, "SPI_exec failed: %s", querybuf.data); + } + } + + /* Inserts go last. */ + resetStringInfo(&querybuf); + appendStringInfo(&querybuf, + "INSERT INTO %s SELECT (y).* FROM %s WHERE tid IS NULL", + matviewname, diffname); + if (SPI_exec(querybuf.data, 0) != SPI_OK_INSERT) + elog(ERROR, "SPI_exec failed: %s", querybuf.data); + + /* Roll back any GUC changes */ + AtEOXact_GUC(false, save_nestlevel); + + /* Restore userid and security context */ + SetUserIdAndSecContext(save_userid, save_sec_context); + + /* We're done maintaining the materialized view. */ + CloseMatViewIncrementalMaintenance(); + heap_close(tempRel, NoLock); + heap_close(matviewRel, NoLock); + + /* Clean up temp tables. */ + resetStringInfo(&querybuf); + appendStringInfo(&querybuf, "DROP TABLE %s, %s", diffname, tempname); + if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY) + elog(ERROR, "SPI_exec failed: %s", querybuf.data); + + /* Close SPI context. */ + if (SPI_finish() != SPI_OK_FINISH) + elog(ERROR, "SPI_finish failed"); +} + +/* + * Swap the physical files of the target and transient tables, then rebuild + * the target's indexes and throw away the transient table. Security context + * swapping is handled by the called function, so it is not needed here. 
+ */ +static void +refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap) +{ + finish_heap_swap(matviewOid, OIDNewHeap, false, false, true, true, + RecentXmin, ReadNextMultiXactId()); + + RelationCacheInvalidateEntry(matviewOid); +} + +static void +OpenMatViewIncrementalMaintenance(void) +{ + matview_maintenance_depth++; +} + +static void +CloseMatViewIncrementalMaintenance(void) +{ + matview_maintenance_depth--; + Assert(matview_maintenance_depth >= 0); +} + +/* + * This should be used to test whether the backend is in a context where it is + * OK to allow DML statements to modify materialized views. We only want to + * allow that for internal code driven by the materialized view definition, + * not for arbitrary user-supplied code. + */ +bool +MatViewIncrementalMaintenanceIsEnabled(void) +{ + return matview_maintenance_depth > 0; +} |