diff options
Diffstat (limited to 'contrib/postgres_fdw/connection.c')
-rw-r--r-- | contrib/postgres_fdw/connection.c | 99 |
1 files changed, 91 insertions, 8 deletions
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 304f3c20f83..caf14462696 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -58,6 +58,7 @@ typedef struct ConnCacheEntry /* Remaining fields are invalid when conn is NULL: */ int xact_depth; /* 0 = no xact open, 1 = main xact open, 2 = * one level of subxact open, etc */ + bool xact_read_only; /* xact r/o state */ bool have_prep_stmt; /* have we prepared any stmts in this xact? */ bool have_error; /* have any subxacts aborted in this xact? */ bool changing_xact_state; /* xact state change in process */ @@ -84,6 +85,12 @@ static unsigned int prep_stmt_number = 0; /* tracks whether any work is needed in callback functions */ static bool xact_got_connection = false; +/* + * tracks the nesting level of the topmost read-only transaction determined + * by GetTopReadOnlyTransactionNestLevel() + */ +static int top_read_only_level = 0; + /* custom wait event values, retrieved from shared memory */ static uint32 pgfdw_we_cleanup_result = 0; static uint32 pgfdw_we_connect = 0; @@ -372,6 +379,7 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user) /* Reset all transient state fields, to be sure all are clean */ entry->xact_depth = 0; + entry->xact_read_only = false; entry->have_prep_stmt = false; entry->have_error = false; entry->changing_xact_state = false; @@ -843,29 +851,81 @@ do_sql_command_end(PGconn *conn, const char *sql, bool consume_input) * those scans. A disadvantage is that we can't provide sane emulation of * READ COMMITTED behavior --- it would be nice if we had some other way to * control which remote queries share a snapshot. + * + * Note also that we always start the remote transaction with the same + * read/write and deferrable properties as the local transaction, and start + * the remote subtransaction with the same read/write property as the local + * subtransaction. */ static void begin_remote_xact(ConnCacheEntry *entry) { int curlevel = GetCurrentTransactionNestLevel(); - /* Start main transaction if we haven't yet */ + /* + * Set the nesting level of the topmost read-only transaction if the + * current transaction is read-only and we haven't yet. Once it's set, + * it's retained until that transaction is committed/aborted, and then + * reset (see pgfdw_xact_callback and pgfdw_subxact_callback). + */ + if (XactReadOnly) + { + if (top_read_only_level == 0) + top_read_only_level = GetTopReadOnlyTransactionNestLevel(); + Assert(top_read_only_level > 0); + } + else + Assert(top_read_only_level == 0); + + /* + * Start main transaction if we haven't yet; otherwise, change the + * already-started remote transaction/subtransaction to read-only if the + * local transaction/subtransaction have been done so after starting them + * and we haven't yet. + */ if (entry->xact_depth <= 0) { - const char *sql; + StringInfoData sql; + bool ro = (top_read_only_level == 1); elog(DEBUG3, "starting remote transaction on connection %p", entry->conn); + initStringInfo(&sql); + appendStringInfoString(&sql, "START TRANSACTION ISOLATION LEVEL "); if (IsolationIsSerializable()) - sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE"; + appendStringInfoString(&sql, "SERIALIZABLE"); else - sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ"; + appendStringInfoString(&sql, "REPEATABLE READ"); + if (ro) + appendStringInfoString(&sql, " READ ONLY"); + if (XactDeferrable) + appendStringInfoString(&sql, " DEFERRABLE"); entry->changing_xact_state = true; - do_sql_command(entry->conn, sql); + do_sql_command(entry->conn, sql.data); entry->xact_depth = 1; + if (ro) + { + Assert(!entry->xact_read_only); + entry->xact_read_only = true; + } entry->changing_xact_state = false; } + else if (!entry->xact_read_only) + { + Assert(top_read_only_level == 0 || + entry->xact_depth <= top_read_only_level); + if (entry->xact_depth == top_read_only_level) + { + entry->changing_xact_state = true; + do_sql_command(entry->conn, "SET transaction_read_only = on"); + entry->xact_read_only = true; + entry->changing_xact_state = false; + } + } + else + Assert(top_read_only_level > 0 && + entry->xact_depth >= top_read_only_level); /* * If we're in a subtransaction, stack up savepoints to match our level. @@ -874,12 +934,21 @@ begin_remote_xact(ConnCacheEntry *entry) */ while (entry->xact_depth < curlevel) { - char sql[64]; + StringInfoData sql; + bool ro = (entry->xact_depth + 1 == top_read_only_level); - snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1); + initStringInfo(&sql); + appendStringInfo(&sql, "SAVEPOINT s%d", entry->xact_depth + 1); + if (ro) + appendStringInfoString(&sql, "; SET transaction_read_only = on"); entry->changing_xact_state = true; - do_sql_command(entry->conn, sql); + do_sql_command(entry->conn, sql.data); entry->xact_depth++; + if (ro) + { + Assert(!entry->xact_read_only); + entry->xact_read_only = true; + } entry->changing_xact_state = false; } } @@ -1174,6 +1243,9 @@ pgfdw_xact_callback(XactEvent event, void *arg) /* Also reset cursor numbering for next transaction */ cursor_number = 0; + + /* Likewise for top_read_only_level */ + top_read_only_level = 0; } /* @@ -1272,6 +1344,10 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, false); } } + + /* If in the topmost read-only transaction, reset top_read_only_level */ + if (curlevel == top_read_only_level) + top_read_only_level = 0; } /* @@ -1374,6 +1450,9 @@ pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel) /* Reset state to show we're out of a transaction */ entry->xact_depth = 0; + /* Reset xact r/o state */ + entry->xact_read_only = false; + /* * If the connection isn't in a good idle state, it is marked as * invalid or keep_connections option of its server is disabled, then @@ -1394,6 +1473,10 @@ pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel) { /* Reset state to show we're out of a subtransaction */ entry->xact_depth--; + + /* If in the topmost read-only transaction, reset xact r/o state */ + if (entry->xact_depth + 1 == top_read_only_level) + entry->xact_read_only = false; } } |