aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/decode.c
diff options
context:
space:
mode:
authorAndres Freund <andres@anarazel.de>2015-03-15 17:37:07 +0100
committerAndres Freund <andres@anarazel.de>2015-03-15 17:37:07 +0100
commit4f1b890b1377744688e43218f975d3c8b2ae39f8 (patch)
treedcdfcd3f16ca5d78ba23d2a26a6c0c8469ec414c /src/backend/replication/logical/decode.c
parenta0f5954af19ddcfea946b15744f2006a789dc4bd (diff)
downloadpostgresql-4f1b890b1377744688e43218f975d3c8b2ae39f8.tar.gz
postgresql-4f1b890b1377744688e43218f975d3c8b2ae39f8.zip
Merge the various forms of transaction commit & abort records.
Since 465883b0a two versions of commit records have existed. A compact version that was used when no cache invalidations, smgr unlinks and similar were needed, and a full version that could deal with all that. Additionally the full version was embedded into twophase commit records. That resulted in a measurable reduction in the size of the logged WAL in some workloads. But more recently additions like logical decoding, which e.g. needs information about the database something was executed on, made it applicable in fewer situations. The static split generally made it hard to expand the commit record, because concerns over the size made it hard to add anything to the compact version. Additionally it's not particularly pretty to have twophase.c insert RM_XACT records. Rejigger things so that the commit and abort records only have one form each, including the twophase equivalents. The presence of the various optional (in the sense of not being in every record) pieces is indicated by a bits in the 'xinfo' flag. That flag previously was not included in compact commit records. To prevent an increase in size due to its presence, it's only included if necessary; signalled by a bit in the xl_info bits available for xact.c, similar to heapam.c's XLOG_HEAP_OPMASK/XLOG_HEAP_INIT_PAGE. Twophase commit/aborts are now the same as their normal counterparts. The original transaction's xid is included in an optional data field. This means that commit records generally are smaller, except in the case of a transaction with subtransactions, but no other special cases; the increase there is four bytes, which seems acceptable given that the more common case of not having subtransactions shrank. The savings are especially measurable for twophase commits, which previously always used the full version; but will in practice only infrequently have required that. The motivation for this work are not the space savings and and deduplication though; it's that it makes it easier to extend commit records with additional information. That's just a few lines of code now; without impacting the common case where that information is not needed. Discussion: 20150220152150.GD4149@awork2.anarazel.de, 235610.92468.qm%40web29004.mail.ird.yahoo.com Reviewed-By: Heikki Linnakangas, Simon Riggs
Diffstat (limited to 'src/backend/replication/logical/decode.c')
-rw-r--r--src/backend/replication/logical/decode.c133
1 files changed, 40 insertions, 93 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index e7614bd515a..eb7293f2f33 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -64,12 +64,9 @@ static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
- TransactionId xid, Oid dboid,
- TimestampTz commit_time,
- int nsubxacts, TransactionId *sub_xids,
- int ninval_msgs, SharedInvalidationMessage *msg);
-static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecPtr lsn,
- TransactionId xid, TransactionId *sub_xids, int nsubxacts);
+ xl_xact_parsed_commit *parsed, TransactionId xid);
+static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+ xl_xact_parsed_abort *parsed, TransactionId xid);
/* common function to decode tuples */
static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
@@ -188,7 +185,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
SnapBuild *builder = ctx->snapshot_builder;
ReorderBuffer *reorder = ctx->reorder;
XLogReaderState *r = buf->record;
- uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
+ uint8 info = XLogRecGetInfo(r) & XLOG_XACT_OPMASK;
/* no point in doing anything yet, data could not be decoded anyway */
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
@@ -197,87 +194,41 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
switch (info)
{
case XLOG_XACT_COMMIT:
- {
- xl_xact_commit *xlrec;
- TransactionId *subxacts = NULL;
- SharedInvalidationMessage *invals = NULL;
-
- xlrec = (xl_xact_commit *) XLogRecGetData(r);
-
- subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
- invals = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]);
-
- DecodeCommit(ctx, buf, XLogRecGetXid(r), xlrec->dbId,
- xlrec->xact_time,
- xlrec->nsubxacts, subxacts,
- xlrec->nmsgs, invals);
-
- break;
- }
case XLOG_XACT_COMMIT_PREPARED:
{
- xl_xact_commit_prepared *prec;
xl_xact_commit *xlrec;
- TransactionId *subxacts;
- SharedInvalidationMessage *invals = NULL;
-
- /* Prepared commits contain a normal commit record... */
- prec = (xl_xact_commit_prepared *) XLogRecGetData(r);
- xlrec = &prec->crec;
+ xl_xact_parsed_commit parsed;
+ TransactionId xid;
- subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
- invals = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]);
-
- DecodeCommit(ctx, buf, prec->xid, xlrec->dbId,
- xlrec->xact_time,
- xlrec->nsubxacts, subxacts,
- xlrec->nmsgs, invals);
-
- break;
- }
- case XLOG_XACT_COMMIT_COMPACT:
- {
- xl_xact_commit_compact *xlrec;
+ xlrec = (xl_xact_commit *) XLogRecGetData(r);
+ ParseCommitRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
- xlrec = (xl_xact_commit_compact *) XLogRecGetData(r);
+ if (!TransactionIdIsValid(parsed.twophase_xid))
+ xid = XLogRecGetXid(r);
+ else
+ xid = parsed.twophase_xid;
- DecodeCommit(ctx, buf, XLogRecGetXid(r), InvalidOid,
- xlrec->xact_time,
- xlrec->nsubxacts, xlrec->subxacts,
- 0, NULL);
+ DecodeCommit(ctx, buf, &parsed, xid);
break;
}
case XLOG_XACT_ABORT:
- {
- xl_xact_abort *xlrec;
- TransactionId *sub_xids;
-
- xlrec = (xl_xact_abort *) XLogRecGetData(r);
-
- sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
-
- DecodeAbort(ctx, buf->origptr, XLogRecGetXid(r),
- sub_xids, xlrec->nsubxacts);
- break;
- }
case XLOG_XACT_ABORT_PREPARED:
{
- xl_xact_abort_prepared *prec;
xl_xact_abort *xlrec;
- TransactionId *sub_xids;
+ xl_xact_parsed_abort parsed;
+ TransactionId xid;
- /* prepared abort contain a normal commit abort... */
- prec = (xl_xact_abort_prepared *) XLogRecGetData(r);
- xlrec = &prec->arec;
+ xlrec = (xl_xact_abort *) XLogRecGetData(r);
+ ParseAbortRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
- sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
+ if (!TransactionIdIsValid(parsed.twophase_xid))
+ xid = XLogRecGetXid(r);
+ else
+ xid = parsed.twophase_xid;
- /* r->xl_xid is committed in a separate record */
- DecodeAbort(ctx, buf->origptr, prec->xid,
- sub_xids, xlrec->nsubxacts);
+ DecodeAbort(ctx, buf, &parsed, xid);
break;
}
-
case XLOG_XACT_ASSIGNMENT:
{
xl_xact_assignment *xlrec;
@@ -477,10 +428,7 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
*/
static void
DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
- TransactionId xid, Oid dboid,
- TimestampTz commit_time,
- int nsubxacts, TransactionId *sub_xids,
- int ninval_msgs, SharedInvalidationMessage *msgs)
+ xl_xact_parsed_commit *parsed, TransactionId xid)
{
int i;
@@ -489,15 +437,15 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
* transaction's contents, since the various caches need to always be
* consistent.
*/
- if (ninval_msgs > 0)
+ if (parsed->nmsgs > 0)
{
ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr,
- ninval_msgs, msgs);
+ parsed->nmsgs, parsed->msgs);
ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
}
SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid,
- nsubxacts, sub_xids);
+ parsed->nsubxacts, parsed->subxacts);
/* ----
* Check whether we are interested in this specific transaction, and tell
@@ -524,12 +472,11 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
* ---
*/
if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
- (dboid != InvalidOid && dboid != ctx->slot->data.database))
+ (parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database))
{
- for (i = 0; i < nsubxacts; i++)
+ for (i = 0; i < parsed->nsubxacts; i++)
{
- ReorderBufferForget(ctx->reorder, *sub_xids, buf->origptr);
- sub_xids++;
+ ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr);
}
ReorderBufferForget(ctx->reorder, xid, buf->origptr);
@@ -537,16 +484,15 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
}
/* tell the reorderbuffer about the surviving subtransactions */
- for (i = 0; i < nsubxacts; i++)
+ for (i = 0; i < parsed->nsubxacts; i++)
{
- ReorderBufferCommitChild(ctx->reorder, xid, *sub_xids,
+ ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
buf->origptr, buf->endptr);
- sub_xids++;
}
/* replay actions of all transaction + subtransactions in order */
ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
- commit_time);
+ parsed->xact_time);
}
/*
@@ -554,20 +500,21 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
* snapbuild.c and reorderbuffer.c
*/
static void
-DecodeAbort(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
- TransactionId *sub_xids, int nsubxacts)
+DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+ xl_xact_parsed_abort *parsed, TransactionId xid)
{
int i;
- SnapBuildAbortTxn(ctx->snapshot_builder, lsn, xid, nsubxacts, sub_xids);
+ SnapBuildAbortTxn(ctx->snapshot_builder, buf->record->EndRecPtr, xid,
+ parsed->nsubxacts, parsed->subxacts);
- for (i = 0; i < nsubxacts; i++)
+ for (i = 0; i < parsed->nsubxacts; i++)
{
- ReorderBufferAbort(ctx->reorder, *sub_xids, lsn);
- sub_xids++;
+ ReorderBufferAbort(ctx->reorder, parsed->subxacts[i],
+ buf->record->EndRecPtr);
}
- ReorderBufferAbort(ctx->reorder, xid, lsn);
+ ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr);
}
/*