aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/logical.c
diff options
context:
space:
mode:
authorSimon Riggs <simon@2ndQuadrant.com>2017-05-12 10:50:56 +0100
committerSimon Riggs <simon@2ndQuadrant.com>2017-05-12 10:50:56 +0100
commit024711bb544645c8b1061e9f02b261e2e336981d (patch)
tree8c0a7b85c0260c260ddd6cb89cd4ee578b56ab12 /src/backend/replication/logical/logical.c
parentefa2c18f4e8a8ccc74d9005d960f4c1a2bf05ea9 (diff)
downloadpostgresql-024711bb544645c8b1061e9f02b261e2e336981d.tar.gz
postgresql-024711bb544645c8b1061e9f02b261e2e336981d.zip
Lag tracking for logical replication
Lag tracking is called for each commit, but we introduce a pacing delay to ensure we don't swamp the lag tracker. Author: Petr Jelinek, with minor pacing delay code from me
Diffstat (limited to 'src/backend/replication/logical/logical.c')
-rw-r--r--src/backend/replication/logical/logical.c34
1 files changed, 26 insertions, 8 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index ab963c53456..7409e5ce3de 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -117,7 +117,8 @@ StartupDecodingContext(List *output_plugin_options,
bool need_full_snapshot,
XLogPageReadCB read_page,
LogicalOutputPluginWriterPrepareWrite prepare_write,
- LogicalOutputPluginWriterWrite do_write)
+ LogicalOutputPluginWriterWrite do_write,
+ LogicalOutputPluginWriterUpdateProgress update_progress)
{
ReplicationSlot *slot;
MemoryContext context,
@@ -186,6 +187,7 @@ StartupDecodingContext(List *output_plugin_options,
ctx->out = makeStringInfo();
ctx->prepare_write = prepare_write;
ctx->write = do_write;
+ ctx->update_progress = update_progress;
ctx->output_plugin_options = output_plugin_options;
@@ -199,8 +201,9 @@ StartupDecodingContext(List *output_plugin_options,
*
* plugin contains the name of the output plugin
* output_plugin_options contains options passed to the output plugin
- * read_page, prepare_write, do_write are callbacks that have to be filled to
- * perform the use-case dependent, actual, work.
+ * read_page, prepare_write, do_write, update_progress
+ * callbacks that have to be filled to perform the use-case dependent,
+ * actual, work.
*
* Needs to be called while in a memory context that's at least as long lived
* as the decoding context because further memory contexts will be created
@@ -215,7 +218,8 @@ CreateInitDecodingContext(char *plugin,
bool need_full_snapshot,
XLogPageReadCB read_page,
LogicalOutputPluginWriterPrepareWrite prepare_write,
- LogicalOutputPluginWriterWrite do_write)
+ LogicalOutputPluginWriterWrite do_write,
+ LogicalOutputPluginWriterUpdateProgress update_progress)
{
TransactionId xmin_horizon = InvalidTransactionId;
ReplicationSlot *slot;
@@ -300,7 +304,7 @@ CreateInitDecodingContext(char *plugin,
ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
need_full_snapshot, read_page, prepare_write,
- do_write);
+ do_write, update_progress);
/* call output plugin initialization callback */
old_context = MemoryContextSwitchTo(ctx->context);
@@ -324,7 +328,7 @@ CreateInitDecodingContext(char *plugin,
* output_plugin_options
* contains options passed to the output plugin.
*
- * read_page, prepare_write, do_write
+ * read_page, prepare_write, do_write, update_progress
* callbacks that have to be filled to perform the use-case dependent,
* actual work.
*
@@ -340,7 +344,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
List *output_plugin_options,
XLogPageReadCB read_page,
LogicalOutputPluginWriterPrepareWrite prepare_write,
- LogicalOutputPluginWriterWrite do_write)
+ LogicalOutputPluginWriterWrite do_write,
+ LogicalOutputPluginWriterUpdateProgress update_progress)
{
LogicalDecodingContext *ctx;
ReplicationSlot *slot;
@@ -390,7 +395,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
ctx = StartupDecodingContext(output_plugin_options,
start_lsn, InvalidTransactionId, false,
- read_page, prepare_write, do_write);
+ read_page, prepare_write, do_write,
+ update_progress);
/* call output plugin initialization callback */
old_context = MemoryContextSwitchTo(ctx->context);
@@ -504,6 +510,18 @@ OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
}
/*
+ * Update progress tracking (if supported).
+ */
+void
+OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx)
+{
+ if (!ctx->update_progress)
+ return;
+
+ ctx->update_progress(ctx, ctx->write_location, ctx->write_xid);
+}
+
+/*
* Load the output plugin, lookup its output plugin init function, and check
* that it provides the required callbacks.
*/