diff options
Diffstat (limited to 'src/backend/access')
-rw-r--r-- | src/backend/access/heap/heapam.c | 13 | ||||
-rw-r--r-- | src/backend/access/heap/heapam_visibility.c | 42 | ||||
-rw-r--r-- | src/backend/access/index/genam.c | 53 | ||||
-rw-r--r-- | src/backend/access/table/tableam.c | 8 | ||||
-rw-r--r-- | src/backend/access/transam/xact.c | 19 |
5 files changed, 130 insertions, 5 deletions
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 5eef225f5c7..00169006fb1 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -1299,6 +1299,16 @@ heap_getnext(TableScanDesc sscan, ScanDirection direction) (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg_internal("only heap AM is supported"))); + /* + * We don't expect direct calls to heap_getnext with valid CheckXidAlive + * for catalog or regular tables. See detailed comments in xact.c where + * these variables are declared. Normally we have such a check at tableam + * level API but this is called from many places so we need to ensure it + * here. + */ + if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan)) + elog(ERROR, "unexpected heap_getnext call during logical decoding"); + /* Note: no locking manipulations needed */ if (scan->rs_base.rs_flags & SO_ALLOW_PAGEMODE) @@ -1956,6 +1966,9 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid, { xlrec.flags |= XLH_INSERT_CONTAINS_NEW_TUPLE; bufflags |= REGBUF_KEEP_DATA; + + if (IsToastRelation(relation)) + xlrec.flags |= XLH_INSERT_ON_TOAST_RELATION; } XLogBeginInsert(); diff --git a/src/backend/access/heap/heapam_visibility.c b/src/backend/access/heap/heapam_visibility.c index dba10890aab..c77128087cf 100644 --- a/src/backend/access/heap/heapam_visibility.c +++ b/src/backend/access/heap/heapam_visibility.c @@ -1571,8 +1571,25 @@ HeapTupleSatisfiesHistoricMVCC(HeapTuple htup, Snapshot snapshot, htup, buffer, &cmin, &cmax); + /* + * If we haven't resolved the combocid to cmin/cmax, that means we + * have not decoded the combocid yet. That means the cmin is + * definitely in the future, and we're not supposed to see the tuple + * yet. + * + * XXX This only applies to decoding of in-progress transactions. In + * regular logical decoding we only execute this code at commit time, + * at which point we should have seen all relevant combocids. So + * ideally, we should error out in this case but in practice, this + * won't happen. If we are too worried about this then we can add an + * elog inside ResolveCminCmaxDuringDecoding. + * + * XXX For the streaming case, we can track the largest combocid + * assigned, and error out based on this (when unable to resolve + * combocid below that observed maximum value). + */ if (!resolved) - elog(ERROR, "could not resolve cmin/cmax of catalog tuple"); + return false; Assert(cmin != InvalidCommandId); @@ -1642,10 +1659,25 @@ HeapTupleSatisfiesHistoricMVCC(HeapTuple htup, Snapshot snapshot, htup, buffer, &cmin, &cmax); - if (!resolved) - elog(ERROR, "could not resolve combocid to cmax"); - - Assert(cmax != InvalidCommandId); + /* + * If we haven't resolved the combocid to cmin/cmax, that means we + * have not decoded the combocid yet. That means the cmax is + * definitely in the future, and we're still supposed to see the + * tuple. + * + * XXX This only applies to decoding of in-progress transactions. In + * regular logical decoding we only execute this code at commit time, + * at which point we should have seen all relevant combocids. So + * ideally, we should error out in this case but in practice, this + * won't happen. If we are too worried about this then we can add an + * elog inside ResolveCminCmaxDuringDecoding. + * + * XXX For the streaming case, we can track the largest combocid + * assigned, and error out based on this (when unable to resolve + * combocid below that observed maximum value). + */ + if (!resolved || cmax == InvalidCommandId) + return true; if (cmax >= snapshot->curcid) return true; /* deleted after scan started */ diff --git a/src/backend/access/index/genam.c b/src/backend/access/index/genam.c index dfba5ae39ae..e3164e674a7 100644 --- a/src/backend/access/index/genam.c +++ b/src/backend/access/index/genam.c @@ -28,6 +28,7 @@ #include "lib/stringinfo.h" #include "miscadmin.h" #include "storage/bufmgr.h" +#include "storage/procarray.h" #include "utils/acl.h" #include "utils/builtins.h" #include "utils/lsyscache.h" @@ -429,10 +430,37 @@ systable_beginscan(Relation heapRelation, sysscan->iscan = NULL; } + /* + * If CheckXidAlive is set then set a flag to indicate that system table + * scan is in-progress. See detailed comments in xact.c where these + * variables are declared. + */ + if (TransactionIdIsValid(CheckXidAlive)) + bsysscan = true; + return sysscan; } /* + * HandleConcurrentAbort - Handle concurrent abort of the CheckXidAlive. + * + * Error out, if CheckXidAlive is aborted. We can't directly use + * TransactionIdDidAbort as after crash such transaction might not have been + * marked as aborted. See detailed comments in xact.c where the variable + * is declared. + */ +static inline void +HandleConcurrentAbort() +{ + if (TransactionIdIsValid(CheckXidAlive) && + !TransactionIdIsInProgress(CheckXidAlive) && + !TransactionIdDidCommit(CheckXidAlive)) + ereport(ERROR, + (errcode(ERRCODE_TRANSACTION_ROLLBACK), + errmsg("transaction aborted during system catalog scan"))); +} + +/* * systable_getnext --- get next tuple in a heap-or-index scan * * Returns NULL if no more tuples available. @@ -481,6 +509,12 @@ systable_getnext(SysScanDesc sysscan) } } + /* + * Handle the concurrent abort while fetching the catalog tuple during + * logical streaming of a transaction. + */ + HandleConcurrentAbort(); + return htup; } @@ -517,6 +551,12 @@ systable_recheck_tuple(SysScanDesc sysscan, HeapTuple tup) sysscan->slot, freshsnap); + /* + * Handle the concurrent abort while fetching the catalog tuple during + * logical streaming of a transaction. + */ + HandleConcurrentAbort(); + return result; } @@ -545,6 +585,13 @@ systable_endscan(SysScanDesc sysscan) if (sysscan->snapshot) UnregisterSnapshot(sysscan->snapshot); + /* + * Reset the bsysscan flag at the end of the systable scan. See + * detailed comments in xact.c where these variables are declared. + */ + if (TransactionIdIsValid(CheckXidAlive)) + bsysscan = false; + pfree(sysscan); } @@ -643,6 +690,12 @@ systable_getnext_ordered(SysScanDesc sysscan, ScanDirection direction) if (htup && sysscan->iscan->xs_recheck) elog(ERROR, "system catalog scans with lossy index conditions are not implemented"); + /* + * Handle the concurrent abort while fetching the catalog tuple during + * logical streaming of a transaction. + */ + HandleConcurrentAbort(); + return htup; } diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c index 3afb63b1fe4..c6383197657 100644 --- a/src/backend/access/table/tableam.c +++ b/src/backend/access/table/tableam.c @@ -249,6 +249,14 @@ table_tuple_get_latest_tid(TableScanDesc scan, ItemPointer tid) const TableAmRoutine *tableam = rel->rd_tableam; /* + * We don't expect direct calls to table_tuple_get_latest_tid with valid + * CheckXidAlive for catalog or regular tables. See detailed comments in + * xact.c where these variables are declared. + */ + if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan)) + elog(ERROR, "unexpected table_tuple_get_latest_tid call during logical decoding"); + + /* * Since this can be called with user-supplied TID, don't trust the input * too much. */ diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index d4f7c29847f..727d6160359 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -83,6 +83,19 @@ bool XactDeferrable; int synchronous_commit = SYNCHRONOUS_COMMIT_ON; /* + * CheckXidAlive is a xid value pointing to a possibly ongoing (sub) + * transaction. Currently, it is used in logical decoding. It's possible + * that such transactions can get aborted while the decoding is ongoing in + * which case we skip decoding that particular transaction. To ensure that we + * check whether the CheckXidAlive is aborted after fetching the tuple from + * system tables. We also ensure that during logical decoding we never + * directly access the tableam or heap APIs because we are checking for the + * concurrent aborts only in systable_* APIs. + */ +TransactionId CheckXidAlive = InvalidTransactionId; +bool bsysscan = false; + +/* * When running as a parallel worker, we place only a single * TransactionStateData on the parallel worker's state stack, and the XID * reflected there will be that of the *innermost* currently-active @@ -2680,6 +2693,9 @@ AbortTransaction(void) /* Forget about any active REINDEX. */ ResetReindexState(s->nestingLevel); + /* Reset logical streaming state. */ + ResetLogicalStreamingState(); + /* If in parallel mode, clean up workers and exit parallel mode. */ if (IsInParallelMode()) { @@ -4982,6 +4998,9 @@ AbortSubTransaction(void) /* Forget about any active REINDEX. */ ResetReindexState(s->nestingLevel); + /* Reset logical streaming state. */ + ResetLogicalStreamingState(); + /* Exit from parallel mode, if necessary. */ if (IsInParallelMode()) { |