aboutsummaryrefslogtreecommitdiff
path: root/contrib/postgres_fdw/connection.c
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/postgres_fdw/connection.c')
-rw-r--r--contrib/postgres_fdw/connection.c357
1 files changed, 138 insertions, 219 deletions
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index caf14462696..e8148f2c5a2 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -58,7 +58,6 @@ typedef struct ConnCacheEntry
/* Remaining fields are invalid when conn is NULL: */
int xact_depth; /* 0 = no xact open, 1 = main xact open, 2 =
* one level of subxact open, etc */
- bool xact_read_only; /* xact r/o state */
bool have_prep_stmt; /* have we prepared any stmts in this xact? */
bool have_error; /* have any subxacts aborted in this xact? */
bool changing_xact_state; /* xact state change in process */
@@ -85,12 +84,6 @@ static unsigned int prep_stmt_number = 0;
/* tracks whether any work is needed in callback functions */
static bool xact_got_connection = false;
-/*
- * tracks the nesting level of the topmost read-only transaction determined
- * by GetTopReadOnlyTransactionNestLevel()
- */
-static int top_read_only_level = 0;
-
/* custom wait event values, retrieved from shared memory */
static uint32 pgfdw_we_cleanup_result = 0;
static uint32 pgfdw_we_connect = 0;
@@ -149,6 +142,8 @@ static void do_sql_command_begin(PGconn *conn, const char *sql);
static void do_sql_command_end(PGconn *conn, const char *sql,
bool consume_input);
static void begin_remote_xact(ConnCacheEntry *entry);
+static void pgfdw_report_internal(int elevel, PGresult *res, PGconn *conn,
+ const char *sql);
static void pgfdw_xact_callback(XactEvent event, void *arg);
static void pgfdw_subxact_callback(SubXactEvent event,
SubTransactionId mySubid,
@@ -379,7 +374,6 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user)
/* Reset all transient state fields, to be sure all are clean */
entry->xact_depth = 0;
- entry->xact_read_only = false;
entry->have_prep_stmt = false;
entry->have_error = false;
entry->changing_xact_state = false;
@@ -633,6 +627,9 @@ connect_pg_server(ForeignServer *server, UserMapping *user)
server->servername),
errdetail_internal("%s", pchomp(PQerrorMessage(conn)))));
+ PQsetNoticeReceiver(conn, libpqsrv_notice_receiver,
+ "received message via remote connection");
+
/* Perform post-connection security checks. */
pgfdw_security_check(keywords, values, user, conn);
@@ -820,7 +817,7 @@ static void
do_sql_command_begin(PGconn *conn, const char *sql)
{
if (!PQsendQuery(conn, sql))
- pgfdw_report_error(ERROR, NULL, conn, false, sql);
+ pgfdw_report_error(NULL, conn, sql);
}
static void
@@ -835,10 +832,10 @@ do_sql_command_end(PGconn *conn, const char *sql, bool consume_input)
* would be large compared to the overhead of PQconsumeInput.)
*/
if (consume_input && !PQconsumeInput(conn))
- pgfdw_report_error(ERROR, NULL, conn, false, sql);
+ pgfdw_report_error(NULL, conn, sql);
res = pgfdw_get_result(conn);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
- pgfdw_report_error(ERROR, res, conn, true, sql);
+ pgfdw_report_error(res, conn, sql);
PQclear(res);
}
@@ -851,81 +848,29 @@ do_sql_command_end(PGconn *conn, const char *sql, bool consume_input)
* those scans. A disadvantage is that we can't provide sane emulation of
* READ COMMITTED behavior --- it would be nice if we had some other way to
* control which remote queries share a snapshot.
- *
- * Note also that we always start the remote transaction with the same
- * read/write and deferrable properties as the local transaction, and start
- * the remote subtransaction with the same read/write property as the local
- * subtransaction.
*/
static void
begin_remote_xact(ConnCacheEntry *entry)
{
int curlevel = GetCurrentTransactionNestLevel();
- /*
- * Set the nesting level of the topmost read-only transaction if the
- * current transaction is read-only and we haven't yet. Once it's set,
- * it's retained until that transaction is committed/aborted, and then
- * reset (see pgfdw_xact_callback and pgfdw_subxact_callback).
- */
- if (XactReadOnly)
- {
- if (top_read_only_level == 0)
- top_read_only_level = GetTopReadOnlyTransactionNestLevel();
- Assert(top_read_only_level > 0);
- }
- else
- Assert(top_read_only_level == 0);
-
- /*
- * Start main transaction if we haven't yet; otherwise, change the
- * already-started remote transaction/subtransaction to read-only if the
- * local transaction/subtransaction have been done so after starting them
- * and we haven't yet.
- */
+ /* Start main transaction if we haven't yet */
if (entry->xact_depth <= 0)
{
- StringInfoData sql;
- bool ro = (top_read_only_level == 1);
+ const char *sql;
elog(DEBUG3, "starting remote transaction on connection %p",
entry->conn);
- initStringInfo(&sql);
- appendStringInfoString(&sql, "START TRANSACTION ISOLATION LEVEL ");
if (IsolationIsSerializable())
- appendStringInfoString(&sql, "SERIALIZABLE");
+ sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
else
- appendStringInfoString(&sql, "REPEATABLE READ");
- if (ro)
- appendStringInfoString(&sql, " READ ONLY");
- if (XactDeferrable)
- appendStringInfoString(&sql, " DEFERRABLE");
+ sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
entry->changing_xact_state = true;
- do_sql_command(entry->conn, sql.data);
+ do_sql_command(entry->conn, sql);
entry->xact_depth = 1;
- if (ro)
- {
- Assert(!entry->xact_read_only);
- entry->xact_read_only = true;
- }
entry->changing_xact_state = false;
}
- else if (!entry->xact_read_only)
- {
- Assert(top_read_only_level == 0 ||
- entry->xact_depth <= top_read_only_level);
- if (entry->xact_depth == top_read_only_level)
- {
- entry->changing_xact_state = true;
- do_sql_command(entry->conn, "SET transaction_read_only = on");
- entry->xact_read_only = true;
- entry->changing_xact_state = false;
- }
- }
- else
- Assert(top_read_only_level > 0 &&
- entry->xact_depth >= top_read_only_level);
/*
* If we're in a subtransaction, stack up savepoints to match our level.
@@ -934,21 +879,12 @@ begin_remote_xact(ConnCacheEntry *entry)
*/
while (entry->xact_depth < curlevel)
{
- StringInfoData sql;
- bool ro = (entry->xact_depth + 1 == top_read_only_level);
+ char sql[64];
- initStringInfo(&sql);
- appendStringInfo(&sql, "SAVEPOINT s%d", entry->xact_depth + 1);
- if (ro)
- appendStringInfoString(&sql, "; SET transaction_read_only = on");
+ snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1);
entry->changing_xact_state = true;
- do_sql_command(entry->conn, sql.data);
+ do_sql_command(entry->conn, sql);
entry->xact_depth++;
- if (ro)
- {
- Assert(!entry->xact_read_only);
- entry->xact_read_only = true;
- }
entry->changing_xact_state = false;
}
}
@@ -1032,63 +968,73 @@ pgfdw_get_result(PGconn *conn)
/*
* Report an error we got from the remote server.
*
- * elevel: error level to use (typically ERROR, but might be less)
- * res: PGresult containing the error
+ * Callers should use pgfdw_report_error() to throw an error, or use
+ * pgfdw_report() for lesser message levels. (We make this distinction
+ * so that pgfdw_report_error() can be marked noreturn.)
+ *
+ * res: PGresult containing the error (might be NULL)
* conn: connection we did the query on
- * clear: if true, PQclear the result (otherwise caller will handle it)
* sql: NULL, or text of remote command we tried to execute
*
+ * If "res" is not NULL, it'll be PQclear'ed here (unless we throw error,
+ * in which case memory context cleanup will clear it eventually).
+ *
* Note: callers that choose not to throw ERROR for a remote error are
* responsible for making sure that the associated ConnCacheEntry gets
* marked with have_error = true.
*/
void
-pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
- bool clear, const char *sql)
+pgfdw_report_error(PGresult *res, PGconn *conn, const char *sql)
{
- /* If requested, PGresult must be released before leaving this function. */
- PG_TRY();
- {
- char *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
- char *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
- char *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
- char *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
- char *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
- int sqlstate;
-
- if (diag_sqlstate)
- sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
- diag_sqlstate[1],
- diag_sqlstate[2],
- diag_sqlstate[3],
- diag_sqlstate[4]);
- else
- sqlstate = ERRCODE_CONNECTION_FAILURE;
+ pgfdw_report_internal(ERROR, res, conn, sql);
+ pg_unreachable();
+}
- /*
- * If we don't get a message from the PGresult, try the PGconn. This
- * is needed because for connection-level failures, PQgetResult may
- * just return NULL, not a PGresult at all.
- */
- if (message_primary == NULL)
- message_primary = pchomp(PQerrorMessage(conn));
-
- ereport(elevel,
- (errcode(sqlstate),
- (message_primary != NULL && message_primary[0] != '\0') ?
- errmsg_internal("%s", message_primary) :
- errmsg("could not obtain message string for remote error"),
- message_detail ? errdetail_internal("%s", message_detail) : 0,
- message_hint ? errhint("%s", message_hint) : 0,
- message_context ? errcontext("%s", message_context) : 0,
- sql ? errcontext("remote SQL command: %s", sql) : 0));
- }
- PG_FINALLY();
- {
- if (clear)
- PQclear(res);
- }
- PG_END_TRY();
+void
+pgfdw_report(int elevel, PGresult *res, PGconn *conn, const char *sql)
+{
+ Assert(elevel < ERROR); /* use pgfdw_report_error for that */
+ pgfdw_report_internal(elevel, res, conn, sql);
+}
+
+static void
+pgfdw_report_internal(int elevel, PGresult *res, PGconn *conn,
+ const char *sql)
+{
+ char *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
+ char *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
+ char *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
+ char *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
+ char *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
+ int sqlstate;
+
+ if (diag_sqlstate)
+ sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
+ diag_sqlstate[1],
+ diag_sqlstate[2],
+ diag_sqlstate[3],
+ diag_sqlstate[4]);
+ else
+ sqlstate = ERRCODE_CONNECTION_FAILURE;
+
+ /*
+ * If we don't get a message from the PGresult, try the PGconn. This is
+ * needed because for connection-level failures, PQgetResult may just
+ * return NULL, not a PGresult at all.
+ */
+ if (message_primary == NULL)
+ message_primary = pchomp(PQerrorMessage(conn));
+
+ ereport(elevel,
+ (errcode(sqlstate),
+ (message_primary != NULL && message_primary[0] != '\0') ?
+ errmsg_internal("%s", message_primary) :
+ errmsg("could not obtain message string for remote error"),
+ message_detail ? errdetail_internal("%s", message_detail) : 0,
+ message_hint ? errhint("%s", message_hint) : 0,
+ message_context ? errcontext("%s", message_context) : 0,
+ sql ? errcontext("remote SQL command: %s", sql) : 0));
+ PQclear(res);
}
/*
@@ -1243,9 +1189,6 @@ pgfdw_xact_callback(XactEvent event, void *arg)
/* Also reset cursor numbering for next transaction */
cursor_number = 0;
-
- /* Likewise for top_read_only_level */
- top_read_only_level = 0;
}
/*
@@ -1344,10 +1287,6 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
false);
}
}
-
- /* If in the topmost read-only transaction, reset top_read_only_level */
- if (curlevel == top_read_only_level)
- top_read_only_level = 0;
}
/*
@@ -1450,9 +1389,6 @@ pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel)
/* Reset state to show we're out of a transaction */
entry->xact_depth = 0;
- /* Reset xact r/o state */
- entry->xact_read_only = false;
-
/*
* If the connection isn't in a good idle state, it is marked as
* invalid or keep_connections option of its server is disabled, then
@@ -1473,10 +1409,6 @@ pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel)
{
/* Reset state to show we're out of a subtransaction */
entry->xact_depth--;
-
- /* If in the topmost read-only transaction, reset xact r/o state */
- if (entry->xact_depth + 1 == top_read_only_level)
- entry->xact_read_only = false;
}
}
@@ -1625,7 +1557,7 @@ pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query)
*/
if (!PQsendQuery(conn, query))
{
- pgfdw_report_error(WARNING, NULL, conn, false, query);
+ pgfdw_report(WARNING, NULL, conn, query);
return false;
}
@@ -1650,7 +1582,7 @@ pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
*/
if (consume_input && !PQconsumeInput(conn))
{
- pgfdw_report_error(WARNING, NULL, conn, false, query);
+ pgfdw_report(WARNING, NULL, conn, query);
return false;
}
@@ -1662,7 +1594,7 @@ pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
(errmsg("could not get query result due to timeout"),
errcontext("remote SQL command: %s", query)));
else
- pgfdw_report_error(WARNING, NULL, conn, false, query);
+ pgfdw_report(WARNING, NULL, conn, query);
return false;
}
@@ -1670,7 +1602,7 @@ pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
/* Issue a warning if not successful. */
if (PQresultStatus(result) != PGRES_COMMAND_OK)
{
- pgfdw_report_error(WARNING, result, conn, true, query);
+ pgfdw_report(WARNING, result, conn, query);
return ignore_errors;
}
PQclear(result);
@@ -1698,103 +1630,90 @@ pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
PGresult **result,
bool *timed_out)
{
- volatile bool failed = false;
- PGresult *volatile last_res = NULL;
+ bool failed = false;
+ PGresult *last_res = NULL;
+ int canceldelta = RETRY_CANCEL_TIMEOUT * 2;
*result = NULL;
*timed_out = false;
-
- /* In what follows, do not leak any PGresults on an error. */
- PG_TRY();
+ for (;;)
{
- int canceldelta = RETRY_CANCEL_TIMEOUT * 2;
+ PGresult *res;
- for (;;)
+ while (PQisBusy(conn))
{
- PGresult *res;
+ int wc;
+ TimestampTz now = GetCurrentTimestamp();
+ long cur_timeout;
- while (PQisBusy(conn))
+ /* If timeout has expired, give up. */
+ if (now >= endtime)
{
- int wc;
- TimestampTz now = GetCurrentTimestamp();
- long cur_timeout;
-
- /* If timeout has expired, give up. */
- if (now >= endtime)
- {
- *timed_out = true;
- failed = true;
- goto exit;
- }
+ *timed_out = true;
+ failed = true;
+ goto exit;
+ }
- /* If we need to re-issue the cancel request, do that. */
- if (now >= retrycanceltime)
- {
- /* We ignore failure to issue the repeated request. */
- (void) libpqsrv_cancel(conn, endtime);
+ /* If we need to re-issue the cancel request, do that. */
+ if (now >= retrycanceltime)
+ {
+ /* We ignore failure to issue the repeated request. */
+ (void) libpqsrv_cancel(conn, endtime);
- /* Recompute "now" in case that took measurable time. */
- now = GetCurrentTimestamp();
+ /* Recompute "now" in case that took measurable time. */
+ now = GetCurrentTimestamp();
- /* Adjust re-cancel timeout in increasing steps. */
- retrycanceltime = TimestampTzPlusMilliseconds(now,
- canceldelta);
- canceldelta += canceldelta;
- }
+ /* Adjust re-cancel timeout in increasing steps. */
+ retrycanceltime = TimestampTzPlusMilliseconds(now,
+ canceldelta);
+ canceldelta += canceldelta;
+ }
- /* If timeout has expired, give up, else get sleep time. */
- cur_timeout = TimestampDifferenceMilliseconds(now,
- Min(endtime,
- retrycanceltime));
- if (cur_timeout <= 0)
- {
- *timed_out = true;
- failed = true;
- goto exit;
- }
+ /* If timeout has expired, give up, else get sleep time. */
+ cur_timeout = TimestampDifferenceMilliseconds(now,
+ Min(endtime,
+ retrycanceltime));
+ if (cur_timeout <= 0)
+ {
+ *timed_out = true;
+ failed = true;
+ goto exit;
+ }
- /* first time, allocate or get the custom wait event */
- if (pgfdw_we_cleanup_result == 0)
- pgfdw_we_cleanup_result = WaitEventExtensionNew("PostgresFdwCleanupResult");
+ /* first time, allocate or get the custom wait event */
+ if (pgfdw_we_cleanup_result == 0)
+ pgfdw_we_cleanup_result = WaitEventExtensionNew("PostgresFdwCleanupResult");
- /* Sleep until there's something to do */
- wc = WaitLatchOrSocket(MyLatch,
- WL_LATCH_SET | WL_SOCKET_READABLE |
- WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
- PQsocket(conn),
- cur_timeout, pgfdw_we_cleanup_result);
- ResetLatch(MyLatch);
+ /* Sleep until there's something to do */
+ wc = WaitLatchOrSocket(MyLatch,
+ WL_LATCH_SET | WL_SOCKET_READABLE |
+ WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+ PQsocket(conn),
+ cur_timeout, pgfdw_we_cleanup_result);
+ ResetLatch(MyLatch);
- CHECK_FOR_INTERRUPTS();
+ CHECK_FOR_INTERRUPTS();
- /* Data available in socket? */
- if (wc & WL_SOCKET_READABLE)
+ /* Data available in socket? */
+ if (wc & WL_SOCKET_READABLE)
+ {
+ if (!PQconsumeInput(conn))
{
- if (!PQconsumeInput(conn))
- {
- /* connection trouble */
- failed = true;
- goto exit;
- }
+ /* connection trouble */
+ failed = true;
+ goto exit;
}
}
+ }
- res = PQgetResult(conn);
- if (res == NULL)
- break; /* query is complete */
+ res = PQgetResult(conn);
+ if (res == NULL)
+ break; /* query is complete */
- PQclear(last_res);
- last_res = res;
- }
-exit: ;
- }
- PG_CATCH();
- {
PQclear(last_res);
- PG_RE_THROW();
+ last_res = res;
}
- PG_END_TRY();
-
+exit:
if (failed)
PQclear(last_res);
else