cgit diff view
path: src/backend/commands/async.c

Diffstat (limited to 'src/backend/commands/async.c'):
  src/backend/commands/async.c | 61 ++++++++++++++++++++++++++++-------------
  1 file changed, 43 insertions(+), 18 deletions(-)
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 490c84dc199..23444f2a800 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -116,7 +116,7 @@
* frontend during startup.) The above design guarantees that notifies from
* other backends will never be missed by ignoring self-notifies.
*
- * The amount of shared memory used for notify management (NUM_NOTIFY_BUFFERS)
+ * The amount of shared memory used for notify management (notify_buffers)
* can be varied without affecting anything but performance. The maximum
* amount of notification data that can be queued at one time is determined
* by max_notify_queue_pages GUC.
@@ -148,6 +148,7 @@
#include "storage/sinval.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
+#include "utils/guc_hooks.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/snapmgr.h"
@@ -234,7 +235,7 @@ typedef struct QueuePosition
*
* Resist the temptation to make this really large. While that would save
* work in some places, it would add cost in others. In particular, this
- * should likely be less than NUM_NOTIFY_BUFFERS, to ensure that backends
+ * should likely be less than notify_buffers, to ensure that backends
* catch up before the pages they'll need to read fall out of SLRU cache.
*/
#define QUEUE_CLEANUP_DELAY 4
@@ -266,9 +267,10 @@ typedef struct QueueBackendStatus
* both NotifyQueueLock and NotifyQueueTailLock in EXCLUSIVE mode, backends
* can change the tail pointers.
*
- * NotifySLRULock is used as the control lock for the pg_notify SLRU buffers.
+ * The SLRU buffer pool is divided into banks, and a bank-wise SLRU lock is used as
+ * the control lock for the pg_notify SLRU buffers.
* In order to avoid deadlocks, whenever we need multiple locks, we first get
- * NotifyQueueTailLock, then NotifyQueueLock, and lastly NotifySLRULock.
+ * NotifyQueueTailLock, then NotifyQueueLock, and lastly the SLRU bank lock.
*
* Each backend uses the backend[] array entry with index equal to its
* BackendId (which can range from 1 to MaxBackends). We rely on this to make
@@ -492,7 +494,7 @@ AsyncShmemSize(void)
size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
size = add_size(size, offsetof(AsyncQueueControl, backend));
- size = add_size(size, SimpleLruShmemSize(NUM_NOTIFY_BUFFERS, 0));
+ size = add_size(size, SimpleLruShmemSize(notify_buffers, 0));
return size;
}
@@ -541,8 +543,8 @@ AsyncShmemInit(void)
* names are used in order to avoid wraparound.
*/
NotifyCtl->PagePrecedes = asyncQueuePagePrecedes;
- SimpleLruInit(NotifyCtl, "notify", NUM_NOTIFY_BUFFERS, 0,
- NotifySLRULock, "pg_notify", LWTRANCHE_NOTIFY_BUFFER,
+ SimpleLruInit(NotifyCtl, "notify", notify_buffers, 0,
+ "pg_notify", LWTRANCHE_NOTIFY_BUFFER, LWTRANCHE_NOTIFY_SLRU,
SYNC_HANDLER_NONE, true);
if (!found)
@@ -1356,7 +1358,7 @@ asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
* Eventually we will return NULL indicating all is done.
*
* We are holding NotifyQueueLock already from the caller and grab
- * NotifySLRULock locally in this function.
+ * page specific SLRU bank lock locally in this function.
*/
static ListCell *
asyncQueueAddEntries(ListCell *nextNotify)
@@ -1366,9 +1368,7 @@ asyncQueueAddEntries(ListCell *nextNotify)
int64 pageno;
int offset;
int slotno;
-
- /* We hold both NotifyQueueLock and NotifySLRULock during this operation */
- LWLockAcquire(NotifySLRULock, LW_EXCLUSIVE);
+ LWLock *prevlock;
/*
* We work with a local copy of QUEUE_HEAD, which we write back to shared
@@ -1389,6 +1389,11 @@ asyncQueueAddEntries(ListCell *nextNotify)
* page should be initialized already, so just fetch it.
*/
pageno = QUEUE_POS_PAGE(queue_head);
+ prevlock = SimpleLruGetBankLock(NotifyCtl, pageno);
+
+ /* We hold both NotifyQueueLock and SLRU bank lock during this operation */
+ LWLockAcquire(prevlock, LW_EXCLUSIVE);
+
if (QUEUE_POS_IS_ZERO(queue_head))
slotno = SimpleLruZeroPage(NotifyCtl, pageno);
else
@@ -1434,6 +1439,17 @@ asyncQueueAddEntries(ListCell *nextNotify)
/* Advance queue_head appropriately, and detect if page is full */
if (asyncQueueAdvance(&(queue_head), qe.length))
{
+ LWLock *lock;
+
+ pageno = QUEUE_POS_PAGE(queue_head);
+ lock = SimpleLruGetBankLock(NotifyCtl, pageno);
+ if (lock != prevlock)
+ {
+ LWLockRelease(prevlock);
+ LWLockAcquire(lock, LW_EXCLUSIVE);
+ prevlock = lock;
+ }
+
/*
* Page is full, so we're done here, but first fill the next page
* with zeroes. The reason to do this is to ensure that slru.c's
@@ -1460,7 +1476,7 @@ asyncQueueAddEntries(ListCell *nextNotify)
/* Success, so update the global QUEUE_HEAD */
QUEUE_HEAD = queue_head;
- LWLockRelease(NotifySLRULock);
+ LWLockRelease(prevlock);
return nextNotify;
}
@@ -1931,9 +1947,9 @@ asyncQueueReadAllNotifications(void)
/*
* We copy the data from SLRU into a local buffer, so as to avoid
- * holding the NotifySLRULock while we are examining the entries
- * and possibly transmitting them to our frontend. Copy only the
- * part of the page we will actually inspect.
+ * holding the SLRU lock while we are examining the entries and
+ * possibly transmitting them to our frontend. Copy only the part
+ * of the page we will actually inspect.
*/
slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
InvalidTransactionId);
@@ -1953,7 +1969,7 @@ asyncQueueReadAllNotifications(void)
NotifyCtl->shared->page_buffer[slotno] + curoffset,
copysize);
/* Release lock that we got from SimpleLruReadPage_ReadOnly() */
- LWLockRelease(NotifySLRULock);
+ LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
/*
* Process messages up to the stop position, end of page, or an
@@ -1994,7 +2010,7 @@ asyncQueueReadAllNotifications(void)
*
* The current page must have been fetched into page_buffer from shared
* memory. (We could access the page right in shared memory, but that
- * would imply holding the NotifySLRULock throughout this routine.)
+ * would imply holding the SLRU bank lock throughout this routine.)
*
* We stop if we reach the "stop" position, or reach a notification from an
* uncommitted transaction, or reach the end of the page.
@@ -2147,7 +2163,7 @@ asyncQueueAdvanceTail(void)
if (asyncQueuePagePrecedes(oldtailpage, boundary))
{
/*
- * SimpleLruTruncate() will ask for NotifySLRULock but will also
+ * SimpleLruTruncate() will ask for SLRU bank locks but will also
* release the lock again.
*/
SimpleLruTruncate(NotifyCtl, newtailpage);
@@ -2378,3 +2394,12 @@ ClearPendingActionsAndNotifies(void)
pendingActions = NULL;
pendingNotifies = NULL;
}
+
+/*
+ * GUC check_hook for notify_buffers
+ */
+bool
+check_notify_buffers(int *newval, void **extra, GucSource source)
+{
+ return check_slru_buffers("notify_buffers", newval);
+}