aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorPeter Eisentraut <peter_e@gmx.net>2011-12-05 19:52:15 +0200
committerPeter Eisentraut <peter_e@gmx.net>2011-12-05 19:52:15 +0200
commit89e850e6fda9e4e441712012abe971fe938d595a (patch)
tree4ad184525069fbd25f89879c2613403f2212d869 /src
parente6d9e2106f0dda459063126d07967df197b7b5fe (diff)
downloadpostgresql-89e850e6fda9e4e441712012abe971fe938d595a.tar.gz
postgresql-89e850e6fda9e4e441712012abe971fe938d595a.zip
plpython: Add SPI cursor support
Add a function plpy.cursor that is similar to plpy.execute but uses an SPI cursor to avoid fetching the entire result set into memory. Jan UrbaƄski, reviewed by Steve Singer
Diffstat (limited to 'src')
-rw-r--r--src/pl/plpython/expected/plpython_spi.out151
-rw-r--r--src/pl/plpython/expected/plpython_subtransaction.out66
-rw-r--r--src/pl/plpython/expected/plpython_subtransaction_0.out70
-rw-r--r--src/pl/plpython/expected/plpython_subtransaction_5.out70
-rw-r--r--src/pl/plpython/expected/plpython_test.out6
-rw-r--r--src/pl/plpython/plpython.c642
-rw-r--r--src/pl/plpython/sql/plpython_spi.sql116
-rw-r--r--src/pl/plpython/sql/plpython_subtransaction.sql52
8 files changed, 1170 insertions, 3 deletions
diff --git a/src/pl/plpython/expected/plpython_spi.out b/src/pl/plpython/expected/plpython_spi.out
index 7f4ae5ca997..3b4d7a30105 100644
--- a/src/pl/plpython/expected/plpython_spi.out
+++ b/src/pl/plpython/expected/plpython_spi.out
@@ -133,3 +133,154 @@ CONTEXT: PL/Python function "result_nrows_test"
2
(1 row)
+-- cursor objects
+CREATE FUNCTION simple_cursor_test() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users")
+does = 0
+for row in res:
+ if row['lname'] == 'doe':
+ does += 1
+return does
+$$ LANGUAGE plpythonu;
+CREATE FUNCTION double_cursor_close() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users")
+res.close()
+res.close()
+$$ LANGUAGE plpythonu;
+CREATE FUNCTION cursor_fetch() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users")
+assert len(res.fetch(3)) == 3
+assert len(res.fetch(3)) == 1
+assert len(res.fetch(3)) == 0
+assert len(res.fetch(3)) == 0
+try:
+ # use next() or __next__(), the method name changed in
+ # http://www.python.org/dev/peps/pep-3114/
+ try:
+ res.next()
+ except AttributeError:
+ res.__next__()
+except StopIteration:
+ pass
+else:
+ assert False, "StopIteration not raised"
+$$ LANGUAGE plpythonu;
+CREATE FUNCTION cursor_mix_next_and_fetch() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users order by fname")
+assert len(res.fetch(2)) == 2
+
+item = None
+try:
+ item = res.next()
+except AttributeError:
+ item = res.__next__()
+assert item['fname'] == 'rick'
+
+assert len(res.fetch(2)) == 1
+$$ LANGUAGE plpythonu;
+CREATE FUNCTION fetch_after_close() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users")
+res.close()
+try:
+ res.fetch(1)
+except ValueError:
+ pass
+else:
+ assert False, "ValueError not raised"
+$$ LANGUAGE plpythonu;
+CREATE FUNCTION next_after_close() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users")
+res.close()
+try:
+ try:
+ res.next()
+ except AttributeError:
+ res.__next__()
+except ValueError:
+ pass
+else:
+ assert False, "ValueError not raised"
+$$ LANGUAGE plpythonu;
+CREATE FUNCTION cursor_fetch_next_empty() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users where false")
+assert len(res.fetch(1)) == 0
+try:
+ try:
+ res.next()
+ except AttributeError:
+ res.__next__()
+except StopIteration:
+ pass
+else:
+ assert False, "StopIteration not raised"
+$$ LANGUAGE plpythonu;
+CREATE FUNCTION cursor_plan() RETURNS SETOF text AS $$
+plan = plpy.prepare(
+ "select fname, lname from users where fname like $1 || '%' order by fname",
+ ["text"])
+for row in plpy.cursor(plan, ["w"]):
+ yield row['fname']
+for row in plpy.cursor(plan, ["j"]):
+ yield row['fname']
+$$ LANGUAGE plpythonu;
+CREATE FUNCTION cursor_plan_wrong_args() RETURNS SETOF text AS $$
+plan = plpy.prepare("select fname, lname from users where fname like $1 || '%'",
+ ["text"])
+c = plpy.cursor(plan, ["a", "b"])
+$$ LANGUAGE plpythonu;
+SELECT simple_cursor_test();
+ simple_cursor_test
+--------------------
+ 3
+(1 row)
+
+SELECT double_cursor_close();
+ double_cursor_close
+---------------------
+
+(1 row)
+
+SELECT cursor_fetch();
+ cursor_fetch
+--------------
+
+(1 row)
+
+SELECT cursor_mix_next_and_fetch();
+ cursor_mix_next_and_fetch
+---------------------------
+
+(1 row)
+
+SELECT fetch_after_close();
+ fetch_after_close
+-------------------
+
+(1 row)
+
+SELECT next_after_close();
+ next_after_close
+------------------
+
+(1 row)
+
+SELECT cursor_fetch_next_empty();
+ cursor_fetch_next_empty
+-------------------------
+
+(1 row)
+
+SELECT cursor_plan();
+ cursor_plan
+-------------
+ willem
+ jane
+ john
+(3 rows)
+
+SELECT cursor_plan_wrong_args();
+ERROR: TypeError: Expected sequence of 1 argument, got 2: ['a', 'b']
+CONTEXT: Traceback (most recent call last):
+ PL/Python function "cursor_plan_wrong_args", line 4, in <module>
+ c = plpy.cursor(plan, ["a", "b"])
+PL/Python function "cursor_plan_wrong_args"
diff --git a/src/pl/plpython/expected/plpython_subtransaction.out b/src/pl/plpython/expected/plpython_subtransaction.out
index 515b0bb7344..c2c22f0ae41 100644
--- a/src/pl/plpython/expected/plpython_subtransaction.out
+++ b/src/pl/plpython/expected/plpython_subtransaction.out
@@ -409,3 +409,69 @@ SELECT * FROM subtransaction_tbl;
(1 row)
DROP TABLE subtransaction_tbl;
+-- cursor/subtransactions interactions
+CREATE FUNCTION cursor_in_subxact() RETURNS int AS $$
+with plpy.subtransaction():
+ cur = plpy.cursor("select * from generate_series(1, 20) as gen(i)")
+ cur.fetch(10)
+fetched = cur.fetch(10);
+return int(fetched[5]["i"])
+$$ LANGUAGE plpythonu;
+CREATE FUNCTION cursor_aborted_subxact() RETURNS int AS $$
+try:
+ with plpy.subtransaction():
+ cur = plpy.cursor("select * from generate_series(1, 20) as gen(i)")
+ cur.fetch(10);
+ plpy.execute("select no_such_function()")
+except plpy.SPIError:
+ fetched = cur.fetch(10)
+ return int(fetched[5]["i"])
+return 0 # not reached
+$$ LANGUAGE plpythonu;
+CREATE FUNCTION cursor_plan_aborted_subxact() RETURNS int AS $$
+try:
+ with plpy.subtransaction():
+ plpy.execute('create temporary table tmp(i) '
+ 'as select generate_series(1, 10)')
+ plan = plpy.prepare("select i from tmp")
+ cur = plpy.cursor(plan)
+ plpy.execute("select no_such_function()")
+except plpy.SPIError:
+ fetched = cur.fetch(5)
+ return fetched[2]["i"]
+return 0 # not reached
+$$ LANGUAGE plpythonu;
+CREATE FUNCTION cursor_close_aborted_subxact() RETURNS boolean AS $$
+try:
+ with plpy.subtransaction():
+ cur = plpy.cursor('select 1')
+ plpy.execute("select no_such_function()")
+except plpy.SPIError:
+ cur.close()
+ return True
+return False # not reached
+$$ LANGUAGE plpythonu;
+SELECT cursor_in_subxact();
+ cursor_in_subxact
+-------------------
+ 16
+(1 row)
+
+SELECT cursor_aborted_subxact();
+ERROR: ValueError: iterating a cursor in an aborted subtransaction
+CONTEXT: Traceback (most recent call last):
+ PL/Python function "cursor_aborted_subxact", line 8, in <module>
+ fetched = cur.fetch(10)
+PL/Python function "cursor_aborted_subxact"
+SELECT cursor_plan_aborted_subxact();
+ERROR: ValueError: iterating a cursor in an aborted subtransaction
+CONTEXT: Traceback (most recent call last):
+ PL/Python function "cursor_plan_aborted_subxact", line 10, in <module>
+ fetched = cur.fetch(5)
+PL/Python function "cursor_plan_aborted_subxact"
+SELECT cursor_close_aborted_subxact();
+ERROR: ValueError: closing a cursor in an aborted subtransaction
+CONTEXT: Traceback (most recent call last):
+ PL/Python function "cursor_close_aborted_subxact", line 7, in <module>
+ cur.close()
+PL/Python function "cursor_close_aborted_subxact"
diff --git a/src/pl/plpython/expected/plpython_subtransaction_0.out b/src/pl/plpython/expected/plpython_subtransaction_0.out
index 4017c41edc9..ece0134a946 100644
--- a/src/pl/plpython/expected/plpython_subtransaction_0.out
+++ b/src/pl/plpython/expected/plpython_subtransaction_0.out
@@ -382,3 +382,73 @@ SELECT * FROM subtransaction_tbl;
(0 rows)
DROP TABLE subtransaction_tbl;
+-- cursor/subtransactions interactions
+CREATE FUNCTION cursor_in_subxact() RETURNS int AS $$
+with plpy.subtransaction():
+ cur = plpy.cursor("select * from generate_series(1, 20) as gen(i)")
+ cur.fetch(10)
+fetched = cur.fetch(10);
+return int(fetched[5]["i"])
+$$ LANGUAGE plpythonu;
+ERROR: could not compile PL/Python function "cursor_in_subxact"
+DETAIL: SyntaxError: invalid syntax (line 3)
+CREATE FUNCTION cursor_aborted_subxact() RETURNS int AS $$
+try:
+ with plpy.subtransaction():
+ cur = plpy.cursor("select * from generate_series(1, 20) as gen(i)")
+ cur.fetch(10);
+ plpy.execute("select no_such_function()")
+except plpy.SPIError:
+ fetched = cur.fetch(10)
+ return int(fetched[5]["i"])
+return 0 # not reached
+$$ LANGUAGE plpythonu;
+ERROR: could not compile PL/Python function "cursor_aborted_subxact"
+DETAIL: SyntaxError: invalid syntax (line 4)
+CREATE FUNCTION cursor_plan_aborted_subxact() RETURNS int AS $$
+try:
+ with plpy.subtransaction():
+ plpy.execute('create temporary table tmp(i) '
+ 'as select generate_series(1, 10)')
+ plan = plpy.prepare("select i from tmp")
+ cur = plpy.cursor(plan)
+ plpy.execute("select no_such_function()")
+except plpy.SPIError:
+ fetched = cur.fetch(5)
+ return fetched[2]["i"]
+return 0 # not reached
+$$ LANGUAGE plpythonu;
+ERROR: could not compile PL/Python function "cursor_plan_aborted_subxact"
+DETAIL: SyntaxError: invalid syntax (line 4)
+CREATE FUNCTION cursor_close_aborted_subxact() RETURNS boolean AS $$
+try:
+ with plpy.subtransaction():
+ cur = plpy.cursor('select 1')
+ plpy.execute("select no_such_function()")
+except plpy.SPIError:
+ cur.close()
+ return True
+return False # not reached
+$$ LANGUAGE plpythonu;
+ERROR: could not compile PL/Python function "cursor_close_aborted_subxact"
+DETAIL: SyntaxError: invalid syntax (line 4)
+SELECT cursor_in_subxact();
+ERROR: function cursor_in_subxact() does not exist
+LINE 1: SELECT cursor_in_subxact();
+ ^
+HINT: No function matches the given name and argument types. You might need to add explicit type casts.
+SELECT cursor_aborted_subxact();
+ERROR: function cursor_aborted_subxact() does not exist
+LINE 1: SELECT cursor_aborted_subxact();
+ ^
+HINT: No function matches the given name and argument types. You might need to add explicit type casts.
+SELECT cursor_plan_aborted_subxact();
+ERROR: function cursor_plan_aborted_subxact() does not exist
+LINE 1: SELECT cursor_plan_aborted_subxact();
+ ^
+HINT: No function matches the given name and argument types. You might need to add explicit type casts.
+SELECT cursor_close_aborted_subxact();
+ERROR: function cursor_close_aborted_subxact() does not exist
+LINE 1: SELECT cursor_close_aborted_subxact();
+ ^
+HINT: No function matches the given name and argument types. You might need to add explicit type casts.
diff --git a/src/pl/plpython/expected/plpython_subtransaction_5.out b/src/pl/plpython/expected/plpython_subtransaction_5.out
index 9216151b94e..66de2394990 100644
--- a/src/pl/plpython/expected/plpython_subtransaction_5.out
+++ b/src/pl/plpython/expected/plpython_subtransaction_5.out
@@ -382,3 +382,73 @@ SELECT * FROM subtransaction_tbl;
(0 rows)
DROP TABLE subtransaction_tbl;
+-- cursor/subtransactions interactions
+CREATE FUNCTION cursor_in_subxact() RETURNS int AS $$
+with plpy.subtransaction():
+ cur = plpy.cursor("select * from generate_series(1, 20) as gen(i)")
+ cur.fetch(10)
+fetched = cur.fetch(10);
+return int(fetched[5]["i"])
+$$ LANGUAGE plpythonu;
+ERROR: could not compile PL/Python function "cursor_in_subxact"
+DETAIL: SyntaxError: invalid syntax (<string>, line 3)
+CREATE FUNCTION cursor_aborted_subxact() RETURNS int AS $$
+try:
+ with plpy.subtransaction():
+ cur = plpy.cursor("select * from generate_series(1, 20) as gen(i)")
+ cur.fetch(10);
+ plpy.execute("select no_such_function()")
+except plpy.SPIError:
+ fetched = cur.fetch(10)
+ return int(fetched[5]["i"])
+return 0 # not reached
+$$ LANGUAGE plpythonu;
+ERROR: could not compile PL/Python function "cursor_aborted_subxact"
+DETAIL: SyntaxError: invalid syntax (<string>, line 4)
+CREATE FUNCTION cursor_plan_aborted_subxact() RETURNS int AS $$
+try:
+ with plpy.subtransaction():
+ plpy.execute('create temporary table tmp(i) '
+ 'as select generate_series(1, 10)')
+ plan = plpy.prepare("select i from tmp")
+ cur = plpy.cursor(plan)
+ plpy.execute("select no_such_function()")
+except plpy.SPIError:
+ fetched = cur.fetch(5)
+ return fetched[2]["i"]
+return 0 # not reached
+$$ LANGUAGE plpythonu;
+ERROR: could not compile PL/Python function "cursor_plan_aborted_subxact"
+DETAIL: SyntaxError: invalid syntax (<string>, line 4)
+CREATE FUNCTION cursor_close_aborted_subxact() RETURNS boolean AS $$
+try:
+ with plpy.subtransaction():
+ cur = plpy.cursor('select 1')
+ plpy.execute("select no_such_function()")
+except plpy.SPIError:
+ cur.close()
+ return True
+return False # not reached
+$$ LANGUAGE plpythonu;
+ERROR: could not compile PL/Python function "cursor_close_aborted_subxact"
+DETAIL: SyntaxError: invalid syntax (<string>, line 4)
+SELECT cursor_in_subxact();
+ERROR: function cursor_in_subxact() does not exist
+LINE 1: SELECT cursor_in_subxact();
+ ^
+HINT: No function matches the given name and argument types. You might need to add explicit type casts.
+SELECT cursor_aborted_subxact();
+ERROR: function cursor_aborted_subxact() does not exist
+LINE 1: SELECT cursor_aborted_subxact();
+ ^
+HINT: No function matches the given name and argument types. You might need to add explicit type casts.
+SELECT cursor_plan_aborted_subxact();
+ERROR: function cursor_plan_aborted_subxact() does not exist
+LINE 1: SELECT cursor_plan_aborted_subxact();
+ ^
+HINT: No function matches the given name and argument types. You might need to add explicit type casts.
+SELECT cursor_close_aborted_subxact();
+ERROR: function cursor_close_aborted_subxact() does not exist
+LINE 1: SELECT cursor_close_aborted_subxact();
+ ^
+HINT: No function matches the given name and argument types. You might need to add explicit type casts.
diff --git a/src/pl/plpython/expected/plpython_test.out b/src/pl/plpython/expected/plpython_test.out
index f2dda66532e..a884fc0e27f 100644
--- a/src/pl/plpython/expected/plpython_test.out
+++ b/src/pl/plpython/expected/plpython_test.out
@@ -43,9 +43,9 @@ contents.sort()
return ", ".join(contents)
$$ LANGUAGE plpythonu;
select module_contents();
- module_contents
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------
- Error, Fatal, SPIError, debug, error, execute, fatal, info, log, notice, prepare, quote_ident, quote_literal, quote_nullable, spiexceptions, subtransaction, warning
+ module_contents
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Error, Fatal, SPIError, cursor, debug, error, execute, fatal, info, log, notice, prepare, quote_ident, quote_literal, quote_nullable, spiexceptions, subtransaction, warning
(1 row)
CREATE FUNCTION elog_test() RETURNS void
diff --git a/src/pl/plpython/plpython.c b/src/pl/plpython/plpython.c
index afd5dfce83a..29e0ac7c454 100644
--- a/src/pl/plpython/plpython.c
+++ b/src/pl/plpython/plpython.c
@@ -134,6 +134,11 @@ typedef int Py_ssize_t;
PyObject_HEAD_INIT(type) size,
#endif
+/* Python 3 removed the Py_TPFLAGS_HAVE_ITER flag */
+#if PY_MAJOR_VERSION >= 3
+#define Py_TPFLAGS_HAVE_ITER 0
+#endif
+
/* define our text domain for translations */
#undef TEXTDOMAIN
#define TEXTDOMAIN PG_TEXTDOMAIN("plpython")
@@ -310,6 +315,14 @@ typedef struct PLySubtransactionObject
bool exited;
} PLySubtransactionObject;
+typedef struct PLyCursorObject
+{
+ PyObject_HEAD
+ char *portalname;
+ PLyTypeInfo result;
+ bool closed;
+} PLyCursorObject;
+
/* A list of all known exceptions, generated from backend/utils/errcodes.txt */
typedef struct ExceptionMap
{
@@ -486,6 +499,10 @@ static char PLy_subtransaction_doc[] = {
"PostgreSQL subtransaction context manager"
};
+static char PLy_cursor_doc[] = {
+ "Wrapper around a PostgreSQL cursor"
+};
+
/*
* the function definitions
@@ -2963,6 +2980,14 @@ static void PLy_subtransaction_dealloc(PyObject *);
static PyObject *PLy_subtransaction_enter(PyObject *, PyObject *);
static PyObject *PLy_subtransaction_exit(PyObject *, PyObject *);
+static PyObject *PLy_cursor(PyObject *self, PyObject *unused);
+static PyObject *PLy_cursor_query(const char *query);
+static PyObject *PLy_cursor_plan(PyObject *ob, PyObject *args);
+static void PLy_cursor_dealloc(PyObject *arg);
+static PyObject *PLy_cursor_iternext(PyObject *self);
+static PyObject *PLy_cursor_fetch(PyObject *self, PyObject *args);
+static PyObject *PLy_cursor_close(PyObject *self, PyObject *unused);
+
static PyMethodDef PLy_plan_methods[] = {
{"status", PLy_plan_status, METH_VARARGS, NULL},
@@ -3099,6 +3124,47 @@ static PyTypeObject PLy_SubtransactionType = {
PLy_subtransaction_methods, /* tp_tpmethods */
};
+static PyMethodDef PLy_cursor_methods[] = {
+ {"fetch", PLy_cursor_fetch, METH_VARARGS, NULL},
+ {"close", PLy_cursor_close, METH_NOARGS, NULL},
+ {NULL, NULL, 0, NULL}
+};
+
+static PyTypeObject PLy_CursorType = {
+ PyVarObject_HEAD_INIT(NULL, 0)
+ "PLyCursor", /* tp_name */
+ sizeof(PLyCursorObject), /* tp_size */
+ 0, /* tp_itemsize */
+
+ /*
+ * methods
+ */
+ PLy_cursor_dealloc, /* tp_dealloc */
+ 0, /* tp_print */
+ 0, /* tp_getattr */
+ 0, /* tp_setattr */
+ 0, /* tp_compare */
+ 0, /* tp_repr */
+ 0, /* tp_as_number */
+ 0, /* tp_as_sequence */
+ 0, /* tp_as_mapping */
+ 0, /* tp_hash */
+ 0, /* tp_call */
+ 0, /* tp_str */
+ 0, /* tp_getattro */
+ 0, /* tp_setattro */
+ 0, /* tp_as_buffer */
+ Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | Py_TPFLAGS_HAVE_ITER, /* tp_flags */
+ PLy_cursor_doc, /* tp_doc */
+ 0, /* tp_traverse */
+ 0, /* tp_clear */
+ 0, /* tp_richcompare */
+ 0, /* tp_weaklistoffset */
+ PyObject_SelfIter, /* tp_iter */
+ PLy_cursor_iternext, /* tp_iternext */
+ PLy_cursor_methods, /* tp_tpmethods */
+};
+
static PyMethodDef PLy_methods[] = {
/*
* logging methods
@@ -3133,6 +3199,11 @@ static PyMethodDef PLy_methods[] = {
*/
{"subtransaction", PLy_subtransaction, METH_NOARGS, NULL},
+ /*
+ * create a cursor
+ */
+ {"cursor", PLy_cursor, METH_VARARGS, NULL},
+
{NULL, NULL, 0, NULL}
};
@@ -3833,6 +3904,575 @@ PLy_spi_execute_fetch_result(SPITupleTable *tuptable, int rows, int status)
return (PyObject *) result;
}
+/*
+ * c = plpy.cursor("select * from largetable")
+ * c = plpy.cursor(plan, [])
+ */
+static PyObject *
+PLy_cursor(PyObject *self, PyObject *args)
+{
+ char *query;
+ PyObject *plan;
+ PyObject *planargs = NULL;
+
+ if (PyArg_ParseTuple(args, "s", &query))
+ return PLy_cursor_query(query);
+
+ PyErr_Clear();
+
+ if (PyArg_ParseTuple(args, "O|O", &plan, &planargs))
+ return PLy_cursor_plan(plan, planargs);
+
+ PLy_exception_set(PLy_exc_error, "plpy.cursor expected a query or a plan");
+ return NULL;
+}
+
+
+static PyObject *
+PLy_cursor_query(const char *query)
+{
+ PLyCursorObject *cursor;
+ volatile MemoryContext oldcontext;
+ volatile ResourceOwner oldowner;
+
+ if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
+ return NULL;
+ cursor->portalname = NULL;
+ cursor->closed = false;
+ PLy_typeinfo_init(&cursor->result);
+
+ oldcontext = CurrentMemoryContext;
+ oldowner = CurrentResourceOwner;
+
+ BeginInternalSubTransaction(NULL);
+ MemoryContextSwitchTo(oldcontext);
+
+ PG_TRY();
+ {
+ SPIPlanPtr plan;
+ Portal portal;
+
+ pg_verifymbstr(query, strlen(query), false);
+
+ plan = SPI_prepare(query, 0, NULL);
+ if (plan == NULL)
+ elog(ERROR, "SPI_prepare failed: %s",
+ SPI_result_code_string(SPI_result));
+
+ portal = SPI_cursor_open(NULL, plan, NULL, NULL,
+ PLy_curr_procedure->fn_readonly);
+ SPI_freeplan(plan);
+
+ if (portal == NULL)
+ elog(ERROR, "SPI_cursor_open() failed:%s",
+ SPI_result_code_string(SPI_result));
+
+ cursor->portalname = PLy_strdup(portal->name);
+
+ /* Commit the inner transaction, return to outer xact context */
+ ReleaseCurrentSubTransaction();
+ MemoryContextSwitchTo(oldcontext);
+ CurrentResourceOwner = oldowner;
+
+ /*
+ * AtEOSubXact_SPI() should not have popped any SPI context, but just
+ * in case it did, make sure we remain connected.
+ */
+ SPI_restore_connection();
+ }
+ PG_CATCH();
+ {
+ ErrorData *edata;
+ PLyExceptionEntry *entry;
+ PyObject *exc;
+
+ /* Save error info */
+ MemoryContextSwitchTo(oldcontext);
+ edata = CopyErrorData();
+ FlushErrorState();
+
+ /* Abort the inner transaction */
+ RollbackAndReleaseCurrentSubTransaction();
+ MemoryContextSwitchTo(oldcontext);
+ CurrentResourceOwner = oldowner;
+
+ Py_DECREF(cursor);
+
+ /*
+ * If AtEOSubXact_SPI() popped any SPI context of the subxact, it will
+ * have left us in a disconnected state. We need this hack to return
+ * to connected state.
+ */
+ SPI_restore_connection();
+
+ /* Look up the correct exception */
+ entry = hash_search(PLy_spi_exceptions, &(edata->sqlerrcode),
+ HASH_FIND, NULL);
+ /* We really should find it, but just in case have a fallback */
+ Assert(entry != NULL);
+ exc = entry ? entry->exc : PLy_exc_spi_error;
+ /* Make Python raise the exception */
+ PLy_spi_exception_set(exc, edata);
+ return NULL;
+ }
+ PG_END_TRY();
+
+ Assert(cursor->portalname != NULL);
+ return (PyObject *) cursor;
+}
+
+static PyObject *
+PLy_cursor_plan(PyObject *ob, PyObject *args)
+{
+ PLyCursorObject *cursor;
+ volatile int nargs;
+ int i;
+ PLyPlanObject *plan;
+ volatile MemoryContext oldcontext;
+ volatile ResourceOwner oldowner;
+
+ if (args)
+ {
+ if (!PySequence_Check(args) || PyString_Check(args) || PyUnicode_Check(args))
+ {
+ PLy_exception_set(PyExc_TypeError, "plpy.cursor takes a sequence as its second argument");
+ return NULL;
+ }
+ nargs = PySequence_Length(args);
+ }
+ else
+ nargs = 0;
+
+ plan = (PLyPlanObject *) ob;
+
+ if (nargs != plan->nargs)
+ {
+ char *sv;
+ PyObject *so = PyObject_Str(args);
+
+ if (!so)
+ PLy_elog(ERROR, "could not execute plan");
+ sv = PyString_AsString(so);
+ PLy_exception_set_plural(PyExc_TypeError,
+ "Expected sequence of %d argument, got %d: %s",
+ "Expected sequence of %d arguments, got %d: %s",
+ plan->nargs,
+ plan->nargs, nargs, sv);
+ Py_DECREF(so);
+
+ return NULL;
+ }
+
+ if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
+ return NULL;
+ cursor->portalname = NULL;
+ cursor->closed = false;
+ PLy_typeinfo_init(&cursor->result);
+
+ oldcontext = CurrentMemoryContext;
+ oldowner = CurrentResourceOwner;
+
+ BeginInternalSubTransaction(NULL);
+ MemoryContextSwitchTo(oldcontext);
+
+ PG_TRY();
+ {
+ Portal portal;
+ char *volatile nulls;
+ volatile int j;
+
+ if (nargs > 0)
+ nulls = palloc(nargs * sizeof(char));
+ else
+ nulls = NULL;
+
+ for (j = 0; j < nargs; j++)
+ {
+ PyObject *elem;
+
+ elem = PySequence_GetItem(args, j);
+ if (elem != Py_None)
+ {
+ PG_TRY();
+ {
+ plan->values[j] =
+ plan->args[j].out.d.func(&(plan->args[j].out.d),
+ -1,
+ elem);
+ }
+ PG_CATCH();
+ {
+ Py_DECREF(elem);
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+
+ Py_DECREF(elem);
+ nulls[j] = ' ';
+ }
+ else
+ {
+ Py_DECREF(elem);
+ plan->values[j] =
+ InputFunctionCall(&(plan->args[j].out.d.typfunc),
+ NULL,
+ plan->args[j].out.d.typioparam,
+ -1);
+ nulls[j] = 'n';
+ }
+ }
+
+ portal = SPI_cursor_open(NULL, plan->plan, plan->values, nulls,
+ PLy_curr_procedure->fn_readonly);
+ if (portal == NULL)
+ elog(ERROR, "SPI_cursor_open() failed:%s",
+ SPI_result_code_string(SPI_result));
+
+ cursor->portalname = PLy_strdup(portal->name);
+
+ /* Commit the inner transaction, return to outer xact context */
+ ReleaseCurrentSubTransaction();
+ MemoryContextSwitchTo(oldcontext);
+ CurrentResourceOwner = oldowner;
+
+ /*
+ * AtEOSubXact_SPI() should not have popped any SPI context, but just
+ * in case it did, make sure we remain connected.
+ */
+ SPI_restore_connection();
+ }
+ PG_CATCH();
+ {
+ int k;
+ ErrorData *edata;
+ PLyExceptionEntry *entry;
+ PyObject *exc;
+
+ /* Save error info */
+ MemoryContextSwitchTo(oldcontext);
+ edata = CopyErrorData();
+ FlushErrorState();
+
+ /* cleanup plan->values array */
+ for (k = 0; k < nargs; k++)
+ {
+ if (!plan->args[k].out.d.typbyval &&
+ (plan->values[k] != PointerGetDatum(NULL)))
+ {
+ pfree(DatumGetPointer(plan->values[k]));
+ plan->values[k] = PointerGetDatum(NULL);
+ }
+ }
+
+ /* Abort the inner transaction */
+ RollbackAndReleaseCurrentSubTransaction();
+ MemoryContextSwitchTo(oldcontext);
+ CurrentResourceOwner = oldowner;
+
+ Py_DECREF(cursor);
+
+ /*
+ * If AtEOSubXact_SPI() popped any SPI context of the subxact, it will
+ * have left us in a disconnected state. We need this hack to return
+ * to connected state.
+ */
+ SPI_restore_connection();
+
+ /* Look up the correct exception */
+ entry = hash_search(PLy_spi_exceptions, &(edata->sqlerrcode),
+ HASH_FIND, NULL);
+ /* We really should find it, but just in case have a fallback */
+ Assert(entry != NULL);
+ exc = entry ? entry->exc : PLy_exc_spi_error;
+ /* Make Python raise the exception */
+ PLy_spi_exception_set(exc, edata);
+ return NULL;
+ }
+ PG_END_TRY();
+
+ for (i = 0; i < nargs; i++)
+ {
+ if (!plan->args[i].out.d.typbyval &&
+ (plan->values[i] != PointerGetDatum(NULL)))
+ {
+ pfree(DatumGetPointer(plan->values[i]));
+ plan->values[i] = PointerGetDatum(NULL);
+ }
+ }
+
+ Assert(cursor->portalname != NULL);
+ return (PyObject *) cursor;
+}
+
+static void
+PLy_cursor_dealloc(PyObject *arg)
+{
+ PLyCursorObject *cursor;
+ Portal portal;
+
+ cursor = (PLyCursorObject *) arg;
+
+ if (!cursor->closed)
+ {
+ portal = GetPortalByName(cursor->portalname);
+
+ if (PortalIsValid(portal))
+ SPI_cursor_close(portal);
+ }
+
+ PLy_free(cursor->portalname);
+ cursor->portalname = NULL;
+
+ PLy_typeinfo_dealloc(&cursor->result);
+ arg->ob_type->tp_free(arg);
+}
+
+static PyObject *
+PLy_cursor_iternext(PyObject *self)
+{
+ PLyCursorObject *cursor;
+ PyObject *ret;
+ volatile MemoryContext oldcontext;
+ volatile ResourceOwner oldowner;
+ Portal portal;
+
+ cursor = (PLyCursorObject *) self;
+
+ if (cursor->closed)
+ {
+ PLy_exception_set(PyExc_ValueError, "iterating a closed cursor");
+ return NULL;
+ }
+
+ portal = GetPortalByName(cursor->portalname);
+ if (!PortalIsValid(portal))
+ {
+ PLy_exception_set(PyExc_ValueError,
+ "iterating a cursor in an aborted subtransaction");
+ return NULL;
+ }
+
+ oldcontext = CurrentMemoryContext;
+ oldowner = CurrentResourceOwner;
+
+ BeginInternalSubTransaction(NULL);
+ MemoryContextSwitchTo(oldcontext);
+
+ PG_TRY();
+ {
+ SPI_cursor_fetch(portal, true, 1);
+ if (SPI_processed == 0)
+ {
+ PyErr_SetNone(PyExc_StopIteration);
+ ret = NULL;
+ }
+ else
+ {
+ if (cursor->result.is_rowtype != 1)
+ PLy_input_tuple_funcs(&cursor->result, SPI_tuptable->tupdesc);
+
+ ret = PLyDict_FromTuple(&cursor->result, SPI_tuptable->vals[0],
+ SPI_tuptable->tupdesc);
+ }
+
+ SPI_freetuptable(SPI_tuptable);
+
+ /* Commit the inner transaction, return to outer xact context */
+ ReleaseCurrentSubTransaction();
+ MemoryContextSwitchTo(oldcontext);
+ CurrentResourceOwner = oldowner;
+
+ /*
+ * AtEOSubXact_SPI() should not have popped any SPI context, but just
+ * in case it did, make sure we remain connected.
+ */
+ SPI_restore_connection();
+ }
+ PG_CATCH();
+ {
+ ErrorData *edata;
+ PLyExceptionEntry *entry;
+ PyObject *exc;
+
+ /* Save error info */
+ MemoryContextSwitchTo(oldcontext);
+ edata = CopyErrorData();
+ FlushErrorState();
+
+ /* Abort the inner transaction */
+ RollbackAndReleaseCurrentSubTransaction();
+ MemoryContextSwitchTo(oldcontext);
+ CurrentResourceOwner = oldowner;
+
+ SPI_freetuptable(SPI_tuptable);
+
+ /*
+ * If AtEOSubXact_SPI() popped any SPI context of the subxact, it will
+ * have left us in a disconnected state. We need this hack to return
+ * to connected state.
+ */
+ SPI_restore_connection();
+
+ /* Look up the correct exception */
+ entry = hash_search(PLy_spi_exceptions, &edata->sqlerrcode,
+ HASH_FIND, NULL);
+ /* We really should find it, but just in case have a fallback */
+ Assert(entry != NULL);
+ exc = entry ? entry->exc : PLy_exc_spi_error;
+ /* Make Python raise the exception */
+ PLy_spi_exception_set(exc, edata);
+ return NULL;
+ }
+ PG_END_TRY();
+
+ return ret;
+}
+
+static PyObject *
+PLy_cursor_fetch(PyObject *self, PyObject *args)
+{
+ PLyCursorObject *cursor;
+ int count;
+ PLyResultObject *ret;
+ volatile MemoryContext oldcontext;
+ volatile ResourceOwner oldowner;
+ Portal portal;
+
+ if (!PyArg_ParseTuple(args, "i", &count))
+ return NULL;
+
+ cursor = (PLyCursorObject *) self;
+
+ if (cursor->closed)
+ {
+ PLy_exception_set(PyExc_ValueError, "fetch from a closed cursor");
+ return NULL;
+ }
+
+ portal = GetPortalByName(cursor->portalname);
+ if (!PortalIsValid(portal))
+ {
+ PLy_exception_set(PyExc_ValueError,
+ "iterating a cursor in an aborted subtransaction");
+ return NULL;
+ }
+
+ ret = (PLyResultObject *) PLy_result_new();
+ if (ret == NULL)
+ return NULL;
+
+ oldcontext = CurrentMemoryContext;
+ oldowner = CurrentResourceOwner;
+
+ BeginInternalSubTransaction(NULL);
+ MemoryContextSwitchTo(oldcontext);
+
+ PG_TRY();
+ {
+ SPI_cursor_fetch(portal, true, count);
+
+ if (cursor->result.is_rowtype != 1)
+ PLy_input_tuple_funcs(&cursor->result, SPI_tuptable->tupdesc);
+
+ Py_DECREF(ret->status);
+ ret->status = PyInt_FromLong(SPI_OK_FETCH);
+
+ Py_DECREF(ret->nrows);
+ ret->nrows = PyInt_FromLong(SPI_processed);
+
+ if (SPI_processed != 0)
+ {
+ int i;
+
+ Py_DECREF(ret->rows);
+ ret->rows = PyList_New(SPI_processed);
+
+ for (i = 0; i < SPI_processed; i++)
+ {
+ PyObject *row = PLyDict_FromTuple(&cursor->result,
+ SPI_tuptable->vals[i],
+ SPI_tuptable->tupdesc);
+ PyList_SetItem(ret->rows, i, row);
+ }
+ }
+
+ SPI_freetuptable(SPI_tuptable);
+
+ /* Commit the inner transaction, return to outer xact context */
+ ReleaseCurrentSubTransaction();
+ MemoryContextSwitchTo(oldcontext);
+ CurrentResourceOwner = oldowner;
+
+ /*
+ * AtEOSubXact_SPI() should not have popped any SPI context, but just
+ * in case it did, make sure we remain connected.
+ */
+ SPI_restore_connection();
+ }
+ PG_CATCH();
+ {
+ ErrorData *edata;
+ PLyExceptionEntry *entry;
+ PyObject *exc;
+
+ /* Save error info */
+ MemoryContextSwitchTo(oldcontext);
+ edata = CopyErrorData();
+ FlushErrorState();
+
+ /* Abort the inner transaction */
+ RollbackAndReleaseCurrentSubTransaction();
+ MemoryContextSwitchTo(oldcontext);
+ CurrentResourceOwner = oldowner;
+
+ SPI_freetuptable(SPI_tuptable);
+
+ /*
+ * If AtEOSubXact_SPI() popped any SPI context of the subxact, it will
+ * have left us in a disconnected state. We need this hack to return
+ * to connected state.
+ */
+ SPI_restore_connection();
+
+ /* Look up the correct exception */
+ entry = hash_search(PLy_spi_exceptions, &edata->sqlerrcode,
+ HASH_FIND, NULL);
+ /* We really should find it, but just in case have a fallback */
+ Assert(entry != NULL);
+ exc = entry ? entry->exc : PLy_exc_spi_error;
+ /* Make Python raise the exception */
+ PLy_spi_exception_set(exc, edata);
+ return NULL;
+ }
+ PG_END_TRY();
+
+ return (PyObject *) ret;
+}
+
+static PyObject *
+PLy_cursor_close(PyObject *self, PyObject *unused)
+{
+ PLyCursorObject *cursor = (PLyCursorObject *) self;
+
+ if (!cursor->closed)
+ {
+ Portal portal = GetPortalByName(cursor->portalname);
+
+ if (!PortalIsValid(portal))
+ {
+ PLy_exception_set(PyExc_ValueError,
+ "closing a cursor in an aborted subtransaction");
+ return NULL;
+ }
+
+ SPI_cursor_close(portal);
+ cursor->closed = true;
+ }
+
+ Py_INCREF(Py_None);
+ return Py_None;
+}
+
/* s = plpy.subtransaction() */
static PyObject *
PLy_subtransaction(PyObject *self, PyObject *unused)
@@ -4184,6 +4824,8 @@ PLy_init_plpy(void)
elog(ERROR, "could not initialize PLy_ResultType");
if (PyType_Ready(&PLy_SubtransactionType) < 0)
elog(ERROR, "could not initialize PLy_SubtransactionType");
+ if (PyType_Ready(&PLy_CursorType) < 0)
+ elog(ERROR, "could not initialize PLy_CursorType");
#if PY_MAJOR_VERSION >= 3
PyModule_Create(&PLy_module);
diff --git a/src/pl/plpython/sql/plpython_spi.sql b/src/pl/plpython/sql/plpython_spi.sql
index 7f8f6a33d26..874b31e6df6 100644
--- a/src/pl/plpython/sql/plpython_spi.sql
+++ b/src/pl/plpython/sql/plpython_spi.sql
@@ -105,3 +105,119 @@ else:
$$ LANGUAGE plpythonu;
SELECT result_nrows_test();
+
+
+-- cursor objects
+
+CREATE FUNCTION simple_cursor_test() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users")
+does = 0
+for row in res:
+ if row['lname'] == 'doe':
+ does += 1
+return does
+$$ LANGUAGE plpythonu;
+
+CREATE FUNCTION double_cursor_close() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users")
+res.close()
+res.close()
+$$ LANGUAGE plpythonu;
+
+CREATE FUNCTION cursor_fetch() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users")
+assert len(res.fetch(3)) == 3
+assert len(res.fetch(3)) == 1
+assert len(res.fetch(3)) == 0
+assert len(res.fetch(3)) == 0
+try:
+ # use next() or __next__(), the method name changed in
+ # http://www.python.org/dev/peps/pep-3114/
+ try:
+ res.next()
+ except AttributeError:
+ res.__next__()
+except StopIteration:
+ pass
+else:
+ assert False, "StopIteration not raised"
+$$ LANGUAGE plpythonu;
+
+CREATE FUNCTION cursor_mix_next_and_fetch() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users order by fname")
+assert len(res.fetch(2)) == 2
+
+item = None
+try:
+ item = res.next()
+except AttributeError:
+ item = res.__next__()
+assert item['fname'] == 'rick'
+
+assert len(res.fetch(2)) == 1
+$$ LANGUAGE plpythonu;
+
+CREATE FUNCTION fetch_after_close() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users")
+res.close()
+try:
+ res.fetch(1)
+except ValueError:
+ pass
+else:
+ assert False, "ValueError not raised"
+$$ LANGUAGE plpythonu;
+
+CREATE FUNCTION next_after_close() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users")
+res.close()
+try:
+ try:
+ res.next()
+ except AttributeError:
+ res.__next__()
+except ValueError:
+ pass
+else:
+ assert False, "ValueError not raised"
+$$ LANGUAGE plpythonu;
+
+CREATE FUNCTION cursor_fetch_next_empty() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users where false")
+assert len(res.fetch(1)) == 0
+try:
+ try:
+ res.next()
+ except AttributeError:
+ res.__next__()
+except StopIteration:
+ pass
+else:
+ assert False, "StopIteration not raised"
+$$ LANGUAGE plpythonu;
+
+CREATE FUNCTION cursor_plan() RETURNS SETOF text AS $$
+plan = plpy.prepare(
+ "select fname, lname from users where fname like $1 || '%' order by fname",
+ ["text"])
+for row in plpy.cursor(plan, ["w"]):
+ yield row['fname']
+for row in plpy.cursor(plan, ["j"]):
+ yield row['fname']
+$$ LANGUAGE plpythonu;
+
+CREATE FUNCTION cursor_plan_wrong_args() RETURNS SETOF text AS $$
+plan = plpy.prepare("select fname, lname from users where fname like $1 || '%'",
+ ["text"])
+c = plpy.cursor(plan, ["a", "b"])
+$$ LANGUAGE plpythonu;
+
+SELECT simple_cursor_test();
+SELECT double_cursor_close();
+SELECT cursor_fetch();
+SELECT cursor_mix_next_and_fetch();
+SELECT fetch_after_close();
+SELECT next_after_close();
+SELECT cursor_fetch_next_empty();
+SELECT cursor_plan();
+SELECT cursor_plan_wrong_args();
diff --git a/src/pl/plpython/sql/plpython_subtransaction.sql b/src/pl/plpython/sql/plpython_subtransaction.sql
index a19cad5104e..9ad6377c7cd 100644
--- a/src/pl/plpython/sql/plpython_subtransaction.sql
+++ b/src/pl/plpython/sql/plpython_subtransaction.sql
@@ -242,3 +242,55 @@ SELECT pk_violation_inside_subtransaction();
SELECT * FROM subtransaction_tbl;
DROP TABLE subtransaction_tbl;
+
+-- cursor/subtransactions interactions
+
+CREATE FUNCTION cursor_in_subxact() RETURNS int AS $$
+with plpy.subtransaction():
+ cur = plpy.cursor("select * from generate_series(1, 20) as gen(i)")
+ cur.fetch(10)
+fetched = cur.fetch(10);
+return int(fetched[5]["i"])
+$$ LANGUAGE plpythonu;
+
+CREATE FUNCTION cursor_aborted_subxact() RETURNS int AS $$
+try:
+ with plpy.subtransaction():
+ cur = plpy.cursor("select * from generate_series(1, 20) as gen(i)")
+ cur.fetch(10);
+ plpy.execute("select no_such_function()")
+except plpy.SPIError:
+ fetched = cur.fetch(10)
+ return int(fetched[5]["i"])
+return 0 # not reached
+$$ LANGUAGE plpythonu;
+
+CREATE FUNCTION cursor_plan_aborted_subxact() RETURNS int AS $$
+try:
+ with plpy.subtransaction():
+ plpy.execute('create temporary table tmp(i) '
+ 'as select generate_series(1, 10)')
+ plan = plpy.prepare("select i from tmp")
+ cur = plpy.cursor(plan)
+ plpy.execute("select no_such_function()")
+except plpy.SPIError:
+ fetched = cur.fetch(5)
+ return fetched[2]["i"]
+return 0 # not reached
+$$ LANGUAGE plpythonu;
+
+CREATE FUNCTION cursor_close_aborted_subxact() RETURNS boolean AS $$
+try:
+ with plpy.subtransaction():
+ cur = plpy.cursor('select 1')
+ plpy.execute("select no_such_function()")
+except plpy.SPIError:
+ cur.close()
+ return True
+return False # not reached
+$$ LANGUAGE plpythonu;
+
+SELECT cursor_in_subxact();
+SELECT cursor_aborted_subxact();
+SELECT cursor_plan_aborted_subxact();
+SELECT cursor_close_aborted_subxact();