aboutsummaryrefslogtreecommitdiff
path: root/src/backend/utils
diff options
context:
space:
mode:
authorRobert Haas <rhaas@postgresql.org>2015-04-30 15:02:14 -0400
committerRobert Haas <rhaas@postgresql.org>2015-04-30 15:02:14 -0400
commit924bcf4f16d54c55310b28f77686608684734f42 (patch)
tree79f35bf679c6a68dbe725cc90248a553f2b9e019 /src/backend/utils
parent669c7d20e6374850593cb430d332e11a3992bbcf (diff)
downloadpostgresql-924bcf4f16d54c55310b28f77686608684734f42.tar.gz
postgresql-924bcf4f16d54c55310b28f77686608684734f42.zip
Create an infrastructure for parallel computation in PostgreSQL.
This does four basic things. First, it provides convenience routines to coordinate the startup and shutdown of parallel workers. Second, it synchronizes various pieces of state (e.g. GUCs, combo CID mappings, transaction snapshot) from the parallel group leader to the worker processes. Third, it prohibits various operations that would result in unsafe changes to that state while parallelism is active. Finally, it propagates events that would result in an ErrorResponse, NoticeResponse, or NotifyResponse message being sent to the client from the parallel workers back to the master, from which they can then be sent on to the client. Robert Haas, Amit Kapila, Noah Misch, Rushabh Lathia, Jeevan Chalke. Suggestions and review from Andres Freund, Heikki Linnakangas, Noah Misch, Simon Riggs, Euler Taveira, and Jim Nasby.
Diffstat (limited to 'src/backend/utils')
-rw-r--r--src/backend/utils/adt/lockfuncs.c30
-rw-r--r--src/backend/utils/fmgr/dfmgr.c54
-rw-r--r--src/backend/utils/misc/guc.c23
-rw-r--r--src/backend/utils/time/combocid.c74
-rw-r--r--src/backend/utils/time/snapmgr.c210
5 files changed, 387 insertions, 4 deletions
diff --git a/src/backend/utils/adt/lockfuncs.c b/src/backend/utils/adt/lockfuncs.c
index a1967b69632..491824dd6bf 100644
--- a/src/backend/utils/adt/lockfuncs.c
+++ b/src/backend/utils/adt/lockfuncs.c
@@ -13,6 +13,7 @@
#include "postgres.h"
#include "access/htup_details.h"
+#include "access/xact.h"
#include "catalog/pg_type.h"
#include "funcapi.h"
#include "miscadmin.h"
@@ -411,6 +412,15 @@ pg_lock_status(PG_FUNCTION_ARGS)
#define SET_LOCKTAG_INT32(tag, key1, key2) \
SET_LOCKTAG_ADVISORY(tag, MyDatabaseId, key1, key2, 2)
+static void
+PreventAdvisoryLocksInParallelMode(void)
+{
+ if (IsInParallelMode())
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+ errmsg("cannot use advisory locks during a parallel operation")));
+}
+
/*
* pg_advisory_lock(int8) - acquire exclusive lock on an int8 key
*/
@@ -420,6 +430,7 @@ pg_advisory_lock_int8(PG_FUNCTION_ARGS)
int64 key = PG_GETARG_INT64(0);
LOCKTAG tag;
+ PreventAdvisoryLocksInParallelMode();
SET_LOCKTAG_INT64(tag, key);
(void) LockAcquire(&tag, ExclusiveLock, true, false);
@@ -437,6 +448,7 @@ pg_advisory_xact_lock_int8(PG_FUNCTION_ARGS)
int64 key = PG_GETARG_INT64(0);
LOCKTAG tag;
+ PreventAdvisoryLocksInParallelMode();
SET_LOCKTAG_INT64(tag, key);
(void) LockAcquire(&tag, ExclusiveLock, false, false);
@@ -453,6 +465,7 @@ pg_advisory_lock_shared_int8(PG_FUNCTION_ARGS)
int64 key = PG_GETARG_INT64(0);
LOCKTAG tag;
+ PreventAdvisoryLocksInParallelMode();
SET_LOCKTAG_INT64(tag, key);
(void) LockAcquire(&tag, ShareLock, true, false);
@@ -470,6 +483,7 @@ pg_advisory_xact_lock_shared_int8(PG_FUNCTION_ARGS)
int64 key = PG_GETARG_INT64(0);
LOCKTAG tag;
+ PreventAdvisoryLocksInParallelMode();
SET_LOCKTAG_INT64(tag, key);
(void) LockAcquire(&tag, ShareLock, false, false);
@@ -489,6 +503,7 @@ pg_try_advisory_lock_int8(PG_FUNCTION_ARGS)
LOCKTAG tag;
LockAcquireResult res;
+ PreventAdvisoryLocksInParallelMode();
SET_LOCKTAG_INT64(tag, key);
res = LockAcquire(&tag, ExclusiveLock, true, true);
@@ -509,6 +524,7 @@ pg_try_advisory_xact_lock_int8(PG_FUNCTION_ARGS)
LOCKTAG tag;
LockAcquireResult res;
+ PreventAdvisoryLocksInParallelMode();
SET_LOCKTAG_INT64(tag, key);
res = LockAcquire(&tag, ExclusiveLock, false, true);
@@ -528,6 +544,7 @@ pg_try_advisory_lock_shared_int8(PG_FUNCTION_ARGS)
LOCKTAG tag;
LockAcquireResult res;
+ PreventAdvisoryLocksInParallelMode();
SET_LOCKTAG_INT64(tag, key);
res = LockAcquire(&tag, ShareLock, true, true);
@@ -548,6 +565,7 @@ pg_try_advisory_xact_lock_shared_int8(PG_FUNCTION_ARGS)
LOCKTAG tag;
LockAcquireResult res;
+ PreventAdvisoryLocksInParallelMode();
SET_LOCKTAG_INT64(tag, key);
res = LockAcquire(&tag, ShareLock, false, true);
@@ -567,6 +585,7 @@ pg_advisory_unlock_int8(PG_FUNCTION_ARGS)
LOCKTAG tag;
bool res;
+ PreventAdvisoryLocksInParallelMode();
SET_LOCKTAG_INT64(tag, key);
res = LockRelease(&tag, ExclusiveLock, true);
@@ -586,6 +605,7 @@ pg_advisory_unlock_shared_int8(PG_FUNCTION_ARGS)
LOCKTAG tag;
bool res;
+ PreventAdvisoryLocksInParallelMode();
SET_LOCKTAG_INT64(tag, key);
res = LockRelease(&tag, ShareLock, true);
@@ -603,6 +623,7 @@ pg_advisory_lock_int4(PG_FUNCTION_ARGS)
int32 key2 = PG_GETARG_INT32(1);
LOCKTAG tag;
+ PreventAdvisoryLocksInParallelMode();
SET_LOCKTAG_INT32(tag, key1, key2);
(void) LockAcquire(&tag, ExclusiveLock, true, false);
@@ -621,6 +642,7 @@ pg_advisory_xact_lock_int4(PG_FUNCTION_ARGS)
int32 key2 = PG_GETARG_INT32(1);
LOCKTAG tag;
+ PreventAdvisoryLocksInParallelMode();
SET_LOCKTAG_INT32(tag, key1, key2);
(void) LockAcquire(&tag, ExclusiveLock, false, false);
@@ -638,6 +660,7 @@ pg_advisory_lock_shared_int4(PG_FUNCTION_ARGS)
int32 key2 = PG_GETARG_INT32(1);
LOCKTAG tag;
+ PreventAdvisoryLocksInParallelMode();
SET_LOCKTAG_INT32(tag, key1, key2);
(void) LockAcquire(&tag, ShareLock, true, false);
@@ -656,6 +679,7 @@ pg_advisory_xact_lock_shared_int4(PG_FUNCTION_ARGS)
int32 key2 = PG_GETARG_INT32(1);
LOCKTAG tag;
+ PreventAdvisoryLocksInParallelMode();
SET_LOCKTAG_INT32(tag, key1, key2);
(void) LockAcquire(&tag, ShareLock, false, false);
@@ -676,6 +700,7 @@ pg_try_advisory_lock_int4(PG_FUNCTION_ARGS)
LOCKTAG tag;
LockAcquireResult res;
+ PreventAdvisoryLocksInParallelMode();
SET_LOCKTAG_INT32(tag, key1, key2);
res = LockAcquire(&tag, ExclusiveLock, true, true);
@@ -697,6 +722,7 @@ pg_try_advisory_xact_lock_int4(PG_FUNCTION_ARGS)
LOCKTAG tag;
LockAcquireResult res;
+ PreventAdvisoryLocksInParallelMode();
SET_LOCKTAG_INT32(tag, key1, key2);
res = LockAcquire(&tag, ExclusiveLock, false, true);
@@ -717,6 +743,7 @@ pg_try_advisory_lock_shared_int4(PG_FUNCTION_ARGS)
LOCKTAG tag;
LockAcquireResult res;
+ PreventAdvisoryLocksInParallelMode();
SET_LOCKTAG_INT32(tag, key1, key2);
res = LockAcquire(&tag, ShareLock, true, true);
@@ -738,6 +765,7 @@ pg_try_advisory_xact_lock_shared_int4(PG_FUNCTION_ARGS)
LOCKTAG tag;
LockAcquireResult res;
+ PreventAdvisoryLocksInParallelMode();
SET_LOCKTAG_INT32(tag, key1, key2);
res = LockAcquire(&tag, ShareLock, false, true);
@@ -758,6 +786,7 @@ pg_advisory_unlock_int4(PG_FUNCTION_ARGS)
LOCKTAG tag;
bool res;
+ PreventAdvisoryLocksInParallelMode();
SET_LOCKTAG_INT32(tag, key1, key2);
res = LockRelease(&tag, ExclusiveLock, true);
@@ -778,6 +807,7 @@ pg_advisory_unlock_shared_int4(PG_FUNCTION_ARGS)
LOCKTAG tag;
bool res;
+ PreventAdvisoryLocksInParallelMode();
SET_LOCKTAG_INT32(tag, key1, key2);
res = LockRelease(&tag, ShareLock, true);
diff --git a/src/backend/utils/fmgr/dfmgr.c b/src/backend/utils/fmgr/dfmgr.c
index 7476a26b791..46bc1f238f2 100644
--- a/src/backend/utils/fmgr/dfmgr.c
+++ b/src/backend/utils/fmgr/dfmgr.c
@@ -23,6 +23,7 @@
#endif
#include "lib/stringinfo.h"
#include "miscadmin.h"
+#include "storage/shmem.h"
#include "utils/dynamic_loader.h"
#include "utils/hsearch.h"
@@ -692,3 +693,56 @@ find_rendezvous_variable(const char *varName)
return &hentry->varValue;
}
+
+/*
+ * Estimate the amount of space needed to serialize the list of libraries
+ * we have loaded.
+ */
+Size
+EstimateLibraryStateSpace(void)
+{
+ DynamicFileList *file_scanner;
+ Size size = 1;
+
+ for (file_scanner = file_list;
+ file_scanner != NULL;
+ file_scanner = file_scanner->next)
+ size = add_size(size, strlen(file_scanner->filename) + 1);
+
+ return size;
+}
+
+/*
+ * Serialize the list of libraries we have loaded to a chunk of memory.
+ */
+void
+SerializeLibraryState(Size maxsize, char *start_address)
+{
+ DynamicFileList *file_scanner;
+
+ for (file_scanner = file_list;
+ file_scanner != NULL;
+ file_scanner = file_scanner->next)
+ {
+ Size len;
+
+ len = strlcpy(start_address, file_scanner->filename, maxsize) + 1;
+ Assert(len < maxsize);
+ maxsize -= len;
+ start_address += len;
+ }
+ start_address[0] = '\0';
+}
+
+/*
+ * Load every library the serializing backend had loaded.
+ */
+void
+RestoreLibraryState(char *start_address)
+{
+ while (*start_address != '\0')
+ {
+ internal_load_library(start_address);
+ start_address += strlen(start_address) + 1;
+ }
+}
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index f43aff2d2cb..8727ee3b744 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -5665,6 +5665,20 @@ set_config_option(const char *name, const char *value,
elevel = ERROR;
}
+ /*
+ * GUC_ACTION_SAVE changes are acceptable during a parallel operation,
+ * because the current worker will also pop the change. We're probably
+ * dealing with a function having a proconfig entry. Only the function's
+ * body should observe the change, and peer workers do not share in the
+ * execution of a function call started by this worker.
+ *
+ * Other changes might need to affect other workers, so forbid them.
+ */
+ if (IsInParallelMode() && changeVal && action != GUC_ACTION_SAVE)
+ ereport(elevel,
+ (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+ errmsg("cannot set parameters during a parallel operation")));
+
record = find_option(name, true, elevel);
if (record == NULL)
{
@@ -6969,6 +6983,15 @@ ExecSetVariableStmt(VariableSetStmt *stmt, bool isTopLevel)
{
GucAction action = stmt->is_local ? GUC_ACTION_LOCAL : GUC_ACTION_SET;
+ /*
+ * Workers synchronize these parameters at the start of the parallel
+ * operation; then, we block SET during the operation.
+ */
+ if (IsInParallelMode())
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+ errmsg("cannot set parameters during a parallel operation")));
+
switch (stmt->kind)
{
case VAR_SET_VALUE:
diff --git a/src/backend/utils/time/combocid.c b/src/backend/utils/time/combocid.c
index bfd7d0ad426..cc5409b8803 100644
--- a/src/backend/utils/time/combocid.c
+++ b/src/backend/utils/time/combocid.c
@@ -44,6 +44,7 @@
#include "miscadmin.h"
#include "access/htup_details.h"
#include "access/xact.h"
+#include "storage/shmem.h"
#include "utils/combocid.h"
#include "utils/hsearch.h"
#include "utils/memutils.h"
@@ -286,3 +287,76 @@ GetRealCmax(CommandId combocid)
Assert(combocid < usedComboCids);
return comboCids[combocid].cmax;
}
+
+/*
+ * Estimate the amount of space required to serialize the current ComboCID
+ * state.
+ */
+Size
+EstimateComboCIDStateSpace(void)
+{
+ Size size;
+
+ /* Add space required for saving usedComboCids */
+ size = sizeof(int);
+
+ /* Add space required for saving the combocids key */
+ size = add_size(size, mul_size(sizeof(ComboCidKeyData), usedComboCids));
+
+ return size;
+}
+
+/*
+ * Serialize the ComboCID state into the memory, beginning at start_address.
+ * maxsize should be at least as large as the value returned by
+ * EstimateComboCIDStateSpace.
+ */
+void
+SerializeComboCIDState(Size maxsize, char *start_address)
+{
+ char *endptr;
+
+ /* First, we store the number of currently-existing ComboCIDs. */
+ * (int *) start_address = usedComboCids;
+
+ /* If maxsize is too small, throw an error. */
+ endptr = start_address + sizeof(int) +
+ (sizeof(ComboCidKeyData) * usedComboCids);
+ if (endptr < start_address || endptr > start_address + maxsize)
+ elog(ERROR, "not enough space to serialize ComboCID state");
+
+ /* Now, copy the actual cmin/cmax pairs. */
+ memcpy(start_address + sizeof(int), comboCids,
+ (sizeof(ComboCidKeyData) * usedComboCids));
+}
+
+/*
+ * Read the ComboCID state at the specified address and initialize this
+ * backend with the same ComboCIDs. This is only valid in a backend that
+ * currently has no ComboCIDs (and only makes sense if the transaction state
+ * is serialized and restored as well).
+ */
+void
+RestoreComboCIDState(char *comboCIDstate)
+{
+ int num_elements;
+ ComboCidKeyData *keydata;
+ int i;
+ CommandId cid;
+
+ Assert(!comboCids && !comboHash);
+
+ /* First, we retrieve the number of ComboCIDs that were serialized. */
+ num_elements = * (int *) comboCIDstate;
+ keydata = (ComboCidKeyData *) (comboCIDstate + sizeof(int));
+
+ /* Use GetComboCommandId to restore each ComboCID. */
+ for (i = 0; i < num_elements; i++)
+ {
+ cid = GetComboCommandId(keydata[i].cmin, keydata[i].cmax);
+
+ /* Verify that we got the expected answer. */
+ if (cid != i)
+ elog(ERROR, "unexpected command ID while restoring combo CIDs");
+ }
+}
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c
index 7cfa0cf848e..a2cb4a037ff 100644
--- a/src/backend/utils/time/snapmgr.c
+++ b/src/backend/utils/time/snapmgr.c
@@ -157,6 +157,22 @@ static Snapshot CopySnapshot(Snapshot snapshot);
static void FreeSnapshot(Snapshot snapshot);
static void SnapshotResetXmin(void);
+/*
+ * Snapshot fields to be serialized.
+ *
+ * Only these fields need to be sent to the cooperating backend; the
+ * remaining ones can (and must) set by the receiver upon restore.
+ */
+typedef struct SerializedSnapshotData
+{
+ TransactionId xmin;
+ TransactionId xmax;
+ uint32 xcnt;
+ int32 subxcnt;
+ bool suboverflowed;
+ bool takenDuringRecovery;
+ CommandId curcid;
+} SerializedSnapshotData;
/*
* GetTransactionSnapshot
@@ -188,6 +204,10 @@ GetTransactionSnapshot(void)
Assert(pairingheap_is_empty(&RegisteredSnapshots));
Assert(FirstXactSnapshot == NULL);
+ if (IsInParallelMode())
+ elog(ERROR,
+ "cannot take query snapshot during a parallel operation");
+
/*
* In transaction-snapshot mode, the first snapshot must live until
* end of xact regardless of what the caller does with it, so we must
@@ -239,6 +259,14 @@ Snapshot
GetLatestSnapshot(void)
{
/*
+ * We might be able to relax this, but nothing that could otherwise work
+ * needs it.
+ */
+ if (IsInParallelMode())
+ elog(ERROR,
+ "cannot update SecondarySnapshot during a parallel operation");
+
+ /*
* So far there are no cases requiring support for GetLatestSnapshot()
* during logical decoding, but it wouldn't be hard to add if required.
*/
@@ -347,7 +375,8 @@ SnapshotSetCommandId(CommandId curcid)
* in GetTransactionSnapshot.
*/
static void
-SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid)
+SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid,
+ PGPROC *sourceproc)
{
/* Caller should have checked this already */
Assert(!FirstSnapshotSet);
@@ -394,7 +423,15 @@ SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid)
* doesn't seem worth contorting the logic here to avoid two calls,
* especially since it's not clear that predicate.c *must* do this.
*/
- if (!ProcArrayInstallImportedXmin(CurrentSnapshot->xmin, sourcexid))
+ if (sourceproc != NULL)
+ {
+ if (!ProcArrayInstallRestoredXmin(CurrentSnapshot->xmin, sourceproc))
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("could not import the requested snapshot"),
+ errdetail("The source transaction is not running anymore.")));
+ }
+ else if (!ProcArrayInstallImportedXmin(CurrentSnapshot->xmin, sourcexid))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("could not import the requested snapshot"),
@@ -550,11 +587,24 @@ PushCopiedSnapshot(Snapshot snapshot)
void
UpdateActiveSnapshotCommandId(void)
{
+ CommandId save_curcid, curcid;
Assert(ActiveSnapshot != NULL);
Assert(ActiveSnapshot->as_snap->active_count == 1);
Assert(ActiveSnapshot->as_snap->regd_count == 0);
- ActiveSnapshot->as_snap->curcid = GetCurrentCommandId(false);
+ /*
+ * Don't allow modification of the active snapshot during parallel
+ * operation. We share the snapshot to worker backends at beginning of
+ * parallel operation, so any change to snapshot can lead to
+ * inconsistencies. We have other defenses against
+ * CommandCounterIncrement, but there are a few places that call this
+ * directly, so we put an additional guard here.
+ */
+ save_curcid = ActiveSnapshot->as_snap->curcid;
+ curcid = GetCurrentCommandId(false);
+ if (IsInParallelMode() && save_curcid != curcid)
+ elog(ERROR, "cannot modify commandid in active snapshot during a parallel operation");
+ ActiveSnapshot->as_snap->curcid = curcid;
}
/*
@@ -1289,7 +1339,7 @@ ImportSnapshot(const char *idstr)
errmsg("cannot import a snapshot from a different database")));
/* OK, install the snapshot */
- SetTransactionSnapshot(&snapshot, src_xid);
+ SetTransactionSnapshot(&snapshot, src_xid, NULL);
}
/*
@@ -1393,3 +1443,155 @@ HistoricSnapshotGetTupleCids(void)
Assert(HistoricSnapshotActive());
return tuplecid_data;
}
+
+/*
+ * EstimateSnapshotSpace
+ * Returns the size need to store the given snapshot.
+ *
+ * We are exporting only required fields from the Snapshot, stored in
+ * SerializedSnapshotData.
+ */
+Size
+EstimateSnapshotSpace(Snapshot snap)
+{
+ Size size;
+
+ Assert(snap != InvalidSnapshot);
+ Assert(snap->satisfies == HeapTupleSatisfiesMVCC);
+
+ /* We allocate any XID arrays needed in the same palloc block. */
+ size = add_size(sizeof(SerializedSnapshotData),
+ mul_size(snap->xcnt, sizeof(TransactionId)));
+ if (snap->subxcnt > 0 &&
+ (!snap->suboverflowed || snap->takenDuringRecovery))
+ size = add_size(size,
+ mul_size(snap->subxcnt, sizeof(TransactionId)));
+
+ return size;
+}
+
+/*
+ * SerializeSnapshot
+ * Dumps the serialized snapshot (extracted from given snapshot) onto the
+ * memory location at start_address.
+ */
+void
+SerializeSnapshot(Snapshot snapshot, char *start_address)
+{
+ SerializedSnapshotData *serialized_snapshot;
+
+ Assert(snapshot->xcnt >= 0);
+ Assert(snapshot->subxcnt >= 0);
+
+ serialized_snapshot = (SerializedSnapshotData *) start_address;
+
+ /* Copy all required fields */
+ serialized_snapshot->xmin = snapshot->xmin;
+ serialized_snapshot->xmax = snapshot->xmax;
+ serialized_snapshot->xcnt = snapshot->xcnt;
+ serialized_snapshot->subxcnt = snapshot->subxcnt;
+ serialized_snapshot->suboverflowed = snapshot->suboverflowed;
+ serialized_snapshot->takenDuringRecovery = snapshot->takenDuringRecovery;
+ serialized_snapshot->curcid = snapshot->curcid;
+
+ /*
+ * Ignore the SubXID array if it has overflowed, unless the snapshot
+ * was taken during recovey - in that case, top-level XIDs are in subxip
+ * as well, and we mustn't lose them.
+ */
+ if (serialized_snapshot->suboverflowed && !snapshot->takenDuringRecovery)
+ serialized_snapshot->subxcnt = 0;
+
+ /* Copy XID array */
+ if (snapshot->xcnt > 0)
+ memcpy((TransactionId *) (serialized_snapshot + 1),
+ snapshot->xip, snapshot->xcnt * sizeof(TransactionId));
+
+ /*
+ * Copy SubXID array. Don't bother to copy it if it had overflowed,
+ * though, because it's not used anywhere in that case. Except if it's a
+ * snapshot taken during recovery; all the top-level XIDs are in subxip as
+ * well in that case, so we mustn't lose them.
+ */
+ if (snapshot->subxcnt > 0)
+ {
+ Size subxipoff = sizeof(SerializedSnapshotData) +
+ snapshot->xcnt * sizeof(TransactionId);
+
+ memcpy((TransactionId *) ((char *) serialized_snapshot + subxipoff),
+ snapshot->subxip, snapshot->subxcnt * sizeof(TransactionId));
+ }
+}
+
+/*
+ * RestoreSnapshot
+ * Restore a serialized snapshot from the specified address.
+ *
+ * The copy is palloc'd in TopTransactionContext and has initial refcounts set
+ * to 0. The returned snapshot has the copied flag set.
+ */
+Snapshot
+RestoreSnapshot(char *start_address)
+{
+ SerializedSnapshotData *serialized_snapshot;
+ Size size;
+ Snapshot snapshot;
+ TransactionId *serialized_xids;
+
+ serialized_snapshot = (SerializedSnapshotData *) start_address;
+ serialized_xids = (TransactionId *)
+ (start_address + sizeof(SerializedSnapshotData));
+
+ /* We allocate any XID arrays needed in the same palloc block. */
+ size = sizeof(SnapshotData)
+ + serialized_snapshot->xcnt * sizeof(TransactionId)
+ + serialized_snapshot->subxcnt * sizeof(TransactionId);
+
+ /* Copy all required fields */
+ snapshot = (Snapshot) MemoryContextAlloc(TopTransactionContext, size);
+ snapshot->satisfies = HeapTupleSatisfiesMVCC;
+ snapshot->xmin = serialized_snapshot->xmin;
+ snapshot->xmax = serialized_snapshot->xmax;
+ snapshot->xip = NULL;
+ snapshot->xcnt = serialized_snapshot->xcnt;
+ snapshot->subxip = NULL;
+ snapshot->subxcnt = serialized_snapshot->subxcnt;
+ snapshot->suboverflowed = serialized_snapshot->suboverflowed;
+ snapshot->takenDuringRecovery = serialized_snapshot->takenDuringRecovery;
+ snapshot->curcid = serialized_snapshot->curcid;
+
+ /* Copy XIDs, if present. */
+ if (serialized_snapshot->xcnt > 0)
+ {
+ snapshot->xip = (TransactionId *) (snapshot + 1);
+ memcpy(snapshot->xip, serialized_xids,
+ serialized_snapshot->xcnt * sizeof(TransactionId));
+ }
+
+ /* Copy SubXIDs, if present. */
+ if (serialized_snapshot->subxcnt > 0)
+ {
+ snapshot->subxip = snapshot->xip + serialized_snapshot->xcnt;
+ memcpy(snapshot->subxip, serialized_xids + serialized_snapshot->xcnt,
+ serialized_snapshot->subxcnt * sizeof(TransactionId));
+ }
+
+ /* Set the copied flag so that the caller will set refcounts correctly. */
+ snapshot->regd_count = 0;
+ snapshot->active_count = 0;
+ snapshot->copied = true;
+
+ return snapshot;
+}
+
+/*
+ * Install a restored snapshot as the transaction snapshot.
+ *
+ * The second argument is of type void * so that snapmgr.h need not include
+ * the declaration for PGPROC.
+ */
+void
+RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc)
+{
+ SetTransactionSnapshot(snapshot, InvalidTransactionId, master_pgproc);
+}