diff options
author | Tom Lane <tgl@sss.pgh.pa.us> | 2016-09-27 14:29:12 -0400 |
---|---|---|
committer | Tom Lane <tgl@sss.pgh.pa.us> | 2016-09-27 14:29:12 -0400 |
commit | 0109ab27609c0d58c1eddc6b799077d0968083de (patch) | |
tree | c6b167cc4265e02c10d9090bc79aab8cf41b5ea8 /src/bin/pg_dump/parallel.c | |
parent | fb03d08a89e81a68585f17fd8e7f21c618f4e851 (diff) | |
download | postgresql-0109ab27609c0d58c1eddc6b799077d0968083de.tar.gz postgresql-0109ab27609c0d58c1eddc6b799077d0968083de.zip |
Make struct ParallelSlot private within pg_dump/parallel.c.
The only field of this struct that other files have any need to touch
is the pointer to the TocEntry a worker is working on. (Well,
pg_backup_archiver.c is actually looking at workerStatus too, but that
can be finessed by specifying that the TocEntry pointer is NULL for a
non-busy worker.)
Hence, move out the TocEntry pointers to a separate array within
struct ParallelState, and then we can make struct ParallelSlot private.
I noted the possibility of this previously, but hadn't got round to
actually doing it.
Discussion: <1188.1464544443@sss.pgh.pa.us>
Diffstat (limited to 'src/bin/pg_dump/parallel.c')
-rw-r--r-- | src/bin/pg_dump/parallel.c | 62 |
1 files changed, 54 insertions, 8 deletions
diff --git a/src/bin/pg_dump/parallel.c b/src/bin/pg_dump/parallel.c index 0e2bfa106a7..5630dc626d7 100644 --- a/src/bin/pg_dump/parallel.c +++ b/src/bin/pg_dump/parallel.c @@ -45,6 +45,8 @@ * WRKR_IDLE: it's waiting for a command * WRKR_WORKING: it's working on a command * WRKR_TERMINATED: process ended + * The pstate->te[] entry for each worker is valid when it's in WRKR_WORKING + * state, and must be NULL in other states. */ #include "postgres_fe.h" @@ -71,6 +73,45 @@ #define NO_SLOT (-1) /* Failure result for GetIdleWorker() */ +/* Worker process statuses */ +typedef enum +{ + WRKR_IDLE, + WRKR_WORKING, + WRKR_TERMINATED +} T_WorkerStatus; + +/* + * Private per-parallel-worker state (typedef for this is in parallel.h). + * + * Much of this is valid only in the master process (or, on Windows, should + * be touched only by the master thread). But the AH field should be touched + * only by workers. The pipe descriptors are valid everywhere. + */ +struct ParallelSlot +{ + T_WorkerStatus workerStatus; /* see enum above */ + + /* These fields are valid if workerStatus == WRKR_WORKING: */ + ParallelCompletionPtr callback; /* function to call on completion */ + void *callback_data; /* passthru data for it */ + + ArchiveHandle *AH; /* Archive data worker is using */ + + int pipeRead; /* master's end of the pipes */ + int pipeWrite; + int pipeRevRead; /* child's end of the pipes */ + int pipeRevWrite; + + /* Child process/thread identity info: */ +#ifdef WIN32 + uintptr_t hThread; + unsigned int threadId; +#else + pid_t pid; +#endif +}; + #ifdef WIN32 /* @@ -475,9 +516,10 @@ WaitForTerminatingWorkers(ParallelState *pstate) } #endif /* WIN32 */ - /* On all platforms, update workerStatus as well */ + /* On all platforms, update workerStatus and te[] as well */ Assert(j < pstate->numWorkers); slot->workerStatus = WRKR_TERMINATED; + pstate->te[j] = NULL; } } @@ -870,20 +912,22 @@ ParallelBackupStart(ArchiveHandle *AH) { ParallelState *pstate; int i; - const size_t slotSize = AH->public.numWorkers * sizeof(ParallelSlot); Assert(AH->public.numWorkers > 0); pstate = (ParallelState *) pg_malloc(sizeof(ParallelState)); pstate->numWorkers = AH->public.numWorkers; + pstate->te = NULL; pstate->parallelSlot = NULL; if (AH->public.numWorkers == 1) return pstate; - pstate->parallelSlot = (ParallelSlot *) pg_malloc(slotSize); - memset((void *) pstate->parallelSlot, 0, slotSize); + pstate->te = (TocEntry **) + pg_malloc0(pstate->numWorkers * sizeof(TocEntry *)); + pstate->parallelSlot = (ParallelSlot *) + pg_malloc0(pstate->numWorkers * sizeof(ParallelSlot)); #ifdef WIN32 /* Make fmtId() and fmtQualifiedId() use thread-local storage */ @@ -929,9 +973,10 @@ ParallelBackupStart(ArchiveHandle *AH) "could not create communication channels: %s\n", strerror(errno)); + pstate->te[i] = NULL; /* just for safety */ + slot->workerStatus = WRKR_IDLE; slot->AH = NULL; - slot->te = NULL; slot->callback = NULL; slot->callback_data = NULL; @@ -1062,6 +1107,7 @@ ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate) set_cancel_pstate(NULL); /* Release state (mere neatnik-ism, since we're about to terminate) */ + free(pstate->te); free(pstate->parallelSlot); free(pstate); } @@ -1201,9 +1247,9 @@ DispatchJobForTocEntry(ArchiveHandle *AH, /* Remember worker is busy, and which TocEntry it's working on */ pstate->parallelSlot[worker].workerStatus = WRKR_WORKING; - pstate->parallelSlot[worker].te = te; pstate->parallelSlot[worker].callback = callback; pstate->parallelSlot[worker].callback_data = callback_data; + pstate->te[worker] = te; } /* @@ -1394,13 +1440,13 @@ ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait) if (messageStartsWith(msg, "OK ")) { ParallelSlot *slot = &pstate->parallelSlot[worker]; - TocEntry *te = slot->te; + TocEntry *te = pstate->te[worker]; int status; status = parseWorkerResponse(AH, te, msg); slot->callback(AH, te, status, slot->callback_data); slot->workerStatus = WRKR_IDLE; - slot->te = NULL; + pstate->te[worker] = NULL; } else exit_horribly(modulename, |