[WRR] delegate to pick_first instead of creating subchannels directly (#33087)

As part of the dualstack backend design, change WRR to delegate to
pick_first instead of creating subchannels directly.
pull/33563/head
Mark D. Roth 2 years ago committed by GitHub
parent 98417f3bd0
commit 38816cf327
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      build_autogenerated.yaml
  2. 3
      src/core/BUILD
  3. 7
      src/core/ext/filters/client_channel/lb_policy/health_check_client.cc
  4. 634
      src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/weighted_round_robin.cc
  5. 4
      test/core/client_channel/lb_policy/weighted_round_robin_test.cc

@ -1967,7 +1967,6 @@ libs:
- src/core/ext/filters/client_channel/lb_policy/oob_backend_metric_internal.h
- src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.h
- src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.h
- src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
- src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/static_stride_scheduler.h
- src/core/ext/filters/client_channel/local_subchannel_pool.h
- src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.h

@ -4794,15 +4794,16 @@ grpc_cc_library(
"absl/status:statusor",
"absl/strings",
"absl/types:optional",
"absl/types:variant",
],
language = "c++",
deps = [
"channel_args",
"grpc_backend_metric_data",
"grpc_lb_subchannel_list",
"json",
"json_args",
"json_object_loader",
"lb_endpoint_list",
"lb_policy",
"lb_policy_factory",
"ref_counted",

@ -435,6 +435,13 @@ void HealthProducer::OnConnectivityStateChange(grpc_connectivity_state state,
//
HealthWatcher::~HealthWatcher() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
gpr_log(GPR_INFO,
"HealthWatcher %p: unregistering from producer %p "
"(health_check_service_name=\"%s\")",
this, producer_.get(),
health_check_service_name_.value_or("N/A").c_str());
}
if (producer_ != nullptr) {
producer_->RemoveWatcher(this, health_check_service_name_);
}

@ -39,14 +39,15 @@
#include "absl/strings/str_join.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "absl/types/variant.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/impl/connectivity_state.h>
#include <grpc/support/log.h>
#include "src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h"
#include "src/core/ext/filters/client_channel/lb_policy/endpoint_list.h"
#include "src/core/ext/filters/client_channel/lb_policy/oob_backend_metric.h"
#include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h"
#include "src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/static_stride_scheduler.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/channel/channel_args.h"
@ -152,11 +153,11 @@ class WeightedRoundRobin : public LoadBalancingPolicy {
private:
// Represents the weight for a given address.
class AddressWeight : public RefCounted<AddressWeight> {
class EndpointWeight : public RefCounted<EndpointWeight> {
public:
AddressWeight(RefCountedPtr<WeightedRoundRobin> wrr, std::string key)
EndpointWeight(RefCountedPtr<WeightedRoundRobin> wrr, std::string key)
: wrr_(std::move(wrr)), key_(std::move(key)) {}
~AddressWeight() override;
~EndpointWeight() override;
void MaybeUpdateWeight(double qps, double eps, double utilization,
float error_utilization_penalty);
@ -176,109 +177,83 @@ class WeightedRoundRobin : public LoadBalancingPolicy {
Timestamp last_update_time_ ABSL_GUARDED_BY(&mu_) = Timestamp::InfPast();
};
// Forward declaration.
class WeightedRoundRobinSubchannelList;
// Data for a particular subchannel in a subchannel list.
// This subclass adds the following functionality:
// - Tracks the previous connectivity state of the subchannel, so that
// we know how many subchannels are in each state.
class WeightedRoundRobinSubchannelData
: public SubchannelData<WeightedRoundRobinSubchannelList,
WeightedRoundRobinSubchannelData> {
class WrrEndpointList : public EndpointList {
public:
WeightedRoundRobinSubchannelData(
SubchannelList<WeightedRoundRobinSubchannelList,
WeightedRoundRobinSubchannelData>* subchannel_list,
const ServerAddress& address, RefCountedPtr<SubchannelInterface> sc);
absl::optional<grpc_connectivity_state> connectivity_state() const {
return logical_connectivity_state_;
}
RefCountedPtr<AddressWeight> weight() const { return weight_; }
private:
class OobWatcher : public OobBackendMetricWatcher {
class WrrEndpoint : public Endpoint {
public:
OobWatcher(RefCountedPtr<AddressWeight> weight,
float error_utilization_penalty)
: weight_(std::move(weight)),
error_utilization_penalty_(error_utilization_penalty) {}
WrrEndpoint(RefCountedPtr<WrrEndpointList> endpoint_list,
const ServerAddress& address, const ChannelArgs& args,
std::shared_ptr<WorkSerializer> work_serializer)
: Endpoint(std::move(endpoint_list)),
weight_(policy<WeightedRoundRobin>()->GetOrCreateWeight(
address.address())) {
Init(address, args, std::move(work_serializer));
}
void OnBackendMetricReport(
const BackendMetricData& backend_metric_data) override;
RefCountedPtr<EndpointWeight> weight() const { return weight_; }
private:
RefCountedPtr<AddressWeight> weight_;
const float error_utilization_penalty_;
class OobWatcher : public OobBackendMetricWatcher {
public:
OobWatcher(RefCountedPtr<EndpointWeight> weight,
float error_utilization_penalty)
: weight_(std::move(weight)),
error_utilization_penalty_(error_utilization_penalty) {}
void OnBackendMetricReport(
const BackendMetricData& backend_metric_data) override;
private:
RefCountedPtr<EndpointWeight> weight_;
const float error_utilization_penalty_;
};
RefCountedPtr<SubchannelInterface> CreateSubchannel(
ServerAddress address, const ChannelArgs& args) override;
// Called when the child policy reports a connectivity state update.
void OnStateUpdate(absl::optional<grpc_connectivity_state> old_state,
grpc_connectivity_state new_state,
const absl::Status& status) override;
RefCountedPtr<EndpointWeight> weight_;
};
// Performs connectivity state updates that need to be done only
// after we have started watching.
void ProcessConnectivityChangeLocked(
absl::optional<grpc_connectivity_state> old_state,
grpc_connectivity_state new_state) override;
// Updates the logical connectivity state.
void UpdateLogicalConnectivityStateLocked(
grpc_connectivity_state connectivity_state);
// The logical connectivity state of the subchannel.
// Note that the logical connectivity state may differ from the
// actual reported state in some cases (e.g., after we see
// TRANSIENT_FAILURE, we ignore any subsequent state changes until
// we see READY).
absl::optional<grpc_connectivity_state> logical_connectivity_state_;
RefCountedPtr<AddressWeight> weight_;
};
// A list of subchannels.
class WeightedRoundRobinSubchannelList
: public SubchannelList<WeightedRoundRobinSubchannelList,
WeightedRoundRobinSubchannelData> {
public:
WeightedRoundRobinSubchannelList(WeightedRoundRobin* policy,
ServerAddressList addresses,
const ChannelArgs& args)
: SubchannelList(policy,
(GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)
? "WeightedRoundRobinSubchannelList"
: nullptr),
std::move(addresses), policy->channel_control_helper(),
args) {
// Need to maintain a ref to the LB policy as long as we maintain
// any references to subchannels, since the subchannels'
// pollset_sets will include the LB policy's pollset_set.
policy->Ref(DEBUG_LOCATION, "subchannel_list").release();
WrrEndpointList(RefCountedPtr<WeightedRoundRobin> wrr,
const ServerAddressList& addresses, const ChannelArgs& args)
: EndpointList(std::move(wrr),
GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)
? "WrrEndpointList"
: nullptr) {
Init(addresses, args,
[&](RefCountedPtr<WrrEndpointList> endpoint_list,
const ServerAddress& address, const ChannelArgs& args) {
return MakeOrphanable<WrrEndpoint>(
std::move(endpoint_list), address, args,
policy<WeightedRoundRobin>()->work_serializer());
});
}
~WeightedRoundRobinSubchannelList() override {
WeightedRoundRobin* p = static_cast<WeightedRoundRobin*>(policy());
p->Unref(DEBUG_LOCATION, "subchannel_list");
private:
LoadBalancingPolicy::ChannelControlHelper* channel_control_helper()
const override {
return policy<WeightedRoundRobin>()->channel_control_helper();
}
// Updates the counters of subchannels in each state when a
// subchannel transitions from old_state to new_state.
// Updates the counters of children in each state when a
// child transitions from old_state to new_state.
void UpdateStateCountersLocked(
absl::optional<grpc_connectivity_state> old_state,
grpc_connectivity_state new_state);
// Ensures that the right subchannel list is used and then updates
// the aggregated connectivity state based on the subchannel list's
// Ensures that the right child list is used and then updates
// the WRR policy's connectivity state based on the child list's
// state counters.
void MaybeUpdateAggregatedConnectivityStateLocked(
absl::Status status_for_tf);
private:
std::shared_ptr<WorkSerializer> work_serializer() const override {
return static_cast<WeightedRoundRobin*>(policy())->work_serializer();
}
std::string CountersString() const {
return absl::StrCat("num_subchannels=", num_subchannels(),
" num_ready=", num_ready_,
return absl::StrCat("num_children=", size(), " num_ready=", num_ready_,
" num_connecting=", num_connecting_,
" num_transient_failure=", num_transient_failure_);
}
@ -295,7 +270,7 @@ class WeightedRoundRobin : public LoadBalancingPolicy {
class Picker : public SubchannelPicker {
public:
Picker(RefCountedPtr<WeightedRoundRobin> wrr,
WeightedRoundRobinSubchannelList* subchannel_list);
WrrEndpointList* endpoint_list);
~Picker() override;
@ -307,31 +282,34 @@ class WeightedRoundRobin : public LoadBalancingPolicy {
// A call tracker that collects per-call endpoint utilization reports.
class SubchannelCallTracker : public SubchannelCallTrackerInterface {
public:
SubchannelCallTracker(RefCountedPtr<AddressWeight> weight,
float error_utilization_penalty)
SubchannelCallTracker(
RefCountedPtr<EndpointWeight> weight, float error_utilization_penalty,
std::unique_ptr<SubchannelCallTrackerInterface> child_tracker)
: weight_(std::move(weight)),
error_utilization_penalty_(error_utilization_penalty) {}
error_utilization_penalty_(error_utilization_penalty),
child_tracker_(std::move(child_tracker)) {}
void Start() override {}
void Start() override;
void Finish(FinishArgs args) override;
private:
RefCountedPtr<AddressWeight> weight_;
RefCountedPtr<EndpointWeight> weight_;
const float error_utilization_penalty_;
std::unique_ptr<SubchannelCallTrackerInterface> child_tracker_;
};
// Info stored about each subchannel.
struct SubchannelInfo {
SubchannelInfo(RefCountedPtr<SubchannelInterface> subchannel,
RefCountedPtr<AddressWeight> weight)
: subchannel(std::move(subchannel)), weight(std::move(weight)) {}
// Info stored about each endpoint.
struct EndpointInfo {
EndpointInfo(RefCountedPtr<SubchannelPicker> picker,
RefCountedPtr<EndpointWeight> weight)
: picker(std::move(picker)), weight(std::move(weight)) {}
RefCountedPtr<SubchannelInterface> subchannel;
RefCountedPtr<AddressWeight> weight;
RefCountedPtr<SubchannelPicker> picker;
RefCountedPtr<EndpointWeight> weight;
};
// Returns the index into subchannels_ to be picked.
// Returns the index into endpoints_ to be picked.
size_t PickIndex();
// Builds a new scheduler and swaps it into place, then starts a
@ -341,7 +319,7 @@ class WeightedRoundRobin : public LoadBalancingPolicy {
RefCountedPtr<WeightedRoundRobin> wrr_;
RefCountedPtr<WeightedRoundRobinConfig> config_;
std::vector<SubchannelInfo> subchannels_;
std::vector<EndpointInfo> endpoints_;
Mutex scheduler_mu_;
std::shared_ptr<StaticStrideScheduler> scheduler_
@ -359,23 +337,22 @@ class WeightedRoundRobin : public LoadBalancingPolicy {
void ShutdownLocked() override;
RefCountedPtr<AddressWeight> GetOrCreateWeight(
RefCountedPtr<EndpointWeight> GetOrCreateWeight(
const grpc_resolved_address& address);
RefCountedPtr<WeightedRoundRobinConfig> config_;
// List of subchannels.
RefCountedPtr<WeightedRoundRobinSubchannelList> subchannel_list_;
// Latest pending subchannel list.
// When we get an updated address list, we create a new subchannel list
// for it here, and we wait to swap it into subchannel_list_ until the new
// List of endpoints.
OrphanablePtr<WrrEndpointList> endpoint_list_;
// Latest pending endpoint list.
// When we get an updated address list, we create a new endpoint list
// for it here, and we wait to swap it into endpoint_list_ until the new
// list becomes READY.
RefCountedPtr<WeightedRoundRobinSubchannelList>
latest_pending_subchannel_list_;
OrphanablePtr<WrrEndpointList> latest_pending_endpoint_list_;
Mutex address_weight_map_mu_;
std::map<std::string, AddressWeight*, std::less<>> address_weight_map_
ABSL_GUARDED_BY(&address_weight_map_mu_);
Mutex endpoint_weight_map_mu_;
std::map<std::string, EndpointWeight*, std::less<>> endpoint_weight_map_
ABSL_GUARDED_BY(&endpoint_weight_map_mu_);
bool shutdown_ = false;
@ -386,18 +363,18 @@ class WeightedRoundRobin : public LoadBalancingPolicy {
};
//
// WeightedRoundRobin::AddressWeight
// WeightedRoundRobin::EndpointWeight
//
WeightedRoundRobin::AddressWeight::~AddressWeight() {
MutexLock lock(&wrr_->address_weight_map_mu_);
auto it = wrr_->address_weight_map_.find(key_);
if (it != wrr_->address_weight_map_.end() && it->second == this) {
wrr_->address_weight_map_.erase(it);
WeightedRoundRobin::EndpointWeight::~EndpointWeight() {
MutexLock lock(&wrr_->endpoint_weight_map_mu_);
auto it = wrr_->endpoint_weight_map_.find(key_);
if (it != wrr_->endpoint_weight_map_.end() && it->second == this) {
wrr_->endpoint_weight_map_.erase(it);
}
}
void WeightedRoundRobin::AddressWeight::MaybeUpdateWeight(
void WeightedRoundRobin::EndpointWeight::MaybeUpdateWeight(
double qps, double eps, double utilization,
float error_utilization_penalty) {
// Compute weight.
@ -437,7 +414,7 @@ void WeightedRoundRobin::AddressWeight::MaybeUpdateWeight(
last_update_time_ = now;
}
float WeightedRoundRobin::AddressWeight::GetWeight(
float WeightedRoundRobin::EndpointWeight::GetWeight(
Timestamp now, Duration weight_expiration_period,
Duration blackout_period) {
MutexLock lock(&mu_);
@ -468,7 +445,7 @@ float WeightedRoundRobin::AddressWeight::GetWeight(
return weight_;
}
void WeightedRoundRobin::AddressWeight::ResetNonEmptySince() {
void WeightedRoundRobin::EndpointWeight::ResetNonEmptySince() {
MutexLock lock(&mu_);
non_empty_since_ = Timestamp::InfFuture();
}
@ -477,8 +454,13 @@ void WeightedRoundRobin::AddressWeight::ResetNonEmptySince() {
// WeightedRoundRobin::Picker::SubchannelCallTracker
//
void WeightedRoundRobin::Picker::SubchannelCallTracker::Start() {
if (child_tracker_ != nullptr) child_tracker_->Start();
}
void WeightedRoundRobin::Picker::SubchannelCallTracker::Finish(
FinishArgs args) {
if (child_tracker_ != nullptr) child_tracker_->Finish(args);
auto* backend_metric_data =
args.backend_metric_accessor->GetBackendMetricData();
double qps = 0;
@ -499,23 +481,22 @@ void WeightedRoundRobin::Picker::SubchannelCallTracker::Finish(
// WeightedRoundRobin::Picker
//
WeightedRoundRobin::Picker::Picker(
RefCountedPtr<WeightedRoundRobin> wrr,
WeightedRoundRobinSubchannelList* subchannel_list)
WeightedRoundRobin::Picker::Picker(RefCountedPtr<WeightedRoundRobin> wrr,
WrrEndpointList* endpoint_list)
: wrr_(std::move(wrr)),
config_(wrr_->config_),
last_picked_index_(absl::Uniform<size_t>(wrr_->bit_gen_)) {
for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) {
WeightedRoundRobinSubchannelData* sd = subchannel_list->subchannel(i);
if (sd->connectivity_state() == GRPC_CHANNEL_READY) {
subchannels_.emplace_back(sd->subchannel()->Ref(), sd->weight());
for (auto& endpoint : endpoint_list->endpoints()) {
auto* ep = static_cast<WrrEndpointList::WrrEndpoint*>(endpoint.get());
if (ep->connectivity_state() == GRPC_CHANNEL_READY) {
endpoints_.emplace_back(ep->picker(), ep->weight());
}
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) {
gpr_log(GPR_INFO,
"[WRR %p picker %p] created picker from subchannel_list=%p "
"[WRR %p picker %p] created picker from endpoint_list=%p "
"with %" PRIuPTR " subchannels",
wrr_.get(), this, subchannel_list, subchannels_.size());
wrr_.get(), this, endpoint_list, endpoints_.size());
}
BuildSchedulerAndStartTimerLocked();
}
@ -533,26 +514,30 @@ void WeightedRoundRobin::Picker::Orphan() {
}
wrr_->channel_control_helper()->GetEventEngine()->Cancel(*timer_handle_);
timer_handle_.reset();
wrr_.reset();
}
WeightedRoundRobin::PickResult WeightedRoundRobin::Picker::Pick(
PickArgs /*args*/) {
WeightedRoundRobin::PickResult WeightedRoundRobin::Picker::Pick(PickArgs args) {
size_t index = PickIndex();
GPR_ASSERT(index < subchannels_.size());
auto& subchannel_info = subchannels_[index];
// Collect per-call utilization data if needed.
std::unique_ptr<SubchannelCallTrackerInterface> subchannel_call_tracker;
if (!config_->enable_oob_load_report()) {
subchannel_call_tracker = std::make_unique<SubchannelCallTracker>(
subchannel_info.weight, config_->error_utilization_penalty());
}
GPR_ASSERT(index < endpoints_.size());
auto& endpoint_info = endpoints_[index];
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) {
gpr_log(GPR_INFO,
"[WRR %p picker %p] returning index %" PRIuPTR ", subchannel=%p",
wrr_.get(), this, index, subchannel_info.subchannel.get());
"[WRR %p picker %p] returning index %" PRIuPTR ", picker=%p",
wrr_.get(), this, index, endpoint_info.picker.get());
}
return PickResult::Complete(subchannel_info.subchannel,
std::move(subchannel_call_tracker));
auto result = endpoint_info.picker->Pick(args);
// Collect per-call utilization data if needed.
if (!config_->enable_oob_load_report()) {
auto* complete = absl::get_if<PickResult::Complete>(&result.result);
if (complete != nullptr) {
complete->subchannel_call_tracker =
std::make_unique<SubchannelCallTracker>(
endpoint_info.weight, config_->error_utilization_penalty(),
std::move(complete->subchannel_call_tracker));
}
}
return result;
}
size_t WeightedRoundRobin::Picker::PickIndex() {
@ -566,16 +551,16 @@ size_t WeightedRoundRobin::Picker::PickIndex() {
if (scheduler != nullptr) return scheduler->Pick();
// We don't have a scheduler (i.e., either all of the weights are 0 or
// there is only one subchannel), so fall back to RR.
return last_picked_index_.fetch_add(1) % subchannels_.size();
return last_picked_index_.fetch_add(1) % endpoints_.size();
}
void WeightedRoundRobin::Picker::BuildSchedulerAndStartTimerLocked() {
// Build scheduler.
const Timestamp now = Timestamp::Now();
std::vector<float> weights;
weights.reserve(subchannels_.size());
for (const auto& subchannel : subchannels_) {
weights.push_back(subchannel.weight->GetWeight(
weights.reserve(endpoints_.size());
for (const auto& endpoint : endpoints_) {
weights.push_back(endpoint.weight->GetWeight(
now, config_->weight_expiration_period(), config_->blackout_period()));
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) {
@ -601,6 +586,10 @@ void WeightedRoundRobin::Picker::BuildSchedulerAndStartTimerLocked() {
scheduler_ = std::move(scheduler);
}
// Start timer.
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) {
gpr_log(GPR_INFO, "[WRR %p picker %p] scheduling timer for %s", wrr_.get(),
this, config_->weight_update_period().ToString().c_str());
}
WeakRefCountedPtr<Picker> self = WeakRef();
timer_handle_ = wrr_->channel_control_helper()->GetEventEngine()->RunAfter(
config_->weight_update_period(), [self = std::move(self)]() mutable {
@ -636,8 +625,8 @@ WeightedRoundRobin::~WeightedRoundRobin() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) {
gpr_log(GPR_INFO, "[WRR %p] Destroying Round Robin policy", this);
}
GPR_ASSERT(subchannel_list_ == nullptr);
GPR_ASSERT(latest_pending_subchannel_list_ == nullptr);
GPR_ASSERT(endpoint_list_ == nullptr);
GPR_ASSERT(latest_pending_endpoint_list_ == nullptr);
}
void WeightedRoundRobin::ShutdownLocked() {
@ -645,14 +634,14 @@ void WeightedRoundRobin::ShutdownLocked() {
gpr_log(GPR_INFO, "[WRR %p] Shutting down", this);
}
shutdown_ = true;
subchannel_list_.reset();
latest_pending_subchannel_list_.reset();
endpoint_list_.reset();
latest_pending_endpoint_list_.reset();
}
void WeightedRoundRobin::ResetBackoffLocked() {
subchannel_list_->ResetBackoffLocked();
if (latest_pending_subchannel_list_ != nullptr) {
latest_pending_subchannel_list_->ResetBackoffLocked();
endpoint_list_->ResetBackoffLocked();
if (latest_pending_endpoint_list_ != nullptr) {
latest_pending_endpoint_list_->ResetBackoffLocked();
}
}
@ -692,27 +681,28 @@ absl::Status WeightedRoundRobin::UpdateLocked(UpdateArgs args) {
}
// If we already have a subchannel list, then keep using the existing
// list, but still report back that the update was not accepted.
if (subchannel_list_ != nullptr) return args.addresses.status();
if (endpoint_list_ != nullptr) return args.addresses.status();
}
// Create new subchannel list, replacing the previous pending list, if any.
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace) &&
latest_pending_subchannel_list_ != nullptr) {
latest_pending_endpoint_list_ != nullptr) {
gpr_log(GPR_INFO, "[WRR %p] replacing previous pending subchannel list %p",
this, latest_pending_subchannel_list_.get());
this, latest_pending_endpoint_list_.get());
}
latest_pending_subchannel_list_ =
MakeRefCounted<WeightedRoundRobinSubchannelList>(
this, std::move(addresses), args.args);
latest_pending_subchannel_list_->StartWatchingLocked(args.args);
latest_pending_endpoint_list_ =
MakeOrphanable<WrrEndpointList>(Ref(), std::move(addresses), args.args);
// If the new list is empty, immediately promote it to
// subchannel_list_ and report TRANSIENT_FAILURE.
if (latest_pending_subchannel_list_->num_subchannels() == 0) {
// endpoint_list_ and report TRANSIENT_FAILURE.
// TODO(roth): As part of adding dualstack backend support, we need to
// also handle the case where the list of addresses for a given
// endpoint is empty.
if (latest_pending_endpoint_list_->size() == 0) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace) &&
subchannel_list_ != nullptr) {
endpoint_list_ != nullptr) {
gpr_log(GPR_INFO, "[WRR %p] replacing previous subchannel list %p", this,
subchannel_list_.get());
endpoint_list_.get());
}
subchannel_list_ = std::move(latest_pending_subchannel_list_);
endpoint_list_ = std::move(latest_pending_endpoint_list_);
absl::Status status =
args.addresses.ok() ? absl::UnavailableError(absl::StrCat(
"empty address list: ", args.resolution_note))
@ -723,42 +713,117 @@ absl::Status WeightedRoundRobin::UpdateLocked(UpdateArgs args) {
return status;
}
// Otherwise, if this is the initial update, immediately promote it to
// subchannel_list_.
if (subchannel_list_.get() == nullptr) {
subchannel_list_ = std::move(latest_pending_subchannel_list_);
// endpoint_list_.
if (endpoint_list_.get() == nullptr) {
endpoint_list_ = std::move(latest_pending_endpoint_list_);
}
return absl::OkStatus();
}
RefCountedPtr<WeightedRoundRobin::AddressWeight>
RefCountedPtr<WeightedRoundRobin::EndpointWeight>
WeightedRoundRobin::GetOrCreateWeight(const grpc_resolved_address& address) {
auto key = grpc_sockaddr_to_uri(&address);
if (!key.ok()) return nullptr;
MutexLock lock(&address_weight_map_mu_);
auto it = address_weight_map_.find(*key);
if (it != address_weight_map_.end()) {
MutexLock lock(&endpoint_weight_map_mu_);
auto it = endpoint_weight_map_.find(*key);
if (it != endpoint_weight_map_.end()) {
auto weight = it->second->RefIfNonZero();
if (weight != nullptr) return weight;
}
auto weight =
MakeRefCounted<AddressWeight>(Ref(DEBUG_LOCATION, "AddressWeight"), *key);
address_weight_map_.emplace(*key, weight.get());
auto weight = MakeRefCounted<EndpointWeight>(
Ref(DEBUG_LOCATION, "EndpointWeight"), *key);
endpoint_weight_map_.emplace(*key, weight.get());
return weight;
}
//
// WeightedRoundRobin::WeightedRoundRobinSubchannelList
// WeightedRoundRobin::WrrEndpointList::WrrEndpoint::OobWatcher
//
void WeightedRoundRobin::WrrEndpointList::WrrEndpoint::OobWatcher::
OnBackendMetricReport(const BackendMetricData& backend_metric_data) {
double utilization = backend_metric_data.application_utilization;
if (utilization <= 0) {
utilization = backend_metric_data.cpu_utilization;
}
weight_->MaybeUpdateWeight(backend_metric_data.qps, backend_metric_data.eps,
utilization, error_utilization_penalty_);
}
//
// WeightedRoundRobin::WrrEndpointList::WrrEndpoint
//
RefCountedPtr<SubchannelInterface>
WeightedRoundRobin::WrrEndpointList::WrrEndpoint::CreateSubchannel(
ServerAddress address, const ChannelArgs& args) {
auto* wrr = policy<WeightedRoundRobin>();
auto subchannel =
wrr->channel_control_helper()->CreateSubchannel(std::move(address), args);
// Start OOB watch if configured.
if (wrr->config_->enable_oob_load_report()) {
subchannel->AddDataWatcher(MakeOobBackendMetricWatcher(
wrr->config_->oob_reporting_period(),
std::make_unique<OobWatcher>(
weight_, wrr->config_->error_utilization_penalty())));
}
return subchannel;
}
void WeightedRoundRobin::WrrEndpointList::WrrEndpoint::OnStateUpdate(
absl::optional<grpc_connectivity_state> old_state,
grpc_connectivity_state new_state, const absl::Status& status) {
auto* wrr_endpoint_list = endpoint_list<WrrEndpointList>();
auto* wrr = policy<WeightedRoundRobin>();
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) {
gpr_log(GPR_INFO,
"[WRR %p] connectivity changed for child %p, endpoint_list %p "
"(index %" PRIuPTR " of %" PRIuPTR
"): prev_state=%s new_state=%s (%s)",
wrr, this, wrr_endpoint_list, Index(), wrr_endpoint_list->size(),
(old_state.has_value() ? ConnectivityStateName(*old_state) : "N/A"),
ConnectivityStateName(new_state), status.ToString().c_str());
}
if (new_state == GRPC_CHANNEL_IDLE) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) {
gpr_log(GPR_INFO,
"[WRR %p] child %p reported IDLE; requesting connection", wrr,
this);
}
ExitIdleLocked();
} else if (new_state == GRPC_CHANNEL_READY) {
// If we transition back to READY state, restart the blackout period.
// Note that we cannot guarantee that we will never receive
// lingering callbacks for backend metric reports from the previous
// connection after the new connection has been established, but they
// should be masked by new backend metric reports from the new
// connection by the time the blackout period ends.
weight_->ResetNonEmptySince();
}
// If state changed, update state counters.
if (!old_state.has_value() || *old_state != new_state) {
wrr_endpoint_list->UpdateStateCountersLocked(old_state, new_state);
}
// Update the policy state.
wrr_endpoint_list->MaybeUpdateAggregatedConnectivityStateLocked(status);
}
//
// WeightedRoundRobin::WrrEndpointList
//
void WeightedRoundRobin::WeightedRoundRobinSubchannelList::
UpdateStateCountersLocked(absl::optional<grpc_connectivity_state> old_state,
grpc_connectivity_state new_state) {
void WeightedRoundRobin::WrrEndpointList::UpdateStateCountersLocked(
absl::optional<grpc_connectivity_state> old_state,
grpc_connectivity_state new_state) {
// We treat IDLE the same as CONNECTING, since it will immediately
// transition into that state anyway.
if (old_state.has_value()) {
GPR_ASSERT(*old_state != GRPC_CHANNEL_SHUTDOWN);
if (*old_state == GRPC_CHANNEL_READY) {
GPR_ASSERT(num_ready_ > 0);
--num_ready_;
} else if (*old_state == GRPC_CHANNEL_CONNECTING) {
} else if (*old_state == GRPC_CHANNEL_CONNECTING ||
*old_state == GRPC_CHANNEL_IDLE) {
GPR_ASSERT(num_connecting_ > 0);
--num_connecting_;
} else if (*old_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
@ -769,208 +834,79 @@ void WeightedRoundRobin::WeightedRoundRobinSubchannelList::
GPR_ASSERT(new_state != GRPC_CHANNEL_SHUTDOWN);
if (new_state == GRPC_CHANNEL_READY) {
++num_ready_;
} else if (new_state == GRPC_CHANNEL_CONNECTING) {
} else if (new_state == GRPC_CHANNEL_CONNECTING ||
new_state == GRPC_CHANNEL_IDLE) {
++num_connecting_;
} else if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
++num_transient_failure_;
}
}
void WeightedRoundRobin::WeightedRoundRobinSubchannelList::
void WeightedRoundRobin::WrrEndpointList::
MaybeUpdateAggregatedConnectivityStateLocked(absl::Status status_for_tf) {
WeightedRoundRobin* p = static_cast<WeightedRoundRobin*>(policy());
// If this is latest_pending_subchannel_list_, then swap it into
// subchannel_list_ in the following cases:
// - subchannel_list_ has no READY subchannels.
// - This list has at least one READY subchannel and we have seen the
// initial connectivity state notification for all subchannels.
// - All of the subchannels in this list are in TRANSIENT_FAILURE.
auto* wrr = policy<WeightedRoundRobin>();
// If this is latest_pending_endpoint_list_, then swap it into
// endpoint_list_ in the following cases:
// - endpoint_list_ has no READY children.
// - This list has at least one READY child and we have seen the
// initial connectivity state notification for all children.
// - All of the children in this list are in TRANSIENT_FAILURE.
// (This may cause the channel to go from READY to TRANSIENT_FAILURE,
// but we're doing what the control plane told us to do.)
if (p->latest_pending_subchannel_list_.get() == this &&
(p->subchannel_list_->num_ready_ == 0 ||
(num_ready_ > 0 && AllSubchannelsSeenInitialState()) ||
num_transient_failure_ == num_subchannels())) {
if (wrr->latest_pending_endpoint_list_.get() == this &&
(wrr->endpoint_list_->num_ready_ == 0 ||
(num_ready_ > 0 && AllEndpointsSeenInitialState()) ||
num_transient_failure_ == size())) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) {
const std::string old_counters_string =
p->subchannel_list_ != nullptr ? p->subchannel_list_->CountersString()
wrr->endpoint_list_ != nullptr ? wrr->endpoint_list_->CountersString()
: "";
gpr_log(
GPR_INFO,
"[WRR %p] swapping out subchannel list %p (%s) in favor of %p (%s)",
p, p->subchannel_list_.get(), old_counters_string.c_str(), this,
CountersString().c_str());
gpr_log(GPR_INFO,
"[WRR %p] swapping out endpoint list %p (%s) in favor of %p (%s)",
wrr, wrr->endpoint_list_.get(), old_counters_string.c_str(), this,
CountersString().c_str());
}
p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
wrr->endpoint_list_ = std::move(wrr->latest_pending_endpoint_list_);
}
// Only set connectivity state if this is the current subchannel list.
if (p->subchannel_list_.get() != this) return;
// Only set connectivity state if this is the current endpoint list.
if (wrr->endpoint_list_.get() != this) return;
// First matching rule wins:
// 1) ANY subchannel is READY => policy is READY.
// 2) ANY subchannel is CONNECTING => policy is CONNECTING.
// 3) ALL subchannels are TRANSIENT_FAILURE => policy is TRANSIENT_FAILURE.
// 1) ANY child is READY => policy is READY.
// 2) ANY child is CONNECTING => policy is CONNECTING.
// 3) ALL children are TRANSIENT_FAILURE => policy is TRANSIENT_FAILURE.
if (num_ready_ > 0) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) {
gpr_log(GPR_INFO, "[WRR %p] reporting READY with subchannel list %p", p,
gpr_log(GPR_INFO, "[WRR %p] reporting READY with endpoint list %p", wrr,
this);
}
p->channel_control_helper()->UpdateState(
wrr->channel_control_helper()->UpdateState(
GRPC_CHANNEL_READY, absl::Status(),
MakeRefCounted<Picker>(p->Ref(), this));
MakeRefCounted<Picker>(wrr->Ref(), this));
} else if (num_connecting_ > 0) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) {
gpr_log(GPR_INFO, "[WRR %p] reporting CONNECTING with subchannel list %p",
p, this);
gpr_log(GPR_INFO, "[WRR %p] reporting CONNECTING with endpoint list %p",
wrr, this);
}
p->channel_control_helper()->UpdateState(
wrr->channel_control_helper()->UpdateState(
GRPC_CHANNEL_CONNECTING, absl::Status(),
MakeRefCounted<QueuePicker>(p->Ref(DEBUG_LOCATION, "QueuePicker")));
} else if (num_transient_failure_ == num_subchannels()) {
MakeRefCounted<QueuePicker>(nullptr));
} else if (num_transient_failure_ == size()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) {
gpr_log(
GPR_INFO,
"[WRR %p] reporting TRANSIENT_FAILURE with subchannel list %p: %s", p,
this, status_for_tf.ToString().c_str());
gpr_log(GPR_INFO,
"[WRR %p] reporting TRANSIENT_FAILURE with endpoint list %p: %s",
wrr, this, status_for_tf.ToString().c_str());
}
if (!status_for_tf.ok()) {
last_failure_ = absl::UnavailableError(
absl::StrCat("connections to all backends failing; last error: ",
status_for_tf.ToString()));
}
p->channel_control_helper()->UpdateState(
wrr->channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, last_failure_,
MakeRefCounted<TransientFailurePicker>(last_failure_));
}
}
//
// WeightedRoundRobin::WeightedRoundRobinSubchannelData::OobWatcher
//
void WeightedRoundRobin::WeightedRoundRobinSubchannelData::OobWatcher::
OnBackendMetricReport(const BackendMetricData& backend_metric_data) {
double utilization = backend_metric_data.application_utilization;
if (utilization <= 0) {
utilization = backend_metric_data.cpu_utilization;
}
weight_->MaybeUpdateWeight(backend_metric_data.qps, backend_metric_data.eps,
utilization, error_utilization_penalty_);
}
//
// WeightedRoundRobin::WeightedRoundRobinSubchannelData
//
WeightedRoundRobin::WeightedRoundRobinSubchannelData::
WeightedRoundRobinSubchannelData(
SubchannelList<WeightedRoundRobinSubchannelList,
WeightedRoundRobinSubchannelData>* subchannel_list,
const ServerAddress& address, RefCountedPtr<SubchannelInterface> sc)
: SubchannelData(subchannel_list, address, std::move(sc)),
weight_(static_cast<WeightedRoundRobin*>(subchannel_list->policy())
->GetOrCreateWeight(address.address())) {
// Start OOB watch if configured.
WeightedRoundRobin* p =
static_cast<WeightedRoundRobin*>(subchannel_list->policy());
if (p->config_->enable_oob_load_report()) {
subchannel()->AddDataWatcher(MakeOobBackendMetricWatcher(
p->config_->oob_reporting_period(),
std::make_unique<OobWatcher>(weight_,
p->config_->error_utilization_penalty())));
}
}
void WeightedRoundRobin::WeightedRoundRobinSubchannelData::
ProcessConnectivityChangeLocked(
absl::optional<grpc_connectivity_state> old_state,
grpc_connectivity_state new_state) {
WeightedRoundRobin* p =
static_cast<WeightedRoundRobin*>(subchannel_list()->policy());
GPR_ASSERT(subchannel() != nullptr);
// If this is not the initial state notification and the new state is
// TRANSIENT_FAILURE or IDLE, re-resolve.
// Note that we don't want to do this on the initial state notification,
// because that would result in an endless loop of re-resolution.
if (old_state.has_value() && (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE ||
new_state == GRPC_CHANNEL_IDLE)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) {
gpr_log(GPR_INFO,
"[WRR %p] Subchannel %p reported %s; requesting re-resolution", p,
subchannel(), ConnectivityStateName(new_state));
}
p->channel_control_helper()->RequestReresolution();
}
if (new_state == GRPC_CHANNEL_IDLE) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) {
gpr_log(GPR_INFO,
"[WRR %p] Subchannel %p reported IDLE; requesting connection", p,
subchannel());
}
subchannel()->RequestConnection();
} else if (new_state == GRPC_CHANNEL_READY) {
// If we transition back to READY state, restart the blackout period.
// Note that we cannot guarantee that we will never receive
// lingering callbacks for backend metric reports from the previous
// connection after the new connection has been established, but they
// should be masked by new backend metric reports from the new
// connection by the time the blackout period ends.
weight_->ResetNonEmptySince();
}
// Update logical connectivity state.
UpdateLogicalConnectivityStateLocked(new_state);
// Update the policy state.
subchannel_list()->MaybeUpdateAggregatedConnectivityStateLocked(
connectivity_status());
}
void WeightedRoundRobin::WeightedRoundRobinSubchannelData::
UpdateLogicalConnectivityStateLocked(
grpc_connectivity_state connectivity_state) {
WeightedRoundRobin* p =
static_cast<WeightedRoundRobin*>(subchannel_list()->policy());
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) {
gpr_log(
GPR_INFO,
"[WRR %p] connectivity changed for subchannel %p, subchannel_list %p "
"(index %" PRIuPTR " of %" PRIuPTR "): prev_state=%s new_state=%s",
p, subchannel(), subchannel_list(), Index(),
subchannel_list()->num_subchannels(),
(logical_connectivity_state_.has_value()
? ConnectivityStateName(*logical_connectivity_state_)
: "N/A"),
ConnectivityStateName(connectivity_state));
}
// Decide what state to report for aggregation purposes.
// If the last logical state was TRANSIENT_FAILURE, then ignore the
// state change unless the new state is READY.
if (logical_connectivity_state_.has_value() &&
*logical_connectivity_state_ == GRPC_CHANNEL_TRANSIENT_FAILURE &&
connectivity_state != GRPC_CHANNEL_READY) {
return;
}
// If the new state is IDLE, treat it as CONNECTING, since it will
// immediately transition into CONNECTING anyway.
if (connectivity_state == GRPC_CHANNEL_IDLE) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) {
gpr_log(GPR_INFO,
"[WRR %p] subchannel %p, subchannel_list %p (index %" PRIuPTR
" of %" PRIuPTR "): treating IDLE as CONNECTING",
p, subchannel(), subchannel_list(), Index(),
subchannel_list()->num_subchannels());
}
connectivity_state = GRPC_CHANNEL_CONNECTING;
}
// If no change, return false.
if (logical_connectivity_state_.has_value() &&
*logical_connectivity_state_ == connectivity_state) {
return;
}
// Otherwise, update counters and logical state.
subchannel_list()->UpdateStateCountersLocked(logical_connectivity_state_,
connectivity_state);
logical_connectivity_state_ = connectivity_state;
}
//
// factory
//

@ -127,8 +127,6 @@ class WeightedRoundRobinTest : public TimeAwareLoadBalancingPolicyTest {
EXPECT_EQ(ApplyUpdate(BuildUpdate(update_addresses, config_builder.Build()),
lb_policy_.get()),
absl::OkStatus());
// Expect the initial CONNECTNG update with a picker that queues.
ExpectConnectingUpdate(location);
// RR should have created a subchannel for each address.
for (size_t i = 0; i < addresses.size(); ++i) {
auto* subchannel = FindSubchannel(addresses[i]);
@ -142,6 +140,8 @@ class WeightedRoundRobinTest : public TimeAwareLoadBalancingPolicyTest {
<< location.line();
// The subchannel will connect successfully.
subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
// Expect the initial CONNECTNG update with a picker that queues.
if (i == 0) ExpectConnectingUpdate(location);
subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
}
return WaitForConnected(location);

Loading…
Cancel
Save