diff options
author | Tom Lane <tgl@sss.pgh.pa.us> | 2012-04-04 18:27:56 -0400 |
---|---|---|
committer | Tom Lane <tgl@sss.pgh.pa.us> | 2012-04-04 18:27:56 -0400 |
commit | 92785dac2ee7026948962cd61c4cd84a2d052772 (patch) | |
tree | deb7a2c120978b9f3b85410317271a91b76ad66d /src/interfaces/libpq/fe-protocol3.c | |
parent | cb917e1544612c187c74fed1a990e26820514c8a (diff) | |
download | postgresql-92785dac2ee7026948962cd61c4cd84a2d052772.tar.gz postgresql-92785dac2ee7026948962cd61c4cd84a2d052772.zip |
Add a "row processor" API to libpq for better handling of large results.
Traditionally libpq has collected an entire query result before passing
it back to the application. That provides a simple and transactional API,
but it's pretty inefficient for large result sets. This patch allows the
application to process each row on-the-fly instead of accumulating the
rows into the PGresult. Error recovery becomes a bit more complex, but
often that tradeoff is well worth making.
Kyotaro Horiguchi, reviewed by Marko Kreen and Tom Lane
Diffstat (limited to 'src/interfaces/libpq/fe-protocol3.c')
-rw-r--r-- | src/interfaces/libpq/fe-protocol3.c | 298 |
1 files changed, 211 insertions, 87 deletions
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c index 892dcbcb00f..a773d7a5246 100644 --- a/src/interfaces/libpq/fe-protocol3.c +++ b/src/interfaces/libpq/fe-protocol3.c @@ -44,7 +44,7 @@ static void handleSyncLoss(PGconn *conn, char id, int msgLength); -static int getRowDescriptions(PGconn *conn); +static int getRowDescriptions(PGconn *conn, int msgLength); static int getParamDescriptions(PGconn *conn); static int getAnotherTuple(PGconn *conn, int msgLength); static int getParameterStatus(PGconn *conn); @@ -61,6 +61,9 @@ static int build_startup_packet(const PGconn *conn, char *packet, * parseInput: if appropriate, parse input data from backend * until input is exhausted or a stopping state is reached. * Note that this function will NOT attempt to read more data from the backend. + * + * Note: callers of parseInput must be prepared for a longjmp exit when we are + * in PGASYNC_BUSY state, since an external row processor might do that. */ void pqParseInput3(PGconn *conn) @@ -269,15 +272,10 @@ pqParseInput3(PGconn *conn) conn->queryclass == PGQUERY_DESCRIBE) { /* First 'T' in a query sequence */ - if (getRowDescriptions(conn)) + if (getRowDescriptions(conn, msgLength)) return; - - /* - * If we're doing a Describe, we're ready to pass the - * result back to the client. - */ - if (conn->queryclass == PGQUERY_DESCRIBE) - conn->asyncStatus = PGASYNC_READY; + /* getRowDescriptions() moves inStart itself */ + continue; } else { @@ -327,6 +325,8 @@ pqParseInput3(PGconn *conn) /* Read another tuple of a normal query response */ if (getAnotherTuple(conn, msgLength)) return; + /* getAnotherTuple() moves inStart itself */ + continue; } else if (conn->result != NULL && conn->result->resultStatus == PGRES_FATAL_ERROR) @@ -443,17 +443,20 @@ handleSyncLoss(PGconn *conn, char id, int msgLength) * parseInput subroutine to read a 'T' (row descriptions) message. * We'll build a new PGresult structure (unless called for a Describe * command for a prepared statement) containing the attribute data. - * Returns: 0 if completed message, EOF if not enough data yet. + * Returns: 0 if processed message successfully, EOF to suspend parsing + * (the latter case is not actually used currently). + * In either case, conn->inStart has been advanced past the message. * - * Note that if we run out of data, we have to release the partially - * constructed PGresult, and rebuild it again next time. Fortunately, - * that shouldn't happen often, since 'T' messages usually fit in a packet. + * Note: the row processor could also choose to longjmp out of libpq, + * in which case the library's state must allow for resumption at the + * next message. */ static int -getRowDescriptions(PGconn *conn) +getRowDescriptions(PGconn *conn, int msgLength) { PGresult *result; int nfields; + const char *errmsg; int i; /* @@ -471,12 +474,19 @@ getRowDescriptions(PGconn *conn) else result = PQmakeEmptyPGresult(conn, PGRES_TUPLES_OK); if (!result) - goto failure; + { + errmsg = NULL; /* means "out of memory", see below */ + goto advance_and_error; + } /* parseInput already read the 'T' label and message length. */ /* the next two bytes are the number of fields */ if (pqGetInt(&(result->numAttributes), 2, conn)) - goto failure; + { + /* We should not run out of data here, so complain */ + errmsg = libpq_gettext("insufficient data in \"T\" message"); + goto advance_and_error; + } nfields = result->numAttributes; /* allocate space for the attribute descriptors */ @@ -485,7 +495,10 @@ getRowDescriptions(PGconn *conn) result->attDescs = (PGresAttDesc *) pqResultAlloc(result, nfields * sizeof(PGresAttDesc), TRUE); if (!result->attDescs) - goto failure; + { + errmsg = NULL; /* means "out of memory", see below */ + goto advance_and_error; + } MemSet(result->attDescs, 0, nfields * sizeof(PGresAttDesc)); } @@ -510,7 +523,9 @@ getRowDescriptions(PGconn *conn) pqGetInt(&atttypmod, 4, conn) || pqGetInt(&format, 2, conn)) { - goto failure; + /* We should not run out of data here, so complain */ + errmsg = libpq_gettext("insufficient data in \"T\" message"); + goto advance_and_error; } /* @@ -524,7 +539,10 @@ getRowDescriptions(PGconn *conn) result->attDescs[i].name = pqResultStrdup(result, conn->workBuffer.data); if (!result->attDescs[i].name) - goto failure; + { + errmsg = NULL; /* means "out of memory", see below */ + goto advance_and_error; + } result->attDescs[i].tableid = tableid; result->attDescs[i].columnid = columnid; result->attDescs[i].format = format; @@ -536,24 +554,84 @@ getRowDescriptions(PGconn *conn) result->binary = 0; } + /* Sanity check that we absorbed all the data */ + if (conn->inCursor != conn->inStart + 5 + msgLength) + { + errmsg = libpq_gettext("extraneous data in \"T\" message"); + goto advance_and_error; + } + /* Success! */ conn->result = result; - return 0; -failure: + /* + * Advance inStart to show that the "T" message has been processed. We + * must do this before calling the row processor, in case it longjmps. + */ + conn->inStart = conn->inCursor; /* - * Discard incomplete result, unless it's from getParamDescriptions. - * - * Note that if we hit a bufferload boundary while handling the - * describe-statement case, we'll forget any PGresult space we just - * allocated, and then reallocate it on next try. This will bloat the - * PGresult a little bit but the space will be freed at PQclear, so it - * doesn't seem worth trying to be smarter. + * If we're doing a Describe, we're done, and ready to pass the result + * back to the client. */ - if (result != conn->result) + if (conn->queryclass == PGQUERY_DESCRIBE) + { + conn->asyncStatus = PGASYNC_READY; + return 0; + } + + /* Give the row processor a chance to initialize for new result set */ + errmsg = NULL; + switch ((*conn->rowProcessor) (result, NULL, &errmsg, + conn->rowProcessorParam)) + { + case 1: + /* everything is good */ + return 0; + + case -1: + /* error, report the errmsg below */ + break; + + default: + /* unrecognized return code */ + errmsg = libpq_gettext("unrecognized return value from row processor"); + break; + } + goto set_error_result; + +advance_and_error: + /* Discard unsaved result, if any */ + if (result && result != conn->result) PQclear(result); - return EOF; + + /* Discard the failed message by pretending we read it */ + conn->inStart += 5 + msgLength; + +set_error_result: + + /* + * Replace partially constructed result with an error result. First + * discard the old result to try to win back some memory. + */ + pqClearAsyncResult(conn); + + /* + * If row processor didn't provide an error message, assume "out of + * memory" was meant. + */ + if (!errmsg) + errmsg = libpq_gettext("out of memory for query result"); + + printfPQExpBuffer(&conn->errorMessage, "%s\n", errmsg); + pqSaveErrorResult(conn); + + /* + * Return zero to allow input parsing to continue. Subsequent "D" + * messages will be ignored until we get to end of data, since an error + * result is already set up. + */ + return 0; } /* @@ -613,47 +691,53 @@ failure: /* * parseInput subroutine to read a 'D' (row data) message. - * We add another tuple to the existing PGresult structure. - * Returns: 0 if completed message, EOF if error or not enough data yet. + * We fill rowbuf with column pointers and then call the row processor. + * Returns: 0 if processed message successfully, EOF to suspend parsing + * (the latter case is not actually used currently). + * In either case, conn->inStart has been advanced past the message. * - * Note that if we run out of data, we have to suspend and reprocess - * the message after more data is received. We keep a partially constructed - * tuple in conn->curTuple, and avoid reallocating already-allocated storage. + * Note: the row processor could also choose to longjmp out of libpq, + * in which case the library's state must allow for resumption at the + * next message. */ static int getAnotherTuple(PGconn *conn, int msgLength) { PGresult *result = conn->result; int nfields = result->numAttributes; - PGresAttValue *tup; + const char *errmsg; + PGdataValue *rowbuf; int tupnfields; /* # fields from tuple */ int vlen; /* length of the current field value */ int i; - /* Allocate tuple space if first time for this data message */ - if (conn->curTuple == NULL) - { - conn->curTuple = (PGresAttValue *) - pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE); - if (conn->curTuple == NULL) - goto outOfMemory; - MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue)); - } - tup = conn->curTuple; - /* Get the field count and make sure it's what we expect */ if (pqGetInt(&tupnfields, 2, conn)) - return EOF; + { + /* We should not run out of data here, so complain */ + errmsg = libpq_gettext("insufficient data in \"D\" message"); + goto advance_and_error; + } if (tupnfields != nfields) { - /* Replace partially constructed result with an error result */ - printfPQExpBuffer(&conn->errorMessage, - libpq_gettext("unexpected field count in \"D\" message\n")); - pqSaveErrorResult(conn); - /* Discard the failed message by pretending we read it */ - conn->inCursor = conn->inStart + 5 + msgLength; - return 0; + errmsg = libpq_gettext("unexpected field count in \"D\" message"); + goto advance_and_error; + } + + /* Resize row buffer if needed */ + rowbuf = conn->rowBuf; + if (nfields > conn->rowBufLen) + { + rowbuf = (PGdataValue *) realloc(rowbuf, + nfields * sizeof(PGdataValue)); + if (!rowbuf) + { + errmsg = NULL; /* means "out of memory", see below */ + goto advance_and_error; + } + conn->rowBuf = rowbuf; + conn->rowBufLen = nfields; } /* Scan the fields */ @@ -661,54 +745,94 @@ getAnotherTuple(PGconn *conn, int msgLength) { /* get the value length */ if (pqGetInt(&vlen, 4, conn)) - return EOF; - if (vlen == -1) { - /* null field */ - tup[i].value = result->null_field; - tup[i].len = NULL_LEN; - continue; + /* We should not run out of data here, so complain */ + errmsg = libpq_gettext("insufficient data in \"D\" message"); + goto advance_and_error; } - if (vlen < 0) - vlen = 0; - if (tup[i].value == NULL) - { - bool isbinary = (result->attDescs[i].format != 0); + rowbuf[i].len = vlen; - tup[i].value = (char *) pqResultAlloc(result, vlen + 1, isbinary); - if (tup[i].value == NULL) - goto outOfMemory; - } - tup[i].len = vlen; - /* read in the value */ + /* + * rowbuf[i].value always points to the next address in the data + * buffer even if the value is NULL. This allows row processors to + * estimate data sizes more easily. + */ + rowbuf[i].value = conn->inBuffer + conn->inCursor; + + /* Skip over the data value */ if (vlen > 0) - if (pqGetnchar((char *) (tup[i].value), vlen, conn)) - return EOF; - /* we have to terminate this ourselves */ - tup[i].value[vlen] = '\0'; + { + if (pqSkipnchar(vlen, conn)) + { + /* We should not run out of data here, so complain */ + errmsg = libpq_gettext("insufficient data in \"D\" message"); + goto advance_and_error; + } + } } - /* Success! Store the completed tuple in the result */ - if (!pqAddTuple(result, tup)) - goto outOfMemory; - /* and reset for a new message */ - conn->curTuple = NULL; + /* Sanity check that we absorbed all the data */ + if (conn->inCursor != conn->inStart + 5 + msgLength) + { + errmsg = libpq_gettext("extraneous data in \"D\" message"); + goto advance_and_error; + } - return 0; + /* + * Advance inStart to show that the "D" message has been processed. We + * must do this before calling the row processor, in case it longjmps. + */ + conn->inStart = conn->inCursor; -outOfMemory: + /* Pass the completed row values to rowProcessor */ + errmsg = NULL; + switch ((*conn->rowProcessor) (result, rowbuf, &errmsg, + conn->rowProcessorParam)) + { + case 1: + /* everything is good */ + return 0; + + case -1: + /* error, report the errmsg below */ + break; + + default: + /* unrecognized return code */ + errmsg = libpq_gettext("unrecognized return value from row processor"); + break; + } + goto set_error_result; + +advance_and_error: + /* Discard the failed message by pretending we read it */ + conn->inStart += 5 + msgLength; + +set_error_result: /* * Replace partially constructed result with an error result. First * discard the old result to try to win back some memory. */ pqClearAsyncResult(conn); - printfPQExpBuffer(&conn->errorMessage, - libpq_gettext("out of memory for query result\n")); + + /* + * If row processor didn't provide an error message, assume "out of + * memory" was meant. The advantage of having this special case is that + * freeing the old result first greatly improves the odds that gettext() + * will succeed in providing a translation. + */ + if (!errmsg) + errmsg = libpq_gettext("out of memory for query result"); + + printfPQExpBuffer(&conn->errorMessage, "%s\n", errmsg); pqSaveErrorResult(conn); - /* Discard the failed message by pretending we read it */ - conn->inCursor = conn->inStart + 5 + msgLength; + /* + * Return zero to allow input parsing to continue. Subsequent "D" + * messages will be ignored until we get to end of data, since an error + * result is already set up. + */ return 0; } |