diff --git a/src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c b/src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c index 8117f23ae56..49b7485c947 100644 --- a/src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c +++ b/src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c @@ -53,8 +53,6 @@ typedef struct fd_node { /** the owner of this fd node */ grpc_ares_ev_driver *ev_driver; - /** refcount of the node */ - gpr_refcount refs; /** the grpc_fd owned by this fd node */ grpc_fd *grpc_fd; /** a closure wrapping on_readable_cb, which should be invoked when the @@ -79,6 +77,8 @@ struct grpc_ares_ev_driver { ares_channel channel; /** pollset set for driving the IO events of the channel */ grpc_pollset_set *pollset_set; + /** refcount of the event driver */ + gpr_refcount refs; /** mutex guarding the rest of the state */ gpr_mu mu; @@ -91,27 +91,36 @@ struct grpc_ares_ev_driver { static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx, grpc_ares_ev_driver *ev_driver); -static fd_node *fd_node_ref(fd_node *fdn) { - gpr_log(GPR_DEBUG, "ref %d", grpc_fd_wrapped_fd(fdn->grpc_fd)); - gpr_ref(&fdn->refs); - return fdn; +static grpc_ares_ev_driver *grpc_ares_ev_driver_ref( + grpc_ares_ev_driver *ev_driver) { + gpr_log(GPR_DEBUG, "Ref ev_driver %" PRIuPTR, (uintptr_t)ev_driver); + gpr_ref(&ev_driver->refs); + return ev_driver; } -static void fd_node_unref(grpc_exec_ctx *exec_ctx, fd_node *fdn) { - gpr_log(GPR_DEBUG, "unref %d", grpc_fd_wrapped_fd(fdn->grpc_fd)); - if (gpr_unref(&fdn->refs)) { - gpr_log(GPR_DEBUG, "delete fd: %d", grpc_fd_wrapped_fd(fdn->grpc_fd)); - GPR_ASSERT(!fdn->readable_registered); - GPR_ASSERT(!fdn->writable_registered); - gpr_mu_destroy(&fdn->mu); - grpc_pollset_set_del_fd(exec_ctx, fdn->ev_driver->pollset_set, - fdn->grpc_fd); - grpc_fd_shutdown(exec_ctx, fdn->grpc_fd); - grpc_fd_orphan(exec_ctx, fdn->grpc_fd, NULL, NULL, "c-ares query finished"); - gpr_free(fdn); +static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver *ev_driver) { + gpr_log(GPR_DEBUG, "Unref ev_driver %" PRIuPTR, (uintptr_t)ev_driver); + if (gpr_unref(&ev_driver->refs)) { + gpr_log(GPR_DEBUG, "destroy ev_driver %" PRIuPTR, (uintptr_t)ev_driver); + GPR_ASSERT(ev_driver->fds == NULL); + gpr_mu_destroy(&ev_driver->mu); + ares_destroy(ev_driver->channel); + gpr_free(ev_driver); + grpc_ares_cleanup(); } } +static void fd_node_destroy(grpc_exec_ctx *exec_ctx, fd_node *fdn) { + gpr_log(GPR_DEBUG, "delete fd: %d", grpc_fd_wrapped_fd(fdn->grpc_fd)); + GPR_ASSERT(!fdn->readable_registered); + GPR_ASSERT(!fdn->writable_registered); + gpr_mu_destroy(&fdn->mu); + grpc_pollset_set_del_fd(exec_ctx, fdn->ev_driver->pollset_set, fdn->grpc_fd); + grpc_fd_shutdown(exec_ctx, fdn->grpc_fd); + grpc_fd_orphan(exec_ctx, fdn->grpc_fd, NULL, NULL, "c-ares query finished"); + gpr_free(fdn); +} + grpc_error *grpc_ares_ev_driver_create(grpc_ares_ev_driver **ev_driver, grpc_pollset_set *pollset_set) { int status; @@ -121,7 +130,7 @@ grpc_error *grpc_ares_ev_driver_create(grpc_ares_ev_driver **ev_driver, } *ev_driver = gpr_malloc(sizeof(grpc_ares_ev_driver)); status = ares_init(&(*ev_driver)->channel); - gpr_log(GPR_DEBUG, "grpc_ares_ev_driver_create\n"); + gpr_log(GPR_DEBUG, "grpc_ares_ev_driver_create"); if (status != ARES_SUCCESS) { char *err_msg; gpr_asprintf(&err_msg, "Failed to init ares channel. C-ares error: %s", @@ -132,25 +141,13 @@ grpc_error *grpc_ares_ev_driver_create(grpc_ares_ev_driver **ev_driver, return err; } gpr_mu_init(&(*ev_driver)->mu); + gpr_ref_init(&(*ev_driver)->refs, 1); (*ev_driver)->pollset_set = pollset_set; (*ev_driver)->fds = NULL; (*ev_driver)->working = false; return GRPC_ERROR_NONE; } -static void grpc_ares_ev_driver_cleanup(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - GPR_ASSERT(error == GRPC_ERROR_NONE); - grpc_ares_ev_driver *ev_driver = arg; - GPR_ASSERT(ev_driver->fds == NULL); - gpr_mu_lock(&ev_driver->mu); - gpr_mu_unlock(&ev_driver->mu); - gpr_mu_destroy(&ev_driver->mu); - ares_destroy(ev_driver->channel); - gpr_free(ev_driver); - grpc_ares_cleanup(); -} - void grpc_ares_ev_driver_destroy(grpc_exec_ctx *exec_ctx, grpc_ares_ev_driver *ev_driver) { // Shutdown all the working fds, invoke their registered on_readable_cb and @@ -162,11 +159,7 @@ void grpc_ares_ev_driver_destroy(grpc_exec_ctx *exec_ctx, fdn = fdn->next; } gpr_mu_unlock(&ev_driver->mu); - // Schedule the actual cleanup with exec_ctx, so that it happens after the - // fd shutdown process. - grpc_exec_ctx_sched( - exec_ctx, grpc_closure_create(grpc_ares_ev_driver_cleanup, ev_driver), - GRPC_ERROR_NONE, NULL); + grpc_ares_ev_driver_unref(ev_driver); } // Search fd in the fd_node list head. This is an O(n) search, the max possible @@ -208,10 +201,10 @@ static void on_readable_cb(grpc_exec_ctx *exec_ctx, void *arg, // grpc_ares_notify_on_event_locked(). ares_cancel(ev_driver->channel); } - fd_node_unref(exec_ctx, fdn); gpr_mu_lock(&ev_driver->mu); grpc_ares_notify_on_event_locked(exec_ctx, ev_driver); gpr_mu_unlock(&ev_driver->mu); + grpc_ares_ev_driver_unref(ev_driver); } static void on_writable_cb(grpc_exec_ctx *exec_ctx, void *arg, @@ -235,10 +228,10 @@ static void on_writable_cb(grpc_exec_ctx *exec_ctx, void *arg, // grpc_ares_notify_on_event_locked(). ares_cancel(ev_driver->channel); } - fd_node_unref(exec_ctx, fdn); gpr_mu_lock(&ev_driver->mu); grpc_ares_notify_on_event_locked(exec_ctx, ev_driver); gpr_mu_unlock(&ev_driver->mu); + grpc_ares_ev_driver_unref(ev_driver); } void *grpc_ares_ev_driver_get_channel(grpc_ares_ev_driver *ev_driver) { @@ -268,7 +261,6 @@ static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx, fdn->readable_registered = false; fdn->writable_registered = false; gpr_mu_init(&fdn->mu); - gpr_ref_init(&fdn->refs, 1); grpc_closure_init(&fdn->read_closure, on_readable_cb, fdn); grpc_closure_init(&fdn->write_closure, on_writable_cb, fdn); grpc_pollset_set_add_fd(exec_ctx, ev_driver->pollset_set, fdn->grpc_fd); @@ -281,7 +273,7 @@ static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx, // not been registered with this socket. if (ARES_GETSOCK_READABLE(socks_bitmask, i) && !fdn->readable_registered) { - fd_node_ref(fdn); + grpc_ares_ev_driver_ref(ev_driver); gpr_log(GPR_DEBUG, "notify read on: %d", grpc_fd_wrapped_fd(fdn->grpc_fd)); grpc_fd_notify_on_read(exec_ctx, fdn->grpc_fd, &fdn->read_closure); @@ -293,7 +285,7 @@ static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx, !fdn->writable_registered) { gpr_log(GPR_DEBUG, "notify write on: %d", grpc_fd_wrapped_fd(fdn->grpc_fd)); - fd_node_ref(fdn); + grpc_ares_ev_driver_ref(ev_driver); grpc_fd_notify_on_write(exec_ctx, fdn->grpc_fd, &fdn->write_closure); fdn->writable_registered = true; } @@ -307,7 +299,7 @@ static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx, fd_node *cur = ev_driver->fds; ev_driver->fds = ev_driver->fds->next; grpc_fd_shutdown(exec_ctx, cur->grpc_fd); - fd_node_unref(exec_ctx, cur); + fd_node_destroy(exec_ctx, cur); } ev_driver->fds = new_list; // If the ev driver has no working fd, all the tasks are done. diff --git a/src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper.h b/src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper.h index 6f6caf1849f..465b9af7bd3 100644 --- a/src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper.h +++ b/src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper.h @@ -34,8 +34,6 @@ #ifndef GRPC_CORE_EXT_RESOLVER_DNS_C_ARES_GRPC_ARES_WRAPPER_H #define GRPC_CORE_EXT_RESOLVER_DNS_C_ARES_GRPC_ARES_WRAPPER_H -#include - #include "src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/iomgr.h" @@ -44,7 +42,7 @@ /* Asynchronously resolve addr. Use default_port if a port isn't designated in addr, otherwise use the port in addr. grpc_ares_init() must be called - at least once before this function . */ + at least once before this function. */ extern void (*grpc_resolve_address_ares)(grpc_exec_ctx *exec_ctx, const char *addr, const char *default_port,