diff options
Diffstat (limited to 'src/backend/commands/tablecmds.c')
-rw-r--r-- | src/backend/commands/tablecmds.c | 354 |
1 files changed, 337 insertions, 17 deletions
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 317b89f67c3..05e86de8ebc 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -670,6 +670,8 @@ static void ATDetachCheckNoForeignKeyRefs(Relation partition); static char GetAttributeCompression(Oid atttypid, const char *compression); static char GetAttributeStorage(Oid atttypid, const char *storagemode); +static void ATExecMergePartitions(List **wqueue, AlteredTableInfo *tab, Relation rel, + PartitionCmd *cmd, AlterTableUtilityContext *context); /* ---------------------------------------------------------------- * DefineRelation @@ -4738,6 +4740,10 @@ AlterTableGetLockLevel(List *cmds) cmd_lockmode = ShareUpdateExclusiveLock; break; + case AT_MergePartitions: + cmd_lockmode = AccessExclusiveLock; + break; + default: /* oops */ elog(ERROR, "unrecognized alter table type: %d", (int) cmd->subtype); @@ -5157,6 +5163,11 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, /* No command-specific prep needed */ pass = AT_PASS_MISC; break; + case AT_MergePartitions: + ATSimplePermissions(cmd->subtype, rel, ATT_TABLE); + /* No command-specific prep needed */ + pass = AT_PASS_MISC; + break; default: /* oops */ elog(ERROR, "unrecognized alter table type: %d", (int) cmd->subtype); @@ -5554,6 +5565,14 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, case AT_DetachPartitionFinalize: address = ATExecDetachPartitionFinalize(rel, ((PartitionCmd *) cmd->def)->name); break; + case AT_MergePartitions: + cmd = ATParseTransformCmd(wqueue, tab, rel, cmd, false, lockmode, + cur_pass, context); + Assert(cmd != NULL); + Assert(rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE); + ATExecMergePartitions(wqueue, tab, rel, (PartitionCmd *) cmd->def, + context); + break; default: /* oops */ elog(ERROR, "unrecognized alter table type: %d", (int) cmd->subtype); @@ -6548,6 +6567,8 @@ alter_table_type_to_string(AlterTableType cmdtype) return "DETACH PARTITION"; case AT_DetachPartitionFinalize: return "DETACH PARTITION ... FINALIZE"; + case AT_MergePartitions: + return "MERGE PARTITIONS"; case AT_AddIdentity: return "ALTER COLUMN ... ADD IDENTITY"; case AT_SetIdentity: @@ -19043,6 +19064,37 @@ QueuePartitionConstraintValidation(List **wqueue, Relation scanrel, } /* + * attachPartitionTable: attach new partition to partitioned table + * + * wqueue: the ALTER TABLE work queue; can be NULL when not running as part + * of an ALTER TABLE sequence. + * rel: partitioned relation; + * attachrel: relation of attached partition; + * bound: bounds of attached relation. + */ +static void +attachPartitionTable(List **wqueue, Relation rel, Relation attachrel, PartitionBoundSpec *bound) +{ + /* OK to create inheritance. Rest of the checks performed there */ + CreateInheritance(attachrel, rel, true); + + /* Update the pg_class entry. */ + StorePartitionBound(attachrel, rel, bound); + + /* Ensure there exists a correct set of indexes in the partition. */ + AttachPartitionEnsureIndexes(wqueue, rel, attachrel); + + /* and triggers */ + CloneRowTriggersToPartition(rel, attachrel); + + /* + * Clone foreign key constraints. Callee is responsible for setting up + * for phase 3 constraint verification. + */ + CloneForeignKeyConstraints(wqueue, rel, attachrel); +} + +/* * ALTER TABLE <name> ATTACH PARTITION <partition-name> FOR VALUES * * Return the address of the newly attached partition. @@ -19244,23 +19296,8 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd, check_new_partition_bound(RelationGetRelationName(attachrel), rel, cmd->bound, pstate); - /* OK to create inheritance. Rest of the checks performed there */ - CreateInheritance(attachrel, rel, true); - - /* Update the pg_class entry. */ - StorePartitionBound(attachrel, rel, cmd->bound); - - /* Ensure there exists a correct set of indexes in the partition. */ - AttachPartitionEnsureIndexes(wqueue, rel, attachrel); - - /* and triggers */ - CloneRowTriggersToPartition(rel, attachrel); - - /* - * Clone foreign key constraints. Callee is responsible for setting up - * for phase 3 constraint verification. - */ - CloneForeignKeyConstraints(wqueue, rel, attachrel); + /* Attach a new partition to the partitioned table. */ + attachPartitionTable(wqueue, rel, attachrel, cmd->bound); /* * Generate partition constraint from the partition bound specification. @@ -20814,3 +20851,286 @@ GetAttributeStorage(Oid atttypid, const char *storagemode) return cstorage; } + +/* + * createPartitionTable: create table for new partition with given name + * (newPartName) like table (modelRelName) + * + * Emulates command: CREATE TABLE <newPartName> (LIKE <modelRelName> + * INCLUDING ALL EXCLUDING INDEXES EXCLUDING IDENTITY) + */ +static void +createPartitionTable(RangeVar *newPartName, RangeVar *modelRelName, + AlterTableUtilityContext *context) +{ + CreateStmt *createStmt; + TableLikeClause *tlc; + PlannedStmt *wrapper; + + createStmt = makeNode(CreateStmt); + createStmt->relation = newPartName; + createStmt->tableElts = NIL; + createStmt->inhRelations = NIL; + createStmt->constraints = NIL; + createStmt->options = NIL; + createStmt->oncommit = ONCOMMIT_NOOP; + createStmt->tablespacename = NULL; + createStmt->if_not_exists = false; + + tlc = makeNode(TableLikeClause); + tlc->relation = modelRelName; + + /* + * Indexes will be inherited on "attach new partitions" stage, after data + * moving. + */ + tlc->options = CREATE_TABLE_LIKE_ALL & ~(CREATE_TABLE_LIKE_INDEXES | CREATE_TABLE_LIKE_IDENTITY); + tlc->relationOid = InvalidOid; + createStmt->tableElts = lappend(createStmt->tableElts, tlc); + + /* Need to make a wrapper PlannedStmt. */ + wrapper = makeNode(PlannedStmt); + wrapper->commandType = CMD_UTILITY; + wrapper->canSetTag = false; + wrapper->utilityStmt = (Node *) createStmt; + wrapper->stmt_location = context->pstmt->stmt_location; + wrapper->stmt_len = context->pstmt->stmt_len; + + ProcessUtility(wrapper, + context->queryString, + false, + PROCESS_UTILITY_SUBCOMMAND, + NULL, + NULL, + None_Receiver, + NULL); +} + +/* + * moveMergedTablesRows: scan partitions to be merged (mergingPartitionsList) + * of the partitioned table (rel) and move rows into the new partition + * (newPartRel). + */ +static void +moveMergedTablesRows(Relation rel, List *mergingPartitionsList, + Relation newPartRel) +{ + CommandId mycid; + + /* The FSM is empty, so don't bother using it. */ + int ti_options = TABLE_INSERT_SKIP_FSM; + ListCell *listptr; + BulkInsertState bistate; /* state of bulk inserts for partition */ + TupleTableSlot *dstslot; + + mycid = GetCurrentCommandId(true); + + /* Prepare a BulkInsertState for table_tuple_insert. */ + bistate = GetBulkInsertState(); + + /* Create necessary tuple slot. */ + dstslot = MakeSingleTupleTableSlot(RelationGetDescr(newPartRel), + table_slot_callbacks(newPartRel)); + ExecStoreAllNullTuple(dstslot); + + foreach(listptr, mergingPartitionsList) + { + Relation mergingPartition = (Relation) lfirst(listptr); + TupleTableSlot *srcslot; + TupleConversionMap *tuple_map; + TableScanDesc scan; + Snapshot snapshot; + + /* Create tuple slot for new partition. */ + srcslot = MakeSingleTupleTableSlot(RelationGetDescr(mergingPartition), + table_slot_callbacks(mergingPartition)); + + /* + * Map computing for moving attributes of merged partition to new + * partition. + */ + tuple_map = convert_tuples_by_name(RelationGetDescr(mergingPartition), + RelationGetDescr(newPartRel)); + + /* Scan through the rows. */ + snapshot = RegisterSnapshot(GetLatestSnapshot()); + scan = table_beginscan(mergingPartition, snapshot, 0, NULL); + + while (table_scan_getnextslot(scan, ForwardScanDirection, srcslot)) + { + TupleTableSlot *insertslot; + bool insert_indexes; + + /* Extract data from old tuple. */ + slot_getallattrs(srcslot); + + if (tuple_map) + { + /* Need to use map for copy attributes. */ + insertslot = execute_attr_map_slot(tuple_map->attrMap, srcslot, dstslot); + } + else + { + /* Copy attributes directly. */ + insertslot = dstslot; + + ExecClearTuple(insertslot); + + memcpy(insertslot->tts_values, srcslot->tts_values, + sizeof(Datum) * srcslot->tts_nvalid); + memcpy(insertslot->tts_isnull, srcslot->tts_isnull, + sizeof(bool) * srcslot->tts_nvalid); + + ExecStoreVirtualTuple(insertslot); + } + + /* + * Write the tuple out to the new relation. We ignore the + * 'insert_indexes' flag since newPartRel has no indexes anyway. + */ + (void) table_tuple_insert(newPartRel, insertslot, mycid, + ti_options, bistate, &insert_indexes); + + CHECK_FOR_INTERRUPTS(); + } + + table_endscan(scan); + UnregisterSnapshot(snapshot); + + if (tuple_map) + free_conversion_map(tuple_map); + + ExecDropSingleTupleTableSlot(srcslot); + } + + ExecDropSingleTupleTableSlot(dstslot); + FreeBulkInsertState(bistate); + + table_finish_bulk_insert(newPartRel, ti_options); +} + +/* + * ALTER TABLE <name> MERGE PARTITIONS <partition-list> INTO <partition-name> + */ +static void +ATExecMergePartitions(List **wqueue, AlteredTableInfo *tab, Relation rel, + PartitionCmd *cmd, AlterTableUtilityContext *context) +{ + Relation newPartRel; + ListCell *listptr; + List *mergingPartitionsList = NIL; + Oid defaultPartOid; + char tmpRelName[NAMEDATALEN]; + RangeVar *mergePartName = cmd->name; + bool isSameName = false; + + /* + * Lock all merged partitions, check them and create list with partitions + * contexts. + */ + foreach(listptr, cmd->partlist) + { + RangeVar *name = (RangeVar *) lfirst(listptr); + Relation mergingPartition; + + /* + * We are going to detach and remove this partition: need to use + * exclusive lock for prevent DML-queries to the partition. + */ + mergingPartition = table_openrv(name, AccessExclusiveLock); + + if (mergingPartition->rd_rel->relkind != RELKIND_RELATION) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("cannot merge non-table partition \"%s\"", + RelationGetRelationName(mergingPartition)))); + + /* + * Checking that two partitions have the same name was before, in + * function transformPartitionCmdForMerge(). + */ + if (equal(name, cmd->name)) + /* One new partition can have the same name as merged partition. */ + isSameName = true; + + /* Store a next merging partition into the list. */ + mergingPartitionsList = lappend(mergingPartitionsList, + mergingPartition); + } + + /* Detach all merged partitions. */ + defaultPartOid = + get_default_oid_from_partdesc(RelationGetPartitionDesc(rel, true)); + foreach(listptr, mergingPartitionsList) + { + Relation mergingPartition = (Relation) lfirst(listptr); + + /* Remove the pg_inherits row first. */ + RemoveInheritance(mergingPartition, rel, false); + /* Do the final part of detaching. */ + DetachPartitionFinalize(rel, mergingPartition, false, defaultPartOid); + } + + /* Create table for new partition, use partitioned table as model. */ + if (isSameName) + { + /* Create partition table with generated temparary name. */ + sprintf(tmpRelName, "merge-%u-%X-tmp", RelationGetRelid(rel), MyProcPid); + mergePartName = makeRangeVar(get_namespace_name(RelationGetNamespace(rel)), + tmpRelName, -1); + } + createPartitionTable(mergePartName, + makeRangeVar(get_namespace_name(RelationGetNamespace(rel)), + RelationGetRelationName(rel), -1), + context); + + /* + * Open the new partition and acquire exclusive lock on it. This will + * stop all the operations with partitioned table. This might seem + * excessive, but this is the way we make sure nobody is planning queries + * involving merging partitions. + */ + newPartRel = table_openrv(mergePartName, AccessExclusiveLock); + + /* Copy data from merged partitions to new partition. */ + moveMergedTablesRows(rel, mergingPartitionsList, newPartRel); + + /* + * Attach a new partition to the partitioned table. wqueue = NULL: + * verification for each cloned constraint is not need. + */ + attachPartitionTable(NULL, rel, newPartRel, cmd->bound); + + /* Unlock and drop merged partitions. */ + foreach(listptr, mergingPartitionsList) + { + ObjectAddress object; + Relation mergingPartition = (Relation) lfirst(listptr); + + /* Get relation id before table_close() call. */ + object.objectId = RelationGetRelid(mergingPartition); + object.classId = RelationRelationId; + object.objectSubId = 0; + + /* Keep the lock until commit. */ + table_close(mergingPartition, NoLock); + + performDeletion(&object, DROP_RESTRICT, 0); + } + list_free(mergingPartitionsList); + + /* Rename new partition if it is needed. */ + if (isSameName) + { + /* + * We must bump the command counter to make the new partition tuple + * visible for rename. + */ + CommandCounterIncrement(); + /* Rename partition. */ + RenameRelationInternal(RelationGetRelid(newPartRel), + cmd->name->relname, false, false); + } + /* Keep the lock until commit. */ + table_close(newPartRel, NoLock); +} |