aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/executor/Makefile2
-rw-r--r--src/backend/executor/tqueue.c262
-rw-r--r--src/backend/storage/ipc/shm_mq.c9
-rw-r--r--src/backend/tcop/dest.c7
-rw-r--r--src/include/executor/tqueue.h31
-rw-r--r--src/include/storage/shm_mq.h3
-rw-r--r--src/include/tcop/dest.h3
-rw-r--r--src/tools/pgindent/typedefs.list1
8 files changed, 316 insertions, 2 deletions
diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile
index 08cba6fa2b5..249534bb927 100644
--- a/src/backend/executor/Makefile
+++ b/src/backend/executor/Makefile
@@ -24,6 +24,6 @@ OBJS = execAmi.o execCurrent.o execGrouping.o execIndexing.o execJunk.o \
nodeSamplescan.o nodeSeqscan.o nodeSetOp.o nodeSort.o nodeUnique.o \
nodeValuesscan.o nodeCtescan.o nodeWorktablescan.o \
nodeGroup.o nodeSubplan.o nodeSubqueryscan.o nodeTidscan.o \
- nodeForeignscan.o nodeWindowAgg.o tstoreReceiver.o spi.o
+ nodeForeignscan.o nodeWindowAgg.o tstoreReceiver.o tqueue.o spi.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c
new file mode 100644
index 00000000000..d0edf4e5595
--- /dev/null
+++ b/src/backend/executor/tqueue.c
@@ -0,0 +1,262 @@
+/*-------------------------------------------------------------------------
+ *
+ * tqueue.c
+ * Use shm_mq to send & receive tuples between parallel backends
+ *
+ * A DestReceiver of type DestTupleQueue, which is a TQueueDestReceiver
+ * under the hood, writes tuples from the executor to a shm_mq.
+ *
+ * A TupleQueueFunnel helps manage the process of reading tuples from
+ * one or more shm_mq objects being used as tuple queues.
+ *
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/executor/tqueue.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/htup_details.h"
+#include "executor/tqueue.h"
+#include "miscadmin.h"
+
+typedef struct
+{
+ DestReceiver pub;
+ shm_mq_handle *handle;
+} TQueueDestReceiver;
+
+struct TupleQueueFunnel
+{
+ int nqueues;
+ int maxqueues;
+ int nextqueue;
+ shm_mq_handle **queue;
+};
+
+/*
+ * Receive a tuple.
+ */
+static void
+tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
+{
+ TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
+ HeapTuple tuple;
+
+ tuple = ExecMaterializeSlot(slot);
+ shm_mq_send(tqueue->handle, tuple->t_len, tuple->t_data, false);
+}
+
+/*
+ * Prepare to receive tuples from executor.
+ */
+static void
+tqueueStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo)
+{
+ /* do nothing */
+}
+
+/*
+ * Clean up at end of an executor run
+ */
+static void
+tqueueShutdownReceiver(DestReceiver *self)
+{
+ /* do nothing */
+}
+
+/*
+ * Destroy receiver when done with it
+ */
+static void
+tqueueDestroyReceiver(DestReceiver *self)
+{
+ pfree(self);
+}
+
+/*
+ * Create a DestReceiver that writes tuples to a tuple queue.
+ */
+DestReceiver *
+CreateTupleQueueDestReceiver(shm_mq_handle *handle)
+{
+ TQueueDestReceiver *self;
+
+ self = (TQueueDestReceiver *) palloc0(sizeof(TQueueDestReceiver));
+
+ self->pub.receiveSlot = tqueueReceiveSlot;
+ self->pub.rStartup = tqueueStartupReceiver;
+ self->pub.rShutdown = tqueueShutdownReceiver;
+ self->pub.rDestroy = tqueueDestroyReceiver;
+ self->pub.mydest = DestTupleQueue;
+ self->handle = handle;
+
+ return (DestReceiver *) self;
+}
+
+/*
+ * Create a tuple queue funnel.
+ */
+TupleQueueFunnel *
+CreateTupleQueueFunnel(void)
+{
+ TupleQueueFunnel *funnel = palloc0(sizeof(TupleQueueFunnel));
+
+ funnel->maxqueues = 8;
+ funnel->queue = palloc(funnel->maxqueues * sizeof(shm_mq_handle *));
+
+ return funnel;
+}
+
+/*
+ * Destroy a tuple queue funnel.
+ */
+void
+DestroyTupleQueueFunnel(TupleQueueFunnel *funnel)
+{
+ int i;
+
+ for (i = 0; i < funnel->nqueues; i++)
+ shm_mq_detach(shm_mq_get_queue(funnel->queue[i]));
+ pfree(funnel->queue);
+ pfree(funnel);
+}
+
+/*
+ * Remember the shared memory queue handle in funnel.
+ */
+void
+RegisterTupleQueueOnFunnel(TupleQueueFunnel *funnel, shm_mq_handle *handle)
+{
+ if (funnel->nqueues < funnel->maxqueues)
+ {
+ funnel->queue[funnel->nqueues++] = handle;
+ return;
+ }
+
+ if (funnel->nqueues >= funnel->maxqueues)
+ {
+ int newsize = funnel->nqueues * 2;
+
+ Assert(funnel->nqueues == funnel->maxqueues);
+
+ funnel->queue = repalloc(funnel->queue,
+ newsize * sizeof(shm_mq_handle *));
+ funnel->maxqueues = newsize;
+ }
+
+ funnel->queue[funnel->nqueues++] = handle;
+}
+
+/*
+ * Fetch a tuple from a tuple queue funnel.
+ *
+ * We try to read from the queues in round-robin fashion so as to avoid
+ * the situation where some workers get their tuples read expediently while
+ * others are barely ever serviced.
+ *
+ * Even when nowait = false, we read from the individual queues in
+ * non-blocking mode. Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK,
+ * it can still accumulate bytes from a partially-read message, so doing it
+ * this way should outperform doing a blocking read on each queue in turn.
+ *
+ * The return value is NULL if there are no remaining queues or if
+ * nowait = true and no queue returned a tuple without blocking. *done, if
+ * not NULL, is set to true when there are no remaining queues and false in
+ * any other case.
+ */
+HeapTuple
+TupleQueueFunnelNext(TupleQueueFunnel *funnel, bool nowait, bool *done)
+{
+ int waitpos = funnel->nextqueue;
+
+ /* Corner case: called before adding any queues, or after all are gone. */
+ if (funnel->nqueues == 0)
+ {
+ if (done != NULL)
+ *done = true;
+ return NULL;
+ }
+
+ if (done != NULL)
+ *done = false;
+
+ for (;;)
+ {
+ shm_mq_handle *mqh = funnel->queue[funnel->nextqueue];
+ shm_mq_result result;
+ Size nbytes;
+ void *data;
+
+ /* Attempt to read a message. */
+ result = shm_mq_receive(mqh, &nbytes, &data, true);
+
+ /*
+ * Normally, we advance funnel->nextqueue to the next queue at this
+ * point, but if we're pointing to a queue that we've just discovered
+ * is detached, then forget that queue and leave the pointer where it
+ * is until the number of remaining queues fall below that pointer and
+ * at that point make the pointer point to the first queue.
+ */
+ if (result != SHM_MQ_DETACHED)
+ funnel->nextqueue = (funnel->nextqueue + 1) % funnel->nqueues;
+ else
+ {
+ --funnel->nqueues;
+ if (funnel->nqueues == 0)
+ {
+ if (done != NULL)
+ *done = true;
+ return NULL;
+ }
+
+ memmove(&funnel->queue[funnel->nextqueue],
+ &funnel->queue[funnel->nextqueue + 1],
+ sizeof(shm_mq_handle *)
+ * (funnel->nqueues - funnel->nextqueue));
+
+ if (funnel->nextqueue >= funnel->nqueues)
+ funnel->nextqueue = 0;
+
+ if (funnel->nextqueue < waitpos)
+ --waitpos;
+
+ continue;
+ }
+
+ /* If we got a message, return it. */
+ if (result == SHM_MQ_SUCCESS)
+ {
+ HeapTupleData htup;
+
+ /*
+ * The tuple data we just read from the queue is only valid until
+ * we again attempt to read from it. Copy the tuple into a single
+ * palloc'd chunk as callers will expect.
+ */
+ ItemPointerSetInvalid(&htup.t_self);
+ htup.t_tableOid = InvalidOid;
+ htup.t_len = nbytes;
+ htup.t_data = data;
+ return heap_copytuple(&htup);
+ }
+
+ /*
+ * If we've visited all of the queues, then we should either give up
+ * and return NULL (if we're in non-blocking mode) or wait for the
+ * process latch to be set (otherwise).
+ */
+ if (funnel->nextqueue == waitpos)
+ {
+ if (nowait)
+ return NULL;
+ WaitLatch(MyLatch, WL_LATCH_SET, 0);
+ CHECK_FOR_INTERRUPTS();
+ ResetLatch(MyLatch);
+ }
+ }
+}
diff --git a/src/backend/storage/ipc/shm_mq.c b/src/backend/storage/ipc/shm_mq.c
index 0e60dbcddc8..c78f1650e6a 100644
--- a/src/backend/storage/ipc/shm_mq.c
+++ b/src/backend/storage/ipc/shm_mq.c
@@ -746,6 +746,15 @@ shm_mq_detach(shm_mq *mq)
}
/*
+ * Get the shm_mq from handle.
+ */
+shm_mq *
+shm_mq_get_queue(shm_mq_handle *mqh)
+{
+ return mqh->mqh_queue;
+}
+
+/*
* Write bytes into a shared message queue.
*/
static shm_mq_result
diff --git a/src/backend/tcop/dest.c b/src/backend/tcop/dest.c
index bcf38952a8c..d645751ff58 100644
--- a/src/backend/tcop/dest.c
+++ b/src/backend/tcop/dest.c
@@ -34,6 +34,7 @@
#include "commands/createas.h"
#include "commands/matview.h"
#include "executor/functions.h"
+#include "executor/tqueue.h"
#include "executor/tstoreReceiver.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
@@ -129,6 +130,9 @@ CreateDestReceiver(CommandDest dest)
case DestTransientRel:
return CreateTransientRelDestReceiver(InvalidOid);
+
+ case DestTupleQueue:
+ return CreateTupleQueueDestReceiver(NULL);
}
/* should never get here */
@@ -162,6 +166,7 @@ EndCommand(const char *commandTag, CommandDest dest)
case DestCopyOut:
case DestSQLFunction:
case DestTransientRel:
+ case DestTupleQueue:
break;
}
}
@@ -204,6 +209,7 @@ NullCommand(CommandDest dest)
case DestCopyOut:
case DestSQLFunction:
case DestTransientRel:
+ case DestTupleQueue:
break;
}
}
@@ -248,6 +254,7 @@ ReadyForQuery(CommandDest dest)
case DestCopyOut:
case DestSQLFunction:
case DestTransientRel:
+ case DestTupleQueue:
break;
}
}
diff --git a/src/include/executor/tqueue.h b/src/include/executor/tqueue.h
new file mode 100644
index 00000000000..6f8eb73c9ae
--- /dev/null
+++ b/src/include/executor/tqueue.h
@@ -0,0 +1,31 @@
+/*-------------------------------------------------------------------------
+ *
+ * tqueue.h
+ * Use shm_mq to send & receive tuples between parallel backends
+ *
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/executor/tqueue.h
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef TQUEUE_H
+#define TQUEUE_H
+
+#include "storage/shm_mq.h"
+#include "tcop/dest.h"
+
+/* Use this to send tuples to a shm_mq. */
+extern DestReceiver *CreateTupleQueueDestReceiver(shm_mq_handle *handle);
+
+/* Use these to receive tuples from a shm_mq. */
+typedef struct TupleQueueFunnel TupleQueueFunnel;
+extern TupleQueueFunnel *CreateTupleQueueFunnel(void);
+extern void DestroyTupleQueueFunnel(TupleQueueFunnel *funnel);
+extern void RegisterTupleQueueOnFunnel(TupleQueueFunnel *, shm_mq_handle *);
+extern HeapTuple TupleQueueFunnelNext(TupleQueueFunnel *, bool nowait,
+ bool *done);
+
+#endif /* TQUEUE_H */
diff --git a/src/include/storage/shm_mq.h b/src/include/storage/shm_mq.h
index 1a2ba040cb4..7621a358ab4 100644
--- a/src/include/storage/shm_mq.h
+++ b/src/include/storage/shm_mq.h
@@ -65,6 +65,9 @@ extern void shm_mq_set_handle(shm_mq_handle *, BackgroundWorkerHandle *);
/* Break connection. */
extern void shm_mq_detach(shm_mq *);
+/* Get the shm_mq from handle. */
+extern shm_mq *shm_mq_get_queue(shm_mq_handle *mqh);
+
/* Send or receive messages. */
extern shm_mq_result shm_mq_send(shm_mq_handle *mqh,
Size nbytes, const void *data, bool nowait);
diff --git a/src/include/tcop/dest.h b/src/include/tcop/dest.h
index 5bcca3fbcaa..b560672fd40 100644
--- a/src/include/tcop/dest.h
+++ b/src/include/tcop/dest.h
@@ -94,7 +94,8 @@ typedef enum
DestIntoRel, /* results sent to relation (SELECT INTO) */
DestCopyOut, /* results sent to COPY TO code */
DestSQLFunction, /* results sent to SQL-language func mgr */
- DestTransientRel /* results sent to transient relation */
+ DestTransientRel, /* results sent to transient relation */
+ DestTupleQueue /* results sent to tuple queue */
} CommandDest;
/* ----------------
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 4b650d1bde7..a037f818acc 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2014,6 +2014,7 @@ TupleHashEntry
TupleHashEntryData
TupleHashIterator
TupleHashTable
+TupleQueueFunnel
TupleTableSlot
Tuplesortstate
Tuplestorestate