From 5d43fa00e410adad22be6a5d0d0a159b15f85e83 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Wed, 27 Dec 2023 08:25:13 -0800 Subject: [PATCH] [SSA] improve lifetime management of subchannel map (#35379) Currently, each subchannel wrapper stores a ref to the policy and its key in the policy's subchannel map, and it looks up its entry in the map whenever it needs to modify that entry. There's some complexity due to the need to avoid deadlocks in the case where we remove the last strong ref to a subchannel wrapper from a map entry. This approach has a number of problems: - The subchannel wrapper is dropping its key when it gets orphaned, meaning that it will *never* actually remove itself from the map entry when it is destroyed, which is not what we want. (This isn't actually causing a bug, but it does mean that we'll never delete the subchannel wrapper, even when it is really unused.) - Having the subchannel wrapper look up its key in the map every time it needs to modfy its entry is fairly inefficient, especially if there are a large number of endpoints. - There is a race condition that was accidentally introduced in #34472. The subchannel wrapper's key is being modified when the subchannel wrapper is orphaned, but that PR changed the picker to read the same value without any synchronization between the two, and we didn't notice the bug or catch it in any tests. - The code is fairly hard to understand, with a bunch of special cases that are not obvious to the reader. This PR addresses those problems by making the entries in the subchannel map be ref-counted, where a ref is held both by the map and by each subchannel wrapper. Specific changes: - Because the wrapper holds a ref directly to the map entry, there is no longer any need for a map lookup every time the subchannel wrapper needs to access its map entry. - We now avoid deadlocks by waiting until after we've released the lock to drop refs to subchannel wrappers, so there is no more need to modify the internal state of a subchannel wrapper. - We now remove subchannel wrappers from the map entry when they are orphaned, so there is no longer any need to hold a weak ref in the map entry; instead, we now just use a raw pointer. - The connectivity state is now stored in the map entry instead of in each individual subchannel wrapper. And we no longer need to use an atomic for it, since we are always holding the lock when it is accessed. - All state guarded by the mutex (other than the subchannel map itself) is now in the subchannel entry, and I have added lock annotations so that the compiler can enforce the lock semantics. This PR paves the way for subsequent work that will make SSA work across priorities (see in-progress [gRFC A75](https://github.com/grpc/proposal/pull/405)), where we will need to generalize the behavior such that we hold strong refs to subchannels in any state (not just DRAINING) when the child policy is not holding its own refs. Closes #35379 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/35379 from markdroth:xds_ssa_tsan_fix 4927e04eb16cd6c4e8534999ff8e3602a26bc45b PiperOrigin-RevId: 594015497 --- .../lb_policy/xds/xds_override_host.cc | 302 ++++++++++-------- 1 file changed, 166 insertions(+), 136 deletions(-) diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc index 4ab446edc69..361d27298b6 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc @@ -21,7 +21,6 @@ #include #include -#include #include #include #include @@ -112,6 +111,7 @@ XdsHealthStatus GetEndpointHealthStatus(const EndpointAddresses& endpoint) { // // xds_override_host LB policy // + class XdsOverrideHostLb : public LoadBalancingPolicy { public: explicit XdsOverrideHostLb(Args args); @@ -125,12 +125,18 @@ class XdsOverrideHostLb : public LoadBalancingPolicy { void ResetBackoffLocked() override; private: + class SubchannelEntry; + class SubchannelWrapper : public DelegatingSubchannel { public: SubchannelWrapper(RefCountedPtr subchannel, RefCountedPtr policy); - ~SubchannelWrapper() override; + // Called immediately after construction. We use two-phase initialization + // to avoid doing an allocation while holding the lock. + void set_subchannel_entry(RefCountedPtr subchannel_entry) { + subchannel_entry_ = std::move(subchannel_entry); + } void WatchConnectivityState( std::unique_ptr watcher) override; @@ -138,14 +144,12 @@ class XdsOverrideHostLb : public LoadBalancingPolicy { void CancelConnectivityStateWatch( ConnectivityStateWatcherInterface* watcher) override; - grpc_connectivity_state connectivity_state() { - return connectivity_state_.load(); + RefCountedStringValue address_list() const + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) { + return subchannel_entry_->address_list(); } - XdsOverrideHostLb* policy() { return policy_.get(); } - - void set_key(absl::string_view key) { key_ = std::string(key); } - const absl::optional& key() const { return key_; } + XdsOverrideHostLb* policy() const { return policy_.get(); } private: class ConnectivityStateWatcher : public ConnectivityStateWatcherInterface { @@ -155,9 +159,13 @@ class XdsOverrideHostLb : public LoadBalancingPolicy { : subchannel_(std::move(subchannel)) {} void OnConnectivityStateChange(grpc_connectivity_state state, - absl::Status status) override; + absl::Status status) override { + subchannel_->UpdateConnectivityState(state, status); + } - grpc_pollset_set* interested_parties() override; + grpc_pollset_set* interested_parties() override { + return subchannel_->policy()->interested_parties(); + } private: WeakRefCountedPtr subchannel_; @@ -168,68 +176,114 @@ class XdsOverrideHostLb : public LoadBalancingPolicy { void UpdateConnectivityState(grpc_connectivity_state state, absl::Status status); - ConnectivityStateWatcher* watcher_; - absl::optional key_; RefCountedPtr policy_; + RefCountedPtr subchannel_entry_; + ConnectivityStateWatcher* watcher_; std::set, PtrLessThan> watchers_; - std::atomic connectivity_state_ = { - GRPC_CHANNEL_IDLE}; }; - class SubchannelEntry { + class SubchannelEntry : public RefCounted { public: + using SubchannelPtr = + absl::variant>; + explicit SubchannelEntry(XdsHealthStatus eds_health_status) : eds_health_status_(eds_health_status) {} - void SetSubchannel(SubchannelWrapper* subchannel) { + RefCountedPtr TakeSubchannelRef() + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) { + return MatchMutable( + &subchannel_, + [](SubchannelWrapper**) -> RefCountedPtr { + return nullptr; + }, + [](RefCountedPtr* subchannel) { + return std::move(*subchannel); + }); + } + + grpc_connectivity_state connectivity_state() const + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) { + return connectivity_state_; + } + void set_connectivity_state(grpc_connectivity_state state) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) { + connectivity_state_ = state; + } + + void SetSubchannel(SubchannelWrapper* subchannel) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) { if (eds_health_status_.status() == XdsHealthStatus::kDraining) { subchannel_ = subchannel->RefAsSubclass(); } else { - subchannel_ = subchannel->WeakRefAsSubclass(); + subchannel_ = subchannel; } } - void UnsetSubchannel() { - subchannel_ = WeakRefCountedPtr(nullptr); + void UnsetSubchannel(SubchannelWrapper* wrapper) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) { + if (GetSubchannel() == wrapper) subchannel_ = nullptr; } - SubchannelWrapper* GetSubchannel() const { + SubchannelWrapper* GetSubchannel() const + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) { return Match( - subchannel_, - [](WeakRefCountedPtr - subchannel) { return subchannel.get(); }, - [](RefCountedPtr subchannel) { + subchannel_, [](SubchannelWrapper* subchannel) { return subchannel; }, + [](const RefCountedPtr& subchannel) { return subchannel.get(); }); } - void SetEdsHealthStatus(XdsHealthStatus eds_health_status) { + // Returns the previously held strong ref, if any, which the caller + // will need to release after releasing the lock, because if this is + // the last strong ref, we need to avoid deadlock caused by + // SubchannelWrapper::Orphan() re-acquiring the lock. + RefCountedPtr SetEdsHealthStatus( + XdsHealthStatus eds_health_status) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) { + if (eds_health_status_ == eds_health_status) return nullptr; eds_health_status_ = eds_health_status; - auto subchannel = GetSubchannel(); - if (subchannel == nullptr) return; - if (eds_health_status_.status() == XdsHealthStatus::kDraining) { - subchannel_ = subchannel->RefAsSubclass(); - } else { - subchannel_ = subchannel->WeakRefAsSubclass(); + // TODO(roth): Change this to use the gprpp MatchMutable() function + // once we can do that without breaking lock annotations. + auto* raw_ptr = absl::get_if(&subchannel_); + if (raw_ptr != nullptr) { + if (eds_health_status_.status() == XdsHealthStatus::kDraining && + *raw_ptr != nullptr) { + subchannel_ = + (*raw_ptr)->RefIfNonZero().TakeAsSubclass(); + } + return nullptr; } + auto strong_ref = + std::move(absl::get>(subchannel_)); + subchannel_ = strong_ref.get(); + return strong_ref; } - XdsHealthStatus eds_health_status() const { return eds_health_status_; } + XdsHealthStatus eds_health_status() const + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) { + return eds_health_status_; + } - void set_address_list(RefCountedStringValue address_list) { + void set_address_list(RefCountedStringValue address_list) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) { address_list_ = std::move(address_list); } - RefCountedStringValue address_list() const { return address_list_; } + RefCountedStringValue address_list() const + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) { + return address_list_; + } private: - absl::variant, - RefCountedPtr> - subchannel_; - XdsHealthStatus eds_health_status_; - RefCountedStringValue address_list_; + grpc_connectivity_state connectivity_state_ + ABSL_GUARDED_BY(&XdsOverrideHostLb::mu_) = GRPC_CHANNEL_IDLE; + SubchannelPtr subchannel_ ABSL_GUARDED_BY(&XdsOverrideHostLb::mu_); + XdsHealthStatus eds_health_status_ ABSL_GUARDED_BY(&XdsOverrideHostLb::mu_); + RefCountedStringValue address_list_ + ABSL_GUARDED_BY(&XdsOverrideHostLb::mu_); }; // A picker that wraps the picker from the child for cases when cookie is @@ -306,13 +360,6 @@ class XdsOverrideHostLb : public LoadBalancingPolicy { const grpc_resolved_address& address, RefCountedPtr subchannel); - void UnsetSubchannel(absl::string_view key, SubchannelWrapper* subchannel); - - void OnSubchannelConnectivityStateChange(absl::string_view subchannel_key) - ABSL_NO_THREAD_SAFETY_ANALYSIS; // Called from within the - // WorkSerializer and does not require - // additional synchronization - // Current config from the resolver. RefCountedPtr config_; @@ -325,9 +372,9 @@ class XdsOverrideHostLb : public LoadBalancingPolicy { grpc_connectivity_state state_ = GRPC_CHANNEL_CONNECTING; absl::Status status_; RefCountedPtr picker_; - Mutex subchannel_map_mu_; - std::map> subchannel_map_ - ABSL_GUARDED_BY(subchannel_map_mu_); + Mutex mu_; + std::map, std::less<>> + subchannel_map_ ABSL_GUARDED_BY(mu_); }; // @@ -357,14 +404,15 @@ XdsOverrideHostLb::Picker::PickOverridenHost( RefCountedPtr idle_subchannel; bool found_connecting = false; { - MutexLock lock(&policy_->subchannel_map_mu_); + MutexLock lock(&policy_->mu_); for (absl::string_view address : absl::StrSplit(cookie_address_list, ',')) { RefCountedPtr subchannel; auto it = policy_->subchannel_map_.find(address); if (it != policy_->subchannel_map_.end()) { - subchannel = it->second.GetSubchannel() - ->RefIfNonZero() - .TakeAsSubclass(); + auto* sc = it->second->GetSubchannel(); + if (sc != nullptr) { + subchannel = sc->RefIfNonZero().TakeAsSubclass(); + } } if (subchannel == nullptr) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) { @@ -374,16 +422,16 @@ XdsOverrideHostLb::Picker::PickOverridenHost( continue; } if (!override_host_health_status_set_.Contains( - it->second.eds_health_status())) { + it->second->eds_health_status())) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) { gpr_log(GPR_INFO, "Subchannel %s health status is not overridden (%s)", std::string(address).c_str(), - it->second.eds_health_status().ToString()); + it->second->eds_health_status().ToString()); } continue; } - auto connectivity_state = subchannel->connectivity_state(); + auto connectivity_state = it->second->connectivity_state(); if (connectivity_state == GRPC_CHANNEL_READY) { // Found a READY subchannel. Pass back the actual address list // and return the subchannel. @@ -391,7 +439,7 @@ XdsOverrideHostLb::Picker::PickOverridenHost( gpr_log(GPR_INFO, "Picker override found READY subchannel %s", std::string(address).c_str()); } - override_host_attr->set_actual_address_list(it->second.address_list()); + override_host_attr->set_actual_address_list(it->second->address_list()); return PickResult::Complete(subchannel->wrapped_subchannel()); } else if (connectivity_state == GRPC_CHANNEL_IDLE) { if (idle_subchannel == nullptr) idle_subchannel = std::move(subchannel); @@ -445,15 +493,8 @@ LoadBalancingPolicy::PickResult XdsOverrideHostLb::Picker::Pick(PickArgs args) { // Populate the address list in the override host attribute so that // the StatefulSession filter can set the cookie. if (override_host_attr != nullptr) { - auto& key = wrapper->key(); - if (key.has_value()) { - MutexLock lock(&policy_->subchannel_map_mu_); - auto it = policy_->subchannel_map_.find(*key); - if (it != policy_->subchannel_map_.end()) { // Should always be true. - override_host_attr->set_actual_address_list( - it->second.address_list()); - } - } + MutexLock lock(&wrapper->policy()->mu_); + override_host_attr->set_actual_address_list(wrapper->address_list()); } // Unwrap the subchannel. complete_pick->subchannel = wrapper->wrapped_subchannel(); @@ -486,7 +527,16 @@ void XdsOverrideHostLb::ShutdownLocked() { } shutting_down_ = true; { - MutexLock lock(&subchannel_map_mu_); + // Drop subchannel refs after releasing the lock to avoid deadlock. + std::vector subchannel_refs_to_drop; + MutexLock lock(&mu_); + subchannel_refs_to_drop.reserve(subchannel_map_.size()); + for (auto& p : subchannel_map_) { + auto subchannel = p.second->TakeSubchannelRef(); + if (subchannel != nullptr) { + subchannel_refs_to_drop.push_back(std::move(subchannel)); + } + } subchannel_map_.clear(); } // Remove the child policy's interested_parties pollset_set from the @@ -665,13 +715,19 @@ void XdsOverrideHostLb::UpdateAddressMap( }); // Now grab the lock and update subchannel_map_ from addresses_for_map. { - MutexLock lock(&subchannel_map_mu_); + // Drop subchannel refs after releasing the lock to avoid deadlock. + std::vector subchannel_refs_to_drop; + MutexLock lock(&mu_); for (auto it = subchannel_map_.begin(); it != subchannel_map_.end();) { if (addresses_for_map.find(it->first) == addresses_for_map.end()) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) { gpr_log(GPR_INFO, "[xds_override_host_lb %p] removing map key %s", this, it->first.c_str()); } + auto subchannel = it->second->TakeSubchannelRef(); + if (subchannel != nullptr) { + subchannel_refs_to_drop.push_back(std::move(subchannel)); + } it = subchannel_map_.erase(it); } else { ++it; @@ -687,9 +743,8 @@ void XdsOverrideHostLb::UpdateAddressMap( address.c_str()); } it = subchannel_map_ - .emplace(std::piecewise_construct, - std::forward_as_tuple(address), - std::forward_as_tuple(address_info.eds_health_status)) + .emplace(address, MakeRefCounted( + address_info.eds_health_status)) .first; } else { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) { @@ -699,14 +754,18 @@ void XdsOverrideHostLb::UpdateAddressMap( this, address.c_str(), address_info.eds_health_status.ToString()); } - it->second.SetEdsHealthStatus(address_info.eds_health_status); + auto subchannel_ref = + it->second->SetEdsHealthStatus(address_info.eds_health_status); + if (subchannel_ref != nullptr) { + subchannel_refs_to_drop.push_back(std::move(subchannel_ref)); + } } if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) { gpr_log(GPR_INFO, "[xds_override_host_lb %p] setting address list for %s to %s", this, address.c_str(), address_info.address_list.c_str()); } - it->second.set_address_list(std::move(address_info.address_list)); + it->second->set_address_list(std::move(address_info.address_list)); } } } @@ -715,42 +774,20 @@ RefCountedPtr XdsOverrideHostLb::AdoptSubchannel( const grpc_resolved_address& address, RefCountedPtr subchannel) { - auto key = grpc_sockaddr_to_string(&address, /*normalize=*/false); auto wrapper = MakeRefCounted( std::move(subchannel), RefAsSubclass()); + auto key = grpc_sockaddr_to_string(&address, /*normalize=*/false); if (key.ok()) { - MutexLock lock(&subchannel_map_mu_); + MutexLock lock(&mu_); auto it = subchannel_map_.find(*key); if (it != subchannel_map_.end()) { - wrapper->set_key(*key); - it->second.SetSubchannel(wrapper.get()); + wrapper->set_subchannel_entry(it->second); + it->second->SetSubchannel(wrapper.get()); } } return wrapper; } -void XdsOverrideHostLb::UnsetSubchannel(absl::string_view key, - SubchannelWrapper* subchannel) { - MutexLock lock(&subchannel_map_mu_); - auto it = subchannel_map_.find(key); - if (it != subchannel_map_.end()) { - if (subchannel == it->second.GetSubchannel()) { - it->second.UnsetSubchannel(); - } - } -} - -void XdsOverrideHostLb::OnSubchannelConnectivityStateChange( - absl::string_view subchannel_key) { - auto it = subchannel_map_.find(subchannel_key); - if (it == subchannel_map_.end()) { - return; - } - if (it->second.eds_health_status().status() == XdsHealthStatus::kDraining) { - MaybeUpdatePickerLocked(); - } -} - // // XdsOverrideHostLb::Helper // @@ -776,7 +813,7 @@ void XdsOverrideHostLb::Helper::UpdateState( } // -// XdsOverrideHostLb::SubchannelWrapper::SubchannelWrapper +// XdsOverrideHostLb::SubchannelWrapper // XdsOverrideHostLb::SubchannelWrapper::SubchannelWrapper( @@ -789,12 +826,6 @@ XdsOverrideHostLb::SubchannelWrapper::SubchannelWrapper( wrapped_subchannel()->WatchConnectivityState(std::move(watcher)); } -XdsOverrideHostLb::SubchannelWrapper::~SubchannelWrapper() { - if (key_.has_value()) { - policy_->UnsetSubchannel(*key_, this); - } -} - void XdsOverrideHostLb::SubchannelWrapper::WatchConnectivityState( std::unique_ptr watcher) { watchers_.insert(std::move(watcher)); @@ -808,9 +839,33 @@ void XdsOverrideHostLb::SubchannelWrapper::CancelConnectivityStateWatch( } } +void XdsOverrideHostLb::SubchannelWrapper::Orphan() { + if (subchannel_entry_ != nullptr) { + MutexLock lock(&policy()->mu_); + subchannel_entry_->UnsetSubchannel(this); + } + if (!IsWorkSerializerDispatchEnabled()) { + wrapped_subchannel()->CancelConnectivityStateWatch(watcher_); + return; + } + policy()->work_serializer()->Run( + [self = WeakRefAsSubclass()]() { + self->wrapped_subchannel()->CancelConnectivityStateWatch( + self->watcher_); + }, + DEBUG_LOCATION); +} + void XdsOverrideHostLb::SubchannelWrapper::UpdateConnectivityState( grpc_connectivity_state state, absl::Status status) { - connectivity_state_.store(state); + bool update_picker = false; + if (subchannel_entry_ != nullptr) { + MutexLock lock(&policy()->mu_); + subchannel_entry_->set_connectivity_state(state); + update_picker = subchannel_entry_->GetSubchannel() == this && + subchannel_entry_->eds_health_status().status() == + XdsHealthStatus::kDraining; + } // Sending connectivity state notifications to the watchers may cause the set // of watchers to change, so we can't be iterating over the set of watchers // while we send the notifications @@ -824,40 +879,13 @@ void XdsOverrideHostLb::SubchannelWrapper::UpdateConnectivityState( watcher->OnConnectivityStateChange(state, status); } } - if (key_.has_value()) { - policy_->OnSubchannelConnectivityStateChange(*key_); - } -} - -void XdsOverrideHostLb::SubchannelWrapper::Orphan() { - if (!IsWorkSerializerDispatchEnabled()) { - key_.reset(); - wrapped_subchannel()->CancelConnectivityStateWatch(watcher_); - return; - } - policy_->work_serializer()->Run( - [self = WeakRefAsSubclass()]() { - self->key_.reset(); - self->wrapped_subchannel()->CancelConnectivityStateWatch( - self->watcher_); - }, - DEBUG_LOCATION); -} - -grpc_pollset_set* XdsOverrideHostLb::SubchannelWrapper:: - ConnectivityStateWatcher::interested_parties() { - return subchannel_->policy_->interested_parties(); -} - -void XdsOverrideHostLb::SubchannelWrapper::ConnectivityStateWatcher:: - OnConnectivityStateChange(grpc_connectivity_state state, - absl::Status status) { - subchannel_->UpdateConnectivityState(state, status); + if (update_picker) policy()->MaybeUpdatePickerLocked(); } // // factory // + class XdsOverrideHostLbFactory : public LoadBalancingPolicyFactory { public: OrphanablePtr CreateLoadBalancingPolicy( @@ -884,7 +912,9 @@ void RegisterXdsOverrideHostLbPolicy(CoreConfiguration::Builder* builder) { std::make_unique()); } +// // XdsOverrideHostLbConfig +// const JsonLoaderInterface* XdsOverrideHostLbConfig::JsonLoader( const JsonArgs&) {