aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/launcher.c
diff options
context:
space:
mode:
authorPeter Eisentraut <peter_e@gmx.net>2017-03-23 08:36:36 -0400
committerPeter Eisentraut <peter_e@gmx.net>2017-03-23 08:55:37 -0400
commit7c4f52409a8c7d85ed169bbbc1f6092274d03920 (patch)
treefa3dc592bb2855e5cc0a200f4c408b4c8d299be5 /src/backend/replication/logical/launcher.c
parent707576b571f05ec5b89adb65964d55f3ccccbd1b (diff)
downloadpostgresql-7c4f52409a8c7d85ed169bbbc1f6092274d03920.tar.gz
postgresql-7c4f52409a8c7d85ed169bbbc1f6092274d03920.zip
Logical replication support for initial data copy
Add functionality for a new subscription to copy the initial data in the tables and then sync with the ongoing apply process. For the copying, add a new internal COPY option to have the COPY source data provided by a callback function. The initial data copy works on the subscriber by receiving COPY data from the publisher and then providing it locally into a COPY that writes to the destination table. A WAL receiver can now execute full SQL commands. This is used here to obtain information about tables and publications. Several new options were added to CREATE and ALTER SUBSCRIPTION to control whether and when initial table syncing happens. Change pg_dump option --no-create-subscription-slots to --no-subscription-connect and use the new CREATE SUBSCRIPTION ... NOCONNECT option for that. Author: Petr Jelinek <petr.jelinek@2ndquadrant.com> Tested-by: Erik Rijkers <er@xs4all.nl>
Diffstat (limited to 'src/backend/replication/logical/launcher.c')
-rw-r--r--src/backend/replication/logical/launcher.c130
1 files changed, 107 insertions, 23 deletions
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 20b43626ddd..255b22597b6 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -27,6 +27,7 @@
#include "access/xact.h"
#include "catalog/pg_subscription.h"
+#include "catalog/pg_subscription_rel.h"
#include "libpq/pqsignal.h"
@@ -56,6 +57,8 @@
#define DEFAULT_NAPTIME_PER_CYCLE 180000L
int max_logical_replication_workers = 4;
+int max_sync_workers_per_subscription = 2;
+
LogicalRepWorker *MyLogicalRepWorker = NULL;
typedef struct LogicalRepCtxStruct
@@ -198,20 +201,22 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
/*
* Walks the workers array and searches for one that matches given
- * subscription id.
+ * subscription id and relid.
*/
LogicalRepWorker *
-logicalrep_worker_find(Oid subid)
+logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
{
int i;
LogicalRepWorker *res = NULL;
Assert(LWLockHeldByMe(LogicalRepWorkerLock));
+
/* Search for attached worker for a given subscription id. */
for (i = 0; i < max_logical_replication_workers; i++)
{
LogicalRepWorker *w = &LogicalRepCtx->workers[i];
- if (w->subid == subid && w->proc && IsBackendPid(w->proc->pid))
+ if (w->subid == subid && w->relid == relid &&
+ (!only_running || (w->proc && IsBackendPid(w->proc->pid))))
{
res = w;
break;
@@ -225,7 +230,8 @@ logicalrep_worker_find(Oid subid)
* Start new apply background worker.
*/
void
-logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid)
+logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
+ Oid relid)
{
BackgroundWorker bgw;
BackgroundWorkerHandle *bgw_handle;
@@ -270,10 +276,18 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid)
}
/* Prepare the worker info. */
- memset(worker, 0, sizeof(LogicalRepWorker));
+ worker->proc = NULL;
worker->dbid = dbid;
worker->userid = userid;
worker->subid = subid;
+ worker->relid = relid;
+ worker->relstate = SUBREL_STATE_UNKNOWN;
+ worker->relstate_lsn = InvalidXLogRecPtr;
+ worker->last_lsn = InvalidXLogRecPtr;
+ TIMESTAMP_NOBEGIN(worker->last_send_time);
+ TIMESTAMP_NOBEGIN(worker->last_recv_time);
+ worker->reply_lsn = InvalidXLogRecPtr;
+ TIMESTAMP_NOBEGIN(worker->reply_time);
LWLockRelease(LogicalRepWorkerLock);
@@ -282,8 +296,12 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid)
BGWORKER_BACKEND_DATABASE_CONNECTION;
bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
bgw.bgw_main = ApplyWorkerMain;
- snprintf(bgw.bgw_name, BGW_MAXLEN,
- "logical replication worker for subscription %u", subid);
+ if (OidIsValid(relid))
+ snprintf(bgw.bgw_name, BGW_MAXLEN,
+ "logical replication worker for subscription %u sync %u", subid, relid);
+ else
+ snprintf(bgw.bgw_name, BGW_MAXLEN,
+ "logical replication worker for subscription %u", subid);
bgw.bgw_restart_time = BGW_NEVER_RESTART;
bgw.bgw_notify_pid = MyProcPid;
@@ -307,13 +325,13 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid)
* slot.
*/
void
-logicalrep_worker_stop(Oid subid)
+logicalrep_worker_stop(Oid subid, Oid relid)
{
LogicalRepWorker *worker;
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
- worker = logicalrep_worker_find(subid);
+ worker = logicalrep_worker_find(subid, relid, false);
/* No worker, nothing to do. */
if (!worker)
@@ -396,6 +414,31 @@ logicalrep_worker_stop(Oid subid)
}
/*
+ * Wake up (using latch) the logical replication worker.
+ */
+void
+logicalrep_worker_wakeup(Oid subid, Oid relid)
+{
+ LogicalRepWorker *worker;
+
+ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+ worker = logicalrep_worker_find(subid, relid, true);
+ LWLockRelease(LogicalRepWorkerLock);
+
+ if (worker)
+ logicalrep_worker_wakeup_ptr(worker);
+}
+
+/*
+ * Wake up (using latch) the logical replication worker.
+ */
+void
+logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
+{
+ SetLatch(&worker->proc->procLatch);
+}
+
+/*
* Attach to a slot.
*/
void
@@ -458,6 +501,29 @@ logicalrep_worker_sigterm(SIGNAL_ARGS)
}
/*
+ * Count the number of registered (not necessarily running) sync workers
+ * for a subscription.
+ */
+int
+logicalrep_sync_worker_count(Oid subid)
+{
+ int i;
+ int res = 0;
+
+ Assert(LWLockHeldByMe(LogicalRepWorkerLock));
+
+ /* Search for attached worker for a given subscription id. */
+ for (i = 0; i < max_logical_replication_workers; i++)
+ {
+ LogicalRepWorker *w = &LogicalRepCtx->workers[i];
+ if (w->subid == subid && OidIsValid(w->relid))
+ res++;
+ }
+
+ return res;
+}
+
+/*
* ApplyLauncherShmemSize
* Compute space needed for replication launcher shared memory
*/
@@ -512,7 +578,20 @@ ApplyLauncherShmemInit(void)
&found);
if (!found)
+ {
+ int slot;
+
memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
+
+ /* Initialize memory and spin locks for each worker slot. */
+ for (slot = 0; slot < max_logical_replication_workers; slot++)
+ {
+ LogicalRepWorker *worker = &LogicalRepCtx->workers[slot];
+
+ memset(worker, 0, sizeof(LogicalRepWorker));
+ SpinLockInit(&worker->relmutex);
+ }
+ }
}
/*
@@ -607,12 +686,13 @@ ApplyLauncherMain(Datum main_arg)
LogicalRepWorker *w;
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
- w = logicalrep_worker_find(sub->oid);
+ w = logicalrep_worker_find(sub->oid, InvalidOid, false);
LWLockRelease(LogicalRepWorkerLock);
if (sub->enabled && w == NULL)
{
- logicalrep_worker_launch(sub->dbid, sub->oid, sub->name, sub->owner);
+ logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
+ sub->owner, InvalidOid);
last_start_time = now;
wait_time = wal_retrieve_retry_interval;
/* Limit to one worker per mainloop cycle. */
@@ -664,7 +744,7 @@ ApplyLauncherMain(Datum main_arg)
Datum
pg_stat_get_subscription(PG_FUNCTION_ARGS)
{
-#define PG_STAT_GET_SUBSCRIPTION_COLS 7
+#define PG_STAT_GET_SUBSCRIPTION_COLS 8
Oid subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
int i;
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
@@ -723,27 +803,31 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
MemSet(nulls, 0, sizeof(nulls));
values[0] = ObjectIdGetDatum(worker.subid);
- values[1] = Int32GetDatum(worker_pid);
+ if (OidIsValid(worker.relid))
+ values[1] = ObjectIdGetDatum(worker.relid);
+ else
+ nulls[1] = true;
+ values[2] = Int32GetDatum(worker_pid);
if (XLogRecPtrIsInvalid(worker.last_lsn))
- nulls[2] = true;
+ nulls[3] = true;
else
- values[2] = LSNGetDatum(worker.last_lsn);
+ values[3] = LSNGetDatum(worker.last_lsn);
if (worker.last_send_time == 0)
- nulls[3] = true;
+ nulls[4] = true;
else
- values[3] = TimestampTzGetDatum(worker.last_send_time);
+ values[4] = TimestampTzGetDatum(worker.last_send_time);
if (worker.last_recv_time == 0)
- nulls[4] = true;
+ nulls[5] = true;
else
- values[4] = TimestampTzGetDatum(worker.last_recv_time);
+ values[5] = TimestampTzGetDatum(worker.last_recv_time);
if (XLogRecPtrIsInvalid(worker.reply_lsn))
- nulls[5] = true;
+ nulls[6] = true;
else
- values[5] = LSNGetDatum(worker.reply_lsn);
+ values[6] = LSNGetDatum(worker.reply_lsn);
if (worker.reply_time == 0)
- nulls[6] = true;
+ nulls[7] = true;
else
- values[6] = TimestampTzGetDatum(worker.reply_time);
+ values[7] = TimestampTzGetDatum(worker.reply_time);
tuplestore_putvalues(tupstore, tupdesc, values, nulls);