Revert "Replace work serializer with a mutex in c-ares resolver (#27858)" (#28324)

This reverts commit ec600f3973.
pull/28326/head
Craig Tiller 3 years ago committed by GitHub
parent 47f58f5b8d
commit b972b76816
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
  2. 34
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h
  3. 34
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc
  4. 61
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc
  5. 271
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
  6. 32
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h
  7. 11
      test/core/client_channel/resolvers/dns_resolver_connectivity_test.cc
  8. 19
      test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc
  9. 11
      test/core/end2end/fuzzers/api_fuzzer.cc
  10. 29
      test/core/end2end/goaway_server_test.cc

@ -186,7 +186,7 @@ void AresDnsResolver::ShutdownLocked() {
grpc_timer_cancel(&next_resolution_timer_);
}
if (pending_request_ != nullptr) {
grpc_cancel_ares_request(pending_request_);
grpc_cancel_ares_request_locked(pending_request_);
}
}
@ -443,12 +443,12 @@ void AresDnsResolver::StartResolvingLocked() {
GPR_ASSERT(!resolving_);
resolving_ = true;
service_config_json_ = nullptr;
pending_request_ = grpc_dns_lookup_ares(
pending_request_ = grpc_dns_lookup_ares_locked(
dns_server_.c_str(), name_to_resolve_.c_str(), kDefaultSecurePort,
interested_parties_, &on_resolved_, &addresses_,
enable_srv_queries_ ? &balancer_addresses_ : nullptr,
request_service_config_ ? &service_config_json_ : nullptr,
query_timeout_ms_);
query_timeout_ms_, work_serializer_);
last_resolution_timestamp_ = ExecCtx::Get()->Now();
GRPC_CARES_TRACE_LOG("resolver:%p Started resolving. pending_request_:%p",
this, pending_request_);

@ -23,8 +23,8 @@
#include <ares.h>
#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/iomgr/work_serializer.h"
namespace grpc_core {
@ -36,23 +36,18 @@ class GrpcPolledFd {
public:
virtual ~GrpcPolledFd() {}
/* Called when c-ares library is interested and there's no pending callback */
virtual void RegisterForOnReadableLocked(grpc_closure* read_closure)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) = 0;
virtual void RegisterForOnReadableLocked(grpc_closure* read_closure) = 0;
/* Called when c-ares library is interested and there's no pending callback */
virtual void RegisterForOnWriteableLocked(grpc_closure* write_closure)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) = 0;
virtual void RegisterForOnWriteableLocked(grpc_closure* write_closure) = 0;
/* Indicates if there is data left even after just being read from */
virtual bool IsFdStillReadableLocked()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) = 0;
virtual bool IsFdStillReadableLocked() = 0;
/* Called once and only once. Must cause cancellation of any pending
* read/write callbacks. */
virtual void ShutdownLocked(grpc_error_handle error)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) = 0;
virtual void ShutdownLocked(grpc_error_handle error) = 0;
/* Get the underlying ares_socket_t that this was created from */
virtual ares_socket_t GetWrappedAresSocketLocked()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) = 0;
virtual ares_socket_t GetWrappedAresSocketLocked() = 0;
/* A unique name, for logging */
virtual const char* GetName() const = 0;
virtual const char* GetName() = 0;
};
/* A GrpcPolledFdFactory is 1-to-1 with and owned by the
@ -64,19 +59,14 @@ class GrpcPolledFdFactory {
virtual ~GrpcPolledFdFactory() {}
/* Creates a new wrapped fd for the current platform */
virtual GrpcPolledFd* NewGrpcPolledFdLocked(
ares_socket_t as, grpc_pollset_set* driver_pollset_set)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) = 0;
ares_socket_t as, grpc_pollset_set* driver_pollset_set,
std::shared_ptr<WorkSerializer> work_serializer) = 0;
/* Optionally configures the ares channel after creation */
virtual void ConfigureAresChannelLocked(ares_channel channel)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) = 0;
virtual void ConfigureAresChannelLocked(ares_channel channel) = 0;
};
/* Creates a new polled fd factory.
* Note that even though ownership of mu is not transferred, the mu
* parameter is guaranteed to be alive for the the whole lifetime of
* the resulting GrpcPolledFdFactory as well as any GrpcPolledFd
* returned by the factory. */
std::unique_ptr<GrpcPolledFdFactory> NewGrpcPolledFdFactory(Mutex* mu);
std::unique_ptr<GrpcPolledFdFactory> NewGrpcPolledFdFactory(
std::shared_ptr<WorkSerializer> work_serializer);
} // namespace grpc_core

@ -59,53 +59,49 @@ class GrpcPolledFdPosix : public GrpcPolledFd {
grpc_fd_orphan(fd_, nullptr, &phony_release_fd, "c-ares query finished");
}
void RegisterForOnReadableLocked(grpc_closure* read_closure)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) override {
void RegisterForOnReadableLocked(grpc_closure* read_closure) override {
grpc_fd_notify_on_read(fd_, read_closure);
}
void RegisterForOnWriteableLocked(grpc_closure* write_closure)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) override {
void RegisterForOnWriteableLocked(grpc_closure* write_closure) override {
grpc_fd_notify_on_write(fd_, write_closure);
}
bool IsFdStillReadableLocked()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) override {
bool IsFdStillReadableLocked() override {
size_t bytes_available = 0;
return ioctl(grpc_fd_wrapped_fd(fd_), FIONREAD, &bytes_available) == 0 &&
bytes_available > 0;
}
void ShutdownLocked(grpc_error_handle error)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) override {
void ShutdownLocked(grpc_error_handle error) override {
grpc_fd_shutdown(fd_, error);
}
ares_socket_t GetWrappedAresSocketLocked()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) override {
return as_;
}
ares_socket_t GetWrappedAresSocketLocked() override { return as_; }
const char* GetName() const override { return name_.c_str(); }
const char* GetName() override { return name_.c_str(); }
private:
const std::string name_;
const ares_socket_t as_;
grpc_fd* fd_ ABSL_GUARDED_BY(&grpc_ares_request::mu);
grpc_pollset_set* driver_pollset_set_ ABSL_GUARDED_BY(&grpc_ares_request::mu);
std::string name_;
ares_socket_t as_;
grpc_fd* fd_;
grpc_pollset_set* driver_pollset_set_;
};
class GrpcPolledFdFactoryPosix : public GrpcPolledFdFactory {
public:
GrpcPolledFd* NewGrpcPolledFdLocked(
ares_socket_t as, grpc_pollset_set* driver_pollset_set) override {
ares_socket_t as, grpc_pollset_set* driver_pollset_set,
std::shared_ptr<WorkSerializer> /*work_serializer*/) override {
return new GrpcPolledFdPosix(as, driver_pollset_set);
}
void ConfigureAresChannelLocked(ares_channel /*channel*/) override {}
};
std::unique_ptr<GrpcPolledFdFactory> NewGrpcPolledFdFactory(Mutex* /* mu */) {
std::unique_ptr<GrpcPolledFdFactory> NewGrpcPolledFdFactory(
std::shared_ptr<WorkSerializer> work_serializer) {
(void)work_serializer;
return absl::make_unique<GrpcPolledFdFactoryPosix>();
}

@ -41,6 +41,7 @@
#include "src/core/lib/iomgr/sockaddr_windows.h"
#include "src/core/lib/iomgr/socket_windows.h"
#include "src/core/lib/iomgr/tcp_windows.h"
#include "src/core/lib/iomgr/work_serializer.h"
#include "src/core/lib/slice/slice_internal.h"
/* TODO(apolcyn): remove this hack after fixing upstream.
@ -99,9 +100,10 @@ class GrpcPolledFdWindows {
WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY,
};
GrpcPolledFdWindows(ares_socket_t as, Mutex* mu, int address_family,
int socket_type)
: mu_(mu),
GrpcPolledFdWindows(ares_socket_t as,
std::shared_ptr<WorkSerializer> work_serializer,
int address_family, int socket_type)
: work_serializer_(std::move(work_serializer)),
read_buf_(grpc_empty_slice()),
write_buf_(grpc_empty_slice()),
tcp_write_state_(WRITE_IDLE),
@ -258,7 +260,7 @@ class GrpcPolledFdWindows {
return grpc_winsocket_wrapped_socket(winsocket_);
}
const char* GetName() const { return name_.c_str(); }
const char* GetName() { return name_.c_str(); }
ares_ssize_t RecvFrom(WSAErrorContext* wsa_error_ctx, void* data,
ares_socket_t data_len, int flags,
@ -420,8 +422,12 @@ class GrpcPolledFdWindows {
static void OnTcpConnect(void* arg, grpc_error_handle error) {
GrpcPolledFdWindows* grpc_polled_fd =
static_cast<GrpcPolledFdWindows*>(arg);
MutexLock lock(grpc_polled_fd->mu_);
grpc_polled_fd->OnTcpConnectLocked(error);
(void)GRPC_ERROR_REF(error); // ref owned by lambda
grpc_polled_fd->work_serializer_->Run(
[grpc_polled_fd, error]() {
grpc_polled_fd->OnTcpConnectLocked(error);
},
DEBUG_LOCATION);
}
void OnTcpConnectLocked(grpc_error_handle error) {
@ -463,6 +469,7 @@ class GrpcPolledFdWindows {
if (pending_continue_register_for_on_writeable_locked_) {
ContinueRegisterForOnWriteableLocked();
}
GRPC_ERROR_UNREF(error);
}
int Connect(WSAErrorContext* wsa_error_ctx, const struct sockaddr* target,
@ -568,9 +575,10 @@ class GrpcPolledFdWindows {
static void OnIocpReadable(void* arg, grpc_error_handle error) {
GrpcPolledFdWindows* polled_fd = static_cast<GrpcPolledFdWindows*>(arg);
(void)GRPC_ERROR_REF(error);
MutexLock lock(polled_fd->mu_);
polled_fd->OnIocpReadableLocked(error);
(void)GRPC_ERROR_REF(error); // ref owned by lambda
polled_fd->work_serializer_->Run(
[polled_fd, error]() { polled_fd->OnIocpReadableLocked(error); },
DEBUG_LOCATION);
}
// TODO(apolcyn): improve this error handling to be less conversative.
@ -612,9 +620,10 @@ class GrpcPolledFdWindows {
static void OnIocpWriteable(void* arg, grpc_error_handle error) {
GrpcPolledFdWindows* polled_fd = static_cast<GrpcPolledFdWindows*>(arg);
(void)GRPC_ERROR_REF(error);
MutexLock lock(polled_fd->mu_);
polled_fd->OnIocpWriteableLocked(error);
(void)GRPC_ERROR_REF(error); // error owned by lambda
polled_fd->work_serializer_->Run(
[polled_fd, error]() { polled_fd->OnIocpWriteableLocked(error); },
DEBUG_LOCATION);
}
void OnIocpWriteableLocked(grpc_error_handle error) {
@ -649,7 +658,7 @@ class GrpcPolledFdWindows {
void set_gotten_into_driver_list() { gotten_into_driver_list_ = true; }
private:
Mutex* mu_;
std::shared_ptr<WorkSerializer> work_serializer_;
char recv_from_source_addr_[200];
ares_socklen_t recv_from_source_addr_len_;
grpc_slice read_buf_;
@ -662,7 +671,7 @@ class GrpcPolledFdWindows {
grpc_winsocket* winsocket_;
// tcp_write_state_ is only used on TCP GrpcPolledFds
WriteState tcp_write_state_;
const std::string name_;
std::string name_;
bool gotten_into_driver_list_;
int address_family_;
int socket_type_;
@ -691,7 +700,8 @@ struct SockToPolledFdEntry {
* with a GrpcPolledFdWindows factory and event driver */
class SockToPolledFdMap {
public:
explicit SockToPolledFdMap(Mutex* mu) : mu_(mu) {}
explicit SockToPolledFdMap(std::shared_ptr<WorkSerializer> work_serializer)
: work_serializer_(std::move(work_serializer)) {}
~SockToPolledFdMap() { GPR_ASSERT(head_ == nullptr); }
@ -749,7 +759,7 @@ class SockToPolledFdMap {
}
grpc_tcp_set_non_block(s);
GrpcPolledFdWindows* polled_fd =
new GrpcPolledFdWindows(s, map->mu_, af, type);
new GrpcPolledFdWindows(s, map->work_serializer_, af, type);
GRPC_CARES_TRACE_LOG(
"fd:|%s| created with params af:%d type:%d protocol:%d",
polled_fd->GetName(), af, type, protocol);
@ -804,8 +814,8 @@ class SockToPolledFdMap {
}
private:
Mutex* mu_;
SockToPolledFdEntry* head_ = nullptr;
std::shared_ptr<WorkSerializer> work_serializer_;
};
const struct ares_socket_functions custom_ares_sock_funcs = {
@ -846,18 +856,21 @@ class GrpcPolledFdWindowsWrapper : public GrpcPolledFd {
return wrapped_->GetWrappedAresSocketLocked();
}
const char* GetName() const override { return wrapped_->GetName(); }
const char* GetName() override { return wrapped_->GetName(); }
private:
GrpcPolledFdWindows* const wrapped_;
GrpcPolledFdWindows* wrapped_;
};
class GrpcPolledFdFactoryWindows : public GrpcPolledFdFactory {
public:
explicit GrpcPolledFdFactoryWindows(Mutex* mu) : sock_to_polled_fd_map_(mu) {}
explicit GrpcPolledFdFactoryWindows(
std::shared_ptr<WorkSerializer> work_serializer)
: sock_to_polled_fd_map_(std::move(work_serializer)) {}
GrpcPolledFd* NewGrpcPolledFdLocked(
ares_socket_t as, grpc_pollset_set* driver_pollset_set) override {
ares_socket_t as, grpc_pollset_set* driver_pollset_set,
std::shared_ptr<WorkSerializer> work_serializer) override {
GrpcPolledFdWindows* polled_fd = sock_to_polled_fd_map_.LookupPolledFd(as);
// Set a flag so that the virtual socket "close" method knows it
// doesn't need to call ShutdownLocked, since now the driver will.
@ -874,8 +887,10 @@ class GrpcPolledFdFactoryWindows : public GrpcPolledFdFactory {
SockToPolledFdMap sock_to_polled_fd_map_;
};
std::unique_ptr<GrpcPolledFdFactory> NewGrpcPolledFdFactory(Mutex* mu) {
return absl::make_unique<GrpcPolledFdFactoryWindows>(mu);
std::unique_ptr<GrpcPolledFdFactory> NewGrpcPolledFdFactory(
std::shared_ptr<WorkSerializer> work_serializer) {
return absl::make_unique<GrpcPolledFdFactoryWindows>(
std::move(work_serializer));
}
} // namespace grpc_core

@ -57,63 +57,55 @@ grpc_core::TraceFlag grpc_trace_cares_address_sorting(false,
grpc_core::TraceFlag grpc_trace_cares_resolver(false, "cares_resolver");
typedef struct fd_node {
/* default constructor exists only for linked list manipulation */
fd_node() : ev_driver(nullptr) {}
explicit fd_node(grpc_ares_ev_driver* ev_driver) : ev_driver(ev_driver) {}
/** the owner of this fd node */
grpc_ares_ev_driver* const ev_driver;
grpc_ares_ev_driver* ev_driver;
/** a closure wrapping on_readable_locked, which should be
invoked when the grpc_fd in this node becomes readable. */
grpc_closure read_closure ABSL_GUARDED_BY(&grpc_ares_request::mu);
grpc_closure read_closure;
/** a closure wrapping on_writable_locked, which should be
invoked when the grpc_fd in this node becomes writable. */
grpc_closure write_closure ABSL_GUARDED_BY(&grpc_ares_request::mu);
grpc_closure write_closure;
/** next fd node in the list */
struct fd_node* next ABSL_GUARDED_BY(&grpc_ares_request::mu);
struct fd_node* next;
/** wrapped fd that's polled by grpc's poller for the current platform */
grpc_core::GrpcPolledFd* grpc_polled_fd
ABSL_GUARDED_BY(&grpc_ares_request::mu);
grpc_core::GrpcPolledFd* grpc_polled_fd;
/** if the readable closure has been registered */
bool readable_registered ABSL_GUARDED_BY(&grpc_ares_request::mu);
bool readable_registered;
/** if the writable closure has been registered */
bool writable_registered ABSL_GUARDED_BY(&grpc_ares_request::mu);
bool writable_registered;
/** if the fd has been shutdown yet from grpc iomgr perspective */
bool already_shutdown ABSL_GUARDED_BY(&grpc_ares_request::mu);
bool already_shutdown;
} fd_node;
struct grpc_ares_ev_driver {
explicit grpc_ares_ev_driver(grpc_ares_request* request) : request(request) {}
/** the ares_channel owned by this event driver */
ares_channel channel ABSL_GUARDED_BY(&grpc_ares_request::mu);
ares_channel channel;
/** pollset set for driving the IO events of the channel */
grpc_pollset_set* pollset_set ABSL_GUARDED_BY(&grpc_ares_request::mu);
grpc_pollset_set* pollset_set;
/** refcount of the event driver */
gpr_refcount refs;
/** work_serializer to synchronize c-ares and I/O callbacks on */
std::shared_ptr<grpc_core::WorkSerializer> work_serializer;
/** a list of grpc_fd that this event driver is currently using. */
fd_node* fds ABSL_GUARDED_BY(&grpc_ares_request::mu);
fd_node* fds;
/** is this event driver being shut down */
bool shutting_down ABSL_GUARDED_BY(&grpc_ares_request::mu);
bool shutting_down;
/** request object that's using this ev driver */
grpc_ares_request* const request;
grpc_ares_request* request;
/** Owned by the ev_driver. Creates new GrpcPolledFd's */
std::unique_ptr<grpc_core::GrpcPolledFdFactory> polled_fd_factory
ABSL_GUARDED_BY(&grpc_ares_request::mu);
std::unique_ptr<grpc_core::GrpcPolledFdFactory> polled_fd_factory;
/** query timeout in milliseconds */
int query_timeout_ms ABSL_GUARDED_BY(&grpc_ares_request::mu);
int query_timeout_ms;
/** alarm to cancel active queries */
grpc_timer query_timeout ABSL_GUARDED_BY(&grpc_ares_request::mu);
grpc_timer query_timeout;
/** cancels queries on a timeout */
grpc_closure on_timeout_locked ABSL_GUARDED_BY(&grpc_ares_request::mu);
grpc_closure on_timeout_locked;
/** alarm to poll ares_process on in case fd events don't happen */
grpc_timer ares_backup_poll_alarm ABSL_GUARDED_BY(&grpc_ares_request::mu);
grpc_timer ares_backup_poll_alarm;
/** polls ares_process on a periodic timer */
grpc_closure on_ares_backup_poll_alarm_locked
ABSL_GUARDED_BY(&grpc_ares_request::mu);
grpc_closure on_ares_backup_poll_alarm_locked;
};
// TODO(apolcyn): make grpc_ares_hostbyname_request a sub-class
@ -133,10 +125,8 @@ typedef struct grpc_ares_hostbyname_request {
const char* qtype;
} grpc_ares_hostbyname_request;
static void grpc_ares_request_ref_locked(grpc_ares_request* r)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(r->mu);
static void grpc_ares_request_unref_locked(grpc_ares_request* r)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(r->mu);
static void grpc_ares_request_ref_locked(grpc_ares_request* r);
static void grpc_ares_request_unref_locked(grpc_ares_request* r);
// TODO(apolcyn): as a part of C++-ification, find a way to
// organize per-query and per-resolution information in such a way
@ -163,19 +153,14 @@ class GrpcAresQuery {
};
static grpc_ares_ev_driver* grpc_ares_ev_driver_ref(
grpc_ares_ev_driver* ev_driver)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) {
grpc_ares_ev_driver* ev_driver) {
GRPC_CARES_TRACE_LOG("request:%p Ref ev_driver %p", ev_driver->request,
ev_driver);
gpr_ref(&ev_driver->refs);
return ev_driver;
}
static void grpc_ares_complete_request_locked(grpc_ares_request* r)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(r->mu);
static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver* ev_driver)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) {
static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver* ev_driver) {
GRPC_CARES_TRACE_LOG("request:%p Unref ev_driver %p", ev_driver->request,
ev_driver);
if (gpr_unref(&ev_driver->refs)) {
@ -188,19 +173,17 @@ static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver* ev_driver)
}
}
static void fd_node_destroy_locked(fd_node* fdn)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) {
static void fd_node_destroy_locked(fd_node* fdn) {
GRPC_CARES_TRACE_LOG("request:%p delete fd: %s", fdn->ev_driver->request,
fdn->grpc_polled_fd->GetName());
GPR_ASSERT(!fdn->readable_registered);
GPR_ASSERT(!fdn->writable_registered);
GPR_ASSERT(fdn->already_shutdown);
delete fdn->grpc_polled_fd;
delete fdn;
gpr_free(fdn);
}
static void fd_node_shutdown_locked(fd_node* fdn, const char* reason)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) {
static void fd_node_shutdown_locked(fd_node* fdn, const char* reason) {
if (!fdn->already_shutdown) {
fdn->already_shutdown = true;
fdn->grpc_polled_fd->ShutdownLocked(
@ -209,8 +192,7 @@ static void fd_node_shutdown_locked(fd_node* fdn, const char* reason)
}
void grpc_ares_ev_driver_on_queries_complete_locked(
grpc_ares_ev_driver* ev_driver)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) {
grpc_ares_ev_driver* ev_driver) {
// We mark the event driver as being shut down.
// grpc_ares_notify_on_event_locked will shut down any remaining
// fds.
@ -220,8 +202,7 @@ void grpc_ares_ev_driver_on_queries_complete_locked(
grpc_ares_ev_driver_unref(ev_driver);
}
void grpc_ares_ev_driver_shutdown_locked(grpc_ares_ev_driver* ev_driver)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) {
void grpc_ares_ev_driver_shutdown_locked(grpc_ares_ev_driver* ev_driver) {
ev_driver->shutting_down = true;
fd_node* fn = ev_driver->fds;
while (fn != nullptr) {
@ -232,8 +213,7 @@ void grpc_ares_ev_driver_shutdown_locked(grpc_ares_ev_driver* ev_driver)
// Search fd in the fd_node list head. This is an O(n) search, the max possible
// value of n is ARES_GETSOCK_MAXNUM (16). n is typically 1 - 2 in our tests.
static fd_node* pop_fd_node_locked(fd_node** head, ares_socket_t as)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) {
static fd_node* pop_fd_node_locked(fd_node** head, ares_socket_t as) {
fd_node phony_head;
phony_head.next = *head;
fd_node* node = &phony_head;
@ -250,8 +230,7 @@ static fd_node* pop_fd_node_locked(fd_node** head, ares_socket_t as)
}
static grpc_millis calculate_next_ares_backup_poll_alarm_ms(
grpc_ares_ev_driver* driver)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) {
grpc_ares_ev_driver* driver) {
// An alternative here could be to use ares_timeout to try to be more
// accurate, but that would require using "struct timeval"'s, which just makes
// things a bit more complicated. So just poll every second, as suggested
@ -265,9 +244,8 @@ static grpc_millis calculate_next_ares_backup_poll_alarm_ms(
grpc_core::ExecCtx::Get()->Now();
}
static void on_timeout(void* arg, grpc_error_handle error) {
grpc_ares_ev_driver* driver = static_cast<grpc_ares_ev_driver*>(arg);
grpc_core::MutexLock lock(&driver->request->mu);
static void on_timeout_locked(grpc_ares_ev_driver* driver,
grpc_error_handle error) {
GRPC_CARES_TRACE_LOG(
"request:%p ev_driver=%p on_timeout_locked. driver->shutting_down=%d. "
"err=%s",
@ -277,10 +255,28 @@ static void on_timeout(void* arg, grpc_error_handle error) {
grpc_ares_ev_driver_shutdown_locked(driver);
}
grpc_ares_ev_driver_unref(driver);
GRPC_ERROR_UNREF(error);
}
static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu);
static void on_timeout(void* arg, grpc_error_handle error) {
grpc_ares_ev_driver* driver = static_cast<grpc_ares_ev_driver*>(arg);
(void)GRPC_ERROR_REF(error); // ref owned by lambda
driver->work_serializer->Run(
[driver, error]() { on_timeout_locked(driver, error); }, DEBUG_LOCATION);
}
static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver);
static void on_ares_backup_poll_alarm_locked(grpc_ares_ev_driver* driver,
grpc_error_handle error);
static void on_ares_backup_poll_alarm(void* arg, grpc_error_handle error) {
grpc_ares_ev_driver* driver = static_cast<grpc_ares_ev_driver*>(arg);
(void)GRPC_ERROR_REF(error);
driver->work_serializer->Run(
[driver, error]() { on_ares_backup_poll_alarm_locked(driver, error); },
DEBUG_LOCATION);
}
/* In case of non-responsive DNS servers, dropped packets, etc., c-ares has
* intelligent timeout and retry logic, which we can take advantage of by
@ -290,9 +286,8 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver)
* b) when some time has passed without fd events having happened
* For the latter, we use this backup poller. Also see
* https://github.com/grpc/grpc/pull/17688 description for more details. */
static void on_ares_backup_poll_alarm(void* arg, grpc_error_handle error) {
grpc_ares_ev_driver* driver = static_cast<grpc_ares_ev_driver*>(arg);
grpc_core::MutexLock lock(&driver->request->mu);
static void on_ares_backup_poll_alarm_locked(grpc_ares_ev_driver* driver,
grpc_error_handle error) {
GRPC_CARES_TRACE_LOG(
"request:%p ev_driver=%p on_ares_backup_poll_alarm_locked. "
"driver->shutting_down=%d. "
@ -330,11 +325,10 @@ static void on_ares_backup_poll_alarm(void* arg, grpc_error_handle error) {
grpc_ares_notify_on_event_locked(driver);
}
grpc_ares_ev_driver_unref(driver);
GRPC_ERROR_UNREF(error);
}
static void on_readable(void* arg, grpc_error_handle error) {
fd_node* fdn = static_cast<fd_node*>(arg);
grpc_core::MutexLock lock(&fdn->ev_driver->request->mu);
static void on_readable_locked(fd_node* fdn, grpc_error_handle error) {
GPR_ASSERT(fdn->readable_registered);
grpc_ares_ev_driver* ev_driver = fdn->ev_driver;
const ares_socket_t as = fdn->grpc_polled_fd->GetWrappedAresSocketLocked();
@ -356,11 +350,17 @@ static void on_readable(void* arg, grpc_error_handle error) {
}
grpc_ares_notify_on_event_locked(ev_driver);
grpc_ares_ev_driver_unref(ev_driver);
GRPC_ERROR_UNREF(error);
}
static void on_writable(void* arg, grpc_error_handle error) {
static void on_readable(void* arg, grpc_error_handle error) {
fd_node* fdn = static_cast<fd_node*>(arg);
grpc_core::MutexLock lock(&fdn->ev_driver->request->mu);
(void)GRPC_ERROR_REF(error); /* ref owned by lambda */
fdn->ev_driver->work_serializer->Run(
[fdn, error]() { on_readable_locked(fdn, error); }, DEBUG_LOCATION);
}
static void on_writable_locked(fd_node* fdn, grpc_error_handle error) {
GPR_ASSERT(fdn->writable_registered);
grpc_ares_ev_driver* ev_driver = fdn->ev_driver;
const ares_socket_t as = fdn->grpc_polled_fd->GetWrappedAresSocketLocked();
@ -380,12 +380,19 @@ static void on_writable(void* arg, grpc_error_handle error) {
}
grpc_ares_notify_on_event_locked(ev_driver);
grpc_ares_ev_driver_unref(ev_driver);
GRPC_ERROR_UNREF(error);
}
static void on_writable(void* arg, grpc_error_handle error) {
fd_node* fdn = static_cast<fd_node*>(arg);
(void)GRPC_ERROR_REF(error); /* ref owned by lambda */
fdn->ev_driver->work_serializer->Run(
[fdn, error]() { on_writable_locked(fdn, error); }, DEBUG_LOCATION);
}
// Get the file descriptors used by the ev_driver's ares channel, register
// driver_closure with these filedescriptors.
static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) {
static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) {
fd_node* new_list = nullptr;
if (!ev_driver->shutting_down) {
ares_socket_t socks[ARES_GETSOCK_MAXNUM];
@ -397,12 +404,13 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver)
fd_node* fdn = pop_fd_node_locked(&ev_driver->fds, socks[i]);
// Create a new fd_node if sock[i] is not in the fd_node list.
if (fdn == nullptr) {
fdn = new fd_node(ev_driver);
fdn = static_cast<fd_node*>(gpr_malloc(sizeof(fd_node)));
fdn->grpc_polled_fd =
ev_driver->polled_fd_factory->NewGrpcPolledFdLocked(
socks[i], ev_driver->pollset_set);
socks[i], ev_driver->pollset_set, ev_driver->work_serializer);
GRPC_CARES_TRACE_LOG("request:%p new fd: %s", ev_driver->request,
fdn->grpc_polled_fd->GetName());
fdn->ev_driver = ev_driver;
fdn->readable_registered = false;
fdn->writable_registered = false;
fdn->already_shutdown = false;
@ -458,8 +466,7 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver)
ev_driver->fds = new_list;
}
void grpc_ares_ev_driver_start_locked(grpc_ares_ev_driver* ev_driver)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) {
void grpc_ares_ev_driver_start_locked(grpc_ares_ev_driver* ev_driver) {
grpc_ares_notify_on_event_locked(ev_driver);
// Initialize overall DNS resolution timeout alarm
grpc_millis timeout =
@ -494,9 +501,10 @@ void (*grpc_ares_test_only_inject_config)(ares_channel channel) =
grpc_error_handle grpc_ares_ev_driver_create_locked(
grpc_ares_ev_driver** ev_driver, grpc_pollset_set* pollset_set,
int query_timeout_ms, grpc_ares_request* request)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(request->mu) {
*ev_driver = new grpc_ares_ev_driver(request);
int query_timeout_ms,
std::shared_ptr<grpc_core::WorkSerializer> work_serializer,
grpc_ares_request* request) {
*ev_driver = new grpc_ares_ev_driver();
ares_options opts;
memset(&opts, 0, sizeof(opts));
opts.flags |= ARES_FLAG_STAYOPEN;
@ -506,15 +514,17 @@ grpc_error_handle grpc_ares_ev_driver_create_locked(
if (status != ARES_SUCCESS) {
grpc_error_handle err = GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrCat(
"Failed to init ares channel. C-ares error: ", ares_strerror(status)));
delete *ev_driver;
gpr_free(*ev_driver);
return err;
}
(*ev_driver)->work_serializer = std::move(work_serializer);
gpr_ref_init(&(*ev_driver)->refs, 1);
(*ev_driver)->pollset_set = pollset_set;
(*ev_driver)->fds = nullptr;
(*ev_driver)->shutting_down = false;
(*ev_driver)->request = request;
(*ev_driver)->polled_fd_factory =
grpc_core::NewGrpcPolledFdFactory(&(*ev_driver)->request->mu);
grpc_core::NewGrpcPolledFdFactory((*ev_driver)->work_serializer);
(*ev_driver)
->polled_fd_factory->ConfigureAresChannelLocked((*ev_driver)->channel);
(*ev_driver)->query_timeout_ms = query_timeout_ms;
@ -560,21 +570,18 @@ void grpc_cares_wrapper_address_sorting_sort(const grpc_ares_request* r,
}
}
static void grpc_ares_request_ref_locked(grpc_ares_request* r)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(r->mu) {
static void grpc_ares_request_ref_locked(grpc_ares_request* r) {
r->pending_queries++;
}
static void grpc_ares_request_unref_locked(grpc_ares_request* r)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(r->mu) {
static void grpc_ares_request_unref_locked(grpc_ares_request* r) {
r->pending_queries--;
if (r->pending_queries == 0u) {
grpc_ares_ev_driver_on_queries_complete_locked(r->ev_driver);
}
}
void grpc_ares_complete_request_locked(grpc_ares_request* r)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(r->mu) {
void grpc_ares_complete_request_locked(grpc_ares_request* r) {
/* Invoke on_done callback and destroy the
request */
r->ev_driver = nullptr;
@ -599,8 +606,7 @@ void grpc_ares_complete_request_locked(grpc_ares_request* r)
* qtype must outlive it. */
static grpc_ares_hostbyname_request* create_hostbyname_request_locked(
grpc_ares_request* parent_request, const char* host, uint16_t port,
bool is_balancer, const char* qtype)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(parent_request->mu) {
bool is_balancer, const char* qtype) {
GRPC_CARES_TRACE_LOG(
"request:%p create_hostbyname_request_locked host:%s port:%d "
"is_balancer:%d qtype:%s",
@ -615,18 +621,15 @@ static grpc_ares_hostbyname_request* create_hostbyname_request_locked(
return hr;
}
static void destroy_hostbyname_request_locked(grpc_ares_hostbyname_request* hr)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(hr->parent_request->mu) {
static void destroy_hostbyname_request_locked(
grpc_ares_hostbyname_request* hr) {
grpc_ares_request_unref_locked(hr->parent_request);
gpr_free(hr->host);
delete hr;
}
static void on_hostbyname_done_locked(void* arg, int status, int /*timeouts*/,
struct hostent* hostent)
ABSL_NO_THREAD_SAFETY_ANALYSIS {
// This callback is invoked from the c-ares library, so disable thread safety
// analysis. Note that we are guaranteed to be holding r->mu, though.
struct hostent* hostent) {
grpc_ares_hostbyname_request* hr =
static_cast<grpc_ares_hostbyname_request*>(arg);
grpc_ares_request* r = hr->parent_request;
@ -699,10 +702,7 @@ static void on_hostbyname_done_locked(void* arg, int status, int /*timeouts*/,
}
static void on_srv_query_done_locked(void* arg, int status, int /*timeouts*/,
unsigned char* abuf,
int alen) ABSL_NO_THREAD_SAFETY_ANALYSIS {
// This callback is invoked from the c-ares library, so disable thread safety
// analysis. Note that we are guaranteed to be holding r->mu, though.
unsigned char* abuf, int alen) {
GrpcAresQuery* q = static_cast<GrpcAresQuery*>(arg);
grpc_ares_request* r = q->parent_request();
if (status == ARES_SUCCESS) {
@ -749,10 +749,7 @@ static void on_srv_query_done_locked(void* arg, int status, int /*timeouts*/,
static const char g_service_config_attribute_prefix[] = "grpc_config=";
static void on_txt_done_locked(void* arg, int status, int /*timeouts*/,
unsigned char* buf,
int len) ABSL_NO_THREAD_SAFETY_ANALYSIS {
// This callback is invoked from the c-ares library, so disable thread safety
// analysis. Note that we are guaranteed to be holding r->mu, though.
unsigned char* buf, int len) {
GrpcAresQuery* q = static_cast<GrpcAresQuery*>(arg);
std::unique_ptr<GrpcAresQuery> query_deleter(q);
grpc_ares_request* r = q->parent_request();
@ -809,7 +806,8 @@ fail:
void grpc_dns_lookup_ares_continue_after_check_localhost_and_ip_literals_locked(
grpc_ares_request* r, const char* dns_server, const char* name,
const char* default_port, grpc_pollset_set* interested_parties,
int query_timeout_ms) ABSL_EXCLUSIVE_LOCKS_REQUIRED(r->mu) {
int query_timeout_ms,
std::shared_ptr<grpc_core::WorkSerializer> work_serializer) {
grpc_error_handle error = GRPC_ERROR_NONE;
grpc_ares_hostbyname_request* hr = nullptr;
/* parse name, splitting it into host and port parts */
@ -831,7 +829,8 @@ void grpc_dns_lookup_ares_continue_after_check_localhost_and_ip_literals_locked(
port = default_port;
}
error = grpc_ares_ev_driver_create_locked(&r->ev_driver, interested_parties,
query_timeout_ms, r);
query_timeout_ms,
std::move(work_serializer), r);
if (error != GRPC_ERROR_NONE) goto error_cleanup;
// If dns_server is specified, use it.
if (dns_server != nullptr && dns_server[0] != '\0') {
@ -1030,21 +1029,21 @@ static bool grpc_ares_maybe_resolve_localhost_manually_locked(
}
#endif /* GRPC_ARES_RESOLVE_LOCALHOST_MANUALLY */
static grpc_ares_request* grpc_dns_lookup_ares_impl(
static grpc_ares_request* grpc_dns_lookup_ares_locked_impl(
const char* dns_server, const char* name, const char* default_port,
grpc_pollset_set* interested_parties, grpc_closure* on_done,
std::unique_ptr<grpc_core::ServerAddressList>* addrs,
std::unique_ptr<grpc_core::ServerAddressList>* balancer_addrs,
char** service_config_json, int query_timeout_ms) {
char** service_config_json, int query_timeout_ms,
std::shared_ptr<grpc_core::WorkSerializer> work_serializer) {
grpc_ares_request* r = new grpc_ares_request();
grpc_core::MutexLock lock(&r->mu);
r->ev_driver = nullptr;
r->on_done = on_done;
r->addresses_out = addrs;
r->balancer_addresses_out = balancer_addrs;
r->service_config_json_out = service_config_json;
GRPC_CARES_TRACE_LOG(
"request:%p c-ares grpc_dns_lookup_ares_impl name=%s, "
"request:%p c-ares grpc_dns_lookup_ares_locked_impl name=%s, "
"default_port=%s",
r, name, default_port);
// Early out if the target is an ipv4 or ipv6 literal.
@ -1067,28 +1066,29 @@ static grpc_ares_request* grpc_dns_lookup_ares_impl(
}
// Look up name using c-ares lib.
grpc_dns_lookup_ares_continue_after_check_localhost_and_ip_literals_locked(
r, dns_server, name, default_port, interested_parties, query_timeout_ms);
r, dns_server, name, default_port, interested_parties, query_timeout_ms,
std::move(work_serializer));
return r;
}
grpc_ares_request* (*grpc_dns_lookup_ares)(
grpc_ares_request* (*grpc_dns_lookup_ares_locked)(
const char* dns_server, const char* name, const char* default_port,
grpc_pollset_set* interested_parties, grpc_closure* on_done,
std::unique_ptr<grpc_core::ServerAddressList>* addrs,
std::unique_ptr<grpc_core::ServerAddressList>* balancer_addrs,
char** service_config_json,
int query_timeout_ms) = grpc_dns_lookup_ares_impl;
char** service_config_json, int query_timeout_ms,
std::shared_ptr<grpc_core::WorkSerializer> work_serializer) =
grpc_dns_lookup_ares_locked_impl;
static void grpc_cancel_ares_request_impl(grpc_ares_request* r) {
static void grpc_cancel_ares_request_locked_impl(grpc_ares_request* r) {
GPR_ASSERT(r != nullptr);
grpc_core::MutexLock lock(&r->mu);
if (r->ev_driver != nullptr) {
grpc_ares_ev_driver_shutdown_locked(r->ev_driver);
}
}
void (*grpc_cancel_ares_request)(grpc_ares_request* r) =
grpc_cancel_ares_request_impl;
void (*grpc_cancel_ares_request_locked)(grpc_ares_request* r) =
grpc_cancel_ares_request_locked_impl;
// ares_library_init and ares_library_cleanup are currently no-op except under
// Windows. Calling them may cause race conditions when other parts of the
@ -1114,6 +1114,8 @@ void grpc_ares_cleanup(void) {}
*/
typedef struct grpc_resolve_address_ares_request {
/* work_serializer that queries and related callbacks run under */
std::shared_ptr<grpc_core::WorkSerializer> work_serializer;
/** the pointer to receive the resolved addresses */
grpc_resolved_addresses** addrs_out;
/** currently resolving addresses */
@ -1121,8 +1123,8 @@ typedef struct grpc_resolve_address_ares_request {
/** closure to call when the resolve_address_ares request completes */
grpc_closure* on_resolve_address_done;
/** a closure wrapping on_resolve_address_done, which should be invoked when
the grpc_dns_lookup_ares operation is done. */
grpc_closure on_dns_lookup_done;
the grpc_dns_lookup_ares_locked operation is done. */
grpc_closure on_dns_lookup_done_locked;
/* target name */
const char* name;
/* default port to use if none is specified */
@ -1133,9 +1135,8 @@ typedef struct grpc_resolve_address_ares_request {
grpc_ares_request* ares_request = nullptr;
} grpc_resolve_address_ares_request;
static void on_dns_lookup_done(void* arg, grpc_error_handle error) {
grpc_resolve_address_ares_request* r =
static_cast<grpc_resolve_address_ares_request*>(arg);
static void on_dns_lookup_done_locked(grpc_resolve_address_ares_request* r,
grpc_error_handle error) {
delete r->ares_request;
grpc_resolved_addresses** resolved_addresses = r->addrs_out;
if (r->addresses == nullptr || r->addresses->empty()) {
@ -1152,11 +1153,30 @@ static void on_dns_lookup_done(void* arg, grpc_error_handle error) {
sizeof(grpc_resolved_address));
}
}
grpc_core::ExecCtx::Run(DEBUG_LOCATION, r->on_resolve_address_done,
GRPC_ERROR_REF(error));
grpc_core::ExecCtx::Run(DEBUG_LOCATION, r->on_resolve_address_done, error);
delete r;
}
static void on_dns_lookup_done(void* arg, grpc_error_handle error) {
grpc_resolve_address_ares_request* r =
static_cast<grpc_resolve_address_ares_request*>(arg);
(void)GRPC_ERROR_REF(error); // ref owned by lambda
r->work_serializer->Run([r, error]() { on_dns_lookup_done_locked(r, error); },
DEBUG_LOCATION);
}
static void grpc_resolve_address_invoke_dns_lookup_ares_locked(void* arg) {
grpc_resolve_address_ares_request* r =
static_cast<grpc_resolve_address_ares_request*>(arg);
GRPC_CLOSURE_INIT(&r->on_dns_lookup_done_locked, on_dns_lookup_done, r,
grpc_schedule_on_exec_ctx);
r->ares_request = grpc_dns_lookup_ares_locked(
nullptr /* dns_server */, r->name, r->default_port, r->interested_parties,
&r->on_dns_lookup_done_locked, &r->addresses,
nullptr /* balancer_addresses */, nullptr /* service_config_json */,
GRPC_DNS_ARES_DEFAULT_QUERY_TIMEOUT_MS, r->work_serializer);
}
static void grpc_resolve_address_ares_impl(const char* name,
const char* default_port,
grpc_pollset_set* interested_parties,
@ -1164,18 +1184,15 @@ static void grpc_resolve_address_ares_impl(const char* name,
grpc_resolved_addresses** addrs) {
grpc_resolve_address_ares_request* r =
new grpc_resolve_address_ares_request();
r->work_serializer = std::make_shared<grpc_core::WorkSerializer>();
r->addrs_out = addrs;
r->on_resolve_address_done = on_done;
r->name = name;
r->default_port = default_port;
r->interested_parties = interested_parties;
GRPC_CLOSURE_INIT(&r->on_dns_lookup_done, on_dns_lookup_done, r,
grpc_schedule_on_exec_ctx);
r->ares_request = grpc_dns_lookup_ares(
nullptr /* dns_server */, name, default_port, interested_parties,
&r->on_dns_lookup_done, &r->addresses, nullptr /* balancer_addresses */,
nullptr /* service_config_json */,
GRPC_DNS_ARES_DEFAULT_QUERY_TIMEOUT_MS);
r->work_serializer->Run(
[r]() { grpc_resolve_address_invoke_dns_lookup_ares_locked(r); },
DEBUG_LOCATION);
}
void (*grpc_resolve_address_ares)(

@ -45,28 +45,23 @@ extern grpc_core::TraceFlag grpc_trace_cares_resolver;
typedef struct grpc_ares_ev_driver grpc_ares_ev_driver;
struct grpc_ares_request {
/** synchronizes access to this request, and also to associated
* ev_driver and fd_node objects */
grpc_core::Mutex mu;
/** indicates the DNS server to use, if specified */
struct ares_addr_port_node dns_server_addr ABSL_GUARDED_BY(mu);
struct ares_addr_port_node dns_server_addr;
/** following members are set in grpc_resolve_address_ares_impl */
/** closure to call when the request completes */
grpc_closure* on_done ABSL_GUARDED_BY(mu) = nullptr;
grpc_closure* on_done = nullptr;
/** the pointer to receive the resolved addresses */
std::unique_ptr<grpc_core::ServerAddressList>* addresses_out
ABSL_GUARDED_BY(mu);
std::unique_ptr<grpc_core::ServerAddressList>* addresses_out;
/** the pointer to receive the resolved balancer addresses */
std::unique_ptr<grpc_core::ServerAddressList>* balancer_addresses_out
ABSL_GUARDED_BY(mu);
std::unique_ptr<grpc_core::ServerAddressList>* balancer_addresses_out;
/** the pointer to receive the service config in JSON */
char** service_config_json_out ABSL_GUARDED_BY(mu) = nullptr;
char** service_config_json_out = nullptr;
/** the evernt driver used by this request */
grpc_ares_ev_driver* ev_driver ABSL_GUARDED_BY(mu) = nullptr;
grpc_ares_ev_driver* ev_driver = nullptr;
/** number of ongoing queries */
size_t pending_queries ABSL_GUARDED_BY(mu) = 0;
size_t pending_queries = 0;
/** the errors explaining query failures, appended to in query callbacks */
grpc_error_handle error ABSL_GUARDED_BY(mu) = GRPC_ERROR_NONE;
grpc_error_handle error = GRPC_ERROR_NONE;
};
/* Asynchronously resolve \a name. Use \a default_port if a port isn't
@ -88,15 +83,16 @@ extern void (*grpc_resolve_address_ares)(const char* name,
scheduled with \a exec_ctx, so it must not try to acquire locks that are
being held by the caller. The returned grpc_ares_request object is owned
by the caller and it is safe to free after on_done is called back. */
extern grpc_ares_request* (*grpc_dns_lookup_ares)(
extern grpc_ares_request* (*grpc_dns_lookup_ares_locked)(
const char* dns_server, const char* name, const char* default_port,
grpc_pollset_set* interested_parties, grpc_closure* on_done,
std::unique_ptr<grpc_core::ServerAddressList>* addresses,
std::unique_ptr<grpc_core::ServerAddressList>* balancer_addresses,
char** service_config_json, int query_timeout_ms);
char** service_config_json, int query_timeout_ms,
std::shared_ptr<grpc_core::WorkSerializer> work_serializer);
/* Cancel the pending grpc_ares_request \a request */
extern void (*grpc_cancel_ares_request)(grpc_ares_request* request);
extern void (*grpc_cancel_ares_request_locked)(grpc_ares_request* request);
/* Initialize gRPC ares wrapper. Must be called at least once before
grpc_resolve_address_ares(). */
@ -107,6 +103,10 @@ grpc_error_handle grpc_ares_init(void);
it has been called the same number of times as grpc_ares_init(). */
void grpc_ares_cleanup(void);
/** Schedules the desired callback for request completion
* and destroys the grpc_ares_request */
void grpc_ares_complete_request_locked(grpc_ares_request* request);
/* Indicates whether or not AAAA queries should be attempted. */
/* E.g., return false if ipv6 is known to not be available. */
bool grpc_ares_query_ipv6();

@ -60,12 +60,13 @@ static void my_resolve_address(const char* addr, const char* /*default_port*/,
static grpc_address_resolver_vtable test_resolver = {my_resolve_address,
nullptr};
static grpc_ares_request* my_dns_lookup_ares(
static grpc_ares_request* my_dns_lookup_ares_locked(
const char* /*dns_server*/, const char* addr, const char* /*default_port*/,
grpc_pollset_set* /*interested_parties*/, grpc_closure* on_done,
std::unique_ptr<grpc_core::ServerAddressList>* addresses,
std::unique_ptr<grpc_core::ServerAddressList>* /*balancer_addresses*/,
char** /*service_config_json*/, int /*query_timeout_ms*/) { // NOLINT
char** /*service_config_json*/, int /*query_timeout_ms*/,
std::shared_ptr<grpc_core::WorkSerializer> /*combiner*/) { // NOLINT
gpr_mu_lock(&g_mu);
GPR_ASSERT(0 == strcmp("test", addr));
grpc_error_handle error = GRPC_ERROR_NONE;
@ -85,7 +86,7 @@ static grpc_ares_request* my_dns_lookup_ares(
return nullptr;
}
static void my_cancel_ares_request(grpc_ares_request* request) {
static void my_cancel_ares_request_locked(grpc_ares_request* request) {
GPR_ASSERT(request == nullptr);
}
@ -154,8 +155,8 @@ int main(int argc, char** argv) {
auto work_serializer = std::make_shared<grpc_core::WorkSerializer>();
g_work_serializer = &work_serializer;
grpc_set_resolver_impl(&test_resolver);
grpc_dns_lookup_ares = my_dns_lookup_ares;
grpc_cancel_ares_request = my_cancel_ares_request;
grpc_dns_lookup_ares_locked = my_dns_lookup_ares_locked;
grpc_cancel_ares_request_locked = my_cancel_ares_request_locked;
{
grpc_core::ExecCtx exec_ctx;

@ -37,12 +37,13 @@ static grpc_address_resolver_vtable* default_resolve_address;
static std::shared_ptr<grpc_core::WorkSerializer>* g_work_serializer;
static grpc_ares_request* (*g_default_dns_lookup_ares)(
static grpc_ares_request* (*g_default_dns_lookup_ares_locked)(
const char* dns_server, const char* name, const char* default_port,
grpc_pollset_set* interested_parties, grpc_closure* on_done,
std::unique_ptr<grpc_core::ServerAddressList>* addresses,
std::unique_ptr<grpc_core::ServerAddressList>* balancer_addresses,
char** service_config_json, int query_timeout_ms);
char** service_config_json, int query_timeout_ms,
std::shared_ptr<grpc_core::WorkSerializer> work_serializer);
// Counter incremented by test_resolve_address_impl indicating the number of
// times a system-level resolution has happened.
@ -95,15 +96,17 @@ static grpc_error_handle test_blocking_resolve_address_impl(
static grpc_address_resolver_vtable test_resolver = {
test_resolve_address_impl, test_blocking_resolve_address_impl};
static grpc_ares_request* test_dns_lookup_ares(
static grpc_ares_request* test_dns_lookup_ares_locked(
const char* dns_server, const char* name, const char* default_port,
grpc_pollset_set* /*interested_parties*/, grpc_closure* on_done,
std::unique_ptr<grpc_core::ServerAddressList>* addresses,
std::unique_ptr<grpc_core::ServerAddressList>* balancer_addresses,
char** service_config_json, int query_timeout_ms) {
grpc_ares_request* result = g_default_dns_lookup_ares(
char** service_config_json, int query_timeout_ms,
std::shared_ptr<grpc_core::WorkSerializer> work_serializer) {
grpc_ares_request* result = g_default_dns_lookup_ares_locked(
dns_server, name, default_port, g_iomgr_args.pollset_set, on_done,
addresses, balancer_addresses, service_config_json, query_timeout_ms);
addresses, balancer_addresses, service_config_json, query_timeout_ms,
std::move(work_serializer));
++g_resolution_count;
static grpc_millis last_resolution_time = 0;
grpc_millis now =
@ -332,8 +335,8 @@ int main(int argc, char** argv) {
auto work_serializer = std::make_shared<grpc_core::WorkSerializer>();
g_work_serializer = &work_serializer;
g_default_dns_lookup_ares = grpc_dns_lookup_ares;
grpc_dns_lookup_ares = test_dns_lookup_ares;
g_default_dns_lookup_ares_locked = grpc_dns_lookup_ares_locked;
grpc_dns_lookup_ares_locked = test_dns_lookup_ares_locked;
default_resolve_address = grpc_resolve_address_impl;
grpc_set_resolver_impl(&test_resolver);

@ -137,12 +137,13 @@ void my_resolve_address(const char* addr, const char* /*default_port*/,
static grpc_address_resolver_vtable fuzzer_resolver = {my_resolve_address,
nullptr};
grpc_ares_request* my_dns_lookup_ares(
grpc_ares_request* my_dns_lookup_ares_locked(
const char* /*dns_server*/, const char* addr, const char* /*default_port*/,
grpc_pollset_set* /*interested_parties*/, grpc_closure* on_done,
std::unique_ptr<grpc_core::ServerAddressList>* addresses,
std::unique_ptr<grpc_core::ServerAddressList>* /*balancer_addresses*/,
char** /*service_config_json*/, int /*query_timeout*/) {
char** /*service_config_json*/, int /*query_timeout*/,
std::shared_ptr<grpc_core::WorkSerializer> /*combiner*/) {
addr_req* r = new addr_req();
r->addr = gpr_strdup(addr);
r->on_done = on_done;
@ -154,7 +155,7 @@ grpc_ares_request* my_dns_lookup_ares(
return nullptr;
}
static void my_cancel_ares_request(grpc_ares_request* request) {
static void my_cancel_ares_request_locked(grpc_ares_request* request) {
GPR_ASSERT(request == nullptr);
}
////////////////////////////////////////////////////////////////////////////////
@ -729,8 +730,8 @@ DEFINE_PROTO_FUZZER(const api_fuzzer::Msg& msg) {
grpc_core::Executor::SetThreadingAll(false);
}
grpc_set_resolver_impl(&fuzzer_resolver);
grpc_dns_lookup_ares = my_dns_lookup_ares;
grpc_cancel_ares_request = my_cancel_ares_request;
grpc_dns_lookup_ares_locked = my_dns_lookup_ares_locked;
grpc_cancel_ares_request_locked = my_cancel_ares_request_locked;
GPR_ASSERT(g_channel == nullptr);
GPR_ASSERT(g_server == nullptr);

@ -43,14 +43,15 @@ static void* tag(intptr_t t) { return reinterpret_cast<void*>(t); }
static gpr_mu g_mu;
static int g_resolve_port = -1;
static grpc_ares_request* (*iomgr_dns_lookup_ares)(
static grpc_ares_request* (*iomgr_dns_lookup_ares_locked)(
const char* dns_server, const char* addr, const char* default_port,
grpc_pollset_set* interested_parties, grpc_closure* on_done,
std::unique_ptr<grpc_core::ServerAddressList>* addresses,
std::unique_ptr<grpc_core::ServerAddressList>* balancer_addresses,
char** service_config_json, int query_timeout_ms);
char** service_config_json, int query_timeout_ms,
std::shared_ptr<grpc_core::WorkSerializer> combiner);
static void (*iomgr_cancel_ares_request)(grpc_ares_request* request);
static void (*iomgr_cancel_ares_request_locked)(grpc_ares_request* request);
static void set_resolve_port(int port) {
gpr_mu_lock(&g_mu);
@ -100,16 +101,18 @@ static grpc_error_handle my_blocking_resolve_address(
static grpc_address_resolver_vtable test_resolver = {
my_resolve_address, my_blocking_resolve_address};
static grpc_ares_request* my_dns_lookup_ares(
static grpc_ares_request* my_dns_lookup_ares_locked(
const char* dns_server, const char* addr, const char* default_port,
grpc_pollset_set* interested_parties, grpc_closure* on_done,
std::unique_ptr<grpc_core::ServerAddressList>* addresses,
std::unique_ptr<grpc_core::ServerAddressList>* balancer_addresses,
char** service_config_json, int query_timeout_ms) {
char** service_config_json, int query_timeout_ms,
std::shared_ptr<grpc_core::WorkSerializer> work_serializer) {
if (0 != strcmp(addr, "test")) {
return iomgr_dns_lookup_ares(
return iomgr_dns_lookup_ares_locked(
dns_server, addr, default_port, interested_parties, on_done, addresses,
balancer_addresses, service_config_json, query_timeout_ms);
balancer_addresses, service_config_json, query_timeout_ms,
std::move(work_serializer));
}
grpc_error_handle error = GRPC_ERROR_NONE;
@ -131,9 +134,9 @@ static grpc_ares_request* my_dns_lookup_ares(
return nullptr;
}
static void my_cancel_ares_request(grpc_ares_request* request) {
static void my_cancel_ares_request_locked(grpc_ares_request* request) {
if (request != nullptr) {
iomgr_cancel_ares_request(request);
iomgr_cancel_ares_request_locked(request);
}
}
@ -149,10 +152,10 @@ int main(int argc, char** argv) {
grpc_init();
default_resolver = grpc_resolve_address_impl;
grpc_set_resolver_impl(&test_resolver);
iomgr_dns_lookup_ares = grpc_dns_lookup_ares;
iomgr_cancel_ares_request = grpc_cancel_ares_request;
grpc_dns_lookup_ares = my_dns_lookup_ares;
grpc_cancel_ares_request = my_cancel_ares_request;
iomgr_dns_lookup_ares_locked = grpc_dns_lookup_ares_locked;
iomgr_cancel_ares_request_locked = grpc_cancel_ares_request_locked;
grpc_dns_lookup_ares_locked = my_dns_lookup_ares_locked;
grpc_cancel_ares_request_locked = my_cancel_ares_request_locked;
int was_cancelled1;
int was_cancelled2;

Loading…
Cancel
Save