diff options
author | Tom Lane <tgl@sss.pgh.pa.us> | 2013-02-21 05:26:23 -0500 |
---|---|---|
committer | Tom Lane <tgl@sss.pgh.pa.us> | 2013-02-21 05:27:16 -0500 |
commit | d0d75c402217421b691050857eb3d7af82d0c770 (patch) | |
tree | 1d934b75b12e41c80520ce3aea6830e3bbe4b718 /contrib/postgres_fdw/connection.c | |
parent | f435cd1d385859a0cdb1d70fccc21dde2b1ee116 (diff) | |
download | postgresql-d0d75c402217421b691050857eb3d7af82d0c770.tar.gz postgresql-d0d75c402217421b691050857eb3d7af82d0c770.zip |
Add postgres_fdw contrib module.
There's still a lot of room for improvement, but it basically works,
and we need this to be present before we can do anything much with the
writable-foreign-tables patch. So let's commit it and get on with testing.
Shigeru Hanada, reviewed by KaiGai Kohei and Tom Lane
Diffstat (limited to 'contrib/postgres_fdw/connection.c')
-rw-r--r-- | contrib/postgres_fdw/connection.c | 581 |
1 files changed, 581 insertions, 0 deletions
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c new file mode 100644 index 00000000000..62ccea4c460 --- /dev/null +++ b/contrib/postgres_fdw/connection.c @@ -0,0 +1,581 @@ +/*------------------------------------------------------------------------- + * + * connection.c + * Connection management functions for postgres_fdw + * + * Portions Copyright (c) 2012-2013, PostgreSQL Global Development Group + * + * IDENTIFICATION + * contrib/postgres_fdw/connection.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "postgres_fdw.h" + +#include "access/xact.h" +#include "mb/pg_wchar.h" +#include "miscadmin.h" +#include "utils/hsearch.h" +#include "utils/memutils.h" + + +/* + * Connection cache hash table entry + * + * The lookup key in this hash table is the foreign server OID plus the user + * mapping OID. (We use just one connection per user per foreign server, + * so that we can ensure all scans use the same snapshot during a query.) + * + * The "conn" pointer can be NULL if we don't currently have a live connection. + * When we do have a connection, xact_depth tracks the current depth of + * transactions and subtransactions open on the remote side. We need to issue + * commands at the same nesting depth on the remote as we're executing at + * ourselves, so that rolling back a subtransaction will kill the right + * queries and not the wrong ones. + */ +typedef struct ConnCacheKey +{ + Oid serverid; /* OID of foreign server */ + Oid userid; /* OID of local user whose mapping we use */ +} ConnCacheKey; + +typedef struct ConnCacheEntry +{ + ConnCacheKey key; /* hash key (must be first) */ + PGconn *conn; /* connection to foreign server, or NULL */ + int xact_depth; /* 0 = no xact open, 1 = main xact open, 2 = + * one level of subxact open, etc */ +} ConnCacheEntry; + +/* + * Connection cache (initialized on first use) + */ +static HTAB *ConnectionHash = NULL; + +/* for assigning cursor numbers */ +static unsigned int cursor_number = 0; + +/* tracks whether any work is needed in callback functions */ +static bool xact_got_connection = false; + +/* prototypes of private functions */ +static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user); +static void check_conn_params(const char **keywords, const char **values); +static void begin_remote_xact(ConnCacheEntry *entry); +static void pgfdw_xact_callback(XactEvent event, void *arg); +static void pgfdw_subxact_callback(SubXactEvent event, + SubTransactionId mySubid, + SubTransactionId parentSubid, + void *arg); + + +/* + * Get a PGconn which can be used to execute queries on the remote PostgreSQL + * server with the user's authorization. A new connection is established + * if we don't already have a suitable one, and a transaction is opened at + * the right subtransaction nesting depth if we didn't do that already. + * + * XXX Note that caching connections theoretically requires a mechanism to + * detect change of FDW objects to invalidate already established connections. + * We could manage that by watching for invalidation events on the relevant + * syscaches. For the moment, though, it's not clear that this would really + * be useful and not mere pedantry. We could not flush any active connections + * mid-transaction anyway. + */ +PGconn * +GetConnection(ForeignServer *server, UserMapping *user) +{ + bool found; + ConnCacheEntry *entry; + ConnCacheKey key; + + /* First time through, initialize connection cache hashtable */ + if (ConnectionHash == NULL) + { + HASHCTL ctl; + + MemSet(&ctl, 0, sizeof(ctl)); + ctl.keysize = sizeof(ConnCacheKey); + ctl.entrysize = sizeof(ConnCacheEntry); + ctl.hash = tag_hash; + /* allocate ConnectionHash in the cache context */ + ctl.hcxt = CacheMemoryContext; + ConnectionHash = hash_create("postgres_fdw connections", 8, + &ctl, + HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT); + + /* + * Register some callback functions that manage connection cleanup. + * This should be done just once in each backend. + */ + RegisterXactCallback(pgfdw_xact_callback, NULL); + RegisterSubXactCallback(pgfdw_subxact_callback, NULL); + } + + /* Set flag that we did GetConnection during the current transaction */ + xact_got_connection = true; + + /* Create hash key for the entry. Assume no pad bytes in key struct */ + key.serverid = server->serverid; + key.userid = user->userid; + + /* + * Find or create cached entry for requested connection. + */ + entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found); + if (!found) + { + /* initialize new hashtable entry (key is already filled in) */ + entry->conn = NULL; + entry->xact_depth = 0; + } + + /* + * We don't check the health of cached connection here, because it would + * require some overhead. Broken connection will be detected when the + * connection is actually used. + */ + + /* + * If cache entry doesn't have a connection, we have to establish a new + * connection. (If connect_pg_server throws an error, the cache entry + * will be left in a valid empty state.) + */ + if (entry->conn == NULL) + { + entry->xact_depth = 0; /* just to be sure */ + entry->conn = connect_pg_server(server, user); + elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\"", + entry->conn, server->servername); + } + + /* + * Start a new transaction or subtransaction if needed. + */ + begin_remote_xact(entry); + + return entry->conn; +} + +/* + * Connect to remote server using specified server and user mapping properties. + */ +static PGconn * +connect_pg_server(ForeignServer *server, UserMapping *user) +{ + PGconn *volatile conn = NULL; + + /* + * Use PG_TRY block to ensure closing connection on error. + */ + PG_TRY(); + { + const char **keywords; + const char **values; + int n; + + /* + * Construct connection params from generic options of ForeignServer + * and UserMapping. (Some of them might not be libpq options, in + * which case we'll just waste a few array slots.) Add 3 extra slots + * for fallback_application_name, client_encoding, end marker. + */ + n = list_length(server->options) + list_length(user->options) + 3; + keywords = (const char **) palloc(n * sizeof(char *)); + values = (const char **) palloc(n * sizeof(char *)); + + n = 0; + n += ExtractConnectionOptions(server->options, + keywords + n, values + n); + n += ExtractConnectionOptions(user->options, + keywords + n, values + n); + + /* Use "postgres_fdw" as fallback_application_name. */ + keywords[n] = "fallback_application_name"; + values[n] = "postgres_fdw"; + n++; + + /* Set client_encoding so that libpq can convert encoding properly. */ + keywords[n] = "client_encoding"; + values[n] = GetDatabaseEncodingName(); + n++; + + keywords[n] = values[n] = NULL; + + /* verify connection parameters and make connection */ + check_conn_params(keywords, values); + + conn = PQconnectdbParams(keywords, values, false); + if (!conn || PQstatus(conn) != CONNECTION_OK) + { + char *connmessage; + int msglen; + + /* libpq typically appends a newline, strip that */ + connmessage = pstrdup(PQerrorMessage(conn)); + msglen = strlen(connmessage); + if (msglen > 0 && connmessage[msglen - 1] == '\n') + connmessage[msglen - 1] = '\0'; + ereport(ERROR, + (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), + errmsg("could not connect to server \"%s\"", + server->servername), + errdetail_internal("%s", connmessage))); + } + + /* + * Check that non-superuser has used password to establish connection; + * otherwise, he's piggybacking on the postgres server's user + * identity. See also dblink_security_check() in contrib/dblink. + */ + if (!superuser() && !PQconnectionUsedPassword(conn)) + ereport(ERROR, + (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED), + errmsg("password is required"), + errdetail("Non-superuser cannot connect if the server does not request a password."), + errhint("Target server's authentication method must be changed."))); + + pfree(keywords); + pfree(values); + } + PG_CATCH(); + { + /* Release PGconn data structure if we managed to create one */ + if (conn) + PQfinish(conn); + PG_RE_THROW(); + } + PG_END_TRY(); + + return conn; +} + +/* + * For non-superusers, insist that the connstr specify a password. This + * prevents a password from being picked up from .pgpass, a service file, + * the environment, etc. We don't want the postgres user's passwords + * to be accessible to non-superusers. (See also dblink_connstr_check in + * contrib/dblink.) + */ +static void +check_conn_params(const char **keywords, const char **values) +{ + int i; + + /* no check required if superuser */ + if (superuser()) + return; + + /* ok if params contain a non-empty password */ + for (i = 0; keywords[i] != NULL; i++) + { + if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0') + return; + } + + ereport(ERROR, + (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED), + errmsg("password is required"), + errdetail("Non-superusers must provide a password in the user mapping."))); +} + +/* + * Start remote transaction or subtransaction, if needed. + * + * Note that we always use at least REPEATABLE READ in the remote session. + * This is so that, if a query initiates multiple scans of the same or + * different foreign tables, we will get snapshot-consistent results from + * those scans. A disadvantage is that we can't provide sane emulation of + * READ COMMITTED behavior --- it would be nice if we had some other way to + * control which remote queries share a snapshot. + */ +static void +begin_remote_xact(ConnCacheEntry *entry) +{ + int curlevel = GetCurrentTransactionNestLevel(); + PGresult *res; + + /* Start main transaction if we haven't yet */ + if (entry->xact_depth <= 0) + { + const char *sql; + + elog(DEBUG3, "starting remote transaction on connection %p", + entry->conn); + + if (XactIsoLevel == XACT_SERIALIZABLE) + sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE"; + else + sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ"; + res = PQexec(entry->conn, sql); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pgfdw_report_error(ERROR, res, true, sql); + PQclear(res); + entry->xact_depth = 1; + } + + /* + * If we're in a subtransaction, stack up savepoints to match our level. + * This ensures we can rollback just the desired effects when a + * subtransaction aborts. + */ + while (entry->xact_depth < curlevel) + { + char sql[64]; + + snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1); + res = PQexec(entry->conn, sql); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pgfdw_report_error(ERROR, res, true, sql); + PQclear(res); + entry->xact_depth++; + } +} + +/* + * Release connection reference count created by calling GetConnection. + */ +void +ReleaseConnection(PGconn *conn) +{ + /* + * Currently, we don't actually track connection references because all + * cleanup is managed on a transaction or subtransaction basis instead. So + * there's nothing to do here. + */ +} + +/* + * Assign a "unique" number for a cursor. + * + * These really only need to be unique per connection within a transaction. + * For the moment we ignore the per-connection point and assign them across + * all connections in the transaction, but we ask for the connection to be + * supplied in case we want to refine that. + * + * Note that even if wraparound happens in a very long transaction, actual + * collisions are highly improbable; just be sure to use %u not %d to print. + */ +unsigned int +GetCursorNumber(PGconn *conn) +{ + return ++cursor_number; +} + +/* + * Report an error we got from the remote server. + * + * elevel: error level to use (typically ERROR, but might be less) + * res: PGresult containing the error + * clear: if true, PQclear the result (otherwise caller will handle it) + * sql: NULL, or text of remote command we tried to execute + */ +void +pgfdw_report_error(int elevel, PGresult *res, bool clear, const char *sql) +{ + /* If requested, PGresult must be released before leaving this function. */ + PG_TRY(); + { + char *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE); + char *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY); + char *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL); + char *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT); + char *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT); + int sqlstate; + + if (diag_sqlstate) + sqlstate = MAKE_SQLSTATE(diag_sqlstate[0], + diag_sqlstate[1], + diag_sqlstate[2], + diag_sqlstate[3], + diag_sqlstate[4]); + else + sqlstate = ERRCODE_CONNECTION_FAILURE; + + ereport(elevel, + (errcode(sqlstate), + message_primary ? errmsg_internal("%s", message_primary) : + errmsg("unknown error"), + message_detail ? errdetail_internal("%s", message_detail) : 0, + message_hint ? errhint("%s", message_hint) : 0, + message_context ? errcontext("%s", message_context) : 0, + sql ? errcontext("Remote SQL command: %s", sql) : 0)); + } + PG_CATCH(); + { + if (clear) + PQclear(res); + PG_RE_THROW(); + } + PG_END_TRY(); + if (clear) + PQclear(res); +} + +/* + * pgfdw_xact_callback --- cleanup at main-transaction end. + */ +static void +pgfdw_xact_callback(XactEvent event, void *arg) +{ + HASH_SEQ_STATUS scan; + ConnCacheEntry *entry; + + /* Quick exit if no connections were touched in this transaction. */ + if (!xact_got_connection) + return; + + /* + * Scan all connection cache entries to find open remote transactions, and + * close them. + */ + hash_seq_init(&scan, ConnectionHash); + while ((entry = (ConnCacheEntry *) hash_seq_search(&scan))) + { + PGresult *res; + + /* We only care about connections with open remote transactions */ + if (entry->conn == NULL || entry->xact_depth == 0) + continue; + + elog(DEBUG3, "closing remote transaction on connection %p", + entry->conn); + + switch (event) + { + case XACT_EVENT_PRE_COMMIT: + /* Commit all remote transactions during pre-commit */ + res = PQexec(entry->conn, "COMMIT TRANSACTION"); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pgfdw_report_error(ERROR, res, true, "COMMIT TRANSACTION"); + PQclear(res); + break; + case XACT_EVENT_PRE_PREPARE: + + /* + * We disallow remote transactions that modified anything, + * since it's not really reasonable to hold them open until + * the prepared transaction is committed. For the moment, + * throw error unconditionally; later we might allow read-only + * cases. Note that the error will cause us to come right + * back here with event == XACT_EVENT_ABORT, so we'll clean up + * the connection state at that point. + */ + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot prepare a transaction that modified remote tables"))); + break; + case XACT_EVENT_COMMIT: + case XACT_EVENT_PREPARE: + /* Should not get here -- pre-commit should have handled it */ + elog(ERROR, "missed cleaning up connection during pre-commit"); + break; + case XACT_EVENT_ABORT: + /* If we're aborting, abort all remote transactions too */ + res = PQexec(entry->conn, "ABORT TRANSACTION"); + /* Note: can't throw ERROR, it would be infinite loop */ + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pgfdw_report_error(WARNING, res, true, + "ABORT TRANSACTION"); + else + PQclear(res); + break; + } + + /* Reset state to show we're out of a transaction */ + entry->xact_depth = 0; + + /* + * If the connection isn't in a good idle state, discard it to + * recover. Next GetConnection will open a new connection. + */ + if (PQstatus(entry->conn) != CONNECTION_OK || + PQtransactionStatus(entry->conn) != PQTRANS_IDLE) + { + elog(DEBUG3, "discarding connection %p", entry->conn); + PQfinish(entry->conn); + entry->conn = NULL; + } + } + + /* + * Regardless of the event type, we can now mark ourselves as out of the + * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE, + * this saves a useless scan of the hashtable during COMMIT or PREPARE.) + */ + xact_got_connection = false; + + /* Also reset cursor numbering for next transaction */ + cursor_number = 0; +} + +/* + * pgfdw_subxact_callback --- cleanup at subtransaction end. + */ +static void +pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, + SubTransactionId parentSubid, void *arg) +{ + HASH_SEQ_STATUS scan; + ConnCacheEntry *entry; + int curlevel; + + /* Nothing to do at subxact start, nor after commit. */ + if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB || + event == SUBXACT_EVENT_ABORT_SUB)) + return; + + /* Quick exit if no connections were touched in this transaction. */ + if (!xact_got_connection) + return; + + /* + * Scan all connection cache entries to find open remote subtransactions + * of the current level, and close them. + */ + curlevel = GetCurrentTransactionNestLevel(); + hash_seq_init(&scan, ConnectionHash); + while ((entry = (ConnCacheEntry *) hash_seq_search(&scan))) + { + PGresult *res; + char sql[100]; + + /* + * We only care about connections with open remote subtransactions of + * the current level. + */ + if (entry->conn == NULL || entry->xact_depth < curlevel) + continue; + + if (entry->xact_depth > curlevel) + elog(ERROR, "missed cleaning up remote subtransaction at level %d", + entry->xact_depth); + + if (event == SUBXACT_EVENT_PRE_COMMIT_SUB) + { + /* Commit all remote subtransactions during pre-commit */ + snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel); + res = PQexec(entry->conn, sql); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pgfdw_report_error(ERROR, res, true, sql); + PQclear(res); + } + else + { + /* Rollback all remote subtransactions during abort */ + snprintf(sql, sizeof(sql), + "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d", + curlevel, curlevel); + res = PQexec(entry->conn, sql); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pgfdw_report_error(WARNING, res, true, sql); + else + PQclear(res); + } + + /* OK, we're outta that level of subtransaction */ + entry->xact_depth--; + } +} |