Merge branch 'connected-subchannel' of github.com:ctiller/grpc into connected-subchannel

reviewable/pr4232/r3
Craig Tiller 9 years ago
commit 61efb98eda
  1. 4
      src/core/census/grpc_filter.c
  2. 8
      src/core/channel/client_channel.c
  3. 24
      src/core/channel/client_uchannel.c
  4. 4
      src/core/channel/client_uchannel.h
  5. 3
      src/core/channel/compress_filter.c
  6. 3
      src/core/channel/http_server_filter.c
  7. 19
      src/core/channel/subchannel_call_holder.c
  8. 28
      src/core/client_config/lb_policies/pick_first.c
  9. 10
      src/core/client_config/lb_policies/round_robin.c
  10. 3
      src/core/client_config/lb_policy.c
  11. 3
      src/core/client_config/lb_policy.h
  12. 117
      src/core/client_config/subchannel.c
  13. 43
      src/core/client_config/subchannel.h
  14. 3
      src/core/surface/lame_client.c
  15. 9
      test/core/end2end/fixtures/h2_uchannel.c

@ -60,9 +60,7 @@ typedef struct call_data {
grpc_closure finish_recv;
} call_data;
typedef struct channel_data {
gpr_uint8 unused;
} channel_data;
typedef struct channel_data { gpr_uint8 unused; } channel_data;
static void extract_and_annotate_method_tag(grpc_metadata_batch *md,
call_data *calld,

@ -324,7 +324,8 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
gpr_mu_lock(&chand->mu_config);
if (initial_metadata == NULL) {
if (chand->lb_policy != NULL) {
grpc_lb_policy_cancel_pick(exec_ctx, chand->lb_policy, connected_subchannel);
grpc_lb_policy_cancel_pick(exec_ctx, chand->lb_policy,
connected_subchannel);
}
for (closure = chand->waiting_for_config_closures.head; closure != NULL;
closure = grpc_closure_next(closure)) {
@ -338,8 +339,9 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
return 1;
}
if (chand->lb_policy != NULL) {
int r = grpc_lb_policy_pick(exec_ctx, chand->lb_policy, calld->pollset,
initial_metadata, connected_subchannel, on_ready);
int r =
grpc_lb_policy_pick(exec_ctx, chand->lb_policy, calld->pollset,
initial_metadata, connected_subchannel, on_ready);
gpr_mu_unlock(&chand->mu_config);
return r;
}

@ -84,9 +84,9 @@ static void monitor_subchannel(grpc_exec_ctx *exec_ctx, void *arg,
grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
chand->subchannel_connectivity,
"uchannel_monitor_subchannel");
grpc_connected_subchannel_notify_on_state_change(exec_ctx, chand->connected_subchannel,
&chand->subchannel_connectivity,
&chand->connectivity_cb);
grpc_connected_subchannel_notify_on_state_change(
exec_ctx, chand->connected_subchannel, &chand->subchannel_connectivity,
&chand->connectivity_cb);
}
static char *cuc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
@ -168,8 +168,8 @@ static void cuc_init_channel_elem(grpc_exec_ctx *exec_ctx,
static void cuc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem) {
channel_data *chand = elem->channel_data;
grpc_connected_subchannel_state_change_unsubscribe(exec_ctx, chand->connected_subchannel,
&chand->connectivity_cb);
grpc_connected_subchannel_state_change_unsubscribe(
exec_ctx, chand->connected_subchannel, &chand->connectivity_cb);
grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker);
gpr_mu_destroy(&chand->mu_state);
}
@ -198,9 +198,9 @@ grpc_connectivity_state grpc_client_uchannel_check_connectivity_state(
GRPC_CHANNEL_CONNECTING,
"uchannel_connecting_changed");
chand->subchannel_connectivity = out;
grpc_connected_subchannel_notify_on_state_change(exec_ctx, chand->connected_subchannel,
&chand->subchannel_connectivity,
&chand->connectivity_cb);
grpc_connected_subchannel_notify_on_state_change(
exec_ctx, chand->connected_subchannel, &chand->subchannel_connectivity,
&chand->connectivity_cb);
}
gpr_mu_unlock(&chand->mu_state);
return out;
@ -221,8 +221,8 @@ grpc_pollset_set *grpc_client_uchannel_get_connecting_pollset_set(
channel_data *chand = elem->channel_data;
grpc_channel_element *parent_elem;
gpr_mu_lock(&chand->mu_state);
parent_elem = grpc_channel_stack_last_element(grpc_channel_get_channel_stack(
chand->master));
parent_elem = grpc_channel_stack_last_element(
grpc_channel_get_channel_stack(chand->master));
gpr_mu_unlock(&chand->mu_state);
return grpc_client_channel_get_connecting_pollset_set(parent_elem);
}
@ -267,8 +267,8 @@ grpc_channel *grpc_client_uchannel_create(grpc_subchannel *subchannel,
return channel;
}
void grpc_client_uchannel_set_connected_subchannel(grpc_channel *uchannel,
grpc_connected_subchannel *connected_subchannel) {
void grpc_client_uchannel_set_connected_subchannel(
grpc_channel *uchannel, grpc_connected_subchannel *connected_subchannel) {
grpc_channel_element *elem =
grpc_channel_stack_last_element(grpc_channel_get_channel_stack(uchannel));
channel_data *chand = elem->channel_data;

@ -64,7 +64,7 @@ void grpc_client_uchannel_del_interested_party(grpc_exec_ctx *exec_ctx,
grpc_channel *grpc_client_uchannel_create(grpc_subchannel *subchannel,
grpc_channel_args *args);
void grpc_client_uchannel_set_connected_subchannel(grpc_channel *uchannel,
grpc_connected_subchannel *connected_subchannel);
void grpc_client_uchannel_set_connected_subchannel(
grpc_channel *uchannel, grpc_connected_subchannel *connected_subchannel);
#endif /* GRPC_INTERNAL_CORE_CHANNEL_CLIENT_MICROCHANNEL_H */

@ -288,8 +288,7 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx,
/* Destructor for channel data */
static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem) {
}
grpc_channel_element *elem) {}
const grpc_channel_filter grpc_compress_filter = {
compress_start_transport_stream_op, grpc_channel_next_op, sizeof(call_data),

@ -225,8 +225,7 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx,
/* Destructor for channel data */
static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem) {
}
grpc_channel_element *elem) {}
const grpc_channel_filter grpc_http_server_filter = {
hs_start_transport_op, grpc_channel_next_op, sizeof(call_data),

@ -137,19 +137,23 @@ retry:
}
/* if we don't have a subchannel, try to get one */
if (holder->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING &&
holder->connected_subchannel == NULL && op->send_initial_metadata != NULL) {
holder->connected_subchannel == NULL &&
op->send_initial_metadata != NULL) {
holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL;
grpc_closure_init(&holder->next_step, subchannel_ready, holder);
if (holder->pick_subchannel(exec_ctx, holder->pick_subchannel_arg,
op->send_initial_metadata, &holder->connected_subchannel,
&holder->next_step)) {
if (holder->pick_subchannel(
exec_ctx, holder->pick_subchannel_arg, op->send_initial_metadata,
&holder->connected_subchannel, &holder->next_step)) {
holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
}
}
/* if we've got a subchannel, then let's ask it to create a call */
if (holder->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING &&
holder->connected_subchannel != NULL) {
gpr_atm_rel_store(&holder->subchannel_call, grpc_connected_subchannel_create_call(exec_ctx, holder->connected_subchannel, holder->pollset));
gpr_atm_rel_store(
&holder->subchannel_call,
grpc_connected_subchannel_create_call(
exec_ctx, holder->connected_subchannel, holder->pollset));
retry_waiting_locked(exec_ctx, holder);
goto retry;
}
@ -171,7 +175,10 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, int success) {
holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
fail_locked(exec_ctx, holder);
} else {
gpr_atm_rel_store(&holder->subchannel_call, grpc_connected_subchannel_create_call(exec_ctx, holder->connected_subchannel, holder->pollset));
gpr_atm_rel_store(
&holder->subchannel_call,
grpc_connected_subchannel_create_call(
exec_ctx, holder->connected_subchannel, holder->pollset));
retry_waiting_locked(exec_ctx, holder);
}
gpr_mu_unlock(&holder->mu);

@ -174,8 +174,8 @@ void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset,
grpc_metadata_batch *initial_metadata, grpc_connected_subchannel **target,
grpc_closure *on_complete) {
grpc_metadata_batch *initial_metadata,
grpc_connected_subchannel **target, grpc_closure *on_complete) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
gpr_mu_lock(&p->mu);
@ -218,7 +218,8 @@ static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "destroy_subchannels");
for (i = 0; i < num_subchannels; i++) {
if (grpc_subchannel_get_connected_subchannel(subchannels[i]) != exclude_subchannel) {
if (grpc_subchannel_get_connected_subchannel(subchannels[i]) !=
exclude_subchannel) {
memset(&op, 0, sizeof(op));
op.disconnect = 1;
grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], &op);
@ -245,9 +246,9 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
p->checking_connectivity, "selected_changed");
if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) {
grpc_connected_subchannel_notify_on_state_change(exec_ctx, p->selected,
&p->checking_connectivity,
&p->connectivity_changed);
grpc_connected_subchannel_notify_on_state_change(
exec_ctx, p->selected, &p->checking_connectivity,
&p->connectivity_changed);
} else {
GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
}
@ -258,7 +259,8 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_READY, "connecting_ready");
selected_subchannel = p->subchannels[p->checking_subchannel];
p->selected = grpc_subchannel_get_connected_subchannel(selected_subchannel);
p->selected =
grpc_subchannel_get_connected_subchannel(selected_subchannel);
GPR_ASSERT(p->selected);
GRPC_CONNECTED_SUBCHANNEL_REF(p->selected, "picked_first");
/* drop the pick list: we are connected now */
@ -274,9 +276,9 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
gpr_free(pp);
}
grpc_connected_subchannel_notify_on_state_change(exec_ctx, p->selected,
&p->checking_connectivity,
&p->connectivity_changed);
grpc_connected_subchannel_notify_on_state_change(
exec_ctx, p->selected, &p->checking_connectivity,
&p->connectivity_changed);
break;
case GRPC_CHANNEL_TRANSIENT_FAILURE:
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
@ -361,13 +363,15 @@ static void pf_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
gpr_mu_unlock(&p->mu);
for (i = 0; i < n; i++) {
if (selected == grpc_subchannel_get_connected_subchannel(subchannels[i])) continue;
if (selected == grpc_subchannel_get_connected_subchannel(subchannels[i]))
continue;
grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], op);
GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pf_broadcast");
}
if (p->selected) {
grpc_connected_subchannel_process_transport_op(exec_ctx, selected, op);
GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, selected, "pf_broadcast_to_selected");
GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, selected,
"pf_broadcast_to_selected");
}
gpr_free(subchannels);
}

@ -314,8 +314,8 @@ void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset,
grpc_metadata_batch *initial_metadata, grpc_connected_subchannel **target,
grpc_closure *on_complete) {
grpc_metadata_batch *initial_metadata,
grpc_connected_subchannel **target, grpc_closure *on_complete) {
size_t i;
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
pending_pick *pp;
@ -325,7 +325,8 @@ int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset,
gpr_mu_unlock(&p->mu);
*target = grpc_subchannel_get_connected_subchannel(selected->subchannel);
if (grpc_lb_round_robin_trace) {
gpr_log(GPR_DEBUG, "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (NODE %p)",
gpr_log(GPR_DEBUG,
"[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (NODE %p)",
selected->subchannel, selected);
}
/* only advance the last picked pointer if the selection was used */
@ -390,7 +391,8 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
}
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = grpc_subchannel_get_connected_subchannel(selected->subchannel);
*pp->target =
grpc_subchannel_get_connected_subchannel(selected->subchannel);
if (grpc_lb_round_robin_trace) {
gpr_log(GPR_DEBUG,
"[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)",

@ -71,7 +71,8 @@ void grpc_lb_policy_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) {
int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_pollset *pollset,
grpc_metadata_batch *initial_metadata,
grpc_connected_subchannel **target, grpc_closure *on_complete) {
grpc_connected_subchannel **target,
grpc_closure *on_complete) {
return policy->vtable->pick(exec_ctx, policy, pollset, initial_metadata,
target, on_complete);
}

@ -111,7 +111,8 @@ void grpc_lb_policy_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_pollset *pollset,
grpc_metadata_batch *initial_metadata,
grpc_connected_subchannel **target, grpc_closure *on_complete);
grpc_connected_subchannel **target,
grpc_closure *on_complete);
void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_connected_subchannel **target);

@ -52,8 +52,9 @@
#define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2
#define GET_CONNECTED_SUBCHANNEL(subchannel, barrier) \
((grpc_connected_subchannel *)(gpr_atm_##barrier##_load(&(subchannel)->connected_subchannel)))
#define GET_CONNECTED_SUBCHANNEL(subchannel, barrier) \
((grpc_connected_subchannel *)(gpr_atm_##barrier##_load( \
&(subchannel)->connected_subchannel)))
struct grpc_connected_subchannel {
/** refcount */
@ -152,10 +153,10 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel,
#define REF_PASS_REASON , reason
#define REF_LOG(name, p) \
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p ref %d -> %d %s", \
(name), (p), (p)->refs, (p)->refs + 1, reason)
(name), (p), (p)->refs.count, (p)->refs.count + 1, reason)
#define UNREF_LOG(name, p) \
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p unref %d -> %d %s", \
(name), (p), (p)->refs, (p)->refs - 1, reason)
(name), (p), (p)->refs.count, (p)->refs.count - 1, reason)
#else
#define SUBCHANNEL_REF_LOCKED(p, r) subchannel_ref_locked((p))
#define SUBCHANNEL_UNREF_LOCKED(p, r) subchannel_unref_locked((p))
@ -175,23 +176,26 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel,
* connection implementation
*/
static void connection_destroy(grpc_exec_ctx *exec_ctx, void *arg, int success) {
static void connection_destroy(grpc_exec_ctx *exec_ctx, void *arg,
int success) {
grpc_connected_subchannel *c = arg;
grpc_channel_stack_destroy(exec_ctx, CHANNEL_STACK_FROM_CONNECTION(c));
gpr_free(c);
}
void grpc_connected_subchannel_ref(grpc_connected_subchannel *c
GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
void grpc_connected_subchannel_ref(
grpc_connected_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
REF_LOG("CONNECTION", c);
gpr_ref(&c->refs);
}
void grpc_connected_subchannel_unref(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
void grpc_connected_subchannel_unref(grpc_exec_ctx *exec_ctx,
grpc_connected_subchannel *c
GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
UNREF_LOG("CONNECTION", c);
if (gpr_unref(&c->refs)) {
grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(connection_destroy, c), 1);
grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(connection_destroy, c),
1);
}
}
@ -199,7 +203,8 @@ void grpc_connected_subchannel_unref(
* grpc_subchannel implementation
*/
static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, int success) {
static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg,
int success) {
grpc_subchannel *c = arg;
grpc_connected_subchannel *con = GET_CONNECTED_SUBCHANNEL(c, no_barrier);
if (con != NULL) {
@ -214,13 +219,16 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, int success)
}
void grpc_subchannel_ref(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
REF_LOG("SUBCHANNEL", c);
gpr_ref(&c->refs);
}
void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx,
grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
UNREF_LOG("SUBCHANNEL", c);
if (gpr_unref(&c->refs)) {
grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(subchannel_destroy, c), 1);
grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(subchannel_destroy, c),
1);
}
}
@ -276,7 +284,8 @@ static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
args.deadline = compute_connect_deadline(c);
args.channel_args = c->args;
grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_CONNECTING, "state_change");
grpc_connectivity_state_set(exec_ctx, &c->state_tracker,
GRPC_CHANNEL_CONNECTING, "state_change");
grpc_connector_connect(exec_ctx, c->connector, &args, &c->connecting_result,
&c->connected);
}
@ -319,11 +328,11 @@ void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx,
}
void grpc_subchannel_state_change_unsubscribe(grpc_exec_ctx *exec_ctx,
grpc_subchannel *c,
grpc_closure *subscribed_notify) {
grpc_subchannel *c,
grpc_closure *subscribed_notify) {
gpr_mu_lock(&c->mu);
grpc_connectivity_state_change_unsubscribe(
exec_ctx, &c->state_tracker, subscribed_notify);
grpc_connectivity_state_change_unsubscribe(exec_ctx, &c->state_tracker,
subscribed_notify);
gpr_mu_unlock(&c->mu);
}
@ -339,7 +348,8 @@ void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx,
}
if (op->disconnect) {
c->disconnected = 1;
grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "disconnect");
grpc_connectivity_state_set(exec_ctx, &c->state_tracker,
GRPC_CHANNEL_FATAL_FAILURE, "disconnect");
if (c->have_alarm) {
cancel_alarm = 1;
}
@ -360,15 +370,16 @@ void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx,
}
}
void grpc_connected_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, grpc_transport_op *op) {
void grpc_connected_subchannel_process_transport_op(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con,
grpc_transport_op *op) {
grpc_channel_stack *channel_stack = CHANNEL_STACK_FROM_CONNECTION(con);
grpc_channel_element *top_elem =
grpc_channel_stack_element(channel_stack, 0);
grpc_channel_element *top_elem = grpc_channel_stack_element(channel_stack, 0);
top_elem->filter->start_transport_op(exec_ctx, top_elem, op);
}
static void subchannel_on_child_state_changed(grpc_exec_ctx *exec_ctx, void *p,
int iomgr_success) {
int iomgr_success) {
state_watcher *sw = p;
grpc_subchannel *c = sw->whom.subchannel;
gpr_mu *mu = &c->mu;
@ -377,9 +388,12 @@ static void subchannel_on_child_state_changed(grpc_exec_ctx *exec_ctx, void *p,
/* if we failed just leave this closure */
if (iomgr_success) {
grpc_connectivity_state_set(exec_ctx, &c->state_tracker, sw->connectivity_state, "reflect_child");
grpc_connectivity_state_set(exec_ctx, &c->state_tracker,
sw->connectivity_state, "reflect_child");
if (sw->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE) {
grpc_connected_subchannel_notify_on_state_change(exec_ctx, GET_CONNECTED_SUBCHANNEL(c, no_barrier), &sw->connectivity_state, &sw->closure);
grpc_connected_subchannel_notify_on_state_change(
exec_ctx, GET_CONNECTED_SUBCHANNEL(c, no_barrier),
&sw->connectivity_state, &sw->closure);
GRPC_SUBCHANNEL_REF(c, "state_watcher");
sw = NULL;
}
@ -390,7 +404,10 @@ static void subchannel_on_child_state_changed(grpc_exec_ctx *exec_ctx, void *p,
gpr_free(sw);
}
static void connected_subchannel_state_op(grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, grpc_connectivity_state *state, grpc_closure *closure) {
static void connected_subchannel_state_op(grpc_exec_ctx *exec_ctx,
grpc_connected_subchannel *con,
grpc_connectivity_state *state,
grpc_closure *closure) {
grpc_transport_op op;
grpc_channel_element *elem;
memset(&op, 0, sizeof(op));
@ -400,12 +417,16 @@ static void connected_subchannel_state_op(grpc_exec_ctx *exec_ctx, grpc_connecte
elem->filter->start_transport_op(exec_ctx, elem, &op);
}
void grpc_connected_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, grpc_connectivity_state *state, grpc_closure *closure) {
void grpc_connected_subchannel_notify_on_state_change(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con,
grpc_connectivity_state *state, grpc_closure *closure) {
GPR_ASSERT(state != NULL);
connected_subchannel_state_op(exec_ctx, con, state, closure);
}
void grpc_connected_subchannel_state_change_unsubscribe(grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, grpc_closure *closure) {
void grpc_connected_subchannel_state_change_unsubscribe(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con,
grpc_closure *closure) {
connected_subchannel_state_op(exec_ctx, con, NULL, closure);
}
@ -429,7 +450,7 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
channel_stack_size = grpc_channel_stack_size(filters, num_filters);
con = gpr_malloc(sizeof(grpc_connected_subchannel) + channel_stack_size);
stk = (grpc_channel_stack *)(con + 1);
gpr_ref_init(&c->refs, 1);
gpr_ref_init(&con->refs, 1);
grpc_channel_stack_init(exec_ctx, filters, num_filters, c->master, c->args,
stk);
grpc_connected_channel_bind_transport(stk, c->connecting_result.transport);
@ -440,7 +461,8 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
sw_subchannel = gpr_malloc(sizeof(*sw_subchannel));
sw_subchannel->whom.subchannel = c;
sw_subchannel->connectivity_state = GRPC_CHANNEL_READY;
grpc_closure_init(&sw_subchannel->closure, subchannel_on_child_state_changed, sw_subchannel);
grpc_closure_init(&sw_subchannel->closure, subchannel_on_child_state_changed,
sw_subchannel);
gpr_mu_lock(&c->mu);
@ -458,28 +480,18 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
GPR_ASSERT(gpr_atm_no_barrier_cas(&c->connected_subchannel, 0, (gpr_atm)con));
c->connecting = 0;
/* setup subchannel watching connected subchannel for changes; subchannel ref for connecting is donated
/* setup subchannel watching connected subchannel for changes; subchannel ref
for connecting is donated
to the state watcher */
GRPC_SUBCHANNEL_REF(c, "state_watcher");
GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting");
grpc_connected_subchannel_notify_on_state_change(exec_ctx, con, &sw_subchannel->connectivity_state, &sw_subchannel->closure);
#if 0
grpc_transport_op op;
grpc_channel_element *elem;
/* setup connected subchannel watching transport for changes */
memset(&op, 0, sizeof(op));
op.connectivity_state = &sw_connected_subchannel->connectivity_state;
op.on_connectivity_state_change = &sw_connected_subchannel->closure;
op.bind_pollset_set = c->pollset_set;
elem =
grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0);
elem->filter->start_transport_op(exec_ctx, elem, &op);
#endif
grpc_connected_subchannel_notify_on_state_change(
exec_ctx, con, &sw_subchannel->connectivity_state,
&sw_subchannel->closure);
/* signal completion */
grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_READY, "connected");
grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_READY,
"connected");
gpr_mu_unlock(&c->mu);
gpr_free((void *)filters);
@ -559,7 +571,9 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
gpr_mu_lock(&c->mu);
GPR_ASSERT(!c->have_alarm);
c->have_alarm = 1;
grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, "connect_failed");
grpc_connectivity_state_set(exec_ctx, &c->state_tracker,
GRPC_CHANNEL_TRANSIENT_FAILURE,
"connect_failed");
grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now);
gpr_mu_unlock(&c->mu);
}
@ -623,13 +637,14 @@ void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx,
top_elem->filter->start_transport_stream_op(exec_ctx, top_elem, op);
}
grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel(grpc_subchannel *c) {
grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel(
grpc_subchannel *c) {
return GET_CONNECTED_SUBCHANNEL(c, acq);
}
grpc_subchannel_call *grpc_connected_subchannel_create_call(grpc_exec_ctx *exec_ctx,
grpc_connected_subchannel *con,
grpc_pollset *pollset) {
grpc_subchannel_call *grpc_connected_subchannel_create_call(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con,
grpc_pollset *pollset) {
grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
grpc_subchannel_call *call =
gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size);

@ -64,7 +64,8 @@ typedef struct grpc_subchannel_args grpc_subchannel_args;
#define GRPC_SUBCHANNEL_REF(p, r) grpc_subchannel_ref((p))
#define GRPC_SUBCHANNEL_UNREF(cl, p, r) grpc_subchannel_unref((cl), (p))
#define GRPC_CONNECTED_SUBCHANNEL_REF(p, r) grpc_connected_subchannel_ref((p))
#define GRPC_CONNECTED_SUBCHANNEL_UNREF(cl, p, r) grpc_connected_subchannel_unref((cl), (p))
#define GRPC_CONNECTED_SUBCHANNEL_UNREF(cl, p, r) \
grpc_connected_subchannel_unref((cl), (p))
#define GRPC_SUBCHANNEL_CALL_REF(p, r) grpc_subchannel_call_ref((p))
#define GRPC_SUBCHANNEL_CALL_UNREF(cl, p, r) \
grpc_subchannel_call_unref((cl), (p))
@ -76,11 +77,11 @@ void grpc_subchannel_ref(grpc_subchannel *channel
void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx,
grpc_subchannel *channel
GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_connected_subchannel_ref(grpc_connected_subchannel *channel
GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_connected_subchannel_ref(
grpc_connected_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_connected_subchannel_unref(grpc_exec_ctx *exec_ctx,
grpc_connected_subchannel *channel
GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
grpc_connected_subchannel *channel
GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_subchannel_call_ref(grpc_subchannel_call *call
GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx,
@ -88,17 +89,17 @@ void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx,
GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
/** construct a subchannel call */
grpc_subchannel_call *grpc_connected_subchannel_create_call(grpc_exec_ctx *exec_ctx,
grpc_connected_subchannel *connected_subchannel,
grpc_pollset *pollset);
grpc_subchannel_call *grpc_connected_subchannel_create_call(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *connected_subchannel,
grpc_pollset *pollset);
/** process a transport level op */
void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx,
grpc_subchannel *subchannel,
grpc_transport_op *op);
void grpc_connected_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx,
grpc_connected_subchannel *subchannel,
grpc_transport_op *op);
void grpc_connected_subchannel_process_transport_op(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *subchannel,
grpc_transport_op *op);
/** poll the current connectivity state of a channel */
grpc_connectivity_state grpc_subchannel_check_connectivity(
@ -110,19 +111,18 @@ void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx,
grpc_subchannel *channel,
grpc_connectivity_state *state,
grpc_closure *notify);
void grpc_connected_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx,
grpc_connected_subchannel *channel,
grpc_connectivity_state *state,
grpc_closure *notify);
void grpc_connected_subchannel_notify_on_state_change(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *channel,
grpc_connectivity_state *state, grpc_closure *notify);
/** Remove \a subscribed_notify from the list of closures to be called on a
* state change if present. */
void grpc_subchannel_state_change_unsubscribe(grpc_exec_ctx *exec_ctx,
grpc_subchannel *channel,
grpc_closure *subscribed_notify);
void grpc_connected_subchannel_state_change_unsubscribe(grpc_exec_ctx *exec_ctx,
grpc_connected_subchannel *channel,
grpc_closure *subscribed_notify);
grpc_subchannel *channel,
grpc_closure *subscribed_notify);
void grpc_connected_subchannel_state_change_unsubscribe(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *channel,
grpc_closure *subscribed_notify);
/** express interest in \a channel's activities through \a pollset. */
void grpc_subchannel_add_interested_party(grpc_exec_ctx *exec_ctx,
@ -135,7 +135,8 @@ void grpc_subchannel_del_interested_party(grpc_exec_ctx *exec_ctx,
/** retrieve the grpc_connected_subchannel - or NULL if called before
the subchannel becomes connected */
grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel(grpc_subchannel *subchannel);
grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel(
grpc_subchannel *subchannel);
/** continue processing a transport op */
void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx,

@ -103,8 +103,7 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx,
}
static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_call_element_args *args) {
}
grpc_call_element_args *args) {}
static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) {}

@ -237,7 +237,8 @@ grpc_connectivity_state g_state = GRPC_CHANNEL_IDLE;
static void state_changed(grpc_exec_ctx *exec_ctx, void *arg, int success) {
if (g_state != GRPC_CHANNEL_READY) {
grpc_subchannel_notify_on_state_change(exec_ctx, arg, &g_state, grpc_closure_create(state_changed, arg));
grpc_subchannel_notify_on_state_change(
exec_ctx, arg, &g_state, grpc_closure_create(state_changed, arg));
}
}
@ -246,12 +247,14 @@ static grpc_connected_subchannel *connect_subchannel(grpc_subchannel *c) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_pollset_init(&pollset);
grpc_subchannel_add_interested_party(&exec_ctx, c, &pollset);
grpc_subchannel_notify_on_state_change(&exec_ctx, c, &g_state, grpc_closure_create(state_changed, c));
grpc_subchannel_notify_on_state_change(&exec_ctx, c, &g_state,
grpc_closure_create(state_changed, c));
grpc_exec_ctx_flush(&exec_ctx);
gpr_mu_lock(GRPC_POLLSET_MU(&pollset));
while (g_state != GRPC_CHANNEL_READY) {
grpc_pollset_worker worker;
grpc_pollset_work(&exec_ctx, &pollset, &worker, gpr_now(GPR_CLOCK_REALTIME), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1));
grpc_pollset_work(&exec_ctx, &pollset, &worker, gpr_now(GPR_CLOCK_REALTIME),
GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1));
gpr_mu_unlock(GRPC_POLLSET_MU(&pollset));
grpc_exec_ctx_flush(&exec_ctx);
gpr_mu_lock(GRPC_POLLSET_MU(&pollset));

Loading…
Cancel
Save