diff options
author | Tom Lane <tgl@sss.pgh.pa.us> | 2024-04-06 20:41:32 -0400 |
---|---|---|
committer | Tom Lane <tgl@sss.pgh.pa.us> | 2024-04-06 20:45:11 -0400 |
commit | 4643a2b265e967cc5f13ffa0c7c6912dbb3466d0 (patch) | |
tree | ae3c3e182c5e0586dcecd93f0957deb2d0416b68 /src/interfaces/libpq/fe-exec.c | |
parent | 92641d8d651e685b49a6e2842d306aa5fe7ba500 (diff) | |
download | postgresql-4643a2b265e967cc5f13ffa0c7c6912dbb3466d0.tar.gz postgresql-4643a2b265e967cc5f13ffa0c7c6912dbb3466d0.zip |
Support retrieval of results in chunks with libpq.
This patch generalizes libpq's existing single-row mode to allow
individual partial-result PGresults to contain up to N rows, rather
than always one row. This reduces malloc overhead compared to plain
single-row mode, and it is very useful for psql's FETCH_COUNT feature,
since otherwise we'd have to add code (and cycles) to either merge
single-row PGresults into a bigger one or teach psql's
results-printing logic to accept arrays of PGresults.
To avoid API breakage, PQsetSingleRowMode() remains the same, and we
add a new function PQsetChunkedRowsMode() to invoke the more general
case. Also, PGresults obtained the old way continue to carry the
PGRES_SINGLE_TUPLE status code, while if PQsetChunkedRowsMode() is
used then their status code is PGRES_TUPLES_CHUNK. The underlying
logic is the same either way, though.
Daniel Vérité, reviewed by Laurenz Albe and myself (and whacked
around a bit by me, so any remaining bugs are my fault)
Discussion: https://postgr.es/m/CAKZiRmxsVTkO928CM+-ADvsMyePmU3L9DQCa9NwqjvLPcEe5QA@mail.gmail.com
Diffstat (limited to 'src/interfaces/libpq/fe-exec.c')
-rw-r--r-- | src/interfaces/libpq/fe-exec.c | 146 |
1 files changed, 93 insertions, 53 deletions
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c index c02a9180b24..7bdfc4c21aa 100644 --- a/src/interfaces/libpq/fe-exec.c +++ b/src/interfaces/libpq/fe-exec.c @@ -41,7 +41,8 @@ char *const pgresStatus[] = { "PGRES_COPY_BOTH", "PGRES_SINGLE_TUPLE", "PGRES_PIPELINE_SYNC", - "PGRES_PIPELINE_ABORTED" + "PGRES_PIPELINE_ABORTED", + "PGRES_TUPLES_CHUNK" }; /* We return this if we're unable to make a PGresult at all */ @@ -200,6 +201,7 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status) case PGRES_COPY_IN: case PGRES_COPY_BOTH: case PGRES_SINGLE_TUPLE: + case PGRES_TUPLES_CHUNK: /* non-error cases */ break; default: @@ -771,7 +773,7 @@ PQclear(PGresult *res) /* * Handy subroutine to deallocate any partially constructed async result. * - * Any "next" result gets cleared too. + * Any "saved" result gets cleared too. */ void pqClearAsyncResult(PGconn *conn) @@ -779,8 +781,8 @@ pqClearAsyncResult(PGconn *conn) PQclear(conn->result); conn->result = NULL; conn->error_result = false; - PQclear(conn->next_result); - conn->next_result = NULL; + PQclear(conn->saved_result); + conn->saved_result = NULL; } /* @@ -911,14 +913,14 @@ pqPrepareAsyncResult(PGconn *conn) } /* - * Replace conn->result with next_result, if any. In the normal case - * there isn't a next result and we're just dropping ownership of the - * current result. In single-row mode this restores the situation to what - * it was before we created the current single-row result. + * Replace conn->result with saved_result, if any. In the normal case + * there isn't a saved result and we're just dropping ownership of the + * current result. In partial-result mode this restores the situation to + * what it was before we created the current partial result. */ - conn->result = conn->next_result; - conn->error_result = false; /* next_result is never an error */ - conn->next_result = NULL; + conn->result = conn->saved_result; + conn->error_result = false; /* saved_result is never an error */ + conn->saved_result = NULL; return res; } @@ -1199,11 +1201,6 @@ pqSaveParameterStatus(PGconn *conn, const char *name, const char *value) * On error, *errmsgp can be set to an error string to be returned. * (Such a string should already be translated via libpq_gettext().) * If it is left NULL, the error is presumed to be "out of memory". - * - * In single-row mode, we create a new result holding just the current row, - * stashing the previous result in conn->next_result so that it becomes - * active again after pqPrepareAsyncResult(). This allows the result metadata - * (column descriptions) to be carried forward to each result row. */ int pqRowProcessor(PGconn *conn, const char **errmsgp) @@ -1215,11 +1212,14 @@ pqRowProcessor(PGconn *conn, const char **errmsgp) int i; /* - * In single-row mode, make a new PGresult that will hold just this one - * row; the original conn->result is left unchanged so that it can be used - * again as the template for future rows. + * In partial-result mode, if we don't already have a partial PGresult + * then make one by cloning conn->result (which should hold the correct + * result metadata by now). Then the original conn->result is moved over + * to saved_result so that we can re-use it as a reference for future + * partial results. The saved result will become active again after + * pqPrepareAsyncResult() returns the partial result to the application. */ - if (conn->singleRowMode) + if (conn->partialResMode && conn->saved_result == NULL) { /* Copy everything that should be in the result at this point */ res = PQcopyResult(res, @@ -1227,6 +1227,11 @@ pqRowProcessor(PGconn *conn, const char **errmsgp) PG_COPYRES_NOTICEHOOKS); if (!res) return 0; + /* Change result status to appropriate special value */ + res->resultStatus = (conn->singleRowMode ? PGRES_SINGLE_TUPLE : PGRES_TUPLES_CHUNK); + /* And stash it as the active result */ + conn->saved_result = conn->result; + conn->result = res; } /* @@ -1241,7 +1246,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp) tup = (PGresAttValue *) pqResultAlloc(res, nfields * sizeof(PGresAttValue), true); if (tup == NULL) - goto fail; + return 0; for (i = 0; i < nfields; i++) { @@ -1260,7 +1265,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp) val = (char *) pqResultAlloc(res, clen + 1, isbinary); if (val == NULL) - goto fail; + return 0; /* copy and zero-terminate the data (even if it's binary) */ memcpy(val, columns[i].value, clen); @@ -1273,30 +1278,16 @@ pqRowProcessor(PGconn *conn, const char **errmsgp) /* And add the tuple to the PGresult's tuple array */ if (!pqAddTuple(res, tup, errmsgp)) - goto fail; + return 0; /* - * Success. In single-row mode, make the result available to the client - * immediately. + * Success. In partial-result mode, if we have enough rows then make the + * result available to the client immediately. */ - if (conn->singleRowMode) - { - /* Change result status to special single-row value */ - res->resultStatus = PGRES_SINGLE_TUPLE; - /* Stash old result for re-use later */ - conn->next_result = conn->result; - conn->result = res; - /* And mark the result ready to return */ + if (conn->partialResMode && res->ntups >= conn->maxChunkSize) conn->asyncStatus = PGASYNC_READY_MORE; - } return 1; - -fail: - /* release locally allocated PGresult, if we made one */ - if (res != conn->result) - PQclear(res); - return 0; } @@ -1745,8 +1736,10 @@ PQsendQueryStart(PGconn *conn, bool newQuery) */ pqClearAsyncResult(conn); - /* reset single-row processing mode */ + /* reset partial-result mode */ + conn->partialResMode = false; conn->singleRowMode = false; + conn->maxChunkSize = 0; } /* ready to send command message */ @@ -1926,29 +1919,60 @@ sendFailed: } /* - * Select row-by-row processing mode + * Is it OK to change partial-result mode now? */ -int -PQsetSingleRowMode(PGconn *conn) +static bool +canChangeResultMode(PGconn *conn) { /* - * Only allow setting the flag when we have launched a query and not yet + * Only allow changing the mode when we have launched a query and not yet * received any results. */ if (!conn) - return 0; + return false; if (conn->asyncStatus != PGASYNC_BUSY) - return 0; + return false; if (!conn->cmd_queue_head || (conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE && conn->cmd_queue_head->queryclass != PGQUERY_EXTENDED)) - return 0; + return false; if (pgHavePendingResult(conn)) + return false; + return true; +} + +/* + * Select row-by-row processing mode + */ +int +PQsetSingleRowMode(PGconn *conn) +{ + if (canChangeResultMode(conn)) + { + conn->partialResMode = true; + conn->singleRowMode = true; + conn->maxChunkSize = 1; + return 1; + } + else return 0; +} - /* OK, set flag */ - conn->singleRowMode = true; - return 1; +/* + * Select chunked results processing mode + */ +int +PQsetChunkedRowsMode(PGconn *conn, int chunkSize) +{ + if (chunkSize > 0 && canChangeResultMode(conn)) + { + conn->partialResMode = true; + conn->singleRowMode = false; + conn->maxChunkSize = chunkSize; + return 1; + } + else + return 0; } /* @@ -2117,6 +2141,20 @@ PQgetResult(PGconn *conn) case PGASYNC_READY: res = pqPrepareAsyncResult(conn); + /* + * Normally pqPrepareAsyncResult will have left conn->result + * empty. Otherwise, "res" must be a not-full PGRES_TUPLES_CHUNK + * result, which we want to return to the caller while staying in + * PGASYNC_READY state. Then the next call here will return the + * empty PGRES_TUPLES_OK result that was restored from + * saved_result, after which we can proceed. + */ + if (conn->result) + { + Assert(res->resultStatus == PGRES_TUPLES_CHUNK); + break; + } + /* Advance the queue as appropriate */ pqCommandQueueAdvance(conn, false, res->resultStatus == PGRES_PIPELINE_SYNC); @@ -3173,10 +3211,12 @@ pqPipelineProcessQueue(PGconn *conn) } /* - * Reset single-row processing mode. (Client has to set it up for each - * query, if desired.) + * Reset partial-result mode. (Client has to set it up for each query, if + * desired.) */ + conn->partialResMode = false; conn->singleRowMode = false; + conn->maxChunkSize = 0; /* * If there are no further commands to process in the queue, get us in |