diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 965d4e53dc6..dc838de7153 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -249,7 +249,6 @@ static void started_call(void *arg, int iomgr_success) { static void picked_target(void *arg, int iomgr_success) { call_data *calld = arg; - channel_data *chand = calld->elem->channel_data; grpc_transport_stream_op op; if (calld->picked_channel == NULL) { @@ -268,7 +267,9 @@ static void picked_target(void *arg, int iomgr_success) { memset(&calld->waiting_op, 0, sizeof(calld->waiting_op)); gpr_mu_unlock(&calld->mu_state); grpc_iomgr_closure_init(&calld->async_setup_task, started_call, calld); - grpc_subchannel_create_call(calld->picked_channel, chand->mdctx, &op, &calld->subchannel_call, &calld->async_setup_task); + grpc_subchannel_create_call(calld->picked_channel, &op, + &calld->subchannel_call, + &calld->async_setup_task); } } } diff --git a/src/core/client_config/connector.c b/src/core/client_config/connector.c index c3a8962ea66..60c392f85b5 100644 --- a/src/core/client_config/connector.c +++ b/src/core/client_config/connector.c @@ -41,11 +41,11 @@ void grpc_connector_unref(grpc_connector *connector) { connector->vtable->unref(connector); } -void grpc_connector_connect(grpc_connector *connector, - const grpc_channel_args *channel_args, - grpc_mdctx *metadata_context, - grpc_transport **transport, - grpc_iomgr_closure *notify) { - connector->vtable->connect(connector, channel_args, metadata_context, - transport, notify); +void grpc_connector_connect( + grpc_connector *connector, grpc_pollset_set *pollset_set, + const struct sockaddr *addr, int addr_len, gpr_timespec deadline, + const grpc_channel_args *channel_args, grpc_mdctx *metadata_context, + grpc_transport **transport, grpc_iomgr_closure *notify) { + connector->vtable->connect(connector, pollset_set, addr, addr_len, deadline, + channel_args, metadata_context, transport, notify); } diff --git a/src/core/client_config/connector.h b/src/core/client_config/connector.h index 8ada75ec708..72414377293 100644 --- a/src/core/client_config/connector.h +++ b/src/core/client_config/connector.h @@ -34,6 +34,7 @@ #ifndef GRPC_INTERNAL_CORE_CLIENT_CONFIG_CONNECTOR_H #define GRPC_INTERNAL_CORE_CLIENT_CONFIG_CONNECTOR_H +#include "src/core/iomgr/sockaddr.h" #include "src/core/transport/transport.h" typedef struct grpc_connector grpc_connector; @@ -46,18 +47,19 @@ struct grpc_connector { struct grpc_connector_vtable { void (*ref)(grpc_connector *connector); void (*unref)(grpc_connector *connector); - void (*connect)(grpc_connector *connector, - const grpc_channel_args *channel_args, + void (*connect)(grpc_connector *connector, grpc_pollset_set *pollset_set, + const struct sockaddr *addr, int addr_len, + gpr_timespec deadline, const grpc_channel_args *channel_args, grpc_mdctx *metadata_context, grpc_transport **transport, grpc_iomgr_closure *notify); }; void grpc_connector_ref(grpc_connector *connector); void grpc_connector_unref(grpc_connector *connector); -void grpc_connector_connect(grpc_connector *connector, - const grpc_channel_args *channel_args, - grpc_mdctx *metadata_context, - grpc_transport **transport, - grpc_iomgr_closure *notify); +void grpc_connector_connect( + grpc_connector *connector, grpc_pollset_set *pollset_set, + const struct sockaddr *addr, int addr_len, gpr_timespec deadline, + const grpc_channel_args *channel_args, grpc_mdctx *metadata_context, + grpc_transport **transport, grpc_iomgr_closure *notify); #endif diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 9637cf39fec..2b4c7ea1d38 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -44,6 +44,19 @@ typedef struct { grpc_subchannel *subchannel; } connection; +typedef struct waiting_for_connect { + struct waiting_for_connect *next; + grpc_iomgr_closure *notify; + grpc_transport_stream_op *initial_op; + grpc_subchannel_call **target; +} waiting_for_connect; + +typedef struct connectivity_state_watcher { + struct connectivity_state_watcher *next; + grpc_iomgr_closure *notify; + grpc_connectivity_state *current; +} connectivity_state_watcher; + struct grpc_subchannel { gpr_refcount refs; grpc_connector *connector; @@ -56,6 +69,8 @@ struct grpc_subchannel { /** address to connect to */ struct sockaddr *addr; size_t addr_len; + /** metadata context */ + grpc_mdctx *mdctx; /** set during connection */ grpc_transport *connecting_transport; @@ -63,6 +78,10 @@ struct grpc_subchannel { /** callback for connection finishing */ grpc_iomgr_closure connected; + /** pollset_set tracking who's interested in a connection + being setup */ + grpc_pollset_set pollset_set; + /** mutex protecting remaining elements */ gpr_mu mu; @@ -70,8 +89,10 @@ struct grpc_subchannel { connection *active; /** are we connecting */ int connecting; - /** closures waiting for a connection */ - grpc_iomgr_closure *waiting; + /** things waiting for a connection */ + waiting_for_connect *waiting; + /** things watching the connectivity state */ + connectivity_state_watcher *watchers; }; struct grpc_subchannel_call { @@ -82,6 +103,9 @@ struct grpc_subchannel_call { #define SUBCHANNEL_CALL_TO_CALL_STACK(call) (((grpc_call_stack *)(call)) + 1) static grpc_subchannel_call *create_call(connection *con, grpc_transport_stream_op *initial_op); +static void connectivity_state_changed_locked(grpc_subchannel *c); +static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c); +static gpr_timespec compute_connect_deadline(grpc_subchannel *c); /* * grpc_subchannel implementation @@ -94,10 +118,21 @@ void grpc_subchannel_unref(grpc_subchannel *c) { gpr_free(c->filters); grpc_channel_args_destroy(c->args); gpr_free(c->addr); + grpc_mdctx_unref(c->mdctx); gpr_free(c); } } +void grpc_subchannel_add_interested_party(grpc_subchannel *c, + grpc_pollset *pollset) { + grpc_pollset_set_add_pollset(&c->pollset_set, pollset); +} + +void grpc_subchannel_del_interested_party(grpc_subchannel *c, + grpc_pollset *pollset) { + grpc_pollset_set_del_pollset(&c->pollset_set, pollset); +} + grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, grpc_subchannel_args *args) { grpc_subchannel *c = gpr_malloc(sizeof(*c)); @@ -113,12 +148,13 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, memcpy(c->addr, args->addr, args->addr_len); c->addr_len = args->addr_len; c->args = grpc_channel_args_copy(args->args); + c->mdctx = args->mdctx; + grpc_mdctx_ref(c->mdctx); gpr_mu_init(&c->mu); return c; } void grpc_subchannel_create_call(grpc_subchannel *c, - grpc_mdctx *mdctx, grpc_transport_stream_op *initial_op, grpc_subchannel_call **target, grpc_iomgr_closure *notify) { @@ -132,19 +168,101 @@ void grpc_subchannel_create_call(grpc_subchannel *c, *target = create_call(con, initial_op); notify->cb(notify->cb_arg, 1); } else { - notify->next = c->waiting; - c->waiting = notify; + waiting_for_connect *w4c = gpr_malloc(sizeof(*w4c)); + w4c->next = c->waiting; + w4c->notify = notify; + w4c->initial_op = initial_op; + w4c->target = target; + c->waiting = w4c; + grpc_subchannel_add_interested_party(c, initial_op->bind_pollset); if (!c->connecting) { c->connecting = 1; + connectivity_state_changed_locked(c); gpr_mu_unlock(&c->mu); - grpc_connector_connect(c->connector, c->args, mdctx, &c->connecting_transport, &c->connected); + grpc_connector_connect(c->connector, &c->pollset_set, c->addr, + c->addr_len, compute_connect_deadline(c), c->args, + c->mdctx, &c->connecting_transport, &c->connected); } else { gpr_mu_unlock(&c->mu); } } } +grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) { + grpc_connectivity_state state; + gpr_mu_lock(&c->mu); + state = compute_connectivity_locked(c); + gpr_mu_unlock(&c->mu); + return state; +} + +void grpc_subchannel_notify_on_state_change(grpc_subchannel *c, + grpc_connectivity_state *state, + grpc_iomgr_closure *notify) { + grpc_connectivity_state current; + int do_connect = 0; + connectivity_state_watcher *w = gpr_malloc(sizeof(*w)); + w->current = state; + w->notify = notify; + gpr_mu_lock(&c->mu); + current = compute_connectivity_locked(c); + if (current == GRPC_CHANNEL_IDLE) { + current = GRPC_CHANNEL_CONNECTING; + c->connecting = 1; + do_connect = 1; + connectivity_state_changed_locked(c); + } + if (current != *state) { + gpr_mu_unlock(&c->mu); + *state = current; + grpc_iomgr_add_callback(notify); + gpr_free(w); + } else { + w->next = c->watchers; + c->watchers = w; + gpr_mu_unlock(&c->mu); + } + if (do_connect) { + grpc_connector_connect(c->connector, &c->pollset_set, c->addr, c->addr_len, + compute_connect_deadline(c), c->args, c->mdctx, + &c->connecting_transport, &c->connected); + } +} + +static gpr_timespec compute_connect_deadline(grpc_subchannel *c) { + return gpr_time_add(gpr_now(), gpr_time_from_seconds(60)); +} + +static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c) { + if (c->connecting) { + return GRPC_CHANNEL_CONNECTING; + } + if (c->active) { + return GRPC_CHANNEL_READY; + } + return GRPC_CHANNEL_IDLE; +} + +static void connectivity_state_changed_locked(grpc_subchannel *c) { + grpc_connectivity_state current = compute_connectivity_locked(c); + connectivity_state_watcher *new = NULL; + connectivity_state_watcher *w; + while ((w = c->watchers)) { + c->watchers = w->next; + + if (current != *w->current) { + *w->current = current; + grpc_iomgr_add_callback(w->notify); + gpr_free(w); + } else { + w->next = new; + new = w; + } + } + c->watchers = new; +} + /* * grpc_subchannel_call implementation */ diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index 8b3d82eb0ad..8836f9b09c1 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -64,7 +64,6 @@ void grpc_subchannel_del_interested_party(grpc_subchannel *channel, grpc_pollset /** construct a call (possibly asynchronously) */ void grpc_subchannel_create_call(grpc_subchannel *subchannel, - grpc_mdctx *mdctx, grpc_transport_stream_op *initial_op, grpc_subchannel_call **target, grpc_iomgr_closure *notify); @@ -84,6 +83,8 @@ struct grpc_subchannel_args { /** Address to connect to */ struct sockaddr *addr; size_t addr_len; + /** metadata context to use */ + grpc_mdctx *mdctx; }; /** create a subchannel given a connector */ diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c index ba4e1a24ecc..46d1d708dd7 100644 --- a/src/core/surface/channel_create.c +++ b/src/core/surface/channel_create.c @@ -41,11 +41,18 @@ #include "src/core/channel/channel_args.h" #include "src/core/channel/client_channel.h" #include "src/core/client_config/resolver_registry.h" +#include "src/core/iomgr/tcp_client.h" #include "src/core/surface/channel.h" +#include "src/core/transport/chttp2_transport.h" typedef struct { grpc_connector base; gpr_refcount refs; + + grpc_transport **transport; + grpc_iomgr_closure *notify; + const grpc_channel_args *args; + grpc_mdctx *mdctx; } connector; static void connector_ref(grpc_connector *con) { @@ -60,28 +67,68 @@ static void connector_unref(grpc_connector *con) { } } -static void connector_connect(grpc_connector *connector, const grpc_channel_args *channel_args, grpc_mdctx *metadata_context, grpc_transport **transport, grpc_iomgr_closure *notify) { - abort(); +static void connected(void *arg, grpc_endpoint *tcp) { + connector *c = arg; + grpc_iomgr_closure *notify; + if (tcp != NULL) { + *c->transport = + grpc_create_chttp2_transport(c->args, tcp, NULL, 0, c->mdctx, 1); + } else { + *c->transport = NULL; + } + notify = c->notify; + c->notify = NULL; + grpc_iomgr_add_callback(notify); +} + +static void connector_connect( + grpc_connector *con, grpc_pollset_set *pollset_set, + const struct sockaddr *addr, int addr_len, gpr_timespec deadline, + const grpc_channel_args *channel_args, grpc_mdctx *metadata_context, + grpc_transport **transport, grpc_iomgr_closure *notify) { + connector *c = (connector *)con; + GPR_ASSERT(c->notify == NULL); + c->notify = notify; + c->args = channel_args; + c->mdctx = metadata_context; + grpc_tcp_client_connect(connected, c, pollset_set, addr, addr_len, deadline); } static const grpc_connector_vtable connector_vtable = {connector_ref, connector_unref, connector_connect}; -static void subchannel_factory_ref(grpc_subchannel_factory *scf) {} +typedef struct { + grpc_subchannel_factory base; + gpr_refcount refs; + grpc_mdctx *mdctx; +} subchannel_factory; + +static void subchannel_factory_ref(grpc_subchannel_factory *scf) { + subchannel_factory *f = (subchannel_factory *)scf; + gpr_ref(&f->refs); +} -static void subchannel_factory_unref(grpc_subchannel_factory *scf) {} +static void subchannel_factory_unref(grpc_subchannel_factory *scf) { + subchannel_factory *f = (subchannel_factory *)scf; + if (gpr_unref(&f->refs)) { + grpc_mdctx_unref(f->mdctx); + gpr_free(f); + } +} static grpc_subchannel *subchannel_factory_create_subchannel(grpc_subchannel_factory *scf, grpc_subchannel_args *args) { + subchannel_factory *f = (subchannel_factory *)scf; connector *c = gpr_malloc(sizeof(*c)); grpc_subchannel *s; + memset(c, 0, sizeof(*c)); c->base.vtable = &connector_vtable; gpr_ref_init(&c->refs, 1); + args->mdctx = f->mdctx; s = grpc_subchannel_create(&c->base, args); grpc_connector_unref(&c->base); return s; } static const grpc_subchannel_factory_vtable subchannel_factory_vtable = {subchannel_factory_ref, subchannel_factory_unref, subchannel_factory_create_subchannel}; -static grpc_subchannel_factory subchannel_factory = {&subchannel_factory_vtable}; /* Create a client channel: Asynchronously: - resolve target @@ -93,6 +140,8 @@ grpc_channel *grpc_channel_create(const char *target, #define MAX_FILTERS 3 const grpc_channel_filter *filters[MAX_FILTERS]; grpc_resolver *resolver; + subchannel_factory *f; + grpc_mdctx *mdctx = grpc_mdctx_create(); int n = 0; /* TODO(census) if (grpc_channel_args_is_census_enabled(args)) { @@ -101,12 +150,16 @@ grpc_channel *grpc_channel_create(const char *target, filters[n++] = &grpc_client_channel_filter; GPR_ASSERT(n <= MAX_FILTERS); - resolver = grpc_resolver_create(target, &subchannel_factory); + f = gpr_malloc(sizeof(*f)); + f->base.vtable = &subchannel_factory_vtable; + gpr_ref_init(&f->refs, 1); + f->mdctx = mdctx; + resolver = grpc_resolver_create(target, &f->base); if (!resolver) { return NULL; } - channel = grpc_channel_create_from_filters(filters, n, args, grpc_mdctx_create(), 1); + channel = grpc_channel_create_from_filters(filters, n, args, mdctx, 1); grpc_client_channel_set_resolver(grpc_channel_get_channel_stack(channel), resolver); return channel;