aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/tablesync.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/tablesync.c')
-rw-r--r--src/backend/replication/logical/tablesync.c34
1 files changed, 30 insertions, 4 deletions
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 3fea0a0206e..d3356bc84ee 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -316,7 +316,8 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
MyLogicalRepWorker->relid,
MyLogicalRepWorker->relstate,
- MyLogicalRepWorker->relstate_lsn);
+ MyLogicalRepWorker->relstate_lsn,
+ false);
/*
* End streaming so that LogRepWorkerWalRcvConn can be used to drop
@@ -425,6 +426,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
ListCell *lc;
bool started_tx = false;
bool should_exit = false;
+ Relation rel = NULL;
Assert(!IsTransactionState());
@@ -492,7 +494,17 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
* worker to remove the origin tracking as if there is any
* error while dropping we won't restart it to drop the
* origin. So passing missing_ok = true.
+ *
+ * Lock the subscription and origin in the same order as we
+ * are doing during DDL commands to avoid deadlocks. See
+ * AlterSubscription_refresh.
*/
+ LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid,
+ 0, AccessShareLock);
+
+ if (!rel)
+ rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
+
ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
rstate->relid,
originname,
@@ -504,7 +516,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
*/
UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
rstate->relid, rstate->state,
- rstate->lsn);
+ rstate->lsn, true);
}
}
else
@@ -555,7 +567,14 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
* This is required to avoid any undetected deadlocks
* due to any existing lock as deadlock detector won't
* be able to detect the waits on the latch.
+ *
+ * Also close any tables prior to the commit.
*/
+ if (rel)
+ {
+ table_close(rel, NoLock);
+ rel = NULL;
+ }
CommitTransactionCommand();
pgstat_report_stat(false);
}
@@ -623,6 +642,11 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
}
}
+ /* Close table if opened */
+ if (rel)
+ table_close(rel, NoLock);
+
+
if (started_tx)
{
/*
@@ -1414,7 +1438,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
MyLogicalRepWorker->relid,
MyLogicalRepWorker->relstate,
- MyLogicalRepWorker->relstate_lsn);
+ MyLogicalRepWorker->relstate_lsn,
+ false);
CommitTransactionCommand();
pgstat_report_stat(true);
@@ -1547,7 +1572,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
MyLogicalRepWorker->relid,
SUBREL_STATE_FINISHEDCOPY,
- MyLogicalRepWorker->relstate_lsn);
+ MyLogicalRepWorker->relstate_lsn,
+ false);
CommitTransactionCommand();