aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands/createas.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands/createas.c')
-rw-r--r--src/backend/commands/createas.c423
1 files changed, 423 insertions, 0 deletions
diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
new file mode 100644
index 00000000000..5173f5a3081
--- /dev/null
+++ b/src/backend/commands/createas.c
@@ -0,0 +1,423 @@
+/*-------------------------------------------------------------------------
+ *
+ * createas.c
+ * Execution of CREATE TABLE ... AS, a/k/a SELECT INTO
+ *
+ * We implement this by diverting the query's normal output to a
+ * specialized DestReceiver type.
+ *
+ * Formerly, this command was implemented as a variant of SELECT, which led
+ * to assorted legacy behaviors that we still try to preserve, notably that
+ * we must return a tuples-processed count in the completionTag.
+ *
+ * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/commands/createas.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/reloptions.h"
+#include "access/sysattr.h"
+#include "access/xact.h"
+#include "catalog/toasting.h"
+#include "commands/createas.h"
+#include "commands/prepare.h"
+#include "commands/tablecmds.h"
+#include "parser/parse_clause.h"
+#include "rewrite/rewriteHandler.h"
+#include "storage/smgr.h"
+#include "tcop/tcopprot.h"
+#include "utils/builtins.h"
+#include "utils/lsyscache.h"
+#include "utils/rel.h"
+#include "utils/snapmgr.h"
+
+
+typedef struct
+{
+ DestReceiver pub; /* publicly-known function pointers */
+ IntoClause *into; /* target relation specification */
+ /* These fields are filled by intorel_startup: */
+ Relation rel; /* relation to write to */
+ CommandId output_cid; /* cmin to insert in output tuples */
+ int hi_options; /* heap_insert performance options */
+ BulkInsertState bistate; /* bulk insert state */
+} DR_intorel;
+
+static void intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
+static void intorel_receive(TupleTableSlot *slot, DestReceiver *self);
+static void intorel_shutdown(DestReceiver *self);
+static void intorel_destroy(DestReceiver *self);
+
+
+/*
+ * ExecCreateTableAs -- execute a CREATE TABLE AS command
+ */
+void
+ExecCreateTableAs(CreateTableAsStmt *stmt, const char *queryString,
+ ParamListInfo params, char *completionTag)
+{
+ Query *query = (Query *) stmt->query;
+ IntoClause *into = stmt->into;
+ DestReceiver *dest;
+ List *rewritten;
+ PlannedStmt *plan;
+ QueryDesc *queryDesc;
+ ScanDirection dir;
+
+ /*
+ * Create the tuple receiver object and insert info it will need
+ */
+ dest = CreateIntoRelDestReceiver(into);
+
+ /*
+ * The contained Query could be a SELECT, or an EXECUTE utility command.
+ * If the latter, we just pass it off to ExecuteQuery.
+ */
+ Assert(IsA(query, Query));
+ if (query->commandType == CMD_UTILITY &&
+ IsA(query->utilityStmt, ExecuteStmt))
+ {
+ ExecuteStmt *estmt = (ExecuteStmt *) query->utilityStmt;
+
+ ExecuteQuery(estmt, into, queryString, params, dest, completionTag);
+
+ return;
+ }
+ Assert(query->commandType == CMD_SELECT);
+
+ /*
+ * Parse analysis was done already, but we still have to run the rule
+ * rewriter. We do not do AcquireRewriteLocks: we assume the query either
+ * came straight from the parser, or suitable locks were acquired by
+ * plancache.c.
+ *
+ * Because the rewriter and planner tend to scribble on the input, we make
+ * a preliminary copy of the source querytree. This prevents problems in
+ * the case that CTAS is in a portal or plpgsql function and is executed
+ * repeatedly. (See also the same hack in EXPLAIN and PREPARE.)
+ */
+ rewritten = QueryRewrite((Query *) copyObject(stmt->query));
+
+ /* SELECT should never rewrite to more or less than one SELECT query */
+ if (list_length(rewritten) != 1)
+ elog(ERROR, "unexpected rewrite result for CREATE TABLE AS SELECT");
+ query = (Query *) linitial(rewritten);
+ Assert(query->commandType == CMD_SELECT);
+
+ /* plan the query */
+ plan = pg_plan_query(query, 0, params);
+
+ /*
+ * Use a snapshot with an updated command ID to ensure this query sees
+ * results of any previously executed queries. (This could only matter
+ * if the planner executed an allegedly-stable function that changed
+ * the database contents, but let's do it anyway to be parallel to the
+ * EXPLAIN code path.)
+ */
+ PushCopiedSnapshot(GetActiveSnapshot());
+ UpdateActiveSnapshotCommandId();
+
+ /* Create a QueryDesc, redirecting output to our tuple receiver */
+ queryDesc = CreateQueryDesc(plan, queryString,
+ GetActiveSnapshot(), InvalidSnapshot,
+ dest, params, 0);
+
+ /* call ExecutorStart to prepare the plan for execution */
+ ExecutorStart(queryDesc, GetIntoRelEFlags(into));
+
+ /*
+ * Normally, we run the plan to completion; but if skipData is specified,
+ * just do tuple receiver startup and shutdown.
+ */
+ if (into->skipData)
+ dir = NoMovementScanDirection;
+ else
+ dir = ForwardScanDirection;
+
+ /* run the plan */
+ ExecutorRun(queryDesc, dir, 0L);
+
+ /* save the rowcount if we're given a completionTag to fill */
+ if (completionTag)
+ snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
+ "SELECT %u", queryDesc->estate->es_processed);
+
+ /* and clean up */
+ ExecutorFinish(queryDesc);
+ ExecutorEnd(queryDesc);
+
+ FreeQueryDesc(queryDesc);
+
+ PopActiveSnapshot();
+}
+
+/*
+ * GetIntoRelEFlags --- compute executor flags needed for CREATE TABLE AS
+ *
+ * This is exported because EXPLAIN and PREPARE need it too. (Note: those
+ * callers still need to deal explicitly with the skipData flag; since they
+ * use different methods for suppressing execution, it doesn't seem worth
+ * trying to encapsulate that part.)
+ */
+int
+GetIntoRelEFlags(IntoClause *intoClause)
+{
+ /*
+ * We need to tell the executor whether it has to produce OIDs or not,
+ * because it doesn't have enough information to do so itself (since we
+ * can't build the target relation until after ExecutorStart).
+ */
+ if (interpretOidsOption(intoClause->options))
+ return EXEC_FLAG_WITH_OIDS;
+ else
+ return EXEC_FLAG_WITHOUT_OIDS;
+}
+
+/*
+ * CreateIntoRelDestReceiver -- create a suitable DestReceiver object
+ *
+ * intoClause will be NULL if called from CreateDestReceiver(), in which
+ * case it has to be provided later. However, it is convenient to allow
+ * self->into to be filled in immediately for other callers.
+ */
+DestReceiver *
+CreateIntoRelDestReceiver(IntoClause *intoClause)
+{
+ DR_intorel *self = (DR_intorel *) palloc0(sizeof(DR_intorel));
+
+ self->pub.receiveSlot = intorel_receive;
+ self->pub.rStartup = intorel_startup;
+ self->pub.rShutdown = intorel_shutdown;
+ self->pub.rDestroy = intorel_destroy;
+ self->pub.mydest = DestIntoRel;
+ self->into = intoClause;
+ /* other private fields will be set during intorel_startup */
+
+ return (DestReceiver *) self;
+}
+
+/*
+ * intorel_startup --- executor startup
+ */
+static void
+intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
+{
+ DR_intorel *myState = (DR_intorel *) self;
+ IntoClause *into = myState->into;
+ CreateStmt *create;
+ Oid intoRelationId;
+ Relation intoRelationDesc;
+ RangeTblEntry *rte;
+ Datum toast_options;
+ ListCell *lc;
+ int attnum;
+ static char *validnsps[] = HEAP_RELOPT_NAMESPACES;
+
+ Assert(into != NULL); /* else somebody forgot to set it */
+
+ /*
+ * Create the target relation by faking up a CREATE TABLE parsetree and
+ * passing it to DefineRelation.
+ */
+ create = makeNode(CreateStmt);
+ create->relation = into->rel;
+ create->tableElts = NIL; /* will fill below */
+ create->inhRelations = NIL;
+ create->ofTypename = NULL;
+ create->constraints = NIL;
+ create->options = into->options;
+ create->oncommit = into->onCommit;
+ create->tablespacename = into->tableSpaceName;
+ create->if_not_exists = false;
+
+ /*
+ * Build column definitions using "pre-cooked" type and collation info.
+ * If a column name list was specified in CREATE TABLE AS, override the
+ * column names derived from the query. (Too few column names are OK, too
+ * many are not.)
+ */
+ lc = list_head(into->colNames);
+ for (attnum = 0; attnum < typeinfo->natts; attnum++)
+ {
+ Form_pg_attribute attribute = typeinfo->attrs[attnum];
+ ColumnDef *col = makeNode(ColumnDef);
+ TypeName *coltype = makeNode(TypeName);
+
+ if (lc)
+ {
+ col->colname = strVal(lfirst(lc));
+ lc = lnext(lc);
+ }
+ else
+ col->colname = NameStr(attribute->attname);
+ col->typeName = coltype;
+ col->inhcount = 0;
+ col->is_local = true;
+ col->is_not_null = false;
+ col->is_from_type = false;
+ col->storage = 0;
+ col->raw_default = NULL;
+ col->cooked_default = NULL;
+ col->collClause = NULL;
+ col->collOid = attribute->attcollation;
+ col->constraints = NIL;
+ col->fdwoptions = NIL;
+
+ coltype->names = NIL;
+ coltype->typeOid = attribute->atttypid;
+ coltype->setof = false;
+ coltype->pct_type = false;
+ coltype->typmods = NIL;
+ coltype->typemod = attribute->atttypmod;
+ coltype->arrayBounds = NIL;
+ coltype->location = -1;
+
+ /*
+ * It's possible that the column is of a collatable type but the
+ * collation could not be resolved, so double-check. (We must
+ * check this here because DefineRelation would adopt the type's
+ * default collation rather than complaining.)
+ */
+ if (!OidIsValid(col->collOid) &&
+ type_is_collatable(coltype->typeOid))
+ ereport(ERROR,
+ (errcode(ERRCODE_INDETERMINATE_COLLATION),
+ errmsg("no collation was derived for column \"%s\" with collatable type %s",
+ col->colname, format_type_be(coltype->typeOid)),
+ errhint("Use the COLLATE clause to set the collation explicitly.")));
+
+ create->tableElts = lappend(create->tableElts, col);
+ }
+
+ if (lc != NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("CREATE TABLE AS specifies too many column names")));
+
+ /*
+ * Actually create the target table
+ */
+ intoRelationId = DefineRelation(create, RELKIND_RELATION, InvalidOid);
+
+ /*
+ * If necessary, create a TOAST table for the target table. Note that
+ * AlterTableCreateToastTable ends with CommandCounterIncrement(), so that
+ * the TOAST table will be visible for insertion.
+ */
+ CommandCounterIncrement();
+
+ /* parse and validate reloptions for the toast table */
+ toast_options = transformRelOptions((Datum) 0,
+ create->options,
+ "toast",
+ validnsps,
+ true, false);
+
+ (void) heap_reloptions(RELKIND_TOASTVALUE, toast_options, true);
+
+ AlterTableCreateToastTable(intoRelationId, toast_options);
+
+ /*
+ * Finally we can open the target table
+ */
+ intoRelationDesc = heap_open(intoRelationId, AccessExclusiveLock);
+
+ /*
+ * Check INSERT permission on the constructed table.
+ *
+ * XXX: It would arguably make sense to skip this check if into->skipData
+ * is true.
+ */
+ rte = makeNode(RangeTblEntry);
+ rte->rtekind = RTE_RELATION;
+ rte->relid = intoRelationId;
+ rte->relkind = RELKIND_RELATION;
+ rte->requiredPerms = ACL_INSERT;
+
+ for (attnum = 1; attnum <= intoRelationDesc->rd_att->natts; attnum++)
+ rte->modifiedCols = bms_add_member(rte->modifiedCols,
+ attnum - FirstLowInvalidHeapAttributeNumber);
+
+ ExecCheckRTPerms(list_make1(rte), true);
+
+ /*
+ * Fill private fields of myState for use by later routines
+ */
+ myState->rel = intoRelationDesc;
+ myState->output_cid = GetCurrentCommandId(true);
+
+ /*
+ * We can skip WAL-logging the insertions, unless PITR or streaming
+ * replication is in use. We can skip the FSM in any case.
+ */
+ myState->hi_options = HEAP_INSERT_SKIP_FSM |
+ (XLogIsNeeded() ? 0 : HEAP_INSERT_SKIP_WAL);
+ myState->bistate = GetBulkInsertState();
+
+ /* Not using WAL requires smgr_targblock be initially invalid */
+ Assert(RelationGetTargetBlock(intoRelationDesc) == InvalidBlockNumber);
+}
+
+/*
+ * intorel_receive --- receive one tuple
+ */
+static void
+intorel_receive(TupleTableSlot *slot, DestReceiver *self)
+{
+ DR_intorel *myState = (DR_intorel *) self;
+ HeapTuple tuple;
+
+ /*
+ * get the heap tuple out of the tuple table slot, making sure we have a
+ * writable copy
+ */
+ tuple = ExecMaterializeSlot(slot);
+
+ /*
+ * force assignment of new OID (see comments in ExecInsert)
+ */
+ if (myState->rel->rd_rel->relhasoids)
+ HeapTupleSetOid(tuple, InvalidOid);
+
+ heap_insert(myState->rel,
+ tuple,
+ myState->output_cid,
+ myState->hi_options,
+ myState->bistate);
+
+ /* We know this is a newly created relation, so there are no indexes */
+}
+
+/*
+ * intorel_shutdown --- executor end
+ */
+static void
+intorel_shutdown(DestReceiver *self)
+{
+ DR_intorel *myState = (DR_intorel *) self;
+
+ FreeBulkInsertState(myState->bistate);
+
+ /* If we skipped using WAL, must heap_sync before commit */
+ if (myState->hi_options & HEAP_INSERT_SKIP_WAL)
+ heap_sync(myState->rel);
+
+ /* close rel, but keep lock until commit */
+ heap_close(myState->rel, NoLock);
+ myState->rel = NULL;
+}
+
+/*
+ * intorel_destroy --- release DestReceiver object
+ */
+static void
+intorel_destroy(DestReceiver *self)
+{
+ pfree(self);
+}