diff options
Diffstat (limited to 'src/backend/replication/logical/launcher.c')
-rw-r--r-- | src/backend/replication/logical/launcher.c | 130 |
1 files changed, 107 insertions, 23 deletions
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 20b43626ddd..255b22597b6 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -27,6 +27,7 @@ #include "access/xact.h" #include "catalog/pg_subscription.h" +#include "catalog/pg_subscription_rel.h" #include "libpq/pqsignal.h" @@ -56,6 +57,8 @@ #define DEFAULT_NAPTIME_PER_CYCLE 180000L int max_logical_replication_workers = 4; +int max_sync_workers_per_subscription = 2; + LogicalRepWorker *MyLogicalRepWorker = NULL; typedef struct LogicalRepCtxStruct @@ -198,20 +201,22 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker, /* * Walks the workers array and searches for one that matches given - * subscription id. + * subscription id and relid. */ LogicalRepWorker * -logicalrep_worker_find(Oid subid) +logicalrep_worker_find(Oid subid, Oid relid, bool only_running) { int i; LogicalRepWorker *res = NULL; Assert(LWLockHeldByMe(LogicalRepWorkerLock)); + /* Search for attached worker for a given subscription id. */ for (i = 0; i < max_logical_replication_workers; i++) { LogicalRepWorker *w = &LogicalRepCtx->workers[i]; - if (w->subid == subid && w->proc && IsBackendPid(w->proc->pid)) + if (w->subid == subid && w->relid == relid && + (!only_running || (w->proc && IsBackendPid(w->proc->pid)))) { res = w; break; @@ -225,7 +230,8 @@ logicalrep_worker_find(Oid subid) * Start new apply background worker. */ void -logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid) +logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, + Oid relid) { BackgroundWorker bgw; BackgroundWorkerHandle *bgw_handle; @@ -270,10 +276,18 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid) } /* Prepare the worker info. */ - memset(worker, 0, sizeof(LogicalRepWorker)); + worker->proc = NULL; worker->dbid = dbid; worker->userid = userid; worker->subid = subid; + worker->relid = relid; + worker->relstate = SUBREL_STATE_UNKNOWN; + worker->relstate_lsn = InvalidXLogRecPtr; + worker->last_lsn = InvalidXLogRecPtr; + TIMESTAMP_NOBEGIN(worker->last_send_time); + TIMESTAMP_NOBEGIN(worker->last_recv_time); + worker->reply_lsn = InvalidXLogRecPtr; + TIMESTAMP_NOBEGIN(worker->reply_time); LWLockRelease(LogicalRepWorkerLock); @@ -282,8 +296,12 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid) BGWORKER_BACKEND_DATABASE_CONNECTION; bgw.bgw_start_time = BgWorkerStart_RecoveryFinished; bgw.bgw_main = ApplyWorkerMain; - snprintf(bgw.bgw_name, BGW_MAXLEN, - "logical replication worker for subscription %u", subid); + if (OidIsValid(relid)) + snprintf(bgw.bgw_name, BGW_MAXLEN, + "logical replication worker for subscription %u sync %u", subid, relid); + else + snprintf(bgw.bgw_name, BGW_MAXLEN, + "logical replication worker for subscription %u", subid); bgw.bgw_restart_time = BGW_NEVER_RESTART; bgw.bgw_notify_pid = MyProcPid; @@ -307,13 +325,13 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid) * slot. */ void -logicalrep_worker_stop(Oid subid) +logicalrep_worker_stop(Oid subid, Oid relid) { LogicalRepWorker *worker; LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - worker = logicalrep_worker_find(subid); + worker = logicalrep_worker_find(subid, relid, false); /* No worker, nothing to do. */ if (!worker) @@ -396,6 +414,31 @@ logicalrep_worker_stop(Oid subid) } /* + * Wake up (using latch) the logical replication worker. + */ +void +logicalrep_worker_wakeup(Oid subid, Oid relid) +{ + LogicalRepWorker *worker; + + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + worker = logicalrep_worker_find(subid, relid, true); + LWLockRelease(LogicalRepWorkerLock); + + if (worker) + logicalrep_worker_wakeup_ptr(worker); +} + +/* + * Wake up (using latch) the logical replication worker. + */ +void +logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker) +{ + SetLatch(&worker->proc->procLatch); +} + +/* * Attach to a slot. */ void @@ -458,6 +501,29 @@ logicalrep_worker_sigterm(SIGNAL_ARGS) } /* + * Count the number of registered (not necessarily running) sync workers + * for a subscription. + */ +int +logicalrep_sync_worker_count(Oid subid) +{ + int i; + int res = 0; + + Assert(LWLockHeldByMe(LogicalRepWorkerLock)); + + /* Search for attached worker for a given subscription id. */ + for (i = 0; i < max_logical_replication_workers; i++) + { + LogicalRepWorker *w = &LogicalRepCtx->workers[i]; + if (w->subid == subid && OidIsValid(w->relid)) + res++; + } + + return res; +} + +/* * ApplyLauncherShmemSize * Compute space needed for replication launcher shared memory */ @@ -512,7 +578,20 @@ ApplyLauncherShmemInit(void) &found); if (!found) + { + int slot; + memset(LogicalRepCtx, 0, ApplyLauncherShmemSize()); + + /* Initialize memory and spin locks for each worker slot. */ + for (slot = 0; slot < max_logical_replication_workers; slot++) + { + LogicalRepWorker *worker = &LogicalRepCtx->workers[slot]; + + memset(worker, 0, sizeof(LogicalRepWorker)); + SpinLockInit(&worker->relmutex); + } + } } /* @@ -607,12 +686,13 @@ ApplyLauncherMain(Datum main_arg) LogicalRepWorker *w; LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - w = logicalrep_worker_find(sub->oid); + w = logicalrep_worker_find(sub->oid, InvalidOid, false); LWLockRelease(LogicalRepWorkerLock); if (sub->enabled && w == NULL) { - logicalrep_worker_launch(sub->dbid, sub->oid, sub->name, sub->owner); + logicalrep_worker_launch(sub->dbid, sub->oid, sub->name, + sub->owner, InvalidOid); last_start_time = now; wait_time = wal_retrieve_retry_interval; /* Limit to one worker per mainloop cycle. */ @@ -664,7 +744,7 @@ ApplyLauncherMain(Datum main_arg) Datum pg_stat_get_subscription(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_SUBSCRIPTION_COLS 7 +#define PG_STAT_GET_SUBSCRIPTION_COLS 8 Oid subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0); int i; ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; @@ -723,27 +803,31 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS) MemSet(nulls, 0, sizeof(nulls)); values[0] = ObjectIdGetDatum(worker.subid); - values[1] = Int32GetDatum(worker_pid); + if (OidIsValid(worker.relid)) + values[1] = ObjectIdGetDatum(worker.relid); + else + nulls[1] = true; + values[2] = Int32GetDatum(worker_pid); if (XLogRecPtrIsInvalid(worker.last_lsn)) - nulls[2] = true; + nulls[3] = true; else - values[2] = LSNGetDatum(worker.last_lsn); + values[3] = LSNGetDatum(worker.last_lsn); if (worker.last_send_time == 0) - nulls[3] = true; + nulls[4] = true; else - values[3] = TimestampTzGetDatum(worker.last_send_time); + values[4] = TimestampTzGetDatum(worker.last_send_time); if (worker.last_recv_time == 0) - nulls[4] = true; + nulls[5] = true; else - values[4] = TimestampTzGetDatum(worker.last_recv_time); + values[5] = TimestampTzGetDatum(worker.last_recv_time); if (XLogRecPtrIsInvalid(worker.reply_lsn)) - nulls[5] = true; + nulls[6] = true; else - values[5] = LSNGetDatum(worker.reply_lsn); + values[6] = LSNGetDatum(worker.reply_lsn); if (worker.reply_time == 0) - nulls[6] = true; + nulls[7] = true; else - values[6] = TimestampTzGetDatum(worker.reply_time); + values[7] = TimestampTzGetDatum(worker.reply_time); tuplestore_putvalues(tupstore, tupdesc, values, nulls); |