diff options
Diffstat (limited to 'src/backend/commands/subscriptioncmds.c')
-rw-r--r-- | src/backend/commands/subscriptioncmds.c | 467 |
1 files changed, 369 insertions, 98 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 5ccbc9dd50f..5cf874e0b46 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -34,6 +34,7 @@ #include "nodes/makefuncs.h" #include "replication/logicallauncher.h" #include "replication/origin.h" +#include "replication/slot.h" #include "replication/walreceiver.h" #include "replication/walsender.h" #include "replication/worker_internal.h" @@ -46,6 +47,8 @@ #include "utils/syscache.h" static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); +static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err); + /* * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands. @@ -566,107 +569,207 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) Oid *pubrel_local_oids; ListCell *lc; int off; + int remove_rel_len; + Relation rel = NULL; + typedef struct SubRemoveRels + { + Oid relid; + char state; + } SubRemoveRels; + SubRemoveRels *sub_remove_rels; /* Load the library providing us libpq calls. */ load_file("libpqwalreceiver", false); - /* Try to connect to the publisher. */ - wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err); - if (!wrconn) - ereport(ERROR, - (errmsg("could not connect to the publisher: %s", err))); - - /* Get the table list from publisher. */ - pubrel_names = fetch_table_list(wrconn, sub->publications); + PG_TRY(); + { + /* Try to connect to the publisher. */ + wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err); + if (!wrconn) + ereport(ERROR, + (errmsg("could not connect to the publisher: %s", err))); - /* We are done with the remote side, close connection. */ - walrcv_disconnect(wrconn); + /* Get the table list from publisher. */ + pubrel_names = fetch_table_list(wrconn, sub->publications); - /* Get local table list. */ - subrel_states = GetSubscriptionRelations(sub->oid); + /* Get local table list. */ + subrel_states = GetSubscriptionRelations(sub->oid); - /* - * Build qsorted array of local table oids for faster lookup. This can - * potentially contain all tables in the database so speed of lookup is - * important. - */ - subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid)); - off = 0; - foreach(lc, subrel_states) - { - SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc); + /* + * Build qsorted array of local table oids for faster lookup. This can + * potentially contain all tables in the database so speed of lookup + * is important. + */ + subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid)); + off = 0; + foreach(lc, subrel_states) + { + SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc); - subrel_local_oids[off++] = relstate->relid; - } - qsort(subrel_local_oids, list_length(subrel_states), - sizeof(Oid), oid_cmp); + subrel_local_oids[off++] = relstate->relid; + } + qsort(subrel_local_oids, list_length(subrel_states), + sizeof(Oid), oid_cmp); + + /* + * Rels that we want to remove from subscription and drop any slots + * and origins corresponding to them. + */ + sub_remove_rels = palloc(list_length(subrel_states) * sizeof(SubRemoveRels)); + + /* + * Walk over the remote tables and try to match them to locally known + * tables. If the table is not known locally create a new state for + * it. + * + * Also builds array of local oids of remote tables for the next step. + */ + off = 0; + pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid)); + + foreach(lc, pubrel_names) + { + RangeVar *rv = (RangeVar *) lfirst(lc); + Oid relid; - /* - * Walk over the remote tables and try to match them to locally known - * tables. If the table is not known locally create a new state for it. - * - * Also builds array of local oids of remote tables for the next step. - */ - off = 0; - pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid)); + relid = RangeVarGetRelid(rv, AccessShareLock, false); - foreach(lc, pubrel_names) - { - RangeVar *rv = (RangeVar *) lfirst(lc); - Oid relid; + /* Check for supported relkind. */ + CheckSubscriptionRelkind(get_rel_relkind(relid), + rv->schemaname, rv->relname); - relid = RangeVarGetRelid(rv, AccessShareLock, false); + pubrel_local_oids[off++] = relid; - /* Check for supported relkind. */ - CheckSubscriptionRelkind(get_rel_relkind(relid), - rv->schemaname, rv->relname); + if (!bsearch(&relid, subrel_local_oids, + list_length(subrel_states), sizeof(Oid), oid_cmp)) + { + AddSubscriptionRelState(sub->oid, relid, + copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY, + InvalidXLogRecPtr); + ereport(DEBUG1, + (errmsg("table \"%s.%s\" added to subscription \"%s\"", + rv->schemaname, rv->relname, sub->name))); + } + } - pubrel_local_oids[off++] = relid; + /* + * Next remove state for tables we should not care about anymore using + * the data we collected above + */ + qsort(pubrel_local_oids, list_length(pubrel_names), + sizeof(Oid), oid_cmp); - if (!bsearch(&relid, subrel_local_oids, - list_length(subrel_states), sizeof(Oid), oid_cmp)) + remove_rel_len = 0; + for (off = 0; off < list_length(subrel_states); off++) { - AddSubscriptionRelState(sub->oid, relid, - copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY, - InvalidXLogRecPtr); - ereport(DEBUG1, - (errmsg("table \"%s.%s\" added to subscription \"%s\"", - rv->schemaname, rv->relname, sub->name))); - } - } + Oid relid = subrel_local_oids[off]; - /* - * Next remove state for tables we should not care about anymore using the - * data we collected above - */ - qsort(pubrel_local_oids, list_length(pubrel_names), - sizeof(Oid), oid_cmp); + if (!bsearch(&relid, pubrel_local_oids, + list_length(pubrel_names), sizeof(Oid), oid_cmp)) + { + char state; + XLogRecPtr statelsn; + + /* + * Lock pg_subscription_rel with AccessExclusiveLock to + * prevent any race conditions with the apply worker + * re-launching workers at the same time this code is trying + * to remove those tables. + * + * Even if new worker for this particular rel is restarted it + * won't be able to make any progress as we hold exclusive + * lock on subscription_rel till the transaction end. It will + * simply exit as there is no corresponding rel entry. + * + * This locking also ensures that the state of rels won't + * change till we are done with this refresh operation. + */ + if (!rel) + rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock); + + /* Last known rel state. */ + state = GetSubscriptionRelState(sub->oid, relid, &statelsn); + + sub_remove_rels[remove_rel_len].relid = relid; + sub_remove_rels[remove_rel_len++].state = state; + + RemoveSubscriptionRel(sub->oid, relid); + + logicalrep_worker_stop(sub->oid, relid); + + /* + * For READY state, we would have already dropped the + * tablesync origin. + */ + if (state != SUBREL_STATE_READY) + { + char originname[NAMEDATALEN]; + + /* + * Drop the tablesync's origin tracking if exists. + * + * It is possible that the origin is not yet created for + * tablesync worker, this can happen for the states before + * SUBREL_STATE_FINISHEDCOPY. The apply worker can also + * concurrently try to drop the origin and by this time + * the origin might be already removed. For these reasons, + * passing missing_ok = true. + */ + ReplicationOriginNameForTablesync(sub->oid, relid, originname); + replorigin_drop_by_name(originname, true, false); + } - for (off = 0; off < list_length(subrel_states); off++) - { - Oid relid = subrel_local_oids[off]; + ereport(DEBUG1, + (errmsg("table \"%s.%s\" removed from subscription \"%s\"", + get_namespace_name(get_rel_namespace(relid)), + get_rel_name(relid), + sub->name))); + } + } - if (!bsearch(&relid, pubrel_local_oids, - list_length(pubrel_names), sizeof(Oid), oid_cmp)) + /* + * Drop the tablesync slots associated with removed tables. This has + * to be at the end because otherwise if there is an error while doing + * the database operations we won't be able to rollback dropped slots. + */ + for (off = 0; off < remove_rel_len; off++) { - RemoveSubscriptionRel(sub->oid, relid); - - logicalrep_worker_stop_at_commit(sub->oid, relid); - - ereport(DEBUG1, - (errmsg("table \"%s.%s\" removed from subscription \"%s\"", - get_namespace_name(get_rel_namespace(relid)), - get_rel_name(relid), - sub->name))); + if (sub_remove_rels[off].state != SUBREL_STATE_READY && + sub_remove_rels[off].state != SUBREL_STATE_SYNCDONE) + { + char syncslotname[NAMEDATALEN] = {0}; + + /* + * For READY/SYNCDONE states we know the tablesync slot has + * already been dropped by the tablesync worker. + * + * For other states, there is no certainty, maybe the slot + * does not exist yet. Also, if we fail after removing some of + * the slots, next time, it will again try to drop already + * dropped slots and fail. For these reasons, we allow + * missing_ok = true for the drop. + */ + ReplicationSlotNameForTablesync(sub->oid, sub_remove_rels[off].relid, syncslotname); + ReplicationSlotDropAtPubNode(wrconn, syncslotname, true); + } } } + PG_FINALLY(); + { + if (wrconn) + walrcv_disconnect(wrconn); + } + PG_END_TRY(); + + if (rel) + table_close(rel, NoLock); } /* * Alter the existing subscription. */ ObjectAddress -AlterSubscription(AlterSubscriptionStmt *stmt) +AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) { Relation rel; ObjectAddress myself; @@ -848,6 +951,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt) errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"), errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false)."))); + PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh"); + /* Make sure refresh sees the new list of publications. */ sub->publications = stmt->publication; @@ -877,6 +982,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt) NULL, NULL, /* no "binary" */ NULL, NULL); /* no "streaming" */ + PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH"); + AlterSubscription_refresh(sub, copy_data); break; @@ -927,8 +1034,8 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) char originname[NAMEDATALEN]; char *err = NULL; WalReceiverConn *wrconn = NULL; - StringInfoData cmd; Form_pg_subscription form; + List *rstates; /* * Lock pg_subscription with AccessExclusiveLock to ensure that the @@ -1041,6 +1148,36 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) } list_free(subworkers); + /* + * Cleanup of tablesync replication origins. + * + * Any READY-state relations would already have dealt with clean-ups. + * + * Note that the state can't change because we have already stopped both + * the apply and tablesync workers and they can't restart because of + * exclusive lock on the subscription. + */ + rstates = GetSubscriptionNotReadyRelations(subid); + foreach(lc, rstates) + { + SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc); + Oid relid = rstate->relid; + + /* Only cleanup resources of tablesync workers */ + if (!OidIsValid(relid)) + continue; + + /* + * Drop the tablesync's origin tracking if exists. + * + * It is possible that the origin is not yet created for tablesync + * worker so passing missing_ok = true. This can happen for the states + * before SUBREL_STATE_FINISHEDCOPY. + */ + ReplicationOriginNameForTablesync(subid, relid, originname); + replorigin_drop_by_name(originname, true, false); + } + /* Clean up dependencies */ deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0); @@ -1055,30 +1192,110 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) * If there is no slot associated with the subscription, we can finish * here. */ - if (!slotname) + if (!slotname && rstates == NIL) { table_close(rel, NoLock); return; } /* - * Otherwise drop the replication slot at the publisher node using the - * replication connection. + * Try to acquire the connection necessary for dropping slots. + * + * Note: If the slotname is NONE/NULL then we allow the command to finish + * and users need to manually cleanup the apply and tablesync worker slots + * later. + * + * This has to be at the end because otherwise if there is an error while + * doing the database operations we won't be able to rollback dropped + * slot. */ load_file("libpqwalreceiver", false); - initStringInfo(&cmd); - appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname)); - wrconn = walrcv_connect(conninfo, true, subname, &err); if (wrconn == NULL) - ereport(ERROR, - (errmsg("could not connect to publisher when attempting to " - "drop the replication slot \"%s\"", slotname), - errdetail("The error was: %s", err), - /* translator: %s is an SQL ALTER command */ - errhint("Use %s to disassociate the subscription from the slot.", - "ALTER SUBSCRIPTION ... SET (slot_name = NONE)"))); + { + if (!slotname) + { + /* be tidy */ + list_free(rstates); + table_close(rel, NoLock); + return; + } + else + { + ReportSlotConnectionError(rstates, subid, slotname, err); + } + } + + PG_TRY(); + { + foreach(lc, rstates) + { + SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc); + Oid relid = rstate->relid; + + /* Only cleanup resources of tablesync workers */ + if (!OidIsValid(relid)) + continue; + + /* + * Drop the tablesync slots associated with removed tables. + * + * For SYNCDONE/READY states, the tablesync slot is known to have + * already been dropped by the tablesync worker. + * + * For other states, there is no certainty, maybe the slot does + * not exist yet. Also, if we fail after removing some of the + * slots, next time, it will again try to drop already dropped + * slots and fail. For these reasons, we allow missing_ok = true + * for the drop. + */ + if (rstate->state != SUBREL_STATE_SYNCDONE) + { + char syncslotname[NAMEDATALEN] = {0}; + + ReplicationSlotNameForTablesync(subid, relid, syncslotname); + ReplicationSlotDropAtPubNode(wrconn, syncslotname, true); + } + } + + list_free(rstates); + + /* + * If there is a slot associated with the subscription, then drop the + * replication slot at the publisher. + */ + if (slotname) + ReplicationSlotDropAtPubNode(wrconn, slotname, false); + + } + PG_FINALLY(); + { + walrcv_disconnect(wrconn); + } + PG_END_TRY(); + + table_close(rel, NoLock); +} + +/* + * Drop the replication slot at the publisher node using the replication + * connection. + * + * missing_ok - if true then only issue a LOG message if the slot doesn't + * exist. + */ +void +ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok) +{ + StringInfoData cmd; + + Assert(wrconn); + + load_file("libpqwalreceiver", false); + + initStringInfo(&cmd); + appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname)); PG_TRY(); { @@ -1086,27 +1303,39 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) res = walrcv_exec(wrconn, cmd.data, 0, NULL); - if (res->status != WALRCV_OK_COMMAND) - ereport(ERROR, + if (res->status == WALRCV_OK_COMMAND) + { + /* NOTICE. Success. */ + ereport(NOTICE, + (errmsg("dropped replication slot \"%s\" on publisher", + slotname))); + } + else if (res->status == WALRCV_ERROR && + missing_ok && + res->sqlstate == ERRCODE_UNDEFINED_OBJECT) + { + /* LOG. Error, but missing_ok = true. */ + ereport(LOG, (errmsg("could not drop the replication slot \"%s\" on publisher", slotname), errdetail("The error was: %s", res->err))); + } else - ereport(NOTICE, - (errmsg("dropped replication slot \"%s\" on publisher", - slotname))); + { + /* ERROR. */ + ereport(ERROR, + (errmsg("could not drop the replication slot \"%s\" on publisher", + slotname), + errdetail("The error was: %s", res->err))); + } walrcv_clear_result(res); } PG_FINALLY(); { - walrcv_disconnect(wrconn); + pfree(cmd.data); } PG_END_TRY(); - - pfree(cmd.data); - - table_close(rel, NoLock); } /* @@ -1275,3 +1504,45 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) return tablelist; } + +/* + * This is to report the connection failure while dropping replication slots. + * Here, we report the WARNING for all tablesync slots so that user can drop + * them manually, if required. + */ +static void +ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err) +{ + ListCell *lc; + + foreach(lc, rstates) + { + SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc); + Oid relid = rstate->relid; + + /* Only cleanup resources of tablesync workers */ + if (!OidIsValid(relid)) + continue; + + /* + * Caller needs to ensure that relstate doesn't change underneath us. + * See DropSubscription where we get the relstates. + */ + if (rstate->state != SUBREL_STATE_SYNCDONE) + { + char syncslotname[NAMEDATALEN] = {0}; + + ReplicationSlotNameForTablesync(subid, relid, syncslotname); + elog(WARNING, "could not drop tablesync replication slot \"%s\"", + syncslotname); + } + } + + ereport(ERROR, + (errmsg("could not connect to publisher when attempting to " + "drop the replication slot \"%s\"", slotname), + errdetail("The error was: %s", err), + /* translator: %s is an SQL ALTER command */ + errhint("Use %s to disassociate the subscription from the slot.", + "ALTER SUBSCRIPTION ... SET (slot_name = NONE)"))); +} |