aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/logical/tablesync.c18
-rw-r--r--src/backend/replication/logical/worker.c2
2 files changed, 15 insertions, 5 deletions
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 0823000f001..7e51076b376 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -274,6 +274,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
static List *table_states = NIL;
static HTAB *last_start_times = NULL;
ListCell *lc;
+ bool started_tx = false;
Assert(!IsTransactionState());
@@ -290,6 +291,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
table_states = NIL;
StartTransactionCommand();
+ started_tx = true;
/* Fetch all non-ready tables. */
rstates = GetSubscriptionNotReadyRelations(MySubscription->oid);
@@ -304,8 +306,6 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
}
MemoryContextSwitchTo(oldctx);
- CommitTransactionCommand();
-
table_states_valid = true;
}
@@ -350,11 +350,14 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
{
rstate->state = SUBREL_STATE_READY;
rstate->lsn = current_lsn;
- StartTransactionCommand();
+ if (!started_tx)
+ {
+ StartTransactionCommand();
+ started_tx = true;
+ }
SetSubscriptionRelState(MyLogicalRepWorker->subid,
rstate->relid, rstate->state,
rstate->lsn);
- CommitTransactionCommand();
}
}
else
@@ -457,6 +460,12 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
}
}
}
+
+ if (started_tx)
+ {
+ CommitTransactionCommand();
+ pgstat_report_stat(false);
+ }
}
/*
@@ -806,6 +815,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
MyLogicalRepWorker->relstate,
MyLogicalRepWorker->relstate_lsn);
CommitTransactionCommand();
+ pgstat_report_stat(false);
/*
* We want to do the table data sync in single
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 2d7770d4dc1..a61240ceee7 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -453,6 +453,7 @@ apply_handle_commit(StringInfo s)
replorigin_session_origin_timestamp = commit_data.committime;
CommitTransactionCommand();
+ pgstat_report_stat(false);
store_flush_position(commit_data.end_lsn);
}
@@ -462,7 +463,6 @@ apply_handle_commit(StringInfo s)
/* Process any tables that are being synchronized in parallel. */
process_syncing_tables(commit_data.end_lsn);
- pgstat_report_stat(false);
pgstat_report_activity(STATE_IDLE, NULL);
}