aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/replication/logical/applyparallelworker.c3
-rw-r--r--src/backend/replication/logical/launcher.c31
-rw-r--r--src/backend/replication/logical/tablesync.c3
-rw-r--r--src/include/replication/worker_internal.h22
-rw-r--r--src/tools/pgindent/typedefs.list1
5 files changed, 43 insertions, 17 deletions
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index 1d4e83c4c1f..4e8ee2973e0 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -435,7 +435,8 @@ pa_launch_parallel_worker(void)
return NULL;
}
- launched = logicalrep_worker_launch(MyLogicalRepWorker->dbid,
+ launched = logicalrep_worker_launch(WORKERTYPE_PARALLEL_APPLY,
+ MyLogicalRepWorker->dbid,
MySubscription->oid,
MySubscription->name,
MyLogicalRepWorker->userid,
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index e231fa7f951..7cc0a16d3bc 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -303,7 +303,8 @@ logicalrep_workers_find(Oid subid, bool only_running)
* Returns true on success, false on failure.
*/
bool
-logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
+logicalrep_worker_launch(LogicalRepWorkerType wtype,
+ Oid dbid, Oid subid, const char *subname, Oid userid,
Oid relid, dsm_handle subworker_dsm)
{
BackgroundWorker bgw;
@@ -315,10 +316,18 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
int nsyncworkers;
int nparallelapplyworkers;
TimestampTz now;
- bool is_parallel_apply_worker = (subworker_dsm != DSM_HANDLE_INVALID);
-
- /* Sanity check - tablesync worker cannot be a subworker */
- Assert(!(is_parallel_apply_worker && OidIsValid(relid)));
+ bool is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
+ bool is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);
+
+ /*----------
+ * Sanity checks:
+ * - must be valid worker type
+ * - tablesync workers are only ones to have relid
+ * - parallel apply worker is the only kind of subworker
+ */
+ Assert(wtype != WORKERTYPE_UNKNOWN);
+ Assert(is_tablesync_worker == OidIsValid(relid));
+ Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
ereport(DEBUG1,
(errmsg_internal("starting logical replication worker for subscription \"%s\"",
@@ -393,7 +402,7 @@ retry:
* sync worker limit per subscription. So, just return silently as we
* might get here because of an otherwise harmless race condition.
*/
- if (OidIsValid(relid) && nsyncworkers >= max_sync_workers_per_subscription)
+ if (is_tablesync_worker && nsyncworkers >= max_sync_workers_per_subscription)
{
LWLockRelease(LogicalRepWorkerLock);
return false;
@@ -427,6 +436,7 @@ retry:
}
/* Prepare the worker slot. */
+ worker->type = wtype;
worker->launch_time = now;
worker->in_use = true;
worker->generation++;
@@ -466,7 +476,7 @@ retry:
subid);
snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
}
- else if (OidIsValid(relid))
+ else if (is_tablesync_worker)
{
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
snprintf(bgw.bgw_name, BGW_MAXLEN,
@@ -847,7 +857,7 @@ logicalrep_sync_worker_count(Oid subid)
{
LogicalRepWorker *w = &LogicalRepCtx->workers[i];
- if (w->subid == subid && OidIsValid(w->relid))
+ if (w->subid == subid && isTablesyncWorker(w))
res++;
}
@@ -1180,7 +1190,8 @@ ApplyLauncherMain(Datum main_arg)
(elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
{
ApplyLauncherSetWorkerStartTime(sub->oid, now);
- logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
+ logicalrep_worker_launch(WORKERTYPE_APPLY,
+ sub->dbid, sub->oid, sub->name,
sub->owner, InvalidOid,
DSM_HANDLE_INVALID);
}
@@ -1290,7 +1301,7 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
worker_pid = worker.proc->pid;
values[0] = ObjectIdGetDatum(worker.subid);
- if (OidIsValid(worker.relid))
+ if (isTablesyncWorker(&worker))
values[1] = ObjectIdGetDatum(worker.relid);
else
nulls[1] = true;
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 651a7750653..67bdd14095e 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -587,7 +587,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
TimestampDifferenceExceeds(hentry->last_start_time, now,
wal_retrieve_retry_interval))
{
- logicalrep_worker_launch(MyLogicalRepWorker->dbid,
+ logicalrep_worker_launch(WORKERTYPE_TABLESYNC,
+ MyLogicalRepWorker->dbid,
MySubscription->oid,
MySubscription->name,
MyLogicalRepWorker->userid,
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 672a7117c0c..a428663859b 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -27,9 +27,20 @@
#include "storage/shm_toc.h"
#include "storage/spin.h"
+/* Different types of worker */
+typedef enum LogicalRepWorkerType
+{
+ WORKERTYPE_UNKNOWN = 0,
+ WORKERTYPE_TABLESYNC,
+ WORKERTYPE_APPLY,
+ WORKERTYPE_PARALLEL_APPLY
+} LogicalRepWorkerType;
typedef struct LogicalRepWorker
{
+ /* What type of worker is this? */
+ LogicalRepWorkerType type;
+
/* Time at which this worker was launched. */
TimestampTz launch_time;
@@ -232,7 +243,8 @@ extern void logicalrep_worker_attach(int slot);
extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
bool only_running);
extern List *logicalrep_workers_find(Oid subid, bool only_running);
-extern bool logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
+extern bool logicalrep_worker_launch(LogicalRepWorkerType wtype,
+ Oid dbid, Oid subid, const char *subname,
Oid userid, Oid relid,
dsm_handle subworker_dsm);
extern void logicalrep_worker_stop(Oid subid, Oid relid);
@@ -315,19 +327,19 @@ extern void pa_decr_and_wait_stream_block(void);
extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
XLogRecPtr remote_lsn);
-#define isParallelApplyWorker(worker) ((worker)->leader_pid != InvalidPid)
+#define isParallelApplyWorker(worker) ((worker)->type == WORKERTYPE_PARALLEL_APPLY)
+#define isTablesyncWorker(worker) ((worker)->type == WORKERTYPE_TABLESYNC)
static inline bool
am_tablesync_worker(void)
{
- return OidIsValid(MyLogicalRepWorker->relid);
+ return isTablesyncWorker(MyLogicalRepWorker);
}
static inline bool
am_leader_apply_worker(void)
{
- return (!am_tablesync_worker() &&
- !isParallelApplyWorker(MyLogicalRepWorker));
+ return (MyLogicalRepWorker->type == WORKERTYPE_APPLY);
}
static inline bool
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 66823bc2a77..52a8789cc4d 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1500,6 +1500,7 @@ LogicalRepStreamAbortData
LogicalRepTupleData
LogicalRepTyp
LogicalRepWorker
+LogicalRepWorkerType
LogicalRewriteMappingData
LogicalTape
LogicalTapeSet