aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/transam/xlog.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/transam/xlog.c')
-rw-r--r--src/backend/access/transam/xlog.c59
1 files changed, 38 insertions, 21 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index a43e2eeaf30..6ee50d01d52 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -80,6 +80,8 @@ bool fullPageWrites = true;
bool log_checkpoints = false;
int sync_method = DEFAULT_SYNC_METHOD;
int wal_level = WAL_LEVEL_MINIMAL;
+int CommitDelay = 0; /* precommit delay in microseconds */
+int CommitSiblings = 5; /* # concurrent xacts needed to sleep */
#ifdef WAL_DEBUG
bool XLOG_DEBUG = false;
@@ -2098,34 +2100,49 @@ XLogFlush(XLogRecPtr record)
*/
continue;
}
- /* Got the lock */
+
+ /* Got the lock; recheck whether request is satisfied */
LogwrtResult = XLogCtl->LogwrtResult;
- if (!XLByteLE(record, LogwrtResult.Flush))
+ if (XLByteLE(record, LogwrtResult.Flush))
+ break;
+
+ /*
+ * Sleep before flush! By adding a delay here, we may give further
+ * backends the opportunity to join the backlog of group commit
+ * followers; this can significantly improve transaction throughput, at
+ * the risk of increasing transaction latency.
+ *
+ * We do not sleep if enableFsync is not turned on, nor if there are
+ * fewer than CommitSiblings other backends with active transactions.
+ */
+ if (CommitDelay > 0 && enableFsync &&
+ MinimumActiveBackends(CommitSiblings))
+ pg_usleep(CommitDelay);
+
+ /* try to write/flush later additions to XLOG as well */
+ if (LWLockConditionalAcquire(WALInsertLock, LW_EXCLUSIVE))
{
- /* try to write/flush later additions to XLOG as well */
- if (LWLockConditionalAcquire(WALInsertLock, LW_EXCLUSIVE))
- {
- XLogCtlInsert *Insert = &XLogCtl->Insert;
- uint32 freespace = INSERT_FREESPACE(Insert);
+ XLogCtlInsert *Insert = &XLogCtl->Insert;
+ uint32 freespace = INSERT_FREESPACE(Insert);
- if (freespace == 0) /* buffer is full */
- WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
- else
- {
- WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
- WriteRqstPtr -= freespace;
- }
- LWLockRelease(WALInsertLock);
- WriteRqst.Write = WriteRqstPtr;
- WriteRqst.Flush = WriteRqstPtr;
- }
+ if (freespace == 0) /* buffer is full */
+ WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
else
{
- WriteRqst.Write = WriteRqstPtr;
- WriteRqst.Flush = record;
+ WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
+ WriteRqstPtr -= freespace;
}
- XLogWrite(WriteRqst, false, false);
+ LWLockRelease(WALInsertLock);
+ WriteRqst.Write = WriteRqstPtr;
+ WriteRqst.Flush = WriteRqstPtr;
}
+ else
+ {
+ WriteRqst.Write = WriteRqstPtr;
+ WriteRqst.Flush = record;
+ }
+ XLogWrite(WriteRqst, false, false);
+
LWLockRelease(WALWriteLock);
/* done */
break;