diff options
Diffstat (limited to 'src/backend')
-rw-r--r-- | src/backend/catalog/pg_subscription.c | 9 | ||||
-rw-r--r-- | src/backend/commands/subscriptioncmds.c | 141 | ||||
-rw-r--r-- | src/backend/nodes/copyfuncs.c | 2 | ||||
-rw-r--r-- | src/backend/nodes/equalfuncs.c | 2 | ||||
-rw-r--r-- | src/backend/parser/gram.y | 47 | ||||
-rw-r--r-- | src/backend/replication/logical/worker.c | 35 |
6 files changed, 151 insertions, 85 deletions
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 22587a43b05..7dc21f10522 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -82,8 +82,10 @@ GetSubscription(Oid subid, bool missing_ok) tup, Anum_pg_subscription_subslotname, &isnull); - Assert(!isnull); - sub->slotname = pstrdup(NameStr(*DatumGetName(datum))); + if (!isnull) + sub->slotname = pstrdup(NameStr(*DatumGetName(datum))); + else + sub->slotname = NULL; /* Get synccommit */ datum = SysCacheGetAttr(SUBSCRIPTIONOID, @@ -147,7 +149,8 @@ FreeSubscription(Subscription *sub) { pfree(sub->name); pfree(sub->conninfo); - pfree(sub->slotname); + if (sub->slotname) + pfree(sub->slotname); list_free_deep(sub->publications); pfree(sub); } diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index fde9e6e20cd..b76cdc55384 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -60,7 +60,8 @@ static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); */ static void parse_subscription_options(List *options, bool *connect, bool *enabled_given, - bool *enabled, bool *create_slot, char **slot_name, + bool *enabled, bool *create_slot, + bool *slot_name_given, char **slot_name, bool *copy_data, char **synchronous_commit) { ListCell *lc; @@ -78,7 +79,10 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, if (create_slot) *create_slot = true; if (slot_name) + { + *slot_name_given = false; *slot_name = NULL; + } if (copy_data) *copy_data = true; if (synchronous_commit) @@ -141,12 +145,17 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, } else if (strcmp(defel->defname, "slot name") == 0 && slot_name) { - if (*slot_name) + if (*slot_name_given) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); + *slot_name_given = true; *slot_name = defGetString(defel); + + /* Setting slot_name = NONE is treated as no slot name. */ + if (strcmp(*slot_name, "none") == 0) + *slot_name = NULL; } else if (strcmp(defel->defname, "copy data") == 0 && copy_data) { @@ -194,17 +203,17 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, if (connect && !*connect) { /* Check for incompatible options from the user. */ - if (*enabled_given && *enabled) + if (enabled && *enabled_given && *enabled) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("noconnect and enabled are mutually exclusive options"))); - if (create_slot_given && *create_slot) + if (create_slot && create_slot_given && *create_slot) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("noconnect and create slot are mutually exclusive options"))); - if (copy_data_given && *copy_data) + if (copy_data && copy_data_given && *copy_data) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("noconnect and copy data are mutually exclusive options"))); @@ -214,6 +223,23 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, *create_slot = false; *copy_data = false; } + + /* + * Do additional checking for disallowed combination when + * slot_name = NONE was used. + */ + if (slot_name && *slot_name_given && !*slot_name) + { + if (enabled && *enabled_given && *enabled) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("slot_name = NONE and enabled are mutually exclusive options"))); + + if (create_slot && create_slot_given && *create_slot) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("slot_name = NONE and create slot are mutually exclusive options"))); + } } /* @@ -290,6 +316,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) char *synchronous_commit; char *conninfo; char *slotname; + bool slotname_given; char originname[NAMEDATALEN]; bool create_slot; List *publications; @@ -299,8 +326,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) * Connection and publication should not be specified here. */ parse_subscription_options(stmt->options, &connect, &enabled_given, - &enabled, &create_slot, &slotname, ©_data, - &synchronous_commit); + &enabled, &create_slot, &slotname_given, + &slotname, ©_data, &synchronous_commit); /* * Since creating a replication slot is not transactional, rolling back @@ -329,8 +356,9 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) stmt->subname))); } - if (slotname == NULL) + if (!slotname_given && slotname == NULL) slotname = stmt->subname; + /* The default for synchronous_commit of subscriptions is off. */ if (synchronous_commit == NULL) synchronous_commit = "off"; @@ -355,8 +383,11 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); - values[Anum_pg_subscription_subslotname - 1] = - DirectFunctionCall1(namein, CStringGetDatum(slotname)); + if (slotname) + values[Anum_pg_subscription_subslotname - 1] = + DirectFunctionCall1(namein, CStringGetDatum(slotname)); + else + nulls[Anum_pg_subscription_subslotname - 1] = true; values[Anum_pg_subscription_subsynccommit - 1] = CStringGetTextDatum(synchronous_commit); values[Anum_pg_subscription_subpublications - 1] = @@ -426,6 +457,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) */ if (create_slot) { + Assert(slotname); + walrcv_create_slot(wrconn, slotname, false, CRS_NOEXPORT_SNAPSHOT, &lsn); ereport(NOTICE, @@ -578,6 +611,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt) HeapTuple tup; Oid subid; bool update_tuple = false; + Subscription *sub; rel = heap_open(SubscriptionRelationId, RowExclusiveLock); @@ -597,6 +631,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt) stmt->subname); subid = HeapTupleGetOid(tup); + sub = GetSubscription(subid, false); /* Form a new tuple. */ memset(values, 0, sizeof(values)); @@ -607,19 +642,29 @@ AlterSubscription(AlterSubscriptionStmt *stmt) { case ALTER_SUBSCRIPTION_OPTIONS: { - char *slot_name; + char *slotname; + bool slotname_given; char *synchronous_commit; parse_subscription_options(stmt->options, NULL, NULL, NULL, - NULL, &slot_name, NULL, - &synchronous_commit); + NULL, &slotname_given, &slotname, + NULL, &synchronous_commit); - if (slot_name) + if (slotname_given) { - values[Anum_pg_subscription_subslotname - 1] = - DirectFunctionCall1(namein, CStringGetDatum(slot_name)); + if (sub->enabled && !slotname) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("cannot set slot_name = NONE for enabled subscription"))); + + if (slotname) + values[Anum_pg_subscription_subslotname - 1] = + DirectFunctionCall1(namein, CStringGetDatum(slotname)); + else + nulls[Anum_pg_subscription_subslotname - 1] = true; replaces[Anum_pg_subscription_subslotname - 1] = true; } + if (synchronous_commit) { values[Anum_pg_subscription_subsynccommit - 1] = @@ -638,9 +683,14 @@ AlterSubscription(AlterSubscriptionStmt *stmt) parse_subscription_options(stmt->options, NULL, &enabled_given, &enabled, NULL, - NULL, NULL, NULL); + NULL, NULL, NULL, NULL); Assert(enabled_given); + if (!sub->slotname && enabled) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("cannot enable subscription that does not have a slot name"))); + values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled); replaces[Anum_pg_subscription_subenabled - 1] = true; @@ -668,10 +718,10 @@ AlterSubscription(AlterSubscriptionStmt *stmt) case ALTER_SUBSCRIPTION_PUBLICATION_REFRESH: { bool copy_data; - Subscription *sub = GetSubscription(subid, false); parse_subscription_options(stmt->options, NULL, NULL, NULL, - NULL, NULL, ©_data, NULL); + NULL, NULL, NULL, ©_data, + NULL); values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(stmt->publication); @@ -682,6 +732,11 @@ AlterSubscription(AlterSubscriptionStmt *stmt) /* Refresh if user asked us to. */ if (stmt->kind == ALTER_SUBSCRIPTION_PUBLICATION_REFRESH) { + if (!sub->enabled) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions"))); + /* Make sure refresh sees the new list of publications. */ sub->publications = stmt->publication; @@ -694,10 +749,15 @@ AlterSubscription(AlterSubscriptionStmt *stmt) case ALTER_SUBSCRIPTION_REFRESH: { bool copy_data; - Subscription *sub = GetSubscription(subid, false); + + if (!sub->enabled) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions"))); parse_subscription_options(stmt->options, NULL, NULL, NULL, - NULL, NULL, ©_data, NULL); + NULL, NULL, NULL, ©_data, + NULL); AlterSubscription_refresh(sub, copy_data); @@ -751,15 +811,6 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) StringInfoData cmd; /* - * Since dropping a replication slot is not transactional, the replication - * slot stays dropped even if the transaction rolls back. So we cannot - * run DROP SUBSCRIPTION inside a transaction block if dropping the - * replication slot. - */ - if (stmt->drop_slot) - PreventTransactionChain(isTopLevel, "DROP SUBSCRIPTION ... DROP SLOT"); - - /* * Lock pg_subscription with AccessExclusiveLock to ensure * that the launcher doesn't restart new worker during dropping * the subscription @@ -817,8 +868,24 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) /* Get slotname */ datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup, Anum_pg_subscription_subslotname, &isnull); - Assert(!isnull); - slotname = pstrdup(NameStr(*DatumGetName(datum))); + if (!isnull) + slotname = pstrdup(NameStr(*DatumGetName(datum))); + else + slotname = NULL; + + /* + * Since dropping a replication slot is not transactional, the replication + * slot stays dropped even if the transaction rolls back. So we cannot + * run DROP SUBSCRIPTION inside a transaction block if dropping the + * replication slot. + * + * XXX The command name should really be something like "DROP SUBSCRIPTION + * of a subscription that is associated with a replication slot", but we + * don't have the proper facilities for that. + */ + if (slotname) + PreventTransactionChain(isTopLevel, "DROP SUBSCRIPTION"); + ObjectAddressSet(myself, SubscriptionRelationId, subid); EventTriggerSQLDropAddObject(&myself, true, true); @@ -843,8 +910,8 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) if (originid != InvalidRepOriginId) replorigin_drop(originid); - /* If the user asked to not drop the slot, we are done mow.*/ - if (!stmt->drop_slot) + /* If there is no slot associated with the subscription, we can finish here. */ + if (!slotname) { heap_close(rel, NoLock); return; @@ -857,14 +924,16 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) load_file("libpqwalreceiver", false); initStringInfo(&cmd); - appendStringInfo(&cmd, "DROP_REPLICATION_SLOT \"%s\"", slotname); + appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s", quote_identifier(slotname)); wrconn = walrcv_connect(conninfo, true, subname, &err); if (wrconn == NULL) ereport(ERROR, (errmsg("could not connect to publisher when attempting to " "drop the replication slot \"%s\"", slotname), - errdetail("The error was: %s", err))); + errdetail("The error was: %s", err), + errhint("Use ALTER SUBSCRIPTION ... WITH (slot_name = NONE) " + "to disassociate the subscription from the slot."))); PG_TRY(); { diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 35a237a0008..2d2a9d00b7b 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -4537,8 +4537,8 @@ _copyDropSubscriptionStmt(const DropSubscriptionStmt *from) DropSubscriptionStmt *newnode = makeNode(DropSubscriptionStmt); COPY_STRING_FIELD(subname); - COPY_SCALAR_FIELD(drop_slot); COPY_SCALAR_FIELD(missing_ok); + COPY_SCALAR_FIELD(behavior); return newnode; } diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index 21dfbb0d752..b5459cd7260 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -2246,8 +2246,8 @@ _equalDropSubscriptionStmt(const DropSubscriptionStmt *a, const DropSubscriptionStmt *b) { COMPARE_STRING_FIELD(subname); - COMPARE_SCALAR_FIELD(drop_slot); COMPARE_SCALAR_FIELD(missing_ok); + COMPARE_SCALAR_FIELD(behavior); return true; } diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 2cad8b25b8a..65c004c5096 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -415,7 +415,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type <fun_param_mode> arg_class %type <typnam> func_return func_type -%type <boolean> opt_trusted opt_restart_seqs opt_drop_slot +%type <boolean> opt_trusted opt_restart_seqs %type <ival> OptTemp %type <ival> OptNoLog %type <oncommit> OnCommitOption @@ -467,7 +467,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type <node> def_arg columnElem where_clause where_or_current_clause a_expr b_expr c_expr AexprConst indirection_el opt_slice_bound columnref in_expr having_clause func_table xmltable array_expr - ExclusionWhereClause + ExclusionWhereClause operator_def_arg %type <list> rowsfrom_item rowsfrom_list opt_col_def_list %type <boolean> opt_ordinality %type <list> ExclusionConstraintList ExclusionConstraintElem @@ -671,7 +671,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); SAVEPOINT SCHEMA SCHEMAS SCROLL SEARCH SECOND_P SECURITY SELECT SEQUENCE SEQUENCES SERIALIZABLE SERVER SESSION SESSION_USER SET SETS SETOF SHARE SHOW - SIMILAR SIMPLE SKIP SLOT SMALLINT SNAPSHOT SOME SQL_P STABLE STANDALONE_P + SIMILAR SIMPLE SKIP SMALLINT SNAPSHOT SOME SQL_P STABLE STANDALONE_P START STATEMENT STATISTICS STDIN STDOUT STORAGE STRICT_P STRIP_P SUBSCRIPTION SUBSTRING SYMMETRIC SYSID SYSTEM_P @@ -5694,6 +5694,7 @@ def_arg: func_type { $$ = (Node *)$1; } | qual_all_Op { $$ = (Node *)$1; } | NumericOnly { $$ = (Node *)$1; } | Sconst { $$ = (Node *)makeString($1); } + | NONE { $$ = (Node *)makeString(pstrdup($1)); } ; old_aggr_definition: '(' old_aggr_list ')' { $$ = $2; } @@ -8933,8 +8934,17 @@ operator_def_list: operator_def_elem { $$ = list_make1($1); } operator_def_elem: ColLabel '=' NONE { $$ = makeDefElem($1, NULL, @1); } - | ColLabel '=' def_arg - { $$ = makeDefElem($1, (Node *) $3, @1); } + | ColLabel '=' operator_def_arg + { $$ = makeDefElem($1, (Node *) $3, @1); } + ; + +/* must be similar enough to def_arg to avoid reduce/reduce conflicts */ +operator_def_arg: + func_type { $$ = (Node *)$1; } + | reserved_keyword { $$ = (Node *)makeString(pstrdup($1)); } + | qual_all_Op { $$ = (Node *)$1; } + | NumericOnly { $$ = (Node *)$1; } + | Sconst { $$ = (Node *)makeString($1); } ; /***************************************************************************** @@ -9324,42 +9334,24 @@ AlterSubscriptionStmt: * *****************************************************************************/ -DropSubscriptionStmt: DROP SUBSCRIPTION name opt_drop_slot +DropSubscriptionStmt: DROP SUBSCRIPTION name opt_drop_behavior { DropSubscriptionStmt *n = makeNode(DropSubscriptionStmt); n->subname = $3; - n->drop_slot = $4; n->missing_ok = false; + n->behavior = $4; $$ = (Node *) n; } - | DROP SUBSCRIPTION IF_P EXISTS name opt_drop_slot + | DROP SUBSCRIPTION IF_P EXISTS name opt_drop_behavior { DropSubscriptionStmt *n = makeNode(DropSubscriptionStmt); n->subname = $5; - n->drop_slot = $6; n->missing_ok = true; + n->behavior = $6; $$ = (Node *) n; } ; -opt_drop_slot: - DROP SLOT - { - $$ = TRUE; - } - | IDENT SLOT - { - if (strcmp($1, "nodrop") == 0) - $$ = FALSE; - else - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("unrecognized option \"%s\"", $1), - parser_errposition(@1))); - } - | /*EMPTY*/ { $$ = TRUE; } - ; - /***************************************************************************** * * QUERY: Define Rewrite Rule @@ -14846,7 +14838,6 @@ unreserved_keyword: | SHOW | SIMPLE | SKIP - | SLOT | SNAPSHOT | SQL_P | STABLE diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index a61240ceee7..362de12457b 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -1329,6 +1329,22 @@ reread_subscription(void) } /* + * Exit if the subscription was disabled. + * This normally should not happen as the worker gets killed + * during ALTER SUBSCRIPTION ... DISABLE. + */ + if (!newsub->enabled) + { + ereport(LOG, + (errmsg("logical replication worker for subscription \"%s\" will " + "stop because the subscription was disabled", + MySubscription->name))); + + walrcv_disconnect(wrconn); + proc_exit(0); + } + + /* * Exit if connection string was changed. The launcher will start * new worker. */ @@ -1358,6 +1374,9 @@ reread_subscription(void) proc_exit(0); } + /* !slotname should never happen when enabled is true. */ + Assert(newsub->slotname); + /* * We need to make new connection to new slot if slot name has changed * so exit here as well if that's the case. @@ -1388,22 +1407,6 @@ reread_subscription(void) proc_exit(0); } - /* - * Exit if the subscription was disabled. - * This normally should not happen as the worker gets killed - * during ALTER SUBSCRIPTION ... DISABLE. - */ - if (!newsub->enabled) - { - ereport(LOG, - (errmsg("logical replication worker for subscription \"%s\" will " - "stop because the subscription was disabled", - MySubscription->name))); - - walrcv_disconnect(wrconn); - proc_exit(0); - } - /* Check for other changes that should never happen too. */ if (newsub->dbid != MySubscription->dbid) { |