diff options
Diffstat (limited to 'contrib/postgres_fdw/connection.c')
-rw-r--r-- | contrib/postgres_fdw/connection.c | 357 |
1 files changed, 138 insertions, 219 deletions
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index caf14462696..e8148f2c5a2 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -58,7 +58,6 @@ typedef struct ConnCacheEntry /* Remaining fields are invalid when conn is NULL: */ int xact_depth; /* 0 = no xact open, 1 = main xact open, 2 = * one level of subxact open, etc */ - bool xact_read_only; /* xact r/o state */ bool have_prep_stmt; /* have we prepared any stmts in this xact? */ bool have_error; /* have any subxacts aborted in this xact? */ bool changing_xact_state; /* xact state change in process */ @@ -85,12 +84,6 @@ static unsigned int prep_stmt_number = 0; /* tracks whether any work is needed in callback functions */ static bool xact_got_connection = false; -/* - * tracks the nesting level of the topmost read-only transaction determined - * by GetTopReadOnlyTransactionNestLevel() - */ -static int top_read_only_level = 0; - /* custom wait event values, retrieved from shared memory */ static uint32 pgfdw_we_cleanup_result = 0; static uint32 pgfdw_we_connect = 0; @@ -149,6 +142,8 @@ static void do_sql_command_begin(PGconn *conn, const char *sql); static void do_sql_command_end(PGconn *conn, const char *sql, bool consume_input); static void begin_remote_xact(ConnCacheEntry *entry); +static void pgfdw_report_internal(int elevel, PGresult *res, PGconn *conn, + const char *sql); static void pgfdw_xact_callback(XactEvent event, void *arg); static void pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, @@ -379,7 +374,6 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user) /* Reset all transient state fields, to be sure all are clean */ entry->xact_depth = 0; - entry->xact_read_only = false; entry->have_prep_stmt = false; entry->have_error = false; entry->changing_xact_state = false; @@ -633,6 +627,9 @@ connect_pg_server(ForeignServer *server, UserMapping *user) server->servername), errdetail_internal("%s", pchomp(PQerrorMessage(conn))))); + PQsetNoticeReceiver(conn, libpqsrv_notice_receiver, + "received message via remote connection"); + /* Perform post-connection security checks. */ pgfdw_security_check(keywords, values, user, conn); @@ -820,7 +817,7 @@ static void do_sql_command_begin(PGconn *conn, const char *sql) { if (!PQsendQuery(conn, sql)) - pgfdw_report_error(ERROR, NULL, conn, false, sql); + pgfdw_report_error(NULL, conn, sql); } static void @@ -835,10 +832,10 @@ do_sql_command_end(PGconn *conn, const char *sql, bool consume_input) * would be large compared to the overhead of PQconsumeInput.) */ if (consume_input && !PQconsumeInput(conn)) - pgfdw_report_error(ERROR, NULL, conn, false, sql); + pgfdw_report_error(NULL, conn, sql); res = pgfdw_get_result(conn); if (PQresultStatus(res) != PGRES_COMMAND_OK) - pgfdw_report_error(ERROR, res, conn, true, sql); + pgfdw_report_error(res, conn, sql); PQclear(res); } @@ -851,81 +848,29 @@ do_sql_command_end(PGconn *conn, const char *sql, bool consume_input) * those scans. A disadvantage is that we can't provide sane emulation of * READ COMMITTED behavior --- it would be nice if we had some other way to * control which remote queries share a snapshot. - * - * Note also that we always start the remote transaction with the same - * read/write and deferrable properties as the local transaction, and start - * the remote subtransaction with the same read/write property as the local - * subtransaction. */ static void begin_remote_xact(ConnCacheEntry *entry) { int curlevel = GetCurrentTransactionNestLevel(); - /* - * Set the nesting level of the topmost read-only transaction if the - * current transaction is read-only and we haven't yet. Once it's set, - * it's retained until that transaction is committed/aborted, and then - * reset (see pgfdw_xact_callback and pgfdw_subxact_callback). - */ - if (XactReadOnly) - { - if (top_read_only_level == 0) - top_read_only_level = GetTopReadOnlyTransactionNestLevel(); - Assert(top_read_only_level > 0); - } - else - Assert(top_read_only_level == 0); - - /* - * Start main transaction if we haven't yet; otherwise, change the - * already-started remote transaction/subtransaction to read-only if the - * local transaction/subtransaction have been done so after starting them - * and we haven't yet. - */ + /* Start main transaction if we haven't yet */ if (entry->xact_depth <= 0) { - StringInfoData sql; - bool ro = (top_read_only_level == 1); + const char *sql; elog(DEBUG3, "starting remote transaction on connection %p", entry->conn); - initStringInfo(&sql); - appendStringInfoString(&sql, "START TRANSACTION ISOLATION LEVEL "); if (IsolationIsSerializable()) - appendStringInfoString(&sql, "SERIALIZABLE"); + sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE"; else - appendStringInfoString(&sql, "REPEATABLE READ"); - if (ro) - appendStringInfoString(&sql, " READ ONLY"); - if (XactDeferrable) - appendStringInfoString(&sql, " DEFERRABLE"); + sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ"; entry->changing_xact_state = true; - do_sql_command(entry->conn, sql.data); + do_sql_command(entry->conn, sql); entry->xact_depth = 1; - if (ro) - { - Assert(!entry->xact_read_only); - entry->xact_read_only = true; - } entry->changing_xact_state = false; } - else if (!entry->xact_read_only) - { - Assert(top_read_only_level == 0 || - entry->xact_depth <= top_read_only_level); - if (entry->xact_depth == top_read_only_level) - { - entry->changing_xact_state = true; - do_sql_command(entry->conn, "SET transaction_read_only = on"); - entry->xact_read_only = true; - entry->changing_xact_state = false; - } - } - else - Assert(top_read_only_level > 0 && - entry->xact_depth >= top_read_only_level); /* * If we're in a subtransaction, stack up savepoints to match our level. @@ -934,21 +879,12 @@ begin_remote_xact(ConnCacheEntry *entry) */ while (entry->xact_depth < curlevel) { - StringInfoData sql; - bool ro = (entry->xact_depth + 1 == top_read_only_level); + char sql[64]; - initStringInfo(&sql); - appendStringInfo(&sql, "SAVEPOINT s%d", entry->xact_depth + 1); - if (ro) - appendStringInfoString(&sql, "; SET transaction_read_only = on"); + snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1); entry->changing_xact_state = true; - do_sql_command(entry->conn, sql.data); + do_sql_command(entry->conn, sql); entry->xact_depth++; - if (ro) - { - Assert(!entry->xact_read_only); - entry->xact_read_only = true; - } entry->changing_xact_state = false; } } @@ -1032,63 +968,73 @@ pgfdw_get_result(PGconn *conn) /* * Report an error we got from the remote server. * - * elevel: error level to use (typically ERROR, but might be less) - * res: PGresult containing the error + * Callers should use pgfdw_report_error() to throw an error, or use + * pgfdw_report() for lesser message levels. (We make this distinction + * so that pgfdw_report_error() can be marked noreturn.) + * + * res: PGresult containing the error (might be NULL) * conn: connection we did the query on - * clear: if true, PQclear the result (otherwise caller will handle it) * sql: NULL, or text of remote command we tried to execute * + * If "res" is not NULL, it'll be PQclear'ed here (unless we throw error, + * in which case memory context cleanup will clear it eventually). + * * Note: callers that choose not to throw ERROR for a remote error are * responsible for making sure that the associated ConnCacheEntry gets * marked with have_error = true. */ void -pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, - bool clear, const char *sql) +pgfdw_report_error(PGresult *res, PGconn *conn, const char *sql) { - /* If requested, PGresult must be released before leaving this function. */ - PG_TRY(); - { - char *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE); - char *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY); - char *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL); - char *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT); - char *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT); - int sqlstate; - - if (diag_sqlstate) - sqlstate = MAKE_SQLSTATE(diag_sqlstate[0], - diag_sqlstate[1], - diag_sqlstate[2], - diag_sqlstate[3], - diag_sqlstate[4]); - else - sqlstate = ERRCODE_CONNECTION_FAILURE; + pgfdw_report_internal(ERROR, res, conn, sql); + pg_unreachable(); +} - /* - * If we don't get a message from the PGresult, try the PGconn. This - * is needed because for connection-level failures, PQgetResult may - * just return NULL, not a PGresult at all. - */ - if (message_primary == NULL) - message_primary = pchomp(PQerrorMessage(conn)); - - ereport(elevel, - (errcode(sqlstate), - (message_primary != NULL && message_primary[0] != '\0') ? - errmsg_internal("%s", message_primary) : - errmsg("could not obtain message string for remote error"), - message_detail ? errdetail_internal("%s", message_detail) : 0, - message_hint ? errhint("%s", message_hint) : 0, - message_context ? errcontext("%s", message_context) : 0, - sql ? errcontext("remote SQL command: %s", sql) : 0)); - } - PG_FINALLY(); - { - if (clear) - PQclear(res); - } - PG_END_TRY(); +void +pgfdw_report(int elevel, PGresult *res, PGconn *conn, const char *sql) +{ + Assert(elevel < ERROR); /* use pgfdw_report_error for that */ + pgfdw_report_internal(elevel, res, conn, sql); +} + +static void +pgfdw_report_internal(int elevel, PGresult *res, PGconn *conn, + const char *sql) +{ + char *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE); + char *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY); + char *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL); + char *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT); + char *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT); + int sqlstate; + + if (diag_sqlstate) + sqlstate = MAKE_SQLSTATE(diag_sqlstate[0], + diag_sqlstate[1], + diag_sqlstate[2], + diag_sqlstate[3], + diag_sqlstate[4]); + else + sqlstate = ERRCODE_CONNECTION_FAILURE; + + /* + * If we don't get a message from the PGresult, try the PGconn. This is + * needed because for connection-level failures, PQgetResult may just + * return NULL, not a PGresult at all. + */ + if (message_primary == NULL) + message_primary = pchomp(PQerrorMessage(conn)); + + ereport(elevel, + (errcode(sqlstate), + (message_primary != NULL && message_primary[0] != '\0') ? + errmsg_internal("%s", message_primary) : + errmsg("could not obtain message string for remote error"), + message_detail ? errdetail_internal("%s", message_detail) : 0, + message_hint ? errhint("%s", message_hint) : 0, + message_context ? errcontext("%s", message_context) : 0, + sql ? errcontext("remote SQL command: %s", sql) : 0)); + PQclear(res); } /* @@ -1243,9 +1189,6 @@ pgfdw_xact_callback(XactEvent event, void *arg) /* Also reset cursor numbering for next transaction */ cursor_number = 0; - - /* Likewise for top_read_only_level */ - top_read_only_level = 0; } /* @@ -1344,10 +1287,6 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, false); } } - - /* If in the topmost read-only transaction, reset top_read_only_level */ - if (curlevel == top_read_only_level) - top_read_only_level = 0; } /* @@ -1450,9 +1389,6 @@ pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel) /* Reset state to show we're out of a transaction */ entry->xact_depth = 0; - /* Reset xact r/o state */ - entry->xact_read_only = false; - /* * If the connection isn't in a good idle state, it is marked as * invalid or keep_connections option of its server is disabled, then @@ -1473,10 +1409,6 @@ pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel) { /* Reset state to show we're out of a subtransaction */ entry->xact_depth--; - - /* If in the topmost read-only transaction, reset xact r/o state */ - if (entry->xact_depth + 1 == top_read_only_level) - entry->xact_read_only = false; } } @@ -1625,7 +1557,7 @@ pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query) */ if (!PQsendQuery(conn, query)) { - pgfdw_report_error(WARNING, NULL, conn, false, query); + pgfdw_report(WARNING, NULL, conn, query); return false; } @@ -1650,7 +1582,7 @@ pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query, */ if (consume_input && !PQconsumeInput(conn)) { - pgfdw_report_error(WARNING, NULL, conn, false, query); + pgfdw_report(WARNING, NULL, conn, query); return false; } @@ -1662,7 +1594,7 @@ pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query, (errmsg("could not get query result due to timeout"), errcontext("remote SQL command: %s", query))); else - pgfdw_report_error(WARNING, NULL, conn, false, query); + pgfdw_report(WARNING, NULL, conn, query); return false; } @@ -1670,7 +1602,7 @@ pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query, /* Issue a warning if not successful. */ if (PQresultStatus(result) != PGRES_COMMAND_OK) { - pgfdw_report_error(WARNING, result, conn, true, query); + pgfdw_report(WARNING, result, conn, query); return ignore_errors; } PQclear(result); @@ -1698,103 +1630,90 @@ pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result, bool *timed_out) { - volatile bool failed = false; - PGresult *volatile last_res = NULL; + bool failed = false; + PGresult *last_res = NULL; + int canceldelta = RETRY_CANCEL_TIMEOUT * 2; *result = NULL; *timed_out = false; - - /* In what follows, do not leak any PGresults on an error. */ - PG_TRY(); + for (;;) { - int canceldelta = RETRY_CANCEL_TIMEOUT * 2; + PGresult *res; - for (;;) + while (PQisBusy(conn)) { - PGresult *res; + int wc; + TimestampTz now = GetCurrentTimestamp(); + long cur_timeout; - while (PQisBusy(conn)) + /* If timeout has expired, give up. */ + if (now >= endtime) { - int wc; - TimestampTz now = GetCurrentTimestamp(); - long cur_timeout; - - /* If timeout has expired, give up. */ - if (now >= endtime) - { - *timed_out = true; - failed = true; - goto exit; - } + *timed_out = true; + failed = true; + goto exit; + } - /* If we need to re-issue the cancel request, do that. */ - if (now >= retrycanceltime) - { - /* We ignore failure to issue the repeated request. */ - (void) libpqsrv_cancel(conn, endtime); + /* If we need to re-issue the cancel request, do that. */ + if (now >= retrycanceltime) + { + /* We ignore failure to issue the repeated request. */ + (void) libpqsrv_cancel(conn, endtime); - /* Recompute "now" in case that took measurable time. */ - now = GetCurrentTimestamp(); + /* Recompute "now" in case that took measurable time. */ + now = GetCurrentTimestamp(); - /* Adjust re-cancel timeout in increasing steps. */ - retrycanceltime = TimestampTzPlusMilliseconds(now, - canceldelta); - canceldelta += canceldelta; - } + /* Adjust re-cancel timeout in increasing steps. */ + retrycanceltime = TimestampTzPlusMilliseconds(now, + canceldelta); + canceldelta += canceldelta; + } - /* If timeout has expired, give up, else get sleep time. */ - cur_timeout = TimestampDifferenceMilliseconds(now, - Min(endtime, - retrycanceltime)); - if (cur_timeout <= 0) - { - *timed_out = true; - failed = true; - goto exit; - } + /* If timeout has expired, give up, else get sleep time. */ + cur_timeout = TimestampDifferenceMilliseconds(now, + Min(endtime, + retrycanceltime)); + if (cur_timeout <= 0) + { + *timed_out = true; + failed = true; + goto exit; + } - /* first time, allocate or get the custom wait event */ - if (pgfdw_we_cleanup_result == 0) - pgfdw_we_cleanup_result = WaitEventExtensionNew("PostgresFdwCleanupResult"); + /* first time, allocate or get the custom wait event */ + if (pgfdw_we_cleanup_result == 0) + pgfdw_we_cleanup_result = WaitEventExtensionNew("PostgresFdwCleanupResult"); - /* Sleep until there's something to do */ - wc = WaitLatchOrSocket(MyLatch, - WL_LATCH_SET | WL_SOCKET_READABLE | - WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, - PQsocket(conn), - cur_timeout, pgfdw_we_cleanup_result); - ResetLatch(MyLatch); + /* Sleep until there's something to do */ + wc = WaitLatchOrSocket(MyLatch, + WL_LATCH_SET | WL_SOCKET_READABLE | + WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + PQsocket(conn), + cur_timeout, pgfdw_we_cleanup_result); + ResetLatch(MyLatch); - CHECK_FOR_INTERRUPTS(); + CHECK_FOR_INTERRUPTS(); - /* Data available in socket? */ - if (wc & WL_SOCKET_READABLE) + /* Data available in socket? */ + if (wc & WL_SOCKET_READABLE) + { + if (!PQconsumeInput(conn)) { - if (!PQconsumeInput(conn)) - { - /* connection trouble */ - failed = true; - goto exit; - } + /* connection trouble */ + failed = true; + goto exit; } } + } - res = PQgetResult(conn); - if (res == NULL) - break; /* query is complete */ + res = PQgetResult(conn); + if (res == NULL) + break; /* query is complete */ - PQclear(last_res); - last_res = res; - } -exit: ; - } - PG_CATCH(); - { PQclear(last_res); - PG_RE_THROW(); + last_res = res; } - PG_END_TRY(); - +exit: if (failed) PQclear(last_res); else |