aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/commands/copy.c14
-rw-r--r--src/backend/libpq/auth.c3
-rw-r--r--src/backend/libpq/pqcomm.c76
-rw-r--r--src/backend/postmaster/postmaster.c2
-rw-r--r--src/backend/replication/walsender.c35
-rw-r--r--src/backend/storage/lmgr/proc.c7
-rw-r--r--src/backend/tcop/fastpath.c29
-rw-r--r--src/backend/tcop/postgres.c166
-rw-r--r--src/backend/utils/error/elog.c1
-rw-r--r--src/backend/utils/init/globals.c1
-rw-r--r--src/include/libpq/libpq.h3
-rw-r--r--src/include/miscadmin.h13
-rw-r--r--src/include/tcop/fastpath.h1
13 files changed, 244 insertions, 107 deletions
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 8cb2f13b278..92ff632e124 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -410,6 +410,8 @@ ReceiveCopyBegin(CopyState cstate)
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("COPY BINARY is not supported to stdout or from stdin")));
pq_putemptymessage('G');
+ /* any error in old protocol will make us lose sync */
+ pq_startmsgread();
cstate->copy_dest = COPY_OLD_FE;
}
else
@@ -420,6 +422,8 @@ ReceiveCopyBegin(CopyState cstate)
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("COPY BINARY is not supported to stdout or from stdin")));
pq_putemptymessage('D');
+ /* any error in old protocol will make us lose sync */
+ pq_startmsgread();
cstate->copy_dest = COPY_OLD_FE;
}
/* We *must* flush here to ensure FE knows it can send. */
@@ -606,6 +610,8 @@ CopyGetData(CopyState cstate, void *databuf, int minread, int maxread)
int mtype;
readmessage:
+ HOLD_CANCEL_INTERRUPTS();
+ pq_startmsgread();
mtype = pq_getbyte();
if (mtype == EOF)
ereport(ERROR,
@@ -615,6 +621,7 @@ CopyGetData(CopyState cstate, void *databuf, int minread, int maxread)
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("unexpected EOF on client connection with an open transaction")));
+ RESUME_CANCEL_INTERRUPTS();
switch (mtype)
{
case 'd': /* CopyData */
@@ -2463,6 +2470,13 @@ CopyFrom(CopyState cstate)
MemoryContextSwitchTo(oldcontext);
+ /*
+ * In the old protocol, tell pqcomm that we can process normal protocol
+ * messages again.
+ */
+ if (cstate->copy_dest == COPY_OLD_FE)
+ pq_endmsgread();
+
/* Execute AFTER STATEMENT insertion triggers */
ExecASInsertTriggers(estate, resultRelInfo);
diff --git a/src/backend/libpq/auth.c b/src/backend/libpq/auth.c
index e9c8e569e2a..3f3cf4485ac 100644
--- a/src/backend/libpq/auth.c
+++ b/src/backend/libpq/auth.c
@@ -625,6 +625,7 @@ recv_password_packet(Port *port)
{
StringInfoData buf;
+ pq_startmsgread();
if (PG_PROTOCOL_MAJOR(port->proto) >= 3)
{
/* Expect 'p' message type */
@@ -849,6 +850,7 @@ pg_GSS_recvauth(Port *port)
*/
do
{
+ pq_startmsgread();
mtype = pq_getbyte();
if (mtype != 'p')
{
@@ -1083,6 +1085,7 @@ pg_SSPI_recvauth(Port *port)
*/
do
{
+ pq_startmsgread();
mtype = pq_getbyte();
if (mtype != 'p')
{
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index e3efac34ce4..254fd8285b0 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -127,8 +127,9 @@ static int PqRecvLength; /* End of data available in PqRecvBuffer */
/*
* Message status
*/
-static bool PqCommBusy;
-static bool DoingCopyOut;
+static bool PqCommBusy; /* busy sending data to the client */
+static bool PqCommReadingMsg; /* in the middle of reading a message */
+static bool DoingCopyOut; /* in old-protocol COPY OUT processing */
/* Internal functions */
@@ -177,6 +178,7 @@ pq_init(void)
PqSendBuffer = MemoryContextAlloc(TopMemoryContext, PqSendBufferSize);
PqSendPointer = PqSendStart = PqRecvPointer = PqRecvLength = 0;
PqCommBusy = false;
+ PqCommReadingMsg = false;
DoingCopyOut = false;
on_proc_exit(socket_close, 0);
}
@@ -916,6 +918,8 @@ pq_recvbuf(void)
int
pq_getbyte(void)
{
+ Assert(PqCommReadingMsg);
+
while (PqRecvPointer >= PqRecvLength)
{
if (pq_recvbuf()) /* If nothing in buffer, then recv some */
@@ -954,6 +958,8 @@ pq_getbyte_if_available(unsigned char *c)
{
int r;
+ Assert(PqCommReadingMsg);
+
if (PqRecvPointer < PqRecvLength)
{
*c = PqRecvBuffer[PqRecvPointer++];
@@ -1006,6 +1012,8 @@ pq_getbytes(char *s, size_t len)
{
size_t amount;
+ Assert(PqCommReadingMsg);
+
while (len > 0)
{
while (PqRecvPointer >= PqRecvLength)
@@ -1038,6 +1046,8 @@ pq_discardbytes(size_t len)
{
size_t amount;
+ Assert(PqCommReadingMsg);
+
while (len > 0)
{
while (PqRecvPointer >= PqRecvLength)
@@ -1074,6 +1084,8 @@ pq_getstring(StringInfo s)
{
int i;
+ Assert(PqCommReadingMsg);
+
resetStringInfo(s);
/* Read until we get the terminating '\0' */
@@ -1106,6 +1118,58 @@ pq_getstring(StringInfo s)
/* --------------------------------
+ * pq_startmsgread - begin reading a message from the client.
+ *
+ * This must be called before any of the pq_get* functions.
+ * --------------------------------
+ */
+void
+pq_startmsgread(void)
+{
+ /*
+ * There shouldn't be a read active already, but let's check just to be
+ * sure.
+ */
+ if (PqCommReadingMsg)
+ ereport(FATAL,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("terminating connection because protocol sync was lost")));
+
+ PqCommReadingMsg = true;
+}
+
+
+/* --------------------------------
+ * pq_endmsgread - finish reading message.
+ *
+ * This must be called after reading a V2 protocol message with
+ * pq_getstring() and friends, to indicate that we have read the whole
+ * message. In V3 protocol, pq_getmessage() does this implicitly.
+ * --------------------------------
+ */
+void
+pq_endmsgread(void)
+{
+ Assert(PqCommReadingMsg);
+
+ PqCommReadingMsg = false;
+}
+
+/* --------------------------------
+ * pq_is_reading_msg - are we currently reading a message?
+ *
+ * This is used in error recovery at the outer idle loop to detect if we have
+ * lost protocol sync, and need to terminate the connection. pq_startmsgread()
+ * will check for that too, but it's nicer to detect it earlier.
+ * --------------------------------
+ */
+bool
+pq_is_reading_msg(void)
+{
+ return PqCommReadingMsg;
+}
+
+/* --------------------------------
* pq_getmessage - get a message with length word from connection
*
* The return value is placed in an expansible StringInfo, which has
@@ -1126,6 +1190,8 @@ pq_getmessage(StringInfo s, int maxlen)
{
int32 len;
+ Assert(PqCommReadingMsg);
+
resetStringInfo(s);
/* Read message length word */
@@ -1167,6 +1233,9 @@ pq_getmessage(StringInfo s, int maxlen)
ereport(COMMERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("incomplete message from client")));
+
+ /* we discarded the rest of the message so we're back in sync. */
+ PqCommReadingMsg = false;
PG_RE_THROW();
}
PG_END_TRY();
@@ -1184,6 +1253,9 @@ pq_getmessage(StringInfo s, int maxlen)
s->data[len] = '\0';
}
+ /* finished reading the message. */
+ PqCommReadingMsg = false;
+
return 0;
}
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index fe6316ecbe9..36b8267fa50 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -1761,6 +1761,7 @@ ProcessStartupPacket(Port *port, bool SSLdone)
ProtocolVersion proto;
MemoryContext oldcontext;
+ pq_startmsgread();
if (pq_getbytes((char *) &len, 4) == EOF)
{
/*
@@ -1805,6 +1806,7 @@ ProcessStartupPacket(Port *port, bool SSLdone)
errmsg("incomplete startup packet")));
return STATUS_ERROR;
}
+ pq_endmsgread();
/*
* The first field is either a protocol version number or a special
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 05d2339b150..25fcbca39e9 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1357,6 +1357,7 @@ ProcessRepliesIfAny(void)
for (;;)
{
+ pq_startmsgread();
r = pq_getbyte_if_available(&firstchar);
if (r < 0)
{
@@ -1369,9 +1370,20 @@ ProcessRepliesIfAny(void)
if (r == 0)
{
/* no data available without blocking */
+ pq_endmsgread();
break;
}
+ /* Read the message contents */
+ resetStringInfo(&reply_message);
+ if (pq_getmessage(&reply_message, 0))
+ {
+ ereport(COMMERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("unexpected EOF on standby connection")));
+ proc_exit(0);
+ }
+
/*
* If we already received a CopyDone from the frontend, the frontend
* should not send us anything until we've closed our end of the COPY.
@@ -1407,16 +1419,6 @@ ProcessRepliesIfAny(void)
streamingDoneSending = true;
}
- /* consume the CopyData message */
- resetStringInfo(&reply_message);
- if (pq_getmessage(&reply_message, 0))
- {
- ereport(COMMERROR,
- (errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg("unexpected EOF on standby connection")));
- proc_exit(0);
- }
-
streamingDoneReceiving = true;
received = true;
break;
@@ -1453,19 +1455,6 @@ ProcessStandbyMessage(void)
{
char msgtype;
- resetStringInfo(&reply_message);
-
- /*
- * Read the message contents.
- */
- if (pq_getmessage(&reply_message, 0))
- {
- ereport(COMMERROR,
- (errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg("unexpected EOF on standby connection")));
- proc_exit(0);
- }
-
/*
* Check message type from the first byte.
*/
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 65e8afe457f..24636223b1d 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -655,11 +655,16 @@ LockErrorCleanup(void)
LWLock *partitionLock;
DisableTimeoutParams timeouts[2];
+ HOLD_INTERRUPTS();
+
AbortStrongLockAcquire();
/* Nothing to do if we weren't waiting for a lock */
if (lockAwaited == NULL)
+ {
+ RESUME_INTERRUPTS();
return;
+ }
/*
* Turn off the deadlock and lock timeout timers, if they are still
@@ -709,6 +714,8 @@ LockErrorCleanup(void)
* wakeup signal isn't harmful, and it seems not worth expending cycles to
* get rid of a signal that most likely isn't there.
*/
+
+ RESUME_INTERRUPTS();
}
diff --git a/src/backend/tcop/fastpath.c b/src/backend/tcop/fastpath.c
index 042956096e4..ce3b9ebdca3 100644
--- a/src/backend/tcop/fastpath.c
+++ b/src/backend/tcop/fastpath.c
@@ -75,7 +75,7 @@ static int16 parse_fcall_arguments_20(StringInfo msgBuf, struct fp_info * fip,
* The caller should already have initialized buf to empty.
* ----------------
*/
-static int
+int
GetOldFunctionMessage(StringInfo buf)
{
int32 ibuf;
@@ -281,33 +281,6 @@ HandleFunctionRequest(StringInfo msgBuf)
char msec_str[32];
/*
- * Read message contents if not already done.
- */
- if (PG_PROTOCOL_MAJOR(FrontendProtocol) < 3)
- {
- if (GetOldFunctionMessage(msgBuf))
- {
- if (IsTransactionState())
- ereport(COMMERROR,
- (errcode(ERRCODE_CONNECTION_FAILURE),
- errmsg("unexpected EOF on client connection with an open transaction")));
- else
- {
- /*
- * Can't send DEBUG log messages to client at this point.
- * Since we're disconnecting right away, we don't need to
- * restore whereToSendOutput.
- */
- whereToSendOutput = DestNone;
- ereport(DEBUG1,
- (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
- errmsg("unexpected EOF on client connection")));
- }
- return EOF;
- }
- }
-
- /*
* Now that we've eaten the input message, check to see if we actually
* want to do the function call or not. It's now safe to ereport(); we
* won't lose sync with the frontend.
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 8f743536cff..b82c3b333bd 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -331,6 +331,8 @@ SocketBackend(StringInfo inBuf)
/*
* Get message type code from the frontend.
*/
+ HOLD_CANCEL_INTERRUPTS();
+ pq_startmsgread();
qtype = pq_getbyte();
if (qtype == EOF) /* frontend disconnected */
@@ -379,7 +381,7 @@ SocketBackend(StringInfo inBuf)
{
/*
* Can't send DEBUG log messages to client at this
- * point.Since we're disconnecting right away, we
+ * point. Since we're disconnecting right away, we
* don't need to restore whereToSendOutput.
*/
whereToSendOutput = DestNone;
@@ -393,8 +395,30 @@ SocketBackend(StringInfo inBuf)
break;
case 'F': /* fastpath function call */
- /* we let fastpath.c cope with old-style input of this */
doing_extended_query_message = false;
+ if (PG_PROTOCOL_MAJOR(FrontendProtocol) < 3)
+ {
+ if (GetOldFunctionMessage(inBuf))
+ {
+ if (IsTransactionState())
+ ereport(COMMERROR,
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("unexpected EOF on client connection with an open transaction")));
+ else
+ {
+ /*
+ * Can't send DEBUG log messages to client at this
+ * point. Since we're disconnecting right away, we
+ * don't need to restore whereToSendOutput.
+ */
+ whereToSendOutput = DestNone;
+ ereport(DEBUG1,
+ (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
+ errmsg("unexpected EOF on client connection")));
+ }
+ return EOF;
+ }
+ }
break;
case 'X': /* terminate */
@@ -462,6 +486,9 @@ SocketBackend(StringInfo inBuf)
if (pq_getmessage(inBuf, 0))
return EOF; /* suitable message already logged */
}
+ else
+ pq_endmsgread();
+ RESUME_CANCEL_INTERRUPTS();
return qtype;
}
@@ -506,7 +533,7 @@ prepare_for_client_read(void)
EnableNotifyInterrupt();
EnableCatchupInterrupt();
- /* Allow cancel/die interrupts to be processed while waiting */
+ /* Allow die interrupts to be processed while waiting */
ImmediateInterruptOK = true;
/* And don't forget to detect one that already arrived */
@@ -2589,21 +2616,11 @@ die(SIGNAL_ARGS)
ProcDiePending = true;
/*
- * If it's safe to interrupt, and we're waiting for input or a lock,
- * service the interrupt immediately
+ * If we're waiting for input or a lock so that it's safe to
+ * interrupt, service the interrupt immediately
*/
- if (ImmediateInterruptOK && InterruptHoldoffCount == 0 &&
- CritSectionCount == 0)
- {
- /* bump holdoff count to make ProcessInterrupts() a no-op */
- /* until we are done getting ready for it */
- InterruptHoldoffCount++;
- LockErrorCleanup(); /* prevent CheckDeadLock from running */
- DisableNotifyInterrupt();
- DisableCatchupInterrupt();
- InterruptHoldoffCount--;
+ if (ImmediateInterruptOK)
ProcessInterrupts();
- }
}
/* If we're still here, waken anything waiting on the process latch */
@@ -2630,21 +2647,11 @@ StatementCancelHandler(SIGNAL_ARGS)
QueryCancelPending = true;
/*
- * If it's safe to interrupt, and we're waiting for input or a lock,
- * service the interrupt immediately
+ * If we're waiting for input or a lock so that it's safe to
+ * interrupt, service the interrupt immediately
*/
- if (ImmediateInterruptOK && InterruptHoldoffCount == 0 &&
- CritSectionCount == 0)
- {
- /* bump holdoff count to make ProcessInterrupts() a no-op */
- /* until we are done getting ready for it */
- InterruptHoldoffCount++;
- LockErrorCleanup(); /* prevent CheckDeadLock from running */
- DisableNotifyInterrupt();
- DisableCatchupInterrupt();
- InterruptHoldoffCount--;
+ if (ImmediateInterruptOK)
ProcessInterrupts();
- }
}
/* If we're still here, waken anything waiting on the process latch */
@@ -2787,21 +2794,11 @@ RecoveryConflictInterrupt(ProcSignalReason reason)
RecoveryConflictRetryable = false;
/*
- * If it's safe to interrupt, and we're waiting for input or a lock,
- * service the interrupt immediately
+ * If we're waiting for input or a lock so that it's safe to
+ * interrupt, service the interrupt immediately.
*/
- if (ImmediateInterruptOK && InterruptHoldoffCount == 0 &&
- CritSectionCount == 0)
- {
- /* bump holdoff count to make ProcessInterrupts() a no-op */
- /* until we are done getting ready for it */
- InterruptHoldoffCount++;
- LockErrorCleanup(); /* prevent CheckDeadLock from running */
- DisableNotifyInterrupt();
- DisableCatchupInterrupt();
- InterruptHoldoffCount--;
+ if (ImmediateInterruptOK)
ProcessInterrupts();
- }
}
/*
@@ -2826,15 +2823,17 @@ RecoveryConflictInterrupt(ProcSignalReason reason)
void
ProcessInterrupts(void)
{
- /* OK to accept interrupt now? */
+ /* OK to accept any interrupts now? */
if (InterruptHoldoffCount != 0 || CritSectionCount != 0)
return;
InterruptPending = false;
+
if (ProcDiePending)
{
ProcDiePending = false;
QueryCancelPending = false; /* ProcDie trumps QueryCancel */
ImmediateInterruptOK = false; /* not idle anymore */
+ LockErrorCleanup();
DisableNotifyInterrupt();
DisableCatchupInterrupt();
/* As in quickdie, don't risk sending to client during auth */
@@ -2871,6 +2870,7 @@ ProcessInterrupts(void)
{
QueryCancelPending = false; /* lost connection trumps QueryCancel */
ImmediateInterruptOK = false; /* not idle anymore */
+ LockErrorCleanup();
DisableNotifyInterrupt();
DisableCatchupInterrupt();
/* don't send to client, we already know the connection to be dead. */
@@ -2879,12 +2879,53 @@ ProcessInterrupts(void)
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("connection to client lost")));
}
+
+ /*
+ * If a recovery conflict happens while we are waiting for input from the
+ * client, the client is presumably just sitting idle in a transaction,
+ * preventing recovery from making progress. Terminate the connection to
+ * dislodge it.
+ */
+ if (RecoveryConflictPending && DoingCommandRead)
+ {
+ QueryCancelPending = false; /* this trumps QueryCancel */
+ ImmediateInterruptOK = false; /* not idle anymore */
+ RecoveryConflictPending = false;
+ LockErrorCleanup();
+ DisableNotifyInterrupt();
+ DisableCatchupInterrupt();
+ pgstat_report_recovery_conflict(RecoveryConflictReason);
+ ereport(FATAL,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("terminating connection due to conflict with recovery"),
+ errdetail_recovery_conflict(),
+ errhint("In a moment you should be able to reconnect to the"
+ " database and repeat your command.")));
+ }
+
if (QueryCancelPending)
{
+ /*
+ * Don't allow query cancel interrupts while reading input from the
+ * client, because we might lose sync in the FE/BE protocol. (Die
+ * interrupts are OK, because we won't read any further messages from
+ * the client in that case.)
+ */
+ if (QueryCancelHoldoffCount != 0)
+ {
+ /*
+ * Re-arm InterruptPending so that we process the cancel request
+ * as soon as we're done reading the message.
+ */
+ InterruptPending = true;
+ return;
+ }
+
QueryCancelPending = false;
if (ClientAuthInProgress)
{
ImmediateInterruptOK = false; /* not idle anymore */
+ LockErrorCleanup();
DisableNotifyInterrupt();
DisableCatchupInterrupt();
/* As in quickdie, don't risk sending to client during auth */
@@ -2903,6 +2944,7 @@ ProcessInterrupts(void)
{
ImmediateInterruptOK = false; /* not idle anymore */
(void) get_timeout_indicator(STATEMENT_TIMEOUT, true);
+ LockErrorCleanup();
DisableNotifyInterrupt();
DisableCatchupInterrupt();
ereport(ERROR,
@@ -2912,6 +2954,7 @@ ProcessInterrupts(void)
if (get_timeout_indicator(STATEMENT_TIMEOUT, true))
{
ImmediateInterruptOK = false; /* not idle anymore */
+ LockErrorCleanup();
DisableNotifyInterrupt();
DisableCatchupInterrupt();
ereport(ERROR,
@@ -2921,6 +2964,7 @@ ProcessInterrupts(void)
if (IsAutoVacuumWorkerProcess())
{
ImmediateInterruptOK = false; /* not idle anymore */
+ LockErrorCleanup();
DisableNotifyInterrupt();
DisableCatchupInterrupt();
ereport(ERROR,
@@ -2931,21 +2975,14 @@ ProcessInterrupts(void)
{
ImmediateInterruptOK = false; /* not idle anymore */
RecoveryConflictPending = false;
+ LockErrorCleanup();
DisableNotifyInterrupt();
DisableCatchupInterrupt();
pgstat_report_recovery_conflict(RecoveryConflictReason);
- if (DoingCommandRead)
- ereport(FATAL,
- (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
- errmsg("terminating connection due to conflict with recovery"),
- errdetail_recovery_conflict(),
- errhint("In a moment you should be able to reconnect to the"
- " database and repeat your command.")));
- else
- ereport(ERROR,
- (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ ereport(ERROR,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
errmsg("canceling statement due to conflict with recovery"),
- errdetail_recovery_conflict()));
+ errdetail_recovery_conflict()));
}
/*
@@ -2956,6 +2993,7 @@ ProcessInterrupts(void)
if (!DoingCommandRead)
{
ImmediateInterruptOK = false; /* not idle anymore */
+ LockErrorCleanup();
DisableNotifyInterrupt();
DisableCatchupInterrupt();
ereport(ERROR,
@@ -3862,6 +3900,19 @@ PostgresMain(int argc, char *argv[],
/* We don't have a transaction command open anymore */
xact_started = false;
+ /*
+ * If an error occurred while we were reading a message from the
+ * client, we have potentially lost track of where the previous
+ * message ends and the next one begins. Even though we have
+ * otherwise recovered from the error, we cannot safely read any more
+ * messages from the client, so there isn't much we can do with the
+ * connection anymore.
+ */
+ if (pq_is_reading_msg())
+ ereport(FATAL,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("terminating connection because protocol sync was lost")));
+
/* Now we can allow interrupts again */
RESUME_INTERRUPTS();
}
@@ -3946,7 +3997,14 @@ PostgresMain(int argc, char *argv[],
/*
* (4) disable async signal conditions again.
+ *
+ * Query cancel is supposed to be a no-op when there is no query in
+ * progress, so if a query cancel arrived while we were idle, just
+ * reset QueryCancelPending. ProcessInterrupts() has that effect when
+ * it's called when DoingCommandRead is set, so check for interrupts
+ * before resetting DoingCommandRead.
*/
+ CHECK_FOR_INTERRUPTS();
DoingCommandRead = false;
/*
diff --git a/src/backend/utils/error/elog.c b/src/backend/utils/error/elog.c
index 310c5bbffa0..0f7aa1993a4 100644
--- a/src/backend/utils/error/elog.c
+++ b/src/backend/utils/error/elog.c
@@ -469,6 +469,7 @@ errfinish(int dummy,...)
* while doing error cleanup.
*/
InterruptHoldoffCount = 0;
+ QueryCancelHoldoffCount = 0;
CritSectionCount = 0; /* should be unnecessary, but... */
diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c
index c35867bcfe9..8cf2ead3925 100644
--- a/src/backend/utils/init/globals.c
+++ b/src/backend/utils/init/globals.c
@@ -32,6 +32,7 @@ volatile bool ProcDiePending = false;
volatile bool ClientConnectionLost = false;
volatile bool ImmediateInterruptOK = false;
volatile uint32 InterruptHoldoffCount = 0;
+volatile uint32 QueryCancelHoldoffCount = 0;
volatile uint32 CritSectionCount = 0;
int MyProcPid;
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index 50dd1f081af..af4ba2ab07c 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -78,6 +78,9 @@ extern void TouchSocketFiles(void);
extern void pq_init(void);
extern int pq_getbytes(char *s, size_t len);
extern int pq_getstring(StringInfo s);
+extern void pq_startmsgread(void);
+extern void pq_endmsgread(void);
+extern bool pq_is_reading_msg(void);
extern int pq_getmessage(StringInfo s, int maxlen);
extern int pq_getbyte(void);
extern int pq_peekbyte(void);
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index 6e33a172122..6c68da5f64f 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -52,6 +52,10 @@
* will be held off until CHECK_FOR_INTERRUPTS() is done outside any
* HOLD_INTERRUPTS() ... RESUME_INTERRUPTS() section.
*
+ * There is also a mechanism to prevent query cancel interrupts, while still
+ * allowing die interrupts: HOLD_CANCEL_INTERRUPTS() and
+ * RESUME_CANCEL_INTERRUPTS().
+ *
* Special mechanisms are used to let an interrupt be accepted when we are
* waiting for a lock or when we are waiting for command input (but, of
* course, only if the interrupt holdoff counter is zero). See the
@@ -82,6 +86,7 @@ extern volatile bool ClientConnectionLost;
/* these are marked volatile because they are examined by signal handlers: */
extern PGDLLIMPORT volatile bool ImmediateInterruptOK;
extern PGDLLIMPORT volatile uint32 InterruptHoldoffCount;
+extern PGDLLIMPORT volatile uint32 QueryCancelHoldoffCount;
extern PGDLLIMPORT volatile uint32 CritSectionCount;
/* in tcop/postgres.c */
@@ -114,6 +119,14 @@ do { \
InterruptHoldoffCount--; \
} while(0)
+#define HOLD_CANCEL_INTERRUPTS() (QueryCancelHoldoffCount++)
+
+#define RESUME_CANCEL_INTERRUPTS() \
+do { \
+ Assert(QueryCancelHoldoffCount > 0); \
+ QueryCancelHoldoffCount--; \
+} while(0)
+
#define START_CRIT_SECTION() (CritSectionCount++)
#define END_CRIT_SECTION() \
diff --git a/src/include/tcop/fastpath.h b/src/include/tcop/fastpath.h
index 6286c0a687f..47028cb113c 100644
--- a/src/include/tcop/fastpath.h
+++ b/src/include/tcop/fastpath.h
@@ -15,6 +15,7 @@
#include "lib/stringinfo.h"
+extern int GetOldFunctionMessage(StringInfo buf);
extern int HandleFunctionRequest(StringInfo msgBuf);
#endif /* FASTPATH_H */