aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands/subscriptioncmds.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands/subscriptioncmds.c')
-rw-r--r--src/backend/commands/subscriptioncmds.c133
1 files changed, 68 insertions, 65 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 1f7274bc572..89358a4ec3c 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -94,7 +94,7 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
*synchronous_commit = NULL;
/* Parse options */
- foreach (lc, options)
+ foreach(lc, options)
{
DefElem *defel = (DefElem *) lfirst(lc);
@@ -200,8 +200,8 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
}
/*
- * Do additional checking for disallowed combination when
- * slot_name = NONE was used.
+ * Do additional checking for disallowed combination when slot_name = NONE
+ * was used.
*/
if (slot_name && *slot_name_given && !*slot_name)
{
@@ -367,7 +367,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
values[Anum_pg_subscription_subsynccommit - 1] =
CStringGetTextDatum(synchronous_commit);
values[Anum_pg_subscription_subpublications - 1] =
- publicationListToArray(publications);
+ publicationListToArray(publications);
tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
@@ -386,12 +386,12 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
*/
if (connect)
{
- XLogRecPtr lsn;
- char *err;
- WalReceiverConn *wrconn;
- List *tables;
- ListCell *lc;
- char table_state;
+ XLogRecPtr lsn;
+ char *err;
+ WalReceiverConn *wrconn;
+ List *tables;
+ ListCell *lc;
+ char table_state;
/* Try to connect to the publisher. */
wrconn = walrcv_connect(conninfo, true, stmt->subname, &err);
@@ -412,7 +412,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
* info.
*/
tables = fetch_table_list(wrconn, publications);
- foreach (lc, tables)
+ foreach(lc, tables)
{
RangeVar *rv = (RangeVar *) lfirst(lc);
Oid relid;
@@ -431,9 +431,9 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
(errmsg("synchronized table states")));
/*
- * If requested, create permanent slot for the subscription.
- * We won't use the initial snapshot for anything, so no need
- * to export it.
+ * If requested, create permanent slot for the subscription. We
+ * won't use the initial snapshot for anything, so no need to
+ * export it.
*/
if (create_slot)
{
@@ -442,8 +442,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
walrcv_create_slot(wrconn, slotname, false,
CRS_NOEXPORT_SNAPSHOT, &lsn);
ereport(NOTICE,
- (errmsg("created replication slot \"%s\" on publisher",
- slotname)));
+ (errmsg("created replication slot \"%s\" on publisher",
+ slotname)));
}
}
PG_CATCH();
@@ -478,7 +478,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
static void
AlterSubscription_refresh(Subscription *sub, bool copy_data)
{
- char *err;
+ char *err;
List *pubrel_names;
List *subrel_states;
Oid *subrel_local_oids;
@@ -505,31 +505,31 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
subrel_states = GetSubscriptionRelations(sub->oid);
/*
- * Build qsorted array of local table oids for faster lookup.
- * This can potentially contain all tables in the database so
- * speed of lookup is important.
+ * Build qsorted array of local table oids for faster lookup. This can
+ * potentially contain all tables in the database so speed of lookup is
+ * important.
*/
subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid));
off = 0;
foreach(lc, subrel_states)
{
SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
+
subrel_local_oids[off++] = relstate->relid;
}
qsort(subrel_local_oids, list_length(subrel_states),
sizeof(Oid), oid_cmp);
/*
- * Walk over the remote tables and try to match them to locally
- * known tables. If the table is not known locally create a new state
- * for it.
+ * Walk over the remote tables and try to match them to locally known
+ * tables. If the table is not known locally create a new state for it.
*
* Also builds array of local oids of remote tables for the next step.
*/
off = 0;
pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid));
- foreach (lc, pubrel_names)
+ foreach(lc, pubrel_names)
{
RangeVar *rv = (RangeVar *) lfirst(lc);
Oid relid;
@@ -546,7 +546,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
list_length(subrel_states), sizeof(Oid), oid_cmp))
{
SetSubscriptionRelState(sub->oid, relid,
- copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
+ copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
InvalidXLogRecPtr);
ereport(NOTICE,
(errmsg("added subscription for table %s.%s",
@@ -556,20 +556,20 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
}
/*
- * Next remove state for tables we should not care about anymore using
- * the data we collected above
+ * Next remove state for tables we should not care about anymore using the
+ * data we collected above
*/
qsort(pubrel_local_oids, list_length(pubrel_names),
sizeof(Oid), oid_cmp);
for (off = 0; off < list_length(subrel_states); off++)
{
- Oid relid = subrel_local_oids[off];
+ Oid relid = subrel_local_oids[off];
if (!bsearch(&relid, pubrel_local_oids,
list_length(pubrel_names), sizeof(Oid), oid_cmp))
{
- char *namespace;
+ char *namespace;
RemoveSubscriptionRel(sub->oid, relid);
@@ -596,7 +596,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
HeapTuple tup;
Oid subid;
bool update_tuple = false;
- Subscription *sub;
+ Subscription *sub;
rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
@@ -644,7 +644,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
if (slotname)
values[Anum_pg_subscription_subslotname - 1] =
- DirectFunctionCall1(namein, CStringGetDatum(slotname));
+ DirectFunctionCall1(namein, CStringGetDatum(slotname));
else
nulls[Anum_pg_subscription_subslotname - 1] = true;
replaces[Anum_pg_subscription_subslotname - 1] = true;
@@ -663,8 +663,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
case ALTER_SUBSCRIPTION_ENABLED:
{
- bool enabled,
- enabled_given;
+ bool enabled,
+ enabled_given;
parse_subscription_options(stmt->options, NULL,
&enabled_given, &enabled, NULL,
@@ -702,14 +702,14 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
case ALTER_SUBSCRIPTION_PUBLICATION:
case ALTER_SUBSCRIPTION_PUBLICATION_REFRESH:
{
- bool copy_data;
+ bool copy_data;
parse_subscription_options(stmt->options, NULL, NULL, NULL,
NULL, NULL, NULL, &copy_data,
NULL);
values[Anum_pg_subscription_subpublications - 1] =
- publicationListToArray(stmt->publication);
+ publicationListToArray(stmt->publication);
replaces[Anum_pg_subscription_subpublications - 1] = true;
update_tuple = true;
@@ -733,7 +733,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
case ALTER_SUBSCRIPTION_REFRESH:
{
- bool copy_data;
+ bool copy_data;
if (!sub->enabled)
ereport(ERROR,
@@ -791,14 +791,13 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
char *slotname;
char originname[NAMEDATALEN];
char *err = NULL;
- RepOriginId originid;
- WalReceiverConn *wrconn = NULL;
- StringInfoData cmd;
+ RepOriginId originid;
+ WalReceiverConn *wrconn = NULL;
+ StringInfoData cmd;
/*
- * Lock pg_subscription with AccessExclusiveLock to ensure
- * that the launcher doesn't restart new worker during dropping
- * the subscription
+ * Lock pg_subscription with AccessExclusiveLock to ensure that the
+ * launcher doesn't restart new worker during dropping the subscription
*/
rel = heap_open(SubscriptionRelationId, AccessExclusiveLock);
@@ -833,8 +832,8 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
InvokeObjectDropHook(SubscriptionRelationId, subid, 0);
/*
- * Lock the subscription so nobody else can do anything with it
- * (including the replication workers).
+ * Lock the subscription so nobody else can do anything with it (including
+ * the replication workers).
*/
LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
@@ -895,7 +894,10 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
if (originid != InvalidRepOriginId)
replorigin_drop(originid);
- /* If there is no slot associated with the subscription, we can finish here. */
+ /*
+ * If there is no slot associated with the subscription, we can finish
+ * here.
+ */
if (!slotname)
{
heap_close(rel, NoLock);
@@ -903,8 +905,8 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
}
/*
- * Otherwise drop the replication slot at the publisher node using
- * the replication connection.
+ * Otherwise drop the replication slot at the publisher node using the
+ * replication connection.
*/
load_file("libpqwalreceiver", false);
@@ -922,14 +924,15 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
PG_TRY();
{
- WalRcvExecResult *res;
+ WalRcvExecResult *res;
+
res = walrcv_exec(wrconn, cmd.data, 0, NULL);
if (res->status != WALRCV_OK_COMMAND)
ereport(ERROR,
- (errmsg("could not drop the replication slot \"%s\" on publisher",
- slotname),
- errdetail("The error was: %s", res->err)));
+ (errmsg("could not drop the replication slot \"%s\" on publisher",
+ slotname),
+ errdetail("The error was: %s", res->err)));
else
ereport(NOTICE,
(errmsg("dropped replication slot \"%s\" on publisher",
@@ -973,9 +976,9 @@ AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
if (!superuser_arg(newOwnerId))
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
- errmsg("permission denied to change owner of subscription \"%s\"",
- NameStr(form->subname)),
- errhint("The owner of a subscription must be a superuser.")));
+ errmsg("permission denied to change owner of subscription \"%s\"",
+ NameStr(form->subname)),
+ errhint("The owner of a subscription must be a superuser.")));
form->subowner = newOwnerId;
CatalogTupleUpdate(rel, &tup->t_self, tup);
@@ -1055,24 +1058,24 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
static List *
fetch_table_list(WalReceiverConn *wrconn, List *publications)
{
- WalRcvExecResult *res;
- StringInfoData cmd;
- TupleTableSlot *slot;
- Oid tableRow[2] = {TEXTOID, TEXTOID};
- ListCell *lc;
- bool first;
- List *tablelist = NIL;
+ WalRcvExecResult *res;
+ StringInfoData cmd;
+ TupleTableSlot *slot;
+ Oid tableRow[2] = {TEXTOID, TEXTOID};
+ ListCell *lc;
+ bool first;
+ List *tablelist = NIL;
Assert(list_length(publications) > 0);
initStringInfo(&cmd);
appendStringInfo(&cmd, "SELECT DISTINCT t.schemaname, t.tablename\n"
- " FROM pg_catalog.pg_publication_tables t\n"
- " WHERE t.pubname IN (");
+ " FROM pg_catalog.pg_publication_tables t\n"
+ " WHERE t.pubname IN (");
first = true;
- foreach (lc, publications)
+ foreach(lc, publications)
{
- char *pubname = strVal(lfirst(lc));
+ char *pubname = strVal(lfirst(lc));
if (first)
first = false;