diff options
author | Robert Haas <rhaas@postgresql.org> | 2016-04-21 10:46:09 -0400 |
---|---|---|
committer | Robert Haas <rhaas@postgresql.org> | 2016-04-21 10:49:09 -0400 |
commit | f039eaac7131ef2a4cf63a10cf98486f8bcd09d2 (patch) | |
tree | 7cf01d74aa16537ce41f72f9c796cc0e896270c6 /contrib/postgres_fdw/connection.c | |
parent | 11e178d0dc4bc2328ae4759090b3c48b07023fab (diff) | |
download | postgresql-f039eaac7131ef2a4cf63a10cf98486f8bcd09d2.tar.gz postgresql-f039eaac7131ef2a4cf63a10cf98486f8bcd09d2.zip |
Allow queries submitted by postgres_fdw to be canceled.
This fixes a problem which is not new, but with the advent of direct
foreign table modification in 0bf3ae88af330496517722e391e7c975e6bad219,
it's somewhat more likely to be annoying than previously. So,
arrange for a local query cancelation to propagate to the remote side.
Michael Paquier, reviewed by Etsuro Fujita. Original report by
Thom Brown.
Diffstat (limited to 'contrib/postgres_fdw/connection.c')
-rw-r--r-- | contrib/postgres_fdw/connection.c | 99 |
1 files changed, 99 insertions, 0 deletions
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 189f290cdf6..16ef38fff78 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -17,6 +17,7 @@ #include "access/xact.h" #include "mb/pg_wchar.h" #include "miscadmin.h" +#include "storage/latch.h" #include "utils/hsearch.h" #include "utils/memutils.h" @@ -448,6 +449,78 @@ GetPrepStmtNumber(PGconn *conn) } /* + * Submit a query and wait for the result. + * + * This function is interruptible by signals. + * + * Caller is responsible for the error handling on the result. + */ +PGresult * +pgfdw_exec_query(PGconn *conn, const char *query) +{ + /* + * Submit a query. Since we don't use non-blocking mode, this also can + * block. But its risk is relatively small, so we ignore that for now. + */ + if (!PQsendQuery(conn, query)) + pgfdw_report_error(ERROR, NULL, conn, false, query); + + /* Wait for the result. */ + return pgfdw_get_result(conn, query); +} + +/* + * Wait for the result from a prior asynchronous execution function call. + * + * This function offers quick responsiveness by checking for any interruptions. + * + * This function emulates the PQexec()'s behavior of returning the last result + * when there are many. + * + * Caller is responsible for the error handling on the result. + */ +PGresult * +pgfdw_get_result(PGconn *conn, const char *query) +{ + PGresult *last_res = NULL; + + for (;;) + { + PGresult *res; + + while (PQisBusy(conn)) + { + int wc; + + /* Sleep until there's something to do */ + wc = WaitLatchOrSocket(MyLatch, + WL_LATCH_SET | WL_SOCKET_READABLE, + PQsocket(conn), + -1L); + ResetLatch(MyLatch); + + CHECK_FOR_INTERRUPTS(); + + /* Data available in socket */ + if (wc & WL_SOCKET_READABLE) + { + if (!PQconsumeInput(conn)) + pgfdw_report_error(ERROR, NULL, conn, false, query); + } + } + + res = PQgetResult(conn); + if (res == NULL) + break; /* query is complete */ + + PQclear(last_res); + last_res = res; + } + + return last_res; +} + +/* * Report an error we got from the remote server. * * elevel: error level to use (typically ERROR, but might be less) @@ -598,6 +671,32 @@ pgfdw_xact_callback(XactEvent event, void *arg) case XACT_EVENT_ABORT: /* Assume we might have lost track of prepared statements */ entry->have_error = true; + + /* + * If a command has been submitted to the remote server by + * using an asynchronous execution function, the command + * might not have yet completed. Check to see if a command + * is still being processed by the remote server, and if so, + * request cancellation of the command; if not, abort + * gracefully. + */ + if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE) + { + PGcancel *cancel; + char errbuf[256]; + + if ((cancel = PQgetCancel(entry->conn))) + { + if (!PQcancel(cancel, errbuf, sizeof(errbuf))) + ereport(WARNING, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not send cancel request: %s", + errbuf))); + PQfreeCancel(cancel); + } + break; + } + /* If we're aborting, abort all remote transactions too */ res = PQexec(entry->conn, "ABORT TRANSACTION"); /* Note: can't throw ERROR, it would be infinite loop */ |