aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/tablesync.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/tablesync.c')
-rw-r--r--src/backend/replication/logical/tablesync.c26
1 files changed, 15 insertions, 11 deletions
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index c27d9705895..a3989d40dd1 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -288,7 +288,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
MyLogicalRepWorker->relstate,
MyLogicalRepWorker->relstate_lsn);
- walrcv_endstreaming(wrconn, &tli);
+ walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
finish_sync_worker();
}
else
@@ -584,7 +584,7 @@ copy_read_data(void *outbuf, int minread, int maxread)
for (;;)
{
/* Try read the data. */
- len = walrcv_receive(wrconn, &buf, &fd);
+ len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
CHECK_FOR_INTERRUPTS();
@@ -657,7 +657,8 @@ fetch_remote_table_info(char *nspname, char *relname,
" AND c.relname = %s",
quote_literal_cstr(nspname),
quote_literal_cstr(relname));
- res = walrcv_exec(wrconn, cmd.data, lengthof(tableRow), tableRow);
+ res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
+ lengthof(tableRow), tableRow);
if (res->status != WALRCV_OK_TUPLES)
ereport(ERROR,
@@ -695,9 +696,11 @@ fetch_remote_table_info(char *nspname, char *relname,
" AND a.attrelid = %u"
" ORDER BY a.attnum",
lrel->remoteid,
- (walrcv_server_version(wrconn) >= 120000 ? "AND a.attgenerated = ''" : ""),
+ (walrcv_server_version(LogRepWorkerWalRcvConn) >= 120000 ?
+ "AND a.attgenerated = ''" : ""),
lrel->remoteid);
- res = walrcv_exec(wrconn, cmd.data, lengthof(attrRow), attrRow);
+ res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
+ lengthof(attrRow), attrRow);
if (res->status != WALRCV_OK_TUPLES)
ereport(ERROR,
@@ -784,7 +787,7 @@ copy_table(Relation rel)
appendStringInfo(&cmd, " FROM %s) TO STDOUT",
quote_qualified_identifier(lrel.nspname, lrel.relname));
}
- res = walrcv_exec(wrconn, cmd.data, 0, NULL);
+ res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
pfree(cmd.data);
if (res->status != WALRCV_OK_COPY_OUT)
ereport(ERROR,
@@ -851,8 +854,9 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
* application_name, so that it is different from the main apply worker,
* so that synchronous replication can distinguish them.
*/
- wrconn = walrcv_connect(MySubscription->conninfo, true, slotname, &err);
- if (wrconn == NULL)
+ LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
+ slotname, &err);
+ if (LogRepWorkerWalRcvConn == NULL)
ereport(ERROR,
(errmsg("could not connect to the publisher: %s", err)));
@@ -897,7 +901,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
* inside the transaction so that we can use the snapshot made
* by the slot to get existing data.
*/
- res = walrcv_exec(wrconn,
+ res = walrcv_exec(LogRepWorkerWalRcvConn,
"BEGIN READ ONLY ISOLATION LEVEL "
"REPEATABLE READ", 0, NULL);
if (res->status != WALRCV_OK_COMMAND)
@@ -914,14 +918,14 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
* that is consistent with the lsn used by the slot to start
* decoding.
*/
- walrcv_create_slot(wrconn, slotname, true,
+ walrcv_create_slot(LogRepWorkerWalRcvConn, slotname, true,
CRS_USE_SNAPSHOT, origin_startpos);
PushActiveSnapshot(GetTransactionSnapshot());
copy_table(rel);
PopActiveSnapshot();
- res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
+ res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
if (res->status != WALRCV_OK_COMMAND)
ereport(ERROR,
(errmsg("table copy could not finish transaction on publisher"),