diff options
author | Simon Riggs <simon@2ndQuadrant.com> | 2017-05-12 10:50:56 +0100 |
---|---|---|
committer | Simon Riggs <simon@2ndQuadrant.com> | 2017-05-12 10:50:56 +0100 |
commit | 024711bb544645c8b1061e9f02b261e2e336981d (patch) | |
tree | 8c0a7b85c0260c260ddd6cb89cd4ee578b56ab12 /src/backend/replication/logical/logical.c | |
parent | efa2c18f4e8a8ccc74d9005d960f4c1a2bf05ea9 (diff) | |
download | postgresql-024711bb544645c8b1061e9f02b261e2e336981d.tar.gz postgresql-024711bb544645c8b1061e9f02b261e2e336981d.zip |
Lag tracking for logical replication
Lag tracking is called for each commit, but we introduce
a pacing delay to ensure we don't swamp the lag tracker.
Author: Petr Jelinek, with minor pacing delay code from me
Diffstat (limited to 'src/backend/replication/logical/logical.c')
-rw-r--r-- | src/backend/replication/logical/logical.c | 34 |
1 files changed, 26 insertions, 8 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index ab963c53456..7409e5ce3de 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -117,7 +117,8 @@ StartupDecodingContext(List *output_plugin_options, bool need_full_snapshot, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, - LogicalOutputPluginWriterWrite do_write) + LogicalOutputPluginWriterWrite do_write, + LogicalOutputPluginWriterUpdateProgress update_progress) { ReplicationSlot *slot; MemoryContext context, @@ -186,6 +187,7 @@ StartupDecodingContext(List *output_plugin_options, ctx->out = makeStringInfo(); ctx->prepare_write = prepare_write; ctx->write = do_write; + ctx->update_progress = update_progress; ctx->output_plugin_options = output_plugin_options; @@ -199,8 +201,9 @@ StartupDecodingContext(List *output_plugin_options, * * plugin contains the name of the output plugin * output_plugin_options contains options passed to the output plugin - * read_page, prepare_write, do_write are callbacks that have to be filled to - * perform the use-case dependent, actual, work. + * read_page, prepare_write, do_write, update_progress + * callbacks that have to be filled to perform the use-case dependent, + * actual, work. * * Needs to be called while in a memory context that's at least as long lived * as the decoding context because further memory contexts will be created @@ -215,7 +218,8 @@ CreateInitDecodingContext(char *plugin, bool need_full_snapshot, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, - LogicalOutputPluginWriterWrite do_write) + LogicalOutputPluginWriterWrite do_write, + LogicalOutputPluginWriterUpdateProgress update_progress) { TransactionId xmin_horizon = InvalidTransactionId; ReplicationSlot *slot; @@ -300,7 +304,7 @@ CreateInitDecodingContext(char *plugin, ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon, need_full_snapshot, read_page, prepare_write, - do_write); + do_write, update_progress); /* call output plugin initialization callback */ old_context = MemoryContextSwitchTo(ctx->context); @@ -324,7 +328,7 @@ CreateInitDecodingContext(char *plugin, * output_plugin_options * contains options passed to the output plugin. * - * read_page, prepare_write, do_write + * read_page, prepare_write, do_write, update_progress * callbacks that have to be filled to perform the use-case dependent, * actual work. * @@ -340,7 +344,8 @@ CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, - LogicalOutputPluginWriterWrite do_write) + LogicalOutputPluginWriterWrite do_write, + LogicalOutputPluginWriterUpdateProgress update_progress) { LogicalDecodingContext *ctx; ReplicationSlot *slot; @@ -390,7 +395,8 @@ CreateDecodingContext(XLogRecPtr start_lsn, ctx = StartupDecodingContext(output_plugin_options, start_lsn, InvalidTransactionId, false, - read_page, prepare_write, do_write); + read_page, prepare_write, do_write, + update_progress); /* call output plugin initialization callback */ old_context = MemoryContextSwitchTo(ctx->context); @@ -504,6 +510,18 @@ OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write) } /* + * Update progress tracking (if supported). + */ +void +OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx) +{ + if (!ctx->update_progress) + return; + + ctx->update_progress(ctx, ctx->write_location, ctx->write_xid); +} + +/* * Load the output plugin, lookup its output plugin init function, and check * that it provides the required callbacks. */ |