aboutsummaryrefslogtreecommitdiff
path: root/src/interfaces/libpq/fe-exec.c
diff options
context:
space:
mode:
authorTom Lane <tgl@sss.pgh.pa.us>2024-04-06 20:41:32 -0400
committerTom Lane <tgl@sss.pgh.pa.us>2024-04-06 20:45:11 -0400
commit4643a2b265e967cc5f13ffa0c7c6912dbb3466d0 (patch)
treeae3c3e182c5e0586dcecd93f0957deb2d0416b68 /src/interfaces/libpq/fe-exec.c
parent92641d8d651e685b49a6e2842d306aa5fe7ba500 (diff)
downloadpostgresql-4643a2b265e967cc5f13ffa0c7c6912dbb3466d0.tar.gz
postgresql-4643a2b265e967cc5f13ffa0c7c6912dbb3466d0.zip
Support retrieval of results in chunks with libpq.
This patch generalizes libpq's existing single-row mode to allow individual partial-result PGresults to contain up to N rows, rather than always one row. This reduces malloc overhead compared to plain single-row mode, and it is very useful for psql's FETCH_COUNT feature, since otherwise we'd have to add code (and cycles) to either merge single-row PGresults into a bigger one or teach psql's results-printing logic to accept arrays of PGresults. To avoid API breakage, PQsetSingleRowMode() remains the same, and we add a new function PQsetChunkedRowsMode() to invoke the more general case. Also, PGresults obtained the old way continue to carry the PGRES_SINGLE_TUPLE status code, while if PQsetChunkedRowsMode() is used then their status code is PGRES_TUPLES_CHUNK. The underlying logic is the same either way, though. Daniel Vérité, reviewed by Laurenz Albe and myself (and whacked around a bit by me, so any remaining bugs are my fault) Discussion: https://postgr.es/m/CAKZiRmxsVTkO928CM+-ADvsMyePmU3L9DQCa9NwqjvLPcEe5QA@mail.gmail.com
Diffstat (limited to 'src/interfaces/libpq/fe-exec.c')
-rw-r--r--src/interfaces/libpq/fe-exec.c146
1 files changed, 93 insertions, 53 deletions
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index c02a9180b24..7bdfc4c21aa 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -41,7 +41,8 @@ char *const pgresStatus[] = {
"PGRES_COPY_BOTH",
"PGRES_SINGLE_TUPLE",
"PGRES_PIPELINE_SYNC",
- "PGRES_PIPELINE_ABORTED"
+ "PGRES_PIPELINE_ABORTED",
+ "PGRES_TUPLES_CHUNK"
};
/* We return this if we're unable to make a PGresult at all */
@@ -200,6 +201,7 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status)
case PGRES_COPY_IN:
case PGRES_COPY_BOTH:
case PGRES_SINGLE_TUPLE:
+ case PGRES_TUPLES_CHUNK:
/* non-error cases */
break;
default:
@@ -771,7 +773,7 @@ PQclear(PGresult *res)
/*
* Handy subroutine to deallocate any partially constructed async result.
*
- * Any "next" result gets cleared too.
+ * Any "saved" result gets cleared too.
*/
void
pqClearAsyncResult(PGconn *conn)
@@ -779,8 +781,8 @@ pqClearAsyncResult(PGconn *conn)
PQclear(conn->result);
conn->result = NULL;
conn->error_result = false;
- PQclear(conn->next_result);
- conn->next_result = NULL;
+ PQclear(conn->saved_result);
+ conn->saved_result = NULL;
}
/*
@@ -911,14 +913,14 @@ pqPrepareAsyncResult(PGconn *conn)
}
/*
- * Replace conn->result with next_result, if any. In the normal case
- * there isn't a next result and we're just dropping ownership of the
- * current result. In single-row mode this restores the situation to what
- * it was before we created the current single-row result.
+ * Replace conn->result with saved_result, if any. In the normal case
+ * there isn't a saved result and we're just dropping ownership of the
+ * current result. In partial-result mode this restores the situation to
+ * what it was before we created the current partial result.
*/
- conn->result = conn->next_result;
- conn->error_result = false; /* next_result is never an error */
- conn->next_result = NULL;
+ conn->result = conn->saved_result;
+ conn->error_result = false; /* saved_result is never an error */
+ conn->saved_result = NULL;
return res;
}
@@ -1199,11 +1201,6 @@ pqSaveParameterStatus(PGconn *conn, const char *name, const char *value)
* On error, *errmsgp can be set to an error string to be returned.
* (Such a string should already be translated via libpq_gettext().)
* If it is left NULL, the error is presumed to be "out of memory".
- *
- * In single-row mode, we create a new result holding just the current row,
- * stashing the previous result in conn->next_result so that it becomes
- * active again after pqPrepareAsyncResult(). This allows the result metadata
- * (column descriptions) to be carried forward to each result row.
*/
int
pqRowProcessor(PGconn *conn, const char **errmsgp)
@@ -1215,11 +1212,14 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
int i;
/*
- * In single-row mode, make a new PGresult that will hold just this one
- * row; the original conn->result is left unchanged so that it can be used
- * again as the template for future rows.
+ * In partial-result mode, if we don't already have a partial PGresult
+ * then make one by cloning conn->result (which should hold the correct
+ * result metadata by now). Then the original conn->result is moved over
+ * to saved_result so that we can re-use it as a reference for future
+ * partial results. The saved result will become active again after
+ * pqPrepareAsyncResult() returns the partial result to the application.
*/
- if (conn->singleRowMode)
+ if (conn->partialResMode && conn->saved_result == NULL)
{
/* Copy everything that should be in the result at this point */
res = PQcopyResult(res,
@@ -1227,6 +1227,11 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
PG_COPYRES_NOTICEHOOKS);
if (!res)
return 0;
+ /* Change result status to appropriate special value */
+ res->resultStatus = (conn->singleRowMode ? PGRES_SINGLE_TUPLE : PGRES_TUPLES_CHUNK);
+ /* And stash it as the active result */
+ conn->saved_result = conn->result;
+ conn->result = res;
}
/*
@@ -1241,7 +1246,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
tup = (PGresAttValue *)
pqResultAlloc(res, nfields * sizeof(PGresAttValue), true);
if (tup == NULL)
- goto fail;
+ return 0;
for (i = 0; i < nfields; i++)
{
@@ -1260,7 +1265,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
val = (char *) pqResultAlloc(res, clen + 1, isbinary);
if (val == NULL)
- goto fail;
+ return 0;
/* copy and zero-terminate the data (even if it's binary) */
memcpy(val, columns[i].value, clen);
@@ -1273,30 +1278,16 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
/* And add the tuple to the PGresult's tuple array */
if (!pqAddTuple(res, tup, errmsgp))
- goto fail;
+ return 0;
/*
- * Success. In single-row mode, make the result available to the client
- * immediately.
+ * Success. In partial-result mode, if we have enough rows then make the
+ * result available to the client immediately.
*/
- if (conn->singleRowMode)
- {
- /* Change result status to special single-row value */
- res->resultStatus = PGRES_SINGLE_TUPLE;
- /* Stash old result for re-use later */
- conn->next_result = conn->result;
- conn->result = res;
- /* And mark the result ready to return */
+ if (conn->partialResMode && res->ntups >= conn->maxChunkSize)
conn->asyncStatus = PGASYNC_READY_MORE;
- }
return 1;
-
-fail:
- /* release locally allocated PGresult, if we made one */
- if (res != conn->result)
- PQclear(res);
- return 0;
}
@@ -1745,8 +1736,10 @@ PQsendQueryStart(PGconn *conn, bool newQuery)
*/
pqClearAsyncResult(conn);
- /* reset single-row processing mode */
+ /* reset partial-result mode */
+ conn->partialResMode = false;
conn->singleRowMode = false;
+ conn->maxChunkSize = 0;
}
/* ready to send command message */
@@ -1926,29 +1919,60 @@ sendFailed:
}
/*
- * Select row-by-row processing mode
+ * Is it OK to change partial-result mode now?
*/
-int
-PQsetSingleRowMode(PGconn *conn)
+static bool
+canChangeResultMode(PGconn *conn)
{
/*
- * Only allow setting the flag when we have launched a query and not yet
+ * Only allow changing the mode when we have launched a query and not yet
* received any results.
*/
if (!conn)
- return 0;
+ return false;
if (conn->asyncStatus != PGASYNC_BUSY)
- return 0;
+ return false;
if (!conn->cmd_queue_head ||
(conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE &&
conn->cmd_queue_head->queryclass != PGQUERY_EXTENDED))
- return 0;
+ return false;
if (pgHavePendingResult(conn))
+ return false;
+ return true;
+}
+
+/*
+ * Select row-by-row processing mode
+ */
+int
+PQsetSingleRowMode(PGconn *conn)
+{
+ if (canChangeResultMode(conn))
+ {
+ conn->partialResMode = true;
+ conn->singleRowMode = true;
+ conn->maxChunkSize = 1;
+ return 1;
+ }
+ else
return 0;
+}
- /* OK, set flag */
- conn->singleRowMode = true;
- return 1;
+/*
+ * Select chunked results processing mode
+ */
+int
+PQsetChunkedRowsMode(PGconn *conn, int chunkSize)
+{
+ if (chunkSize > 0 && canChangeResultMode(conn))
+ {
+ conn->partialResMode = true;
+ conn->singleRowMode = false;
+ conn->maxChunkSize = chunkSize;
+ return 1;
+ }
+ else
+ return 0;
}
/*
@@ -2117,6 +2141,20 @@ PQgetResult(PGconn *conn)
case PGASYNC_READY:
res = pqPrepareAsyncResult(conn);
+ /*
+ * Normally pqPrepareAsyncResult will have left conn->result
+ * empty. Otherwise, "res" must be a not-full PGRES_TUPLES_CHUNK
+ * result, which we want to return to the caller while staying in
+ * PGASYNC_READY state. Then the next call here will return the
+ * empty PGRES_TUPLES_OK result that was restored from
+ * saved_result, after which we can proceed.
+ */
+ if (conn->result)
+ {
+ Assert(res->resultStatus == PGRES_TUPLES_CHUNK);
+ break;
+ }
+
/* Advance the queue as appropriate */
pqCommandQueueAdvance(conn, false,
res->resultStatus == PGRES_PIPELINE_SYNC);
@@ -3173,10 +3211,12 @@ pqPipelineProcessQueue(PGconn *conn)
}
/*
- * Reset single-row processing mode. (Client has to set it up for each
- * query, if desired.)
+ * Reset partial-result mode. (Client has to set it up for each query, if
+ * desired.)
*/
+ conn->partialResMode = false;
conn->singleRowMode = false;
+ conn->maxChunkSize = 0;
/*
* If there are no further commands to process in the queue, get us in