diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc index 6df35cbb964..0a51c586ede 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc @@ -186,7 +186,7 @@ void AresDnsResolver::ShutdownLocked() { grpc_timer_cancel(&next_resolution_timer_); } if (pending_request_ != nullptr) { - grpc_cancel_ares_request_locked(pending_request_); + grpc_cancel_ares_request(pending_request_); } } @@ -443,12 +443,12 @@ void AresDnsResolver::StartResolvingLocked() { GPR_ASSERT(!resolving_); resolving_ = true; service_config_json_ = nullptr; - pending_request_ = grpc_dns_lookup_ares_locked( + pending_request_ = grpc_dns_lookup_ares( dns_server_.c_str(), name_to_resolve_.c_str(), kDefaultSecurePort, interested_parties_, &on_resolved_, &addresses_, enable_srv_queries_ ? &balancer_addresses_ : nullptr, request_service_config_ ? &service_config_json_ : nullptr, - query_timeout_ms_, work_serializer_); + query_timeout_ms_); last_resolution_timestamp_ = ExecCtx::Get()->Now(); GRPC_CARES_TRACE_LOG("resolver:%p Started resolving. pending_request_:%p", this, pending_request_); diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h index 41b3ae0dba4..a1374af6d4a 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h @@ -23,8 +23,8 @@ #include +#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h" #include "src/core/lib/iomgr/pollset_set.h" -#include "src/core/lib/iomgr/work_serializer.h" namespace grpc_core { @@ -36,18 +36,23 @@ class GrpcPolledFd { public: virtual ~GrpcPolledFd() {} /* Called when c-ares library is interested and there's no pending callback */ - virtual void RegisterForOnReadableLocked(grpc_closure* read_closure) = 0; + virtual void RegisterForOnReadableLocked(grpc_closure* read_closure) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) = 0; /* Called when c-ares library is interested and there's no pending callback */ - virtual void RegisterForOnWriteableLocked(grpc_closure* write_closure) = 0; + virtual void RegisterForOnWriteableLocked(grpc_closure* write_closure) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) = 0; /* Indicates if there is data left even after just being read from */ - virtual bool IsFdStillReadableLocked() = 0; + virtual bool IsFdStillReadableLocked() + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) = 0; /* Called once and only once. Must cause cancellation of any pending * read/write callbacks. */ - virtual void ShutdownLocked(grpc_error_handle error) = 0; + virtual void ShutdownLocked(grpc_error_handle error) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) = 0; /* Get the underlying ares_socket_t that this was created from */ - virtual ares_socket_t GetWrappedAresSocketLocked() = 0; + virtual ares_socket_t GetWrappedAresSocketLocked() + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) = 0; /* A unique name, for logging */ - virtual const char* GetName() = 0; + virtual const char* GetName() const = 0; }; /* A GrpcPolledFdFactory is 1-to-1 with and owned by the @@ -59,14 +64,19 @@ class GrpcPolledFdFactory { virtual ~GrpcPolledFdFactory() {} /* Creates a new wrapped fd for the current platform */ virtual GrpcPolledFd* NewGrpcPolledFdLocked( - ares_socket_t as, grpc_pollset_set* driver_pollset_set, - std::shared_ptr work_serializer) = 0; + ares_socket_t as, grpc_pollset_set* driver_pollset_set) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) = 0; /* Optionally configures the ares channel after creation */ - virtual void ConfigureAresChannelLocked(ares_channel channel) = 0; + virtual void ConfigureAresChannelLocked(ares_channel channel) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) = 0; }; -std::unique_ptr NewGrpcPolledFdFactory( - std::shared_ptr work_serializer); +/* Creates a new polled fd factory. + * Note that even though ownership of mu is not transferred, the mu + * parameter is guaranteed to be alive for the the whole lifetime of + * the resulting GrpcPolledFdFactory as well as any GrpcPolledFd + * returned by the factory. */ +std::unique_ptr NewGrpcPolledFdFactory(Mutex* mu); } // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc index e25bd55b225..68f1c2bbcb4 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc @@ -59,49 +59,53 @@ class GrpcPolledFdPosix : public GrpcPolledFd { grpc_fd_orphan(fd_, nullptr, &phony_release_fd, "c-ares query finished"); } - void RegisterForOnReadableLocked(grpc_closure* read_closure) override { + void RegisterForOnReadableLocked(grpc_closure* read_closure) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) override { grpc_fd_notify_on_read(fd_, read_closure); } - void RegisterForOnWriteableLocked(grpc_closure* write_closure) override { + void RegisterForOnWriteableLocked(grpc_closure* write_closure) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) override { grpc_fd_notify_on_write(fd_, write_closure); } - bool IsFdStillReadableLocked() override { + bool IsFdStillReadableLocked() + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) override { size_t bytes_available = 0; return ioctl(grpc_fd_wrapped_fd(fd_), FIONREAD, &bytes_available) == 0 && bytes_available > 0; } - void ShutdownLocked(grpc_error_handle error) override { + void ShutdownLocked(grpc_error_handle error) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) override { grpc_fd_shutdown(fd_, error); } - ares_socket_t GetWrappedAresSocketLocked() override { return as_; } + ares_socket_t GetWrappedAresSocketLocked() + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) override { + return as_; + } - const char* GetName() override { return name_.c_str(); } + const char* GetName() const override { return name_.c_str(); } private: - std::string name_; - ares_socket_t as_; - grpc_fd* fd_; - grpc_pollset_set* driver_pollset_set_; + const std::string name_; + const ares_socket_t as_; + grpc_fd* fd_ ABSL_GUARDED_BY(&grpc_ares_request::mu); + grpc_pollset_set* driver_pollset_set_ ABSL_GUARDED_BY(&grpc_ares_request::mu); }; class GrpcPolledFdFactoryPosix : public GrpcPolledFdFactory { public: GrpcPolledFd* NewGrpcPolledFdLocked( - ares_socket_t as, grpc_pollset_set* driver_pollset_set, - std::shared_ptr /*work_serializer*/) override { + ares_socket_t as, grpc_pollset_set* driver_pollset_set) override { return new GrpcPolledFdPosix(as, driver_pollset_set); } void ConfigureAresChannelLocked(ares_channel /*channel*/) override {} }; -std::unique_ptr NewGrpcPolledFdFactory( - std::shared_ptr work_serializer) { - (void)work_serializer; +std::unique_ptr NewGrpcPolledFdFactory(Mutex* /* mu */) { return absl::make_unique(); } diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc index a5c7a716950..eab25f30808 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc @@ -41,7 +41,6 @@ #include "src/core/lib/iomgr/sockaddr_windows.h" #include "src/core/lib/iomgr/socket_windows.h" #include "src/core/lib/iomgr/tcp_windows.h" -#include "src/core/lib/iomgr/work_serializer.h" #include "src/core/lib/slice/slice_internal.h" /* TODO(apolcyn): remove this hack after fixing upstream. @@ -100,10 +99,9 @@ class GrpcPolledFdWindows { WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY, }; - GrpcPolledFdWindows(ares_socket_t as, - std::shared_ptr work_serializer, - int address_family, int socket_type) - : work_serializer_(std::move(work_serializer)), + GrpcPolledFdWindows(ares_socket_t as, Mutex* mu, int address_family, + int socket_type) + : mu_(mu), read_buf_(grpc_empty_slice()), write_buf_(grpc_empty_slice()), tcp_write_state_(WRITE_IDLE), @@ -260,7 +258,7 @@ class GrpcPolledFdWindows { return grpc_winsocket_wrapped_socket(winsocket_); } - const char* GetName() { return name_.c_str(); } + const char* GetName() const { return name_.c_str(); } ares_ssize_t RecvFrom(WSAErrorContext* wsa_error_ctx, void* data, ares_socket_t data_len, int flags, @@ -422,12 +420,8 @@ class GrpcPolledFdWindows { static void OnTcpConnect(void* arg, grpc_error_handle error) { GrpcPolledFdWindows* grpc_polled_fd = static_cast(arg); - (void)GRPC_ERROR_REF(error); // ref owned by lambda - grpc_polled_fd->work_serializer_->Run( - [grpc_polled_fd, error]() { - grpc_polled_fd->OnTcpConnectLocked(error); - }, - DEBUG_LOCATION); + MutexLock lock(grpc_polled_fd->mu_); + grpc_polled_fd->OnTcpConnectLocked(error); } void OnTcpConnectLocked(grpc_error_handle error) { @@ -469,7 +463,6 @@ class GrpcPolledFdWindows { if (pending_continue_register_for_on_writeable_locked_) { ContinueRegisterForOnWriteableLocked(); } - GRPC_ERROR_UNREF(error); } int Connect(WSAErrorContext* wsa_error_ctx, const struct sockaddr* target, @@ -575,10 +568,9 @@ class GrpcPolledFdWindows { static void OnIocpReadable(void* arg, grpc_error_handle error) { GrpcPolledFdWindows* polled_fd = static_cast(arg); - (void)GRPC_ERROR_REF(error); // ref owned by lambda - polled_fd->work_serializer_->Run( - [polled_fd, error]() { polled_fd->OnIocpReadableLocked(error); }, - DEBUG_LOCATION); + (void)GRPC_ERROR_REF(error); + MutexLock lock(polled_fd->mu_); + polled_fd->OnIocpReadableLocked(error); } // TODO(apolcyn): improve this error handling to be less conversative. @@ -620,10 +612,9 @@ class GrpcPolledFdWindows { static void OnIocpWriteable(void* arg, grpc_error_handle error) { GrpcPolledFdWindows* polled_fd = static_cast(arg); - (void)GRPC_ERROR_REF(error); // error owned by lambda - polled_fd->work_serializer_->Run( - [polled_fd, error]() { polled_fd->OnIocpWriteableLocked(error); }, - DEBUG_LOCATION); + (void)GRPC_ERROR_REF(error); + MutexLock lock(polled_fd->mu_); + polled_fd->OnIocpWriteableLocked(error); } void OnIocpWriteableLocked(grpc_error_handle error) { @@ -658,7 +649,7 @@ class GrpcPolledFdWindows { void set_gotten_into_driver_list() { gotten_into_driver_list_ = true; } private: - std::shared_ptr work_serializer_; + Mutex* mu_; char recv_from_source_addr_[200]; ares_socklen_t recv_from_source_addr_len_; grpc_slice read_buf_; @@ -671,7 +662,7 @@ class GrpcPolledFdWindows { grpc_winsocket* winsocket_; // tcp_write_state_ is only used on TCP GrpcPolledFds WriteState tcp_write_state_; - std::string name_; + const std::string name_; bool gotten_into_driver_list_; int address_family_; int socket_type_; @@ -700,8 +691,7 @@ struct SockToPolledFdEntry { * with a GrpcPolledFdWindows factory and event driver */ class SockToPolledFdMap { public: - explicit SockToPolledFdMap(std::shared_ptr work_serializer) - : work_serializer_(std::move(work_serializer)) {} + explicit SockToPolledFdMap(Mutex* mu) : mu_(mu) {} ~SockToPolledFdMap() { GPR_ASSERT(head_ == nullptr); } @@ -759,7 +749,7 @@ class SockToPolledFdMap { } grpc_tcp_set_non_block(s); GrpcPolledFdWindows* polled_fd = - new GrpcPolledFdWindows(s, map->work_serializer_, af, type); + new GrpcPolledFdWindows(s, map->mu_, af, type); GRPC_CARES_TRACE_LOG( "fd:|%s| created with params af:%d type:%d protocol:%d", polled_fd->GetName(), af, type, protocol); @@ -814,8 +804,8 @@ class SockToPolledFdMap { } private: + Mutex* mu_; SockToPolledFdEntry* head_ = nullptr; - std::shared_ptr work_serializer_; }; const struct ares_socket_functions custom_ares_sock_funcs = { @@ -856,21 +846,18 @@ class GrpcPolledFdWindowsWrapper : public GrpcPolledFd { return wrapped_->GetWrappedAresSocketLocked(); } - const char* GetName() override { return wrapped_->GetName(); } + const char* GetName() const override { return wrapped_->GetName(); } private: - GrpcPolledFdWindows* wrapped_; + GrpcPolledFdWindows* const wrapped_; }; class GrpcPolledFdFactoryWindows : public GrpcPolledFdFactory { public: - explicit GrpcPolledFdFactoryWindows( - std::shared_ptr work_serializer) - : sock_to_polled_fd_map_(std::move(work_serializer)) {} + explicit GrpcPolledFdFactoryWindows(Mutex* mu) : sock_to_polled_fd_map_(mu) {} GrpcPolledFd* NewGrpcPolledFdLocked( - ares_socket_t as, grpc_pollset_set* driver_pollset_set, - std::shared_ptr work_serializer) override { + ares_socket_t as, grpc_pollset_set* driver_pollset_set) override { GrpcPolledFdWindows* polled_fd = sock_to_polled_fd_map_.LookupPolledFd(as); // Set a flag so that the virtual socket "close" method knows it // doesn't need to call ShutdownLocked, since now the driver will. @@ -887,10 +874,8 @@ class GrpcPolledFdFactoryWindows : public GrpcPolledFdFactory { SockToPolledFdMap sock_to_polled_fd_map_; }; -std::unique_ptr NewGrpcPolledFdFactory( - std::shared_ptr work_serializer) { - return absl::make_unique( - std::move(work_serializer)); +std::unique_ptr NewGrpcPolledFdFactory(Mutex* mu) { + return absl::make_unique(mu); } } // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc index fe5a42af8a2..03ae3c54aa8 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc @@ -57,55 +57,63 @@ grpc_core::TraceFlag grpc_trace_cares_address_sorting(false, grpc_core::TraceFlag grpc_trace_cares_resolver(false, "cares_resolver"); typedef struct fd_node { + /* default constructor exists only for linked list manipulation */ + fd_node() : ev_driver(nullptr) {} + + explicit fd_node(grpc_ares_ev_driver* ev_driver) : ev_driver(ev_driver) {} + /** the owner of this fd node */ - grpc_ares_ev_driver* ev_driver; + grpc_ares_ev_driver* const ev_driver; /** a closure wrapping on_readable_locked, which should be invoked when the grpc_fd in this node becomes readable. */ - grpc_closure read_closure; + grpc_closure read_closure ABSL_GUARDED_BY(&grpc_ares_request::mu); /** a closure wrapping on_writable_locked, which should be invoked when the grpc_fd in this node becomes writable. */ - grpc_closure write_closure; + grpc_closure write_closure ABSL_GUARDED_BY(&grpc_ares_request::mu); /** next fd node in the list */ - struct fd_node* next; + struct fd_node* next ABSL_GUARDED_BY(&grpc_ares_request::mu); /** wrapped fd that's polled by grpc's poller for the current platform */ - grpc_core::GrpcPolledFd* grpc_polled_fd; + grpc_core::GrpcPolledFd* grpc_polled_fd + ABSL_GUARDED_BY(&grpc_ares_request::mu); /** if the readable closure has been registered */ - bool readable_registered; + bool readable_registered ABSL_GUARDED_BY(&grpc_ares_request::mu); /** if the writable closure has been registered */ - bool writable_registered; + bool writable_registered ABSL_GUARDED_BY(&grpc_ares_request::mu); /** if the fd has been shutdown yet from grpc iomgr perspective */ - bool already_shutdown; + bool already_shutdown ABSL_GUARDED_BY(&grpc_ares_request::mu); } fd_node; struct grpc_ares_ev_driver { + explicit grpc_ares_ev_driver(grpc_ares_request* request) : request(request) {} + /** the ares_channel owned by this event driver */ - ares_channel channel; + ares_channel channel ABSL_GUARDED_BY(&grpc_ares_request::mu); /** pollset set for driving the IO events of the channel */ - grpc_pollset_set* pollset_set; + grpc_pollset_set* pollset_set ABSL_GUARDED_BY(&grpc_ares_request::mu); /** refcount of the event driver */ gpr_refcount refs; - /** work_serializer to synchronize c-ares and I/O callbacks on */ - std::shared_ptr work_serializer; /** a list of grpc_fd that this event driver is currently using. */ - fd_node* fds; + fd_node* fds ABSL_GUARDED_BY(&grpc_ares_request::mu); /** is this event driver being shut down */ - bool shutting_down; + bool shutting_down ABSL_GUARDED_BY(&grpc_ares_request::mu); /** request object that's using this ev driver */ - grpc_ares_request* request; + grpc_ares_request* const request; /** Owned by the ev_driver. Creates new GrpcPolledFd's */ - std::unique_ptr polled_fd_factory; + std::unique_ptr polled_fd_factory + ABSL_GUARDED_BY(&grpc_ares_request::mu); /** query timeout in milliseconds */ - int query_timeout_ms; + int query_timeout_ms ABSL_GUARDED_BY(&grpc_ares_request::mu); /** alarm to cancel active queries */ - grpc_timer query_timeout; + grpc_timer query_timeout ABSL_GUARDED_BY(&grpc_ares_request::mu); /** cancels queries on a timeout */ - grpc_closure on_timeout_locked; + grpc_closure on_timeout_locked ABSL_GUARDED_BY(&grpc_ares_request::mu); /** alarm to poll ares_process on in case fd events don't happen */ - grpc_timer ares_backup_poll_alarm; + grpc_timer ares_backup_poll_alarm ABSL_GUARDED_BY(&grpc_ares_request::mu); /** polls ares_process on a periodic timer */ - grpc_closure on_ares_backup_poll_alarm_locked; + grpc_closure on_ares_backup_poll_alarm_locked + ABSL_GUARDED_BY(&grpc_ares_request::mu); }; // TODO(apolcyn): make grpc_ares_hostbyname_request a sub-class @@ -125,8 +133,10 @@ typedef struct grpc_ares_hostbyname_request { const char* qtype; } grpc_ares_hostbyname_request; -static void grpc_ares_request_ref_locked(grpc_ares_request* r); -static void grpc_ares_request_unref_locked(grpc_ares_request* r); +static void grpc_ares_request_ref_locked(grpc_ares_request* r) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(r->mu); +static void grpc_ares_request_unref_locked(grpc_ares_request* r) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(r->mu); // TODO(apolcyn): as a part of C++-ification, find a way to // organize per-query and per-resolution information in such a way @@ -153,14 +163,19 @@ class GrpcAresQuery { }; static grpc_ares_ev_driver* grpc_ares_ev_driver_ref( - grpc_ares_ev_driver* ev_driver) { + grpc_ares_ev_driver* ev_driver) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) { GRPC_CARES_TRACE_LOG("request:%p Ref ev_driver %p", ev_driver->request, ev_driver); gpr_ref(&ev_driver->refs); return ev_driver; } -static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver* ev_driver) { +static void grpc_ares_complete_request_locked(grpc_ares_request* r) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(r->mu); + +static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver* ev_driver) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) { GRPC_CARES_TRACE_LOG("request:%p Unref ev_driver %p", ev_driver->request, ev_driver); if (gpr_unref(&ev_driver->refs)) { @@ -173,17 +188,19 @@ static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver* ev_driver) { } } -static void fd_node_destroy_locked(fd_node* fdn) { +static void fd_node_destroy_locked(fd_node* fdn) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) { GRPC_CARES_TRACE_LOG("request:%p delete fd: %s", fdn->ev_driver->request, fdn->grpc_polled_fd->GetName()); GPR_ASSERT(!fdn->readable_registered); GPR_ASSERT(!fdn->writable_registered); GPR_ASSERT(fdn->already_shutdown); delete fdn->grpc_polled_fd; - gpr_free(fdn); + delete fdn; } -static void fd_node_shutdown_locked(fd_node* fdn, const char* reason) { +static void fd_node_shutdown_locked(fd_node* fdn, const char* reason) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) { if (!fdn->already_shutdown) { fdn->already_shutdown = true; fdn->grpc_polled_fd->ShutdownLocked( @@ -192,7 +209,8 @@ static void fd_node_shutdown_locked(fd_node* fdn, const char* reason) { } void grpc_ares_ev_driver_on_queries_complete_locked( - grpc_ares_ev_driver* ev_driver) { + grpc_ares_ev_driver* ev_driver) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) { // We mark the event driver as being shut down. // grpc_ares_notify_on_event_locked will shut down any remaining // fds. @@ -202,7 +220,8 @@ void grpc_ares_ev_driver_on_queries_complete_locked( grpc_ares_ev_driver_unref(ev_driver); } -void grpc_ares_ev_driver_shutdown_locked(grpc_ares_ev_driver* ev_driver) { +void grpc_ares_ev_driver_shutdown_locked(grpc_ares_ev_driver* ev_driver) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) { ev_driver->shutting_down = true; fd_node* fn = ev_driver->fds; while (fn != nullptr) { @@ -213,7 +232,8 @@ void grpc_ares_ev_driver_shutdown_locked(grpc_ares_ev_driver* ev_driver) { // Search fd in the fd_node list head. This is an O(n) search, the max possible // value of n is ARES_GETSOCK_MAXNUM (16). n is typically 1 - 2 in our tests. -static fd_node* pop_fd_node_locked(fd_node** head, ares_socket_t as) { +static fd_node* pop_fd_node_locked(fd_node** head, ares_socket_t as) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) { fd_node phony_head; phony_head.next = *head; fd_node* node = &phony_head; @@ -230,7 +250,8 @@ static fd_node* pop_fd_node_locked(fd_node** head, ares_socket_t as) { } static grpc_millis calculate_next_ares_backup_poll_alarm_ms( - grpc_ares_ev_driver* driver) { + grpc_ares_ev_driver* driver) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) { // An alternative here could be to use ares_timeout to try to be more // accurate, but that would require using "struct timeval"'s, which just makes // things a bit more complicated. So just poll every second, as suggested @@ -244,8 +265,9 @@ static grpc_millis calculate_next_ares_backup_poll_alarm_ms( grpc_core::ExecCtx::Get()->Now(); } -static void on_timeout_locked(grpc_ares_ev_driver* driver, - grpc_error_handle error) { +static void on_timeout(void* arg, grpc_error_handle error) { + grpc_ares_ev_driver* driver = static_cast(arg); + grpc_core::MutexLock lock(&driver->request->mu); GRPC_CARES_TRACE_LOG( "request:%p ev_driver=%p on_timeout_locked. driver->shutting_down=%d. " "err=%s", @@ -255,28 +277,10 @@ static void on_timeout_locked(grpc_ares_ev_driver* driver, grpc_ares_ev_driver_shutdown_locked(driver); } grpc_ares_ev_driver_unref(driver); - GRPC_ERROR_UNREF(error); } -static void on_timeout(void* arg, grpc_error_handle error) { - grpc_ares_ev_driver* driver = static_cast(arg); - (void)GRPC_ERROR_REF(error); // ref owned by lambda - driver->work_serializer->Run( - [driver, error]() { on_timeout_locked(driver, error); }, DEBUG_LOCATION); -} - -static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver); - -static void on_ares_backup_poll_alarm_locked(grpc_ares_ev_driver* driver, - grpc_error_handle error); - -static void on_ares_backup_poll_alarm(void* arg, grpc_error_handle error) { - grpc_ares_ev_driver* driver = static_cast(arg); - (void)GRPC_ERROR_REF(error); - driver->work_serializer->Run( - [driver, error]() { on_ares_backup_poll_alarm_locked(driver, error); }, - DEBUG_LOCATION); -} +static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu); /* In case of non-responsive DNS servers, dropped packets, etc., c-ares has * intelligent timeout and retry logic, which we can take advantage of by @@ -286,8 +290,9 @@ static void on_ares_backup_poll_alarm(void* arg, grpc_error_handle error) { * b) when some time has passed without fd events having happened * For the latter, we use this backup poller. Also see * https://github.com/grpc/grpc/pull/17688 description for more details. */ -static void on_ares_backup_poll_alarm_locked(grpc_ares_ev_driver* driver, - grpc_error_handle error) { +static void on_ares_backup_poll_alarm(void* arg, grpc_error_handle error) { + grpc_ares_ev_driver* driver = static_cast(arg); + grpc_core::MutexLock lock(&driver->request->mu); GRPC_CARES_TRACE_LOG( "request:%p ev_driver=%p on_ares_backup_poll_alarm_locked. " "driver->shutting_down=%d. " @@ -325,10 +330,11 @@ static void on_ares_backup_poll_alarm_locked(grpc_ares_ev_driver* driver, grpc_ares_notify_on_event_locked(driver); } grpc_ares_ev_driver_unref(driver); - GRPC_ERROR_UNREF(error); } -static void on_readable_locked(fd_node* fdn, grpc_error_handle error) { +static void on_readable(void* arg, grpc_error_handle error) { + fd_node* fdn = static_cast(arg); + grpc_core::MutexLock lock(&fdn->ev_driver->request->mu); GPR_ASSERT(fdn->readable_registered); grpc_ares_ev_driver* ev_driver = fdn->ev_driver; const ares_socket_t as = fdn->grpc_polled_fd->GetWrappedAresSocketLocked(); @@ -350,17 +356,11 @@ static void on_readable_locked(fd_node* fdn, grpc_error_handle error) { } grpc_ares_notify_on_event_locked(ev_driver); grpc_ares_ev_driver_unref(ev_driver); - GRPC_ERROR_UNREF(error); } -static void on_readable(void* arg, grpc_error_handle error) { +static void on_writable(void* arg, grpc_error_handle error) { fd_node* fdn = static_cast(arg); - (void)GRPC_ERROR_REF(error); /* ref owned by lambda */ - fdn->ev_driver->work_serializer->Run( - [fdn, error]() { on_readable_locked(fdn, error); }, DEBUG_LOCATION); -} - -static void on_writable_locked(fd_node* fdn, grpc_error_handle error) { + grpc_core::MutexLock lock(&fdn->ev_driver->request->mu); GPR_ASSERT(fdn->writable_registered); grpc_ares_ev_driver* ev_driver = fdn->ev_driver; const ares_socket_t as = fdn->grpc_polled_fd->GetWrappedAresSocketLocked(); @@ -380,19 +380,12 @@ static void on_writable_locked(fd_node* fdn, grpc_error_handle error) { } grpc_ares_notify_on_event_locked(ev_driver); grpc_ares_ev_driver_unref(ev_driver); - GRPC_ERROR_UNREF(error); -} - -static void on_writable(void* arg, grpc_error_handle error) { - fd_node* fdn = static_cast(arg); - (void)GRPC_ERROR_REF(error); /* ref owned by lambda */ - fdn->ev_driver->work_serializer->Run( - [fdn, error]() { on_writable_locked(fdn, error); }, DEBUG_LOCATION); } // Get the file descriptors used by the ev_driver's ares channel, register // driver_closure with these filedescriptors. -static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) { +static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) { fd_node* new_list = nullptr; if (!ev_driver->shutting_down) { ares_socket_t socks[ARES_GETSOCK_MAXNUM]; @@ -404,13 +397,12 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) { fd_node* fdn = pop_fd_node_locked(&ev_driver->fds, socks[i]); // Create a new fd_node if sock[i] is not in the fd_node list. if (fdn == nullptr) { - fdn = static_cast(gpr_malloc(sizeof(fd_node))); + fdn = new fd_node(ev_driver); fdn->grpc_polled_fd = ev_driver->polled_fd_factory->NewGrpcPolledFdLocked( - socks[i], ev_driver->pollset_set, ev_driver->work_serializer); + socks[i], ev_driver->pollset_set); GRPC_CARES_TRACE_LOG("request:%p new fd: %s", ev_driver->request, fdn->grpc_polled_fd->GetName()); - fdn->ev_driver = ev_driver; fdn->readable_registered = false; fdn->writable_registered = false; fdn->already_shutdown = false; @@ -466,7 +458,8 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) { ev_driver->fds = new_list; } -void grpc_ares_ev_driver_start_locked(grpc_ares_ev_driver* ev_driver) { +void grpc_ares_ev_driver_start_locked(grpc_ares_ev_driver* ev_driver) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) { grpc_ares_notify_on_event_locked(ev_driver); // Initialize overall DNS resolution timeout alarm grpc_millis timeout = @@ -501,10 +494,9 @@ void (*grpc_ares_test_only_inject_config)(ares_channel channel) = grpc_error_handle grpc_ares_ev_driver_create_locked( grpc_ares_ev_driver** ev_driver, grpc_pollset_set* pollset_set, - int query_timeout_ms, - std::shared_ptr work_serializer, - grpc_ares_request* request) { - *ev_driver = new grpc_ares_ev_driver(); + int query_timeout_ms, grpc_ares_request* request) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(request->mu) { + *ev_driver = new grpc_ares_ev_driver(request); ares_options opts; memset(&opts, 0, sizeof(opts)); opts.flags |= ARES_FLAG_STAYOPEN; @@ -514,17 +506,15 @@ grpc_error_handle grpc_ares_ev_driver_create_locked( if (status != ARES_SUCCESS) { grpc_error_handle err = GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrCat( "Failed to init ares channel. C-ares error: ", ares_strerror(status))); - gpr_free(*ev_driver); + delete *ev_driver; return err; } - (*ev_driver)->work_serializer = std::move(work_serializer); gpr_ref_init(&(*ev_driver)->refs, 1); (*ev_driver)->pollset_set = pollset_set; (*ev_driver)->fds = nullptr; (*ev_driver)->shutting_down = false; - (*ev_driver)->request = request; (*ev_driver)->polled_fd_factory = - grpc_core::NewGrpcPolledFdFactory((*ev_driver)->work_serializer); + grpc_core::NewGrpcPolledFdFactory(&(*ev_driver)->request->mu); (*ev_driver) ->polled_fd_factory->ConfigureAresChannelLocked((*ev_driver)->channel); (*ev_driver)->query_timeout_ms = query_timeout_ms; @@ -570,18 +560,21 @@ void grpc_cares_wrapper_address_sorting_sort(const grpc_ares_request* r, } } -static void grpc_ares_request_ref_locked(grpc_ares_request* r) { +static void grpc_ares_request_ref_locked(grpc_ares_request* r) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(r->mu) { r->pending_queries++; } -static void grpc_ares_request_unref_locked(grpc_ares_request* r) { +static void grpc_ares_request_unref_locked(grpc_ares_request* r) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(r->mu) { r->pending_queries--; if (r->pending_queries == 0u) { grpc_ares_ev_driver_on_queries_complete_locked(r->ev_driver); } } -void grpc_ares_complete_request_locked(grpc_ares_request* r) { +void grpc_ares_complete_request_locked(grpc_ares_request* r) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(r->mu) { /* Invoke on_done callback and destroy the request */ r->ev_driver = nullptr; @@ -606,7 +599,8 @@ void grpc_ares_complete_request_locked(grpc_ares_request* r) { * qtype must outlive it. */ static grpc_ares_hostbyname_request* create_hostbyname_request_locked( grpc_ares_request* parent_request, const char* host, uint16_t port, - bool is_balancer, const char* qtype) { + bool is_balancer, const char* qtype) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(parent_request->mu) { GRPC_CARES_TRACE_LOG( "request:%p create_hostbyname_request_locked host:%s port:%d " "is_balancer:%d qtype:%s", @@ -621,15 +615,18 @@ static grpc_ares_hostbyname_request* create_hostbyname_request_locked( return hr; } -static void destroy_hostbyname_request_locked( - grpc_ares_hostbyname_request* hr) { +static void destroy_hostbyname_request_locked(grpc_ares_hostbyname_request* hr) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(hr->parent_request->mu) { grpc_ares_request_unref_locked(hr->parent_request); gpr_free(hr->host); delete hr; } static void on_hostbyname_done_locked(void* arg, int status, int /*timeouts*/, - struct hostent* hostent) { + struct hostent* hostent) + ABSL_NO_THREAD_SAFETY_ANALYSIS { + // This callback is invoked from the c-ares library, so disable thread safety + // analysis. Note that we are guaranteed to be holding r->mu, though. grpc_ares_hostbyname_request* hr = static_cast(arg); grpc_ares_request* r = hr->parent_request; @@ -702,7 +699,10 @@ static void on_hostbyname_done_locked(void* arg, int status, int /*timeouts*/, } static void on_srv_query_done_locked(void* arg, int status, int /*timeouts*/, - unsigned char* abuf, int alen) { + unsigned char* abuf, + int alen) ABSL_NO_THREAD_SAFETY_ANALYSIS { + // This callback is invoked from the c-ares library, so disable thread safety + // analysis. Note that we are guaranteed to be holding r->mu, though. GrpcAresQuery* q = static_cast(arg); grpc_ares_request* r = q->parent_request(); if (status == ARES_SUCCESS) { @@ -749,7 +749,10 @@ static void on_srv_query_done_locked(void* arg, int status, int /*timeouts*/, static const char g_service_config_attribute_prefix[] = "grpc_config="; static void on_txt_done_locked(void* arg, int status, int /*timeouts*/, - unsigned char* buf, int len) { + unsigned char* buf, + int len) ABSL_NO_THREAD_SAFETY_ANALYSIS { + // This callback is invoked from the c-ares library, so disable thread safety + // analysis. Note that we are guaranteed to be holding r->mu, though. GrpcAresQuery* q = static_cast(arg); std::unique_ptr query_deleter(q); grpc_ares_request* r = q->parent_request(); @@ -806,8 +809,7 @@ fail: void grpc_dns_lookup_ares_continue_after_check_localhost_and_ip_literals_locked( grpc_ares_request* r, const char* dns_server, const char* name, const char* default_port, grpc_pollset_set* interested_parties, - int query_timeout_ms, - std::shared_ptr work_serializer) { + int query_timeout_ms) ABSL_EXCLUSIVE_LOCKS_REQUIRED(r->mu) { grpc_error_handle error = GRPC_ERROR_NONE; grpc_ares_hostbyname_request* hr = nullptr; /* parse name, splitting it into host and port parts */ @@ -829,8 +831,7 @@ void grpc_dns_lookup_ares_continue_after_check_localhost_and_ip_literals_locked( port = default_port; } error = grpc_ares_ev_driver_create_locked(&r->ev_driver, interested_parties, - query_timeout_ms, - std::move(work_serializer), r); + query_timeout_ms, r); if (error != GRPC_ERROR_NONE) goto error_cleanup; // If dns_server is specified, use it. if (dns_server != nullptr && dns_server[0] != '\0') { @@ -1029,21 +1030,21 @@ static bool grpc_ares_maybe_resolve_localhost_manually_locked( } #endif /* GRPC_ARES_RESOLVE_LOCALHOST_MANUALLY */ -static grpc_ares_request* grpc_dns_lookup_ares_locked_impl( +static grpc_ares_request* grpc_dns_lookup_ares_impl( const char* dns_server, const char* name, const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, std::unique_ptr* addrs, std::unique_ptr* balancer_addrs, - char** service_config_json, int query_timeout_ms, - std::shared_ptr work_serializer) { + char** service_config_json, int query_timeout_ms) { grpc_ares_request* r = new grpc_ares_request(); + grpc_core::MutexLock lock(&r->mu); r->ev_driver = nullptr; r->on_done = on_done; r->addresses_out = addrs; r->balancer_addresses_out = balancer_addrs; r->service_config_json_out = service_config_json; GRPC_CARES_TRACE_LOG( - "request:%p c-ares grpc_dns_lookup_ares_locked_impl name=%s, " + "request:%p c-ares grpc_dns_lookup_ares_impl name=%s, " "default_port=%s", r, name, default_port); // Early out if the target is an ipv4 or ipv6 literal. @@ -1066,29 +1067,28 @@ static grpc_ares_request* grpc_dns_lookup_ares_locked_impl( } // Look up name using c-ares lib. grpc_dns_lookup_ares_continue_after_check_localhost_and_ip_literals_locked( - r, dns_server, name, default_port, interested_parties, query_timeout_ms, - std::move(work_serializer)); + r, dns_server, name, default_port, interested_parties, query_timeout_ms); return r; } -grpc_ares_request* (*grpc_dns_lookup_ares_locked)( +grpc_ares_request* (*grpc_dns_lookup_ares)( const char* dns_server, const char* name, const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, std::unique_ptr* addrs, std::unique_ptr* balancer_addrs, - char** service_config_json, int query_timeout_ms, - std::shared_ptr work_serializer) = - grpc_dns_lookup_ares_locked_impl; + char** service_config_json, + int query_timeout_ms) = grpc_dns_lookup_ares_impl; -static void grpc_cancel_ares_request_locked_impl(grpc_ares_request* r) { +static void grpc_cancel_ares_request_impl(grpc_ares_request* r) { GPR_ASSERT(r != nullptr); + grpc_core::MutexLock lock(&r->mu); if (r->ev_driver != nullptr) { grpc_ares_ev_driver_shutdown_locked(r->ev_driver); } } -void (*grpc_cancel_ares_request_locked)(grpc_ares_request* r) = - grpc_cancel_ares_request_locked_impl; +void (*grpc_cancel_ares_request)(grpc_ares_request* r) = + grpc_cancel_ares_request_impl; // ares_library_init and ares_library_cleanup are currently no-op except under // Windows. Calling them may cause race conditions when other parts of the @@ -1114,8 +1114,6 @@ void grpc_ares_cleanup(void) {} */ typedef struct grpc_resolve_address_ares_request { - /* work_serializer that queries and related callbacks run under */ - std::shared_ptr work_serializer; /** the pointer to receive the resolved addresses */ grpc_resolved_addresses** addrs_out; /** currently resolving addresses */ @@ -1123,8 +1121,8 @@ typedef struct grpc_resolve_address_ares_request { /** closure to call when the resolve_address_ares request completes */ grpc_closure* on_resolve_address_done; /** a closure wrapping on_resolve_address_done, which should be invoked when - the grpc_dns_lookup_ares_locked operation is done. */ - grpc_closure on_dns_lookup_done_locked; + the grpc_dns_lookup_ares operation is done. */ + grpc_closure on_dns_lookup_done; /* target name */ const char* name; /* default port to use if none is specified */ @@ -1135,8 +1133,9 @@ typedef struct grpc_resolve_address_ares_request { grpc_ares_request* ares_request = nullptr; } grpc_resolve_address_ares_request; -static void on_dns_lookup_done_locked(grpc_resolve_address_ares_request* r, - grpc_error_handle error) { +static void on_dns_lookup_done(void* arg, grpc_error_handle error) { + grpc_resolve_address_ares_request* r = + static_cast(arg); delete r->ares_request; grpc_resolved_addresses** resolved_addresses = r->addrs_out; if (r->addresses == nullptr || r->addresses->empty()) { @@ -1153,30 +1152,11 @@ static void on_dns_lookup_done_locked(grpc_resolve_address_ares_request* r, sizeof(grpc_resolved_address)); } } - grpc_core::ExecCtx::Run(DEBUG_LOCATION, r->on_resolve_address_done, error); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, r->on_resolve_address_done, + GRPC_ERROR_REF(error)); delete r; } -static void on_dns_lookup_done(void* arg, grpc_error_handle error) { - grpc_resolve_address_ares_request* r = - static_cast(arg); - (void)GRPC_ERROR_REF(error); // ref owned by lambda - r->work_serializer->Run([r, error]() { on_dns_lookup_done_locked(r, error); }, - DEBUG_LOCATION); -} - -static void grpc_resolve_address_invoke_dns_lookup_ares_locked(void* arg) { - grpc_resolve_address_ares_request* r = - static_cast(arg); - GRPC_CLOSURE_INIT(&r->on_dns_lookup_done_locked, on_dns_lookup_done, r, - grpc_schedule_on_exec_ctx); - r->ares_request = grpc_dns_lookup_ares_locked( - nullptr /* dns_server */, r->name, r->default_port, r->interested_parties, - &r->on_dns_lookup_done_locked, &r->addresses, - nullptr /* balancer_addresses */, nullptr /* service_config_json */, - GRPC_DNS_ARES_DEFAULT_QUERY_TIMEOUT_MS, r->work_serializer); -} - static void grpc_resolve_address_ares_impl(const char* name, const char* default_port, grpc_pollset_set* interested_parties, @@ -1184,15 +1164,18 @@ static void grpc_resolve_address_ares_impl(const char* name, grpc_resolved_addresses** addrs) { grpc_resolve_address_ares_request* r = new grpc_resolve_address_ares_request(); - r->work_serializer = std::make_shared(); r->addrs_out = addrs; r->on_resolve_address_done = on_done; r->name = name; r->default_port = default_port; r->interested_parties = interested_parties; - r->work_serializer->Run( - [r]() { grpc_resolve_address_invoke_dns_lookup_ares_locked(r); }, - DEBUG_LOCATION); + GRPC_CLOSURE_INIT(&r->on_dns_lookup_done, on_dns_lookup_done, r, + grpc_schedule_on_exec_ctx); + r->ares_request = grpc_dns_lookup_ares( + nullptr /* dns_server */, name, default_port, interested_parties, + &r->on_dns_lookup_done, &r->addresses, nullptr /* balancer_addresses */, + nullptr /* service_config_json */, + GRPC_DNS_ARES_DEFAULT_QUERY_TIMEOUT_MS); } void (*grpc_resolve_address_ares)( diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h index cfc0e2b60fa..0e91cd25024 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h @@ -45,23 +45,28 @@ extern grpc_core::TraceFlag grpc_trace_cares_resolver; typedef struct grpc_ares_ev_driver grpc_ares_ev_driver; struct grpc_ares_request { + /** synchronizes access to this request, and also to associated + * ev_driver and fd_node objects */ + grpc_core::Mutex mu; /** indicates the DNS server to use, if specified */ - struct ares_addr_port_node dns_server_addr; + struct ares_addr_port_node dns_server_addr ABSL_GUARDED_BY(mu); /** following members are set in grpc_resolve_address_ares_impl */ /** closure to call when the request completes */ - grpc_closure* on_done = nullptr; + grpc_closure* on_done ABSL_GUARDED_BY(mu) = nullptr; /** the pointer to receive the resolved addresses */ - std::unique_ptr* addresses_out; + std::unique_ptr* addresses_out + ABSL_GUARDED_BY(mu); /** the pointer to receive the resolved balancer addresses */ - std::unique_ptr* balancer_addresses_out; + std::unique_ptr* balancer_addresses_out + ABSL_GUARDED_BY(mu); /** the pointer to receive the service config in JSON */ - char** service_config_json_out = nullptr; + char** service_config_json_out ABSL_GUARDED_BY(mu) = nullptr; /** the evernt driver used by this request */ - grpc_ares_ev_driver* ev_driver = nullptr; + grpc_ares_ev_driver* ev_driver ABSL_GUARDED_BY(mu) = nullptr; /** number of ongoing queries */ - size_t pending_queries = 0; + size_t pending_queries ABSL_GUARDED_BY(mu) = 0; /** the errors explaining query failures, appended to in query callbacks */ - grpc_error_handle error = GRPC_ERROR_NONE; + grpc_error_handle error ABSL_GUARDED_BY(mu) = GRPC_ERROR_NONE; }; /* Asynchronously resolve \a name. Use \a default_port if a port isn't @@ -83,16 +88,15 @@ extern void (*grpc_resolve_address_ares)(const char* name, scheduled with \a exec_ctx, so it must not try to acquire locks that are being held by the caller. The returned grpc_ares_request object is owned by the caller and it is safe to free after on_done is called back. */ -extern grpc_ares_request* (*grpc_dns_lookup_ares_locked)( +extern grpc_ares_request* (*grpc_dns_lookup_ares)( const char* dns_server, const char* name, const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, std::unique_ptr* addresses, std::unique_ptr* balancer_addresses, - char** service_config_json, int query_timeout_ms, - std::shared_ptr work_serializer); + char** service_config_json, int query_timeout_ms); /* Cancel the pending grpc_ares_request \a request */ -extern void (*grpc_cancel_ares_request_locked)(grpc_ares_request* request); +extern void (*grpc_cancel_ares_request)(grpc_ares_request* request); /* Initialize gRPC ares wrapper. Must be called at least once before grpc_resolve_address_ares(). */ @@ -103,10 +107,6 @@ grpc_error_handle grpc_ares_init(void); it has been called the same number of times as grpc_ares_init(). */ void grpc_ares_cleanup(void); -/** Schedules the desired callback for request completion - * and destroys the grpc_ares_request */ -void grpc_ares_complete_request_locked(grpc_ares_request* request); - /* Indicates whether or not AAAA queries should be attempted. */ /* E.g., return false if ipv6 is known to not be available. */ bool grpc_ares_query_ipv6(); diff --git a/test/core/client_channel/resolvers/dns_resolver_connectivity_test.cc b/test/core/client_channel/resolvers/dns_resolver_connectivity_test.cc index 9486ad9ea05..873b34f0317 100644 --- a/test/core/client_channel/resolvers/dns_resolver_connectivity_test.cc +++ b/test/core/client_channel/resolvers/dns_resolver_connectivity_test.cc @@ -60,13 +60,12 @@ static void my_resolve_address(const char* addr, const char* /*default_port*/, static grpc_address_resolver_vtable test_resolver = {my_resolve_address, nullptr}; -static grpc_ares_request* my_dns_lookup_ares_locked( +static grpc_ares_request* my_dns_lookup_ares( const char* /*dns_server*/, const char* addr, const char* /*default_port*/, grpc_pollset_set* /*interested_parties*/, grpc_closure* on_done, std::unique_ptr* addresses, std::unique_ptr* /*balancer_addresses*/, - char** /*service_config_json*/, int /*query_timeout_ms*/, - std::shared_ptr /*combiner*/) { // NOLINT + char** /*service_config_json*/, int /*query_timeout_ms*/) { // NOLINT gpr_mu_lock(&g_mu); GPR_ASSERT(0 == strcmp("test", addr)); grpc_error_handle error = GRPC_ERROR_NONE; @@ -86,7 +85,7 @@ static grpc_ares_request* my_dns_lookup_ares_locked( return nullptr; } -static void my_cancel_ares_request_locked(grpc_ares_request* request) { +static void my_cancel_ares_request(grpc_ares_request* request) { GPR_ASSERT(request == nullptr); } @@ -155,8 +154,8 @@ int main(int argc, char** argv) { auto work_serializer = std::make_shared(); g_work_serializer = &work_serializer; grpc_set_resolver_impl(&test_resolver); - grpc_dns_lookup_ares_locked = my_dns_lookup_ares_locked; - grpc_cancel_ares_request_locked = my_cancel_ares_request_locked; + grpc_dns_lookup_ares = my_dns_lookup_ares; + grpc_cancel_ares_request = my_cancel_ares_request; { grpc_core::ExecCtx exec_ctx; diff --git a/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc b/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc index 66bdea2e5b2..35c6b8a88ea 100644 --- a/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc +++ b/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc @@ -37,13 +37,12 @@ static grpc_address_resolver_vtable* default_resolve_address; static std::shared_ptr* g_work_serializer; -static grpc_ares_request* (*g_default_dns_lookup_ares_locked)( +static grpc_ares_request* (*g_default_dns_lookup_ares)( const char* dns_server, const char* name, const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, std::unique_ptr* addresses, std::unique_ptr* balancer_addresses, - char** service_config_json, int query_timeout_ms, - std::shared_ptr work_serializer); + char** service_config_json, int query_timeout_ms); // Counter incremented by test_resolve_address_impl indicating the number of // times a system-level resolution has happened. @@ -96,17 +95,15 @@ static grpc_error_handle test_blocking_resolve_address_impl( static grpc_address_resolver_vtable test_resolver = { test_resolve_address_impl, test_blocking_resolve_address_impl}; -static grpc_ares_request* test_dns_lookup_ares_locked( +static grpc_ares_request* test_dns_lookup_ares( const char* dns_server, const char* name, const char* default_port, grpc_pollset_set* /*interested_parties*/, grpc_closure* on_done, std::unique_ptr* addresses, std::unique_ptr* balancer_addresses, - char** service_config_json, int query_timeout_ms, - std::shared_ptr work_serializer) { - grpc_ares_request* result = g_default_dns_lookup_ares_locked( + char** service_config_json, int query_timeout_ms) { + grpc_ares_request* result = g_default_dns_lookup_ares( dns_server, name, default_port, g_iomgr_args.pollset_set, on_done, - addresses, balancer_addresses, service_config_json, query_timeout_ms, - std::move(work_serializer)); + addresses, balancer_addresses, service_config_json, query_timeout_ms); ++g_resolution_count; static grpc_millis last_resolution_time = 0; grpc_millis now = @@ -335,8 +332,8 @@ int main(int argc, char** argv) { auto work_serializer = std::make_shared(); g_work_serializer = &work_serializer; - g_default_dns_lookup_ares_locked = grpc_dns_lookup_ares_locked; - grpc_dns_lookup_ares_locked = test_dns_lookup_ares_locked; + g_default_dns_lookup_ares = grpc_dns_lookup_ares; + grpc_dns_lookup_ares = test_dns_lookup_ares; default_resolve_address = grpc_resolve_address_impl; grpc_set_resolver_impl(&test_resolver); diff --git a/test/core/end2end/fuzzers/api_fuzzer.cc b/test/core/end2end/fuzzers/api_fuzzer.cc index cf2769707b3..a4fdf316e25 100644 --- a/test/core/end2end/fuzzers/api_fuzzer.cc +++ b/test/core/end2end/fuzzers/api_fuzzer.cc @@ -137,13 +137,12 @@ void my_resolve_address(const char* addr, const char* /*default_port*/, static grpc_address_resolver_vtable fuzzer_resolver = {my_resolve_address, nullptr}; -grpc_ares_request* my_dns_lookup_ares_locked( +grpc_ares_request* my_dns_lookup_ares( const char* /*dns_server*/, const char* addr, const char* /*default_port*/, grpc_pollset_set* /*interested_parties*/, grpc_closure* on_done, std::unique_ptr* addresses, std::unique_ptr* /*balancer_addresses*/, - char** /*service_config_json*/, int /*query_timeout*/, - std::shared_ptr /*combiner*/) { + char** /*service_config_json*/, int /*query_timeout*/) { addr_req* r = new addr_req(); r->addr = gpr_strdup(addr); r->on_done = on_done; @@ -155,7 +154,7 @@ grpc_ares_request* my_dns_lookup_ares_locked( return nullptr; } -static void my_cancel_ares_request_locked(grpc_ares_request* request) { +static void my_cancel_ares_request(grpc_ares_request* request) { GPR_ASSERT(request == nullptr); } //////////////////////////////////////////////////////////////////////////////// @@ -730,8 +729,8 @@ DEFINE_PROTO_FUZZER(const api_fuzzer::Msg& msg) { grpc_core::Executor::SetThreadingAll(false); } grpc_set_resolver_impl(&fuzzer_resolver); - grpc_dns_lookup_ares_locked = my_dns_lookup_ares_locked; - grpc_cancel_ares_request_locked = my_cancel_ares_request_locked; + grpc_dns_lookup_ares = my_dns_lookup_ares; + grpc_cancel_ares_request = my_cancel_ares_request; GPR_ASSERT(g_channel == nullptr); GPR_ASSERT(g_server == nullptr); diff --git a/test/core/end2end/goaway_server_test.cc b/test/core/end2end/goaway_server_test.cc index 4d4509bc53a..9924ab61656 100644 --- a/test/core/end2end/goaway_server_test.cc +++ b/test/core/end2end/goaway_server_test.cc @@ -43,15 +43,14 @@ static void* tag(intptr_t t) { return reinterpret_cast(t); } static gpr_mu g_mu; static int g_resolve_port = -1; -static grpc_ares_request* (*iomgr_dns_lookup_ares_locked)( +static grpc_ares_request* (*iomgr_dns_lookup_ares)( const char* dns_server, const char* addr, const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, std::unique_ptr* addresses, std::unique_ptr* balancer_addresses, - char** service_config_json, int query_timeout_ms, - std::shared_ptr combiner); + char** service_config_json, int query_timeout_ms); -static void (*iomgr_cancel_ares_request_locked)(grpc_ares_request* request); +static void (*iomgr_cancel_ares_request)(grpc_ares_request* request); static void set_resolve_port(int port) { gpr_mu_lock(&g_mu); @@ -101,18 +100,16 @@ static grpc_error_handle my_blocking_resolve_address( static grpc_address_resolver_vtable test_resolver = { my_resolve_address, my_blocking_resolve_address}; -static grpc_ares_request* my_dns_lookup_ares_locked( +static grpc_ares_request* my_dns_lookup_ares( const char* dns_server, const char* addr, const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, std::unique_ptr* addresses, std::unique_ptr* balancer_addresses, - char** service_config_json, int query_timeout_ms, - std::shared_ptr work_serializer) { + char** service_config_json, int query_timeout_ms) { if (0 != strcmp(addr, "test")) { - return iomgr_dns_lookup_ares_locked( + return iomgr_dns_lookup_ares( dns_server, addr, default_port, interested_parties, on_done, addresses, - balancer_addresses, service_config_json, query_timeout_ms, - std::move(work_serializer)); + balancer_addresses, service_config_json, query_timeout_ms); } grpc_error_handle error = GRPC_ERROR_NONE; @@ -134,9 +131,9 @@ static grpc_ares_request* my_dns_lookup_ares_locked( return nullptr; } -static void my_cancel_ares_request_locked(grpc_ares_request* request) { +static void my_cancel_ares_request(grpc_ares_request* request) { if (request != nullptr) { - iomgr_cancel_ares_request_locked(request); + iomgr_cancel_ares_request(request); } } @@ -152,10 +149,10 @@ int main(int argc, char** argv) { grpc_init(); default_resolver = grpc_resolve_address_impl; grpc_set_resolver_impl(&test_resolver); - iomgr_dns_lookup_ares_locked = grpc_dns_lookup_ares_locked; - iomgr_cancel_ares_request_locked = grpc_cancel_ares_request_locked; - grpc_dns_lookup_ares_locked = my_dns_lookup_ares_locked; - grpc_cancel_ares_request_locked = my_cancel_ares_request_locked; + iomgr_dns_lookup_ares = grpc_dns_lookup_ares; + iomgr_cancel_ares_request = grpc_cancel_ares_request; + grpc_dns_lookup_ares = my_dns_lookup_ares; + grpc_cancel_ares_request = my_cancel_ares_request; int was_cancelled1; int was_cancelled2;