aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/interfaces/libpq/fe-exec.c53
-rw-r--r--src/interfaces/libpq/fe-protocol3.c9
-rw-r--r--src/interfaces/libpq/libpq-int.h3
3 files changed, 40 insertions, 25 deletions
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 04610ccf5e8..b9511df2c26 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -2112,19 +2112,12 @@ PQgetResult(PGconn *conn)
break;
case PGASYNC_READY:
-
- /*
- * For any query type other than simple query protocol, we advance
- * the command queue here. This is because for simple query
- * protocol we can get the READY state multiple times before the
- * command is actually complete, since the command string can
- * contain many queries. In simple query protocol, the queue
- * advance is done by fe-protocol3 when it receives ReadyForQuery.
- */
- if (conn->cmd_queue_head &&
- conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE)
- pqCommandQueueAdvance(conn);
res = pqPrepareAsyncResult(conn);
+
+ /* Advance the queue as appropriate */
+ pqCommandQueueAdvance(conn, false,
+ res->resultStatus == PGRES_PIPELINE_SYNC);
+
if (conn->pipelineStatus != PQ_PIPELINE_OFF)
{
/*
@@ -3088,18 +3081,44 @@ PQexitPipelineMode(PGconn *conn)
/*
* pqCommandQueueAdvance
- * Remove one query from the command queue, when we receive
- * all results from the server that pertain to it.
+ * Remove one query from the command queue, if appropriate.
+ *
+ * If we have received all results corresponding to the head element
+ * in the command queue, remove it.
+ *
+ * In simple query protocol we must not advance the command queue until the
+ * ReadyForQuery message has been received. This is because in simple mode a
+ * command can have multiple queries, and we must process result for all of
+ * them before moving on to the next command.
+ *
+ * Another consideration is synchronization during error processing in
+ * extended query protocol: we refuse to advance the queue past a SYNC queue
+ * element, unless the result we've received is also a SYNC. In particular
+ * this protects us from advancing when an error is received at an
+ * inappropriate moment.
*/
void
-pqCommandQueueAdvance(PGconn *conn)
+pqCommandQueueAdvance(PGconn *conn, bool isReadyForQuery, bool gotSync)
{
PGcmdQueueEntry *prevquery;
if (conn->cmd_queue_head == NULL)
return;
- /* delink from queue */
+ /*
+ * If processing a query of simple query protocol, we only advance the
+ * queue when we receive the ReadyForQuery message for it.
+ */
+ if (conn->cmd_queue_head->queryclass == PGQUERY_SIMPLE && !isReadyForQuery)
+ return;
+
+ /*
+ * If we're waiting for a SYNC, don't advance the queue until we get one.
+ */
+ if (conn->cmd_queue_head->queryclass == PGQUERY_SYNC && !gotSync)
+ return;
+
+ /* delink element from queue */
prevquery = conn->cmd_queue_head;
conn->cmd_queue_head = conn->cmd_queue_head->next;
@@ -3107,7 +3126,7 @@ pqCommandQueueAdvance(PGconn *conn)
if (conn->cmd_queue_head == NULL)
conn->cmd_queue_tail = NULL;
- /* and make it recyclable */
+ /* and make the queue element recyclable */
prevquery->next = NULL;
pqRecycleCmdQueueEntry(conn, prevquery);
}
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 5613c56b141..8c4ec079caa 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -240,13 +240,8 @@ pqParseInput3(PGconn *conn)
}
else
{
- /*
- * In simple query protocol, advance the command queue
- * (see PQgetResult).
- */
- if (conn->cmd_queue_head &&
- conn->cmd_queue_head->queryclass == PGQUERY_SIMPLE)
- pqCommandQueueAdvance(conn);
+ /* Advance the command queue and set us idle */
+ pqCommandQueueAdvance(conn, true, false);
conn->asyncStatus = PGASYNC_IDLE;
}
break;
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 22bc682ffce..7888199b0d9 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -698,7 +698,8 @@ extern void pqSaveMessageField(PGresult *res, char code,
extern void pqSaveParameterStatus(PGconn *conn, const char *name,
const char *value);
extern int pqRowProcessor(PGconn *conn, const char **errmsgp);
-extern void pqCommandQueueAdvance(PGconn *conn);
+extern void pqCommandQueueAdvance(PGconn *conn, bool isReadyForQuery,
+ bool gotSync);
extern int PQsendQueryContinue(PGconn *conn, const char *query);
/* === in fe-protocol3.c === */