aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/access/transam/twophase.c175
-rw-r--r--src/test/recovery/t/009_twophase.pl34
2 files changed, 55 insertions, 154 deletions
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index a3190dc4f1a..ab2f4a8a92f 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -221,13 +221,13 @@ static void ProcessRecords(char *bufptr, TransactionId xid,
static void RemoveGXact(GlobalTransaction gxact);
static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len);
-static char *ProcessTwoPhaseBuffer(FullTransactionId xid,
+static char *ProcessTwoPhaseBuffer(TransactionId xid,
XLogRecPtr prepare_start_lsn,
bool fromdisk, bool setParent, bool setNextXid);
static void MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid,
const char *gid, TimestampTz prepared_at, Oid owner,
Oid databaseid);
-static void RemoveTwoPhaseFile(FullTransactionId fxid, bool giveWarning);
+static void RemoveTwoPhaseFile(TransactionId xid, bool giveWarning);
static void RecreateTwoPhaseFile(TransactionId xid, void *content, int len);
/*
@@ -927,26 +927,41 @@ TwoPhaseGetDummyProc(TransactionId xid, bool lock_held)
/************************************************************************/
/*
- * Compute FullTransactionId for the given TransactionId, using the current
- * epoch.
+ * Compute the FullTransactionId for the given TransactionId.
+ *
+ * The wrap logic is safe here because the span of active xids cannot exceed one
+ * epoch at any given time.
*/
static inline FullTransactionId
-FullTransactionIdFromCurrentEpoch(TransactionId xid)
+AdjustToFullTransactionId(TransactionId xid)
{
- FullTransactionId fxid;
FullTransactionId nextFullXid;
+ TransactionId nextXid;
uint32 epoch;
- nextFullXid = ReadNextFullTransactionId();
+ Assert(TransactionIdIsValid(xid));
+
+ LWLockAcquire(XidGenLock, LW_SHARED);
+ nextFullXid = TransamVariables->nextXid;
+ LWLockRelease(XidGenLock);
+
+ nextXid = XidFromFullTransactionId(nextFullXid);
epoch = EpochFromFullTransactionId(nextFullXid);
+ if (unlikely(xid > nextXid))
+ {
+ /* Wraparound occurred, must be from a prev epoch. */
+ Assert(epoch > 0);
+ epoch--;
+ }
- fxid = FullTransactionIdFromEpochAndXid(epoch, xid);
- return fxid;
+ return FullTransactionIdFromEpochAndXid(epoch, xid);
}
static inline int
-TwoPhaseFilePath(char *path, FullTransactionId fxid)
+TwoPhaseFilePath(char *path, TransactionId xid)
{
+ FullTransactionId fxid = AdjustToFullTransactionId(xid);
+
return snprintf(path, MAXPGPATH, TWOPHASE_DIR "/%08X%08X",
EpochFromFullTransactionId(fxid),
XidFromFullTransactionId(fxid));
@@ -1282,8 +1297,7 @@ RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
* If it looks OK (has a valid magic number and CRC), return the palloc'd
* contents of the file, issuing an error when finding corrupted data. If
* missing_ok is true, which indicates that missing files can be safely
- * ignored, then return NULL. This state can be reached when doing recovery
- * after discarding two-phase files from other epochs.
+ * ignored, then return NULL. This state can be reached when doing recovery.
*/
static char *
ReadTwoPhaseFile(TransactionId xid, bool missing_ok)
@@ -1297,10 +1311,8 @@ ReadTwoPhaseFile(TransactionId xid, bool missing_ok)
pg_crc32c calc_crc,
file_crc;
int r;
- FullTransactionId fxid;
- fxid = FullTransactionIdFromCurrentEpoch(xid);
- TwoPhaseFilePath(path, fxid);
+ TwoPhaseFilePath(path, xid);
fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
if (fd < 0)
@@ -1665,16 +1677,10 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
AtEOXact_PgStat(isCommit, false);
/*
- * And now we can clean up any files we may have left. These should be
- * from the current epoch.
+ * And now we can clean up any files we may have left.
*/
if (ondisk)
- {
- FullTransactionId fxid;
-
- fxid = FullTransactionIdFromCurrentEpoch(xid);
- RemoveTwoPhaseFile(fxid, true);
- }
+ RemoveTwoPhaseFile(xid, true);
MyLockedGxact = NULL;
@@ -1712,17 +1718,13 @@ ProcessRecords(char *bufptr, TransactionId xid,
*
* If giveWarning is false, do not complain about file-not-present;
* this is an expected case during WAL replay.
- *
- * This routine is used at early stages at recovery where future and
- * past orphaned files are checked, hence the FullTransactionId to build
- * a complete file name fit for the removal.
*/
static void
-RemoveTwoPhaseFile(FullTransactionId fxid, bool giveWarning)
+RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
{
char path[MAXPGPATH];
- TwoPhaseFilePath(path, fxid);
+ TwoPhaseFilePath(path, xid);
if (unlink(path))
if (errno != ENOENT || giveWarning)
ereport(WARNING,
@@ -1742,16 +1744,13 @@ RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
char path[MAXPGPATH];
pg_crc32c statefile_crc;
int fd;
- FullTransactionId fxid;
/* Recompute CRC */
INIT_CRC32C(statefile_crc);
COMP_CRC32C(statefile_crc, content, len);
FIN_CRC32C(statefile_crc);
- /* Use current epoch */
- fxid = FullTransactionIdFromCurrentEpoch(xid);
- TwoPhaseFilePath(path, fxid);
+ TwoPhaseFilePath(path, xid);
fd = OpenTransientFile(path,
O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY);
@@ -1899,9 +1898,7 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
* Scan pg_twophase and fill TwoPhaseState depending on the on-disk data.
* This is called once at the beginning of recovery, saving any extra
* lookups in the future. Two-phase files that are newer than the
- * minimum XID horizon are discarded on the way. Two-phase files with
- * an epoch older or newer than the current checkpoint's record epoch
- * are also discarded.
+ * minimum XID horizon are discarded on the way.
*/
void
restoreTwoPhaseData(void)
@@ -1916,11 +1913,14 @@ restoreTwoPhaseData(void)
if (strlen(clde->d_name) == 16 &&
strspn(clde->d_name, "0123456789ABCDEF") == 16)
{
+ TransactionId xid;
FullTransactionId fxid;
char *buf;
fxid = FullTransactionIdFromU64(strtou64(clde->d_name, NULL, 16));
- buf = ProcessTwoPhaseBuffer(fxid, InvalidXLogRecPtr,
+ xid = XidFromFullTransactionId(fxid);
+
+ buf = ProcessTwoPhaseBuffer(xid, InvalidXLogRecPtr,
true, false, false);
if (buf == NULL)
continue;
@@ -1971,7 +1971,6 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
TransactionId origNextXid = XidFromFullTransactionId(nextXid);
TransactionId result = origNextXid;
TransactionId *xids = NULL;
- uint32 epoch = EpochFromFullTransactionId(nextXid);
int nxids = 0;
int allocsize = 0;
int i;
@@ -1980,7 +1979,6 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
{
TransactionId xid;
- FullTransactionId fxid;
char *buf;
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
@@ -1988,12 +1986,7 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
xid = gxact->xid;
- /*
- * All two-phase files with past and future epoch in pg_twophase are
- * gone at this point, so we're OK to rely on only the current epoch.
- */
- fxid = FullTransactionIdFromEpochAndXid(epoch, xid);
- buf = ProcessTwoPhaseBuffer(fxid,
+ buf = ProcessTwoPhaseBuffer(xid,
gxact->prepare_start_lsn,
gxact->ondisk, false, true);
@@ -2055,18 +2048,11 @@ void
StandbyRecoverPreparedTransactions(void)
{
int i;
- uint32 epoch;
- FullTransactionId nextFullXid;
-
- /* get current epoch */
- nextFullXid = ReadNextFullTransactionId();
- epoch = EpochFromFullTransactionId(nextFullXid);
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
{
TransactionId xid;
- FullTransactionId fxid;
char *buf;
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
@@ -2074,12 +2060,7 @@ StandbyRecoverPreparedTransactions(void)
xid = gxact->xid;
- /*
- * At this stage, we're OK to work with the current epoch as all past
- * and future files have been already discarded.
- */
- fxid = FullTransactionIdFromEpochAndXid(epoch, xid);
- buf = ProcessTwoPhaseBuffer(fxid,
+ buf = ProcessTwoPhaseBuffer(xid,
gxact->prepare_start_lsn,
gxact->ondisk, true, false);
if (buf != NULL)
@@ -2108,18 +2089,11 @@ void
RecoverPreparedTransactions(void)
{
int i;
- uint32 epoch;
- FullTransactionId nextFullXid;
-
- /* get current epoch */
- nextFullXid = ReadNextFullTransactionId();
- epoch = EpochFromFullTransactionId(nextFullXid);
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
{
TransactionId xid;
- FullTransactionId fxid;
char *buf;
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
char *bufptr;
@@ -2127,10 +2101,6 @@ RecoverPreparedTransactions(void)
TransactionId *subxids;
const char *gid;
- /*
- * At this stage, we're OK to work with the current epoch as all past
- * and future files have been already discarded.
- */
xid = gxact->xid;
/*
@@ -2142,8 +2112,7 @@ RecoverPreparedTransactions(void)
* SubTransSetParent has been set before, if the prepared transaction
* generated xid assignment records.
*/
- fxid = FullTransactionIdFromEpochAndXid(epoch, xid);
- buf = ProcessTwoPhaseBuffer(fxid,
+ buf = ProcessTwoPhaseBuffer(xid,
gxact->prepare_start_lsn,
gxact->ondisk, true, false);
if (buf == NULL)
@@ -2211,7 +2180,7 @@ RecoverPreparedTransactions(void)
/*
* ProcessTwoPhaseBuffer
*
- * Given a FullTransactionId, read it either from disk or read it directly
+ * Given a transaction id, read it either from disk or read it directly
* via shmem xlog record pointer using the provided "prepare_start_lsn".
*
* If setParent is true, set up subtransaction parent linkages.
@@ -2220,80 +2189,57 @@ RecoverPreparedTransactions(void)
* value scanned.
*/
static char *
-ProcessTwoPhaseBuffer(FullTransactionId fxid,
+ProcessTwoPhaseBuffer(TransactionId xid,
XLogRecPtr prepare_start_lsn,
bool fromdisk,
bool setParent, bool setNextXid)
{
FullTransactionId nextXid = TransamVariables->nextXid;
+ TransactionId origNextXid = XidFromFullTransactionId(nextXid);
TransactionId *subxids;
char *buf;
TwoPhaseFileHeader *hdr;
int i;
- TransactionId xid = XidFromFullTransactionId(fxid);
Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
if (!fromdisk)
Assert(prepare_start_lsn != InvalidXLogRecPtr);
- /*
- * Reject full XID if too new. Note that this discards files from future
- * epochs.
- */
- if (FullTransactionIdFollowsOrEquals(fxid, nextXid))
+ /* Already processed? */
+ if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
{
if (fromdisk)
{
ereport(WARNING,
- (errmsg("removing future two-phase state file of epoch %u for transaction %u",
- EpochFromFullTransactionId(fxid), xid)));
- RemoveTwoPhaseFile(fxid, true);
- }
- else
- {
- ereport(WARNING,
- (errmsg("removing future two-phase state from memory for transaction %u",
+ (errmsg("removing stale two-phase state file for transaction %u",
xid)));
- PrepareRedoRemove(xid, true);
- }
- return NULL;
- }
-
- /* Discard files from past epochs */
- if (EpochFromFullTransactionId(fxid) < EpochFromFullTransactionId(nextXid))
- {
- if (fromdisk)
- {
- ereport(WARNING,
- (errmsg("removing past two-phase state file of epoch %u for transaction %u",
- EpochFromFullTransactionId(fxid), xid)));
- RemoveTwoPhaseFile(fxid, true);
+ RemoveTwoPhaseFile(xid, true);
}
else
{
ereport(WARNING,
- (errmsg("removing past two-phase state from memory for transaction %u",
+ (errmsg("removing stale two-phase state from memory for transaction %u",
xid)));
PrepareRedoRemove(xid, true);
}
return NULL;
}
- /* Already processed? */
- if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
+ /* Reject XID if too new */
+ if (TransactionIdFollowsOrEquals(xid, origNextXid))
{
if (fromdisk)
{
ereport(WARNING,
- (errmsg("removing stale two-phase state file for transaction %u",
+ (errmsg("removing future two-phase state file for transaction %u",
xid)));
- RemoveTwoPhaseFile(fxid, true);
+ RemoveTwoPhaseFile(xid, true);
}
else
{
ereport(WARNING,
- (errmsg("removing stale two-phase state from memory for transaction %u",
+ (errmsg("removing future two-phase state from memory for transaction %u",
xid)));
PrepareRedoRemove(xid, true);
}
@@ -2574,11 +2520,8 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
if (!XLogRecPtrIsInvalid(start_lsn))
{
char path[MAXPGPATH];
- FullTransactionId fxid;
- /* Use current epoch */
- fxid = FullTransactionIdFromCurrentEpoch(hdr->xid);
- TwoPhaseFilePath(path, fxid);
+ TwoPhaseFilePath(path, hdr->xid);
if (access(path, F_OK) == 0)
{
@@ -2673,15 +2616,7 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning)
*/
elog(DEBUG2, "removing 2PC data for transaction %u", xid);
if (gxact->ondisk)
- {
- FullTransactionId fxid;
-
- /*
- * We should deal with a file at the current epoch here.
- */
- fxid = FullTransactionIdFromCurrentEpoch(xid);
- RemoveTwoPhaseFile(fxid, giveWarning);
- }
+ RemoveTwoPhaseFile(xid, giveWarning);
RemoveGXact(gxact);
}
diff --git a/src/test/recovery/t/009_twophase.pl b/src/test/recovery/t/009_twophase.pl
index cf61a2f3285..1a662ebe499 100644
--- a/src/test/recovery/t/009_twophase.pl
+++ b/src/test/recovery/t/009_twophase.pl
@@ -572,38 +572,4 @@ my $nsubtrans = $cur_primary->safe_psql('postgres',
);
isnt($osubtrans, $nsubtrans, "contents of pg_subtrans/ have changed");
-###############################################################################
-# Check handling of orphaned 2PC files at recovery.
-###############################################################################
-
-$cur_standby->teardown_node;
-$cur_primary->teardown_node;
-
-# Grab location in logs of primary
-my $log_offset = -s $cur_primary->logfile;
-
-# Create fake files with a transaction ID large or low enough to be in the
-# future or the past, in different epochs, then check that the primary is able
-# to start and remove these files at recovery.
-
-# First bump the epoch with pg_resetwal.
-$cur_primary->command_ok(
- [ 'pg_resetwal', '-e', 256, '-f', $cur_primary->data_dir ],
- 'bump epoch of primary');
-
-my $future_2pc_file =
- $cur_primary->data_dir . '/pg_twophase/000001FF00000FFF';
-append_to_file $future_2pc_file, "";
-my $past_2pc_file = $cur_primary->data_dir . '/pg_twophase/000000EE00000FFF';
-append_to_file $past_2pc_file, "";
-
-$cur_primary->start;
-$cur_primary->log_check(
- "two-phase files removed at recovery",
- $log_offset,
- log_like => [
- qr/removing past two-phase state file of epoch 238 for transaction 4095/,
- qr/removing future two-phase state file of epoch 511 for transaction 4095/
- ]);
-
done_testing();