Address review comments

reviewable/pr7771/r5
Yuchen Zeng 8 years ago
parent bd363544fd
commit 3ae2663b95
  1. 12
      src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver.h
  2. 40
      src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c
  3. 119
      src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper.c
  4. 3
      src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper.h

@ -45,9 +45,9 @@ typedef struct grpc_ares_ev_driver grpc_ares_ev_driver;
void grpc_ares_ev_driver_start(grpc_exec_ctx *exec_ctx,
grpc_ares_ev_driver *ev_driver);
/* Returns a pointer of ares_channel. This channel is owned by \a ev_driver. To
bind a c-ares query to\a ev_driver, use this channel as the arg of the query.
*/
/* Returns the ares_channel owned by \a ev_driver. To bind a c-ares query to
\a ev_driver, use the ares_channel owned by \a ev_driver as the arg of the
query. */
void *grpc_ares_ev_driver_get_channel(grpc_ares_ev_driver *ev_driver);
/* Creates a new grpc_ares_ev_driver. Returns GRPC_ERROR_NONE if \a ev_driver is
@ -55,9 +55,9 @@ void *grpc_ares_ev_driver_get_channel(grpc_ares_ev_driver *ev_driver);
grpc_error *grpc_ares_ev_driver_create(grpc_ares_ev_driver **ev_driver,
grpc_pollset_set *pollset_set);
/* Destroys \a ev_driver asynchronously. Pending lookups lookups made on this
ev_driver will be cancelled and their on done callbacks will be invoked with
a status of ARES_ECANCELLED. */
/* Destroys \a ev_driver asynchronously. Pending lookups made on \a ev_driver
will be cancelled and their on_done callbacks will be invoked with a status
of ARES_ECANCELLED. */
void grpc_ares_ev_driver_destroy(grpc_exec_ctx *exec_ctx,
grpc_ares_ev_driver *ev_driver);

@ -32,8 +32,7 @@
*/
#include <grpc/support/port_platform.h>
#include "src/core/lib/iomgr/port.h"
#ifndef GRPC_NATIVE_ADDRESS_RESOLVE
#ifdef GRPC_POSIX_SOCKET
#if !defined(GRPC_NATIVE_ADDRESS_RESOLVE) && defined(GRPC_POSIX_SOCKET)
#include "src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver.h"
@ -154,7 +153,7 @@ static void grpc_ares_ev_driver_cleanup(grpc_exec_ctx *exec_ctx, void *arg,
void grpc_ares_ev_driver_destroy(grpc_exec_ctx *exec_ctx,
grpc_ares_ev_driver *ev_driver) {
// Shutdowe all the working fds, invoke their resgistered on_readable_cb and
// Shutdown all the working fds, invoke their registered on_readable_cb and
// on_writable_cb.
gpr_mu_lock(&ev_driver->mu);
fd_node *fdn;
@ -172,16 +171,13 @@ void grpc_ares_ev_driver_destroy(grpc_exec_ctx *exec_ctx,
// Search fd in the fd_node list head. This is an O(n) search, the max possible
// value of n is ARES_GETSOCK_MAXNUM (16). n is typically 1 - 2 in our tests.
static fd_node *get_fd(fd_node **head, int fd) {
static fd_node *pop_fd_node(fd_node **head, int fd) {
fd_node dummy_head;
fd_node *node;
fd_node *ret;
dummy_head.next = *head;
node = &dummy_head;
fd_node *node = &dummy_head;
while (node->next != NULL) {
if (grpc_fd_wrapped_fd(node->next->grpc_fd) == fd) {
ret = node->next;
fd_node *ret = node->next;
node->next = node->next->next;
*head = dummy_head.next;
return ret;
@ -206,7 +202,7 @@ static void on_readable_cb(grpc_exec_ctx *exec_ctx, void *arg,
} else {
// If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or
// timed out. The pending lookups made on this ev_driver will be cancelled
// by the following ares_canncel() and the on done callbacks will be invoked
// by the following ares_cancel() and the on_done callbacks will be invoked
// with a status of ARES_ECANCELLED. The remaining file descriptors in this
// ev_driver will be cleaned up in the follwing
// grpc_ares_notify_on_event_locked().
@ -233,7 +229,7 @@ static void on_writable_cb(grpc_exec_ctx *exec_ctx, void *arg,
} else {
// If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or
// timed out. The pending lookups made on this ev_driver will be cancelled
// by the following ares_canncel() and the on done callbacks will be invoked
// by the following ares_cancel() and the on_done callbacks will be invoked
// with a status of ARES_ECANCELLED. The remaining file descriptors in this
// ev_driver will be cleaned up in the follwing
// grpc_ares_notify_on_event_locked().
@ -254,15 +250,13 @@ void *grpc_ares_ev_driver_get_channel(grpc_ares_ev_driver *ev_driver) {
static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx,
grpc_ares_ev_driver *ev_driver) {
fd_node *new_list = NULL;
gpr_log(GPR_DEBUG, "notify_on_event\n");
ares_socket_t socks[ARES_GETSOCK_MAXNUM];
int socks_bitmask =
ares_getsock(ev_driver->channel, socks, ARES_GETSOCK_MAXNUM);
size_t i;
for (i = 0; i < ARES_GETSOCK_MAXNUM; i++) {
for (size_t i = 0; i < ARES_GETSOCK_MAXNUM; i++) {
if (ARES_GETSOCK_READABLE(socks_bitmask, i) ||
ARES_GETSOCK_WRITABLE(socks_bitmask, i)) {
fd_node *fdn = get_fd(&ev_driver->fds, socks[i]);
fd_node *fdn = pop_fd_node(&ev_driver->fds, socks[i]);
// Create a new fd_node if sock[i] is not in the fd_node list.
if (fdn == NULL) {
char *fd_name;
@ -306,6 +300,9 @@ static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx,
gpr_mu_unlock(&fdn->mu);
}
}
// Any remaining fds in ev_driver->fds was not returned by ares_getsock() and
// is therefore no longer in use, so they can be shut donw and removed from
// the list.
while (ev_driver->fds != NULL) {
fd_node *cur = ev_driver->fds;
ev_driver->fds = ev_driver->fds->next;
@ -314,7 +311,7 @@ static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx,
}
ev_driver->fds = new_list;
// If the ev driver has no working fd, all the tasks are done.
if (!new_list) {
if (new_list == NULL) {
ev_driver->working = false;
gpr_log(GPR_DEBUG, "ev driver stop working");
}
@ -323,14 +320,11 @@ static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx,
void grpc_ares_ev_driver_start(grpc_exec_ctx *exec_ctx,
grpc_ares_ev_driver *ev_driver) {
gpr_mu_lock(&ev_driver->mu);
if (ev_driver->working) {
gpr_mu_unlock(&ev_driver->mu);
return;
if (!ev_driver->working) {
ev_driver->working = true;
grpc_ares_notify_on_event_locked(exec_ctx, ev_driver);
}
ev_driver->working = true;
grpc_ares_notify_on_event_locked(exec_ctx, ev_driver);
gpr_mu_unlock(&ev_driver->mu);
}
#endif /* GRPC_POSIX_SOCKET */
#endif /* GRPC_NATIVE_ADDRESS_RESOLVE */
#endif /* !GRPC_NATIVE_ADDRESS_RESOLVE && GRPC_POSIX_SOCKET */

@ -61,27 +61,22 @@ static gpr_once g_basic_init = GPR_ONCE_INIT;
static gpr_mu g_init_mu;
typedef struct grpc_ares_request {
/** host to resolve, parsed from the name to resolve, set in
grpc_resolve_address_ares_impl */
/** following members are set in grpc_resolve_address_ares_impl */
/** host to resolve, parsed from the name to resolve */
char *host;
/** port to fill in sockaddr_in, parsed from the name to resolve, set in
grpc_resolve_address_ares_impl */
/** port to fill in sockaddr_in, parsed from the name to resolve */
char *port;
/** default port to use, set in grpc_resolve_address_ares_impl */
/** default port to use */
char *default_port;
/** closure to call when the request completes, set in
grpc_resolve_address_ares_impl */
/** closure to call when the request completes */
grpc_closure *on_done;
/** the pointer to receive the resolved addresses, set in
grpc_resolve_address_ares_impl */
/** the pointer to receive the resolved addresses */
grpc_resolved_addresses **addrs_out;
/** the evernt driver used by this request, set in
grpc_resolve_address_ares_impl */
/** the evernt driver used by this request */
grpc_ares_ev_driver *ev_driver;
/** the closure wraps request_resolving_address, initialized in
grpc_resolve_address_ares_impl */
/** the closure wraps request_resolving_address */
grpc_closure request_closure;
/** number of ongoing queries, set in grpc_resolve_address_ares_impl */
/** number of ongoing queries */
gpr_refcount pending_queries;
/** mutex guarding the rest of the state */
@ -94,10 +89,17 @@ typedef struct grpc_ares_request {
static void do_basic_init(void) { gpr_mu_init(&g_init_mu); }
static void destroy_request(grpc_ares_request *request) {
gpr_free(request->host);
gpr_free(request->port);
gpr_free(request->default_port);
static void ares_request_unref(grpc_ares_request *r) {
if (gpr_unref(&r->pending_queries)) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_exec_ctx_sched(&exec_ctx, r->on_done, r->error, NULL);
grpc_exec_ctx_finish(&exec_ctx);
gpr_mu_destroy(&r->mu);
gpr_free(r->host);
gpr_free(r->port);
gpr_free(r->default_port);
gpr_free(r);
}
}
static uint16_t strhtons(const char *port) {
@ -178,14 +180,7 @@ static void on_done_cb(void *arg, int status, int timeouts,
}
}
gpr_mu_unlock(&r->mu);
if (gpr_unref(&r->pending_queries)) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_exec_ctx_sched(&exec_ctx, r->on_done, r->error, NULL);
grpc_exec_ctx_flush(&exec_ctx);
grpc_exec_ctx_finish(&exec_ctx);
destroy_request(r);
gpr_free(r);
}
ares_request_unref(r);
}
static void request_resolving_address(grpc_exec_ctx *exec_ctx, void *arg,
@ -203,47 +198,12 @@ static void request_resolving_address(grpc_exec_ctx *exec_ctx, void *arg,
grpc_ares_ev_driver_start(exec_ctx, ev_driver);
}
static int try_sockaddr_resolve(const char *name, const char *port,
grpc_resolved_addresses **addresses) {
struct sockaddr_in sa;
struct sockaddr_in6 sa6;
memset(&sa, 0, sizeof(struct sockaddr_in));
memset(&sa6, 0, sizeof(struct sockaddr_in6));
if (0 != ares_inet_pton(AF_INET, name, &(sa.sin_addr))) {
*addresses = gpr_malloc(sizeof(grpc_resolved_addresses));
(*addresses)->naddrs = 1;
(*addresses)->addrs =
gpr_malloc(sizeof(grpc_resolved_address) * (*addresses)->naddrs);
(*addresses)->addrs[0].len = sizeof(struct sockaddr_in);
sa.sin_family = AF_INET;
sa.sin_port = strhtons(port);
memcpy(&(*addresses)->addrs[0].addr, &sa, sizeof(struct sockaddr_in));
return 1;
}
if (0 != ares_inet_pton(AF_INET6, name, &(sa6.sin6_addr))) {
*addresses = gpr_malloc(sizeof(grpc_resolved_addresses));
(*addresses)->naddrs = 1;
(*addresses)->addrs =
gpr_malloc(sizeof(grpc_resolved_address) * (*addresses)->naddrs);
(*addresses)->addrs[0].len = sizeof(struct sockaddr_in6);
sa6.sin6_family = AF_INET6;
sa6.sin6_port = strhtons(port);
memcpy(&(*addresses)->addrs[0].addr, &sa6, sizeof(struct sockaddr_in6));
return 1;
}
return 0;
}
void grpc_resolve_address_ares_impl(grpc_exec_ctx *exec_ctx, const char *name,
const char *default_port,
grpc_ares_ev_driver *ev_driver,
grpc_closure *on_done,
grpc_resolved_addresses **addrs) {
char *host;
char *port;
grpc_error *err;
grpc_ares_request *r = NULL;
if (grpc_customized_resolve_address(name, default_port, addrs, &err) != 0) {
grpc_exec_ctx_sched(exec_ctx, on_done, err, NULL);
return;
@ -252,40 +212,39 @@ void grpc_resolve_address_ares_impl(grpc_exec_ctx *exec_ctx, const char *name,
err = GRPC_ERROR_NONE;
/* parse name, splitting it into host and port parts */
char *host;
char *port;
gpr_split_host_port(name, &host, &port);
if (host == NULL) {
err = grpc_error_set_str(GRPC_ERROR_CREATE("unparseable host:port"),
GRPC_ERROR_STR_TARGET_ADDRESS, name);
grpc_exec_ctx_sched(exec_ctx, on_done, err, NULL);
goto done;
goto error_cleanup;
} else if (port == NULL) {
if (default_port == NULL) {
err = grpc_error_set_str(GRPC_ERROR_CREATE("no port in name"),
GRPC_ERROR_STR_TARGET_ADDRESS, name);
grpc_exec_ctx_sched(exec_ctx, on_done, err, NULL);
goto done;
goto error_cleanup;
}
port = gpr_strdup(default_port);
}
if (try_sockaddr_resolve(host, port, addrs)) {
grpc_exec_ctx_sched(exec_ctx, on_done, GRPC_ERROR_NONE, NULL);
} else {
r = gpr_malloc(sizeof(grpc_ares_request));
gpr_mu_init(&r->mu);
r->ev_driver = ev_driver;
r->on_done = on_done;
r->addrs_out = addrs;
r->default_port = gpr_strdup(default_port);
r->port = gpr_strdup(port);
r->host = gpr_strdup(host);
r->success = false;
r->error = GRPC_ERROR_NONE;
grpc_closure_init(&r->request_closure, request_resolving_address, r);
grpc_exec_ctx_sched(exec_ctx, &r->request_closure, GRPC_ERROR_NONE, NULL);
}
grpc_ares_request *r = gpr_malloc(sizeof(grpc_ares_request));
gpr_mu_init(&r->mu);
r->ev_driver = ev_driver;
r->on_done = on_done;
r->addrs_out = addrs;
r->default_port = gpr_strdup(default_port);
r->port = port;
r->host = host;
r->success = false;
r->error = GRPC_ERROR_NONE;
grpc_closure_init(&r->request_closure, request_resolving_address, r);
grpc_exec_ctx_sched(exec_ctx, &r->request_closure, GRPC_ERROR_NONE, NULL);
return;
done:
error_cleanup:
gpr_free(host);
gpr_free(port);
}

@ -58,8 +58,7 @@ grpc_error *grpc_ares_init(void);
/* Uninitialized gRPC ares wrapper. If there was more than one previous call to
grpc_ares_init(), this function uninitializes the gRPC ares wrapper only if
it is the call matching the call to grpc_ares_init() which initialized the
wrapper. */
it has been called the same number of times as grpc_ares_init(). */
void grpc_ares_cleanup(void);
#endif /* GRPC_CORE_EXT_RESOLVER_DNS_C_ARES_GRPC_ARES_WRAPPER_H */

Loading…
Cancel
Save