diff --git a/src/core/ext/resolver/dns/c_ares/dns_resolver_ares.c b/src/core/ext/resolver/dns/c_ares/dns_resolver_ares.c index 64d4a5933ee..e5691942a40 100644 --- a/src/core/ext/resolver/dns/c_ares/dns_resolver_ares.c +++ b/src/core/ext/resolver/dns/c_ares/dns_resolver_ares.c @@ -41,7 +41,6 @@ #include "src/core/ext/client_channel/http_connect_handshaker.h" #include "src/core/ext/client_channel/lb_policy_registry.h" #include "src/core/ext/client_channel/resolver_registry.h" -#include "src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver.h" #include "src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/iomgr/combiner.h" @@ -65,8 +64,6 @@ typedef struct { char *default_port; /** channel args. */ grpc_channel_args *channel_args; - /** the event driver to drive the lookups */ - grpc_ares_ev_driver *ev_driver; /** Closures used by the combiner */ grpc_closure dns_ares_shutdown_locked; @@ -281,7 +278,7 @@ static void dns_ares_start_resolving_locked(grpc_exec_ctx *exec_ctx, r->resolving = true; r->addresses = NULL; grpc_resolve_address_ares( - exec_ctx, r->name_to_resolve, r->default_port, r->ev_driver, + exec_ctx, r->name_to_resolve, r->default_port, r->base.pollset_set, grpc_closure_create(dns_ares_on_resolved, r), &r->addresses); } @@ -301,7 +298,6 @@ static void dns_ares_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, static void dns_ares_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) { gpr_log(GPR_DEBUG, "dns_ares_destroy"); ares_dns_resolver *r = (ares_dns_resolver *)gr; - grpc_ares_ev_driver_destroy(exec_ctx, r->ev_driver); grpc_combiner_destroy(exec_ctx, r->combiner); grpc_ares_cleanup(); if (r->resolved_result != NULL) { @@ -340,12 +336,6 @@ static grpc_resolver *dns_ares_create(grpc_resolver_args *args, r = gpr_malloc(sizeof(ares_dns_resolver)); memset(r, 0, sizeof(*r)); grpc_resolver_init(&r->base, &dns_ares_resolver_vtable); - error = grpc_ares_ev_driver_create(&r->ev_driver, 
r->base.pollset_set); - if (error != GRPC_ERROR_NONE) { - GRPC_LOG_IF_ERROR("grpc_ares_ev_driver_create() failed", error); - gpr_free(r); - return NULL; - } r->combiner = grpc_combiner_create(NULL); r->name_to_resolve = proxy_name == NULL ? gpr_strdup(path) : proxy_name; r->default_port = gpr_strdup(default_port); diff --git a/src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver.h b/src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver.h index 09ca047c3bc..7165df0afca 100644 --- a/src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver.h +++ b/src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver.h @@ -58,7 +58,6 @@ grpc_error *grpc_ares_ev_driver_create(grpc_ares_ev_driver **ev_driver, /* Destroys \a ev_driver asynchronously. Pending lookups made on \a ev_driver will be cancelled and their on_done callbacks will be invoked with a status of ARES_ECANCELLED. */ -void grpc_ares_ev_driver_destroy(grpc_exec_ctx *exec_ctx, - grpc_ares_ev_driver *ev_driver); +void grpc_ares_ev_driver_destroy(grpc_ares_ev_driver *ev_driver); #endif /* GRPC_CORE_EXT_RESOLVER_DNS_C_ARES_GRPC_ARES_EV_DRIVER_H */ diff --git a/src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c b/src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c index 8d8f5f05805..a4733dcb4bc 100644 --- a/src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c +++ b/src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c @@ -86,6 +86,8 @@ struct grpc_ares_ev_driver { fd_node *fds; /** is this event driver currently working? 
*/ bool working; + /** is this event driver being shut down */ + bool shutting_down; }; static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx, @@ -145,19 +147,19 @@ grpc_error *grpc_ares_ev_driver_create(grpc_ares_ev_driver **ev_driver, (*ev_driver)->pollset_set = pollset_set; (*ev_driver)->fds = NULL; (*ev_driver)->working = false; + (*ev_driver)->shutting_down = false; return GRPC_ERROR_NONE; } -void grpc_ares_ev_driver_destroy(grpc_exec_ctx *exec_ctx, - grpc_ares_ev_driver *ev_driver) { - // Shutdown all the working fds, invoke their registered on_readable_cb and - // on_writable_cb. +void grpc_ares_ev_driver_destroy( // grpc_exec_ctx *exec_ctx, + grpc_ares_ev_driver *ev_driver) { + // It's not safe to shut down remaining fds here directly, because + // ares_host_callback does not provide an exec_ctx. We mark the event driver + // as being shut down. If the event driver is working, + // grpc_ares_notify_on_event_locked will shut down the fds; if it's not + // working, grpc_ares_ev_driver_unref will release it directly. gpr_mu_lock(&ev_driver->mu); - fd_node *fdn; - for (fdn = ev_driver->fds; fdn; fdn = fdn->next) { - grpc_fd_shutdown(exec_ctx, fdn->grpc_fd); - fdn = fdn->next; - } + ev_driver->shutting_down = true; gpr_mu_unlock(&ev_driver->mu); grpc_ares_ev_driver_unref(ev_driver); } @@ -243,53 +245,57 @@ void *grpc_ares_ev_driver_get_channel(grpc_ares_ev_driver *ev_driver) { static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx, grpc_ares_ev_driver *ev_driver) { fd_node *new_list = NULL; - ares_socket_t socks[ARES_GETSOCK_MAXNUM]; - int socks_bitmask = - ares_getsock(ev_driver->channel, socks, ARES_GETSOCK_MAXNUM); - for (size_t i = 0; i < ARES_GETSOCK_MAXNUM; i++) { - if (ARES_GETSOCK_READABLE(socks_bitmask, i) || - ARES_GETSOCK_WRITABLE(socks_bitmask, i)) { - fd_node *fdn = pop_fd_node(&ev_driver->fds, socks[i]); - // Create a new fd_node if sock[i] is not in the fd_node list. 
- if (fdn == NULL) { - char *fd_name; - gpr_asprintf(&fd_name, "ares_ev_driver-%" PRIuPTR, i); - fdn = gpr_malloc(sizeof(fd_node)); - gpr_log(GPR_DEBUG, "new fd: %d", socks[i]); - fdn->grpc_fd = grpc_fd_create(socks[i], fd_name); - fdn->ev_driver = ev_driver; - fdn->readable_registered = false; - fdn->writable_registered = false; - gpr_mu_init(&fdn->mu); - grpc_closure_init(&fdn->read_closure, on_readable_cb, fdn); - grpc_closure_init(&fdn->write_closure, on_writable_cb, fdn); - grpc_pollset_set_add_fd(exec_ctx, ev_driver->pollset_set, fdn->grpc_fd); - gpr_free(fd_name); - } - fdn->next = new_list; - new_list = fdn; - gpr_mu_lock(&fdn->mu); - // Register read_closure if the socket is readable and read_closure has - // not been registered with this socket. - if (ARES_GETSOCK_READABLE(socks_bitmask, i) && - !fdn->readable_registered) { - grpc_ares_ev_driver_ref(ev_driver); - gpr_log(GPR_DEBUG, "notify read on: %d", - grpc_fd_wrapped_fd(fdn->grpc_fd)); - grpc_fd_notify_on_read(exec_ctx, fdn->grpc_fd, &fdn->read_closure); - fdn->readable_registered = true; + if (!ev_driver->shutting_down) { + ares_socket_t socks[ARES_GETSOCK_MAXNUM]; + int socks_bitmask = + ares_getsock(ev_driver->channel, socks, ARES_GETSOCK_MAXNUM); + for (size_t i = 0; i < ARES_GETSOCK_MAXNUM; i++) { + if (ARES_GETSOCK_READABLE(socks_bitmask, i) || + ARES_GETSOCK_WRITABLE(socks_bitmask, i)) { + fd_node *fdn = pop_fd_node(&ev_driver->fds, socks[i]); + // Create a new fd_node if sock[i] is not in the fd_node list. 
+ if (fdn == NULL) { + char *fd_name; + gpr_asprintf(&fd_name, "ares_ev_driver-%" PRIuPTR, i); + fdn = gpr_malloc(sizeof(fd_node)); + gpr_log(GPR_DEBUG, "new fd: %d", socks[i]); + fdn->grpc_fd = grpc_fd_create(socks[i], fd_name); + fdn->ev_driver = ev_driver; + fdn->readable_registered = false; + fdn->writable_registered = false; + gpr_mu_init(&fdn->mu); + grpc_closure_init(&fdn->read_closure, on_readable_cb, fdn); + grpc_closure_init(&fdn->write_closure, on_writable_cb, fdn); + grpc_pollset_set_add_fd(exec_ctx, ev_driver->pollset_set, + fdn->grpc_fd); + gpr_free(fd_name); + } + fdn->next = new_list; + new_list = fdn; + gpr_mu_lock(&fdn->mu); + // Register read_closure if the socket is readable and read_closure has + // not been registered with this socket. + if (ARES_GETSOCK_READABLE(socks_bitmask, i) && + !fdn->readable_registered) { + grpc_ares_ev_driver_ref(ev_driver); + gpr_log(GPR_DEBUG, "notify read on: %d", + grpc_fd_wrapped_fd(fdn->grpc_fd)); + grpc_fd_notify_on_read(exec_ctx, fdn->grpc_fd, &fdn->read_closure); + fdn->readable_registered = true; + } + // Register write_closure if the socket is writable and write_closure + // has + // not been registered with this socket. + if (ARES_GETSOCK_WRITABLE(socks_bitmask, i) && + !fdn->writable_registered) { + gpr_log(GPR_DEBUG, "notify write on: %d", + grpc_fd_wrapped_fd(fdn->grpc_fd)); + grpc_ares_ev_driver_ref(ev_driver); + grpc_fd_notify_on_write(exec_ctx, fdn->grpc_fd, &fdn->write_closure); + fdn->writable_registered = true; + } + gpr_mu_unlock(&fdn->mu); } - // Register write_closure if the socket is writable and write_closure has - // not been registered with this socket. 
- if (ARES_GETSOCK_WRITABLE(socks_bitmask, i) && - !fdn->writable_registered) { - gpr_log(GPR_DEBUG, "notify write on: %d", - grpc_fd_wrapped_fd(fdn->grpc_fd)); - grpc_ares_ev_driver_ref(ev_driver); - grpc_fd_notify_on_write(exec_ctx, fdn->grpc_fd, &fdn->write_closure); - fdn->writable_registered = true; - } - gpr_mu_unlock(&fdn->mu); } } // Any remaining fds in ev_driver->fds were not returned by ares_getsock() and @@ -311,12 +317,14 @@ static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx, void grpc_ares_ev_driver_start(grpc_exec_ctx *exec_ctx, grpc_ares_ev_driver *ev_driver) { + grpc_ares_ev_driver_ref(ev_driver); gpr_mu_lock(&ev_driver->mu); if (!ev_driver->working) { ev_driver->working = true; grpc_ares_notify_on_event_locked(exec_ctx, ev_driver); } gpr_mu_unlock(&ev_driver->mu); + grpc_ares_ev_driver_unref(ev_driver); } #endif /* !GRPC_NATIVE_ADDRESS_RESOLVE && GRPC_POSIX_SOCKET */ diff --git a/src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper.c b/src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper.c index 9c83a8bf39d..f90222b2e6d 100644 --- a/src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper.c +++ b/src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper.c @@ -96,6 +96,32 @@ static uint16_t strhtons(const char *port) { return htons((unsigned short)atoi(port)); } +static void grpc_ares_request_unref(grpc_exec_ctx *exec_ctx, + grpc_ares_request *r) { + // If there are no pending queries, invoke on_done callback and destroy the + // request + if (gpr_unref(&r->pending_queries)) { + if (exec_ctx == NULL) { + // A new exec_ctx is created here, as the c-ares interface does not + // provide one in ares_host_callback. It's safe to schedule on_done with + // the newly created exec_ctx, since the caller has been warned not to + // acquire locks in on_done. ares_dns_resolver is using combiner to + // protect resources needed by on_done. 
+ grpc_exec_ctx new_exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_exec_ctx_sched(&new_exec_ctx, r->on_done, r->error, NULL); + grpc_exec_ctx_finish(&new_exec_ctx); + } else { + grpc_exec_ctx_sched(exec_ctx, r->on_done, r->error, NULL); + } + gpr_mu_destroy(&r->mu); + grpc_ares_ev_driver_destroy(r->ev_driver); + gpr_free(r->host); + gpr_free(r->port); + gpr_free(r->default_port); + gpr_free(r); + } +} + static void on_done_cb(void *arg, int status, int timeouts, struct hostent *hostent) { grpc_ares_request *r = (grpc_ares_request *)arg; @@ -165,28 +191,13 @@ static void on_done_cb(void *arg, int status, int timeouts, } } gpr_mu_unlock(&r->mu); - // If there are no pending queries, invoke on_done callback and destroy the - // request - if (gpr_unref(&r->pending_queries)) { - // A new exec_ctx is created here, as the c-ares interface does not provide - // one in this callback. It's safe to schedule on_done with the newly - // created exec_ctx, since the caller has been warned not to acquire locks - // in on_done. ares_dns_resolver is using combiner to protect resources - // needed by on_done. 
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_exec_ctx_sched(&exec_ctx, r->on_done, r->error, NULL); - grpc_exec_ctx_finish(&exec_ctx); - gpr_mu_destroy(&r->mu); - gpr_free(r->host); - gpr_free(r->port); - gpr_free(r->default_port); - gpr_free(r); - } + grpc_ares_request_unref(NULL, r); } void grpc_resolve_address_ares_impl(grpc_exec_ctx *exec_ctx, const char *name, const char *default_port, - grpc_ares_ev_driver *ev_driver, + // grpc_ares_ev_driver *ev_driver, + grpc_pollset_set *interested_parties, grpc_closure *on_done, grpc_resolved_addresses **addrs) { grpc_error *err; @@ -216,6 +227,13 @@ void grpc_resolve_address_ares_impl(grpc_exec_ctx *exec_ctx, const char *name, port = gpr_strdup(default_port); } + grpc_ares_ev_driver *ev_driver; + err = grpc_ares_ev_driver_create(&ev_driver, interested_parties); + if (err != GRPC_ERROR_NONE) { + GRPC_LOG_IF_ERROR("grpc_ares_ev_driver_create() failed", err); + goto error_cleanup; + } + grpc_ares_request *r = gpr_malloc(sizeof(grpc_ares_request)); gpr_mu_init(&r->mu); r->ev_driver = ev_driver; @@ -228,13 +246,16 @@ void grpc_resolve_address_ares_impl(grpc_exec_ctx *exec_ctx, const char *name, r->error = GRPC_ERROR_NONE; ares_channel *channel = (ares_channel *)grpc_ares_ev_driver_get_channel(r->ev_driver); - gpr_ref_init(&r->pending_queries, 1); + // An extra reference is put here to avoid destroying the request in + // on_done_cb before calling grpc_ares_ev_driver_start. 
+ gpr_ref_init(&r->pending_queries, 2); if (grpc_ipv6_loopback_available()) { gpr_ref(&r->pending_queries); ares_gethostbyname(*channel, r->host, AF_INET6, on_done_cb, r); } ares_gethostbyname(*channel, r->host, AF_INET, on_done_cb, r); grpc_ares_ev_driver_start(exec_ctx, ev_driver); + grpc_ares_request_unref(exec_ctx, r); return; error_cleanup: @@ -244,7 +265,8 @@ error_cleanup: void (*grpc_resolve_address_ares)( grpc_exec_ctx *exec_ctx, const char *name, const char *default_port, - grpc_ares_ev_driver *ev_driver, grpc_closure *on_done, + grpc_pollset_set *interested_parties, grpc_closure *on_done, + // grpc_ares_ev_driver *ev_driver, grpc_closure *on_done, grpc_resolved_addresses **addrs) = grpc_resolve_address_ares_impl; grpc_error *grpc_ares_init(void) { diff --git a/src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper.h b/src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper.h index ffeff86344b..3968a445ab6 100644 --- a/src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper.h +++ b/src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper.h @@ -43,12 +43,12 @@ /* Asynchronously resolve addr. Use default_port if a port isn't designated in addr, otherwise use the port in addr. grpc_ares_init() must be called at least once before this function. \a on_done may be called directly in this - function without being scheduled with \a exec_ctx, it should not try to - acquire locks that are being held by the caller. */ + function without being scheduled with \a exec_ctx, it must not try to acquire + locks that are being held by the caller. */ extern void (*grpc_resolve_address_ares)(grpc_exec_ctx *exec_ctx, const char *addr, const char *default_port, - grpc_ares_ev_driver *ev_driver, + grpc_pollset_set *interested_parties, grpc_closure *on_done, grpc_resolved_addresses **addresses);