diff options
Diffstat (limited to 'contrib/pgbench/pgbench.c')
-rw-r--r-- | contrib/pgbench/pgbench.c | 231 |
1 files changed, 175 insertions, 56 deletions
diff --git a/contrib/pgbench/pgbench.c b/contrib/pgbench/pgbench.c index e9431ee7860..3453a1f7a6b 100644 --- a/contrib/pgbench/pgbench.c +++ b/contrib/pgbench/pgbench.c @@ -141,6 +141,14 @@ double sample_rate = 0.0; int64 throttle_delay = 0; /* + * Transactions which take longer than this limit (in usec) are counted as + * late, and reported as such, although they are completed anyway. When + * throttling is enabled, execution time slots that are more than this late + * are skipped altogether, and counted separately. + */ +int64 latency_limit = 0; + +/* * tablespace selection */ char *tablespace = NULL; @@ -238,6 +246,8 @@ typedef struct int64 throttle_trigger; /* previous/next throttling (us) */ int64 throttle_lag; /* total transaction lag behind throttling */ int64 throttle_lag_max; /* max transaction lag */ + int64 throttle_latency_skipped; /* lagging transactions skipped */ + int64 latency_late; /* late transactions */ } TState; #define INVALID_THREAD ((pthread_t) 0) @@ -250,6 +260,8 @@ typedef struct int64 sqlats; int64 throttle_lag; int64 throttle_lag_max; + int64 throttle_latency_skipped; + int64 latency_late; } TResult; /* @@ -284,6 +296,8 @@ typedef struct long start_time; /* when does the interval start */ int cnt; /* number of transactions */ + int skipped; /* number of transactions skipped under + * --rate and --latency-limit */ double min_latency; /* min/max latencies */ double max_latency; @@ -348,7 +362,7 @@ static void setalarm(int seconds); static void *threadRun(void *arg); static void doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, - AggVals *agg); + AggVals *agg, bool skipped); static void usage(void) @@ -375,6 +389,8 @@ usage(void) " -f, --file=FILENAME read transaction script from FILENAME\n" " -j, --jobs=NUM number of threads (default: 1)\n" " -l, --log write transaction times to log file\n" + " -L, --latency-limit=NUM count transactions lasting more than NUM ms\n" + " as late.\n" " -M, --protocol=simple|extended|prepared\n" " protocol for submitting queries (default: simple)\n" " -n, --no-vacuum do not run VACUUM before tests\n" @@ -994,7 +1010,9 @@ void agg_vals_init(AggVals *aggs, instr_time start) { /* basic counters */ - aggs->cnt = 0; /* number of transactions */ + aggs->cnt = 0; /* number of transactions (includes skipped) */ + aggs->skipped = 0; /* xacts skipped under --rate --latency-limit */ + aggs->sum_latency = 0; /* SUM(latency) */ aggs->sum2_latency = 0; /* SUM(latency*latency) */ @@ -1050,8 +1068,34 @@ top: int64 wait = getPoissonRand(thread, throttle_delay); thread->throttle_trigger += wait; - st->txn_scheduled = thread->throttle_trigger; + + /* + * If this --latency-limit is used, and this slot is already late so + * that the transaction will miss the latency limit even if it + * completed immediately, we skip this time slot and iterate till the + * next slot that isn't late yet. + */ + if (latency_limit) + { + int64 now_us; + + if (INSTR_TIME_IS_ZERO(now)) + INSTR_TIME_SET_CURRENT(now); + now_us = INSTR_TIME_GET_MICROSEC(now); + while (thread->throttle_trigger < now_us - latency_limit) + { + thread->throttle_latency_skipped++; + + if (logfile) + doLog(thread, st, logfile, &now, agg, true); + + wait = getPoissonRand(thread, throttle_delay); + thread->throttle_trigger += wait; + st->txn_scheduled = thread->throttle_trigger; + } + } + st->sleeping = 1; st->throttling = true; st->is_throttled = true; @@ -1119,12 +1163,13 @@ top: if (commands[st->state + 1] == NULL) { /* only calculate latency if an option is used that needs it */ - if (progress || throttle_delay) + if (progress || throttle_delay || latency_limit) { int64 latency; if (INSTR_TIME_IS_ZERO(now)) INSTR_TIME_SET_CURRENT(now); + latency = INSTR_TIME_GET_MICROSEC(now) - st->txn_scheduled; st->txn_latencies += latency; @@ -1137,11 +1182,15 @@ top: * transactions, overflow would take 256 hours. */ st->txn_sqlats += latency * latency; + + /* record over the limit transactions if needed. */ + if (latency_limit && latency > latency_limit) + thread->latency_late++; } /* record the time it took in the log */ if (logfile) - doLog(thread, st, logfile, &now, agg); + doLog(thread, st, logfile, &now, agg, false); } if (commands[st->state]->type == SQL_COMMAND) @@ -1227,7 +1276,7 @@ top: } /* Record transaction start time under logging, progress or throttling */ - if ((logfile || progress || throttle_delay) && st->state == 0) + if ((logfile || progress || throttle_delay || latency_limit) && st->state == 0) { INSTR_TIME_SET_CURRENT(st->txn_begin); @@ -1605,7 +1654,8 @@ top: * print log entry after completing one transaction. */ static void -doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg) +doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg, + bool skipped) { double lag; double latency; @@ -1622,7 +1672,10 @@ doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg) INSTR_TIME_SET_CURRENT(*now); latency = (double) (INSTR_TIME_GET_MICROSEC(*now) - st->txn_scheduled); - lag = (double) (INSTR_TIME_GET_MICROSEC(st->txn_begin) - st->txn_scheduled); + if (skipped) + lag = latency; + else + lag = (double) (INSTR_TIME_GET_MICROSEC(st->txn_begin) - st->txn_scheduled); /* should we aggregate the results or not? */ if (agg_interval > 0) @@ -1634,26 +1687,34 @@ doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg) if (agg->start_time + agg_interval >= INSTR_TIME_GET_DOUBLE(*now)) { agg->cnt += 1; - agg->sum_latency += latency; - agg->sum2_latency += latency * latency; + if (skipped) + { + /* there is no latency to record if the transaction was skipped */ + agg->skipped += 1; + } + else + { + agg->sum_latency += latency; + agg->sum2_latency += latency * latency; - /* first in this aggregation interval */ - if ((agg->cnt == 1) || (latency < agg->min_latency)) - agg->min_latency = latency; + /* first in this aggregation interval */ + if ((agg->cnt == 1) || (latency < agg->min_latency)) + agg->min_latency = latency; - if ((agg->cnt == 1) || (latency > agg->max_latency)) - agg->max_latency = latency; + if ((agg->cnt == 1) || (latency > agg->max_latency)) + agg->max_latency = latency; - /* and the same for schedule lag */ - if (throttle_delay) - { - agg->sum_lag += lag; - agg->sum2_lag += lag * lag; + /* and the same for schedule lag */ + if (throttle_delay) + { + agg->sum_lag += lag; + agg->sum2_lag += lag * lag; - if ((agg->cnt == 1) || (lag < agg->min_lag)) - agg->min_lag = lag; - if ((agg->cnt == 1) || (lag > agg->max_lag)) - agg->max_lag = lag; + if ((agg->cnt == 1) || (lag < agg->min_lag)) + agg->min_lag = lag; + if ((agg->cnt == 1) || (lag > agg->max_lag)) + agg->max_lag = lag; + } } } else @@ -1677,11 +1738,15 @@ doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg) agg->min_latency, agg->max_latency); if (throttle_delay) + { fprintf(logfile, " %.0f %.0f %.0f %.0f", agg->sum_lag, agg->sum2_lag, agg->min_lag, agg->max_lag); + if (latency_limit) + fprintf(logfile, " %d", agg->skipped); + } fputc('\n', logfile); /* move to the next inteval */ @@ -1689,6 +1754,7 @@ doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg) /* reset for "no transaction" intervals */ agg->cnt = 0; + agg->skipped = 0; agg->min_latency = 0; agg->max_latency = 0; agg->sum_latency = 0; @@ -1701,10 +1767,11 @@ doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg) /* reset the values to include only the current transaction. */ agg->cnt = 1; + agg->skipped = skipped ? 1 : 0; agg->min_latency = latency; agg->max_latency = latency; - agg->sum_latency = latency; - agg->sum2_latency = latency * latency; + agg->sum_latency = skipped ? 0.0 : latency; + agg->sum2_latency = skipped ? 0.0 : latency * latency; agg->min_lag = lag; agg->max_lag = lag; agg->sum_lag = lag; @@ -1717,14 +1784,23 @@ doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg) #ifndef WIN32 /* This is more than we really ought to know about instr_time */ - fprintf(logfile, "%d %d %.0f %d %ld %ld", - st->id, st->cnt, latency, st->use_file, - (long) now->tv_sec, (long) now->tv_usec); + if (skipped) + fprintf(logfile, "%d %d skipped %d %ld %ld", + st->id, st->cnt, st->use_file, + (long) now->tv_sec, (long) now->tv_usec); + else + fprintf(logfile, "%d %d %.0f %d %ld %ld", + st->id, st->cnt, latency, st->use_file, + (long) now->tv_sec, (long) now->tv_usec); #else /* On Windows, instr_time doesn't provide a timestamp anyway */ - fprintf(logfile, "%d %d %.0f %d 0 0", - st->id, st->cnt, latency, st->use_file); + if (skipped) + fprintf(logfile, "%d %d skipped %d 0 0", + st->id, st->cnt, st->use_file); + else + fprintf(logfile, "%d %d %.0f %d 0 0", + st->id, st->cnt, latency, st->use_file); #endif if (throttle_delay) fprintf(logfile, " %.0f", lag); @@ -2424,7 +2500,8 @@ printResults(int ttype, int64 normal_xacts, int nclients, TState *threads, int nthreads, instr_time total_time, instr_time conn_total_time, int64 total_latencies, int64 total_sqlats, - int64 throttle_lag, int64 throttle_lag_max) + int64 throttle_lag, int64 throttle_lag_max, + int64 throttle_latency_skipped, int64 latency_late) { double time_include, tps_include, @@ -2463,7 +2540,17 @@ printResults(int ttype, int64 normal_xacts, int nclients, normal_xacts); } - if (throttle_delay || progress) + if (throttle_delay && latency_limit) + printf("number of transactions skipped: " INT64_FORMAT " (%.3f %%)\n", + throttle_latency_skipped, + 100.0 * throttle_latency_skipped / (throttle_latency_skipped + normal_xacts)); + + if (latency_limit) + printf("number of transactions above the %.1f ms latency limit: " INT64_FORMAT " (%.3f %%)\n", + latency_limit / 1000.0, latency_late, + 100.0 * latency_late / (throttle_latency_skipped + normal_xacts)); + + if (throttle_delay || progress || latency_limit) { /* compute and show latency average and standard deviation */ double latency = 0.001 * total_latencies / normal_xacts; @@ -2578,6 +2665,7 @@ main(int argc, char **argv) {"sampling-rate", required_argument, NULL, 4}, {"aggregate-interval", required_argument, NULL, 5}, {"rate", required_argument, NULL, 'R'}, + {"latency-limit", required_argument, NULL, 'L'}, {NULL, 0, NULL, 0} }; @@ -2607,6 +2695,8 @@ main(int argc, char **argv) int64 total_sqlats = 0; int64 throttle_lag = 0; int64 throttle_lag_max = 0; + int64 throttle_latency_skipped = 0; + int64 latency_late = 0; int i; @@ -2651,7 +2741,7 @@ main(int argc, char **argv) state = (CState *) pg_malloc(sizeof(CState)); memset(state, 0, sizeof(CState)); - while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:P:R:", long_options, &optindex)) != -1) + while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:P:R:L:", long_options, &optindex)) != -1) { switch (c) { @@ -2848,6 +2938,18 @@ main(int argc, char **argv) throttle_delay = (int64) (1000000.0 / throttle_value); } break; + case 'L': + { + double limit_ms = atof(optarg); + if (limit_ms <= 0.0) + { + fprintf(stderr, "invalid latency limit: %s\n", optarg); + exit(1); + } + benchmarking_option_set = true; + latency_limit = (int64) (limit_ms * 1000); + } + break; case 0: /* This covers long options which take no argument. */ if (foreign_keys || unlogged_tables) @@ -3143,6 +3245,8 @@ main(int argc, char **argv) thread->random_state[0] = random(); thread->random_state[1] = random(); thread->random_state[2] = random(); + thread->throttle_latency_skipped = 0; + thread->latency_late = 0; if (is_latencies) { @@ -3217,6 +3321,8 @@ main(int argc, char **argv) total_latencies += r->latencies; total_sqlats += r->sqlats; throttle_lag += r->throttle_lag; + throttle_latency_skipped += r->throttle_latency_skipped; + latency_late += r->latency_late; if (r->throttle_lag_max > throttle_lag_max) throttle_lag_max = r->throttle_lag_max; INSTR_TIME_ADD(conn_total_time, r->conn_time); @@ -3239,7 +3345,8 @@ main(int argc, char **argv) INSTR_TIME_SUBTRACT(total_time, start_time); printResults(ttype, total_xacts, nclients, threads, nthreads, total_time, conn_total_time, total_latencies, total_sqlats, - throttle_lag, throttle_lag_max); + throttle_lag, throttle_lag_max, throttle_latency_skipped, + latency_late); return 0; } @@ -3264,7 +3371,8 @@ threadRun(void *arg) int64 last_count = 0, last_lats = 0, last_sqlats = 0, - last_lags = 0; + last_lags = 0, + last_skipped = 0; AggVals aggs; @@ -3467,7 +3575,8 @@ threadRun(void *arg) /* generate and show report */ int64 count = 0, lats = 0, - sqlats = 0; + sqlats = 0, + skipped = 0; int64 lags = thread->throttle_lag; int64 run = now - last_report; double tps, @@ -3490,23 +3599,26 @@ threadRun(void *arg) sqlat = 1.0 * (sqlats - last_sqlats) / (count - last_count); stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency); lag = 0.001 * (lags - last_lags) / (count - last_count); + skipped = thread->throttle_latency_skipped - last_skipped; + fprintf(stderr, + "progress %d: %.1f s, %.1f tps, " + "lat %.3f ms stddev %.3f", + thread->tid, total_run, tps, latency, stdev); if (throttle_delay) - fprintf(stderr, - "progress %d: %.1f s, %.1f tps, " - "lat %.3f ms stddev %.3f, lag %.3f ms\n", - thread->tid, total_run, tps, latency, stdev, lag); - else - fprintf(stderr, - "progress %d: %.1f s, %.1f tps, " - "lat %.3f ms stddev %.3f\n", - thread->tid, total_run, tps, latency, stdev); + { + fprintf(stderr, ", lag %.3f ms", lag); + if (latency_limit) + fprintf(stderr, ", skipped " INT64_FORMAT, skipped); + } + fprintf(stderr, "\n"); last_count = count; last_lats = lats; last_sqlats = sqlats; last_lags = lags; last_report = now; + last_skipped = thread->throttle_latency_skipped; next_report += (int64) progress *1000000; } } @@ -3525,7 +3637,8 @@ threadRun(void *arg) int64 count = 0, lats = 0, sqlats = 0, - lags = 0; + lags = 0, + skipped = 0; int64 run = now - last_report; double tps, total_run, @@ -3550,23 +3663,26 @@ threadRun(void *arg) sqlat = 1.0 * (sqlats - last_sqlats) / (count - last_count); stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency); lag = 0.001 * (lags - last_lags) / (count - last_count); + skipped = thread->throttle_latency_skipped - last_skipped; + fprintf(stderr, + "progress: %.1f s, %.1f tps, " + "lat %.3f ms stddev %.3f", + total_run, tps, latency, stdev); if (throttle_delay) - fprintf(stderr, - "progress: %.1f s, %.1f tps, " - "lat %.3f ms stddev %.3f, lag %.3f ms\n", - total_run, tps, latency, stdev, lag); - else - fprintf(stderr, - "progress: %.1f s, %.1f tps, " - "lat %.3f ms stddev %.3f\n", - total_run, tps, latency, stdev); + { + fprintf(stderr, ", lag %.3f ms", lag); + if (latency_limit) + fprintf(stderr, ", " INT64_FORMAT " skipped", skipped); + } + fprintf(stderr, "\n"); last_count = count; last_lats = lats; last_sqlats = sqlats; last_lags = lags; last_report = now; + last_skipped = thread->throttle_latency_skipped; next_report += (int64) progress *1000000; } } @@ -3587,6 +3703,9 @@ done: } result->throttle_lag = thread->throttle_lag; result->throttle_lag_max = thread->throttle_lag_max; + result->throttle_latency_skipped = thread->throttle_latency_skipped; + result->latency_late = thread->latency_late; + INSTR_TIME_SET_CURRENT(end); INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start); if (logfile) |