diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/replication/logical/logical.c | 26 | ||||
-rw-r--r-- | src/backend/replication/slotfuncs.c | 352 | ||||
-rw-r--r-- | src/backend/replication/walsender.c | 1 | ||||
-rw-r--r-- | src/include/catalog/catversion.h | 2 | ||||
-rw-r--r-- | src/include/catalog/pg_proc.dat | 35 | ||||
-rw-r--r-- | src/include/replication/logical.h | 1 |
6 files changed, 367 insertions, 50 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 6e5bc12e779..424fe86a1b6 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -211,11 +211,15 @@ StartupDecodingContext(List *output_plugin_options, /* * Create a new decoding context, for a new logical slot. * - * plugin contains the name of the output plugin - * output_plugin_options contains options passed to the output plugin - * read_page, prepare_write, do_write, update_progress - * callbacks that have to be filled to perform the use-case dependent, - * actual, work. + * plugin -- contains the name of the output plugin + * output_plugin_options -- contains options passed to the output plugin + * restart_lsn -- if given as invalid, it's this routine's responsibility to + * mark WAL as reserved by setting a convenient restart_lsn for the slot. + * Otherwise, we set for decoding to start from the given LSN without + * marking WAL reserved beforehand. In that scenario, it's up to the + * caller to guarantee that WAL remains available. + * read_page, prepare_write, do_write, update_progress -- + * callbacks that perform the use-case dependent, actual, work. * * Needs to be called while in a memory context that's at least as long lived * as the decoding context because further memory contexts will be created @@ -228,6 +232,7 @@ LogicalDecodingContext * CreateInitDecodingContext(char *plugin, List *output_plugin_options, bool need_full_snapshot, + XLogRecPtr restart_lsn, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, @@ -271,7 +276,14 @@ CreateInitDecodingContext(char *plugin, StrNCpy(NameStr(slot->data.plugin), plugin, NAMEDATALEN); SpinLockRelease(&slot->mutex); - ReplicationSlotReserveWal(); + if (XLogRecPtrIsInvalid(restart_lsn)) + ReplicationSlotReserveWal(); + else + { + SpinLockAcquire(&slot->mutex); + slot->data.restart_lsn = restart_lsn; + SpinLockRelease(&slot->mutex); + } /* ---- * This is a bit tricky: We need to determine a safe xmin horizon to start @@ -316,7 +328,7 @@ CreateInitDecodingContext(char *plugin, ReplicationSlotMarkDirty(); ReplicationSlotSave(); - ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon, + ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon, need_full_snapshot, false, read_page, prepare_write, do_write, update_progress); diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 224dd920c8d..d7c53c54bdb 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -10,13 +10,12 @@ * *------------------------------------------------------------------------- */ - #include "postgres.h" +#include "access/htup_details.h" +#include "access/xlog_internal.h" #include "funcapi.h" #include "miscadmin.h" - -#include "access/htup_details.h" #include "replication/decode.h" #include "replication/slot.h" #include "replication/logical.h" @@ -36,6 +35,38 @@ check_permissions(void) } /* + * Helper function for creating a new physical replication slot with + * given arguments. Note that this function doesn't release the created + * slot. + * + * If restart_lsn is a valid value, we use it without WAL reservation + * routine. So the caller must guarantee that WAL is available. + */ +static void +create_physical_replication_slot(char *name, bool immediately_reserve, + bool temporary, XLogRecPtr restart_lsn) +{ + Assert(!MyReplicationSlot); + + /* acquire replication slot, this will check for conflicting names */ + ReplicationSlotCreate(name, false, + temporary ? RS_TEMPORARY : RS_PERSISTENT); + + if (immediately_reserve) + { + /* Reserve WAL as the user asked for it */ + if (XLogRecPtrIsInvalid(restart_lsn)) + ReplicationSlotReserveWal(); + else + MyReplicationSlot->data.restart_lsn = restart_lsn; + + /* Write this slot to disk */ + ReplicationSlotMarkDirty(); + ReplicationSlotSave(); + } +} + +/* * SQL function for creating a new physical (streaming replication) * replication slot. */ @@ -51,8 +82,6 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS) HeapTuple tuple; Datum result; - Assert(!MyReplicationSlot); - if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) elog(ERROR, "return type must be a row type"); @@ -60,29 +89,21 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS) CheckSlotRequirements(); - /* acquire replication slot, this will check for conflicting names */ - ReplicationSlotCreate(NameStr(*name), false, - temporary ? RS_TEMPORARY : RS_PERSISTENT); + create_physical_replication_slot(NameStr(*name), + immediately_reserve, + temporary, + InvalidXLogRecPtr); values[0] = NameGetDatum(&MyReplicationSlot->data.name); nulls[0] = false; if (immediately_reserve) { - /* Reserve WAL as the user asked for it */ - ReplicationSlotReserveWal(); - - /* Write this slot to disk */ - ReplicationSlotMarkDirty(); - ReplicationSlotSave(); - values[1] = LSNGetDatum(MyReplicationSlot->data.restart_lsn); nulls[1] = false; } else - { nulls[1] = true; - } tuple = heap_form_tuple(tupdesc, values, nulls); result = HeapTupleGetDatum(tuple); @@ -94,32 +115,18 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS) /* - * SQL function for creating a new logical replication slot. + * Helper function for creating a new logical replication slot with + * given arguments. Note that this function doesn't release the created + * slot. */ -Datum -pg_create_logical_replication_slot(PG_FUNCTION_ARGS) +static void +create_logical_replication_slot(char *name, char *plugin, + bool temporary, XLogRecPtr restart_lsn) { - Name name = PG_GETARG_NAME(0); - Name plugin = PG_GETARG_NAME(1); - bool temporary = PG_GETARG_BOOL(2); - LogicalDecodingContext *ctx = NULL; - TupleDesc tupdesc; - HeapTuple tuple; - Datum result; - Datum values[2]; - bool nulls[2]; - Assert(!MyReplicationSlot); - if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) - elog(ERROR, "return type must be a row type"); - - check_permissions(); - - CheckLogicalDecodingRequirements(); - /* * Acquire a logical decoding slot, this will check for conflicting names. * Initially create persistent slot as ephemeral - that allows us to @@ -128,25 +135,54 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) * slots can be created as temporary from beginning as they get dropped on * error as well. */ - ReplicationSlotCreate(NameStr(*name), true, + ReplicationSlotCreate(name, true, temporary ? RS_TEMPORARY : RS_EPHEMERAL); /* * Create logical decoding context, to build the initial snapshot. */ - ctx = CreateInitDecodingContext(NameStr(*plugin), NIL, + ctx = CreateInitDecodingContext(plugin, NIL, false, /* do not build snapshot */ + restart_lsn, logical_read_local_xlog_page, NULL, NULL, NULL); /* build initial snapshot, might take a while */ DecodingContextFindStartpoint(ctx); - values[0] = NameGetDatum(&MyReplicationSlot->data.name); - values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush); - /* don't need the decoding context anymore */ FreeDecodingContext(ctx); +} + +/* + * SQL function for creating a new logical replication slot. + */ +Datum +pg_create_logical_replication_slot(PG_FUNCTION_ARGS) +{ + Name name = PG_GETARG_NAME(0); + Name plugin = PG_GETARG_NAME(1); + bool temporary = PG_GETARG_BOOL(2); + Datum result; + TupleDesc tupdesc; + HeapTuple tuple; + Datum values[2]; + bool nulls[2]; + + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "return type must be a row type"); + + check_permissions(); + + CheckLogicalDecodingRequirements(); + + create_logical_replication_slot(NameStr(*name), + NameStr(*plugin), + temporary, + InvalidXLogRecPtr); + + values[0] = NameGetDatum(&MyReplicationSlot->data.name); + values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush); memset(nulls, 0, sizeof(nulls)); @@ -558,3 +594,235 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS) PG_RETURN_DATUM(result); } + +/* + * Helper function of copying a replication slot. + */ +static Datum +copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) +{ + Name src_name = PG_GETARG_NAME(0); + Name dst_name = PG_GETARG_NAME(1); + ReplicationSlot *src = NULL; + XLogRecPtr src_restart_lsn; + bool src_islogical; + bool temporary; + char *plugin; + Datum values[2]; + bool nulls[2]; + Datum result; + TupleDesc tupdesc; + HeapTuple tuple; + + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "return type must be a row type"); + + check_permissions(); + + if (logical_slot) + CheckLogicalDecodingRequirements(); + else + CheckSlotRequirements(); + + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + + /* + * We need to prevent the source slot's reserved WAL from being removed, + * but we don't want to lock that slot for very long, and it can advance + * in the meantime. So obtain the source slot's data, and create a new + * slot using its restart_lsn. Afterwards we lock the source slot again + * and verify that the data we copied (name, type) has not changed + * incompatibly. No inconvenient WAL removal can occur once the new slot + * is created -- but since WAL removal could have occurred before we + * managed to create the new slot, we advance the new slot's restart_lsn + * to the source slot's updated restart_lsn the second time we lock it. + */ + for (int i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + + if (s->in_use && strcmp(NameStr(s->data.name), NameStr(*src_name)) == 0) + { + SpinLockAcquire(&s->mutex); + src_islogical = SlotIsLogical(s); + src_restart_lsn = s->data.restart_lsn; + temporary = s->data.persistency == RS_TEMPORARY; + plugin = logical_slot ? pstrdup(NameStr(s->data.plugin)) : NULL; + SpinLockRelease(&s->mutex); + + src = s; + break; + } + } + + LWLockRelease(ReplicationSlotControlLock); + + if (src == NULL) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("replication slot \"%s\" does not exist", NameStr(*src_name)))); + + /* Check type of replication slot */ + if (src_islogical != logical_slot) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + src_islogical ? + errmsg("cannot copy physical replication slot \"%s\" as a logical replication slot", + NameStr(*src_name)) : + errmsg("cannot copy logical replication slot \"%s\" as a physical replication slot", + NameStr(*src_name)))); + + /* Copying non-reserved slot doesn't make sense */ + if (XLogRecPtrIsInvalid(src_restart_lsn)) + { + Assert(!logical_slot); + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + (errmsg("cannot copy a replication slot that doesn't reserve WAL")))); + } + + /* Overwrite params from optional arguments */ + if (PG_NARGS() >= 3) + temporary = PG_GETARG_BOOL(2); + if (PG_NARGS() >= 4) + { + Assert(logical_slot); + plugin = NameStr(*(PG_GETARG_NAME(3))); + } + + /* Create new slot and acquire it */ + if (logical_slot) + create_logical_replication_slot(NameStr(*dst_name), + plugin, + temporary, + src_restart_lsn); + else + create_physical_replication_slot(NameStr(*dst_name), + true, + temporary, + src_restart_lsn); + + /* + * Update the destination slot to current values of the source slot; + * recheck that the source slot is still the one we saw previously. + */ + { + TransactionId copy_effective_xmin; + TransactionId copy_effective_catalog_xmin; + TransactionId copy_xmin; + TransactionId copy_catalog_xmin; + XLogRecPtr copy_restart_lsn; + bool copy_islogical; + char *copy_name; + + /* Copy data of source slot again */ + SpinLockAcquire(&src->mutex); + copy_effective_xmin = src->effective_xmin; + copy_effective_catalog_xmin = src->effective_catalog_xmin; + + copy_xmin = src->data.xmin; + copy_catalog_xmin = src->data.catalog_xmin; + copy_restart_lsn = src->data.restart_lsn; + + /* for existence check */ + copy_name = pstrdup(NameStr(src->data.name)); + copy_islogical = SlotIsLogical(src); + SpinLockRelease(&src->mutex); + + /* + * Check if the source slot still exists and is valid. We regards it + * as invalid if the type of replication slot or name has been + * changed, or the restart_lsn either is invalid or has gone backward. + * (The restart_lsn could go backwards if the source slot is dropped + * and copied from an older slot during installation.) + * + * Since erroring out will release and drop the destination slot we + * don't need to release it here. + */ + if (copy_restart_lsn < src_restart_lsn || + src_islogical != copy_islogical || + strcmp(copy_name, NameStr(*src_name)) != 0) + ereport(ERROR, + (errmsg("could not copy replication slot \"%s\"", + NameStr(*src_name)), + errdetail("The source replication slot was modified incompatibly during the copy operation."))); + + /* Install copied values again */ + SpinLockAcquire(&MyReplicationSlot->mutex); + MyReplicationSlot->effective_xmin = copy_effective_xmin; + MyReplicationSlot->effective_catalog_xmin = copy_effective_catalog_xmin; + + MyReplicationSlot->data.xmin = copy_xmin; + MyReplicationSlot->data.catalog_xmin = copy_catalog_xmin; + MyReplicationSlot->data.restart_lsn = copy_restart_lsn; + SpinLockRelease(&MyReplicationSlot->mutex); + + ReplicationSlotMarkDirty(); + ReplicationSlotsComputeRequiredXmin(false); + ReplicationSlotsComputeRequiredLSN(); + ReplicationSlotSave(); + +#ifdef USE_ASSERT_CHECKING + /* Check that the restart_lsn is available */ + { + XLogSegNo segno; + + XLByteToSeg(copy_restart_lsn, segno, wal_segment_size); + Assert(XLogGetLastRemovedSegno() < segno); + } +#endif + } + + /* target slot fully created, mark as persistent if needed */ + if (logical_slot && !temporary) + ReplicationSlotPersist(); + + /* All done. Set up the return values */ + values[0] = NameGetDatum(dst_name); + nulls[0] = false; + if (!XLogRecPtrIsInvalid(MyReplicationSlot->data.confirmed_flush)) + { + values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush); + nulls[1] = false; + } + else + nulls[1] = true; + + tuple = heap_form_tuple(tupdesc, values, nulls); + result = HeapTupleGetDatum(tuple); + + ReplicationSlotRelease(); + + PG_RETURN_DATUM(result); +} + +/* The wrappers below are all to appease opr_sanity */ +Datum +pg_copy_logical_replication_slot_a(PG_FUNCTION_ARGS) +{ + return copy_replication_slot(fcinfo, true); +} + +Datum +pg_copy_logical_replication_slot_b(PG_FUNCTION_ARGS) +{ + return copy_replication_slot(fcinfo, true); +} + +Datum +pg_copy_logical_replication_slot_c(PG_FUNCTION_ARGS) +{ + return copy_replication_slot(fcinfo, true); +} + +Datum +pg_copy_physical_replication_slot_a(PG_FUNCTION_ARGS) +{ + return copy_replication_slot(fcinfo, false); +} + +Datum +pg_copy_physical_replication_slot_b(PG_FUNCTION_ARGS) +{ + return copy_replication_slot(fcinfo, false); +} diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 21f5c868f18..aae6adc15c1 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -934,6 +934,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) } ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot, + InvalidXLogRecPtr, logical_read_xlog_page, WalSndPrepareWrite, WalSndWriteData, WalSndUpdateProgress); diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index 1fe54c26653..bfd2bfc186c 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -53,6 +53,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 201904031 +#define CATALOG_VERSION_NO 201904051 #endif diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index fb257c17c89..ad4519e0011 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -9774,6 +9774,20 @@ proargmodes => '{i,i,i,o,o}', proargnames => '{slot_name,immediately_reserve,temporary,slot_name,lsn}', prosrc => 'pg_create_physical_replication_slot' }, +{ oid => '4220', descr => 'copy a physical replication slot, changing temporality', + proname => 'pg_copy_physical_replication_slot', provolatile => 'v', + proparallel => 'u', prorettype => 'record', proargtypes => 'name name bool', + proallargtypes => '{name,name,bool,name,pg_lsn}', + proargmodes => '{i,i,i,o,o}', + proargnames => '{src_slot_name,dst_slot_name,temporary,slot_name,lsn}', + prosrc => 'pg_copy_physical_replication_slot_a' }, +{ oid => '4221', descr => 'copy a physical replication slot', + proname => 'pg_copy_physical_replication_slot', provolatile => 'v', + proparallel => 'u', prorettype => 'record', proargtypes => 'name name', + proallargtypes => '{name,name,name,pg_lsn}', + proargmodes => '{i,i,o,o}', + proargnames => '{src_slot_name,dst_slot_name,slot_name,lsn}', + prosrc => 'pg_copy_physical_replication_slot_b' }, { oid => '3780', descr => 'drop a replication slot', proname => 'pg_drop_replication_slot', provolatile => 'v', proparallel => 'u', prorettype => 'void', proargtypes => 'name', @@ -9794,6 +9808,27 @@ proargmodes => '{i,i,i,o,o}', proargnames => '{slot_name,plugin,temporary,slot_name,lsn}', prosrc => 'pg_create_logical_replication_slot' }, +{ oid => '4222', descr => 'copy a logical replication slot, changing temporality and plugin', + proname => 'pg_copy_logical_replication_slot', provolatile => 'v', + proparallel => 'u', prorettype => 'record', proargtypes => 'name name bool name', + proallargtypes => '{name,name,bool,name,name,pg_lsn}', + proargmodes => '{i,i,i,i,o,o}', + proargnames => '{src_slot_name,dst_slot_name,temporary,plugin,slot_name,lsn}', + prosrc => 'pg_copy_logical_replication_slot_a' }, +{ oid => '4223', descr => 'copy a logical replication slot, changing temporality', + proname => 'pg_copy_logical_replication_slot', provolatile => 'v', + proparallel => 'u', prorettype => 'record', proargtypes => 'name name bool', + proallargtypes => '{name,name,bool,name,pg_lsn}', + proargmodes => '{i,i,i,o,o}', + proargnames => '{src_slot_name,dst_slot_name,temporary,slot_name,lsn}', + prosrc => 'pg_copy_logical_replication_slot_b' }, +{ oid => '4224', descr => 'copy a logical replication slot', + proname => 'pg_copy_logical_replication_slot', provolatile => 'v', + proparallel => 'u', prorettype => 'record', proargtypes => 'name name', + proallargtypes => '{name,name,name,pg_lsn}', + proargmodes => '{i,i,o,o}', + proargnames => '{src_slot_name,dst_slot_name,slot_name,lsn}', + prosrc => 'pg_copy_logical_replication_slot_c' }, { oid => '3782', descr => 'get changes from replication slot', proname => 'pg_logical_slot_get_changes', procost => '1000', prorows => '1000', provariadic => 'text', proisstrict => 'f', diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index c8ffc4c4347..0a2a63a48c8 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -97,6 +97,7 @@ extern void CheckLogicalDecodingRequirements(void); extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin, List *output_plugin_options, bool need_full_snapshot, + XLogRecPtr restart_lsn, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, |