diff options
author | Tom Lane <tgl@sss.pgh.pa.us> | 2006-08-30 23:34:22 +0000 |
---|---|---|
committer | Tom Lane <tgl@sss.pgh.pa.us> | 2006-08-30 23:34:22 +0000 |
commit | 85188ab8838bf19cdf12298e1b6c29e12f9b9a3c (patch) | |
tree | 285f4ee5fe8c623b8eea4caa5a664aeffd3fe96a /src/backend/commands/copy.c | |
parent | 0d5065781dd1486d57357c49384a034b45bb027a (diff) | |
download | postgresql-85188ab8838bf19cdf12298e1b6c29e12f9b9a3c.tar.gz postgresql-85188ab8838bf19cdf12298e1b6c29e12f9b9a3c.zip |
Extend COPY to support COPY (SELECT ...) TO ...
Bernd Helmle
Diffstat (limited to 'src/backend/commands/copy.c')
-rw-r--r-- | src/backend/commands/copy.c | 809 |
1 files changed, 519 insertions, 290 deletions
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 4242c1aff1c..569d86eee2b 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/commands/copy.c,v 1.268 2006/07/14 14:52:18 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/commands/copy.c,v 1.269 2006/08/30 23:34:21 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -31,6 +31,7 @@ #include "libpq/pqformat.h" #include "mb/pg_wchar.h" #include "miscadmin.h" +#include "optimizer/planner.h" #include "parser/parse_relation.h" #include "rewrite/rewriteHandler.h" #include "storage/fd.h" @@ -99,18 +100,21 @@ typedef struct CopyStateData /* parameters from the COPY command */ Relation rel; /* relation to copy to or from */ + QueryDesc *queryDesc; /* executable query to copy from */ List *attnumlist; /* integer list of attnums to copy */ + char *filename; /* filename, or NULL for STDIN/STDOUT */ bool binary; /* binary format? */ bool oids; /* include OIDs? */ bool csv_mode; /* Comma Separated Value format? */ bool header_line; /* CSV header line? */ char *null_print; /* NULL marker string (server encoding!) */ int null_print_len; /* length of same */ + char *null_print_client; /* same converted to client encoding */ char *delim; /* column delimiter (must be 1 byte) */ char *quote; /* CSV quote char (must be 1 byte) */ char *escape; /* CSV escape char (must be 1 byte) */ - List *force_quote_atts; /* integer list of attnums to FQ */ - List *force_notnull_atts; /* integer list of attnums to FNN */ + bool *force_quote_flags; /* per-column CSV FQ flags */ + bool *force_notnull_flags; /* per-column CSV FNN flags */ /* these are just for error messages, see copy_in_error_callback */ const char *cur_relname; /* table name for error messages */ @@ -119,6 +123,12 @@ typedef struct CopyStateData const char *cur_attval; /* current att value for error messages */ /* + * Working state for COPY TO + */ + FmgrInfo *out_functions; /* lookup info for output functions */ + MemoryContext rowcontext; /* per-row evaluation context */ + + /* * These variables are used to reduce overhead in textual COPY FROM. * * attribute_buf holds the separated, de-escaped text for each field of @@ -153,6 +163,13 @@ typedef struct CopyStateData typedef CopyStateData *CopyState; +/* DestReceiver for COPY (SELECT) TO */ +typedef struct +{ + DestReceiver pub; /* publicly-known function pointers */ + CopyState cstate; /* CopyStateData for the command */ +} DR_copy; + /* * These macros centralize code used to process line_buf and raw_buf buffers. @@ -225,6 +242,8 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0"; /* non-export function prototypes */ static void DoCopyTo(CopyState cstate); static void CopyTo(CopyState cstate); +static void CopyOneRowTo(CopyState cstate, Oid tupleOid, + Datum *values, bool *nulls); static void CopyFrom(CopyState cstate); static bool CopyReadLine(CopyState cstate); static bool CopyReadLineText(CopyState cstate); @@ -239,7 +258,8 @@ static Datum CopyReadBinaryAttribute(CopyState cstate, static void CopyAttributeOutText(CopyState cstate, char *string); static void CopyAttributeOutCSV(CopyState cstate, char *string, bool use_quote, bool single_attr); -static List *CopyGetAttnums(Relation rel, List *attnamelist); +static List *CopyGetAttnums(TupleDesc tupDesc, Relation rel, + List *attnamelist); static char *limit_printout_length(const char *str); /* Low-level communications functions */ @@ -668,7 +688,8 @@ CopyLoadRawBuf(CopyState cstate) * DoCopy executes the SQL COPY statement. * * Either unload or reload contents of table <relation>, depending on <from>. - * (<from> = TRUE means we are inserting into the table.) + * (<from> = TRUE means we are inserting into the table.) In the "TO" case + * we also support copying the output of an arbitrary SELECT query. * * If <pipe> is false, transfer is between the table and the file named * <filename>. Otherwise, transfer is between the table and our regular @@ -697,8 +718,6 @@ uint64 DoCopy(const CopyStmt *stmt) { CopyState cstate; - RangeVar *relation = stmt->relation; - char *filename = stmt->filename; bool is_from = stmt->is_from; bool pipe = (stmt->filename == NULL); List *attnamelist = stmt->attlist; @@ -707,6 +726,8 @@ DoCopy(const CopyStmt *stmt) AclMode required_access = (is_from ? ACL_INSERT : ACL_SELECT); AclResult aclresult; ListCell *option; + TupleDesc tupDesc; + int num_phys_attrs; uint64 processed; /* Allocate workspace and zero all fields */ @@ -920,23 +941,7 @@ DoCopy(const CopyStmt *stmt) (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("CSV quote character must not appear in the NULL specification"))); - /* Open and lock the relation, using the appropriate lock type. */ - cstate->rel = heap_openrv(relation, - (is_from ? RowExclusiveLock : AccessShareLock)); - - /* check read-only transaction */ - if (XactReadOnly && is_from && - !isTempNamespace(RelationGetNamespace(cstate->rel))) - ereport(ERROR, - (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION), - errmsg("transaction is read-only"))); - - /* Check permissions. */ - aclresult = pg_class_aclcheck(RelationGetRelid(cstate->rel), GetUserId(), - required_access); - if (aclresult != ACLCHECK_OK) - aclcheck_error(aclresult, ACL_KIND_CLASS, - RelationGetRelationName(cstate->rel)); + /* Disallow file COPY except to superusers. */ if (!pipe && !superuser()) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), @@ -944,26 +949,137 @@ DoCopy(const CopyStmt *stmt) errhint("Anyone can COPY to stdout or from stdin. " "psql's \\copy command also works for anyone."))); - /* Don't allow COPY w/ OIDs to or from a table without them */ - if (cstate->oids && !cstate->rel->rd_rel->relhasoids) - ereport(ERROR, - (errcode(ERRCODE_UNDEFINED_COLUMN), - errmsg("table \"%s\" does not have OIDs", - RelationGetRelationName(cstate->rel)))); + if (stmt->relation) + { + Assert(!stmt->query); + cstate->queryDesc = NULL; + + /* Open and lock the relation, using the appropriate lock type. */ + cstate->rel = heap_openrv(stmt->relation, + (is_from ? RowExclusiveLock : AccessShareLock)); + + /* Check relation permissions. */ + aclresult = pg_class_aclcheck(RelationGetRelid(cstate->rel), + GetUserId(), + required_access); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, ACL_KIND_CLASS, + RelationGetRelationName(cstate->rel)); + + /* check read-only transaction */ + if (XactReadOnly && is_from && + !isTempNamespace(RelationGetNamespace(cstate->rel))) + ereport(ERROR, + (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION), + errmsg("transaction is read-only"))); + + /* Don't allow COPY w/ OIDs to or from a table without them */ + if (cstate->oids && !cstate->rel->rd_rel->relhasoids) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_COLUMN), + errmsg("table \"%s\" does not have OIDs", + RelationGetRelationName(cstate->rel)))); + + tupDesc = RelationGetDescr(cstate->rel); + } + else + { + Query *query = stmt->query; + List *rewritten; + Plan *plan; + DestReceiver *dest; + + Assert(query); + Assert(!is_from); + cstate->rel = NULL; + + /* Don't allow COPY w/ OIDs from a select */ + if (cstate->oids) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("COPY (SELECT) WITH OIDS is not supported"))); + + if (query->into) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("COPY (SELECT INTO) is not supported"))); + + /* + * The query has already been through parse analysis, but not + * rewriting or planning. Do that now. + * + * Because the planner is not cool about not scribbling on its input, + * we make a preliminary copy of the source querytree. This prevents + * problems in the case that the COPY is in a portal or plpgsql + * function and is executed repeatedly. (See also the same hack in + * EXPLAIN, DECLARE CURSOR and PREPARE.) XXX the planner really + * shouldn't modify its input ... FIXME someday. + */ + query = copyObject(query); + Assert(query->commandType == CMD_SELECT); + + /* + * Must acquire locks in case we didn't come fresh from the parser. + * XXX this also scribbles on query, another reason for copyObject + */ + AcquireRewriteLocks(query); + + /* Rewrite through rule system */ + rewritten = QueryRewrite(query); + + /* We don't expect more or less than one result query */ + if (list_length(rewritten) != 1) + elog(ERROR, "unexpected rewrite result"); + + query = (Query *) linitial(rewritten); + Assert(query->commandType == CMD_SELECT); + + /* plan the query */ + plan = planner(query, false, 0, NULL); + + /* + * Update snapshot command ID to ensure this query sees results of any + * previously executed queries. (It's a bit cheesy to modify + * ActiveSnapshot without making a copy, but for the limited ways in + * which COPY can be invoked, I think it's OK, because the active + * snapshot shouldn't be shared with anything else anyway.) + */ + ActiveSnapshot->curcid = GetCurrentCommandId(); + + /* Create dest receiver for COPY OUT */ + dest = CreateDestReceiver(DestCopyOut, NULL); + ((DR_copy *) dest)->cstate = cstate; + + /* Create a QueryDesc requesting no output */ + cstate->queryDesc = CreateQueryDesc(query, plan, + ActiveSnapshot, InvalidSnapshot, + dest, NULL, false); + + /* + * Call ExecutorStart to prepare the plan for execution. + * + * ExecutorStart computes a result tupdesc for us + */ + ExecutorStart(cstate->queryDesc, 0); + + tupDesc = cstate->queryDesc->tupDesc; + } /* Generate or convert list of attributes to process */ - cstate->attnumlist = CopyGetAttnums(cstate->rel, attnamelist); + cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist); + + num_phys_attrs = tupDesc->natts; - /* Convert FORCE QUOTE name list to column numbers, check validity */ + /* Convert FORCE QUOTE name list to per-column flags, check validity */ + cstate->force_quote_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool)); if (force_quote) { - TupleDesc tupDesc = RelationGetDescr(cstate->rel); - Form_pg_attribute *attr = tupDesc->attrs; + List *attnums; ListCell *cur; - cstate->force_quote_atts = CopyGetAttnums(cstate->rel, force_quote); + attnums = CopyGetAttnums(tupDesc, cstate->rel, force_quote); - foreach(cur, cstate->force_quote_atts) + foreach(cur, attnums) { int attnum = lfirst_int(cur); @@ -971,21 +1087,21 @@ DoCopy(const CopyStmt *stmt) ereport(ERROR, (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), errmsg("FORCE QUOTE column \"%s\" not referenced by COPY", - NameStr(attr[attnum - 1]->attname)))); + NameStr(tupDesc->attrs[attnum - 1]->attname)))); + cstate->force_quote_flags[attnum - 1] = true; } } - /* Convert FORCE NOT NULL name list to column numbers, check validity */ + /* Convert FORCE NOT NULL name list to per-column flags, check validity */ + cstate->force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool)); if (force_notnull) { - TupleDesc tupDesc = RelationGetDescr(cstate->rel); - Form_pg_attribute *attr = tupDesc->attrs; + List *attnums; ListCell *cur; - cstate->force_notnull_atts = CopyGetAttnums(cstate->rel, - force_notnull); + attnums = CopyGetAttnums(tupDesc, cstate->rel, force_notnull); - foreach(cur, cstate->force_notnull_atts) + foreach(cur, attnums) { int attnum = lfirst_int(cur); @@ -993,7 +1109,8 @@ DoCopy(const CopyStmt *stmt) ereport(ERROR, (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), errmsg("FORCE NOT NULL column \"%s\" not referenced by COPY", - NameStr(attr[attnum - 1]->attname)))); + NameStr(tupDesc->attrs[attnum - 1]->attname)))); + cstate->force_notnull_flags[attnum - 1] = true; } } @@ -1018,67 +1135,59 @@ DoCopy(const CopyStmt *stmt) cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->client_encoding); cstate->copy_dest = COPY_FILE; /* default */ + cstate->filename = stmt->filename; - if (is_from) - { /* copy from file to database */ - if (cstate->rel->rd_rel->relkind != RELKIND_RELATION) - { - if (cstate->rel->rd_rel->relkind == RELKIND_VIEW) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("cannot copy to view \"%s\"", - RelationGetRelationName(cstate->rel)))); - else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("cannot copy to sequence \"%s\"", - RelationGetRelationName(cstate->rel)))); - else - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("cannot copy to non-table relation \"%s\"", - RelationGetRelationName(cstate->rel)))); - } - if (pipe) - { - if (whereToSendOutput == DestRemote) - ReceiveCopyBegin(cstate); - else - cstate->copy_file = stdin; - } - else - { - struct stat st; + if (is_from) /* copy from file to database */ + CopyFrom(cstate); + else /* copy from database to file */ + DoCopyTo(cstate); - cstate->copy_file = AllocateFile(filename, PG_BINARY_R); + /* + * Close the relation or query. If reading, we can release the + * AccessShareLock we got; if writing, we should hold the lock until end + * of transaction to ensure that updates will be committed before lock is + * released. + */ + if (cstate->rel) + heap_close(cstate->rel, (is_from ? NoLock : AccessShareLock)); + else + { + /* Close down the query and free resources. */ + ExecutorEnd(cstate->queryDesc); + FreeQueryDesc(cstate->queryDesc); + } - if (cstate->copy_file == NULL) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not open file \"%s\" for reading: %m", - filename))); + /* Clean up storage (probably not really necessary) */ + processed = cstate->processed; - fstat(fileno(cstate->copy_file), &st); - if (S_ISDIR(st.st_mode)) - { - FreeFile(cstate->copy_file); - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is a directory", filename))); - } - } + pfree(cstate->attribute_buf.data); + pfree(cstate->line_buf.data); + pfree(cstate->raw_buf); + pfree(cstate); - CopyFrom(cstate); - } - else - { /* copy from database to file */ + return processed; +} + + +/* + * This intermediate routine exists mainly to localize the effects of setjmp + * so we don't need to plaster a lot of variables with "volatile". + */ +static void +DoCopyTo(CopyState cstate) +{ + bool pipe = (cstate->filename == NULL); + + if (cstate->rel) + { if (cstate->rel->rd_rel->relkind != RELKIND_RELATION) { if (cstate->rel->rd_rel->relkind == RELKIND_VIEW) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot copy from view \"%s\"", - RelationGetRelationName(cstate->rel)))); + RelationGetRelationName(cstate->rel)), + errhint("Try the COPY (SELECT ...) TO variant."))); else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), @@ -1090,86 +1199,49 @@ DoCopy(const CopyStmt *stmt) errmsg("cannot copy from non-table relation \"%s\"", RelationGetRelationName(cstate->rel)))); } - if (pipe) - { - if (whereToSendOutput == DestRemote) - cstate->fe_copy = true; - else - cstate->copy_file = stdout; - } - else - { - mode_t oumask; /* Pre-existing umask value */ - struct stat st; - - /* - * Prevent write to relative path ... too easy to shoot oneself in - * the foot by overwriting a database file ... - */ - if (!is_absolute_path(filename)) - ereport(ERROR, - (errcode(ERRCODE_INVALID_NAME), - errmsg("relative path not allowed for COPY to file"))); - - oumask = umask((mode_t) 022); - cstate->copy_file = AllocateFile(filename, PG_BINARY_W); - umask(oumask); - - if (cstate->copy_file == NULL) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not open file \"%s\" for writing: %m", - filename))); - - fstat(fileno(cstate->copy_file), &st); - if (S_ISDIR(st.st_mode)) - { - FreeFile(cstate->copy_file); - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is a directory", filename))); - } - } - - DoCopyTo(cstate); } - if (!pipe) + if (pipe) { - /* we assume only the write case could fail here */ - if (FreeFile(cstate->copy_file)) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not write to file \"%s\": %m", - filename))); + if (whereToSendOutput == DestRemote) + cstate->fe_copy = true; + else + cstate->copy_file = stdout; } + else + { + mode_t oumask; /* Pre-existing umask value */ + struct stat st; - /* - * Close the relation. If reading, we can release the AccessShareLock we - * got; if writing, we should hold the lock until end of transaction to - * ensure that updates will be committed before lock is released. - */ - heap_close(cstate->rel, (is_from ? NoLock : AccessShareLock)); - - /* Clean up storage (probably not really necessary) */ - processed = cstate->processed; + /* + * Prevent write to relative path ... too easy to shoot oneself in + * the foot by overwriting a database file ... + */ + if (!is_absolute_path(cstate->filename)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_NAME), + errmsg("relative path not allowed for COPY to file"))); - pfree(cstate->attribute_buf.data); - pfree(cstate->line_buf.data); - pfree(cstate->raw_buf); - pfree(cstate); + oumask = umask((mode_t) 022); + cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W); + umask(oumask); - return processed; -} + if (cstate->copy_file == NULL) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\" for writing: %m", + cstate->filename))); + fstat(fileno(cstate->copy_file), &st); + if (S_ISDIR(st.st_mode)) + { + FreeFile(cstate->copy_file); + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is a directory", cstate->filename))); + } + } -/* - * This intermediate routine just exists to localize the effects of setjmp - * so we don't need to plaster a lot of variables with "volatile". - */ -static void -DoCopyTo(CopyState cstate) -{ PG_TRY(); { if (cstate->fe_copy) @@ -1191,40 +1263,41 @@ DoCopyTo(CopyState cstate) PG_RE_THROW(); } PG_END_TRY(); + + if (!pipe) + { + if (FreeFile(cstate->copy_file)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write to file \"%s\": %m", + cstate->filename))); + } } /* - * Copy from relation TO file. + * Copy from relation or query TO file. */ static void CopyTo(CopyState cstate) { - HeapTuple tuple; TupleDesc tupDesc; - HeapScanDesc scandesc; int num_phys_attrs; - int attr_count; Form_pg_attribute *attr; - FmgrInfo *out_functions; - bool *force_quote; - char *string; - char *null_print_client; ListCell *cur; - MemoryContext oldcontext; - MemoryContext mycontext; - tupDesc = cstate->rel->rd_att; + if (cstate->rel) + tupDesc = RelationGetDescr(cstate->rel); + else + tupDesc = cstate->queryDesc->tupDesc; attr = tupDesc->attrs; num_phys_attrs = tupDesc->natts; - attr_count = list_length(cstate->attnumlist); - null_print_client = cstate->null_print; /* default */ + cstate->null_print_client = cstate->null_print; /* default */ /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */ cstate->fe_msgbuf = makeStringInfo(); /* Get info about the columns we need to process. */ - out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo)); - force_quote = (bool *) palloc(num_phys_attrs * sizeof(bool)); + cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo)); foreach(cur, cstate->attnumlist) { int attnum = lfirst_int(cur); @@ -1239,12 +1312,7 @@ CopyTo(CopyState cstate) getTypeOutputInfo(attr[attnum - 1]->atttypid, &out_func_oid, &isvarlena); - fmgr_info(out_func_oid, &out_functions[attnum - 1]); - - if (list_member_int(cstate->force_quote_atts, attnum)) - force_quote[attnum - 1] = true; - else - force_quote[attnum - 1] = false; + fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]); } /* @@ -1253,11 +1321,11 @@ CopyTo(CopyState cstate) * datatype output routines, and should be faster than retail pfree's * anyway. (We don't need a whole econtext as CopyFrom does.) */ - mycontext = AllocSetContextCreate(CurrentMemoryContext, - "COPY TO", - ALLOCSET_DEFAULT_MINSIZE, - ALLOCSET_DEFAULT_INITSIZE, - ALLOCSET_DEFAULT_MAXSIZE); + cstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext, + "COPY TO", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); if (cstate->binary) { @@ -1282,7 +1350,7 @@ CopyTo(CopyState cstate) * encoding, because it will be sent directly with CopySendString. */ if (cstate->need_transcoding) - null_print_client = pg_server_to_client(cstate->null_print, + cstate->null_print_client = pg_server_to_client(cstate->null_print, cstate->null_print_len); /* if a header has been requested send the line */ @@ -1309,113 +1377,139 @@ CopyTo(CopyState cstate) } } - scandesc = heap_beginscan(cstate->rel, ActiveSnapshot, 0, NULL); - - while ((tuple = heap_getnext(scandesc, ForwardScanDirection)) != NULL) + if (cstate->rel) { - bool need_delim = false; + Datum *values; + bool *nulls; + HeapScanDesc scandesc; + HeapTuple tuple; - CHECK_FOR_INTERRUPTS(); + values = (Datum *) palloc(num_phys_attrs * sizeof(Datum)); + nulls = (bool *) palloc(num_phys_attrs * sizeof(bool)); - MemoryContextReset(mycontext); - oldcontext = MemoryContextSwitchTo(mycontext); + scandesc = heap_beginscan(cstate->rel, ActiveSnapshot, 0, NULL); - if (cstate->binary) + while ((tuple = heap_getnext(scandesc, ForwardScanDirection)) != NULL) { - /* Binary per-tuple header */ - CopySendInt16(cstate, attr_count); - /* Send OID if wanted --- note attr_count doesn't include it */ - if (cstate->oids) - { - Oid oid = HeapTupleGetOid(tuple); + CHECK_FOR_INTERRUPTS(); - /* Hack --- assume Oid is same size as int32 */ - CopySendInt32(cstate, sizeof(int32)); - CopySendInt32(cstate, oid); - } + /* Deconstruct the tuple ... faster than repeated heap_getattr */ + heap_deform_tuple(tuple, tupDesc, values, nulls); + + /* Format and send the data */ + CopyOneRowTo(cstate, HeapTupleGetOid(tuple), values, nulls); } - else + + heap_endscan(scandesc); + } + else + { + /* run the plan --- the dest receiver will send tuples */ + ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L); + } + + if (cstate->binary) + { + /* Generate trailer for a binary copy */ + CopySendInt16(cstate, -1); + /* Need to flush out the trailer */ + CopySendEndOfRow(cstate); + } + + MemoryContextDelete(cstate->rowcontext); +} + +/* + * Emit one row during CopyTo(). + */ +static void +CopyOneRowTo(CopyState cstate, Oid tupleOid, Datum *values, bool *nulls) +{ + bool need_delim = false; + FmgrInfo *out_functions = cstate->out_functions; + MemoryContext oldcontext; + ListCell *cur; + char *string; + + MemoryContextReset(cstate->rowcontext); + oldcontext = MemoryContextSwitchTo(cstate->rowcontext); + + if (cstate->binary) + { + /* Binary per-tuple header */ + CopySendInt16(cstate, list_length(cstate->attnumlist)); + /* Send OID if wanted --- note attnumlist doesn't include it */ + if (cstate->oids) { - /* Text format has no per-tuple header, but send OID if wanted */ - /* Assume digits don't need any quoting or encoding conversion */ - if (cstate->oids) - { - string = DatumGetCString(DirectFunctionCall1(oidout, - ObjectIdGetDatum(HeapTupleGetOid(tuple)))); - CopySendString(cstate, string); - need_delim = true; - } + /* Hack --- assume Oid is same size as int32 */ + CopySendInt32(cstate, sizeof(int32)); + CopySendInt32(cstate, tupleOid); } - - foreach(cur, cstate->attnumlist) + } + else + { + /* Text format has no per-tuple header, but send OID if wanted */ + /* Assume digits don't need any quoting or encoding conversion */ + if (cstate->oids) { - int attnum = lfirst_int(cur); - Datum value; - bool isnull; + string = DatumGetCString(DirectFunctionCall1(oidout, + ObjectIdGetDatum(tupleOid))); + CopySendString(cstate, string); + need_delim = true; + } + } + + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + Datum value = values[attnum - 1]; + bool isnull = nulls[attnum - 1]; - value = heap_getattr(tuple, attnum, tupDesc, &isnull); + if (!cstate->binary) + { + if (need_delim) + CopySendChar(cstate, cstate->delim[0]); + need_delim = true; + } + if (isnull) + { + if (!cstate->binary) + CopySendString(cstate, cstate->null_print_client); + else + CopySendInt32(cstate, -1); + } + else + { if (!cstate->binary) { - if (need_delim) - CopySendChar(cstate, cstate->delim[0]); - need_delim = true; - } - - if (isnull) - { - if (!cstate->binary) - CopySendString(cstate, null_print_client); + string = OutputFunctionCall(&out_functions[attnum - 1], + value); + if (cstate->csv_mode) + CopyAttributeOutCSV(cstate, string, + cstate->force_quote_flags[attnum - 1], + list_length(cstate->attnumlist) == 1); else - CopySendInt32(cstate, -1); + CopyAttributeOutText(cstate, string); } else { - if (!cstate->binary) - { - string = OutputFunctionCall(&out_functions[attnum - 1], - value); - if (cstate->csv_mode) - CopyAttributeOutCSV(cstate, string, - force_quote[attnum - 1], - list_length(cstate->attnumlist) == 1); - else - CopyAttributeOutText(cstate, string); - } - else - { - bytea *outputbytes; + bytea *outputbytes; - outputbytes = SendFunctionCall(&out_functions[attnum - 1], - value); - CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ); - CopySendData(cstate, VARDATA(outputbytes), - VARSIZE(outputbytes) - VARHDRSZ); - } + outputbytes = SendFunctionCall(&out_functions[attnum - 1], + value); + CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ); + CopySendData(cstate, VARDATA(outputbytes), + VARSIZE(outputbytes) - VARHDRSZ); } } - - CopySendEndOfRow(cstate); - - MemoryContextSwitchTo(oldcontext); - - cstate->processed++; - } - - heap_endscan(scandesc); - - if (cstate->binary) - { - /* Generate trailer for a binary copy */ - CopySendInt16(cstate, -1); - /* Need to flush out the trailer */ - CopySendEndOfRow(cstate); } - MemoryContextDelete(mycontext); + CopySendEndOfRow(cstate); - pfree(out_functions); - pfree(force_quote); + MemoryContextSwitchTo(oldcontext); + + cstate->processed++; } @@ -1528,6 +1622,7 @@ limit_printout_length(const char *str) static void CopyFrom(CopyState cstate) { + bool pipe = (cstate->filename == NULL); HeapTuple tuple; TupleDesc tupDesc; Form_pg_attribute *attr; @@ -1538,7 +1633,6 @@ CopyFrom(CopyState cstate) FmgrInfo oid_in_function; Oid *typioparams; Oid oid_typioparam; - bool *force_notnull; int attnum; int i; Oid in_func_oid; @@ -1558,6 +1652,56 @@ CopyFrom(CopyState cstate) MemoryContext oldcontext = CurrentMemoryContext; ErrorContextCallback errcontext; + Assert(cstate->rel); + + if (cstate->rel->rd_rel->relkind != RELKIND_RELATION) + { + if (cstate->rel->rd_rel->relkind == RELKIND_VIEW) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("cannot copy to view \"%s\"", + RelationGetRelationName(cstate->rel)))); + else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("cannot copy to sequence \"%s\"", + RelationGetRelationName(cstate->rel)))); + else + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("cannot copy to non-table relation \"%s\"", + RelationGetRelationName(cstate->rel)))); + } + + if (pipe) + { + if (whereToSendOutput == DestRemote) + ReceiveCopyBegin(cstate); + else + cstate->copy_file = stdin; + } + else + { + struct stat st; + + cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R); + + if (cstate->copy_file == NULL) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\" for reading: %m", + cstate->filename))); + + fstat(fileno(cstate->copy_file), &st); + if (S_ISDIR(st.st_mode)) + { + FreeFile(cstate->copy_file); + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is a directory", cstate->filename))); + } + } + tupDesc = RelationGetDescr(cstate->rel); attr = tupDesc->attrs; num_phys_attrs = tupDesc->natts; @@ -1599,7 +1743,6 @@ CopyFrom(CopyState cstate) typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid)); defmap = (int *) palloc(num_phys_attrs * sizeof(int)); defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *)); - force_notnull = (bool *) palloc(num_phys_attrs * sizeof(bool)); for (attnum = 1; attnum <= num_phys_attrs; attnum++) { @@ -1616,11 +1759,6 @@ CopyFrom(CopyState cstate) &in_func_oid, &typioparams[attnum - 1]); fmgr_info(in_func_oid, &in_functions[attnum - 1]); - if (list_member_int(cstate->force_notnull_atts, attnum)) - force_notnull[attnum - 1] = true; - else - force_notnull[attnum - 1] = false; - /* Get default info if needed */ if (!list_member_int(cstate->attnumlist, attnum)) { @@ -1810,7 +1948,8 @@ CopyFrom(CopyState cstate) NameStr(attr[m]->attname)))); string = field_strings[fieldno++]; - if (cstate->csv_mode && string == NULL && force_notnull[m]) + if (cstate->csv_mode && string == NULL && + cstate->force_notnull_flags[m]) { /* Go ahead and read the NULL string */ string = cstate->null_print; @@ -1972,13 +2111,21 @@ CopyFrom(CopyState cstate) pfree(typioparams); pfree(defmap); pfree(defexprs); - pfree(force_notnull); ExecDropSingleTupleTableSlot(slot); ExecCloseIndices(resultRelInfo); FreeExecutorState(estate); + + if (!pipe) + { + if (FreeFile(cstate->copy_file)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read from file \"%s\": %m", + cstate->filename))); + } } @@ -3055,16 +3202,17 @@ CopyAttributeOutCSV(CopyState cstate, char *string, * The input attnamelist is either the user-specified column list, * or NIL if there was none (in which case we want all the non-dropped * columns). + * + * rel can be NULL ... it's only used for error reports. */ static List * -CopyGetAttnums(Relation rel, List *attnamelist) +CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist) { List *attnums = NIL; if (attnamelist == NIL) { /* Generate default column list */ - TupleDesc tupDesc = RelationGetDescr(rel); Form_pg_attribute *attr = tupDesc->attrs; int attr_count = tupDesc->natts; int i; @@ -3085,15 +3233,33 @@ CopyGetAttnums(Relation rel, List *attnamelist) { char *name = strVal(lfirst(l)); int attnum; + int i; /* Lookup column name */ - /* Note we disallow system columns here */ - attnum = attnameAttNum(rel, name, false); + attnum = InvalidAttrNumber; + for (i = 0; i < tupDesc->natts; i++) + { + if (tupDesc->attrs[i]->attisdropped) + continue; + if (namestrcmp(&(tupDesc->attrs[i]->attname), name) == 0) + { + attnum = tupDesc->attrs[i]->attnum; + break; + } + } if (attnum == InvalidAttrNumber) - ereport(ERROR, + { + if (rel != NULL) + ereport(ERROR, (errcode(ERRCODE_UNDEFINED_COLUMN), errmsg("column \"%s\" of relation \"%s\" does not exist", - name, RelationGetRelationName(rel)))); + name, RelationGetRelationName(rel)))); + else + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_COLUMN), + errmsg("column \"%s\" does not exist", + name))); + } /* Check for duplicates */ if (list_member_int(attnums, attnum)) ereport(ERROR, @@ -3106,3 +3272,66 @@ CopyGetAttnums(Relation rel, List *attnamelist) return attnums; } + + +/* + * copy_dest_startup --- executor startup + */ +static void +copy_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo) +{ + /* no-op */ +} + +/* + * copy_dest_receive --- receive one tuple + */ +static void +copy_dest_receive(TupleTableSlot *slot, DestReceiver *self) +{ + DR_copy *myState = (DR_copy *) self; + CopyState cstate = myState->cstate; + + /* Make sure the tuple is fully deconstructed */ + slot_getallattrs(slot); + + /* And send the data */ + CopyOneRowTo(cstate, InvalidOid, slot->tts_values, slot->tts_isnull); +} + +/* + * copy_dest_shutdown --- executor end + */ +static void +copy_dest_shutdown(DestReceiver *self) +{ + /* no-op */ +} + +/* + * copy_dest_destroy --- release DestReceiver object + */ +static void +copy_dest_destroy(DestReceiver *self) +{ + pfree(self); +} + +/* + * CreateCopyDestReceiver -- create a suitable DestReceiver object + */ +DestReceiver * +CreateCopyDestReceiver(void) +{ + DR_copy *self = (DR_copy *) palloc(sizeof(DR_copy)); + + self->pub.receiveSlot = copy_dest_receive; + self->pub.rStartup = copy_dest_startup; + self->pub.rShutdown = copy_dest_shutdown; + self->pub.rDestroy = copy_dest_destroy; + self->pub.mydest = DestCopyOut; + + self->cstate = NULL; /* will be set later */ + + return (DestReceiver *) self; +} |