diff options
author | Tom Lane <tgl@sss.pgh.pa.us> | 2017-04-14 23:50:16 -0400 |
---|---|---|
committer | Tom Lane <tgl@sss.pgh.pa.us> | 2017-04-14 23:50:16 -0400 |
commit | 32470825d36d99a81347ee36c181d609c952c061 (patch) | |
tree | 0fe8bfc11dbb20ae8dfe943632674cd3bc859b79 /src/backend/access/transam/parallel.c | |
parent | 5a617ab3e691aec56725960e6d28c98c8af6ddaa (diff) | |
download | postgresql-32470825d36d99a81347ee36c181d609c952c061.tar.gz postgresql-32470825d36d99a81347ee36c181d609c952c061.zip |
Avoid passing function pointers across process boundaries.
We'd already recognized that we can't pass function pointers across process
boundaries for functions in loadable modules, since a shared library could
get loaded at different addresses in different processes. But actually the
practice doesn't work for functions in the core backend either, if we're
using EXEC_BACKEND. This is the cause of recent failures on buildfarm
member culicidae. Switch to passing a string function name in all cases.
Something like this needs to be back-patched into 9.6, but let's see
if the buildfarm likes it first.
Petr Jelinek, with a bunch of basically-cosmetic adjustments by me
Discussion: https://postgr.es/m/548f9c1d-eafa-e3fa-9da8-f0cc2f654e60@2ndquadrant.com
Diffstat (limited to 'src/backend/access/transam/parallel.c')
-rw-r--r-- | src/backend/access/transam/parallel.c | 193 |
1 files changed, 105 insertions, 88 deletions
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index b3d3853fbc2..04fa2ed4550 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -19,6 +19,7 @@ #include "access/xlog.h" #include "catalog/namespace.h" #include "commands/async.h" +#include "executor/execParallel.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" #include "libpq/pqmq.h" @@ -61,7 +62,7 @@ #define PARALLEL_KEY_TRANSACTION_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0006) #define PARALLEL_KEY_ACTIVE_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0007) #define PARALLEL_KEY_TRANSACTION_STATE UINT64CONST(0xFFFFFFFFFFFF0008) -#define PARALLEL_KEY_EXTENSION_TRAMPOLINE UINT64CONST(0xFFFFFFFFFFFF0009) +#define PARALLEL_KEY_ENTRYPOINT UINT64CONST(0xFFFFFFFFFFFF0009) /* Fixed-size parallel state. */ typedef struct FixedParallelState @@ -77,9 +78,6 @@ typedef struct FixedParallelState pid_t parallel_master_pid; BackendId parallel_master_backend_id; - /* Entrypoint for parallel workers. */ - parallel_worker_main_type entrypoint; - /* Mutex protects remaining fields. */ slock_t mutex; @@ -107,10 +105,26 @@ static FixedParallelState *MyFixedParallelState; /* List of active parallel contexts. */ static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list); +/* + * List of internal parallel worker entry points. We need this for + * reasons explained in LookupParallelWorkerFunction(), below. + */ +static const struct +{ + const char *fn_name; + parallel_worker_main_type fn_addr; +} InternalParallelWorkers[] = + +{ + { + "ParallelQueryMain", ParallelQueryMain + } +}; + /* Private functions. */ static void HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg); -static void ParallelExtensionTrampoline(dsm_segment *seg, shm_toc *toc); static void WaitForParallelWorkersToExit(ParallelContext *pcxt); +static parallel_worker_main_type LookupParallelWorkerFunction(const char *libraryname, const char *funcname); /* @@ -119,7 +133,8 @@ static void WaitForParallelWorkersToExit(ParallelContext *pcxt); * destroyed before exiting the current subtransaction. */ ParallelContext * -CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers) +CreateParallelContext(const char *library_name, const char *function_name, + int nworkers) { MemoryContext oldcontext; ParallelContext *pcxt; @@ -152,7 +167,8 @@ CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers) pcxt = palloc0(sizeof(ParallelContext)); pcxt->subid = GetCurrentSubTransactionId(); pcxt->nworkers = nworkers; - pcxt->entrypoint = entrypoint; + pcxt->library_name = pstrdup(library_name); + pcxt->function_name = pstrdup(function_name); pcxt->error_context_stack = error_context_stack; shm_toc_initialize_estimator(&pcxt->estimator); dlist_push_head(&pcxt_list, &pcxt->node); @@ -164,33 +180,6 @@ CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers) } /* - * Establish a new parallel context that calls a function provided by an - * extension. This works around the fact that the library might get mapped - * at a different address in each backend. - */ -ParallelContext * -CreateParallelContextForExternalFunction(char *library_name, - char *function_name, - int nworkers) -{ - MemoryContext oldcontext; - ParallelContext *pcxt; - - /* We might be running in a very short-lived memory context. */ - oldcontext = MemoryContextSwitchTo(TopTransactionContext); - - /* Create the context. */ - pcxt = CreateParallelContext(ParallelExtensionTrampoline, nworkers); - pcxt->library_name = pstrdup(library_name); - pcxt->function_name = pstrdup(function_name); - - /* Restore previous memory context. */ - MemoryContextSwitchTo(oldcontext); - - return pcxt; -} - -/* * Establish the dynamic shared memory segment for a parallel context and * copy state and other bookkeeping information that will be needed by * parallel workers into it. @@ -249,15 +238,10 @@ InitializeParallelDSM(ParallelContext *pcxt) pcxt->nworkers)); shm_toc_estimate_keys(&pcxt->estimator, 1); - /* Estimate how much we'll need for extension entrypoint info. */ - if (pcxt->library_name != NULL) - { - Assert(pcxt->entrypoint == ParallelExtensionTrampoline); - Assert(pcxt->function_name != NULL); - shm_toc_estimate_chunk(&pcxt->estimator, strlen(pcxt->library_name) - + strlen(pcxt->function_name) + 2); - shm_toc_estimate_keys(&pcxt->estimator, 1); - } + /* Estimate how much we'll need for the entrypoint info. */ + shm_toc_estimate_chunk(&pcxt->estimator, strlen(pcxt->library_name) + + strlen(pcxt->function_name) + 2); + shm_toc_estimate_keys(&pcxt->estimator, 1); } /* @@ -297,7 +281,6 @@ InitializeParallelDSM(ParallelContext *pcxt) fps->parallel_master_pgproc = MyProc; fps->parallel_master_pid = MyProcPid; fps->parallel_master_backend_id = MyBackendId; - fps->entrypoint = pcxt->entrypoint; SpinLockInit(&fps->mutex); fps->last_xlog_end = 0; shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps); @@ -312,6 +295,8 @@ InitializeParallelDSM(ParallelContext *pcxt) char *asnapspace; char *tstatespace; char *error_queue_space; + char *entrypointstate; + Size lnamelen; /* Serialize shared libraries we have loaded. */ libraryspace = shm_toc_allocate(pcxt->toc, library_len); @@ -368,19 +353,19 @@ InitializeParallelDSM(ParallelContext *pcxt) } shm_toc_insert(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, error_queue_space); - /* Serialize extension entrypoint information. */ - if (pcxt->library_name != NULL) - { - Size lnamelen = strlen(pcxt->library_name); - char *extensionstate; - - extensionstate = shm_toc_allocate(pcxt->toc, lnamelen - + strlen(pcxt->function_name) + 2); - strcpy(extensionstate, pcxt->library_name); - strcpy(extensionstate + lnamelen + 1, pcxt->function_name); - shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXTENSION_TRAMPOLINE, - extensionstate); - } + /* + * Serialize entrypoint information. It's unsafe to pass function + * pointers across processes, as the function pointer may be different + * in each process in EXEC_BACKEND builds, so we always pass library + * and function name. (We use library name "postgres" for functions + * in the core backend.) + */ + lnamelen = strlen(pcxt->library_name); + entrypointstate = shm_toc_allocate(pcxt->toc, lnamelen + + strlen(pcxt->function_name) + 2); + strcpy(entrypointstate, pcxt->library_name); + strcpy(entrypointstate + lnamelen + 1, pcxt->function_name); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_ENTRYPOINT, entrypointstate); } /* Restore previous memory context. */ @@ -671,6 +656,8 @@ DestroyParallelContext(ParallelContext *pcxt) } /* Free memory. */ + pfree(pcxt->library_name); + pfree(pcxt->function_name); pfree(pcxt); } @@ -941,6 +928,10 @@ ParallelWorkerMain(Datum main_arg) shm_mq *mq; shm_mq_handle *mqh; char *libraryspace; + char *entrypointstate; + char *library_name; + char *function_name; + parallel_worker_main_type entrypt; char *gucspace; char *combocidspace; char *tsnapspace; @@ -1040,6 +1031,18 @@ ParallelWorkerMain(Datum main_arg) Assert(libraryspace != NULL); RestoreLibraryState(libraryspace); + /* + * Identify the entry point to be called. In theory this could result in + * loading an additional library, though most likely the entry point is in + * the core backend or in a library we just loaded. + */ + entrypointstate = shm_toc_lookup(toc, PARALLEL_KEY_ENTRYPOINT); + Assert(entrypointstate != NULL); + library_name = entrypointstate; + function_name = entrypointstate + strlen(library_name) + 1; + + entrypt = LookupParallelWorkerFunction(library_name, function_name); + /* Restore database connection. */ BackgroundWorkerInitializeConnectionByOid(fps->database_id, fps->authenticated_user_id); @@ -1102,11 +1105,8 @@ ParallelWorkerMain(Datum main_arg) /* * Time to do the real work: invoke the caller-supplied code. - * - * If you get a crash at this line, see the comments for - * ParallelExtensionTrampoline. */ - fps->entrypoint(seg, toc); + entrypt(seg, toc); /* Must exit parallel mode to pop active snapshot. */ ExitParallelMode(); @@ -1122,33 +1122,6 @@ ParallelWorkerMain(Datum main_arg) } /* - * It's unsafe for the entrypoint invoked by ParallelWorkerMain to be a - * function living in a dynamically loaded module, because the module might - * not be loaded in every process, or might be loaded but not at the same - * address. To work around that problem, CreateParallelContextForExtension() - * arranges to call this function rather than calling the extension-provided - * function directly; and this function then looks up the real entrypoint and - * calls it. - */ -static void -ParallelExtensionTrampoline(dsm_segment *seg, shm_toc *toc) -{ - char *extensionstate; - char *library_name; - char *function_name; - parallel_worker_main_type entrypt; - - extensionstate = shm_toc_lookup(toc, PARALLEL_KEY_EXTENSION_TRAMPOLINE); - Assert(extensionstate != NULL); - library_name = extensionstate; - function_name = extensionstate + strlen(library_name) + 1; - - entrypt = (parallel_worker_main_type) - load_external_function(library_name, function_name, true, NULL); - entrypt(seg, toc); -} - -/* * Update shared memory with the ending location of the last WAL record we * wrote, if it's greater than the value already stored there. */ @@ -1163,3 +1136,47 @@ ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end) fps->last_xlog_end = last_xlog_end; SpinLockRelease(&fps->mutex); } + +/* + * Look up (and possibly load) a parallel worker entry point function. + * + * For functions contained in the core code, we use library name "postgres" + * and consult the InternalParallelWorkers array. External functions are + * looked up, and loaded if necessary, using load_external_function(). + * + * The point of this is to pass function names as strings across process + * boundaries. We can't pass actual function addresses because of the + * possibility that the function has been loaded at a different address + * in a different process. This is obviously a hazard for functions in + * loadable libraries, but it can happen even for functions in the core code + * on platforms using EXEC_BACKEND (e.g., Windows). + * + * At some point it might be worthwhile to get rid of InternalParallelWorkers[] + * in favor of applying load_external_function() for core functions too; + * but that raises portability issues that are not worth addressing now. + */ +static parallel_worker_main_type +LookupParallelWorkerFunction(const char *libraryname, const char *funcname) +{ + /* + * If the function is to be loaded from postgres itself, search the + * InternalParallelWorkers array. + */ + if (strcmp(libraryname, "postgres") == 0) + { + int i; + + for (i = 0; i < lengthof(InternalParallelWorkers); i++) + { + if (strcmp(InternalParallelWorkers[i].fn_name, funcname) == 0) + return InternalParallelWorkers[i].fn_addr; + } + + /* We can only reach this by programming error. */ + elog(ERROR, "internal function \"%s\" not found", funcname); + } + + /* Otherwise load from external library. */ + return (parallel_worker_main_type) + load_external_function(libraryname, funcname, true, NULL); +} |