aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/access/transam/parallel.c6
-rw-r--r--src/backend/executor/tqueue.c9
-rw-r--r--src/backend/libpq/pqmq.c10
-rw-r--r--src/backend/storage/ipc/shm_mq.c43
-rw-r--r--src/include/storage/shm_mq.h4
5 files changed, 51 insertions, 21 deletions
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 17b10383e44..ce1b907debd 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -480,7 +480,7 @@ LaunchParallelWorkers(ParallelContext *pcxt)
*/
any_registrations_failed = true;
pcxt->worker[i].bgwhandle = NULL;
- pfree(pcxt->worker[i].error_mqh);
+ shm_mq_detach(pcxt->worker[i].error_mqh);
pcxt->worker[i].error_mqh = NULL;
}
}
@@ -612,7 +612,7 @@ DestroyParallelContext(ParallelContext *pcxt)
{
TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);
- pfree(pcxt->worker[i].error_mqh);
+ shm_mq_detach(pcxt->worker[i].error_mqh);
pcxt->worker[i].error_mqh = NULL;
}
}
@@ -861,7 +861,7 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
case 'X': /* Terminate, indicating clean exit */
{
- pfree(pcxt->worker[i].error_mqh);
+ shm_mq_detach(pcxt->worker[i].error_mqh);
pcxt->worker[i].error_mqh = NULL;
break;
}
diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c
index 4c4fcf530d7..ee4bec0385e 100644
--- a/src/backend/executor/tqueue.c
+++ b/src/backend/executor/tqueue.c
@@ -578,7 +578,9 @@ tqueueShutdownReceiver(DestReceiver *self)
{
TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
- shm_mq_detach(shm_mq_get_queue(tqueue->queue));
+ if (tqueue->queue != NULL)
+ shm_mq_detach(tqueue->queue);
+ tqueue->queue = NULL;
}
/*
@@ -589,6 +591,9 @@ tqueueDestroyReceiver(DestReceiver *self)
{
TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
+ /* We probably already detached from queue, but let's be sure */
+ if (tqueue->queue != NULL)
+ shm_mq_detach(tqueue->queue);
if (tqueue->tmpcontext != NULL)
MemoryContextDelete(tqueue->tmpcontext);
if (tqueue->recordhtab != NULL)
@@ -650,7 +655,7 @@ CreateTupleQueueReader(shm_mq_handle *handle, TupleDesc tupledesc)
void
DestroyTupleQueueReader(TupleQueueReader *reader)
{
- shm_mq_detach(shm_mq_get_queue(reader->queue));
+ shm_mq_detach(reader->queue);
if (reader->typmodmap != NULL)
hash_destroy(reader->typmodmap);
/* Is it worth trying to free substructure of the remap tree? */
diff --git a/src/backend/libpq/pqmq.c b/src/backend/libpq/pqmq.c
index 8fbc03819d9..e1a24b62c8f 100644
--- a/src/backend/libpq/pqmq.c
+++ b/src/backend/libpq/pqmq.c
@@ -21,7 +21,6 @@
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
-static shm_mq *pq_mq;
static shm_mq_handle *pq_mq_handle;
static bool pq_mq_busy = false;
static pid_t pq_mq_parallel_master_pid = 0;
@@ -56,7 +55,6 @@ void
pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
{
PqCommMethods = &PqCommMqMethods;
- pq_mq = shm_mq_get_queue(mqh);
pq_mq_handle = mqh;
whereToSendOutput = DestRemote;
FrontendProtocol = PG_PROTOCOL_LATEST;
@@ -70,7 +68,6 @@ pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
static void
pq_cleanup_redirect_to_shm_mq(dsm_segment *seg, Datum arg)
{
- pq_mq = NULL;
pq_mq_handle = NULL;
whereToSendOutput = DestNone;
}
@@ -135,9 +132,8 @@ mq_putmessage(char msgtype, const char *s, size_t len)
*/
if (pq_mq_busy)
{
- if (pq_mq != NULL)
- shm_mq_detach(pq_mq);
- pq_mq = NULL;
+ if (pq_mq_handle != NULL)
+ shm_mq_detach(pq_mq_handle);
pq_mq_handle = NULL;
return EOF;
}
@@ -148,7 +144,7 @@ mq_putmessage(char msgtype, const char *s, size_t len)
* be generated late in the shutdown sequence, after all DSMs have already
* been detached.
*/
- if (pq_mq == NULL)
+ if (pq_mq_handle == NULL)
return 0;
pq_mq_busy = true;
diff --git a/src/backend/storage/ipc/shm_mq.c b/src/backend/storage/ipc/shm_mq.c
index f45a67cc278..770559a03e3 100644
--- a/src/backend/storage/ipc/shm_mq.c
+++ b/src/backend/storage/ipc/shm_mq.c
@@ -83,7 +83,9 @@ struct shm_mq
* This structure is a backend-private handle for access to a queue.
*
* mqh_queue is a pointer to the queue we've attached, and mqh_segment is
- * a pointer to the dynamic shared memory segment that contains it.
+ * an optional pointer to the dynamic shared memory segment that contains it.
+ * (If mqh_segment is provided, we register an on_dsm_detach callback to
+ * make sure we detach from the queue before detaching from DSM.)
*
* If this queue is intended to connect the current process with a background
* worker that started it, the user can pass a pointer to the worker handle
@@ -139,6 +141,7 @@ struct shm_mq_handle
MemoryContext mqh_context;
};
+static void shm_mq_detach_internal(shm_mq *mq);
static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mq, Size nbytes,
const void *data, bool nowait, Size *bytes_written);
static shm_mq_result shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed,
@@ -288,14 +291,15 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc);
mqh->mqh_queue = mq;
mqh->mqh_segment = seg;
- mqh->mqh_buffer = NULL;
mqh->mqh_handle = handle;
+ mqh->mqh_buffer = NULL;
mqh->mqh_buflen = 0;
mqh->mqh_consume_pending = 0;
- mqh->mqh_context = CurrentMemoryContext;
mqh->mqh_partial_bytes = 0;
+ mqh->mqh_expected_bytes = 0;
mqh->mqh_length_word_complete = false;
mqh->mqh_counterparty_attached = false;
+ mqh->mqh_context = CurrentMemoryContext;
if (seg != NULL)
on_dsm_detach(seg, shm_mq_detach_callback, PointerGetDatum(mq));
@@ -765,7 +769,28 @@ shm_mq_wait_for_attach(shm_mq_handle *mqh)
}
/*
- * Detach a shared message queue.
+ * Detach from a shared message queue, and destroy the shm_mq_handle.
+ */
+void
+shm_mq_detach(shm_mq_handle *mqh)
+{
+ /* Notify counterparty that we're outta here. */
+ shm_mq_detach_internal(mqh->mqh_queue);
+
+ /* Cancel on_dsm_detach callback, if any. */
+ if (mqh->mqh_segment)
+ cancel_on_dsm_detach(mqh->mqh_segment,
+ shm_mq_detach_callback,
+ PointerGetDatum(mqh->mqh_queue));
+
+ /* Release local memory associated with handle. */
+ if (mqh->mqh_buffer != NULL)
+ pfree(mqh->mqh_buffer);
+ pfree(mqh);
+}
+
+/*
+ * Notify counterparty that we're detaching from shared message queue.
*
* The purpose of this function is to make sure that the process
* with which we're communicating doesn't block forever waiting for us to
@@ -773,9 +798,13 @@ shm_mq_wait_for_attach(shm_mq_handle *mqh)
* detaches, the receiver can read any messages remaining in the queue;
* further reads will return SHM_MQ_DETACHED. If the receiver detaches,
* further attempts to send messages will likewise return SHM_MQ_DETACHED.
+ *
+ * This is separated out from shm_mq_detach() because if the on_dsm_detach
+ * callback fires, we only want to do this much. We do not try to touch
+ * the local shm_mq_handle, as it may have been pfree'd already.
*/
-void
-shm_mq_detach(shm_mq *mq)
+static void
+shm_mq_detach_internal(shm_mq *mq)
{
volatile shm_mq *vmq = mq;
PGPROC *victim;
@@ -1193,5 +1222,5 @@ shm_mq_detach_callback(dsm_segment *seg, Datum arg)
{
shm_mq *mq = (shm_mq *) DatumGetPointer(arg);
- shm_mq_detach(mq);
+ shm_mq_detach_internal(mq);
}
diff --git a/src/include/storage/shm_mq.h b/src/include/storage/shm_mq.h
index 02a93e02222..7709efcc483 100644
--- a/src/include/storage/shm_mq.h
+++ b/src/include/storage/shm_mq.h
@@ -62,8 +62,8 @@ extern shm_mq_handle *shm_mq_attach(shm_mq *mq, dsm_segment *seg,
/* Associate worker handle with shm_mq. */
extern void shm_mq_set_handle(shm_mq_handle *, BackgroundWorkerHandle *);
-/* Break connection. */
-extern void shm_mq_detach(shm_mq *);
+/* Break connection, release handle resources. */
+extern void shm_mq_detach(shm_mq_handle *mqh);
/* Get the shm_mq from handle. */
extern shm_mq *shm_mq_get_queue(shm_mq_handle *mqh);