about | summary | refs | log | tree | commit | diff
path: root/src/backend/access/transam
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/transam')
-rw-r--r-- src/backend/access/transam/twophase.c 32
-rw-r--r-- src/backend/access/transam/xact.c 18
-rw-r--r-- src/backend/access/transam/xlog.c 45
-rw-r--r-- src/backend/access/transam/xlogreader.c 20
-rw-r--r-- src/backend/access/transam/xlogrecovery.c 3
5 files changed, 91 insertions, 27 deletions
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 85cbe397cb2..7918176fc58 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -1183,7 +1183,11 @@ EndPrepare(GlobalTransaction gxact)
* starting immediately after the WAL record is inserted could complete
* without fsync'ing our state file. (This is essentially the same kind
* of race condition as the COMMIT-to-clog-write case that
- * RecordTransactionCommit uses DELAY_CHKPT_START for; see notes there.)
+ * RecordTransactionCommit uses DELAY_CHKPT_IN_COMMIT for; see notes
+ * there.) Note that DELAY_CHKPT_IN_COMMIT is used to find transactions in
+ * the critical commit section. We need to know about such transactions
+ * for conflict detection in logical replication. See
+ * GetOldestActiveTransactionId(true, false) and its use.
*
* We save the PREPARE record's location in the gxact for later use by
* CheckPointTwoPhase.
@@ -2298,7 +2302,7 @@ ProcessTwoPhaseBuffer(FullTransactionId fxid,
* RecordTransactionCommitPrepared
*
* This is basically the same as RecordTransactionCommit (q.v. if you change
- * this function): in particular, we must set DELAY_CHKPT_START to avoid a
+ * this function): in particular, we must set DELAY_CHKPT_IN_COMMIT to avoid a
* race condition.
*
* We know the transaction made at least one XLOG entry (its PREPARE),
@@ -2318,7 +2322,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
const char *gid)
{
XLogRecPtr recptr;
- TimestampTz committs = GetCurrentTimestamp();
+ TimestampTz committs;
bool replorigin;
/*
@@ -2331,8 +2335,24 @@ RecordTransactionCommitPrepared(TransactionId xid,
START_CRIT_SECTION();
/* See notes in RecordTransactionCommit */
- Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0);
- MyProc->delayChkptFlags |= DELAY_CHKPT_START;
+ Assert((MyProc->delayChkptFlags & DELAY_CHKPT_IN_COMMIT) == 0);
+ MyProc->delayChkptFlags |= DELAY_CHKPT_IN_COMMIT;
+
+ /*
+ * Ensure the DELAY_CHKPT_IN_COMMIT flag write is globally visible before
+ * the commit time is written.
+ */
+ pg_write_barrier();
+
+ /*
+ * Note that it is important to set committs after marking ourselves as
+ * being in the commit critical section (DELAY_CHKPT_IN_COMMIT). This is
+ * because we want to ensure that all transactions that have acquired a
+ * commit timestamp are finished before we allow the logical replication
+ * client to advance its xid, which is used to hold back dead rows for
+ * conflict detection. See comments atop worker.c.
+ */
+ committs = GetCurrentTimestamp();
/*
* Emit the XLOG commit record. Note that we mark 2PC commits as
@@ -2381,7 +2401,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
TransactionIdCommitTree(xid, nchildren, children);
/* Checkpoint can proceed now */
- MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
+ MyProc->delayChkptFlags &= ~DELAY_CHKPT_IN_COMMIT;
END_CRIT_SECTION();
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 41601fcb280..b46e7e9c2a6 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1431,10 +1431,22 @@ RecordTransactionCommit(void)
* without holding the ProcArrayLock, since we're the only one
* modifying it. This makes checkpoint's determination of which xacts
* are delaying the checkpoint a bit fuzzy, but it doesn't matter.
+ *
+ * Note that it is important to get the commit timestamp after marking
+ * the transaction as being in the commit critical section. See
+ * RecordTransactionCommitPrepared.
*/
- Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0);
+ Assert((MyProc->delayChkptFlags & DELAY_CHKPT_IN_COMMIT) == 0);
START_CRIT_SECTION();
- MyProc->delayChkptFlags |= DELAY_CHKPT_START;
+ MyProc->delayChkptFlags |= DELAY_CHKPT_IN_COMMIT;
+
+ Assert(xactStopTimestamp == 0);
+
+ /*
+ * Ensure the DELAY_CHKPT_IN_COMMIT flag write is globally visible
+ * before the commit time is written.
+ */
+ pg_write_barrier();
/*
* Insert the commit XLOG record.
@@ -1537,7 +1549,7 @@ RecordTransactionCommit(void)
*/
if (markXidCommitted)
{
- MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
+ MyProc->delayChkptFlags &= ~DELAY_CHKPT_IN_COMMIT;
END_CRIT_SECTION();
}
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 8e7827c6ed9..5553c20fee8 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -96,6 +96,7 @@
#include "utils/guc_hooks.h"
#include "utils/guc_tables.h"
#include "utils/injection_point.h"
+#include "utils/pgstat_internal.h"
#include "utils/ps_status.h"
#include "utils/relmapper.h"
#include "utils/snapmgr.h"
@@ -702,7 +703,7 @@ static void InitControlFile(uint64 sysidentifier, uint32 data_checksum_version);
static void WriteControlFile(void);
static void ReadControlFile(void);
static void UpdateControlFile(void);
-static char *str_time(pg_time_t tnow);
+static char *str_time(pg_time_t tnow, char *buf, size_t bufsize);
static int get_sync_bit(int method);
@@ -1091,6 +1092,9 @@ XLogInsertRecord(XLogRecData *rdata,
pgWalUsage.wal_bytes += rechdr->xl_tot_len;
pgWalUsage.wal_records++;
pgWalUsage.wal_fpi += num_fpi;
+
+ /* Required for the flush of pending stats WAL data */
+ pgstat_report_fixed = true;
}
return EndPos;
@@ -2108,6 +2112,12 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
LWLockRelease(WALWriteLock);
pgWalUsage.wal_buffers_full++;
TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_DONE();
+
+ /*
+ * Required for the flush of pending stats WAL data, per
+ * update of pgWalUsage.
+ */
+ pgstat_report_fixed = true;
}
}
}
@@ -5361,11 +5371,9 @@ BootStrapXLOG(uint32 data_checksum_version)
}
static char *
-str_time(pg_time_t tnow)
+str_time(pg_time_t tnow, char *buf, size_t bufsize)
{
- char *buf = palloc(128);
-
- pg_strftime(buf, 128,
+ pg_strftime(buf, bufsize,
"%Y-%m-%d %H:%M:%S %Z",
pg_localtime(&tnow, log_timezone));
@@ -5608,6 +5616,7 @@ StartupXLOG(void)
XLogRecPtr missingContrecPtr;
TransactionId oldestActiveXID;
bool promoted = false;
+ char timebuf[128];
/*
* We should have an aux process resource owner to use, and we should not
@@ -5636,25 +5645,29 @@ StartupXLOG(void)
*/
ereport(IsPostmasterEnvironment ? LOG : NOTICE,
(errmsg("database system was shut down at %s",
- str_time(ControlFile->time))));
+ str_time(ControlFile->time,
+ timebuf, sizeof(timebuf)))));
break;
case DB_SHUTDOWNED_IN_RECOVERY:
ereport(LOG,
(errmsg("database system was shut down in recovery at %s",
- str_time(ControlFile->time))));
+ str_time(ControlFile->time,
+ timebuf, sizeof(timebuf)))));
break;
case DB_SHUTDOWNING:
ereport(LOG,
(errmsg("database system shutdown was interrupted; last known up at %s",
- str_time(ControlFile->time))));
+ str_time(ControlFile->time,
+ timebuf, sizeof(timebuf)))));
break;
case DB_IN_CRASH_RECOVERY:
ereport(LOG,
(errmsg("database system was interrupted while in recovery at %s",
- str_time(ControlFile->time)),
+ str_time(ControlFile->time,
+ timebuf, sizeof(timebuf))),
errhint("This probably means that some data is corrupted and"
" you will have to use the last backup for recovery.")));
break;
@@ -5662,7 +5675,8 @@ StartupXLOG(void)
case DB_IN_ARCHIVE_RECOVERY:
ereport(LOG,
(errmsg("database system was interrupted while in recovery at log time %s",
- str_time(ControlFile->checkPointCopy.time)),
+ str_time(ControlFile->checkPointCopy.time,
+ timebuf, sizeof(timebuf))),
errhint("If this has occurred more than once some data might be corrupted"
" and you might need to choose an earlier recovery target.")));
break;
@@ -5670,7 +5684,8 @@ StartupXLOG(void)
case DB_IN_PRODUCTION:
ereport(LOG,
(errmsg("database system was interrupted; last known up at %s",
- str_time(ControlFile->time))));
+ str_time(ControlFile->time,
+ timebuf, sizeof(timebuf)))));
break;
default:
@@ -6315,6 +6330,12 @@ StartupXLOG(void)
*/
CompleteCommitTsInitialization();
+ /* Clean up EndOfWalRecoveryInfo data to appease Valgrind leak checking */
+ if (endOfRecoveryInfo->lastPage)
+ pfree(endOfRecoveryInfo->lastPage);
+ pfree(endOfRecoveryInfo->recoveryStopReason);
+ pfree(endOfRecoveryInfo);
+
/*
* All done with end-of-recovery actions.
*
@@ -7121,7 +7142,7 @@ CreateCheckPoint(int flags)
* starting snapshot of locks and transactions.
*/
if (!shutdown && XLogStandbyInfoActive())
- checkPoint.oldestActiveXid = GetOldestActiveTransactionId();
+ checkPoint.oldestActiveXid = GetOldestActiveTransactionId(false, true);
else
checkPoint.oldestActiveXid = InvalidTransactionId;
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index ac1f801b1eb..dcc8d4f9c1b 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -723,11 +723,12 @@ restart:
/* Calculate pointer to beginning of next page */
targetPagePtr += XLOG_BLCKSZ;
- /* Wait for the next page to become available */
- readOff = ReadPageInternal(state, targetPagePtr,
- Min(total_len - gotlen + SizeOfXLogShortPHD,
- XLOG_BLCKSZ));
-
+ /*
+ * Read the page header before processing the record data, so we
+ * can handle the case where the previous record turned out to be
+ * a partial one.
+ */
+ readOff = ReadPageInternal(state, targetPagePtr, SizeOfXLogShortPHD);
if (readOff == XLREAD_WOULDBLOCK)
return XLREAD_WOULDBLOCK;
else if (readOff < 0)
@@ -776,6 +777,15 @@ restart:
goto err;
}
+ /* Wait for the next page to become available */
+ readOff = ReadPageInternal(state, targetPagePtr,
+ Min(total_len - gotlen + SizeOfXLogShortPHD,
+ XLOG_BLCKSZ));
+ if (readOff == XLREAD_WOULDBLOCK)
+ return XLREAD_WOULDBLOCK;
+ else if (readOff < 0)
+ goto err;
+
/* Append the continuation from this page to the buffer */
pageHeaderSize = XLogPageHeaderSize(pageHeader);
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 23878b2dd91..f23ec8969c2 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -1626,6 +1626,7 @@ ShutdownWalRecovery(void)
close(readFile);
readFile = -1;
}
+ pfree(xlogreader->private_data);
XLogReaderFree(xlogreader);
XLogPrefetcherFree(xlogprefetcher);
@@ -4760,7 +4761,7 @@ bool
check_primary_slot_name(char **newval, void **extra, GucSource source)
{
if (*newval && strcmp(*newval, "") != 0 &&
- !ReplicationSlotValidateName(*newval, WARNING))
+ !ReplicationSlotValidateName(*newval, false, WARNING))
return false;
return true;