diff options
Diffstat (limited to 'src/backend/access/transam/parallel.c')
-rw-r--r-- | src/backend/access/transam/parallel.c | 44 |
1 files changed, 41 insertions, 3 deletions
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index ce1b907debd..13c8ba3b196 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -15,6 +15,7 @@ #include "postgres.h" #include "access/parallel.h" +#include "access/session.h" #include "access/xact.h" #include "access/xlog.h" #include "catalog/namespace.h" @@ -36,6 +37,7 @@ #include "utils/memutils.h" #include "utils/resowner.h" #include "utils/snapmgr.h" +#include "utils/typcache.h" /* @@ -51,8 +53,9 @@ #define PARALLEL_MAGIC 0x50477c7c /* - * Magic numbers for parallel state sharing. Higher-level code should use - * smaller values, leaving these very large ones for use by this module. + * Magic numbers for per-context parallel state sharing. Higher-level code + * should use smaller values, leaving these very large ones for use by this + * module. */ #define PARALLEL_KEY_FIXED UINT64CONST(0xFFFFFFFFFFFF0001) #define PARALLEL_KEY_ERROR_QUEUE UINT64CONST(0xFFFFFFFFFFFF0002) @@ -63,6 +66,7 @@ #define PARALLEL_KEY_ACTIVE_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0007) #define PARALLEL_KEY_TRANSACTION_STATE UINT64CONST(0xFFFFFFFFFFFF0008) #define PARALLEL_KEY_ENTRYPOINT UINT64CONST(0xFFFFFFFFFFFF0009) +#define PARALLEL_KEY_SESSION_DSM UINT64CONST(0xFFFFFFFFFFFF000A) /* Fixed-size parallel state. */ typedef struct FixedParallelState @@ -197,6 +201,7 @@ InitializeParallelDSM(ParallelContext *pcxt) Size segsize = 0; int i; FixedParallelState *fps; + dsm_handle session_dsm_handle = DSM_HANDLE_INVALID; Snapshot transaction_snapshot = GetTransactionSnapshot(); Snapshot active_snapshot = GetActiveSnapshot(); @@ -213,6 +218,21 @@ InitializeParallelDSM(ParallelContext *pcxt) */ if (pcxt->nworkers > 0) { + /* Get (or create) the per-session DSM segment's handle. */ + session_dsm_handle = GetSessionDsmHandle(); + + /* + * If we weren't able to create a per-session DSM segment, then we can + * continue but we can't safely launch any workers because their + * record typmods would be incompatible so they couldn't exchange + * tuples. + */ + if (session_dsm_handle == DSM_HANDLE_INVALID) + pcxt->nworkers = 0; + } + + if (pcxt->nworkers > 0) + { /* Estimate space for various kinds of state sharing. */ library_len = EstimateLibraryStateSpace(); shm_toc_estimate_chunk(&pcxt->estimator, library_len); @@ -226,8 +246,9 @@ InitializeParallelDSM(ParallelContext *pcxt) shm_toc_estimate_chunk(&pcxt->estimator, asnaplen); tstatelen = EstimateTransactionStateSpace(); shm_toc_estimate_chunk(&pcxt->estimator, tstatelen); + shm_toc_estimate_chunk(&pcxt->estimator, sizeof(dsm_handle)); /* If you add more chunks here, you probably need to add keys. */ - shm_toc_estimate_keys(&pcxt->estimator, 6); + shm_toc_estimate_keys(&pcxt->estimator, 7); /* Estimate space need for error queues. */ StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) == @@ -295,6 +316,7 @@ InitializeParallelDSM(ParallelContext *pcxt) char *asnapspace; char *tstatespace; char *error_queue_space; + char *session_dsm_handle_space; char *entrypointstate; Size lnamelen; @@ -322,6 +344,13 @@ InitializeParallelDSM(ParallelContext *pcxt) SerializeSnapshot(active_snapshot, asnapspace); shm_toc_insert(pcxt->toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, asnapspace); + /* Provide the handle for per-session segment. */ + session_dsm_handle_space = shm_toc_allocate(pcxt->toc, + sizeof(dsm_handle)); + *(dsm_handle *) session_dsm_handle_space = session_dsm_handle; + shm_toc_insert(pcxt->toc, PARALLEL_KEY_SESSION_DSM, + session_dsm_handle_space); + /* Serialize transaction state. */ tstatespace = shm_toc_allocate(pcxt->toc, tstatelen); SerializeTransactionState(tstatelen, tstatespace); @@ -938,6 +967,7 @@ ParallelWorkerMain(Datum main_arg) char *asnapspace; char *tstatespace; StringInfoData msgbuf; + char *session_dsm_handle_space; /* Set flag to indicate that we're initializing a parallel worker. */ InitializingParallelWorker = true; @@ -1064,6 +1094,11 @@ ParallelWorkerMain(Datum main_arg) combocidspace = shm_toc_lookup(toc, PARALLEL_KEY_COMBO_CID, false); RestoreComboCIDState(combocidspace); + /* Attach to the per-session DSM segment and contained objects. */ + session_dsm_handle_space = + shm_toc_lookup(toc, PARALLEL_KEY_SESSION_DSM, false); + AttachSession(*(dsm_handle *) session_dsm_handle_space); + /* Restore transaction snapshot. */ tsnapspace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT, false); RestoreTransactionSnapshot(RestoreSnapshot(tsnapspace), @@ -1110,6 +1145,9 @@ ParallelWorkerMain(Datum main_arg) /* Shut down the parallel-worker transaction. */ EndParallelWorkerTransaction(); + /* Detach from the per-session DSM segment. */ + DetachSession(); + /* Report success. */ pq_putmessage('X', NULL, 0); } |