diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/commands/subscriptioncmds.c | 6 | ||||
-rw-r--r-- | src/backend/replication/libpqwalreceiver/libpqwalreceiver.c | 15 | ||||
-rw-r--r-- | src/backend/replication/repl_gram.y | 43 | ||||
-rw-r--r-- | src/backend/replication/repl_scanner.l | 2 | ||||
-rw-r--r-- | src/backend/replication/walsender.c | 58 | ||||
-rw-r--r-- | src/bin/pg_basebackup/streamutil.c | 5 | ||||
-rw-r--r-- | src/include/nodes/replnodes.h | 2 | ||||
-rw-r--r-- | src/include/replication/walreceiver.h | 6 |
8 files changed, 115 insertions, 22 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 1868bf5f9ee..0198e6d75ba 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -314,7 +314,11 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) PG_TRY(); { - walrcv_create_slot(wrconn, slotname, false, &lsn); + /* + * Create permanent slot for the subscription. We won't use the + * initial snapshot for anything, so no need to export it. + */ + walrcv_create_slot(wrconn, slotname, false, false, &lsn); ereport(NOTICE, (errmsg("created replication slot \"%s\" on publisher", slotname))); diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index ebadf3680f6..cd2e57867c0 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -68,6 +68,7 @@ static void libpqrcv_send(WalReceiverConn *conn, const char *buffer, static char *libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, bool temporary, + bool export_snapshot, XLogRecPtr *lsn); static bool libpqrcv_command(WalReceiverConn *conn, const char *cmd, char **err); @@ -720,7 +721,7 @@ libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes) */ static char * libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, - bool temporary, XLogRecPtr *lsn) + bool temporary, bool export_snapshot, XLogRecPtr *lsn) { PGresult *res; StringInfoData cmd; @@ -728,13 +729,19 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, initStringInfo(&cmd); - appendStringInfo(&cmd, "CREATE_REPLICATION_SLOT \"%s\" ", slotname); + appendStringInfo(&cmd, "CREATE_REPLICATION_SLOT \"%s\"", slotname); if (temporary) - appendStringInfo(&cmd, "TEMPORARY "); + appendStringInfo(&cmd, " TEMPORARY"); if (conn->logical) - appendStringInfo(&cmd, "LOGICAL pgoutput"); + { + appendStringInfo(&cmd, " LOGICAL pgoutput"); + if (export_snapshot) + appendStringInfo(&cmd, " EXPORT_SNAPSHOT"); + else + appendStringInfo(&cmd, " NOEXPORT_SNAPSHOT"); + } res = libpqrcv_PQexec(conn->streamConn, cmd.data); pfree(cmd.data); diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y index b35d0f0cd1a..f1e43bc9f3d 100644 --- a/src/backend/replication/repl_gram.y +++ b/src/backend/replication/repl_gram.y @@ -79,6 +79,8 @@ Node *replication_parse_result; %token K_SLOT %token K_RESERVE_WAL %token K_TEMPORARY +%token K_EXPORT_SNAPSHOT +%token K_NOEXPORT_SNAPSHOT %type <node> command %type <node> base_backup start_replication start_logical_replication @@ -91,7 +93,9 @@ Node *replication_parse_result; %type <defelt> plugin_opt_elem %type <node> plugin_opt_arg %type <str> opt_slot var_name -%type <boolval> opt_reserve_wal opt_temporary +%type <boolval> opt_temporary +%type <list> create_slot_opt_list +%type <defelt> create_slot_opt %% @@ -202,18 +206,18 @@ base_backup_opt: create_replication_slot: /* CREATE_REPLICATION_SLOT slot TEMPORARY PHYSICAL RESERVE_WAL */ - K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_PHYSICAL opt_reserve_wal + K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_PHYSICAL create_slot_opt_list { CreateReplicationSlotCmd *cmd; cmd = makeNode(CreateReplicationSlotCmd); cmd->kind = REPLICATION_KIND_PHYSICAL; cmd->slotname = $2; cmd->temporary = $3; - cmd->reserve_wal = $5; + cmd->options = $5; $$ = (Node *) cmd; } /* CREATE_REPLICATION_SLOT slot TEMPORARY LOGICAL plugin */ - | K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_LOGICAL IDENT + | K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_LOGICAL IDENT create_slot_opt_list { CreateReplicationSlotCmd *cmd; cmd = makeNode(CreateReplicationSlotCmd); @@ -221,10 +225,36 @@ create_replication_slot: cmd->slotname = $2; cmd->temporary = $3; cmd->plugin = $5; + cmd->options = $6; $$ = (Node *) cmd; } ; +create_slot_opt_list: + create_slot_opt_list create_slot_opt + { $$ = lappend($1, $2); } + | /* EMPTY */ + { $$ = NIL; } + ; + +create_slot_opt: + K_EXPORT_SNAPSHOT + { + $$ = makeDefElem("export_snapshot", + (Node *)makeInteger(TRUE), -1); + } + | K_NOEXPORT_SNAPSHOT + { + $$ = makeDefElem("export_snapshot", + (Node *)makeInteger(FALSE), -1); + } + | K_RESERVE_WAL + { + $$ = makeDefElem("reserve_wal", + (Node *)makeInteger(TRUE), -1); + } + ; + /* DROP_REPLICATION_SLOT slot */ drop_replication_slot: K_DROP_REPLICATION_SLOT IDENT @@ -291,11 +321,6 @@ opt_physical: | /* EMPTY */ ; -opt_reserve_wal: - K_RESERVE_WAL { $$ = true; } - | /* EMPTY */ { $$ = false; } - ; - opt_temporary: K_TEMPORARY { $$ = true; } | /* EMPTY */ { $$ = false; } diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l index 37f857925e4..f56d41d59c7 100644 --- a/src/backend/replication/repl_scanner.l +++ b/src/backend/replication/repl_scanner.l @@ -100,6 +100,8 @@ RESERVE_WAL { return K_RESERVE_WAL; } LOGICAL { return K_LOGICAL; } SLOT { return K_SLOT; } TEMPORARY { return K_TEMPORARY; } +EXPORT_SNAPSHOT { return K_EXPORT_SNAPSHOT; } +NOEXPORT_SNAPSHOT { return K_NOEXPORT_SNAPSHOT; } "," { return ','; } ";" { return ';'; } diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index dd3a936fc68..127efecb27d 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -51,6 +51,7 @@ #include "catalog/pg_type.h" #include "commands/dbcommands.h" +#include "commands/defrem.h" #include "funcapi.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" @@ -738,6 +739,48 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req } /* + * Process extra options given to CREATE_REPLICATION_SLOT. + */ +static void +parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, + bool *reserve_wal, + bool *export_snapshot) +{ + ListCell *lc; + bool snapshot_action_given = false; + bool reserve_wal_given = false; + + /* Parse options */ + foreach (lc, cmd->options) + { + DefElem *defel = (DefElem *) lfirst(lc); + + if (strcmp(defel->defname, "export_snapshot") == 0) + { + if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + + snapshot_action_given = true; + *export_snapshot = defGetBoolean(defel); + } + else if (strcmp(defel->defname, "reserve_wal") == 0) + { + if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + + reserve_wal_given = true; + *reserve_wal = true; + } + else + elog(ERROR, "unrecognized option: %s", defel->defname); + } +} + +/* * Create a new replication slot. */ static void @@ -746,6 +789,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) const char *snapshot_name = NULL; char xpos[MAXFNAMELEN]; char *slot_name; + bool reserve_wal = false; + bool export_snapshot = true; DestReceiver *dest; TupOutputState *tstate; TupleDesc tupdesc; @@ -754,6 +799,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) Assert(!MyReplicationSlot); + parseCreateReplSlotOptions(cmd, &reserve_wal, &export_snapshot); + /* setup state for XLogReadPage */ sendTimeLineIsHistoric = false; sendTimeLine = ThisTimeLineID; @@ -799,10 +846,13 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) DecodingContextFindStartpoint(ctx); /* - * Export a plain (not of the snapbuild.c type) snapshot to the user - * that can be imported into another session. + * Export the snapshot if we've been asked to do so. + * + * NB. We will convert the snapbuild.c kind of snapshot to normal + * snapshot when doing this. */ - snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder); + if (export_snapshot) + snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder); /* don't need the decoding context anymore */ FreeDecodingContext(ctx); @@ -810,7 +860,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) if (!cmd->temporary) ReplicationSlotPersist(); } - else if (cmd->kind == REPLICATION_KIND_PHYSICAL && cmd->reserve_wal) + else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal) { ReplicationSlotReserveWal(); diff --git a/src/bin/pg_basebackup/streamutil.c b/src/bin/pg_basebackup/streamutil.c index 1fe42efc215..507da5e76d1 100644 --- a/src/bin/pg_basebackup/streamutil.c +++ b/src/bin/pg_basebackup/streamutil.c @@ -338,8 +338,13 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin, appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" PHYSICAL", slot_name); else + { appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"", slot_name, plugin); + if (PQserverVersion(conn) >= 100000) + /* pg_recvlogical doesn't use an exported snapshot, so suppress */ + appendPQExpBuffer(query, " NOEXPORT_SNAPSHOT"); + } res = PQexec(conn, query->data); if (PQresultStatus(res) != PGRES_TUPLES_OK) diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h index f27354faaf3..996da3c02ea 100644 --- a/src/include/nodes/replnodes.h +++ b/src/include/nodes/replnodes.h @@ -56,7 +56,7 @@ typedef struct CreateReplicationSlotCmd ReplicationKind kind; char *plugin; bool temporary; - bool reserve_wal; + List *options; } CreateReplicationSlotCmd; diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 0857bdc5566..78e577c89b1 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -183,7 +183,7 @@ typedef void (*walrcv_send_fn) (WalReceiverConn *conn, const char *buffer, int nbytes); typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn, const char *slotname, bool temporary, - XLogRecPtr *lsn); + bool export_snapshot, XLogRecPtr *lsn); typedef bool (*walrcv_command_fn) (WalReceiverConn *conn, const char *cmd, char **err); typedef void (*walrcv_disconnect_fn) (WalReceiverConn *conn); @@ -224,8 +224,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd) #define walrcv_send(conn, buffer, nbytes) \ WalReceiverFunctions->walrcv_send(conn, buffer, nbytes) -#define walrcv_create_slot(conn, slotname, temporary, lsn) \ - WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, lsn) +#define walrcv_create_slot(conn, slotname, temporary, export_snapshot, lsn) \ + WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, export_snapshot, lsn) #define walrcv_command(conn, cmd, err) \ WalReceiverFunctions->walrcv_command(conn, cmd, err) #define walrcv_disconnect(conn) \ |