aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAmit Kapila <akapila@postgresql.org>2024-01-25 12:15:46 +0530
committerAmit Kapila <akapila@postgresql.org>2024-01-25 12:15:46 +0530
commitc393308b69d229b664391ac583b9e07418d411b6 (patch)
treed5d1eae442ac2c47562dbaeb109a7edcf00a2feb /src
parent86232a49a4373013056e8d38118339b8e7675ea0 (diff)
downloadpostgresql-c393308b69d229b664391ac583b9e07418d411b6.tar.gz
postgresql-c393308b69d229b664391ac583b9e07418d411b6.zip
Allow to enable failover property for replication slots via SQL API.
This commit adds the failover property to the replication slot. The failover property indicates whether the slot will be synced to the standby servers, enabling the resumption of corresponding logical replication after failover. But note that this commit does not yet include the capability to sync the replication slot; the subsequent commits will add that capability. A new optional parameter 'failover' is added to the pg_create_logical_replication_slot() function. We will also enable to set 'failover' option for slots via the subscription commands in the subsequent commits. The value of the 'failover' flag is displayed as part of pg_replication_slots view. Author: Hou Zhijie, Shveta Malik, Ajin Cherian Reviewed-by: Peter Smith, Bertrand Drouvot, Dilip Kumar, Masahiko Sawada, Nisha Moond, Kuroda, Hayato, Amit Kapila Discussion: https://postgr.es/m/514f6f2f-6833-4539-39f1-96cd1e011f23@enterprisedb.com
Diffstat (limited to 'src')
-rw-r--r--src/backend/catalog/system_functions.sql1
-rw-r--r--src/backend/catalog/system_views.sql3
-rw-r--r--src/backend/replication/slot.c8
-rw-r--r--src/backend/replication/slotfuncs.c16
-rw-r--r--src/backend/replication/walsender.c4
-rw-r--r--src/bin/pg_upgrade/info.c5
-rw-r--r--src/bin/pg_upgrade/pg_upgrade.c6
-rw-r--r--src/bin/pg_upgrade/pg_upgrade.h2
-rw-r--r--src/include/catalog/catversion.h2
-rw-r--r--src/include/catalog/pg_proc.dat14
-rw-r--r--src/include/replication/slot.h8
-rw-r--r--src/test/regress/expected/rules.out5
12 files changed, 52 insertions, 22 deletions
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index f315fecf186..346cfb98a04 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -479,6 +479,7 @@ CREATE OR REPLACE FUNCTION pg_create_logical_replication_slot(
IN slot_name name, IN plugin name,
IN temporary boolean DEFAULT false,
IN twophase boolean DEFAULT false,
+ IN failover boolean DEFAULT false,
OUT slot_name name, OUT lsn pg_lsn)
RETURNS RECORD
LANGUAGE INTERNAL
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 6288270e2b2..c62aa0074a3 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1023,7 +1023,8 @@ CREATE VIEW pg_replication_slots AS
L.wal_status,
L.safe_wal_size,
L.two_phase,
- L.conflict_reason
+ L.conflict_reason,
+ L.failover
FROM pg_get_replication_slots() AS L
LEFT JOIN pg_database D ON (L.datoid = D.oid);
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 52da694c791..02a14ec210e 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -90,7 +90,7 @@ typedef struct ReplicationSlotOnDisk
sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
#define SLOT_MAGIC 0x1051CA1 /* format identifier */
-#define SLOT_VERSION 3 /* version for new files */
+#define SLOT_VERSION 4 /* version for new files */
/* Control array for replication slot management */
ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
@@ -248,10 +248,13 @@ ReplicationSlotValidateName(const char *name, int elevel)
* during getting changes, if the two_phase option is enabled it can skip
* prepare because by that time start decoding point has been moved. So the
* user will only get commit prepared.
+ * failover: If enabled, allows the slot to be synced to standbys so
+ * that logical replication can be resumed after failover.
*/
void
ReplicationSlotCreate(const char *name, bool db_specific,
- ReplicationSlotPersistency persistency, bool two_phase)
+ ReplicationSlotPersistency persistency,
+ bool two_phase, bool failover)
{
ReplicationSlot *slot = NULL;
int i;
@@ -311,6 +314,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
slot->data.persistency = persistency;
slot->data.two_phase = two_phase;
slot->data.two_phase_at = InvalidXLogRecPtr;
+ slot->data.failover = failover;
/* and then data only present in shared memory */
slot->just_dirtied = false;
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index cad35dce7fc..eb685089b36 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -42,7 +42,8 @@ create_physical_replication_slot(char *name, bool immediately_reserve,
/* acquire replication slot, this will check for conflicting names */
ReplicationSlotCreate(name, false,
- temporary ? RS_TEMPORARY : RS_PERSISTENT, false);
+ temporary ? RS_TEMPORARY : RS_PERSISTENT, false,
+ false);
if (immediately_reserve)
{
@@ -117,6 +118,7 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
static void
create_logical_replication_slot(char *name, char *plugin,
bool temporary, bool two_phase,
+ bool failover,
XLogRecPtr restart_lsn,
bool find_startpoint)
{
@@ -133,7 +135,8 @@ create_logical_replication_slot(char *name, char *plugin,
* error as well.
*/
ReplicationSlotCreate(name, true,
- temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase);
+ temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase,
+ failover);
/*
* Create logical decoding context to find start point or, if we don't
@@ -171,6 +174,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
Name plugin = PG_GETARG_NAME(1);
bool temporary = PG_GETARG_BOOL(2);
bool two_phase = PG_GETARG_BOOL(3);
+ bool failover = PG_GETARG_BOOL(4);
Datum result;
TupleDesc tupdesc;
HeapTuple tuple;
@@ -188,6 +192,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
NameStr(*plugin),
temporary,
two_phase,
+ failover,
InvalidXLogRecPtr,
true);
@@ -232,7 +237,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
Datum
pg_get_replication_slots(PG_FUNCTION_ARGS)
{
-#define PG_GET_REPLICATION_SLOTS_COLS 15
+#define PG_GET_REPLICATION_SLOTS_COLS 16
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
XLogRecPtr currlsn;
int slotno;
@@ -426,6 +431,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
}
}
+ values[i++] = BoolGetDatum(slot_contents.data.failover);
+
Assert(i == PG_GET_REPLICATION_SLOTS_COLS);
tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
@@ -693,6 +700,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
XLogRecPtr src_restart_lsn;
bool src_islogical;
bool temporary;
+ bool failover;
char *plugin;
Datum values[2];
bool nulls[2];
@@ -748,6 +756,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
src_islogical = SlotIsLogical(&first_slot_contents);
src_restart_lsn = first_slot_contents.data.restart_lsn;
temporary = (first_slot_contents.data.persistency == RS_TEMPORARY);
+ failover = first_slot_contents.data.failover;
plugin = logical_slot ? NameStr(first_slot_contents.data.plugin) : NULL;
/* Check type of replication slot */
@@ -787,6 +796,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
plugin,
temporary,
false,
+ failover,
src_restart_lsn,
false);
}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 087031e9dc2..aa80f3de20f 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1212,7 +1212,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
{
ReplicationSlotCreate(cmd->slotname, false,
cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT,
- false);
+ false, false);
if (reserve_wal)
{
@@ -1243,7 +1243,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
*/
ReplicationSlotCreate(cmd->slotname, true,
cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
- two_phase);
+ two_phase, false);
/*
* Do options check early so that we can bail before calling the
diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c
index 74e02b3f826..183c2f84eb4 100644
--- a/src/bin/pg_upgrade/info.c
+++ b/src/bin/pg_upgrade/info.c
@@ -666,7 +666,7 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check)
* started and stopped several times causing any temporary slots to be
* removed.
*/
- res = executeQueryOrDie(conn, "SELECT slot_name, plugin, two_phase, "
+ res = executeQueryOrDie(conn, "SELECT slot_name, plugin, two_phase, failover, "
"%s as caught_up, conflict_reason IS NOT NULL as invalid "
"FROM pg_catalog.pg_replication_slots "
"WHERE slot_type = 'logical' AND "
@@ -684,6 +684,7 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check)
int i_slotname;
int i_plugin;
int i_twophase;
+ int i_failover;
int i_caught_up;
int i_invalid;
@@ -692,6 +693,7 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check)
i_slotname = PQfnumber(res, "slot_name");
i_plugin = PQfnumber(res, "plugin");
i_twophase = PQfnumber(res, "two_phase");
+ i_failover = PQfnumber(res, "failover");
i_caught_up = PQfnumber(res, "caught_up");
i_invalid = PQfnumber(res, "invalid");
@@ -702,6 +704,7 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check)
curr->slotname = pg_strdup(PQgetvalue(res, slotnum, i_slotname));
curr->plugin = pg_strdup(PQgetvalue(res, slotnum, i_plugin));
curr->two_phase = (strcmp(PQgetvalue(res, slotnum, i_twophase), "t") == 0);
+ curr->failover = (strcmp(PQgetvalue(res, slotnum, i_failover), "t") == 0);
curr->caught_up = (strcmp(PQgetvalue(res, slotnum, i_caught_up), "t") == 0);
curr->invalid = (strcmp(PQgetvalue(res, slotnum, i_invalid), "t") == 0);
}
diff --git a/src/bin/pg_upgrade/pg_upgrade.c b/src/bin/pg_upgrade/pg_upgrade.c
index 14a36f0503d..10c94a6c1fc 100644
--- a/src/bin/pg_upgrade/pg_upgrade.c
+++ b/src/bin/pg_upgrade/pg_upgrade.c
@@ -916,8 +916,10 @@ create_logical_replication_slots(void)
appendStringLiteralConn(query, slot_info->slotname, conn);
appendPQExpBuffer(query, ", ");
appendStringLiteralConn(query, slot_info->plugin, conn);
- appendPQExpBuffer(query, ", false, %s);",
- slot_info->two_phase ? "true" : "false");
+
+ appendPQExpBuffer(query, ", false, %s, %s);",
+ slot_info->two_phase ? "true" : "false",
+ slot_info->failover ? "true" : "false");
PQclear(executeQueryOrDie(conn, "%s", query->data));
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index a1d08c3dab1..d9a848cbfde 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -160,6 +160,8 @@ typedef struct
bool two_phase; /* can the slot decode 2PC? */
bool caught_up; /* has the slot caught up to latest changes? */
bool invalid; /* if true, the slot is unusable */
+ bool failover; /* is the slot designated to be synced to the
+ * physical standby? */
} LogicalSlotInfo;
typedef struct
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index 23944db9e6b..739f9253bfe 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -57,6 +57,6 @@
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 202401251
+#define CATALOG_VERSION_NO 202401252
#endif
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index e4115cd0840..29af4ce65d5 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11127,17 +11127,17 @@
proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
proretset => 't', provolatile => 's', prorettype => 'record',
proargtypes => '',
- proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,text}',
- proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
- proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflict_reason}',
+ proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,text,bool}',
+ proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+ proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflict_reason,failover}',
prosrc => 'pg_get_replication_slots' },
{ oid => '3786', descr => 'set up a logical replication slot',
proname => 'pg_create_logical_replication_slot', provolatile => 'v',
proparallel => 'u', prorettype => 'record',
- proargtypes => 'name name bool bool',
- proallargtypes => '{name,name,bool,bool,name,pg_lsn}',
- proargmodes => '{i,i,i,i,o,o}',
- proargnames => '{slot_name,plugin,temporary,twophase,slot_name,lsn}',
+ proargtypes => 'name name bool bool bool',
+ proallargtypes => '{name,name,bool,bool,bool,name,pg_lsn}',
+ proargmodes => '{i,i,i,i,i,o,o}',
+ proargnames => '{slot_name,plugin,temporary,twophase,failover,slot_name,lsn}',
prosrc => 'pg_create_logical_replication_slot' },
{ oid => '4222',
descr => 'copy a logical replication slot, changing temporality and plugin',
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 9e39aaf3037..db9bb222661 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -111,6 +111,12 @@ typedef struct ReplicationSlotPersistentData
/* plugin name */
NameData plugin;
+
+ /*
+ * Is this a failover slot (sync candidate for standbys)? Only relevant
+ * for logical slots on the primary server.
+ */
+ bool failover;
} ReplicationSlotPersistentData;
/*
@@ -218,7 +224,7 @@ extern void ReplicationSlotsShmemInit(void);
/* management of individual slots */
extern void ReplicationSlotCreate(const char *name, bool db_specific,
ReplicationSlotPersistency persistency,
- bool two_phase);
+ bool two_phase, bool failover);
extern void ReplicationSlotPersist(void);
extern void ReplicationSlotDrop(const char *name, bool nowait);
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 5e846b01e68..abc944e8b82 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1473,8 +1473,9 @@ pg_replication_slots| SELECT l.slot_name,
l.wal_status,
l.safe_wal_size,
l.two_phase,
- l.conflict_reason
- FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflict_reason)
+ l.conflict_reason,
+ l.failover
+ FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflict_reason, failover)
LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
pg_roles| SELECT pg_authid.rolname,
pg_authid.rolsuper,