diff options
author | Tom Lane <tgl@sss.pgh.pa.us> | 2003-04-29 03:21:30 +0000 |
---|---|---|
committer | Tom Lane <tgl@sss.pgh.pa.us> | 2003-04-29 03:21:30 +0000 |
commit | 4a5f38c4e624fbdc8bb4511394c45dc2c7f784ec (patch) | |
tree | 715c7e2440e4616e6084b6348453ad379733e9c9 /src/backend/commands | |
parent | 6a17d226b535c1fc7db8a10b5f2b482e852af928 (diff) | |
download | postgresql-4a5f38c4e624fbdc8bb4511394c45dc2c7f784ec.tar.gz postgresql-4a5f38c4e624fbdc8bb4511394c45dc2c7f784ec.zip |
Code review for holdable-cursors patch. Fix error recovery, memory
context sloppiness, some other things. Includes Neil's mopup patch
of 22-Apr.
Diffstat (limited to 'src/backend/commands')
-rw-r--r-- | src/backend/commands/portalcmds.c | 288 |
1 files changed, 149 insertions, 139 deletions
diff --git a/src/backend/commands/portalcmds.c b/src/backend/commands/portalcmds.c index 7eabc58d495..9dc7583f06c 100644 --- a/src/backend/commands/portalcmds.c +++ b/src/backend/commands/portalcmds.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/commands/portalcmds.c,v 1.11 2003/03/27 16:51:27 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/commands/portalcmds.c,v 1.12 2003/04/29 03:21:29 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -24,14 +24,12 @@ #include "rewrite/rewriteHandler.h" #include "utils/memutils.h" + static long DoRelativeFetch(Portal portal, bool forward, long count, CommandDest dest); -static long DoRelativeStoreFetch(Portal portal, - bool forward, - long count, - CommandDest dest); +static uint32 RunFromStore(Portal portal, ScanDirection direction, long count); static void DoPortalRewind(Portal portal); static Portal PreparePortal(DeclareCursorStmt *stmt); @@ -52,11 +50,9 @@ PerformCursorOpen(DeclareCursorStmt *stmt, CommandDest dest) QueryDesc *queryDesc; /* - * If this is a non-holdable cursor, we ensure that this statement + * If this is a non-holdable cursor, we require that this statement * has been executed inside a transaction block (or else, it would * have no user-visible effect). - * - * XXX: surely there is a better way to check this? */ if (!(stmt->options & CURSOR_OPT_HOLD)) RequireTransactionChain((void *) stmt, "DECLARE CURSOR"); @@ -106,7 +102,7 @@ PerformCursorOpen(DeclareCursorStmt *stmt, CommandDest dest) ExecutorStart(queryDesc); /* Arrange to shut down the executor if portal is dropped */ - PortalSetQuery(portal, queryDesc, PortalCleanup); + PortalSetQuery(portal, queryDesc); /* * We're done; the query won't actually be run until PerformPortalFetch @@ -352,15 +348,11 @@ DoRelativeFetch(Portal portal, CommandDest dest) { QueryDesc *queryDesc; - EState *estate; - ScanDirection direction; QueryDesc temp_queryDesc; - - if (portal->holdStore) - return DoRelativeStoreFetch(portal, forward, count, dest); + ScanDirection direction; + uint32 nprocessed; queryDesc = PortalGetQueryDesc(portal); - estate = queryDesc->estate; /* * If the requested destination is not the same as the query's @@ -403,19 +395,26 @@ DoRelativeFetch(Portal portal, if (count == FETCH_ALL) count = 0; - ExecutorRun(queryDesc, direction, count); + if (portal->holdStore) + nprocessed = RunFromStore(portal, direction, count); + else + { + Assert(portal->executorRunning); + ExecutorRun(queryDesc, direction, count); + nprocessed = queryDesc->estate->es_processed; + } if (direction != NoMovementScanDirection) { long oldPos; - if (estate->es_processed > 0) + if (nprocessed > 0) portal->atStart = false; /* OK to go backward now */ if (count == 0 || - (unsigned long) estate->es_processed < (unsigned long) count) + (unsigned long) nprocessed < (unsigned long) count) portal->atEnd = true; /* we retrieved 'em all */ oldPos = portal->portalPos; - portal->portalPos += estate->es_processed; + portal->portalPos += nprocessed; /* portalPos doesn't advance when we fall off the end */ if (portal->portalPos < oldPos) portal->posOverflow = true; @@ -436,17 +435,24 @@ DoRelativeFetch(Portal portal, if (count == FETCH_ALL) count = 0; - ExecutorRun(queryDesc, direction, count); + if (portal->holdStore) + nprocessed = RunFromStore(portal, direction, count); + else + { + Assert(portal->executorRunning); + ExecutorRun(queryDesc, direction, count); + nprocessed = queryDesc->estate->es_processed; + } if (direction != NoMovementScanDirection) { - if (estate->es_processed > 0 && portal->atEnd) + if (nprocessed > 0 && portal->atEnd) { portal->atEnd = false; /* OK to go forward now */ portal->portalPos++; /* adjust for endpoint case */ } if (count == 0 || - (unsigned long) estate->es_processed < (unsigned long) count) + (unsigned long) nprocessed < (unsigned long) count) { portal->atStart = true; /* we retrieved 'em all */ portal->portalPos = 0; @@ -457,7 +463,7 @@ DoRelativeFetch(Portal portal, long oldPos; oldPos = portal->portalPos; - portal->portalPos -= estate->es_processed; + portal->portalPos -= nprocessed; if (portal->portalPos > oldPos || portal->portalPos <= 0) portal->posOverflow = true; @@ -465,76 +471,75 @@ DoRelativeFetch(Portal portal, } } - return estate->es_processed; + return nprocessed; } /* - * DoRelativeStoreFetch - * Do fetch for a simple N-rows-forward-or-backward case, getting - * the results from the portal's tuple store. + * RunFromStore + * Fetch tuples from the portal's tuple store. + * + * Calling conventions are similar to ExecutorRun, except that we + * do not depend on having an estate, and therefore return the number + * of tuples processed as the result, not in estate->es_processed. + * + * One difference from ExecutorRun is that the destination receiver functions + * are run in the caller's memory context (since we have no estate). Watch + * out for memory leaks. */ -static long -DoRelativeStoreFetch(Portal portal, - bool forward, - long count, - CommandDest dest) +static uint32 +RunFromStore(Portal portal, ScanDirection direction, long count) { + QueryDesc *queryDesc = PortalGetQueryDesc(portal); DestReceiver *destfunc; - QueryDesc *queryDesc = portal->queryDesc; - long rows_fetched = 0; + long current_tuple_count = 0; - if (!forward && portal->scrollType == DISABLE_SCROLL) - elog(ERROR, "Cursor can only scan forward" - "\n\tDeclare it with SCROLL option to enable backward scan"); - - destfunc = DestToFunction(dest); + destfunc = DestToFunction(queryDesc->dest); (*destfunc->setup) (destfunc, queryDesc->operation, - portal->name, queryDesc->tupDesc); + queryDesc->portalName, queryDesc->tupDesc); - for (;;) + if (direction == NoMovementScanDirection) { - HeapTuple tup; - bool should_free; + /* do nothing except start/stop the destination */ + } + else + { + bool forward = (direction == ForwardScanDirection); - if (rows_fetched >= count) - break; - if (portal->atEnd && forward) - break; - if (portal->atStart && !forward) - break; + for (;;) + { + MemoryContext oldcontext; + HeapTuple tup; + bool should_free; - tup = tuplestore_getheaptuple(portal->holdStore, forward, &should_free); + oldcontext = MemoryContextSwitchTo(portal->holdContext); - if (tup == NULL) - { - if (forward) - portal->atEnd = true; - else - portal->atStart = true; + tup = tuplestore_getheaptuple(portal->holdStore, forward, + &should_free); - break; - } + MemoryContextSwitchTo(oldcontext); - (*destfunc->receiveTuple) (tup, queryDesc->tupDesc, destfunc); + if (tup == NULL) + break; - rows_fetched++; - if (forward) - portal->portalPos++; - else - portal->portalPos--; + (*destfunc->receiveTuple) (tup, queryDesc->tupDesc, destfunc); - if (forward && portal->atStart) - portal->atStart = false; - if (!forward && portal->atEnd) - portal->atEnd = false; + if (should_free) + pfree(tup); - if (should_free) - pfree(tup); + /* + * check our tuple count.. if we've processed the proper number + * then quit, else loop again and process more tuples. Zero + * count means no limit. + */ + current_tuple_count++; + if (count && count == current_tuple_count) + break; + } } (*destfunc->cleanup) (destfunc); - return rows_fetched; + return (uint32) current_tuple_count; } /* @@ -544,9 +549,17 @@ static void DoPortalRewind(Portal portal) { if (portal->holdStore) + { + MemoryContext oldcontext; + + oldcontext = MemoryContextSwitchTo(portal->holdContext); tuplestore_rescan(portal->holdStore); - else + MemoryContextSwitchTo(oldcontext); + } + if (portal->executorRunning) + { ExecutorRewind(PortalGetQueryDesc(portal)); + } portal->atStart = true; portal->atEnd = false; @@ -630,12 +643,11 @@ PreparePortal(DeclareCursorStmt *stmt) /* * PortalCleanup * - * Clean up a portal when it's dropped. Since this mainly exists to run - * ExecutorEnd(), it should not be set as the cleanup hook until we have - * called ExecutorStart() on the portal's query. + * Clean up a portal when it's dropped. This is the standard cleanup hook + * for portals. */ void -PortalCleanup(Portal portal) +PortalCleanup(Portal portal, bool isError) { /* * sanity checks @@ -643,11 +655,31 @@ PortalCleanup(Portal portal) AssertArg(PortalIsValid(portal)); AssertArg(portal->cleanup == PortalCleanup); + /* + * Delete tuplestore if present. (Note: portalmem.c is responsible + * for removing holdContext.) We should do this even under error + * conditions. + */ if (portal->holdStore) - tuplestore_end(portal->holdStore); - else - ExecutorEnd(PortalGetQueryDesc(portal)); + { + MemoryContext oldcontext; + oldcontext = MemoryContextSwitchTo(portal->holdContext); + tuplestore_end(portal->holdStore); + MemoryContextSwitchTo(oldcontext); + portal->holdStore = NULL; + } + /* + * Shut down executor, if still running. We skip this during error + * abort, since other mechanisms will take care of releasing executor + * resources, and we can't be sure that ExecutorEnd itself wouldn't fail. + */ + if (portal->executorRunning) + { + portal->executorRunning = false; + if (!isError) + ExecutorEnd(PortalGetQueryDesc(portal)); + } } /* @@ -661,8 +693,10 @@ PortalCleanup(Portal portal) void PersistHoldablePortal(Portal portal) { - MemoryContext oldcxt; QueryDesc *queryDesc = PortalGetQueryDesc(portal); + MemoryContext oldcxt; + CommandDest olddest; + TupleDesc tupdesc; /* * If we're preserving a holdable portal, we had better be @@ -672,17 +706,15 @@ PersistHoldablePortal(Portal portal) Assert(portal->holdStore == NULL); /* - * This context is used to store portal data that needs to persist - * between transactions. + * This context is used to store the tuple set. + * Caller must have created it already. */ + Assert(portal->holdContext != NULL); oldcxt = MemoryContextSwitchTo(portal->holdContext); /* XXX: Should SortMem be used for this? */ portal->holdStore = tuplestore_begin_heap(true, true, SortMem); - /* Set the destination to output to the tuplestore */ - queryDesc->dest = Tuplestore; - /* * Rewind the executor: we need to store the entire result set in * the tuplestore, so that subsequent backward FETCHs can be @@ -690,8 +722,29 @@ PersistHoldablePortal(Portal portal) */ ExecutorRewind(queryDesc); + /* Set the destination to output to the tuplestore */ + olddest = queryDesc->dest; + queryDesc->dest = Tuplestore; + /* Fetch the result set into the tuplestore */ - ExecutorRun(queryDesc, ForwardScanDirection, 0); + ExecutorRun(queryDesc, ForwardScanDirection, 0L); + + queryDesc->dest = olddest; + + /* + * Before closing down the executor, we must copy the tupdesc, since + * it was created in executor memory. + */ + tupdesc = CreateTupleDescCopy(queryDesc->tupDesc); + + /* + * Now shut down the inner executor. + */ + portal->executorRunning = false; + ExecutorEnd(queryDesc); + + /* ExecutorEnd clears this, so must wait to save copied pointer */ + queryDesc->tupDesc = tupdesc; /* * Reset the position in the result set: ideally, this could be @@ -702,69 +755,26 @@ PersistHoldablePortal(Portal portal) */ if (!portal->atEnd) { - int store_pos = 0; - bool should_free; + long store_pos; tuplestore_rescan(portal->holdStore); - while (store_pos < portal->portalPos) + for (store_pos = 0; store_pos < portal->portalPos; store_pos++) { - HeapTuple tmp = tuplestore_gettuple(portal->holdStore, - true, &should_free); + HeapTuple tup; + bool should_free; - if (tmp == NULL) + tup = tuplestore_gettuple(portal->holdStore, true, + &should_free); + + if (tup == NULL) elog(ERROR, "PersistHoldablePortal: unexpected end of tuple stream"); - store_pos++; - - /* - * This could probably be optimized by creating and then - * deleting a separate memory context for this series of - * operations. - */ if (should_free) - pfree(tmp); + pfree(tup); } } - /* - * The current Portal structure contains some data that will be - * needed by the holdable cursor, but it has been allocated in a - * memory context that is not sufficiently long-lived: we need to - * copy it into the portal's long-term memory context. - */ - { - TupleDesc tupDescCopy; - QueryDesc *queryDescCopy; - - /* - * We need to use this order as ExecutorEnd invalidates the - * queryDesc's tuple descriptor - */ - tupDescCopy = CreateTupleDescCopy(queryDesc->tupDesc); - - ExecutorEnd(queryDesc); - - queryDescCopy = palloc(sizeof(*queryDescCopy)); - - /* - * This doesn't copy all the dependant data in the QueryDesc, - * but that's okay -- the only complex field we need to keep is - * the query's tupledesc, which we've copied ourselves. - */ - memcpy(queryDescCopy, queryDesc, sizeof(*queryDesc)); - - FreeQueryDesc(queryDesc); - - queryDescCopy->tupDesc = tupDescCopy; - portal->queryDesc = queryDescCopy; - } - - /* - * We no longer need the portal's short-term memory context. - */ - MemoryContextDelete(PortalGetHeapMemory(portal)); - - PortalGetHeapMemory(portal) = NULL; + MemoryContextSwitchTo(oldcxt); } |