diff --git a/src/core/BUILD b/src/core/BUILD index 51fa539a5ea..03afb1b5796 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -4245,8 +4245,6 @@ grpc_cc_library( language = "c++", deps = [ "channel_args", - "closure", - "error", "grpc_outlier_detection_header", "iomgr_fwd", "json", @@ -4259,11 +4257,11 @@ grpc_cc_library( "validation_errors", "//:config", "//:debug_location", + "//:exec_ctx", "//:gpr", "//:grpc_base", "//:grpc_client_channel", "//:grpc_trace", - "//:iomgr_timer", "//:orphanable", "//:ref_counted_ptr", "//:server_address", diff --git a/src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc b/src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc index 01f888d7efa..06e5dd5276b 100644 --- a/src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc +++ b/src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc @@ -52,11 +52,9 @@ #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/validation_errors.h" #include "src/core/lib/gprpp/work_serializer.h" -#include "src/core/lib/iomgr/closure.h" -#include "src/core/lib/iomgr/error.h" +#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/iomgr_fwd.h" #include "src/core/lib/iomgr/pollset_set.h" -#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/json/json.h" #include "src/core/lib/load_balancing/lb_policy.h" #include "src/core/lib/load_balancing/lb_policy_factory.h" @@ -71,6 +69,8 @@ TraceFlag grpc_outlier_detection_lb_trace(false, "outlier_detection_lb"); namespace { +using ::grpc_event_engine::experimental::EventEngine; + constexpr absl::string_view kOutlierDetection = "outlier_detection_experimental"; @@ -352,13 +352,10 @@ class OutlierDetectionLb : public LoadBalancingPolicy { Timestamp StartTime() const { return start_time_; } private: - static void OnTimer(void* arg, grpc_error_handle error); - void OnTimerLocked(grpc_error_handle); + void OnTimerLocked(); RefCountedPtr parent_; - grpc_timer timer_; - grpc_closure on_timer_; - bool timer_pending_ = true; + absl::optional timer_handle_; Timestamp start_time_; absl::BitGen bit_gen_; }; @@ -782,210 +779,201 @@ OutlierDetectionLb::EjectionTimer::EjectionTimer( gpr_log(GPR_INFO, "[outlier_detection_lb %p] ejection timer will run in %s", parent_.get(), interval.ToString().c_str()); } - GRPC_CLOSURE_INIT(&on_timer_, OnTimer, this, nullptr); - Ref().release(); - grpc_timer_init(&timer_, start_time_ + interval, &on_timer_); + timer_handle_ = parent_->channel_control_helper()->GetEventEngine()->RunAfter( + interval, [self = Ref(DEBUG_LOCATION, "EjectionTimer")]() mutable { + ApplicationCallbackExecCtx callback_exec_ctx; + ExecCtx exec_ctx; + auto self_ptr = self.get(); + self_ptr->parent_->work_serializer()->Run( + [self = std::move(self)]() { self->OnTimerLocked(); }, + DEBUG_LOCATION); + }); } void OutlierDetectionLb::EjectionTimer::Orphan() { - if (timer_pending_) { - timer_pending_ = false; - grpc_timer_cancel(&timer_); + if (timer_handle_.has_value()) { + parent_->channel_control_helper()->GetEventEngine()->Cancel(*timer_handle_); + timer_handle_.reset(); } Unref(); } -void OutlierDetectionLb::EjectionTimer::OnTimer(void* arg, - grpc_error_handle error) { - auto* self = static_cast(arg); - self->parent_->work_serializer()->Run( - [self, error]() { self->OnTimerLocked(error); }, DEBUG_LOCATION); -} - -void OutlierDetectionLb::EjectionTimer::OnTimerLocked(grpc_error_handle error) { - if (error.ok() && timer_pending_) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { - gpr_log(GPR_INFO, "[outlier_detection_lb %p] ejection timer running", - parent_.get()); +void OutlierDetectionLb::EjectionTimer::OnTimerLocked() { + if (!timer_handle_.has_value()) return; + timer_handle_.reset(); + if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { + gpr_log(GPR_INFO, "[outlier_detection_lb %p] ejection timer running", + parent_.get()); + } + std::map success_rate_ejection_candidates; + std::map failure_percentage_ejection_candidates; + size_t ejected_host_count = 0; + double success_rate_sum = 0; + auto time_now = Timestamp::Now(); + auto& config = parent_->config_->outlier_detection_config(); + for (auto& state : parent_->subchannel_state_map_) { + auto* subchannel_state = state.second.get(); + // For each address, swap the call counter's buckets in that address's + // map entry. + subchannel_state->RotateBucket(); + // Gather data to run success rate algorithm or failure percentage + // algorithm. + if (subchannel_state->ejection_time().has_value()) { + ++ejected_host_count; } - std::map success_rate_ejection_candidates; - std::map failure_percentage_ejection_candidates; - size_t ejected_host_count = 0; - double success_rate_sum = 0; - auto time_now = Timestamp::Now(); - auto& config = parent_->config_->outlier_detection_config(); - for (auto& state : parent_->subchannel_state_map_) { - auto* subchannel_state = state.second.get(); - // For each address, swap the call counter's buckets in that address's - // map entry. - subchannel_state->RotateBucket(); - // Gather data to run success rate algorithm or failure percentage - // algorithm. - if (subchannel_state->ejection_time().has_value()) { - ++ejected_host_count; - } - absl::optional> host_success_rate_and_volume = - subchannel_state->GetSuccessRateAndVolume(); - if (!host_success_rate_and_volume.has_value()) { - continue; - } - double success_rate = host_success_rate_and_volume->first; - uint64_t request_volume = host_success_rate_and_volume->second; - if (config.success_rate_ejection.has_value()) { - if (request_volume >= config.success_rate_ejection->request_volume) { - success_rate_ejection_candidates[subchannel_state] = success_rate; - success_rate_sum += success_rate; - } + absl::optional> host_success_rate_and_volume = + subchannel_state->GetSuccessRateAndVolume(); + if (!host_success_rate_and_volume.has_value()) { + continue; + } + double success_rate = host_success_rate_and_volume->first; + uint64_t request_volume = host_success_rate_and_volume->second; + if (config.success_rate_ejection.has_value()) { + if (request_volume >= config.success_rate_ejection->request_volume) { + success_rate_ejection_candidates[subchannel_state] = success_rate; + success_rate_sum += success_rate; } - if (config.failure_percentage_ejection.has_value()) { - if (request_volume >= - config.failure_percentage_ejection->request_volume) { - failure_percentage_ejection_candidates[subchannel_state] = - success_rate; - } + } + if (config.failure_percentage_ejection.has_value()) { + if (request_volume >= + config.failure_percentage_ejection->request_volume) { + failure_percentage_ejection_candidates[subchannel_state] = success_rate; } } + } + if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { + gpr_log(GPR_INFO, + "[outlier_detection_lb %p] found %" PRIuPTR + " success rate candidates and %" PRIuPTR + " failure percentage candidates; ejected_host_count=%" PRIuPTR + "; success_rate_sum=%.3f", + parent_.get(), success_rate_ejection_candidates.size(), + failure_percentage_ejection_candidates.size(), ejected_host_count, + success_rate_sum); + } + // success rate algorithm + if (!success_rate_ejection_candidates.empty() && + success_rate_ejection_candidates.size() >= + config.success_rate_ejection->minimum_hosts) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { + gpr_log(GPR_INFO, + "[outlier_detection_lb %p] running success rate algorithm", + parent_.get()); + } + // calculate ejection threshold: (mean - stdev * + // (success_rate_ejection.stdev_factor / 1000)) + double mean = success_rate_sum / success_rate_ejection_candidates.size(); + double variance = 0; + for (const auto& p : success_rate_ejection_candidates) { + variance += std::pow(p.second - mean, 2); + } + variance /= success_rate_ejection_candidates.size(); + double stdev = std::sqrt(variance); + const double success_rate_stdev_factor = + static_cast(config.success_rate_ejection->stdev_factor) / 1000; + double ejection_threshold = mean - stdev * success_rate_stdev_factor; if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { gpr_log(GPR_INFO, - "[outlier_detection_lb %p] found %" PRIuPTR - " success rate candidates and %" PRIuPTR - " failure percentage candidates; ejected_host_count=%" PRIuPTR - "; success_rate_sum=%.3f", - parent_.get(), success_rate_ejection_candidates.size(), - failure_percentage_ejection_candidates.size(), ejected_host_count, - success_rate_sum); + "[outlier_detection_lb %p] stdev=%.3f, ejection_threshold=%.3f", + parent_.get(), stdev, ejection_threshold); } - // success rate algorithm - if (!success_rate_ejection_candidates.empty() && - success_rate_ejection_candidates.size() >= - config.success_rate_ejection->minimum_hosts) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { - gpr_log(GPR_INFO, - "[outlier_detection_lb %p] running success rate algorithm", - parent_.get()); - } - // calculate ejection threshold: (mean - stdev * - // (success_rate_ejection.stdev_factor / 1000)) - double mean = success_rate_sum / success_rate_ejection_candidates.size(); - double variance = 0; - for (const auto& p : success_rate_ejection_candidates) { - variance += std::pow(p.second - mean, 2); - } - variance /= success_rate_ejection_candidates.size(); - double stdev = std::sqrt(variance); - const double success_rate_stdev_factor = - static_cast(config.success_rate_ejection->stdev_factor) / - 1000; - double ejection_threshold = mean - stdev * success_rate_stdev_factor; + for (auto& candidate : success_rate_ejection_candidates) { if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { gpr_log(GPR_INFO, - "[outlier_detection_lb %p] stdev=%.3f, ejection_threshold=%.3f", - parent_.get(), stdev, ejection_threshold); + "[outlier_detection_lb %p] checking candidate %p: " + "success_rate=%.3f", + parent_.get(), candidate.first, candidate.second); } - for (auto& candidate : success_rate_ejection_candidates) { + if (candidate.second < ejection_threshold) { + uint32_t random_key = absl::Uniform(bit_gen_, 1, 100); + double current_percent = + 100.0 * ejected_host_count / parent_->subchannel_state_map_.size(); if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { gpr_log(GPR_INFO, - "[outlier_detection_lb %p] checking candidate %p: " - "success_rate=%.3f", - parent_.get(), candidate.first, candidate.second); + "[outlier_detection_lb %p] random_key=%d " + "ejected_host_count=%" PRIuPTR " current_percent=%.3f", + parent_.get(), random_key, ejected_host_count, + current_percent); } - if (candidate.second < ejection_threshold) { - uint32_t random_key = absl::Uniform(bit_gen_, 1, 100); - double current_percent = 100.0 * ejected_host_count / - parent_->subchannel_state_map_.size(); + if (random_key < config.success_rate_ejection->enforcement_percentage && + (ejected_host_count == 0 || + (current_percent < config.max_ejection_percent))) { + // Eject and record the timestamp for use when ejecting addresses in + // this iteration. if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { - gpr_log(GPR_INFO, - "[outlier_detection_lb %p] random_key=%d " - "ejected_host_count=%" PRIuPTR " current_percent=%.3f", - parent_.get(), random_key, ejected_host_count, - current_percent); - } - if (random_key < - config.success_rate_ejection->enforcement_percentage && - (ejected_host_count == 0 || - (current_percent < config.max_ejection_percent))) { - // Eject and record the timestamp for use when ejecting addresses in - // this iteration. - if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { - gpr_log(GPR_INFO, "[outlier_detection_lb %p] ejecting candidate", - parent_.get()); - } - candidate.first->Eject(time_now); - ++ejected_host_count; + gpr_log(GPR_INFO, "[outlier_detection_lb %p] ejecting candidate", + parent_.get()); } + candidate.first->Eject(time_now); + ++ejected_host_count; } } } - // failure percentage algorithm - if (!failure_percentage_ejection_candidates.empty() && - failure_percentage_ejection_candidates.size() >= - config.failure_percentage_ejection->minimum_hosts) { + } + // failure percentage algorithm + if (!failure_percentage_ejection_candidates.empty() && + failure_percentage_ejection_candidates.size() >= + config.failure_percentage_ejection->minimum_hosts) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { + gpr_log(GPR_INFO, + "[outlier_detection_lb %p] running failure percentage algorithm", + parent_.get()); + } + for (auto& candidate : failure_percentage_ejection_candidates) { if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { - gpr_log( - GPR_INFO, - "[outlier_detection_lb %p] running failure percentage algorithm", - parent_.get()); + gpr_log(GPR_INFO, + "[outlier_detection_lb %p] checking candidate %p: " + "success_rate=%.3f", + parent_.get(), candidate.first, candidate.second); } - for (auto& candidate : failure_percentage_ejection_candidates) { + // Extra check to make sure success rate algorithm didn't already + // eject this backend. + if (candidate.first->ejection_time().has_value()) continue; + if ((100.0 - candidate.second) > + config.failure_percentage_ejection->threshold) { + uint32_t random_key = absl::Uniform(bit_gen_, 1, 100); + double current_percent = + 100.0 * ejected_host_count / parent_->subchannel_state_map_.size(); if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { gpr_log(GPR_INFO, - "[outlier_detection_lb %p] checking candidate %p: " - "success_rate=%.3f", - parent_.get(), candidate.first, candidate.second); + "[outlier_detection_lb %p] random_key=%d " + "ejected_host_count=%" PRIuPTR " current_percent=%.3f", + parent_.get(), random_key, ejected_host_count, + current_percent); } - // Extra check to make sure success rate algorithm didn't already - // eject this backend. - if (candidate.first->ejection_time().has_value()) continue; - if ((100.0 - candidate.second) > - config.failure_percentage_ejection->threshold) { - uint32_t random_key = absl::Uniform(bit_gen_, 1, 100); - double current_percent = 100.0 * ejected_host_count / - parent_->subchannel_state_map_.size(); + if (random_key < + config.failure_percentage_ejection->enforcement_percentage && + (ejected_host_count == 0 || + (current_percent < config.max_ejection_percent))) { + // Eject and record the timestamp for use when ejecting addresses in + // this iteration. if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { - gpr_log(GPR_INFO, - "[outlier_detection_lb %p] random_key=%d " - "ejected_host_count=%" PRIuPTR " current_percent=%.3f", - parent_.get(), random_key, ejected_host_count, - current_percent); - } - if (random_key < - config.failure_percentage_ejection->enforcement_percentage && - (ejected_host_count == 0 || - (current_percent < config.max_ejection_percent))) { - // Eject and record the timestamp for use when ejecting addresses in - // this iteration. - if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { - gpr_log(GPR_INFO, "[outlier_detection_lb %p] ejecting candidate", - parent_.get()); - } - candidate.first->Eject(time_now); - ++ejected_host_count; + gpr_log(GPR_INFO, "[outlier_detection_lb %p] ejecting candidate", + parent_.get()); } + candidate.first->Eject(time_now); + ++ejected_host_count; } } } - // For each address in the map: - // If the address is not ejected and the multiplier is greater than 0, - // decrease the multiplier by 1. If the address is ejected, and the - // current time is after ejection_timestamp + min(base_ejection_time * - // multiplier, max(base_ejection_time, max_ejection_time)), un-eject the - // address. - for (auto& state : parent_->subchannel_state_map_) { - auto* subchannel_state = state.second.get(); - const bool unejected = - subchannel_state->MaybeUneject(config.base_ejection_time.millis(), - config.max_ejection_time.millis()); - if (unejected && - GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { - gpr_log(GPR_INFO, "[outlier_detection_lb %p] unejected address %s (%p)", - parent_.get(), state.first.c_str(), subchannel_state); - } + } + // For each address in the map: + // If the address is not ejected and the multiplier is greater than 0, + // decrease the multiplier by 1. If the address is ejected, and the + // current time is after ejection_timestamp + min(base_ejection_time * + // multiplier, max(base_ejection_time, max_ejection_time)), un-eject the + // address. + for (auto& state : parent_->subchannel_state_map_) { + auto* subchannel_state = state.second.get(); + const bool unejected = subchannel_state->MaybeUneject( + config.base_ejection_time.millis(), config.max_ejection_time.millis()); + if (unejected && GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { + gpr_log(GPR_INFO, "[outlier_detection_lb %p] unejected address %s (%p)", + parent_.get(), state.first.c_str(), subchannel_state); } - timer_pending_ = false; - parent_->ejection_timer_ = - MakeOrphanable(parent_, Timestamp::Now()); } - Unref(DEBUG_LOCATION, "Timer"); + parent_->ejection_timer_ = + MakeOrphanable(parent_, Timestamp::Now()); } //