aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands/subscriptioncmds.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands/subscriptioncmds.c')
-rw-r--r--src/backend/commands/subscriptioncmds.c133
1 files changed, 128 insertions, 5 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index f87796e5afe..66d800f0cff 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -92,6 +92,10 @@ typedef struct SubOpts
} SubOpts;
static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
+static void check_publications_origin(WalReceiverConn *wrconn,
+ List *publications, bool copydata,
+ char *origin, Oid *subrel_local_oids,
+ int subrel_count, char *subname);
static void check_duplicates_in_publist(List *publist, Datum *datums);
static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
@@ -680,6 +684,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
PG_TRY();
{
check_publications(wrconn, publications);
+ check_publications_origin(wrconn, publications, opts.copy_data,
+ opts.origin, NULL, 0, stmt->subname);
/*
* Set sync state based on if we were asked to do data copy or
@@ -786,6 +792,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
ListCell *lc;
int off;
int remove_rel_len;
+ int subrel_count;
Relation rel = NULL;
typedef struct SubRemoveRels
{
@@ -815,13 +822,14 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
/* Get local table list. */
subrel_states = GetSubscriptionRelations(sub->oid, false);
+ subrel_count = list_length(subrel_states);
/*
* Build qsorted array of local table oids for faster lookup. This can
* potentially contain all tables in the database so speed of lookup
* is important.
*/
- subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid));
+ subrel_local_oids = palloc(subrel_count * sizeof(Oid));
off = 0;
foreach(lc, subrel_states)
{
@@ -829,14 +837,18 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
subrel_local_oids[off++] = relstate->relid;
}
- qsort(subrel_local_oids, list_length(subrel_states),
+ qsort(subrel_local_oids, subrel_count,
sizeof(Oid), oid_cmp);
+ check_publications_origin(wrconn, sub->publications, copy_data,
+ sub->origin, subrel_local_oids,
+ subrel_count, sub->name);
+
/*
* Rels that we want to remove from subscription and drop any slots
* and origins corresponding to them.
*/
- sub_remove_rels = palloc(list_length(subrel_states) * sizeof(SubRemoveRels));
+ sub_remove_rels = palloc(subrel_count * sizeof(SubRemoveRels));
/*
* Walk over the remote tables and try to match them to locally known
@@ -862,7 +874,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
pubrel_local_oids[off++] = relid;
if (!bsearch(&relid, subrel_local_oids,
- list_length(subrel_states), sizeof(Oid), oid_cmp))
+ subrel_count, sizeof(Oid), oid_cmp))
{
AddSubscriptionRelState(sub->oid, relid,
copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
@@ -881,7 +893,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
sizeof(Oid), oid_cmp);
remove_rel_len = 0;
- for (off = 0; off < list_length(subrel_states); off++)
+ for (off = 0; off < subrel_count; off++)
{
Oid relid = subrel_local_oids[off];
@@ -1785,6 +1797,117 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
}
/*
+ * Check and log a warning if the publisher has subscribed to the same table
+ * from some other publisher. This check is required only if "copy_data = true"
+ * and "origin = none" for CREATE SUBSCRIPTION and
+ * ALTER SUBSCRIPTION ... REFRESH statements to notify the user that data
+ * having origin might have been copied.
+ *
+ * This check need not be performed on the tables that are already added
+ * because incremental sync for those tables will happen through WAL and the
+ * origin of the data can be identified from the WAL records.
+ *
+ * subrel_local_oids contains the list of relation oids that are already
+ * present on the subscriber.
+ */
+static void
+check_publications_origin(WalReceiverConn *wrconn, List *publications,
+ bool copydata, char *origin, Oid *subrel_local_oids,
+ int subrel_count, char *subname)
+{
+ WalRcvExecResult *res;
+ StringInfoData cmd;
+ TupleTableSlot *slot;
+ Oid tableRow[1] = {TEXTOID};
+ List *publist = NIL;
+ int i;
+
+ if (!copydata || !origin ||
+ (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) != 0))
+ return;
+
+ initStringInfo(&cmd);
+ appendStringInfoString(&cmd,
+ "SELECT DISTINCT P.pubname AS pubname\n"
+ "FROM pg_publication P,\n"
+ " LATERAL pg_get_publication_tables(P.pubname) GPT\n"
+ " JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid),\n"
+ " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
+ "WHERE C.oid = GPT.relid AND P.pubname IN (");
+ get_publications_str(publications, &cmd, true);
+ appendStringInfoString(&cmd, ")\n");
+
+ /*
+ * In case of ALTER SUBSCRIPTION ... REFRESH, subrel_local_oids contains
+ * the list of relation oids that are already present on the subscriber.
+ * This check should be skipped for these tables.
+ */
+ for (i = 0; i < subrel_count; i++)
+ {
+ Oid relid = subrel_local_oids[i];
+ char *schemaname = get_namespace_name(get_rel_namespace(relid));
+ char *tablename = get_rel_name(relid);
+
+ appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n",
+ schemaname, tablename);
+ }
+
+ res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
+ pfree(cmd.data);
+
+ if (res->status != WALRCV_OK_TUPLES)
+ ereport(ERROR,
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not receive list of replicated tables from the publisher: %s",
+ res->err)));
+
+ /* Process tables. */
+ slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+ while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+ {
+ char *pubname;
+ bool isnull;
+
+ pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+ Assert(!isnull);
+
+ ExecClearTuple(slot);
+ publist = list_append_unique(publist, makeString(pubname));
+ }
+
+ /*
+ * Log a warning if the publisher has subscribed to the same table from
+ * some other publisher. We cannot know the origin of data during the
+ * initial sync. Data origins can be found only from the WAL by looking at
+ * the origin id.
+ *
+ * XXX: For simplicity, we don't check whether the table has any data or
+ * not. If the table doesn't have any data then we don't need to
+ * distinguish between data having origin and data not having origin so we
+ * can avoid logging a warning in that case.
+ */
+ if (publist)
+ {
+ StringInfo pubnames = makeStringInfo();
+
+ /* Prepare the list of publication(s) for warning message. */
+ get_publications_str(publist, pubnames, false);
+ ereport(WARNING,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin",
+ subname),
+ errdetail_plural("Subscribed publication %s is subscribing to other publications.",
+ "Subscribed publications %s are subscribing to other publications.",
+ list_length(publist), pubnames->data),
+ errhint("Verify that initial data copied from the publisher tables did not come from other origins."));
+ }
+
+ ExecDropSingleTupleTableSlot(slot);
+
+ walrcv_clear_result(res);
+}
+
+/*
* Get the list of tables which belong to specified publications on the
* publisher connection.
*