aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/tablesync.c
diff options
context:
space:
mode:
authorPeter Eisentraut <peter_e@gmx.net>2017-04-27 14:57:26 -0400
committerPeter Eisentraut <peter_e@gmx.net>2017-04-28 13:47:46 -0400
commite3cf708016ca6045dc1cd5a0768cfecf17caf3d1 (patch)
tree17a1354e2fef5dea53672215fa45459ea7c839a0 /src/backend/replication/logical/tablesync.c
parentd981074c24d2f1e4f44bc6d80e967e523ce64f50 (diff)
downloadpostgresql-e3cf708016ca6045dc1cd5a0768cfecf17caf3d1.tar.gz
postgresql-e3cf708016ca6045dc1cd5a0768cfecf17caf3d1.zip
Wait between tablesync worker restarts
Before restarting a tablesync worker for the same relation, wait wal_retrieve_retry_interval (currently 5s by default). This avoids restarting failing workers in a tight loop. We keep the last start times in a hash table last_start_times that is separate from the table_states list, because that list is cleared out on syscache invalidation, which happens whenever a table finishes syncing. The hash table is kept until all tables have finished syncing. A future project might be to unify these two and keep everything in one data structure, but for now this is a less invasive change to accomplish the original purpose. For the test suite, set wal_retrieve_retry_interval to its minimum value, to not increase the test suite run time. Reviewed-by: Petr Jelinek <petr.jelinek@2ndquadrant.com> Reported-by: Masahiko Sawada <sawada.mshk@gmail.com>
Diffstat (limited to 'src/backend/replication/logical/tablesync.c')
-rw-r--r--src/backend/replication/logical/tablesync.c58
1 files changed, 52 insertions, 6 deletions
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index e63d26b0bcf..0823000f001 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -245,7 +245,10 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
*
* If there are tables that need synchronizing and are not being synchronized
* yet, start sync workers for them (if there are free slots for sync
- * workers).
+ * workers). To prevent starting the sync worker for the same relation at a
+ * high frequency after a failure, we store its last start time with each sync
+ * state info. We start the sync worker for the same relation after waiting
+ * at least wal_retrieve_retry_interval.
*
* For tables that are being synchronized already, check if sync workers
* either need action from the apply worker or have finished.
@@ -263,7 +266,13 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
static void
process_syncing_tables_for_apply(XLogRecPtr current_lsn)
{
+ struct tablesync_start_time_mapping
+ {
+ Oid relid;
+ TimestampTz last_start_time;
+ };
static List *table_states = NIL;
+ static HTAB *last_start_times = NULL;
ListCell *lc;
Assert(!IsTransactionState());
@@ -300,6 +309,31 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
table_states_valid = true;
}
+ /*
+ * Prepare hash table for tracking last start times of workers, to avoid
+ * immediate restarts. We don't need it if there are no tables that need
+ * syncing.
+ */
+ if (table_states && !last_start_times)
+ {
+ HASHCTL ctl;
+
+ memset(&ctl, 0, sizeof(ctl));
+ ctl.keysize = sizeof(Oid);
+ ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
+ last_start_times = hash_create("Logical replication table sync worker start times",
+ 256, &ctl, HASH_ELEM | HASH_BLOBS);
+ }
+ /*
+ * Clean up the hash table when we're done with all tables (just to
+ * release the bit of memory).
+ */
+ else if (!table_states && last_start_times)
+ {
+ hash_destroy(last_start_times);
+ last_start_times = NULL;
+ }
+
/* Process all tables that are being synchronized. */
foreach(lc, table_states)
{
@@ -403,11 +437,23 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
*/
else if (!syncworker && nsyncworkers < max_sync_workers_per_subscription)
{
- logicalrep_worker_launch(MyLogicalRepWorker->dbid,
- MySubscription->oid,
- MySubscription->name,
- MyLogicalRepWorker->userid,
- rstate->relid);
+ TimestampTz now = GetCurrentTimestamp();
+ struct tablesync_start_time_mapping *hentry;
+ bool found;
+
+ hentry = hash_search(last_start_times, &rstate->relid, HASH_ENTER, &found);
+
+ if (!found ||
+ TimestampDifferenceExceeds(hentry->last_start_time, now,
+ wal_retrieve_retry_interval))
+ {
+ logicalrep_worker_launch(MyLogicalRepWorker->dbid,
+ MySubscription->oid,
+ MySubscription->name,
+ MyLogicalRepWorker->userid,
+ rstate->relid);
+ hentry->last_start_time = now;
+ }
}
}
}