diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/replication/logical/origin.c | 62 |
1 files changed, 33 insertions, 29 deletions
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c index b754c43840f..2c04c8707dc 100644 --- a/src/backend/replication/logical/origin.c +++ b/src/backend/replication/logical/origin.c @@ -338,16 +338,14 @@ replorigin_create(const char *roname) * Helper function to drop a replication origin. */ static void -replorigin_drop_guts(Relation rel, RepOriginId roident, bool nowait) +replorigin_state_clear(RepOriginId roident, bool nowait) { - HeapTuple tuple; int i; /* - * First, clean up the slot state info, if there is any matching slot. + * Clean up the slot state info, if there is any matching slot. */ restart: - tuple = NULL; LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE); for (i = 0; i < max_replication_slots; i++) @@ -402,19 +400,6 @@ restart: } LWLockRelease(ReplicationOriginLock); ConditionVariableCancelSleep(); - - /* - * Now, we can delete the catalog entry. - */ - tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident)); - if (!HeapTupleIsValid(tuple)) - elog(ERROR, "cache lookup failed for replication origin with ID %d", - roident); - - CatalogTupleDelete(rel, &tuple->t_self); - ReleaseSysCache(tuple); - - CommandCounterIncrement(); } /* @@ -427,24 +412,43 @@ replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait) { RepOriginId roident; Relation rel; + HeapTuple tuple; Assert(IsTransactionState()); - /* - * To interlock against concurrent drops, we hold ExclusiveLock on - * pg_replication_origin till xact commit. - * - * XXX We can optimize this by acquiring the lock on a specific origin by - * using LockSharedObject if required. However, for that, we first to - * acquire a lock on ReplicationOriginRelationId, get the origin_id, lock - * the specific origin and then re-check if the origin still exists. - */ - rel = table_open(ReplicationOriginRelationId, ExclusiveLock); + rel = table_open(ReplicationOriginRelationId, RowExclusiveLock); roident = replorigin_by_name(name, missing_ok); - if (OidIsValid(roident)) - replorigin_drop_guts(rel, roident, nowait); + /* Lock the origin to prevent concurrent drops. */ + LockSharedObject(ReplicationOriginRelationId, roident, 0, + AccessExclusiveLock); + + tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident)); + if (!HeapTupleIsValid(tuple)) + { + if (!missing_ok) + elog(ERROR, "cache lookup failed for replication origin with ID %d", + roident); + + /* + * We don't need to retain the locks if the origin is already dropped. + */ + UnlockSharedObject(ReplicationOriginRelationId, roident, 0, + AccessExclusiveLock); + table_close(rel, RowExclusiveLock); + return; + } + + replorigin_state_clear(roident, nowait); + + /* + * Now, we can delete the catalog entry. + */ + CatalogTupleDelete(rel, &tuple->t_self); + ReleaseSysCache(tuple); + + CommandCounterIncrement(); /* We keep the lock on pg_replication_origin until commit */ table_close(rel, NoLock); |