diff options
Diffstat (limited to 'src/backend/commands/subscriptioncmds.c')
-rw-r--r-- | src/backend/commands/subscriptioncmds.c | 133 |
1 files changed, 68 insertions, 65 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 1f7274bc572..89358a4ec3c 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -94,7 +94,7 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, *synchronous_commit = NULL; /* Parse options */ - foreach (lc, options) + foreach(lc, options) { DefElem *defel = (DefElem *) lfirst(lc); @@ -200,8 +200,8 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, } /* - * Do additional checking for disallowed combination when - * slot_name = NONE was used. + * Do additional checking for disallowed combination when slot_name = NONE + * was used. */ if (slot_name && *slot_name_given && !*slot_name) { @@ -367,7 +367,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) values[Anum_pg_subscription_subsynccommit - 1] = CStringGetTextDatum(synchronous_commit); values[Anum_pg_subscription_subpublications - 1] = - publicationListToArray(publications); + publicationListToArray(publications); tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); @@ -386,12 +386,12 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) */ if (connect) { - XLogRecPtr lsn; - char *err; - WalReceiverConn *wrconn; - List *tables; - ListCell *lc; - char table_state; + XLogRecPtr lsn; + char *err; + WalReceiverConn *wrconn; + List *tables; + ListCell *lc; + char table_state; /* Try to connect to the publisher. */ wrconn = walrcv_connect(conninfo, true, stmt->subname, &err); @@ -412,7 +412,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) * info. */ tables = fetch_table_list(wrconn, publications); - foreach (lc, tables) + foreach(lc, tables) { RangeVar *rv = (RangeVar *) lfirst(lc); Oid relid; @@ -431,9 +431,9 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) (errmsg("synchronized table states"))); /* - * If requested, create permanent slot for the subscription. - * We won't use the initial snapshot for anything, so no need - * to export it. + * If requested, create permanent slot for the subscription. We + * won't use the initial snapshot for anything, so no need to + * export it. */ if (create_slot) { @@ -442,8 +442,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) walrcv_create_slot(wrconn, slotname, false, CRS_NOEXPORT_SNAPSHOT, &lsn); ereport(NOTICE, - (errmsg("created replication slot \"%s\" on publisher", - slotname))); + (errmsg("created replication slot \"%s\" on publisher", + slotname))); } } PG_CATCH(); @@ -478,7 +478,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) static void AlterSubscription_refresh(Subscription *sub, bool copy_data) { - char *err; + char *err; List *pubrel_names; List *subrel_states; Oid *subrel_local_oids; @@ -505,31 +505,31 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) subrel_states = GetSubscriptionRelations(sub->oid); /* - * Build qsorted array of local table oids for faster lookup. - * This can potentially contain all tables in the database so - * speed of lookup is important. + * Build qsorted array of local table oids for faster lookup. This can + * potentially contain all tables in the database so speed of lookup is + * important. */ subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid)); off = 0; foreach(lc, subrel_states) { SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc); + subrel_local_oids[off++] = relstate->relid; } qsort(subrel_local_oids, list_length(subrel_states), sizeof(Oid), oid_cmp); /* - * Walk over the remote tables and try to match them to locally - * known tables. If the table is not known locally create a new state - * for it. + * Walk over the remote tables and try to match them to locally known + * tables. If the table is not known locally create a new state for it. * * Also builds array of local oids of remote tables for the next step. */ off = 0; pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid)); - foreach (lc, pubrel_names) + foreach(lc, pubrel_names) { RangeVar *rv = (RangeVar *) lfirst(lc); Oid relid; @@ -546,7 +546,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) list_length(subrel_states), sizeof(Oid), oid_cmp)) { SetSubscriptionRelState(sub->oid, relid, - copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY, + copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY, InvalidXLogRecPtr); ereport(NOTICE, (errmsg("added subscription for table %s.%s", @@ -556,20 +556,20 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) } /* - * Next remove state for tables we should not care about anymore using - * the data we collected above + * Next remove state for tables we should not care about anymore using the + * data we collected above */ qsort(pubrel_local_oids, list_length(pubrel_names), sizeof(Oid), oid_cmp); for (off = 0; off < list_length(subrel_states); off++) { - Oid relid = subrel_local_oids[off]; + Oid relid = subrel_local_oids[off]; if (!bsearch(&relid, pubrel_local_oids, list_length(pubrel_names), sizeof(Oid), oid_cmp)) { - char *namespace; + char *namespace; RemoveSubscriptionRel(sub->oid, relid); @@ -596,7 +596,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt) HeapTuple tup; Oid subid; bool update_tuple = false; - Subscription *sub; + Subscription *sub; rel = heap_open(SubscriptionRelationId, RowExclusiveLock); @@ -644,7 +644,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt) if (slotname) values[Anum_pg_subscription_subslotname - 1] = - DirectFunctionCall1(namein, CStringGetDatum(slotname)); + DirectFunctionCall1(namein, CStringGetDatum(slotname)); else nulls[Anum_pg_subscription_subslotname - 1] = true; replaces[Anum_pg_subscription_subslotname - 1] = true; @@ -663,8 +663,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt) case ALTER_SUBSCRIPTION_ENABLED: { - bool enabled, - enabled_given; + bool enabled, + enabled_given; parse_subscription_options(stmt->options, NULL, &enabled_given, &enabled, NULL, @@ -702,14 +702,14 @@ AlterSubscription(AlterSubscriptionStmt *stmt) case ALTER_SUBSCRIPTION_PUBLICATION: case ALTER_SUBSCRIPTION_PUBLICATION_REFRESH: { - bool copy_data; + bool copy_data; parse_subscription_options(stmt->options, NULL, NULL, NULL, NULL, NULL, NULL, ©_data, NULL); values[Anum_pg_subscription_subpublications - 1] = - publicationListToArray(stmt->publication); + publicationListToArray(stmt->publication); replaces[Anum_pg_subscription_subpublications - 1] = true; update_tuple = true; @@ -733,7 +733,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt) case ALTER_SUBSCRIPTION_REFRESH: { - bool copy_data; + bool copy_data; if (!sub->enabled) ereport(ERROR, @@ -791,14 +791,13 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) char *slotname; char originname[NAMEDATALEN]; char *err = NULL; - RepOriginId originid; - WalReceiverConn *wrconn = NULL; - StringInfoData cmd; + RepOriginId originid; + WalReceiverConn *wrconn = NULL; + StringInfoData cmd; /* - * Lock pg_subscription with AccessExclusiveLock to ensure - * that the launcher doesn't restart new worker during dropping - * the subscription + * Lock pg_subscription with AccessExclusiveLock to ensure that the + * launcher doesn't restart new worker during dropping the subscription */ rel = heap_open(SubscriptionRelationId, AccessExclusiveLock); @@ -833,8 +832,8 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) InvokeObjectDropHook(SubscriptionRelationId, subid, 0); /* - * Lock the subscription so nobody else can do anything with it - * (including the replication workers). + * Lock the subscription so nobody else can do anything with it (including + * the replication workers). */ LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock); @@ -895,7 +894,10 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) if (originid != InvalidRepOriginId) replorigin_drop(originid); - /* If there is no slot associated with the subscription, we can finish here. */ + /* + * If there is no slot associated with the subscription, we can finish + * here. + */ if (!slotname) { heap_close(rel, NoLock); @@ -903,8 +905,8 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) } /* - * Otherwise drop the replication slot at the publisher node using - * the replication connection. + * Otherwise drop the replication slot at the publisher node using the + * replication connection. */ load_file("libpqwalreceiver", false); @@ -922,14 +924,15 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) PG_TRY(); { - WalRcvExecResult *res; + WalRcvExecResult *res; + res = walrcv_exec(wrconn, cmd.data, 0, NULL); if (res->status != WALRCV_OK_COMMAND) ereport(ERROR, - (errmsg("could not drop the replication slot \"%s\" on publisher", - slotname), - errdetail("The error was: %s", res->err))); + (errmsg("could not drop the replication slot \"%s\" on publisher", + slotname), + errdetail("The error was: %s", res->err))); else ereport(NOTICE, (errmsg("dropped replication slot \"%s\" on publisher", @@ -973,9 +976,9 @@ AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId) if (!superuser_arg(newOwnerId)) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), - errmsg("permission denied to change owner of subscription \"%s\"", - NameStr(form->subname)), - errhint("The owner of a subscription must be a superuser."))); + errmsg("permission denied to change owner of subscription \"%s\"", + NameStr(form->subname)), + errhint("The owner of a subscription must be a superuser."))); form->subowner = newOwnerId; CatalogTupleUpdate(rel, &tup->t_self, tup); @@ -1055,24 +1058,24 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId) static List * fetch_table_list(WalReceiverConn *wrconn, List *publications) { - WalRcvExecResult *res; - StringInfoData cmd; - TupleTableSlot *slot; - Oid tableRow[2] = {TEXTOID, TEXTOID}; - ListCell *lc; - bool first; - List *tablelist = NIL; + WalRcvExecResult *res; + StringInfoData cmd; + TupleTableSlot *slot; + Oid tableRow[2] = {TEXTOID, TEXTOID}; + ListCell *lc; + bool first; + List *tablelist = NIL; Assert(list_length(publications) > 0); initStringInfo(&cmd); appendStringInfo(&cmd, "SELECT DISTINCT t.schemaname, t.tablename\n" - " FROM pg_catalog.pg_publication_tables t\n" - " WHERE t.pubname IN ("); + " FROM pg_catalog.pg_publication_tables t\n" + " WHERE t.pubname IN ("); first = true; - foreach (lc, publications) + foreach(lc, publications) { - char *pubname = strVal(lfirst(lc)); + char *pubname = strVal(lfirst(lc)); if (first) first = false; |