clang-format

pull/2303/head
Craig Tiller 10 years ago
parent 08a1cf8f4f
commit 4ab82d2c4d
  1. 9
      src/core/client_config/connector.c
  2. 12
      src/core/client_config/connector.h
  3. 52
      src/core/client_config/lb_policies/pick_first.c
  4. 31
      src/core/client_config/lb_policy.c
  5. 19
      src/core/client_config/lb_policy.h
  6. 21
      src/core/client_config/resolvers/unix_resolver_posix.c
  7. 214
      src/core/client_config/subchannel.c
  8. 9
      src/core/client_config/subchannel.h

@ -41,10 +41,9 @@ void grpc_connector_unref(grpc_connector *connector) {
connector->vtable->unref(connector); connector->vtable->unref(connector);
} }
void grpc_connector_connect( void grpc_connector_connect(grpc_connector *connector,
grpc_connector *connector, const grpc_connect_in_args *in_args,
const grpc_connect_in_args *in_args, grpc_connect_out_args *out_args,
grpc_connect_out_args *out_args, grpc_iomgr_closure *notify) {
grpc_iomgr_closure *notify) {
connector->vtable->connect(connector, in_args, out_args, notify); connector->vtable->connect(connector, in_args, out_args, notify);
} }

@ -72,16 +72,14 @@ struct grpc_connector_vtable {
void (*unref)(grpc_connector *connector); void (*unref)(grpc_connector *connector);
void (*connect)(grpc_connector *connector, void (*connect)(grpc_connector *connector,
const grpc_connect_in_args *in_args, const grpc_connect_in_args *in_args,
grpc_connect_out_args *out_args, grpc_connect_out_args *out_args, grpc_iomgr_closure *notify);
grpc_iomgr_closure *notify);
}; };
void grpc_connector_ref(grpc_connector *connector); void grpc_connector_ref(grpc_connector *connector);
void grpc_connector_unref(grpc_connector *connector); void grpc_connector_unref(grpc_connector *connector);
void grpc_connector_connect( void grpc_connector_connect(grpc_connector *connector,
grpc_connector *connector, const grpc_connect_in_args *in_args,
const grpc_connect_in_args *in_args, grpc_connect_out_args *out_args,
grpc_connect_out_args *out_args, grpc_iomgr_closure *notify);
grpc_iomgr_closure *notify);
#endif #endif

@ -74,7 +74,7 @@ typedef struct {
} pick_first_lb_policy; } pick_first_lb_policy;
void pf_destroy(grpc_lb_policy *pol) { void pf_destroy(grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy*)pol; pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
size_t i; size_t i;
for (i = 0; i < p->num_subchannels; i++) { for (i = 0; i < p->num_subchannels; i++) {
grpc_subchannel_unref(p->subchannels[i]); grpc_subchannel_unref(p->subchannels[i]);
@ -92,7 +92,7 @@ void pf_shutdown(grpc_lb_policy *pol) {
void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset, void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset,
grpc_metadata_batch *initial_metadata, grpc_subchannel **target, grpc_metadata_batch *initial_metadata, grpc_subchannel **target,
grpc_iomgr_closure *on_complete) { grpc_iomgr_closure *on_complete) {
pick_first_lb_policy *p = (pick_first_lb_policy*)pol; pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp; pending_pick *pp;
gpr_mu_lock(&p->mu); gpr_mu_lock(&p->mu);
if (p->selected) { if (p->selected) {
@ -105,9 +105,12 @@ void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset,
p->checking_subchannel = 0; p->checking_subchannel = 0;
p->checking_connectivity = GRPC_CHANNEL_IDLE; p->checking_connectivity = GRPC_CHANNEL_IDLE;
GRPC_LB_POLICY_REF(pol, "pick_first_connectivity"); GRPC_LB_POLICY_REF(pol, "pick_first_connectivity");
grpc_subchannel_notify_on_state_change(p->subchannels[p->checking_subchannel], &p->checking_connectivity, &p->connectivity_changed); grpc_subchannel_notify_on_state_change(
p->subchannels[p->checking_subchannel], &p->checking_connectivity,
&p->connectivity_changed);
} }
grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel], pollset); grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel],
pollset);
pp = gpr_malloc(sizeof(*pp)); pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks; pp->next = p->pending_picks;
pp->pollset = pollset; pp->pollset = pollset;
@ -121,14 +124,16 @@ void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset,
static void del_interested_parties_locked(pick_first_lb_policy *p) { static void del_interested_parties_locked(pick_first_lb_policy *p) {
pending_pick *pp; pending_pick *pp;
for (pp = p->pending_picks; pp; pp = pp->next) { for (pp = p->pending_picks; pp; pp = pp->next) {
grpc_subchannel_del_interested_party(p->subchannels[p->checking_subchannel], pp->pollset); grpc_subchannel_del_interested_party(p->subchannels[p->checking_subchannel],
pp->pollset);
} }
} }
static void add_interested_parties_locked(pick_first_lb_policy *p) { static void add_interested_parties_locked(pick_first_lb_policy *p) {
pending_pick *pp; pending_pick *pp;
for (pp = p->pending_picks; pp; pp = pp->next) { for (pp = p->pending_picks; pp; pp = pp->next) {
grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel], pp->pollset); grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel],
pp->pollset);
} }
} }
@ -142,7 +147,8 @@ loop:
switch (p->checking_connectivity) { switch (p->checking_connectivity) {
case GRPC_CHANNEL_READY: case GRPC_CHANNEL_READY:
p->selected = p->subchannels[p->checking_subchannel]; p->selected = p->subchannels[p->checking_subchannel];
GPR_ASSERT(grpc_subchannel_check_connectivity(p->selected) == GRPC_CHANNEL_READY); GPR_ASSERT(grpc_subchannel_check_connectivity(p->selected) ==
GRPC_CHANNEL_READY);
while ((pp = p->pending_picks)) { while ((pp = p->pending_picks)) {
p->pending_picks = pp->next; p->pending_picks = pp->next;
*pp->target = p->selected; *pp->target = p->selected;
@ -154,19 +160,25 @@ loop:
break; break;
case GRPC_CHANNEL_TRANSIENT_FAILURE: case GRPC_CHANNEL_TRANSIENT_FAILURE:
del_interested_parties_locked(p); del_interested_parties_locked(p);
p->checking_subchannel = (p->checking_subchannel + 1) % p->num_subchannels; p->checking_subchannel =
p->checking_connectivity = grpc_subchannel_check_connectivity(p->subchannels[p->checking_subchannel]); (p->checking_subchannel + 1) % p->num_subchannels;
p->checking_connectivity = grpc_subchannel_check_connectivity(
p->subchannels[p->checking_subchannel]);
add_interested_parties_locked(p); add_interested_parties_locked(p);
goto loop; goto loop;
case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_IDLE: case GRPC_CHANNEL_IDLE:
grpc_subchannel_notify_on_state_change(p->subchannels[p->checking_subchannel], &p->checking_connectivity, &p->connectivity_changed); grpc_subchannel_notify_on_state_change(
p->subchannels[p->checking_subchannel], &p->checking_connectivity,
&p->connectivity_changed);
break; break;
case GRPC_CHANNEL_FATAL_FAILURE: case GRPC_CHANNEL_FATAL_FAILURE:
del_interested_parties_locked(p); del_interested_parties_locked(p);
GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel], p->subchannels[p->num_subchannels - 1]); GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel],
p->subchannels[p->num_subchannels - 1]);
p->checking_subchannel %= p->num_subchannels; p->checking_subchannel %= p->num_subchannels;
p->checking_connectivity = grpc_subchannel_check_connectivity(p->subchannels[p->checking_subchannel]); p->checking_connectivity = grpc_subchannel_check_connectivity(
p->subchannels[p->checking_subchannel]);
p->num_subchannels--; p->num_subchannels--;
grpc_subchannel_unref(p->subchannels[p->num_subchannels]); grpc_subchannel_unref(p->subchannels[p->num_subchannels]);
add_interested_parties_locked(p); add_interested_parties_locked(p);
@ -184,7 +196,7 @@ loop:
} }
static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op) { static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op) {
pick_first_lb_policy *p = (pick_first_lb_policy*)pol; pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
size_t i; size_t i;
size_t n; size_t n;
grpc_subchannel **subchannels; grpc_subchannel **subchannels;
@ -206,7 +218,7 @@ static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op) {
} }
static grpc_connectivity_state pf_check_connectivity(grpc_lb_policy *pol) { static grpc_connectivity_state pf_check_connectivity(grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy*)pol; pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
grpc_connectivity_state st; grpc_connectivity_state st;
gpr_mu_lock(&p->mu); gpr_mu_lock(&p->mu);
st = grpc_connectivity_state_check(&p->state_tracker); st = grpc_connectivity_state_check(&p->state_tracker);
@ -214,15 +226,19 @@ static grpc_connectivity_state pf_check_connectivity(grpc_lb_policy *pol) {
return st; return st;
} }
static void pf_notify_on_state_change(grpc_lb_policy *pol, grpc_connectivity_state *current, grpc_iomgr_closure *notify) { static void pf_notify_on_state_change(grpc_lb_policy *pol,
pick_first_lb_policy *p = (pick_first_lb_policy*)pol; grpc_connectivity_state *current,
grpc_iomgr_closure *notify) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
gpr_mu_lock(&p->mu); gpr_mu_lock(&p->mu);
grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current, notify); grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current,
notify);
gpr_mu_unlock(&p->mu); gpr_mu_unlock(&p->mu);
} }
static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
pf_destroy, pf_shutdown, pf_pick, pf_broadcast, pf_check_connectivity, pf_notify_on_state_change}; pf_destroy, pf_shutdown, pf_pick,
pf_broadcast, pf_check_connectivity, pf_notify_on_state_change};
grpc_lb_policy *grpc_create_pick_first_lb_policy(grpc_subchannel **subchannels, grpc_lb_policy *grpc_create_pick_first_lb_policy(grpc_subchannel **subchannels,
size_t num_subchannels) { size_t num_subchannels) {

@ -33,33 +33,34 @@
#include "src/core/client_config/lb_policy.h" #include "src/core/client_config/lb_policy.h"
void grpc_lb_policy_init(grpc_lb_policy *policy, const grpc_lb_policy_vtable *vtable) { void grpc_lb_policy_init(grpc_lb_policy *policy,
policy->vtable = vtable; const grpc_lb_policy_vtable *vtable) {
gpr_ref_init(&policy->refs, 1); policy->vtable = vtable;
gpr_ref_init(&policy->refs, 1);
} }
#ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG
void grpc_lb_policy_ref(grpc_lb_policy *policy, const char *file, int line, const char *reason) { void grpc_lb_policy_ref(grpc_lb_policy *policy, const char *file, int line,
const char *reason) {
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "LB_POLICY:%p ref %d -> %d %s", gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "LB_POLICY:%p ref %d -> %d %s",
policy, (int)policy->refs.count, (int)policy->refs.count + 1, policy, (int)policy->refs.count, (int)policy->refs.count + 1, reason);
reason);
#else #else
void grpc_lb_policy_ref(grpc_lb_policy *policy) { void grpc_lb_policy_ref(grpc_lb_policy *policy) {
#endif #endif
gpr_ref(&policy->refs); gpr_ref(&policy->refs);
} }
#ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG
void grpc_lb_policy_unref(grpc_lb_policy *policy, const char *file, int line, const char *reason) { void grpc_lb_policy_unref(grpc_lb_policy *policy, const char *file, int line,
const char *reason) {
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "LB_POLICY:%p unref %d -> %d %s", gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "LB_POLICY:%p unref %d -> %d %s",
policy, (int)policy->refs.count, (int)policy->refs.count - 1, policy, (int)policy->refs.count, (int)policy->refs.count - 1, reason);
reason);
#else #else
void grpc_lb_policy_unref(grpc_lb_policy *policy) { void grpc_lb_policy_unref(grpc_lb_policy *policy) {
#endif #endif
if (gpr_unref(&policy->refs)) { if (gpr_unref(&policy->refs)) {
policy->vtable->destroy(policy); policy->vtable->destroy(policy);
} }
} }
void grpc_lb_policy_shutdown(grpc_lb_policy *policy) { void grpc_lb_policy_shutdown(grpc_lb_policy *policy) {
@ -74,5 +75,5 @@ void grpc_lb_policy_pick(grpc_lb_policy *policy, grpc_pollset *pollset,
} }
void grpc_lb_policy_broadcast(grpc_lb_policy *policy, grpc_transport_op *op) { void grpc_lb_policy_broadcast(grpc_lb_policy *policy, grpc_transport_op *op) {
policy->vtable->broadcast(policy, op); policy->vtable->broadcast(policy, op);
} }

@ -67,14 +67,20 @@ struct grpc_lb_policy_vtable {
/** call notify when the connectivity state of a channel changes from *state. /** call notify when the connectivity state of a channel changes from *state.
Updates *state with the new state of the policy */ Updates *state with the new state of the policy */
void (*notify_on_state_change)(grpc_lb_policy *policy, grpc_connectivity_state *state, grpc_iomgr_closure *closure); void (*notify_on_state_change)(grpc_lb_policy *policy,
grpc_connectivity_state *state,
grpc_iomgr_closure *closure);
}; };
#ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG
#define GRPC_LB_POLICY_REF(p, r) grpc_lb_policy_ref((p), __FILE__, __LINE__, (r)) #define GRPC_LB_POLICY_REF(p, r) \
#define GRPC_LB_POLICY_UNREF(p, r) grpc_lb_policy_unref((p), __FILE__, __LINE__, (r)) grpc_lb_policy_ref((p), __FILE__, __LINE__, (r))
void grpc_lb_policy_ref(grpc_lb_policy *policy, const char *file, int line, const char *reason); #define GRPC_LB_POLICY_UNREF(p, r) \
void grpc_lb_policy_unref(grpc_lb_policy *policy, const char *file, int line, const char *reason); grpc_lb_policy_unref((p), __FILE__, __LINE__, (r))
void grpc_lb_policy_ref(grpc_lb_policy *policy, const char *file, int line,
const char *reason);
void grpc_lb_policy_unref(grpc_lb_policy *policy, const char *file, int line,
const char *reason);
#else #else
#define GRPC_LB_POLICY_REF(p, r) grpc_lb_policy_ref((p)) #define GRPC_LB_POLICY_REF(p, r) grpc_lb_policy_ref((p))
#define GRPC_LB_POLICY_UNREF(p, r) grpc_lb_policy_unref((p)) #define GRPC_LB_POLICY_UNREF(p, r) grpc_lb_policy_unref((p))
@ -83,7 +89,8 @@ void grpc_lb_policy_unref(grpc_lb_policy *policy);
#endif #endif
/** called by concrete implementations to initialize the base struct */ /** called by concrete implementations to initialize the base struct */
void grpc_lb_policy_init(grpc_lb_policy *policy, const grpc_lb_policy_vtable *vtable); void grpc_lb_policy_init(grpc_lb_policy *policy,
const grpc_lb_policy_vtable *vtable);
/** Start shutting down (fail any pending picks) */ /** Start shutting down (fail any pending picks) */
void grpc_lb_policy_shutdown(grpc_lb_policy *policy); void grpc_lb_policy_shutdown(grpc_lb_policy *policy);

@ -79,10 +79,10 @@ static void unix_ref(grpc_resolver *r);
static void unix_unref(grpc_resolver *r); static void unix_unref(grpc_resolver *r);
static void unix_shutdown(grpc_resolver *r); static void unix_shutdown(grpc_resolver *r);
static void unix_channel_saw_error(grpc_resolver *r, static void unix_channel_saw_error(grpc_resolver *r,
struct sockaddr *failing_address, struct sockaddr *failing_address,
int failing_address_len); int failing_address_len);
static void unix_next(grpc_resolver *r, grpc_client_config **target_config, static void unix_next(grpc_resolver *r, grpc_client_config **target_config,
grpc_iomgr_closure *on_complete); grpc_iomgr_closure *on_complete);
static const grpc_resolver_vtable unix_resolver_vtable = { static const grpc_resolver_vtable unix_resolver_vtable = {
unix_ref, unix_unref, unix_shutdown, unix_channel_saw_error, unix_next}; unix_ref, unix_unref, unix_shutdown, unix_channel_saw_error, unix_next};
@ -112,12 +112,11 @@ static void unix_shutdown(grpc_resolver *resolver) {
} }
static void unix_channel_saw_error(grpc_resolver *resolver, struct sockaddr *sa, static void unix_channel_saw_error(grpc_resolver *resolver, struct sockaddr *sa,
int len) { int len) {}
}
static void unix_next(grpc_resolver *resolver, static void unix_next(grpc_resolver *resolver,
grpc_client_config **target_config, grpc_client_config **target_config,
grpc_iomgr_closure *on_complete) { grpc_iomgr_closure *on_complete) {
unix_resolver *r = (unix_resolver *)resolver; unix_resolver *r = (unix_resolver *)resolver;
gpr_mu_lock(&r->mu); gpr_mu_lock(&r->mu);
GPR_ASSERT(!r->next_completion); GPR_ASSERT(!r->next_completion);
@ -136,9 +135,10 @@ static void unix_maybe_finish_next_locked(unix_resolver *r) {
if (r->next_completion != NULL && !r->published) { if (r->next_completion != NULL && !r->published) {
cfg = grpc_client_config_create(); cfg = grpc_client_config_create();
memset(&args, 0, sizeof(args)); memset(&args, 0, sizeof(args));
args.addr = (struct sockaddr *) &r->addr; args.addr = (struct sockaddr *)&r->addr;
args.addr_len = r->addr_len; args.addr_len = r->addr_len;
subchannel = grpc_subchannel_factory_create_subchannel(r->subchannel_factory, &args); subchannel =
grpc_subchannel_factory_create_subchannel(r->subchannel_factory, &args);
lb_policy = r->lb_policy_factory(&subchannel, 1); lb_policy = r->lb_policy_factory(&subchannel, 1);
grpc_client_config_set_lb_policy(cfg, lb_policy); grpc_client_config_set_lb_policy(cfg, lb_policy);
GRPC_LB_POLICY_UNREF(lb_policy, "unix"); GRPC_LB_POLICY_UNREF(lb_policy, "unix");
@ -194,8 +194,7 @@ static void unix_factory_unref(grpc_resolver_factory *factory) {}
static grpc_resolver *unix_factory_create_resolver( static grpc_resolver *unix_factory_create_resolver(
grpc_resolver_factory *factory, grpc_uri *uri, grpc_resolver_factory *factory, grpc_uri *uri,
grpc_subchannel_factory *subchannel_factory) { grpc_subchannel_factory *subchannel_factory) {
return unix_create(uri, grpc_create_pick_first_lb_policy, return unix_create(uri, grpc_create_pick_first_lb_policy, subchannel_factory);
subchannel_factory);
} }
static const grpc_resolver_factory_vtable unix_factory_vtable = { static const grpc_resolver_factory_vtable unix_factory_vtable = {

@ -42,10 +42,10 @@
#include "src/core/transport/connectivity_state.h" #include "src/core/transport/connectivity_state.h"
typedef struct { typedef struct {
/* all fields protected by subchannel->mu */ /* all fields protected by subchannel->mu */
/** refcount */ /** refcount */
int refs; int refs;
/** parent subchannel */ /** parent subchannel */
grpc_subchannel *subchannel; grpc_subchannel *subchannel;
} connection; } connection;
@ -103,7 +103,8 @@ struct grpc_subchannel_call {
#define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1)) #define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1))
#define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack *)((con) + 1)) #define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack *)((con) + 1))
static grpc_subchannel_call *create_call(connection *con, grpc_transport_stream_op *initial_op); static grpc_subchannel_call *create_call(connection *con,
grpc_transport_stream_op *initial_op);
static void connectivity_state_changed_locked(grpc_subchannel *c); static void connectivity_state_changed_locked(grpc_subchannel *c);
static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c); static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c);
static gpr_timespec compute_connect_deadline(grpc_subchannel *c); static gpr_timespec compute_connect_deadline(grpc_subchannel *c);
@ -112,7 +113,8 @@ static void subchannel_connected(void *subchannel, int iomgr_success);
static void subchannel_ref_locked(grpc_subchannel *c); static void subchannel_ref_locked(grpc_subchannel *c);
static int subchannel_unref_locked(grpc_subchannel *c) GRPC_MUST_USE_RESULT; static int subchannel_unref_locked(grpc_subchannel *c) GRPC_MUST_USE_RESULT;
static void connection_ref_locked(connection *c); static void connection_ref_locked(connection *c);
static grpc_subchannel *connection_unref_locked(connection *c) GRPC_MUST_USE_RESULT; static grpc_subchannel *connection_unref_locked(connection *c)
GRPC_MUST_USE_RESULT;
static void subchannel_destroy(grpc_subchannel *c); static void subchannel_destroy(grpc_subchannel *c);
/* /*
@ -120,58 +122,55 @@ static void subchannel_destroy(grpc_subchannel *c);
*/ */
static void connection_destroy(connection *c) { static void connection_destroy(connection *c) {
GPR_ASSERT(c->refs == 0); GPR_ASSERT(c->refs == 0);
grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CONNECTION(c)); grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CONNECTION(c));
gpr_free(c); gpr_free(c);
} }
static void connection_ref_locked(connection *c) { static void connection_ref_locked(connection *c) {
subchannel_ref_locked(c->subchannel); subchannel_ref_locked(c->subchannel);
++c->refs; ++c->refs;
} }
static grpc_subchannel *connection_unref_locked(connection *c) { static grpc_subchannel *connection_unref_locked(connection *c) {
grpc_subchannel *destroy = NULL; grpc_subchannel *destroy = NULL;
if (subchannel_unref_locked(c->subchannel)) { if (subchannel_unref_locked(c->subchannel)) {
destroy = c->subchannel; destroy = c->subchannel;
} }
if (--c->refs == 0 && c->subchannel->active != c) { if (--c->refs == 0 && c->subchannel->active != c) {
connection_destroy(c); connection_destroy(c);
} }
return destroy; return destroy;
} }
/* /*
* grpc_subchannel implementation * grpc_subchannel implementation
*/ */
static void subchannel_ref_locked(grpc_subchannel *c) { static void subchannel_ref_locked(grpc_subchannel *c) { ++c->refs; }
++c->refs;
}
static int subchannel_unref_locked(grpc_subchannel *c) { static int subchannel_unref_locked(grpc_subchannel *c) {
return --c->refs == 0; return --c->refs == 0;
} }
void grpc_subchannel_ref(grpc_subchannel *c) { void grpc_subchannel_ref(grpc_subchannel *c) {
gpr_mu_lock(&c->mu); gpr_mu_lock(&c->mu);
subchannel_ref_locked(c); subchannel_ref_locked(c);
gpr_mu_unlock(&c->mu); gpr_mu_unlock(&c->mu);
} }
void grpc_subchannel_unref(grpc_subchannel *c) { void grpc_subchannel_unref(grpc_subchannel *c) {
int destroy; int destroy;
gpr_mu_lock(&c->mu); gpr_mu_lock(&c->mu);
destroy = subchannel_unref_locked(c); destroy = subchannel_unref_locked(c);
gpr_mu_unlock(&c->mu); gpr_mu_unlock(&c->mu);
if (destroy) subchannel_destroy(c); if (destroy) subchannel_destroy(c);
} }
static void subchannel_destroy(grpc_subchannel *c) { static void subchannel_destroy(grpc_subchannel *c) {
if (c->active != NULL) { if (c->active != NULL) {
connection_destroy(c->active); connection_destroy(c->active);
} }
gpr_free(c->filters); gpr_free(c->filters);
grpc_channel_args_destroy(c->args); grpc_channel_args_destroy(c->args);
gpr_free(c->addr); gpr_free(c->addr);
@ -216,16 +215,17 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
} }
static void start_connect(grpc_subchannel *c) { static void start_connect(grpc_subchannel *c) {
grpc_connect_in_args args; grpc_connect_in_args args;
args.interested_parties = &c->pollset_set; args.interested_parties = &c->pollset_set;
args.addr = c->addr; args.addr = c->addr;
args.addr_len = c->addr_len; args.addr_len = c->addr_len;
args.deadline = compute_connect_deadline(c); args.deadline = compute_connect_deadline(c);
args.channel_args = c->args; args.channel_args = c->args;
args.metadata_context = c->mdctx; args.metadata_context = c->mdctx;
grpc_connector_connect(c->connector, &args, &c->connecting_result, &c->connected); grpc_connector_connect(c->connector, &args, &c->connecting_result,
&c->connected);
} }
void grpc_subchannel_create_call(grpc_subchannel *c, void grpc_subchannel_create_call(grpc_subchannel *c,
@ -275,78 +275,82 @@ void grpc_subchannel_notify_on_state_change(grpc_subchannel *c,
grpc_iomgr_closure *notify) { grpc_iomgr_closure *notify) {
int do_connect = 0; int do_connect = 0;
gpr_mu_lock(&c->mu); gpr_mu_lock(&c->mu);
if (grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state, notify)) { if (grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state,
do_connect = 1; notify)) {
do_connect = 1;
c->connecting = 1; c->connecting = 1;
subchannel_ref_locked(c); subchannel_ref_locked(c);
grpc_connectivity_state_set(&c->state_tracker, compute_connectivity_locked(c)); grpc_connectivity_state_set(&c->state_tracker,
compute_connectivity_locked(c));
} }
gpr_mu_unlock(&c->mu); gpr_mu_unlock(&c->mu);
if (do_connect) { if (do_connect) {
start_connect(c); start_connect(c);
} }
} }
void grpc_subchannel_process_transport_op(grpc_subchannel *c, grpc_transport_op *op) { void grpc_subchannel_process_transport_op(grpc_subchannel *c,
abort(); grpc_transport_op *op) {
abort();
} }
static void publish_transport(grpc_subchannel *c) { static void publish_transport(grpc_subchannel *c) {
size_t channel_stack_size; size_t channel_stack_size;
connection *con; connection *con;
grpc_channel_stack *stk; grpc_channel_stack *stk;
size_t num_filters; size_t num_filters;
const grpc_channel_filter **filters; const grpc_channel_filter **filters;
waiting_for_connect *w4c; waiting_for_connect *w4c;
int destroy; int destroy;
num_filters = c->num_filters + c->connecting_result.num_filters + 1; num_filters = c->num_filters + c->connecting_result.num_filters + 1;
filters = gpr_malloc(sizeof(*filters) * num_filters); filters = gpr_malloc(sizeof(*filters) * num_filters);
memcpy(filters, c->filters, sizeof(*filters) * c->num_filters); memcpy(filters, c->filters, sizeof(*filters) * c->num_filters);
memcpy(filters + c->num_filters, c->connecting_result.filters, sizeof(*filters) * c->connecting_result.num_filters); memcpy(filters + c->num_filters, c->connecting_result.filters,
filters[num_filters - 1] = &grpc_connected_channel_filter; sizeof(*filters) * c->connecting_result.num_filters);
filters[num_filters - 1] = &grpc_connected_channel_filter;
channel_stack_size = grpc_channel_stack_size(filters, num_filters);
con = gpr_malloc(sizeof(connection) + channel_stack_size); channel_stack_size = grpc_channel_stack_size(filters, num_filters);
stk = (grpc_channel_stack *)(con + 1); con = gpr_malloc(sizeof(connection) + channel_stack_size);
stk = (grpc_channel_stack *)(con + 1);
con->refs = 0;
con->subchannel = c; con->refs = 0;
grpc_channel_stack_init(filters, num_filters, c->args, c->mdctx, stk); con->subchannel = c;
grpc_connected_channel_bind_transport(stk, c->connecting_result.transport); grpc_channel_stack_init(filters, num_filters, c->args, c->mdctx, stk);
memset(&c->connecting_result, 0, sizeof(c->connecting_result)); grpc_connected_channel_bind_transport(stk, c->connecting_result.transport);
memset(&c->connecting_result, 0, sizeof(c->connecting_result));
gpr_mu_lock(&c->mu);
GPR_ASSERT(c->active == NULL); gpr_mu_lock(&c->mu);
c->active = con; GPR_ASSERT(c->active == NULL);
c->connecting = 0; c->active = con;
connectivity_state_changed_locked(c); c->connecting = 0;
while ((w4c = c->waiting)) { connectivity_state_changed_locked(c);
abort(); /* not implemented */ while ((w4c = c->waiting)) {
} abort(); /* not implemented */
}
destroy = subchannel_unref_locked(c); destroy = subchannel_unref_locked(c);
gpr_mu_unlock(&c->mu); gpr_mu_unlock(&c->mu);
gpr_free(filters); gpr_free(filters);
if (destroy) { if (destroy) {
subchannel_destroy(c); subchannel_destroy(c);
} }
} }
static void subchannel_connected(void *arg, int iomgr_success) { static void subchannel_connected(void *arg, int iomgr_success) {
grpc_subchannel *c = arg; grpc_subchannel *c = arg;
if (c->connecting_result.transport) { if (c->connecting_result.transport) {
publish_transport(c); publish_transport(c);
} else { } else {
int destroy; int destroy;
gpr_mu_lock(&c->mu); gpr_mu_lock(&c->mu);
destroy = subchannel_unref_locked(c); destroy = subchannel_unref_locked(c);
gpr_mu_unlock(&c->mu); gpr_mu_unlock(&c->mu);
if (destroy) subchannel_destroy(c); if (destroy) subchannel_destroy(c);
/* TODO(ctiller): retry after sleeping */ /* TODO(ctiller): retry after sleeping */
abort(); abort();
} }
} }
static gpr_timespec compute_connect_deadline(grpc_subchannel *c) { static gpr_timespec compute_connect_deadline(grpc_subchannel *c) {
@ -372,21 +376,19 @@ static void connectivity_state_changed_locked(grpc_subchannel *c) {
* grpc_subchannel_call implementation * grpc_subchannel_call implementation
*/ */
void grpc_subchannel_call_ref(grpc_subchannel_call *c) { void grpc_subchannel_call_ref(grpc_subchannel_call *c) { gpr_ref(&c->refs); }
gpr_ref(&c->refs);
}
void grpc_subchannel_call_unref(grpc_subchannel_call *c) { void grpc_subchannel_call_unref(grpc_subchannel_call *c) {
if (gpr_unref(&c->refs)) { if (gpr_unref(&c->refs)) {
gpr_mu *mu = &c->connection->subchannel->mu; gpr_mu *mu = &c->connection->subchannel->mu;
grpc_subchannel *destroy; grpc_subchannel *destroy;
grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c)); grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c));
gpr_mu_lock(mu); gpr_mu_lock(mu);
destroy = connection_unref_locked(c->connection); destroy = connection_unref_locked(c->connection);
gpr_mu_unlock(mu); gpr_mu_unlock(mu);
gpr_free(c); gpr_free(c);
if (destroy) { if (destroy) {
subchannel_destroy(destroy); subchannel_destroy(destroy);
} }
} }
} }
@ -398,9 +400,11 @@ void grpc_subchannel_call_process_op(grpc_subchannel_call *call,
top_elem->filter->start_transport_stream_op(top_elem, op); top_elem->filter->start_transport_stream_op(top_elem, op);
} }
grpc_subchannel_call *create_call(connection *con, grpc_transport_stream_op *initial_op) { grpc_subchannel_call *create_call(connection *con,
grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con); grpc_transport_stream_op *initial_op) {
grpc_subchannel_call *call = gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size); grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
grpc_subchannel_call *call =
gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(call); grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(call);
call->connection = con; call->connection = con;
gpr_ref_init(&call->refs, 1); gpr_ref_init(&call->refs, 1);

@ -55,7 +55,8 @@ void grpc_subchannel_create_call(grpc_subchannel *subchannel,
grpc_iomgr_closure *notify); grpc_iomgr_closure *notify);
/** process a transport level op */ /** process a transport level op */
void grpc_subchannel_process_transport_op(grpc_subchannel *subchannel, grpc_transport_op *op); void grpc_subchannel_process_transport_op(grpc_subchannel *subchannel,
grpc_transport_op *op);
/** poll the current connectivity state of a channel */ /** poll the current connectivity state of a channel */
grpc_connectivity_state grpc_subchannel_check_connectivity( grpc_connectivity_state grpc_subchannel_check_connectivity(
@ -67,8 +68,10 @@ void grpc_subchannel_notify_on_state_change(grpc_subchannel *channel,
grpc_connectivity_state *state, grpc_connectivity_state *state,
grpc_iomgr_closure *notify); grpc_iomgr_closure *notify);
void grpc_subchannel_add_interested_party(grpc_subchannel *channel, grpc_pollset *pollset); void grpc_subchannel_add_interested_party(grpc_subchannel *channel,
void grpc_subchannel_del_interested_party(grpc_subchannel *channel, grpc_pollset *pollset); grpc_pollset *pollset);
void grpc_subchannel_del_interested_party(grpc_subchannel *channel,
grpc_pollset *pollset);
/** continue processing a transport op */ /** continue processing a transport op */
void grpc_subchannel_call_process_op(grpc_subchannel_call *subchannel_call, void grpc_subchannel_call_process_op(grpc_subchannel_call *subchannel_call,

Loading…
Cancel
Save