aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access')
-rw-r--r--src/backend/access/heap/heapam.c13
-rw-r--r--src/backend/access/heap/heapam_visibility.c42
-rw-r--r--src/backend/access/index/genam.c53
-rw-r--r--src/backend/access/table/tableam.c8
-rw-r--r--src/backend/access/transam/xact.c19
5 files changed, 130 insertions, 5 deletions
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 5eef225f5c7..00169006fb1 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -1299,6 +1299,16 @@ heap_getnext(TableScanDesc sscan, ScanDirection direction)
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg_internal("only heap AM is supported")));
+ /*
+ * We don't expect direct calls to heap_getnext with valid CheckXidAlive
+ * for catalog or regular tables. See detailed comments in xact.c where
+ * these variables are declared. Normally we have such a check at tableam
+ * level API but this is called from many places so we need to ensure it
+ * here.
+ */
+ if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan))
+ elog(ERROR, "unexpected heap_getnext call during logical decoding");
+
/* Note: no locking manipulations needed */
if (scan->rs_base.rs_flags & SO_ALLOW_PAGEMODE)
@@ -1956,6 +1966,9 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
{
xlrec.flags |= XLH_INSERT_CONTAINS_NEW_TUPLE;
bufflags |= REGBUF_KEEP_DATA;
+
+ if (IsToastRelation(relation))
+ xlrec.flags |= XLH_INSERT_ON_TOAST_RELATION;
}
XLogBeginInsert();
diff --git a/src/backend/access/heap/heapam_visibility.c b/src/backend/access/heap/heapam_visibility.c
index dba10890aab..c77128087cf 100644
--- a/src/backend/access/heap/heapam_visibility.c
+++ b/src/backend/access/heap/heapam_visibility.c
@@ -1571,8 +1571,25 @@ HeapTupleSatisfiesHistoricMVCC(HeapTuple htup, Snapshot snapshot,
htup, buffer,
&cmin, &cmax);
+ /*
+ * If we haven't resolved the combocid to cmin/cmax, that means we
+ * have not decoded the combocid yet. That means the cmin is
+ * definitely in the future, and we're not supposed to see the tuple
+ * yet.
+ *
+ * XXX This only applies to decoding of in-progress transactions. In
+ * regular logical decoding we only execute this code at commit time,
+ * at which point we should have seen all relevant combocids. So
+ * ideally, we should error out in this case but in practice, this
+ * won't happen. If we are too worried about this then we can add an
+ * elog inside ResolveCminCmaxDuringDecoding.
+ *
+ * XXX For the streaming case, we can track the largest combocid
+ * assigned, and error out based on this (when unable to resolve
+ * combocid below that observed maximum value).
+ */
if (!resolved)
- elog(ERROR, "could not resolve cmin/cmax of catalog tuple");
+ return false;
Assert(cmin != InvalidCommandId);
@@ -1642,10 +1659,25 @@ HeapTupleSatisfiesHistoricMVCC(HeapTuple htup, Snapshot snapshot,
htup, buffer,
&cmin, &cmax);
- if (!resolved)
- elog(ERROR, "could not resolve combocid to cmax");
-
- Assert(cmax != InvalidCommandId);
+ /*
+ * If we haven't resolved the combocid to cmin/cmax, that means we
+ * have not decoded the combocid yet. That means the cmax is
+ * definitely in the future, and we're still supposed to see the
+ * tuple.
+ *
+ * XXX This only applies to decoding of in-progress transactions. In
+ * regular logical decoding we only execute this code at commit time,
+ * at which point we should have seen all relevant combocids. So
+ * ideally, we should error out in this case but in practice, this
+ * won't happen. If we are too worried about this then we can add an
+ * elog inside ResolveCminCmaxDuringDecoding.
+ *
+ * XXX For the streaming case, we can track the largest combocid
+ * assigned, and error out based on this (when unable to resolve
+ * combocid below that observed maximum value).
+ */
+ if (!resolved || cmax == InvalidCommandId)
+ return true;
if (cmax >= snapshot->curcid)
return true; /* deleted after scan started */
diff --git a/src/backend/access/index/genam.c b/src/backend/access/index/genam.c
index dfba5ae39ae..e3164e674a7 100644
--- a/src/backend/access/index/genam.c
+++ b/src/backend/access/index/genam.c
@@ -28,6 +28,7 @@
#include "lib/stringinfo.h"
#include "miscadmin.h"
#include "storage/bufmgr.h"
+#include "storage/procarray.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
@@ -429,10 +430,37 @@ systable_beginscan(Relation heapRelation,
sysscan->iscan = NULL;
}
+ /*
+ * If CheckXidAlive is set then set a flag to indicate that system table
+ * scan is in-progress. See detailed comments in xact.c where these
+ * variables are declared.
+ */
+ if (TransactionIdIsValid(CheckXidAlive))
+ bsysscan = true;
+
return sysscan;
}
/*
+ * HandleConcurrentAbort - Handle concurrent abort of the CheckXidAlive.
+ *
+ * Error out, if CheckXidAlive is aborted. We can't directly use
+ * TransactionIdDidAbort as after crash such transaction might not have been
+ * marked as aborted. See detailed comments in xact.c where the variable
+ * is declared.
+ */
+static inline void
+HandleConcurrentAbort()
+{
+ if (TransactionIdIsValid(CheckXidAlive) &&
+ !TransactionIdIsInProgress(CheckXidAlive) &&
+ !TransactionIdDidCommit(CheckXidAlive))
+ ereport(ERROR,
+ (errcode(ERRCODE_TRANSACTION_ROLLBACK),
+ errmsg("transaction aborted during system catalog scan")));
+}
+
+/*
* systable_getnext --- get next tuple in a heap-or-index scan
*
* Returns NULL if no more tuples available.
@@ -481,6 +509,12 @@ systable_getnext(SysScanDesc sysscan)
}
}
+ /*
+ * Handle the concurrent abort while fetching the catalog tuple during
+ * logical streaming of a transaction.
+ */
+ HandleConcurrentAbort();
+
return htup;
}
@@ -517,6 +551,12 @@ systable_recheck_tuple(SysScanDesc sysscan, HeapTuple tup)
sysscan->slot,
freshsnap);
+ /*
+ * Handle the concurrent abort while fetching the catalog tuple during
+ * logical streaming of a transaction.
+ */
+ HandleConcurrentAbort();
+
return result;
}
@@ -545,6 +585,13 @@ systable_endscan(SysScanDesc sysscan)
if (sysscan->snapshot)
UnregisterSnapshot(sysscan->snapshot);
+ /*
+ * Reset the bsysscan flag at the end of the systable scan. See
+ * detailed comments in xact.c where these variables are declared.
+ */
+ if (TransactionIdIsValid(CheckXidAlive))
+ bsysscan = false;
+
pfree(sysscan);
}
@@ -643,6 +690,12 @@ systable_getnext_ordered(SysScanDesc sysscan, ScanDirection direction)
if (htup && sysscan->iscan->xs_recheck)
elog(ERROR, "system catalog scans with lossy index conditions are not implemented");
+ /*
+ * Handle the concurrent abort while fetching the catalog tuple during
+ * logical streaming of a transaction.
+ */
+ HandleConcurrentAbort();
+
return htup;
}
diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c
index 3afb63b1fe4..c6383197657 100644
--- a/src/backend/access/table/tableam.c
+++ b/src/backend/access/table/tableam.c
@@ -249,6 +249,14 @@ table_tuple_get_latest_tid(TableScanDesc scan, ItemPointer tid)
const TableAmRoutine *tableam = rel->rd_tableam;
/*
+ * We don't expect direct calls to table_tuple_get_latest_tid with valid
+ * CheckXidAlive for catalog or regular tables. See detailed comments in
+ * xact.c where these variables are declared.
+ */
+ if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan))
+ elog(ERROR, "unexpected table_tuple_get_latest_tid call during logical decoding");
+
+ /*
* Since this can be called with user-supplied TID, don't trust the input
* too much.
*/
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index d4f7c29847f..727d6160359 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -83,6 +83,19 @@ bool XactDeferrable;
int synchronous_commit = SYNCHRONOUS_COMMIT_ON;
/*
+ * CheckXidAlive is a xid value pointing to a possibly ongoing (sub)
+ * transaction. Currently, it is used in logical decoding. It's possible
+ * that such transactions can get aborted while the decoding is ongoing in
+ * which case we skip decoding that particular transaction. To ensure that we
+ * check whether the CheckXidAlive is aborted after fetching the tuple from
+ * system tables. We also ensure that during logical decoding we never
+ * directly access the tableam or heap APIs because we are checking for the
+ * concurrent aborts only in systable_* APIs.
+ */
+TransactionId CheckXidAlive = InvalidTransactionId;
+bool bsysscan = false;
+
+/*
* When running as a parallel worker, we place only a single
* TransactionStateData on the parallel worker's state stack, and the XID
* reflected there will be that of the *innermost* currently-active
@@ -2680,6 +2693,9 @@ AbortTransaction(void)
/* Forget about any active REINDEX. */
ResetReindexState(s->nestingLevel);
+ /* Reset logical streaming state. */
+ ResetLogicalStreamingState();
+
/* If in parallel mode, clean up workers and exit parallel mode. */
if (IsInParallelMode())
{
@@ -4982,6 +4998,9 @@ AbortSubTransaction(void)
/* Forget about any active REINDEX. */
ResetReindexState(s->nestingLevel);
+ /* Reset logical streaming state. */
+ ResetLogicalStreamingState();
+
/* Exit from parallel mode, if necessary. */
if (IsInParallelMode())
{