diff options
Diffstat (limited to 'src/backend/replication/logical/tablesync.c')
-rw-r--r-- | src/backend/replication/logical/tablesync.c | 34 |
1 files changed, 30 insertions, 4 deletions
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 3fea0a0206e..d3356bc84ee 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -316,7 +316,8 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) UpdateSubscriptionRelState(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid, MyLogicalRepWorker->relstate, - MyLogicalRepWorker->relstate_lsn); + MyLogicalRepWorker->relstate_lsn, + false); /* * End streaming so that LogRepWorkerWalRcvConn can be used to drop @@ -425,6 +426,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) ListCell *lc; bool started_tx = false; bool should_exit = false; + Relation rel = NULL; Assert(!IsTransactionState()); @@ -492,7 +494,17 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) * worker to remove the origin tracking as if there is any * error while dropping we won't restart it to drop the * origin. So passing missing_ok = true. + * + * Lock the subscription and origin in the same order as we + * are doing during DDL commands to avoid deadlocks. See + * AlterSubscription_refresh. */ + LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid, + 0, AccessShareLock); + + if (!rel) + rel = table_open(SubscriptionRelRelationId, RowExclusiveLock); + ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid, rstate->relid, originname, @@ -504,7 +516,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) */ UpdateSubscriptionRelState(MyLogicalRepWorker->subid, rstate->relid, rstate->state, - rstate->lsn); + rstate->lsn, true); } } else @@ -555,7 +567,14 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) * This is required to avoid any undetected deadlocks * due to any existing lock as deadlock detector won't * be able to detect the waits on the latch. + * + * Also close any tables prior to the commit. */ + if (rel) + { + table_close(rel, NoLock); + rel = NULL; + } CommitTransactionCommand(); pgstat_report_stat(false); } @@ -623,6 +642,11 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) } } + /* Close table if opened */ + if (rel) + table_close(rel, NoLock); + + if (started_tx) { /* @@ -1414,7 +1438,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) UpdateSubscriptionRelState(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid, MyLogicalRepWorker->relstate, - MyLogicalRepWorker->relstate_lsn); + MyLogicalRepWorker->relstate_lsn, + false); CommitTransactionCommand(); pgstat_report_stat(true); @@ -1547,7 +1572,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) UpdateSubscriptionRelState(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid, SUBREL_STATE_FINISHEDCOPY, - MyLogicalRepWorker->relstate_lsn); + MyLogicalRepWorker->relstate_lsn, + false); CommitTransactionCommand(); |