diff options
Diffstat (limited to 'src/backend/commands/subscriptioncmds.c')
-rw-r--r-- | src/backend/commands/subscriptioncmds.c | 177 |
1 files changed, 152 insertions, 25 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 5282b797359..517c8edd3b2 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -47,6 +47,8 @@ #include "utils/syscache.h" static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); +static void check_duplicates_in_publist(List *publist, Datum *datums); +static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname); static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err); @@ -293,8 +295,6 @@ publicationListToArray(List *publist) { ArrayType *arr; Datum *datums; - int j = 0; - ListCell *cell; MemoryContext memcxt; MemoryContext oldcxt; @@ -306,28 +306,7 @@ publicationListToArray(List *publist) datums = (Datum *) palloc(sizeof(Datum) * list_length(publist)); - foreach(cell, publist) - { - char *name = strVal(lfirst(cell)); - ListCell *pcell; - - /* Check for duplicates. */ - foreach(pcell, publist) - { - char *pname = strVal(lfirst(pcell)); - - if (pcell == cell) - break; - - if (strcmp(name, pname) == 0) - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("publication name \"%s\" used more than once", - pname))); - } - - datums[j++] = CStringGetTextDatum(name); - } + check_duplicates_in_publist(publist, datums); MemoryContextSwitchTo(oldcxt); @@ -923,7 +902,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) update_tuple = true; break; - case ALTER_SUBSCRIPTION_PUBLICATION: + case ALTER_SUBSCRIPTION_SET_PUBLICATION: { bool copy_data; bool refresh; @@ -964,6 +943,54 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) break; } + case ALTER_SUBSCRIPTION_ADD_PUBLICATION: + case ALTER_SUBSCRIPTION_DROP_PUBLICATION: + { + bool isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION; + bool copy_data; + bool refresh; + List *publist; + + publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname); + + parse_subscription_options(stmt->options, + NULL, /* no "connect" */ + NULL, NULL, /* no "enabled" */ + NULL, /* no "create_slot" */ + NULL, NULL, /* no "slot_name" */ + isadd ? ©_data : NULL, /* for drop, no + * "copy_data" */ + NULL, /* no "synchronous_commit" */ + &refresh, + NULL, NULL, /* no "binary" */ + NULL, NULL); /* no "streaming" */ + + values[Anum_pg_subscription_subpublications - 1] = + publicationListToArray(publist); + replaces[Anum_pg_subscription_subpublications - 1] = true; + + update_tuple = true; + + /* Refresh if user asked us to. */ + if (refresh) + { + if (!sub->enabled) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"), + errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false)."))); + + PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh"); + + /* Only refresh the added/dropped list of publications. */ + sub->publications = stmt->publication; + + AlterSubscription_refresh(sub, copy_data); + } + + break; + } + case ALTER_SUBSCRIPTION_REFRESH: { bool copy_data; @@ -1548,3 +1575,103 @@ ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err) errhint("Use %s to disassociate the subscription from the slot.", "ALTER SUBSCRIPTION ... SET (slot_name = NONE)"))); } + +/* + * Check for duplicates in the given list of publications and error out if + * found one. Add publications to datums as text datums, if datums is not + * NULL. + */ +static void +check_duplicates_in_publist(List *publist, Datum *datums) +{ + ListCell *cell; + int j = 0; + + foreach(cell, publist) + { + char *name = strVal(lfirst(cell)); + ListCell *pcell; + + foreach(pcell, publist) + { + char *pname = strVal(lfirst(pcell)); + + if (pcell == cell) + break; + + if (strcmp(name, pname) == 0) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("publication name \"%s\" used more than once", + pname))); + } + + if (datums) + datums[j++] = CStringGetTextDatum(name); + } +} + +/* + * Merge current subscription's publications and user-specified publications + * from ADD/DROP PUBLICATIONS. + * + * If addpub is true, we will add the list of publications into oldpublist. + * Otherwise, we will delete the list of publications from oldpublist. The + * returned list is a copy, oldpublist itself is not changed. + * + * subname is the subscription name, for error messages. + */ +static List * +merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname) +{ + ListCell *lc; + + oldpublist = list_copy(oldpublist); + + check_duplicates_in_publist(newpublist, NULL); + + foreach(lc, newpublist) + { + char *name = strVal(lfirst(lc)); + ListCell *lc2; + bool found = false; + + foreach(lc2, oldpublist) + { + char *pubname = strVal(lfirst(lc2)); + + if (strcmp(name, pubname) == 0) + { + found = true; + if (addpub) + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("publication \"%s\" is already in subscription \"%s\"", + name, subname))); + else + oldpublist = foreach_delete_current(oldpublist, lc2); + + break; + } + } + + if (addpub && !found) + oldpublist = lappend(oldpublist, makeString(name)); + else if (!addpub && !found) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("publication \"%s\" is not in subscription \"%s\"", + name, subname))); + } + + /* + * XXX Probably no strong reason for this, but for now it's to make ALTER + * SUBSCRIPTION ... DROP PUBLICATION consistent with SET PUBLICATION. + */ + if (!oldpublist) + ereport(ERROR, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("subscription must contain at least one publication"))); + + return oldpublist; +} |