aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/transam/parallel.c
diff options
context:
space:
mode:
author: Tom Lane <tgl@sss.pgh.pa.us> 2017-04-15 16:23:27 -0400
committer: Tom Lane <tgl@sss.pgh.pa.us> 2017-04-15 16:23:27 -0400
commit: 9c225acf0b97a7a3a5ca1a12ee0e89c98cf16442 (patch)
tree: 72ce9833271db97603c1847b762f387dd2ec978f /src/backend/access/transam/parallel.c
parent: d51279433cdf30d62e92bb5f5f1790e9014a0342 (diff)
download: postgresql-9c225acf0b97a7a3a5ca1a12ee0e89c98cf16442.tar.gz
postgresql-9c225acf0b97a7a3a5ca1a12ee0e89c98cf16442.zip
Avoid passing function pointers across process boundaries.
This back-patches commit 32470825d36d99a81347ee36c181d609c952c061 into 9.6,
primarily to make buildfarm member culicidae happy. Unlike the HEAD patch,
avoid changing the existing API of CreateParallelContext; instead we just
switch to using CreateParallelContextForExternalFunction, even for core
functions.

Petr Jelinek, with a bunch of basically-cosmetic adjustments by me

Discussion: https://postgr.es/m/548f9c1d-eafa-e3fa-9da8-f0cc2f654e60@2ndquadrant.com
Diffstat (limited to 'src/backend/access/transam/parallel.c')
-rw-r--r-- src/backend/access/transam/parallel.c 145
1 files changed, 104 insertions, 41 deletions
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index d2bed727057..867ccf855d9 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -19,6 +19,7 @@
#include "access/xlog.h"
#include "catalog/namespace.h"
#include "commands/async.h"
+#include "executor/execParallel.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "libpq/pqmq.h"
@@ -60,7 +61,7 @@
#define PARALLEL_KEY_TRANSACTION_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0006)
#define PARALLEL_KEY_ACTIVE_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0007)
#define PARALLEL_KEY_TRANSACTION_STATE UINT64CONST(0xFFFFFFFFFFFF0008)
-#define PARALLEL_KEY_EXTENSION_TRAMPOLINE UINT64CONST(0xFFFFFFFFFFFF0009)
+#define PARALLEL_KEY_ENTRYPOINT UINT64CONST(0xFFFFFFFFFFFF0009)
/* Fixed-size parallel state. */
typedef struct FixedParallelState
@@ -76,7 +77,7 @@ typedef struct FixedParallelState
pid_t parallel_master_pid;
BackendId parallel_master_backend_id;
- /* Entrypoint for parallel workers. */
+ /* Entrypoint for parallel workers (deprecated)! */
parallel_worker_main_type entrypoint;
/* Mutex protects remaining fields. */
@@ -106,16 +107,36 @@ static FixedParallelState *MyFixedParallelState;
/* List of active parallel contexts. */
static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list);
+/*
+ * List of internal parallel worker entry points. We need this for
+ * reasons explained in LookupParallelWorkerFunction(), below.
+ */
+static const struct
+{
+ const char *fn_name; /* name compared against the requested funcname */
+ parallel_worker_main_type fn_addr; /* address returned when fn_name matches */
+} InternalParallelWorkers[] =
+
+{
+ {
+ "ParallelQueryMain", ParallelQueryMain
+ }
+};
+
/* Private functions. */
static void HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg);
-static void ParallelExtensionTrampoline(dsm_segment *seg, shm_toc *toc);
static void WaitForParallelWorkersToExit(ParallelContext *pcxt);
+static parallel_worker_main_type LookupParallelWorkerFunction(char *libraryname, char *funcname);
/*
* Establish a new parallel context. This should be done after entering
* parallel mode, and (unless there is an error) the context should be
* destroyed before exiting the current subtransaction.
+ *
+ * NB: specifying the entrypoint as a function address is unportable.
+ * This will go away in Postgres 10, in favor of the API provided by
+ * CreateParallelContextForExternalFunction; in the meantime use that.
*/
ParallelContext *
CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers)
@@ -163,9 +184,9 @@ CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers)
}
/*
- * Establish a new parallel context that calls a function provided by an
- * extension. This works around the fact that the library might get mapped
- * at a different address in each backend.
+ * Establish a new parallel context that calls a function specified by name.
+ * Unlike CreateParallelContext, this is robust against possible differences
+ * in address space layout between different processes.
*/
ParallelContext *
CreateParallelContextForExternalFunction(char *library_name,
@@ -179,7 +200,7 @@ CreateParallelContextForExternalFunction(char *library_name,
oldcontext = MemoryContextSwitchTo(TopTransactionContext);
/* Create the context. */
- pcxt = CreateParallelContext(ParallelExtensionTrampoline, nworkers);
+ pcxt = CreateParallelContext(NULL, nworkers);
pcxt->library_name = pstrdup(library_name);
pcxt->function_name = pstrdup(function_name);
@@ -248,10 +269,9 @@ InitializeParallelDSM(ParallelContext *pcxt)
pcxt->nworkers));
shm_toc_estimate_keys(&pcxt->estimator, 1);
- /* Estimate how much we'll need for extension entrypoint info. */
+ /* Estimate how much we'll need for entrypoint info. */
if (pcxt->library_name != NULL)
{
- Assert(pcxt->entrypoint == ParallelExtensionTrampoline);
Assert(pcxt->function_name != NULL);
shm_toc_estimate_chunk(&pcxt->estimator, strlen(pcxt->library_name)
+ strlen(pcxt->function_name) + 2);
@@ -367,7 +387,7 @@ InitializeParallelDSM(ParallelContext *pcxt)
}
shm_toc_insert(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, error_queue_space);
- /* Serialize extension entrypoint information. */
+ /* Serialize entrypoint information. */
if (pcxt->library_name != NULL)
{
Size lnamelen = strlen(pcxt->library_name);
@@ -377,7 +397,7 @@ InitializeParallelDSM(ParallelContext *pcxt)
+ strlen(pcxt->function_name) + 2);
strcpy(extensionstate, pcxt->library_name);
strcpy(extensionstate + lnamelen + 1, pcxt->function_name);
- shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXTENSION_TRAMPOLINE,
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_ENTRYPOINT,
extensionstate);
}
}
@@ -669,6 +689,10 @@ DestroyParallelContext(ParallelContext *pcxt)
}
/* Free memory. */
+ if (pcxt->library_name)
+ pfree(pcxt->library_name);
+ if (pcxt->function_name)
+ pfree(pcxt->function_name);
pfree(pcxt);
}
@@ -939,6 +963,8 @@ ParallelWorkerMain(Datum main_arg)
shm_mq *mq;
shm_mq_handle *mqh;
char *libraryspace;
+ char *entrypointstate;
+ parallel_worker_main_type entrypt;
char *gucspace;
char *combocidspace;
char *tsnapspace;
@@ -1038,6 +1064,25 @@ ParallelWorkerMain(Datum main_arg)
Assert(libraryspace != NULL);
RestoreLibraryState(libraryspace);
+ /*
+ * Identify the entry point to be called. In theory this could result in
+ * loading an additional library, though most likely the entry point is in
+ * the core backend or in a library we just loaded.
+ */
+ entrypointstate = shm_toc_lookup(toc, PARALLEL_KEY_ENTRYPOINT);
+ if (entrypointstate != NULL)
+ {
+ char *library_name;
+ char *function_name;
+
+ library_name = entrypointstate;
+ function_name = entrypointstate + strlen(library_name) + 1;
+
+ entrypt = LookupParallelWorkerFunction(library_name, function_name);
+ }
+ else
+ entrypt = fps->entrypoint;
+
/* Restore database connection. */
BackgroundWorkerInitializeConnectionByOid(fps->database_id,
fps->authenticated_user_id);
@@ -1101,10 +1146,11 @@ ParallelWorkerMain(Datum main_arg)
/*
* Time to do the real work: invoke the caller-supplied code.
*
- * If you get a crash at this line, see the comments for
- * ParallelExtensionTrampoline.
+ * If you get a crash at this line, try using
+ * CreateParallelContextForExternalFunction instead of
+ * CreateParallelContext.
*/
- fps->entrypoint(seg, toc);
+ entrypt(seg, toc);
/* Must exit parallel mode to pop active snapshot. */
ExitParallelMode();
@@ -1120,33 +1166,6 @@ ParallelWorkerMain(Datum main_arg)
}
/*
- * It's unsafe for the entrypoint invoked by ParallelWorkerMain to be a
- * function living in a dynamically loaded module, because the module might
- * not be loaded in every process, or might be loaded but not at the same
- * address. To work around that problem, CreateParallelContextForExtension()
- * arranges to call this function rather than calling the extension-provided
- * function directly; and this function then looks up the real entrypoint and
- * calls it.
- */
-static void
-ParallelExtensionTrampoline(dsm_segment *seg, shm_toc *toc)
-{
- char *extensionstate;
- char *library_name;
- char *function_name;
- parallel_worker_main_type entrypt;
-
- extensionstate = shm_toc_lookup(toc, PARALLEL_KEY_EXTENSION_TRAMPOLINE);
- Assert(extensionstate != NULL);
- library_name = extensionstate;
- function_name = extensionstate + strlen(library_name) + 1;
-
- entrypt = (parallel_worker_main_type)
- load_external_function(library_name, function_name, true, NULL);
- entrypt(seg, toc);
-}
-
-/*
* Update shared memory with the ending location of the last WAL record we
* wrote, if it's greater than the value already stored there.
*/
@@ -1161,3 +1180,47 @@ ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end)
fps->last_xlog_end = last_xlog_end;
SpinLockRelease(&fps->mutex);
}
+
+/*
+ * Look up (and possibly load) a parallel worker entry point function.
+ *
+ * For functions contained in the core code, we use library name "postgres"
+ * and consult the InternalParallelWorkers array. External functions are
+ * looked up, and loaded if necessary, using load_external_function().
+ *
+ * The point of this is to pass function names as strings across process
+ * boundaries. We can't pass actual function addresses because of the
+ * possibility that the function has been loaded at a different address
+ * in a different process. This is obviously a hazard for functions in
+ * loadable libraries, but it can happen even for functions in the core code
+ * on platforms using EXEC_BACKEND (e.g., Windows).
+ *
+ * At some point it might be worthwhile to get rid of InternalParallelWorkers[]
+ * in favor of applying load_external_function() for core functions too;
+ * but that raises portability issues that are not worth addressing now.
+ */
+static parallel_worker_main_type
+LookupParallelWorkerFunction(char *libraryname, char *funcname)
+{
+ /*
+ * The library name "postgres" denotes the core backend: resolve the
+ * function by name through the InternalParallelWorkers array.
+ */
+ if (strcmp(libraryname, "postgres") == 0)
+ {
+ int i;
+
+ for (i = 0; i < lengthof(InternalParallelWorkers); i++)
+ {
+ if (strcmp(InternalParallelWorkers[i].fn_name, funcname) == 0)
+ return InternalParallelWorkers[i].fn_addr;
+ }
+
+ /* No table entry matches funcname; only a programming error gets here. */
+ elog(ERROR, "internal function \"%s\" not found", funcname);
+ }
+
+ /* Any other library name: look the function up in that external library. */
+ return (parallel_worker_main_type)
+ load_external_function(libraryname, funcname, true, NULL);
+}