aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoe Conway <mail@joeconway.com>2005-10-18 02:55:49 +0000
committerJoe Conway <mail@joeconway.com>2005-10-18 02:55:49 +0000
commit056eb1412c864c61b26eb8d540fd92636795f67d (patch)
treea69c1aaadadee895c41e7b8c04bbb9a6c4d3dd98
parentc62b29a6034b1cb16abef7012f475fbe6fedc3d5 (diff)
downloadpostgresql-056eb1412c864c61b26eb8d540fd92636795f67d.tar.gz
postgresql-056eb1412c864c61b26eb8d540fd92636795f67d.zip
When a cursor is opened using dblink_open, only start a transaction
if there isn't one already open. Upon dblink_close, only commit the open transaction if it was started by dblink_open, and only then when all cursors opened by dblink_open are closed. The transaction accounting is done individually for all named connections, plus the persistent unnamed connection.
-rw-r--r--contrib/dblink/dblink.c114
-rw-r--r--contrib/dblink/expected/dblink.out82
-rw-r--r--contrib/dblink/sql/dblink.sql36
3 files changed, 193 insertions, 39 deletions
diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index c11b15860a3..54e787bb206 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -60,9 +60,9 @@
typedef struct remoteConn
{
- PGconn *conn; /* Hold the remote connection */
- int autoXactCursors;/* Indicates the number of open cursors,
- * non-zero means we opened the xact ourselves */
+ PGconn *conn; /* Hold the remote connection */
+ int openCursorCount; /* The number of open cursors */
+ bool newXactForCursor; /* Opened a transaction for a cursor */
} remoteConn;
/*
@@ -84,10 +84,8 @@ static Oid get_relid_from_relname(text *relname_text);
static char *generate_relation_name(Oid relid);
/* Global */
-List *res_id = NIL;
-int res_id_index = 0;
-PGconn *persistent_conn = NULL;
-static HTAB *remoteConnHash = NULL;
+static remoteConn *pconn = NULL;
+static HTAB *remoteConnHash = NULL;
/*
* Following is list that holds multiple remote connections.
@@ -184,6 +182,16 @@ typedef struct remoteConnHashEnt
} \
} while (0)
+#define DBLINK_INIT \
+ do { \
+ if (!pconn) \
+ { \
+ pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn)); \
+ pconn->conn = NULL; \
+ pconn->openCursorCount = 0; \
+ pconn->newXactForCursor = FALSE; \
+ } \
+ } while (0)
/*
* Create a persistent connection to another database
@@ -199,6 +207,8 @@ dblink_connect(PG_FUNCTION_ARGS)
PGconn *conn = NULL;
remoteConn *rconn = NULL;
+ DBLINK_INIT;
+
if (PG_NARGS() == 2)
{
connstr = GET_STR(PG_GETARG_TEXT_P(1));
@@ -234,7 +244,7 @@ dblink_connect(PG_FUNCTION_ARGS)
createNewConnection(connname, rconn);
}
else
- persistent_conn = conn;
+ pconn->conn = conn;
PG_RETURN_TEXT_P(GET_TEXT("OK"));
}
@@ -250,6 +260,8 @@ dblink_disconnect(PG_FUNCTION_ARGS)
remoteConn *rconn = NULL;
PGconn *conn = NULL;
+ DBLINK_INIT;
+
if (PG_NARGS() == 1)
{
conname = GET_STR(PG_GETARG_TEXT_P(0));
@@ -258,7 +270,7 @@ dblink_disconnect(PG_FUNCTION_ARGS)
conn = rconn->conn;
}
else
- conn = persistent_conn;
+ conn = pconn->conn;
if (!conn)
DBLINK_CONN_NOT_AVAIL;
@@ -270,7 +282,7 @@ dblink_disconnect(PG_FUNCTION_ARGS)
pfree(rconn);
}
else
- persistent_conn = NULL;
+ pconn->conn = NULL;
PG_RETURN_TEXT_P(GET_TEXT("OK"));
}
@@ -292,12 +304,14 @@ dblink_open(PG_FUNCTION_ARGS)
remoteConn *rconn = NULL;
bool fail = true; /* default to backward compatible behavior */
+ DBLINK_INIT;
+
if (PG_NARGS() == 2)
{
/* text,text */
curname = GET_STR(PG_GETARG_TEXT_P(0));
sql = GET_STR(PG_GETARG_TEXT_P(1));
- conn = persistent_conn;
+ rconn = pconn;
}
else if (PG_NARGS() == 3)
{
@@ -307,7 +321,7 @@ dblink_open(PG_FUNCTION_ARGS)
curname = GET_STR(PG_GETARG_TEXT_P(0));
sql = GET_STR(PG_GETARG_TEXT_P(1));
fail = PG_GETARG_BOOL(2);
- conn = persistent_conn;
+ rconn = pconn;
}
else
{
@@ -315,8 +329,6 @@ dblink_open(PG_FUNCTION_ARGS)
curname = GET_STR(PG_GETARG_TEXT_P(1));
sql = GET_STR(PG_GETARG_TEXT_P(2));
rconn = getConnectionByName(conname);
- if (rconn)
- conn = rconn->conn;
}
}
else if (PG_NARGS() == 4)
@@ -327,18 +339,26 @@ dblink_open(PG_FUNCTION_ARGS)
sql = GET_STR(PG_GETARG_TEXT_P(2));
fail = PG_GETARG_BOOL(3);
rconn = getConnectionByName(conname);
- if (rconn)
- conn = rconn->conn;
}
- if (!conn)
+ if (!rconn || !rconn->conn)
DBLINK_CONN_NOT_AVAIL;
+ else
+ conn = rconn->conn;
- res = PQexec(conn, "BEGIN");
- if (PQresultStatus(res) != PGRES_COMMAND_OK)
- DBLINK_RES_INTERNALERROR("begin error");
+ /* If we are not in a transaction, start one */
+ if (PQtransactionStatus(conn) == PQTRANS_IDLE)
+ {
+ res = PQexec(conn, "BEGIN");
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ DBLINK_RES_INTERNALERROR("begin error");
+ PQclear(res);
+ rconn->newXactForCursor = TRUE;
+ }
- PQclear(res);
+ /* if we started a transaction, increment cursor count */
+ if (rconn->newXactForCursor)
+ (rconn->openCursorCount)++;
appendStringInfo(str, "DECLARE %s CURSOR FOR %s", curname, sql);
res = PQexec(conn, str->data);
@@ -373,11 +393,13 @@ dblink_close(PG_FUNCTION_ARGS)
remoteConn *rconn = NULL;
bool fail = true; /* default to backward compatible behavior */
+ DBLINK_INIT;
+
if (PG_NARGS() == 1)
{
/* text */
curname = GET_STR(PG_GETARG_TEXT_P(0));
- conn = persistent_conn;
+ rconn = pconn;
}
else if (PG_NARGS() == 2)
{
@@ -386,15 +408,13 @@ dblink_close(PG_FUNCTION_ARGS)
{
curname = GET_STR(PG_GETARG_TEXT_P(0));
fail = PG_GETARG_BOOL(1);
- conn = persistent_conn;
+ rconn = pconn;
}
else
{
conname = GET_STR(PG_GETARG_TEXT_P(0));
curname = GET_STR(PG_GETARG_TEXT_P(1));
rconn = getConnectionByName(conname);
- if (rconn)
- conn = rconn->conn;
}
}
if (PG_NARGS() == 3)
@@ -404,12 +424,12 @@ dblink_close(PG_FUNCTION_ARGS)
curname = GET_STR(PG_GETARG_TEXT_P(1));
fail = PG_GETARG_BOOL(2);
rconn = getConnectionByName(conname);
- if (rconn)
- conn = rconn->conn;
}
- if (!conn)
+ if (!rconn || !rconn->conn)
DBLINK_CONN_NOT_AVAIL;
+ else
+ conn = rconn->conn;
appendStringInfo(str, "CLOSE %s", curname);
@@ -428,12 +448,22 @@ dblink_close(PG_FUNCTION_ARGS)
PQclear(res);
- /* commit the transaction */
- res = PQexec(conn, "COMMIT");
- if (PQresultStatus(res) != PGRES_COMMAND_OK)
- DBLINK_RES_INTERNALERROR("commit error");
+ /* if we started a transaction, decrement cursor count */
+ if (rconn->newXactForCursor)
+ {
+ (rconn->openCursorCount)--;
- PQclear(res);
+ /* if count is zero, commit the transaction */
+ if (rconn->openCursorCount == 0)
+ {
+ rconn->newXactForCursor = FALSE;
+
+ res = PQexec(conn, "COMMIT");
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ DBLINK_RES_INTERNALERROR("commit error");
+ PQclear(res);
+ }
+ }
PG_RETURN_TEXT_P(GET_TEXT("OK"));
}
@@ -456,6 +486,8 @@ dblink_fetch(PG_FUNCTION_ARGS)
char *conname = NULL;
remoteConn *rconn = NULL;
+ DBLINK_INIT;
+
/* stuff done only on the first call of the function */
if (SRF_IS_FIRSTCALL())
{
@@ -485,7 +517,7 @@ dblink_fetch(PG_FUNCTION_ARGS)
curname = GET_STR(PG_GETARG_TEXT_P(0));
howmany = PG_GETARG_INT32(1);
fail = PG_GETARG_BOOL(2);
- conn = persistent_conn;
+ conn = pconn->conn;
}
else
{
@@ -503,7 +535,7 @@ dblink_fetch(PG_FUNCTION_ARGS)
/* text,int */
curname = GET_STR(PG_GETARG_TEXT_P(0));
howmany = PG_GETARG_INT32(1);
- conn = persistent_conn;
+ conn = pconn->conn;
}
if (!conn)
@@ -648,6 +680,8 @@ dblink_record(PG_FUNCTION_ARGS)
MemoryContext oldcontext;
bool freeconn = false;
+ DBLINK_INIT;
+
/* stuff done only on the first call of the function */
if (SRF_IS_FIRSTCALL())
{
@@ -678,7 +712,7 @@ dblink_record(PG_FUNCTION_ARGS)
/* text,text or text,bool */
if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
{
- conn = persistent_conn;
+ conn = pconn->conn;
sql = GET_STR(PG_GETARG_TEXT_P(0));
fail = PG_GETARG_BOOL(1);
}
@@ -691,7 +725,7 @@ dblink_record(PG_FUNCTION_ARGS)
else if (PG_NARGS() == 1)
{
/* text */
- conn = persistent_conn;
+ conn = pconn->conn;
sql = GET_STR(PG_GETARG_TEXT_P(0));
}
else
@@ -857,6 +891,8 @@ dblink_exec(PG_FUNCTION_ARGS)
bool freeconn = false;
bool fail = true; /* default to backward compatible behavior */
+ DBLINK_INIT;
+
if (PG_NARGS() == 3)
{
/* must be text,text,bool */
@@ -869,7 +905,7 @@ dblink_exec(PG_FUNCTION_ARGS)
/* might be text,text or text,bool */
if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
{
- conn = persistent_conn;
+ conn = pconn->conn;
sql = GET_STR(PG_GETARG_TEXT_P(0));
fail = PG_GETARG_BOOL(1);
}
@@ -882,7 +918,7 @@ dblink_exec(PG_FUNCTION_ARGS)
else if (PG_NARGS() == 1)
{
/* must be single text argument */
- conn = persistent_conn;
+ conn = pconn->conn;
sql = GET_STR(PG_GETARG_TEXT_P(0));
}
else
diff --git a/contrib/dblink/expected/dblink.out b/contrib/dblink/expected/dblink.out
index cd2d4b18450..55d2b9e7c4c 100644
--- a/contrib/dblink/expected/dblink.out
+++ b/contrib/dblink/expected/dblink.out
@@ -436,6 +436,88 @@ SELECT dblink_exec('myconn','ABORT');
ROLLBACK
(1 row)
+-- test opening cursor in a transaction
+SELECT dblink_exec('myconn','BEGIN');
+ dblink_exec
+-------------
+ BEGIN
+(1 row)
+
+-- an open transaction will prevent dblink_open() from opening its own
+SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
+ dblink_open
+-------------
+ OK
+(1 row)
+
+-- this should not commit the transaction because the client opened it
+SELECT dblink_close('myconn','rmt_foo_cursor');
+ dblink_close
+--------------
+ OK
+(1 row)
+
+-- this should succeed because we have an open transaction
+SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+ dblink_exec
+----------------
+ DECLARE CURSOR
+(1 row)
+
+-- commit remote transaction
+SELECT dblink_exec('myconn','COMMIT');
+ dblink_exec
+-------------
+ COMMIT
+(1 row)
+
+-- test automatic transactions for multiple cursor opens
+SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
+ dblink_open
+-------------
+ OK
+(1 row)
+
+-- the second cursor
+SELECT dblink_open('myconn','rmt_foo_cursor2','SELECT * FROM foo');
+ dblink_open
+-------------
+ OK
+(1 row)
+
+-- this should not commit the transaction
+SELECT dblink_close('myconn','rmt_foo_cursor2');
+ dblink_close
+--------------
+ OK
+(1 row)
+
+-- this should succeed because we have an open transaction
+SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+ dblink_exec
+----------------
+ DECLARE CURSOR
+(1 row)
+
+-- this should commit the transaction
+SELECT dblink_close('myconn','rmt_foo_cursor');
+ dblink_close
+--------------
+ OK
+(1 row)
+
+-- this should fail because there is no open transaction
+SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+ERROR: sql error
+DETAIL: ERROR: cursor "xact_test" already exists
+
+-- reset remote transaction state
+SELECT dblink_exec('myconn','ABORT');
+ dblink_exec
+-------------
+ ROLLBACK
+(1 row)
+
-- open a cursor
SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
dblink_open
diff --git a/contrib/dblink/sql/dblink.sql b/contrib/dblink/sql/dblink.sql
index db9dd6582fd..66e2607cfee 100644
--- a/contrib/dblink/sql/dblink.sql
+++ b/contrib/dblink/sql/dblink.sql
@@ -217,6 +217,42 @@ SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foobar',false);
-- reset remote transaction state
SELECT dblink_exec('myconn','ABORT');
+-- test opening cursor in a transaction
+SELECT dblink_exec('myconn','BEGIN');
+
+-- an open transaction will prevent dblink_open() from opening its own
+SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
+
+-- this should not commit the transaction because the client opened it
+SELECT dblink_close('myconn','rmt_foo_cursor');
+
+-- this should succeed because we have an open transaction
+SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+
+-- commit remote transaction
+SELECT dblink_exec('myconn','COMMIT');
+
+-- test automatic transactions for multiple cursor opens
+SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
+
+-- the second cursor
+SELECT dblink_open('myconn','rmt_foo_cursor2','SELECT * FROM foo');
+
+-- this should not commit the transaction
+SELECT dblink_close('myconn','rmt_foo_cursor2');
+
+-- this should succeed because we have an open transaction
+SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+
+-- this should commit the transaction
+SELECT dblink_close('myconn','rmt_foo_cursor');
+
+-- this should fail because there is no open transaction
+SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+
+-- reset remote transaction state
+SELECT dblink_exec('myconn','ABORT');
+
-- open a cursor
SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');