aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/executor/nodeGather.c5
-rw-r--r--src/backend/libpq/pqmq.c2
-rw-r--r--src/backend/storage/ipc/shm_mq.c18
-rw-r--r--src/include/storage/latch.h16
-rw-r--r--src/test/modules/test_shm_mq/setup.c6
-rw-r--r--src/test/modules/test_shm_mq/test.c2
6 files changed, 32 insertions, 17 deletions
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index 93a566ba629..438d1b24fc2 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -330,8 +330,8 @@ gather_readnext(GatherState *gatherstate)
HeapTuple tup;
bool readerdone;
- /* Make sure we've read all messages from workers. */
- HandleParallelMessages();
+ /* Check for async events, particularly messages from workers. */
+ CHECK_FOR_INTERRUPTS();
/* Attempt to read a tuple, but don't block if none is available. */
reader = gatherstate->reader[gatherstate->nextreader];
@@ -388,7 +388,6 @@ gather_readnext(GatherState *gatherstate)
/* Nothing to do except wait for developments. */
WaitLatch(MyLatch, WL_LATCH_SET, 0);
- CHECK_FOR_INTERRUPTS();
ResetLatch(MyLatch);
nvisited = 0;
}
diff --git a/src/backend/libpq/pqmq.c b/src/backend/libpq/pqmq.c
index 0dcdee03db5..921242fbc4e 100644
--- a/src/backend/libpq/pqmq.c
+++ b/src/backend/libpq/pqmq.c
@@ -172,8 +172,8 @@ mq_putmessage(char msgtype, const char *s, size_t len)
break;
WaitLatch(&MyProc->procLatch, WL_LATCH_SET, 0);
- CHECK_FOR_INTERRUPTS();
ResetLatch(&MyProc->procLatch);
+ CHECK_FOR_INTERRUPTS();
}
pq_mq_busy = false;
diff --git a/src/backend/storage/ipc/shm_mq.c b/src/backend/storage/ipc/shm_mq.c
index 44ede336162..5b32782022b 100644
--- a/src/backend/storage/ipc/shm_mq.c
+++ b/src/backend/storage/ipc/shm_mq.c
@@ -896,11 +896,11 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
*/
WaitLatch(MyLatch, WL_LATCH_SET, 0);
- /* An interrupt may have occurred while we were waiting. */
- CHECK_FOR_INTERRUPTS();
-
/* Reset the latch so we don't spin. */
ResetLatch(MyLatch);
+
+ /* An interrupt may have occurred while we were waiting. */
+ CHECK_FOR_INTERRUPTS();
}
else
{
@@ -993,11 +993,11 @@ shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait,
*/
WaitLatch(MyLatch, WL_LATCH_SET, 0);
- /* An interrupt may have occurred while we were waiting. */
- CHECK_FOR_INTERRUPTS();
-
/* Reset the latch so we don't spin. */
ResetLatch(MyLatch);
+
+ /* An interrupt may have occurred while we were waiting. */
+ CHECK_FOR_INTERRUPTS();
}
}
@@ -1092,11 +1092,11 @@ shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile * ptr,
/* Wait to be signalled. */
WaitLatch(MyLatch, WL_LATCH_SET, 0);
- /* An interrupt may have occurred while we were waiting. */
- CHECK_FOR_INTERRUPTS();
-
/* Reset the latch so we don't spin. */
ResetLatch(MyLatch);
+
+ /* An interrupt may have occurred while we were waiting. */
+ CHECK_FOR_INTERRUPTS();
}
return result;
diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h
index 85d211c0e1c..5179ecc0dbd 100644
--- a/src/include/storage/latch.h
+++ b/src/include/storage/latch.h
@@ -52,6 +52,22 @@
* do. Otherwise, if someone sets the latch between the check and the
* ResetLatch call, you will miss it and Wait will incorrectly block.
*
+ * Another valid coding pattern looks like:
+ *
+ * for (;;)
+ * {
+ * if (work to do)
+ * Do Stuff(); // in particular, exit loop if some condition satisfied
+ * WaitLatch();
+ * ResetLatch();
+ * }
+ *
+ * This is useful to reduce latch traffic if it's expected that the loop's
+ * termination condition will often be satisfied in the first iteration;
+ * the cost is an extra loop iteration before blocking when it is not.
+ * What must be avoided is placing any checks for asynchronous events after
+ * WaitLatch and before ResetLatch, as that creates a race condition.
+ *
* To wake up the waiter, you must first set a global flag or something
* else that the wait loop tests in the "if (work to do)" part, and call
* SetLatch *after* that. SetLatch is designed to return quickly if the
diff --git a/src/test/modules/test_shm_mq/setup.c b/src/test/modules/test_shm_mq/setup.c
index 5bd282078cc..143df4eb651 100644
--- a/src/test/modules/test_shm_mq/setup.c
+++ b/src/test/modules/test_shm_mq/setup.c
@@ -281,11 +281,11 @@ wait_for_workers_to_become_ready(worker_state *wstate,
/* Wait to be signalled. */
WaitLatch(MyLatch, WL_LATCH_SET, 0);
- /* An interrupt may have occurred while we were waiting. */
- CHECK_FOR_INTERRUPTS();
-
/* Reset the latch so we don't spin. */
ResetLatch(MyLatch);
+
+ /* An interrupt may have occurred while we were waiting. */
+ CHECK_FOR_INTERRUPTS();
}
if (!result)
diff --git a/src/test/modules/test_shm_mq/test.c b/src/test/modules/test_shm_mq/test.c
index 6948e208996..dd34bc7e7f0 100644
--- a/src/test/modules/test_shm_mq/test.c
+++ b/src/test/modules/test_shm_mq/test.c
@@ -231,8 +231,8 @@ test_shm_mq_pipelined(PG_FUNCTION_ARGS)
* for us to do.
*/
WaitLatch(MyLatch, WL_LATCH_SET, 0);
- CHECK_FOR_INTERRUPTS();
ResetLatch(MyLatch);
+ CHECK_FOR_INTERRUPTS();
}
}