aboutsummaryrefslogtreecommitdiff
path: root/src/interfaces/libpq/fe-exec.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/interfaces/libpq/fe-exec.c')
-rw-r--r--src/interfaces/libpq/fe-exec.c112
1 files changed, 96 insertions, 16 deletions
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 919cf5741d4..7f1ab94fd1e 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -1380,7 +1380,8 @@ pqAppendCmdQueueEntry(PGconn *conn, PGcmdQueueEntry *entry)
* itself consume commands from the queue; if we're in any other
* state, we don't have to do anything.
*/
- if (conn->asyncStatus == PGASYNC_IDLE)
+ if (conn->asyncStatus == PGASYNC_IDLE ||
+ conn->asyncStatus == PGASYNC_PIPELINE_IDLE)
pqPipelineProcessQueue(conn);
break;
}
@@ -1436,6 +1437,7 @@ static int
PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
{
PGcmdQueueEntry *entry = NULL;
+ PGcmdQueueEntry *entry2 = NULL;
if (!PQsendQueryStart(conn, newQuery))
return 0;
@@ -1451,6 +1453,12 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
entry = pqAllocCmdQueueEntry(conn);
if (entry == NULL)
return 0; /* error msg already set */
+ if (conn->pipelineStatus != PQ_PIPELINE_OFF)
+ {
+ entry2 = pqAllocCmdQueueEntry(conn);
+ if (entry2 == NULL)
+ goto sendFailed;
+ }
/* Send the query message(s) */
if (conn->pipelineStatus == PQ_PIPELINE_OFF)
@@ -1520,6 +1528,20 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
/* OK, it's launched! */
pqAppendCmdQueueEntry(conn, entry);
+
+ /*
+ * When pipeline mode is in use, we need a second entry in the command
+ * queue to represent Close Portal message. This allows us later to wait
+ * for the CloseComplete message to be received before getting in IDLE
+ * state.
+ */
+ if (conn->pipelineStatus != PQ_PIPELINE_OFF)
+ {
+ entry2->queryclass = PGQUERY_CLOSE;
+ entry2->query = NULL;
+ pqAppendCmdQueueEntry(conn, entry2);
+ }
+
return 1;
sendFailed:
@@ -1767,11 +1789,13 @@ PQsendQueryStart(PGconn *conn, bool newQuery)
switch (conn->asyncStatus)
{
case PGASYNC_IDLE:
+ case PGASYNC_PIPELINE_IDLE:
case PGASYNC_READY:
case PGASYNC_READY_MORE:
case PGASYNC_BUSY:
/* ok to queue */
break;
+
case PGASYNC_COPY_IN:
case PGASYNC_COPY_OUT:
case PGASYNC_COPY_BOTH:
@@ -2144,16 +2168,21 @@ PQgetResult(PGconn *conn)
{
case PGASYNC_IDLE:
res = NULL; /* query is complete */
- if (conn->pipelineStatus != PQ_PIPELINE_OFF)
- {
- /*
- * We're about to return the NULL that terminates the round of
- * results from the current query; prepare to send the results
- * of the next query when we're called next.
- */
- pqPipelineProcessQueue(conn);
- }
break;
+ case PGASYNC_PIPELINE_IDLE:
+ Assert(conn->pipelineStatus != PQ_PIPELINE_OFF);
+
+ /*
+ * We're about to return the NULL that terminates the round of
+ * results from the current query; prepare to send the results
+ * of the next query, if any, when we're called next. If there's
+ * no next element in the command queue, this gets us in IDLE
+ * state.
+ */
+ pqPipelineProcessQueue(conn);
+ res = NULL; /* query is complete */
+ break;
+
case PGASYNC_READY:
/*
@@ -2174,7 +2203,7 @@ PQgetResult(PGconn *conn)
* We're about to send the results of the current query. Set
* us idle now, and ...
*/
- conn->asyncStatus = PGASYNC_IDLE;
+ conn->asyncStatus = PGASYNC_PIPELINE_IDLE;
/*
* ... in cases when we're sending a pipeline-sync result,
@@ -2220,6 +2249,22 @@ PQgetResult(PGconn *conn)
break;
}
+ /* If the next command we expect is CLOSE, read and consume it */
+ if (conn->asyncStatus == PGASYNC_PIPELINE_IDLE &&
+ conn->cmd_queue_head &&
+ conn->cmd_queue_head->queryclass == PGQUERY_CLOSE)
+ {
+ if (res && res->resultStatus != PGRES_FATAL_ERROR)
+ {
+ conn->asyncStatus = PGASYNC_BUSY;
+ parseInput(conn);
+ conn->asyncStatus = PGASYNC_PIPELINE_IDLE;
+ }
+ else
+ /* we won't ever see the Close */
+ pqCommandQueueAdvance(conn);
+ }
+
/* Time to fire PGEVT_RESULTCREATE events, if there are any */
if (res && res->nEvents > 0)
(void) PQfireResultCreateEvents(conn, res);
@@ -3014,7 +3059,10 @@ PQexitPipelineMode(PGconn *conn)
if (!conn)
return 0;
- if (conn->pipelineStatus == PQ_PIPELINE_OFF)
+ if (conn->pipelineStatus == PQ_PIPELINE_OFF &&
+ (conn->asyncStatus == PGASYNC_IDLE ||
+ conn->asyncStatus == PGASYNC_PIPELINE_IDLE) &&
+ conn->cmd_queue_head == NULL)
return 1;
switch (conn->asyncStatus)
@@ -3031,9 +3079,16 @@ PQexitPipelineMode(PGconn *conn)
libpq_gettext("cannot exit pipeline mode while busy\n"));
return 0;
- default:
+ case PGASYNC_IDLE:
+ case PGASYNC_PIPELINE_IDLE:
/* OK */
break;
+
+ case PGASYNC_COPY_IN:
+ case PGASYNC_COPY_OUT:
+ case PGASYNC_COPY_BOTH:
+ appendPQExpBufferStr(&conn->errorMessage,
+ libpq_gettext("cannot exit pipeline mode while in COPY\n"));
}
/* still work to process */
@@ -3070,6 +3125,10 @@ pqCommandQueueAdvance(PGconn *conn)
prevquery = conn->cmd_queue_head;
conn->cmd_queue_head = conn->cmd_queue_head->next;
+ /* If the queue is now empty, reset the tail too */
+ if (conn->cmd_queue_head == NULL)
+ conn->cmd_queue_tail = NULL;
+
/* and make it recyclable */
prevquery->next = NULL;
pqRecycleCmdQueueEntry(conn, prevquery);
@@ -3092,15 +3151,35 @@ pqPipelineProcessQueue(PGconn *conn)
case PGASYNC_BUSY:
/* client still has to process current query or results */
return;
+
case PGASYNC_IDLE:
+ /*
+ * If we're in IDLE mode and there's some command in the queue,
+ * get us into PIPELINE_IDLE mode and process normally. Otherwise
+ * there's nothing for us to do.
+ */
+ if (conn->cmd_queue_head != NULL)
+ {
+ conn->asyncStatus = PGASYNC_PIPELINE_IDLE;
+ break;
+ }
+ return;
+
+ case PGASYNC_PIPELINE_IDLE:
+ Assert(conn->pipelineStatus != PQ_PIPELINE_OFF);
/* next query please */
break;
}
- /* Nothing to do if not in pipeline mode, or queue is empty */
- if (conn->pipelineStatus == PQ_PIPELINE_OFF ||
- conn->cmd_queue_head == NULL)
+ /*
+ * If there are no further commands to process in the queue, get us in
+ * "real idle" mode now.
+ */
+ if (conn->cmd_queue_head == NULL)
+ {
+ conn->asyncStatus = PGASYNC_IDLE;
return;
+ }
/*
* Reset the error state. This and the next couple of steps correspond to
@@ -3193,6 +3272,7 @@ PQpipelineSync(PGconn *conn)
case PGASYNC_READY_MORE:
case PGASYNC_BUSY:
case PGASYNC_IDLE:
+ case PGASYNC_PIPELINE_IDLE:
/* OK to send sync */
break;
}