From ee900c0e39dfaa003ff6ef8829a162ca9ef3d28a Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Tue, 23 Aug 2022 22:48:36 -0700 Subject: [PATCH] ring_hash: fix subchannel list to not shutdown until picker is destroyed (#30714) * ring_hash: fix subchannel list to not shutdown until picker is destroyed * hop into WorkSerializer before unreffing subchannels * use a weak ref for subchannel connectivity state watches * Automated change: Fix sanity tests * fix memory leak * clang-format * fix circular reference problem by moving ring into subchannel list * Automated change: Fix sanity tests Co-authored-by: markdroth --- BUILD | 3 +- .../outlier_detection/outlier_detection.cc | 6 + .../lb_policy/pick_first/pick_first.cc | 6 +- .../lb_policy/ring_hash/ring_hash.cc | 355 +++++++++--------- .../lb_policy/round_robin/round_robin.cc | 6 +- .../lb_policy/subchannel_list.h | 21 +- 6 files changed, 200 insertions(+), 197 deletions(-) diff --git a/BUILD b/BUILD index 6c801fda52d..2f97f97d3ce 100644 --- a/BUILD +++ b/BUILD @@ -4623,12 +4623,12 @@ grpc_cc_library( language = "c++", deps = [ "debug_location", + "dual_ref_counted", "gpr", "grpc_base", "grpc_codegen", "iomgr_fwd", "lb_policy", - "orphanable", "ref_counted_ptr", "server_address", "subchannel_interface", @@ -4701,7 +4701,6 @@ grpc_cc_library( "lb_policy_factory", "lb_policy_registry", "orphanable", - "ref_counted", "ref_counted_ptr", "server_address", "sockaddr_utils", diff --git a/src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc b/src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc index e4f2ad1486c..7810f2af812 100644 --- a/src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc +++ b/src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc @@ -649,6 +649,12 @@ void OutlierDetectionLb::UpdateLocked(UpdateArgs args) { } } else if (!config_->CountingEnabled()) { // If counting is not enabled, reset state. + if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { + gpr_log(GPR_INFO, + "[outlier_detection_lb %p] counting disabled; disabling " + "ejection for %s (%p)", + this, address_key.c_str(), subchannel_state.get()); + } subchannel_state->DisableEjection(); } current_addresses.emplace(address_key); diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 71562f53465..18e501e7d7a 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -162,9 +162,9 @@ class PickFirst : public LoadBalancingPolicy { // Lateset update args. UpdateArgs latest_update_args_; // All our subchannels. - OrphanablePtr subchannel_list_; + RefCountedPtr subchannel_list_; // Latest pending subchannel list. - OrphanablePtr latest_pending_subchannel_list_; + RefCountedPtr latest_pending_subchannel_list_; // Selected subchannel in \a subchannel_list_. PickFirstSubchannelData* selected_ = nullptr; // Are we in IDLE state? @@ -227,7 +227,7 @@ void PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() { "[PF %p] Shutting down previous pending subchannel list %p", this, latest_pending_subchannel_list_.get()); } - latest_pending_subchannel_list_ = MakeOrphanable( + latest_pending_subchannel_list_ = MakeRefCounted( this, std::move(addresses), latest_update_args_.args); latest_pending_subchannel_list_->StartWatchingLocked(); // Empty update or no valid subchannels. Put the channel in diff --git a/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc b/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc index dd29193d558..ab7b5e18226 100644 --- a/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc +++ b/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc @@ -56,7 +56,6 @@ #include "src/core/lib/gpr/string.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/orphanable.h" -#include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/unique_type_name.h" @@ -160,9 +159,8 @@ class RingHash : public LoadBalancingPolicy { private: ~RingHash() override; - // Forward declarations. + // Forward declaration. class RingHashSubchannelList; - class Ring; // Data for a particular subchannel in a subchannel list. // This subclass adds the following functionality: @@ -211,32 +209,25 @@ class RingHash : public LoadBalancingPolicy { absl::Status connectivity_status_ ABSL_GUARDED_BY(&mu_); }; - // A list of subchannels. + // A list of subchannels and the ring containing those subchannels. class RingHashSubchannelList : public SubchannelList { public: + struct RingEntry { + uint64_t hash; + RingHashSubchannelData* subchannel; + }; + RingHashSubchannelList(RingHash* policy, ServerAddressList addresses, - const ChannelArgs& args) - : SubchannelList(policy, - (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace) - ? "RingHashSubchannelList" - : nullptr), - std::move(addresses), policy->channel_control_helper(), - args), - num_idle_(num_subchannels()), - ring_(MakeRefCounted(policy, Ref(DEBUG_LOCATION, "Ring"))) { - // Need to maintain a ref to the LB policy as long as we maintain - // any references to subchannels, since the subchannels' - // pollset_sets will include the LB policy's pollset_set. - policy->Ref(DEBUG_LOCATION, "subchannel_list").release(); - } + const ChannelArgs& args); ~RingHashSubchannelList() override { - ring_.reset(DEBUG_LOCATION, "~RingHashSubchannelList"); RingHash* p = static_cast(policy()); p->Unref(DEBUG_LOCATION, "subchannel_list"); } + const std::vector& ring() const { return ring_; } + // Updates the counters of subchannels in each state when a // subchannel transitions from old_state to new_state. void UpdateStateCountersLocked(grpc_connectivity_state old_state, @@ -261,17 +252,12 @@ class RingHash : public LoadBalancingPolicy { return true; } - void ShutdownLocked() override { - ring_.reset(DEBUG_LOCATION, "RingHashSubchannelList::ShutdownLocked()"); - SubchannelList::ShutdownLocked(); - } - size_t num_idle_; size_t num_ready_ = 0; size_t num_connecting_ = 0; size_t num_transient_failure_ = 0; - RefCountedPtr ring_; + std::vector ring_; // The index of the subchannel currently doing an internally // triggered connection attempt, if any. @@ -284,73 +270,83 @@ class RingHash : public LoadBalancingPolicy { absl::Status last_failure_; }; - class Ring : public RefCounted { - public: - struct Entry { - uint64_t hash; - RingHashSubchannelData* subchannel; - }; - - Ring(RingHash* parent, - RefCountedPtr subchannel_list); - - const std::vector& ring() const { return ring_; } - - private: - RefCountedPtr subchannel_list_; - std::vector ring_; - }; - class Picker : public SubchannelPicker { public: - Picker(RefCountedPtr parent, RefCountedPtr ring) - : parent_(std::move(parent)), ring_(std::move(ring)) {} + explicit Picker(RefCountedPtr subchannel_list) + : subchannel_list_(std::move(subchannel_list)) {} + + ~Picker() override { + // Hop into WorkSerializer to unref the subchannel list, since that may + // trigger the unreffing of the underlying subchannels. + MakeOrphanable(std::move(subchannel_list_)); + } PickResult Pick(PickArgs args) override; private: - // A fire-and-forget class that schedules subchannel connection attempts - // on the control plane WorkSerializer. - class SubchannelConnectionAttempter : public Orphanable { + // An interface for running a callback in the control plane WorkSerializer. + class WorkSerializerRunner : public Orphanable { public: - explicit SubchannelConnectionAttempter( - RefCountedPtr ring_hash_lb) - : ring_hash_lb_(std::move(ring_hash_lb)) { + explicit WorkSerializerRunner( + RefCountedPtr subchannel_list) + : subchannel_list_(std::move(subchannel_list)) { GRPC_CLOSURE_INIT(&closure_, RunInExecCtx, this, nullptr); } - void AddSubchannel(RefCountedPtr subchannel) { - subchannels_.push_back(std::move(subchannel)); - } - void Orphan() override { // Hop into ExecCtx, so that we're not holding the data plane mutex // while we run control-plane code. ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE); } + // Will be invoked inside of the WorkSerializer. + virtual void Run() {} + + protected: + RingHash* ring_hash_lb() const { + return static_cast(subchannel_list_->policy()); + } + private: static void RunInExecCtx(void* arg, grpc_error_handle /*error*/) { auto* self = static_cast(arg); - self->ring_hash_lb_->work_serializer()->Run( + self->ring_hash_lb()->work_serializer()->Run( [self]() { - if (!self->ring_hash_lb_->shutdown_) { - for (auto& subchannel : self->subchannels_) { - subchannel->RequestConnection(); - } - } + self->Run(); delete self; }, DEBUG_LOCATION); } - RefCountedPtr ring_hash_lb_; + RefCountedPtr subchannel_list_; grpc_closure closure_; + }; + + // A fire-and-forget class that schedules subchannel connection attempts + // on the control plane WorkSerializer. + class SubchannelConnectionAttempter : public WorkSerializerRunner { + public: + explicit SubchannelConnectionAttempter( + RefCountedPtr subchannel_list) + : WorkSerializerRunner(std::move(subchannel_list)) {} + + void AddSubchannel(RefCountedPtr subchannel) { + subchannels_.push_back(std::move(subchannel)); + } + + void Run() override { + if (!ring_hash_lb()->shutdown_) { + for (auto& subchannel : subchannels_) { + subchannel->RequestConnection(); + } + } + } + + private: std::vector> subchannels_; }; - RefCountedPtr parent_; - RefCountedPtr ring_; + RefCountedPtr subchannel_list_; }; void ShutdownLocked() override; @@ -359,117 +355,12 @@ class RingHash : public LoadBalancingPolicy { RefCountedPtr config_; // list of subchannels. - OrphanablePtr subchannel_list_; - OrphanablePtr latest_pending_subchannel_list_; + RefCountedPtr subchannel_list_; + RefCountedPtr latest_pending_subchannel_list_; // indicating if we are shutting down. bool shutdown_ = false; }; -// -// RingHash::Ring -// - -RingHash::Ring::Ring(RingHash* parent, - RefCountedPtr subchannel_list) - : subchannel_list_(std::move(subchannel_list)) { - size_t num_subchannels = subchannel_list_->num_subchannels(); - // Store the weights while finding the sum. - struct AddressWeight { - std::string address; - // Default weight is 1 for the cases where a weight is not provided, - // each occurrence of the address will be counted a weight value of 1. - uint32_t weight = 1; - double normalized_weight; - }; - std::vector address_weights; - size_t sum = 0; - address_weights.reserve(num_subchannels); - for (size_t i = 0; i < num_subchannels; ++i) { - RingHashSubchannelData* sd = subchannel_list_->subchannel(i); - const ServerAddressWeightAttribute* weight_attribute = static_cast< - const ServerAddressWeightAttribute*>(sd->address().GetAttribute( - ServerAddressWeightAttribute::kServerAddressWeightAttributeKey)); - AddressWeight address_weight; - address_weight.address = - grpc_sockaddr_to_string(&sd->address().address(), false).value(); - // Weight should never be zero, but ignore it just in case, since - // that value would screw up the ring-building algorithm. - if (weight_attribute != nullptr && weight_attribute->weight() > 0) { - address_weight.weight = weight_attribute->weight(); - } - sum += address_weight.weight; - address_weights.push_back(std::move(address_weight)); - } - // Calculating normalized weights and find min and max. - double min_normalized_weight = 1.0; - double max_normalized_weight = 0.0; - for (auto& address : address_weights) { - address.normalized_weight = static_cast(address.weight) / sum; - min_normalized_weight = - std::min(address.normalized_weight, min_normalized_weight); - max_normalized_weight = - std::max(address.normalized_weight, max_normalized_weight); - } - // Scale up the number of hashes per host such that the least-weighted host - // gets a whole number of hashes on the ring. Other hosts might not end up - // with whole numbers, and that's fine (the ring-building algorithm below can - // handle this). This preserves the original implementation's behavior: when - // weights aren't provided, all hosts should get an equal number of hashes. In - // the case where this number exceeds the max_ring_size, it's scaled back down - // to fit. - const size_t min_ring_size = parent->config_->min_ring_size(); - const size_t max_ring_size = parent->config_->max_ring_size(); - const double scale = std::min( - std::ceil(min_normalized_weight * min_ring_size) / min_normalized_weight, - static_cast(max_ring_size)); - // Reserve memory for the entire ring up front. - const size_t ring_size = std::ceil(scale); - ring_.reserve(ring_size); - // Populate the hash ring by walking through the (host, weight) pairs in - // normalized_host_weights, and generating (scale * weight) hashes for each - // host. Since these aren't necessarily whole numbers, we maintain running - // sums -- current_hashes and target_hashes -- which allows us to populate the - // ring in a mostly stable way. - absl::InlinedVector hash_key_buffer; - double current_hashes = 0.0; - double target_hashes = 0.0; - uint64_t min_hashes_per_host = ring_size; - uint64_t max_hashes_per_host = 0; - for (size_t i = 0; i < num_subchannels; ++i) { - const std::string& address_string = address_weights[i].address; - hash_key_buffer.assign(address_string.begin(), address_string.end()); - hash_key_buffer.emplace_back('_'); - auto offset_start = hash_key_buffer.end(); - target_hashes += scale * address_weights[i].normalized_weight; - size_t count = 0; - while (current_hashes < target_hashes) { - const std::string count_str = absl::StrCat(count); - hash_key_buffer.insert(offset_start, count_str.begin(), count_str.end()); - absl::string_view hash_key(hash_key_buffer.data(), - hash_key_buffer.size()); - const uint64_t hash = XXH64(hash_key.data(), hash_key.size(), 0); - ring_.push_back({hash, subchannel_list_->subchannel(i)}); - ++count; - ++current_hashes; - hash_key_buffer.erase(offset_start, hash_key_buffer.end()); - } - min_hashes_per_host = - std::min(static_cast(i), min_hashes_per_host); - max_hashes_per_host = - std::max(static_cast(i), max_hashes_per_host); - } - std::sort(ring_.begin(), ring_.end(), - [](const Entry& lhs, const Entry& rhs) -> bool { - return lhs.hash < rhs.hash; - }); - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) { - gpr_log(GPR_INFO, - "[RH %p picker %p] created ring from subchannel_list=%p " - "with %" PRIuPTR " ring entries", - parent, this, subchannel_list_.get(), ring_.size()); - } -} - // // RingHash::Picker // @@ -483,7 +374,7 @@ RingHash::PickResult RingHash::Picker::Pick(PickArgs args) { return PickResult::Fail( absl::InternalError("ring hash value is not a number")); } - const std::vector& ring = ring_->ring(); + const auto& ring = subchannel_list_->ring(); // Ported from https://github.com/RJ/ketama/blob/master/libketama/ketama.c // (ketama_get_server) NOTE: The algorithm depends on using signed integers // for lowp, highp, and first_index. Do not change them! @@ -516,7 +407,9 @@ RingHash::PickResult RingHash::Picker::Pick(PickArgs args) { [&](RefCountedPtr subchannel) { if (subchannel_connection_attempter == nullptr) { subchannel_connection_attempter = - MakeOrphanable(parent_); + MakeOrphanable( + subchannel_list_->Ref(DEBUG_LOCATION, + "SubchannelConnectionAttempter")); } subchannel_connection_attempter->AddSubchannel(std::move(subchannel)); }; @@ -541,7 +434,7 @@ RingHash::PickResult RingHash::Picker::Pick(PickArgs args) { bool found_second_subchannel = false; bool found_first_non_failed = false; for (size_t i = 1; i < ring.size(); ++i) { - const Ring::Entry& entry = ring[(first_index + i) % ring.size()]; + const auto& entry = ring[(first_index + i) % ring.size()]; if (entry.subchannel == ring[first_index].subchannel) { continue; } @@ -585,6 +478,117 @@ RingHash::PickResult RingHash::Picker::Pick(PickArgs args) { // RingHash::RingHashSubchannelList // +RingHash::RingHashSubchannelList::RingHashSubchannelList( + RingHash* policy, ServerAddressList addresses, const ChannelArgs& args) + : SubchannelList(policy, + (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace) + ? "RingHashSubchannelList" + : nullptr), + std::move(addresses), policy->channel_control_helper(), + args), + num_idle_(num_subchannels()) { + // Need to maintain a ref to the LB policy as long as we maintain + // any references to subchannels, since the subchannels' + // pollset_sets will include the LB policy's pollset_set. + policy->Ref(DEBUG_LOCATION, "subchannel_list").release(); + // Construct the ring. + // Store the weights while finding the sum. + struct AddressWeight { + std::string address; + // Default weight is 1 for the cases where a weight is not provided, + // each occurrence of the address will be counted a weight value of 1. + uint32_t weight = 1; + double normalized_weight; + }; + std::vector address_weights; + size_t sum = 0; + address_weights.reserve(num_subchannels()); + for (size_t i = 0; i < num_subchannels(); ++i) { + RingHashSubchannelData* sd = subchannel(i); + const ServerAddressWeightAttribute* weight_attribute = static_cast< + const ServerAddressWeightAttribute*>(sd->address().GetAttribute( + ServerAddressWeightAttribute::kServerAddressWeightAttributeKey)); + AddressWeight address_weight; + address_weight.address = + grpc_sockaddr_to_string(&sd->address().address(), false).value(); + // Weight should never be zero, but ignore it just in case, since + // that value would screw up the ring-building algorithm. + if (weight_attribute != nullptr && weight_attribute->weight() > 0) { + address_weight.weight = weight_attribute->weight(); + } + sum += address_weight.weight; + address_weights.push_back(std::move(address_weight)); + } + // Calculating normalized weights and find min and max. + double min_normalized_weight = 1.0; + double max_normalized_weight = 0.0; + for (auto& address : address_weights) { + address.normalized_weight = static_cast(address.weight) / sum; + min_normalized_weight = + std::min(address.normalized_weight, min_normalized_weight); + max_normalized_weight = + std::max(address.normalized_weight, max_normalized_weight); + } + // Scale up the number of hashes per host such that the least-weighted host + // gets a whole number of hashes on the ring. Other hosts might not end up + // with whole numbers, and that's fine (the ring-building algorithm below can + // handle this). This preserves the original implementation's behavior: when + // weights aren't provided, all hosts should get an equal number of hashes. In + // the case where this number exceeds the max_ring_size, it's scaled back down + // to fit. + const size_t min_ring_size = policy->config_->min_ring_size(); + const size_t max_ring_size = policy->config_->max_ring_size(); + const double scale = std::min( + std::ceil(min_normalized_weight * min_ring_size) / min_normalized_weight, + static_cast(max_ring_size)); + // Reserve memory for the entire ring up front. + const size_t ring_size = std::ceil(scale); + ring_.reserve(ring_size); + // Populate the hash ring by walking through the (host, weight) pairs in + // normalized_host_weights, and generating (scale * weight) hashes for each + // host. Since these aren't necessarily whole numbers, we maintain running + // sums -- current_hashes and target_hashes -- which allows us to populate the + // ring in a mostly stable way. + absl::InlinedVector hash_key_buffer; + double current_hashes = 0.0; + double target_hashes = 0.0; + uint64_t min_hashes_per_host = ring_size; + uint64_t max_hashes_per_host = 0; + for (size_t i = 0; i < num_subchannels(); ++i) { + const std::string& address_string = address_weights[i].address; + hash_key_buffer.assign(address_string.begin(), address_string.end()); + hash_key_buffer.emplace_back('_'); + auto offset_start = hash_key_buffer.end(); + target_hashes += scale * address_weights[i].normalized_weight; + size_t count = 0; + while (current_hashes < target_hashes) { + const std::string count_str = absl::StrCat(count); + hash_key_buffer.insert(offset_start, count_str.begin(), count_str.end()); + absl::string_view hash_key(hash_key_buffer.data(), + hash_key_buffer.size()); + const uint64_t hash = XXH64(hash_key.data(), hash_key.size(), 0); + ring_.push_back({hash, subchannel(i)}); + ++count; + ++current_hashes; + hash_key_buffer.erase(offset_start, hash_key_buffer.end()); + } + min_hashes_per_host = + std::min(static_cast(i), min_hashes_per_host); + max_hashes_per_host = + std::max(static_cast(i), max_hashes_per_host); + } + std::sort(ring_.begin(), ring_.end(), + [](const RingHashSubchannelList::RingEntry& lhs, + const RingHashSubchannelList::RingEntry& rhs) -> bool { + return lhs.hash < rhs.hash; + }); + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) { + gpr_log(GPR_INFO, + "[RH %p] created subchannel list %p with %" PRIuPTR " ring entries", + policy, this, ring_.size()); + } +} + void RingHash::RingHashSubchannelList::UpdateStateCountersLocked( grpc_connectivity_state old_state, grpc_connectivity_state new_state) { if (old_state == GRPC_CHANNEL_IDLE) { @@ -673,8 +677,7 @@ void RingHash::RingHashSubchannelList::UpdateRingHashConnectivityStateLocked( // Note that we use our own picker regardless of connectivity state. p->channel_control_helper()->UpdateState( state, status, - absl::make_unique(p->Ref(DEBUG_LOCATION, "RingHashPicker"), - ring_)); + absl::make_unique(Ref(DEBUG_LOCATION, "RingHashPicker"))); // While the ring_hash policy is reporting TRANSIENT_FAILURE, it will // not be getting any pick requests from the priority policy. // However, because the ring_hash policy does not attempt to @@ -836,7 +839,7 @@ void RingHash::UpdateLocked(UpdateArgs args) { gpr_log(GPR_INFO, "[RH %p] replacing latest pending subchannel list %p", this, latest_pending_subchannel_list_.get()); } - latest_pending_subchannel_list_ = MakeOrphanable( + latest_pending_subchannel_list_ = MakeRefCounted( this, std::move(addresses), args.args); latest_pending_subchannel_list_->StartWatchingLocked(); // If we have no existing list or the new list is empty, immediately diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index 1be2f06705a..f61e5e4f90f 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -182,12 +182,12 @@ class RoundRobin : public LoadBalancingPolicy { void ShutdownLocked() override; // List of subchannels. - OrphanablePtr subchannel_list_; + RefCountedPtr subchannel_list_; // Latest pending subchannel list. // When we get an updated address list, we create a new subchannel list // for it here, and we wait to swap it into subchannel_list_ until the new // list becomes READY. - OrphanablePtr latest_pending_subchannel_list_; + RefCountedPtr latest_pending_subchannel_list_; bool shutdown_ = false; }; @@ -288,7 +288,7 @@ void RoundRobin::UpdateLocked(UpdateArgs args) { gpr_log(GPR_INFO, "[RR %p] replacing previous pending subchannel list %p", this, latest_pending_subchannel_list_.get()); } - latest_pending_subchannel_list_ = MakeOrphanable( + latest_pending_subchannel_list_ = MakeRefCounted( this, std::move(addresses), args.args); latest_pending_subchannel_list_->StartWatchingLocked(); // If the new list is empty, immediately promote it to diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h index e9433e0db18..cb977ab4d6d 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h @@ -35,8 +35,8 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gprpp/debug_location.h" +#include "src/core/lib/gprpp/dual_ref_counted.h" #include "src/core/lib/gprpp/manual_constructor.h" -#include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/iomgr/iomgr_fwd.h" #include "src/core/lib/load_balancing/lb_policy.h" @@ -131,7 +131,7 @@ class SubchannelData { public: Watcher( SubchannelData* subchannel_data, - RefCountedPtr subchannel_list) + WeakRefCountedPtr subchannel_list) : subchannel_data_(subchannel_data), subchannel_list_(std::move(subchannel_list)) {} @@ -148,7 +148,7 @@ class SubchannelData { private: SubchannelData* subchannel_data_; - RefCountedPtr subchannel_list_; + WeakRefCountedPtr subchannel_list_; }; // Starts watching the connectivity state of the subchannel. @@ -176,7 +176,7 @@ class SubchannelData { // A list of subchannels. template -class SubchannelList : public InternallyRefCounted { +class SubchannelList : public DualRefCounted { public: // Starts watching the connectivity state of all subchannels. // Must be called immediately after instantiation. @@ -200,10 +200,7 @@ class SubchannelList : public InternallyRefCounted { // Resets connection backoff of all subchannels. void ResetBackoffLocked(); - void Orphan() override { - ShutdownLocked(); - InternallyRefCounted::Unref(DEBUG_LOCATION, "shutdown"); - } + void Orphan() override; protected: SubchannelList(LoadBalancingPolicy* policy, const char* tracer, @@ -213,8 +210,6 @@ class SubchannelList : public InternallyRefCounted { virtual ~SubchannelList(); - virtual void ShutdownLocked(); - private: // For accessing Ref() and Unref(). friend class SubchannelData; @@ -327,7 +322,7 @@ void SubchannelDataRef(DEBUG_LOCATION, "Watcher")); + new Watcher(this, subchannel_list()->WeakRef(DEBUG_LOCATION, "Watcher")); subchannel_->WatchConnectivityState( std::unique_ptr( pending_watcher_)); @@ -365,7 +360,7 @@ SubchannelList::SubchannelList( LoadBalancingPolicy* policy, const char* tracer, ServerAddressList addresses, LoadBalancingPolicy::ChannelControlHelper* helper, const ChannelArgs& args) - : InternallyRefCounted(tracer), + : DualRefCounted(tracer), policy_(policy), tracer_(tracer) { if (GPR_UNLIKELY(tracer_ != nullptr)) { @@ -419,7 +414,7 @@ void SubchannelList -void SubchannelList::ShutdownLocked() { +void SubchannelList::Orphan() { if (GPR_UNLIKELY(tracer_ != nullptr)) { gpr_log(GPR_INFO, "[%s %p] Shutting down subchannel_list %p", tracer_, policy_, this);