author	Robert Haas <rhaas@postgresql.org>	2015-09-18 21:10:08 -0400
committer	Robert Haas <rhaas@postgresql.org>	2015-09-18 21:56:58 -0400
commit	4a4e6893aa080b9094dadbe0e65f8a75fee41ac6 (patch)
tree	bdce94c526a3f138660c33f4fa39a2589c9175f6	/src/backend/executor/tqueue.c
parent	c00c3249e3247d24751d97ff6f26603810593414 (diff)
Glue layer to connect the executor to the shm_mq mechanism.
The shm_mq mechanism was built to send error (and notice) messages and tuples between backends. However, shm_mq itself only deals in raw bytes. Since commit 2bd9e412f92bc6a68f3e8bcb18e04955cc35001d, we have had infrastructure for one backend to redirect protocol messages to a queue and for another backend to parse them and do useful things with them. This commit introduces a somewhat analogous facility for tuples by adding a new type of DestReceiver, DestTupleQueue, which writes each tuple generated by a query into a shm_mq, and a new TupleQueueFunnel facility which reads raw tuples out of the queue and reconstructs the HeapTuple format expected by the executor.

The TupleQueueFunnel abstraction supports reading from multiple tuple streams at the same time, but only in round-robin fashion. Someone could imaginably want other policies, but this should be good enough to meet our short-term needs related to parallel query, and we can always extend it later.

This also makes one minor addition to the shm_mq API that didn't seem worth breaking out as a separate patch.

Extracted from Amit Kapila's parallel sequential scan patch. This code was originally written by me, and then it was revised by Amit, and then it was revised some more by me.
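To make the wiring concrete, here is a minimal worker-side sketch. This is not code from the commit: the shm_mq_handle 'mqh', the TupleDesc, and the slot are assumed to come from the surrounding parallel-query setup, and in real use the executor invokes the DestReceiver callbacks itself; they are spelled out here only to show the contract.

	#include "postgres.h"

	#include "executor/tqueue.h"
	#include "executor/tuptable.h"
	#include "nodes/nodes.h"

	/*
	 * Hypothetical worker-side wiring: push one materialized tuple from
	 * 'slot' into the tuple queue behind 'mqh'.
	 */
	static void
	send_tuple_to_queue(shm_mq_handle *mqh, TupleDesc tupdesc,
						TupleTableSlot *slot)
	{
		DestReceiver *dest = CreateTupleQueueDestReceiver(mqh);

		dest->rStartup(dest, CMD_SELECT, tupdesc);	/* no-op for tqueue */
		dest->receiveSlot(slot, dest);		/* materialize + shm_mq_send */
		dest->rShutdown(dest);				/* no-op for tqueue */
		dest->rDestroy(dest);				/* pfrees the receiver */
	}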
Diffstat (limited to 'src/backend/executor/tqueue.c')
-rw-r--r--	src/backend/executor/tqueue.c	262
1 file changed, 262 insertions(+), 0 deletions(-)
diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c
new file mode 100644
index 00000000000..d0edf4e5595
--- /dev/null
+++ b/src/backend/executor/tqueue.c
@@ -0,0 +1,262 @@
+/*-------------------------------------------------------------------------
+ *
+ * tqueue.c
+ * Use shm_mq to send & receive tuples between parallel backends
+ *
+ * A DestReceiver of type DestTupleQueue, which is a TQueueDestReceiver
+ * under the hood, writes tuples from the executor to a shm_mq.
+ *
+ * A TupleQueueFunnel helps manage the process of reading tuples from
+ * one or more shm_mq objects being used as tuple queues.
+ *
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/executor/tqueue.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/htup_details.h"
+#include "executor/tqueue.h"
+#include "miscadmin.h"
+
+typedef struct
+{
+ DestReceiver pub;
+ shm_mq_handle *handle;
+} TQueueDestReceiver;
+
+struct TupleQueueFunnel
+{
+ int nqueues;
+ int maxqueues;
+ int nextqueue;
+ shm_mq_handle **queue;
+};
+
+/*
+ * Receive a tuple.
+ */
+static void
+tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
+{
+ TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
+ HeapTuple tuple;
+
+ tuple = ExecMaterializeSlot(slot);
+ shm_mq_send(tqueue->handle, tuple->t_len, tuple->t_data, false);
+}
+
+/*
+ * Prepare to receive tuples from executor.
+ */
+static void
+tqueueStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo)
+{
+ /* do nothing */
+}
+
+/*
+ * Clean up at end of an executor run
+ */
+static void
+tqueueShutdownReceiver(DestReceiver *self)
+{
+ /* do nothing */
+}
+
+/*
+ * Destroy receiver when done with it
+ */
+static void
+tqueueDestroyReceiver(DestReceiver *self)
+{
+ pfree(self);
+}
+
+/*
+ * Create a DestReceiver that writes tuples to a tuple queue.
+ */
+DestReceiver *
+CreateTupleQueueDestReceiver(shm_mq_handle *handle)
+{
+ TQueueDestReceiver *self;
+
+ self = (TQueueDestReceiver *) palloc0(sizeof(TQueueDestReceiver));
+
+ self->pub.receiveSlot = tqueueReceiveSlot;
+ self->pub.rStartup = tqueueStartupReceiver;
+ self->pub.rShutdown = tqueueShutdownReceiver;
+ self->pub.rDestroy = tqueueDestroyReceiver;
+ self->pub.mydest = DestTupleQueue;
+ self->handle = handle;
+
+ return (DestReceiver *) self;
+}
+
+/*
+ * Create a tuple queue funnel.
+ */
+TupleQueueFunnel *
+CreateTupleQueueFunnel(void)
+{
+ TupleQueueFunnel *funnel = palloc0(sizeof(TupleQueueFunnel));
+
+ funnel->maxqueues = 8;
+ funnel->queue = palloc(funnel->maxqueues * sizeof(shm_mq_handle *));
+
+ return funnel;
+}
+
+/*
+ * Destroy a tuple queue funnel.
+ */
+void
+DestroyTupleQueueFunnel(TupleQueueFunnel *funnel)
+{
+ int i;
+
+ for (i = 0; i < funnel->nqueues; i++)
+ shm_mq_detach(shm_mq_get_queue(funnel->queue[i]));
+ pfree(funnel->queue);
+ pfree(funnel);
+}
+
+/*
+ * Remember the shared memory queue handle in funnel.
+ */
+void
+RegisterTupleQueueOnFunnel(TupleQueueFunnel *funnel, shm_mq_handle *handle)
+{
+ if (funnel->nqueues < funnel->maxqueues)
+ {
+ funnel->queue[funnel->nqueues++] = handle;
+ return;
+ }
+
+ if (funnel->nqueues >= funnel->maxqueues)
+ {
+ int newsize = funnel->nqueues * 2;
+
+ Assert(funnel->nqueues == funnel->maxqueues);
+
+ funnel->queue = repalloc(funnel->queue,
+ newsize * sizeof(shm_mq_handle *));
+ funnel->maxqueues = newsize;
+ }
+
+ funnel->queue[funnel->nqueues++] = handle;
+}
+
+/*
+ * Fetch a tuple from a tuple queue funnel.
+ *
+ * We try to read from the queues in round-robin fashion so as to avoid
+ * the situation where some workers get their tuples read expeditiously while
+ * others are barely ever serviced.
+ *
+ * Even when nowait = false, we read from the individual queues in
+ * non-blocking mode. Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK,
+ * it can still accumulate bytes from a partially-read message, so doing it
+ * this way should outperform doing a blocking read on each queue in turn.
+ *
+ * The return value is NULL if there are no remaining queues or if
+ * nowait = true and no queue returned a tuple without blocking. *done, if
+ * not NULL, is set to true when there are no remaining queues and false in
+ * any other case.
+ */
+HeapTuple
+TupleQueueFunnelNext(TupleQueueFunnel *funnel, bool nowait, bool *done)
+{
+ int waitpos = funnel->nextqueue;
+
+ /* Corner case: called before adding any queues, or after all are gone. */
+ if (funnel->nqueues == 0)
+ {
+ if (done != NULL)
+ *done = true;
+ return NULL;
+ }
+
+ if (done != NULL)
+ *done = false;
+
+ for (;;)
+ {
+ shm_mq_handle *mqh = funnel->queue[funnel->nextqueue];
+ shm_mq_result result;
+ Size nbytes;
+ void *data;
+
+ /* Attempt to read a message. */
+ result = shm_mq_receive(mqh, &nbytes, &data, true);
+
+ /*
+ * Normally, we advance funnel->nextqueue to the next queue at this
+ * point, but if we're pointing to a queue that we've just discovered
+ * is detached, then forget that queue and leave the pointer where it
+ * is; if the number of remaining queues falls below the pointer, wrap
+ * the pointer back around to the first queue.
+ */
+ if (result != SHM_MQ_DETACHED)
+ funnel->nextqueue = (funnel->nextqueue + 1) % funnel->nqueues;
+ else
+ {
+ --funnel->nqueues;
+ if (funnel->nqueues == 0)
+ {
+ if (done != NULL)
+ *done = true;
+ return NULL;
+ }
+
+ memmove(&funnel->queue[funnel->nextqueue],
+ &funnel->queue[funnel->nextqueue + 1],
+ sizeof(shm_mq_handle *)
+ * (funnel->nqueues - funnel->nextqueue));
+
+ if (funnel->nextqueue >= funnel->nqueues)
+ funnel->nextqueue = 0;
+
+ if (funnel->nextqueue < waitpos)
+ --waitpos;
+
+ continue;
+ }
+
+ /* If we got a message, return it. */
+ if (result == SHM_MQ_SUCCESS)
+ {
+ HeapTupleData htup;
+
+ /*
+ * The tuple data we just read from the queue is only valid until
+ * we again attempt to read from it. Copy the tuple into a single
+ * palloc'd chunk as callers will expect.
+ */
+ ItemPointerSetInvalid(&htup.t_self);
+ htup.t_tableOid = InvalidOid;
+ htup.t_len = nbytes;
+ htup.t_data = data;
+ return heap_copytuple(&htup);
+ }
+
+ /*
+ * If we've visited all of the queues, then we should either give up
+ * and return NULL (if we're in non-blocking mode) or wait for the
+ * process latch to be set (otherwise).
+ */
+ if (funnel->nextqueue == waitpos)
+ {
+ if (nowait)
+ return NULL;
+ WaitLatch(MyLatch, WL_LATCH_SET, 0);
+ CHECK_FOR_INTERRUPTS();
+ ResetLatch(MyLatch);
+ }
+ }
+}
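
For the receiving side, a correspondingly hedged sketch (again not from this commit; the already-attached handles in 'mqh[]' and their dsm/shm_mq setup are assumed): the leader registers each worker's queue on a funnel and then pulls tuples in the round-robin fashion described above.

	#include "postgres.h"

	#include "access/htup_details.h"
	#include "executor/tqueue.h"

	/*
	 * Hypothetical leader-side loop: drain 'nworkers' tuple queues through
	 * a funnel until every queue has been detached.
	 */
	static void
	drain_worker_queues(shm_mq_handle **mqh, int nworkers)
	{
		TupleQueueFunnel *funnel = CreateTupleQueueFunnel();
		int			i;

		for (i = 0; i < nworkers; ++i)
			RegisterTupleQueueOnFunnel(funnel, mqh[i]);

		for (;;)
		{
			bool		done;
			HeapTuple	tup;

			/* nowait = false: block until a tuple arrives or all detach. */
			tup = TupleQueueFunnelNext(funnel, false, &done);
			if (done)
				break;
			if (tup != NULL)
			{
				/* 'tup' is a palloc'd copy; we own it and must free it. */
				heap_freetuple(tup);
			}
		}

		DestroyTupleQueueFunnel(funnel);
	}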