Diffstat (limited to 'src/backend/access/transam')
-rw-r--r--  src/backend/access/transam/Makefile            2
-rw-r--r--  src/backend/access/transam/README.parallel    223
-rw-r--r--  src/backend/access/transam/parallel.c        1007
-rw-r--r--  src/backend/access/transam/varsup.c             7
-rw-r--r--  src/backend/access/transam/xact.c             486
-rw-r--r--  src/backend/access/transam/xlog.c               8
6 files changed, 1715 insertions(+), 18 deletions(-)
diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index 9d4d5dbc975..94455b23f7e 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -12,7 +12,7 @@ subdir = src/backend/access/transam
top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
-OBJS = clog.o commit_ts.o multixact.o rmgr.o slru.o subtrans.o \
+OBJS = clog.o commit_ts.o multixact.o parallel.o rmgr.o slru.o subtrans.o \
timeline.o transam.o twophase.o twophase_rmgr.o varsup.o \
xact.o xlog.o xlogarchive.o xlogfuncs.o \
xloginsert.o xlogreader.o xlogutils.o
diff --git a/src/backend/access/transam/README.parallel b/src/backend/access/transam/README.parallel
new file mode 100644
index 00000000000..10051863fed
--- /dev/null
+++ b/src/backend/access/transam/README.parallel
@@ -0,0 +1,223 @@
+Overview
+========
+
+PostgreSQL provides some simple facilities to make writing parallel algorithms
+easier. Using a data structure called a ParallelContext, you can arrange to
+launch background worker processes, initialize their state to match that of
+the backend which initiated parallelism, communicate with them via dynamic
+shared memory, and write reasonably complex code that can run either in the
+user backend or in one of the parallel workers without needing to be aware of
+where it's running.
+
+The backend which starts a parallel operation (hereafter, the initiating
+backend) starts by creating a dynamic shared memory segment which will last
+for the lifetime of the parallel operation. This dynamic shared memory segment
+will contain (1) a shm_mq that can be used to transport errors (and other
+messages reported via elog/ereport) from the worker back to the initiating
+backend; (2) serialized representations of the initiating backend's private
+state, so that the worker can synchronize its state with that of the initiating
+backend; and (3) any other data structures which a particular user of the
+ParallelContext data structure may wish to add for its own purposes. Once
+the initiating backend has initialized the dynamic shared memory segment, it
+asks the postmaster to launch the appropriate number of parallel workers.
+These workers then connect to the dynamic shared memory segment, initialize
+their state, and then invoke the appropriate entrypoint, as further detailed
+below.
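+
+For orientation, these are the table-of-contents keys that parallel.c (added
+by this patch) itself reserves in the segment; higher-level code stores its
+own data under smaller key values:
+
+	PARALLEL_KEY_FIXED                  fixed-size state (FixedParallelState)
+	PARALLEL_KEY_ERROR_QUEUE            per-worker error queues (shm_mq)
+	PARALLEL_KEY_LIBRARY                serialized list of loaded libraries
+	PARALLEL_KEY_GUC                    serialized GUC settings
+	PARALLEL_KEY_COMBO_CID              serialized combo CID mappings
+	PARALLEL_KEY_TRANSACTION_SNAPSHOT   serialized transaction snapshot
+	PARALLEL_KEY_ACTIVE_SNAPSHOT        serialized active snapshot
+	PARALLEL_KEY_TRANSACTION_STATE      serialized transaction state
+	PARALLEL_KEY_EXTENSION_TRAMPOLINE   library and function name, if any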
+
+Error Reporting
+===============
+
+When started, each parallel worker begins by attaching the dynamic shared
+memory segment and locating the shm_mq to be used for error reporting; it
+redirects all of its protocol messages to this shm_mq. Prior to this point,
+any failure of the background worker will not be reported to the initiating
+backend; from the point of view of the initiating backend, the worker simply
+failed to start. The initiating backend must anyway be prepared to cope
+with fewer parallel workers than it originally requested, so catering to
+this case imposes no additional burden.
+
+Whenever a new message (or partial message; very large messages may wrap) is
+sent to the error-reporting queue, PROCSIG_PARALLEL_MESSAGE is sent to the
+initiating backend. This causes the next CHECK_FOR_INTERRUPTS() in the
+initiating backend to read and rethrow the message. For the most part, this
+makes error reporting in parallel mode "just work". Of course, to work
+properly, it is important that the code the initiating backend is executing
+call CHECK_FOR_INTERRUPTS() regularly and avoid blocking interrupt processing
+for long periods of time, but those are good things to do anyway.
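+
+As an illustration, the leader-side wait loop in this patch's
+WaitForParallelWorkersToFinish() is built around exactly this rule: process
+interrupts (which reads and rethrows any queued worker messages), then sleep
+on the process latch.  In rough outline (the queue scan is elided here):
+
+	for (;;)
+	{
+		bool	anyone_alive = false;
+
+		CHECK_FOR_INTERRUPTS();	/* may rethrow an error sent by a worker */
+
+		/* ... scan pcxt->worker[] for still-attached error queues ... */
+
+		if (!anyone_alive)
+			break;
+
+		WaitLatch(&MyProc->procLatch, WL_LATCH_SET, -1);
+		ResetLatch(&MyProc->procLatch);
+	}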
+
+(A currently-unsolved problem is that some messages may get written to the
+system log twice, once in the backend where the report was originally
+generated, and again when the initiating backend rethrows the message. If
+we decide to suppress one of these reports, it should probably be the second
+one; otherwise, if the worker is for some reason unable to propagate the
+message back to the initiating backend, the message will be lost altogether.)
+
+State Sharing
+=============
+
+It's possible to write C code which works correctly without parallelism, but
+which fails when parallelism is used. No parallel infrastructure can
+completely eliminate this problem, because any global variable is a risk.
+There's no general mechanism for ensuring that every global variable in the
+worker will have the same value that it does in the initiating backend; even
+if we could ensure that, some function we're calling could update the variable
+after each call, and only the backend where that update is performed will see
+the new value. Similar problems can arise with any more-complex data
+structure we might choose to use. For example, a pseudo-random number
+generator should, given a particular seed value, produce the same predictable
+series of values every time. But it does this by relying on some private
+state which won't automatically be shared between cooperating backends. A
+parallel-safe PRNG would need to store its state in dynamic shared memory, and
+would require locking. The parallelism infrastructure has no way of knowing
+whether the user intends to call code that has this sort of problem, and can't
+do anything about it anyway.
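+
+A hypothetical illustration of the hazard (not code from this patch): any
+function built around backend-private static state will silently diverge
+between the leader and its workers, because each process advances only its
+own copy of the variable.
+
+	static uint64 prng_state;	/* backend-local; never shared via DSM */
+
+	static uint64
+	next_pseudo_random(void)
+	{
+		/* Each backend that calls this advances only its own prng_state. */
+		prng_state = prng_state * UINT64CONST(6364136223846793005) +
+			UINT64CONST(1442695040888963407);
+		return prng_state;
+	}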
+
+Instead, we take a more pragmatic approach. First, we try to make as many of
+the operations that are safe outside of parallel mode work correctly in
+parallel mode as well. Second, we try to prohibit common unsafe operations
+via suitable error checks. These checks are intended to catch 100% of
+unsafe things that a user might do from the SQL interface, but code written
+in C can do unsafe things that won't trigger these checks. The error checks
+are engaged via EnterParallelMode(), which should be called before creating
+a parallel context, and disarmed via ExitParallelMode(), which should be
+called after all parallel contexts have been destroyed. The most
+significant restriction imposed by parallel mode is that all operations must
+be strictly read-only; we allow no writes to the database and no DDL. We
+might try to relax these restrictions in the future.
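+
+For example, with the checks armed, assigning a new XID becomes an error; the
+check this patch adds to GetNewTransactionId() in varsup.c reads:
+
+	if (IsInParallelMode())
+		elog(ERROR, "cannot assign TransactionIds during a parallel operation");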
+
+To make as many operations as possible safe in parallel mode, we try to copy
+the most important pieces of state from the initiating backend to each parallel
+worker. This includes:
+
+ - The set of libraries dynamically loaded by dfmgr.c.
+
+ - The authenticated user ID and current database. Each parallel worker
+ will connect to the same database as the initiating backend, using the
+ same user ID.
+
+ - The values of all GUCs. Accordingly, permanent changes to the value of
+ any GUC are forbidden while in parallel mode; but temporary changes,
+ such as entering a function with non-NULL proconfig, are OK.
+
+ - The current subtransaction's XID, the top-level transaction's XID, and
+ the list of XIDs considered current (that is, they are in-progress or
+ subcommitted). This information is needed to ensure that tuple visibility
+ checks return the same results in the worker as they do in the
+ initiating backend. See also the section Transaction Integration, below.
+
+ - The combo CID mappings. This is needed to ensure consistent answers to
+ tuple visibility checks. The need to synchronize this data structure is
+ a major reason why we can't support writes in parallel mode: such writes
+ might create new combo CIDs, and we have no way to let other workers
+ (or the initiating backend) know about them.
+
+ - The transaction snapshot.
+
+ - The active snapshot, which might be different from the transaction
+ snapshot.
+
+ - The currently active user ID and security context. Note that this is
+ the fourth user ID we restore: the initial step of binding to the correct
+ database also involves restoring the authenticated user ID. When GUC
+ values are restored, this incidentally sets SessionUserId and OuterUserId
+ to the correct values. This final step restores CurrentUserId.
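+
+All of the state listed above is shared using the same pattern: the leader
+estimates the space, serializes into the segment under a well-known key, and
+each worker looks the chunk up and restores it.  GUC state is a representative
+example; this is a condensed view of what InitializeParallelDSM() and
+ParallelWorkerMain() in this patch do, not a separate API:
+
+	/* Initiating backend, while sizing and filling the DSM segment: */
+	guc_len = EstimateGUCStateSpace();
+	shm_toc_estimate_chunk(&pcxt->estimator, guc_len);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+	...
+	gucspace = shm_toc_allocate(pcxt->toc, guc_len);
+	SerializeGUCState(guc_len, gucspace);
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_GUC, gucspace);
+
+	/* Each worker, after attaching to the segment: */
+	gucspace = shm_toc_lookup(toc, PARALLEL_KEY_GUC);
+	RestoreGUCState(gucspace);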
+
+To prevent undetected or unprincipled deadlocks when running in parallel mode,
+this code should eventually handle heavyweight locks in some way. This is
+not implemented yet.
+
+Transaction Integration
+=======================
+
+Regardless of what the TransactionState stack looks like in the parallel
+leader, each parallel worker ends up with a stack of depth 1. This stack
+entry is marked with the special transaction block state
+TBLOCK_PARALLEL_INPROGRESS so that it's not confused with an ordinary
+toplevel transaction. The XID of this TransactionState is set to the XID of
+the innermost currently-active subtransaction in the initiating backend. The
+initiating backend's toplevel XID, and the XIDs of all current (in-progress
+or subcommitted) transactions, are stored separately from the TransactionState stack,
+but in such a way that GetTopTransactionId(), GetTopTransactionIdIfAny(), and
+TransactionIdIsCurrentTransactionId() return the same values that they would
+in the initiating backend. We could copy the entire transaction state stack,
+but most of it would be useless: for example, you can't roll back to a
+savepoint from within a parallel worker, and there are no resources
+associated with the memory contexts or resource owners of intermediate
+subtransactions.
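+
+Concretely, the xact.c changes in this patch implement this by having
+TransactionIdIsCurrentTransactionId() search the sorted ParallelCurrentXids
+array when running in a parallel worker, instead of walking the (single-entry)
+TransactionState stack:
+
+	if (nParallelCurrentXids > 0)
+	{
+		int		low = 0;
+		int		high = nParallelCurrentXids - 1;
+
+		/* Binary search; the array is sorted in numerical XID order. */
+		while (low <= high)
+		{
+			int				middle = low + (high - low) / 2;
+			TransactionId	probe = ParallelCurrentXids[middle];
+
+			if (probe == xid)
+				return true;
+			else if (probe < xid)
+				low = middle + 1;
+			else
+				high = middle - 1;
+		}
+		return false;
+	}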
+
+No meaningful change to the transaction state can be made while in parallel
+mode. No XIDs can be assigned, and no subtransactions can start or end,
+because we have no way of communicating these state changes to cooperating
+backends, or of synchronizing them. It's clearly unworkable for the initiating
+backend to exit any transaction or subtransaction that was in progress when
+parallelism was started before all parallel workers have exited; and it's even
+more clearly crazy for a parallel worker to try to subcommit or subabort the
+current subtransaction and execute in some other transaction context than was
+present in the initiating backend. It might be practical to allow internal
+subtransactions (e.g. to implement a PL/pgSQL EXCEPTION block) to be used in
+parallel mode, provided that they are XID-less, because other backends
+wouldn't really need to know about those transactions or do anything
+differently because of them. Right now, we don't even allow that.
+
+At the end of a parallel operation, which can happen either because it
+completed successfully or because it was interrupted by an error, parallel
+workers associated with that operation exit. In the error case, transaction
+abort processing in the parallel leader kills off any remaining workers, and
+the parallel leader then waits for them to die. In the case of a successful
+parallel operation, the parallel leader does not send any signals, but must
+wait for workers to complete and exit of their own volition. In either
+case, it is very important that all workers actually exit before the
+parallel leader cleans up the (sub)transaction in which they were created;
+otherwise, chaos can ensue. For example, if the leader is rolling back the
+transaction that created the relation being scanned by a worker, the
+relation could disappear while the worker is still busy scanning it. That's
+not safe.
+
+Generally, the cleanup performed by each worker at this point is similar to
+top-level commit or abort. Each backend has its own resource owners: buffer
+pins, catcache or relcache reference counts, tuple descriptors, and so on
+are managed separately by each backend, and each backend must free them
+before exiting.
+There are, however, some important differences between parallel worker
+commit or abort and a real top-level transaction commit or abort. Most
+importantly:
+
+ - No commit or abort record is written; the initiating backend is
+ responsible for this.
+
+ - Cleanup of pg_temp namespaces is not done. Parallel workers cannot
+ safely access the initiating backend's pg_temp namespace, and should
+ not create one of their own.
+
+Coding Conventions
+==================
+
+Before beginning any parallel operation, call EnterParallelMode(); after all
+parallel operations are completed, call ExitParallelMode(). To actually
+parallelize a particular operation, use a ParallelContext. The basic coding
+pattern looks like this:
+
+ EnterParallelMode(); /* prohibit unsafe state changes */
+
+ pcxt = CreateParallelContext(entrypoint, nworkers);
+
+ /* Allow space for application-specific data here. */
+ shm_toc_estimate_chunk(&pcxt->estimator, size);
+ shm_toc_estimate_keys(&pcxt->estimator, keys);
+
+ InitializeParallelDSM(pcxt); /* create DSM and copy state to it */
+
+ /* Store the data for which we reserved space. */
+ space = shm_toc_allocate(pcxt->toc, size);
+ shm_toc_insert(pcxt->toc, key, space);
+
+ LaunchParallelWorkers(pcxt);
+
+ /* do parallel stuff */
+
+ WaitForParallelWorkersToFinish(pcxt);
+
+ /* read any final results from dynamic shared memory */
+
+ DestroyParallelContext(pcxt);
+
+ ExitParallelMode();
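+
+The entrypoint runs in each worker with the dynamic shared memory segment
+already attached and the state described above already restored, so it
+typically just looks up whatever the leader stored and goes to work.  A
+minimal sketch (MY_KEY, MySharedData, and the helper function are
+illustrative names, not part of this patch):
+
+	void
+	my_parallel_worker(dsm_segment *seg, shm_toc *toc)
+	{
+		MySharedData *shared;
+
+		/* Find the chunk the leader stored under its chosen key. */
+		shared = shm_toc_lookup(toc, MY_KEY);
+
+		/* ParallelWorkerNumber runs from 0 to nworkers - 1. */
+		do_my_share_of_the_work(shared, ParallelWorkerNumber);
+	}
+
+Code that lives in a loadable module should instead use
+CreateParallelContextForExternalFunction(), which makes the worker look the
+entrypoint up by library and function name rather than by address.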
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
new file mode 100644
index 00000000000..8ed7314b58c
--- /dev/null
+++ b/src/backend/access/transam/parallel.c
@@ -0,0 +1,1007 @@
+/*-------------------------------------------------------------------------
+ *
+ * parallel.c
+ * Infrastructure for launching parallel workers
+ *
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/access/transam/parallel.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/xact.h"
+#include "access/xlog.h"
+#include "access/parallel.h"
+#include "commands/async.h"
+#include "libpq/libpq.h"
+#include "libpq/pqformat.h"
+#include "libpq/pqmq.h"
+#include "miscadmin.h"
+#include "storage/ipc.h"
+#include "storage/sinval.h"
+#include "storage/spin.h"
+#include "tcop/tcopprot.h"
+#include "utils/combocid.h"
+#include "utils/guc.h"
+#include "utils/memutils.h"
+#include "utils/resowner.h"
+#include "utils/snapmgr.h"
+
+/*
+ * We don't want to waste a lot of memory on an error queue which, most of
+ * the time, will process only a handful of small messages. However, it is
+ * desirable to make it large enough that a typical ErrorResponse can be sent
+ * without blocking. That way, a worker that errors out can write the whole
+ * message into the queue and terminate without waiting for the user backend.
+ */
+#define PARALLEL_ERROR_QUEUE_SIZE 16384
+
+/* Magic number for parallel context TOC. */
+#define PARALLEL_MAGIC 0x50477c7c
+
+/*
+ * Magic numbers for parallel state sharing. Higher-level code should use
+ * smaller values, leaving these very large ones for use by this module.
+ */
+#define PARALLEL_KEY_FIXED UINT64CONST(0xFFFFFFFFFFFF0001)
+#define PARALLEL_KEY_ERROR_QUEUE UINT64CONST(0xFFFFFFFFFFFF0002)
+#define PARALLEL_KEY_LIBRARY UINT64CONST(0xFFFFFFFFFFFF0003)
+#define PARALLEL_KEY_GUC UINT64CONST(0xFFFFFFFFFFFF0004)
+#define PARALLEL_KEY_COMBO_CID UINT64CONST(0xFFFFFFFFFFFF0005)
+#define PARALLEL_KEY_TRANSACTION_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0006)
+#define PARALLEL_KEY_ACTIVE_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0007)
+#define PARALLEL_KEY_TRANSACTION_STATE UINT64CONST(0xFFFFFFFFFFFF0008)
+#define PARALLEL_KEY_EXTENSION_TRAMPOLINE UINT64CONST(0xFFFFFFFFFFFF0009)
+
+/* Fixed-size parallel state. */
+typedef struct FixedParallelState
+{
+ /* Fixed-size state that workers must restore. */
+ Oid database_id;
+ Oid authenticated_user_id;
+ Oid current_user_id;
+ int sec_context;
+ PGPROC *parallel_master_pgproc;
+ pid_t parallel_master_pid;
+ BackendId parallel_master_backend_id;
+
+ /* Entrypoint for parallel workers. */
+ parallel_worker_main_type entrypoint;
+
+ /* Mutex protects remaining fields. */
+ slock_t mutex;
+
+ /* Track whether workers have attached. */
+ int workers_expected;
+ int workers_attached;
+
+ /* Maximum XactLastRecEnd of any worker. */
+ XLogRecPtr last_xlog_end;
+} FixedParallelState;
+
+/*
+ * Our parallel worker number. We initialize this to -1, meaning that we are
+ * not a parallel worker. In parallel workers, it will be set to a value >= 0
+ * and < the number of workers before any user code is invoked; each parallel
+ * worker will get a different parallel worker number.
+ */
+int ParallelWorkerNumber = -1;
+
+/* Is there a parallel message pending which we need to receive? */
+bool ParallelMessagePending = false;
+
+/* Pointer to our fixed parallel state. */
+static FixedParallelState *MyFixedParallelState;
+
+/* List of active parallel contexts. */
+static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list);
+
+/* Private functions. */
+static void HandleParallelMessage(ParallelContext *, int, StringInfo msg);
+static void ParallelErrorContext(void *arg);
+static void ParallelExtensionTrampoline(dsm_segment *seg, shm_toc *toc);
+static void ParallelWorkerMain(Datum main_arg);
+
+/*
+ * Establish a new parallel context. This should be done after entering
+ * parallel mode, and (unless there is an error) the context should be
+ * destroyed before exiting the current subtransaction.
+ */
+ParallelContext *
+CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers)
+{
+ MemoryContext oldcontext;
+ ParallelContext *pcxt;
+
+ /* It is unsafe to create a parallel context if not in parallel mode. */
+ Assert(IsInParallelMode());
+
+ /* Number of workers should be non-negative. */
+ Assert(nworkers >= 0);
+
+ /*
+ * If dynamic shared memory is not available, we won't be able to use
+ * background workers.
+ */
+ if (dynamic_shared_memory_type == DSM_IMPL_NONE)
+ nworkers = 0;
+
+ /* We might be running in a short-lived memory context. */
+ oldcontext = MemoryContextSwitchTo(TopTransactionContext);
+
+ /* Initialize a new ParallelContext. */
+ pcxt = palloc0(sizeof(ParallelContext));
+ pcxt->subid = GetCurrentSubTransactionId();
+ pcxt->nworkers = nworkers;
+ pcxt->entrypoint = entrypoint;
+ pcxt->error_context_stack = error_context_stack;
+ shm_toc_initialize_estimator(&pcxt->estimator);
+ dlist_push_head(&pcxt_list, &pcxt->node);
+
+ /* Restore previous memory context. */
+ MemoryContextSwitchTo(oldcontext);
+
+ return pcxt;
+}
+
+/*
+ * Establish a new parallel context that calls a function provided by an
+ * extension. This works around the fact that the library might get mapped
+ * at a different address in each backend.
+ */
+ParallelContext *
+CreateParallelContextForExternalFunction(char *library_name,
+ char *function_name,
+ int nworkers)
+{
+ MemoryContext oldcontext;
+ ParallelContext *pcxt;
+
+ /* We might be running in a very short-lived memory context. */
+ oldcontext = MemoryContextSwitchTo(TopTransactionContext);
+
+ /* Create the context. */
+ pcxt = CreateParallelContext(ParallelExtensionTrampoline, nworkers);
+ pcxt->library_name = pstrdup(library_name);
+ pcxt->function_name = pstrdup(function_name);
+
+ /* Restore previous memory context. */
+ MemoryContextSwitchTo(oldcontext);
+
+ return pcxt;
+}
+
+/*
+ * Establish the dynamic shared memory segment for a parallel context and
+ * copy state and other bookkeeping information that will be needed by
+ * parallel workers into it.
+ */
+void
+InitializeParallelDSM(ParallelContext *pcxt)
+{
+ MemoryContext oldcontext;
+ Size library_len = 0;
+ Size guc_len = 0;
+ Size combocidlen = 0;
+ Size tsnaplen = 0;
+ Size asnaplen = 0;
+ Size tstatelen = 0;
+ Size segsize = 0;
+ int i;
+ FixedParallelState *fps;
+ Snapshot transaction_snapshot = GetTransactionSnapshot();
+ Snapshot active_snapshot = GetActiveSnapshot();
+
+ /* We might be running in a very short-lived memory context. */
+ oldcontext = MemoryContextSwitchTo(TopTransactionContext);
+
+ /* Allow space to store the fixed-size parallel state. */
+ shm_toc_estimate_chunk(&pcxt->estimator, sizeof(FixedParallelState));
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+ /*
+ * Normally, the user will have requested at least one worker process,
+ * but if by chance they have not, we can skip a bunch of things here.
+ */
+ if (pcxt->nworkers > 0)
+ {
+ /* Estimate space for various kinds of state sharing. */
+ library_len = EstimateLibraryStateSpace();
+ shm_toc_estimate_chunk(&pcxt->estimator, library_len);
+ guc_len = EstimateGUCStateSpace();
+ shm_toc_estimate_chunk(&pcxt->estimator, guc_len);
+ combocidlen = EstimateComboCIDStateSpace();
+ shm_toc_estimate_chunk(&pcxt->estimator, combocidlen);
+ tsnaplen = EstimateSnapshotSpace(transaction_snapshot);
+ shm_toc_estimate_chunk(&pcxt->estimator, tsnaplen);
+ asnaplen = EstimateSnapshotSpace(active_snapshot);
+ shm_toc_estimate_chunk(&pcxt->estimator, asnaplen);
+ tstatelen = EstimateTransactionStateSpace();
+ shm_toc_estimate_chunk(&pcxt->estimator, tstatelen);
+ /* If you add more chunks here, you probably need to add keys. */
+ shm_toc_estimate_keys(&pcxt->estimator, 6);
+
+ /* Estimate space needed for error queues. */
+ StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) ==
+ PARALLEL_ERROR_QUEUE_SIZE,
+ "parallel error queue size not buffer-aligned");
+ shm_toc_estimate_chunk(&pcxt->estimator,
+ PARALLEL_ERROR_QUEUE_SIZE * pcxt->nworkers);
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+ /* Estimate how much we'll need for extension entrypoint info. */
+ if (pcxt->library_name != NULL)
+ {
+ Assert(pcxt->entrypoint == ParallelExtensionTrampoline);
+ Assert(pcxt->function_name != NULL);
+ shm_toc_estimate_chunk(&pcxt->estimator, strlen(pcxt->library_name)
+ + strlen(pcxt->function_name) + 2);
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+ }
+ }
+
+ /*
+ * Create DSM and initialize with new table of contents. But if the user
+ * didn't request any workers, then don't bother creating a dynamic shared
+ * memory segment; instead, just use backend-private memory.
+ *
+ * Also, if we can't create a dynamic shared memory segment because the
+ * maximum number of segments have already been created, then fall back
+ * to backend-private memory, and plan not to use any workers. We hope
+ * this won't happen very often, but it's better to abandon the use of
+ * parallelism than to fail outright.
+ */
+ segsize = shm_toc_estimate(&pcxt->estimator);
+ if (pcxt->nworkers != 0)
+ pcxt->seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS);
+ if (pcxt->seg != NULL)
+ pcxt->toc = shm_toc_create(PARALLEL_MAGIC,
+ dsm_segment_address(pcxt->seg),
+ segsize);
+ else
+ {
+ pcxt->nworkers = 0;
+ pcxt->private = MemoryContextAlloc(TopMemoryContext, segsize);
+ pcxt->toc = shm_toc_create(PARALLEL_MAGIC, pcxt->private, segsize);
+ }
+
+ /* Initialize fixed-size state in shared memory. */
+ fps = (FixedParallelState *)
+ shm_toc_allocate(pcxt->toc, sizeof(FixedParallelState));
+ fps->database_id = MyDatabaseId;
+ fps->authenticated_user_id = GetAuthenticatedUserId();
+ GetUserIdAndSecContext(&fps->current_user_id, &fps->sec_context);
+ fps->parallel_master_pgproc = MyProc;
+ fps->parallel_master_pid = MyProcPid;
+ fps->parallel_master_backend_id = MyBackendId;
+ fps->entrypoint = pcxt->entrypoint;
+ SpinLockInit(&fps->mutex);
+ fps->workers_expected = pcxt->nworkers;
+ fps->workers_attached = 0;
+ fps->last_xlog_end = 0;
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps);
+
+ /* We can skip the rest of this if we're not budgeting for any workers. */
+ if (pcxt->nworkers > 0)
+ {
+ char *libraryspace;
+ char *gucspace;
+ char *combocidspace;
+ char *tsnapspace;
+ char *asnapspace;
+ char *tstatespace;
+ char *error_queue_space;
+
+ /* Serialize shared libraries we have loaded. */
+ libraryspace = shm_toc_allocate(pcxt->toc, library_len);
+ SerializeLibraryState(library_len, libraryspace);
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_LIBRARY, libraryspace);
+
+ /* Serialize GUC settings. */
+ gucspace = shm_toc_allocate(pcxt->toc, guc_len);
+ SerializeGUCState(guc_len, gucspace);
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_GUC, gucspace);
+
+ /* Serialize combo CID state. */
+ combocidspace = shm_toc_allocate(pcxt->toc, combocidlen);
+ SerializeComboCIDState(combocidlen, combocidspace);
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_COMBO_CID, combocidspace);
+
+ /* Serialize transaction snapshot and active snapshot. */
+ tsnapspace = shm_toc_allocate(pcxt->toc, tsnaplen);
+ SerializeSnapshot(transaction_snapshot, tsnapspace);
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT,
+ tsnapspace);
+ asnapspace = shm_toc_allocate(pcxt->toc, asnaplen);
+ SerializeSnapshot(active_snapshot, asnapspace);
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, asnapspace);
+
+ /* Serialize transaction state. */
+ tstatespace = shm_toc_allocate(pcxt->toc, tstatelen);
+ SerializeTransactionState(tstatelen, tstatespace);
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_STATE, tstatespace);
+
+ /* Allocate space for worker information. */
+ pcxt->worker = palloc0(sizeof(ParallelWorkerInfo) * pcxt->nworkers);
+
+ /*
+ * Establish error queues in dynamic shared memory.
+ *
+ * These queues should be used only for transmitting ErrorResponse,
+ * NoticeResponse, and NotifyResponse protocol messages. Tuple data
+ * should be transmitted via separate (possibly larger?) queues.
+ */
+ error_queue_space =
+ shm_toc_allocate(pcxt->toc,
+ PARALLEL_ERROR_QUEUE_SIZE * pcxt->nworkers);
+ for (i = 0; i < pcxt->nworkers; ++i)
+ {
+ char *start;
+ shm_mq *mq;
+
+ start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
+ mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
+ shm_mq_set_receiver(mq, MyProc);
+ pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
+ }
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, error_queue_space);
+
+ /* Serialize extension entrypoint information. */
+ if (pcxt->library_name != NULL)
+ {
+ Size lnamelen = strlen(pcxt->library_name);
+ char *extensionstate;
+
+ extensionstate = shm_toc_allocate(pcxt->toc, lnamelen
+ + strlen(pcxt->function_name) + 2);
+ strcpy(extensionstate, pcxt->library_name);
+ strcpy(extensionstate + lnamelen + 1, pcxt->function_name);
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXTENSION_TRAMPOLINE,
+ extensionstate);
+ }
+ }
+
+ /* Restore previous memory context. */
+ MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * Launch parallel workers.
+ */
+void
+LaunchParallelWorkers(ParallelContext *pcxt)
+{
+ MemoryContext oldcontext;
+ BackgroundWorker worker;
+ int i;
+ bool any_registrations_failed = false;
+
+ /* Skip this if we have no workers. */
+ if (pcxt->nworkers == 0)
+ return;
+
+ /* If we do have workers, we'd better have a DSM segment. */
+ Assert(pcxt->seg != NULL);
+
+ /* We might be running in a short-lived memory context. */
+ oldcontext = MemoryContextSwitchTo(TopTransactionContext);
+
+ /* Configure a worker. */
+ snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d",
+ MyProcPid);
+ worker.bgw_flags =
+ BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
+ worker.bgw_start_time = BgWorkerStart_ConsistentState;
+ worker.bgw_restart_time = BGW_NEVER_RESTART;
+ worker.bgw_main = ParallelWorkerMain;
+ worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(pcxt->seg));
+ worker.bgw_notify_pid = MyProcPid;
+
+ /*
+ * Start workers.
+ *
+ * The caller must be able to tolerate ending up with fewer workers than
+ * expected, so there is no need to throw an error here if registration
+ * fails. It wouldn't help much anyway, because registering the worker
+ * in no way guarantees that it will start up and initialize successfully.
+ */
+ for (i = 0; i < pcxt->nworkers; ++i)
+ {
+ if (!any_registrations_failed &&
+ RegisterDynamicBackgroundWorker(&worker,
+ &pcxt->worker[i].bgwhandle))
+ shm_mq_set_handle(pcxt->worker[i].error_mqh,
+ pcxt->worker[i].bgwhandle);
+ else
+ {
+ /*
+ * If we weren't able to register the worker, then we've bumped
+ * up against the max_worker_processes limit, and future
+ * registrations will probably fail too, so arrange to skip them.
+ * But we still have to execute this code for the remaining slots
+ * to make sure that we forget about the error queues we budgeted
+ * for those workers. Otherwise, we'll wait for them to start,
+ * but they never will.
+ */
+ any_registrations_failed = true;
+ pcxt->worker[i].bgwhandle = NULL;
+ pcxt->worker[i].error_mqh = NULL;
+ }
+ }
+
+ /* Restore previous memory context. */
+ MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * Wait for all workers to exit.
+ *
+ * Even if the parallel operation seems to have completed successfully, it's
+ * important to call this function afterwards. We must not miss any errors
+ * the workers may have thrown during the parallel operation, or any that they
+ * may yet throw while shutting down.
+ *
+ * Also, we want to update our notion of XactLastRecEnd based on worker
+ * feedback.
+ */
+void
+WaitForParallelWorkersToFinish(ParallelContext *pcxt)
+{
+ for (;;)
+ {
+ bool anyone_alive = false;
+ int i;
+
+ /*
+ * This will process any parallel messages that are pending, which
+ * may change the outcome of the loop that follows. It may also
+ * throw an error propagated from a worker.
+ */
+ CHECK_FOR_INTERRUPTS();
+
+ for (i = 0; i < pcxt->nworkers; ++i)
+ {
+ if (pcxt->worker[i].error_mqh != NULL)
+ {
+ anyone_alive = true;
+ break;
+ }
+ }
+
+ if (!anyone_alive)
+ break;
+
+ WaitLatch(&MyProc->procLatch, WL_LATCH_SET, -1);
+ ResetLatch(&MyProc->procLatch);
+ }
+
+ if (pcxt->toc != NULL)
+ {
+ FixedParallelState *fps;
+
+ fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED);
+ if (fps->last_xlog_end > XactLastRecEnd)
+ XactLastRecEnd = fps->last_xlog_end;
+ }
+}
+
+/*
+ * Destroy a parallel context.
+ *
+ * If expecting a clean exit, you should use WaitForParallelWorkersToFinish()
+ * first, before calling this function. When this function is invoked, any
+ * remaining workers are forcibly killed; the dynamic shared memory segment
+ * is unmapped; and we then wait (uninterruptibly) for the workers to exit.
+ */
+void
+DestroyParallelContext(ParallelContext *pcxt)
+{
+ int i;
+
+ /*
+ * Be careful about order of operations here! We remove the parallel
+ * context from the list before we do anything else; otherwise, if an
+ * error occurs during a subsequent step, we might try to nuke it again
+ * from AtEOXact_Parallel or AtEOSubXact_Parallel.
+ */
+ dlist_delete(&pcxt->node);
+
+ /* Kill each worker in turn, and forget their error queues. */
+ for (i = 0; i < pcxt->nworkers; ++i)
+ {
+ if (pcxt->worker[i].bgwhandle != NULL)
+ TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);
+ if (pcxt->worker[i].error_mqh != NULL)
+ {
+ pfree(pcxt->worker[i].error_mqh);
+ pcxt->worker[i].error_mqh = NULL;
+ }
+ }
+
+ /*
+ * If we have allocated a shared memory segment, detach it. This will
+ * implicitly detach the error queues, and any other shared memory queues
+ * stored there.
+ */
+ if (pcxt->seg != NULL)
+ {
+ dsm_detach(pcxt->seg);
+ pcxt->seg = NULL;
+ }
+
+ /*
+ * If this parallel context is actually in backend-private memory rather
+ * than shared memory, free that memory instead.
+ */
+ if (pcxt->private != NULL)
+ {
+ pfree(pcxt->private);
+ pcxt->private = NULL;
+ }
+
+ /* Wait until the workers actually die. */
+ for (i = 0; i < pcxt->nworkers; ++i)
+ {
+ BgwHandleStatus status;
+
+ if (pcxt->worker[i].bgwhandle == NULL)
+ continue;
+
+ /*
+ * We can't finish transaction commit or abort until all of the
+ * workers are dead. This means, in particular, that we can't respond
+ * to interrupts at this stage.
+ */
+ HOLD_INTERRUPTS();
+ status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle);
+ RESUME_INTERRUPTS();
+
+ /*
+ * If the postmaster kicked the bucket, we have no chance of cleaning
+ * up safely -- we won't be able to tell when our workers are actually
+ * dead. This doesn't necessitate a PANIC since they will all abort
+ * eventually, but we can't safely continue this session.
+ */
+ if (status == BGWH_POSTMASTER_DIED)
+ ereport(FATAL,
+ (errcode(ERRCODE_ADMIN_SHUTDOWN),
+ errmsg("postmaster exited during a parallel transaction")));
+
+ /* Release memory. */
+ pfree(pcxt->worker[i].bgwhandle);
+ pcxt->worker[i].bgwhandle = NULL;
+ }
+
+ /* Free the worker array itself. */
+ if (pcxt->worker != NULL)
+ {
+ pfree(pcxt->worker);
+ pcxt->worker = NULL;
+ }
+
+ /* Free memory. */
+ pfree(pcxt);
+}
+
+/*
+ * Are there any parallel contexts currently active?
+ */
+bool
+ParallelContextActive(void)
+{
+ return !dlist_is_empty(&pcxt_list);
+}
+
+/*
+ * Handle receipt of an interrupt indicating a parallel worker message.
+ */
+void
+HandleParallelMessageInterrupt(void)
+{
+ int save_errno = errno;
+
+ InterruptPending = true;
+ ParallelMessagePending = true;
+ SetLatch(MyLatch);
+
+ errno = save_errno;
+}
+
+/*
+ * Handle any queued protocol messages received from parallel workers.
+ */
+void
+HandleParallelMessages(void)
+{
+ dlist_iter iter;
+
+ ParallelMessagePending = false;
+
+ dlist_foreach(iter, &pcxt_list)
+ {
+ ParallelContext *pcxt;
+ int i;
+ Size nbytes;
+ void *data;
+
+ pcxt = dlist_container(ParallelContext, node, iter.cur);
+ if (pcxt->worker == NULL)
+ continue;
+
+ for (i = 0; i < pcxt->nworkers; ++i)
+ {
+ /*
+ * Read as many messages as we can from each worker, but stop
+ * when either (1) the error queue goes away, which can happen if
+ * we receive a Terminate message from the worker; or (2) no more
+ * messages can be read from the worker without blocking.
+ */
+ while (pcxt->worker[i].error_mqh != NULL)
+ {
+ shm_mq_result res;
+
+ res = shm_mq_receive(pcxt->worker[i].error_mqh, &nbytes,
+ &data, true);
+ if (res == SHM_MQ_WOULD_BLOCK)
+ break;
+ else if (res == SHM_MQ_SUCCESS)
+ {
+ StringInfoData msg;
+
+ initStringInfo(&msg);
+ appendBinaryStringInfo(&msg, data, nbytes);
+ HandleParallelMessage(pcxt, i, &msg);
+ pfree(msg.data);
+ }
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR), /* XXX: wrong errcode? */
+ errmsg("lost connection to parallel worker")));
+
+ /* This might make the error queue go away. */
+ CHECK_FOR_INTERRUPTS();
+ }
+ }
+ }
+}
+
+/*
+ * Handle a single protocol message received from a single parallel worker.
+ */
+static void
+HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
+{
+ char msgtype;
+
+ msgtype = pq_getmsgbyte(msg);
+
+ switch (msgtype)
+ {
+ case 'K': /* BackendKeyData */
+ {
+ int32 pid = pq_getmsgint(msg, 4);
+ (void) pq_getmsgint(msg, 4); /* discard cancel key */
+ (void) pq_getmsgend(msg);
+ pcxt->worker[i].pid = pid;
+ break;
+ }
+
+ case 'E': /* ErrorResponse */
+ case 'N': /* NoticeResponse */
+ {
+ ErrorData edata;
+ ErrorContextCallback errctx;
+ ErrorContextCallback *save_error_context_stack;
+
+ /*
+ * Rethrow the error using the error context callbacks that
+ * were in effect when the context was created, not the
+ * current ones.
+ */
+ save_error_context_stack = error_context_stack;
+ errctx.callback = ParallelErrorContext;
+ errctx.arg = &pcxt->worker[i].pid;
+ errctx.previous = pcxt->error_context_stack;
+ error_context_stack = &errctx;
+
+ /* Parse ErrorResponse or NoticeResponse. */
+ pq_parse_errornotice(msg, &edata);
+
+ /* Death of a worker isn't enough justification for suicide. */
+ edata.elevel = Min(edata.elevel, ERROR);
+
+ /* Rethrow error or notice. */
+ ThrowErrorData(&edata);
+
+ /* Restore previous context. */
+ error_context_stack = save_error_context_stack;
+
+ break;
+ }
+
+ case 'A': /* NotifyResponse */
+ {
+ /* Propagate NotifyResponse. */
+ pq_putmessage(msg->data[0], &msg->data[1], msg->len - 1);
+ break;
+ }
+
+ case 'X': /* Terminate, indicating clean exit */
+ {
+ pfree(pcxt->worker[i].bgwhandle);
+ pfree(pcxt->worker[i].error_mqh);
+ pcxt->worker[i].bgwhandle = NULL;
+ pcxt->worker[i].error_mqh = NULL;
+ break;
+ }
+
+ default:
+ {
+ elog(ERROR, "unknown message type: %c (%d bytes)",
+ msgtype, msg->len);
+ }
+ }
+}
+
+/*
+ * End-of-subtransaction cleanup for parallel contexts.
+ *
+ * Currently, it's forbidden to enter or leave a subtransaction while
+ * parallel mode is in effect, so we could just blow away everything. But
+ * we may want to relax that restriction in the future, so this code
+ * contemplates that there may be multiple subtransaction IDs in pcxt_list.
+ */
+void
+AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId)
+{
+ while (!dlist_is_empty(&pcxt_list))
+ {
+ ParallelContext *pcxt;
+
+ pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
+ if (pcxt->subid != mySubId)
+ break;
+ if (isCommit)
+ elog(WARNING, "leaked parallel context");
+ DestroyParallelContext(pcxt);
+ }
+}
+
+/*
+ * End-of-transaction cleanup for parallel contexts.
+ */
+void
+AtEOXact_Parallel(bool isCommit)
+{
+ while (!dlist_is_empty(&pcxt_list))
+ {
+ ParallelContext *pcxt;
+
+ pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
+ if (isCommit)
+ elog(WARNING, "leaked parallel context");
+ DestroyParallelContext(pcxt);
+ }
+}
+
+/*
+ * Main entrypoint for parallel workers.
+ */
+static void
+ParallelWorkerMain(Datum main_arg)
+{
+ dsm_segment *seg;
+ shm_toc *toc;
+ FixedParallelState *fps;
+ char *error_queue_space;
+ shm_mq *mq;
+ shm_mq_handle *mqh;
+ char *libraryspace;
+ char *gucspace;
+ char *combocidspace;
+ char *tsnapspace;
+ char *asnapspace;
+ char *tstatespace;
+ StringInfoData msgbuf;
+
+ /* Establish signal handlers. */
+ pqsignal(SIGTERM, die);
+ BackgroundWorkerUnblockSignals();
+
+ /* Set up a memory context and resource owner. */
+ Assert(CurrentResourceOwner == NULL);
+ CurrentResourceOwner = ResourceOwnerCreate(NULL, "parallel toplevel");
+ CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext,
+ "parallel worker",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+
+ /*
+ * Now that we have a resource owner, we can attach to the dynamic
+ * shared memory segment and read the table of contents.
+ */
+ seg = dsm_attach(DatumGetUInt32(main_arg));
+ if (seg == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("unable to map dynamic shared memory segment")));
+ toc = shm_toc_attach(PARALLEL_MAGIC, dsm_segment_address(seg));
+ if (toc == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("bad magic number in dynamic shared memory segment")));
+
+ /* Determine and set our worker number. */
+ fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED);
+ Assert(fps != NULL);
+ Assert(ParallelWorkerNumber == -1);
+ SpinLockAcquire(&fps->mutex);
+ if (fps->workers_attached < fps->workers_expected)
+ ParallelWorkerNumber = fps->workers_attached++;
+ SpinLockRelease(&fps->mutex);
+ if (ParallelWorkerNumber < 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("too many parallel workers already attached")));
+ MyFixedParallelState = fps;
+
+ /*
+ * Now that we have a worker number, we can find and attach to the error
+ * queue provided for us. That's good, because until we do that, any
+ * errors that happen here will not be reported back to the process that
+ * requested that this worker be launched.
+ */
+ error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE);
+ mq = (shm_mq *) (error_queue_space +
+ ParallelWorkerNumber * PARALLEL_ERROR_QUEUE_SIZE);
+ shm_mq_set_sender(mq, MyProc);
+ mqh = shm_mq_attach(mq, seg, NULL);
+ pq_redirect_to_shm_mq(mq, mqh);
+ pq_set_parallel_master(fps->parallel_master_pid,
+ fps->parallel_master_backend_id);
+
+ /*
+ * Send a BackendKeyData message to the process that initiated parallelism
+ * so that it has access to our PID before it receives any other messages
+ * from us. Our cancel key is sent, too, since that's the way the protocol
+ * message is defined, but it won't actually be used for anything in this
+ * case.
+ */
+ pq_beginmessage(&msgbuf, 'K');
+ pq_sendint(&msgbuf, (int32) MyProcPid, sizeof(int32));
+ pq_sendint(&msgbuf, (int32) MyCancelKey, sizeof(int32));
+ pq_endmessage(&msgbuf);
+
+ /*
+ * Hooray! Primary initialization is complete. Now, we need to set up
+ * our backend-local state to match the original backend.
+ */
+
+ /*
+ * Load libraries that were loaded by the original backend. We want to do
+ * this before restoring GUCs, because the libraries might define custom
+ * variables.
+ */
+ libraryspace = shm_toc_lookup(toc, PARALLEL_KEY_LIBRARY);
+ Assert(libraryspace != NULL);
+ RestoreLibraryState(libraryspace);
+
+ /* Restore database connection. */
+ BackgroundWorkerInitializeConnectionByOid(fps->database_id,
+ fps->authenticated_user_id);
+
+ /* Restore GUC values from launching backend. */
+ gucspace = shm_toc_lookup(toc, PARALLEL_KEY_GUC);
+ Assert(gucspace != NULL);
+ StartTransactionCommand();
+ RestoreGUCState(gucspace);
+ CommitTransactionCommand();
+
+ /* Crank up a transaction state appropriate to a parallel worker. */
+ tstatespace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_STATE);
+ StartParallelWorkerTransaction(tstatespace);
+
+ /* Restore combo CID state. */
+ combocidspace = shm_toc_lookup(toc, PARALLEL_KEY_COMBO_CID);
+ Assert(combocidspace != NULL);
+ RestoreComboCIDState(combocidspace);
+
+ /* Restore transaction snapshot. */
+ tsnapspace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT);
+ Assert(tsnapspace != NULL);
+ RestoreTransactionSnapshot(RestoreSnapshot(tsnapspace),
+ fps->parallel_master_pgproc);
+
+ /* Restore active snapshot. */
+ asnapspace = shm_toc_lookup(toc, PARALLEL_KEY_ACTIVE_SNAPSHOT);
+ Assert(asnapspace != NULL);
+ PushActiveSnapshot(RestoreSnapshot(asnapspace));
+
+ /* Restore user ID and security context. */
+ SetUserIdAndSecContext(fps->current_user_id, fps->sec_context);
+
+ /*
+ * We've initialized all of our state now; nothing should change hereafter.
+ */
+ EnterParallelMode();
+
+ /*
+ * Time to do the real work: invoke the caller-supplied code.
+ *
+ * If you get a crash at this line, see the comments for
+ * ParallelExtensionTrampoline.
+ */
+ fps->entrypoint(seg, toc);
+
+ /* Must exit parallel mode to pop active snapshot. */
+ ExitParallelMode();
+
+ /* Must pop active snapshot so resowner.c doesn't complain. */
+ PopActiveSnapshot();
+
+ /* Shut down the parallel-worker transaction. */
+ EndParallelWorkerTransaction();
+
+ /* Report success. */
+ pq_putmessage('X', NULL, 0);
+}
+
+/*
+ * It's unsafe for the entrypoint invoked by ParallelWorkerMain to be a
+ * function living in a dynamically loaded module, because the module might
+ * not be loaded in every process, or might be loaded but not at the same
+ * address. To work around that problem,
+ * CreateParallelContextForExternalFunction() arranges to call this function
+ * rather than calling the extension-provided function directly; this function
+ * then looks up the real entrypoint and calls it.
+ */
+static void
+ParallelExtensionTrampoline(dsm_segment *seg, shm_toc *toc)
+{
+ char *extensionstate;
+ char *library_name;
+ char *function_name;
+ parallel_worker_main_type entrypt;
+
+ extensionstate = shm_toc_lookup(toc, PARALLEL_KEY_EXTENSION_TRAMPOLINE);
+ Assert(extensionstate != NULL);
+ library_name = extensionstate;
+ function_name = extensionstate + strlen(library_name) + 1;
+
+ entrypt = (parallel_worker_main_type)
+ load_external_function(library_name, function_name, true, NULL);
+ entrypt(seg, toc);
+}
+
+/*
+ * Give the user a hint that this is a message propagated from a parallel
+ * worker. Otherwise, it can sometimes be confusing to understand what
+ * actually happened.
+ */
+static void
+ParallelErrorContext(void *arg)
+{
+ errcontext("parallel worker, pid %d", * (int32 *) arg);
+}
+
+/*
+ * Update shared memory with the ending location of the last WAL record we
+ * wrote, if it's greater than the value already stored there.
+ */
+void
+ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end)
+{
+ FixedParallelState *fps = MyFixedParallelState;
+
+ Assert(fps != NULL);
+ SpinLockAcquire(&fps->mutex);
+ if (fps->last_xlog_end < last_xlog_end)
+ fps->last_xlog_end = last_xlog_end;
+ SpinLockRelease(&fps->mutex);
+}
diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c
index 42ee57fe8d7..cf3e964fc6e 100644
--- a/src/backend/access/transam/varsup.c
+++ b/src/backend/access/transam/varsup.c
@@ -50,6 +50,13 @@ GetNewTransactionId(bool isSubXact)
TransactionId xid;
/*
+ * Workers synchronize transaction state at the beginning of each parallel
+ * operation, so we can't account for new XIDs after that point.
+ */
+ if (IsInParallelMode())
+ elog(ERROR, "cannot assign TransactionIds during a parallel operation");
+
+ /*
* During bootstrap initialization, we return the special bootstrap
* transaction id.
*/
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 511bcbbc519..a8f78d63762 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -22,6 +22,7 @@
#include "access/commit_ts.h"
#include "access/multixact.h"
+#include "access/parallel.h"
#include "access/subtrans.h"
#include "access/transam.h"
#include "access/twophase.h"
@@ -51,6 +52,7 @@
#include "storage/procarray.h"
#include "storage/sinvaladt.h"
#include "storage/smgr.h"
+#include "utils/builtins.h"
#include "utils/catcache.h"
#include "utils/combocid.h"
#include "utils/guc.h"
@@ -78,6 +80,33 @@ bool XactDeferrable;
int synchronous_commit = SYNCHRONOUS_COMMIT_ON;
/*
+ * When running as a parallel worker, we place only a single
+ * TransactionStateData on the parallel worker's state stack, and the XID
+ * reflected there will be that of the *innermost* currently-active
+ * subtransaction in the backend that initiated parallelism. However,
+ * GetTopTransactionId() and TransactionIdIsCurrentTransactionId()
+ * need to return the same answers in the parallel worker as they would have
+ * in the user backend, so we need some additional bookkeeping.
+ *
+ * XactTopTransactionId stores the XID of our toplevel transaction, which
+ * will be the same as TopTransactionState.transactionId in an ordinary
+ * backend; but in a parallel backend, which does not have the entire
+ * transaction state, it will instead be copied from the backend that started
+ * the parallel operation.
+ *
+ * nParallelCurrentXids will be 0 and ParallelCurrentXids NULL in an ordinary
+ * backend, but in a parallel backend, nParallelCurrentXids will contain the
+ * number of XIDs that need to be considered current, and ParallelCurrentXids
+ * will contain the XIDs themselves. This includes all XIDs that were current
+ * or sub-committed in the parent at the time the parallel operation began.
+ * The XIDs are stored sorted in numerical order (not logical order) to make
+ * lookups as fast as possible.
+ */
+TransactionId XactTopTransactionId = InvalidTransactionId;
+int nParallelCurrentXids = 0;
+TransactionId *ParallelCurrentXids;
+
+/*
* MyXactAccessedTempRel is set when a temporary relation is accessed.
* We don't allow PREPARE TRANSACTION in that case. (This is global
* so that it can be set from heapam.c.)
@@ -113,6 +142,7 @@ typedef enum TBlockState
/* transaction block states */
TBLOCK_BEGIN, /* starting transaction block */
TBLOCK_INPROGRESS, /* live transaction */
+ TBLOCK_PARALLEL_INPROGRESS, /* live transaction inside parallel worker */
TBLOCK_END, /* COMMIT received */
TBLOCK_ABORT, /* failed xact, awaiting ROLLBACK */
TBLOCK_ABORT_END, /* failed xact, ROLLBACK received */
@@ -154,6 +184,7 @@ typedef struct TransactionStateData
bool prevXactReadOnly; /* entry-time xact r/o state */
bool startedInRecovery; /* did we start in recovery? */
bool didLogXid; /* has xid been included in WAL record? */
+ int parallelModeLevel; /* Enter/ExitParallelMode counter */
struct TransactionStateData *parent; /* back link to parent */
} TransactionStateData;
@@ -184,6 +215,7 @@ static TransactionStateData TopTransactionStateData = {
false, /* entry-time xact r/o state */
false, /* startedInRecovery */
false, /* didLogXid */
+ 0, /* parallelModeLevel */
NULL /* link to parent state block */
};
@@ -353,9 +385,9 @@ IsAbortedTransactionBlockState(void)
TransactionId
GetTopTransactionId(void)
{
- if (!TransactionIdIsValid(TopTransactionStateData.transactionId))
+ if (!TransactionIdIsValid(XactTopTransactionId))
AssignTransactionId(&TopTransactionStateData);
- return TopTransactionStateData.transactionId;
+ return XactTopTransactionId;
}
/*
@@ -368,7 +400,7 @@ GetTopTransactionId(void)
TransactionId
GetTopTransactionIdIfAny(void)
{
- return TopTransactionStateData.transactionId;
+ return XactTopTransactionId;
}
/*
@@ -462,6 +494,13 @@ AssignTransactionId(TransactionState s)
Assert(s->state == TRANS_INPROGRESS);
/*
+ * Workers synchronize transaction state at the beginning of each
+ * parallel operation, so we can't account for new XIDs at this point.
+ */
+ if (IsInParallelMode())
+ elog(ERROR, "cannot assign XIDs during a parallel operation");
+
+ /*
* Ensure parent(s) have XIDs, so that a child always has an XID later
+ * than its parent. Mustn't recurse here, or we might get a stack overflow
+ * if we're at the bottom of a huge stack of subtransactions none of which
@@ -513,6 +552,8 @@ AssignTransactionId(TransactionState s)
* the Xid as "running". See GetNewTransactionId.
*/
s->transactionId = GetNewTransactionId(isSubXact);
+ if (!isSubXact)
+ XactTopTransactionId = s->transactionId;
if (isSubXact)
SubTransSetParent(s->transactionId, s->parent->transactionId, false);
@@ -644,7 +685,16 @@ GetCurrentCommandId(bool used)
{
/* this is global to a transaction, not subtransaction-local */
if (used)
+ {
+ /*
+ * Forbid setting currentCommandIdUsed in parallel mode, because we
+ * have no provision for communicating this back to the master. We
+ * could relax this restriction when currentCommandIdUsed was already
+ * true at the start of the parallel operation.
+ */
+ Assert(CurrentTransactionState->parallelModeLevel == 0);
currentCommandIdUsed = true;
+ }
return currentCommandId;
}
@@ -738,6 +788,36 @@ TransactionIdIsCurrentTransactionId(TransactionId xid)
return false;
/*
+ * In parallel workers, the XIDs we must consider as current are stored
+ * in ParallelCurrentXids rather than the transaction-state stack. Note
+ * that the XIDs in this array are sorted numerically rather than
+ * according to transactionIdPrecedes order.
+ */
+ if (nParallelCurrentXids > 0)
+ {
+ int low,
+ high;
+
+ low = 0;
+ high = nParallelCurrentXids - 1;
+ while (low <= high)
+ {
+ int middle;
+ TransactionId probe;
+
+ middle = low + (high - low) / 2;
+ probe = ParallelCurrentXids[middle];
+ if (probe == xid)
+ return true;
+ else if (probe < xid)
+ low = middle + 1;
+ else
+ high = middle - 1;
+ }
+ return false;
+ }
+
+ /*
* We will return true for the Xid of the current subtransaction, any of
* its subcommitted children, any of its parents, or any of their
* previously subcommitted children. However, a transaction being aborted
@@ -791,6 +871,48 @@ TransactionStartedDuringRecovery(void)
}
/*
+ * EnterParallelMode
+ */
+void
+EnterParallelMode(void)
+{
+ TransactionState s = CurrentTransactionState;
+
+ Assert(s->parallelModeLevel >= 0);
+
+ ++s->parallelModeLevel;
+}
+
+/*
+ * ExitParallelMode
+ */
+void
+ExitParallelMode(void)
+{
+ TransactionState s = CurrentTransactionState;
+
+ Assert(s->parallelModeLevel > 0);
+ Assert(s->parallelModeLevel > 1 || !ParallelContextActive());
+
+ --s->parallelModeLevel;
+}
+
+/*
+ * IsInParallelMode
+ *
+ * Are we in a parallel operation, as either the master or a worker? Check
+ * this to prohibit operations that change backend-local state expected to
+ * match across all workers. Mere caches usually don't require such a
+ * restriction. State modified in a strict push/pop fashion, such as the
+ * active snapshot stack, is often fine.
+ */
+bool
+IsInParallelMode(void)
+{
+ return CurrentTransactionState->parallelModeLevel != 0;
+}
+
+/*
* CommandCounterIncrement
*/
void
@@ -804,6 +926,14 @@ CommandCounterIncrement(void)
*/
if (currentCommandIdUsed)
{
+ /*
+ * Workers synchronize transaction state at the beginning of each
+ * parallel operation, so we can't account for new commands after that
+ * point.
+ */
+ if (IsInParallelMode())
+ elog(ERROR, "cannot start commands during a parallel operation");
+
currentCommandId += 1;
if (currentCommandId == InvalidCommandId)
{
@@ -1650,6 +1780,8 @@ StartTransaction(void)
s = &TopTransactionStateData;
CurrentTransactionState = s;
+ Assert(XactTopTransactionId == InvalidTransactionId);
+
/*
* check the current transaction state
*/
@@ -1779,6 +1911,9 @@ CommitTransaction(void)
{
TransactionState s = CurrentTransactionState;
TransactionId latestXid;
+ bool is_parallel_worker;
+
+ is_parallel_worker = (s->blockState == TBLOCK_PARALLEL_INPROGRESS);
ShowTransactionState("CommitTransaction");
@@ -1812,7 +1947,8 @@ CommitTransaction(void)
break;
}
- CallXactCallbacks(XACT_EVENT_PRE_COMMIT);
+ CallXactCallbacks(is_parallel_worker ? XACT_EVENT_PARALLEL_PRE_COMMIT
+ : XACT_EVENT_PRE_COMMIT);
/*
* The remaining actions cannot call any user-defined code, so it's safe
@@ -1821,6 +1957,13 @@ CommitTransaction(void)
* the transaction-abort path.
*/
+ /* If we might have parallel workers, clean them up now. */
+ if (IsInParallelMode())
+ {
+ AtEOXact_Parallel(true);
+ s->parallelModeLevel = 0;
+ }
+
/* Shut down the deferred-trigger manager */
AfterTriggerEndXact(true);
@@ -1859,10 +2002,28 @@ CommitTransaction(void)
*/
s->state = TRANS_COMMIT;
- /*
- * Here is where we really truly commit.
- */
- latestXid = RecordTransactionCommit();
+ if (!is_parallel_worker)
+ {
+ /*
+ * We need to mark our XIDs as committed in pg_clog. This is where we
+ * durably commit.
+ */
+ latestXid = RecordTransactionCommit();
+ }
+ else
+ {
+ /*
+ * We must not mark our XID committed; the parallel master is
+ * responsible for that.
+ */
+ latestXid = InvalidTransactionId;
+
+ /*
+ * Make sure the master will know about any WAL we wrote before it
+ * commits.
+ */
+ ParallelWorkerReportLastRecEnd(XactLastRecEnd);
+ }
TRACE_POSTGRESQL_TRANSACTION_COMMIT(MyProc->lxid);
@@ -1889,7 +2050,8 @@ CommitTransaction(void)
* state.
*/
- CallXactCallbacks(XACT_EVENT_COMMIT);
+ CallXactCallbacks(is_parallel_worker ? XACT_EVENT_PARALLEL_COMMIT
+ : XACT_EVENT_COMMIT);
ResourceOwnerRelease(TopTransactionResourceOwner,
RESOURCE_RELEASE_BEFORE_LOCKS,
@@ -1937,7 +2099,7 @@ CommitTransaction(void)
AtEOXact_GUC(true, 1);
AtEOXact_SPI(true);
AtEOXact_on_commit_actions(true);
- AtEOXact_Namespace(true);
+ AtEOXact_Namespace(true, is_parallel_worker);
AtEOXact_SMgr();
AtEOXact_Files();
AtEOXact_ComboCid();
@@ -1962,6 +2124,9 @@ CommitTransaction(void)
s->nChildXids = 0;
s->maxChildXids = 0;
+ XactTopTransactionId = InvalidTransactionId;
+ nParallelCurrentXids = 0;
+
/*
* done with commit processing, set current transaction state back to
* default
@@ -1985,6 +2150,8 @@ PrepareTransaction(void)
GlobalTransaction gxact;
TimestampTz prepared_at;
+ Assert(!IsInParallelMode());
+
ShowTransactionState("PrepareTransaction");
/*
@@ -2204,7 +2371,7 @@ PrepareTransaction(void)
AtEOXact_GUC(true, 1);
AtEOXact_SPI(true);
AtEOXact_on_commit_actions(true);
- AtEOXact_Namespace(true);
+ AtEOXact_Namespace(true, false);
AtEOXact_SMgr();
AtEOXact_Files();
AtEOXact_ComboCid();
@@ -2229,6 +2396,9 @@ PrepareTransaction(void)
s->nChildXids = 0;
s->maxChildXids = 0;
+ XactTopTransactionId = InvalidTransactionId;
+ nParallelCurrentXids = 0;
+
/*
* done with 1st phase commit processing, set current transaction state
* back to default
@@ -2247,6 +2417,7 @@ AbortTransaction(void)
{
TransactionState s = CurrentTransactionState;
TransactionId latestXid;
+ bool is_parallel_worker;
/* Prevent cancel/die interrupt while cleaning up */
HOLD_INTERRUPTS();
@@ -2295,6 +2466,7 @@ AbortTransaction(void)
/*
* check the current transaction state
*/
+ is_parallel_worker = (s->blockState == TBLOCK_PARALLEL_INPROGRESS);
if (s->state != TRANS_INPROGRESS && s->state != TRANS_PREPARE)
elog(WARNING, "AbortTransaction while in %s state",
TransStateAsString(s->state));
@@ -2318,6 +2490,13 @@ AbortTransaction(void)
*/
SetUserIdAndSecContext(s->prevUser, s->prevSecContext);
+ /* If in parallel mode, clean up workers and exit parallel mode. */
+ if (IsInParallelMode())
+ {
+ AtEOXact_Parallel(false);
+ s->parallelModeLevel = 0;
+ }
+
/*
* do abort processing
*/
@@ -2330,9 +2509,23 @@ AbortTransaction(void)
/*
* Advertise the fact that we aborted in pg_clog (assuming that we got as
- * far as assigning an XID to advertise).
+ * far as assigning an XID to advertise). But if we're inside a parallel
+ * worker, skip this; the user backend must be the one to write the abort
+ * record.
*/
- latestXid = RecordTransactionAbort(false);
+ if (!is_parallel_worker)
+ latestXid = RecordTransactionAbort(false);
+ else
+ {
+ latestXid = InvalidTransactionId;
+
+ /*
+ * Since the parallel master won't get our value of XactLastRecEnd in
+ * this case, we nudge the WAL writer ourselves here. See related
+ * comments in RecordTransactionAbort for why this matters.
+ */
+ XLogSetAsyncXactLSN(XactLastRecEnd);
+ }
TRACE_POSTGRESQL_TRANSACTION_ABORT(MyProc->lxid);
@@ -2350,7 +2543,10 @@ AbortTransaction(void)
*/
if (TopTransactionResourceOwner != NULL)
{
- CallXactCallbacks(XACT_EVENT_ABORT);
+ if (is_parallel_worker)
+ CallXactCallbacks(XACT_EVENT_PARALLEL_ABORT);
+ else
+ CallXactCallbacks(XACT_EVENT_ABORT);
ResourceOwnerRelease(TopTransactionResourceOwner,
RESOURCE_RELEASE_BEFORE_LOCKS,
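For extension code, the visible consequence of the new event values is that transaction callbacks registered with RegisterXactCallback() now receive XACT_EVENT_PARALLEL_COMMIT and XACT_EVENT_PARALLEL_ABORT in a parallel worker instead of the ordinary commit/abort events. A minimal sketch of a callback that distinguishes the two (the function names and the skip-in-worker policy are illustrative, not part of this patch):

    #include "postgres.h"
    #include "access/xact.h"

    /* Illustrative callback: leave durable cleanup to the leader in workers. */
    static void
    example_xact_callback(XactEvent event, void *arg)
    {
        switch (event)
        {
            case XACT_EVENT_COMMIT:
            case XACT_EVENT_ABORT:
                /* Ordinary backend or parallel leader: do full cleanup here. */
                break;

            case XACT_EVENT_PARALLEL_COMMIT:
            case XACT_EVENT_PARALLEL_ABORT:
                /* Parallel worker: the leader owns the durable side effects. */
                break;

            default:
                break;
        }
    }

    /* Typically registered once, e.g. from an extension's _PG_init(). */
    void
    example_register_callback(void)
    {
        RegisterXactCallback(example_xact_callback, NULL);
    }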
@@ -2371,7 +2567,7 @@ AbortTransaction(void)
AtEOXact_GUC(false, 1);
AtEOXact_SPI(false);
AtEOXact_on_commit_actions(false);
- AtEOXact_Namespace(false);
+ AtEOXact_Namespace(false, is_parallel_worker);
AtEOXact_SMgr();
AtEOXact_Files();
AtEOXact_ComboCid();
@@ -2423,6 +2619,10 @@ CleanupTransaction(void)
s->childXids = NULL;
s->nChildXids = 0;
s->maxChildXids = 0;
+ s->parallelModeLevel = 0;
+
+ XactTopTransactionId = InvalidTransactionId;
+ nParallelCurrentXids = 0;
/*
* done with abort processing, set current transaction state back to
@@ -2476,6 +2676,7 @@ StartTransactionCommand(void)
/* These cases are invalid. */
case TBLOCK_STARTED:
case TBLOCK_BEGIN:
+ case TBLOCK_PARALLEL_INPROGRESS:
case TBLOCK_SUBBEGIN:
case TBLOCK_END:
case TBLOCK_SUBRELEASE:
@@ -2511,11 +2712,13 @@ CommitTransactionCommand(void)
switch (s->blockState)
{
/*
- * This shouldn't happen, because it means the previous
+ * These shouldn't happen. TBLOCK_DEFAULT means the previous
* StartTransactionCommand didn't set the STARTED state
- * appropriately.
+ * appropriately, while TBLOCK_PARALLEL_INPROGRESS should be ended
+ * by EndParallelWorkerTransaction(), not this function.
*/
case TBLOCK_DEFAULT:
+ case TBLOCK_PARALLEL_INPROGRESS:
elog(FATAL, "CommitTransactionCommand: unexpected state %s",
BlockStateAsString(s->blockState));
break;
@@ -2797,6 +3000,7 @@ AbortCurrentTransaction(void)
* ABORT state. We will stay in ABORT until we get a ROLLBACK.
*/
case TBLOCK_INPROGRESS:
+ case TBLOCK_PARALLEL_INPROGRESS:
AbortTransaction();
s->blockState = TBLOCK_ABORT;
/* CleanupTransaction happens when we exit TBLOCK_ABORT_END */
@@ -3186,6 +3390,7 @@ BeginTransactionBlock(void)
* Already a transaction block in progress.
*/
case TBLOCK_INPROGRESS:
+ case TBLOCK_PARALLEL_INPROGRESS:
case TBLOCK_SUBINPROGRESS:
case TBLOCK_ABORT:
case TBLOCK_SUBABORT:
@@ -3363,6 +3568,16 @@ EndTransactionBlock(void)
result = true;
break;
+ /*
+ * The user issued a COMMIT that somehow ran inside a parallel
+ * worker. We can't cope with that.
+ */
+ case TBLOCK_PARALLEL_INPROGRESS:
+ ereport(FATAL,
+ (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+ errmsg("cannot commit during a parallel operation")));
+ break;
+
/* These cases are invalid. */
case TBLOCK_DEFAULT:
case TBLOCK_BEGIN:
@@ -3456,6 +3671,16 @@ UserAbortTransactionBlock(void)
s->blockState = TBLOCK_ABORT_PENDING;
break;
+ /*
+ * The user issued an ABORT that somehow ran inside a parallel
+ * worker. We can't cope with that.
+ */
+ case TBLOCK_PARALLEL_INPROGRESS:
+ ereport(FATAL,
+ (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+ errmsg("cannot abort during a parallel operation")));
+ break;
+
/* These cases are invalid. */
case TBLOCK_DEFAULT:
case TBLOCK_BEGIN:
@@ -3485,6 +3710,18 @@ DefineSavepoint(char *name)
{
TransactionState s = CurrentTransactionState;
+ /*
+ * Workers synchronize transaction state at the beginning of each parallel
+ * operation, so we can't account for new subtransactions after that
+ * point. (Note that this check will certainly error out if s->blockState
+ * is TBLOCK_PARALLEL_INPROGRESS, so we can treat that as an invalid case
+ * below.)
+ */
+ if (IsInParallelMode())
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+ errmsg("cannot define savepoints during a parallel operation")));
+
switch (s->blockState)
{
case TBLOCK_INPROGRESS:
@@ -3505,6 +3742,7 @@ DefineSavepoint(char *name)
case TBLOCK_DEFAULT:
case TBLOCK_STARTED:
case TBLOCK_BEGIN:
+ case TBLOCK_PARALLEL_INPROGRESS:
case TBLOCK_SUBBEGIN:
case TBLOCK_END:
case TBLOCK_SUBRELEASE:
@@ -3539,6 +3777,18 @@ ReleaseSavepoint(List *options)
ListCell *cell;
char *name = NULL;
+ /*
+ * Workers synchronize transaction state at the beginning of each parallel
+ * operation, so we can't account for transaction state changes after that
+ * point. (Note that this check will certainly error out if s->blockState
+ * is TBLOCK_PARALLEL_INPROGRESS, so we can treat that as an invalid case
+ * below.)
+ */
+ if (IsInParallelMode())
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+ errmsg("cannot release savepoints during a parallel operation")));
+
switch (s->blockState)
{
/*
@@ -3562,6 +3812,7 @@ ReleaseSavepoint(List *options)
case TBLOCK_DEFAULT:
case TBLOCK_STARTED:
case TBLOCK_BEGIN:
+ case TBLOCK_PARALLEL_INPROGRESS:
case TBLOCK_SUBBEGIN:
case TBLOCK_END:
case TBLOCK_SUBRELEASE:
@@ -3639,6 +3890,18 @@ RollbackToSavepoint(List *options)
ListCell *cell;
char *name = NULL;
+ /*
+ * Workers synchronize transaction state at the beginning of each parallel
+ * operation, so we can't account for transaction state changes after that
+ * point. (Note that this check will certainly error out if s->blockState
+ * is TBLOCK_PARALLEL_INPROGRESS, so we can treat that as an invalid case
+ * below.)
+ */
+ if (IsInParallelMode())
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+ errmsg("cannot rollback to savepoints during a parallel operation")));
+
switch (s->blockState)
{
/*
@@ -3663,6 +3926,7 @@ RollbackToSavepoint(List *options)
case TBLOCK_DEFAULT:
case TBLOCK_STARTED:
case TBLOCK_BEGIN:
+ case TBLOCK_PARALLEL_INPROGRESS:
case TBLOCK_SUBBEGIN:
case TBLOCK_END:
case TBLOCK_SUBRELEASE:
@@ -3751,6 +4015,20 @@ BeginInternalSubTransaction(char *name)
{
TransactionState s = CurrentTransactionState;
+ /*
+ * Workers synchronize transaction state at the beginning of each parallel
+ * operation, so we can't account for new subtransactions after that point.
+ * We might be able to make an exception for the type of subtransaction
+ * established by this function, which is typically used in contexts where
+ * we're going to release or roll back the subtransaction before proceeding
+ * further, so that no enduring change to the transaction state occurs.
+ * For now, however, we prohibit this case along with all the others.
+ */
+ if (IsInParallelMode())
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+ errmsg("cannot start subtransactions during a parallel operation")));
+
switch (s->blockState)
{
case TBLOCK_STARTED:
@@ -3773,6 +4051,7 @@ BeginInternalSubTransaction(char *name)
/* These cases are invalid. */
case TBLOCK_DEFAULT:
case TBLOCK_BEGIN:
+ case TBLOCK_PARALLEL_INPROGRESS:
case TBLOCK_SUBBEGIN:
case TBLOCK_SUBRELEASE:
case TBLOCK_SUBCOMMIT:
@@ -3805,6 +4084,18 @@ ReleaseCurrentSubTransaction(void)
{
TransactionState s = CurrentTransactionState;
+ /*
+ * Workers synchronize transaction state at the beginning of each parallel
+ * operation, so we can't account for commit of subtransactions after that
+ * point. This should not happen anyway: code calling this would
+ * typically have called BeginInternalSubTransaction() first and
+ * failed there.
+ */
+ if (IsInParallelMode())
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+ errmsg("cannot commit subtransactions during a parallel operation")));
+
if (s->blockState != TBLOCK_SUBINPROGRESS)
elog(ERROR, "ReleaseCurrentSubTransaction: unexpected state %s",
BlockStateAsString(s->blockState));
@@ -3827,6 +4118,14 @@ RollbackAndReleaseCurrentSubTransaction(void)
{
TransactionState s = CurrentTransactionState;
+ /*
+ * Unlike ReleaseCurrentSubTransaction(), this is nominally permitted
+ * during parallel operations. That's because we may be in the master,
+ * recovering from an error thrown while we were in parallel mode. We
+ * won't reach here in a worker, because BeginInternalSubTransaction()
+ * will have failed.
+ */
+
switch (s->blockState)
{
/* Must be in a subtransaction */
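The practical impact of these checks falls mostly on code that traps errors inside an internal subtransaction, in the style used by the procedural languages. A simplified, illustrative sketch of that pattern follows; under this patch the BeginInternalSubTransaction() call fails outright in a parallel worker, while the leader can still reach RollbackAndReleaseCurrentSubTransaction() when unwinding from an error thrown in parallel mode:

    /* Illustrative error-trapping pattern (simplified from the PLs). */
    MemoryContext oldcontext = CurrentMemoryContext;
    ResourceOwner oldowner = CurrentResourceOwner;

    BeginInternalSubTransaction(NULL);      /* now errors out in parallel mode */
    MemoryContextSwitchTo(oldcontext);

    PG_TRY();
    {
        /* ... run an operation that might fail ... */

        ReleaseCurrentSubTransaction();     /* likewise forbidden in parallel mode */
        MemoryContextSwitchTo(oldcontext);
        CurrentResourceOwner = oldowner;
    }
    PG_CATCH();
    {
        /* Discard the error and abandon the failed subtransaction. */
        MemoryContextSwitchTo(oldcontext);
        FlushErrorState();
        RollbackAndReleaseCurrentSubTransaction();
        MemoryContextSwitchTo(oldcontext);
        CurrentResourceOwner = oldowner;
    }
    PG_END_TRY();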
@@ -3838,6 +4137,7 @@ RollbackAndReleaseCurrentSubTransaction(void)
case TBLOCK_DEFAULT:
case TBLOCK_STARTED:
case TBLOCK_BEGIN:
+ case TBLOCK_PARALLEL_INPROGRESS:
case TBLOCK_SUBBEGIN:
case TBLOCK_INPROGRESS:
case TBLOCK_END:
@@ -3913,6 +4213,7 @@ AbortOutOfAnyTransaction(void)
case TBLOCK_STARTED:
case TBLOCK_BEGIN:
case TBLOCK_INPROGRESS:
+ case TBLOCK_PARALLEL_INPROGRESS:
case TBLOCK_END:
case TBLOCK_ABORT_PENDING:
case TBLOCK_PREPARE:
@@ -4004,6 +4305,7 @@ TransactionBlockStatusCode(void)
case TBLOCK_BEGIN:
case TBLOCK_SUBBEGIN:
case TBLOCK_INPROGRESS:
+ case TBLOCK_PARALLEL_INPROGRESS:
case TBLOCK_SUBINPROGRESS:
case TBLOCK_END:
case TBLOCK_SUBRELEASE:
@@ -4107,6 +4409,13 @@ CommitSubTransaction(void)
CallSubXactCallbacks(SUBXACT_EVENT_PRE_COMMIT_SUB, s->subTransactionId,
s->parent->subTransactionId);
+ /* If in parallel mode, clean up workers and exit parallel mode. */
+ if (IsInParallelMode())
+ {
+ AtEOSubXact_Parallel(true, s->subTransactionId);
+ s->parallelModeLevel = 0;
+ }
+
/* Do the actual "commit", such as it is */
s->state = TRANS_COMMIT;
@@ -4260,6 +4569,13 @@ AbortSubTransaction(void)
*/
SetUserIdAndSecContext(s->prevUser, s->prevSecContext);
+ /* Exit from parallel mode, if necessary. */
+ if (IsInParallelMode())
+ {
+ AtEOSubXact_Parallel(false, s->subTransactionId);
+ s->parallelModeLevel = 0;
+ }
+
/*
* We can skip all this stuff if the subxact failed before creating a
* ResourceOwner...
@@ -4400,6 +4716,7 @@ PushTransaction(void)
s->blockState = TBLOCK_SUBBEGIN;
GetUserIdAndSecContext(&s->prevUser, &s->prevSecContext);
s->prevXactReadOnly = XactReadOnly;
+ s->parallelModeLevel = 0;
CurrentTransactionState = s;
@@ -4447,6 +4764,139 @@ PopTransaction(void)
}
/*
+ * EstimateTransactionStateSpace
+ * Estimate the amount of space that will be needed by
+ * SerializeTransactionState. It would be OK to overestimate slightly,
+ * but it's simple for us to work out the precise value, so we do.
+ */
+Size
+EstimateTransactionStateSpace(void)
+{
+ TransactionState s;
+ Size nxids = 5; /* iso level, deferrable, top & current XID, XID count */
+
+ for (s = CurrentTransactionState; s != NULL; s = s->parent)
+ {
+ if (TransactionIdIsValid(s->transactionId))
+ nxids = add_size(nxids, 1);
+ nxids = add_size(nxids, s->nChildXids);
+ }
+
+ nxids = add_size(nxids, nParallelCurrentXids);
+ return mul_size(nxids, sizeof(TransactionId));
+}
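A caller pairs this with SerializeTransactionState() when laying out the dynamic shared memory segment, much as parallel.c does. A minimal sketch of that pairing, assuming a ParallelContext *pcxt whose estimator and table of contents have been set up, and with PARALLEL_KEY_TRANSACTION_STATE standing in for whatever toc key the caller uses:

    /* While sizing the segment: reserve room for the serialized state. */
    Size    tstatelen = EstimateTransactionStateSpace();

    shm_toc_estimate_chunk(&pcxt->estimator, tstatelen);
    shm_toc_estimate_keys(&pcxt->estimator, 1);

    /* Later, once the segment and its table of contents exist: fill it in. */
    char   *tstatespace = shm_toc_allocate(pcxt->toc, tstatelen);

    SerializeTransactionState(tstatelen, tstatespace);
    shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_STATE, tstatespace);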
+
+/*
+ * SerializeTransactionState
+ * Write out relevant details of our transaction state that will be
+ * needed by a parallel worker.
+ *
+ * We need to save and restore XactDeferrable, XactIsoLevel, and the XIDs
+ * associated with this transaction. The first eight bytes of the result
+ * contain XactIsoLevel and XactDeferrable; the next eight bytes contain the
+ * XID of the top-level transaction and the XID of the current transaction
+ * (or, in each case, InvalidTransactionId if none). After that, the next 4
+ * bytes contain a count of how many additional XIDs follow; this is followed
+ * by all of those XIDs one after another. We emit the XIDs in sorted order
+ * for the convenience of the receiving process.
+ */
+void
+SerializeTransactionState(Size maxsize, char *start_address)
+{
+ TransactionState s;
+ Size nxids = 0;
+ Size i = 0;
+ TransactionId *workspace;
+ TransactionId *result = (TransactionId *) start_address;
+
+ Assert(maxsize >= 5 * sizeof(TransactionId));
+ result[0] = (TransactionId) XactIsoLevel;
+ result[1] = (TransactionId) XactDeferrable;
+ result[2] = XactTopTransactionId;
+ result[3] = CurrentTransactionState->transactionId;
+
+ /*
+ * If we're running in a parallel worker and launching a parallel worker
+ * of our own, we can just pass along the information that was passed to
+ * us.
+ */
+ if (nParallelCurrentXids > 0)
+ {
+ Assert(maxsize >= (nParallelCurrentXids + 5) * sizeof(TransactionId));
+ result[4] = nParallelCurrentXids;
+ memcpy(&result[5], ParallelCurrentXids,
+ nParallelCurrentXids * sizeof(TransactionId));
+ return;
+ }
+
+ /*
+ * OK, we need to generate a sorted list of XIDs that our workers
+ * should view as current. First, figure out how many there are.
+ */
+ for (s = CurrentTransactionState; s != NULL; s = s->parent)
+ {
+ if (TransactionIdIsValid(s->transactionId))
+ nxids = add_size(nxids, 1);
+ nxids = add_size(nxids, s->nChildXids);
+ }
+ Assert((nxids + 5) * sizeof(TransactionId) <= maxsize);
+
+ /* Copy them to our scratch space. */
+ workspace = palloc(nxids * sizeof(TransactionId));
+ for (s = CurrentTransactionState; s != NULL; s = s->parent)
+ {
+ if (TransactionIdIsValid(s->transactionId))
+ workspace[i++] = s->transactionId;
+ memcpy(&workspace[i], s->childXids,
+ s->nChildXids * sizeof(TransactionId));
+ i += s->nChildXids;
+ }
+ Assert(i == nxids);
+
+ /* Sort them. */
+ qsort(workspace, nxids, sizeof(TransactionId), xidComparator);
+
+ /* Copy data into output area. */
+ result[4] = (TransactionId) nxids;
+ memcpy(&result[5], workspace, nxids * sizeof(TransactionId));
+}
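Because the array is sorted with xidComparator(), which compares raw XID values, the receiving side can answer "should this XID be treated as current?" with a plain binary search; the in-tree lookup of this kind lives in TransactionIdIsCurrentTransactionId(). A standalone sketch (the function name is illustrative):

    /* Sketch: look up an XID in the sorted array a worker received. */
    static bool
    xid_in_parallel_set(TransactionId xid, TransactionId *xids, int nxids)
    {
        int     low = 0;
        int     high = nxids - 1;

        while (low <= high)
        {
            int             middle = low + (high - low) / 2;
            TransactionId   probe = xids[middle];

            if (probe == xid)
                return true;
            else if (probe < xid)   /* raw comparison, matching xidComparator */
                low = middle + 1;
            else
                high = middle - 1;
        }
        return false;
    }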
+
+/*
+ * StartParallelWorkerTransaction
+ * Start a parallel worker transaction, restoring the relevant
+ * transaction state serialized by SerializeTransactionState.
+ */
+void
+StartParallelWorkerTransaction(char *tstatespace)
+{
+ TransactionId *tstate = (TransactionId *) tstatespace;
+
+ Assert(CurrentTransactionState->blockState == TBLOCK_DEFAULT);
+ StartTransaction();
+
+ XactIsoLevel = (int) tstate[0];
+ XactDeferrable = (bool) tstate[1];
+ XactTopTransactionId = tstate[2];
+ CurrentTransactionState->transactionId = tstate[3];
+ nParallelCurrentXids = (int) tstate[4];
+ ParallelCurrentXids = &tstate[5];
+
+ CurrentTransactionState->blockState = TBLOCK_PARALLEL_INPROGRESS;
+}
+
+/*
+ * EndParallelWorkerTransaction
+ * End a parallel worker transaction.
+ */
+void
+EndParallelWorkerTransaction(void)
+{
+ Assert(CurrentTransactionState->blockState == TBLOCK_PARALLEL_INPROGRESS);
+ CommitTransaction();
+ CurrentTransactionState->blockState = TBLOCK_DEFAULT;
+}
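Taken together, the expected worker-side sequence is roughly the following (a sketch modeled on parallel.c; the toc variable and the PARALLEL_KEY_TRANSACTION_STATE key are illustrative stand-ins for however the caller located the serialized state):

    /* In the worker, after attaching the DSM segment and error queue. */
    char   *tstatespace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_STATE);

    StartParallelWorkerTransaction(tstatespace);

    /* ... restore snapshots, GUC state, etc., then run the entrypoint ... */

    /* Commit locally; only the leader marks the XID committed in pg_clog. */
    EndParallelWorkerTransaction();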
+
+/*
* ShowTransactionState
* Debug support
*/
@@ -4516,6 +4966,8 @@ BlockStateAsString(TBlockState blockState)
return "BEGIN";
case TBLOCK_INPROGRESS:
return "INPROGRESS";
+ case TBLOCK_PARALLEL_INPROGRESS:
+ return "PARALLEL_INPROGRESS";
case TBLOCK_END:
return "END";
case TBLOCK_ABORT:
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index da7b6c2fadd..6cf441534cc 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -292,6 +292,14 @@ static TimeLineID curFileTLI;
* end+1 of the last record, and is reset when we end a top-level transaction,
* or start a new one; so it can be used to tell if the current transaction has
* created any XLOG records.
+ *
+ * While in parallel mode, this may not be fully up to date. When committing,
+ * a transaction can assume this covers all xlog records written either by the
+ * user backend or by any parallel worker which was present at any point during
+ * the transaction. But when aborting, or when still in parallel mode, other
+ * parallel backends may have written WAL records at later LSNs than the value
+ * stored here. The parallel leader advances its own copy, when necessary,
+ * in WaitForParallelWorkersToFinish.
*/
static XLogRecPtr ProcLastRecPtr = InvalidXLogRecPtr;
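The leader-side fold-in mentioned above amounts to taking the maximum of its own XactLastRecEnd and whatever each worker reported via ParallelWorkerReportLastRecEnd(); schematically (with last_xlog_end standing in for the worker's value as read back from shared memory):

    /* Sketch: advance the leader's record of the transaction's last WAL write. */
    if (last_xlog_end > XactLastRecEnd)
        XactLastRecEnd = last_xlog_end;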