aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access
diff options
context:
space:
mode:
authorAndres Freund <andres@anarazel.de>2015-03-15 17:37:07 +0100
committerAndres Freund <andres@anarazel.de>2015-03-15 17:37:07 +0100
commit4f1b890b1377744688e43218f975d3c8b2ae39f8 (patch)
treedcdfcd3f16ca5d78ba23d2a26a6c0c8469ec414c /src/backend/access
parenta0f5954af19ddcfea946b15744f2006a789dc4bd (diff)
downloadpostgresql-4f1b890b1377744688e43218f975d3c8b2ae39f8.tar.gz
postgresql-4f1b890b1377744688e43218f975d3c8b2ae39f8.zip
Merge the various forms of transaction commit & abort records.
Since 465883b0a two versions of commit records have existed. A compact version that was used when no cache invalidations, smgr unlinks and similar were needed, and a full version that could deal with all that. Additionally the full version was embedded into twophase commit records. That resulted in a measurable reduction in the size of the logged WAL in some workloads. But more recently additions like logical decoding, which e.g. needs information about the database something was executed on, made it applicable in fewer situations. The static split generally made it hard to expand the commit record, because concerns over the size made it hard to add anything to the compact version. Additionally it's not particularly pretty to have twophase.c insert RM_XACT records. Rejigger things so that the commit and abort records only have one form each, including the twophase equivalents. The presence of the various optional (in the sense of not being in every record) pieces is indicated by a bits in the 'xinfo' flag. That flag previously was not included in compact commit records. To prevent an increase in size due to its presence, it's only included if necessary; signalled by a bit in the xl_info bits available for xact.c, similar to heapam.c's XLOG_HEAP_OPMASK/XLOG_HEAP_INIT_PAGE. Twophase commit/aborts are now the same as their normal counterparts. The original transaction's xid is included in an optional data field. This means that commit records generally are smaller, except in the case of a transaction with subtransactions, but no other special cases; the increase there is four bytes, which seems acceptable given that the more common case of not having subtransactions shrank. The savings are especially measurable for twophase commits, which previously always used the full version; but will in practice only infrequently have required that. The motivation for this work are not the space savings and and deduplication though; it's that it makes it easier to extend commit records with additional information. That's just a few lines of code now; without impacting the common case where that information is not needed. Discussion: 20150220152150.GD4149@awork2.anarazel.de, 235610.92468.qm%40web29004.mail.ird.yahoo.com Reviewed-By: Heikki Linnakangas, Simon Riggs
Diffstat (limited to 'src/backend/access')
-rw-r--r--src/backend/access/rmgrdesc/xactdesc.c245
-rw-r--r--src/backend/access/transam/twophase.c59
-rw-r--r--src/backend/access/transam/xact.c475
-rw-r--r--src/backend/access/transam/xlog.c126
4 files changed, 551 insertions, 354 deletions
diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c
index 3e87978cef8..b036b6d5242 100644
--- a/src/backend/access/rmgrdesc/xactdesc.c
+++ b/src/backend/access/rmgrdesc/xactdesc.c
@@ -14,53 +14,188 @@
*/
#include "postgres.h"
+#include "access/transam.h"
#include "access/xact.h"
#include "catalog/catalog.h"
#include "storage/sinval.h"
#include "utils/timestamp.h"
+/*
+ * Parse the WAL format of a xact commit and abort records into a easier to
+ * understand format.
+ *
+ * This routines are in xactdesc.c because they're accessed in backend (when
+ * replaying WAL) and frontend (pg_xlogdump) code. This file is the only xact
+ * specific one shared between both. They're complicated enough that
+ * duplication would be bothersome.
+ */
+
+void
+ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *parsed)
+{
+ char *data = ((char *) xlrec) + MinSizeOfXactCommit;
+
+ memset(parsed, 0, sizeof(*parsed));
+
+ parsed->xinfo = 0; /* default, if no XLOG_XACT_HAS_INFO is present */
+
+ parsed->xact_time = xlrec->xact_time;
+
+ if (info & XLOG_XACT_HAS_INFO)
+ {
+ xl_xact_xinfo *xl_xinfo = (xl_xact_xinfo *) data;
+
+ parsed->xinfo = xl_xinfo->xinfo;
+
+ data += sizeof(xl_xact_xinfo);
+ }
+
+ if (parsed->xinfo & XACT_XINFO_HAS_DBINFO)
+ {
+ xl_xact_dbinfo *xl_dbinfo = (xl_xact_dbinfo *) data;
+
+ parsed->dbId = xl_dbinfo->dbId;
+ parsed->tsId = xl_dbinfo->tsId;
+
+ data += sizeof(xl_xact_dbinfo);
+ }
+
+ if (parsed->xinfo & XACT_XINFO_HAS_SUBXACTS)
+ {
+ xl_xact_subxacts *xl_subxacts = (xl_xact_subxacts *) data;
+
+ parsed->nsubxacts = xl_subxacts->nsubxacts;
+ parsed->subxacts = xl_subxacts->subxacts;
+
+ data += MinSizeOfXactSubxacts;
+ data += parsed->nsubxacts * sizeof(TransactionId);
+ }
+
+ if (parsed->xinfo & XACT_XINFO_HAS_RELFILENODES)
+ {
+ xl_xact_relfilenodes *xl_relfilenodes = (xl_xact_relfilenodes *) data;
+
+ parsed->nrels = xl_relfilenodes->nrels;
+ parsed->xnodes = xl_relfilenodes->xnodes;
+
+ data += MinSizeOfXactRelfilenodes;
+ data += xl_relfilenodes->nrels * sizeof(RelFileNode);
+ }
+
+ if (parsed->xinfo & XACT_XINFO_HAS_INVALS)
+ {
+ xl_xact_invals *xl_invals = (xl_xact_invals *) data;
+
+ parsed->nmsgs = xl_invals->nmsgs;
+ parsed->msgs = xl_invals->msgs;
+
+ data += MinSizeOfXactInvals;
+ data += xl_invals->nmsgs * sizeof(SharedInvalidationMessage);
+ }
+
+ if (parsed->xinfo & XACT_XINFO_HAS_TWOPHASE)
+ {
+ xl_xact_twophase *xl_twophase = (xl_xact_twophase *) data;
+
+ parsed->twophase_xid = xl_twophase->xid;
+
+ data += sizeof(xl_xact_twophase);
+ }
+}
+
+void
+ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed)
+{
+ char *data = ((char *) xlrec) + MinSizeOfXactAbort;
+
+ memset(parsed, 0, sizeof(*parsed));
+
+ parsed->xinfo = 0; /* default, if no XLOG_XACT_HAS_INFO is present */
+
+ parsed->xact_time = xlrec->xact_time;
+
+ if (info & XLOG_XACT_HAS_INFO)
+ {
+ xl_xact_xinfo *xl_xinfo = (xl_xact_xinfo *) data;
+
+ parsed->xinfo = xl_xinfo->xinfo;
+
+ data += sizeof(xl_xact_xinfo);
+ }
+
+ if (parsed->xinfo & XACT_XINFO_HAS_SUBXACTS)
+ {
+ xl_xact_subxacts *xl_subxacts = (xl_xact_subxacts *) data;
+
+ parsed->nsubxacts = xl_subxacts->nsubxacts;
+ parsed->subxacts = xl_subxacts->subxacts;
+
+ data += MinSizeOfXactSubxacts;
+ data += parsed->nsubxacts * sizeof(TransactionId);
+ }
+
+ if (parsed->xinfo & XACT_XINFO_HAS_RELFILENODES)
+ {
+ xl_xact_relfilenodes *xl_relfilenodes = (xl_xact_relfilenodes *) data;
+
+ parsed->nrels = xl_relfilenodes->nrels;
+ parsed->xnodes = xl_relfilenodes->xnodes;
+
+ data += MinSizeOfXactRelfilenodes;
+ data += xl_relfilenodes->nrels * sizeof(RelFileNode);
+ }
+
+ if (parsed->xinfo & XACT_XINFO_HAS_TWOPHASE)
+ {
+ xl_xact_twophase *xl_twophase = (xl_xact_twophase *) data;
+
+ parsed->twophase_xid = xl_twophase->xid;
+
+ data += sizeof(xl_xact_twophase);
+ }
+}
static void
-xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec)
+xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec)
{
+ xl_xact_parsed_commit parsed;
int i;
- TransactionId *subxacts;
- subxacts = (TransactionId *) &xlrec->xnodes[xlrec->nrels];
+ ParseCommitRecord(info, xlrec, &parsed);
+
+ /* If this is a prepared xact, show the xid of the original xact */
+ if (TransactionIdIsValid(parsed.twophase_xid))
+ appendStringInfo(buf, "%u: ", parsed.twophase_xid);
appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time));
- if (xlrec->nrels > 0)
+ if (parsed.nrels > 0)
{
appendStringInfoString(buf, "; rels:");
- for (i = 0; i < xlrec->nrels; i++)
+ for (i = 0; i < parsed.nrels; i++)
{
- char *path = relpathperm(xlrec->xnodes[i], MAIN_FORKNUM);
+ char *path = relpathperm(parsed.xnodes[i], MAIN_FORKNUM);
appendStringInfo(buf, " %s", path);
pfree(path);
}
}
- if (xlrec->nsubxacts > 0)
+ if (parsed.nsubxacts > 0)
{
appendStringInfoString(buf, "; subxacts:");
- for (i = 0; i < xlrec->nsubxacts; i++)
- appendStringInfo(buf, " %u", subxacts[i]);
+ for (i = 0; i < parsed.nsubxacts; i++)
+ appendStringInfo(buf, " %u", parsed.subxacts[i]);
}
- if (xlrec->nmsgs > 0)
+ if (parsed.nmsgs > 0)
{
- SharedInvalidationMessage *msgs;
-
- msgs = (SharedInvalidationMessage *) &subxacts[xlrec->nsubxacts];
-
- if (XactCompletionRelcacheInitFileInval(xlrec->xinfo))
+ if (XactCompletionRelcacheInitFileInval(parsed.xinfo))
appendStringInfo(buf, "; relcache init file inval dbid %u tsid %u",
- xlrec->dbId, xlrec->tsId);
+ parsed.dbId, parsed.tsId);
appendStringInfoString(buf, "; inval msgs:");
- for (i = 0; i < xlrec->nmsgs; i++)
+ for (i = 0; i < parsed.nmsgs; i++)
{
- SharedInvalidationMessage *msg = &msgs[i];
+ SharedInvalidationMessage *msg = &parsed.msgs[i];
if (msg->id >= 0)
appendStringInfo(buf, " catcache %d", msg->id);
@@ -80,48 +215,41 @@ xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec)
appendStringInfo(buf, " unknown id %d", msg->id);
}
}
+
+ if (XactCompletionForceSyncCommit(parsed.xinfo))
+ appendStringInfo(buf, "; sync");
}
static void
-xact_desc_commit_compact(StringInfo buf, xl_xact_commit_compact *xlrec)
+xact_desc_abort(StringInfo buf, uint8 info, xl_xact_abort *xlrec)
{
+ xl_xact_parsed_abort parsed;
int i;
- appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time));
+ ParseAbortRecord(info, xlrec, &parsed);
- if (xlrec->nsubxacts > 0)
- {
- appendStringInfoString(buf, "; subxacts:");
- for (i = 0; i < xlrec->nsubxacts; i++)
- appendStringInfo(buf, " %u", xlrec->subxacts[i]);
- }
-}
-
-static void
-xact_desc_abort(StringInfo buf, xl_xact_abort *xlrec)
-{
- int i;
+ /* If this is a prepared xact, show the xid of the original xact */
+ if (TransactionIdIsValid(parsed.twophase_xid))
+ appendStringInfo(buf, "%u: ", parsed.twophase_xid);
appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time));
- if (xlrec->nrels > 0)
+ if (parsed.nrels > 0)
{
appendStringInfoString(buf, "; rels:");
- for (i = 0; i < xlrec->nrels; i++)
+ for (i = 0; i < parsed.nrels; i++)
{
- char *path = relpathperm(xlrec->xnodes[i], MAIN_FORKNUM);
+ char *path = relpathperm(parsed.xnodes[i], MAIN_FORKNUM);
appendStringInfo(buf, " %s", path);
pfree(path);
}
}
- if (xlrec->nsubxacts > 0)
- {
- TransactionId *xacts = (TransactionId *)
- &xlrec->xnodes[xlrec->nrels];
+ if (parsed.nsubxacts > 0)
+ {
appendStringInfoString(buf, "; subxacts:");
- for (i = 0; i < xlrec->nsubxacts; i++)
- appendStringInfo(buf, " %u", xacts[i]);
+ for (i = 0; i < parsed.nsubxacts; i++)
+ appendStringInfo(buf, " %u", parsed.subxacts[i]);
}
}
@@ -140,39 +268,19 @@ void
xact_desc(StringInfo buf, XLogReaderState *record)
{
char *rec = XLogRecGetData(record);
- uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
-
- if (info == XLOG_XACT_COMMIT_COMPACT)
- {
- xl_xact_commit_compact *xlrec = (xl_xact_commit_compact *) rec;
+ uint8 info = XLogRecGetInfo(record) & XLOG_XACT_OPMASK;
- xact_desc_commit_compact(buf, xlrec);
- }
- else if (info == XLOG_XACT_COMMIT)
+ if (info == XLOG_XACT_COMMIT || info == XLOG_XACT_COMMIT_PREPARED)
{
xl_xact_commit *xlrec = (xl_xact_commit *) rec;
- xact_desc_commit(buf, xlrec);
+ xact_desc_commit(buf, XLogRecGetInfo(record), xlrec);
}
- else if (info == XLOG_XACT_ABORT)
+ else if (info == XLOG_XACT_ABORT || info == XLOG_XACT_ABORT_PREPARED)
{
xl_xact_abort *xlrec = (xl_xact_abort *) rec;
- xact_desc_abort(buf, xlrec);
- }
- else if (info == XLOG_XACT_COMMIT_PREPARED)
- {
- xl_xact_commit_prepared *xlrec = (xl_xact_commit_prepared *) rec;
-
- appendStringInfo(buf, "%u: ", xlrec->xid);
- xact_desc_commit(buf, &xlrec->crec);
- }
- else if (info == XLOG_XACT_ABORT_PREPARED)
- {
- xl_xact_abort_prepared *xlrec = (xl_xact_abort_prepared *) rec;
-
- appendStringInfo(buf, "%u: ", xlrec->xid);
- xact_desc_abort(buf, &xlrec->arec);
+ xact_desc_abort(buf, XLogRecGetInfo(record), xlrec);
}
else if (info == XLOG_XACT_ASSIGNMENT)
{
@@ -193,7 +301,7 @@ xact_identify(uint8 info)
{
const char *id = NULL;
- switch (info & ~XLR_INFO_MASK)
+ switch (info & XLOG_XACT_OPMASK)
{
case XLOG_XACT_COMMIT:
id = "COMMIT";
@@ -213,9 +321,6 @@ xact_identify(uint8 info)
case XLOG_XACT_ASSIGNMENT:
id = "ASSIGNMENT";
break;
- case XLOG_XACT_COMMIT_COMPACT:
- id = "COMMIT_COMPACT";
- break;
}
return id;
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 6edc22704cc..4075a6f743a 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -2079,7 +2079,6 @@ RecordTransactionCommitPrepared(TransactionId xid,
SharedInvalidationMessage *invalmsgs,
bool initfileinval)
{
- xl_xact_commit_prepared xlrec;
XLogRecPtr recptr;
START_CRIT_SECTION();
@@ -2088,36 +2087,11 @@ RecordTransactionCommitPrepared(TransactionId xid,
MyPgXact->delayChkpt = true;
/* Emit the XLOG commit record */
- xlrec.xid = xid;
-
- xlrec.crec.xinfo = initfileinval ? XACT_COMPLETION_UPDATE_RELCACHE_FILE : 0;
-
- xlrec.crec.dbId = MyDatabaseId;
- xlrec.crec.tsId = MyDatabaseTableSpace;
-
- xlrec.crec.xact_time = GetCurrentTimestamp();
- xlrec.crec.nrels = nrels;
- xlrec.crec.nsubxacts = nchildren;
- xlrec.crec.nmsgs = ninvalmsgs;
-
- XLogBeginInsert();
- XLogRegisterData((char *) (&xlrec), MinSizeOfXactCommitPrepared);
-
- /* dump rels to delete */
- if (nrels > 0)
- XLogRegisterData((char *) rels, nrels * sizeof(RelFileNode));
-
- /* dump committed child Xids */
- if (nchildren > 0)
- XLogRegisterData((char *) children,
- nchildren * sizeof(TransactionId));
-
- /* dump cache invalidation messages */
- if (ninvalmsgs > 0)
- XLogRegisterData((char *) invalmsgs,
- ninvalmsgs * sizeof(SharedInvalidationMessage));
-
- recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_PREPARED);
+ recptr = XactLogCommitRecord(GetCurrentTimestamp(),
+ nchildren, children, nrels, rels,
+ ninvalmsgs, invalmsgs,
+ initfileinval, false,
+ xid);
/*
* We don't currently try to sleep before flush here ... nor is there any
@@ -2160,7 +2134,6 @@ RecordTransactionAbortPrepared(TransactionId xid,
int nrels,
RelFileNode *rels)
{
- xl_xact_abort_prepared xlrec;
XLogRecPtr recptr;
/*
@@ -2174,24 +2147,10 @@ RecordTransactionAbortPrepared(TransactionId xid,
START_CRIT_SECTION();
/* Emit the XLOG abort record */
- xlrec.xid = xid;
- xlrec.arec.xact_time = GetCurrentTimestamp();
- xlrec.arec.nrels = nrels;
- xlrec.arec.nsubxacts = nchildren;
-
- XLogBeginInsert();
- XLogRegisterData((char *) (&xlrec), MinSizeOfXactAbortPrepared);
-
- /* dump rels to delete */
- if (nrels > 0)
- XLogRegisterData((char *) rels, nrels * sizeof(RelFileNode));
-
- /* dump committed child Xids */
- if (nchildren > 0)
- XLogRegisterData((char *) children,
- nchildren * sizeof(TransactionId));
-
- recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT_PREPARED);
+ recptr = XactLogAbortRecord(GetCurrentTimestamp(),
+ nchildren, children,
+ nrels, rels,
+ xid);
/* Always flush, since we're about to remove the 2PC state file */
XLogFlush(recptr);
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 89769eac072..1495bb499f5 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1068,70 +1068,11 @@ RecordTransactionCommit(void)
SetCurrentTransactionStopTimestamp();
- /*
- * Do we need the long commit record? If not, use the compact format.
- *
- * For now always use the non-compact version if wal_level=logical, so
- * we can hide commits from other databases. TODO: In the future we
- * should merge compact and non-compact commits and use a flags
- * variable to determine if it contains subxacts, relations or
- * invalidation messages, that's more extensible and degrades more
- * gracefully. Till then, it's just 20 bytes of overhead.
- */
- if (nrels > 0 || nmsgs > 0 || RelcacheInitFileInval || forceSyncCommit ||
- XLogLogicalInfoActive())
- {
- xl_xact_commit xlrec;
-
- /*
- * Set flags required for recovery processing of commits.
- */
- xlrec.xinfo = 0;
- if (RelcacheInitFileInval)
- xlrec.xinfo |= XACT_COMPLETION_UPDATE_RELCACHE_FILE;
- if (forceSyncCommit)
- xlrec.xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT;
-
- xlrec.dbId = MyDatabaseId;
- xlrec.tsId = MyDatabaseTableSpace;
-
- xlrec.xact_time = xactStopTimestamp;
- xlrec.nrels = nrels;
- xlrec.nsubxacts = nchildren;
- xlrec.nmsgs = nmsgs;
-
- XLogBeginInsert();
- XLogRegisterData((char *) (&xlrec), MinSizeOfXactCommit);
- /* dump rels to delete */
- if (nrels > 0)
- XLogRegisterData((char *) rels,
- nrels * sizeof(RelFileNode));
- /* dump committed child Xids */
- if (nchildren > 0)
- XLogRegisterData((char *) children,
- nchildren * sizeof(TransactionId));
- /* dump shared cache invalidation messages */
- if (nmsgs > 0)
- XLogRegisterData((char *) invalMessages,
- nmsgs * sizeof(SharedInvalidationMessage));
- (void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT);
- }
- else
- {
- xl_xact_commit_compact xlrec;
-
- xlrec.xact_time = xactStopTimestamp;
- xlrec.nsubxacts = nchildren;
-
- XLogBeginInsert();
- XLogRegisterData((char *) (&xlrec), MinSizeOfXactCommitCompact);
- /* dump committed child Xids */
- if (nchildren > 0)
- XLogRegisterData((char *) children,
- nchildren * sizeof(TransactionId));
-
- (void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_COMPACT);
- }
+ XactLogCommitRecord(xactStopTimestamp,
+ nchildren, children, nrels, rels,
+ nmsgs, invalMessages,
+ RelcacheInitFileInval, forceSyncCommit,
+ InvalidTransactionId /* plain commit */);
}
/*
@@ -1424,7 +1365,7 @@ RecordTransactionAbort(bool isSubXact)
RelFileNode *rels;
int nchildren;
TransactionId *children;
- xl_xact_abort xlrec;
+ TimestampTz xact_time;
/*
* If we haven't been assigned an XID, nobody will care whether we aborted
@@ -1464,28 +1405,17 @@ RecordTransactionAbort(bool isSubXact)
/* Write the ABORT record */
if (isSubXact)
- xlrec.xact_time = GetCurrentTimestamp();
+ xact_time = GetCurrentTimestamp();
else
{
SetCurrentTransactionStopTimestamp();
- xlrec.xact_time = xactStopTimestamp;
+ xact_time = xactStopTimestamp;
}
- xlrec.nrels = nrels;
- xlrec.nsubxacts = nchildren;
- XLogBeginInsert();
- XLogRegisterData((char *) (&xlrec), MinSizeOfXactAbort);
-
- /* dump rels to delete */
- if (nrels > 0)
- XLogRegisterData((char *) rels, nrels * sizeof(RelFileNode));
-
- /* dump committed child Xids */
- if (nchildren > 0)
- XLogRegisterData((char *) children,
- nchildren * sizeof(TransactionId));
-
- (void) XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT);
+ XactLogAbortRecord(xact_time,
+ nchildren, children,
+ nrels, rels,
+ InvalidTransactionId);
/*
* Report the latest async abort LSN, so that the WAL writer knows to
@@ -4659,23 +4589,229 @@ xactGetCommittedChildren(TransactionId **ptr)
* XLOG support routines
*/
+
+/*
+ * Log the commit record for a plain or twophase transaction commit.
+ *
+ * A 2pc commit will be emitted when twophase_xid is valid, a plain one
+ * otherwise.
+ */
+XLogRecPtr
+XactLogCommitRecord(TimestampTz commit_time,
+ int nsubxacts, TransactionId *subxacts,
+ int nrels, RelFileNode *rels,
+ int nmsgs, SharedInvalidationMessage *msgs,
+ bool relcacheInval, bool forceSync,
+ TransactionId twophase_xid)
+{
+ xl_xact_commit xlrec;
+ xl_xact_xinfo xl_xinfo;
+ xl_xact_dbinfo xl_dbinfo;
+ xl_xact_subxacts xl_subxacts;
+ xl_xact_relfilenodes xl_relfilenodes;
+ xl_xact_invals xl_invals;
+ xl_xact_twophase xl_twophase;
+
+ uint8 info;
+
+ Assert(CritSectionCount > 0);
+
+ xl_xinfo.xinfo = 0;
+
+ /* decide between a plain and 2pc commit */
+ if (!TransactionIdIsValid(twophase_xid))
+ info = XLOG_XACT_COMMIT;
+ else
+ info = XLOG_XACT_COMMIT_PREPARED;
+
+ /* First figure out and collect all the information needed */
+
+ xlrec.xact_time = commit_time;
+
+ if (relcacheInval)
+ xl_xinfo.xinfo |= XACT_COMPLETION_UPDATE_RELCACHE_FILE;
+ if (forceSyncCommit)
+ xl_xinfo.xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT;
+
+ /*
+ * Relcache invalidations requires information about the current database
+ * and so does logical decoding.
+ */
+ if (nmsgs > 0 || XLogLogicalInfoActive())
+ {
+ xl_xinfo.xinfo |= XACT_XINFO_HAS_DBINFO;
+ xl_dbinfo.dbId = MyDatabaseId;
+ xl_dbinfo.tsId = MyDatabaseTableSpace;
+ }
+
+ if (nsubxacts > 0)
+ {
+ xl_xinfo.xinfo |= XACT_XINFO_HAS_SUBXACTS;
+ xl_subxacts.nsubxacts = nsubxacts;
+ }
+
+ if (nrels > 0)
+ {
+ xl_xinfo.xinfo |= XACT_XINFO_HAS_RELFILENODES;
+ xl_relfilenodes.nrels = nrels;
+ }
+
+ if (nmsgs > 0)
+ {
+ xl_xinfo.xinfo |= XACT_XINFO_HAS_INVALS;
+ xl_invals.nmsgs = nmsgs;
+ }
+
+ if (TransactionIdIsValid(twophase_xid))
+ {
+ xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE;
+ xl_twophase.xid = twophase_xid;
+ }
+
+ if (xl_xinfo.xinfo != 0)
+ info |= XLOG_XACT_HAS_INFO;
+
+ /* Then include all the collected data into the commit record. */
+
+ XLogBeginInsert();
+
+ XLogRegisterData((char *) (&xlrec), sizeof(xl_xact_commit));
+
+ if (xl_xinfo.xinfo != 0)
+ XLogRegisterData((char *) (&xl_xinfo.xinfo), sizeof(xl_xinfo.xinfo));
+
+ if (xl_xinfo.xinfo & XACT_XINFO_HAS_DBINFO)
+ XLogRegisterData((char *) (&xl_dbinfo), sizeof(xl_dbinfo));
+
+ if (xl_xinfo.xinfo & XACT_XINFO_HAS_SUBXACTS)
+ {
+ XLogRegisterData((char *) (&xl_subxacts),
+ MinSizeOfXactSubxacts);
+ XLogRegisterData((char *) subxacts,
+ nsubxacts * sizeof(TransactionId));
+ }
+
+ if (xl_xinfo.xinfo & XACT_XINFO_HAS_RELFILENODES)
+ {
+ XLogRegisterData((char *) (&xl_relfilenodes),
+ MinSizeOfXactRelfilenodes);
+ XLogRegisterData((char *) rels,
+ nrels * sizeof(RelFileNode));
+ }
+
+ if (xl_xinfo.xinfo & XACT_XINFO_HAS_INVALS)
+ {
+ XLogRegisterData((char *) (&xl_invals), MinSizeOfXactInvals);
+ XLogRegisterData((char *) msgs,
+ nmsgs * sizeof(SharedInvalidationMessage));
+ }
+
+ if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE)
+ XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase));
+
+ return XLogInsert(RM_XACT_ID, info);
+}
+
+/*
+ * Log the commit record for a plain or twophase transaction abort.
+ *
+ * A 2pc abort will be emitted when twophase_xid is valid, a plain one
+ * otherwise.
+ */
+XLogRecPtr
+XactLogAbortRecord(TimestampTz abort_time,
+ int nsubxacts, TransactionId *subxacts,
+ int nrels, RelFileNode *rels,
+ TransactionId twophase_xid)
+{
+ xl_xact_abort xlrec;
+ xl_xact_xinfo xl_xinfo;
+ xl_xact_subxacts xl_subxacts;
+ xl_xact_relfilenodes xl_relfilenodes;
+ xl_xact_twophase xl_twophase;
+
+ uint8 info;
+
+ Assert(CritSectionCount > 0);
+
+ xl_xinfo.xinfo = 0;
+
+ /* decide between a plain and 2pc abort */
+ if (!TransactionIdIsValid(twophase_xid))
+ info = XLOG_XACT_ABORT;
+ else
+ info = XLOG_XACT_ABORT_PREPARED;
+
+
+ /* First figure out and collect all the information needed */
+
+ xlrec.xact_time = abort_time;
+
+ if (nsubxacts > 0)
+ {
+ xl_xinfo.xinfo |= XACT_XINFO_HAS_SUBXACTS;
+ xl_subxacts.nsubxacts = nsubxacts;
+ }
+
+ if (nrels > 0)
+ {
+ xl_xinfo.xinfo |= XACT_XINFO_HAS_RELFILENODES;
+ xl_relfilenodes.nrels = nrels;
+ }
+
+ if (TransactionIdIsValid(twophase_xid))
+ {
+ xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE;
+ xl_twophase.xid = twophase_xid;
+ }
+
+ if (xl_xinfo.xinfo != 0)
+ info |= XLOG_XACT_HAS_INFO;
+
+ /* Then include all the collected data into the abort record. */
+
+ XLogBeginInsert();
+
+ XLogRegisterData((char *) (&xlrec), MinSizeOfXactAbort);
+
+ if (xl_xinfo.xinfo != 0)
+ XLogRegisterData((char *) (&xl_xinfo), sizeof(xl_xinfo));
+
+ if (xl_xinfo.xinfo & XACT_XINFO_HAS_SUBXACTS)
+ {
+ XLogRegisterData((char *) (&xl_subxacts),
+ MinSizeOfXactSubxacts);
+ XLogRegisterData((char *) subxacts,
+ nsubxacts * sizeof(TransactionId));
+ }
+
+ if (xl_xinfo.xinfo & XACT_XINFO_HAS_RELFILENODES)
+ {
+ XLogRegisterData((char *) (&xl_relfilenodes),
+ MinSizeOfXactRelfilenodes);
+ XLogRegisterData((char *) rels,
+ nrels * sizeof(RelFileNode));
+ }
+
+ if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE)
+ XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase));
+
+ return XLogInsert(RM_XACT_ID, info);
+}
+
/*
* Before 9.0 this was a fairly short function, but now it performs many
* actions for which the order of execution is critical.
*/
static void
-xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
- TimestampTz commit_time,
- TransactionId *sub_xids, int nsubxacts,
- SharedInvalidationMessage *inval_msgs, int nmsgs,
- RelFileNode *xnodes, int nrels,
- Oid dbId, Oid tsId,
- uint32 xinfo)
+xact_redo_commit(xl_xact_parsed_commit *parsed,
+ TransactionId xid,
+ XLogRecPtr lsn)
{
TransactionId max_xid;
int i;
- max_xid = TransactionIdLatest(xid, nsubxacts, sub_xids);
+ max_xid = TransactionIdLatest(xid, parsed->nsubxacts, parsed->subxacts);
/*
* Make sure nextXid is beyond any XID mentioned in the record.
@@ -4694,15 +4830,16 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
}
/* Set the transaction commit timestamp and metadata */
- TransactionTreeSetCommitTsData(xid, nsubxacts, sub_xids,
- commit_time, InvalidCommitTsNodeId, false);
+ TransactionTreeSetCommitTsData(xid, parsed->nsubxacts, parsed->subxacts,
+ parsed->xact_time, InvalidCommitTsNodeId,
+ false);
if (standbyState == STANDBY_DISABLED)
{
/*
* Mark the transaction committed in pg_clog.
*/
- TransactionIdCommitTree(xid, nsubxacts, sub_xids);
+ TransactionIdCommitTree(xid, parsed->nsubxacts, parsed->subxacts);
}
else
{
@@ -4726,21 +4863,24 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
* bits set on changes made by transactions that haven't yet
* recovered. It's unlikely but it's good to be safe.
*/
- TransactionIdAsyncCommitTree(xid, nsubxacts, sub_xids, lsn);
+ TransactionIdAsyncCommitTree(
+ xid, parsed->nsubxacts, parsed->subxacts, lsn);
/*
* We must mark clog before we update the ProcArray.
*/
- ExpireTreeKnownAssignedTransactionIds(xid, nsubxacts, sub_xids, max_xid);
+ ExpireTreeKnownAssignedTransactionIds(
+ xid, parsed->nsubxacts, parsed->subxacts, max_xid);
/*
* Send any cache invalidations attached to the commit. We must
* maintain the same order of invalidation then release locks as
* occurs in CommitTransaction().
*/
- ProcessCommittedInvalidationMessages(inval_msgs, nmsgs,
- XactCompletionRelcacheInitFileInval(xinfo),
- dbId, tsId);
+ ProcessCommittedInvalidationMessages(
+ parsed->msgs, parsed->nmsgs,
+ XactCompletionRelcacheInitFileInval(parsed->xinfo),
+ parsed->dbId, parsed->tsId);
/*
* Release locks, if any. We do this for both two phase and normal one
@@ -4753,7 +4893,7 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
}
/* Make sure files supposed to be dropped are dropped */
- if (nrels > 0)
+ if (parsed->nrels > 0)
{
/*
* First update minimum recovery point to cover this WAL record. Once
@@ -4772,13 +4912,13 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
*/
XLogFlush(lsn);
- for (i = 0; i < nrels; i++)
+ for (i = 0; i < parsed->nrels; i++)
{
- SMgrRelation srel = smgropen(xnodes[i], InvalidBackendId);
+ SMgrRelation srel = smgropen(parsed->xnodes[i], InvalidBackendId);
ForkNumber fork;
for (fork = 0; fork <= MAX_FORKNUM; fork++)
- XLogDropRelation(xnodes[i], fork);
+ XLogDropRelation(parsed->xnodes[i], fork);
smgrdounlink(srel, true);
smgrclose(srel);
}
@@ -4796,52 +4936,12 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
* minRecoveryPoint during recovery) helps to reduce that problem window,
* for any user that requested ForceSyncCommit().
*/
- if (XactCompletionForceSyncCommit(xinfo))
+ if (XactCompletionForceSyncCommit(parsed->xinfo))
XLogFlush(lsn);
}
/*
- * Utility function to call xact_redo_commit_internal after breaking down xlrec
- */
-static void
-xact_redo_commit(xl_xact_commit *xlrec,
- TransactionId xid, XLogRecPtr lsn)
-{
- TransactionId *subxacts;
- SharedInvalidationMessage *inval_msgs;
-
- /* subxid array follows relfilenodes */
- subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
- /* invalidation messages array follows subxids */
- inval_msgs = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]);
-
- xact_redo_commit_internal(xid, lsn, xlrec->xact_time,
- subxacts, xlrec->nsubxacts,
- inval_msgs, xlrec->nmsgs,
- xlrec->xnodes, xlrec->nrels,
- xlrec->dbId,
- xlrec->tsId,
- xlrec->xinfo);
-}
-
-/*
- * Utility function to call xact_redo_commit_internal for compact form of message.
- */
-static void
-xact_redo_commit_compact(xl_xact_commit_compact *xlrec,
- TransactionId xid, XLogRecPtr lsn)
-{
- xact_redo_commit_internal(xid, lsn, xlrec->xact_time,
- xlrec->subxacts, xlrec->nsubxacts,
- NULL, 0, /* inval msgs */
- NULL, 0, /* relfilenodes */
- InvalidOid, /* dbId */
- InvalidOid, /* tsId */
- 0); /* xinfo */
-}
-
-/*
* Be careful with the order of execution, as with xact_redo_commit().
* The two functions are similar but differ in key places.
*
@@ -4851,14 +4951,10 @@ xact_redo_commit_compact(xl_xact_commit_compact *xlrec,
* because subtransaction commit is never WAL logged.
*/
static void
-xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid)
+xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid)
{
- TransactionId *sub_xids;
- TransactionId max_xid;
- int i;
-
- sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
- max_xid = TransactionIdLatest(xid, xlrec->nsubxacts, sub_xids);
+ int i;
+ TransactionId max_xid;
/*
* Make sure nextXid is beyond any XID mentioned in the record.
@@ -4867,6 +4963,10 @@ xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid)
* hold a lock while checking this. We still acquire the lock to modify
* it, though.
*/
+ max_xid = TransactionIdLatest(xid,
+ parsed->nsubxacts,
+ parsed->subxacts);
+
if (TransactionIdFollowsOrEquals(max_xid,
ShmemVariableCache->nextXid))
{
@@ -4879,7 +4979,7 @@ xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid)
if (standbyState == STANDBY_DISABLED)
{
/* Mark the transaction aborted in pg_clog, no need for async stuff */
- TransactionIdAbortTree(xid, xlrec->nsubxacts, sub_xids);
+ TransactionIdAbortTree(xid, parsed->nsubxacts, parsed->subxacts);
}
else
{
@@ -4895,12 +4995,13 @@ xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid)
RecordKnownAssignedTransactionIds(max_xid);
/* Mark the transaction aborted in pg_clog, no need for async stuff */
- TransactionIdAbortTree(xid, xlrec->nsubxacts, sub_xids);
+ TransactionIdAbortTree(xid, parsed->nsubxacts, parsed->subxacts);
/*
* We must update the ProcArray after we have marked clog.
*/
- ExpireTreeKnownAssignedTransactionIds(xid, xlrec->nsubxacts, sub_xids, max_xid);
+ ExpireTreeKnownAssignedTransactionIds(
+ xid, parsed->nsubxacts, parsed->subxacts, max_xid);
/*
* There are no flat files that need updating, nor invalidation
@@ -4910,17 +5011,17 @@ xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid)
/*
* Release locks, if any. There are no invalidations to send.
*/
- StandbyReleaseLockTree(xid, xlrec->nsubxacts, sub_xids);
+ StandbyReleaseLockTree(xid, parsed->nsubxacts, parsed->subxacts);
}
/* Make sure files supposed to be dropped are dropped */
- for (i = 0; i < xlrec->nrels; i++)
+ for (i = 0; i < parsed->nrels; i++)
{
- SMgrRelation srel = smgropen(xlrec->xnodes[i], InvalidBackendId);
+ SMgrRelation srel = smgropen(parsed->xnodes[i], InvalidBackendId);
ForkNumber fork;
for (fork = 0; fork <= MAX_FORKNUM; fork++)
- XLogDropRelation(xlrec->xnodes[i], fork);
+ XLogDropRelation(parsed->xnodes[i], fork);
smgrdounlink(srel, true);
smgrclose(srel);
}
@@ -4929,28 +5030,52 @@ xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid)
void
xact_redo(XLogReaderState *record)
{
- uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+ uint8 info = XLogRecGetInfo(record) & XLOG_XACT_OPMASK;
/* Backup blocks are not used in xact records */
Assert(!XLogRecHasAnyBlockRefs(record));
- if (info == XLOG_XACT_COMMIT_COMPACT)
- {
- xl_xact_commit_compact *xlrec = (xl_xact_commit_compact *) XLogRecGetData(record);
-
- xact_redo_commit_compact(xlrec, XLogRecGetXid(record), record->EndRecPtr);
- }
- else if (info == XLOG_XACT_COMMIT)
+ if (info == XLOG_XACT_COMMIT || info == XLOG_XACT_COMMIT_PREPARED)
{
xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record);
+ xl_xact_parsed_commit parsed;
+
+ ParseCommitRecord(XLogRecGetInfo(record), xlrec,
+ &parsed);
- xact_redo_commit(xlrec, XLogRecGetXid(record), record->EndRecPtr);
+ if (info == XLOG_XACT_COMMIT)
+ {
+ Assert(!TransactionIdIsValid(parsed.twophase_xid));
+ xact_redo_commit(&parsed, XLogRecGetXid(record),
+ record->EndRecPtr);
+ }
+ else
+ {
+ Assert(TransactionIdIsValid(parsed.twophase_xid));
+ xact_redo_commit(&parsed, parsed.twophase_xid,
+ record->EndRecPtr);
+ RemoveTwoPhaseFile(parsed.twophase_xid, false);
+ }
}
- else if (info == XLOG_XACT_ABORT)
+ else if (info == XLOG_XACT_ABORT || info == XLOG_XACT_ABORT_PREPARED)
{
xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record);
+ xl_xact_parsed_abort parsed;
+
+ ParseAbortRecord(XLogRecGetInfo(record), xlrec,
+ &parsed);
- xact_redo_abort(xlrec, XLogRecGetXid(record));
+ if (info == XLOG_XACT_ABORT)
+ {
+ Assert(!TransactionIdIsValid(parsed.twophase_xid));
+ xact_redo_abort(&parsed, XLogRecGetXid(record));
+ }
+ else
+ {
+ Assert(TransactionIdIsValid(parsed.twophase_xid));
+ xact_redo_abort(&parsed, parsed.twophase_xid);
+ RemoveTwoPhaseFile(parsed.twophase_xid, false);
+ }
}
else if (info == XLOG_XACT_PREPARE)
{
@@ -4958,20 +5083,6 @@ xact_redo(XLogReaderState *record)
RecreateTwoPhaseFile(XLogRecGetXid(record),
XLogRecGetData(record), XLogRecGetDataLen(record));
}
- else if (info == XLOG_XACT_COMMIT_PREPARED)
- {
- xl_xact_commit_prepared *xlrec = (xl_xact_commit_prepared *) XLogRecGetData(record);
-
- xact_redo_commit(&xlrec->crec, xlrec->xid, record->EndRecPtr);
- RemoveTwoPhaseFile(xlrec->xid, false);
- }
- else if (info == XLOG_XACT_ABORT_PREPARED)
- {
- xl_xact_abort_prepared *xlrec = (xl_xact_abort_prepared *) XLogRecGetData(record);
-
- xact_redo_abort(&xlrec->arec, xlrec->xid);
- RemoveTwoPhaseFile(xlrec->xid, false);
- }
else if (info == XLOG_XACT_ASSIGNMENT)
{
xl_xact_assignment *xlrec = (xl_xact_assignment *) XLogRecGetData(record);
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 72af2c43303..e2d187f74d6 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -5168,39 +5168,27 @@ exitArchiveRecovery(TimeLineID endTLI, XLogRecPtr endOfLog)
static bool
getRecordTimestamp(XLogReaderState *record, TimestampTz *recordXtime)
{
- uint8 record_info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+ uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+ uint8 xact_info = info & XLOG_XACT_OPMASK;
uint8 rmid = XLogRecGetRmid(record);
- if (rmid == RM_XLOG_ID && record_info == XLOG_RESTORE_POINT)
+ if (rmid == RM_XLOG_ID && info == XLOG_RESTORE_POINT)
{
*recordXtime = ((xl_restore_point *) XLogRecGetData(record))->rp_time;
return true;
}
- if (rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT_COMPACT)
- {
- *recordXtime = ((xl_xact_commit_compact *) XLogRecGetData(record))->xact_time;
- return true;
- }
- if (rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT)
+ if (rmid == RM_XACT_ID && (xact_info == XLOG_XACT_COMMIT ||
+ xact_info == XLOG_XACT_COMMIT_PREPARED))
{
*recordXtime = ((xl_xact_commit *) XLogRecGetData(record))->xact_time;
return true;
}
- if (rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT_PREPARED)
- {
- *recordXtime = ((xl_xact_commit_prepared *) XLogRecGetData(record))->crec.xact_time;
- return true;
- }
- if (rmid == RM_XACT_ID && record_info == XLOG_XACT_ABORT)
+ if (rmid == RM_XACT_ID && (xact_info == XLOG_XACT_ABORT ||
+ xact_info == XLOG_XACT_ABORT_PREPARED))
{
*recordXtime = ((xl_xact_abort *) XLogRecGetData(record))->xact_time;
return true;
}
- if (rmid == RM_XACT_ID && record_info == XLOG_XACT_ABORT_PREPARED)
- {
- *recordXtime = ((xl_xact_abort_prepared *) XLogRecGetData(record))->arec.xact_time;
- return true;
- }
return false;
}
@@ -5216,7 +5204,7 @@ static bool
recoveryStopsBefore(XLogReaderState *record)
{
bool stopsHere = false;
- uint8 record_info;
+ uint8 xact_info;
bool isCommit;
TimestampTz recordXtime = 0;
TransactionId recordXid;
@@ -5237,27 +5225,40 @@ recoveryStopsBefore(XLogReaderState *record)
/* Otherwise we only consider stopping before COMMIT or ABORT records. */
if (XLogRecGetRmid(record) != RM_XACT_ID)
return false;
- record_info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
- if (record_info == XLOG_XACT_COMMIT_COMPACT || record_info == XLOG_XACT_COMMIT)
+ xact_info = XLogRecGetInfo(record) & XLOG_XACT_OPMASK;
+
+ if (xact_info == XLOG_XACT_COMMIT)
{
isCommit = true;
recordXid = XLogRecGetXid(record);
}
- else if (record_info == XLOG_XACT_COMMIT_PREPARED)
+ else if (xact_info == XLOG_XACT_COMMIT_PREPARED)
{
+ xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record);
+ xl_xact_parsed_commit parsed;
+
isCommit = true;
- recordXid = ((xl_xact_commit_prepared *) XLogRecGetData(record))->xid;
+ ParseCommitRecord(XLogRecGetInfo(record),
+ xlrec,
+ &parsed);
+ recordXid = parsed.twophase_xid;
}
- else if (record_info == XLOG_XACT_ABORT)
+ else if (xact_info == XLOG_XACT_ABORT)
{
isCommit = false;
recordXid = XLogRecGetXid(record);
}
- else if (record_info == XLOG_XACT_ABORT_PREPARED)
+ else if (xact_info == XLOG_XACT_ABORT_PREPARED)
{
- isCommit = false;
- recordXid = ((xl_xact_abort_prepared *) XLogRecGetData(record))->xid;
+ xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record);
+ xl_xact_parsed_abort parsed;
+
+ isCommit = true;
+ ParseAbortRecord(XLogRecGetInfo(record),
+ xlrec,
+ &parsed);
+ recordXid = parsed.twophase_xid;
}
else
return false;
@@ -5325,11 +5326,12 @@ recoveryStopsBefore(XLogReaderState *record)
static bool
recoveryStopsAfter(XLogReaderState *record)
{
- uint8 record_info;
+ uint8 info;
+ uint8 xact_info;
uint8 rmid;
TimestampTz recordXtime;
- record_info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+ info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
rmid = XLogRecGetRmid(record);
/*
@@ -5337,7 +5339,7 @@ recoveryStopsAfter(XLogReaderState *record)
* the first one.
*/
if (recoveryTarget == RECOVERY_TARGET_NAME &&
- rmid == RM_XLOG_ID && record_info == XLOG_RESTORE_POINT)
+ rmid == RM_XLOG_ID && info == XLOG_RESTORE_POINT)
{
xl_restore_point *recordRestorePointData;
@@ -5358,12 +5360,15 @@ recoveryStopsAfter(XLogReaderState *record)
}
}
- if (rmid == RM_XACT_ID &&
- (record_info == XLOG_XACT_COMMIT_COMPACT ||
- record_info == XLOG_XACT_COMMIT ||
- record_info == XLOG_XACT_COMMIT_PREPARED ||
- record_info == XLOG_XACT_ABORT ||
- record_info == XLOG_XACT_ABORT_PREPARED))
+ if (rmid != RM_XACT_ID)
+ return false;
+
+ xact_info = info & XLOG_XACT_OPMASK;
+
+ if (xact_info == XLOG_XACT_COMMIT ||
+ xact_info == XLOG_XACT_COMMIT_PREPARED ||
+ xact_info == XLOG_XACT_ABORT ||
+ xact_info == XLOG_XACT_ABORT_PREPARED)
{
TransactionId recordXid;
@@ -5372,10 +5377,26 @@ recoveryStopsAfter(XLogReaderState *record)
SetLatestXTime(recordXtime);
/* Extract the XID of the committed/aborted transaction */
- if (record_info == XLOG_XACT_COMMIT_PREPARED)
- recordXid = ((xl_xact_commit_prepared *) XLogRecGetData(record))->xid;
- else if (record_info == XLOG_XACT_ABORT_PREPARED)
- recordXid = ((xl_xact_abort_prepared *) XLogRecGetData(record))->xid;
+ if (xact_info == XLOG_XACT_COMMIT_PREPARED)
+ {
+ xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record);
+ xl_xact_parsed_commit parsed;
+
+ ParseCommitRecord(XLogRecGetInfo(record),
+ xlrec,
+ &parsed);
+ recordXid = parsed.twophase_xid;
+ }
+ else if (xact_info == XLOG_XACT_ABORT_PREPARED)
+ {
+ xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record);
+ xl_xact_parsed_abort parsed;
+
+ ParseAbortRecord(XLogRecGetInfo(record),
+ xlrec,
+ &parsed);
+ recordXid = parsed.twophase_xid;
+ }
else
recordXid = XLogRecGetXid(record);
@@ -5396,17 +5417,16 @@ recoveryStopsAfter(XLogReaderState *record)
recoveryStopTime = recordXtime;
recoveryStopName[0] = '\0';
- if (record_info == XLOG_XACT_COMMIT_COMPACT ||
- record_info == XLOG_XACT_COMMIT ||
- record_info == XLOG_XACT_COMMIT_PREPARED)
+ if (xact_info == XLOG_XACT_COMMIT ||
+ xact_info == XLOG_XACT_COMMIT_PREPARED)
{
ereport(LOG,
(errmsg("recovery stopping after commit of transaction %u, time %s",
recoveryStopXid,
timestamptz_to_str(recoveryStopTime))));
}
- else if (record_info == XLOG_XACT_ABORT ||
- record_info == XLOG_XACT_ABORT_PREPARED)
+ else if (xact_info == XLOG_XACT_ABORT ||
+ xact_info == XLOG_XACT_ABORT_PREPARED)
{
ereport(LOG,
(errmsg("recovery stopping after abort of transaction %u, time %s",
@@ -5494,7 +5514,7 @@ SetRecoveryPause(bool recoveryPause)
static bool
recoveryApplyDelay(XLogReaderState *record)
{
- uint8 record_info;
+ uint8 xact_info;
TimestampTz xtime;
long secs;
int microsecs;
@@ -5511,11 +5531,13 @@ recoveryApplyDelay(XLogReaderState *record)
* so there is already opportunity for issues caused by early conflicts on
* standbys.
*/
- record_info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
- if (!(XLogRecGetRmid(record) == RM_XACT_ID &&
- (record_info == XLOG_XACT_COMMIT_COMPACT ||
- record_info == XLOG_XACT_COMMIT ||
- record_info == XLOG_XACT_COMMIT_PREPARED)))
+ if (XLogRecGetRmid(record) != RM_XACT_ID)
+ return false;
+
+ xact_info = XLogRecGetInfo(record) & XLOG_XACT_COMMIT;
+
+ if (xact_info != XLOG_XACT_COMMIT &&
+ xact_info != XLOG_XACT_COMMIT_PREPARED)
return false;
if (!getRecordTimestamp(record, &xtime))