ring_hash: fix subchannel list to not shutdown until picker is destroyed (#30714)

* ring_hash: fix subchannel list to not shutdown until picker is destroyed

* hop into WorkSerializer before unreffing subchannels

* use a weak ref for subchannel connectivity state watches

* Automated change: Fix sanity tests

* fix memory leak

* clang-format

* fix circular reference problem by moving ring into subchannel list

* Automated change: Fix sanity tests

Co-authored-by: markdroth <markdroth@users.noreply.github.com>
pull/30726/head
Mark D. Roth 2 years ago committed by GitHub
parent ca7c17c8d4
commit ee900c0e39
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      BUILD
  2. 6
      src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc
  3. 6
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  4. 355
      src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc
  5. 6
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
  6. 21
      src/core/ext/filters/client_channel/lb_policy/subchannel_list.h

@ -4623,12 +4623,12 @@ grpc_cc_library(
language = "c++",
deps = [
"debug_location",
"dual_ref_counted",
"gpr",
"grpc_base",
"grpc_codegen",
"iomgr_fwd",
"lb_policy",
"orphanable",
"ref_counted_ptr",
"server_address",
"subchannel_interface",
@ -4701,7 +4701,6 @@ grpc_cc_library(
"lb_policy_factory",
"lb_policy_registry",
"orphanable",
"ref_counted",
"ref_counted_ptr",
"server_address",
"sockaddr_utils",

@ -649,6 +649,12 @@ void OutlierDetectionLb::UpdateLocked(UpdateArgs args) {
}
} else if (!config_->CountingEnabled()) {
// If counting is not enabled, reset state.
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO,
"[outlier_detection_lb %p] counting disabled; disabling "
"ejection for %s (%p)",
this, address_key.c_str(), subchannel_state.get());
}
subchannel_state->DisableEjection();
}
current_addresses.emplace(address_key);

@ -162,9 +162,9 @@ class PickFirst : public LoadBalancingPolicy {
// Lateset update args.
UpdateArgs latest_update_args_;
// All our subchannels.
OrphanablePtr<PickFirstSubchannelList> subchannel_list_;
RefCountedPtr<PickFirstSubchannelList> subchannel_list_;
// Latest pending subchannel list.
OrphanablePtr<PickFirstSubchannelList> latest_pending_subchannel_list_;
RefCountedPtr<PickFirstSubchannelList> latest_pending_subchannel_list_;
// Selected subchannel in \a subchannel_list_.
PickFirstSubchannelData* selected_ = nullptr;
// Are we in IDLE state?
@ -227,7 +227,7 @@ void PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() {
"[PF %p] Shutting down previous pending subchannel list %p", this,
latest_pending_subchannel_list_.get());
}
latest_pending_subchannel_list_ = MakeOrphanable<PickFirstSubchannelList>(
latest_pending_subchannel_list_ = MakeRefCounted<PickFirstSubchannelList>(
this, std::move(addresses), latest_update_args_.args);
latest_pending_subchannel_list_->StartWatchingLocked();
// Empty update or no valid subchannels. Put the channel in

@ -56,7 +56,6 @@
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/unique_type_name.h"
@ -160,9 +159,8 @@ class RingHash : public LoadBalancingPolicy {
private:
~RingHash() override;
// Forward declarations.
// Forward declaration.
class RingHashSubchannelList;
class Ring;
// Data for a particular subchannel in a subchannel list.
// This subclass adds the following functionality:
@ -211,32 +209,25 @@ class RingHash : public LoadBalancingPolicy {
absl::Status connectivity_status_ ABSL_GUARDED_BY(&mu_);
};
// A list of subchannels.
// A list of subchannels and the ring containing those subchannels.
class RingHashSubchannelList
: public SubchannelList<RingHashSubchannelList, RingHashSubchannelData> {
public:
struct RingEntry {
uint64_t hash;
RingHashSubchannelData* subchannel;
};
RingHashSubchannelList(RingHash* policy, ServerAddressList addresses,
const ChannelArgs& args)
: SubchannelList(policy,
(GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)
? "RingHashSubchannelList"
: nullptr),
std::move(addresses), policy->channel_control_helper(),
args),
num_idle_(num_subchannels()),
ring_(MakeRefCounted<Ring>(policy, Ref(DEBUG_LOCATION, "Ring"))) {
// Need to maintain a ref to the LB policy as long as we maintain
// any references to subchannels, since the subchannels'
// pollset_sets will include the LB policy's pollset_set.
policy->Ref(DEBUG_LOCATION, "subchannel_list").release();
}
const ChannelArgs& args);
~RingHashSubchannelList() override {
ring_.reset(DEBUG_LOCATION, "~RingHashSubchannelList");
RingHash* p = static_cast<RingHash*>(policy());
p->Unref(DEBUG_LOCATION, "subchannel_list");
}
const std::vector<RingEntry>& ring() const { return ring_; }
// Updates the counters of subchannels in each state when a
// subchannel transitions from old_state to new_state.
void UpdateStateCountersLocked(grpc_connectivity_state old_state,
@ -261,17 +252,12 @@ class RingHash : public LoadBalancingPolicy {
return true;
}
void ShutdownLocked() override {
ring_.reset(DEBUG_LOCATION, "RingHashSubchannelList::ShutdownLocked()");
SubchannelList::ShutdownLocked();
}
size_t num_idle_;
size_t num_ready_ = 0;
size_t num_connecting_ = 0;
size_t num_transient_failure_ = 0;
RefCountedPtr<Ring> ring_;
std::vector<RingEntry> ring_;
// The index of the subchannel currently doing an internally
// triggered connection attempt, if any.
@ -284,73 +270,83 @@ class RingHash : public LoadBalancingPolicy {
absl::Status last_failure_;
};
class Ring : public RefCounted<Ring> {
public:
struct Entry {
uint64_t hash;
RingHashSubchannelData* subchannel;
};
Ring(RingHash* parent,
RefCountedPtr<RingHashSubchannelList> subchannel_list);
const std::vector<Entry>& ring() const { return ring_; }
private:
RefCountedPtr<RingHashSubchannelList> subchannel_list_;
std::vector<Entry> ring_;
};
class Picker : public SubchannelPicker {
public:
Picker(RefCountedPtr<RingHash> parent, RefCountedPtr<Ring> ring)
: parent_(std::move(parent)), ring_(std::move(ring)) {}
explicit Picker(RefCountedPtr<RingHashSubchannelList> subchannel_list)
: subchannel_list_(std::move(subchannel_list)) {}
~Picker() override {
// Hop into WorkSerializer to unref the subchannel list, since that may
// trigger the unreffing of the underlying subchannels.
MakeOrphanable<WorkSerializerRunner>(std::move(subchannel_list_));
}
PickResult Pick(PickArgs args) override;
private:
// A fire-and-forget class that schedules subchannel connection attempts
// on the control plane WorkSerializer.
class SubchannelConnectionAttempter : public Orphanable {
// An interface for running a callback in the control plane WorkSerializer.
class WorkSerializerRunner : public Orphanable {
public:
explicit SubchannelConnectionAttempter(
RefCountedPtr<RingHash> ring_hash_lb)
: ring_hash_lb_(std::move(ring_hash_lb)) {
explicit WorkSerializerRunner(
RefCountedPtr<RingHashSubchannelList> subchannel_list)
: subchannel_list_(std::move(subchannel_list)) {
GRPC_CLOSURE_INIT(&closure_, RunInExecCtx, this, nullptr);
}
void AddSubchannel(RefCountedPtr<SubchannelInterface> subchannel) {
subchannels_.push_back(std::move(subchannel));
}
void Orphan() override {
// Hop into ExecCtx, so that we're not holding the data plane mutex
// while we run control-plane code.
ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
}
// Will be invoked inside of the WorkSerializer.
virtual void Run() {}
protected:
RingHash* ring_hash_lb() const {
return static_cast<RingHash*>(subchannel_list_->policy());
}
private:
static void RunInExecCtx(void* arg, grpc_error_handle /*error*/) {
auto* self = static_cast<SubchannelConnectionAttempter*>(arg);
self->ring_hash_lb_->work_serializer()->Run(
self->ring_hash_lb()->work_serializer()->Run(
[self]() {
if (!self->ring_hash_lb_->shutdown_) {
for (auto& subchannel : self->subchannels_) {
subchannel->RequestConnection();
}
}
self->Run();
delete self;
},
DEBUG_LOCATION);
}
RefCountedPtr<RingHash> ring_hash_lb_;
RefCountedPtr<RingHashSubchannelList> subchannel_list_;
grpc_closure closure_;
};
// A fire-and-forget class that schedules subchannel connection attempts
// on the control plane WorkSerializer.
class SubchannelConnectionAttempter : public WorkSerializerRunner {
public:
explicit SubchannelConnectionAttempter(
RefCountedPtr<RingHashSubchannelList> subchannel_list)
: WorkSerializerRunner(std::move(subchannel_list)) {}
void AddSubchannel(RefCountedPtr<SubchannelInterface> subchannel) {
subchannels_.push_back(std::move(subchannel));
}
void Run() override {
if (!ring_hash_lb()->shutdown_) {
for (auto& subchannel : subchannels_) {
subchannel->RequestConnection();
}
}
}
private:
std::vector<RefCountedPtr<SubchannelInterface>> subchannels_;
};
RefCountedPtr<RingHash> parent_;
RefCountedPtr<Ring> ring_;
RefCountedPtr<RingHashSubchannelList> subchannel_list_;
};
void ShutdownLocked() override;
@ -359,117 +355,12 @@ class RingHash : public LoadBalancingPolicy {
RefCountedPtr<RingHashLbConfig> config_;
// list of subchannels.
OrphanablePtr<RingHashSubchannelList> subchannel_list_;
OrphanablePtr<RingHashSubchannelList> latest_pending_subchannel_list_;
RefCountedPtr<RingHashSubchannelList> subchannel_list_;
RefCountedPtr<RingHashSubchannelList> latest_pending_subchannel_list_;
// indicating if we are shutting down.
bool shutdown_ = false;
};
//
// RingHash::Ring
//
RingHash::Ring::Ring(RingHash* parent,
RefCountedPtr<RingHashSubchannelList> subchannel_list)
: subchannel_list_(std::move(subchannel_list)) {
size_t num_subchannels = subchannel_list_->num_subchannels();
// Store the weights while finding the sum.
struct AddressWeight {
std::string address;
// Default weight is 1 for the cases where a weight is not provided,
// each occurrence of the address will be counted a weight value of 1.
uint32_t weight = 1;
double normalized_weight;
};
std::vector<AddressWeight> address_weights;
size_t sum = 0;
address_weights.reserve(num_subchannels);
for (size_t i = 0; i < num_subchannels; ++i) {
RingHashSubchannelData* sd = subchannel_list_->subchannel(i);
const ServerAddressWeightAttribute* weight_attribute = static_cast<
const ServerAddressWeightAttribute*>(sd->address().GetAttribute(
ServerAddressWeightAttribute::kServerAddressWeightAttributeKey));
AddressWeight address_weight;
address_weight.address =
grpc_sockaddr_to_string(&sd->address().address(), false).value();
// Weight should never be zero, but ignore it just in case, since
// that value would screw up the ring-building algorithm.
if (weight_attribute != nullptr && weight_attribute->weight() > 0) {
address_weight.weight = weight_attribute->weight();
}
sum += address_weight.weight;
address_weights.push_back(std::move(address_weight));
}
// Calculating normalized weights and find min and max.
double min_normalized_weight = 1.0;
double max_normalized_weight = 0.0;
for (auto& address : address_weights) {
address.normalized_weight = static_cast<double>(address.weight) / sum;
min_normalized_weight =
std::min(address.normalized_weight, min_normalized_weight);
max_normalized_weight =
std::max(address.normalized_weight, max_normalized_weight);
}
// Scale up the number of hashes per host such that the least-weighted host
// gets a whole number of hashes on the ring. Other hosts might not end up
// with whole numbers, and that's fine (the ring-building algorithm below can
// handle this). This preserves the original implementation's behavior: when
// weights aren't provided, all hosts should get an equal number of hashes. In
// the case where this number exceeds the max_ring_size, it's scaled back down
// to fit.
const size_t min_ring_size = parent->config_->min_ring_size();
const size_t max_ring_size = parent->config_->max_ring_size();
const double scale = std::min(
std::ceil(min_normalized_weight * min_ring_size) / min_normalized_weight,
static_cast<double>(max_ring_size));
// Reserve memory for the entire ring up front.
const size_t ring_size = std::ceil(scale);
ring_.reserve(ring_size);
// Populate the hash ring by walking through the (host, weight) pairs in
// normalized_host_weights, and generating (scale * weight) hashes for each
// host. Since these aren't necessarily whole numbers, we maintain running
// sums -- current_hashes and target_hashes -- which allows us to populate the
// ring in a mostly stable way.
absl::InlinedVector<char, 196> hash_key_buffer;
double current_hashes = 0.0;
double target_hashes = 0.0;
uint64_t min_hashes_per_host = ring_size;
uint64_t max_hashes_per_host = 0;
for (size_t i = 0; i < num_subchannels; ++i) {
const std::string& address_string = address_weights[i].address;
hash_key_buffer.assign(address_string.begin(), address_string.end());
hash_key_buffer.emplace_back('_');
auto offset_start = hash_key_buffer.end();
target_hashes += scale * address_weights[i].normalized_weight;
size_t count = 0;
while (current_hashes < target_hashes) {
const std::string count_str = absl::StrCat(count);
hash_key_buffer.insert(offset_start, count_str.begin(), count_str.end());
absl::string_view hash_key(hash_key_buffer.data(),
hash_key_buffer.size());
const uint64_t hash = XXH64(hash_key.data(), hash_key.size(), 0);
ring_.push_back({hash, subchannel_list_->subchannel(i)});
++count;
++current_hashes;
hash_key_buffer.erase(offset_start, hash_key_buffer.end());
}
min_hashes_per_host =
std::min(static_cast<uint64_t>(i), min_hashes_per_host);
max_hashes_per_host =
std::max(static_cast<uint64_t>(i), max_hashes_per_host);
}
std::sort(ring_.begin(), ring_.end(),
[](const Entry& lhs, const Entry& rhs) -> bool {
return lhs.hash < rhs.hash;
});
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
gpr_log(GPR_INFO,
"[RH %p picker %p] created ring from subchannel_list=%p "
"with %" PRIuPTR " ring entries",
parent, this, subchannel_list_.get(), ring_.size());
}
}
//
// RingHash::Picker
//
@ -483,7 +374,7 @@ RingHash::PickResult RingHash::Picker::Pick(PickArgs args) {
return PickResult::Fail(
absl::InternalError("ring hash value is not a number"));
}
const std::vector<Ring::Entry>& ring = ring_->ring();
const auto& ring = subchannel_list_->ring();
// Ported from https://github.com/RJ/ketama/blob/master/libketama/ketama.c
// (ketama_get_server) NOTE: The algorithm depends on using signed integers
// for lowp, highp, and first_index. Do not change them!
@ -516,7 +407,9 @@ RingHash::PickResult RingHash::Picker::Pick(PickArgs args) {
[&](RefCountedPtr<SubchannelInterface> subchannel) {
if (subchannel_connection_attempter == nullptr) {
subchannel_connection_attempter =
MakeOrphanable<SubchannelConnectionAttempter>(parent_);
MakeOrphanable<SubchannelConnectionAttempter>(
subchannel_list_->Ref(DEBUG_LOCATION,
"SubchannelConnectionAttempter"));
}
subchannel_connection_attempter->AddSubchannel(std::move(subchannel));
};
@ -541,7 +434,7 @@ RingHash::PickResult RingHash::Picker::Pick(PickArgs args) {
bool found_second_subchannel = false;
bool found_first_non_failed = false;
for (size_t i = 1; i < ring.size(); ++i) {
const Ring::Entry& entry = ring[(first_index + i) % ring.size()];
const auto& entry = ring[(first_index + i) % ring.size()];
if (entry.subchannel == ring[first_index].subchannel) {
continue;
}
@ -585,6 +478,117 @@ RingHash::PickResult RingHash::Picker::Pick(PickArgs args) {
// RingHash::RingHashSubchannelList
//
RingHash::RingHashSubchannelList::RingHashSubchannelList(
RingHash* policy, ServerAddressList addresses, const ChannelArgs& args)
: SubchannelList(policy,
(GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)
? "RingHashSubchannelList"
: nullptr),
std::move(addresses), policy->channel_control_helper(),
args),
num_idle_(num_subchannels()) {
// Need to maintain a ref to the LB policy as long as we maintain
// any references to subchannels, since the subchannels'
// pollset_sets will include the LB policy's pollset_set.
policy->Ref(DEBUG_LOCATION, "subchannel_list").release();
// Construct the ring.
// Store the weights while finding the sum.
struct AddressWeight {
std::string address;
// Default weight is 1 for the cases where a weight is not provided,
// each occurrence of the address will be counted a weight value of 1.
uint32_t weight = 1;
double normalized_weight;
};
std::vector<AddressWeight> address_weights;
size_t sum = 0;
address_weights.reserve(num_subchannels());
for (size_t i = 0; i < num_subchannels(); ++i) {
RingHashSubchannelData* sd = subchannel(i);
const ServerAddressWeightAttribute* weight_attribute = static_cast<
const ServerAddressWeightAttribute*>(sd->address().GetAttribute(
ServerAddressWeightAttribute::kServerAddressWeightAttributeKey));
AddressWeight address_weight;
address_weight.address =
grpc_sockaddr_to_string(&sd->address().address(), false).value();
// Weight should never be zero, but ignore it just in case, since
// that value would screw up the ring-building algorithm.
if (weight_attribute != nullptr && weight_attribute->weight() > 0) {
address_weight.weight = weight_attribute->weight();
}
sum += address_weight.weight;
address_weights.push_back(std::move(address_weight));
}
// Calculating normalized weights and find min and max.
double min_normalized_weight = 1.0;
double max_normalized_weight = 0.0;
for (auto& address : address_weights) {
address.normalized_weight = static_cast<double>(address.weight) / sum;
min_normalized_weight =
std::min(address.normalized_weight, min_normalized_weight);
max_normalized_weight =
std::max(address.normalized_weight, max_normalized_weight);
}
// Scale up the number of hashes per host such that the least-weighted host
// gets a whole number of hashes on the ring. Other hosts might not end up
// with whole numbers, and that's fine (the ring-building algorithm below can
// handle this). This preserves the original implementation's behavior: when
// weights aren't provided, all hosts should get an equal number of hashes. In
// the case where this number exceeds the max_ring_size, it's scaled back down
// to fit.
const size_t min_ring_size = policy->config_->min_ring_size();
const size_t max_ring_size = policy->config_->max_ring_size();
const double scale = std::min(
std::ceil(min_normalized_weight * min_ring_size) / min_normalized_weight,
static_cast<double>(max_ring_size));
// Reserve memory for the entire ring up front.
const size_t ring_size = std::ceil(scale);
ring_.reserve(ring_size);
// Populate the hash ring by walking through the (host, weight) pairs in
// normalized_host_weights, and generating (scale * weight) hashes for each
// host. Since these aren't necessarily whole numbers, we maintain running
// sums -- current_hashes and target_hashes -- which allows us to populate the
// ring in a mostly stable way.
absl::InlinedVector<char, 196> hash_key_buffer;
double current_hashes = 0.0;
double target_hashes = 0.0;
uint64_t min_hashes_per_host = ring_size;
uint64_t max_hashes_per_host = 0;
for (size_t i = 0; i < num_subchannels(); ++i) {
const std::string& address_string = address_weights[i].address;
hash_key_buffer.assign(address_string.begin(), address_string.end());
hash_key_buffer.emplace_back('_');
auto offset_start = hash_key_buffer.end();
target_hashes += scale * address_weights[i].normalized_weight;
size_t count = 0;
while (current_hashes < target_hashes) {
const std::string count_str = absl::StrCat(count);
hash_key_buffer.insert(offset_start, count_str.begin(), count_str.end());
absl::string_view hash_key(hash_key_buffer.data(),
hash_key_buffer.size());
const uint64_t hash = XXH64(hash_key.data(), hash_key.size(), 0);
ring_.push_back({hash, subchannel(i)});
++count;
++current_hashes;
hash_key_buffer.erase(offset_start, hash_key_buffer.end());
}
min_hashes_per_host =
std::min(static_cast<uint64_t>(i), min_hashes_per_host);
max_hashes_per_host =
std::max(static_cast<uint64_t>(i), max_hashes_per_host);
}
std::sort(ring_.begin(), ring_.end(),
[](const RingHashSubchannelList::RingEntry& lhs,
const RingHashSubchannelList::RingEntry& rhs) -> bool {
return lhs.hash < rhs.hash;
});
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
gpr_log(GPR_INFO,
"[RH %p] created subchannel list %p with %" PRIuPTR " ring entries",
policy, this, ring_.size());
}
}
void RingHash::RingHashSubchannelList::UpdateStateCountersLocked(
grpc_connectivity_state old_state, grpc_connectivity_state new_state) {
if (old_state == GRPC_CHANNEL_IDLE) {
@ -673,8 +677,7 @@ void RingHash::RingHashSubchannelList::UpdateRingHashConnectivityStateLocked(
// Note that we use our own picker regardless of connectivity state.
p->channel_control_helper()->UpdateState(
state, status,
absl::make_unique<Picker>(p->Ref(DEBUG_LOCATION, "RingHashPicker"),
ring_));
absl::make_unique<Picker>(Ref(DEBUG_LOCATION, "RingHashPicker")));
// While the ring_hash policy is reporting TRANSIENT_FAILURE, it will
// not be getting any pick requests from the priority policy.
// However, because the ring_hash policy does not attempt to
@ -836,7 +839,7 @@ void RingHash::UpdateLocked(UpdateArgs args) {
gpr_log(GPR_INFO, "[RH %p] replacing latest pending subchannel list %p",
this, latest_pending_subchannel_list_.get());
}
latest_pending_subchannel_list_ = MakeOrphanable<RingHashSubchannelList>(
latest_pending_subchannel_list_ = MakeRefCounted<RingHashSubchannelList>(
this, std::move(addresses), args.args);
latest_pending_subchannel_list_->StartWatchingLocked();
// If we have no existing list or the new list is empty, immediately

@ -182,12 +182,12 @@ class RoundRobin : public LoadBalancingPolicy {
void ShutdownLocked() override;
// List of subchannels.
OrphanablePtr<RoundRobinSubchannelList> subchannel_list_;
RefCountedPtr<RoundRobinSubchannelList> subchannel_list_;
// Latest pending subchannel list.
// When we get an updated address list, we create a new subchannel list
// for it here, and we wait to swap it into subchannel_list_ until the new
// list becomes READY.
OrphanablePtr<RoundRobinSubchannelList> latest_pending_subchannel_list_;
RefCountedPtr<RoundRobinSubchannelList> latest_pending_subchannel_list_;
bool shutdown_ = false;
};
@ -288,7 +288,7 @@ void RoundRobin::UpdateLocked(UpdateArgs args) {
gpr_log(GPR_INFO, "[RR %p] replacing previous pending subchannel list %p",
this, latest_pending_subchannel_list_.get());
}
latest_pending_subchannel_list_ = MakeOrphanable<RoundRobinSubchannelList>(
latest_pending_subchannel_list_ = MakeRefCounted<RoundRobinSubchannelList>(
this, std::move(addresses), args.args);
latest_pending_subchannel_list_->StartWatchingLocked();
// If the new list is empty, immediately promote it to

@ -35,8 +35,8 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/dual_ref_counted.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/load_balancing/lb_policy.h"
@ -131,7 +131,7 @@ class SubchannelData {
public:
Watcher(
SubchannelData<SubchannelListType, SubchannelDataType>* subchannel_data,
RefCountedPtr<SubchannelListType> subchannel_list)
WeakRefCountedPtr<SubchannelListType> subchannel_list)
: subchannel_data_(subchannel_data),
subchannel_list_(std::move(subchannel_list)) {}
@ -148,7 +148,7 @@ class SubchannelData {
private:
SubchannelData<SubchannelListType, SubchannelDataType>* subchannel_data_;
RefCountedPtr<SubchannelListType> subchannel_list_;
WeakRefCountedPtr<SubchannelListType> subchannel_list_;
};
// Starts watching the connectivity state of the subchannel.
@ -176,7 +176,7 @@ class SubchannelData {
// A list of subchannels.
template <typename SubchannelListType, typename SubchannelDataType>
class SubchannelList : public InternallyRefCounted<SubchannelListType> {
class SubchannelList : public DualRefCounted<SubchannelListType> {
public:
// Starts watching the connectivity state of all subchannels.
// Must be called immediately after instantiation.
@ -200,10 +200,7 @@ class SubchannelList : public InternallyRefCounted<SubchannelListType> {
// Resets connection backoff of all subchannels.
void ResetBackoffLocked();
void Orphan() override {
ShutdownLocked();
InternallyRefCounted<SubchannelListType>::Unref(DEBUG_LOCATION, "shutdown");
}
void Orphan() override;
protected:
SubchannelList(LoadBalancingPolicy* policy, const char* tracer,
@ -213,8 +210,6 @@ class SubchannelList : public InternallyRefCounted<SubchannelListType> {
virtual ~SubchannelList();
virtual void ShutdownLocked();
private:
// For accessing Ref() and Unref().
friend class SubchannelData<SubchannelListType, SubchannelDataType>;
@ -327,7 +322,7 @@ void SubchannelData<SubchannelListType,
}
GPR_ASSERT(pending_watcher_ == nullptr);
pending_watcher_ =
new Watcher(this, subchannel_list()->Ref(DEBUG_LOCATION, "Watcher"));
new Watcher(this, subchannel_list()->WeakRef(DEBUG_LOCATION, "Watcher"));
subchannel_->WatchConnectivityState(
std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>(
pending_watcher_));
@ -365,7 +360,7 @@ SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
LoadBalancingPolicy* policy, const char* tracer,
ServerAddressList addresses,
LoadBalancingPolicy::ChannelControlHelper* helper, const ChannelArgs& args)
: InternallyRefCounted<SubchannelListType>(tracer),
: DualRefCounted<SubchannelListType>(tracer),
policy_(policy),
tracer_(tracer) {
if (GPR_UNLIKELY(tracer_ != nullptr)) {
@ -419,7 +414,7 @@ void SubchannelList<SubchannelListType,
}
template <typename SubchannelListType, typename SubchannelDataType>
void SubchannelList<SubchannelListType, SubchannelDataType>::ShutdownLocked() {
void SubchannelList<SubchannelListType, SubchannelDataType>::Orphan() {
if (GPR_UNLIKELY(tracer_ != nullptr)) {
gpr_log(GPR_INFO, "[%s %p] Shutting down subchannel_list %p", tracer_,
policy_, this);

Loading…
Cancel
Save