diff options
Diffstat (limited to 'src/backend/replication/logical/logical.c')
-rw-r--r-- | src/backend/replication/logical/logical.c | 26 |
1 files changed, 19 insertions, 7 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 6e5bc12e779..424fe86a1b6 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -211,11 +211,15 @@ StartupDecodingContext(List *output_plugin_options, /* * Create a new decoding context, for a new logical slot. * - * plugin contains the name of the output plugin - * output_plugin_options contains options passed to the output plugin - * read_page, prepare_write, do_write, update_progress - * callbacks that have to be filled to perform the use-case dependent, - * actual, work. + * plugin -- contains the name of the output plugin + * output_plugin_options -- contains options passed to the output plugin + * restart_lsn -- if given as invalid, it's this routine's responsibility to + * mark WAL as reserved by setting a convenient restart_lsn for the slot. + * Otherwise, we set for decoding to start from the given LSN without + * marking WAL reserved beforehand. In that scenario, it's up to the + * caller to guarantee that WAL remains available. + * read_page, prepare_write, do_write, update_progress -- + * callbacks that perform the use-case dependent, actual, work. * * Needs to be called while in a memory context that's at least as long lived * as the decoding context because further memory contexts will be created @@ -228,6 +232,7 @@ LogicalDecodingContext * CreateInitDecodingContext(char *plugin, List *output_plugin_options, bool need_full_snapshot, + XLogRecPtr restart_lsn, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, @@ -271,7 +276,14 @@ CreateInitDecodingContext(char *plugin, StrNCpy(NameStr(slot->data.plugin), plugin, NAMEDATALEN); SpinLockRelease(&slot->mutex); - ReplicationSlotReserveWal(); + if (XLogRecPtrIsInvalid(restart_lsn)) + ReplicationSlotReserveWal(); + else + { + SpinLockAcquire(&slot->mutex); + slot->data.restart_lsn = restart_lsn; + SpinLockRelease(&slot->mutex); + } /* ---- * This is a bit tricky: We need to determine a safe xmin horizon to start @@ -316,7 +328,7 @@ CreateInitDecodingContext(char *plugin, ReplicationSlotMarkDirty(); ReplicationSlotSave(); - ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon, + ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon, need_full_snapshot, false, read_page, prepare_write, do_write, update_progress); |