aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands/subscriptioncmds.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands/subscriptioncmds.c')
-rw-r--r--src/backend/commands/subscriptioncmds.c141
1 files changed, 105 insertions, 36 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index fde9e6e20cd..b76cdc55384 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -60,7 +60,8 @@ static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
*/
static void
parse_subscription_options(List *options, bool *connect, bool *enabled_given,
- bool *enabled, bool *create_slot, char **slot_name,
+ bool *enabled, bool *create_slot,
+ bool *slot_name_given, char **slot_name,
bool *copy_data, char **synchronous_commit)
{
ListCell *lc;
@@ -78,7 +79,10 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
if (create_slot)
*create_slot = true;
if (slot_name)
+ {
+ *slot_name_given = false;
*slot_name = NULL;
+ }
if (copy_data)
*copy_data = true;
if (synchronous_commit)
@@ -141,12 +145,17 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
}
else if (strcmp(defel->defname, "slot name") == 0 && slot_name)
{
- if (*slot_name)
+ if (*slot_name_given)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
+ *slot_name_given = true;
*slot_name = defGetString(defel);
+
+ /* Setting slot_name = NONE is treated as no slot name. */
+ if (strcmp(*slot_name, "none") == 0)
+ *slot_name = NULL;
}
else if (strcmp(defel->defname, "copy data") == 0 && copy_data)
{
@@ -194,17 +203,17 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
if (connect && !*connect)
{
/* Check for incompatible options from the user. */
- if (*enabled_given && *enabled)
+ if (enabled && *enabled_given && *enabled)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("noconnect and enabled are mutually exclusive options")));
- if (create_slot_given && *create_slot)
+ if (create_slot && create_slot_given && *create_slot)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("noconnect and create slot are mutually exclusive options")));
- if (copy_data_given && *copy_data)
+ if (copy_data && copy_data_given && *copy_data)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("noconnect and copy data are mutually exclusive options")));
@@ -214,6 +223,23 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
*create_slot = false;
*copy_data = false;
}
+
+ /*
+ * Do additional checking for disallowed combination when
+ * slot_name = NONE was used.
+ */
+ if (slot_name && *slot_name_given && !*slot_name)
+ {
+ if (enabled && *enabled_given && *enabled)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("slot_name = NONE and enabled are mutually exclusive options")));
+
+ if (create_slot && create_slot_given && *create_slot)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("slot_name = NONE and create slot are mutually exclusive options")));
+ }
}
/*
@@ -290,6 +316,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
char *synchronous_commit;
char *conninfo;
char *slotname;
+ bool slotname_given;
char originname[NAMEDATALEN];
bool create_slot;
List *publications;
@@ -299,8 +326,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
* Connection and publication should not be specified here.
*/
parse_subscription_options(stmt->options, &connect, &enabled_given,
- &enabled, &create_slot, &slotname, &copy_data,
- &synchronous_commit);
+ &enabled, &create_slot, &slotname_given,
+ &slotname, &copy_data, &synchronous_commit);
/*
* Since creating a replication slot is not transactional, rolling back
@@ -329,8 +356,9 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
stmt->subname)));
}
- if (slotname == NULL)
+ if (!slotname_given && slotname == NULL)
slotname = stmt->subname;
+
/* The default for synchronous_commit of subscriptions is off. */
if (synchronous_commit == NULL)
synchronous_commit = "off";
@@ -355,8 +383,11 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
values[Anum_pg_subscription_subconninfo - 1] =
CStringGetTextDatum(conninfo);
- values[Anum_pg_subscription_subslotname - 1] =
- DirectFunctionCall1(namein, CStringGetDatum(slotname));
+ if (slotname)
+ values[Anum_pg_subscription_subslotname - 1] =
+ DirectFunctionCall1(namein, CStringGetDatum(slotname));
+ else
+ nulls[Anum_pg_subscription_subslotname - 1] = true;
values[Anum_pg_subscription_subsynccommit - 1] =
CStringGetTextDatum(synchronous_commit);
values[Anum_pg_subscription_subpublications - 1] =
@@ -426,6 +457,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
*/
if (create_slot)
{
+ Assert(slotname);
+
walrcv_create_slot(wrconn, slotname, false,
CRS_NOEXPORT_SNAPSHOT, &lsn);
ereport(NOTICE,
@@ -578,6 +611,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
HeapTuple tup;
Oid subid;
bool update_tuple = false;
+ Subscription *sub;
rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
@@ -597,6 +631,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
stmt->subname);
subid = HeapTupleGetOid(tup);
+ sub = GetSubscription(subid, false);
/* Form a new tuple. */
memset(values, 0, sizeof(values));
@@ -607,19 +642,29 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
{
case ALTER_SUBSCRIPTION_OPTIONS:
{
- char *slot_name;
+ char *slotname;
+ bool slotname_given;
char *synchronous_commit;
parse_subscription_options(stmt->options, NULL, NULL, NULL,
- NULL, &slot_name, NULL,
- &synchronous_commit);
+ NULL, &slotname_given, &slotname,
+ NULL, &synchronous_commit);
- if (slot_name)
+ if (slotname_given)
{
- values[Anum_pg_subscription_subslotname - 1] =
- DirectFunctionCall1(namein, CStringGetDatum(slot_name));
+ if (sub->enabled && !slotname)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("cannot set slot_name = NONE for enabled subscription")));
+
+ if (slotname)
+ values[Anum_pg_subscription_subslotname - 1] =
+ DirectFunctionCall1(namein, CStringGetDatum(slotname));
+ else
+ nulls[Anum_pg_subscription_subslotname - 1] = true;
replaces[Anum_pg_subscription_subslotname - 1] = true;
}
+
if (synchronous_commit)
{
values[Anum_pg_subscription_subsynccommit - 1] =
@@ -638,9 +683,14 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
parse_subscription_options(stmt->options, NULL,
&enabled_given, &enabled, NULL,
- NULL, NULL, NULL);
+ NULL, NULL, NULL, NULL);
Assert(enabled_given);
+ if (!sub->slotname && enabled)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("cannot enable subscription that does not have a slot name")));
+
values[Anum_pg_subscription_subenabled - 1] =
BoolGetDatum(enabled);
replaces[Anum_pg_subscription_subenabled - 1] = true;
@@ -668,10 +718,10 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
case ALTER_SUBSCRIPTION_PUBLICATION_REFRESH:
{
bool copy_data;
- Subscription *sub = GetSubscription(subid, false);
parse_subscription_options(stmt->options, NULL, NULL, NULL,
- NULL, NULL, &copy_data, NULL);
+ NULL, NULL, NULL, &copy_data,
+ NULL);
values[Anum_pg_subscription_subpublications - 1] =
publicationListToArray(stmt->publication);
@@ -682,6 +732,11 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
/* Refresh if user asked us to. */
if (stmt->kind == ALTER_SUBSCRIPTION_PUBLICATION_REFRESH)
{
+ if (!sub->enabled)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
+
/* Make sure refresh sees the new list of publications. */
sub->publications = stmt->publication;
@@ -694,10 +749,15 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
case ALTER_SUBSCRIPTION_REFRESH:
{
bool copy_data;
- Subscription *sub = GetSubscription(subid, false);
+
+ if (!sub->enabled)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
parse_subscription_options(stmt->options, NULL, NULL, NULL,
- NULL, NULL, &copy_data, NULL);
+ NULL, NULL, NULL, &copy_data,
+ NULL);
AlterSubscription_refresh(sub, copy_data);
@@ -751,15 +811,6 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
StringInfoData cmd;
/*
- * Since dropping a replication slot is not transactional, the replication
- * slot stays dropped even if the transaction rolls back. So we cannot
- * run DROP SUBSCRIPTION inside a transaction block if dropping the
- * replication slot.
- */
- if (stmt->drop_slot)
- PreventTransactionChain(isTopLevel, "DROP SUBSCRIPTION ... DROP SLOT");
-
- /*
* Lock pg_subscription with AccessExclusiveLock to ensure
* that the launcher doesn't restart new worker during dropping
* the subscription
@@ -817,8 +868,24 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
/* Get slotname */
datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
Anum_pg_subscription_subslotname, &isnull);
- Assert(!isnull);
- slotname = pstrdup(NameStr(*DatumGetName(datum)));
+ if (!isnull)
+ slotname = pstrdup(NameStr(*DatumGetName(datum)));
+ else
+ slotname = NULL;
+
+ /*
+ * Since dropping a replication slot is not transactional, the replication
+ * slot stays dropped even if the transaction rolls back. So we cannot
+ * run DROP SUBSCRIPTION inside a transaction block if dropping the
+ * replication slot.
+ *
+ * XXX The command name should really be something like "DROP SUBSCRIPTION
+ * of a subscription that is associated with a replication slot", but we
+ * don't have the proper facilities for that.
+ */
+ if (slotname)
+ PreventTransactionChain(isTopLevel, "DROP SUBSCRIPTION");
+
ObjectAddressSet(myself, SubscriptionRelationId, subid);
EventTriggerSQLDropAddObject(&myself, true, true);
@@ -843,8 +910,8 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
if (originid != InvalidRepOriginId)
replorigin_drop(originid);
- /* If the user asked to not drop the slot, we are done mow.*/
- if (!stmt->drop_slot)
+ /* If there is no slot associated with the subscription, we can finish here. */
+ if (!slotname)
{
heap_close(rel, NoLock);
return;
@@ -857,14 +924,16 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
load_file("libpqwalreceiver", false);
initStringInfo(&cmd);
- appendStringInfo(&cmd, "DROP_REPLICATION_SLOT \"%s\"", slotname);
+ appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s", quote_identifier(slotname));
wrconn = walrcv_connect(conninfo, true, subname, &err);
if (wrconn == NULL)
ereport(ERROR,
(errmsg("could not connect to publisher when attempting to "
"drop the replication slot \"%s\"", slotname),
- errdetail("The error was: %s", err)));
+ errdetail("The error was: %s", err),
+ errhint("Use ALTER SUBSCRIPTION ... WITH (slot_name = NONE) "
+ "to disassociate the subscription from the slot.")));
PG_TRY();
{