diff options
Diffstat (limited to 'src/bin/psql/common.c')
-rw-r--r-- | src/bin/psql/common.c | 524 |
1 files changed, 291 insertions, 233 deletions
diff --git a/src/bin/psql/common.c b/src/bin/psql/common.c index f6777bbfc44..5a707dab648 100644 --- a/src/bin/psql/common.c +++ b/src/bin/psql/common.c @@ -32,7 +32,8 @@ static bool DescribeQuery(const char *query, double *elapsed_msec); static bool ExecQueryUsingCursor(const char *query, double *elapsed_msec); -static bool ExecQueryAndProcessResult(const char *query, double *elapsed_msec, bool *svpt_gone_p); +static int ExecQueryAndProcessResults(const char *query, double *elapsed_msec, bool *svpt_gone_p, + bool is_watch, const printQueryOpt *opt, FILE *printQueryFout); static bool command_no_begin(const char *query); static bool is_select_command(const char *query); @@ -355,7 +356,7 @@ CheckConnection(void) * Returns true for valid result, false for error state. */ static bool -AcceptResult(const PGresult *result) +AcceptResult(const PGresult *result, bool show_error) { bool OK; @@ -386,7 +387,7 @@ AcceptResult(const PGresult *result) break; } - if (!OK) + if (!OK && show_error) { const char *error = PQerrorMessage(pset.db); @@ -476,6 +477,19 @@ ClearOrSaveResult(PGresult *result) /* + * Consume all results + */ +static void +ClearOrSaveAllResults(void) +{ + PGresult *result; + + while ((result = PQgetResult(pset.db)) != NULL) + ClearOrSaveResult(result); +} + + +/* * Print microtiming output. Always print raw milliseconds; if the interval * is >= 1 second, also break it down into days/hours/minutes/seconds. */ @@ -574,7 +588,7 @@ PSQLexec(const char *query) ResetCancelConn(); - if (!AcceptResult(res)) + if (!AcceptResult(res, true)) { ClearOrSaveResult(res); res = NULL; @@ -597,11 +611,8 @@ int PSQLexecWatch(const char *query, const printQueryOpt *opt, FILE *printQueryFout) { bool timing = pset.timing; - PGresult *res; double elapsed_msec = 0; - instr_time before; - instr_time after; - FILE *fout; + int res; if (!pset.db) { @@ -611,76 +622,15 @@ PSQLexecWatch(const char *query, const printQueryOpt *opt, FILE *printQueryFout) SetCancelConn(pset.db); - if (timing) - INSTR_TIME_SET_CURRENT(before); - - res = PQexec(pset.db, query); + res = ExecQueryAndProcessResults(query, &elapsed_msec, NULL, true, opt, printQueryFout); ResetCancelConn(); - if (!AcceptResult(res)) - { - ClearOrSaveResult(res); - return 0; - } - - if (timing) - { - INSTR_TIME_SET_CURRENT(after); - INSTR_TIME_SUBTRACT(after, before); - elapsed_msec = INSTR_TIME_GET_MILLISEC(after); - } - - /* - * If SIGINT is sent while the query is processing, the interrupt will be - * consumed. The user's intention, though, is to cancel the entire watch - * process, so detect a sent cancellation request and exit in this case. - */ - if (cancel_pressed) - { - PQclear(res); - return 0; - } - - fout = printQueryFout ? printQueryFout : pset.queryFout; - - switch (PQresultStatus(res)) - { - case PGRES_TUPLES_OK: - printQuery(res, opt, fout, false, pset.logfile); - break; - - case PGRES_COMMAND_OK: - fprintf(fout, "%s\n%s\n\n", opt->title, PQcmdStatus(res)); - break; - - case PGRES_EMPTY_QUERY: - pg_log_error("\\watch cannot be used with an empty query"); - PQclear(res); - return -1; - - case PGRES_COPY_OUT: - case PGRES_COPY_IN: - case PGRES_COPY_BOTH: - pg_log_error("\\watch cannot be used with COPY"); - PQclear(res); - return -1; - - default: - pg_log_error("unexpected result status for \\watch"); - PQclear(res); - return -1; - } - - PQclear(res); - - fflush(fout); - /* Possible microtiming output */ if (timing) PrintTiming(elapsed_msec); - return 1; + return res; } @@ -715,7 +665,7 @@ PrintNotifications(void) * Returns true if successful, false otherwise. */ static bool -PrintQueryTuples(const PGresult *result) +PrintQueryTuples(const PGresult *result, const printQueryOpt *opt, FILE *printQueryFout) { bool ok = true; @@ -747,8 +697,9 @@ PrintQueryTuples(const PGresult *result) } else { - printQuery(result, &pset.popt, pset.queryFout, false, pset.logfile); - if (ferror(pset.queryFout)) + FILE *fout = printQueryFout ? printQueryFout : pset.queryFout; + printQuery(result, opt ? opt : &pset.popt, fout, false, pset.logfile); + if (ferror(fout)) { pg_log_error("could not print result table: %m"); ok = false; @@ -1001,123 +952,27 @@ HandleCopyResult(PGresult **resultp) return success; } - -/* - * ProcessResult: utility function for use by SendQuery() only - * - * When our command string contained a COPY FROM STDIN or COPY TO STDOUT, - * PQexec() has stopped at the PGresult associated with the first such - * command. In that event, we'll marshal data for the COPY and then cycle - * through any subsequent PGresult objects. - * - * When the command string contained no such COPY command, this function - * degenerates to an AcceptResult() call. - * - * Changes its argument to point to the last PGresult of the command string, - * or NULL if that result was for a COPY TO STDOUT. (Returning NULL prevents - * the command status from being printed, which we want in that case so that - * the status line doesn't get taken as part of the COPY data.) - * - * Returns true on complete success, false otherwise. Possible failure modes - * include purely client-side problems; check the transaction status for the - * server-side opinion. - */ -static bool -ProcessResult(PGresult **resultp) -{ - bool success = true; - bool first_cycle = true; - - for (;;) - { - ExecStatusType result_status; - bool is_copy; - PGresult *next_result; - - if (!AcceptResult(*resultp)) - { - /* - * Failure at this point is always a server-side failure or a - * failure to submit the command string. Either way, we're - * finished with this command string. - */ - success = false; - break; - } - - result_status = PQresultStatus(*resultp); - switch (result_status) - { - case PGRES_EMPTY_QUERY: - case PGRES_COMMAND_OK: - case PGRES_TUPLES_OK: - is_copy = false; - break; - - case PGRES_COPY_OUT: - case PGRES_COPY_IN: - is_copy = true; - break; - - default: - /* AcceptResult() should have caught anything else. */ - is_copy = false; - pg_log_error("unexpected PQresultStatus: %d", result_status); - break; - } - - if (is_copy) - success = HandleCopyResult(resultp); - else if (first_cycle) - { - /* fast path: no COPY commands; PQexec visited all results */ - break; - } - - /* - * Check PQgetResult() again. In the typical case of a single-command - * string, it will return NULL. Otherwise, we'll have other results - * to process that may include other COPYs. We keep the last result. - */ - next_result = PQgetResult(pset.db); - if (!next_result) - break; - - PQclear(*resultp); - *resultp = next_result; - first_cycle = false; - } - - SetResultVariables(*resultp, success); - - /* may need this to recover from conn loss during COPY */ - if (!first_cycle && !CheckConnection()) - return false; - - return success; -} - - /* * PrintQueryStatus: report command status as required * * Note: Utility function for use by PrintQueryResult() only. */ static void -PrintQueryStatus(PGresult *result) +PrintQueryStatus(PGresult *result, FILE *printQueryFout) { char buf[16]; + FILE *fout = printQueryFout ? printQueryFout : pset.queryFout; if (!pset.quiet) { if (pset.popt.topt.format == PRINT_HTML) { - fputs("<p>", pset.queryFout); - html_escaped_print(PQcmdStatus(result), pset.queryFout); - fputs("</p>\n", pset.queryFout); + fputs("<p>", fout); + html_escaped_print(PQcmdStatus(result), fout); + fputs("</p>\n", fout); } else - fprintf(pset.queryFout, "%s\n", PQcmdStatus(result)); + fprintf(fout, "%s\n", PQcmdStatus(result)); } if (pset.logfile) @@ -1136,7 +991,7 @@ PrintQueryStatus(PGresult *result) * Returns true if the query executed successfully, false otherwise. */ static bool -PrintQueryResult(PGresult *result) +PrintQueryResult(PGresult *result, bool last, bool is_watch, const printQueryOpt *opt, FILE *printQueryFout) { bool success; const char *cmdstatus; @@ -1148,24 +1003,32 @@ PrintQueryResult(PGresult *result) { case PGRES_TUPLES_OK: /* store or execute or print the data ... */ - if (pset.gset_prefix) + if (last && pset.gset_prefix) success = StoreQueryTuple(result); - else if (pset.gexec_flag) + else if (last && pset.gexec_flag) success = ExecQueryTuples(result); - else if (pset.crosstab_flag) + else if (last && pset.crosstab_flag) success = PrintResultInCrosstab(result); + else if (last || pset.show_all_results) + success = PrintQueryTuples(result, opt, printQueryFout); else - success = PrintQueryTuples(result); + success = true; + /* if it's INSERT/UPDATE/DELETE RETURNING, also print status */ - cmdstatus = PQcmdStatus(result); - if (strncmp(cmdstatus, "INSERT", 6) == 0 || - strncmp(cmdstatus, "UPDATE", 6) == 0 || - strncmp(cmdstatus, "DELETE", 6) == 0) - PrintQueryStatus(result); + if (last || pset.show_all_results) + { + cmdstatus = PQcmdStatus(result); + if (strncmp(cmdstatus, "INSERT", 6) == 0 || + strncmp(cmdstatus, "UPDATE", 6) == 0 || + strncmp(cmdstatus, "DELETE", 6) == 0) + PrintQueryStatus(result, printQueryFout); + } + break; case PGRES_COMMAND_OK: - PrintQueryStatus(result); + if (last || pset.show_all_results) + PrintQueryStatus(result, printQueryFout); success = true; break; @@ -1175,7 +1038,7 @@ PrintQueryResult(PGresult *result) case PGRES_COPY_OUT: case PGRES_COPY_IN: - /* nothing to do here */ + /* nothing to do here: already processed */ success = true; break; @@ -1192,11 +1055,46 @@ PrintQueryResult(PGresult *result) break; } - fflush(pset.queryFout); + fflush(printQueryFout ? printQueryFout : pset.queryFout); return success; } +/* + * Data structure and functions to record notices while they are + * emitted, so that they can be shown later. + * + * We need to know which result is last, which requires to extract + * one result in advance, hence two buffers are needed. + */ +struct t_notice_messages +{ + PQExpBufferData messages[2]; + int current; +}; + +/* + * Store notices in appropriate buffer, for later display. + */ +static void +AppendNoticeMessage(void *arg, const char *msg) +{ + struct t_notice_messages *notices = arg; + appendPQExpBufferStr(¬ices->messages[notices->current], msg); +} + +/* + * Show notices stored in buffer, which is then reset. + */ +static void +ShowNoticeMessage(struct t_notice_messages *notices) +{ + PQExpBufferData *current = ¬ices->messages[notices->current]; + if (*current->data != '\0') + pg_log_info("%s", current->data); + resetPQExpBuffer(current); +} + /* * SendQuery: send the query string to the backend @@ -1273,7 +1171,6 @@ SendQuery(const char *query) { pg_log_info("%s", PQerrorMessage(pset.db)); ClearOrSaveResult(result); - ResetCancelConn(); goto sendquery_cleanup; } ClearOrSaveResult(result); @@ -1292,7 +1189,6 @@ SendQuery(const char *query) { pg_log_info("%s", PQerrorMessage(pset.db)); ClearOrSaveResult(result); - ResetCancelConn(); goto sendquery_cleanup; } ClearOrSaveResult(result); @@ -1303,19 +1199,17 @@ SendQuery(const char *query) { /* Describe query's result columns, without executing it */ OK = DescribeQuery(query, &elapsed_msec); - ResetCancelConn(); } else if (pset.fetch_count <= 0 || pset.gexec_flag || pset.crosstab_flag || !is_select_command(query)) { /* Default fetch-it-all-and-print mode */ - OK = ExecQueryAndProcessResult(query, &elapsed_msec, &svpt_gone); + OK = (ExecQueryAndProcessResults(query, &elapsed_msec, &svpt_gone, false, NULL, NULL) >= 0); } else { /* Fetch-in-segments mode */ OK = ExecQueryUsingCursor(query, &elapsed_msec); - ResetCancelConn(); } if (!OK && pset.echo == PSQL_ECHO_ERRORS) @@ -1370,7 +1264,6 @@ SendQuery(const char *query) ClearOrSaveResult(svptres); OK = false; - ResetCancelConn(); goto sendquery_cleanup; } PQclear(svptres); @@ -1399,6 +1292,9 @@ SendQuery(const char *query) sendquery_cleanup: + /* global cancellation reset */ + ResetCancelConn(); + /* reset \g's output-to-filename trigger */ if (pset.gfname) { @@ -1479,7 +1375,7 @@ DescribeQuery(const char *query, double *elapsed_msec) PQclear(result); result = PQdescribePrepared(pset.db, ""); - OK = AcceptResult(result) && + OK = AcceptResult(result, true) && (PQresultStatus(result) == PGRES_COMMAND_OK); if (OK && result) { @@ -1527,7 +1423,7 @@ DescribeQuery(const char *query, double *elapsed_msec) PQclear(result); result = PQexec(pset.db, buf.data); - OK = AcceptResult(result); + OK = AcceptResult(result, true); if (timing) { @@ -1537,7 +1433,7 @@ DescribeQuery(const char *query, double *elapsed_msec) } if (OK && result) - OK = PrintQueryResult(result); + OK = PrintQueryResult(result, true, false, NULL, NULL); termPQExpBuffer(&buf); } @@ -1554,56 +1450,218 @@ DescribeQuery(const char *query, double *elapsed_msec) /* - * ExecQueryAndProcessResults: SendQuery() subroutine for the normal way to - * send a query + * ExecQueryAndProcessResults: utility function for use by SendQuery() + * and PSQLexecWatch(). + * + * Sends query and cycles through PGresult objects. + * + * When not under \watch and if our command string contained a COPY FROM STDIN + * or COPY TO STDOUT, the PGresult associated with these commands must be + * processed by providing an input or output stream. In that event, we'll + * marshal data for the COPY. + * + * For other commands, the results are processed normally, depending on their + * status. + * + * Returns 1 on complete success, 0 on interrupt and -1 or errors. Possible + * failure modes include purely client-side problems; check the transaction + * status for the server-side opinion. + * + * Note that on a combined query, failure does not mean that nothing was + * committed. */ -static bool -ExecQueryAndProcessResult(const char *query, double *elapsed_msec, bool *svpt_gone_p) +static int +ExecQueryAndProcessResults(const char *query, double *elapsed_msec, bool *svpt_gone_p, + bool is_watch, const printQueryOpt *opt, FILE *printQueryFout) { bool timing = pset.timing; - bool OK; + bool success; instr_time before, after; PGresult *result; + struct t_notice_messages notices; if (timing) INSTR_TIME_SET_CURRENT(before); - result = PQexec(pset.db, query); - - /* these operations are included in the timing result: */ - ResetCancelConn(); - OK = ProcessResult(&result); + success = PQsendQuery(pset.db, query); - if (timing) + if (!success) { - INSTR_TIME_SET_CURRENT(after); - INSTR_TIME_SUBTRACT(after, before); - *elapsed_msec = INSTR_TIME_GET_MILLISEC(after); - } + const char *error = PQerrorMessage(pset.db); - /* but printing result isn't: */ - if (OK && result) - OK = PrintQueryResult(result); + if (strlen(error)) + pg_log_info("%s", error); + + CheckConnection(); + + return -1; + } /* - * Check if the user ran any command that would destroy our internal - * savepoint: If the user did COMMIT AND CHAIN, RELEASE or ROLLBACK, our - * savepoint is gone. If they issued a SAVEPOINT, releasing ours would - * remove theirs. + * If SIGINT is sent while the query is processing, the interrupt will be + * consumed. The user's intention, though, is to cancel the entire watch + * process, so detect a sent cancellation request and exit in this case. */ - if (result && svpt_gone_p) + if (is_watch && cancel_pressed) { - const char *cmd = PQcmdStatus(result); - *svpt_gone_p = (strcmp(cmd, "COMMIT") == 0 || - strcmp(cmd, "SAVEPOINT") == 0 || - strcmp(cmd, "RELEASE") == 0 || - strcmp(cmd, "ROLLBACK") == 0); + ClearOrSaveAllResults(); + return 0; } - ClearOrSaveResult(result); + /* intercept notices */ + notices.current = 0; + initPQExpBuffer(¬ices.messages[0]); + initPQExpBuffer(¬ices.messages[1]); + PQsetNoticeProcessor(pset.db, AppendNoticeMessage, ¬ices); - return OK; + /* first result */ + result = PQgetResult(pset.db); + + while (result != NULL) + { + ExecStatusType result_status; + PGresult *next_result; + bool last; + + if (!AcceptResult(result, false)) + { + /* + * Some error occured, either a server-side failure or + * a failure to submit the command string. Record that. + */ + const char *error = PQresultErrorMessage(result); + + ShowNoticeMessage(¬ices); + if (strlen(error)) + pg_log_info("%s", error); + + CheckConnection(); + if (!is_watch) + SetResultVariables(result, false); + + /* keep the result status before clearing it */ + result_status = PQresultStatus(result); + ClearOrSaveResult(result); + success = false; + + /* + * switch to next result + */ + if (result_status == PGRES_COPY_BOTH || + result_status == PGRES_COPY_OUT || + result_status == PGRES_COPY_IN) + /* + * For some obscure reason PQgetResult does *not* return a NULL in copy + * cases despite the result having been cleared, but keeps returning an + * "empty" result that we have to ignore manually. + */ + result = NULL; + else + result = PQgetResult(pset.db); + + continue; + } + else if (svpt_gone_p && !*svpt_gone_p) + { + /* + * Check if the user ran any command that would destroy our internal + * savepoint: If the user did COMMIT AND CHAIN, RELEASE or ROLLBACK, our + * savepoint is gone. If they issued a SAVEPOINT, releasing ours would + * remove theirs. + */ + const char *cmd = PQcmdStatus(result); + *svpt_gone_p = (strcmp(cmd, "COMMIT") == 0 || + strcmp(cmd, "SAVEPOINT") == 0 || + strcmp(cmd, "RELEASE") == 0 || + strcmp(cmd, "ROLLBACK") == 0); + } + + result_status = PQresultStatus(result); + + /* must handle COPY before changing the current result */ + Assert(result_status != PGRES_COPY_BOTH); + if (result_status == PGRES_COPY_IN || + result_status == PGRES_COPY_OUT) + { + ShowNoticeMessage(¬ices); + + if (is_watch) + { + ClearOrSaveAllResults(); + pg_log_error("\\watch cannot be used with COPY"); + return -1; + } + + /* use normal notice processor during COPY */ + PQsetNoticeProcessor(pset.db, NoticeProcessor, NULL); + + success &= HandleCopyResult(&result); + + PQsetNoticeProcessor(pset.db, AppendNoticeMessage, ¬ices); + } + + /* + * Check PQgetResult() again. In the typical case of a single-command + * string, it will return NULL. Otherwise, we'll have other results + * to process. We need to do that to check whether this is the last. + */ + notices.current ^= 1; + next_result = PQgetResult(pset.db); + notices.current ^= 1; + last = (next_result == NULL); + + /* + * Get timing measure before printing the last result. + * + * It will include the display of previous results, if any. + * This cannot be helped because the server goes on processing + * further queries anyway while the previous ones are being displayed. + * The parallel execution of the client display hides the server time + * when it is shorter. + * + * With combined queries, timing must be understood as an upper bound + * of the time spent processing them. + */ + if (last && timing) + { + INSTR_TIME_SET_CURRENT(after); + INSTR_TIME_SUBTRACT(after, before); + *elapsed_msec = INSTR_TIME_GET_MILLISEC(after); + } + + /* notices already shown above for copy */ + ShowNoticeMessage(¬ices); + + /* this may or may not print something depending on settings */ + if (result != NULL) + success &= PrintQueryResult(result, last, false, opt, printQueryFout); + + /* set variables on last result if all went well */ + if (!is_watch && last && success) + SetResultVariables(result, true); + + ClearOrSaveResult(result); + notices.current ^= 1; + result = next_result; + + if (cancel_pressed) + { + ClearOrSaveAllResults(); + break; + } + } + + /* reset notice hook */ + PQsetNoticeProcessor(pset.db, NoticeProcessor, NULL); + termPQExpBuffer(¬ices.messages[0]); + termPQExpBuffer(¬ices.messages[1]); + + /* may need this to recover from conn loss during COPY */ + if (!CheckConnection()) + return -1; + + return cancel_pressed ? 0 : success ? 1 : -1; } @@ -1651,7 +1709,7 @@ ExecQueryUsingCursor(const char *query, double *elapsed_msec) if (PQtransactionStatus(pset.db) == PQTRANS_IDLE) { result = PQexec(pset.db, "BEGIN"); - OK = AcceptResult(result) && + OK = AcceptResult(result, true) && (PQresultStatus(result) == PGRES_COMMAND_OK); ClearOrSaveResult(result); if (!OK) @@ -1665,7 +1723,7 @@ ExecQueryUsingCursor(const char *query, double *elapsed_msec) query); result = PQexec(pset.db, buf.data); - OK = AcceptResult(result) && + OK = AcceptResult(result, true) && (PQresultStatus(result) == PGRES_COMMAND_OK); if (!OK) SetResultVariables(result, OK); @@ -1738,7 +1796,7 @@ ExecQueryUsingCursor(const char *query, double *elapsed_msec) is_pager = false; } - OK = AcceptResult(result); + OK = AcceptResult(result, true); Assert(!OK); SetResultVariables(result, OK); ClearOrSaveResult(result); @@ -1847,7 +1905,7 @@ cleanup: result = PQexec(pset.db, "CLOSE _psql_cursor"); if (OK) { - OK = AcceptResult(result) && + OK = AcceptResult(result, true) && (PQresultStatus(result) == PGRES_COMMAND_OK); ClearOrSaveResult(result); } @@ -1857,7 +1915,7 @@ cleanup: if (started_txn) { result = PQexec(pset.db, OK ? "COMMIT" : "ROLLBACK"); - OK &= AcceptResult(result) && + OK &= AcceptResult(result, true) && (PQresultStatus(result) == PGRES_COMMAND_OK); ClearOrSaveResult(result); } |