aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/logical/decode.c28
-rw-r--r--src/backend/replication/logical/reorderbuffer.c21
-rw-r--r--src/backend/replication/logical/snapbuild.c6
-rw-r--r--src/include/replication/reorderbuffer.h2
4 files changed, 42 insertions, 15 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 61c3ed740bb..c530fe9ba09 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -78,6 +78,14 @@ static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
* Take every XLogReadRecord()ed record and perform the actions required to
* decode it using the output plugin already setup in the logical decoding
* context.
+ *
+ * NB: Note that every record's xid needs to be processed by reorderbuffer
+ * (xids contained in the content of records are not relevant for this rule).
+ * That means that for records which'd otherwise not go through the
+ * reorderbuffer ReorderBufferProcessXid() has to be called. We don't want to
+ * call ReorderBufferProcessXid for each record type by default, because
+ * e.g. empty xacts can be handled more efficiently if there's no previous
+ * state for them.
*/
void
LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
@@ -135,6 +143,9 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
case RM_BRIN_ID:
case RM_COMMIT_TS_ID:
case RM_REPLORIGIN_ID:
+ /* just deal with xid, and done */
+ ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record),
+ buf.origptr);
break;
case RM_NEXT_ID:
elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) XLogRecGetRmid(buf.record));
@@ -150,6 +161,9 @@ DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
SnapBuild *builder = ctx->snapshot_builder;
uint8 info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
+ ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(buf->record),
+ buf->origptr);
+
switch (info)
{
/* this is also used in END_OF_RECOVERY checkpoints */
@@ -191,7 +205,12 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
XLogReaderState *r = buf->record;
uint8 info = XLogRecGetInfo(r) & XLOG_XACT_OPMASK;
- /* no point in doing anything yet, data could not be decoded anyway */
+ /*
+ * No point in doing anything yet, data could not be decoded anyway. It's
+ * ok not to call ReorderBufferProcessXid() in that case, except in the
+ * assignment case there'll not be any later records with the same xid;
+ * and in the assignment case we'll not decode those xacts.
+ */
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
return;
@@ -260,6 +279,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
* transactions in the changestream allowing for a kind of
* distributed 2PC.
*/
+ ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr);
break;
default:
elog(ERROR, "unexpected RM_XACT_ID record type: %u", info);
@@ -276,6 +296,8 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
XLogReaderState *r = buf->record;
uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
+ ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
+
switch (info)
{
case XLOG_RUNNING_XACTS:
@@ -313,6 +335,8 @@ DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
TransactionId xid = XLogRecGetXid(buf->record);
SnapBuild *builder = ctx->snapshot_builder;
+ ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
+
/* no point in doing anything yet */
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
return;
@@ -366,6 +390,8 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
TransactionId xid = XLogRecGetXid(buf->record);
SnapBuild *builder = ctx->snapshot_builder;
+ ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
+
/* no point in doing anything yet */
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
return;
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 478c3e874af..b3276c74c7c 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1741,16 +1741,21 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
/*
- * Check whether a transaction is already known in this module.xs
+ * Tell reorderbuffer about an xid seen in the WAL stream. Has to be called at
+ * least once for every xid in XLogRecord->xl_xid (other places in records
+ * may, but do not have to be passed through here).
+ *
+ * Reorderbuffer keeps some datastructures about transactions in LSN order,
+ * for efficiency. To do that it has to know about when transactions are seen
+ * first in the WAL. As many types of records are not actually interesting for
+ * logical decoding, they do not necessarily pass though here.
*/
-bool
-ReorderBufferIsXidKnown(ReorderBuffer *rb, TransactionId xid)
+void
+ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
{
- ReorderBufferTXN *txn;
-
- txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
- false);
- return txn != NULL;
+ /* many records won't have an xid assigned, centralize check here */
+ if (xid != InvalidTransactionId)
+ ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
}
/*
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 16100538663..eb571045fea 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -635,8 +635,6 @@ SnapBuildClearExportedSnapshot()
bool
SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
{
- bool is_old_tx;
-
/*
* We can't handle data in transactions if we haven't built a snapshot
* yet, so don't store them.
@@ -657,9 +655,7 @@ SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
* If the reorderbuffer doesn't yet have a snapshot, add one now, it will
* be needed to decode the change we're currently processing.
*/
- is_old_tx = ReorderBufferIsXidKnown(builder->reorder, xid);
-
- if (!is_old_tx || !ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
+ if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
{
/* only build a new snapshot if we don't have a prebuilt one */
if (builder->snapshot == NULL)
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index bf0d790053b..cf47a117200 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -369,7 +369,7 @@ void ReorderBufferAddNewTupleCids(ReorderBuffer *, TransactionId, XLogRecPtr lsn
CommandId cmin, CommandId cmax, CommandId combocid);
void ReorderBufferAddInvalidations(ReorderBuffer *, TransactionId, XLogRecPtr lsn,
Size nmsgs, SharedInvalidationMessage *msgs);
-bool ReorderBufferIsXidKnown(ReorderBuffer *, TransactionId xid);
+void ReorderBufferProcessXid(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn);
void ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn);
bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid);
bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid);