aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands/subscriptioncmds.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands/subscriptioncmds.c')
-rw-r--r--src/backend/commands/subscriptioncmds.c177
1 files changed, 152 insertions, 25 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 5282b797359..517c8edd3b2 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -47,6 +47,8 @@
#include "utils/syscache.h"
static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
+static void check_duplicates_in_publist(List *publist, Datum *datums);
+static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
@@ -293,8 +295,6 @@ publicationListToArray(List *publist)
{
ArrayType *arr;
Datum *datums;
- int j = 0;
- ListCell *cell;
MemoryContext memcxt;
MemoryContext oldcxt;
@@ -306,28 +306,7 @@ publicationListToArray(List *publist)
datums = (Datum *) palloc(sizeof(Datum) * list_length(publist));
- foreach(cell, publist)
- {
- char *name = strVal(lfirst(cell));
- ListCell *pcell;
-
- /* Check for duplicates. */
- foreach(pcell, publist)
- {
- char *pname = strVal(lfirst(pcell));
-
- if (pcell == cell)
- break;
-
- if (strcmp(name, pname) == 0)
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("publication name \"%s\" used more than once",
- pname)));
- }
-
- datums[j++] = CStringGetTextDatum(name);
- }
+ check_duplicates_in_publist(publist, datums);
MemoryContextSwitchTo(oldcxt);
@@ -923,7 +902,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
update_tuple = true;
break;
- case ALTER_SUBSCRIPTION_PUBLICATION:
+ case ALTER_SUBSCRIPTION_SET_PUBLICATION:
{
bool copy_data;
bool refresh;
@@ -964,6 +943,54 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
break;
}
+ case ALTER_SUBSCRIPTION_ADD_PUBLICATION:
+ case ALTER_SUBSCRIPTION_DROP_PUBLICATION:
+ {
+ bool isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION;
+ bool copy_data;
+ bool refresh;
+ List *publist;
+
+ publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname);
+
+ parse_subscription_options(stmt->options,
+ NULL, /* no "connect" */
+ NULL, NULL, /* no "enabled" */
+ NULL, /* no "create_slot" */
+ NULL, NULL, /* no "slot_name" */
+ isadd ? &copy_data : NULL, /* for drop, no
+ * "copy_data" */
+ NULL, /* no "synchronous_commit" */
+ &refresh,
+ NULL, NULL, /* no "binary" */
+ NULL, NULL); /* no "streaming" */
+
+ values[Anum_pg_subscription_subpublications - 1] =
+ publicationListToArray(publist);
+ replaces[Anum_pg_subscription_subpublications - 1] = true;
+
+ update_tuple = true;
+
+ /* Refresh if user asked us to. */
+ if (refresh)
+ {
+ if (!sub->enabled)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
+ errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
+
+ PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
+
+ /* Only refresh the added/dropped list of publications. */
+ sub->publications = stmt->publication;
+
+ AlterSubscription_refresh(sub, copy_data);
+ }
+
+ break;
+ }
+
case ALTER_SUBSCRIPTION_REFRESH:
{
bool copy_data;
@@ -1548,3 +1575,103 @@ ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
errhint("Use %s to disassociate the subscription from the slot.",
"ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
}
+
+/*
+ * Check for duplicates in the given list of publications and error out if
+ * found one. Add publications to datums as text datums, if datums is not
+ * NULL.
+ */
+static void
+check_duplicates_in_publist(List *publist, Datum *datums)
+{
+ ListCell *cell;
+ int j = 0;
+
+ foreach(cell, publist)
+ {
+ char *name = strVal(lfirst(cell));
+ ListCell *pcell;
+
+ foreach(pcell, publist)
+ {
+ char *pname = strVal(lfirst(pcell));
+
+ if (pcell == cell)
+ break;
+
+ if (strcmp(name, pname) == 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("publication name \"%s\" used more than once",
+ pname)));
+ }
+
+ if (datums)
+ datums[j++] = CStringGetTextDatum(name);
+ }
+}
+
+/*
+ * Merge current subscription's publications and user-specified publications
+ * from ADD/DROP PUBLICATIONS.
+ *
+ * If addpub is true, we will add the list of publications into oldpublist.
+ * Otherwise, we will delete the list of publications from oldpublist. The
+ * returned list is a copy, oldpublist itself is not changed.
+ *
+ * subname is the subscription name, for error messages.
+ */
+static List *
+merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname)
+{
+ ListCell *lc;
+
+ oldpublist = list_copy(oldpublist);
+
+ check_duplicates_in_publist(newpublist, NULL);
+
+ foreach(lc, newpublist)
+ {
+ char *name = strVal(lfirst(lc));
+ ListCell *lc2;
+ bool found = false;
+
+ foreach(lc2, oldpublist)
+ {
+ char *pubname = strVal(lfirst(lc2));
+
+ if (strcmp(name, pubname) == 0)
+ {
+ found = true;
+ if (addpub)
+ ereport(ERROR,
+ (errcode(ERRCODE_DUPLICATE_OBJECT),
+ errmsg("publication \"%s\" is already in subscription \"%s\"",
+ name, subname)));
+ else
+ oldpublist = foreach_delete_current(oldpublist, lc2);
+
+ break;
+ }
+ }
+
+ if (addpub && !found)
+ oldpublist = lappend(oldpublist, makeString(name));
+ else if (!addpub && !found)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("publication \"%s\" is not in subscription \"%s\"",
+ name, subname)));
+ }
+
+ /*
+ * XXX Probably no strong reason for this, but for now it's to make ALTER
+ * SUBSCRIPTION ... DROP PUBLICATION consistent with SET PUBLICATION.
+ */
+ if (!oldpublist)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+ errmsg("subscription must contain at least one publication")));
+
+ return oldpublist;
+}