diff options
Diffstat (limited to 'src/bin/psql/common.c')
-rw-r--r-- | src/bin/psql/common.c | 624 |
1 files changed, 346 insertions, 278 deletions
diff --git a/src/bin/psql/common.c b/src/bin/psql/common.c index 7a95465111a..028a357991f 100644 --- a/src/bin/psql/common.c +++ b/src/bin/psql/common.c @@ -33,6 +33,7 @@ static bool DescribeQuery(const char *query, double *elapsed_msec); static bool ExecQueryUsingCursor(const char *query, double *elapsed_msec); static bool command_no_begin(const char *query); static bool is_select_command(const char *query); +static int SendQueryAndProcessResults(const char *query, double *pelapsed_msec, bool is_watch); /* @@ -353,7 +354,7 @@ CheckConnection(void) * Returns true for valid result, false for error state. */ static bool -AcceptResult(const PGresult *result) +AcceptResult(const PGresult *result, bool show_error) { bool OK; @@ -384,7 +385,7 @@ AcceptResult(const PGresult *result) break; } - if (!OK) + if (!OK && show_error) { const char *error = PQerrorMessage(pset.db); @@ -472,6 +473,18 @@ ClearOrSaveResult(PGresult *result) } } +/* + * Consume all results + */ +static void +ClearOrSaveAllResults() +{ + PGresult *result; + + while ((result = PQgetResult(pset.db)) != NULL) + ClearOrSaveResult(result); +} + /* * Print microtiming output. Always print raw milliseconds; if the interval @@ -572,7 +585,7 @@ PSQLexec(const char *query) ResetCancelConn(); - if (!AcceptResult(res)) + if (!AcceptResult(res, true)) { ClearOrSaveResult(res); res = NULL; @@ -594,10 +607,8 @@ PSQLexec(const char *query) int PSQLexecWatch(const char *query, const printQueryOpt *opt) { - PGresult *res; double elapsed_msec = 0; - instr_time before; - instr_time after; + int res; if (!pset.db) { @@ -606,75 +617,16 @@ PSQLexecWatch(const char *query, const printQueryOpt *opt) } SetCancelConn(pset.db); - - if (pset.timing) - INSTR_TIME_SET_CURRENT(before); - - res = PQexec(pset.db, query); - + res = SendQueryAndProcessResults(query, &elapsed_msec, true); ResetCancelConn(); - if (!AcceptResult(res)) - { - ClearOrSaveResult(res); - return 0; - } - - if (pset.timing) - { - INSTR_TIME_SET_CURRENT(after); - INSTR_TIME_SUBTRACT(after, before); - elapsed_msec = INSTR_TIME_GET_MILLISEC(after); - } - - /* - * If SIGINT is sent while the query is processing, the interrupt will be - * consumed. The user's intention, though, is to cancel the entire watch - * process, so detect a sent cancellation request and exit in this case. - */ - if (cancel_pressed) - { - PQclear(res); - return 0; - } - - switch (PQresultStatus(res)) - { - case PGRES_TUPLES_OK: - printQuery(res, opt, pset.queryFout, false, pset.logfile); - break; - - case PGRES_COMMAND_OK: - fprintf(pset.queryFout, "%s\n%s\n\n", opt->title, PQcmdStatus(res)); - break; - - case PGRES_EMPTY_QUERY: - pg_log_error("\\watch cannot be used with an empty query"); - PQclear(res); - return -1; - - case PGRES_COPY_OUT: - case PGRES_COPY_IN: - case PGRES_COPY_BOTH: - pg_log_error("\\watch cannot be used with COPY"); - PQclear(res); - return -1; - - default: - pg_log_error("unexpected result status for \\watch"); - PQclear(res); - return -1; - } - - PQclear(res); - fflush(pset.queryFout); /* Possible microtiming output */ if (pset.timing) PrintTiming(elapsed_msec); - return 1; + return res; } @@ -887,197 +839,114 @@ loop_exit: /* - * ProcessResult: utility function for use by SendQuery() only - * - * When our command string contained a COPY FROM STDIN or COPY TO STDOUT, - * PQexec() has stopped at the PGresult associated with the first such - * command. In that event, we'll marshal data for the COPY and then cycle - * through any subsequent PGresult objects. + * Marshal the COPY data. Either subroutine will get the + * connection out of its COPY state, then call PQresultStatus() + * once and report any error. Return whether all was ok. * - * When the command string contained no such COPY command, this function - * degenerates to an AcceptResult() call. + * For COPY OUT, direct the output to pset.copyStream if it's set, + * otherwise to pset.gfname if it's set, otherwise to queryFout. + * For COPY IN, use pset.copyStream as data source if it's set, + * otherwise cur_cmd_source. * - * Changes its argument to point to the last PGresult of the command string, - * or NULL if that result was for a COPY TO STDOUT. (Returning NULL prevents - * the command status from being printed, which we want in that case so that - * the status line doesn't get taken as part of the COPY data.) - * - * Returns true on complete success, false otherwise. Possible failure modes - * include purely client-side problems; check the transaction status for the - * server-side opinion. + * Update result if further processing is necessary, or NULL otherwise. + * Return a result when queryFout can safely output a result status: + * on COPY IN, or on COPY OUT if written to something other than pset.queryFout. + * Returning NULL prevents the command status from being printed, which + * we want if the status line doesn't get taken as part of the COPY data. */ static bool -ProcessResult(PGresult **results) +HandleCopyResult(PGresult **result) { - bool success = true; - bool first_cycle = true; + bool success; + FILE *copystream; + PGresult *copy_result; + ExecStatusType result_status = PQresultStatus(*result); - for (;;) + Assert(result_status == PGRES_COPY_OUT || + result_status == PGRES_COPY_IN); + + SetCancelConn(pset.db); + + if (result_status == PGRES_COPY_OUT) { - ExecStatusType result_status; - bool is_copy; - PGresult *next_result; + bool need_close = false; + bool is_pipe = false; - if (!AcceptResult(*results)) + if (pset.copyStream) { - /* - * Failure at this point is always a server-side failure or a - * failure to submit the command string. Either way, we're - * finished with this command string. - */ - success = false; - break; + /* invoked by \copy */ + copystream = pset.copyStream; } - - result_status = PQresultStatus(*results); - switch (result_status) + else if (pset.gfname) { - case PGRES_EMPTY_QUERY: - case PGRES_COMMAND_OK: - case PGRES_TUPLES_OK: - is_copy = false; - break; + /* invoked by \g */ + if (openQueryOutputFile(pset.gfname, + ©stream, &is_pipe)) + { + need_close = true; + if (is_pipe) + disable_sigpipe_trap(); + } + else + copystream = NULL; /* discard COPY data entirely */ + } + else + { + /* fall back to the generic query output stream */ + copystream = pset.queryFout; + } - case PGRES_COPY_OUT: - case PGRES_COPY_IN: - is_copy = true; - break; + success = handleCopyOut(pset.db, + copystream, + ©_result) + && (copystream != NULL); - default: - /* AcceptResult() should have caught anything else. */ - is_copy = false; - pg_log_error("unexpected PQresultStatus: %d", result_status); - break; + /* + * Suppress status printing if the report would go to the same + * place as the COPY data just went. Note this doesn't + * prevent error reporting, since handleCopyOut did that. + */ + if (copystream == pset.queryFout) + { + PQclear(copy_result); + copy_result = NULL; } - if (is_copy) + if (need_close) { - /* - * Marshal the COPY data. Either subroutine will get the - * connection out of its COPY state, then call PQresultStatus() - * once and report any error. - * - * For COPY OUT, direct the output to pset.copyStream if it's set, - * otherwise to pset.gfname if it's set, otherwise to queryFout. - * For COPY IN, use pset.copyStream as data source if it's set, - * otherwise cur_cmd_source. - */ - FILE *copystream; - PGresult *copy_result; - - SetCancelConn(pset.db); - if (result_status == PGRES_COPY_OUT) + /* close \g argument file/pipe */ + if (is_pipe) { - bool need_close = false; - bool is_pipe = false; - - if (pset.copyStream) - { - /* invoked by \copy */ - copystream = pset.copyStream; - } - else if (pset.gfname) - { - /* invoked by \g */ - if (openQueryOutputFile(pset.gfname, - ©stream, &is_pipe)) - { - need_close = true; - if (is_pipe) - disable_sigpipe_trap(); - } - else - copystream = NULL; /* discard COPY data entirely */ - } - else - { - /* fall back to the generic query output stream */ - copystream = pset.queryFout; - } - - success = handleCopyOut(pset.db, - copystream, - ©_result) - && success - && (copystream != NULL); - - /* - * Suppress status printing if the report would go to the same - * place as the COPY data just went. Note this doesn't - * prevent error reporting, since handleCopyOut did that. - */ - if (copystream == pset.queryFout) - { - PQclear(copy_result); - copy_result = NULL; - } - - if (need_close) - { - /* close \g argument file/pipe */ - if (is_pipe) - { - pclose(copystream); - restore_sigpipe_trap(); - } - else - { - fclose(copystream); - } - } + pclose(copystream); + restore_sigpipe_trap(); } else { - /* COPY IN */ - copystream = pset.copyStream ? pset.copyStream : pset.cur_cmd_source; - success = handleCopyIn(pset.db, - copystream, - PQbinaryTuples(*results), - ©_result) && success; + fclose(copystream); } - ResetCancelConn(); - - /* - * Replace the PGRES_COPY_OUT/IN result with COPY command's exit - * status, or with NULL if we want to suppress printing anything. - */ - PQclear(*results); - *results = copy_result; - } - else if (first_cycle) - { - /* fast path: no COPY commands; PQexec visited all results */ - break; } - - /* - * Check PQgetResult() again. In the typical case of a single-command - * string, it will return NULL. Otherwise, we'll have other results - * to process that may include other COPYs. We keep the last result. - */ - next_result = PQgetResult(pset.db); - if (!next_result) - break; - - PQclear(*results); - *results = next_result; - first_cycle = false; + } + else + { + /* COPY IN */ + copystream = pset.copyStream ? pset.copyStream : pset.cur_cmd_source; + success = handleCopyIn(pset.db, + copystream, + PQbinaryTuples(*result), + ©_result); } - SetResultVariables(*results, success); - - /* may need this to recover from conn loss during COPY */ - if (!first_cycle && !CheckConnection()) - return false; + ResetCancelConn(); + PQclear(*result); + *result = copy_result; return success; } - /* * PrintQueryStatus: report command status as required * - * Note: Utility function for use by PrintQueryResults() only. + * Note: Utility function for use by HandleQueryResult() only. */ static void PrintQueryStatus(PGresult *results) @@ -1105,43 +974,50 @@ PrintQueryStatus(PGresult *results) /* - * PrintQueryResults: print out (or store or execute) query results as required - * - * Note: Utility function for use by SendQuery() only. + * HandleQueryResult: print out, store or execute one query result + * as required. * * Returns true if the query executed successfully, false otherwise. */ static bool -PrintQueryResults(PGresult *results) +HandleQueryResult(PGresult *result, bool last) { bool success; const char *cmdstatus; - if (!results) + if (result == NULL) return false; - switch (PQresultStatus(results)) + switch (PQresultStatus(result)) { case PGRES_TUPLES_OK: /* store or execute or print the data ... */ - if (pset.gset_prefix) - success = StoreQueryTuple(results); - else if (pset.gexec_flag) - success = ExecQueryTuples(results); - else if (pset.crosstab_flag) - success = PrintResultsInCrosstab(results); + if (last && pset.gset_prefix) + success = StoreQueryTuple(result); + else if (last && pset.gexec_flag) + success = ExecQueryTuples(result); + else if (last && pset.crosstab_flag) + success = PrintResultsInCrosstab(result); + else if (last || pset.show_all_results) + success = PrintQueryTuples(result); else - success = PrintQueryTuples(results); + success = true; + /* if it's INSERT/UPDATE/DELETE RETURNING, also print status */ - cmdstatus = PQcmdStatus(results); - if (strncmp(cmdstatus, "INSERT", 6) == 0 || - strncmp(cmdstatus, "UPDATE", 6) == 0 || - strncmp(cmdstatus, "DELETE", 6) == 0) - PrintQueryStatus(results); + if (last || pset.show_all_results) + { + cmdstatus = PQcmdStatus(result); + if (strncmp(cmdstatus, "INSERT", 6) == 0 || + strncmp(cmdstatus, "UPDATE", 6) == 0 || + strncmp(cmdstatus, "DELETE", 6) == 0) + PrintQueryStatus(result); + } + break; case PGRES_COMMAND_OK: - PrintQueryStatus(results); + if (last || pset.show_all_results) + PrintQueryStatus(result); success = true; break; @@ -1151,7 +1027,7 @@ PrintQueryResults(PGresult *results) case PGRES_COPY_OUT: case PGRES_COPY_IN: - /* nothing to do here */ + /* nothing to do here: already processed */ success = true; break; @@ -1164,7 +1040,7 @@ PrintQueryResults(PGresult *results) default: success = false; pg_log_error("unexpected PQresultStatus: %d", - PQresultStatus(results)); + PQresultStatus(result)); break; } @@ -1173,6 +1049,217 @@ PrintQueryResults(PGresult *results) return success; } +/* + * Data structure and functions to record notices while they are + * emitted, so that they can be shown later. + * + * We need to know which result is last, which requires to extract + * one result in advance, hence two buffers are needed. + */ +typedef struct { + bool in_flip; + PQExpBufferData flip; + PQExpBufferData flop; +} t_notice_messages; + +/* + * Store notices in appropriate buffer, for later display. + */ +static void +AppendNoticeMessage(void *arg, const char *msg) +{ + t_notice_messages *notes = (t_notice_messages*) arg; + appendPQExpBufferStr(notes->in_flip ? ¬es->flip : ¬es->flop, msg); +} + +/* + * Show notices stored in buffer, which is then reset. + */ +static void +ShowNoticeMessage(t_notice_messages *notes) +{ + PQExpBufferData *current = notes->in_flip ? ¬es->flip : ¬es->flop; + if (current->data != NULL && *current->data != '\0') + pg_log_info("%s", current->data); + resetPQExpBuffer(current); +} + +/* + * SendQueryAndProcessResults: utility function for use by SendQuery() + * and PSQLexecWatch(). + * + * Sends query and cycles through PGresult objects. + * + * When not under \watch and if our command string contained a COPY FROM STDIN + * or COPY TO STDOUT, the PGresult associated with these commands must be + * processed by providing an input or output stream. In that event, we'll + * marshal data for the COPY. + * + * For other commands, the results are processed normally, depending on their + * status. + * + * Returns 1 on complete success, 0 on interrupt and -1 or errors. Possible + * failure modes include purely client-side problems; check the transaction + * status for the server-side opinion. + * + * Note that on a combined query, failure does not mean that nothing was + * committed. + */ +static int +SendQueryAndProcessResults(const char *query, double *pelapsed_msec, bool is_watch) +{ + bool success; + instr_time before; + PGresult *result; + t_notice_messages notes; + + if (pset.timing) + INSTR_TIME_SET_CURRENT(before); + + success = PQsendQuery(pset.db, query); + ResetCancelConn(); + + if (!success) + { + const char *error = PQerrorMessage(pset.db); + + if (strlen(error)) + pg_log_info("%s", error); + + CheckConnection(); + + return -1; + } + + /* + * If SIGINT is sent while the query is processing, the interrupt will be + * consumed. The user's intention, though, is to cancel the entire watch + * process, so detect a sent cancellation request and exit in this case. + */ + if (is_watch && cancel_pressed) + { + ClearOrSaveAllResults(); + return 0; + } + + /* intercept notices */ + notes.in_flip = true; + initPQExpBuffer(¬es.flip); + initPQExpBuffer(¬es.flop); + PQsetNoticeProcessor(pset.db, AppendNoticeMessage, ¬es); + + /* first result */ + result = PQgetResult(pset.db); + + while (result != NULL) + { + ExecStatusType result_status; + PGresult *next_result; + bool last; + + if (!AcceptResult(result, false)) + { + /* + * Some error occured, either a server-side failure or + * a failure to submit the command string. Record that. + */ + const char *error = PQerrorMessage(pset.db); + + ShowNoticeMessage(¬es); + if (strlen(error)) + pg_log_info("%s", error); + CheckConnection(); + if (!is_watch) + SetResultVariables(result, false); + ClearOrSaveResult(result); + success = false; + + /* and switch to next result */ + result = PQgetResult(pset.db); + continue; + } + + /* must handle COPY before changing the current result */ + result_status = PQresultStatus(result); + Assert(result_status != PGRES_COPY_BOTH); + if (result_status == PGRES_COPY_IN || + result_status == PGRES_COPY_OUT) + { + ShowNoticeMessage(¬es); + + if (is_watch) + { + ClearOrSaveAllResults(); + pg_log_error("\\watch cannot be used with COPY"); + return -1; + } + + /* use normal notice processor during COPY */ + PQsetNoticeProcessor(pset.db, NoticeProcessor, NULL); + + success &= HandleCopyResult(&result); + + PQsetNoticeProcessor(pset.db, AppendNoticeMessage, ¬es); + } + + /* + * Check PQgetResult() again. In the typical case of a single-command + * string, it will return NULL. Otherwise, we'll have other results + * to process. + */ + notes.in_flip = !notes.in_flip; + next_result = PQgetResult(pset.db); + notes.in_flip = !notes.in_flip; + last = (next_result == NULL); + + /* + * Get timing measure before printing the last result. + * + * It will include the display of previous results, if any. + * This cannot be helped because the server goes on processing + * further queries anyway while the previous ones are being displayed. + * The parallel execution of the client display hides the server time + * when it is shorter. + * + * With combined queries, timing must be understood as an upper bound + * of the time spent processing them. + */ + if (last && pset.timing) + { + instr_time now; + INSTR_TIME_SET_CURRENT(now); + INSTR_TIME_SUBTRACT(now, before); + *pelapsed_msec = INSTR_TIME_GET_MILLISEC(now); + } + + /* notices already shown above for copy */ + ShowNoticeMessage(¬es); + + /* this may or may not print something depending on settings */ + if (result != NULL) + success &= HandleQueryResult(result, last); + + /* set variables on last result if all went well */ + if (!is_watch && last && success) + SetResultVariables(result, true); + + ClearOrSaveResult(result); + notes.in_flip = !notes.in_flip; + result = next_result; + } + + /* reset notice hook */ + PQsetNoticeProcessor(pset.db, NoticeProcessor, NULL); + termPQExpBuffer(¬es.flip); + termPQExpBuffer(¬es.flop); + + /* may need this to recover from conn loss during COPY */ + if (!CheckConnection()) + return -1; + + return success ? 1 : -1; +} + /* * SendQuery: send the query string to the backend @@ -1294,28 +1381,9 @@ SendQuery(const char *query) pset.crosstab_flag || !is_select_command(query)) { /* Default fetch-it-all-and-print mode */ - instr_time before, - after; - - if (pset.timing) - INSTR_TIME_SET_CURRENT(before); - - results = PQexec(pset.db, query); - - /* these operations are included in the timing result: */ - ResetCancelConn(); - OK = ProcessResult(&results); - - if (pset.timing) - { - INSTR_TIME_SET_CURRENT(after); - INSTR_TIME_SUBTRACT(after, before); - elapsed_msec = INSTR_TIME_GET_MILLISEC(after); - } - - /* but printing results isn't: */ - if (OK && results) - OK = PrintQueryResults(results); + int res = SendQueryAndProcessResults(query, &elapsed_msec, false); + OK = (res >= 0); + results = NULL; } else { @@ -1497,7 +1565,7 @@ DescribeQuery(const char *query, double *elapsed_msec) PQclear(results); results = PQdescribePrepared(pset.db, ""); - OK = AcceptResult(results) && + OK = AcceptResult(results, true) && (PQresultStatus(results) == PGRES_COMMAND_OK); if (OK && results) { @@ -1545,7 +1613,7 @@ DescribeQuery(const char *query, double *elapsed_msec) PQclear(results); results = PQexec(pset.db, buf.data); - OK = AcceptResult(results); + OK = AcceptResult(results, true); if (pset.timing) { @@ -1555,7 +1623,7 @@ DescribeQuery(const char *query, double *elapsed_msec) } if (OK && results) - OK = PrintQueryResults(results); + OK = HandleQueryResult(results, true); termPQExpBuffer(&buf); } @@ -1614,7 +1682,7 @@ ExecQueryUsingCursor(const char *query, double *elapsed_msec) if (PQtransactionStatus(pset.db) == PQTRANS_IDLE) { results = PQexec(pset.db, "BEGIN"); - OK = AcceptResult(results) && + OK = AcceptResult(results, true) && (PQresultStatus(results) == PGRES_COMMAND_OK); ClearOrSaveResult(results); if (!OK) @@ -1628,7 +1696,7 @@ ExecQueryUsingCursor(const char *query, double *elapsed_msec) query); results = PQexec(pset.db, buf.data); - OK = AcceptResult(results) && + OK = AcceptResult(results, true) && (PQresultStatus(results) == PGRES_COMMAND_OK); if (!OK) SetResultVariables(results, OK); @@ -1701,7 +1769,7 @@ ExecQueryUsingCursor(const char *query, double *elapsed_msec) is_pager = false; } - OK = AcceptResult(results); + OK = AcceptResult(results, true); Assert(!OK); SetResultVariables(results, OK); ClearOrSaveResult(results); @@ -1810,7 +1878,7 @@ cleanup: results = PQexec(pset.db, "CLOSE _psql_cursor"); if (OK) { - OK = AcceptResult(results) && + OK = AcceptResult(results, true) && (PQresultStatus(results) == PGRES_COMMAND_OK); ClearOrSaveResult(results); } @@ -1820,7 +1888,7 @@ cleanup: if (started_txn) { results = PQexec(pset.db, OK ? "COMMIT" : "ROLLBACK"); - OK &= AcceptResult(results) && + OK &= AcceptResult(results, true) && (PQresultStatus(results) == PGRES_COMMAND_OK); ClearOrSaveResult(results); } |