From 7eb99baad858625699071d18f636dff268aa9b45 Mon Sep 17 00:00:00 2001
From: AJ Heller <hork@google.com>
Date: Wed, 7 Dec 2022 18:40:27 -0800
Subject: [PATCH] EventEngine::RunAfter: OutlierDetection LB Policy (#30040)

* EventEngine::RunAfter: OutlierDetection LB Policy

* iwyu, clang format, fix_auto_deps

* fix TSAN: EjectionTimer needs no cleanup on cancellation

* redo

* exec_ctx and fix use after move

* handle orphaning with an unset timer handle

* Automated change: Fix sanity tests

* reviewer feedback

Co-authored-by: drfloob <drfloob@users.noreply.github.com>
---
 src/core/BUILD                                |   4 +-
 .../outlier_detection/outlier_detection.cc    | 338 +++++++++---------
 2 files changed, 164 insertions(+), 178 deletions(-)

diff --git a/src/core/BUILD b/src/core/BUILD
index 51fa539a5ea..03afb1b5796 100644
--- a/src/core/BUILD
+++ b/src/core/BUILD
@@ -4245,8 +4245,6 @@ grpc_cc_library(
     language = "c++",
     deps = [
         "channel_args",
-        "closure",
-        "error",
         "grpc_outlier_detection_header",
         "iomgr_fwd",
         "json",
@@ -4259,11 +4257,11 @@ grpc_cc_library(
         "validation_errors",
         "//:config",
         "//:debug_location",
+        "//:exec_ctx",
         "//:gpr",
         "//:grpc_base",
         "//:grpc_client_channel",
         "//:grpc_trace",
-        "//:iomgr_timer",
         "//:orphanable",
         "//:ref_counted_ptr",
         "//:server_address",
diff --git a/src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc b/src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc
index 01f888d7efa..06e5dd5276b 100644
--- a/src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc
@@ -52,11 +52,9 @@
 #include "src/core/lib/gprpp/ref_counted_ptr.h"
 #include "src/core/lib/gprpp/validation_errors.h"
 #include "src/core/lib/gprpp/work_serializer.h"
-#include "src/core/lib/iomgr/closure.h"
-#include "src/core/lib/iomgr/error.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
 #include "src/core/lib/iomgr/iomgr_fwd.h"
 #include "src/core/lib/iomgr/pollset_set.h"
-#include "src/core/lib/iomgr/timer.h"
 #include "src/core/lib/json/json.h"
 #include "src/core/lib/load_balancing/lb_policy.h"
 #include "src/core/lib/load_balancing/lb_policy_factory.h"
@@ -71,6 +69,8 @@ TraceFlag grpc_outlier_detection_lb_trace(false, "outlier_detection_lb");
 
 namespace {
 
+using ::grpc_event_engine::experimental::EventEngine;
+
 constexpr absl::string_view kOutlierDetection =
     "outlier_detection_experimental";
 
@@ -352,13 +352,10 @@ class OutlierDetectionLb : public LoadBalancingPolicy {
     Timestamp StartTime() const { return start_time_; }
 
    private:
-    static void OnTimer(void* arg, grpc_error_handle error);
-    void OnTimerLocked(grpc_error_handle);
+    void OnTimerLocked();
 
     RefCountedPtr<OutlierDetectionLb> parent_;
-    grpc_timer timer_;
-    grpc_closure on_timer_;
-    bool timer_pending_ = true;
+    absl::optional<EventEngine::TaskHandle> timer_handle_;
     Timestamp start_time_;
     absl::BitGen bit_gen_;
   };
@@ -782,210 +779,201 @@ OutlierDetectionLb::EjectionTimer::EjectionTimer(
     gpr_log(GPR_INFO, "[outlier_detection_lb %p] ejection timer will run in %s",
             parent_.get(), interval.ToString().c_str());
   }
-  GRPC_CLOSURE_INIT(&on_timer_, OnTimer, this, nullptr);
-  Ref().release();
-  grpc_timer_init(&timer_, start_time_ + interval, &on_timer_);
+  timer_handle_ = parent_->channel_control_helper()->GetEventEngine()->RunAfter(
+      interval, [self = Ref(DEBUG_LOCATION, "EjectionTimer")]() mutable {
+        ApplicationCallbackExecCtx callback_exec_ctx;
+        ExecCtx exec_ctx;
+        auto self_ptr = self.get();
+        self_ptr->parent_->work_serializer()->Run(
+            [self = std::move(self)]() { self->OnTimerLocked(); },
+            DEBUG_LOCATION);
+      });
 }
 
 void OutlierDetectionLb::EjectionTimer::Orphan() {
-  if (timer_pending_) {
-    timer_pending_ = false;
-    grpc_timer_cancel(&timer_);
+  if (timer_handle_.has_value()) {
+    parent_->channel_control_helper()->GetEventEngine()->Cancel(*timer_handle_);
+    timer_handle_.reset();
   }
   Unref();
 }
 
-void OutlierDetectionLb::EjectionTimer::OnTimer(void* arg,
-                                                grpc_error_handle error) {
-  auto* self = static_cast<EjectionTimer*>(arg);
-  self->parent_->work_serializer()->Run(
-      [self, error]() { self->OnTimerLocked(error); }, DEBUG_LOCATION);
-}
-
-void OutlierDetectionLb::EjectionTimer::OnTimerLocked(grpc_error_handle error) {
-  if (error.ok() && timer_pending_) {
-    if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
-      gpr_log(GPR_INFO, "[outlier_detection_lb %p] ejection timer running",
-              parent_.get());
+void OutlierDetectionLb::EjectionTimer::OnTimerLocked() {
+  if (!timer_handle_.has_value()) return;
+  timer_handle_.reset();
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
+    gpr_log(GPR_INFO, "[outlier_detection_lb %p] ejection timer running",
+            parent_.get());
+  }
+  std::map<SubchannelState*, double> success_rate_ejection_candidates;
+  std::map<SubchannelState*, double> failure_percentage_ejection_candidates;
+  size_t ejected_host_count = 0;
+  double success_rate_sum = 0;
+  auto time_now = Timestamp::Now();
+  auto& config = parent_->config_->outlier_detection_config();
+  for (auto& state : parent_->subchannel_state_map_) {
+    auto* subchannel_state = state.second.get();
+    // For each address, swap the call counter's buckets in that address's
+    // map entry.
+    subchannel_state->RotateBucket();
+    // Gather data to run success rate algorithm or failure percentage
+    // algorithm.
+    if (subchannel_state->ejection_time().has_value()) {
+      ++ejected_host_count;
     }
-    std::map<SubchannelState*, double> success_rate_ejection_candidates;
-    std::map<SubchannelState*, double> failure_percentage_ejection_candidates;
-    size_t ejected_host_count = 0;
-    double success_rate_sum = 0;
-    auto time_now = Timestamp::Now();
-    auto& config = parent_->config_->outlier_detection_config();
-    for (auto& state : parent_->subchannel_state_map_) {
-      auto* subchannel_state = state.second.get();
-      // For each address, swap the call counter's buckets in that address's
-      // map entry.
-      subchannel_state->RotateBucket();
-      // Gather data to run success rate algorithm or failure percentage
-      // algorithm.
-      if (subchannel_state->ejection_time().has_value()) {
-        ++ejected_host_count;
-      }
-      absl::optional<std::pair<double, uint64_t>> host_success_rate_and_volume =
-          subchannel_state->GetSuccessRateAndVolume();
-      if (!host_success_rate_and_volume.has_value()) {
-        continue;
-      }
-      double success_rate = host_success_rate_and_volume->first;
-      uint64_t request_volume = host_success_rate_and_volume->second;
-      if (config.success_rate_ejection.has_value()) {
-        if (request_volume >= config.success_rate_ejection->request_volume) {
-          success_rate_ejection_candidates[subchannel_state] = success_rate;
-          success_rate_sum += success_rate;
-        }
+    absl::optional<std::pair<double, uint64_t>> host_success_rate_and_volume =
+        subchannel_state->GetSuccessRateAndVolume();
+    if (!host_success_rate_and_volume.has_value()) {
+      continue;
+    }
+    double success_rate = host_success_rate_and_volume->first;
+    uint64_t request_volume = host_success_rate_and_volume->second;
+    if (config.success_rate_ejection.has_value()) {
+      if (request_volume >= config.success_rate_ejection->request_volume) {
+        success_rate_ejection_candidates[subchannel_state] = success_rate;
+        success_rate_sum += success_rate;
       }
-      if (config.failure_percentage_ejection.has_value()) {
-        if (request_volume >=
-            config.failure_percentage_ejection->request_volume) {
-          failure_percentage_ejection_candidates[subchannel_state] =
-              success_rate;
-        }
+    }
+    if (config.failure_percentage_ejection.has_value()) {
+      if (request_volume >=
+          config.failure_percentage_ejection->request_volume) {
+        failure_percentage_ejection_candidates[subchannel_state] = success_rate;
       }
     }
+  }
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
+    gpr_log(GPR_INFO,
+            "[outlier_detection_lb %p] found %" PRIuPTR
+            " success rate candidates and %" PRIuPTR
+            " failure percentage candidates; ejected_host_count=%" PRIuPTR
+            "; success_rate_sum=%.3f",
+            parent_.get(), success_rate_ejection_candidates.size(),
+            failure_percentage_ejection_candidates.size(), ejected_host_count,
+            success_rate_sum);
+  }
+  // success rate algorithm
+  if (!success_rate_ejection_candidates.empty() &&
+      success_rate_ejection_candidates.size() >=
+          config.success_rate_ejection->minimum_hosts) {
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
+      gpr_log(GPR_INFO,
+              "[outlier_detection_lb %p] running success rate algorithm",
+              parent_.get());
+    }
+    // calculate ejection threshold: (mean - stdev *
+    // (success_rate_ejection.stdev_factor / 1000))
+    double mean = success_rate_sum / success_rate_ejection_candidates.size();
+    double variance = 0;
+    for (const auto& p : success_rate_ejection_candidates) {
+      variance += std::pow(p.second - mean, 2);
+    }
+    variance /= success_rate_ejection_candidates.size();
+    double stdev = std::sqrt(variance);
+    const double success_rate_stdev_factor =
+        static_cast<double>(config.success_rate_ejection->stdev_factor) / 1000;
+    double ejection_threshold = mean - stdev * success_rate_stdev_factor;
     if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
       gpr_log(GPR_INFO,
-              "[outlier_detection_lb %p] found %" PRIuPTR
-              " success rate candidates and %" PRIuPTR
-              " failure percentage candidates; ejected_host_count=%" PRIuPTR
-              "; success_rate_sum=%.3f",
-              parent_.get(), success_rate_ejection_candidates.size(),
-              failure_percentage_ejection_candidates.size(), ejected_host_count,
-              success_rate_sum);
+              "[outlier_detection_lb %p] stdev=%.3f, ejection_threshold=%.3f",
+              parent_.get(), stdev, ejection_threshold);
     }
-    // success rate algorithm
-    if (!success_rate_ejection_candidates.empty() &&
-        success_rate_ejection_candidates.size() >=
-            config.success_rate_ejection->minimum_hosts) {
-      if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
-        gpr_log(GPR_INFO,
-                "[outlier_detection_lb %p] running success rate algorithm",
-                parent_.get());
-      }
-      // calculate ejection threshold: (mean - stdev *
-      // (success_rate_ejection.stdev_factor / 1000))
-      double mean = success_rate_sum / success_rate_ejection_candidates.size();
-      double variance = 0;
-      for (const auto& p : success_rate_ejection_candidates) {
-        variance += std::pow(p.second - mean, 2);
-      }
-      variance /= success_rate_ejection_candidates.size();
-      double stdev = std::sqrt(variance);
-      const double success_rate_stdev_factor =
-          static_cast<double>(config.success_rate_ejection->stdev_factor) /
-          1000;
-      double ejection_threshold = mean - stdev * success_rate_stdev_factor;
+    for (auto& candidate : success_rate_ejection_candidates) {
       if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
         gpr_log(GPR_INFO,
-                "[outlier_detection_lb %p] stdev=%.3f, ejection_threshold=%.3f",
-                parent_.get(), stdev, ejection_threshold);
+                "[outlier_detection_lb %p] checking candidate %p: "
+                "success_rate=%.3f",
+                parent_.get(), candidate.first, candidate.second);
       }
-      for (auto& candidate : success_rate_ejection_candidates) {
+      if (candidate.second < ejection_threshold) {
+        uint32_t random_key = absl::Uniform(bit_gen_, 1, 100);
+        double current_percent =
+            100.0 * ejected_host_count / parent_->subchannel_state_map_.size();
         if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
           gpr_log(GPR_INFO,
-                  "[outlier_detection_lb %p] checking candidate %p: "
-                  "success_rate=%.3f",
-                  parent_.get(), candidate.first, candidate.second);
+                  "[outlier_detection_lb %p] random_key=%d "
+                  "ejected_host_count=%" PRIuPTR " current_percent=%.3f",
+                  parent_.get(), random_key, ejected_host_count,
+                  current_percent);
         }
-        if (candidate.second < ejection_threshold) {
-          uint32_t random_key = absl::Uniform(bit_gen_, 1, 100);
-          double current_percent = 100.0 * ejected_host_count /
-                                   parent_->subchannel_state_map_.size();
+        if (random_key < config.success_rate_ejection->enforcement_percentage &&
+            (ejected_host_count == 0 ||
+             (current_percent < config.max_ejection_percent))) {
+          // Eject and record the timestamp for use when ejecting addresses in
+          // this iteration.
           if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
-            gpr_log(GPR_INFO,
-                    "[outlier_detection_lb %p] random_key=%d "
-                    "ejected_host_count=%" PRIuPTR " current_percent=%.3f",
-                    parent_.get(), random_key, ejected_host_count,
-                    current_percent);
-          }
-          if (random_key <
-                  config.success_rate_ejection->enforcement_percentage &&
-              (ejected_host_count == 0 ||
-               (current_percent < config.max_ejection_percent))) {
-            // Eject and record the timestamp for use when ejecting addresses in
-            // this iteration.
-            if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
-              gpr_log(GPR_INFO, "[outlier_detection_lb %p] ejecting candidate",
-                      parent_.get());
-            }
-            candidate.first->Eject(time_now);
-            ++ejected_host_count;
+            gpr_log(GPR_INFO, "[outlier_detection_lb %p] ejecting candidate",
+                    parent_.get());
           }
+          candidate.first->Eject(time_now);
+          ++ejected_host_count;
         }
       }
     }
-    // failure percentage algorithm
-    if (!failure_percentage_ejection_candidates.empty() &&
-        failure_percentage_ejection_candidates.size() >=
-            config.failure_percentage_ejection->minimum_hosts) {
+  }
+  // failure percentage algorithm
+  if (!failure_percentage_ejection_candidates.empty() &&
+      failure_percentage_ejection_candidates.size() >=
+          config.failure_percentage_ejection->minimum_hosts) {
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
+      gpr_log(GPR_INFO,
+              "[outlier_detection_lb %p] running failure percentage algorithm",
+              parent_.get());
+    }
+    for (auto& candidate : failure_percentage_ejection_candidates) {
       if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
-        gpr_log(
-            GPR_INFO,
-            "[outlier_detection_lb %p] running failure percentage algorithm",
-            parent_.get());
+        gpr_log(GPR_INFO,
+                "[outlier_detection_lb %p] checking candidate %p: "
+                "success_rate=%.3f",
+                parent_.get(), candidate.first, candidate.second);
       }
-      for (auto& candidate : failure_percentage_ejection_candidates) {
+      // Extra check to make sure success rate algorithm didn't already
+      // eject this backend.
+      if (candidate.first->ejection_time().has_value()) continue;
+      if ((100.0 - candidate.second) >
+          config.failure_percentage_ejection->threshold) {
+        uint32_t random_key = absl::Uniform(bit_gen_, 1, 100);
+        double current_percent =
+            100.0 * ejected_host_count / parent_->subchannel_state_map_.size();
         if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
           gpr_log(GPR_INFO,
-                  "[outlier_detection_lb %p] checking candidate %p: "
-                  "success_rate=%.3f",
-                  parent_.get(), candidate.first, candidate.second);
+                  "[outlier_detection_lb %p] random_key=%d "
+                  "ejected_host_count=%" PRIuPTR " current_percent=%.3f",
+                  parent_.get(), random_key, ejected_host_count,
+                  current_percent);
         }
-        // Extra check to make sure success rate algorithm didn't already
-        // eject this backend.
-        if (candidate.first->ejection_time().has_value()) continue;
-        if ((100.0 - candidate.second) >
-            config.failure_percentage_ejection->threshold) {
-          uint32_t random_key = absl::Uniform(bit_gen_, 1, 100);
-          double current_percent = 100.0 * ejected_host_count /
-                                   parent_->subchannel_state_map_.size();
+        if (random_key <
+                config.failure_percentage_ejection->enforcement_percentage &&
+            (ejected_host_count == 0 ||
+             (current_percent < config.max_ejection_percent))) {
+          // Eject and record the timestamp for use when ejecting addresses in
+          // this iteration.
           if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
-            gpr_log(GPR_INFO,
-                    "[outlier_detection_lb %p] random_key=%d "
-                    "ejected_host_count=%" PRIuPTR " current_percent=%.3f",
-                    parent_.get(), random_key, ejected_host_count,
-                    current_percent);
-          }
-          if (random_key <
-                  config.failure_percentage_ejection->enforcement_percentage &&
-              (ejected_host_count == 0 ||
-               (current_percent < config.max_ejection_percent))) {
-            // Eject and record the timestamp for use when ejecting addresses in
-            // this iteration.
-            if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
-              gpr_log(GPR_INFO, "[outlier_detection_lb %p] ejecting candidate",
-                      parent_.get());
-            }
-            candidate.first->Eject(time_now);
-            ++ejected_host_count;
+            gpr_log(GPR_INFO, "[outlier_detection_lb %p] ejecting candidate",
+                    parent_.get());
           }
+          candidate.first->Eject(time_now);
+          ++ejected_host_count;
         }
       }
     }
-    // For each address in the map:
-    //   If the address is not ejected and the multiplier is greater than 0,
-    //   decrease the multiplier by 1. If the address is ejected, and the
-    //   current time is after ejection_timestamp + min(base_ejection_time *
-    //   multiplier, max(base_ejection_time, max_ejection_time)), un-eject the
-    //   address.
-    for (auto& state : parent_->subchannel_state_map_) {
-      auto* subchannel_state = state.second.get();
-      const bool unejected =
-          subchannel_state->MaybeUneject(config.base_ejection_time.millis(),
-                                         config.max_ejection_time.millis());
-      if (unejected &&
-          GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
-        gpr_log(GPR_INFO, "[outlier_detection_lb %p] unejected address %s (%p)",
-                parent_.get(), state.first.c_str(), subchannel_state);
-      }
+  }
+  // For each address in the map:
+  //   If the address is not ejected and the multiplier is greater than 0,
+  //   decrease the multiplier by 1. If the address is ejected, and the
+  //   current time is after ejection_timestamp + min(base_ejection_time *
+  //   multiplier, max(base_ejection_time, max_ejection_time)), un-eject the
+  //   address.
+  for (auto& state : parent_->subchannel_state_map_) {
+    auto* subchannel_state = state.second.get();
+    const bool unejected = subchannel_state->MaybeUneject(
+        config.base_ejection_time.millis(), config.max_ejection_time.millis());
+    if (unejected && GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
+      gpr_log(GPR_INFO, "[outlier_detection_lb %p] unejected address %s (%p)",
+              parent_.get(), state.first.c_str(), subchannel_state);
     }
-    timer_pending_ = false;
-    parent_->ejection_timer_ =
-        MakeOrphanable<EjectionTimer>(parent_, Timestamp::Now());
   }
-  Unref(DEBUG_LOCATION, "Timer");
+  parent_->ejection_timer_ =
+      MakeOrphanable<EjectionTimer>(parent_, Timestamp::Now());
 }
 
 //