aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands/async.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands/async.c')
-rw-r--r--src/backend/commands/async.c227
1 files changed, 120 insertions, 107 deletions
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 1eb29dcc99a..5fda53d02c6 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -7,7 +7,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/commands/async.c,v 1.77 2001/03/22 03:59:21 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/backend/commands/async.c,v 1.78 2001/06/12 05:55:49 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -80,7 +80,6 @@
#include "access/heapam.h"
#include "catalog/catname.h"
-#include "catalog/indexing.h"
#include "catalog/pg_listener.h"
#include "commands/async.h"
#include "lib/dllist.h"
@@ -195,11 +194,12 @@ void
Async_Listen(char *relname, int pid)
{
Relation lRel;
+ HeapScanDesc scan;
HeapTuple tuple;
Datum values[Natts_pg_listener];
char nulls[Natts_pg_listener];
int i;
- TupleDesc tupDesc;
+ bool alreadyListener = false;
if (Trace_notify)
elog(DEBUG, "Async_Listen: %s", relname);
@@ -207,12 +207,23 @@ Async_Listen(char *relname, int pid)
lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
/* Detect whether we are already listening on this relname */
- if (SearchSysCacheExists(LISTENREL,
- Int32GetDatum(pid),
- PointerGetDatum(relname),
- 0, 0))
+ scan = heap_beginscan(lRel, 0, SnapshotNow, 0, (ScanKey) NULL);
+ while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
+ {
+ Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
+
+ if (listener->listenerpid == pid &&
+ strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
+ {
+ alreadyListener = true;
+ /* No need to scan the rest of the table */
+ break;
+ }
+ }
+ heap_endscan(scan);
+
+ if (alreadyListener)
{
- /* No need to scan the rest of the table */
heap_close(lRel, AccessExclusiveLock);
elog(NOTICE, "Async_Listen: We are already listening on %s", relname);
return;
@@ -233,9 +244,10 @@ Async_Listen(char *relname, int pid)
values[i++] = (Datum) pid;
values[i++] = (Datum) 0; /* no notifies pending */
- tupDesc = lRel->rd_att;
- tuple = heap_formtuple(tupDesc, values, nulls);
+ tuple = heap_formtuple(RelationGetDescr(lRel), values, nulls);
heap_insert(lRel, tuple);
+
+#ifdef NOT_USED /* currently there are no indexes */
if (RelationGetForm(lRel)->relhasindex)
{
Relation idescs[Num_pg_listener_indices];
@@ -244,6 +256,7 @@ Async_Listen(char *relname, int pid)
CatalogIndexInsert(idescs, Num_pg_listener_indices, lRel, tuple);
CatalogCloseIndices(Num_pg_listener_indices, idescs);
}
+#endif
heap_freetuple(tuple);
@@ -280,7 +293,8 @@ void
Async_Unlisten(char *relname, int pid)
{
Relation lRel;
- HeapTuple lTuple;
+ HeapScanDesc scan;
+ HeapTuple tuple;
/* Handle specially the `unlisten "*"' command */
if ((!relname) || (*relname == '\0') || (strcmp(relname, "*") == 0))
@@ -293,16 +307,26 @@ Async_Unlisten(char *relname, int pid)
elog(DEBUG, "Async_Unlisten %s", relname);
lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
- /* Note we assume there can be only one matching tuple. */
- lTuple = SearchSysCache(LISTENREL,
- Int32GetDatum(pid),
- PointerGetDatum(relname),
- 0, 0);
- if (HeapTupleIsValid(lTuple))
+
+ scan = heap_beginscan(lRel, 0, SnapshotNow, 0, (ScanKey) NULL);
+ while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
{
- simple_heap_delete(lRel, &lTuple->t_self);
- ReleaseSysCache(lTuple);
+ Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
+
+ if (listener->listenerpid == pid &&
+ strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
+ {
+ /* Found the matching tuple, delete it */
+ simple_heap_delete(lRel, &tuple->t_self);
+ /*
+ * We assume there can be only one match, so no need
+ * to scan the rest of the table
+ */
+ break;
+ }
}
+ heap_endscan(scan);
+
heap_close(lRel, AccessExclusiveLock);
/*
@@ -332,7 +356,7 @@ Async_UnlistenAll()
{
Relation lRel;
TupleDesc tdesc;
- HeapScanDesc sRel;
+ HeapScanDesc scan;
HeapTuple lTuple;
ScanKeyData key[1];
@@ -347,12 +371,12 @@ Async_UnlistenAll()
Anum_pg_listener_pid,
F_INT4EQ,
Int32GetDatum(MyProcPid));
- sRel = heap_beginscan(lRel, 0, SnapshotNow, 1, key);
+ scan = heap_beginscan(lRel, 0, SnapshotNow, 1, key);
- while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
+ while (HeapTupleIsValid(lTuple = heap_getnext(scan, 0)))
simple_heap_delete(lRel, &lTuple->t_self);
- heap_endscan(sRel);
+ heap_endscan(scan);
heap_close(lRel, AccessExclusiveLock);
}
@@ -418,16 +442,12 @@ AtCommit_Notify()
{
Relation lRel;
TupleDesc tdesc;
- HeapScanDesc sRel;
+ HeapScanDesc scan;
HeapTuple lTuple,
rTuple;
- Datum d,
- value[Natts_pg_listener];
+ Datum value[Natts_pg_listener];
char repl[Natts_pg_listener],
nulls[Natts_pg_listener];
- bool isnull;
- char *relname;
- int32 listenerPID;
if (!pendingNotifies)
return; /* no NOTIFY statements in this
@@ -446,10 +466,6 @@ AtCommit_Notify()
if (Trace_notify)
elog(DEBUG, "AtCommit_Notify");
- lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
- tdesc = RelationGetDescr(lRel);
- sRel = heap_beginscan(lRel, 0, SnapshotNow, 0, (ScanKey) NULL);
-
/* preset data to update notify column to MyProcPid */
nulls[0] = nulls[1] = nulls[2] = ' ';
repl[0] = repl[1] = repl[2] = ' ';
@@ -457,83 +473,81 @@ AtCommit_Notify()
value[0] = value[1] = value[2] = (Datum) 0;
value[Anum_pg_listener_notify - 1] = Int32GetDatum(MyProcPid);
- while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
+ lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
+ tdesc = RelationGetDescr(lRel);
+ scan = heap_beginscan(lRel, 0, SnapshotNow, 0, (ScanKey) NULL);
+
+ while (HeapTupleIsValid(lTuple = heap_getnext(scan, 0)))
{
- d = heap_getattr(lTuple, Anum_pg_listener_relname, tdesc, &isnull);
- relname = (char *) DatumGetPointer(d);
+ Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
+ char *relname = NameStr(listener->relname);
+ int32 listenerPID = listener->listenerpid;
- if (AsyncExistsPendingNotify(relname))
+ if (! AsyncExistsPendingNotify(relname))
+ continue;
+
+ if (listenerPID == MyProcPid)
{
- d = heap_getattr(lTuple, Anum_pg_listener_pid, tdesc, &isnull);
- listenerPID = DatumGetInt32(d);
+ /*
+ * Self-notify: no need to bother with table update.
+ * Indeed, we *must not* clear the notification field in
+ * this path, or we could lose an outside notify, which'd
+ * be bad for applications that ignore self-notify messages.
+ */
- if (listenerPID == MyProcPid)
- {
+ if (Trace_notify)
+ elog(DEBUG, "AtCommit_Notify: notifying self");
+ NotifyMyFrontEnd(relname, listenerPID);
+ }
+ else
+ {
+ if (Trace_notify)
+ elog(DEBUG, "AtCommit_Notify: notifying pid %d",
+ listenerPID);
+
+ /*
+ * If someone has already notified this listener, we don't
+ * bother modifying the table, but we do still send a
+ * SIGUSR2 signal, just in case that backend missed the
+ * earlier signal for some reason. It's OK to send the
+ * signal first, because the other guy can't read
+ * pg_listener until we unlock it.
+ */
+ if (kill(listenerPID, SIGUSR2) < 0)
+ {
/*
- * Self-notify: no need to bother with table update.
- * Indeed, we *must not* clear the notification field in
- * this path, or we could lose an outside notify, which'd
- * be bad for applications that ignore self-notify
- * messages.
+ * Get rid of pg_listener entry if it refers to a PID
+ * that no longer exists. Presumably, that backend
+ * crashed without deleting its pg_listener entries.
+ * This code used to only delete the entry if
+ * errno==ESRCH, but as far as I can see we should
+ * just do it for any failure (certainly at least for
+ * EPERM too...)
*/
-
- if (Trace_notify)
- elog(DEBUG, "AtCommit_Notify: notifying self");
-
- NotifyMyFrontEnd(relname, listenerPID);
+ simple_heap_delete(lRel, &lTuple->t_self);
}
- else
+ else if (listener->notification == 0)
{
- if (Trace_notify)
- elog(DEBUG, "AtCommit_Notify: notifying pid %d", listenerPID);
+ rTuple = heap_modifytuple(lTuple, lRel,
+ value, nulls, repl);
+ simple_heap_update(lRel, &lTuple->t_self, rTuple);
- /*
- * If someone has already notified this listener, we don't
- * bother modifying the table, but we do still send a
- * SIGUSR2 signal, just in case that backend missed the
- * earlier signal for some reason. It's OK to send the
- * signal first, because the other guy can't read
- * pg_listener until we unlock it.
- */
- if (kill(listenerPID, SIGUSR2) < 0)
+#ifdef NOT_USED /* currently there are no indexes */
+ if (RelationGetForm(lRel)->relhasindex)
{
+ Relation idescs[Num_pg_listener_indices];
- /*
- * Get rid of pg_listener entry if it refers to a PID
- * that no longer exists. Presumably, that backend
- * crashed without deleting its pg_listener entries.
- * This code used to only delete the entry if
- * errno==ESRCH, but as far as I can see we should
- * just do it for any failure (certainly at least for
- * EPERM too...)
- */
- simple_heap_delete(lRel, &lTuple->t_self);
- }
- else
- {
- d = heap_getattr(lTuple, Anum_pg_listener_notify,
- tdesc, &isnull);
- if (DatumGetInt32(d) == 0)
- {
- rTuple = heap_modifytuple(lTuple, lRel,
- value, nulls, repl);
- simple_heap_update(lRel, &lTuple->t_self, rTuple);
- if (RelationGetForm(lRel)->relhasindex)
- {
- Relation idescs[Num_pg_listener_indices];
-
- CatalogOpenIndices(Num_pg_listener_indices, Name_pg_listener_indices, idescs);
- CatalogIndexInsert(idescs, Num_pg_listener_indices, lRel, rTuple);
- CatalogCloseIndices(Num_pg_listener_indices, idescs);
- }
- }
+ CatalogOpenIndices(Num_pg_listener_indices, Name_pg_listener_indices, idescs);
+ CatalogIndexInsert(idescs, Num_pg_listener_indices, lRel, rTuple);
+ CatalogCloseIndices(Num_pg_listener_indices, idescs);
}
+#endif
}
}
}
- heap_endscan(sRel);
+ heap_endscan(scan);
/*
* We do NOT release the lock on pg_listener here; we need to hold it
@@ -745,16 +759,12 @@ ProcessIncomingNotify(void)
Relation lRel;
TupleDesc tdesc;
ScanKeyData key[1];
- HeapScanDesc sRel;
+ HeapScanDesc scan;
HeapTuple lTuple,
rTuple;
- Datum d,
- value[Natts_pg_listener];
+ Datum value[Natts_pg_listener];
char repl[Natts_pg_listener],
nulls[Natts_pg_listener];
- bool isnull;
- char *relname;
- int32 sourcePID;
if (Trace_notify)
elog(DEBUG, "ProcessIncomingNotify");
@@ -773,7 +783,7 @@ ProcessIncomingNotify(void)
Anum_pg_listener_pid,
F_INT4EQ,
Int32GetDatum(MyProcPid));
- sRel = heap_beginscan(lRel, 0, SnapshotNow, 1, key);
+ scan = heap_beginscan(lRel, 0, SnapshotNow, 1, key);
/* Prepare data for rewriting 0 into notification field */
nulls[0] = nulls[1] = nulls[2] = ' ';
@@ -782,14 +792,14 @@ ProcessIncomingNotify(void)
value[0] = value[1] = value[2] = (Datum) 0;
value[Anum_pg_listener_notify - 1] = Int32GetDatum(0);
- while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
+ while (HeapTupleIsValid(lTuple = heap_getnext(scan, 0)))
{
- d = heap_getattr(lTuple, Anum_pg_listener_notify, tdesc, &isnull);
- sourcePID = DatumGetInt32(d);
+ Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
+ char *relname = NameStr(listener->relname);
+ int32 sourcePID = listener->notification;
+
if (sourcePID != 0)
{
- d = heap_getattr(lTuple, Anum_pg_listener_relname, tdesc, &isnull);
- relname = (char *) DatumGetPointer(d);
/* Notify the frontend */
if (Trace_notify)
@@ -800,6 +810,8 @@ ProcessIncomingNotify(void)
/* Rewrite the tuple with 0 in notification column */
rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl);
simple_heap_update(lRel, &lTuple->t_self, rTuple);
+
+#ifdef NOT_USED /* currently there are no indexes */
if (RelationGetForm(lRel)->relhasindex)
{
Relation idescs[Num_pg_listener_indices];
@@ -808,9 +820,10 @@ ProcessIncomingNotify(void)
CatalogIndexInsert(idescs, Num_pg_listener_indices, lRel, rTuple);
CatalogCloseIndices(Num_pg_listener_indices, idescs);
}
+#endif
}
}
- heap_endscan(sRel);
+ heap_endscan(scan);
/*
* We do NOT release the lock on pg_listener here; we need to hold it
@@ -876,7 +889,7 @@ AsyncExistsPendingNotify(char *relname)
p = DLGetSucc(p))
{
/* Use NAMEDATALEN for relname comparison. DZ - 26-08-1996 */
- if (!strncmp((const char *) DLE_VAL(p), relname, NAMEDATALEN))
+ if (strncmp((const char *) DLE_VAL(p), relname, NAMEDATALEN) == 0)
return 1;
}