diff options
Diffstat (limited to 'contrib/pgbench/pgbench.c')
-rw-r--r-- | contrib/pgbench/pgbench.c | 227 |
1 files changed, 174 insertions, 53 deletions
diff --git a/contrib/pgbench/pgbench.c b/contrib/pgbench/pgbench.c index 2aaa8df8464..88d7e524694 100644 --- a/contrib/pgbench/pgbench.c +++ b/contrib/pgbench/pgbench.c @@ -4,7 +4,7 @@ * A simple benchmark program for PostgreSQL * Originally written by Tatsuo Ishii and enhanced by many contributors. * - * $PostgreSQL: pgsql/contrib/pgbench/pgbench.c,v 1.99 2010/07/06 19:18:55 momjian Exp $ + * $PostgreSQL: pgsql/contrib/pgbench/pgbench.c,v 1.100 2010/08/12 20:39:39 tgl Exp $ * Copyright (c) 2000-2010, PostgreSQL Global Development Group * ALL RIGHTS RESERVED; * @@ -133,6 +133,7 @@ int fillfactor = 100; bool use_log; /* log transaction latencies to a file */ bool is_connect; /* establish connection for each transaction */ +bool is_latencies; /* report per-command latencies */ int main_pid; /* main process id used in log filename */ char *pghost = ""; @@ -171,7 +172,8 @@ typedef struct int64 until; /* napping until (usec) */ Variable *variables; /* array of variable definitions */ int nvariables; - instr_time txn_begin; /* used for measuring latencies */ + instr_time txn_begin; /* used for measuring transaction latencies */ + instr_time stmt_begin; /* used for measuring statement latencies */ int use_file; /* index in sql_files for this client */ bool prepared[MAX_FILES]; } CState; @@ -186,6 +188,8 @@ typedef struct CState *state; /* array of CState */ int nstate; /* length of state[] */ instr_time start_time; /* thread start time */ + instr_time *exec_elapsed; /* time spent executing cmds (per Command) */ + int *exec_count; /* number of cmd executions (per Command) */ } TState; #define INVALID_THREAD ((pthread_t) 0) @@ -216,13 +220,16 @@ static const char *QUERYMODE[] = {"simple", "extended", "prepared"}; typedef struct { + char *line; /* full text of command line */ + int command_num; /* unique index of this Command struct */ int type; /* command type (SQL_COMMAND or META_COMMAND) */ - int argc; /* number of commands */ - char *argv[MAX_ARGS]; /* command list */ + int argc; /* number of command words */ + char *argv[MAX_ARGS]; /* command word list */ } Command; static Command **sql_files[MAX_FILES]; /* SQL script files */ static int num_files; /* number of script files */ +static int num_commands = 0; /* total number of Command structs */ static int debug = 0; /* debug flag */ /* default scenario */ @@ -287,6 +294,7 @@ usage(const char *progname) " define variable for use by custom script\n" " -f FILENAME read transaction script from FILENAME\n" " -j NUM number of threads (default: 1)\n" + " -r report average latency per command\n" " -l write transaction times to log file\n" " -M {simple|extended|prepared}\n" " protocol for submitting queries to server (default: simple)\n" @@ -629,11 +637,13 @@ runShellCommand(CState *st, char *variable, char **argv, int argc) char *endptr; int retval; - /* - * Join arguments with whilespace separaters. Arguments starting with - * exactly one colon are treated as variables: name - append a string - * "name" :var - append a variable named 'var'. ::name - append a string - * ":name" + /*---------- + * Join arguments with whitespace separators. Arguments starting with + * exactly one colon are treated as variables: + * name - append a string "name" + * :var - append a variable named 'var' + * ::name - append a string ":name" + *---------- */ for (i = 0; i < argc; i++) { @@ -740,7 +750,7 @@ clientDone(CState *st, bool ok) /* return false iff client should be disconnected */ static bool -doCustom(CState *st, instr_time *conn_time, FILE *logfile) +doCustom(TState *thread, CState *st, instr_time *conn_time, FILE *logfile) { PGresult *res; Command **commands; @@ -775,7 +785,22 @@ top: } /* - * transaction finished: record the time it took in the log + * command finished: accumulate per-command execution times in + * thread-local data structure, if per-command latencies are requested + */ + if (is_latencies) + { + instr_time now; + int cnum = commands[st->state]->command_num; + + INSTR_TIME_SET_CURRENT(now); + INSTR_TIME_ACCUM_DIFF(thread->exec_elapsed[cnum], + now, st->stmt_begin); + thread->exec_count[cnum]++; + } + + /* + * if transaction finished, record the time it took in the log */ if (logfile && commands[st->state + 1] == NULL) { @@ -802,6 +827,10 @@ top: if (commands[st->state]->type == SQL_COMMAND) { + /* + * Read and discard the query result; note this is not included + * in the statement latency numbers. + */ res = PQgetResult(st->con); switch (PQresultStatus(res)) { @@ -856,9 +885,14 @@ top: INSTR_TIME_ACCUM_DIFF(*conn_time, end, start); } + /* Record transaction start time if logging is enabled */ if (logfile && st->state == 0) INSTR_TIME_SET_CURRENT(st->txn_begin); + /* Record statement start time if per-command latencies are requested */ + if (is_latencies) + INSTR_TIME_SET_CURRENT(st->stmt_begin); + if (commands[st->state]->type == SQL_COMMAND) { const Command *command = commands[st->state]; @@ -1351,6 +1385,7 @@ parseQuery(Command *cmd, const char *raw_sql) return true; } +/* Parse a command; return a Command struct, or NULL if it's a comment */ static Command * process_commands(char *buf) { @@ -1361,24 +1396,28 @@ process_commands(char *buf) char *p, *tok; + /* Make the string buf end at the next newline */ if ((p = strchr(buf, '\n')) != NULL) *p = '\0'; + /* Skip leading whitespace */ p = buf; while (isspace((unsigned char) *p)) p++; + /* If the line is empty or actually a comment, we're done */ if (*p == '\0' || strncmp(p, "--", 2) == 0) - { return NULL; - } + /* Allocate and initialize Command structure */ my_commands = (Command *) malloc(sizeof(Command)); if (my_commands == NULL) - { return NULL; - } - + my_commands->line = strdup(buf); + if (my_commands->line == NULL) + return NULL; + my_commands->command_num = num_commands++; + my_commands->type = 0; /* until set */ my_commands->argc = 0; if (*p == '\\') @@ -1547,26 +1586,13 @@ process_file(char *filename) while (fgets(buf, sizeof(buf), fd) != NULL) { - Command *commands; - int i; + Command *command; - i = 0; - while (isspace((unsigned char) buf[i])) - i++; - - if (buf[i] != '\0' && strncmp(&buf[i], "--", 2) != 0) - { - commands = process_commands(&buf[i]); - if (commands == NULL) - { - fclose(fd); - return false; - } - } - else + command = process_commands(buf); + if (command == NULL) continue; - my_commands[lineno] = commands; + my_commands[lineno] = command; lineno++; if (lineno >= alloc_num) @@ -1612,7 +1638,7 @@ process_builtin(char *tb) for (;;) { char *p; - Command *commands; + Command *command; p = buf; while (*tb && *tb != '\n') @@ -1626,13 +1652,11 @@ process_builtin(char *tb) *p = '\0'; - commands = process_commands(buf); - if (commands == NULL) - { - return NULL; - } + command = process_commands(buf); + if (command == NULL) + continue; - my_commands[lineno] = commands; + my_commands[lineno] = command; lineno++; if (lineno >= alloc_num) @@ -1653,7 +1677,8 @@ process_builtin(char *tb) /* print out results */ static void -printResults(int ttype, int normal_xacts, int nclients, int nthreads, +printResults(int ttype, int normal_xacts, int nclients, + TState *threads, int nthreads, instr_time total_time, instr_time conn_total_time) { double time_include, @@ -1694,6 +1719,51 @@ printResults(int ttype, int normal_xacts, int nclients, int nthreads, } printf("tps = %f (including connections establishing)\n", tps_include); printf("tps = %f (excluding connections establishing)\n", tps_exclude); + + /* Report per-command latencies */ + if (is_latencies) + { + int i; + + for (i = 0; i < num_files; i++) + { + Command **commands; + + if (num_files > 1) + printf("statement latencies in milliseconds, file %d:\n", i+1); + else + printf("statement latencies in milliseconds:\n"); + + for (commands = sql_files[i]; *commands != NULL; commands++) + { + Command *command = *commands; + int cnum = command->command_num; + double total_time; + instr_time total_exec_elapsed; + int total_exec_count; + int t; + + /* Accumulate per-thread data for command */ + INSTR_TIME_SET_ZERO(total_exec_elapsed); + total_exec_count = 0; + for (t = 0; t < nthreads; t++) + { + TState *thread = &threads[t]; + + INSTR_TIME_ADD(total_exec_elapsed, + thread->exec_elapsed[cnum]); + total_exec_count += thread->exec_count[cnum]; + } + + if (total_exec_count > 0) + total_time = INSTR_TIME_GET_MILLISEC(total_exec_elapsed) / (double) total_exec_count; + else + total_time = 0.0; + + printf("\t%f\t%s\n", total_time, command->line); + } + } + } } @@ -1770,7 +1840,7 @@ main(int argc, char **argv) memset(state, 0, sizeof(*state)); - while ((c = getopt(argc, argv, "ih:nvp:dSNc:Cs:t:T:U:lf:D:F:M:j:")) != -1) + while ((c = getopt(argc, argv, "ih:nvp:dSNc:j:Crs:t:T:U:lf:D:F:M:")) != -1) { switch (c) { @@ -1834,6 +1904,9 @@ main(int argc, char **argv) case 'C': is_connect = true; break; + case 'r': + is_latencies = true; + break; case 's': scale_given = true; scale = atoi(optarg); @@ -1955,6 +2028,22 @@ main(int argc, char **argv) } /* + * is_latencies only works with multiple threads in thread-based + * implementations, not fork-based ones, because it supposes that the + * parent can see changes made to the per-thread execution stats by child + * threads. It seems useful enough to accept despite this limitation, + * but perhaps we should FIXME someday (by passing the stats data back + * up through the parent-to-child pipes). + */ +#ifndef ENABLE_THREAD_SAFETY + if (is_latencies && nthreads > 1) + { + fprintf(stderr, "-r does not work with -j larger than 1 on this platform.\n"); + exit(1); + } +#endif + + /* * save main process id in the global variable because process id will be * changed after fork. */ @@ -2091,6 +2180,39 @@ main(int argc, char **argv) break; } + /* set up thread data structures */ + threads = (TState *) malloc(sizeof(TState) * nthreads); + for (i = 0; i < nthreads; i++) + { + TState *thread = &threads[i]; + + thread->tid = i; + thread->state = &state[nclients / nthreads * i]; + thread->nstate = nclients / nthreads; + + if (is_latencies) + { + /* Reserve memory for the thread to store per-command latencies */ + int t; + + thread->exec_elapsed = (instr_time *) + malloc(sizeof(instr_time) * num_commands); + thread->exec_count = (int *) + malloc(sizeof(int) * num_commands); + + for (t = 0; t < num_commands; t++) + { + INSTR_TIME_SET_ZERO(thread->exec_elapsed[t]); + thread->exec_count[t] = 0; + } + } + else + { + thread->exec_elapsed = NULL; + thread->exec_count = NULL; + } + } + /* get start up time */ INSTR_TIME_SET_CURRENT(start_time); @@ -2099,20 +2221,18 @@ main(int argc, char **argv) setalarm(duration); /* start threads */ - threads = (TState *) malloc(sizeof(TState) * nthreads); for (i = 0; i < nthreads; i++) { - threads[i].tid = i; - threads[i].state = &state[nclients / nthreads * i]; - threads[i].nstate = nclients / nthreads; - INSTR_TIME_SET_CURRENT(threads[i].start_time); + TState *thread = &threads[i]; + + INSTR_TIME_SET_CURRENT(thread->start_time); /* the first thread (i = 0) is executed by main thread */ if (i > 0) { - int err = pthread_create(&threads[i].thread, NULL, threadRun, &threads[i]); + int err = pthread_create(&thread->thread, NULL, threadRun, thread); - if (err != 0 || threads[i].thread == INVALID_THREAD) + if (err != 0 || thread->thread == INVALID_THREAD) { fprintf(stderr, "cannot create thread: %s\n", strerror(err)); exit(1); @@ -2120,7 +2240,7 @@ main(int argc, char **argv) } else { - threads[i].thread = INVALID_THREAD; + thread->thread = INVALID_THREAD; } } @@ -2150,7 +2270,8 @@ main(int argc, char **argv) /* get end time */ INSTR_TIME_SET_CURRENT(total_time); INSTR_TIME_SUBTRACT(total_time, start_time); - printResults(ttype, total_xacts, nclients, nthreads, total_time, conn_total_time); + printResults(ttype, total_xacts, nclients, threads, nthreads, + total_time, conn_total_time); return 0; } @@ -2211,7 +2332,7 @@ threadRun(void *arg) int prev_ecnt = st->ecnt; st->use_file = getrand(0, num_files - 1); - if (!doCustom(st, &result->conn_time, logfile)) + if (!doCustom(thread, st, &result->conn_time, logfile)) remains--; /* I've aborted */ if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND) @@ -2313,7 +2434,7 @@ threadRun(void *arg) if (st->con && (FD_ISSET(PQsocket(st->con), &input_mask) || commands[st->state]->type == META_COMMAND)) { - if (!doCustom(st, &result->conn_time, logfile)) + if (!doCustom(thread, st, &result->conn_time, logfile)) remains--; /* I've aborted */ } |