diff options
Diffstat (limited to 'src/backend/utils/time/snapmgr.c')
-rw-r--r-- | src/backend/utils/time/snapmgr.c | 210 |
1 files changed, 206 insertions, 4 deletions
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c index 7cfa0cf848e..a2cb4a037ff 100644 --- a/src/backend/utils/time/snapmgr.c +++ b/src/backend/utils/time/snapmgr.c @@ -157,6 +157,22 @@ static Snapshot CopySnapshot(Snapshot snapshot); static void FreeSnapshot(Snapshot snapshot); static void SnapshotResetXmin(void); +/* + * Snapshot fields to be serialized. + * + * Only these fields need to be sent to the cooperating backend; the + * remaining ones can (and must) set by the receiver upon restore. + */ +typedef struct SerializedSnapshotData +{ + TransactionId xmin; + TransactionId xmax; + uint32 xcnt; + int32 subxcnt; + bool suboverflowed; + bool takenDuringRecovery; + CommandId curcid; +} SerializedSnapshotData; /* * GetTransactionSnapshot @@ -188,6 +204,10 @@ GetTransactionSnapshot(void) Assert(pairingheap_is_empty(&RegisteredSnapshots)); Assert(FirstXactSnapshot == NULL); + if (IsInParallelMode()) + elog(ERROR, + "cannot take query snapshot during a parallel operation"); + /* * In transaction-snapshot mode, the first snapshot must live until * end of xact regardless of what the caller does with it, so we must @@ -239,6 +259,14 @@ Snapshot GetLatestSnapshot(void) { /* + * We might be able to relax this, but nothing that could otherwise work + * needs it. + */ + if (IsInParallelMode()) + elog(ERROR, + "cannot update SecondarySnapshot during a parallel operation"); + + /* * So far there are no cases requiring support for GetLatestSnapshot() * during logical decoding, but it wouldn't be hard to add if required. */ @@ -347,7 +375,8 @@ SnapshotSetCommandId(CommandId curcid) * in GetTransactionSnapshot. */ static void -SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid) +SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid, + PGPROC *sourceproc) { /* Caller should have checked this already */ Assert(!FirstSnapshotSet); @@ -394,7 +423,15 @@ SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid) * doesn't seem worth contorting the logic here to avoid two calls, * especially since it's not clear that predicate.c *must* do this. */ - if (!ProcArrayInstallImportedXmin(CurrentSnapshot->xmin, sourcexid)) + if (sourceproc != NULL) + { + if (!ProcArrayInstallRestoredXmin(CurrentSnapshot->xmin, sourceproc)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not import the requested snapshot"), + errdetail("The source transaction is not running anymore."))); + } + else if (!ProcArrayInstallImportedXmin(CurrentSnapshot->xmin, sourcexid)) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("could not import the requested snapshot"), @@ -550,11 +587,24 @@ PushCopiedSnapshot(Snapshot snapshot) void UpdateActiveSnapshotCommandId(void) { + CommandId save_curcid, curcid; Assert(ActiveSnapshot != NULL); Assert(ActiveSnapshot->as_snap->active_count == 1); Assert(ActiveSnapshot->as_snap->regd_count == 0); - ActiveSnapshot->as_snap->curcid = GetCurrentCommandId(false); + /* + * Don't allow modification of the active snapshot during parallel + * operation. We share the snapshot to worker backends at beginning of + * parallel operation, so any change to snapshot can lead to + * inconsistencies. We have other defenses against + * CommandCounterIncrement, but there are a few places that call this + * directly, so we put an additional guard here. + */ + save_curcid = ActiveSnapshot->as_snap->curcid; + curcid = GetCurrentCommandId(false); + if (IsInParallelMode() && save_curcid != curcid) + elog(ERROR, "cannot modify commandid in active snapshot during a parallel operation"); + ActiveSnapshot->as_snap->curcid = curcid; } /* @@ -1289,7 +1339,7 @@ ImportSnapshot(const char *idstr) errmsg("cannot import a snapshot from a different database"))); /* OK, install the snapshot */ - SetTransactionSnapshot(&snapshot, src_xid); + SetTransactionSnapshot(&snapshot, src_xid, NULL); } /* @@ -1393,3 +1443,155 @@ HistoricSnapshotGetTupleCids(void) Assert(HistoricSnapshotActive()); return tuplecid_data; } + +/* + * EstimateSnapshotSpace + * Returns the size need to store the given snapshot. + * + * We are exporting only required fields from the Snapshot, stored in + * SerializedSnapshotData. + */ +Size +EstimateSnapshotSpace(Snapshot snap) +{ + Size size; + + Assert(snap != InvalidSnapshot); + Assert(snap->satisfies == HeapTupleSatisfiesMVCC); + + /* We allocate any XID arrays needed in the same palloc block. */ + size = add_size(sizeof(SerializedSnapshotData), + mul_size(snap->xcnt, sizeof(TransactionId))); + if (snap->subxcnt > 0 && + (!snap->suboverflowed || snap->takenDuringRecovery)) + size = add_size(size, + mul_size(snap->subxcnt, sizeof(TransactionId))); + + return size; +} + +/* + * SerializeSnapshot + * Dumps the serialized snapshot (extracted from given snapshot) onto the + * memory location at start_address. + */ +void +SerializeSnapshot(Snapshot snapshot, char *start_address) +{ + SerializedSnapshotData *serialized_snapshot; + + Assert(snapshot->xcnt >= 0); + Assert(snapshot->subxcnt >= 0); + + serialized_snapshot = (SerializedSnapshotData *) start_address; + + /* Copy all required fields */ + serialized_snapshot->xmin = snapshot->xmin; + serialized_snapshot->xmax = snapshot->xmax; + serialized_snapshot->xcnt = snapshot->xcnt; + serialized_snapshot->subxcnt = snapshot->subxcnt; + serialized_snapshot->suboverflowed = snapshot->suboverflowed; + serialized_snapshot->takenDuringRecovery = snapshot->takenDuringRecovery; + serialized_snapshot->curcid = snapshot->curcid; + + /* + * Ignore the SubXID array if it has overflowed, unless the snapshot + * was taken during recovey - in that case, top-level XIDs are in subxip + * as well, and we mustn't lose them. + */ + if (serialized_snapshot->suboverflowed && !snapshot->takenDuringRecovery) + serialized_snapshot->subxcnt = 0; + + /* Copy XID array */ + if (snapshot->xcnt > 0) + memcpy((TransactionId *) (serialized_snapshot + 1), + snapshot->xip, snapshot->xcnt * sizeof(TransactionId)); + + /* + * Copy SubXID array. Don't bother to copy it if it had overflowed, + * though, because it's not used anywhere in that case. Except if it's a + * snapshot taken during recovery; all the top-level XIDs are in subxip as + * well in that case, so we mustn't lose them. + */ + if (snapshot->subxcnt > 0) + { + Size subxipoff = sizeof(SerializedSnapshotData) + + snapshot->xcnt * sizeof(TransactionId); + + memcpy((TransactionId *) ((char *) serialized_snapshot + subxipoff), + snapshot->subxip, snapshot->subxcnt * sizeof(TransactionId)); + } +} + +/* + * RestoreSnapshot + * Restore a serialized snapshot from the specified address. + * + * The copy is palloc'd in TopTransactionContext and has initial refcounts set + * to 0. The returned snapshot has the copied flag set. + */ +Snapshot +RestoreSnapshot(char *start_address) +{ + SerializedSnapshotData *serialized_snapshot; + Size size; + Snapshot snapshot; + TransactionId *serialized_xids; + + serialized_snapshot = (SerializedSnapshotData *) start_address; + serialized_xids = (TransactionId *) + (start_address + sizeof(SerializedSnapshotData)); + + /* We allocate any XID arrays needed in the same palloc block. */ + size = sizeof(SnapshotData) + + serialized_snapshot->xcnt * sizeof(TransactionId) + + serialized_snapshot->subxcnt * sizeof(TransactionId); + + /* Copy all required fields */ + snapshot = (Snapshot) MemoryContextAlloc(TopTransactionContext, size); + snapshot->satisfies = HeapTupleSatisfiesMVCC; + snapshot->xmin = serialized_snapshot->xmin; + snapshot->xmax = serialized_snapshot->xmax; + snapshot->xip = NULL; + snapshot->xcnt = serialized_snapshot->xcnt; + snapshot->subxip = NULL; + snapshot->subxcnt = serialized_snapshot->subxcnt; + snapshot->suboverflowed = serialized_snapshot->suboverflowed; + snapshot->takenDuringRecovery = serialized_snapshot->takenDuringRecovery; + snapshot->curcid = serialized_snapshot->curcid; + + /* Copy XIDs, if present. */ + if (serialized_snapshot->xcnt > 0) + { + snapshot->xip = (TransactionId *) (snapshot + 1); + memcpy(snapshot->xip, serialized_xids, + serialized_snapshot->xcnt * sizeof(TransactionId)); + } + + /* Copy SubXIDs, if present. */ + if (serialized_snapshot->subxcnt > 0) + { + snapshot->subxip = snapshot->xip + serialized_snapshot->xcnt; + memcpy(snapshot->subxip, serialized_xids + serialized_snapshot->xcnt, + serialized_snapshot->subxcnt * sizeof(TransactionId)); + } + + /* Set the copied flag so that the caller will set refcounts correctly. */ + snapshot->regd_count = 0; + snapshot->active_count = 0; + snapshot->copied = true; + + return snapshot; +} + +/* + * Install a restored snapshot as the transaction snapshot. + * + * The second argument is of type void * so that snapmgr.h need not include + * the declaration for PGPROC. + */ +void +RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc) +{ + SetTransactionSnapshot(snapshot, InvalidTransactionId, master_pgproc); +} |