diff options
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r-- | src/backend/replication/logical/worker.c | 21 |
1 files changed, 11 insertions, 10 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index f6c0c28672e..ff887ea437a 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -100,7 +100,7 @@ typedef struct SlotErrCallbackArg static MemoryContext ApplyMessageContext = NULL; MemoryContext ApplyContext = NULL; -WalReceiverConn *wrconn = NULL; +WalReceiverConn *LogRepWorkerWalRcvConn = NULL; Subscription *MySubscription = NULL; bool MySubscriptionValid = false; @@ -1517,7 +1517,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) MemoryContextSwitchTo(ApplyMessageContext); - len = walrcv_receive(wrconn, &buf, &fd); + len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd); if (len != 0) { @@ -1597,7 +1597,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) MemoryContextReset(ApplyMessageContext); } - len = walrcv_receive(wrconn, &buf, &fd); + len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd); } } @@ -1627,7 +1627,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) { TimeLineID tli; - walrcv_endstreaming(wrconn, &tli); + walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli); break; } @@ -1790,7 +1790,8 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply) (uint32) (flushpos >> 32), (uint32) flushpos ); - walrcv_send(wrconn, reply_message->data, reply_message->len); + walrcv_send(LogRepWorkerWalRcvConn, + reply_message->data, reply_message->len); if (recvpos > last_recvpos) last_recvpos = recvpos; @@ -2088,9 +2089,9 @@ ApplyWorkerMain(Datum main_arg) origin_startpos = replorigin_session_get_progress(false); CommitTransactionCommand(); - wrconn = walrcv_connect(MySubscription->conninfo, true, MySubscription->name, - &err); - if (wrconn == NULL) + LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true, + MySubscription->name, &err); + if (LogRepWorkerWalRcvConn == NULL) ereport(ERROR, (errmsg("could not connect to the publisher: %s", err))); @@ -2098,7 +2099,7 @@ ApplyWorkerMain(Datum main_arg) * We don't really use the output identify_system for anything but it * does some initializations on the upstream so let's still call it. */ - (void) walrcv_identify_system(wrconn, &startpointTLI); + (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI); } /* @@ -2117,7 +2118,7 @@ ApplyWorkerMain(Datum main_arg) options.proto.logical.publication_names = MySubscription->publications; /* Start normal logical streaming replication. */ - walrcv_startstreaming(wrconn, &options); + walrcv_startstreaming(LogRepWorkerWalRcvConn, &options); /* Run the main loop. */ LogicalRepApplyLoop(origin_startpos); |