aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands/async.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands/async.c')
-rw-r--r--src/backend/commands/async.c89
1 files changed, 42 insertions, 47 deletions
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 23444f2a800..bbea5abd27a 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -247,7 +247,7 @@ typedef struct QueueBackendStatus
{
int32 pid; /* either a PID or InvalidPid */
Oid dboid; /* backend's database OID, or InvalidOid */
- BackendId nextListener; /* id of next listener, or InvalidBackendId */
+ ProcNumber nextListener; /* id of next listener, or INVALID_PROC_NUMBER */
QueuePosition pos; /* backend has read queue up to here */
} QueueBackendStatus;
@@ -273,13 +273,12 @@ typedef struct QueueBackendStatus
* NotifyQueueTailLock, then NotifyQueueLock, and lastly SLRU bank lock.
*
* Each backend uses the backend[] array entry with index equal to its
- * BackendId (which can range from 1 to MaxBackends). We rely on this to make
- * SendProcSignal fast.
+ * ProcNumber. We rely on this to make SendProcSignal fast.
*
* The backend[] array entries for actively-listening backends are threaded
* together using firstListener and the nextListener links, so that we can
* scan them without having to iterate over inactive entries. We keep this
- * list in order by BackendId so that the scan is cache-friendly when there
+ * list in order by ProcNumber so that the scan is cache-friendly when there
* are many active entries.
*/
typedef struct AsyncQueueControl
@@ -289,10 +288,10 @@ typedef struct AsyncQueueControl
* listening backend */
int stopPage; /* oldest unrecycled page; must be <=
* tail.page */
- BackendId firstListener; /* id of first listener, or InvalidBackendId */
+ ProcNumber firstListener; /* id of first listener, or
+ * INVALID_PROC_NUMBER */
TimestampTz lastQueueFillWarn; /* time of last queue-full msg */
QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
- /* backend[0] is not used; used entries are from [1] to [MaxBackends] */
} AsyncQueueControl;
static AsyncQueueControl *asyncQueueControl;
@@ -491,7 +490,7 @@ AsyncShmemSize(void)
Size size;
/* This had better match AsyncShmemInit */
- size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
+ size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
size = add_size(size, offsetof(AsyncQueueControl, backend));
size = add_size(size, SimpleLruShmemSize(notify_buffers, 0));
@@ -510,11 +509,8 @@ AsyncShmemInit(void)
/*
* Create or attach to the AsyncQueueControl structure.
- *
- * The used entries in the backend[] array run from 1 to MaxBackends; the
- * zero'th entry is unused but must be allocated.
*/
- size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
+ size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
size = add_size(size, offsetof(AsyncQueueControl, backend));
asyncQueueControl = (AsyncQueueControl *)
@@ -526,14 +522,13 @@ AsyncShmemInit(void)
SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
SET_QUEUE_POS(QUEUE_TAIL, 0, 0);
QUEUE_STOP_PAGE = 0;
- QUEUE_FIRST_LISTENER = InvalidBackendId;
+ QUEUE_FIRST_LISTENER = INVALID_PROC_NUMBER;
asyncQueueControl->lastQueueFillWarn = 0;
- /* zero'th entry won't be used, but let's initialize it anyway */
- for (int i = 0; i <= MaxBackends; i++)
+ for (int i = 0; i < MaxBackends; i++)
{
QUEUE_BACKEND_PID(i) = InvalidPid;
QUEUE_BACKEND_DBOID(i) = InvalidOid;
- QUEUE_NEXT_LISTENER(i) = InvalidBackendId;
+ QUEUE_NEXT_LISTENER(i) = INVALID_PROC_NUMBER;
SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
}
}
@@ -1050,7 +1045,7 @@ Exec_ListenPreCommit(void)
{
QueuePosition head;
QueuePosition max;
- BackendId prevListener;
+ ProcNumber prevListener;
/*
* Nothing to do if we are already listening to something, nor if we
@@ -1095,28 +1090,28 @@ Exec_ListenPreCommit(void)
LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
head = QUEUE_HEAD;
max = QUEUE_TAIL;
- prevListener = InvalidBackendId;
- for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
+ prevListener = INVALID_PROC_NUMBER;
+ for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
{
if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
/* Also find last listening backend before this one */
- if (i < MyBackendId)
+ if (i < MyProcNumber)
prevListener = i;
}
- QUEUE_BACKEND_POS(MyBackendId) = max;
- QUEUE_BACKEND_PID(MyBackendId) = MyProcPid;
- QUEUE_BACKEND_DBOID(MyBackendId) = MyDatabaseId;
+ QUEUE_BACKEND_POS(MyProcNumber) = max;
+ QUEUE_BACKEND_PID(MyProcNumber) = MyProcPid;
+ QUEUE_BACKEND_DBOID(MyProcNumber) = MyDatabaseId;
/* Insert backend into list of listeners at correct position */
- if (prevListener > 0)
+ if (prevListener != INVALID_PROC_NUMBER)
{
- QUEUE_NEXT_LISTENER(MyBackendId) = QUEUE_NEXT_LISTENER(prevListener);
- QUEUE_NEXT_LISTENER(prevListener) = MyBackendId;
+ QUEUE_NEXT_LISTENER(MyProcNumber) = QUEUE_NEXT_LISTENER(prevListener);
+ QUEUE_NEXT_LISTENER(prevListener) = MyProcNumber;
}
else
{
- QUEUE_NEXT_LISTENER(MyBackendId) = QUEUE_FIRST_LISTENER;
- QUEUE_FIRST_LISTENER = MyBackendId;
+ QUEUE_NEXT_LISTENER(MyProcNumber) = QUEUE_FIRST_LISTENER;
+ QUEUE_FIRST_LISTENER = MyProcNumber;
}
LWLockRelease(NotifyQueueLock);
@@ -1248,23 +1243,23 @@ asyncQueueUnregister(void)
*/
LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
/* Mark our entry as invalid */
- QUEUE_BACKEND_PID(MyBackendId) = InvalidPid;
- QUEUE_BACKEND_DBOID(MyBackendId) = InvalidOid;
+ QUEUE_BACKEND_PID(MyProcNumber) = InvalidPid;
+ QUEUE_BACKEND_DBOID(MyProcNumber) = InvalidOid;
/* and remove it from the list */
- if (QUEUE_FIRST_LISTENER == MyBackendId)
- QUEUE_FIRST_LISTENER = QUEUE_NEXT_LISTENER(MyBackendId);
+ if (QUEUE_FIRST_LISTENER == MyProcNumber)
+ QUEUE_FIRST_LISTENER = QUEUE_NEXT_LISTENER(MyProcNumber);
else
{
- for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
+ for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
{
- if (QUEUE_NEXT_LISTENER(i) == MyBackendId)
+ if (QUEUE_NEXT_LISTENER(i) == MyProcNumber)
{
- QUEUE_NEXT_LISTENER(i) = QUEUE_NEXT_LISTENER(MyBackendId);
+ QUEUE_NEXT_LISTENER(i) = QUEUE_NEXT_LISTENER(MyProcNumber);
break;
}
}
}
- QUEUE_NEXT_LISTENER(MyBackendId) = InvalidBackendId;
+ QUEUE_NEXT_LISTENER(MyProcNumber) = INVALID_PROC_NUMBER;
LWLockRelease(NotifyQueueLock);
/* mark ourselves as no longer listed in the global array */
@@ -1549,7 +1544,7 @@ asyncQueueFillWarning(void)
QueuePosition min = QUEUE_HEAD;
int32 minPid = InvalidPid;
- for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
+ for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
{
Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
@@ -1580,7 +1575,7 @@ asyncQueueFillWarning(void)
* behind. Waken them anyway if they're far enough behind, so that they'll
* advance their queue position pointers, allowing the global tail to advance.
*
- * Since we know the BackendId and the Pid the signaling is quite cheap.
+ * Since we know the ProcNumber and the Pid the signaling is quite cheap.
*
* This is called during CommitTransaction(), so it's important for it
* to have very low probability of failure.
@@ -1589,7 +1584,7 @@ static void
SignalBackends(void)
{
int32 *pids;
- BackendId *ids;
+ ProcNumber *procnos;
int count;
/*
@@ -1601,11 +1596,11 @@ SignalBackends(void)
* preallocate the arrays? They're not that large, though.
*/
pids = (int32 *) palloc(MaxBackends * sizeof(int32));
- ids = (BackendId *) palloc(MaxBackends * sizeof(BackendId));
+ procnos = (ProcNumber *) palloc(MaxBackends * sizeof(ProcNumber));
count = 0;
LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
- for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
+ for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
{
int32 pid = QUEUE_BACKEND_PID(i);
QueuePosition pos;
@@ -1633,7 +1628,7 @@ SignalBackends(void)
}
/* OK, need to signal this one */
pids[count] = pid;
- ids[count] = i;
+ procnos[count] = i;
count++;
}
LWLockRelease(NotifyQueueLock);
@@ -1659,12 +1654,12 @@ SignalBackends(void)
* NotifyQueueLock; which is unlikely but certainly possible. So we
* just log a low-level debug message if it happens.
*/
- if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, ids[i]) < 0)
+ if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, procnos[i]) < 0)
elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
}
pfree(pids);
- pfree(ids);
+ pfree(procnos);
}
/*
@@ -1872,8 +1867,8 @@ asyncQueueReadAllNotifications(void)
/* Fetch current state */
LWLockAcquire(NotifyQueueLock, LW_SHARED);
/* Assert checks that we have a valid state entry */
- Assert(MyProcPid == QUEUE_BACKEND_PID(MyBackendId));
- pos = QUEUE_BACKEND_POS(MyBackendId);
+ Assert(MyProcPid == QUEUE_BACKEND_PID(MyProcNumber));
+ pos = QUEUE_BACKEND_POS(MyProcNumber);
head = QUEUE_HEAD;
LWLockRelease(NotifyQueueLock);
@@ -1995,7 +1990,7 @@ asyncQueueReadAllNotifications(void)
{
/* Update shared state */
LWLockAcquire(NotifyQueueLock, LW_SHARED);
- QUEUE_BACKEND_POS(MyBackendId) = pos;
+ QUEUE_BACKEND_POS(MyProcNumber) = pos;
LWLockRelease(NotifyQueueLock);
}
PG_END_TRY();
@@ -2142,7 +2137,7 @@ asyncQueueAdvanceTail(void)
*/
LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
min = QUEUE_HEAD;
- for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
+ for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
{
Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));