aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorNoah Misch <noah@leadboat.com>2024-01-08 11:39:56 -0800
committerNoah Misch <noah@leadboat.com>2024-01-08 11:39:56 -0800
commitd3c5f37dd543498cc7c678815d3921823beec9e9 (patch)
tree9c430f23ec95036ded0b8bb16446b30528159c9e /src
parent0efc8318477714600567d15812dc8d15841e269e (diff)
downloadpostgresql-d3c5f37dd543498cc7c678815d3921823beec9e9.tar.gz
postgresql-d3c5f37dd543498cc7c678815d3921823beec9e9.zip
Make dblink interruptible, via new libpqsrv APIs.
This replaces dblink's blocking libpq calls, allowing cancellation and allowing DROP DATABASE (of a database not involved in the query). Apart from explicit dblink_cancel_query() calls, dblink still doesn't cancel the remote side. The replacement for the blocking calls consists of new, general-purpose query execution wrappers in the libpqsrv facility. Out-of-tree extensions should adopt these. Use them in postgres_fdw, replacing a local implementation from which the libpqsrv implementation derives. This is a bug fix for dblink. Code inspection identified the bug at least thirteen years ago, but user complaints have not appeared. Hence, no back-patch for now. Discussion: https://postgr.es/m/20231122012945.74@rfd.leadboat.com
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/libpqwalreceiver/libpqwalreceiver.c9
-rw-r--r--src/include/libpq/libpq-be-fe-helpers.h127
2 files changed, 130 insertions, 6 deletions
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index c9748539aa5..78344a03615 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -653,12 +653,9 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
* Send a query and wait for the results by using the asynchronous libpq
* functions and socket readiness events.
*
- * We must not use the regular blocking libpq functions like PQexec()
- * since they are uninterruptible by signals on some platforms, such as
- * Windows.
- *
- * The function is modeled on PQexec() in libpq, but only implements
- * those parts that are in use in the walreceiver api.
+ * The function is modeled on libpqsrv_exec(), with the behavior difference
+ * being that it calls ProcessWalRcvInterrupts(). As an optimization, it
+ * skips try/catch, since all errors terminate the process.
*
* May return NULL, rather than an error result, on failure.
*/
diff --git a/src/include/libpq/libpq-be-fe-helpers.h b/src/include/libpq/libpq-be-fe-helpers.h
index ef8e9e170ab..5d33bcf32f7 100644
--- a/src/include/libpq/libpq-be-fe-helpers.h
+++ b/src/include/libpq/libpq-be-fe-helpers.h
@@ -48,6 +48,8 @@
static inline void libpqsrv_connect_prepare(void);
static inline void libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info);
+static inline PGresult *libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info);
+static inline PGresult *libpqsrv_get_result(PGconn *conn, uint32 wait_event_info);
/*
@@ -238,4 +240,129 @@ libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info)
PG_END_TRY();
}
+/*
+ * PQexec() wrapper that processes interrupts.
+ *
+ * Unless PQsetnonblocking(conn, 1) is in effect, this can't process
+ * interrupts while pushing the query text to the server. Consider that
+ * setting if query strings can be long relative to TCP buffer size.
+ *
+ * This has the preconditions of PQsendQuery(), not those of PQexec(). Most
+ * notably, PQexec() would silently discard any prior query results.
+ */
+static inline PGresult *
+libpqsrv_exec(PGconn *conn, const char *query, uint32 wait_event_info)
+{
+ if (!PQsendQuery(conn, query))
+ return NULL;
+ return libpqsrv_get_result_last(conn, wait_event_info);
+}
+
+/*
+ * PQexecParams() wrapper that processes interrupts.
+ *
+ * See notes at libpqsrv_exec().
+ */
+static inline PGresult *
+libpqsrv_exec_params(PGconn *conn,
+ const char *command,
+ int nParams,
+ const Oid *paramTypes,
+ const char *const *paramValues,
+ const int *paramLengths,
+ const int *paramFormats,
+ int resultFormat,
+ uint32 wait_event_info)
+{
+ if (!PQsendQueryParams(conn, command, nParams, paramTypes, paramValues,
+ paramLengths, paramFormats, resultFormat))
+ return NULL;
+ return libpqsrv_get_result_last(conn, wait_event_info);
+}
+
+/*
+ * Like PQexec(), loop over PQgetResult() until it returns NULL or another
+ * terminal state. Return the last non-NULL result or the terminal state.
+ */
+static inline PGresult *
+libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info)
+{
+ PGresult *volatile lastResult = NULL;
+
+ /* In what follows, do not leak any PGresults on an error. */
+ PG_TRY();
+ {
+ for (;;)
+ {
+ /* Wait for, and collect, the next PGresult. */
+ PGresult *result;
+
+ result = libpqsrv_get_result(conn, wait_event_info);
+ if (result == NULL)
+ break; /* query is complete, or failure */
+
+ /*
+ * Emulate PQexec()'s behavior of returning the last result when
+ * there are many.
+ */
+ PQclear(lastResult);
+ lastResult = result;
+
+ if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
+ PQresultStatus(lastResult) == PGRES_COPY_OUT ||
+ PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
+ PQstatus(conn) == CONNECTION_BAD)
+ break;
+ }
+ }
+ PG_CATCH();
+ {
+ PQclear(lastResult);
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+
+ return lastResult;
+}
+
+/*
+ * Perform the equivalent of PQgetResult(), but watch for interrupts.
+ */
+static inline PGresult *
+libpqsrv_get_result(PGconn *conn, uint32 wait_event_info)
+{
+ /*
+ * Collect data until PQgetResult is ready to get the result without
+ * blocking.
+ */
+ while (PQisBusy(conn))
+ {
+ int rc;
+
+ rc = WaitLatchOrSocket(MyLatch,
+ WL_EXIT_ON_PM_DEATH | WL_LATCH_SET |
+ WL_SOCKET_READABLE,
+ PQsocket(conn),
+ 0,
+ wait_event_info);
+
+ /* Interrupted? */
+ if (rc & WL_LATCH_SET)
+ {
+ ResetLatch(MyLatch);
+ CHECK_FOR_INTERRUPTS();
+ }
+
+ /* Consume whatever data is available from the socket */
+ if (PQconsumeInput(conn) == 0)
+ {
+ /* trouble; expect PQgetResult() to return NULL */
+ break;
+ }
+ }
+
+ /* Now we can collect and return the next PGresult */
+ return PQgetResult(conn);
+}
+
#endif /* LIBPQ_BE_FE_HELPERS_H */