aboutsummaryrefslogtreecommitdiff
path: root/src/backend/utils/time/snapmgr.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/utils/time/snapmgr.c')
-rw-r--r--src/backend/utils/time/snapmgr.c210
1 files changed, 206 insertions, 4 deletions
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c
index 7cfa0cf848e..a2cb4a037ff 100644
--- a/src/backend/utils/time/snapmgr.c
+++ b/src/backend/utils/time/snapmgr.c
@@ -157,6 +157,22 @@ static Snapshot CopySnapshot(Snapshot snapshot);
static void FreeSnapshot(Snapshot snapshot);
static void SnapshotResetXmin(void);
+/*
+ * Snapshot fields to be serialized.
+ *
+ * Only these fields need to be sent to the cooperating backend; the
+ * remaining ones can (and must) set by the receiver upon restore.
+ */
+typedef struct SerializedSnapshotData
+{
+ TransactionId xmin;
+ TransactionId xmax;
+ uint32 xcnt;
+ int32 subxcnt;
+ bool suboverflowed;
+ bool takenDuringRecovery;
+ CommandId curcid;
+} SerializedSnapshotData;
/*
* GetTransactionSnapshot
@@ -188,6 +204,10 @@ GetTransactionSnapshot(void)
Assert(pairingheap_is_empty(&RegisteredSnapshots));
Assert(FirstXactSnapshot == NULL);
+ if (IsInParallelMode())
+ elog(ERROR,
+ "cannot take query snapshot during a parallel operation");
+
/*
* In transaction-snapshot mode, the first snapshot must live until
* end of xact regardless of what the caller does with it, so we must
@@ -239,6 +259,14 @@ Snapshot
GetLatestSnapshot(void)
{
/*
+ * We might be able to relax this, but nothing that could otherwise work
+ * needs it.
+ */
+ if (IsInParallelMode())
+ elog(ERROR,
+ "cannot update SecondarySnapshot during a parallel operation");
+
+ /*
* So far there are no cases requiring support for GetLatestSnapshot()
* during logical decoding, but it wouldn't be hard to add if required.
*/
@@ -347,7 +375,8 @@ SnapshotSetCommandId(CommandId curcid)
* in GetTransactionSnapshot.
*/
static void
-SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid)
+SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid,
+ PGPROC *sourceproc)
{
/* Caller should have checked this already */
Assert(!FirstSnapshotSet);
@@ -394,7 +423,15 @@ SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid)
* doesn't seem worth contorting the logic here to avoid two calls,
* especially since it's not clear that predicate.c *must* do this.
*/
- if (!ProcArrayInstallImportedXmin(CurrentSnapshot->xmin, sourcexid))
+ if (sourceproc != NULL)
+ {
+ if (!ProcArrayInstallRestoredXmin(CurrentSnapshot->xmin, sourceproc))
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("could not import the requested snapshot"),
+ errdetail("The source transaction is not running anymore.")));
+ }
+ else if (!ProcArrayInstallImportedXmin(CurrentSnapshot->xmin, sourcexid))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("could not import the requested snapshot"),
@@ -550,11 +587,24 @@ PushCopiedSnapshot(Snapshot snapshot)
void
UpdateActiveSnapshotCommandId(void)
{
+ CommandId save_curcid, curcid;
Assert(ActiveSnapshot != NULL);
Assert(ActiveSnapshot->as_snap->active_count == 1);
Assert(ActiveSnapshot->as_snap->regd_count == 0);
- ActiveSnapshot->as_snap->curcid = GetCurrentCommandId(false);
+ /*
+ * Don't allow modification of the active snapshot during parallel
+ * operation. We share the snapshot to worker backends at beginning of
+ * parallel operation, so any change to snapshot can lead to
+ * inconsistencies. We have other defenses against
+ * CommandCounterIncrement, but there are a few places that call this
+ * directly, so we put an additional guard here.
+ */
+ save_curcid = ActiveSnapshot->as_snap->curcid;
+ curcid = GetCurrentCommandId(false);
+ if (IsInParallelMode() && save_curcid != curcid)
+ elog(ERROR, "cannot modify commandid in active snapshot during a parallel operation");
+ ActiveSnapshot->as_snap->curcid = curcid;
}
/*
@@ -1289,7 +1339,7 @@ ImportSnapshot(const char *idstr)
errmsg("cannot import a snapshot from a different database")));
/* OK, install the snapshot */
- SetTransactionSnapshot(&snapshot, src_xid);
+ SetTransactionSnapshot(&snapshot, src_xid, NULL);
}
/*
@@ -1393,3 +1443,155 @@ HistoricSnapshotGetTupleCids(void)
Assert(HistoricSnapshotActive());
return tuplecid_data;
}
+
+/*
+ * EstimateSnapshotSpace
+ * Returns the size need to store the given snapshot.
+ *
+ * We are exporting only required fields from the Snapshot, stored in
+ * SerializedSnapshotData.
+ */
+Size
+EstimateSnapshotSpace(Snapshot snap)
+{
+ Size size;
+
+ Assert(snap != InvalidSnapshot);
+ Assert(snap->satisfies == HeapTupleSatisfiesMVCC);
+
+ /* We allocate any XID arrays needed in the same palloc block. */
+ size = add_size(sizeof(SerializedSnapshotData),
+ mul_size(snap->xcnt, sizeof(TransactionId)));
+ if (snap->subxcnt > 0 &&
+ (!snap->suboverflowed || snap->takenDuringRecovery))
+ size = add_size(size,
+ mul_size(snap->subxcnt, sizeof(TransactionId)));
+
+ return size;
+}
+
+/*
+ * SerializeSnapshot
+ * Dumps the serialized snapshot (extracted from given snapshot) onto the
+ * memory location at start_address.
+ */
+void
+SerializeSnapshot(Snapshot snapshot, char *start_address)
+{
+ SerializedSnapshotData *serialized_snapshot;
+
+ Assert(snapshot->xcnt >= 0);
+ Assert(snapshot->subxcnt >= 0);
+
+ serialized_snapshot = (SerializedSnapshotData *) start_address;
+
+ /* Copy all required fields */
+ serialized_snapshot->xmin = snapshot->xmin;
+ serialized_snapshot->xmax = snapshot->xmax;
+ serialized_snapshot->xcnt = snapshot->xcnt;
+ serialized_snapshot->subxcnt = snapshot->subxcnt;
+ serialized_snapshot->suboverflowed = snapshot->suboverflowed;
+ serialized_snapshot->takenDuringRecovery = snapshot->takenDuringRecovery;
+ serialized_snapshot->curcid = snapshot->curcid;
+
+ /*
+ * Ignore the SubXID array if it has overflowed, unless the snapshot
+ * was taken during recovey - in that case, top-level XIDs are in subxip
+ * as well, and we mustn't lose them.
+ */
+ if (serialized_snapshot->suboverflowed && !snapshot->takenDuringRecovery)
+ serialized_snapshot->subxcnt = 0;
+
+ /* Copy XID array */
+ if (snapshot->xcnt > 0)
+ memcpy((TransactionId *) (serialized_snapshot + 1),
+ snapshot->xip, snapshot->xcnt * sizeof(TransactionId));
+
+ /*
+ * Copy SubXID array. Don't bother to copy it if it had overflowed,
+ * though, because it's not used anywhere in that case. Except if it's a
+ * snapshot taken during recovery; all the top-level XIDs are in subxip as
+ * well in that case, so we mustn't lose them.
+ */
+ if (snapshot->subxcnt > 0)
+ {
+ Size subxipoff = sizeof(SerializedSnapshotData) +
+ snapshot->xcnt * sizeof(TransactionId);
+
+ memcpy((TransactionId *) ((char *) serialized_snapshot + subxipoff),
+ snapshot->subxip, snapshot->subxcnt * sizeof(TransactionId));
+ }
+}
+
+/*
+ * RestoreSnapshot
+ * Restore a serialized snapshot from the specified address.
+ *
+ * The copy is palloc'd in TopTransactionContext and has initial refcounts set
+ * to 0. The returned snapshot has the copied flag set.
+ */
+Snapshot
+RestoreSnapshot(char *start_address)
+{
+ SerializedSnapshotData *serialized_snapshot;
+ Size size;
+ Snapshot snapshot;
+ TransactionId *serialized_xids;
+
+ serialized_snapshot = (SerializedSnapshotData *) start_address;
+ serialized_xids = (TransactionId *)
+ (start_address + sizeof(SerializedSnapshotData));
+
+ /* We allocate any XID arrays needed in the same palloc block. */
+ size = sizeof(SnapshotData)
+ + serialized_snapshot->xcnt * sizeof(TransactionId)
+ + serialized_snapshot->subxcnt * sizeof(TransactionId);
+
+ /* Copy all required fields */
+ snapshot = (Snapshot) MemoryContextAlloc(TopTransactionContext, size);
+ snapshot->satisfies = HeapTupleSatisfiesMVCC;
+ snapshot->xmin = serialized_snapshot->xmin;
+ snapshot->xmax = serialized_snapshot->xmax;
+ snapshot->xip = NULL;
+ snapshot->xcnt = serialized_snapshot->xcnt;
+ snapshot->subxip = NULL;
+ snapshot->subxcnt = serialized_snapshot->subxcnt;
+ snapshot->suboverflowed = serialized_snapshot->suboverflowed;
+ snapshot->takenDuringRecovery = serialized_snapshot->takenDuringRecovery;
+ snapshot->curcid = serialized_snapshot->curcid;
+
+ /* Copy XIDs, if present. */
+ if (serialized_snapshot->xcnt > 0)
+ {
+ snapshot->xip = (TransactionId *) (snapshot + 1);
+ memcpy(snapshot->xip, serialized_xids,
+ serialized_snapshot->xcnt * sizeof(TransactionId));
+ }
+
+ /* Copy SubXIDs, if present. */
+ if (serialized_snapshot->subxcnt > 0)
+ {
+ snapshot->subxip = snapshot->xip + serialized_snapshot->xcnt;
+ memcpy(snapshot->subxip, serialized_xids + serialized_snapshot->xcnt,
+ serialized_snapshot->subxcnt * sizeof(TransactionId));
+ }
+
+ /* Set the copied flag so that the caller will set refcounts correctly. */
+ snapshot->regd_count = 0;
+ snapshot->active_count = 0;
+ snapshot->copied = true;
+
+ return snapshot;
+}
+
+/*
+ * Install a restored snapshot as the transaction snapshot.
+ *
+ * The second argument is of type void * so that snapmgr.h need not include
+ * the declaration for PGPROC.
+ */
+void
+RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc)
+{
+ SetTransactionSnapshot(snapshot, InvalidTransactionId, master_pgproc);
+}