Diffstat (limited to 'src/backend/access')
 src/backend/access/transam/clog.c      | 243
 src/backend/access/transam/commit_ts.c |  88
 src/backend/access/transam/multixact.c | 190
 src/backend/access/transam/slru.c      | 357
 src/backend/access/transam/subtrans.c  | 110
 5 files changed, 739 insertions(+), 249 deletions(-)
diff --git a/src/backend/access/transam/clog.c b/src/backend/access/transam/clog.c index 34f079cbb14..a787b374dac 100644 --- a/src/backend/access/transam/clog.c +++ b/src/backend/access/transam/clog.c @@ -3,12 +3,13 @@ * clog.c * PostgreSQL transaction-commit-log manager * - * This module replaces the old "pg_log" access code, which treated pg_log - * essentially like a relation, in that it went through the regular buffer - * manager. The problem with that was that there wasn't any good way to - * recycle storage space for transactions so old that they'll never be - * looked up again. Now we use specialized access code so that the commit - * log can be broken into relatively small, independent segments. + * This module stores two bits per transaction regarding its commit/abort + * status; the status for four transactions fit in a byte. + * + * This would be a pretty simple abstraction on top of slru.c, except that + * for performance reasons we allow multiple transactions that are + * committing concurrently to form a queue, so that a single process can + * update the status for all of them within a single lock acquisition run. * * XLOG interactions: this module generates an XLOG record whenever a new * CLOG page is initialized to zeroes. Other writes of CLOG come from @@ -43,6 +44,7 @@ #include "pgstat.h" #include "storage/proc.h" #include "storage/sync.h" +#include "utils/guc_hooks.h" /* * Defines for CLOG page sizes. A page is the same BLCKSZ as is used @@ -62,6 +64,15 @@ #define CLOG_XACTS_PER_PAGE (BLCKSZ * CLOG_XACTS_PER_BYTE) #define CLOG_XACT_BITMASK ((1 << CLOG_BITS_PER_XACT) - 1) +/* + * Because space used in CLOG by each transaction is so small, we place a + * smaller limit on the number of CLOG buffers than SLRU allows. No other + * SLRU needs this. + */ +#define CLOG_MAX_ALLOWED_BUFFERS \ + Min(SLRU_MAX_ALLOWED_BUFFERS, \ + (((MaxTransactionId / 2) + (CLOG_XACTS_PER_PAGE - 1)) / CLOG_XACTS_PER_PAGE)) + /* * Although we return an int64 the actual value can't currently exceed @@ -284,15 +295,20 @@ TransactionIdSetPageStatus(TransactionId xid, int nsubxids, XLogRecPtr lsn, int64 pageno, bool all_xact_same_page) { + LWLock *lock; + /* Can't use group update when PGPROC overflows. */ StaticAssertDecl(THRESHOLD_SUBTRANS_CLOG_OPT <= PGPROC_MAX_CACHED_SUBXIDS, "group clog threshold less than PGPROC cached subxids"); + /* Get the SLRU bank lock for the page we are going to access. */ + lock = SimpleLruGetBankLock(XactCtl, pageno); + /* - * When there is contention on XactSLRULock, we try to group multiple - * updates; a single leader process will perform transaction status - * updates for multiple backends so that the number of times XactSLRULock - * needs to be acquired is reduced. + * When there is contention on the SLRU bank lock we need, we try to group + * multiple updates; a single leader process will perform transaction + * status updates for multiple backends so that the number of times the + * bank lock needs to be acquired is reduced. * * For this optimization to be safe, the XID and subxids in MyProc must be * the same as the ones for which we're setting the status. Check that @@ -310,17 +326,17 @@ TransactionIdSetPageStatus(TransactionId xid, int nsubxids, nsubxids * sizeof(TransactionId)) == 0)) { /* - * If we can immediately acquire XactSLRULock, we update the status of - * our own XID and release the lock. If not, try use group XID - * update. If that doesn't work out, fall back to waiting for the - * lock to perform an update for this transaction only. 
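/*
 * (Sketch, not part of the patch text: SimpleLruGetBankLock(), called above,
 * is presumably a small inline helper defined outside this diff -- which is
 * limited to src/backend/access -- along these lines: the low bits of the
 * page number select a bank, and each bank has its own LWLock.)
 */
static inline LWLock *
SimpleLruGetBankLock(SlruCtl ctl, int64 pageno)
{
	int		bankno = pageno & ctl->bank_mask;

	return &(ctl->shared->bank_locks[bankno].lock);
}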
+ * If we can immediately acquire the lock, we update the status of our + * own XID and release the lock. If not, try use group XID update. If + * that doesn't work out, fall back to waiting for the lock to perform + * an update for this transaction only. */ - if (LWLockConditionalAcquire(XactSLRULock, LW_EXCLUSIVE)) + if (LWLockConditionalAcquire(lock, LW_EXCLUSIVE)) { /* Got the lock without waiting! Do the update. */ TransactionIdSetPageStatusInternal(xid, nsubxids, subxids, status, lsn, pageno); - LWLockRelease(XactSLRULock); + LWLockRelease(lock); return; } else if (TransactionGroupUpdateXidStatus(xid, status, lsn, pageno)) @@ -333,10 +349,10 @@ TransactionIdSetPageStatus(TransactionId xid, int nsubxids, } /* Group update not applicable, or couldn't accept this page number. */ - LWLockAcquire(XactSLRULock, LW_EXCLUSIVE); + LWLockAcquire(lock, LW_EXCLUSIVE); TransactionIdSetPageStatusInternal(xid, nsubxids, subxids, status, lsn, pageno); - LWLockRelease(XactSLRULock); + LWLockRelease(lock); } /* @@ -355,7 +371,8 @@ TransactionIdSetPageStatusInternal(TransactionId xid, int nsubxids, Assert(status == TRANSACTION_STATUS_COMMITTED || status == TRANSACTION_STATUS_ABORTED || (status == TRANSACTION_STATUS_SUB_COMMITTED && !TransactionIdIsValid(xid))); - Assert(LWLockHeldByMeInMode(XactSLRULock, LW_EXCLUSIVE)); + Assert(LWLockHeldByMeInMode(SimpleLruGetBankLock(XactCtl, pageno), + LW_EXCLUSIVE)); /* * If we're doing an async commit (ie, lsn is valid), then we must wait @@ -406,14 +423,15 @@ TransactionIdSetPageStatusInternal(TransactionId xid, int nsubxids, } /* - * When we cannot immediately acquire XactSLRULock in exclusive mode at + * Subroutine for TransactionIdSetPageStatus, q.v. + * + * When we cannot immediately acquire the SLRU bank lock in exclusive mode at * commit time, add ourselves to a list of processes that need their XIDs * status update. The first process to add itself to the list will acquire - * XactSLRULock in exclusive mode and set transaction status as required - * on behalf of all group members. This avoids a great deal of contention - * around XactSLRULock when many processes are trying to commit at once, - * since the lock need not be repeatedly handed off from one committing - * process to the next. + * the lock in exclusive mode and set transaction status as required on behalf + * of all group members. This avoids a great deal of contention when many + * processes are trying to commit at once, since the lock need not be + * repeatedly handed off from one committing process to the next. * * Returns true when transaction status has been updated in clog; returns * false if we decided against applying the optimization because the page @@ -425,16 +443,17 @@ TransactionGroupUpdateXidStatus(TransactionId xid, XidStatus status, { volatile PROC_HDR *procglobal = ProcGlobal; PGPROC *proc = MyProc; - int pgprocno = MyProcNumber; uint32 nextidx; uint32 wakeidx; + int prevpageno; + LWLock *prevlock = NULL; /* We should definitely have an XID whose status needs to be updated. */ Assert(TransactionIdIsValid(xid)); /* - * Add ourselves to the list of processes needing a group XID status - * update. + * Prepare to add ourselves to the list of processes needing a group XID + * status update. 
*/ proc->clogGroupMember = true; proc->clogGroupMemberXid = xid; @@ -442,6 +461,29 @@ TransactionGroupUpdateXidStatus(TransactionId xid, XidStatus status, proc->clogGroupMemberPage = pageno; proc->clogGroupMemberLsn = lsn; + /* + * We put ourselves in the queue by writing MyProcNumber to + * ProcGlobal->clogGroupFirst. However, if there's already a process + * listed there, we compare our pageno with that of that process; if it + * differs, we cannot participate in the group, so we return for caller to + * update pg_xact in the normal way. + * + * If we're not the first process in the list, we must follow the leader. + * We do this by storing the data we want updated in our PGPROC entry + * where the leader can find it, then going to sleep. + * + * If no process is already in the list, we're the leader; our first step + * is to lock the SLRU bank to which our page belongs, then we close out + * the group by resetting the list pointer from ProcGlobal->clogGroupFirst + * (this lets other processes set up other groups later); finally we do + * the SLRU updates, release the SLRU bank lock, and wake up the sleeping + * processes. + * + * If another group starts to update a page in a different SLRU bank, they + * can proceed concurrently, since the bank lock they're going to use is + * different from ours. If another group starts to update a page in the + * same bank as ours, they wait until we release the lock. + */ nextidx = pg_atomic_read_u32(&procglobal->clogGroupFirst); while (true) @@ -453,10 +495,11 @@ TransactionGroupUpdateXidStatus(TransactionId xid, XidStatus status, * There is a race condition here, which is that after doing the below * check and before adding this proc's clog update to a group, the * group leader might have already finished the group update for this - * page and becomes group leader of another group. This will lead to a - * situation where a single group can have different clog page - * updates. This isn't likely and will still work, just maybe a bit - * less efficiently. + * page and becomes group leader of another group, updating a + * different page. This will lead to a situation where a single group + * can have different clog page updates. This isn't likely and will + * still work, just less efficiently -- we handle this case by + * switching to a different bank lock in the loop below. */ if (nextidx != INVALID_PGPROCNO && GetPGProcByNumber(nextidx)->clogGroupMemberPage != proc->clogGroupMemberPage) @@ -474,7 +517,7 @@ TransactionGroupUpdateXidStatus(TransactionId xid, XidStatus status, if (pg_atomic_compare_exchange_u32(&procglobal->clogGroupFirst, &nextidx, - (uint32) pgprocno)) + (uint32) MyProcNumber)) break; } @@ -508,13 +551,21 @@ TransactionGroupUpdateXidStatus(TransactionId xid, XidStatus status, return true; } - /* We are the leader. Acquire the lock on behalf of everyone. */ - LWLockAcquire(XactSLRULock, LW_EXCLUSIVE); + /* + * By here, we know we're the leader process. Acquire the SLRU bank lock + * that corresponds to the page we originally wanted to modify. + */ + prevpageno = proc->clogGroupMemberPage; + prevlock = SimpleLruGetBankLock(XactCtl, prevpageno); + LWLockAcquire(prevlock, LW_EXCLUSIVE); /* * Now that we've got the lock, clear the list of processes waiting for * group XID status update, saving a pointer to the head of the list. - * Trying to pop elements one at a time could lead to an ABA problem. + * (Trying to pop elements one at a time could lead to an ABA problem.) 
+ * + * At this point, any processes trying to do this would create a separate + * group. */ nextidx = pg_atomic_exchange_u32(&procglobal->clogGroupFirst, INVALID_PGPROCNO); @@ -526,6 +577,31 @@ TransactionGroupUpdateXidStatus(TransactionId xid, XidStatus status, while (nextidx != INVALID_PGPROCNO) { PGPROC *nextproc = &ProcGlobal->allProcs[nextidx]; + int thispageno = nextproc->clogGroupMemberPage; + + /* + * If the page to update belongs to a different bank than the previous + * one, exchange bank lock to the new one. This should be quite rare, + * as described above. + * + * (We could try to optimize this by waking up the processes for which + * we have already updated the status while we exchange the lock, but + * the code doesn't do that at present. I think it'd require + * additional bookkeeping, making the common path slower in order to + * improve an infrequent case.) + */ + if (thispageno != prevpageno) + { + LWLock *lock = SimpleLruGetBankLock(XactCtl, thispageno); + + if (prevlock != lock) + { + LWLockRelease(prevlock); + LWLockAcquire(lock, LW_EXCLUSIVE); + } + prevlock = lock; + prevpageno = thispageno; + } /* * Transactions with more than THRESHOLD_SUBTRANS_CLOG_OPT sub-XIDs @@ -545,12 +621,17 @@ TransactionGroupUpdateXidStatus(TransactionId xid, XidStatus status, } /* We're done with the lock now. */ - LWLockRelease(XactSLRULock); + if (prevlock != NULL) + LWLockRelease(prevlock); /* * Now that we've released the lock, go back and wake everybody up. We * don't do this under the lock so as to keep lock hold times to a * minimum. + * + * (Perhaps we could do this in two passes, the first setting + * clogGroupNext to invalid while saving the semaphores to an array, then + * a single write barrier, then another pass unlocking the semaphores.) */ while (wakeidx != INVALID_PGPROCNO) { @@ -574,7 +655,7 @@ TransactionGroupUpdateXidStatus(TransactionId xid, XidStatus status, /* * Sets the commit status of a single transaction. * - * Must be called with XactSLRULock held + * Caller must hold the corresponding SLRU bank lock, will be held at exit. */ static void TransactionIdSetStatusBit(TransactionId xid, XidStatus status, XLogRecPtr lsn, int slotno) @@ -585,6 +666,11 @@ TransactionIdSetStatusBit(TransactionId xid, XidStatus status, XLogRecPtr lsn, i char byteval; char curval; + Assert(XactCtl->shared->page_number[slotno] == TransactionIdToPage(xid)); + Assert(LWLockHeldByMeInMode(SimpleLruGetBankLock(XactCtl, + XactCtl->shared->page_number[slotno]), + LW_EXCLUSIVE)); + byteptr = XactCtl->shared->page_buffer[slotno] + byteno; curval = (*byteptr >> bshift) & CLOG_XACT_BITMASK; @@ -666,7 +752,7 @@ TransactionIdGetStatus(TransactionId xid, XLogRecPtr *lsn) lsnindex = GetLSNIndex(slotno, xid); *lsn = XactCtl->shared->group_lsn[lsnindex]; - LWLockRelease(XactSLRULock); + LWLockRelease(SimpleLruGetBankLock(XactCtl, pageno)); return status; } @@ -674,23 +760,18 @@ TransactionIdGetStatus(TransactionId xid, XLogRecPtr *lsn) /* * Number of shared CLOG buffers. * - * On larger multi-processor systems, it is possible to have many CLOG page - * requests in flight at one time which could lead to disk access for CLOG - * page if the required page is not found in memory. Testing revealed that we - * can get the best performance by having 128 CLOG buffers, more than that it - * doesn't improve performance. 
- * - * Unconditionally keeping the number of CLOG buffers to 128 did not seem like - * a good idea, because it would increase the minimum amount of shared memory - * required to start, which could be a problem for people running very small - * configurations. The following formula seems to represent a reasonable - * compromise: people with very low values for shared_buffers will get fewer - * CLOG buffers as well, and everyone else will get 128. + * If asked to autotune, use 2MB for every 1GB of shared buffers, up to 8MB. + * Otherwise just cap the configured amount to be between 16 and the maximum + * allowed. */ -Size +static int CLOGShmemBuffers(void) { - return Min(128, Max(4, NBuffers / 512)); + /* auto-tune based on shared buffers */ + if (transaction_buffers == 0) + return SimpleLruAutotuneBuffers(512, 1024); + + return Min(Max(16, transaction_buffers), CLOG_MAX_ALLOWED_BUFFERS); } /* @@ -705,14 +786,44 @@ CLOGShmemSize(void) void CLOGShmemInit(void) { + /* If auto-tuning is requested, now is the time to do it */ + if (transaction_buffers == 0) + { + char buf[32]; + + snprintf(buf, sizeof(buf), "%d", CLOGShmemBuffers()); + SetConfigOption("transaction_buffers", buf, PGC_POSTMASTER, + PGC_S_DYNAMIC_DEFAULT); + + /* + * We prefer to report this value's source as PGC_S_DYNAMIC_DEFAULT. + * However, if the DBA explicitly set transaction_buffers = 0 in the + * config file, then PGC_S_DYNAMIC_DEFAULT will fail to override that + * and we must force the matter with PGC_S_OVERRIDE. + */ + if (transaction_buffers == 0) /* failed to apply it? */ + SetConfigOption("transaction_buffers", buf, PGC_POSTMASTER, + PGC_S_OVERRIDE); + } + Assert(transaction_buffers != 0); + XactCtl->PagePrecedes = CLOGPagePrecedes; SimpleLruInit(XactCtl, "transaction", CLOGShmemBuffers(), CLOG_LSNS_PER_PAGE, - XactSLRULock, "pg_xact", LWTRANCHE_XACT_BUFFER, - SYNC_HANDLER_CLOG, false); + "pg_xact", LWTRANCHE_XACT_BUFFER, + LWTRANCHE_XACT_SLRU, SYNC_HANDLER_CLOG, false); SlruPagePrecedesUnitTests(XactCtl, CLOG_XACTS_PER_PAGE); } /* + * GUC check_hook for transaction_buffers + */ +bool +check_transaction_buffers(int *newval, void **extra, GucSource source) +{ + return check_slru_buffers("transaction_buffers", newval); +} + +/* * This func must be called ONCE on system install. It creates * the initial CLOG segment. (The CLOG directory is assumed to * have been created by initdb, and CLOGShmemInit must have been @@ -722,8 +833,9 @@ void BootStrapCLOG(void) { int slotno; + LWLock *lock = SimpleLruGetBankLock(XactCtl, 0); - LWLockAcquire(XactSLRULock, LW_EXCLUSIVE); + LWLockAcquire(lock, LW_EXCLUSIVE); /* Create and zero the first page of the commit log */ slotno = ZeroCLOGPage(0, false); @@ -732,7 +844,7 @@ BootStrapCLOG(void) SimpleLruWritePage(XactCtl, slotno); Assert(!XactCtl->shared->page_dirty[slotno]); - LWLockRelease(XactSLRULock); + LWLockRelease(lock); } /* @@ -781,8 +893,9 @@ TrimCLOG(void) { TransactionId xid = XidFromFullTransactionId(TransamVariables->nextXid); int64 pageno = TransactionIdToPage(xid); + LWLock *lock = SimpleLruGetBankLock(XactCtl, pageno); - LWLockAcquire(XactSLRULock, LW_EXCLUSIVE); + LWLockAcquire(lock, LW_EXCLUSIVE); /* * Zero out the remainder of the current clog page. Under normal @@ -814,7 +927,7 @@ TrimCLOG(void) XactCtl->shared->page_dirty[slotno] = true; } - LWLockRelease(XactSLRULock); + LWLockRelease(lock); } /* @@ -846,6 +959,7 @@ void ExtendCLOG(TransactionId newestXact) { int64 pageno; + LWLock *lock; /* * No work except at first XID of a page. 
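/*
 * (Worked example of the CLOGShmemBuffers() autotuning above, assuming the
 * default BLCKSZ of 8192: with shared_buffers = 1GB, NBuffers is 131072,
 * and 131072 / 512 = 256 CLOG buffers, i.e. 2MB; the 1024-buffer (8MB) cap
 * is reached once shared_buffers is 4GB or more.  SimpleLruAutotuneBuffers()
 * additionally rounds the result down to a multiple of the 16-buffer bank
 * size and never returns fewer than 16 buffers.)
 */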
But beware: just after @@ -856,13 +970,14 @@ ExtendCLOG(TransactionId newestXact) return; pageno = TransactionIdToPage(newestXact); + lock = SimpleLruGetBankLock(XactCtl, pageno); - LWLockAcquire(XactSLRULock, LW_EXCLUSIVE); + LWLockAcquire(lock, LW_EXCLUSIVE); /* Zero the page and make an XLOG entry about it */ ZeroCLOGPage(pageno, true); - LWLockRelease(XactSLRULock); + LWLockRelease(lock); } @@ -1000,16 +1115,18 @@ clog_redo(XLogReaderState *record) { int64 pageno; int slotno; + LWLock *lock; memcpy(&pageno, XLogRecGetData(record), sizeof(pageno)); - LWLockAcquire(XactSLRULock, LW_EXCLUSIVE); + lock = SimpleLruGetBankLock(XactCtl, pageno); + LWLockAcquire(lock, LW_EXCLUSIVE); slotno = ZeroCLOGPage(pageno, false); SimpleLruWritePage(XactCtl, slotno); Assert(!XactCtl->shared->page_dirty[slotno]); - LWLockRelease(XactSLRULock); + LWLockRelease(lock); } else if (info == CLOG_TRUNCATE) { diff --git a/src/backend/access/transam/commit_ts.c b/src/backend/access/transam/commit_ts.c index d965db89c75..5c35a18348c 100644 --- a/src/backend/access/transam/commit_ts.c +++ b/src/backend/access/transam/commit_ts.c @@ -33,6 +33,7 @@ #include "pg_trace.h" #include "storage/shmem.h" #include "utils/builtins.h" +#include "utils/guc_hooks.h" #include "utils/snapmgr.h" #include "utils/timestamp.h" @@ -225,10 +226,11 @@ SetXidCommitTsInPage(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz ts, RepOriginId nodeid, int64 pageno) { + LWLock *lock = SimpleLruGetBankLock(CommitTsCtl, pageno); int slotno; int i; - LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE); + LWLockAcquire(lock, LW_EXCLUSIVE); slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid); @@ -238,13 +240,13 @@ SetXidCommitTsInPage(TransactionId xid, int nsubxids, CommitTsCtl->shared->page_dirty[slotno] = true; - LWLockRelease(CommitTsSLRULock); + LWLockRelease(lock); } /* * Sets the commit timestamp of a single transaction. * - * Must be called with CommitTsSLRULock held + * Caller must hold the correct SLRU bank lock, will be held at exit */ static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts, @@ -345,7 +347,7 @@ TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts, if (nodeid) *nodeid = entry.nodeid; - LWLockRelease(CommitTsSLRULock); + LWLockRelease(SimpleLruGetBankLock(CommitTsCtl, pageno)); return *ts != 0; } @@ -499,14 +501,18 @@ pg_xact_commit_timestamp_origin(PG_FUNCTION_ARGS) /* * Number of shared CommitTS buffers. * - * We use a very similar logic as for the number of CLOG buffers (except we - * scale up twice as fast with shared buffers, and the maximum is twice as - * high); see comments in CLOGShmemBuffers. + * If asked to autotune, use 2MB for every 1GB of shared buffers, up to 8MB. + * Otherwise just cap the configured amount to be between 16 and the maximum + * allowed. */ -Size +static int CommitTsShmemBuffers(void) { - return Min(256, Max(4, NBuffers / 256)); + /* auto-tune based on shared buffers */ + if (commit_timestamp_buffers == 0) + return SimpleLruAutotuneBuffers(512, 1024); + + return Min(Max(16, commit_timestamp_buffers), SLRU_MAX_ALLOWED_BUFFERS); } /* @@ -528,10 +534,31 @@ CommitTsShmemInit(void) { bool found; + /* If auto-tuning is requested, now is the time to do it */ + if (commit_timestamp_buffers == 0) + { + char buf[32]; + + snprintf(buf, sizeof(buf), "%d", CommitTsShmemBuffers()); + SetConfigOption("commit_timestamp_buffers", buf, PGC_POSTMASTER, + PGC_S_DYNAMIC_DEFAULT); + + /* + * We prefer to report this value's source as PGC_S_DYNAMIC_DEFAULT. 
+ * However, if the DBA explicitly set commit_timestamp_buffers = 0 in + * the config file, then PGC_S_DYNAMIC_DEFAULT will fail to override + * that and we must force the matter with PGC_S_OVERRIDE. + */ + if (commit_timestamp_buffers == 0) /* failed to apply it? */ + SetConfigOption("commit_timestamp_buffers", buf, PGC_POSTMASTER, + PGC_S_OVERRIDE); + } + Assert(commit_timestamp_buffers != 0); + CommitTsCtl->PagePrecedes = CommitTsPagePrecedes; SimpleLruInit(CommitTsCtl, "commit_timestamp", CommitTsShmemBuffers(), 0, - CommitTsSLRULock, "pg_commit_ts", - LWTRANCHE_COMMITTS_BUFFER, + "pg_commit_ts", LWTRANCHE_COMMITTS_BUFFER, + LWTRANCHE_COMMITTS_SLRU, SYNC_HANDLER_COMMIT_TS, false); SlruPagePrecedesUnitTests(CommitTsCtl, COMMIT_TS_XACTS_PER_PAGE); @@ -554,6 +581,15 @@ CommitTsShmemInit(void) } /* + * GUC check_hook for commit_timestamp_buffers + */ +bool +check_commit_ts_buffers(int *newval, void **extra, GucSource source) +{ + return check_slru_buffers("commit_timestamp_buffers", newval); +} + +/* * This function must be called ONCE on system install. * * (The CommitTs directory is assumed to have been created by initdb, and @@ -715,13 +751,14 @@ ActivateCommitTs(void) /* Create the current segment file, if necessary */ if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl, pageno)) { + LWLock *lock = SimpleLruGetBankLock(CommitTsCtl, pageno); int slotno; - LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE); + LWLockAcquire(lock, LW_EXCLUSIVE); slotno = ZeroCommitTsPage(pageno, false); SimpleLruWritePage(CommitTsCtl, slotno); Assert(!CommitTsCtl->shared->page_dirty[slotno]); - LWLockRelease(CommitTsSLRULock); + LWLockRelease(lock); } /* Change the activation status in shared memory. */ @@ -760,8 +797,6 @@ DeactivateCommitTs(void) TransamVariables->oldestCommitTsXid = InvalidTransactionId; TransamVariables->newestCommitTsXid = InvalidTransactionId; - LWLockRelease(CommitTsLock); - /* * Remove *all* files. This is necessary so that there are no leftover * files; in the case where this feature is later enabled after running @@ -769,10 +804,16 @@ DeactivateCommitTs(void) * (We can probably tolerate out-of-sequence files, as they are going to * be overwritten anyway when we wrap around, but it seems better to be * tidy.) + * + * Note that we do this with CommitTsLock acquired in exclusive mode. This + * is very heavy-handed, but since this routine can only be called in the + * replica and should happen very rarely, we don't worry too much about + * it. Note also that no process should be consulting this SLRU if we + * have just deactivated it. */ - LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE); (void) SlruScanDirectory(CommitTsCtl, SlruScanDirCbDeleteAll, NULL); - LWLockRelease(CommitTsSLRULock); + + LWLockRelease(CommitTsLock); } /* @@ -804,6 +845,7 @@ void ExtendCommitTs(TransactionId newestXact) { int64 pageno; + LWLock *lock; /* * Nothing to do if module not enabled. 
Note we do an unlocked read of @@ -824,12 +866,14 @@ ExtendCommitTs(TransactionId newestXact) pageno = TransactionIdToCTsPage(newestXact); - LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE); + lock = SimpleLruGetBankLock(CommitTsCtl, pageno); + + LWLockAcquire(lock, LW_EXCLUSIVE); /* Zero the page and make an XLOG entry about it */ ZeroCommitTsPage(pageno, !InRecovery); - LWLockRelease(CommitTsSLRULock); + LWLockRelease(lock); } /* @@ -983,16 +1027,18 @@ commit_ts_redo(XLogReaderState *record) { int64 pageno; int slotno; + LWLock *lock; memcpy(&pageno, XLogRecGetData(record), sizeof(pageno)); - LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE); + lock = SimpleLruGetBankLock(CommitTsCtl, pageno); + LWLockAcquire(lock, LW_EXCLUSIVE); slotno = ZeroCommitTsPage(pageno, false); SimpleLruWritePage(CommitTsCtl, slotno); Assert(!CommitTsCtl->shared->page_dirty[slotno]); - LWLockRelease(CommitTsSLRULock); + LWLockRelease(lock); } else if (info == COMMIT_TS_TRUNCATE) { diff --git a/src/backend/access/transam/multixact.c b/src/backend/access/transam/multixact.c index 64040d330ef..9b815061452 100644 --- a/src/backend/access/transam/multixact.c +++ b/src/backend/access/transam/multixact.c @@ -88,6 +88,7 @@ #include "storage/proc.h" #include "storage/procarray.h" #include "utils/builtins.h" +#include "utils/guc_hooks.h" #include "utils/memutils.h" #include "utils/snapmgr.h" @@ -192,10 +193,10 @@ static SlruCtlData MultiXactMemberCtlData; /* * MultiXact state shared across all backends. All this state is protected - * by MultiXactGenLock. (We also use MultiXactOffsetSLRULock and - * MultiXactMemberSLRULock to guard accesses to the two sets of SLRU - * buffers. For concurrency's sake, we avoid holding more than one of these - * locks at a time.) + * by MultiXactGenLock. (We also use SLRU bank's lock of MultiXactOffset and + * MultiXactMember to guard accesses to the two sets of SLRU buffers. For + * concurrency's sake, we avoid holding more than one of these locks at a + * time.) */ typedef struct MultiXactStateData { @@ -870,12 +871,15 @@ RecordNewMultiXact(MultiXactId multi, MultiXactOffset offset, int slotno; MultiXactOffset *offptr; int i; - - LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE); + LWLock *lock; + LWLock *prevlock = NULL; pageno = MultiXactIdToOffsetPage(multi); entryno = MultiXactIdToOffsetEntry(multi); + lock = SimpleLruGetBankLock(MultiXactOffsetCtl, pageno); + LWLockAcquire(lock, LW_EXCLUSIVE); + /* * Note: we pass the MultiXactId to SimpleLruReadPage as the "transaction" * to complain about if there's any I/O error. This is kinda bogus, but @@ -891,10 +895,8 @@ RecordNewMultiXact(MultiXactId multi, MultiXactOffset offset, MultiXactOffsetCtl->shared->page_dirty[slotno] = true; - /* Exchange our lock */ - LWLockRelease(MultiXactOffsetSLRULock); - - LWLockAcquire(MultiXactMemberSLRULock, LW_EXCLUSIVE); + /* Release MultiXactOffset SLRU lock. */ + LWLockRelease(lock); prev_pageno = -1; @@ -916,6 +918,20 @@ RecordNewMultiXact(MultiXactId multi, MultiXactOffset offset, if (pageno != prev_pageno) { + /* + * MultiXactMember SLRU page is changed so check if this new page + * fall into the different SLRU bank then release the old bank's + * lock and acquire lock on the new bank. 
+ */ + lock = SimpleLruGetBankLock(MultiXactMemberCtl, pageno); + if (lock != prevlock) + { + if (prevlock != NULL) + LWLockRelease(prevlock); + + LWLockAcquire(lock, LW_EXCLUSIVE); + prevlock = lock; + } slotno = SimpleLruReadPage(MultiXactMemberCtl, pageno, true, multi); prev_pageno = pageno; } @@ -936,7 +952,8 @@ RecordNewMultiXact(MultiXactId multi, MultiXactOffset offset, MultiXactMemberCtl->shared->page_dirty[slotno] = true; } - LWLockRelease(MultiXactMemberSLRULock); + if (prevlock != NULL) + LWLockRelease(prevlock); } /* @@ -1239,6 +1256,8 @@ GetMultiXactIdMembers(MultiXactId multi, MultiXactMember **members, MultiXactId tmpMXact; MultiXactOffset nextOffset; MultiXactMember *ptr; + LWLock *lock; + LWLock *prevlock = NULL; debug_elog3(DEBUG2, "GetMembers: asked for %u", multi); @@ -1342,11 +1361,22 @@ GetMultiXactIdMembers(MultiXactId multi, MultiXactMember **members, * time on every multixact creation. */ retry: - LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE); - pageno = MultiXactIdToOffsetPage(multi); entryno = MultiXactIdToOffsetEntry(multi); + /* + * If this page falls under a different bank, release the old bank's lock + * and acquire the lock of the new bank. + */ + lock = SimpleLruGetBankLock(MultiXactOffsetCtl, pageno); + if (lock != prevlock) + { + if (prevlock != NULL) + LWLockRelease(prevlock); + LWLockAcquire(lock, LW_EXCLUSIVE); + prevlock = lock; + } + slotno = SimpleLruReadPage(MultiXactOffsetCtl, pageno, true, multi); offptr = (MultiXactOffset *) MultiXactOffsetCtl->shared->page_buffer[slotno]; offptr += entryno; @@ -1379,7 +1409,21 @@ retry: entryno = MultiXactIdToOffsetEntry(tmpMXact); if (pageno != prev_pageno) + { + /* + * Since we're going to access a different SLRU page, if this page + * falls under a different bank, release the old bank's lock and + * acquire the lock of the new bank. + */ + lock = SimpleLruGetBankLock(MultiXactOffsetCtl, pageno); + if (prevlock != lock) + { + LWLockRelease(prevlock); + LWLockAcquire(lock, LW_EXCLUSIVE); + prevlock = lock; + } slotno = SimpleLruReadPage(MultiXactOffsetCtl, pageno, true, tmpMXact); + } offptr = (MultiXactOffset *) MultiXactOffsetCtl->shared->page_buffer[slotno]; offptr += entryno; @@ -1388,7 +1432,8 @@ retry: if (nextMXOffset == 0) { /* Corner case 2: next multixact is still being filled in */ - LWLockRelease(MultiXactOffsetSLRULock); + LWLockRelease(prevlock); + prevlock = NULL; CHECK_FOR_INTERRUPTS(); pg_usleep(1000L); goto retry; @@ -1397,13 +1442,11 @@ retry: length = nextMXOffset - offset; } - LWLockRelease(MultiXactOffsetSLRULock); + LWLockRelease(prevlock); + prevlock = NULL; ptr = (MultiXactMember *) palloc(length * sizeof(MultiXactMember)); - /* Now get the members themselves. */ - LWLockAcquire(MultiXactMemberSLRULock, LW_EXCLUSIVE); - truelength = 0; prev_pageno = -1; for (i = 0; i < length; i++, offset++) @@ -1419,6 +1462,20 @@ retry: if (pageno != prev_pageno) { + /* + * Since we're going to access a different SLRU page, if this page + * falls under a different bank, release the old bank's lock and + * acquire the lock of the new bank. 
+ */ + lock = SimpleLruGetBankLock(MultiXactMemberCtl, pageno); + if (lock != prevlock) + { + if (prevlock) + LWLockRelease(prevlock); + LWLockAcquire(lock, LW_EXCLUSIVE); + prevlock = lock; + } + slotno = SimpleLruReadPage(MultiXactMemberCtl, pageno, true, multi); prev_pageno = pageno; } @@ -1442,7 +1499,8 @@ retry: truelength++; } - LWLockRelease(MultiXactMemberSLRULock); + if (prevlock) + LWLockRelease(prevlock); /* A multixid with zero members should not happen */ Assert(truelength > 0); @@ -1834,8 +1892,8 @@ MultiXactShmemSize(void) mul_size(sizeof(MultiXactId) * 2, MaxOldestSlot)) size = SHARED_MULTIXACT_STATE_SIZE; - size = add_size(size, SimpleLruShmemSize(NUM_MULTIXACTOFFSET_BUFFERS, 0)); - size = add_size(size, SimpleLruShmemSize(NUM_MULTIXACTMEMBER_BUFFERS, 0)); + size = add_size(size, SimpleLruShmemSize(multixact_offset_buffers, 0)); + size = add_size(size, SimpleLruShmemSize(multixact_member_buffers, 0)); return size; } @@ -1851,16 +1909,16 @@ MultiXactShmemInit(void) MultiXactMemberCtl->PagePrecedes = MultiXactMemberPagePrecedes; SimpleLruInit(MultiXactOffsetCtl, - "multixact_offset", NUM_MULTIXACTOFFSET_BUFFERS, 0, - MultiXactOffsetSLRULock, "pg_multixact/offsets", - LWTRANCHE_MULTIXACTOFFSET_BUFFER, + "multixact_offset", multixact_offset_buffers, 0, + "pg_multixact/offsets", LWTRANCHE_MULTIXACTOFFSET_BUFFER, + LWTRANCHE_MULTIXACTOFFSET_SLRU, SYNC_HANDLER_MULTIXACT_OFFSET, false); SlruPagePrecedesUnitTests(MultiXactOffsetCtl, MULTIXACT_OFFSETS_PER_PAGE); SimpleLruInit(MultiXactMemberCtl, - "multixact_member", NUM_MULTIXACTMEMBER_BUFFERS, 0, - MultiXactMemberSLRULock, "pg_multixact/members", - LWTRANCHE_MULTIXACTMEMBER_BUFFER, + "multixact_member", multixact_member_buffers, 0, + "pg_multixact/members", LWTRANCHE_MULTIXACTMEMBER_BUFFER, + LWTRANCHE_MULTIXACTMEMBER_SLRU, SYNC_HANDLER_MULTIXACT_MEMBER, false); /* doesn't call SimpleLruTruncate() or meet criteria for unit tests */ @@ -1888,6 +1946,24 @@ MultiXactShmemInit(void) } /* + * GUC check_hook for multixact_offset_buffers + */ +bool +check_multixact_offset_buffers(int *newval, void **extra, GucSource source) +{ + return check_slru_buffers("multixact_offset_buffers", newval); +} + +/* + * GUC check_hook for multixact_member_buffer + */ +bool +check_multixact_member_buffers(int *newval, void **extra, GucSource source) +{ + return check_slru_buffers("multixact_member_buffers", newval); +} + +/* * This func must be called ONCE on system install. It creates the initial * MultiXact segments. (The MultiXacts directories are assumed to have been * created by initdb, and MultiXactShmemInit must have been called already.) 
@@ -1896,8 +1972,10 @@ void BootStrapMultiXact(void) { int slotno; + LWLock *lock; - LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE); + lock = SimpleLruGetBankLock(MultiXactOffsetCtl, 0); + LWLockAcquire(lock, LW_EXCLUSIVE); /* Create and zero the first page of the offsets log */ slotno = ZeroMultiXactOffsetPage(0, false); @@ -1906,9 +1984,10 @@ BootStrapMultiXact(void) SimpleLruWritePage(MultiXactOffsetCtl, slotno); Assert(!MultiXactOffsetCtl->shared->page_dirty[slotno]); - LWLockRelease(MultiXactOffsetSLRULock); + LWLockRelease(lock); - LWLockAcquire(MultiXactMemberSLRULock, LW_EXCLUSIVE); + lock = SimpleLruGetBankLock(MultiXactMemberCtl, 0); + LWLockAcquire(lock, LW_EXCLUSIVE); /* Create and zero the first page of the members log */ slotno = ZeroMultiXactMemberPage(0, false); @@ -1917,7 +1996,7 @@ BootStrapMultiXact(void) SimpleLruWritePage(MultiXactMemberCtl, slotno); Assert(!MultiXactMemberCtl->shared->page_dirty[slotno]); - LWLockRelease(MultiXactMemberSLRULock); + LWLockRelease(lock); } /* @@ -1977,10 +2056,12 @@ static void MaybeExtendOffsetSlru(void) { int64 pageno; + LWLock *lock; pageno = MultiXactIdToOffsetPage(MultiXactState->nextMXact); + lock = SimpleLruGetBankLock(MultiXactOffsetCtl, pageno); - LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE); + LWLockAcquire(lock, LW_EXCLUSIVE); if (!SimpleLruDoesPhysicalPageExist(MultiXactOffsetCtl, pageno)) { @@ -1995,7 +2076,7 @@ MaybeExtendOffsetSlru(void) SimpleLruWritePage(MultiXactOffsetCtl, slotno); } - LWLockRelease(MultiXactOffsetSLRULock); + LWLockRelease(lock); } /* @@ -2049,6 +2130,8 @@ TrimMultiXact(void) oldestMXactDB = MultiXactState->oldestMultiXactDB; LWLockRelease(MultiXactGenLock); + /* Clean up offsets state */ + /* * (Re-)Initialize our idea of the latest page number for offsets. */ @@ -2056,9 +2139,6 @@ TrimMultiXact(void) pg_atomic_write_u64(&MultiXactOffsetCtl->shared->latest_page_number, pageno); - /* Clean up offsets state */ - LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE); - /* * Zero out the remainder of the current offsets page. See notes in * TrimCLOG() for background. Unlike CLOG, some WAL record covers every @@ -2072,7 +2152,9 @@ TrimMultiXact(void) { int slotno; MultiXactOffset *offptr; + LWLock *lock = SimpleLruGetBankLock(MultiXactOffsetCtl, pageno); + LWLockAcquire(lock, LW_EXCLUSIVE); slotno = SimpleLruReadPage(MultiXactOffsetCtl, pageno, true, nextMXact); offptr = (MultiXactOffset *) MultiXactOffsetCtl->shared->page_buffer[slotno]; offptr += entryno; @@ -2080,10 +2162,9 @@ TrimMultiXact(void) MemSet(offptr, 0, BLCKSZ - (entryno * sizeof(MultiXactOffset))); MultiXactOffsetCtl->shared->page_dirty[slotno] = true; + LWLockRelease(lock); } - LWLockRelease(MultiXactOffsetSLRULock); - /* * And the same for members. * @@ -2093,8 +2174,6 @@ TrimMultiXact(void) pg_atomic_write_u64(&MultiXactMemberCtl->shared->latest_page_number, pageno); - LWLockAcquire(MultiXactMemberSLRULock, LW_EXCLUSIVE); - /* * Zero out the remainder of the current members page. See notes in * TrimCLOG() for motivation. 
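/*
 * (Worked example of the page-to-bank mapping used in the hunks above and
 * below: with multixact_offset_buffers = 32 there are 32 / 16 = 2 banks,
 * so ctl->bank_mask is 1; even-numbered offset pages map to bank 0 and
 * odd-numbered pages to bank 1.  TrimMultiXact() and RecordNewMultiXact()
 * therefore swap bank locks whenever the page they touch crosses to the
 * other bank.)
 */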
@@ -2105,7 +2184,9 @@ TrimMultiXact(void) int slotno; TransactionId *xidptr; int memberoff; + LWLock *lock = SimpleLruGetBankLock(MultiXactMemberCtl, pageno); + LWLockAcquire(lock, LW_EXCLUSIVE); memberoff = MXOffsetToMemberOffset(offset); slotno = SimpleLruReadPage(MultiXactMemberCtl, pageno, true, offset); xidptr = (TransactionId *) @@ -2120,10 +2201,9 @@ TrimMultiXact(void) */ MultiXactMemberCtl->shared->page_dirty[slotno] = true; + LWLockRelease(lock); } - LWLockRelease(MultiXactMemberSLRULock); - /* signal that we're officially up */ LWLockAcquire(MultiXactGenLock, LW_EXCLUSIVE); MultiXactState->finishedStartup = true; @@ -2411,6 +2491,7 @@ static void ExtendMultiXactOffset(MultiXactId multi) { int64 pageno; + LWLock *lock; /* * No work except at first MultiXactId of a page. But beware: just after @@ -2421,13 +2502,14 @@ ExtendMultiXactOffset(MultiXactId multi) return; pageno = MultiXactIdToOffsetPage(multi); + lock = SimpleLruGetBankLock(MultiXactOffsetCtl, pageno); - LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE); + LWLockAcquire(lock, LW_EXCLUSIVE); /* Zero the page and make an XLOG entry about it */ ZeroMultiXactOffsetPage(pageno, true); - LWLockRelease(MultiXactOffsetSLRULock); + LWLockRelease(lock); } /* @@ -2460,15 +2542,17 @@ ExtendMultiXactMember(MultiXactOffset offset, int nmembers) if (flagsoff == 0 && flagsbit == 0) { int64 pageno; + LWLock *lock; pageno = MXOffsetToMemberPage(offset); + lock = SimpleLruGetBankLock(MultiXactMemberCtl, pageno); - LWLockAcquire(MultiXactMemberSLRULock, LW_EXCLUSIVE); + LWLockAcquire(lock, LW_EXCLUSIVE); /* Zero the page and make an XLOG entry about it */ ZeroMultiXactMemberPage(pageno, true); - LWLockRelease(MultiXactMemberSLRULock); + LWLockRelease(lock); } /* @@ -2766,7 +2850,7 @@ find_multixact_start(MultiXactId multi, MultiXactOffset *result) offptr = (MultiXactOffset *) MultiXactOffsetCtl->shared->page_buffer[slotno]; offptr += entryno; offset = *offptr; - LWLockRelease(MultiXactOffsetSLRULock); + LWLockRelease(SimpleLruGetBankLock(MultiXactOffsetCtl, pageno)); *result = offset; return true; @@ -3248,31 +3332,35 @@ multixact_redo(XLogReaderState *record) { int64 pageno; int slotno; + LWLock *lock; memcpy(&pageno, XLogRecGetData(record), sizeof(pageno)); - LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE); + lock = SimpleLruGetBankLock(MultiXactOffsetCtl, pageno); + LWLockAcquire(lock, LW_EXCLUSIVE); slotno = ZeroMultiXactOffsetPage(pageno, false); SimpleLruWritePage(MultiXactOffsetCtl, slotno); Assert(!MultiXactOffsetCtl->shared->page_dirty[slotno]); - LWLockRelease(MultiXactOffsetSLRULock); + LWLockRelease(lock); } else if (info == XLOG_MULTIXACT_ZERO_MEM_PAGE) { int64 pageno; int slotno; + LWLock *lock; memcpy(&pageno, XLogRecGetData(record), sizeof(pageno)); - LWLockAcquire(MultiXactMemberSLRULock, LW_EXCLUSIVE); + lock = SimpleLruGetBankLock(MultiXactMemberCtl, pageno); + LWLockAcquire(lock, LW_EXCLUSIVE); slotno = ZeroMultiXactMemberPage(pageno, false); SimpleLruWritePage(MultiXactMemberCtl, slotno); Assert(!MultiXactMemberCtl->shared->page_dirty[slotno]); - LWLockRelease(MultiXactMemberSLRULock); + LWLockRelease(lock); } else if (info == XLOG_MULTIXACT_CREATE_ID) { diff --git a/src/backend/access/transam/slru.c b/src/backend/access/transam/slru.c index 93cefcd10d3..f774d285b7f 100644 --- a/src/backend/access/transam/slru.c +++ b/src/backend/access/transam/slru.c @@ -1,28 +1,38 @@ /*------------------------------------------------------------------------- * * slru.c - * Simple LRU buffering for transaction status logfiles 
+ * Simple LRU buffering for wrap-around-able permanent metadata * - * We use a simple least-recently-used scheme to manage a pool of page - * buffers. Under ordinary circumstances we expect that write - * traffic will occur mostly to the latest page (and to the just-prior - * page, soon after a page transition). Read traffic will probably touch - * a larger span of pages, but in any case a fairly small number of page - * buffers should be sufficient. So, we just search the buffers using plain - * linear search; there's no need for a hashtable or anything fancy. - * The management algorithm is straight LRU except that we will never swap - * out the latest page (since we know it's going to be hit again eventually). + * This module is used to maintain various pieces of transaction status + * indexed by TransactionId (such as commit status, parent transaction ID, + * commit timestamp), as well as storage for multixacts, serializable + * isolation locks and NOTIFY traffic. Extensions can define their own + * SLRUs, too. * - * We use a control LWLock to protect the shared data structures, plus - * per-buffer LWLocks that synchronize I/O for each buffer. The control lock - * must be held to examine or modify any shared state. A process that is - * reading in or writing out a page buffer does not hold the control lock, - * only the per-buffer lock for the buffer it is working on. One exception - * is latest_page_number, which is read and written using atomic ops. + * Under ordinary circumstances we expect that write traffic will occur + * mostly to the latest page (and to the just-prior page, soon after a + * page transition). Read traffic will probably touch a larger span of + * pages, but a relatively small number of buffers should be sufficient. * - * "Holding the control lock" means exclusive lock in all cases except for - * SimpleLruReadPage_ReadOnly(); see comments for SlruRecentlyUsed() for - * the implications of that. + * We use a simple least-recently-used scheme to manage a pool of shared + * page buffers, split in banks by the lowest bits of the page number, and + * the management algorithm only processes the bank to which the desired + * page belongs, so a linear search is sufficient; there's no need for a + * hashtable or anything fancy. The algorithm is straight LRU except that + * we will never swap out the latest page (since we know it's going to be + * hit again eventually). + * + * We use per-bank control LWLocks to protect the shared data structures, + * plus per-buffer LWLocks that synchronize I/O for each buffer. The + * bank's control lock must be held to examine or modify any of the bank's + * shared state. A process that is reading in or writing out a page + * buffer does not hold the control lock, only the per-buffer lock for the + * buffer it is working on. One exception is latest_page_number, which is + * read and written using atomic ops. + * + * "Holding the bank control lock" means exclusive lock in all cases + * except for SimpleLruReadPage_ReadOnly(); see comments for + * SlruRecentlyUsed() for the implications of that. * * When initiating I/O on a buffer, we acquire the per-buffer lock exclusively * before releasing the control lock. 
The per-buffer lock is released after @@ -60,6 +70,7 @@ #include "pgstat.h" #include "storage/fd.h" #include "storage/shmem.h" +#include "utils/guc_hooks.h" static inline int SlruFileName(SlruCtl ctl, char *path, int64 segno) @@ -106,6 +117,23 @@ typedef struct SlruWriteAllData typedef struct SlruWriteAllData *SlruWriteAll; + +/* + * Bank size for the slot array. Pages are assigned a bank according to their + * page number, with each bank being this size. We want a power of 2 so that + * we can determine the bank number for a page with just bit shifting; we also + * want to keep the bank size small so that LRU victim search is fast. 16 + * buffers per bank seems a good number. + */ +#define SLRU_BANK_BITSHIFT 4 +#define SLRU_BANK_SIZE (1 << SLRU_BANK_BITSHIFT) + +/* + * Macro to get the bank number to which the slot belongs. + */ +#define SlotGetBankNumber(slotno) ((slotno) >> SLRU_BANK_BITSHIFT) + + /* * Populate a file tag describing a segment file. We only use the segment * number, since we can derive everything else we need by having separate @@ -118,34 +146,6 @@ typedef struct SlruWriteAllData *SlruWriteAll; (a).segno = (xx_segno) \ ) -/* - * Macro to mark a buffer slot "most recently used". Note multiple evaluation - * of arguments! - * - * The reason for the if-test is that there are often many consecutive - * accesses to the same page (particularly the latest page). By suppressing - * useless increments of cur_lru_count, we reduce the probability that old - * pages' counts will "wrap around" and make them appear recently used. - * - * We allow this code to be executed concurrently by multiple processes within - * SimpleLruReadPage_ReadOnly(). As long as int reads and writes are atomic, - * this should not cause any completely-bogus values to enter the computation. - * However, it is possible for either cur_lru_count or individual - * page_lru_count entries to be "reset" to lower values than they should have, - * in case a process is delayed while it executes this macro. With care in - * SlruSelectLRUPage(), this does little harm, and in any case the absolute - * worst possible consequence is a nonoptimal choice of page to evict. The - * gain from allowing concurrent reads of SLRU pages seems worth it. 
- */ -#define SlruRecentlyUsed(shared, slotno) \ - do { \ - int new_lru_count = (shared)->cur_lru_count; \ - if (new_lru_count != (shared)->page_lru_count[slotno]) { \ - (shared)->cur_lru_count = ++new_lru_count; \ - (shared)->page_lru_count[slotno] = new_lru_count; \ - } \ - } while (0) - /* Saved info for SlruReportIOError */ typedef enum { @@ -173,6 +173,7 @@ static int SlruSelectLRUPage(SlruCtl ctl, int64 pageno); static bool SlruScanDirCbDeleteCutoff(SlruCtl ctl, char *filename, int64 segpage, void *data); static void SlruInternalDeleteSegment(SlruCtl ctl, int64 segno); +static inline void SlruRecentlyUsed(SlruShared shared, int slotno); /* @@ -182,8 +183,12 @@ static void SlruInternalDeleteSegment(SlruCtl ctl, int64 segno); Size SimpleLruShmemSize(int nslots, int nlsns) { + int nbanks = nslots / SLRU_BANK_SIZE; Size sz; + Assert(nslots <= SLRU_MAX_ALLOWED_BUFFERS); + Assert(nslots % SLRU_BANK_SIZE == 0); + /* we assume nslots isn't so large as to risk overflow */ sz = MAXALIGN(sizeof(SlruSharedData)); sz += MAXALIGN(nslots * sizeof(char *)); /* page_buffer[] */ @@ -192,6 +197,8 @@ SimpleLruShmemSize(int nslots, int nlsns) sz += MAXALIGN(nslots * sizeof(int64)); /* page_number[] */ sz += MAXALIGN(nslots * sizeof(int)); /* page_lru_count[] */ sz += MAXALIGN(nslots * sizeof(LWLockPadded)); /* buffer_locks[] */ + sz += MAXALIGN(nbanks * sizeof(LWLockPadded)); /* bank_locks[] */ + sz += MAXALIGN(nbanks * sizeof(int)); /* bank_cur_lru_count[] */ if (nlsns > 0) sz += MAXALIGN(nslots * nlsns * sizeof(XLogRecPtr)); /* group_lsn[] */ @@ -200,6 +207,21 @@ SimpleLruShmemSize(int nslots, int nlsns) } /* + * Determine a number of SLRU buffers to use. + * + * We simply divide shared_buffers by the divisor given and cap + * that at the maximum given; but always at least SLRU_BANK_SIZE. + * Round down to the nearest multiple of SLRU_BANK_SIZE. + */ +int +SimpleLruAutotuneBuffers(int divisor, int max) +{ + return Min(max - (max % SLRU_BANK_SIZE), + Max(SLRU_BANK_SIZE, + NBuffers / divisor - (NBuffers / divisor) % SLRU_BANK_SIZE)); +} + +/* * Initialize, or attach to, a simple LRU cache in shared memory. * * ctl: address of local (unshared) control structure. @@ -208,16 +230,20 @@ SimpleLruShmemSize(int nslots, int nlsns) * nlsns: number of LSN groups per page (set to zero if not relevant). * ctllock: LWLock to use to control access to the shared control structure. * subdir: PGDATA-relative subdirectory that will contain the files. - * tranche_id: LWLock tranche ID to use for the SLRU's per-buffer LWLocks. + * buffer_tranche_id: tranche ID to use for the SLRU's per-buffer LWLocks. + * bank_tranche_id: tranche ID to use for the bank LWLocks. 
* sync_handler: which set of functions to use to handle sync requests */ void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, - LWLock *ctllock, const char *subdir, int tranche_id, + const char *subdir, int buffer_tranche_id, int bank_tranche_id, SyncRequestHandler sync_handler, bool long_segment_names) { SlruShared shared; bool found; + int nbanks = nslots / SLRU_BANK_SIZE; + + Assert(nslots <= SLRU_MAX_ALLOWED_BUFFERS); shared = (SlruShared) ShmemInitStruct(name, SimpleLruShmemSize(nslots, nlsns), @@ -233,12 +259,9 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, memset(shared, 0, sizeof(SlruSharedData)); - shared->ControlLock = ctllock; - shared->num_slots = nslots; shared->lsn_groups_per_page = nlsns; - shared->cur_lru_count = 0; pg_atomic_init_u64(&shared->latest_page_number, 0); shared->slru_stats_idx = pgstat_get_slru_index(name); @@ -259,6 +282,10 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, /* Initialize LWLocks */ shared->buffer_locks = (LWLockPadded *) (ptr + offset); offset += MAXALIGN(nslots * sizeof(LWLockPadded)); + shared->bank_locks = (LWLockPadded *) (ptr + offset); + offset += MAXALIGN(nbanks * sizeof(LWLockPadded)); + shared->bank_cur_lru_count = (int *) (ptr + offset); + offset += MAXALIGN(nbanks * sizeof(int)); if (nlsns > 0) { @@ -270,7 +297,7 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, for (int slotno = 0; slotno < nslots; slotno++) { LWLockInitialize(&shared->buffer_locks[slotno].lock, - tranche_id); + buffer_tranche_id); shared->page_buffer[slotno] = ptr; shared->page_status[slotno] = SLRU_PAGE_EMPTY; @@ -279,11 +306,21 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, ptr += BLCKSZ; } + /* Initialize the slot banks. */ + for (int bankno = 0; bankno < nbanks; bankno++) + { + LWLockInitialize(&shared->bank_locks[bankno].lock, bank_tranche_id); + shared->bank_cur_lru_count[bankno] = 0; + } + /* Should fit to estimated shmem size */ Assert(ptr - (char *) shared <= SimpleLruShmemSize(nslots, nlsns)); } else + { Assert(found); + Assert(shared->num_slots == nslots); + } /* * Initialize the unshared control struct, including directory path. We @@ -292,16 +329,33 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, ctl->shared = shared; ctl->sync_handler = sync_handler; ctl->long_segment_names = long_segment_names; + ctl->bank_mask = (nslots / SLRU_BANK_SIZE) - 1; strlcpy(ctl->Dir, subdir, sizeof(ctl->Dir)); } /* + * Helper function for GUC check_hook to check whether slru buffers are in + * multiples of SLRU_BANK_SIZE. + */ +bool +check_slru_buffers(const char *name, int *newval) +{ + /* Valid values are multiples of SLRU_BANK_SIZE */ + if (*newval % SLRU_BANK_SIZE == 0) + return true; + + GUC_check_errdetail("\"%s\" must be a multiple of %d", name, + SLRU_BANK_SIZE); + return false; +} + +/* * Initialize (or reinitialize) a page to zeroes. * * The page is not actually written, just set up in shared memory. * The slot number of the new page is returned. * - * Control lock must be held at entry, and will be held at exit. + * Bank lock must be held at entry, and will be held at exit. 
*/ int SimpleLruZeroPage(SlruCtl ctl, int64 pageno) @@ -309,6 +363,8 @@ SimpleLruZeroPage(SlruCtl ctl, int64 pageno) SlruShared shared = ctl->shared; int slotno; + Assert(LWLockHeldByMeInMode(SimpleLruGetBankLock(ctl, pageno), LW_EXCLUSIVE)); + /* Find a suitable buffer slot for the page */ slotno = SlruSelectLRUPage(ctl, pageno); Assert(shared->page_status[slotno] == SLRU_PAGE_EMPTY || @@ -369,18 +425,21 @@ SimpleLruZeroLSNs(SlruCtl ctl, int slotno) * guarantee that new I/O hasn't been started before we return, though. * In fact the slot might not even contain the same page anymore.) * - * Control lock must be held at entry, and will be held at exit. + * Bank lock must be held at entry, and will be held at exit. */ static void SimpleLruWaitIO(SlruCtl ctl, int slotno) { SlruShared shared = ctl->shared; + int bankno = SlotGetBankNumber(slotno); + + Assert(&shared->page_status[slotno] != SLRU_PAGE_EMPTY); /* See notes at top of file */ - LWLockRelease(shared->ControlLock); + LWLockRelease(&shared->bank_locks[bankno].lock); LWLockAcquire(&shared->buffer_locks[slotno].lock, LW_SHARED); LWLockRelease(&shared->buffer_locks[slotno].lock); - LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE); + LWLockAcquire(&shared->bank_locks[bankno].lock, LW_EXCLUSIVE); /* * If the slot is still in an io-in-progress state, then either someone @@ -423,7 +482,7 @@ SimpleLruWaitIO(SlruCtl ctl, int slotno) * Return value is the shared-buffer slot number now holding the page. * The buffer's LRU access info is updated. * - * Control lock must be held at entry, and will be held at exit. + * The correct bank lock must be held at entry, and will be held at exit. */ int SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok, @@ -431,18 +490,21 @@ SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok, { SlruShared shared = ctl->shared; + Assert(LWLockHeldByMeInMode(SimpleLruGetBankLock(ctl, pageno), LW_EXCLUSIVE)); + /* Outer loop handles restart if we must wait for someone else's I/O */ for (;;) { int slotno; + int bankno; bool ok; /* See if page already is in memory; if not, pick victim slot */ slotno = SlruSelectLRUPage(ctl, pageno); /* Did we find the page in memory? */ - if (shared->page_number[slotno] == pageno && - shared->page_status[slotno] != SLRU_PAGE_EMPTY) + if (shared->page_status[slotno] != SLRU_PAGE_EMPTY && + shared->page_number[slotno] == pageno) { /* * If page is still being read in, we must wait for I/O. 
Likewise @@ -477,9 +539,10 @@ SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok, /* Acquire per-buffer lock (cannot deadlock, see notes at top) */ LWLockAcquire(&shared->buffer_locks[slotno].lock, LW_EXCLUSIVE); + bankno = SlotGetBankNumber(slotno); - /* Release control lock while doing I/O */ - LWLockRelease(shared->ControlLock); + /* Release bank lock while doing I/O */ + LWLockRelease(&shared->bank_locks[bankno].lock); /* Do the read */ ok = SlruPhysicalReadPage(ctl, pageno, slotno); @@ -487,8 +550,8 @@ SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok, /* Set the LSNs for this newly read-in page to zero */ SimpleLruZeroLSNs(ctl, slotno); - /* Re-acquire control lock and update page state */ - LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE); + /* Re-acquire bank control lock and update page state */ + LWLockAcquire(&shared->bank_locks[bankno].lock, LW_EXCLUSIVE); Assert(shared->page_number[slotno] == pageno && shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS && @@ -522,22 +585,25 @@ SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok, * Return value is the shared-buffer slot number now holding the page. * The buffer's LRU access info is updated. * - * Control lock must NOT be held at entry, but will be held at exit. + * Bank control lock must NOT be held at entry, but will be held at exit. * It is unspecified whether the lock will be shared or exclusive. */ int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int64 pageno, TransactionId xid) { SlruShared shared = ctl->shared; + int bankno = pageno & ctl->bank_mask; + int bankstart = bankno * SLRU_BANK_SIZE; + int bankend = bankstart + SLRU_BANK_SIZE; /* Try to find the page while holding only shared lock */ - LWLockAcquire(shared->ControlLock, LW_SHARED); + LWLockAcquire(&shared->bank_locks[bankno].lock, LW_SHARED); /* See if page is already in a buffer */ - for (int slotno = 0; slotno < shared->num_slots; slotno++) + for (int slotno = bankstart; slotno < bankend; slotno++) { - if (shared->page_number[slotno] == pageno && - shared->page_status[slotno] != SLRU_PAGE_EMPTY && + if (shared->page_status[slotno] != SLRU_PAGE_EMPTY && + shared->page_number[slotno] == pageno && shared->page_status[slotno] != SLRU_PAGE_READ_IN_PROGRESS) { /* See comments for SlruRecentlyUsed macro */ @@ -551,8 +617,8 @@ SimpleLruReadPage_ReadOnly(SlruCtl ctl, int64 pageno, TransactionId xid) } /* No luck, so switch to normal exclusive lock and do regular read */ - LWLockRelease(shared->ControlLock); - LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE); + LWLockRelease(&shared->bank_locks[bankno].lock); + LWLockAcquire(&shared->bank_locks[bankno].lock, LW_EXCLUSIVE); return SimpleLruReadPage(ctl, pageno, true, xid); } @@ -566,15 +632,19 @@ SimpleLruReadPage_ReadOnly(SlruCtl ctl, int64 pageno, TransactionId xid) * the write). However, we *do* attempt a fresh write even if the page * is already being written; this is for checkpoints. * - * Control lock must be held at entry, and will be held at exit. + * Bank lock must be held at entry, and will be held at exit. 
*/ static void SlruInternalWritePage(SlruCtl ctl, int slotno, SlruWriteAll fdata) { SlruShared shared = ctl->shared; int64 pageno = shared->page_number[slotno]; + int bankno = SlotGetBankNumber(slotno); bool ok; + Assert(shared->page_status[slotno] != SLRU_PAGE_EMPTY); + Assert(LWLockHeldByMeInMode(SimpleLruGetBankLock(ctl, pageno), LW_EXCLUSIVE)); + /* If a write is in progress, wait for it to finish */ while (shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS && shared->page_number[slotno] == pageno) @@ -601,8 +671,8 @@ SlruInternalWritePage(SlruCtl ctl, int slotno, SlruWriteAll fdata) /* Acquire per-buffer lock (cannot deadlock, see notes at top) */ LWLockAcquire(&shared->buffer_locks[slotno].lock, LW_EXCLUSIVE); - /* Release control lock while doing I/O */ - LWLockRelease(shared->ControlLock); + /* Release bank lock while doing I/O */ + LWLockRelease(&shared->bank_locks[bankno].lock); /* Do the write */ ok = SlruPhysicalWritePage(ctl, pageno, slotno, fdata); @@ -614,8 +684,8 @@ SlruInternalWritePage(SlruCtl ctl, int slotno, SlruWriteAll fdata) CloseTransientFile(fdata->fd[i]); } - /* Re-acquire control lock and update page state */ - LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE); + /* Re-acquire bank lock and update page state */ + LWLockAcquire(&shared->bank_locks[bankno].lock, LW_EXCLUSIVE); Assert(shared->page_number[slotno] == pageno && shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS); @@ -644,6 +714,8 @@ SlruInternalWritePage(SlruCtl ctl, int slotno, SlruWriteAll fdata) void SimpleLruWritePage(SlruCtl ctl, int slotno) { + Assert(&ctl->shared->page_status[slotno] != SLRU_PAGE_EMPTY); + SlruInternalWritePage(ctl, slotno, NULL); } @@ -1028,17 +1100,53 @@ SlruReportIOError(SlruCtl ctl, int64 pageno, TransactionId xid) } /* - * Select the slot to re-use when we need a free slot. + * Mark a buffer slot "most recently used". + */ +static inline void +SlruRecentlyUsed(SlruShared shared, int slotno) +{ + int bankno = SlotGetBankNumber(slotno); + int new_lru_count = shared->bank_cur_lru_count[bankno]; + + Assert(shared->page_status[slotno] != SLRU_PAGE_EMPTY); + + /* + * The reason for the if-test is that there are often many consecutive + * accesses to the same page (particularly the latest page). By + * suppressing useless increments of bank_cur_lru_count, we reduce the + * probability that old pages' counts will "wrap around" and make them + * appear recently used. + * + * We allow this code to be executed concurrently by multiple processes + * within SimpleLruReadPage_ReadOnly(). As long as int reads and writes + * are atomic, this should not cause any completely-bogus values to enter + * the computation. However, it is possible for either bank_cur_lru_count + * or individual page_lru_count entries to be "reset" to lower values than + * they should have, in case a process is delayed while it executes this + * function. With care in SlruSelectLRUPage(), this does little harm, and + * in any case the absolute worst possible consequence is a nonoptimal + * choice of page to evict. The gain from allowing concurrent reads of + * SLRU pages seems worth it. + */ + if (new_lru_count != shared->page_lru_count[slotno]) + { + shared->bank_cur_lru_count[bankno] = ++new_lru_count; + shared->page_lru_count[slotno] = new_lru_count; + } +} + +/* + * Select the slot to re-use when we need a free slot for the given page. 
* - * The target page number is passed because we need to consider the - * possibility that some other process reads in the target page while - * we are doing I/O to free a slot. Hence, check or recheck to see if - * any slot already holds the target page, and return that slot if so. - * Thus, the returned slot is *either* a slot already holding the pageno - * (could be any state except EMPTY), *or* a freeable slot (state EMPTY - * or CLEAN). + * The target page number is passed not only because we need to know the + * correct bank to use, but also because we need to consider the possibility + * that some other process reads in the target page while we are doing I/O to + * free a slot. Hence, check or recheck to see if any slot already holds the + * target page, and return that slot if so. Thus, the returned slot is + * *either* a slot already holding the pageno (could be any state except + * EMPTY), *or* a freeable slot (state EMPTY or CLEAN). * - * Control lock must be held at entry, and will be held at exit. + * The correct bank lock must be held at entry, and will be held at exit. */ static int SlruSelectLRUPage(SlruCtl ctl, int64 pageno) @@ -1055,12 +1163,17 @@ SlruSelectLRUPage(SlruCtl ctl, int64 pageno) int bestinvalidslot = 0; /* keep compiler quiet */ int best_invalid_delta = -1; int64 best_invalid_page_number = 0; /* keep compiler quiet */ + int bankno = pageno & ctl->bank_mask; + int bankstart = bankno * SLRU_BANK_SIZE; + int bankend = bankstart + SLRU_BANK_SIZE; + + Assert(LWLockHeldByMe(&shared->bank_locks[bankno].lock)); /* See if page already has a buffer assigned */ for (int slotno = 0; slotno < shared->num_slots; slotno++) { - if (shared->page_number[slotno] == pageno && - shared->page_status[slotno] != SLRU_PAGE_EMPTY) + if (shared->page_status[slotno] != SLRU_PAGE_EMPTY && + shared->page_number[slotno] == pageno) return slotno; } @@ -1091,14 +1204,15 @@ SlruSelectLRUPage(SlruCtl ctl, int64 pageno) * That gets us back on the path to having good data when there are * multiple pages with the same lru_count. */ - cur_count = (shared->cur_lru_count)++; - for (int slotno = 0; slotno < shared->num_slots; slotno++) + cur_count = (shared->bank_cur_lru_count[bankno])++; + for (int slotno = bankstart; slotno < bankend; slotno++) { int this_delta; int64 this_page_number; if (shared->page_status[slotno] == SLRU_PAGE_EMPTY) return slotno; + this_delta = cur_count - shared->page_lru_count[slotno]; if (this_delta < 0) { @@ -1193,6 +1307,7 @@ SimpleLruWriteAll(SlruCtl ctl, bool allow_redirtied) SlruShared shared = ctl->shared; SlruWriteAllData fdata; int64 pageno = 0; + int prevbank = SlotGetBankNumber(0); bool ok; /* update the stats counter of flushes */ @@ -1203,10 +1318,27 @@ SimpleLruWriteAll(SlruCtl ctl, bool allow_redirtied) */ fdata.num_files = 0; - LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE); + LWLockAcquire(&shared->bank_locks[prevbank].lock, LW_EXCLUSIVE); for (int slotno = 0; slotno < shared->num_slots; slotno++) { + int curbank = SlotGetBankNumber(slotno); + + /* + * If the current bank lock is not same as the previous bank lock then + * release the previous lock and acquire the new lock. 
+ */ + if (curbank != prevbank) + { + LWLockRelease(&shared->bank_locks[prevbank].lock); + LWLockAcquire(&shared->bank_locks[curbank].lock, LW_EXCLUSIVE); + prevbank = curbank; + } + + /* Do nothing if slot is unused */ + if (shared->page_status[slotno] == SLRU_PAGE_EMPTY) + continue; + SlruInternalWritePage(ctl, slotno, &fdata); /* @@ -1220,7 +1352,7 @@ SimpleLruWriteAll(SlruCtl ctl, bool allow_redirtied) !shared->page_dirty[slotno])); } - LWLockRelease(shared->ControlLock); + LWLockRelease(&shared->bank_locks[prevbank].lock); /* * Now close any files that were open @@ -1259,6 +1391,7 @@ void SimpleLruTruncate(SlruCtl ctl, int64 cutoffPage) { SlruShared shared = ctl->shared; + int prevbank; /* update the stats counter of truncates */ pgstat_count_slru_truncate(shared->slru_stats_idx); @@ -1269,8 +1402,6 @@ SimpleLruTruncate(SlruCtl ctl, int64 cutoffPage) * or just after a checkpoint, any dirty pages should have been flushed * already ... we're just being extra careful here.) */ - LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE); - restart: /* @@ -1282,15 +1413,29 @@ restart: if (ctl->PagePrecedes(pg_atomic_read_u64(&shared->latest_page_number), cutoffPage)) { - LWLockRelease(shared->ControlLock); ereport(LOG, (errmsg("could not truncate directory \"%s\": apparent wraparound", ctl->Dir))); return; } + prevbank = SlotGetBankNumber(0); + LWLockAcquire(&shared->bank_locks[prevbank].lock, LW_EXCLUSIVE); for (int slotno = 0; slotno < shared->num_slots; slotno++) { + int curbank = SlotGetBankNumber(slotno); + + /* + * If the current bank lock is not same as the previous bank lock then + * release the previous lock and acquire the new lock. + */ + if (curbank != prevbank) + { + LWLockRelease(&shared->bank_locks[prevbank].lock); + LWLockAcquire(&shared->bank_locks[curbank].lock, LW_EXCLUSIVE); + prevbank = curbank; + } + if (shared->page_status[slotno] == SLRU_PAGE_EMPTY) continue; if (!ctl->PagePrecedes(shared->page_number[slotno], cutoffPage)) @@ -1320,10 +1465,12 @@ restart: SlruInternalWritePage(ctl, slotno, NULL); else SimpleLruWaitIO(ctl, slotno); + + LWLockRelease(&shared->bank_locks[prevbank].lock); goto restart; } - LWLockRelease(shared->ControlLock); + LWLockRelease(&shared->bank_locks[prevbank].lock); /* Now we can remove the old segment(s) */ (void) SlruScanDirectory(ctl, SlruScanDirCbDeleteCutoff, &cutoffPage); @@ -1362,19 +1509,33 @@ void SlruDeleteSegment(SlruCtl ctl, int64 segno) { SlruShared shared = ctl->shared; + int prevbank = SlotGetBankNumber(0); bool did_write; /* Clean out any possibly existing references to the segment. */ - LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE); + LWLockAcquire(&shared->bank_locks[prevbank].lock, LW_EXCLUSIVE); restart: did_write = false; for (int slotno = 0; slotno < shared->num_slots; slotno++) { - int pagesegno = shared->page_number[slotno] / SLRU_PAGES_PER_SEGMENT; + int pagesegno; + int curbank = SlotGetBankNumber(slotno); + + /* + * If the current bank lock is not same as the previous bank lock then + * release the previous lock and acquire the new lock. 
+ */ + if (curbank != prevbank) + { + LWLockRelease(&shared->bank_locks[prevbank].lock); + LWLockAcquire(&shared->bank_locks[curbank].lock, LW_EXCLUSIVE); + prevbank = curbank; + } if (shared->page_status[slotno] == SLRU_PAGE_EMPTY) continue; + pagesegno = shared->page_number[slotno] / SLRU_PAGES_PER_SEGMENT; /* not the segment we're looking for */ if (pagesegno != segno) continue; @@ -1405,7 +1566,7 @@ restart: SlruInternalDeleteSegment(ctl, segno); - LWLockRelease(shared->ControlLock); + LWLockRelease(&shared->bank_locks[prevbank].lock); } /* diff --git a/src/backend/access/transam/subtrans.c b/src/backend/access/transam/subtrans.c index 6aa47af43e2..dc9566fb51b 100644 --- a/src/backend/access/transam/subtrans.c +++ b/src/backend/access/transam/subtrans.c @@ -31,7 +31,9 @@ #include "access/slru.h" #include "access/subtrans.h" #include "access/transam.h" +#include "miscadmin.h" #include "pg_trace.h" +#include "utils/guc_hooks.h" #include "utils/snapmgr.h" @@ -85,12 +87,14 @@ SubTransSetParent(TransactionId xid, TransactionId parent) int64 pageno = TransactionIdToPage(xid); int entryno = TransactionIdToEntry(xid); int slotno; + LWLock *lock; TransactionId *ptr; Assert(TransactionIdIsValid(parent)); Assert(TransactionIdFollows(xid, parent)); - LWLockAcquire(SubtransSLRULock, LW_EXCLUSIVE); + lock = SimpleLruGetBankLock(SubTransCtl, pageno); + LWLockAcquire(lock, LW_EXCLUSIVE); slotno = SimpleLruReadPage(SubTransCtl, pageno, true, xid); ptr = (TransactionId *) SubTransCtl->shared->page_buffer[slotno]; @@ -108,7 +112,7 @@ SubTransSetParent(TransactionId xid, TransactionId parent) SubTransCtl->shared->page_dirty[slotno] = true; } - LWLockRelease(SubtransSLRULock); + LWLockRelease(lock); } /* @@ -138,7 +142,7 @@ SubTransGetParent(TransactionId xid) parent = *ptr; - LWLockRelease(SubtransSLRULock); + LWLockRelease(SimpleLruGetBankLock(SubTransCtl, pageno)); return parent; } @@ -186,6 +190,22 @@ SubTransGetTopmostTransaction(TransactionId xid) return previousXid; } +/* + * Number of shared SUBTRANS buffers. + * + * If asked to autotune, use 2MB for every 1GB of shared buffers, up to 8MB. + * Otherwise just cap the configured amount to be between 16 and the maximum + * allowed. + */ +static int +SUBTRANSShmemBuffers(void) +{ + /* auto-tune based on shared buffers */ + if (subtransaction_buffers == 0) + return SimpleLruAutotuneBuffers(512, 1024); + + return Min(Max(16, subtransaction_buffers), SLRU_MAX_ALLOWED_BUFFERS); +} /* * Initialization of shared memory for SUBTRANS @@ -193,21 +213,50 @@ SubTransGetTopmostTransaction(TransactionId xid) Size SUBTRANSShmemSize(void) { - return SimpleLruShmemSize(NUM_SUBTRANS_BUFFERS, 0); + return SimpleLruShmemSize(SUBTRANSShmemBuffers(), 0); } void SUBTRANSShmemInit(void) { + /* If auto-tuning is requested, now is the time to do it */ + if (subtransaction_buffers == 0) + { + char buf[32]; + + snprintf(buf, sizeof(buf), "%d", SUBTRANSShmemBuffers()); + SetConfigOption("subtransaction_buffers", buf, PGC_POSTMASTER, + PGC_S_DYNAMIC_DEFAULT); + + /* + * We prefer to report this value's source as PGC_S_DYNAMIC_DEFAULT. + * However, if the DBA explicitly set subtransaction_buffers = 0 in + * the config file, then PGC_S_DYNAMIC_DEFAULT will fail to override + * that and we must force the matter with PGC_S_OVERRIDE. + */ + if (subtransaction_buffers == 0) /* failed to apply it? 
*/ + SetConfigOption("subtransaction_buffers", buf, PGC_POSTMASTER, + PGC_S_OVERRIDE); + } + Assert(subtransaction_buffers != 0); + SubTransCtl->PagePrecedes = SubTransPagePrecedes; - SimpleLruInit(SubTransCtl, "subtransaction", NUM_SUBTRANS_BUFFERS, 0, - SubtransSLRULock, "pg_subtrans", - LWTRANCHE_SUBTRANS_BUFFER, SYNC_HANDLER_NONE, - false); + SimpleLruInit(SubTransCtl, "subtransaction", SUBTRANSShmemBuffers(), 0, + "pg_subtrans", LWTRANCHE_SUBTRANS_BUFFER, + LWTRANCHE_SUBTRANS_SLRU, SYNC_HANDLER_NONE, false); SlruPagePrecedesUnitTests(SubTransCtl, SUBTRANS_XACTS_PER_PAGE); } /* + * GUC check_hook for subtransaction_buffers + */ +bool +check_subtrans_buffers(int *newval, void **extra, GucSource source) +{ + return check_slru_buffers("subtransaction_buffers", newval); +} + +/* * This func must be called ONCE on system install. It creates * the initial SUBTRANS segment. (The SUBTRANS directory is assumed to * have been created by the initdb shell script, and SUBTRANSShmemInit @@ -221,8 +270,9 @@ void BootStrapSUBTRANS(void) { int slotno; + LWLock *lock = SimpleLruGetBankLock(SubTransCtl, 0); - LWLockAcquire(SubtransSLRULock, LW_EXCLUSIVE); + LWLockAcquire(lock, LW_EXCLUSIVE); /* Create and zero the first page of the subtrans log */ slotno = ZeroSUBTRANSPage(0); @@ -231,7 +281,7 @@ BootStrapSUBTRANS(void) SimpleLruWritePage(SubTransCtl, slotno); Assert(!SubTransCtl->shared->page_dirty[slotno]); - LWLockRelease(SubtransSLRULock); + LWLockRelease(lock); } /* @@ -261,6 +311,8 @@ StartupSUBTRANS(TransactionId oldestActiveXID) FullTransactionId nextXid; int64 startPage; int64 endPage; + LWLock *prevlock; + LWLock *lock; /* * Since we don't expect pg_subtrans to be valid across crashes, we @@ -268,23 +320,47 @@ StartupSUBTRANS(TransactionId oldestActiveXID) * Whenever we advance into a new page, ExtendSUBTRANS will likewise zero * the new page without regard to whatever was previously on disk. */ - LWLockAcquire(SubtransSLRULock, LW_EXCLUSIVE); - startPage = TransactionIdToPage(oldestActiveXID); nextXid = TransamVariables->nextXid; endPage = TransactionIdToPage(XidFromFullTransactionId(nextXid)); + prevlock = SimpleLruGetBankLock(SubTransCtl, startPage); + LWLockAcquire(prevlock, LW_EXCLUSIVE); while (startPage != endPage) { + lock = SimpleLruGetBankLock(SubTransCtl, startPage); + + /* + * Check if we need to acquire the lock on the new bank then release + * the lock on the old bank and acquire on the new bank. + */ + if (prevlock != lock) + { + LWLockRelease(prevlock); + LWLockAcquire(lock, LW_EXCLUSIVE); + prevlock = lock; + } + (void) ZeroSUBTRANSPage(startPage); startPage++; /* must account for wraparound */ if (startPage > TransactionIdToPage(MaxTransactionId)) startPage = 0; } - (void) ZeroSUBTRANSPage(startPage); - LWLockRelease(SubtransSLRULock); + lock = SimpleLruGetBankLock(SubTransCtl, startPage); + + /* + * Check if we need to acquire the lock on the new bank then release the + * lock on the old bank and acquire on the new bank. + */ + if (prevlock != lock) + { + LWLockRelease(prevlock); + LWLockAcquire(lock, LW_EXCLUSIVE); + } + (void) ZeroSUBTRANSPage(startPage); + LWLockRelease(lock); } /* @@ -318,6 +394,7 @@ void ExtendSUBTRANS(TransactionId newestXact) { int64 pageno; + LWLock *lock; /* * No work except at first XID of a page. 
But beware: just after @@ -329,12 +406,13 @@ ExtendSUBTRANS(TransactionId newestXact) pageno = TransactionIdToPage(newestXact); - LWLockAcquire(SubtransSLRULock, LW_EXCLUSIVE); + lock = SimpleLruGetBankLock(SubTransCtl, pageno); + LWLockAcquire(lock, LW_EXCLUSIVE); /* Zero the page */ ZeroSUBTRANSPage(pageno); - LWLockRelease(SubtransSLRULock); + LWLockRelease(lock); }
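
The hunks above repeat one idiom: the SLRU buffer pool is split into banks, a page maps to its bank via pageno & ctl->bank_mask, a slot maps to its bank via SlotGetBankNumber(), and every loop that walks all slots (SimpleLruWriteAll, SimpleLruTruncate, SlruDeleteSegment) swaps bank locks as it crosses a bank boundary, so only one bank lock is held at a time. The sketch below mirrors that pattern outside PostgreSQL; it is an illustration, not code from the patch: pthread mutexes stand in for LWLocks, and the bank size of 16 slots and pool of 64 slots are assumed values chosen only for the example.

/*
 * Minimal standalone sketch of the banked-SLRU locking pattern.
 * NOT PostgreSQL code: pthread mutexes stand in for LWLocks, and the
 * constants below are assumptions for illustration only.
 */
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>

#define BANK_SIZE   16                      /* slots per bank (assumed) */
#define NUM_SLOTS   64                      /* multiple of BANK_SIZE */
#define NUM_BANKS   (NUM_SLOTS / BANK_SIZE) /* power of two, so masking works */

static pthread_mutex_t bank_locks[NUM_BANKS];
static int64_t page_number[NUM_SLOTS];
static int     page_in_use[NUM_SLOTS];

/* pageno -> bank: same idea as "pageno & ctl->bank_mask" in the patch */
static inline int
PageGetBankNumber(int64_t pageno)
{
    return (int) (pageno & (NUM_BANKS - 1));
}

/* slotno -> bank: stand-in for the patch's SlotGetBankNumber() */
static inline int
SlotGetBankNumber(int slotno)
{
    return slotno / BANK_SIZE;
}

/*
 * Walk every buffer slot while holding only the lock of the bank the
 * current slot belongs to, as SimpleLruWriteAll()/SimpleLruTruncate() do.
 */
static void
visit_all_slots(void)
{
    int prevbank = SlotGetBankNumber(0);

    pthread_mutex_lock(&bank_locks[prevbank]);
    for (int slotno = 0; slotno < NUM_SLOTS; slotno++)
    {
        int curbank = SlotGetBankNumber(slotno);

        /* Crossing into a new bank: swap which bank lock we hold. */
        if (curbank != prevbank)
        {
            pthread_mutex_unlock(&bank_locks[prevbank]);
            pthread_mutex_lock(&bank_locks[curbank]);
            prevbank = curbank;
        }

        if (!page_in_use[slotno])
            continue;                   /* skip empty slots */
        printf("slot %d holds page %lld (bank %d)\n",
               slotno, (long long) page_number[slotno], curbank);
    }
    pthread_mutex_unlock(&bank_locks[prevbank]);
}

int
main(void)
{
    for (int i = 0; i < NUM_BANKS; i++)
        pthread_mutex_init(&bank_locks[i], NULL);

    page_number[3] = 42;
    page_in_use[3] = 1;
    printf("page 42 maps to bank %d\n", PageGetBankNumber(42));
    visit_all_slots();
    return 0;
}

Keeping the number of banks a power of two is what lets the page-to-bank mapping be a single AND with bank_mask, which presumably is why the patch stores a mask rather than a bank count.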
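
On the subtrans side, SUBTRANSShmemBuffers() encodes the sizing rule stated in its comment: with subtransaction_buffers = 0 it calls SimpleLruAutotuneBuffers(512, 1024). If those arguments are read as a divisor and a ceiling (the helper's definition is not shown in this part of the diff, so this is an inference), then with the default 8kB block size 1GB of shared_buffers is 131072 buffers, 131072 / 512 = 256 SLRU pages = 2MB, and the 1024-buffer ceiling is 8MB, matching the "2MB for every 1GB of shared buffers, up to 8MB" description. An explicit setting is instead clamped to the range [16, SLRU_MAX_ALLOWED_BUFFERS].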