[EventEngine] Fix issues found when enabling `event_engine_dns` experiment in OSS (#35530)

Using `AF_UNSPEC` for both IPv4 and IPv6 queries does not work in all cases. Specifically, for `localhost:<>`, c-ares only returns the IPv6 record i.e. `::1`.

<!--

If you know who should review your pull request, please assign it to that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the appropriate
lang label.

-->

Closes #35530

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/35530 from yijiem:enable-oss-ee-dns-posix 452b5a2d81
PiperOrigin-RevId: 599989537
pull/35620/head
Yijie Ma 1 year ago committed by Copybara-Service
parent f1254a78b3
commit 5bf0971972
  1. 1
      CMakeLists.txt
  2. 1
      build_autogenerated.yaml
  3. 1
      grpc.gyp
  4. 2
      src/core/BUILD
  5. 2
      src/core/ext/filters/client_channel/resolver/dns/dns_resolver_plugin.cc
  6. 165
      src/core/lib/event_engine/ares_resolver.cc
  7. 6
      src/core/lib/event_engine/posix_engine/native_posix_dns_resolver.h
  8. 3
      src/core/lib/event_engine/posix_engine/posix_engine.cc
  9. 1
      test/core/end2end/BUILD
  10. 8
      test/core/end2end/goaway_server_test.cc
  11. 1
      test/core/event_engine/fuzzing_event_engine/BUILD
  12. 11
      test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
  13. 4
      test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h
  14. 8
      test/core/event_engine/test_suite/tests/dns_test.cc

1
CMakeLists.txt generated

@ -5410,6 +5410,7 @@ target_link_libraries(grpc_authorization_provider
absl::utility
${_gRPC_CARES_LIBRARIES}
gpr
${_gRPC_ADDRESS_SORTING_LIBRARIES}
)
foreach(_hdr

@ -4995,6 +4995,7 @@ libs:
- absl/utility:utility
- cares
- gpr
- address_sorting
- name: grpc_plugin_support
build: protoc
language: c++

1
grpc.gyp generated

@ -2025,6 +2025,7 @@
'absl/utility:utility',
'cares',
'gpr',
'address_sorting',
],
'sources': [
'src/core/ext/upb-gen/google/protobuf/any.upb_minitable.c',

@ -2568,7 +2568,6 @@ grpc_cc_library(
],
deps = [
"iomgr_port",
"ref_counted_dns_resolver_interface",
"useful",
"//:event_engine_base_hdrs",
"//:gpr",
@ -2599,6 +2598,7 @@ grpc_cc_library(
"absl/strings:str_format",
"absl/types:optional",
"absl/types:variant",
"address_sorting",
"cares",
],
deps = [

@ -38,12 +38,14 @@ void RegisterDnsResolver(CoreConfiguration::Builder* builder) {
std::make_unique<EventEngineClientChannelDNSResolverFactory>());
return;
#endif
#ifndef GRPC_DO_NOT_INSTANTIATE_POSIX_POLLER
if (IsEventEngineDnsEnabled()) {
gpr_log(GPR_DEBUG, "Using EventEngine dns resolver");
builder->resolver_registry()->RegisterResolverFactory(
std::make_unique<EventEngineClientChannelDNSResolverFactory>());
return;
}
#endif
auto resolver = ConfigVars::Get().DnsResolver();
// ---- Ares resolver ----
if (ShouldUseAresDnsResolver(resolver)) {

@ -32,6 +32,7 @@
#if GRPC_ARES == 1
#include <address_sorting/address_sorting.h>
#include <ares.h>
#if ARES_VERSION >= 0x011200
@ -144,6 +145,28 @@ absl::Status SetRequestDNSServer(absl::string_view dns_server,
return absl::OkStatus();
}
std::vector<EventEngine::ResolvedAddress> SortAddresses(
const std::vector<EventEngine::ResolvedAddress>& addresses) {
address_sorting_sortable* sortables = static_cast<address_sorting_sortable*>(
gpr_zalloc(sizeof(address_sorting_sortable) * addresses.size()));
for (size_t i = 0; i < addresses.size(); i++) {
sortables[i].user_data =
const_cast<EventEngine::ResolvedAddress*>(&addresses[i]);
memcpy(&sortables[i].dest_addr.addr, addresses[i].address(),
addresses[i].size());
sortables[i].dest_addr.len = addresses[i].size();
}
address_sorting_rfc_6724_sort(sortables, addresses.size());
std::vector<EventEngine::ResolvedAddress> sorted_addresses;
sorted_addresses.reserve(addresses.size());
for (size_t i = 0; i < addresses.size(); ++i) {
sorted_addresses.emplace_back(
*static_cast<EventEngine::ResolvedAddress*>(sortables[i].user_data));
}
gpr_free(sortables);
return sorted_addresses;
}
struct QueryArg {
QueryArg(AresResolver* ar, int id, absl::string_view name)
: ares_resolver(ar), callback_map_id(id), query_name(name) {}
@ -156,6 +179,9 @@ struct HostnameQueryArg : public QueryArg {
HostnameQueryArg(AresResolver* ar, int id, absl::string_view name, int p)
: QueryArg(ar, id, name), port(p) {}
int port;
int pending_requests;
absl::Status error_status;
std::vector<EventEngine::ResolvedAddress> result;
};
} // namespace
@ -291,9 +317,16 @@ void AresResolver::LookupHostname(
callback_map_.emplace(++id_, std::move(callback));
auto* resolver_arg = new HostnameQueryArg(this, id_, name, port);
if (IsIpv6LoopbackAvailable()) {
ares_gethostbyname(channel_, std::string(host).c_str(), AF_UNSPEC,
// Note that using AF_UNSPEC for both IPv6 and IPv4 queries does not work in
// all cases, e.g. for localhost:<> it only gets back the IPv6 result (i.e.
// ::1).
resolver_arg->pending_requests = 2;
ares_gethostbyname(channel_, std::string(host).c_str(), AF_INET,
&AresResolver::OnHostbynameDoneLocked, resolver_arg);
ares_gethostbyname(channel_, std::string(host).c_str(), AF_INET6,
&AresResolver::OnHostbynameDoneLocked, resolver_arg);
} else {
resolver_arg->pending_requests = 1;
ares_gethostbyname(channel_, std::string(host).c_str(), AF_INET,
&AresResolver::OnHostbynameDoneLocked, resolver_arg);
}
@ -548,74 +581,88 @@ void AresResolver::OnAresBackupPollAlarm() {
void AresResolver::OnHostbynameDoneLocked(void* arg, int status,
int /*timeouts*/,
struct hostent* hostent) {
std::unique_ptr<HostnameQueryArg> hostname_qa(
static_cast<HostnameQueryArg*>(arg));
auto* hostname_qa = static_cast<HostnameQueryArg*>(arg);
GPR_ASSERT(hostname_qa->pending_requests-- > 0);
auto* ares_resolver = hostname_qa->ares_resolver;
auto nh = ares_resolver->callback_map_.extract(hostname_qa->callback_map_id);
GPR_ASSERT(!nh.empty());
GPR_ASSERT(
absl::holds_alternative<EventEngine::DNSResolver::LookupHostnameCallback>(
nh.mapped()));
auto callback = absl::get<EventEngine::DNSResolver::LookupHostnameCallback>(
std::move(nh.mapped()));
if (status != ARES_SUCCESS) {
std::string error_msg =
absl::StrFormat("address lookup failed for %s: %s",
hostname_qa->query_name, ares_strerror(status));
GRPC_ARES_RESOLVER_TRACE_LOG("resolver:%p OnHostbynameDoneLocked: %s",
ares_resolver, error_msg.c_str());
ares_resolver->event_engine_->Run(
[callback = std::move(callback),
status = AresStatusToAbslStatus(status, error_msg)]() mutable {
callback(status);
});
return;
}
GRPC_ARES_RESOLVER_TRACE_LOG(
"resolver:%p OnHostbynameDoneLocked name=%s ARES_SUCCESS", ares_resolver,
hostname_qa->query_name.c_str());
std::vector<EventEngine::ResolvedAddress> result;
for (size_t i = 0; hostent->h_addr_list[i] != nullptr; i++) {
switch (hostent->h_addrtype) {
case AF_INET6: {
size_t addr_len = sizeof(struct sockaddr_in6);
struct sockaddr_in6 addr;
memset(&addr, 0, addr_len);
memcpy(&addr.sin6_addr, hostent->h_addr_list[i],
sizeof(struct in6_addr));
addr.sin6_family = static_cast<unsigned char>(hostent->h_addrtype);
addr.sin6_port = htons(hostname_qa->port);
result.emplace_back(reinterpret_cast<const sockaddr*>(&addr), addr_len);
char output[INET6_ADDRSTRLEN];
ares_inet_ntop(AF_INET6, &addr.sin6_addr, output, INET6_ADDRSTRLEN);
GRPC_ARES_RESOLVER_TRACE_LOG(
"resolver:%p c-ares resolver gets a AF_INET6 result: \n"
" addr: %s\n port: %d\n sin6_scope_id: %d\n",
ares_resolver, output, hostname_qa->port, addr.sin6_scope_id);
break;
}
case AF_INET: {
size_t addr_len = sizeof(struct sockaddr_in);
struct sockaddr_in addr;
memset(&addr, 0, addr_len);
memcpy(&addr.sin_addr, hostent->h_addr_list[i], sizeof(struct in_addr));
addr.sin_family = static_cast<unsigned char>(hostent->h_addrtype);
addr.sin_port = htons(hostname_qa->port);
result.emplace_back(reinterpret_cast<const sockaddr*>(&addr), addr_len);
char output[INET_ADDRSTRLEN];
ares_inet_ntop(AF_INET, &addr.sin_addr, output, INET_ADDRSTRLEN);
GRPC_ARES_RESOLVER_TRACE_LOG(
"resolver:%p c-ares resolver gets a AF_INET result: \n"
" addr: %s\n port: %d\n",
ares_resolver, output, hostname_qa->port);
break;
hostname_qa->error_status = AresStatusToAbslStatus(status, error_msg);
} else {
GRPC_ARES_RESOLVER_TRACE_LOG(
"resolver:%p OnHostbynameDoneLocked name=%s ARES_SUCCESS",
ares_resolver, hostname_qa->query_name.c_str());
for (size_t i = 0; hostent->h_addr_list[i] != nullptr; i++) {
switch (hostent->h_addrtype) {
case AF_INET6: {
size_t addr_len = sizeof(struct sockaddr_in6);
struct sockaddr_in6 addr;
memset(&addr, 0, addr_len);
memcpy(&addr.sin6_addr, hostent->h_addr_list[i],
sizeof(struct in6_addr));
addr.sin6_family = static_cast<unsigned char>(hostent->h_addrtype);
addr.sin6_port = htons(hostname_qa->port);
hostname_qa->result.emplace_back(
reinterpret_cast<const sockaddr*>(&addr), addr_len);
char output[INET6_ADDRSTRLEN];
ares_inet_ntop(AF_INET6, &addr.sin6_addr, output, INET6_ADDRSTRLEN);
GRPC_ARES_RESOLVER_TRACE_LOG(
"resolver:%p c-ares resolver gets a AF_INET6 result: \n"
" addr: %s\n port: %d\n sin6_scope_id: %d\n",
ares_resolver, output, hostname_qa->port, addr.sin6_scope_id);
break;
}
case AF_INET: {
size_t addr_len = sizeof(struct sockaddr_in);
struct sockaddr_in addr;
memset(&addr, 0, addr_len);
memcpy(&addr.sin_addr, hostent->h_addr_list[i],
sizeof(struct in_addr));
addr.sin_family = static_cast<unsigned char>(hostent->h_addrtype);
addr.sin_port = htons(hostname_qa->port);
hostname_qa->result.emplace_back(
reinterpret_cast<const sockaddr*>(&addr), addr_len);
char output[INET_ADDRSTRLEN];
ares_inet_ntop(AF_INET, &addr.sin_addr, output, INET_ADDRSTRLEN);
GRPC_ARES_RESOLVER_TRACE_LOG(
"resolver:%p c-ares resolver gets a AF_INET result: \n"
" addr: %s\n port: %d\n",
ares_resolver, output, hostname_qa->port);
break;
}
default:
grpc_core::Crash(
absl::StrFormat("resolver:%p Received invalid type of address %d",
ares_resolver, hostent->h_addrtype));
}
}
}
ares_resolver->event_engine_->Run(
[callback = std::move(callback), result = std::move(result)]() mutable {
callback(std::move(result));
});
if (hostname_qa->pending_requests == 0) {
auto nh =
ares_resolver->callback_map_.extract(hostname_qa->callback_map_id);
GPR_ASSERT(!nh.empty());
GPR_ASSERT(absl::holds_alternative<
EventEngine::DNSResolver::LookupHostnameCallback>(nh.mapped()));
auto callback = absl::get<EventEngine::DNSResolver::LookupHostnameCallback>(
std::move(nh.mapped()));
if (!hostname_qa->result.empty() || hostname_qa->error_status.ok()) {
ares_resolver->event_engine_->Run(
[callback = std::move(callback),
result = SortAddresses(hostname_qa->result)]() mutable {
callback(std::move(result));
});
} else {
ares_resolver->event_engine_->Run(
[callback = std::move(callback),
result = std::move(hostname_qa->error_status)]() mutable {
callback(std::move(result));
});
}
delete hostname_qa;
}
}
void AresResolver::OnSRVQueryDoneLocked(void* arg, int status, int /*timeouts*/,

@ -27,14 +27,12 @@
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/event_engine/ref_counted_dns_resolver_interface.h"
namespace grpc_event_engine {
namespace experimental {
// An asynchronous DNS resolver which uses the native platform's getaddrinfo
// API. Only supports A/AAAA records.
class NativePosixDNSResolver : public RefCountedDNSResolverInterface {
class NativePosixDNSResolver : public EventEngine::DNSResolver {
public:
explicit NativePosixDNSResolver(std::shared_ptr<EventEngine> event_engine);
@ -48,8 +46,6 @@ class NativePosixDNSResolver : public RefCountedDNSResolverInterface {
void LookupTXT(EventEngine::DNSResolver::LookupTXTCallback on_resolved,
absl::string_view name) override;
void Orphan() override { delete this; }
private:
std::shared_ptr<EventEngine> event_engine_;
};

@ -573,8 +573,7 @@ PosixEventEngine::GetDNSResolver(
}
GRPC_EVENT_ENGINE_DNS_TRACE(
"PosixEventEngine:%p creating NativePosixDNSResolver", this);
return std::make_unique<PosixEventEngine::PosixDNSResolver>(
grpc_core::MakeOrphanable<NativePosixDNSResolver>(shared_from_this()));
return std::make_unique<NativePosixDNSResolver>(shared_from_this());
#endif // GRPC_POSIX_SOCKET_RESOLVE_ADDRESS
}

@ -543,6 +543,7 @@ grpc_cc_test(
"//src/core:closure",
"//src/core:default_event_engine",
"//src/core:error",
"//src/core:experiments",
"//src/core:grpc_sockaddr",
"//src/core:iomgr_fwd",
"//src/core:resolved_address",

@ -45,6 +45,7 @@
#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/closure.h"
@ -207,6 +208,13 @@ static void my_cancel_ares_request(grpc_ares_request* request) {
}
int main(int argc, char** argv) {
// TODO(yijiem): rewrite this test with a custom EventEngine DNS Resolver
if (grpc_core::IsEventEngineDnsEnabled()) {
gpr_log(
GPR_ERROR,
"Skipping iomgr-specific DNS test because EventEngine DNS is enabled");
return 0;
}
grpc_completion_queue* cq;
grpc_op ops[6];
grpc_op* op;

@ -29,6 +29,7 @@ grpc_cc_library(
":fuzzing_event_engine_proto",
"//:event_engine_base_hdrs",
"//src/core:default_event_engine",
"//src/core:native_posix_dns_resolver",
"//src/core:time",
"//test/core/util:grpc_test_util",
],

@ -34,10 +34,15 @@
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/port.h"
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h"
#include "test/core/util/port.h"
#if defined(GRPC_POSIX_SOCKET_TCP)
#include "src/core/lib/event_engine/posix_engine/native_posix_dns_resolver.h"
#endif
// IWYU pragma: no_include <sys/socket.h>
extern gpr_timespec (*gpr_now_impl)(gpr_clock_type clock_type);
@ -497,7 +502,11 @@ bool FuzzingEventEngine::IsWorkerThread() { abort(); }
absl::StatusOr<std::unique_ptr<EventEngine::DNSResolver>>
FuzzingEventEngine::GetDNSResolver(const DNSResolver::ResolverOptions&) {
abort();
#if defined(GRPC_POSIX_SOCKET_TCP)
return std::make_unique<NativePosixDNSResolver>(shared_from_this());
#else
grpc_core::Crash("FuzzingEventEngine::GetDNSResolver Not implemented");
#endif
}
void FuzzingEventEngine::Run(Closure* closure) {

@ -48,9 +48,7 @@ namespace experimental {
// EventEngine implementation to be used by fuzzers.
// It's only allowed to have one FuzzingEventEngine instantiated at a time.
class FuzzingEventEngine
: public EventEngine,
public std::enable_shared_from_this<FuzzingEventEngine> {
class FuzzingEventEngine : public EventEngine {
public:
struct Options {
Duration max_delay_run_after = std::chrono::seconds(30);

@ -435,7 +435,15 @@ TEST_F(EventEngineDNSTest, LocalHost) {
auto dns_resolver = CreateDNSResolverWithoutSpecifyingServer();
dns_resolver->LookupHostname(
[this](auto result) {
#ifdef GRPC_IOS_EVENT_ENGINE_CLIENT
EXPECT_SUCCESS();
#else
EXPECT_TRUE(result.ok());
EXPECT_THAT(*result,
Pointwise(ResolvedAddressEq(),
{*URIToResolvedAddress("ipv6:[::1]:1"),
*URIToResolvedAddress("ipv4:127.0.0.1:1")}));
#endif // GRPC_IOS_EVENT_ENGINE_CLIENT
dns_resolver_signal_.Notify();
},
"localhost:1", "");

Loading…
Cancel
Save