From 89e850e6fda9e4e441712012abe971fe938d595a Mon Sep 17 00:00:00 2001
From: Peter Eisentraut
Date: Mon, 5 Dec 2011 19:52:15 +0200
Subject: plpython: Add SPI cursor support
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Add a function plpy.cursor that is similar to plpy.execute but uses an
SPI cursor to avoid fetching the entire result set into memory.

Jan Urbański, reviewed by Steve Singer
---
/*
 * Review notes (commentary only; the hunks below are unchanged).
 *
 * What the patch adds: a PLyCursorObject Python type holding a portal name,
 * a PLyTypeInfo for result rows and a "closed" flag; plpy.cursor(), which
 * dispatches to PLy_cursor_query() for a string argument and otherwise to
 * PLy_cursor_plan(); and the cursor's iterator, fetch() and close() methods.
 * Every function that opens or fetches from the portal wraps the SPI calls
 * in an internal subtransaction and converts a caught error into a Python
 * exception looked up in PLy_spi_exceptions.
 *
 * Points to follow up:
 *
 * - PLy_cursor(): the "O|O" branch hands any Python object to
 *   PLy_cursor_plan(), which casts it to PLyPlanObject * with no type check
 *   visible in this patch.
 * - PLy_cursor_plan(): the result of PySequence_GetItem() is compared with
 *   Py_None and passed on without a NULL check.
 * - PLy_cursor_query() and PLy_cursor_plan(): the message
 *   "SPI_cursor_open() failed:%s" has no space after the colon, unlike
 *   "SPI_prepare failed: %s" a few lines earlier.
 * - PLy_cursor_fetch(): the PG_CATCH branch returns NULL without
 *   Py_DECREF(ret), so the object obtained from PLy_result_new() is not
 *   released on that path; the PyList_New() and PLyDict_FromTuple() results
 *   are stored without NULL checks.
 * - PLy_cursor_fetch(): an invalid portal is reported with the text
 *   "iterating a cursor in an aborted subtransaction" although the caller
 *   invoked fetch().
 */
 src/pl/plpython/plpython.c | 642 +++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 642 insertions(+)

(limited to 'src/pl/plpython/plpython.c')

diff --git a/src/pl/plpython/plpython.c b/src/pl/plpython/plpython.c
index afd5dfce83a..29e0ac7c454 100644
--- a/src/pl/plpython/plpython.c
+++ b/src/pl/plpython/plpython.c
@@ -134,6 +134,11 @@ typedef int Py_ssize_t;
 	PyObject_HEAD_INIT(type) size,
 #endif
 
+/* Python 3 removed the Py_TPFLAGS_HAVE_ITER flag */
+#if PY_MAJOR_VERSION >= 3
+#define Py_TPFLAGS_HAVE_ITER 0
+#endif
+
 /* define our text domain for translations */
 #undef TEXTDOMAIN
 #define TEXTDOMAIN PG_TEXTDOMAIN("plpython")
@@ -310,6 +315,14 @@ typedef struct PLySubtransactionObject
 	bool exited;
 } PLySubtransactionObject;
 
+typedef struct PLyCursorObject
+{
+	PyObject_HEAD
+	char *portalname;
+	PLyTypeInfo result;
+	bool closed;
+} PLyCursorObject;
+
 /* A list of all known exceptions, generated from backend/utils/errcodes.txt */
 typedef struct ExceptionMap
 {
@@ -486,6 +499,10 @@ static char PLy_subtransaction_doc[] = {
 	"PostgreSQL subtransaction context manager"
 };
 
+static char PLy_cursor_doc[] = {
+	"Wrapper around a PostgreSQL cursor"
+};
+
 
 /*
  * the function definitions
@@ -2963,6 +2980,14 @@ static void PLy_subtransaction_dealloc(PyObject *);
 static PyObject *PLy_subtransaction_enter(PyObject *, PyObject *);
 static PyObject *PLy_subtransaction_exit(PyObject *, PyObject *);
 
+static PyObject *PLy_cursor(PyObject *self, PyObject *unused);
+static PyObject *PLy_cursor_query(const char *query);
+static PyObject *PLy_cursor_plan(PyObject *ob, PyObject *args);
+static void PLy_cursor_dealloc(PyObject *arg);
+static PyObject *PLy_cursor_iternext(PyObject *self);
+static PyObject *PLy_cursor_fetch(PyObject *self, PyObject *args);
+static PyObject *PLy_cursor_close(PyObject *self, PyObject *unused);
+
 
 static PyMethodDef PLy_plan_methods[] = {
 	{"status", PLy_plan_status, METH_VARARGS, NULL},
@@ -3099,6 +3124,47 @@ static PyTypeObject PLy_SubtransactionType = {
 	PLy_subtransaction_methods, /* tp_tpmethods */
 };
 
+static PyMethodDef PLy_cursor_methods[] = {
+	{"fetch", PLy_cursor_fetch, METH_VARARGS, NULL},
+	{"close", PLy_cursor_close, METH_NOARGS, NULL},
+	{NULL, NULL, 0, NULL}
+};
+
+static PyTypeObject PLy_CursorType = {
+	PyVarObject_HEAD_INIT(NULL, 0)
+	"PLyCursor", /* tp_name */
+	sizeof(PLyCursorObject), /* tp_size */
+	0, /* tp_itemsize */
+
+	/*
+	 * methods
+	 */
+	PLy_cursor_dealloc, /* tp_dealloc */
+	0, /* tp_print */
+	0, /* tp_getattr */
+	0, /* tp_setattr */
+	0, /* tp_compare */
+	0, /* tp_repr */
+	0, /* tp_as_number */
+	0, /* tp_as_sequence */
+	0, /* tp_as_mapping */
+	0, /* tp_hash */
+	0, /* tp_call */
+	0, /* tp_str */
+	0, /* tp_getattro */
+	0, /* tp_setattro */
+	0, /* tp_as_buffer */
+	Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | Py_TPFLAGS_HAVE_ITER, /* tp_flags */
+	PLy_cursor_doc, /* tp_doc */
+	0, /* tp_traverse */
+	0, /* tp_clear */
+	0, /* tp_richcompare */
+	0, /* tp_weaklistoffset */
+	PyObject_SelfIter, /* tp_iter */
+	PLy_cursor_iternext, /* tp_iternext */
+	PLy_cursor_methods, /* tp_tpmethods */
+};
+
 static PyMethodDef PLy_methods[] = {
 	/*
 	 * logging methods
@@ -3133,6 +3199,11 @@ static PyMethodDef PLy_methods[] = {
 	 */
 	{"subtransaction", PLy_subtransaction, METH_NOARGS, NULL},
 
+	/*
+	 * create a cursor
+	 */
+	{"cursor", PLy_cursor, METH_VARARGS, NULL},
+
 	{NULL, NULL, 0, NULL}
 };
 
@@ -3833,6 +3904,575 @@ PLy_spi_execute_fetch_result(SPITupleTable *tuptable, int rows, int status)
 	return (PyObject *) result;
 }
 
+/*
+ * c = plpy.cursor("select * from largetable")
+ * c = plpy.cursor(plan, [])
+ */
+static PyObject *
+PLy_cursor(PyObject *self, PyObject *args)
+{
+	char *query;
+	PyObject *plan;
+	PyObject *planargs = NULL;
+
+	if (PyArg_ParseTuple(args, "s", &query))
+		return PLy_cursor_query(query);
+
+	PyErr_Clear();
+
+	if (PyArg_ParseTuple(args, "O|O", &plan, &planargs))
+		return PLy_cursor_plan(plan, planargs);
+
+	PLy_exception_set(PLy_exc_error, "plpy.cursor expected a query or a plan");
+	return NULL;
+}
+
+
+static PyObject *
+PLy_cursor_query(const char *query)
+{
+	PLyCursorObject *cursor;
+	volatile MemoryContext oldcontext;
+	volatile ResourceOwner oldowner;
+
+	if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
+		return NULL;
+	cursor->portalname = NULL;
+	cursor->closed = false;
+	PLy_typeinfo_init(&cursor->result);
+
+	oldcontext = CurrentMemoryContext;
+	oldowner = CurrentResourceOwner;
+
+	BeginInternalSubTransaction(NULL);
+	MemoryContextSwitchTo(oldcontext);
+
+	PG_TRY();
+	{
+		SPIPlanPtr plan;
+		Portal portal;
+
+		pg_verifymbstr(query, strlen(query), false);
+
+		plan = SPI_prepare(query, 0, NULL);
+		if (plan == NULL)
+			elog(ERROR, "SPI_prepare failed: %s",
+				 SPI_result_code_string(SPI_result));
+
+		portal = SPI_cursor_open(NULL, plan, NULL, NULL,
+								 PLy_curr_procedure->fn_readonly);
+		SPI_freeplan(plan);
+
+		if (portal == NULL)
+			elog(ERROR, "SPI_cursor_open() failed:%s",
+				 SPI_result_code_string(SPI_result));
+
+		cursor->portalname = PLy_strdup(portal->name);
+
+		/* Commit the inner transaction, return to outer xact context */
+		ReleaseCurrentSubTransaction();
+		MemoryContextSwitchTo(oldcontext);
+		CurrentResourceOwner = oldowner;
+
+		/*
+		 * AtEOSubXact_SPI() should not have popped any SPI context, but just
+		 * in case it did, make sure we remain connected.
+		 */
+		SPI_restore_connection();
+	}
+	PG_CATCH();
+	{
+		ErrorData *edata;
+		PLyExceptionEntry *entry;
+		PyObject *exc;
+
+		/* Save error info */
+		MemoryContextSwitchTo(oldcontext);
+		edata = CopyErrorData();
+		FlushErrorState();
+
+		/* Abort the inner transaction */
+		RollbackAndReleaseCurrentSubTransaction();
+		MemoryContextSwitchTo(oldcontext);
+		CurrentResourceOwner = oldowner;
+
+		Py_DECREF(cursor);
+
+		/*
+		 * If AtEOSubXact_SPI() popped any SPI context of the subxact, it will
+		 * have left us in a disconnected state. We need this hack to return
+		 * to connected state.
+		 */
+		SPI_restore_connection();
+
+		/* Look up the correct exception */
+		entry = hash_search(PLy_spi_exceptions, &(edata->sqlerrcode),
+							HASH_FIND, NULL);
+		/* We really should find it, but just in case have a fallback */
+		Assert(entry != NULL);
+		exc = entry ? entry->exc : PLy_exc_spi_error;
+		/* Make Python raise the exception */
+		PLy_spi_exception_set(exc, edata);
+		return NULL;
+	}
+	PG_END_TRY();
+
+	Assert(cursor->portalname != NULL);
+	return (PyObject *) cursor;
+}
+
+static PyObject *
+PLy_cursor_plan(PyObject *ob, PyObject *args)
+{
+	PLyCursorObject *cursor;
+	volatile int nargs;
+	int i;
+	PLyPlanObject *plan;
+	volatile MemoryContext oldcontext;
+	volatile ResourceOwner oldowner;
+
+	if (args)
+	{
+		if (!PySequence_Check(args) || PyString_Check(args) || PyUnicode_Check(args))
+		{
+			PLy_exception_set(PyExc_TypeError, "plpy.cursor takes a sequence as its second argument");
+			return NULL;
+		}
+		nargs = PySequence_Length(args);
+	}
+	else
+		nargs = 0;
+
+	plan = (PLyPlanObject *) ob;
+
+	if (nargs != plan->nargs)
+	{
+		char *sv;
+		PyObject *so = PyObject_Str(args);
+
+		if (!so)
+			PLy_elog(ERROR, "could not execute plan");
+		sv = PyString_AsString(so);
+		PLy_exception_set_plural(PyExc_TypeError,
+								 "Expected sequence of %d argument, got %d: %s",
+								 "Expected sequence of %d arguments, got %d: %s",
+								 plan->nargs,
+								 plan->nargs, nargs, sv);
+		Py_DECREF(so);
+
+		return NULL;
+	}
+
+	if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
+		return NULL;
+	cursor->portalname = NULL;
+	cursor->closed = false;
+	PLy_typeinfo_init(&cursor->result);
+
+	oldcontext = CurrentMemoryContext;
+	oldowner = CurrentResourceOwner;
+
+	BeginInternalSubTransaction(NULL);
+	MemoryContextSwitchTo(oldcontext);
+
+	PG_TRY();
+	{
+		Portal portal;
+		char *volatile nulls;
+		volatile int j;
+
+		if (nargs > 0)
+			nulls = palloc(nargs * sizeof(char));
+		else
+			nulls = NULL;
+
+		for (j = 0; j < nargs; j++)
+		{
+			PyObject *elem;
+
+			elem = PySequence_GetItem(args, j);
+			if (elem != Py_None)
+			{
+				PG_TRY();
+				{
+					plan->values[j] =
+						plan->args[j].out.d.func(&(plan->args[j].out.d),
+												 -1,
+												 elem);
+				}
+				PG_CATCH();
+				{
+					Py_DECREF(elem);
+					PG_RE_THROW();
+				}
+				PG_END_TRY();
+
+				Py_DECREF(elem);
+				nulls[j] = ' ';
+			}
+			else
+			{
+				Py_DECREF(elem);
+				plan->values[j] =
+					InputFunctionCall(&(plan->args[j].out.d.typfunc),
+									  NULL,
+									  plan->args[j].out.d.typioparam,
+									  -1);
+				nulls[j] = 'n';
+			}
+		}
+
+		portal = SPI_cursor_open(NULL, plan->plan, plan->values, nulls,
+								 PLy_curr_procedure->fn_readonly);
+		if (portal == NULL)
+			elog(ERROR, "SPI_cursor_open() failed:%s",
+				 SPI_result_code_string(SPI_result));
+
+		cursor->portalname = PLy_strdup(portal->name);
+
+		/* Commit the inner transaction, return to outer xact context */
+		ReleaseCurrentSubTransaction();
+		MemoryContextSwitchTo(oldcontext);
+		CurrentResourceOwner = oldowner;
+
+		/*
+		 * AtEOSubXact_SPI() should not have popped any SPI context, but just
+		 * in case it did, make sure we remain connected.
+		 */
+		SPI_restore_connection();
+	}
+	PG_CATCH();
+	{
+		int k;
+		ErrorData *edata;
+		PLyExceptionEntry *entry;
+		PyObject *exc;
+
+		/* Save error info */
+		MemoryContextSwitchTo(oldcontext);
+		edata = CopyErrorData();
+		FlushErrorState();
+
+		/* cleanup plan->values array */
+		for (k = 0; k < nargs; k++)
+		{
+			if (!plan->args[k].out.d.typbyval &&
+				(plan->values[k] != PointerGetDatum(NULL)))
+			{
+				pfree(DatumGetPointer(plan->values[k]));
+				plan->values[k] = PointerGetDatum(NULL);
+			}
+		}
+
+		/* Abort the inner transaction */
+		RollbackAndReleaseCurrentSubTransaction();
+		MemoryContextSwitchTo(oldcontext);
+		CurrentResourceOwner = oldowner;
+
+		Py_DECREF(cursor);
+
+		/*
+		 * If AtEOSubXact_SPI() popped any SPI context of the subxact, it will
+		 * have left us in a disconnected state. We need this hack to return
+		 * to connected state.
+		 */
+		SPI_restore_connection();
+
+		/* Look up the correct exception */
+		entry = hash_search(PLy_spi_exceptions, &(edata->sqlerrcode),
+							HASH_FIND, NULL);
+		/* We really should find it, but just in case have a fallback */
+		Assert(entry != NULL);
+		exc = entry ? entry->exc : PLy_exc_spi_error;
+		/* Make Python raise the exception */
+		PLy_spi_exception_set(exc, edata);
+		return NULL;
+	}
+	PG_END_TRY();
+
+	for (i = 0; i < nargs; i++)
+	{
+		if (!plan->args[i].out.d.typbyval &&
+			(plan->values[i] != PointerGetDatum(NULL)))
+		{
+			pfree(DatumGetPointer(plan->values[i]));
+			plan->values[i] = PointerGetDatum(NULL);
+		}
+	}
+
+	Assert(cursor->portalname != NULL);
+	return (PyObject *) cursor;
+}
+
+static void
+PLy_cursor_dealloc(PyObject *arg)
+{
+	PLyCursorObject *cursor;
+	Portal portal;
+
+	cursor = (PLyCursorObject *) arg;
+
+	if (!cursor->closed)
+	{
+		portal = GetPortalByName(cursor->portalname);
+
+		if (PortalIsValid(portal))
+			SPI_cursor_close(portal);
+	}
+
+	PLy_free(cursor->portalname);
+	cursor->portalname = NULL;
+
+	PLy_typeinfo_dealloc(&cursor->result);
+	arg->ob_type->tp_free(arg);
+}
+
+static PyObject *
+PLy_cursor_iternext(PyObject *self)
+{
+	PLyCursorObject *cursor;
+	PyObject *ret;
+	volatile MemoryContext oldcontext;
+	volatile ResourceOwner oldowner;
+	Portal portal;
+
+	cursor = (PLyCursorObject *) self;
+
+	if (cursor->closed)
+	{
+		PLy_exception_set(PyExc_ValueError, "iterating a closed cursor");
+		return NULL;
+	}
+
+	portal = GetPortalByName(cursor->portalname);
+	if (!PortalIsValid(portal))
+	{
+		PLy_exception_set(PyExc_ValueError,
+						  "iterating a cursor in an aborted subtransaction");
+		return NULL;
+	}
+
+	oldcontext = CurrentMemoryContext;
+	oldowner = CurrentResourceOwner;
+
+	BeginInternalSubTransaction(NULL);
+	MemoryContextSwitchTo(oldcontext);
+
+	PG_TRY();
+	{
+		SPI_cursor_fetch(portal, true, 1);
+		if (SPI_processed == 0)
+		{
+			PyErr_SetNone(PyExc_StopIteration);
+			ret = NULL;
+		}
+		else
+		{
+			if (cursor->result.is_rowtype != 1)
+				PLy_input_tuple_funcs(&cursor->result, SPI_tuptable->tupdesc);
+
+			ret = PLyDict_FromTuple(&cursor->result, SPI_tuptable->vals[0],
+									SPI_tuptable->tupdesc);
+		}
+
+		SPI_freetuptable(SPI_tuptable);
+
+		/* Commit the inner transaction, return to outer xact context */
+		ReleaseCurrentSubTransaction();
+		MemoryContextSwitchTo(oldcontext);
+		CurrentResourceOwner = oldowner;
+
+		/*
+		 * AtEOSubXact_SPI() should not have popped any SPI context, but just
+		 * in case it did, make sure we remain connected.
+		 */
+		SPI_restore_connection();
+	}
+	PG_CATCH();
+	{
+		ErrorData *edata;
+		PLyExceptionEntry *entry;
+		PyObject *exc;
+
+		/* Save error info */
+		MemoryContextSwitchTo(oldcontext);
+		edata = CopyErrorData();
+		FlushErrorState();
+
+		/* Abort the inner transaction */
+		RollbackAndReleaseCurrentSubTransaction();
+		MemoryContextSwitchTo(oldcontext);
+		CurrentResourceOwner = oldowner;
+
+		SPI_freetuptable(SPI_tuptable);
+
+		/*
+		 * If AtEOSubXact_SPI() popped any SPI context of the subxact, it will
+		 * have left us in a disconnected state. We need this hack to return
+		 * to connected state.
+		 */
+		SPI_restore_connection();
+
+		/* Look up the correct exception */
+		entry = hash_search(PLy_spi_exceptions, &edata->sqlerrcode,
+							HASH_FIND, NULL);
+		/* We really should find it, but just in case have a fallback */
+		Assert(entry != NULL);
+		exc = entry ? entry->exc : PLy_exc_spi_error;
+		/* Make Python raise the exception */
+		PLy_spi_exception_set(exc, edata);
+		return NULL;
+	}
+	PG_END_TRY();
+
+	return ret;
+}
+
+static PyObject *
+PLy_cursor_fetch(PyObject *self, PyObject *args)
+{
+	PLyCursorObject *cursor;
+	int count;
+	PLyResultObject *ret;
+	volatile MemoryContext oldcontext;
+	volatile ResourceOwner oldowner;
+	Portal portal;
+
+	if (!PyArg_ParseTuple(args, "i", &count))
+		return NULL;
+
+	cursor = (PLyCursorObject *) self;
+
+	if (cursor->closed)
+	{
+		PLy_exception_set(PyExc_ValueError, "fetch from a closed cursor");
+		return NULL;
+	}
+
+	portal = GetPortalByName(cursor->portalname);
+	if (!PortalIsValid(portal))
+	{
+		PLy_exception_set(PyExc_ValueError,
+						  "iterating a cursor in an aborted subtransaction");
+		return NULL;
+	}
+
+	ret = (PLyResultObject *) PLy_result_new();
+	if (ret == NULL)
+		return NULL;
+
+	oldcontext = CurrentMemoryContext;
+	oldowner = CurrentResourceOwner;
+
+	BeginInternalSubTransaction(NULL);
+	MemoryContextSwitchTo(oldcontext);
+
+	PG_TRY();
+	{
+		SPI_cursor_fetch(portal, true, count);
+
+		if (cursor->result.is_rowtype != 1)
+			PLy_input_tuple_funcs(&cursor->result, SPI_tuptable->tupdesc);
+
+		Py_DECREF(ret->status);
+		ret->status = PyInt_FromLong(SPI_OK_FETCH);
+
+		Py_DECREF(ret->nrows);
+		ret->nrows = PyInt_FromLong(SPI_processed);
+
+		if (SPI_processed != 0)
+		{
+			int i;
+
+			Py_DECREF(ret->rows);
+			ret->rows = PyList_New(SPI_processed);
+
+			for (i = 0; i < SPI_processed; i++)
+			{
+				PyObject *row = PLyDict_FromTuple(&cursor->result,
+												  SPI_tuptable->vals[i],
+												  SPI_tuptable->tupdesc);
+				PyList_SetItem(ret->rows, i, row);
+			}
+		}
+
+		SPI_freetuptable(SPI_tuptable);
+
+		/* Commit the inner transaction, return to outer xact context */
+		ReleaseCurrentSubTransaction();
+		MemoryContextSwitchTo(oldcontext);
+		CurrentResourceOwner = oldowner;
+
+		/*
+		 * AtEOSubXact_SPI() should not have popped any SPI context, but just
+		 * in case it did, make sure we remain connected.
+		 */
+		SPI_restore_connection();
+	}
+	PG_CATCH();
+	{
+		ErrorData *edata;
+		PLyExceptionEntry *entry;
+		PyObject *exc;
+
+		/* Save error info */
+		MemoryContextSwitchTo(oldcontext);
+		edata = CopyErrorData();
+		FlushErrorState();
+
+		/* Abort the inner transaction */
+		RollbackAndReleaseCurrentSubTransaction();
+		MemoryContextSwitchTo(oldcontext);
+		CurrentResourceOwner = oldowner;
+
+		SPI_freetuptable(SPI_tuptable);
+
+		/*
+		 * If AtEOSubXact_SPI() popped any SPI context of the subxact, it will
+		 * have left us in a disconnected state. We need this hack to return
+		 * to connected state.
+		 */
+		SPI_restore_connection();
+
+		/* Look up the correct exception */
+		entry = hash_search(PLy_spi_exceptions, &edata->sqlerrcode,
+							HASH_FIND, NULL);
+		/* We really should find it, but just in case have a fallback */
+		Assert(entry != NULL);
+		exc = entry ? entry->exc : PLy_exc_spi_error;
+		/* Make Python raise the exception */
+		PLy_spi_exception_set(exc, edata);
+		return NULL;
+	}
+	PG_END_TRY();
+
+	return (PyObject *) ret;
+}
+
+static PyObject *
+PLy_cursor_close(PyObject *self, PyObject *unused)
+{
+	PLyCursorObject *cursor = (PLyCursorObject *) self;
+
+	if (!cursor->closed)
+	{
+		Portal portal = GetPortalByName(cursor->portalname);
+
+		if (!PortalIsValid(portal))
+		{
+			PLy_exception_set(PyExc_ValueError,
+							  "closing a cursor in an aborted subtransaction");
+			return NULL;
+		}
+
+		SPI_cursor_close(portal);
+		cursor->closed = true;
+	}
+
+	Py_INCREF(Py_None);
+	return Py_None;
+}
+
 /* s = plpy.subtransaction() */
 static PyObject *
 PLy_subtransaction(PyObject *self, PyObject *unused)
@@ -4184,6 +4824,8 @@ PLy_init_plpy(void)
 		elog(ERROR, "could not initialize PLy_ResultType");
 	if (PyType_Ready(&PLy_SubtransactionType) < 0)
 		elog(ERROR, "could not initialize PLy_SubtransactionType");
+	if (PyType_Ready(&PLy_CursorType) < 0)
+		elog(ERROR, "could not initialize PLy_CursorType");
 
 #if PY_MAJOR_VERSION >= 3
 	PyModule_Create(&PLy_module);
-- 
cgit v1.2.3