aboutsummaryrefslogtreecommitdiff
path: root/contrib/postgres_fdw/connection.c
diff options
context:
space:
mode:
authorEtsuro Fujita <efujita@postgresql.org>2023-04-06 17:30:00 +0900
committerEtsuro Fujita <efujita@postgresql.org>2023-04-06 17:30:00 +0900
commit983ec23007bd83a649af9bc823f13feb0da27e0e (patch)
treed16c129d9a721b8ca9001390dc67bd58d072f451 /contrib/postgres_fdw/connection.c
parentb9b125b9c14381c4d04a446e335bb2da5f602596 (diff)
downloadpostgresql-983ec23007bd83a649af9bc823f13feb0da27e0e.tar.gz
postgresql-983ec23007bd83a649af9bc823f13feb0da27e0e.zip
postgres_fdw: Add support for parallel abort.
postgres_fdw aborts remote (sub)transactions opened on remote server(s) in a local (sub)transaction one by one when the local (sub)transaction aborts. This patch allows it to abort the remote (sub)transactions in parallel to improve performance. This is enabled by the server option "parallel_abort". The default is false. Etsuro Fujita, reviewed by David Zhang. Discussion: http://postgr.es/m/CAPmGK15FuPVGx3TGHKShsbPKKtF1y58-ZLcKoxfN-nqLj1dZ%3Dg%40mail.gmail.com
Diffstat (limited to 'contrib/postgres_fdw/connection.c')
-rw-r--r--contrib/postgres_fdw/connection.c403
1 files changed, 378 insertions, 25 deletions
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 8eb9194506c..2969351e9a9 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -60,6 +60,7 @@ typedef struct ConnCacheEntry
bool have_error; /* have any subxacts aborted in this xact? */
bool changing_xact_state; /* xact state change in process */
bool parallel_commit; /* do we commit (sub)xacts in parallel? */
+ bool parallel_abort; /* do we abort (sub)xacts in parallel? */
bool invalidated; /* true if reconnect is pending */
bool keep_connections; /* setting value of keep_connections
* server option */
@@ -82,6 +83,25 @@ static unsigned int prep_stmt_number = 0;
static bool xact_got_connection = false;
/*
+ * Milliseconds to wait to cancel an in-progress query or execute a cleanup
+ * query; if it takes longer than 30 seconds to do these, we assume the
+ * connection is dead.
+ */
+#define CONNECTION_CLEANUP_TIMEOUT 30000
+
+/* Macro for constructing abort command to be sent */
+#define CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel) \
+ do { \
+ if (toplevel) \
+ snprintf((sql), sizeof(sql), \
+ "ABORT TRANSACTION"); \
+ else \
+ snprintf((sql), sizeof(sql), \
+ "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d", \
+ (entry)->xact_depth, (entry)->xact_depth); \
+ } while(0)
+
+/*
* SQL functions
*/
PG_FUNCTION_INFO_V1(postgres_fdw_get_connections);
@@ -107,14 +127,28 @@ static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel);
static bool pgfdw_cancel_query(PGconn *conn);
+static bool pgfdw_cancel_query_begin(PGconn *conn);
+static bool pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime,
+ bool consume_input);
static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
bool ignore_errors);
+static bool pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query);
+static bool pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
+ TimestampTz endtime,
+ bool consume_input,
+ bool ignore_errors);
static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
PGresult **result, bool *timed_out);
static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel);
+static bool pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
+ List **pending_entries,
+ List **cancel_requested);
static void pgfdw_finish_pre_commit_cleanup(List *pending_entries);
static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries,
int curlevel);
+static void pgfdw_finish_abort_cleanup(List *pending_entries,
+ List *cancel_requested,
+ bool toplevel);
static bool UserMappingPasswordRequired(UserMapping *user);
static bool disconnect_cached_connections(Oid serverid);
@@ -320,8 +354,8 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user)
*
* By default, all the connections to any foreign servers are kept open.
*
- * Also determine whether to commit (sub)transactions opened on the remote
- * server in parallel at (sub)transaction end, which is disabled by
+ * Also determine whether to commit/abort (sub)transactions opened on the
+ * remote server in parallel at (sub)transaction end, which is disabled by
* default.
*
* Note: it's enough to determine these only when making a new connection
@@ -330,6 +364,7 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user)
*/
entry->keep_connections = true;
entry->parallel_commit = false;
+ entry->parallel_abort = false;
foreach(lc, server->options)
{
DefElem *def = (DefElem *) lfirst(lc);
@@ -338,6 +373,8 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user)
entry->keep_connections = defGetBoolean(def);
else if (strcmp(def->defname, "parallel_commit") == 0)
entry->parallel_commit = defGetBoolean(def);
+ else if (strcmp(def->defname, "parallel_abort") == 0)
+ entry->parallel_abort = defGetBoolean(def);
}
/* Now try to make the connection */
@@ -892,6 +929,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
HASH_SEQ_STATUS scan;
ConnCacheEntry *entry;
List *pending_entries = NIL;
+ List *cancel_requested = NIL;
/* Quick exit if no connections were touched in this transaction. */
if (!xact_got_connection)
@@ -985,7 +1023,15 @@ pgfdw_xact_callback(XactEvent event, void *arg)
case XACT_EVENT_PARALLEL_ABORT:
case XACT_EVENT_ABORT:
/* Rollback all remote transactions during abort */
- pgfdw_abort_cleanup(entry, true);
+ if (entry->parallel_abort)
+ {
+ if (pgfdw_abort_cleanup_begin(entry, true,
+ &pending_entries,
+ &cancel_requested))
+ continue;
+ }
+ else
+ pgfdw_abort_cleanup(entry, true);
break;
}
}
@@ -995,11 +1041,21 @@ pgfdw_xact_callback(XactEvent event, void *arg)
}
/* If there are any pending connections, finish cleaning them up */
- if (pending_entries)
+ if (pending_entries || cancel_requested)
{
- Assert(event == XACT_EVENT_PARALLEL_PRE_COMMIT ||
- event == XACT_EVENT_PRE_COMMIT);
- pgfdw_finish_pre_commit_cleanup(pending_entries);
+ if (event == XACT_EVENT_PARALLEL_PRE_COMMIT ||
+ event == XACT_EVENT_PRE_COMMIT)
+ {
+ Assert(cancel_requested == NIL);
+ pgfdw_finish_pre_commit_cleanup(pending_entries);
+ }
+ else
+ {
+ Assert(event == XACT_EVENT_PARALLEL_ABORT ||
+ event == XACT_EVENT_ABORT);
+ pgfdw_finish_abort_cleanup(pending_entries, cancel_requested,
+ true);
+ }
}
/*
@@ -1024,6 +1080,7 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
ConnCacheEntry *entry;
int curlevel;
List *pending_entries = NIL;
+ List *cancel_requested = NIL;
/* Nothing to do at subxact start, nor after commit. */
if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
@@ -1078,7 +1135,15 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
else
{
/* Rollback all remote subtransactions during abort */
- pgfdw_abort_cleanup(entry, false);
+ if (entry->parallel_abort)
+ {
+ if (pgfdw_abort_cleanup_begin(entry, false,
+ &pending_entries,
+ &cancel_requested))
+ continue;
+ }
+ else
+ pgfdw_abort_cleanup(entry, false);
}
/* OK, we're outta that level of subtransaction */
@@ -1086,10 +1151,19 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
}
/* If there are any pending connections, finish cleaning them up */
- if (pending_entries)
+ if (pending_entries || cancel_requested)
{
- Assert(event == SUBXACT_EVENT_PRE_COMMIT_SUB);
- pgfdw_finish_pre_subcommit_cleanup(pending_entries, curlevel);
+ if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
+ {
+ Assert(cancel_requested == NIL);
+ pgfdw_finish_pre_subcommit_cleanup(pending_entries, curlevel);
+ }
+ else
+ {
+ Assert(event == SUBXACT_EVENT_ABORT_SUB);
+ pgfdw_finish_abort_cleanup(pending_entries, cancel_requested,
+ false);
+ }
}
}
@@ -1233,17 +1307,25 @@ pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel)
static bool
pgfdw_cancel_query(PGconn *conn)
{
- PGcancel *cancel;
- char errbuf[256];
- PGresult *result = NULL;
TimestampTz endtime;
- bool timed_out;
/*
* If it takes too long to cancel the query and discard the result, assume
* the connection is dead.
*/
- endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000);
+ endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
+ CONNECTION_CLEANUP_TIMEOUT);
+
+ if (!pgfdw_cancel_query_begin(conn))
+ return false;
+ return pgfdw_cancel_query_end(conn, endtime, false);
+}
+
+static bool
+pgfdw_cancel_query_begin(PGconn *conn)
+{
+ PGcancel *cancel;
+ char errbuf[256];
/*
* Issue cancel request. Unfortunately, there's no good way to limit the
@@ -1263,6 +1345,30 @@ pgfdw_cancel_query(PGconn *conn)
PQfreeCancel(cancel);
}
+ return true;
+}
+
+static bool
+pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime, bool consume_input)
+{
+ PGresult *result = NULL;
+ bool timed_out;
+
+ /*
+ * If requested, consume whatever data is available from the socket. (Note
+ * that if all data is available, this allows pgfdw_get_cleanup_result to
+ * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
+ * which would be large compared to the overhead of PQconsumeInput.)
+ */
+ if (consume_input && !PQconsumeInput(conn))
+ {
+ ereport(WARNING,
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not get result of cancel request: %s",
+ pchomp(PQerrorMessage(conn)))));
+ return false;
+ }
+
/* Get and discard the result of the query. */
if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
{
@@ -1297,9 +1403,7 @@ pgfdw_cancel_query(PGconn *conn)
static bool
pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
{
- PGresult *result = NULL;
TimestampTz endtime;
- bool timed_out;
/*
* If it takes too long to execute a cleanup query, assume the connection
@@ -1307,8 +1411,18 @@ pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
* place (e.g. statement timeout, user cancel), so the timeout shouldn't
* be too long.
*/
- endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000);
+ endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
+ CONNECTION_CLEANUP_TIMEOUT);
+
+ if (!pgfdw_exec_cleanup_query_begin(conn, query))
+ return false;
+ return pgfdw_exec_cleanup_query_end(conn, query, endtime,
+ false, ignore_errors);
+}
+static bool
+pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query)
+{
/*
* Submit a query. Since we don't use non-blocking mode, this also can
* block. But its risk is relatively small, so we ignore that for now.
@@ -1319,6 +1433,29 @@ pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
return false;
}
+ return true;
+}
+
+static bool
+pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
+ TimestampTz endtime, bool consume_input,
+ bool ignore_errors)
+{
+ PGresult *result = NULL;
+ bool timed_out;
+
+ /*
+ * If requested, consume whatever data is available from the socket. (Note
+ * that if all data is available, this allows pgfdw_get_cleanup_result to
+ * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
+ * which would be large compared to the overhead of PQconsumeInput.)
+ */
+ if (consume_input && !PQconsumeInput(conn))
+ {
+ pgfdw_report_error(WARNING, NULL, conn, false, query);
+ return false;
+ }
+
/* Get the result of the query. */
if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
{
@@ -1474,12 +1611,7 @@ pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel)
!pgfdw_cancel_query(entry->conn))
return; /* Unable to cancel running query */
- if (toplevel)
- snprintf(sql, sizeof(sql), "ABORT TRANSACTION");
- else
- snprintf(sql, sizeof(sql),
- "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
- entry->xact_depth, entry->xact_depth);
+ CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
return; /* Unable to abort remote (sub)transaction */
@@ -1509,6 +1641,65 @@ pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel)
}
/*
+ * Like pgfdw_abort_cleanup, submit an abort command or cancel request, but
+ * don't wait for the result.
+ *
+ * Returns true if the abort command or cancel request is successfully issued,
+ * false otherwise. If the abort command is successfully issued, the given
+ * connection cache entry is appended to *pending_entries. Othewise, if the
+ * cancel request is successfully issued, it is appended to *cancel_requested.
+ */
+static bool
+pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
+ List **pending_entries, List **cancel_requested)
+{
+ /*
+ * Don't try to clean up the connection if we're already in error
+ * recursion trouble.
+ */
+ if (in_error_recursion_trouble())
+ entry->changing_xact_state = true;
+
+ /*
+ * If connection is already unsalvageable, don't touch it further.
+ */
+ if (entry->changing_xact_state)
+ return false;
+
+ /*
+ * Mark this connection as in the process of changing transaction state.
+ */
+ entry->changing_xact_state = true;
+
+ /* Assume we might have lost track of prepared statements */
+ entry->have_error = true;
+
+ /*
+ * If a command has been submitted to the remote server by using an
+ * asynchronous execution function, the command might not have yet
+ * completed. Check to see if a command is still being processed by the
+ * remote server, and if so, request cancellation of the command.
+ */
+ if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
+ {
+ if (!pgfdw_cancel_query_begin(entry->conn))
+ return false; /* Unable to cancel running query */
+ *cancel_requested = lappend(*cancel_requested, entry);
+ }
+ else
+ {
+ char sql[100];
+
+ CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
+ if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
+ return false; /* Unable to abort remote transaction */
+ *pending_entries = lappend(*pending_entries, entry);
+ }
+
+ return true;
+}
+
+/*
* Finish pre-commit cleanup of connections on each of which we've sent a
* COMMIT command to the remote server.
*/
@@ -1617,6 +1808,168 @@ pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, int curlevel)
}
/*
+ * Finish abort cleanup of connections on each of which we've sent an abort
+ * command or cancel request to the remote server.
+ */
+static void
+pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested,
+ bool toplevel)
+{
+ List *pending_deallocs = NIL;
+ ListCell *lc;
+
+ /*
+ * For each of the pending cancel requests (if any), get and discard the
+ * result of the query, and submit an abort command to the remote server.
+ */
+ if (cancel_requested)
+ {
+ foreach(lc, cancel_requested)
+ {
+ ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
+ TimestampTz endtime;
+ char sql[100];
+
+ Assert(entry->changing_xact_state);
+
+ /*
+ * Set end time. You might think we should do this before issuing
+ * cancel request like in normal mode, but that is problematic,
+ * because if, for example, it took longer than 30 seconds to
+ * process the first few entries in the cancel_requested list, it
+ * would cause a timeout error when processing each of the
+ * remaining entries in the list, leading to slamming that entry's
+ * connection shut.
+ */
+ endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
+ CONNECTION_CLEANUP_TIMEOUT);
+
+ if (!pgfdw_cancel_query_end(entry->conn, endtime, true))
+ {
+ /* Unable to cancel running query */
+ pgfdw_reset_xact_state(entry, toplevel);
+ continue;
+ }
+
+ /* Send an abort command in parallel if needed */
+ CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
+ if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
+ {
+ /* Unable to abort remote (sub)transaction */
+ pgfdw_reset_xact_state(entry, toplevel);
+ }
+ else
+ pending_entries = lappend(pending_entries, entry);
+ }
+ }
+
+ /* No further work if no pending entries */
+ if (!pending_entries)
+ return;
+
+ /*
+ * Get the result of the abort command for each of the pending entries
+ */
+ foreach(lc, pending_entries)
+ {
+ ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
+ TimestampTz endtime;
+ char sql[100];
+
+ Assert(entry->changing_xact_state);
+
+ /*
+ * Set end time. We do this now, not before issuing the command like
+ * in normal mode, for the same reason as for the cancel_requested
+ * entries.
+ */
+ endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
+ CONNECTION_CLEANUP_TIMEOUT);
+
+ CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
+ if (!pgfdw_exec_cleanup_query_end(entry->conn, sql, endtime,
+ true, false))
+ {
+ /* Unable to abort remote (sub)transaction */
+ pgfdw_reset_xact_state(entry, toplevel);
+ continue;
+ }
+
+ if (toplevel)
+ {
+ /* Do a DEALLOCATE ALL in parallel if needed */
+ if (entry->have_prep_stmt && entry->have_error)
+ {
+ if (!pgfdw_exec_cleanup_query_begin(entry->conn,
+ "DEALLOCATE ALL"))
+ {
+ /* Trouble clearing prepared statements */
+ pgfdw_reset_xact_state(entry, toplevel);
+ }
+ else
+ pending_deallocs = lappend(pending_deallocs, entry);
+ continue;
+ }
+ entry->have_prep_stmt = false;
+ entry->have_error = false;
+ }
+
+ /* Reset the per-connection state if needed */
+ if (entry->state.pendingAreq)
+ memset(&entry->state, 0, sizeof(entry->state));
+
+ /* We're done with this entry; unset the changing_xact_state flag */
+ entry->changing_xact_state = false;
+ pgfdw_reset_xact_state(entry, toplevel);
+ }
+
+ /* No further work if no pending entries */
+ if (!pending_deallocs)
+ return;
+ Assert(toplevel);
+
+ /*
+ * Get the result of the DEALLOCATE command for each of the pending
+ * entries
+ */
+ foreach(lc, pending_deallocs)
+ {
+ ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
+ TimestampTz endtime;
+
+ Assert(entry->changing_xact_state);
+ Assert(entry->have_prep_stmt);
+ Assert(entry->have_error);
+
+ /*
+ * Set end time. We do this now, not before issuing the command like
+ * in normal mode, for the same reason as for the cancel_requested
+ * entries.
+ */
+ endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
+ CONNECTION_CLEANUP_TIMEOUT);
+
+ if (!pgfdw_exec_cleanup_query_end(entry->conn, "DEALLOCATE ALL",
+ endtime, true, true))
+ {
+ /* Trouble clearing prepared statements */
+ pgfdw_reset_xact_state(entry, toplevel);
+ continue;
+ }
+ entry->have_prep_stmt = false;
+ entry->have_error = false;
+
+ /* Reset the per-connection state if needed */
+ if (entry->state.pendingAreq)
+ memset(&entry->state, 0, sizeof(entry->state));
+
+ /* We're done with this entry; unset the changing_xact_state flag */
+ entry->changing_xact_state = false;
+ pgfdw_reset_xact_state(entry, toplevel);
+ }
+}
+
+/*
* List active foreign server connections.
*
* This function takes no input parameter and returns setof record made of