aboutsummaryrefslogtreecommitdiff
path: root/src/backend/libpq/pqmq.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/libpq/pqmq.c')
-rw-r--r--src/backend/libpq/pqmq.c16
1 files changed, 11 insertions, 5 deletions
diff --git a/src/backend/libpq/pqmq.c b/src/backend/libpq/pqmq.c
index f1a08bc32ca..5f39949a367 100644
--- a/src/backend/libpq/pqmq.c
+++ b/src/backend/libpq/pqmq.c
@@ -23,7 +23,7 @@
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
-static shm_mq_handle *pq_mq_handle;
+static shm_mq_handle *pq_mq_handle = NULL;
static bool pq_mq_busy = false;
static pid_t pq_mq_parallel_leader_pid = 0;
static ProcNumber pq_mq_parallel_leader_proc_number = INVALID_PROC_NUMBER;
@@ -66,7 +66,11 @@ pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
static void
pq_cleanup_redirect_to_shm_mq(dsm_segment *seg, Datum arg)
{
- pq_mq_handle = NULL;
+ if (pq_mq_handle != NULL)
+ {
+ pfree(pq_mq_handle);
+ pq_mq_handle = NULL;
+ }
whereToSendOutput = DestNone;
}
@@ -131,8 +135,11 @@ mq_putmessage(char msgtype, const char *s, size_t len)
if (pq_mq_busy)
{
if (pq_mq_handle != NULL)
+ {
shm_mq_detach(pq_mq_handle);
- pq_mq_handle = NULL;
+ pfree(pq_mq_handle);
+ pq_mq_handle = NULL;
+ }
return EOF;
}
@@ -152,8 +159,6 @@ mq_putmessage(char msgtype, const char *s, size_t len)
iov[1].data = s;
iov[1].len = len;
- Assert(pq_mq_handle != NULL);
-
for (;;)
{
/*
@@ -161,6 +166,7 @@ mq_putmessage(char msgtype, const char *s, size_t len)
* that the shared memory value is updated before we send the parallel
* message signal right after this.
*/
+ Assert(pq_mq_handle != NULL);
result = shm_mq_sendv(pq_mq_handle, iov, 2, true, true);
if (pq_mq_parallel_leader_pid != 0)