aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/commands/subscriptioncmds.c177
-rw-r--r--src/backend/parser/gram.y22
-rw-r--r--src/bin/psql/tab-complete.c12
-rw-r--r--src/include/nodes/parsenodes.h4
-rw-r--r--src/test/regress/expected/subscription.out39
-rw-r--r--src/test/regress/sql/subscription.sql31
6 files changed, 253 insertions, 32 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 5282b797359..517c8edd3b2 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -47,6 +47,8 @@
#include "utils/syscache.h"
static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
+static void check_duplicates_in_publist(List *publist, Datum *datums);
+static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
@@ -293,8 +295,6 @@ publicationListToArray(List *publist)
{
ArrayType *arr;
Datum *datums;
- int j = 0;
- ListCell *cell;
MemoryContext memcxt;
MemoryContext oldcxt;
@@ -306,28 +306,7 @@ publicationListToArray(List *publist)
datums = (Datum *) palloc(sizeof(Datum) * list_length(publist));
- foreach(cell, publist)
- {
- char *name = strVal(lfirst(cell));
- ListCell *pcell;
-
- /* Check for duplicates. */
- foreach(pcell, publist)
- {
- char *pname = strVal(lfirst(pcell));
-
- if (pcell == cell)
- break;
-
- if (strcmp(name, pname) == 0)
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("publication name \"%s\" used more than once",
- pname)));
- }
-
- datums[j++] = CStringGetTextDatum(name);
- }
+ check_duplicates_in_publist(publist, datums);
MemoryContextSwitchTo(oldcxt);
@@ -923,7 +902,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
update_tuple = true;
break;
- case ALTER_SUBSCRIPTION_PUBLICATION:
+ case ALTER_SUBSCRIPTION_SET_PUBLICATION:
{
bool copy_data;
bool refresh;
@@ -964,6 +943,54 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
break;
}
+ case ALTER_SUBSCRIPTION_ADD_PUBLICATION:
+ case ALTER_SUBSCRIPTION_DROP_PUBLICATION:
+ {
+ bool isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION;
+ bool copy_data;
+ bool refresh;
+ List *publist;
+
+ publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname);
+
+ parse_subscription_options(stmt->options,
+ NULL, /* no "connect" */
+ NULL, NULL, /* no "enabled" */
+ NULL, /* no "create_slot" */
+ NULL, NULL, /* no "slot_name" */
+ isadd ? &copy_data : NULL, /* for drop, no
+ * "copy_data" */
+ NULL, /* no "synchronous_commit" */
+ &refresh,
+ NULL, NULL, /* no "binary" */
+ NULL, NULL); /* no "streaming" */
+
+ values[Anum_pg_subscription_subpublications - 1] =
+ publicationListToArray(publist);
+ replaces[Anum_pg_subscription_subpublications - 1] = true;
+
+ update_tuple = true;
+
+ /* Refresh if user asked us to. */
+ if (refresh)
+ {
+ if (!sub->enabled)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
+ errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
+
+ PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
+
+ /* Only refresh the added/dropped list of publications. */
+ sub->publications = stmt->publication;
+
+ AlterSubscription_refresh(sub, copy_data);
+ }
+
+ break;
+ }
+
case ALTER_SUBSCRIPTION_REFRESH:
{
bool copy_data;
@@ -1548,3 +1575,103 @@ ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
errhint("Use %s to disassociate the subscription from the slot.",
"ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
}
+
+/*
+ * Check for duplicates in the given list of publications and error out if
+ * found one. Add publications to datums as text datums, if datums is not
+ * NULL.
+ */
+static void
+check_duplicates_in_publist(List *publist, Datum *datums)
+{
+ ListCell *cell;
+ int j = 0;
+
+ foreach(cell, publist)
+ {
+ char *name = strVal(lfirst(cell));
+ ListCell *pcell;
+
+ foreach(pcell, publist)
+ {
+ char *pname = strVal(lfirst(pcell));
+
+ if (pcell == cell)
+ break;
+
+ if (strcmp(name, pname) == 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("publication name \"%s\" used more than once",
+ pname)));
+ }
+
+ if (datums)
+ datums[j++] = CStringGetTextDatum(name);
+ }
+}
+
+/*
+ * Merge current subscription's publications and user-specified publications
+ * from ADD/DROP PUBLICATIONS.
+ *
+ * If addpub is true, we will add the list of publications into oldpublist.
+ * Otherwise, we will delete the list of publications from oldpublist. The
+ * returned list is a copy, oldpublist itself is not changed.
+ *
+ * subname is the subscription name, for error messages.
+ */
+static List *
+merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname)
+{
+ ListCell *lc;
+
+ oldpublist = list_copy(oldpublist);
+
+ check_duplicates_in_publist(newpublist, NULL);
+
+ foreach(lc, newpublist)
+ {
+ char *name = strVal(lfirst(lc));
+ ListCell *lc2;
+ bool found = false;
+
+ foreach(lc2, oldpublist)
+ {
+ char *pubname = strVal(lfirst(lc2));
+
+ if (strcmp(name, pubname) == 0)
+ {
+ found = true;
+ if (addpub)
+ ereport(ERROR,
+ (errcode(ERRCODE_DUPLICATE_OBJECT),
+ errmsg("publication \"%s\" is already in subscription \"%s\"",
+ name, subname)));
+ else
+ oldpublist = foreach_delete_current(oldpublist, lc2);
+
+ break;
+ }
+ }
+
+ if (addpub && !found)
+ oldpublist = lappend(oldpublist, makeString(name));
+ else if (!addpub && !found)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("publication \"%s\" is not in subscription \"%s\"",
+ name, subname)));
+ }
+
+ /*
+ * XXX Probably no strong reason for this, but for now it's to make ALTER
+ * SUBSCRIPTION ... DROP PUBLICATION consistent with SET PUBLICATION.
+ */
+ if (!oldpublist)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+ errmsg("subscription must contain at least one publication")));
+
+ return oldpublist;
+}
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 05cc2c9ae0d..38c36a49360 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -9687,11 +9687,31 @@ AlterSubscriptionStmt:
n->options = $6;
$$ = (Node *)n;
}
+ | ALTER SUBSCRIPTION name ADD_P PUBLICATION name_list opt_definition
+ {
+ AlterSubscriptionStmt *n =
+ makeNode(AlterSubscriptionStmt);
+ n->kind = ALTER_SUBSCRIPTION_ADD_PUBLICATION;
+ n->subname = $3;
+ n->publication = $6;
+ n->options = $7;
+ $$ = (Node *)n;
+ }
+ | ALTER SUBSCRIPTION name DROP PUBLICATION name_list opt_definition
+ {
+ AlterSubscriptionStmt *n =
+ makeNode(AlterSubscriptionStmt);
+ n->kind = ALTER_SUBSCRIPTION_DROP_PUBLICATION;
+ n->subname = $3;
+ n->publication = $6;
+ n->options = $7;
+ $$ = (Node *)n;
+ }
| ALTER SUBSCRIPTION name SET PUBLICATION name_list opt_definition
{
AlterSubscriptionStmt *n =
makeNode(AlterSubscriptionStmt);
- n->kind = ALTER_SUBSCRIPTION_PUBLICATION;
+ n->kind = ALTER_SUBSCRIPTION_SET_PUBLICATION;
n->subname = $3;
n->publication = $6;
n->options = $7;
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index a053bc1e45d..832bcdfc3bf 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1652,7 +1652,8 @@ psql_completion(const char *text, int start, int end)
/* ALTER SUBSCRIPTION <name> */
else if (Matches("ALTER", "SUBSCRIPTION", MatchAny))
COMPLETE_WITH("CONNECTION", "ENABLE", "DISABLE", "OWNER TO",
- "RENAME TO", "REFRESH PUBLICATION", "SET");
+ "RENAME TO", "REFRESH PUBLICATION", "SET",
+ "ADD PUBLICATION", "DROP PUBLICATION");
/* ALTER SUBSCRIPTION <name> REFRESH PUBLICATION */
else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) &&
TailMatches("REFRESH", "PUBLICATION"))
@@ -1672,14 +1673,15 @@ psql_completion(const char *text, int start, int end)
{
/* complete with nothing here as this refers to remote publications */
}
- /* ALTER SUBSCRIPTION <name> SET PUBLICATION <name> */
+ /* ALTER SUBSCRIPTION <name> ADD|DROP|SET PUBLICATION <name> */
else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) &&
- TailMatches("SET", "PUBLICATION", MatchAny))
+ TailMatches("ADD|DROP|SET", "PUBLICATION", MatchAny))
COMPLETE_WITH("WITH (");
- /* ALTER SUBSCRIPTION <name> SET PUBLICATION <name> WITH ( */
+ /* ALTER SUBSCRIPTION <name> ADD|DROP|SET PUBLICATION <name> WITH ( */
else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) &&
- TailMatches("SET", "PUBLICATION", MatchAny, "WITH", "("))
+ TailMatches("ADD|DROP|SET", "PUBLICATION", MatchAny, "WITH", "("))
COMPLETE_WITH("copy_data", "refresh");
+
/* ALTER SCHEMA <name> */
else if (Matches("ALTER", "SCHEMA", MatchAny))
COMPLETE_WITH("OWNER TO", "RENAME TO");
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 97b80dfd210..807fbaceaac 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3618,7 +3618,9 @@ typedef enum AlterSubscriptionType
{
ALTER_SUBSCRIPTION_OPTIONS,
ALTER_SUBSCRIPTION_CONNECTION,
- ALTER_SUBSCRIPTION_PUBLICATION,
+ ALTER_SUBSCRIPTION_SET_PUBLICATION,
+ ALTER_SUBSCRIPTION_ADD_PUBLICATION,
+ ALTER_SUBSCRIPTION_DROP_PUBLICATION,
ALTER_SUBSCRIPTION_REFRESH,
ALTER_SUBSCRIPTION_ENABLED
} AlterSubscriptionType;
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 14a430221d6..09576c176b6 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -200,6 +200,45 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
regress_testsub | regress_subscription_user | f | {testpub} | f | f | off | dbname=regress_doesnotexist
(1 row)
+-- fail - publication already exists
+ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub WITH (refresh = false);
+ERROR: publication "testpub" is already in subscription "regress_testsub"
+-- fail - publication used more than once
+ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub1 WITH (refresh = false);
+ERROR: publication name "testpub1" used more than once
+-- ok - add two publications into subscription
+ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
+-- fail - publications already exist
+ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
+ERROR: publication "testpub1" is already in subscription "regress_testsub"
+\dRs+
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-----------------------------+--------+-----------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | f | off | dbname=regress_doesnotexist
+(1 row)
+
+-- fail - publication used more then once
+ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub1 WITH (refresh = false);
+ERROR: publication name "testpub1" used more than once
+-- fail - all publications are deleted
+ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub, testpub1, testpub2 WITH (refresh = false);
+ERROR: subscription must contain at least one publication
+-- fail - publication does not exist in subscription
+ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub3 WITH (refresh = false);
+ERROR: publication "testpub3" is not in subscription "regress_testsub"
+-- fail - do not support copy_data option
+ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1 WITH (refresh = false, copy_data = true);
+ERROR: unrecognized subscription parameter: "copy_data"
+-- ok - delete publications
+ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
+\dRs+
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | off | dbname=regress_doesnotexist
+(1 row)
+
DROP SUBSCRIPTION regress_testsub;
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION mypub
WITH (connect = false, create_slot = false, copy_data = false);
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index 81e65e5e642..308c098c144 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -145,6 +145,37 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
\dRs+
+-- fail - publication already exists
+ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub WITH (refresh = false);
+
+-- fail - publication used more than once
+ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub1 WITH (refresh = false);
+
+-- ok - add two publications into subscription
+ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
+
+-- fail - publications already exist
+ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
+
+\dRs+
+
+-- fail - publication used more then once
+ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub1 WITH (refresh = false);
+
+-- fail - all publications are deleted
+ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub, testpub1, testpub2 WITH (refresh = false);
+
+-- fail - publication does not exist in subscription
+ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub3 WITH (refresh = false);
+
+-- fail - do not support copy_data option
+ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1 WITH (refresh = false, copy_data = true);
+
+-- ok - delete publications
+ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
+
+\dRs+
+
DROP SUBSCRIPTION regress_testsub;
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION mypub