diff options
Diffstat (limited to 'src/backend/commands/copy.c')
-rw-r--r-- | src/backend/commands/copy.c | 23 |
1 files changed, 18 insertions, 5 deletions
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index ba89b292d1e..b0fd09f458a 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -60,7 +60,8 @@ typedef enum CopyDest { COPY_FILE, /* to/from file (or a piped program) */ COPY_OLD_FE, /* to/from frontend (2.0 protocol) */ - COPY_NEW_FE /* to/from frontend (3.0 protocol) */ + COPY_NEW_FE, /* to/from frontend (3.0 protocol) */ + COPY_CALLBACK /* to/from callback function */ } CopyDest; /* @@ -109,6 +110,7 @@ typedef struct CopyStateData List *attnumlist; /* integer list of attnums to copy */ char *filename; /* filename, or NULL for STDIN/STDOUT */ bool is_program; /* is 'filename' a program to popen? */ + copy_data_source_cb data_source_cb; /* function for reading data*/ bool binary; /* binary format? */ bool oids; /* include OIDs? */ bool freeze; /* freeze rows on loading? */ @@ -299,7 +301,6 @@ static uint64 DoCopyTo(CopyState cstate); static uint64 CopyTo(CopyState cstate); static void CopyOneRowTo(CopyState cstate, Oid tupleOid, Datum *values, bool *nulls); -static uint64 CopyFrom(CopyState cstate); static void CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid, int hi_options, ResultRelInfo *resultRelInfo, TupleTableSlot *myslot, @@ -529,6 +530,9 @@ CopySendEndOfRow(CopyState cstate) /* Dump the accumulated row as one CopyData message */ (void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len); break; + case COPY_CALLBACK: + Assert(false); /* Not yet supported. */ + break; } resetStringInfo(fe_msgbuf); @@ -643,6 +647,9 @@ CopyGetData(CopyState cstate, void *databuf, int minread, int maxread) bytesread += avail; } break; + case COPY_CALLBACK: + bytesread = cstate->data_source_cb(databuf, minread, maxread); + break; } return bytesread; @@ -969,7 +976,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt, PreventCommandIfParallelMode("COPY FROM"); cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program, - stmt->attlist, stmt->options); + NULL, stmt->attlist, stmt->options); cstate->range_table = range_table; *processed = CopyFrom(cstate); /* copy from file to database */ EndCopyFrom(cstate); @@ -2286,7 +2293,7 @@ limit_printout_length(const char *str) /* * Copy FROM file to relation. */ -static uint64 +uint64 CopyFrom(CopyState cstate) { HeapTuple tuple; @@ -2878,6 +2885,7 @@ BeginCopyFrom(ParseState *pstate, Relation rel, const char *filename, bool is_program, + copy_data_source_cb data_source_cb, List *attnamelist, List *options) { @@ -2992,7 +3000,12 @@ BeginCopyFrom(ParseState *pstate, cstate->num_defaults = num_defaults; cstate->is_program = is_program; - if (pipe) + if (data_source_cb) + { + cstate->copy_dest = COPY_CALLBACK; + cstate->data_source_cb = data_source_cb; + } + else if (pipe) { Assert(!is_program); /* the grammar does not allow this */ if (whereToSendOutput == DestRemote) |