diff options
Diffstat (limited to 'src/backend/commands/cluster.c')
-rw-r--r-- | src/backend/commands/cluster.c | 90 |
1 files changed, 59 insertions, 31 deletions
diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c index b4bf08006eb..ffd2aff2364 100644 --- a/src/backend/commands/cluster.c +++ b/src/backend/commands/cluster.c @@ -11,7 +11,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/commands/cluster.c,v 1.121 2004/05/05 04:48:45 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/commands/cluster.c,v 1.122 2004/05/06 16:10:57 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -31,6 +31,7 @@ #include "miscadmin.h" #include "utils/acl.h" #include "utils/fmgroids.h" +#include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/syscache.h" #include "utils/relcache.h" @@ -48,7 +49,6 @@ typedef struct IndexInfo *indexInfo; Oid accessMethodOID; Oid *classOID; - bool isclustered; } IndexAttrs; /* @@ -246,8 +246,7 @@ cluster(ClusterStmt *stmt) static void cluster_rel(RelToCluster *rvtc, bool recheck) { - Relation OldHeap, - OldIndex; + Relation OldHeap; /* Check for user-requested abort. */ CHECK_FOR_INTERRUPTS(); @@ -300,18 +299,41 @@ cluster_rel(RelToCluster *rvtc, bool recheck) /* * We grab exclusive access to the target rel and index for the * duration of the transaction. (This is redundant for the single- - * transaction case, since cluster() already did it.) + * transaction case, since cluster() already did it.) The index + * lock is taken inside check_index_is_clusterable. */ OldHeap = heap_open(rvtc->tableOid, AccessExclusiveLock); - OldIndex = index_open(rvtc->indexOid); + /* Check index is valid to cluster on */ + check_index_is_clusterable(OldHeap, rvtc->indexOid); + + /* rebuild_relation does all the dirty work */ + rebuild_relation(OldHeap, rvtc->indexOid); + + /* NB: rebuild_relation does heap_close() on OldHeap */ +} + +/* + * Verify that the specified index is a legitimate index to cluster on + * + * Side effect: obtains exclusive lock on the index. The caller should + * already have exclusive lock on the table, so the index lock is likely + * redundant, but it seems best to grab it anyway to ensure the index + * definition can't change under us. + */ +void +check_index_is_clusterable(Relation OldHeap, Oid indexOid) +{ + Relation OldIndex; + + OldIndex = index_open(indexOid); LockRelation(OldIndex, AccessExclusiveLock); /* * Check that index is in fact an index on the given relation */ if (OldIndex->rd_index == NULL || - OldIndex->rd_index->indrelid != rvtc->tableOid) + OldIndex->rd_index->indrelid != RelationGetRelid(OldHeap)) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("\"%s\" is not an index for table \"%s\"", @@ -386,11 +408,6 @@ cluster_rel(RelToCluster *rvtc, bool recheck) /* Drop relcache refcnt on OldIndex, but keep lock */ index_close(OldIndex); - - /* rebuild_relation does all the dirty work */ - rebuild_relation(OldHeap, rvtc->indexOid); - - /* NB: rebuild_relation does heap_close() on OldHeap */ } /* @@ -410,13 +427,14 @@ void rebuild_relation(Relation OldHeap, Oid indexOid) { Oid tableOid = RelationGetRelid(OldHeap); + Oid oldClusterIndex; List *indexes; Oid OIDNewHeap; char NewHeapName[NAMEDATALEN]; ObjectAddress object; /* Save the information about all indexes on the relation. */ - indexes = get_indexattr_list(OldHeap, indexOid); + indexes = get_indexattr_list(OldHeap, &oldClusterIndex); /* Close relcache entry, but keep lock until transaction commit */ heap_close(OldHeap, NoLock); @@ -469,7 +487,8 @@ rebuild_relation(Relation OldHeap, Oid indexOid) * Recreate each index on the relation. We do not need * CommandCounterIncrement() because rebuild_indexes does it. */ - rebuild_indexes(tableOid, indexes); + rebuild_indexes(tableOid, indexes, + (OidIsValid(indexOid) ? indexOid : oldClusterIndex)); } /* @@ -572,14 +591,18 @@ copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex) /* * Get the necessary info about the indexes of the relation and - * return a list of IndexAttrs structures. + * return a list of IndexAttrs structures. Also, *OldClusterIndex + * is set to the OID of the existing clustered index, or InvalidOid + * if there is none. */ List * -get_indexattr_list(Relation OldHeap, Oid OldIndex) +get_indexattr_list(Relation OldHeap, Oid *OldClusterIndex) { List *indexes = NIL; List *indlist; + *OldClusterIndex = InvalidOid; + /* Ask the relcache to produce a list of the indexes of the old rel */ foreach(indlist, RelationGetIndexList(OldHeap)) { @@ -598,16 +621,12 @@ get_indexattr_list(Relation OldHeap, Oid OldIndex) palloc(sizeof(Oid) * attrs->indexInfo->ii_NumIndexAttrs); memcpy(attrs->classOID, oldIndex->rd_index->indclass, sizeof(Oid) * attrs->indexInfo->ii_NumIndexAttrs); - /* We adjust the isclustered attribute to correct new state */ - attrs->isclustered = (indexOID == OldIndex); + if (oldIndex->rd_index->indisclustered) + *OldClusterIndex = indexOID; index_close(oldIndex); - /* - * Cons the gathered data into the list. We do not care about - * ordering, and this is more efficient than append. - */ - indexes = lcons(attrs, indexes); + indexes = lappend(indexes, attrs); } return indexes; @@ -616,17 +635,22 @@ get_indexattr_list(Relation OldHeap, Oid OldIndex) /* * Create new indexes and swap the filenodes with old indexes. Then drop * the new index (carrying the old index filenode along). + * + * OIDClusterIndex is the OID of the index to be marked as clustered, or + * InvalidOid if none should be marked clustered. */ void -rebuild_indexes(Oid OIDOldHeap, List *indexes) +rebuild_indexes(Oid OIDOldHeap, List *indexes, Oid OIDClusterIndex) { List *elem; foreach(elem, indexes) { IndexAttrs *attrs = (IndexAttrs *) lfirst(elem); + Oid oldIndexOID = attrs->indexOID; Oid newIndexOID; char newIndexName[NAMEDATALEN]; + bool isclustered; ObjectAddress object; Form_pg_index index; HeapTuple tuple; @@ -634,7 +658,7 @@ rebuild_indexes(Oid OIDOldHeap, List *indexes) /* Create the new index under a temporary name */ snprintf(newIndexName, sizeof(newIndexName), - "pg_temp_%u", attrs->indexOID); + "pg_temp_%u", oldIndexOID); /* * The new index will have primary and constraint status set to @@ -654,26 +678,30 @@ rebuild_indexes(Oid OIDOldHeap, List *indexes) CommandCounterIncrement(); /* Swap the filenodes. */ - swap_relfilenodes(attrs->indexOID, newIndexOID); + swap_relfilenodes(oldIndexOID, newIndexOID); CommandCounterIncrement(); /* * Make sure that indisclustered is correct: it should be set only - * for the index we just clustered on. + * for the index specified by the caller. */ + isclustered = (oldIndexOID == OIDClusterIndex); + pg_index = heap_openr(IndexRelationName, RowExclusiveLock); tuple = SearchSysCacheCopy(INDEXRELID, - ObjectIdGetDatum(attrs->indexOID), + ObjectIdGetDatum(oldIndexOID), 0, 0, 0); if (!HeapTupleIsValid(tuple)) - elog(ERROR, "cache lookup failed for index %u", attrs->indexOID); + elog(ERROR, "cache lookup failed for index %u", oldIndexOID); index = (Form_pg_index) GETSTRUCT(tuple); - if (index->indisclustered != attrs->isclustered) + if (index->indisclustered != isclustered) { - index->indisclustered = attrs->isclustered; + index->indisclustered = isclustered; simple_heap_update(pg_index, &tuple->t_self, tuple); CatalogUpdateIndexes(pg_index, tuple); + /* Ensure we see the update in the index's relcache entry */ + CacheInvalidateRelcacheByRelid(oldIndexOID); } heap_freetuple(tuple); heap_close(pg_index, RowExclusiveLock); |