diff options
author | Peter Eisentraut <peter_e@gmx.net> | 2017-05-09 10:20:42 -0400 |
---|---|---|
committer | Peter Eisentraut <peter_e@gmx.net> | 2017-05-09 10:20:42 -0400 |
commit | 013c1178fd0adefa0f68d5ce2d84e7ae6f9613a1 (patch) | |
tree | c1d81aeda68bec07b67c317617b40cbc8ea2e925 /src/backend | |
parent | c4c493fd3581dfbce45e903b87e12eea508f47e4 (diff) | |
download | postgresql-013c1178fd0adefa0f68d5ce2d84e7ae6f9613a1.tar.gz postgresql-013c1178fd0adefa0f68d5ce2d84e7ae6f9613a1.zip |
Remove the NODROP SLOT option from DROP SUBSCRIPTION
It turned out this approach had problems, because a DROP command should
not have any options other than CASCADE and RESTRICT. Instead, always
attempt to drop the slot if there is one configured, but also add an
ALTER SUBSCRIPTION action to set the slot to NONE.
Author: Petr Jelinek <petr.jelinek@2ndquadrant.com>
Reported-by: Tom Lane <tgl@sss.pgh.pa.us>
Discussion: https://www.postgresql.org/message-id/29431.1493730652@sss.pgh.pa.us
Diffstat (limited to 'src/backend')
-rw-r--r-- | src/backend/catalog/pg_subscription.c | 9 | ||||
-rw-r--r-- | src/backend/commands/subscriptioncmds.c | 141 | ||||
-rw-r--r-- | src/backend/nodes/copyfuncs.c | 2 | ||||
-rw-r--r-- | src/backend/nodes/equalfuncs.c | 2 | ||||
-rw-r--r-- | src/backend/parser/gram.y | 47 | ||||
-rw-r--r-- | src/backend/replication/logical/worker.c | 35 |
6 files changed, 151 insertions, 85 deletions
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 22587a43b05..7dc21f10522 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -82,8 +82,10 @@ GetSubscription(Oid subid, bool missing_ok) tup, Anum_pg_subscription_subslotname, &isnull); - Assert(!isnull); - sub->slotname = pstrdup(NameStr(*DatumGetName(datum))); + if (!isnull) + sub->slotname = pstrdup(NameStr(*DatumGetName(datum))); + else + sub->slotname = NULL; /* Get synccommit */ datum = SysCacheGetAttr(SUBSCRIPTIONOID, @@ -147,7 +149,8 @@ FreeSubscription(Subscription *sub) { pfree(sub->name); pfree(sub->conninfo); - pfree(sub->slotname); + if (sub->slotname) + pfree(sub->slotname); list_free_deep(sub->publications); pfree(sub); } diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index fde9e6e20cd..b76cdc55384 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -60,7 +60,8 @@ static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); */ static void parse_subscription_options(List *options, bool *connect, bool *enabled_given, - bool *enabled, bool *create_slot, char **slot_name, + bool *enabled, bool *create_slot, + bool *slot_name_given, char **slot_name, bool *copy_data, char **synchronous_commit) { ListCell *lc; @@ -78,7 +79,10 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, if (create_slot) *create_slot = true; if (slot_name) + { + *slot_name_given = false; *slot_name = NULL; + } if (copy_data) *copy_data = true; if (synchronous_commit) @@ -141,12 +145,17 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, } else if (strcmp(defel->defname, "slot name") == 0 && slot_name) { - if (*slot_name) + if (*slot_name_given) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); + *slot_name_given = true; *slot_name = defGetString(defel); + + /* Setting slot_name = NONE is treated as no slot name. */ + if (strcmp(*slot_name, "none") == 0) + *slot_name = NULL; } else if (strcmp(defel->defname, "copy data") == 0 && copy_data) { @@ -194,17 +203,17 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, if (connect && !*connect) { /* Check for incompatible options from the user. */ - if (*enabled_given && *enabled) + if (enabled && *enabled_given && *enabled) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("noconnect and enabled are mutually exclusive options"))); - if (create_slot_given && *create_slot) + if (create_slot && create_slot_given && *create_slot) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("noconnect and create slot are mutually exclusive options"))); - if (copy_data_given && *copy_data) + if (copy_data && copy_data_given && *copy_data) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("noconnect and copy data are mutually exclusive options"))); @@ -214,6 +223,23 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, *create_slot = false; *copy_data = false; } + + /* + * Do additional checking for disallowed combination when + * slot_name = NONE was used. + */ + if (slot_name && *slot_name_given && !*slot_name) + { + if (enabled && *enabled_given && *enabled) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("slot_name = NONE and enabled are mutually exclusive options"))); + + if (create_slot && create_slot_given && *create_slot) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("slot_name = NONE and create slot are mutually exclusive options"))); + } } /* @@ -290,6 +316,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) char *synchronous_commit; char *conninfo; char *slotname; + bool slotname_given; char originname[NAMEDATALEN]; bool create_slot; List *publications; @@ -299,8 +326,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) * Connection and publication should not be specified here. */ parse_subscription_options(stmt->options, &connect, &enabled_given, - &enabled, &create_slot, &slotname, ©_data, - &synchronous_commit); + &enabled, &create_slot, &slotname_given, + &slotname, ©_data, &synchronous_commit); /* * Since creating a replication slot is not transactional, rolling back @@ -329,8 +356,9 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) stmt->subname))); } - if (slotname == NULL) + if (!slotname_given && slotname == NULL) slotname = stmt->subname; + /* The default for synchronous_commit of subscriptions is off. */ if (synchronous_commit == NULL) synchronous_commit = "off"; @@ -355,8 +383,11 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); - values[Anum_pg_subscription_subslotname - 1] = - DirectFunctionCall1(namein, CStringGetDatum(slotname)); + if (slotname) + values[Anum_pg_subscription_subslotname - 1] = + DirectFunctionCall1(namein, CStringGetDatum(slotname)); + else + nulls[Anum_pg_subscription_subslotname - 1] = true; values[Anum_pg_subscription_subsynccommit - 1] = CStringGetTextDatum(synchronous_commit); values[Anum_pg_subscription_subpublications - 1] = @@ -426,6 +457,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) */ if (create_slot) { + Assert(slotname); + walrcv_create_slot(wrconn, slotname, false, CRS_NOEXPORT_SNAPSHOT, &lsn); ereport(NOTICE, @@ -578,6 +611,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt) HeapTuple tup; Oid subid; bool update_tuple = false; + Subscription *sub; rel = heap_open(SubscriptionRelationId, RowExclusiveLock); @@ -597,6 +631,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt) stmt->subname); subid = HeapTupleGetOid(tup); + sub = GetSubscription(subid, false); /* Form a new tuple. */ memset(values, 0, sizeof(values)); @@ -607,19 +642,29 @@ AlterSubscription(AlterSubscriptionStmt *stmt) { case ALTER_SUBSCRIPTION_OPTIONS: { - char *slot_name; + char *slotname; + bool slotname_given; char *synchronous_commit; parse_subscription_options(stmt->options, NULL, NULL, NULL, - NULL, &slot_name, NULL, - &synchronous_commit); + NULL, &slotname_given, &slotname, + NULL, &synchronous_commit); - if (slot_name) + if (slotname_given) { - values[Anum_pg_subscription_subslotname - 1] = - DirectFunctionCall1(namein, CStringGetDatum(slot_name)); + if (sub->enabled && !slotname) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("cannot set slot_name = NONE for enabled subscription"))); + + if (slotname) + values[Anum_pg_subscription_subslotname - 1] = + DirectFunctionCall1(namein, CStringGetDatum(slotname)); + else + nulls[Anum_pg_subscription_subslotname - 1] = true; replaces[Anum_pg_subscription_subslotname - 1] = true; } + if (synchronous_commit) { values[Anum_pg_subscription_subsynccommit - 1] = @@ -638,9 +683,14 @@ AlterSubscription(AlterSubscriptionStmt *stmt) parse_subscription_options(stmt->options, NULL, &enabled_given, &enabled, NULL, - NULL, NULL, NULL); + NULL, NULL, NULL, NULL); Assert(enabled_given); + if (!sub->slotname && enabled) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("cannot enable subscription that does not have a slot name"))); + values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled); replaces[Anum_pg_subscription_subenabled - 1] = true; @@ -668,10 +718,10 @@ AlterSubscription(AlterSubscriptionStmt *stmt) case ALTER_SUBSCRIPTION_PUBLICATION_REFRESH: { bool copy_data; - Subscription *sub = GetSubscription(subid, false); parse_subscription_options(stmt->options, NULL, NULL, NULL, - NULL, NULL, ©_data, NULL); + NULL, NULL, NULL, ©_data, + NULL); values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(stmt->publication); @@ -682,6 +732,11 @@ AlterSubscription(AlterSubscriptionStmt *stmt) /* Refresh if user asked us to. */ if (stmt->kind == ALTER_SUBSCRIPTION_PUBLICATION_REFRESH) { + if (!sub->enabled) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions"))); + /* Make sure refresh sees the new list of publications. */ sub->publications = stmt->publication; @@ -694,10 +749,15 @@ AlterSubscription(AlterSubscriptionStmt *stmt) case ALTER_SUBSCRIPTION_REFRESH: { bool copy_data; - Subscription *sub = GetSubscription(subid, false); + + if (!sub->enabled) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions"))); parse_subscription_options(stmt->options, NULL, NULL, NULL, - NULL, NULL, ©_data, NULL); + NULL, NULL, NULL, ©_data, + NULL); AlterSubscription_refresh(sub, copy_data); @@ -751,15 +811,6 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) StringInfoData cmd; /* - * Since dropping a replication slot is not transactional, the replication - * slot stays dropped even if the transaction rolls back. So we cannot - * run DROP SUBSCRIPTION inside a transaction block if dropping the - * replication slot. - */ - if (stmt->drop_slot) - PreventTransactionChain(isTopLevel, "DROP SUBSCRIPTION ... DROP SLOT"); - - /* * Lock pg_subscription with AccessExclusiveLock to ensure * that the launcher doesn't restart new worker during dropping * the subscription @@ -817,8 +868,24 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) /* Get slotname */ datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup, Anum_pg_subscription_subslotname, &isnull); - Assert(!isnull); - slotname = pstrdup(NameStr(*DatumGetName(datum))); + if (!isnull) + slotname = pstrdup(NameStr(*DatumGetName(datum))); + else + slotname = NULL; + + /* + * Since dropping a replication slot is not transactional, the replication + * slot stays dropped even if the transaction rolls back. So we cannot + * run DROP SUBSCRIPTION inside a transaction block if dropping the + * replication slot. + * + * XXX The command name should really be something like "DROP SUBSCRIPTION + * of a subscription that is associated with a replication slot", but we + * don't have the proper facilities for that. + */ + if (slotname) + PreventTransactionChain(isTopLevel, "DROP SUBSCRIPTION"); + ObjectAddressSet(myself, SubscriptionRelationId, subid); EventTriggerSQLDropAddObject(&myself, true, true); @@ -843,8 +910,8 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) if (originid != InvalidRepOriginId) replorigin_drop(originid); - /* If the user asked to not drop the slot, we are done mow.*/ - if (!stmt->drop_slot) + /* If there is no slot associated with the subscription, we can finish here. */ + if (!slotname) { heap_close(rel, NoLock); return; @@ -857,14 +924,16 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) load_file("libpqwalreceiver", false); initStringInfo(&cmd); - appendStringInfo(&cmd, "DROP_REPLICATION_SLOT \"%s\"", slotname); + appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s", quote_identifier(slotname)); wrconn = walrcv_connect(conninfo, true, subname, &err); if (wrconn == NULL) ereport(ERROR, (errmsg("could not connect to publisher when attempting to " "drop the replication slot \"%s\"", slotname), - errdetail("The error was: %s", err))); + errdetail("The error was: %s", err), + errhint("Use ALTER SUBSCRIPTION ... WITH (slot_name = NONE) " + "to disassociate the subscription from the slot."))); PG_TRY(); { diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 35a237a0008..2d2a9d00b7b 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -4537,8 +4537,8 @@ _copyDropSubscriptionStmt(const DropSubscriptionStmt *from) DropSubscriptionStmt *newnode = makeNode(DropSubscriptionStmt); COPY_STRING_FIELD(subname); - COPY_SCALAR_FIELD(drop_slot); COPY_SCALAR_FIELD(missing_ok); + COPY_SCALAR_FIELD(behavior); return newnode; } diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index 21dfbb0d752..b5459cd7260 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -2246,8 +2246,8 @@ _equalDropSubscriptionStmt(const DropSubscriptionStmt *a, const DropSubscriptionStmt *b) { COMPARE_STRING_FIELD(subname); - COMPARE_SCALAR_FIELD(drop_slot); COMPARE_SCALAR_FIELD(missing_ok); + COMPARE_SCALAR_FIELD(behavior); return true; } diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 2cad8b25b8a..65c004c5096 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -415,7 +415,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type <fun_param_mode> arg_class %type <typnam> func_return func_type -%type <boolean> opt_trusted opt_restart_seqs opt_drop_slot +%type <boolean> opt_trusted opt_restart_seqs %type <ival> OptTemp %type <ival> OptNoLog %type <oncommit> OnCommitOption @@ -467,7 +467,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type <node> def_arg columnElem where_clause where_or_current_clause a_expr b_expr c_expr AexprConst indirection_el opt_slice_bound columnref in_expr having_clause func_table xmltable array_expr - ExclusionWhereClause + ExclusionWhereClause operator_def_arg %type <list> rowsfrom_item rowsfrom_list opt_col_def_list %type <boolean> opt_ordinality %type <list> ExclusionConstraintList ExclusionConstraintElem @@ -671,7 +671,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); SAVEPOINT SCHEMA SCHEMAS SCROLL SEARCH SECOND_P SECURITY SELECT SEQUENCE SEQUENCES SERIALIZABLE SERVER SESSION SESSION_USER SET SETS SETOF SHARE SHOW - SIMILAR SIMPLE SKIP SLOT SMALLINT SNAPSHOT SOME SQL_P STABLE STANDALONE_P + SIMILAR SIMPLE SKIP SMALLINT SNAPSHOT SOME SQL_P STABLE STANDALONE_P START STATEMENT STATISTICS STDIN STDOUT STORAGE STRICT_P STRIP_P SUBSCRIPTION SUBSTRING SYMMETRIC SYSID SYSTEM_P @@ -5694,6 +5694,7 @@ def_arg: func_type { $$ = (Node *)$1; } | qual_all_Op { $$ = (Node *)$1; } | NumericOnly { $$ = (Node *)$1; } | Sconst { $$ = (Node *)makeString($1); } + | NONE { $$ = (Node *)makeString(pstrdup($1)); } ; old_aggr_definition: '(' old_aggr_list ')' { $$ = $2; } @@ -8933,8 +8934,17 @@ operator_def_list: operator_def_elem { $$ = list_make1($1); } operator_def_elem: ColLabel '=' NONE { $$ = makeDefElem($1, NULL, @1); } - | ColLabel '=' def_arg - { $$ = makeDefElem($1, (Node *) $3, @1); } + | ColLabel '=' operator_def_arg + { $$ = makeDefElem($1, (Node *) $3, @1); } + ; + +/* must be similar enough to def_arg to avoid reduce/reduce conflicts */ +operator_def_arg: + func_type { $$ = (Node *)$1; } + | reserved_keyword { $$ = (Node *)makeString(pstrdup($1)); } + | qual_all_Op { $$ = (Node *)$1; } + | NumericOnly { $$ = (Node *)$1; } + | Sconst { $$ = (Node *)makeString($1); } ; /***************************************************************************** @@ -9324,42 +9334,24 @@ AlterSubscriptionStmt: * *****************************************************************************/ -DropSubscriptionStmt: DROP SUBSCRIPTION name opt_drop_slot +DropSubscriptionStmt: DROP SUBSCRIPTION name opt_drop_behavior { DropSubscriptionStmt *n = makeNode(DropSubscriptionStmt); n->subname = $3; - n->drop_slot = $4; n->missing_ok = false; + n->behavior = $4; $$ = (Node *) n; } - | DROP SUBSCRIPTION IF_P EXISTS name opt_drop_slot + | DROP SUBSCRIPTION IF_P EXISTS name opt_drop_behavior { DropSubscriptionStmt *n = makeNode(DropSubscriptionStmt); n->subname = $5; - n->drop_slot = $6; n->missing_ok = true; + n->behavior = $6; $$ = (Node *) n; } ; -opt_drop_slot: - DROP SLOT - { - $$ = TRUE; - } - | IDENT SLOT - { - if (strcmp($1, "nodrop") == 0) - $$ = FALSE; - else - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("unrecognized option \"%s\"", $1), - parser_errposition(@1))); - } - | /*EMPTY*/ { $$ = TRUE; } - ; - /***************************************************************************** * * QUERY: Define Rewrite Rule @@ -14846,7 +14838,6 @@ unreserved_keyword: | SHOW | SIMPLE | SKIP - | SLOT | SNAPSHOT | SQL_P | STABLE diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index a61240ceee7..362de12457b 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -1329,6 +1329,22 @@ reread_subscription(void) } /* + * Exit if the subscription was disabled. + * This normally should not happen as the worker gets killed + * during ALTER SUBSCRIPTION ... DISABLE. + */ + if (!newsub->enabled) + { + ereport(LOG, + (errmsg("logical replication worker for subscription \"%s\" will " + "stop because the subscription was disabled", + MySubscription->name))); + + walrcv_disconnect(wrconn); + proc_exit(0); + } + + /* * Exit if connection string was changed. The launcher will start * new worker. */ @@ -1358,6 +1374,9 @@ reread_subscription(void) proc_exit(0); } + /* !slotname should never happen when enabled is true. */ + Assert(newsub->slotname); + /* * We need to make new connection to new slot if slot name has changed * so exit here as well if that's the case. @@ -1388,22 +1407,6 @@ reread_subscription(void) proc_exit(0); } - /* - * Exit if the subscription was disabled. - * This normally should not happen as the worker gets killed - * during ALTER SUBSCRIPTION ... DISABLE. - */ - if (!newsub->enabled) - { - ereport(LOG, - (errmsg("logical replication worker for subscription \"%s\" will " - "stop because the subscription was disabled", - MySubscription->name))); - - walrcv_disconnect(wrconn); - proc_exit(0); - } - /* Check for other changes that should never happen too. */ if (newsub->dbid != MySubscription->dbid) { |