aboutsummaryrefslogtreecommitdiff
path: root/src/interfaces/libpq/fe-protocol3.c
diff options
context:
space:
mode:
author Tom Lane <tgl@sss.pgh.pa.us> 2012-04-04 18:27:56 -0400
committer Tom Lane <tgl@sss.pgh.pa.us> 2012-04-04 18:27:56 -0400
commit 92785dac2ee7026948962cd61c4cd84a2d052772 (patch)
tree deb7a2c120978b9f3b85410317271a91b76ad66d /src/interfaces/libpq/fe-protocol3.c
parent cb917e1544612c187c74fed1a990e26820514c8a (diff)
download postgresql-92785dac2ee7026948962cd61c4cd84a2d052772.tar.gz
postgresql-92785dac2ee7026948962cd61c4cd84a2d052772.zip
Add a "row processor" API to libpq for better handling of large results.
Traditionally libpq has collected an entire query result before passing it back to the application. That provides a simple and transactional API, but it's pretty inefficient for large result sets. This patch allows the application to process each row on-the-fly instead of accumulating the rows into the PGresult. Error recovery becomes a bit more complex, but often that tradeoff is well worth making.

Kyotaro Horiguchi, reviewed by Marko Kreen and Tom Lane
Diffstat (limited to 'src/interfaces/libpq/fe-protocol3.c')
-rw-r--r-- src/interfaces/libpq/fe-protocol3.c 298
1 files changed, 211 insertions, 87 deletions
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 892dcbcb00f..a773d7a5246 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -44,7 +44,7 @@
static void handleSyncLoss(PGconn *conn, char id, int msgLength);
-static int getRowDescriptions(PGconn *conn);
+static int getRowDescriptions(PGconn *conn, int msgLength);
static int getParamDescriptions(PGconn *conn);
static int getAnotherTuple(PGconn *conn, int msgLength);
static int getParameterStatus(PGconn *conn);
@@ -61,6 +61,9 @@ static int build_startup_packet(const PGconn *conn, char *packet,
* parseInput: if appropriate, parse input data from backend
* until input is exhausted or a stopping state is reached.
* Note that this function will NOT attempt to read more data from the backend.
+ *
+ * Note: callers of parseInput must be prepared for a longjmp exit when we are
+ * in PGASYNC_BUSY state, since an external row processor might do that.
*/
void
pqParseInput3(PGconn *conn)
@@ -269,15 +272,10 @@ pqParseInput3(PGconn *conn)
conn->queryclass == PGQUERY_DESCRIBE)
{
/* First 'T' in a query sequence */
- if (getRowDescriptions(conn))
+ if (getRowDescriptions(conn, msgLength))
return;
-
- /*
- * If we're doing a Describe, we're ready to pass the
- * result back to the client.
- */
- if (conn->queryclass == PGQUERY_DESCRIBE)
- conn->asyncStatus = PGASYNC_READY;
+ /* getRowDescriptions() moves inStart itself */
+ continue;
}
else
{
@@ -327,6 +325,8 @@ pqParseInput3(PGconn *conn)
/* Read another tuple of a normal query response */
if (getAnotherTuple(conn, msgLength))
return;
+ /* getAnotherTuple() moves inStart itself */
+ continue;
}
else if (conn->result != NULL &&
conn->result->resultStatus == PGRES_FATAL_ERROR)
@@ -443,17 +443,20 @@ handleSyncLoss(PGconn *conn, char id, int msgLength)
* parseInput subroutine to read a 'T' (row descriptions) message.
* We'll build a new PGresult structure (unless called for a Describe
* command for a prepared statement) containing the attribute data.
- * Returns: 0 if completed message, EOF if not enough data yet.
+ * Returns: 0 if processed message successfully, EOF to suspend parsing
+ * (the latter case is not actually used currently).
+ * In either case, conn->inStart has been advanced past the message.
*
- * Note that if we run out of data, we have to release the partially
- * constructed PGresult, and rebuild it again next time. Fortunately,
- * that shouldn't happen often, since 'T' messages usually fit in a packet.
+ * Note: the row processor could also choose to longjmp out of libpq,
+ * in which case the library's state must allow for resumption at the
+ * next message.
*/
static int
-getRowDescriptions(PGconn *conn)
+getRowDescriptions(PGconn *conn, int msgLength)
{
PGresult *result;
int nfields;
+ const char *errmsg;
int i;
/*
@@ -471,12 +474,19 @@ getRowDescriptions(PGconn *conn)
else
result = PQmakeEmptyPGresult(conn, PGRES_TUPLES_OK);
if (!result)
- goto failure;
+ {
+ errmsg = NULL; /* means "out of memory", see below */
+ goto advance_and_error;
+ }
/* parseInput already read the 'T' label and message length. */
/* the next two bytes are the number of fields */
if (pqGetInt(&(result->numAttributes), 2, conn))
- goto failure;
+ {
+ /* We should not run out of data here, so complain */
+ errmsg = libpq_gettext("insufficient data in \"T\" message");
+ goto advance_and_error;
+ }
nfields = result->numAttributes;
/* allocate space for the attribute descriptors */
@@ -485,7 +495,10 @@ getRowDescriptions(PGconn *conn)
result->attDescs = (PGresAttDesc *)
pqResultAlloc(result, nfields * sizeof(PGresAttDesc), TRUE);
if (!result->attDescs)
- goto failure;
+ {
+ errmsg = NULL; /* means "out of memory", see below */
+ goto advance_and_error;
+ }
MemSet(result->attDescs, 0, nfields * sizeof(PGresAttDesc));
}
@@ -510,7 +523,9 @@ getRowDescriptions(PGconn *conn)
pqGetInt(&atttypmod, 4, conn) ||
pqGetInt(&format, 2, conn))
{
- goto failure;
+ /* We should not run out of data here, so complain */
+ errmsg = libpq_gettext("insufficient data in \"T\" message");
+ goto advance_and_error;
}
/*
@@ -524,7 +539,10 @@ getRowDescriptions(PGconn *conn)
result->attDescs[i].name = pqResultStrdup(result,
conn->workBuffer.data);
if (!result->attDescs[i].name)
- goto failure;
+ {
+ errmsg = NULL; /* means "out of memory", see below */
+ goto advance_and_error;
+ }
result->attDescs[i].tableid = tableid;
result->attDescs[i].columnid = columnid;
result->attDescs[i].format = format;
@@ -536,24 +554,84 @@ getRowDescriptions(PGconn *conn)
result->binary = 0;
}
+ /* Sanity check that we absorbed all the data */
+ if (conn->inCursor != conn->inStart + 5 + msgLength)
+ {
+ errmsg = libpq_gettext("extraneous data in \"T\" message");
+ goto advance_and_error;
+ }
+
/* Success! */
conn->result = result;
- return 0;
-failure:
+ /*
+ * Advance inStart to show that the "T" message has been processed. We
+ * must do this before calling the row processor, in case it longjmps.
+ */
+ conn->inStart = conn->inCursor;
/*
- * Discard incomplete result, unless it's from getParamDescriptions.
- *
- * Note that if we hit a bufferload boundary while handling the
- * describe-statement case, we'll forget any PGresult space we just
- * allocated, and then reallocate it on next try. This will bloat the
- * PGresult a little bit but the space will be freed at PQclear, so it
- * doesn't seem worth trying to be smarter.
+ * If we're doing a Describe, we're done, and ready to pass the result
+ * back to the client.
*/
- if (result != conn->result)
+ if (conn->queryclass == PGQUERY_DESCRIBE)
+ {
+ conn->asyncStatus = PGASYNC_READY;
+ return 0;
+ }
+
+ /* Give the row processor a chance to initialize for new result set */
+ errmsg = NULL;
+ switch ((*conn->rowProcessor) (result, NULL, &errmsg,
+ conn->rowProcessorParam))
+ {
+ case 1:
+ /* everything is good */
+ return 0;
+
+ case -1:
+ /* error, report the errmsg below */
+ break;
+
+ default:
+ /* unrecognized return code */
+ errmsg = libpq_gettext("unrecognized return value from row processor");
+ break;
+ }
+ goto set_error_result;
+
+advance_and_error:
+ /* Discard unsaved result, if any */
+ if (result && result != conn->result)
PQclear(result);
- return EOF;
+
+ /* Discard the failed message by pretending we read it */
+ conn->inStart += 5 + msgLength;
+
+set_error_result:
+
+ /*
+ * Replace partially constructed result with an error result. First
+ * discard the old result to try to win back some memory.
+ */
+ pqClearAsyncResult(conn);
+
+ /*
+ * If row processor didn't provide an error message, assume "out of
+ * memory" was meant.
+ */
+ if (!errmsg)
+ errmsg = libpq_gettext("out of memory for query result");
+
+ printfPQExpBuffer(&conn->errorMessage, "%s\n", errmsg);
+ pqSaveErrorResult(conn);
+
+ /*
+ * Return zero to allow input parsing to continue. Subsequent "D"
+ * messages will be ignored until we get to end of data, since an error
+ * result is already set up.
+ */
+ return 0;
}
/*
@@ -613,47 +691,53 @@ failure:
/*
* parseInput subroutine to read a 'D' (row data) message.
- * We add another tuple to the existing PGresult structure.
- * Returns: 0 if completed message, EOF if error or not enough data yet.
+ * We fill rowbuf with column pointers and then call the row processor.
+ * Returns: 0 if processed message successfully, EOF to suspend parsing
+ * (the latter case is not actually used currently).
+ * In either case, conn->inStart has been advanced past the message.
*
- * Note that if we run out of data, we have to suspend and reprocess
- * the message after more data is received. We keep a partially constructed
- * tuple in conn->curTuple, and avoid reallocating already-allocated storage.
+ * Note: the row processor could also choose to longjmp out of libpq,
+ * in which case the library's state must allow for resumption at the
+ * next message.
*/
static int
getAnotherTuple(PGconn *conn, int msgLength)
{
PGresult *result = conn->result;
int nfields = result->numAttributes;
- PGresAttValue *tup;
+ const char *errmsg;
+ PGdataValue *rowbuf;
int tupnfields; /* # fields from tuple */
int vlen; /* length of the current field value */
int i;
- /* Allocate tuple space if first time for this data message */
- if (conn->curTuple == NULL)
- {
- conn->curTuple = (PGresAttValue *)
- pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
- if (conn->curTuple == NULL)
- goto outOfMemory;
- MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
- }
- tup = conn->curTuple;
-
/* Get the field count and make sure it's what we expect */
if (pqGetInt(&tupnfields, 2, conn))
- return EOF;
+ {
+ /* We should not run out of data here, so complain */
+ errmsg = libpq_gettext("insufficient data in \"D\" message");
+ goto advance_and_error;
+ }
if (tupnfields != nfields)
{
- /* Replace partially constructed result with an error result */
- printfPQExpBuffer(&conn->errorMessage,
- libpq_gettext("unexpected field count in \"D\" message\n"));
- pqSaveErrorResult(conn);
- /* Discard the failed message by pretending we read it */
- conn->inCursor = conn->inStart + 5 + msgLength;
- return 0;
+ errmsg = libpq_gettext("unexpected field count in \"D\" message");
+ goto advance_and_error;
+ }
+
+ /* Resize row buffer if needed */
+ rowbuf = conn->rowBuf;
+ if (nfields > conn->rowBufLen)
+ {
+ rowbuf = (PGdataValue *) realloc(rowbuf,
+ nfields * sizeof(PGdataValue));
+ if (!rowbuf)
+ {
+ errmsg = NULL; /* means "out of memory", see below */
+ goto advance_and_error;
+ }
+ conn->rowBuf = rowbuf;
+ conn->rowBufLen = nfields;
}
/* Scan the fields */
@@ -661,54 +745,94 @@ getAnotherTuple(PGconn *conn, int msgLength)
{
/* get the value length */
if (pqGetInt(&vlen, 4, conn))
- return EOF;
- if (vlen == -1)
{
- /* null field */
- tup[i].value = result->null_field;
- tup[i].len = NULL_LEN;
- continue;
+ /* We should not run out of data here, so complain */
+ errmsg = libpq_gettext("insufficient data in \"D\" message");
+ goto advance_and_error;
}
- if (vlen < 0)
- vlen = 0;
- if (tup[i].value == NULL)
- {
- bool isbinary = (result->attDescs[i].format != 0);
+ rowbuf[i].len = vlen;
- tup[i].value = (char *) pqResultAlloc(result, vlen + 1, isbinary);
- if (tup[i].value == NULL)
- goto outOfMemory;
- }
- tup[i].len = vlen;
- /* read in the value */
+ /*
+ * rowbuf[i].value always points to the next address in the data
+ * buffer even if the value is NULL. This allows row processors to
+ * estimate data sizes more easily.
+ */
+ rowbuf[i].value = conn->inBuffer + conn->inCursor;
+
+ /* Skip over the data value */
if (vlen > 0)
- if (pqGetnchar((char *) (tup[i].value), vlen, conn))
- return EOF;
- /* we have to terminate this ourselves */
- tup[i].value[vlen] = '\0';
+ {
+ if (pqSkipnchar(vlen, conn))
+ {
+ /* We should not run out of data here, so complain */
+ errmsg = libpq_gettext("insufficient data in \"D\" message");
+ goto advance_and_error;
+ }
+ }
}
- /* Success! Store the completed tuple in the result */
- if (!pqAddTuple(result, tup))
- goto outOfMemory;
- /* and reset for a new message */
- conn->curTuple = NULL;
+ /* Sanity check that we absorbed all the data */
+ if (conn->inCursor != conn->inStart + 5 + msgLength)
+ {
+ errmsg = libpq_gettext("extraneous data in \"D\" message");
+ goto advance_and_error;
+ }
- return 0;
+ /*
+ * Advance inStart to show that the "D" message has been processed. We
+ * must do this before calling the row processor, in case it longjmps.
+ */
+ conn->inStart = conn->inCursor;
-outOfMemory:
+ /* Pass the completed row values to rowProcessor */
+ errmsg = NULL;
+ switch ((*conn->rowProcessor) (result, rowbuf, &errmsg,
+ conn->rowProcessorParam))
+ {
+ case 1:
+ /* everything is good */
+ return 0;
+
+ case -1:
+ /* error, report the errmsg below */
+ break;
+
+ default:
+ /* unrecognized return code */
+ errmsg = libpq_gettext("unrecognized return value from row processor");
+ break;
+ }
+ goto set_error_result;
+
+advance_and_error:
+ /* Discard the failed message by pretending we read it */
+ conn->inStart += 5 + msgLength;
+
+set_error_result:
/*
* Replace partially constructed result with an error result. First
* discard the old result to try to win back some memory.
*/
pqClearAsyncResult(conn);
- printfPQExpBuffer(&conn->errorMessage,
- libpq_gettext("out of memory for query result\n"));
+
+ /*
+ * If row processor didn't provide an error message, assume "out of
+ * memory" was meant. The advantage of having this special case is that
+ * freeing the old result first greatly improves the odds that gettext()
+ * will succeed in providing a translation.
+ */
+ if (!errmsg)
+ errmsg = libpq_gettext("out of memory for query result");
+
+ printfPQExpBuffer(&conn->errorMessage, "%s\n", errmsg);
pqSaveErrorResult(conn);
- /* Discard the failed message by pretending we read it */
- conn->inCursor = conn->inStart + 5 + msgLength;
+ /*
+ * Return zero to allow input parsing to continue. Subsequent "D"
+ * messages will be ignored until we get to end of data, since an error
+ * result is already set up.
+ */
return 0;
}