aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands/subscriptioncmds.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands/subscriptioncmds.c')
-rw-r--r--src/backend/commands/subscriptioncmds.c467
1 files changed, 369 insertions, 98 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 5ccbc9dd50f..5cf874e0b46 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -34,6 +34,7 @@
#include "nodes/makefuncs.h"
#include "replication/logicallauncher.h"
#include "replication/origin.h"
+#include "replication/slot.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/worker_internal.h"
@@ -46,6 +47,8 @@
#include "utils/syscache.h"
static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
+static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
+
/*
* Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
@@ -566,107 +569,207 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
Oid *pubrel_local_oids;
ListCell *lc;
int off;
+ int remove_rel_len;
+ Relation rel = NULL;
+ typedef struct SubRemoveRels
+ {
+ Oid relid;
+ char state;
+ } SubRemoveRels;
+ SubRemoveRels *sub_remove_rels;
/* Load the library providing us libpq calls. */
load_file("libpqwalreceiver", false);
- /* Try to connect to the publisher. */
- wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
- if (!wrconn)
- ereport(ERROR,
- (errmsg("could not connect to the publisher: %s", err)));
-
- /* Get the table list from publisher. */
- pubrel_names = fetch_table_list(wrconn, sub->publications);
+ PG_TRY();
+ {
+ /* Try to connect to the publisher. */
+ wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
+ if (!wrconn)
+ ereport(ERROR,
+ (errmsg("could not connect to the publisher: %s", err)));
- /* We are done with the remote side, close connection. */
- walrcv_disconnect(wrconn);
+ /* Get the table list from publisher. */
+ pubrel_names = fetch_table_list(wrconn, sub->publications);
- /* Get local table list. */
- subrel_states = GetSubscriptionRelations(sub->oid);
+ /* Get local table list. */
+ subrel_states = GetSubscriptionRelations(sub->oid);
- /*
- * Build qsorted array of local table oids for faster lookup. This can
- * potentially contain all tables in the database so speed of lookup is
- * important.
- */
- subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid));
- off = 0;
- foreach(lc, subrel_states)
- {
- SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
+ /*
+ * Build qsorted array of local table oids for faster lookup. This can
+ * potentially contain all tables in the database so speed of lookup
+ * is important.
+ */
+ subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid));
+ off = 0;
+ foreach(lc, subrel_states)
+ {
+ SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
- subrel_local_oids[off++] = relstate->relid;
- }
- qsort(subrel_local_oids, list_length(subrel_states),
- sizeof(Oid), oid_cmp);
+ subrel_local_oids[off++] = relstate->relid;
+ }
+ qsort(subrel_local_oids, list_length(subrel_states),
+ sizeof(Oid), oid_cmp);
+
+ /*
+ * Rels that we want to remove from subscription and drop any slots
+ * and origins corresponding to them.
+ */
+ sub_remove_rels = palloc(list_length(subrel_states) * sizeof(SubRemoveRels));
+
+ /*
+ * Walk over the remote tables and try to match them to locally known
+ * tables. If the table is not known locally create a new state for
+ * it.
+ *
+ * Also builds array of local oids of remote tables for the next step.
+ */
+ off = 0;
+ pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid));
+
+ foreach(lc, pubrel_names)
+ {
+ RangeVar *rv = (RangeVar *) lfirst(lc);
+ Oid relid;
- /*
- * Walk over the remote tables and try to match them to locally known
- * tables. If the table is not known locally create a new state for it.
- *
- * Also builds array of local oids of remote tables for the next step.
- */
- off = 0;
- pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid));
+ relid = RangeVarGetRelid(rv, AccessShareLock, false);
- foreach(lc, pubrel_names)
- {
- RangeVar *rv = (RangeVar *) lfirst(lc);
- Oid relid;
+ /* Check for supported relkind. */
+ CheckSubscriptionRelkind(get_rel_relkind(relid),
+ rv->schemaname, rv->relname);
- relid = RangeVarGetRelid(rv, AccessShareLock, false);
+ pubrel_local_oids[off++] = relid;
- /* Check for supported relkind. */
- CheckSubscriptionRelkind(get_rel_relkind(relid),
- rv->schemaname, rv->relname);
+ if (!bsearch(&relid, subrel_local_oids,
+ list_length(subrel_states), sizeof(Oid), oid_cmp))
+ {
+ AddSubscriptionRelState(sub->oid, relid,
+ copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
+ InvalidXLogRecPtr);
+ ereport(DEBUG1,
+ (errmsg("table \"%s.%s\" added to subscription \"%s\"",
+ rv->schemaname, rv->relname, sub->name)));
+ }
+ }
- pubrel_local_oids[off++] = relid;
+ /*
+ * Next remove state for tables we should not care about anymore using
+ * the data we collected above
+ */
+ qsort(pubrel_local_oids, list_length(pubrel_names),
+ sizeof(Oid), oid_cmp);
- if (!bsearch(&relid, subrel_local_oids,
- list_length(subrel_states), sizeof(Oid), oid_cmp))
+ remove_rel_len = 0;
+ for (off = 0; off < list_length(subrel_states); off++)
{
- AddSubscriptionRelState(sub->oid, relid,
- copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
- InvalidXLogRecPtr);
- ereport(DEBUG1,
- (errmsg("table \"%s.%s\" added to subscription \"%s\"",
- rv->schemaname, rv->relname, sub->name)));
- }
- }
+ Oid relid = subrel_local_oids[off];
- /*
- * Next remove state for tables we should not care about anymore using the
- * data we collected above
- */
- qsort(pubrel_local_oids, list_length(pubrel_names),
- sizeof(Oid), oid_cmp);
+ if (!bsearch(&relid, pubrel_local_oids,
+ list_length(pubrel_names), sizeof(Oid), oid_cmp))
+ {
+ char state;
+ XLogRecPtr statelsn;
+
+ /*
+ * Lock pg_subscription_rel with AccessExclusiveLock to
+ * prevent any race conditions with the apply worker
+ * re-launching workers at the same time this code is trying
+ * to remove those tables.
+ *
+ * Even if new worker for this particular rel is restarted it
+ * won't be able to make any progress as we hold exclusive
+ * lock on subscription_rel till the transaction end. It will
+ * simply exit as there is no corresponding rel entry.
+ *
+ * This locking also ensures that the state of rels won't
+ * change till we are done with this refresh operation.
+ */
+ if (!rel)
+ rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
+
+ /* Last known rel state. */
+ state = GetSubscriptionRelState(sub->oid, relid, &statelsn);
+
+ sub_remove_rels[remove_rel_len].relid = relid;
+ sub_remove_rels[remove_rel_len++].state = state;
+
+ RemoveSubscriptionRel(sub->oid, relid);
+
+ logicalrep_worker_stop(sub->oid, relid);
+
+ /*
+ * For READY state, we would have already dropped the
+ * tablesync origin.
+ */
+ if (state != SUBREL_STATE_READY)
+ {
+ char originname[NAMEDATALEN];
+
+ /*
+ * Drop the tablesync's origin tracking if exists.
+ *
+ * It is possible that the origin is not yet created for
+ * tablesync worker, this can happen for the states before
+ * SUBREL_STATE_FINISHEDCOPY. The apply worker can also
+ * concurrently try to drop the origin and by this time
+ * the origin might be already removed. For these reasons,
+ * passing missing_ok = true.
+ */
+ ReplicationOriginNameForTablesync(sub->oid, relid, originname);
+ replorigin_drop_by_name(originname, true, false);
+ }
- for (off = 0; off < list_length(subrel_states); off++)
- {
- Oid relid = subrel_local_oids[off];
+ ereport(DEBUG1,
+ (errmsg("table \"%s.%s\" removed from subscription \"%s\"",
+ get_namespace_name(get_rel_namespace(relid)),
+ get_rel_name(relid),
+ sub->name)));
+ }
+ }
- if (!bsearch(&relid, pubrel_local_oids,
- list_length(pubrel_names), sizeof(Oid), oid_cmp))
+ /*
+ * Drop the tablesync slots associated with removed tables. This has
+ * to be at the end because otherwise if there is an error while doing
+ * the database operations we won't be able to rollback dropped slots.
+ */
+ for (off = 0; off < remove_rel_len; off++)
{
- RemoveSubscriptionRel(sub->oid, relid);
-
- logicalrep_worker_stop_at_commit(sub->oid, relid);
-
- ereport(DEBUG1,
- (errmsg("table \"%s.%s\" removed from subscription \"%s\"",
- get_namespace_name(get_rel_namespace(relid)),
- get_rel_name(relid),
- sub->name)));
+ if (sub_remove_rels[off].state != SUBREL_STATE_READY &&
+ sub_remove_rels[off].state != SUBREL_STATE_SYNCDONE)
+ {
+ char syncslotname[NAMEDATALEN] = {0};
+
+ /*
+ * For READY/SYNCDONE states we know the tablesync slot has
+ * already been dropped by the tablesync worker.
+ *
+ * For other states, there is no certainty, maybe the slot
+ * does not exist yet. Also, if we fail after removing some of
+ * the slots, next time, it will again try to drop already
+ * dropped slots and fail. For these reasons, we allow
+ * missing_ok = true for the drop.
+ */
+ ReplicationSlotNameForTablesync(sub->oid, sub_remove_rels[off].relid, syncslotname);
+ ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
+ }
}
}
+ PG_FINALLY();
+ {
+ if (wrconn)
+ walrcv_disconnect(wrconn);
+ }
+ PG_END_TRY();
+
+ if (rel)
+ table_close(rel, NoLock);
}
/*
* Alter the existing subscription.
*/
ObjectAddress
-AlterSubscription(AlterSubscriptionStmt *stmt)
+AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
{
Relation rel;
ObjectAddress myself;
@@ -848,6 +951,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
+ PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
+
/* Make sure refresh sees the new list of publications. */
sub->publications = stmt->publication;
@@ -877,6 +982,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
NULL, NULL, /* no "binary" */
NULL, NULL); /* no "streaming" */
+ PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH");
+
AlterSubscription_refresh(sub, copy_data);
break;
@@ -927,8 +1034,8 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
char originname[NAMEDATALEN];
char *err = NULL;
WalReceiverConn *wrconn = NULL;
- StringInfoData cmd;
Form_pg_subscription form;
+ List *rstates;
/*
* Lock pg_subscription with AccessExclusiveLock to ensure that the
@@ -1041,6 +1148,36 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
}
list_free(subworkers);
+ /*
+ * Cleanup of tablesync replication origins.
+ *
+ * Any READY-state relations would already have dealt with clean-ups.
+ *
+ * Note that the state can't change because we have already stopped both
+ * the apply and tablesync workers and they can't restart because of
+ * exclusive lock on the subscription.
+ */
+ rstates = GetSubscriptionNotReadyRelations(subid);
+ foreach(lc, rstates)
+ {
+ SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
+ Oid relid = rstate->relid;
+
+ /* Only cleanup resources of tablesync workers */
+ if (!OidIsValid(relid))
+ continue;
+
+ /*
+ * Drop the tablesync's origin tracking if exists.
+ *
+ * It is possible that the origin is not yet created for tablesync
+ * worker so passing missing_ok = true. This can happen for the states
+ * before SUBREL_STATE_FINISHEDCOPY.
+ */
+ ReplicationOriginNameForTablesync(subid, relid, originname);
+ replorigin_drop_by_name(originname, true, false);
+ }
+
/* Clean up dependencies */
deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
@@ -1055,30 +1192,110 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
* If there is no slot associated with the subscription, we can finish
* here.
*/
- if (!slotname)
+ if (!slotname && rstates == NIL)
{
table_close(rel, NoLock);
return;
}
/*
- * Otherwise drop the replication slot at the publisher node using the
- * replication connection.
+ * Try to acquire the connection necessary for dropping slots.
+ *
+ * Note: If the slotname is NONE/NULL then we allow the command to finish
+ * and users need to manually cleanup the apply and tablesync worker slots
+ * later.
+ *
+ * This has to be at the end because otherwise if there is an error while
+ * doing the database operations we won't be able to rollback dropped
+ * slot.
*/
load_file("libpqwalreceiver", false);
- initStringInfo(&cmd);
- appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
-
wrconn = walrcv_connect(conninfo, true, subname, &err);
if (wrconn == NULL)
- ereport(ERROR,
- (errmsg("could not connect to publisher when attempting to "
- "drop the replication slot \"%s\"", slotname),
- errdetail("The error was: %s", err),
- /* translator: %s is an SQL ALTER command */
- errhint("Use %s to disassociate the subscription from the slot.",
- "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
+ {
+ if (!slotname)
+ {
+ /* be tidy */
+ list_free(rstates);
+ table_close(rel, NoLock);
+ return;
+ }
+ else
+ {
+ ReportSlotConnectionError(rstates, subid, slotname, err);
+ }
+ }
+
+ PG_TRY();
+ {
+ foreach(lc, rstates)
+ {
+ SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
+ Oid relid = rstate->relid;
+
+ /* Only cleanup resources of tablesync workers */
+ if (!OidIsValid(relid))
+ continue;
+
+ /*
+ * Drop the tablesync slots associated with removed tables.
+ *
+ * For SYNCDONE/READY states, the tablesync slot is known to have
+ * already been dropped by the tablesync worker.
+ *
+ * For other states, there is no certainty, maybe the slot does
+ * not exist yet. Also, if we fail after removing some of the
+ * slots, next time, it will again try to drop already dropped
+ * slots and fail. For these reasons, we allow missing_ok = true
+ * for the drop.
+ */
+ if (rstate->state != SUBREL_STATE_SYNCDONE)
+ {
+ char syncslotname[NAMEDATALEN] = {0};
+
+ ReplicationSlotNameForTablesync(subid, relid, syncslotname);
+ ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
+ }
+ }
+
+ list_free(rstates);
+
+ /*
+ * If there is a slot associated with the subscription, then drop the
+ * replication slot at the publisher.
+ */
+ if (slotname)
+ ReplicationSlotDropAtPubNode(wrconn, slotname, false);
+
+ }
+ PG_FINALLY();
+ {
+ walrcv_disconnect(wrconn);
+ }
+ PG_END_TRY();
+
+ table_close(rel, NoLock);
+}
+
+/*
+ * Drop the replication slot at the publisher node using the replication
+ * connection.
+ *
+ * missing_ok - if true then only issue a LOG message if the slot doesn't
+ * exist.
+ */
+void
+ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
+{
+ StringInfoData cmd;
+
+ Assert(wrconn);
+
+ load_file("libpqwalreceiver", false);
+
+ initStringInfo(&cmd);
+ appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
PG_TRY();
{
@@ -1086,27 +1303,39 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
res = walrcv_exec(wrconn, cmd.data, 0, NULL);
- if (res->status != WALRCV_OK_COMMAND)
- ereport(ERROR,
+ if (res->status == WALRCV_OK_COMMAND)
+ {
+ /* NOTICE. Success. */
+ ereport(NOTICE,
+ (errmsg("dropped replication slot \"%s\" on publisher",
+ slotname)));
+ }
+ else if (res->status == WALRCV_ERROR &&
+ missing_ok &&
+ res->sqlstate == ERRCODE_UNDEFINED_OBJECT)
+ {
+ /* LOG. Error, but missing_ok = true. */
+ ereport(LOG,
(errmsg("could not drop the replication slot \"%s\" on publisher",
slotname),
errdetail("The error was: %s", res->err)));
+ }
else
- ereport(NOTICE,
- (errmsg("dropped replication slot \"%s\" on publisher",
- slotname)));
+ {
+ /* ERROR. */
+ ereport(ERROR,
+ (errmsg("could not drop the replication slot \"%s\" on publisher",
+ slotname),
+ errdetail("The error was: %s", res->err)));
+ }
walrcv_clear_result(res);
}
PG_FINALLY();
{
- walrcv_disconnect(wrconn);
+ pfree(cmd.data);
}
PG_END_TRY();
-
- pfree(cmd.data);
-
- table_close(rel, NoLock);
}
/*
@@ -1275,3 +1504,45 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
return tablelist;
}
+
+/*
+ * This is to report the connection failure while dropping replication slots.
+ * Here, we report the WARNING for all tablesync slots so that user can drop
+ * them manually, if required.
+ */
+static void
+ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
+{
+ ListCell *lc;
+
+ foreach(lc, rstates)
+ {
+ SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
+ Oid relid = rstate->relid;
+
+ /* Only cleanup resources of tablesync workers */
+ if (!OidIsValid(relid))
+ continue;
+
+ /*
+ * Caller needs to ensure that relstate doesn't change underneath us.
+ * See DropSubscription where we get the relstates.
+ */
+ if (rstate->state != SUBREL_STATE_SYNCDONE)
+ {
+ char syncslotname[NAMEDATALEN] = {0};
+
+ ReplicationSlotNameForTablesync(subid, relid, syncslotname);
+ elog(WARNING, "could not drop tablesync replication slot \"%s\"",
+ syncslotname);
+ }
+ }
+
+ ereport(ERROR,
+ (errmsg("could not connect to publisher when attempting to "
+ "drop the replication slot \"%s\"", slotname),
+ errdetail("The error was: %s", err),
+ /* translator: %s is an SQL ALTER command */
+ errhint("Use %s to disassociate the subscription from the slot.",
+ "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
+}