Diffstat (limited to 'src')
-rw-r--r--  src/bin/pgbench/pgbench.c                    | 1128
-rw-r--r--  src/bin/pgbench/t/001_pgbench_with_server.pl |  215
-rw-r--r--  src/bin/pgbench/t/002_pgbench_no_server.pl   |   10
-rw-r--r--  src/fe_utils/conditional.c                   |   16
-rw-r--r--  src/include/fe_utils/conditional.h           |    2
5 files changed, 1209 insertions, 162 deletions
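The patch below adds the pgbench options --max-tries=NUM, --verbose-errors and --failures-detailed (see the usage() changes in pgbench.c). As a rough sketch of how they combine, an invocation along the following lines allows up to 10 tries per transaction on serialization/deadlock errors and reports failures grouped by type; the database name bench_db and the -c/-j/-T/-r settings are illustrative placeholders, not part of this change:

    pgbench -c 2 -j 2 -T 60 -r --max-tries=10 --verbose-errors --failures-detailed bench_db

With the default --max-tries=1, a transaction that hits a serialization or deadlock error is counted as failed and is not retried.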
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c index 000ffc4a5cc..7080d2a7958 100644 --- a/src/bin/pgbench/pgbench.c +++ b/src/bin/pgbench/pgbench.c @@ -76,6 +76,8 @@ #define M_PI 3.14159265358979323846 #endif +#define ERRCODE_T_R_SERIALIZATION_FAILURE "40001" +#define ERRCODE_T_R_DEADLOCK_DETECTED "40P01" #define ERRCODE_UNDEFINED_TABLE "42P01" /* @@ -275,9 +277,34 @@ bool progress_timestamp = false; /* progress report with Unix time */ int nclients = 1; /* number of clients */ int nthreads = 1; /* number of threads */ bool is_connect; /* establish connection for each transaction */ -bool report_per_command; /* report per-command latencies */ +bool report_per_command = false; /* report per-command latencies, retries + * after errors and failures (errors + * without retrying) */ int main_pid; /* main process id used in log filename */ +/* + * There are different types of restrictions for deciding that the current + * transaction with a serialization/deadlock error can no longer be retried and + * should be reported as failed: + * - max_tries (--max-tries) can be used to limit the number of tries; + * - latency_limit (-L) can be used to limit the total time of tries; + * - duration (-T) can be used to limit the total benchmark time. + * + * They can be combined together, and you need to use at least one of them to + * retry the transactions with serialization/deadlock errors. If none of them is + * used, the default value of max_tries is 1 and such transactions will not be + * retried. + */ + +/* + * We cannot retry a transaction after the serialization/deadlock error if its + * number of tries reaches this maximum; if its value is zero, it is not used. + */ +uint32 max_tries = 1; + +bool failures_detailed = false; /* whether to group failures in reports + * or logs by basic types */ + const char *pghost = NULL; const char *pgport = NULL; const char *username = NULL; @@ -290,6 +317,12 @@ const char *progname; volatile bool timer_exceeded = false; /* flag from signal handler */ /* + * We don't want to allocate variables one by one; for efficiency, add a + * constant margin each time it overflows. + */ +#define VARIABLES_ALLOC_MARGIN 8 + +/* * Variable definitions. * * If a variable only has a string value, "svalue" is that value, and value is @@ -306,6 +339,24 @@ typedef struct PgBenchValue value; /* actual variable's value */ } Variable; +/* + * Data structure for client variables. + */ +typedef struct +{ + Variable *vars; /* array of variable definitions */ + int nvars; /* number of variables */ + + /* + * The maximum number of variables that we can currently store in 'vars' + * without having to reallocate more space. We must always have max_vars >= + * nvars. + */ + int max_vars; + + bool vars_sorted; /* are variables sorted by name? */ +} Variables; + #define MAX_SCRIPTS 128 /* max number of SQL scripts allowed */ #define SHELL_COMMAND_SIZE 256 /* maximum size allowed for shell command */ @@ -338,9 +389,66 @@ typedef int64 pg_time_usec_t; typedef struct StatsData { pg_time_usec_t start_time; /* interval start time, for aggregates */ - int64 cnt; /* number of transactions, including skipped */ + + /* + * Transactions are counted depending on their execution and outcome. First + * a transaction may have started or not: skipped transactions occur under + * --rate and --latency-limit when the client is too late to execute them. + * Secondly, a started transaction may ultimately succeed or fail, possibly + * after some retries when --max-tries is not one. 
Thus + * + * the number of all transactions = + * 'skipped' (it was too late to execute them) + + * 'cnt' (the number of successful transactions) + + * failed (the number of failed transactions). + * + * A successful transaction can have several unsuccessful tries before a + * successful run. Thus + * + * 'cnt' (the number of successful transactions) = + * successfully retried transactions (they got a serialization or a + * deadlock error(s), but were + * successfully retried from the very + * beginning) + + * directly successful transactions (they were successfully completed on + * the first try). + * + * A failed transaction is defined as unsuccessfully retried transactions. + * It can be one of two types: + * + * failed (the number of failed transactions) = + * 'serialization_failures' (they got a serialization error and were not + * successfully retried) + + * 'deadlock_failures' (they got a deadlock error and were not successfully + * retried). + * + * If the transaction was retried after a serialization or a deadlock error + * this does not guarantee that this retry was successful. Thus + * + * 'retries' (number of retries) = + * number of retries in all retried transactions = + * number of retries in (successfully retried transactions + + * failed transactions); + * + * 'retried' (number of all retried transactions) = + * successfully retried transactions + + * failed transactions. + */ + int64 cnt; /* number of successful transactions, not + * including 'skipped' */ int64 skipped; /* number of transactions skipped under --rate * and --latency-limit */ + int64 retries; /* number of retries after a serialization or a + * deadlock error in all the transactions */ + int64 retried; /* number of all transactions that were retried + * after a serialization or a deadlock error + * (perhaps the last try was unsuccessful) */ + int64 serialization_failures; /* number of transactions that were not + * successfully retried after a + * serialization error */ + int64 deadlock_failures; /* number of transactions that were not + * successfully retried after a deadlock + * error */ SimpleStats latency; SimpleStats lag; } StatsData; @@ -351,6 +459,31 @@ typedef struct StatsData */ pg_time_usec_t epoch_shift; +/* + * Error status for errors during script execution. + */ +typedef enum EStatus +{ + ESTATUS_NO_ERROR = 0, + ESTATUS_META_COMMAND_ERROR, + + /* SQL errors */ + ESTATUS_SERIALIZATION_ERROR, + ESTATUS_DEADLOCK_ERROR, + ESTATUS_OTHER_SQL_ERROR +} EStatus; + +/* + * Transaction status at the end of a command. + */ +typedef enum TStatus +{ + TSTATUS_IDLE, + TSTATUS_IN_BLOCK, + TSTATUS_CONN_ERROR, + TSTATUS_OTHER_ERROR +} TStatus; + /* Various random sequences are initialized from this one. */ static pg_prng_state base_random_sequence; @@ -423,6 +556,35 @@ typedef enum CSTATE_SKIP_COMMAND, /* + * States for failed commands. + * + * If the SQL/meta command fails, in CSTATE_ERROR clean up after an error: + * - clear the conditional stack; + * - if we have an unterminated (possibly failed) transaction block, send + * the rollback command to the server and wait for the result in + * CSTATE_WAIT_ROLLBACK_RESULT. If something goes wrong with rolling back, + * go to CSTATE_ABORTED. + * + * But if everything is ok we are ready for future transactions: if this is + * a serialization or deadlock error and we can re-execute the transaction + * from the very beginning, go to CSTATE_RETRY; otherwise go to + * CSTATE_FAILURE. 
+ * + * In CSTATE_RETRY report an error, set the same parameters for the + * transaction execution as in the previous tries and process the first + * transaction command in CSTATE_START_COMMAND. + * + * In CSTATE_FAILURE report a failure, set the parameters for the + * transaction execution as they were before the first run of this + * transaction (except for a random state) and go to CSTATE_END_TX to + * complete this transaction. + */ + CSTATE_ERROR, + CSTATE_WAIT_ROLLBACK_RESULT, + CSTATE_RETRY, + CSTATE_FAILURE, + + /* * CSTATE_END_TX performs end-of-transaction processing. It calculates * latency, and logs the transaction. In --connect mode, it closes the * current connection. @@ -460,9 +622,7 @@ typedef struct int command; /* command number in script */ /* client variables */ - Variable *variables; /* array of variable definitions */ - int nvariables; /* number of variables */ - bool vars_sorted; /* are variables sorted by name? */ + Variables variables; /* various times about current transaction in microseconds */ pg_time_usec_t txn_scheduled; /* scheduled start time of transaction */ @@ -472,8 +632,20 @@ typedef struct bool prepared[MAX_SCRIPTS]; /* whether client prepared the script */ + /* + * For processing failures and repeating transactions with serialization or + * deadlock errors: + */ + EStatus estatus; /* the error status of the current transaction + * execution; this is ESTATUS_NO_ERROR if there were + * no errors */ + pg_prng_state random_state; /* random state */ + uint32 tries; /* how many times have we already tried the + * current transaction? */ + /* per client collected stats */ - int64 cnt; /* client transaction count, for -t */ + int64 cnt; /* client transaction count, for -t; skipped and + * failed transactions are also counted here */ } CState; /* @@ -568,6 +740,9 @@ static const char *QUERYMODE[] = {"simple", "extended", "prepared"}; * aset do gset on all possible queries of a combined query (\;). * expr Parsed expression, if needed. * stats Time spent in this command. + * retries Number of retries after a serialization or deadlock error in the + * current command. + * failures Number of errors in the current command that were not retried. 
*/ typedef struct Command { @@ -580,6 +755,8 @@ typedef struct Command char *varprefix; PgBenchExpr *expr; SimpleStats stats; + int64 retries; + int64 failures; } Command; typedef struct ParsedScript @@ -594,6 +771,8 @@ static ParsedScript sql_script[MAX_SCRIPTS]; /* SQL script files */ static int num_scripts; /* number of scripts in sql_script[] */ static int64 total_weight = 0; +static bool verbose_errors = false; /* print verbose messages of all errors */ + /* Builtin test scripts */ typedef struct BuiltinScript { @@ -731,15 +910,18 @@ usage(void) " protocol for submitting queries (default: simple)\n" " -n, --no-vacuum do not run VACUUM before tests\n" " -P, --progress=NUM show thread progress report every NUM seconds\n" - " -r, --report-latencies report average latency per command\n" + " -r, --report-per-command report latencies, failures and retries per command\n" " -R, --rate=NUM target rate in transactions per second\n" " -s, --scale=NUM report this scale factor in output\n" " -t, --transactions=NUM number of transactions each client runs (default: 10)\n" " -T, --time=NUM duration of benchmark test in seconds\n" " -v, --vacuum-all vacuum all four standard tables before tests\n" " --aggregate-interval=NUM aggregate data over NUM seconds\n" + " --failures-detailed report the failures grouped by basic types\n" " --log-prefix=PREFIX prefix for transaction time log file\n" " (default: \"pgbench_log\")\n" + " --max-tries=NUM max number of tries to run transaction (default: 1)\n" + " --verbose-errors print messages of all errors\n" " --progress-timestamp use Unix epoch timestamps for progress\n" " --random-seed=SEED set random seed (\"time\", \"rand\", integer)\n" " --sampling-rate=NUM fraction of transactions to log (e.g., 0.01 for 1%%)\n" @@ -1265,6 +1447,10 @@ initStats(StatsData *sd, pg_time_usec_t start) sd->start_time = start; sd->cnt = 0; sd->skipped = 0; + sd->retries = 0; + sd->retried = 0; + sd->serialization_failures = 0; + sd->deadlock_failures = 0; initSimpleStats(&sd->latency); initSimpleStats(&sd->lag); } @@ -1273,22 +1459,51 @@ initStats(StatsData *sd, pg_time_usec_t start) * Accumulate one additional item into the given stats object. */ static void -accumStats(StatsData *stats, bool skipped, double lat, double lag) +accumStats(StatsData *stats, bool skipped, double lat, double lag, + EStatus estatus, int64 tries) { - stats->cnt++; - + /* Record the skipped transaction */ if (skipped) { /* no latency to record on skipped transactions */ stats->skipped++; + return; } - else + + /* + * Record the number of retries regardless of whether the transaction was + * successful or failed. 
+ */ + if (tries > 1) { - addToSimpleStats(&stats->latency, lat); + stats->retries += (tries - 1); + stats->retried++; + } - /* and possibly the same for schedule lag */ - if (throttle_delay) - addToSimpleStats(&stats->lag, lag); + switch (estatus) + { + /* Record the successful transaction */ + case ESTATUS_NO_ERROR: + stats->cnt++; + + addToSimpleStats(&stats->latency, lat); + + /* and possibly the same for schedule lag */ + if (throttle_delay) + addToSimpleStats(&stats->lag, lag); + break; + + /* Record the failed transaction */ + case ESTATUS_SERIALIZATION_ERROR: + stats->serialization_failures++; + break; + case ESTATUS_DEADLOCK_ERROR: + stats->deadlock_failures++; + break; + default: + /* internal error which should never occur */ + pg_log_fatal("unexpected error status: %d", estatus); + exit(1); } } @@ -1398,39 +1613,39 @@ compareVariableNames(const void *v1, const void *v2) /* Locate a variable by name; returns NULL if unknown */ static Variable * -lookupVariable(CState *st, char *name) +lookupVariable(Variables *variables, char *name) { Variable key; /* On some versions of Solaris, bsearch of zero items dumps core */ - if (st->nvariables <= 0) + if (variables->nvars <= 0) return NULL; /* Sort if we have to */ - if (!st->vars_sorted) + if (!variables->vars_sorted) { - qsort((void *) st->variables, st->nvariables, sizeof(Variable), + qsort((void *) variables->vars, variables->nvars, sizeof(Variable), compareVariableNames); - st->vars_sorted = true; + variables->vars_sorted = true; } /* Now we can search */ key.name = name; return (Variable *) bsearch((void *) &key, - (void *) st->variables, - st->nvariables, + (void *) variables->vars, + variables->nvars, sizeof(Variable), compareVariableNames); } /* Get the value of a variable, in string form; returns NULL if unknown */ static char * -getVariable(CState *st, char *name) +getVariable(Variables *variables, char *name) { Variable *var; char stringform[64]; - var = lookupVariable(st, name); + var = lookupVariable(variables, name); if (var == NULL) return NULL; /* not found */ @@ -1563,20 +1778,36 @@ valid_variable_name(const char *name) } /* + * Make sure there is enough space for 'needed' more variable in the variables + * array. + */ +static void +enlargeVariables(Variables *variables, int needed) +{ + /* total number of variables required now */ + needed += variables->nvars; + + if (variables->max_vars < needed) + { + variables->max_vars = needed + VARIABLES_ALLOC_MARGIN; + variables->vars = (Variable *) + pg_realloc(variables->vars, variables->max_vars * sizeof(Variable)); + } +} + +/* * Lookup a variable by name, creating it if need be. * Caller is expected to assign a value to the variable. * Returns NULL on failure (bad name). */ static Variable * -lookupCreateVariable(CState *st, const char *context, char *name) +lookupCreateVariable(Variables *variables, const char *context, char *name) { Variable *var; - var = lookupVariable(st, name); + var = lookupVariable(variables, name); if (var == NULL) { - Variable *newvars; - /* * Check for the name only when declaring a new variable to avoid * overhead. 
@@ -1588,23 +1819,17 @@ lookupCreateVariable(CState *st, const char *context, char *name) } /* Create variable at the end of the array */ - if (st->variables) - newvars = (Variable *) pg_realloc(st->variables, - (st->nvariables + 1) * sizeof(Variable)); - else - newvars = (Variable *) pg_malloc(sizeof(Variable)); - - st->variables = newvars; + enlargeVariables(variables, 1); - var = &newvars[st->nvariables]; + var = &(variables->vars[variables->nvars]); var->name = pg_strdup(name); var->svalue = NULL; /* caller is expected to initialize remaining fields */ - st->nvariables++; + variables->nvars++; /* we don't re-sort the array till we have to */ - st->vars_sorted = false; + variables->vars_sorted = false; } return var; @@ -1613,12 +1838,13 @@ lookupCreateVariable(CState *st, const char *context, char *name) /* Assign a string value to a variable, creating it if need be */ /* Returns false on failure (bad name) */ static bool -putVariable(CState *st, const char *context, char *name, const char *value) +putVariable(Variables *variables, const char *context, char *name, + const char *value) { Variable *var; char *val; - var = lookupCreateVariable(st, context, name); + var = lookupCreateVariable(variables, context, name); if (!var) return false; @@ -1636,12 +1862,12 @@ putVariable(CState *st, const char *context, char *name, const char *value) /* Assign a value to a variable, creating it if need be */ /* Returns false on failure (bad name) */ static bool -putVariableValue(CState *st, const char *context, char *name, +putVariableValue(Variables *variables, const char *context, char *name, const PgBenchValue *value) { Variable *var; - var = lookupCreateVariable(st, context, name); + var = lookupCreateVariable(variables, context, name); if (!var) return false; @@ -1656,12 +1882,13 @@ putVariableValue(CState *st, const char *context, char *name, /* Assign an integer value to a variable, creating it if need be */ /* Returns false on failure (bad name) */ static bool -putVariableInt(CState *st, const char *context, char *name, int64 value) +putVariableInt(Variables *variables, const char *context, char *name, + int64 value) { PgBenchValue val; setIntValue(&val, value); - return putVariableValue(st, context, name, &val); + return putVariableValue(variables, context, name, &val); } /* @@ -1720,7 +1947,7 @@ replaceVariable(char **sql, char *param, int len, char *value) } static char * -assignVariables(CState *st, char *sql) +assignVariables(Variables *variables, char *sql) { char *p, *name, @@ -1741,7 +1968,7 @@ assignVariables(CState *st, char *sql) continue; } - val = getVariable(st, name); + val = getVariable(variables, name); free(name); if (val == NULL) { @@ -1756,12 +1983,13 @@ assignVariables(CState *st, char *sql) } static void -getQueryParams(CState *st, const Command *command, const char **params) +getQueryParams(Variables *variables, const Command *command, + const char **params) { int i; for (i = 0; i < command->argc - 1; i++) - params[i] = getVariable(st, command->argv[i + 1]); + params[i] = getVariable(variables, command->argv[i + 1]); } static char * @@ -2629,7 +2857,7 @@ evaluateExpr(CState *st, PgBenchExpr *expr, PgBenchValue *retval) { Variable *var; - if ((var = lookupVariable(st, expr->u.variable.varname)) == NULL) + if ((var = lookupVariable(&st->variables, expr->u.variable.varname)) == NULL) { pg_log_error("undefined variable \"%s\"", expr->u.variable.varname); return false; @@ -2699,7 +2927,7 @@ getMetaCommand(const char *cmd) * Return true if succeeded, or false on error. 
*/ static bool -runShellCommand(CState *st, char *variable, char **argv, int argc) +runShellCommand(Variables *variables, char *variable, char **argv, int argc) { char command[SHELL_COMMAND_SIZE]; int i, @@ -2730,7 +2958,7 @@ runShellCommand(CState *st, char *variable, char **argv, int argc) { arg = argv[i] + 1; /* a string literal starting with colons */ } - else if ((arg = getVariable(st, argv[i] + 1)) == NULL) + else if ((arg = getVariable(variables, argv[i] + 1)) == NULL) { pg_log_error("%s: undefined variable \"%s\"", argv[0], argv[i]); return false; @@ -2791,7 +3019,7 @@ runShellCommand(CState *st, char *variable, char **argv, int argc) pg_log_error("%s: shell command must return an integer (not \"%s\")", argv[0], res); return false; } - if (!putVariableInt(st, "setshell", variable, retval)) + if (!putVariableInt(variables, "setshell", variable, retval)) return false; pg_log_debug("%s: shell parameter name: \"%s\", value: \"%s\"", argv[0], argv[1], res); @@ -2806,6 +3034,9 @@ preparedStatementName(char *buffer, int file, int state) sprintf(buffer, "P%d_%d", file, state); } +/* + * Report the abortion of the client when processing SQL commands. + */ static void commandFailed(CState *st, const char *cmd, const char *message) { @@ -2813,6 +3044,17 @@ commandFailed(CState *st, const char *cmd, const char *message) st->id, st->command, cmd, st->use_file, message); } +/* + * Report the error in the command while the script is executing. + */ +static void +commandError(CState *st, const char *message) +{ + Assert(sql_script[st->use_file].commands[st->command]->type == SQL_COMMAND); + pg_log_info("client %d got an error in command %d (SQL) of script %d; %s", + st->id, st->command, st->use_file, message); +} + /* return a script number with a weighted choice. */ static int chooseScript(TState *thread) @@ -2843,7 +3085,7 @@ sendCommand(CState *st, Command *command) char *sql; sql = pg_strdup(command->argv[0]); - sql = assignVariables(st, sql); + sql = assignVariables(&st->variables, sql); pg_log_debug("client %d sending %s", st->id, sql); r = PQsendQuery(st->con, sql); @@ -2854,7 +3096,7 @@ sendCommand(CState *st, Command *command) const char *sql = command->argv[0]; const char *params[MAX_ARGS]; - getQueryParams(st, command, params); + getQueryParams(&st->variables, command, params); pg_log_debug("client %d sending %s", st->id, sql); r = PQsendQueryParams(st->con, sql, command->argc - 1, @@ -2901,7 +3143,7 @@ sendCommand(CState *st, Command *command) st->prepared[st->use_file] = true; } - getQueryParams(st, command, params); + getQueryParams(&st->variables, command, params); preparedStatementName(name, st->use_file, st->command); pg_log_debug("client %d sending %s", st->id, name); @@ -2921,6 +3163,33 @@ sendCommand(CState *st, Command *command) } /* + * Get the error status from the error code. + */ +static EStatus +getSQLErrorStatus(const char *sqlState) +{ + if (sqlState != NULL) + { + if (strcmp(sqlState, ERRCODE_T_R_SERIALIZATION_FAILURE) == 0) + return ESTATUS_SERIALIZATION_ERROR; + else if (strcmp(sqlState, ERRCODE_T_R_DEADLOCK_DETECTED) == 0) + return ESTATUS_DEADLOCK_ERROR; + } + + return ESTATUS_OTHER_SQL_ERROR; +} + +/* + * Returns true if this type of error can be retried. + */ +static bool +canRetryError(EStatus estatus) +{ + return (estatus == ESTATUS_SERIALIZATION_ERROR || + estatus == ESTATUS_DEADLOCK_ERROR); +} + +/* * Process query response from the backend. 
* * If varprefix is not NULL, it's the variable name prefix where to store @@ -2962,6 +3231,7 @@ readCommandResponse(CState *st, MetaCommand meta, char *varprefix) { pg_log_error("client %d script %d command %d query %d: expected one row, got %d", st->id, st->use_file, st->command, qrynum, 0); + st->estatus = ESTATUS_META_COMMAND_ERROR; goto error; } break; @@ -2976,6 +3246,7 @@ readCommandResponse(CState *st, MetaCommand meta, char *varprefix) /* under \gset, report the error */ pg_log_error("client %d script %d command %d query %d: expected one row, got %d", st->id, st->use_file, st->command, qrynum, PQntuples(res)); + st->estatus = ESTATUS_META_COMMAND_ERROR; goto error; } else if (meta == META_ASET && ntuples <= 0) @@ -2994,12 +3265,13 @@ readCommandResponse(CState *st, MetaCommand meta, char *varprefix) varname = psprintf("%s%s", varprefix, varname); /* store last row result as a string */ - if (!putVariable(st, meta == META_ASET ? "aset" : "gset", varname, + if (!putVariable(&st->variables, meta == META_ASET ? "aset" : "gset", varname, PQgetvalue(res, ntuples - 1, fld))) { /* internal error */ pg_log_error("client %d script %d command %d query %d: error storing into variable %s", st->id, st->use_file, st->command, qrynum, varname); + st->estatus = ESTATUS_META_COMMAND_ERROR; goto error; } @@ -3017,6 +3289,18 @@ readCommandResponse(CState *st, MetaCommand meta, char *varprefix) PQerrorMessage(st->con)); break; + case PGRES_NONFATAL_ERROR: + case PGRES_FATAL_ERROR: + st->estatus = getSQLErrorStatus( + PQresultErrorField(res, PG_DIAG_SQLSTATE)); + if (canRetryError(st->estatus)) + { + if (verbose_errors) + commandError(st, PQerrorMessage(st->con)); + goto error; + } + /* fall through */ + default: /* anything else is unexpected */ pg_log_error("client %d script %d aborted in command %d query %d: %s", @@ -3055,14 +3339,14 @@ error: * of delay, in microseconds. Returns true on success, false on error. */ static bool -evaluateSleep(CState *st, int argc, char **argv, int *usecs) +evaluateSleep(Variables *variables, int argc, char **argv, int *usecs) { char *var; int usec; if (*argv[1] == ':') { - if ((var = getVariable(st, argv[1] + 1)) == NULL) + if ((var = getVariable(variables, argv[1] + 1)) == NULL) { pg_log_error("%s: undefined variable \"%s\"", argv[0], argv[1] + 1); return false; @@ -3095,6 +3379,165 @@ evaluateSleep(CState *st, int argc, char **argv, int *usecs) return true; } + +/* + * Returns true if the error can be retried. + */ +static bool +doRetry(CState *st, pg_time_usec_t *now) +{ + Assert(st->estatus != ESTATUS_NO_ERROR); + + /* We can only retry serialization or deadlock errors. */ + if (!canRetryError(st->estatus)) + return false; + + /* + * We must have at least one option to limit the retrying of transactions + * that got an error. + */ + Assert(max_tries || latency_limit || duration > 0); + + /* + * We cannot retry the error if we have reached the maximum number of tries. + */ + if (max_tries && st->tries >= max_tries) + return false; + + /* + * We cannot retry the error if we spent too much time on this transaction. + */ + if (latency_limit) + { + pg_time_now_lazy(now); + if (*now - st->txn_scheduled > latency_limit) + return false; + } + + /* + * We cannot retry the error if the benchmark duration is over. + */ + if (timer_exceeded) + return false; + + /* OK */ + return true; +} + +/* + * Read results and discard it until a sync point. 
+ */ +static int +discardUntilSync(CState *st) +{ + /* send a sync */ + if (!PQpipelineSync(st->con)) + { + pg_log_error("client %d aborted: failed to send a pipeline sync", + st->id); + return 0; + } + + /* receive PGRES_PIPELINE_SYNC and null following it */ + for(;;) + { + PGresult *res = PQgetResult(st->con); + if (PQresultStatus(res) == PGRES_PIPELINE_SYNC) + { + PQclear(res); + res = PQgetResult(st->con); + Assert(res == NULL); + break; + } + PQclear(res); + } + + /* exit pipline */ + if (PQexitPipelineMode(st->con) != 1) + { + pg_log_error("client %d aborted: failed to exit pipeline mode for rolling back the failed transaction", + st->id); + return 0; + } + return 1; +} + +/* + * Get the transaction status at the end of a command especially for + * checking if we are in a (failed) transaction block. + */ +static TStatus +getTransactionStatus(PGconn *con) +{ + PGTransactionStatusType tx_status; + + tx_status = PQtransactionStatus(con); + switch (tx_status) + { + case PQTRANS_IDLE: + return TSTATUS_IDLE; + case PQTRANS_INTRANS: + case PQTRANS_INERROR: + return TSTATUS_IN_BLOCK; + case PQTRANS_UNKNOWN: + /* PQTRANS_UNKNOWN is expected given a broken connection */ + if (PQstatus(con) == CONNECTION_BAD) + return TSTATUS_CONN_ERROR; + /* fall through */ + case PQTRANS_ACTIVE: + default: + /* + * We cannot find out whether we are in a transaction block or not. + * Internal error which should never occur. + */ + pg_log_error("unexpected transaction status %d", tx_status); + return TSTATUS_OTHER_ERROR; + } + + /* not reached */ + Assert(false); + return TSTATUS_OTHER_ERROR; +} + +/* + * Print verbose messages of an error + */ +static void +printVerboseErrorMessages(CState *st, pg_time_usec_t *now, bool is_retry) +{ + static PQExpBuffer buf = NULL; + + if (buf == NULL) + buf = createPQExpBuffer(); + else + resetPQExpBuffer(buf); + + printfPQExpBuffer(buf, "client %d ", st->id); + appendPQExpBuffer(buf, "%s", + (is_retry ? + "repeats the transaction after the error" : + "ends the failed transaction")); + appendPQExpBuffer(buf, " (try %d", st->tries); + + /* Print max_tries if it is not unlimitted. */ + if (max_tries) + appendPQExpBuffer(buf, "/%d", max_tries); + + /* + * If the latency limit is used, print a percentage of the current transaction + * latency from the latency limit. + */ + if (latency_limit) + { + pg_time_now_lazy(now); + appendPQExpBuffer(buf, ", %.3f%% of the maximum time of tries was used", + (100.0 * (*now - st->txn_scheduled) / latency_limit)); + } + appendPQExpBuffer(buf, ")\n"); + + pg_log_info("%s", buf->data); +} + /* * Advance the state machine of a connection. */ @@ -3132,6 +3575,10 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg) st->use_file = chooseScript(thread); Assert(conditional_stack_empty(st->cstack)); + /* reset transaction variables to default values */ + st->estatus = ESTATUS_NO_ERROR; + st->tries = 1; + pg_log_debug("client %d executing script \"%s\"", st->id, sql_script[st->use_file].desc); @@ -3172,6 +3619,13 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg) memset(st->prepared, 0, sizeof(st->prepared)); } + /* + * It is the first try to run this transaction. Remember the + * random state: maybe it will get an error and we will need to + * run it again. 
+ */ + st->random_state = st->cs_func_rs; + /* record transaction start time */ st->txn_begin = now; @@ -3328,6 +3782,8 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg) * - else CSTATE_END_COMMAND */ st->state = executeMetaCommand(st, &now); + if (st->state == CSTATE_ABORTED) + st->estatus = ESTATUS_META_COMMAND_ERROR; } /* @@ -3473,6 +3929,8 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg) if (PQpipelineStatus(st->con) != PQ_PIPELINE_ON) st->state = CSTATE_END_COMMAND; } + else if (canRetryError(st->estatus)) + st->state = CSTATE_ERROR; else st->state = CSTATE_ABORTED; break; @@ -3520,44 +3978,223 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg) break; /* - * End of transaction (end of script, really). + * Clean up after an error. */ - case CSTATE_END_TX: + case CSTATE_ERROR: + { + TStatus tstatus; - /* transaction finished: calculate latency and do log */ - processXactStats(thread, st, &now, false, agg); + Assert(st->estatus != ESTATUS_NO_ERROR); - /* - * missing \endif... cannot happen if CheckConditional was - * okay - */ - Assert(conditional_stack_empty(st->cstack)); + /* Clear the conditional stack */ + conditional_stack_reset(st->cstack); - if (is_connect) - { - pg_time_usec_t start = now; + /* Read and discard until a sync point in pipeline mode */ + if (PQpipelineStatus(st->con) != PQ_PIPELINE_OFF) + { + if (!discardUntilSync(st)) + { + st->state = CSTATE_ABORTED; + break; + } + } - pg_time_now_lazy(&start); - finishCon(st); - now = pg_time_now(); - thread->conn_duration += now - start; + /* + * Check if we have a (failed) transaction block or not, and + * roll it back if any. + */ + tstatus = getTransactionStatus(st->con); + if (tstatus == TSTATUS_IN_BLOCK) + { + /* Try to rollback a (failed) transaction block. */ + if (!PQsendQuery(st->con, "ROLLBACK")) + { + pg_log_error("client %d aborted: failed to send sql command for rolling back the failed transaction", + st->id); + st->state = CSTATE_ABORTED; + } + else + st->state = CSTATE_WAIT_ROLLBACK_RESULT; + } + else if (tstatus == TSTATUS_IDLE) + { + /* + * If time is over, we're done; + * otherwise, check if we can retry the error. + */ + st->state = timer_exceeded ? CSTATE_FINISHED : + doRetry(st, &now) ? CSTATE_RETRY : CSTATE_FAILURE; + } + else + { + if (tstatus == TSTATUS_CONN_ERROR) + pg_log_error("perhaps the backend died while processing"); + + pg_log_error("client %d aborted while receiving the transaction status", st->id); + st->state = CSTATE_ABORTED; + } + break; } - if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded) + /* + * Wait for the rollback command to complete + */ + case CSTATE_WAIT_ROLLBACK_RESULT: { - /* script completed */ - st->state = CSTATE_FINISHED; + PGresult *res; + + pg_log_debug("client %d receiving", st->id); + if (!PQconsumeInput(st->con)) + { + pg_log_error("client %d aborted while rolling back the transaction after an error; perhaps the backend died while processing", + st->id); + st->state = CSTATE_ABORTED; + break; + } + if (PQisBusy(st->con)) + return; /* don't have the whole result yet */ + + /* + * Read and discard the query result; + */ + res = PQgetResult(st->con); + switch (PQresultStatus(res)) + { + case PGRES_COMMAND_OK: + /* OK */ + PQclear(res); + /* null must be returned */ + res = PQgetResult(st->con); + Assert(res == NULL); + + /* + * If time is over, we're done; + * otherwise, check if we can retry the error. + */ + st->state = timer_exceeded ? CSTATE_FINISHED : + doRetry(st, &now) ? 
CSTATE_RETRY : CSTATE_FAILURE; + break; + default: + pg_log_error("client %d aborted while rolling back the transaction after an error; %s", + st->id, PQerrorMessage(st->con)); + PQclear(res); + st->state = CSTATE_ABORTED; + break; + } break; } - /* next transaction (script) */ - st->state = CSTATE_CHOOSE_SCRIPT; + /* + * Retry the transaction after an error. + */ + case CSTATE_RETRY: + command = sql_script[st->use_file].commands[st->command]; /* - * Ensure that we always return on this point, so as to avoid - * an infinite loop if the script only contains meta commands. + * Inform that the transaction will be retried after the error. */ - return; + if (verbose_errors) + printVerboseErrorMessages(st, &now, true); + + /* Count tries and retries */ + st->tries++; + command->retries++; + + /* + * Reset the random state as they were at the beginning + * of the transaction. + */ + st->cs_func_rs = st->random_state; + + /* Process the first transaction command. */ + st->command = 0; + st->estatus = ESTATUS_NO_ERROR; + st->state = CSTATE_START_COMMAND; + break; + + /* + * Record a failed transaction. + */ + case CSTATE_FAILURE: + command = sql_script[st->use_file].commands[st->command]; + + /* Accumulate the failure. */ + command->failures++; + + /* + * Inform that the failed transaction will not be retried. + */ + if (verbose_errors) + printVerboseErrorMessages(st, &now, false); + + /* End the failed transaction. */ + st->state = CSTATE_END_TX; + break; + + /* + * End of transaction (end of script, really). + */ + case CSTATE_END_TX: + { + TStatus tstatus; + + /* transaction finished: calculate latency and do log */ + processXactStats(thread, st, &now, false, agg); + + /* + * missing \endif... cannot happen if CheckConditional was + * okay + */ + Assert(conditional_stack_empty(st->cstack)); + + /* + * We must complete all the transaction blocks that were + * started in this script. + */ + tstatus = getTransactionStatus(st->con); + if (tstatus == TSTATUS_IN_BLOCK) + { + pg_log_error("client %d aborted: end of script reached without completing the last transaction", + st->id); + st->state = CSTATE_ABORTED; + break; + } + else if (tstatus != TSTATUS_IDLE) + { + if (tstatus == TSTATUS_CONN_ERROR) + pg_log_error("perhaps the backend died while processing"); + + pg_log_error("client %d aborted while receiving the transaction status", st->id); + st->state = CSTATE_ABORTED; + break; + } + + if (is_connect) + { + pg_time_usec_t start = now; + + pg_time_now_lazy(&start); + finishCon(st); + now = pg_time_now(); + thread->conn_duration += now - start; + } + + if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded) + { + /* script completed */ + st->state = CSTATE_FINISHED; + break; + } + + /* next transaction (script) */ + st->state = CSTATE_CHOOSE_SCRIPT; + + /* + * Ensure that we always return on this point, so as to avoid + * an infinite loop if the script only contains meta commands. + */ + return; + } /* * Final states. Close the connection if it's still open. @@ -3627,7 +4264,7 @@ executeMetaCommand(CState *st, pg_time_usec_t *now) * latency will be recorded in CSTATE_SLEEP state, not here, after the * delay has elapsed.) 
*/ - if (!evaluateSleep(st, argc, argv, &usec)) + if (!evaluateSleep(&st->variables, argc, argv, &usec)) { commandFailed(st, "sleep", "execution of meta-command failed"); return CSTATE_ABORTED; @@ -3648,7 +4285,7 @@ executeMetaCommand(CState *st, pg_time_usec_t *now) return CSTATE_ABORTED; } - if (!putVariableValue(st, argv[0], argv[1], &result)) + if (!putVariableValue(&st->variables, argv[0], argv[1], &result)) { commandFailed(st, "set", "assignment of meta-command failed"); return CSTATE_ABORTED; @@ -3718,7 +4355,7 @@ executeMetaCommand(CState *st, pg_time_usec_t *now) } else if (command->meta == META_SETSHELL) { - if (!runShellCommand(st, argv[1], argv + 2, argc - 2)) + if (!runShellCommand(&st->variables, argv[1], argv + 2, argc - 2)) { commandFailed(st, "setshell", "execution of meta-command failed"); return CSTATE_ABORTED; @@ -3726,7 +4363,7 @@ executeMetaCommand(CState *st, pg_time_usec_t *now) } else if (command->meta == META_SHELL) { - if (!runShellCommand(st, NULL, argv + 1, argc - 1)) + if (!runShellCommand(&st->variables, NULL, argv + 1, argc - 1)) { commandFailed(st, "shell", "execution of meta-command failed"); return CSTATE_ABORTED; @@ -3782,6 +4419,43 @@ executeMetaCommand(CState *st, pg_time_usec_t *now) } /* + * Return the number fo failed transactions. + */ +static int64 +getFailures(const StatsData *stats) +{ + return (stats->serialization_failures + + stats->deadlock_failures); +} + +/* + * Return a string constant representing the result of a transaction + * that is not successfully processed. + */ +static const char * +getResultString(bool skipped, EStatus estatus) +{ + if (skipped) + return "skipped"; + else if (failures_detailed) + { + switch (estatus) + { + case ESTATUS_SERIALIZATION_ERROR: + return "serialization"; + case ESTATUS_DEADLOCK_ERROR: + return "deadlock"; + default: + /* internal error which should never occur */ + pg_log_fatal("unexpected error status: %d", estatus); + exit(1); + } + } + else + return "failed"; +} + +/* * Print log entry after completing one transaction. 
* * We print Unix-epoch timestamps in the log, so that entries can be @@ -3828,6 +4502,14 @@ doLog(TState *thread, CState *st, agg->latency.sum2, agg->latency.min, agg->latency.max); + + if (failures_detailed) + fprintf(logfile, " " INT64_FORMAT " " INT64_FORMAT, + agg->serialization_failures, + agg->deadlock_failures); + else + fprintf(logfile, " " INT64_FORMAT, getFailures(agg)); + if (throttle_delay) { fprintf(logfile, " %.0f %.0f %.0f %.0f", @@ -3838,6 +4520,10 @@ doLog(TState *thread, CState *st, if (latency_limit) fprintf(logfile, " " INT64_FORMAT, agg->skipped); } + if (max_tries != 1) + fprintf(logfile, " " INT64_FORMAT " " INT64_FORMAT, + agg->retried, + agg->retries); fputc('\n', logfile); /* reset data and move to next interval */ @@ -3845,22 +4531,26 @@ doLog(TState *thread, CState *st, } /* accumulate the current transaction */ - accumStats(agg, skipped, latency, lag); + accumStats(agg, skipped, latency, lag, st->estatus, st->tries); } else { /* no, print raw transactions */ - if (skipped) - fprintf(logfile, "%d " INT64_FORMAT " skipped %d " INT64_FORMAT " " - INT64_FORMAT, - st->id, st->cnt, st->use_file, now / 1000000, now % 1000000); - else + if (!skipped && st->estatus == ESTATUS_NO_ERROR) fprintf(logfile, "%d " INT64_FORMAT " %.0f %d " INT64_FORMAT " " INT64_FORMAT, st->id, st->cnt, latency, st->use_file, now / 1000000, now % 1000000); + else + fprintf(logfile, "%d " INT64_FORMAT " %s %d " INT64_FORMAT " " + INT64_FORMAT, + st->id, st->cnt, getResultString(skipped, st->estatus), + st->use_file, now / 1000000, now % 1000000); + if (throttle_delay) fprintf(logfile, " %.0f", lag); + if (max_tries != 1) + fprintf(logfile, " %d", st->tries - 1); fputc('\n', logfile); } } @@ -3869,7 +4559,8 @@ doLog(TState *thread, CState *st, * Accumulate and report statistics at end of a transaction. * * (This is also called when a transaction is late and thus skipped. - * Note that even skipped transactions are counted in the "cnt" fields.) + * Note that even skipped and failed transactions are counted in the CState + * "cnt" field.) 
*/ static void processXactStats(TState *thread, CState *st, pg_time_usec_t *now, @@ -3877,10 +4568,10 @@ processXactStats(TState *thread, CState *st, pg_time_usec_t *now, { double latency = 0.0, lag = 0.0; - bool thread_details = progress || throttle_delay || latency_limit, - detailed = thread_details || use_log || per_script_stats; + bool detailed = progress || throttle_delay || latency_limit || + use_log || per_script_stats; - if (detailed && !skipped) + if (detailed && !skipped && st->estatus == ESTATUS_NO_ERROR) { pg_time_now_lazy(now); @@ -3889,20 +4580,12 @@ processXactStats(TState *thread, CState *st, pg_time_usec_t *now, lag = st->txn_begin - st->txn_scheduled; } - if (thread_details) - { - /* keep detailed thread stats */ - accumStats(&thread->stats, skipped, latency, lag); + /* keep detailed thread stats */ + accumStats(&thread->stats, skipped, latency, lag, st->estatus, st->tries); - /* count transactions over the latency limit, if needed */ - if (latency_limit && latency > latency_limit) - thread->latency_late++; - } - else - { - /* no detailed stats, just count */ - thread->stats.cnt++; - } + /* count transactions over the latency limit, if needed */ + if (latency_limit && latency > latency_limit) + thread->latency_late++; /* client stat is just counting */ st->cnt++; @@ -3912,7 +4595,8 @@ processXactStats(TState *thread, CState *st, pg_time_usec_t *now, /* XXX could use a mutex here, but we choose not to */ if (per_script_stats) - accumStats(&sql_script[st->use_file].stats, skipped, latency, lag); + accumStats(&sql_script[st->use_file].stats, skipped, latency, lag, + st->estatus, st->tries); } @@ -4771,6 +5455,8 @@ create_sql_command(PQExpBuffer buf, const char *source) my_command->type = SQL_COMMAND; my_command->meta = META_NONE; my_command->argc = 0; + my_command->retries = 0; + my_command->failures = 0; memset(my_command->argv, 0, sizeof(my_command->argv)); my_command->varprefix = NULL; /* allocated later, if needed */ my_command->expr = NULL; @@ -5439,7 +6125,9 @@ printProgressReport(TState *threads, int64 test_start, pg_time_usec_t now, { /* generate and show report */ pg_time_usec_t run = now - *last_report; - int64 ntx; + int64 cnt, + failures, + retried; double tps, total_run, latency, @@ -5466,23 +6154,30 @@ printProgressReport(TState *threads, int64 test_start, pg_time_usec_t now, mergeSimpleStats(&cur.lag, &threads[i].stats.lag); cur.cnt += threads[i].stats.cnt; cur.skipped += threads[i].stats.skipped; + cur.retries += threads[i].stats.retries; + cur.retried += threads[i].stats.retried; + cur.serialization_failures += + threads[i].stats.serialization_failures; + cur.deadlock_failures += threads[i].stats.deadlock_failures; } /* we count only actually executed transactions */ - ntx = (cur.cnt - cur.skipped) - (last->cnt - last->skipped); + cnt = cur.cnt - last->cnt; total_run = (now - test_start) / 1000000.0; - tps = 1000000.0 * ntx / run; - if (ntx > 0) + tps = 1000000.0 * cnt / run; + if (cnt > 0) { - latency = 0.001 * (cur.latency.sum - last->latency.sum) / ntx; - sqlat = 1.0 * (cur.latency.sum2 - last->latency.sum2) / ntx; + latency = 0.001 * (cur.latency.sum - last->latency.sum) / cnt; + sqlat = 1.0 * (cur.latency.sum2 - last->latency.sum2) / cnt; stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency); - lag = 0.001 * (cur.lag.sum - last->lag.sum) / ntx; + lag = 0.001 * (cur.lag.sum - last->lag.sum) / cnt; } else { latency = sqlat = stdev = lag = 0; } + failures = getFailures(&cur) - getFailures(last); + retried = cur.retried - last->retried; if 
(progress_timestamp) { @@ -5496,8 +6191,8 @@ printProgressReport(TState *threads, int64 test_start, pg_time_usec_t now, } fprintf(stderr, - "progress: %s, %.1f tps, lat %.3f ms stddev %.3f", - tbuf, tps, latency, stdev); + "progress: %s, %.1f tps, lat %.3f ms stddev %.3f, " INT64_FORMAT " failed", + tbuf, tps, latency, stdev, failures); if (throttle_delay) { @@ -5506,6 +6201,12 @@ printProgressReport(TState *threads, int64 test_start, pg_time_usec_t now, fprintf(stderr, ", " INT64_FORMAT " skipped", cur.skipped - last->skipped); } + + /* it can be non-zero only if max_tries is not equal to one */ + if (max_tries != 1) + fprintf(stderr, + ", " INT64_FORMAT " retried, " INT64_FORMAT " retries", + retried, cur.retries - last->retries); fprintf(stderr, "\n"); *last = cur; @@ -5565,9 +6266,10 @@ printResults(StatsData *total, int64 latency_late) { /* tps is about actually executed transactions during benchmarking */ - int64 ntx = total->cnt - total->skipped; + int64 failures = getFailures(total); + int64 total_cnt = total->cnt + total->skipped + failures; double bench_duration = PG_TIME_GET_DOUBLE(total_duration); - double tps = ntx / bench_duration; + double tps = total->cnt / bench_duration; /* Report test parameters. */ printf("transaction type: %s\n", @@ -5580,39 +6282,65 @@ printResults(StatsData *total, printf("query mode: %s\n", QUERYMODE[querymode]); printf("number of clients: %d\n", nclients); printf("number of threads: %d\n", nthreads); + + if (max_tries) + printf("maximum number of tries: %d\n", max_tries); + if (duration <= 0) { printf("number of transactions per client: %d\n", nxacts); printf("number of transactions actually processed: " INT64_FORMAT "/%d\n", - ntx, nxacts * nclients); + total->cnt, nxacts * nclients); } else { printf("duration: %d s\n", duration); printf("number of transactions actually processed: " INT64_FORMAT "\n", - ntx); + total->cnt); + } + + printf("number of failed transactions: " INT64_FORMAT " (%.3f%%)\n", + failures, 100.0 * failures / total_cnt); + + if (failures_detailed) + { + printf("number of serialization failures: " INT64_FORMAT " (%.3f%%)\n", + total->serialization_failures, + 100.0 * total->serialization_failures / total_cnt); + printf("number of deadlock failures: " INT64_FORMAT " (%.3f%%)\n", + total->deadlock_failures, + 100.0 * total->deadlock_failures / total_cnt); + } + + /* it can be non-zero only if max_tries is not equal to one */ + if (max_tries != 1) + { + printf("number of transactions retried: " INT64_FORMAT " (%.3f%%)\n", + total->retried, 100.0 * total->retried / total_cnt); + printf("total number of retries: " INT64_FORMAT "\n", total->retries); } /* Remaining stats are nonsensical if we failed to execute any xacts */ - if (total->cnt <= 0) + if (total->cnt + total->skipped <= 0) return; if (throttle_delay && latency_limit) printf("number of transactions skipped: " INT64_FORMAT " (%.3f%%)\n", - total->skipped, 100.0 * total->skipped / total->cnt); + total->skipped, 100.0 * total->skipped / total_cnt); if (latency_limit) printf("number of transactions above the %.1f ms latency limit: " INT64_FORMAT "/" INT64_FORMAT " (%.3f%%)\n", - latency_limit / 1000.0, latency_late, ntx, - (ntx > 0) ? 100.0 * latency_late / ntx : 0.0); + latency_limit / 1000.0, latency_late, total->cnt, + (total->cnt > 0) ? 
100.0 * latency_late / total->cnt : 0.0); if (throttle_delay || progress || latency_limit) printSimpleStats("latency", &total->latency); else { /* no measurement, show average latency computed from run time */ - printf("latency average = %.3f ms\n", - 0.001 * total_duration * nclients / total->cnt); + printf("latency average = %.3f ms%s\n", + 0.001 * total_duration * nclients / total_cnt, + failures > 0 ? " (including failures)" : ""); } if (throttle_delay) @@ -5638,7 +6366,7 @@ printResults(StatsData *total, */ if (is_connect) { - printf("average connection time = %.3f ms\n", 0.001 * conn_total_duration / total->cnt); + printf("average connection time = %.3f ms\n", 0.001 * conn_total_duration / (total->cnt + failures)); printf("tps = %f (including reconnection times)\n", tps); } else @@ -5657,6 +6385,9 @@ printResults(StatsData *total, if (per_script_stats) { StatsData *sstats = &sql_script[i].stats; + int64 script_failures = getFailures(sstats); + int64 script_total_cnt = + sstats->cnt + sstats->skipped + script_failures; printf("SQL script %d: %s\n" " - weight: %d (targets %.1f%% of total)\n" @@ -5666,25 +6397,55 @@ printResults(StatsData *total, 100.0 * sql_script[i].weight / total_weight, sstats->cnt, 100.0 * sstats->cnt / total->cnt, - (sstats->cnt - sstats->skipped) / bench_duration); + sstats->cnt / bench_duration); + + printf(" - number of failed transactions: " INT64_FORMAT " (%.3f%%)\n", + script_failures, + 100.0 * script_failures / script_total_cnt); + + if (failures_detailed) + { + printf(" - number of serialization failures: " INT64_FORMAT " (%.3f%%)\n", + sstats->serialization_failures, + (100.0 * sstats->serialization_failures / + script_total_cnt)); + printf(" - number of deadlock failures: " INT64_FORMAT " (%.3f%%)\n", + sstats->deadlock_failures, + (100.0 * sstats->deadlock_failures / + script_total_cnt)); + } + + /* it can be non-zero only if max_tries is not equal to one */ + if (max_tries != 1) + { + printf(" - number of transactions retried: " INT64_FORMAT " (%.3f%%)\n", + sstats->retried, + 100.0 * sstats->retried / script_total_cnt); + printf(" - total number of retries: " INT64_FORMAT "\n", + sstats->retries); + } - if (throttle_delay && latency_limit && sstats->cnt > 0) + if (throttle_delay && latency_limit && script_total_cnt > 0) printf(" - number of transactions skipped: " INT64_FORMAT " (%.3f%%)\n", sstats->skipped, - 100.0 * sstats->skipped / sstats->cnt); + 100.0 * sstats->skipped / script_total_cnt); printSimpleStats(" - latency", &sstats->latency); } - /* Report per-command latencies */ + /* + * Report per-command statistics: latencies, retries after errors, + * failures (errors without retrying). + */ if (report_per_command) { Command **commands; - if (per_script_stats) - printf(" - statement latencies in milliseconds:\n"); - else - printf("statement latencies in milliseconds:\n"); + printf("%sstatement latencies in milliseconds%s:\n", + per_script_stats ? " - " : "", + (max_tries == 1 ? + " and failures" : + ", failures and retries")); for (commands = sql_script[i].commands; *commands != NULL; @@ -5692,10 +6453,19 @@ printResults(StatsData *total, { SimpleStats *cstats = &(*commands)->stats; - printf(" %11.3f %s\n", - (cstats->count > 0) ? - 1000.0 * cstats->sum / cstats->count : 0.0, - (*commands)->first_line); + if (max_tries == 1) + printf(" %11.3f %10" INT64_MODIFIER "d %s\n", + (cstats->count > 0) ? 
+ 1000.0 * cstats->sum / cstats->count : 0.0, + (*commands)->failures, + (*commands)->first_line); + else + printf(" %11.3f %10" INT64_MODIFIER "d %10" INT64_MODIFIER "d %s\n", + (cstats->count > 0) ? + 1000.0 * cstats->sum / cstats->count : 0.0, + (*commands)->failures, + (*commands)->retries, + (*commands)->first_line); } } } @@ -5775,7 +6545,7 @@ main(int argc, char **argv) {"progress", required_argument, NULL, 'P'}, {"protocol", required_argument, NULL, 'M'}, {"quiet", no_argument, NULL, 'q'}, - {"report-latencies", no_argument, NULL, 'r'}, + {"report-per-command", no_argument, NULL, 'r'}, {"rate", required_argument, NULL, 'R'}, {"scale", required_argument, NULL, 's'}, {"select-only", no_argument, NULL, 'S'}, @@ -5797,6 +6567,9 @@ main(int argc, char **argv) {"show-script", required_argument, NULL, 10}, {"partitions", required_argument, NULL, 11}, {"partition-method", required_argument, NULL, 12}, + {"failures-detailed", no_argument, NULL, 13}, + {"max-tries", required_argument, NULL, 14}, + {"verbose-errors", no_argument, NULL, 15}, {NULL, 0, NULL, 0} }; @@ -6020,7 +6793,7 @@ main(int argc, char **argv) } *p++ = '\0'; - if (!putVariable(&state[0], "option", optarg, p)) + if (!putVariable(&state[0].variables, "option", optarg, p)) exit(1); } break; @@ -6150,6 +6923,28 @@ main(int argc, char **argv) exit(1); } break; + case 13: /* failures-detailed */ + benchmarking_option_set = true; + failures_detailed = true; + break; + case 14: /* max-tries */ + { + int32 max_tries_arg = atoi(optarg); + + if (max_tries_arg < 0) + { + pg_log_fatal("invalid number of maximum tries: \"%s\"", optarg); + exit(1); + } + + benchmarking_option_set = true; + max_tries = (uint32) max_tries_arg; + } + break; + case 15: /* verbose-errors */ + benchmarking_option_set = true; + verbose_errors = true; + break; default: fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname); exit(1); @@ -6331,6 +7126,15 @@ main(int argc, char **argv) exit(1); } + if (!max_tries) + { + if (!latency_limit && duration <= 0) + { + pg_log_fatal("an unlimited number of transaction tries can only be used with --latency-limit or a duration (-T)"); + exit(1); + } + } + /* * save main process id in the global variable because process id will be * changed after fork. @@ -6348,19 +7152,19 @@ main(int argc, char **argv) int j; state[i].id = i; - for (j = 0; j < state[0].nvariables; j++) + for (j = 0; j < state[0].variables.nvars; j++) { - Variable *var = &state[0].variables[j]; + Variable *var = &state[0].variables.vars[j]; if (var->value.type != PGBT_NO_VALUE) { - if (!putVariableValue(&state[i], "startup", + if (!putVariableValue(&state[i].variables, "startup", var->name, &var->value)) exit(1); } else { - if (!putVariable(&state[i], "startup", + if (!putVariable(&state[i].variables, "startup", var->name, var->svalue)) exit(1); } @@ -6398,11 +7202,11 @@ main(int argc, char **argv) * :scale variables normally get -s or database scale, but don't override * an explicit -D switch */ - if (lookupVariable(&state[0], "scale") == NULL) + if (lookupVariable(&state[0].variables, "scale") == NULL) { for (i = 0; i < nclients; i++) { - if (!putVariableInt(&state[i], "startup", "scale", scale)) + if (!putVariableInt(&state[i].variables, "startup", "scale", scale)) exit(1); } } @@ -6411,28 +7215,30 @@ main(int argc, char **argv) * Define a :client_id variable that is unique per connection. But don't * override an explicit -D switch. 
*/ - if (lookupVariable(&state[0], "client_id") == NULL) + if (lookupVariable(&state[0].variables, "client_id") == NULL) { for (i = 0; i < nclients; i++) - if (!putVariableInt(&state[i], "startup", "client_id", i)) + if (!putVariableInt(&state[i].variables, "startup", "client_id", i)) exit(1); } /* set default seed for hash functions */ - if (lookupVariable(&state[0], "default_seed") == NULL) + if (lookupVariable(&state[0].variables, "default_seed") == NULL) { uint64 seed = pg_prng_uint64(&base_random_sequence); for (i = 0; i < nclients; i++) - if (!putVariableInt(&state[i], "startup", "default_seed", (int64) seed)) + if (!putVariableInt(&state[i].variables, "startup", "default_seed", + (int64) seed)) exit(1); } /* set random seed unless overwritten */ - if (lookupVariable(&state[0], "random_seed") == NULL) + if (lookupVariable(&state[0].variables, "random_seed") == NULL) { for (i = 0; i < nclients; i++) - if (!putVariableInt(&state[i], "startup", "random_seed", random_seed)) + if (!putVariableInt(&state[i].variables, "startup", "random_seed", + random_seed)) exit(1); } @@ -6541,6 +7347,10 @@ main(int argc, char **argv) mergeSimpleStats(&stats.lag, &thread->stats.lag); stats.cnt += thread->stats.cnt; stats.skipped += thread->stats.skipped; + stats.retries += thread->stats.retries; + stats.retried += thread->stats.retried; + stats.serialization_failures += thread->stats.serialization_failures; + stats.deadlock_failures += thread->stats.deadlock_failures; latency_late += thread->latency_late; conn_total_duration += thread->conn_duration; @@ -6687,7 +7497,8 @@ threadRun(void *arg) if (min_usec > this_usec) min_usec = this_usec; } - else if (st->state == CSTATE_WAIT_RESULT) + else if (st->state == CSTATE_WAIT_RESULT || + st->state == CSTATE_WAIT_ROLLBACK_RESULT) { /* * waiting for result from server - nothing to do unless the @@ -6776,7 +7587,8 @@ threadRun(void *arg) { CState *st = &state[i]; - if (st->state == CSTATE_WAIT_RESULT) + if (st->state == CSTATE_WAIT_RESULT || + st->state == CSTATE_WAIT_ROLLBACK_RESULT) { /* don't call advanceConnectionState unless data is available */ int sock = PQsocket(st->con); diff --git a/src/bin/pgbench/t/001_pgbench_with_server.pl b/src/bin/pgbench/t/001_pgbench_with_server.pl index f1341092fee..d173ceae7ac 100644 --- a/src/bin/pgbench/t/001_pgbench_with_server.pl +++ b/src/bin/pgbench/t/001_pgbench_with_server.pl @@ -11,7 +11,9 @@ use Config; # start a pgbench specific server my $node = PostgreSQL::Test::Cluster->new('main'); -$node->init; +# Set to untranslated messages, to be able to compare program output with +# expected strings. +$node->init(extra => [ '--locale', 'C' ]); $node->start; # tablespace for testing, because partitioned tables cannot use pg_default @@ -109,7 +111,8 @@ $node->pgbench( qr{builtin: TPC-B}, qr{clients: 2\b}, qr{processed: 10/10}, - qr{mode: simple} + qr{mode: simple}, + qr{maximum number of tries: 1} ], [qr{^$}], 'pgbench tpcb-like'); @@ -1198,6 +1201,214 @@ $node->pgbench( check_pgbench_logs($bdir, '001_pgbench_log_3', 1, 10, 10, qr{^0 \d{1,2} \d+ \d \d+ \d+$}); +# abortion of the client if the script contains an incomplete transaction block +$node->pgbench( + '--no-vacuum', 2, [ qr{processed: 1/10} ], + [ qr{client 0 aborted: end of script reached without completing the last transaction} ], + 'incomplete transaction block', + { '001_pgbench_incomplete_transaction_block' => q{BEGIN;SELECT 1;} }); + +# Test the concurrent update in the table row and deadlocks. 
+ +$node->safe_psql('postgres', + 'CREATE UNLOGGED TABLE first_client_table (value integer); ' + . 'CREATE UNLOGGED TABLE xy (x integer, y integer); ' + . 'INSERT INTO xy VALUES (1, 2);'); + +# Serialization error and retry + +local $ENV{PGOPTIONS} = "-c default_transaction_isolation=repeatable\\ read"; + +# Check that we have a serialization error and the same random value of the +# delta variable in the next try +my $err_pattern = + "client (0|1) got an error in command 3 \\(SQL\\) of script 0; " + . "ERROR: could not serialize access due to concurrent update\\b.*" + . "\\g1"; + +$node->pgbench( + "-n -c 2 -t 1 -d --verbose-errors --max-tries 2", + 0, + [ qr{processed: 2/2\b}, qr{number of transactions retried: 1\b}, + qr{total number of retries: 1\b} ], + [ qr/$err_pattern/s ], + 'concurrent update with retrying', + { + '001_pgbench_serialization' => q{ +-- What's happening: +-- The first client starts the transaction with the isolation level Repeatable +-- Read: +-- +-- BEGIN; +-- UPDATE xy SET y = ... WHERE x = 1; +-- +-- The second client starts a similar transaction with the same isolation level: +-- +-- BEGIN; +-- UPDATE xy SET y = ... WHERE x = 1; +-- <waiting for the first client> +-- +-- The first client commits its transaction, and the second client gets a +-- serialization error. + +\set delta random(-5000, 5000) + +-- The second client will stop here +SELECT pg_advisory_lock(0); + +-- Start transaction with concurrent update +BEGIN; +UPDATE xy SET y = y + :delta WHERE x = 1 AND pg_advisory_lock(1) IS NOT NULL; + +-- Wait for the second client +DO $$ +DECLARE + exists boolean; + waiters integer; +BEGIN + -- The second client always comes in second, and the number of rows in the + -- table first_client_table reflect this. Here the first client inserts a row, + -- so the second client will see a non-empty table when repeating the + -- transaction after the serialization error. + SELECT EXISTS (SELECT * FROM first_client_table) INTO STRICT exists; + IF NOT exists THEN + -- Let the second client begin + PERFORM pg_advisory_unlock(0); + -- And wait until the second client tries to get the same lock + LOOP + SELECT COUNT(*) INTO STRICT waiters FROM pg_locks WHERE + locktype = 'advisory' AND objsubid = 1 AND + ((classid::bigint << 32) | objid::bigint = 1::bigint) AND NOT granted; + IF waiters = 1 THEN + INSERT INTO first_client_table VALUES (1); + + -- Exit loop + EXIT; + END IF; + END LOOP; + END IF; +END$$; + +COMMIT; +SELECT pg_advisory_unlock_all(); +} + }); + +# Clean up + +$node->safe_psql('postgres', 'DELETE FROM first_client_table;'); + +local $ENV{PGOPTIONS} = "-c default_transaction_isolation=read\\ committed"; + +# Deadlock error and retry + +# Check that we have a deadlock error +$err_pattern = + "client (0|1) got an error in command (3|5) \\(SQL\\) of script 0; " + . "ERROR: deadlock detected\\b"; + +$node->pgbench( + "-n -c 2 -t 1 --max-tries 2 --verbose-errors", + 0, + [ qr{processed: 2/2\b}, qr{number of transactions retried: 1\b}, + qr{total number of retries: 1\b} ], + [ qr{$err_pattern} ], + 'deadlock with retrying', + { + '001_pgbench_deadlock' => q{ +-- What's happening: +-- The first client gets the lock 2. +-- The second client gets the lock 3 and tries to get the lock 2. +-- The first client tries to get the lock 3 and one of them gets a deadlock +-- error. +-- +-- A client that does not get a deadlock error must hold a lock at the +-- transaction start. 
Thus in the end it releases all of its locks before the +-- client with the deadlock error starts a retry (we do not want any errors +-- again). + +-- Since the client with the deadlock error has not released the blocking locks, +-- let's do this here. +SELECT pg_advisory_unlock_all(); + +-- The second client and the client with the deadlock error stop here +SELECT pg_advisory_lock(0); +SELECT pg_advisory_lock(1); + +-- The second client and the client with the deadlock error always come after +-- the first and the number of rows in the table first_client_table reflects +-- this. Here the first client inserts a row, so in the future the table is +-- always non-empty. +DO $$ +DECLARE + exists boolean; +BEGIN + SELECT EXISTS (SELECT * FROM first_client_table) INTO STRICT exists; + IF exists THEN + -- We are the second client or the client with the deadlock error + + -- The first client will take care by itself of this lock (see below) + PERFORM pg_advisory_unlock(0); + + PERFORM pg_advisory_lock(3); + + -- The second client can get a deadlock here + PERFORM pg_advisory_lock(2); + ELSE + -- We are the first client + + -- This code should not be used in a new transaction after an error + INSERT INTO first_client_table VALUES (1); + + PERFORM pg_advisory_lock(2); + END IF; +END$$; + +DO $$ +DECLARE + num_rows integer; + waiters integer; +BEGIN + -- Check if we are the first client + SELECT COUNT(*) FROM first_client_table INTO STRICT num_rows; + IF num_rows = 1 THEN + -- This code should not be used in a new transaction after an error + INSERT INTO first_client_table VALUES (2); + + -- Let the second client begin + PERFORM pg_advisory_unlock(0); + PERFORM pg_advisory_unlock(1); + + -- Make sure the second client is ready for deadlock + LOOP + SELECT COUNT(*) INTO STRICT waiters FROM pg_locks WHERE + locktype = 'advisory' AND + objsubid = 1 AND + ((classid::bigint << 32) | objid::bigint = 2::bigint) AND + NOT granted; + + IF waiters = 1 THEN + -- Exit loop + EXIT; + END IF; + END LOOP; + + PERFORM pg_advisory_lock(0); + -- And the second client took care by itself of the lock 1 + END IF; +END$$; + +-- The first client can get a deadlock here +SELECT pg_advisory_lock(3); + +SELECT pg_advisory_unlock_all(); +} + }); + +# Clean up +$node->safe_psql('postgres', 'DROP TABLE first_client_table, xy;'); + + # done $node->safe_psql('postgres', 'DROP TABLESPACE regress_pgbench_tap_1_ts'); $node->stop; diff --git a/src/bin/pgbench/t/002_pgbench_no_server.pl b/src/bin/pgbench/t/002_pgbench_no_server.pl index acad19edd0c..a5074c70d9d 100644 --- a/src/bin/pgbench/t/002_pgbench_no_server.pl +++ b/src/bin/pgbench/t/002_pgbench_no_server.pl @@ -188,6 +188,16 @@ my @options = ( '-i --partition-method=hash', [qr{partition-method requires greater than zero --partitions}] ], + [ + 'bad maximum number of tries', + '--max-tries -10', + [qr{invalid number of maximum tries: "-10"}] + ], + [ + 'an infinite number of tries', + '--max-tries 0', + [qr{an unlimited number of transaction tries can only be used with --latency-limit or a duration}] + ], # logging sub-options [ diff --git a/src/fe_utils/conditional.c b/src/fe_utils/conditional.c index 0bf877e895b..5a946649897 100644 --- a/src/fe_utils/conditional.c +++ b/src/fe_utils/conditional.c @@ -24,13 +24,25 @@ conditional_stack_create(void) } /* - * destroy stack + * Destroy all the elements from the stack. The stack itself is not freed. 
*/ void -conditional_stack_destroy(ConditionalStack cstack) +conditional_stack_reset(ConditionalStack cstack) { + if (!cstack) + return; /* nothing to do here */ + while (conditional_stack_pop(cstack)) continue; +} + +/* + * destroy stack + */ +void +conditional_stack_destroy(ConditionalStack cstack) +{ + conditional_stack_reset(cstack); free(cstack); } diff --git a/src/include/fe_utils/conditional.h b/src/include/fe_utils/conditional.h index b28189471cd..fa53d86501b 100644 --- a/src/include/fe_utils/conditional.h +++ b/src/include/fe_utils/conditional.h @@ -73,6 +73,8 @@ typedef struct ConditionalStackData *ConditionalStack; extern ConditionalStack conditional_stack_create(void); +extern void conditional_stack_reset(ConditionalStack cstack); + extern void conditional_stack_destroy(ConditionalStack cstack); extern int conditional_stack_depth(ConditionalStack cstack); |
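
Editor's note: a minimal standalone sketch (not the pgbench code itself) of how the new try-limit options shown in the hunks above interact: a negative --max-tries argument is rejected outright, and an unlimited number of tries (--max-tries 0) is only accepted when a latency limit or a benchmark duration bounds the retrying, matching the fatal messages exercised by the 002_pgbench_no_server.pl tests. The function and parameter names here are illustrative assumptions, not pgbench's actual identifiers.

#include <stdio.h>
#include <stdbool.h>

/*
 * Return true if the combination of try-limit settings is acceptable.
 * max_tries_arg comes from --max-tries, latency_limit from -L (0.0 when
 * unset), duration from -T (0 when unset).
 */
static bool
check_tries_options(int max_tries_arg, double latency_limit, int duration)
{
	if (max_tries_arg < 0)
	{
		fprintf(stderr, "invalid number of maximum tries: \"%d\"\n",
				max_tries_arg);
		return false;
	}

	/* 0 means "no per-transaction cap on the number of tries" */
	if (max_tries_arg == 0 && latency_limit == 0.0 && duration <= 0)
	{
		fprintf(stderr, "an unlimited number of transaction tries can only "
				"be used with --latency-limit or a duration (-T)\n");
		return false;
	}

	return true;
}

int
main(void)
{
	/* accepted: retries are bounded by the 60-second duration */
	printf("%d\n", check_tries_options(0, 0.0, 60));
	/* rejected: nothing bounds the retrying */
	printf("%d\n", check_tries_options(0, 0.0, 0));
	/* rejected: negative try count */
	printf("%d\n", check_tries_options(-10, 0.0, 0));
	return 0;
}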