aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/launcher.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/launcher.c')
-rw-r--r--src/backend/replication/logical/launcher.c130
1 files changed, 107 insertions, 23 deletions
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 20b43626ddd..255b22597b6 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -27,6 +27,7 @@
#include "access/xact.h"
#include "catalog/pg_subscription.h"
+#include "catalog/pg_subscription_rel.h"
#include "libpq/pqsignal.h"
@@ -56,6 +57,8 @@
#define DEFAULT_NAPTIME_PER_CYCLE 180000L
int max_logical_replication_workers = 4;
+int max_sync_workers_per_subscription = 2;
+
LogicalRepWorker *MyLogicalRepWorker = NULL;
typedef struct LogicalRepCtxStruct
@@ -198,20 +201,22 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
/*
* Walks the workers array and searches for one that matches given
- * subscription id.
+ * subscription id and relid.
*/
LogicalRepWorker *
-logicalrep_worker_find(Oid subid)
+logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
{
int i;
LogicalRepWorker *res = NULL;
Assert(LWLockHeldByMe(LogicalRepWorkerLock));
+
/* Search for attached worker for a given subscription id. */
for (i = 0; i < max_logical_replication_workers; i++)
{
LogicalRepWorker *w = &LogicalRepCtx->workers[i];
- if (w->subid == subid && w->proc && IsBackendPid(w->proc->pid))
+ if (w->subid == subid && w->relid == relid &&
+ (!only_running || (w->proc && IsBackendPid(w->proc->pid))))
{
res = w;
break;
@@ -225,7 +230,8 @@ logicalrep_worker_find(Oid subid)
* Start new apply background worker.
*/
void
-logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid)
+logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
+ Oid relid)
{
BackgroundWorker bgw;
BackgroundWorkerHandle *bgw_handle;
@@ -270,10 +276,18 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid)
}
/* Prepare the worker info. */
- memset(worker, 0, sizeof(LogicalRepWorker));
+ worker->proc = NULL;
worker->dbid = dbid;
worker->userid = userid;
worker->subid = subid;
+ worker->relid = relid;
+ worker->relstate = SUBREL_STATE_UNKNOWN;
+ worker->relstate_lsn = InvalidXLogRecPtr;
+ worker->last_lsn = InvalidXLogRecPtr;
+ TIMESTAMP_NOBEGIN(worker->last_send_time);
+ TIMESTAMP_NOBEGIN(worker->last_recv_time);
+ worker->reply_lsn = InvalidXLogRecPtr;
+ TIMESTAMP_NOBEGIN(worker->reply_time);
LWLockRelease(LogicalRepWorkerLock);
@@ -282,8 +296,12 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid)
BGWORKER_BACKEND_DATABASE_CONNECTION;
bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
bgw.bgw_main = ApplyWorkerMain;
- snprintf(bgw.bgw_name, BGW_MAXLEN,
- "logical replication worker for subscription %u", subid);
+ if (OidIsValid(relid))
+ snprintf(bgw.bgw_name, BGW_MAXLEN,
+ "logical replication worker for subscription %u sync %u", subid, relid);
+ else
+ snprintf(bgw.bgw_name, BGW_MAXLEN,
+ "logical replication worker for subscription %u", subid);
bgw.bgw_restart_time = BGW_NEVER_RESTART;
bgw.bgw_notify_pid = MyProcPid;
@@ -307,13 +325,13 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid)
* slot.
*/
void
-logicalrep_worker_stop(Oid subid)
+logicalrep_worker_stop(Oid subid, Oid relid)
{
LogicalRepWorker *worker;
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
- worker = logicalrep_worker_find(subid);
+ worker = logicalrep_worker_find(subid, relid, false);
/* No worker, nothing to do. */
if (!worker)
@@ -396,6 +414,31 @@ logicalrep_worker_stop(Oid subid)
}
/*
+ * Wake up (using latch) the logical replication worker.
+ */
+void
+logicalrep_worker_wakeup(Oid subid, Oid relid)
+{
+ LogicalRepWorker *worker;
+
+ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+ worker = logicalrep_worker_find(subid, relid, true);
+ LWLockRelease(LogicalRepWorkerLock);
+
+ if (worker)
+ logicalrep_worker_wakeup_ptr(worker);
+}
+
+/*
+ * Wake up (using latch) the logical replication worker.
+ */
+void
+logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
+{
+ SetLatch(&worker->proc->procLatch);
+}
+
+/*
* Attach to a slot.
*/
void
@@ -458,6 +501,29 @@ logicalrep_worker_sigterm(SIGNAL_ARGS)
}
/*
+ * Count the number of registered (not necessarily running) sync workers
+ * for a subscription.
+ */
+int
+logicalrep_sync_worker_count(Oid subid)
+{
+ int i;
+ int res = 0;
+
+ Assert(LWLockHeldByMe(LogicalRepWorkerLock));
+
+ /* Search for attached worker for a given subscription id. */
+ for (i = 0; i < max_logical_replication_workers; i++)
+ {
+ LogicalRepWorker *w = &LogicalRepCtx->workers[i];
+ if (w->subid == subid && OidIsValid(w->relid))
+ res++;
+ }
+
+ return res;
+}
+
+/*
* ApplyLauncherShmemSize
* Compute space needed for replication launcher shared memory
*/
@@ -512,7 +578,20 @@ ApplyLauncherShmemInit(void)
&found);
if (!found)
+ {
+ int slot;
+
memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
+
+ /* Initialize memory and spin locks for each worker slot. */
+ for (slot = 0; slot < max_logical_replication_workers; slot++)
+ {
+ LogicalRepWorker *worker = &LogicalRepCtx->workers[slot];
+
+ memset(worker, 0, sizeof(LogicalRepWorker));
+ SpinLockInit(&worker->relmutex);
+ }
+ }
}
/*
@@ -607,12 +686,13 @@ ApplyLauncherMain(Datum main_arg)
LogicalRepWorker *w;
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
- w = logicalrep_worker_find(sub->oid);
+ w = logicalrep_worker_find(sub->oid, InvalidOid, false);
LWLockRelease(LogicalRepWorkerLock);
if (sub->enabled && w == NULL)
{
- logicalrep_worker_launch(sub->dbid, sub->oid, sub->name, sub->owner);
+ logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
+ sub->owner, InvalidOid);
last_start_time = now;
wait_time = wal_retrieve_retry_interval;
/* Limit to one worker per mainloop cycle. */
@@ -664,7 +744,7 @@ ApplyLauncherMain(Datum main_arg)
Datum
pg_stat_get_subscription(PG_FUNCTION_ARGS)
{
-#define PG_STAT_GET_SUBSCRIPTION_COLS 7
+#define PG_STAT_GET_SUBSCRIPTION_COLS 8
Oid subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
int i;
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
@@ -723,27 +803,31 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
MemSet(nulls, 0, sizeof(nulls));
values[0] = ObjectIdGetDatum(worker.subid);
- values[1] = Int32GetDatum(worker_pid);
+ if (OidIsValid(worker.relid))
+ values[1] = ObjectIdGetDatum(worker.relid);
+ else
+ nulls[1] = true;
+ values[2] = Int32GetDatum(worker_pid);
if (XLogRecPtrIsInvalid(worker.last_lsn))
- nulls[2] = true;
+ nulls[3] = true;
else
- values[2] = LSNGetDatum(worker.last_lsn);
+ values[3] = LSNGetDatum(worker.last_lsn);
if (worker.last_send_time == 0)
- nulls[3] = true;
+ nulls[4] = true;
else
- values[3] = TimestampTzGetDatum(worker.last_send_time);
+ values[4] = TimestampTzGetDatum(worker.last_send_time);
if (worker.last_recv_time == 0)
- nulls[4] = true;
+ nulls[5] = true;
else
- values[4] = TimestampTzGetDatum(worker.last_recv_time);
+ values[5] = TimestampTzGetDatum(worker.last_recv_time);
if (XLogRecPtrIsInvalid(worker.reply_lsn))
- nulls[5] = true;
+ nulls[6] = true;
else
- values[5] = LSNGetDatum(worker.reply_lsn);
+ values[6] = LSNGetDatum(worker.reply_lsn);
if (worker.reply_time == 0)
- nulls[6] = true;
+ nulls[7] = true;
else
- values[6] = TimestampTzGetDatum(worker.reply_time);
+ values[7] = TimestampTzGetDatum(worker.reply_time);
tuplestore_putvalues(tupstore, tupdesc, values, nulls);