aboutsummaryrefslogtreecommitdiff
path: root/src/pl/plpython/plpython.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/pl/plpython/plpython.c')
-rw-r--r--src/pl/plpython/plpython.c642
1 files changed, 642 insertions, 0 deletions
diff --git a/src/pl/plpython/plpython.c b/src/pl/plpython/plpython.c
index afd5dfce83a..29e0ac7c454 100644
--- a/src/pl/plpython/plpython.c
+++ b/src/pl/plpython/plpython.c
@@ -134,6 +134,11 @@ typedef int Py_ssize_t;
PyObject_HEAD_INIT(type) size,
#endif
+/* Python 3 removed the Py_TPFLAGS_HAVE_ITER flag */
+#if PY_MAJOR_VERSION >= 3
+#define Py_TPFLAGS_HAVE_ITER 0
+#endif
+
/* define our text domain for translations */
#undef TEXTDOMAIN
#define TEXTDOMAIN PG_TEXTDOMAIN("plpython")
@@ -310,6 +315,14 @@ typedef struct PLySubtransactionObject
bool exited;
} PLySubtransactionObject;
+typedef struct PLyCursorObject
+{
+ PyObject_HEAD
+ char *portalname;
+ PLyTypeInfo result;
+ bool closed;
+} PLyCursorObject;
+
/* A list of all known exceptions, generated from backend/utils/errcodes.txt */
typedef struct ExceptionMap
{
@@ -486,6 +499,10 @@ static char PLy_subtransaction_doc[] = {
"PostgreSQL subtransaction context manager"
};
+static char PLy_cursor_doc[] = {
+ "Wrapper around a PostgreSQL cursor"
+};
+
/*
* the function definitions
@@ -2963,6 +2980,14 @@ static void PLy_subtransaction_dealloc(PyObject *);
static PyObject *PLy_subtransaction_enter(PyObject *, PyObject *);
static PyObject *PLy_subtransaction_exit(PyObject *, PyObject *);
+static PyObject *PLy_cursor(PyObject *self, PyObject *unused);
+static PyObject *PLy_cursor_query(const char *query);
+static PyObject *PLy_cursor_plan(PyObject *ob, PyObject *args);
+static void PLy_cursor_dealloc(PyObject *arg);
+static PyObject *PLy_cursor_iternext(PyObject *self);
+static PyObject *PLy_cursor_fetch(PyObject *self, PyObject *args);
+static PyObject *PLy_cursor_close(PyObject *self, PyObject *unused);
+
static PyMethodDef PLy_plan_methods[] = {
{"status", PLy_plan_status, METH_VARARGS, NULL},
@@ -3099,6 +3124,47 @@ static PyTypeObject PLy_SubtransactionType = {
PLy_subtransaction_methods, /* tp_tpmethods */
};
+static PyMethodDef PLy_cursor_methods[] = {
+ {"fetch", PLy_cursor_fetch, METH_VARARGS, NULL},
+ {"close", PLy_cursor_close, METH_NOARGS, NULL},
+ {NULL, NULL, 0, NULL}
+};
+
+static PyTypeObject PLy_CursorType = {
+ PyVarObject_HEAD_INIT(NULL, 0)
+ "PLyCursor", /* tp_name */
+ sizeof(PLyCursorObject), /* tp_size */
+ 0, /* tp_itemsize */
+
+ /*
+ * methods
+ */
+ PLy_cursor_dealloc, /* tp_dealloc */
+ 0, /* tp_print */
+ 0, /* tp_getattr */
+ 0, /* tp_setattr */
+ 0, /* tp_compare */
+ 0, /* tp_repr */
+ 0, /* tp_as_number */
+ 0, /* tp_as_sequence */
+ 0, /* tp_as_mapping */
+ 0, /* tp_hash */
+ 0, /* tp_call */
+ 0, /* tp_str */
+ 0, /* tp_getattro */
+ 0, /* tp_setattro */
+ 0, /* tp_as_buffer */
+ Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | Py_TPFLAGS_HAVE_ITER, /* tp_flags */
+ PLy_cursor_doc, /* tp_doc */
+ 0, /* tp_traverse */
+ 0, /* tp_clear */
+ 0, /* tp_richcompare */
+ 0, /* tp_weaklistoffset */
+ PyObject_SelfIter, /* tp_iter */
+ PLy_cursor_iternext, /* tp_iternext */
+ PLy_cursor_methods, /* tp_tpmethods */
+};
+
static PyMethodDef PLy_methods[] = {
/*
* logging methods
@@ -3133,6 +3199,11 @@ static PyMethodDef PLy_methods[] = {
*/
{"subtransaction", PLy_subtransaction, METH_NOARGS, NULL},
+ /*
+ * create a cursor
+ */
+ {"cursor", PLy_cursor, METH_VARARGS, NULL},
+
{NULL, NULL, 0, NULL}
};
@@ -3833,6 +3904,575 @@ PLy_spi_execute_fetch_result(SPITupleTable *tuptable, int rows, int status)
return (PyObject *) result;
}
+/*
+ * c = plpy.cursor("select * from largetable")
+ * c = plpy.cursor(plan, [])
+ */
+static PyObject *
+PLy_cursor(PyObject *self, PyObject *args)
+{
+ char *query;
+ PyObject *plan;
+ PyObject *planargs = NULL;
+
+ if (PyArg_ParseTuple(args, "s", &query))
+ return PLy_cursor_query(query);
+
+ PyErr_Clear();
+
+ if (PyArg_ParseTuple(args, "O|O", &plan, &planargs))
+ return PLy_cursor_plan(plan, planargs);
+
+ PLy_exception_set(PLy_exc_error, "plpy.cursor expected a query or a plan");
+ return NULL;
+}
+
+
+static PyObject *
+PLy_cursor_query(const char *query)
+{
+ PLyCursorObject *cursor;
+ volatile MemoryContext oldcontext;
+ volatile ResourceOwner oldowner;
+
+ if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
+ return NULL;
+ cursor->portalname = NULL;
+ cursor->closed = false;
+ PLy_typeinfo_init(&cursor->result);
+
+ oldcontext = CurrentMemoryContext;
+ oldowner = CurrentResourceOwner;
+
+ BeginInternalSubTransaction(NULL);
+ MemoryContextSwitchTo(oldcontext);
+
+ PG_TRY();
+ {
+ SPIPlanPtr plan;
+ Portal portal;
+
+ pg_verifymbstr(query, strlen(query), false);
+
+ plan = SPI_prepare(query, 0, NULL);
+ if (plan == NULL)
+ elog(ERROR, "SPI_prepare failed: %s",
+ SPI_result_code_string(SPI_result));
+
+ portal = SPI_cursor_open(NULL, plan, NULL, NULL,
+ PLy_curr_procedure->fn_readonly);
+ SPI_freeplan(plan);
+
+ if (portal == NULL)
+ elog(ERROR, "SPI_cursor_open() failed:%s",
+ SPI_result_code_string(SPI_result));
+
+ cursor->portalname = PLy_strdup(portal->name);
+
+ /* Commit the inner transaction, return to outer xact context */
+ ReleaseCurrentSubTransaction();
+ MemoryContextSwitchTo(oldcontext);
+ CurrentResourceOwner = oldowner;
+
+ /*
+ * AtEOSubXact_SPI() should not have popped any SPI context, but just
+ * in case it did, make sure we remain connected.
+ */
+ SPI_restore_connection();
+ }
+ PG_CATCH();
+ {
+ ErrorData *edata;
+ PLyExceptionEntry *entry;
+ PyObject *exc;
+
+ /* Save error info */
+ MemoryContextSwitchTo(oldcontext);
+ edata = CopyErrorData();
+ FlushErrorState();
+
+ /* Abort the inner transaction */
+ RollbackAndReleaseCurrentSubTransaction();
+ MemoryContextSwitchTo(oldcontext);
+ CurrentResourceOwner = oldowner;
+
+ Py_DECREF(cursor);
+
+ /*
+ * If AtEOSubXact_SPI() popped any SPI context of the subxact, it will
+ * have left us in a disconnected state. We need this hack to return
+ * to connected state.
+ */
+ SPI_restore_connection();
+
+ /* Look up the correct exception */
+ entry = hash_search(PLy_spi_exceptions, &(edata->sqlerrcode),
+ HASH_FIND, NULL);
+ /* We really should find it, but just in case have a fallback */
+ Assert(entry != NULL);
+ exc = entry ? entry->exc : PLy_exc_spi_error;
+ /* Make Python raise the exception */
+ PLy_spi_exception_set(exc, edata);
+ return NULL;
+ }
+ PG_END_TRY();
+
+ Assert(cursor->portalname != NULL);
+ return (PyObject *) cursor;
+}
+
+static PyObject *
+PLy_cursor_plan(PyObject *ob, PyObject *args)
+{
+ PLyCursorObject *cursor;
+ volatile int nargs;
+ int i;
+ PLyPlanObject *plan;
+ volatile MemoryContext oldcontext;
+ volatile ResourceOwner oldowner;
+
+ if (args)
+ {
+ if (!PySequence_Check(args) || PyString_Check(args) || PyUnicode_Check(args))
+ {
+ PLy_exception_set(PyExc_TypeError, "plpy.cursor takes a sequence as its second argument");
+ return NULL;
+ }
+ nargs = PySequence_Length(args);
+ }
+ else
+ nargs = 0;
+
+ plan = (PLyPlanObject *) ob;
+
+ if (nargs != plan->nargs)
+ {
+ char *sv;
+ PyObject *so = PyObject_Str(args);
+
+ if (!so)
+ PLy_elog(ERROR, "could not execute plan");
+ sv = PyString_AsString(so);
+ PLy_exception_set_plural(PyExc_TypeError,
+ "Expected sequence of %d argument, got %d: %s",
+ "Expected sequence of %d arguments, got %d: %s",
+ plan->nargs,
+ plan->nargs, nargs, sv);
+ Py_DECREF(so);
+
+ return NULL;
+ }
+
+ if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
+ return NULL;
+ cursor->portalname = NULL;
+ cursor->closed = false;
+ PLy_typeinfo_init(&cursor->result);
+
+ oldcontext = CurrentMemoryContext;
+ oldowner = CurrentResourceOwner;
+
+ BeginInternalSubTransaction(NULL);
+ MemoryContextSwitchTo(oldcontext);
+
+ PG_TRY();
+ {
+ Portal portal;
+ char *volatile nulls;
+ volatile int j;
+
+ if (nargs > 0)
+ nulls = palloc(nargs * sizeof(char));
+ else
+ nulls = NULL;
+
+ for (j = 0; j < nargs; j++)
+ {
+ PyObject *elem;
+
+ elem = PySequence_GetItem(args, j);
+ if (elem != Py_None)
+ {
+ PG_TRY();
+ {
+ plan->values[j] =
+ plan->args[j].out.d.func(&(plan->args[j].out.d),
+ -1,
+ elem);
+ }
+ PG_CATCH();
+ {
+ Py_DECREF(elem);
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+
+ Py_DECREF(elem);
+ nulls[j] = ' ';
+ }
+ else
+ {
+ Py_DECREF(elem);
+ plan->values[j] =
+ InputFunctionCall(&(plan->args[j].out.d.typfunc),
+ NULL,
+ plan->args[j].out.d.typioparam,
+ -1);
+ nulls[j] = 'n';
+ }
+ }
+
+ portal = SPI_cursor_open(NULL, plan->plan, plan->values, nulls,
+ PLy_curr_procedure->fn_readonly);
+ if (portal == NULL)
+ elog(ERROR, "SPI_cursor_open() failed:%s",
+ SPI_result_code_string(SPI_result));
+
+ cursor->portalname = PLy_strdup(portal->name);
+
+ /* Commit the inner transaction, return to outer xact context */
+ ReleaseCurrentSubTransaction();
+ MemoryContextSwitchTo(oldcontext);
+ CurrentResourceOwner = oldowner;
+
+ /*
+ * AtEOSubXact_SPI() should not have popped any SPI context, but just
+ * in case it did, make sure we remain connected.
+ */
+ SPI_restore_connection();
+ }
+ PG_CATCH();
+ {
+ int k;
+ ErrorData *edata;
+ PLyExceptionEntry *entry;
+ PyObject *exc;
+
+ /* Save error info */
+ MemoryContextSwitchTo(oldcontext);
+ edata = CopyErrorData();
+ FlushErrorState();
+
+ /* cleanup plan->values array */
+ for (k = 0; k < nargs; k++)
+ {
+ if (!plan->args[k].out.d.typbyval &&
+ (plan->values[k] != PointerGetDatum(NULL)))
+ {
+ pfree(DatumGetPointer(plan->values[k]));
+ plan->values[k] = PointerGetDatum(NULL);
+ }
+ }
+
+ /* Abort the inner transaction */
+ RollbackAndReleaseCurrentSubTransaction();
+ MemoryContextSwitchTo(oldcontext);
+ CurrentResourceOwner = oldowner;
+
+ Py_DECREF(cursor);
+
+ /*
+ * If AtEOSubXact_SPI() popped any SPI context of the subxact, it will
+ * have left us in a disconnected state. We need this hack to return
+ * to connected state.
+ */
+ SPI_restore_connection();
+
+ /* Look up the correct exception */
+ entry = hash_search(PLy_spi_exceptions, &(edata->sqlerrcode),
+ HASH_FIND, NULL);
+ /* We really should find it, but just in case have a fallback */
+ Assert(entry != NULL);
+ exc = entry ? entry->exc : PLy_exc_spi_error;
+ /* Make Python raise the exception */
+ PLy_spi_exception_set(exc, edata);
+ return NULL;
+ }
+ PG_END_TRY();
+
+ for (i = 0; i < nargs; i++)
+ {
+ if (!plan->args[i].out.d.typbyval &&
+ (plan->values[i] != PointerGetDatum(NULL)))
+ {
+ pfree(DatumGetPointer(plan->values[i]));
+ plan->values[i] = PointerGetDatum(NULL);
+ }
+ }
+
+ Assert(cursor->portalname != NULL);
+ return (PyObject *) cursor;
+}
+
+static void
+PLy_cursor_dealloc(PyObject *arg)
+{
+ PLyCursorObject *cursor;
+ Portal portal;
+
+ cursor = (PLyCursorObject *) arg;
+
+ if (!cursor->closed)
+ {
+ portal = GetPortalByName(cursor->portalname);
+
+ if (PortalIsValid(portal))
+ SPI_cursor_close(portal);
+ }
+
+ PLy_free(cursor->portalname);
+ cursor->portalname = NULL;
+
+ PLy_typeinfo_dealloc(&cursor->result);
+ arg->ob_type->tp_free(arg);
+}
+
+static PyObject *
+PLy_cursor_iternext(PyObject *self)
+{
+ PLyCursorObject *cursor;
+ PyObject *ret;
+ volatile MemoryContext oldcontext;
+ volatile ResourceOwner oldowner;
+ Portal portal;
+
+ cursor = (PLyCursorObject *) self;
+
+ if (cursor->closed)
+ {
+ PLy_exception_set(PyExc_ValueError, "iterating a closed cursor");
+ return NULL;
+ }
+
+ portal = GetPortalByName(cursor->portalname);
+ if (!PortalIsValid(portal))
+ {
+ PLy_exception_set(PyExc_ValueError,
+ "iterating a cursor in an aborted subtransaction");
+ return NULL;
+ }
+
+ oldcontext = CurrentMemoryContext;
+ oldowner = CurrentResourceOwner;
+
+ BeginInternalSubTransaction(NULL);
+ MemoryContextSwitchTo(oldcontext);
+
+ PG_TRY();
+ {
+ SPI_cursor_fetch(portal, true, 1);
+ if (SPI_processed == 0)
+ {
+ PyErr_SetNone(PyExc_StopIteration);
+ ret = NULL;
+ }
+ else
+ {
+ if (cursor->result.is_rowtype != 1)
+ PLy_input_tuple_funcs(&cursor->result, SPI_tuptable->tupdesc);
+
+ ret = PLyDict_FromTuple(&cursor->result, SPI_tuptable->vals[0],
+ SPI_tuptable->tupdesc);
+ }
+
+ SPI_freetuptable(SPI_tuptable);
+
+ /* Commit the inner transaction, return to outer xact context */
+ ReleaseCurrentSubTransaction();
+ MemoryContextSwitchTo(oldcontext);
+ CurrentResourceOwner = oldowner;
+
+ /*
+ * AtEOSubXact_SPI() should not have popped any SPI context, but just
+ * in case it did, make sure we remain connected.
+ */
+ SPI_restore_connection();
+ }
+ PG_CATCH();
+ {
+ ErrorData *edata;
+ PLyExceptionEntry *entry;
+ PyObject *exc;
+
+ /* Save error info */
+ MemoryContextSwitchTo(oldcontext);
+ edata = CopyErrorData();
+ FlushErrorState();
+
+ /* Abort the inner transaction */
+ RollbackAndReleaseCurrentSubTransaction();
+ MemoryContextSwitchTo(oldcontext);
+ CurrentResourceOwner = oldowner;
+
+ SPI_freetuptable(SPI_tuptable);
+
+ /*
+ * If AtEOSubXact_SPI() popped any SPI context of the subxact, it will
+ * have left us in a disconnected state. We need this hack to return
+ * to connected state.
+ */
+ SPI_restore_connection();
+
+ /* Look up the correct exception */
+ entry = hash_search(PLy_spi_exceptions, &edata->sqlerrcode,
+ HASH_FIND, NULL);
+ /* We really should find it, but just in case have a fallback */
+ Assert(entry != NULL);
+ exc = entry ? entry->exc : PLy_exc_spi_error;
+ /* Make Python raise the exception */
+ PLy_spi_exception_set(exc, edata);
+ return NULL;
+ }
+ PG_END_TRY();
+
+ return ret;
+}
+
+static PyObject *
+PLy_cursor_fetch(PyObject *self, PyObject *args)
+{
+ PLyCursorObject *cursor;
+ int count;
+ PLyResultObject *ret;
+ volatile MemoryContext oldcontext;
+ volatile ResourceOwner oldowner;
+ Portal portal;
+
+ if (!PyArg_ParseTuple(args, "i", &count))
+ return NULL;
+
+ cursor = (PLyCursorObject *) self;
+
+ if (cursor->closed)
+ {
+ PLy_exception_set(PyExc_ValueError, "fetch from a closed cursor");
+ return NULL;
+ }
+
+ portal = GetPortalByName(cursor->portalname);
+ if (!PortalIsValid(portal))
+ {
+ PLy_exception_set(PyExc_ValueError,
+ "iterating a cursor in an aborted subtransaction");
+ return NULL;
+ }
+
+ ret = (PLyResultObject *) PLy_result_new();
+ if (ret == NULL)
+ return NULL;
+
+ oldcontext = CurrentMemoryContext;
+ oldowner = CurrentResourceOwner;
+
+ BeginInternalSubTransaction(NULL);
+ MemoryContextSwitchTo(oldcontext);
+
+ PG_TRY();
+ {
+ SPI_cursor_fetch(portal, true, count);
+
+ if (cursor->result.is_rowtype != 1)
+ PLy_input_tuple_funcs(&cursor->result, SPI_tuptable->tupdesc);
+
+ Py_DECREF(ret->status);
+ ret->status = PyInt_FromLong(SPI_OK_FETCH);
+
+ Py_DECREF(ret->nrows);
+ ret->nrows = PyInt_FromLong(SPI_processed);
+
+ if (SPI_processed != 0)
+ {
+ int i;
+
+ Py_DECREF(ret->rows);
+ ret->rows = PyList_New(SPI_processed);
+
+ for (i = 0; i < SPI_processed; i++)
+ {
+ PyObject *row = PLyDict_FromTuple(&cursor->result,
+ SPI_tuptable->vals[i],
+ SPI_tuptable->tupdesc);
+ PyList_SetItem(ret->rows, i, row);
+ }
+ }
+
+ SPI_freetuptable(SPI_tuptable);
+
+ /* Commit the inner transaction, return to outer xact context */
+ ReleaseCurrentSubTransaction();
+ MemoryContextSwitchTo(oldcontext);
+ CurrentResourceOwner = oldowner;
+
+ /*
+ * AtEOSubXact_SPI() should not have popped any SPI context, but just
+ * in case it did, make sure we remain connected.
+ */
+ SPI_restore_connection();
+ }
+ PG_CATCH();
+ {
+ ErrorData *edata;
+ PLyExceptionEntry *entry;
+ PyObject *exc;
+
+ /* Save error info */
+ MemoryContextSwitchTo(oldcontext);
+ edata = CopyErrorData();
+ FlushErrorState();
+
+ /* Abort the inner transaction */
+ RollbackAndReleaseCurrentSubTransaction();
+ MemoryContextSwitchTo(oldcontext);
+ CurrentResourceOwner = oldowner;
+
+ SPI_freetuptable(SPI_tuptable);
+
+ /*
+ * If AtEOSubXact_SPI() popped any SPI context of the subxact, it will
+ * have left us in a disconnected state. We need this hack to return
+ * to connected state.
+ */
+ SPI_restore_connection();
+
+ /* Look up the correct exception */
+ entry = hash_search(PLy_spi_exceptions, &edata->sqlerrcode,
+ HASH_FIND, NULL);
+ /* We really should find it, but just in case have a fallback */
+ Assert(entry != NULL);
+ exc = entry ? entry->exc : PLy_exc_spi_error;
+ /* Make Python raise the exception */
+ PLy_spi_exception_set(exc, edata);
+ return NULL;
+ }
+ PG_END_TRY();
+
+ return (PyObject *) ret;
+}
+
+static PyObject *
+PLy_cursor_close(PyObject *self, PyObject *unused)
+{
+ PLyCursorObject *cursor = (PLyCursorObject *) self;
+
+ if (!cursor->closed)
+ {
+ Portal portal = GetPortalByName(cursor->portalname);
+
+ if (!PortalIsValid(portal))
+ {
+ PLy_exception_set(PyExc_ValueError,
+ "closing a cursor in an aborted subtransaction");
+ return NULL;
+ }
+
+ SPI_cursor_close(portal);
+ cursor->closed = true;
+ }
+
+ Py_INCREF(Py_None);
+ return Py_None;
+}
+
/* s = plpy.subtransaction() */
static PyObject *
PLy_subtransaction(PyObject *self, PyObject *unused)
@@ -4184,6 +4824,8 @@ PLy_init_plpy(void)
elog(ERROR, "could not initialize PLy_ResultType");
if (PyType_Ready(&PLy_SubtransactionType) < 0)
elog(ERROR, "could not initialize PLy_SubtransactionType");
+ if (PyType_Ready(&PLy_CursorType) < 0)
+ elog(ERROR, "could not initialize PLy_CursorType");
#if PY_MAJOR_VERSION >= 3
PyModule_Create(&PLy_module);