From 2ee70175bd091e14542d2209c6e7b0d74e549ec9 Mon Sep 17 00:00:00 2001 From: apolcyn Date: Wed, 17 Mar 2021 13:47:29 -0700 Subject: [PATCH] Convert grpc_ares_wrapper to C++ (#25108) --- .../resolver/dns/c_ares/dns_resolver_ares.cc | 47 +- .../resolver/dns/c_ares/grpc_ares_ev_driver.h | 78 +- .../dns/c_ares/grpc_ares_ev_driver_posix.cc | 6 +- .../dns/c_ares/grpc_ares_ev_driver_windows.cc | 14 +- .../resolver/dns/c_ares/grpc_ares_wrapper.cc | 1749 +++++++---------- .../resolver/dns/c_ares/grpc_ares_wrapper.h | 321 ++- .../dns/c_ares/grpc_ares_wrapper_libuv.cc | 6 +- .../dns/c_ares/grpc_ares_wrapper_posix.cc | 6 +- .../dns/c_ares/grpc_ares_wrapper_windows.cc | 6 +- .../dns_resolver_connectivity_test.cc | 28 +- .../resolvers/dns_resolver_cooldown_test.cc | 33 +- test/core/end2end/goaway_server_test.cc | 38 +- test/cpp/naming/address_sorting_test.cc | 52 +- test/cpp/naming/cancel_ares_query_test.cc | 7 + test/cpp/naming/resolver_component_test.cc | 9 +- 15 files changed, 1194 insertions(+), 1206 deletions(-) diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc index 922ac453df3..5c9541af1f9 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc @@ -81,9 +81,8 @@ class AresDnsResolver : public Resolver { void StartResolvingLocked(); static void OnNextResolution(void* arg, grpc_error* error); - static void OnResolved(void* arg, grpc_error* error); - void OnNextResolutionLocked(grpc_error* error); void OnResolvedLocked(grpc_error* error); + void OnNextResolutionLocked(grpc_error* error); /// DNS server to use (if not system default) std::string dns_server_; @@ -107,11 +106,10 @@ class AresDnsResolver : public Resolver { /// closures used by the work_serializer grpc_closure on_next_resolution_; - grpc_closure on_resolved_; /// are we currently 
resolving? bool resolving_ = false; /// the pending resolving request - grpc_ares_request* pending_request_ = nullptr; + OrphanablePtr pending_request_; /// next resolution timer bool have_next_resolution_timer_ = false; grpc_timer next_resolution_timer_; @@ -124,7 +122,7 @@ class AresDnsResolver : public Resolver { /// currently resolving balancer addresses std::unique_ptr balancer_addresses_; /// currently resolving service config - char* service_config_json_ = nullptr; + absl::optional service_config_json_; // has shutdown been initiated bool shutdown_initiated_ = false; }; @@ -156,7 +154,6 @@ AresDnsResolver::AresDnsResolver(ResolverArgs args) // Closure initialization. GRPC_CLOSURE_INIT(&on_next_resolution_, OnNextResolution, this, grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&on_resolved_, OnResolved, this, grpc_schedule_on_exec_ctx); } AresDnsResolver::~AresDnsResolver() { @@ -188,9 +185,7 @@ void AresDnsResolver::ShutdownLocked() { if (have_next_resolution_timer_) { grpc_timer_cancel(&next_resolution_timer_); } - if (pending_request_ != nullptr) { - grpc_cancel_ares_request_locked(pending_request_); - } + pending_request_.reset(); } void AresDnsResolver::OnNextResolution(void* arg, grpc_error* error) { @@ -226,7 +221,7 @@ bool ValueInJsonArray(const Json::Array& array, const char* value) { return false; } -std::string ChooseServiceConfig(char* service_config_choice_json, +std::string ChooseServiceConfig(absl::string_view service_config_choice_json, grpc_error** error) { Json json = Json::Parse(service_config_choice_json, error); if (*error != GRPC_ERROR_NONE) return ""; @@ -305,18 +300,10 @@ std::string ChooseServiceConfig(char* service_config_choice_json, return service_config->Dump(); } -void AresDnsResolver::OnResolved(void* arg, grpc_error* error) { - AresDnsResolver* r = static_cast(arg); - GRPC_ERROR_REF(error); // ref owned by lambda - r->work_serializer_->Run([r, error]() { r->OnResolvedLocked(error); }, - DEBUG_LOCATION); -} - void 
AresDnsResolver::OnResolvedLocked(grpc_error* error) { GPR_ASSERT(resolving_); resolving_ = false; - gpr_free(pending_request_); - pending_request_ = nullptr; + pending_request_.reset(); if (shutdown_initiated_) { Unref(DEBUG_LOCATION, "OnResolvedLocked() shutdown"); GRPC_ERROR_UNREF(error); @@ -327,10 +314,9 @@ void AresDnsResolver::OnResolvedLocked(grpc_error* error) { if (addresses_ != nullptr) { result.addresses = std::move(*addresses_); } - if (service_config_json_ != nullptr) { + if (service_config_json_.has_value()) { std::string service_config_string = ChooseServiceConfig( - service_config_json_, &result.service_config_error); - gpr_free(service_config_json_); + service_config_json_.value(), &result.service_config_error); if (result.service_config_error == GRPC_ERROR_NONE && !service_config_string.empty()) { GRPC_CARES_TRACE_LOG("resolver:%p selected service config choice: %s", @@ -421,16 +407,17 @@ void AresDnsResolver::StartResolvingLocked() { Ref(DEBUG_LOCATION, "dns-resolving").release(); GPR_ASSERT(!resolving_); resolving_ = true; - service_config_json_ = nullptr; - pending_request_ = grpc_dns_lookup_ares_locked( - dns_server_.c_str(), name_to_resolve_.c_str(), kDefaultPort, - interested_parties_, &on_resolved_, &addresses_, + service_config_json_.reset(); + auto on_done = [this](grpc_error* error) { OnResolvedLocked(error); }; + pending_request_ = LookupAresLocked( + dns_server_, name_to_resolve_, kDefaultPort, interested_parties_, + std::move(on_done), &addresses_, enable_srv_queries_ ? &balancer_addresses_ : nullptr, request_service_config_ ? &service_config_json_ : nullptr, query_timeout_ms_, work_serializer_); last_resolution_timestamp_ = grpc_core::ExecCtx::Get()->Now(); GRPC_CARES_TRACE_LOG("resolver:%p Started resolving. 
pending_request_:%p", - this, pending_request_); + this, pending_request_.get()); } // @@ -463,7 +450,7 @@ static grpc_error* blocking_resolve_address_ares( } static grpc_address_resolver_vtable ares_resolver = { - grpc_resolve_address_ares, blocking_resolve_address_ares}; + grpc_core::ResolveAddressAres, blocking_resolve_address_ares}; #ifdef GRPC_UV /* TODO(murgatroid99): Remove this when we want the cares resolver to be the @@ -490,7 +477,7 @@ void grpc_resolver_dns_ares_init() { g_use_ares_dns_resolver = true; gpr_log(GPR_DEBUG, "Using ares dns resolver"); address_sorting_init(); - grpc_error* error = grpc_ares_init(); + grpc_error* error = grpc_core::AresRequest::Init(); if (error != GRPC_ERROR_NONE) { GRPC_LOG_IF_ERROR("grpc_ares_init() failed", error); return; @@ -509,7 +496,7 @@ void grpc_resolver_dns_ares_init() { void grpc_resolver_dns_ares_shutdown() { if (g_use_ares_dns_resolver) { address_sorting_shutdown(); - grpc_ares_cleanup(); + grpc_core::AresRequest::Shutdown(); } } diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h index cc884864862..feb2d3fc75e 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h @@ -1,20 +1,18 @@ -/* - * - * Copyright 2016 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - * - */ +// +// Copyright 2016 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// #ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_C_ARES_GRPC_ARES_EV_DRIVER_H #define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_C_ARES_GRPC_ARES_EV_DRIVER_H @@ -27,40 +25,49 @@ namespace grpc_core { -/* A wrapped fd that integrates with the grpc iomgr of the current platform. - * A GrpcPolledFd knows how to create grpc platform-specific iomgr endpoints - * from "ares_socket_t" sockets, and then sign up for readability/writeability - * with that poller, and do shutdown and destruction. */ +/// A wrapped fd that integrates with the grpc iomgr of the current platform. +/// A GrpcPolledFd knows how to create grpc platform-specific iomgr endpoints +/// from "ares_socket_t" sockets, and then sign up for readability/writeability +/// with that poller, and do shutdown and destruction. 
class GrpcPolledFd { public: virtual ~GrpcPolledFd() {} - /* Called when c-ares library is interested and there's no pending callback */ + // Called when c-ares library is interested and there's no pending callback virtual void RegisterForOnReadableLocked(grpc_closure* read_closure) = 0; - /* Called when c-ares library is interested and there's no pending callback */ + // Called when c-ares library is interested and there's no pending callback virtual void RegisterForOnWriteableLocked(grpc_closure* write_closure) = 0; - /* Indicates if there is data left even after just being read from */ + // Indicates if there is data left even after just being read from virtual bool IsFdStillReadableLocked() = 0; - /* Called once and only once. Must cause cancellation of any pending - * read/write callbacks. */ + // Called once and only once. Must cause cancellation of any pending + // read/write callbacks. virtual void ShutdownLocked(grpc_error* error) = 0; - /* Get the underlying ares_socket_t that this was created from */ + // Get the underlying ares_socket_t that this was created from virtual ares_socket_t GetWrappedAresSocketLocked() = 0; - /* A unique name, for logging */ + // A unique name, for logging virtual const char* GetName() = 0; }; -/* A GrpcPolledFdFactory is 1-to-1 with and owned by the - * ares event driver. It knows how to create GrpcPolledFd's - * for the current platform, and the ares driver uses it for all of - * its fd's. */ +/// A GrpcPolledFdFactory is 1-to-1 with and owned by the +/// ares event driver. It knows how to create GrpcPolledFd's +/// for the current platform, and the ares driver uses it for all of +/// its fd's. class GrpcPolledFdFactory { public: virtual ~GrpcPolledFdFactory() {} - /* Creates a new wrapped fd for the current platform */ + + /// Creates a new wrapped fd for the current platform. 
+ /// + /// Note about \a driver_pollset_set lifetime: the \a driver_pollset_set + /// param exists in this factory function because some \a GrpcPolledFd + /// implementations need to use it. If a \a GrpcPolledFd object does need + /// to use \a driver_pollset_set, it is safe to access it up through the point + /// that \a GrpcPolledFd::ShutdownLocked has been run, and it is no longer + /// safe to access after that. virtual GrpcPolledFd* NewGrpcPolledFdLocked( ares_socket_t as, grpc_pollset_set* driver_pollset_set, std::shared_ptr work_serializer) = 0; - /* Optionally configures the ares channel after creation */ + + /// Optionally configures the ares channel after creation virtual void ConfigureAresChannelLocked(ares_channel channel) = 0; }; @@ -69,5 +76,4 @@ std::unique_ptr NewGrpcPolledFdFactory( } // namespace grpc_core -#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_C_ARES_GRPC_ARES_EV_DRIVER_H \ - */ +#endif // GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_C_ARES_GRPC_ARES_EV_DRIVER_H diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc index 07f38ba59ab..94c4b786ff0 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc @@ -50,7 +50,6 @@ class GrpcPolledFdPosix : public GrpcPolledFd { } ~GrpcPolledFdPosix() override { - grpc_pollset_set_del_fd(driver_pollset_set_, fd_); /* c-ares library will close the fd inside grpc_fd. This fd may be picked up immediately by another thread, and should not be closed by the following grpc_fd_orphan. 
*/ @@ -73,6 +72,11 @@ class GrpcPolledFdPosix : public GrpcPolledFd { } void ShutdownLocked(grpc_error* error) override { + // After we return, driver_pollset_set_ is no longer + // safe to access because the overall \a LookupAresLocked call may + // complete, after which its owner may destroy it. So delete the fd now. + grpc_pollset_set_del_fd(driver_pollset_set_, fd_); + driver_pollset_set_ = nullptr; grpc_fd_shutdown(fd_, error); } diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc index 8d4a0cb210d..c39b939baa4 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc @@ -246,7 +246,7 @@ class GrpcPolledFdWindows { break; case WRITE_PENDING: case WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY: - abort(); + GPR_ASSERT(0); } } } @@ -346,7 +346,7 @@ class GrpcPolledFdWindows { case SOCK_STREAM: return SendVTCP(wsa_error_ctx, iov, iov_count); default: - abort(); + GPR_ASSERT(0); } } @@ -417,7 +417,7 @@ class GrpcPolledFdWindows { tcp_write_state_ = WRITE_IDLE; return total_sent; } - abort(); + GPR_ASSERT(0); } static void OnTcpConnect(void* arg, grpc_error* error) { @@ -483,7 +483,7 @@ class GrpcPolledFdWindows { case SOCK_STREAM: return ConnectTCP(wsa_error_ctx, target, target_len); default: - abort(); + GPR_ASSERT(0); } } @@ -722,7 +722,9 @@ class SockToPolledFdMap { return node->polled_fd; } } - abort(); + gpr_log(GPR_ERROR, "LookupPolledFd for socket: %d failed. 
head_: %p", s, + head_); + GPR_ASSERT(0); } void RemoveEntry(SOCKET s) { @@ -737,7 +739,7 @@ class SockToPolledFdMap { } prev = &node->next; } - abort(); + GPR_ASSERT(0); } /* These virtual socket functions are called from within the c-ares diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc index fa3249efa55..21ecb396e67 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc @@ -27,8 +27,10 @@ #include #include "absl/container/inlined_vector.h" +#include "absl/strings/match.h" #include "absl/strings/str_cat.h" #include "absl/strings/str_format.h" +#include "absl/strings/str_join.h" #include #include @@ -37,7 +39,6 @@ #include #include -#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gprpp/host_port.h" #include "src/core/lib/iomgr/error.h" @@ -46,627 +47,68 @@ #include "src/core/lib/iomgr/nameser.h" #include "src/core/lib/iomgr/parse_address.h" #include "src/core/lib/iomgr/sockaddr_utils.h" -#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/transport/authority_override.h" -using grpc_core::ServerAddress; -using grpc_core::ServerAddressList; - grpc_core::TraceFlag grpc_trace_cares_address_sorting(false, "cares_address_sorting"); grpc_core::TraceFlag grpc_trace_cares_resolver(false, "cares_resolver"); -typedef struct grpc_ares_ev_driver grpc_ares_ev_driver; - -struct grpc_ares_request { - /** indicates the DNS server to use, if specified */ - struct ares_addr_port_node dns_server_addr; - /** following members are set in grpc_resolve_address_ares_impl */ - /** closure to call when the request completes */ - grpc_closure* on_done; - /** the pointer to receive the resolved addresses */ - std::unique_ptr* addresses_out; - /** the 
pointer to receive the resolved balancer addresses */ - std::unique_ptr* balancer_addresses_out; - /** the pointer to receive the service config in JSON */ - char** service_config_json_out; - /** the evernt driver used by this request */ - grpc_ares_ev_driver* ev_driver; - /** number of ongoing queries */ - size_t pending_queries; - - /** the errors explaining query failures, appended to in query callbacks */ - grpc_error* error; -}; - -typedef struct fd_node { - /** the owner of this fd node */ - grpc_ares_ev_driver* ev_driver; - /** a closure wrapping on_readable_locked, which should be - invoked when the grpc_fd in this node becomes readable. */ - grpc_closure read_closure; - /** a closure wrapping on_writable_locked, which should be - invoked when the grpc_fd in this node becomes writable. */ - grpc_closure write_closure; - /** next fd node in the list */ - struct fd_node* next; - - /** wrapped fd that's polled by grpc's poller for the current platform */ - grpc_core::GrpcPolledFd* grpc_polled_fd; - /** if the readable closure has been registered */ - bool readable_registered; - /** if the writable closure has been registered */ - bool writable_registered; - /** if the fd has been shutdown yet from grpc iomgr perspective */ - bool already_shutdown; -} fd_node; - -struct grpc_ares_ev_driver { - /** the ares_channel owned by this event driver */ - ares_channel channel; - /** pollset set for driving the IO events of the channel */ - grpc_pollset_set* pollset_set; - /** refcount of the event driver */ - gpr_refcount refs; - - /** work_serializer to synchronize c-ares and I/O callbacks on */ - std::shared_ptr work_serializer; - /** a list of grpc_fd that this event driver is currently using. */ - fd_node* fds; - /** is this event driver being shut down */ - bool shutting_down; - /** request object that's using this ev driver */ - grpc_ares_request* request; - /** Owned by the ev_driver. 
Creates new GrpcPolledFd's */ - std::unique_ptr polled_fd_factory; - /** query timeout in milliseconds */ - int query_timeout_ms; - /** alarm to cancel active queries */ - grpc_timer query_timeout; - /** cancels queries on a timeout */ - grpc_closure on_timeout_locked; - /** alarm to poll ares_process on in case fd events don't happen */ - grpc_timer ares_backup_poll_alarm; - /** polls ares_process on a periodic timer */ - grpc_closure on_ares_backup_poll_alarm_locked; -}; - -// TODO(apolcyn): make grpc_ares_hostbyname_request a sub-class -// of GrpcAresQuery. -typedef struct grpc_ares_hostbyname_request { - /** following members are set in create_hostbyname_request_locked - */ - /** the top-level request instance */ - grpc_ares_request* parent_request; - /** host to resolve, parsed from the name to resolve */ - char* host; - /** port to fill in sockaddr_in, parsed from the name to resolve */ - uint16_t port; - /** is it a grpclb address */ - bool is_balancer; - /** for logging and errors: the query type ("A" or "AAAA") */ - const char* qtype; -} grpc_ares_hostbyname_request; - -static void grpc_ares_request_ref_locked(grpc_ares_request* r); -static void grpc_ares_request_unref_locked(grpc_ares_request* r); - -// TODO(apolcyn): as a part of C++-ification, find a way to -// organize per-query and per-resolution information in such a way -// that doesn't involve allocating a number of different data -// structures. 
-class GrpcAresQuery { - public: - explicit GrpcAresQuery(grpc_ares_request* r, const std::string& name) - : r_(r), name_(name) { - grpc_ares_request_ref_locked(r_); - } - - ~GrpcAresQuery() { grpc_ares_request_unref_locked(r_); } - - grpc_ares_request* parent_request() { return r_; } - - const std::string& name() { return name_; } - - private: - /* the top level request instance */ - grpc_ares_request* r_; - /** for logging and errors */ - const std::string name_; -}; - -static grpc_ares_ev_driver* grpc_ares_ev_driver_ref( - grpc_ares_ev_driver* ev_driver) { - GRPC_CARES_TRACE_LOG("request:%p Ref ev_driver %p", ev_driver->request, - ev_driver); - gpr_ref(&ev_driver->refs); - return ev_driver; -} - -static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver* ev_driver) { - GRPC_CARES_TRACE_LOG("request:%p Unref ev_driver %p", ev_driver->request, - ev_driver); - if (gpr_unref(&ev_driver->refs)) { - GRPC_CARES_TRACE_LOG("request:%p destroy ev_driver %p", ev_driver->request, - ev_driver); - GPR_ASSERT(ev_driver->fds == nullptr); - ares_destroy(ev_driver->channel); - grpc_ares_complete_request_locked(ev_driver->request); - delete ev_driver; - } -} - -static void fd_node_destroy_locked(fd_node* fdn) { - GRPC_CARES_TRACE_LOG("request:%p delete fd: %s", fdn->ev_driver->request, - fdn->grpc_polled_fd->GetName()); - GPR_ASSERT(!fdn->readable_registered); - GPR_ASSERT(!fdn->writable_registered); - GPR_ASSERT(fdn->already_shutdown); - delete fdn->grpc_polled_fd; - gpr_free(fdn); -} - -static void fd_node_shutdown_locked(fd_node* fdn, const char* reason) { - if (!fdn->already_shutdown) { - fdn->already_shutdown = true; - fdn->grpc_polled_fd->ShutdownLocked( - GRPC_ERROR_CREATE_FROM_STATIC_STRING(reason)); - } -} - -void grpc_ares_ev_driver_on_queries_complete_locked( - grpc_ares_ev_driver* ev_driver) { - // We mark the event driver as being shut down. - // grpc_ares_notify_on_event_locked will shut down any remaining - // fds. 
- ev_driver->shutting_down = true; - grpc_timer_cancel(&ev_driver->query_timeout); - grpc_timer_cancel(&ev_driver->ares_backup_poll_alarm); - grpc_ares_ev_driver_unref(ev_driver); -} - -void grpc_ares_ev_driver_shutdown_locked(grpc_ares_ev_driver* ev_driver) { - ev_driver->shutting_down = true; - fd_node* fn = ev_driver->fds; - while (fn != nullptr) { - fd_node_shutdown_locked(fn, "grpc_ares_ev_driver_shutdown"); - fn = fn->next; - } -} - -// Search fd in the fd_node list head. This is an O(n) search, the max possible -// value of n is ARES_GETSOCK_MAXNUM (16). n is typically 1 - 2 in our tests. -static fd_node* pop_fd_node_locked(fd_node** head, ares_socket_t as) { - fd_node phony_head; - phony_head.next = *head; - fd_node* node = &phony_head; - while (node->next != nullptr) { - if (node->next->grpc_polled_fd->GetWrappedAresSocketLocked() == as) { - fd_node* ret = node->next; - node->next = node->next->next; - *head = phony_head.next; - return ret; - } - node = node->next; - } - return nullptr; -} - -static grpc_millis calculate_next_ares_backup_poll_alarm_ms( - grpc_ares_ev_driver* driver) { - // An alternative here could be to use ares_timeout to try to be more - // accurate, but that would require using "struct timeval"'s, which just makes - // things a bit more complicated. So just poll every second, as suggested - // by the c-ares code comments. - grpc_millis ms_until_next_ares_backup_poll_alarm = 1000; - GRPC_CARES_TRACE_LOG( - "request:%p ev_driver=%p. next ares process poll time in " - "%" PRId64 " ms", - driver->request, driver, ms_until_next_ares_backup_poll_alarm); - return ms_until_next_ares_backup_poll_alarm + - grpc_core::ExecCtx::Get()->Now(); -} - -static void on_timeout_locked(grpc_ares_ev_driver* driver, grpc_error* error) { - GRPC_CARES_TRACE_LOG( - "request:%p ev_driver=%p on_timeout_locked. driver->shutting_down=%d. 
" - "err=%s", - driver->request, driver, driver->shutting_down, grpc_error_string(error)); - if (!driver->shutting_down && error == GRPC_ERROR_NONE) { - grpc_ares_ev_driver_shutdown_locked(driver); - } - grpc_ares_ev_driver_unref(driver); - GRPC_ERROR_UNREF(error); -} - -static void on_timeout(void* arg, grpc_error* error) { - grpc_ares_ev_driver* driver = static_cast(arg); - GRPC_ERROR_REF(error); // ref owned by lambda - driver->work_serializer->Run( - [driver, error]() { on_timeout_locked(driver, error); }, DEBUG_LOCATION); -} - -static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver); - -static void on_ares_backup_poll_alarm_locked(grpc_ares_ev_driver* driver, - grpc_error* error); - -static void on_ares_backup_poll_alarm(void* arg, grpc_error* error) { - grpc_ares_ev_driver* driver = static_cast(arg); - GRPC_ERROR_REF(error); - driver->work_serializer->Run( - [driver, error]() { on_ares_backup_poll_alarm_locked(driver, error); }, - DEBUG_LOCATION); -} - -/* In case of non-responsive DNS servers, dropped packets, etc., c-ares has - * intelligent timeout and retry logic, which we can take advantage of by - * polling ares_process_fd on time intervals. Overall, the c-ares library is - * meant to be called into and given a chance to proceed name resolution: - * a) when fd events happen - * b) when some time has passed without fd events having happened - * For the latter, we use this backup poller. Also see - * https://github.com/grpc/grpc/pull/17688 description for more details. */ -static void on_ares_backup_poll_alarm_locked(grpc_ares_ev_driver* driver, - grpc_error* error) { - GRPC_CARES_TRACE_LOG( - "request:%p ev_driver=%p on_ares_backup_poll_alarm_locked. " - "driver->shutting_down=%d. 
" - "err=%s", - driver->request, driver, driver->shutting_down, grpc_error_string(error)); - if (!driver->shutting_down && error == GRPC_ERROR_NONE) { - fd_node* fdn = driver->fds; - while (fdn != nullptr) { - if (!fdn->already_shutdown) { - GRPC_CARES_TRACE_LOG( - "request:%p ev_driver=%p on_ares_backup_poll_alarm_locked; " - "ares_process_fd. fd=%s", - driver->request, driver, fdn->grpc_polled_fd->GetName()); - ares_socket_t as = fdn->grpc_polled_fd->GetWrappedAresSocketLocked(); - ares_process_fd(driver->channel, as, as); - } - fdn = fdn->next; - } - if (!driver->shutting_down) { - grpc_millis next_ares_backup_poll_alarm = - calculate_next_ares_backup_poll_alarm_ms(driver); - grpc_ares_ev_driver_ref(driver); - GRPC_CLOSURE_INIT(&driver->on_ares_backup_poll_alarm_locked, - on_ares_backup_poll_alarm, driver, - grpc_schedule_on_exec_ctx); - grpc_timer_init(&driver->ares_backup_poll_alarm, - next_ares_backup_poll_alarm, - &driver->on_ares_backup_poll_alarm_locked); - } - grpc_ares_notify_on_event_locked(driver); - } - grpc_ares_ev_driver_unref(driver); - GRPC_ERROR_UNREF(error); -} - -static void on_readable_locked(fd_node* fdn, grpc_error* error) { - GPR_ASSERT(fdn->readable_registered); - grpc_ares_ev_driver* ev_driver = fdn->ev_driver; - const ares_socket_t as = fdn->grpc_polled_fd->GetWrappedAresSocketLocked(); - fdn->readable_registered = false; - GRPC_CARES_TRACE_LOG("request:%p readable on %s", fdn->ev_driver->request, - fdn->grpc_polled_fd->GetName()); - if (error == GRPC_ERROR_NONE) { - do { - ares_process_fd(ev_driver->channel, as, ARES_SOCKET_BAD); - } while (fdn->grpc_polled_fd->IsFdStillReadableLocked()); +namespace grpc_core { + +void AresRequest::AddressQuery::Create(AresRequest* request, + const std::string& host, uint16_t port, + bool is_balancer, int address_family) { + AddressQuery* q = + new AddressQuery(request, host, port, is_balancer, address_family); + // note that ares_gethostbyname can't be invoked from the ctor because it + // can run it's 
callback inline and invoke the dtor + ares_gethostbyname(request->channel_, q->host_.c_str(), q->address_family_, + OnHostByNameDoneLocked, q); +} + +AresRequest::AddressQuery::AddressQuery(AresRequest* request, + const std::string& host, uint16_t port, + bool is_balancer, int address_family) + : request_(request), + host_(host), + port_(port), + is_balancer_(is_balancer), + address_family_(address_family) { + ++request_->pending_queries_; + if (address_family_ == AF_INET) { + qtype_ = "A"; + } else if (address_family_ == AF_INET6) { + qtype_ = "AAAA"; } else { - // If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or - // timed out. The pending lookups made on this ev_driver will be cancelled - // by the following ares_cancel() and the on_done callbacks will be invoked - // with a status of ARES_ECANCELLED. The remaining file descriptors in this - // ev_driver will be cleaned up in the follwing - // grpc_ares_notify_on_event_locked(). - ares_cancel(ev_driver->channel); + GPR_ASSERT(0); } - grpc_ares_notify_on_event_locked(ev_driver); - grpc_ares_ev_driver_unref(ev_driver); - GRPC_ERROR_UNREF(error); } -static void on_readable(void* arg, grpc_error* error) { - fd_node* fdn = static_cast(arg); - GRPC_ERROR_REF(error); /* ref owned by lambda */ - fdn->ev_driver->work_serializer->Run( - [fdn, error]() { on_readable_locked(fdn, error); }, DEBUG_LOCATION); +AresRequest::AddressQuery::~AddressQuery() { + request_->DecrementPendingQueries(); } -static void on_writable_locked(fd_node* fdn, grpc_error* error) { - GPR_ASSERT(fdn->writable_registered); - grpc_ares_ev_driver* ev_driver = fdn->ev_driver; - const ares_socket_t as = fdn->grpc_polled_fd->GetWrappedAresSocketLocked(); - fdn->writable_registered = false; - GRPC_CARES_TRACE_LOG("request:%p writable on %s", ev_driver->request, - fdn->grpc_polled_fd->GetName()); - if (error == GRPC_ERROR_NONE) { - ares_process_fd(ev_driver->channel, ARES_SOCKET_BAD, as); - } else { - // If error is not GRPC_ERROR_NONE, 
it means the fd has been shutdown or - // timed out. The pending lookups made on this ev_driver will be cancelled - // by the following ares_cancel() and the on_done callbacks will be invoked - // with a status of ARES_ECANCELLED. The remaining file descriptors in this - // ev_driver will be cleaned up in the follwing - // grpc_ares_notify_on_event_locked(). - ares_cancel(ev_driver->channel); - } - grpc_ares_notify_on_event_locked(ev_driver); - grpc_ares_ev_driver_unref(ev_driver); - GRPC_ERROR_UNREF(error); -} - -static void on_writable(void* arg, grpc_error* error) { - fd_node* fdn = static_cast(arg); - GRPC_ERROR_REF(error); /* ref owned by lambda */ - fdn->ev_driver->work_serializer->Run( - [fdn, error]() { on_writable_locked(fdn, error); }, DEBUG_LOCATION); -} - -// Get the file descriptors used by the ev_driver's ares channel, register -// driver_closure with these filedescriptors. -static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) { - fd_node* new_list = nullptr; - if (!ev_driver->shutting_down) { - ares_socket_t socks[ARES_GETSOCK_MAXNUM]; - int socks_bitmask = - ares_getsock(ev_driver->channel, socks, ARES_GETSOCK_MAXNUM); - for (size_t i = 0; i < ARES_GETSOCK_MAXNUM; i++) { - if (ARES_GETSOCK_READABLE(socks_bitmask, i) || - ARES_GETSOCK_WRITABLE(socks_bitmask, i)) { - fd_node* fdn = pop_fd_node_locked(&ev_driver->fds, socks[i]); - // Create a new fd_node if sock[i] is not in the fd_node list. 
- if (fdn == nullptr) { - fdn = static_cast(gpr_malloc(sizeof(fd_node))); - fdn->grpc_polled_fd = - ev_driver->polled_fd_factory->NewGrpcPolledFdLocked( - socks[i], ev_driver->pollset_set, ev_driver->work_serializer); - GRPC_CARES_TRACE_LOG("request:%p new fd: %s", ev_driver->request, - fdn->grpc_polled_fd->GetName()); - fdn->ev_driver = ev_driver; - fdn->readable_registered = false; - fdn->writable_registered = false; - fdn->already_shutdown = false; - } - fdn->next = new_list; - new_list = fdn; - // Register read_closure if the socket is readable and read_closure has - // not been registered with this socket. - if (ARES_GETSOCK_READABLE(socks_bitmask, i) && - !fdn->readable_registered) { - grpc_ares_ev_driver_ref(ev_driver); - GRPC_CARES_TRACE_LOG("request:%p notify read on: %s", - ev_driver->request, - fdn->grpc_polled_fd->GetName()); - GRPC_CLOSURE_INIT(&fdn->read_closure, on_readable, fdn, - grpc_schedule_on_exec_ctx); - fdn->grpc_polled_fd->RegisterForOnReadableLocked(&fdn->read_closure); - fdn->readable_registered = true; - } - // Register write_closure if the socket is writable and write_closure - // has not been registered with this socket. - if (ARES_GETSOCK_WRITABLE(socks_bitmask, i) && - !fdn->writable_registered) { - GRPC_CARES_TRACE_LOG("request:%p notify write on: %s", - ev_driver->request, - fdn->grpc_polled_fd->GetName()); - grpc_ares_ev_driver_ref(ev_driver); - GRPC_CLOSURE_INIT(&fdn->write_closure, on_writable, fdn, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&fdn->write_closure, on_writable, fdn, - grpc_schedule_on_exec_ctx); - fdn->grpc_polled_fd->RegisterForOnWriteableLocked( - &fdn->write_closure); - fdn->writable_registered = true; - } - } - } - } - // Any remaining fds in ev_driver->fds were not returned by ares_getsock() and - // are therefore no longer in use, so they can be shut down and removed from - // the list. 
- while (ev_driver->fds != nullptr) { - fd_node* cur = ev_driver->fds; - ev_driver->fds = ev_driver->fds->next; - fd_node_shutdown_locked(cur, "c-ares fd shutdown"); - if (!cur->readable_registered && !cur->writable_registered) { - fd_node_destroy_locked(cur); - } else { - cur->next = new_list; - new_list = cur; - } - } - ev_driver->fds = new_list; -} - -void grpc_ares_ev_driver_start_locked(grpc_ares_ev_driver* ev_driver) { - grpc_ares_notify_on_event_locked(ev_driver); - // Initialize overall DNS resolution timeout alarm - grpc_millis timeout = - ev_driver->query_timeout_ms == 0 - ? GRPC_MILLIS_INF_FUTURE - : ev_driver->query_timeout_ms + grpc_core::ExecCtx::Get()->Now(); - GRPC_CARES_TRACE_LOG( - "request:%p ev_driver=%p grpc_ares_ev_driver_start_locked. timeout in " - "%" PRId64 " ms", - ev_driver->request, ev_driver, timeout); - grpc_ares_ev_driver_ref(ev_driver); - GRPC_CLOSURE_INIT(&ev_driver->on_timeout_locked, on_timeout, ev_driver, - grpc_schedule_on_exec_ctx); - grpc_timer_init(&ev_driver->query_timeout, timeout, - &ev_driver->on_timeout_locked); - // Initialize the backup poll alarm - grpc_millis next_ares_backup_poll_alarm = - calculate_next_ares_backup_poll_alarm_ms(ev_driver); - grpc_ares_ev_driver_ref(ev_driver); - GRPC_CLOSURE_INIT(&ev_driver->on_ares_backup_poll_alarm_locked, - on_ares_backup_poll_alarm, ev_driver, - grpc_schedule_on_exec_ctx); - grpc_timer_init(&ev_driver->ares_backup_poll_alarm, - next_ares_backup_poll_alarm, - &ev_driver->on_ares_backup_poll_alarm_locked); -} - -static void noop_inject_channel_config(ares_channel /*channel*/) {} - -void (*grpc_ares_test_only_inject_config)(ares_channel channel) = - noop_inject_channel_config; - -grpc_error* grpc_ares_ev_driver_create_locked( - grpc_ares_ev_driver** ev_driver, grpc_pollset_set* pollset_set, - int query_timeout_ms, - std::shared_ptr work_serializer, - grpc_ares_request* request) { - *ev_driver = new grpc_ares_ev_driver(); - ares_options opts; - memset(&opts, 0, sizeof(opts)); - 
opts.flags |= ARES_FLAG_STAYOPEN; - int status = ares_init_options(&(*ev_driver)->channel, &opts, ARES_OPT_FLAGS); - grpc_ares_test_only_inject_config((*ev_driver)->channel); - GRPC_CARES_TRACE_LOG("request:%p grpc_ares_ev_driver_create_locked", request); - if (status != ARES_SUCCESS) { - grpc_error* err = GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat("Failed to init ares channel. C-ares error: ", - ares_strerror(status)) - .c_str()); - gpr_free(*ev_driver); - return err; - } - (*ev_driver)->work_serializer = std::move(work_serializer); - gpr_ref_init(&(*ev_driver)->refs, 1); - (*ev_driver)->pollset_set = pollset_set; - (*ev_driver)->fds = nullptr; - (*ev_driver)->shutting_down = false; - (*ev_driver)->request = request; - (*ev_driver)->polled_fd_factory = - grpc_core::NewGrpcPolledFdFactory((*ev_driver)->work_serializer); - (*ev_driver) - ->polled_fd_factory->ConfigureAresChannelLocked((*ev_driver)->channel); - (*ev_driver)->query_timeout_ms = query_timeout_ms; - return GRPC_ERROR_NONE; -} - -static void log_address_sorting_list(const grpc_ares_request* r, - const ServerAddressList& addresses, - const char* input_output_str) { - for (size_t i = 0; i < addresses.size(); i++) { - std::string addr_str = - grpc_sockaddr_to_string(&addresses[i].address(), true); - gpr_log(GPR_INFO, - "(c-ares resolver) request:%p c-ares address sorting: %s[%" PRIuPTR - "]=%s", - r, input_output_str, i, addr_str.c_str()); - } -} - -void grpc_cares_wrapper_address_sorting_sort(const grpc_ares_request* r, - ServerAddressList* addresses) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_cares_address_sorting)) { - log_address_sorting_list(r, *addresses, "input"); - } - address_sorting_sortable* sortables = static_cast( - gpr_zalloc(sizeof(address_sorting_sortable) * addresses->size())); - for (size_t i = 0; i < addresses->size(); ++i) { - sortables[i].user_data = &(*addresses)[i]; - memcpy(&sortables[i].dest_addr.addr, &(*addresses)[i].address().addr, - (*addresses)[i].address().len); - 
sortables[i].dest_addr.len = (*addresses)[i].address().len; - } - address_sorting_rfc_6724_sort(sortables, addresses->size()); - ServerAddressList sorted; - sorted.reserve(addresses->size()); - for (size_t i = 0; i < addresses->size(); ++i) { - sorted.emplace_back(*static_cast(sortables[i].user_data)); - } - gpr_free(sortables); - *addresses = std::move(sorted); - if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_cares_address_sorting)) { - log_address_sorting_list(r, *addresses, "output"); - } -} - -static void grpc_ares_request_ref_locked(grpc_ares_request* r) { - r->pending_queries++; -} - -static void grpc_ares_request_unref_locked(grpc_ares_request* r) { - r->pending_queries--; - if (r->pending_queries == 0u) { - grpc_ares_ev_driver_on_queries_complete_locked(r->ev_driver); - } -} - -void grpc_ares_complete_request_locked(grpc_ares_request* r) { - /* Invoke on_done callback and destroy the - request */ - r->ev_driver = nullptr; - ServerAddressList* addresses = r->addresses_out->get(); - if (addresses != nullptr) { - grpc_cares_wrapper_address_sorting_sort(r, addresses); - GRPC_ERROR_UNREF(r->error); - r->error = GRPC_ERROR_NONE; - // TODO(apolcyn): allow c-ares to return a service config - // with no addresses along side it - } - if (r->balancer_addresses_out != nullptr) { - ServerAddressList* balancer_addresses = r->balancer_addresses_out->get(); - if (balancer_addresses != nullptr) { - grpc_cares_wrapper_address_sorting_sort(r, balancer_addresses); - } - } - grpc_core::ExecCtx::Run(DEBUG_LOCATION, r->on_done, r->error); -} - -/* Note that the returned object takes a reference to qtype, so - * qtype must outlive it. 
*/ -static grpc_ares_hostbyname_request* create_hostbyname_request_locked( - grpc_ares_request* parent_request, const char* host, uint16_t port, - bool is_balancer, const char* qtype) { - GRPC_CARES_TRACE_LOG( - "request:%p create_hostbyname_request_locked host:%s port:%d " - "is_balancer:%d qtype:%s", - parent_request, host, port, is_balancer, qtype); - grpc_ares_hostbyname_request* hr = new grpc_ares_hostbyname_request(); - hr->parent_request = parent_request; - hr->host = gpr_strdup(host); - hr->port = port; - hr->is_balancer = is_balancer; - hr->qtype = qtype; - grpc_ares_request_ref_locked(parent_request); - return hr; -} - -static void destroy_hostbyname_request_locked( - grpc_ares_hostbyname_request* hr) { - grpc_ares_request_unref_locked(hr->parent_request); - gpr_free(hr->host); - delete hr; -} - -static void on_hostbyname_done_locked(void* arg, int status, int /*timeouts*/, - struct hostent* hostent) { - grpc_ares_hostbyname_request* hr = - static_cast(arg); - grpc_ares_request* r = hr->parent_request; +void AresRequest::AddressQuery::OnHostByNameDoneLocked( + void* arg, int status, int /*timeouts*/, struct hostent* hostent) { + std::unique_ptr q(static_cast(arg)); + AresRequest* request = q->request_; if (status == ARES_SUCCESS) { GRPC_CARES_TRACE_LOG( - "request:%p on_hostbyname_done_locked qtype=%s host=%s ARES_SUCCESS", r, - hr->qtype, hr->host); + "request:%p OnHostByNameDoneLocked qtype=%s host=%s ARES_SUCCESS", + request, q->qtype_, q->host_.c_str()); std::unique_ptr* address_list_ptr = - hr->is_balancer ? r->balancer_addresses_out : r->addresses_out; + q->is_balancer_ ? 
request->balancer_addresses_out_ + : request->addresses_out_; if (*address_list_ptr == nullptr) { *address_list_ptr = absl::make_unique(); } ServerAddressList& addresses = **address_list_ptr; for (size_t i = 0; hostent->h_addr_list[i] != nullptr; ++i) { absl::InlinedVector args_to_add; - if (hr->is_balancer) { + if (q->is_balancer_) { args_to_add.emplace_back( - grpc_core::CreateAuthorityOverrideChannelArg(hr->host)); + CreateAuthorityOverrideChannelArg(q->host_.c_str())); } grpc_channel_args* args = grpc_channel_args_copy_and_add( nullptr, args_to_add.data(), args_to_add.size()); @@ -678,14 +120,14 @@ static void on_hostbyname_done_locked(void* arg, int status, int /*timeouts*/, memcpy(&addr.sin6_addr, hostent->h_addr_list[i], sizeof(struct in6_addr)); addr.sin6_family = static_cast(hostent->h_addrtype); - addr.sin6_port = hr->port; + addr.sin6_port = q->port_; addresses.emplace_back(&addr, addr_len, args); char output[INET6_ADDRSTRLEN]; ares_inet_ntop(AF_INET6, &addr.sin6_addr, output, INET6_ADDRSTRLEN); GRPC_CARES_TRACE_LOG( "request:%p c-ares resolver gets a AF_INET6 result: \n" " addr: %s\n port: %d\n sin6_scope_id: %d\n", - r, output, ntohs(hr->port), addr.sin6_scope_id); + request, output, ntohs(q->port_), addr.sin6_scope_id); break; } case AF_INET: { @@ -695,14 +137,14 @@ static void on_hostbyname_done_locked(void* arg, int status, int /*timeouts*/, memcpy(&addr.sin_addr, hostent->h_addr_list[i], sizeof(struct in_addr)); addr.sin_family = static_cast(hostent->h_addrtype); - addr.sin_port = hr->port; + addr.sin_port = q->port_; addresses.emplace_back(&addr, addr_len, args); char output[INET_ADDRSTRLEN]; ares_inet_ntop(AF_INET, &addr.sin_addr, output, INET_ADDRSTRLEN); GRPC_CARES_TRACE_LOG( "request:%p c-ares resolver gets a AF_INET result: \n" " addr: %s\n port: %d\n", - r, output, ntohs(hr->port)); + request, output, ntohs(q->port_)); break; } } @@ -710,42 +152,53 @@ static void on_hostbyname_done_locked(void* arg, int status, int /*timeouts*/, } else { 
std::string error_msg = absl::StrFormat( "C-ares status is not ARES_SUCCESS qtype=%s name=%s is_balancer=%d: %s", - hr->qtype, hr->host, hr->is_balancer, ares_strerror(status)); - GRPC_CARES_TRACE_LOG("request:%p on_hostbyname_done_locked: %s", r, + q->qtype_, q->host_, q->is_balancer_, ares_strerror(status)); + GRPC_CARES_TRACE_LOG("request:%p OnHostByNameDoneLocked: %s", request, error_msg.c_str()); grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_msg.c_str()); - r->error = grpc_error_add_child(error, r->error); + request->error_ = grpc_error_add_child(error, request->error_); } - destroy_hostbyname_request_locked(hr); } -static void on_srv_query_done_locked(void* arg, int status, int /*timeouts*/, - unsigned char* abuf, int alen) { - GrpcAresQuery* q = static_cast(arg); - grpc_ares_request* r = q->parent_request(); +void AresRequest::SRVQuery::Create(AresRequest* request) { + SRVQuery* q = new SRVQuery(request); + // note that ares_query can't be invoked from the ctor because it + // can run it's callback inline and invoke the dtor + ares_query(request->channel_, request->srv_qname().c_str(), ns_c_in, ns_t_srv, + OnSRVQueryDoneLocked, q); +} + +AresRequest::SRVQuery::SRVQuery(AresRequest* request) : request_(request) { + ++request_->pending_queries_; +} + +AresRequest::SRVQuery::~SRVQuery() { request_->DecrementPendingQueries(); } + +void AresRequest::SRVQuery::OnSRVQueryDoneLocked(void* arg, int status, + int /*timeouts*/, + unsigned char* abuf, + int alen) { + std::unique_ptr q(static_cast(arg)); + AresRequest* request = q->request_; if (status == ARES_SUCCESS) { - GRPC_CARES_TRACE_LOG( - "request:%p on_srv_query_done_locked name=%s ARES_SUCCESS", r, - q->name().c_str()); + GRPC_CARES_TRACE_LOG("request:%p OnSRVQueryDoneLocked name=%s ARES_SUCCESS", + request, request->srv_qname().c_str()); struct ares_srv_reply* reply; const int parse_status = ares_parse_srv_reply(abuf, alen, &reply); - GRPC_CARES_TRACE_LOG("request:%p ares_parse_srv_reply: %d", 
r, + GRPC_CARES_TRACE_LOG("request:%p ares_parse_srv_reply: %d", request, parse_status); if (parse_status == ARES_SUCCESS) { for (struct ares_srv_reply* srv_it = reply; srv_it != nullptr; srv_it = srv_it->next) { - if (grpc_ares_query_ipv6()) { - grpc_ares_hostbyname_request* hr = create_hostbyname_request_locked( - r, srv_it->host, htons(srv_it->port), true /* is_balancer */, - "AAAA"); - ares_gethostbyname(r->ev_driver->channel, hr->host, AF_INET6, - on_hostbyname_done_locked, hr); + if (AresQueryIPv6()) { + AresRequest::AddressQuery::Create( + request, std::string(srv_it->host), htons(srv_it->port), + true /* is_balancer */, AF_INET6 /* address_family */); } - grpc_ares_hostbyname_request* hr = create_hostbyname_request_locked( - r, srv_it->host, htons(srv_it->port), true /* is_balancer */, "A"); - ares_gethostbyname(r->ev_driver->channel, hr->host, AF_INET, - on_hostbyname_done_locked, hr); - grpc_ares_notify_on_event_locked(r->ev_driver); + AresRequest::AddressQuery::Create( + request, std::string(srv_it->host), htons(srv_it->port), + true /* is_balancer */, AF_INET /* address_family */); + request->NotifyOnEventLocked(); } } if (reply != nullptr) { @@ -753,474 +206,756 @@ static void on_srv_query_done_locked(void* arg, int status, int /*timeouts*/, } } else { std::string error_msg = absl::StrFormat( - "C-ares status is not ARES_SUCCESS qtype=SRV name=%s: %s", q->name(), - ares_strerror(status)); - GRPC_CARES_TRACE_LOG("request:%p on_srv_query_done_locked: %s", r, + "C-ares status is not ARES_SUCCESS qtype=SRV name=%s: %s", + request->srv_qname(), ares_strerror(status)); + GRPC_CARES_TRACE_LOG("request:%p OnSRVQueryDoneLocked: %s", request, error_msg.c_str()); grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_msg.c_str()); - r->error = grpc_error_add_child(error, r->error); + request->error_ = grpc_error_add_child(error, request->error_); } - delete q; } -static const char g_service_config_attribute_prefix[] = "grpc_config="; +void 
AresRequest::TXTQuery::Create(AresRequest* request) { + TXTQuery* q = new TXTQuery(request); + // note that ares_search can't be invoked from the ctor because it + // can run it's callback inline and invoke the dtor + ares_search(request->channel_, request->txt_qname().c_str(), ns_c_in, + ns_t_txt, OnTXTDoneLocked, q); +} + +AresRequest::TXTQuery::TXTQuery(AresRequest* request) : request_(request) { + ++request_->pending_queries_; +} + +AresRequest::TXTQuery::~TXTQuery() { request_->DecrementPendingQueries(); } -static void on_txt_done_locked(void* arg, int status, int /*timeouts*/, - unsigned char* buf, int len) { - GrpcAresQuery* q = static_cast(arg); - std::unique_ptr query_deleter(q); - grpc_ares_request* r = q->parent_request(); - const size_t prefix_len = sizeof(g_service_config_attribute_prefix) - 1; +void AresRequest::TXTQuery::OnTXTDoneLocked(void* arg, int status, + int /*timeouts*/, + unsigned char* buf, int len) { + std::unique_ptr q(static_cast(arg)); + AresRequest* request = q->request_; + const absl::string_view kServiceConfigAttributePrefix = "grpc_config="; struct ares_txt_ext* result = nullptr; struct ares_txt_ext* reply = nullptr; - grpc_error* error = GRPC_ERROR_NONE; - if (status != ARES_SUCCESS) goto fail; - GRPC_CARES_TRACE_LOG("request:%p on_txt_done_locked name=%s ARES_SUCCESS", r, - q->name().c_str()); + auto on_error = [request, status](const char* error_category) { + std::string error_msg = absl::StrFormat( + "%s: c-ares status is not ARES_SUCCESS qtype=TXT name=%s: %s", + error_category, request->txt_qname(), ares_strerror(status)); + grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_msg.c_str()); + GRPC_CARES_TRACE_LOG("request:%p OnTXTDoneLocked %s", request, + error_msg.c_str()); + request->error_ = grpc_error_add_child(error, request->error_); + }; + if (status != ARES_SUCCESS) { + on_error("TXT resolution error"); + return; + } + GRPC_CARES_TRACE_LOG("request:%p OnTXTDoneLocked name=%s ARES_SUCCESS", + request, 
request->txt_qname().c_str()); status = ares_parse_txt_reply_ext(buf, len, &reply); - if (status != ARES_SUCCESS) goto fail; + if (status != ARES_SUCCESS) { + on_error("ares_parse_txt_reply_ext error"); + return; + } // Find service config in TXT record. for (result = reply; result != nullptr; result = result->next) { + absl::string_view result_view(reinterpret_cast(result->txt), + result->length); if (result->record_start && - memcmp(result->txt, g_service_config_attribute_prefix, prefix_len) == - 0) { + absl::StartsWith(result_view, kServiceConfigAttributePrefix)) { break; } } // Found a service config record. - if (result != nullptr) { - size_t service_config_len = result->length - prefix_len; - *r->service_config_json_out = - static_cast(gpr_malloc(service_config_len + 1)); - memcpy(*r->service_config_json_out, result->txt + prefix_len, - service_config_len); - for (result = result->next; result != nullptr && !result->record_start; - result = result->next) { - *r->service_config_json_out = static_cast( - gpr_realloc(*r->service_config_json_out, - service_config_len + result->length + 1)); - memcpy(*r->service_config_json_out + service_config_len, result->txt, - result->length); - service_config_len += result->length; - } - (*r->service_config_json_out)[service_config_len] = '\0'; - GRPC_CARES_TRACE_LOG("request:%p found service config: %s", r, - *r->service_config_json_out); - } + std::vector service_config_parts = { + absl::string_view(reinterpret_cast(result->txt), + result->length) + .substr(kServiceConfigAttributePrefix.size())}; + for (result = result->next; result != nullptr && !result->record_start; + result = result->next) { + service_config_parts.emplace_back( + reinterpret_cast(result->txt), result->length); + } + *request->service_config_json_out_ = absl::StrJoin(service_config_parts, ""); + GRPC_CARES_TRACE_LOG("request:%p found service config: %s", request, + request->service_config_json_out_->value().c_str()); // Clean up. 
ares_free_data(reply); - return; -fail: - std::string error_msg = - absl::StrFormat("C-ares status is not ARES_SUCCESS qtype=TXT name=%s: %s", - q->name(), ares_strerror(status)); - error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_msg.c_str()); - GRPC_CARES_TRACE_LOG("request:%p on_txt_done_locked %s", r, - error_msg.c_str()); - r->error = grpc_error_add_child(error, r->error); } -void grpc_dns_lookup_ares_continue_after_check_localhost_and_ip_literals_locked( - grpc_ares_request* r, const char* dns_server, const char* name, - const char* default_port, grpc_pollset_set* interested_parties, - int query_timeout_ms, - std::shared_ptr work_serializer) { - grpc_error* error = GRPC_ERROR_NONE; - grpc_ares_hostbyname_request* hr = nullptr; - /* parse name, splitting it into host and port parts */ - std::string host; - std::string port; - grpc_core::SplitHostPort(name, &host, &port); - if (host.empty()) { - error = grpc_error_set_str( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("unparseable host:port"), - GRPC_ERROR_STR_TARGET_ADDRESS, grpc_slice_from_copied_string(name)); - goto error_cleanup; - } else if (port.empty()) { +AresRequest::FdNode::FdNode(RefCountedPtr request, + std::unique_ptr grpc_polled_fd) + : request_(std::move(request)), grpc_polled_fd_(std::move(grpc_polled_fd)) { + GRPC_CARES_TRACE_LOG("request:%p new fd: %s", request_.get(), + grpc_polled_fd_->GetName()); + GRPC_CLOSURE_INIT(&read_closure_, AresRequest::FdNode::OnReadable, this, + grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&write_closure_, AresRequest::FdNode::OnWritable, this, + grpc_schedule_on_exec_ctx); +} + +AresRequest::FdNode::~FdNode() { + GRPC_CARES_TRACE_LOG("request:%p delete fd: %s", request_.get(), + grpc_polled_fd_->GetName()); + GPR_ASSERT(!readable_registered_); + GPR_ASSERT(!writable_registered_); + GPR_ASSERT(shutdown_); +} + +void AresRequest::FdNode::MaybeRegisterForOnReadableLocked() { + if (!readable_registered_) { + GRPC_CARES_TRACE_LOG("request:%p notify read on: %s", 
request_.get(), + grpc_polled_fd_->GetName()); + grpc_polled_fd_->RegisterForOnReadableLocked(&read_closure_); + readable_registered_ = true; + } +} + +void AresRequest::FdNode::MaybeRegisterForOnWritableLocked() { + if (!writable_registered_) { + GRPC_CARES_TRACE_LOG("request:%p notify write on: %s", request_.get(), + grpc_polled_fd_->GetName()); + grpc_polled_fd_->RegisterForOnWriteableLocked(&write_closure_); + writable_registered_ = true; + } +} + +void AresRequest::FdNode::MaybeShutdownLocked(absl::string_view reason) { + if (!shutdown_) { + GRPC_CARES_TRACE_LOG("request:%p shutdown on: %s", request_.get(), + grpc_polled_fd_->GetName()); + grpc_polled_fd_->ShutdownLocked( + GRPC_ERROR_CREATE_FROM_COPIED_STRING(std::string(reason).c_str())); + shutdown_ = true; + } +} + +bool AresRequest::FdNode::IsActiveLocked() { + return readable_registered_ || writable_registered_; +} + +void AresRequest::FdNode::OnReadableLocked(grpc_error* error) { + GPR_ASSERT(readable_registered_); + readable_registered_ = false; + const ares_socket_t as = grpc_polled_fd_->GetWrappedAresSocketLocked(); + GRPC_CARES_TRACE_LOG("request:%p readable on: %s, error: %s", request_.get(), + grpc_polled_fd_->GetName(), grpc_error_string(error)); + if (error == GRPC_ERROR_NONE) { + do { + ares_process_fd(request_->channel_, as, ARES_SOCKET_BAD); + } while (grpc_polled_fd_->IsFdStillReadableLocked()); + } else { + // If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or + // timed out. The pending lookups made on this request will be cancelled + // by the following ares_cancel() and the on_done callbacks will be invoked + // with a status of ARES_ECANCELLED. The remaining file descriptors in this + // request will be cleaned up in the follwing + // NotifyOnEventLocked(). 
+ ares_cancel(request_->channel_); + } + request_->NotifyOnEventLocked(); + GRPC_ERROR_UNREF(error); +} + +void AresRequest::FdNode::OnReadable(void* arg, grpc_error* error) { + AresRequest::FdNode* fdn = static_cast(arg); + GRPC_ERROR_REF(error); // ref owned by lambda + fdn->request_->work_serializer_->Run( + [fdn, error]() { fdn->OnReadableLocked(error); }, DEBUG_LOCATION); +} + +void AresRequest::FdNode::OnWritableLocked(grpc_error* error) { + GPR_ASSERT(writable_registered_); + writable_registered_ = false; + const ares_socket_t as = grpc_polled_fd_->GetWrappedAresSocketLocked(); + GRPC_CARES_TRACE_LOG("request:%p writable on %s, error: %s", request_.get(), + grpc_polled_fd_->GetName(), grpc_error_string(error)); + if (error == GRPC_ERROR_NONE) { + ares_process_fd(request_->channel_, ARES_SOCKET_BAD, as); + } else { + // If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or + // timed out. The pending lookups made on this request will be cancelled + // by the following ares_cancel() and the on_done callbacks will be invoked + // with a status of ARES_ECANCELLED. The remaining file descriptors in this + // request will be cleaned up in the follwing NotifyOnEventLocked(). 
+ ares_cancel(request_->channel_); + } + request_->NotifyOnEventLocked(); + GRPC_ERROR_UNREF(error); +} + +void AresRequest::FdNode::OnWritable(void* arg, grpc_error* error) { + AresRequest::FdNode* fdn = static_cast(arg); + GRPC_ERROR_REF(error); // ref owned by lambda + fdn->request_->work_serializer_->Run( + [fdn, error]() { fdn->OnWritableLocked(error); }, DEBUG_LOCATION); +} + +OrphanablePtr AresRequest::Create( + absl::string_view dns_server, absl::string_view name, + absl::string_view default_port, grpc_pollset_set* interested_parties, + std::function on_done, + std::unique_ptr* addrs, + std::unique_ptr* balancer_addrs, + absl::optional* service_config_json, int query_timeout_ms, + std::shared_ptr work_serializer) { + OrphanablePtr request = MakeOrphanable( + addrs, balancer_addrs, service_config_json, interested_parties, + query_timeout_ms, on_done, work_serializer); + GRPC_CARES_TRACE_LOG( + "request:%p c-ares AresRequest::Create name=%s, " + "default_port=%s timeout in %d ms", + request.get(), std::string(name).c_str(), + std::string(default_port).c_str(), query_timeout_ms); + // pretend we have 1 query to avoid calling on_done before initialization is + // done + request->pending_queries_ = 1; + // Initialize overall DNS resolution timeout alarm + grpc_millis timeout = + request->query_timeout_ms_ == 0 + ? 
GRPC_MILLIS_INF_FUTURE + : request->query_timeout_ms_ + ExecCtx::Get()->Now(); + GRPC_CLOSURE_INIT(&request->on_timeout_locked_, AresRequest::OnTimeout, + request.get(), grpc_schedule_on_exec_ctx); + request->Ref().release(); // owned by timer callback + grpc_timer_init(&request->query_timeout_, timeout, + &request->on_timeout_locked_); + // Initialize the backup poll alarm + grpc_millis next_ares_backup_poll_alarm = + request->CalculateNextAresBackupPollAlarm(); + GRPC_CLOSURE_INIT(&request->on_ares_backup_poll_alarm_locked_, + AresRequest::OnAresBackupPollAlarm, request.get(), + grpc_schedule_on_exec_ctx); + request->Ref().release(); // owned by timer callback + grpc_timer_init(&request->ares_backup_poll_alarm_, + next_ares_backup_poll_alarm, + &request->on_ares_backup_poll_alarm_locked_); + // parse name, splitting it into host and port parts + std::string target_port_str; + SplitHostPort(name, &request->target_host_, &target_port_str); + if (request->target_host_.empty()) { + request->error_ = grpc_error_set_str( + GRPC_ERROR_CREATE_FROM_COPIED_STRING("unparseable host:port"), + GRPC_ERROR_STR_TARGET_ADDRESS, + grpc_slice_from_copied_string(std::string(name).c_str())); + request->DecrementPendingQueries(); + return request; + } else if (target_port_str.empty()) { if (default_port == nullptr) { - error = grpc_error_set_str( + request->error_ = grpc_error_set_str( GRPC_ERROR_CREATE_FROM_STATIC_STRING("no port in name"), - GRPC_ERROR_STR_TARGET_ADDRESS, grpc_slice_from_copied_string(name)); - goto error_cleanup; + GRPC_ERROR_STR_TARGET_ADDRESS, + grpc_slice_from_copied_string(std::string(name).c_str())); + request->DecrementPendingQueries(); + return request; + } + target_port_str = std::string(default_port); + } + request->target_port_ = grpc_strhtons(target_port_str.c_str()); + // Don't query for SRV and TXT records if the target is "localhost", so + // as to cut down on lookups over the network, especially in tests: + // https://github.com/grpc/proposal/pull/79 
+ if (request->target_host_ == "localhost") { + request->balancer_addresses_out_ = nullptr; + request->service_config_json_out_ = nullptr; + } + // Early out if the target is an ipv4 or ipv6 literal. + if (request->ResolveAsIPLiteralLocked()) { + request->DecrementPendingQueries(); + return request; + } + // Early out if the target is localhost and we're on Windows. + if (request->MaybeResolveLocalHostManuallyLocked()) { + request->DecrementPendingQueries(); + return request; + } + // Look up name using c-ares lib. + request->ContinueAfterCheckLocalhostAndIPLiteralsLocked(dns_server); + request->DecrementPendingQueries(); + return request; +} + +AresRequest::AresRequest( + std::unique_ptr* addresses_out, + std::unique_ptr* balancer_addresses_out, + absl::optional* service_config_json_out, + grpc_pollset_set* pollset_set, int query_timeout_ms, + std::function on_done, + std::shared_ptr work_serializer) + : addresses_out_(addresses_out), + balancer_addresses_out_(balancer_addresses_out), + service_config_json_out_(service_config_json_out), + pollset_set_(pollset_set), + work_serializer_(std::move(work_serializer)), + polled_fd_factory_(NewGrpcPolledFdFactory(work_serializer_)), + query_timeout_ms_(query_timeout_ms), + on_done_(std::move(on_done)) {} + +AresRequest::~AresRequest() { + if (channel_ != nullptr) { + ares_destroy(channel_); + } +} + +void AresRequest::Orphan() { + ShutdownIOLocked("request orphaned"); + Unref(); +} + +// ares_library_init and ares_library_cleanup are currently no-op except under +// Windows. Calling them may cause race conditions when other parts of the +// binary calls these functions concurrently. 
+#ifdef GPR_WINDOWS +grpc_error* AresRequest::Init(void) { + int status = ares_library_init(ARES_LIB_INIT_ALL); + if (status != ARES_SUCCESS) { + return GRPC_ERROR_CREATE_FROM_COPIED_STRING( + absl::StrCat("ares_library_init failed: ", ares_strerror(status)) + .c_str()); + } + return GRPC_ERROR_NONE; +} + +void AresRequest::Shutdown(void) { ares_library_cleanup(); } +#else +grpc_error* AresRequest::Init(void) { return GRPC_ERROR_NONE; } +void AresRequest::Shutdown(void) {} +#endif // GPR_WINDOWS + +void AresRequest::ShutdownIOLocked(absl::string_view reason) { + shutting_down_ = true; + for (auto& p : fds_) { + p.second->MaybeShutdownLocked( + absl::StrCat("AresRequest::ShutdownIOLocked reason: ", reason)); + } +} + +grpc_millis AresRequest::CalculateNextAresBackupPollAlarm() const { + // An alternative here could be to use ares_timeout to try to be more + // accurate, but that would require using "struct timeval"'s, which just makes + // things a bit more complicated. So just poll every second, as suggested + // by the c-ares code comments. + grpc_millis kMsUntilNextAresBackupPollAlarm = 1000; + GRPC_CARES_TRACE_LOG( + "request:%p next ares process poll time in " + "%" PRId64 " ms", + this, kMsUntilNextAresBackupPollAlarm); + return kMsUntilNextAresBackupPollAlarm + ExecCtx::Get()->Now(); +} + +void AresRequest::OnTimeoutLocked(grpc_error* error) { + GRPC_CARES_TRACE_LOG( + "request:%p OnTimeoutLocked. shutting_down_=%d. " + "err=%s", + this, shutting_down_, grpc_error_string(error)); + // TODO(apolcyn): always run ShutdownIOLocked since it's idempotent? 
+ if (!shutting_down_ && error == GRPC_ERROR_NONE) { + ShutdownIOLocked("request timeout"); + } + Unref(); + GRPC_ERROR_UNREF(error); +} + +void AresRequest::OnTimeout(void* arg, grpc_error* error) { + AresRequest* request = static_cast(arg); + GRPC_ERROR_REF(error); // ref owned by lambda + request->work_serializer_->Run( + [request, error]() { request->OnTimeoutLocked(error); }, DEBUG_LOCATION); +} + +// In case of non-responsive DNS servers, dropped packets, etc., c-ares has +// intelligent timeout and retry logic, which we can take advantage of by +// polling ares_process_fd on time intervals. Overall, the c-ares library is +// meant to be called into and given a chance to proceed name resolution: +// a) when fd events happen +// b) when some time has passed without fd events having happened +// For the latter, we use this backup poller. Also see +// https://github.com/grpc/grpc/pull/17688 description for more details. +void AresRequest::OnAresBackupPollAlarmLocked(grpc_error* error) { + GRPC_CARES_TRACE_LOG( + "request:%p OnAresBackupPollAlarmLocked. " + "shutting_down_=%d. " + "err=%s", + this, shutting_down_, grpc_error_string(error)); + if (!shutting_down_ && error == GRPC_ERROR_NONE) { + for (auto& p : fds_) { + AresRequest::FdNode* fdn = p.second.get(); + if (!fdn->shutdown()) { + GRPC_CARES_TRACE_LOG( + "request:%p OnAresBackupPollAlarmLocked; " + "ares_process_fd. 
fd=%s", + this, fdn->grpc_polled_fd()->GetName()); + ares_socket_t as = fdn->grpc_polled_fd()->GetWrappedAresSocketLocked(); + ares_process_fd(channel_, as, as); + } + } + // the work done in ares_process_fd might have set shutting_down_ = true + if (!shutting_down_) { + grpc_millis next_ares_backup_poll_alarm = + CalculateNextAresBackupPollAlarm(); + Ref().release(); // owned by timer callback + grpc_timer_init(&ares_backup_poll_alarm_, next_ares_backup_poll_alarm, + &on_ares_backup_poll_alarm_locked_); + } + NotifyOnEventLocked(); + } + Unref(); + GRPC_ERROR_UNREF(error); +} + +void AresRequest::OnAresBackupPollAlarm(void* arg, grpc_error* error) { + AresRequest* request = static_cast(arg); + GRPC_ERROR_REF(error); + request->work_serializer_->Run( + [request, error]() { request->OnAresBackupPollAlarmLocked(error); }, + DEBUG_LOCATION); +} + +// Get the file descriptors used by the request's ares channel, register +// I/O readable/writable callbacks with these filedescriptors. +void AresRequest::NotifyOnEventLocked() { + // prevent unrefs in FdNode dtors from prematurely destroying this object + RefCountedPtr self_ref = Ref(); + std::map> active_fds; + if (!shutting_down_) { + ares_socket_t socks[ARES_GETSOCK_MAXNUM]; + int socks_bitmask = ares_getsock(channel_, socks, ARES_GETSOCK_MAXNUM); + for (size_t i = 0; i < ARES_GETSOCK_MAXNUM; i++) { + if (ARES_GETSOCK_READABLE(socks_bitmask, i) || + ARES_GETSOCK_WRITABLE(socks_bitmask, i)) { + ares_socket_t s = socks[i]; + if (fds_[s] == nullptr) { + fds_[s] = absl::make_unique( + Ref(), std::unique_ptr( + polled_fd_factory_->NewGrpcPolledFdLocked( + s, pollset_set_, work_serializer_))); + } + auto p = fds_.find(s); + if (ARES_GETSOCK_READABLE(socks_bitmask, i)) { + p->second->MaybeRegisterForOnReadableLocked(); + } + if (ARES_GETSOCK_WRITABLE(socks_bitmask, i)) { + p->second->MaybeRegisterForOnWritableLocked(); + } + active_fds[p->first] = std::move(p->second); + fds_.erase(p); + } + } + } + // Any remaining fds in 
fds_ were not returned by ares_getsock() and + // are therefore no longer in use, so they can be shut down and removed from + // the list. + for (auto& p : fds_) { + p.second->MaybeShutdownLocked("c-ares fd shutdown"); + if (p.second->IsActiveLocked()) { + active_fds[p.first] = std::move(p.second); } - port = default_port; } - error = grpc_ares_ev_driver_create_locked(&r->ev_driver, interested_parties, - query_timeout_ms, - std::move(work_serializer), r); - if (error != GRPC_ERROR_NONE) goto error_cleanup; + fds_ = std::move(active_fds); +} + +void AresRequest::ContinueAfterCheckLocalhostAndIPLiteralsLocked( + absl::string_view dns_server) { + ares_options opts; + memset(&opts, 0, sizeof(opts)); + opts.flags |= ARES_FLAG_STAYOPEN; + int status = ares_init_options(&channel_, &opts, ARES_OPT_FLAGS); + internal::AresTestOnlyInjectConfig(channel_); + if (status != ARES_SUCCESS) { + error_ = GRPC_ERROR_CREATE_FROM_COPIED_STRING( + absl::StrCat("Failed to init ares channel. C-ares error: ", + ares_strerror(status)) + .c_str()); + return; + } + polled_fd_factory_->ConfigureAresChannelLocked(channel_); // If dns_server is specified, use it. 
- if (dns_server != nullptr && dns_server[0] != '\0') { - GRPC_CARES_TRACE_LOG("request:%p Using DNS server %s", r, dns_server); + if (!dns_server.empty()) { + GRPC_CARES_TRACE_LOG("request:%p Using DNS server %s", this, + std::string(dns_server).c_str()); + struct ares_addr_port_node dns_server_addr; + memset(&dns_server_addr, 0, sizeof(dns_server_addr)); grpc_resolved_address addr; if (grpc_parse_ipv4_hostport(dns_server, &addr, false /* log_errors */)) { - r->dns_server_addr.family = AF_INET; + dns_server_addr.family = AF_INET; struct sockaddr_in* in = reinterpret_cast(addr.addr); - memcpy(&r->dns_server_addr.addr.addr4, &in->sin_addr, + memcpy(&dns_server_addr.addr.addr4, &in->sin_addr, sizeof(struct in_addr)); - r->dns_server_addr.tcp_port = grpc_sockaddr_get_port(&addr); - r->dns_server_addr.udp_port = grpc_sockaddr_get_port(&addr); + dns_server_addr.tcp_port = grpc_sockaddr_get_port(&addr); + dns_server_addr.udp_port = grpc_sockaddr_get_port(&addr); } else if (grpc_parse_ipv6_hostport(dns_server, &addr, false /* log_errors */)) { - r->dns_server_addr.family = AF_INET6; + dns_server_addr.family = AF_INET6; struct sockaddr_in6* in6 = reinterpret_cast(addr.addr); - memcpy(&r->dns_server_addr.addr.addr6, &in6->sin6_addr, + memcpy(&dns_server_addr.addr.addr6, &in6->sin6_addr, sizeof(struct in6_addr)); - r->dns_server_addr.tcp_port = grpc_sockaddr_get_port(&addr); - r->dns_server_addr.udp_port = grpc_sockaddr_get_port(&addr); + dns_server_addr.tcp_port = grpc_sockaddr_get_port(&addr); + dns_server_addr.udp_port = grpc_sockaddr_get_port(&addr); } else { - error = grpc_error_set_str( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("cannot parse authority"), - GRPC_ERROR_STR_TARGET_ADDRESS, grpc_slice_from_copied_string(name)); - goto error_cleanup; + error_ = GRPC_ERROR_CREATE_FROM_COPIED_STRING( + absl::StrCat("cannot parse DNS server ip address: ", dns_server) + .c_str()); + return; } - int status = - ares_set_servers_ports(r->ev_driver->channel, &r->dns_server_addr); + 
int status = ares_set_servers_ports(channel_, &dns_server_addr); if (status != ARES_SUCCESS) { - error = GRPC_ERROR_CREATE_FROM_COPIED_STRING( + error_ = GRPC_ERROR_CREATE_FROM_COPIED_STRING( absl::StrCat("C-ares status is not ARES_SUCCESS: ", ares_strerror(status)) .c_str()); - goto error_cleanup; + return; } } - r->pending_queries = 1; - if (grpc_ares_query_ipv6()) { - hr = create_hostbyname_request_locked(r, host.c_str(), - grpc_strhtons(port.c_str()), - /*is_balancer=*/false, "AAAA"); - ares_gethostbyname(r->ev_driver->channel, hr->host, AF_INET6, - on_hostbyname_done_locked, hr); - } - hr = create_hostbyname_request_locked(r, host.c_str(), - grpc_strhtons(port.c_str()), - /*is_balancer=*/false, "A"); - ares_gethostbyname(r->ev_driver->channel, hr->host, AF_INET, - on_hostbyname_done_locked, hr); - if (r->balancer_addresses_out != nullptr) { - /* Query the SRV record */ - std::string service_name = absl::StrCat("_grpclb._tcp.", host); - GrpcAresQuery* srv_query = new GrpcAresQuery(r, service_name); - ares_query(r->ev_driver->channel, service_name.c_str(), ns_c_in, ns_t_srv, - on_srv_query_done_locked, srv_query); - } - if (r->service_config_json_out != nullptr) { - std::string config_name = absl::StrCat("_grpc_config.", host); - GrpcAresQuery* txt_query = new GrpcAresQuery(r, config_name); - ares_search(r->ev_driver->channel, config_name.c_str(), ns_c_in, ns_t_txt, - on_txt_done_locked, txt_query); + if (AresQueryIPv6()) { + AresRequest::AddressQuery::Create(this, target_host_, target_port_, + false /* is_balancer */, + AF_INET6 /* address_family */); + } + AresRequest::AddressQuery::Create(this, target_host_, target_port_, + false /* is_balancer */, + AF_INET /* address_family */); + if (balancer_addresses_out_ != nullptr) { + AresRequest::SRVQuery::Create(this); + } + if (service_config_json_out_ != nullptr) { + AresRequest::TXTQuery::Create(this); + } + NotifyOnEventLocked(); +} + +void AresRequest::DecrementPendingQueries() { + if (--pending_queries_ == 0) 
{ + GRPC_CARES_TRACE_LOG("request: %p queries complete", this); + ShutdownIOLocked("DNS queries finished"); + grpc_timer_cancel(&query_timeout_); + grpc_timer_cancel(&ares_backup_poll_alarm_); + ServerAddressList* addresses = addresses_out_->get(); + if (addresses != nullptr) { + AddressSortingSort(this, addresses, "service-addresses"); + GRPC_ERROR_UNREF(error_); + error_ = GRPC_ERROR_NONE; + // TODO(apolcyn): allow c-ares to return a service config + // with no addresses along side it + } + if (balancer_addresses_out_ != nullptr) { + ServerAddressList* balancer_addresses = balancer_addresses_out_->get(); + if (balancer_addresses != nullptr) { + AddressSortingSort(this, balancer_addresses, "grpclb-addresses"); + } + } + grpc_error* error = error_; + std::function on_done = on_done_; + // note it's safe to schedule this inline because we're currently + // holding the work serializer + work_serializer_->Run([on_done, error]() { on_done(error); }, + DEBUG_LOCATION); } - grpc_ares_ev_driver_start_locked(r->ev_driver); - grpc_ares_request_unref_locked(r); - return; - -error_cleanup: - grpc_core::ExecCtx::Run(DEBUG_LOCATION, r->on_done, error); } -static bool inner_resolve_as_ip_literal_locked( - const char* name, const char* default_port, - std::unique_ptr* addrs, std::string* host, - std::string* port, std::string* hostport) { - if (!grpc_core::SplitHostPort(name, host, port)) { - gpr_log(GPR_ERROR, - "Failed to parse %s to host:port while attempting to resolve as ip " - "literal.", - name); - return false; - } - if (port->empty()) { - if (default_port == nullptr) { - gpr_log(GPR_ERROR, - "No port or default port for %s while attempting to resolve as " - "ip literal.", - name); - return false; - } - *port = default_port; - } +bool AresRequest::ResolveAsIPLiteralLocked() { grpc_resolved_address addr; - *hostport = grpc_core::JoinHostPort(*host, atoi(port->c_str())); - if (grpc_parse_ipv4_hostport(hostport->c_str(), &addr, + std::string hostport = 
JoinHostPort(target_host_, ntohs(target_port_)); + if (grpc_parse_ipv4_hostport(hostport.c_str(), &addr, false /* log errors */) || - grpc_parse_ipv6_hostport(hostport->c_str(), &addr, + grpc_parse_ipv6_hostport(hostport.c_str(), &addr, false /* log errors */)) { - GPR_ASSERT(*addrs == nullptr); - *addrs = absl::make_unique(); - (*addrs)->emplace_back(addr.addr, addr.len, nullptr /* args */); + GPR_ASSERT(*addresses_out_ == nullptr); + *addresses_out_ = absl::make_unique(); + (*addresses_out_)->emplace_back(addr.addr, addr.len, nullptr /* args */); return true; } return false; } -static bool resolve_as_ip_literal_locked( - const char* name, const char* default_port, - std::unique_ptr* addrs) { - std::string host; - std::string port; - std::string hostport; - bool out = inner_resolve_as_ip_literal_locked(name, default_port, addrs, - &host, &port, &hostport); - return out; -} - -static bool target_matches_localhost_inner(const char* name, std::string* host, - std::string* port) { - if (!grpc_core::SplitHostPort(name, host, port)) { - gpr_log(GPR_ERROR, "Unable to split host and port for name: %s", name); - return false; - } - return gpr_stricmp(host->c_str(), "localhost") == 0; -} - -static bool target_matches_localhost(const char* name) { - std::string host; - std::string port; - return target_matches_localhost_inner(name, &host, &port); -} - #ifdef GRPC_ARES_RESOLVE_LOCALHOST_MANUALLY -static bool inner_maybe_resolve_localhost_manually_locked( - const grpc_ares_request* r, const char* name, const char* default_port, - std::unique_ptr* addrs, std::string* host, - std::string* port) { - grpc_core::SplitHostPort(name, host, port); - if (host->empty()) { - gpr_log(GPR_ERROR, - "Failed to parse %s into host:port during manual localhost " - "resolution check.", - name); - return false; - } - if (port->empty()) { - if (default_port == nullptr) { - gpr_log(GPR_ERROR, - "No port or default port for %s during manual localhost " - "resolution check.", - name); - return false; 
- } - *port = default_port; - } - if (gpr_stricmp(host->c_str(), "localhost") == 0) { - GPR_ASSERT(*addrs == nullptr); - *addrs = absl::make_unique(); - uint16_t numeric_port = grpc_strhtons(port->c_str()); +bool AresRequest::MaybeResolveLocalHostManuallyLocked() { + if (target_host_ == "localhost") { + GPR_ASSERT(*addresses_out_ == nullptr); + *addresses_out_ = absl::make_unique(); // Append the ipv6 loopback address. struct sockaddr_in6 ipv6_loopback_addr; memset(&ipv6_loopback_addr, 0, sizeof(ipv6_loopback_addr)); ((char*)&ipv6_loopback_addr.sin6_addr)[15] = 1; ipv6_loopback_addr.sin6_family = AF_INET6; - ipv6_loopback_addr.sin6_port = numeric_port; - (*addrs)->emplace_back(&ipv6_loopback_addr, sizeof(ipv6_loopback_addr), - nullptr /* args */); + ipv6_loopback_addr.sin6_port = target_port_; + (*addresses_out_) + ->emplace_back(&ipv6_loopback_addr, sizeof(ipv6_loopback_addr), + nullptr /* args */); // Append the ipv4 loopback address. struct sockaddr_in ipv4_loopback_addr; memset(&ipv4_loopback_addr, 0, sizeof(ipv4_loopback_addr)); ((char*)&ipv4_loopback_addr.sin_addr)[0] = 0x7f; ((char*)&ipv4_loopback_addr.sin_addr)[3] = 0x01; ipv4_loopback_addr.sin_family = AF_INET; - ipv4_loopback_addr.sin_port = numeric_port; - (*addrs)->emplace_back(&ipv4_loopback_addr, sizeof(ipv4_loopback_addr), - nullptr /* args */); + ipv4_loopback_addr.sin_port = target_port_; + (*addresses_out_) + ->emplace_back(&ipv4_loopback_addr, sizeof(ipv4_loopback_addr), + nullptr /* args */); // Let the address sorter figure out which one should be tried first. 
- grpc_cares_wrapper_address_sorting_sort(r, addrs->get()); return true; } return false; } - -static bool grpc_ares_maybe_resolve_localhost_manually_locked( - const grpc_ares_request* r, const char* name, const char* default_port, - std::unique_ptr* addrs) { - std::string host; - std::string port; - return inner_maybe_resolve_localhost_manually_locked(r, name, default_port, - addrs, &host, &port); -} #else /* GRPC_ARES_RESOLVE_LOCALHOST_MANUALLY */ -static bool grpc_ares_maybe_resolve_localhost_manually_locked( - const grpc_ares_request* /*r*/, const char* /*name*/, - const char* /*default_port*/, - std::unique_ptr* /*addrs*/) { - return false; -} +bool AresRequest::MaybeResolveLocalHostManuallyLocked() { return false; } #endif /* GRPC_ARES_RESOLVE_LOCALHOST_MANUALLY */ -static grpc_ares_request* grpc_dns_lookup_ares_locked_impl( - const char* dns_server, const char* name, const char* default_port, - grpc_pollset_set* interested_parties, grpc_closure* on_done, - std::unique_ptr* addrs, - std::unique_ptr* balancer_addrs, - char** service_config_json, int query_timeout_ms, - std::shared_ptr work_serializer) { - grpc_ares_request* r = - static_cast(gpr_zalloc(sizeof(grpc_ares_request))); - r->ev_driver = nullptr; - r->on_done = on_done; - r->addresses_out = addrs; - r->balancer_addresses_out = balancer_addrs; - r->service_config_json_out = service_config_json; - r->error = GRPC_ERROR_NONE; - r->pending_queries = 0; - GRPC_CARES_TRACE_LOG( - "request:%p c-ares grpc_dns_lookup_ares_locked_impl name=%s, " - "default_port=%s", - r, name, default_port); - // Early out if the target is an ipv4 or ipv6 literal. - if (resolve_as_ip_literal_locked(name, default_port, addrs)) { - grpc_ares_complete_request_locked(r); - return r; - } - // Early out if the target is localhost and we're on Windows. 
- if (grpc_ares_maybe_resolve_localhost_manually_locked(r, name, default_port, - addrs)) { - grpc_ares_complete_request_locked(r); - return r; - } - // Don't query for SRV and TXT records if the target is "localhost", so - // as to cut down on lookups over the network, especially in tests: - // https://github.com/grpc/proposal/pull/79 - if (target_matches_localhost(name)) { - r->balancer_addresses_out = nullptr; - r->service_config_json_out = nullptr; +namespace { + +// A GrpcResolveAddressAresRequest maintains the state needed to +// carry out a single asynchronous grpc_resolve_address call. +class GrpcResolveAddressAresRequest { + public: + static void GrpcResolveAddressAresImpl(const char* name, + const char* default_port, + grpc_pollset_set* interested_parties, + grpc_closure* on_done, + grpc_resolved_addresses** addrs) { + GrpcResolveAddressAresRequest* request = new GrpcResolveAddressAresRequest( + name, default_port, interested_parties, on_done, addrs); + auto on_resolution_done = [request](grpc_error* error) { + request->OnDNSLookupDoneLocked(error); + }; + request->work_serializer_->Run( + [request, on_resolution_done]() { + request->ares_request_ = LookupAresLocked( + "" /* dns_server */, request->name_, request->default_port_, + request->interested_parties_, on_resolution_done, + &request->addresses_, nullptr /* balancer_addresses */, + nullptr /* service_config_json */, + GRPC_DNS_ARES_DEFAULT_QUERY_TIMEOUT_MS, + request->work_serializer_); + }, + DEBUG_LOCATION); } - // Look up name using c-ares lib. 
- grpc_dns_lookup_ares_continue_after_check_localhost_and_ip_literals_locked( - r, dns_server, name, default_port, interested_parties, query_timeout_ms, - std::move(work_serializer)); - return r; -} -grpc_ares_request* (*grpc_dns_lookup_ares_locked)( - const char* dns_server, const char* name, const char* default_port, - grpc_pollset_set* interested_parties, grpc_closure* on_done, - std::unique_ptr* addrs, - std::unique_ptr* balancer_addrs, - char** service_config_json, int query_timeout_ms, - std::shared_ptr work_serializer) = - grpc_dns_lookup_ares_locked_impl; - -static void grpc_cancel_ares_request_locked_impl(grpc_ares_request* r) { - GPR_ASSERT(r != nullptr); - if (r->ev_driver != nullptr) { - grpc_ares_ev_driver_shutdown_locked(r->ev_driver); + private: + explicit GrpcResolveAddressAresRequest(const char* name, + const char* default_port, + grpc_pollset_set* interested_parties, + grpc_closure* on_done, + grpc_resolved_addresses** addrs_out) + : name_(name), + default_port_(default_port), + interested_parties_(interested_parties), + on_resolve_address_done_(on_done), + addrs_out_(addrs_out) {} + + void OnDNSLookupDoneLocked(grpc_error* error) { + grpc_resolved_addresses** resolved_addresses = addrs_out_; + if (addresses_ == nullptr || addresses_->empty()) { + *resolved_addresses = nullptr; + } else { + *resolved_addresses = static_cast( + gpr_zalloc(sizeof(grpc_resolved_addresses))); + (*resolved_addresses)->naddrs = addresses_->size(); + (*resolved_addresses)->addrs = + static_cast(gpr_zalloc( + sizeof(grpc_resolved_address) * (*resolved_addresses)->naddrs)); + for (size_t i = 0; i < (*resolved_addresses)->naddrs; ++i) { + memcpy(&(*resolved_addresses)->addrs[i], &(*addresses_)[i].address(), + sizeof(grpc_resolved_address)); + } + } + ExecCtx::Run(DEBUG_LOCATION, on_resolve_address_done_, error); + delete this; + } + + // work_serializer that queries and related callbacks run under + std::shared_ptr work_serializer_ = + std::make_shared(); + // target name + 
const char* name_; + // default port to use if none is specified + const char* default_port_; + // pollset_set to be driven by + grpc_pollset_set* interested_parties_; + // closure to call when the resolve_address_ares request completes + grpc_closure* on_resolve_address_done_; + // the pointer to receive the resolved addresses + grpc_resolved_addresses** addrs_out_; + // currently resolving addresses + std::unique_ptr addresses_; + // underlying ares_request that the query is performed on + OrphanablePtr ares_request_; +}; + +} // namespace + +void (*ResolveAddressAres)(const char* name, const char* default_port, + grpc_pollset_set* interested_parties, + grpc_closure* on_done, + grpc_resolved_addresses** addrs) = + GrpcResolveAddressAresRequest::GrpcResolveAddressAresImpl; + +OrphanablePtr (*LookupAresLocked)( + absl::string_view dns_server, absl::string_view name, + absl::string_view default_port, grpc_pollset_set* interested_parties, + std::function on_done, + std::unique_ptr* addrs, + std::unique_ptr* balancer_addrs, + absl::optional* service_config_json, int query_timeout_ms, + std::shared_ptr work_serializer) = AresRequest::Create; + +namespace { + +void LogAddressSortingList(const AresRequest* request, + const ServerAddressList& addresses, + const char* input_output_str) { + for (size_t i = 0; i < addresses.size(); i++) { + std::string addr_str = + grpc_sockaddr_to_string(&addresses[i].address(), true); + gpr_log(GPR_INFO, + "(c-ares resolver) request:%p c-ares address sorting: %s[%" PRIuPTR + "]=%s", + request, input_output_str, i, addr_str.c_str()); } } -void (*grpc_cancel_ares_request_locked)(grpc_ares_request* r) = - grpc_cancel_ares_request_locked_impl; +} // namespace -// ares_library_init and ares_library_cleanup are currently no-op except under -// Windows. Calling them may cause race conditions when other parts of the -// binary calls these functions concurrently. 
-#ifdef GPR_WINDOWS -grpc_error* grpc_ares_init(void) { - int status = ares_library_init(ARES_LIB_INIT_ALL); - if (status != ARES_SUCCESS) { - return GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat("ares_library_init failed: ", ares_strerror(status)) - .c_str()); +void AddressSortingSort(const AresRequest* request, + ServerAddressList* addresses, + const std::string& logging_prefix) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_cares_address_sorting)) { + LogAddressSortingList(request, *addresses, + absl::StrCat(logging_prefix, "-input").c_str()); + } + std::vector sortables; + sortables.resize(addresses->size()); + for (size_t i = 0; i < addresses->size(); ++i) { + sortables[i].user_data = &(*addresses)[i]; + memcpy(&sortables[i].dest_addr.addr, &(*addresses)[i].address().addr, + (*addresses)[i].address().len); + sortables[i].dest_addr.len = (*addresses)[i].address().len; + } + address_sorting_rfc_6724_sort(sortables.data(), addresses->size()); + ServerAddressList sorted; + sorted.reserve(addresses->size()); + for (size_t i = 0; i < addresses->size(); ++i) { + sorted.emplace_back(*static_cast(sortables[i].user_data)); + } + *addresses = std::move(sorted); + if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_cares_address_sorting)) { + LogAddressSortingList(request, *addresses, + absl::StrCat(logging_prefix, "-output").c_str()); } - return GRPC_ERROR_NONE; } -void grpc_ares_cleanup(void) { ares_library_cleanup(); } -#else -grpc_error* grpc_ares_init(void) { return GRPC_ERROR_NONE; } -void grpc_ares_cleanup(void) {} -#endif // GPR_WINDOWS +namespace internal { -/* - * grpc_resolve_address_ares related structs and functions - */ +namespace { -typedef struct grpc_resolve_address_ares_request { - /* work_serializer that queries and related callbacks run under */ - std::shared_ptr work_serializer; - /** the pointer to receive the resolved addresses */ - grpc_resolved_addresses** addrs_out; - /** currently resolving addresses */ - std::unique_ptr addresses; - /** closure to call 
when the resolve_address_ares request completes */ - grpc_closure* on_resolve_address_done; - /** a closure wrapping on_resolve_address_done, which should be invoked when - the grpc_dns_lookup_ares_locked operation is done. */ - grpc_closure on_dns_lookup_done_locked; - /* target name */ - const char* name; - /* default port to use if none is specified */ - const char* default_port; - /* pollset_set to be driven by */ - grpc_pollset_set* interested_parties; - /* underlying ares_request that the query is performed on */ - grpc_ares_request* ares_request = nullptr; -} grpc_resolve_address_ares_request; - -static void on_dns_lookup_done_locked(grpc_resolve_address_ares_request* r, - grpc_error* error) { - gpr_free(r->ares_request); - grpc_resolved_addresses** resolved_addresses = r->addrs_out; - if (r->addresses == nullptr || r->addresses->empty()) { - *resolved_addresses = nullptr; - } else { - *resolved_addresses = static_cast( - gpr_zalloc(sizeof(grpc_resolved_addresses))); - (*resolved_addresses)->naddrs = r->addresses->size(); - (*resolved_addresses)->addrs = - static_cast(gpr_zalloc( - sizeof(grpc_resolved_address) * (*resolved_addresses)->naddrs)); - for (size_t i = 0; i < (*resolved_addresses)->naddrs; ++i) { - memcpy(&(*resolved_addresses)->addrs[i], &(*r->addresses)[i].address(), - sizeof(grpc_resolved_address)); - } - } - grpc_core::ExecCtx::Run(DEBUG_LOCATION, r->on_resolve_address_done, error); - delete r; -} +void NoopInjectChannelConfig(ares_channel /*channel*/) {} -static void on_dns_lookup_done(void* arg, grpc_error* error) { - grpc_resolve_address_ares_request* r = - static_cast(arg); - GRPC_ERROR_REF(error); // ref owned by lambda - r->work_serializer->Run([r, error]() { on_dns_lookup_done_locked(r, error); }, - DEBUG_LOCATION); -} +} // namespace -static void grpc_resolve_address_invoke_dns_lookup_ares_locked(void* arg) { - grpc_resolve_address_ares_request* r = - static_cast(arg); - GRPC_CLOSURE_INIT(&r->on_dns_lookup_done_locked, 
on_dns_lookup_done, r, - grpc_schedule_on_exec_ctx); - r->ares_request = grpc_dns_lookup_ares_locked( - nullptr /* dns_server */, r->name, r->default_port, r->interested_parties, - &r->on_dns_lookup_done_locked, &r->addresses, - nullptr /* balancer_addresses */, nullptr /* service_config_json */, - GRPC_DNS_ARES_DEFAULT_QUERY_TIMEOUT_MS, r->work_serializer); -} +void (*AresTestOnlyInjectConfig)(ares_channel channel) = + NoopInjectChannelConfig; -static void grpc_resolve_address_ares_impl(const char* name, - const char* default_port, - grpc_pollset_set* interested_parties, - grpc_closure* on_done, - grpc_resolved_addresses** addrs) { - grpc_resolve_address_ares_request* r = - new grpc_resolve_address_ares_request(); - r->work_serializer = std::make_shared(); - r->addrs_out = addrs; - r->on_resolve_address_done = on_done; - r->name = name; - r->default_port = default_port; - r->interested_parties = interested_parties; - r->work_serializer->Run( - [r]() { grpc_resolve_address_invoke_dns_lookup_ares_locked(r); }, - DEBUG_LOCATION); -} +} // namespace internal -void (*grpc_resolve_address_ares)( - const char* name, const char* default_port, - grpc_pollset_set* interested_parties, grpc_closure* on_done, - grpc_resolved_addresses** addrs) = grpc_resolve_address_ares_impl; +} // namespace grpc_core #endif /* GRPC_ARES == 1 */ diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h index 6c29bb68a47..db9bde12ae6 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h @@ -23,10 +23,16 @@ #include +#include "absl/strings/str_cat.h" +#include "absl/types/optional.h" + +#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h" #include "src/core/ext/filters/client_channel/server_address.h" +#include 
"src/core/lib/gprpp/orphanable.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/iomgr/resolve_address.h" +#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/work_serializer.h" #define GRPC_DNS_ARES_DEFAULT_QUERY_TIMEOUT_MS 120000 @@ -42,61 +48,284 @@ extern grpc_core::TraceFlag grpc_trace_cares_resolver; } \ } while (0) -typedef struct grpc_ares_request grpc_ares_request; - -/* Asynchronously resolve \a name. Use \a default_port if a port isn't - designated in \a name, otherwise use the port in \a name. grpc_ares_init() - must be called at least once before this function. \a on_done may be - called directly in this function without being scheduled with \a exec_ctx, - so it must not try to acquire locks that are being held by the caller. */ -extern void (*grpc_resolve_address_ares)(const char* name, - const char* default_port, - grpc_pollset_set* interested_parties, - grpc_closure* on_done, - grpc_resolved_addresses** addresses); - -/* Asynchronously resolve \a name. It will try to resolve grpclb SRV records in - addition to the normal address records. For normal address records, it uses - \a default_port if a port isn't designated in \a name, otherwise it uses the - port in \a name. grpc_ares_init() must be called at least once before this - function. \a on_done may be called directly in this function without being - scheduled with \a exec_ctx, so it must not try to acquire locks that are - being held by the caller. The returned grpc_ares_request object is owned - by the caller and it is safe to free after on_done is called back. */ -extern grpc_ares_request* (*grpc_dns_lookup_ares_locked)( - const char* dns_server, const char* name, const char* default_port, - grpc_pollset_set* interested_parties, grpc_closure* on_done, +namespace grpc_core { + +/// An AresRequest is a handle over a complete name resolution process +/// (A queries, AAAA queries, etc.). 
An AresRequest is created with a call +/// to LookupAresLocked, and the name resolution process begins when +/// it's created. The name resolution process can be terminated abruptly +/// by invoking \a Orphan. The \a interested_parties parameter must remain +/// alive until either \a Orphan is invoked, or \a on_done is called, +/// whichever happens first. +class AresRequest final : public InternallyRefCounted { + public: + static OrphanablePtr Create( + absl::string_view dns_server, absl::string_view name, + absl::string_view default_port, grpc_pollset_set* interested_parties, + std::function on_done, + std::unique_ptr* addrs, + std::unique_ptr* balancer_addrs, + absl::optional* service_config_json, int query_timeout_ms, + std::shared_ptr work_serializer); + + /// Callers should only create an AresRequest via \a LookupAresLocked, this + /// ctor is made public only to help the factory method. + AresRequest( + std::unique_ptr* addresses_out, + std::unique_ptr* balancer_addresses_out, + absl::optional* service_config_json_out, + grpc_pollset_set* pollset_set, int query_timeout_ms, + std::function on_done, + std::shared_ptr work_serializer); + + ~AresRequest() final; + + /// Unref and Cancel the pending request if it's still in flight. Must be + /// called while holding the WorkSerializer that was used to call \a + /// LookupAresLocked. + void Orphan() override; + + /// Initialize the gRPC ares wrapper. Must be called at least once before + /// ResolveAddressAres(). + static grpc_error* Init(void); + + /// Uninitializes the gRPC ares wrapper. If there was more than one previous + /// call to Init(), this function uninitializes the gRPC ares + /// wrapper only if it has been called the same number of times as + /// Init(). + static void Shutdown(void); + + private: + /// Tracks state needed to perform one A or AAAA lookup with the c-ares lib. + /// Note that \a Create both constructs an AddressQuery object and arranges + /// for its deletion. 
+ class AddressQuery final { + public: + static void Create(AresRequest* request, const std::string& host, + uint16_t port, bool is_balancer, int address_family); + ~AddressQuery(); + + private: + AddressQuery(AresRequest* request, const std::string& host, uint16_t port, + bool is_balancer, int address_family); + static void OnHostByNameDoneLocked(void* arg, int status, int timeouts, + struct hostent* hostent); + + // the request which spawned this query + AresRequest* request_; + // host to resolve + const std::string host_; + // port to use in resulting socket addresses, in network byte order + const uint16_t port_; + // is it a grpclb address + const bool is_balancer_; + // for logging and errors: the query type ("A" or "AAAA") + const char* qtype_; + // the address family (AF_INET or AF_INET6) + const int address_family_; + }; + + /// Tracks state needed to perform one SRV lookup with the c-ares lib. + /// Note that \a Create both constructs an SRVQuery object and arranges + /// for its deletion. + class SRVQuery final { + public: + static void Create(AresRequest* request); + ~SRVQuery(); + + private: + explicit SRVQuery(AresRequest* request); + + static void OnSRVQueryDoneLocked(void* arg, int status, int timeouts, + unsigned char* abuf, int alen); + + // the request which spawned this query + AresRequest* request_; + }; + + /// Tracks state needed to perform one TXT lookup with the c-ares lib. + /// Note that \a Create both constructs a TXTQuery object and arranges + /// for its deletion. + class TXTQuery final { + public: + static void Create(AresRequest* request); + ~TXTQuery(); + + private: + explicit TXTQuery(AresRequest* request); + + static void OnTXTDoneLocked(void* arg, int status, int timeouts, + unsigned char* buf, int len); + + // the request which spawned this query + AresRequest* request_; + }; + + // An FdNode tracks an fd and its relevant state for polling it as + // needed to carry out a c-ares resolution. 
+ class FdNode final { + public: + FdNode(RefCountedPtr request, + std::unique_ptr grpc_polled_fd); + + ~FdNode(); + + void MaybeRegisterForOnReadableLocked(); + + void MaybeRegisterForOnWritableLocked(); + + void MaybeShutdownLocked(absl::string_view reason); + + bool IsActiveLocked(); + + bool shutdown() { return shutdown_; } + + GrpcPolledFd* grpc_polled_fd() { return grpc_polled_fd_.get(); } + + private: + void OnReadableLocked(grpc_error* error); + + static void OnReadable(void* arg, grpc_error* error); + + void OnWritableLocked(grpc_error* error); + + static void OnWritable(void* arg, grpc_error* error); + + RefCountedPtr request_; + // a closure wrapping OnReadableLocked, which should be + // invoked when the fd in this node becomes readable. + grpc_closure read_closure_; + // a closure wrapping OnWritableLocked, which should be + // invoked when the fd in this node becomes writable. + grpc_closure write_closure_; + // wrapped fd that's polled by grpc's poller for the current platform + std::unique_ptr grpc_polled_fd_; + // if the readable closure has been registered + bool readable_registered_ = false; + // if the writable closure has been registered + bool writable_registered_ = false; + // if the fd has been shutdown yet from grpc iomgr perspective + bool shutdown_ = false; + }; + + void ShutdownIOLocked(absl::string_view reason); + + grpc_millis CalculateNextAresBackupPollAlarm() const; + + void OnTimeoutLocked(grpc_error* error); + + static void OnTimeout(void* arg, grpc_error* error); + + void OnAresBackupPollAlarmLocked(grpc_error* error); + + static void OnAresBackupPollAlarm(void* arg, grpc_error* error); + + void NotifyOnEventLocked(); + + void ContinueAfterCheckLocalhostAndIPLiteralsLocked( + absl::string_view dns_server); + + void DecrementPendingQueries(); + + void MaybeCallOnDoneLocked(); + + bool ResolveAsIPLiteralLocked(); + + bool MaybeResolveLocalHostManuallyLocked(); + + std::string srv_qname() const { + return 
absl::StrCat("_grpclb._tcp.", target_host_); + } + + std::string txt_qname() const { + return absl::StrCat("_grpc_config.", target_host_); + } + + // the host component of the service name to resolve + std::string target_host_; + // the numeric port number to access the service on, stored in + // network byte order + uint16_t target_port_ = 0; + // the pointer to receive the resolved addresses + std::unique_ptr* addresses_out_; + // the pointer to receive the resolved balancer addresses + std::unique_ptr* balancer_addresses_out_; + // the pointer to receive the service config in JSON + absl::optional* service_config_json_out_; + // the ares_channel owned by this request + ares_channel channel_ = nullptr; + // pollset set for driving the IO events of the channel + grpc_pollset_set* pollset_set_; + // work_serializer to synchronize c-ares and I/O callbacks on + std::shared_ptr work_serializer_; + // Number of active DNS queries (one for A, another for AAAA, etc.). + int pending_queries_ = 0; + // the fds that this request is currently using. + std::map> fds_; + // is this request being shut down + bool shutting_down_ = false; + // Owned by the ev_driver. Creates new GrpcPolledFd's + std::unique_ptr polled_fd_factory_; + // query timeout in milliseconds + int query_timeout_ms_; + // alarm to cancel active queries + grpc_timer query_timeout_; + // cancels queries on a timeout + grpc_closure on_timeout_locked_; + // alarm to poll ares_process on in case fd events don't happen + grpc_timer ares_backup_poll_alarm_; + // polls ares_process on a periodic timer + grpc_closure on_ares_backup_poll_alarm_locked_; + // callback to schedule when the request completes, empty means that + // we've already scheduled the callback + std::function on_done_; + // the errors explaining query failures, appended to in query callbacks + grpc_error* error_ = GRPC_ERROR_NONE; +}; + +/// Asynchronously resolve \a name. 
Use \a default_port if a port isn't +/// designated in \a name, otherwise use the port in \a name. +/// AresRequest::Init() must be called at least once before this function. +extern void (*ResolveAddressAres)(const char* name, const char* default_port, + grpc_pollset_set* interested_parties, + grpc_closure* on_done, + grpc_resolved_addresses** addresses); + +/// Asynchronously resolve \a name. It will try to resolve grpclb SRV records in +/// addition to the normal address records if \a balancer_addresses is not +/// nullptr. For normal address records, it uses \a default_port if a port isn't +/// designated in \a name, otherwise it uses the port in \a name. AresRequest::Init() +/// must be called at least once before this function. The returned +/// AresRequest is safe to destroy after \a on_done is called back. +/// +/// TODO(apolcyn): as a part of moving to new gRPC DNS API, remove the +/// work_serializer parameter and synchronize internally instead. +extern OrphanablePtr (*LookupAresLocked)( + absl::string_view dns_server, absl::string_view name, + absl::string_view default_port, grpc_pollset_set* interested_parties, + std::function on_done, std::unique_ptr* addresses, std::unique_ptr* balancer_addresses, - char** service_config_json, int query_timeout_ms, + absl::optional* service_config_json, int query_timeout_ms, std::shared_ptr work_serializer); -/* Cancel the pending grpc_ares_request \a request */ -extern void (*grpc_cancel_ares_request_locked)(grpc_ares_request* request); - -/* Initialize gRPC ares wrapper. Must be called at least once before - grpc_resolve_address_ares(). */ -grpc_error* grpc_ares_init(void); +/// Indicates whether or not AAAA queries should be attempted. +/// E.g., return false if ipv6 is known to not be available. +bool AresQueryIPv6(); -/* Uninitialized gRPC ares wrapper. 
If there was more than one previous call to - grpc_ares_init(), this function uninitializes the gRPC ares wrapper only if - it has been called the same number of times as grpc_ares_init(). */ -void grpc_ares_cleanup(void); +/// Sorts destinations in \a addresses according to RFC 6724. +void AddressSortingSort(const AresRequest* request, + ServerAddressList* addresses, + const std::string& logging_prefix); -/** Schedules the desired callback for request completion - * and destroys the grpc_ares_request */ -void grpc_ares_complete_request_locked(grpc_ares_request* request); +namespace internal { -/* Indicates whether or not AAAA queries should be attempted. */ -/* E.g., return false if ipv6 is known to not be available. */ -bool grpc_ares_query_ipv6(); +/// Exposed in this header for C-core tests only +extern void (*AresTestOnlyInjectConfig)(ares_channel channel); -/* Sorts destinations in lb_addrs according to RFC 6724. */ -void grpc_cares_wrapper_address_sorting_sort( - const grpc_ares_request* request, grpc_core::ServerAddressList* addresses); +} // namespace internal -/* Exposed in this header for C-core tests only */ -extern void (*grpc_ares_test_only_inject_config)(ares_channel channel); +} // namespace grpc_core #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_C_ARES_GRPC_ARES_WRAPPER_H \ */ diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_libuv.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_libuv.cc index 1d4a90fbd90..326fabb4586 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_libuv.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_libuv.cc @@ -28,11 +28,15 @@ #include "src/core/lib/gpr/string.h" #include "src/core/lib/iomgr/parse_address.h" -bool grpc_ares_query_ipv6() { +namespace grpc_core { + +bool AresQueryIPv6() { /* The libuv grpc code currently does not have the code to probe for this, * so we assume 
for now that IPv6 is always available in contexts where this * code will be used. */ return true; } +} // namespace grpc_core + #endif /* GRPC_ARES == 1 && defined(GRPC_UV) */ diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_posix.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_posix.cc index 23c0fec74f3..f9648276553 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_posix.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_posix.cc @@ -24,6 +24,10 @@ #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h" #include "src/core/lib/iomgr/socket_utils_posix.h" -bool grpc_ares_query_ipv6() { return grpc_ipv6_loopback_available(); } +namespace grpc_core { + +bool AresQueryIPv6() { return grpc_ipv6_loopback_available(); } + +} // namespace grpc_core #endif /* GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER) */ diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_windows.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_windows.cc index df11db3624d..84d55841396 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_windows.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_windows.cc @@ -29,6 +29,10 @@ #include "src/core/lib/iomgr/parse_address.h" #include "src/core/lib/iomgr/socket_windows.h" -bool grpc_ares_query_ipv6() { return grpc_ipv6_loopback_available(); } +namespace grpc_core { + +bool AresQueryIPv6() { return grpc_ipv6_loopback_available(); } + +} // namespace grpc_core #endif /* GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER) */ diff --git a/test/core/client_channel/resolvers/dns_resolver_connectivity_test.cc b/test/core/client_channel/resolvers/dns_resolver_connectivity_test.cc index f79031dd55a..c6e07b3dff3 100644 --- 
a/test/core/client_channel/resolvers/dns_resolver_connectivity_test.cc +++ b/test/core/client_channel/resolvers/dns_resolver_connectivity_test.cc @@ -60,15 +60,19 @@ static void my_resolve_address(const char* addr, const char* /*default_port*/, static grpc_address_resolver_vtable test_resolver = {my_resolve_address, nullptr}; -static grpc_ares_request* my_dns_lookup_ares_locked( - const char* /*dns_server*/, const char* addr, const char* /*default_port*/, - grpc_pollset_set* /*interested_parties*/, grpc_closure* on_done, +static grpc_core::OrphanablePtr +my_dns_lookup_ares_locked( + absl::string_view /*dns_server*/, absl::string_view addr, + absl::string_view /*default_port*/, + grpc_pollset_set* /*interested_parties*/, + std::function on_done, std::unique_ptr* addresses, std::unique_ptr* /*balancer_addresses*/, - char** /*service_config_json*/, int /*query_timeout_ms*/, - std::shared_ptr /*combiner*/) { // NOLINT + absl::optional* /*service_config_json*/, + int /*query_timeout_ms*/, + std::shared_ptr work_serializer) { // NOLINT gpr_mu_lock(&g_mu); - GPR_ASSERT(0 == strcmp("test", addr)); + GPR_ASSERT("test" == addr); grpc_error* error = GRPC_ERROR_NONE; if (g_fail_resolution) { g_fail_resolution = false; @@ -82,14 +86,10 @@ static grpc_ares_request* my_dns_lookup_ares_locked( phony_resolved_address.len = 123; (*addresses)->emplace_back(phony_resolved_address, nullptr); } - grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_done, error); + work_serializer->Run([on_done, error]() { on_done(error); }, DEBUG_LOCATION); return nullptr; } -static void my_cancel_ares_request_locked(grpc_ares_request* request) { - GPR_ASSERT(request == nullptr); -} - static grpc_core::OrphanablePtr create_resolver( const char* name, std::unique_ptr result_handler) { @@ -166,8 +166,7 @@ int main(int argc, char** argv) { auto work_serializer = std::make_shared(); g_work_serializer = &work_serializer; grpc_set_resolver_impl(&test_resolver); - grpc_dns_lookup_ares_locked = my_dns_lookup_ares_locked; 
- grpc_cancel_ares_request_locked = my_cancel_ares_request_locked; + grpc_core::LookupAresLocked = my_dns_lookup_ares_locked; { grpc_core::ExecCtx exec_ctx; @@ -177,7 +176,8 @@ int main(int argc, char** argv) { std::unique_ptr(result_handler)); ResultHandler::ResolverOutput output1; result_handler->SetOutput(&output1); - resolver->StartLocked(); + grpc_core::Resolver* r = resolver.get(); + work_serializer->Run([r]() { r->StartLocked(); }, DEBUG_LOCATION); grpc_core::ExecCtx::Get()->Flush(); GPR_ASSERT(wait_loop(5, &output1.ev)); GPR_ASSERT(output1.result.addresses.empty()); diff --git a/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc b/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc index cfedceba0df..d9577097751 100644 --- a/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc +++ b/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc @@ -37,12 +37,14 @@ static grpc_address_resolver_vtable* default_resolve_address; static std::shared_ptr* g_work_serializer; -static grpc_ares_request* (*g_default_dns_lookup_ares_locked)( - const char* dns_server, const char* name, const char* default_port, - grpc_pollset_set* interested_parties, grpc_closure* on_done, +static grpc_core::OrphanablePtr ( + *g_default_dns_lookup_ares_locked)( + absl::string_view dns_server, absl::string_view name, + absl::string_view default_port, grpc_pollset_set* interested_parties, + std::function on_done, std::unique_ptr* addresses, std::unique_ptr* balancer_addresses, - char** service_config_json, int query_timeout_ms, + absl::optional* service_config_json, int query_timeout_ms, std::shared_ptr work_serializer); // Counter incremented by test_resolve_address_impl indicating the number of @@ -96,17 +98,20 @@ static grpc_error* test_blocking_resolve_address_impl( static grpc_address_resolver_vtable test_resolver = { test_resolve_address_impl, test_blocking_resolve_address_impl}; -static grpc_ares_request* test_dns_lookup_ares_locked( - const 
char* dns_server, const char* name, const char* default_port, - grpc_pollset_set* /*interested_parties*/, grpc_closure* on_done, +static grpc_core::OrphanablePtr +test_dns_lookup_ares_locked( + absl::string_view dns_server, absl::string_view name, + absl::string_view default_port, grpc_pollset_set* /*interested_parties*/, + std::function on_done, std::unique_ptr* addresses, std::unique_ptr* balancer_addresses, - char** service_config_json, int query_timeout_ms, + absl::optional* service_config_json, int query_timeout_ms, std::shared_ptr work_serializer) { - grpc_ares_request* result = g_default_dns_lookup_ares_locked( - dns_server, name, default_port, g_iomgr_args.pollset_set, on_done, - addresses, balancer_addresses, service_config_json, query_timeout_ms, - std::move(work_serializer)); + grpc_core::OrphanablePtr result = + g_default_dns_lookup_ares_locked( + dns_server, name, default_port, g_iomgr_args.pollset_set, on_done, + addresses, balancer_addresses, service_config_json, query_timeout_ms, + std::move(work_serializer)); ++g_resolution_count; static grpc_millis last_resolution_time = 0; grpc_millis now = @@ -340,8 +345,8 @@ int main(int argc, char** argv) { auto work_serializer = std::make_shared(); g_work_serializer = &work_serializer; - g_default_dns_lookup_ares_locked = grpc_dns_lookup_ares_locked; - grpc_dns_lookup_ares_locked = test_dns_lookup_ares_locked; + g_default_dns_lookup_ares_locked = grpc_core::LookupAresLocked; + grpc_core::LookupAresLocked = test_dns_lookup_ares_locked; default_resolve_address = grpc_resolve_address_impl; grpc_set_resolver_impl(&test_resolver); diff --git a/test/core/end2end/goaway_server_test.cc b/test/core/end2end/goaway_server_test.cc index d0ca2196e05..6ad3eb4af25 100644 --- a/test/core/end2end/goaway_server_test.cc +++ b/test/core/end2end/goaway_server_test.cc @@ -49,16 +49,16 @@ static void* tag(intptr_t t) { return reinterpret_cast(t); } static gpr_mu g_mu; static int g_resolve_port = -1; -static grpc_ares_request* 
(*iomgr_dns_lookup_ares_locked)( - const char* dns_server, const char* addr, const char* default_port, - grpc_pollset_set* interested_parties, grpc_closure* on_done, +static grpc_core::OrphanablePtr ( + *iomgr_dns_lookup_ares_locked)( + absl::string_view dns_server, absl::string_view addr, + absl::string_view default_port, grpc_pollset_set* interested_parties, + std::function on_done, std::unique_ptr* addresses, std::unique_ptr* balancer_addresses, - char** service_config_json, int query_timeout_ms, + absl::optional* service_config_json, int query_timeout_ms, std::shared_ptr combiner); -static void (*iomgr_cancel_ares_request_locked)(grpc_ares_request* request); - static void set_resolve_port(int port) { gpr_mu_lock(&g_mu); g_resolve_port = port; @@ -107,14 +107,16 @@ static grpc_error* my_blocking_resolve_address( static grpc_address_resolver_vtable test_resolver = { my_resolve_address, my_blocking_resolve_address}; -static grpc_ares_request* my_dns_lookup_ares_locked( - const char* dns_server, const char* addr, const char* default_port, - grpc_pollset_set* interested_parties, grpc_closure* on_done, +static grpc_core::OrphanablePtr +my_dns_lookup_ares_locked( + absl::string_view dns_server, absl::string_view addr, + absl::string_view default_port, grpc_pollset_set* interested_parties, + std::function on_done, std::unique_ptr* addresses, std::unique_ptr* balancer_addresses, - char** service_config_json, int query_timeout_ms, + absl::optional* service_config_json, int query_timeout_ms, std::shared_ptr work_serializer) { - if (0 != strcmp(addr, "test")) { + if (addr != "test") { return iomgr_dns_lookup_ares_locked( dns_server, addr, default_port, interested_parties, on_done, addresses, balancer_addresses, service_config_json, query_timeout_ms, @@ -135,16 +137,10 @@ static grpc_ares_request* my_dns_lookup_ares_locked( (*addresses)->emplace_back(&sa, sizeof(sa), nullptr); gpr_mu_unlock(&g_mu); } - grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_done, error); + 
work_serializer->Run([on_done, error] { on_done(error); }, DEBUG_LOCATION); return nullptr; } -static void my_cancel_ares_request_locked(grpc_ares_request* request) { - if (request != nullptr) { - iomgr_cancel_ares_request_locked(request); - } -} - int main(int argc, char** argv) { grpc_completion_queue* cq; cq_verifier* cqv; @@ -157,10 +153,8 @@ int main(int argc, char** argv) { grpc_init(); default_resolver = grpc_resolve_address_impl; grpc_set_resolver_impl(&test_resolver); - iomgr_dns_lookup_ares_locked = grpc_dns_lookup_ares_locked; - iomgr_cancel_ares_request_locked = grpc_cancel_ares_request_locked; - grpc_dns_lookup_ares_locked = my_dns_lookup_ares_locked; - grpc_cancel_ares_request_locked = my_cancel_ares_request_locked; + iomgr_dns_lookup_ares_locked = grpc_core::LookupAresLocked; + grpc_core::LookupAresLocked = my_dns_lookup_ares_locked; int was_cancelled1; int was_cancelled2; diff --git a/test/cpp/naming/address_sorting_test.cc b/test/cpp/naming/address_sorting_test.cc index edcee212b10..f1c98c03a07 100644 --- a/test/cpp/naming/address_sorting_test.cc +++ b/test/cpp/naming/address_sorting_test.cc @@ -205,7 +205,7 @@ TEST_F(AddressSortingTest, TestDepriotizesUnreachableAddresses) { {"1.2.3.4:443", AF_INET}, {"5.6.7.8:443", AF_INET}, }); - grpc_cares_wrapper_address_sorting_sort(nullptr, &lb_addrs); + grpc_core::AddressSortingSort(nullptr, &lb_addrs, "test-addresses"); VerifyLbAddrOutputs(lb_addrs, { "1.2.3.4:443", "5.6.7.8:443", @@ -224,7 +224,7 @@ TEST_F(AddressSortingTest, TestDepriotizesUnsupportedDomainIpv6) { {"[2607:f8b0:400a:801::1002]:443", AF_INET6}, {"1.2.3.4:443", AF_INET}, }); - grpc_cares_wrapper_address_sorting_sort(nullptr, &lb_addrs); + grpc_core::AddressSortingSort(nullptr, &lb_addrs, "test-addresses"); VerifyLbAddrOutputs(lb_addrs, { "1.2.3.4:443", "[2607:f8b0:400a:801::1002]:443", @@ -244,7 +244,7 @@ TEST_F(AddressSortingTest, TestDepriotizesUnsupportedDomainIpv4) { {"[2607:f8b0:400a:801::1002]:443", AF_INET6}, {"1.2.3.4:443", 
AF_INET}, }); - grpc_cares_wrapper_address_sorting_sort(nullptr, &lb_addrs); + grpc_core::AddressSortingSort(nullptr, &lb_addrs, "test-addresses"); VerifyLbAddrOutputs(lb_addrs, { "[2607:f8b0:400a:801::1002]:443", "1.2.3.4:443", @@ -268,7 +268,7 @@ TEST_F(AddressSortingTest, TestDepriotizesNonMatchingScope) { {"[2000:f8b0:400a:801::1002]:443", AF_INET6}, {"[fec0::5000]:443", AF_INET6}, }); - grpc_cares_wrapper_address_sorting_sort(nullptr, &lb_addrs); + grpc_core::AddressSortingSort(nullptr, &lb_addrs, "test-addresses"); VerifyLbAddrOutputs(lb_addrs, { "[fec0::5000]:443", "[2000:f8b0:400a:801::1002]:443", @@ -291,7 +291,7 @@ TEST_F(AddressSortingTest, TestUsesLabelFromDefaultTable) { {"[2002::5001]:443", AF_INET6}, {"[2001::5001]:443", AF_INET6}, }); - grpc_cares_wrapper_address_sorting_sort(nullptr, &lb_addrs); + grpc_core::AddressSortingSort(nullptr, &lb_addrs, "test-addresses"); VerifyLbAddrOutputs(lb_addrs, { "[2001::5001]:443", "[2002::5001]:443", @@ -314,7 +314,7 @@ TEST_F(AddressSortingTest, TestUsesLabelFromDefaultTableInputFlipped) { {"[2001::5001]:443", AF_INET6}, {"[2002::5001]:443", AF_INET6}, }); - grpc_cares_wrapper_address_sorting_sort(nullptr, &lb_addrs); + grpc_core::AddressSortingSort(nullptr, &lb_addrs, "test-addresses"); VerifyLbAddrOutputs(lb_addrs, { "[2001::5001]:443", "[2002::5001]:443", @@ -337,7 +337,7 @@ TEST_F(AddressSortingTest, {"[3ffe::5001]:443", AF_INET6}, {"1.2.3.4:443", AF_INET}, }); - grpc_cares_wrapper_address_sorting_sort(nullptr, &lb_addrs); + grpc_core::AddressSortingSort(nullptr, &lb_addrs, "test-addresses"); VerifyLbAddrOutputs( lb_addrs, { // The AF_INET address should be IPv4-mapped by the sort, @@ -370,7 +370,7 @@ TEST_F(AddressSortingTest, {v4_compat_dest, AF_INET6}, {"[::1]:443", AF_INET6}, }); - grpc_cares_wrapper_address_sorting_sort(nullptr, &lb_addrs); + grpc_core::AddressSortingSort(nullptr, &lb_addrs, "test-addresses"); VerifyLbAddrOutputs(lb_addrs, { "[::1]:443", v4_compat_dest, @@ -393,7 +393,7 @@ 
TEST_F(AddressSortingTest, {"[1234::2]:443", AF_INET6}, {"[::1]:443", AF_INET6}, }); - grpc_cares_wrapper_address_sorting_sort(nullptr, &lb_addrs); + grpc_core::AddressSortingSort(nullptr, &lb_addrs, "test-addresses"); VerifyLbAddrOutputs( lb_addrs, { @@ -417,7 +417,7 @@ TEST_F(AddressSortingTest, {"[2001::1234]:443", AF_INET6}, {"[2000::5001]:443", AF_INET6}, }); - grpc_cares_wrapper_address_sorting_sort(nullptr, &lb_addrs); + grpc_core::AddressSortingSort(nullptr, &lb_addrs, "test-addresses"); VerifyLbAddrOutputs( lb_addrs, { // The 2000::/16 address should match the ::/0 prefix rule @@ -441,7 +441,7 @@ TEST_F( {"[2001::1231]:443", AF_INET6}, {"[2000::5001]:443", AF_INET6}, }); - grpc_cares_wrapper_address_sorting_sort(nullptr, &lb_addrs); + grpc_core::AddressSortingSort(nullptr, &lb_addrs, "test-addresses"); VerifyLbAddrOutputs(lb_addrs, { "[2000::5001]:443", "[2001::1231]:443", @@ -462,7 +462,7 @@ TEST_F(AddressSortingTest, {"[fec0::1234]:443", AF_INET6}, {"[fc00::5001]:443", AF_INET6}, }); - grpc_cares_wrapper_address_sorting_sort(nullptr, &lb_addrs); + grpc_core::AddressSortingSort(nullptr, &lb_addrs, "test-addresses"); VerifyLbAddrOutputs(lb_addrs, { "[fc00::5001]:443", "[fec0::1234]:443", @@ -487,7 +487,7 @@ TEST_F( {"[::ffff:1.1.1.2]:443", AF_INET6}, {"[1234::2]:443", AF_INET6}, }); - grpc_cares_wrapper_address_sorting_sort(nullptr, &lb_addrs); + grpc_core::AddressSortingSort(nullptr, &lb_addrs, "test-addresses"); VerifyLbAddrOutputs(lb_addrs, { // ::ffff:0:2 should match the v4-mapped // precedence entry and be deprioritized. 
@@ -514,7 +514,7 @@ TEST_F(AddressSortingTest, TestPrefersSmallerScope) { {"[3ffe::5001]:443", AF_INET6}, {"[fec0::1234]:443", AF_INET6}, }); - grpc_cares_wrapper_address_sorting_sort(nullptr, &lb_addrs); + grpc_core::AddressSortingSort(nullptr, &lb_addrs, "test-addresses"); VerifyLbAddrOutputs(lb_addrs, { "[fec0::1234]:443", "[3ffe::5001]:443", @@ -539,7 +539,7 @@ TEST_F(AddressSortingTest, TestPrefersLongestMatchingSrcDstPrefix) { {"[3ffe:5001::]:443", AF_INET6}, {"[3ffe:1234::]:443", AF_INET6}, }); - grpc_cares_wrapper_address_sorting_sort(nullptr, &lb_addrs); + grpc_core::AddressSortingSort(nullptr, &lb_addrs, "test-addresses"); VerifyLbAddrOutputs(lb_addrs, { "[3ffe:1234::]:443", "[3ffe:5001::]:443", @@ -560,7 +560,7 @@ TEST_F(AddressSortingTest, {"[3ffe::5001]:443", AF_INET6}, {"[3ffe::1234]:443", AF_INET6}, }); - grpc_cares_wrapper_address_sorting_sort(nullptr, &lb_addrs); + grpc_core::AddressSortingSort(nullptr, &lb_addrs, "test-addresses"); VerifyLbAddrOutputs(lb_addrs, { "[3ffe::1234]:443", "[3ffe::5001]:443", @@ -580,7 +580,7 @@ TEST_F(AddressSortingTest, TestPrefersLongestPrefixStressInnerBytePrefix) { {"[3ffe:8000::]:443", AF_INET6}, {"[3ffe:2000::]:443", AF_INET6}, }); - grpc_cares_wrapper_address_sorting_sort(nullptr, &lb_addrs); + grpc_core::AddressSortingSort(nullptr, &lb_addrs, "test-addresses"); VerifyLbAddrOutputs(lb_addrs, { "[3ffe:2000::]:443", "[3ffe:8000::]:443", @@ -600,7 +600,7 @@ TEST_F(AddressSortingTest, TestPrefersLongestPrefixDiffersOnHighestBitOfByte) { {"[3ffe:6::]:443", AF_INET6}, {"[3ffe:c::]:443", AF_INET6}, }); - grpc_cares_wrapper_address_sorting_sort(nullptr, &lb_addrs); + grpc_core::AddressSortingSort(nullptr, &lb_addrs, "test-addresses"); VerifyLbAddrOutputs(lb_addrs, { "[3ffe:c::]:443", "[3ffe:6::]:443", @@ -622,7 +622,7 @@ TEST_F(AddressSortingTest, TestPrefersLongestPrefixDiffersByLastBit) { {"[3ffe:1111:1111:1110::]:443", AF_INET6}, {"[3ffe:1111:1111:1111::]:443", AF_INET6}, }); - 
grpc_cares_wrapper_address_sorting_sort(nullptr, &lb_addrs); + grpc_core::AddressSortingSort(nullptr, &lb_addrs, "test-addresses"); VerifyLbAddrOutputs(lb_addrs, { "[3ffe:1111:1111:1111::]:443", "[3ffe:1111:1111:1110::]:443", @@ -644,7 +644,7 @@ TEST_F(AddressSortingTest, TestStableSort) { {"[3ffe::1234]:443", AF_INET6}, {"[3ffe::1235]:443", AF_INET6}, }); - grpc_cares_wrapper_address_sorting_sort(nullptr, &lb_addrs); + grpc_core::AddressSortingSort(nullptr, &lb_addrs, "test-addresses"); VerifyLbAddrOutputs(lb_addrs, { "[3ffe::1234]:443", "[3ffe::1235]:443", @@ -670,7 +670,7 @@ TEST_F(AddressSortingTest, TestStableSortFiveElements) { {"[3ffe::1234]:443", AF_INET6}, {"[3ffe::1235]:443", AF_INET6}, }); - grpc_cares_wrapper_address_sorting_sort(nullptr, &lb_addrs); + grpc_core::AddressSortingSort(nullptr, &lb_addrs, "test-addresses"); VerifyLbAddrOutputs(lb_addrs, { "[3ffe::1231]:443", "[3ffe::1232]:443", @@ -691,7 +691,7 @@ TEST_F(AddressSortingTest, TestStableSortNoSrcAddrsExist) { {"[3ffe::1234]:443", AF_INET6}, {"[3ffe::1235]:443", AF_INET6}, }); - grpc_cares_wrapper_address_sorting_sort(nullptr, &lb_addrs); + grpc_core::AddressSortingSort(nullptr, &lb_addrs, "test-addresses"); VerifyLbAddrOutputs(lb_addrs, { "[3ffe::1231]:443", "[3ffe::1232]:443", @@ -709,7 +709,7 @@ TEST_F(AddressSortingTest, TestStableSortNoSrcAddrsExistWithIpv4) { {"[::ffff:5.6.7.8]:443", AF_INET6}, {"1.2.3.4:443", AF_INET}, }); - grpc_cares_wrapper_address_sorting_sort(nullptr, &lb_addrs); + grpc_core::AddressSortingSort(nullptr, &lb_addrs, "test-addresses"); VerifyLbAddrOutputs(lb_addrs, { "[::ffff:5.6.7.8]:443", "1.2.3.4:443", @@ -737,7 +737,7 @@ TEST_F(AddressSortingTest, TestStableSortV4CompatAndSiteLocalAddresses) { {"[fec0::2000]:443", AF_INET6}, {v4_compat_dest, AF_INET6}, }); - grpc_cares_wrapper_address_sorting_sort(nullptr, &lb_addrs); + grpc_core::AddressSortingSort(nullptr, &lb_addrs, "test-addresses"); VerifyLbAddrOutputs(lb_addrs, { // The sort should be stable since @@ -758,7 
+758,7 @@ TEST_F(AddressSortingTest, TestPrefersIpv6Loopback) { {"[::1]:443", AF_INET6}, {"127.0.0.1:443", AF_INET}, }); - grpc_cares_wrapper_address_sorting_sort(nullptr, &lb_addrs); + grpc_core::AddressSortingSort(nullptr, &lb_addrs, "test-addresses"); VerifyLbAddrOutputs(lb_addrs, { "[::1]:443", "127.0.0.1:443", @@ -772,7 +772,7 @@ TEST_F(AddressSortingTest, TestPrefersIpv6LoopbackInputsFlipped) { {"127.0.0.1:443", AF_INET}, {"[::1]:443", AF_INET6}, }); - grpc_cares_wrapper_address_sorting_sort(nullptr, &lb_addrs); + grpc_core::AddressSortingSort(nullptr, &lb_addrs, "test-addresses"); VerifyLbAddrOutputs(lb_addrs, { "[::1]:443", "127.0.0.1:443", diff --git a/test/cpp/naming/cancel_ares_query_test.cc b/test/cpp/naming/cancel_ares_query_test.cc index 371e3e5eb4e..2e727d4575b 100644 --- a/test/cpp/naming/cancel_ares_query_test.cc +++ b/test/cpp/naming/cancel_ares_query_test.cc @@ -35,6 +35,7 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/debug/stats.h" #include "src/core/lib/gpr/string.h" +#include "src/core/lib/gprpp/global_config_generic.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/thd.h" #include "src/core/lib/iomgr/pollset.h" @@ -55,6 +56,8 @@ #define BAD_SOCKET_RETURN_VAL (-1) #endif +GPR_GLOBAL_CONFIG_DECLARE_BOOL(grpc_abort_on_leaks); + namespace { void* Tag(intptr_t t) { return reinterpret_cast(t); } @@ -405,6 +408,10 @@ TEST_F( int main(int argc, char** argv) { grpc::testing::TestEnvironment env(argc, argv); + // see notes in + // https://github.com/grpc/grpc/pull/25108#pullrequestreview-577881514 for + // motivation. 
+ GPR_GLOBAL_CONFIG_SET(grpc_abort_on_leaks, true); ::testing::InitGoogleTest(&argc, argv); auto result = RUN_ALL_TESTS(); return result; diff --git a/test/cpp/naming/resolver_component_test.cc b/test/cpp/naming/resolver_component_test.cc index 1fdc38d7a28..aa558d8065b 100644 --- a/test/cpp/naming/resolver_component_test.cc +++ b/test/cpp/naming/resolver_component_test.cc @@ -44,6 +44,7 @@ #include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gpr/string.h" +#include "src/core/lib/gprpp/global_config_generic.h" #include "src/core/lib/gprpp/host_port.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/iomgr/executor.h" @@ -126,6 +127,8 @@ ABSL_FLAG(std::string, expected_lb_policy, "", "Expected lb policy name that appears in resolver result channel " "arg. Empty for none."); +GPR_GLOBAL_CONFIG_DECLARE_BOOL(grpc_abort_on_leaks); + namespace { class GrpcLBAddress final { @@ -584,7 +587,7 @@ void RunResolvesRelevantRecordsTest( absl::make_unique( g_fake_non_responsive_dns_server_port); - grpc_ares_test_only_inject_config = InjectBrokenNameServerList; + grpc_core::internal::AresTestOnlyInjectConfig = InjectBrokenNameServerList; whole_uri = absl::StrCat("dns:///", absl::GetFlag(FLAGS_target_name)); } else if (absl::GetFlag(FLAGS_inject_broken_nameserver_list) == "False") { gpr_log(GPR_INFO, "Specifying authority in uris to: %s", @@ -668,6 +671,10 @@ TEST(ResolverComponentTest, TestResolvesRelevantRecordsWithConcurrentFdStress) { int main(int argc, char** argv) { grpc_init(); grpc::testing::TestEnvironment env(argc, argv); + // see notes in + // https://github.com/grpc/grpc/pull/25108#pullrequestreview-577881514 for + // motivation. + GPR_GLOBAL_CONFIG_SET(grpc_abort_on_leaks, true); ::testing::InitGoogleTest(&argc, argv); grpc::testing::InitTest(&argc, &argv, true); if (absl::GetFlag(FLAGS_target_name).empty()) {