diff options
Diffstat (limited to 'src/backend/replication/logical/reorderbuffer.c')
-rw-r--r-- | src/backend/replication/logical/reorderbuffer.c | 20 |
1 files changed, 20 insertions, 0 deletions
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 0468d12936f..d5f90a5f5d2 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -2100,6 +2100,8 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, PG_TRY(); { ReorderBufferChange *change; + int changes_count = 0; /* used to accumulate the number of + * changes */ if (using_subtxn) BeginInternalSubTransaction(streaming ? "stream" : "replay"); @@ -2440,6 +2442,24 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, elog(ERROR, "tuplecid value in changequeue"); break; } + + /* + * It is possible that the data is not sent to downstream for a + * long time either because the output plugin filtered it or there + * is a DDL that generates a lot of data that is not processed by + * the plugin. So, in such cases, the downstream can timeout. To + * avoid that we try to send a keepalive message if required. + * Trying to send a keepalive message after every change has some + * overhead, but testing showed there is no noticeable overhead if + * we do it after every ~100 changes. + */ +#define CHANGES_THRESHOLD 100 + + if (++changes_count >= CHANGES_THRESHOLD) + { + rb->update_progress_txn(rb, txn, change->lsn); + changes_count = 0; + } } /* speculative insertion record must be freed by now */ |