Diffstat (limited to 'src/backend/access')
-rw-r--r--  src/backend/access/transam/clog.c        243
-rw-r--r--  src/backend/access/transam/commit_ts.c    88
-rw-r--r--  src/backend/access/transam/multixact.c   190
-rw-r--r--  src/backend/access/transam/slru.c        357
-rw-r--r--  src/backend/access/transam/subtrans.c    110
5 files changed, 739 insertions, 249 deletions
diff --git a/src/backend/access/transam/clog.c b/src/backend/access/transam/clog.c
index 34f079cbb14..a787b374dac 100644
--- a/src/backend/access/transam/clog.c
+++ b/src/backend/access/transam/clog.c
@@ -3,12 +3,13 @@
* clog.c
* PostgreSQL transaction-commit-log manager
*
- * This module replaces the old "pg_log" access code, which treated pg_log
- * essentially like a relation, in that it went through the regular buffer
- * manager. The problem with that was that there wasn't any good way to
- * recycle storage space for transactions so old that they'll never be
- * looked up again. Now we use specialized access code so that the commit
- * log can be broken into relatively small, independent segments.
+ * This module stores two bits per transaction regarding its commit/abort
+ * status; the status for four transactions fits in a byte.
+ *
+ * This would be a pretty simple abstraction on top of slru.c, except that
+ * for performance reasons we allow multiple transactions that are
+ * committing concurrently to form a queue, so that a single process can
+ * update the status for all of them within a single lock acquisition run.
*
* XLOG interactions: this module generates an XLOG record whenever a new
* CLOG page is initialized to zeroes. Other writes of CLOG come from
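
As an aside for readers of this hunk: the two-bits-per-transaction layout maps an XID to a page, a byte within that page, and a bit shift within that byte. The standalone sketch below mirrors the macros in clog.c, assuming the default BLCKSZ of 8192; it is an illustration, not part of the patch.

#include <stdio.h>
#include <stdint.h>

#define BLCKSZ               8192      /* assumed default block size */
#define CLOG_BITS_PER_XACT   2
#define CLOG_XACTS_PER_BYTE  4
#define CLOG_XACTS_PER_PAGE  (BLCKSZ * CLOG_XACTS_PER_BYTE)
#define CLOG_XACT_BITMASK    ((1 << CLOG_BITS_PER_XACT) - 1)

int
main(void)
{
	uint32_t	xid = 1234567;
	int64_t		pageno = xid / CLOG_XACTS_PER_PAGE;
	int			byteno = (int) ((xid % CLOG_XACTS_PER_PAGE) / CLOG_XACTS_PER_BYTE);
	int			bshift = (int) (xid % CLOG_XACTS_PER_BYTE) * CLOG_BITS_PER_XACT;

	/* Each byte holds the 2-bit status of four consecutive transactions. */
	printf("xid %u -> page %lld, byte %d, bit shift %d (mask 0x%x)\n",
		   xid, (long long) pageno, byteno, bshift, CLOG_XACT_BITMASK);
	return 0;
}
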
@@ -43,6 +44,7 @@
#include "pgstat.h"
#include "storage/proc.h"
#include "storage/sync.h"
+#include "utils/guc_hooks.h"
/*
* Defines for CLOG page sizes. A page is the same BLCKSZ as is used
@@ -62,6 +64,15 @@
#define CLOG_XACTS_PER_PAGE (BLCKSZ * CLOG_XACTS_PER_BYTE)
#define CLOG_XACT_BITMASK ((1 << CLOG_BITS_PER_XACT) - 1)
+/*
+ * Because space used in CLOG by each transaction is so small, we place a
+ * smaller limit on the number of CLOG buffers than SLRU allows. No other
+ * SLRU needs this.
+ */
+#define CLOG_MAX_ALLOWED_BUFFERS \
+ Min(SLRU_MAX_ALLOWED_BUFFERS, \
+ (((MaxTransactionId / 2) + (CLOG_XACTS_PER_PAGE - 1)) / CLOG_XACTS_PER_PAGE))
+
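
To make the cap above concrete: with the default BLCKSZ of 8192, CLOG_XACTS_PER_PAGE is 32768, so covering half the XID space takes 65536 pages of CLOG. The snippet below evaluates just that term; MaxTransactionId (0xFFFFFFFF) comes from transam.h, and the outer Min() against SLRU_MAX_ALLOWED_BUFFERS is omitted because that constant is defined outside this diff.

#include <stdio.h>
#include <stdint.h>

#define BLCKSZ               8192      /* assumed default block size */
#define CLOG_XACTS_PER_PAGE  (BLCKSZ * 4)
#define MaxTransactionId     ((uint32_t) 0xFFFFFFFF)	/* from transam.h */

int
main(void)
{
	uint32_t	pages = ((MaxTransactionId / 2) + (CLOG_XACTS_PER_PAGE - 1))
		/ CLOG_XACTS_PER_PAGE;

	printf("CLOG pages needed to cover half the XID space: %u\n", pages); /* 65536 */
	return 0;
}
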
/*
* Although we return an int64 the actual value can't currently exceed
@@ -284,15 +295,20 @@ TransactionIdSetPageStatus(TransactionId xid, int nsubxids,
XLogRecPtr lsn, int64 pageno,
bool all_xact_same_page)
{
+ LWLock *lock;
+
/* Can't use group update when PGPROC overflows. */
StaticAssertDecl(THRESHOLD_SUBTRANS_CLOG_OPT <= PGPROC_MAX_CACHED_SUBXIDS,
"group clog threshold less than PGPROC cached subxids");
+ /* Get the SLRU bank lock for the page we are going to access. */
+ lock = SimpleLruGetBankLock(XactCtl, pageno);
+
/*
- * When there is contention on XactSLRULock, we try to group multiple
- * updates; a single leader process will perform transaction status
- * updates for multiple backends so that the number of times XactSLRULock
- * needs to be acquired is reduced.
+ * When there is contention on the SLRU bank lock we need, we try to group
+ * multiple updates; a single leader process will perform transaction
+ * status updates for multiple backends so that the number of times the
+ * bank lock needs to be acquired is reduced.
*
* For this optimization to be safe, the XID and subxids in MyProc must be
* the same as the ones for which we're setting the status. Check that
@@ -310,17 +326,17 @@ TransactionIdSetPageStatus(TransactionId xid, int nsubxids,
nsubxids * sizeof(TransactionId)) == 0))
{
/*
- * If we can immediately acquire XactSLRULock, we update the status of
- * our own XID and release the lock. If not, try use group XID
- * update. If that doesn't work out, fall back to waiting for the
- * lock to perform an update for this transaction only.
+ * If we can immediately acquire the lock, we update the status of our
+ * own XID and release the lock. If not, try to use group XID update. If
+ * that doesn't work out, fall back to waiting for the lock to perform
+ * an update for this transaction only.
*/
- if (LWLockConditionalAcquire(XactSLRULock, LW_EXCLUSIVE))
+ if (LWLockConditionalAcquire(lock, LW_EXCLUSIVE))
{
/* Got the lock without waiting! Do the update. */
TransactionIdSetPageStatusInternal(xid, nsubxids, subxids, status,
lsn, pageno);
- LWLockRelease(XactSLRULock);
+ LWLockRelease(lock);
return;
}
else if (TransactionGroupUpdateXidStatus(xid, status, lsn, pageno))
@@ -333,10 +349,10 @@ TransactionIdSetPageStatus(TransactionId xid, int nsubxids,
}
/* Group update not applicable, or couldn't accept this page number. */
- LWLockAcquire(XactSLRULock, LW_EXCLUSIVE);
+ LWLockAcquire(lock, LW_EXCLUSIVE);
TransactionIdSetPageStatusInternal(xid, nsubxids, subxids, status,
lsn, pageno);
- LWLockRelease(XactSLRULock);
+ LWLockRelease(lock);
}
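
The function above introduces a three-step fallback: a non-blocking attempt on the bank lock, then the group-update path, then a blocking acquisition. The hypothetical helper below restates that ladder in isolation; SetPageStatusWithFallback is not a real function, while everything it calls is defined in this file, and it assumes the group-update preconditions checked earlier already hold.

static void
SetPageStatusWithFallback(TransactionId xid, int nsubxids,
						  TransactionId *subxids, XidStatus status,
						  XLogRecPtr lsn, int64 pageno, LWLock *lock)
{
	/* Step 1: cheap, non-blocking attempt on the bank lock. */
	if (LWLockConditionalAcquire(lock, LW_EXCLUSIVE))
	{
		TransactionIdSetPageStatusInternal(xid, nsubxids, subxids, status,
										   lsn, pageno);
		LWLockRelease(lock);
		return;
	}

	/* Step 2: join (or lead) a group update; the leader does our work. */
	if (TransactionGroupUpdateXidStatus(xid, status, lsn, pageno))
		return;

	/* Step 3: fall back to a plain blocking acquisition. */
	LWLockAcquire(lock, LW_EXCLUSIVE);
	TransactionIdSetPageStatusInternal(xid, nsubxids, subxids, status,
									   lsn, pageno);
	LWLockRelease(lock);
}
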
/*
@@ -355,7 +371,8 @@ TransactionIdSetPageStatusInternal(TransactionId xid, int nsubxids,
Assert(status == TRANSACTION_STATUS_COMMITTED ||
status == TRANSACTION_STATUS_ABORTED ||
(status == TRANSACTION_STATUS_SUB_COMMITTED && !TransactionIdIsValid(xid)));
- Assert(LWLockHeldByMeInMode(XactSLRULock, LW_EXCLUSIVE));
+ Assert(LWLockHeldByMeInMode(SimpleLruGetBankLock(XactCtl, pageno),
+ LW_EXCLUSIVE));
/*
* If we're doing an async commit (ie, lsn is valid), then we must wait
@@ -406,14 +423,15 @@ TransactionIdSetPageStatusInternal(TransactionId xid, int nsubxids,
}
/*
- * When we cannot immediately acquire XactSLRULock in exclusive mode at
+ * Subroutine for TransactionIdSetPageStatus, q.v.
+ *
+ * When we cannot immediately acquire the SLRU bank lock in exclusive mode at
* commit time, add ourselves to a list of processes that need their XIDs
* status update. The first process to add itself to the list will acquire
- * XactSLRULock in exclusive mode and set transaction status as required
- * on behalf of all group members. This avoids a great deal of contention
- * around XactSLRULock when many processes are trying to commit at once,
- * since the lock need not be repeatedly handed off from one committing
- * process to the next.
+ * the lock in exclusive mode and set transaction status as required on behalf
+ * of all group members. This avoids a great deal of contention when many
+ * processes are trying to commit at once, since the lock need not be
+ * repeatedly handed off from one committing process to the next.
*
* Returns true when transaction status has been updated in clog; returns
* false if we decided against applying the optimization because the page
@@ -425,16 +443,17 @@ TransactionGroupUpdateXidStatus(TransactionId xid, XidStatus status,
{
volatile PROC_HDR *procglobal = ProcGlobal;
PGPROC *proc = MyProc;
- int pgprocno = MyProcNumber;
uint32 nextidx;
uint32 wakeidx;
+ int prevpageno;
+ LWLock *prevlock = NULL;
/* We should definitely have an XID whose status needs to be updated. */
Assert(TransactionIdIsValid(xid));
/*
- * Add ourselves to the list of processes needing a group XID status
- * update.
+ * Prepare to add ourselves to the list of processes needing a group XID
+ * status update.
*/
proc->clogGroupMember = true;
proc->clogGroupMemberXid = xid;
@@ -442,6 +461,29 @@ TransactionGroupUpdateXidStatus(TransactionId xid, XidStatus status,
proc->clogGroupMemberPage = pageno;
proc->clogGroupMemberLsn = lsn;
+ /*
+ * We put ourselves in the queue by writing MyProcNumber to
+ * ProcGlobal->clogGroupFirst. However, if there's already a process
+ * listed there, we compare our pageno with that process's pageno; if they
+ * differ, we cannot participate in the group, so we return and let the
+ * caller update pg_xact in the normal way.
+ *
+ * If we're not the first process in the list, we must follow the leader.
+ * We do this by storing the data we want updated in our PGPROC entry
+ * where the leader can find it, then going to sleep.
+ *
+ * If no process is already in the list, we're the leader; our first step
+ * is to lock the SLRU bank to which our page belongs, then we close out
+ * the group by resetting the list pointer from ProcGlobal->clogGroupFirst
+ * (this lets other processes set up other groups later); finally we do
+ * the SLRU updates, release the SLRU bank lock, and wake up the sleeping
+ * processes.
+ *
+ * If another group starts to update a page in a different SLRU bank, it
+ * can proceed concurrently, since the bank lock it is going to use is
+ * different from ours. If another group starts to update a page in the
+ * same bank as ours, it waits until we release the lock.
+ */
nextidx = pg_atomic_read_u32(&procglobal->clogGroupFirst);
while (true)
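
The queue described in the comment above is kept in a single atomic head (ProcGlobal->clogGroupFirst) and maintained with a compare-and-swap push, as the loop that follows shows. The standalone C11 sketch below demonstrates the same head-swap and leader test using stdatomic in place of pg_atomic_*; it is single-threaded and purely illustrative, not PostgreSQL code.

#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define INVALID_SLOT UINT32_MAX		/* stands in for INVALID_PGPROCNO */

static _Atomic uint32_t group_first = INVALID_SLOT;	/* like clogGroupFirst */
static uint32_t next_slot[8];		/* per-worker link, like clogGroupNext */

/* Push worker "slot" onto the list; return true if it became the leader. */
static bool
join_group(uint32_t slot)
{
	uint32_t	head = atomic_load(&group_first);

	for (;;)
	{
		next_slot[slot] = head;		/* publish our link before the swap */
		if (atomic_compare_exchange_weak(&group_first, &head, slot))
			return head == INVALID_SLOT;	/* leader iff the list was empty */
		/* CAS failed: head now holds the current value; retry. */
	}
}

int
main(void)
{
	for (uint32_t s = 0; s < 3; s++)
		printf("worker %u joined, leader = %s\n",
			   s, join_group(s) ? "yes" : "no");
	return 0;
}
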
@@ -453,10 +495,11 @@ TransactionGroupUpdateXidStatus(TransactionId xid, XidStatus status,
* There is a race condition here, which is that after doing the below
* check and before adding this proc's clog update to a group, the
* group leader might have already finished the group update for this
- * page and becomes group leader of another group. This will lead to a
- * situation where a single group can have different clog page
- * updates. This isn't likely and will still work, just maybe a bit
- * less efficiently.
+ * page and become the group leader of another group, updating a
+ * different page. This will lead to a situation where a single group
+ * can have different clog page updates. This isn't likely and will
+ * still work, just less efficiently -- we handle this case by
+ * switching to a different bank lock in the loop below.
*/
if (nextidx != INVALID_PGPROCNO &&
GetPGProcByNumber(nextidx)->clogGroupMemberPage != proc->clogGroupMemberPage)
@@ -474,7 +517,7 @@ TransactionGroupUpdateXidStatus(TransactionId xid, XidStatus status,
if (pg_atomic_compare_exchange_u32(&procglobal->clogGroupFirst,
&nextidx,
- (uint32) pgprocno))
+ (uint32) MyProcNumber))
break;
}
@@ -508,13 +551,21 @@ TransactionGroupUpdateXidStatus(TransactionId xid, XidStatus status,
return true;
}
- /* We are the leader. Acquire the lock on behalf of everyone. */
- LWLockAcquire(XactSLRULock, LW_EXCLUSIVE);
+ /*
+ * By here, we know we're the leader process. Acquire the SLRU bank lock
+ * that corresponds to the page we originally wanted to modify.
+ */
+ prevpageno = proc->clogGroupMemberPage;
+ prevlock = SimpleLruGetBankLock(XactCtl, prevpageno);
+ LWLockAcquire(prevlock, LW_EXCLUSIVE);
/*
* Now that we've got the lock, clear the list of processes waiting for
* group XID status update, saving a pointer to the head of the list.
- * Trying to pop elements one at a time could lead to an ABA problem.
+ * (Trying to pop elements one at a time could lead to an ABA problem.)
+ *
+ * At this point, any processes trying to do this would create a separate
+ * group.
*/
nextidx = pg_atomic_exchange_u32(&procglobal->clogGroupFirst,
INVALID_PGPROCNO);
@@ -526,6 +577,31 @@ TransactionGroupUpdateXidStatus(TransactionId xid, XidStatus status,
while (nextidx != INVALID_PGPROCNO)
{
PGPROC *nextproc = &ProcGlobal->allProcs[nextidx];
+ int thispageno = nextproc->clogGroupMemberPage;
+
+ /*
+ * If the page to update belongs to a different bank than the previous
+ * one, exchange the bank lock for the new one. This should be quite rare,
+ * as described above.
+ *
+ * (We could try to optimize this by waking up the processes for which
+ * we have already updated the status while we exchange the lock, but
+ * the code doesn't do that at present. I think it'd require
+ * additional bookkeeping, making the common path slower in order to
+ * improve an infrequent case.)
+ */
+ if (thispageno != prevpageno)
+ {
+ LWLock *lock = SimpleLruGetBankLock(XactCtl, thispageno);
+
+ if (prevlock != lock)
+ {
+ LWLockRelease(prevlock);
+ LWLockAcquire(lock, LW_EXCLUSIVE);
+ }
+ prevlock = lock;
+ prevpageno = thispageno;
+ }
/*
* Transactions with more than THRESHOLD_SUBTRANS_CLOG_OPT sub-XIDs
@@ -545,12 +621,17 @@ TransactionGroupUpdateXidStatus(TransactionId xid, XidStatus status,
}
/* We're done with the lock now. */
- LWLockRelease(XactSLRULock);
+ if (prevlock != NULL)
+ LWLockRelease(prevlock);
/*
* Now that we've released the lock, go back and wake everybody up. We
* don't do this under the lock so as to keep lock hold times to a
* minimum.
+ *
+ * (Perhaps we could do this in two passes, the first setting
+ * clogGroupNext to invalid while saving the semaphores to an array, then
+ * a single write barrier, then another pass unlocking the semaphores.)
*/
while (wakeidx != INVALID_PGPROCNO)
{
@@ -574,7 +655,7 @@ TransactionGroupUpdateXidStatus(TransactionId xid, XidStatus status,
/*
* Sets the commit status of a single transaction.
*
- * Must be called with XactSLRULock held
+ * Caller must hold the corresponding SLRU bank lock; it will still be held at exit.
*/
static void
TransactionIdSetStatusBit(TransactionId xid, XidStatus status, XLogRecPtr lsn, int slotno)
@@ -585,6 +666,11 @@ TransactionIdSetStatusBit(TransactionId xid, XidStatus status, XLogRecPtr lsn, i
char byteval;
char curval;
+ Assert(XactCtl->shared->page_number[slotno] == TransactionIdToPage(xid));
+ Assert(LWLockHeldByMeInMode(SimpleLruGetBankLock(XactCtl,
+ XactCtl->shared->page_number[slotno]),
+ LW_EXCLUSIVE));
+
byteptr = XactCtl->shared->page_buffer[slotno] + byteno;
curval = (*byteptr >> bshift) & CLOG_XACT_BITMASK;
@@ -666,7 +752,7 @@ TransactionIdGetStatus(TransactionId xid, XLogRecPtr *lsn)
lsnindex = GetLSNIndex(slotno, xid);
*lsn = XactCtl->shared->group_lsn[lsnindex];
- LWLockRelease(XactSLRULock);
+ LWLockRelease(SimpleLruGetBankLock(XactCtl, pageno));
return status;
}
@@ -674,23 +760,18 @@ TransactionIdGetStatus(TransactionId xid, XLogRecPtr *lsn)
/*
* Number of shared CLOG buffers.
*
- * On larger multi-processor systems, it is possible to have many CLOG page
- * requests in flight at one time which could lead to disk access for CLOG
- * page if the required page is not found in memory. Testing revealed that we
- * can get the best performance by having 128 CLOG buffers, more than that it
- * doesn't improve performance.
- *
- * Unconditionally keeping the number of CLOG buffers to 128 did not seem like
- * a good idea, because it would increase the minimum amount of shared memory
- * required to start, which could be a problem for people running very small
- * configurations. The following formula seems to represent a reasonable
- * compromise: people with very low values for shared_buffers will get fewer
- * CLOG buffers as well, and everyone else will get 128.
+ * If asked to autotune, use 2MB for every 1GB of shared buffers, up to 8MB.
+ * Otherwise just cap the configured amount to be between 16 and the maximum
+ * allowed.
*/
-Size
+static int
CLOGShmemBuffers(void)
{
- return Min(128, Max(4, NBuffers / 512));
+ /* auto-tune based on shared buffers */
+ if (transaction_buffers == 0)
+ return SimpleLruAutotuneBuffers(512, 1024);
+
+ return Min(Max(16, transaction_buffers), CLOG_MAX_ALLOWED_BUFFERS);
}
/*
@@ -705,14 +786,44 @@ CLOGShmemSize(void)
void
CLOGShmemInit(void)
{
+ /* If auto-tuning is requested, now is the time to do it */
+ if (transaction_buffers == 0)
+ {
+ char buf[32];
+
+ snprintf(buf, sizeof(buf), "%d", CLOGShmemBuffers());
+ SetConfigOption("transaction_buffers", buf, PGC_POSTMASTER,
+ PGC_S_DYNAMIC_DEFAULT);
+
+ /*
+ * We prefer to report this value's source as PGC_S_DYNAMIC_DEFAULT.
+ * However, if the DBA explicitly set transaction_buffers = 0 in the
+ * config file, then PGC_S_DYNAMIC_DEFAULT will fail to override that
+ * and we must force the matter with PGC_S_OVERRIDE.
+ */
+ if (transaction_buffers == 0) /* failed to apply it? */
+ SetConfigOption("transaction_buffers", buf, PGC_POSTMASTER,
+ PGC_S_OVERRIDE);
+ }
+ Assert(transaction_buffers != 0);
+
XactCtl->PagePrecedes = CLOGPagePrecedes;
SimpleLruInit(XactCtl, "transaction", CLOGShmemBuffers(), CLOG_LSNS_PER_PAGE,
- XactSLRULock, "pg_xact", LWTRANCHE_XACT_BUFFER,
- SYNC_HANDLER_CLOG, false);
+ "pg_xact", LWTRANCHE_XACT_BUFFER,
+ LWTRANCHE_XACT_SLRU, SYNC_HANDLER_CLOG, false);
SlruPagePrecedesUnitTests(XactCtl, CLOG_XACTS_PER_PAGE);
}
/*
+ * GUC check_hook for transaction_buffers
+ */
+bool
+check_transaction_buffers(int *newval, void **extra, GucSource source)
+{
+ return check_slru_buffers("transaction_buffers", newval);
+}
+
+/*
* This func must be called ONCE on system install. It creates
* the initial CLOG segment. (The CLOG directory is assumed to
* have been created by initdb, and CLOGShmemInit must have been
@@ -722,8 +833,9 @@ void
BootStrapCLOG(void)
{
int slotno;
+ LWLock *lock = SimpleLruGetBankLock(XactCtl, 0);
- LWLockAcquire(XactSLRULock, LW_EXCLUSIVE);
+ LWLockAcquire(lock, LW_EXCLUSIVE);
/* Create and zero the first page of the commit log */
slotno = ZeroCLOGPage(0, false);
@@ -732,7 +844,7 @@ BootStrapCLOG(void)
SimpleLruWritePage(XactCtl, slotno);
Assert(!XactCtl->shared->page_dirty[slotno]);
- LWLockRelease(XactSLRULock);
+ LWLockRelease(lock);
}
/*
@@ -781,8 +893,9 @@ TrimCLOG(void)
{
TransactionId xid = XidFromFullTransactionId(TransamVariables->nextXid);
int64 pageno = TransactionIdToPage(xid);
+ LWLock *lock = SimpleLruGetBankLock(XactCtl, pageno);
- LWLockAcquire(XactSLRULock, LW_EXCLUSIVE);
+ LWLockAcquire(lock, LW_EXCLUSIVE);
/*
* Zero out the remainder of the current clog page. Under normal
@@ -814,7 +927,7 @@ TrimCLOG(void)
XactCtl->shared->page_dirty[slotno] = true;
}
- LWLockRelease(XactSLRULock);
+ LWLockRelease(lock);
}
/*
@@ -846,6 +959,7 @@ void
ExtendCLOG(TransactionId newestXact)
{
int64 pageno;
+ LWLock *lock;
/*
* No work except at first XID of a page. But beware: just after
@@ -856,13 +970,14 @@ ExtendCLOG(TransactionId newestXact)
return;
pageno = TransactionIdToPage(newestXact);
+ lock = SimpleLruGetBankLock(XactCtl, pageno);
- LWLockAcquire(XactSLRULock, LW_EXCLUSIVE);
+ LWLockAcquire(lock, LW_EXCLUSIVE);
/* Zero the page and make an XLOG entry about it */
ZeroCLOGPage(pageno, true);
- LWLockRelease(XactSLRULock);
+ LWLockRelease(lock);
}
@@ -1000,16 +1115,18 @@ clog_redo(XLogReaderState *record)
{
int64 pageno;
int slotno;
+ LWLock *lock;
memcpy(&pageno, XLogRecGetData(record), sizeof(pageno));
- LWLockAcquire(XactSLRULock, LW_EXCLUSIVE);
+ lock = SimpleLruGetBankLock(XactCtl, pageno);
+ LWLockAcquire(lock, LW_EXCLUSIVE);
slotno = ZeroCLOGPage(pageno, false);
SimpleLruWritePage(XactCtl, slotno);
Assert(!XactCtl->shared->page_dirty[slotno]);
- LWLockRelease(XactSLRULock);
+ LWLockRelease(lock);
}
else if (info == CLOG_TRUNCATE)
{
diff --git a/src/backend/access/transam/commit_ts.c b/src/backend/access/transam/commit_ts.c
index d965db89c75..5c35a18348c 100644
--- a/src/backend/access/transam/commit_ts.c
+++ b/src/backend/access/transam/commit_ts.c
@@ -33,6 +33,7 @@
#include "pg_trace.h"
#include "storage/shmem.h"
#include "utils/builtins.h"
+#include "utils/guc_hooks.h"
#include "utils/snapmgr.h"
#include "utils/timestamp.h"
@@ -225,10 +226,11 @@ SetXidCommitTsInPage(TransactionId xid, int nsubxids,
TransactionId *subxids, TimestampTz ts,
RepOriginId nodeid, int64 pageno)
{
+ LWLock *lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
int slotno;
int i;
- LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
+ LWLockAcquire(lock, LW_EXCLUSIVE);
slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid);
@@ -238,13 +240,13 @@ SetXidCommitTsInPage(TransactionId xid, int nsubxids,
CommitTsCtl->shared->page_dirty[slotno] = true;
- LWLockRelease(CommitTsSLRULock);
+ LWLockRelease(lock);
}
/*
* Sets the commit timestamp of a single transaction.
*
- * Must be called with CommitTsSLRULock held
+ * Caller must hold the correct SLRU bank lock; it will still be held at exit
*/
static void
TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
@@ -345,7 +347,7 @@ TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
if (nodeid)
*nodeid = entry.nodeid;
- LWLockRelease(CommitTsSLRULock);
+ LWLockRelease(SimpleLruGetBankLock(CommitTsCtl, pageno));
return *ts != 0;
}
@@ -499,14 +501,18 @@ pg_xact_commit_timestamp_origin(PG_FUNCTION_ARGS)
/*
* Number of shared CommitTS buffers.
*
- * We use a very similar logic as for the number of CLOG buffers (except we
- * scale up twice as fast with shared buffers, and the maximum is twice as
- * high); see comments in CLOGShmemBuffers.
+ * If asked to autotune, use 2MB for every 1GB of shared buffers, up to 8MB.
+ * Otherwise just cap the configured amount to be between 16 and the maximum
+ * allowed.
*/
-Size
+static int
CommitTsShmemBuffers(void)
{
- return Min(256, Max(4, NBuffers / 256));
+ /* auto-tune based on shared buffers */
+ if (commit_timestamp_buffers == 0)
+ return SimpleLruAutotuneBuffers(512, 1024);
+
+ return Min(Max(16, commit_timestamp_buffers), SLRU_MAX_ALLOWED_BUFFERS);
}
/*
@@ -528,10 +534,31 @@ CommitTsShmemInit(void)
{
bool found;
+ /* If auto-tuning is requested, now is the time to do it */
+ if (commit_timestamp_buffers == 0)
+ {
+ char buf[32];
+
+ snprintf(buf, sizeof(buf), "%d", CommitTsShmemBuffers());
+ SetConfigOption("commit_timestamp_buffers", buf, PGC_POSTMASTER,
+ PGC_S_DYNAMIC_DEFAULT);
+
+ /*
+ * We prefer to report this value's source as PGC_S_DYNAMIC_DEFAULT.
+ * However, if the DBA explicitly set commit_timestamp_buffers = 0 in
+ * the config file, then PGC_S_DYNAMIC_DEFAULT will fail to override
+ * that and we must force the matter with PGC_S_OVERRIDE.
+ */
+ if (commit_timestamp_buffers == 0) /* failed to apply it? */
+ SetConfigOption("commit_timestamp_buffers", buf, PGC_POSTMASTER,
+ PGC_S_OVERRIDE);
+ }
+ Assert(commit_timestamp_buffers != 0);
+
CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
SimpleLruInit(CommitTsCtl, "commit_timestamp", CommitTsShmemBuffers(), 0,
- CommitTsSLRULock, "pg_commit_ts",
- LWTRANCHE_COMMITTS_BUFFER,
+ "pg_commit_ts", LWTRANCHE_COMMITTS_BUFFER,
+ LWTRANCHE_COMMITTS_SLRU,
SYNC_HANDLER_COMMIT_TS,
false);
SlruPagePrecedesUnitTests(CommitTsCtl, COMMIT_TS_XACTS_PER_PAGE);
@@ -554,6 +581,15 @@ CommitTsShmemInit(void)
}
/*
+ * GUC check_hook for commit_timestamp_buffers
+ */
+bool
+check_commit_ts_buffers(int *newval, void **extra, GucSource source)
+{
+ return check_slru_buffers("commit_timestamp_buffers", newval);
+}
+
+/*
* This function must be called ONCE on system install.
*
* (The CommitTs directory is assumed to have been created by initdb, and
@@ -715,13 +751,14 @@ ActivateCommitTs(void)
/* Create the current segment file, if necessary */
if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl, pageno))
{
+ LWLock *lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
int slotno;
- LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
+ LWLockAcquire(lock, LW_EXCLUSIVE);
slotno = ZeroCommitTsPage(pageno, false);
SimpleLruWritePage(CommitTsCtl, slotno);
Assert(!CommitTsCtl->shared->page_dirty[slotno]);
- LWLockRelease(CommitTsSLRULock);
+ LWLockRelease(lock);
}
/* Change the activation status in shared memory. */
@@ -760,8 +797,6 @@ DeactivateCommitTs(void)
TransamVariables->oldestCommitTsXid = InvalidTransactionId;
TransamVariables->newestCommitTsXid = InvalidTransactionId;
- LWLockRelease(CommitTsLock);
-
/*
* Remove *all* files. This is necessary so that there are no leftover
* files; in the case where this feature is later enabled after running
@@ -769,10 +804,16 @@ DeactivateCommitTs(void)
* (We can probably tolerate out-of-sequence files, as they are going to
* be overwritten anyway when we wrap around, but it seems better to be
* tidy.)
+ *
+ * Note that we do this with CommitTsLock acquired in exclusive mode. This
+ * is very heavy-handed, but since this routine can only be called in the
+ * replica and should happen very rarely, we don't worry too much about
+ * it. Note also that no process should be consulting this SLRU if we
+ * have just deactivated it.
*/
- LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
(void) SlruScanDirectory(CommitTsCtl, SlruScanDirCbDeleteAll, NULL);
- LWLockRelease(CommitTsSLRULock);
+
+ LWLockRelease(CommitTsLock);
}
/*
@@ -804,6 +845,7 @@ void
ExtendCommitTs(TransactionId newestXact)
{
int64 pageno;
+ LWLock *lock;
/*
* Nothing to do if module not enabled. Note we do an unlocked read of
@@ -824,12 +866,14 @@ ExtendCommitTs(TransactionId newestXact)
pageno = TransactionIdToCTsPage(newestXact);
- LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
+ lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
+
+ LWLockAcquire(lock, LW_EXCLUSIVE);
/* Zero the page and make an XLOG entry about it */
ZeroCommitTsPage(pageno, !InRecovery);
- LWLockRelease(CommitTsSLRULock);
+ LWLockRelease(lock);
}
/*
@@ -983,16 +1027,18 @@ commit_ts_redo(XLogReaderState *record)
{
int64 pageno;
int slotno;
+ LWLock *lock;
memcpy(&pageno, XLogRecGetData(record), sizeof(pageno));
- LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
+ lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
+ LWLockAcquire(lock, LW_EXCLUSIVE);
slotno = ZeroCommitTsPage(pageno, false);
SimpleLruWritePage(CommitTsCtl, slotno);
Assert(!CommitTsCtl->shared->page_dirty[slotno]);
- LWLockRelease(CommitTsSLRULock);
+ LWLockRelease(lock);
}
else if (info == COMMIT_TS_TRUNCATE)
{
diff --git a/src/backend/access/transam/multixact.c b/src/backend/access/transam/multixact.c
index 64040d330ef..9b815061452 100644
--- a/src/backend/access/transam/multixact.c
+++ b/src/backend/access/transam/multixact.c
@@ -88,6 +88,7 @@
#include "storage/proc.h"
#include "storage/procarray.h"
#include "utils/builtins.h"
+#include "utils/guc_hooks.h"
#include "utils/memutils.h"
#include "utils/snapmgr.h"
@@ -192,10 +193,10 @@ static SlruCtlData MultiXactMemberCtlData;
/*
* MultiXact state shared across all backends. All this state is protected
- * by MultiXactGenLock. (We also use MultiXactOffsetSLRULock and
- * MultiXactMemberSLRULock to guard accesses to the two sets of SLRU
- * buffers. For concurrency's sake, we avoid holding more than one of these
- * locks at a time.)
+ * by MultiXactGenLock. (We also use the SLRU bank locks of MultiXactOffset
+ * and MultiXactMember to guard accesses to the two sets of SLRU buffers. For
+ * concurrency's sake, we avoid holding more than one of these locks at a
+ * time.)
*/
typedef struct MultiXactStateData
{
@@ -870,12 +871,15 @@ RecordNewMultiXact(MultiXactId multi, MultiXactOffset offset,
int slotno;
MultiXactOffset *offptr;
int i;
-
- LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE);
+ LWLock *lock;
+ LWLock *prevlock = NULL;
pageno = MultiXactIdToOffsetPage(multi);
entryno = MultiXactIdToOffsetEntry(multi);
+ lock = SimpleLruGetBankLock(MultiXactOffsetCtl, pageno);
+ LWLockAcquire(lock, LW_EXCLUSIVE);
+
/*
* Note: we pass the MultiXactId to SimpleLruReadPage as the "transaction"
* to complain about if there's any I/O error. This is kinda bogus, but
@@ -891,10 +895,8 @@ RecordNewMultiXact(MultiXactId multi, MultiXactOffset offset,
MultiXactOffsetCtl->shared->page_dirty[slotno] = true;
- /* Exchange our lock */
- LWLockRelease(MultiXactOffsetSLRULock);
-
- LWLockAcquire(MultiXactMemberSLRULock, LW_EXCLUSIVE);
+ /* Release MultiXactOffset SLRU lock. */
+ LWLockRelease(lock);
prev_pageno = -1;
@@ -916,6 +918,20 @@ RecordNewMultiXact(MultiXactId multi, MultiXactOffset offset,
if (pageno != prev_pageno)
{
+ /*
+ * The MultiXactMember SLRU page has changed, so check whether the new
+ * page falls into a different SLRU bank; if so, release the old bank's
+ * lock and acquire the lock on the new bank.
+ */
+ lock = SimpleLruGetBankLock(MultiXactMemberCtl, pageno);
+ if (lock != prevlock)
+ {
+ if (prevlock != NULL)
+ LWLockRelease(prevlock);
+
+ LWLockAcquire(lock, LW_EXCLUSIVE);
+ prevlock = lock;
+ }
slotno = SimpleLruReadPage(MultiXactMemberCtl, pageno, true, multi);
prev_pageno = pageno;
}
@@ -936,7 +952,8 @@ RecordNewMultiXact(MultiXactId multi, MultiXactOffset offset,
MultiXactMemberCtl->shared->page_dirty[slotno] = true;
}
- LWLockRelease(MultiXactMemberSLRULock);
+ if (prevlock != NULL)
+ LWLockRelease(prevlock);
}
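
The same release-the-old/acquire-the-new dance reappears in GetMultiXactIdMembers below. As a reading aid it can be thought of as the hypothetical helper sketched here; SwitchToBankLock does not exist in the tree, while SimpleLruGetBankLock, LWLockAcquire and LWLockRelease are the real APIs this patch uses.

/*
 * Hypothetical helper equivalent to the pattern above: hold at most one SLRU
 * bank lock, swapping it only when the next page maps to a different bank.
 */
static LWLock *
SwitchToBankLock(SlruCtl ctl, int64 pageno, LWLock *prevlock)
{
	LWLock	   *lock = SimpleLruGetBankLock(ctl, pageno);

	if (lock != prevlock)
	{
		if (prevlock != NULL)
			LWLockRelease(prevlock);
		LWLockAcquire(lock, LW_EXCLUSIVE);
	}
	return lock;
}
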
/*
@@ -1239,6 +1256,8 @@ GetMultiXactIdMembers(MultiXactId multi, MultiXactMember **members,
MultiXactId tmpMXact;
MultiXactOffset nextOffset;
MultiXactMember *ptr;
+ LWLock *lock;
+ LWLock *prevlock = NULL;
debug_elog3(DEBUG2, "GetMembers: asked for %u", multi);
@@ -1342,11 +1361,22 @@ GetMultiXactIdMembers(MultiXactId multi, MultiXactMember **members,
* time on every multixact creation.
*/
retry:
- LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE);
-
pageno = MultiXactIdToOffsetPage(multi);
entryno = MultiXactIdToOffsetEntry(multi);
+ /*
+ * If this page falls under a different bank, release the old bank's lock
+ * and acquire the lock of the new bank.
+ */
+ lock = SimpleLruGetBankLock(MultiXactOffsetCtl, pageno);
+ if (lock != prevlock)
+ {
+ if (prevlock != NULL)
+ LWLockRelease(prevlock);
+ LWLockAcquire(lock, LW_EXCLUSIVE);
+ prevlock = lock;
+ }
+
slotno = SimpleLruReadPage(MultiXactOffsetCtl, pageno, true, multi);
offptr = (MultiXactOffset *) MultiXactOffsetCtl->shared->page_buffer[slotno];
offptr += entryno;
@@ -1379,7 +1409,21 @@ retry:
entryno = MultiXactIdToOffsetEntry(tmpMXact);
if (pageno != prev_pageno)
+ {
+ /*
+ * Since we're going to access a different SLRU page, if this page
+ * falls under a different bank, release the old bank's lock and
+ * acquire the lock of the new bank.
+ */
+ lock = SimpleLruGetBankLock(MultiXactOffsetCtl, pageno);
+ if (prevlock != lock)
+ {
+ LWLockRelease(prevlock);
+ LWLockAcquire(lock, LW_EXCLUSIVE);
+ prevlock = lock;
+ }
slotno = SimpleLruReadPage(MultiXactOffsetCtl, pageno, true, tmpMXact);
+ }
offptr = (MultiXactOffset *) MultiXactOffsetCtl->shared->page_buffer[slotno];
offptr += entryno;
@@ -1388,7 +1432,8 @@ retry:
if (nextMXOffset == 0)
{
/* Corner case 2: next multixact is still being filled in */
- LWLockRelease(MultiXactOffsetSLRULock);
+ LWLockRelease(prevlock);
+ prevlock = NULL;
CHECK_FOR_INTERRUPTS();
pg_usleep(1000L);
goto retry;
@@ -1397,13 +1442,11 @@ retry:
length = nextMXOffset - offset;
}
- LWLockRelease(MultiXactOffsetSLRULock);
+ LWLockRelease(prevlock);
+ prevlock = NULL;
ptr = (MultiXactMember *) palloc(length * sizeof(MultiXactMember));
- /* Now get the members themselves. */
- LWLockAcquire(MultiXactMemberSLRULock, LW_EXCLUSIVE);
-
truelength = 0;
prev_pageno = -1;
for (i = 0; i < length; i++, offset++)
@@ -1419,6 +1462,20 @@ retry:
if (pageno != prev_pageno)
{
+ /*
+ * Since we're going to access a different SLRU page, if this page
+ * falls under a different bank, release the old bank's lock and
+ * acquire the lock of the new bank.
+ */
+ lock = SimpleLruGetBankLock(MultiXactMemberCtl, pageno);
+ if (lock != prevlock)
+ {
+ if (prevlock)
+ LWLockRelease(prevlock);
+ LWLockAcquire(lock, LW_EXCLUSIVE);
+ prevlock = lock;
+ }
+
slotno = SimpleLruReadPage(MultiXactMemberCtl, pageno, true, multi);
prev_pageno = pageno;
}
@@ -1442,7 +1499,8 @@ retry:
truelength++;
}
- LWLockRelease(MultiXactMemberSLRULock);
+ if (prevlock)
+ LWLockRelease(prevlock);
/* A multixid with zero members should not happen */
Assert(truelength > 0);
@@ -1834,8 +1892,8 @@ MultiXactShmemSize(void)
mul_size(sizeof(MultiXactId) * 2, MaxOldestSlot))
size = SHARED_MULTIXACT_STATE_SIZE;
- size = add_size(size, SimpleLruShmemSize(NUM_MULTIXACTOFFSET_BUFFERS, 0));
- size = add_size(size, SimpleLruShmemSize(NUM_MULTIXACTMEMBER_BUFFERS, 0));
+ size = add_size(size, SimpleLruShmemSize(multixact_offset_buffers, 0));
+ size = add_size(size, SimpleLruShmemSize(multixact_member_buffers, 0));
return size;
}
@@ -1851,16 +1909,16 @@ MultiXactShmemInit(void)
MultiXactMemberCtl->PagePrecedes = MultiXactMemberPagePrecedes;
SimpleLruInit(MultiXactOffsetCtl,
- "multixact_offset", NUM_MULTIXACTOFFSET_BUFFERS, 0,
- MultiXactOffsetSLRULock, "pg_multixact/offsets",
- LWTRANCHE_MULTIXACTOFFSET_BUFFER,
+ "multixact_offset", multixact_offset_buffers, 0,
+ "pg_multixact/offsets", LWTRANCHE_MULTIXACTOFFSET_BUFFER,
+ LWTRANCHE_MULTIXACTOFFSET_SLRU,
SYNC_HANDLER_MULTIXACT_OFFSET,
false);
SlruPagePrecedesUnitTests(MultiXactOffsetCtl, MULTIXACT_OFFSETS_PER_PAGE);
SimpleLruInit(MultiXactMemberCtl,
- "multixact_member", NUM_MULTIXACTMEMBER_BUFFERS, 0,
- MultiXactMemberSLRULock, "pg_multixact/members",
- LWTRANCHE_MULTIXACTMEMBER_BUFFER,
+ "multixact_member", multixact_member_buffers, 0,
+ "pg_multixact/members", LWTRANCHE_MULTIXACTMEMBER_BUFFER,
+ LWTRANCHE_MULTIXACTMEMBER_SLRU,
SYNC_HANDLER_MULTIXACT_MEMBER,
false);
/* doesn't call SimpleLruTruncate() or meet criteria for unit tests */
@@ -1888,6 +1946,24 @@ MultiXactShmemInit(void)
}
/*
+ * GUC check_hook for multixact_offset_buffers
+ */
+bool
+check_multixact_offset_buffers(int *newval, void **extra, GucSource source)
+{
+ return check_slru_buffers("multixact_offset_buffers", newval);
+}
+
+/*
+ * GUC check_hook for multixact_member_buffers
+ */
+bool
+check_multixact_member_buffers(int *newval, void **extra, GucSource source)
+{
+ return check_slru_buffers("multixact_member_buffers", newval);
+}
+
+/*
* This func must be called ONCE on system install. It creates the initial
* MultiXact segments. (The MultiXacts directories are assumed to have been
* created by initdb, and MultiXactShmemInit must have been called already.)
@@ -1896,8 +1972,10 @@ void
BootStrapMultiXact(void)
{
int slotno;
+ LWLock *lock;
- LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE);
+ lock = SimpleLruGetBankLock(MultiXactOffsetCtl, 0);
+ LWLockAcquire(lock, LW_EXCLUSIVE);
/* Create and zero the first page of the offsets log */
slotno = ZeroMultiXactOffsetPage(0, false);
@@ -1906,9 +1984,10 @@ BootStrapMultiXact(void)
SimpleLruWritePage(MultiXactOffsetCtl, slotno);
Assert(!MultiXactOffsetCtl->shared->page_dirty[slotno]);
- LWLockRelease(MultiXactOffsetSLRULock);
+ LWLockRelease(lock);
- LWLockAcquire(MultiXactMemberSLRULock, LW_EXCLUSIVE);
+ lock = SimpleLruGetBankLock(MultiXactMemberCtl, 0);
+ LWLockAcquire(lock, LW_EXCLUSIVE);
/* Create and zero the first page of the members log */
slotno = ZeroMultiXactMemberPage(0, false);
@@ -1917,7 +1996,7 @@ BootStrapMultiXact(void)
SimpleLruWritePage(MultiXactMemberCtl, slotno);
Assert(!MultiXactMemberCtl->shared->page_dirty[slotno]);
- LWLockRelease(MultiXactMemberSLRULock);
+ LWLockRelease(lock);
}
/*
@@ -1977,10 +2056,12 @@ static void
MaybeExtendOffsetSlru(void)
{
int64 pageno;
+ LWLock *lock;
pageno = MultiXactIdToOffsetPage(MultiXactState->nextMXact);
+ lock = SimpleLruGetBankLock(MultiXactOffsetCtl, pageno);
- LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE);
+ LWLockAcquire(lock, LW_EXCLUSIVE);
if (!SimpleLruDoesPhysicalPageExist(MultiXactOffsetCtl, pageno))
{
@@ -1995,7 +2076,7 @@ MaybeExtendOffsetSlru(void)
SimpleLruWritePage(MultiXactOffsetCtl, slotno);
}
- LWLockRelease(MultiXactOffsetSLRULock);
+ LWLockRelease(lock);
}
/*
@@ -2049,6 +2130,8 @@ TrimMultiXact(void)
oldestMXactDB = MultiXactState->oldestMultiXactDB;
LWLockRelease(MultiXactGenLock);
+ /* Clean up offsets state */
+
/*
* (Re-)Initialize our idea of the latest page number for offsets.
*/
@@ -2056,9 +2139,6 @@ TrimMultiXact(void)
pg_atomic_write_u64(&MultiXactOffsetCtl->shared->latest_page_number,
pageno);
- /* Clean up offsets state */
- LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE);
-
/*
* Zero out the remainder of the current offsets page. See notes in
* TrimCLOG() for background. Unlike CLOG, some WAL record covers every
@@ -2072,7 +2152,9 @@ TrimMultiXact(void)
{
int slotno;
MultiXactOffset *offptr;
+ LWLock *lock = SimpleLruGetBankLock(MultiXactOffsetCtl, pageno);
+ LWLockAcquire(lock, LW_EXCLUSIVE);
slotno = SimpleLruReadPage(MultiXactOffsetCtl, pageno, true, nextMXact);
offptr = (MultiXactOffset *) MultiXactOffsetCtl->shared->page_buffer[slotno];
offptr += entryno;
@@ -2080,10 +2162,9 @@ TrimMultiXact(void)
MemSet(offptr, 0, BLCKSZ - (entryno * sizeof(MultiXactOffset)));
MultiXactOffsetCtl->shared->page_dirty[slotno] = true;
+ LWLockRelease(lock);
}
- LWLockRelease(MultiXactOffsetSLRULock);
-
/*
* And the same for members.
*
@@ -2093,8 +2174,6 @@ TrimMultiXact(void)
pg_atomic_write_u64(&MultiXactMemberCtl->shared->latest_page_number,
pageno);
- LWLockAcquire(MultiXactMemberSLRULock, LW_EXCLUSIVE);
-
/*
* Zero out the remainder of the current members page. See notes in
* TrimCLOG() for motivation.
@@ -2105,7 +2184,9 @@ TrimMultiXact(void)
int slotno;
TransactionId *xidptr;
int memberoff;
+ LWLock *lock = SimpleLruGetBankLock(MultiXactMemberCtl, pageno);
+ LWLockAcquire(lock, LW_EXCLUSIVE);
memberoff = MXOffsetToMemberOffset(offset);
slotno = SimpleLruReadPage(MultiXactMemberCtl, pageno, true, offset);
xidptr = (TransactionId *)
@@ -2120,10 +2201,9 @@ TrimMultiXact(void)
*/
MultiXactMemberCtl->shared->page_dirty[slotno] = true;
+ LWLockRelease(lock);
}
- LWLockRelease(MultiXactMemberSLRULock);
-
/* signal that we're officially up */
LWLockAcquire(MultiXactGenLock, LW_EXCLUSIVE);
MultiXactState->finishedStartup = true;
@@ -2411,6 +2491,7 @@ static void
ExtendMultiXactOffset(MultiXactId multi)
{
int64 pageno;
+ LWLock *lock;
/*
* No work except at first MultiXactId of a page. But beware: just after
@@ -2421,13 +2502,14 @@ ExtendMultiXactOffset(MultiXactId multi)
return;
pageno = MultiXactIdToOffsetPage(multi);
+ lock = SimpleLruGetBankLock(MultiXactOffsetCtl, pageno);
- LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE);
+ LWLockAcquire(lock, LW_EXCLUSIVE);
/* Zero the page and make an XLOG entry about it */
ZeroMultiXactOffsetPage(pageno, true);
- LWLockRelease(MultiXactOffsetSLRULock);
+ LWLockRelease(lock);
}
/*
@@ -2460,15 +2542,17 @@ ExtendMultiXactMember(MultiXactOffset offset, int nmembers)
if (flagsoff == 0 && flagsbit == 0)
{
int64 pageno;
+ LWLock *lock;
pageno = MXOffsetToMemberPage(offset);
+ lock = SimpleLruGetBankLock(MultiXactMemberCtl, pageno);
- LWLockAcquire(MultiXactMemberSLRULock, LW_EXCLUSIVE);
+ LWLockAcquire(lock, LW_EXCLUSIVE);
/* Zero the page and make an XLOG entry about it */
ZeroMultiXactMemberPage(pageno, true);
- LWLockRelease(MultiXactMemberSLRULock);
+ LWLockRelease(lock);
}
/*
@@ -2766,7 +2850,7 @@ find_multixact_start(MultiXactId multi, MultiXactOffset *result)
offptr = (MultiXactOffset *) MultiXactOffsetCtl->shared->page_buffer[slotno];
offptr += entryno;
offset = *offptr;
- LWLockRelease(MultiXactOffsetSLRULock);
+ LWLockRelease(SimpleLruGetBankLock(MultiXactOffsetCtl, pageno));
*result = offset;
return true;
@@ -3248,31 +3332,35 @@ multixact_redo(XLogReaderState *record)
{
int64 pageno;
int slotno;
+ LWLock *lock;
memcpy(&pageno, XLogRecGetData(record), sizeof(pageno));
- LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE);
+ lock = SimpleLruGetBankLock(MultiXactOffsetCtl, pageno);
+ LWLockAcquire(lock, LW_EXCLUSIVE);
slotno = ZeroMultiXactOffsetPage(pageno, false);
SimpleLruWritePage(MultiXactOffsetCtl, slotno);
Assert(!MultiXactOffsetCtl->shared->page_dirty[slotno]);
- LWLockRelease(MultiXactOffsetSLRULock);
+ LWLockRelease(lock);
}
else if (info == XLOG_MULTIXACT_ZERO_MEM_PAGE)
{
int64 pageno;
int slotno;
+ LWLock *lock;
memcpy(&pageno, XLogRecGetData(record), sizeof(pageno));
- LWLockAcquire(MultiXactMemberSLRULock, LW_EXCLUSIVE);
+ lock = SimpleLruGetBankLock(MultiXactMemberCtl, pageno);
+ LWLockAcquire(lock, LW_EXCLUSIVE);
slotno = ZeroMultiXactMemberPage(pageno, false);
SimpleLruWritePage(MultiXactMemberCtl, slotno);
Assert(!MultiXactMemberCtl->shared->page_dirty[slotno]);
- LWLockRelease(MultiXactMemberSLRULock);
+ LWLockRelease(lock);
}
else if (info == XLOG_MULTIXACT_CREATE_ID)
{
diff --git a/src/backend/access/transam/slru.c b/src/backend/access/transam/slru.c
index 93cefcd10d3..f774d285b7f 100644
--- a/src/backend/access/transam/slru.c
+++ b/src/backend/access/transam/slru.c
@@ -1,28 +1,38 @@
/*-------------------------------------------------------------------------
*
* slru.c
- * Simple LRU buffering for transaction status logfiles
+ * Simple LRU buffering for wrap-around-able permanent metadata
*
- * We use a simple least-recently-used scheme to manage a pool of page
- * buffers. Under ordinary circumstances we expect that write
- * traffic will occur mostly to the latest page (and to the just-prior
- * page, soon after a page transition). Read traffic will probably touch
- * a larger span of pages, but in any case a fairly small number of page
- * buffers should be sufficient. So, we just search the buffers using plain
- * linear search; there's no need for a hashtable or anything fancy.
- * The management algorithm is straight LRU except that we will never swap
- * out the latest page (since we know it's going to be hit again eventually).
+ * This module is used to maintain various pieces of transaction status
+ * indexed by TransactionId (such as commit status, parent transaction ID,
+ * commit timestamp), as well as storage for multixacts, serializable
+ * isolation locks and NOTIFY traffic. Extensions can define their own
+ * SLRUs, too.
*
- * We use a control LWLock to protect the shared data structures, plus
- * per-buffer LWLocks that synchronize I/O for each buffer. The control lock
- * must be held to examine or modify any shared state. A process that is
- * reading in or writing out a page buffer does not hold the control lock,
- * only the per-buffer lock for the buffer it is working on. One exception
- * is latest_page_number, which is read and written using atomic ops.
+ * Under ordinary circumstances we expect that write traffic will occur
+ * mostly to the latest page (and to the just-prior page, soon after a
+ * page transition). Read traffic will probably touch a larger span of
+ * pages, but a relatively small number of buffers should be sufficient.
*
- * "Holding the control lock" means exclusive lock in all cases except for
- * SimpleLruReadPage_ReadOnly(); see comments for SlruRecentlyUsed() for
- * the implications of that.
+ * We use a simple least-recently-used scheme to manage a pool of shared
+ * page buffers, split in banks by the lowest bits of the page number, and
+ * the management algorithm only processes the bank to which the desired
+ * page belongs, so a linear search is sufficient; there's no need for a
+ * hashtable or anything fancy. The algorithm is straight LRU except that
+ * we will never swap out the latest page (since we know it's going to be
+ * hit again eventually).
+ *
+ * We use per-bank control LWLocks to protect the shared data structures,
+ * plus per-buffer LWLocks that synchronize I/O for each buffer. The
+ * bank's control lock must be held to examine or modify any of the bank's
+ * shared state. A process that is reading in or writing out a page
+ * buffer does not hold the control lock, only the per-buffer lock for the
+ * buffer it is working on. One exception is latest_page_number, which is
+ * read and written using atomic ops.
+ *
+ * "Holding the bank control lock" means exclusive lock in all cases
+ * except for SimpleLruReadPage_ReadOnly(); see comments for
+ * SlruRecentlyUsed() for the implications of that.
*
* When initiating I/O on a buffer, we acquire the per-buffer lock exclusively
* before releasing the control lock. The per-buffer lock is released after
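
The I/O protocol described in the rewritten header can be outlined as follows, modeled on SimpleLruReadPage() later in this diff. ReadPageOutline is a hypothetical name used only for this sketch; the lock arrays, page states and SlruPhysicalReadPage() are real.

/*
 * Hypothetical outline of the read-side I/O protocol: take the per-buffer
 * lock, drop the bank lock while doing the physical read, then retake the
 * bank lock to publish the result.
 */
static bool
ReadPageOutline(SlruCtl ctl, int64 pageno, int slotno, int bankno)
{
	SlruShared	shared = ctl->shared;
	bool		ok;

	/* Mark I/O in progress and take the buffer lock before dropping the bank lock. */
	shared->page_status[slotno] = SLRU_PAGE_READ_IN_PROGRESS;
	LWLockAcquire(&shared->buffer_locks[slotno].lock, LW_EXCLUSIVE);
	LWLockRelease(&shared->bank_locks[bankno].lock);

	ok = SlruPhysicalReadPage(ctl, pageno, slotno);	/* no bank lock held here */

	/* Re-take the bank lock to publish the result, then drop the buffer lock. */
	LWLockAcquire(&shared->bank_locks[bankno].lock, LW_EXCLUSIVE);
	shared->page_status[slotno] = ok ? SLRU_PAGE_VALID : SLRU_PAGE_EMPTY;
	LWLockRelease(&shared->buffer_locks[slotno].lock);
	return ok;
}
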
@@ -60,6 +70,7 @@
#include "pgstat.h"
#include "storage/fd.h"
#include "storage/shmem.h"
+#include "utils/guc_hooks.h"
static inline int
SlruFileName(SlruCtl ctl, char *path, int64 segno)
@@ -106,6 +117,23 @@ typedef struct SlruWriteAllData
typedef struct SlruWriteAllData *SlruWriteAll;
+
+/*
+ * Bank size for the slot array. Pages are assigned a bank according to their
+ * page number, with each bank being this size. We want a power of 2 so that
+ * we can determine the bank number for a page with just bit shifting; we also
+ * want to keep the bank size small so that LRU victim search is fast. 16
+ * buffers per bank seems a good number.
+ */
+#define SLRU_BANK_BITSHIFT 4
+#define SLRU_BANK_SIZE (1 << SLRU_BANK_BITSHIFT)
+
+/*
+ * Macro to get the bank number to which the slot belongs.
+ */
+#define SlotGetBankNumber(slotno) ((slotno) >> SLRU_BANK_BITSHIFT)
+
+
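
Putting the bank geometry together with bank_mask (set in SimpleLruInit below): a page maps to bank pageno & bank_mask, and a slot maps back to its bank with SlotGetBankNumber. The standalone snippet below just walks through that arithmetic for a hypothetical 64-slot SLRU; it is not part of the patch.

#include <stdint.h>
#include <stdio.h>

#define SLRU_BANK_BITSHIFT	4
#define SLRU_BANK_SIZE		(1 << SLRU_BANK_BITSHIFT)

int
main(void)
{
	int			nslots = 64;	/* hypothetical SLRU with four banks */
	uint64_t	bank_mask = nslots / SLRU_BANK_SIZE - 1;	/* 0x3 */

	for (int64_t pageno = 0; pageno < 6; pageno++)
	{
		int			bankno = (int) (pageno & bank_mask);

		printf("page %lld -> bank %d (slots %d..%d)\n",
			   (long long) pageno, bankno,
			   bankno * SLRU_BANK_SIZE,
			   bankno * SLRU_BANK_SIZE + SLRU_BANK_SIZE - 1);
	}
	return 0;
}
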
/*
* Populate a file tag describing a segment file. We only use the segment
* number, since we can derive everything else we need by having separate
@@ -118,34 +146,6 @@ typedef struct SlruWriteAllData *SlruWriteAll;
(a).segno = (xx_segno) \
)
-/*
- * Macro to mark a buffer slot "most recently used". Note multiple evaluation
- * of arguments!
- *
- * The reason for the if-test is that there are often many consecutive
- * accesses to the same page (particularly the latest page). By suppressing
- * useless increments of cur_lru_count, we reduce the probability that old
- * pages' counts will "wrap around" and make them appear recently used.
- *
- * We allow this code to be executed concurrently by multiple processes within
- * SimpleLruReadPage_ReadOnly(). As long as int reads and writes are atomic,
- * this should not cause any completely-bogus values to enter the computation.
- * However, it is possible for either cur_lru_count or individual
- * page_lru_count entries to be "reset" to lower values than they should have,
- * in case a process is delayed while it executes this macro. With care in
- * SlruSelectLRUPage(), this does little harm, and in any case the absolute
- * worst possible consequence is a nonoptimal choice of page to evict. The
- * gain from allowing concurrent reads of SLRU pages seems worth it.
- */
-#define SlruRecentlyUsed(shared, slotno) \
- do { \
- int new_lru_count = (shared)->cur_lru_count; \
- if (new_lru_count != (shared)->page_lru_count[slotno]) { \
- (shared)->cur_lru_count = ++new_lru_count; \
- (shared)->page_lru_count[slotno] = new_lru_count; \
- } \
- } while (0)
-
/* Saved info for SlruReportIOError */
typedef enum
{
@@ -173,6 +173,7 @@ static int SlruSelectLRUPage(SlruCtl ctl, int64 pageno);
static bool SlruScanDirCbDeleteCutoff(SlruCtl ctl, char *filename,
int64 segpage, void *data);
static void SlruInternalDeleteSegment(SlruCtl ctl, int64 segno);
+static inline void SlruRecentlyUsed(SlruShared shared, int slotno);
/*
@@ -182,8 +183,12 @@ static void SlruInternalDeleteSegment(SlruCtl ctl, int64 segno);
Size
SimpleLruShmemSize(int nslots, int nlsns)
{
+ int nbanks = nslots / SLRU_BANK_SIZE;
Size sz;
+ Assert(nslots <= SLRU_MAX_ALLOWED_BUFFERS);
+ Assert(nslots % SLRU_BANK_SIZE == 0);
+
/* we assume nslots isn't so large as to risk overflow */
sz = MAXALIGN(sizeof(SlruSharedData));
sz += MAXALIGN(nslots * sizeof(char *)); /* page_buffer[] */
@@ -192,6 +197,8 @@ SimpleLruShmemSize(int nslots, int nlsns)
sz += MAXALIGN(nslots * sizeof(int64)); /* page_number[] */
sz += MAXALIGN(nslots * sizeof(int)); /* page_lru_count[] */
sz += MAXALIGN(nslots * sizeof(LWLockPadded)); /* buffer_locks[] */
+ sz += MAXALIGN(nbanks * sizeof(LWLockPadded)); /* bank_locks[] */
+ sz += MAXALIGN(nbanks * sizeof(int)); /* bank_cur_lru_count[] */
if (nlsns > 0)
sz += MAXALIGN(nslots * nlsns * sizeof(XLogRecPtr)); /* group_lsn[] */
@@ -200,6 +207,21 @@ SimpleLruShmemSize(int nslots, int nlsns)
}
/*
+ * Determine a number of SLRU buffers to use.
+ *
+ * We simply divide shared_buffers by the given divisor and cap the result
+ * at the given maximum, but always use at least SLRU_BANK_SIZE.
+ * Round down to the nearest multiple of SLRU_BANK_SIZE.
+ */
+int
+SimpleLruAutotuneBuffers(int divisor, int max)
+{
+ return Min(max - (max % SLRU_BANK_SIZE),
+ Max(SLRU_BANK_SIZE,
+ NBuffers / divisor - (NBuffers / divisor) % SLRU_BANK_SIZE));
+}
+
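
A worked example of the autotuning formula, assuming the default 8 kB block size: shared_buffers of 1 GB means NBuffers is 131072, so a divisor of 512 yields 256 SLRU buffers (2 MB); the cap of 1024 buffers (8 MB) is reached at 4 GB of shared_buffers. The standalone snippet below replays the arithmetic and is not part of the patch.

#include <stdio.h>

#define SLRU_BANK_SIZE	16
#define Min(x, y)		((x) < (y) ? (x) : (y))
#define Max(x, y)		((x) > (y) ? (x) : (y))

/* Same arithmetic as SimpleLruAutotuneBuffers() above. */
static int
autotune(int nbuffers, int divisor, int max)
{
	return Min(max - (max % SLRU_BANK_SIZE),
			   Max(SLRU_BANK_SIZE,
				   nbuffers / divisor - (nbuffers / divisor) % SLRU_BANK_SIZE));
}

int
main(void)
{
	/* shared_buffers of 128 MB, 1 GB and 16 GB, expressed as 8 kB buffers */
	int			nbuffers[] = {16384, 131072, 2097152};

	for (int i = 0; i < 3; i++)
		printf("NBuffers=%d -> %d SLRU buffers\n",
			   nbuffers[i], autotune(nbuffers[i], 512, 1024));
	return 0;
}
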
+/*
* Initialize, or attach to, a simple LRU cache in shared memory.
*
* ctl: address of local (unshared) control structure.
@@ -208,16 +230,20 @@ SimpleLruShmemSize(int nslots, int nlsns)
* nlsns: number of LSN groups per page (set to zero if not relevant).
* ctllock: LWLock to use to control access to the shared control structure.
* subdir: PGDATA-relative subdirectory that will contain the files.
- * tranche_id: LWLock tranche ID to use for the SLRU's per-buffer LWLocks.
+ * buffer_tranche_id: tranche ID to use for the SLRU's per-buffer LWLocks.
+ * bank_tranche_id: tranche ID to use for the bank LWLocks.
* sync_handler: which set of functions to use to handle sync requests
*/
void
SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
- LWLock *ctllock, const char *subdir, int tranche_id,
+ const char *subdir, int buffer_tranche_id, int bank_tranche_id,
SyncRequestHandler sync_handler, bool long_segment_names)
{
SlruShared shared;
bool found;
+ int nbanks = nslots / SLRU_BANK_SIZE;
+
+ Assert(nslots <= SLRU_MAX_ALLOWED_BUFFERS);
shared = (SlruShared) ShmemInitStruct(name,
SimpleLruShmemSize(nslots, nlsns),
@@ -233,12 +259,9 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
memset(shared, 0, sizeof(SlruSharedData));
- shared->ControlLock = ctllock;
-
shared->num_slots = nslots;
shared->lsn_groups_per_page = nlsns;
- shared->cur_lru_count = 0;
pg_atomic_init_u64(&shared->latest_page_number, 0);
shared->slru_stats_idx = pgstat_get_slru_index(name);
@@ -259,6 +282,10 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
/* Initialize LWLocks */
shared->buffer_locks = (LWLockPadded *) (ptr + offset);
offset += MAXALIGN(nslots * sizeof(LWLockPadded));
+ shared->bank_locks = (LWLockPadded *) (ptr + offset);
+ offset += MAXALIGN(nbanks * sizeof(LWLockPadded));
+ shared->bank_cur_lru_count = (int *) (ptr + offset);
+ offset += MAXALIGN(nbanks * sizeof(int));
if (nlsns > 0)
{
@@ -270,7 +297,7 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
for (int slotno = 0; slotno < nslots; slotno++)
{
LWLockInitialize(&shared->buffer_locks[slotno].lock,
- tranche_id);
+ buffer_tranche_id);
shared->page_buffer[slotno] = ptr;
shared->page_status[slotno] = SLRU_PAGE_EMPTY;
@@ -279,11 +306,21 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
ptr += BLCKSZ;
}
+ /* Initialize the slot banks. */
+ for (int bankno = 0; bankno < nbanks; bankno++)
+ {
+ LWLockInitialize(&shared->bank_locks[bankno].lock, bank_tranche_id);
+ shared->bank_cur_lru_count[bankno] = 0;
+ }
+
/* Should fit to estimated shmem size */
Assert(ptr - (char *) shared <= SimpleLruShmemSize(nslots, nlsns));
}
else
+ {
Assert(found);
+ Assert(shared->num_slots == nslots);
+ }
/*
* Initialize the unshared control struct, including directory path. We
@@ -292,16 +329,33 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
ctl->shared = shared;
ctl->sync_handler = sync_handler;
ctl->long_segment_names = long_segment_names;
+ ctl->bank_mask = (nslots / SLRU_BANK_SIZE) - 1;
strlcpy(ctl->Dir, subdir, sizeof(ctl->Dir));
}
/*
+ * Helper function for GUC check_hook to check whether slru buffers are in
+ * multiples of SLRU_BANK_SIZE.
+ */
+bool
+check_slru_buffers(const char *name, int *newval)
+{
+ /* Valid values are multiples of SLRU_BANK_SIZE */
+ if (*newval % SLRU_BANK_SIZE == 0)
+ return true;
+
+ GUC_check_errdetail("\"%s\" must be a multiple of %d", name,
+ SLRU_BANK_SIZE);
+ return false;
+}
+
+/*
* Initialize (or reinitialize) a page to zeroes.
*
* The page is not actually written, just set up in shared memory.
* The slot number of the new page is returned.
*
- * Control lock must be held at entry, and will be held at exit.
+ * Bank lock must be held at entry, and will be held at exit.
*/
int
SimpleLruZeroPage(SlruCtl ctl, int64 pageno)
@@ -309,6 +363,8 @@ SimpleLruZeroPage(SlruCtl ctl, int64 pageno)
SlruShared shared = ctl->shared;
int slotno;
+ Assert(LWLockHeldByMeInMode(SimpleLruGetBankLock(ctl, pageno), LW_EXCLUSIVE));
+
/* Find a suitable buffer slot for the page */
slotno = SlruSelectLRUPage(ctl, pageno);
Assert(shared->page_status[slotno] == SLRU_PAGE_EMPTY ||
@@ -369,18 +425,21 @@ SimpleLruZeroLSNs(SlruCtl ctl, int slotno)
* guarantee that new I/O hasn't been started before we return, though.
* In fact the slot might not even contain the same page anymore.)
*
- * Control lock must be held at entry, and will be held at exit.
+ * Bank lock must be held at entry, and will be held at exit.
*/
static void
SimpleLruWaitIO(SlruCtl ctl, int slotno)
{
SlruShared shared = ctl->shared;
+ int bankno = SlotGetBankNumber(slotno);
+
+ Assert(shared->page_status[slotno] != SLRU_PAGE_EMPTY);
/* See notes at top of file */
- LWLockRelease(shared->ControlLock);
+ LWLockRelease(&shared->bank_locks[bankno].lock);
LWLockAcquire(&shared->buffer_locks[slotno].lock, LW_SHARED);
LWLockRelease(&shared->buffer_locks[slotno].lock);
- LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
+ LWLockAcquire(&shared->bank_locks[bankno].lock, LW_EXCLUSIVE);
/*
* If the slot is still in an io-in-progress state, then either someone
@@ -423,7 +482,7 @@ SimpleLruWaitIO(SlruCtl ctl, int slotno)
* Return value is the shared-buffer slot number now holding the page.
* The buffer's LRU access info is updated.
*
- * Control lock must be held at entry, and will be held at exit.
+ * The correct bank lock must be held at entry, and will be held at exit.
*/
int
SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok,
@@ -431,18 +490,21 @@ SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok,
{
SlruShared shared = ctl->shared;
+ Assert(LWLockHeldByMeInMode(SimpleLruGetBankLock(ctl, pageno), LW_EXCLUSIVE));
+
/* Outer loop handles restart if we must wait for someone else's I/O */
for (;;)
{
int slotno;
+ int bankno;
bool ok;
/* See if page already is in memory; if not, pick victim slot */
slotno = SlruSelectLRUPage(ctl, pageno);
/* Did we find the page in memory? */
- if (shared->page_number[slotno] == pageno &&
- shared->page_status[slotno] != SLRU_PAGE_EMPTY)
+ if (shared->page_status[slotno] != SLRU_PAGE_EMPTY &&
+ shared->page_number[slotno] == pageno)
{
/*
* If page is still being read in, we must wait for I/O. Likewise
@@ -477,9 +539,10 @@ SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok,
/* Acquire per-buffer lock (cannot deadlock, see notes at top) */
LWLockAcquire(&shared->buffer_locks[slotno].lock, LW_EXCLUSIVE);
+ bankno = SlotGetBankNumber(slotno);
- /* Release control lock while doing I/O */
- LWLockRelease(shared->ControlLock);
+ /* Release bank lock while doing I/O */
+ LWLockRelease(&shared->bank_locks[bankno].lock);
/* Do the read */
ok = SlruPhysicalReadPage(ctl, pageno, slotno);
@@ -487,8 +550,8 @@ SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok,
/* Set the LSNs for this newly read-in page to zero */
SimpleLruZeroLSNs(ctl, slotno);
- /* Re-acquire control lock and update page state */
- LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
+ /* Re-acquire bank control lock and update page state */
+ LWLockAcquire(&shared->bank_locks[bankno].lock, LW_EXCLUSIVE);
Assert(shared->page_number[slotno] == pageno &&
shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS &&
@@ -522,22 +585,25 @@ SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok,
* Return value is the shared-buffer slot number now holding the page.
* The buffer's LRU access info is updated.
*
- * Control lock must NOT be held at entry, but will be held at exit.
+ * Bank control lock must NOT be held at entry, but will be held at exit.
* It is unspecified whether the lock will be shared or exclusive.
*/
int
SimpleLruReadPage_ReadOnly(SlruCtl ctl, int64 pageno, TransactionId xid)
{
SlruShared shared = ctl->shared;
+ int bankno = pageno & ctl->bank_mask;
+ int bankstart = bankno * SLRU_BANK_SIZE;
+ int bankend = bankstart + SLRU_BANK_SIZE;
/* Try to find the page while holding only shared lock */
- LWLockAcquire(shared->ControlLock, LW_SHARED);
+ LWLockAcquire(&shared->bank_locks[bankno].lock, LW_SHARED);
/* See if page is already in a buffer */
- for (int slotno = 0; slotno < shared->num_slots; slotno++)
+ for (int slotno = bankstart; slotno < bankend; slotno++)
{
- if (shared->page_number[slotno] == pageno &&
- shared->page_status[slotno] != SLRU_PAGE_EMPTY &&
+ if (shared->page_status[slotno] != SLRU_PAGE_EMPTY &&
+ shared->page_number[slotno] == pageno &&
shared->page_status[slotno] != SLRU_PAGE_READ_IN_PROGRESS)
{
/* See comments for SlruRecentlyUsed macro */
@@ -551,8 +617,8 @@ SimpleLruReadPage_ReadOnly(SlruCtl ctl, int64 pageno, TransactionId xid)
}
/* No luck, so switch to normal exclusive lock and do regular read */
- LWLockRelease(shared->ControlLock);
- LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
+ LWLockRelease(&shared->bank_locks[bankno].lock);
+ LWLockAcquire(&shared->bank_locks[bankno].lock, LW_EXCLUSIVE);
return SimpleLruReadPage(ctl, pageno, true, xid);
}
@@ -566,15 +632,19 @@ SimpleLruReadPage_ReadOnly(SlruCtl ctl, int64 pageno, TransactionId xid)
* the write). However, we *do* attempt a fresh write even if the page
* is already being written; this is for checkpoints.
*
- * Control lock must be held at entry, and will be held at exit.
+ * Bank lock must be held at entry, and will be held at exit.
*/
static void
SlruInternalWritePage(SlruCtl ctl, int slotno, SlruWriteAll fdata)
{
SlruShared shared = ctl->shared;
int64 pageno = shared->page_number[slotno];
+ int bankno = SlotGetBankNumber(slotno);
bool ok;
+ Assert(shared->page_status[slotno] != SLRU_PAGE_EMPTY);
+ Assert(LWLockHeldByMeInMode(SimpleLruGetBankLock(ctl, pageno), LW_EXCLUSIVE));
+
/* If a write is in progress, wait for it to finish */
while (shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS &&
shared->page_number[slotno] == pageno)
@@ -601,8 +671,8 @@ SlruInternalWritePage(SlruCtl ctl, int slotno, SlruWriteAll fdata)
/* Acquire per-buffer lock (cannot deadlock, see notes at top) */
LWLockAcquire(&shared->buffer_locks[slotno].lock, LW_EXCLUSIVE);
- /* Release control lock while doing I/O */
- LWLockRelease(shared->ControlLock);
+ /* Release bank lock while doing I/O */
+ LWLockRelease(&shared->bank_locks[bankno].lock);
/* Do the write */
ok = SlruPhysicalWritePage(ctl, pageno, slotno, fdata);
@@ -614,8 +684,8 @@ SlruInternalWritePage(SlruCtl ctl, int slotno, SlruWriteAll fdata)
CloseTransientFile(fdata->fd[i]);
}
- /* Re-acquire control lock and update page state */
- LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
+ /* Re-acquire bank lock and update page state */
+ LWLockAcquire(&shared->bank_locks[bankno].lock, LW_EXCLUSIVE);
Assert(shared->page_number[slotno] == pageno &&
shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS);
@@ -644,6 +714,8 @@ SlruInternalWritePage(SlruCtl ctl, int slotno, SlruWriteAll fdata)
void
SimpleLruWritePage(SlruCtl ctl, int slotno)
{
+	Assert(ctl->shared->page_status[slotno] != SLRU_PAGE_EMPTY);
+
SlruInternalWritePage(ctl, slotno, NULL);
}
@@ -1028,17 +1100,53 @@ SlruReportIOError(SlruCtl ctl, int64 pageno, TransactionId xid)
}
/*
- * Select the slot to re-use when we need a free slot.
+ * Mark a buffer slot "most recently used".
+ */
+static inline void
+SlruRecentlyUsed(SlruShared shared, int slotno)
+{
+ int bankno = SlotGetBankNumber(slotno);
+ int new_lru_count = shared->bank_cur_lru_count[bankno];
+
+ Assert(shared->page_status[slotno] != SLRU_PAGE_EMPTY);
+
+ /*
+ * The reason for the if-test is that there are often many consecutive
+ * accesses to the same page (particularly the latest page). By
+ * suppressing useless increments of bank_cur_lru_count, we reduce the
+ * probability that old pages' counts will "wrap around" and make them
+ * appear recently used.
+ *
+ * We allow this code to be executed concurrently by multiple processes
+ * within SimpleLruReadPage_ReadOnly(). As long as int reads and writes
+ * are atomic, this should not cause any completely-bogus values to enter
+ * the computation. However, it is possible for either bank_cur_lru_count
+ * or individual page_lru_count entries to be "reset" to lower values than
+ * they should have, in case a process is delayed while it executes this
+ * function. With care in SlruSelectLRUPage(), this does little harm, and
+ * in any case the absolute worst possible consequence is a nonoptimal
+ * choice of page to evict. The gain from allowing concurrent reads of
+ * SLRU pages seems worth it.
+ */
+ if (new_lru_count != shared->page_lru_count[slotno])
+ {
+ shared->bank_cur_lru_count[bankno] = ++new_lru_count;
+ shared->page_lru_count[slotno] = new_lru_count;
+ }
+}
+
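
The comment above describes a per-bank pseudo-LRU: each access stamps the slot with the bank's counter, and eviction later picks the slot whose stamp lags furthest behind. The following minimal sketch shows that interaction on a tiny 4-slot bank; names are hypothetical, and the locking, page-status checks, and wraparound write-back done by the real SlruSelectLRUPage are deliberately omitted.

#include <stdio.h>

#define BANK_SIZE 4						/* tiny bank, for illustration only */

static int	bank_cur_lru_count;			/* per-bank access counter */
static int	page_lru_count[BANK_SIZE];	/* per-slot stamp of last access */

/* Analogue of SlruRecentlyUsed: stamp a slot, suppressing useless bumps. */
static void
mark_recently_used(int slotno)
{
	if (bank_cur_lru_count != page_lru_count[slotno])
		page_lru_count[slotno] = ++bank_cur_lru_count;
}

/* Analogue of the delta scan in SlruSelectLRUPage, without the extras. */
static int
pick_victim(void)
{
	int			cur_count = bank_cur_lru_count++;
	int			best = 0;
	int			best_delta = -1;

	for (int slotno = 0; slotno < BANK_SIZE; slotno++)
	{
		int			delta = cur_count - page_lru_count[slotno];

		/* a negative delta means the stamp raced ahead; treat as just used */
		if (delta < 0)
			delta = 0;
		if (delta > best_delta)
		{
			best_delta = delta;
			best = slotno;
		}
	}
	return best;
}

int
main(void)
{
	/* Fill the bank: each "read" scans for a victim, then stamps the slot. */
	for (int slot = 0; slot < BANK_SIZE; slot++)
	{
		(void) pick_victim();
		mark_recently_used(slot);
	}
	mark_recently_used(3);				/* repeat access: no counter bump */
	printf("victim: slot %d\n", pick_victim());	/* prints: victim: slot 0 */
	return 0;
}
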
+/*
+ * Select the slot to re-use when we need a free slot for the given page.
*
- * The target page number is passed because we need to consider the
- * possibility that some other process reads in the target page while
- * we are doing I/O to free a slot. Hence, check or recheck to see if
- * any slot already holds the target page, and return that slot if so.
- * Thus, the returned slot is *either* a slot already holding the pageno
- * (could be any state except EMPTY), *or* a freeable slot (state EMPTY
- * or CLEAN).
+ * The target page number is passed not only because we need to know the
+ * correct bank to use, but also because we need to consider the possibility
+ * that some other process reads in the target page while we are doing I/O to
+ * free a slot. Hence, check or recheck to see if any slot already holds the
+ * target page, and return that slot if so. Thus, the returned slot is
+ * *either* a slot already holding the pageno (could be any state except
+ * EMPTY), *or* a freeable slot (state EMPTY or CLEAN).
*
- * Control lock must be held at entry, and will be held at exit.
+ * The correct bank lock must be held at entry, and will be held at exit.
*/
static int
SlruSelectLRUPage(SlruCtl ctl, int64 pageno)
@@ -1055,12 +1163,17 @@ SlruSelectLRUPage(SlruCtl ctl, int64 pageno)
int bestinvalidslot = 0; /* keep compiler quiet */
int best_invalid_delta = -1;
int64 best_invalid_page_number = 0; /* keep compiler quiet */
+ int bankno = pageno & ctl->bank_mask;
+ int bankstart = bankno * SLRU_BANK_SIZE;
+ int bankend = bankstart + SLRU_BANK_SIZE;
+
+ Assert(LWLockHeldByMe(&shared->bank_locks[bankno].lock));
/* See if page already has a buffer assigned */
for (int slotno = 0; slotno < shared->num_slots; slotno++)
{
- if (shared->page_number[slotno] == pageno &&
- shared->page_status[slotno] != SLRU_PAGE_EMPTY)
+ if (shared->page_status[slotno] != SLRU_PAGE_EMPTY &&
+ shared->page_number[slotno] == pageno)
return slotno;
}
@@ -1091,14 +1204,15 @@ SlruSelectLRUPage(SlruCtl ctl, int64 pageno)
* That gets us back on the path to having good data when there are
* multiple pages with the same lru_count.
*/
- cur_count = (shared->cur_lru_count)++;
- for (int slotno = 0; slotno < shared->num_slots; slotno++)
+ cur_count = (shared->bank_cur_lru_count[bankno])++;
+ for (int slotno = bankstart; slotno < bankend; slotno++)
{
int this_delta;
int64 this_page_number;
if (shared->page_status[slotno] == SLRU_PAGE_EMPTY)
return slotno;
+
this_delta = cur_count - shared->page_lru_count[slotno];
if (this_delta < 0)
{
@@ -1193,6 +1307,7 @@ SimpleLruWriteAll(SlruCtl ctl, bool allow_redirtied)
SlruShared shared = ctl->shared;
SlruWriteAllData fdata;
int64 pageno = 0;
+ int prevbank = SlotGetBankNumber(0);
bool ok;
/* update the stats counter of flushes */
@@ -1203,10 +1318,27 @@ SimpleLruWriteAll(SlruCtl ctl, bool allow_redirtied)
*/
fdata.num_files = 0;
- LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
+ LWLockAcquire(&shared->bank_locks[prevbank].lock, LW_EXCLUSIVE);
for (int slotno = 0; slotno < shared->num_slots; slotno++)
{
+ int curbank = SlotGetBankNumber(slotno);
+
+ /*
+		 * If the current bank lock is not the same as the previous bank lock,
+		 * release the previous lock and acquire the new one.
+ */
+ if (curbank != prevbank)
+ {
+ LWLockRelease(&shared->bank_locks[prevbank].lock);
+ LWLockAcquire(&shared->bank_locks[curbank].lock, LW_EXCLUSIVE);
+ prevbank = curbank;
+ }
+
+ /* Do nothing if slot is unused */
+ if (shared->page_status[slotno] == SLRU_PAGE_EMPTY)
+ continue;
+
SlruInternalWritePage(ctl, slotno, &fdata);
/*
@@ -1220,7 +1352,7 @@ SimpleLruWriteAll(SlruCtl ctl, bool allow_redirtied)
!shared->page_dirty[slotno]));
}
- LWLockRelease(shared->ControlLock);
+ LWLockRelease(&shared->bank_locks[prevbank].lock);
/*
* Now close any files that were open
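
The loop above never holds more than one bank lock at a time: it releases the previous bank's lock and acquires the next bank's lock whenever the slot index crosses a bank boundary. SimpleLruTruncate, SlruDeleteSegment, and StartupSUBTRANS below use the same idiom. A condensed sketch of the pattern follows, assuming the SlotGetBankNumber() helper and bank_locks array added elsewhere in this patch (so it would live inside slru.c), with the per-slot work elided.

#include "postgres.h"

#include "access/slru.h"
#include "storage/lwlock.h"

static void
slru_walk_all_slots(SlruCtl ctl)
{
	SlruShared	shared = ctl->shared;
	int			prevbank = SlotGetBankNumber(0);

	LWLockAcquire(&shared->bank_locks[prevbank].lock, LW_EXCLUSIVE);
	for (int slotno = 0; slotno < shared->num_slots; slotno++)
	{
		int			curbank = SlotGetBankNumber(slotno);

		/* Crossing into the next bank: swap which bank lock is held. */
		if (curbank != prevbank)
		{
			LWLockRelease(&shared->bank_locks[prevbank].lock);
			LWLockAcquire(&shared->bank_locks[curbank].lock, LW_EXCLUSIVE);
			prevbank = curbank;
		}

		if (shared->page_status[slotno] == SLRU_PAGE_EMPTY)
			continue;

		/* ... inspect or flush the page held in this slot here ... */
	}
	LWLockRelease(&shared->bank_locks[prevbank].lock);
}
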
@@ -1259,6 +1391,7 @@ void
SimpleLruTruncate(SlruCtl ctl, int64 cutoffPage)
{
SlruShared shared = ctl->shared;
+ int prevbank;
/* update the stats counter of truncates */
pgstat_count_slru_truncate(shared->slru_stats_idx);
@@ -1269,8 +1402,6 @@ SimpleLruTruncate(SlruCtl ctl, int64 cutoffPage)
* or just after a checkpoint, any dirty pages should have been flushed
* already ... we're just being extra careful here.)
*/
- LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
-
restart:
/*
@@ -1282,15 +1413,29 @@ restart:
if (ctl->PagePrecedes(pg_atomic_read_u64(&shared->latest_page_number),
cutoffPage))
{
- LWLockRelease(shared->ControlLock);
ereport(LOG,
(errmsg("could not truncate directory \"%s\": apparent wraparound",
ctl->Dir)));
return;
}
+ prevbank = SlotGetBankNumber(0);
+ LWLockAcquire(&shared->bank_locks[prevbank].lock, LW_EXCLUSIVE);
for (int slotno = 0; slotno < shared->num_slots; slotno++)
{
+ int curbank = SlotGetBankNumber(slotno);
+
+ /*
+		 * If the current bank lock is not the same as the previous bank lock,
+		 * release the previous lock and acquire the new one.
+ */
+ if (curbank != prevbank)
+ {
+ LWLockRelease(&shared->bank_locks[prevbank].lock);
+ LWLockAcquire(&shared->bank_locks[curbank].lock, LW_EXCLUSIVE);
+ prevbank = curbank;
+ }
+
if (shared->page_status[slotno] == SLRU_PAGE_EMPTY)
continue;
if (!ctl->PagePrecedes(shared->page_number[slotno], cutoffPage))
@@ -1320,10 +1465,12 @@ restart:
SlruInternalWritePage(ctl, slotno, NULL);
else
SimpleLruWaitIO(ctl, slotno);
+
+ LWLockRelease(&shared->bank_locks[prevbank].lock);
goto restart;
}
- LWLockRelease(shared->ControlLock);
+ LWLockRelease(&shared->bank_locks[prevbank].lock);
/* Now we can remove the old segment(s) */
(void) SlruScanDirectory(ctl, SlruScanDirCbDeleteCutoff, &cutoffPage);
@@ -1362,19 +1509,33 @@ void
SlruDeleteSegment(SlruCtl ctl, int64 segno)
{
SlruShared shared = ctl->shared;
+ int prevbank = SlotGetBankNumber(0);
bool did_write;
/* Clean out any possibly existing references to the segment. */
- LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
+ LWLockAcquire(&shared->bank_locks[prevbank].lock, LW_EXCLUSIVE);
restart:
did_write = false;
for (int slotno = 0; slotno < shared->num_slots; slotno++)
{
- int pagesegno = shared->page_number[slotno] / SLRU_PAGES_PER_SEGMENT;
+ int pagesegno;
+ int curbank = SlotGetBankNumber(slotno);
+
+ /*
+		 * If the current bank lock is not the same as the previous bank lock,
+		 * release the previous lock and acquire the new one.
+ */
+ if (curbank != prevbank)
+ {
+ LWLockRelease(&shared->bank_locks[prevbank].lock);
+ LWLockAcquire(&shared->bank_locks[curbank].lock, LW_EXCLUSIVE);
+ prevbank = curbank;
+ }
if (shared->page_status[slotno] == SLRU_PAGE_EMPTY)
continue;
+ pagesegno = shared->page_number[slotno] / SLRU_PAGES_PER_SEGMENT;
/* not the segment we're looking for */
if (pagesegno != segno)
continue;
@@ -1405,7 +1566,7 @@ restart:
SlruInternalDeleteSegment(ctl, segno);
- LWLockRelease(shared->ControlLock);
+ LWLockRelease(&shared->bank_locks[prevbank].lock);
}
/*
diff --git a/src/backend/access/transam/subtrans.c b/src/backend/access/transam/subtrans.c
index 6aa47af43e2..dc9566fb51b 100644
--- a/src/backend/access/transam/subtrans.c
+++ b/src/backend/access/transam/subtrans.c
@@ -31,7 +31,9 @@
#include "access/slru.h"
#include "access/subtrans.h"
#include "access/transam.h"
+#include "miscadmin.h"
#include "pg_trace.h"
+#include "utils/guc_hooks.h"
#include "utils/snapmgr.h"
@@ -85,12 +87,14 @@ SubTransSetParent(TransactionId xid, TransactionId parent)
int64 pageno = TransactionIdToPage(xid);
int entryno = TransactionIdToEntry(xid);
int slotno;
+ LWLock *lock;
TransactionId *ptr;
Assert(TransactionIdIsValid(parent));
Assert(TransactionIdFollows(xid, parent));
- LWLockAcquire(SubtransSLRULock, LW_EXCLUSIVE);
+ lock = SimpleLruGetBankLock(SubTransCtl, pageno);
+ LWLockAcquire(lock, LW_EXCLUSIVE);
slotno = SimpleLruReadPage(SubTransCtl, pageno, true, xid);
ptr = (TransactionId *) SubTransCtl->shared->page_buffer[slotno];
@@ -108,7 +112,7 @@ SubTransSetParent(TransactionId xid, TransactionId parent)
SubTransCtl->shared->page_dirty[slotno] = true;
}
- LWLockRelease(SubtransSLRULock);
+ LWLockRelease(lock);
}
/*
@@ -138,7 +142,7 @@ SubTransGetParent(TransactionId xid)
parent = *ptr;
- LWLockRelease(SubtransSLRULock);
+ LWLockRelease(SimpleLruGetBankLock(SubTransCtl, pageno));
return parent;
}
@@ -186,6 +190,22 @@ SubTransGetTopmostTransaction(TransactionId xid)
return previousXid;
}
+/*
+ * Number of shared SUBTRANS buffers.
+ *
+ * If asked to autotune, use 2MB for every 1GB of shared buffers, up to 8MB.
+ * Otherwise, clamp the configured value to between 16 and the maximum
+ * allowed.
+ */
+static int
+SUBTRANSShmemBuffers(void)
+{
+ /* auto-tune based on shared buffers */
+ if (subtransaction_buffers == 0)
+ return SimpleLruAutotuneBuffers(512, 1024);
+
+ return Min(Max(16, subtransaction_buffers), SLRU_MAX_ALLOWED_BUFFERS);
+}
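
As a sanity check on the "2MB for every 1GB of shared buffers, up to 8MB" wording: with 8kB pages, 1GB of shared_buffers is 131072 buffers, 131072 / 512 = 256 SLRU pages = 2MB, and the 1024-page cap is 8MB. The sketch below reproduces only that arithmetic; the argument meaning (divisor, cap) and the lower bound of 16 are assumptions, and the real SimpleLruAutotuneBuffers() in slru.c may additionally round to a multiple of the bank size.

#include <stdio.h>

#define Min(a,b)	((a) < (b) ? (a) : (b))
#define Max(a,b)	((a) > (b) ? (a) : (b))

static int
autotune_buffers(int shared_buffers, int divisor, int cap)
{
	/* one SLRU page per "divisor" shared buffers, clamped to [16, cap] */
	return Min(cap, Max(16, shared_buffers / divisor));
}

int
main(void)
{
	/* 1GB of shared_buffers at 8kB per page = 131072 buffers */
	printf("1GB -> %d SLRU buffers (~%dkB)\n",
		   autotune_buffers(131072, 512, 1024),
		   autotune_buffers(131072, 512, 1024) * 8);
	/* 4GB and beyond hit the 1024-buffer (8MB) cap */
	printf("4GB -> %d SLRU buffers (~%dkB)\n",
		   autotune_buffers(524288, 512, 1024),
		   autotune_buffers(524288, 512, 1024) * 8);
	return 0;
}
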
/*
* Initialization of shared memory for SUBTRANS
@@ -193,21 +213,50 @@ SubTransGetTopmostTransaction(TransactionId xid)
Size
SUBTRANSShmemSize(void)
{
- return SimpleLruShmemSize(NUM_SUBTRANS_BUFFERS, 0);
+ return SimpleLruShmemSize(SUBTRANSShmemBuffers(), 0);
}
void
SUBTRANSShmemInit(void)
{
+ /* If auto-tuning is requested, now is the time to do it */
+ if (subtransaction_buffers == 0)
+ {
+ char buf[32];
+
+ snprintf(buf, sizeof(buf), "%d", SUBTRANSShmemBuffers());
+ SetConfigOption("subtransaction_buffers", buf, PGC_POSTMASTER,
+ PGC_S_DYNAMIC_DEFAULT);
+
+ /*
+ * We prefer to report this value's source as PGC_S_DYNAMIC_DEFAULT.
+ * However, if the DBA explicitly set subtransaction_buffers = 0 in
+ * the config file, then PGC_S_DYNAMIC_DEFAULT will fail to override
+ * that and we must force the matter with PGC_S_OVERRIDE.
+ */
+ if (subtransaction_buffers == 0) /* failed to apply it? */
+ SetConfigOption("subtransaction_buffers", buf, PGC_POSTMASTER,
+ PGC_S_OVERRIDE);
+ }
+ Assert(subtransaction_buffers != 0);
+
SubTransCtl->PagePrecedes = SubTransPagePrecedes;
- SimpleLruInit(SubTransCtl, "subtransaction", NUM_SUBTRANS_BUFFERS, 0,
- SubtransSLRULock, "pg_subtrans",
- LWTRANCHE_SUBTRANS_BUFFER, SYNC_HANDLER_NONE,
- false);
+ SimpleLruInit(SubTransCtl, "subtransaction", SUBTRANSShmemBuffers(), 0,
+ "pg_subtrans", LWTRANCHE_SUBTRANS_BUFFER,
+ LWTRANCHE_SUBTRANS_SLRU, SYNC_HANDLER_NONE, false);
SlruPagePrecedesUnitTests(SubTransCtl, SUBTRANS_XACTS_PER_PAGE);
}
/*
+ * GUC check_hook for subtransaction_buffers
+ */
+bool
+check_subtrans_buffers(int *newval, void **extra, GucSource source)
+{
+ return check_slru_buffers("subtransaction_buffers", newval);
+}
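
check_subtrans_buffers() delegates to a shared check_slru_buffers() helper that is added elsewhere in this patch and is not visible in this diff. Since buffers are grouped into fixed-size banks, a plausible validation is that the configured value divides evenly into banks; the sketch below is speculative and is not the patch's actual implementation.

#include "postgres.h"

#include "utils/guc.h"

/* Speculative stand-in for check_slru_buffers(); assumed bank size of 16. */
static bool
check_slru_buffers_sketch(const char *name, int *newval)
{
	if (*newval % 16 == 0)
		return true;

	GUC_check_errdetail("\"%s\" must be a multiple of %d.", name, 16);
	return false;
}
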
+
+/*
* This func must be called ONCE on system install. It creates
* the initial SUBTRANS segment. (The SUBTRANS directory is assumed to
* have been created by the initdb shell script, and SUBTRANSShmemInit
@@ -221,8 +270,9 @@ void
BootStrapSUBTRANS(void)
{
int slotno;
+ LWLock *lock = SimpleLruGetBankLock(SubTransCtl, 0);
- LWLockAcquire(SubtransSLRULock, LW_EXCLUSIVE);
+ LWLockAcquire(lock, LW_EXCLUSIVE);
/* Create and zero the first page of the subtrans log */
slotno = ZeroSUBTRANSPage(0);
@@ -231,7 +281,7 @@ BootStrapSUBTRANS(void)
SimpleLruWritePage(SubTransCtl, slotno);
Assert(!SubTransCtl->shared->page_dirty[slotno]);
- LWLockRelease(SubtransSLRULock);
+ LWLockRelease(lock);
}
/*
@@ -261,6 +311,8 @@ StartupSUBTRANS(TransactionId oldestActiveXID)
FullTransactionId nextXid;
int64 startPage;
int64 endPage;
+ LWLock *prevlock;
+ LWLock *lock;
/*
* Since we don't expect pg_subtrans to be valid across crashes, we
@@ -268,23 +320,47 @@ StartupSUBTRANS(TransactionId oldestActiveXID)
* Whenever we advance into a new page, ExtendSUBTRANS will likewise zero
* the new page without regard to whatever was previously on disk.
*/
- LWLockAcquire(SubtransSLRULock, LW_EXCLUSIVE);
-
startPage = TransactionIdToPage(oldestActiveXID);
nextXid = TransamVariables->nextXid;
endPage = TransactionIdToPage(XidFromFullTransactionId(nextXid));
+ prevlock = SimpleLruGetBankLock(SubTransCtl, startPage);
+ LWLockAcquire(prevlock, LW_EXCLUSIVE);
while (startPage != endPage)
{
+ lock = SimpleLruGetBankLock(SubTransCtl, startPage);
+
+ /*
+		 * If the next page falls in a different bank, release the lock on the
+		 * old bank and acquire the lock on the new bank.
+ */
+ if (prevlock != lock)
+ {
+ LWLockRelease(prevlock);
+ LWLockAcquire(lock, LW_EXCLUSIVE);
+ prevlock = lock;
+ }
+
(void) ZeroSUBTRANSPage(startPage);
startPage++;
/* must account for wraparound */
if (startPage > TransactionIdToPage(MaxTransactionId))
startPage = 0;
}
- (void) ZeroSUBTRANSPage(startPage);
- LWLockRelease(SubtransSLRULock);
+ lock = SimpleLruGetBankLock(SubTransCtl, startPage);
+
+ /*
+	 * If the final page falls in a different bank, release the lock on the
+	 * old bank and acquire the lock on the new bank.
+ */
+ if (prevlock != lock)
+ {
+ LWLockRelease(prevlock);
+ LWLockAcquire(lock, LW_EXCLUSIVE);
+ }
+ (void) ZeroSUBTRANSPage(startPage);
+ LWLockRelease(lock);
}
/*
@@ -318,6 +394,7 @@ void
ExtendSUBTRANS(TransactionId newestXact)
{
int64 pageno;
+ LWLock *lock;
/*
* No work except at first XID of a page. But beware: just after
@@ -329,12 +406,13 @@ ExtendSUBTRANS(TransactionId newestXact)
pageno = TransactionIdToPage(newestXact);
- LWLockAcquire(SubtransSLRULock, LW_EXCLUSIVE);
+ lock = SimpleLruGetBankLock(SubTransCtl, pageno);
+ LWLockAcquire(lock, LW_EXCLUSIVE);
/* Zero the page */
ZeroSUBTRANSPage(pageno);
- LWLockRelease(SubtransSLRULock);
+ LWLockRelease(lock);
}