aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/access/transam/twophase.c167
-rw-r--r--src/test/recovery/t/009_twophase.pl34
2 files changed, 150 insertions, 51 deletions
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index c66ca72bf57..fd55b50262a 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -221,13 +221,13 @@ static void ProcessRecords(char *bufptr, TransactionId xid,
static void RemoveGXact(GlobalTransaction gxact);
static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len);
-static char *ProcessTwoPhaseBuffer(TransactionId xid,
+static char *ProcessTwoPhaseBuffer(FullTransactionId xid,
XLogRecPtr prepare_start_lsn,
bool fromdisk, bool setParent, bool setNextXid);
static void MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid,
const char *gid, TimestampTz prepared_at, Oid owner,
Oid databaseid);
-static void RemoveTwoPhaseFile(TransactionId xid, bool giveWarning);
+static void RemoveTwoPhaseFile(FullTransactionId fxid, bool giveWarning);
static void RecreateTwoPhaseFile(TransactionId xid, void *content, int len);
/*
@@ -927,41 +927,26 @@ TwoPhaseGetDummyProc(TransactionId xid, bool lock_held)
/************************************************************************/
/*
- * Compute the FullTransactionId for the given TransactionId.
- *
- * The wrap logic is safe here because the span of active xids cannot exceed one
- * epoch at any given time.
+ * Compute FullTransactionId for the given TransactionId, using the current
+ * epoch.
*/
static inline FullTransactionId
-AdjustToFullTransactionId(TransactionId xid)
+FullTransactionIdFromCurrentEpoch(TransactionId xid)
{
+ FullTransactionId fxid;
FullTransactionId nextFullXid;
- TransactionId nextXid;
uint32 epoch;
- Assert(TransactionIdIsValid(xid));
-
- LWLockAcquire(XidGenLock, LW_SHARED);
- nextFullXid = TransamVariables->nextXid;
- LWLockRelease(XidGenLock);
-
- nextXid = XidFromFullTransactionId(nextFullXid);
+ nextFullXid = ReadNextFullTransactionId();
epoch = EpochFromFullTransactionId(nextFullXid);
- if (unlikely(xid > nextXid))
- {
- /* Wraparound occurred, must be from a prev epoch. */
- Assert(epoch > 0);
- epoch--;
- }
- return FullTransactionIdFromEpochAndXid(epoch, xid);
+ fxid = FullTransactionIdFromEpochAndXid(epoch, xid);
+ return fxid;
}
static inline int
-TwoPhaseFilePath(char *path, TransactionId xid)
+TwoPhaseFilePath(char *path, FullTransactionId fxid)
{
- FullTransactionId fxid = AdjustToFullTransactionId(xid);
-
return snprintf(path, MAXPGPATH, TWOPHASE_DIR "/%08X%08X",
EpochFromFullTransactionId(fxid),
XidFromFullTransactionId(fxid));
@@ -1297,7 +1282,8 @@ RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
* If it looks OK (has a valid magic number and CRC), return the palloc'd
* contents of the file, issuing an error when finding corrupted data. If
* missing_ok is true, which indicates that missing files can be safely
- * ignored, then return NULL. This state can be reached when doing recovery.
+ * ignored, then return NULL. This state can be reached when doing recovery
+ * after discarding two-phase files from other epochs.
*/
static char *
ReadTwoPhaseFile(TransactionId xid, bool missing_ok)
@@ -1311,8 +1297,10 @@ ReadTwoPhaseFile(TransactionId xid, bool missing_ok)
pg_crc32c calc_crc,
file_crc;
int r;
+ FullTransactionId fxid;
- TwoPhaseFilePath(path, xid);
+ fxid = FullTransactionIdFromCurrentEpoch(xid);
+ TwoPhaseFilePath(path, fxid);
fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
if (fd < 0)
@@ -1677,10 +1665,16 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
AtEOXact_PgStat(isCommit, false);
/*
- * And now we can clean up any files we may have left.
+ * And now we can clean up any files we may have left. These should be
+ * from the current epoch.
*/
if (ondisk)
- RemoveTwoPhaseFile(xid, true);
+ {
+ FullTransactionId fxid;
+
+ fxid = FullTransactionIdFromCurrentEpoch(xid);
+ RemoveTwoPhaseFile(fxid, true);
+ }
MyLockedGxact = NULL;
@@ -1719,13 +1713,17 @@ ProcessRecords(char *bufptr, TransactionId xid,
*
* If giveWarning is false, do not complain about file-not-present;
* this is an expected case during WAL replay.
+ *
+ * This routine is used at early stages at recovery where future and
+ * past orphaned files are checked, hence the FullTransactionId to build
+ * a complete file name fit for the removal.
*/
static void
-RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
+RemoveTwoPhaseFile(FullTransactionId fxid, bool giveWarning)
{
char path[MAXPGPATH];
- TwoPhaseFilePath(path, xid);
+ TwoPhaseFilePath(path, fxid);
if (unlink(path))
if (errno != ENOENT || giveWarning)
ereport(WARNING,
@@ -1745,13 +1743,16 @@ RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
char path[MAXPGPATH];
pg_crc32c statefile_crc;
int fd;
+ FullTransactionId fxid;
/* Recompute CRC */
INIT_CRC32C(statefile_crc);
COMP_CRC32C(statefile_crc, content, len);
FIN_CRC32C(statefile_crc);
- TwoPhaseFilePath(path, xid);
+ /* Use current epoch */
+ fxid = FullTransactionIdFromCurrentEpoch(xid);
+ TwoPhaseFilePath(path, fxid);
fd = OpenTransientFile(path,
O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY);
@@ -1899,7 +1900,9 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
* Scan pg_twophase and fill TwoPhaseState depending on the on-disk data.
* This is called once at the beginning of recovery, saving any extra
* lookups in the future. Two-phase files that are newer than the
- * minimum XID horizon are discarded on the way.
+ * minimum XID horizon are discarded on the way. Two-phase files with
+ * an epoch older or newer than the current checkpoint's record epoch
+ * are also discarded.
*/
void
restoreTwoPhaseData(void)
@@ -1914,14 +1917,11 @@ restoreTwoPhaseData(void)
if (strlen(clde->d_name) == 16 &&
strspn(clde->d_name, "0123456789ABCDEF") == 16)
{
- TransactionId xid;
FullTransactionId fxid;
char *buf;
fxid = FullTransactionIdFromU64(strtou64(clde->d_name, NULL, 16));
- xid = XidFromFullTransactionId(fxid);
-
- buf = ProcessTwoPhaseBuffer(xid, InvalidXLogRecPtr,
+ buf = ProcessTwoPhaseBuffer(fxid, InvalidXLogRecPtr,
true, false, false);
if (buf == NULL)
continue;
@@ -1972,6 +1972,7 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
TransactionId origNextXid = XidFromFullTransactionId(nextXid);
TransactionId result = origNextXid;
TransactionId *xids = NULL;
+ uint32 epoch = EpochFromFullTransactionId(nextXid);
int nxids = 0;
int allocsize = 0;
int i;
@@ -1980,6 +1981,7 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
{
TransactionId xid;
+ FullTransactionId fxid;
char *buf;
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
@@ -1987,7 +1989,12 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
xid = gxact->xid;
- buf = ProcessTwoPhaseBuffer(xid,
+ /*
+ * All two-phase files with past and future epoch in pg_twophase are
+ * gone at this point, so we're OK to rely on only the current epoch.
+ */
+ fxid = FullTransactionIdFromEpochAndXid(epoch, xid);
+ buf = ProcessTwoPhaseBuffer(fxid,
gxact->prepare_start_lsn,
gxact->ondisk, false, true);
@@ -2049,11 +2056,18 @@ void
StandbyRecoverPreparedTransactions(void)
{
int i;
+ uint32 epoch;
+ FullTransactionId nextFullXid;
+
+ /* get current epoch */
+ nextFullXid = ReadNextFullTransactionId();
+ epoch = EpochFromFullTransactionId(nextFullXid);
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
{
TransactionId xid;
+ FullTransactionId fxid;
char *buf;
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
@@ -2061,7 +2075,12 @@ StandbyRecoverPreparedTransactions(void)
xid = gxact->xid;
- buf = ProcessTwoPhaseBuffer(xid,
+ /*
+ * At this stage, we're OK to work with the current epoch as all past
+ * and future files have been already discarded.
+ */
+ fxid = FullTransactionIdFromEpochAndXid(epoch, xid);
+ buf = ProcessTwoPhaseBuffer(fxid,
gxact->prepare_start_lsn,
gxact->ondisk, true, false);
if (buf != NULL)
@@ -2090,11 +2109,18 @@ void
RecoverPreparedTransactions(void)
{
int i;
+ uint32 epoch;
+ FullTransactionId nextFullXid;
+
+ /* get current epoch */
+ nextFullXid = ReadNextFullTransactionId();
+ epoch = EpochFromFullTransactionId(nextFullXid);
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
{
TransactionId xid;
+ FullTransactionId fxid;
char *buf;
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
char *bufptr;
@@ -2102,6 +2128,10 @@ RecoverPreparedTransactions(void)
TransactionId *subxids;
const char *gid;
+ /*
+ * At this stage, we're OK to work with the current epoch as all past
+ * and future files have been already discarded.
+ */
xid = gxact->xid;
/*
@@ -2113,7 +2143,8 @@ RecoverPreparedTransactions(void)
* SubTransSetParent has been set before, if the prepared transaction
* generated xid assignment records.
*/
- buf = ProcessTwoPhaseBuffer(xid,
+ fxid = FullTransactionIdFromEpochAndXid(epoch, xid);
+ buf = ProcessTwoPhaseBuffer(fxid,
gxact->prepare_start_lsn,
gxact->ondisk, true, false);
if (buf == NULL)
@@ -2181,7 +2212,7 @@ RecoverPreparedTransactions(void)
/*
* ProcessTwoPhaseBuffer
*
- * Given a transaction id, read it either from disk or read it directly
+ * Given a FullTransactionId, read it either from disk or read it directly
* via shmem xlog record pointer using the provided "prepare_start_lsn".
*
* If setParent is true, set up subtransaction parent linkages.
@@ -2190,32 +2221,35 @@ RecoverPreparedTransactions(void)
* value scanned.
*/
static char *
-ProcessTwoPhaseBuffer(TransactionId xid,
+ProcessTwoPhaseBuffer(FullTransactionId fxid,
XLogRecPtr prepare_start_lsn,
bool fromdisk,
bool setParent, bool setNextXid)
{
FullTransactionId nextXid = TransamVariables->nextXid;
- TransactionId origNextXid = XidFromFullTransactionId(nextXid);
TransactionId *subxids;
char *buf;
TwoPhaseFileHeader *hdr;
int i;
+ TransactionId xid = XidFromFullTransactionId(fxid);
Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
if (!fromdisk)
Assert(prepare_start_lsn != InvalidXLogRecPtr);
- /* Reject XID if too new */
- if (TransactionIdFollowsOrEquals(xid, origNextXid))
+ /*
+ * Reject full XID if too new. Note that this discards files from future
+ * epochs.
+ */
+ if (FullTransactionIdFollowsOrEquals(fxid, nextXid))
{
if (fromdisk)
{
ereport(WARNING,
- (errmsg("removing future two-phase state file for transaction %u",
- xid)));
- RemoveTwoPhaseFile(xid, true);
+ (errmsg("removing future two-phase state file of epoch %u for transaction %u",
+ EpochFromFullTransactionId(fxid), xid)));
+ RemoveTwoPhaseFile(fxid, true);
}
else
{
@@ -2227,6 +2261,26 @@ ProcessTwoPhaseBuffer(TransactionId xid,
return NULL;
}
+ /* Discard files from past epochs */
+ if (EpochFromFullTransactionId(fxid) < EpochFromFullTransactionId(nextXid))
+ {
+ if (fromdisk)
+ {
+ ereport(WARNING,
+ (errmsg("removing past two-phase state file of epoch %u for transaction %u",
+ EpochFromFullTransactionId(fxid), xid)));
+ RemoveTwoPhaseFile(fxid, true);
+ }
+ else
+ {
+ ereport(WARNING,
+ (errmsg("removing past two-phase state from memory for transaction %u",
+ xid)));
+ PrepareRedoRemove(xid, true);
+ }
+ return NULL;
+ }
+
/* Already processed? */
if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
{
@@ -2235,7 +2289,7 @@ ProcessTwoPhaseBuffer(TransactionId xid,
ereport(WARNING,
(errmsg("removing stale two-phase state file for transaction %u",
xid)));
- RemoveTwoPhaseFile(xid, true);
+ RemoveTwoPhaseFile(fxid, true);
}
else
{
@@ -2521,8 +2575,11 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
if (!XLogRecPtrIsInvalid(start_lsn))
{
char path[MAXPGPATH];
+ FullTransactionId fxid;
- TwoPhaseFilePath(path, hdr->xid);
+ /* Use current epoch */
+ fxid = FullTransactionIdFromCurrentEpoch(hdr->xid);
+ TwoPhaseFilePath(path, fxid);
if (access(path, F_OK) == 0)
{
@@ -2617,7 +2674,15 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning)
*/
elog(DEBUG2, "removing 2PC data for transaction %u", xid);
if (gxact->ondisk)
- RemoveTwoPhaseFile(xid, giveWarning);
+ {
+ FullTransactionId fxid;
+
+ /*
+ * We should deal with a file at the current epoch here.
+ */
+ fxid = FullTransactionIdFromCurrentEpoch(xid);
+ RemoveTwoPhaseFile(fxid, giveWarning);
+ }
RemoveGXact(gxact);
}
diff --git a/src/test/recovery/t/009_twophase.pl b/src/test/recovery/t/009_twophase.pl
index 4b3e0f77dc0..f504245c3b2 100644
--- a/src/test/recovery/t/009_twophase.pl
+++ b/src/test/recovery/t/009_twophase.pl
@@ -572,4 +572,38 @@ my $nsubtrans = $cur_primary->safe_psql('postgres',
);
isnt($osubtrans, $nsubtrans, "contents of pg_subtrans/ have changed");
+###############################################################################
+# Check handling of orphaned 2PC files at recovery.
+###############################################################################
+
+$cur_standby->teardown_node;
+$cur_primary->teardown_node;
+
+# Grab location in logs of primary
+my $log_offset = -s $cur_primary->logfile;
+
+# Create fake files with a transaction ID large or low enough to be in the
+# future or the past, in different epochs, then check that the primary is able
+# to start and remove these files at recovery.
+
+# First bump the epoch with pg_resetwal.
+$cur_primary->command_ok(
+ [ 'pg_resetwal', '-e', 256, '-f', $cur_primary->data_dir ],
+ 'bump epoch of primary');
+
+my $future_2pc_file =
+ $cur_primary->data_dir . '/pg_twophase/000001FF00000FFF';
+append_to_file $future_2pc_file, "";
+my $past_2pc_file = $cur_primary->data_dir . '/pg_twophase/000000EE00000FFF';
+append_to_file $past_2pc_file, "";
+
+$cur_primary->start;
+$cur_primary->log_check(
+ "two-phase files removed at recovery",
+ $log_offset,
+ log_like => [
+ qr/removing past two-phase state file of epoch 238 for transaction 4095/,
+ qr/removing future two-phase state file of epoch 511 for transaction 4095/
+ ]);
+
done_testing();