aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/transam/parallel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/transam/parallel.c')
-rw-r--r--src/backend/access/transam/parallel.c44
1 files changed, 41 insertions, 3 deletions
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index ce1b907debd..13c8ba3b196 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -15,6 +15,7 @@
#include "postgres.h"
#include "access/parallel.h"
+#include "access/session.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "catalog/namespace.h"
@@ -36,6 +37,7 @@
#include "utils/memutils.h"
#include "utils/resowner.h"
#include "utils/snapmgr.h"
+#include "utils/typcache.h"
/*
@@ -51,8 +53,9 @@
#define PARALLEL_MAGIC 0x50477c7c
/*
- * Magic numbers for parallel state sharing. Higher-level code should use
- * smaller values, leaving these very large ones for use by this module.
+ * Magic numbers for per-context parallel state sharing. Higher-level code
+ * should use smaller values, leaving these very large ones for use by this
+ * module.
*/
#define PARALLEL_KEY_FIXED UINT64CONST(0xFFFFFFFFFFFF0001)
#define PARALLEL_KEY_ERROR_QUEUE UINT64CONST(0xFFFFFFFFFFFF0002)
@@ -63,6 +66,7 @@
#define PARALLEL_KEY_ACTIVE_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0007)
#define PARALLEL_KEY_TRANSACTION_STATE UINT64CONST(0xFFFFFFFFFFFF0008)
#define PARALLEL_KEY_ENTRYPOINT UINT64CONST(0xFFFFFFFFFFFF0009)
+#define PARALLEL_KEY_SESSION_DSM UINT64CONST(0xFFFFFFFFFFFF000A)
/* Fixed-size parallel state. */
typedef struct FixedParallelState
@@ -197,6 +201,7 @@ InitializeParallelDSM(ParallelContext *pcxt)
Size segsize = 0;
int i;
FixedParallelState *fps;
+ dsm_handle session_dsm_handle = DSM_HANDLE_INVALID;
Snapshot transaction_snapshot = GetTransactionSnapshot();
Snapshot active_snapshot = GetActiveSnapshot();
@@ -213,6 +218,21 @@ InitializeParallelDSM(ParallelContext *pcxt)
*/
if (pcxt->nworkers > 0)
{
+ /* Get (or create) the per-session DSM segment's handle. */
+ session_dsm_handle = GetSessionDsmHandle();
+
+ /*
+ * If we weren't able to create a per-session DSM segment, then we can
+ * continue but we can't safely launch any workers because their
+ * record typmods would be incompatible so they couldn't exchange
+ * tuples.
+ */
+ if (session_dsm_handle == DSM_HANDLE_INVALID)
+ pcxt->nworkers = 0;
+ }
+
+ if (pcxt->nworkers > 0)
+ {
/* Estimate space for various kinds of state sharing. */
library_len = EstimateLibraryStateSpace();
shm_toc_estimate_chunk(&pcxt->estimator, library_len);
@@ -226,8 +246,9 @@ InitializeParallelDSM(ParallelContext *pcxt)
shm_toc_estimate_chunk(&pcxt->estimator, asnaplen);
tstatelen = EstimateTransactionStateSpace();
shm_toc_estimate_chunk(&pcxt->estimator, tstatelen);
+ shm_toc_estimate_chunk(&pcxt->estimator, sizeof(dsm_handle));
/* If you add more chunks here, you probably need to add keys. */
- shm_toc_estimate_keys(&pcxt->estimator, 6);
+ shm_toc_estimate_keys(&pcxt->estimator, 7);
/* Estimate space need for error queues. */
StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) ==
@@ -295,6 +316,7 @@ InitializeParallelDSM(ParallelContext *pcxt)
char *asnapspace;
char *tstatespace;
char *error_queue_space;
+ char *session_dsm_handle_space;
char *entrypointstate;
Size lnamelen;
@@ -322,6 +344,13 @@ InitializeParallelDSM(ParallelContext *pcxt)
SerializeSnapshot(active_snapshot, asnapspace);
shm_toc_insert(pcxt->toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, asnapspace);
+ /* Provide the handle for per-session segment. */
+ session_dsm_handle_space = shm_toc_allocate(pcxt->toc,
+ sizeof(dsm_handle));
+ *(dsm_handle *) session_dsm_handle_space = session_dsm_handle;
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_SESSION_DSM,
+ session_dsm_handle_space);
+
/* Serialize transaction state. */
tstatespace = shm_toc_allocate(pcxt->toc, tstatelen);
SerializeTransactionState(tstatelen, tstatespace);
@@ -938,6 +967,7 @@ ParallelWorkerMain(Datum main_arg)
char *asnapspace;
char *tstatespace;
StringInfoData msgbuf;
+ char *session_dsm_handle_space;
/* Set flag to indicate that we're initializing a parallel worker. */
InitializingParallelWorker = true;
@@ -1064,6 +1094,11 @@ ParallelWorkerMain(Datum main_arg)
combocidspace = shm_toc_lookup(toc, PARALLEL_KEY_COMBO_CID, false);
RestoreComboCIDState(combocidspace);
+ /* Attach to the per-session DSM segment and contained objects. */
+ session_dsm_handle_space =
+ shm_toc_lookup(toc, PARALLEL_KEY_SESSION_DSM, false);
+ AttachSession(*(dsm_handle *) session_dsm_handle_space);
+
/* Restore transaction snapshot. */
tsnapspace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT, false);
RestoreTransactionSnapshot(RestoreSnapshot(tsnapspace),
@@ -1110,6 +1145,9 @@ ParallelWorkerMain(Datum main_arg)
/* Shut down the parallel-worker transaction. */
EndParallelWorkerTransaction();
+ /* Detach from the per-session DSM segment. */
+ DetachSession();
+
/* Report success. */
pq_putmessage('X', NULL, 0);
}