aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/logical/launcher.c4
-rw-r--r--src/backend/replication/logical/tablesync.c35
-rw-r--r--src/backend/replication/logical/worker.c23
-rw-r--r--src/include/replication/worker_internal.h2
4 files changed, 36 insertions, 28 deletions
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 85f325c3896..e3b11daa897 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -643,8 +643,8 @@ static void
logicalrep_worker_onexit(int code, Datum arg)
{
/* Disconnect gracefully from the remote side. */
- if (wrconn)
- walrcv_disconnect(wrconn);
+ if (LogRepWorkerWalRcvConn)
+ walrcv_disconnect(LogRepWorkerWalRcvConn);
logicalrep_worker_detach();
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 0638f5c7f87..67f907cdd96 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -302,8 +302,11 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
MyLogicalRepWorker->relstate,
MyLogicalRepWorker->relstate_lsn);
- /* End wal streaming so wrconn can be re-used to drop the slot. */
- walrcv_endstreaming(wrconn, &tli);
+ /*
+ * End streaming so that LogRepWorkerWalRcvConn can be used to drop
+ * the slot.
+ */
+ walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
/*
* Cleanup the tablesync slot.
@@ -322,7 +325,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
* otherwise, it won't be dropped till the corresponding subscription
* is dropped. So passing missing_ok = false.
*/
- ReplicationSlotDropAtPubNode(wrconn, syncslotname, false);
+ ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);
finish_sync_worker();
}
@@ -642,7 +645,7 @@ copy_read_data(void *outbuf, int minread, int maxread)
for (;;)
{
/* Try read the data. */
- len = walrcv_receive(wrconn, &buf, &fd);
+ len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
CHECK_FOR_INTERRUPTS();
@@ -715,7 +718,8 @@ fetch_remote_table_info(char *nspname, char *relname,
" AND c.relname = %s",
quote_literal_cstr(nspname),
quote_literal_cstr(relname));
- res = walrcv_exec(wrconn, cmd.data, lengthof(tableRow), tableRow);
+ res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
+ lengthof(tableRow), tableRow);
if (res->status != WALRCV_OK_TUPLES)
ereport(ERROR,
@@ -752,9 +756,11 @@ fetch_remote_table_info(char *nspname, char *relname,
" AND a.attrelid = %u"
" ORDER BY a.attnum",
lrel->remoteid,
- (walrcv_server_version(wrconn) >= 120000 ? "AND a.attgenerated = ''" : ""),
+ (walrcv_server_version(LogRepWorkerWalRcvConn) >= 120000 ?
+ "AND a.attgenerated = ''" : ""),
lrel->remoteid);
- res = walrcv_exec(wrconn, cmd.data, lengthof(attrRow), attrRow);
+ res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
+ lengthof(attrRow), attrRow);
if (res->status != WALRCV_OK_TUPLES)
ereport(ERROR,
@@ -841,7 +847,7 @@ copy_table(Relation rel)
appendStringInfo(&cmd, " FROM %s) TO STDOUT",
quote_qualified_identifier(lrel.nspname, lrel.relname));
}
- res = walrcv_exec(wrconn, cmd.data, 0, NULL);
+ res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
pfree(cmd.data);
if (res->status != WALRCV_OK_COPY_OUT)
ereport(ERROR,
@@ -957,8 +963,9 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
* application_name, so that it is different from the main apply worker,
* so that synchronous replication can distinguish them.
*/
- wrconn = walrcv_connect(MySubscription->conninfo, true, slotname, &err);
- if (wrconn == NULL)
+ LogRepWorkerWalRcvConn =
+ walrcv_connect(MySubscription->conninfo, true, slotname, &err);
+ if (LogRepWorkerWalRcvConn == NULL)
ereport(ERROR,
(errmsg("could not connect to the publisher: %s", err)));
@@ -985,7 +992,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
* breakdown then it wouldn't have succeeded so trying it next time
* seems like a better bet.
*/
- ReplicationSlotDropAtPubNode(wrconn, slotname, true);
+ ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, slotname, true);
}
else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
{
@@ -1038,7 +1045,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
* ensures that both the replication slot we create (see below) and the
* COPY are consistent with each other.
*/
- res = walrcv_exec(wrconn,
+ res = walrcv_exec(LogRepWorkerWalRcvConn,
"BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
0, NULL);
if (res->status != WALRCV_OK_COMMAND)
@@ -1058,7 +1065,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
* slot leading to a dangling slot on the server.
*/
HOLD_INTERRUPTS();
- walrcv_create_slot(wrconn, slotname, false /* permanent */ ,
+ walrcv_create_slot(LogRepWorkerWalRcvConn, slotname, false /* permanent */ ,
CRS_USE_SNAPSHOT, origin_startpos);
RESUME_INTERRUPTS();
@@ -1100,7 +1107,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
copy_table(rel);
PopActiveSnapshot();
- res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
+ res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
if (res->status != WALRCV_OK_COMMAND)
ereport(ERROR,
(errmsg("table copy could not finish transaction on publisher: %s",
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index d9f157172b2..1432554d5a7 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -156,7 +156,7 @@ MemoryContext ApplyContext = NULL;
/* per stream context for streaming transactions */
static MemoryContext LogicalStreamingContext = NULL;
-WalReceiverConn *wrconn = NULL;
+WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
Subscription *MySubscription = NULL;
bool MySubscriptionValid = false;
@@ -2126,7 +2126,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
MemoryContextSwitchTo(ApplyMessageContext);
- len = walrcv_receive(wrconn, &buf, &fd);
+ len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
if (len != 0)
{
@@ -2206,7 +2206,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
MemoryContextReset(ApplyMessageContext);
}
- len = walrcv_receive(wrconn, &buf, &fd);
+ len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
}
}
@@ -2312,7 +2312,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
}
/* All done */
- walrcv_endstreaming(wrconn, &tli);
+ walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
}
/*
@@ -2396,7 +2396,8 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
LSN_FORMAT_ARGS(writepos),
LSN_FORMAT_ARGS(flushpos));
- walrcv_send(wrconn, reply_message->data, reply_message->len);
+ walrcv_send(LogRepWorkerWalRcvConn,
+ reply_message->data, reply_message->len);
if (recvpos > last_recvpos)
last_recvpos = recvpos;
@@ -3090,9 +3091,9 @@ ApplyWorkerMain(Datum main_arg)
origin_startpos = replorigin_session_get_progress(false);
CommitTransactionCommand();
- wrconn = walrcv_connect(MySubscription->conninfo, true, MySubscription->name,
- &err);
- if (wrconn == NULL)
+ LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
+ MySubscription->name, &err);
+ if (LogRepWorkerWalRcvConn == NULL)
ereport(ERROR,
(errmsg("could not connect to the publisher: %s", err)));
@@ -3100,7 +3101,7 @@ ApplyWorkerMain(Datum main_arg)
* We don't really use the output identify_system for anything but it
* does some initializations on the upstream so let's still call it.
*/
- (void) walrcv_identify_system(wrconn, &startpointTLI);
+ (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
}
/*
@@ -3116,14 +3117,14 @@ ApplyWorkerMain(Datum main_arg)
options.startpoint = origin_startpos;
options.slotname = myslotname;
options.proto.logical.proto_version =
- walrcv_server_version(wrconn) >= 140000 ?
+ walrcv_server_version(LogRepWorkerWalRcvConn) >= 140000 ?
LOGICALREP_PROTO_STREAM_VERSION_NUM : LOGICALREP_PROTO_VERSION_NUM;
options.proto.logical.publication_names = MySubscription->publications;
options.proto.logical.binary = MySubscription->binary;
options.proto.logical.streaming = MySubscription->stream;
/* Start normal logical streaming replication. */
- walrcv_startstreaming(wrconn, &options);
+ walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
/* Run the main loop. */
LogicalRepApplyLoop(origin_startpos);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 1cac75e5a9b..179eb43900d 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -62,7 +62,7 @@ typedef struct LogicalRepWorker
extern MemoryContext ApplyContext;
/* libpqreceiver connection */
-extern struct WalReceiverConn *wrconn;
+extern struct WalReceiverConn *LogRepWorkerWalRcvConn;
/* Worker and subscription objects. */
extern Subscription *MySubscription;