diff options
Diffstat (limited to 'src/backend/commands/subscriptioncmds.c')
-rw-r--r-- | src/backend/commands/subscriptioncmds.c | 133 |
1 files changed, 128 insertions, 5 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index f87796e5afe..66d800f0cff 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -92,6 +92,10 @@ typedef struct SubOpts } SubOpts; static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); +static void check_publications_origin(WalReceiverConn *wrconn, + List *publications, bool copydata, + char *origin, Oid *subrel_local_oids, + int subrel_count, char *subname); static void check_duplicates_in_publist(List *publist, Datum *datums); static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname); static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err); @@ -680,6 +684,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, PG_TRY(); { check_publications(wrconn, publications); + check_publications_origin(wrconn, publications, opts.copy_data, + opts.origin, NULL, 0, stmt->subname); /* * Set sync state based on if we were asked to do data copy or @@ -786,6 +792,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, ListCell *lc; int off; int remove_rel_len; + int subrel_count; Relation rel = NULL; typedef struct SubRemoveRels { @@ -815,13 +822,14 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, /* Get local table list. */ subrel_states = GetSubscriptionRelations(sub->oid, false); + subrel_count = list_length(subrel_states); /* * Build qsorted array of local table oids for faster lookup. This can * potentially contain all tables in the database so speed of lookup * is important. */ - subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid)); + subrel_local_oids = palloc(subrel_count * sizeof(Oid)); off = 0; foreach(lc, subrel_states) { @@ -829,14 +837,18 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, subrel_local_oids[off++] = relstate->relid; } - qsort(subrel_local_oids, list_length(subrel_states), + qsort(subrel_local_oids, subrel_count, sizeof(Oid), oid_cmp); + check_publications_origin(wrconn, sub->publications, copy_data, + sub->origin, subrel_local_oids, + subrel_count, sub->name); + /* * Rels that we want to remove from subscription and drop any slots * and origins corresponding to them. */ - sub_remove_rels = palloc(list_length(subrel_states) * sizeof(SubRemoveRels)); + sub_remove_rels = palloc(subrel_count * sizeof(SubRemoveRels)); /* * Walk over the remote tables and try to match them to locally known @@ -862,7 +874,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, pubrel_local_oids[off++] = relid; if (!bsearch(&relid, subrel_local_oids, - list_length(subrel_states), sizeof(Oid), oid_cmp)) + subrel_count, sizeof(Oid), oid_cmp)) { AddSubscriptionRelState(sub->oid, relid, copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY, @@ -881,7 +893,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, sizeof(Oid), oid_cmp); remove_rel_len = 0; - for (off = 0; off < list_length(subrel_states); off++) + for (off = 0; off < subrel_count; off++) { Oid relid = subrel_local_oids[off]; @@ -1785,6 +1797,117 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId) } /* + * Check and log a warning if the publisher has subscribed to the same table + * from some other publisher. This check is required only if "copy_data = true" + * and "origin = none" for CREATE SUBSCRIPTION and + * ALTER SUBSCRIPTION ... REFRESH statements to notify the user that data + * having origin might have been copied. + * + * This check need not be performed on the tables that are already added + * because incremental sync for those tables will happen through WAL and the + * origin of the data can be identified from the WAL records. + * + * subrel_local_oids contains the list of relation oids that are already + * present on the subscriber. + */ +static void +check_publications_origin(WalReceiverConn *wrconn, List *publications, + bool copydata, char *origin, Oid *subrel_local_oids, + int subrel_count, char *subname) +{ + WalRcvExecResult *res; + StringInfoData cmd; + TupleTableSlot *slot; + Oid tableRow[1] = {TEXTOID}; + List *publist = NIL; + int i; + + if (!copydata || !origin || + (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) != 0)) + return; + + initStringInfo(&cmd); + appendStringInfoString(&cmd, + "SELECT DISTINCT P.pubname AS pubname\n" + "FROM pg_publication P,\n" + " LATERAL pg_get_publication_tables(P.pubname) GPT\n" + " JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid),\n" + " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n" + "WHERE C.oid = GPT.relid AND P.pubname IN ("); + get_publications_str(publications, &cmd, true); + appendStringInfoString(&cmd, ")\n"); + + /* + * In case of ALTER SUBSCRIPTION ... REFRESH, subrel_local_oids contains + * the list of relation oids that are already present on the subscriber. + * This check should be skipped for these tables. + */ + for (i = 0; i < subrel_count; i++) + { + Oid relid = subrel_local_oids[i]; + char *schemaname = get_namespace_name(get_rel_namespace(relid)); + char *tablename = get_rel_name(relid); + + appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n", + schemaname, tablename); + } + + res = walrcv_exec(wrconn, cmd.data, 1, tableRow); + pfree(cmd.data); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not receive list of replicated tables from the publisher: %s", + res->err))); + + /* Process tables. */ + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + char *pubname; + bool isnull; + + pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + + ExecClearTuple(slot); + publist = list_append_unique(publist, makeString(pubname)); + } + + /* + * Log a warning if the publisher has subscribed to the same table from + * some other publisher. We cannot know the origin of data during the + * initial sync. Data origins can be found only from the WAL by looking at + * the origin id. + * + * XXX: For simplicity, we don't check whether the table has any data or + * not. If the table doesn't have any data then we don't need to + * distinguish between data having origin and data not having origin so we + * can avoid logging a warning in that case. + */ + if (publist) + { + StringInfo pubnames = makeStringInfo(); + + /* Prepare the list of publication(s) for warning message. */ + get_publications_str(publist, pubnames, false); + ereport(WARNING, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin", + subname), + errdetail_plural("Subscribed publication %s is subscribing to other publications.", + "Subscribed publications %s are subscribing to other publications.", + list_length(publist), pubnames->data), + errhint("Verify that initial data copied from the publisher tables did not come from other origins.")); + } + + ExecDropSingleTupleTableSlot(slot); + + walrcv_clear_result(res); +} + +/* * Get the list of tables which belong to specified publications on the * publisher connection. * |