aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands/copy.c
diff options
context:
space:
mode:
authorTom Lane <tgl@sss.pgh.pa.us>2006-08-30 23:34:22 +0000
committerTom Lane <tgl@sss.pgh.pa.us>2006-08-30 23:34:22 +0000
commit85188ab8838bf19cdf12298e1b6c29e12f9b9a3c (patch)
tree285f4ee5fe8c623b8eea4caa5a664aeffd3fe96a /src/backend/commands/copy.c
parent0d5065781dd1486d57357c49384a034b45bb027a (diff)
downloadpostgresql-85188ab8838bf19cdf12298e1b6c29e12f9b9a3c.tar.gz
postgresql-85188ab8838bf19cdf12298e1b6c29e12f9b9a3c.zip
Extend COPY to support COPY (SELECT ...) TO ...
Bernd Helmle
Diffstat (limited to 'src/backend/commands/copy.c')
-rw-r--r--src/backend/commands/copy.c809
1 files changed, 519 insertions, 290 deletions
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 4242c1aff1c..569d86eee2b 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/commands/copy.c,v 1.268 2006/07/14 14:52:18 momjian Exp $
+ * $PostgreSQL: pgsql/src/backend/commands/copy.c,v 1.269 2006/08/30 23:34:21 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -31,6 +31,7 @@
#include "libpq/pqformat.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
+#include "optimizer/planner.h"
#include "parser/parse_relation.h"
#include "rewrite/rewriteHandler.h"
#include "storage/fd.h"
@@ -99,18 +100,21 @@ typedef struct CopyStateData
/* parameters from the COPY command */
Relation rel; /* relation to copy to or from */
+ QueryDesc *queryDesc; /* executable query to copy from */
List *attnumlist; /* integer list of attnums to copy */
+ char *filename; /* filename, or NULL for STDIN/STDOUT */
bool binary; /* binary format? */
bool oids; /* include OIDs? */
bool csv_mode; /* Comma Separated Value format? */
bool header_line; /* CSV header line? */
char *null_print; /* NULL marker string (server encoding!) */
int null_print_len; /* length of same */
+ char *null_print_client; /* same converted to client encoding */
char *delim; /* column delimiter (must be 1 byte) */
char *quote; /* CSV quote char (must be 1 byte) */
char *escape; /* CSV escape char (must be 1 byte) */
- List *force_quote_atts; /* integer list of attnums to FQ */
- List *force_notnull_atts; /* integer list of attnums to FNN */
+ bool *force_quote_flags; /* per-column CSV FQ flags */
+ bool *force_notnull_flags; /* per-column CSV FNN flags */
/* these are just for error messages, see copy_in_error_callback */
const char *cur_relname; /* table name for error messages */
@@ -119,6 +123,12 @@ typedef struct CopyStateData
const char *cur_attval; /* current att value for error messages */
/*
+ * Working state for COPY TO
+ */
+ FmgrInfo *out_functions; /* lookup info for output functions */
+ MemoryContext rowcontext; /* per-row evaluation context */
+
+ /*
* These variables are used to reduce overhead in textual COPY FROM.
*
* attribute_buf holds the separated, de-escaped text for each field of
@@ -153,6 +163,13 @@ typedef struct CopyStateData
typedef CopyStateData *CopyState;
+/* DestReceiver for COPY (SELECT) TO */
+typedef struct
+{
+ DestReceiver pub; /* publicly-known function pointers */
+ CopyState cstate; /* CopyStateData for the command */
+} DR_copy;
+
/*
* These macros centralize code used to process line_buf and raw_buf buffers.
@@ -225,6 +242,8 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
/* non-export function prototypes */
static void DoCopyTo(CopyState cstate);
static void CopyTo(CopyState cstate);
+static void CopyOneRowTo(CopyState cstate, Oid tupleOid,
+ Datum *values, bool *nulls);
static void CopyFrom(CopyState cstate);
static bool CopyReadLine(CopyState cstate);
static bool CopyReadLineText(CopyState cstate);
@@ -239,7 +258,8 @@ static Datum CopyReadBinaryAttribute(CopyState cstate,
static void CopyAttributeOutText(CopyState cstate, char *string);
static void CopyAttributeOutCSV(CopyState cstate, char *string,
bool use_quote, bool single_attr);
-static List *CopyGetAttnums(Relation rel, List *attnamelist);
+static List *CopyGetAttnums(TupleDesc tupDesc, Relation rel,
+ List *attnamelist);
static char *limit_printout_length(const char *str);
/* Low-level communications functions */
@@ -668,7 +688,8 @@ CopyLoadRawBuf(CopyState cstate)
* DoCopy executes the SQL COPY statement.
*
* Either unload or reload contents of table <relation>, depending on <from>.
- * (<from> = TRUE means we are inserting into the table.)
+ * (<from> = TRUE means we are inserting into the table.) In the "TO" case
+ * we also support copying the output of an arbitrary SELECT query.
*
* If <pipe> is false, transfer is between the table and the file named
* <filename>. Otherwise, transfer is between the table and our regular
@@ -697,8 +718,6 @@ uint64
DoCopy(const CopyStmt *stmt)
{
CopyState cstate;
- RangeVar *relation = stmt->relation;
- char *filename = stmt->filename;
bool is_from = stmt->is_from;
bool pipe = (stmt->filename == NULL);
List *attnamelist = stmt->attlist;
@@ -707,6 +726,8 @@ DoCopy(const CopyStmt *stmt)
AclMode required_access = (is_from ? ACL_INSERT : ACL_SELECT);
AclResult aclresult;
ListCell *option;
+ TupleDesc tupDesc;
+ int num_phys_attrs;
uint64 processed;
/* Allocate workspace and zero all fields */
@@ -920,23 +941,7 @@ DoCopy(const CopyStmt *stmt)
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("CSV quote character must not appear in the NULL specification")));
- /* Open and lock the relation, using the appropriate lock type. */
- cstate->rel = heap_openrv(relation,
- (is_from ? RowExclusiveLock : AccessShareLock));
-
- /* check read-only transaction */
- if (XactReadOnly && is_from &&
- !isTempNamespace(RelationGetNamespace(cstate->rel)))
- ereport(ERROR,
- (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
- errmsg("transaction is read-only")));
-
- /* Check permissions. */
- aclresult = pg_class_aclcheck(RelationGetRelid(cstate->rel), GetUserId(),
- required_access);
- if (aclresult != ACLCHECK_OK)
- aclcheck_error(aclresult, ACL_KIND_CLASS,
- RelationGetRelationName(cstate->rel));
+ /* Disallow file COPY except to superusers. */
if (!pipe && !superuser())
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
@@ -944,26 +949,137 @@ DoCopy(const CopyStmt *stmt)
errhint("Anyone can COPY to stdout or from stdin. "
"psql's \\copy command also works for anyone.")));
- /* Don't allow COPY w/ OIDs to or from a table without them */
- if (cstate->oids && !cstate->rel->rd_rel->relhasoids)
- ereport(ERROR,
- (errcode(ERRCODE_UNDEFINED_COLUMN),
- errmsg("table \"%s\" does not have OIDs",
- RelationGetRelationName(cstate->rel))));
+ if (stmt->relation)
+ {
+ Assert(!stmt->query);
+ cstate->queryDesc = NULL;
+
+ /* Open and lock the relation, using the appropriate lock type. */
+ cstate->rel = heap_openrv(stmt->relation,
+ (is_from ? RowExclusiveLock : AccessShareLock));
+
+ /* Check relation permissions. */
+ aclresult = pg_class_aclcheck(RelationGetRelid(cstate->rel),
+ GetUserId(),
+ required_access);
+ if (aclresult != ACLCHECK_OK)
+ aclcheck_error(aclresult, ACL_KIND_CLASS,
+ RelationGetRelationName(cstate->rel));
+
+ /* check read-only transaction */
+ if (XactReadOnly && is_from &&
+ !isTempNamespace(RelationGetNamespace(cstate->rel)))
+ ereport(ERROR,
+ (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
+ errmsg("transaction is read-only")));
+
+ /* Don't allow COPY w/ OIDs to or from a table without them */
+ if (cstate->oids && !cstate->rel->rd_rel->relhasoids)
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_COLUMN),
+ errmsg("table \"%s\" does not have OIDs",
+ RelationGetRelationName(cstate->rel))));
+
+ tupDesc = RelationGetDescr(cstate->rel);
+ }
+ else
+ {
+ Query *query = stmt->query;
+ List *rewritten;
+ Plan *plan;
+ DestReceiver *dest;
+
+ Assert(query);
+ Assert(!is_from);
+ cstate->rel = NULL;
+
+ /* Don't allow COPY w/ OIDs from a select */
+ if (cstate->oids)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("COPY (SELECT) WITH OIDS is not supported")));
+
+ if (query->into)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("COPY (SELECT INTO) is not supported")));
+
+ /*
+ * The query has already been through parse analysis, but not
+ * rewriting or planning. Do that now.
+ *
+ * Because the planner is not cool about not scribbling on its input,
+ * we make a preliminary copy of the source querytree. This prevents
+ * problems in the case that the COPY is in a portal or plpgsql
+ * function and is executed repeatedly. (See also the same hack in
+ * EXPLAIN, DECLARE CURSOR and PREPARE.) XXX the planner really
+ * shouldn't modify its input ... FIXME someday.
+ */
+ query = copyObject(query);
+ Assert(query->commandType == CMD_SELECT);
+
+ /*
+ * Must acquire locks in case we didn't come fresh from the parser.
+ * XXX this also scribbles on query, another reason for copyObject
+ */
+ AcquireRewriteLocks(query);
+
+ /* Rewrite through rule system */
+ rewritten = QueryRewrite(query);
+
+ /* We don't expect more or less than one result query */
+ if (list_length(rewritten) != 1)
+ elog(ERROR, "unexpected rewrite result");
+
+ query = (Query *) linitial(rewritten);
+ Assert(query->commandType == CMD_SELECT);
+
+ /* plan the query */
+ plan = planner(query, false, 0, NULL);
+
+ /*
+ * Update snapshot command ID to ensure this query sees results of any
+ * previously executed queries. (It's a bit cheesy to modify
+ * ActiveSnapshot without making a copy, but for the limited ways in
+ * which COPY can be invoked, I think it's OK, because the active
+ * snapshot shouldn't be shared with anything else anyway.)
+ */
+ ActiveSnapshot->curcid = GetCurrentCommandId();
+
+ /* Create dest receiver for COPY OUT */
+ dest = CreateDestReceiver(DestCopyOut, NULL);
+ ((DR_copy *) dest)->cstate = cstate;
+
+ /* Create a QueryDesc requesting no output */
+ cstate->queryDesc = CreateQueryDesc(query, plan,
+ ActiveSnapshot, InvalidSnapshot,
+ dest, NULL, false);
+
+ /*
+ * Call ExecutorStart to prepare the plan for execution.
+ *
+ * ExecutorStart computes a result tupdesc for us
+ */
+ ExecutorStart(cstate->queryDesc, 0);
+
+ tupDesc = cstate->queryDesc->tupDesc;
+ }
/* Generate or convert list of attributes to process */
- cstate->attnumlist = CopyGetAttnums(cstate->rel, attnamelist);
+ cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
+
+ num_phys_attrs = tupDesc->natts;
- /* Convert FORCE QUOTE name list to column numbers, check validity */
+ /* Convert FORCE QUOTE name list to per-column flags, check validity */
+ cstate->force_quote_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
if (force_quote)
{
- TupleDesc tupDesc = RelationGetDescr(cstate->rel);
- Form_pg_attribute *attr = tupDesc->attrs;
+ List *attnums;
ListCell *cur;
- cstate->force_quote_atts = CopyGetAttnums(cstate->rel, force_quote);
+ attnums = CopyGetAttnums(tupDesc, cstate->rel, force_quote);
- foreach(cur, cstate->force_quote_atts)
+ foreach(cur, attnums)
{
int attnum = lfirst_int(cur);
@@ -971,21 +1087,21 @@ DoCopy(const CopyStmt *stmt)
ereport(ERROR,
(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
errmsg("FORCE QUOTE column \"%s\" not referenced by COPY",
- NameStr(attr[attnum - 1]->attname))));
+ NameStr(tupDesc->attrs[attnum - 1]->attname))));
+ cstate->force_quote_flags[attnum - 1] = true;
}
}
- /* Convert FORCE NOT NULL name list to column numbers, check validity */
+ /* Convert FORCE NOT NULL name list to per-column flags, check validity */
+ cstate->force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
if (force_notnull)
{
- TupleDesc tupDesc = RelationGetDescr(cstate->rel);
- Form_pg_attribute *attr = tupDesc->attrs;
+ List *attnums;
ListCell *cur;
- cstate->force_notnull_atts = CopyGetAttnums(cstate->rel,
- force_notnull);
+ attnums = CopyGetAttnums(tupDesc, cstate->rel, force_notnull);
- foreach(cur, cstate->force_notnull_atts)
+ foreach(cur, attnums)
{
int attnum = lfirst_int(cur);
@@ -993,7 +1109,8 @@ DoCopy(const CopyStmt *stmt)
ereport(ERROR,
(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
errmsg("FORCE NOT NULL column \"%s\" not referenced by COPY",
- NameStr(attr[attnum - 1]->attname))));
+ NameStr(tupDesc->attrs[attnum - 1]->attname))));
+ cstate->force_notnull_flags[attnum - 1] = true;
}
}
@@ -1018,67 +1135,59 @@ DoCopy(const CopyStmt *stmt)
cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->client_encoding);
cstate->copy_dest = COPY_FILE; /* default */
+ cstate->filename = stmt->filename;
- if (is_from)
- { /* copy from file to database */
- if (cstate->rel->rd_rel->relkind != RELKIND_RELATION)
- {
- if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("cannot copy to view \"%s\"",
- RelationGetRelationName(cstate->rel))));
- else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("cannot copy to sequence \"%s\"",
- RelationGetRelationName(cstate->rel))));
- else
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("cannot copy to non-table relation \"%s\"",
- RelationGetRelationName(cstate->rel))));
- }
- if (pipe)
- {
- if (whereToSendOutput == DestRemote)
- ReceiveCopyBegin(cstate);
- else
- cstate->copy_file = stdin;
- }
- else
- {
- struct stat st;
+ if (is_from) /* copy from file to database */
+ CopyFrom(cstate);
+ else /* copy from database to file */
+ DoCopyTo(cstate);
- cstate->copy_file = AllocateFile(filename, PG_BINARY_R);
+ /*
+ * Close the relation or query. If reading, we can release the
+ * AccessShareLock we got; if writing, we should hold the lock until end
+ * of transaction to ensure that updates will be committed before lock is
+ * released.
+ */
+ if (cstate->rel)
+ heap_close(cstate->rel, (is_from ? NoLock : AccessShareLock));
+ else
+ {
+ /* Close down the query and free resources. */
+ ExecutorEnd(cstate->queryDesc);
+ FreeQueryDesc(cstate->queryDesc);
+ }
- if (cstate->copy_file == NULL)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not open file \"%s\" for reading: %m",
- filename)));
+ /* Clean up storage (probably not really necessary) */
+ processed = cstate->processed;
- fstat(fileno(cstate->copy_file), &st);
- if (S_ISDIR(st.st_mode))
- {
- FreeFile(cstate->copy_file);
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is a directory", filename)));
- }
- }
+ pfree(cstate->attribute_buf.data);
+ pfree(cstate->line_buf.data);
+ pfree(cstate->raw_buf);
+ pfree(cstate);
- CopyFrom(cstate);
- }
- else
- { /* copy from database to file */
+ return processed;
+}
+
+
+/*
+ * This intermediate routine exists mainly to localize the effects of setjmp
+ * so we don't need to plaster a lot of variables with "volatile".
+ */
+static void
+DoCopyTo(CopyState cstate)
+{
+ bool pipe = (cstate->filename == NULL);
+
+ if (cstate->rel)
+ {
if (cstate->rel->rd_rel->relkind != RELKIND_RELATION)
{
if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot copy from view \"%s\"",
- RelationGetRelationName(cstate->rel))));
+ RelationGetRelationName(cstate->rel)),
+ errhint("Try the COPY (SELECT ...) TO variant.")));
else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
@@ -1090,86 +1199,49 @@ DoCopy(const CopyStmt *stmt)
errmsg("cannot copy from non-table relation \"%s\"",
RelationGetRelationName(cstate->rel))));
}
- if (pipe)
- {
- if (whereToSendOutput == DestRemote)
- cstate->fe_copy = true;
- else
- cstate->copy_file = stdout;
- }
- else
- {
- mode_t oumask; /* Pre-existing umask value */
- struct stat st;
-
- /*
- * Prevent write to relative path ... too easy to shoot oneself in
- * the foot by overwriting a database file ...
- */
- if (!is_absolute_path(filename))
- ereport(ERROR,
- (errcode(ERRCODE_INVALID_NAME),
- errmsg("relative path not allowed for COPY to file")));
-
- oumask = umask((mode_t) 022);
- cstate->copy_file = AllocateFile(filename, PG_BINARY_W);
- umask(oumask);
-
- if (cstate->copy_file == NULL)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not open file \"%s\" for writing: %m",
- filename)));
-
- fstat(fileno(cstate->copy_file), &st);
- if (S_ISDIR(st.st_mode))
- {
- FreeFile(cstate->copy_file);
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is a directory", filename)));
- }
- }
-
- DoCopyTo(cstate);
}
- if (!pipe)
+ if (pipe)
{
- /* we assume only the write case could fail here */
- if (FreeFile(cstate->copy_file))
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not write to file \"%s\": %m",
- filename)));
+ if (whereToSendOutput == DestRemote)
+ cstate->fe_copy = true;
+ else
+ cstate->copy_file = stdout;
}
+ else
+ {
+ mode_t oumask; /* Pre-existing umask value */
+ struct stat st;
- /*
- * Close the relation. If reading, we can release the AccessShareLock we
- * got; if writing, we should hold the lock until end of transaction to
- * ensure that updates will be committed before lock is released.
- */
- heap_close(cstate->rel, (is_from ? NoLock : AccessShareLock));
-
- /* Clean up storage (probably not really necessary) */
- processed = cstate->processed;
+ /*
+ * Prevent write to relative path ... too easy to shoot oneself in
+ * the foot by overwriting a database file ...
+ */
+ if (!is_absolute_path(cstate->filename))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_NAME),
+ errmsg("relative path not allowed for COPY to file")));
- pfree(cstate->attribute_buf.data);
- pfree(cstate->line_buf.data);
- pfree(cstate->raw_buf);
- pfree(cstate);
+ oumask = umask((mode_t) 022);
+ cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
+ umask(oumask);
- return processed;
-}
+ if (cstate->copy_file == NULL)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not open file \"%s\" for writing: %m",
+ cstate->filename)));
+ fstat(fileno(cstate->copy_file), &st);
+ if (S_ISDIR(st.st_mode))
+ {
+ FreeFile(cstate->copy_file);
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("\"%s\" is a directory", cstate->filename)));
+ }
+ }
-/*
- * This intermediate routine just exists to localize the effects of setjmp
- * so we don't need to plaster a lot of variables with "volatile".
- */
-static void
-DoCopyTo(CopyState cstate)
-{
PG_TRY();
{
if (cstate->fe_copy)
@@ -1191,40 +1263,41 @@ DoCopyTo(CopyState cstate)
PG_RE_THROW();
}
PG_END_TRY();
+
+ if (!pipe)
+ {
+ if (FreeFile(cstate->copy_file))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not write to file \"%s\": %m",
+ cstate->filename)));
+ }
}
/*
- * Copy from relation TO file.
+ * Copy from relation or query TO file.
*/
static void
CopyTo(CopyState cstate)
{
- HeapTuple tuple;
TupleDesc tupDesc;
- HeapScanDesc scandesc;
int num_phys_attrs;
- int attr_count;
Form_pg_attribute *attr;
- FmgrInfo *out_functions;
- bool *force_quote;
- char *string;
- char *null_print_client;
ListCell *cur;
- MemoryContext oldcontext;
- MemoryContext mycontext;
- tupDesc = cstate->rel->rd_att;
+ if (cstate->rel)
+ tupDesc = RelationGetDescr(cstate->rel);
+ else
+ tupDesc = cstate->queryDesc->tupDesc;
attr = tupDesc->attrs;
num_phys_attrs = tupDesc->natts;
- attr_count = list_length(cstate->attnumlist);
- null_print_client = cstate->null_print; /* default */
+ cstate->null_print_client = cstate->null_print; /* default */
/* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
cstate->fe_msgbuf = makeStringInfo();
/* Get info about the columns we need to process. */
- out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
- force_quote = (bool *) palloc(num_phys_attrs * sizeof(bool));
+ cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
foreach(cur, cstate->attnumlist)
{
int attnum = lfirst_int(cur);
@@ -1239,12 +1312,7 @@ CopyTo(CopyState cstate)
getTypeOutputInfo(attr[attnum - 1]->atttypid,
&out_func_oid,
&isvarlena);
- fmgr_info(out_func_oid, &out_functions[attnum - 1]);
-
- if (list_member_int(cstate->force_quote_atts, attnum))
- force_quote[attnum - 1] = true;
- else
- force_quote[attnum - 1] = false;
+ fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
}
/*
@@ -1253,11 +1321,11 @@ CopyTo(CopyState cstate)
* datatype output routines, and should be faster than retail pfree's
* anyway. (We don't need a whole econtext as CopyFrom does.)
*/
- mycontext = AllocSetContextCreate(CurrentMemoryContext,
- "COPY TO",
- ALLOCSET_DEFAULT_MINSIZE,
- ALLOCSET_DEFAULT_INITSIZE,
- ALLOCSET_DEFAULT_MAXSIZE);
+ cstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext,
+ "COPY TO",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
if (cstate->binary)
{
@@ -1282,7 +1350,7 @@ CopyTo(CopyState cstate)
* encoding, because it will be sent directly with CopySendString.
*/
if (cstate->need_transcoding)
- null_print_client = pg_server_to_client(cstate->null_print,
+ cstate->null_print_client = pg_server_to_client(cstate->null_print,
cstate->null_print_len);
/* if a header has been requested send the line */
@@ -1309,113 +1377,139 @@ CopyTo(CopyState cstate)
}
}
- scandesc = heap_beginscan(cstate->rel, ActiveSnapshot, 0, NULL);
-
- while ((tuple = heap_getnext(scandesc, ForwardScanDirection)) != NULL)
+ if (cstate->rel)
{
- bool need_delim = false;
+ Datum *values;
+ bool *nulls;
+ HeapScanDesc scandesc;
+ HeapTuple tuple;
- CHECK_FOR_INTERRUPTS();
+ values = (Datum *) palloc(num_phys_attrs * sizeof(Datum));
+ nulls = (bool *) palloc(num_phys_attrs * sizeof(bool));
- MemoryContextReset(mycontext);
- oldcontext = MemoryContextSwitchTo(mycontext);
+ scandesc = heap_beginscan(cstate->rel, ActiveSnapshot, 0, NULL);
- if (cstate->binary)
+ while ((tuple = heap_getnext(scandesc, ForwardScanDirection)) != NULL)
{
- /* Binary per-tuple header */
- CopySendInt16(cstate, attr_count);
- /* Send OID if wanted --- note attr_count doesn't include it */
- if (cstate->oids)
- {
- Oid oid = HeapTupleGetOid(tuple);
+ CHECK_FOR_INTERRUPTS();
- /* Hack --- assume Oid is same size as int32 */
- CopySendInt32(cstate, sizeof(int32));
- CopySendInt32(cstate, oid);
- }
+ /* Deconstruct the tuple ... faster than repeated heap_getattr */
+ heap_deform_tuple(tuple, tupDesc, values, nulls);
+
+ /* Format and send the data */
+ CopyOneRowTo(cstate, HeapTupleGetOid(tuple), values, nulls);
}
- else
+
+ heap_endscan(scandesc);
+ }
+ else
+ {
+ /* run the plan --- the dest receiver will send tuples */
+ ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L);
+ }
+
+ if (cstate->binary)
+ {
+ /* Generate trailer for a binary copy */
+ CopySendInt16(cstate, -1);
+ /* Need to flush out the trailer */
+ CopySendEndOfRow(cstate);
+ }
+
+ MemoryContextDelete(cstate->rowcontext);
+}
+
+/*
+ * Emit one row during CopyTo().
+ */
+static void
+CopyOneRowTo(CopyState cstate, Oid tupleOid, Datum *values, bool *nulls)
+{
+ bool need_delim = false;
+ FmgrInfo *out_functions = cstate->out_functions;
+ MemoryContext oldcontext;
+ ListCell *cur;
+ char *string;
+
+ MemoryContextReset(cstate->rowcontext);
+ oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
+
+ if (cstate->binary)
+ {
+ /* Binary per-tuple header */
+ CopySendInt16(cstate, list_length(cstate->attnumlist));
+ /* Send OID if wanted --- note attnumlist doesn't include it */
+ if (cstate->oids)
{
- /* Text format has no per-tuple header, but send OID if wanted */
- /* Assume digits don't need any quoting or encoding conversion */
- if (cstate->oids)
- {
- string = DatumGetCString(DirectFunctionCall1(oidout,
- ObjectIdGetDatum(HeapTupleGetOid(tuple))));
- CopySendString(cstate, string);
- need_delim = true;
- }
+ /* Hack --- assume Oid is same size as int32 */
+ CopySendInt32(cstate, sizeof(int32));
+ CopySendInt32(cstate, tupleOid);
}
-
- foreach(cur, cstate->attnumlist)
+ }
+ else
+ {
+ /* Text format has no per-tuple header, but send OID if wanted */
+ /* Assume digits don't need any quoting or encoding conversion */
+ if (cstate->oids)
{
- int attnum = lfirst_int(cur);
- Datum value;
- bool isnull;
+ string = DatumGetCString(DirectFunctionCall1(oidout,
+ ObjectIdGetDatum(tupleOid)));
+ CopySendString(cstate, string);
+ need_delim = true;
+ }
+ }
+
+ foreach(cur, cstate->attnumlist)
+ {
+ int attnum = lfirst_int(cur);
+ Datum value = values[attnum - 1];
+ bool isnull = nulls[attnum - 1];
- value = heap_getattr(tuple, attnum, tupDesc, &isnull);
+ if (!cstate->binary)
+ {
+ if (need_delim)
+ CopySendChar(cstate, cstate->delim[0]);
+ need_delim = true;
+ }
+ if (isnull)
+ {
+ if (!cstate->binary)
+ CopySendString(cstate, cstate->null_print_client);
+ else
+ CopySendInt32(cstate, -1);
+ }
+ else
+ {
if (!cstate->binary)
{
- if (need_delim)
- CopySendChar(cstate, cstate->delim[0]);
- need_delim = true;
- }
-
- if (isnull)
- {
- if (!cstate->binary)
- CopySendString(cstate, null_print_client);
+ string = OutputFunctionCall(&out_functions[attnum - 1],
+ value);
+ if (cstate->csv_mode)
+ CopyAttributeOutCSV(cstate, string,
+ cstate->force_quote_flags[attnum - 1],
+ list_length(cstate->attnumlist) == 1);
else
- CopySendInt32(cstate, -1);
+ CopyAttributeOutText(cstate, string);
}
else
{
- if (!cstate->binary)
- {
- string = OutputFunctionCall(&out_functions[attnum - 1],
- value);
- if (cstate->csv_mode)
- CopyAttributeOutCSV(cstate, string,
- force_quote[attnum - 1],
- list_length(cstate->attnumlist) == 1);
- else
- CopyAttributeOutText(cstate, string);
- }
- else
- {
- bytea *outputbytes;
+ bytea *outputbytes;
- outputbytes = SendFunctionCall(&out_functions[attnum - 1],
- value);
- CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
- CopySendData(cstate, VARDATA(outputbytes),
- VARSIZE(outputbytes) - VARHDRSZ);
- }
+ outputbytes = SendFunctionCall(&out_functions[attnum - 1],
+ value);
+ CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
+ CopySendData(cstate, VARDATA(outputbytes),
+ VARSIZE(outputbytes) - VARHDRSZ);
}
}
-
- CopySendEndOfRow(cstate);
-
- MemoryContextSwitchTo(oldcontext);
-
- cstate->processed++;
- }
-
- heap_endscan(scandesc);
-
- if (cstate->binary)
- {
- /* Generate trailer for a binary copy */
- CopySendInt16(cstate, -1);
- /* Need to flush out the trailer */
- CopySendEndOfRow(cstate);
}
- MemoryContextDelete(mycontext);
+ CopySendEndOfRow(cstate);
- pfree(out_functions);
- pfree(force_quote);
+ MemoryContextSwitchTo(oldcontext);
+
+ cstate->processed++;
}
@@ -1528,6 +1622,7 @@ limit_printout_length(const char *str)
static void
CopyFrom(CopyState cstate)
{
+ bool pipe = (cstate->filename == NULL);
HeapTuple tuple;
TupleDesc tupDesc;
Form_pg_attribute *attr;
@@ -1538,7 +1633,6 @@ CopyFrom(CopyState cstate)
FmgrInfo oid_in_function;
Oid *typioparams;
Oid oid_typioparam;
- bool *force_notnull;
int attnum;
int i;
Oid in_func_oid;
@@ -1558,6 +1652,56 @@ CopyFrom(CopyState cstate)
MemoryContext oldcontext = CurrentMemoryContext;
ErrorContextCallback errcontext;
+ Assert(cstate->rel);
+
+ if (cstate->rel->rd_rel->relkind != RELKIND_RELATION)
+ {
+ if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("cannot copy to view \"%s\"",
+ RelationGetRelationName(cstate->rel))));
+ else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("cannot copy to sequence \"%s\"",
+ RelationGetRelationName(cstate->rel))));
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("cannot copy to non-table relation \"%s\"",
+ RelationGetRelationName(cstate->rel))));
+ }
+
+ if (pipe)
+ {
+ if (whereToSendOutput == DestRemote)
+ ReceiveCopyBegin(cstate);
+ else
+ cstate->copy_file = stdin;
+ }
+ else
+ {
+ struct stat st;
+
+ cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
+
+ if (cstate->copy_file == NULL)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not open file \"%s\" for reading: %m",
+ cstate->filename)));
+
+ fstat(fileno(cstate->copy_file), &st);
+ if (S_ISDIR(st.st_mode))
+ {
+ FreeFile(cstate->copy_file);
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("\"%s\" is a directory", cstate->filename)));
+ }
+ }
+
tupDesc = RelationGetDescr(cstate->rel);
attr = tupDesc->attrs;
num_phys_attrs = tupDesc->natts;
@@ -1599,7 +1743,6 @@ CopyFrom(CopyState cstate)
typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
defmap = (int *) palloc(num_phys_attrs * sizeof(int));
defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
- force_notnull = (bool *) palloc(num_phys_attrs * sizeof(bool));
for (attnum = 1; attnum <= num_phys_attrs; attnum++)
{
@@ -1616,11 +1759,6 @@ CopyFrom(CopyState cstate)
&in_func_oid, &typioparams[attnum - 1]);
fmgr_info(in_func_oid, &in_functions[attnum - 1]);
- if (list_member_int(cstate->force_notnull_atts, attnum))
- force_notnull[attnum - 1] = true;
- else
- force_notnull[attnum - 1] = false;
-
/* Get default info if needed */
if (!list_member_int(cstate->attnumlist, attnum))
{
@@ -1810,7 +1948,8 @@ CopyFrom(CopyState cstate)
NameStr(attr[m]->attname))));
string = field_strings[fieldno++];
- if (cstate->csv_mode && string == NULL && force_notnull[m])
+ if (cstate->csv_mode && string == NULL &&
+ cstate->force_notnull_flags[m])
{
/* Go ahead and read the NULL string */
string = cstate->null_print;
@@ -1972,13 +2111,21 @@ CopyFrom(CopyState cstate)
pfree(typioparams);
pfree(defmap);
pfree(defexprs);
- pfree(force_notnull);
ExecDropSingleTupleTableSlot(slot);
ExecCloseIndices(resultRelInfo);
FreeExecutorState(estate);
+
+ if (!pipe)
+ {
+ if (FreeFile(cstate->copy_file))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read from file \"%s\": %m",
+ cstate->filename)));
+ }
}
@@ -3055,16 +3202,17 @@ CopyAttributeOutCSV(CopyState cstate, char *string,
* The input attnamelist is either the user-specified column list,
* or NIL if there was none (in which case we want all the non-dropped
* columns).
+ *
+ * rel can be NULL ... it's only used for error reports.
*/
static List *
-CopyGetAttnums(Relation rel, List *attnamelist)
+CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
{
List *attnums = NIL;
if (attnamelist == NIL)
{
/* Generate default column list */
- TupleDesc tupDesc = RelationGetDescr(rel);
Form_pg_attribute *attr = tupDesc->attrs;
int attr_count = tupDesc->natts;
int i;
@@ -3085,15 +3233,33 @@ CopyGetAttnums(Relation rel, List *attnamelist)
{
char *name = strVal(lfirst(l));
int attnum;
+ int i;
/* Lookup column name */
- /* Note we disallow system columns here */
- attnum = attnameAttNum(rel, name, false);
+ attnum = InvalidAttrNumber;
+ for (i = 0; i < tupDesc->natts; i++)
+ {
+ if (tupDesc->attrs[i]->attisdropped)
+ continue;
+ if (namestrcmp(&(tupDesc->attrs[i]->attname), name) == 0)
+ {
+ attnum = tupDesc->attrs[i]->attnum;
+ break;
+ }
+ }
if (attnum == InvalidAttrNumber)
- ereport(ERROR,
+ {
+ if (rel != NULL)
+ ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_COLUMN),
errmsg("column \"%s\" of relation \"%s\" does not exist",
- name, RelationGetRelationName(rel))));
+ name, RelationGetRelationName(rel))));
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_COLUMN),
+ errmsg("column \"%s\" does not exist",
+ name)));
+ }
/* Check for duplicates */
if (list_member_int(attnums, attnum))
ereport(ERROR,
@@ -3106,3 +3272,66 @@ CopyGetAttnums(Relation rel, List *attnamelist)
return attnums;
}
+
+
+/*
+ * copy_dest_startup --- executor startup
+ */
+static void
+copy_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
+{
+ /* no-op */
+}
+
+/*
+ * copy_dest_receive --- receive one tuple
+ */
+static void
+copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
+{
+ DR_copy *myState = (DR_copy *) self;
+ CopyState cstate = myState->cstate;
+
+ /* Make sure the tuple is fully deconstructed */
+ slot_getallattrs(slot);
+
+ /* And send the data */
+ CopyOneRowTo(cstate, InvalidOid, slot->tts_values, slot->tts_isnull);
+}
+
+/*
+ * copy_dest_shutdown --- executor end
+ */
+static void
+copy_dest_shutdown(DestReceiver *self)
+{
+ /* no-op */
+}
+
+/*
+ * copy_dest_destroy --- release DestReceiver object
+ */
+static void
+copy_dest_destroy(DestReceiver *self)
+{
+ pfree(self);
+}
+
+/*
+ * CreateCopyDestReceiver -- create a suitable DestReceiver object
+ */
+DestReceiver *
+CreateCopyDestReceiver(void)
+{
+ DR_copy *self = (DR_copy *) palloc(sizeof(DR_copy));
+
+ self->pub.receiveSlot = copy_dest_receive;
+ self->pub.rStartup = copy_dest_startup;
+ self->pub.rShutdown = copy_dest_shutdown;
+ self->pub.rDestroy = copy_dest_destroy;
+ self->pub.mydest = DestCopyOut;
+
+ self->cstate = NULL; /* will be set later */
+
+ return (DestReceiver *) self;
+}