aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/access/transam/xact.c245
-rw-r--r--src/backend/access/transam/xlog.c11
-rw-r--r--src/include/access/xact.h15
-rw-r--r--src/include/access/xlog_internal.h2
4 files changed, 192 insertions, 81 deletions
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 2ca1c145499..7f01a83c4f9 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -962,26 +962,10 @@ RecordTransactionCommit(void)
/*
* Begin commit critical section and insert the commit XLOG record.
*/
- XLogRecData rdata[4];
- int lastrdata = 0;
- xl_xact_commit xlrec;
-
/* Tell bufmgr and smgr to prepare for commit */
BufmgrCommit();
/*
- * Set flags required for recovery processing of commits.
- */
- xlrec.xinfo = 0;
- if (RelcacheInitFileInval)
- xlrec.xinfo |= XACT_COMPLETION_UPDATE_RELCACHE_FILE;
- if (forceSyncCommit)
- xlrec.xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT;
-
- xlrec.dbId = MyDatabaseId;
- xlrec.tsId = MyDatabaseTableSpace;
-
- /*
* Mark ourselves as within our "commit critical section". This
* forces any concurrent checkpoint to wait until we've updated
* pg_clog. Without this, it is possible for the checkpoint to set
@@ -1002,43 +986,88 @@ RecordTransactionCommit(void)
MyProc->inCommit = true;
SetCurrentTransactionStopTimestamp();
- xlrec.xact_time = xactStopTimestamp;
- xlrec.nrels = nrels;
- xlrec.nsubxacts = nchildren;
- xlrec.nmsgs = nmsgs;
- rdata[0].data = (char *) (&xlrec);
- rdata[0].len = MinSizeOfXactCommit;
- rdata[0].buffer = InvalidBuffer;
- /* dump rels to delete */
- if (nrels > 0)
- {
- rdata[0].next = &(rdata[1]);
- rdata[1].data = (char *) rels;
- rdata[1].len = nrels * sizeof(RelFileNode);
- rdata[1].buffer = InvalidBuffer;
- lastrdata = 1;
- }
- /* dump committed child Xids */
- if (nchildren > 0)
+
+ /*
+ * Do we need the long commit record? If not, use the compact format.
+ */
+ if (nrels > 0 || nmsgs > 0 || RelcacheInitFileInval || forceSyncCommit)
{
- rdata[lastrdata].next = &(rdata[2]);
- rdata[2].data = (char *) children;
- rdata[2].len = nchildren * sizeof(TransactionId);
- rdata[2].buffer = InvalidBuffer;
- lastrdata = 2;
+ XLogRecData rdata[4];
+ int lastrdata = 0;
+ xl_xact_commit xlrec;
+ /*
+ * Set flags required for recovery processing of commits.
+ */
+ xlrec.xinfo = 0;
+ if (RelcacheInitFileInval)
+ xlrec.xinfo |= XACT_COMPLETION_UPDATE_RELCACHE_FILE;
+ if (forceSyncCommit)
+ xlrec.xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT;
+
+ xlrec.dbId = MyDatabaseId;
+ xlrec.tsId = MyDatabaseTableSpace;
+
+ xlrec.xact_time = xactStopTimestamp;
+ xlrec.nrels = nrels;
+ xlrec.nsubxacts = nchildren;
+ xlrec.nmsgs = nmsgs;
+ rdata[0].data = (char *) (&xlrec);
+ rdata[0].len = MinSizeOfXactCommit;
+ rdata[0].buffer = InvalidBuffer;
+ /* dump rels to delete */
+ if (nrels > 0)
+ {
+ rdata[0].next = &(rdata[1]);
+ rdata[1].data = (char *) rels;
+ rdata[1].len = nrels * sizeof(RelFileNode);
+ rdata[1].buffer = InvalidBuffer;
+ lastrdata = 1;
+ }
+ /* dump committed child Xids */
+ if (nchildren > 0)
+ {
+ rdata[lastrdata].next = &(rdata[2]);
+ rdata[2].data = (char *) children;
+ rdata[2].len = nchildren * sizeof(TransactionId);
+ rdata[2].buffer = InvalidBuffer;
+ lastrdata = 2;
+ }
+ /* dump shared cache invalidation messages */
+ if (nmsgs > 0)
+ {
+ rdata[lastrdata].next = &(rdata[3]);
+ rdata[3].data = (char *) invalMessages;
+ rdata[3].len = nmsgs * sizeof(SharedInvalidationMessage);
+ rdata[3].buffer = InvalidBuffer;
+ lastrdata = 3;
+ }
+ rdata[lastrdata].next = NULL;
+
+ (void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT, rdata);
}
- /* dump shared cache invalidation messages */
- if (nmsgs > 0)
+ else
{
- rdata[lastrdata].next = &(rdata[3]);
- rdata[3].data = (char *) invalMessages;
- rdata[3].len = nmsgs * sizeof(SharedInvalidationMessage);
- rdata[3].buffer = InvalidBuffer;
- lastrdata = 3;
- }
- rdata[lastrdata].next = NULL;
+ XLogRecData rdata[2];
+ int lastrdata = 0;
+ xl_xact_commit_compact xlrec;
+ xlrec.xact_time = xactStopTimestamp;
+ xlrec.nsubxacts = nchildren;
+ rdata[0].data = (char *) (&xlrec);
+ rdata[0].len = MinSizeOfXactCommitCompact;
+ rdata[0].buffer = InvalidBuffer;
+ /* dump committed child Xids */
+ if (nchildren > 0)
+ {
+ rdata[0].next = &(rdata[1]);
+ rdata[1].data = (char *) children;
+ rdata[1].len = nchildren * sizeof(TransactionId);
+ rdata[1].buffer = InvalidBuffer;
+ lastrdata = 1;
+ }
+ rdata[lastrdata].next = NULL;
- (void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT, rdata);
+ (void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_COMPACT, rdata);
+ }
}
/*
@@ -4441,19 +4470,17 @@ xactGetCommittedChildren(TransactionId **ptr)
* actions for which the order of execution is critical.
*/
static void
-xact_redo_commit(xl_xact_commit *xlrec, TransactionId xid, XLogRecPtr lsn)
+xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
+ TransactionId *sub_xids, int nsubxacts,
+ SharedInvalidationMessage *inval_msgs, int nmsgs,
+ RelFileNode *xnodes, int nrels,
+ Oid dbId, Oid tsId,
+ uint32 xinfo)
{
- TransactionId *sub_xids;
- SharedInvalidationMessage *inval_msgs;
TransactionId max_xid;
int i;
- /* subxid array follows relfilenodes */
- sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
- /* invalidation messages array follows subxids */
- inval_msgs = (SharedInvalidationMessage *) &(sub_xids[xlrec->nsubxacts]);
-
- max_xid = TransactionIdLatest(xid, xlrec->nsubxacts, sub_xids);
+ max_xid = TransactionIdLatest(xid, nsubxacts, sub_xids);
/*
* Make sure nextXid is beyond any XID mentioned in the record.
@@ -4476,7 +4503,7 @@ xact_redo_commit(xl_xact_commit *xlrec, TransactionId xid, XLogRecPtr lsn)
/*
* Mark the transaction committed in pg_clog.
*/
- TransactionIdCommitTree(xid, xlrec->nsubxacts, sub_xids);
+ TransactionIdCommitTree(xid, nsubxacts, sub_xids);
}
else
{
@@ -4500,41 +4527,41 @@ xact_redo_commit(xl_xact_commit *xlrec, TransactionId xid, XLogRecPtr lsn)
* bits set on changes made by transactions that haven't yet
* recovered. It's unlikely but it's good to be safe.
*/
- TransactionIdAsyncCommitTree(xid, xlrec->nsubxacts, sub_xids, lsn);
+ TransactionIdAsyncCommitTree(xid, nsubxacts, sub_xids, lsn);
/*
* We must mark clog before we update the ProcArray.
*/
- ExpireTreeKnownAssignedTransactionIds(xid, xlrec->nsubxacts, sub_xids, max_xid);
+ ExpireTreeKnownAssignedTransactionIds(xid, nsubxacts, sub_xids, max_xid);
/*
* Send any cache invalidations attached to the commit. We must
* maintain the same order of invalidation then release locks as
* occurs in CommitTransaction().
*/
- ProcessCommittedInvalidationMessages(inval_msgs, xlrec->nmsgs,
- XactCompletionRelcacheInitFileInval(xlrec),
- xlrec->dbId, xlrec->tsId);
+ ProcessCommittedInvalidationMessages(inval_msgs, nmsgs,
+ XactCompletionRelcacheInitFileInval(xinfo),
+ dbId, tsId);
/*
* Release locks, if any. We do this for both two phase and normal one
* phase transactions. In effect we are ignoring the prepare phase and
* just going straight to lock release.
*/
- StandbyReleaseLockTree(xid, xlrec->nsubxacts, sub_xids);
+ StandbyReleaseLockTree(xid, nsubxacts, sub_xids);
}
/* Make sure files supposed to be dropped are dropped */
- for (i = 0; i < xlrec->nrels; i++)
+ for (i = 0; i < nrels; i++)
{
- SMgrRelation srel = smgropen(xlrec->xnodes[i], InvalidBackendId);
+ SMgrRelation srel = smgropen(xnodes[i], InvalidBackendId);
ForkNumber fork;
for (fork = 0; fork <= MAX_FORKNUM; fork++)
{
if (smgrexists(srel, fork))
{
- XLogDropRelation(xlrec->xnodes[i], fork);
+ XLogDropRelation(xnodes[i], fork);
smgrdounlink(srel, fork, true);
}
}
@@ -4553,8 +4580,46 @@ xact_redo_commit(xl_xact_commit *xlrec, TransactionId xid, XLogRecPtr lsn)
* to reduce that problem window, for any user that requested
* ForceSyncCommit().
*/
- if (XactCompletionForceSyncCommit(xlrec))
+ if (XactCompletionForceSyncCommit(xinfo))
XLogFlush(lsn);
+
+}
+/*
+ * Utility function to call xact_redo_commit_internal after breaking down xlrec
+ */
+static void
+xact_redo_commit(xl_xact_commit *xlrec,
+ TransactionId xid, XLogRecPtr lsn)
+{
+ TransactionId *subxacts;
+ SharedInvalidationMessage *inval_msgs;
+
+ /* subxid array follows relfilenodes */
+ subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
+ /* invalidation messages array follows subxids */
+ inval_msgs = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]);
+
+ xact_redo_commit_internal(xid, lsn, subxacts, xlrec->nsubxacts,
+ inval_msgs, xlrec->nmsgs,
+ xlrec->xnodes, xlrec->nrels,
+ xlrec->dbId,
+ xlrec->tsId,
+ xlrec->xinfo);
+}
+
+/*
+ * Utility function to call xact_redo_commit_internal for compact form of message.
+ */
+static void
+xact_redo_commit_compact(xl_xact_commit_compact *xlrec,
+ TransactionId xid, XLogRecPtr lsn)
+{
+ xact_redo_commit_internal(xid, lsn, xlrec->subxacts, xlrec->nsubxacts,
+ NULL, 0, /* inval msgs */
+ NULL, 0, /* relfilenodes */
+ InvalidOid, /* dbId */
+ InvalidOid, /* tsId */
+ 0); /* xinfo */
}
/*
@@ -4655,7 +4720,13 @@ xact_redo(XLogRecPtr lsn, XLogRecord *record)
/* Backup blocks are not used in xact records */
Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
- if (info == XLOG_XACT_COMMIT)
+ if (info == XLOG_XACT_COMMIT_COMPACT)
+ {
+ xl_xact_commit_compact *xlrec = (xl_xact_commit_compact *) XLogRecGetData(record);
+
+ xact_redo_commit_compact(xlrec, record->xl_xid, lsn);
+ }
+ else if (info == XLOG_XACT_COMMIT)
{
xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record);
@@ -4703,9 +4774,9 @@ static void
xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec)
{
int i;
- TransactionId *xacts;
+ TransactionId *subxacts;
- xacts = (TransactionId *) &xlrec->xnodes[xlrec->nrels];
+ subxacts = (TransactionId *) &xlrec->xnodes[xlrec->nrels];
appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time));
@@ -4724,15 +4795,15 @@ xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec)
{
appendStringInfo(buf, "; subxacts:");
for (i = 0; i < xlrec->nsubxacts; i++)
- appendStringInfo(buf, " %u", xacts[i]);
+ appendStringInfo(buf, " %u", subxacts[i]);
}
if (xlrec->nmsgs > 0)
{
SharedInvalidationMessage *msgs;
- msgs = (SharedInvalidationMessage *) &xacts[xlrec->nsubxacts];
+ msgs = (SharedInvalidationMessage *) &subxacts[xlrec->nsubxacts];
- if (XactCompletionRelcacheInitFileInval(xlrec))
+ if (XactCompletionRelcacheInitFileInval(xlrec->xinfo))
appendStringInfo(buf, "; relcache init file inval dbid %u tsid %u",
xlrec->dbId, xlrec->tsId);
@@ -4759,6 +4830,21 @@ xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec)
}
static void
+xact_desc_commit_compact(StringInfo buf, xl_xact_commit_compact *xlrec)
+{
+ int i;
+
+ appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time));
+
+ if (xlrec->nsubxacts > 0)
+ {
+ appendStringInfo(buf, "; subxacts:");
+ for (i = 0; i < xlrec->nsubxacts; i++)
+ appendStringInfo(buf, " %u", xlrec->subxacts[i]);
+ }
+}
+
+static void
xact_desc_abort(StringInfo buf, xl_xact_abort *xlrec)
{
int i;
@@ -4802,7 +4888,14 @@ xact_desc(StringInfo buf, uint8 xl_info, char *rec)
{
uint8 info = xl_info & ~XLR_INFO_MASK;
- if (info == XLOG_XACT_COMMIT)
+ if (info == XLOG_XACT_COMMIT_COMPACT)
+ {
+ xl_xact_commit_compact *xlrec = (xl_xact_commit_compact *) rec;
+
+ appendStringInfo(buf, "commit: ");
+ xact_desc_commit_compact(buf, xlrec);
+ }
+ else if (info == XLOG_XACT_COMMIT)
{
xl_xact_commit *xlrec = (xl_xact_commit *) rec;
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 4952d223cdf..178a458eb78 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -5593,7 +5593,14 @@ recoveryStopsHere(XLogRecord *record, bool *includeThis)
if (record->xl_rmid != RM_XACT_ID && record->xl_rmid != RM_XLOG_ID)
return false;
record_info = record->xl_info & ~XLR_INFO_MASK;
- if (record->xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT)
+ if (record->xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT_COMPACT)
+ {
+ xl_xact_commit_compact *recordXactCommitData;
+
+ recordXactCommitData = (xl_xact_commit_compact *) XLogRecGetData(record);
+ recordXtime = recordXactCommitData->xact_time;
+ }
+ else if (record->xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT)
{
xl_xact_commit *recordXactCommitData;
@@ -5680,7 +5687,7 @@ recoveryStopsHere(XLogRecord *record, bool *includeThis)
recoveryStopTime = recordXtime;
recoveryStopAfter = *includeThis;
- if (record_info == XLOG_XACT_COMMIT)
+ if (record_info == XLOG_XACT_COMMIT_COMPACT || record_info == XLOG_XACT_COMMIT)
{
if (recoveryStopAfter)
ereport(LOG,
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index cb440d41f14..c585752b004 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -106,6 +106,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
#define XLOG_XACT_COMMIT_PREPARED 0x30
#define XLOG_XACT_ABORT_PREPARED 0x40
#define XLOG_XACT_ASSIGNMENT 0x50
+#define XLOG_XACT_COMMIT_COMPACT 0x60
typedef struct xl_xact_assignment
{
@@ -116,6 +117,16 @@ typedef struct xl_xact_assignment
#define MinSizeOfXactAssignment offsetof(xl_xact_assignment, xsub)
+typedef struct xl_xact_commit_compact
+{
+ TimestampTz xact_time; /* time of commit */
+ int nsubxacts; /* number of subtransaction XIDs */
+ /* ARRAY OF COMMITTED SUBTRANSACTION XIDs FOLLOWS */
+ TransactionId subxacts[1]; /* VARIABLE LENGTH ARRAY */
+} xl_xact_commit_compact;
+
+#define MinSizeOfXactCommitCompact offsetof(xl_xact_commit_compact, subxacts)
+
typedef struct xl_xact_commit
{
TimestampTz xact_time; /* time of commit */
@@ -145,8 +156,8 @@ typedef struct xl_xact_commit
#define XACT_COMPLETION_FORCE_SYNC_COMMIT 0x02
/* Access macros for above flags */
-#define XactCompletionRelcacheInitFileInval(xlrec) ((xlrec)->xinfo & XACT_COMPLETION_UPDATE_RELCACHE_FILE)
-#define XactCompletionForceSyncCommit(xlrec) ((xlrec)->xinfo & XACT_COMPLETION_FORCE_SYNC_COMMIT)
+#define XactCompletionRelcacheInitFileInval(xinfo) (xinfo & XACT_COMPLETION_UPDATE_RELCACHE_FILE)
+#define XactCompletionForceSyncCommit(xinfo) (xinfo & XACT_COMPLETION_FORCE_SYNC_COMMIT)
typedef struct xl_xact_abort
{
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index 34316fffeba..4eaa243948b 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -71,7 +71,7 @@ typedef struct XLogContRecord
/*
* Each page of XLOG file has a header like this:
*/
-#define XLOG_PAGE_MAGIC 0xD067 /* can be used as WAL version indicator */
+#define XLOG_PAGE_MAGIC 0xD068 /* can be used as WAL version indicator */
typedef struct XLogPageHeaderData
{