diff --git a/src/core/client_config/connector.c b/src/core/client_config/connector.c index 9cc57ddf38c..a8cd5fc1490 100644 --- a/src/core/client_config/connector.c +++ b/src/core/client_config/connector.c @@ -41,10 +41,9 @@ void grpc_connector_unref(grpc_connector *connector) { connector->vtable->unref(connector); } -void grpc_connector_connect( - grpc_connector *connector, - const grpc_connect_in_args *in_args, - grpc_connect_out_args *out_args, - grpc_iomgr_closure *notify) { +void grpc_connector_connect(grpc_connector *connector, + const grpc_connect_in_args *in_args, + grpc_connect_out_args *out_args, + grpc_iomgr_closure *notify) { connector->vtable->connect(connector, in_args, out_args, notify); } diff --git a/src/core/client_config/connector.h b/src/core/client_config/connector.h index 55c6e63129a..edcb10a36ec 100644 --- a/src/core/client_config/connector.h +++ b/src/core/client_config/connector.h @@ -72,16 +72,14 @@ struct grpc_connector_vtable { void (*unref)(grpc_connector *connector); void (*connect)(grpc_connector *connector, const grpc_connect_in_args *in_args, - grpc_connect_out_args *out_args, - grpc_iomgr_closure *notify); + grpc_connect_out_args *out_args, grpc_iomgr_closure *notify); }; void grpc_connector_ref(grpc_connector *connector); void grpc_connector_unref(grpc_connector *connector); -void grpc_connector_connect( - grpc_connector *connector, - const grpc_connect_in_args *in_args, - grpc_connect_out_args *out_args, - grpc_iomgr_closure *notify); +void grpc_connector_connect(grpc_connector *connector, + const grpc_connect_in_args *in_args, + grpc_connect_out_args *out_args, + grpc_iomgr_closure *notify); #endif diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index cdc7e751401..c94408200ba 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -74,7 +74,7 @@ typedef struct { } pick_first_lb_policy; void pf_destroy(grpc_lb_policy *pol) { - pick_first_lb_policy *p = (pick_first_lb_policy*)pol; + pick_first_lb_policy *p = (pick_first_lb_policy *)pol; size_t i; for (i = 0; i < p->num_subchannels; i++) { grpc_subchannel_unref(p->subchannels[i]); @@ -92,7 +92,7 @@ void pf_shutdown(grpc_lb_policy *pol) { void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset, grpc_metadata_batch *initial_metadata, grpc_subchannel **target, grpc_iomgr_closure *on_complete) { - pick_first_lb_policy *p = (pick_first_lb_policy*)pol; + pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; gpr_mu_lock(&p->mu); if (p->selected) { @@ -105,9 +105,12 @@ void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset, p->checking_subchannel = 0; p->checking_connectivity = GRPC_CHANNEL_IDLE; GRPC_LB_POLICY_REF(pol, "pick_first_connectivity"); - grpc_subchannel_notify_on_state_change(p->subchannels[p->checking_subchannel], &p->checking_connectivity, &p->connectivity_changed); + grpc_subchannel_notify_on_state_change( + p->subchannels[p->checking_subchannel], &p->checking_connectivity, + &p->connectivity_changed); } - grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel], pollset); + grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel], + pollset); pp = gpr_malloc(sizeof(*pp)); pp->next = p->pending_picks; pp->pollset = pollset; @@ -121,14 +124,16 @@ void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset, static void del_interested_parties_locked(pick_first_lb_policy *p) { pending_pick *pp; for (pp = p->pending_picks; pp; pp = pp->next) { - grpc_subchannel_del_interested_party(p->subchannels[p->checking_subchannel], pp->pollset); + grpc_subchannel_del_interested_party(p->subchannels[p->checking_subchannel], + pp->pollset); } } static void add_interested_parties_locked(pick_first_lb_policy *p) { pending_pick *pp; for (pp = p->pending_picks; pp; pp = pp->next) { - grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel], pp->pollset); + grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel], + pp->pollset); } } @@ -142,7 +147,8 @@ loop: switch (p->checking_connectivity) { case GRPC_CHANNEL_READY: p->selected = p->subchannels[p->checking_subchannel]; - GPR_ASSERT(grpc_subchannel_check_connectivity(p->selected) == GRPC_CHANNEL_READY); + GPR_ASSERT(grpc_subchannel_check_connectivity(p->selected) == + GRPC_CHANNEL_READY); while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = p->selected; @@ -154,19 +160,25 @@ loop: break; case GRPC_CHANNEL_TRANSIENT_FAILURE: del_interested_parties_locked(p); - p->checking_subchannel = (p->checking_subchannel + 1) % p->num_subchannels; - p->checking_connectivity = grpc_subchannel_check_connectivity(p->subchannels[p->checking_subchannel]); + p->checking_subchannel = + (p->checking_subchannel + 1) % p->num_subchannels; + p->checking_connectivity = grpc_subchannel_check_connectivity( + p->subchannels[p->checking_subchannel]); add_interested_parties_locked(p); goto loop; case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_IDLE: - grpc_subchannel_notify_on_state_change(p->subchannels[p->checking_subchannel], &p->checking_connectivity, &p->connectivity_changed); + grpc_subchannel_notify_on_state_change( + p->subchannels[p->checking_subchannel], &p->checking_connectivity, + &p->connectivity_changed); break; case GRPC_CHANNEL_FATAL_FAILURE: del_interested_parties_locked(p); - GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel], p->subchannels[p->num_subchannels - 1]); + GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel], + p->subchannels[p->num_subchannels - 1]); p->checking_subchannel %= p->num_subchannels; - p->checking_connectivity = grpc_subchannel_check_connectivity(p->subchannels[p->checking_subchannel]); + p->checking_connectivity = grpc_subchannel_check_connectivity( + p->subchannels[p->checking_subchannel]); p->num_subchannels--; grpc_subchannel_unref(p->subchannels[p->num_subchannels]); add_interested_parties_locked(p); @@ -184,7 +196,7 @@ loop: } static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op) { - pick_first_lb_policy *p = (pick_first_lb_policy*)pol; + pick_first_lb_policy *p = (pick_first_lb_policy *)pol; size_t i; size_t n; grpc_subchannel **subchannels; @@ -206,7 +218,7 @@ static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op) { } static grpc_connectivity_state pf_check_connectivity(grpc_lb_policy *pol) { - pick_first_lb_policy *p = (pick_first_lb_policy*)pol; + pick_first_lb_policy *p = (pick_first_lb_policy *)pol; grpc_connectivity_state st; gpr_mu_lock(&p->mu); st = grpc_connectivity_state_check(&p->state_tracker); @@ -214,15 +226,19 @@ static grpc_connectivity_state pf_check_connectivity(grpc_lb_policy *pol) { return st; } -static void pf_notify_on_state_change(grpc_lb_policy *pol, grpc_connectivity_state *current, grpc_iomgr_closure *notify) { - pick_first_lb_policy *p = (pick_first_lb_policy*)pol; +static void pf_notify_on_state_change(grpc_lb_policy *pol, + grpc_connectivity_state *current, + grpc_iomgr_closure *notify) { + pick_first_lb_policy *p = (pick_first_lb_policy *)pol; gpr_mu_lock(&p->mu); - grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current, notify); + grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current, + notify); gpr_mu_unlock(&p->mu); } static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { - pf_destroy, pf_shutdown, pf_pick, pf_broadcast, pf_check_connectivity, pf_notify_on_state_change}; + pf_destroy, pf_shutdown, pf_pick, + pf_broadcast, pf_check_connectivity, pf_notify_on_state_change}; grpc_lb_policy *grpc_create_pick_first_lb_policy(grpc_subchannel **subchannels, size_t num_subchannels) { diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c index dfe21cf443a..6d1c788742b 100644 --- a/src/core/client_config/lb_policy.c +++ b/src/core/client_config/lb_policy.c @@ -33,33 +33,34 @@ #include "src/core/client_config/lb_policy.h" -void grpc_lb_policy_init(grpc_lb_policy *policy, const grpc_lb_policy_vtable *vtable) { - policy->vtable = vtable; - gpr_ref_init(&policy->refs, 1); +void grpc_lb_policy_init(grpc_lb_policy *policy, + const grpc_lb_policy_vtable *vtable) { + policy->vtable = vtable; + gpr_ref_init(&policy->refs, 1); } #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG -void grpc_lb_policy_ref(grpc_lb_policy *policy, const char *file, int line, const char *reason) { +void grpc_lb_policy_ref(grpc_lb_policy *policy, const char *file, int line, + const char *reason) { gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "LB_POLICY:%p ref %d -> %d %s", - policy, (int)policy->refs.count, (int)policy->refs.count + 1, - reason); + policy, (int)policy->refs.count, (int)policy->refs.count + 1, reason); #else -void grpc_lb_policy_ref(grpc_lb_policy *policy) { +void grpc_lb_policy_ref(grpc_lb_policy *policy) { #endif - gpr_ref(&policy->refs); + gpr_ref(&policy->refs); } #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG -void grpc_lb_policy_unref(grpc_lb_policy *policy, const char *file, int line, const char *reason) { +void grpc_lb_policy_unref(grpc_lb_policy *policy, const char *file, int line, + const char *reason) { gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "LB_POLICY:%p unref %d -> %d %s", - policy, (int)policy->refs.count, (int)policy->refs.count - 1, - reason); + policy, (int)policy->refs.count, (int)policy->refs.count - 1, reason); #else void grpc_lb_policy_unref(grpc_lb_policy *policy) { #endif - if (gpr_unref(&policy->refs)) { - policy->vtable->destroy(policy); - } + if (gpr_unref(&policy->refs)) { + policy->vtable->destroy(policy); + } } void grpc_lb_policy_shutdown(grpc_lb_policy *policy) { @@ -74,5 +75,5 @@ void grpc_lb_policy_pick(grpc_lb_policy *policy, grpc_pollset *pollset, } void grpc_lb_policy_broadcast(grpc_lb_policy *policy, grpc_transport_op *op) { - policy->vtable->broadcast(policy, op); + policy->vtable->broadcast(policy, op); } diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h index 717f26af652..a468f761cc9 100644 --- a/src/core/client_config/lb_policy.h +++ b/src/core/client_config/lb_policy.h @@ -67,14 +67,20 @@ struct grpc_lb_policy_vtable { /** call notify when the connectivity state of a channel changes from *state. Updates *state with the new state of the policy */ - void (*notify_on_state_change)(grpc_lb_policy *policy, grpc_connectivity_state *state, grpc_iomgr_closure *closure); + void (*notify_on_state_change)(grpc_lb_policy *policy, + grpc_connectivity_state *state, + grpc_iomgr_closure *closure); }; #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG -#define GRPC_LB_POLICY_REF(p, r) grpc_lb_policy_ref((p), __FILE__, __LINE__, (r)) -#define GRPC_LB_POLICY_UNREF(p, r) grpc_lb_policy_unref((p), __FILE__, __LINE__, (r)) -void grpc_lb_policy_ref(grpc_lb_policy *policy, const char *file, int line, const char *reason); -void grpc_lb_policy_unref(grpc_lb_policy *policy, const char *file, int line, const char *reason); +#define GRPC_LB_POLICY_REF(p, r) \ + grpc_lb_policy_ref((p), __FILE__, __LINE__, (r)) +#define GRPC_LB_POLICY_UNREF(p, r) \ + grpc_lb_policy_unref((p), __FILE__, __LINE__, (r)) +void grpc_lb_policy_ref(grpc_lb_policy *policy, const char *file, int line, + const char *reason); +void grpc_lb_policy_unref(grpc_lb_policy *policy, const char *file, int line, + const char *reason); #else #define GRPC_LB_POLICY_REF(p, r) grpc_lb_policy_ref((p)) #define GRPC_LB_POLICY_UNREF(p, r) grpc_lb_policy_unref((p)) @@ -83,7 +89,8 @@ void grpc_lb_policy_unref(grpc_lb_policy *policy); #endif /** called by concrete implementations to initialize the base struct */ -void grpc_lb_policy_init(grpc_lb_policy *policy, const grpc_lb_policy_vtable *vtable); +void grpc_lb_policy_init(grpc_lb_policy *policy, + const grpc_lb_policy_vtable *vtable); /** Start shutting down (fail any pending picks) */ void grpc_lb_policy_shutdown(grpc_lb_policy *policy); diff --git a/src/core/client_config/resolvers/unix_resolver_posix.c b/src/core/client_config/resolvers/unix_resolver_posix.c index 3dedf94357c..f7498548b11 100644 --- a/src/core/client_config/resolvers/unix_resolver_posix.c +++ b/src/core/client_config/resolvers/unix_resolver_posix.c @@ -79,10 +79,10 @@ static void unix_ref(grpc_resolver *r); static void unix_unref(grpc_resolver *r); static void unix_shutdown(grpc_resolver *r); static void unix_channel_saw_error(grpc_resolver *r, - struct sockaddr *failing_address, - int failing_address_len); + struct sockaddr *failing_address, + int failing_address_len); static void unix_next(grpc_resolver *r, grpc_client_config **target_config, - grpc_iomgr_closure *on_complete); + grpc_iomgr_closure *on_complete); static const grpc_resolver_vtable unix_resolver_vtable = { unix_ref, unix_unref, unix_shutdown, unix_channel_saw_error, unix_next}; @@ -112,12 +112,11 @@ static void unix_shutdown(grpc_resolver *resolver) { } static void unix_channel_saw_error(grpc_resolver *resolver, struct sockaddr *sa, - int len) { -} + int len) {} static void unix_next(grpc_resolver *resolver, - grpc_client_config **target_config, - grpc_iomgr_closure *on_complete) { + grpc_client_config **target_config, + grpc_iomgr_closure *on_complete) { unix_resolver *r = (unix_resolver *)resolver; gpr_mu_lock(&r->mu); GPR_ASSERT(!r->next_completion); @@ -136,9 +135,10 @@ static void unix_maybe_finish_next_locked(unix_resolver *r) { if (r->next_completion != NULL && !r->published) { cfg = grpc_client_config_create(); memset(&args, 0, sizeof(args)); - args.addr = (struct sockaddr *) &r->addr; + args.addr = (struct sockaddr *)&r->addr; args.addr_len = r->addr_len; - subchannel = grpc_subchannel_factory_create_subchannel(r->subchannel_factory, &args); + subchannel = + grpc_subchannel_factory_create_subchannel(r->subchannel_factory, &args); lb_policy = r->lb_policy_factory(&subchannel, 1); grpc_client_config_set_lb_policy(cfg, lb_policy); GRPC_LB_POLICY_UNREF(lb_policy, "unix"); @@ -194,8 +194,7 @@ static void unix_factory_unref(grpc_resolver_factory *factory) {} static grpc_resolver *unix_factory_create_resolver( grpc_resolver_factory *factory, grpc_uri *uri, grpc_subchannel_factory *subchannel_factory) { - return unix_create(uri, grpc_create_pick_first_lb_policy, - subchannel_factory); + return unix_create(uri, grpc_create_pick_first_lb_policy, subchannel_factory); } static const grpc_resolver_factory_vtable unix_factory_vtable = { diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 2f5843b2a4d..c770cb3b20f 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -42,10 +42,10 @@ #include "src/core/transport/connectivity_state.h" typedef struct { - /* all fields protected by subchannel->mu */ - /** refcount */ - int refs; - /** parent subchannel */ + /* all fields protected by subchannel->mu */ + /** refcount */ + int refs; + /** parent subchannel */ grpc_subchannel *subchannel; } connection; @@ -103,7 +103,8 @@ struct grpc_subchannel_call { #define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1)) #define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack *)((con) + 1)) -static grpc_subchannel_call *create_call(connection *con, grpc_transport_stream_op *initial_op); +static grpc_subchannel_call *create_call(connection *con, + grpc_transport_stream_op *initial_op); static void connectivity_state_changed_locked(grpc_subchannel *c); static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c); static gpr_timespec compute_connect_deadline(grpc_subchannel *c); @@ -112,7 +113,8 @@ static void subchannel_connected(void *subchannel, int iomgr_success); static void subchannel_ref_locked(grpc_subchannel *c); static int subchannel_unref_locked(grpc_subchannel *c) GRPC_MUST_USE_RESULT; static void connection_ref_locked(connection *c); -static grpc_subchannel *connection_unref_locked(connection *c) GRPC_MUST_USE_RESULT; +static grpc_subchannel *connection_unref_locked(connection *c) + GRPC_MUST_USE_RESULT; static void subchannel_destroy(grpc_subchannel *c); /* @@ -120,58 +122,55 @@ static void subchannel_destroy(grpc_subchannel *c); */ static void connection_destroy(connection *c) { - GPR_ASSERT(c->refs == 0); - grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CONNECTION(c)); + GPR_ASSERT(c->refs == 0); + grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CONNECTION(c)); gpr_free(c); } -static void connection_ref_locked(connection *c) { - subchannel_ref_locked(c->subchannel); - ++c->refs; +static void connection_ref_locked(connection *c) { + subchannel_ref_locked(c->subchannel); + ++c->refs; } static grpc_subchannel *connection_unref_locked(connection *c) { - grpc_subchannel *destroy = NULL; - if (subchannel_unref_locked(c->subchannel)) { - destroy = c->subchannel; - } + grpc_subchannel *destroy = NULL; + if (subchannel_unref_locked(c->subchannel)) { + destroy = c->subchannel; + } if (--c->refs == 0 && c->subchannel->active != c) { - connection_destroy(c); + connection_destroy(c); } return destroy; } - /* * grpc_subchannel implementation */ -static void subchannel_ref_locked(grpc_subchannel *c) { - ++c->refs; -} +static void subchannel_ref_locked(grpc_subchannel *c) { ++c->refs; } static int subchannel_unref_locked(grpc_subchannel *c) { - return --c->refs == 0; + return --c->refs == 0; } void grpc_subchannel_ref(grpc_subchannel *c) { - gpr_mu_lock(&c->mu); - subchannel_ref_locked(c); - gpr_mu_unlock(&c->mu); + gpr_mu_lock(&c->mu); + subchannel_ref_locked(c); + gpr_mu_unlock(&c->mu); } void grpc_subchannel_unref(grpc_subchannel *c) { - int destroy; - gpr_mu_lock(&c->mu); - destroy = subchannel_unref_locked(c); - gpr_mu_unlock(&c->mu); - if (destroy) subchannel_destroy(c); + int destroy; + gpr_mu_lock(&c->mu); + destroy = subchannel_unref_locked(c); + gpr_mu_unlock(&c->mu); + if (destroy) subchannel_destroy(c); } static void subchannel_destroy(grpc_subchannel *c) { - if (c->active != NULL) { - connection_destroy(c->active); - } + if (c->active != NULL) { + connection_destroy(c->active); + } gpr_free(c->filters); grpc_channel_args_destroy(c->args); gpr_free(c->addr); @@ -216,16 +215,17 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, } static void start_connect(grpc_subchannel *c) { - grpc_connect_in_args args; + grpc_connect_in_args args; - args.interested_parties = &c->pollset_set; - args.addr = c->addr; - args.addr_len = c->addr_len; - args.deadline = compute_connect_deadline(c); - args.channel_args = c->args; - args.metadata_context = c->mdctx; + args.interested_parties = &c->pollset_set; + args.addr = c->addr; + args.addr_len = c->addr_len; + args.deadline = compute_connect_deadline(c); + args.channel_args = c->args; + args.metadata_context = c->mdctx; - grpc_connector_connect(c->connector, &args, &c->connecting_result, &c->connected); + grpc_connector_connect(c->connector, &args, &c->connecting_result, + &c->connected); } void grpc_subchannel_create_call(grpc_subchannel *c, @@ -275,78 +275,82 @@ void grpc_subchannel_notify_on_state_change(grpc_subchannel *c, grpc_iomgr_closure *notify) { int do_connect = 0; gpr_mu_lock(&c->mu); - if (grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state, notify)) { - do_connect = 1; + if (grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state, + notify)) { + do_connect = 1; c->connecting = 1; subchannel_ref_locked(c); - grpc_connectivity_state_set(&c->state_tracker, compute_connectivity_locked(c)); + grpc_connectivity_state_set(&c->state_tracker, + compute_connectivity_locked(c)); } gpr_mu_unlock(&c->mu); if (do_connect) { - start_connect(c); + start_connect(c); } } -void grpc_subchannel_process_transport_op(grpc_subchannel *c, grpc_transport_op *op) { - abort(); +void grpc_subchannel_process_transport_op(grpc_subchannel *c, + grpc_transport_op *op) { + abort(); } static void publish_transport(grpc_subchannel *c) { - size_t channel_stack_size; - connection *con; - grpc_channel_stack *stk; - size_t num_filters; - const grpc_channel_filter **filters; - waiting_for_connect *w4c; - int destroy; - - num_filters = c->num_filters + c->connecting_result.num_filters + 1; - filters = gpr_malloc(sizeof(*filters) * num_filters); - memcpy(filters, c->filters, sizeof(*filters) * c->num_filters); - memcpy(filters + c->num_filters, c->connecting_result.filters, sizeof(*filters) * c->connecting_result.num_filters); - filters[num_filters - 1] = &grpc_connected_channel_filter; - - channel_stack_size = grpc_channel_stack_size(filters, num_filters); - con = gpr_malloc(sizeof(connection) + channel_stack_size); - stk = (grpc_channel_stack *)(con + 1); - - con->refs = 0; - con->subchannel = c; - grpc_channel_stack_init(filters, num_filters, c->args, c->mdctx, stk); - grpc_connected_channel_bind_transport(stk, c->connecting_result.transport); - memset(&c->connecting_result, 0, sizeof(c->connecting_result)); - - gpr_mu_lock(&c->mu); - GPR_ASSERT(c->active == NULL); - c->active = con; - c->connecting = 0; - connectivity_state_changed_locked(c); - while ((w4c = c->waiting)) { - abort(); /* not implemented */ - } + size_t channel_stack_size; + connection *con; + grpc_channel_stack *stk; + size_t num_filters; + const grpc_channel_filter **filters; + waiting_for_connect *w4c; + int destroy; + + num_filters = c->num_filters + c->connecting_result.num_filters + 1; + filters = gpr_malloc(sizeof(*filters) * num_filters); + memcpy(filters, c->filters, sizeof(*filters) * c->num_filters); + memcpy(filters + c->num_filters, c->connecting_result.filters, + sizeof(*filters) * c->connecting_result.num_filters); + filters[num_filters - 1] = &grpc_connected_channel_filter; + + channel_stack_size = grpc_channel_stack_size(filters, num_filters); + con = gpr_malloc(sizeof(connection) + channel_stack_size); + stk = (grpc_channel_stack *)(con + 1); + + con->refs = 0; + con->subchannel = c; + grpc_channel_stack_init(filters, num_filters, c->args, c->mdctx, stk); + grpc_connected_channel_bind_transport(stk, c->connecting_result.transport); + memset(&c->connecting_result, 0, sizeof(c->connecting_result)); + + gpr_mu_lock(&c->mu); + GPR_ASSERT(c->active == NULL); + c->active = con; + c->connecting = 0; + connectivity_state_changed_locked(c); + while ((w4c = c->waiting)) { + abort(); /* not implemented */ + } destroy = subchannel_unref_locked(c); - gpr_mu_unlock(&c->mu); + gpr_mu_unlock(&c->mu); - gpr_free(filters); + gpr_free(filters); - if (destroy) { - subchannel_destroy(c); - } -} + if (destroy) { + subchannel_destroy(c); + } +} static void subchannel_connected(void *arg, int iomgr_success) { - grpc_subchannel *c = arg; - if (c->connecting_result.transport) { - publish_transport(c); - } else { - int destroy; - gpr_mu_lock(&c->mu); - destroy = subchannel_unref_locked(c); - gpr_mu_unlock(&c->mu); - if (destroy) subchannel_destroy(c); - /* TODO(ctiller): retry after sleeping */ - abort(); - } + grpc_subchannel *c = arg; + if (c->connecting_result.transport) { + publish_transport(c); + } else { + int destroy; + gpr_mu_lock(&c->mu); + destroy = subchannel_unref_locked(c); + gpr_mu_unlock(&c->mu); + if (destroy) subchannel_destroy(c); + /* TODO(ctiller): retry after sleeping */ + abort(); + } } static gpr_timespec compute_connect_deadline(grpc_subchannel *c) { @@ -372,21 +376,19 @@ static void connectivity_state_changed_locked(grpc_subchannel *c) { * grpc_subchannel_call implementation */ -void grpc_subchannel_call_ref(grpc_subchannel_call *c) { - gpr_ref(&c->refs); -} +void grpc_subchannel_call_ref(grpc_subchannel_call *c) { gpr_ref(&c->refs); } void grpc_subchannel_call_unref(grpc_subchannel_call *c) { if (gpr_unref(&c->refs)) { - gpr_mu *mu = &c->connection->subchannel->mu; - grpc_subchannel *destroy; + gpr_mu *mu = &c->connection->subchannel->mu; + grpc_subchannel *destroy; grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c)); gpr_mu_lock(mu); destroy = connection_unref_locked(c->connection); gpr_mu_unlock(mu); gpr_free(c); if (destroy) { - subchannel_destroy(destroy); + subchannel_destroy(destroy); } } } @@ -398,9 +400,11 @@ void grpc_subchannel_call_process_op(grpc_subchannel_call *call, top_elem->filter->start_transport_stream_op(top_elem, op); } -grpc_subchannel_call *create_call(connection *con, grpc_transport_stream_op *initial_op) { - grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con); - grpc_subchannel_call *call = gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size); +grpc_subchannel_call *create_call(connection *con, + grpc_transport_stream_op *initial_op) { + grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con); + grpc_subchannel_call *call = + gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size); grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(call); call->connection = con; gpr_ref_init(&call->refs, 1); diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index 8155aba14cc..b777e51d201 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -55,7 +55,8 @@ void grpc_subchannel_create_call(grpc_subchannel *subchannel, grpc_iomgr_closure *notify); /** process a transport level op */ -void grpc_subchannel_process_transport_op(grpc_subchannel *subchannel, grpc_transport_op *op); +void grpc_subchannel_process_transport_op(grpc_subchannel *subchannel, + grpc_transport_op *op); /** poll the current connectivity state of a channel */ grpc_connectivity_state grpc_subchannel_check_connectivity( @@ -67,8 +68,10 @@ void grpc_subchannel_notify_on_state_change(grpc_subchannel *channel, grpc_connectivity_state *state, grpc_iomgr_closure *notify); -void grpc_subchannel_add_interested_party(grpc_subchannel *channel, grpc_pollset *pollset); -void grpc_subchannel_del_interested_party(grpc_subchannel *channel, grpc_pollset *pollset); +void grpc_subchannel_add_interested_party(grpc_subchannel *channel, + grpc_pollset *pollset); +void grpc_subchannel_del_interested_party(grpc_subchannel *channel, + grpc_pollset *pollset); /** continue processing a transport op */ void grpc_subchannel_call_process_op(grpc_subchannel_call *subchannel_call,