aboutsummaryrefslogtreecommitdiff
path: root/contrib/postgres_fdw/connection.c
diff options
context:
space:
mode:
authorTom Lane <tgl@sss.pgh.pa.us>2013-02-21 05:26:23 -0500
committerTom Lane <tgl@sss.pgh.pa.us>2013-02-21 05:27:16 -0500
commitd0d75c402217421b691050857eb3d7af82d0c770 (patch)
tree1d934b75b12e41c80520ce3aea6830e3bbe4b718 /contrib/postgres_fdw/connection.c
parentf435cd1d385859a0cdb1d70fccc21dde2b1ee116 (diff)
downloadpostgresql-d0d75c402217421b691050857eb3d7af82d0c770.tar.gz
postgresql-d0d75c402217421b691050857eb3d7af82d0c770.zip
Add postgres_fdw contrib module.
There's still a lot of room for improvement, but it basically works, and we need this to be present before we can do anything much with the writable-foreign-tables patch. So let's commit it and get on with testing. Shigeru Hanada, reviewed by KaiGai Kohei and Tom Lane
Diffstat (limited to 'contrib/postgres_fdw/connection.c')
-rw-r--r--contrib/postgres_fdw/connection.c581
1 files changed, 581 insertions, 0 deletions
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
new file mode 100644
index 00000000000..62ccea4c460
--- /dev/null
+++ b/contrib/postgres_fdw/connection.c
@@ -0,0 +1,581 @@
+/*-------------------------------------------------------------------------
+ *
+ * connection.c
+ * Connection management functions for postgres_fdw
+ *
+ * Portions Copyright (c) 2012-2013, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * contrib/postgres_fdw/connection.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "postgres_fdw.h"
+
+#include "access/xact.h"
+#include "mb/pg_wchar.h"
+#include "miscadmin.h"
+#include "utils/hsearch.h"
+#include "utils/memutils.h"
+
+
+/*
+ * Connection cache hash table entry
+ *
+ * The lookup key in this hash table is the foreign server OID plus the user
+ * mapping OID. (We use just one connection per user per foreign server,
+ * so that we can ensure all scans use the same snapshot during a query.)
+ *
+ * The "conn" pointer can be NULL if we don't currently have a live connection.
+ * When we do have a connection, xact_depth tracks the current depth of
+ * transactions and subtransactions open on the remote side. We need to issue
+ * commands at the same nesting depth on the remote as we're executing at
+ * ourselves, so that rolling back a subtransaction will kill the right
+ * queries and not the wrong ones.
+ */
+typedef struct ConnCacheKey
+{
+ Oid serverid; /* OID of foreign server */
+ Oid userid; /* OID of local user whose mapping we use */
+} ConnCacheKey;
+
+typedef struct ConnCacheEntry
+{
+ ConnCacheKey key; /* hash key (must be first) */
+ PGconn *conn; /* connection to foreign server, or NULL */
+ int xact_depth; /* 0 = no xact open, 1 = main xact open, 2 =
+ * one level of subxact open, etc */
+} ConnCacheEntry;
+
+/*
+ * Connection cache (initialized on first use)
+ */
+static HTAB *ConnectionHash = NULL;
+
+/* for assigning cursor numbers */
+static unsigned int cursor_number = 0;
+
+/* tracks whether any work is needed in callback functions */
+static bool xact_got_connection = false;
+
+/* prototypes of private functions */
+static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
+static void check_conn_params(const char **keywords, const char **values);
+static void begin_remote_xact(ConnCacheEntry *entry);
+static void pgfdw_xact_callback(XactEvent event, void *arg);
+static void pgfdw_subxact_callback(SubXactEvent event,
+ SubTransactionId mySubid,
+ SubTransactionId parentSubid,
+ void *arg);
+
+
+/*
+ * Get a PGconn which can be used to execute queries on the remote PostgreSQL
+ * server with the user's authorization. A new connection is established
+ * if we don't already have a suitable one, and a transaction is opened at
+ * the right subtransaction nesting depth if we didn't do that already.
+ *
+ * XXX Note that caching connections theoretically requires a mechanism to
+ * detect change of FDW objects to invalidate already established connections.
+ * We could manage that by watching for invalidation events on the relevant
+ * syscaches. For the moment, though, it's not clear that this would really
+ * be useful and not mere pedantry. We could not flush any active connections
+ * mid-transaction anyway.
+ */
+PGconn *
+GetConnection(ForeignServer *server, UserMapping *user)
+{
+ bool found;
+ ConnCacheEntry *entry;
+ ConnCacheKey key;
+
+ /* First time through, initialize connection cache hashtable */
+ if (ConnectionHash == NULL)
+ {
+ HASHCTL ctl;
+
+ MemSet(&ctl, 0, sizeof(ctl));
+ ctl.keysize = sizeof(ConnCacheKey);
+ ctl.entrysize = sizeof(ConnCacheEntry);
+ ctl.hash = tag_hash;
+ /* allocate ConnectionHash in the cache context */
+ ctl.hcxt = CacheMemoryContext;
+ ConnectionHash = hash_create("postgres_fdw connections", 8,
+ &ctl,
+ HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
+
+ /*
+ * Register some callback functions that manage connection cleanup.
+ * This should be done just once in each backend.
+ */
+ RegisterXactCallback(pgfdw_xact_callback, NULL);
+ RegisterSubXactCallback(pgfdw_subxact_callback, NULL);
+ }
+
+ /* Set flag that we did GetConnection during the current transaction */
+ xact_got_connection = true;
+
+ /* Create hash key for the entry. Assume no pad bytes in key struct */
+ key.serverid = server->serverid;
+ key.userid = user->userid;
+
+ /*
+ * Find or create cached entry for requested connection.
+ */
+ entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
+ if (!found)
+ {
+ /* initialize new hashtable entry (key is already filled in) */
+ entry->conn = NULL;
+ entry->xact_depth = 0;
+ }
+
+ /*
+ * We don't check the health of cached connection here, because it would
+ * require some overhead. Broken connection will be detected when the
+ * connection is actually used.
+ */
+
+ /*
+ * If cache entry doesn't have a connection, we have to establish a new
+ * connection. (If connect_pg_server throws an error, the cache entry
+ * will be left in a valid empty state.)
+ */
+ if (entry->conn == NULL)
+ {
+ entry->xact_depth = 0; /* just to be sure */
+ entry->conn = connect_pg_server(server, user);
+ elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\"",
+ entry->conn, server->servername);
+ }
+
+ /*
+ * Start a new transaction or subtransaction if needed.
+ */
+ begin_remote_xact(entry);
+
+ return entry->conn;
+}
+
+/*
+ * Connect to remote server using specified server and user mapping properties.
+ */
+static PGconn *
+connect_pg_server(ForeignServer *server, UserMapping *user)
+{
+ PGconn *volatile conn = NULL;
+
+ /*
+ * Use PG_TRY block to ensure closing connection on error.
+ */
+ PG_TRY();
+ {
+ const char **keywords;
+ const char **values;
+ int n;
+
+ /*
+ * Construct connection params from generic options of ForeignServer
+ * and UserMapping. (Some of them might not be libpq options, in
+ * which case we'll just waste a few array slots.) Add 3 extra slots
+ * for fallback_application_name, client_encoding, end marker.
+ */
+ n = list_length(server->options) + list_length(user->options) + 3;
+ keywords = (const char **) palloc(n * sizeof(char *));
+ values = (const char **) palloc(n * sizeof(char *));
+
+ n = 0;
+ n += ExtractConnectionOptions(server->options,
+ keywords + n, values + n);
+ n += ExtractConnectionOptions(user->options,
+ keywords + n, values + n);
+
+ /* Use "postgres_fdw" as fallback_application_name. */
+ keywords[n] = "fallback_application_name";
+ values[n] = "postgres_fdw";
+ n++;
+
+ /* Set client_encoding so that libpq can convert encoding properly. */
+ keywords[n] = "client_encoding";
+ values[n] = GetDatabaseEncodingName();
+ n++;
+
+ keywords[n] = values[n] = NULL;
+
+ /* verify connection parameters and make connection */
+ check_conn_params(keywords, values);
+
+ conn = PQconnectdbParams(keywords, values, false);
+ if (!conn || PQstatus(conn) != CONNECTION_OK)
+ {
+ char *connmessage;
+ int msglen;
+
+ /* libpq typically appends a newline, strip that */
+ connmessage = pstrdup(PQerrorMessage(conn));
+ msglen = strlen(connmessage);
+ if (msglen > 0 && connmessage[msglen - 1] == '\n')
+ connmessage[msglen - 1] = '\0';
+ ereport(ERROR,
+ (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
+ errmsg("could not connect to server \"%s\"",
+ server->servername),
+ errdetail_internal("%s", connmessage)));
+ }
+
+ /*
+ * Check that non-superuser has used password to establish connection;
+ * otherwise, he's piggybacking on the postgres server's user
+ * identity. See also dblink_security_check() in contrib/dblink.
+ */
+ if (!superuser() && !PQconnectionUsedPassword(conn))
+ ereport(ERROR,
+ (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
+ errmsg("password is required"),
+ errdetail("Non-superuser cannot connect if the server does not request a password."),
+ errhint("Target server's authentication method must be changed.")));
+
+ pfree(keywords);
+ pfree(values);
+ }
+ PG_CATCH();
+ {
+ /* Release PGconn data structure if we managed to create one */
+ if (conn)
+ PQfinish(conn);
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+
+ return conn;
+}
+
+/*
+ * For non-superusers, insist that the connstr specify a password. This
+ * prevents a password from being picked up from .pgpass, a service file,
+ * the environment, etc. We don't want the postgres user's passwords
+ * to be accessible to non-superusers. (See also dblink_connstr_check in
+ * contrib/dblink.)
+ */
+static void
+check_conn_params(const char **keywords, const char **values)
+{
+ int i;
+
+ /* no check required if superuser */
+ if (superuser())
+ return;
+
+ /* ok if params contain a non-empty password */
+ for (i = 0; keywords[i] != NULL; i++)
+ {
+ if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
+ return;
+ }
+
+ ereport(ERROR,
+ (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
+ errmsg("password is required"),
+ errdetail("Non-superusers must provide a password in the user mapping.")));
+}
+
+/*
+ * Start remote transaction or subtransaction, if needed.
+ *
+ * Note that we always use at least REPEATABLE READ in the remote session.
+ * This is so that, if a query initiates multiple scans of the same or
+ * different foreign tables, we will get snapshot-consistent results from
+ * those scans. A disadvantage is that we can't provide sane emulation of
+ * READ COMMITTED behavior --- it would be nice if we had some other way to
+ * control which remote queries share a snapshot.
+ */
+static void
+begin_remote_xact(ConnCacheEntry *entry)
+{
+ int curlevel = GetCurrentTransactionNestLevel();
+ PGresult *res;
+
+ /* Start main transaction if we haven't yet */
+ if (entry->xact_depth <= 0)
+ {
+ const char *sql;
+
+ elog(DEBUG3, "starting remote transaction on connection %p",
+ entry->conn);
+
+ if (XactIsoLevel == XACT_SERIALIZABLE)
+ sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
+ else
+ sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
+ res = PQexec(entry->conn, sql);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pgfdw_report_error(ERROR, res, true, sql);
+ PQclear(res);
+ entry->xact_depth = 1;
+ }
+
+ /*
+ * If we're in a subtransaction, stack up savepoints to match our level.
+ * This ensures we can rollback just the desired effects when a
+ * subtransaction aborts.
+ */
+ while (entry->xact_depth < curlevel)
+ {
+ char sql[64];
+
+ snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1);
+ res = PQexec(entry->conn, sql);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pgfdw_report_error(ERROR, res, true, sql);
+ PQclear(res);
+ entry->xact_depth++;
+ }
+}
+
+/*
+ * Release connection reference count created by calling GetConnection.
+ */
+void
+ReleaseConnection(PGconn *conn)
+{
+ /*
+ * Currently, we don't actually track connection references because all
+ * cleanup is managed on a transaction or subtransaction basis instead. So
+ * there's nothing to do here.
+ */
+}
+
+/*
+ * Assign a "unique" number for a cursor.
+ *
+ * These really only need to be unique per connection within a transaction.
+ * For the moment we ignore the per-connection point and assign them across
+ * all connections in the transaction, but we ask for the connection to be
+ * supplied in case we want to refine that.
+ *
+ * Note that even if wraparound happens in a very long transaction, actual
+ * collisions are highly improbable; just be sure to use %u not %d to print.
+ */
+unsigned int
+GetCursorNumber(PGconn *conn)
+{
+ return ++cursor_number;
+}
+
+/*
+ * Report an error we got from the remote server.
+ *
+ * elevel: error level to use (typically ERROR, but might be less)
+ * res: PGresult containing the error
+ * clear: if true, PQclear the result (otherwise caller will handle it)
+ * sql: NULL, or text of remote command we tried to execute
+ */
+void
+pgfdw_report_error(int elevel, PGresult *res, bool clear, const char *sql)
+{
+ /* If requested, PGresult must be released before leaving this function. */
+ PG_TRY();
+ {
+ char *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
+ char *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
+ char *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
+ char *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
+ char *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
+ int sqlstate;
+
+ if (diag_sqlstate)
+ sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
+ diag_sqlstate[1],
+ diag_sqlstate[2],
+ diag_sqlstate[3],
+ diag_sqlstate[4]);
+ else
+ sqlstate = ERRCODE_CONNECTION_FAILURE;
+
+ ereport(elevel,
+ (errcode(sqlstate),
+ message_primary ? errmsg_internal("%s", message_primary) :
+ errmsg("unknown error"),
+ message_detail ? errdetail_internal("%s", message_detail) : 0,
+ message_hint ? errhint("%s", message_hint) : 0,
+ message_context ? errcontext("%s", message_context) : 0,
+ sql ? errcontext("Remote SQL command: %s", sql) : 0));
+ }
+ PG_CATCH();
+ {
+ if (clear)
+ PQclear(res);
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+ if (clear)
+ PQclear(res);
+}
+
+/*
+ * pgfdw_xact_callback --- cleanup at main-transaction end.
+ */
+static void
+pgfdw_xact_callback(XactEvent event, void *arg)
+{
+ HASH_SEQ_STATUS scan;
+ ConnCacheEntry *entry;
+
+ /* Quick exit if no connections were touched in this transaction. */
+ if (!xact_got_connection)
+ return;
+
+ /*
+ * Scan all connection cache entries to find open remote transactions, and
+ * close them.
+ */
+ hash_seq_init(&scan, ConnectionHash);
+ while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+ {
+ PGresult *res;
+
+ /* We only care about connections with open remote transactions */
+ if (entry->conn == NULL || entry->xact_depth == 0)
+ continue;
+
+ elog(DEBUG3, "closing remote transaction on connection %p",
+ entry->conn);
+
+ switch (event)
+ {
+ case XACT_EVENT_PRE_COMMIT:
+ /* Commit all remote transactions during pre-commit */
+ res = PQexec(entry->conn, "COMMIT TRANSACTION");
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pgfdw_report_error(ERROR, res, true, "COMMIT TRANSACTION");
+ PQclear(res);
+ break;
+ case XACT_EVENT_PRE_PREPARE:
+
+ /*
+ * We disallow remote transactions that modified anything,
+ * since it's not really reasonable to hold them open until
+ * the prepared transaction is committed. For the moment,
+ * throw error unconditionally; later we might allow read-only
+ * cases. Note that the error will cause us to come right
+ * back here with event == XACT_EVENT_ABORT, so we'll clean up
+ * the connection state at that point.
+ */
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot prepare a transaction that modified remote tables")));
+ break;
+ case XACT_EVENT_COMMIT:
+ case XACT_EVENT_PREPARE:
+ /* Should not get here -- pre-commit should have handled it */
+ elog(ERROR, "missed cleaning up connection during pre-commit");
+ break;
+ case XACT_EVENT_ABORT:
+ /* If we're aborting, abort all remote transactions too */
+ res = PQexec(entry->conn, "ABORT TRANSACTION");
+ /* Note: can't throw ERROR, it would be infinite loop */
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pgfdw_report_error(WARNING, res, true,
+ "ABORT TRANSACTION");
+ else
+ PQclear(res);
+ break;
+ }
+
+ /* Reset state to show we're out of a transaction */
+ entry->xact_depth = 0;
+
+ /*
+ * If the connection isn't in a good idle state, discard it to
+ * recover. Next GetConnection will open a new connection.
+ */
+ if (PQstatus(entry->conn) != CONNECTION_OK ||
+ PQtransactionStatus(entry->conn) != PQTRANS_IDLE)
+ {
+ elog(DEBUG3, "discarding connection %p", entry->conn);
+ PQfinish(entry->conn);
+ entry->conn = NULL;
+ }
+ }
+
+ /*
+ * Regardless of the event type, we can now mark ourselves as out of the
+ * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
+ * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
+ */
+ xact_got_connection = false;
+
+ /* Also reset cursor numbering for next transaction */
+ cursor_number = 0;
+}
+
+/*
+ * pgfdw_subxact_callback --- cleanup at subtransaction end.
+ */
+static void
+pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
+ SubTransactionId parentSubid, void *arg)
+{
+ HASH_SEQ_STATUS scan;
+ ConnCacheEntry *entry;
+ int curlevel;
+
+ /* Nothing to do at subxact start, nor after commit. */
+ if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
+ event == SUBXACT_EVENT_ABORT_SUB))
+ return;
+
+ /* Quick exit if no connections were touched in this transaction. */
+ if (!xact_got_connection)
+ return;
+
+ /*
+ * Scan all connection cache entries to find open remote subtransactions
+ * of the current level, and close them.
+ */
+ curlevel = GetCurrentTransactionNestLevel();
+ hash_seq_init(&scan, ConnectionHash);
+ while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+ {
+ PGresult *res;
+ char sql[100];
+
+ /*
+ * We only care about connections with open remote subtransactions of
+ * the current level.
+ */
+ if (entry->conn == NULL || entry->xact_depth < curlevel)
+ continue;
+
+ if (entry->xact_depth > curlevel)
+ elog(ERROR, "missed cleaning up remote subtransaction at level %d",
+ entry->xact_depth);
+
+ if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
+ {
+ /* Commit all remote subtransactions during pre-commit */
+ snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
+ res = PQexec(entry->conn, sql);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pgfdw_report_error(ERROR, res, true, sql);
+ PQclear(res);
+ }
+ else
+ {
+ /* Rollback all remote subtransactions during abort */
+ snprintf(sql, sizeof(sql),
+ "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
+ curlevel, curlevel);
+ res = PQexec(entry->conn, sql);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pgfdw_report_error(WARNING, res, true, sql);
+ else
+ PQclear(res);
+ }
+
+ /* OK, we're outta that level of subtransaction */
+ entry->xact_depth--;
+ }
+}