diff options
Diffstat (limited to 'contrib/test_shm_mq/worker.c')
-rw-r--r-- | contrib/test_shm_mq/worker.c | 224 |
1 files changed, 224 insertions, 0 deletions
diff --git a/contrib/test_shm_mq/worker.c b/contrib/test_shm_mq/worker.c new file mode 100644 index 00000000000..95b23c9788a --- /dev/null +++ b/contrib/test_shm_mq/worker.c @@ -0,0 +1,224 @@ +/*-------------------------------------------------------------------------- + * + * worker.c + * Code for sample worker making use of shared memory message queues. + * Our test worker simply reads messages from one message queue and + * writes them back out to another message queue. In a real + * application, you'd presumably want the worker to do some more + * complex calculation rather than simply returning the input, + * but it should be possible to use much of the control logic just + * as presented here. + * + * Copyright (C) 2013, PostgreSQL Global Development Group + * + * IDENTIFICATION + * contrib/test_shm_mq/worker.c + * + * ------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "miscadmin.h" +#include "storage/ipc.h" +#include "storage/procarray.h" +#include "storage/shm_mq.h" +#include "storage/shm_toc.h" +#include "utils/resowner.h" + +#include "test_shm_mq.h" + +static void handle_sigterm(SIGNAL_ARGS); +static void attach_to_queues(dsm_segment *seg, shm_toc *toc, + int myworkernumber, shm_mq_handle **inqhp, + shm_mq_handle **outqhp); +static void copy_messages(shm_mq_handle *inqh, shm_mq_handle *outqh); + +/* + * Background worker entrypoint. + * + * This is intended to demonstrate how a background worker can be used to + * facilitate a parallel computation. Most of the logic here is fairly + * boilerplate stuff, designed to attach to the shared memory segment, + * notify the user backend that we're alive, and so on. The + * application-specific bits of logic that you'd replace for your own worker + * are attach_to_queues() and copy_messages(). + */ +void +test_shm_mq_main(Datum main_arg) +{ + dsm_segment *seg; + shm_toc *toc; + shm_mq_handle *inqh; + shm_mq_handle *outqh; + volatile test_shm_mq_header *hdr; + int myworkernumber; + PGPROC *registrant; + + /* + * Establish signal handlers. + * + * We want CHECK_FOR_INTERRUPTS() to kill off this worker process just + * as it would a normal user backend. To make that happen, we establish + * a signal handler that is a stripped-down version of die(). We don't + * have any equivalent of the backend's command-read loop, where interrupts + * can be processed immediately, so make sure ImmediateInterruptOK is + * turned off. + */ + pqsignal(SIGTERM, handle_sigterm); + ImmediateInterruptOK = false; + BackgroundWorkerUnblockSignals(); + + /* + * Connect to the dynamic shared memory segment. + * + * The backend that registered this worker passed us the ID of a shared + * memory segment to which we must attach for further instructions. In + * order to attach to dynamic shared memory, we need a resource owner. + * Once we've mapped the segment in our address space, attach to the table + * of contents so we can locate the various data structures we'll need + * to find within the segment. + */ + CurrentResourceOwner = ResourceOwnerCreate(NULL, "test_shm_mq worker"); + seg = dsm_attach(DatumGetInt32(main_arg)); + if (seg == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("unable to map dynamic shared memory segment"))); + toc = shm_toc_attach(PG_TEST_SHM_MQ_MAGIC, dsm_segment_address(seg)); + if (toc == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("bad magic number in dynamic shared memory segment"))); + + /* + * Acquire a worker number. + * + * By convention, the process registering this background worker should + * have stored the control structure at key 0. We look up that key to + * find it. Our worker number gives our identity: there may be just one + * worker involved in this parallel operation, or there may be many. + */ + hdr = shm_toc_lookup(toc, 0); + SpinLockAcquire(&hdr->mutex); + myworkernumber = ++hdr->workers_attached; + SpinLockRelease(&hdr->mutex); + if (myworkernumber > hdr->workers_total) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("too many message queue testing workers already"))); + + /* + * Attach to the appropriate message queues. + */ + attach_to_queues(seg, toc, myworkernumber, &inqh, &outqh); + + /* + * Indicate that we're fully initialized and ready to begin the main + * part of the parallel operation. + * + * Once we signal that we're ready, the user backend is entitled to assume + * that our on_dsm_detach callbacks will fire before we disconnect from + * the shared memory segment and exit. Generally, that means we must have + * attached to all relevant dynamic shared memory data structures by now. + */ + SpinLockAcquire(&hdr->mutex); + ++hdr->workers_ready; + SpinLockRelease(&hdr->mutex); + registrant = BackendPidGetProc(MyBgworkerEntry->bgw_notify_pid); + if (registrant == NULL) + { + elog(DEBUG1, "registrant backend has exited prematurely"); + proc_exit(1); + } + SetLatch(®istrant->procLatch); + + /* Do the work. */ + copy_messages(inqh, outqh); + + /* + * We're done. Explicitly detach the shared memory segment so that we + * don't get a resource leak warning at commit time. This will fire any + * on_dsm_detach callbacks we've registered, as well. Once that's done, + * we can go ahead and exit. + */ + dsm_detach(seg); + proc_exit(1); +} + +/* + * Attach to shared memory message queues. + * + * We use our worker number to determine to which queue we should attach. + * The queues are registered at keys 1..<number-of-workers>. The user backend + * writes to queue #1 and reads from queue #<number-of-workers>; each worker + * reads from the queue whose number is equal to its worker number and writes + * to the next higher-numbered queue. + */ +static void +attach_to_queues(dsm_segment *seg, shm_toc *toc, int myworkernumber, + shm_mq_handle **inqhp, shm_mq_handle **outqhp) +{ + shm_mq *inq; + shm_mq *outq; + + inq = shm_toc_lookup(toc, myworkernumber); + shm_mq_set_receiver(inq, MyProc); + *inqhp = shm_mq_attach(inq, seg, NULL); + outq = shm_toc_lookup(toc, myworkernumber + 1); + shm_mq_set_sender(outq, MyProc); + *outqhp = shm_mq_attach(outq, seg, NULL); +} + +/* + * Loop, receiving and sending messages, until the connection is broken. + * + * This is the "real work" performed by this worker process. Everything that + * happens before this is initialization of one form or another, and everything + * after this point is cleanup. + */ +static void +copy_messages(shm_mq_handle *inqh, shm_mq_handle *outqh) +{ + uint64 len; + void *data; + shm_mq_result res; + + for (;;) + { + /* Notice any interrupts that have occurred. */ + CHECK_FOR_INTERRUPTS(); + + /* Receive a message. */ + res = shm_mq_receive(inqh, &len, &data, false); + if (res != SHM_MQ_SUCCESS) + break; + + /* Send it back out. */ + res = shm_mq_send(outqh, len, data, false); + if (res != SHM_MQ_SUCCESS) + break; + } +} + +/* + * When we receive a SIGTERM, we set InterruptPending and ProcDiePending just + * like a normal backend. The next CHECK_FOR_INTERRUPTS() will do the right + * thing. + */ +static void +handle_sigterm(SIGNAL_ARGS) +{ + int save_errno = errno; + + if (MyProc) + SetLatch(&MyProc->procLatch); + + if (!proc_exit_inprogress) + { + InterruptPending = true; + ProcDiePending = true; + } + + errno = save_errno; +} |