diff options
author | Noah Misch <noah@leadboat.com> | 2024-01-08 11:39:56 -0800 |
---|---|---|
committer | Noah Misch <noah@leadboat.com> | 2024-01-08 11:39:56 -0800 |
commit | d3c5f37dd543498cc7c678815d3921823beec9e9 (patch) | |
tree | 9c430f23ec95036ded0b8bb16446b30528159c9e /src | |
parent | 0efc8318477714600567d15812dc8d15841e269e (diff) | |
download | postgresql-d3c5f37dd543498cc7c678815d3921823beec9e9.tar.gz postgresql-d3c5f37dd543498cc7c678815d3921823beec9e9.zip |
Make dblink interruptible, via new libpqsrv APIs.
This replaces dblink's blocking libpq calls, allowing cancellation and
allowing DROP DATABASE (of a database not involved in the query). Apart
from explicit dblink_cancel_query() calls, dblink still doesn't cancel
the remote side. The replacement for the blocking calls consists of
new, general-purpose query execution wrappers in the libpqsrv facility.
Out-of-tree extensions should adopt these. Use them in postgres_fdw,
replacing a local implementation from which the libpqsrv implementation
derives. This is a bug fix for dblink. Code inspection identified the
bug at least thirteen years ago, but user complaints have not appeared.
Hence, no back-patch for now.
Discussion: https://postgr.es/m/20231122012945.74@rfd.leadboat.com
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/replication/libpqwalreceiver/libpqwalreceiver.c | 9 | ||||
-rw-r--r-- | src/include/libpq/libpq-be-fe-helpers.h | 127 |
2 files changed, 130 insertions, 6 deletions
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index c9748539aa5..78344a03615 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -653,12 +653,9 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, * Send a query and wait for the results by using the asynchronous libpq * functions and socket readiness events. * - * We must not use the regular blocking libpq functions like PQexec() - * since they are uninterruptible by signals on some platforms, such as - * Windows. - * - * The function is modeled on PQexec() in libpq, but only implements - * those parts that are in use in the walreceiver api. + * The function is modeled on libpqsrv_exec(), with the behavior difference + * being that it calls ProcessWalRcvInterrupts(). As an optimization, it + * skips try/catch, since all errors terminate the process. * * May return NULL, rather than an error result, on failure. */ diff --git a/src/include/libpq/libpq-be-fe-helpers.h b/src/include/libpq/libpq-be-fe-helpers.h index ef8e9e170ab..5d33bcf32f7 100644 --- a/src/include/libpq/libpq-be-fe-helpers.h +++ b/src/include/libpq/libpq-be-fe-helpers.h @@ -48,6 +48,8 @@ static inline void libpqsrv_connect_prepare(void); static inline void libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info); +static inline PGresult *libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info); +static inline PGresult *libpqsrv_get_result(PGconn *conn, uint32 wait_event_info); /* @@ -238,4 +240,129 @@ libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info) PG_END_TRY(); } +/* + * PQexec() wrapper that processes interrupts. + * + * Unless PQsetnonblocking(conn, 1) is in effect, this can't process + * interrupts while pushing the query text to the server. Consider that + * setting if query strings can be long relative to TCP buffer size. + * + * This has the preconditions of PQsendQuery(), not those of PQexec(). Most + * notably, PQexec() would silently discard any prior query results. + */ +static inline PGresult * +libpqsrv_exec(PGconn *conn, const char *query, uint32 wait_event_info) +{ + if (!PQsendQuery(conn, query)) + return NULL; + return libpqsrv_get_result_last(conn, wait_event_info); +} + +/* + * PQexecParams() wrapper that processes interrupts. + * + * See notes at libpqsrv_exec(). + */ +static inline PGresult * +libpqsrv_exec_params(PGconn *conn, + const char *command, + int nParams, + const Oid *paramTypes, + const char *const *paramValues, + const int *paramLengths, + const int *paramFormats, + int resultFormat, + uint32 wait_event_info) +{ + if (!PQsendQueryParams(conn, command, nParams, paramTypes, paramValues, + paramLengths, paramFormats, resultFormat)) + return NULL; + return libpqsrv_get_result_last(conn, wait_event_info); +} + +/* + * Like PQexec(), loop over PQgetResult() until it returns NULL or another + * terminal state. Return the last non-NULL result or the terminal state. + */ +static inline PGresult * +libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info) +{ + PGresult *volatile lastResult = NULL; + + /* In what follows, do not leak any PGresults on an error. */ + PG_TRY(); + { + for (;;) + { + /* Wait for, and collect, the next PGresult. */ + PGresult *result; + + result = libpqsrv_get_result(conn, wait_event_info); + if (result == NULL) + break; /* query is complete, or failure */ + + /* + * Emulate PQexec()'s behavior of returning the last result when + * there are many. + */ + PQclear(lastResult); + lastResult = result; + + if (PQresultStatus(lastResult) == PGRES_COPY_IN || + PQresultStatus(lastResult) == PGRES_COPY_OUT || + PQresultStatus(lastResult) == PGRES_COPY_BOTH || + PQstatus(conn) == CONNECTION_BAD) + break; + } + } + PG_CATCH(); + { + PQclear(lastResult); + PG_RE_THROW(); + } + PG_END_TRY(); + + return lastResult; +} + +/* + * Perform the equivalent of PQgetResult(), but watch for interrupts. + */ +static inline PGresult * +libpqsrv_get_result(PGconn *conn, uint32 wait_event_info) +{ + /* + * Collect data until PQgetResult is ready to get the result without + * blocking. + */ + while (PQisBusy(conn)) + { + int rc; + + rc = WaitLatchOrSocket(MyLatch, + WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | + WL_SOCKET_READABLE, + PQsocket(conn), + 0, + wait_event_info); + + /* Interrupted? */ + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + + /* Consume whatever data is available from the socket */ + if (PQconsumeInput(conn) == 0) + { + /* trouble; expect PQgetResult() to return NULL */ + break; + } + } + + /* Now we can collect and return the next PGresult */ + return PQgetResult(conn); +} + #endif /* LIBPQ_BE_FE_HELPERS_H */ |