Move ev_driver from ares_dns_resolver to grpc_ares_request

reviewable/pr7771/r7
Yuchen Zeng 8 years ago
parent 3483cf586e
commit 117a300027
  1. 12
      src/core/ext/resolver/dns/c_ares/dns_resolver_ares.c
  2. 3
      src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver.h
  3. 118
      src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c
  4. 62
      src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper.c
  5. 6
      src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper.h

@ -41,7 +41,6 @@
#include "src/core/ext/client_channel/http_connect_handshaker.h" #include "src/core/ext/client_channel/http_connect_handshaker.h"
#include "src/core/ext/client_channel/lb_policy_registry.h" #include "src/core/ext/client_channel/lb_policy_registry.h"
#include "src/core/ext/client_channel/resolver_registry.h" #include "src/core/ext/client_channel/resolver_registry.h"
#include "src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver.h"
#include "src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper.h" #include "src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper.h"
#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/combiner.h"
@ -65,8 +64,6 @@ typedef struct {
char *default_port; char *default_port;
/** channel args. */ /** channel args. */
grpc_channel_args *channel_args; grpc_channel_args *channel_args;
/** the event driver to drive the lookups */
grpc_ares_ev_driver *ev_driver;
/** Closures used by the combiner */ /** Closures used by the combiner */
grpc_closure dns_ares_shutdown_locked; grpc_closure dns_ares_shutdown_locked;
@ -281,7 +278,7 @@ static void dns_ares_start_resolving_locked(grpc_exec_ctx *exec_ctx,
r->resolving = true; r->resolving = true;
r->addresses = NULL; r->addresses = NULL;
grpc_resolve_address_ares( grpc_resolve_address_ares(
exec_ctx, r->name_to_resolve, r->default_port, r->ev_driver, exec_ctx, r->name_to_resolve, r->default_port, r->base.pollset_set,
grpc_closure_create(dns_ares_on_resolved, r), &r->addresses); grpc_closure_create(dns_ares_on_resolved, r), &r->addresses);
} }
@ -301,7 +298,6 @@ static void dns_ares_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
static void dns_ares_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) { static void dns_ares_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) {
gpr_log(GPR_DEBUG, "dns_ares_destroy"); gpr_log(GPR_DEBUG, "dns_ares_destroy");
ares_dns_resolver *r = (ares_dns_resolver *)gr; ares_dns_resolver *r = (ares_dns_resolver *)gr;
grpc_ares_ev_driver_destroy(exec_ctx, r->ev_driver);
grpc_combiner_destroy(exec_ctx, r->combiner); grpc_combiner_destroy(exec_ctx, r->combiner);
grpc_ares_cleanup(); grpc_ares_cleanup();
if (r->resolved_result != NULL) { if (r->resolved_result != NULL) {
@ -340,12 +336,6 @@ static grpc_resolver *dns_ares_create(grpc_resolver_args *args,
r = gpr_malloc(sizeof(ares_dns_resolver)); r = gpr_malloc(sizeof(ares_dns_resolver));
memset(r, 0, sizeof(*r)); memset(r, 0, sizeof(*r));
grpc_resolver_init(&r->base, &dns_ares_resolver_vtable); grpc_resolver_init(&r->base, &dns_ares_resolver_vtable);
error = grpc_ares_ev_driver_create(&r->ev_driver, r->base.pollset_set);
if (error != GRPC_ERROR_NONE) {
GRPC_LOG_IF_ERROR("grpc_ares_ev_driver_create() failed", error);
gpr_free(r);
return NULL;
}
r->combiner = grpc_combiner_create(NULL); r->combiner = grpc_combiner_create(NULL);
r->name_to_resolve = proxy_name == NULL ? gpr_strdup(path) : proxy_name; r->name_to_resolve = proxy_name == NULL ? gpr_strdup(path) : proxy_name;
r->default_port = gpr_strdup(default_port); r->default_port = gpr_strdup(default_port);

@ -58,7 +58,6 @@ grpc_error *grpc_ares_ev_driver_create(grpc_ares_ev_driver **ev_driver,
/* Destroys \a ev_driver asynchronously. Pending lookups made on \a ev_driver /* Destroys \a ev_driver asynchronously. Pending lookups made on \a ev_driver
will be cancelled and their on_done callbacks will be invoked with a status will be cancelled and their on_done callbacks will be invoked with a status
of ARES_ECANCELLED. */ of ARES_ECANCELLED. */
void grpc_ares_ev_driver_destroy(grpc_exec_ctx *exec_ctx, void grpc_ares_ev_driver_destroy(grpc_ares_ev_driver *ev_driver);
grpc_ares_ev_driver *ev_driver);
#endif /* GRPC_CORE_EXT_RESOLVER_DNS_C_ARES_GRPC_ARES_EV_DRIVER_H */ #endif /* GRPC_CORE_EXT_RESOLVER_DNS_C_ARES_GRPC_ARES_EV_DRIVER_H */

@ -86,6 +86,8 @@ struct grpc_ares_ev_driver {
fd_node *fds; fd_node *fds;
/** is this event driver currently working? */ /** is this event driver currently working? */
bool working; bool working;
/** is this event driver being shut down */
bool shutting_down;
}; };
static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx, static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx,
@ -145,19 +147,19 @@ grpc_error *grpc_ares_ev_driver_create(grpc_ares_ev_driver **ev_driver,
(*ev_driver)->pollset_set = pollset_set; (*ev_driver)->pollset_set = pollset_set;
(*ev_driver)->fds = NULL; (*ev_driver)->fds = NULL;
(*ev_driver)->working = false; (*ev_driver)->working = false;
(*ev_driver)->shutting_down = false;
return GRPC_ERROR_NONE; return GRPC_ERROR_NONE;
} }
void grpc_ares_ev_driver_destroy(grpc_exec_ctx *exec_ctx, void grpc_ares_ev_driver_destroy( // grpc_exec_ctx *exec_ctx,
grpc_ares_ev_driver *ev_driver) { grpc_ares_ev_driver *ev_driver) {
// Shutdown all the working fds, invoke their registered on_readable_cb and // It's not safe to shut down remaining fds here directly, becauses
// on_writable_cb. // ares_host_callback does not provide an exec_ctx. We mark the event driver
// as being shut down. If the event driver is working,
// grpc_ares_notify_on_event_locked will shut down the fds; if it's not
// working, grpc_ares_ev_driver_unref will release it directly.
gpr_mu_lock(&ev_driver->mu); gpr_mu_lock(&ev_driver->mu);
fd_node *fdn; ev_driver->shutting_down = true;
for (fdn = ev_driver->fds; fdn; fdn = fdn->next) {
grpc_fd_shutdown(exec_ctx, fdn->grpc_fd);
fdn = fdn->next;
}
gpr_mu_unlock(&ev_driver->mu); gpr_mu_unlock(&ev_driver->mu);
grpc_ares_ev_driver_unref(ev_driver); grpc_ares_ev_driver_unref(ev_driver);
} }
@ -243,53 +245,57 @@ void *grpc_ares_ev_driver_get_channel(grpc_ares_ev_driver *ev_driver) {
static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx, static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx,
grpc_ares_ev_driver *ev_driver) { grpc_ares_ev_driver *ev_driver) {
fd_node *new_list = NULL; fd_node *new_list = NULL;
ares_socket_t socks[ARES_GETSOCK_MAXNUM]; if (!ev_driver->shutting_down) {
int socks_bitmask = ares_socket_t socks[ARES_GETSOCK_MAXNUM];
ares_getsock(ev_driver->channel, socks, ARES_GETSOCK_MAXNUM); int socks_bitmask =
for (size_t i = 0; i < ARES_GETSOCK_MAXNUM; i++) { ares_getsock(ev_driver->channel, socks, ARES_GETSOCK_MAXNUM);
if (ARES_GETSOCK_READABLE(socks_bitmask, i) || for (size_t i = 0; i < ARES_GETSOCK_MAXNUM; i++) {
ARES_GETSOCK_WRITABLE(socks_bitmask, i)) { if (ARES_GETSOCK_READABLE(socks_bitmask, i) ||
fd_node *fdn = pop_fd_node(&ev_driver->fds, socks[i]); ARES_GETSOCK_WRITABLE(socks_bitmask, i)) {
// Create a new fd_node if sock[i] is not in the fd_node list. fd_node *fdn = pop_fd_node(&ev_driver->fds, socks[i]);
if (fdn == NULL) { // Create a new fd_node if sock[i] is not in the fd_node list.
char *fd_name; if (fdn == NULL) {
gpr_asprintf(&fd_name, "ares_ev_driver-%" PRIuPTR, i); char *fd_name;
fdn = gpr_malloc(sizeof(fd_node)); gpr_asprintf(&fd_name, "ares_ev_driver-%" PRIuPTR, i);
gpr_log(GPR_DEBUG, "new fd: %d", socks[i]); fdn = gpr_malloc(sizeof(fd_node));
fdn->grpc_fd = grpc_fd_create(socks[i], fd_name); gpr_log(GPR_DEBUG, "new fd: %d", socks[i]);
fdn->ev_driver = ev_driver; fdn->grpc_fd = grpc_fd_create(socks[i], fd_name);
fdn->readable_registered = false; fdn->ev_driver = ev_driver;
fdn->writable_registered = false; fdn->readable_registered = false;
gpr_mu_init(&fdn->mu); fdn->writable_registered = false;
grpc_closure_init(&fdn->read_closure, on_readable_cb, fdn); gpr_mu_init(&fdn->mu);
grpc_closure_init(&fdn->write_closure, on_writable_cb, fdn); grpc_closure_init(&fdn->read_closure, on_readable_cb, fdn);
grpc_pollset_set_add_fd(exec_ctx, ev_driver->pollset_set, fdn->grpc_fd); grpc_closure_init(&fdn->write_closure, on_writable_cb, fdn);
gpr_free(fd_name); grpc_pollset_set_add_fd(exec_ctx, ev_driver->pollset_set,
} fdn->grpc_fd);
fdn->next = new_list; gpr_free(fd_name);
new_list = fdn; }
gpr_mu_lock(&fdn->mu); fdn->next = new_list;
// Register read_closure if the socket is readable and read_closure has new_list = fdn;
// not been registered with this socket. gpr_mu_lock(&fdn->mu);
if (ARES_GETSOCK_READABLE(socks_bitmask, i) && // Register read_closure if the socket is readable and read_closure has
!fdn->readable_registered) { // not been registered with this socket.
grpc_ares_ev_driver_ref(ev_driver); if (ARES_GETSOCK_READABLE(socks_bitmask, i) &&
gpr_log(GPR_DEBUG, "notify read on: %d", !fdn->readable_registered) {
grpc_fd_wrapped_fd(fdn->grpc_fd)); grpc_ares_ev_driver_ref(ev_driver);
grpc_fd_notify_on_read(exec_ctx, fdn->grpc_fd, &fdn->read_closure); gpr_log(GPR_DEBUG, "notify read on: %d",
fdn->readable_registered = true; grpc_fd_wrapped_fd(fdn->grpc_fd));
grpc_fd_notify_on_read(exec_ctx, fdn->grpc_fd, &fdn->read_closure);
fdn->readable_registered = true;
}
// Register write_closure if the socket is writable and write_closure
// has
// not been registered with this socket.
if (ARES_GETSOCK_WRITABLE(socks_bitmask, i) &&
!fdn->writable_registered) {
gpr_log(GPR_DEBUG, "notify write on: %d",
grpc_fd_wrapped_fd(fdn->grpc_fd));
grpc_ares_ev_driver_ref(ev_driver);
grpc_fd_notify_on_write(exec_ctx, fdn->grpc_fd, &fdn->write_closure);
fdn->writable_registered = true;
}
gpr_mu_unlock(&fdn->mu);
} }
// Register write_closure if the socket is writable and write_closure has
// not been registered with this socket.
if (ARES_GETSOCK_WRITABLE(socks_bitmask, i) &&
!fdn->writable_registered) {
gpr_log(GPR_DEBUG, "notify write on: %d",
grpc_fd_wrapped_fd(fdn->grpc_fd));
grpc_ares_ev_driver_ref(ev_driver);
grpc_fd_notify_on_write(exec_ctx, fdn->grpc_fd, &fdn->write_closure);
fdn->writable_registered = true;
}
gpr_mu_unlock(&fdn->mu);
} }
} }
// Any remaining fds in ev_driver->fds were not returned by ares_getsock() and // Any remaining fds in ev_driver->fds were not returned by ares_getsock() and
@ -311,12 +317,14 @@ static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx,
void grpc_ares_ev_driver_start(grpc_exec_ctx *exec_ctx, void grpc_ares_ev_driver_start(grpc_exec_ctx *exec_ctx,
grpc_ares_ev_driver *ev_driver) { grpc_ares_ev_driver *ev_driver) {
grpc_ares_ev_driver_ref(ev_driver);
gpr_mu_lock(&ev_driver->mu); gpr_mu_lock(&ev_driver->mu);
if (!ev_driver->working) { if (!ev_driver->working) {
ev_driver->working = true; ev_driver->working = true;
grpc_ares_notify_on_event_locked(exec_ctx, ev_driver); grpc_ares_notify_on_event_locked(exec_ctx, ev_driver);
} }
gpr_mu_unlock(&ev_driver->mu); gpr_mu_unlock(&ev_driver->mu);
grpc_ares_ev_driver_unref(ev_driver);
} }
#endif /* !GRPC_NATIVE_ADDRESS_RESOLVE && GRPC_POSIX_SOCKET */ #endif /* !GRPC_NATIVE_ADDRESS_RESOLVE && GRPC_POSIX_SOCKET */

@ -96,6 +96,32 @@ static uint16_t strhtons(const char *port) {
return htons((unsigned short)atoi(port)); return htons((unsigned short)atoi(port));
} }
static void grpc_ares_request_unref(grpc_exec_ctx *exec_ctx,
grpc_ares_request *r) {
// If there are no pending queries, invoke on_done callback and destroy the
// request
if (gpr_unref(&r->pending_queries)) {
if (exec_ctx == NULL) {
// A new exec_ctx is created here, as the c-ares interface does not
// provide one in ares_host_callback. It's safe to schedule on_done with
// the newly created exec_ctx, since the caller has been warned not to
// acquire locks in on_done. ares_dns_resolver is using combiner to
// protect resources needed by on_done.
grpc_exec_ctx new_exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_exec_ctx_sched(&new_exec_ctx, r->on_done, r->error, NULL);
grpc_exec_ctx_finish(&new_exec_ctx);
} else {
grpc_exec_ctx_sched(exec_ctx, r->on_done, r->error, NULL);
}
gpr_mu_destroy(&r->mu);
grpc_ares_ev_driver_destroy(r->ev_driver);
gpr_free(r->host);
gpr_free(r->port);
gpr_free(r->default_port);
gpr_free(r);
}
}
static void on_done_cb(void *arg, int status, int timeouts, static void on_done_cb(void *arg, int status, int timeouts,
struct hostent *hostent) { struct hostent *hostent) {
grpc_ares_request *r = (grpc_ares_request *)arg; grpc_ares_request *r = (grpc_ares_request *)arg;
@ -165,28 +191,13 @@ static void on_done_cb(void *arg, int status, int timeouts,
} }
} }
gpr_mu_unlock(&r->mu); gpr_mu_unlock(&r->mu);
// If there are no pending queries, invoke on_done callback and destroy the grpc_ares_request_unref(NULL, r);
// request
if (gpr_unref(&r->pending_queries)) {
// A new exec_ctx is created here, as the c-ares interface does not provide
// one in this callback. It's safe to schedule on_done with the newly
// created exec_ctx, since the caller has been warned not to acquire locks
// in on_done. ares_dns_resolver is using combiner to protect resources
// needed by on_done.
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_exec_ctx_sched(&exec_ctx, r->on_done, r->error, NULL);
grpc_exec_ctx_finish(&exec_ctx);
gpr_mu_destroy(&r->mu);
gpr_free(r->host);
gpr_free(r->port);
gpr_free(r->default_port);
gpr_free(r);
}
} }
void grpc_resolve_address_ares_impl(grpc_exec_ctx *exec_ctx, const char *name, void grpc_resolve_address_ares_impl(grpc_exec_ctx *exec_ctx, const char *name,
const char *default_port, const char *default_port,
grpc_ares_ev_driver *ev_driver, // grpc_ares_ev_driver *ev_driver,
grpc_pollset_set *interested_parties,
grpc_closure *on_done, grpc_closure *on_done,
grpc_resolved_addresses **addrs) { grpc_resolved_addresses **addrs) {
grpc_error *err; grpc_error *err;
@ -216,6 +227,13 @@ void grpc_resolve_address_ares_impl(grpc_exec_ctx *exec_ctx, const char *name,
port = gpr_strdup(default_port); port = gpr_strdup(default_port);
} }
grpc_ares_ev_driver *ev_driver;
err = grpc_ares_ev_driver_create(&ev_driver, interested_parties);
if (err != GRPC_ERROR_NONE) {
GRPC_LOG_IF_ERROR("grpc_ares_ev_driver_create() failed", err);
goto error_cleanup;
}
grpc_ares_request *r = gpr_malloc(sizeof(grpc_ares_request)); grpc_ares_request *r = gpr_malloc(sizeof(grpc_ares_request));
gpr_mu_init(&r->mu); gpr_mu_init(&r->mu);
r->ev_driver = ev_driver; r->ev_driver = ev_driver;
@ -228,13 +246,16 @@ void grpc_resolve_address_ares_impl(grpc_exec_ctx *exec_ctx, const char *name,
r->error = GRPC_ERROR_NONE; r->error = GRPC_ERROR_NONE;
ares_channel *channel = ares_channel *channel =
(ares_channel *)grpc_ares_ev_driver_get_channel(r->ev_driver); (ares_channel *)grpc_ares_ev_driver_get_channel(r->ev_driver);
gpr_ref_init(&r->pending_queries, 1); // An extra reference is put here to avoid destroying the request in
// on_done_cb before calling grpc_ares_ev_driver_start.
gpr_ref_init(&r->pending_queries, 2);
if (grpc_ipv6_loopback_available()) { if (grpc_ipv6_loopback_available()) {
gpr_ref(&r->pending_queries); gpr_ref(&r->pending_queries);
ares_gethostbyname(*channel, r->host, AF_INET6, on_done_cb, r); ares_gethostbyname(*channel, r->host, AF_INET6, on_done_cb, r);
} }
ares_gethostbyname(*channel, r->host, AF_INET, on_done_cb, r); ares_gethostbyname(*channel, r->host, AF_INET, on_done_cb, r);
grpc_ares_ev_driver_start(exec_ctx, ev_driver); grpc_ares_ev_driver_start(exec_ctx, ev_driver);
grpc_ares_request_unref(exec_ctx, r);
return; return;
error_cleanup: error_cleanup:
@ -244,7 +265,8 @@ error_cleanup:
void (*grpc_resolve_address_ares)( void (*grpc_resolve_address_ares)(
grpc_exec_ctx *exec_ctx, const char *name, const char *default_port, grpc_exec_ctx *exec_ctx, const char *name, const char *default_port,
grpc_ares_ev_driver *ev_driver, grpc_closure *on_done, grpc_pollset_set *interested_parties, grpc_closure *on_done,
// grpc_ares_ev_driver *ev_driver, grpc_closure *on_done,
grpc_resolved_addresses **addrs) = grpc_resolve_address_ares_impl; grpc_resolved_addresses **addrs) = grpc_resolve_address_ares_impl;
grpc_error *grpc_ares_init(void) { grpc_error *grpc_ares_init(void) {

@ -43,12 +43,12 @@
/* Asynchronously resolve addr. Use default_port if a port isn't designated in /* Asynchronously resolve addr. Use default_port if a port isn't designated in
addr, otherwise use the port in addr. grpc_ares_init() must be called at addr, otherwise use the port in addr. grpc_ares_init() must be called at
least once before this function. \a on_done may be called directly in this least once before this function. \a on_done may be called directly in this
function without being scheduled with \a exec_ctx, it should not try to function without being scheduled with \a exec_ctx, it must not try to acquire
acquire locks that are being held by the caller. */ locks that are being held by the caller. */
extern void (*grpc_resolve_address_ares)(grpc_exec_ctx *exec_ctx, extern void (*grpc_resolve_address_ares)(grpc_exec_ctx *exec_ctx,
const char *addr, const char *addr,
const char *default_port, const char *default_port,
grpc_ares_ev_driver *ev_driver, grpc_pollset_set *interested_parties,
grpc_closure *on_done, grpc_closure *on_done,
grpc_resolved_addresses **addresses); grpc_resolved_addresses **addresses);

Loading…
Cancel
Save