aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical')
-rw-r--r--src/backend/replication/logical/logical.c34
-rw-r--r--src/backend/replication/logical/logicalfuncs.c2
2 files changed, 27 insertions, 9 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index ab963c53456..7409e5ce3de 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -117,7 +117,8 @@ StartupDecodingContext(List *output_plugin_options,
bool need_full_snapshot,
XLogPageReadCB read_page,
LogicalOutputPluginWriterPrepareWrite prepare_write,
- LogicalOutputPluginWriterWrite do_write)
+ LogicalOutputPluginWriterWrite do_write,
+ LogicalOutputPluginWriterUpdateProgress update_progress)
{
ReplicationSlot *slot;
MemoryContext context,
@@ -186,6 +187,7 @@ StartupDecodingContext(List *output_plugin_options,
ctx->out = makeStringInfo();
ctx->prepare_write = prepare_write;
ctx->write = do_write;
+ ctx->update_progress = update_progress;
ctx->output_plugin_options = output_plugin_options;
@@ -199,8 +201,9 @@ StartupDecodingContext(List *output_plugin_options,
*
* plugin contains the name of the output plugin
* output_plugin_options contains options passed to the output plugin
- * read_page, prepare_write, do_write are callbacks that have to be filled to
- * perform the use-case dependent, actual, work.
+ * read_page, prepare_write, do_write, update_progress
+ * callbacks that have to be filled to perform the use-case dependent,
+ * actual, work.
*
* Needs to be called while in a memory context that's at least as long lived
* as the decoding context because further memory contexts will be created
@@ -215,7 +218,8 @@ CreateInitDecodingContext(char *plugin,
bool need_full_snapshot,
XLogPageReadCB read_page,
LogicalOutputPluginWriterPrepareWrite prepare_write,
- LogicalOutputPluginWriterWrite do_write)
+ LogicalOutputPluginWriterWrite do_write,
+ LogicalOutputPluginWriterUpdateProgress update_progress)
{
TransactionId xmin_horizon = InvalidTransactionId;
ReplicationSlot *slot;
@@ -300,7 +304,7 @@ CreateInitDecodingContext(char *plugin,
ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
need_full_snapshot, read_page, prepare_write,
- do_write);
+ do_write, update_progress);
/* call output plugin initialization callback */
old_context = MemoryContextSwitchTo(ctx->context);
@@ -324,7 +328,7 @@ CreateInitDecodingContext(char *plugin,
* output_plugin_options
* contains options passed to the output plugin.
*
- * read_page, prepare_write, do_write
+ * read_page, prepare_write, do_write, update_progress
* callbacks that have to be filled to perform the use-case dependent,
* actual work.
*
@@ -340,7 +344,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
List *output_plugin_options,
XLogPageReadCB read_page,
LogicalOutputPluginWriterPrepareWrite prepare_write,
- LogicalOutputPluginWriterWrite do_write)
+ LogicalOutputPluginWriterWrite do_write,
+ LogicalOutputPluginWriterUpdateProgress update_progress)
{
LogicalDecodingContext *ctx;
ReplicationSlot *slot;
@@ -390,7 +395,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
ctx = StartupDecodingContext(output_plugin_options,
start_lsn, InvalidTransactionId, false,
- read_page, prepare_write, do_write);
+ read_page, prepare_write, do_write,
+ update_progress);
/* call output plugin initialization callback */
old_context = MemoryContextSwitchTo(ctx->context);
@@ -504,6 +510,18 @@ OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
}
/*
+ * Update progress tracking (if supported).
+ */
+void
+OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx)
+{
+ if (!ctx->update_progress)
+ return;
+
+ ctx->update_progress(ctx, ctx->write_location, ctx->write_xid);
+}
+
+/*
* Load the output plugin, lookup its output plugin init function, and check
* that it provides the required callbacks.
*/
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index c251b92f57b..27164de093d 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -253,7 +253,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
options,
logical_read_local_xlog_page,
LogicalOutputPrepareWrite,
- LogicalOutputWrite);
+ LogicalOutputWrite, NULL);
MemoryContextSwitchTo(oldcontext);