aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/logical.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/logical.c')
-rw-r--r--src/backend/replication/logical/logical.c26
1 files changed, 19 insertions, 7 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 6e5bc12e779..424fe86a1b6 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -211,11 +211,15 @@ StartupDecodingContext(List *output_plugin_options,
/*
* Create a new decoding context, for a new logical slot.
*
- * plugin contains the name of the output plugin
- * output_plugin_options contains options passed to the output plugin
- * read_page, prepare_write, do_write, update_progress
- * callbacks that have to be filled to perform the use-case dependent,
- * actual, work.
+ * plugin -- contains the name of the output plugin
+ * output_plugin_options -- contains options passed to the output plugin
+ * restart_lsn -- if given as invalid, it's this routine's responsibility to
+ * mark WAL as reserved by setting a convenient restart_lsn for the slot.
+ * Otherwise, we set for decoding to start from the given LSN without
+ * marking WAL reserved beforehand. In that scenario, it's up to the
+ * caller to guarantee that WAL remains available.
+ * read_page, prepare_write, do_write, update_progress --
+ * callbacks that perform the use-case dependent, actual, work.
*
* Needs to be called while in a memory context that's at least as long lived
* as the decoding context because further memory contexts will be created
@@ -228,6 +232,7 @@ LogicalDecodingContext *
CreateInitDecodingContext(char *plugin,
List *output_plugin_options,
bool need_full_snapshot,
+ XLogRecPtr restart_lsn,
XLogPageReadCB read_page,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
@@ -271,7 +276,14 @@ CreateInitDecodingContext(char *plugin,
StrNCpy(NameStr(slot->data.plugin), plugin, NAMEDATALEN);
SpinLockRelease(&slot->mutex);
- ReplicationSlotReserveWal();
+ if (XLogRecPtrIsInvalid(restart_lsn))
+ ReplicationSlotReserveWal();
+ else
+ {
+ SpinLockAcquire(&slot->mutex);
+ slot->data.restart_lsn = restart_lsn;
+ SpinLockRelease(&slot->mutex);
+ }
/* ----
* This is a bit tricky: We need to determine a safe xmin horizon to start
@@ -316,7 +328,7 @@ CreateInitDecodingContext(char *plugin,
ReplicationSlotMarkDirty();
ReplicationSlotSave();
- ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
+ ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon,
need_full_snapshot, false,
read_page, prepare_write, do_write,
update_progress);