aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands/async.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands/async.c')
-rw-r--r--src/backend/commands/async.c1048
1 files changed, 575 insertions, 473 deletions
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 12121977177..a8c447cb077 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -1,38 +1,79 @@
/*-------------------------------------------------------------------------
*
* async.c--
- * Asynchronous notification
+ * Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
*
* Copyright (c) 1994, Regents of the University of California
*
- *
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/commands/async.c,v 1.40 1998/09/01 04:27:42 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/backend/commands/async.c,v 1.41 1998/10/06 02:39:59 tgl Exp $
*
*-------------------------------------------------------------------------
*/
-/* New Async Notification Model:
+
+/*-------------------------------------------------------------------------
+ * New Async Notification Model:
* 1. Multiple backends on same machine. Multiple backends listening on
- * one relation.
- *
- * 2. One of the backend does a 'notify <relname>'. For all backends that
- * are listening to this relation (all notifications take place at the
- * end of commit),
- * 2.a If the process is the same as the backend process that issued
- * notification (we are notifying something that we are listening),
- * signal the corresponding frontend over the comm channel.
- * 2.b For all other listening processes, we send kill(SIGUSR2) to wake up
- * the listening backend.
- * 3. Upon receiving a kill(SIGUSR2) signal from another backend process
- * notifying that one of the relation that we are listening is being
- * notified, we can be in either of two following states:
- * 3.a We are sleeping, wake up and signal our frontend.
- * 3.b We are in middle of another transaction, wait until the end of
- * of the current transaction and signal our frontend.
- * 4. Each frontend receives this notification and processes accordingly.
- *
- * -- jw, 12/28/93
- *
+ * one relation. (Note: "listening on a relation" is not really the
+ * right way to think about it, since the notify names need not have
+ * anything to do with the names of relations actually in the database.
+ * But this terminology is all over the code and docs, and I don't feel
+ * like trying to replace it.)
+ *
+ * 2. There is a tuple in relation "pg_listener" for each active LISTEN,
+ * ie, each relname/listenerPID pair. The "notification" field of the
+ * tuple is zero when no NOTIFY is pending for that listener, or the PID
+ * of the originating backend when a cross-backend NOTIFY is pending.
+ * (We skip writing to pg_listener when doing a self-NOTIFY, so the
+ * notification field should never be equal to the listenerPID field.)
+ *
+ * 3. The NOTIFY statement itself (routine Async_Notify) just adds the target
+ * relname to a list of outstanding NOTIFY requests. Actual processing
+ * happens if and only if we reach transaction commit. At that time (in
+ * routine AtCommit_Notify) we scan pg_listener for matching relnames.
+ * If the listenerPID in a matching tuple is ours, we just send a notify
+ * message to our own front end. If it is not ours, and "notification"
+ * is not already nonzero, we set notification to our own PID and send a
+ * SIGUSR2 signal to the receiving process (indicated by listenerPID).
+ * BTW: if the signal operation fails, we presume that the listener backend
+ * crashed without removing this tuple, and remove the tuple for it.
+ *
+ * 4. Upon receipt of a SIGUSR2 signal, the signal handler can call inbound-
+ * notify processing immediately if this backend is idle (ie, it is
+ * waiting for a frontend command and is not within a transaction block).
+ * Otherwise the handler may only set a flag, which will cause the
+ * processing to occur just before we next go idle.
+ *
+ * 5. Inbound-notify processing consists of scanning pg_listener for tuples
+ * matching our own listenerPID and having nonzero notification fields.
+ * For each such tuple, we send a message to our frontend and clear the
+ * notification field. BTW: this routine has to start/commit its own
+ * transaction, since by assumption it is only called from outside any
+ * transaction.
+ *
+ * Note that the system's use of pg_listener is confined to very short
+ * intervals at the end of a transaction that contains NOTIFY statements,
+ * or during the transaction caused by an inbound SIGUSR2. So the fact that
+ * pg_listener is a global resource shouldn't cause too much performance
+ * problem. But application authors ought to be discouraged from doing
+ * LISTEN or UNLISTEN near the start of a long transaction --- that would
+ * result in holding the pg_listener write lock for a long time, possibly
+ * blocking unrelated activity. It could even lead to deadlock against another
+ * transaction that touches the same user tables and then tries to NOTIFY.
+ * Probably best to do LISTEN or UNLISTEN outside of transaction blocks.
+ *
+ * An application that listens on the same relname it notifies will get
+ * NOTIFY messages for its own NOTIFYs. These can be ignored, if not useful,
+ * by comparing be_pid in the NOTIFY message to the application's own backend's
+ * PID. (As of FE/BE protocol 2.0, the backend's PID is provided to the
+ * frontend during startup.) The above design guarantees that notifies from
+ * other backends will never be missed by ignoring self-notifies. Note,
+ * however, that we do *not* guarantee that a separate frontend message will
+ * be sent for every outside NOTIFY. Since there is only room for one
+ * originating PID in pg_listener, outside notifies occurring at about the
+ * same time may be collapsed into a single message bearing the PID of the
+ * first outside backend to perform the NOTIFY.
+ *-------------------------------------------------------------------------
*/
#include <unistd.h>
@@ -44,90 +85,59 @@
#include "postgres.h"
+#include "commands/async.h"
#include "access/heapam.h"
#include "access/relscan.h"
#include "access/xact.h"
#include "catalog/catname.h"
#include "catalog/pg_listener.h"
-#include "commands/async.h"
#include "fmgr.h"
#include "lib/dllist.h"
#include "libpq/libpq.h"
#include "miscadmin.h"
-#include "nodes/memnodes.h"
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
#include "tcop/dest.h"
-#include "utils/mcxt.h"
#include "utils/syscache.h"
#include <utils/trace.h>
#include <utils/ps_status.h>
-#define NotifyUnlock pg_options[OPT_NOTIFYUNLOCK]
-#define NotifyHack pg_options[OPT_NOTIFYHACK]
-
+/* stuff that we really ought not be touching directly :-( */
extern TransactionState CurrentTransactionState;
extern CommandDest whereToSendOutput;
-GlobalMemory notifyContext = NULL;
-
-static int notifyFrontEndPending = 0;
-static int notifyIssued = 0;
+/*
+ * State for outbound notifies consists of a list of all relnames NOTIFYed
+ * in the current transaction. We do not actually perform a NOTIFY until
+ * and unless the transaction commits. pendingNotifies is NULL if no
+ * NOTIFYs have been done in the current transaction.
+ */
static Dllist *pendingNotifies = NULL;
-static int AsyncExistsPendingNotify(char *);
-static void ClearPendingNotify(void);
-static void Async_NotifyFrontEnd(void);
-static void Async_NotifyFrontEnd_Aux(void);
-void Async_Unlisten(char *relname, int pid);
-static void Async_UnlistenOnExit(int code, char *relname);
-static void Async_UnlistenAll(void);
-
/*
- *--------------------------------------------------------------
- * Async_NotifyHandler --
- *
- * This is the signal handler for SIGUSR2. When the backend
- * is signaled, the backend can be in two states.
- * 1. If the backend is in the middle of another transaction,
- * we set the flag, notifyFrontEndPending, and wait until
- * the end of the transaction to notify the front end.
- * 2. If the backend is not in the middle of another transaction,
- * we notify the front end immediately.
- *
- * -- jw, 12/28/93
- * Results:
- * none
- *
- * Side effects:
- * none
+ * State for inbound notifies consists of two flags: one saying whether
+ * the signal handler is currently allowed to call ProcessIncomingNotify
+ * directly, and one saying whether the signal has occurred but the handler
+ * was not allowed to call ProcessIncomingNotify at the time.
+ *
+ * NB: the "volatile" on these declarations is critical! If your compiler
+ * does not grok "volatile", you'd be best advised to compile this file
+ * with all optimization turned off.
*/
-void
-Async_NotifyHandler(SIGNAL_ARGS)
-{
- TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler");
+static volatile int notifyInterruptEnabled = 0;
+static volatile int notifyInterruptOccurred = 0;
- if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
- (CurrentTransactionState->blockState == TRANS_DEFAULT))
- {
- TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: "
- "waking up sleeping backend process");
- PS_SET_STATUS("async_notify");
- Async_NotifyFrontEnd();
- PS_SET_STATUS("idle");
- }
- else
- {
- TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: "
- "process in middle of transaction, state=%d, blockstate=%d",
- CurrentTransactionState->state,
- CurrentTransactionState->blockState);
- notifyFrontEndPending = 1;
- TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: notify frontend pending");
- }
+/* True if we've registered an on_shmem_exit cleanup (or at least tried to). */
+static int unlistenExitRegistered = 0;
+
+
+static void Async_UnlistenAll(void);
+static void Async_UnlistenOnExit(void);
+static void ProcessIncomingNotify(void);
+static void NotifyMyFrontEnd(char *relname, int32 listenerPID);
+static int AsyncExistsPendingNotify(char *relname);
+static void ClearPendingNotifies(void);
- TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: done");
-}
/*
*--------------------------------------------------------------
@@ -136,253 +146,40 @@ Async_NotifyHandler(SIGNAL_ARGS)
* This is executed by the SQL notify command.
*
* Adds the relation to the list of pending notifies.
- * All notification happens at end of commit.
- * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
- *
- * All notification of backend processes happens here,
- * then each backend notifies its corresponding front end at
- * the end of commit.
- *
- * -- jw, 12/28/93
+ * Actual notification happens during transaction commit.
+ * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
*
* Results:
* XXX
*
- * Side effects:
- * All tuples for relname in pg_listener are updated.
- *
*--------------------------------------------------------------
*/
void
Async_Notify(char *relname)
{
-
- HeapTuple lTuple,
- rTuple;
- Relation lRel;
- HeapScanDesc sRel;
- TupleDesc tdesc;
- ScanKeyData key;
- Datum d,
- value[3];
- bool isnull;
- char repl[3],
- nulls[3];
-
char *notifyName;
TPRINTF(TRACE_NOTIFY, "Async_Notify: %s", relname);
- if (!pendingNotifies)
- pendingNotifies = DLNewList();
-
/*
- * Allocate memory from the global malloc pool because it needs to be
- * referenced also when the transaction is finished. DZ - 26-08-1996
+ * We allocate list memory from the global malloc pool to ensure that
+ * it will live until we want to use it. This is probably not necessary
+ * any longer, since we will use it before the end of the transaction.
+ * DLList only knows how to use malloc() anyway, but we could probably
+ * palloc() the strings...
*/
+ if (!pendingNotifies)
+ pendingNotifies = DLNewList();
notifyName = strdup(relname);
DLAddHead(pendingNotifies, DLNewElem(notifyName));
-
- ScanKeyEntryInitialize(&key, 0,
- Anum_pg_listener_relname,
- F_NAMEEQ,
- PointerGetDatum(notifyName));
-
- lRel = heap_openr(ListenerRelationName);
- tdesc = RelationGetDescr(lRel);
- RelationSetLockForWrite(lRel);
- sRel = heap_beginscan(lRel, 0, SnapshotNow, 1, &key);
-
- nulls[0] = nulls[1] = nulls[2] = ' ';
- repl[0] = repl[1] = repl[2] = ' ';
- repl[Anum_pg_listener_notify - 1] = 'r';
- value[0] = value[1] = value[2] = (Datum) 0;
- value[Anum_pg_listener_notify - 1] = Int32GetDatum(1);
-
- while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
- {
- d = heap_getattr(lTuple, Anum_pg_listener_notify, tdesc, &isnull);
- if (!DatumGetInt32(d))
- {
- rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl);
- heap_replace(lRel, &lTuple->t_ctid, rTuple);
- /* notify is really issued only if a tuple has been changed */
- notifyIssued = 1;
- }
- }
- heap_endscan(sRel);
-
/*
- * Note: if the write lock is unset we can get multiple tuples with
- * same oid if other backends notify the same relation. Use this
- * option at your own risk.
+ * NOTE: we could check to see if pendingNotifies already has an entry
+ * for relname, and thus avoid making duplicate entries. However, most
+ * apps probably don't notify the same name multiple times per transaction,
+ * so we'd likely just be wasting cycles to make such a check.
+ * AsyncExistsPendingNotify() doesn't really care whether the list
+ * contains duplicates...
*/
- if (NotifyUnlock)
- RelationUnsetLockForWrite(lRel);
-
- heap_close(lRel);
-
- TPRINTF(TRACE_NOTIFY, "Async_Notify: done %s", relname);
-}
-
-/*
- *--------------------------------------------------------------
- * Async_NotifyAtCommit --
- *
- * This is called at transaction commit.
- *
- * Signal our corresponding frontend process on relations that
- * were notified. Signal all other backend process that
- * are listening also.
- *
- * -- jw, 12/28/93
- *
- * Results:
- * XXX
- *
- * Side effects:
- * Tuples in pg_listener that has our listenerpid are updated so
- * that the notification is 0. We do not want to notify frontend
- * more than once.
- *
- * -- jw, 12/28/93
- *
- *--------------------------------------------------------------
- */
-void
-Async_NotifyAtCommit()
-{
- HeapTuple lTuple;
- Relation lRel;
- HeapScanDesc sRel;
- TupleDesc tdesc;
- ScanKeyData key;
- Datum d;
- bool isnull;
- extern TransactionState CurrentTransactionState;
-
- if (!pendingNotifies)
- pendingNotifies = DLNewList();
-
- if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
- (CurrentTransactionState->blockState == TRANS_DEFAULT))
- {
- if (notifyIssued)
- {
- /* 'notify <relname>' issued by us */
- notifyIssued = 0;
- StartTransactionCommand();
- TPRINTF(TRACE_NOTIFY, "Async_NotifyAtCommit");
- ScanKeyEntryInitialize(&key, 0,
- Anum_pg_listener_notify,
- F_INT4EQ,
- Int32GetDatum(1));
- lRel = heap_openr(ListenerRelationName);
- RelationSetLockForWrite(lRel);
- sRel = heap_beginscan(lRel, 0, SnapshotNow, 1, &key);
- tdesc = RelationGetDescr(lRel);
-
- while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
- {
- d = heap_getattr(lTuple, Anum_pg_listener_relname,
- tdesc, &isnull);
-
- if (AsyncExistsPendingNotify((char *) DatumGetPointer(d)))
- {
- d = heap_getattr(lTuple, Anum_pg_listener_pid,
- tdesc, &isnull);
-
- if (MyProcPid == DatumGetInt32(d))
- {
- notifyFrontEndPending = 1;
- TPRINTF(TRACE_NOTIFY,
- "Async_NotifyAtCommit: notifying self");
- }
- else
- {
- TPRINTF(TRACE_NOTIFY,
- "Async_NotifyAtCommit: notifying pid %d",
- DatumGetInt32(d));
-#ifdef HAVE_KILL
- if (kill(DatumGetInt32(d), SIGUSR2) < 0)
- {
- if (errno == ESRCH)
- heap_delete(lRel, &lTuple->t_ctid);
- }
-#endif
- }
- }
- }
- heap_endscan(sRel);
- heap_close(lRel);
-
- /*
- * Notify the frontend inside the current transaction while we
- * still have a valid write lock on pg_listeners. This avoid
- * waiting until all other backends have finished with
- * pg_listener.
- */
- if (notifyFrontEndPending)
- {
- /* The aux version is called inside transaction */
- Async_NotifyFrontEnd_Aux();
- }
-
- TPRINTF(TRACE_NOTIFY, "Async_NotifyAtCommit: done");
- CommitTransactionCommand();
- }
- else
- {
-
- /*
- * No notifies issued by us. If notifyFrontEndPending has been
- * set by Async_NotifyHandler notify the frontend of pending
- * notifies from other backends.
- */
- if (notifyFrontEndPending)
- Async_NotifyFrontEnd();
- }
-
- ClearPendingNotify();
- }
-}
-
-/*
- *--------------------------------------------------------------
- * Async_NotifyAtAbort --
- *
- * This is called at transaction commit.
- *
- * Gets rid of pending notifies. List elements are automatically
- * freed through memory context.
- *
- *
- * Results:
- * XXX
- *
- * Side effects:
- * XXX
- *
- *--------------------------------------------------------------
- */
-void
-Async_NotifyAtAbort()
-{
- if (pendingNotifies)
- {
- ClearPendingNotify();
- DLFreeList(pendingNotifies);
- }
- pendingNotifies = DLNewList();
- notifyIssued = 0;
-
- if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
- (CurrentTransactionState->blockState == TRANS_DEFAULT))
- {
- /* don't forget to notify front end */
- if (notifyFrontEndPending)
- Async_NotifyFrontEnd();
- }
}
/*
@@ -394,108 +191,94 @@ Async_NotifyAtAbort()
* Register a backend (identified by its Unix PID) as listening
* on the specified relation.
*
- * One listener per relation, pg_listener relation is keyed
- * on (relname,pid) to provide multiple listeners in future.
- *
* Results:
- * pg_listeners is updated.
+ * XXX
*
* Side effects:
- * XXX
+ * pg_listener is updated.
*
*--------------------------------------------------------------
*/
void
Async_Listen(char *relname, int pid)
{
- Datum values[Natts_pg_listener];
- char nulls[Natts_pg_listener];
+ Relation lRel;
TupleDesc tdesc;
HeapScanDesc scan;
HeapTuple tuple,
newtup;
- Relation lDesc;
+ Datum values[Natts_pg_listener];
+ char nulls[Natts_pg_listener];
Datum d;
int i;
bool isnull;
int alreadyListener = 0;
- char *relnamei;
TupleDesc tupDesc;
- if (whereToSendOutput != Remote)
- {
- elog(NOTICE, "Async_Listen: "
- "listen not available on interactive sessions");
- return;
- }
-
TPRINTF(TRACE_NOTIFY, "Async_Listen: %s", relname);
- for (i = 0; i < Natts_pg_listener; i++)
- {
- nulls[i] = ' ';
- values[i] = PointerGetDatum(NULL);
- }
- i = 0;
- values[i++] = (Datum) relname;
- values[i++] = (Datum) pid;
- values[i++] = (Datum) 0; /* no notifies pending */
-
- lDesc = heap_openr(ListenerRelationName);
- RelationSetLockForWrite(lDesc);
+ lRel = heap_openr(ListenerRelationName);
+ RelationSetLockForWrite(lRel);
+ tdesc = RelationGetDescr(lRel);
- /* is someone already listening. One listener per relation */
- tdesc = RelationGetDescr(lDesc);
- scan = heap_beginscan(lDesc, 0, SnapshotNow, 0, (ScanKey) NULL);
+ /* Detect whether we are already listening on this relname */
+ scan = heap_beginscan(lRel, 0, SnapshotNow, 0, (ScanKey) NULL);
while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
{
- d = heap_getattr(tuple, Anum_pg_listener_relname, tdesc,
- &isnull);
- relnamei = DatumGetPointer(d);
- if (!strncmp(relnamei, relname, NAMEDATALEN))
+ d = heap_getattr(tuple, Anum_pg_listener_relname, tdesc, &isnull);
+ if (!strncmp((char *) DatumGetPointer(d), relname, NAMEDATALEN))
{
d = heap_getattr(tuple, Anum_pg_listener_pid, tdesc, &isnull);
- pid = DatumGetInt32(d);
- if (pid == MyProcPid)
+ if (DatumGetInt32(d) == pid)
+ {
alreadyListener = 1;
- }
- if (alreadyListener)
- {
- /* No need to scan the rest of the table */
- break;
+ /* No need to scan the rest of the table */
+ break;
+ }
}
}
heap_endscan(scan);
if (alreadyListener)
{
- elog(NOTICE, "Async_Listen: We are already listening on %s",
- relname);
- RelationUnsetLockForWrite(lDesc);
- heap_close(lDesc);
+ elog(NOTICE, "Async_Listen: We are already listening on %s", relname);
+ RelationUnsetLockForWrite(lRel);
+ heap_close(lRel);
return;
}
- tupDesc = lDesc->rd_att;
- newtup = heap_formtuple(tupDesc, values, nulls);
- heap_insert(lDesc, newtup);
- pfree(newtup);
-
/*
- * if (alreadyListener) { elog(NOTICE,"Async_Listen: already one
- * listener on %s (possibly dead)",relname); }
+ * OK to insert a new tuple
*/
- RelationUnsetLockForWrite(lDesc);
- heap_close(lDesc);
+ for (i = 0; i < Natts_pg_listener; i++)
+ {
+ nulls[i] = ' ';
+ values[i] = PointerGetDatum(NULL);
+ }
+
+ i = 0;
+ values[i++] = (Datum) relname;
+ values[i++] = (Datum) pid;
+ values[i++] = (Datum) 0; /* no notifies pending */
+
+ tupDesc = lRel->rd_att;
+ newtup = heap_formtuple(tupDesc, values, nulls);
+ heap_insert(lRel, newtup);
+ pfree(newtup);
+
+ RelationUnsetLockForWrite(lRel);
+ heap_close(lRel);
/*
- * now that we are listening, we should make a note to ourselves to
- * unlisten prior to dying.
+ * now that we are listening, make sure we will unlisten before dying.
*/
- relnamei = malloc(NAMEDATALEN); /* persists to process exit */
- StrNCpy(relnamei, relname, NAMEDATALEN);
- on_shmem_exit(Async_UnlistenOnExit, (caddr_t) relnamei);
+ if (! unlistenExitRegistered)
+ {
+ if (on_shmem_exit(Async_UnlistenOnExit, (caddr_t) NULL) < 0)
+ elog(NOTICE, "Async_Listen: out of shmem_exit slots");
+ unlistenExitRegistered = 1;
+ }
}
/*
@@ -508,17 +291,17 @@ Async_Listen(char *relname, int pid)
* for the specified relation.
*
* Results:
- * pg_listeners is updated.
+ * XXX
*
* Side effects:
- * XXX
+ * pg_listener is updated.
*
*--------------------------------------------------------------
*/
void
Async_Unlisten(char *relname, int pid)
{
- Relation lDesc;
+ Relation lRel;
HeapTuple lTuple;
/* Handle specially the `unlisten "*"' command */
@@ -530,17 +313,21 @@ Async_Unlisten(char *relname, int pid)
TPRINTF(TRACE_NOTIFY, "Async_Unlisten %s", relname);
+ /* Note we assume there can be only one matching tuple. */
lTuple = SearchSysCacheTuple(LISTENREL, PointerGetDatum(relname),
Int32GetDatum(pid),
0, 0);
if (lTuple != NULL)
{
- lDesc = heap_openr(ListenerRelationName);
- RelationSetLockForWrite(lDesc);
- heap_delete(lDesc, &lTuple->t_ctid);
- RelationUnsetLockForWrite(lDesc);
- heap_close(lDesc);
+ lRel = heap_openr(ListenerRelationName);
+ RelationSetLockForWrite(lRel);
+ heap_delete(lRel, &lTuple->t_ctid);
+ RelationUnsetLockForWrite(lRel);
+ heap_close(lRel);
}
+ /* We do not complain about unlistening something not being listened;
+ * should we?
+ */
}
/*
@@ -549,187 +336,487 @@ Async_Unlisten(char *relname, int pid)
*
* Unlisten all relations for this backend.
*
+ * This is invoked by UNLISTEN "*" command, and also at backend exit.
+ *
* Results:
- * pg_listeners is updated.
+ * XXX
*
* Side effects:
- * XXX
+ * pg_listener is updated.
*
*--------------------------------------------------------------
*/
static void
Async_UnlistenAll()
{
- HeapTuple lTuple;
Relation lRel;
- HeapScanDesc sRel;
TupleDesc tdesc;
+ HeapScanDesc sRel;
+ HeapTuple lTuple;
ScanKeyData key[1];
TPRINTF(TRACE_NOTIFY, "Async_UnlistenAll");
+
+ lRel = heap_openr(ListenerRelationName);
+ RelationSetLockForWrite(lRel);
+ tdesc = RelationGetDescr(lRel);
+
+ /* Find and delete all entries with my listenerPID */
ScanKeyEntryInitialize(&key[0], 0,
Anum_pg_listener_pid,
F_INT4EQ,
Int32GetDatum(MyProcPid));
- lRel = heap_openr(ListenerRelationName);
- RelationSetLockForWrite(lRel);
- tdesc = RelationGetDescr(lRel);
sRel = heap_beginscan(lRel, 0, SnapshotNow, 1, key);
while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
heap_delete(lRel, &lTuple->t_ctid);
+
heap_endscan(sRel);
RelationUnsetLockForWrite(lRel);
heap_close(lRel);
- TPRINTF(TRACE_NOTIFY, "Async_UnlistenAll: done");
}
/*
- * --------------------------------------------------------------
+ *--------------------------------------------------------------
* Async_UnlistenOnExit --
*
- * This is called at backend exit for each registered listen.
+ * Clean up the pg_listener table at backend exit.
+ *
+ * This is executed if we have done any LISTENs in this backend.
+ * It might not be necessary anymore, if the user UNLISTENed everything,
+ * but we don't try to detect that case.
*
* Results:
* XXX
*
- * --------------------------------------------------------------
- */
-static void
-Async_UnlistenOnExit(int code, /* from exitpg */
- char *relname)
-{
- Async_Unlisten((char *) relname, MyProcPid);
-}
-
-/*
- * --------------------------------------------------------------
- * Async_NotifyFrontEnd --
- *
- * This is called outside transactions. The real work is done
- * by Async_NotifyFrontEnd_Aux().
+ * Side effects:
+ * pg_listener is updated if necessary.
*
- * --------------------------------------------------------------
+ *--------------------------------------------------------------
*/
static void
-Async_NotifyFrontEnd()
+Async_UnlistenOnExit()
{
+ /*
+ * We need to start/commit a transaction for the unlisten,
+ * but if there is already an active transaction we had better
+ * abort that one first. Otherwise we'd end up committing changes
+ * that probably ought to be discarded.
+ */
+ AbortOutOfAnyTransaction();
+ /* Now we can do the unlisten */
StartTransactionCommand();
- Async_NotifyFrontEnd_Aux();
+ Async_UnlistenAll();
CommitTransactionCommand();
}
/*
- * --------------------------------------------------------------
- * Async_NotifyFrontEnd_Aux --
+ *--------------------------------------------------------------
+ * AtCommit_Notify --
*
- * This must be called inside a transaction block.
+ * This is called at transaction commit.
*
- * Perform an asynchronous notification to front end over
- * portal comm channel. The name of the relation which contains the
- * data is sent to the front end.
+ * If there are outbound notify requests in the pendingNotifies list,
+ * scan pg_listener for matching tuples, and either signal the other
+ * backend or send a message to our own frontend.
*
- * We remove the notification flag from the pg_listener tuple
- * associated with our process.
+ * NOTE: we are still inside the current transaction, therefore can
+ * piggyback on its committing of changes.
*
* Results:
* XXX
*
- * --------------------------------------------------------------
+ * Side effects:
+ * Tuples in pg_listener that have matching relnames and other peoples'
+ * listenerPIDs are updated with a nonzero notification field.
+ *
+ *--------------------------------------------------------------
*/
-static void
-Async_NotifyFrontEnd_Aux()
+void
+AtCommit_Notify()
{
- HeapTuple lTuple,
- rTuple;
Relation lRel;
- HeapScanDesc sRel;
TupleDesc tdesc;
- ScanKeyData key[2];
+ HeapScanDesc sRel;
+ HeapTuple lTuple,
+ rTuple;
Datum d,
- value[3];
- char repl[3],
- nulls[3];
+ value[Natts_pg_listener];
+ char repl[Natts_pg_listener],
+ nulls[Natts_pg_listener];
bool isnull;
+ char *relname;
+ int32 listenerPID;
-#define MAX_DONE 64
+ if (!pendingNotifies)
+ return; /* no NOTIFY statements in this transaction */
- char *done[MAX_DONE];
- int ndone = 0;
- int i;
+ /* NOTIFY is disabled if not normal processing mode.
+ * This test used to be in xact.c, but it seems cleaner to do it here.
+ */
+ if (! IsNormalProcessingMode())
+ {
+ ClearPendingNotifies();
+ return;
+ }
- notifyFrontEndPending = 0;
+ TPRINTF(TRACE_NOTIFY, "AtCommit_Notify");
- TPRINTF(TRACE_NOTIFY, "Async_NotifyFrontEnd");
- StartTransactionCommand();
- ScanKeyEntryInitialize(&key[0], 0,
- Anum_pg_listener_notify,
- F_INT4EQ,
- Int32GetDatum(1));
- ScanKeyEntryInitialize(&key[1], 0,
- Anum_pg_listener_pid,
- F_INT4EQ,
- Int32GetDatum(MyProcPid));
lRel = heap_openr(ListenerRelationName);
RelationSetLockForWrite(lRel);
tdesc = RelationGetDescr(lRel);
- sRel = heap_beginscan(lRel, 0, SnapshotNow, 2, key);
+ sRel = heap_beginscan(lRel, 0, SnapshotNow, 0, (ScanKey) NULL);
+ /* preset data to update notify column to MyProcPid */
nulls[0] = nulls[1] = nulls[2] = ' ';
repl[0] = repl[1] = repl[2] = ' ';
repl[Anum_pg_listener_notify - 1] = 'r';
value[0] = value[1] = value[2] = (Datum) 0;
- value[Anum_pg_listener_notify - 1] = Int32GetDatum(0);
+ value[Anum_pg_listener_notify - 1] = Int32GetDatum(MyProcPid);
while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
{
- d = heap_getattr(lTuple, Anum_pg_listener_relname, tdesc,
- &isnull);
+ d = heap_getattr(lTuple, Anum_pg_listener_relname, tdesc, &isnull);
+ relname = (char *) DatumGetPointer(d);
- /*
- * This hack deletes duplicate tuples which can be left in the
- * table if the NotifyUnlock option is set. I'm further
- * investigating this. -- dz
- */
- if (NotifyHack)
+ if (AsyncExistsPendingNotify(relname))
{
- for (i = 0; i < ndone; i++)
+ d = heap_getattr(lTuple, Anum_pg_listener_pid, tdesc, &isnull);
+ listenerPID = DatumGetInt32(d);
+
+ if (listenerPID == MyProcPid)
+ {
+ /* Self-notify: no need to bother with table update.
+ * Indeed, we *must not* clear the notification field in
+ * this path, or we could lose an outside notify, which'd be
+ * bad for applications that ignore self-notify messages.
+ */
+ TPRINTF(TRACE_NOTIFY, "AtCommit_Notify: notifying self");
+ NotifyMyFrontEnd(relname, listenerPID);
+ }
+ else
{
- if (strcmp(DatumGetName(d)->data, done[i]) == 0)
+ TPRINTF(TRACE_NOTIFY, "AtCommit_Notify: notifying pid %d",
+ listenerPID);
+ /*
+ * If someone has already notified this listener,
+ * we don't bother modifying the table, but we do still send
+ * a SIGUSR2 signal, just in case that backend missed the
+ * earlier signal for some reason. It's OK to send the signal
+ * first, because the other guy can't read pg_listener until
+ * we unlock it.
+ */
+#ifdef HAVE_KILL
+ if (kill(listenerPID, SIGUSR2) < 0)
{
- TPRINTF(TRACE_NOTIFY,
- "Async_NotifyFrontEnd: duplicate %s",
- DatumGetName(d)->data);
+ /* Get rid of pg_listener entry if it refers to a PID
+ * that no longer exists. Presumably, that backend
+ * crashed without deleting its pg_listener entries.
+ * This code used to only delete the entry if errno==ESRCH,
+ * but as far as I can see we should just do it for any
+ * failure (certainly at least for EPERM too...)
+ */
heap_delete(lRel, &lTuple->t_ctid);
- continue;
}
+ else
+#endif
+ {
+ d = heap_getattr(lTuple, Anum_pg_listener_notify,
+ tdesc, &isnull);
+ if (DatumGetInt32(d) == 0)
+ {
+ rTuple = heap_modifytuple(lTuple, lRel,
+ value, nulls, repl);
+ heap_replace(lRel, &lTuple->t_ctid, rTuple);
+ }
+ }
+ }
+ }
+ }
+
+ heap_endscan(sRel);
+ /*
+ * We do not do RelationUnsetLockForWrite(lRel) here, because the
+ * transaction is about to be committed anyway.
+ */
+ heap_close(lRel);
+
+ ClearPendingNotifies();
+
+ TPRINTF(TRACE_NOTIFY, "AtCommit_Notify: done");
+}
+
+/*
+ *--------------------------------------------------------------
+ * AtAbort_Notify --
+ *
+ * This is called at transaction abort.
+ *
+ * Gets rid of pending outbound notifies that we would have executed
+ * if the transaction got committed.
+ *
+ * Results:
+ * XXX
+ *
+ *--------------------------------------------------------------
+ */
+void
+AtAbort_Notify()
+{
+ ClearPendingNotifies();
+}
+
+/*
+ *--------------------------------------------------------------
+ * Async_NotifyHandler --
+ *
+ * This is the signal handler for SIGUSR2.
+ *
+ * If we are idle (notifyInterruptEnabled is set), we can safely invoke
+ * ProcessIncomingNotify directly. Otherwise, just set a flag
+ * to do it later.
+ *
+ * Results:
+ * none
+ *
+ * Side effects:
+ * per above
+ *--------------------------------------------------------------
+ */
+
+void
+Async_NotifyHandler(SIGNAL_ARGS)
+{
+ /*
+ * Note: this is a SIGNAL HANDLER. You must be very wary what you do here.
+ * Some helpful soul had this routine sprinkled with TPRINTFs, which would
+ * likely lead to corruption of stdio buffers if they were ever turned on.
+ */
+
+ if (notifyInterruptEnabled)
+ {
+ /* I'm not sure whether some flavors of Unix might allow another
+ * SIGUSR2 occurrence to recursively interrupt this routine.
+ * To cope with the possibility, we do the same sort of dance that
+ * EnableNotifyInterrupt must do --- see that routine for comments.
+ */
+ notifyInterruptEnabled = 0; /* disable any recursive signal */
+ notifyInterruptOccurred = 1; /* do at least one iteration */
+ for (;;)
+ {
+ notifyInterruptEnabled = 1;
+ if (! notifyInterruptOccurred)
+ break;
+ notifyInterruptEnabled = 0;
+ if (notifyInterruptOccurred)
+ {
+ /* Here, it is finally safe to do stuff. */
+ TPRINTF(TRACE_NOTIFY,
+ "Async_NotifyHandler: perform async notify");
+ ProcessIncomingNotify();
+ TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: done");
}
- if (ndone < MAX_DONE)
- done[ndone++] = pstrdup(DatumGetName(d)->data);
}
+ }
+ else
+ {
+ /* In this path it is NOT SAFE to do much of anything, except this: */
+ notifyInterruptOccurred = 1;
+ }
+}
+
+/*
+ * --------------------------------------------------------------
+ * EnableNotifyInterrupt --
+ *
+ * This is called by the PostgresMain main loop just before waiting
+ * for a frontend command. If we are truly idle (ie, *not* inside
+ * a transaction block), then process any pending inbound notifies,
+ * and enable the signal handler to process future notifies directly.
+ *
+ * NOTE: the signal handler starts out disabled, and stays so until
+ * PostgresMain calls this the first time.
+ * --------------------------------------------------------------
+ */
+
+void
+EnableNotifyInterrupt(void)
+{
+ if (CurrentTransactionState->blockState != TRANS_DEFAULT)
+ return; /* not really idle */
+
+ /*
+ * This code is tricky because we are communicating with a signal
+ * handler that could interrupt us at any point. If we just checked
+ * notifyInterruptOccurred and then set notifyInterruptEnabled, we
+ * could fail to respond promptly to a signal that happens in between
+ * those two steps. (A very small time window, perhaps, but Murphy's
+ * Law says you can hit it...) Instead, we first set the enable flag,
+ * then test the occurred flag. If we see an unserviced interrupt
+ * has occurred, we re-clear the enable flag before going off to do
+ * the service work. (That prevents re-entrant invocation of
+ * ProcessIncomingNotify() if another interrupt occurs.)
+ * If an interrupt comes in between the setting and clearing of
+ * notifyInterruptEnabled, then it will have done the service
+ * work and left notifyInterruptOccurred zero, so we have to check
+ * again after clearing enable. The whole thing has to be in a loop
+ * in case another interrupt occurs while we're servicing the first.
+ * Once we get out of the loop, enable is set and we know there is no
+ * unserviced interrupt.
+ *
+ * NB: an overenthusiastic optimizing compiler could easily break this
+ * code. Hopefully, they all understand what "volatile" means these days.
+ */
+ for (;;)
+ {
+ notifyInterruptEnabled = 1;
+ if (! notifyInterruptOccurred)
+ break;
+ notifyInterruptEnabled = 0;
+ if (notifyInterruptOccurred)
+ {
+ TPRINTF(TRACE_NOTIFY,
+ "EnableNotifyInterrupt: perform async notify");
+ ProcessIncomingNotify();
+ TPRINTF(TRACE_NOTIFY, "EnableNotifyInterrupt: done");
+ }
+ }
+}
+
+/*
+ * --------------------------------------------------------------
+ * DisableNotifyInterrupt --
+ *
+ * This is called by the PostgresMain main loop just after receiving
+ * a frontend command. Signal handler execution of inbound notifies
+ * is disabled until the next EnableNotifyInterrupt call.
+ * --------------------------------------------------------------
+ */
+
+void
+DisableNotifyInterrupt(void)
+{
+ notifyInterruptEnabled = 0;
+}
+
+/*
+ * --------------------------------------------------------------
+ * ProcessIncomingNotify --
+ *
+ * Deal with arriving NOTIFYs from other backends.
+ * This is called either directly from the SIGUSR2 signal handler,
+ * or the next time control reaches the outer idle loop.
+ * Scan pg_listener for arriving notifies, report them to my front end,
+ * and clear the notification field in pg_listener until next time.
+ *
+ * NOTE: since we are outside any transaction, we must create our own.
+ *
+ * Results:
+ * XXX
+ *
+ * --------------------------------------------------------------
+ */
+static void
+ProcessIncomingNotify(void)
+{
+ Relation lRel;
+ TupleDesc tdesc;
+ ScanKeyData key[1];
+ HeapScanDesc sRel;
+ HeapTuple lTuple,
+ rTuple;
+ Datum d,
+ value[Natts_pg_listener];
+ char repl[Natts_pg_listener],
+ nulls[Natts_pg_listener];
+ bool isnull;
+ char *relname;
+ int32 sourcePID;
+
+ TPRINTF(TRACE_NOTIFY, "ProcessIncomingNotify");
+ PS_SET_STATUS("async_notify");
+
+ notifyInterruptOccurred = 0;
+
+ StartTransactionCommand();
- rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl);
- heap_replace(lRel, &lTuple->t_ctid, rTuple);
+ lRel = heap_openr(ListenerRelationName);
+ RelationSetLockForWrite(lRel);
+ tdesc = RelationGetDescr(lRel);
+
+ /* Scan only entries with my listenerPID */
+ ScanKeyEntryInitialize(&key[0], 0,
+ Anum_pg_listener_pid,
+ F_INT4EQ,
+ Int32GetDatum(MyProcPid));
+ sRel = heap_beginscan(lRel, 0, SnapshotNow, 1, key);
- /* notifying the front end */
- TPRINTF(TRACE_NOTIFY, "Async_NotifyFrontEnd: notifying %s",
- DatumGetName(d)->data);
+ /* Prepare data for rewriting 0 into notification field */
+ nulls[0] = nulls[1] = nulls[2] = ' ';
+ repl[0] = repl[1] = repl[2] = ' ';
+ repl[Anum_pg_listener_notify - 1] = 'r';
+ value[0] = value[1] = value[2] = (Datum) 0;
+ value[Anum_pg_listener_notify - 1] = Int32GetDatum(0);
- if (whereToSendOutput == Remote)
+ while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
+ {
+ d = heap_getattr(lTuple, Anum_pg_listener_notify, tdesc, &isnull);
+ sourcePID = DatumGetInt32(d);
+ if (sourcePID != 0)
{
- pq_putnchar("A", 1);
- pq_putint((int32) MyProcPid, sizeof(int32));
- pq_putstr(DatumGetName(d)->data);
- pq_flush();
+ d = heap_getattr(lTuple, Anum_pg_listener_relname, tdesc, &isnull);
+ relname = (char *) DatumGetPointer(d);
+ /* Notify the frontend */
+ TPRINTF(TRACE_NOTIFY, "ProcessIncomingNotify: received %s from %d",
+ relname, (int) sourcePID);
+ NotifyMyFrontEnd(relname, sourcePID);
+ /* Rewrite the tuple with 0 in notification column */
+ rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl);
+ heap_replace(lRel, &lTuple->t_ctid, rTuple);
}
}
heap_endscan(sRel);
- RelationUnsetLockForWrite(lRel);
+ /*
+ * We do not do RelationUnsetLockForWrite(lRel) here, because the
+ * transaction is about to be committed anyway.
+ */
heap_close(lRel);
- TPRINTF(TRACE_NOTIFY, "Async_NotifyFrontEnd: done");
+ CommitTransactionCommand();
+
+ /* Must flush the notify messages to ensure frontend gets them promptly. */
+ pq_flush();
+
+ PS_SET_STATUS("idle");
+ TPRINTF(TRACE_NOTIFY, "ProcessIncomingNotify: done");
}
+/* Send NOTIFY message to my front end. */
+
+static void
+NotifyMyFrontEnd(char *relname, int32 listenerPID)
+{
+ if (whereToSendOutput == Remote)
+ {
+ pq_putnchar("A", 1);
+ pq_putint(listenerPID, sizeof(int32));
+ pq_putstr(relname);
+ /* NOTE: we do not do pq_flush() here. For a self-notify, it will
+ * happen at the end of the transaction, and for incoming notifies
+ * ProcessIncomingNotify will do it after finding all the notifies.
+ */
+ }
+ else
+ {
+ elog(NOTICE, "NOTIFY for %s", relname);
+ }
+}
+
+/* Does pendingNotifies include the given relname?
+ *
+ * NB: not called unless pendingNotifies != NULL.
+ */
+
static int
AsyncExistsPendingNotify(char *relname)
{
@@ -747,11 +834,26 @@ AsyncExistsPendingNotify(char *relname)
return 0;
}
+/* Clear the pendingNotifies list. */
+
static void
-ClearPendingNotify()
+ClearPendingNotifies()
{
Dlelem *p;
- while ((p = DLRemHead(pendingNotifies)) != NULL)
- free(DLE_VAL(p));
+ if (pendingNotifies)
+ {
+ /* Since the referenced strings are malloc'd, we have to scan the
+ * list and delete them individually. If we used palloc for the
+ * strings then we could just do DLFreeList to get rid of both
+ * the list nodes and the list base...
+ */
+ while ((p = DLRemHead(pendingNotifies)) != NULL)
+ {
+ free(DLE_VAL(p));
+ DLFreeElem(p);
+ }
+ DLFreeList(pendingNotifies);
+ pendingNotifies = NULL;
+ }
}