diff options
-rw-r--r-- | src/backend/replication/logical/applyparallelworker.c | 3 | ||||
-rw-r--r-- | src/backend/replication/logical/launcher.c | 31 | ||||
-rw-r--r-- | src/backend/replication/logical/tablesync.c | 3 | ||||
-rw-r--r-- | src/include/replication/worker_internal.h | 22 | ||||
-rw-r--r-- | src/tools/pgindent/typedefs.list | 1 |
5 files changed, 43 insertions, 17 deletions
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index 1d4e83c4c1f..4e8ee2973e0 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -435,7 +435,8 @@ pa_launch_parallel_worker(void) return NULL; } - launched = logicalrep_worker_launch(MyLogicalRepWorker->dbid, + launched = logicalrep_worker_launch(WORKERTYPE_PARALLEL_APPLY, + MyLogicalRepWorker->dbid, MySubscription->oid, MySubscription->name, MyLogicalRepWorker->userid, diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index e231fa7f951..7cc0a16d3bc 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -303,7 +303,8 @@ logicalrep_workers_find(Oid subid, bool only_running) * Returns true on success, false on failure. */ bool -logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, +logicalrep_worker_launch(LogicalRepWorkerType wtype, + Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm) { BackgroundWorker bgw; @@ -315,10 +316,18 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, int nsyncworkers; int nparallelapplyworkers; TimestampTz now; - bool is_parallel_apply_worker = (subworker_dsm != DSM_HANDLE_INVALID); - - /* Sanity check - tablesync worker cannot be a subworker */ - Assert(!(is_parallel_apply_worker && OidIsValid(relid))); + bool is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC); + bool is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY); + + /*---------- + * Sanity checks: + * - must be valid worker type + * - tablesync workers are only ones to have relid + * - parallel apply worker is the only kind of subworker + */ + Assert(wtype != WORKERTYPE_UNKNOWN); + Assert(is_tablesync_worker == OidIsValid(relid)); + Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID)); ereport(DEBUG1, (errmsg_internal("starting logical replication worker for subscription \"%s\"", @@ -393,7 +402,7 @@ retry: * sync worker limit per subscription. So, just return silently as we * might get here because of an otherwise harmless race condition. */ - if (OidIsValid(relid) && nsyncworkers >= max_sync_workers_per_subscription) + if (is_tablesync_worker && nsyncworkers >= max_sync_workers_per_subscription) { LWLockRelease(LogicalRepWorkerLock); return false; @@ -427,6 +436,7 @@ retry: } /* Prepare the worker slot. */ + worker->type = wtype; worker->launch_time = now; worker->in_use = true; worker->generation++; @@ -466,7 +476,7 @@ retry: subid); snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker"); } - else if (OidIsValid(relid)) + else if (is_tablesync_worker) { snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain"); snprintf(bgw.bgw_name, BGW_MAXLEN, @@ -847,7 +857,7 @@ logicalrep_sync_worker_count(Oid subid) { LogicalRepWorker *w = &LogicalRepCtx->workers[i]; - if (w->subid == subid && OidIsValid(w->relid)) + if (w->subid == subid && isTablesyncWorker(w)) res++; } @@ -1180,7 +1190,8 @@ ApplyLauncherMain(Datum main_arg) (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval) { ApplyLauncherSetWorkerStartTime(sub->oid, now); - logicalrep_worker_launch(sub->dbid, sub->oid, sub->name, + logicalrep_worker_launch(WORKERTYPE_APPLY, + sub->dbid, sub->oid, sub->name, sub->owner, InvalidOid, DSM_HANDLE_INVALID); } @@ -1290,7 +1301,7 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS) worker_pid = worker.proc->pid; values[0] = ObjectIdGetDatum(worker.subid); - if (OidIsValid(worker.relid)) + if (isTablesyncWorker(&worker)) values[1] = ObjectIdGetDatum(worker.relid); else nulls[1] = true; diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 651a7750653..67bdd14095e 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -587,7 +587,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) TimestampDifferenceExceeds(hentry->last_start_time, now, wal_retrieve_retry_interval)) { - logicalrep_worker_launch(MyLogicalRepWorker->dbid, + logicalrep_worker_launch(WORKERTYPE_TABLESYNC, + MyLogicalRepWorker->dbid, MySubscription->oid, MySubscription->name, MyLogicalRepWorker->userid, diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 672a7117c0c..a428663859b 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -27,9 +27,20 @@ #include "storage/shm_toc.h" #include "storage/spin.h" +/* Different types of worker */ +typedef enum LogicalRepWorkerType +{ + WORKERTYPE_UNKNOWN = 0, + WORKERTYPE_TABLESYNC, + WORKERTYPE_APPLY, + WORKERTYPE_PARALLEL_APPLY +} LogicalRepWorkerType; typedef struct LogicalRepWorker { + /* What type of worker is this? */ + LogicalRepWorkerType type; + /* Time at which this worker was launched. */ TimestampTz launch_time; @@ -232,7 +243,8 @@ extern void logicalrep_worker_attach(int slot); extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid, bool only_running); extern List *logicalrep_workers_find(Oid subid, bool only_running); -extern bool logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, +extern bool logicalrep_worker_launch(LogicalRepWorkerType wtype, + Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm); extern void logicalrep_worker_stop(Oid subid, Oid relid); @@ -315,19 +327,19 @@ extern void pa_decr_and_wait_stream_block(void); extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn); -#define isParallelApplyWorker(worker) ((worker)->leader_pid != InvalidPid) +#define isParallelApplyWorker(worker) ((worker)->type == WORKERTYPE_PARALLEL_APPLY) +#define isTablesyncWorker(worker) ((worker)->type == WORKERTYPE_TABLESYNC) static inline bool am_tablesync_worker(void) { - return OidIsValid(MyLogicalRepWorker->relid); + return isTablesyncWorker(MyLogicalRepWorker); } static inline bool am_leader_apply_worker(void) { - return (!am_tablesync_worker() && - !isParallelApplyWorker(MyLogicalRepWorker)); + return (MyLogicalRepWorker->type == WORKERTYPE_APPLY); } static inline bool diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 66823bc2a77..52a8789cc4d 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1500,6 +1500,7 @@ LogicalRepStreamAbortData LogicalRepTupleData LogicalRepTyp LogicalRepWorker +LogicalRepWorkerType LogicalRewriteMappingData LogicalTape LogicalTapeSet |