Move the refcount from fd_node to ev_driver

reviewable/pr7771/r5
Yuchen Zeng 8 years ago
parent 3ae2663b95
commit 8a6cba2558
  1. 78
      src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c
  2. 4
      src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper.h

@ -53,8 +53,6 @@
typedef struct fd_node { typedef struct fd_node {
/** the owner of this fd node */ /** the owner of this fd node */
grpc_ares_ev_driver *ev_driver; grpc_ares_ev_driver *ev_driver;
/** refcount of the node */
gpr_refcount refs;
/** the grpc_fd owned by this fd node */ /** the grpc_fd owned by this fd node */
grpc_fd *grpc_fd; grpc_fd *grpc_fd;
/** a closure wrapping on_readable_cb, which should be invoked when the /** a closure wrapping on_readable_cb, which should be invoked when the
@ -79,6 +77,8 @@ struct grpc_ares_ev_driver {
ares_channel channel; ares_channel channel;
/** pollset set for driving the IO events of the channel */ /** pollset set for driving the IO events of the channel */
grpc_pollset_set *pollset_set; grpc_pollset_set *pollset_set;
/** refcount of the event driver */
gpr_refcount refs;
/** mutex guarding the rest of the state */ /** mutex guarding the rest of the state */
gpr_mu mu; gpr_mu mu;
@ -91,27 +91,36 @@ struct grpc_ares_ev_driver {
static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx, static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx,
grpc_ares_ev_driver *ev_driver); grpc_ares_ev_driver *ev_driver);
static fd_node *fd_node_ref(fd_node *fdn) { static grpc_ares_ev_driver *grpc_ares_ev_driver_ref(
gpr_log(GPR_DEBUG, "ref %d", grpc_fd_wrapped_fd(fdn->grpc_fd)); grpc_ares_ev_driver *ev_driver) {
gpr_ref(&fdn->refs); gpr_log(GPR_DEBUG, "Ref ev_driver %" PRIuPTR, (uintptr_t)ev_driver);
return fdn; gpr_ref(&ev_driver->refs);
return ev_driver;
} }
static void fd_node_unref(grpc_exec_ctx *exec_ctx, fd_node *fdn) { static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver *ev_driver) {
gpr_log(GPR_DEBUG, "unref %d", grpc_fd_wrapped_fd(fdn->grpc_fd)); gpr_log(GPR_DEBUG, "Unref ev_driver %" PRIuPTR, (uintptr_t)ev_driver);
if (gpr_unref(&fdn->refs)) { if (gpr_unref(&ev_driver->refs)) {
gpr_log(GPR_DEBUG, "delete fd: %d", grpc_fd_wrapped_fd(fdn->grpc_fd)); gpr_log(GPR_DEBUG, "destroy ev_driver %" PRIuPTR, (uintptr_t)ev_driver);
GPR_ASSERT(!fdn->readable_registered); GPR_ASSERT(ev_driver->fds == NULL);
GPR_ASSERT(!fdn->writable_registered); gpr_mu_destroy(&ev_driver->mu);
gpr_mu_destroy(&fdn->mu); ares_destroy(ev_driver->channel);
grpc_pollset_set_del_fd(exec_ctx, fdn->ev_driver->pollset_set, gpr_free(ev_driver);
fdn->grpc_fd); grpc_ares_cleanup();
grpc_fd_shutdown(exec_ctx, fdn->grpc_fd);
grpc_fd_orphan(exec_ctx, fdn->grpc_fd, NULL, NULL, "c-ares query finished");
gpr_free(fdn);
} }
} }
static void fd_node_destroy(grpc_exec_ctx *exec_ctx, fd_node *fdn) {
gpr_log(GPR_DEBUG, "delete fd: %d", grpc_fd_wrapped_fd(fdn->grpc_fd));
GPR_ASSERT(!fdn->readable_registered);
GPR_ASSERT(!fdn->writable_registered);
gpr_mu_destroy(&fdn->mu);
grpc_pollset_set_del_fd(exec_ctx, fdn->ev_driver->pollset_set, fdn->grpc_fd);
grpc_fd_shutdown(exec_ctx, fdn->grpc_fd);
grpc_fd_orphan(exec_ctx, fdn->grpc_fd, NULL, NULL, "c-ares query finished");
gpr_free(fdn);
}
grpc_error *grpc_ares_ev_driver_create(grpc_ares_ev_driver **ev_driver, grpc_error *grpc_ares_ev_driver_create(grpc_ares_ev_driver **ev_driver,
grpc_pollset_set *pollset_set) { grpc_pollset_set *pollset_set) {
int status; int status;
@ -121,7 +130,7 @@ grpc_error *grpc_ares_ev_driver_create(grpc_ares_ev_driver **ev_driver,
} }
*ev_driver = gpr_malloc(sizeof(grpc_ares_ev_driver)); *ev_driver = gpr_malloc(sizeof(grpc_ares_ev_driver));
status = ares_init(&(*ev_driver)->channel); status = ares_init(&(*ev_driver)->channel);
gpr_log(GPR_DEBUG, "grpc_ares_ev_driver_create\n"); gpr_log(GPR_DEBUG, "grpc_ares_ev_driver_create");
if (status != ARES_SUCCESS) { if (status != ARES_SUCCESS) {
char *err_msg; char *err_msg;
gpr_asprintf(&err_msg, "Failed to init ares channel. C-ares error: %s", gpr_asprintf(&err_msg, "Failed to init ares channel. C-ares error: %s",
@ -132,25 +141,13 @@ grpc_error *grpc_ares_ev_driver_create(grpc_ares_ev_driver **ev_driver,
return err; return err;
} }
gpr_mu_init(&(*ev_driver)->mu); gpr_mu_init(&(*ev_driver)->mu);
gpr_ref_init(&(*ev_driver)->refs, 1);
(*ev_driver)->pollset_set = pollset_set; (*ev_driver)->pollset_set = pollset_set;
(*ev_driver)->fds = NULL; (*ev_driver)->fds = NULL;
(*ev_driver)->working = false; (*ev_driver)->working = false;
return GRPC_ERROR_NONE; return GRPC_ERROR_NONE;
} }
static void grpc_ares_ev_driver_cleanup(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
GPR_ASSERT(error == GRPC_ERROR_NONE);
grpc_ares_ev_driver *ev_driver = arg;
GPR_ASSERT(ev_driver->fds == NULL);
gpr_mu_lock(&ev_driver->mu);
gpr_mu_unlock(&ev_driver->mu);
gpr_mu_destroy(&ev_driver->mu);
ares_destroy(ev_driver->channel);
gpr_free(ev_driver);
grpc_ares_cleanup();
}
void grpc_ares_ev_driver_destroy(grpc_exec_ctx *exec_ctx, void grpc_ares_ev_driver_destroy(grpc_exec_ctx *exec_ctx,
grpc_ares_ev_driver *ev_driver) { grpc_ares_ev_driver *ev_driver) {
// Shutdown all the working fds, invoke their registered on_readable_cb and // Shutdown all the working fds, invoke their registered on_readable_cb and
@ -162,11 +159,7 @@ void grpc_ares_ev_driver_destroy(grpc_exec_ctx *exec_ctx,
fdn = fdn->next; fdn = fdn->next;
} }
gpr_mu_unlock(&ev_driver->mu); gpr_mu_unlock(&ev_driver->mu);
// Schedule the actual cleanup with exec_ctx, so that it happens after the grpc_ares_ev_driver_unref(ev_driver);
// fd shutdown process.
grpc_exec_ctx_sched(
exec_ctx, grpc_closure_create(grpc_ares_ev_driver_cleanup, ev_driver),
GRPC_ERROR_NONE, NULL);
} }
// Search fd in the fd_node list head. This is an O(n) search, the max possible // Search fd in the fd_node list head. This is an O(n) search, the max possible
@ -208,10 +201,10 @@ static void on_readable_cb(grpc_exec_ctx *exec_ctx, void *arg,
// grpc_ares_notify_on_event_locked(). // grpc_ares_notify_on_event_locked().
ares_cancel(ev_driver->channel); ares_cancel(ev_driver->channel);
} }
fd_node_unref(exec_ctx, fdn);
gpr_mu_lock(&ev_driver->mu); gpr_mu_lock(&ev_driver->mu);
grpc_ares_notify_on_event_locked(exec_ctx, ev_driver); grpc_ares_notify_on_event_locked(exec_ctx, ev_driver);
gpr_mu_unlock(&ev_driver->mu); gpr_mu_unlock(&ev_driver->mu);
grpc_ares_ev_driver_unref(ev_driver);
} }
static void on_writable_cb(grpc_exec_ctx *exec_ctx, void *arg, static void on_writable_cb(grpc_exec_ctx *exec_ctx, void *arg,
@ -235,10 +228,10 @@ static void on_writable_cb(grpc_exec_ctx *exec_ctx, void *arg,
// grpc_ares_notify_on_event_locked(). // grpc_ares_notify_on_event_locked().
ares_cancel(ev_driver->channel); ares_cancel(ev_driver->channel);
} }
fd_node_unref(exec_ctx, fdn);
gpr_mu_lock(&ev_driver->mu); gpr_mu_lock(&ev_driver->mu);
grpc_ares_notify_on_event_locked(exec_ctx, ev_driver); grpc_ares_notify_on_event_locked(exec_ctx, ev_driver);
gpr_mu_unlock(&ev_driver->mu); gpr_mu_unlock(&ev_driver->mu);
grpc_ares_ev_driver_unref(ev_driver);
} }
void *grpc_ares_ev_driver_get_channel(grpc_ares_ev_driver *ev_driver) { void *grpc_ares_ev_driver_get_channel(grpc_ares_ev_driver *ev_driver) {
@ -268,7 +261,6 @@ static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx,
fdn->readable_registered = false; fdn->readable_registered = false;
fdn->writable_registered = false; fdn->writable_registered = false;
gpr_mu_init(&fdn->mu); gpr_mu_init(&fdn->mu);
gpr_ref_init(&fdn->refs, 1);
grpc_closure_init(&fdn->read_closure, on_readable_cb, fdn); grpc_closure_init(&fdn->read_closure, on_readable_cb, fdn);
grpc_closure_init(&fdn->write_closure, on_writable_cb, fdn); grpc_closure_init(&fdn->write_closure, on_writable_cb, fdn);
grpc_pollset_set_add_fd(exec_ctx, ev_driver->pollset_set, fdn->grpc_fd); grpc_pollset_set_add_fd(exec_ctx, ev_driver->pollset_set, fdn->grpc_fd);
@ -281,7 +273,7 @@ static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx,
// not been registered with this socket. // not been registered with this socket.
if (ARES_GETSOCK_READABLE(socks_bitmask, i) && if (ARES_GETSOCK_READABLE(socks_bitmask, i) &&
!fdn->readable_registered) { !fdn->readable_registered) {
fd_node_ref(fdn); grpc_ares_ev_driver_ref(ev_driver);
gpr_log(GPR_DEBUG, "notify read on: %d", gpr_log(GPR_DEBUG, "notify read on: %d",
grpc_fd_wrapped_fd(fdn->grpc_fd)); grpc_fd_wrapped_fd(fdn->grpc_fd));
grpc_fd_notify_on_read(exec_ctx, fdn->grpc_fd, &fdn->read_closure); grpc_fd_notify_on_read(exec_ctx, fdn->grpc_fd, &fdn->read_closure);
@ -293,7 +285,7 @@ static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx,
!fdn->writable_registered) { !fdn->writable_registered) {
gpr_log(GPR_DEBUG, "notify write on: %d", gpr_log(GPR_DEBUG, "notify write on: %d",
grpc_fd_wrapped_fd(fdn->grpc_fd)); grpc_fd_wrapped_fd(fdn->grpc_fd));
fd_node_ref(fdn); grpc_ares_ev_driver_ref(ev_driver);
grpc_fd_notify_on_write(exec_ctx, fdn->grpc_fd, &fdn->write_closure); grpc_fd_notify_on_write(exec_ctx, fdn->grpc_fd, &fdn->write_closure);
fdn->writable_registered = true; fdn->writable_registered = true;
} }
@ -307,7 +299,7 @@ static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx,
fd_node *cur = ev_driver->fds; fd_node *cur = ev_driver->fds;
ev_driver->fds = ev_driver->fds->next; ev_driver->fds = ev_driver->fds->next;
grpc_fd_shutdown(exec_ctx, cur->grpc_fd); grpc_fd_shutdown(exec_ctx, cur->grpc_fd);
fd_node_unref(exec_ctx, cur); fd_node_destroy(exec_ctx, cur);
} }
ev_driver->fds = new_list; ev_driver->fds = new_list;
// If the ev driver has no working fd, all the tasks are done. // If the ev driver has no working fd, all the tasks are done.

@ -34,8 +34,6 @@
#ifndef GRPC_CORE_EXT_RESOLVER_DNS_C_ARES_GRPC_ARES_WRAPPER_H #ifndef GRPC_CORE_EXT_RESOLVER_DNS_C_ARES_GRPC_ARES_WRAPPER_H
#define GRPC_CORE_EXT_RESOLVER_DNS_C_ARES_GRPC_ARES_WRAPPER_H #define GRPC_CORE_EXT_RESOLVER_DNS_C_ARES_GRPC_ARES_WRAPPER_H
#include <grpc/support/port_platform.h>
#include "src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver.h" #include "src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver.h"
#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/iomgr/iomgr.h"
@ -44,7 +42,7 @@
/* Asynchronously resolve addr. Use default_port if a port isn't designated /* Asynchronously resolve addr. Use default_port if a port isn't designated
in addr, otherwise use the port in addr. grpc_ares_init() must be called in addr, otherwise use the port in addr. grpc_ares_init() must be called
at least once before this function . */ at least once before this function. */
extern void (*grpc_resolve_address_ares)(grpc_exec_ctx *exec_ctx, extern void (*grpc_resolve_address_ares)(grpc_exec_ctx *exec_ctx,
const char *addr, const char *addr,
const char *default_port, const char *default_port,

Loading…
Cancel
Save