diff options
Diffstat (limited to 'src/backend/commands/async.c')
-rw-r--r-- | src/backend/commands/async.c | 89 |
1 files changed, 42 insertions, 47 deletions
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index 23444f2a800..bbea5abd27a 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -247,7 +247,7 @@ typedef struct QueueBackendStatus { int32 pid; /* either a PID or InvalidPid */ Oid dboid; /* backend's database OID, or InvalidOid */ - BackendId nextListener; /* id of next listener, or InvalidBackendId */ + ProcNumber nextListener; /* id of next listener, or INVALID_PROC_NUMBER */ QueuePosition pos; /* backend has read queue up to here */ } QueueBackendStatus; @@ -273,13 +273,12 @@ typedef struct QueueBackendStatus * NotifyQueueTailLock, then NotifyQueueLock, and lastly SLRU bank lock. * * Each backend uses the backend[] array entry with index equal to its - * BackendId (which can range from 1 to MaxBackends). We rely on this to make - * SendProcSignal fast. + * ProcNumber. We rely on this to make SendProcSignal fast. * * The backend[] array entries for actively-listening backends are threaded * together using firstListener and the nextListener links, so that we can * scan them without having to iterate over inactive entries. We keep this - * list in order by BackendId so that the scan is cache-friendly when there + * list in order by ProcNumber so that the scan is cache-friendly when there * are many active entries. */ typedef struct AsyncQueueControl @@ -289,10 +288,10 @@ typedef struct AsyncQueueControl * listening backend */ int stopPage; /* oldest unrecycled page; must be <= * tail.page */ - BackendId firstListener; /* id of first listener, or InvalidBackendId */ + ProcNumber firstListener; /* id of first listener, or + * INVALID_PROC_NUMBER */ TimestampTz lastQueueFillWarn; /* time of last queue-full msg */ QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER]; - /* backend[0] is not used; used entries are from [1] to [MaxBackends] */ } AsyncQueueControl; static AsyncQueueControl *asyncQueueControl; @@ -491,7 +490,7 @@ AsyncShmemSize(void) Size size; /* This had better match AsyncShmemInit */ - size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus)); + size = mul_size(MaxBackends, sizeof(QueueBackendStatus)); size = add_size(size, offsetof(AsyncQueueControl, backend)); size = add_size(size, SimpleLruShmemSize(notify_buffers, 0)); @@ -510,11 +509,8 @@ AsyncShmemInit(void) /* * Create or attach to the AsyncQueueControl structure. - * - * The used entries in the backend[] array run from 1 to MaxBackends; the - * zero'th entry is unused but must be allocated. */ - size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus)); + size = mul_size(MaxBackends, sizeof(QueueBackendStatus)); size = add_size(size, offsetof(AsyncQueueControl, backend)); asyncQueueControl = (AsyncQueueControl *) @@ -526,14 +522,13 @@ AsyncShmemInit(void) SET_QUEUE_POS(QUEUE_HEAD, 0, 0); SET_QUEUE_POS(QUEUE_TAIL, 0, 0); QUEUE_STOP_PAGE = 0; - QUEUE_FIRST_LISTENER = InvalidBackendId; + QUEUE_FIRST_LISTENER = INVALID_PROC_NUMBER; asyncQueueControl->lastQueueFillWarn = 0; - /* zero'th entry won't be used, but let's initialize it anyway */ - for (int i = 0; i <= MaxBackends; i++) + for (int i = 0; i < MaxBackends; i++) { QUEUE_BACKEND_PID(i) = InvalidPid; QUEUE_BACKEND_DBOID(i) = InvalidOid; - QUEUE_NEXT_LISTENER(i) = InvalidBackendId; + QUEUE_NEXT_LISTENER(i) = INVALID_PROC_NUMBER; SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0); } } @@ -1050,7 +1045,7 @@ Exec_ListenPreCommit(void) { QueuePosition head; QueuePosition max; - BackendId prevListener; + ProcNumber prevListener; /* * Nothing to do if we are already listening to something, nor if we @@ -1095,28 +1090,28 @@ Exec_ListenPreCommit(void) LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); head = QUEUE_HEAD; max = QUEUE_TAIL; - prevListener = InvalidBackendId; - for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i)) + prevListener = INVALID_PROC_NUMBER; + for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i)) { if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId) max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i)); /* Also find last listening backend before this one */ - if (i < MyBackendId) + if (i < MyProcNumber) prevListener = i; } - QUEUE_BACKEND_POS(MyBackendId) = max; - QUEUE_BACKEND_PID(MyBackendId) = MyProcPid; - QUEUE_BACKEND_DBOID(MyBackendId) = MyDatabaseId; + QUEUE_BACKEND_POS(MyProcNumber) = max; + QUEUE_BACKEND_PID(MyProcNumber) = MyProcPid; + QUEUE_BACKEND_DBOID(MyProcNumber) = MyDatabaseId; /* Insert backend into list of listeners at correct position */ - if (prevListener > 0) + if (prevListener != INVALID_PROC_NUMBER) { - QUEUE_NEXT_LISTENER(MyBackendId) = QUEUE_NEXT_LISTENER(prevListener); - QUEUE_NEXT_LISTENER(prevListener) = MyBackendId; + QUEUE_NEXT_LISTENER(MyProcNumber) = QUEUE_NEXT_LISTENER(prevListener); + QUEUE_NEXT_LISTENER(prevListener) = MyProcNumber; } else { - QUEUE_NEXT_LISTENER(MyBackendId) = QUEUE_FIRST_LISTENER; - QUEUE_FIRST_LISTENER = MyBackendId; + QUEUE_NEXT_LISTENER(MyProcNumber) = QUEUE_FIRST_LISTENER; + QUEUE_FIRST_LISTENER = MyProcNumber; } LWLockRelease(NotifyQueueLock); @@ -1248,23 +1243,23 @@ asyncQueueUnregister(void) */ LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); /* Mark our entry as invalid */ - QUEUE_BACKEND_PID(MyBackendId) = InvalidPid; - QUEUE_BACKEND_DBOID(MyBackendId) = InvalidOid; + QUEUE_BACKEND_PID(MyProcNumber) = InvalidPid; + QUEUE_BACKEND_DBOID(MyProcNumber) = InvalidOid; /* and remove it from the list */ - if (QUEUE_FIRST_LISTENER == MyBackendId) - QUEUE_FIRST_LISTENER = QUEUE_NEXT_LISTENER(MyBackendId); + if (QUEUE_FIRST_LISTENER == MyProcNumber) + QUEUE_FIRST_LISTENER = QUEUE_NEXT_LISTENER(MyProcNumber); else { - for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i)) + for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i)) { - if (QUEUE_NEXT_LISTENER(i) == MyBackendId) + if (QUEUE_NEXT_LISTENER(i) == MyProcNumber) { - QUEUE_NEXT_LISTENER(i) = QUEUE_NEXT_LISTENER(MyBackendId); + QUEUE_NEXT_LISTENER(i) = QUEUE_NEXT_LISTENER(MyProcNumber); break; } } } - QUEUE_NEXT_LISTENER(MyBackendId) = InvalidBackendId; + QUEUE_NEXT_LISTENER(MyProcNumber) = INVALID_PROC_NUMBER; LWLockRelease(NotifyQueueLock); /* mark ourselves as no longer listed in the global array */ @@ -1549,7 +1544,7 @@ asyncQueueFillWarning(void) QueuePosition min = QUEUE_HEAD; int32 minPid = InvalidPid; - for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i)) + for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i)) { Assert(QUEUE_BACKEND_PID(i) != InvalidPid); min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i)); @@ -1580,7 +1575,7 @@ asyncQueueFillWarning(void) * behind. Waken them anyway if they're far enough behind, so that they'll * advance their queue position pointers, allowing the global tail to advance. * - * Since we know the BackendId and the Pid the signaling is quite cheap. + * Since we know the ProcNumber and the Pid the signaling is quite cheap. * * This is called during CommitTransaction(), so it's important for it * to have very low probability of failure. @@ -1589,7 +1584,7 @@ static void SignalBackends(void) { int32 *pids; - BackendId *ids; + ProcNumber *procnos; int count; /* @@ -1601,11 +1596,11 @@ SignalBackends(void) * preallocate the arrays? They're not that large, though. */ pids = (int32 *) palloc(MaxBackends * sizeof(int32)); - ids = (BackendId *) palloc(MaxBackends * sizeof(BackendId)); + procnos = (ProcNumber *) palloc(MaxBackends * sizeof(ProcNumber)); count = 0; LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); - for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i)) + for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i)) { int32 pid = QUEUE_BACKEND_PID(i); QueuePosition pos; @@ -1633,7 +1628,7 @@ SignalBackends(void) } /* OK, need to signal this one */ pids[count] = pid; - ids[count] = i; + procnos[count] = i; count++; } LWLockRelease(NotifyQueueLock); @@ -1659,12 +1654,12 @@ SignalBackends(void) * NotifyQueueLock; which is unlikely but certainly possible. So we * just log a low-level debug message if it happens. */ - if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, ids[i]) < 0) + if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, procnos[i]) < 0) elog(DEBUG3, "could not signal backend with PID %d: %m", pid); } pfree(pids); - pfree(ids); + pfree(procnos); } /* @@ -1872,8 +1867,8 @@ asyncQueueReadAllNotifications(void) /* Fetch current state */ LWLockAcquire(NotifyQueueLock, LW_SHARED); /* Assert checks that we have a valid state entry */ - Assert(MyProcPid == QUEUE_BACKEND_PID(MyBackendId)); - pos = QUEUE_BACKEND_POS(MyBackendId); + Assert(MyProcPid == QUEUE_BACKEND_PID(MyProcNumber)); + pos = QUEUE_BACKEND_POS(MyProcNumber); head = QUEUE_HEAD; LWLockRelease(NotifyQueueLock); @@ -1995,7 +1990,7 @@ asyncQueueReadAllNotifications(void) { /* Update shared state */ LWLockAcquire(NotifyQueueLock, LW_SHARED); - QUEUE_BACKEND_POS(MyBackendId) = pos; + QUEUE_BACKEND_POS(MyProcNumber) = pos; LWLockRelease(NotifyQueueLock); } PG_END_TRY(); @@ -2142,7 +2137,7 @@ asyncQueueAdvanceTail(void) */ LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); min = QUEUE_HEAD; - for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i)) + for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i)) { Assert(QUEUE_BACKEND_PID(i) != InvalidPid); min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i)); |