diff options
Diffstat (limited to 'src/backend/commands/async.c')
-rw-r--r-- | src/backend/commands/async.c | 227 |
1 files changed, 120 insertions, 107 deletions
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index 1eb29dcc99a..5fda53d02c6 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -7,7 +7,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/commands/async.c,v 1.77 2001/03/22 03:59:21 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/commands/async.c,v 1.78 2001/06/12 05:55:49 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -80,7 +80,6 @@ #include "access/heapam.h" #include "catalog/catname.h" -#include "catalog/indexing.h" #include "catalog/pg_listener.h" #include "commands/async.h" #include "lib/dllist.h" @@ -195,11 +194,12 @@ void Async_Listen(char *relname, int pid) { Relation lRel; + HeapScanDesc scan; HeapTuple tuple; Datum values[Natts_pg_listener]; char nulls[Natts_pg_listener]; int i; - TupleDesc tupDesc; + bool alreadyListener = false; if (Trace_notify) elog(DEBUG, "Async_Listen: %s", relname); @@ -207,12 +207,23 @@ Async_Listen(char *relname, int pid) lRel = heap_openr(ListenerRelationName, AccessExclusiveLock); /* Detect whether we are already listening on this relname */ - if (SearchSysCacheExists(LISTENREL, - Int32GetDatum(pid), - PointerGetDatum(relname), - 0, 0)) + scan = heap_beginscan(lRel, 0, SnapshotNow, 0, (ScanKey) NULL); + while (HeapTupleIsValid(tuple = heap_getnext(scan, 0))) + { + Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple); + + if (listener->listenerpid == pid && + strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0) + { + alreadyListener = true; + /* No need to scan the rest of the table */ + break; + } + } + heap_endscan(scan); + + if (alreadyListener) { - /* No need to scan the rest of the table */ heap_close(lRel, AccessExclusiveLock); elog(NOTICE, "Async_Listen: We are already listening on %s", relname); return; @@ -233,9 +244,10 @@ Async_Listen(char *relname, int pid) values[i++] = (Datum) pid; values[i++] = (Datum) 0; /* no notifies pending */ - tupDesc = lRel->rd_att; - tuple = heap_formtuple(tupDesc, values, nulls); + tuple = heap_formtuple(RelationGetDescr(lRel), values, nulls); heap_insert(lRel, tuple); + +#ifdef NOT_USED /* currently there are no indexes */ if (RelationGetForm(lRel)->relhasindex) { Relation idescs[Num_pg_listener_indices]; @@ -244,6 +256,7 @@ Async_Listen(char *relname, int pid) CatalogIndexInsert(idescs, Num_pg_listener_indices, lRel, tuple); CatalogCloseIndices(Num_pg_listener_indices, idescs); } +#endif heap_freetuple(tuple); @@ -280,7 +293,8 @@ void Async_Unlisten(char *relname, int pid) { Relation lRel; - HeapTuple lTuple; + HeapScanDesc scan; + HeapTuple tuple; /* Handle specially the `unlisten "*"' command */ if ((!relname) || (*relname == '\0') || (strcmp(relname, "*") == 0)) @@ -293,16 +307,26 @@ Async_Unlisten(char *relname, int pid) elog(DEBUG, "Async_Unlisten %s", relname); lRel = heap_openr(ListenerRelationName, AccessExclusiveLock); - /* Note we assume there can be only one matching tuple. */ - lTuple = SearchSysCache(LISTENREL, - Int32GetDatum(pid), - PointerGetDatum(relname), - 0, 0); - if (HeapTupleIsValid(lTuple)) + + scan = heap_beginscan(lRel, 0, SnapshotNow, 0, (ScanKey) NULL); + while (HeapTupleIsValid(tuple = heap_getnext(scan, 0))) { - simple_heap_delete(lRel, &lTuple->t_self); - ReleaseSysCache(lTuple); + Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple); + + if (listener->listenerpid == pid && + strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0) + { + /* Found the matching tuple, delete it */ + simple_heap_delete(lRel, &tuple->t_self); + /* + * We assume there can be only one match, so no need + * to scan the rest of the table + */ + break; + } } + heap_endscan(scan); + heap_close(lRel, AccessExclusiveLock); /* @@ -332,7 +356,7 @@ Async_UnlistenAll() { Relation lRel; TupleDesc tdesc; - HeapScanDesc sRel; + HeapScanDesc scan; HeapTuple lTuple; ScanKeyData key[1]; @@ -347,12 +371,12 @@ Async_UnlistenAll() Anum_pg_listener_pid, F_INT4EQ, Int32GetDatum(MyProcPid)); - sRel = heap_beginscan(lRel, 0, SnapshotNow, 1, key); + scan = heap_beginscan(lRel, 0, SnapshotNow, 1, key); - while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0))) + while (HeapTupleIsValid(lTuple = heap_getnext(scan, 0))) simple_heap_delete(lRel, &lTuple->t_self); - heap_endscan(sRel); + heap_endscan(scan); heap_close(lRel, AccessExclusiveLock); } @@ -418,16 +442,12 @@ AtCommit_Notify() { Relation lRel; TupleDesc tdesc; - HeapScanDesc sRel; + HeapScanDesc scan; HeapTuple lTuple, rTuple; - Datum d, - value[Natts_pg_listener]; + Datum value[Natts_pg_listener]; char repl[Natts_pg_listener], nulls[Natts_pg_listener]; - bool isnull; - char *relname; - int32 listenerPID; if (!pendingNotifies) return; /* no NOTIFY statements in this @@ -446,10 +466,6 @@ AtCommit_Notify() if (Trace_notify) elog(DEBUG, "AtCommit_Notify"); - lRel = heap_openr(ListenerRelationName, AccessExclusiveLock); - tdesc = RelationGetDescr(lRel); - sRel = heap_beginscan(lRel, 0, SnapshotNow, 0, (ScanKey) NULL); - /* preset data to update notify column to MyProcPid */ nulls[0] = nulls[1] = nulls[2] = ' '; repl[0] = repl[1] = repl[2] = ' '; @@ -457,83 +473,81 @@ AtCommit_Notify() value[0] = value[1] = value[2] = (Datum) 0; value[Anum_pg_listener_notify - 1] = Int32GetDatum(MyProcPid); - while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0))) + lRel = heap_openr(ListenerRelationName, AccessExclusiveLock); + tdesc = RelationGetDescr(lRel); + scan = heap_beginscan(lRel, 0, SnapshotNow, 0, (ScanKey) NULL); + + while (HeapTupleIsValid(lTuple = heap_getnext(scan, 0))) { - d = heap_getattr(lTuple, Anum_pg_listener_relname, tdesc, &isnull); - relname = (char *) DatumGetPointer(d); + Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple); + char *relname = NameStr(listener->relname); + int32 listenerPID = listener->listenerpid; - if (AsyncExistsPendingNotify(relname)) + if (! AsyncExistsPendingNotify(relname)) + continue; + + if (listenerPID == MyProcPid) { - d = heap_getattr(lTuple, Anum_pg_listener_pid, tdesc, &isnull); - listenerPID = DatumGetInt32(d); + /* + * Self-notify: no need to bother with table update. + * Indeed, we *must not* clear the notification field in + * this path, or we could lose an outside notify, which'd + * be bad for applications that ignore self-notify messages. + */ - if (listenerPID == MyProcPid) - { + if (Trace_notify) + elog(DEBUG, "AtCommit_Notify: notifying self"); + NotifyMyFrontEnd(relname, listenerPID); + } + else + { + if (Trace_notify) + elog(DEBUG, "AtCommit_Notify: notifying pid %d", + listenerPID); + + /* + * If someone has already notified this listener, we don't + * bother modifying the table, but we do still send a + * SIGUSR2 signal, just in case that backend missed the + * earlier signal for some reason. It's OK to send the + * signal first, because the other guy can't read + * pg_listener until we unlock it. + */ + if (kill(listenerPID, SIGUSR2) < 0) + { /* - * Self-notify: no need to bother with table update. - * Indeed, we *must not* clear the notification field in - * this path, or we could lose an outside notify, which'd - * be bad for applications that ignore self-notify - * messages. + * Get rid of pg_listener entry if it refers to a PID + * that no longer exists. Presumably, that backend + * crashed without deleting its pg_listener entries. + * This code used to only delete the entry if + * errno==ESRCH, but as far as I can see we should + * just do it for any failure (certainly at least for + * EPERM too...) */ - - if (Trace_notify) - elog(DEBUG, "AtCommit_Notify: notifying self"); - - NotifyMyFrontEnd(relname, listenerPID); + simple_heap_delete(lRel, &lTuple->t_self); } - else + else if (listener->notification == 0) { - if (Trace_notify) - elog(DEBUG, "AtCommit_Notify: notifying pid %d", listenerPID); + rTuple = heap_modifytuple(lTuple, lRel, + value, nulls, repl); + simple_heap_update(lRel, &lTuple->t_self, rTuple); - /* - * If someone has already notified this listener, we don't - * bother modifying the table, but we do still send a - * SIGUSR2 signal, just in case that backend missed the - * earlier signal for some reason. It's OK to send the - * signal first, because the other guy can't read - * pg_listener until we unlock it. - */ - if (kill(listenerPID, SIGUSR2) < 0) +#ifdef NOT_USED /* currently there are no indexes */ + if (RelationGetForm(lRel)->relhasindex) { + Relation idescs[Num_pg_listener_indices]; - /* - * Get rid of pg_listener entry if it refers to a PID - * that no longer exists. Presumably, that backend - * crashed without deleting its pg_listener entries. - * This code used to only delete the entry if - * errno==ESRCH, but as far as I can see we should - * just do it for any failure (certainly at least for - * EPERM too...) - */ - simple_heap_delete(lRel, &lTuple->t_self); - } - else - { - d = heap_getattr(lTuple, Anum_pg_listener_notify, - tdesc, &isnull); - if (DatumGetInt32(d) == 0) - { - rTuple = heap_modifytuple(lTuple, lRel, - value, nulls, repl); - simple_heap_update(lRel, &lTuple->t_self, rTuple); - if (RelationGetForm(lRel)->relhasindex) - { - Relation idescs[Num_pg_listener_indices]; - - CatalogOpenIndices(Num_pg_listener_indices, Name_pg_listener_indices, idescs); - CatalogIndexInsert(idescs, Num_pg_listener_indices, lRel, rTuple); - CatalogCloseIndices(Num_pg_listener_indices, idescs); - } - } + CatalogOpenIndices(Num_pg_listener_indices, Name_pg_listener_indices, idescs); + CatalogIndexInsert(idescs, Num_pg_listener_indices, lRel, rTuple); + CatalogCloseIndices(Num_pg_listener_indices, idescs); } +#endif } } } - heap_endscan(sRel); + heap_endscan(scan); /* * We do NOT release the lock on pg_listener here; we need to hold it @@ -745,16 +759,12 @@ ProcessIncomingNotify(void) Relation lRel; TupleDesc tdesc; ScanKeyData key[1]; - HeapScanDesc sRel; + HeapScanDesc scan; HeapTuple lTuple, rTuple; - Datum d, - value[Natts_pg_listener]; + Datum value[Natts_pg_listener]; char repl[Natts_pg_listener], nulls[Natts_pg_listener]; - bool isnull; - char *relname; - int32 sourcePID; if (Trace_notify) elog(DEBUG, "ProcessIncomingNotify"); @@ -773,7 +783,7 @@ ProcessIncomingNotify(void) Anum_pg_listener_pid, F_INT4EQ, Int32GetDatum(MyProcPid)); - sRel = heap_beginscan(lRel, 0, SnapshotNow, 1, key); + scan = heap_beginscan(lRel, 0, SnapshotNow, 1, key); /* Prepare data for rewriting 0 into notification field */ nulls[0] = nulls[1] = nulls[2] = ' '; @@ -782,14 +792,14 @@ ProcessIncomingNotify(void) value[0] = value[1] = value[2] = (Datum) 0; value[Anum_pg_listener_notify - 1] = Int32GetDatum(0); - while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0))) + while (HeapTupleIsValid(lTuple = heap_getnext(scan, 0))) { - d = heap_getattr(lTuple, Anum_pg_listener_notify, tdesc, &isnull); - sourcePID = DatumGetInt32(d); + Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple); + char *relname = NameStr(listener->relname); + int32 sourcePID = listener->notification; + if (sourcePID != 0) { - d = heap_getattr(lTuple, Anum_pg_listener_relname, tdesc, &isnull); - relname = (char *) DatumGetPointer(d); /* Notify the frontend */ if (Trace_notify) @@ -800,6 +810,8 @@ ProcessIncomingNotify(void) /* Rewrite the tuple with 0 in notification column */ rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl); simple_heap_update(lRel, &lTuple->t_self, rTuple); + +#ifdef NOT_USED /* currently there are no indexes */ if (RelationGetForm(lRel)->relhasindex) { Relation idescs[Num_pg_listener_indices]; @@ -808,9 +820,10 @@ ProcessIncomingNotify(void) CatalogIndexInsert(idescs, Num_pg_listener_indices, lRel, rTuple); CatalogCloseIndices(Num_pg_listener_indices, idescs); } +#endif } } - heap_endscan(sRel); + heap_endscan(scan); /* * We do NOT release the lock on pg_listener here; we need to hold it @@ -876,7 +889,7 @@ AsyncExistsPendingNotify(char *relname) p = DLGetSucc(p)) { /* Use NAMEDATALEN for relname comparison. DZ - 26-08-1996 */ - if (!strncmp((const char *) DLE_VAL(p), relname, NAMEDATALEN)) + if (strncmp((const char *) DLE_VAL(p), relname, NAMEDATALEN) == 0) return 1; } |