aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands/copy.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands/copy.c')
-rw-r--r--src/backend/commands/copy.c23
1 files changed, 18 insertions, 5 deletions
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index ba89b292d1e..b0fd09f458a 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -60,7 +60,8 @@ typedef enum CopyDest
{
COPY_FILE, /* to/from file (or a piped program) */
COPY_OLD_FE, /* to/from frontend (2.0 protocol) */
- COPY_NEW_FE /* to/from frontend (3.0 protocol) */
+ COPY_NEW_FE, /* to/from frontend (3.0 protocol) */
+ COPY_CALLBACK /* to/from callback function */
} CopyDest;
/*
@@ -109,6 +110,7 @@ typedef struct CopyStateData
List *attnumlist; /* integer list of attnums to copy */
char *filename; /* filename, or NULL for STDIN/STDOUT */
bool is_program; /* is 'filename' a program to popen? */
+ copy_data_source_cb data_source_cb; /* function for reading data*/
bool binary; /* binary format? */
bool oids; /* include OIDs? */
bool freeze; /* freeze rows on loading? */
@@ -299,7 +301,6 @@ static uint64 DoCopyTo(CopyState cstate);
static uint64 CopyTo(CopyState cstate);
static void CopyOneRowTo(CopyState cstate, Oid tupleOid,
Datum *values, bool *nulls);
-static uint64 CopyFrom(CopyState cstate);
static void CopyFromInsertBatch(CopyState cstate, EState *estate,
CommandId mycid, int hi_options,
ResultRelInfo *resultRelInfo, TupleTableSlot *myslot,
@@ -529,6 +530,9 @@ CopySendEndOfRow(CopyState cstate)
/* Dump the accumulated row as one CopyData message */
(void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
break;
+ case COPY_CALLBACK:
+ Assert(false); /* Not yet supported. */
+ break;
}
resetStringInfo(fe_msgbuf);
@@ -643,6 +647,9 @@ CopyGetData(CopyState cstate, void *databuf, int minread, int maxread)
bytesread += avail;
}
break;
+ case COPY_CALLBACK:
+ bytesread = cstate->data_source_cb(databuf, minread, maxread);
+ break;
}
return bytesread;
@@ -969,7 +976,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
PreventCommandIfParallelMode("COPY FROM");
cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program,
- stmt->attlist, stmt->options);
+ NULL, stmt->attlist, stmt->options);
cstate->range_table = range_table;
*processed = CopyFrom(cstate); /* copy from file to database */
EndCopyFrom(cstate);
@@ -2286,7 +2293,7 @@ limit_printout_length(const char *str)
/*
* Copy FROM file to relation.
*/
-static uint64
+uint64
CopyFrom(CopyState cstate)
{
HeapTuple tuple;
@@ -2878,6 +2885,7 @@ BeginCopyFrom(ParseState *pstate,
Relation rel,
const char *filename,
bool is_program,
+ copy_data_source_cb data_source_cb,
List *attnamelist,
List *options)
{
@@ -2992,7 +3000,12 @@ BeginCopyFrom(ParseState *pstate,
cstate->num_defaults = num_defaults;
cstate->is_program = is_program;
- if (pipe)
+ if (data_source_cb)
+ {
+ cstate->copy_dest = COPY_CALLBACK;
+ cstate->data_source_cb = data_source_cb;
+ }
+ else if (pipe)
{
Assert(!is_program); /* the grammar does not allow this */
if (whereToSendOutput == DestRemote)