|
|
|
@ -18,6 +18,7 @@ |
|
|
|
|
|
|
|
|
|
#include "src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.h" |
|
|
|
|
|
|
|
|
|
#include <inttypes.h> |
|
|
|
|
#include <stddef.h> |
|
|
|
|
|
|
|
|
|
#include <algorithm> |
|
|
|
@ -282,7 +283,7 @@ class OutlierDetectionLb : public LoadBalancingPolicy { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void MaybeUneject(uint64_t base_ejection_time_in_millis, |
|
|
|
|
bool MaybeUneject(uint64_t base_ejection_time_in_millis, |
|
|
|
|
uint64_t max_ejection_time_in_millis) { |
|
|
|
|
if (!ejection_time_.has_value()) { |
|
|
|
|
if (multiplier_ > 0) { |
|
|
|
@ -297,8 +298,10 @@ class OutlierDetectionLb : public LoadBalancingPolicy { |
|
|
|
|
max_ejection_time_in_millis))); |
|
|
|
|
if (change_time < ExecCtx::Get()->Now()) { |
|
|
|
|
Uneject(); |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
@ -596,9 +599,17 @@ void OutlierDetectionLb::UpdateLocked(UpdateArgs args) { |
|
|
|
|
// Update outlier detection timer.
|
|
|
|
|
if (!config_->CountingEnabled()) { |
|
|
|
|
// No need for timer. Cancel the current timer, if any.
|
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[outlier_detection_lb %p] counting disabled, cancelling timer", |
|
|
|
|
this); |
|
|
|
|
} |
|
|
|
|
ejection_timer_.reset(); |
|
|
|
|
} else if (ejection_timer_ == nullptr) { |
|
|
|
|
// No timer running. Start it now.
|
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "[outlier_detection_lb %p] starting timer", this); |
|
|
|
|
} |
|
|
|
|
ejection_timer_ = |
|
|
|
|
MakeOrphanable<EjectionTimer>(Ref(), ExecCtx::Get()->Now()); |
|
|
|
|
for (const auto& p : subchannel_state_map_) { |
|
|
|
@ -610,13 +621,15 @@ void OutlierDetectionLb::UpdateLocked(UpdateArgs args) { |
|
|
|
|
// with the same start time.
|
|
|
|
|
// Note that if the new deadline is in the past, the timer will fire
|
|
|
|
|
// immediately.
|
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[outlier_detection_lb %p] interval changed, replacing timer", |
|
|
|
|
this); |
|
|
|
|
} |
|
|
|
|
ejection_timer_ = |
|
|
|
|
MakeOrphanable<EjectionTimer>(Ref(), ejection_timer_->StartTime()); |
|
|
|
|
} |
|
|
|
|
// Create policy if needed.
|
|
|
|
|
if (child_policy_ == nullptr) { |
|
|
|
|
child_policy_ = CreateChildPolicyLocked(args.args); |
|
|
|
|
} |
|
|
|
|
// Update subchannel state map.
|
|
|
|
|
if (args.addresses.ok()) { |
|
|
|
|
std::set<std::string> current_addresses; |
|
|
|
|
for (const ServerAddress& address : *args.addresses) { |
|
|
|
@ -624,6 +637,11 @@ void OutlierDetectionLb::UpdateLocked(UpdateArgs args) { |
|
|
|
|
auto& subchannel_state = subchannel_state_map_[address_key]; |
|
|
|
|
if (subchannel_state == nullptr) { |
|
|
|
|
subchannel_state = MakeRefCounted<SubchannelState>(); |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[outlier_detection_lb %p] adding map entry for %s (%p)", |
|
|
|
|
this, address_key.c_str(), subchannel_state.get()); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
current_addresses.emplace(address_key); |
|
|
|
|
} |
|
|
|
@ -632,18 +650,26 @@ void OutlierDetectionLb::UpdateLocked(UpdateArgs args) { |
|
|
|
|
if (current_addresses.find(it->first) == current_addresses.end()) { |
|
|
|
|
// remove each map entry for a subchannel address not in the updated
|
|
|
|
|
// address list.
|
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[outlier_detection_lb %p] removing map entry for %s (%p)", |
|
|
|
|
this, it->first.c_str(), it->second.get()); |
|
|
|
|
} |
|
|
|
|
it = subchannel_state_map_.erase(it); |
|
|
|
|
} else { |
|
|
|
|
++it; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// Construct update args.
|
|
|
|
|
// Create child policy if needed.
|
|
|
|
|
if (child_policy_ == nullptr) { |
|
|
|
|
child_policy_ = CreateChildPolicyLocked(args.args); |
|
|
|
|
} |
|
|
|
|
// Update child policy.
|
|
|
|
|
UpdateArgs update_args; |
|
|
|
|
update_args.addresses = std::move(args.addresses); |
|
|
|
|
update_args.config = config_->child_policy(); |
|
|
|
|
update_args.args = grpc_channel_args_copy(args.args); |
|
|
|
|
// Update the policy.
|
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[outlier_detection_lb %p] Updating child policy handler %p", this, |
|
|
|
@ -721,8 +747,7 @@ void OutlierDetectionLb::Helper::UpdateState( |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[outlier_detection_lb %p] child connectivity state update: " |
|
|
|
|
"state=%s (%s) " |
|
|
|
|
"picker=%p", |
|
|
|
|
"state=%s (%s) picker=%p", |
|
|
|
|
outlier_detection_policy_.get(), ConnectivityStateName(state), |
|
|
|
|
status.ToString().c_str(), picker.get()); |
|
|
|
|
} |
|
|
|
@ -758,12 +783,14 @@ void OutlierDetectionLb::Helper::AddTraceEvent(TraceSeverity severity, |
|
|
|
|
OutlierDetectionLb::EjectionTimer::EjectionTimer( |
|
|
|
|
RefCountedPtr<OutlierDetectionLb> parent, Timestamp start_time) |
|
|
|
|
: parent_(std::move(parent)), start_time_(start_time) { |
|
|
|
|
auto interval = parent_->config_->outlier_detection_config().interval; |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "[outlier_detection_lb %p] ejection timer will run in %s", |
|
|
|
|
parent_.get(), interval.ToString().c_str()); |
|
|
|
|
} |
|
|
|
|
GRPC_CLOSURE_INIT(&on_timer_, OnTimer, this, nullptr); |
|
|
|
|
Ref().release(); |
|
|
|
|
grpc_timer_init( |
|
|
|
|
&timer_, |
|
|
|
|
start_time_ + parent_->config_->outlier_detection_config().interval, |
|
|
|
|
&on_timer_); |
|
|
|
|
grpc_timer_init(&timer_, start_time_ + interval, &on_timer_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void OutlierDetectionLb::EjectionTimer::Orphan() { |
|
|
|
@ -784,6 +811,10 @@ void OutlierDetectionLb::EjectionTimer::OnTimer(void* arg, |
|
|
|
|
|
|
|
|
|
void OutlierDetectionLb::EjectionTimer::OnTimerLocked(grpc_error_handle error) { |
|
|
|
|
if (GRPC_ERROR_IS_NONE(error) && timer_pending_) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "[outlier_detection_lb %p] ejection timer running", |
|
|
|
|
parent_.get()); |
|
|
|
|
} |
|
|
|
|
std::map<SubchannelState*, double> success_rate_ejection_candidates; |
|
|
|
|
std::map<SubchannelState*, double> failure_percentage_ejection_candidates; |
|
|
|
|
size_t ejected_host_count = 0; |
|
|
|
@ -821,36 +852,71 @@ void OutlierDetectionLb::EjectionTimer::OnTimerLocked(grpc_error_handle error) { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[outlier_detection_lb %p] found %" PRIuPTR |
|
|
|
|
" success rate candidates and %" PRIuPTR |
|
|
|
|
" failure percentage candidates; ejected_host_count=%" PRIuPTR |
|
|
|
|
"; success_rate_sum=%.3f", |
|
|
|
|
parent_.get(), success_rate_ejection_candidates.size(), |
|
|
|
|
failure_percentage_ejection_candidates.size(), ejected_host_count, |
|
|
|
|
success_rate_sum); |
|
|
|
|
} |
|
|
|
|
// success rate algorithm
|
|
|
|
|
if (!success_rate_ejection_candidates.empty() && |
|
|
|
|
success_rate_ejection_candidates.size() >= |
|
|
|
|
config.success_rate_ejection->minimum_hosts) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[outlier_detection_lb %p] running success rate algorithm", |
|
|
|
|
parent_.get()); |
|
|
|
|
} |
|
|
|
|
// calculate ejection threshold: (mean - stdev *
|
|
|
|
|
// (success_rate_ejection.stdev_factor / 1000))
|
|
|
|
|
double mean = success_rate_sum / success_rate_ejection_candidates.size(); |
|
|
|
|
double variance = 0; |
|
|
|
|
std::for_each(success_rate_ejection_candidates.begin(), |
|
|
|
|
success_rate_ejection_candidates.end(), |
|
|
|
|
[&variance, mean](std::pair<SubchannelState*, double> v) { |
|
|
|
|
variance += std::pow(v.second - mean, 2); |
|
|
|
|
}); |
|
|
|
|
for (const auto& p : success_rate_ejection_candidates) { |
|
|
|
|
variance += std::pow(p.second - mean, 2); |
|
|
|
|
} |
|
|
|
|
variance /= success_rate_ejection_candidates.size(); |
|
|
|
|
double stdev = std::sqrt(variance); |
|
|
|
|
const double success_rate_stdev_factor = |
|
|
|
|
static_cast<double>(config.success_rate_ejection->stdev_factor) / |
|
|
|
|
1000; |
|
|
|
|
double ejection_threshold = mean - stdev * success_rate_stdev_factor; |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[outlier_detection_lb %p] stdev=%.3f, ejection_threshold=%.3f", |
|
|
|
|
parent_.get(), stdev, ejection_threshold); |
|
|
|
|
} |
|
|
|
|
for (auto& candidate : success_rate_ejection_candidates) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[outlier_detection_lb %p] checking candidate %p: " |
|
|
|
|
"success_rate=%.3f", |
|
|
|
|
parent_.get(), candidate.first, candidate.second); |
|
|
|
|
} |
|
|
|
|
if (candidate.second < ejection_threshold) { |
|
|
|
|
uint32_t random_key = absl::Uniform(bit_gen_, 1, 100); |
|
|
|
|
double current_percent = 100.0 * ejected_host_count / |
|
|
|
|
parent_->subchannel_state_map_.size(); |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[outlier_detection_lb %p] random_key=%d " |
|
|
|
|
"ejected_host_count=%" PRIuPTR " current_percent=%.3f", |
|
|
|
|
parent_.get(), random_key, ejected_host_count, |
|
|
|
|
current_percent); |
|
|
|
|
} |
|
|
|
|
if (random_key < |
|
|
|
|
config.success_rate_ejection->enforcement_percentage && |
|
|
|
|
(ejected_host_count == 0 || |
|
|
|
|
(current_percent < config.max_ejection_percent))) { |
|
|
|
|
// Eject and record the timestamp for use when ejecting addresses in
|
|
|
|
|
// this iteration.
|
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "[outlier_detection_lb %p] ejecting candidate", |
|
|
|
|
parent_.get()); |
|
|
|
|
} |
|
|
|
|
candidate.first->Eject(time_now); |
|
|
|
|
++ejected_host_count; |
|
|
|
|
} |
|
|
|
@ -861,7 +927,19 @@ void OutlierDetectionLb::EjectionTimer::OnTimerLocked(grpc_error_handle error) { |
|
|
|
|
if (!failure_percentage_ejection_candidates.empty() && |
|
|
|
|
failure_percentage_ejection_candidates.size() >= |
|
|
|
|
config.failure_percentage_ejection->minimum_hosts) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { |
|
|
|
|
gpr_log( |
|
|
|
|
GPR_INFO, |
|
|
|
|
"[outlier_detection_lb %p] running failure percentage algorithm", |
|
|
|
|
parent_.get()); |
|
|
|
|
} |
|
|
|
|
for (auto& candidate : failure_percentage_ejection_candidates) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[outlier_detection_lb %p] checking candidate %p: " |
|
|
|
|
"success_rate=%.3f", |
|
|
|
|
parent_.get(), candidate.first, candidate.second); |
|
|
|
|
} |
|
|
|
|
// Extra check to make sure success rate algorithm didn't already
|
|
|
|
|
// eject this backend.
|
|
|
|
|
if (candidate.first->ejection_time().has_value()) continue; |
|
|
|
@ -870,12 +948,23 @@ void OutlierDetectionLb::EjectionTimer::OnTimerLocked(grpc_error_handle error) { |
|
|
|
|
uint32_t random_key = absl::Uniform(bit_gen_, 1, 100); |
|
|
|
|
double current_percent = 100.0 * ejected_host_count / |
|
|
|
|
parent_->subchannel_state_map_.size(); |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[outlier_detection_lb %p] random_key=%d " |
|
|
|
|
"ejected_host_count=%" PRIuPTR " current_percent=%.3f", |
|
|
|
|
parent_.get(), random_key, ejected_host_count, |
|
|
|
|
current_percent); |
|
|
|
|
} |
|
|
|
|
if (random_key < |
|
|
|
|
config.failure_percentage_ejection->enforcement_percentage && |
|
|
|
|
(ejected_host_count == 0 || |
|
|
|
|
(current_percent < config.max_ejection_percent))) { |
|
|
|
|
// Eject and record the timestamp for use when ejecting addresses in
|
|
|
|
|
// this iteration.
|
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "[outlier_detection_lb %p] ejecting candidate", |
|
|
|
|
parent_.get()); |
|
|
|
|
} |
|
|
|
|
candidate.first->Eject(time_now); |
|
|
|
|
++ejected_host_count; |
|
|
|
|
} |
|
|
|
@ -890,8 +979,14 @@ void OutlierDetectionLb::EjectionTimer::OnTimerLocked(grpc_error_handle error) { |
|
|
|
|
// address.
|
|
|
|
|
for (auto& state : parent_->subchannel_state_map_) { |
|
|
|
|
auto* subchannel_state = state.second.get(); |
|
|
|
|
subchannel_state->MaybeUneject(config.base_ejection_time.millis(), |
|
|
|
|
config.max_ejection_time.millis()); |
|
|
|
|
const bool unejected = |
|
|
|
|
subchannel_state->MaybeUneject(config.base_ejection_time.millis(), |
|
|
|
|
config.max_ejection_time.millis()); |
|
|
|
|
if (unejected && |
|
|
|
|
GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "[outlier_detection_lb %p] unejected address %s (%p)", |
|
|
|
|
parent_.get(), state.first.c_str(), subchannel_state); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
timer_pending_ = false; |
|
|
|
|
parent_->ejection_timer_ = |
|
|
|
|