diff options
Diffstat (limited to 'src/backend/replication/logical/tablesync.c')
-rw-r--r-- | src/backend/replication/logical/tablesync.c | 25 |
1 files changed, 17 insertions, 8 deletions
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 09b3e8b32ac..38dfce71296 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -14,7 +14,7 @@ * The initial data synchronization is done separately for each table, * in a separate apply worker that only fetches the initial snapshot data * from the publisher and then synchronizes the position in the stream with - * the main apply worker. + * the leader apply worker. * * There are several reasons for doing the synchronization this way: * - It allows us to parallelize the initial data synchronization @@ -153,7 +153,7 @@ finish_sync_worker(void) get_rel_name(MyLogicalRepWorker->relid)))); CommitTransactionCommand(); - /* Find the main apply worker and signal it. */ + /* Find the leader apply worker and signal it. */ logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid); /* Stop gracefully */ @@ -588,7 +588,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) MySubscription->oid, MySubscription->name, MyLogicalRepWorker->userid, - rstate->relid); + rstate->relid, + DSM_HANDLE_INVALID); hentry->last_start_time = now; } } @@ -636,6 +637,14 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) void process_syncing_tables(XLogRecPtr current_lsn) { + /* + * Skip for parallel apply workers because they only operate on tables + * that are in a READY state. See pa_can_start() and + * should_apply_changes_for_rel(). + */ + if (am_parallel_apply_worker()) + return; + if (am_tablesync_worker()) process_syncing_tables_for_sync(current_lsn); else @@ -1254,7 +1263,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) /* * Here we use the slot name instead of the subscription name as the - * application_name, so that it is different from the main apply worker, + * application_name, so that it is different from the leader apply worker, * so that synchronous replication can distinguish them. */ LogRepWorkerWalRcvConn = @@ -1302,7 +1311,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) * time this tablesync was launched. */ originid = replorigin_by_name(originname, false); - replorigin_session_setup(originid); + replorigin_session_setup(originid, 0); replorigin_session_origin = originid; *origin_startpos = replorigin_session_get_progress(false); @@ -1413,7 +1422,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) true /* go backward */ , true /* WAL log */ ); UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock); - replorigin_session_setup(originid); + replorigin_session_setup(originid, 0); replorigin_session_origin = originid; } else @@ -1468,8 +1477,8 @@ copy_table_done: SpinLockRelease(&MyLogicalRepWorker->relmutex); /* - * Finally, wait until the main apply worker tells us to catch up and then - * return to let LogicalRepApplyLoop do it. + * Finally, wait until the leader apply worker tells us to catch up and + * then return to let LogicalRepApplyLoop do it. */ wait_for_worker_state_change(SUBREL_STATE_CATCHUP); return slotname; |