aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/tablesync.c
diff options
context:
space:
mode:
authorAlvaro Herrera <alvherre@alvh.no-ip.org>2021-05-12 19:13:54 -0400
committerAlvaro Herrera <alvherre@alvh.no-ip.org>2021-05-12 19:13:54 -0400
commitdb16c656478b815627a03bb0a31833391a733eb0 (patch)
treeb91687646b423a68e5f7e0592a1b9511b00d6512 /src/backend/replication/logical/tablesync.c
parent7dde98728a2ef6d48ef397ee783dd130fdb34e6b (diff)
downloadpostgresql-db16c656478b815627a03bb0a31833391a733eb0.tar.gz
postgresql-db16c656478b815627a03bb0a31833391a733eb0.zip
Rename the logical replication global "wrconn"
The worker.c global wrconn is only meant to be used by logical apply/ tablesync workers, but there are other variables with the same name. To reduce future confusion rename the global from "wrconn" to "LogRepWorkerWalRcvConn". While this is just cosmetic, it seems better to backpatch it all the way back to 10 where this code appeared, to avoid future backpatching issues. Author: Peter Smith <smithpb2250@gmail.com> Discussion: https://postgr.es/m/CAHut+Pu7Jv9L2BOEx_Z0UtJxfDevQSAUW2mJqWU+CtmDrEZVAg@mail.gmail.com
Diffstat (limited to 'src/backend/replication/logical/tablesync.c')
-rw-r--r--src/backend/replication/logical/tablesync.c35
1 files changed, 21 insertions, 14 deletions
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 0638f5c7f87..67f907cdd96 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -302,8 +302,11 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
MyLogicalRepWorker->relstate,
MyLogicalRepWorker->relstate_lsn);
- /* End wal streaming so wrconn can be re-used to drop the slot. */
- walrcv_endstreaming(wrconn, &tli);
+ /*
+ * End streaming so that LogRepWorkerWalRcvConn can be used to drop
+ * the slot.
+ */
+ walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
/*
* Cleanup the tablesync slot.
@@ -322,7 +325,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
* otherwise, it won't be dropped till the corresponding subscription
* is dropped. So passing missing_ok = false.
*/
- ReplicationSlotDropAtPubNode(wrconn, syncslotname, false);
+ ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);
finish_sync_worker();
}
@@ -642,7 +645,7 @@ copy_read_data(void *outbuf, int minread, int maxread)
for (;;)
{
/* Try read the data. */
- len = walrcv_receive(wrconn, &buf, &fd);
+ len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
CHECK_FOR_INTERRUPTS();
@@ -715,7 +718,8 @@ fetch_remote_table_info(char *nspname, char *relname,
" AND c.relname = %s",
quote_literal_cstr(nspname),
quote_literal_cstr(relname));
- res = walrcv_exec(wrconn, cmd.data, lengthof(tableRow), tableRow);
+ res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
+ lengthof(tableRow), tableRow);
if (res->status != WALRCV_OK_TUPLES)
ereport(ERROR,
@@ -752,9 +756,11 @@ fetch_remote_table_info(char *nspname, char *relname,
" AND a.attrelid = %u"
" ORDER BY a.attnum",
lrel->remoteid,
- (walrcv_server_version(wrconn) >= 120000 ? "AND a.attgenerated = ''" : ""),
+ (walrcv_server_version(LogRepWorkerWalRcvConn) >= 120000 ?
+ "AND a.attgenerated = ''" : ""),
lrel->remoteid);
- res = walrcv_exec(wrconn, cmd.data, lengthof(attrRow), attrRow);
+ res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
+ lengthof(attrRow), attrRow);
if (res->status != WALRCV_OK_TUPLES)
ereport(ERROR,
@@ -841,7 +847,7 @@ copy_table(Relation rel)
appendStringInfo(&cmd, " FROM %s) TO STDOUT",
quote_qualified_identifier(lrel.nspname, lrel.relname));
}
- res = walrcv_exec(wrconn, cmd.data, 0, NULL);
+ res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
pfree(cmd.data);
if (res->status != WALRCV_OK_COPY_OUT)
ereport(ERROR,
@@ -957,8 +963,9 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
* application_name, so that it is different from the main apply worker,
* so that synchronous replication can distinguish them.
*/
- wrconn = walrcv_connect(MySubscription->conninfo, true, slotname, &err);
- if (wrconn == NULL)
+ LogRepWorkerWalRcvConn =
+ walrcv_connect(MySubscription->conninfo, true, slotname, &err);
+ if (LogRepWorkerWalRcvConn == NULL)
ereport(ERROR,
(errmsg("could not connect to the publisher: %s", err)));
@@ -985,7 +992,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
* breakdown then it wouldn't have succeeded so trying it next time
* seems like a better bet.
*/
- ReplicationSlotDropAtPubNode(wrconn, slotname, true);
+ ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, slotname, true);
}
else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
{
@@ -1038,7 +1045,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
* ensures that both the replication slot we create (see below) and the
* COPY are consistent with each other.
*/
- res = walrcv_exec(wrconn,
+ res = walrcv_exec(LogRepWorkerWalRcvConn,
"BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
0, NULL);
if (res->status != WALRCV_OK_COMMAND)
@@ -1058,7 +1065,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
* slot leading to a dangling slot on the server.
*/
HOLD_INTERRUPTS();
- walrcv_create_slot(wrconn, slotname, false /* permanent */ ,
+ walrcv_create_slot(LogRepWorkerWalRcvConn, slotname, false /* permanent */ ,
CRS_USE_SNAPSHOT, origin_startpos);
RESUME_INTERRUPTS();
@@ -1100,7 +1107,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
copy_table(rel);
PopActiveSnapshot();
- res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
+ res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
if (res->status != WALRCV_OK_COMMAND)
ereport(ERROR,
(errmsg("table copy could not finish transaction on publisher: %s",