diff options
Diffstat (limited to 'src/test/modules/libpq_pipeline/libpq_pipeline.c')
-rw-r--r-- | src/test/modules/libpq_pipeline/libpq_pipeline.c | 226 |
1 files changed, 226 insertions, 0 deletions
diff --git a/src/test/modules/libpq_pipeline/libpq_pipeline.c b/src/test/modules/libpq_pipeline/libpq_pipeline.c index 249ee22105c..c27c4e0adaf 100644 --- a/src/test/modules/libpq_pipeline/libpq_pipeline.c +++ b/src/test/modules/libpq_pipeline/libpq_pipeline.c @@ -28,6 +28,8 @@ static void exit_nicely(PGconn *conn); +static bool process_result(PGconn *conn, PGresult *res, int results, + int numsent); const char *const progname = "libpq_pipeline"; @@ -1307,6 +1309,227 @@ test_transaction(PGconn *conn) fprintf(stderr, "ok\n"); } +/* + * In this test mode we send a stream of queries, with one in the middle + * causing an error. Verify that we can still send some more after the + * error and have libpq work properly. + */ +static void +test_uniqviol(PGconn *conn) +{ + int sock = PQsocket(conn); + PGresult *res; + Oid paramTypes[2] = {INT8OID, INT8OID}; + const char *paramValues[2]; + char paramValue0[MAXINT8LEN]; + char paramValue1[MAXINT8LEN]; + int ctr = 0; + int numsent = 0; + int results = 0; + bool read_done = false; + bool write_done = false; + bool error_sent = false; + bool got_error = false; + int switched = 0; + int socketful = 0; + fd_set in_fds; + fd_set out_fds; + + fprintf(stderr, "uniqviol ..."); + + PQsetnonblocking(conn, 1); + + paramValues[0] = paramValue0; + paramValues[1] = paramValue1; + sprintf(paramValue1, "42"); + + res = PQexec(conn, "drop table if exists ppln_uniqviol;" + "create table ppln_uniqviol(id bigint primary key, idata bigint)"); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pg_fatal("failed to create table: %s", PQerrorMessage(conn)); + + res = PQexec(conn, "begin"); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pg_fatal("failed to begin transaction: %s", PQerrorMessage(conn)); + + res = PQprepare(conn, "insertion", + "insert into ppln_uniqviol values ($1, $2) returning id", + 2, paramTypes); + if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK) + pg_fatal("failed to prepare query: %s", PQerrorMessage(conn)); + + if (PQenterPipelineMode(conn) != 1) + pg_fatal("failed to enter pipeline mode"); + + while (!read_done) + { + /* + * Avoid deadlocks by reading everything the server has sent before + * sending anything. (Special precaution is needed here to process + * PQisBusy before testing the socket for read-readiness, because the + * socket does not turn read-ready after "sending" queries in aborted + * pipeline mode.) + */ + while (PQisBusy(conn) == 0) + { + bool new_error; + + if (results >= numsent) + { + if (write_done) + read_done = true; + break; + } + + res = PQgetResult(conn); + new_error = process_result(conn, res, results, numsent); + if (new_error && got_error) + pg_fatal("got two errors"); + got_error |= new_error; + if (results++ >= numsent - 1) + { + if (write_done) + read_done = true; + break; + } + } + + if (read_done) + break; + + FD_ZERO(&out_fds); + FD_SET(sock, &out_fds); + + FD_ZERO(&in_fds); + FD_SET(sock, &in_fds); + + if (select(sock + 1, &in_fds, write_done ? NULL : &out_fds, NULL, NULL) == -1) + { + if (errno == EINTR) + continue; + pg_fatal("select() failed: %m"); + } + + if (FD_ISSET(sock, &in_fds) && PQconsumeInput(conn) == 0) + pg_fatal("PQconsumeInput failed: %s", PQerrorMessage(conn)); + + /* + * If the socket is writable and we haven't finished sending queries, + * send some. + */ + if (!write_done && FD_ISSET(sock, &out_fds)) + { + for (;;) + { + int flush; + + /* + * provoke uniqueness violation exactly once after having + * switched to read mode. + */ + if (switched >= 1 && !error_sent && ctr % socketful >= socketful / 2) + { + sprintf(paramValue0, "%d", numsent / 2); + fprintf(stderr, "E"); + error_sent = true; + } + else + { + fprintf(stderr, "."); + sprintf(paramValue0, "%d", ctr++); + } + + if (PQsendQueryPrepared(conn, "insertion", 2, paramValues, NULL, NULL, 0) != 1) + pg_fatal("failed to execute prepared query: %s", PQerrorMessage(conn)); + numsent++; + + /* Are we done writing? */ + if (socketful != 0 && numsent % socketful == 42 && error_sent) + { + if (PQsendFlushRequest(conn) != 1) + pg_fatal("failed to send flush request"); + write_done = true; + fprintf(stderr, "\ndone writing\n"); + PQflush(conn); + break; + } + + /* is the outgoing socket full? */ + flush = PQflush(conn); + if (flush == -1) + pg_fatal("failed to flush: %s", PQerrorMessage(conn)); + if (flush == 1) + { + if (socketful == 0) + socketful = numsent; + fprintf(stderr, "\nswitch to reading\n"); + switched++; + break; + } + } + } + } + + if (!got_error) + pg_fatal("did not get expected error"); + + fprintf(stderr, "ok\n"); +} + +/* + * Subroutine for test_uniqviol; given a PGresult, print it out and consume + * the expected NULL that should follow it. + * + * Returns true if we read a fatal error message, otherwise false. + */ +static bool +process_result(PGconn *conn, PGresult *res, int results, int numsent) +{ + PGresult *res2; + bool got_error = false; + + if (res == NULL) + pg_fatal("got unexpected NULL"); + + switch (PQresultStatus(res)) + { + case PGRES_FATAL_ERROR: + got_error = true; + fprintf(stderr, "result %d/%d (error): %s\n", results, numsent, PQerrorMessage(conn)); + PQclear(res); + + res2 = PQgetResult(conn); + if (res2 != NULL) + pg_fatal("expected NULL, got %s", + PQresStatus(PQresultStatus(res2))); + break; + + case PGRES_TUPLES_OK: + fprintf(stderr, "result %d/%d: %s\n", results, numsent, PQgetvalue(res, 0, 0)); + PQclear(res); + + res2 = PQgetResult(conn); + if (res2 != NULL) + pg_fatal("expected NULL, got %s", + PQresStatus(PQresultStatus(res2))); + break; + + case PGRES_PIPELINE_ABORTED: + fprintf(stderr, "result %d/%d: pipeline aborted\n", results, numsent); + res2 = PQgetResult(conn); + if (res2 != NULL) + pg_fatal("expected NULL, got %s", + PQresStatus(PQresultStatus(res2))); + break; + + default: + pg_fatal("got unexpected %s", PQresStatus(PQresultStatus(res))); + } + + return got_error; +} + + static void usage(const char *progname) { @@ -1331,6 +1554,7 @@ print_test_list(void) printf("simple_pipeline\n"); printf("singlerow\n"); printf("transaction\n"); + printf("uniqviol\n"); } int @@ -1436,6 +1660,8 @@ main(int argc, char **argv) test_singlerowmode(conn); else if (strcmp(testname, "transaction") == 0) test_transaction(conn); + else if (strcmp(testname, "uniqviol") == 0) + test_uniqviol(conn); else { fprintf(stderr, "\"%s\" is not a recognized test name\n", testname); |