EventEngine::RunAfter: OutlierDetection LB Policy (#30040)

* EventEngine::RunAfter: OutlierDetection LB Policy

* iwyu, clang format, fix_auto_deps

* fix TSAN: EjectionTimer needs no cleanup on cancellation

* redo

* exec_ctx and fix use after move

* handle orphaning with an unset timer handle

* Automated change: Fix sanity tests

* reviewer feedback

Co-authored-by: drfloob <drfloob@users.noreply.github.com>
pull/31826/head
AJ Heller 2 years ago committed by GitHub
parent 322e85253e
commit 7eb99baad8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. src/core/BUILD (4 changed lines)
  2. src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc (338 changed lines)

@@ -4245,8 +4245,6 @@ grpc_cc_library(
language = "c++", language = "c++",
deps = [ deps = [
"channel_args", "channel_args",
"closure",
"error",
"grpc_outlier_detection_header", "grpc_outlier_detection_header",
"iomgr_fwd", "iomgr_fwd",
"json", "json",
@@ -4259,11 +4257,11 @@ grpc_cc_library(
"validation_errors", "validation_errors",
"//:config", "//:config",
"//:debug_location", "//:debug_location",
"//:exec_ctx",
"//:gpr", "//:gpr",
"//:grpc_base", "//:grpc_base",
"//:grpc_client_channel", "//:grpc_client_channel",
"//:grpc_trace", "//:grpc_trace",
"//:iomgr_timer",
"//:orphanable", "//:orphanable",
"//:ref_counted_ptr", "//:ref_counted_ptr",
"//:server_address", "//:server_address",

@@ -52,11 +52,9 @@
#include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/validation_errors.h" #include "src/core/lib/gprpp/validation_errors.h"
#include "src/core/lib/gprpp/work_serializer.h" #include "src/core/lib/gprpp/work_serializer.h"
#include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/iomgr_fwd.h" #include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/iomgr/pollset_set.h" #include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/json/json.h" #include "src/core/lib/json/json.h"
#include "src/core/lib/load_balancing/lb_policy.h" #include "src/core/lib/load_balancing/lb_policy.h"
#include "src/core/lib/load_balancing/lb_policy_factory.h" #include "src/core/lib/load_balancing/lb_policy_factory.h"
@@ -71,6 +69,8 @@ TraceFlag grpc_outlier_detection_lb_trace(false, "outlier_detection_lb");
namespace { namespace {
using ::grpc_event_engine::experimental::EventEngine;
constexpr absl::string_view kOutlierDetection = constexpr absl::string_view kOutlierDetection =
"outlier_detection_experimental"; "outlier_detection_experimental";
@@ -352,13 +352,10 @@ class OutlierDetectionLb : public LoadBalancingPolicy {
Timestamp StartTime() const { return start_time_; } Timestamp StartTime() const { return start_time_; }
private: private:
static void OnTimer(void* arg, grpc_error_handle error); void OnTimerLocked();
void OnTimerLocked(grpc_error_handle);
RefCountedPtr<OutlierDetectionLb> parent_; RefCountedPtr<OutlierDetectionLb> parent_;
grpc_timer timer_; absl::optional<EventEngine::TaskHandle> timer_handle_;
grpc_closure on_timer_;
bool timer_pending_ = true;
Timestamp start_time_; Timestamp start_time_;
absl::BitGen bit_gen_; absl::BitGen bit_gen_;
}; };
@@ -782,210 +779,201 @@ OutlierDetectionLb::EjectionTimer::EjectionTimer(
gpr_log(GPR_INFO, "[outlier_detection_lb %p] ejection timer will run in %s", gpr_log(GPR_INFO, "[outlier_detection_lb %p] ejection timer will run in %s",
parent_.get(), interval.ToString().c_str()); parent_.get(), interval.ToString().c_str());
} }
GRPC_CLOSURE_INIT(&on_timer_, OnTimer, this, nullptr); timer_handle_ = parent_->channel_control_helper()->GetEventEngine()->RunAfter(
Ref().release(); interval, [self = Ref(DEBUG_LOCATION, "EjectionTimer")]() mutable {
grpc_timer_init(&timer_, start_time_ + interval, &on_timer_); ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
auto self_ptr = self.get();
self_ptr->parent_->work_serializer()->Run(
[self = std::move(self)]() { self->OnTimerLocked(); },
DEBUG_LOCATION);
});
} }
void OutlierDetectionLb::EjectionTimer::Orphan() { void OutlierDetectionLb::EjectionTimer::Orphan() {
if (timer_pending_) { if (timer_handle_.has_value()) {
timer_pending_ = false; parent_->channel_control_helper()->GetEventEngine()->Cancel(*timer_handle_);
grpc_timer_cancel(&timer_); timer_handle_.reset();
} }
Unref(); Unref();
} }
void OutlierDetectionLb::EjectionTimer::OnTimer(void* arg, void OutlierDetectionLb::EjectionTimer::OnTimerLocked() {
grpc_error_handle error) { if (!timer_handle_.has_value()) return;
auto* self = static_cast<EjectionTimer*>(arg); timer_handle_.reset();
self->parent_->work_serializer()->Run( if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
[self, error]() { self->OnTimerLocked(error); }, DEBUG_LOCATION); gpr_log(GPR_INFO, "[outlier_detection_lb %p] ejection timer running",
} parent_.get());
}
void OutlierDetectionLb::EjectionTimer::OnTimerLocked(grpc_error_handle error) { std::map<SubchannelState*, double> success_rate_ejection_candidates;
if (error.ok() && timer_pending_) { std::map<SubchannelState*, double> failure_percentage_ejection_candidates;
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { size_t ejected_host_count = 0;
gpr_log(GPR_INFO, "[outlier_detection_lb %p] ejection timer running", double success_rate_sum = 0;
parent_.get()); auto time_now = Timestamp::Now();
auto& config = parent_->config_->outlier_detection_config();
for (auto& state : parent_->subchannel_state_map_) {
auto* subchannel_state = state.second.get();
// For each address, swap the call counter's buckets in that address's
// map entry.
subchannel_state->RotateBucket();
// Gather data to run success rate algorithm or failure percentage
// algorithm.
if (subchannel_state->ejection_time().has_value()) {
++ejected_host_count;
} }
std::map<SubchannelState*, double> success_rate_ejection_candidates; absl::optional<std::pair<double, uint64_t>> host_success_rate_and_volume =
std::map<SubchannelState*, double> failure_percentage_ejection_candidates; subchannel_state->GetSuccessRateAndVolume();
size_t ejected_host_count = 0; if (!host_success_rate_and_volume.has_value()) {
double success_rate_sum = 0; continue;
auto time_now = Timestamp::Now(); }
auto& config = parent_->config_->outlier_detection_config(); double success_rate = host_success_rate_and_volume->first;
for (auto& state : parent_->subchannel_state_map_) { uint64_t request_volume = host_success_rate_and_volume->second;
auto* subchannel_state = state.second.get(); if (config.success_rate_ejection.has_value()) {
// For each address, swap the call counter's buckets in that address's if (request_volume >= config.success_rate_ejection->request_volume) {
// map entry. success_rate_ejection_candidates[subchannel_state] = success_rate;
subchannel_state->RotateBucket(); success_rate_sum += success_rate;
// Gather data to run success rate algorithm or failure percentage
// algorithm.
if (subchannel_state->ejection_time().has_value()) {
++ejected_host_count;
}
absl::optional<std::pair<double, uint64_t>> host_success_rate_and_volume =
subchannel_state->GetSuccessRateAndVolume();
if (!host_success_rate_and_volume.has_value()) {
continue;
}
double success_rate = host_success_rate_and_volume->first;
uint64_t request_volume = host_success_rate_and_volume->second;
if (config.success_rate_ejection.has_value()) {
if (request_volume >= config.success_rate_ejection->request_volume) {
success_rate_ejection_candidates[subchannel_state] = success_rate;
success_rate_sum += success_rate;
}
} }
if (config.failure_percentage_ejection.has_value()) { }
if (request_volume >= if (config.failure_percentage_ejection.has_value()) {
config.failure_percentage_ejection->request_volume) { if (request_volume >=
failure_percentage_ejection_candidates[subchannel_state] = config.failure_percentage_ejection->request_volume) {
success_rate; failure_percentage_ejection_candidates[subchannel_state] = success_rate;
}
} }
} }
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO,
"[outlier_detection_lb %p] found %" PRIuPTR
" success rate candidates and %" PRIuPTR
" failure percentage candidates; ejected_host_count=%" PRIuPTR
"; success_rate_sum=%.3f",
parent_.get(), success_rate_ejection_candidates.size(),
failure_percentage_ejection_candidates.size(), ejected_host_count,
success_rate_sum);
}
// success rate algorithm
if (!success_rate_ejection_candidates.empty() &&
success_rate_ejection_candidates.size() >=
config.success_rate_ejection->minimum_hosts) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO,
"[outlier_detection_lb %p] running success rate algorithm",
parent_.get());
}
// calculate ejection threshold: (mean - stdev *
// (success_rate_ejection.stdev_factor / 1000))
double mean = success_rate_sum / success_rate_ejection_candidates.size();
double variance = 0;
for (const auto& p : success_rate_ejection_candidates) {
variance += std::pow(p.second - mean, 2);
}
variance /= success_rate_ejection_candidates.size();
double stdev = std::sqrt(variance);
const double success_rate_stdev_factor =
static_cast<double>(config.success_rate_ejection->stdev_factor) / 1000;
double ejection_threshold = mean - stdev * success_rate_stdev_factor;
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"[outlier_detection_lb %p] found %" PRIuPTR "[outlier_detection_lb %p] stdev=%.3f, ejection_threshold=%.3f",
" success rate candidates and %" PRIuPTR parent_.get(), stdev, ejection_threshold);
" failure percentage candidates; ejected_host_count=%" PRIuPTR
"; success_rate_sum=%.3f",
parent_.get(), success_rate_ejection_candidates.size(),
failure_percentage_ejection_candidates.size(), ejected_host_count,
success_rate_sum);
} }
// success rate algorithm for (auto& candidate : success_rate_ejection_candidates) {
if (!success_rate_ejection_candidates.empty() &&
success_rate_ejection_candidates.size() >=
config.success_rate_ejection->minimum_hosts) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO,
"[outlier_detection_lb %p] running success rate algorithm",
parent_.get());
}
// calculate ejection threshold: (mean - stdev *
// (success_rate_ejection.stdev_factor / 1000))
double mean = success_rate_sum / success_rate_ejection_candidates.size();
double variance = 0;
for (const auto& p : success_rate_ejection_candidates) {
variance += std::pow(p.second - mean, 2);
}
variance /= success_rate_ejection_candidates.size();
double stdev = std::sqrt(variance);
const double success_rate_stdev_factor =
static_cast<double>(config.success_rate_ejection->stdev_factor) /
1000;
double ejection_threshold = mean - stdev * success_rate_stdev_factor;
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"[outlier_detection_lb %p] stdev=%.3f, ejection_threshold=%.3f", "[outlier_detection_lb %p] checking candidate %p: "
parent_.get(), stdev, ejection_threshold); "success_rate=%.3f",
parent_.get(), candidate.first, candidate.second);
} }
for (auto& candidate : success_rate_ejection_candidates) { if (candidate.second < ejection_threshold) {
uint32_t random_key = absl::Uniform(bit_gen_, 1, 100);
double current_percent =
100.0 * ejected_host_count / parent_->subchannel_state_map_.size();
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"[outlier_detection_lb %p] checking candidate %p: " "[outlier_detection_lb %p] random_key=%d "
"success_rate=%.3f", "ejected_host_count=%" PRIuPTR " current_percent=%.3f",
parent_.get(), candidate.first, candidate.second); parent_.get(), random_key, ejected_host_count,
current_percent);
} }
if (candidate.second < ejection_threshold) { if (random_key < config.success_rate_ejection->enforcement_percentage &&
uint32_t random_key = absl::Uniform(bit_gen_, 1, 100); (ejected_host_count == 0 ||
double current_percent = 100.0 * ejected_host_count / (current_percent < config.max_ejection_percent))) {
parent_->subchannel_state_map_.size(); // Eject and record the timestamp for use when ejecting addresses in
// this iteration.
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO, "[outlier_detection_lb %p] ejecting candidate",
"[outlier_detection_lb %p] random_key=%d " parent_.get());
"ejected_host_count=%" PRIuPTR " current_percent=%.3f",
parent_.get(), random_key, ejected_host_count,
current_percent);
}
if (random_key <
config.success_rate_ejection->enforcement_percentage &&
(ejected_host_count == 0 ||
(current_percent < config.max_ejection_percent))) {
// Eject and record the timestamp for use when ejecting addresses in
// this iteration.
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO, "[outlier_detection_lb %p] ejecting candidate",
parent_.get());
}
candidate.first->Eject(time_now);
++ejected_host_count;
} }
candidate.first->Eject(time_now);
++ejected_host_count;
} }
} }
} }
// failure percentage algorithm }
if (!failure_percentage_ejection_candidates.empty() && // failure percentage algorithm
failure_percentage_ejection_candidates.size() >= if (!failure_percentage_ejection_candidates.empty() &&
config.failure_percentage_ejection->minimum_hosts) { failure_percentage_ejection_candidates.size() >=
config.failure_percentage_ejection->minimum_hosts) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO,
"[outlier_detection_lb %p] running failure percentage algorithm",
parent_.get());
}
for (auto& candidate : failure_percentage_ejection_candidates) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log( gpr_log(GPR_INFO,
GPR_INFO, "[outlier_detection_lb %p] checking candidate %p: "
"[outlier_detection_lb %p] running failure percentage algorithm", "success_rate=%.3f",
parent_.get()); parent_.get(), candidate.first, candidate.second);
} }
for (auto& candidate : failure_percentage_ejection_candidates) { // Extra check to make sure success rate algorithm didn't already
// eject this backend.
if (candidate.first->ejection_time().has_value()) continue;
if ((100.0 - candidate.second) >
config.failure_percentage_ejection->threshold) {
uint32_t random_key = absl::Uniform(bit_gen_, 1, 100);
double current_percent =
100.0 * ejected_host_count / parent_->subchannel_state_map_.size();
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"[outlier_detection_lb %p] checking candidate %p: " "[outlier_detection_lb %p] random_key=%d "
"success_rate=%.3f", "ejected_host_count=%" PRIuPTR " current_percent=%.3f",
parent_.get(), candidate.first, candidate.second); parent_.get(), random_key, ejected_host_count,
current_percent);
} }
// Extra check to make sure success rate algorithm didn't already if (random_key <
// eject this backend. config.failure_percentage_ejection->enforcement_percentage &&
if (candidate.first->ejection_time().has_value()) continue; (ejected_host_count == 0 ||
if ((100.0 - candidate.second) > (current_percent < config.max_ejection_percent))) {
config.failure_percentage_ejection->threshold) { // Eject and record the timestamp for use when ejecting addresses in
uint32_t random_key = absl::Uniform(bit_gen_, 1, 100); // this iteration.
double current_percent = 100.0 * ejected_host_count /
parent_->subchannel_state_map_.size();
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO, "[outlier_detection_lb %p] ejecting candidate",
"[outlier_detection_lb %p] random_key=%d " parent_.get());
"ejected_host_count=%" PRIuPTR " current_percent=%.3f",
parent_.get(), random_key, ejected_host_count,
current_percent);
}
if (random_key <
config.failure_percentage_ejection->enforcement_percentage &&
(ejected_host_count == 0 ||
(current_percent < config.max_ejection_percent))) {
// Eject and record the timestamp for use when ejecting addresses in
// this iteration.
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO, "[outlier_detection_lb %p] ejecting candidate",
parent_.get());
}
candidate.first->Eject(time_now);
++ejected_host_count;
} }
candidate.first->Eject(time_now);
++ejected_host_count;
} }
} }
} }
// For each address in the map: }
// If the address is not ejected and the multiplier is greater than 0, // For each address in the map:
// decrease the multiplier by 1. If the address is ejected, and the // If the address is not ejected and the multiplier is greater than 0,
// current time is after ejection_timestamp + min(base_ejection_time * // decrease the multiplier by 1. If the address is ejected, and the
// multiplier, max(base_ejection_time, max_ejection_time)), un-eject the // current time is after ejection_timestamp + min(base_ejection_time *
// address. // multiplier, max(base_ejection_time, max_ejection_time)), un-eject the
for (auto& state : parent_->subchannel_state_map_) { // address.
auto* subchannel_state = state.second.get(); for (auto& state : parent_->subchannel_state_map_) {
const bool unejected = auto* subchannel_state = state.second.get();
subchannel_state->MaybeUneject(config.base_ejection_time.millis(), const bool unejected = subchannel_state->MaybeUneject(
config.max_ejection_time.millis()); config.base_ejection_time.millis(), config.max_ejection_time.millis());
if (unejected && if (unejected && GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { gpr_log(GPR_INFO, "[outlier_detection_lb %p] unejected address %s (%p)",
gpr_log(GPR_INFO, "[outlier_detection_lb %p] unejected address %s (%p)", parent_.get(), state.first.c_str(), subchannel_state);
parent_.get(), state.first.c_str(), subchannel_state);
}
} }
timer_pending_ = false;
parent_->ejection_timer_ =
MakeOrphanable<EjectionTimer>(parent_, Timestamp::Now());
} }
Unref(DEBUG_LOCATION, "Timer"); parent_->ejection_timer_ =
MakeOrphanable<EjectionTimer>(parent_, Timestamp::Now());
} }
// //

Loading…
Cancel
Save