Diffstat (limited to 'src/backend/commands/async.c')
-rw-r--r--  src/backend/commands/async.c  52
1 file changed, 39 insertions(+), 13 deletions(-)
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 8dbcace3f93..c0763c63e2e 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -255,7 +255,7 @@ typedef struct QueueBackendStatus
* When holding NotifyQueueLock in EXCLUSIVE mode, backends can inspect the
* entries of other backends and also change the head pointer. When holding
* both NotifyQueueLock and NotifyQueueTailLock in EXCLUSIVE mode, backends
- * can change the tail pointer.
+ * can change the tail pointers.
*
* NotifySLRULock is used as the control lock for the pg_notify SLRU buffers.
* In order to avoid deadlocks, whenever we need multiple locks, we first get
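(Aside: the hunk cuts the sentence off here, but the ordering it describes is visible in asyncQueueAdvanceTail() later in this diff: NotifyQueueTailLock is taken first, NotifyQueueLock second, and the SLRU control lock last, inside the SLRU routines. A minimal sketch, assuming the surrounding async.c declarations; the function name is hypothetical:)

    /* Hypothetical sketch of the multi-lock ordering that avoids deadlock:
     * NotifyQueueTailLock first, NotifyQueueLock second; NotifySLRULock is
     * acquired last, inside the SLRU routines themselves. */
    static void
    sketch_lock_ordering(void)
    {
        LWLockAcquire(NotifyQueueTailLock, LW_EXCLUSIVE);   /* outermost */
        LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);       /* second */
        /* ... inspect other backends' entries, move head/tail ... */
        LWLockRelease(NotifyQueueLock);
        /* SLRU calls such as SimpleLruTruncate() take NotifySLRULock
         * internally, while only the tail lock is still held. */
        LWLockRelease(NotifyQueueTailLock);
    }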
@@ -276,6 +276,8 @@ typedef struct AsyncQueueControl
QueuePosition head; /* head points to the next free location */
QueuePosition tail; /* tail must be <= the queue position of every
* listening backend */
+ int stopPage; /* oldest unrecycled page; must be <=
+ * tail.page */
BackendId firstListener; /* id of first listener, or InvalidBackendId */
TimestampTz lastQueueFillWarn; /* time of last queue-full msg */
QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
@@ -286,6 +288,7 @@ static AsyncQueueControl *asyncQueueControl;
#define QUEUE_HEAD (asyncQueueControl->head)
#define QUEUE_TAIL (asyncQueueControl->tail)
+#define QUEUE_STOP_PAGE (asyncQueueControl->stopPage)
#define QUEUE_FIRST_LISTENER (asyncQueueControl->firstListener)
#define QUEUE_BACKEND_PID(i) (asyncQueueControl->backend[i].pid)
#define QUEUE_BACKEND_DBOID(i) (asyncQueueControl->backend[i].dboid)
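(For orientation, the intended ordering of the three queue positions, per the struct comments above, can be written with these macros as a hypothetical assertion. This helper is not part of the patch; asyncQueuePagePrecedes() is the wraparound-aware page comparison this file already uses:)

    /* Hypothetical invariant check: physical tail <= logical tail <= head,
     * in wraparound-aware page order. */
    static void
    sketch_check_queue_order(void)
    {
        int     stopPage = QUEUE_STOP_PAGE;
        int     tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
        int     headPage = QUEUE_POS_PAGE(QUEUE_HEAD);

        Assert(stopPage == tailPage ||
               asyncQueuePagePrecedes(stopPage, tailPage));
        Assert(tailPage == headPage ||
               asyncQueuePagePrecedes(tailPage, headPage));
    }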
@@ -537,6 +540,7 @@ AsyncShmemInit(void)
/* First time through, so initialize it */
SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
SET_QUEUE_POS(QUEUE_TAIL, 0, 0);
+ QUEUE_STOP_PAGE = 0;
QUEUE_FIRST_LISTENER = InvalidBackendId;
asyncQueueControl->lastQueueFillWarn = 0;
/* zero'th entry won't be used, but let's initialize it anyway */
@@ -1358,7 +1362,7 @@ asyncQueueIsFull(void)
nexthead = QUEUE_POS_PAGE(QUEUE_HEAD) + 1;
if (nexthead > QUEUE_MAX_PAGE)
nexthead = 0; /* wrap around */
- boundary = QUEUE_POS_PAGE(QUEUE_TAIL);
+ boundary = QUEUE_STOP_PAGE;
boundary -= boundary % SLRU_PAGES_PER_SEGMENT;
return asyncQueuePagePrecedes(nexthead, boundary);
}
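(To see what the rounding step buys, here is a stand-alone arithmetic example. SLRU_PAGES_PER_SEGMENT is assumed to be 32, its usual value:)

    #include <stdio.h>

    #define SLRU_PAGES_PER_SEGMENT 32   /* assumed value, for illustration */

    int
    main(void)
    {
        int stopPage = 70;              /* hypothetical QUEUE_STOP_PAGE */
        int boundary = stopPage - stopPage % SLRU_PAGES_PER_SEGMENT;

        /* 70 % 32 == 6, so boundary == 64: the first page of the SLRU
         * segment containing the oldest unrecycled page.  The queue
         * reports "full" before the head can wrap into that segment,
         * because segments are removed as whole files. */
        printf("boundary = %d\n", boundary);    /* prints: boundary = 64 */
        return 0;
    }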
@@ -1572,6 +1576,11 @@ pg_notification_queue_usage(PG_FUNCTION_ARGS)
* Return the fraction of the queue that is currently occupied.
*
* The caller must hold NotifyQueueLock in (at least) shared mode.
+ *
+ * Note: we measure the distance to the logical tail page, not the physical
+ * tail page. In some sense that's wrong, but the relative position of the
+ * physical tail is affected by details such as SLRU segment boundaries,
+ * so a result based on it is unpleasantly unstable.
*/
static double
asyncQueueUsage(void)
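(The function body falls outside the hunk; the following is a sketch consistent with the note above, measuring pages from the logical tail to the head. Illustrative only, not the committed body:)

    /* Sketch: occupied fraction measured against QUEUE_TAIL (the logical
     * tail), not QUEUE_STOP_PAGE (the physical tail), per the note above. */
    static double
    sketch_queue_usage(void)
    {
        int     headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
        int     tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
        int     occupied = headPage - tailPage;

        if (occupied < 0)
            occupied += QUEUE_MAX_PAGE + 1;     /* head has wrapped */
        if (occupied == 0)
            return 0.0;
        /* Scaling assumption: with wraparound page comparisons, only
         * about half the page space can be in use at once. */
        return (double) occupied / (double) ((QUEUE_MAX_PAGE + 1) / 2);
    }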
@@ -2178,7 +2187,23 @@ asyncQueueAdvanceTail(void)
/* Restrict task to one backend per cluster; see SimpleLruTruncate(). */
LWLockAcquire(NotifyQueueTailLock, LW_EXCLUSIVE);
- /* Compute the new tail. */
+ /*
+ * Compute the new tail. Pre-v13, it's essential that QUEUE_TAIL be exact
+ * (ie, exactly match at least one backend's queue position), so it must
+ * be updated atomically with the actual computation. Since v13, we could
+ * get away with not doing it like that, but it seems prudent to keep it
+ * so.
+ *
+ * Also, because incoming backends will scan forward from QUEUE_TAIL, that
+ * must be advanced before we can truncate any data. Thus, QUEUE_TAIL is
+ * the logical tail, while QUEUE_STOP_PAGE is the physical tail, or oldest
+ * un-truncated page. When QUEUE_STOP_PAGE != QUEUE_POS_PAGE(QUEUE_TAIL),
+ * there are pages we can truncate but haven't yet finished doing so.
+ *
+ * For concurrency's sake, we don't want to hold NotifyQueueLock while
+ * performing SimpleLruTruncate. This is OK because no backend will try
+ * to access the pages we are in the midst of truncating.
+ */
LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
min = QUEUE_HEAD;
for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
@@ -2186,7 +2211,8 @@ asyncQueueAdvanceTail(void)
Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
}
- oldtailpage = QUEUE_POS_PAGE(QUEUE_TAIL);
+ QUEUE_TAIL = min;
+ oldtailpage = QUEUE_STOP_PAGE;
LWLockRelease(NotifyQueueLock);
/*
@@ -2205,16 +2231,16 @@ asyncQueueAdvanceTail(void)
* release the lock again.
*/
SimpleLruTruncate(NotifyCtl, newtailpage);
- }
- /*
- * Advertise the new tail. This changes asyncQueueIsFull()'s verdict for
- * the segment immediately prior to the new tail, allowing fresh data into
- * that segment.
- */
- LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
- QUEUE_TAIL = min;
- LWLockRelease(NotifyQueueLock);
+ /*
+ * Update QUEUE_STOP_PAGE. This changes asyncQueueIsFull()'s verdict
+ * for the segment immediately prior to the old tail, allowing fresh
+ * data into that segment.
+ */
+ LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+ QUEUE_STOP_PAGE = newtailpage;
+ LWLockRelease(NotifyQueueLock);
+ }
LWLockRelease(NotifyQueueTailLock);
}
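(Taken together, the patched asyncQueueAdvanceTail() has a three-step shape: advance the logical tail under NotifyQueueLock, truncate SLRU segments without it, then publish the new physical tail. Below is a condensed sketch of that control flow; the newtailpage computation, which the diff elides, is reconstructed on the assumption that it mirrors the segment rounding in asyncQueueIsFull():)

    /* Condensed sketch of the new control flow, not the verbatim patch. */
    static void
    sketch_advance_tail(void)
    {
        QueuePosition min;
        int           oldtailpage, newtailpage, boundary;

        LWLockAcquire(NotifyQueueTailLock, LW_EXCLUSIVE);

        /* Step 1: advance the logical tail first, so incoming listeners
         * scanning forward from QUEUE_TAIL never enter doomed pages. */
        LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
        min = QUEUE_HEAD;
        for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
            min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
        QUEUE_TAIL = min;
        oldtailpage = QUEUE_STOP_PAGE;
        LWLockRelease(NotifyQueueLock);

        /* Step 2: truncate only if a whole SLRU segment became free
         * (boundary arithmetic assumed; the diff elides this part),
         * and do it without holding NotifyQueueLock. */
        newtailpage = QUEUE_POS_PAGE(min);
        boundary = newtailpage - newtailpage % SLRU_PAGES_PER_SEGMENT;
        if (asyncQueuePagePrecedes(oldtailpage, boundary))
        {
            SimpleLruTruncate(NotifyCtl, newtailpage);

            /* Step 3: publish the new physical tail; only now may
             * fresh data re-enter the freed segment. */
            LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
            QUEUE_STOP_PAGE = newtailpage;
            LWLockRelease(NotifyQueueLock);
        }

        LWLockRelease(NotifyQueueTailLock);
    }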