diff options
-rw-r--r-- | doc/src/sgml/monitoring.sgml | 107 | ||||
-rw-r--r-- | src/backend/catalog/system_views.sql | 11 | ||||
-rw-r--r-- | src/backend/commands/copyfrom.c | 17 | ||||
-rw-r--r-- | src/backend/commands/copyfromparse.c | 4 | ||||
-rw-r--r-- | src/backend/commands/copyto.c | 22 | ||||
-rw-r--r-- | src/backend/utils/adt/pgstatfuncs.c | 2 | ||||
-rw-r--r-- | src/include/commands/copyfrom_internal.h | 1 | ||||
-rw-r--r-- | src/include/commands/progress.h | 5 | ||||
-rw-r--r-- | src/include/pgstat.h | 3 | ||||
-rw-r--r-- | src/test/regress/expected/rules.out | 9 |
10 files changed, 176 insertions, 5 deletions
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 3d6c9013067..43fe8ae383e 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -399,6 +399,12 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser </entry> </row> + <row> + <entry><structname>pg_stat_progress_copy</structname><indexterm><primary>pg_stat_progress_copy</primary></indexterm></entry> + <entry>One row for each backend running <command>COPY</command>, showing current progress. + See <xref linkend='copy-progress-reporting'/>. + </entry> + </row> </tbody> </tgroup> </table> @@ -5247,6 +5253,7 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid, which support progress reporting are <command>ANALYZE</command>, <command>CLUSTER</command>, <command>CREATE INDEX</command>, <command>VACUUM</command>, + <command>COPY</command>, and <xref linkend="protocol-replication-base-backup"/> (i.e., replication command that <xref linkend="app-pgbasebackup"/> issues to take a base backup). @@ -6396,6 +6403,106 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid, </table> </sect2> + + <sect2 id="copy-progress-reporting"> + <title>COPY Progress Reporting</title> + + <indexterm> + <primary>pg_stat_progress_copy</primary> + </indexterm> + + <para> + Whenever <command>COPY</command> is running, the + <structname>pg_stat_progress_copy</structname> view will contain one row + for each backend that is currently running <command>COPY</command> command. + The table bellow describes the information that will be reported and provide + information how to interpret it. + </para> + + <table id="pg-stat-progress-copy-view" xreflabel="pg_stat_progress_copy"> + <title><structname>pg_stat_progress_copy</structname> View</title> + <tgroup cols="1"> + <thead> + <row> + <entry role="catalog_table_entry"><para role="column_definition"> + Column Type + </para> + <para> + Description + </para></entry> + </row> + </thead> + + <tbody> + <row> + <entry role="catalog_table_entry"><para role="column_definition"> + <structfield>pid</structfield> <type>integer</type> + </para> + <para> + Process ID of backend. + </para></entry> + </row> + + <row> + <entry role="catalog_table_entry"><para role="column_definition"> + <structfield>datid</structfield> <type>text</type> + </para> + <para> + OID of the database to which this backend is connected. + </para></entry> + </row> + + <row> + <entry role="catalog_table_entry"><para role="column_definition"> + <structfield>datname</structfield> <type>name</type> + </para> + <para> + Name of the database to which this backend is connected. + </para></entry> + </row> + + <row> + <entry role="catalog_table_entry"><para role="column_definition"> + <structfield>relid</structfield> <type>oid</type> + </para> + <para> + OID of the table on which the <command>COPY</command> command is executed. + It is set to 0 if <command>SELECT</command> query is provided. + </para></entry> + </row> + + <row> + <entry role="catalog_table_entry"><para role="column_definition"> + <structfield>bytes_processed</structfield> <type>bigint</type> + </para> + <para> + Number of bytes already processed by <command>COPY</command> command. + </para></entry> + </row> + + <row> + <entry role="catalog_table_entry"><para role="column_definition"> + <structfield>bytes_total</structfield> <type>bigint</type> + </para> + <para> + Size of source file for <command>COPY FROM</command> command in bytes. + It is set to 0 if not available. + </para></entry> + </row> + + <row> + <entry role="catalog_table_entry"><para role="column_definition"> + <structfield>lines_processed</structfield> <type>bigint</type> + </para> + <para> + Number of lines already processed by <command>COPY</command> command. + </para></entry> + </row> + </tbody> + </tgroup> + </table> + </sect2> + </sect1> <sect1 id="dynamic-trace"> diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index ab4603c69b8..5d89e77dbe2 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1117,6 +1117,17 @@ CREATE VIEW pg_stat_progress_basebackup AS S.param5 AS tablespaces_streamed FROM pg_stat_get_progress_info('BASEBACKUP') AS S; + +CREATE VIEW pg_stat_progress_copy AS + SELECT + S.pid AS pid, S.datid AS datid, D.datname AS datname, + S.relid AS relid, + S.param1 AS bytes_processed, + S.param2 AS bytes_total, + S.param3 AS lines_processed + FROM pg_stat_get_progress_info('COPY') AS S + LEFT JOIN pg_database D ON S.datid = D.oid; + CREATE VIEW pg_user_mappings AS SELECT U.oid AS umid, diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index 84a5045215b..08b6f782c73 100644 --- a/src/backend/commands/copyfrom.c +++ b/src/backend/commands/copyfrom.c @@ -25,6 +25,7 @@ #include "access/xlog.h" #include "commands/copy.h" #include "commands/copyfrom_internal.h" +#include "commands/progress.h" #include "commands/trigger.h" #include "executor/execPartition.h" #include "executor/executor.h" @@ -35,6 +36,7 @@ #include "libpq/pqformat.h" #include "miscadmin.h" #include "optimizer/optimizer.h" +#include "pgstat.h" #include "rewrite/rewriteHandler.h" #include "storage/fd.h" #include "tcop/tcopprot.h" @@ -1100,9 +1102,10 @@ CopyFrom(CopyFromState cstate) /* * We count only tuples not suppressed by a BEFORE INSERT trigger * or FDW; this is the same definition used by nodeModifyTable.c - * for counting tuples inserted by an INSERT command. + * for counting tuples inserted by an INSERT command. Update + * progress of the COPY command as well. */ - processed++; + pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++processed); } } @@ -1415,6 +1418,12 @@ BeginCopyFrom(ParseState *pstate, } } + + /* initialize progress */ + pgstat_progress_start_command(PROGRESS_COMMAND_COPY, + cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid); + cstate->bytes_processed = 0; + /* We keep those variables in cstate. */ cstate->in_functions = in_functions; cstate->typioparams = typioparams; @@ -1479,6 +1488,8 @@ BeginCopyFrom(ParseState *pstate, ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("\"%s\" is a directory", cstate->filename))); + + pgstat_progress_update_param(PROGRESS_COPY_BYTES_TOTAL, st.st_size); } } @@ -1522,6 +1533,8 @@ EndCopyFrom(CopyFromState cstate) cstate->filename))); } + pgstat_progress_end_command(); + MemoryContextDelete(cstate->copycontext); pfree(cstate); } diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c index 4360b7788ea..4c74067f849 100644 --- a/src/backend/commands/copyfromparse.c +++ b/src/backend/commands/copyfromparse.c @@ -20,11 +20,13 @@ #include "commands/copy.h" #include "commands/copyfrom_internal.h" +#include "commands/progress.h" #include "executor/executor.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" #include "mb/pg_wchar.h" #include "miscadmin.h" +#include "pgstat.h" #include "port/pg_bswap.h" #include "utils/memutils.h" #include "utils/rel.h" @@ -384,6 +386,8 @@ CopyLoadRawBuf(CopyFromState cstate) cstate->raw_buf[nbytes] = '\0'; cstate->raw_buf_index = 0; cstate->raw_buf_len = nbytes; + cstate->bytes_processed += nbytes; + pgstat_progress_update_param(PROGRESS_COPY_BYTES_PROCESSED, cstate->bytes_processed); return (inbytes > 0); } diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c index 51597ae523d..e04ec1e331b 100644 --- a/src/backend/commands/copyto.c +++ b/src/backend/commands/copyto.c @@ -24,6 +24,7 @@ #include "access/xact.h" #include "access/xlog.h" #include "commands/copy.h" +#include "commands/progress.h" #include "executor/execdesc.h" #include "executor/executor.h" #include "executor/tuptable.h" @@ -32,6 +33,7 @@ #include "mb/pg_wchar.h" #include "miscadmin.h" #include "optimizer/optimizer.h" +#include "pgstat.h" #include "rewrite/rewriteHandler.h" #include "storage/fd.h" #include "tcop/tcopprot.h" @@ -95,6 +97,7 @@ typedef struct CopyToStateData FmgrInfo *out_functions; /* lookup info for output functions */ MemoryContext rowcontext; /* per-row evaluation context */ + uint64 bytes_processed; /* number of bytes processed so far */ } CopyToStateData; @@ -288,6 +291,10 @@ CopySendEndOfRow(CopyToState cstate) break; } + /* Update the progress */ + cstate->bytes_processed += fe_msgbuf->len; + pgstat_progress_update_param(PROGRESS_COPY_BYTES_PROCESSED, cstate->bytes_processed); + resetStringInfo(fe_msgbuf); } @@ -363,6 +370,8 @@ EndCopy(CopyToState cstate) cstate->filename))); } + pgstat_progress_end_command(); + MemoryContextDelete(cstate->copycontext); pfree(cstate); } @@ -760,6 +769,11 @@ BeginCopyTo(ParseState *pstate, } } + /* initialize progress */ + pgstat_progress_start_command(PROGRESS_COMMAND_COPY, + cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid); + cstate->bytes_processed = 0; + MemoryContextSwitchTo(oldcontext); return cstate; @@ -938,7 +952,9 @@ CopyTo(CopyToState cstate) /* Format and send the data */ CopyOneRowTo(cstate, slot); - processed++; + + /* Increment amount of processed tuples and update the progress */ + pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++processed); } ExecDropSingleTupleTableSlot(slot); @@ -1303,7 +1319,9 @@ copy_dest_receive(TupleTableSlot *slot, DestReceiver *self) /* Send the data */ CopyOneRowTo(cstate, slot); - myState->processed++; + + /* Increment amount of processed tuples and update the progress */ + pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++myState->processed); return true; } diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index c9a1d4c56d9..5c12a165a15 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -494,6 +494,8 @@ pg_stat_get_progress_info(PG_FUNCTION_ARGS) cmdtype = PROGRESS_COMMAND_CREATE_INDEX; else if (pg_strcasecmp(cmd, "BASEBACKUP") == 0) cmdtype = PROGRESS_COMMAND_BASEBACKUP; + else if (pg_strcasecmp(cmd, "COPY") == 0) + cmdtype = PROGRESS_COMMAND_COPY; else ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h index 5401a966d27..e37942df391 100644 --- a/src/include/commands/copyfrom_internal.h +++ b/src/include/commands/copyfrom_internal.h @@ -154,6 +154,7 @@ typedef struct CopyFromStateData char *raw_buf; int raw_buf_index; /* next byte to process */ int raw_buf_len; /* total # of bytes stored */ + uint64 bytes_processed;/* number of bytes processed so far */ /* Shorthand for number of unconsumed bytes available in raw_buf */ #define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index) } CopyFromStateData; diff --git a/src/include/commands/progress.h b/src/include/commands/progress.h index 49a158aabbf..95ec5d02e9c 100644 --- a/src/include/commands/progress.h +++ b/src/include/commands/progress.h @@ -133,4 +133,9 @@ #define PROGRESS_BASEBACKUP_PHASE_WAIT_WAL_ARCHIVE 4 #define PROGRESS_BASEBACKUP_PHASE_TRANSFER_WAL 5 +/* Commands of PROGRESS_COPY */ +#define PROGRESS_COPY_BYTES_PROCESSED 0 +#define PROGRESS_COPY_BYTES_TOTAL 1 +#define PROGRESS_COPY_LINES_PROCESSED 2 + #endif diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 3a7e1997506..c38b6897101 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -1077,7 +1077,8 @@ typedef enum ProgressCommandType PROGRESS_COMMAND_ANALYZE, PROGRESS_COMMAND_CLUSTER, PROGRESS_COMMAND_CREATE_INDEX, - PROGRESS_COMMAND_BASEBACKUP + PROGRESS_COMMAND_BASEBACKUP, + PROGRESS_COMMAND_COPY } ProgressCommandType; #define PGSTAT_NUM_PROGRESS_PARAM 20 diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 6293ab57bcf..a687e99d1e4 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1937,6 +1937,15 @@ pg_stat_progress_cluster| SELECT s.pid, s.param8 AS index_rebuild_count FROM (pg_stat_get_progress_info('CLUSTER'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20) LEFT JOIN pg_database d ON ((s.datid = d.oid))); +pg_stat_progress_copy| SELECT s.pid, + s.datid, + d.datname, + s.relid, + s.param1 AS bytes_processed, + s.param2 AS bytes_total, + s.param3 AS lines_processed + FROM (pg_stat_get_progress_info('COPY'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20) + LEFT JOIN pg_database d ON ((s.datid = d.oid))); pg_stat_progress_create_index| SELECT s.pid, s.datid, d.datname, |