diff options
Diffstat (limited to 'src/backend/commands/subscriptioncmds.c')
-rw-r--r-- | src/backend/commands/subscriptioncmds.c | 73 |
1 files changed, 73 insertions, 0 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 3922658bbca..e16f04626de 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -45,6 +45,7 @@ #include "utils/guc.h" #include "utils/lsyscache.h" #include "utils/memutils.h" +#include "utils/pg_lsn.h" #include "utils/syscache.h" /* @@ -62,6 +63,7 @@ #define SUBOPT_STREAMING 0x00000100 #define SUBOPT_TWOPHASE_COMMIT 0x00000200 #define SUBOPT_DISABLE_ON_ERR 0x00000400 +#define SUBOPT_LSN 0x00000800 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -84,6 +86,7 @@ typedef struct SubOpts bool streaming; bool twophase; bool disableonerr; + XLogRecPtr lsn; } SubOpts; static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); @@ -262,6 +265,33 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_DISABLE_ON_ERR; opts->disableonerr = defGetBoolean(defel); } + else if (IsSet(supported_opts, SUBOPT_LSN) && + strcmp(defel->defname, "lsn") == 0) + { + char *lsn_str = defGetString(defel); + XLogRecPtr lsn; + + if (IsSet(opts->specified_opts, SUBOPT_LSN)) + errorConflictingDefElem(defel, pstate); + + /* Setting lsn = NONE is treated as resetting LSN */ + if (strcmp(lsn_str, "none") == 0) + lsn = InvalidXLogRecPtr; + else + { + /* Parse the argument as LSN */ + lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in, + CStringGetDatum(lsn_str))); + + if (XLogRecPtrIsInvalid(lsn)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid WAL location (LSN): %s", lsn_str))); + } + + opts->specified_opts |= SUBOPT_LSN; + opts->lsn = lsn; + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -479,6 +509,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, LOGICALREP_TWOPHASE_STATE_PENDING : LOGICALREP_TWOPHASE_STATE_DISABLED); values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr); + values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (opts.slot_name) @@ -1106,6 +1137,48 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, break; } + case ALTER_SUBSCRIPTION_SKIP: + { + parse_subscription_options(pstate, stmt->options, SUBOPT_LSN, &opts); + + /* ALTER SUBSCRIPTION ... SKIP supports only LSN option */ + Assert(IsSet(opts.specified_opts, SUBOPT_LSN)); + + if (!superuser()) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("must be superuser to skip transaction"))); + + /* + * If the user sets subskiplsn, we do a sanity check to make + * sure that the specified LSN is a probable value. + */ + if (!XLogRecPtrIsInvalid(opts.lsn)) + { + RepOriginId originid; + char originname[NAMEDATALEN]; + XLogRecPtr remote_lsn; + + snprintf(originname, sizeof(originname), "pg_%u", subid); + originid = replorigin_by_name(originname, false); + remote_lsn = replorigin_get_progress(originid, false); + + /* Check the given LSN is at least a future LSN */ + if (!XLogRecPtrIsInvalid(remote_lsn) && opts.lsn < remote_lsn) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("skip WAL location (LSN %X/%X) must be greater than origin LSN %X/%X", + LSN_FORMAT_ARGS(opts.lsn), + LSN_FORMAT_ARGS(remote_lsn)))); + } + + values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(opts.lsn); + replaces[Anum_pg_subscription_subskiplsn - 1] = true; + + update_tuple = true; + break; + } + default: elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d", stmt->kind); |