diff options
Diffstat (limited to 'src/backend/commands/tablecmds.c')
-rw-r--r-- | src/backend/commands/tablecmds.c | 819 |
1 files changed, 17 insertions, 802 deletions
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 82dabe636f8..2cc736aecca 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -657,11 +657,6 @@ static void ATDetachCheckNoForeignKeyRefs(Relation partition); static char GetAttributeCompression(Oid atttypid, const char *compression); static char GetAttributeStorage(Oid atttypid, const char *storagemode); -static void ATExecSplitPartition(List **wqueue, AlteredTableInfo *tab, - Relation rel, PartitionCmd *cmd, - AlterTableUtilityContext *context); -static void ATExecMergePartitions(List **wqueue, AlteredTableInfo *tab, Relation rel, - PartitionCmd *cmd, AlterTableUtilityContext *context); /* ---------------------------------------------------------------- * DefineRelation @@ -4691,14 +4686,6 @@ AlterTableGetLockLevel(List *cmds) cmd_lockmode = ShareUpdateExclusiveLock; break; - case AT_SplitPartition: - cmd_lockmode = AccessExclusiveLock; - break; - - case AT_MergePartitions: - cmd_lockmode = AccessExclusiveLock; - break; - case AT_CheckNotNull: /* @@ -5125,16 +5112,6 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, /* No command-specific prep needed */ pass = AT_PASS_MISC; break; - case AT_SplitPartition: - ATSimplePermissions(cmd->subtype, rel, ATT_TABLE); - /* No command-specific prep needed */ - pass = AT_PASS_MISC; - break; - case AT_MergePartitions: - ATSimplePermissions(cmd->subtype, rel, ATT_TABLE); - /* No command-specific prep needed */ - pass = AT_PASS_MISC; - break; default: /* oops */ elog(ERROR, "unrecognized alter table type: %d", (int) cmd->subtype); @@ -5531,22 +5508,6 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, case AT_DetachPartitionFinalize: address = ATExecDetachPartitionFinalize(rel, ((PartitionCmd *) cmd->def)->name); break; - case AT_SplitPartition: - cmd = ATParseTransformCmd(wqueue, tab, rel, cmd, false, lockmode, - cur_pass, context); - Assert(cmd != NULL); - Assert(rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE); - ATExecSplitPartition(wqueue, tab, rel, (PartitionCmd *) cmd->def, - context); - break; - case AT_MergePartitions: - cmd = ATParseTransformCmd(wqueue, tab, rel, cmd, false, lockmode, - cur_pass, context); - Assert(cmd != NULL); - Assert(rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE); - ATExecMergePartitions(wqueue, tab, rel, (PartitionCmd *) cmd->def, - context); - break; default: /* oops */ elog(ERROR, "unrecognized alter table type: %d", (int) cmd->subtype); @@ -6535,10 +6496,6 @@ alter_table_type_to_string(AlterTableType cmdtype) return "DETACH PARTITION"; case AT_DetachPartitionFinalize: return "DETACH PARTITION ... FINALIZE"; - case AT_SplitPartition: - return "SPLIT PARTITION"; - case AT_MergePartitions: - return "MERGE PARTITIONS"; case AT_AddIdentity: return "ALTER COLUMN ... ADD IDENTITY"; case AT_SetIdentity: @@ -18337,37 +18294,6 @@ QueuePartitionConstraintValidation(List **wqueue, Relation scanrel, } /* - * attachPartitionTable: attach a new partition to the partitioned table - * - * wqueue: the ALTER TABLE work queue; can be NULL when not running as part - * of an ALTER TABLE sequence. - * rel: partitioned relation; - * attachrel: relation of attached partition; - * bound: bounds of attached relation. - */ -static void -attachPartitionTable(List **wqueue, Relation rel, Relation attachrel, PartitionBoundSpec *bound) -{ - /* OK to create inheritance. Rest of the checks performed there */ - CreateInheritance(attachrel, rel, true); - - /* Update the pg_class entry. */ - StorePartitionBound(attachrel, rel, bound); - - /* Ensure there exists a correct set of indexes in the partition. */ - AttachPartitionEnsureIndexes(wqueue, rel, attachrel); - - /* and triggers */ - CloneRowTriggersToPartition(rel, attachrel); - - /* - * Clone foreign key constraints. Callee is responsible for setting up - * for phase 3 constraint verification. - */ - CloneForeignKeyConstraints(wqueue, rel, attachrel); -} - -/* * ALTER TABLE <name> ATTACH PARTITION <partition-name> FOR VALUES * * Return the address of the newly attached partition. @@ -18569,8 +18495,23 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd, check_new_partition_bound(RelationGetRelationName(attachrel), rel, cmd->bound, pstate); - /* Attach a new partition to the partitioned table. */ - attachPartitionTable(wqueue, rel, attachrel, cmd->bound); + /* OK to create inheritance. Rest of the checks performed there */ + CreateInheritance(attachrel, rel, true); + + /* Update the pg_class entry. */ + StorePartitionBound(attachrel, rel, cmd->bound); + + /* Ensure there exists a correct set of indexes in the partition. */ + AttachPartitionEnsureIndexes(wqueue, rel, attachrel); + + /* and triggers */ + CloneRowTriggersToPartition(rel, attachrel); + + /* + * Clone foreign key constraints. Callee is responsible for setting up + * for phase 3 constraint verification. + */ + CloneForeignKeyConstraints(wqueue, rel, attachrel); /* * Generate partition constraint from the partition bound specification. @@ -20085,729 +20026,3 @@ GetAttributeStorage(Oid atttypid, const char *storagemode) return cstorage; } - -/* - * Struct with context of new partition for inserting rows from split partition - */ -typedef struct SplitPartitionContext -{ - ExprState *partqualstate; /* expression for checking slot for partition - * (NULL for DEFAULT partition) */ - BulkInsertState bistate; /* state of bulk inserts for partition */ - TupleTableSlot *dstslot; /* slot for inserting row into partition */ - Relation partRel; /* relation for partition */ -} SplitPartitionContext; - - -/* - * createSplitPartitionContext: create context for partition and fill it - */ -static SplitPartitionContext * -createSplitPartitionContext(Relation partRel) -{ - SplitPartitionContext *pc; - - pc = (SplitPartitionContext *) palloc0(sizeof(SplitPartitionContext)); - pc->partRel = partRel; - - /* - * Prepare a BulkInsertState for table_tuple_insert. The FSM is empty, so - * don't bother using it. - */ - pc->bistate = GetBulkInsertState(); - - /* Create tuple slot for new partition. */ - pc->dstslot = MakeSingleTupleTableSlot(RelationGetDescr(pc->partRel), - table_slot_callbacks(pc->partRel)); - ExecStoreAllNullTuple(pc->dstslot); - - return pc; -} - -/* - * deleteSplitPartitionContext: delete context for partition - */ -static void -deleteSplitPartitionContext(SplitPartitionContext *pc, int ti_options) -{ - ExecDropSingleTupleTableSlot(pc->dstslot); - FreeBulkInsertState(pc->bistate); - - table_finish_bulk_insert(pc->partRel, ti_options); - - pfree(pc); -} - -/* - * moveSplitTableRows: scan split partition (splitRel) of partitioned table - * (rel) and move rows into new partitions. - * - * New partitions description: - * partlist: list of pointers to SinglePartitionSpec structures. - * newPartRels: list of Relations. - * defaultPartOid: oid of DEFAULT partition, for table rel. - */ -static void -moveSplitTableRows(Relation rel, Relation splitRel, List *partlist, List *newPartRels, Oid defaultPartOid) -{ - /* The FSM is empty, so don't bother using it. */ - int ti_options = TABLE_INSERT_SKIP_FSM; - CommandId mycid; - EState *estate; - ListCell *listptr, - *listptr2; - TupleTableSlot *srcslot; - ExprContext *econtext; - TableScanDesc scan; - Snapshot snapshot; - MemoryContext oldCxt; - List *partContexts = NIL; - TupleConversionMap *tuple_map; - SplitPartitionContext *defaultPartCtx = NULL, - *pc; - bool isOldDefaultPart = false; - - mycid = GetCurrentCommandId(true); - - estate = CreateExecutorState(); - - forboth(listptr, partlist, listptr2, newPartRels) - { - SinglePartitionSpec *sps = (SinglePartitionSpec *) lfirst(listptr); - - pc = createSplitPartitionContext((Relation) lfirst(listptr2)); - - if (sps->bound->is_default) - { - /* We should not create constraint for detached DEFAULT partition. */ - defaultPartCtx = pc; - } - else - { - List *partConstraint; - - /* Build expression execution states for partition check quals. */ - partConstraint = get_qual_from_partbound(rel, sps->bound); - partConstraint = - (List *) eval_const_expressions(NULL, - (Node *) partConstraint); - /* Make boolean expression for ExecCheck(). */ - partConstraint = list_make1(make_ands_explicit(partConstraint)); - - /* - * Map the vars in the constraint expression from rel's attnos to - * splitRel's. - */ - partConstraint = map_partition_varattnos(partConstraint, - 1, splitRel, rel); - - pc->partqualstate = - ExecPrepareExpr((Expr *) linitial(partConstraint), estate); - Assert(pc->partqualstate != NULL); - } - - /* Store partition context into list. */ - partContexts = lappend(partContexts, pc); - } - - /* - * Create partition context for DEFAULT partition. We can insert values - * into this partition in case spaces with values between new partitions. - */ - if (!defaultPartCtx && OidIsValid(defaultPartOid)) - { - /* Indicate that we allocate context for old DEFAULT partition */ - isOldDefaultPart = true; - defaultPartCtx = createSplitPartitionContext(table_open(defaultPartOid, AccessExclusiveLock)); - } - - econtext = GetPerTupleExprContext(estate); - - /* Create necessary tuple slot. */ - srcslot = MakeSingleTupleTableSlot(RelationGetDescr(splitRel), - table_slot_callbacks(splitRel)); - - /* - * Map computing for moving attributes of split partition to new partition - * (for first new partition, but other new partitions can use the same - * map). - */ - pc = (SplitPartitionContext *) lfirst(list_head(partContexts)); - tuple_map = convert_tuples_by_name(RelationGetDescr(splitRel), - RelationGetDescr(pc->partRel)); - - /* Scan through the rows. */ - snapshot = RegisterSnapshot(GetLatestSnapshot()); - scan = table_beginscan(splitRel, snapshot, 0, NULL); - - /* - * Switch to per-tuple memory context and reset it for each tuple - * produced, so we don't leak memory. - */ - oldCxt = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - - while (table_scan_getnextslot(scan, ForwardScanDirection, srcslot)) - { - bool found = false; - TupleTableSlot *insertslot; - - /* Extract data from old tuple. */ - slot_getallattrs(srcslot); - - econtext->ecxt_scantuple = srcslot; - - /* Search partition for current slot srcslot. */ - foreach(listptr, partContexts) - { - pc = (SplitPartitionContext *) lfirst(listptr); - - if (pc->partqualstate /* skip DEFAULT partition */ && - ExecCheck(pc->partqualstate, econtext)) - { - found = true; - break; - } - ResetExprContext(econtext); - } - if (!found) - { - /* Use DEFAULT partition if it exists. */ - if (defaultPartCtx) - pc = defaultPartCtx; - else - ereport(ERROR, - (errcode(ERRCODE_CHECK_VIOLATION), - errmsg("can not find partition for split partition row"), - errtable(splitRel))); - } - - if (tuple_map) - { - /* Need to use map to copy attributes. */ - insertslot = execute_attr_map_slot(tuple_map->attrMap, srcslot, pc->dstslot); - } - else - { - /* Copy attributes directly. */ - insertslot = pc->dstslot; - - ExecClearTuple(insertslot); - - memcpy(insertslot->tts_values, srcslot->tts_values, - sizeof(Datum) * srcslot->tts_nvalid); - memcpy(insertslot->tts_isnull, srcslot->tts_isnull, - sizeof(bool) * srcslot->tts_nvalid); - - ExecStoreVirtualTuple(insertslot); - } - - /* Write the tuple out to the new relation. */ - table_tuple_insert(pc->partRel, insertslot, mycid, - ti_options, pc->bistate); - - ResetExprContext(econtext); - - CHECK_FOR_INTERRUPTS(); - } - - MemoryContextSwitchTo(oldCxt); - - table_endscan(scan); - UnregisterSnapshot(snapshot); - - if (tuple_map) - free_conversion_map(tuple_map); - - ExecDropSingleTupleTableSlot(srcslot); - - FreeExecutorState(estate); - - foreach(listptr, partContexts) - deleteSplitPartitionContext((SplitPartitionContext *) lfirst(listptr), ti_options); - - /* Need to close table and free buffers for DEFAULT partition. */ - if (isOldDefaultPart) - { - Relation defaultPartRel = defaultPartCtx->partRel; - - deleteSplitPartitionContext(defaultPartCtx, ti_options); - /* Keep the lock until commit. */ - table_close(defaultPartRel, NoLock); - } -} - -/* - * createPartitionTable: create table for a new partition with given name - * (newPartName) like table (modelRel) - * - * Emulates command: CREATE [TEMP] TABLE <newPartName> (LIKE <modelRel's name> - * INCLUDING ALL EXCLUDING INDEXES EXCLUDING IDENTITY EXCLUDING STATISTICS) - * - * Also, this function sets the new partition access method same as parent - * table access methods (similarly to CREATE TABLE ... PARTITION OF). It - * checks that parent and child tables have compatible persistence. - * - * Function returns the created relation (locked in AccessExclusiveLock mode). - */ -static Relation -createPartitionTable(RangeVar *newPartName, Relation modelRel, - AlterTableUtilityContext *context) -{ - CreateStmt *createStmt; - TableLikeClause *tlc; - PlannedStmt *wrapper; - Relation newRel; - - /* If existing rel is temp, it must belong to this session */ - if (modelRel->rd_rel->relpersistence == RELPERSISTENCE_TEMP && - !modelRel->rd_islocaltemp) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("cannot create as partition of temporary relation of another session"))); - - /* New partition should have the same persistence as modelRel */ - newPartName->relpersistence = modelRel->rd_rel->relpersistence; - - createStmt = makeNode(CreateStmt); - createStmt->relation = newPartName; - createStmt->tableElts = NIL; - createStmt->inhRelations = NIL; - createStmt->constraints = NIL; - createStmt->options = NIL; - createStmt->oncommit = ONCOMMIT_NOOP; - createStmt->tablespacename = get_tablespace_name(modelRel->rd_rel->reltablespace); - createStmt->if_not_exists = false; - createStmt->accessMethod = get_am_name(modelRel->rd_rel->relam); - - tlc = makeNode(TableLikeClause); - tlc->relation = makeRangeVar(get_namespace_name(RelationGetNamespace(modelRel)), - RelationGetRelationName(modelRel), -1); - - /* - * Indexes will be inherited on "attach new partitions" stage, after data - * moving. We also don't copy the extended statistics for consistency - * with CREATE TABLE PARTITION OF. - */ - tlc->options = CREATE_TABLE_LIKE_ALL & - ~(CREATE_TABLE_LIKE_INDEXES | CREATE_TABLE_LIKE_IDENTITY | CREATE_TABLE_LIKE_STATISTICS); - tlc->relationOid = InvalidOid; - tlc->newRelationOid = InvalidOid; - createStmt->tableElts = lappend(createStmt->tableElts, tlc); - - /* Need to make a wrapper PlannedStmt. */ - wrapper = makeNode(PlannedStmt); - wrapper->commandType = CMD_UTILITY; - wrapper->canSetTag = false; - wrapper->utilityStmt = (Node *) createStmt; - wrapper->stmt_location = context->pstmt->stmt_location; - wrapper->stmt_len = context->pstmt->stmt_len; - - ProcessUtility(wrapper, - context->queryString, - false, - PROCESS_UTILITY_SUBCOMMAND, - NULL, - NULL, - None_Receiver, - NULL); - - /* - * Open the new partition with no lock, because we already have - * AccessExclusiveLock placed there after creation. - */ - newRel = table_open(tlc->newRelationOid, NoLock); - - /* - * We intended to create the partition with the same persistence as the - * parent table, but we still need to recheck because that might be - * affected by the search_path. If the parent is permanent, so must be - * all of its partitions. - */ - if (modelRel->rd_rel->relpersistence != RELPERSISTENCE_TEMP && - newRel->rd_rel->relpersistence == RELPERSISTENCE_TEMP) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("cannot create a temporary relation as partition of permanent relation \"%s\"", - RelationGetRelationName(modelRel)))); - - /* Permanent rels cannot be partitions belonging to temporary parent */ - if (newRel->rd_rel->relpersistence != RELPERSISTENCE_TEMP && - modelRel->rd_rel->relpersistence == RELPERSISTENCE_TEMP) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("cannot create a permanent relation as partition of temporary relation \"%s\"", - RelationGetRelationName(modelRel)))); - - return newRel; -} - -/* - * ALTER TABLE <name> SPLIT PARTITION <partition-name> INTO <partition-list> - */ -static void -ATExecSplitPartition(List **wqueue, AlteredTableInfo *tab, Relation rel, - PartitionCmd *cmd, AlterTableUtilityContext *context) -{ - Relation splitRel; - Oid splitRelOid; - char relname[NAMEDATALEN]; - Oid namespaceId; - ListCell *listptr, - *listptr2; - bool isSameName = false; - char tmpRelName[NAMEDATALEN]; - List *newPartRels = NIL; - ObjectAddress object; - Oid defaultPartOid; - - defaultPartOid = get_default_oid_from_partdesc(RelationGetPartitionDesc(rel, true)); - - /* - * We are going to detach and remove this partition: need to use exclusive - * lock for preventing DML-queries to the partition. - */ - splitRel = table_openrv(cmd->name, AccessExclusiveLock); - - splitRelOid = RelationGetRelid(splitRel); - - /* Check descriptions of new partitions. */ - foreach(listptr, cmd->partlist) - { - Oid existing_relid; - SinglePartitionSpec *sps = (SinglePartitionSpec *) lfirst(listptr); - - strlcpy(relname, sps->name->relname, NAMEDATALEN); - - /* - * Look up the namespace in which we are supposed to create the - * partition, check we have permission to create there, lock it - * against concurrent drop, and mark stmt->relation as - * RELPERSISTENCE_TEMP if a temporary namespace is selected. - */ - sps->name->relpersistence = rel->rd_rel->relpersistence; - namespaceId = - RangeVarGetAndCheckCreationNamespace(sps->name, NoLock, NULL); - - /* - * This would fail later on anyway if the relation already exists. But - * by catching it here we can emit a nicer error message. - */ - existing_relid = get_relname_relid(relname, namespaceId); - if (existing_relid == splitRelOid && !isSameName) - /* One new partition can have the same name as split partition. */ - isSameName = true; - else if (existing_relid != InvalidOid) - ereport(ERROR, - (errcode(ERRCODE_DUPLICATE_TABLE), - errmsg("relation \"%s\" already exists", relname))); - } - - /* Detach split partition. */ - RemoveInheritance(splitRel, rel, false); - /* Do the final part of detaching. */ - DetachPartitionFinalize(rel, splitRel, false, defaultPartOid); - - /* - * If new partition has the same name as split partition then we should - * rename split partition for reusing name. - */ - if (isSameName) - { - /* - * We must bump the command counter to make the split partition tuple - * visible for renaming. - */ - CommandCounterIncrement(); - /* Rename partition. */ - sprintf(tmpRelName, "split-%u-%X-tmp", RelationGetRelid(rel), MyProcPid); - RenameRelationInternal(splitRelOid, tmpRelName, false, false); - - /* - * We must bump the command counter to make the split partition tuple - * visible after renaming. - */ - CommandCounterIncrement(); - } - - /* Create new partitions (like split partition), without indexes. */ - foreach(listptr, cmd->partlist) - { - SinglePartitionSpec *sps = (SinglePartitionSpec *) lfirst(listptr); - Relation newPartRel; - - newPartRel = createPartitionTable(sps->name, rel, context); - newPartRels = lappend(newPartRels, newPartRel); - } - - /* Copy data from split partition to new partitions. */ - moveSplitTableRows(rel, splitRel, cmd->partlist, newPartRels, defaultPartOid); - /* Keep the lock until commit. */ - table_close(splitRel, NoLock); - - /* Attach new partitions to partitioned table. */ - forboth(listptr, cmd->partlist, listptr2, newPartRels) - { - SinglePartitionSpec *sps = (SinglePartitionSpec *) lfirst(listptr); - Relation newPartRel = (Relation) lfirst(listptr2); - - /* - * wqueue = NULL: verification for each cloned constraint is not - * needed. - */ - attachPartitionTable(NULL, rel, newPartRel, sps->bound); - /* Keep the lock until commit. */ - table_close(newPartRel, NoLock); - } - - /* Drop split partition. */ - object.classId = RelationRelationId; - object.objectId = splitRelOid; - object.objectSubId = 0; - /* Probably DROP_CASCADE is not needed. */ - performDeletion(&object, DROP_RESTRICT, 0); -} - -/* - * moveMergedTablesRows: scan partitions to be merged (mergingPartitionsList) - * of the partitioned table (rel) and move rows into the new partition - * (newPartRel). - */ -static void -moveMergedTablesRows(Relation rel, List *mergingPartitionsList, - Relation newPartRel) -{ - CommandId mycid; - - /* The FSM is empty, so don't bother using it. */ - int ti_options = TABLE_INSERT_SKIP_FSM; - ListCell *listptr; - BulkInsertState bistate; /* state of bulk inserts for partition */ - TupleTableSlot *dstslot; - - mycid = GetCurrentCommandId(true); - - /* Prepare a BulkInsertState for table_tuple_insert. */ - bistate = GetBulkInsertState(); - - /* Create necessary tuple slot. */ - dstslot = MakeSingleTupleTableSlot(RelationGetDescr(newPartRel), - table_slot_callbacks(newPartRel)); - ExecStoreAllNullTuple(dstslot); - - foreach(listptr, mergingPartitionsList) - { - Relation mergingPartition = (Relation) lfirst(listptr); - TupleTableSlot *srcslot; - TupleConversionMap *tuple_map; - TableScanDesc scan; - Snapshot snapshot; - - /* Create tuple slot for new partition. */ - srcslot = MakeSingleTupleTableSlot(RelationGetDescr(mergingPartition), - table_slot_callbacks(mergingPartition)); - - /* - * Map computing for moving attributes of merged partition to new - * partition. - */ - tuple_map = convert_tuples_by_name(RelationGetDescr(mergingPartition), - RelationGetDescr(newPartRel)); - - /* Scan through the rows. */ - snapshot = RegisterSnapshot(GetLatestSnapshot()); - scan = table_beginscan(mergingPartition, snapshot, 0, NULL); - - while (table_scan_getnextslot(scan, ForwardScanDirection, srcslot)) - { - TupleTableSlot *insertslot; - - /* Extract data from old tuple. */ - slot_getallattrs(srcslot); - - if (tuple_map) - { - /* Need to use map to copy attributes. */ - insertslot = execute_attr_map_slot(tuple_map->attrMap, srcslot, dstslot); - } - else - { - /* Copy attributes directly. */ - insertslot = dstslot; - - ExecClearTuple(insertslot); - - memcpy(insertslot->tts_values, srcslot->tts_values, - sizeof(Datum) * srcslot->tts_nvalid); - memcpy(insertslot->tts_isnull, srcslot->tts_isnull, - sizeof(bool) * srcslot->tts_nvalid); - - ExecStoreVirtualTuple(insertslot); - } - - /* Write the tuple out to the new relation. */ - table_tuple_insert(newPartRel, insertslot, mycid, - ti_options, bistate); - - CHECK_FOR_INTERRUPTS(); - } - - table_endscan(scan); - UnregisterSnapshot(snapshot); - - if (tuple_map) - free_conversion_map(tuple_map); - - ExecDropSingleTupleTableSlot(srcslot); - } - - ExecDropSingleTupleTableSlot(dstslot); - FreeBulkInsertState(bistate); - - table_finish_bulk_insert(newPartRel, ti_options); -} - -/* - * ALTER TABLE <name> MERGE PARTITIONS <partition-list> INTO <partition-name> - */ -static void -ATExecMergePartitions(List **wqueue, AlteredTableInfo *tab, Relation rel, - PartitionCmd *cmd, AlterTableUtilityContext *context) -{ - Relation newPartRel; - ListCell *listptr; - List *mergingPartitionsList = NIL; - Oid defaultPartOid; - Oid namespaceId; - Oid existingRelid; - - /* - * Lock all merged partitions, check them and create list with partitions - * contexts. - */ - foreach(listptr, cmd->partlist) - { - RangeVar *name = (RangeVar *) lfirst(listptr); - Relation mergingPartition; - - /* - * We are going to detach and remove this partition: need to use - * exclusive lock for preventing DML-queries to the partition. - */ - mergingPartition = table_openrv(name, AccessExclusiveLock); - - /* Store a next merging partition into the list. */ - mergingPartitionsList = lappend(mergingPartitionsList, - mergingPartition); - } - - /* - * Look up the namespace in which we are supposed to create the partition, - * check we have permission to create there, lock it against concurrent - * drop, and mark stmt->relation as RELPERSISTENCE_TEMP if a temporary - * namespace is selected. - */ - cmd->name->relpersistence = rel->rd_rel->relpersistence; - namespaceId = - RangeVarGetAndCheckCreationNamespace(cmd->name, NoLock, NULL); - - /* - * Check if this name is already taken. This helps us to detect the - * situation when one of the merging partitions has the same name as the - * new partition. Otherwise, this would fail later on anyway but catching - * this here allows us to emit a nicer error message. - */ - existingRelid = get_relname_relid(cmd->name->relname, namespaceId); - - if (OidIsValid(existingRelid)) - { - Relation sameNamePartition = NULL; - - foreach_ptr(RelationData, mergingPartition, mergingPartitionsList) - { - if (RelationGetRelid(mergingPartition) == existingRelid) - { - sameNamePartition = mergingPartition; - break; - } - } - - if (sameNamePartition) - { - /* - * The new partition has the same name as one of merging - * partitions. - */ - char tmpRelName[NAMEDATALEN]; - - /* Generate temporary name. */ - sprintf(tmpRelName, "merge-%u-%X-tmp", RelationGetRelid(rel), MyProcPid); - - /* - * Rename the existing partition with a temporary name, leaving it - * free for the new partition. We don't need to care about this - * in the future because we're going to eventually drop the - * existing partition anyway. - */ - RenameRelationInternal(RelationGetRelid(sameNamePartition), - tmpRelName, false, false); - - /* - * We must bump the command counter to make the new partition - * tuple visible for rename. - */ - CommandCounterIncrement(); - } - else - { - ereport(ERROR, - (errcode(ERRCODE_DUPLICATE_TABLE), - errmsg("relation \"%s\" already exists", cmd->name->relname))); - } - } - - /* Detach all merged partitions. */ - defaultPartOid = - get_default_oid_from_partdesc(RelationGetPartitionDesc(rel, true)); - foreach(listptr, mergingPartitionsList) - { - Relation mergingPartition = (Relation) lfirst(listptr); - - /* Remove the pg_inherits row first. */ - RemoveInheritance(mergingPartition, rel, false); - /* Do the final part of detaching. */ - DetachPartitionFinalize(rel, mergingPartition, false, defaultPartOid); - } - - /* Create table for new partition, use partitioned table as model. */ - newPartRel = createPartitionTable(cmd->name, rel, context); - - /* Copy data from merged partitions to new partition. */ - moveMergedTablesRows(rel, mergingPartitionsList, newPartRel); - - /* Drop the current partitions before attaching the new one. */ - foreach(listptr, mergingPartitionsList) - { - ObjectAddress object; - Relation mergingPartition = (Relation) lfirst(listptr); - - /* Get relation id before table_close() call. */ - object.objectId = RelationGetRelid(mergingPartition); - object.classId = RelationRelationId; - object.objectSubId = 0; - - /* Keep the lock until commit. */ - table_close(mergingPartition, NoLock); - - performDeletion(&object, DROP_RESTRICT, 0); - } - list_free(mergingPartitionsList); - - /* - * Attach a new partition to the partitioned table. wqueue = NULL: - * verification for each cloned constraint is not needed. - */ - attachPartitionTable(NULL, rel, newPartRel, cmd->bound); - - /* Keep the lock until commit. */ - table_close(newPartRel, NoLock); -} |