Merge pull request #15490 from apolcyn/ares_under_combiner

Refactor to put c-ares queries under a combiner
pull/15746/head
apolcyn 7 years ago committed by GitHub
commit 057fd3f575
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
  2. 14
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h
  3. 93
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc
  4. 155
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
  5. 4
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h
  6. 11
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc
  7. 8
      test/core/client_channel/resolvers/dns_resolver_connectivity_test.cc
  8. 18
      test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc
  9. 14
      test/core/end2end/fuzzers/api_fuzzer.cc
  10. 20
      test/core/end2end/goaway_server_test.cc

@ -414,10 +414,10 @@ void AresDnsResolver::StartResolvingLocked() {
resolving_ = true; resolving_ = true;
lb_addresses_ = nullptr; lb_addresses_ = nullptr;
service_config_json_ = nullptr; service_config_json_ = nullptr;
pending_request_ = grpc_dns_lookup_ares( pending_request_ = grpc_dns_lookup_ares_locked(
dns_server_, name_to_resolve_, kDefaultPort, interested_parties_, dns_server_, name_to_resolve_, kDefaultPort, interested_parties_,
&on_resolved_, &lb_addresses_, true /* check_grpclb */, &on_resolved_, &lb_addresses_, true /* check_grpclb */,
request_service_config_ ? &service_config_json_ : nullptr); request_service_config_ ? &service_config_json_ : nullptr, combiner());
last_resolution_timestamp_ = grpc_core::ExecCtx::Get()->Now(); last_resolution_timestamp_ = grpc_core::ExecCtx::Get()->Now();
} }

@ -29,25 +29,27 @@ typedef struct grpc_ares_ev_driver grpc_ares_ev_driver;
/* Start \a ev_driver. It will keep working until all IO on its ares_channel is /* Start \a ev_driver. It will keep working until all IO on its ares_channel is
done, or grpc_ares_ev_driver_destroy() is called. It may notify the callbacks done, or grpc_ares_ev_driver_destroy() is called. It may notify the callbacks
bound to its ares_channel when necessary. */ bound to its ares_channel when necessary. */
void grpc_ares_ev_driver_start(grpc_ares_ev_driver* ev_driver); void grpc_ares_ev_driver_start_locked(grpc_ares_ev_driver* ev_driver);
/* Returns the ares_channel owned by \a ev_driver. To bind a c-ares query to /* Returns the ares_channel owned by \a ev_driver. To bind a c-ares query to
\a ev_driver, use the ares_channel owned by \a ev_driver as the arg of the \a ev_driver, use the ares_channel owned by \a ev_driver as the arg of the
query. */ query. */
ares_channel* grpc_ares_ev_driver_get_channel(grpc_ares_ev_driver* ev_driver); ares_channel* grpc_ares_ev_driver_get_channel_locked(
grpc_ares_ev_driver* ev_driver);
/* Creates a new grpc_ares_ev_driver. Returns GRPC_ERROR_NONE if \a ev_driver is /* Creates a new grpc_ares_ev_driver. Returns GRPC_ERROR_NONE if \a ev_driver is
created successfully. */ created successfully. */
grpc_error* grpc_ares_ev_driver_create(grpc_ares_ev_driver** ev_driver, grpc_error* grpc_ares_ev_driver_create_locked(grpc_ares_ev_driver** ev_driver,
grpc_pollset_set* pollset_set); grpc_pollset_set* pollset_set,
grpc_combiner* combiner);
/* Destroys \a ev_driver asynchronously. Pending lookups made on \a ev_driver /* Destroys \a ev_driver asynchronously. Pending lookups made on \a ev_driver
will be cancelled and their on_done callbacks will be invoked with a status will be cancelled and their on_done callbacks will be invoked with a status
of ARES_ECANCELLED. */ of ARES_ECANCELLED. */
void grpc_ares_ev_driver_destroy(grpc_ares_ev_driver* ev_driver); void grpc_ares_ev_driver_destroy_locked(grpc_ares_ev_driver* ev_driver);
/* Shutdown all the grpc_fds used by \a ev_driver */ /* Shutdown all the grpc_fds used by \a ev_driver */
void grpc_ares_ev_driver_shutdown(grpc_ares_ev_driver* ev_driver); void grpc_ares_ev_driver_shutdown_locked(grpc_ares_ev_driver* ev_driver);
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_C_ARES_GRPC_ARES_EV_DRIVER_H \ #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_C_ARES_GRPC_ARES_EV_DRIVER_H \
*/ */

@ -39,17 +39,15 @@
typedef struct fd_node { typedef struct fd_node {
/** the owner of this fd node */ /** the owner of this fd node */
grpc_ares_ev_driver* ev_driver; grpc_ares_ev_driver* ev_driver;
/** a closure wrapping on_readable_cb, which should be invoked when the /** a closure wrapping on_readable_locked, which should be
grpc_fd in this node becomes readable. */ invoked when the grpc_fd in this node becomes readable. */
grpc_closure read_closure; grpc_closure read_closure;
/** a closure wrapping on_writable_cb, which should be invoked when the /** a closure wrapping on_writable_locked, which should be
grpc_fd in this node becomes writable. */ invoked when the grpc_fd in this node becomes writable. */
grpc_closure write_closure; grpc_closure write_closure;
/** next fd node in the list */ /** next fd node in the list */
struct fd_node* next; struct fd_node* next;
/** mutex guarding the rest of the state */
gpr_mu mu;
/** the grpc_fd owned by this fd node */ /** the grpc_fd owned by this fd node */
grpc_fd* fd; grpc_fd* fd;
/** if the readable closure has been registered */ /** if the readable closure has been registered */
@ -68,8 +66,8 @@ struct grpc_ares_ev_driver {
/** refcount of the event driver */ /** refcount of the event driver */
gpr_refcount refs; gpr_refcount refs;
/** mutex guarding the rest of the state */ /** combiner to synchronize c-ares and I/O callbacks on */
gpr_mu mu; grpc_combiner* combiner;
/** a list of grpc_fd that this event driver is currently using. */ /** a list of grpc_fd that this event driver is currently using. */
fd_node* fds; fd_node* fds;
/** is this event driver currently working? */ /** is this event driver currently working? */
@ -92,19 +90,18 @@ static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver* ev_driver) {
if (gpr_unref(&ev_driver->refs)) { if (gpr_unref(&ev_driver->refs)) {
gpr_log(GPR_DEBUG, "destroy ev_driver %" PRIuPTR, (uintptr_t)ev_driver); gpr_log(GPR_DEBUG, "destroy ev_driver %" PRIuPTR, (uintptr_t)ev_driver);
GPR_ASSERT(ev_driver->fds == nullptr); GPR_ASSERT(ev_driver->fds == nullptr);
gpr_mu_destroy(&ev_driver->mu); GRPC_COMBINER_UNREF(ev_driver->combiner, "free ares event driver");
ares_destroy(ev_driver->channel); ares_destroy(ev_driver->channel);
gpr_free(ev_driver); gpr_free(ev_driver);
} }
} }
static void fd_node_destroy(fd_node* fdn) { static void fd_node_destroy_locked(fd_node* fdn) {
gpr_log(GPR_DEBUG, "delete fd: %d", grpc_fd_wrapped_fd(fdn->fd)); gpr_log(GPR_DEBUG, "delete fd: %d", grpc_fd_wrapped_fd(fdn->fd));
GPR_ASSERT(!fdn->readable_registered); GPR_ASSERT(!fdn->readable_registered);
GPR_ASSERT(!fdn->writable_registered); GPR_ASSERT(!fdn->writable_registered);
GPR_ASSERT(fdn->already_shutdown); GPR_ASSERT(fdn->already_shutdown);
gpr_mu_destroy(&fdn->mu); /* c-ares library will close the fd inside grpc_fd. This fd may be picked up
/* c-ares library has closed the fd inside grpc_fd. This fd may be picked up
immediately by another thread, and should not be closed by the following immediately by another thread, and should not be closed by the following
grpc_fd_orphan. */ grpc_fd_orphan. */
int dummy_release_fd; int dummy_release_fd;
@ -119,15 +116,16 @@ static void fd_node_shutdown_locked(fd_node* fdn, const char* reason) {
} }
} }
grpc_error* grpc_ares_ev_driver_create(grpc_ares_ev_driver** ev_driver, grpc_error* grpc_ares_ev_driver_create_locked(grpc_ares_ev_driver** ev_driver,
grpc_pollset_set* pollset_set) { grpc_pollset_set* pollset_set,
grpc_combiner* combiner) {
*ev_driver = static_cast<grpc_ares_ev_driver*>( *ev_driver = static_cast<grpc_ares_ev_driver*>(
gpr_malloc(sizeof(grpc_ares_ev_driver))); gpr_malloc(sizeof(grpc_ares_ev_driver)));
ares_options opts; ares_options opts;
memset(&opts, 0, sizeof(opts)); memset(&opts, 0, sizeof(opts));
opts.flags |= ARES_FLAG_STAYOPEN; opts.flags |= ARES_FLAG_STAYOPEN;
int status = ares_init_options(&(*ev_driver)->channel, &opts, ARES_OPT_FLAGS); int status = ares_init_options(&(*ev_driver)->channel, &opts, ARES_OPT_FLAGS);
gpr_log(GPR_DEBUG, "grpc_ares_ev_driver_create"); gpr_log(GPR_DEBUG, "grpc_ares_ev_driver_create_locked");
if (status != ARES_SUCCESS) { if (status != ARES_SUCCESS) {
char* err_msg; char* err_msg;
gpr_asprintf(&err_msg, "Failed to init ares channel. C-ares error: %s", gpr_asprintf(&err_msg, "Failed to init ares channel. C-ares error: %s",
@ -137,7 +135,7 @@ grpc_error* grpc_ares_ev_driver_create(grpc_ares_ev_driver** ev_driver,
gpr_free(*ev_driver); gpr_free(*ev_driver);
return err; return err;
} }
gpr_mu_init(&(*ev_driver)->mu); (*ev_driver)->combiner = GRPC_COMBINER_REF(combiner, "ares event driver");
gpr_ref_init(&(*ev_driver)->refs, 1); gpr_ref_init(&(*ev_driver)->refs, 1);
(*ev_driver)->pollset_set = pollset_set; (*ev_driver)->pollset_set = pollset_set;
(*ev_driver)->fds = nullptr; (*ev_driver)->fds = nullptr;
@ -146,34 +144,26 @@ grpc_error* grpc_ares_ev_driver_create(grpc_ares_ev_driver** ev_driver,
return GRPC_ERROR_NONE; return GRPC_ERROR_NONE;
} }
void grpc_ares_ev_driver_destroy(grpc_ares_ev_driver* ev_driver) { void grpc_ares_ev_driver_destroy_locked(grpc_ares_ev_driver* ev_driver) {
// It's not safe to shut down remaining fds here directly, becauses // We mark the event driver as being shut down. If the event driver
// ares_host_callback does not provide an exec_ctx. We mark the event driver // is working, grpc_ares_notify_on_event_locked will shut down the
// as being shut down. If the event driver is working, // fds; if it's not working, there are no fds to shut down.
// grpc_ares_notify_on_event_locked will shut down the fds; if it's not
// working, there are no fds to shut down.
gpr_mu_lock(&ev_driver->mu);
ev_driver->shutting_down = true; ev_driver->shutting_down = true;
gpr_mu_unlock(&ev_driver->mu);
grpc_ares_ev_driver_unref(ev_driver); grpc_ares_ev_driver_unref(ev_driver);
} }
void grpc_ares_ev_driver_shutdown(grpc_ares_ev_driver* ev_driver) { void grpc_ares_ev_driver_shutdown_locked(grpc_ares_ev_driver* ev_driver) {
gpr_mu_lock(&ev_driver->mu);
ev_driver->shutting_down = true; ev_driver->shutting_down = true;
fd_node* fn = ev_driver->fds; fd_node* fn = ev_driver->fds;
while (fn != nullptr) { while (fn != nullptr) {
gpr_mu_lock(&fn->mu);
fd_node_shutdown_locked(fn, "grpc_ares_ev_driver_shutdown"); fd_node_shutdown_locked(fn, "grpc_ares_ev_driver_shutdown");
gpr_mu_unlock(&fn->mu);
fn = fn->next; fn = fn->next;
} }
gpr_mu_unlock(&ev_driver->mu);
} }
// Search fd in the fd_node list head. This is an O(n) search, the max possible // Search fd in the fd_node list head. This is an O(n) search, the max possible
// value of n is ARES_GETSOCK_MAXNUM (16). n is typically 1 - 2 in our tests. // value of n is ARES_GETSOCK_MAXNUM (16). n is typically 1 - 2 in our tests.
static fd_node* pop_fd_node(fd_node** head, int fd) { static fd_node* pop_fd_node_locked(fd_node** head, int fd) {
fd_node dummy_head; fd_node dummy_head;
dummy_head.next = *head; dummy_head.next = *head;
fd_node* node = &dummy_head; fd_node* node = &dummy_head;
@ -190,24 +180,22 @@ static fd_node* pop_fd_node(fd_node** head, int fd) {
} }
/* Check if \a fd is still readable */ /* Check if \a fd is still readable */
static bool grpc_ares_is_fd_still_readable(grpc_ares_ev_driver* ev_driver, static bool grpc_ares_is_fd_still_readable_locked(
int fd) { grpc_ares_ev_driver* ev_driver, int fd) {
size_t bytes_available = 0; size_t bytes_available = 0;
return ioctl(fd, FIONREAD, &bytes_available) == 0 && bytes_available > 0; return ioctl(fd, FIONREAD, &bytes_available) == 0 && bytes_available > 0;
} }
static void on_readable_cb(void* arg, grpc_error* error) { static void on_readable_locked(void* arg, grpc_error* error) {
fd_node* fdn = static_cast<fd_node*>(arg); fd_node* fdn = static_cast<fd_node*>(arg);
grpc_ares_ev_driver* ev_driver = fdn->ev_driver; grpc_ares_ev_driver* ev_driver = fdn->ev_driver;
gpr_mu_lock(&fdn->mu);
const int fd = grpc_fd_wrapped_fd(fdn->fd); const int fd = grpc_fd_wrapped_fd(fdn->fd);
fdn->readable_registered = false; fdn->readable_registered = false;
gpr_mu_unlock(&fdn->mu);
gpr_log(GPR_DEBUG, "readable on %d", fd); gpr_log(GPR_DEBUG, "readable on %d", fd);
if (error == GRPC_ERROR_NONE) { if (error == GRPC_ERROR_NONE) {
do { do {
ares_process_fd(ev_driver->channel, fd, ARES_SOCKET_BAD); ares_process_fd(ev_driver->channel, fd, ARES_SOCKET_BAD);
} while (grpc_ares_is_fd_still_readable(ev_driver, fd)); } while (grpc_ares_is_fd_still_readable_locked(ev_driver, fd));
} else { } else {
// If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or // If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or
// timed out. The pending lookups made on this ev_driver will be cancelled // timed out. The pending lookups made on this ev_driver will be cancelled
@ -217,19 +205,15 @@ static void on_readable_cb(void* arg, grpc_error* error) {
// grpc_ares_notify_on_event_locked(). // grpc_ares_notify_on_event_locked().
ares_cancel(ev_driver->channel); ares_cancel(ev_driver->channel);
} }
gpr_mu_lock(&ev_driver->mu);
grpc_ares_notify_on_event_locked(ev_driver); grpc_ares_notify_on_event_locked(ev_driver);
gpr_mu_unlock(&ev_driver->mu);
grpc_ares_ev_driver_unref(ev_driver); grpc_ares_ev_driver_unref(ev_driver);
} }
static void on_writable_cb(void* arg, grpc_error* error) { static void on_writable_locked(void* arg, grpc_error* error) {
fd_node* fdn = static_cast<fd_node*>(arg); fd_node* fdn = static_cast<fd_node*>(arg);
grpc_ares_ev_driver* ev_driver = fdn->ev_driver; grpc_ares_ev_driver* ev_driver = fdn->ev_driver;
gpr_mu_lock(&fdn->mu);
const int fd = grpc_fd_wrapped_fd(fdn->fd); const int fd = grpc_fd_wrapped_fd(fdn->fd);
fdn->writable_registered = false; fdn->writable_registered = false;
gpr_mu_unlock(&fdn->mu);
gpr_log(GPR_DEBUG, "writable on %d", fd); gpr_log(GPR_DEBUG, "writable on %d", fd);
if (error == GRPC_ERROR_NONE) { if (error == GRPC_ERROR_NONE) {
ares_process_fd(ev_driver->channel, ARES_SOCKET_BAD, fd); ares_process_fd(ev_driver->channel, ARES_SOCKET_BAD, fd);
@ -242,13 +226,12 @@ static void on_writable_cb(void* arg, grpc_error* error) {
// grpc_ares_notify_on_event_locked(). // grpc_ares_notify_on_event_locked().
ares_cancel(ev_driver->channel); ares_cancel(ev_driver->channel);
} }
gpr_mu_lock(&ev_driver->mu);
grpc_ares_notify_on_event_locked(ev_driver); grpc_ares_notify_on_event_locked(ev_driver);
gpr_mu_unlock(&ev_driver->mu);
grpc_ares_ev_driver_unref(ev_driver); grpc_ares_ev_driver_unref(ev_driver);
} }
ares_channel* grpc_ares_ev_driver_get_channel(grpc_ares_ev_driver* ev_driver) { ares_channel* grpc_ares_ev_driver_get_channel_locked(
grpc_ares_ev_driver* ev_driver) {
return &ev_driver->channel; return &ev_driver->channel;
} }
@ -263,7 +246,7 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) {
for (size_t i = 0; i < ARES_GETSOCK_MAXNUM; i++) { for (size_t i = 0; i < ARES_GETSOCK_MAXNUM; i++) {
if (ARES_GETSOCK_READABLE(socks_bitmask, i) || if (ARES_GETSOCK_READABLE(socks_bitmask, i) ||
ARES_GETSOCK_WRITABLE(socks_bitmask, i)) { ARES_GETSOCK_WRITABLE(socks_bitmask, i)) {
fd_node* fdn = pop_fd_node(&ev_driver->fds, socks[i]); fd_node* fdn = pop_fd_node_locked(&ev_driver->fds, socks[i]);
// Create a new fd_node if sock[i] is not in the fd_node list. // Create a new fd_node if sock[i] is not in the fd_node list.
if (fdn == nullptr) { if (fdn == nullptr) {
char* fd_name; char* fd_name;
@ -275,17 +258,15 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) {
fdn->readable_registered = false; fdn->readable_registered = false;
fdn->writable_registered = false; fdn->writable_registered = false;
fdn->already_shutdown = false; fdn->already_shutdown = false;
gpr_mu_init(&fdn->mu); GRPC_CLOSURE_INIT(&fdn->read_closure, on_readable_locked, fdn,
GRPC_CLOSURE_INIT(&fdn->read_closure, on_readable_cb, fdn, grpc_combiner_scheduler(ev_driver->combiner));
grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&fdn->write_closure, on_writable_locked, fdn,
GRPC_CLOSURE_INIT(&fdn->write_closure, on_writable_cb, fdn, grpc_combiner_scheduler(ev_driver->combiner));
grpc_schedule_on_exec_ctx);
grpc_pollset_set_add_fd(ev_driver->pollset_set, fdn->fd); grpc_pollset_set_add_fd(ev_driver->pollset_set, fdn->fd);
gpr_free(fd_name); gpr_free(fd_name);
} }
fdn->next = new_list; fdn->next = new_list;
new_list = fdn; new_list = fdn;
gpr_mu_lock(&fdn->mu);
// Register read_closure if the socket is readable and read_closure has // Register read_closure if the socket is readable and read_closure has
// not been registered with this socket. // not been registered with this socket.
if (ARES_GETSOCK_READABLE(socks_bitmask, i) && if (ARES_GETSOCK_READABLE(socks_bitmask, i) &&
@ -305,7 +286,6 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) {
grpc_fd_notify_on_write(fdn->fd, &fdn->write_closure); grpc_fd_notify_on_write(fdn->fd, &fdn->write_closure);
fdn->writable_registered = true; fdn->writable_registered = true;
} }
gpr_mu_unlock(&fdn->mu);
} }
} }
} }
@ -315,15 +295,12 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) {
while (ev_driver->fds != nullptr) { while (ev_driver->fds != nullptr) {
fd_node* cur = ev_driver->fds; fd_node* cur = ev_driver->fds;
ev_driver->fds = ev_driver->fds->next; ev_driver->fds = ev_driver->fds->next;
gpr_mu_lock(&cur->mu);
fd_node_shutdown_locked(cur, "c-ares fd shutdown"); fd_node_shutdown_locked(cur, "c-ares fd shutdown");
if (!cur->readable_registered && !cur->writable_registered) { if (!cur->readable_registered && !cur->writable_registered) {
gpr_mu_unlock(&cur->mu); fd_node_destroy_locked(cur);
fd_node_destroy(cur);
} else { } else {
cur->next = new_list; cur->next = new_list;
new_list = cur; new_list = cur;
gpr_mu_unlock(&cur->mu);
} }
} }
ev_driver->fds = new_list; ev_driver->fds = new_list;
@ -334,13 +311,11 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) {
} }
} }
void grpc_ares_ev_driver_start(grpc_ares_ev_driver* ev_driver) { void grpc_ares_ev_driver_start_locked(grpc_ares_ev_driver* ev_driver) {
gpr_mu_lock(&ev_driver->mu);
if (!ev_driver->working) { if (!ev_driver->working) {
ev_driver->working = true; ev_driver->working = true;
grpc_ares_notify_on_event_locked(ev_driver); grpc_ares_notify_on_event_locked(ev_driver);
} }
gpr_mu_unlock(&ev_driver->mu);
} }
#endif /* GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER) */ #endif /* GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER) */

@ -65,8 +65,6 @@ struct grpc_ares_request {
/** number of ongoing queries */ /** number of ongoing queries */
gpr_refcount pending_queries; gpr_refcount pending_queries;
/** mutex guarding the rest of the state */
gpr_mu mu;
/** is there at least one successful query, set in on_done_cb */ /** is there at least one successful query, set in on_done_cb */
bool success; bool success;
/** the errors explaining the request failure, set in on_done_cb */ /** the errors explaining the request failure, set in on_done_cb */
@ -74,7 +72,8 @@ struct grpc_ares_request {
}; };
typedef struct grpc_ares_hostbyname_request { typedef struct grpc_ares_hostbyname_request {
/** following members are set in create_hostbyname_request */ /** following members are set in create_hostbyname_request_locked
*/
/** the top-level request instance */ /** the top-level request instance */
grpc_ares_request* parent_request; grpc_ares_request* parent_request;
/** host to resolve, parsed from the name to resolve */ /** host to resolve, parsed from the name to resolve */
@ -96,10 +95,6 @@ static uint16_t strhtons(const char* port) {
return htons(static_cast<unsigned short>(atoi(port))); return htons(static_cast<unsigned short>(atoi(port)));
} }
static void grpc_ares_request_ref(grpc_ares_request* r) {
gpr_ref(&r->pending_queries);
}
static void log_address_sorting_list(grpc_lb_addresses* lb_addrs, static void log_address_sorting_list(grpc_lb_addresses* lb_addrs,
const char* input_output_str) { const char* input_output_str) {
for (size_t i = 0; i < lb_addrs->num_addresses; i++) { for (size_t i = 0; i < lb_addrs->num_addresses; i++) {
@ -149,7 +144,11 @@ void grpc_cares_wrapper_test_only_address_sorting_sort(
grpc_cares_wrapper_address_sorting_sort(lb_addrs); grpc_cares_wrapper_address_sorting_sort(lb_addrs);
} }
static void grpc_ares_request_unref(grpc_ares_request* r) { static void grpc_ares_request_ref_locked(grpc_ares_request* r) {
gpr_ref(&r->pending_queries);
}
static void grpc_ares_request_unref_locked(grpc_ares_request* r) {
/* If there are no pending queries, invoke on_done callback and destroy the /* If there are no pending queries, invoke on_done callback and destroy the
request */ request */
if (gpr_unref(&r->pending_queries)) { if (gpr_unref(&r->pending_queries)) {
@ -158,13 +157,12 @@ static void grpc_ares_request_unref(grpc_ares_request* r) {
grpc_cares_wrapper_address_sorting_sort(lb_addrs); grpc_cares_wrapper_address_sorting_sort(lb_addrs);
} }
GRPC_CLOSURE_SCHED(r->on_done, r->error); GRPC_CLOSURE_SCHED(r->on_done, r->error);
gpr_mu_destroy(&r->mu); grpc_ares_ev_driver_destroy_locked(r->ev_driver);
grpc_ares_ev_driver_destroy(r->ev_driver);
gpr_free(r); gpr_free(r);
} }
} }
static grpc_ares_hostbyname_request* create_hostbyname_request( static grpc_ares_hostbyname_request* create_hostbyname_request_locked(
grpc_ares_request* parent_request, char* host, uint16_t port, grpc_ares_request* parent_request, char* host, uint16_t port,
bool is_balancer) { bool is_balancer) {
grpc_ares_hostbyname_request* hr = static_cast<grpc_ares_hostbyname_request*>( grpc_ares_hostbyname_request* hr = static_cast<grpc_ares_hostbyname_request*>(
@ -173,22 +171,22 @@ static grpc_ares_hostbyname_request* create_hostbyname_request(
hr->host = gpr_strdup(host); hr->host = gpr_strdup(host);
hr->port = port; hr->port = port;
hr->is_balancer = is_balancer; hr->is_balancer = is_balancer;
grpc_ares_request_ref(parent_request); grpc_ares_request_ref_locked(parent_request);
return hr; return hr;
} }
static void destroy_hostbyname_request(grpc_ares_hostbyname_request* hr) { static void destroy_hostbyname_request_locked(
grpc_ares_request_unref(hr->parent_request); grpc_ares_hostbyname_request* hr) {
grpc_ares_request_unref_locked(hr->parent_request);
gpr_free(hr->host); gpr_free(hr->host);
gpr_free(hr); gpr_free(hr);
} }
static void on_hostbyname_done_cb(void* arg, int status, int timeouts, static void on_hostbyname_done_locked(void* arg, int status, int timeouts,
struct hostent* hostent) { struct hostent* hostent) {
grpc_ares_hostbyname_request* hr = grpc_ares_hostbyname_request* hr =
static_cast<grpc_ares_hostbyname_request*>(arg); static_cast<grpc_ares_hostbyname_request*>(arg);
grpc_ares_request* r = hr->parent_request; grpc_ares_request* r = hr->parent_request;
gpr_mu_lock(&r->mu);
if (status == ARES_SUCCESS) { if (status == ARES_SUCCESS) {
GRPC_ERROR_UNREF(r->error); GRPC_ERROR_UNREF(r->error);
r->error = GRPC_ERROR_NONE; r->error = GRPC_ERROR_NONE;
@ -263,33 +261,33 @@ static void on_hostbyname_done_cb(void* arg, int status, int timeouts,
r->error = grpc_error_add_child(error, r->error); r->error = grpc_error_add_child(error, r->error);
} }
} }
gpr_mu_unlock(&r->mu); destroy_hostbyname_request_locked(hr);
destroy_hostbyname_request(hr);
} }
static void on_srv_query_done_cb(void* arg, int status, int timeouts, static void on_srv_query_done_locked(void* arg, int status, int timeouts,
unsigned char* abuf, int alen) { unsigned char* abuf, int alen) {
grpc_ares_request* r = static_cast<grpc_ares_request*>(arg); grpc_ares_request* r = static_cast<grpc_ares_request*>(arg);
gpr_log(GPR_DEBUG, "on_query_srv_done_cb"); gpr_log(GPR_DEBUG, "on_query_srv_done_locked");
if (status == ARES_SUCCESS) { if (status == ARES_SUCCESS) {
gpr_log(GPR_DEBUG, "on_query_srv_done_cb ARES_SUCCESS"); gpr_log(GPR_DEBUG, "on_query_srv_done_locked ARES_SUCCESS");
struct ares_srv_reply* reply; struct ares_srv_reply* reply;
const int parse_status = ares_parse_srv_reply(abuf, alen, &reply); const int parse_status = ares_parse_srv_reply(abuf, alen, &reply);
if (parse_status == ARES_SUCCESS) { if (parse_status == ARES_SUCCESS) {
ares_channel* channel = grpc_ares_ev_driver_get_channel(r->ev_driver); ares_channel* channel =
grpc_ares_ev_driver_get_channel_locked(r->ev_driver);
for (struct ares_srv_reply* srv_it = reply; srv_it != nullptr; for (struct ares_srv_reply* srv_it = reply; srv_it != nullptr;
srv_it = srv_it->next) { srv_it = srv_it->next) {
if (grpc_ipv6_loopback_available()) { if (grpc_ipv6_loopback_available()) {
grpc_ares_hostbyname_request* hr = create_hostbyname_request( grpc_ares_hostbyname_request* hr = create_hostbyname_request_locked(
r, srv_it->host, htons(srv_it->port), true /* is_balancer */); r, srv_it->host, htons(srv_it->port), true /* is_balancer */);
ares_gethostbyname(*channel, hr->host, AF_INET6, ares_gethostbyname(*channel, hr->host, AF_INET6,
on_hostbyname_done_cb, hr); on_hostbyname_done_locked, hr);
} }
grpc_ares_hostbyname_request* hr = create_hostbyname_request( grpc_ares_hostbyname_request* hr = create_hostbyname_request_locked(
r, srv_it->host, htons(srv_it->port), true /* is_balancer */); r, srv_it->host, htons(srv_it->port), true /* is_balancer */);
ares_gethostbyname(*channel, hr->host, AF_INET, on_hostbyname_done_cb, ares_gethostbyname(*channel, hr->host, AF_INET,
hr); on_hostbyname_done_locked, hr);
grpc_ares_ev_driver_start(r->ev_driver); grpc_ares_ev_driver_start_locked(r->ev_driver);
} }
} }
if (reply != nullptr) { if (reply != nullptr) {
@ -307,21 +305,20 @@ static void on_srv_query_done_cb(void* arg, int status, int timeouts,
r->error = grpc_error_add_child(error, r->error); r->error = grpc_error_add_child(error, r->error);
} }
} }
grpc_ares_request_unref(r); grpc_ares_request_unref_locked(r);
} }
static const char g_service_config_attribute_prefix[] = "grpc_config="; static const char g_service_config_attribute_prefix[] = "grpc_config=";
static void on_txt_done_cb(void* arg, int status, int timeouts, static void on_txt_done_locked(void* arg, int status, int timeouts,
unsigned char* buf, int len) { unsigned char* buf, int len) {
gpr_log(GPR_DEBUG, "on_txt_done_cb"); gpr_log(GPR_DEBUG, "on_txt_done_locked");
char* error_msg; char* error_msg;
grpc_ares_request* r = static_cast<grpc_ares_request*>(arg); grpc_ares_request* r = static_cast<grpc_ares_request*>(arg);
const size_t prefix_len = sizeof(g_service_config_attribute_prefix) - 1; const size_t prefix_len = sizeof(g_service_config_attribute_prefix) - 1;
struct ares_txt_ext* result = nullptr; struct ares_txt_ext* result = nullptr;
struct ares_txt_ext* reply = nullptr; struct ares_txt_ext* reply = nullptr;
grpc_error* error = GRPC_ERROR_NONE; grpc_error* error = GRPC_ERROR_NONE;
gpr_mu_lock(&r->mu);
if (status != ARES_SUCCESS) goto fail; if (status != ARES_SUCCESS) goto fail;
status = ares_parse_txt_reply_ext(buf, len, &reply); status = ares_parse_txt_reply_ext(buf, len, &reply);
if (status != ARES_SUCCESS) goto fail; if (status != ARES_SUCCESS) goto fail;
@ -366,14 +363,14 @@ fail:
r->error = grpc_error_add_child(error, r->error); r->error = grpc_error_add_child(error, r->error);
} }
done: done:
gpr_mu_unlock(&r->mu); grpc_ares_request_unref_locked(r);
grpc_ares_request_unref(r);
} }
static grpc_ares_request* grpc_dns_lookup_ares_impl( static grpc_ares_request* grpc_dns_lookup_ares_locked_impl(
const char* dns_server, const char* name, const char* default_port, const char* dns_server, const char* name, const char* default_port,
grpc_pollset_set* interested_parties, grpc_closure* on_done, grpc_pollset_set* interested_parties, grpc_closure* on_done,
grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json) { grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json,
grpc_combiner* combiner) {
grpc_error* error = GRPC_ERROR_NONE; grpc_error* error = GRPC_ERROR_NONE;
grpc_ares_hostbyname_request* hr = nullptr; grpc_ares_hostbyname_request* hr = nullptr;
grpc_ares_request* r = nullptr; grpc_ares_request* r = nullptr;
@ -402,20 +399,19 @@ static grpc_ares_request* grpc_dns_lookup_ares_impl(
} }
port = gpr_strdup(default_port); port = gpr_strdup(default_port);
} }
grpc_ares_ev_driver* ev_driver; grpc_ares_ev_driver* ev_driver;
error = grpc_ares_ev_driver_create(&ev_driver, interested_parties); error = grpc_ares_ev_driver_create_locked(&ev_driver, interested_parties,
combiner);
if (error != GRPC_ERROR_NONE) goto error_cleanup; if (error != GRPC_ERROR_NONE) goto error_cleanup;
r = static_cast<grpc_ares_request*>(gpr_zalloc(sizeof(grpc_ares_request))); r = static_cast<grpc_ares_request*>(gpr_zalloc(sizeof(grpc_ares_request)));
gpr_mu_init(&r->mu);
r->ev_driver = ev_driver; r->ev_driver = ev_driver;
r->on_done = on_done; r->on_done = on_done;
r->lb_addrs_out = addrs; r->lb_addrs_out = addrs;
r->service_config_json_out = service_config_json; r->service_config_json_out = service_config_json;
r->success = false; r->success = false;
r->error = GRPC_ERROR_NONE; r->error = GRPC_ERROR_NONE;
channel = grpc_ares_ev_driver_get_channel(r->ev_driver); channel = grpc_ares_ev_driver_get_channel_locked(r->ev_driver);
// If dns_server is specified, use it. // If dns_server is specified, use it.
if (dns_server != nullptr) { if (dns_server != nullptr) {
@ -457,32 +453,34 @@ static grpc_ares_request* grpc_dns_lookup_ares_impl(
} }
gpr_ref_init(&r->pending_queries, 1); gpr_ref_init(&r->pending_queries, 1);
if (grpc_ipv6_loopback_available()) { if (grpc_ipv6_loopback_available()) {
hr = create_hostbyname_request(r, host, strhtons(port), hr = create_hostbyname_request_locked(r, host, strhtons(port),
false /* is_balancer */); false /* is_balancer */);
ares_gethostbyname(*channel, hr->host, AF_INET6, on_hostbyname_done_cb, hr); ares_gethostbyname(*channel, hr->host, AF_INET6, on_hostbyname_done_locked,
hr);
} }
hr = create_hostbyname_request(r, host, strhtons(port), hr = create_hostbyname_request_locked(r, host, strhtons(port),
false /* is_balancer */); false /* is_balancer */);
ares_gethostbyname(*channel, hr->host, AF_INET, on_hostbyname_done_cb, hr); ares_gethostbyname(*channel, hr->host, AF_INET, on_hostbyname_done_locked,
hr);
if (check_grpclb) { if (check_grpclb) {
/* Query the SRV record */ /* Query the SRV record */
grpc_ares_request_ref(r); grpc_ares_request_ref_locked(r);
char* service_name; char* service_name;
gpr_asprintf(&service_name, "_grpclb._tcp.%s", host); gpr_asprintf(&service_name, "_grpclb._tcp.%s", host);
ares_query(*channel, service_name, ns_c_in, ns_t_srv, on_srv_query_done_cb, ares_query(*channel, service_name, ns_c_in, ns_t_srv,
r); on_srv_query_done_locked, r);
gpr_free(service_name); gpr_free(service_name);
} }
if (service_config_json != nullptr) { if (service_config_json != nullptr) {
grpc_ares_request_ref(r); grpc_ares_request_ref_locked(r);
char* config_name; char* config_name;
gpr_asprintf(&config_name, "_grpc_config.%s", host); gpr_asprintf(&config_name, "_grpc_config.%s", host);
ares_search(*channel, config_name, ns_c_in, ns_t_txt, on_txt_done_cb, r); ares_search(*channel, config_name, ns_c_in, ns_t_txt, on_txt_done_locked,
r);
gpr_free(config_name); gpr_free(config_name);
} }
/* TODO(zyc): Handle CNAME records here. */ grpc_ares_ev_driver_start_locked(r->ev_driver);
grpc_ares_ev_driver_start(r->ev_driver); grpc_ares_request_unref_locked(r);
grpc_ares_request_unref(r);
gpr_free(host); gpr_free(host);
gpr_free(port); gpr_free(port);
return r; return r;
@ -494,15 +492,15 @@ error_cleanup:
return nullptr; return nullptr;
} }
grpc_ares_request* (*grpc_dns_lookup_ares)( grpc_ares_request* (*grpc_dns_lookup_ares_locked)(
const char* dns_server, const char* name, const char* default_port, const char* dns_server, const char* name, const char* default_port,
grpc_pollset_set* interested_parties, grpc_closure* on_done, grpc_pollset_set* interested_parties, grpc_closure* on_done,
grpc_lb_addresses** addrs, bool check_grpclb, grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json,
char** service_config_json) = grpc_dns_lookup_ares_impl; grpc_combiner* combiner) = grpc_dns_lookup_ares_locked_impl;
void grpc_cancel_ares_request(grpc_ares_request* r) { void grpc_cancel_ares_request(grpc_ares_request* r) {
if (grpc_dns_lookup_ares == grpc_dns_lookup_ares_impl) { if (grpc_dns_lookup_ares_locked == grpc_dns_lookup_ares_locked_impl) {
grpc_ares_ev_driver_shutdown(r->ev_driver); grpc_ares_ev_driver_shutdown_locked(r->ev_driver);
} }
} }
@ -534,6 +532,8 @@ void grpc_ares_cleanup(void) {
*/ */
typedef struct grpc_resolve_address_ares_request { typedef struct grpc_resolve_address_ares_request {
/* combiner that queries and related callbacks run under */
grpc_combiner* combiner;
/** the pointer to receive the resolved addresses */ /** the pointer to receive the resolved addresses */
grpc_resolved_addresses** addrs_out; grpc_resolved_addresses** addrs_out;
/** currently resolving lb addresses */ /** currently resolving lb addresses */
@ -541,8 +541,14 @@ typedef struct grpc_resolve_address_ares_request {
/** closure to call when the resolve_address_ares request completes */ /** closure to call when the resolve_address_ares request completes */
grpc_closure* on_resolve_address_done; grpc_closure* on_resolve_address_done;
/** a closure wrapping on_dns_lookup_done_cb, which should be invoked when the /** a closure wrapping on_dns_lookup_done_cb, which should be invoked when the
grpc_dns_lookup_ares operation is done. */ grpc_dns_lookup_ares_locked operation is done. */
grpc_closure on_dns_lookup_done; grpc_closure on_dns_lookup_done;
/* target name */
const char* name;
/* default port to use if none is specified */
const char* default_port;
/* pollset_set to be driven by */
grpc_pollset_set* interested_parties;
} grpc_resolve_address_ares_request; } grpc_resolve_address_ares_request;
static void on_dns_lookup_done_cb(void* arg, grpc_error* error) { static void on_dns_lookup_done_cb(void* arg, grpc_error* error) {
@ -566,9 +572,20 @@ static void on_dns_lookup_done_cb(void* arg, grpc_error* error) {
} }
GRPC_CLOSURE_SCHED(r->on_resolve_address_done, GRPC_ERROR_REF(error)); GRPC_CLOSURE_SCHED(r->on_resolve_address_done, GRPC_ERROR_REF(error));
if (r->lb_addrs != nullptr) grpc_lb_addresses_destroy(r->lb_addrs); if (r->lb_addrs != nullptr) grpc_lb_addresses_destroy(r->lb_addrs);
GRPC_COMBINER_UNREF(r->combiner, "on_dns_lookup_done_cb");
gpr_free(r); gpr_free(r);
} }
static void grpc_resolve_address_invoke_dns_lookup_ares_locked(
void* arg, grpc_error* unused_error) {
grpc_resolve_address_ares_request* r =
static_cast<grpc_resolve_address_ares_request*>(arg);
grpc_dns_lookup_ares_locked(
nullptr /* dns_server */, r->name, r->default_port, r->interested_parties,
&r->on_dns_lookup_done, &r->lb_addrs, false /* check_grpclb */,
nullptr /* service_config_json */, r->combiner);
}
static void grpc_resolve_address_ares_impl(const char* name, static void grpc_resolve_address_ares_impl(const char* name,
const char* default_port, const char* default_port,
grpc_pollset_set* interested_parties, grpc_pollset_set* interested_parties,
@ -577,14 +594,18 @@ static void grpc_resolve_address_ares_impl(const char* name,
grpc_resolve_address_ares_request* r = grpc_resolve_address_ares_request* r =
static_cast<grpc_resolve_address_ares_request*>( static_cast<grpc_resolve_address_ares_request*>(
gpr_zalloc(sizeof(grpc_resolve_address_ares_request))); gpr_zalloc(sizeof(grpc_resolve_address_ares_request)));
r->combiner = grpc_combiner_create();
r->addrs_out = addrs; r->addrs_out = addrs;
r->on_resolve_address_done = on_done; r->on_resolve_address_done = on_done;
GRPC_CLOSURE_INIT(&r->on_dns_lookup_done, on_dns_lookup_done_cb, r, GRPC_CLOSURE_INIT(&r->on_dns_lookup_done, on_dns_lookup_done_cb, r,
grpc_schedule_on_exec_ctx); grpc_schedule_on_exec_ctx);
grpc_dns_lookup_ares(nullptr /* dns_server */, name, default_port, r->name = name;
interested_parties, &r->on_dns_lookup_done, &r->lb_addrs, r->default_port = default_port;
false /* check_grpclb */, r->interested_parties = interested_parties;
nullptr /* service_config_json */); GRPC_CLOSURE_SCHED(
GRPC_CLOSURE_CREATE(grpc_resolve_address_invoke_dns_lookup_ares_locked, r,
grpc_combiner_scheduler(r->combiner)),
GRPC_ERROR_NONE);
} }
void (*grpc_resolve_address_ares)( void (*grpc_resolve_address_ares)(

@ -48,11 +48,11 @@ extern void (*grpc_resolve_address_ares)(const char* name,
function. \a on_done may be called directly in this function without being function. \a on_done may be called directly in this function without being
scheduled with \a exec_ctx, so it must not try to acquire locks that are scheduled with \a exec_ctx, so it must not try to acquire locks that are
being held by the caller. */ being held by the caller. */
extern grpc_ares_request* (*grpc_dns_lookup_ares)( extern grpc_ares_request* (*grpc_dns_lookup_ares_locked)(
const char* dns_server, const char* name, const char* default_port, const char* dns_server, const char* name, const char* default_port,
grpc_pollset_set* interested_parties, grpc_closure* on_done, grpc_pollset_set* interested_parties, grpc_closure* on_done,
grpc_lb_addresses** addresses, bool check_grpclb, grpc_lb_addresses** addresses, bool check_grpclb,
char** service_config_json); char** service_config_json, grpc_combiner* combiner);
/* Cancel the pending grpc_ares_request \a request */ /* Cancel the pending grpc_ares_request \a request */
void grpc_cancel_ares_request(grpc_ares_request* request); void grpc_cancel_ares_request(grpc_ares_request* request);

@ -26,18 +26,19 @@ struct grpc_ares_request {
char val; char val;
}; };
static grpc_ares_request* grpc_dns_lookup_ares_impl( static grpc_ares_request* grpc_dns_lookup_ares_locked_impl(
const char* dns_server, const char* name, const char* default_port, const char* dns_server, const char* name, const char* default_port,
grpc_pollset_set* interested_parties, grpc_closure* on_done, grpc_pollset_set* interested_parties, grpc_closure* on_done,
grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json) { grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json,
grpc_combiner* combiner) {
return NULL; return NULL;
} }
grpc_ares_request* (*grpc_dns_lookup_ares)( grpc_ares_request* (*grpc_dns_lookup_ares_locked)(
const char* dns_server, const char* name, const char* default_port, const char* dns_server, const char* name, const char* default_port,
grpc_pollset_set* interested_parties, grpc_closure* on_done, grpc_pollset_set* interested_parties, grpc_closure* on_done,
grpc_lb_addresses** addrs, bool check_grpclb, grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json,
char** service_config_json) = grpc_dns_lookup_ares_impl; grpc_combiner* combiner) = grpc_dns_lookup_ares_locked_impl;
void grpc_cancel_ares_request(grpc_ares_request* r) {} void grpc_cancel_ares_request(grpc_ares_request* r) {}

@ -60,11 +60,11 @@ static void my_resolve_address(const char* addr, const char* default_port,
static grpc_address_resolver_vtable test_resolver = {my_resolve_address, static grpc_address_resolver_vtable test_resolver = {my_resolve_address,
nullptr}; nullptr};
static grpc_ares_request* my_dns_lookup_ares( static grpc_ares_request* my_dns_lookup_ares_locked(
const char* dns_server, const char* addr, const char* default_port, const char* dns_server, const char* addr, const char* default_port,
grpc_pollset_set* interested_parties, grpc_closure* on_done, grpc_pollset_set* interested_parties, grpc_closure* on_done,
grpc_lb_addresses** lb_addrs, bool check_grpclb, grpc_lb_addresses** lb_addrs, bool check_grpclb, char** service_config_json,
char** service_config_json) { grpc_combiner* combiner) {
gpr_mu_lock(&g_mu); gpr_mu_lock(&g_mu);
GPR_ASSERT(0 == strcmp("test", addr)); GPR_ASSERT(0 == strcmp("test", addr));
grpc_error* error = GRPC_ERROR_NONE; grpc_error* error = GRPC_ERROR_NONE;
@ -147,7 +147,7 @@ int main(int argc, char** argv) {
gpr_mu_init(&g_mu); gpr_mu_init(&g_mu);
g_combiner = grpc_combiner_create(); g_combiner = grpc_combiner_create();
grpc_set_resolver_impl(&test_resolver); grpc_set_resolver_impl(&test_resolver);
grpc_dns_lookup_ares = my_dns_lookup_ares; grpc_dns_lookup_ares_locked = my_dns_lookup_ares_locked;
grpc_channel_args* result = (grpc_channel_args*)1; grpc_channel_args* result = (grpc_channel_args*)1;
{ {

@ -33,10 +33,11 @@ static grpc_address_resolver_vtable* default_resolve_address;
static grpc_combiner* g_combiner; static grpc_combiner* g_combiner;
grpc_ares_request* (*g_default_dns_lookup_ares)( grpc_ares_request* (*g_default_dns_lookup_ares_locked)(
const char* dns_server, const char* name, const char* default_port, const char* dns_server, const char* name, const char* default_port,
grpc_pollset_set* interested_parties, grpc_closure* on_done, grpc_pollset_set* interested_parties, grpc_closure* on_done,
grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json); grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json,
grpc_combiner* combiner);
// Counter incremented by test_resolve_address_impl indicating the number of // Counter incremented by test_resolve_address_impl indicating the number of
// times a system-level resolution has happened. // times a system-level resolution has happened.
@ -72,13 +73,14 @@ static grpc_error* test_blocking_resolve_address_impl(
static grpc_address_resolver_vtable test_resolver = { static grpc_address_resolver_vtable test_resolver = {
test_resolve_address_impl, test_blocking_resolve_address_impl}; test_resolve_address_impl, test_blocking_resolve_address_impl};
grpc_ares_request* test_dns_lookup_ares( grpc_ares_request* test_dns_lookup_ares_locked(
const char* dns_server, const char* name, const char* default_port, const char* dns_server, const char* name, const char* default_port,
grpc_pollset_set* interested_parties, grpc_closure* on_done, grpc_pollset_set* interested_parties, grpc_closure* on_done,
grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json) { grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json,
grpc_ares_request* result = g_default_dns_lookup_ares( grpc_combiner* combiner) {
grpc_ares_request* result = g_default_dns_lookup_ares_locked(
dns_server, name, default_port, g_iomgr_args.pollset_set, on_done, addrs, dns_server, name, default_port, g_iomgr_args.pollset_set, on_done, addrs,
check_grpclb, service_config_json); check_grpclb, service_config_json, combiner);
++g_resolution_count; ++g_resolution_count;
return result; return result;
} }
@ -308,8 +310,8 @@ int main(int argc, char** argv) {
g_combiner = grpc_combiner_create(); g_combiner = grpc_combiner_create();
g_default_dns_lookup_ares = grpc_dns_lookup_ares; g_default_dns_lookup_ares_locked = grpc_dns_lookup_ares_locked;
grpc_dns_lookup_ares = test_dns_lookup_ares; grpc_dns_lookup_ares_locked = test_dns_lookup_ares_locked;
default_resolve_address = grpc_resolve_address_impl; default_resolve_address = grpc_resolve_address_impl;
grpc_set_resolver_impl(&test_resolver); grpc_set_resolver_impl(&test_resolver);

@ -374,13 +374,11 @@ void my_resolve_address(const char* addr, const char* default_port,
static grpc_address_resolver_vtable fuzzer_resolver = {my_resolve_address, static grpc_address_resolver_vtable fuzzer_resolver = {my_resolve_address,
nullptr}; nullptr};
grpc_ares_request* my_dns_lookup_ares(const char* dns_server, const char* addr, grpc_ares_request* my_dns_lookup_ares_locked(
const char* default_port, const char* dns_server, const char* addr, const char* default_port,
grpc_pollset_set* interested_parties, grpc_pollset_set* interested_parties, grpc_closure* on_done,
grpc_closure* on_done, grpc_lb_addresses** lb_addrs, bool check_grpclb, char** service_config_json,
grpc_lb_addresses** lb_addrs, grpc_combiner* combiner) {
bool check_grpclb,
char** service_config_json) {
addr_req* r = static_cast<addr_req*>(gpr_malloc(sizeof(*r))); addr_req* r = static_cast<addr_req*>(gpr_malloc(sizeof(*r)));
r->addr = gpr_strdup(addr); r->addr = gpr_strdup(addr);
r->on_done = on_done; r->on_done = on_done;
@ -706,7 +704,7 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
grpc_executor_set_threading(false); grpc_executor_set_threading(false);
} }
grpc_set_resolver_impl(&fuzzer_resolver); grpc_set_resolver_impl(&fuzzer_resolver);
grpc_dns_lookup_ares = my_dns_lookup_ares; grpc_dns_lookup_ares_locked = my_dns_lookup_ares_locked;
GPR_ASSERT(g_channel == nullptr); GPR_ASSERT(g_channel == nullptr);
GPR_ASSERT(g_server == nullptr); GPR_ASSERT(g_server == nullptr);

@ -44,11 +44,11 @@ static void* tag(intptr_t i) { return (void*)i; }
static gpr_mu g_mu; static gpr_mu g_mu;
static int g_resolve_port = -1; static int g_resolve_port = -1;
static grpc_ares_request* (*iomgr_dns_lookup_ares)( static grpc_ares_request* (*iomgr_dns_lookup_ares_locked)(
const char* dns_server, const char* addr, const char* default_port, const char* dns_server, const char* addr, const char* default_port,
grpc_pollset_set* interested_parties, grpc_closure* on_done, grpc_pollset_set* interested_parties, grpc_closure* on_done,
grpc_lb_addresses** addresses, bool check_grpclb, grpc_lb_addresses** addresses, bool check_grpclb,
char** service_config_json); char** service_config_json, grpc_combiner* combiner);
static void set_resolve_port(int port) { static void set_resolve_port(int port) {
gpr_mu_lock(&g_mu); gpr_mu_lock(&g_mu);
@ -98,15 +98,15 @@ static grpc_error* my_blocking_resolve_address(
static grpc_address_resolver_vtable test_resolver = { static grpc_address_resolver_vtable test_resolver = {
my_resolve_address, my_blocking_resolve_address}; my_resolve_address, my_blocking_resolve_address};
static grpc_ares_request* my_dns_lookup_ares( static grpc_ares_request* my_dns_lookup_ares_locked(
const char* dns_server, const char* addr, const char* default_port, const char* dns_server, const char* addr, const char* default_port,
grpc_pollset_set* interested_parties, grpc_closure* on_done, grpc_pollset_set* interested_parties, grpc_closure* on_done,
grpc_lb_addresses** lb_addrs, bool check_grpclb, grpc_lb_addresses** lb_addrs, bool check_grpclb, char** service_config_json,
char** service_config_json) { grpc_combiner* combiner) {
if (0 != strcmp(addr, "test")) { if (0 != strcmp(addr, "test")) {
return iomgr_dns_lookup_ares(dns_server, addr, default_port, return iomgr_dns_lookup_ares_locked(
interested_parties, on_done, lb_addrs, dns_server, addr, default_port, interested_parties, on_done, lb_addrs,
check_grpclb, service_config_json); check_grpclb, service_config_json, combiner);
} }
grpc_error* error = GRPC_ERROR_NONE; grpc_error* error = GRPC_ERROR_NONE;
@ -142,8 +142,8 @@ int main(int argc, char** argv) {
grpc_init(); grpc_init();
default_resolver = grpc_resolve_address_impl; default_resolver = grpc_resolve_address_impl;
grpc_set_resolver_impl(&test_resolver); grpc_set_resolver_impl(&test_resolver);
iomgr_dns_lookup_ares = grpc_dns_lookup_ares; iomgr_dns_lookup_ares_locked = grpc_dns_lookup_ares_locked;
grpc_dns_lookup_ares = my_dns_lookup_ares; grpc_dns_lookup_ares_locked = my_dns_lookup_ares_locked;
int was_cancelled1; int was_cancelled1;
int was_cancelled2; int was_cancelled2;

Loading…
Cancel
Save