diff options
Diffstat (limited to 'src/backend/replication/logical/tablesync.c')
-rw-r--r-- | src/backend/replication/logical/tablesync.c | 35 |
1 files changed, 21 insertions, 14 deletions
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 0638f5c7f87..67f907cdd96 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -302,8 +302,11 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) MyLogicalRepWorker->relstate, MyLogicalRepWorker->relstate_lsn); - /* End wal streaming so wrconn can be re-used to drop the slot. */ - walrcv_endstreaming(wrconn, &tli); + /* + * End streaming so that LogRepWorkerWalRcvConn can be used to drop + * the slot. + */ + walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli); /* * Cleanup the tablesync slot. @@ -322,7 +325,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) * otherwise, it won't be dropped till the corresponding subscription * is dropped. So passing missing_ok = false. */ - ReplicationSlotDropAtPubNode(wrconn, syncslotname, false); + ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false); finish_sync_worker(); } @@ -642,7 +645,7 @@ copy_read_data(void *outbuf, int minread, int maxread) for (;;) { /* Try read the data. */ - len = walrcv_receive(wrconn, &buf, &fd); + len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd); CHECK_FOR_INTERRUPTS(); @@ -715,7 +718,8 @@ fetch_remote_table_info(char *nspname, char *relname, " AND c.relname = %s", quote_literal_cstr(nspname), quote_literal_cstr(relname)); - res = walrcv_exec(wrconn, cmd.data, lengthof(tableRow), tableRow); + res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, + lengthof(tableRow), tableRow); if (res->status != WALRCV_OK_TUPLES) ereport(ERROR, @@ -752,9 +756,11 @@ fetch_remote_table_info(char *nspname, char *relname, " AND a.attrelid = %u" " ORDER BY a.attnum", lrel->remoteid, - (walrcv_server_version(wrconn) >= 120000 ? "AND a.attgenerated = ''" : ""), + (walrcv_server_version(LogRepWorkerWalRcvConn) >= 120000 ? + "AND a.attgenerated = ''" : ""), lrel->remoteid); - res = walrcv_exec(wrconn, cmd.data, lengthof(attrRow), attrRow); + res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, + lengthof(attrRow), attrRow); if (res->status != WALRCV_OK_TUPLES) ereport(ERROR, @@ -841,7 +847,7 @@ copy_table(Relation rel) appendStringInfo(&cmd, " FROM %s) TO STDOUT", quote_qualified_identifier(lrel.nspname, lrel.relname)); } - res = walrcv_exec(wrconn, cmd.data, 0, NULL); + res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL); pfree(cmd.data); if (res->status != WALRCV_OK_COPY_OUT) ereport(ERROR, @@ -957,8 +963,9 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) * application_name, so that it is different from the main apply worker, * so that synchronous replication can distinguish them. */ - wrconn = walrcv_connect(MySubscription->conninfo, true, slotname, &err); - if (wrconn == NULL) + LogRepWorkerWalRcvConn = + walrcv_connect(MySubscription->conninfo, true, slotname, &err); + if (LogRepWorkerWalRcvConn == NULL) ereport(ERROR, (errmsg("could not connect to the publisher: %s", err))); @@ -985,7 +992,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) * breakdown then it wouldn't have succeeded so trying it next time * seems like a better bet. */ - ReplicationSlotDropAtPubNode(wrconn, slotname, true); + ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, slotname, true); } else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY) { @@ -1038,7 +1045,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) * ensures that both the replication slot we create (see below) and the * COPY are consistent with each other. */ - res = walrcv_exec(wrconn, + res = walrcv_exec(LogRepWorkerWalRcvConn, "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ", 0, NULL); if (res->status != WALRCV_OK_COMMAND) @@ -1058,7 +1065,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) * slot leading to a dangling slot on the server. */ HOLD_INTERRUPTS(); - walrcv_create_slot(wrconn, slotname, false /* permanent */ , + walrcv_create_slot(LogRepWorkerWalRcvConn, slotname, false /* permanent */ , CRS_USE_SNAPSHOT, origin_startpos); RESUME_INTERRUPTS(); @@ -1100,7 +1107,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) copy_table(rel); PopActiveSnapshot(); - res = walrcv_exec(wrconn, "COMMIT", 0, NULL); + res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL); if (res->status != WALRCV_OK_COMMAND) ereport(ERROR, (errmsg("table copy could not finish transaction on publisher: %s", |