aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/tablesync.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/tablesync.c')
-rw-r--r--src/backend/replication/logical/tablesync.c25
1 files changed, 17 insertions, 8 deletions
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 09b3e8b32ac..38dfce71296 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -14,7 +14,7 @@
* The initial data synchronization is done separately for each table,
* in a separate apply worker that only fetches the initial snapshot data
* from the publisher and then synchronizes the position in the stream with
- * the main apply worker.
+ * the leader apply worker.
*
* There are several reasons for doing the synchronization this way:
* - It allows us to parallelize the initial data synchronization
@@ -153,7 +153,7 @@ finish_sync_worker(void)
get_rel_name(MyLogicalRepWorker->relid))));
CommitTransactionCommand();
- /* Find the main apply worker and signal it. */
+ /* Find the leader apply worker and signal it. */
logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
/* Stop gracefully */
@@ -588,7 +588,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
MySubscription->oid,
MySubscription->name,
MyLogicalRepWorker->userid,
- rstate->relid);
+ rstate->relid,
+ DSM_HANDLE_INVALID);
hentry->last_start_time = now;
}
}
@@ -636,6 +637,14 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
void
process_syncing_tables(XLogRecPtr current_lsn)
{
+ /*
+ * Skip for parallel apply workers because they only operate on tables
+ * that are in a READY state. See pa_can_start() and
+ * should_apply_changes_for_rel().
+ */
+ if (am_parallel_apply_worker())
+ return;
+
if (am_tablesync_worker())
process_syncing_tables_for_sync(current_lsn);
else
@@ -1254,7 +1263,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
/*
* Here we use the slot name instead of the subscription name as the
- * application_name, so that it is different from the main apply worker,
+ * application_name, so that it is different from the leader apply worker,
* so that synchronous replication can distinguish them.
*/
LogRepWorkerWalRcvConn =
@@ -1302,7 +1311,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
* time this tablesync was launched.
*/
originid = replorigin_by_name(originname, false);
- replorigin_session_setup(originid);
+ replorigin_session_setup(originid, 0);
replorigin_session_origin = originid;
*origin_startpos = replorigin_session_get_progress(false);
@@ -1413,7 +1422,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
true /* go backward */ , true /* WAL log */ );
UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
- replorigin_session_setup(originid);
+ replorigin_session_setup(originid, 0);
replorigin_session_origin = originid;
}
else
@@ -1468,8 +1477,8 @@ copy_table_done:
SpinLockRelease(&MyLogicalRepWorker->relmutex);
/*
- * Finally, wait until the main apply worker tells us to catch up and then
- * return to let LogicalRepApplyLoop do it.
+ * Finally, wait until the leader apply worker tells us to catch up and
+ * then return to let LogicalRepApplyLoop do it.
*/
wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
return slotname;