diff options
-rw-r--r-- | src/test/modules/libpq_pipeline/libpq_pipeline.c | 78 |
1 files changed, 49 insertions, 29 deletions
diff --git a/src/test/modules/libpq_pipeline/libpq_pipeline.c b/src/test/modules/libpq_pipeline/libpq_pipeline.c index 1fe15ee8899..c353dba9c70 100644 --- a/src/test/modules/libpq_pipeline/libpq_pipeline.c +++ b/src/test/modules/libpq_pipeline/libpq_pipeline.c @@ -114,50 +114,38 @@ confirm_query_canceled_impl(int line, PGconn *conn) PQconsumeInput(conn); } -#define send_cancellable_query(conn, monitorConn) \ - send_cancellable_query_impl(__LINE__, conn, monitorConn) +/* + * Using monitorConn, query pg_stat_activity to see that the connection with + * the given PID is in the given state. We never stop until it does. + */ static void -send_cancellable_query_impl(int line, PGconn *conn, PGconn *monitorConn) +wait_for_connection_state(int line, PGconn *monitorConn, int procpid, char *state) { - const char *env_wait; - const Oid paramTypes[1] = {INT4OID}; - int procpid = PQbackendPID(conn); - - env_wait = getenv("PG_TEST_TIMEOUT_DEFAULT"); - if (env_wait == NULL) - env_wait = "180"; + const Oid paramTypes[] = {INT4OID, TEXTOID}; + const char *paramValues[2]; + char *pidstr = psprintf("%d", procpid); - if (PQsendQueryParams(conn, "SELECT pg_sleep($1)", 1, paramTypes, - &env_wait, NULL, NULL, 0) != 1) - pg_fatal_impl(line, "failed to send query: %s", PQerrorMessage(conn)); + paramValues[0] = pidstr; + paramValues[1] = state; - /* - * Wait until the query is actually running. Otherwise sending a - * cancellation request might not cancel the query due to race conditions. - */ while (true) { - char *value; PGresult *res; - const char *paramValues[1]; - char pidval[16]; - - snprintf(pidval, 16, "%d", procpid); - paramValues[0] = pidval; + char *value; res = PQexecParams(monitorConn, "SELECT count(*) FROM pg_stat_activity WHERE " - "pid = $1 AND state = 'active'", - 1, NULL, paramValues, NULL, NULL, 1); + "pid = $1 AND state = $2", + 2, paramTypes, paramValues, NULL, NULL, 1); if (PQresultStatus(res) != PGRES_TUPLES_OK) - pg_fatal("could not query pg_stat_activity: %s", PQerrorMessage(monitorConn)); + pg_fatal_impl(line, "could not query pg_stat_activity: %s", PQerrorMessage(monitorConn)); if (PQntuples(res) != 1) - pg_fatal("unexpected number of rows received: %d", PQntuples(res)); + pg_fatal_impl(line, "unexpected number of rows received: %d", PQntuples(res)); if (PQnfields(res) != 1) - pg_fatal("unexpected number of columns received: %d", PQnfields(res)); + pg_fatal_impl(line, "unexpected number of columns received: %d", PQnfields(res)); value = PQgetvalue(res, 0, 0); - if (*value != '0') + if (value[0] != '0') { PQclear(res); break; @@ -167,6 +155,38 @@ send_cancellable_query_impl(int line, PGconn *conn, PGconn *monitorConn) /* wait 10ms before polling again */ pg_usleep(10000); } + + pfree(pidstr); +} + +#define send_cancellable_query(conn, monitorConn) \ + send_cancellable_query_impl(__LINE__, conn, monitorConn) +static void +send_cancellable_query_impl(int line, PGconn *conn, PGconn *monitorConn) +{ + const char *env_wait; + const Oid paramTypes[1] = {INT4OID}; + + /* + * Wait for the connection to be idle, so that our check for an active + * connection below is reliable, instead of possibly seeing an outdated + * state. + */ + wait_for_connection_state(line, monitorConn, PQbackendPID(conn), "idle"); + + env_wait = getenv("PG_TEST_TIMEOUT_DEFAULT"); + if (env_wait == NULL) + env_wait = "180"; + + if (PQsendQueryParams(conn, "SELECT pg_sleep($1)", 1, paramTypes, + &env_wait, NULL, NULL, 0) != 1) + pg_fatal_impl(line, "failed to send query: %s", PQerrorMessage(conn)); + + /* + * Wait for the query to start, because if the query is not running yet + * the cancel request that we send won't have any effect. + */ + wait_for_connection_state(line, monitorConn, PQbackendPID(conn), "active"); } /* |