From e8d830930182f52c5381117746395b303099b5c2 Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Thu, 27 Oct 2016 17:30:17 -0700 Subject: [PATCH] Refactor grpc_ares_ev_driver --- .../resolver/dns/c_ares/dns_resolver_ares.c | 31 +- .../resolver/dns/c_ares/grpc_ares_ev_driver.h | 19 +- .../dns/c_ares/grpc_ares_ev_driver_posix.c | 268 ++++++++++++------ .../resolver/dns/c_ares/grpc_ares_wrapper.c | 19 +- .../resolver/dns/c_ares/grpc_ares_wrapper.h | 3 +- test/core/end2end/fuzzers/api_fuzzer.c | 2 +- 6 files changed, 227 insertions(+), 115 deletions(-) diff --git a/src/core/ext/resolver/dns/c_ares/dns_resolver_ares.c b/src/core/ext/resolver/dns/c_ares/dns_resolver_ares.c index 7b46ba53660..4695ba061c8 100644 --- a/src/core/ext/resolver/dns/c_ares/dns_resolver_ares.c +++ b/src/core/ext/resolver/dns/c_ares/dns_resolver_ares.c @@ -41,6 +41,7 @@ #include "src/core/ext/client_channel/http_connect_handshaker.h" #include "src/core/ext/client_channel/lb_policy_registry.h" #include "src/core/ext/client_channel/resolver_registry.h" +#include "src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver.h" #include "src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/timer.h" @@ -84,6 +85,8 @@ typedef struct { /** currently resolving addresses */ grpc_resolved_addresses *addresses; + + grpc_ares_ev_driver *ev_driver; } ares_dns_resolver; static void dns_ares_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *r); @@ -198,18 +201,20 @@ static void dns_ares_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, grpc_closure *on_complete) { ares_dns_resolver *r = (ares_dns_resolver *)resolver; gpr_mu_lock(&r->mu); + gpr_log(GPR_DEBUG, "dns_ares_next is called."); GPR_ASSERT(!r->next_completion); r->next_completion = on_complete; r->target_result = target_result; if (r->resolved_version == 0 && !r->resolving) { gpr_backoff_reset(&r->backoff_state); - GRPC_RESOLVER_REF(&r->base, "dns-resolving"); - GPR_ASSERT(!r->resolving); - r->resolving = true; - r->addresses = NULL; - grpc_resolve_address_ares( - exec_ctx, r->name_to_resolve, r->default_port, r->base.pollset_set, - grpc_closure_create(dns_ares_on_resolved, r), &r->addresses); + dns_ares_start_resolving_locked(exec_ctx, r); + // GRPC_RESOLVER_REF(&r->base, "dns-resolving"); + // GPR_ASSERT(!r->resolving); + // r->resolving = true; + // r->addresses = NULL; + // grpc_resolve_address_ares( + // exec_ctx, r->name_to_resolve, r->default_port, r->ev_driver, + // grpc_closure_create(dns_ares_on_resolved, r), &r->addresses); } else { dns_ares_maybe_finish_next_locked(exec_ctx, r); } @@ -223,7 +228,7 @@ static void dns_ares_start_resolving_locked(grpc_exec_ctx *exec_ctx, r->resolving = true; r->addresses = NULL; grpc_resolve_address_ares( - exec_ctx, r->name_to_resolve, r->default_port, r->base.pollset_set, + exec_ctx, r->name_to_resolve, r->default_port, r->ev_driver, grpc_closure_create(dns_ares_on_resolved, r), &r->addresses); } @@ -242,7 +247,9 @@ static void dns_ares_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, } static void dns_ares_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) { + gpr_log(GPR_DEBUG, "dns_ares_destroy"); ares_dns_resolver *r = (ares_dns_resolver *)gr; + grpc_ares_ev_driver_destroy(exec_ctx, r->ev_driver); gpr_mu_destroy(&r->mu); grpc_ares_cleanup(); if (r->resolved_result) { @@ -280,8 +287,14 @@ static grpc_resolver *dns_ares_create(grpc_resolver_args *args, // Create resolver. r = gpr_malloc(sizeof(ares_dns_resolver)); memset(r, 0, sizeof(*r)); - gpr_mu_init(&r->mu); grpc_resolver_init(&r->base, &dns_ares_resolver_vtable); + error = grpc_ares_ev_driver_create(&r->ev_driver, r->base.pollset_set); + if (error != GRPC_ERROR_NONE) { + GRPC_LOG_IF_ERROR("grpc_ares_ev_driver_create() failed", error); + gpr_free(r); + return NULL; + } + gpr_mu_init(&r->mu); r->target_name = gpr_strdup(path); r->name_to_resolve = proxy_name == NULL ? gpr_strdup(path) : proxy_name; r->default_port = gpr_strdup(default_port); diff --git a/src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver.h b/src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver.h index f3eedb5f956..4d2b451321b 100644 --- a/src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver.h +++ b/src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver.h @@ -37,8 +37,6 @@ #include #ifndef GRPC_NATIVE_ADDRESS_RESOLVE -#include - #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/pollset_set.h" @@ -50,20 +48,21 @@ typedef struct grpc_ares_ev_driver grpc_ares_ev_driver; void grpc_ares_ev_driver_start(grpc_exec_ctx *exec_ctx, grpc_ares_ev_driver *ev_driver); -/* Returns the ares_channel owned by \a ev_driver. To bind a c-ares query to - \a ev_driver, use the ares_channel owned by \a ev_driver as the arg of the - query. */ -ares_channel *grpc_ares_ev_driver_get_channel(grpc_ares_ev_driver *ev_driver); +/* Returns a pointer of ares_channel. This channel is owned by \a ev_driver. To + bind a c-ares query to\a ev_driver, use this channel as the arg of the query. + */ +void *grpc_ares_ev_driver_get_channel(grpc_ares_ev_driver *ev_driver); /* Creates a new grpc_ares_ev_driver. Returns GRPC_ERROR_NONE if \a ev_driver is created successfully. */ grpc_error *grpc_ares_ev_driver_create(grpc_ares_ev_driver **ev_driver, grpc_pollset_set *pollset_set); -/* Destroys \a ev_driver asynchronously. If \a ev_driver is already working, - destroys it immediately; otherwise, destroys it once - grpc_ares_ev_driver_start() is called */ -void grpc_ares_ev_driver_destroy(grpc_ares_ev_driver *ev_driver); +/* Destroys \a ev_driver asynchronously. Pending lookups lookups made on this + ev_driver will be cancelled and their on done callbacks will be invoked with + a status of ARES_ECANCELLED. */ +void grpc_ares_ev_driver_destroy(grpc_exec_ctx *exec_ctx, + grpc_ares_ev_driver *ev_driver); #endif /* GRPC_NATIVE_ADDRESS_RESOLVE */ diff --git a/src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c b/src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c index 302a1037035..8266da2759e 100644 --- a/src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c +++ b/src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c @@ -45,6 +45,7 @@ #include #include #include +#include "src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/unix_sockets_posix.h" @@ -52,48 +53,82 @@ #include "src/core/lib/support/string.h" typedef struct fd_node { + /** the owner of this fd node */ + grpc_ares_ev_driver *ev_driver; + /** refcount of the node */ + gpr_refcount refs; + /** the grpc_fd owned by this fd node */ grpc_fd *grpc_fd; + /** a closure wrapping on_readable_cb, which should be invoked when the + grpc_fd in this node becomes readable. */ + grpc_closure read_closure; + /** a closure wrapping on_writable_cb, which should be invoked when the + grpc_fd in this node becomes writable. */ + grpc_closure write_closure; + /** next fd node in the list */ struct fd_node *next; + + /** mutex guarding the rest of the state */ + gpr_mu mu; + /** if the readable closure has been registered */ + bool readable_registered; + /** if the writable closure has been registered */ + bool writable_registered; } fd_node; struct grpc_ares_ev_driver { /** the ares_channel owned by this event driver */ ares_channel channel; - /** a closure wrapping the driver_cb, which should be invoked each time the ev - driver gets notified by fds. */ - grpc_closure driver_closure; /** pollset set for driving the IO events of the channel */ grpc_pollset_set *pollset_set; - /** has grpc_ares_ev_driver_destroy been called on this event driver? */ - bool closing; - /** an array of ares sockets that the ares channel owned by this event driver - is currently using */ - ares_socket_t socks[ARES_GETSOCK_MAXNUM]; - /** a bitmask that can tell if an ares socket in the socks array is readable - or/and writable */ - int socks_bitmask; - /** a list of grpc_fd that this event driver is currently using. */ - fd_node *fds; /** mutex guarding the rest of the state */ gpr_mu mu; + /** a list of grpc_fd that this event driver is currently using. */ + fd_node *fds; /** is this event driver currently working? */ bool working; }; -static void grpc_ares_notify_on_event(grpc_exec_ctx *exec_ctx, - grpc_ares_ev_driver *ev_driver); +static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx, + grpc_ares_ev_driver *ev_driver); + +static fd_node *fd_node_ref(fd_node *fdn) { + gpr_log(GPR_DEBUG, "ref %d", grpc_fd_wrapped_fd(fdn->grpc_fd)); + gpr_ref(&fdn->refs); + return fdn; +} + +static void fd_node_unref(grpc_exec_ctx *exec_ctx, fd_node *fdn) { + gpr_log(GPR_DEBUG, "unref %d", grpc_fd_wrapped_fd(fdn->grpc_fd)); + if (gpr_unref(&fdn->refs)) { + gpr_log(GPR_DEBUG, "delete fd: %d", grpc_fd_wrapped_fd(fdn->grpc_fd)); + GPR_ASSERT(!fdn->readable_registered); + GPR_ASSERT(!fdn->writable_registered); + gpr_mu_destroy(&fdn->mu); + grpc_pollset_set_del_fd(exec_ctx, fdn->ev_driver->pollset_set, + fdn->grpc_fd); + grpc_fd_shutdown(exec_ctx, fdn->grpc_fd); + grpc_fd_orphan(exec_ctx, fdn->grpc_fd, NULL, NULL, "c-ares query finished"); + gpr_free(fdn); + } +} grpc_error *grpc_ares_ev_driver_create(grpc_ares_ev_driver **ev_driver, grpc_pollset_set *pollset_set) { int status; + grpc_error *err = grpc_ares_init(); + if (err != GRPC_ERROR_NONE) { + return err; + } *ev_driver = gpr_malloc(sizeof(grpc_ares_ev_driver)); status = ares_init(&(*ev_driver)->channel); + gpr_log(GPR_DEBUG, "grpc_ares_ev_driver_create\n"); if (status != ARES_SUCCESS) { char *err_msg; gpr_asprintf(&err_msg, "Failed to init ares channel. C-ares error: %s", ares_strerror(status)); - grpc_error *err = GRPC_ERROR_CREATE(err_msg); + err = GRPC_ERROR_CREATE(err_msg); gpr_free(err_msg); gpr_free(*ev_driver); return err; @@ -101,17 +136,43 @@ grpc_error *grpc_ares_ev_driver_create(grpc_ares_ev_driver **ev_driver, gpr_mu_init(&(*ev_driver)->mu); (*ev_driver)->pollset_set = pollset_set; (*ev_driver)->fds = NULL; - (*ev_driver)->closing = false; (*ev_driver)->working = false; return GRPC_ERROR_NONE; } -void grpc_ares_ev_driver_destroy(grpc_ares_ev_driver *ev_driver) { - ev_driver->closing = true; +static void grpc_ares_ev_driver_cleanup(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + GPR_ASSERT(error == GRPC_ERROR_NONE); + grpc_ares_ev_driver *ev_driver = arg; + GPR_ASSERT(ev_driver->fds == NULL); + gpr_mu_lock(&ev_driver->mu); + gpr_mu_unlock(&ev_driver->mu); + gpr_mu_destroy(&ev_driver->mu); + ares_destroy(ev_driver->channel); + gpr_free(ev_driver); + grpc_ares_cleanup(); +} + +void grpc_ares_ev_driver_destroy(grpc_exec_ctx *exec_ctx, + grpc_ares_ev_driver *ev_driver) { + // Shutdowe all the working fds, invoke their resgistered on_readable_cb and + // on_writable_cb. + gpr_mu_lock(&ev_driver->mu); + fd_node *fdn; + for (fdn = ev_driver->fds; fdn; fdn = fdn->next) { + grpc_fd_shutdown(exec_ctx, fdn->grpc_fd); + fdn = fdn->next; + } + gpr_mu_unlock(&ev_driver->mu); + // Schedule the actual cleanup with exec_ctx, so that it happens after the + // fd shutdown process. + grpc_exec_ctx_sched( + exec_ctx, grpc_closure_create(grpc_ares_ev_driver_cleanup, ev_driver), + GRPC_ERROR_NONE, NULL); } // Search fd in the fd_node list head. This is an O(n) search, the max possible -// value of n is ARES_GETSOCK_MAXNUM (16). n is typically 1 - 3 in our tests. +// value of n is ARES_GETSOCK_MAXNUM (16). n is typically 1 - 2 in our tests. static fd_node *get_fd(fd_node **head, int fd) { fd_node dummy_head; fd_node *node; @@ -131,93 +192,132 @@ static fd_node *get_fd(fd_node **head, int fd) { return NULL; } -// Process each file descriptor that may wake this callback up. -static void driver_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - grpc_ares_ev_driver *d = arg; - size_t i; +static void on_readable_cb(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + fd_node *fdn = arg; + grpc_ares_ev_driver *ev_driver = fdn->ev_driver; + gpr_mu_lock(&fdn->mu); + fdn->readable_registered = false; + gpr_mu_unlock(&fdn->mu); + gpr_log(GPR_DEBUG, "readable on %d", grpc_fd_wrapped_fd(fdn->grpc_fd)); if (error == GRPC_ERROR_NONE) { - for (i = 0; i < ARES_GETSOCK_MAXNUM; i++) { - ares_socket_t read_fd = ARES_GETSOCK_READABLE(d->socks_bitmask, i) - ? d->socks[i] - : ARES_SOCKET_BAD; - ares_socket_t write_fd = ARES_GETSOCK_WRITABLE(d->socks_bitmask, i) - ? d->socks[i] - : ARES_SOCKET_BAD; - ares_process_fd(d->channel, read_fd, write_fd); - } + ares_process_fd(ev_driver->channel, grpc_fd_wrapped_fd(fdn->grpc_fd), + ARES_SOCKET_BAD); } else { - // error != GRPC_ERROR_NONE means the waiting timed out or the fd has been - // shutdown. In this case, the event driver cancels all the ongoing requests - // that are using its channel. The fds get cleaned up in the next - // grpc_ares_notify_on_event. - ares_cancel(d->channel); + // If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or + // timed out. The pending lookups made on this ev_driver will be cancelled + // by the following ares_canncel() and the on done callbacks will be invoked + // with a status of ARES_ECANCELLED. The remaining file descriptors in this + // ev_driver will be cleaned up in the follwing + // grpc_ares_notify_on_event_locked(). + ares_cancel(ev_driver->channel); } - grpc_ares_notify_on_event(exec_ctx, d); + fd_node_unref(exec_ctx, fdn); + gpr_mu_lock(&ev_driver->mu); + grpc_ares_notify_on_event_locked(exec_ctx, ev_driver); + gpr_mu_unlock(&ev_driver->mu); +} + +static void on_writable_cb(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + fd_node *fdn = arg; + grpc_ares_ev_driver *ev_driver = fdn->ev_driver; + gpr_mu_lock(&fdn->mu); + fdn->writable_registered = false; + gpr_mu_unlock(&fdn->mu); + + gpr_log(GPR_DEBUG, "writable on %d", grpc_fd_wrapped_fd(fdn->grpc_fd)); + if (error == GRPC_ERROR_NONE) { + ares_process_fd(ev_driver->channel, ARES_SOCKET_BAD, + grpc_fd_wrapped_fd(fdn->grpc_fd)); + } else { + // If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or + // timed out. The pending lookups made on this ev_driver will be cancelled + // by the following ares_canncel() and the on done callbacks will be invoked + // with a status of ARES_ECANCELLED. The remaining file descriptors in this + // ev_driver will be cleaned up in the follwing + // grpc_ares_notify_on_event_locked(). + ares_cancel(ev_driver->channel); + } + fd_node_unref(exec_ctx, fdn); + gpr_mu_lock(&ev_driver->mu); + grpc_ares_notify_on_event_locked(exec_ctx, ev_driver); + gpr_mu_unlock(&ev_driver->mu); } -ares_channel *grpc_ares_ev_driver_get_channel(grpc_ares_ev_driver *ev_driver) { +void *grpc_ares_ev_driver_get_channel(grpc_ares_ev_driver *ev_driver) { return &ev_driver->channel; } // Get the file descriptors used by the ev_driver's ares channel, register // driver_closure with these filedescriptors. -static void grpc_ares_notify_on_event(grpc_exec_ctx *exec_ctx, - grpc_ares_ev_driver *ev_driver) { - size_t i; +static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx, + grpc_ares_ev_driver *ev_driver) { fd_node *new_list = NULL; - if (!ev_driver->closing) { - ev_driver->socks_bitmask = - ares_getsock(ev_driver->channel, ev_driver->socks, ARES_GETSOCK_MAXNUM); - grpc_closure_init(&ev_driver->driver_closure, driver_cb, ev_driver); - for (i = 0; i < ARES_GETSOCK_MAXNUM; i++) { - if (ARES_GETSOCK_READABLE(ev_driver->socks_bitmask, i) || - ARES_GETSOCK_WRITABLE(ev_driver->socks_bitmask, i)) { - fd_node *fdn = get_fd(&ev_driver->fds, ev_driver->socks[i]); - if (fdn == NULL) { - char *fd_name; - gpr_asprintf(&fd_name, "ares_ev_driver-%" PRIuPTR, i); - fdn = gpr_malloc(sizeof(fd_node)); - fdn->grpc_fd = grpc_fd_create(ev_driver->socks[i], fd_name); - grpc_pollset_set_add_fd(exec_ctx, ev_driver->pollset_set, - fdn->grpc_fd); - gpr_free(fd_name); - } - fdn->next = new_list; - new_list = fdn; - - if (ARES_GETSOCK_READABLE(ev_driver->socks_bitmask, i)) { - grpc_fd_notify_on_read(exec_ctx, fdn->grpc_fd, - &ev_driver->driver_closure); - } - if (ARES_GETSOCK_WRITABLE(ev_driver->socks_bitmask, i)) { - grpc_fd_notify_on_write(exec_ctx, fdn->grpc_fd, - &ev_driver->driver_closure); - } + gpr_log(GPR_DEBUG, "notify_on_event\n"); + ares_socket_t socks[ARES_GETSOCK_MAXNUM]; + int socks_bitmask = + ares_getsock(ev_driver->channel, socks, ARES_GETSOCK_MAXNUM); + size_t i; + for (i = 0; i < ARES_GETSOCK_MAXNUM; i++) { + if (ARES_GETSOCK_READABLE(socks_bitmask, i) || + ARES_GETSOCK_WRITABLE(socks_bitmask, i)) { + fd_node *fdn = get_fd(&ev_driver->fds, socks[i]); + // Create a new fd_node if sock[i] is not in the fd_node list. + if (fdn == NULL) { + char *fd_name; + gpr_asprintf(&fd_name, "ares_ev_driver-%" PRIuPTR, i); + fdn = gpr_malloc(sizeof(fd_node)); + gpr_log(GPR_DEBUG, "new fd: %d", socks[i]); + fdn->grpc_fd = grpc_fd_create(socks[i], fd_name); + fdn->ev_driver = ev_driver; + fdn->readable_registered = false; + fdn->writable_registered = false; + gpr_mu_init(&fdn->mu); + gpr_ref_init(&fdn->refs, 1); + grpc_closure_init(&fdn->read_closure, on_readable_cb, fdn); + grpc_closure_init(&fdn->write_closure, on_writable_cb, fdn); + grpc_pollset_set_add_fd(exec_ctx, ev_driver->pollset_set, fdn->grpc_fd); + gpr_free(fd_name); + } + fdn->next = new_list; + new_list = fdn; + gpr_mu_lock(&fdn->mu); + // Register read_closure if the socket is readable and read_closure has + // not been registered with this socket. + if (ARES_GETSOCK_READABLE(socks_bitmask, i) && + !fdn->readable_registered) { + fd_node_ref(fdn); + gpr_log(GPR_DEBUG, "notify read on: %d", + grpc_fd_wrapped_fd(fdn->grpc_fd)); + grpc_fd_notify_on_read(exec_ctx, fdn->grpc_fd, &fdn->read_closure); + fdn->readable_registered = true; + } + // Register write_closure if the socket is writable and write_closure has + // not been registered with this socket. + if (ARES_GETSOCK_WRITABLE(socks_bitmask, i) && + !fdn->writable_registered) { + gpr_log(GPR_DEBUG, "notify write on: %d", + grpc_fd_wrapped_fd(fdn->grpc_fd)); + fd_node_ref(fdn); + grpc_fd_notify_on_write(exec_ctx, fdn->grpc_fd, &fdn->write_closure); + fdn->writable_registered = true; } + gpr_mu_unlock(&fdn->mu); } } - while (ev_driver->fds != NULL) { fd_node *cur = ev_driver->fds; ev_driver->fds = ev_driver->fds->next; - grpc_pollset_set_del_fd(exec_ctx, ev_driver->pollset_set, cur->grpc_fd); grpc_fd_shutdown(exec_ctx, cur->grpc_fd); - grpc_fd_orphan(exec_ctx, cur->grpc_fd, NULL, NULL, "c-ares query finished"); - gpr_free(cur); + fd_node_unref(exec_ctx, cur); } - ev_driver->fds = new_list; // If the ev driver has no working fd, all the tasks are done. if (!new_list) { - gpr_mu_lock(&ev_driver->mu); ev_driver->working = false; - gpr_mu_unlock(&ev_driver->mu); - } - - if (ev_driver->closing) { - ares_destroy(ev_driver->channel); - gpr_free(ev_driver); + gpr_log(GPR_DEBUG, "ev driver stop working"); } } @@ -229,8 +329,8 @@ void grpc_ares_ev_driver_start(grpc_exec_ctx *exec_ctx, return; } ev_driver->working = true; + grpc_ares_notify_on_event_locked(exec_ctx, ev_driver); gpr_mu_unlock(&ev_driver->mu); - grpc_ares_notify_on_event(exec_ctx, ev_driver); } #endif /* GPR_POSIX_SOCKET */ diff --git a/src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper.c b/src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper.c index 86037cd598f..408c5d89d74 100644 --- a/src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper.c +++ b/src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper.c @@ -92,7 +92,6 @@ typedef struct grpc_ares_request { static void do_basic_init(void) { gpr_mu_init(&g_init_mu); } static void destroy_request(grpc_ares_request *request) { - grpc_ares_ev_driver_destroy(request->ev_driver); gpr_free(request->host); gpr_free(request->port); gpr_free(request->default_port); @@ -112,6 +111,7 @@ static void on_done_cb(void *arg, int status, int timeouts, grpc_ares_request *r = (grpc_ares_request *)arg; grpc_resolved_addresses **addresses = r->addrs_out; if (status == ARES_SUCCESS) { + gpr_log(GPR_DEBUG, "on_done_cb success"); GRPC_ERROR_UNREF(r->error); r->error = GRPC_ERROR_NONE; r->success = true; @@ -175,7 +175,9 @@ static void on_done_cb(void *arg, int status, int timeouts, r->error = grpc_error_add_child(error, r->error); } } + gpr_log(GPR_DEBUG, "update pending queries: %d", r->pending_queries); if (--r->pending_queries == 0) { + gpr_log(GPR_DEBUG, "finish"); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_exec_ctx_sched(&exec_ctx, r->on_done, r->error, NULL); grpc_exec_ctx_flush(&exec_ctx); @@ -189,12 +191,14 @@ static void request_resolving_address(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_ares_request *r = (grpc_ares_request *)arg; grpc_ares_ev_driver *ev_driver = r->ev_driver; - ares_channel *channel = grpc_ares_ev_driver_get_channel(ev_driver); + ares_channel *channel = + (ares_channel *)grpc_ares_ev_driver_get_channel(ev_driver); r->pending_queries = 1; if (grpc_ipv6_loopback_available()) { ++r->pending_queries; ares_gethostbyname(*channel, r->host, AF_INET6, on_done_cb, r); } + gpr_log(GPR_DEBUG, "pending queries: %d", r->pending_queries); ares_gethostbyname(*channel, r->host, AF_INET, on_done_cb, r); grpc_ares_ev_driver_start(exec_ctx, ev_driver); } @@ -232,14 +236,13 @@ static int try_sockaddr_resolve(const char *name, const char *port, void grpc_resolve_address_ares_impl(grpc_exec_ctx *exec_ctx, const char *name, const char *default_port, - grpc_pollset_set *pollset_set, + grpc_ares_ev_driver *ev_driver, grpc_closure *on_done, grpc_resolved_addresses **addrs) { char *host; char *port; grpc_error *err; grpc_ares_request *r = NULL; - grpc_ares_ev_driver *ev_driver; if (grpc_customized_resolve_address(name, default_port, addrs, &err) != 0) { grpc_exec_ctx_sched(exec_ctx, on_done, err, NULL); @@ -268,11 +271,7 @@ void grpc_resolve_address_ares_impl(grpc_exec_ctx *exec_ctx, const char *name, if (try_sockaddr_resolve(host, port, addrs)) { grpc_exec_ctx_sched(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); } else { - err = grpc_ares_ev_driver_create(&ev_driver, pollset_set); - if (err != GRPC_ERROR_NONE) { - grpc_exec_ctx_sched(exec_ctx, on_done, err, NULL); - goto done; - } + gpr_log(GPR_DEBUG, "%s", host); r = gpr_malloc(sizeof(grpc_ares_request)); r->ev_driver = ev_driver; r->on_done = on_done; @@ -294,7 +293,7 @@ done: void (*grpc_resolve_address_ares)( grpc_exec_ctx *exec_ctx, const char *name, const char *default_port, - grpc_pollset_set *pollset_set, grpc_closure *on_done, + grpc_ares_ev_driver *ev_driver, grpc_closure *on_done, grpc_resolved_addresses **addrs) = grpc_resolve_address_ares_impl; grpc_error *grpc_ares_init(void) { diff --git a/src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper.h b/src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper.h index b7440b533d1..ac231cad265 100644 --- a/src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper.h +++ b/src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper.h @@ -36,6 +36,7 @@ #include +#include "src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/iomgr/polling_entity.h" @@ -47,7 +48,7 @@ extern void (*grpc_resolve_address_ares)(grpc_exec_ctx *exec_ctx, const char *addr, const char *default_port, - grpc_pollset_set *pollset_set, + grpc_ares_ev_driver *ev_driver, grpc_closure *on_done, grpc_resolved_addresses **addresses); diff --git a/test/core/end2end/fuzzers/api_fuzzer.c b/test/core/end2end/fuzzers/api_fuzzer.c index 303a15c7565..e9b3783e08b 100644 --- a/test/core/end2end/fuzzers/api_fuzzer.c +++ b/test/core/end2end/fuzzers/api_fuzzer.c @@ -228,7 +228,7 @@ void my_resolve_address(grpc_exec_ctx *exec_ctx, const char *addr, void my_resolve_address_async(grpc_exec_ctx *exec_ctx, const char *addr, const char *default_port, - grpc_pollset_set *pollset_set, + grpc_ares_ev_driver *ev_driver, grpc_closure *on_done, grpc_resolved_addresses **addresses) { my_resolve_address(exec_ctx, addr, default_port, on_done, addresses);