diff --git a/src/core/BUILD b/src/core/BUILD index 374df712f5c..904b77f0cb1 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -4439,19 +4439,16 @@ grpc_cc_library( language = "c++", deps = [ "channel_args", - "closure", - "error", "grpc_service_config", "iomgr_fwd", - "status_helper", "time", "//:backoff", "//:debug_location", + "//:event_engine_base_hdrs", "//:exec_ctx", "//:gpr", "//:grpc_resolver", "//:grpc_trace", - "//:iomgr_timer", "//:orphanable", "//:ref_counted_ptr", "//:uri_parser", diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h index 3deb9ca3f40..c949c813d44 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h @@ -70,7 +70,7 @@ struct grpc_ares_request { ABSL_GUARDED_BY(mu); /** the pointer to receive the service config in JSON */ char** service_config_json_out ABSL_GUARDED_BY(mu) = nullptr; - /** the evernt driver used by this request */ + /** the event driver used by this request */ grpc_ares_ev_driver* ev_driver ABSL_GUARDED_BY(mu) = nullptr; /** number of ongoing queries */ size_t pending_queries ABSL_GUARDED_BY(mu) = 0; diff --git a/src/core/ext/filters/client_channel/resolver/polling_resolver.cc b/src/core/ext/filters/client_channel/resolver/polling_resolver.cc index 57dadfca353..e0cd4debe6c 100644 --- a/src/core/ext/filters/client_channel/resolver/polling_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/polling_resolver.cc @@ -36,15 +36,15 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" -#include "src/core/lib/gprpp/status_helper.h" #include "src/core/lib/gprpp/work_serializer.h" #include "src/core/lib/iomgr/exec_ctx.h" -#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/service_config/service_config.h" #include "src/core/lib/uri/uri_parser.h" namespace grpc_core { +using ::grpc_event_engine::experimental::EventEngine; + PollingResolver::PollingResolver(ResolverArgs args, const ChannelArgs& channel_args, Duration min_time_between_resolutions, @@ -88,9 +88,7 @@ void PollingResolver::RequestReresolutionLocked() { } void PollingResolver::ResetBackoffLocked() { - if (have_next_resolution_timer_) { - grpc_timer_cancel(&next_resolution_timer_); - } + MaybeCancelNextResolutionTimer(); backoff_.Reset(); } @@ -99,30 +97,46 @@ void PollingResolver::ShutdownLocked() { gpr_log(GPR_INFO, "[polling resolver %p] shutting down", this); } shutdown_ = true; - if (have_next_resolution_timer_) { - grpc_timer_cancel(&next_resolution_timer_); - } + MaybeCancelNextResolutionTimer(); request_.reset(); } -void PollingResolver::OnNextResolution(void* arg, grpc_error_handle error) { - auto* self = static_cast(arg); - self->work_serializer_->Run( - [self, error]() { self->OnNextResolutionLocked(error); }, DEBUG_LOCATION); +void PollingResolver::ScheduleNextResolutionTimer(const Duration& timeout) { + RefCountedPtr self = Ref(); + next_resolution_timer_handle_ = + channel_args_.GetObject()->RunAfter( + timeout, [self = std::move(self)]() mutable { + ApplicationCallbackExecCtx callback_exec_ctx; + ExecCtx exec_ctx; + auto* self_ptr = self.get(); + self_ptr->work_serializer_->Run( + [self = std::move(self)]() { self->OnNextResolutionLocked(); }, + DEBUG_LOCATION); + }); } -void PollingResolver::OnNextResolutionLocked(grpc_error_handle error) { +void PollingResolver::OnNextResolutionLocked() { if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) { gpr_log(GPR_INFO, - "[polling resolver %p] re-resolution timer fired: error=\"%s\", " - "shutdown_=%d", - this, StatusToString(error).c_str(), shutdown_); + "[polling resolver %p] re-resolution timer fired: shutdown_=%d", + this, shutdown_); } - have_next_resolution_timer_ = false; - if (error.ok() && !shutdown_) { + next_resolution_timer_handle_.reset(); + if (!shutdown_) { StartResolvingLocked(); } - Unref(DEBUG_LOCATION, "retry-timer"); +} + +void PollingResolver::MaybeCancelNextResolutionTimer() { + if (next_resolution_timer_handle_.has_value()) { + if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) { + gpr_log(GPR_INFO, "[polling resolver %p] cancel re-resolution timer", + this); + } + channel_args_.GetObject()->Cancel( + *next_resolution_timer_handle_); + next_resolution_timer_handle_.reset(); + } } void PollingResolver::OnRequestComplete(Result result) { @@ -188,10 +202,9 @@ void PollingResolver::GetResultStatus(absl::Status status) { // in a loop while draining the currently-held WorkSerializer. // Also see https://github.com/grpc/grpc/issues/26079. ExecCtx::Get()->InvalidateNow(); - Timestamp next_try = backoff_.NextAttemptTime(); - Duration timeout = next_try - Timestamp::Now(); - GPR_ASSERT(!have_next_resolution_timer_); - have_next_resolution_timer_ = true; + const Timestamp next_try = backoff_.NextAttemptTime(); + const Duration timeout = next_try - Timestamp::Now(); + GPR_ASSERT(!next_resolution_timer_handle_.has_value()); if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) { if (timeout > Duration::Zero()) { gpr_log(GPR_INFO, "[polling resolver %p] retrying in %" PRId64 " ms", @@ -200,9 +213,7 @@ void PollingResolver::GetResultStatus(absl::Status status) { gpr_log(GPR_INFO, "[polling resolver %p] retrying immediately", this); } } - Ref(DEBUG_LOCATION, "next_resolution_timer").release(); - GRPC_CLOSURE_INIT(&on_next_resolution_, OnNextResolution, this, nullptr); - grpc_timer_init(&next_resolution_timer_, next_try, &on_next_resolution_); + ScheduleNextResolutionTimer(timeout); // Reset result_status_state_. Note that even if re-resolution was // requested while the result-health callback was pending, we can // ignore it here, because we are in backoff to re-resolve anyway. @@ -213,7 +224,7 @@ void PollingResolver::GetResultStatus(absl::Status status) { void PollingResolver::MaybeStartResolvingLocked() { // If there is an existing timer, the time it fires is the earliest time we // can start the next resolution. - if (have_next_resolution_timer_) return; + if (next_resolution_timer_handle_.has_value()) return; if (last_resolution_timestamp_.has_value()) { // InvalidateNow to avoid getting stuck re-initializing this timer // in a loop while draining the currently-held WorkSerializer. @@ -234,12 +245,7 @@ void PollingResolver::MaybeStartResolvingLocked() { this, last_resolution_ago.millis(), time_until_next_resolution.millis()); } - have_next_resolution_timer_ = true; - Ref(DEBUG_LOCATION, "next_resolution_timer_cooldown").release(); - GRPC_CLOSURE_INIT(&on_next_resolution_, OnNextResolution, this, nullptr); - grpc_timer_init(&next_resolution_timer_, - Timestamp::Now() + time_until_next_resolution, - &on_next_resolution_); + ScheduleNextResolutionTimer(time_until_next_resolution); return; } } diff --git a/src/core/ext/filters/client_channel/resolver/polling_resolver.h b/src/core/ext/filters/client_channel/resolver/polling_resolver.h index 8abbec16cfd..e696c5280dd 100644 --- a/src/core/ext/filters/client_channel/resolver/polling_resolver.h +++ b/src/core/ext/filters/client_channel/resolver/polling_resolver.h @@ -25,16 +25,15 @@ #include "absl/status/status.h" #include "absl/types/optional.h" +#include + #include "src/core/lib/backoff/backoff.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/time.h" #include "src/core/lib/gprpp/work_serializer.h" -#include "src/core/lib/iomgr/closure.h" -#include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/iomgr_fwd.h" -#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/resolver/resolver.h" #include "src/core/lib/resolver/resolver_factory.h" @@ -77,11 +76,11 @@ class PollingResolver : public Resolver { void StartResolvingLocked(); void OnRequestCompleteLocked(Result result); - void GetResultStatus(absl::Status status); - static void OnNextResolution(void* arg, grpc_error_handle error); - void OnNextResolutionLocked(grpc_error_handle error); + void ScheduleNextResolutionTimer(const Duration& timeout); + void OnNextResolutionLocked(); + void MaybeCancelNextResolutionTimer(); /// authority std::string authority_; @@ -98,10 +97,6 @@ class PollingResolver : public Resolver { bool shutdown_ = false; /// are we currently resolving? OrphanablePtr request_; - /// next resolution timer - bool have_next_resolution_timer_ = false; - grpc_timer next_resolution_timer_; - grpc_closure on_next_resolution_; /// min time between DNS requests Duration min_time_between_resolutions_; /// timestamp of last DNS request @@ -116,6 +111,9 @@ class PollingResolver : public Resolver { kReresolutionRequestedWhileCallbackWasPending, }; ResultStatusState result_status_state_ = ResultStatusState::kNone; + /// next resolution timer + absl::optional + next_resolution_timer_handle_; }; } // namespace grpc_core diff --git a/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc b/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc index 0be3f1cf12d..8fe88d068fa 100644 --- a/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc +++ b/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc @@ -62,6 +62,8 @@ #include "src/core/lib/uri/uri_parser.h" #include "test/core/util/test_config.h" +using ::grpc_event_engine::experimental::GetDefaultEventEngine; + constexpr int kMinResolutionPeriodMs = 1000; static std::shared_ptr* g_work_serializer; @@ -91,7 +93,7 @@ class TestDNSResolver : public grpc_core::DNSResolver { explicit TestDNSResolver( std::shared_ptr default_resolver) : default_resolver_(std::move(default_resolver)), - engine_(grpc_event_engine::experimental::GetDefaultEventEngine()) {} + engine_(GetDefaultEventEngine()) {} // Wrapper around default resolve_address in order to count the number of // times we incur in a system-level name resolution. TaskHandle LookupHostname( @@ -383,6 +385,7 @@ static void start_test_under_work_serializer(void* arg) { kMinResolutionPeriodMs); grpc_channel_args cooldown_args = {1, &cooldown_arg}; args.args = grpc_core::ChannelArgs::FromC(&cooldown_args); + args.args = args.args.SetObject(GetDefaultEventEngine()); res_cb_arg->resolver = factory->CreateResolver(std::move(args)); ASSERT_NE(res_cb_arg->resolver, nullptr); // First resolution, would incur in system-level resolution. diff --git a/test/core/client_channel/resolvers/dns_resolver_test.cc b/test/core/client_channel/resolvers/dns_resolver_test.cc index c6ea07ed817..c3355bcb5f7 100644 --- a/test/core/client_channel/resolvers/dns_resolver_test.cc +++ b/test/core/client_channel/resolvers/dns_resolver_test.cc @@ -28,7 +28,9 @@ #include #include "src/core/ext/filters/client_channel/resolver/dns/dns_resolver_selection.h" +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/config/core_configuration.h" +#include "src/core/lib/event_engine/default_event_engine.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gprpp/global_config_generic.h" #include "src/core/lib/gprpp/memory.h" @@ -41,6 +43,8 @@ #include "src/core/lib/uri/uri_parser.h" #include "test/core/util/test_config.h" +using ::grpc_event_engine::experimental::GetDefaultEventEngine; + static std::shared_ptr* g_work_serializer; class TestResultHandler : public grpc_core::Resolver::ResultHandler { @@ -54,13 +58,13 @@ static void test_succeeds(grpc_core::ResolverFactory* factory, grpc_core::ExecCtx exec_ctx; absl::StatusOr uri = grpc_core::URI::Parse(string); if (!uri.ok()) { - gpr_log(GPR_ERROR, "%s", uri.status().ToString().c_str()); - ASSERT_TRUE(uri.ok()); + FAIL() << "Error: " << uri.status().ToString(); } grpc_core::ResolverArgs args; args.uri = std::move(*uri); args.work_serializer = *g_work_serializer; args.result_handler = std::make_unique(); + args.args = args.args.SetObject(GetDefaultEventEngine()); grpc_core::OrphanablePtr resolver = factory->CreateResolver(std::move(args)); ASSERT_NE(resolver, nullptr); @@ -73,13 +77,13 @@ static void test_fails(grpc_core::ResolverFactory* factory, grpc_core::ExecCtx exec_ctx; absl::StatusOr uri = grpc_core::URI::Parse(string); if (!uri.ok()) { - gpr_log(GPR_ERROR, "%s", uri.status().ToString().c_str()); - ASSERT_TRUE(uri.ok()); + FAIL() << "Error: " << uri.status().ToString(); } grpc_core::ResolverArgs args; args.uri = std::move(*uri); args.work_serializer = *g_work_serializer; args.result_handler = std::make_unique(); + args.args = args.args.SetObject(GetDefaultEventEngine()); grpc_core::OrphanablePtr resolver = factory->CreateResolver(std::move(args)); ASSERT_EQ(resolver, nullptr); diff --git a/test/cpp/naming/cancel_ares_query_test.cc b/test/cpp/naming/cancel_ares_query_test.cc index b8b16e3c425..1ab3672af82 100644 --- a/test/cpp/naming/cancel_ares_query_test.cc +++ b/test/cpp/naming/cancel_ares_query_test.cc @@ -38,6 +38,7 @@ #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/debug/stats.h" #include "src/core/lib/debug/stats_data.h" +#include "src/core/lib/event_engine/default_event_engine.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/thd.h" @@ -64,6 +65,8 @@ namespace { +using ::grpc_event_engine::experimental::GetDefaultEventEngine; + void* Tag(intptr_t t) { return reinterpret_cast(t); } gpr_timespec FiveSecondsFromNow(void) { @@ -166,8 +169,9 @@ void TestCancelActiveDNSQuery(ArgsStruct* args) { // create resolver and resolve grpc_core::OrphanablePtr resolver = grpc_core::CoreConfiguration::Get().resolver_registry().CreateResolver( - client_target.c_str(), grpc_core::ChannelArgs(), args->pollset_set, - args->lock, + client_target.c_str(), + grpc_core::ChannelArgs().SetObject(GetDefaultEventEngine()), + args->pollset_set, args->lock, std::unique_ptr( new AssertFailureResultHandler(args))); resolver->StartLocked(); diff --git a/test/cpp/naming/resolver_component_test.cc b/test/cpp/naming/resolver_component_test.cc index 9f60fd8e764..7d2fa31995f 100644 --- a/test/cpp/naming/resolver_component_test.cc +++ b/test/cpp/naming/resolver_component_test.cc @@ -47,6 +47,7 @@ #include "src/core/lib/address_utils/sockaddr_utils.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/config/core_configuration.h" +#include "src/core/lib/event_engine/default_event_engine.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gprpp/host_port.h" #include "src/core/lib/gprpp/orphanable.h" @@ -77,6 +78,7 @@ #define BAD_SOCKET_RETURN_VAL (-1) #endif +using ::grpc_event_engine::experimental::GetDefaultEventEngine; using std::vector; using testing::UnorderedElementsAreArray; @@ -629,6 +631,7 @@ void RunResolvesRelevantRecordsTest( gpr_log(GPR_DEBUG, "Invalid value for --enable_txt_queries."); abort(); } + resolver_args = resolver_args.SetObject(GetDefaultEventEngine()); // create resolver and resolve grpc_core::OrphanablePtr resolver = grpc_core::CoreConfiguration::Get().resolver_registry().CreateResolver(