aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/commands/async.c97
1 files changed, 66 insertions, 31 deletions
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 6cb2d445f0d..f26269b5eae 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -218,6 +218,7 @@ typedef struct QueueBackendStatus
{
int32 pid; /* either a PID or InvalidPid */
Oid dboid; /* backend's database OID, or InvalidOid */
+ BackendId nextListener; /* id of next listener, or InvalidBackendId */
QueuePosition pos; /* backend has read queue up to here */
} QueueBackendStatus;
@@ -241,12 +242,19 @@ typedef struct QueueBackendStatus
* Each backend uses the backend[] array entry with index equal to its
* BackendId (which can range from 1 to MaxBackends). We rely on this to make
* SendProcSignal fast.
+ *
+ * The backend[] array entries for actively-listening backends are threaded
+ * together using firstListener and the nextListener links, so that we can
+ * scan them without having to iterate over inactive entries. We keep this
+ * list in order by BackendId so that the scan is cache-friendly when there
+ * are many active entries.
*/
typedef struct AsyncQueueControl
{
QueuePosition head; /* head points to the next free location */
QueuePosition tail; /* the global tail is equivalent to the pos of
* the "slowest" backend */
+ BackendId firstListener; /* id of first listener, or InvalidBackendId */
TimestampTz lastQueueFillWarn; /* time of last queue-full msg */
QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
/* backend[0] is not used; used entries are from [1] to [MaxBackends] */
@@ -256,8 +264,10 @@ static AsyncQueueControl *asyncQueueControl;
#define QUEUE_HEAD (asyncQueueControl->head)
#define QUEUE_TAIL (asyncQueueControl->tail)
+#define QUEUE_FIRST_LISTENER (asyncQueueControl->firstListener)
#define QUEUE_BACKEND_PID(i) (asyncQueueControl->backend[i].pid)
#define QUEUE_BACKEND_DBOID(i) (asyncQueueControl->backend[i].dboid)
+#define QUEUE_NEXT_LISTENER(i) (asyncQueueControl->backend[i].nextListener)
#define QUEUE_BACKEND_POS(i) (asyncQueueControl->backend[i].pos)
/*
@@ -490,16 +500,16 @@ AsyncShmemInit(void)
if (!found)
{
/* First time through, so initialize it */
- int i;
-
SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
SET_QUEUE_POS(QUEUE_TAIL, 0, 0);
+ QUEUE_FIRST_LISTENER = InvalidBackendId;
asyncQueueControl->lastQueueFillWarn = 0;
/* zero'th entry won't be used, but let's initialize it anyway */
- for (i = 0; i <= MaxBackends; i++)
+ for (int i = 0; i <= MaxBackends; i++)
{
QUEUE_BACKEND_PID(i) = InvalidPid;
QUEUE_BACKEND_DBOID(i) = InvalidOid;
+ QUEUE_NEXT_LISTENER(i) = InvalidBackendId;
SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
}
}
@@ -959,7 +969,7 @@ Exec_ListenPreCommit(void)
{
QueuePosition head;
QueuePosition max;
- int i;
+ BackendId prevListener;
/*
* Nothing to do if we are already listening to something, nor if we
@@ -996,26 +1006,37 @@ Exec_ListenPreCommit(void)
* our database; any notifications it's already advanced over are surely
* committed and need not be re-examined by us. (We must consider only
* backends connected to our DB, because others will not have bothered to
- * check committed-ness of notifications in our DB.) But we only bother
- * with that if there's more than a page worth of notifications
- * outstanding, otherwise scanning all the other backends isn't worth it.
+ * check committed-ness of notifications in our DB.)
*
- * We need exclusive lock here so we can look at other backends' entries.
+ * We need exclusive lock here so we can look at other backends' entries
+ * and manipulate the list links.
*/
LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
head = QUEUE_HEAD;
max = QUEUE_TAIL;
- if (QUEUE_POS_PAGE(max) != QUEUE_POS_PAGE(head))
+ prevListener = InvalidBackendId;
+ for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
{
- for (i = 1; i <= MaxBackends; i++)
- {
- if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
- max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
- }
+ if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
+ max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
+ /* Also find last listening backend before this one */
+ if (i < MyBackendId)
+ prevListener = i;
}
QUEUE_BACKEND_POS(MyBackendId) = max;
QUEUE_BACKEND_PID(MyBackendId) = MyProcPid;
QUEUE_BACKEND_DBOID(MyBackendId) = MyDatabaseId;
+ /* Insert backend into list of listeners at correct position */
+ if (prevListener > 0)
+ {
+ QUEUE_NEXT_LISTENER(MyBackendId) = QUEUE_NEXT_LISTENER(prevListener);
+ QUEUE_NEXT_LISTENER(prevListener) = MyBackendId;
+ }
+ else
+ {
+ QUEUE_NEXT_LISTENER(MyBackendId) = QUEUE_FIRST_LISTENER;
+ QUEUE_FIRST_LISTENER = MyBackendId;
+ }
LWLockRelease(AsyncQueueLock);
/* Now we are listed in the global array, so remember we're listening */
@@ -1228,13 +1249,31 @@ asyncQueueUnregister(void)
if (!amRegisteredListener) /* nothing to do */
return;
- LWLockAcquire(AsyncQueueLock, LW_SHARED);
+ /*
+ * Need exclusive lock here to manipulate list links.
+ */
+ LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
/* check if entry is valid and oldest ... */
advanceTail = (MyProcPid == QUEUE_BACKEND_PID(MyBackendId)) &&
QUEUE_POS_EQUAL(QUEUE_BACKEND_POS(MyBackendId), QUEUE_TAIL);
/* ... then mark it invalid */
QUEUE_BACKEND_PID(MyBackendId) = InvalidPid;
QUEUE_BACKEND_DBOID(MyBackendId) = InvalidOid;
+ /* and remove it from the list */
+ if (QUEUE_FIRST_LISTENER == MyBackendId)
+ QUEUE_FIRST_LISTENER = QUEUE_NEXT_LISTENER(MyBackendId);
+ else
+ {
+ for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
+ {
+ if (QUEUE_NEXT_LISTENER(i) == MyBackendId)
+ {
+ QUEUE_NEXT_LISTENER(i) = QUEUE_NEXT_LISTENER(MyBackendId);
+ break;
+ }
+ }
+ }
+ QUEUE_NEXT_LISTENER(MyBackendId) = InvalidBackendId;
LWLockRelease(AsyncQueueLock);
/* mark ourselves as no longer listed in the global array */
@@ -1508,16 +1547,13 @@ asyncQueueFillWarning(void)
{
QueuePosition min = QUEUE_HEAD;
int32 minPid = InvalidPid;
- int i;
- for (i = 1; i <= MaxBackends; i++)
+ for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
{
- if (QUEUE_BACKEND_PID(i) != InvalidPid)
- {
- min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
- if (QUEUE_POS_EQUAL(min, QUEUE_BACKEND_POS(i)))
- minPid = QUEUE_BACKEND_PID(i);
- }
+ Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
+ min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
+ if (QUEUE_POS_EQUAL(min, QUEUE_BACKEND_POS(i)))
+ minPid = QUEUE_BACKEND_PID(i);
}
ereport(WARNING,
@@ -1553,7 +1589,6 @@ SignalBackends(void)
int32 *pids;
BackendId *ids;
int count;
- int i;
int32 pid;
/*
@@ -1570,10 +1605,11 @@ SignalBackends(void)
count = 0;
LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
- for (i = 1; i <= MaxBackends; i++)
+ for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
{
pid = QUEUE_BACKEND_PID(i);
- if (pid != InvalidPid && pid != MyProcPid)
+ Assert(pid != InvalidPid);
+ if (pid != MyProcPid)
{
QueuePosition pos = QUEUE_BACKEND_POS(i);
@@ -1588,7 +1624,7 @@ SignalBackends(void)
LWLockRelease(AsyncQueueLock);
/* Now send signals */
- for (i = 0; i < count; i++)
+ for (int i = 0; i < count; i++)
{
pid = pids[i];
@@ -2064,17 +2100,16 @@ static void
asyncQueueAdvanceTail(void)
{
QueuePosition min;
- int i;
int oldtailpage;
int newtailpage;
int boundary;
LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
min = QUEUE_HEAD;
- for (i = 1; i <= MaxBackends; i++)
+ for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
{
- if (QUEUE_BACKEND_PID(i) != InvalidPid)
- min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
+ Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
+ min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
}
oldtailpage = QUEUE_POS_PAGE(QUEUE_TAIL);
QUEUE_TAIL = min;