From b972b76816641337d0026d3ec0041b6aeeae7829 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 9 Dec 2021 08:26:53 -0800 Subject: [PATCH] Revert "Replace work serializer with a mutex in c-ares resolver (#27858)" (#28324) This reverts commit ec600f3973a4540bbd90d63bb7301b9ae0616cff. --- .../resolver/dns/c_ares/dns_resolver_ares.cc | 6 +- .../resolver/dns/c_ares/grpc_ares_ev_driver.h | 34 +-- .../dns/c_ares/grpc_ares_ev_driver_posix.cc | 34 +-- .../dns/c_ares/grpc_ares_ev_driver_windows.cc | 61 ++-- .../resolver/dns/c_ares/grpc_ares_wrapper.cc | 271 ++++++++++-------- .../resolver/dns/c_ares/grpc_ares_wrapper.h | 32 +-- .../dns_resolver_connectivity_test.cc | 11 +- .../resolvers/dns_resolver_cooldown_test.cc | 19 +- test/core/end2end/fuzzers/api_fuzzer.cc | 11 +- test/core/end2end/goaway_server_test.cc | 29 +- 10 files changed, 267 insertions(+), 241 deletions(-) diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc index 0a51c586ede..6df35cbb964 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc @@ -186,7 +186,7 @@ void AresDnsResolver::ShutdownLocked() { grpc_timer_cancel(&next_resolution_timer_); } if (pending_request_ != nullptr) { - grpc_cancel_ares_request(pending_request_); + grpc_cancel_ares_request_locked(pending_request_); } } @@ -443,12 +443,12 @@ void AresDnsResolver::StartResolvingLocked() { GPR_ASSERT(!resolving_); resolving_ = true; service_config_json_ = nullptr; - pending_request_ = grpc_dns_lookup_ares( + pending_request_ = grpc_dns_lookup_ares_locked( dns_server_.c_str(), name_to_resolve_.c_str(), kDefaultSecurePort, interested_parties_, &on_resolved_, &addresses_, enable_srv_queries_ ? &balancer_addresses_ : nullptr, request_service_config_ ? &service_config_json_ : nullptr, - query_timeout_ms_); + query_timeout_ms_, work_serializer_); last_resolution_timestamp_ = ExecCtx::Get()->Now(); GRPC_CARES_TRACE_LOG("resolver:%p Started resolving. pending_request_:%p", this, pending_request_); diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h index a1374af6d4a..41b3ae0dba4 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h @@ -23,8 +23,8 @@ #include -#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h" #include "src/core/lib/iomgr/pollset_set.h" +#include "src/core/lib/iomgr/work_serializer.h" namespace grpc_core { @@ -36,23 +36,18 @@ class GrpcPolledFd { public: virtual ~GrpcPolledFd() {} /* Called when c-ares library is interested and there's no pending callback */ - virtual void RegisterForOnReadableLocked(grpc_closure* read_closure) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) = 0; + virtual void RegisterForOnReadableLocked(grpc_closure* read_closure) = 0; /* Called when c-ares library is interested and there's no pending callback */ - virtual void RegisterForOnWriteableLocked(grpc_closure* write_closure) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) = 0; + virtual void RegisterForOnWriteableLocked(grpc_closure* write_closure) = 0; /* Indicates if there is data left even after just being read from */ - virtual bool IsFdStillReadableLocked() - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) = 0; + virtual bool IsFdStillReadableLocked() = 0; /* Called once and only once. Must cause cancellation of any pending * read/write callbacks. */ - virtual void ShutdownLocked(grpc_error_handle error) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) = 0; + virtual void ShutdownLocked(grpc_error_handle error) = 0; /* Get the underlying ares_socket_t that this was created from */ - virtual ares_socket_t GetWrappedAresSocketLocked() - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) = 0; + virtual ares_socket_t GetWrappedAresSocketLocked() = 0; /* A unique name, for logging */ - virtual const char* GetName() const = 0; + virtual const char* GetName() = 0; }; /* A GrpcPolledFdFactory is 1-to-1 with and owned by the @@ -64,19 +59,14 @@ class GrpcPolledFdFactory { virtual ~GrpcPolledFdFactory() {} /* Creates a new wrapped fd for the current platform */ virtual GrpcPolledFd* NewGrpcPolledFdLocked( - ares_socket_t as, grpc_pollset_set* driver_pollset_set) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) = 0; + ares_socket_t as, grpc_pollset_set* driver_pollset_set, + std::shared_ptr work_serializer) = 0; /* Optionally configures the ares channel after creation */ - virtual void ConfigureAresChannelLocked(ares_channel channel) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) = 0; + virtual void ConfigureAresChannelLocked(ares_channel channel) = 0; }; -/* Creates a new polled fd factory. - * Note that even though ownership of mu is not transferred, the mu - * parameter is guaranteed to be alive for the the whole lifetime of - * the resulting GrpcPolledFdFactory as well as any GrpcPolledFd - * returned by the factory. */ -std::unique_ptr NewGrpcPolledFdFactory(Mutex* mu); +std::unique_ptr NewGrpcPolledFdFactory( + std::shared_ptr work_serializer); } // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc index 68f1c2bbcb4..e25bd55b225 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc @@ -59,53 +59,49 @@ class GrpcPolledFdPosix : public GrpcPolledFd { grpc_fd_orphan(fd_, nullptr, &phony_release_fd, "c-ares query finished"); } - void RegisterForOnReadableLocked(grpc_closure* read_closure) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) override { + void RegisterForOnReadableLocked(grpc_closure* read_closure) override { grpc_fd_notify_on_read(fd_, read_closure); } - void RegisterForOnWriteableLocked(grpc_closure* write_closure) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) override { + void RegisterForOnWriteableLocked(grpc_closure* write_closure) override { grpc_fd_notify_on_write(fd_, write_closure); } - bool IsFdStillReadableLocked() - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) override { + bool IsFdStillReadableLocked() override { size_t bytes_available = 0; return ioctl(grpc_fd_wrapped_fd(fd_), FIONREAD, &bytes_available) == 0 && bytes_available > 0; } - void ShutdownLocked(grpc_error_handle error) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) override { + void ShutdownLocked(grpc_error_handle error) override { grpc_fd_shutdown(fd_, error); } - ares_socket_t GetWrappedAresSocketLocked() - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) override { - return as_; - } + ares_socket_t GetWrappedAresSocketLocked() override { return as_; } - const char* GetName() const override { return name_.c_str(); } + const char* GetName() override { return name_.c_str(); } private: - const std::string name_; - const ares_socket_t as_; - grpc_fd* fd_ ABSL_GUARDED_BY(&grpc_ares_request::mu); - grpc_pollset_set* driver_pollset_set_ ABSL_GUARDED_BY(&grpc_ares_request::mu); + std::string name_; + ares_socket_t as_; + grpc_fd* fd_; + grpc_pollset_set* driver_pollset_set_; }; class GrpcPolledFdFactoryPosix : public GrpcPolledFdFactory { public: GrpcPolledFd* NewGrpcPolledFdLocked( - ares_socket_t as, grpc_pollset_set* driver_pollset_set) override { + ares_socket_t as, grpc_pollset_set* driver_pollset_set, + std::shared_ptr /*work_serializer*/) override { return new GrpcPolledFdPosix(as, driver_pollset_set); } void ConfigureAresChannelLocked(ares_channel /*channel*/) override {} }; -std::unique_ptr NewGrpcPolledFdFactory(Mutex* /* mu */) { +std::unique_ptr NewGrpcPolledFdFactory( + std::shared_ptr work_serializer) { + (void)work_serializer; return absl::make_unique(); } diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc index eab25f30808..a5c7a716950 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc @@ -41,6 +41,7 @@ #include "src/core/lib/iomgr/sockaddr_windows.h" #include "src/core/lib/iomgr/socket_windows.h" #include "src/core/lib/iomgr/tcp_windows.h" +#include "src/core/lib/iomgr/work_serializer.h" #include "src/core/lib/slice/slice_internal.h" /* TODO(apolcyn): remove this hack after fixing upstream. @@ -99,9 +100,10 @@ class GrpcPolledFdWindows { WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY, }; - GrpcPolledFdWindows(ares_socket_t as, Mutex* mu, int address_family, - int socket_type) - : mu_(mu), + GrpcPolledFdWindows(ares_socket_t as, + std::shared_ptr work_serializer, + int address_family, int socket_type) + : work_serializer_(std::move(work_serializer)), read_buf_(grpc_empty_slice()), write_buf_(grpc_empty_slice()), tcp_write_state_(WRITE_IDLE), @@ -258,7 +260,7 @@ class GrpcPolledFdWindows { return grpc_winsocket_wrapped_socket(winsocket_); } - const char* GetName() const { return name_.c_str(); } + const char* GetName() { return name_.c_str(); } ares_ssize_t RecvFrom(WSAErrorContext* wsa_error_ctx, void* data, ares_socket_t data_len, int flags, @@ -420,8 +422,12 @@ class GrpcPolledFdWindows { static void OnTcpConnect(void* arg, grpc_error_handle error) { GrpcPolledFdWindows* grpc_polled_fd = static_cast(arg); - MutexLock lock(grpc_polled_fd->mu_); - grpc_polled_fd->OnTcpConnectLocked(error); + (void)GRPC_ERROR_REF(error); // ref owned by lambda + grpc_polled_fd->work_serializer_->Run( + [grpc_polled_fd, error]() { + grpc_polled_fd->OnTcpConnectLocked(error); + }, + DEBUG_LOCATION); } void OnTcpConnectLocked(grpc_error_handle error) { @@ -463,6 +469,7 @@ class GrpcPolledFdWindows { if (pending_continue_register_for_on_writeable_locked_) { ContinueRegisterForOnWriteableLocked(); } + GRPC_ERROR_UNREF(error); } int Connect(WSAErrorContext* wsa_error_ctx, const struct sockaddr* target, @@ -568,9 +575,10 @@ class GrpcPolledFdWindows { static void OnIocpReadable(void* arg, grpc_error_handle error) { GrpcPolledFdWindows* polled_fd = static_cast(arg); - (void)GRPC_ERROR_REF(error); - MutexLock lock(polled_fd->mu_); - polled_fd->OnIocpReadableLocked(error); + (void)GRPC_ERROR_REF(error); // ref owned by lambda + polled_fd->work_serializer_->Run( + [polled_fd, error]() { polled_fd->OnIocpReadableLocked(error); }, + DEBUG_LOCATION); } // TODO(apolcyn): improve this error handling to be less conversative. @@ -612,9 +620,10 @@ class GrpcPolledFdWindows { static void OnIocpWriteable(void* arg, grpc_error_handle error) { GrpcPolledFdWindows* polled_fd = static_cast(arg); - (void)GRPC_ERROR_REF(error); - MutexLock lock(polled_fd->mu_); - polled_fd->OnIocpWriteableLocked(error); + (void)GRPC_ERROR_REF(error); // error owned by lambda + polled_fd->work_serializer_->Run( + [polled_fd, error]() { polled_fd->OnIocpWriteableLocked(error); }, + DEBUG_LOCATION); } void OnIocpWriteableLocked(grpc_error_handle error) { @@ -649,7 +658,7 @@ class GrpcPolledFdWindows { void set_gotten_into_driver_list() { gotten_into_driver_list_ = true; } private: - Mutex* mu_; + std::shared_ptr work_serializer_; char recv_from_source_addr_[200]; ares_socklen_t recv_from_source_addr_len_; grpc_slice read_buf_; @@ -662,7 +671,7 @@ class GrpcPolledFdWindows { grpc_winsocket* winsocket_; // tcp_write_state_ is only used on TCP GrpcPolledFds WriteState tcp_write_state_; - const std::string name_; + std::string name_; bool gotten_into_driver_list_; int address_family_; int socket_type_; @@ -691,7 +700,8 @@ struct SockToPolledFdEntry { * with a GrpcPolledFdWindows factory and event driver */ class SockToPolledFdMap { public: - explicit SockToPolledFdMap(Mutex* mu) : mu_(mu) {} + explicit SockToPolledFdMap(std::shared_ptr work_serializer) + : work_serializer_(std::move(work_serializer)) {} ~SockToPolledFdMap() { GPR_ASSERT(head_ == nullptr); } @@ -749,7 +759,7 @@ class SockToPolledFdMap { } grpc_tcp_set_non_block(s); GrpcPolledFdWindows* polled_fd = - new GrpcPolledFdWindows(s, map->mu_, af, type); + new GrpcPolledFdWindows(s, map->work_serializer_, af, type); GRPC_CARES_TRACE_LOG( "fd:|%s| created with params af:%d type:%d protocol:%d", polled_fd->GetName(), af, type, protocol); @@ -804,8 +814,8 @@ class SockToPolledFdMap { } private: - Mutex* mu_; SockToPolledFdEntry* head_ = nullptr; + std::shared_ptr work_serializer_; }; const struct ares_socket_functions custom_ares_sock_funcs = { @@ -846,18 +856,21 @@ class GrpcPolledFdWindowsWrapper : public GrpcPolledFd { return wrapped_->GetWrappedAresSocketLocked(); } - const char* GetName() const override { return wrapped_->GetName(); } + const char* GetName() override { return wrapped_->GetName(); } private: - GrpcPolledFdWindows* const wrapped_; + GrpcPolledFdWindows* wrapped_; }; class GrpcPolledFdFactoryWindows : public GrpcPolledFdFactory { public: - explicit GrpcPolledFdFactoryWindows(Mutex* mu) : sock_to_polled_fd_map_(mu) {} + explicit GrpcPolledFdFactoryWindows( + std::shared_ptr work_serializer) + : sock_to_polled_fd_map_(std::move(work_serializer)) {} GrpcPolledFd* NewGrpcPolledFdLocked( - ares_socket_t as, grpc_pollset_set* driver_pollset_set) override { + ares_socket_t as, grpc_pollset_set* driver_pollset_set, + std::shared_ptr work_serializer) override { GrpcPolledFdWindows* polled_fd = sock_to_polled_fd_map_.LookupPolledFd(as); // Set a flag so that the virtual socket "close" method knows it // doesn't need to call ShutdownLocked, since now the driver will. @@ -874,8 +887,10 @@ class GrpcPolledFdFactoryWindows : public GrpcPolledFdFactory { SockToPolledFdMap sock_to_polled_fd_map_; }; -std::unique_ptr NewGrpcPolledFdFactory(Mutex* mu) { - return absl::make_unique(mu); +std::unique_ptr NewGrpcPolledFdFactory( + std::shared_ptr work_serializer) { + return absl::make_unique( + std::move(work_serializer)); } } // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc index 03ae3c54aa8..fe5a42af8a2 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc @@ -57,63 +57,55 @@ grpc_core::TraceFlag grpc_trace_cares_address_sorting(false, grpc_core::TraceFlag grpc_trace_cares_resolver(false, "cares_resolver"); typedef struct fd_node { - /* default constructor exists only for linked list manipulation */ - fd_node() : ev_driver(nullptr) {} - - explicit fd_node(grpc_ares_ev_driver* ev_driver) : ev_driver(ev_driver) {} - /** the owner of this fd node */ - grpc_ares_ev_driver* const ev_driver; + grpc_ares_ev_driver* ev_driver; /** a closure wrapping on_readable_locked, which should be invoked when the grpc_fd in this node becomes readable. */ - grpc_closure read_closure ABSL_GUARDED_BY(&grpc_ares_request::mu); + grpc_closure read_closure; /** a closure wrapping on_writable_locked, which should be invoked when the grpc_fd in this node becomes writable. */ - grpc_closure write_closure ABSL_GUARDED_BY(&grpc_ares_request::mu); + grpc_closure write_closure; /** next fd node in the list */ - struct fd_node* next ABSL_GUARDED_BY(&grpc_ares_request::mu); + struct fd_node* next; /** wrapped fd that's polled by grpc's poller for the current platform */ - grpc_core::GrpcPolledFd* grpc_polled_fd - ABSL_GUARDED_BY(&grpc_ares_request::mu); + grpc_core::GrpcPolledFd* grpc_polled_fd; /** if the readable closure has been registered */ - bool readable_registered ABSL_GUARDED_BY(&grpc_ares_request::mu); + bool readable_registered; /** if the writable closure has been registered */ - bool writable_registered ABSL_GUARDED_BY(&grpc_ares_request::mu); + bool writable_registered; /** if the fd has been shutdown yet from grpc iomgr perspective */ - bool already_shutdown ABSL_GUARDED_BY(&grpc_ares_request::mu); + bool already_shutdown; } fd_node; struct grpc_ares_ev_driver { - explicit grpc_ares_ev_driver(grpc_ares_request* request) : request(request) {} - /** the ares_channel owned by this event driver */ - ares_channel channel ABSL_GUARDED_BY(&grpc_ares_request::mu); + ares_channel channel; /** pollset set for driving the IO events of the channel */ - grpc_pollset_set* pollset_set ABSL_GUARDED_BY(&grpc_ares_request::mu); + grpc_pollset_set* pollset_set; /** refcount of the event driver */ gpr_refcount refs; + /** work_serializer to synchronize c-ares and I/O callbacks on */ + std::shared_ptr work_serializer; /** a list of grpc_fd that this event driver is currently using. */ - fd_node* fds ABSL_GUARDED_BY(&grpc_ares_request::mu); + fd_node* fds; /** is this event driver being shut down */ - bool shutting_down ABSL_GUARDED_BY(&grpc_ares_request::mu); + bool shutting_down; /** request object that's using this ev driver */ - grpc_ares_request* const request; + grpc_ares_request* request; /** Owned by the ev_driver. Creates new GrpcPolledFd's */ - std::unique_ptr polled_fd_factory - ABSL_GUARDED_BY(&grpc_ares_request::mu); + std::unique_ptr polled_fd_factory; /** query timeout in milliseconds */ - int query_timeout_ms ABSL_GUARDED_BY(&grpc_ares_request::mu); + int query_timeout_ms; /** alarm to cancel active queries */ - grpc_timer query_timeout ABSL_GUARDED_BY(&grpc_ares_request::mu); + grpc_timer query_timeout; /** cancels queries on a timeout */ - grpc_closure on_timeout_locked ABSL_GUARDED_BY(&grpc_ares_request::mu); + grpc_closure on_timeout_locked; /** alarm to poll ares_process on in case fd events don't happen */ - grpc_timer ares_backup_poll_alarm ABSL_GUARDED_BY(&grpc_ares_request::mu); + grpc_timer ares_backup_poll_alarm; /** polls ares_process on a periodic timer */ - grpc_closure on_ares_backup_poll_alarm_locked - ABSL_GUARDED_BY(&grpc_ares_request::mu); + grpc_closure on_ares_backup_poll_alarm_locked; }; // TODO(apolcyn): make grpc_ares_hostbyname_request a sub-class @@ -133,10 +125,8 @@ typedef struct grpc_ares_hostbyname_request { const char* qtype; } grpc_ares_hostbyname_request; -static void grpc_ares_request_ref_locked(grpc_ares_request* r) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(r->mu); -static void grpc_ares_request_unref_locked(grpc_ares_request* r) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(r->mu); +static void grpc_ares_request_ref_locked(grpc_ares_request* r); +static void grpc_ares_request_unref_locked(grpc_ares_request* r); // TODO(apolcyn): as a part of C++-ification, find a way to // organize per-query and per-resolution information in such a way @@ -163,19 +153,14 @@ class GrpcAresQuery { }; static grpc_ares_ev_driver* grpc_ares_ev_driver_ref( - grpc_ares_ev_driver* ev_driver) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) { + grpc_ares_ev_driver* ev_driver) { GRPC_CARES_TRACE_LOG("request:%p Ref ev_driver %p", ev_driver->request, ev_driver); gpr_ref(&ev_driver->refs); return ev_driver; } -static void grpc_ares_complete_request_locked(grpc_ares_request* r) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(r->mu); - -static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver* ev_driver) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) { +static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver* ev_driver) { GRPC_CARES_TRACE_LOG("request:%p Unref ev_driver %p", ev_driver->request, ev_driver); if (gpr_unref(&ev_driver->refs)) { @@ -188,19 +173,17 @@ static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver* ev_driver) } } -static void fd_node_destroy_locked(fd_node* fdn) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) { +static void fd_node_destroy_locked(fd_node* fdn) { GRPC_CARES_TRACE_LOG("request:%p delete fd: %s", fdn->ev_driver->request, fdn->grpc_polled_fd->GetName()); GPR_ASSERT(!fdn->readable_registered); GPR_ASSERT(!fdn->writable_registered); GPR_ASSERT(fdn->already_shutdown); delete fdn->grpc_polled_fd; - delete fdn; + gpr_free(fdn); } -static void fd_node_shutdown_locked(fd_node* fdn, const char* reason) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) { +static void fd_node_shutdown_locked(fd_node* fdn, const char* reason) { if (!fdn->already_shutdown) { fdn->already_shutdown = true; fdn->grpc_polled_fd->ShutdownLocked( @@ -209,8 +192,7 @@ static void fd_node_shutdown_locked(fd_node* fdn, const char* reason) } void grpc_ares_ev_driver_on_queries_complete_locked( - grpc_ares_ev_driver* ev_driver) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) { + grpc_ares_ev_driver* ev_driver) { // We mark the event driver as being shut down. // grpc_ares_notify_on_event_locked will shut down any remaining // fds. @@ -220,8 +202,7 @@ void grpc_ares_ev_driver_on_queries_complete_locked( grpc_ares_ev_driver_unref(ev_driver); } -void grpc_ares_ev_driver_shutdown_locked(grpc_ares_ev_driver* ev_driver) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) { +void grpc_ares_ev_driver_shutdown_locked(grpc_ares_ev_driver* ev_driver) { ev_driver->shutting_down = true; fd_node* fn = ev_driver->fds; while (fn != nullptr) { @@ -232,8 +213,7 @@ void grpc_ares_ev_driver_shutdown_locked(grpc_ares_ev_driver* ev_driver) // Search fd in the fd_node list head. This is an O(n) search, the max possible // value of n is ARES_GETSOCK_MAXNUM (16). n is typically 1 - 2 in our tests. -static fd_node* pop_fd_node_locked(fd_node** head, ares_socket_t as) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) { +static fd_node* pop_fd_node_locked(fd_node** head, ares_socket_t as) { fd_node phony_head; phony_head.next = *head; fd_node* node = &phony_head; @@ -250,8 +230,7 @@ static fd_node* pop_fd_node_locked(fd_node** head, ares_socket_t as) } static grpc_millis calculate_next_ares_backup_poll_alarm_ms( - grpc_ares_ev_driver* driver) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) { + grpc_ares_ev_driver* driver) { // An alternative here could be to use ares_timeout to try to be more // accurate, but that would require using "struct timeval"'s, which just makes // things a bit more complicated. So just poll every second, as suggested @@ -265,9 +244,8 @@ static grpc_millis calculate_next_ares_backup_poll_alarm_ms( grpc_core::ExecCtx::Get()->Now(); } -static void on_timeout(void* arg, grpc_error_handle error) { - grpc_ares_ev_driver* driver = static_cast(arg); - grpc_core::MutexLock lock(&driver->request->mu); +static void on_timeout_locked(grpc_ares_ev_driver* driver, + grpc_error_handle error) { GRPC_CARES_TRACE_LOG( "request:%p ev_driver=%p on_timeout_locked. driver->shutting_down=%d. " "err=%s", @@ -277,10 +255,28 @@ static void on_timeout(void* arg, grpc_error_handle error) { grpc_ares_ev_driver_shutdown_locked(driver); } grpc_ares_ev_driver_unref(driver); + GRPC_ERROR_UNREF(error); } -static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu); +static void on_timeout(void* arg, grpc_error_handle error) { + grpc_ares_ev_driver* driver = static_cast(arg); + (void)GRPC_ERROR_REF(error); // ref owned by lambda + driver->work_serializer->Run( + [driver, error]() { on_timeout_locked(driver, error); }, DEBUG_LOCATION); +} + +static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver); + +static void on_ares_backup_poll_alarm_locked(grpc_ares_ev_driver* driver, + grpc_error_handle error); + +static void on_ares_backup_poll_alarm(void* arg, grpc_error_handle error) { + grpc_ares_ev_driver* driver = static_cast(arg); + (void)GRPC_ERROR_REF(error); + driver->work_serializer->Run( + [driver, error]() { on_ares_backup_poll_alarm_locked(driver, error); }, + DEBUG_LOCATION); +} /* In case of non-responsive DNS servers, dropped packets, etc., c-ares has * intelligent timeout and retry logic, which we can take advantage of by @@ -290,9 +286,8 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) * b) when some time has passed without fd events having happened * For the latter, we use this backup poller. Also see * https://github.com/grpc/grpc/pull/17688 description for more details. */ -static void on_ares_backup_poll_alarm(void* arg, grpc_error_handle error) { - grpc_ares_ev_driver* driver = static_cast(arg); - grpc_core::MutexLock lock(&driver->request->mu); +static void on_ares_backup_poll_alarm_locked(grpc_ares_ev_driver* driver, + grpc_error_handle error) { GRPC_CARES_TRACE_LOG( "request:%p ev_driver=%p on_ares_backup_poll_alarm_locked. " "driver->shutting_down=%d. " @@ -330,11 +325,10 @@ static void on_ares_backup_poll_alarm(void* arg, grpc_error_handle error) { grpc_ares_notify_on_event_locked(driver); } grpc_ares_ev_driver_unref(driver); + GRPC_ERROR_UNREF(error); } -static void on_readable(void* arg, grpc_error_handle error) { - fd_node* fdn = static_cast(arg); - grpc_core::MutexLock lock(&fdn->ev_driver->request->mu); +static void on_readable_locked(fd_node* fdn, grpc_error_handle error) { GPR_ASSERT(fdn->readable_registered); grpc_ares_ev_driver* ev_driver = fdn->ev_driver; const ares_socket_t as = fdn->grpc_polled_fd->GetWrappedAresSocketLocked(); @@ -356,11 +350,17 @@ static void on_readable(void* arg, grpc_error_handle error) { } grpc_ares_notify_on_event_locked(ev_driver); grpc_ares_ev_driver_unref(ev_driver); + GRPC_ERROR_UNREF(error); } -static void on_writable(void* arg, grpc_error_handle error) { +static void on_readable(void* arg, grpc_error_handle error) { fd_node* fdn = static_cast(arg); - grpc_core::MutexLock lock(&fdn->ev_driver->request->mu); + (void)GRPC_ERROR_REF(error); /* ref owned by lambda */ + fdn->ev_driver->work_serializer->Run( + [fdn, error]() { on_readable_locked(fdn, error); }, DEBUG_LOCATION); +} + +static void on_writable_locked(fd_node* fdn, grpc_error_handle error) { GPR_ASSERT(fdn->writable_registered); grpc_ares_ev_driver* ev_driver = fdn->ev_driver; const ares_socket_t as = fdn->grpc_polled_fd->GetWrappedAresSocketLocked(); @@ -380,12 +380,19 @@ static void on_writable(void* arg, grpc_error_handle error) { } grpc_ares_notify_on_event_locked(ev_driver); grpc_ares_ev_driver_unref(ev_driver); + GRPC_ERROR_UNREF(error); +} + +static void on_writable(void* arg, grpc_error_handle error) { + fd_node* fdn = static_cast(arg); + (void)GRPC_ERROR_REF(error); /* ref owned by lambda */ + fdn->ev_driver->work_serializer->Run( + [fdn, error]() { on_writable_locked(fdn, error); }, DEBUG_LOCATION); } // Get the file descriptors used by the ev_driver's ares channel, register // driver_closure with these filedescriptors. -static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) { +static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) { fd_node* new_list = nullptr; if (!ev_driver->shutting_down) { ares_socket_t socks[ARES_GETSOCK_MAXNUM]; @@ -397,12 +404,13 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) fd_node* fdn = pop_fd_node_locked(&ev_driver->fds, socks[i]); // Create a new fd_node if sock[i] is not in the fd_node list. if (fdn == nullptr) { - fdn = new fd_node(ev_driver); + fdn = static_cast(gpr_malloc(sizeof(fd_node))); fdn->grpc_polled_fd = ev_driver->polled_fd_factory->NewGrpcPolledFdLocked( - socks[i], ev_driver->pollset_set); + socks[i], ev_driver->pollset_set, ev_driver->work_serializer); GRPC_CARES_TRACE_LOG("request:%p new fd: %s", ev_driver->request, fdn->grpc_polled_fd->GetName()); + fdn->ev_driver = ev_driver; fdn->readable_registered = false; fdn->writable_registered = false; fdn->already_shutdown = false; @@ -458,8 +466,7 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) ev_driver->fds = new_list; } -void grpc_ares_ev_driver_start_locked(grpc_ares_ev_driver* ev_driver) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) { +void grpc_ares_ev_driver_start_locked(grpc_ares_ev_driver* ev_driver) { grpc_ares_notify_on_event_locked(ev_driver); // Initialize overall DNS resolution timeout alarm grpc_millis timeout = @@ -494,9 +501,10 @@ void (*grpc_ares_test_only_inject_config)(ares_channel channel) = grpc_error_handle grpc_ares_ev_driver_create_locked( grpc_ares_ev_driver** ev_driver, grpc_pollset_set* pollset_set, - int query_timeout_ms, grpc_ares_request* request) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(request->mu) { - *ev_driver = new grpc_ares_ev_driver(request); + int query_timeout_ms, + std::shared_ptr work_serializer, + grpc_ares_request* request) { + *ev_driver = new grpc_ares_ev_driver(); ares_options opts; memset(&opts, 0, sizeof(opts)); opts.flags |= ARES_FLAG_STAYOPEN; @@ -506,15 +514,17 @@ grpc_error_handle grpc_ares_ev_driver_create_locked( if (status != ARES_SUCCESS) { grpc_error_handle err = GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrCat( "Failed to init ares channel. C-ares error: ", ares_strerror(status))); - delete *ev_driver; + gpr_free(*ev_driver); return err; } + (*ev_driver)->work_serializer = std::move(work_serializer); gpr_ref_init(&(*ev_driver)->refs, 1); (*ev_driver)->pollset_set = pollset_set; (*ev_driver)->fds = nullptr; (*ev_driver)->shutting_down = false; + (*ev_driver)->request = request; (*ev_driver)->polled_fd_factory = - grpc_core::NewGrpcPolledFdFactory(&(*ev_driver)->request->mu); + grpc_core::NewGrpcPolledFdFactory((*ev_driver)->work_serializer); (*ev_driver) ->polled_fd_factory->ConfigureAresChannelLocked((*ev_driver)->channel); (*ev_driver)->query_timeout_ms = query_timeout_ms; @@ -560,21 +570,18 @@ void grpc_cares_wrapper_address_sorting_sort(const grpc_ares_request* r, } } -static void grpc_ares_request_ref_locked(grpc_ares_request* r) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(r->mu) { +static void grpc_ares_request_ref_locked(grpc_ares_request* r) { r->pending_queries++; } -static void grpc_ares_request_unref_locked(grpc_ares_request* r) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(r->mu) { +static void grpc_ares_request_unref_locked(grpc_ares_request* r) { r->pending_queries--; if (r->pending_queries == 0u) { grpc_ares_ev_driver_on_queries_complete_locked(r->ev_driver); } } -void grpc_ares_complete_request_locked(grpc_ares_request* r) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(r->mu) { +void grpc_ares_complete_request_locked(grpc_ares_request* r) { /* Invoke on_done callback and destroy the request */ r->ev_driver = nullptr; @@ -599,8 +606,7 @@ void grpc_ares_complete_request_locked(grpc_ares_request* r) * qtype must outlive it. */ static grpc_ares_hostbyname_request* create_hostbyname_request_locked( grpc_ares_request* parent_request, const char* host, uint16_t port, - bool is_balancer, const char* qtype) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(parent_request->mu) { + bool is_balancer, const char* qtype) { GRPC_CARES_TRACE_LOG( "request:%p create_hostbyname_request_locked host:%s port:%d " "is_balancer:%d qtype:%s", @@ -615,18 +621,15 @@ static grpc_ares_hostbyname_request* create_hostbyname_request_locked( return hr; } -static void destroy_hostbyname_request_locked(grpc_ares_hostbyname_request* hr) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(hr->parent_request->mu) { +static void destroy_hostbyname_request_locked( + grpc_ares_hostbyname_request* hr) { grpc_ares_request_unref_locked(hr->parent_request); gpr_free(hr->host); delete hr; } static void on_hostbyname_done_locked(void* arg, int status, int /*timeouts*/, - struct hostent* hostent) - ABSL_NO_THREAD_SAFETY_ANALYSIS { - // This callback is invoked from the c-ares library, so disable thread safety - // analysis. Note that we are guaranteed to be holding r->mu, though. + struct hostent* hostent) { grpc_ares_hostbyname_request* hr = static_cast(arg); grpc_ares_request* r = hr->parent_request; @@ -699,10 +702,7 @@ static void on_hostbyname_done_locked(void* arg, int status, int /*timeouts*/, } static void on_srv_query_done_locked(void* arg, int status, int /*timeouts*/, - unsigned char* abuf, - int alen) ABSL_NO_THREAD_SAFETY_ANALYSIS { - // This callback is invoked from the c-ares library, so disable thread safety - // analysis. Note that we are guaranteed to be holding r->mu, though. + unsigned char* abuf, int alen) { GrpcAresQuery* q = static_cast(arg); grpc_ares_request* r = q->parent_request(); if (status == ARES_SUCCESS) { @@ -749,10 +749,7 @@ static void on_srv_query_done_locked(void* arg, int status, int /*timeouts*/, static const char g_service_config_attribute_prefix[] = "grpc_config="; static void on_txt_done_locked(void* arg, int status, int /*timeouts*/, - unsigned char* buf, - int len) ABSL_NO_THREAD_SAFETY_ANALYSIS { - // This callback is invoked from the c-ares library, so disable thread safety - // analysis. Note that we are guaranteed to be holding r->mu, though. + unsigned char* buf, int len) { GrpcAresQuery* q = static_cast(arg); std::unique_ptr query_deleter(q); grpc_ares_request* r = q->parent_request(); @@ -809,7 +806,8 @@ fail: void grpc_dns_lookup_ares_continue_after_check_localhost_and_ip_literals_locked( grpc_ares_request* r, const char* dns_server, const char* name, const char* default_port, grpc_pollset_set* interested_parties, - int query_timeout_ms) ABSL_EXCLUSIVE_LOCKS_REQUIRED(r->mu) { + int query_timeout_ms, + std::shared_ptr work_serializer) { grpc_error_handle error = GRPC_ERROR_NONE; grpc_ares_hostbyname_request* hr = nullptr; /* parse name, splitting it into host and port parts */ @@ -831,7 +829,8 @@ void grpc_dns_lookup_ares_continue_after_check_localhost_and_ip_literals_locked( port = default_port; } error = grpc_ares_ev_driver_create_locked(&r->ev_driver, interested_parties, - query_timeout_ms, r); + query_timeout_ms, + std::move(work_serializer), r); if (error != GRPC_ERROR_NONE) goto error_cleanup; // If dns_server is specified, use it. if (dns_server != nullptr && dns_server[0] != '\0') { @@ -1030,21 +1029,21 @@ static bool grpc_ares_maybe_resolve_localhost_manually_locked( } #endif /* GRPC_ARES_RESOLVE_LOCALHOST_MANUALLY */ -static grpc_ares_request* grpc_dns_lookup_ares_impl( +static grpc_ares_request* grpc_dns_lookup_ares_locked_impl( const char* dns_server, const char* name, const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, std::unique_ptr* addrs, std::unique_ptr* balancer_addrs, - char** service_config_json, int query_timeout_ms) { + char** service_config_json, int query_timeout_ms, + std::shared_ptr work_serializer) { grpc_ares_request* r = new grpc_ares_request(); - grpc_core::MutexLock lock(&r->mu); r->ev_driver = nullptr; r->on_done = on_done; r->addresses_out = addrs; r->balancer_addresses_out = balancer_addrs; r->service_config_json_out = service_config_json; GRPC_CARES_TRACE_LOG( - "request:%p c-ares grpc_dns_lookup_ares_impl name=%s, " + "request:%p c-ares grpc_dns_lookup_ares_locked_impl name=%s, " "default_port=%s", r, name, default_port); // Early out if the target is an ipv4 or ipv6 literal. @@ -1067,28 +1066,29 @@ static grpc_ares_request* grpc_dns_lookup_ares_impl( } // Look up name using c-ares lib. grpc_dns_lookup_ares_continue_after_check_localhost_and_ip_literals_locked( - r, dns_server, name, default_port, interested_parties, query_timeout_ms); + r, dns_server, name, default_port, interested_parties, query_timeout_ms, + std::move(work_serializer)); return r; } -grpc_ares_request* (*grpc_dns_lookup_ares)( +grpc_ares_request* (*grpc_dns_lookup_ares_locked)( const char* dns_server, const char* name, const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, std::unique_ptr* addrs, std::unique_ptr* balancer_addrs, - char** service_config_json, - int query_timeout_ms) = grpc_dns_lookup_ares_impl; + char** service_config_json, int query_timeout_ms, + std::shared_ptr work_serializer) = + grpc_dns_lookup_ares_locked_impl; -static void grpc_cancel_ares_request_impl(grpc_ares_request* r) { +static void grpc_cancel_ares_request_locked_impl(grpc_ares_request* r) { GPR_ASSERT(r != nullptr); - grpc_core::MutexLock lock(&r->mu); if (r->ev_driver != nullptr) { grpc_ares_ev_driver_shutdown_locked(r->ev_driver); } } -void (*grpc_cancel_ares_request)(grpc_ares_request* r) = - grpc_cancel_ares_request_impl; +void (*grpc_cancel_ares_request_locked)(grpc_ares_request* r) = + grpc_cancel_ares_request_locked_impl; // ares_library_init and ares_library_cleanup are currently no-op except under // Windows. Calling them may cause race conditions when other parts of the @@ -1114,6 +1114,8 @@ void grpc_ares_cleanup(void) {} */ typedef struct grpc_resolve_address_ares_request { + /* work_serializer that queries and related callbacks run under */ + std::shared_ptr work_serializer; /** the pointer to receive the resolved addresses */ grpc_resolved_addresses** addrs_out; /** currently resolving addresses */ @@ -1121,8 +1123,8 @@ typedef struct grpc_resolve_address_ares_request { /** closure to call when the resolve_address_ares request completes */ grpc_closure* on_resolve_address_done; /** a closure wrapping on_resolve_address_done, which should be invoked when - the grpc_dns_lookup_ares operation is done. */ - grpc_closure on_dns_lookup_done; + the grpc_dns_lookup_ares_locked operation is done. */ + grpc_closure on_dns_lookup_done_locked; /* target name */ const char* name; /* default port to use if none is specified */ @@ -1133,9 +1135,8 @@ typedef struct grpc_resolve_address_ares_request { grpc_ares_request* ares_request = nullptr; } grpc_resolve_address_ares_request; -static void on_dns_lookup_done(void* arg, grpc_error_handle error) { - grpc_resolve_address_ares_request* r = - static_cast(arg); +static void on_dns_lookup_done_locked(grpc_resolve_address_ares_request* r, + grpc_error_handle error) { delete r->ares_request; grpc_resolved_addresses** resolved_addresses = r->addrs_out; if (r->addresses == nullptr || r->addresses->empty()) { @@ -1152,11 +1153,30 @@ static void on_dns_lookup_done(void* arg, grpc_error_handle error) { sizeof(grpc_resolved_address)); } } - grpc_core::ExecCtx::Run(DEBUG_LOCATION, r->on_resolve_address_done, - GRPC_ERROR_REF(error)); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, r->on_resolve_address_done, error); delete r; } +static void on_dns_lookup_done(void* arg, grpc_error_handle error) { + grpc_resolve_address_ares_request* r = + static_cast(arg); + (void)GRPC_ERROR_REF(error); // ref owned by lambda + r->work_serializer->Run([r, error]() { on_dns_lookup_done_locked(r, error); }, + DEBUG_LOCATION); +} + +static void grpc_resolve_address_invoke_dns_lookup_ares_locked(void* arg) { + grpc_resolve_address_ares_request* r = + static_cast(arg); + GRPC_CLOSURE_INIT(&r->on_dns_lookup_done_locked, on_dns_lookup_done, r, + grpc_schedule_on_exec_ctx); + r->ares_request = grpc_dns_lookup_ares_locked( + nullptr /* dns_server */, r->name, r->default_port, r->interested_parties, + &r->on_dns_lookup_done_locked, &r->addresses, + nullptr /* balancer_addresses */, nullptr /* service_config_json */, + GRPC_DNS_ARES_DEFAULT_QUERY_TIMEOUT_MS, r->work_serializer); +} + static void grpc_resolve_address_ares_impl(const char* name, const char* default_port, grpc_pollset_set* interested_parties, @@ -1164,18 +1184,15 @@ static void grpc_resolve_address_ares_impl(const char* name, grpc_resolved_addresses** addrs) { grpc_resolve_address_ares_request* r = new grpc_resolve_address_ares_request(); + r->work_serializer = std::make_shared(); r->addrs_out = addrs; r->on_resolve_address_done = on_done; r->name = name; r->default_port = default_port; r->interested_parties = interested_parties; - GRPC_CLOSURE_INIT(&r->on_dns_lookup_done, on_dns_lookup_done, r, - grpc_schedule_on_exec_ctx); - r->ares_request = grpc_dns_lookup_ares( - nullptr /* dns_server */, name, default_port, interested_parties, - &r->on_dns_lookup_done, &r->addresses, nullptr /* balancer_addresses */, - nullptr /* service_config_json */, - GRPC_DNS_ARES_DEFAULT_QUERY_TIMEOUT_MS); + r->work_serializer->Run( + [r]() { grpc_resolve_address_invoke_dns_lookup_ares_locked(r); }, + DEBUG_LOCATION); } void (*grpc_resolve_address_ares)( diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h index 0e91cd25024..cfc0e2b60fa 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h @@ -45,28 +45,23 @@ extern grpc_core::TraceFlag grpc_trace_cares_resolver; typedef struct grpc_ares_ev_driver grpc_ares_ev_driver; struct grpc_ares_request { - /** synchronizes access to this request, and also to associated - * ev_driver and fd_node objects */ - grpc_core::Mutex mu; /** indicates the DNS server to use, if specified */ - struct ares_addr_port_node dns_server_addr ABSL_GUARDED_BY(mu); + struct ares_addr_port_node dns_server_addr; /** following members are set in grpc_resolve_address_ares_impl */ /** closure to call when the request completes */ - grpc_closure* on_done ABSL_GUARDED_BY(mu) = nullptr; + grpc_closure* on_done = nullptr; /** the pointer to receive the resolved addresses */ - std::unique_ptr* addresses_out - ABSL_GUARDED_BY(mu); + std::unique_ptr* addresses_out; /** the pointer to receive the resolved balancer addresses */ - std::unique_ptr* balancer_addresses_out - ABSL_GUARDED_BY(mu); + std::unique_ptr* balancer_addresses_out; /** the pointer to receive the service config in JSON */ - char** service_config_json_out ABSL_GUARDED_BY(mu) = nullptr; + char** service_config_json_out = nullptr; /** the evernt driver used by this request */ - grpc_ares_ev_driver* ev_driver ABSL_GUARDED_BY(mu) = nullptr; + grpc_ares_ev_driver* ev_driver = nullptr; /** number of ongoing queries */ - size_t pending_queries ABSL_GUARDED_BY(mu) = 0; + size_t pending_queries = 0; /** the errors explaining query failures, appended to in query callbacks */ - grpc_error_handle error ABSL_GUARDED_BY(mu) = GRPC_ERROR_NONE; + grpc_error_handle error = GRPC_ERROR_NONE; }; /* Asynchronously resolve \a name. Use \a default_port if a port isn't @@ -88,15 +83,16 @@ extern void (*grpc_resolve_address_ares)(const char* name, scheduled with \a exec_ctx, so it must not try to acquire locks that are being held by the caller. The returned grpc_ares_request object is owned by the caller and it is safe to free after on_done is called back. */ -extern grpc_ares_request* (*grpc_dns_lookup_ares)( +extern grpc_ares_request* (*grpc_dns_lookup_ares_locked)( const char* dns_server, const char* name, const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, std::unique_ptr* addresses, std::unique_ptr* balancer_addresses, - char** service_config_json, int query_timeout_ms); + char** service_config_json, int query_timeout_ms, + std::shared_ptr work_serializer); /* Cancel the pending grpc_ares_request \a request */ -extern void (*grpc_cancel_ares_request)(grpc_ares_request* request); +extern void (*grpc_cancel_ares_request_locked)(grpc_ares_request* request); /* Initialize gRPC ares wrapper. Must be called at least once before grpc_resolve_address_ares(). */ @@ -107,6 +103,10 @@ grpc_error_handle grpc_ares_init(void); it has been called the same number of times as grpc_ares_init(). */ void grpc_ares_cleanup(void); +/** Schedules the desired callback for request completion + * and destroys the grpc_ares_request */ +void grpc_ares_complete_request_locked(grpc_ares_request* request); + /* Indicates whether or not AAAA queries should be attempted. */ /* E.g., return false if ipv6 is known to not be available. */ bool grpc_ares_query_ipv6(); diff --git a/test/core/client_channel/resolvers/dns_resolver_connectivity_test.cc b/test/core/client_channel/resolvers/dns_resolver_connectivity_test.cc index 873b34f0317..9486ad9ea05 100644 --- a/test/core/client_channel/resolvers/dns_resolver_connectivity_test.cc +++ b/test/core/client_channel/resolvers/dns_resolver_connectivity_test.cc @@ -60,12 +60,13 @@ static void my_resolve_address(const char* addr, const char* /*default_port*/, static grpc_address_resolver_vtable test_resolver = {my_resolve_address, nullptr}; -static grpc_ares_request* my_dns_lookup_ares( +static grpc_ares_request* my_dns_lookup_ares_locked( const char* /*dns_server*/, const char* addr, const char* /*default_port*/, grpc_pollset_set* /*interested_parties*/, grpc_closure* on_done, std::unique_ptr* addresses, std::unique_ptr* /*balancer_addresses*/, - char** /*service_config_json*/, int /*query_timeout_ms*/) { // NOLINT + char** /*service_config_json*/, int /*query_timeout_ms*/, + std::shared_ptr /*combiner*/) { // NOLINT gpr_mu_lock(&g_mu); GPR_ASSERT(0 == strcmp("test", addr)); grpc_error_handle error = GRPC_ERROR_NONE; @@ -85,7 +86,7 @@ static grpc_ares_request* my_dns_lookup_ares( return nullptr; } -static void my_cancel_ares_request(grpc_ares_request* request) { +static void my_cancel_ares_request_locked(grpc_ares_request* request) { GPR_ASSERT(request == nullptr); } @@ -154,8 +155,8 @@ int main(int argc, char** argv) { auto work_serializer = std::make_shared(); g_work_serializer = &work_serializer; grpc_set_resolver_impl(&test_resolver); - grpc_dns_lookup_ares = my_dns_lookup_ares; - grpc_cancel_ares_request = my_cancel_ares_request; + grpc_dns_lookup_ares_locked = my_dns_lookup_ares_locked; + grpc_cancel_ares_request_locked = my_cancel_ares_request_locked; { grpc_core::ExecCtx exec_ctx; diff --git a/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc b/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc index 35c6b8a88ea..66bdea2e5b2 100644 --- a/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc +++ b/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc @@ -37,12 +37,13 @@ static grpc_address_resolver_vtable* default_resolve_address; static std::shared_ptr* g_work_serializer; -static grpc_ares_request* (*g_default_dns_lookup_ares)( +static grpc_ares_request* (*g_default_dns_lookup_ares_locked)( const char* dns_server, const char* name, const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, std::unique_ptr* addresses, std::unique_ptr* balancer_addresses, - char** service_config_json, int query_timeout_ms); + char** service_config_json, int query_timeout_ms, + std::shared_ptr work_serializer); // Counter incremented by test_resolve_address_impl indicating the number of // times a system-level resolution has happened. @@ -95,15 +96,17 @@ static grpc_error_handle test_blocking_resolve_address_impl( static grpc_address_resolver_vtable test_resolver = { test_resolve_address_impl, test_blocking_resolve_address_impl}; -static grpc_ares_request* test_dns_lookup_ares( +static grpc_ares_request* test_dns_lookup_ares_locked( const char* dns_server, const char* name, const char* default_port, grpc_pollset_set* /*interested_parties*/, grpc_closure* on_done, std::unique_ptr* addresses, std::unique_ptr* balancer_addresses, - char** service_config_json, int query_timeout_ms) { - grpc_ares_request* result = g_default_dns_lookup_ares( + char** service_config_json, int query_timeout_ms, + std::shared_ptr work_serializer) { + grpc_ares_request* result = g_default_dns_lookup_ares_locked( dns_server, name, default_port, g_iomgr_args.pollset_set, on_done, - addresses, balancer_addresses, service_config_json, query_timeout_ms); + addresses, balancer_addresses, service_config_json, query_timeout_ms, + std::move(work_serializer)); ++g_resolution_count; static grpc_millis last_resolution_time = 0; grpc_millis now = @@ -332,8 +335,8 @@ int main(int argc, char** argv) { auto work_serializer = std::make_shared(); g_work_serializer = &work_serializer; - g_default_dns_lookup_ares = grpc_dns_lookup_ares; - grpc_dns_lookup_ares = test_dns_lookup_ares; + g_default_dns_lookup_ares_locked = grpc_dns_lookup_ares_locked; + grpc_dns_lookup_ares_locked = test_dns_lookup_ares_locked; default_resolve_address = grpc_resolve_address_impl; grpc_set_resolver_impl(&test_resolver); diff --git a/test/core/end2end/fuzzers/api_fuzzer.cc b/test/core/end2end/fuzzers/api_fuzzer.cc index a4fdf316e25..cf2769707b3 100644 --- a/test/core/end2end/fuzzers/api_fuzzer.cc +++ b/test/core/end2end/fuzzers/api_fuzzer.cc @@ -137,12 +137,13 @@ void my_resolve_address(const char* addr, const char* /*default_port*/, static grpc_address_resolver_vtable fuzzer_resolver = {my_resolve_address, nullptr}; -grpc_ares_request* my_dns_lookup_ares( +grpc_ares_request* my_dns_lookup_ares_locked( const char* /*dns_server*/, const char* addr, const char* /*default_port*/, grpc_pollset_set* /*interested_parties*/, grpc_closure* on_done, std::unique_ptr* addresses, std::unique_ptr* /*balancer_addresses*/, - char** /*service_config_json*/, int /*query_timeout*/) { + char** /*service_config_json*/, int /*query_timeout*/, + std::shared_ptr /*combiner*/) { addr_req* r = new addr_req(); r->addr = gpr_strdup(addr); r->on_done = on_done; @@ -154,7 +155,7 @@ grpc_ares_request* my_dns_lookup_ares( return nullptr; } -static void my_cancel_ares_request(grpc_ares_request* request) { +static void my_cancel_ares_request_locked(grpc_ares_request* request) { GPR_ASSERT(request == nullptr); } //////////////////////////////////////////////////////////////////////////////// @@ -729,8 +730,8 @@ DEFINE_PROTO_FUZZER(const api_fuzzer::Msg& msg) { grpc_core::Executor::SetThreadingAll(false); } grpc_set_resolver_impl(&fuzzer_resolver); - grpc_dns_lookup_ares = my_dns_lookup_ares; - grpc_cancel_ares_request = my_cancel_ares_request; + grpc_dns_lookup_ares_locked = my_dns_lookup_ares_locked; + grpc_cancel_ares_request_locked = my_cancel_ares_request_locked; GPR_ASSERT(g_channel == nullptr); GPR_ASSERT(g_server == nullptr); diff --git a/test/core/end2end/goaway_server_test.cc b/test/core/end2end/goaway_server_test.cc index 9924ab61656..4d4509bc53a 100644 --- a/test/core/end2end/goaway_server_test.cc +++ b/test/core/end2end/goaway_server_test.cc @@ -43,14 +43,15 @@ static void* tag(intptr_t t) { return reinterpret_cast(t); } static gpr_mu g_mu; static int g_resolve_port = -1; -static grpc_ares_request* (*iomgr_dns_lookup_ares)( +static grpc_ares_request* (*iomgr_dns_lookup_ares_locked)( const char* dns_server, const char* addr, const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, std::unique_ptr* addresses, std::unique_ptr* balancer_addresses, - char** service_config_json, int query_timeout_ms); + char** service_config_json, int query_timeout_ms, + std::shared_ptr combiner); -static void (*iomgr_cancel_ares_request)(grpc_ares_request* request); +static void (*iomgr_cancel_ares_request_locked)(grpc_ares_request* request); static void set_resolve_port(int port) { gpr_mu_lock(&g_mu); @@ -100,16 +101,18 @@ static grpc_error_handle my_blocking_resolve_address( static grpc_address_resolver_vtable test_resolver = { my_resolve_address, my_blocking_resolve_address}; -static grpc_ares_request* my_dns_lookup_ares( +static grpc_ares_request* my_dns_lookup_ares_locked( const char* dns_server, const char* addr, const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, std::unique_ptr* addresses, std::unique_ptr* balancer_addresses, - char** service_config_json, int query_timeout_ms) { + char** service_config_json, int query_timeout_ms, + std::shared_ptr work_serializer) { if (0 != strcmp(addr, "test")) { - return iomgr_dns_lookup_ares( + return iomgr_dns_lookup_ares_locked( dns_server, addr, default_port, interested_parties, on_done, addresses, - balancer_addresses, service_config_json, query_timeout_ms); + balancer_addresses, service_config_json, query_timeout_ms, + std::move(work_serializer)); } grpc_error_handle error = GRPC_ERROR_NONE; @@ -131,9 +134,9 @@ static grpc_ares_request* my_dns_lookup_ares( return nullptr; } -static void my_cancel_ares_request(grpc_ares_request* request) { +static void my_cancel_ares_request_locked(grpc_ares_request* request) { if (request != nullptr) { - iomgr_cancel_ares_request(request); + iomgr_cancel_ares_request_locked(request); } } @@ -149,10 +152,10 @@ int main(int argc, char** argv) { grpc_init(); default_resolver = grpc_resolve_address_impl; grpc_set_resolver_impl(&test_resolver); - iomgr_dns_lookup_ares = grpc_dns_lookup_ares; - iomgr_cancel_ares_request = grpc_cancel_ares_request; - grpc_dns_lookup_ares = my_dns_lookup_ares; - grpc_cancel_ares_request = my_cancel_ares_request; + iomgr_dns_lookup_ares_locked = grpc_dns_lookup_ares_locked; + iomgr_cancel_ares_request_locked = grpc_cancel_ares_request_locked; + grpc_dns_lookup_ares_locked = my_dns_lookup_ares_locked; + grpc_cancel_ares_request_locked = my_cancel_ares_request_locked; int was_cancelled1; int was_cancelled2;