aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--contrib/test_decoding/expected/ondisk_startup.out15
-rw-r--r--contrib/test_decoding/specs/ondisk_startup.spec8
-rw-r--r--src/backend/replication/logical/decode.c3
-rw-r--r--src/backend/replication/logical/reorderbuffer.c2
-rw-r--r--src/backend/replication/logical/snapbuild.c418
-rw-r--r--src/include/replication/snapbuild.h25
6 files changed, 221 insertions, 250 deletions
diff --git a/contrib/test_decoding/expected/ondisk_startup.out b/contrib/test_decoding/expected/ondisk_startup.out
index 65115c830a4..c7b1f45b46b 100644
--- a/contrib/test_decoding/expected/ondisk_startup.out
+++ b/contrib/test_decoding/expected/ondisk_startup.out
@@ -1,21 +1,30 @@
Parsed test spec with 3 sessions
-starting permutation: s2txid s1init s3txid s2alter s2c s1insert s1checkpoint s1start s1insert s1alter s1insert s1start
-step s2txid: BEGIN ISOLATION LEVEL REPEATABLE READ; SELECT txid_current() IS NULL;
+starting permutation: s2b s2txid s1init s3b s3txid s2alter s2c s2b s2txid s3c s2c s1insert s1checkpoint s1start s1insert s1alter s1insert s1start
+step s2b: BEGIN;
+step s2txid: SELECT txid_current() IS NULL;
?column?
f
step s1init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); <waiting ...>
-step s3txid: BEGIN ISOLATION LEVEL REPEATABLE READ; SELECT txid_current() IS NULL;
+step s3b: BEGIN;
+step s3txid: SELECT txid_current() IS NULL;
?column?
f
step s2alter: ALTER TABLE do_write ADD COLUMN addedbys2 int;
step s2c: COMMIT;
+step s2b: BEGIN;
+step s2txid: SELECT txid_current() IS NULL;
+?column?
+
+f
+step s3c: COMMIT;
step s1init: <... completed>
?column?
init
+step s2c: COMMIT;
step s1insert: INSERT INTO do_write DEFAULT VALUES;
step s1checkpoint: CHECKPOINT;
step s1start: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false');
diff --git a/contrib/test_decoding/specs/ondisk_startup.spec b/contrib/test_decoding/specs/ondisk_startup.spec
index 82237056392..12c57a813da 100644
--- a/contrib/test_decoding/specs/ondisk_startup.spec
+++ b/contrib/test_decoding/specs/ondisk_startup.spec
@@ -24,7 +24,8 @@ step "s1alter" { ALTER TABLE do_write ADD COLUMN addedbys1 int; }
session "s2"
setup { SET synchronous_commit=on; }
-step "s2txid" { BEGIN ISOLATION LEVEL REPEATABLE READ; SELECT txid_current() IS NULL; }
+step "s2b" { BEGIN; }
+step "s2txid" { SELECT txid_current() IS NULL; }
step "s2alter" { ALTER TABLE do_write ADD COLUMN addedbys2 int; }
step "s2c" { COMMIT; }
@@ -32,7 +33,8 @@ step "s2c" { COMMIT; }
session "s3"
setup { SET synchronous_commit=on; }
-step "s3txid" { BEGIN ISOLATION LEVEL REPEATABLE READ; SELECT txid_current() IS NULL; }
+step "s3b" { BEGIN; }
+step "s3txid" { SELECT txid_current() IS NULL; }
step "s3c" { COMMIT; }
# Force usage of ondisk snapshot by starting and not finishing a
@@ -40,4 +42,4 @@ step "s3c" { COMMIT; }
# reached. In combination with a checkpoint forcing a snapshot to be
# written and a new restart point computed that'll lead to the usage
# of the snapshot.
-permutation "s2txid" "s1init" "s3txid" "s2alter" "s2c" "s1insert" "s1checkpoint" "s1start" "s1insert" "s1alter" "s1insert" "s1start"
+permutation "s2b" "s2txid" "s1init" "s3b" "s3txid" "s2alter" "s2c" "s2b" "s2txid" "s3c" "s2c" "s1insert" "s1checkpoint" "s1start" "s1insert" "s1alter" "s1insert" "s1start"
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 46cd5ba1f2d..4fabb880ca4 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -622,9 +622,6 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
{
int i;
- SnapBuildAbortTxn(ctx->snapshot_builder, buf->record->EndRecPtr, xid,
- parsed->nsubxacts, parsed->subxacts);
-
for (i = 0; i < parsed->nsubxacts; i++)
{
ReorderBufferAbort(ctx->reorder, parsed->subxacts[i],
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 38c088f23b4..10798aee443 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1768,7 +1768,7 @@ ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid)
if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
{
- elog(DEBUG1, "aborting old transaction %u", txn->xid);
+ elog(DEBUG2, "aborting old transaction %u", txn->xid);
/* remove potential on-disk data, and deallocate this tx */
ReorderBufferCleanupTXN(rb, txn);
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 9f0796a0ec9..8001f5b1f0d 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -56,23 +56,34 @@
*
*
* The snapbuild machinery is starting up in several stages, as illustrated
- * by the following graph:
+ * by the following graph describing the SnapBuild->state transitions:
+ *
* +-------------------------+
- * +----|SNAPBUILD_START |-------------+
+ * +----| START |-------------+
* | +-------------------------+ |
* | | |
* | | |
- * | running_xacts with running xacts |
+ * | running_xacts #1 |
* | | |
* | | |
* | v |
* | +-------------------------+ v
- * | |SNAPBUILD_FULL_SNAPSHOT |------------>|
+ * | | BUILDING_SNAPSHOT |------------>|
* | +-------------------------+ |
+ * | | |
+ * | | |
+ * | running_xacts #2, xacts from #1 finished |
+ * | | |
+ * | | |
+ * | v |
+ * | +-------------------------+ v
+ * | | FULL_SNAPSHOT |------------>|
+ * | +-------------------------+ |
+ * | | |
* running_xacts | saved snapshot
* with zero xacts | at running_xacts's lsn
* | | |
- * | all running toplevel TXNs finished |
+ * | running_xacts with xacts from #2 finished |
* | | |
* | v |
* | +-------------------------+ |
@@ -82,8 +93,8 @@
* Initially the machinery is in the START stage. When an xl_running_xacts
* record is read that is sufficiently new (above the safe xmin horizon),
* there's a state transition. If there were no running xacts when the
- * runnign_xacts record was generated, we'll directly go into CONSISTENT
- * state, otherwise we'll switch to the FULL_SNAPSHOT state. Having a full
+ * running_xacts record was generated, we'll directly go into CONSISTENT
+ * state, otherwise we'll switch to the BUILDING_SNAPSHOT state. Having a full
* snapshot means that all transactions that start henceforth can be decoded
* in their entirety, but transactions that started previously can't. In
* FULL_SNAPSHOT we'll switch into CONSISTENT once all those previously
@@ -183,26 +194,24 @@ struct SnapBuild
ReorderBuffer *reorder;
/*
- * Information about initially running transactions
- *
- * When we start building a snapshot there already may be transactions in
- * progress. Those are stored in running.xip. We don't have enough
- * information about those to decode their contents, so until they are
- * finished (xcnt=0) we cannot switch to a CONSISTENT state.
+ * Outdated: This struct isn't used for its original purpose anymore, but
+ * can't be removed / changed in a minor version, because it's stored
+ * on-disk.
*/
struct
{
/*
- * As long as running.xcnt all XIDs < running.xmin and > running.xmax
- * have to be checked whether they still are running.
+ * NB: This field is misused, until a major version can break on-disk
+ * compatibility. See SnapBuildNextPhaseAt() /
+ * SnapBuildStartNextPhaseAt().
*/
- TransactionId xmin;
- TransactionId xmax;
+ TransactionId was_xmin;
+ TransactionId was_xmax;
- size_t xcnt; /* number of used xip entries */
- size_t xcnt_space; /* allocated size of xip */
- TransactionId *xip; /* running xacts array, xidComparator-sorted */
- } running;
+ size_t was_xcnt; /* number of used xip entries */
+ size_t was_xcnt_space; /* allocated size of xip */
+ TransactionId *was_xip; /* running xacts array, xidComparator-sorted */
+ } was_running;
/*
* Array of transactions which could have catalog changes that committed
@@ -248,12 +257,6 @@ struct SnapBuild
static ResourceOwner SavedResourceOwnerDuringExport = NULL;
static bool ExportInProgress = false;
-/* transaction state manipulation functions */
-static void SnapBuildEndTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid);
-
-/* ->running manipulation */
-static bool SnapBuildTxnIsRunning(SnapBuild *builder, TransactionId xid);
-
/* ->committed manipulation */
static void SnapBuildPurgeCommittedTxn(SnapBuild *builder);
@@ -268,11 +271,39 @@ static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr
/* xlog reading helper functions for SnapBuildProcessRecord */
static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running);
+static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff);
/* serialization functions */
static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn);
static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn);
+/*
+ * Return TransactionId after which the next phase of initial snapshot
+ * building will happen.
+ */
+static inline TransactionId
+SnapBuildNextPhaseAt(SnapBuild *builder)
+{
+ /*
+ * For backward compatibility reasons this has to be stored in the wrongly
+ * named field. Will be fixed in next major version.
+ */
+ return builder->was_running.was_xmax;
+}
+
+/*
+ * Set TransactionId after which the next phase of initial snapshot building
+ * will happen.
+ */
+static inline void
+SnapBuildStartNextPhaseAt(SnapBuild *builder, TransactionId at)
+{
+ /*
+ * For backward compatibility reasons this has to be stored in the wrongly
+ * named field. Will be fixed in next major version.
+ */
+ builder->was_running.was_xmax = at;
+}
/*
* Allocate a new snapshot builder.
@@ -682,7 +713,7 @@ SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
* we got into the SNAPBUILD_FULL_SNAPSHOT state.
*/
if (builder->state < SNAPBUILD_CONSISTENT &&
- SnapBuildTxnIsRunning(builder, xid))
+ TransactionIdPrecedes(xid, SnapBuildNextPhaseAt(builder)))
return false;
/*
@@ -751,38 +782,6 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
}
/*
- * Check whether `xid` is currently 'running'.
- *
- * Running transactions in our parlance are transactions which we didn't
- * observe from the start so we can't properly decode their contents. They
- * only exist after we freshly started from an < CONSISTENT snapshot.
- */
-static bool
-SnapBuildTxnIsRunning(SnapBuild *builder, TransactionId xid)
-{
- Assert(builder->state < SNAPBUILD_CONSISTENT);
- Assert(TransactionIdIsNormal(builder->running.xmin));
- Assert(TransactionIdIsNormal(builder->running.xmax));
-
- if (builder->running.xcnt &&
- NormalTransactionIdFollows(xid, builder->running.xmin) &&
- NormalTransactionIdPrecedes(xid, builder->running.xmax))
- {
- TransactionId *search =
- bsearch(&xid, builder->running.xip, builder->running.xcnt_space,
- sizeof(TransactionId), xidComparator);
-
- if (search != NULL)
- {
- Assert(*search == xid);
- return true;
- }
- }
-
- return false;
-}
-
-/*
* Add a new Snapshot to all transactions we're decoding that currently are
* in-progress so they can see new catalog contents made by the transaction
* that just committed. This is necessary because those in-progress
@@ -904,63 +903,6 @@ SnapBuildPurgeCommittedTxn(SnapBuild *builder)
}
/*
- * Common logic for SnapBuildAbortTxn and SnapBuildCommitTxn dealing with
- * keeping track of the amount of running transactions.
- */
-static void
-SnapBuildEndTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
-{
- if (builder->state == SNAPBUILD_CONSISTENT)
- return;
-
- /*
- * NB: This handles subtransactions correctly even if we started from
- * suboverflowed xl_running_xacts because we only keep track of toplevel
- * transactions. Since the latter are always allocated before their
- * subxids and since they end at the same time it's sufficient to deal
- * with them here.
- */
- if (SnapBuildTxnIsRunning(builder, xid))
- {
- Assert(builder->running.xcnt > 0);
-
- if (!--builder->running.xcnt)
- {
- /*
- * None of the originally running transaction is running anymore,
- * so our incrementally built snapshot now is consistent.
- */
- ereport(LOG,
- (errmsg("logical decoding found consistent point at %X/%X",
- (uint32) (lsn >> 32), (uint32) lsn),
- errdetail("Transaction ID %u finished; no more running transactions.",
- xid)));
- builder->state = SNAPBUILD_CONSISTENT;
- }
- }
-}
-
-/*
- * Abort a transaction, throw away all state we kept.
- */
-void
-SnapBuildAbortTxn(SnapBuild *builder, XLogRecPtr lsn,
- TransactionId xid,
- int nsubxacts, TransactionId *subxacts)
-{
- int i;
-
- for (i = 0; i < nsubxacts; i++)
- {
- TransactionId subxid = subxacts[i];
-
- SnapBuildEndTxn(builder, lsn, subxid);
- }
-
- SnapBuildEndTxn(builder, lsn, xid);
-}
-
-/*
* Handle everything that needs to be done when a transaction commits
*/
void
@@ -1004,11 +946,6 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
TransactionId subxid = subxacts[nxact];
/*
- * make sure txn is not tracked in running txn's anymore, switch state
- */
- SnapBuildEndTxn(builder, lsn, subxid);
-
- /*
* If we're forcing timetravel we also need visibility information
* about subtransaction, so keep track of subtransaction's state.
*/
@@ -1037,12 +974,6 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
}
}
- /*
- * Make sure toplevel txn is not tracked in running txn's anymore, switch
- * state to consistent if possible.
- */
- SnapBuildEndTxn(builder, lsn, xid);
-
if (forced_timetravel)
{
elog(DEBUG2, "forced transaction %u to do timetravel.", xid);
@@ -1232,25 +1163,20 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
*
* a) There were no running transactions when the xl_running_xacts record
* was inserted, jump to CONSISTENT immediately. We might find such a
- * state we were waiting for b) or c).
- *
- * b) Wait for all toplevel transactions that were running to end. We
- * simply track the number of in-progress toplevel transactions and
- * lower it whenever one commits or aborts. When that number
- * (builder->running.xcnt) reaches zero, we can go from FULL_SNAPSHOT
- * to CONSISTENT.
- * NB: We need to search running.xip when seeing a transaction's end to
- * make sure it's a toplevel transaction and it's been one of the
- * initially running ones.
- * Interestingly, in contrast to HS, this allows us not to care about
- * subtransactions - and by extension suboverflowed xl_running_xacts -
- * at all.
+ * state while waiting on c)'s sub-states.
*
- * c) This (in a previous run) or another decoding slot serialized a
+ * b) This (in a previous run) or another decoding slot serialized a
* snapshot to disk that we can use. Can't use this method for the
* initial snapshot when slot is being created and needs full snapshot
* for export or direct use, as that snapshot will only contain catalog
* modifying transactions.
+ *
+ * c) First incrementally build a snapshot for catalog tuples
+ * (BUILDING_SNAPSHOT), that requires all, already in-progress,
+ * transactions to finish. Every transaction starting after that
+ * (FULL_SNAPSHOT state), has enough information to be decoded. But
+ * for older running transactions no viable snapshot exists yet, so
+ * CONSISTENT will only be reached once all of those have finished.
* ---
*/
@@ -1267,16 +1193,23 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
(uint32) (lsn >> 32), (uint32) lsn),
errdetail_internal("initial xmin horizon of %u vs the snapshot's %u",
builder->initial_xmin_horizon, running->oldestRunningXid)));
+
+
+ SnapBuildWaitSnapshot(running, builder->initial_xmin_horizon);
+
return true;
}
/*
* a) No transaction were running, we can jump to consistent.
*
+ * This is not affected by races around xl_running_xacts, because we can
+ * miss transaction commits, but currently not transactions starting.
+ *
* NB: We might have already started to incrementally assemble a snapshot,
* so we need to be careful to deal with that.
*/
- if (running->xcnt == 0)
+ if (running->oldestRunningXid == running->nextXid)
{
if (builder->start_decoding_at == InvalidXLogRecPtr ||
builder->start_decoding_at <= lsn)
@@ -1291,12 +1224,8 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
Assert(TransactionIdIsNormal(builder->xmin));
Assert(TransactionIdIsNormal(builder->xmax));
- /* no transactions running now */
- builder->running.xcnt = 0;
- builder->running.xmin = InvalidTransactionId;
- builder->running.xmax = InvalidTransactionId;
-
builder->state = SNAPBUILD_CONSISTENT;
+ SnapBuildStartNextPhaseAt(builder, InvalidTransactionId);
ereport(LOG,
(errmsg("logical decoding found consistent point at %X/%X",
@@ -1305,30 +1234,29 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
return false;
}
- /* c) valid on disk state and not building full snapshot */
+ /* b) valid on disk state and not building full snapshot */
else if (!builder->building_full_snapshot &&
SnapBuildRestore(builder, lsn))
{
/* there won't be any state to cleanup */
return false;
}
-
/*
- * b) first encounter of a useable xl_running_xacts record. If we had
- * found one earlier we would either track running transactions (i.e.
- * builder->running.xcnt != 0) or be consistent (this function wouldn't
- * get called).
+ * c) transition from START to BUILDING_SNAPSHOT.
+ *
+ * In START state, and a xl_running_xacts record with running xacts is
+ * encountered. In that case, switch to BUILDING_SNAPSHOT state, and
+ * record xl_running_xacts->nextXid. Once all running xacts have finished
+ * (i.e. they're all >= nextXid), we have a complete catalog snapshot. It
+ * might look that we could use xl_running_xact's ->xids information to
+ * get there quicker, but that is problematic because transactions marked
+ * as running, might already have inserted their commit record - it's
+ * infeasible to change that with locking.
*/
- else if (!builder->running.xcnt)
+ else if (builder->state == SNAPBUILD_START)
{
- int off;
-
- /*
- * We only care about toplevel xids as those are the ones we
- * definitely see in the wal stream. As snapbuild.c tracks committed
- * instead of running transactions we don't need to know anything
- * about uncommitted subtransactions.
- */
+ builder->state = SNAPBUILD_BUILDING_SNAPSHOT;
+ SnapBuildStartNextPhaseAt(builder, running->nextXid);
/*
* Start with an xmin/xmax that's correct for future, when all the
@@ -1342,59 +1270,57 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
Assert(TransactionIdIsNormal(builder->xmin));
Assert(TransactionIdIsNormal(builder->xmax));
- builder->running.xcnt = running->xcnt;
- builder->running.xcnt_space = running->xcnt;
- builder->running.xip =
- MemoryContextAlloc(builder->context,
- builder->running.xcnt * sizeof(TransactionId));
- memcpy(builder->running.xip, running->xids,
- builder->running.xcnt * sizeof(TransactionId));
-
- /* sort so we can do a binary search */
- qsort(builder->running.xip, builder->running.xcnt,
- sizeof(TransactionId), xidComparator);
-
- builder->running.xmin = builder->running.xip[0];
- builder->running.xmax = builder->running.xip[running->xcnt - 1];
-
- /* makes comparisons cheaper later */
- TransactionIdRetreat(builder->running.xmin);
- TransactionIdAdvance(builder->running.xmax);
-
- builder->state = SNAPBUILD_FULL_SNAPSHOT;
-
ereport(LOG,
(errmsg("logical decoding found initial starting point at %X/%X",
(uint32) (lsn >> 32), (uint32) lsn),
- errdetail_plural("%u transaction needs to finish.",
- "%u transactions need to finish.",
- builder->running.xcnt,
- (uint32) builder->running.xcnt)));
+ errdetail("Waiting for transactions (approximately %d) older than %u to end.",
+ running->xcnt, running->nextXid)));
- /*
- * Iterate through all xids, wait for them to finish.
- *
- * This isn't required for the correctness of decoding, but to allow
- * isolationtester to notice that we're currently waiting for
- * something.
- */
- for (off = 0; off < builder->running.xcnt; off++)
- {
- TransactionId xid = builder->running.xip[off];
+ SnapBuildWaitSnapshot(running, running->nextXid);
+ }
+ /*
+ * c) transition from BUILDING_SNAPSHOT to FULL_SNAPSHOT.
+ *
+ * In BUILDING_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid
+ * is >= than nextXid from when we switched to BUILDING_SNAPSHOT. This
+ * means all transactions starting afterwards have enough information to
+ * be decoded. Switch to FULL_SNAPSHOT.
+ */
+ else if (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
+ TransactionIdPrecedesOrEquals(SnapBuildNextPhaseAt(builder),
+ running->oldestRunningXid))
+ {
+ builder->state = SNAPBUILD_FULL_SNAPSHOT;
+ SnapBuildStartNextPhaseAt(builder, running->nextXid);
- /*
- * Upper layers should prevent that we ever need to wait on
- * ourselves. Check anyway, since failing to do so would either
- * result in an endless wait or an Assert() failure.
- */
- if (TransactionIdIsCurrentTransactionId(xid))
- elog(ERROR, "waiting for ourselves");
+ ereport(LOG,
+ (errmsg("logical decoding found initial consistent point at %X/%X",
+ (uint32) (lsn >> 32), (uint32) lsn),
+ errdetail("Waiting for transactions (approximately %d) older than %u to end.",
+ running->xcnt, running->nextXid)));
- XactLockTableWait(xid, NULL, NULL, XLTW_None);
- }
+ SnapBuildWaitSnapshot(running, running->nextXid);
+ }
+ /*
+ * c) transition from FULL_SNAPSHOT to CONSISTENT.
+ *
+ * In FULL_SNAPSHOT state (see d) ), and this xl_running_xacts'
+ * oldestRunningXid is >= than nextXid from when we switched to
+ * FULL_SNAPSHOT. This means all transactions that are currently in
+ * progress have a catalog snapshot, and all their changes have been
+ * collected. Switch to CONSISTENT.
+ */
+ else if (builder->state == SNAPBUILD_FULL_SNAPSHOT &&
+ TransactionIdPrecedesOrEquals(SnapBuildNextPhaseAt(builder),
+ running->oldestRunningXid))
+ {
+ builder->state = SNAPBUILD_CONSISTENT;
+ SnapBuildStartNextPhaseAt(builder, InvalidTransactionId);
- /* nothing could have built up so far, so don't perform cleanup */
- return false;
+ ereport(LOG,
+ (errmsg("logical decoding found consistent point at %X/%X",
+ (uint32) (lsn >> 32), (uint32) lsn),
+ errdetail("There are no old transactions anymore.")));
}
/*
@@ -1403,8 +1329,54 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
* records so incremental cleanup can be performed.
*/
return true;
+
}
+/* ---
+ * Iterate through xids in record, wait for all older than the cutoff to
+ * finish. Then, if possible, log a new xl_running_xacts record.
+ *
+ * This isn't required for the correctness of decoding, but to:
+ * a) allow isolationtester to notice that we're currently waiting for
+ * something.
+ * b) log a new xl_running_xacts record where it'd be helpful, without having
+ * to write for bgwriter or checkpointer.
+ * ---
+ */
+static void
+SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
+{
+ int off;
+
+ for (off = 0; off < running->xcnt; off++)
+ {
+ TransactionId xid = running->xids[off];
+
+ /*
+ * Upper layers should prevent that we ever need to wait on
+ * ourselves. Check anyway, since failing to do so would either
+ * result in an endless wait or an Assert() failure.
+ */
+ if (TransactionIdIsCurrentTransactionId(xid))
+ elog(ERROR, "waiting for ourselves");
+
+ if (TransactionIdFollows(xid, cutoff))
+ continue;
+
+ XactLockTableWait(xid, NULL, NULL, XLTW_None);
+ }
+
+ /*
+ * All transactions we needed to finish finished - try to ensure there is
+ * another xl_running_xacts record in a timely manner, without having to
+ * write for bgwriter or checkpointer to log one. During recovery we
+ * can't enforce that, so we'll have to wait.
+ */
+ if (!RecoveryInProgress())
+ {
+ LogStandbySnapshot();
+ }
+}
/* -----------------------------------
* Snapshot serialization support
@@ -1554,7 +1526,6 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
errmsg("could not remove file \"%s\": %m", path)));
needed_length = sizeof(SnapBuildOnDisk) +
- sizeof(TransactionId) * builder->running.xcnt_space +
sizeof(TransactionId) * builder->committed.xcnt;
ondisk_c = MemoryContextAllocZero(builder->context, needed_length);
@@ -1573,18 +1544,14 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
ondisk->builder.context = NULL;
ondisk->builder.snapshot = NULL;
ondisk->builder.reorder = NULL;
- ondisk->builder.running.xip = NULL;
ondisk->builder.committed.xip = NULL;
COMP_CRC32C(ondisk->checksum,
&ondisk->builder,
sizeof(SnapBuild));
- /* copy running xacts */
- sz = sizeof(TransactionId) * builder->running.xcnt_space;
- memcpy(ondisk_c, builder->running.xip, sz);
- COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
- ondisk_c += sz;
+ /* there shouldn't be any running xacts */
+ Assert(builder->was_running.was_xcnt == 0);
/* copy committed xacts */
sz = sizeof(TransactionId) * builder->committed.xcnt;
@@ -1736,10 +1703,11 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
}
COMP_CRC32C(checksum, &ondisk.builder, sizeof(SnapBuild));
- /* restore running xacts information */
- sz = sizeof(TransactionId) * ondisk.builder.running.xcnt_space;
- ondisk.builder.running.xip = MemoryContextAllocZero(builder->context, sz);
- readBytes = read(fd, ondisk.builder.running.xip, sz);
+ /* restore running xacts (dead, but kept for backward compat) */
+ sz = sizeof(TransactionId) * ondisk.builder.was_running.was_xcnt_space;
+ ondisk.builder.was_running.was_xip =
+ MemoryContextAllocZero(builder->context, sz);
+ readBytes = read(fd, ondisk.builder.was_running.was_xip, sz);
if (readBytes != sz)
{
CloseTransientFile(fd);
@@ -1748,7 +1716,7 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
errmsg("could not read file \"%s\", read %d of %d: %m",
path, readBytes, (int) sz)));
}
- COMP_CRC32C(checksum, ondisk.builder.running.xip, sz);
+ COMP_CRC32C(checksum, ondisk.builder.was_running.was_xip, sz);
/* restore committed xacts information */
sz = sizeof(TransactionId) * ondisk.builder.committed.xcnt;
@@ -1812,12 +1780,6 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
}
ondisk.builder.committed.xip = NULL;
- builder->running.xcnt = ondisk.builder.running.xcnt;
- if (builder->running.xip)
- pfree(builder->running.xip);
- builder->running.xcnt_space = ondisk.builder.running.xcnt_space;
- builder->running.xip = ondisk.builder.running.xip;
-
/* our snapshot is not interesting anymore, build a new one */
if (builder->snapshot != NULL)
{
@@ -1837,8 +1799,6 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
return true;
snapshot_not_interesting:
- if (ondisk.builder.running.xip != NULL)
- pfree(ondisk.builder.running.xip);
if (ondisk.builder.committed.xip != NULL)
pfree(ondisk.builder.committed.xip);
return false;
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
index 1df4f376344..80e91c9c254 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -20,24 +20,30 @@ typedef enum
/*
* Initial state, we can't do much yet.
*/
- SNAPBUILD_START,
+ SNAPBUILD_START = -1,
+
+ /*
+ * Collecting committed transactions, to build the initial catalog
+ * snapshot.
+ */
+ SNAPBUILD_BUILDING_SNAPSHOT = 0,
/*
* We have collected enough information to decode tuples in transactions
* that started after this.
*
* Once we reached this we start to collect changes. We cannot apply them
- * yet because the might be based on transactions that were still running
- * when we reached them yet.
+ * yet, because they might be based on transactions that were still running
+ * when FULL_SNAPSHOT was reached.
*/
- SNAPBUILD_FULL_SNAPSHOT,
+ SNAPBUILD_FULL_SNAPSHOT = 1,
/*
- * Found a point after hitting built_full_snapshot where all transactions
- * that were running at that point finished. Till we reach that we hold
- * off calling any commit callbacks.
+ * Found a point after SNAPBUILD_FULL_SNAPSHOT where all transactions that
+ * were running at that point finished. Till we reach that we hold off
+ * calling any commit callbacks.
*/
- SNAPBUILD_CONSISTENT
+ SNAPBUILD_CONSISTENT = 2
} SnapBuildState;
/* forward declare so we don't have to expose the struct to the public */
@@ -72,9 +78,6 @@ extern bool SnapBuildXactNeedsSkip(SnapBuild *snapstate, XLogRecPtr ptr);
extern void SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn,
TransactionId xid, int nsubxacts,
TransactionId *subxacts);
-extern void SnapBuildAbortTxn(SnapBuild *builder, XLogRecPtr lsn,
- TransactionId xid, int nsubxacts,
- TransactionId *subxacts);
extern bool SnapBuildProcessChange(SnapBuild *builder, TransactionId xid,
XLogRecPtr lsn);
extern void SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,