aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/logical.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/logical.c')
-rw-r--r--src/backend/replication/logical/logical.c50
1 files changed, 50 insertions, 0 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 1a58dd76497..c3ec97a0a62 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -93,6 +93,11 @@ static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *tx
static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
int nrelations, Relation relations[], ReorderBufferChange *change);
+/* callback to update txn's progress */
+static void update_progress_txn_cb_wrapper(ReorderBuffer *cache,
+ ReorderBufferTXN *txn,
+ XLogRecPtr lsn);
+
static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, const char *plugin);
/*
@@ -278,6 +283,12 @@ StartupDecodingContext(List *output_plugin_options,
ctx->reorder->commit_prepared = commit_prepared_cb_wrapper;
ctx->reorder->rollback_prepared = rollback_prepared_cb_wrapper;
+ /*
+ * Callback to support updating progress during sending data of a
+ * transaction (and its subtransactions) to the output plugin.
+ */
+ ctx->reorder->update_progress_txn = update_progress_txn_cb_wrapper;
+
ctx->out = makeStringInfo();
ctx->prepare_write = prepare_write;
ctx->write = do_write;
@@ -1584,6 +1595,45 @@ stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
error_context_stack = errcallback.previous;
}
+static void
+update_progress_txn_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr lsn)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ Assert(!ctx->fast_forward);
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "update_progress_txn";
+ state.report_location = lsn;
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = false;
+ ctx->write_xid = txn->xid;
+
+ /*
+ * Report this change's lsn so replies from clients can give an up-to-date
+ * answer. This won't ever be enough (and shouldn't be!) to confirm
+ * receipt of this transaction, but it might allow another transaction's
+ * commit to be confirmed with one message.
+ */
+ ctx->write_location = lsn;
+
+ ctx->end_xact = false;
+
+ OutputPluginUpdateProgress(ctx, false);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
/*
* Set the required catalog xmin horizon for historic snapshots in the current
* replication slot.