diff options
Diffstat (limited to 'src/backend/commands/async.c')
-rw-r--r-- | src/backend/commands/async.c | 61 |
1 files changed, 43 insertions, 18 deletions
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index 490c84dc199..23444f2a800 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -116,7 +116,7 @@ * frontend during startup.) The above design guarantees that notifies from * other backends will never be missed by ignoring self-notifies. * - * The amount of shared memory used for notify management (NUM_NOTIFY_BUFFERS) + * The amount of shared memory used for notify management (notify_buffers) * can be varied without affecting anything but performance. The maximum * amount of notification data that can be queued at one time is determined * by max_notify_queue_pages GUC. @@ -148,6 +148,7 @@ #include "storage/sinval.h" #include "tcop/tcopprot.h" #include "utils/builtins.h" +#include "utils/guc_hooks.h" #include "utils/memutils.h" #include "utils/ps_status.h" #include "utils/snapmgr.h" @@ -234,7 +235,7 @@ typedef struct QueuePosition * * Resist the temptation to make this really large. While that would save * work in some places, it would add cost in others. In particular, this - * should likely be less than NUM_NOTIFY_BUFFERS, to ensure that backends + * should likely be less than notify_buffers, to ensure that backends * catch up before the pages they'll need to read fall out of SLRU cache. */ #define QUEUE_CLEANUP_DELAY 4 @@ -266,9 +267,10 @@ typedef struct QueueBackendStatus * both NotifyQueueLock and NotifyQueueTailLock in EXCLUSIVE mode, backends * can change the tail pointers. * - * NotifySLRULock is used as the control lock for the pg_notify SLRU buffers. + * SLRU buffer pool is divided in banks and bank wise SLRU lock is used as + * the control lock for the pg_notify SLRU buffers. * In order to avoid deadlocks, whenever we need multiple locks, we first get - * NotifyQueueTailLock, then NotifyQueueLock, and lastly NotifySLRULock. + * NotifyQueueTailLock, then NotifyQueueLock, and lastly SLRU bank lock. * * Each backend uses the backend[] array entry with index equal to its * BackendId (which can range from 1 to MaxBackends). We rely on this to make @@ -492,7 +494,7 @@ AsyncShmemSize(void) size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus)); size = add_size(size, offsetof(AsyncQueueControl, backend)); - size = add_size(size, SimpleLruShmemSize(NUM_NOTIFY_BUFFERS, 0)); + size = add_size(size, SimpleLruShmemSize(notify_buffers, 0)); return size; } @@ -541,8 +543,8 @@ AsyncShmemInit(void) * names are used in order to avoid wraparound. */ NotifyCtl->PagePrecedes = asyncQueuePagePrecedes; - SimpleLruInit(NotifyCtl, "notify", NUM_NOTIFY_BUFFERS, 0, - NotifySLRULock, "pg_notify", LWTRANCHE_NOTIFY_BUFFER, + SimpleLruInit(NotifyCtl, "notify", notify_buffers, 0, + "pg_notify", LWTRANCHE_NOTIFY_BUFFER, LWTRANCHE_NOTIFY_SLRU, SYNC_HANDLER_NONE, true); if (!found) @@ -1356,7 +1358,7 @@ asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe) * Eventually we will return NULL indicating all is done. * * We are holding NotifyQueueLock already from the caller and grab - * NotifySLRULock locally in this function. + * page specific SLRU bank lock locally in this function. */ static ListCell * asyncQueueAddEntries(ListCell *nextNotify) @@ -1366,9 +1368,7 @@ asyncQueueAddEntries(ListCell *nextNotify) int64 pageno; int offset; int slotno; - - /* We hold both NotifyQueueLock and NotifySLRULock during this operation */ - LWLockAcquire(NotifySLRULock, LW_EXCLUSIVE); + LWLock *prevlock; /* * We work with a local copy of QUEUE_HEAD, which we write back to shared @@ -1389,6 +1389,11 @@ asyncQueueAddEntries(ListCell *nextNotify) * page should be initialized already, so just fetch it. */ pageno = QUEUE_POS_PAGE(queue_head); + prevlock = SimpleLruGetBankLock(NotifyCtl, pageno); + + /* We hold both NotifyQueueLock and SLRU bank lock during this operation */ + LWLockAcquire(prevlock, LW_EXCLUSIVE); + if (QUEUE_POS_IS_ZERO(queue_head)) slotno = SimpleLruZeroPage(NotifyCtl, pageno); else @@ -1434,6 +1439,17 @@ asyncQueueAddEntries(ListCell *nextNotify) /* Advance queue_head appropriately, and detect if page is full */ if (asyncQueueAdvance(&(queue_head), qe.length)) { + LWLock *lock; + + pageno = QUEUE_POS_PAGE(queue_head); + lock = SimpleLruGetBankLock(NotifyCtl, pageno); + if (lock != prevlock) + { + LWLockRelease(prevlock); + LWLockAcquire(lock, LW_EXCLUSIVE); + prevlock = lock; + } + /* * Page is full, so we're done here, but first fill the next page * with zeroes. The reason to do this is to ensure that slru.c's @@ -1460,7 +1476,7 @@ asyncQueueAddEntries(ListCell *nextNotify) /* Success, so update the global QUEUE_HEAD */ QUEUE_HEAD = queue_head; - LWLockRelease(NotifySLRULock); + LWLockRelease(prevlock); return nextNotify; } @@ -1931,9 +1947,9 @@ asyncQueueReadAllNotifications(void) /* * We copy the data from SLRU into a local buffer, so as to avoid - * holding the NotifySLRULock while we are examining the entries - * and possibly transmitting them to our frontend. Copy only the - * part of the page we will actually inspect. + * holding the SLRU lock while we are examining the entries and + * possibly transmitting them to our frontend. Copy only the part + * of the page we will actually inspect. */ slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage, InvalidTransactionId); @@ -1953,7 +1969,7 @@ asyncQueueReadAllNotifications(void) NotifyCtl->shared->page_buffer[slotno] + curoffset, copysize); /* Release lock that we got from SimpleLruReadPage_ReadOnly() */ - LWLockRelease(NotifySLRULock); + LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage)); /* * Process messages up to the stop position, end of page, or an @@ -1994,7 +2010,7 @@ asyncQueueReadAllNotifications(void) * * The current page must have been fetched into page_buffer from shared * memory. (We could access the page right in shared memory, but that - * would imply holding the NotifySLRULock throughout this routine.) + * would imply holding the SLRU bank lock throughout this routine.) * * We stop if we reach the "stop" position, or reach a notification from an * uncommitted transaction, or reach the end of the page. @@ -2147,7 +2163,7 @@ asyncQueueAdvanceTail(void) if (asyncQueuePagePrecedes(oldtailpage, boundary)) { /* - * SimpleLruTruncate() will ask for NotifySLRULock but will also + * SimpleLruTruncate() will ask for SLRU bank locks but will also * release the lock again. */ SimpleLruTruncate(NotifyCtl, newtailpage); @@ -2378,3 +2394,12 @@ ClearPendingActionsAndNotifies(void) pendingActions = NULL; pendingNotifies = NULL; } + +/* + * GUC check_hook for notify_buffers + */ +bool +check_notify_buffers(int *newval, void **extra, GucSource source) +{ + return check_slru_buffers("notify_buffers", newval); +} |