aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/reorderbuffer.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/reorderbuffer.c')
-rw-r--r--src/backend/replication/logical/reorderbuffer.c20
1 files changed, 20 insertions, 0 deletions
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 0468d12936f..d5f90a5f5d2 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2100,6 +2100,8 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
PG_TRY();
{
ReorderBufferChange *change;
+ int changes_count = 0; /* used to accumulate the number of
+ * changes */
if (using_subtxn)
BeginInternalSubTransaction(streaming ? "stream" : "replay");
@@ -2440,6 +2442,24 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
elog(ERROR, "tuplecid value in changequeue");
break;
}
+
+ /*
+ * It is possible that the data is not sent to downstream for a
+ * long time either because the output plugin filtered it or there
+ * is a DDL that generates a lot of data that is not processed by
+ * the plugin. So, in such cases, the downstream can timeout. To
+ * avoid that we try to send a keepalive message if required.
+ * Trying to send a keepalive message after every change has some
+ * overhead, but testing showed there is no noticeable overhead if
+ * we do it after every ~100 changes.
+ */
+#define CHANGES_THRESHOLD 100
+
+ if (++changes_count >= CHANGES_THRESHOLD)
+ {
+ rb->update_progress_txn(rb, txn, change->lsn);
+ changes_count = 0;
+ }
}
/* speculative insertion record must be freed by now */