aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/logical/logical.c26
-rw-r--r--src/backend/replication/slotfuncs.c352
-rw-r--r--src/backend/replication/walsender.c1
-rw-r--r--src/include/catalog/catversion.h2
-rw-r--r--src/include/catalog/pg_proc.dat35
-rw-r--r--src/include/replication/logical.h1
6 files changed, 367 insertions, 50 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 6e5bc12e779..424fe86a1b6 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -211,11 +211,15 @@ StartupDecodingContext(List *output_plugin_options,
/*
* Create a new decoding context, for a new logical slot.
*
- * plugin contains the name of the output plugin
- * output_plugin_options contains options passed to the output plugin
- * read_page, prepare_write, do_write, update_progress
- * callbacks that have to be filled to perform the use-case dependent,
- * actual, work.
+ * plugin -- contains the name of the output plugin
+ * output_plugin_options -- contains options passed to the output plugin
+ * restart_lsn -- if given as invalid, it's this routine's responsibility to
+ * mark WAL as reserved by setting a convenient restart_lsn for the slot.
+ * Otherwise, we set for decoding to start from the given LSN without
+ * marking WAL reserved beforehand. In that scenario, it's up to the
+ * caller to guarantee that WAL remains available.
+ * read_page, prepare_write, do_write, update_progress --
+ * callbacks that perform the use-case dependent, actual, work.
*
* Needs to be called while in a memory context that's at least as long lived
* as the decoding context because further memory contexts will be created
@@ -228,6 +232,7 @@ LogicalDecodingContext *
CreateInitDecodingContext(char *plugin,
List *output_plugin_options,
bool need_full_snapshot,
+ XLogRecPtr restart_lsn,
XLogPageReadCB read_page,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
@@ -271,7 +276,14 @@ CreateInitDecodingContext(char *plugin,
StrNCpy(NameStr(slot->data.plugin), plugin, NAMEDATALEN);
SpinLockRelease(&slot->mutex);
- ReplicationSlotReserveWal();
+ if (XLogRecPtrIsInvalid(restart_lsn))
+ ReplicationSlotReserveWal();
+ else
+ {
+ SpinLockAcquire(&slot->mutex);
+ slot->data.restart_lsn = restart_lsn;
+ SpinLockRelease(&slot->mutex);
+ }
/* ----
* This is a bit tricky: We need to determine a safe xmin horizon to start
@@ -316,7 +328,7 @@ CreateInitDecodingContext(char *plugin,
ReplicationSlotMarkDirty();
ReplicationSlotSave();
- ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
+ ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon,
need_full_snapshot, false,
read_page, prepare_write, do_write,
update_progress);
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 224dd920c8d..d7c53c54bdb 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -10,13 +10,12 @@
*
*-------------------------------------------------------------------------
*/
-
#include "postgres.h"
+#include "access/htup_details.h"
+#include "access/xlog_internal.h"
#include "funcapi.h"
#include "miscadmin.h"
-
-#include "access/htup_details.h"
#include "replication/decode.h"
#include "replication/slot.h"
#include "replication/logical.h"
@@ -36,6 +35,38 @@ check_permissions(void)
}
/*
+ * Helper function for creating a new physical replication slot with
+ * given arguments. Note that this function doesn't release the created
+ * slot.
+ *
+ * If restart_lsn is a valid value, we use it without WAL reservation
+ * routine. So the caller must guarantee that WAL is available.
+ */
+static void
+create_physical_replication_slot(char *name, bool immediately_reserve,
+ bool temporary, XLogRecPtr restart_lsn)
+{
+ Assert(!MyReplicationSlot);
+
+ /* acquire replication slot, this will check for conflicting names */
+ ReplicationSlotCreate(name, false,
+ temporary ? RS_TEMPORARY : RS_PERSISTENT);
+
+ if (immediately_reserve)
+ {
+ /* Reserve WAL as the user asked for it */
+ if (XLogRecPtrIsInvalid(restart_lsn))
+ ReplicationSlotReserveWal();
+ else
+ MyReplicationSlot->data.restart_lsn = restart_lsn;
+
+ /* Write this slot to disk */
+ ReplicationSlotMarkDirty();
+ ReplicationSlotSave();
+ }
+}
+
+/*
* SQL function for creating a new physical (streaming replication)
* replication slot.
*/
@@ -51,8 +82,6 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
HeapTuple tuple;
Datum result;
- Assert(!MyReplicationSlot);
-
if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
elog(ERROR, "return type must be a row type");
@@ -60,29 +89,21 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
CheckSlotRequirements();
- /* acquire replication slot, this will check for conflicting names */
- ReplicationSlotCreate(NameStr(*name), false,
- temporary ? RS_TEMPORARY : RS_PERSISTENT);
+ create_physical_replication_slot(NameStr(*name),
+ immediately_reserve,
+ temporary,
+ InvalidXLogRecPtr);
values[0] = NameGetDatum(&MyReplicationSlot->data.name);
nulls[0] = false;
if (immediately_reserve)
{
- /* Reserve WAL as the user asked for it */
- ReplicationSlotReserveWal();
-
- /* Write this slot to disk */
- ReplicationSlotMarkDirty();
- ReplicationSlotSave();
-
values[1] = LSNGetDatum(MyReplicationSlot->data.restart_lsn);
nulls[1] = false;
}
else
- {
nulls[1] = true;
- }
tuple = heap_form_tuple(tupdesc, values, nulls);
result = HeapTupleGetDatum(tuple);
@@ -94,32 +115,18 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
/*
- * SQL function for creating a new logical replication slot.
+ * Helper function for creating a new logical replication slot with
+ * given arguments. Note that this function doesn't release the created
+ * slot.
*/
-Datum
-pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
+static void
+create_logical_replication_slot(char *name, char *plugin,
+ bool temporary, XLogRecPtr restart_lsn)
{
- Name name = PG_GETARG_NAME(0);
- Name plugin = PG_GETARG_NAME(1);
- bool temporary = PG_GETARG_BOOL(2);
-
LogicalDecodingContext *ctx = NULL;
- TupleDesc tupdesc;
- HeapTuple tuple;
- Datum result;
- Datum values[2];
- bool nulls[2];
-
Assert(!MyReplicationSlot);
- if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
- elog(ERROR, "return type must be a row type");
-
- check_permissions();
-
- CheckLogicalDecodingRequirements();
-
/*
* Acquire a logical decoding slot, this will check for conflicting names.
* Initially create persistent slot as ephemeral - that allows us to
@@ -128,25 +135,54 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
* slots can be created as temporary from beginning as they get dropped on
* error as well.
*/
- ReplicationSlotCreate(NameStr(*name), true,
+ ReplicationSlotCreate(name, true,
temporary ? RS_TEMPORARY : RS_EPHEMERAL);
/*
* Create logical decoding context, to build the initial snapshot.
*/
- ctx = CreateInitDecodingContext(NameStr(*plugin), NIL,
+ ctx = CreateInitDecodingContext(plugin, NIL,
false, /* do not build snapshot */
+ restart_lsn,
logical_read_local_xlog_page, NULL, NULL,
NULL);
/* build initial snapshot, might take a while */
DecodingContextFindStartpoint(ctx);
- values[0] = NameGetDatum(&MyReplicationSlot->data.name);
- values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
-
/* don't need the decoding context anymore */
FreeDecodingContext(ctx);
+}
+
+/*
+ * SQL function for creating a new logical replication slot.
+ */
+Datum
+pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
+{
+ Name name = PG_GETARG_NAME(0);
+ Name plugin = PG_GETARG_NAME(1);
+ bool temporary = PG_GETARG_BOOL(2);
+ Datum result;
+ TupleDesc tupdesc;
+ HeapTuple tuple;
+ Datum values[2];
+ bool nulls[2];
+
+ if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+ elog(ERROR, "return type must be a row type");
+
+ check_permissions();
+
+ CheckLogicalDecodingRequirements();
+
+ create_logical_replication_slot(NameStr(*name),
+ NameStr(*plugin),
+ temporary,
+ InvalidXLogRecPtr);
+
+ values[0] = NameGetDatum(&MyReplicationSlot->data.name);
+ values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
memset(nulls, 0, sizeof(nulls));
@@ -558,3 +594,235 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
PG_RETURN_DATUM(result);
}
+
+/*
+ * Helper function of copying a replication slot.
+ */
+static Datum
+copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
+{
+ Name src_name = PG_GETARG_NAME(0);
+ Name dst_name = PG_GETARG_NAME(1);
+ ReplicationSlot *src = NULL;
+ XLogRecPtr src_restart_lsn;
+ bool src_islogical;
+ bool temporary;
+ char *plugin;
+ Datum values[2];
+ bool nulls[2];
+ Datum result;
+ TupleDesc tupdesc;
+ HeapTuple tuple;
+
+ if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+ elog(ERROR, "return type must be a row type");
+
+ check_permissions();
+
+ if (logical_slot)
+ CheckLogicalDecodingRequirements();
+ else
+ CheckSlotRequirements();
+
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+
+ /*
+ * We need to prevent the source slot's reserved WAL from being removed,
+ * but we don't want to lock that slot for very long, and it can advance
+ * in the meantime. So obtain the source slot's data, and create a new
+ * slot using its restart_lsn. Afterwards we lock the source slot again
+ * and verify that the data we copied (name, type) has not changed
+ * incompatibly. No inconvenient WAL removal can occur once the new slot
+ * is created -- but since WAL removal could have occurred before we
+ * managed to create the new slot, we advance the new slot's restart_lsn
+ * to the source slot's updated restart_lsn the second time we lock it.
+ */
+ for (int i = 0; i < max_replication_slots; i++)
+ {
+ ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+ if (s->in_use && strcmp(NameStr(s->data.name), NameStr(*src_name)) == 0)
+ {
+ SpinLockAcquire(&s->mutex);
+ src_islogical = SlotIsLogical(s);
+ src_restart_lsn = s->data.restart_lsn;
+ temporary = s->data.persistency == RS_TEMPORARY;
+ plugin = logical_slot ? pstrdup(NameStr(s->data.plugin)) : NULL;
+ SpinLockRelease(&s->mutex);
+
+ src = s;
+ break;
+ }
+ }
+
+ LWLockRelease(ReplicationSlotControlLock);
+
+ if (src == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("replication slot \"%s\" does not exist", NameStr(*src_name))));
+
+ /* Check type of replication slot */
+ if (src_islogical != logical_slot)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ src_islogical ?
+ errmsg("cannot copy physical replication slot \"%s\" as a logical replication slot",
+ NameStr(*src_name)) :
+ errmsg("cannot copy logical replication slot \"%s\" as a physical replication slot",
+ NameStr(*src_name))));
+
+ /* Copying non-reserved slot doesn't make sense */
+ if (XLogRecPtrIsInvalid(src_restart_lsn))
+ {
+ Assert(!logical_slot);
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ (errmsg("cannot copy a replication slot that doesn't reserve WAL"))));
+ }
+
+ /* Overwrite params from optional arguments */
+ if (PG_NARGS() >= 3)
+ temporary = PG_GETARG_BOOL(2);
+ if (PG_NARGS() >= 4)
+ {
+ Assert(logical_slot);
+ plugin = NameStr(*(PG_GETARG_NAME(3)));
+ }
+
+ /* Create new slot and acquire it */
+ if (logical_slot)
+ create_logical_replication_slot(NameStr(*dst_name),
+ plugin,
+ temporary,
+ src_restart_lsn);
+ else
+ create_physical_replication_slot(NameStr(*dst_name),
+ true,
+ temporary,
+ src_restart_lsn);
+
+ /*
+ * Update the destination slot to current values of the source slot;
+ * recheck that the source slot is still the one we saw previously.
+ */
+ {
+ TransactionId copy_effective_xmin;
+ TransactionId copy_effective_catalog_xmin;
+ TransactionId copy_xmin;
+ TransactionId copy_catalog_xmin;
+ XLogRecPtr copy_restart_lsn;
+ bool copy_islogical;
+ char *copy_name;
+
+ /* Copy data of source slot again */
+ SpinLockAcquire(&src->mutex);
+ copy_effective_xmin = src->effective_xmin;
+ copy_effective_catalog_xmin = src->effective_catalog_xmin;
+
+ copy_xmin = src->data.xmin;
+ copy_catalog_xmin = src->data.catalog_xmin;
+ copy_restart_lsn = src->data.restart_lsn;
+
+ /* for existence check */
+ copy_name = pstrdup(NameStr(src->data.name));
+ copy_islogical = SlotIsLogical(src);
+ SpinLockRelease(&src->mutex);
+
+ /*
+ * Check if the source slot still exists and is valid. We regards it
+ * as invalid if the type of replication slot or name has been
+ * changed, or the restart_lsn either is invalid or has gone backward.
+ * (The restart_lsn could go backwards if the source slot is dropped
+ * and copied from an older slot during installation.)
+ *
+ * Since erroring out will release and drop the destination slot we
+ * don't need to release it here.
+ */
+ if (copy_restart_lsn < src_restart_lsn ||
+ src_islogical != copy_islogical ||
+ strcmp(copy_name, NameStr(*src_name)) != 0)
+ ereport(ERROR,
+ (errmsg("could not copy replication slot \"%s\"",
+ NameStr(*src_name)),
+ errdetail("The source replication slot was modified incompatibly during the copy operation.")));
+
+ /* Install copied values again */
+ SpinLockAcquire(&MyReplicationSlot->mutex);
+ MyReplicationSlot->effective_xmin = copy_effective_xmin;
+ MyReplicationSlot->effective_catalog_xmin = copy_effective_catalog_xmin;
+
+ MyReplicationSlot->data.xmin = copy_xmin;
+ MyReplicationSlot->data.catalog_xmin = copy_catalog_xmin;
+ MyReplicationSlot->data.restart_lsn = copy_restart_lsn;
+ SpinLockRelease(&MyReplicationSlot->mutex);
+
+ ReplicationSlotMarkDirty();
+ ReplicationSlotsComputeRequiredXmin(false);
+ ReplicationSlotsComputeRequiredLSN();
+ ReplicationSlotSave();
+
+#ifdef USE_ASSERT_CHECKING
+ /* Check that the restart_lsn is available */
+ {
+ XLogSegNo segno;
+
+ XLByteToSeg(copy_restart_lsn, segno, wal_segment_size);
+ Assert(XLogGetLastRemovedSegno() < segno);
+ }
+#endif
+ }
+
+ /* target slot fully created, mark as persistent if needed */
+ if (logical_slot && !temporary)
+ ReplicationSlotPersist();
+
+ /* All done. Set up the return values */
+ values[0] = NameGetDatum(dst_name);
+ nulls[0] = false;
+ if (!XLogRecPtrIsInvalid(MyReplicationSlot->data.confirmed_flush))
+ {
+ values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
+ nulls[1] = false;
+ }
+ else
+ nulls[1] = true;
+
+ tuple = heap_form_tuple(tupdesc, values, nulls);
+ result = HeapTupleGetDatum(tuple);
+
+ ReplicationSlotRelease();
+
+ PG_RETURN_DATUM(result);
+}
+
+/* The wrappers below are all to appease opr_sanity */
+Datum
+pg_copy_logical_replication_slot_a(PG_FUNCTION_ARGS)
+{
+ return copy_replication_slot(fcinfo, true);
+}
+
+Datum
+pg_copy_logical_replication_slot_b(PG_FUNCTION_ARGS)
+{
+ return copy_replication_slot(fcinfo, true);
+}
+
+Datum
+pg_copy_logical_replication_slot_c(PG_FUNCTION_ARGS)
+{
+ return copy_replication_slot(fcinfo, true);
+}
+
+Datum
+pg_copy_physical_replication_slot_a(PG_FUNCTION_ARGS)
+{
+ return copy_replication_slot(fcinfo, false);
+}
+
+Datum
+pg_copy_physical_replication_slot_b(PG_FUNCTION_ARGS)
+{
+ return copy_replication_slot(fcinfo, false);
+}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 21f5c868f18..aae6adc15c1 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -934,6 +934,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
}
ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
+ InvalidXLogRecPtr,
logical_read_xlog_page,
WalSndPrepareWrite, WalSndWriteData,
WalSndUpdateProgress);
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index 1fe54c26653..bfd2bfc186c 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -53,6 +53,6 @@
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 201904031
+#define CATALOG_VERSION_NO 201904051
#endif
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index fb257c17c89..ad4519e0011 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -9774,6 +9774,20 @@
proargmodes => '{i,i,i,o,o}',
proargnames => '{slot_name,immediately_reserve,temporary,slot_name,lsn}',
prosrc => 'pg_create_physical_replication_slot' },
+{ oid => '4220', descr => 'copy a physical replication slot, changing temporality',
+ proname => 'pg_copy_physical_replication_slot', provolatile => 'v',
+ proparallel => 'u', prorettype => 'record', proargtypes => 'name name bool',
+ proallargtypes => '{name,name,bool,name,pg_lsn}',
+ proargmodes => '{i,i,i,o,o}',
+ proargnames => '{src_slot_name,dst_slot_name,temporary,slot_name,lsn}',
+ prosrc => 'pg_copy_physical_replication_slot_a' },
+{ oid => '4221', descr => 'copy a physical replication slot',
+ proname => 'pg_copy_physical_replication_slot', provolatile => 'v',
+ proparallel => 'u', prorettype => 'record', proargtypes => 'name name',
+ proallargtypes => '{name,name,name,pg_lsn}',
+ proargmodes => '{i,i,o,o}',
+ proargnames => '{src_slot_name,dst_slot_name,slot_name,lsn}',
+ prosrc => 'pg_copy_physical_replication_slot_b' },
{ oid => '3780', descr => 'drop a replication slot',
proname => 'pg_drop_replication_slot', provolatile => 'v', proparallel => 'u',
prorettype => 'void', proargtypes => 'name',
@@ -9794,6 +9808,27 @@
proargmodes => '{i,i,i,o,o}',
proargnames => '{slot_name,plugin,temporary,slot_name,lsn}',
prosrc => 'pg_create_logical_replication_slot' },
+{ oid => '4222', descr => 'copy a logical replication slot, changing temporality and plugin',
+ proname => 'pg_copy_logical_replication_slot', provolatile => 'v',
+ proparallel => 'u', prorettype => 'record', proargtypes => 'name name bool name',
+ proallargtypes => '{name,name,bool,name,name,pg_lsn}',
+ proargmodes => '{i,i,i,i,o,o}',
+ proargnames => '{src_slot_name,dst_slot_name,temporary,plugin,slot_name,lsn}',
+ prosrc => 'pg_copy_logical_replication_slot_a' },
+{ oid => '4223', descr => 'copy a logical replication slot, changing temporality',
+ proname => 'pg_copy_logical_replication_slot', provolatile => 'v',
+ proparallel => 'u', prorettype => 'record', proargtypes => 'name name bool',
+ proallargtypes => '{name,name,bool,name,pg_lsn}',
+ proargmodes => '{i,i,i,o,o}',
+ proargnames => '{src_slot_name,dst_slot_name,temporary,slot_name,lsn}',
+ prosrc => 'pg_copy_logical_replication_slot_b' },
+{ oid => '4224', descr => 'copy a logical replication slot',
+ proname => 'pg_copy_logical_replication_slot', provolatile => 'v',
+ proparallel => 'u', prorettype => 'record', proargtypes => 'name name',
+ proallargtypes => '{name,name,name,pg_lsn}',
+ proargmodes => '{i,i,o,o}',
+ proargnames => '{src_slot_name,dst_slot_name,slot_name,lsn}',
+ prosrc => 'pg_copy_logical_replication_slot_c' },
{ oid => '3782', descr => 'get changes from replication slot',
proname => 'pg_logical_slot_get_changes', procost => '1000',
prorows => '1000', provariadic => 'text', proisstrict => 'f',
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index c8ffc4c4347..0a2a63a48c8 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -97,6 +97,7 @@ extern void CheckLogicalDecodingRequirements(void);
extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
List *output_plugin_options,
bool need_full_snapshot,
+ XLogRecPtr restart_lsn,
XLogPageReadCB read_page,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,