aboutsummaryrefslogtreecommitdiff
path: root/src/test/modules/libpq_pipeline/libpq_pipeline.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/test/modules/libpq_pipeline/libpq_pipeline.c')
-rw-r--r--src/test/modules/libpq_pipeline/libpq_pipeline.c226
1 files changed, 226 insertions, 0 deletions
diff --git a/src/test/modules/libpq_pipeline/libpq_pipeline.c b/src/test/modules/libpq_pipeline/libpq_pipeline.c
index 249ee22105c..c27c4e0adaf 100644
--- a/src/test/modules/libpq_pipeline/libpq_pipeline.c
+++ b/src/test/modules/libpq_pipeline/libpq_pipeline.c
@@ -28,6 +28,8 @@
static void exit_nicely(PGconn *conn);
+static bool process_result(PGconn *conn, PGresult *res, int results,
+ int numsent);
const char *const progname = "libpq_pipeline";
@@ -1307,6 +1309,227 @@ test_transaction(PGconn *conn)
fprintf(stderr, "ok\n");
}
+/*
+ * In this test mode we send a stream of queries, with one in the middle
+ * causing an error. Verify that we can still send some more after the
+ * error and have libpq work properly.
+ */
+static void
+test_uniqviol(PGconn *conn)
+{
+ int sock = PQsocket(conn);
+ PGresult *res;
+ Oid paramTypes[2] = {INT8OID, INT8OID};
+ const char *paramValues[2];
+ char paramValue0[MAXINT8LEN];
+ char paramValue1[MAXINT8LEN];
+ int ctr = 0;
+ int numsent = 0;
+ int results = 0;
+ bool read_done = false;
+ bool write_done = false;
+ bool error_sent = false;
+ bool got_error = false;
+ int switched = 0;
+ int socketful = 0;
+ fd_set in_fds;
+ fd_set out_fds;
+
+ fprintf(stderr, "uniqviol ...");
+
+ PQsetnonblocking(conn, 1);
+
+ paramValues[0] = paramValue0;
+ paramValues[1] = paramValue1;
+ sprintf(paramValue1, "42");
+
+ res = PQexec(conn, "drop table if exists ppln_uniqviol;"
+ "create table ppln_uniqviol(id bigint primary key, idata bigint)");
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pg_fatal("failed to create table: %s", PQerrorMessage(conn));
+
+ res = PQexec(conn, "begin");
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pg_fatal("failed to begin transaction: %s", PQerrorMessage(conn));
+
+ res = PQprepare(conn, "insertion",
+ "insert into ppln_uniqviol values ($1, $2) returning id",
+ 2, paramTypes);
+ if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK)
+ pg_fatal("failed to prepare query: %s", PQerrorMessage(conn));
+
+ if (PQenterPipelineMode(conn) != 1)
+ pg_fatal("failed to enter pipeline mode");
+
+ while (!read_done)
+ {
+ /*
+ * Avoid deadlocks by reading everything the server has sent before
+ * sending anything. (Special precaution is needed here to process
+ * PQisBusy before testing the socket for read-readiness, because the
+ * socket does not turn read-ready after "sending" queries in aborted
+ * pipeline mode.)
+ */
+ while (PQisBusy(conn) == 0)
+ {
+ bool new_error;
+
+ if (results >= numsent)
+ {
+ if (write_done)
+ read_done = true;
+ break;
+ }
+
+ res = PQgetResult(conn);
+ new_error = process_result(conn, res, results, numsent);
+ if (new_error && got_error)
+ pg_fatal("got two errors");
+ got_error |= new_error;
+ if (results++ >= numsent - 1)
+ {
+ if (write_done)
+ read_done = true;
+ break;
+ }
+ }
+
+ if (read_done)
+ break;
+
+ FD_ZERO(&out_fds);
+ FD_SET(sock, &out_fds);
+
+ FD_ZERO(&in_fds);
+ FD_SET(sock, &in_fds);
+
+ if (select(sock + 1, &in_fds, write_done ? NULL : &out_fds, NULL, NULL) == -1)
+ {
+ if (errno == EINTR)
+ continue;
+ pg_fatal("select() failed: %m");
+ }
+
+ if (FD_ISSET(sock, &in_fds) && PQconsumeInput(conn) == 0)
+ pg_fatal("PQconsumeInput failed: %s", PQerrorMessage(conn));
+
+ /*
+ * If the socket is writable and we haven't finished sending queries,
+ * send some.
+ */
+ if (!write_done && FD_ISSET(sock, &out_fds))
+ {
+ for (;;)
+ {
+ int flush;
+
+ /*
+ * provoke uniqueness violation exactly once after having
+ * switched to read mode.
+ */
+ if (switched >= 1 && !error_sent && ctr % socketful >= socketful / 2)
+ {
+ sprintf(paramValue0, "%d", numsent / 2);
+ fprintf(stderr, "E");
+ error_sent = true;
+ }
+ else
+ {
+ fprintf(stderr, ".");
+ sprintf(paramValue0, "%d", ctr++);
+ }
+
+ if (PQsendQueryPrepared(conn, "insertion", 2, paramValues, NULL, NULL, 0) != 1)
+ pg_fatal("failed to execute prepared query: %s", PQerrorMessage(conn));
+ numsent++;
+
+ /* Are we done writing? */
+ if (socketful != 0 && numsent % socketful == 42 && error_sent)
+ {
+ if (PQsendFlushRequest(conn) != 1)
+ pg_fatal("failed to send flush request");
+ write_done = true;
+ fprintf(stderr, "\ndone writing\n");
+ PQflush(conn);
+ break;
+ }
+
+ /* is the outgoing socket full? */
+ flush = PQflush(conn);
+ if (flush == -1)
+ pg_fatal("failed to flush: %s", PQerrorMessage(conn));
+ if (flush == 1)
+ {
+ if (socketful == 0)
+ socketful = numsent;
+ fprintf(stderr, "\nswitch to reading\n");
+ switched++;
+ break;
+ }
+ }
+ }
+ }
+
+ if (!got_error)
+ pg_fatal("did not get expected error");
+
+ fprintf(stderr, "ok\n");
+}
+
+/*
+ * Subroutine for test_uniqviol; given a PGresult, print it out and consume
+ * the expected NULL that should follow it.
+ *
+ * Returns true if we read a fatal error message, otherwise false.
+ */
+static bool
+process_result(PGconn *conn, PGresult *res, int results, int numsent)
+{
+ PGresult *res2;
+ bool got_error = false;
+
+ if (res == NULL)
+ pg_fatal("got unexpected NULL");
+
+ switch (PQresultStatus(res))
+ {
+ case PGRES_FATAL_ERROR:
+ got_error = true;
+ fprintf(stderr, "result %d/%d (error): %s\n", results, numsent, PQerrorMessage(conn));
+ PQclear(res);
+
+ res2 = PQgetResult(conn);
+ if (res2 != NULL)
+ pg_fatal("expected NULL, got %s",
+ PQresStatus(PQresultStatus(res2)));
+ break;
+
+ case PGRES_TUPLES_OK:
+ fprintf(stderr, "result %d/%d: %s\n", results, numsent, PQgetvalue(res, 0, 0));
+ PQclear(res);
+
+ res2 = PQgetResult(conn);
+ if (res2 != NULL)
+ pg_fatal("expected NULL, got %s",
+ PQresStatus(PQresultStatus(res2)));
+ break;
+
+ case PGRES_PIPELINE_ABORTED:
+ fprintf(stderr, "result %d/%d: pipeline aborted\n", results, numsent);
+ res2 = PQgetResult(conn);
+ if (res2 != NULL)
+ pg_fatal("expected NULL, got %s",
+ PQresStatus(PQresultStatus(res2)));
+ break;
+
+ default:
+ pg_fatal("got unexpected %s", PQresStatus(PQresultStatus(res)));
+ }
+
+ return got_error;
+}
+
+
static void
usage(const char *progname)
{
@@ -1331,6 +1554,7 @@ print_test_list(void)
printf("simple_pipeline\n");
printf("singlerow\n");
printf("transaction\n");
+ printf("uniqviol\n");
}
int
@@ -1436,6 +1660,8 @@ main(int argc, char **argv)
test_singlerowmode(conn);
else if (strcmp(testname, "transaction") == 0)
test_transaction(conn);
+ else if (strcmp(testname, "uniqviol") == 0)
+ test_uniqviol(conn);
else
{
fprintf(stderr, "\"%s\" is not a recognized test name\n", testname);