Diffstat (limited to 'src/backend/access/transam')
-rw-r--r--  src/backend/access/transam/Makefile        |    2
-rw-r--r--  src/backend/access/transam/README.parallel |  223
-rw-r--r--  src/backend/access/transam/parallel.c      | 1007
-rw-r--r--  src/backend/access/transam/varsup.c        |    7
-rw-r--r--  src/backend/access/transam/xact.c          |  486
-rw-r--r--  src/backend/access/transam/xlog.c          |    8
6 files changed, 1715 insertions, 18 deletions
diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index 9d4d5dbc975..94455b23f7e 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -12,7 +12,7 @@ subdir = src/backend/access/transam
 top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = clog.o commit_ts.o multixact.o rmgr.o slru.o subtrans.o \
+OBJS = clog.o commit_ts.o multixact.o parallel.o rmgr.o slru.o subtrans.o \
 	timeline.o transam.o twophase.o twophase_rmgr.o varsup.o \
 	xact.o xlog.o xlogarchive.o xlogfuncs.o \
 	xloginsert.o xlogreader.o xlogutils.o
diff --git a/src/backend/access/transam/README.parallel b/src/backend/access/transam/README.parallel
new file mode 100644
index 00000000000..10051863fed
--- /dev/null
+++ b/src/backend/access/transam/README.parallel
@@ -0,0 +1,223 @@
+Overview
+========
+
+PostgreSQL provides some simple facilities to make writing parallel algorithms
+easier. Using a data structure called a ParallelContext, you can arrange to
+launch background worker processes, initialize their state to match that of
+the backend which initiated parallelism, communicate with them via dynamic
+shared memory, and write reasonably complex code that can run either in the
+user backend or in one of the parallel workers without needing to be aware of
+where it's running.
+
+The backend which starts a parallel operation (hereafter, the initiating
+backend) starts by creating a dynamic shared memory segment which will last
+for the lifetime of the parallel operation. This dynamic shared memory segment
+will contain (1) a shm_mq that can be used to transport errors (and other
+messages reported via elog/ereport) from the worker back to the initiating
+backend; (2) serialized representations of the initiating backend's private
+state, so that the worker can synchronize its state with that of the initiating
+backend; and (3) any other data structures which a particular user of the
+ParallelContext data structure may wish to add for its own purposes. Once
+the initiating backend has initialized the dynamic shared memory segment, it
+asks the postmaster to launch the appropriate number of parallel workers.
+These workers then connect to the dynamic shared memory segment, initialize
+their state, and then invoke the appropriate entrypoint, as further detailed
+below.
+
+Error Reporting
+===============
+
+When started, each parallel worker begins by attaching the dynamic shared
+memory segment and locating the shm_mq to be used for error reporting; it
+redirects all of its protocol messages to this shm_mq. Prior to this point,
+any failure of the background worker will not be reported to the initiating
+backend; from the point of view of the initiating backend, the worker simply
+failed to start. The initiating backend must anyway be prepared to cope
+with fewer parallel workers than it originally requested, so catering to
+this case imposes no additional burden.
+
+Whenever a new message (or partial message; very large messages may wrap) is
+sent to the error-reporting queue, PROCSIG_PARALLEL_MESSAGE is sent to the
+initiating backend. This causes the next CHECK_FOR_INTERRUPTS() in the
+initiating backend to read and rethrow the message. For the most part, this
+makes error reporting in parallel mode "just work".
+Of course, for this to work properly, it is important that the code running
+in the initiating backend call CHECK_FOR_INTERRUPTS() regularly and avoid
+blocking interrupt processing for long periods of time, but those are good
+things to do anyway.
+
+(A currently-unsolved problem is that some messages may get written to the
+system log twice, once in the backend where the report was originally
+generated, and again when the initiating backend rethrows the message. If
+we decide to suppress one of these reports, it should probably be the second
+one; otherwise, if the worker is for some reason unable to propagate the
+message back to the initiating backend, the message will be lost altogether.)
+
+State Sharing
+=============
+
+It's possible to write C code which works correctly without parallelism, but
+which fails when parallelism is used. No parallel infrastructure can
+completely eliminate this problem, because any global variable is a risk.
+There's no general mechanism for ensuring that every global variable in the
+worker will have the same value that it does in the initiating backend; even
+if we could ensure that, some function we're calling could update the variable
+after each call, and only the backend where that update is performed will see
+the new value. Similar problems can arise with any more-complex data
+structure we might choose to use. For example, a pseudo-random number
+generator should, given a particular seed value, produce the same predictable
+series of values every time. But it does this by relying on some private
+state which won't automatically be shared between cooperating backends. A
+parallel-safe PRNG would need to store its state in dynamic shared memory, and
+would require locking. The parallelism infrastructure has no way of knowing
+whether the user intends to call code that has this sort of problem, and can't
+do anything about it anyway.
+
+Instead, we take a more pragmatic approach. First, we try to make as many of
+the operations that are safe outside of parallel mode work correctly in
+parallel mode as well. Second, we try to prohibit common unsafe operations
+via suitable error checks. These checks are intended to catch 100% of
+unsafe things that a user might do from the SQL interface, but code written
+in C can do unsafe things that won't trigger these checks. The error checks
+are engaged via EnterParallelMode(), which should be called before creating
+a parallel context, and disarmed via ExitParallelMode(), which should be
+called after all parallel contexts have been destroyed. The most
+significant restriction imposed by parallel mode is that all operations must
+be strictly read-only; we allow no writes to the database and no DDL. We
+might try to relax these restrictions in the future.
+
+To make as many operations as possible safe in parallel mode, we try to copy
+the most important pieces of state from the initiating backend to each parallel
+worker. This includes:
+
+  - The set of libraries dynamically loaded by dfmgr.c.
+
+  - The authenticated user ID and current database. Each parallel worker
+    will connect to the same database as the initiating backend, using the
+    same user ID.
+
+  - The values of all GUCs. Accordingly, permanent changes to the value of
+    any GUC are forbidden while in parallel mode; but temporary changes,
+    such as entering a function with non-NULL proconfig, are OK.
+
+  - The current subtransaction's XID, the top-level transaction's XID, and
+    the list of XIDs considered current (that is, they are in-progress or
+    subcommitted). This information is needed to ensure that tuple visibility
+    checks return the same results in the worker as they do in the
+    initiating backend. See also the section Transaction Integration, below.
+
+  - The combo CID mappings. This is needed to ensure consistent answers to
+    tuple visibility checks. The need to synchronize this data structure is
+    a major reason why we can't support writes in parallel mode: such writes
+    might create new combo CIDs, and we have no way to let other workers
+    (or the initiating backend) know about them.
+
+  - The transaction snapshot.
+
+  - The active snapshot, which might be different from the transaction
+    snapshot.
+
+  - The currently active user ID and security context. Note that this is
+    the fourth user ID we restore: the initial step of binding to the correct
+    database also involves restoring the authenticated user ID. When GUC
+    values are restored, this incidentally sets SessionUserId and OuterUserId
+    to the correct values. This final step restores CurrentUserId.
+
+To prevent undetected or unprincipled deadlocks when running in parallel mode,
+this code should eventually handle heavyweight locks in some way. This is
+not implemented yet.
+
+Transaction Integration
+=======================
+
+Regardless of what the TransactionState stack looks like in the parallel
+leader, each parallel worker ends up with a stack of depth 1. This stack
+entry is marked with the special transaction block state
+TBLOCK_PARALLEL_INPROGRESS so that it's not confused with an ordinary
+toplevel transaction. The XID of this TransactionState is set to the XID of
+the innermost currently-active subtransaction in the initiating backend. The
+initiating backend's toplevel XID, and the XIDs of all current (in-progress
+or subcommitted) transactions, are stored separately from the TransactionState
+stack, but in such a way that GetTopTransactionId(), GetTopTransactionIdIfAny(),
+and TransactionIdIsCurrentTransactionId() return the same values that they
+would in the initiating backend. We could copy the entire transaction state
+stack, but most of it would be useless: for example, you can't roll back to a
+savepoint from within a parallel worker, and there are no resources
+associated with the memory contexts or resource owners of intermediate
+subtransactions.
+
+No meaningful change to the transaction state can be made while in parallel
+mode. No XIDs can be assigned, and no subtransactions can start or end,
+because we have no way of communicating these state changes to cooperating
+backends, or of synchronizing them. It's clearly unworkable for the initiating
+backend to exit any transaction or subtransaction that was in progress when
+parallelism was started before all parallel workers have exited; and it's even
+more clearly crazy for a parallel worker to try to subcommit or subabort the
+current subtransaction and execute in some other transaction context than was
+present in the initiating backend. It might be practical to allow internal
+sub-transactions (e.g. to implement a PL/pgSQL EXCEPTION block) to be used in
+parallel mode, provided that they are XID-less, because other backends
+wouldn't really need to know about those transactions or do anything
+differently because of them. Right now, we don't even allow that.
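
To make the XID bookkeeping above concrete, the sketch below condenses the
TransactionIdIsCurrentTransactionId() change that appears in the xact.c hunk
further down: a worker decides whether an XID is current by binary-searching
the numerically sorted ParallelCurrentXids array serialized by the leader.
The wrapper function name and the standalone declarations here are
illustrative only; the real logic lives inside xact.c.

    /* Illustrative sketch; see the xact.c hunk below for the real code. */
    #include <stdbool.h>
    #include <stdint.h>

    typedef uint32_t TransactionId;

    /* Serialized by the leader; restored, already sorted, in each worker. */
    static TransactionId *ParallelCurrentXids;
    static int nParallelCurrentXids;

    static bool
    XidIsCurrentInWorker(TransactionId xid)
    {
        int     low = 0;
        int     high = nParallelCurrentXids - 1;

        while (low <= high)
        {
            int             middle = low + (high - low) / 2;
            TransactionId   probe = ParallelCurrentXids[middle];

            if (probe == xid)
                return true;
            else if (probe < xid)
                low = middle + 1;
            else
                high = middle - 1;
        }
        return false;
    }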
+
+At the end of a parallel operation, which can happen either because it
+completed successfully or because it was interrupted by an error, parallel
+workers associated with that operation exit. In the error case, transaction
+abort processing in the parallel leader kills off any remaining workers, and
+the parallel leader then waits for them to die. In the case of a successful
+parallel operation, the parallel leader does not send any signals, but must
+wait for workers to complete and exit of their own volition. In either
+case, it is very important that all workers actually exit before the
+parallel leader cleans up the (sub)transaction in which they were created;
+otherwise, chaos can ensue. For example, if the leader is rolling back the
+transaction that created the relation being scanned by a worker, the
+relation could disappear while the worker is still busy scanning it. That's
+not safe.
+
+Generally, the cleanup performed by each worker at this point is similar to
+top-level commit or abort. Each backend has its own resource owners: buffer
+pins, catcache or relcache reference counts, tuple descriptors, and so on
+are managed separately by each backend, and each backend must free them
+before exiting. There are, however, some important differences between
+parallel worker commit or abort and a real top-level transaction commit or
+abort. Most importantly:
+
+  - No commit or abort record is written; the initiating backend is
+    responsible for this.
+
+  - Cleanup of pg_temp namespaces is not done. Parallel workers cannot
+    safely access the initiating backend's pg_temp namespace, and should
+    not create one of their own.
+
+Coding Conventions
+==================
+
+Before beginning any parallel operation, call EnterParallelMode(); after all
+parallel operations are completed, call ExitParallelMode(). To actually
+parallelize a particular operation, use a ParallelContext. The basic coding
+pattern looks like this:
+
+	EnterParallelMode();	/* prohibit unsafe state changes */
+
+	pcxt = CreateParallelContext(entrypoint, nworkers);
+
+	/* Allow space for application-specific data here. */
+	shm_toc_estimate_chunk(&pcxt->estimator, size);
+	shm_toc_estimate_keys(&pcxt->estimator, keys);
+
+	InitializeParallelDSM(pcxt);	/* create DSM and copy state to it */
+
+	/* Store the data for which we reserved space.
*/ + space = shm_toc_allocate(pcxt->toc, size); + shm_toc_insert(pcxt->toc, key, space); + + LaunchParallelWorkers(pcxt); + + /* do parallel stuff */ + + WaitForParallelWorkersToFinish(pcxt); + + /* read any final results from dynamic shared memory */ + + DestroyParallelContext(pcxt); + + ExitParallelMode(); diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c new file mode 100644 index 00000000000..8ed7314b58c --- /dev/null +++ b/src/backend/access/transam/parallel.c @@ -0,0 +1,1007 @@ +/*------------------------------------------------------------------------- + * + * parallel.c + * Infrastructure for launching parallel workers + * + * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/access/transam/parallel.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/xact.h" +#include "access/xlog.h" +#include "access/parallel.h" +#include "commands/async.h" +#include "libpq/libpq.h" +#include "libpq/pqformat.h" +#include "libpq/pqmq.h" +#include "miscadmin.h" +#include "storage/ipc.h" +#include "storage/sinval.h" +#include "storage/spin.h" +#include "tcop/tcopprot.h" +#include "utils/combocid.h" +#include "utils/guc.h" +#include "utils/memutils.h" +#include "utils/resowner.h" +#include "utils/snapmgr.h" + +/* + * We don't want to waste a lot of memory on an error queue which, most of + * the time, will process only a handful of small messages. However, it is + * desirable to make it large enough that a typical ErrorResponse can be sent + * without blocking. That way, a worker that errors out can write the whole + * message into the queue and terminate without waiting for the user backend. + */ +#define PARALLEL_ERROR_QUEUE_SIZE 16384 + +/* Magic number for parallel context TOC. */ +#define PARALLEL_MAGIC 0x50477c7c + +/* + * Magic numbers for parallel state sharing. Higher-level code should use + * smaller values, leaving these very large ones for use by this module. + */ +#define PARALLEL_KEY_FIXED UINT64CONST(0xFFFFFFFFFFFF0001) +#define PARALLEL_KEY_ERROR_QUEUE UINT64CONST(0xFFFFFFFFFFFF0002) +#define PARALLEL_KEY_LIBRARY UINT64CONST(0xFFFFFFFFFFFF0003) +#define PARALLEL_KEY_GUC UINT64CONST(0xFFFFFFFFFFFF0004) +#define PARALLEL_KEY_COMBO_CID UINT64CONST(0xFFFFFFFFFFFF0005) +#define PARALLEL_KEY_TRANSACTION_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0006) +#define PARALLEL_KEY_ACTIVE_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0007) +#define PARALLEL_KEY_TRANSACTION_STATE UINT64CONST(0xFFFFFFFFFFFF0008) +#define PARALLEL_KEY_EXTENSION_TRAMPOLINE UINT64CONST(0xFFFFFFFFFFFF0009) + +/* Fixed-size parallel state. */ +typedef struct FixedParallelState +{ + /* Fixed-size state that workers must restore. */ + Oid database_id; + Oid authenticated_user_id; + Oid current_user_id; + int sec_context; + PGPROC *parallel_master_pgproc; + pid_t parallel_master_pid; + BackendId parallel_master_backend_id; + + /* Entrypoint for parallel workers. */ + parallel_worker_main_type entrypoint; + + /* Mutex protects remaining fields. */ + slock_t mutex; + + /* Track whether workers have attached. */ + int workers_expected; + int workers_attached; + + /* Maximum XactLastRecEnd of any worker. */ + XLogRecPtr last_xlog_end; +} FixedParallelState; + +/* + * Our parallel worker number. We initialize this to -1, meaning that we are + * not a parallel worker. 
In parallel workers, it will be set to a value >= 0 + * and < the number of workers before any user code is invoked; each parallel + * worker will get a different parallel worker number. + */ +int ParallelWorkerNumber = -1; + +/* Is there a parallel message pending which we need to receive? */ +bool ParallelMessagePending = false; + +/* Pointer to our fixed parallel state. */ +static FixedParallelState *MyFixedParallelState; + +/* List of active parallel contexts. */ +static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list); + +/* Private functions. */ +static void HandleParallelMessage(ParallelContext *, int, StringInfo msg); +static void ParallelErrorContext(void *arg); +static void ParallelExtensionTrampoline(dsm_segment *seg, shm_toc *toc); +static void ParallelWorkerMain(Datum main_arg); + +/* + * Establish a new parallel context. This should be done after entering + * parallel mode, and (unless there is an error) the context should be + * destroyed before exiting the current subtransaction. + */ +ParallelContext * +CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers) +{ + MemoryContext oldcontext; + ParallelContext *pcxt; + + /* It is unsafe to create a parallel context if not in parallel mode. */ + Assert(IsInParallelMode()); + + /* Number of workers should be non-negative. */ + Assert(nworkers >= 0); + + /* + * If dynamic shared memory is not available, we won't be able to use + * background workers. + */ + if (dynamic_shared_memory_type == DSM_IMPL_NONE) + nworkers = 0; + + /* We might be running in a short-lived memory context. */ + oldcontext = MemoryContextSwitchTo(TopTransactionContext); + + /* Initialize a new ParallelContext. */ + pcxt = palloc0(sizeof(ParallelContext)); + pcxt->subid = GetCurrentSubTransactionId(); + pcxt->nworkers = nworkers; + pcxt->entrypoint = entrypoint; + pcxt->error_context_stack = error_context_stack; + shm_toc_initialize_estimator(&pcxt->estimator); + dlist_push_head(&pcxt_list, &pcxt->node); + + /* Restore previous memory context. */ + MemoryContextSwitchTo(oldcontext); + + return pcxt; +} + +/* + * Establish a new parallel context that calls a function provided by an + * extension. This works around the fact that the library might get mapped + * at a different address in each backend. + */ +ParallelContext * +CreateParallelContextForExternalFunction(char *library_name, + char *function_name, + int nworkers) +{ + MemoryContext oldcontext; + ParallelContext *pcxt; + + /* We might be running in a very short-lived memory context. */ + oldcontext = MemoryContextSwitchTo(TopTransactionContext); + + /* Create the context. */ + pcxt = CreateParallelContext(ParallelExtensionTrampoline, nworkers); + pcxt->library_name = pstrdup(library_name); + pcxt->function_name = pstrdup(function_name); + + /* Restore previous memory context. */ + MemoryContextSwitchTo(oldcontext); + + return pcxt; +} + +/* + * Establish the dynamic shared memory segment for a parallel context and + * copied state and other bookkeeping information that will need by parallel + * workers into it. + */ +void +InitializeParallelDSM(ParallelContext *pcxt) +{ + MemoryContext oldcontext; + Size library_len = 0; + Size guc_len = 0; + Size combocidlen = 0; + Size tsnaplen = 0; + Size asnaplen = 0; + Size tstatelen = 0; + Size segsize = 0; + int i; + FixedParallelState *fps; + Snapshot transaction_snapshot = GetTransactionSnapshot(); + Snapshot active_snapshot = GetActiveSnapshot(); + + /* We might be running in a very short-lived memory context. 
*/ + oldcontext = MemoryContextSwitchTo(TopTransactionContext); + + /* Allow space to store the fixed-size parallel state. */ + shm_toc_estimate_chunk(&pcxt->estimator, sizeof(FixedParallelState)); + shm_toc_estimate_keys(&pcxt->estimator, 1); + + /* + * Normally, the user will have requested at least one worker process, + * but if by chance they have not, we can skip a bunch of things here. + */ + if (pcxt->nworkers > 0) + { + /* Estimate space for various kinds of state sharing. */ + library_len = EstimateLibraryStateSpace(); + shm_toc_estimate_chunk(&pcxt->estimator, library_len); + guc_len = EstimateGUCStateSpace(); + shm_toc_estimate_chunk(&pcxt->estimator, guc_len); + combocidlen = EstimateComboCIDStateSpace(); + shm_toc_estimate_chunk(&pcxt->estimator, combocidlen); + tsnaplen = EstimateSnapshotSpace(transaction_snapshot); + shm_toc_estimate_chunk(&pcxt->estimator, tsnaplen); + asnaplen = EstimateSnapshotSpace(active_snapshot); + shm_toc_estimate_chunk(&pcxt->estimator, asnaplen); + tstatelen = EstimateTransactionStateSpace(); + shm_toc_estimate_chunk(&pcxt->estimator, tstatelen); + /* If you add more chunks here, you probably need to add keys. */ + shm_toc_estimate_keys(&pcxt->estimator, 6); + + /* Estimate space need for error queues. */ + StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) == + PARALLEL_ERROR_QUEUE_SIZE, + "parallel error queue size not buffer-aligned"); + shm_toc_estimate_chunk(&pcxt->estimator, + PARALLEL_ERROR_QUEUE_SIZE * pcxt->nworkers); + shm_toc_estimate_keys(&pcxt->estimator, 1); + + /* Estimate how much we'll need for extension entrypoint info. */ + if (pcxt->library_name != NULL) + { + Assert(pcxt->entrypoint == ParallelExtensionTrampoline); + Assert(pcxt->function_name != NULL); + shm_toc_estimate_chunk(&pcxt->estimator, strlen(pcxt->library_name) + + strlen(pcxt->function_name) + 2); + shm_toc_estimate_keys(&pcxt->estimator, 1); + } + } + + /* + * Create DSM and initialize with new table of contents. But if the user + * didn't request any workers, then don't bother creating a dynamic shared + * memory segment; instead, just use backend-private memory. + * + * Also, if we can't create a dynamic shared memory segment because the + * maximum number of segments have already been created, then fall back + * to backend-private memory, and plan not to use any workers. We hope + * this won't happen very often, but it's better to abandon the use of + * parallelism than to fail outright. + */ + segsize = shm_toc_estimate(&pcxt->estimator); + if (pcxt->nworkers != 0) + pcxt->seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS); + if (pcxt->seg != NULL) + pcxt->toc = shm_toc_create(PARALLEL_MAGIC, + dsm_segment_address(pcxt->seg), + segsize); + else + { + pcxt->nworkers = 0; + pcxt->private = MemoryContextAlloc(TopMemoryContext, segsize); + pcxt->toc = shm_toc_create(PARALLEL_MAGIC, pcxt->private, segsize); + } + + /* Initialize fixed-size state in shared memory. 
*/ + fps = (FixedParallelState *) + shm_toc_allocate(pcxt->toc, sizeof(FixedParallelState)); + fps->database_id = MyDatabaseId; + fps->authenticated_user_id = GetAuthenticatedUserId(); + GetUserIdAndSecContext(&fps->current_user_id, &fps->sec_context); + fps->parallel_master_pgproc = MyProc; + fps->parallel_master_pid = MyProcPid; + fps->parallel_master_backend_id = MyBackendId; + fps->entrypoint = pcxt->entrypoint; + SpinLockInit(&fps->mutex); + fps->workers_expected = pcxt->nworkers; + fps->workers_attached = 0; + fps->last_xlog_end = 0; + shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps); + + /* We can skip the rest of this if we're not budgeting for any workers. */ + if (pcxt->nworkers > 0) + { + char *libraryspace; + char *gucspace; + char *combocidspace; + char *tsnapspace; + char *asnapspace; + char *tstatespace; + char *error_queue_space; + + /* Serialize shared libraries we have loaded. */ + libraryspace = shm_toc_allocate(pcxt->toc, library_len); + SerializeLibraryState(library_len, libraryspace); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_LIBRARY, libraryspace); + + /* Serialize GUC settings. */ + gucspace = shm_toc_allocate(pcxt->toc, guc_len); + SerializeGUCState(guc_len, gucspace); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_GUC, gucspace); + + /* Serialize combo CID state. */ + combocidspace = shm_toc_allocate(pcxt->toc, combocidlen); + SerializeComboCIDState(combocidlen, combocidspace); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_COMBO_CID, combocidspace); + + /* Serialize transaction snapshot and active snapshot. */ + tsnapspace = shm_toc_allocate(pcxt->toc, tsnaplen); + SerializeSnapshot(transaction_snapshot, tsnapspace); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT, + tsnapspace); + asnapspace = shm_toc_allocate(pcxt->toc, asnaplen); + SerializeSnapshot(active_snapshot, asnapspace); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, asnapspace); + + /* Serialize transaction state. */ + tstatespace = shm_toc_allocate(pcxt->toc, tstatelen); + SerializeTransactionState(tstatelen, tstatespace); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_STATE, tstatespace); + + /* Allocate space for worker information. */ + pcxt->worker = palloc0(sizeof(ParallelWorkerInfo) * pcxt->nworkers); + + /* + * Establish error queues in dynamic shared memory. + * + * These queues should be used only for transmitting ErrorResponse, + * NoticeResponse, and NotifyResponse protocol messages. Tuple data + * should be transmitted via separate (possibly larger?) queues. + */ + error_queue_space = + shm_toc_allocate(pcxt->toc, + PARALLEL_ERROR_QUEUE_SIZE * pcxt->nworkers); + for (i = 0; i < pcxt->nworkers; ++i) + { + char *start; + shm_mq *mq; + + start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE; + mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE); + shm_mq_set_receiver(mq, MyProc); + pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL); + } + shm_toc_insert(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, error_queue_space); + + /* Serialize extension entrypoint information. */ + if (pcxt->library_name != NULL) + { + Size lnamelen = strlen(pcxt->library_name); + char *extensionstate; + + extensionstate = shm_toc_allocate(pcxt->toc, lnamelen + + strlen(pcxt->function_name) + 2); + strcpy(extensionstate, pcxt->library_name); + strcpy(extensionstate + lnamelen + 1, pcxt->function_name); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXTENSION_TRAMPOLINE, + extensionstate); + } + } + + /* Restore previous memory context. 
*/ + MemoryContextSwitchTo(oldcontext); +} + +/* + * Launch parallel workers. + */ +void +LaunchParallelWorkers(ParallelContext *pcxt) +{ + MemoryContext oldcontext; + BackgroundWorker worker; + int i; + bool any_registrations_failed = false; + + /* Skip this if we have no workers. */ + if (pcxt->nworkers == 0) + return; + + /* If we do have workers, we'd better have a DSM segment. */ + Assert(pcxt->seg != NULL); + + /* We might be running in a short-lived memory context. */ + oldcontext = MemoryContextSwitchTo(TopTransactionContext); + + /* Configure a worker. */ + snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d", + MyProcPid); + worker.bgw_flags = + BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; + worker.bgw_start_time = BgWorkerStart_ConsistentState; + worker.bgw_restart_time = BGW_NEVER_RESTART; + worker.bgw_main = ParallelWorkerMain; + worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(pcxt->seg)); + worker.bgw_notify_pid = MyProcPid; + + /* + * Start workers. + * + * The caller must be able to tolerate ending up with fewer workers than + * expected, so there is no need to throw an error here if registration + * fails. It wouldn't help much anyway, because registering the worker + * in no way guarantees that it will start up and initialize successfully. + */ + for (i = 0; i < pcxt->nworkers; ++i) + { + if (!any_registrations_failed && + RegisterDynamicBackgroundWorker(&worker, + &pcxt->worker[i].bgwhandle)) + shm_mq_set_handle(pcxt->worker[i].error_mqh, + pcxt->worker[i].bgwhandle); + else + { + /* + * If we weren't able to register the worker, then we've bumped + * up against the max_worker_processes limit, and future + * registrations will probably fail too, so arrange to skip them. + * But we still have to execute this code for the remaining slots + * to make sure that we forget about the error queues we budgeted + * for those workers. Otherwise, we'll wait for them to start, + * but they never will. + */ + any_registrations_failed = true; + pcxt->worker[i].bgwhandle = NULL; + pcxt->worker[i].error_mqh = NULL; + } + } + + /* Restore previous memory context. */ + MemoryContextSwitchTo(oldcontext); +} + +/* + * Wait for all workers to exit. + * + * Even if the parallel operation seems to have completed successfully, it's + * important to call this function afterwards. We must not miss any errors + * the workers may have thrown during the parallel operation, or any that they + * may yet throw while shutting down. + * + * Also, we want to update our notion of XactLastRecEnd based on worker + * feedback. + */ +void +WaitForParallelWorkersToFinish(ParallelContext *pcxt) +{ + for (;;) + { + bool anyone_alive = false; + int i; + + /* + * This will process any parallel messages that are pending, which + * may change the outcome of the loop that follows. It may also + * throw an error propagated from a worker. + */ + CHECK_FOR_INTERRUPTS(); + + for (i = 0; i < pcxt->nworkers; ++i) + { + if (pcxt->worker[i].error_mqh != NULL) + { + anyone_alive = true; + break; + } + } + + if (!anyone_alive) + break; + + WaitLatch(&MyProc->procLatch, WL_LATCH_SET, -1); + ResetLatch(&MyProc->procLatch); + } + + if (pcxt->toc != NULL) + { + FixedParallelState *fps; + + fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED); + if (fps->last_xlog_end > XactLastRecEnd) + XactLastRecEnd = fps->last_xlog_end; + } +} + +/* + * Destroy a parallel context. + * + * If expecting a clean exit, you should use WaitForParallelWorkersToFinish() + * first, before calling this function. 
When this function is invoked, any + * remaining workers are forcibly killed; the dynamic shared memory segment + * is unmapped; and we then wait (uninterruptibly) for the workers to exit. + */ +void +DestroyParallelContext(ParallelContext *pcxt) +{ + int i; + + /* + * Be careful about order of operations here! We remove the parallel + * context from the list before we do anything else; otherwise, if an + * error occurs during a subsequent step, we might try to nuke it again + * from AtEOXact_Parallel or AtEOSubXact_Parallel. + */ + dlist_delete(&pcxt->node); + + /* Kill each worker in turn, and forget their error queues. */ + for (i = 0; i < pcxt->nworkers; ++i) + { + if (pcxt->worker[i].bgwhandle != NULL) + TerminateBackgroundWorker(pcxt->worker[i].bgwhandle); + if (pcxt->worker[i].error_mqh != NULL) + { + pfree(pcxt->worker[i].error_mqh); + pcxt->worker[i].error_mqh = NULL; + } + } + + /* + * If we have allocated a shared memory segment, detach it. This will + * implicitly detach the error queues, and any other shared memory queues, + * stored there. + */ + if (pcxt->seg != NULL) + { + dsm_detach(pcxt->seg); + pcxt->seg = NULL; + } + + /* + * If this parallel context is actually in backend-private memory rather + * than shared memory, free that memory instead. + */ + if (pcxt->private != NULL) + { + pfree(pcxt->private); + pcxt->private = NULL; + } + + /* Wait until the workers actually die. */ + for (i = 0; i < pcxt->nworkers; ++i) + { + BgwHandleStatus status; + + if (pcxt->worker[i].bgwhandle == NULL) + continue; + + /* + * We can't finish transaction commit or abort until all of the + * workers are dead. This means, in particular, that we can't respond + * to interrupts at this stage. + */ + HOLD_INTERRUPTS(); + status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle); + RESUME_INTERRUPTS(); + + /* + * If the postmaster kicked the bucket, we have no chance of cleaning + * up safely -- we won't be able to tell when our workers are actually + * dead. This doesn't necessitate a PANIC since they will all abort + * eventually, but we can't safely continue this session. + */ + if (status == BGWH_POSTMASTER_DIED) + ereport(FATAL, + (errcode(ERRCODE_ADMIN_SHUTDOWN), + errmsg("postmaster exited during a parallel transaction"))); + + /* Release memory. */ + pfree(pcxt->worker[i].bgwhandle); + pcxt->worker[i].bgwhandle = NULL; + } + + /* Free the worker array itself. */ + if (pcxt->worker != NULL) + { + pfree(pcxt->worker); + pcxt->worker = NULL; + } + + /* Free memory. */ + pfree(pcxt); +} + +/* + * Are there any parallel contexts currently active? + */ +bool +ParallelContextActive(void) +{ + return !dlist_is_empty(&pcxt_list); +} + +/* + * Handle receipt of an interrupt indicating a parallel worker message. + */ +void +HandleParallelMessageInterrupt(void) +{ + int save_errno = errno; + + InterruptPending = true; + ParallelMessagePending = true; + SetLatch(MyLatch); + + errno = save_errno; +} + +/* + * Handle any queued protocol messages received from parallel workers. 
+ */ +void +HandleParallelMessages(void) +{ + dlist_iter iter; + + ParallelMessagePending = false; + + dlist_foreach(iter, &pcxt_list) + { + ParallelContext *pcxt; + int i; + Size nbytes; + void *data; + + pcxt = dlist_container(ParallelContext, node, iter.cur); + if (pcxt->worker == NULL) + continue; + + for (i = 0; i < pcxt->nworkers; ++i) + { + /* + * Read as many messages as we can from each worker, but stop + * when either (1) the error queue goes away, which can happen if + * we receive a Terminate message from the worker; or (2) no more + * messages can be read from the worker without blocking. + */ + while (pcxt->worker[i].error_mqh != NULL) + { + shm_mq_result res; + + res = shm_mq_receive(pcxt->worker[i].error_mqh, &nbytes, + &data, true); + if (res == SHM_MQ_WOULD_BLOCK) + break; + else if (res == SHM_MQ_SUCCESS) + { + StringInfoData msg; + + initStringInfo(&msg); + appendBinaryStringInfo(&msg, data, nbytes); + HandleParallelMessage(pcxt, i, &msg); + pfree(msg.data); + } + else + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), /* XXX: wrong errcode? */ + errmsg("lost connection to parallel worker"))); + + /* This might make the error queue go away. */ + CHECK_FOR_INTERRUPTS(); + } + } + } +} + +/* + * Handle a single protocol message received from a single parallel worker. + */ +static void +HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg) +{ + char msgtype; + + msgtype = pq_getmsgbyte(msg); + + switch (msgtype) + { + case 'K': /* BackendKeyData */ + { + int32 pid = pq_getmsgint(msg, 4); + (void) pq_getmsgint(msg, 4); /* discard cancel key */ + (void) pq_getmsgend(msg); + pcxt->worker[i].pid = pid; + break; + } + + case 'E': /* ErrorResponse */ + case 'N': /* NoticeResponse */ + { + ErrorData edata; + ErrorContextCallback errctx; + ErrorContextCallback *save_error_context_stack; + + /* + * Rethrow the error using the error context callbacks that + * were in effect when the context was created, not the + * current ones. + */ + save_error_context_stack = error_context_stack; + errctx.callback = ParallelErrorContext; + errctx.arg = &pcxt->worker[i].pid; + errctx.previous = pcxt->error_context_stack; + error_context_stack = &errctx; + + /* Parse ErrorReponse or NoticeResponse. */ + pq_parse_errornotice(msg, &edata); + + /* Death of a worker isn't enough justification for suicide. */ + edata.elevel = Min(edata.elevel, ERROR); + + /* Rethrow error or notice. */ + ThrowErrorData(&edata); + + /* Restore previous context. */ + error_context_stack = save_error_context_stack; + + break; + } + + case 'A': /* NotifyResponse */ + { + /* Propagate NotifyResponse. */ + pq_putmessage(msg->data[0], &msg->data[1], msg->len - 1); + break; + } + + case 'X': /* Terminate, indicating clean exit */ + { + pfree(pcxt->worker[i].bgwhandle); + pfree(pcxt->worker[i].error_mqh); + pcxt->worker[i].bgwhandle = NULL; + pcxt->worker[i].error_mqh = NULL; + break; + } + + default: + { + elog(ERROR, "unknown message type: %c (%d bytes)", + msgtype, msg->len); + } + } +} + +/* + * End-of-subtransaction cleanup for parallel contexts. + * + * Currently, it's forbidden to enter or leave a subtransaction while + * parallel mode is in effect, so we could just blow away everything. But + * we may want to relax that restriction in the future, so this code + * contemplates that there may be multiple subtransaction IDs in pcxt_list. 
+ */ +void +AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId) +{ + while (!dlist_is_empty(&pcxt_list)) + { + ParallelContext *pcxt; + + pcxt = dlist_head_element(ParallelContext, node, &pcxt_list); + if (pcxt->subid != mySubId) + break; + if (isCommit) + elog(WARNING, "leaked parallel context"); + DestroyParallelContext(pcxt); + } +} + +/* + * End-of-transaction cleanup for parallel contexts. + */ +void +AtEOXact_Parallel(bool isCommit) +{ + while (!dlist_is_empty(&pcxt_list)) + { + ParallelContext *pcxt; + + pcxt = dlist_head_element(ParallelContext, node, &pcxt_list); + if (isCommit) + elog(WARNING, "leaked parallel context"); + DestroyParallelContext(pcxt); + } +} + +/* + * Main entrypoint for parallel workers. + */ +static void +ParallelWorkerMain(Datum main_arg) +{ + dsm_segment *seg; + shm_toc *toc; + FixedParallelState *fps; + char *error_queue_space; + shm_mq *mq; + shm_mq_handle *mqh; + char *libraryspace; + char *gucspace; + char *combocidspace; + char *tsnapspace; + char *asnapspace; + char *tstatespace; + StringInfoData msgbuf; + + /* Establish signal handlers. */ + pqsignal(SIGTERM, die); + BackgroundWorkerUnblockSignals(); + + /* Set up a memory context and resource owner. */ + Assert(CurrentResourceOwner == NULL); + CurrentResourceOwner = ResourceOwnerCreate(NULL, "parallel toplevel"); + CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext, + "parallel worker", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + + /* + * Now that we have a resource owner, we can attach to the dynamic + * shared memory segment and read the table of contents. + */ + seg = dsm_attach(DatumGetUInt32(main_arg)); + if (seg == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("unable to map dynamic shared memory segment"))); + toc = shm_toc_attach(PARALLEL_MAGIC, dsm_segment_address(seg)); + if (toc == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("bad magic number in dynamic shared memory segment"))); + + /* Determine and set our worker number. */ + fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED); + Assert(fps != NULL); + Assert(ParallelWorkerNumber == -1); + SpinLockAcquire(&fps->mutex); + if (fps->workers_attached < fps->workers_expected) + ParallelWorkerNumber = fps->workers_attached++; + SpinLockRelease(&fps->mutex); + if (ParallelWorkerNumber < 0) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("too many parallel workers already attached"))); + MyFixedParallelState = fps; + + /* + * Now that we have a worker number, we can find and attach to the error + * queue provided for us. That's good, because until we do that, any + * errors that happen here will not be reported back to the process that + * requested that this worker be launched. + */ + error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE); + mq = (shm_mq *) (error_queue_space + + ParallelWorkerNumber * PARALLEL_ERROR_QUEUE_SIZE); + shm_mq_set_sender(mq, MyProc); + mqh = shm_mq_attach(mq, seg, NULL); + pq_redirect_to_shm_mq(mq, mqh); + pq_set_parallel_master(fps->parallel_master_pid, + fps->parallel_master_backend_id); + + /* + * Send a BackendKeyData message to the process that initiated parallelism + * so that it has access to our PID before it receives any other messages + * from us. Our cancel key is sent, too, since that's the way the protocol + * message is defined, but it won't actually be used for anything in this + * case. 
+ */ + pq_beginmessage(&msgbuf, 'K'); + pq_sendint(&msgbuf, (int32) MyProcPid, sizeof(int32)); + pq_sendint(&msgbuf, (int32) MyCancelKey, sizeof(int32)); + pq_endmessage(&msgbuf); + + /* + * Hooray! Primary initialization is complete. Now, we need to set up + * our backend-local state to match the original backend. + */ + + /* + * Load libraries that were loaded by original backend. We want to do this + * before restoring GUCs, because the libraries might define custom + * variables. + */ + libraryspace = shm_toc_lookup(toc, PARALLEL_KEY_LIBRARY); + Assert(libraryspace != NULL); + RestoreLibraryState(libraryspace); + + /* Restore database connection. */ + BackgroundWorkerInitializeConnectionByOid(fps->database_id, + fps->authenticated_user_id); + + /* Restore GUC values from launching backend. */ + gucspace = shm_toc_lookup(toc, PARALLEL_KEY_GUC); + Assert(gucspace != NULL); + StartTransactionCommand(); + RestoreGUCState(gucspace); + CommitTransactionCommand(); + + /* Crank up a transaction state appropriate to a parallel worker. */ + tstatespace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_STATE); + StartParallelWorkerTransaction(tstatespace); + + /* Restore combo CID state. */ + combocidspace = shm_toc_lookup(toc, PARALLEL_KEY_COMBO_CID); + Assert(combocidspace != NULL); + RestoreComboCIDState(combocidspace); + + /* Restore transaction snapshot. */ + tsnapspace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT); + Assert(tsnapspace != NULL); + RestoreTransactionSnapshot(RestoreSnapshot(tsnapspace), + fps->parallel_master_pgproc); + + /* Restore active snapshot. */ + asnapspace = shm_toc_lookup(toc, PARALLEL_KEY_ACTIVE_SNAPSHOT); + Assert(asnapspace != NULL); + PushActiveSnapshot(RestoreSnapshot(asnapspace)); + + /* Restore user ID and security context. */ + SetUserIdAndSecContext(fps->current_user_id, fps->sec_context); + + /* + * We've initialized all of our state now; nothing should change hereafter. + */ + EnterParallelMode(); + + /* + * Time to do the real work: invoke the caller-supplied code. + * + * If you get a crash at this line, see the comments for + * ParallelExtensionTrampoline. + */ + fps->entrypoint(seg, toc); + + /* Must exit parallel mode to pop active snapshot. */ + ExitParallelMode(); + + /* Must pop active snapshot so resowner.c doesn't complain. */ + PopActiveSnapshot(); + + /* Shut down the parallel-worker transaction. */ + EndParallelWorkerTransaction(); + + /* Report success. */ + pq_putmessage('X', NULL, 0); +} + +/* + * It's unsafe for the entrypoint invoked by ParallelWorkerMain to be a + * function living in a dynamically loaded module, because the module might + * not be loaded in every process, or might be loaded but not at the same + * address. To work around that problem, CreateParallelContextForExtension() + * arranges to call this function rather than calling the extension-provided + * function directly; and this function then looks up the real entrypoint and + * calls it. 
+ */ +static void +ParallelExtensionTrampoline(dsm_segment *seg, shm_toc *toc) +{ + char *extensionstate; + char *library_name; + char *function_name; + parallel_worker_main_type entrypt; + + extensionstate = shm_toc_lookup(toc, PARALLEL_KEY_EXTENSION_TRAMPOLINE); + Assert(extensionstate != NULL); + library_name = extensionstate; + function_name = extensionstate + strlen(library_name) + 1; + + entrypt = (parallel_worker_main_type) + load_external_function(library_name, function_name, true, NULL); + entrypt(seg, toc); +} + +/* + * Give the user a hint that this is a message propagated from a parallel + * worker. Otherwise, it can sometimes be confusing to understand what + * actually happened. + */ +static void +ParallelErrorContext(void *arg) +{ + errcontext("parallel worker, pid %d", * (int32 *) arg); +} + +/* + * Update shared memory with the ending location of the last WAL record we + * wrote, if it's greater than the value already stored there. + */ +void +ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end) +{ + FixedParallelState *fps = MyFixedParallelState; + + Assert(fps != NULL); + SpinLockAcquire(&fps->mutex); + if (fps->last_xlog_end < last_xlog_end) + fps->last_xlog_end = last_xlog_end; + SpinLockRelease(&fps->mutex); +} diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c index 42ee57fe8d7..cf3e964fc6e 100644 --- a/src/backend/access/transam/varsup.c +++ b/src/backend/access/transam/varsup.c @@ -50,6 +50,13 @@ GetNewTransactionId(bool isSubXact) TransactionId xid; /* + * Workers synchronize transaction state at the beginning of each parallel + * operation, so we can't account for new XIDs after that point. + */ + if (IsInParallelMode()) + elog(ERROR, "cannot assign TransactionIds during a parallel operation"); + + /* * During bootstrap initialization, we return the special bootstrap * transaction id. */ diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 511bcbbc519..a8f78d63762 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -22,6 +22,7 @@ #include "access/commit_ts.h" #include "access/multixact.h" +#include "access/parallel.h" #include "access/subtrans.h" #include "access/transam.h" #include "access/twophase.h" @@ -51,6 +52,7 @@ #include "storage/procarray.h" #include "storage/sinvaladt.h" #include "storage/smgr.h" +#include "utils/builtins.h" #include "utils/catcache.h" #include "utils/combocid.h" #include "utils/guc.h" @@ -78,6 +80,33 @@ bool XactDeferrable; int synchronous_commit = SYNCHRONOUS_COMMIT_ON; /* + * When running as a parallel worker, we place only a single + * TransactionStateData on the parallel worker's state stack, and the XID + * reflected there will be that of the *innermost* currently-active + * subtransaction in the backend that initiated paralllelism. However, + * GetTopTransactionId() and TransactionIdIsCurrentTransactionId() + * need to return the same answers in the parallel worker as they would have + * in the user backend, so we need some additional bookkeeping. + * + * XactTopTransactionId stores the XID of our toplevel transaction, which + * will be the same as TopTransactionState.transactionId in an ordinary + * backend; but in a parallel backend, which does not have the entire + * transaction state, it will instead be copied from the backend that started + * the parallel operation. 
+ * + * nParallelCurrentXids will be 0 and ParallelCurrentXids NULL in an ordinary + * backend, but in a parallel backend, nParallelCurrentXids will contain the + * number of XIDs that need to be considered current, and ParallelCurrentXids + * will contain the XIDs themselves. This includes all XIDs that were current + * or sub-committed in the parent at the time the parallel operation began. + * The XIDs are stored sorted in numerical order (not logical order) to make + * lookups as fast as possible. + */ +TransactionId XactTopTransactionId = InvalidTransactionId; +int nParallelCurrentXids = 0; +TransactionId *ParallelCurrentXids; + +/* * MyXactAccessedTempRel is set when a temporary relation is accessed. * We don't allow PREPARE TRANSACTION in that case. (This is global * so that it can be set from heapam.c.) @@ -113,6 +142,7 @@ typedef enum TBlockState /* transaction block states */ TBLOCK_BEGIN, /* starting transaction block */ TBLOCK_INPROGRESS, /* live transaction */ + TBLOCK_PARALLEL_INPROGRESS, /* live transaction inside parallel worker */ TBLOCK_END, /* COMMIT received */ TBLOCK_ABORT, /* failed xact, awaiting ROLLBACK */ TBLOCK_ABORT_END, /* failed xact, ROLLBACK received */ @@ -154,6 +184,7 @@ typedef struct TransactionStateData bool prevXactReadOnly; /* entry-time xact r/o state */ bool startedInRecovery; /* did we start in recovery? */ bool didLogXid; /* has xid been included in WAL record? */ + int parallelModeLevel; /* Enter/ExitParallelMode counter */ struct TransactionStateData *parent; /* back link to parent */ } TransactionStateData; @@ -184,6 +215,7 @@ static TransactionStateData TopTransactionStateData = { false, /* entry-time xact r/o state */ false, /* startedInRecovery */ false, /* didLogXid */ + 0, /* parallelMode */ NULL /* link to parent state block */ }; @@ -353,9 +385,9 @@ IsAbortedTransactionBlockState(void) TransactionId GetTopTransactionId(void) { - if (!TransactionIdIsValid(TopTransactionStateData.transactionId)) + if (!TransactionIdIsValid(XactTopTransactionId)) AssignTransactionId(&TopTransactionStateData); - return TopTransactionStateData.transactionId; + return XactTopTransactionId; } /* @@ -368,7 +400,7 @@ GetTopTransactionId(void) TransactionId GetTopTransactionIdIfAny(void) { - return TopTransactionStateData.transactionId; + return XactTopTransactionId; } /* @@ -462,6 +494,13 @@ AssignTransactionId(TransactionState s) Assert(s->state == TRANS_INPROGRESS); /* + * Workers synchronize transaction state at the beginning of each + * parallel operation, so we can't account for new XIDs at this point. + */ + if (IsInParallelMode()) + elog(ERROR, "cannot assign XIDs during a parallel operation"); + + /* * Ensure parent(s) have XIDs, so that a child always has an XID later * than its parent. Musn't recurse here, or we might get a stack overflow * if we're at the bottom of a huge stack of subtransactions none of which @@ -513,6 +552,8 @@ AssignTransactionId(TransactionState s) * the Xid as "running". See GetNewTransactionId. */ s->transactionId = GetNewTransactionId(isSubXact); + if (!isSubXact) + XactTopTransactionId = s->transactionId; if (isSubXact) SubTransSetParent(s->transactionId, s->parent->transactionId, false); @@ -644,7 +685,16 @@ GetCurrentCommandId(bool used) { /* this is global to a transaction, not subtransaction-local */ if (used) + { + /* + * Forbid setting currentCommandIdUsed in parallel mode, because we + * have no provision for communicating this back to the master. 
We + * could relax this restriction when currentCommandIdUsed was already + * true at the start of the parallel operation. + */ + Assert(CurrentTransactionState->parallelModeLevel == 0); currentCommandIdUsed = true; + } return currentCommandId; } @@ -738,6 +788,36 @@ TransactionIdIsCurrentTransactionId(TransactionId xid) return false; /* + * In parallel workers, the XIDs we must consider as current are stored + * in ParallelCurrentXids rather than the transaction-state stack. Note + * that the XIDs in this array are sorted numerically rather than + * according to transactionIdPrecedes order. + */ + if (nParallelCurrentXids > 0) + { + int low, + high; + + low = 0; + high = nParallelCurrentXids - 1; + while (low <= high) + { + int middle; + TransactionId probe; + + middle = low + (high - low) / 2; + probe = ParallelCurrentXids[middle]; + if (probe == xid) + return true; + else if (probe < xid) + low = middle + 1; + else + high = middle - 1; + } + return false; + } + + /* * We will return true for the Xid of the current subtransaction, any of * its subcommitted children, any of its parents, or any of their * previously subcommitted children. However, a transaction being aborted @@ -791,6 +871,48 @@ TransactionStartedDuringRecovery(void) } /* + * EnterParallelMode + */ +void +EnterParallelMode(void) +{ + TransactionState s = CurrentTransactionState; + + Assert(s->parallelModeLevel >= 0); + + ++s->parallelModeLevel; +} + +/* + * ExitParallelMode + */ +void +ExitParallelMode(void) +{ + TransactionState s = CurrentTransactionState; + + Assert(s->parallelModeLevel > 0); + Assert(s->parallelModeLevel > 1 || !ParallelContextActive()); + + --s->parallelModeLevel; +} + +/* + * IsInParallelMode + * + * Are we in a parallel operation, as either the master or a worker? Check + * this to prohibit operations that change backend-local state expected to + * match across all workers. Mere caches usually don't require such a + * restriction. State modified in a strict push/pop fashion, such as the + * active snapshot stack, is often fine. + */ +bool +IsInParallelMode(void) +{ + return CurrentTransactionState->parallelModeLevel != 0; +} + +/* * CommandCounterIncrement */ void @@ -804,6 +926,14 @@ CommandCounterIncrement(void) */ if (currentCommandIdUsed) { + /* + * Workers synchronize transaction state at the beginning of each + * parallel operation, so we can't account for new commands after that + * point. + */ + if (IsInParallelMode()) + elog(ERROR, "cannot start commands during a parallel operation"); + currentCommandId += 1; if (currentCommandId == InvalidCommandId) { @@ -1650,6 +1780,8 @@ StartTransaction(void) s = &TopTransactionStateData; CurrentTransactionState = s; + Assert(XactTopTransactionId == InvalidTransactionId); + /* * check the current transaction state */ @@ -1779,6 +1911,9 @@ CommitTransaction(void) { TransactionState s = CurrentTransactionState; TransactionId latestXid; + bool is_parallel_worker; + + is_parallel_worker = (s->blockState == TBLOCK_PARALLEL_INPROGRESS); ShowTransactionState("CommitTransaction"); @@ -1812,7 +1947,8 @@ CommitTransaction(void) break; } - CallXactCallbacks(XACT_EVENT_PRE_COMMIT); + CallXactCallbacks(is_parallel_worker ? XACT_EVENT_PARALLEL_PRE_COMMIT + : XACT_EVENT_PRE_COMMIT); /* * The remaining actions cannot call any user-defined code, so it's safe @@ -1821,6 +1957,13 @@ CommitTransaction(void) * the transaction-abort path. */ + /* If we might have parallel workers, clean them up now. 
*/ + if (IsInParallelMode()) + { + AtEOXact_Parallel(true); + s->parallelModeLevel = 0; + } + /* Shut down the deferred-trigger manager */ AfterTriggerEndXact(true); @@ -1859,10 +2002,28 @@ CommitTransaction(void) */ s->state = TRANS_COMMIT; - /* - * Here is where we really truly commit. - */ - latestXid = RecordTransactionCommit(); + if (!is_parallel_worker) + { + /* + * We need to mark our XIDs as commited in pg_clog. This is where we + * durably commit. + */ + latestXid = RecordTransactionCommit(); + } + else + { + /* + * We must not mark our XID committed; the parallel master is + * responsible for that. + */ + latestXid = InvalidTransactionId; + + /* + * Make sure the master will know about any WAL we wrote before it + * commits. + */ + ParallelWorkerReportLastRecEnd(XactLastRecEnd); + } TRACE_POSTGRESQL_TRANSACTION_COMMIT(MyProc->lxid); @@ -1889,7 +2050,8 @@ CommitTransaction(void) * state. */ - CallXactCallbacks(XACT_EVENT_COMMIT); + CallXactCallbacks(is_parallel_worker ? XACT_EVENT_PARALLEL_COMMIT + : XACT_EVENT_COMMIT); ResourceOwnerRelease(TopTransactionResourceOwner, RESOURCE_RELEASE_BEFORE_LOCKS, @@ -1937,7 +2099,7 @@ CommitTransaction(void) AtEOXact_GUC(true, 1); AtEOXact_SPI(true); AtEOXact_on_commit_actions(true); - AtEOXact_Namespace(true); + AtEOXact_Namespace(true, is_parallel_worker); AtEOXact_SMgr(); AtEOXact_Files(); AtEOXact_ComboCid(); @@ -1962,6 +2124,9 @@ CommitTransaction(void) s->nChildXids = 0; s->maxChildXids = 0; + XactTopTransactionId = InvalidTransactionId; + nParallelCurrentXids = 0; + /* * done with commit processing, set current transaction state back to * default @@ -1985,6 +2150,8 @@ PrepareTransaction(void) GlobalTransaction gxact; TimestampTz prepared_at; + Assert(!IsInParallelMode()); + ShowTransactionState("PrepareTransaction"); /* @@ -2204,7 +2371,7 @@ PrepareTransaction(void) AtEOXact_GUC(true, 1); AtEOXact_SPI(true); AtEOXact_on_commit_actions(true); - AtEOXact_Namespace(true); + AtEOXact_Namespace(true, false); AtEOXact_SMgr(); AtEOXact_Files(); AtEOXact_ComboCid(); @@ -2229,6 +2396,9 @@ PrepareTransaction(void) s->nChildXids = 0; s->maxChildXids = 0; + XactTopTransactionId = InvalidTransactionId; + nParallelCurrentXids = 0; + /* * done with 1st phase commit processing, set current transaction state * back to default @@ -2247,6 +2417,7 @@ AbortTransaction(void) { TransactionState s = CurrentTransactionState; TransactionId latestXid; + bool is_parallel_worker; /* Prevent cancel/die interrupt while cleaning up */ HOLD_INTERRUPTS(); @@ -2295,6 +2466,7 @@ AbortTransaction(void) /* * check the current transaction state */ + is_parallel_worker = (s->blockState == TBLOCK_PARALLEL_INPROGRESS); if (s->state != TRANS_INPROGRESS && s->state != TRANS_PREPARE) elog(WARNING, "AbortTransaction while in %s state", TransStateAsString(s->state)); @@ -2318,6 +2490,13 @@ AbortTransaction(void) */ SetUserIdAndSecContext(s->prevUser, s->prevSecContext); + /* If in parallel mode, clean up workers and exit parallel mode. */ + if (IsInParallelMode()) + { + AtEOXact_Parallel(false); + s->parallelModeLevel = 0; + } + /* * do abort processing */ @@ -2330,9 +2509,23 @@ AbortTransaction(void) /* * Advertise the fact that we aborted in pg_clog (assuming that we got as - * far as assigning an XID to advertise). + * far as assigning an XID to advertise). But if we're inside a parallel + * worker, skip this; the user backend must be the one to write the abort + * record. 
*/ - latestXid = RecordTransactionAbort(false); + if (!is_parallel_worker) + latestXid = RecordTransactionAbort(false); + else + { + latestXid = InvalidTransactionId; + + /* + * Since the parallel master won't get our value of XactLastRecEnd in this + * case, we nudge WAL-writer ourselves in this case. See related comments in + * RecordTransactionAbort for why this matters. + */ + XLogSetAsyncXactLSN(XactLastRecEnd); + } TRACE_POSTGRESQL_TRANSACTION_ABORT(MyProc->lxid); @@ -2350,7 +2543,10 @@ AbortTransaction(void) */ if (TopTransactionResourceOwner != NULL) { - CallXactCallbacks(XACT_EVENT_ABORT); + if (is_parallel_worker) + CallXactCallbacks(XACT_EVENT_PARALLEL_ABORT); + else + CallXactCallbacks(XACT_EVENT_ABORT); ResourceOwnerRelease(TopTransactionResourceOwner, RESOURCE_RELEASE_BEFORE_LOCKS, @@ -2371,7 +2567,7 @@ AbortTransaction(void) AtEOXact_GUC(false, 1); AtEOXact_SPI(false); AtEOXact_on_commit_actions(false); - AtEOXact_Namespace(false); + AtEOXact_Namespace(false, is_parallel_worker); AtEOXact_SMgr(); AtEOXact_Files(); AtEOXact_ComboCid(); @@ -2423,6 +2619,10 @@ CleanupTransaction(void) s->childXids = NULL; s->nChildXids = 0; s->maxChildXids = 0; + s->parallelModeLevel = 0; + + XactTopTransactionId = InvalidTransactionId; + nParallelCurrentXids = 0; /* * done with abort processing, set current transaction state back to @@ -2476,6 +2676,7 @@ StartTransactionCommand(void) /* These cases are invalid. */ case TBLOCK_STARTED: case TBLOCK_BEGIN: + case TBLOCK_PARALLEL_INPROGRESS: case TBLOCK_SUBBEGIN: case TBLOCK_END: case TBLOCK_SUBRELEASE: @@ -2511,11 +2712,13 @@ CommitTransactionCommand(void) switch (s->blockState) { /* - * This shouldn't happen, because it means the previous + * These shouldn't happen. TBLOCK_DEFAULT means the previous * StartTransactionCommand didn't set the STARTED state - * appropriately. + * appropriately, while TBLOCK_PARALLEL_INPROGRESS should be ended + * by EndParallelWorkerTranaction(), not this function. */ case TBLOCK_DEFAULT: + case TBLOCK_PARALLEL_INPROGRESS: elog(FATAL, "CommitTransactionCommand: unexpected state %s", BlockStateAsString(s->blockState)); break; @@ -2797,6 +3000,7 @@ AbortCurrentTransaction(void) * ABORT state. We will stay in ABORT until we get a ROLLBACK. */ case TBLOCK_INPROGRESS: + case TBLOCK_PARALLEL_INPROGRESS: AbortTransaction(); s->blockState = TBLOCK_ABORT; /* CleanupTransaction happens when we exit TBLOCK_ABORT_END */ @@ -3186,6 +3390,7 @@ BeginTransactionBlock(void) * Already a transaction block in progress. */ case TBLOCK_INPROGRESS: + case TBLOCK_PARALLEL_INPROGRESS: case TBLOCK_SUBINPROGRESS: case TBLOCK_ABORT: case TBLOCK_SUBABORT: @@ -3363,6 +3568,16 @@ EndTransactionBlock(void) result = true; break; + /* + * The user issued a COMMIT that somehow ran inside a parallel + * worker. We can't cope with that. + */ + case TBLOCK_PARALLEL_INPROGRESS: + ereport(FATAL, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot commit during a parallel operation"))); + break; + /* These cases are invalid. */ case TBLOCK_DEFAULT: case TBLOCK_BEGIN: @@ -3456,6 +3671,16 @@ UserAbortTransactionBlock(void) s->blockState = TBLOCK_ABORT_PENDING; break; + /* + * The user issued an ABORT that somehow ran inside a parallel + * worker. We can't cope with that. + */ + case TBLOCK_PARALLEL_INPROGRESS: + ereport(FATAL, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot abort during a parallel operation"))); + break; + /* These cases are invalid. 
*/ case TBLOCK_DEFAULT: case TBLOCK_BEGIN: @@ -3485,6 +3710,18 @@ DefineSavepoint(char *name) { TransactionState s = CurrentTransactionState; + /* + * Workers synchronize transaction state at the beginning of each parallel + * operation, so we can't account for new subtransactions after that + * point. (Note that this check will certainly error out if s->blockState + * is TBLOCK_PARALLEL_INPROGRESS, so we can treat that as an invalid case + * below.) + */ + if (IsInParallelMode()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot define savepoints during a parallel operation"))); + switch (s->blockState) { case TBLOCK_INPROGRESS: @@ -3505,6 +3742,7 @@ DefineSavepoint(char *name) case TBLOCK_DEFAULT: case TBLOCK_STARTED: case TBLOCK_BEGIN: + case TBLOCK_PARALLEL_INPROGRESS: case TBLOCK_SUBBEGIN: case TBLOCK_END: case TBLOCK_SUBRELEASE: @@ -3539,6 +3777,18 @@ ReleaseSavepoint(List *options) ListCell *cell; char *name = NULL; + /* + * Workers synchronize transaction state at the beginning of each parallel + * operation, so we can't account for transaction state change after that + * point. (Note that this check will certainly error out if s->blockState + * is TBLOCK_PARALLEL_INPROGRESS, so we can treat that as an invalid case + * below.) + */ + if (IsInParallelMode()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot release savepoints during a parallel operation"))); + switch (s->blockState) { /* @@ -3562,6 +3812,7 @@ ReleaseSavepoint(List *options) case TBLOCK_DEFAULT: case TBLOCK_STARTED: case TBLOCK_BEGIN: + case TBLOCK_PARALLEL_INPROGRESS: case TBLOCK_SUBBEGIN: case TBLOCK_END: case TBLOCK_SUBRELEASE: @@ -3639,6 +3890,18 @@ RollbackToSavepoint(List *options) ListCell *cell; char *name = NULL; + /* + * Workers synchronize transaction state at the beginning of each parallel + * operation, so we can't account for transaction state change after that + * point. (Note that this check will certainly error out if s->blockState + * is TBLOCK_PARALLEL_INPROGRESS, so we can treat that as an invalid case + * below.) + */ + if (IsInParallelMode()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot rollback to savepoints during a parallel operation"))); + switch (s->blockState) { /* @@ -3663,6 +3926,7 @@ RollbackToSavepoint(List *options) case TBLOCK_DEFAULT: case TBLOCK_STARTED: case TBLOCK_BEGIN: + case TBLOCK_PARALLEL_INPROGRESS: case TBLOCK_SUBBEGIN: case TBLOCK_END: case TBLOCK_SUBRELEASE: @@ -3751,6 +4015,20 @@ BeginInternalSubTransaction(char *name) { TransactionState s = CurrentTransactionState; + /* + * Workers synchronize transaction state at the beginning of each parallel + * operation, so we can't account for new subtransactions after that point. + * We might be able to make an exception for the type of subtransaction + * established by this function, which is typically used in contexts where + * we're going to release or roll back the subtransaction before proceeding + * further, so that no enduring change to the transaction state occurs. + * For now, however, we prohibit this case along with all the others. + */ + if (IsInParallelMode()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot start subtransactions during a parallel operation"))); + switch (s->blockState) { case TBLOCK_STARTED: @@ -3773,6 +4051,7 @@ BeginInternalSubTransaction(char *name) /* These cases are invalid. 
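Since DefineSavepoint(), ReleaseSavepoint(), RollbackToSavepoint(), and BeginInternalSubTransaction() now all reject parallel mode, code that normally shields fallible work behind an internal subtransaction (the pattern PL/pgSQL uses) needs a fallback while parallel. A hedged sketch of one way to cope; try_risky_work() and risky_work() are illustrative names, and a production caller would mirror PL/pgSQL's error handling more completely:

    #include "postgres.h"
    #include "access/xact.h"
    #include "utils/resowner.h"

    static void
    risky_work(void)
    {
        /* placeholder for work that might elog(ERROR) */
    }

    static bool
    try_risky_work(void)
    {
        MemoryContext oldcontext = CurrentMemoryContext;
        ResourceOwner oldowner = CurrentResourceOwner;
        bool        ok = true;

        if (IsInParallelMode())
        {
            /* No subtransaction protection available; let errors propagate. */
            risky_work();
            return true;
        }

        BeginInternalSubTransaction(NULL);
        /* BeginInternalSubTransaction switched memory contexts; go back to ours. */
        MemoryContextSwitchTo(oldcontext);

        PG_TRY();
        {
            risky_work();
            ReleaseCurrentSubTransaction();
            MemoryContextSwitchTo(oldcontext);
            CurrentResourceOwner = oldowner;
        }
        PG_CATCH();
        {
            /* Abort the subtransaction and swallow the error. */
            RollbackAndReleaseCurrentSubTransaction();
            MemoryContextSwitchTo(oldcontext);
            CurrentResourceOwner = oldowner;
            FlushErrorState();
            ok = false;
        }
        PG_END_TRY();

        return ok;
    }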
*/ case TBLOCK_DEFAULT: case TBLOCK_BEGIN: + case TBLOCK_PARALLEL_INPROGRESS: case TBLOCK_SUBBEGIN: case TBLOCK_SUBRELEASE: case TBLOCK_SUBCOMMIT: @@ -3805,6 +4084,18 @@ ReleaseCurrentSubTransaction(void) { TransactionState s = CurrentTransactionState; + /* + * Workers synchronize transaction state at the beginning of each parallel + * operation, so we can't account for commit of subtransactions after that + * point. This should not happen anyway. Code calling this would + * typically have called BeginInternalSubTransaction() first, failing + * there. + */ + if (IsInParallelMode()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot commit subtransactions during a parallel operation"))); + if (s->blockState != TBLOCK_SUBINPROGRESS) elog(ERROR, "ReleaseCurrentSubTransaction: unexpected state %s", BlockStateAsString(s->blockState)); @@ -3827,6 +4118,14 @@ RollbackAndReleaseCurrentSubTransaction(void) { TransactionState s = CurrentTransactionState; + /* + * Unlike ReleaseCurrentSubTransaction(), this is nominally permitted + * during parallel operations. That's because we may be in the master, + * recovering from an error thrown while we were in parallel mode. We + * won't reach here in a worker, because BeginInternalSubTransaction() + * will have failed. + */ + switch (s->blockState) { /* Must be in a subtransaction */ @@ -3838,6 +4137,7 @@ RollbackAndReleaseCurrentSubTransaction(void) case TBLOCK_DEFAULT: case TBLOCK_STARTED: case TBLOCK_BEGIN: + case TBLOCK_PARALLEL_INPROGRESS: case TBLOCK_SUBBEGIN: case TBLOCK_INPROGRESS: case TBLOCK_END: @@ -3913,6 +4213,7 @@ AbortOutOfAnyTransaction(void) case TBLOCK_STARTED: case TBLOCK_BEGIN: case TBLOCK_INPROGRESS: + case TBLOCK_PARALLEL_INPROGRESS: case TBLOCK_END: case TBLOCK_ABORT_PENDING: case TBLOCK_PREPARE: @@ -4004,6 +4305,7 @@ TransactionBlockStatusCode(void) case TBLOCK_BEGIN: case TBLOCK_SUBBEGIN: case TBLOCK_INPROGRESS: + case TBLOCK_PARALLEL_INPROGRESS: case TBLOCK_SUBINPROGRESS: case TBLOCK_END: case TBLOCK_SUBRELEASE: @@ -4107,6 +4409,13 @@ CommitSubTransaction(void) CallSubXactCallbacks(SUBXACT_EVENT_PRE_COMMIT_SUB, s->subTransactionId, s->parent->subTransactionId); + /* If in parallel mode, clean up workers and exit parallel mode. */ + if (IsInParallelMode()) + { + AtEOSubXact_Parallel(true, s->subTransactionId); + s->parallelModeLevel = 0; + } + /* Do the actual "commit", such as it is */ s->state = TRANS_COMMIT; @@ -4260,6 +4569,13 @@ AbortSubTransaction(void) */ SetUserIdAndSecContext(s->prevUser, s->prevSecContext); + /* Exit from parallel mode, if necessary. */ + if (IsInParallelMode()) + { + AtEOSubXact_Parallel(false, s->subTransactionId); + s->parallelModeLevel = 0; + } + /* * We can skip all this stuff if the subxact failed before creating a * ResourceOwner... @@ -4400,6 +4716,7 @@ PushTransaction(void) s->blockState = TBLOCK_SUBBEGIN; GetUserIdAndSecContext(&s->prevUser, &s->prevSecContext); s->prevXactReadOnly = XactReadOnly; + s->parallelModeLevel = 0; CurrentTransactionState = s; @@ -4447,6 +4764,139 @@ PopTransaction(void) } /* + * EstimateTransactionStateSpace + * Estimate the amount of space that will be needed by + * SerializeTransactionState. It would be OK to overestimate slightly, + * but it's simple for us to work out the precise value, so we do. 
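The AtEOSubXact_Parallel()/AtEOXact_Parallel() calls added above mean a caller only has to pair EnterParallelMode() with ExitParallelMode() on the success path; if an error intervenes, the (sub)transaction abort paths reset parallelModeLevel instead. A minimal sketch using the EnterParallelMode()/ExitParallelMode() entry points this patch adds to xact.c, with do_parallel_work() as an illustrative placeholder:

    #include "postgres.h"
    #include "access/xact.h"

    static void
    do_parallel_work(void)
    {
        /* placeholder: create a ParallelContext, launch workers, and so on */
    }

    static void
    run_in_parallel_mode(void)
    {
        EnterParallelMode();

        /*
         * If do_parallel_work() throws an error, we never reach
         * ExitParallelMode(); the abort paths shown above clear
         * parallelModeLevel for us.
         */
        do_parallel_work();

        ExitParallelMode();
    }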
+ */ +Size +EstimateTransactionStateSpace(void) +{ + TransactionState s; + Size nxids = 5; /* iso level, deferrable, top & current XID, XID count */ + + for (s = CurrentTransactionState; s != NULL; s = s->parent) + { + if (TransactionIdIsValid(s->transactionId)) + nxids = add_size(nxids, 1); + nxids = add_size(nxids, s->nChildXids); + } + + nxids = add_size(nxids, nParallelCurrentXids); + return mul_size(nxids, sizeof(TransactionId)); +} + +/* + * SerializeTransactionState + * Write out relevant details of our transaction state that will be + * needed by a parallel worker. + * + * We need to save and restore XactDeferrable, XactIsoLevel, and the XIDs + * associated with this transaction. The first eight bytes of the result + * contain XactDeferrable and XactIsoLevel; the next eight bytes contain the + * XID of the top-level transaction and the XID of the current transaction + * (or, in each case, InvalidTransactionId if none). After that, the next 4 + * bytes contain a count of how many additional XIDs follow; this is followed + * by all of those XIDs one after another. We emit the XIDs in sorted order + * for the convenience of the receiving process. + */ +void +SerializeTransactionState(Size maxsize, char *start_address) +{ + TransactionState s; + Size nxids = 0; + Size i = 0; + TransactionId *workspace; + TransactionId *result = (TransactionId *) start_address; + + Assert(maxsize >= 5 * sizeof(TransactionId)); + result[0] = (TransactionId) XactIsoLevel; + result[1] = (TransactionId) XactDeferrable; + result[2] = XactTopTransactionId; + result[3] = CurrentTransactionState->transactionId; + + /* + * If we're running in a parallel worker and launching a parallel worker + * of our own, we can just pass along the information that was passed to + * us. + */ + if (nParallelCurrentXids > 0) + { + Assert(maxsize > (nParallelCurrentXids + 4) * sizeof(TransactionId)); + result[4] = nParallelCurrentXids; + memcpy(&result[5], ParallelCurrentXids, + nParallelCurrentXids * sizeof(TransactionId)); + return; + } + + /* + * OK, we need to generate a sorted list of XIDs that our workers + * should view as current. First, figure out how many there are. + */ + for (s = CurrentTransactionState; s != NULL; s = s->parent) + { + if (TransactionIdIsValid(s->transactionId)) + nxids = add_size(nxids, 1); + nxids = add_size(nxids, s->nChildXids); + } + Assert(nxids * sizeof(TransactionId) < maxsize); + + /* Copy them to our scratch space. */ + workspace = palloc(nxids * sizeof(TransactionId)); + for (s = CurrentTransactionState; s != NULL; s = s->parent) + { + if (TransactionIdIsValid(s->transactionId)) + workspace[i++] = s->transactionId; + memcpy(&workspace[i], s->childXids, + s->nChildXids * sizeof(TransactionId)); + i += s->nChildXids; + } + Assert(i == nxids); + + /* Sort them. */ + qsort(workspace, nxids, sizeof(TransactionId), xidComparator); + + /* Copy data into output area. */ + result[4] = (TransactionId) nxids; + memcpy(&result[5], workspace, nxids * sizeof(TransactionId)); +} + +/* + * StartParallelWorkerTransaction + * Start a parallel worker transaction, restoring the relevant + * transaction state serialized by SerializeTransactionState. 
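Together, the two functions above slot into the dynamic shared memory setup described in README.parallel: the initiating backend reserves space via the estimator, then serializes its transaction state into the segment's table of contents. The sketch below follows the shm_toc pattern parallel.c uses internally, but the key value and helper names are illustrative rather than the ones in the patch; note that the size is stable because no new XIDs can be assigned once parallel mode has been entered:

    #include "postgres.h"
    #include "access/parallel.h"
    #include "access/xact.h"
    #include "storage/shm_toc.h"

    /* Illustrative key; parallel.c keeps its own private key numbers. */
    #define DEMO_KEY_TRANSACTION_STATE   UINT64CONST(0xFF00000000000001)

    static void
    estimate_xact_state(ParallelContext *pcxt)
    {
        shm_toc_estimate_chunk(&pcxt->estimator, EstimateTransactionStateSpace());
        shm_toc_estimate_keys(&pcxt->estimator, 1);
    }

    static void
    store_xact_state(ParallelContext *pcxt)
    {
        Size        tstatelen = EstimateTransactionStateSpace();
        char       *tstatespace = shm_toc_allocate(pcxt->toc, tstatelen);

        SerializeTransactionState(tstatelen, tstatespace);
        shm_toc_insert(pcxt->toc, DEMO_KEY_TRANSACTION_STATE, tstatespace);
    }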
+ */ +void +StartParallelWorkerTransaction(char *tstatespace) +{ + TransactionId *tstate = (TransactionId *) tstatespace; + + Assert(CurrentTransactionState->blockState == TBLOCK_DEFAULT); + StartTransaction(); + + XactIsoLevel = (int) tstate[0]; + XactDeferrable = (bool) tstate[1]; + XactTopTransactionId = tstate[2]; + CurrentTransactionState->transactionId = tstate[3]; + nParallelCurrentXids = (int) tstate[4]; + ParallelCurrentXids = &tstate[5]; + + CurrentTransactionState->blockState = TBLOCK_PARALLEL_INPROGRESS; +} + +/* + * EndParallelWorkerTransaction + * End a parallel worker transaction. + */ +void +EndParallelWorkerTransaction(void) +{ + Assert(CurrentTransactionState->blockState == TBLOCK_PARALLEL_INPROGRESS); + CommitTransaction(); + CurrentTransactionState->blockState = TBLOCK_DEFAULT; +} + +/* * ShowTransactionState * Debug support */ @@ -4516,6 +4966,8 @@ BlockStateAsString(TBlockState blockState) return "BEGIN"; case TBLOCK_INPROGRESS: return "INPROGRESS"; + case TBLOCK_PARALLEL_INPROGRESS: + return "PARALLEL_INPROGRESS"; case TBLOCK_END: return "END"; case TBLOCK_ABORT: diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index da7b6c2fadd..6cf441534cc 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -292,6 +292,14 @@ static TimeLineID curFileTLI; * end+1 of the last record, and is reset when we end a top-level transaction, * or start a new one; so it can be used to tell if the current transaction has * created any XLOG records. + * + * While in parallel mode, this may not be fully up to date. When committing, + * a transaction can assume this covers all xlog records written either by the + * user backend or by any parallel worker which was present at any point during + * the transaction. But when aborting, or when still in parallel mode, other + * parallel backends may have written WAL records at later LSNs than the value + * stored here. The parallel leader advances its own copy, when necessary, + * in WaitForParallelWorkersToFinish. */ static XLogRecPtr ProcLastRecPtr = InvalidXLogRecPtr; |
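On the worker side the sequence is the mirror image, roughly what ParallelWorkerMain() in parallel.c does after attaching to the segment; the key and helper names below are the illustrative ones carried over from the previous sketch:

    #include "postgres.h"
    #include "access/xact.h"
    #include "storage/shm_toc.h"

    /* Must match the key the initiating backend used; illustrative value. */
    #define DEMO_KEY_TRANSACTION_STATE   UINT64CONST(0xFF00000000000001)

    static void
    worker_transaction_lifecycle(shm_toc *toc)
    {
        char       *tstatespace;

        /* In this release shm_toc_lookup() returns NULL if the key is absent. */
        tstatespace = shm_toc_lookup(toc, DEMO_KEY_TRANSACTION_STATE);
        if (tstatespace == NULL)
            elog(ERROR, "transaction state not found in shared memory");

        StartParallelWorkerTransaction(tstatespace);

        /* ... restore the rest of the serialized state, run the entrypoint ... */

        /*
         * Committing here writes no commit record and marks no XIDs committed;
         * per the CommitTransaction() changes above, it mainly reports
         * XactLastRecEnd back so the master can account for the worker's WAL
         * in WaitForParallelWorkersToFinish().
         */
        EndParallelWorkerTransaction();
    }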