aboutsummaryrefslogtreecommitdiff
path: root/contrib/pgbench/pgbench.c
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/pgbench/pgbench.c')
-rw-r--r--contrib/pgbench/pgbench.c231
1 files changed, 175 insertions, 56 deletions
diff --git a/contrib/pgbench/pgbench.c b/contrib/pgbench/pgbench.c
index e9431ee7860..3453a1f7a6b 100644
--- a/contrib/pgbench/pgbench.c
+++ b/contrib/pgbench/pgbench.c
@@ -141,6 +141,14 @@ double sample_rate = 0.0;
int64 throttle_delay = 0;
/*
+ * Transactions which take longer than this limit (in usec) are counted as
+ * late, and reported as such, although they are completed anyway. When
+ * throttling is enabled, execution time slots that are more than this late
+ * are skipped altogether, and counted separately.
+ */
+int64 latency_limit = 0;
+
+/*
* tablespace selection
*/
char *tablespace = NULL;
@@ -238,6 +246,8 @@ typedef struct
int64 throttle_trigger; /* previous/next throttling (us) */
int64 throttle_lag; /* total transaction lag behind throttling */
int64 throttle_lag_max; /* max transaction lag */
+ int64 throttle_latency_skipped; /* lagging transactions skipped */
+ int64 latency_late; /* late transactions */
} TState;
#define INVALID_THREAD ((pthread_t) 0)
@@ -250,6 +260,8 @@ typedef struct
int64 sqlats;
int64 throttle_lag;
int64 throttle_lag_max;
+ int64 throttle_latency_skipped;
+ int64 latency_late;
} TResult;
/*
@@ -284,6 +296,8 @@ typedef struct
long start_time; /* when does the interval start */
int cnt; /* number of transactions */
+ int skipped; /* number of transactions skipped under
+ * --rate and --latency-limit */
double min_latency; /* min/max latencies */
double max_latency;
@@ -348,7 +362,7 @@ static void setalarm(int seconds);
static void *threadRun(void *arg);
static void doLog(TState *thread, CState *st, FILE *logfile, instr_time *now,
- AggVals *agg);
+ AggVals *agg, bool skipped);
static void
usage(void)
@@ -375,6 +389,8 @@ usage(void)
" -f, --file=FILENAME read transaction script from FILENAME\n"
" -j, --jobs=NUM number of threads (default: 1)\n"
" -l, --log write transaction times to log file\n"
+ " -L, --latency-limit=NUM count transactions lasting more than NUM ms\n"
+ " as late.\n"
" -M, --protocol=simple|extended|prepared\n"
" protocol for submitting queries (default: simple)\n"
" -n, --no-vacuum do not run VACUUM before tests\n"
@@ -994,7 +1010,9 @@ void
agg_vals_init(AggVals *aggs, instr_time start)
{
/* basic counters */
- aggs->cnt = 0; /* number of transactions */
+ aggs->cnt = 0; /* number of transactions (includes skipped) */
+ aggs->skipped = 0; /* xacts skipped under --rate --latency-limit */
+
aggs->sum_latency = 0; /* SUM(latency) */
aggs->sum2_latency = 0; /* SUM(latency*latency) */
@@ -1050,8 +1068,34 @@ top:
int64 wait = getPoissonRand(thread, throttle_delay);
thread->throttle_trigger += wait;
-
st->txn_scheduled = thread->throttle_trigger;
+
+ /*
+ * If this --latency-limit is used, and this slot is already late so
+ * that the transaction will miss the latency limit even if it
+ * completed immediately, we skip this time slot and iterate till the
+ * next slot that isn't late yet.
+ */
+ if (latency_limit)
+ {
+ int64 now_us;
+
+ if (INSTR_TIME_IS_ZERO(now))
+ INSTR_TIME_SET_CURRENT(now);
+ now_us = INSTR_TIME_GET_MICROSEC(now);
+ while (thread->throttle_trigger < now_us - latency_limit)
+ {
+ thread->throttle_latency_skipped++;
+
+ if (logfile)
+ doLog(thread, st, logfile, &now, agg, true);
+
+ wait = getPoissonRand(thread, throttle_delay);
+ thread->throttle_trigger += wait;
+ st->txn_scheduled = thread->throttle_trigger;
+ }
+ }
+
st->sleeping = 1;
st->throttling = true;
st->is_throttled = true;
@@ -1119,12 +1163,13 @@ top:
if (commands[st->state + 1] == NULL)
{
/* only calculate latency if an option is used that needs it */
- if (progress || throttle_delay)
+ if (progress || throttle_delay || latency_limit)
{
int64 latency;
if (INSTR_TIME_IS_ZERO(now))
INSTR_TIME_SET_CURRENT(now);
+
latency = INSTR_TIME_GET_MICROSEC(now) - st->txn_scheduled;
st->txn_latencies += latency;
@@ -1137,11 +1182,15 @@ top:
* transactions, overflow would take 256 hours.
*/
st->txn_sqlats += latency * latency;
+
+ /* record over the limit transactions if needed. */
+ if (latency_limit && latency > latency_limit)
+ thread->latency_late++;
}
/* record the time it took in the log */
if (logfile)
- doLog(thread, st, logfile, &now, agg);
+ doLog(thread, st, logfile, &now, agg, false);
}
if (commands[st->state]->type == SQL_COMMAND)
@@ -1227,7 +1276,7 @@ top:
}
/* Record transaction start time under logging, progress or throttling */
- if ((logfile || progress || throttle_delay) && st->state == 0)
+ if ((logfile || progress || throttle_delay || latency_limit) && st->state == 0)
{
INSTR_TIME_SET_CURRENT(st->txn_begin);
@@ -1605,7 +1654,8 @@ top:
* print log entry after completing one transaction.
*/
static void
-doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg)
+doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg,
+ bool skipped)
{
double lag;
double latency;
@@ -1622,7 +1672,10 @@ doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg)
INSTR_TIME_SET_CURRENT(*now);
latency = (double) (INSTR_TIME_GET_MICROSEC(*now) - st->txn_scheduled);
- lag = (double) (INSTR_TIME_GET_MICROSEC(st->txn_begin) - st->txn_scheduled);
+ if (skipped)
+ lag = latency;
+ else
+ lag = (double) (INSTR_TIME_GET_MICROSEC(st->txn_begin) - st->txn_scheduled);
/* should we aggregate the results or not? */
if (agg_interval > 0)
@@ -1634,26 +1687,34 @@ doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg)
if (agg->start_time + agg_interval >= INSTR_TIME_GET_DOUBLE(*now))
{
agg->cnt += 1;
- agg->sum_latency += latency;
- agg->sum2_latency += latency * latency;
+ if (skipped)
+ {
+ /* there is no latency to record if the transaction was skipped */
+ agg->skipped += 1;
+ }
+ else
+ {
+ agg->sum_latency += latency;
+ agg->sum2_latency += latency * latency;
- /* first in this aggregation interval */
- if ((agg->cnt == 1) || (latency < agg->min_latency))
- agg->min_latency = latency;
+ /* first in this aggregation interval */
+ if ((agg->cnt == 1) || (latency < agg->min_latency))
+ agg->min_latency = latency;
- if ((agg->cnt == 1) || (latency > agg->max_latency))
- agg->max_latency = latency;
+ if ((agg->cnt == 1) || (latency > agg->max_latency))
+ agg->max_latency = latency;
- /* and the same for schedule lag */
- if (throttle_delay)
- {
- agg->sum_lag += lag;
- agg->sum2_lag += lag * lag;
+ /* and the same for schedule lag */
+ if (throttle_delay)
+ {
+ agg->sum_lag += lag;
+ agg->sum2_lag += lag * lag;
- if ((agg->cnt == 1) || (lag < agg->min_lag))
- agg->min_lag = lag;
- if ((agg->cnt == 1) || (lag > agg->max_lag))
- agg->max_lag = lag;
+ if ((agg->cnt == 1) || (lag < agg->min_lag))
+ agg->min_lag = lag;
+ if ((agg->cnt == 1) || (lag > agg->max_lag))
+ agg->max_lag = lag;
+ }
}
}
else
@@ -1677,11 +1738,15 @@ doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg)
agg->min_latency,
agg->max_latency);
if (throttle_delay)
+ {
fprintf(logfile, " %.0f %.0f %.0f %.0f",
agg->sum_lag,
agg->sum2_lag,
agg->min_lag,
agg->max_lag);
+ if (latency_limit)
+ fprintf(logfile, " %d", agg->skipped);
+ }
fputc('\n', logfile);
/* move to the next inteval */
@@ -1689,6 +1754,7 @@ doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg)
/* reset for "no transaction" intervals */
agg->cnt = 0;
+ agg->skipped = 0;
agg->min_latency = 0;
agg->max_latency = 0;
agg->sum_latency = 0;
@@ -1701,10 +1767,11 @@ doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg)
/* reset the values to include only the current transaction. */
agg->cnt = 1;
+ agg->skipped = skipped ? 1 : 0;
agg->min_latency = latency;
agg->max_latency = latency;
- agg->sum_latency = latency;
- agg->sum2_latency = latency * latency;
+ agg->sum_latency = skipped ? 0.0 : latency;
+ agg->sum2_latency = skipped ? 0.0 : latency * latency;
agg->min_lag = lag;
agg->max_lag = lag;
agg->sum_lag = lag;
@@ -1717,14 +1784,23 @@ doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg)
#ifndef WIN32
/* This is more than we really ought to know about instr_time */
- fprintf(logfile, "%d %d %.0f %d %ld %ld",
- st->id, st->cnt, latency, st->use_file,
- (long) now->tv_sec, (long) now->tv_usec);
+ if (skipped)
+ fprintf(logfile, "%d %d skipped %d %ld %ld",
+ st->id, st->cnt, st->use_file,
+ (long) now->tv_sec, (long) now->tv_usec);
+ else
+ fprintf(logfile, "%d %d %.0f %d %ld %ld",
+ st->id, st->cnt, latency, st->use_file,
+ (long) now->tv_sec, (long) now->tv_usec);
#else
/* On Windows, instr_time doesn't provide a timestamp anyway */
- fprintf(logfile, "%d %d %.0f %d 0 0",
- st->id, st->cnt, latency, st->use_file);
+ if (skipped)
+ fprintf(logfile, "%d %d skipped %d 0 0",
+ st->id, st->cnt, st->use_file);
+ else
+ fprintf(logfile, "%d %d %.0f %d 0 0",
+ st->id, st->cnt, latency, st->use_file);
#endif
if (throttle_delay)
fprintf(logfile, " %.0f", lag);
@@ -2424,7 +2500,8 @@ printResults(int ttype, int64 normal_xacts, int nclients,
TState *threads, int nthreads,
instr_time total_time, instr_time conn_total_time,
int64 total_latencies, int64 total_sqlats,
- int64 throttle_lag, int64 throttle_lag_max)
+ int64 throttle_lag, int64 throttle_lag_max,
+ int64 throttle_latency_skipped, int64 latency_late)
{
double time_include,
tps_include,
@@ -2463,7 +2540,17 @@ printResults(int ttype, int64 normal_xacts, int nclients,
normal_xacts);
}
- if (throttle_delay || progress)
+ if (throttle_delay && latency_limit)
+ printf("number of transactions skipped: " INT64_FORMAT " (%.3f %%)\n",
+ throttle_latency_skipped,
+ 100.0 * throttle_latency_skipped / (throttle_latency_skipped + normal_xacts));
+
+ if (latency_limit)
+ printf("number of transactions above the %.1f ms latency limit: " INT64_FORMAT " (%.3f %%)\n",
+ latency_limit / 1000.0, latency_late,
+ 100.0 * latency_late / (throttle_latency_skipped + normal_xacts));
+
+ if (throttle_delay || progress || latency_limit)
{
/* compute and show latency average and standard deviation */
double latency = 0.001 * total_latencies / normal_xacts;
@@ -2578,6 +2665,7 @@ main(int argc, char **argv)
{"sampling-rate", required_argument, NULL, 4},
{"aggregate-interval", required_argument, NULL, 5},
{"rate", required_argument, NULL, 'R'},
+ {"latency-limit", required_argument, NULL, 'L'},
{NULL, 0, NULL, 0}
};
@@ -2607,6 +2695,8 @@ main(int argc, char **argv)
int64 total_sqlats = 0;
int64 throttle_lag = 0;
int64 throttle_lag_max = 0;
+ int64 throttle_latency_skipped = 0;
+ int64 latency_late = 0;
int i;
@@ -2651,7 +2741,7 @@ main(int argc, char **argv)
state = (CState *) pg_malloc(sizeof(CState));
memset(state, 0, sizeof(CState));
- while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:P:R:", long_options, &optindex)) != -1)
+ while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:P:R:L:", long_options, &optindex)) != -1)
{
switch (c)
{
@@ -2848,6 +2938,18 @@ main(int argc, char **argv)
throttle_delay = (int64) (1000000.0 / throttle_value);
}
break;
+ case 'L':
+ {
+ double limit_ms = atof(optarg);
+ if (limit_ms <= 0.0)
+ {
+ fprintf(stderr, "invalid latency limit: %s\n", optarg);
+ exit(1);
+ }
+ benchmarking_option_set = true;
+ latency_limit = (int64) (limit_ms * 1000);
+ }
+ break;
case 0:
/* This covers long options which take no argument. */
if (foreign_keys || unlogged_tables)
@@ -3143,6 +3245,8 @@ main(int argc, char **argv)
thread->random_state[0] = random();
thread->random_state[1] = random();
thread->random_state[2] = random();
+ thread->throttle_latency_skipped = 0;
+ thread->latency_late = 0;
if (is_latencies)
{
@@ -3217,6 +3321,8 @@ main(int argc, char **argv)
total_latencies += r->latencies;
total_sqlats += r->sqlats;
throttle_lag += r->throttle_lag;
+ throttle_latency_skipped += r->throttle_latency_skipped;
+ latency_late += r->latency_late;
if (r->throttle_lag_max > throttle_lag_max)
throttle_lag_max = r->throttle_lag_max;
INSTR_TIME_ADD(conn_total_time, r->conn_time);
@@ -3239,7 +3345,8 @@ main(int argc, char **argv)
INSTR_TIME_SUBTRACT(total_time, start_time);
printResults(ttype, total_xacts, nclients, threads, nthreads,
total_time, conn_total_time, total_latencies, total_sqlats,
- throttle_lag, throttle_lag_max);
+ throttle_lag, throttle_lag_max, throttle_latency_skipped,
+ latency_late);
return 0;
}
@@ -3264,7 +3371,8 @@ threadRun(void *arg)
int64 last_count = 0,
last_lats = 0,
last_sqlats = 0,
- last_lags = 0;
+ last_lags = 0,
+ last_skipped = 0;
AggVals aggs;
@@ -3467,7 +3575,8 @@ threadRun(void *arg)
/* generate and show report */
int64 count = 0,
lats = 0,
- sqlats = 0;
+ sqlats = 0,
+ skipped = 0;
int64 lags = thread->throttle_lag;
int64 run = now - last_report;
double tps,
@@ -3490,23 +3599,26 @@ threadRun(void *arg)
sqlat = 1.0 * (sqlats - last_sqlats) / (count - last_count);
stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency);
lag = 0.001 * (lags - last_lags) / (count - last_count);
+ skipped = thread->throttle_latency_skipped - last_skipped;
+ fprintf(stderr,
+ "progress %d: %.1f s, %.1f tps, "
+ "lat %.3f ms stddev %.3f",
+ thread->tid, total_run, tps, latency, stdev);
if (throttle_delay)
- fprintf(stderr,
- "progress %d: %.1f s, %.1f tps, "
- "lat %.3f ms stddev %.3f, lag %.3f ms\n",
- thread->tid, total_run, tps, latency, stdev, lag);
- else
- fprintf(stderr,
- "progress %d: %.1f s, %.1f tps, "
- "lat %.3f ms stddev %.3f\n",
- thread->tid, total_run, tps, latency, stdev);
+ {
+ fprintf(stderr, ", lag %.3f ms", lag);
+ if (latency_limit)
+ fprintf(stderr, ", skipped " INT64_FORMAT, skipped);
+ }
+ fprintf(stderr, "\n");
last_count = count;
last_lats = lats;
last_sqlats = sqlats;
last_lags = lags;
last_report = now;
+ last_skipped = thread->throttle_latency_skipped;
next_report += (int64) progress *1000000;
}
}
@@ -3525,7 +3637,8 @@ threadRun(void *arg)
int64 count = 0,
lats = 0,
sqlats = 0,
- lags = 0;
+ lags = 0,
+ skipped = 0;
int64 run = now - last_report;
double tps,
total_run,
@@ -3550,23 +3663,26 @@ threadRun(void *arg)
sqlat = 1.0 * (sqlats - last_sqlats) / (count - last_count);
stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency);
lag = 0.001 * (lags - last_lags) / (count - last_count);
+ skipped = thread->throttle_latency_skipped - last_skipped;
+ fprintf(stderr,
+ "progress: %.1f s, %.1f tps, "
+ "lat %.3f ms stddev %.3f",
+ total_run, tps, latency, stdev);
if (throttle_delay)
- fprintf(stderr,
- "progress: %.1f s, %.1f tps, "
- "lat %.3f ms stddev %.3f, lag %.3f ms\n",
- total_run, tps, latency, stdev, lag);
- else
- fprintf(stderr,
- "progress: %.1f s, %.1f tps, "
- "lat %.3f ms stddev %.3f\n",
- total_run, tps, latency, stdev);
+ {
+ fprintf(stderr, ", lag %.3f ms", lag);
+ if (latency_limit)
+ fprintf(stderr, ", " INT64_FORMAT " skipped", skipped);
+ }
+ fprintf(stderr, "\n");
last_count = count;
last_lats = lats;
last_sqlats = sqlats;
last_lags = lags;
last_report = now;
+ last_skipped = thread->throttle_latency_skipped;
next_report += (int64) progress *1000000;
}
}
@@ -3587,6 +3703,9 @@ done:
}
result->throttle_lag = thread->throttle_lag;
result->throttle_lag_max = thread->throttle_lag_max;
+ result->throttle_latency_skipped = thread->throttle_latency_skipped;
+ result->latency_late = thread->latency_late;
+
INSTR_TIME_SET_CURRENT(end);
INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start);
if (logfile)