aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/logical/reorderbuffer.c186
-rw-r--r--src/include/replication/reorderbuffer.h32
2 files changed, 177 insertions, 41 deletions
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 10a37667a51..ed5a2946dc1 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -106,6 +106,7 @@
#include "replication/snapbuild.h" /* just for SnapBuildSnapDecRefcount */
#include "storage/bufmgr.h"
#include "storage/fd.h"
+#include "storage/procarray.h"
#include "storage/sinval.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
@@ -260,6 +261,8 @@ static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
bool txn_prepared);
+static void ReorderBufferMaybeMarkTXNStreamed(ReorderBuffer *rb, ReorderBufferTXN *txn);
+static bool ReorderBufferCheckAndTruncateAbortedTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
TransactionId xid, XLogSegNo segno);
@@ -793,11 +796,11 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
/*
- * While streaming the previous changes we have detected that the
- * transaction is aborted. So there is no point in collecting further
- * changes for it.
+ * If we have detected that the transaction is aborted while streaming the
+ * previous changes or by checking its CLOG, there is no point in
+ * collecting further changes for it.
*/
- if (txn->concurrent_abort)
+ if (rbtxn_is_aborted(txn))
{
/*
* We don't need to update memory accounting for this change as we
@@ -1620,8 +1623,9 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
/*
* Discard changes from a transaction (and subtransactions), either after
- * streaming or decoding them at PREPARE. Keep the remaining info -
- * transactions, tuplecids, invalidations and snapshots.
+ * streaming, decoding them at PREPARE, or detecting the transaction abort.
+ * Keep the remaining info - transactions, tuplecids, invalidations and
+ * snapshots.
*
* We additionally remove tuplecids after decoding the transaction at prepare
* time as we only need to perform invalidation at rollback or commit prepared.
@@ -1650,6 +1654,7 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep
Assert(rbtxn_is_known_subxact(subtxn));
Assert(subtxn->nsubtxns == 0);
+ ReorderBufferMaybeMarkTXNStreamed(rb, subtxn);
ReorderBufferTruncateTXN(rb, subtxn, txn_prepared);
}
@@ -1680,24 +1685,6 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep
/* Update the memory counter */
ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, mem_freed);
- /*
- * Mark the transaction as streamed.
- *
- * The top-level transaction, is marked as streamed always, even if it
- * does not contain any changes (that is, when all the changes are in
- * subtransactions).
- *
- * For subtransactions, we only mark them as streamed when there are
- * changes in them.
- *
- * We do it this way because of aborts - we don't want to send aborts for
- * XIDs the downstream is not aware of. And of course, it always knows
- * about the toplevel xact (we send the XID in all messages), but we never
- * stream XIDs of empty subxacts.
- */
- if ((!txn_prepared) && (rbtxn_is_toptxn(txn) || (txn->nentries_mem != 0)))
- txn->txn_flags |= RBTXN_IS_STREAMED;
-
if (txn_prepared)
{
/*
@@ -1753,6 +1740,76 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep
}
/*
+ * Check the transaction status by CLOG lookup and discard all changes if
+ * the transaction is aborted. The transaction status is cached in
+ * txn->txn_flags so we can skip future changes and avoid CLOG lookups on the
+ * next call.
+ *
+ * Return true if the transaction is aborted, otherwise return false.
+ *
+ * When the 'debug_logical_replication_streaming' is set to "immediate", we
+ * don't check the transaction status, meaning the caller will always process
+ * this transaction.
+ */
+static bool
+ReorderBufferCheckAndTruncateAbortedTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
+{
+ /* Quick return for regression tests */
+ if (unlikely(debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE))
+ return false;
+
+ /*
+ * Quick return if the transaction status is already known.
+ */
+
+ if (rbtxn_is_committed(txn))
+ return false;
+ if (rbtxn_is_aborted(txn))
+ {
+ /* Already-aborted transactions should not have any changes */
+ Assert(txn->size == 0);
+
+ return true;
+ }
+
+ /* Otherwise, check the transaction status using CLOG lookup */
+
+ if (TransactionIdIsInProgress(txn->xid))
+ return false;
+
+ if (TransactionIdDidCommit(txn->xid))
+ {
+ /*
+ * Remember the transaction is committed so that we can skip CLOG
+ * check next time, avoiding the pressure on CLOG lookup.
+ */
+ Assert(!rbtxn_is_aborted(txn));
+ txn->txn_flags |= RBTXN_IS_COMMITTED;
+ return false;
+ }
+
+ /*
+ * The transaction aborted. We discard both the changes collected so far
+ * and the toast reconstruction data. The full cleanup will happen as part
+ * of decoding ABORT record of this transaction.
+ */
+ ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
+ ReorderBufferToastReset(rb, txn);
+
+ /* All changes should be discarded */
+ Assert(txn->size == 0);
+
+ /*
+ * Mark the transaction as aborted so we can ignore future changes of this
+ * transaction.
+ */
+ Assert(!rbtxn_is_committed(txn));
+ txn->txn_flags |= RBTXN_IS_ABORTED;
+
+ return true;
+}
+
+/*
* Build a hash with a (relfilelocator, ctid) -> (cmin, cmax) mapping for use by
* HeapTupleSatisfiesHistoricMVCC.
*/
@@ -1917,7 +1974,9 @@ ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
* Note, we send stream prepare even if a concurrent abort is
* detected. See DecodePrepare for more information.
*/
+ Assert(!rbtxn_sent_prepare(txn));
rb->stream_prepare(rb, txn, txn->final_lsn);
+ txn->txn_flags |= RBTXN_SENT_PREPARE;
/*
* This is a PREPARED transaction, part of a two-phase commit. The
@@ -2053,6 +2112,30 @@ ReorderBufferSaveTXNSnapshot(ReorderBuffer *rb, ReorderBufferTXN *txn,
}
/*
+ * Mark the given transaction as streamed if it's a top-level transaction
+ * or has changes.
+ */
+static void
+ReorderBufferMaybeMarkTXNStreamed(ReorderBuffer *rb, ReorderBufferTXN *txn)
+{
+ /*
+ * The top-level transaction, is marked as streamed always, even if it
+ * does not contain any changes (that is, when all the changes are in
+ * subtransactions).
+ *
+ * For subtransactions, we only mark them as streamed when there are
+ * changes in them.
+ *
+ * We do it this way because of aborts - we don't want to send aborts for
+ * XIDs the downstream is not aware of. And of course, it always knows
+ * about the top-level xact (we send the XID in all messages), but we
+ * never stream XIDs of empty subxacts.
+ */
+ if (rbtxn_is_toptxn(txn) || (txn->nentries_mem != 0))
+ txn->txn_flags |= RBTXN_IS_STREAMED;
+}
+
+/*
* Helper function for ReorderBufferProcessTXN to handle the concurrent
* abort of the streaming transaction. This resets the TXN such that it
* can be used to stream the remaining data of transaction being processed.
@@ -2543,7 +2626,11 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
* regular ones).
*/
if (rbtxn_prepared(txn))
+ {
+ Assert(!rbtxn_sent_prepare(txn));
rb->prepare(rb, txn, commit_lsn);
+ txn->txn_flags |= RBTXN_SENT_PREPARE;
+ }
else
rb->commit(rb, txn, commit_lsn);
}
@@ -2595,6 +2682,9 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
*/
if (streaming || rbtxn_prepared(txn))
{
+ if (streaming)
+ ReorderBufferMaybeMarkTXNStreamed(rb, txn);
+
ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
/* Reset the CheckXidAlive */
CheckXidAlive = InvalidTransactionId;
@@ -2648,7 +2738,14 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
FlushErrorState();
FreeErrorData(errdata);
errdata = NULL;
- curtxn->concurrent_abort = true;
+
+ /* Remember the transaction is aborted. */
+ Assert(!rbtxn_is_committed(curtxn));
+ curtxn->txn_flags |= RBTXN_IS_ABORTED;
+
+ /* Mark the transaction is streamed if appropriate */
+ if (stream_started)
+ ReorderBufferMaybeMarkTXNStreamed(rb, txn);
/* Reset the TXN so that it is allowed to stream remaining data. */
ReorderBufferResetTXN(rb, txn, snapshot_now,
@@ -2828,15 +2925,15 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
/*
- * We send the prepare for the concurrently aborted xacts so that later
- * when rollback prepared is decoded and sent, the downstream should be
- * able to rollback such a xact. See comments atop DecodePrepare.
- *
- * Note, for the concurrent_abort + streaming case a stream_prepare was
- * already sent within the ReorderBufferReplay call above.
+ * Send a prepare if not already done so. This might occur if we have
+ * detected a concurrent abort while replaying the non-streaming
+ * transaction.
*/
- if (txn->concurrent_abort && !rbtxn_is_streamed(txn))
+ if (!rbtxn_sent_prepare(txn))
+ {
rb->prepare(rb, txn, txn->final_lsn);
+ txn->txn_flags |= RBTXN_SENT_PREPARE;
+ }
}
/*
@@ -3566,7 +3663,8 @@ ReorderBufferLargestTXN(ReorderBuffer *rb)
}
/*
- * Find the largest streamable toplevel transaction to evict (by streaming).
+ * Find the largest streamable (and non-aborted) toplevel transaction to evict
+ * (by streaming).
*
* This can be seen as an optimized version of ReorderBufferLargestTXN, which
* should give us the same transaction (because we don't update memory account
@@ -3608,9 +3706,15 @@ ReorderBufferLargestStreamableTopTXN(ReorderBuffer *rb)
/* base_snapshot must be set */
Assert(txn->base_snapshot != NULL);
+ /* Don't consider these kinds of transactions for eviction. */
+ if (rbtxn_has_partial_change(txn) ||
+ !rbtxn_has_streamable_change(txn) ||
+ rbtxn_is_aborted(txn))
+ continue;
+
+ /* Find the largest of the eviction candidates. */
if ((largest == NULL || txn->total_size > largest_size) &&
- (txn->total_size > 0) && !(rbtxn_has_partial_change(txn)) &&
- rbtxn_has_streamable_change(txn))
+ (txn->total_size > 0))
{
largest = txn;
largest_size = txn->total_size;
@@ -3661,8 +3765,8 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
rb->size > 0))
{
/*
- * Pick the largest transaction and evict it from memory by streaming,
- * if possible. Otherwise, spill to disk.
+ * Pick the largest non-aborted transaction and evict it from memory
+ * by streaming, if possible. Otherwise, spill to disk.
*/
if (ReorderBufferCanStartStreaming(rb) &&
(txn = ReorderBufferLargestStreamableTopTXN(rb)) != NULL)
@@ -3672,6 +3776,10 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
Assert(txn->total_size > 0);
Assert(rb->size >= txn->total_size);
+ /* skip the transaction if aborted */
+ if (ReorderBufferCheckAndTruncateAbortedTXN(rb, txn))
+ continue;
+
ReorderBufferStreamTXN(rb, txn);
}
else
@@ -3687,6 +3795,10 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
Assert(txn->size > 0);
Assert(rb->size >= txn->size);
+ /* skip the transaction if aborted */
+ if (ReorderBufferCheckAndTruncateAbortedTXN(rb, txn))
+ continue;
+
ReorderBufferSerializeTXN(rb, txn);
}
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index a669658b3f1..9d9ac2f0830 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -173,6 +173,9 @@ typedef struct ReorderBufferChange
#define RBTXN_PREPARE 0x0040
#define RBTXN_SKIPPED_PREPARE 0x0080
#define RBTXN_HAS_STREAMABLE_CHANGE 0x0100
+#define RBTXN_SENT_PREPARE 0x0200
+#define RBTXN_IS_COMMITTED 0x0400
+#define RBTXN_IS_ABORTED 0x0800
/* Does the transaction have catalog changes? */
#define rbtxn_has_catalog_changes(txn) \
@@ -224,12 +227,36 @@ typedef struct ReorderBufferChange
((txn)->txn_flags & RBTXN_IS_STREAMED) != 0 \
)
-/* Has this transaction been prepared? */
+/*
+ * Is this a prepared transaction?
+ *
+ * Being true means that this transaction should be prepared instead of
+ * committed. To check whether a prepare or a stream_prepare has already
+ * been sent for this transaction, we need to use rbtxn_sent_prepare().
+ */
#define rbtxn_prepared(txn) \
( \
((txn)->txn_flags & RBTXN_PREPARE) != 0 \
)
+/* Has a prepare or stream_prepare already been sent? */
+#define rbtxn_sent_prepare(txn) \
+( \
+ ((txn)->txn_flags & RBTXN_SENT_PREPARE) != 0 \
+)
+
+/* Is this transaction committed? */
+#define rbtxn_is_committed(txn) \
+( \
+ ((txn)->txn_flags & RBTXN_IS_COMMITTED) != 0 \
+)
+
+/* Is this transaction aborted? */
+#define rbtxn_is_aborted(txn) \
+( \
+ ((txn)->txn_flags & RBTXN_IS_ABORTED) != 0 \
+)
+
/* prepare for this transaction skipped? */
#define rbtxn_skip_prepared(txn) \
( \
@@ -419,9 +446,6 @@ typedef struct ReorderBufferTXN
/* Size of top-transaction including sub-transactions. */
Size total_size;
- /* If we have detected concurrent abort then ignore future changes. */
- bool concurrent_abort;
-
/*
* Private data pointer of the output plugin.
*/