diff options
Diffstat (limited to 'src/backend/replication/logical/logical.c')
-rw-r--r-- | src/backend/replication/logical/logical.c | 50 |
1 files changed, 50 insertions, 0 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 1a58dd76497..c3ec97a0a62 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -93,6 +93,11 @@ static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *tx static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change); +/* callback to update txn's progress */ +static void update_progress_txn_cb_wrapper(ReorderBuffer *cache, + ReorderBufferTXN *txn, + XLogRecPtr lsn); + static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, const char *plugin); /* @@ -278,6 +283,12 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder->commit_prepared = commit_prepared_cb_wrapper; ctx->reorder->rollback_prepared = rollback_prepared_cb_wrapper; + /* + * Callback to support updating progress during sending data of a + * transaction (and its subtransactions) to the output plugin. + */ + ctx->reorder->update_progress_txn = update_progress_txn_cb_wrapper; + ctx->out = makeStringInfo(); ctx->prepare_write = prepare_write; ctx->write = do_write; @@ -1584,6 +1595,45 @@ stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, error_context_stack = errcallback.previous; } +static void +update_progress_txn_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr lsn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "update_progress_txn"; + state.report_location = lsn; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = false; + ctx->write_xid = txn->xid; + + /* + * Report this change's lsn so replies from clients can give an up-to-date + * answer. This won't ever be enough (and shouldn't be!) to confirm + * receipt of this transaction, but it might allow another transaction's + * commit to be confirmed with one message. + */ + ctx->write_location = lsn; + + ctx->end_xact = false; + + OutputPluginUpdateProgress(ctx, false); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + /* * Set the required catalog xmin horizon for historic snapshots in the current * replication slot. |