From a98819007d32820bbd09fa98efbe17b2a65e808a Mon Sep 17 00:00:00 2001 From: apolcyn Date: Thu, 18 Mar 2021 21:55:14 -0700 Subject: [PATCH] Revert "Convert grpc_ares_wrapper to C++ (#25108)" (#25761) This reverts commit 2ee70175bd091e14542d2209c6e7b0d74e549ec9. --- .../resolver/dns/c_ares/dns_resolver_ares.cc | 47 +- .../resolver/dns/c_ares/grpc_ares_ev_driver.h | 78 +- .../dns/c_ares/grpc_ares_ev_driver_posix.cc | 6 +- .../dns/c_ares/grpc_ares_ev_driver_windows.cc | 14 +- .../resolver/dns/c_ares/grpc_ares_wrapper.cc | 1749 ++++++++++------- .../resolver/dns/c_ares/grpc_ares_wrapper.h | 321 +-- .../dns/c_ares/grpc_ares_wrapper_libuv.cc | 6 +- .../dns/c_ares/grpc_ares_wrapper_posix.cc | 6 +- .../dns/c_ares/grpc_ares_wrapper_windows.cc | 6 +- .../dns_resolver_connectivity_test.cc | 28 +- .../resolvers/dns_resolver_cooldown_test.cc | 33 +- test/core/end2end/goaway_server_test.cc | 38 +- test/cpp/naming/address_sorting_test.cc | 52 +- test/cpp/naming/cancel_ares_query_test.cc | 7 - test/cpp/naming/resolver_component_test.cc | 9 +- 15 files changed, 1206 insertions(+), 1194 deletions(-) diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc index 5c9541af1f9..922ac453df3 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc @@ -81,8 +81,9 @@ class AresDnsResolver : public Resolver { void StartResolvingLocked(); static void OnNextResolution(void* arg, grpc_error* error); - void OnResolvedLocked(grpc_error* error); + static void OnResolved(void* arg, grpc_error* error); void OnNextResolutionLocked(grpc_error* error); + void OnResolvedLocked(grpc_error* error); /// DNS server to use (if not system default) std::string dns_server_; @@ -106,10 +107,11 @@ class AresDnsResolver : public Resolver { /// closures used by the work_serializer grpc_closure on_next_resolution_; + grpc_closure on_resolved_; /// are we currently resolving? bool resolving_ = false; /// the pending resolving request - OrphanablePtr pending_request_; + grpc_ares_request* pending_request_ = nullptr; /// next resolution timer bool have_next_resolution_timer_ = false; grpc_timer next_resolution_timer_; @@ -122,7 +124,7 @@ class AresDnsResolver : public Resolver { /// currently resolving balancer addresses std::unique_ptr balancer_addresses_; /// currently resolving service config - absl::optional service_config_json_; + char* service_config_json_ = nullptr; // has shutdown been initiated bool shutdown_initiated_ = false; }; @@ -154,6 +156,7 @@ AresDnsResolver::AresDnsResolver(ResolverArgs args) // Closure initialization. GRPC_CLOSURE_INIT(&on_next_resolution_, OnNextResolution, this, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&on_resolved_, OnResolved, this, grpc_schedule_on_exec_ctx); } AresDnsResolver::~AresDnsResolver() { @@ -185,7 +188,9 @@ void AresDnsResolver::ShutdownLocked() { if (have_next_resolution_timer_) { grpc_timer_cancel(&next_resolution_timer_); } - pending_request_.reset(); + if (pending_request_ != nullptr) { + grpc_cancel_ares_request_locked(pending_request_); + } } void AresDnsResolver::OnNextResolution(void* arg, grpc_error* error) { @@ -221,7 +226,7 @@ bool ValueInJsonArray(const Json::Array& array, const char* value) { return false; } -std::string ChooseServiceConfig(absl::string_view service_config_choice_json, +std::string ChooseServiceConfig(char* service_config_choice_json, grpc_error** error) { Json json = Json::Parse(service_config_choice_json, error); if (*error != GRPC_ERROR_NONE) return ""; @@ -300,10 +305,18 @@ std::string ChooseServiceConfig(absl::string_view service_config_choice_json, return service_config->Dump(); } +void AresDnsResolver::OnResolved(void* arg, grpc_error* error) { + AresDnsResolver* r = static_cast(arg); + GRPC_ERROR_REF(error); // ref owned by lambda + r->work_serializer_->Run([r, error]() { r->OnResolvedLocked(error); }, + DEBUG_LOCATION); +} + void AresDnsResolver::OnResolvedLocked(grpc_error* error) { GPR_ASSERT(resolving_); resolving_ = false; - pending_request_.reset(); + gpr_free(pending_request_); + pending_request_ = nullptr; if (shutdown_initiated_) { Unref(DEBUG_LOCATION, "OnResolvedLocked() shutdown"); GRPC_ERROR_UNREF(error); @@ -314,9 +327,10 @@ void AresDnsResolver::OnResolvedLocked(grpc_error* error) { if (addresses_ != nullptr) { result.addresses = std::move(*addresses_); } - if (service_config_json_.has_value()) { + if (service_config_json_ != nullptr) { std::string service_config_string = ChooseServiceConfig( - service_config_json_.value(), &result.service_config_error); + service_config_json_, &result.service_config_error); + gpr_free(service_config_json_); if (result.service_config_error == GRPC_ERROR_NONE && !service_config_string.empty()) { GRPC_CARES_TRACE_LOG("resolver:%p selected service config choice: %s", @@ -407,17 +421,16 @@ void AresDnsResolver::StartResolvingLocked() { Ref(DEBUG_LOCATION, "dns-resolving").release(); GPR_ASSERT(!resolving_); resolving_ = true; - service_config_json_.reset(); - auto on_done = [this](grpc_error* error) { OnResolvedLocked(error); }; - pending_request_ = LookupAresLocked( - dns_server_, name_to_resolve_, kDefaultPort, interested_parties_, - std::move(on_done), &addresses_, + service_config_json_ = nullptr; + pending_request_ = grpc_dns_lookup_ares_locked( + dns_server_.c_str(), name_to_resolve_.c_str(), kDefaultPort, + interested_parties_, &on_resolved_, &addresses_, enable_srv_queries_ ? &balancer_addresses_ : nullptr, request_service_config_ ? &service_config_json_ : nullptr, query_timeout_ms_, work_serializer_); last_resolution_timestamp_ = grpc_core::ExecCtx::Get()->Now(); GRPC_CARES_TRACE_LOG("resolver:%p Started resolving. pending_request_:%p", - this, pending_request_.get()); + this, pending_request_); } // @@ -450,7 +463,7 @@ static grpc_error* blocking_resolve_address_ares( } static grpc_address_resolver_vtable ares_resolver = { - grpc_core::ResolveAddressAres, blocking_resolve_address_ares}; + grpc_resolve_address_ares, blocking_resolve_address_ares}; #ifdef GRPC_UV /* TODO(murgatroid99): Remove this when we want the cares resolver to be the @@ -477,7 +490,7 @@ void grpc_resolver_dns_ares_init() { g_use_ares_dns_resolver = true; gpr_log(GPR_DEBUG, "Using ares dns resolver"); address_sorting_init(); - grpc_error* error = grpc_core::AresRequest::Init(); + grpc_error* error = grpc_ares_init(); if (error != GRPC_ERROR_NONE) { GRPC_LOG_IF_ERROR("grpc_ares_init() failed", error); return; @@ -496,7 +509,7 @@ void grpc_resolver_dns_ares_init() { void grpc_resolver_dns_ares_shutdown() { if (g_use_ares_dns_resolver) { address_sorting_shutdown(); - grpc_core::AresRequest::Shutdown(); + grpc_ares_cleanup(); } } diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h index feb2d3fc75e..cc884864862 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h @@ -1,18 +1,20 @@ -// -// Copyright 2016 gRPC authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// +/* + * + * Copyright 2016 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ #ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_C_ARES_GRPC_ARES_EV_DRIVER_H #define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_C_ARES_GRPC_ARES_EV_DRIVER_H @@ -25,49 +27,40 @@ namespace grpc_core { -/// A wrapped fd that integrates with the grpc iomgr of the current platform. -/// A GrpcPolledFd knows how to create grpc platform-specific iomgr endpoints -/// from "ares_socket_t" sockets, and then sign up for readability/writeability -/// with that poller, and do shutdown and destruction. +/* A wrapped fd that integrates with the grpc iomgr of the current platform. + * A GrpcPolledFd knows how to create grpc platform-specific iomgr endpoints + * from "ares_socket_t" sockets, and then sign up for readability/writeability + * with that poller, and do shutdown and destruction. */ class GrpcPolledFd { public: virtual ~GrpcPolledFd() {} - // Called when c-ares library is interested and there's no pending callback + /* Called when c-ares library is interested and there's no pending callback */ virtual void RegisterForOnReadableLocked(grpc_closure* read_closure) = 0; - // Called when c-ares library is interested and there's no pending callback + /* Called when c-ares library is interested and there's no pending callback */ virtual void RegisterForOnWriteableLocked(grpc_closure* write_closure) = 0; - // Indicates if there is data left even after just being read from + /* Indicates if there is data left even after just being read from */ virtual bool IsFdStillReadableLocked() = 0; - // Called once and only once. Must cause cancellation of any pending - // read/write callbacks. + /* Called once and only once. Must cause cancellation of any pending + * read/write callbacks. */ virtual void ShutdownLocked(grpc_error* error) = 0; - // Get the underlying ares_socket_t that this was created from + /* Get the underlying ares_socket_t that this was created from */ virtual ares_socket_t GetWrappedAresSocketLocked() = 0; - // A unique name, for logging + /* A unique name, for logging */ virtual const char* GetName() = 0; }; -/// A GrpcPolledFdFactory is 1-to-1 with and owned by the -/// ares event driver. It knows how to create GrpcPolledFd's -/// for the current platform, and the ares driver uses it for all of -/// its fd's. +/* A GrpcPolledFdFactory is 1-to-1 with and owned by the + * ares event driver. It knows how to create GrpcPolledFd's + * for the current platform, and the ares driver uses it for all of + * its fd's. */ class GrpcPolledFdFactory { public: virtual ~GrpcPolledFdFactory() {} - - /// Creates a new wrapped fd for the current platform. - /// - /// Note about \a driver_pollset_set lifetime: the \a driver_pollset_set - /// param exists in this factory function because some \a GrpcPolledFd - /// implementations need to use it. If a \a GrpcPolledFd object does need - /// to use \a driver_pollset_set, it is safe to access it up through the point - /// that \a GrpcPolledFd::ShutdownLocked has been run, and it is no longer - /// safe to access after that. + /* Creates a new wrapped fd for the current platform */ virtual GrpcPolledFd* NewGrpcPolledFdLocked( ares_socket_t as, grpc_pollset_set* driver_pollset_set, std::shared_ptr work_serializer) = 0; - - /// Optionally configures the ares channel after creation + /* Optionally configures the ares channel after creation */ virtual void ConfigureAresChannelLocked(ares_channel channel) = 0; }; @@ -76,4 +69,5 @@ std::unique_ptr NewGrpcPolledFdFactory( } // namespace grpc_core -#endif // GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_C_ARES_GRPC_ARES_EV_DRIVER_H +#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_C_ARES_GRPC_ARES_EV_DRIVER_H \ + */ diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc index 94c4b786ff0..07f38ba59ab 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc @@ -50,6 +50,7 @@ class GrpcPolledFdPosix : public GrpcPolledFd { } ~GrpcPolledFdPosix() override { + grpc_pollset_set_del_fd(driver_pollset_set_, fd_); /* c-ares library will close the fd inside grpc_fd. This fd may be picked up immediately by another thread, and should not be closed by the following grpc_fd_orphan. */ @@ -72,11 +73,6 @@ class GrpcPolledFdPosix : public GrpcPolledFd { } void ShutdownLocked(grpc_error* error) override { - // After we return, driver_pollset_set_ is no longer - // safe to access because the overall \a LookupAresLocked call may - // complete, after which its owner may destroy it. So delete the fd now. - grpc_pollset_set_del_fd(driver_pollset_set_, fd_); - driver_pollset_set_ = nullptr; grpc_fd_shutdown(fd_, error); } diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc index c39b939baa4..8d4a0cb210d 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc @@ -246,7 +246,7 @@ class GrpcPolledFdWindows { break; case WRITE_PENDING: case WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY: - GPR_ASSERT(0); + abort(); } } } @@ -346,7 +346,7 @@ class GrpcPolledFdWindows { case SOCK_STREAM: return SendVTCP(wsa_error_ctx, iov, iov_count); default: - GPR_ASSERT(0); + abort(); } } @@ -417,7 +417,7 @@ class GrpcPolledFdWindows { tcp_write_state_ = WRITE_IDLE; return total_sent; } - GPR_ASSERT(0); + abort(); } static void OnTcpConnect(void* arg, grpc_error* error) { @@ -483,7 +483,7 @@ class GrpcPolledFdWindows { case SOCK_STREAM: return ConnectTCP(wsa_error_ctx, target, target_len); default: - GPR_ASSERT(0); + abort(); } } @@ -722,9 +722,7 @@ class SockToPolledFdMap { return node->polled_fd; } } - gpr_log(GPR_ERROR, "LookupPolledFd for socket: %d failed. head_: %p", s, - head_); - GPR_ASSERT(0); + abort(); } void RemoveEntry(SOCKET s) { @@ -739,7 +737,7 @@ class SockToPolledFdMap { } prev = &node->next; } - GPR_ASSERT(0); + abort(); } /* These virtual socket functions are called from within the c-ares diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc index 21ecb396e67..fa3249efa55 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc @@ -27,10 +27,8 @@ #include #include "absl/container/inlined_vector.h" -#include "absl/strings/match.h" #include "absl/strings/str_cat.h" #include "absl/strings/str_format.h" -#include "absl/strings/str_join.h" #include #include @@ -39,6 +37,7 @@ #include #include +#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gprpp/host_port.h" #include "src/core/lib/iomgr/error.h" @@ -47,68 +46,627 @@ #include "src/core/lib/iomgr/nameser.h" #include "src/core/lib/iomgr/parse_address.h" #include "src/core/lib/iomgr/sockaddr_utils.h" +#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/transport/authority_override.h" +using grpc_core::ServerAddress; +using grpc_core::ServerAddressList; + grpc_core::TraceFlag grpc_trace_cares_address_sorting(false, "cares_address_sorting"); grpc_core::TraceFlag grpc_trace_cares_resolver(false, "cares_resolver"); -namespace grpc_core { - -void AresRequest::AddressQuery::Create(AresRequest* request, - const std::string& host, uint16_t port, - bool is_balancer, int address_family) { - AddressQuery* q = - new AddressQuery(request, host, port, is_balancer, address_family); - // note that ares_gethostbyname can't be invoked from the ctor because it - // can run it's callback inline and invoke the dtor - ares_gethostbyname(request->channel_, q->host_.c_str(), q->address_family_, - OnHostByNameDoneLocked, q); -} - -AresRequest::AddressQuery::AddressQuery(AresRequest* request, - const std::string& host, uint16_t port, - bool is_balancer, int address_family) - : request_(request), - host_(host), - port_(port), - is_balancer_(is_balancer), - address_family_(address_family) { - ++request_->pending_queries_; - if (address_family_ == AF_INET) { - qtype_ = "A"; - } else if (address_family_ == AF_INET6) { - qtype_ = "AAAA"; +typedef struct grpc_ares_ev_driver grpc_ares_ev_driver; + +struct grpc_ares_request { + /** indicates the DNS server to use, if specified */ + struct ares_addr_port_node dns_server_addr; + /** following members are set in grpc_resolve_address_ares_impl */ + /** closure to call when the request completes */ + grpc_closure* on_done; + /** the pointer to receive the resolved addresses */ + std::unique_ptr* addresses_out; + /** the pointer to receive the resolved balancer addresses */ + std::unique_ptr* balancer_addresses_out; + /** the pointer to receive the service config in JSON */ + char** service_config_json_out; + /** the evernt driver used by this request */ + grpc_ares_ev_driver* ev_driver; + /** number of ongoing queries */ + size_t pending_queries; + + /** the errors explaining query failures, appended to in query callbacks */ + grpc_error* error; +}; + +typedef struct fd_node { + /** the owner of this fd node */ + grpc_ares_ev_driver* ev_driver; + /** a closure wrapping on_readable_locked, which should be + invoked when the grpc_fd in this node becomes readable. */ + grpc_closure read_closure; + /** a closure wrapping on_writable_locked, which should be + invoked when the grpc_fd in this node becomes writable. */ + grpc_closure write_closure; + /** next fd node in the list */ + struct fd_node* next; + + /** wrapped fd that's polled by grpc's poller for the current platform */ + grpc_core::GrpcPolledFd* grpc_polled_fd; + /** if the readable closure has been registered */ + bool readable_registered; + /** if the writable closure has been registered */ + bool writable_registered; + /** if the fd has been shutdown yet from grpc iomgr perspective */ + bool already_shutdown; +} fd_node; + +struct grpc_ares_ev_driver { + /** the ares_channel owned by this event driver */ + ares_channel channel; + /** pollset set for driving the IO events of the channel */ + grpc_pollset_set* pollset_set; + /** refcount of the event driver */ + gpr_refcount refs; + + /** work_serializer to synchronize c-ares and I/O callbacks on */ + std::shared_ptr work_serializer; + /** a list of grpc_fd that this event driver is currently using. */ + fd_node* fds; + /** is this event driver being shut down */ + bool shutting_down; + /** request object that's using this ev driver */ + grpc_ares_request* request; + /** Owned by the ev_driver. Creates new GrpcPolledFd's */ + std::unique_ptr polled_fd_factory; + /** query timeout in milliseconds */ + int query_timeout_ms; + /** alarm to cancel active queries */ + grpc_timer query_timeout; + /** cancels queries on a timeout */ + grpc_closure on_timeout_locked; + /** alarm to poll ares_process on in case fd events don't happen */ + grpc_timer ares_backup_poll_alarm; + /** polls ares_process on a periodic timer */ + grpc_closure on_ares_backup_poll_alarm_locked; +}; + +// TODO(apolcyn): make grpc_ares_hostbyname_request a sub-class +// of GrpcAresQuery. +typedef struct grpc_ares_hostbyname_request { + /** following members are set in create_hostbyname_request_locked + */ + /** the top-level request instance */ + grpc_ares_request* parent_request; + /** host to resolve, parsed from the name to resolve */ + char* host; + /** port to fill in sockaddr_in, parsed from the name to resolve */ + uint16_t port; + /** is it a grpclb address */ + bool is_balancer; + /** for logging and errors: the query type ("A" or "AAAA") */ + const char* qtype; +} grpc_ares_hostbyname_request; + +static void grpc_ares_request_ref_locked(grpc_ares_request* r); +static void grpc_ares_request_unref_locked(grpc_ares_request* r); + +// TODO(apolcyn): as a part of C++-ification, find a way to +// organize per-query and per-resolution information in such a way +// that doesn't involve allocating a number of different data +// structures. +class GrpcAresQuery { + public: + explicit GrpcAresQuery(grpc_ares_request* r, const std::string& name) + : r_(r), name_(name) { + grpc_ares_request_ref_locked(r_); + } + + ~GrpcAresQuery() { grpc_ares_request_unref_locked(r_); } + + grpc_ares_request* parent_request() { return r_; } + + const std::string& name() { return name_; } + + private: + /* the top level request instance */ + grpc_ares_request* r_; + /** for logging and errors */ + const std::string name_; +}; + +static grpc_ares_ev_driver* grpc_ares_ev_driver_ref( + grpc_ares_ev_driver* ev_driver) { + GRPC_CARES_TRACE_LOG("request:%p Ref ev_driver %p", ev_driver->request, + ev_driver); + gpr_ref(&ev_driver->refs); + return ev_driver; +} + +static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver* ev_driver) { + GRPC_CARES_TRACE_LOG("request:%p Unref ev_driver %p", ev_driver->request, + ev_driver); + if (gpr_unref(&ev_driver->refs)) { + GRPC_CARES_TRACE_LOG("request:%p destroy ev_driver %p", ev_driver->request, + ev_driver); + GPR_ASSERT(ev_driver->fds == nullptr); + ares_destroy(ev_driver->channel); + grpc_ares_complete_request_locked(ev_driver->request); + delete ev_driver; + } +} + +static void fd_node_destroy_locked(fd_node* fdn) { + GRPC_CARES_TRACE_LOG("request:%p delete fd: %s", fdn->ev_driver->request, + fdn->grpc_polled_fd->GetName()); + GPR_ASSERT(!fdn->readable_registered); + GPR_ASSERT(!fdn->writable_registered); + GPR_ASSERT(fdn->already_shutdown); + delete fdn->grpc_polled_fd; + gpr_free(fdn); +} + +static void fd_node_shutdown_locked(fd_node* fdn, const char* reason) { + if (!fdn->already_shutdown) { + fdn->already_shutdown = true; + fdn->grpc_polled_fd->ShutdownLocked( + GRPC_ERROR_CREATE_FROM_STATIC_STRING(reason)); + } +} + +void grpc_ares_ev_driver_on_queries_complete_locked( + grpc_ares_ev_driver* ev_driver) { + // We mark the event driver as being shut down. + // grpc_ares_notify_on_event_locked will shut down any remaining + // fds. + ev_driver->shutting_down = true; + grpc_timer_cancel(&ev_driver->query_timeout); + grpc_timer_cancel(&ev_driver->ares_backup_poll_alarm); + grpc_ares_ev_driver_unref(ev_driver); +} + +void grpc_ares_ev_driver_shutdown_locked(grpc_ares_ev_driver* ev_driver) { + ev_driver->shutting_down = true; + fd_node* fn = ev_driver->fds; + while (fn != nullptr) { + fd_node_shutdown_locked(fn, "grpc_ares_ev_driver_shutdown"); + fn = fn->next; + } +} + +// Search fd in the fd_node list head. This is an O(n) search, the max possible +// value of n is ARES_GETSOCK_MAXNUM (16). n is typically 1 - 2 in our tests. +static fd_node* pop_fd_node_locked(fd_node** head, ares_socket_t as) { + fd_node phony_head; + phony_head.next = *head; + fd_node* node = &phony_head; + while (node->next != nullptr) { + if (node->next->grpc_polled_fd->GetWrappedAresSocketLocked() == as) { + fd_node* ret = node->next; + node->next = node->next->next; + *head = phony_head.next; + return ret; + } + node = node->next; + } + return nullptr; +} + +static grpc_millis calculate_next_ares_backup_poll_alarm_ms( + grpc_ares_ev_driver* driver) { + // An alternative here could be to use ares_timeout to try to be more + // accurate, but that would require using "struct timeval"'s, which just makes + // things a bit more complicated. So just poll every second, as suggested + // by the c-ares code comments. + grpc_millis ms_until_next_ares_backup_poll_alarm = 1000; + GRPC_CARES_TRACE_LOG( + "request:%p ev_driver=%p. next ares process poll time in " + "%" PRId64 " ms", + driver->request, driver, ms_until_next_ares_backup_poll_alarm); + return ms_until_next_ares_backup_poll_alarm + + grpc_core::ExecCtx::Get()->Now(); +} + +static void on_timeout_locked(grpc_ares_ev_driver* driver, grpc_error* error) { + GRPC_CARES_TRACE_LOG( + "request:%p ev_driver=%p on_timeout_locked. driver->shutting_down=%d. " + "err=%s", + driver->request, driver, driver->shutting_down, grpc_error_string(error)); + if (!driver->shutting_down && error == GRPC_ERROR_NONE) { + grpc_ares_ev_driver_shutdown_locked(driver); + } + grpc_ares_ev_driver_unref(driver); + GRPC_ERROR_UNREF(error); +} + +static void on_timeout(void* arg, grpc_error* error) { + grpc_ares_ev_driver* driver = static_cast(arg); + GRPC_ERROR_REF(error); // ref owned by lambda + driver->work_serializer->Run( + [driver, error]() { on_timeout_locked(driver, error); }, DEBUG_LOCATION); +} + +static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver); + +static void on_ares_backup_poll_alarm_locked(grpc_ares_ev_driver* driver, + grpc_error* error); + +static void on_ares_backup_poll_alarm(void* arg, grpc_error* error) { + grpc_ares_ev_driver* driver = static_cast(arg); + GRPC_ERROR_REF(error); + driver->work_serializer->Run( + [driver, error]() { on_ares_backup_poll_alarm_locked(driver, error); }, + DEBUG_LOCATION); +} + +/* In case of non-responsive DNS servers, dropped packets, etc., c-ares has + * intelligent timeout and retry logic, which we can take advantage of by + * polling ares_process_fd on time intervals. Overall, the c-ares library is + * meant to be called into and given a chance to proceed name resolution: + * a) when fd events happen + * b) when some time has passed without fd events having happened + * For the latter, we use this backup poller. Also see + * https://github.com/grpc/grpc/pull/17688 description for more details. */ +static void on_ares_backup_poll_alarm_locked(grpc_ares_ev_driver* driver, + grpc_error* error) { + GRPC_CARES_TRACE_LOG( + "request:%p ev_driver=%p on_ares_backup_poll_alarm_locked. " + "driver->shutting_down=%d. " + "err=%s", + driver->request, driver, driver->shutting_down, grpc_error_string(error)); + if (!driver->shutting_down && error == GRPC_ERROR_NONE) { + fd_node* fdn = driver->fds; + while (fdn != nullptr) { + if (!fdn->already_shutdown) { + GRPC_CARES_TRACE_LOG( + "request:%p ev_driver=%p on_ares_backup_poll_alarm_locked; " + "ares_process_fd. fd=%s", + driver->request, driver, fdn->grpc_polled_fd->GetName()); + ares_socket_t as = fdn->grpc_polled_fd->GetWrappedAresSocketLocked(); + ares_process_fd(driver->channel, as, as); + } + fdn = fdn->next; + } + if (!driver->shutting_down) { + grpc_millis next_ares_backup_poll_alarm = + calculate_next_ares_backup_poll_alarm_ms(driver); + grpc_ares_ev_driver_ref(driver); + GRPC_CLOSURE_INIT(&driver->on_ares_backup_poll_alarm_locked, + on_ares_backup_poll_alarm, driver, + grpc_schedule_on_exec_ctx); + grpc_timer_init(&driver->ares_backup_poll_alarm, + next_ares_backup_poll_alarm, + &driver->on_ares_backup_poll_alarm_locked); + } + grpc_ares_notify_on_event_locked(driver); + } + grpc_ares_ev_driver_unref(driver); + GRPC_ERROR_UNREF(error); +} + +static void on_readable_locked(fd_node* fdn, grpc_error* error) { + GPR_ASSERT(fdn->readable_registered); + grpc_ares_ev_driver* ev_driver = fdn->ev_driver; + const ares_socket_t as = fdn->grpc_polled_fd->GetWrappedAresSocketLocked(); + fdn->readable_registered = false; + GRPC_CARES_TRACE_LOG("request:%p readable on %s", fdn->ev_driver->request, + fdn->grpc_polled_fd->GetName()); + if (error == GRPC_ERROR_NONE) { + do { + ares_process_fd(ev_driver->channel, as, ARES_SOCKET_BAD); + } while (fdn->grpc_polled_fd->IsFdStillReadableLocked()); } else { - GPR_ASSERT(0); + // If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or + // timed out. The pending lookups made on this ev_driver will be cancelled + // by the following ares_cancel() and the on_done callbacks will be invoked + // with a status of ARES_ECANCELLED. The remaining file descriptors in this + // ev_driver will be cleaned up in the follwing + // grpc_ares_notify_on_event_locked(). + ares_cancel(ev_driver->channel); } + grpc_ares_notify_on_event_locked(ev_driver); + grpc_ares_ev_driver_unref(ev_driver); + GRPC_ERROR_UNREF(error); } -AresRequest::AddressQuery::~AddressQuery() { - request_->DecrementPendingQueries(); +static void on_readable(void* arg, grpc_error* error) { + fd_node* fdn = static_cast(arg); + GRPC_ERROR_REF(error); /* ref owned by lambda */ + fdn->ev_driver->work_serializer->Run( + [fdn, error]() { on_readable_locked(fdn, error); }, DEBUG_LOCATION); } -void AresRequest::AddressQuery::OnHostByNameDoneLocked( - void* arg, int status, int /*timeouts*/, struct hostent* hostent) { - std::unique_ptr q(static_cast(arg)); - AresRequest* request = q->request_; +static void on_writable_locked(fd_node* fdn, grpc_error* error) { + GPR_ASSERT(fdn->writable_registered); + grpc_ares_ev_driver* ev_driver = fdn->ev_driver; + const ares_socket_t as = fdn->grpc_polled_fd->GetWrappedAresSocketLocked(); + fdn->writable_registered = false; + GRPC_CARES_TRACE_LOG("request:%p writable on %s", ev_driver->request, + fdn->grpc_polled_fd->GetName()); + if (error == GRPC_ERROR_NONE) { + ares_process_fd(ev_driver->channel, ARES_SOCKET_BAD, as); + } else { + // If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or + // timed out. The pending lookups made on this ev_driver will be cancelled + // by the following ares_cancel() and the on_done callbacks will be invoked + // with a status of ARES_ECANCELLED. The remaining file descriptors in this + // ev_driver will be cleaned up in the follwing + // grpc_ares_notify_on_event_locked(). + ares_cancel(ev_driver->channel); + } + grpc_ares_notify_on_event_locked(ev_driver); + grpc_ares_ev_driver_unref(ev_driver); + GRPC_ERROR_UNREF(error); +} + +static void on_writable(void* arg, grpc_error* error) { + fd_node* fdn = static_cast(arg); + GRPC_ERROR_REF(error); /* ref owned by lambda */ + fdn->ev_driver->work_serializer->Run( + [fdn, error]() { on_writable_locked(fdn, error); }, DEBUG_LOCATION); +} + +// Get the file descriptors used by the ev_driver's ares channel, register +// driver_closure with these filedescriptors. +static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) { + fd_node* new_list = nullptr; + if (!ev_driver->shutting_down) { + ares_socket_t socks[ARES_GETSOCK_MAXNUM]; + int socks_bitmask = + ares_getsock(ev_driver->channel, socks, ARES_GETSOCK_MAXNUM); + for (size_t i = 0; i < ARES_GETSOCK_MAXNUM; i++) { + if (ARES_GETSOCK_READABLE(socks_bitmask, i) || + ARES_GETSOCK_WRITABLE(socks_bitmask, i)) { + fd_node* fdn = pop_fd_node_locked(&ev_driver->fds, socks[i]); + // Create a new fd_node if sock[i] is not in the fd_node list. + if (fdn == nullptr) { + fdn = static_cast(gpr_malloc(sizeof(fd_node))); + fdn->grpc_polled_fd = + ev_driver->polled_fd_factory->NewGrpcPolledFdLocked( + socks[i], ev_driver->pollset_set, ev_driver->work_serializer); + GRPC_CARES_TRACE_LOG("request:%p new fd: %s", ev_driver->request, + fdn->grpc_polled_fd->GetName()); + fdn->ev_driver = ev_driver; + fdn->readable_registered = false; + fdn->writable_registered = false; + fdn->already_shutdown = false; + } + fdn->next = new_list; + new_list = fdn; + // Register read_closure if the socket is readable and read_closure has + // not been registered with this socket. + if (ARES_GETSOCK_READABLE(socks_bitmask, i) && + !fdn->readable_registered) { + grpc_ares_ev_driver_ref(ev_driver); + GRPC_CARES_TRACE_LOG("request:%p notify read on: %s", + ev_driver->request, + fdn->grpc_polled_fd->GetName()); + GRPC_CLOSURE_INIT(&fdn->read_closure, on_readable, fdn, + grpc_schedule_on_exec_ctx); + fdn->grpc_polled_fd->RegisterForOnReadableLocked(&fdn->read_closure); + fdn->readable_registered = true; + } + // Register write_closure if the socket is writable and write_closure + // has not been registered with this socket. + if (ARES_GETSOCK_WRITABLE(socks_bitmask, i) && + !fdn->writable_registered) { + GRPC_CARES_TRACE_LOG("request:%p notify write on: %s", + ev_driver->request, + fdn->grpc_polled_fd->GetName()); + grpc_ares_ev_driver_ref(ev_driver); + GRPC_CLOSURE_INIT(&fdn->write_closure, on_writable, fdn, + grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&fdn->write_closure, on_writable, fdn, + grpc_schedule_on_exec_ctx); + fdn->grpc_polled_fd->RegisterForOnWriteableLocked( + &fdn->write_closure); + fdn->writable_registered = true; + } + } + } + } + // Any remaining fds in ev_driver->fds were not returned by ares_getsock() and + // are therefore no longer in use, so they can be shut down and removed from + // the list. + while (ev_driver->fds != nullptr) { + fd_node* cur = ev_driver->fds; + ev_driver->fds = ev_driver->fds->next; + fd_node_shutdown_locked(cur, "c-ares fd shutdown"); + if (!cur->readable_registered && !cur->writable_registered) { + fd_node_destroy_locked(cur); + } else { + cur->next = new_list; + new_list = cur; + } + } + ev_driver->fds = new_list; +} + +void grpc_ares_ev_driver_start_locked(grpc_ares_ev_driver* ev_driver) { + grpc_ares_notify_on_event_locked(ev_driver); + // Initialize overall DNS resolution timeout alarm + grpc_millis timeout = + ev_driver->query_timeout_ms == 0 + ? GRPC_MILLIS_INF_FUTURE + : ev_driver->query_timeout_ms + grpc_core::ExecCtx::Get()->Now(); + GRPC_CARES_TRACE_LOG( + "request:%p ev_driver=%p grpc_ares_ev_driver_start_locked. timeout in " + "%" PRId64 " ms", + ev_driver->request, ev_driver, timeout); + grpc_ares_ev_driver_ref(ev_driver); + GRPC_CLOSURE_INIT(&ev_driver->on_timeout_locked, on_timeout, ev_driver, + grpc_schedule_on_exec_ctx); + grpc_timer_init(&ev_driver->query_timeout, timeout, + &ev_driver->on_timeout_locked); + // Initialize the backup poll alarm + grpc_millis next_ares_backup_poll_alarm = + calculate_next_ares_backup_poll_alarm_ms(ev_driver); + grpc_ares_ev_driver_ref(ev_driver); + GRPC_CLOSURE_INIT(&ev_driver->on_ares_backup_poll_alarm_locked, + on_ares_backup_poll_alarm, ev_driver, + grpc_schedule_on_exec_ctx); + grpc_timer_init(&ev_driver->ares_backup_poll_alarm, + next_ares_backup_poll_alarm, + &ev_driver->on_ares_backup_poll_alarm_locked); +} + +static void noop_inject_channel_config(ares_channel /*channel*/) {} + +void (*grpc_ares_test_only_inject_config)(ares_channel channel) = + noop_inject_channel_config; + +grpc_error* grpc_ares_ev_driver_create_locked( + grpc_ares_ev_driver** ev_driver, grpc_pollset_set* pollset_set, + int query_timeout_ms, + std::shared_ptr work_serializer, + grpc_ares_request* request) { + *ev_driver = new grpc_ares_ev_driver(); + ares_options opts; + memset(&opts, 0, sizeof(opts)); + opts.flags |= ARES_FLAG_STAYOPEN; + int status = ares_init_options(&(*ev_driver)->channel, &opts, ARES_OPT_FLAGS); + grpc_ares_test_only_inject_config((*ev_driver)->channel); + GRPC_CARES_TRACE_LOG("request:%p grpc_ares_ev_driver_create_locked", request); + if (status != ARES_SUCCESS) { + grpc_error* err = GRPC_ERROR_CREATE_FROM_COPIED_STRING( + absl::StrCat("Failed to init ares channel. C-ares error: ", + ares_strerror(status)) + .c_str()); + gpr_free(*ev_driver); + return err; + } + (*ev_driver)->work_serializer = std::move(work_serializer); + gpr_ref_init(&(*ev_driver)->refs, 1); + (*ev_driver)->pollset_set = pollset_set; + (*ev_driver)->fds = nullptr; + (*ev_driver)->shutting_down = false; + (*ev_driver)->request = request; + (*ev_driver)->polled_fd_factory = + grpc_core::NewGrpcPolledFdFactory((*ev_driver)->work_serializer); + (*ev_driver) + ->polled_fd_factory->ConfigureAresChannelLocked((*ev_driver)->channel); + (*ev_driver)->query_timeout_ms = query_timeout_ms; + return GRPC_ERROR_NONE; +} + +static void log_address_sorting_list(const grpc_ares_request* r, + const ServerAddressList& addresses, + const char* input_output_str) { + for (size_t i = 0; i < addresses.size(); i++) { + std::string addr_str = + grpc_sockaddr_to_string(&addresses[i].address(), true); + gpr_log(GPR_INFO, + "(c-ares resolver) request:%p c-ares address sorting: %s[%" PRIuPTR + "]=%s", + r, input_output_str, i, addr_str.c_str()); + } +} + +void grpc_cares_wrapper_address_sorting_sort(const grpc_ares_request* r, + ServerAddressList* addresses) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_cares_address_sorting)) { + log_address_sorting_list(r, *addresses, "input"); + } + address_sorting_sortable* sortables = static_cast( + gpr_zalloc(sizeof(address_sorting_sortable) * addresses->size())); + for (size_t i = 0; i < addresses->size(); ++i) { + sortables[i].user_data = &(*addresses)[i]; + memcpy(&sortables[i].dest_addr.addr, &(*addresses)[i].address().addr, + (*addresses)[i].address().len); + sortables[i].dest_addr.len = (*addresses)[i].address().len; + } + address_sorting_rfc_6724_sort(sortables, addresses->size()); + ServerAddressList sorted; + sorted.reserve(addresses->size()); + for (size_t i = 0; i < addresses->size(); ++i) { + sorted.emplace_back(*static_cast(sortables[i].user_data)); + } + gpr_free(sortables); + *addresses = std::move(sorted); + if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_cares_address_sorting)) { + log_address_sorting_list(r, *addresses, "output"); + } +} + +static void grpc_ares_request_ref_locked(grpc_ares_request* r) { + r->pending_queries++; +} + +static void grpc_ares_request_unref_locked(grpc_ares_request* r) { + r->pending_queries--; + if (r->pending_queries == 0u) { + grpc_ares_ev_driver_on_queries_complete_locked(r->ev_driver); + } +} + +void grpc_ares_complete_request_locked(grpc_ares_request* r) { + /* Invoke on_done callback and destroy the + request */ + r->ev_driver = nullptr; + ServerAddressList* addresses = r->addresses_out->get(); + if (addresses != nullptr) { + grpc_cares_wrapper_address_sorting_sort(r, addresses); + GRPC_ERROR_UNREF(r->error); + r->error = GRPC_ERROR_NONE; + // TODO(apolcyn): allow c-ares to return a service config + // with no addresses along side it + } + if (r->balancer_addresses_out != nullptr) { + ServerAddressList* balancer_addresses = r->balancer_addresses_out->get(); + if (balancer_addresses != nullptr) { + grpc_cares_wrapper_address_sorting_sort(r, balancer_addresses); + } + } + grpc_core::ExecCtx::Run(DEBUG_LOCATION, r->on_done, r->error); +} + +/* Note that the returned object takes a reference to qtype, so + * qtype must outlive it. */ +static grpc_ares_hostbyname_request* create_hostbyname_request_locked( + grpc_ares_request* parent_request, const char* host, uint16_t port, + bool is_balancer, const char* qtype) { + GRPC_CARES_TRACE_LOG( + "request:%p create_hostbyname_request_locked host:%s port:%d " + "is_balancer:%d qtype:%s", + parent_request, host, port, is_balancer, qtype); + grpc_ares_hostbyname_request* hr = new grpc_ares_hostbyname_request(); + hr->parent_request = parent_request; + hr->host = gpr_strdup(host); + hr->port = port; + hr->is_balancer = is_balancer; + hr->qtype = qtype; + grpc_ares_request_ref_locked(parent_request); + return hr; +} + +static void destroy_hostbyname_request_locked( + grpc_ares_hostbyname_request* hr) { + grpc_ares_request_unref_locked(hr->parent_request); + gpr_free(hr->host); + delete hr; +} + +static void on_hostbyname_done_locked(void* arg, int status, int /*timeouts*/, + struct hostent* hostent) { + grpc_ares_hostbyname_request* hr = + static_cast(arg); + grpc_ares_request* r = hr->parent_request; if (status == ARES_SUCCESS) { GRPC_CARES_TRACE_LOG( - "request:%p OnHostByNameDoneLocked qtype=%s host=%s ARES_SUCCESS", - request, q->qtype_, q->host_.c_str()); + "request:%p on_hostbyname_done_locked qtype=%s host=%s ARES_SUCCESS", r, + hr->qtype, hr->host); std::unique_ptr* address_list_ptr = - q->is_balancer_ ? request->balancer_addresses_out_ - : request->addresses_out_; + hr->is_balancer ? r->balancer_addresses_out : r->addresses_out; if (*address_list_ptr == nullptr) { *address_list_ptr = absl::make_unique(); } ServerAddressList& addresses = **address_list_ptr; for (size_t i = 0; hostent->h_addr_list[i] != nullptr; ++i) { absl::InlinedVector args_to_add; - if (q->is_balancer_) { + if (hr->is_balancer) { args_to_add.emplace_back( - CreateAuthorityOverrideChannelArg(q->host_.c_str())); + grpc_core::CreateAuthorityOverrideChannelArg(hr->host)); } grpc_channel_args* args = grpc_channel_args_copy_and_add( nullptr, args_to_add.data(), args_to_add.size()); @@ -120,14 +678,14 @@ void AresRequest::AddressQuery::OnHostByNameDoneLocked( memcpy(&addr.sin6_addr, hostent->h_addr_list[i], sizeof(struct in6_addr)); addr.sin6_family = static_cast(hostent->h_addrtype); - addr.sin6_port = q->port_; + addr.sin6_port = hr->port; addresses.emplace_back(&addr, addr_len, args); char output[INET6_ADDRSTRLEN]; ares_inet_ntop(AF_INET6, &addr.sin6_addr, output, INET6_ADDRSTRLEN); GRPC_CARES_TRACE_LOG( "request:%p c-ares resolver gets a AF_INET6 result: \n" " addr: %s\n port: %d\n sin6_scope_id: %d\n", - request, output, ntohs(q->port_), addr.sin6_scope_id); + r, output, ntohs(hr->port), addr.sin6_scope_id); break; } case AF_INET: { @@ -137,14 +695,14 @@ void AresRequest::AddressQuery::OnHostByNameDoneLocked( memcpy(&addr.sin_addr, hostent->h_addr_list[i], sizeof(struct in_addr)); addr.sin_family = static_cast(hostent->h_addrtype); - addr.sin_port = q->port_; + addr.sin_port = hr->port; addresses.emplace_back(&addr, addr_len, args); char output[INET_ADDRSTRLEN]; ares_inet_ntop(AF_INET, &addr.sin_addr, output, INET_ADDRSTRLEN); GRPC_CARES_TRACE_LOG( "request:%p c-ares resolver gets a AF_INET result: \n" " addr: %s\n port: %d\n", - request, output, ntohs(q->port_)); + r, output, ntohs(hr->port)); break; } } @@ -152,53 +710,42 @@ void AresRequest::AddressQuery::OnHostByNameDoneLocked( } else { std::string error_msg = absl::StrFormat( "C-ares status is not ARES_SUCCESS qtype=%s name=%s is_balancer=%d: %s", - q->qtype_, q->host_, q->is_balancer_, ares_strerror(status)); - GRPC_CARES_TRACE_LOG("request:%p OnHostByNameDoneLocked: %s", request, + hr->qtype, hr->host, hr->is_balancer, ares_strerror(status)); + GRPC_CARES_TRACE_LOG("request:%p on_hostbyname_done_locked: %s", r, error_msg.c_str()); grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_msg.c_str()); - request->error_ = grpc_error_add_child(error, request->error_); + r->error = grpc_error_add_child(error, r->error); } + destroy_hostbyname_request_locked(hr); } -void AresRequest::SRVQuery::Create(AresRequest* request) { - SRVQuery* q = new SRVQuery(request); - // note that ares_query can't be invoked from the ctor because it - // can run it's callback inline and invoke the dtor - ares_query(request->channel_, request->srv_qname().c_str(), ns_c_in, ns_t_srv, - OnSRVQueryDoneLocked, q); -} - -AresRequest::SRVQuery::SRVQuery(AresRequest* request) : request_(request) { - ++request_->pending_queries_; -} - -AresRequest::SRVQuery::~SRVQuery() { request_->DecrementPendingQueries(); } - -void AresRequest::SRVQuery::OnSRVQueryDoneLocked(void* arg, int status, - int /*timeouts*/, - unsigned char* abuf, - int alen) { - std::unique_ptr q(static_cast(arg)); - AresRequest* request = q->request_; +static void on_srv_query_done_locked(void* arg, int status, int /*timeouts*/, + unsigned char* abuf, int alen) { + GrpcAresQuery* q = static_cast(arg); + grpc_ares_request* r = q->parent_request(); if (status == ARES_SUCCESS) { - GRPC_CARES_TRACE_LOG("request:%p OnSRVQueryDoneLocked name=%s ARES_SUCCESS", - request, request->srv_qname().c_str()); + GRPC_CARES_TRACE_LOG( + "request:%p on_srv_query_done_locked name=%s ARES_SUCCESS", r, + q->name().c_str()); struct ares_srv_reply* reply; const int parse_status = ares_parse_srv_reply(abuf, alen, &reply); - GRPC_CARES_TRACE_LOG("request:%p ares_parse_srv_reply: %d", request, + GRPC_CARES_TRACE_LOG("request:%p ares_parse_srv_reply: %d", r, parse_status); if (parse_status == ARES_SUCCESS) { for (struct ares_srv_reply* srv_it = reply; srv_it != nullptr; srv_it = srv_it->next) { - if (AresQueryIPv6()) { - AresRequest::AddressQuery::Create( - request, std::string(srv_it->host), htons(srv_it->port), - true /* is_balancer */, AF_INET6 /* address_family */); + if (grpc_ares_query_ipv6()) { + grpc_ares_hostbyname_request* hr = create_hostbyname_request_locked( + r, srv_it->host, htons(srv_it->port), true /* is_balancer */, + "AAAA"); + ares_gethostbyname(r->ev_driver->channel, hr->host, AF_INET6, + on_hostbyname_done_locked, hr); } - AresRequest::AddressQuery::Create( - request, std::string(srv_it->host), htons(srv_it->port), - true /* is_balancer */, AF_INET /* address_family */); - request->NotifyOnEventLocked(); + grpc_ares_hostbyname_request* hr = create_hostbyname_request_locked( + r, srv_it->host, htons(srv_it->port), true /* is_balancer */, "A"); + ares_gethostbyname(r->ev_driver->channel, hr->host, AF_INET, + on_hostbyname_done_locked, hr); + grpc_ares_notify_on_event_locked(r->ev_driver); } } if (reply != nullptr) { @@ -206,756 +753,474 @@ void AresRequest::SRVQuery::OnSRVQueryDoneLocked(void* arg, int status, } } else { std::string error_msg = absl::StrFormat( - "C-ares status is not ARES_SUCCESS qtype=SRV name=%s: %s", - request->srv_qname(), ares_strerror(status)); - GRPC_CARES_TRACE_LOG("request:%p OnSRVQueryDoneLocked: %s", request, + "C-ares status is not ARES_SUCCESS qtype=SRV name=%s: %s", q->name(), + ares_strerror(status)); + GRPC_CARES_TRACE_LOG("request:%p on_srv_query_done_locked: %s", r, error_msg.c_str()); grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_msg.c_str()); - request->error_ = grpc_error_add_child(error, request->error_); + r->error = grpc_error_add_child(error, r->error); } + delete q; } -void AresRequest::TXTQuery::Create(AresRequest* request) { - TXTQuery* q = new TXTQuery(request); - // note that ares_search can't be invoked from the ctor because it - // can run it's callback inline and invoke the dtor - ares_search(request->channel_, request->txt_qname().c_str(), ns_c_in, - ns_t_txt, OnTXTDoneLocked, q); -} - -AresRequest::TXTQuery::TXTQuery(AresRequest* request) : request_(request) { - ++request_->pending_queries_; -} - -AresRequest::TXTQuery::~TXTQuery() { request_->DecrementPendingQueries(); } +static const char g_service_config_attribute_prefix[] = "grpc_config="; -void AresRequest::TXTQuery::OnTXTDoneLocked(void* arg, int status, - int /*timeouts*/, - unsigned char* buf, int len) { - std::unique_ptr q(static_cast(arg)); - AresRequest* request = q->request_; - const absl::string_view kServiceConfigAttributePrefix = "grpc_config="; +static void on_txt_done_locked(void* arg, int status, int /*timeouts*/, + unsigned char* buf, int len) { + GrpcAresQuery* q = static_cast(arg); + std::unique_ptr query_deleter(q); + grpc_ares_request* r = q->parent_request(); + const size_t prefix_len = sizeof(g_service_config_attribute_prefix) - 1; struct ares_txt_ext* result = nullptr; struct ares_txt_ext* reply = nullptr; - auto on_error = [request, status](const char* error_category) { - std::string error_msg = absl::StrFormat( - "%s: c-ares status is not ARES_SUCCESS qtype=TXT name=%s: %s", - error_category, request->txt_qname(), ares_strerror(status)); - grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_msg.c_str()); - GRPC_CARES_TRACE_LOG("request:%p OnTXTDoneLocked %s", request, - error_msg.c_str()); - request->error_ = grpc_error_add_child(error, request->error_); - }; - if (status != ARES_SUCCESS) { - on_error("TXT resolution error"); - return; - } - GRPC_CARES_TRACE_LOG("request:%p OnTXTDoneLocked name=%s ARES_SUCCESS", - request, request->txt_qname().c_str()); + grpc_error* error = GRPC_ERROR_NONE; + if (status != ARES_SUCCESS) goto fail; + GRPC_CARES_TRACE_LOG("request:%p on_txt_done_locked name=%s ARES_SUCCESS", r, + q->name().c_str()); status = ares_parse_txt_reply_ext(buf, len, &reply); - if (status != ARES_SUCCESS) { - on_error("ares_parse_txt_reply_ext error"); - return; - } + if (status != ARES_SUCCESS) goto fail; // Find service config in TXT record. for (result = reply; result != nullptr; result = result->next) { - absl::string_view result_view(reinterpret_cast(result->txt), - result->length); if (result->record_start && - absl::StartsWith(result_view, kServiceConfigAttributePrefix)) { + memcmp(result->txt, g_service_config_attribute_prefix, prefix_len) == + 0) { break; } } // Found a service config record. - std::vector service_config_parts = { - absl::string_view(reinterpret_cast(result->txt), - result->length) - .substr(kServiceConfigAttributePrefix.size())}; - for (result = result->next; result != nullptr && !result->record_start; - result = result->next) { - service_config_parts.emplace_back( - reinterpret_cast(result->txt), result->length); - } - *request->service_config_json_out_ = absl::StrJoin(service_config_parts, ""); - GRPC_CARES_TRACE_LOG("request:%p found service config: %s", request, - request->service_config_json_out_->value().c_str()); + if (result != nullptr) { + size_t service_config_len = result->length - prefix_len; + *r->service_config_json_out = + static_cast(gpr_malloc(service_config_len + 1)); + memcpy(*r->service_config_json_out, result->txt + prefix_len, + service_config_len); + for (result = result->next; result != nullptr && !result->record_start; + result = result->next) { + *r->service_config_json_out = static_cast( + gpr_realloc(*r->service_config_json_out, + service_config_len + result->length + 1)); + memcpy(*r->service_config_json_out + service_config_len, result->txt, + result->length); + service_config_len += result->length; + } + (*r->service_config_json_out)[service_config_len] = '\0'; + GRPC_CARES_TRACE_LOG("request:%p found service config: %s", r, + *r->service_config_json_out); + } // Clean up. ares_free_data(reply); + return; +fail: + std::string error_msg = + absl::StrFormat("C-ares status is not ARES_SUCCESS qtype=TXT name=%s: %s", + q->name(), ares_strerror(status)); + error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_msg.c_str()); + GRPC_CARES_TRACE_LOG("request:%p on_txt_done_locked %s", r, + error_msg.c_str()); + r->error = grpc_error_add_child(error, r->error); } -AresRequest::FdNode::FdNode(RefCountedPtr request, - std::unique_ptr grpc_polled_fd) - : request_(std::move(request)), grpc_polled_fd_(std::move(grpc_polled_fd)) { - GRPC_CARES_TRACE_LOG("request:%p new fd: %s", request_.get(), - grpc_polled_fd_->GetName()); - GRPC_CLOSURE_INIT(&read_closure_, AresRequest::FdNode::OnReadable, this, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&write_closure_, AresRequest::FdNode::OnWritable, this, - grpc_schedule_on_exec_ctx); -} - -AresRequest::FdNode::~FdNode() { - GRPC_CARES_TRACE_LOG("request:%p delete fd: %s", request_.get(), - grpc_polled_fd_->GetName()); - GPR_ASSERT(!readable_registered_); - GPR_ASSERT(!writable_registered_); - GPR_ASSERT(shutdown_); -} - -void AresRequest::FdNode::MaybeRegisterForOnReadableLocked() { - if (!readable_registered_) { - GRPC_CARES_TRACE_LOG("request:%p notify read on: %s", request_.get(), - grpc_polled_fd_->GetName()); - grpc_polled_fd_->RegisterForOnReadableLocked(&read_closure_); - readable_registered_ = true; - } -} - -void AresRequest::FdNode::MaybeRegisterForOnWritableLocked() { - if (!writable_registered_) { - GRPC_CARES_TRACE_LOG("request:%p notify write on: %s", request_.get(), - grpc_polled_fd_->GetName()); - grpc_polled_fd_->RegisterForOnWriteableLocked(&write_closure_); - writable_registered_ = true; - } -} - -void AresRequest::FdNode::MaybeShutdownLocked(absl::string_view reason) { - if (!shutdown_) { - GRPC_CARES_TRACE_LOG("request:%p shutdown on: %s", request_.get(), - grpc_polled_fd_->GetName()); - grpc_polled_fd_->ShutdownLocked( - GRPC_ERROR_CREATE_FROM_COPIED_STRING(std::string(reason).c_str())); - shutdown_ = true; - } -} - -bool AresRequest::FdNode::IsActiveLocked() { - return readable_registered_ || writable_registered_; -} - -void AresRequest::FdNode::OnReadableLocked(grpc_error* error) { - GPR_ASSERT(readable_registered_); - readable_registered_ = false; - const ares_socket_t as = grpc_polled_fd_->GetWrappedAresSocketLocked(); - GRPC_CARES_TRACE_LOG("request:%p readable on: %s, error: %s", request_.get(), - grpc_polled_fd_->GetName(), grpc_error_string(error)); - if (error == GRPC_ERROR_NONE) { - do { - ares_process_fd(request_->channel_, as, ARES_SOCKET_BAD); - } while (grpc_polled_fd_->IsFdStillReadableLocked()); - } else { - // If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or - // timed out. The pending lookups made on this request will be cancelled - // by the following ares_cancel() and the on_done callbacks will be invoked - // with a status of ARES_ECANCELLED. The remaining file descriptors in this - // request will be cleaned up in the follwing - // NotifyOnEventLocked(). - ares_cancel(request_->channel_); - } - request_->NotifyOnEventLocked(); - GRPC_ERROR_UNREF(error); -} - -void AresRequest::FdNode::OnReadable(void* arg, grpc_error* error) { - AresRequest::FdNode* fdn = static_cast(arg); - GRPC_ERROR_REF(error); // ref owned by lambda - fdn->request_->work_serializer_->Run( - [fdn, error]() { fdn->OnReadableLocked(error); }, DEBUG_LOCATION); -} - -void AresRequest::FdNode::OnWritableLocked(grpc_error* error) { - GPR_ASSERT(writable_registered_); - writable_registered_ = false; - const ares_socket_t as = grpc_polled_fd_->GetWrappedAresSocketLocked(); - GRPC_CARES_TRACE_LOG("request:%p writable on %s, error: %s", request_.get(), - grpc_polled_fd_->GetName(), grpc_error_string(error)); - if (error == GRPC_ERROR_NONE) { - ares_process_fd(request_->channel_, ARES_SOCKET_BAD, as); - } else { - // If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or - // timed out. The pending lookups made on this request will be cancelled - // by the following ares_cancel() and the on_done callbacks will be invoked - // with a status of ARES_ECANCELLED. The remaining file descriptors in this - // request will be cleaned up in the follwing NotifyOnEventLocked(). - ares_cancel(request_->channel_); - } - request_->NotifyOnEventLocked(); - GRPC_ERROR_UNREF(error); -} - -void AresRequest::FdNode::OnWritable(void* arg, grpc_error* error) { - AresRequest::FdNode* fdn = static_cast(arg); - GRPC_ERROR_REF(error); // ref owned by lambda - fdn->request_->work_serializer_->Run( - [fdn, error]() { fdn->OnWritableLocked(error); }, DEBUG_LOCATION); -} - -OrphanablePtr AresRequest::Create( - absl::string_view dns_server, absl::string_view name, - absl::string_view default_port, grpc_pollset_set* interested_parties, - std::function on_done, - std::unique_ptr* addrs, - std::unique_ptr* balancer_addrs, - absl::optional* service_config_json, int query_timeout_ms, - std::shared_ptr work_serializer) { - OrphanablePtr request = MakeOrphanable( - addrs, balancer_addrs, service_config_json, interested_parties, - query_timeout_ms, on_done, work_serializer); - GRPC_CARES_TRACE_LOG( - "request:%p c-ares AresRequest::Create name=%s, " - "default_port=%s timeout in %d ms", - request.get(), std::string(name).c_str(), - std::string(default_port).c_str(), query_timeout_ms); - // pretend we have 1 query to avoid calling on_done before initialization is - // done - request->pending_queries_ = 1; - // Initialize overall DNS resolution timeout alarm - grpc_millis timeout = - request->query_timeout_ms_ == 0 - ? GRPC_MILLIS_INF_FUTURE - : request->query_timeout_ms_ + ExecCtx::Get()->Now(); - GRPC_CLOSURE_INIT(&request->on_timeout_locked_, AresRequest::OnTimeout, - request.get(), grpc_schedule_on_exec_ctx); - request->Ref().release(); // owned by timer callback - grpc_timer_init(&request->query_timeout_, timeout, - &request->on_timeout_locked_); - // Initialize the backup poll alarm - grpc_millis next_ares_backup_poll_alarm = - request->CalculateNextAresBackupPollAlarm(); - GRPC_CLOSURE_INIT(&request->on_ares_backup_poll_alarm_locked_, - AresRequest::OnAresBackupPollAlarm, request.get(), - grpc_schedule_on_exec_ctx); - request->Ref().release(); // owned by timer callback - grpc_timer_init(&request->ares_backup_poll_alarm_, - next_ares_backup_poll_alarm, - &request->on_ares_backup_poll_alarm_locked_); - // parse name, splitting it into host and port parts - std::string target_port_str; - SplitHostPort(name, &request->target_host_, &target_port_str); - if (request->target_host_.empty()) { - request->error_ = grpc_error_set_str( - GRPC_ERROR_CREATE_FROM_COPIED_STRING("unparseable host:port"), - GRPC_ERROR_STR_TARGET_ADDRESS, - grpc_slice_from_copied_string(std::string(name).c_str())); - request->DecrementPendingQueries(); - return request; - } else if (target_port_str.empty()) { +void grpc_dns_lookup_ares_continue_after_check_localhost_and_ip_literals_locked( + grpc_ares_request* r, const char* dns_server, const char* name, + const char* default_port, grpc_pollset_set* interested_parties, + int query_timeout_ms, + std::shared_ptr work_serializer) { + grpc_error* error = GRPC_ERROR_NONE; + grpc_ares_hostbyname_request* hr = nullptr; + /* parse name, splitting it into host and port parts */ + std::string host; + std::string port; + grpc_core::SplitHostPort(name, &host, &port); + if (host.empty()) { + error = grpc_error_set_str( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("unparseable host:port"), + GRPC_ERROR_STR_TARGET_ADDRESS, grpc_slice_from_copied_string(name)); + goto error_cleanup; + } else if (port.empty()) { if (default_port == nullptr) { - request->error_ = grpc_error_set_str( + error = grpc_error_set_str( GRPC_ERROR_CREATE_FROM_STATIC_STRING("no port in name"), - GRPC_ERROR_STR_TARGET_ADDRESS, - grpc_slice_from_copied_string(std::string(name).c_str())); - request->DecrementPendingQueries(); - return request; - } - target_port_str = std::string(default_port); - } - request->target_port_ = grpc_strhtons(target_port_str.c_str()); - // Don't query for SRV and TXT records if the target is "localhost", so - // as to cut down on lookups over the network, especially in tests: - // https://github.com/grpc/proposal/pull/79 - if (request->target_host_ == "localhost") { - request->balancer_addresses_out_ = nullptr; - request->service_config_json_out_ = nullptr; - } - // Early out if the target is an ipv4 or ipv6 literal. - if (request->ResolveAsIPLiteralLocked()) { - request->DecrementPendingQueries(); - return request; - } - // Early out if the target is localhost and we're on Windows. - if (request->MaybeResolveLocalHostManuallyLocked()) { - request->DecrementPendingQueries(); - return request; - } - // Look up name using c-ares lib. - request->ContinueAfterCheckLocalhostAndIPLiteralsLocked(dns_server); - request->DecrementPendingQueries(); - return request; -} - -AresRequest::AresRequest( - std::unique_ptr* addresses_out, - std::unique_ptr* balancer_addresses_out, - absl::optional* service_config_json_out, - grpc_pollset_set* pollset_set, int query_timeout_ms, - std::function on_done, - std::shared_ptr work_serializer) - : addresses_out_(addresses_out), - balancer_addresses_out_(balancer_addresses_out), - service_config_json_out_(service_config_json_out), - pollset_set_(pollset_set), - work_serializer_(std::move(work_serializer)), - polled_fd_factory_(NewGrpcPolledFdFactory(work_serializer_)), - query_timeout_ms_(query_timeout_ms), - on_done_(std::move(on_done)) {} - -AresRequest::~AresRequest() { - if (channel_ != nullptr) { - ares_destroy(channel_); - } -} - -void AresRequest::Orphan() { - ShutdownIOLocked("request orphaned"); - Unref(); -} - -// ares_library_init and ares_library_cleanup are currently no-op except under -// Windows. Calling them may cause race conditions when other parts of the -// binary calls these functions concurrently. -#ifdef GPR_WINDOWS -grpc_error* AresRequest::Init(void) { - int status = ares_library_init(ARES_LIB_INIT_ALL); - if (status != ARES_SUCCESS) { - return GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat("ares_library_init failed: ", ares_strerror(status)) - .c_str()); - } - return GRPC_ERROR_NONE; -} - -void AresRequest::Shutdown(void) { ares_library_cleanup(); } -#else -grpc_error* AresRequest::Init(void) { return GRPC_ERROR_NONE; } -void AresRequest::Shutdown(void) {} -#endif // GPR_WINDOWS - -void AresRequest::ShutdownIOLocked(absl::string_view reason) { - shutting_down_ = true; - for (auto& p : fds_) { - p.second->MaybeShutdownLocked( - absl::StrCat("AresRequest::ShutdownIOLocked reason: ", reason)); - } -} - -grpc_millis AresRequest::CalculateNextAresBackupPollAlarm() const { - // An alternative here could be to use ares_timeout to try to be more - // accurate, but that would require using "struct timeval"'s, which just makes - // things a bit more complicated. So just poll every second, as suggested - // by the c-ares code comments. - grpc_millis kMsUntilNextAresBackupPollAlarm = 1000; - GRPC_CARES_TRACE_LOG( - "request:%p next ares process poll time in " - "%" PRId64 " ms", - this, kMsUntilNextAresBackupPollAlarm); - return kMsUntilNextAresBackupPollAlarm + ExecCtx::Get()->Now(); -} - -void AresRequest::OnTimeoutLocked(grpc_error* error) { - GRPC_CARES_TRACE_LOG( - "request:%p OnTimeoutLocked. shutting_down_=%d. " - "err=%s", - this, shutting_down_, grpc_error_string(error)); - // TODO(apolcyn): always run ShutdownIOLocked since it's idempotent? - if (!shutting_down_ && error == GRPC_ERROR_NONE) { - ShutdownIOLocked("request timeout"); - } - Unref(); - GRPC_ERROR_UNREF(error); -} - -void AresRequest::OnTimeout(void* arg, grpc_error* error) { - AresRequest* request = static_cast(arg); - GRPC_ERROR_REF(error); // ref owned by lambda - request->work_serializer_->Run( - [request, error]() { request->OnTimeoutLocked(error); }, DEBUG_LOCATION); -} - -// In case of non-responsive DNS servers, dropped packets, etc., c-ares has -// intelligent timeout and retry logic, which we can take advantage of by -// polling ares_process_fd on time intervals. Overall, the c-ares library is -// meant to be called into and given a chance to proceed name resolution: -// a) when fd events happen -// b) when some time has passed without fd events having happened -// For the latter, we use this backup poller. Also see -// https://github.com/grpc/grpc/pull/17688 description for more details. -void AresRequest::OnAresBackupPollAlarmLocked(grpc_error* error) { - GRPC_CARES_TRACE_LOG( - "request:%p OnAresBackupPollAlarmLocked. " - "shutting_down_=%d. " - "err=%s", - this, shutting_down_, grpc_error_string(error)); - if (!shutting_down_ && error == GRPC_ERROR_NONE) { - for (auto& p : fds_) { - AresRequest::FdNode* fdn = p.second.get(); - if (!fdn->shutdown()) { - GRPC_CARES_TRACE_LOG( - "request:%p OnAresBackupPollAlarmLocked; " - "ares_process_fd. fd=%s", - this, fdn->grpc_polled_fd()->GetName()); - ares_socket_t as = fdn->grpc_polled_fd()->GetWrappedAresSocketLocked(); - ares_process_fd(channel_, as, as); - } - } - // the work done in ares_process_fd might have set shutting_down_ = true - if (!shutting_down_) { - grpc_millis next_ares_backup_poll_alarm = - CalculateNextAresBackupPollAlarm(); - Ref().release(); // owned by timer callback - grpc_timer_init(&ares_backup_poll_alarm_, next_ares_backup_poll_alarm, - &on_ares_backup_poll_alarm_locked_); - } - NotifyOnEventLocked(); - } - Unref(); - GRPC_ERROR_UNREF(error); -} - -void AresRequest::OnAresBackupPollAlarm(void* arg, grpc_error* error) { - AresRequest* request = static_cast(arg); - GRPC_ERROR_REF(error); - request->work_serializer_->Run( - [request, error]() { request->OnAresBackupPollAlarmLocked(error); }, - DEBUG_LOCATION); -} - -// Get the file descriptors used by the request's ares channel, register -// I/O readable/writable callbacks with these filedescriptors. -void AresRequest::NotifyOnEventLocked() { - // prevent unrefs in FdNode dtors from prematurely destroying this object - RefCountedPtr self_ref = Ref(); - std::map> active_fds; - if (!shutting_down_) { - ares_socket_t socks[ARES_GETSOCK_MAXNUM]; - int socks_bitmask = ares_getsock(channel_, socks, ARES_GETSOCK_MAXNUM); - for (size_t i = 0; i < ARES_GETSOCK_MAXNUM; i++) { - if (ARES_GETSOCK_READABLE(socks_bitmask, i) || - ARES_GETSOCK_WRITABLE(socks_bitmask, i)) { - ares_socket_t s = socks[i]; - if (fds_[s] == nullptr) { - fds_[s] = absl::make_unique( - Ref(), std::unique_ptr( - polled_fd_factory_->NewGrpcPolledFdLocked( - s, pollset_set_, work_serializer_))); - } - auto p = fds_.find(s); - if (ARES_GETSOCK_READABLE(socks_bitmask, i)) { - p->second->MaybeRegisterForOnReadableLocked(); - } - if (ARES_GETSOCK_WRITABLE(socks_bitmask, i)) { - p->second->MaybeRegisterForOnWritableLocked(); - } - active_fds[p->first] = std::move(p->second); - fds_.erase(p); - } - } - } - // Any remaining fds in fds_ were not returned by ares_getsock() and - // are therefore no longer in use, so they can be shut down and removed from - // the list. - for (auto& p : fds_) { - p.second->MaybeShutdownLocked("c-ares fd shutdown"); - if (p.second->IsActiveLocked()) { - active_fds[p.first] = std::move(p.second); + GRPC_ERROR_STR_TARGET_ADDRESS, grpc_slice_from_copied_string(name)); + goto error_cleanup; } + port = default_port; } - fds_ = std::move(active_fds); -} - -void AresRequest::ContinueAfterCheckLocalhostAndIPLiteralsLocked( - absl::string_view dns_server) { - ares_options opts; - memset(&opts, 0, sizeof(opts)); - opts.flags |= ARES_FLAG_STAYOPEN; - int status = ares_init_options(&channel_, &opts, ARES_OPT_FLAGS); - internal::AresTestOnlyInjectConfig(channel_); - if (status != ARES_SUCCESS) { - error_ = GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat("Failed to init ares channel. C-ares error: ", - ares_strerror(status)) - .c_str()); - return; - } - polled_fd_factory_->ConfigureAresChannelLocked(channel_); + error = grpc_ares_ev_driver_create_locked(&r->ev_driver, interested_parties, + query_timeout_ms, + std::move(work_serializer), r); + if (error != GRPC_ERROR_NONE) goto error_cleanup; // If dns_server is specified, use it. - if (!dns_server.empty()) { - GRPC_CARES_TRACE_LOG("request:%p Using DNS server %s", this, - std::string(dns_server).c_str()); - struct ares_addr_port_node dns_server_addr; - memset(&dns_server_addr, 0, sizeof(dns_server_addr)); + if (dns_server != nullptr && dns_server[0] != '\0') { + GRPC_CARES_TRACE_LOG("request:%p Using DNS server %s", r, dns_server); grpc_resolved_address addr; if (grpc_parse_ipv4_hostport(dns_server, &addr, false /* log_errors */)) { - dns_server_addr.family = AF_INET; + r->dns_server_addr.family = AF_INET; struct sockaddr_in* in = reinterpret_cast(addr.addr); - memcpy(&dns_server_addr.addr.addr4, &in->sin_addr, + memcpy(&r->dns_server_addr.addr.addr4, &in->sin_addr, sizeof(struct in_addr)); - dns_server_addr.tcp_port = grpc_sockaddr_get_port(&addr); - dns_server_addr.udp_port = grpc_sockaddr_get_port(&addr); + r->dns_server_addr.tcp_port = grpc_sockaddr_get_port(&addr); + r->dns_server_addr.udp_port = grpc_sockaddr_get_port(&addr); } else if (grpc_parse_ipv6_hostport(dns_server, &addr, false /* log_errors */)) { - dns_server_addr.family = AF_INET6; + r->dns_server_addr.family = AF_INET6; struct sockaddr_in6* in6 = reinterpret_cast(addr.addr); - memcpy(&dns_server_addr.addr.addr6, &in6->sin6_addr, + memcpy(&r->dns_server_addr.addr.addr6, &in6->sin6_addr, sizeof(struct in6_addr)); - dns_server_addr.tcp_port = grpc_sockaddr_get_port(&addr); - dns_server_addr.udp_port = grpc_sockaddr_get_port(&addr); + r->dns_server_addr.tcp_port = grpc_sockaddr_get_port(&addr); + r->dns_server_addr.udp_port = grpc_sockaddr_get_port(&addr); } else { - error_ = GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat("cannot parse DNS server ip address: ", dns_server) - .c_str()); - return; + error = grpc_error_set_str( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("cannot parse authority"), + GRPC_ERROR_STR_TARGET_ADDRESS, grpc_slice_from_copied_string(name)); + goto error_cleanup; } - int status = ares_set_servers_ports(channel_, &dns_server_addr); + int status = + ares_set_servers_ports(r->ev_driver->channel, &r->dns_server_addr); if (status != ARES_SUCCESS) { - error_ = GRPC_ERROR_CREATE_FROM_COPIED_STRING( + error = GRPC_ERROR_CREATE_FROM_COPIED_STRING( absl::StrCat("C-ares status is not ARES_SUCCESS: ", ares_strerror(status)) .c_str()); - return; + goto error_cleanup; } } - if (AresQueryIPv6()) { - AresRequest::AddressQuery::Create(this, target_host_, target_port_, - false /* is_balancer */, - AF_INET6 /* address_family */); - } - AresRequest::AddressQuery::Create(this, target_host_, target_port_, - false /* is_balancer */, - AF_INET /* address_family */); - if (balancer_addresses_out_ != nullptr) { - AresRequest::SRVQuery::Create(this); - } - if (service_config_json_out_ != nullptr) { - AresRequest::TXTQuery::Create(this); - } - NotifyOnEventLocked(); -} - -void AresRequest::DecrementPendingQueries() { - if (--pending_queries_ == 0) { - GRPC_CARES_TRACE_LOG("request: %p queries complete", this); - ShutdownIOLocked("DNS queries finished"); - grpc_timer_cancel(&query_timeout_); - grpc_timer_cancel(&ares_backup_poll_alarm_); - ServerAddressList* addresses = addresses_out_->get(); - if (addresses != nullptr) { - AddressSortingSort(this, addresses, "service-addresses"); - GRPC_ERROR_UNREF(error_); - error_ = GRPC_ERROR_NONE; - // TODO(apolcyn): allow c-ares to return a service config - // with no addresses along side it - } - if (balancer_addresses_out_ != nullptr) { - ServerAddressList* balancer_addresses = balancer_addresses_out_->get(); - if (balancer_addresses != nullptr) { - AddressSortingSort(this, balancer_addresses, "grpclb-addresses"); - } - } - grpc_error* error = error_; - std::function on_done = on_done_; - // note it's safe to schedule this inline because we're currently - // holding the work serializer - work_serializer_->Run([on_done, error]() { on_done(error); }, - DEBUG_LOCATION); + r->pending_queries = 1; + if (grpc_ares_query_ipv6()) { + hr = create_hostbyname_request_locked(r, host.c_str(), + grpc_strhtons(port.c_str()), + /*is_balancer=*/false, "AAAA"); + ares_gethostbyname(r->ev_driver->channel, hr->host, AF_INET6, + on_hostbyname_done_locked, hr); + } + hr = create_hostbyname_request_locked(r, host.c_str(), + grpc_strhtons(port.c_str()), + /*is_balancer=*/false, "A"); + ares_gethostbyname(r->ev_driver->channel, hr->host, AF_INET, + on_hostbyname_done_locked, hr); + if (r->balancer_addresses_out != nullptr) { + /* Query the SRV record */ + std::string service_name = absl::StrCat("_grpclb._tcp.", host); + GrpcAresQuery* srv_query = new GrpcAresQuery(r, service_name); + ares_query(r->ev_driver->channel, service_name.c_str(), ns_c_in, ns_t_srv, + on_srv_query_done_locked, srv_query); } + if (r->service_config_json_out != nullptr) { + std::string config_name = absl::StrCat("_grpc_config.", host); + GrpcAresQuery* txt_query = new GrpcAresQuery(r, config_name); + ares_search(r->ev_driver->channel, config_name.c_str(), ns_c_in, ns_t_txt, + on_txt_done_locked, txt_query); + } + grpc_ares_ev_driver_start_locked(r->ev_driver); + grpc_ares_request_unref_locked(r); + return; + +error_cleanup: + grpc_core::ExecCtx::Run(DEBUG_LOCATION, r->on_done, error); } -bool AresRequest::ResolveAsIPLiteralLocked() { +static bool inner_resolve_as_ip_literal_locked( + const char* name, const char* default_port, + std::unique_ptr* addrs, std::string* host, + std::string* port, std::string* hostport) { + if (!grpc_core::SplitHostPort(name, host, port)) { + gpr_log(GPR_ERROR, + "Failed to parse %s to host:port while attempting to resolve as ip " + "literal.", + name); + return false; + } + if (port->empty()) { + if (default_port == nullptr) { + gpr_log(GPR_ERROR, + "No port or default port for %s while attempting to resolve as " + "ip literal.", + name); + return false; + } + *port = default_port; + } grpc_resolved_address addr; - std::string hostport = JoinHostPort(target_host_, ntohs(target_port_)); - if (grpc_parse_ipv4_hostport(hostport.c_str(), &addr, + *hostport = grpc_core::JoinHostPort(*host, atoi(port->c_str())); + if (grpc_parse_ipv4_hostport(hostport->c_str(), &addr, false /* log errors */) || - grpc_parse_ipv6_hostport(hostport.c_str(), &addr, + grpc_parse_ipv6_hostport(hostport->c_str(), &addr, false /* log errors */)) { - GPR_ASSERT(*addresses_out_ == nullptr); - *addresses_out_ = absl::make_unique(); - (*addresses_out_)->emplace_back(addr.addr, addr.len, nullptr /* args */); + GPR_ASSERT(*addrs == nullptr); + *addrs = absl::make_unique(); + (*addrs)->emplace_back(addr.addr, addr.len, nullptr /* args */); return true; } return false; } +static bool resolve_as_ip_literal_locked( + const char* name, const char* default_port, + std::unique_ptr* addrs) { + std::string host; + std::string port; + std::string hostport; + bool out = inner_resolve_as_ip_literal_locked(name, default_port, addrs, + &host, &port, &hostport); + return out; +} + +static bool target_matches_localhost_inner(const char* name, std::string* host, + std::string* port) { + if (!grpc_core::SplitHostPort(name, host, port)) { + gpr_log(GPR_ERROR, "Unable to split host and port for name: %s", name); + return false; + } + return gpr_stricmp(host->c_str(), "localhost") == 0; +} + +static bool target_matches_localhost(const char* name) { + std::string host; + std::string port; + return target_matches_localhost_inner(name, &host, &port); +} + #ifdef GRPC_ARES_RESOLVE_LOCALHOST_MANUALLY -bool AresRequest::MaybeResolveLocalHostManuallyLocked() { - if (target_host_ == "localhost") { - GPR_ASSERT(*addresses_out_ == nullptr); - *addresses_out_ = absl::make_unique(); +static bool inner_maybe_resolve_localhost_manually_locked( + const grpc_ares_request* r, const char* name, const char* default_port, + std::unique_ptr* addrs, std::string* host, + std::string* port) { + grpc_core::SplitHostPort(name, host, port); + if (host->empty()) { + gpr_log(GPR_ERROR, + "Failed to parse %s into host:port during manual localhost " + "resolution check.", + name); + return false; + } + if (port->empty()) { + if (default_port == nullptr) { + gpr_log(GPR_ERROR, + "No port or default port for %s during manual localhost " + "resolution check.", + name); + return false; + } + *port = default_port; + } + if (gpr_stricmp(host->c_str(), "localhost") == 0) { + GPR_ASSERT(*addrs == nullptr); + *addrs = absl::make_unique(); + uint16_t numeric_port = grpc_strhtons(port->c_str()); // Append the ipv6 loopback address. struct sockaddr_in6 ipv6_loopback_addr; memset(&ipv6_loopback_addr, 0, sizeof(ipv6_loopback_addr)); ((char*)&ipv6_loopback_addr.sin6_addr)[15] = 1; ipv6_loopback_addr.sin6_family = AF_INET6; - ipv6_loopback_addr.sin6_port = target_port_; - (*addresses_out_) - ->emplace_back(&ipv6_loopback_addr, sizeof(ipv6_loopback_addr), - nullptr /* args */); + ipv6_loopback_addr.sin6_port = numeric_port; + (*addrs)->emplace_back(&ipv6_loopback_addr, sizeof(ipv6_loopback_addr), + nullptr /* args */); // Append the ipv4 loopback address. struct sockaddr_in ipv4_loopback_addr; memset(&ipv4_loopback_addr, 0, sizeof(ipv4_loopback_addr)); ((char*)&ipv4_loopback_addr.sin_addr)[0] = 0x7f; ((char*)&ipv4_loopback_addr.sin_addr)[3] = 0x01; ipv4_loopback_addr.sin_family = AF_INET; - ipv4_loopback_addr.sin_port = target_port_; - (*addresses_out_) - ->emplace_back(&ipv4_loopback_addr, sizeof(ipv4_loopback_addr), - nullptr /* args */); + ipv4_loopback_addr.sin_port = numeric_port; + (*addrs)->emplace_back(&ipv4_loopback_addr, sizeof(ipv4_loopback_addr), + nullptr /* args */); // Let the address sorter figure out which one should be tried first. + grpc_cares_wrapper_address_sorting_sort(r, addrs->get()); return true; } return false; } + +static bool grpc_ares_maybe_resolve_localhost_manually_locked( + const grpc_ares_request* r, const char* name, const char* default_port, + std::unique_ptr* addrs) { + std::string host; + std::string port; + return inner_maybe_resolve_localhost_manually_locked(r, name, default_port, + addrs, &host, &port); +} #else /* GRPC_ARES_RESOLVE_LOCALHOST_MANUALLY */ -bool AresRequest::MaybeResolveLocalHostManuallyLocked() { return false; } +static bool grpc_ares_maybe_resolve_localhost_manually_locked( + const grpc_ares_request* /*r*/, const char* /*name*/, + const char* /*default_port*/, + std::unique_ptr* /*addrs*/) { + return false; +} #endif /* GRPC_ARES_RESOLVE_LOCALHOST_MANUALLY */ -namespace { - -// A GrpcResolveAddressAresRequest maintains the state need to -// carry out a single asynchronous grpc_resolve_address call. -class GrpcResolveAddressAresRequest { - public: - static void GrpcResolveAddressAresImpl(const char* name, - const char* default_port, - grpc_pollset_set* interested_parties, - grpc_closure* on_done, - grpc_resolved_addresses** addrs) { - GrpcResolveAddressAresRequest* request = new GrpcResolveAddressAresRequest( - name, default_port, interested_parties, on_done, addrs); - auto on_resolution_done = [request](grpc_error* error) { - request->OnDNSLookupDoneLocked(error); - }; - request->work_serializer_->Run( - [request, on_resolution_done]() { - request->ares_request_ = LookupAresLocked( - "" /* dns_server */, request->name_, request->default_port_, - request->interested_parties_, on_resolution_done, - &request->addresses_, nullptr /* balancer_addresses */, - nullptr /* service_config_json */, - GRPC_DNS_ARES_DEFAULT_QUERY_TIMEOUT_MS, - request->work_serializer_); - }, - DEBUG_LOCATION); +static grpc_ares_request* grpc_dns_lookup_ares_locked_impl( + const char* dns_server, const char* name, const char* default_port, + grpc_pollset_set* interested_parties, grpc_closure* on_done, + std::unique_ptr* addrs, + std::unique_ptr* balancer_addrs, + char** service_config_json, int query_timeout_ms, + std::shared_ptr work_serializer) { + grpc_ares_request* r = + static_cast(gpr_zalloc(sizeof(grpc_ares_request))); + r->ev_driver = nullptr; + r->on_done = on_done; + r->addresses_out = addrs; + r->balancer_addresses_out = balancer_addrs; + r->service_config_json_out = service_config_json; + r->error = GRPC_ERROR_NONE; + r->pending_queries = 0; + GRPC_CARES_TRACE_LOG( + "request:%p c-ares grpc_dns_lookup_ares_locked_impl name=%s, " + "default_port=%s", + r, name, default_port); + // Early out if the target is an ipv4 or ipv6 literal. + if (resolve_as_ip_literal_locked(name, default_port, addrs)) { + grpc_ares_complete_request_locked(r); + return r; } + // Early out if the target is localhost and we're on Windows. + if (grpc_ares_maybe_resolve_localhost_manually_locked(r, name, default_port, + addrs)) { + grpc_ares_complete_request_locked(r); + return r; + } + // Don't query for SRV and TXT records if the target is "localhost", so + // as to cut down on lookups over the network, especially in tests: + // https://github.com/grpc/proposal/pull/79 + if (target_matches_localhost(name)) { + r->balancer_addresses_out = nullptr; + r->service_config_json_out = nullptr; + } + // Look up name using c-ares lib. + grpc_dns_lookup_ares_continue_after_check_localhost_and_ip_literals_locked( + r, dns_server, name, default_port, interested_parties, query_timeout_ms, + std::move(work_serializer)); + return r; +} - private: - explicit GrpcResolveAddressAresRequest(const char* name, - const char* default_port, - grpc_pollset_set* interested_parties, - grpc_closure* on_done, - grpc_resolved_addresses** addrs_out) - : name_(name), - default_port_(default_port), - interested_parties_(interested_parties), - on_resolve_address_done_(on_done), - addrs_out_(addrs_out) {} - - void OnDNSLookupDoneLocked(grpc_error* error) { - grpc_resolved_addresses** resolved_addresses = addrs_out_; - if (addresses_ == nullptr || addresses_->empty()) { - *resolved_addresses = nullptr; - } else { - *resolved_addresses = static_cast( - gpr_zalloc(sizeof(grpc_resolved_addresses))); - (*resolved_addresses)->naddrs = addresses_->size(); - (*resolved_addresses)->addrs = - static_cast(gpr_zalloc( - sizeof(grpc_resolved_address) * (*resolved_addresses)->naddrs)); - for (size_t i = 0; i < (*resolved_addresses)->naddrs; ++i) { - memcpy(&(*resolved_addresses)->addrs[i], &(*addresses_)[i].address(), - sizeof(grpc_resolved_address)); - } - } - ExecCtx::Run(DEBUG_LOCATION, on_resolve_address_done_, error); - delete this; - } - - // work_serializer that queries and related callbacks run under - std::shared_ptr work_serializer_ = - std::make_shared(); - // target name - const char* name_; - // default port to use if none is specified - const char* default_port_; - // pollset_set to be driven by - grpc_pollset_set* interested_parties_; - // closure to call when the resolve_address_ares request completes - grpc_closure* on_resolve_address_done_; - // the pointer to receive the resolved addresses - grpc_resolved_addresses** addrs_out_; - // currently resolving addresses - std::unique_ptr addresses_; - // underlying ares_request that the query is performed on - OrphanablePtr ares_request_; -}; - -} // namespace - -void (*ResolveAddressAres)(const char* name, const char* default_port, - grpc_pollset_set* interested_parties, - grpc_closure* on_done, - grpc_resolved_addresses** addrs) = - GrpcResolveAddressAresRequest::GrpcResolveAddressAresImpl; - -OrphanablePtr (*LookupAresLocked)( - absl::string_view dns_server, absl::string_view name, - absl::string_view default_port, grpc_pollset_set* interested_parties, - std::function on_done, - std::unique_ptr* addrs, - std::unique_ptr* balancer_addrs, - absl::optional* service_config_json, int query_timeout_ms, - std::shared_ptr work_serializer) = AresRequest::Create; - -namespace { - -void LogAddressSortingList(const AresRequest* request, - const ServerAddressList& addresses, - const char* input_output_str) { - for (size_t i = 0; i < addresses.size(); i++) { - std::string addr_str = - grpc_sockaddr_to_string(&addresses[i].address(), true); - gpr_log(GPR_INFO, - "(c-ares resolver) request:%p c-ares address sorting: %s[%" PRIuPTR - "]=%s", - request, input_output_str, i, addr_str.c_str()); +grpc_ares_request* (*grpc_dns_lookup_ares_locked)( + const char* dns_server, const char* name, const char* default_port, + grpc_pollset_set* interested_parties, grpc_closure* on_done, + std::unique_ptr* addrs, + std::unique_ptr* balancer_addrs, + char** service_config_json, int query_timeout_ms, + std::shared_ptr work_serializer) = + grpc_dns_lookup_ares_locked_impl; + +static void grpc_cancel_ares_request_locked_impl(grpc_ares_request* r) { + GPR_ASSERT(r != nullptr); + if (r->ev_driver != nullptr) { + grpc_ares_ev_driver_shutdown_locked(r->ev_driver); } } -} // namespace +void (*grpc_cancel_ares_request_locked)(grpc_ares_request* r) = + grpc_cancel_ares_request_locked_impl; -void AddressSortingSort(const AresRequest* request, - ServerAddressList* addresses, - const std::string& logging_prefix) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_cares_address_sorting)) { - LogAddressSortingList(request, *addresses, - absl::StrCat(logging_prefix, "-input").c_str()); - } - std::vector sortables; - sortables.resize(addresses->size()); - for (size_t i = 0; i < addresses->size(); ++i) { - sortables[i].user_data = &(*addresses)[i]; - memcpy(&sortables[i].dest_addr.addr, &(*addresses)[i].address().addr, - (*addresses)[i].address().len); - sortables[i].dest_addr.len = (*addresses)[i].address().len; - } - address_sorting_rfc_6724_sort(sortables.data(), addresses->size()); - ServerAddressList sorted; - sorted.reserve(addresses->size()); - for (size_t i = 0; i < addresses->size(); ++i) { - sorted.emplace_back(*static_cast(sortables[i].user_data)); - } - *addresses = std::move(sorted); - if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_cares_address_sorting)) { - LogAddressSortingList(request, *addresses, - absl::StrCat(logging_prefix, "-output").c_str()); +// ares_library_init and ares_library_cleanup are currently no-op except under +// Windows. Calling them may cause race conditions when other parts of the +// binary calls these functions concurrently. +#ifdef GPR_WINDOWS +grpc_error* grpc_ares_init(void) { + int status = ares_library_init(ARES_LIB_INIT_ALL); + if (status != ARES_SUCCESS) { + return GRPC_ERROR_CREATE_FROM_COPIED_STRING( + absl::StrCat("ares_library_init failed: ", ares_strerror(status)) + .c_str()); } + return GRPC_ERROR_NONE; } -namespace internal { +void grpc_ares_cleanup(void) { ares_library_cleanup(); } +#else +grpc_error* grpc_ares_init(void) { return GRPC_ERROR_NONE; } +void grpc_ares_cleanup(void) {} +#endif // GPR_WINDOWS -namespace { +/* + * grpc_resolve_address_ares related structs and functions + */ -void NoopInjectChannelConfig(ares_channel /*channel*/) {} +typedef struct grpc_resolve_address_ares_request { + /* work_serializer that queries and related callbacks run under */ + std::shared_ptr work_serializer; + /** the pointer to receive the resolved addresses */ + grpc_resolved_addresses** addrs_out; + /** currently resolving addresses */ + std::unique_ptr addresses; + /** closure to call when the resolve_address_ares request completes */ + grpc_closure* on_resolve_address_done; + /** a closure wrapping on_resolve_address_done, which should be invoked when + the grpc_dns_lookup_ares_locked operation is done. */ + grpc_closure on_dns_lookup_done_locked; + /* target name */ + const char* name; + /* default port to use if none is specified */ + const char* default_port; + /* pollset_set to be driven by */ + grpc_pollset_set* interested_parties; + /* underlying ares_request that the query is performed on */ + grpc_ares_request* ares_request = nullptr; +} grpc_resolve_address_ares_request; + +static void on_dns_lookup_done_locked(grpc_resolve_address_ares_request* r, + grpc_error* error) { + gpr_free(r->ares_request); + grpc_resolved_addresses** resolved_addresses = r->addrs_out; + if (r->addresses == nullptr || r->addresses->empty()) { + *resolved_addresses = nullptr; + } else { + *resolved_addresses = static_cast( + gpr_zalloc(sizeof(grpc_resolved_addresses))); + (*resolved_addresses)->naddrs = r->addresses->size(); + (*resolved_addresses)->addrs = + static_cast(gpr_zalloc( + sizeof(grpc_resolved_address) * (*resolved_addresses)->naddrs)); + for (size_t i = 0; i < (*resolved_addresses)->naddrs; ++i) { + memcpy(&(*resolved_addresses)->addrs[i], &(*r->addresses)[i].address(), + sizeof(grpc_resolved_address)); + } + } + grpc_core::ExecCtx::Run(DEBUG_LOCATION, r->on_resolve_address_done, error); + delete r; +} -} // namespace +static void on_dns_lookup_done(void* arg, grpc_error* error) { + grpc_resolve_address_ares_request* r = + static_cast(arg); + GRPC_ERROR_REF(error); // ref owned by lambda + r->work_serializer->Run([r, error]() { on_dns_lookup_done_locked(r, error); }, + DEBUG_LOCATION); +} -void (*AresTestOnlyInjectConfig)(ares_channel channel) = - NoopInjectChannelConfig; +static void grpc_resolve_address_invoke_dns_lookup_ares_locked(void* arg) { + grpc_resolve_address_ares_request* r = + static_cast(arg); + GRPC_CLOSURE_INIT(&r->on_dns_lookup_done_locked, on_dns_lookup_done, r, + grpc_schedule_on_exec_ctx); + r->ares_request = grpc_dns_lookup_ares_locked( + nullptr /* dns_server */, r->name, r->default_port, r->interested_parties, + &r->on_dns_lookup_done_locked, &r->addresses, + nullptr /* balancer_addresses */, nullptr /* service_config_json */, + GRPC_DNS_ARES_DEFAULT_QUERY_TIMEOUT_MS, r->work_serializer); +} -} // namespace internal +static void grpc_resolve_address_ares_impl(const char* name, + const char* default_port, + grpc_pollset_set* interested_parties, + grpc_closure* on_done, + grpc_resolved_addresses** addrs) { + grpc_resolve_address_ares_request* r = + new grpc_resolve_address_ares_request(); + r->work_serializer = std::make_shared(); + r->addrs_out = addrs; + r->on_resolve_address_done = on_done; + r->name = name; + r->default_port = default_port; + r->interested_parties = interested_parties; + r->work_serializer->Run( + [r]() { grpc_resolve_address_invoke_dns_lookup_ares_locked(r); }, + DEBUG_LOCATION); +} -} // namespace grpc_core +void (*grpc_resolve_address_ares)( + const char* name, const char* default_port, + grpc_pollset_set* interested_parties, grpc_closure* on_done, + grpc_resolved_addresses** addrs) = grpc_resolve_address_ares_impl; #endif /* GRPC_ARES == 1 */ diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h index db9bde12ae6..6c29bb68a47 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h @@ -23,16 +23,10 @@ #include -#include "absl/strings/str_cat.h" -#include "absl/types/optional.h" - -#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h" #include "src/core/ext/filters/client_channel/server_address.h" -#include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/iomgr/resolve_address.h" -#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/work_serializer.h" #define GRPC_DNS_ARES_DEFAULT_QUERY_TIMEOUT_MS 120000 @@ -48,284 +42,61 @@ extern grpc_core::TraceFlag grpc_trace_cares_resolver; } \ } while (0) -namespace grpc_core { - -/// An AresRequest is a handle over a complete name resolution process -/// (A queries, AAAA queries, etc.). An AresRequest is created with a call -/// to LookupAresLocked, and the name resolution process begins when -/// it's created. The name resolution process can be terminated abruptly -/// by invoking \a Orphan. The \a interested_parties parameter must remain -/// alive until either the \a Orphan is invoked, or \a on_done is called, -/// whichever happens first. -class AresRequest final : public InternallyRefCounted { - public: - static OrphanablePtr Create( - absl::string_view dns_server, absl::string_view name, - absl::string_view default_port, grpc_pollset_set* interested_parties, - std::function on_done, - std::unique_ptr* addrs, - std::unique_ptr* balancer_addrs, - absl::optional* service_config_json, int query_timeout_ms, - std::shared_ptr work_serializer); - - /// Callers should only create an AresRequest via \a LookupAresLocked, this - /// ctor is made public only to help the factory method. - AresRequest( - std::unique_ptr* addresses_out, - std::unique_ptr* balancer_addresses_out, - absl::optional* service_config_json_out, - grpc_pollset_set* pollset_set, int query_timeout_ms, - std::function on_done, - std::shared_ptr work_serializer); - - ~AresRequest() final; - - /// Unref and Cancel the pending request if it's still in flight. Must be - /// called while holding the WorkSerializer that was used to call \a - /// LookupAresLocked. - void Orphan() override; - - /// Initialize the gRPC ares wrapper. Must be called at least once before - /// ResolveAddressAres(). - static grpc_error* Init(void); - - /// Uninitialized the gRPC ares wrapper. If there was more than one previous - /// call to AresInit(), this function uninitializes the gRPC ares - /// wrapper only if it has been called the same number of times as - /// AresInit(). - static void Shutdown(void); - - private: - /// Tracks state needed to perform one A or AAAA lookup with the c-ares lib. - /// Note that \a Create both constructs an AddressQuery object and arranges - /// for it's deletion. - class AddressQuery final { - public: - static void Create(AresRequest* request, const std::string& host, - uint16_t port, bool is_balancer, int address_family); - ~AddressQuery(); - - private: - AddressQuery(AresRequest* request, const std::string& host, uint16_t port, - bool is_balancer, int address_family); - static void OnHostByNameDoneLocked(void* arg, int status, int timeouts, - struct hostent* hostent); - - // the request which spawned this query - AresRequest* request_; - // host to resolve - const std::string host_; - // port to use in resulting socket addresses, in network byte order - const uint16_t port_; - // is it a grpclb address - const bool is_balancer_; - // for logging and errors: the query type ("A" or "AAAA") - const char* qtype_; - // the address family (AF_INET or AF_INET6) - const int address_family_; - }; - - /// Tracks state needed to perform one SRV lookup with the c-ares lib. - /// Note that \a Create both constructs an AddressQuery object and arranges - /// for it's deletion. - class SRVQuery final { - public: - static void Create(AresRequest* request); - ~SRVQuery(); - - private: - explicit SRVQuery(AresRequest* request); - - static void OnSRVQueryDoneLocked(void* arg, int status, int timeouts, - unsigned char* abuf, int alen); - - // the request which spawned this query - AresRequest* request_; - }; - - /// Tracks state needed to perform one TXT lookup with the c-ares lib. - /// Note that \a Create both constructs an AddressQuery object and arranges - /// for it's deletion. - class TXTQuery final { - public: - static void Create(AresRequest* request); - ~TXTQuery(); - - private: - explicit TXTQuery(AresRequest* request); - - static void OnTXTDoneLocked(void* arg, int status, int timeouts, - unsigned char* buf, int len); - - // the request which spawned this query - AresRequest* request_; - }; - - // An FdNode tracks an fd and its relevant state for polling it as - // needed to carry out a c-ares resolution. - class FdNode final { - public: - FdNode(RefCountedPtr request, - std::unique_ptr grpc_polled_fd); - - ~FdNode(); - - void MaybeRegisterForOnReadableLocked(); - - void MaybeRegisterForOnWritableLocked(); - - void MaybeShutdownLocked(absl::string_view reason); - - bool IsActiveLocked(); - - bool shutdown() { return shutdown_; } - - GrpcPolledFd* grpc_polled_fd() { return grpc_polled_fd_.get(); } - - private: - void OnReadableLocked(grpc_error* error); - - static void OnReadable(void* arg, grpc_error* error); - - void OnWritableLocked(grpc_error* error); - - static void OnWritable(void* arg, grpc_error* error); - - RefCountedPtr request_; - // a closure wrapping OnReadableLocked, which should be - // invoked when the fd in this node becomes readable. - grpc_closure read_closure_; - // a closure wrapping OnWritableLocked, which should be - // invoked when the fd in this node becomes writable. - grpc_closure write_closure_; - // wrapped fd that's polled by grpc's poller for the current platform - std::unique_ptr grpc_polled_fd_; - // if the readable closure has been registered - bool readable_registered_ = false; - // if the writable closure has been registered - bool writable_registered_ = false; - // if the fd has been shutdown yet from grpc iomgr perspective - bool shutdown_ = false; - }; - - void ShutdownIOLocked(absl::string_view reason); - - grpc_millis CalculateNextAresBackupPollAlarm() const; - - void OnTimeoutLocked(grpc_error* error); - - static void OnTimeout(void* arg, grpc_error* error); - - void OnAresBackupPollAlarmLocked(grpc_error* error); - - static void OnAresBackupPollAlarm(void* arg, grpc_error* error); - - void NotifyOnEventLocked(); - - void ContinueAfterCheckLocalhostAndIPLiteralsLocked( - absl::string_view dns_server); - - void DecrementPendingQueries(); - - void MaybeCallOnDoneLocked(); - - bool ResolveAsIPLiteralLocked(); - - bool MaybeResolveLocalHostManuallyLocked(); - - std::string srv_qname() const { - return absl::StrCat("_grpclb._tcp.", target_host_); - } - - std::string txt_qname() const { - return absl::StrCat("_grpc_config.", target_host_); - } - - // the host component of the service name to resolve - std::string target_host_; - // the numeric port number to access the service on, stored in - // network byte order - uint16_t target_port_ = 0; - // the pointer to receive the resolved addresses - std::unique_ptr* addresses_out_; - // the pointer to receive the resolved balancer addresses - std::unique_ptr* balancer_addresses_out_; - // the pointer to receive the service config in JSON - absl::optional* service_config_json_out_; - // the ares_channel owned by this request - ares_channel channel_ = nullptr; - // pollset set for driving the IO events of the channel - grpc_pollset_set* pollset_set_; - // work_serializer to synchronize c-ares and I/O callbacks on - std::shared_ptr work_serializer_; - // Number of active DNS queries (one for A, another for AAAA, etc.). - int pending_queries_ = 0; - // the fds that this request is currently using. - std::map> fds_; - // is this request being shut down - bool shutting_down_ = false; - // Owned by the ev_driver. Creates new GrpcPolledFd's - std::unique_ptr polled_fd_factory_; - // query timeout in milliseconds - int query_timeout_ms_; - // alarm to cancel active queries - grpc_timer query_timeout_; - // cancels queries on a timeout - grpc_closure on_timeout_locked_; - // alarm to poll ares_process on in case fd events don't happen - grpc_timer ares_backup_poll_alarm_; - // polls ares_process on a periodic timer - grpc_closure on_ares_backup_poll_alarm_locked_; - // callback to schedule when the request completes, empty means that - // we've already scheduled the callback - std::function on_done_; - // the errors explaining query failures, appended to in query callbacks - grpc_error* error_ = GRPC_ERROR_NONE; -}; - -/// Asynchronously resolve \a name. Use \a default_port if a port isn't -/// designated in \a name, otherwise use the port in \a name. -/// AresInit() must be called at least once before this function. -extern void (*ResolveAddressAres)(const char* name, const char* default_port, - grpc_pollset_set* interested_parties, - grpc_closure* on_done, - grpc_resolved_addresses** addresses); - -/// Asynchronously resolve \a name. It will try to resolve grpclb SRV records in -/// addition to the normal address records if \a balancer_addresses is not -/// nullptr. For normal address records, it uses \a default_port if a port isn't -/// designated in \a name, otherwise it uses the port in \a name. AresInit() -/// must be called at least once before this function. The returned -/// AresRequest is safe to destroy after \a on_done is called back. -/// -/// TODO(apolcyn): as a part of moving to new gRPC DNS API, remove the -/// work_serializer parameter and synchronize internally instead. -extern OrphanablePtr (*LookupAresLocked)( - absl::string_view dns_server, absl::string_view name, - absl::string_view default_port, grpc_pollset_set* interested_parties, - std::function on_done, +typedef struct grpc_ares_request grpc_ares_request; + +/* Asynchronously resolve \a name. Use \a default_port if a port isn't + designated in \a name, otherwise use the port in \a name. grpc_ares_init() + must be called at least once before this function. \a on_done may be + called directly in this function without being scheduled with \a exec_ctx, + so it must not try to acquire locks that are being held by the caller. */ +extern void (*grpc_resolve_address_ares)(const char* name, + const char* default_port, + grpc_pollset_set* interested_parties, + grpc_closure* on_done, + grpc_resolved_addresses** addresses); + +/* Asynchronously resolve \a name. It will try to resolve grpclb SRV records in + addition to the normal address records. For normal address records, it uses + \a default_port if a port isn't designated in \a name, otherwise it uses the + port in \a name. grpc_ares_init() must be called at least once before this + function. \a on_done may be called directly in this function without being + scheduled with \a exec_ctx, so it must not try to acquire locks that are + being held by the caller. The returned grpc_ares_request object is owned + by the caller and it is safe to free after on_done is called back. */ +extern grpc_ares_request* (*grpc_dns_lookup_ares_locked)( + const char* dns_server, const char* name, const char* default_port, + grpc_pollset_set* interested_parties, grpc_closure* on_done, std::unique_ptr* addresses, std::unique_ptr* balancer_addresses, - absl::optional* service_config_json, int query_timeout_ms, + char** service_config_json, int query_timeout_ms, std::shared_ptr work_serializer); -/// Indicates whether or not AAAA queries should be attempted. -/// E.g., return false if ipv6 is known to not be available. -bool AresQueryIPv6(); +/* Cancel the pending grpc_ares_request \a request */ +extern void (*grpc_cancel_ares_request_locked)(grpc_ares_request* request); + +/* Initialize gRPC ares wrapper. Must be called at least once before + grpc_resolve_address_ares(). */ +grpc_error* grpc_ares_init(void); -/// Sorts destinations in \a addresses according to RFC 6724. -void AddressSortingSort(const AresRequest* request, - ServerAddressList* addresses, - const std::string& logging_prefix); +/* Uninitialized gRPC ares wrapper. If there was more than one previous call to + grpc_ares_init(), this function uninitializes the gRPC ares wrapper only if + it has been called the same number of times as grpc_ares_init(). */ +void grpc_ares_cleanup(void); -namespace internal { +/** Schedules the desired callback for request completion + * and destroys the grpc_ares_request */ +void grpc_ares_complete_request_locked(grpc_ares_request* request); -/// Exposed in this header for C-core tests only -extern void (*AresTestOnlyInjectConfig)(ares_channel channel); +/* Indicates whether or not AAAA queries should be attempted. */ +/* E.g., return false if ipv6 is known to not be available. */ +bool grpc_ares_query_ipv6(); -} // namespace internal +/* Sorts destinations in lb_addrs according to RFC 6724. */ +void grpc_cares_wrapper_address_sorting_sort( + const grpc_ares_request* request, grpc_core::ServerAddressList* addresses); -} // namespace grpc_core +/* Exposed in this header for C-core tests only */ +extern void (*grpc_ares_test_only_inject_config)(ares_channel channel); #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_C_ARES_GRPC_ARES_WRAPPER_H \ */ diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_libuv.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_libuv.cc index 326fabb4586..1d4a90fbd90 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_libuv.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_libuv.cc @@ -28,15 +28,11 @@ #include "src/core/lib/gpr/string.h" #include "src/core/lib/iomgr/parse_address.h" -namespace grpc_core { - -bool AresQueryIPv6() { +bool grpc_ares_query_ipv6() { /* The libuv grpc code currently does not have the code to probe for this, * so we assume for now that IPv6 is always available in contexts where this * code will be used. */ return true; } -} // namespace grpc_core - #endif /* GRPC_ARES == 1 && defined(GRPC_UV) */ diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_posix.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_posix.cc index f9648276553..23c0fec74f3 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_posix.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_posix.cc @@ -24,10 +24,6 @@ #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h" #include "src/core/lib/iomgr/socket_utils_posix.h" -namespace grpc_core { - -bool AresQueryIPv6() { return grpc_ipv6_loopback_available(); } - -} // namespace grpc_core +bool grpc_ares_query_ipv6() { return grpc_ipv6_loopback_available(); } #endif /* GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER) */ diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_windows.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_windows.cc index 84d55841396..df11db3624d 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_windows.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_windows.cc @@ -29,10 +29,6 @@ #include "src/core/lib/iomgr/parse_address.h" #include "src/core/lib/iomgr/socket_windows.h" -namespace grpc_core { - -bool AresQueryIPv6() { return grpc_ipv6_loopback_available(); } - -} // namespace grpc_core +bool grpc_ares_query_ipv6() { return grpc_ipv6_loopback_available(); } #endif /* GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER) */ diff --git a/test/core/client_channel/resolvers/dns_resolver_connectivity_test.cc b/test/core/client_channel/resolvers/dns_resolver_connectivity_test.cc index c6e07b3dff3..f79031dd55a 100644 --- a/test/core/client_channel/resolvers/dns_resolver_connectivity_test.cc +++ b/test/core/client_channel/resolvers/dns_resolver_connectivity_test.cc @@ -60,19 +60,15 @@ static void my_resolve_address(const char* addr, const char* /*default_port*/, static grpc_address_resolver_vtable test_resolver = {my_resolve_address, nullptr}; -static grpc_core::OrphanablePtr -my_dns_lookup_ares_locked( - absl::string_view /*dns_server*/, absl::string_view addr, - absl::string_view /*default_port*/, - grpc_pollset_set* /*interested_parties*/, - std::function on_done, +static grpc_ares_request* my_dns_lookup_ares_locked( + const char* /*dns_server*/, const char* addr, const char* /*default_port*/, + grpc_pollset_set* /*interested_parties*/, grpc_closure* on_done, std::unique_ptr* addresses, std::unique_ptr* /*balancer_addresses*/, - absl::optional* /*service_config_json*/, - int /*query_timeout_ms*/, - std::shared_ptr work_serializer) { // NOLINT + char** /*service_config_json*/, int /*query_timeout_ms*/, + std::shared_ptr /*combiner*/) { // NOLINT gpr_mu_lock(&g_mu); - GPR_ASSERT("test" == addr); + GPR_ASSERT(0 == strcmp("test", addr)); grpc_error* error = GRPC_ERROR_NONE; if (g_fail_resolution) { g_fail_resolution = false; @@ -86,10 +82,14 @@ my_dns_lookup_ares_locked( phony_resolved_address.len = 123; (*addresses)->emplace_back(phony_resolved_address, nullptr); } - work_serializer->Run([on_done, error]() { on_done(error); }, DEBUG_LOCATION); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_done, error); return nullptr; } +static void my_cancel_ares_request_locked(grpc_ares_request* request) { + GPR_ASSERT(request == nullptr); +} + static grpc_core::OrphanablePtr create_resolver( const char* name, std::unique_ptr result_handler) { @@ -166,7 +166,8 @@ int main(int argc, char** argv) { auto work_serializer = std::make_shared(); g_work_serializer = &work_serializer; grpc_set_resolver_impl(&test_resolver); - grpc_core::LookupAresLocked = my_dns_lookup_ares_locked; + grpc_dns_lookup_ares_locked = my_dns_lookup_ares_locked; + grpc_cancel_ares_request_locked = my_cancel_ares_request_locked; { grpc_core::ExecCtx exec_ctx; @@ -176,8 +177,7 @@ int main(int argc, char** argv) { std::unique_ptr(result_handler)); ResultHandler::ResolverOutput output1; result_handler->SetOutput(&output1); - grpc_core::Resolver* r = resolver.get(); - work_serializer->Run([r]() { r->StartLocked(); }, DEBUG_LOCATION); + resolver->StartLocked(); grpc_core::ExecCtx::Get()->Flush(); GPR_ASSERT(wait_loop(5, &output1.ev)); GPR_ASSERT(output1.result.addresses.empty()); diff --git a/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc b/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc index d9577097751..cfedceba0df 100644 --- a/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc +++ b/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc @@ -37,14 +37,12 @@ static grpc_address_resolver_vtable* default_resolve_address; static std::shared_ptr* g_work_serializer; -static grpc_core::OrphanablePtr ( - *g_default_dns_lookup_ares_locked)( - absl::string_view dns_server, absl::string_view name, - absl::string_view default_port, grpc_pollset_set* interested_parties, - std::function on_done, +static grpc_ares_request* (*g_default_dns_lookup_ares_locked)( + const char* dns_server, const char* name, const char* default_port, + grpc_pollset_set* interested_parties, grpc_closure* on_done, std::unique_ptr* addresses, std::unique_ptr* balancer_addresses, - absl::optional* service_config_json, int query_timeout_ms, + char** service_config_json, int query_timeout_ms, std::shared_ptr work_serializer); // Counter incremented by test_resolve_address_impl indicating the number of @@ -98,20 +96,17 @@ static grpc_error* test_blocking_resolve_address_impl( static grpc_address_resolver_vtable test_resolver = { test_resolve_address_impl, test_blocking_resolve_address_impl}; -static grpc_core::OrphanablePtr -test_dns_lookup_ares_locked( - absl::string_view dns_server, absl::string_view name, - absl::string_view default_port, grpc_pollset_set* /*interested_parties*/, - std::function on_done, +static grpc_ares_request* test_dns_lookup_ares_locked( + const char* dns_server, const char* name, const char* default_port, + grpc_pollset_set* /*interested_parties*/, grpc_closure* on_done, std::unique_ptr* addresses, std::unique_ptr* balancer_addresses, - absl::optional* service_config_json, int query_timeout_ms, + char** service_config_json, int query_timeout_ms, std::shared_ptr work_serializer) { - grpc_core::OrphanablePtr result = - g_default_dns_lookup_ares_locked( - dns_server, name, default_port, g_iomgr_args.pollset_set, on_done, - addresses, balancer_addresses, service_config_json, query_timeout_ms, - std::move(work_serializer)); + grpc_ares_request* result = g_default_dns_lookup_ares_locked( + dns_server, name, default_port, g_iomgr_args.pollset_set, on_done, + addresses, balancer_addresses, service_config_json, query_timeout_ms, + std::move(work_serializer)); ++g_resolution_count; static grpc_millis last_resolution_time = 0; grpc_millis now = @@ -345,8 +340,8 @@ int main(int argc, char** argv) { auto work_serializer = std::make_shared(); g_work_serializer = &work_serializer; - g_default_dns_lookup_ares_locked = grpc_core::LookupAresLocked; - grpc_core::LookupAresLocked = test_dns_lookup_ares_locked; + g_default_dns_lookup_ares_locked = grpc_dns_lookup_ares_locked; + grpc_dns_lookup_ares_locked = test_dns_lookup_ares_locked; default_resolve_address = grpc_resolve_address_impl; grpc_set_resolver_impl(&test_resolver); diff --git a/test/core/end2end/goaway_server_test.cc b/test/core/end2end/goaway_server_test.cc index 6ad3eb4af25..d0ca2196e05 100644 --- a/test/core/end2end/goaway_server_test.cc +++ b/test/core/end2end/goaway_server_test.cc @@ -49,16 +49,16 @@ static void* tag(intptr_t t) { return reinterpret_cast(t); } static gpr_mu g_mu; static int g_resolve_port = -1; -static grpc_core::OrphanablePtr ( - *iomgr_dns_lookup_ares_locked)( - absl::string_view dns_server, absl::string_view addr, - absl::string_view default_port, grpc_pollset_set* interested_parties, - std::function on_done, +static grpc_ares_request* (*iomgr_dns_lookup_ares_locked)( + const char* dns_server, const char* addr, const char* default_port, + grpc_pollset_set* interested_parties, grpc_closure* on_done, std::unique_ptr* addresses, std::unique_ptr* balancer_addresses, - absl::optional* service_config_json, int query_timeout_ms, + char** service_config_json, int query_timeout_ms, std::shared_ptr combiner); +static void (*iomgr_cancel_ares_request_locked)(grpc_ares_request* request); + static void set_resolve_port(int port) { gpr_mu_lock(&g_mu); g_resolve_port = port; @@ -107,16 +107,14 @@ static grpc_error* my_blocking_resolve_address( static grpc_address_resolver_vtable test_resolver = { my_resolve_address, my_blocking_resolve_address}; -static grpc_core::OrphanablePtr -my_dns_lookup_ares_locked( - absl::string_view dns_server, absl::string_view addr, - absl::string_view default_port, grpc_pollset_set* interested_parties, - std::function on_done, +static grpc_ares_request* my_dns_lookup_ares_locked( + const char* dns_server, const char* addr, const char* default_port, + grpc_pollset_set* interested_parties, grpc_closure* on_done, std::unique_ptr* addresses, std::unique_ptr* balancer_addresses, - absl::optional* service_config_json, int query_timeout_ms, + char** service_config_json, int query_timeout_ms, std::shared_ptr work_serializer) { - if (addr != "test") { + if (0 != strcmp(addr, "test")) { return iomgr_dns_lookup_ares_locked( dns_server, addr, default_port, interested_parties, on_done, addresses, balancer_addresses, service_config_json, query_timeout_ms, @@ -137,10 +135,16 @@ my_dns_lookup_ares_locked( (*addresses)->emplace_back(&sa, sizeof(sa), nullptr); gpr_mu_unlock(&g_mu); } - work_serializer->Run([on_done, error] { on_done(error); }, DEBUG_LOCATION); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_done, error); return nullptr; } +static void my_cancel_ares_request_locked(grpc_ares_request* request) { + if (request != nullptr) { + iomgr_cancel_ares_request_locked(request); + } +} + int main(int argc, char** argv) { grpc_completion_queue* cq; cq_verifier* cqv; @@ -153,8 +157,10 @@ int main(int argc, char** argv) { grpc_init(); default_resolver = grpc_resolve_address_impl; grpc_set_resolver_impl(&test_resolver); - iomgr_dns_lookup_ares_locked = grpc_core::LookupAresLocked; - grpc_core::LookupAresLocked = my_dns_lookup_ares_locked; + iomgr_dns_lookup_ares_locked = grpc_dns_lookup_ares_locked; + iomgr_cancel_ares_request_locked = grpc_cancel_ares_request_locked; + grpc_dns_lookup_ares_locked = my_dns_lookup_ares_locked; + grpc_cancel_ares_request_locked = my_cancel_ares_request_locked; int was_cancelled1; int was_cancelled2; diff --git a/test/cpp/naming/address_sorting_test.cc b/test/cpp/naming/address_sorting_test.cc index f1c98c03a07..edcee212b10 100644 --- a/test/cpp/naming/address_sorting_test.cc +++ b/test/cpp/naming/address_sorting_test.cc @@ -205,7 +205,7 @@ TEST_F(AddressSortingTest, TestDepriotizesUnreachableAddresses) { {"1.2.3.4:443", AF_INET}, {"5.6.7.8:443", AF_INET}, }); - grpc_core::AddressSortingSort(nullptr, &lb_addrs, "test-addresses"); + grpc_cares_wrapper_address_sorting_sort(nullptr, &lb_addrs); VerifyLbAddrOutputs(lb_addrs, { "1.2.3.4:443", "5.6.7.8:443", @@ -224,7 +224,7 @@ TEST_F(AddressSortingTest, TestDepriotizesUnsupportedDomainIpv6) { {"[2607:f8b0:400a:801::1002]:443", AF_INET6}, {"1.2.3.4:443", AF_INET}, }); - grpc_core::AddressSortingSort(nullptr, &lb_addrs, "test-addresses"); + grpc_cares_wrapper_address_sorting_sort(nullptr, &lb_addrs); VerifyLbAddrOutputs(lb_addrs, { "1.2.3.4:443", "[2607:f8b0:400a:801::1002]:443", @@ -244,7 +244,7 @@ TEST_F(AddressSortingTest, TestDepriotizesUnsupportedDomainIpv4) { {"[2607:f8b0:400a:801::1002]:443", AF_INET6}, {"1.2.3.4:443", AF_INET}, }); - grpc_core::AddressSortingSort(nullptr, &lb_addrs, "test-addresses"); + grpc_cares_wrapper_address_sorting_sort(nullptr, &lb_addrs); VerifyLbAddrOutputs(lb_addrs, { "[2607:f8b0:400a:801::1002]:443", "1.2.3.4:443", @@ -268,7 +268,7 @@ TEST_F(AddressSortingTest, TestDepriotizesNonMatchingScope) { {"[2000:f8b0:400a:801::1002]:443", AF_INET6}, {"[fec0::5000]:443", AF_INET6}, }); - grpc_core::AddressSortingSort(nullptr, &lb_addrs, "test-addresses"); + grpc_cares_wrapper_address_sorting_sort(nullptr, &lb_addrs); VerifyLbAddrOutputs(lb_addrs, { "[fec0::5000]:443", "[2000:f8b0:400a:801::1002]:443", @@ -291,7 +291,7 @@ TEST_F(AddressSortingTest, TestUsesLabelFromDefaultTable) { {"[2002::5001]:443", AF_INET6}, {"[2001::5001]:443", AF_INET6}, }); - grpc_core::AddressSortingSort(nullptr, &lb_addrs, "test-addresses"); + grpc_cares_wrapper_address_sorting_sort(nullptr, &lb_addrs); VerifyLbAddrOutputs(lb_addrs, { "[2001::5001]:443", "[2002::5001]:443", @@ -314,7 +314,7 @@ TEST_F(AddressSortingTest, TestUsesLabelFromDefaultTableInputFlipped) { {"[2001::5001]:443", AF_INET6}, {"[2002::5001]:443", AF_INET6}, }); - grpc_core::AddressSortingSort(nullptr, &lb_addrs, "test-addresses"); + grpc_cares_wrapper_address_sorting_sort(nullptr, &lb_addrs); VerifyLbAddrOutputs(lb_addrs, { "[2001::5001]:443", "[2002::5001]:443", @@ -337,7 +337,7 @@ TEST_F(AddressSortingTest, {"[3ffe::5001]:443", AF_INET6}, {"1.2.3.4:443", AF_INET}, }); - grpc_core::AddressSortingSort(nullptr, &lb_addrs, "test-addresses"); + grpc_cares_wrapper_address_sorting_sort(nullptr, &lb_addrs); VerifyLbAddrOutputs( lb_addrs, { // The AF_INET address should be IPv4-mapped by the sort, @@ -370,7 +370,7 @@ TEST_F(AddressSortingTest, {v4_compat_dest, AF_INET6}, {"[::1]:443", AF_INET6}, }); - grpc_core::AddressSortingSort(nullptr, &lb_addrs, "test-addresses"); + grpc_cares_wrapper_address_sorting_sort(nullptr, &lb_addrs); VerifyLbAddrOutputs(lb_addrs, { "[::1]:443", v4_compat_dest, @@ -393,7 +393,7 @@ TEST_F(AddressSortingTest, {"[1234::2]:443", AF_INET6}, {"[::1]:443", AF_INET6}, }); - grpc_core::AddressSortingSort(nullptr, &lb_addrs, "test-addresses"); + grpc_cares_wrapper_address_sorting_sort(nullptr, &lb_addrs); VerifyLbAddrOutputs( lb_addrs, { @@ -417,7 +417,7 @@ TEST_F(AddressSortingTest, {"[2001::1234]:443", AF_INET6}, {"[2000::5001]:443", AF_INET6}, }); - grpc_core::AddressSortingSort(nullptr, &lb_addrs, "test-addresses"); + grpc_cares_wrapper_address_sorting_sort(nullptr, &lb_addrs); VerifyLbAddrOutputs( lb_addrs, { // The 2000::/16 address should match the ::/0 prefix rule @@ -441,7 +441,7 @@ TEST_F( {"[2001::1231]:443", AF_INET6}, {"[2000::5001]:443", AF_INET6}, }); - grpc_core::AddressSortingSort(nullptr, &lb_addrs, "test-addresses"); + grpc_cares_wrapper_address_sorting_sort(nullptr, &lb_addrs); VerifyLbAddrOutputs(lb_addrs, { "[2000::5001]:443", "[2001::1231]:443", @@ -462,7 +462,7 @@ TEST_F(AddressSortingTest, {"[fec0::1234]:443", AF_INET6}, {"[fc00::5001]:443", AF_INET6}, }); - grpc_core::AddressSortingSort(nullptr, &lb_addrs, "test-addresses"); + grpc_cares_wrapper_address_sorting_sort(nullptr, &lb_addrs); VerifyLbAddrOutputs(lb_addrs, { "[fc00::5001]:443", "[fec0::1234]:443", @@ -487,7 +487,7 @@ TEST_F( {"[::ffff:1.1.1.2]:443", AF_INET6}, {"[1234::2]:443", AF_INET6}, }); - grpc_core::AddressSortingSort(nullptr, &lb_addrs, "test-addresses"); + grpc_cares_wrapper_address_sorting_sort(nullptr, &lb_addrs); VerifyLbAddrOutputs(lb_addrs, { // ::ffff:0:2 should match the v4-mapped // precedence entry and be deprioritized. @@ -514,7 +514,7 @@ TEST_F(AddressSortingTest, TestPrefersSmallerScope) { {"[3ffe::5001]:443", AF_INET6}, {"[fec0::1234]:443", AF_INET6}, }); - grpc_core::AddressSortingSort(nullptr, &lb_addrs, "test-addresses"); + grpc_cares_wrapper_address_sorting_sort(nullptr, &lb_addrs); VerifyLbAddrOutputs(lb_addrs, { "[fec0::1234]:443", "[3ffe::5001]:443", @@ -539,7 +539,7 @@ TEST_F(AddressSortingTest, TestPrefersLongestMatchingSrcDstPrefix) { {"[3ffe:5001::]:443", AF_INET6}, {"[3ffe:1234::]:443", AF_INET6}, }); - grpc_core::AddressSortingSort(nullptr, &lb_addrs, "test-addresses"); + grpc_cares_wrapper_address_sorting_sort(nullptr, &lb_addrs); VerifyLbAddrOutputs(lb_addrs, { "[3ffe:1234::]:443", "[3ffe:5001::]:443", @@ -560,7 +560,7 @@ TEST_F(AddressSortingTest, {"[3ffe::5001]:443", AF_INET6}, {"[3ffe::1234]:443", AF_INET6}, }); - grpc_core::AddressSortingSort(nullptr, &lb_addrs, "test-addresses"); + grpc_cares_wrapper_address_sorting_sort(nullptr, &lb_addrs); VerifyLbAddrOutputs(lb_addrs, { "[3ffe::1234]:443", "[3ffe::5001]:443", @@ -580,7 +580,7 @@ TEST_F(AddressSortingTest, TestPrefersLongestPrefixStressInnerBytePrefix) { {"[3ffe:8000::]:443", AF_INET6}, {"[3ffe:2000::]:443", AF_INET6}, }); - grpc_core::AddressSortingSort(nullptr, &lb_addrs, "test-addresses"); + grpc_cares_wrapper_address_sorting_sort(nullptr, &lb_addrs); VerifyLbAddrOutputs(lb_addrs, { "[3ffe:2000::]:443", "[3ffe:8000::]:443", @@ -600,7 +600,7 @@ TEST_F(AddressSortingTest, TestPrefersLongestPrefixDiffersOnHighestBitOfByte) { {"[3ffe:6::]:443", AF_INET6}, {"[3ffe:c::]:443", AF_INET6}, }); - grpc_core::AddressSortingSort(nullptr, &lb_addrs, "test-addresses"); + grpc_cares_wrapper_address_sorting_sort(nullptr, &lb_addrs); VerifyLbAddrOutputs(lb_addrs, { "[3ffe:c::]:443", "[3ffe:6::]:443", @@ -622,7 +622,7 @@ TEST_F(AddressSortingTest, TestPrefersLongestPrefixDiffersByLastBit) { {"[3ffe:1111:1111:1110::]:443", AF_INET6}, {"[3ffe:1111:1111:1111::]:443", AF_INET6}, }); - grpc_core::AddressSortingSort(nullptr, &lb_addrs, "test-addresses"); + grpc_cares_wrapper_address_sorting_sort(nullptr, &lb_addrs); VerifyLbAddrOutputs(lb_addrs, { "[3ffe:1111:1111:1111::]:443", "[3ffe:1111:1111:1110::]:443", @@ -644,7 +644,7 @@ TEST_F(AddressSortingTest, TestStableSort) { {"[3ffe::1234]:443", AF_INET6}, {"[3ffe::1235]:443", AF_INET6}, }); - grpc_core::AddressSortingSort(nullptr, &lb_addrs, "test-addresses"); + grpc_cares_wrapper_address_sorting_sort(nullptr, &lb_addrs); VerifyLbAddrOutputs(lb_addrs, { "[3ffe::1234]:443", "[3ffe::1235]:443", @@ -670,7 +670,7 @@ TEST_F(AddressSortingTest, TestStableSortFiveElements) { {"[3ffe::1234]:443", AF_INET6}, {"[3ffe::1235]:443", AF_INET6}, }); - grpc_core::AddressSortingSort(nullptr, &lb_addrs, "test-addresses"); + grpc_cares_wrapper_address_sorting_sort(nullptr, &lb_addrs); VerifyLbAddrOutputs(lb_addrs, { "[3ffe::1231]:443", "[3ffe::1232]:443", @@ -691,7 +691,7 @@ TEST_F(AddressSortingTest, TestStableSortNoSrcAddrsExist) { {"[3ffe::1234]:443", AF_INET6}, {"[3ffe::1235]:443", AF_INET6}, }); - grpc_core::AddressSortingSort(nullptr, &lb_addrs, "test-addresses"); + grpc_cares_wrapper_address_sorting_sort(nullptr, &lb_addrs); VerifyLbAddrOutputs(lb_addrs, { "[3ffe::1231]:443", "[3ffe::1232]:443", @@ -709,7 +709,7 @@ TEST_F(AddressSortingTest, TestStableSortNoSrcAddrsExistWithIpv4) { {"[::ffff:5.6.7.8]:443", AF_INET6}, {"1.2.3.4:443", AF_INET}, }); - grpc_core::AddressSortingSort(nullptr, &lb_addrs, "test-addresses"); + grpc_cares_wrapper_address_sorting_sort(nullptr, &lb_addrs); VerifyLbAddrOutputs(lb_addrs, { "[::ffff:5.6.7.8]:443", "1.2.3.4:443", @@ -737,7 +737,7 @@ TEST_F(AddressSortingTest, TestStableSortV4CompatAndSiteLocalAddresses) { {"[fec0::2000]:443", AF_INET6}, {v4_compat_dest, AF_INET6}, }); - grpc_core::AddressSortingSort(nullptr, &lb_addrs, "test-addresses"); + grpc_cares_wrapper_address_sorting_sort(nullptr, &lb_addrs); VerifyLbAddrOutputs(lb_addrs, { // The sort should be stable since @@ -758,7 +758,7 @@ TEST_F(AddressSortingTest, TestPrefersIpv6Loopback) { {"[::1]:443", AF_INET6}, {"127.0.0.1:443", AF_INET}, }); - grpc_core::AddressSortingSort(nullptr, &lb_addrs, "test-addresses"); + grpc_cares_wrapper_address_sorting_sort(nullptr, &lb_addrs); VerifyLbAddrOutputs(lb_addrs, { "[::1]:443", "127.0.0.1:443", @@ -772,7 +772,7 @@ TEST_F(AddressSortingTest, TestPrefersIpv6LoopbackInputsFlipped) { {"127.0.0.1:443", AF_INET}, {"[::1]:443", AF_INET6}, }); - grpc_core::AddressSortingSort(nullptr, &lb_addrs, "test-addresses"); + grpc_cares_wrapper_address_sorting_sort(nullptr, &lb_addrs); VerifyLbAddrOutputs(lb_addrs, { "[::1]:443", "127.0.0.1:443", diff --git a/test/cpp/naming/cancel_ares_query_test.cc b/test/cpp/naming/cancel_ares_query_test.cc index 2e727d4575b..371e3e5eb4e 100644 --- a/test/cpp/naming/cancel_ares_query_test.cc +++ b/test/cpp/naming/cancel_ares_query_test.cc @@ -35,7 +35,6 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/debug/stats.h" #include "src/core/lib/gpr/string.h" -#include "src/core/lib/gprpp/global_config_generic.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/thd.h" #include "src/core/lib/iomgr/pollset.h" @@ -56,8 +55,6 @@ #define BAD_SOCKET_RETURN_VAL (-1) #endif -GPR_GLOBAL_CONFIG_DECLARE_BOOL(grpc_abort_on_leaks); - namespace { void* Tag(intptr_t t) { return reinterpret_cast(t); } @@ -408,10 +405,6 @@ TEST_F( int main(int argc, char** argv) { grpc::testing::TestEnvironment env(argc, argv); - // see notes in - // https://github.com/grpc/grpc/pull/25108#pullrequestreview-577881514 for - // motivation. - GPR_GLOBAL_CONFIG_SET(grpc_abort_on_leaks, true); ::testing::InitGoogleTest(&argc, argv); auto result = RUN_ALL_TESTS(); return result; diff --git a/test/cpp/naming/resolver_component_test.cc b/test/cpp/naming/resolver_component_test.cc index aa558d8065b..1fdc38d7a28 100644 --- a/test/cpp/naming/resolver_component_test.cc +++ b/test/cpp/naming/resolver_component_test.cc @@ -44,7 +44,6 @@ #include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gpr/string.h" -#include "src/core/lib/gprpp/global_config_generic.h" #include "src/core/lib/gprpp/host_port.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/iomgr/executor.h" @@ -127,8 +126,6 @@ ABSL_FLAG(std::string, expected_lb_policy, "", "Expected lb policy name that appears in resolver result channel " "arg. Empty for none."); -GPR_GLOBAL_CONFIG_DECLARE_BOOL(grpc_abort_on_leaks); - namespace { class GrpcLBAddress final { @@ -587,7 +584,7 @@ void RunResolvesRelevantRecordsTest( absl::make_unique( g_fake_non_responsive_dns_server_port); - grpc_core::internal::AresTestOnlyInjectConfig = InjectBrokenNameServerList; + grpc_ares_test_only_inject_config = InjectBrokenNameServerList; whole_uri = absl::StrCat("dns:///", absl::GetFlag(FLAGS_target_name)); } else if (absl::GetFlag(FLAGS_inject_broken_nameserver_list) == "False") { gpr_log(GPR_INFO, "Specifying authority in uris to: %s", @@ -671,10 +668,6 @@ TEST(ResolverComponentTest, TestResolvesRelevantRecordsWithConcurrentFdStress) { int main(int argc, char** argv) { grpc_init(); grpc::testing::TestEnvironment env(argc, argv); - // see notes in - // https://github.com/grpc/grpc/pull/25108#pullrequestreview-577881514 for - // motivation. - GPR_GLOBAL_CONFIG_SET(grpc_abort_on_leaks, true); ::testing::InitGoogleTest(&argc, argv); grpc::testing::InitTest(&argc, &argv, true); if (absl::GetFlag(FLAGS_target_name).empty()) {