|
|
@ -340,11 +340,7 @@ class WeightedRoundRobin : public LoadBalancingPolicy { |
|
|
|
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&timer_mu_); |
|
|
|
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&timer_mu_); |
|
|
|
|
|
|
|
|
|
|
|
RefCountedPtr<WeightedRoundRobin> wrr_; |
|
|
|
RefCountedPtr<WeightedRoundRobin> wrr_; |
|
|
|
const bool use_per_rpc_utilization_; |
|
|
|
RefCountedPtr<WeightedRoundRobinConfig> config_; |
|
|
|
const Duration weight_update_period_; |
|
|
|
|
|
|
|
const Duration weight_expiration_period_; |
|
|
|
|
|
|
|
const Duration blackout_period_; |
|
|
|
|
|
|
|
const float error_utilization_penalty_; |
|
|
|
|
|
|
|
std::vector<SubchannelInfo> subchannels_; |
|
|
|
std::vector<SubchannelInfo> subchannels_; |
|
|
|
|
|
|
|
|
|
|
|
Mutex scheduler_mu_; |
|
|
|
Mutex scheduler_mu_; |
|
|
@ -505,11 +501,7 @@ WeightedRoundRobin::Picker::Picker( |
|
|
|
RefCountedPtr<WeightedRoundRobin> wrr, |
|
|
|
RefCountedPtr<WeightedRoundRobin> wrr, |
|
|
|
WeightedRoundRobinSubchannelList* subchannel_list) |
|
|
|
WeightedRoundRobinSubchannelList* subchannel_list) |
|
|
|
: wrr_(std::move(wrr)), |
|
|
|
: wrr_(std::move(wrr)), |
|
|
|
use_per_rpc_utilization_(!wrr_->config_->enable_oob_load_report()), |
|
|
|
config_(wrr_->config_), |
|
|
|
weight_update_period_(wrr_->config_->weight_update_period()), |
|
|
|
|
|
|
|
weight_expiration_period_(wrr_->config_->weight_expiration_period()), |
|
|
|
|
|
|
|
blackout_period_(wrr_->config_->blackout_period()), |
|
|
|
|
|
|
|
error_utilization_penalty_(wrr_->config_->error_utilization_penalty()), |
|
|
|
|
|
|
|
last_picked_index_(absl::Uniform<size_t>(wrr_->bit_gen_)) { |
|
|
|
last_picked_index_(absl::Uniform<size_t>(wrr_->bit_gen_)) { |
|
|
|
for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) { |
|
|
|
for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) { |
|
|
|
WeightedRoundRobinSubchannelData* sd = subchannel_list->subchannel(i); |
|
|
|
WeightedRoundRobinSubchannelData* sd = subchannel_list->subchannel(i); |
|
|
@ -548,9 +540,9 @@ WeightedRoundRobin::PickResult WeightedRoundRobin::Picker::Pick( |
|
|
|
auto& subchannel_info = subchannels_[index]; |
|
|
|
auto& subchannel_info = subchannels_[index]; |
|
|
|
// Collect per-call utilization data if needed.
|
|
|
|
// Collect per-call utilization data if needed.
|
|
|
|
std::unique_ptr<SubchannelCallTrackerInterface> subchannel_call_tracker; |
|
|
|
std::unique_ptr<SubchannelCallTrackerInterface> subchannel_call_tracker; |
|
|
|
if (use_per_rpc_utilization_) { |
|
|
|
if (!config_->enable_oob_load_report()) { |
|
|
|
subchannel_call_tracker = std::make_unique<SubchannelCallTracker>( |
|
|
|
subchannel_call_tracker = std::make_unique<SubchannelCallTracker>( |
|
|
|
subchannel_info.weight, error_utilization_penalty_); |
|
|
|
subchannel_info.weight, config_->error_utilization_penalty()); |
|
|
|
} |
|
|
|
} |
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) { |
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) { |
|
|
|
gpr_log(GPR_INFO, |
|
|
|
gpr_log(GPR_INFO, |
|
|
@ -582,7 +574,7 @@ void WeightedRoundRobin::Picker::BuildSchedulerAndStartTimerLocked() { |
|
|
|
weights.reserve(subchannels_.size()); |
|
|
|
weights.reserve(subchannels_.size()); |
|
|
|
for (const auto& subchannel : subchannels_) { |
|
|
|
for (const auto& subchannel : subchannels_) { |
|
|
|
weights.push_back(subchannel.weight->GetWeight( |
|
|
|
weights.push_back(subchannel.weight->GetWeight( |
|
|
|
now, weight_expiration_period_, blackout_period_)); |
|
|
|
now, config_->weight_expiration_period(), config_->blackout_period())); |
|
|
|
} |
|
|
|
} |
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) { |
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) { |
|
|
|
gpr_log(GPR_INFO, "[WRR %p picker %p] new weights: %s", wrr_.get(), this, |
|
|
|
gpr_log(GPR_INFO, "[WRR %p picker %p] new weights: %s", wrr_.get(), this, |
|
|
@ -609,7 +601,7 @@ void WeightedRoundRobin::Picker::BuildSchedulerAndStartTimerLocked() { |
|
|
|
// Start timer.
|
|
|
|
// Start timer.
|
|
|
|
WeakRefCountedPtr<Picker> self = WeakRef(); |
|
|
|
WeakRefCountedPtr<Picker> self = WeakRef(); |
|
|
|
timer_handle_ = wrr_->channel_control_helper()->GetEventEngine()->RunAfter( |
|
|
|
timer_handle_ = wrr_->channel_control_helper()->GetEventEngine()->RunAfter( |
|
|
|
weight_update_period_, [self = std::move(self)]() mutable { |
|
|
|
config_->weight_update_period(), [self = std::move(self)]() mutable { |
|
|
|
ApplicationCallbackExecCtx callback_exec_ctx; |
|
|
|
ApplicationCallbackExecCtx callback_exec_ctx; |
|
|
|
ExecCtx exec_ctx; |
|
|
|
ExecCtx exec_ctx; |
|
|
|
{ |
|
|
|
{ |
|
|
|