aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAndres Freund <andres@anarazel.de>2016-03-05 18:02:20 -0800
committerAndres Freund <andres@anarazel.de>2016-03-05 18:02:20 -0800
commit465dd92d98e42233df8bce6fbe8c90a65853536b (patch)
tree1b3d7fba47ec6198cd3cb51212fcd7d5b09a98ca /src
parenta9613ee692463c6ff061227c3ca63075ea1f10f1 (diff)
downloadpostgresql-465dd92d98e42233df8bce6fbe8c90a65853536b.tar.gz
postgresql-465dd92d98e42233df8bce6fbe8c90a65853536b.zip
logical decoding: Tell reorderbuffer about all xids.
Logical decoding's reorderbuffer keeps transactions in an LSN ordered list for efficiency. To make that's efficiently possible upper-level xids are forced to be logged before nested subtransaction xids. That only works though if these records are all looked at: Unfortunately we didn't do so for e.g. row level locks, which are otherwise uninteresting for logical decoding. This could lead to errors like: "ERROR: subxact logged without previous toplevel record". It's not sufficient to just look at row locking records, the xid could appear first due to a lot of other types of records (which will trigger the transaction to be marked logged with MarkCurrentTransactionIdLoggedIfAny). So invent infrastructure to tell reorderbuffer about xids seen, when they'd otherwise not pass through reorderbuffer.c. Reported-By: Jarred Ward Bug: #13844 Discussion: 20160105033249.1087.66040@wrigleys.postgresql.org Backpatch: 9.4, where logical decoding was added
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/logical/decode.c28
-rw-r--r--src/backend/replication/logical/reorderbuffer.c21
-rw-r--r--src/backend/replication/logical/snapbuild.c6
-rw-r--r--src/include/replication/reorderbuffer.h2
4 files changed, 42 insertions, 15 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 8f8732afdce..9610b6ce773 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -77,6 +77,14 @@ static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
* Take every XLogReadRecord()ed record and perform the actions required to
* decode it using the output plugin already setup in the logical decoding
* context.
+ *
+ * NB: Note that every record's xid needs to be processed by reorderbuffer
+ * (xids contained in the content of records are not relevant for this rule).
+ * That means that for records which'd otherwise not go through the
+ * reorderbuffer ReorderBufferProcessXid() has to be called. We don't want to
+ * call ReorderBufferProcessXid for each record type by default, because
+ * e.g. empty xacts can be handled more efficiently if there's no previous
+ * state for them.
*/
void
LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogRecord *record)
@@ -132,6 +140,9 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogRecord *record)
case RM_GIST_ID:
case RM_SEQ_ID:
case RM_SPGIST_ID:
+ /* just deal with xid, and done */
+ ReorderBufferProcessXid(ctx->reorder, record->xl_xid,
+ buf.origptr);
break;
case RM_NEXT_ID:
elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) buf.record.xl_rmid);
@@ -147,6 +158,9 @@ DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
SnapBuild *builder = ctx->snapshot_builder;
uint8 info = buf->record.xl_info & ~XLR_INFO_MASK;
+ ReorderBufferProcessXid(ctx->reorder, buf->record.xl_xid,
+ buf->origptr);
+
switch (info)
{
/* this is also used in END_OF_RECOVERY checkpoints */
@@ -187,7 +201,12 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
XLogRecord *r = &buf->record;
uint8 info = r->xl_info & ~XLR_INFO_MASK;
- /* no point in doing anything yet, data could not be decoded anyway */
+ /*
+ * No point in doing anything yet, data could not be decoded anyway. It's
+ * ok not to call ReorderBufferProcessXid() in that case, except in the
+ * assignment case there'll not be any later records with the same xid;
+ * and in the assignment case we'll not decode those xacts.
+ */
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
return;
@@ -302,6 +321,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
* transactions in the changestream allowing for a kind of
* distributed 2PC.
*/
+ ReorderBufferProcessXid(reorder, r->xl_xid, buf->origptr);
break;
default:
elog(ERROR, "unexpected RM_XACT_ID record type: %u", info);
@@ -318,6 +338,8 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
XLogRecord *r = &buf->record;
uint8 info = r->xl_info & ~XLR_INFO_MASK;
+ ReorderBufferProcessXid(ctx->reorder, r->xl_xid, buf->origptr);
+
switch (info)
{
case XLOG_RUNNING_XACTS:
@@ -355,6 +377,8 @@ DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
TransactionId xid = buf->record.xl_xid;
SnapBuild *builder = ctx->snapshot_builder;
+ ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
+
/* no point in doing anything yet */
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
return;
@@ -408,6 +432,8 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
TransactionId xid = buf->record.xl_xid;
SnapBuild *builder = ctx->snapshot_builder;
+ ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
+
/* no point in doing anything yet */
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
return;
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 68cacff08b0..5b434ac825f 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1669,16 +1669,21 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
/*
- * Check whether a transaction is already known in this module.xs
+ * Tell reorderbuffer about an xid seen in the WAL stream. Has to be called at
+ * least once for every xid in XLogRecord->xl_xid (other places in records
+ * may, but do not have to be passed through here).
+ *
+ * Reorderbuffer keeps some datastructures about transactions in LSN order,
+ * for efficiency. To do that it has to know about when transactions are seen
+ * first in the WAL. As many types of records are not actually interesting for
+ * logical decoding, they do not necessarily pass though here.
*/
-bool
-ReorderBufferIsXidKnown(ReorderBuffer *rb, TransactionId xid)
+void
+ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
{
- ReorderBufferTXN *txn;
-
- txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
- false);
- return txn != NULL;
+ /* many records won't have an xid assigned, centralize check here */
+ if (xid != InvalidTransactionId)
+ ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
}
/*
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 4374d6dd4ad..f16273a8209 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -636,8 +636,6 @@ SnapBuildClearExportedSnapshot()
bool
SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
{
- bool is_old_tx;
-
/*
* We can't handle data in transactions if we haven't built a snapshot
* yet, so don't store them.
@@ -658,9 +656,7 @@ SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
* If the reorderbuffer doesn't yet have a snapshot, add one now, it will
* be needed to decode the change we're currently processing.
*/
- is_old_tx = ReorderBufferIsXidKnown(builder->reorder, xid);
-
- if (!is_old_tx || !ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
+ if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
{
/* only build a new snapshot if we don't have a prebuilt one */
if (builder->snapshot == NULL)
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 3ee3b5a0c46..00183fa16d6 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -352,7 +352,7 @@ void ReorderBufferAddNewTupleCids(ReorderBuffer *, TransactionId, XLogRecPtr lsn
CommandId cmin, CommandId cmax, CommandId combocid);
void ReorderBufferAddInvalidations(ReorderBuffer *, TransactionId, XLogRecPtr lsn,
Size nmsgs, SharedInvalidationMessage *msgs);
-bool ReorderBufferIsXidKnown(ReorderBuffer *, TransactionId xid);
+void ReorderBufferProcessXid(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn);
void ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn);
bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid);
bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid);