aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/commands/subscriptioncmds.c6
-rw-r--r--src/backend/replication/libpqwalreceiver/libpqwalreceiver.c15
-rw-r--r--src/backend/replication/repl_gram.y43
-rw-r--r--src/backend/replication/repl_scanner.l2
-rw-r--r--src/backend/replication/walsender.c58
-rw-r--r--src/bin/pg_basebackup/streamutil.c5
-rw-r--r--src/include/nodes/replnodes.h2
-rw-r--r--src/include/replication/walreceiver.h6
8 files changed, 115 insertions, 22 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 1868bf5f9ee..0198e6d75ba 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -314,7 +314,11 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
PG_TRY();
{
- walrcv_create_slot(wrconn, slotname, false, &lsn);
+ /*
+ * Create permanent slot for the subscription. We won't use the
+ * initial snapshot for anything, so no need to export it.
+ */
+ walrcv_create_slot(wrconn, slotname, false, false, &lsn);
ereport(NOTICE,
(errmsg("created replication slot \"%s\" on publisher",
slotname)));
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index ebadf3680f6..cd2e57867c0 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -68,6 +68,7 @@ static void libpqrcv_send(WalReceiverConn *conn, const char *buffer,
static char *libpqrcv_create_slot(WalReceiverConn *conn,
const char *slotname,
bool temporary,
+ bool export_snapshot,
XLogRecPtr *lsn);
static bool libpqrcv_command(WalReceiverConn *conn,
const char *cmd, char **err);
@@ -720,7 +721,7 @@ libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
*/
static char *
libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
- bool temporary, XLogRecPtr *lsn)
+ bool temporary, bool export_snapshot, XLogRecPtr *lsn)
{
PGresult *res;
StringInfoData cmd;
@@ -728,13 +729,19 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
initStringInfo(&cmd);
- appendStringInfo(&cmd, "CREATE_REPLICATION_SLOT \"%s\" ", slotname);
+ appendStringInfo(&cmd, "CREATE_REPLICATION_SLOT \"%s\"", slotname);
if (temporary)
- appendStringInfo(&cmd, "TEMPORARY ");
+ appendStringInfo(&cmd, " TEMPORARY");
if (conn->logical)
- appendStringInfo(&cmd, "LOGICAL pgoutput");
+ {
+ appendStringInfo(&cmd, " LOGICAL pgoutput");
+ if (export_snapshot)
+ appendStringInfo(&cmd, " EXPORT_SNAPSHOT");
+ else
+ appendStringInfo(&cmd, " NOEXPORT_SNAPSHOT");
+ }
res = libpqrcv_PQexec(conn->streamConn, cmd.data);
pfree(cmd.data);
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index b35d0f0cd1a..f1e43bc9f3d 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -79,6 +79,8 @@ Node *replication_parse_result;
%token K_SLOT
%token K_RESERVE_WAL
%token K_TEMPORARY
+%token K_EXPORT_SNAPSHOT
+%token K_NOEXPORT_SNAPSHOT
%type <node> command
%type <node> base_backup start_replication start_logical_replication
@@ -91,7 +93,9 @@ Node *replication_parse_result;
%type <defelt> plugin_opt_elem
%type <node> plugin_opt_arg
%type <str> opt_slot var_name
-%type <boolval> opt_reserve_wal opt_temporary
+%type <boolval> opt_temporary
+%type <list> create_slot_opt_list
+%type <defelt> create_slot_opt
%%
@@ -202,18 +206,18 @@ base_backup_opt:
create_replication_slot:
/* CREATE_REPLICATION_SLOT slot TEMPORARY PHYSICAL RESERVE_WAL */
- K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_PHYSICAL opt_reserve_wal
+ K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_PHYSICAL create_slot_opt_list
{
CreateReplicationSlotCmd *cmd;
cmd = makeNode(CreateReplicationSlotCmd);
cmd->kind = REPLICATION_KIND_PHYSICAL;
cmd->slotname = $2;
cmd->temporary = $3;
- cmd->reserve_wal = $5;
+ cmd->options = $5;
$$ = (Node *) cmd;
}
/* CREATE_REPLICATION_SLOT slot TEMPORARY LOGICAL plugin */
- | K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_LOGICAL IDENT
+ | K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_LOGICAL IDENT create_slot_opt_list
{
CreateReplicationSlotCmd *cmd;
cmd = makeNode(CreateReplicationSlotCmd);
@@ -221,10 +225,36 @@ create_replication_slot:
cmd->slotname = $2;
cmd->temporary = $3;
cmd->plugin = $5;
+ cmd->options = $6;
$$ = (Node *) cmd;
}
;
+create_slot_opt_list:
+ create_slot_opt_list create_slot_opt
+ { $$ = lappend($1, $2); }
+ | /* EMPTY */
+ { $$ = NIL; }
+ ;
+
+create_slot_opt:
+ K_EXPORT_SNAPSHOT
+ {
+ $$ = makeDefElem("export_snapshot",
+ (Node *)makeInteger(TRUE), -1);
+ }
+ | K_NOEXPORT_SNAPSHOT
+ {
+ $$ = makeDefElem("export_snapshot",
+ (Node *)makeInteger(FALSE), -1);
+ }
+ | K_RESERVE_WAL
+ {
+ $$ = makeDefElem("reserve_wal",
+ (Node *)makeInteger(TRUE), -1);
+ }
+ ;
+
/* DROP_REPLICATION_SLOT slot */
drop_replication_slot:
K_DROP_REPLICATION_SLOT IDENT
@@ -291,11 +321,6 @@ opt_physical:
| /* EMPTY */
;
-opt_reserve_wal:
- K_RESERVE_WAL { $$ = true; }
- | /* EMPTY */ { $$ = false; }
- ;
-
opt_temporary:
K_TEMPORARY { $$ = true; }
| /* EMPTY */ { $$ = false; }
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index 37f857925e4..f56d41d59c7 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -100,6 +100,8 @@ RESERVE_WAL { return K_RESERVE_WAL; }
LOGICAL { return K_LOGICAL; }
SLOT { return K_SLOT; }
TEMPORARY { return K_TEMPORARY; }
+EXPORT_SNAPSHOT { return K_EXPORT_SNAPSHOT; }
+NOEXPORT_SNAPSHOT { return K_NOEXPORT_SNAPSHOT; }
"," { return ','; }
";" { return ';'; }
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index dd3a936fc68..127efecb27d 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -51,6 +51,7 @@
#include "catalog/pg_type.h"
#include "commands/dbcommands.h"
+#include "commands/defrem.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
@@ -738,6 +739,48 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
}
/*
+ * Process extra options given to CREATE_REPLICATION_SLOT.
+ */
+static void
+parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
+ bool *reserve_wal,
+ bool *export_snapshot)
+{
+ ListCell *lc;
+ bool snapshot_action_given = false;
+ bool reserve_wal_given = false;
+
+ /* Parse options */
+ foreach (lc, cmd->options)
+ {
+ DefElem *defel = (DefElem *) lfirst(lc);
+
+ if (strcmp(defel->defname, "export_snapshot") == 0)
+ {
+ if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+
+ snapshot_action_given = true;
+ *export_snapshot = defGetBoolean(defel);
+ }
+ else if (strcmp(defel->defname, "reserve_wal") == 0)
+ {
+ if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+
+ reserve_wal_given = true;
+ *reserve_wal = true;
+ }
+ else
+ elog(ERROR, "unrecognized option: %s", defel->defname);
+ }
+}
+
+/*
* Create a new replication slot.
*/
static void
@@ -746,6 +789,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
const char *snapshot_name = NULL;
char xpos[MAXFNAMELEN];
char *slot_name;
+ bool reserve_wal = false;
+ bool export_snapshot = true;
DestReceiver *dest;
TupOutputState *tstate;
TupleDesc tupdesc;
@@ -754,6 +799,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Assert(!MyReplicationSlot);
+ parseCreateReplSlotOptions(cmd, &reserve_wal, &export_snapshot);
+
/* setup state for XLogReadPage */
sendTimeLineIsHistoric = false;
sendTimeLine = ThisTimeLineID;
@@ -799,10 +846,13 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
DecodingContextFindStartpoint(ctx);
/*
- * Export a plain (not of the snapbuild.c type) snapshot to the user
- * that can be imported into another session.
+ * Export the snapshot if we've been asked to do so.
+ *
+ * NB. We will convert the snapbuild.c kind of snapshot to normal
+ * snapshot when doing this.
*/
- snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
+ if (export_snapshot)
+ snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
/* don't need the decoding context anymore */
FreeDecodingContext(ctx);
@@ -810,7 +860,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
if (!cmd->temporary)
ReplicationSlotPersist();
}
- else if (cmd->kind == REPLICATION_KIND_PHYSICAL && cmd->reserve_wal)
+ else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
{
ReplicationSlotReserveWal();
diff --git a/src/bin/pg_basebackup/streamutil.c b/src/bin/pg_basebackup/streamutil.c
index 1fe42efc215..507da5e76d1 100644
--- a/src/bin/pg_basebackup/streamutil.c
+++ b/src/bin/pg_basebackup/streamutil.c
@@ -338,8 +338,13 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" PHYSICAL",
slot_name);
else
+ {
appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"",
slot_name, plugin);
+ if (PQserverVersion(conn) >= 100000)
+ /* pg_recvlogical doesn't use an exported snapshot, so suppress */
+ appendPQExpBuffer(query, " NOEXPORT_SNAPSHOT");
+ }
res = PQexec(conn, query->data);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index f27354faaf3..996da3c02ea 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -56,7 +56,7 @@ typedef struct CreateReplicationSlotCmd
ReplicationKind kind;
char *plugin;
bool temporary;
- bool reserve_wal;
+ List *options;
} CreateReplicationSlotCmd;
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 0857bdc5566..78e577c89b1 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -183,7 +183,7 @@ typedef void (*walrcv_send_fn) (WalReceiverConn *conn, const char *buffer,
int nbytes);
typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn,
const char *slotname, bool temporary,
- XLogRecPtr *lsn);
+ bool export_snapshot, XLogRecPtr *lsn);
typedef bool (*walrcv_command_fn) (WalReceiverConn *conn, const char *cmd,
char **err);
typedef void (*walrcv_disconnect_fn) (WalReceiverConn *conn);
@@ -224,8 +224,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd)
#define walrcv_send(conn, buffer, nbytes) \
WalReceiverFunctions->walrcv_send(conn, buffer, nbytes)
-#define walrcv_create_slot(conn, slotname, temporary, lsn) \
- WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, lsn)
+#define walrcv_create_slot(conn, slotname, temporary, export_snapshot, lsn) \
+ WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, export_snapshot, lsn)
#define walrcv_command(conn, cmd, err) \
WalReceiverFunctions->walrcv_command(conn, cmd, err)
#define walrcv_disconnect(conn) \