aboutsummaryrefslogtreecommitdiff
path: root/src/backend/partitioning/partdesc.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/partitioning/partdesc.c')
-rw-r--r--src/backend/partitioning/partdesc.c185
1 files changed, 166 insertions, 19 deletions
diff --git a/src/backend/partitioning/partdesc.c b/src/backend/partitioning/partdesc.c
index 8a4b63aa268..a4494aca7aa 100644
--- a/src/backend/partitioning/partdesc.c
+++ b/src/backend/partitioning/partdesc.c
@@ -14,17 +14,39 @@
#include "postgres.h"
+#include "access/genam.h"
+#include "access/htup_details.h"
+#include "access/table.h"
+#include "catalog/indexing.h"
#include "catalog/partition.h"
#include "catalog/pg_inherits.h"
#include "partitioning/partbounds.h"
#include "partitioning/partdesc.h"
+#include "storage/bufmgr.h"
+#include "storage/sinval.h"
#include "utils/builtins.h"
+#include "utils/inval.h"
+#include "utils/fmgroids.h"
+#include "utils/hsearch.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/partcache.h"
#include "utils/syscache.h"
+typedef struct PartitionDirectoryData
+{
+ MemoryContext pdir_mcxt; /* context passed to CreatePartitionDirectory */
+ HTAB *pdir_hash; /* relation OID -> PartitionDirectoryEntry */
+} PartitionDirectoryData;
+
+typedef struct PartitionDirectoryEntry
+{
+ Oid reloid; /* OID of the relation; used as the hash lookup key */
+ Relation rel; /* relation on which we hold a reference count */
+ PartitionDesc pd; /* descriptor returned for this relation */
+} PartitionDirectoryEntry;
+
/*
* RelationBuildPartitionDesc
* Form rel's partition descriptor
@@ -47,43 +69,93 @@ RelationBuildPartitionDesc(Relation rel)
MemoryContext oldcxt;
int *mapping;
- /* Get partition oids from pg_inherits */
+ /*
+ * Get partition oids from pg_inherits. This uses a single snapshot to
+ * fetch the list of children, so while more children may be getting
+ * added concurrently, whatever this function returns will be accurate
+ * as of some well-defined point in time.
+ */
inhoids = find_inheritance_children(RelationGetRelid(rel), NoLock);
nparts = list_length(inhoids);
+ /* Allocate arrays for OIDs and boundspecs. */
if (nparts > 0)
{
oids = palloc(nparts * sizeof(Oid));
boundspecs = palloc(nparts * sizeof(PartitionBoundSpec *));
}
- /* Collect bound spec nodes for each partition */
+ /* Collect bound spec nodes for each partition. */
i = 0;
foreach(cell, inhoids)
{
Oid inhrelid = lfirst_oid(cell);
HeapTuple tuple;
- Datum datum;
- bool isnull;
- PartitionBoundSpec *boundspec;
+ PartitionBoundSpec *boundspec = NULL;
+ /* Try fetching the tuple from the catcache, for speed. */
tuple = SearchSysCache1(RELOID, inhrelid);
- if (!HeapTupleIsValid(tuple))
- elog(ERROR, "cache lookup failed for relation %u", inhrelid);
-
- datum = SysCacheGetAttr(RELOID, tuple,
- Anum_pg_class_relpartbound,
- &isnull);
- if (isnull)
- elog(ERROR, "null relpartbound for relation %u", inhrelid);
- boundspec = stringToNode(TextDatumGetCString(datum));
+ if (HeapTupleIsValid(tuple))
+ {
+ Datum datum;
+ bool isnull;
+
+ datum = SysCacheGetAttr(RELOID, tuple,
+ Anum_pg_class_relpartbound,
+ &isnull);
+ if (!isnull)
+ boundspec = stringToNode(TextDatumGetCString(datum));
+ ReleaseSysCache(tuple);
+ }
+
+ /*
+ * The system cache may be out of date; if so, we may find no pg_class
+ * tuple or an old one where relpartbound is NULL. In that case, try
+ * the table directly. We can't just AcceptInvalidationMessages() and
+ * retry the system cache lookup because it's possible that a
+ * concurrent ATTACH PARTITION operation has removed itself from the
+ * ProcArray but not yet added invalidation messages to the shared queue;
+ * InvalidateSystemCaches() would work, but seems excessive.
+ *
+ * Note that this algorithm assumes that the PartitionBoundSpec we manage
+ * to fetch is the right one -- so this is only good enough for
+ * concurrent ATTACH PARTITION, not concurrent DETACH PARTITION
+ * or some hypothetical operation that changes the partition bounds.
+ */
+ if (boundspec == NULL)
+ {
+ Relation pg_class;
+ SysScanDesc scan;
+ ScanKeyData key[1];
+
+ pg_class = table_open(RelationRelationId, AccessShareLock);
+ ScanKeyInit(&key[0], Anum_pg_class_oid,
+ BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(inhrelid));
+ scan = systable_beginscan(pg_class, ClassOidIndexId, true, NULL, 1, key);
+ tuple = systable_getnext(scan);
+ if (HeapTupleIsValid(tuple)) /* no row found: leave boundspec NULL */
+ {
+ Datum datum;
+ bool isnull;
+ datum = heap_getattr(tuple, Anum_pg_class_relpartbound,
+ RelationGetDescr(pg_class), &isnull);
+ if (!isnull)
+ boundspec = stringToNode(TextDatumGetCString(datum));
+ }
+ systable_endscan(scan);
+ table_close(pg_class, AccessShareLock);
+ }
+
+ /* Sanity checks. */
+ if (!boundspec)
+ elog(ERROR, "missing relpartbound for relation %u", inhrelid);
if (!IsA(boundspec, PartitionBoundSpec))
elog(ERROR, "invalid relpartbound for relation %u", inhrelid);
/*
- * Sanity check: If the PartitionBoundSpec says this is the default
- * partition, its OID should correspond to whatever's stored in
- * pg_partitioned_table.partdefid; if not, the catalog is corrupt.
+ * If the PartitionBoundSpec says this is the default partition, its
+ * OID should match pg_partitioned_table.partdefid; if not, the
+ * catalog is corrupt.
*/
if (boundspec->is_default)
{
@@ -95,10 +167,10 @@ RelationBuildPartitionDesc(Relation rel)
inhrelid, partdefid);
}
+ /* Save results. */
oids[i] = inhrelid;
boundspecs[i] = boundspec;
++i;
- ReleaseSysCache(tuple);
}
/* Now build the actual relcache partition descriptor */
@@ -143,7 +215,7 @@ RelationBuildPartitionDesc(Relation rel)
partdesc->oids[index] = oids[i];
/* Record if the partition is a leaf partition */
partdesc->is_leaf[index] =
- (get_rel_relkind(oids[i]) != RELKIND_PARTITIONED_TABLE);
+ (get_rel_relkind(oids[i]) != RELKIND_PARTITIONED_TABLE);
}
MemoryContextSwitchTo(oldcxt);
@@ -151,6 +223,81 @@ RelationBuildPartitionDesc(Relation rel)
}
/*
+ * CreatePartitionDirectory
+ * Create a new, empty partition directory object that uses mcxt.
+ */
+PartitionDirectory
+CreatePartitionDirectory(MemoryContext mcxt)
+{
+ MemoryContext oldcontext = MemoryContextSwitchTo(mcxt);
+ PartitionDirectory pdir;
+ HASHCTL ctl;
+
+ MemSet(&ctl, 0, sizeof(HASHCTL));
+ ctl.keysize = sizeof(Oid);
+ ctl.entrysize = sizeof(PartitionDirectoryEntry);
+ ctl.hcxt = mcxt;
+
+ pdir = palloc(sizeof(PartitionDirectoryData));
+ pdir->pdir_mcxt = mcxt;
+ pdir->pdir_hash = hash_create("partition directory", 256, &ctl,
+ HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+
+ MemoryContextSwitchTo(oldcontext);
+ return pdir;
+}
+
+/*
+ * PartitionDirectoryLookup
+ * Look up the partition descriptor for a relation in the directory.
+ *
+ * The purpose of this function is to ensure that we get the same
+ * PartitionDesc for each relation every time we look it up. In the
+ * face of concurrent DDL, different PartitionDescs may be constructed with
+ * different views of the catalog state, but any single particular OID
+ * will always get the same PartitionDesc for as long as the same
+ * PartitionDirectory is used.
+ */
+PartitionDesc
+PartitionDirectoryLookup(PartitionDirectory pdir, Relation rel)
+{
+ PartitionDirectoryEntry *pde;
+ Oid relid = RelationGetRelid(rel);
+ bool found;
+
+ pde = hash_search(pdir->pdir_hash, &relid, HASH_ENTER, &found);
+ if (!found)
+ {
+ /*
+ * First lookup for this OID: take a reference count on the relation
+ * so that the PartitionDesc we remember here can't get destroyed.
+ */
+ RelationIncrementReferenceCount(rel);
+ pde->rel = rel;
+ pde->pd = RelationGetPartitionDesc(rel);
+ Assert(pde->pd != NULL);
+ }
+ return pde->pd;
+}
+
+/*
+ * DestroyPartitionDirectory
+ * Destroy a partition directory.
+ *
+ * Release the relation reference counts held by the directory's entries.
+ */
+void
+DestroyPartitionDirectory(PartitionDirectory pdir)
+{
+ HASH_SEQ_STATUS status;
+ PartitionDirectoryEntry *pde;
+
+ hash_seq_init(&status, pdir->pdir_hash);
+ while ((pde = hash_seq_search(&status)) != NULL)
+ RelationDecrementReferenceCount(pde->rel);
+}
+
+/*
* equalPartitionDescs
* Compare two partition descriptors for logical equality
*/