diff options
author | Peter Eisentraut <peter_e@gmx.net> | 2017-04-27 14:57:26 -0400 |
---|---|---|
committer | Peter Eisentraut <peter_e@gmx.net> | 2017-04-28 13:47:46 -0400 |
commit | e3cf708016ca6045dc1cd5a0768cfecf17caf3d1 (patch) | |
tree | 17a1354e2fef5dea53672215fa45459ea7c839a0 /src/backend/replication/logical/tablesync.c | |
parent | d981074c24d2f1e4f44bc6d80e967e523ce64f50 (diff) | |
download | postgresql-e3cf708016ca6045dc1cd5a0768cfecf17caf3d1.tar.gz postgresql-e3cf708016ca6045dc1cd5a0768cfecf17caf3d1.zip |
Wait between tablesync worker restarts
Before restarting a tablesync worker for the same relation, wait
wal_retrieve_retry_interval (currently 5s by default). This avoids
restarting failing workers in a tight loop.
We keep the last start times in a hash table last_start_times that is
separate from the table_states list, because that list is cleared out on
syscache invalidation, which happens whenever a table finishes syncing.
The hash table is kept until all tables have finished syncing.
A future project might be to unify these two and keep everything in one
data structure, but for now this is a less invasive change to accomplish
the original purpose.
For the test suite, set wal_retrieve_retry_interval to its minimum
value, to not increase the test suite run time.
Reviewed-by: Petr Jelinek <petr.jelinek@2ndquadrant.com>
Reported-by: Masahiko Sawada <sawada.mshk@gmail.com>
Diffstat (limited to 'src/backend/replication/logical/tablesync.c')
-rw-r--r-- | src/backend/replication/logical/tablesync.c | 58 |
1 file changed, 52 insertions(+), 6 deletions(-)
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index e63d26b0bcf..0823000f001 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -245,7 +245,10 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) * * If there are tables that need synchronizing and are not being synchronized * yet, start sync workers for them (if there are free slots for sync - * workers). + * workers). To prevent starting the sync worker for the same relation at a + * high frequency after a failure, we store its last start time with each sync + * state info. We start the sync worker for the same relation after waiting + * at least wal_retrieve_retry_interval. * * For tables that are being synchronized already, check if sync workers * either need action from the apply worker or have finished. @@ -263,7 +266,13 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) static void process_syncing_tables_for_apply(XLogRecPtr current_lsn) { + struct tablesync_start_time_mapping + { + Oid relid; + TimestampTz last_start_time; + }; static List *table_states = NIL; + static HTAB *last_start_times = NULL; ListCell *lc; Assert(!IsTransactionState()); @@ -300,6 +309,31 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) table_states_valid = true; } + /* + * Prepare hash table for tracking last start times of workers, to avoid + * immediate restarts. We don't need it if there are no tables that need + * syncing. + */ + if (table_states && !last_start_times) + { + HASHCTL ctl; + + memset(&ctl, 0, sizeof(ctl)); + ctl.keysize = sizeof(Oid); + ctl.entrysize = sizeof(struct tablesync_start_time_mapping); + last_start_times = hash_create("Logical replication table sync worker start times", + 256, &ctl, HASH_ELEM | HASH_BLOBS); + } + /* + * Clean up the hash table when we're done with all tables (just to + * release the bit of memory). 
+ */ + else if (!table_states && last_start_times) + { + hash_destroy(last_start_times); + last_start_times = NULL; + } + /* Process all tables that are being synchronized. */ foreach(lc, table_states) { @@ -403,11 +437,23 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) */ else if (!syncworker && nsyncworkers < max_sync_workers_per_subscription) { - logicalrep_worker_launch(MyLogicalRepWorker->dbid, - MySubscription->oid, - MySubscription->name, - MyLogicalRepWorker->userid, - rstate->relid); + TimestampTz now = GetCurrentTimestamp(); + struct tablesync_start_time_mapping *hentry; + bool found; + + hentry = hash_search(last_start_times, &rstate->relid, HASH_ENTER, &found); + + if (!found || + TimestampDifferenceExceeds(hentry->last_start_time, now, + wal_retrieve_retry_interval)) + { + logicalrep_worker_launch(MyLogicalRepWorker->dbid, + MySubscription->oid, + MySubscription->name, + MyLogicalRepWorker->userid, + rstate->relid); + hentry->last_start_time = now; + } } } } |