[EventEngine] Simplify `EventEngine::DNSResolver` API (#33459)

This change simplifies `EventEngine::DNSResolver`'s API based on the
proposal:
[go/event-engine-dns-resolver-api-changes](http://go/event-engine-dns-resolver-api-changes).
Note that this API change + the implementation described in
[go/event-engine-dns-resolver-implementation](http://go/event-engine-dns-resolver-implementation)
has already been tested against our main test suites and are passing
them.


<!--

If you know who should review your pull request, please assign it to
that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the
appropriate
lang label.

-->
pull/33520/head
Yijie Ma 1 year ago committed by GitHub
parent 055158b932
commit 6e95cebbd0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      BUILD
  2. 45
      include/grpc/event_engine/event_engine.h
  3. 2
      src/core/BUILD
  4. 5
      src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
  5. 116
      src/core/ext/filters/client_channel/resolver/dns/event_engine/event_engine_client_channel_resolver.cc
  6. 12
      src/core/lib/event_engine/event_engine.cc
  7. 4
      src/core/lib/event_engine/handle_containers.h
  8. 16
      src/core/lib/event_engine/posix_engine/posix_engine.h
  9. 28
      src/core/lib/event_engine/thready_event_engine/thready_event_engine.cc
  10. 16
      src/core/lib/event_engine/thready_event_engine/thready_event_engine.h
  11. 16
      src/core/lib/event_engine/windows/windows_engine.h
  12. 14
      src/core/lib/iomgr/resolve_address.cc
  13. 20
      src/core/lib/iomgr/resolve_address.h
  14. 3
      test/core/event_engine/handle_tests.cc
  15. 71
      test/core/ext/filters/event_engine_client_channel_resolver/resolver_fuzzer.cc

@ -1473,6 +1473,7 @@ grpc_cc_library(
"absl/base:core_headers",
"absl/cleanup",
"absl/container:flat_hash_map",
"absl/container:flat_hash_set",
"absl/container:inlined_vector",
"absl/functional:any_invocable",
"absl/functional:function_ref",
@ -3126,7 +3127,6 @@ grpc_cc_library(
"//src/core:channel_args",
"//src/core:closure",
"//src/core:error",
"//src/core:event_engine_common",
"//src/core:grpc_service_config",
"//src/core:grpc_sockaddr",
"//src/core:iomgr_fwd",

@ -321,17 +321,14 @@ class EventEngine : public std::enable_shared_from_this<EventEngine> {
/// called.
virtual bool CancelConnect(ConnectionHandle handle) = 0;
/// Provides asynchronous resolution.
///
/// This object has a destruction-is-cancellation semantic.
/// Implementations should make sure that all pending requests are cancelled
/// when the object is destroyed and all pending callbacks will be called
/// shortly. If cancellation races with request completion, implementations
/// may choose to either cancel or satisfy the request.
class DNSResolver {
public:
/// Task handle for DNS Resolution requests.
struct LookupTaskHandle {
intptr_t keys[2];
static const LookupTaskHandle kInvalid;
friend bool operator==(const LookupTaskHandle& lhs,
const LookupTaskHandle& rhs);
friend bool operator!=(const LookupTaskHandle& lhs,
const LookupTaskHandle& rhs);
};
/// Optional configuration for DNSResolvers.
struct ResolverOptions {
/// If empty, default DNS servers will be used.
@ -363,37 +360,27 @@ class EventEngine : public std::enable_shared_from_this<EventEngine> {
/// \a default_port may be a non-numeric named service port, and will only
/// be used if \a address does not already contain a port component.
///
/// When the lookup is complete, the \a on_resolve callback will be invoked
/// with a status indicating the success or failure of the lookup.
/// Implementations should pass the appropriate statuses to the callback.
/// For example, callbacks might expect to receive DEADLINE_EXCEEDED or
/// When the lookup is complete or cancelled, the \a on_resolve callback
/// will be invoked with a status indicating the success or failure of the
/// lookup. Implementations should pass the appropriate statuses to the
/// callback. For example, callbacks might expect to receive CANCELLED or
/// NOT_FOUND.
///
/// If cancelled, \a on_resolve will not be executed.
virtual LookupTaskHandle LookupHostname(LookupHostnameCallback on_resolve,
virtual void LookupHostname(LookupHostnameCallback on_resolve,
absl::string_view name,
absl::string_view default_port,
Duration timeout) = 0;
absl::string_view default_port) = 0;
/// Asynchronously perform an SRV record lookup.
///
/// \a on_resolve has the same meaning and expectations as \a
/// LookupHostname's \a on_resolve callback.
virtual LookupTaskHandle LookupSRV(LookupSRVCallback on_resolve,
absl::string_view name,
Duration timeout) = 0;
virtual void LookupSRV(LookupSRVCallback on_resolve,
absl::string_view name) = 0;
/// Asynchronously perform a TXT record lookup.
///
/// \a on_resolve has the same meaning and expectations as \a
/// LookupHostname's \a on_resolve callback.
virtual LookupTaskHandle LookupTXT(LookupTXTCallback on_resolve,
absl::string_view name,
Duration timeout) = 0;
/// Cancel an asynchronous lookup operation.
///
/// This shares the same semantics with \a EventEngine::Cancel: successfully
/// cancelled lookups will not have their callbacks executed, and this
/// method returns true.
virtual bool CancelLookup(LookupTaskHandle handle) = 0;
virtual void LookupTXT(LookupTXTCallback on_resolve,
absl::string_view name) = 0;
};
/// At time of destruction, the EventEngine must have no active

@ -5131,7 +5131,6 @@ grpc_cc_library(
external_deps = [
"absl/base:core_headers",
"absl/cleanup",
"absl/container:flat_hash_set",
"absl/status",
"absl/status:statusor",
"absl/strings",
@ -5141,7 +5140,6 @@ grpc_cc_library(
deps = [
"channel_args",
"event_engine_common",
"event_engine_utils",
"grpc_service_config",
"polling_resolver",
"service_config_helper",

@ -33,7 +33,6 @@
#include "absl/strings/strip.h"
#include "absl/types/optional.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@ -70,7 +69,6 @@
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/config_vars.h"
#include "src/core/lib/event_engine/handle_containers.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/resolver/server_address.h"
#include "src/core/lib/service_config/service_config_impl.h"
@ -706,8 +704,7 @@ class AresDNSResolver : public DNSResolver {
// the previous default DNS resolver, used to delegate blocking DNS calls to
std::shared_ptr<DNSResolver> default_resolver_ = GetDNSResolver();
Mutex mu_;
grpc_event_engine::experimental::LookupTaskHandleSet open_requests_
ABSL_GUARDED_BY(mu_);
TaskHandleSet open_requests_ ABSL_GUARDED_BY(mu_);
intptr_t aba_token_ ABSL_GUARDED_BY(mu_) = 0;
};

@ -28,7 +28,6 @@
#include "absl/base/thread_annotations.h"
#include "absl/cleanup/cleanup.h"
#include "absl/container/flat_hash_set.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/match.h"
@ -46,9 +45,7 @@
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/event_engine/handle_containers.h"
#include "src/core/lib/event_engine/resolved_address_internal.h"
#include "src/core/lib/event_engine/utils.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
@ -73,8 +70,6 @@ namespace {
#define GRPC_DNS_DEFAULT_QUERY_TIMEOUT_MS 120000
using grpc_event_engine::experimental::EventEngine;
using grpc_event_engine::experimental::HandleToString;
using grpc_event_engine::experimental::LookupTaskHandleSet;
// TODO(hork): Investigate adding a resolver test scenario where the first
// balancer hostname lookup result is an error, and the second contains valid
@ -121,6 +116,7 @@ class EventEngineClientChannelDNSResolver : public PollingResolver {
void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS;
private:
void OnTimeout() ABSL_LOCKS_EXCLUDED(on_resolved_mu_);
void OnHostnameResolved(
absl::StatusOr<std::vector<EventEngine::ResolvedAddress>> addresses);
void OnSRVResolved(
@ -149,14 +145,9 @@ class EventEngineClientChannelDNSResolver : public PollingResolver {
RefCountedPtr<EventEngineClientChannelDNSResolver> resolver_;
Mutex on_resolved_mu_;
// Lookup callbacks
absl::optional<EventEngine::DNSResolver::LookupTaskHandle> hostname_handle_
ABSL_GUARDED_BY(on_resolved_mu_);
absl::optional<EventEngine::DNSResolver::LookupTaskHandle> srv_handle_
ABSL_GUARDED_BY(on_resolved_mu_);
absl::optional<EventEngine::DNSResolver::LookupTaskHandle> txt_handle_
ABSL_GUARDED_BY(on_resolved_mu_);
LookupTaskHandleSet balancer_hostname_handles_
ABSL_GUARDED_BY(on_resolved_mu_);
bool is_hostname_inflight_ ABSL_GUARDED_BY(on_resolved_mu_) = false;
bool is_srv_inflight_ ABSL_GUARDED_BY(on_resolved_mu_) = false;
bool is_txt_inflight_ ABSL_GUARDED_BY(on_resolved_mu_) = false;
// Output fields from requests.
ServerAddressList addresses_ ABSL_GUARDED_BY(on_resolved_mu_);
ServerAddressList balancer_addresses_ ABSL_GUARDED_BY(on_resolved_mu_);
@ -164,9 +155,13 @@ class EventEngineClientChannelDNSResolver : public PollingResolver {
absl::StatusOr<std::string> service_config_json_
ABSL_GUARDED_BY(on_resolved_mu_);
// Other internal state
size_t number_of_balancer_hostnames_initiated_
ABSL_GUARDED_BY(on_resolved_mu_) = 0;
size_t number_of_balancer_hostnames_resolved_
ABSL_GUARDED_BY(on_resolved_mu_) = 0;
bool orphaned_ ABSL_GUARDED_BY(on_resolved_mu_) = false;
absl::optional<EventEngine::TaskHandle> timeout_handle_
ABSL_GUARDED_BY(on_resolved_mu_);
std::unique_ptr<EventEngine::DNSResolver> event_engine_resolver_;
};
@ -225,41 +220,43 @@ EventEngineClientChannelDNSResolver::EventEngineDNSRequestWrapper::
GRPC_EVENT_ENGINE_RESOLVER_TRACE(
"DNSResolver::%p Starting hostname resolution for %s", resolver_.get(),
resolver_->name_to_resolve().c_str());
hostname_handle_ = event_engine_resolver_->LookupHostname(
is_hostname_inflight_ = true;
event_engine_resolver_->LookupHostname(
[self = Ref(DEBUG_LOCATION, "OnHostnameResolved")](
absl::StatusOr<std::vector<EventEngine::ResolvedAddress>> addresses) {
self->OnHostnameResolved(std::move(addresses));
},
resolver_->name_to_resolve(), kDefaultSecurePort,
resolver_->query_timeout_ms_);
GRPC_EVENT_ENGINE_RESOLVER_TRACE("hostname lookup handle: %s",
HandleToString(*hostname_handle_).c_str());
resolver_->name_to_resolve(), kDefaultSecurePort);
if (resolver_->enable_srv_queries_) {
GRPC_EVENT_ENGINE_RESOLVER_TRACE(
"DNSResolver::%p Starting SRV record resolution for %s",
resolver_.get(), resolver_->name_to_resolve().c_str());
srv_handle_ = event_engine_resolver_->LookupSRV(
is_srv_inflight_ = true;
event_engine_resolver_->LookupSRV(
[self = Ref(DEBUG_LOCATION, "OnSRVResolved")](
absl::StatusOr<std::vector<EventEngine::DNSResolver::SRVRecord>>
srv_records) { self->OnSRVResolved(std::move(srv_records)); },
resolver_->name_to_resolve(), resolver_->query_timeout_ms_);
GRPC_EVENT_ENGINE_RESOLVER_TRACE("srv lookup handle: %s",
HandleToString(*srv_handle_).c_str());
resolver_->name_to_resolve());
}
if (resolver_->request_service_config_) {
GRPC_EVENT_ENGINE_RESOLVER_TRACE(
"DNSResolver::%p Starting TXT record resolution for %s",
resolver_.get(), resolver_->name_to_resolve().c_str());
txt_handle_ = event_engine_resolver_->LookupTXT(
is_txt_inflight_ = true;
event_engine_resolver_->LookupTXT(
[self = Ref(DEBUG_LOCATION, "OnTXTResolved")](
absl::StatusOr<std::vector<std::string>> service_config) {
self->OnTXTResolved(std::move(service_config));
},
absl::StrCat("_grpc_config.", resolver_->name_to_resolve()),
resolver_->query_timeout_ms_);
GRPC_EVENT_ENGINE_RESOLVER_TRACE("txt lookup handle: %s",
HandleToString(*txt_handle_).c_str());
absl::StrCat("_grpc_config.", resolver_->name_to_resolve()));
}
// Initialize overall DNS resolution timeout alarm.
auto timeout = resolver_->query_timeout_ms_.count() == 0
? EventEngine::Duration::max()
: resolver_->query_timeout_ms_;
timeout_handle_ = resolver_->event_engine_->RunAfter(
timeout,
[self = Ref(DEBUG_LOCATION, "OnTimeout")]() { self->OnTimeout(); });
}
EventEngineClientChannelDNSResolver::EventEngineDNSRequestWrapper::
@ -272,24 +269,26 @@ void EventEngineClientChannelDNSResolver::EventEngineDNSRequestWrapper::
{
MutexLock lock(&on_resolved_mu_);
orphaned_ = true;
// Event if cancellation fails here, OnResolvedLocked will return early, and
// the resolver will never see a completed request.
if (hostname_handle_.has_value()) {
event_engine_resolver_->CancelLookup(*hostname_handle_);
}
if (srv_handle_.has_value()) {
event_engine_resolver_->CancelLookup(*srv_handle_);
}
for (const auto& handle : balancer_hostname_handles_) {
event_engine_resolver_->CancelLookup(handle);
}
if (txt_handle_.has_value()) {
event_engine_resolver_->CancelLookup(*txt_handle_);
if (timeout_handle_.has_value()) {
resolver_->event_engine_->Cancel(*timeout_handle_);
timeout_handle_.reset();
}
// Even if cancellation fails here, OnResolvedLocked will return early, and
// the resolver will never see a completed request.
event_engine_resolver_.reset();
}
Unref(DEBUG_LOCATION, "Orphan");
}
void EventEngineClientChannelDNSResolver::EventEngineDNSRequestWrapper::
OnTimeout() {
MutexLock lock(&on_resolved_mu_);
GRPC_EVENT_ENGINE_RESOLVER_TRACE("DNSResolver::%p OnTimeout",
resolver_.get());
timeout_handle_.reset();
event_engine_resolver_.reset();
}
void EventEngineClientChannelDNSResolver::EventEngineDNSRequestWrapper::
OnHostnameResolved(absl::StatusOr<std::vector<EventEngine::ResolvedAddress>>
new_addresses) {
@ -298,7 +297,7 @@ void EventEngineClientChannelDNSResolver::EventEngineDNSRequestWrapper::
{
MutexLock lock(&on_resolved_mu_);
if (orphaned_) return;
hostname_handle_.reset();
is_hostname_inflight_ = false;
if (!new_addresses.ok()) {
errors_.AddError(new_addresses.status().message());
} else {
@ -327,7 +326,7 @@ void EventEngineClientChannelDNSResolver::EventEngineDNSRequestWrapper::
});
MutexLock lock(&on_resolved_mu_);
if (orphaned_) return;
srv_handle_.reset();
is_srv_inflight_ = false;
if (!srv_records.ok()) {
// An error has occurred, finish resolving.
errors_.AddError(srv_records.status().message());
@ -338,12 +337,20 @@ void EventEngineClientChannelDNSResolver::EventEngineDNSRequestWrapper::
result = OnResolvedLocked();
return;
}
if (!timeout_handle_.has_value()) {
// We could reach here if timeout happened while an SRV query was finishing.
errors_.AddError(
"timed out - not initiating subsequent balancer hostname requests");
result = OnResolvedLocked();
return;
}
// Do a subsequent hostname query since SRV records were returned
for (auto& srv_record : *srv_records) {
GRPC_EVENT_ENGINE_RESOLVER_TRACE(
"DNSResolver::%p Starting balancer hostname resolution for %s:%d",
resolver_.get(), srv_record.host.c_str(), srv_record.port);
auto handle = event_engine_resolver_->LookupHostname(
++number_of_balancer_hostnames_initiated_;
event_engine_resolver_->LookupHostname(
[host = std::move(srv_record.host),
self = Ref(DEBUG_LOCATION, "OnBalancerHostnamesResolved")](
absl::StatusOr<std::vector<EventEngine::ResolvedAddress>>
@ -351,11 +358,7 @@ void EventEngineClientChannelDNSResolver::EventEngineDNSRequestWrapper::
self->OnBalancerHostnamesResolved(std::move(host),
std::move(new_balancer_addresses));
},
srv_record.host, std::to_string(srv_record.port),
resolver_->query_timeout_ms_);
GRPC_EVENT_ENGINE_RESOLVER_TRACE("balancer hostname lookup handle: %s",
HandleToString(handle).c_str());
balancer_hostname_handles_.insert(handle);
srv_record.host, std::to_string(srv_record.port));
}
}
@ -399,8 +402,8 @@ void EventEngineClientChannelDNSResolver::EventEngineDNSRequestWrapper::
{
MutexLock lock(&on_resolved_mu_);
if (orphaned_) return;
GPR_ASSERT(txt_handle_.has_value());
txt_handle_.reset();
GPR_ASSERT(is_txt_inflight_);
is_txt_inflight_ = false;
if (!service_config.ok()) {
errors_.AddError(service_config.status().message());
service_config_json_ = service_config.status();
@ -474,20 +477,19 @@ absl::optional<Resolver::Result> EventEngineClientChannelDNSResolver::
EventEngineDNSRequestWrapper::OnResolvedLocked() {
if (orphaned_) return absl::nullopt;
// Wait for all requested queries to return.
if (hostname_handle_.has_value() || srv_handle_.has_value() ||
txt_handle_.has_value() ||
if (is_hostname_inflight_ || is_srv_inflight_ || is_txt_inflight_ ||
number_of_balancer_hostnames_resolved_ !=
balancer_hostname_handles_.size()) {
number_of_balancer_hostnames_initiated_) {
GRPC_EVENT_ENGINE_RESOLVER_TRACE(
"DNSResolver::%p OnResolved() waiting for results (hostname: %s, "
"srv: %s, "
"txt: %s, "
"balancer addresses: %" PRIuPTR "/%" PRIuPTR " complete",
this, hostname_handle_.has_value() ? "waiting" : "done",
srv_handle_.has_value() ? "waiting" : "done",
txt_handle_.has_value() ? "waiting" : "done",
this, is_hostname_inflight_ ? "waiting" : "done",
is_srv_inflight_ ? "waiting" : "done",
is_txt_inflight_ ? "waiting" : "done",
number_of_balancer_hostnames_resolved_,
balancer_hostname_handles_.size());
number_of_balancer_hostnames_initiated_);
return absl::nullopt;
}
GRPC_EVENT_ENGINE_RESOLVER_TRACE(

@ -23,8 +23,6 @@ namespace experimental {
const EventEngine::TaskHandle EventEngine::TaskHandle::kInvalid = {-1, -1};
const EventEngine::ConnectionHandle EventEngine::ConnectionHandle::kInvalid = {
-1, -1};
const EventEngine::DNSResolver::LookupTaskHandle
EventEngine::DNSResolver::LookupTaskHandle::kInvalid = {-1, -1};
bool operator==(const EventEngine::TaskHandle& lhs,
const EventEngine::TaskHandle& rhs) {
@ -46,15 +44,5 @@ bool operator!=(const EventEngine::ConnectionHandle& lhs,
return !(lhs == rhs);
}
bool operator==(const EventEngine::DNSResolver::LookupTaskHandle& lhs,
const EventEngine::DNSResolver::LookupTaskHandle& rhs) {
return lhs.keys[0] == rhs.keys[0] && lhs.keys[1] == rhs.keys[1];
}
bool operator!=(const EventEngine::DNSResolver::LookupTaskHandle& lhs,
const EventEngine::DNSResolver::LookupTaskHandle& rhs) {
return !(lhs == rhs);
}
} // namespace experimental
} // namespace grpc_event_engine

@ -49,10 +49,6 @@ using ConnectionHandleSet = absl::flat_hash_set<
EventEngine::ConnectionHandle,
TaskHandleComparator<EventEngine::ConnectionHandle>::Hash>;
using LookupTaskHandleSet = absl::flat_hash_set<
EventEngine::DNSResolver::LookupTaskHandle,
TaskHandleComparator<EventEngine::DNSResolver::LookupTaskHandle>::Hash>;
} // namespace experimental
} // namespace grpc_event_engine

@ -139,17 +139,13 @@ class PosixEventEngine final : public PosixEventEngineWithFdSupport,
class PosixDNSResolver : public EventEngine::DNSResolver {
public:
~PosixDNSResolver() override;
LookupTaskHandle LookupHostname(LookupHostnameCallback on_resolve,
void LookupHostname(LookupHostnameCallback on_resolve,
absl::string_view name,
absl::string_view default_port,
Duration timeout) override;
LookupTaskHandle LookupSRV(LookupSRVCallback on_resolve,
absl::string_view name,
Duration timeout) override;
LookupTaskHandle LookupTXT(LookupTXTCallback on_resolve,
absl::string_view name,
Duration timeout) override;
bool CancelLookup(LookupTaskHandle handle) override;
absl::string_view default_port) override;
void LookupSRV(LookupSRVCallback on_resolve,
absl::string_view name) override;
void LookupTXT(LookupTXTCallback on_resolve,
absl::string_view name) override;
};
#ifdef GRPC_POSIX_SOCKET_TCP

@ -111,10 +111,9 @@ bool ThreadyEventEngine::Cancel(TaskHandle handle) {
return impl_->Cancel(handle);
}
EventEngine::DNSResolver::LookupTaskHandle
ThreadyEventEngine::ThreadyDNSResolver::LookupHostname(
void ThreadyEventEngine::ThreadyDNSResolver::LookupHostname(
LookupHostnameCallback on_resolve, absl::string_view name,
absl::string_view default_port, Duration timeout) {
absl::string_view default_port) {
return impl_->LookupHostname(
[this, on_resolve = std::move(on_resolve)](
absl::StatusOr<std::vector<ResolvedAddress>> addresses) mutable {
@ -123,13 +122,11 @@ ThreadyEventEngine::ThreadyDNSResolver::LookupHostname(
on_resolve(std::move(addresses));
});
},
name, default_port, timeout);
name, default_port);
}
EventEngine::DNSResolver::LookupTaskHandle
ThreadyEventEngine::ThreadyDNSResolver::LookupSRV(LookupSRVCallback on_resolve,
absl::string_view name,
Duration timeout) {
void ThreadyEventEngine::ThreadyDNSResolver::LookupSRV(
LookupSRVCallback on_resolve, absl::string_view name) {
return impl_->LookupSRV(
[this, on_resolve = std::move(on_resolve)](
absl::StatusOr<std::vector<SRVRecord>> records) mutable {
@ -139,13 +136,11 @@ ThreadyEventEngine::ThreadyDNSResolver::LookupSRV(LookupSRVCallback on_resolve,
on_resolve(std::move(records));
});
},
name, timeout);
name);
}
EventEngine::DNSResolver::LookupTaskHandle
ThreadyEventEngine::ThreadyDNSResolver::LookupTXT(LookupTXTCallback on_resolve,
absl::string_view name,
Duration timeout) {
void ThreadyEventEngine::ThreadyDNSResolver::LookupTXT(
LookupTXTCallback on_resolve, absl::string_view name) {
return impl_->LookupTXT(
[this, on_resolve = std::move(on_resolve)](
absl::StatusOr<std::vector<std::string>> record) mutable {
@ -154,12 +149,7 @@ ThreadyEventEngine::ThreadyDNSResolver::LookupTXT(LookupTXTCallback on_resolve,
on_resolve(std::move(record));
});
},
name, timeout);
}
bool ThreadyEventEngine::ThreadyDNSResolver::CancelLookup(
LookupTaskHandle handle) {
return impl_->CancelLookup(handle);
name);
}
} // namespace experimental

@ -80,17 +80,13 @@ class ThreadyEventEngine final : public EventEngine {
public:
explicit ThreadyDNSResolver(std::unique_ptr<DNSResolver> impl)
: impl_(std::move(impl)) {}
LookupTaskHandle LookupHostname(LookupHostnameCallback on_resolve,
void LookupHostname(LookupHostnameCallback on_resolve,
absl::string_view name,
absl::string_view default_port,
Duration timeout) override;
LookupTaskHandle LookupSRV(LookupSRVCallback on_resolve,
absl::string_view name,
Duration timeout) override;
LookupTaskHandle LookupTXT(LookupTXTCallback on_resolve,
absl::string_view name,
Duration timeout) override;
bool CancelLookup(LookupTaskHandle handle) override;
absl::string_view default_port) override;
void LookupSRV(LookupSRVCallback on_resolve,
absl::string_view name) override;
void LookupTXT(LookupTXTCallback on_resolve,
absl::string_view name) override;
private:
std::unique_ptr<DNSResolver> impl_;

@ -48,17 +48,13 @@ class WindowsEventEngine : public EventEngine,
class WindowsDNSResolver : public EventEngine::DNSResolver {
public:
~WindowsDNSResolver() override;
LookupTaskHandle LookupHostname(LookupHostnameCallback on_resolve,
void LookupHostname(LookupHostnameCallback on_resolve,
absl::string_view name,
absl::string_view default_port,
Duration timeout) override;
LookupTaskHandle LookupSRV(LookupSRVCallback on_resolve,
absl::string_view name,
Duration timeout) override;
LookupTaskHandle LookupTXT(LookupTXTCallback on_resolve,
absl::string_view name,
Duration timeout) override;
bool CancelLookup(LookupTaskHandle handle) override;
absl::string_view default_port) override;
void LookupSRV(LookupSRVCallback on_resolve,
absl::string_view name) override;
void LookupTXT(LookupTXTCallback on_resolve,
absl::string_view name) override;
};
WindowsEventEngine();

@ -35,7 +35,9 @@ namespace {
NoDestruct<std::shared_ptr<DNSResolver>> g_dns_resolver;
}
constexpr DNSResolver::TaskHandle DNSResolver::kNullHandle;
const DNSResolver::LookupTaskHandle DNSResolver::LookupTaskHandle::kInvalid = {
-1, -1};
const DNSResolver::TaskHandle DNSResolver::kNullHandle = {0, 0};
void ResetDNSResolver(std::shared_ptr<DNSResolver> resolver) {
*g_dns_resolver = std::move(resolver);
@ -43,6 +45,16 @@ void ResetDNSResolver(std::shared_ptr<DNSResolver> resolver) {
std::shared_ptr<DNSResolver> GetDNSResolver() { return *g_dns_resolver; }
bool operator==(const DNSResolver::LookupTaskHandle& lhs,
const DNSResolver::LookupTaskHandle& rhs) {
return lhs.keys[0] == rhs.keys[0] && lhs.keys[1] == rhs.keys[1];
}
bool operator!=(const DNSResolver::LookupTaskHandle& lhs,
const DNSResolver::LookupTaskHandle& rhs) {
return !(lhs == rhs);
}
std::string DNSResolver::HandleToString(TaskHandle handle) {
return absl::StrCat("{", handle.keys[0], ",", handle.keys[1], "}");
}

@ -23,10 +23,12 @@
#include <stddef.h>
#include "absl/container/flat_hash_set.h"
#include "absl/status/statusor.h"
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/event_engine/handle_containers.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/pollset_set.h"
@ -43,9 +45,21 @@ constexpr Duration kDefaultDNSRequestTimeout = Duration::Minutes(2);
// A singleton class used for async and blocking DNS resolution
class DNSResolver {
public:
using TaskHandle = ::grpc_event_engine::experimental::EventEngine::
DNSResolver::LookupTaskHandle;
static constexpr TaskHandle kNullHandle{0, 0};
/// Task handle for DNS Resolution requests.
struct LookupTaskHandle {
intptr_t keys[2];
static const LookupTaskHandle kInvalid;
friend bool operator==(const LookupTaskHandle& lhs,
const LookupTaskHandle& rhs);
friend bool operator!=(const LookupTaskHandle& lhs,
const LookupTaskHandle& rhs);
};
using TaskHandle = LookupTaskHandle;
using TaskHandleSet = absl::flat_hash_set<
TaskHandle,
grpc_event_engine::experimental::TaskHandleComparator<TaskHandle>::Hash>;
static const TaskHandle kNullHandle;
virtual ~DNSResolver() {}

@ -23,8 +23,7 @@ template <typename T>
class TaskHandleTest : public testing::Test {};
using HandleTypes =
::testing::Types<EventEngine::TaskHandle, EventEngine::ConnectionHandle,
EventEngine::DNSResolver::LookupTaskHandle>;
::testing::Types<EventEngine::TaskHandle, EventEngine::ConnectionHandle>;
TYPED_TEST_SUITE(TaskHandleTest, HandleTypes);
TYPED_TEST(TaskHandleTest, Identity) {

@ -13,8 +13,6 @@
// limitations under the License.
#include <grpc/support/port_platform.h>
#include <stdint.h>
#include <algorithm>
#include <memory>
#include <string>
@ -23,9 +21,7 @@
#include <vector>
#include "absl/base/thread_annotations.h"
#include "absl/container/flat_hash_set.h"
#include "absl/functional/any_invocable.h"
#include "absl/hash/hash.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
@ -141,6 +137,12 @@ class FuzzingResolverEventEngine
return std::make_unique<FuzzingDNSResolver>(this);
}
TaskHandle RunAfter(Duration /* when */,
absl::AnyInvocable<void()> /* closure */) override {
return TaskHandle::kInvalid;
}
bool Cancel(TaskHandle /* handle */) override { return true; }
void Tick() { runner_.Tick(); }
private:
@ -149,74 +151,39 @@ class FuzzingResolverEventEngine
explicit FuzzingDNSResolver(FuzzingResolverEventEngine* engine)
: engine_(engine) {}
~FuzzingDNSResolver() override { GPR_ASSERT(known_handles_.empty()); }
LookupTaskHandle LookupHostname(LookupHostnameCallback on_resolve,
void LookupHostname(LookupHostnameCallback on_resolve,
absl::string_view /* name */,
absl::string_view /* default_port */,
Duration /* timeout */) override {
int handle = NextHandle();
absl::string_view /* default_port */) override {
CheckAndSetOrphan(ExecutionStep::DURING_LOOKUP_HOSTNAME);
if (!engine_->has_been_orphaned_) {
engine_->runner_.Run(
[this, cb = std::move(on_resolve), handle]() mutable {
if (!HandleExists(handle)) return;
DeleteHandle(handle);
cb(engine_->hostname_responses_);
engine_->runner_.Run([this, cb = std::move(on_resolve)]() mutable {
CheckAndSetOrphan(ExecutionStep::AFTER_LOOKUP_HOSTNAME_CALLBACK);
cb(engine_->hostname_responses_);
});
}
return {handle, 0};
}
LookupTaskHandle LookupSRV(LookupSRVCallback on_resolve,
absl::string_view /* name */,
Duration /* timeout */) override {
int handle = NextHandle();
void LookupSRV(LookupSRVCallback on_resolve,
absl::string_view /* name */) override {
CheckAndSetOrphan(ExecutionStep::DURING_LOOKUP_SRV);
if (!engine_->has_been_orphaned_) {
engine_->runner_.Run(
[this, cb = std::move(on_resolve), handle]() mutable {
if (!HandleExists(handle)) return;
DeleteHandle(handle);
cb(engine_->srv_responses_);
engine_->runner_.Run([this, cb = std::move(on_resolve)]() mutable {
CheckAndSetOrphan(ExecutionStep::AFTER_LOOKUP_SRV_CALLBACK);
cb(engine_->srv_responses_);
});
}
return {handle, 0};
}
LookupTaskHandle LookupTXT(LookupTXTCallback on_resolve,
absl::string_view /* name */,
Duration /* timeout */) override {
int handle = NextHandle();
void LookupTXT(LookupTXTCallback on_resolve,
absl::string_view /* name */) override {
CheckAndSetOrphan(ExecutionStep::DURING_LOOKUP_TXT);
if (!engine_->has_been_orphaned_) {
engine_->runner_.Run(
[this, cb = std::move(on_resolve), handle]() mutable {
if (!HandleExists(handle)) return;
DeleteHandle(handle);
cb(engine_->txt_responses_);
engine_->runner_.Run([this, cb = std::move(on_resolve)]() mutable {
CheckAndSetOrphan(ExecutionStep::AFTER_LOOKUP_TXT_CALLBACK);
cb(engine_->txt_responses_);
});
}
return {handle, 0};
}
bool CancelLookup(LookupTaskHandle handle) override {
int bit_handle = handle.keys[0];
if (!HandleExists(bit_handle)) return false;
DeleteHandle(bit_handle);
return true;
}
private:
int NextHandle() {
static uint64_t next_handle = 0;
known_handles_.insert(++next_handle);
return next_handle;
}
bool HandleExists(int handle) { return known_handles_.contains(handle); }
void DeleteHandle(int handle) { known_handles_.erase(handle); }
void CheckAndSetOrphan(ExecutionStep current_execution_step) {
if (engine_->should_orphan_at_step_ == current_execution_step) {
*engine_->done_resolving_ = true;
@ -225,8 +192,6 @@ class FuzzingResolverEventEngine
}
FuzzingResolverEventEngine* engine_;
// The set of outstanding LookupTaskHandles.
absl::flat_hash_set<uint64_t> known_handles_;
};
// members

Loading…
Cancel
Save