aboutsummaryrefslogtreecommitdiff
path: root/contrib/postgres_fdw/postgres_fdw.c
diff options
context:
space:
mode:
authorRobert Haas <rhaas@postgresql.org>2017-05-06 22:19:56 -0400
committerRobert Haas <rhaas@postgresql.org>2017-05-06 22:19:56 -0400
commitf14bf0a8fdd5a9fb2c5be692e0c9003185b88fa3 (patch)
tree926eee73d8425c5c0a0bc5a2e7871fde60d85144 /contrib/postgres_fdw/postgres_fdw.c
parent8c681454dca0cf6a4dc8b48ca900851c046c4592 (diff)
downloadpostgresql-f14bf0a8fdd5a9fb2c5be692e0c9003185b88fa3.tar.gz
postgresql-f14bf0a8fdd5a9fb2c5be692e0c9003185b88fa3.zip
Allow queries submitted by postgres_fdw to be canceled.
Back-patch of commits f039eaac7131ef2a4cf63a10cf98486f8bcd09d2 and 1b812afb0eafe125b820cc3b95e7ca03821aa675, which arranged (in 9.6+) to make remote queries interruptible. It was known at the time that the same problem existed in the back-branches, but I did not back-patch for lack of a user complaint. Michael Paquier and Etsuro Fujita, adjusted for older branches by me. Per gripe from Suraj Kharage. This doesn't directly addresss Suraj's gripe, but since the patch that will do so builds up on top of this work, it seems best to back-patch this part first. Discussion: http://postgr.es/m/CAF1DzPU8Kx+fMXEbFoP289xtm3bz3t+ZfxhmKavr98Bh-C0TqQ@mail.gmail.com
Diffstat (limited to 'contrib/postgres_fdw/postgres_fdw.c')
-rw-r--r--contrib/postgres_fdw/postgres_fdw.c109
1 files changed, 69 insertions, 40 deletions
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 7dd43a99379..5a4a47dbd2b 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -1064,7 +1064,7 @@ postgresReScanForeignScan(ForeignScanState *node)
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = PQexec(fsstate->conn, sql);
+ res = pgfdw_exec_query(fsstate->conn, sql);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
PQclear(res);
@@ -1386,18 +1386,24 @@ postgresExecForeignInsert(EState *estate,
p_values = convert_prep_stmt_params(fmstate, NULL, slot);
/*
- * Execute the prepared statement, and check for success.
+ * Execute the prepared statement.
+ */
+ if (!PQsendQueryPrepared(fmstate->conn,
+ fmstate->p_name,
+ fmstate->p_nums,
+ p_values,
+ NULL,
+ NULL,
+ 0))
+ pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+
+ /*
+ * Get the result, and check for success.
*
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = PQexecPrepared(fmstate->conn,
- fmstate->p_name,
- fmstate->p_nums,
- p_values,
- NULL,
- NULL,
- 0);
+ res = pgfdw_get_result(fmstate->conn, fmstate->query);
if (PQresultStatus(res) !=
(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -1456,18 +1462,24 @@ postgresExecForeignUpdate(EState *estate,
slot);
/*
- * Execute the prepared statement, and check for success.
+ * Execute the prepared statement.
+ */
+ if (!PQsendQueryPrepared(fmstate->conn,
+ fmstate->p_name,
+ fmstate->p_nums,
+ p_values,
+ NULL,
+ NULL,
+ 0))
+ pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+
+ /*
+ * Get the result, and check for success.
*
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = PQexecPrepared(fmstate->conn,
- fmstate->p_name,
- fmstate->p_nums,
- p_values,
- NULL,
- NULL,
- 0);
+ res = pgfdw_get_result(fmstate->conn, fmstate->query);
if (PQresultStatus(res) !=
(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -1526,18 +1538,24 @@ postgresExecForeignDelete(EState *estate,
NULL);
/*
- * Execute the prepared statement, and check for success.
+ * Execute the prepared statement.
+ */
+ if (!PQsendQueryPrepared(fmstate->conn,
+ fmstate->p_name,
+ fmstate->p_nums,
+ p_values,
+ NULL,
+ NULL,
+ 0))
+ pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+
+ /*
+ * Get the result, and check for success.
*
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = PQexecPrepared(fmstate->conn,
- fmstate->p_name,
- fmstate->p_nums,
- p_values,
- NULL,
- NULL,
- 0);
+ res = pgfdw_get_result(fmstate->conn, fmstate->query);
if (PQresultStatus(res) !=
(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -1587,7 +1605,7 @@ postgresEndForeignModify(EState *estate,
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = PQexec(fmstate->conn, sql);
+ res = pgfdw_exec_query(fmstate->conn, sql);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
PQclear(res);
@@ -1845,7 +1863,7 @@ get_remote_estimate(const char *sql, PGconn *conn,
/*
* Execute EXPLAIN remotely.
*/
- res = PQexec(conn, sql);
+ res = pgfdw_exec_query(conn, sql);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, sql);
@@ -1974,12 +1992,18 @@ create_cursor(ForeignScanState *node)
* parameter (see deparse.c), the "inference" is trivial and will produce
* the desired result. This allows us to avoid assuming that the remote
* server has the same OIDs we do for the parameters' types.
+ */
+ if (!PQsendQueryParams(conn, buf.data, numParams,
+ NULL, values, NULL, NULL, 0))
+ pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
+
+ /*
+ * Get the result, and check for success.
*
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = PQexecParams(conn, buf.data, numParams, NULL, values,
- NULL, NULL, 0);
+ res = pgfdw_get_result(conn, buf.data);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
PQclear(res);
@@ -2029,7 +2053,7 @@ fetch_more_data(ForeignScanState *node)
snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
fetch_size, fsstate->cursor_number);
- res = PQexec(conn, sql);
+ res = pgfdw_exec_query(conn, sql);
/* On error, report the original query, not the FETCH. */
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
@@ -2136,7 +2160,7 @@ close_cursor(PGconn *conn, unsigned int cursor_number)
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = PQexec(conn, sql);
+ res = pgfdw_exec_query(conn, sql);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, conn, true, sql);
PQclear(res);
@@ -2164,16 +2188,21 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
* with the remote server using different type OIDs than we do. All of
* the prepared statements we use in this module are simple enough that
* the remote server will make the right choices.
+ */
+ if (!PQsendPrepare(fmstate->conn,
+ p_name,
+ fmstate->query,
+ 0,
+ NULL))
+ pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+
+ /*
+ * Get the result, and check for success.
*
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = PQprepare(fmstate->conn,
- p_name,
- fmstate->query,
- 0,
- NULL);
-
+ res = pgfdw_get_result(fmstate->conn, fmstate->query);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
PQclear(res);
@@ -2323,7 +2352,7 @@ postgresAnalyzeForeignTable(Relation relation,
/* In what follows, do not risk leaking any PGresults. */
PG_TRY();
{
- res = PQexec(conn, sql.data);
+ res = pgfdw_exec_query(conn, sql.data);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, sql.data);
@@ -2417,7 +2446,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
/* In what follows, do not risk leaking any PGresults. */
PG_TRY();
{
- res = PQexec(conn, sql.data);
+ res = pgfdw_exec_query(conn, sql.data);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, conn, false, sql.data);
PQclear(res);
@@ -2447,7 +2476,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
fetch_size, cursor_number);
- res = PQexec(conn, fetch_sql);
+ res = pgfdw_exec_query(conn, fetch_sql);
/* On error, report the original query, not the FETCH. */
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, sql.data);