aboutsummaryrefslogtreecommitdiff
path: root/src/backend
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend')
-rw-r--r--src/backend/catalog/system_functions.sql1
-rw-r--r--src/backend/catalog/system_views.sql3
-rw-r--r--src/backend/replication/slot.c8
-rw-r--r--src/backend/replication/slotfuncs.c16
-rw-r--r--src/backend/replication/walsender.c4
5 files changed, 24 insertions, 8 deletions
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index f315fecf186..346cfb98a04 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -479,6 +479,7 @@ CREATE OR REPLACE FUNCTION pg_create_logical_replication_slot(
IN slot_name name, IN plugin name,
IN temporary boolean DEFAULT false,
IN twophase boolean DEFAULT false,
+ IN failover boolean DEFAULT false,
OUT slot_name name, OUT lsn pg_lsn)
RETURNS RECORD
LANGUAGE INTERNAL
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 6288270e2b2..c62aa0074a3 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1023,7 +1023,8 @@ CREATE VIEW pg_replication_slots AS
L.wal_status,
L.safe_wal_size,
L.two_phase,
- L.conflict_reason
+ L.conflict_reason,
+ L.failover
FROM pg_get_replication_slots() AS L
LEFT JOIN pg_database D ON (L.datoid = D.oid);
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 52da694c791..02a14ec210e 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -90,7 +90,7 @@ typedef struct ReplicationSlotOnDisk
sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
#define SLOT_MAGIC 0x1051CA1 /* format identifier */
-#define SLOT_VERSION 3 /* version for new files */
+#define SLOT_VERSION 4 /* version for new files */
/* Control array for replication slot management */
ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
@@ -248,10 +248,13 @@ ReplicationSlotValidateName(const char *name, int elevel)
* during getting changes, if the two_phase option is enabled it can skip
* prepare because by that time start decoding point has been moved. So the
* user will only get commit prepared.
+ * failover: If enabled, allows the slot to be synced to standbys so
+ * that logical replication can be resumed after failover.
*/
void
ReplicationSlotCreate(const char *name, bool db_specific,
- ReplicationSlotPersistency persistency, bool two_phase)
+ ReplicationSlotPersistency persistency,
+ bool two_phase, bool failover)
{
ReplicationSlot *slot = NULL;
int i;
@@ -311,6 +314,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
slot->data.persistency = persistency;
slot->data.two_phase = two_phase;
slot->data.two_phase_at = InvalidXLogRecPtr;
+ slot->data.failover = failover;
/* and then data only present in shared memory */
slot->just_dirtied = false;
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index cad35dce7fc..eb685089b36 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -42,7 +42,8 @@ create_physical_replication_slot(char *name, bool immediately_reserve,
/* acquire replication slot, this will check for conflicting names */
ReplicationSlotCreate(name, false,
- temporary ? RS_TEMPORARY : RS_PERSISTENT, false);
+ temporary ? RS_TEMPORARY : RS_PERSISTENT, false,
+ false);
if (immediately_reserve)
{
@@ -117,6 +118,7 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
static void
create_logical_replication_slot(char *name, char *plugin,
bool temporary, bool two_phase,
+ bool failover,
XLogRecPtr restart_lsn,
bool find_startpoint)
{
@@ -133,7 +135,8 @@ create_logical_replication_slot(char *name, char *plugin,
* error as well.
*/
ReplicationSlotCreate(name, true,
- temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase);
+ temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase,
+ failover);
/*
* Create logical decoding context to find start point or, if we don't
@@ -171,6 +174,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
Name plugin = PG_GETARG_NAME(1);
bool temporary = PG_GETARG_BOOL(2);
bool two_phase = PG_GETARG_BOOL(3);
+ bool failover = PG_GETARG_BOOL(4);
Datum result;
TupleDesc tupdesc;
HeapTuple tuple;
@@ -188,6 +192,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
NameStr(*plugin),
temporary,
two_phase,
+ failover,
InvalidXLogRecPtr,
true);
@@ -232,7 +237,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
Datum
pg_get_replication_slots(PG_FUNCTION_ARGS)
{
-#define PG_GET_REPLICATION_SLOTS_COLS 15
+#define PG_GET_REPLICATION_SLOTS_COLS 16
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
XLogRecPtr currlsn;
int slotno;
@@ -426,6 +431,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
}
}
+ values[i++] = BoolGetDatum(slot_contents.data.failover);
+
Assert(i == PG_GET_REPLICATION_SLOTS_COLS);
tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
@@ -693,6 +700,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
XLogRecPtr src_restart_lsn;
bool src_islogical;
bool temporary;
+ bool failover;
char *plugin;
Datum values[2];
bool nulls[2];
@@ -748,6 +756,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
src_islogical = SlotIsLogical(&first_slot_contents);
src_restart_lsn = first_slot_contents.data.restart_lsn;
temporary = (first_slot_contents.data.persistency == RS_TEMPORARY);
+ failover = first_slot_contents.data.failover;
plugin = logical_slot ? NameStr(first_slot_contents.data.plugin) : NULL;
/* Check type of replication slot */
@@ -787,6 +796,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
plugin,
temporary,
false,
+ failover,
src_restart_lsn,
false);
}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 087031e9dc2..aa80f3de20f 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1212,7 +1212,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
{
ReplicationSlotCreate(cmd->slotname, false,
cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT,
- false);
+ false, false);
if (reserve_wal)
{
@@ -1243,7 +1243,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
*/
ReplicationSlotCreate(cmd->slotname, true,
cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
- two_phase);
+ two_phase, false);
/*
* Do options check early so that we can bail before calling the