aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access')
-rw-r--r--src/backend/access/heap/visibilitymap.c2
-rw-r--r--src/backend/access/index/amapi.c13
-rw-r--r--src/backend/access/transam/twophase.c32
-rw-r--r--src/backend/access/transam/xact.c18
-rw-r--r--src/backend/access/transam/xlog.c12
-rw-r--r--src/backend/access/transam/xlogreader.c20
-rw-r--r--src/backend/access/transam/xlogrecovery.c2
7 files changed, 82 insertions, 17 deletions
diff --git a/src/backend/access/heap/visibilitymap.c b/src/backend/access/heap/visibilitymap.c
index 745a04ef26e..8f918e00af7 100644
--- a/src/backend/access/heap/visibilitymap.c
+++ b/src/backend/access/heap/visibilitymap.c
@@ -364,7 +364,7 @@ visibilitymap_get_status(Relation rel, BlockNumber heapBlk, Buffer *vmbuf)
{
*vmbuf = vm_readbuf(rel, mapBlock, false);
if (!BufferIsValid(*vmbuf))
- return false;
+ return (uint8) 0;
}
map = PageGetContents(BufferGetPage(*vmbuf));
diff --git a/src/backend/access/index/amapi.c b/src/backend/access/index/amapi.c
index f0f4f974bce..60684c53422 100644
--- a/src/backend/access/index/amapi.c
+++ b/src/backend/access/index/amapi.c
@@ -42,6 +42,19 @@ GetIndexAmRoutine(Oid amhandler)
elog(ERROR, "index access method handler function %u did not return an IndexAmRoutine struct",
amhandler);
+ /* Assert that all required callbacks are present. */
+ Assert(routine->ambuild != NULL);
+ Assert(routine->ambuildempty != NULL);
+ Assert(routine->aminsert != NULL);
+ Assert(routine->ambulkdelete != NULL);
+ Assert(routine->amvacuumcleanup != NULL);
+ Assert(routine->amcostestimate != NULL);
+ Assert(routine->amoptions != NULL);
+ Assert(routine->amvalidate != NULL);
+ Assert(routine->ambeginscan != NULL);
+ Assert(routine->amrescan != NULL);
+ Assert(routine->amendscan != NULL);
+
return routine;
}
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 85cbe397cb2..7918176fc58 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -1183,7 +1183,11 @@ EndPrepare(GlobalTransaction gxact)
* starting immediately after the WAL record is inserted could complete
* without fsync'ing our state file. (This is essentially the same kind
* of race condition as the COMMIT-to-clog-write case that
- * RecordTransactionCommit uses DELAY_CHKPT_START for; see notes there.)
+ * RecordTransactionCommit uses DELAY_CHKPT_IN_COMMIT for; see notes
+ * there.) Note that DELAY_CHKPT_IN_COMMIT is used to find transactions in
+ * the critical commit section. We need to know about such transactions
+ * for conflict detection in logical replication. See
+ * GetOldestActiveTransactionId(true, false) and its use.
*
* We save the PREPARE record's location in the gxact for later use by
* CheckPointTwoPhase.
@@ -2298,7 +2302,7 @@ ProcessTwoPhaseBuffer(FullTransactionId fxid,
* RecordTransactionCommitPrepared
*
* This is basically the same as RecordTransactionCommit (q.v. if you change
- * this function): in particular, we must set DELAY_CHKPT_START to avoid a
+ * this function): in particular, we must set DELAY_CHKPT_IN_COMMIT to avoid a
* race condition.
*
* We know the transaction made at least one XLOG entry (its PREPARE),
@@ -2318,7 +2322,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
const char *gid)
{
XLogRecPtr recptr;
- TimestampTz committs = GetCurrentTimestamp();
+ TimestampTz committs;
bool replorigin;
/*
@@ -2331,8 +2335,24 @@ RecordTransactionCommitPrepared(TransactionId xid,
START_CRIT_SECTION();
/* See notes in RecordTransactionCommit */
- Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0);
- MyProc->delayChkptFlags |= DELAY_CHKPT_START;
+ Assert((MyProc->delayChkptFlags & DELAY_CHKPT_IN_COMMIT) == 0);
+ MyProc->delayChkptFlags |= DELAY_CHKPT_IN_COMMIT;
+
+ /*
+ * Ensures the DELAY_CHKPT_IN_COMMIT flag write is globally visible before
+ * commit time is written.
+ */
+ pg_write_barrier();
+
+ /*
+ * Note it is important to set committs value after marking ourselves as
+ * in the commit critical section (DELAY_CHKPT_IN_COMMIT). This is because
+ * we want to ensure all transactions that have acquired commit timestamp
+ * are finished before we allow the logical replication client to advance
+ * its xid which is used to hold back dead rows for conflict detection.
+ * See comments atop worker.c.
+ */
+ committs = GetCurrentTimestamp();
/*
* Emit the XLOG commit record. Note that we mark 2PC commits as
@@ -2381,7 +2401,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
TransactionIdCommitTree(xid, nchildren, children);
/* Checkpoint can proceed now */
- MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
+ MyProc->delayChkptFlags &= ~DELAY_CHKPT_IN_COMMIT;
END_CRIT_SECTION();
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 41601fcb280..b46e7e9c2a6 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1431,10 +1431,22 @@ RecordTransactionCommit(void)
* without holding the ProcArrayLock, since we're the only one
* modifying it. This makes checkpoint's determination of which xacts
* are delaying the checkpoint a bit fuzzy, but it doesn't matter.
+ *
+ * Note, it is important to get the commit timestamp after marking the
+ * transaction in the commit critical section. See
+ * RecordTransactionCommitPrepared.
*/
- Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0);
+ Assert((MyProc->delayChkptFlags & DELAY_CHKPT_IN_COMMIT) == 0);
START_CRIT_SECTION();
- MyProc->delayChkptFlags |= DELAY_CHKPT_START;
+ MyProc->delayChkptFlags |= DELAY_CHKPT_IN_COMMIT;
+
+ Assert(xactStopTimestamp == 0);
+
+ /*
+ * Ensures the DELAY_CHKPT_IN_COMMIT flag write is globally visible
+ * before commit time is written.
+ */
+ pg_write_barrier();
/*
* Insert the commit XLOG record.
@@ -1537,7 +1549,7 @@ RecordTransactionCommit(void)
*/
if (markXidCommitted)
{
- MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
+ MyProc->delayChkptFlags &= ~DELAY_CHKPT_IN_COMMIT;
END_CRIT_SECTION();
}
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 8e7827c6ed9..b0891998b24 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -96,6 +96,7 @@
#include "utils/guc_hooks.h"
#include "utils/guc_tables.h"
#include "utils/injection_point.h"
+#include "utils/pgstat_internal.h"
#include "utils/ps_status.h"
#include "utils/relmapper.h"
#include "utils/snapmgr.h"
@@ -1091,6 +1092,9 @@ XLogInsertRecord(XLogRecData *rdata,
pgWalUsage.wal_bytes += rechdr->xl_tot_len;
pgWalUsage.wal_records++;
pgWalUsage.wal_fpi += num_fpi;
+
+ /* Required for the flush of pending stats WAL data */
+ pgstat_report_fixed = true;
}
return EndPos;
@@ -2108,6 +2112,12 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
LWLockRelease(WALWriteLock);
pgWalUsage.wal_buffers_full++;
TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_DONE();
+
+ /*
+ * Required for the flush of pending stats WAL data, per
+ * update of pgWalUsage.
+ */
+ pgstat_report_fixed = true;
}
}
}
@@ -7121,7 +7131,7 @@ CreateCheckPoint(int flags)
* starting snapshot of locks and transactions.
*/
if (!shutdown && XLogStandbyInfoActive())
- checkPoint.oldestActiveXid = GetOldestActiveTransactionId();
+ checkPoint.oldestActiveXid = GetOldestActiveTransactionId(false, true);
else
checkPoint.oldestActiveXid = InvalidTransactionId;
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index ac1f801b1eb..dcc8d4f9c1b 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -723,11 +723,12 @@ restart:
/* Calculate pointer to beginning of next page */
targetPagePtr += XLOG_BLCKSZ;
- /* Wait for the next page to become available */
- readOff = ReadPageInternal(state, targetPagePtr,
- Min(total_len - gotlen + SizeOfXLogShortPHD,
- XLOG_BLCKSZ));
-
+ /*
+ * Read the page header before processing the record data, so we
+ * can handle the case where the previous record ended as being a
+ * partial one.
+ */
+ readOff = ReadPageInternal(state, targetPagePtr, SizeOfXLogShortPHD);
if (readOff == XLREAD_WOULDBLOCK)
return XLREAD_WOULDBLOCK;
else if (readOff < 0)
@@ -776,6 +777,15 @@ restart:
goto err;
}
+ /* Wait for the next page to become available */
+ readOff = ReadPageInternal(state, targetPagePtr,
+ Min(total_len - gotlen + SizeOfXLogShortPHD,
+ XLOG_BLCKSZ));
+ if (readOff == XLREAD_WOULDBLOCK)
+ return XLREAD_WOULDBLOCK;
+ else if (readOff < 0)
+ goto err;
+
/* Append the continuation from this page to the buffer */
pageHeaderSize = XLogPageHeaderSize(pageHeader);
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 23878b2dd91..e8f3ba00caa 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -4760,7 +4760,7 @@ bool
check_primary_slot_name(char **newval, void **extra, GucSource source)
{
if (*newval && strcmp(*newval, "") != 0 &&
- !ReplicationSlotValidateName(*newval, WARNING))
+ !ReplicationSlotValidateName(*newval, false, WARNING))
return false;
return true;