diff options
Diffstat (limited to 'src/backend/commands/subscriptioncmds.c')
-rw-r--r-- | src/backend/commands/subscriptioncmds.c | 141 |
1 files changed, 105 insertions, 36 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index fde9e6e20cd..b76cdc55384 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -60,7 +60,8 @@ static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); */ static void parse_subscription_options(List *options, bool *connect, bool *enabled_given, - bool *enabled, bool *create_slot, char **slot_name, + bool *enabled, bool *create_slot, + bool *slot_name_given, char **slot_name, bool *copy_data, char **synchronous_commit) { ListCell *lc; @@ -78,7 +79,10 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, if (create_slot) *create_slot = true; if (slot_name) + { + *slot_name_given = false; *slot_name = NULL; + } if (copy_data) *copy_data = true; if (synchronous_commit) @@ -141,12 +145,17 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, } else if (strcmp(defel->defname, "slot name") == 0 && slot_name) { - if (*slot_name) + if (*slot_name_given) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); + *slot_name_given = true; *slot_name = defGetString(defel); + + /* Setting slot_name = NONE is treated as no slot name. */ + if (strcmp(*slot_name, "none") == 0) + *slot_name = NULL; } else if (strcmp(defel->defname, "copy data") == 0 && copy_data) { @@ -194,17 +203,17 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, if (connect && !*connect) { /* Check for incompatible options from the user. */ - if (*enabled_given && *enabled) + if (enabled && *enabled_given && *enabled) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("noconnect and enabled are mutually exclusive options"))); - if (create_slot_given && *create_slot) + if (create_slot && create_slot_given && *create_slot) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("noconnect and create slot are mutually exclusive options"))); - if (copy_data_given && *copy_data) + if (copy_data && copy_data_given && *copy_data) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("noconnect and copy data are mutually exclusive options"))); @@ -214,6 +223,23 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, *create_slot = false; *copy_data = false; } + + /* + * Do additional checking for disallowed combination when + * slot_name = NONE was used. + */ + if (slot_name && *slot_name_given && !*slot_name) + { + if (enabled && *enabled_given && *enabled) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("slot_name = NONE and enabled are mutually exclusive options"))); + + if (create_slot && create_slot_given && *create_slot) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("slot_name = NONE and create slot are mutually exclusive options"))); + } } /* @@ -290,6 +316,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) char *synchronous_commit; char *conninfo; char *slotname; + bool slotname_given; char originname[NAMEDATALEN]; bool create_slot; List *publications; @@ -299,8 +326,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) * Connection and publication should not be specified here. */ parse_subscription_options(stmt->options, &connect, &enabled_given, - &enabled, &create_slot, &slotname, ©_data, - &synchronous_commit); + &enabled, &create_slot, &slotname_given, + &slotname, ©_data, &synchronous_commit); /* * Since creating a replication slot is not transactional, rolling back @@ -329,8 +356,9 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) stmt->subname))); } - if (slotname == NULL) + if (!slotname_given && slotname == NULL) slotname = stmt->subname; + /* The default for synchronous_commit of subscriptions is off. */ if (synchronous_commit == NULL) synchronous_commit = "off"; @@ -355,8 +383,11 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); - values[Anum_pg_subscription_subslotname - 1] = - DirectFunctionCall1(namein, CStringGetDatum(slotname)); + if (slotname) + values[Anum_pg_subscription_subslotname - 1] = + DirectFunctionCall1(namein, CStringGetDatum(slotname)); + else + nulls[Anum_pg_subscription_subslotname - 1] = true; values[Anum_pg_subscription_subsynccommit - 1] = CStringGetTextDatum(synchronous_commit); values[Anum_pg_subscription_subpublications - 1] = @@ -426,6 +457,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) */ if (create_slot) { + Assert(slotname); + walrcv_create_slot(wrconn, slotname, false, CRS_NOEXPORT_SNAPSHOT, &lsn); ereport(NOTICE, @@ -578,6 +611,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt) HeapTuple tup; Oid subid; bool update_tuple = false; + Subscription *sub; rel = heap_open(SubscriptionRelationId, RowExclusiveLock); @@ -597,6 +631,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt) stmt->subname); subid = HeapTupleGetOid(tup); + sub = GetSubscription(subid, false); /* Form a new tuple. */ memset(values, 0, sizeof(values)); @@ -607,19 +642,29 @@ AlterSubscription(AlterSubscriptionStmt *stmt) { case ALTER_SUBSCRIPTION_OPTIONS: { - char *slot_name; + char *slotname; + bool slotname_given; char *synchronous_commit; parse_subscription_options(stmt->options, NULL, NULL, NULL, - NULL, &slot_name, NULL, - &synchronous_commit); + NULL, &slotname_given, &slotname, + NULL, &synchronous_commit); - if (slot_name) + if (slotname_given) { - values[Anum_pg_subscription_subslotname - 1] = - DirectFunctionCall1(namein, CStringGetDatum(slot_name)); + if (sub->enabled && !slotname) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("cannot set slot_name = NONE for enabled subscription"))); + + if (slotname) + values[Anum_pg_subscription_subslotname - 1] = + DirectFunctionCall1(namein, CStringGetDatum(slotname)); + else + nulls[Anum_pg_subscription_subslotname - 1] = true; replaces[Anum_pg_subscription_subslotname - 1] = true; } + if (synchronous_commit) { values[Anum_pg_subscription_subsynccommit - 1] = @@ -638,9 +683,14 @@ AlterSubscription(AlterSubscriptionStmt *stmt) parse_subscription_options(stmt->options, NULL, &enabled_given, &enabled, NULL, - NULL, NULL, NULL); + NULL, NULL, NULL, NULL); Assert(enabled_given); + if (!sub->slotname && enabled) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("cannot enable subscription that does not have a slot name"))); + values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled); replaces[Anum_pg_subscription_subenabled - 1] = true; @@ -668,10 +718,10 @@ AlterSubscription(AlterSubscriptionStmt *stmt) case ALTER_SUBSCRIPTION_PUBLICATION_REFRESH: { bool copy_data; - Subscription *sub = GetSubscription(subid, false); parse_subscription_options(stmt->options, NULL, NULL, NULL, - NULL, NULL, ©_data, NULL); + NULL, NULL, NULL, ©_data, + NULL); values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(stmt->publication); @@ -682,6 +732,11 @@ AlterSubscription(AlterSubscriptionStmt *stmt) /* Refresh if user asked us to. */ if (stmt->kind == ALTER_SUBSCRIPTION_PUBLICATION_REFRESH) { + if (!sub->enabled) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions"))); + /* Make sure refresh sees the new list of publications. */ sub->publications = stmt->publication; @@ -694,10 +749,15 @@ AlterSubscription(AlterSubscriptionStmt *stmt) case ALTER_SUBSCRIPTION_REFRESH: { bool copy_data; - Subscription *sub = GetSubscription(subid, false); + + if (!sub->enabled) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions"))); parse_subscription_options(stmt->options, NULL, NULL, NULL, - NULL, NULL, ©_data, NULL); + NULL, NULL, NULL, ©_data, + NULL); AlterSubscription_refresh(sub, copy_data); @@ -751,15 +811,6 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) StringInfoData cmd; /* - * Since dropping a replication slot is not transactional, the replication - * slot stays dropped even if the transaction rolls back. So we cannot - * run DROP SUBSCRIPTION inside a transaction block if dropping the - * replication slot. - */ - if (stmt->drop_slot) - PreventTransactionChain(isTopLevel, "DROP SUBSCRIPTION ... DROP SLOT"); - - /* * Lock pg_subscription with AccessExclusiveLock to ensure * that the launcher doesn't restart new worker during dropping * the subscription @@ -817,8 +868,24 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) /* Get slotname */ datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup, Anum_pg_subscription_subslotname, &isnull); - Assert(!isnull); - slotname = pstrdup(NameStr(*DatumGetName(datum))); + if (!isnull) + slotname = pstrdup(NameStr(*DatumGetName(datum))); + else + slotname = NULL; + + /* + * Since dropping a replication slot is not transactional, the replication + * slot stays dropped even if the transaction rolls back. So we cannot + * run DROP SUBSCRIPTION inside a transaction block if dropping the + * replication slot. + * + * XXX The command name should really be something like "DROP SUBSCRIPTION + * of a subscription that is associated with a replication slot", but we + * don't have the proper facilities for that. + */ + if (slotname) + PreventTransactionChain(isTopLevel, "DROP SUBSCRIPTION"); + ObjectAddressSet(myself, SubscriptionRelationId, subid); EventTriggerSQLDropAddObject(&myself, true, true); @@ -843,8 +910,8 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) if (originid != InvalidRepOriginId) replorigin_drop(originid); - /* If the user asked to not drop the slot, we are done mow.*/ - if (!stmt->drop_slot) + /* If there is no slot associated with the subscription, we can finish here. */ + if (!slotname) { heap_close(rel, NoLock); return; @@ -857,14 +924,16 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) load_file("libpqwalreceiver", false); initStringInfo(&cmd); - appendStringInfo(&cmd, "DROP_REPLICATION_SLOT \"%s\"", slotname); + appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s", quote_identifier(slotname)); wrconn = walrcv_connect(conninfo, true, subname, &err); if (wrconn == NULL) ereport(ERROR, (errmsg("could not connect to publisher when attempting to " "drop the replication slot \"%s\"", slotname), - errdetail("The error was: %s", err))); + errdetail("The error was: %s", err), + errhint("Use ALTER SUBSCRIPTION ... WITH (slot_name = NONE) " + "to disassociate the subscription from the slot."))); PG_TRY(); { |