EventEngine::RunAfter: PollingResolver (#31717)

* EventEngine::RunAfter: polling_resolver

initial draft

* restore some iwyu change

* fix: get rid of OnNextResolution

* fix some test crashes

* fix more tests

* clang-tidy

* review

* clang-tidy

* fix

* review

* review

* review

* fix use-after-move

* revert

* review
pull/31811/head
Yijie Ma 2 years ago committed by GitHub
parent 5e4d9f4bcf
commit 6b79989d86
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      src/core/BUILD
  2. 2
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h
  3. 72
      src/core/ext/filters/client_channel/resolver/polling_resolver.cc
  4. 18
      src/core/ext/filters/client_channel/resolver/polling_resolver.h
  5. 5
      test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc
  6. 12
      test/core/client_channel/resolvers/dns_resolver_test.cc
  7. 8
      test/cpp/naming/cancel_ares_query_test.cc
  8. 3
      test/cpp/naming/resolver_component_test.cc

@ -4439,19 +4439,16 @@ grpc_cc_library(
language = "c++",
deps = [
"channel_args",
"closure",
"error",
"grpc_service_config",
"iomgr_fwd",
"status_helper",
"time",
"//:backoff",
"//:debug_location",
"//:event_engine_base_hdrs",
"//:exec_ctx",
"//:gpr",
"//:grpc_resolver",
"//:grpc_trace",
"//:iomgr_timer",
"//:orphanable",
"//:ref_counted_ptr",
"//:uri_parser",

@ -70,7 +70,7 @@ struct grpc_ares_request {
ABSL_GUARDED_BY(mu);
/** the pointer to receive the service config in JSON */
char** service_config_json_out ABSL_GUARDED_BY(mu) = nullptr;
/** the evernt driver used by this request */
/** the event driver used by this request */
grpc_ares_ev_driver* ev_driver ABSL_GUARDED_BY(mu) = nullptr;
/** number of ongoing queries */
size_t pending_queries ABSL_GUARDED_BY(mu) = 0;

@ -36,15 +36,15 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/gprpp/work_serializer.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/service_config/service_config.h"
#include "src/core/lib/uri/uri_parser.h"
namespace grpc_core {
using ::grpc_event_engine::experimental::EventEngine;
PollingResolver::PollingResolver(ResolverArgs args,
const ChannelArgs& channel_args,
Duration min_time_between_resolutions,
@ -88,9 +88,7 @@ void PollingResolver::RequestReresolutionLocked() {
}
void PollingResolver::ResetBackoffLocked() {
if (have_next_resolution_timer_) {
grpc_timer_cancel(&next_resolution_timer_);
}
MaybeCancelNextResolutionTimer();
backoff_.Reset();
}
@ -99,30 +97,46 @@ void PollingResolver::ShutdownLocked() {
gpr_log(GPR_INFO, "[polling resolver %p] shutting down", this);
}
shutdown_ = true;
if (have_next_resolution_timer_) {
grpc_timer_cancel(&next_resolution_timer_);
}
MaybeCancelNextResolutionTimer();
request_.reset();
}
void PollingResolver::OnNextResolution(void* arg, grpc_error_handle error) {
auto* self = static_cast<PollingResolver*>(arg);
self->work_serializer_->Run(
[self, error]() { self->OnNextResolutionLocked(error); }, DEBUG_LOCATION);
void PollingResolver::ScheduleNextResolutionTimer(const Duration& timeout) {
RefCountedPtr<PollingResolver> self = Ref();
next_resolution_timer_handle_ =
channel_args_.GetObject<EventEngine>()->RunAfter(
timeout, [self = std::move(self)]() mutable {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
auto* self_ptr = self.get();
self_ptr->work_serializer_->Run(
[self = std::move(self)]() { self->OnNextResolutionLocked(); },
DEBUG_LOCATION);
});
}
void PollingResolver::OnNextResolutionLocked(grpc_error_handle error) {
void PollingResolver::OnNextResolutionLocked() {
if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
gpr_log(GPR_INFO,
"[polling resolver %p] re-resolution timer fired: error=\"%s\", "
"shutdown_=%d",
this, StatusToString(error).c_str(), shutdown_);
"[polling resolver %p] re-resolution timer fired: shutdown_=%d",
this, shutdown_);
}
have_next_resolution_timer_ = false;
if (error.ok() && !shutdown_) {
next_resolution_timer_handle_.reset();
if (!shutdown_) {
StartResolvingLocked();
}
Unref(DEBUG_LOCATION, "retry-timer");
}
void PollingResolver::MaybeCancelNextResolutionTimer() {
if (next_resolution_timer_handle_.has_value()) {
if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
gpr_log(GPR_INFO, "[polling resolver %p] cancel re-resolution timer",
this);
}
channel_args_.GetObject<EventEngine>()->Cancel(
*next_resolution_timer_handle_);
next_resolution_timer_handle_.reset();
}
}
void PollingResolver::OnRequestComplete(Result result) {
@ -188,10 +202,9 @@ void PollingResolver::GetResultStatus(absl::Status status) {
// in a loop while draining the currently-held WorkSerializer.
// Also see https://github.com/grpc/grpc/issues/26079.
ExecCtx::Get()->InvalidateNow();
Timestamp next_try = backoff_.NextAttemptTime();
Duration timeout = next_try - Timestamp::Now();
GPR_ASSERT(!have_next_resolution_timer_);
have_next_resolution_timer_ = true;
const Timestamp next_try = backoff_.NextAttemptTime();
const Duration timeout = next_try - Timestamp::Now();
GPR_ASSERT(!next_resolution_timer_handle_.has_value());
if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
if (timeout > Duration::Zero()) {
gpr_log(GPR_INFO, "[polling resolver %p] retrying in %" PRId64 " ms",
@ -200,9 +213,7 @@ void PollingResolver::GetResultStatus(absl::Status status) {
gpr_log(GPR_INFO, "[polling resolver %p] retrying immediately", this);
}
}
Ref(DEBUG_LOCATION, "next_resolution_timer").release();
GRPC_CLOSURE_INIT(&on_next_resolution_, OnNextResolution, this, nullptr);
grpc_timer_init(&next_resolution_timer_, next_try, &on_next_resolution_);
ScheduleNextResolutionTimer(timeout);
// Reset result_status_state_. Note that even if re-resolution was
// requested while the result-health callback was pending, we can
// ignore it here, because we are in backoff to re-resolve anyway.
@ -213,7 +224,7 @@ void PollingResolver::GetResultStatus(absl::Status status) {
void PollingResolver::MaybeStartResolvingLocked() {
// If there is an existing timer, the time it fires is the earliest time we
// can start the next resolution.
if (have_next_resolution_timer_) return;
if (next_resolution_timer_handle_.has_value()) return;
if (last_resolution_timestamp_.has_value()) {
// InvalidateNow to avoid getting stuck re-initializing this timer
// in a loop while draining the currently-held WorkSerializer.
@ -234,12 +245,7 @@ void PollingResolver::MaybeStartResolvingLocked() {
this, last_resolution_ago.millis(),
time_until_next_resolution.millis());
}
have_next_resolution_timer_ = true;
Ref(DEBUG_LOCATION, "next_resolution_timer_cooldown").release();
GRPC_CLOSURE_INIT(&on_next_resolution_, OnNextResolution, this, nullptr);
grpc_timer_init(&next_resolution_timer_,
Timestamp::Now() + time_until_next_resolution,
&on_next_resolution_);
ScheduleNextResolutionTimer(time_until_next_resolution);
return;
}
}

@ -25,16 +25,15 @@
#include "absl/status/status.h"
#include "absl/types/optional.h"
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/gprpp/work_serializer.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/resolver/resolver.h"
#include "src/core/lib/resolver/resolver_factory.h"
@ -77,11 +76,11 @@ class PollingResolver : public Resolver {
void StartResolvingLocked();
void OnRequestCompleteLocked(Result result);
void GetResultStatus(absl::Status status);
static void OnNextResolution(void* arg, grpc_error_handle error);
void OnNextResolutionLocked(grpc_error_handle error);
void ScheduleNextResolutionTimer(const Duration& timeout);
void OnNextResolutionLocked();
void MaybeCancelNextResolutionTimer();
/// authority
std::string authority_;
@ -98,10 +97,6 @@ class PollingResolver : public Resolver {
bool shutdown_ = false;
/// are we currently resolving?
OrphanablePtr<Orphanable> request_;
/// next resolution timer
bool have_next_resolution_timer_ = false;
grpc_timer next_resolution_timer_;
grpc_closure on_next_resolution_;
/// min time between DNS requests
Duration min_time_between_resolutions_;
/// timestamp of last DNS request
@ -116,6 +111,9 @@ class PollingResolver : public Resolver {
kReresolutionRequestedWhileCallbackWasPending,
};
ResultStatusState result_status_state_ = ResultStatusState::kNone;
/// next resolution timer
absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle>
next_resolution_timer_handle_;
};
} // namespace grpc_core

@ -62,6 +62,8 @@
#include "src/core/lib/uri/uri_parser.h"
#include "test/core/util/test_config.h"
using ::grpc_event_engine::experimental::GetDefaultEventEngine;
constexpr int kMinResolutionPeriodMs = 1000;
static std::shared_ptr<grpc_core::WorkSerializer>* g_work_serializer;
@ -91,7 +93,7 @@ class TestDNSResolver : public grpc_core::DNSResolver {
explicit TestDNSResolver(
std::shared_ptr<grpc_core::DNSResolver> default_resolver)
: default_resolver_(std::move(default_resolver)),
engine_(grpc_event_engine::experimental::GetDefaultEventEngine()) {}
engine_(GetDefaultEventEngine()) {}
// Wrapper around default resolve_address in order to count the number of
// times we incur in a system-level name resolution.
TaskHandle LookupHostname(
@ -383,6 +385,7 @@ static void start_test_under_work_serializer(void* arg) {
kMinResolutionPeriodMs);
grpc_channel_args cooldown_args = {1, &cooldown_arg};
args.args = grpc_core::ChannelArgs::FromC(&cooldown_args);
args.args = args.args.SetObject(GetDefaultEventEngine());
res_cb_arg->resolver = factory->CreateResolver(std::move(args));
ASSERT_NE(res_cb_arg->resolver, nullptr);
// First resolution, would incur in system-level resolution.

@ -28,7 +28,9 @@
#include <grpc/support/log.h>
#include "src/core/ext/filters/client_channel/resolver/dns/dns_resolver_selection.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/global_config_generic.h"
#include "src/core/lib/gprpp/memory.h"
@ -41,6 +43,8 @@
#include "src/core/lib/uri/uri_parser.h"
#include "test/core/util/test_config.h"
using ::grpc_event_engine::experimental::GetDefaultEventEngine;
static std::shared_ptr<grpc_core::WorkSerializer>* g_work_serializer;
class TestResultHandler : public grpc_core::Resolver::ResultHandler {
@ -54,13 +58,13 @@ static void test_succeeds(grpc_core::ResolverFactory* factory,
grpc_core::ExecCtx exec_ctx;
absl::StatusOr<grpc_core::URI> uri = grpc_core::URI::Parse(string);
if (!uri.ok()) {
gpr_log(GPR_ERROR, "%s", uri.status().ToString().c_str());
ASSERT_TRUE(uri.ok());
FAIL() << "Error: " << uri.status().ToString();
}
grpc_core::ResolverArgs args;
args.uri = std::move(*uri);
args.work_serializer = *g_work_serializer;
args.result_handler = std::make_unique<TestResultHandler>();
args.args = args.args.SetObject(GetDefaultEventEngine());
grpc_core::OrphanablePtr<grpc_core::Resolver> resolver =
factory->CreateResolver(std::move(args));
ASSERT_NE(resolver, nullptr);
@ -73,13 +77,13 @@ static void test_fails(grpc_core::ResolverFactory* factory,
grpc_core::ExecCtx exec_ctx;
absl::StatusOr<grpc_core::URI> uri = grpc_core::URI::Parse(string);
if (!uri.ok()) {
gpr_log(GPR_ERROR, "%s", uri.status().ToString().c_str());
ASSERT_TRUE(uri.ok());
FAIL() << "Error: " << uri.status().ToString();
}
grpc_core::ResolverArgs args;
args.uri = std::move(*uri);
args.work_serializer = *g_work_serializer;
args.result_handler = std::make_unique<TestResultHandler>();
args.args = args.args.SetObject(GetDefaultEventEngine());
grpc_core::OrphanablePtr<grpc_core::Resolver> resolver =
factory->CreateResolver(std::move(args));
ASSERT_EQ(resolver, nullptr);

@ -38,6 +38,7 @@
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/stats_data.h"
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/thd.h"
@ -64,6 +65,8 @@
namespace {
using ::grpc_event_engine::experimental::GetDefaultEventEngine;
void* Tag(intptr_t t) { return reinterpret_cast<void*>(t); }
gpr_timespec FiveSecondsFromNow(void) {
@ -166,8 +169,9 @@ void TestCancelActiveDNSQuery(ArgsStruct* args) {
// create resolver and resolve
grpc_core::OrphanablePtr<grpc_core::Resolver> resolver =
grpc_core::CoreConfiguration::Get().resolver_registry().CreateResolver(
client_target.c_str(), grpc_core::ChannelArgs(), args->pollset_set,
args->lock,
client_target.c_str(),
grpc_core::ChannelArgs().SetObject(GetDefaultEventEngine()),
args->pollset_set, args->lock,
std::unique_ptr<grpc_core::Resolver::ResultHandler>(
new AssertFailureResultHandler(args)));
resolver->StartLocked();

@ -47,6 +47,7 @@
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/host_port.h"
#include "src/core/lib/gprpp/orphanable.h"
@ -77,6 +78,7 @@
#define BAD_SOCKET_RETURN_VAL (-1)
#endif
using ::grpc_event_engine::experimental::GetDefaultEventEngine;
using std::vector;
using testing::UnorderedElementsAreArray;
@ -629,6 +631,7 @@ void RunResolvesRelevantRecordsTest(
gpr_log(GPR_DEBUG, "Invalid value for --enable_txt_queries.");
abort();
}
resolver_args = resolver_args.SetObject(GetDefaultEventEngine());
// create resolver and resolve
grpc_core::OrphanablePtr<grpc_core::Resolver> resolver =
grpc_core::CoreConfiguration::Get().resolver_registry().CreateResolver(

Loading…
Cancel
Save