aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands/copyfrom.c
diff options
context:
space:
mode:
authorMichael Paquier <michael@paquier.xyz>2021-03-09 14:21:03 +0900
committerMichael Paquier <michael@paquier.xyz>2021-03-09 14:21:03 +0900
commit9d2d45700928d49212fb7ed140feeaebe3a6014f (patch)
tree88c79348c8b63167cae1cf4636185cc48e24785b /src/backend/commands/copyfrom.c
parentf9264d1524baa19e4a0528f033681ef16f61b137 (diff)
downloadpostgresql-9d2d45700928d49212fb7ed140feeaebe3a6014f.tar.gz
postgresql-9d2d45700928d49212fb7ed140feeaebe3a6014f.zip
Add support for more progress reporting in COPY
The command (TO or FROM), its type (file, pipe, program or callback), and the number of tuples excluded by a WHERE clause in COPY FROM are added to the progress reporting already available. The column "lines_processed" is renamed to "tuples_processed" to disambiguate the meaning of this column in the cases of CSV and BINARY COPY and to be more consistent with the other catalog progress views. Bump catalog version, again. Author: Matthias van de Meent Reviewed-by: Michael Paquier, Justin Pryzby, Bharath Rupireddy, Josef Šimánek, Tomas Vondra Discussion: https://postgr.es/m/CAEze2WiOcgdH4aQA8NtZq-4dgvnJzp8PohdeKchPkhMY-jWZXA@mail.gmail.com
Diffstat (limited to 'src/backend/commands/copyfrom.c')
-rw-r--r--src/backend/commands/copyfrom.c34
1 files changed, 30 insertions, 4 deletions
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index f05e2d23476..2ed696d429b 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -539,7 +539,8 @@ CopyFrom(CopyFromState cstate)
BulkInsertState bistate = NULL;
CopyInsertMethod insertMethod;
CopyMultiInsertInfo multiInsertInfo = {0}; /* pacify compiler */
- uint64 processed = 0;
+ int64 processed = 0;
+ int64 excluded = 0;
bool has_before_insert_row_trig;
bool has_instead_insert_row_trig;
bool leafpart_use_multi_insert = false;
@@ -869,7 +870,15 @@ CopyFrom(CopyFromState cstate)
econtext->ecxt_scantuple = myslot;
/* Skip items that don't match COPY's WHERE clause */
if (!ExecQual(cstate->qualexpr, econtext))
+ {
+ /*
+ * Report that this tuple was filtered out by the WHERE
+ * clause.
+ */
+ pgstat_progress_update_param(PROGRESS_COPY_TUPLES_EXCLUDED,
+ ++excluded);
continue;
+ }
}
/* Determine the partition to insert the tuple into */
@@ -1104,10 +1113,11 @@ CopyFrom(CopyFromState cstate)
/*
* We count only tuples not suppressed by a BEFORE INSERT trigger
* or FDW; this is the same definition used by nodeModifyTable.c
- * for counting tuples inserted by an INSERT command. Update
+ * for counting tuples inserted by an INSERT command. Update
* progress of the COPY command as well.
*/
- pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++processed);
+ pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
+ ++processed);
}
}
@@ -1193,6 +1203,16 @@ BeginCopyFrom(ParseState *pstate,
ExprState **defexprs;
MemoryContext oldcontext;
bool volatile_defexprs;
+ const int progress_cols[] = {
+ PROGRESS_COPY_COMMAND,
+ PROGRESS_COPY_TYPE,
+ PROGRESS_COPY_BYTES_TOTAL
+ };
+ int64 progress_vals[] = {
+ PROGRESS_COPY_COMMAND_FROM,
+ 0,
+ 0
+ };
/* Allocate workspace and zero all fields */
cstate = (CopyFromStateData *) palloc0(sizeof(CopyFromStateData));
@@ -1430,11 +1450,13 @@ BeginCopyFrom(ParseState *pstate,
if (data_source_cb)
{
+ progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
cstate->copy_src = COPY_CALLBACK;
cstate->data_source_cb = data_source_cb;
}
else if (pipe)
{
+ progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
Assert(!is_program); /* the grammar does not allow this */
if (whereToSendOutput == DestRemote)
ReceiveCopyBegin(cstate);
@@ -1447,6 +1469,7 @@ BeginCopyFrom(ParseState *pstate,
if (cstate->is_program)
{
+ progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM;
cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
if (cstate->copy_file == NULL)
ereport(ERROR,
@@ -1458,6 +1481,7 @@ BeginCopyFrom(ParseState *pstate,
{
struct stat st;
+ progress_vals[1] = PROGRESS_COPY_TYPE_FILE;
cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
if (cstate->copy_file == NULL)
{
@@ -1484,10 +1508,12 @@ BeginCopyFrom(ParseState *pstate,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is a directory", cstate->filename)));
- pgstat_progress_update_param(PROGRESS_COPY_BYTES_TOTAL, st.st_size);
+ progress_vals[2] = st.st_size;
}
}
+ pgstat_progress_update_multi_param(3, progress_cols, progress_vals);
+
if (cstate->opts.binary)
{
/* Read and verify binary header */