diff options
author | Andres Freund <andres@anarazel.de> | 2017-09-14 19:59:21 -0700 |
---|---|---|
committer | Andres Freund <andres@anarazel.de> | 2017-09-14 19:59:21 -0700 |
commit | cc5f81366c36b3dd8f02bd9be1cf75b2cc8482bd (patch) | |
tree | 8670bbca7404426515b5556ab2c6477b316e3edc /src/backend/access | |
parent | 9b6cb4650bc6a56114000678c1944afdb95f8333 (diff) | |
download | postgresql-cc5f81366c36b3dd8f02bd9be1cf75b2cc8482bd.tar.gz postgresql-cc5f81366c36b3dd8f02bd9be1cf75b2cc8482bd.zip |
Add support for coordinating record typmods among parallel workers.
Tuples can have type RECORDOID and a typmod number that identifies a blessed
TupleDesc in a backend-private cache. To support the sharing of such tuples
through shared memory and temporary files, provide a typmod registry in
shared memory.
To achieve that, introduce per-session DSM segments, created on demand when a
backend first runs a parallel query. The per-session DSM segment has a
table-of-contents just like the per-query DSM segment, and initially the
contents are a shared record typmod registry and a DSA area to provide the
space it needs to grow.
State relating to the current session is accessed via a Session object
reached through global variable CurrentSession that may require significant
redesign further down the road as we figure out what else needs to be shared
or remodelled.
Author: Thomas Munro
Reviewed-By: Andres Freund
Discussion: https://postgr.es/m/CAEepm=0ZtQ-SpsgCyzzYpsXS6e=kZWqk3g5Ygn3MDV7A8dabUA@mail.gmail.com
Diffstat (limited to 'src/backend/access')
-rw-r--r-- | src/backend/access/common/Makefile | 2 | ||||
-rw-r--r-- | src/backend/access/common/session.c | 208 | ||||
-rw-r--r-- | src/backend/access/common/tupdesc.c | 16 | ||||
-rw-r--r-- | src/backend/access/transam/parallel.c | 44 |
4 files changed, 266 insertions, 4 deletions
diff --git a/src/backend/access/common/Makefile b/src/backend/access/common/Makefile index fb27944b891..f130b6e3501 100644 --- a/src/backend/access/common/Makefile +++ b/src/backend/access/common/Makefile @@ -13,6 +13,6 @@ top_builddir = ../../../.. include $(top_builddir)/src/Makefile.global OBJS = bufmask.o heaptuple.o indextuple.o printsimple.o printtup.o \ - reloptions.o scankey.o tupconvert.o tupdesc.o + reloptions.o scankey.o session.o tupconvert.o tupdesc.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/access/common/session.c b/src/backend/access/common/session.c new file mode 100644 index 00000000000..865999b063b --- /dev/null +++ b/src/backend/access/common/session.c @@ -0,0 +1,208 @@ +/*------------------------------------------------------------------------- + * + * session.c + * Encapsulation of user session. + * + * This is intended to contain data that needs to be shared between backends + * performing work for a client session. In particular such a session is + * shared between the leader and worker processes for parallel queries. At + * some later point it might also become useful infrastructure for separating + * backends from client connections, e.g. for the purpose of pooling. + * + * Currently this infrastructure is used to share: + * - typemod registry for ephemeral row-types, i.e. BlessTupleDesc etc. + * + * Portions Copyright (c) 2017, PostgreSQL Global Development Group + * + * src/backend/access/common/session.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/session.h" +#include "storage/lwlock.h" +#include "storage/shm_toc.h" +#include "utils/memutils.h" +#include "utils/typcache.h" + +/* Magic number for per-session DSM TOC. */ +#define SESSION_MAGIC 0xabb0fbc9 + +/* + * We want to create a DSA area to store shared state that has the same + * lifetime as a session. So far, it's only used to hold the shared record + * type registry. We don't want it to have to create any DSM segments just + * yet in common cases, so we'll give it enough space to hold a very small + * SharedRecordTypmodRegistry. + */ +#define SESSION_DSA_SIZE 0x30000 + +/* + * Magic numbers for state sharing in the per-session DSM area. + */ +#define SESSION_KEY_DSA UINT64CONST(0xFFFFFFFFFFFF0001) +#define SESSION_KEY_RECORD_TYPMOD_REGISTRY UINT64CONST(0xFFFFFFFFFFFF0002) + +/* This backend's current session. */ +Session *CurrentSession = NULL; + +/* + * Set up CurrentSession to point to an empty Session object. + */ +void +InitializeSession(void) +{ + CurrentSession = MemoryContextAllocZero(TopMemoryContext, sizeof(Session)); +} + +/* + * Initialize the per-session DSM segment if it isn't already initialized, and + * return its handle so that worker processes can attach to it. + * + * Unlike the per-context DSM segment, this segement and its contents are + * reused for future parallel queries. + * + * Return DSM_HANDLE_INVALID if a segment can't be allocated due to lack of + * resources. + */ +dsm_handle +GetSessionDsmHandle(void) +{ + shm_toc_estimator estimator; + shm_toc *toc; + dsm_segment *seg; + size_t typmod_registry_size; + size_t size; + void *dsa_space; + void *typmod_registry_space; + dsa_area *dsa; + MemoryContext old_context; + + /* + * If we have already created a session-scope DSM segment in this backend, + * return its handle. The same segment will be used for the rest of this + * backend's lifetime. + */ + if (CurrentSession->segment != NULL) + return dsm_segment_handle(CurrentSession->segment); + + /* Otherwise, prepare to set one up. */ + old_context = MemoryContextSwitchTo(TopMemoryContext); + shm_toc_initialize_estimator(&estimator); + + /* Estimate space for the per-session DSA area. */ + shm_toc_estimate_keys(&estimator, 1); + shm_toc_estimate_chunk(&estimator, SESSION_DSA_SIZE); + + /* Estimate space for the per-session record typmod registry. */ + typmod_registry_size = SharedRecordTypmodRegistryEstimate(); + shm_toc_estimate_keys(&estimator, 1); + shm_toc_estimate_chunk(&estimator, typmod_registry_size); + + /* Set up segment and TOC. */ + size = shm_toc_estimate(&estimator); + seg = dsm_create(size, DSM_CREATE_NULL_IF_MAXSEGMENTS); + if (seg == NULL) + { + MemoryContextSwitchTo(old_context); + + return DSM_HANDLE_INVALID; + } + toc = shm_toc_create(SESSION_MAGIC, + dsm_segment_address(seg), + size); + + /* Create per-session DSA area. */ + dsa_space = shm_toc_allocate(toc, SESSION_DSA_SIZE); + dsa = dsa_create_in_place(dsa_space, + SESSION_DSA_SIZE, + LWTRANCHE_SESSION_DSA, + seg); + shm_toc_insert(toc, SESSION_KEY_DSA, dsa_space); + + + /* Create session-scoped shared record typmod registry. */ + typmod_registry_space = shm_toc_allocate(toc, typmod_registry_size); + SharedRecordTypmodRegistryInit((SharedRecordTypmodRegistry *) + typmod_registry_space, seg, dsa); + shm_toc_insert(toc, SESSION_KEY_RECORD_TYPMOD_REGISTRY, + typmod_registry_space); + + /* + * If we got this far, we can pin the shared memory so it stays mapped for + * the rest of this backend's life. If we don't make it this far, cleanup + * callbacks for anything we installed above (ie currently + * SharedRecordTypemodRegistry) will run when the DSM segment is detached + * by CurrentResourceOwner so we aren't left with a broken CurrentSession. + */ + dsm_pin_mapping(seg); + dsa_pin_mapping(dsa); + + /* Make segment and area available via CurrentSession. */ + CurrentSession->segment = seg; + CurrentSession->area = dsa; + + MemoryContextSwitchTo(old_context); + + return dsm_segment_handle(seg); +} + +/* + * Attach to a per-session DSM segment provided by a parallel leader. + */ +void +AttachSession(dsm_handle handle) +{ + dsm_segment *seg; + shm_toc *toc; + void *dsa_space; + void *typmod_registry_space; + dsa_area *dsa; + MemoryContext old_context; + + old_context = MemoryContextSwitchTo(TopMemoryContext); + + /* Attach to the DSM segment. */ + seg = dsm_attach(handle); + if (seg == NULL) + elog(ERROR, "could not attach to per-session DSM segment"); + toc = shm_toc_attach(SESSION_MAGIC, dsm_segment_address(seg)); + + /* Attach to the DSA area. */ + dsa_space = shm_toc_lookup(toc, SESSION_KEY_DSA, false); + dsa = dsa_attach_in_place(dsa_space, seg); + + /* Make them available via the current session. */ + CurrentSession->segment = seg; + CurrentSession->area = dsa; + + /* Attach to the shared record typmod registry. */ + typmod_registry_space = + shm_toc_lookup(toc, SESSION_KEY_RECORD_TYPMOD_REGISTRY, false); + SharedRecordTypmodRegistryAttach((SharedRecordTypmodRegistry *) + typmod_registry_space); + + /* Remain attached until end of backend or DetachSession(). */ + dsm_pin_mapping(seg); + dsa_pin_mapping(dsa); + + MemoryContextSwitchTo(old_context); +} + +/* + * Detach from the current session DSM segment. It's not strictly necessary + * to do this explicitly since we'll detach automatically at backend exit, but + * if we ever reuse parallel workers it will become important for workers to + * detach from one session before attaching to another. Note that this runs + * detach hooks. + */ +void +DetachSession(void) +{ + /* Runs detach hooks. */ + dsm_detach(CurrentSession->segment); + CurrentSession->segment = NULL; + dsa_detach(CurrentSession->area); + CurrentSession->area = NULL; +} diff --git a/src/backend/access/common/tupdesc.c b/src/backend/access/common/tupdesc.c index 4436c863617..9e37ca73a86 100644 --- a/src/backend/access/common/tupdesc.c +++ b/src/backend/access/common/tupdesc.c @@ -185,6 +185,22 @@ CreateTupleDescCopyConstr(TupleDesc tupdesc) } /* + * TupleDescCopy + * Copy a tuple descriptor into caller-supplied memory. + * The memory may be shared memory mapped at any address, and must + * be sufficient to hold TupleDescSize(src) bytes. + * + * !!! Constraints and defaults are not copied !!! + */ +void +TupleDescCopy(TupleDesc dst, TupleDesc src) +{ + memcpy(dst, src, TupleDescSize(src)); + dst->constr = NULL; + dst->tdrefcount = -1; +} + +/* * TupleDescCopyEntry * This function copies a single attribute structure from one tuple * descriptor to another. diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index ce1b907debd..13c8ba3b196 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -15,6 +15,7 @@ #include "postgres.h" #include "access/parallel.h" +#include "access/session.h" #include "access/xact.h" #include "access/xlog.h" #include "catalog/namespace.h" @@ -36,6 +37,7 @@ #include "utils/memutils.h" #include "utils/resowner.h" #include "utils/snapmgr.h" +#include "utils/typcache.h" /* @@ -51,8 +53,9 @@ #define PARALLEL_MAGIC 0x50477c7c /* - * Magic numbers for parallel state sharing. Higher-level code should use - * smaller values, leaving these very large ones for use by this module. + * Magic numbers for per-context parallel state sharing. Higher-level code + * should use smaller values, leaving these very large ones for use by this + * module. */ #define PARALLEL_KEY_FIXED UINT64CONST(0xFFFFFFFFFFFF0001) #define PARALLEL_KEY_ERROR_QUEUE UINT64CONST(0xFFFFFFFFFFFF0002) @@ -63,6 +66,7 @@ #define PARALLEL_KEY_ACTIVE_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0007) #define PARALLEL_KEY_TRANSACTION_STATE UINT64CONST(0xFFFFFFFFFFFF0008) #define PARALLEL_KEY_ENTRYPOINT UINT64CONST(0xFFFFFFFFFFFF0009) +#define PARALLEL_KEY_SESSION_DSM UINT64CONST(0xFFFFFFFFFFFF000A) /* Fixed-size parallel state. */ typedef struct FixedParallelState @@ -197,6 +201,7 @@ InitializeParallelDSM(ParallelContext *pcxt) Size segsize = 0; int i; FixedParallelState *fps; + dsm_handle session_dsm_handle = DSM_HANDLE_INVALID; Snapshot transaction_snapshot = GetTransactionSnapshot(); Snapshot active_snapshot = GetActiveSnapshot(); @@ -213,6 +218,21 @@ InitializeParallelDSM(ParallelContext *pcxt) */ if (pcxt->nworkers > 0) { + /* Get (or create) the per-session DSM segment's handle. */ + session_dsm_handle = GetSessionDsmHandle(); + + /* + * If we weren't able to create a per-session DSM segment, then we can + * continue but we can't safely launch any workers because their + * record typmods would be incompatible so they couldn't exchange + * tuples. + */ + if (session_dsm_handle == DSM_HANDLE_INVALID) + pcxt->nworkers = 0; + } + + if (pcxt->nworkers > 0) + { /* Estimate space for various kinds of state sharing. */ library_len = EstimateLibraryStateSpace(); shm_toc_estimate_chunk(&pcxt->estimator, library_len); @@ -226,8 +246,9 @@ InitializeParallelDSM(ParallelContext *pcxt) shm_toc_estimate_chunk(&pcxt->estimator, asnaplen); tstatelen = EstimateTransactionStateSpace(); shm_toc_estimate_chunk(&pcxt->estimator, tstatelen); + shm_toc_estimate_chunk(&pcxt->estimator, sizeof(dsm_handle)); /* If you add more chunks here, you probably need to add keys. */ - shm_toc_estimate_keys(&pcxt->estimator, 6); + shm_toc_estimate_keys(&pcxt->estimator, 7); /* Estimate space need for error queues. */ StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) == @@ -295,6 +316,7 @@ InitializeParallelDSM(ParallelContext *pcxt) char *asnapspace; char *tstatespace; char *error_queue_space; + char *session_dsm_handle_space; char *entrypointstate; Size lnamelen; @@ -322,6 +344,13 @@ InitializeParallelDSM(ParallelContext *pcxt) SerializeSnapshot(active_snapshot, asnapspace); shm_toc_insert(pcxt->toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, asnapspace); + /* Provide the handle for per-session segment. */ + session_dsm_handle_space = shm_toc_allocate(pcxt->toc, + sizeof(dsm_handle)); + *(dsm_handle *) session_dsm_handle_space = session_dsm_handle; + shm_toc_insert(pcxt->toc, PARALLEL_KEY_SESSION_DSM, + session_dsm_handle_space); + /* Serialize transaction state. */ tstatespace = shm_toc_allocate(pcxt->toc, tstatelen); SerializeTransactionState(tstatelen, tstatespace); @@ -938,6 +967,7 @@ ParallelWorkerMain(Datum main_arg) char *asnapspace; char *tstatespace; StringInfoData msgbuf; + char *session_dsm_handle_space; /* Set flag to indicate that we're initializing a parallel worker. */ InitializingParallelWorker = true; @@ -1064,6 +1094,11 @@ ParallelWorkerMain(Datum main_arg) combocidspace = shm_toc_lookup(toc, PARALLEL_KEY_COMBO_CID, false); RestoreComboCIDState(combocidspace); + /* Attach to the per-session DSM segment and contained objects. */ + session_dsm_handle_space = + shm_toc_lookup(toc, PARALLEL_KEY_SESSION_DSM, false); + AttachSession(*(dsm_handle *) session_dsm_handle_space); + /* Restore transaction snapshot. */ tsnapspace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT, false); RestoreTransactionSnapshot(RestoreSnapshot(tsnapspace), @@ -1110,6 +1145,9 @@ ParallelWorkerMain(Datum main_arg) /* Shut down the parallel-worker transaction. */ EndParallelWorkerTransaction(); + /* Detach from the per-session DSM segment. */ + DetachSession(); + /* Report success. */ pq_putmessage('X', NULL, 0); } |