aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands/async.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands/async.c')
-rw-r--r--src/backend/commands/async.c605
1 files changed, 605 insertions, 0 deletions
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
new file mode 100644
index 00000000000..2d3064fa472
--- /dev/null
+++ b/src/backend/commands/async.c
@@ -0,0 +1,605 @@
+/*-------------------------------------------------------------------------
+ *
+ * async.c--
+ * Asynchronous notification
+ *
+ * Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * $Header: /cvsroot/pgsql/src/backend/commands/async.c,v 1.1.1.1 1996/07/09 06:21:19 scrappy Exp $
+ *
+ *-------------------------------------------------------------------------
+ */
+/* New Async Notification Model:
+ * 1. Multiple backends on same machine. Multiple backends listening on
+ * one relation.
+ *
+ * 2. One of the backend does a 'notify <relname>'. For all backends that
+ * are listening to this relation (all notifications take place at the
+ * end of commit),
+ * 2.a If the process is the same as the backend process that issued
+ * notification (we are notifying something that we are listening),
+ * signal the corresponding frontend over the comm channel using the
+ * out-of-band channel.
+ * 2.b For all other listening processes, we send kill(2) to wake up
+ * the listening backend.
+ * 3. Upon receiving a kill(2) signal from another backend process notifying
+ * that one of the relation that we are listening is being notified,
+ * we can be in either of two following states:
+ * 3.a We are sleeping, wake up and signal our frontend.
+ * 3.b We are in middle of another transaction, wait until the end of
+ * of the current transaction and signal our frontend.
+ * 4. Each frontend receives this notification and prcesses accordingly.
+ *
+ * -- jw, 12/28/93
+ *
+ */
+/*
+ * The following is the old model which does not work.
+ */
+/*
+ * Model is:
+ * 1. Multiple backends on same machine.
+ *
+ * 2. Query on one backend sends stuff over an asynchronous portal by
+ * appending to a relation, and then doing an async. notification
+ * (which takes place after commit) to all listeners on this relation.
+ *
+ * 3. Async. notification results in all backends listening on relation
+ * to be woken up, by a process signal kill(2), with name of relation
+ * passed in shared memory.
+ *
+ * 4. Each backend notifies its respective frontend over the comm
+ * channel using the out-of-band channel.
+ *
+ * 5. Each frontend receives this notification and processes accordingly.
+ *
+ * #4,#5 are changing soon with pending rewrite of portal/protocol.
+ *
+ */
+
+#include <string.h>
+#include <signal.h>
+#include <errno.h>
+#include "postgres.h"
+
+#include "access/attnum.h"
+#include "access/heapam.h"
+#include "access/htup.h"
+#include "access/relscan.h"
+#include "access/skey.h"
+#include "utils/builtins.h"
+#include "utils/tqual.h"
+#include "access/xact.h"
+
+#include "commands/async.h"
+#include "commands/copy.h"
+#include "storage/buf.h"
+#include "storage/itemptr.h"
+#include "miscadmin.h"
+#include "utils/portal.h"
+#include "utils/excid.h"
+#include "utils/elog.h"
+#include "utils/mcxt.h"
+#include "utils/palloc.h"
+#include "utils/rel.h"
+
+#include "nodes/pg_list.h"
+#include "tcop/dest.h"
+#include "commands/command.h"
+
+#include "catalog/catname.h"
+#include "utils/syscache.h"
+#include "catalog/pg_attribute.h"
+#include "catalog/pg_proc.h"
+#include "catalog/pg_class.h"
+#include "catalog/pg_type.h"
+#include "catalog/pg_listener.h"
+
+#include "executor/execdefs.h"
+/* #include "executor/execdesc.h"*/
+
+#include "storage/bufmgr.h"
+#include "lib/dllist.h"
+#include "libpq/libpq.h"
+
+
+static int notifyFrontEndPending = 0;
+static int notifyIssued = 0;
+static Dllist *pendingNotifies = NULL;
+
+
+static int AsyncExistsPendingNotify(char *);
+static void ClearPendingNotify(void);
+
+/*
+ *--------------------------------------------------------------
+ * Async_NotifyHandler --
+ *
+ * This is the signal handler for SIGUSR2. When the backend
+ * is signaled, the backend can be in two states.
+ * 1. If the backend is in the middle of another transaction,
+ * we set the flag, notifyFrontEndPending, and wait until
+ * the end of the transaction to notify the front end.
+ * 2. If the backend is not in the middle of another transaction,
+ * we notify the front end immediately.
+ *
+ * -- jw, 12/28/93
+ * Results:
+ * none
+ *
+ * Side effects:
+ * none
+ */
+void
+#if defined(PORTNAME_linux)
+Async_NotifyHandler(int i)
+#else
+Async_NotifyHandler()
+#endif
+{
+ extern TransactionState CurrentTransactionState;
+
+ if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
+ (CurrentTransactionState->blockState == TRANS_DEFAULT)) {
+
+ elog(DEBUG, "Waking up sleeping backend process");
+ Async_NotifyFrontEnd();
+
+ }else {
+ elog(DEBUG, "Process is in the middle of another transaction, state = %d, block state = %d",
+ CurrentTransactionState->state,
+ CurrentTransactionState->blockState);
+ notifyFrontEndPending = 1;
+ }
+}
+
+/*
+ *--------------------------------------------------------------
+ * Async_Notify --
+ *
+ * Adds the relation to the list of pending notifies.
+ * All notification happens at end of commit.
+ * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+ *
+ * All notification of backend processes happens here,
+ * then each backend notifies its corresponding front end at
+ * the end of commit.
+ *
+ * This correspond to 'notify <relname>' command
+ * -- jw, 12/28/93
+ *
+ * Results:
+ * XXX
+ *
+ * Side effects:
+ * All tuples for relname in pg_listener are updated.
+ *
+ *--------------------------------------------------------------
+ */
+void
+Async_Notify(char *relname)
+{
+
+ HeapTuple lTuple, rTuple;
+ Relation lRel;
+ HeapScanDesc sRel;
+ TupleDesc tdesc;
+ ScanKeyData key;
+ Buffer b;
+ Datum d, value[3];
+ bool isnull;
+ char repl[3], nulls[3];
+
+ char *notifyName;
+
+ elog(DEBUG,"Async_Notify: %s",relname);
+
+ if (!pendingNotifies)
+ pendingNotifies = DLNewList();
+
+ notifyName = pstrdup(relname);
+ DLAddHead(pendingNotifies, DLNewElem(notifyName));
+
+ ScanKeyEntryInitialize(&key, 0,
+ Anum_pg_listener_relname,
+ NameEqualRegProcedure,
+ PointerGetDatum(notifyName));
+
+ lRel = heap_openr(ListenerRelationName);
+ tdesc = RelationGetTupleDescriptor(lRel);
+ sRel = heap_beginscan(lRel, 0, NowTimeQual, 1, &key);
+
+ nulls[0] = nulls[1] = nulls[2] = ' ';
+ repl[0] = repl[1] = repl[2] = ' ';
+ repl[Anum_pg_listener_notify - 1] = 'r';
+ value[0] = value[1] = value[2] = (Datum) 0;
+ value[Anum_pg_listener_notify - 1] = Int32GetDatum(1);
+
+ while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0, &b))) {
+ d = (Datum) heap_getattr(lTuple, b, Anum_pg_listener_notify,
+ tdesc, &isnull);
+ if (!DatumGetInt32(d)) {
+ rTuple = heap_modifytuple(lTuple, b, lRel, value, nulls, repl);
+ (void) heap_replace(lRel, &lTuple->t_ctid, rTuple);
+ }
+ ReleaseBuffer(b);
+ }
+ heap_endscan(sRel);
+ heap_close(lRel);
+ notifyIssued = 1;
+}
+
+/*
+ *--------------------------------------------------------------
+ * Async_NotifyAtCommit --
+ *
+ * Signal our corresponding frontend process on relations that
+ * were notified. Signal all other backend process that
+ * are listening also.
+ *
+ * -- jw, 12/28/93
+ *
+ * Results:
+ * XXX
+ *
+ * Side effects:
+ * Tuples in pg_listener that has our listenerpid are updated so
+ * that the notification is 0. We do not want to notify frontend
+ * more than once.
+ *
+ * -- jw, 12/28/93
+ *
+ *--------------------------------------------------------------
+ */
+void
+Async_NotifyAtCommit()
+{
+ HeapTuple lTuple;
+ Relation lRel;
+ HeapScanDesc sRel;
+ TupleDesc tdesc;
+ ScanKeyData key;
+ Datum d;
+ int ourpid;
+ bool isnull;
+ Buffer b;
+ extern TransactionState CurrentTransactionState;
+
+ if (!pendingNotifies)
+ pendingNotifies = DLNewList();
+
+ if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
+ (CurrentTransactionState->blockState == TRANS_DEFAULT)) {
+
+ if (notifyIssued) { /* 'notify <relname>' issued by us */
+ notifyIssued = 0;
+ StartTransactionCommand();
+ elog(DEBUG, "Async_NotifyAtCommit.");
+ ScanKeyEntryInitialize(&key, 0,
+ Anum_pg_listener_notify,
+ Integer32EqualRegProcedure,
+ Int32GetDatum(1));
+ lRel = heap_openr(ListenerRelationName);
+ sRel = heap_beginscan(lRel, 0, NowTimeQual, 1, &key);
+ tdesc = RelationGetTupleDescriptor(lRel);
+ ourpid = getpid();
+
+ while (HeapTupleIsValid(lTuple = heap_getnext(sRel,0, &b))) {
+ d = (Datum) heap_getattr(lTuple, b, Anum_pg_listener_relname,
+ tdesc, &isnull);
+
+ if (AsyncExistsPendingNotify((char *) DatumGetPointer(d))) {
+ d = (Datum) heap_getattr(lTuple, b, Anum_pg_listener_pid,
+ tdesc, &isnull);
+
+ if (ourpid == DatumGetInt32(d)) {
+ elog(DEBUG, "Notifying self, setting notifyFronEndPending to 1");
+ notifyFrontEndPending = 1;
+ } else {
+ elog(DEBUG, "Notifying others");
+#ifndef WIN32
+ if (kill(DatumGetInt32(d), SIGUSR2) < 0) {
+ if (errno == ESRCH) {
+ heap_delete(lRel, &lTuple->t_ctid);
+ }
+ }
+#endif /* WIN32 */
+ }
+ }
+ ReleaseBuffer(b);
+ }
+ CommitTransactionCommand();
+ ClearPendingNotify();
+ }
+
+ if (notifyFrontEndPending) { /* we need to notify the frontend of
+ all pending notifies. */
+ notifyFrontEndPending = 1;
+ Async_NotifyFrontEnd();
+ }
+ }
+}
+
+/*
+ *--------------------------------------------------------------
+ * Async_NotifyAtAbort --
+ *
+ * Gets rid of pending notifies. List elements are automatically
+ * freed through memory context.
+ *
+ *
+ * Results:
+ * XXX
+ *
+ * Side effects:
+ * XXX
+ *
+ *--------------------------------------------------------------
+ */
+void
+Async_NotifyAtAbort()
+{
+ extern TransactionState CurrentTransactionState;
+
+ if (notifyIssued) {
+ ClearPendingNotify();
+ }
+ notifyIssued = 0;
+ if (pendingNotifies)
+ DLFreeList(pendingNotifies);
+ pendingNotifies = DLNewList();
+
+ if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
+ (CurrentTransactionState->blockState == TRANS_DEFAULT)) {
+ if (notifyFrontEndPending) { /* don't forget to notify front end */
+ Async_NotifyFrontEnd();
+ }
+ }
+}
+
+/*
+ *--------------------------------------------------------------
+ * Async_Listen --
+ *
+ * Register a backend (identified by its Unix PID) as listening
+ * on the specified relation.
+ *
+ * This corresponds to the 'listen <relation>' command in SQL
+ *
+ * One listener per relation, pg_listener relation is keyed
+ * on (relname,pid) to provide multiple listeners in future.
+ *
+ * Results:
+ * pg_listeners is updated.
+ *
+ * Side effects:
+ * XXX
+ *
+ *--------------------------------------------------------------
+ */
+void
+Async_Listen(char *relname, int pid)
+{
+ Datum values[Natts_pg_listener];
+ char nulls[Natts_pg_listener];
+ TupleDesc tdesc;
+ HeapScanDesc s;
+ HeapTuple htup,tup;
+ Relation lDesc;
+ Buffer b;
+ Datum d;
+ int i;
+ bool isnull;
+ int alreadyListener = 0;
+ int ourPid = getpid();
+ char *relnamei;
+ TupleDesc tupDesc;
+
+ elog(DEBUG,"Async_Listen: %s",relname);
+ for (i = 0 ; i < Natts_pg_listener; i++) {
+ nulls[i] = ' ';
+ values[i] = PointerGetDatum(NULL);
+ }
+
+ i = 0;
+ values[i++] = (Datum) relname;
+ values[i++] = (Datum) pid;
+ values[i++] = (Datum) 0; /* no notifies pending */
+
+ lDesc = heap_openr(ListenerRelationName);
+
+ /* is someone already listening. One listener per relation */
+ tdesc = RelationGetTupleDescriptor(lDesc);
+ s = heap_beginscan(lDesc,0,NowTimeQual,0,(ScanKey)NULL);
+ while (HeapTupleIsValid(htup = heap_getnext(s,0,&b))) {
+ d = (Datum) heap_getattr(htup,b,Anum_pg_listener_relname,tdesc,
+ &isnull);
+ relnamei = DatumGetPointer(d);
+ if (!strncmp(relnamei,relname, NAMEDATALEN)) {
+ d = (Datum) heap_getattr(htup,b,Anum_pg_listener_pid,tdesc,&isnull);
+ pid = DatumGetInt32(d);
+ if (pid == ourPid) {
+ alreadyListener = 1;
+ }
+ }
+ ReleaseBuffer(b);
+ }
+ heap_endscan(s);
+
+ if (alreadyListener) {
+ elog(NOTICE, "Async_Listen: We are already listening on %s",
+ relname);
+ return;
+ }
+
+ tupDesc = lDesc->rd_att;
+ tup = heap_formtuple(tupDesc,
+ values,
+ nulls);
+ heap_insert(lDesc, tup);
+
+ pfree(tup);
+ /* if (alreadyListener) {
+ elog(NOTICE,"Async_Listen: already one listener on %s (possibly dead)",relname);
+ }*/
+ heap_close(lDesc);
+
+ /*
+ * now that we are listening, we should make a note to ourselves
+ * to unlisten prior to dying.
+ */
+ relnamei = malloc(NAMEDATALEN); /* persists to process exit */
+ memset (relnamei, 0, NAMEDATALEN);
+ strncpy(relnamei, relname, NAMEDATALEN);
+ on_exitpg(Async_UnlistenOnExit, (caddr_t) relnamei);
+}
+
+/*
+ *--------------------------------------------------------------
+ * Async_Unlisten --
+ *
+ * Remove the backend from the list of listening backends
+ * for the specified relation.
+ *
+ * This would correspond to the 'unlisten <relation>'
+ * command, but there isn't one yet.
+ *
+ * Results:
+ * pg_listeners is updated.
+ *
+ * Side effects:
+ * XXX
+ *
+ *--------------------------------------------------------------
+ */
+void
+Async_Unlisten(char *relname, int pid)
+{
+ Relation lDesc;
+ HeapTuple lTuple;
+
+ lTuple = SearchSysCacheTuple(LISTENREL, PointerGetDatum(relname),
+ Int32GetDatum(pid),
+ 0,0);
+ lDesc = heap_openr(ListenerRelationName);
+ if (lTuple != NULL) {
+ heap_delete(lDesc,&lTuple->t_ctid);
+ }
+ heap_close(lDesc);
+}
+
+void
+Async_UnlistenOnExit(int code, /* from exitpg */
+ char *relname)
+{
+ Async_Unlisten((char *) relname, getpid());
+}
+
+/*
+ * --------------------------------------------------------------
+ * Async_NotifyFrontEnd --
+ *
+ * Perform an asynchronous notification to front end over
+ * portal comm channel. The name of the relation which contains the
+ * data is sent to the front end.
+ *
+ * We remove the notification flag from the pg_listener tuple
+ * associated with our process.
+ *
+ * Results:
+ * XXX
+ *
+ * Side effects:
+ *
+ * We make use of the out-of-band channel to transmit the
+ * notification to the front end. The actual data transfer takes
+ * place at the front end's request.
+ *
+ * --------------------------------------------------------------
+ */
+GlobalMemory notifyContext = NULL;
+
+void
+Async_NotifyFrontEnd()
+{
+ extern CommandDest whereToSendOutput;
+ HeapTuple lTuple, rTuple;
+ Relation lRel;
+ HeapScanDesc sRel;
+ TupleDesc tdesc;
+ ScanKeyData key[2];
+ Datum d, value[3];
+ char repl[3], nulls[3];
+ Buffer b;
+ int ourpid;
+ bool isnull;
+
+ notifyFrontEndPending = 0;
+
+ elog(DEBUG, "Async_NotifyFrontEnd: notifying front end.");
+
+ StartTransactionCommand();
+ ourpid = getpid();
+ ScanKeyEntryInitialize(&key[0], 0,
+ Anum_pg_listener_notify,
+ Integer32EqualRegProcedure,
+ Int32GetDatum(1));
+ ScanKeyEntryInitialize(&key[1], 0,
+ Anum_pg_listener_pid,
+ Integer32EqualRegProcedure,
+ Int32GetDatum(ourpid));
+ lRel = heap_openr(ListenerRelationName);
+ tdesc = RelationGetTupleDescriptor(lRel);
+ sRel = heap_beginscan(lRel, 0, NowTimeQual, 2, key);
+
+ nulls[0] = nulls[1] = nulls[2] = ' ';
+ repl[0] = repl[1] = repl[2] = ' ';
+ repl[Anum_pg_listener_notify - 1] = 'r';
+ value[0] = value[1] = value[2] = (Datum) 0;
+ value[Anum_pg_listener_notify - 1] = Int32GetDatum(0);
+
+ while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0,&b))) {
+ d = (Datum) heap_getattr(lTuple, b, Anum_pg_listener_relname,
+ tdesc, &isnull);
+ rTuple = heap_modifytuple(lTuple, b, lRel, value, nulls, repl);
+ (void) heap_replace(lRel, &lTuple->t_ctid, rTuple);
+
+ /* notifying the front end */
+
+ if (whereToSendOutput == Remote) {
+ pq_putnchar("A", 1);
+ pq_putint(ourpid, 4);
+ pq_putstr(DatumGetName(d)->data);
+ pq_flush();
+ } else {
+ elog(NOTICE, "Async_NotifyFrontEnd: no asynchronous notification to frontend on interactive sessions");
+ }
+ ReleaseBuffer(b);
+ }
+ CommitTransactionCommand();
+}
+
+static int
+AsyncExistsPendingNotify(char *relname)
+{
+ Dlelem* p;
+ for (p = DLGetHead(pendingNotifies);
+ p != NULL;
+ p = DLGetSucc(p)) {
+ if (!strcmp(DLE_VAL(p), relname))
+ return 1;
+ }
+
+ return 0;
+}
+
+static void
+ClearPendingNotify()
+{
+ Dlelem* p;
+ while ( (p = DLRemHead(pendingNotifies)) != NULL)
+ free(DLE_VAL(p));
+}
+