From 6b79989d86c9c82f34947dd36a56158be5bd4cd4 Mon Sep 17 00:00:00 2001
From: Yijie Ma <5663878+yijiem@users.noreply.github.com>
Date: Mon, 5 Dec 2022 17:05:00 -0800
Subject: [PATCH] EventEngine::RunAfter: PollingResolver (#31717)

* EventEngine::RunAfter: polling_resolver

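initial draft: replace the iomgr grpc_timer-based re-resolution timer in
PollingResolver with EventEngine::RunAfter()/Cancel(), using the EventEngine
obtained from the channel args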

* restore some IWYU changes

* fix: get rid of OnNextResolution

* fix some test crashes

* fix more tests

* clang-tidy

* review

* clang-tidy

* fix

* review

* review

* review

* fix use-after-move

* revert

* review
---
 src/core/BUILD                                |  5 +-
 .../resolver/dns/c_ares/grpc_ares_wrapper.h   |  2 +-
 .../resolver/polling_resolver.cc              | 72 ++++++++++---------
 .../resolver/polling_resolver.h               | 18 +++--
 .../resolvers/dns_resolver_cooldown_test.cc   |  5 +-
 .../resolvers/dns_resolver_test.cc            | 12 ++--
 test/cpp/naming/cancel_ares_query_test.cc     |  8 ++-
 test/cpp/naming/resolver_component_test.cc    |  3 +
 8 files changed, 70 insertions(+), 55 deletions(-)

diff --git a/src/core/BUILD b/src/core/BUILD
index 374df712f5c..904b77f0cb1 100644
--- a/src/core/BUILD
+++ b/src/core/BUILD
@@ -4439,19 +4439,16 @@ grpc_cc_library(
     language = "c++",
     deps = [
         "channel_args",
-        "closure",
-        "error",
         "grpc_service_config",
         "iomgr_fwd",
-        "status_helper",
         "time",
         "//:backoff",
         "//:debug_location",
+        "//:event_engine_base_hdrs",
         "//:exec_ctx",
         "//:gpr",
         "//:grpc_resolver",
         "//:grpc_trace",
-        "//:iomgr_timer",
         "//:orphanable",
         "//:ref_counted_ptr",
         "//:uri_parser",
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h
index 3deb9ca3f40..c949c813d44 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h
@@ -70,7 +70,7 @@ struct grpc_ares_request {
       ABSL_GUARDED_BY(mu);
   /** the pointer to receive the service config in JSON */
   char** service_config_json_out ABSL_GUARDED_BY(mu) = nullptr;
-  /** the evernt driver used by this request */
+  /** the event driver used by this request */
   grpc_ares_ev_driver* ev_driver ABSL_GUARDED_BY(mu) = nullptr;
   /** number of ongoing queries */
   size_t pending_queries ABSL_GUARDED_BY(mu) = 0;
diff --git a/src/core/ext/filters/client_channel/resolver/polling_resolver.cc b/src/core/ext/filters/client_channel/resolver/polling_resolver.cc
index 57dadfca353..e0cd4debe6c 100644
--- a/src/core/ext/filters/client_channel/resolver/polling_resolver.cc
+++ b/src/core/ext/filters/client_channel/resolver/polling_resolver.cc
@@ -36,15 +36,15 @@
 #include "src/core/lib/channel/channel_args.h"
 #include "src/core/lib/gprpp/debug_location.h"
 #include "src/core/lib/gprpp/ref_counted_ptr.h"
-#include "src/core/lib/gprpp/status_helper.h"
 #include "src/core/lib/gprpp/work_serializer.h"
 #include "src/core/lib/iomgr/exec_ctx.h"
-#include "src/core/lib/iomgr/timer.h"
 #include "src/core/lib/service_config/service_config.h"
 #include "src/core/lib/uri/uri_parser.h"
 
 namespace grpc_core {
 
+using ::grpc_event_engine::experimental::EventEngine;
+
 PollingResolver::PollingResolver(ResolverArgs args,
                                  const ChannelArgs& channel_args,
                                  Duration min_time_between_resolutions,
@@ -88,9 +88,7 @@ void PollingResolver::RequestReresolutionLocked() {
 }
 
 void PollingResolver::ResetBackoffLocked() {
-  if (have_next_resolution_timer_) {
-    grpc_timer_cancel(&next_resolution_timer_);
-  }
+  MaybeCancelNextResolutionTimer();
   backoff_.Reset();
 }
 
@@ -99,30 +97,46 @@ void PollingResolver::ShutdownLocked() {
     gpr_log(GPR_INFO, "[polling resolver %p] shutting down", this);
   }
   shutdown_ = true;
-  if (have_next_resolution_timer_) {
-    grpc_timer_cancel(&next_resolution_timer_);
-  }
+  MaybeCancelNextResolutionTimer();
   request_.reset();
 }
 
-void PollingResolver::OnNextResolution(void* arg, grpc_error_handle error) {
-  auto* self = static_cast<PollingResolver*>(arg);
-  self->work_serializer_->Run(
-      [self, error]() { self->OnNextResolutionLocked(error); }, DEBUG_LOCATION);
+void PollingResolver::ScheduleNextResolutionTimer(const Duration& timeout) {
+  RefCountedPtr<PollingResolver> self = Ref();
+  next_resolution_timer_handle_ =
+      channel_args_.GetObject<EventEngine>()->RunAfter(
+          timeout, [self = std::move(self)]() mutable {
+            ApplicationCallbackExecCtx callback_exec_ctx;
+            ExecCtx exec_ctx;
+            auto* self_ptr = self.get();
+            self_ptr->work_serializer_->Run(
+                [self = std::move(self)]() { self->OnNextResolutionLocked(); },
+                DEBUG_LOCATION);
+          });
 }
 
-void PollingResolver::OnNextResolutionLocked(grpc_error_handle error) {
+void PollingResolver::OnNextResolutionLocked() {
   if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
     gpr_log(GPR_INFO,
-            "[polling resolver %p] re-resolution timer fired: error=\"%s\", "
-            "shutdown_=%d",
-            this, StatusToString(error).c_str(), shutdown_);
+            "[polling resolver %p] re-resolution timer fired: shutdown_=%d",
+            this, shutdown_);
   }
-  have_next_resolution_timer_ = false;
-  if (error.ok() && !shutdown_) {
+  next_resolution_timer_handle_.reset();
+  if (!shutdown_) {
     StartResolvingLocked();
   }
-  Unref(DEBUG_LOCATION, "retry-timer");
+}
+
+void PollingResolver::MaybeCancelNextResolutionTimer() {
+  if (next_resolution_timer_handle_.has_value()) {
+    if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
+      gpr_log(GPR_INFO, "[polling resolver %p] cancel re-resolution timer",
+              this);
+    }
+    channel_args_.GetObject<EventEngine>()->Cancel(
+        *next_resolution_timer_handle_);
+    next_resolution_timer_handle_.reset();
+  }
 }
 
 void PollingResolver::OnRequestComplete(Result result) {
@@ -188,10 +202,9 @@ void PollingResolver::GetResultStatus(absl::Status status) {
     // in a loop while draining the currently-held WorkSerializer.
     // Also see https://github.com/grpc/grpc/issues/26079.
     ExecCtx::Get()->InvalidateNow();
-    Timestamp next_try = backoff_.NextAttemptTime();
-    Duration timeout = next_try - Timestamp::Now();
-    GPR_ASSERT(!have_next_resolution_timer_);
-    have_next_resolution_timer_ = true;
+    const Timestamp next_try = backoff_.NextAttemptTime();
+    const Duration timeout = next_try - Timestamp::Now();
+    GPR_ASSERT(!next_resolution_timer_handle_.has_value());
     if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
       if (timeout > Duration::Zero()) {
         gpr_log(GPR_INFO, "[polling resolver %p] retrying in %" PRId64 " ms",
@@ -200,9 +213,7 @@ void PollingResolver::GetResultStatus(absl::Status status) {
         gpr_log(GPR_INFO, "[polling resolver %p] retrying immediately", this);
       }
     }
-    Ref(DEBUG_LOCATION, "next_resolution_timer").release();
-    GRPC_CLOSURE_INIT(&on_next_resolution_, OnNextResolution, this, nullptr);
-    grpc_timer_init(&next_resolution_timer_, next_try, &on_next_resolution_);
+    ScheduleNextResolutionTimer(timeout);
     // Reset result_status_state_.  Note that even if re-resolution was
     // requested while the result-health callback was pending, we can
     // ignore it here, because we are in backoff to re-resolve anyway.
@@ -213,7 +224,7 @@ void PollingResolver::GetResultStatus(absl::Status status) {
 void PollingResolver::MaybeStartResolvingLocked() {
   // If there is an existing timer, the time it fires is the earliest time we
   // can start the next resolution.
-  if (have_next_resolution_timer_) return;
+  if (next_resolution_timer_handle_.has_value()) return;
   if (last_resolution_timestamp_.has_value()) {
     // InvalidateNow to avoid getting stuck re-initializing this timer
     // in a loop while draining the currently-held WorkSerializer.
@@ -234,12 +245,7 @@ void PollingResolver::MaybeStartResolvingLocked() {
                 this, last_resolution_ago.millis(),
                 time_until_next_resolution.millis());
       }
-      have_next_resolution_timer_ = true;
-      Ref(DEBUG_LOCATION, "next_resolution_timer_cooldown").release();
-      GRPC_CLOSURE_INIT(&on_next_resolution_, OnNextResolution, this, nullptr);
-      grpc_timer_init(&next_resolution_timer_,
-                      Timestamp::Now() + time_until_next_resolution,
-                      &on_next_resolution_);
+      ScheduleNextResolutionTimer(time_until_next_resolution);
       return;
     }
   }
diff --git a/src/core/ext/filters/client_channel/resolver/polling_resolver.h b/src/core/ext/filters/client_channel/resolver/polling_resolver.h
index 8abbec16cfd..e696c5280dd 100644
--- a/src/core/ext/filters/client_channel/resolver/polling_resolver.h
+++ b/src/core/ext/filters/client_channel/resolver/polling_resolver.h
@@ -25,16 +25,15 @@
 #include "absl/status/status.h"
 #include "absl/types/optional.h"
 
+#include <grpc/event_engine/event_engine.h>
+
 #include "src/core/lib/backoff/backoff.h"
 #include "src/core/lib/channel/channel_args.h"
 #include "src/core/lib/debug/trace.h"
 #include "src/core/lib/gprpp/orphanable.h"
 #include "src/core/lib/gprpp/time.h"
 #include "src/core/lib/gprpp/work_serializer.h"
-#include "src/core/lib/iomgr/closure.h"
-#include "src/core/lib/iomgr/error.h"
 #include "src/core/lib/iomgr/iomgr_fwd.h"
-#include "src/core/lib/iomgr/timer.h"
 #include "src/core/lib/resolver/resolver.h"
 #include "src/core/lib/resolver/resolver_factory.h"
 
@@ -77,11 +76,11 @@ class PollingResolver : public Resolver {
   void StartResolvingLocked();
 
   void OnRequestCompleteLocked(Result result);
-
   void GetResultStatus(absl::Status status);
 
-  static void OnNextResolution(void* arg, grpc_error_handle error);
-  void OnNextResolutionLocked(grpc_error_handle error);
+  void ScheduleNextResolutionTimer(const Duration& timeout);
+  void OnNextResolutionLocked();
+  void MaybeCancelNextResolutionTimer();
 
   /// authority
   std::string authority_;
@@ -98,10 +97,6 @@ class PollingResolver : public Resolver {
   bool shutdown_ = false;
   /// are we currently resolving?
   OrphanablePtr<Orphanable> request_;
-  /// next resolution timer
-  bool have_next_resolution_timer_ = false;
-  grpc_timer next_resolution_timer_;
-  grpc_closure on_next_resolution_;
   /// min time between DNS requests
   Duration min_time_between_resolutions_;
   /// timestamp of last DNS request
@@ -116,6 +111,9 @@ class PollingResolver : public Resolver {
     kReresolutionRequestedWhileCallbackWasPending,
   };
   ResultStatusState result_status_state_ = ResultStatusState::kNone;
+  /// handle of the pending next-resolution timer; has a value only while one is scheduled
+  absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle>
+      next_resolution_timer_handle_;
 };
 
 }  // namespace grpc_core
diff --git a/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc b/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc
index 0be3f1cf12d..8fe88d068fa 100644
--- a/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc
+++ b/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc
@@ -62,6 +62,8 @@
 #include "src/core/lib/uri/uri_parser.h"
 #include "test/core/util/test_config.h"
 
+using ::grpc_event_engine::experimental::GetDefaultEventEngine;
+
 constexpr int kMinResolutionPeriodMs = 1000;
 
 static std::shared_ptr<grpc_core::WorkSerializer>* g_work_serializer;
@@ -91,7 +93,7 @@ class TestDNSResolver : public grpc_core::DNSResolver {
   explicit TestDNSResolver(
       std::shared_ptr<grpc_core::DNSResolver> default_resolver)
       : default_resolver_(std::move(default_resolver)),
-        engine_(grpc_event_engine::experimental::GetDefaultEventEngine()) {}
+        engine_(GetDefaultEventEngine()) {}
   // Wrapper around default resolve_address in order to count the number of
   // times we incur in a system-level name resolution.
   TaskHandle LookupHostname(
@@ -383,6 +385,7 @@ static void start_test_under_work_serializer(void* arg) {
       kMinResolutionPeriodMs);
   grpc_channel_args cooldown_args = {1, &cooldown_arg};
   args.args = grpc_core::ChannelArgs::FromC(&cooldown_args);
+  args.args = args.args.SetObject(GetDefaultEventEngine());
   res_cb_arg->resolver = factory->CreateResolver(std::move(args));
   ASSERT_NE(res_cb_arg->resolver, nullptr);
   // First resolution, would incur in system-level resolution.
diff --git a/test/core/client_channel/resolvers/dns_resolver_test.cc b/test/core/client_channel/resolvers/dns_resolver_test.cc
index c6ea07ed817..c3355bcb5f7 100644
--- a/test/core/client_channel/resolvers/dns_resolver_test.cc
+++ b/test/core/client_channel/resolvers/dns_resolver_test.cc
@@ -28,7 +28,9 @@
 #include <grpc/support/log.h>
 
 #include "src/core/ext/filters/client_channel/resolver/dns/dns_resolver_selection.h"
+#include "src/core/lib/channel/channel_args.h"
 #include "src/core/lib/config/core_configuration.h"
+#include "src/core/lib/event_engine/default_event_engine.h"
 #include "src/core/lib/gpr/string.h"
 #include "src/core/lib/gprpp/global_config_generic.h"
 #include "src/core/lib/gprpp/memory.h"
@@ -41,6 +43,8 @@
 #include "src/core/lib/uri/uri_parser.h"
 #include "test/core/util/test_config.h"
 
+using ::grpc_event_engine::experimental::GetDefaultEventEngine;
+
 static std::shared_ptr<grpc_core::WorkSerializer>* g_work_serializer;
 
 class TestResultHandler : public grpc_core::Resolver::ResultHandler {
@@ -54,13 +58,13 @@ static void test_succeeds(grpc_core::ResolverFactory* factory,
   grpc_core::ExecCtx exec_ctx;
   absl::StatusOr<grpc_core::URI> uri = grpc_core::URI::Parse(string);
   if (!uri.ok()) {
-    gpr_log(GPR_ERROR, "%s", uri.status().ToString().c_str());
-    ASSERT_TRUE(uri.ok());
+    FAIL() << "Error: " << uri.status().ToString();
   }
   grpc_core::ResolverArgs args;
   args.uri = std::move(*uri);
   args.work_serializer = *g_work_serializer;
   args.result_handler = std::make_unique<TestResultHandler>();
+  args.args = args.args.SetObject(GetDefaultEventEngine());
   grpc_core::OrphanablePtr<grpc_core::Resolver> resolver =
       factory->CreateResolver(std::move(args));
   ASSERT_NE(resolver, nullptr);
@@ -73,13 +77,13 @@ static void test_fails(grpc_core::ResolverFactory* factory,
   grpc_core::ExecCtx exec_ctx;
   absl::StatusOr<grpc_core::URI> uri = grpc_core::URI::Parse(string);
   if (!uri.ok()) {
-    gpr_log(GPR_ERROR, "%s", uri.status().ToString().c_str());
-    ASSERT_TRUE(uri.ok());
+    FAIL() << "Error: " << uri.status().ToString();
   }
   grpc_core::ResolverArgs args;
   args.uri = std::move(*uri);
   args.work_serializer = *g_work_serializer;
   args.result_handler = std::make_unique<TestResultHandler>();
+  args.args = args.args.SetObject(GetDefaultEventEngine());
   grpc_core::OrphanablePtr<grpc_core::Resolver> resolver =
       factory->CreateResolver(std::move(args));
   ASSERT_EQ(resolver, nullptr);
diff --git a/test/cpp/naming/cancel_ares_query_test.cc b/test/cpp/naming/cancel_ares_query_test.cc
index b8b16e3c425..1ab3672af82 100644
--- a/test/cpp/naming/cancel_ares_query_test.cc
+++ b/test/cpp/naming/cancel_ares_query_test.cc
@@ -38,6 +38,7 @@
 #include "src/core/lib/config/core_configuration.h"
 #include "src/core/lib/debug/stats.h"
 #include "src/core/lib/debug/stats_data.h"
+#include "src/core/lib/event_engine/default_event_engine.h"
 #include "src/core/lib/gpr/string.h"
 #include "src/core/lib/gprpp/orphanable.h"
 #include "src/core/lib/gprpp/thd.h"
@@ -64,6 +65,8 @@
 
 namespace {
 
+using ::grpc_event_engine::experimental::GetDefaultEventEngine;
+
 void* Tag(intptr_t t) { return reinterpret_cast<void*>(t); }
 
 gpr_timespec FiveSecondsFromNow(void) {
@@ -166,8 +169,9 @@ void TestCancelActiveDNSQuery(ArgsStruct* args) {
   // create resolver and resolve
   grpc_core::OrphanablePtr<grpc_core::Resolver> resolver =
       grpc_core::CoreConfiguration::Get().resolver_registry().CreateResolver(
-          client_target.c_str(), grpc_core::ChannelArgs(), args->pollset_set,
-          args->lock,
+          client_target.c_str(),
+          grpc_core::ChannelArgs().SetObject(GetDefaultEventEngine()),
+          args->pollset_set, args->lock,
           std::unique_ptr<grpc_core::Resolver::ResultHandler>(
               new AssertFailureResultHandler(args)));
   resolver->StartLocked();
diff --git a/test/cpp/naming/resolver_component_test.cc b/test/cpp/naming/resolver_component_test.cc
index 9f60fd8e764..7d2fa31995f 100644
--- a/test/cpp/naming/resolver_component_test.cc
+++ b/test/cpp/naming/resolver_component_test.cc
@@ -47,6 +47,7 @@
 #include "src/core/lib/address_utils/sockaddr_utils.h"
 #include "src/core/lib/channel/channel_args.h"
 #include "src/core/lib/config/core_configuration.h"
+#include "src/core/lib/event_engine/default_event_engine.h"
 #include "src/core/lib/gpr/string.h"
 #include "src/core/lib/gprpp/host_port.h"
 #include "src/core/lib/gprpp/orphanable.h"
@@ -77,6 +78,7 @@
 #define BAD_SOCKET_RETURN_VAL (-1)
 #endif
 
+using ::grpc_event_engine::experimental::GetDefaultEventEngine;
 using std::vector;
 using testing::UnorderedElementsAreArray;
 
@@ -629,6 +631,7 @@ void RunResolvesRelevantRecordsTest(
     gpr_log(GPR_DEBUG, "Invalid value for --enable_txt_queries.");
     abort();
   }
+  resolver_args = resolver_args.SetObject(GetDefaultEventEngine());
   // create resolver and resolve
   grpc_core::OrphanablePtr<grpc_core::Resolver> resolver =
       grpc_core::CoreConfiguration::Get().resolver_registry().CreateResolver(