diff options
Diffstat (limited to 'contrib/pg_upgrade/parallel.c')
-rw-r--r-- | contrib/pg_upgrade/parallel.c | 116 |
1 files changed, 59 insertions, 57 deletions
diff --git a/contrib/pg_upgrade/parallel.c b/contrib/pg_upgrade/parallel.c index 688a53112c2..8725170d1b5 100644 --- a/contrib/pg_upgrade/parallel.c +++ b/contrib/pg_upgrade/parallel.c @@ -20,7 +20,7 @@ #include <io.h> #endif -static int parallel_jobs; +static int parallel_jobs; #ifdef WIN32 /* @@ -28,31 +28,32 @@ static int parallel_jobs; * it can be passed to WaitForMultipleObjects(). We use two arrays * so the thread_handles array can be passed to WaitForMultipleObjects(). */ -HANDLE *thread_handles; +HANDLE *thread_handles; -typedef struct { - char log_file[MAXPGPATH]; - char opt_log_file[MAXPGPATH]; - char cmd[MAX_STRING]; +typedef struct +{ + char log_file[MAXPGPATH]; + char opt_log_file[MAXPGPATH]; + char cmd[MAX_STRING]; } exec_thread_arg; -typedef struct { - DbInfoArr *old_db_arr; - DbInfoArr *new_db_arr; - char old_pgdata[MAXPGPATH]; - char new_pgdata[MAXPGPATH]; - char old_tablespace[MAXPGPATH]; +typedef struct +{ + DbInfoArr *old_db_arr; + DbInfoArr *new_db_arr; + char old_pgdata[MAXPGPATH]; + char new_pgdata[MAXPGPATH]; + char old_tablespace[MAXPGPATH]; } transfer_thread_arg; exec_thread_arg **exec_thread_args; transfer_thread_arg **transfer_thread_args; /* track current thread_args struct so reap_child() can be used for all cases */ -void **cur_thread_args; - -DWORD win32_exec_prog(exec_thread_arg *args); -DWORD win32_transfer_all_new_dbs(transfer_thread_arg *args); +void **cur_thread_args; +DWORD win32_exec_prog(exec_thread_arg *args); +DWORD win32_transfer_all_new_dbs(transfer_thread_arg *args); #endif /* @@ -67,11 +68,12 @@ parallel_exec_prog(const char *log_file, const char *opt_log_file, { va_list args; char cmd[MAX_STRING]; + #ifndef WIN32 pid_t child; #else HANDLE child; - exec_thread_arg *new_arg; + exec_thread_arg *new_arg; #endif va_start(args, fmt); @@ -85,8 +87,8 @@ parallel_exec_prog(const char *log_file, const char *opt_log_file, { /* parallel */ #ifdef WIN32 - cur_thread_args = (void **)exec_thread_args; -#endif + cur_thread_args = (void **) exec_thread_args; +#endif /* harvest any dead children */ while (reap_child(false) == true) ; @@ -94,10 +96,10 @@ parallel_exec_prog(const char *log_file, const char *opt_log_file, /* must we wait for a dead child? */ if (parallel_jobs >= user_opts.jobs) reap_child(true); - + /* set this before we start the job */ parallel_jobs++; - + /* Ensure stdio state is quiesced before forking */ fflush(NULL); @@ -112,22 +114,22 @@ parallel_exec_prog(const char *log_file, const char *opt_log_file, #else if (thread_handles == NULL) { - int i; + int i; thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE)); exec_thread_args = pg_malloc(user_opts.jobs * sizeof(exec_thread_arg *)); /* - * For safety and performance, we keep the args allocated during - * the entire life of the process, and we don't free the args - * in a thread different from the one that allocated it. + * For safety and performance, we keep the args allocated during + * the entire life of the process, and we don't free the args in a + * thread different from the one that allocated it. */ for (i = 0; i < user_opts.jobs; i++) exec_thread_args[i] = pg_malloc(sizeof(exec_thread_arg)); } /* use first empty array element */ - new_arg = exec_thread_args[parallel_jobs-1]; + new_arg = exec_thread_args[parallel_jobs - 1]; /* Can only pass one pointer into the function, so use a struct */ strcpy(new_arg->log_file, log_file); @@ -135,11 +137,11 @@ parallel_exec_prog(const char *log_file, const char *opt_log_file, strcpy(new_arg->cmd, cmd); child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_exec_prog, - new_arg, 0, NULL); + new_arg, 0, NULL); if (child == 0) pg_log(PG_FATAL, "could not create worker thread: %s\n", strerror(errno)); - thread_handles[parallel_jobs-1] = child; + thread_handles[parallel_jobs - 1] = child; #endif } @@ -151,7 +153,7 @@ parallel_exec_prog(const char *log_file, const char *opt_log_file, DWORD win32_exec_prog(exec_thread_arg *args) { - int ret; + int ret; ret = !exec_prog(args->log_file, args->opt_log_file, true, "%s", args->cmd); @@ -167,15 +169,16 @@ win32_exec_prog(exec_thread_arg *args) * This has the same API as transfer_all_new_dbs, except it does parallel execution * by transfering multiple tablespaces in parallel */ -void parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr, - char *old_pgdata, char *new_pgdata, - char *old_tablespace) +void +parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr, + char *old_pgdata, char *new_pgdata, + char *old_tablespace) { #ifndef WIN32 pid_t child; #else HANDLE child; - transfer_thread_arg *new_arg; + transfer_thread_arg *new_arg; #endif if (user_opts.jobs <= 1) @@ -185,7 +188,7 @@ void parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr, { /* parallel */ #ifdef WIN32 - cur_thread_args = (void **)transfer_thread_args; + cur_thread_args = (void **) transfer_thread_args; #endif /* harvest any dead children */ while (reap_child(false) == true) @@ -194,10 +197,10 @@ void parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr, /* must we wait for a dead child? */ if (parallel_jobs >= user_opts.jobs) reap_child(true); - + /* set this before we start the job */ parallel_jobs++; - + /* Ensure stdio state is quiesced before forking */ fflush(NULL); @@ -217,22 +220,22 @@ void parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr, #else if (thread_handles == NULL) { - int i; + int i; thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE)); transfer_thread_args = pg_malloc(user_opts.jobs * sizeof(transfer_thread_arg *)); /* - * For safety and performance, we keep the args allocated during - * the entire life of the process, and we don't free the args - * in a thread different from the one that allocated it. + * For safety and performance, we keep the args allocated during + * the entire life of the process, and we don't free the args in a + * thread different from the one that allocated it. */ for (i = 0; i < user_opts.jobs; i++) transfer_thread_args[i] = pg_malloc(sizeof(transfer_thread_arg)); } /* use first empty array element */ - new_arg = transfer_thread_args[parallel_jobs-1]; + new_arg = transfer_thread_args[parallel_jobs - 1]; /* Can only pass one pointer into the function, so use a struct */ new_arg->old_db_arr = old_db_arr; @@ -242,11 +245,11 @@ void parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr, strcpy(new_arg->old_tablespace, old_tablespace); child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_exec_prog, - new_arg, 0, NULL); + new_arg, 0, NULL); if (child == 0) pg_log(PG_FATAL, "could not create worker thread: %s\n", strerror(errno)); - thread_handles[parallel_jobs-1] = child; + thread_handles[parallel_jobs - 1] = child; #endif } @@ -274,11 +277,11 @@ bool reap_child(bool wait_for_child) { #ifndef WIN32 - int work_status; - int ret; + int work_status; + int ret; #else - int thread_num; - DWORD res; + int thread_num; + DWORD res; #endif if (user_opts.jobs <= 1 || parallel_jobs == 0) @@ -293,18 +296,17 @@ reap_child(bool wait_for_child) if (WEXITSTATUS(work_status) != 0) pg_log(PG_FATAL, "child worker exited abnormally: %s\n", strerror(errno)); - #else /* wait for one to finish */ thread_num = WaitForMultipleObjects(parallel_jobs, thread_handles, - false, wait_for_child ? INFINITE : 0); + false, wait_for_child ? INFINITE : 0); if (thread_num == WAIT_TIMEOUT || thread_num == WAIT_FAILED) return false; /* compute thread index in active_threads */ thread_num -= WAIT_OBJECT_0; - + /* get the result */ GetExitCodeThread(thread_handles[thread_num], &res); if (res != 0) @@ -313,18 +315,18 @@ reap_child(bool wait_for_child) /* dispose of handle to stop leaks */ CloseHandle(thread_handles[thread_num]); - /* Move last slot into dead child's position */ + /* Move last slot into dead child's position */ if (thread_num != parallel_jobs - 1) { - void *tmp_args; - + void *tmp_args; + thread_handles[thread_num] = thread_handles[parallel_jobs - 1]; /* - * We must swap the arg struct pointers because the thread we - * just moved is active, and we must make sure it is not - * reused by the next created thread. Instead, the new thread - * will use the arg struct of the thread that just died. + * We must swap the arg struct pointers because the thread we just + * moved is active, and we must make sure it is not reused by the next + * created thread. Instead, the new thread will use the arg struct of + * the thread that just died. */ tmp_args = cur_thread_args[thread_num]; cur_thread_args[thread_num] = cur_thread_args[parallel_jobs - 1]; |