diff options
Diffstat (limited to 'src/backend/commands')
-rw-r--r-- | src/backend/commands/cluster.c | 10 | ||||
-rw-r--r-- | src/backend/commands/indexcmds.c | 53 | ||||
-rw-r--r-- | src/backend/commands/tablecmds.c | 30 | ||||
-rw-r--r-- | src/backend/commands/vacuum.c | 30 |
4 files changed, 56 insertions, 67 deletions
diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c index a72b0ad5ff2..98534cceb93 100644 --- a/src/backend/commands/cluster.c +++ b/src/backend/commands/cluster.c @@ -444,7 +444,7 @@ check_index_is_clusterable(Relation OldHeap, Oid indexOid, bool recheck, LOCKMOD * might put recently-dead tuples out-of-order in the new table, and there * is little harm in that.) */ - if (!OldIndex->rd_index->indisvalid) + if (!IndexIsValid(OldIndex->rd_index)) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot cluster on invalid index \"%s\"", @@ -458,6 +458,11 @@ check_index_is_clusterable(Relation OldHeap, Oid indexOid, bool recheck, LOCKMOD * mark_index_clustered: mark the specified index as the one clustered on * * With indexOid == InvalidOid, will mark all indexes of rel not-clustered. + * + * Note: we do transactional updates of the pg_index rows, which are unsafe + * against concurrent SnapshotNow scans of pg_index. Therefore this is unsafe + * to execute with less than full exclusive lock on the parent table; + * otherwise concurrent executions of RelationGetIndexList could miss indexes. */ void mark_index_clustered(Relation rel, Oid indexOid) @@ -513,6 +518,9 @@ mark_index_clustered(Relation rel, Oid indexOid) } else if (thisIndexOid == indexOid) { + /* this was checked earlier, but let's be real sure */ + if (!IndexIsValid(indexForm)) + elog(ERROR, "cannot cluster on invalid index %u", indexOid); indexForm->indisclustered = true; simple_heap_update(pg_index, &indexTuple->t_self, indexTuple); CatalogUpdateIndexes(pg_index, indexTuple); diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c index d32839fd6dc..5848305a351 100644 --- a/src/backend/commands/indexcmds.c +++ b/src/backend/commands/indexcmds.c @@ -123,6 +123,7 @@ CheckIndexCompatible(Oid oldId, Oid accessMethodId; Oid relationId; HeapTuple tuple; + Form_pg_index indexForm; Form_pg_am accessMethodForm; bool amcanorder; int16 *coloptions; @@ -192,17 +193,22 @@ CheckIndexCompatible(Oid oldId, tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(oldId)); if (!HeapTupleIsValid(tuple)) elog(ERROR, "cache lookup failed for index %u", oldId); + indexForm = (Form_pg_index) GETSTRUCT(tuple); - /* We don't assess expressions or predicates; assume incompatibility. */ + /* + * We don't assess expressions or predicates; assume incompatibility. + * Also, if the index is invalid for any reason, treat it as incompatible. + */ if (!(heap_attisnull(tuple, Anum_pg_index_indpred) && - heap_attisnull(tuple, Anum_pg_index_indexprs))) + heap_attisnull(tuple, Anum_pg_index_indexprs) && + IndexIsValid(indexForm))) { ReleaseSysCache(tuple); return false; } /* Any change in operator class or collation breaks compatibility. */ - old_natts = ((Form_pg_index) GETSTRUCT(tuple))->indnatts; + old_natts = indexForm->indnatts; Assert(old_natts == numberOfAttributes); d = SysCacheGetAttr(INDEXRELID, tuple, Anum_pg_index_indcollation, &isnull); @@ -319,9 +325,6 @@ DefineIndex(IndexStmt *stmt, LockRelId heaprelid; LOCKTAG heaplocktag; Snapshot snapshot; - Relation pg_index; - HeapTuple indexTuple; - Form_pg_index indexForm; int i; /* @@ -718,23 +721,7 @@ DefineIndex(IndexStmt *stmt, * commit this transaction, any new transactions that open the table must * insert new entries into the index for insertions and non-HOT updates. */ - pg_index = heap_open(IndexRelationId, RowExclusiveLock); - - indexTuple = SearchSysCacheCopy1(INDEXRELID, - ObjectIdGetDatum(indexRelationId)); - if (!HeapTupleIsValid(indexTuple)) - elog(ERROR, "cache lookup failed for index %u", indexRelationId); - indexForm = (Form_pg_index) GETSTRUCT(indexTuple); - - Assert(!indexForm->indisready); - Assert(!indexForm->indisvalid); - - indexForm->indisready = true; - - simple_heap_update(pg_index, &indexTuple->t_self, indexTuple); - CatalogUpdateIndexes(pg_index, indexTuple); - - heap_close(pg_index, RowExclusiveLock); + index_set_state_flags(indexRelationId, INDEX_CREATE_SET_READY); /* we can do away with our snapshot */ PopActiveSnapshot(); @@ -858,23 +845,7 @@ DefineIndex(IndexStmt *stmt, /* * Index can now be marked valid -- update its pg_index entry */ - pg_index = heap_open(IndexRelationId, RowExclusiveLock); - - indexTuple = SearchSysCacheCopy1(INDEXRELID, - ObjectIdGetDatum(indexRelationId)); - if (!HeapTupleIsValid(indexTuple)) - elog(ERROR, "cache lookup failed for index %u", indexRelationId); - indexForm = (Form_pg_index) GETSTRUCT(indexTuple); - - Assert(indexForm->indisready); - Assert(!indexForm->indisvalid); - - indexForm->indisvalid = true; - - simple_heap_update(pg_index, &indexTuple->t_self, indexTuple); - CatalogUpdateIndexes(pg_index, indexTuple); - - heap_close(pg_index, RowExclusiveLock); + index_set_state_flags(indexRelationId, INDEX_CREATE_SET_VALID); /* * The pg_index update will cause backends (including this one) to update @@ -882,7 +853,7 @@ DefineIndex(IndexStmt *stmt, * relcache inval on the parent table to force replanning of cached plans. * Otherwise existing sessions might fail to use the new index where it * would be useful. (Note that our earlier commits did not create reasons - * to replan; relcache flush on the index itself was sufficient.) + * to replan; so relcache flush on the index itself was sufficient.) */ CacheInvalidateRelcacheByRelid(heaprelid.relId); diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index d88e3dee3fa..ab5ab940ade 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -743,10 +743,13 @@ RemoveRelations(DropStmt *drop) int flags = 0; LOCKMODE lockmode = AccessExclusiveLock; + /* DROP CONCURRENTLY uses a weaker lock, and has some restrictions */ if (drop->concurrent) { + flags |= PERFORM_DELETION_CONCURRENTLY; lockmode = ShareUpdateExclusiveLock; - if (list_length(drop->objects) > 1) + Assert(drop->removeType == OBJECT_INDEX); + if (list_length(drop->objects) != 1) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("DROP INDEX CONCURRENTLY does not support dropping multiple objects"))); @@ -838,19 +841,6 @@ RemoveRelations(DropStmt *drop) add_exact_object_address(&obj, objects); } - /* - * Set options and check further requirements for concurrent drop - */ - if (drop->concurrent) - { - /* - * Confirm that concurrent behaviour is restricted in grammar. - */ - Assert(drop->removeType == OBJECT_INDEX); - - flags |= PERFORM_DELETION_CONCURRENTLY; - } - performMultipleDeletions(objects, drop->behavior, flags); free_object_addresses(objects); @@ -917,7 +907,7 @@ RangeVarCallbackForDropRelation(const RangeVar *rel, Oid relOid, Oid oldRelOid, * locking the index. index_drop() will need this anyway, and since * regular queries lock tables before their indexes, we risk deadlock if * we do it the other way around. No error if we don't find a pg_index - * entry, though --- the relation may have been droppd. + * entry, though --- the relation may have been dropped. */ if (relkind == RELKIND_INDEX && relOid != oldRelOid) { @@ -4783,6 +4773,8 @@ ATExecDropNotNull(Relation rel, const char *colName, LOCKMODE lockmode) /* * Check that the attribute is not in a primary key + * + * Note: we'll throw error even if the pkey index is not valid. */ /* Loop over all indexes on the relation */ @@ -6317,7 +6309,7 @@ transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid, /* * Get the list of index OIDs for the table from the relcache, and look up * each one in the pg_index syscache until we find one marked primary key - * (hopefully there isn't more than one such). + * (hopefully there isn't more than one such). Insist it's valid, too. */ *indexOid = InvalidOid; @@ -6331,7 +6323,7 @@ transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid, if (!HeapTupleIsValid(indexTuple)) elog(ERROR, "cache lookup failed for index %u", indexoid); indexStruct = (Form_pg_index) GETSTRUCT(indexTuple); - if (indexStruct->indisprimary) + if (indexStruct->indisprimary && IndexIsValid(indexStruct)) { /* * Refuse to use a deferrable primary key. This is per SQL spec, @@ -6429,10 +6421,12 @@ transformFkeyCheckAttrs(Relation pkrel, /* * Must have the right number of columns; must be unique and not a - * partial index; forget it if there are any expressions, too + * partial index; forget it if there are any expressions, too. Invalid + * indexes are out as well. */ if (indexStruct->indnatts == numattrs && indexStruct->indisunique && + IndexIsValid(indexStruct) && heap_attisnull(indexTuple, Anum_pg_index_indpred) && heap_attisnull(indexTuple, Anum_pg_index_indexprs)) { diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c index 710c2afc9f3..9b5b79fdcc5 100644 --- a/src/backend/commands/vacuum.c +++ b/src/backend/commands/vacuum.c @@ -1096,9 +1096,16 @@ vacuum_rel(Oid relid, VacuumStmt *vacstmt, bool do_toast, bool for_wraparound) /* - * Open all the indexes of the given relation, obtaining the specified kind - * of lock on each. Return an array of Relation pointers for the indexes - * into *Irel, and the number of indexes into *nindexes. + * Open all the vacuumable indexes of the given relation, obtaining the + * specified kind of lock on each. Return an array of Relation pointers for + * the indexes into *Irel, and the number of indexes into *nindexes. + * + * We consider an index vacuumable if it is marked insertable (IndexIsReady). + * If it isn't, probably a CREATE INDEX CONCURRENTLY command failed early in + * execution, and what we have is too corrupt to be processable. We will + * vacuum even if the index isn't indisvalid; this is important because in a + * unique index, uniqueness checks will be performed anyway and had better not + * hit dangling index pointers. */ void vac_open_indexes(Relation relation, LOCKMODE lockmode, @@ -1112,21 +1119,30 @@ vac_open_indexes(Relation relation, LOCKMODE lockmode, indexoidlist = RelationGetIndexList(relation); - *nindexes = list_length(indexoidlist); + /* allocate enough memory for all indexes */ + i = list_length(indexoidlist); - if (*nindexes > 0) - *Irel = (Relation *) palloc(*nindexes * sizeof(Relation)); + if (i > 0) + *Irel = (Relation *) palloc(i * sizeof(Relation)); else *Irel = NULL; + /* collect just the ready indexes */ i = 0; foreach(indexoidscan, indexoidlist) { Oid indexoid = lfirst_oid(indexoidscan); + Relation indrel; - (*Irel)[i++] = index_open(indexoid, lockmode); + indrel = index_open(indexoid, lockmode); + if (IndexIsReady(indrel->rd_index)) + (*Irel)[i++] = indrel; + else + index_close(indrel, lockmode); } + *nindexes = i; + list_free(indexoidlist); } |