Introduce call lists for moving work outside locks

pull/3423/head
Craig Tiller 10 years ago
parent 38adec97e8
commit 000cd8f9f7
  1. 139
      src/core/channel/client_channel.c
  2. 5
      src/core/channel/client_channel.h
  3. 127
      src/core/client_config/lb_policies/pick_first.c
  4. 35
      src/core/client_config/lb_policy.c
  5. 43
      src/core/client_config/lb_policy.h
  6. 91
      src/core/client_config/subchannel.c
  7. 11
      src/core/client_config/subchannel.h
  8. 39
      src/core/iomgr/iomgr.c
  9. 15
      src/core/iomgr/iomgr.h
  10. 2
      src/core/iomgr/pollset_multipoller_with_epoll.c
  11. 48
      src/core/iomgr/pollset_posix.c
  12. 10
      src/core/iomgr/pollset_posix.h
  13. 8
      src/core/surface/channel.c
  14. 16
      src/core/surface/channel_connectivity.c
  15. 8
      src/core/transport/chttp2/internal.h
  16. 4
      src/core/transport/chttp2/writing.c
  17. 67
      src/core/transport/chttp2_transport.c
  18. 62
      src/core/transport/connectivity_state.c
  19. 32
      src/core/transport/connectivity_state.h

@ -73,7 +73,7 @@ typedef struct {
guarded by mu_config */
grpc_client_config *incoming_configuration;
/** a list of closures that are all waiting for config to come in */
grpc_iomgr_closure *waiting_for_config_closures;
grpc_iomgr_call_list waiting_for_config_closures;
/** resolver callback */
grpc_iomgr_closure on_config_changed;
/** connectivity state being tracked */
@ -181,8 +181,8 @@ static void add_to_lb_policy_wait_queue_locked_state_config(
waiting_call *wc = gpr_malloc(sizeof(*wc));
grpc_iomgr_closure_init(&wc->closure, continue_with_pick, wc);
wc->elem = elem;
wc->closure.next = chand->waiting_for_config_closures;
chand->waiting_for_config_closures = &wc->closure;
grpc_iomgr_call_list_add(&chand->waiting_for_config_closures, &wc->closure,
1);
}
static int is_empty(void *p, int len) {
@ -230,6 +230,7 @@ static void started_call(void *arg, int iomgr_success) {
static void picked_target(void *arg, int iomgr_success) {
call_data *calld = arg;
grpc_pollset *pollset;
grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
if (calld->picked_channel == NULL) {
/* treat this like a cancellation */
@ -248,9 +249,10 @@ static void picked_target(void *arg, int iomgr_success) {
grpc_iomgr_closure_init(&calld->async_setup_task, started_call, calld);
grpc_subchannel_create_call(calld->picked_channel, pollset,
&calld->subchannel_call,
&calld->async_setup_task);
&calld->async_setup_task, &call_list);
}
}
grpc_iomgr_call_list_run(call_list);
}
static grpc_iomgr_closure *merge_into_waiting_op(
@ -310,7 +312,7 @@ static void perform_transport_stream_op(grpc_call_element *elem,
grpc_subchannel_call *subchannel_call;
grpc_lb_policy *lb_policy;
grpc_transport_stream_op op2;
grpc_iomgr_closure *consumed_op = NULL;
grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
@ -328,7 +330,7 @@ static void perform_transport_stream_op(grpc_call_element *elem,
break;
case CALL_WAITING_FOR_SEND:
GPR_ASSERT(!continuation);
consumed_op = merge_into_waiting_op(elem, op);
grpc_iomgr_call_list_add(&call_list, merge_into_waiting_op(elem, op), 1);
if (!calld->waiting_op.send_ops &&
calld->waiting_op.cancel_with_status == GRPC_STATUS_OK) {
gpr_mu_unlock(&calld->mu_state);
@ -357,7 +359,8 @@ static void perform_transport_stream_op(grpc_call_element *elem,
handle_op_after_cancellation(elem, op);
handle_op_after_cancellation(elem, &op2);
} else {
consumed_op = merge_into_waiting_op(elem, op);
grpc_iomgr_call_list_add(&call_list, merge_into_waiting_op(elem, op),
1);
gpr_mu_unlock(&calld->mu_state);
}
break;
@ -398,7 +401,7 @@ static void perform_transport_stream_op(grpc_call_element *elem,
calld);
grpc_lb_policy_pick(lb_policy, bind_pollset, initial_metadata,
&calld->picked_channel,
&calld->async_setup_task);
&calld->async_setup_task, &call_list);
GRPC_LB_POLICY_UNREF(lb_policy, "pick");
} else if (chand->resolver != NULL) {
@ -424,9 +427,7 @@ static void perform_transport_stream_op(grpc_call_element *elem,
break;
}
if (consumed_op != NULL) {
consumed_op->cb(consumed_op->cb_arg, 1);
}
grpc_iomgr_call_list_run(call_list);
}
static void cc_start_transport_stream_op(grpc_call_element *elem,
@ -435,36 +436,38 @@ static void cc_start_transport_stream_op(grpc_call_element *elem,
}
static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy,
grpc_connectivity_state current_state);
grpc_connectivity_state current_state,
grpc_iomgr_call_list *cl);
static void on_lb_policy_state_changed_locked(
lb_policy_connectivity_watcher *w) {
static void on_lb_policy_state_changed_locked(lb_policy_connectivity_watcher *w,
grpc_iomgr_call_list *cl) {
/* check if the notification is for a stale policy */
if (w->lb_policy != w->chand->lb_policy) return;
grpc_connectivity_state_set(&w->chand->state_tracker, w->state, "lb_changed");
grpc_connectivity_state_set(&w->chand->state_tracker, w->state, "lb_changed",
cl);
if (w->state != GRPC_CHANNEL_FATAL_FAILURE) {
watch_lb_policy(w->chand, w->lb_policy, w->state);
watch_lb_policy(w->chand, w->lb_policy, w->state, cl);
}
}
static void on_lb_policy_state_changed(void *arg, int iomgr_success) {
lb_policy_connectivity_watcher *w = arg;
grpc_connectivity_state_flusher f;
grpc_iomgr_call_list cl = GRPC_IOMGR_CALL_LIST_INIT;
gpr_mu_lock(&w->chand->mu_config);
on_lb_policy_state_changed_locked(w);
grpc_connectivity_state_begin_flush(&w->chand->state_tracker, &f);
on_lb_policy_state_changed_locked(w, &cl);
gpr_mu_unlock(&w->chand->mu_config);
grpc_connectivity_state_end_flush(&f);
grpc_iomgr_call_list_run(cl);
GRPC_CHANNEL_INTERNAL_UNREF(w->chand->master, "watch_lb_policy");
gpr_free(w);
}
static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy,
grpc_connectivity_state current_state) {
grpc_connectivity_state current_state,
grpc_iomgr_call_list *call_list) {
lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w));
GRPC_CHANNEL_INTERNAL_REF(chand->master, "watch_lb_policy");
@ -472,13 +475,8 @@ static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy,
grpc_iomgr_closure_init(&w->on_changed, on_lb_policy_state_changed, w);
w->state = current_state;
w->lb_policy = lb_policy;
if (grpc_lb_policy_notify_on_state_change(lb_policy, &w->state,
&w->on_changed)
.state_already_changed) {
on_lb_policy_state_changed_locked(w);
GRPC_CHANNEL_INTERNAL_UNREF(w->chand->master, "watch_lb_policy");
gpr_free(w);
}
grpc_lb_policy_notify_on_state_change(lb_policy, &w->state, &w->on_changed,
call_list);
}
static void cc_on_config_changed(void *arg, int iomgr_success) {
@ -486,9 +484,8 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
grpc_lb_policy *lb_policy = NULL;
grpc_lb_policy *old_lb_policy;
grpc_resolver *old_resolver;
grpc_iomgr_closure *wakeup_closures = NULL;
grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
grpc_connectivity_state_flusher f;
grpc_iomgr_call_list cl = GRPC_IOMGR_CALL_LIST_INIT;
int exit_idle = 0;
if (chand->incoming_configuration != NULL) {
@ -496,7 +493,7 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
if (lb_policy != NULL) {
GRPC_LB_POLICY_REF(lb_policy, "channel");
GRPC_LB_POLICY_REF(lb_policy, "config_change");
state = grpc_lb_policy_check_connectivity(lb_policy);
state = grpc_lb_policy_check_connectivity(lb_policy, &cl);
}
grpc_client_config_unref(chand->incoming_configuration);
@ -508,8 +505,7 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
old_lb_policy = chand->lb_policy;
chand->lb_policy = lb_policy;
if (lb_policy != NULL || chand->resolver == NULL /* disconnected */) {
wakeup_closures = chand->waiting_for_config_closures;
chand->waiting_for_config_closures = NULL;
grpc_iomgr_call_list_move(&chand->waiting_for_config_closures, &cl);
}
if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) {
GRPC_LB_POLICY_REF(lb_policy, "exit_idle");
@ -520,15 +516,13 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
if (iomgr_success && chand->resolver) {
grpc_resolver *resolver = chand->resolver;
GRPC_RESOLVER_REF(resolver, "channel-next");
grpc_connectivity_state_set(&chand->state_tracker, state,
"new_lb+resolver");
grpc_connectivity_state_set(&chand->state_tracker, state, "new_lb+resolver",
&cl);
if (lb_policy != NULL) {
watch_lb_policy(chand, lb_policy, state);
watch_lb_policy(chand, lb_policy, state, &cl);
}
grpc_connectivity_state_begin_flush(&chand->state_tracker, &f);
gpr_mu_unlock(&chand->mu_config);
GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
grpc_connectivity_state_end_flush(&f);
grpc_resolver_next(resolver, &chand->incoming_configuration,
&chand->on_config_changed);
GRPC_RESOLVER_UNREF(resolver, "channel-next");
@ -536,10 +530,9 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
old_resolver = chand->resolver;
chand->resolver = NULL;
grpc_connectivity_state_set(&chand->state_tracker,
GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone");
grpc_connectivity_state_begin_flush(&chand->state_tracker, &f);
GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone",
&cl);
gpr_mu_unlock(&chand->mu_config);
grpc_connectivity_state_end_flush(&f);
if (old_resolver != NULL) {
grpc_resolver_shutdown(old_resolver);
GRPC_RESOLVER_UNREF(old_resolver, "channel");
@ -547,24 +540,20 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
}
if (exit_idle) {
grpc_lb_policy_exit_idle(lb_policy);
grpc_lb_policy_exit_idle(lb_policy, &cl);
GRPC_LB_POLICY_UNREF(lb_policy, "exit_idle");
}
if (old_lb_policy != NULL) {
grpc_lb_policy_shutdown(old_lb_policy);
grpc_lb_policy_shutdown(old_lb_policy, &cl);
GRPC_LB_POLICY_UNREF(old_lb_policy, "channel");
}
while (wakeup_closures) {
grpc_iomgr_closure *next = wakeup_closures->next;
wakeup_closures->cb(wakeup_closures->cb_arg, 1);
wakeup_closures = next;
}
if (lb_policy != NULL) {
GRPC_LB_POLICY_UNREF(lb_policy, "config_change");
}
grpc_iomgr_call_list_run(cl);
GRPC_CHANNEL_INTERNAL_UNREF(chand->master, "resolver");
}
@ -573,23 +562,21 @@ static void cc_start_transport_op(grpc_channel_element *elem,
grpc_lb_policy *lb_policy = NULL;
channel_data *chand = elem->channel_data;
grpc_resolver *destroy_resolver = NULL;
grpc_connectivity_state_flusher f;
grpc_iomgr_closure *call_list = op->on_consumed;
call_list->next = NULL;
op->on_consumed = NULL;
grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
if (op->on_consumed) {
grpc_iomgr_call_list_add(&call_list, op->on_consumed, 1);
op->on_consumed = NULL;
}
GPR_ASSERT(op->set_accept_stream == NULL);
GPR_ASSERT(op->bind_pollset == NULL);
gpr_mu_lock(&chand->mu_config);
if (op->on_connectivity_state_change != NULL) {
if (grpc_connectivity_state_notify_on_state_change(
&chand->state_tracker, op->connectivity_state,
op->on_connectivity_state_change)
.state_already_changed) {
op->on_connectivity_state_change->next = call_list;
call_list = op->on_connectivity_state_change;
}
grpc_connectivity_state_notify_on_state_change(
&chand->state_tracker, op->connectivity_state,
op->on_connectivity_state_change, &call_list);
op->on_connectivity_state_change = NULL;
op->connectivity_state = NULL;
}
@ -603,18 +590,17 @@ static void cc_start_transport_op(grpc_channel_element *elem,
if (op->disconnect && chand->resolver != NULL) {
grpc_connectivity_state_set(&chand->state_tracker,
GRPC_CHANNEL_FATAL_FAILURE, "disconnect");
GRPC_CHANNEL_FATAL_FAILURE, "disconnect",
&call_list);
destroy_resolver = chand->resolver;
chand->resolver = NULL;
if (chand->lb_policy != NULL) {
grpc_lb_policy_shutdown(chand->lb_policy);
grpc_lb_policy_shutdown(chand->lb_policy, &call_list);
GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel");
chand->lb_policy = NULL;
}
}
grpc_connectivity_state_begin_flush(&chand->state_tracker, &f);
gpr_mu_unlock(&chand->mu_config);
grpc_connectivity_state_end_flush(&f);
if (destroy_resolver) {
grpc_resolver_shutdown(destroy_resolver);
@ -622,15 +608,11 @@ static void cc_start_transport_op(grpc_channel_element *elem,
}
if (lb_policy) {
grpc_lb_policy_broadcast(lb_policy, op);
grpc_lb_policy_broadcast(lb_policy, op, &call_list);
GRPC_LB_POLICY_UNREF(lb_policy, "broadcast");
}
while (call_list != NULL) {
grpc_iomgr_closure *next = call_list->next;
call_list->cb(call_list->cb_arg, 1);
call_list = next;
}
grpc_iomgr_call_list_run(call_list);
}
/* Constructor for call_data */
@ -740,7 +722,7 @@ void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack,
GPR_ASSERT(!chand->resolver);
chand->resolver = resolver;
GRPC_RESOLVER_REF(resolver, "channel");
if (chand->waiting_for_config_closures != NULL ||
if (!grpc_iomgr_call_list_empty(chand->waiting_for_config_closures) ||
chand->exit_idle_when_lb_policy_arrives) {
chand->started_resolving = 1;
GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
@ -751,14 +733,15 @@ void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack,
}
grpc_connectivity_state grpc_client_channel_check_connectivity_state(
grpc_channel_element *elem, int try_to_connect) {
grpc_channel_element *elem, int try_to_connect,
grpc_iomgr_call_list *call_list) {
channel_data *chand = elem->channel_data;
grpc_connectivity_state out;
gpr_mu_lock(&chand->mu_config);
out = grpc_connectivity_state_check(&chand->state_tracker);
if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
if (chand->lb_policy != NULL) {
grpc_lb_policy_exit_idle(chand->lb_policy);
grpc_lb_policy_exit_idle(chand->lb_policy, call_list);
} else {
chand->exit_idle_when_lb_policy_arrives = 1;
if (!chand->started_resolving && chand->resolver != NULL) {
@ -775,16 +758,12 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state(
void grpc_client_channel_watch_connectivity_state(
grpc_channel_element *elem, grpc_connectivity_state *state,
grpc_iomgr_closure *on_complete) {
grpc_iomgr_closure *on_complete, grpc_iomgr_call_list *call_list) {
channel_data *chand = elem->channel_data;
grpc_connectivity_state_notify_on_state_change_result r;
gpr_mu_lock(&chand->mu_config);
r = grpc_connectivity_state_notify_on_state_change(&chand->state_tracker,
state, on_complete);
grpc_connectivity_state_notify_on_state_change(&chand->state_tracker, state,
on_complete, call_list);
gpr_mu_unlock(&chand->mu_config);
if (r.state_already_changed) {
on_complete->cb(on_complete->cb_arg, 1);
}
}
grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(

@ -53,11 +53,12 @@ void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack,
grpc_resolver *resolver);
grpc_connectivity_state grpc_client_channel_check_connectivity_state(
grpc_channel_element *elem, int try_to_connect);
grpc_channel_element *elem, int try_to_connect,
grpc_iomgr_call_list *call_list);
void grpc_client_channel_watch_connectivity_state(
grpc_channel_element *elem, grpc_connectivity_state *state,
grpc_iomgr_closure *on_complete);
grpc_iomgr_closure *on_complete, grpc_iomgr_call_list *call_list);
grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(
grpc_channel_element *elem);

@ -108,9 +108,8 @@ void pf_destroy(grpc_lb_policy *pol) {
gpr_free(p);
}
void pf_shutdown(grpc_lb_policy *pol) {
void pf_shutdown(grpc_lb_policy *pol, grpc_iomgr_call_list *call_list) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
grpc_connectivity_state_flusher f;
pending_pick *pp;
gpr_mu_lock(&p->mu);
del_interested_parties_locked(p);
@ -118,51 +117,40 @@ void pf_shutdown(grpc_lb_policy *pol) {
pp = p->pending_picks;
p->pending_picks = NULL;
grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE,
"shutdown");
grpc_connectivity_state_begin_flush(&p->state_tracker, &f);
"shutdown", call_list);
gpr_mu_unlock(&p->mu);
grpc_connectivity_state_end_flush(&f);
while (pp != NULL) {
pending_pick *next = pp->next;
*pp->target = NULL;
pp->on_complete->cb(pp->on_complete->cb_arg, 0);
grpc_iomgr_call_list_add(call_list, pp->on_complete, 1);
gpr_free(pp);
pp = next;
}
}
/* returns a closure to call, or NULL */
static grpc_iomgr_closure *start_picking(pick_first_lb_policy *p) {
static void start_picking(pick_first_lb_policy *p,
grpc_iomgr_call_list *call_list) {
p->started_picking = 1;
p->checking_subchannel = 0;
p->checking_connectivity = GRPC_CHANNEL_IDLE;
GRPC_LB_POLICY_REF(&p->base, "pick_first_connectivity");
if (grpc_subchannel_notify_on_state_change(
p->subchannels[p->checking_subchannel], &p->checking_connectivity,
&p->connectivity_changed)
.state_already_changed) {
return &p->connectivity_changed;
} else {
return NULL;
}
grpc_subchannel_notify_on_state_change(p->subchannels[p->checking_subchannel],
&p->checking_connectivity,
&p->connectivity_changed, call_list);
}
void pf_exit_idle(grpc_lb_policy *pol) {
void pf_exit_idle(grpc_lb_policy *pol, grpc_iomgr_call_list *call_list) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
grpc_iomgr_closure *call = NULL;
gpr_mu_lock(&p->mu);
if (!p->started_picking) {
call = start_picking(p);
start_picking(p, call_list);
}
gpr_mu_unlock(&p->mu);
if (call) {
call->cb(call->cb_arg, 1);
}
}
void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset,
grpc_metadata_batch *initial_metadata, grpc_subchannel **target,
grpc_iomgr_closure *on_complete) {
grpc_iomgr_closure *on_complete, grpc_iomgr_call_list *call_list) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
gpr_mu_lock(&p->mu);
@ -171,9 +159,8 @@ void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset,
*target = p->selected;
on_complete->cb(on_complete->cb_arg, 1);
} else {
grpc_iomgr_closure *call = NULL;
if (!p->started_picking) {
call = start_picking(p);
start_picking(p, call_list);
}
grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel],
pollset);
@ -184,9 +171,6 @@ void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset,
pp->on_complete = on_complete;
p->pending_picks = pp;
gpr_mu_unlock(&p->mu);
if (call) {
call->cb(call->cb_arg, 1);
}
}
}
@ -194,8 +178,7 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
pick_first_lb_policy *p = arg;
pending_pick *pp;
int unref = 0;
grpc_iomgr_closure *cbs = NULL;
grpc_connectivity_state_flusher f;
grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
gpr_mu_lock(&p->mu);
@ -203,14 +186,11 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
unref = 1;
} else if (p->selected != NULL) {
grpc_connectivity_state_set(&p->state_tracker, p->checking_connectivity,
"selected_changed");
"selected_changed", &call_list);
if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) {
if (grpc_subchannel_notify_on_state_change(
p->selected, &p->checking_connectivity, &p->connectivity_changed)
.state_already_changed) {
p->connectivity_changed.next = cbs;
cbs = &p->connectivity_changed;
}
grpc_subchannel_notify_on_state_change(
p->selected, &p->checking_connectivity, &p->connectivity_changed,
&call_list);
} else {
unref = 1;
}
@ -219,28 +199,23 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
switch (p->checking_connectivity) {
case GRPC_CHANNEL_READY:
grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY,
"connecting_ready");
"connecting_ready", &call_list);
p->selected = p->subchannels[p->checking_subchannel];
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = p->selected;
grpc_subchannel_del_interested_party(p->selected, pp->pollset);
pp->on_complete->next = cbs;
cbs = pp->on_complete;
grpc_iomgr_call_list_add(&call_list, pp->on_complete, 1);
gpr_free(pp);
}
if (grpc_subchannel_notify_on_state_change(p->selected,
&p->checking_connectivity,
&p->connectivity_changed)
.state_already_changed) {
p->connectivity_changed.next = cbs;
cbs = &p->connectivity_changed;
}
grpc_subchannel_notify_on_state_change(
p->selected, &p->checking_connectivity, &p->connectivity_changed,
&call_list);
break;
case GRPC_CHANNEL_TRANSIENT_FAILURE:
grpc_connectivity_state_set(&p->state_tracker,
GRPC_CHANNEL_TRANSIENT_FAILURE,
"connecting_transient_failure");
"connecting_transient_failure", &call_list);
del_interested_parties_locked(p);
p->checking_subchannel =
(p->checking_subchannel + 1) % p->num_subchannels;
@ -248,13 +223,9 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
p->subchannels[p->checking_subchannel]);
add_interested_parties_locked(p);
if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
if (grpc_subchannel_notify_on_state_change(
p->subchannels[p->checking_subchannel],
&p->checking_connectivity, &p->connectivity_changed)
.state_already_changed) {
p->connectivity_changed.next = cbs;
cbs = &p->connectivity_changed;
}
grpc_subchannel_notify_on_state_change(
p->subchannels[p->checking_subchannel], &p->checking_connectivity,
&p->connectivity_changed, &call_list);
} else {
goto loop;
}
@ -262,14 +233,10 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_IDLE:
grpc_connectivity_state_set(&p->state_tracker, p->checking_connectivity,
"connecting_changed");
if (grpc_subchannel_notify_on_state_change(
p->subchannels[p->checking_subchannel],
&p->checking_connectivity, &p->connectivity_changed)
.state_already_changed) {
p->connectivity_changed.next = cbs;
cbs = &p->connectivity_changed;
}
"connecting_changed", &call_list);
grpc_subchannel_notify_on_state_change(
p->subchannels[p->checking_subchannel], &p->checking_connectivity,
&p->connectivity_changed, &call_list);
break;
case GRPC_CHANNEL_FATAL_FAILURE:
del_interested_parties_locked(p);
@ -280,19 +247,18 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
if (p->num_subchannels == 0) {
grpc_connectivity_state_set(&p->state_tracker,
GRPC_CHANNEL_FATAL_FAILURE,
"no_more_channels");
"no_more_channels", &call_list);
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = NULL;
pp->on_complete->next = cbs;
cbs = pp->on_complete;
grpc_iomgr_call_list_add(&call_list, pp->on_complete, 1);
gpr_free(pp);
}
unref = 1;
} else {
grpc_connectivity_state_set(&p->state_tracker,
GRPC_CHANNEL_TRANSIENT_FAILURE,
"subchannel_failed");
"subchannel_failed", &call_list);
p->checking_subchannel %= p->num_subchannels;
p->checking_connectivity = grpc_subchannel_check_connectivity(
p->subchannels[p->checking_subchannel]);
@ -302,22 +268,17 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
}
}
grpc_connectivity_state_begin_flush(&p->state_tracker, &f);
gpr_mu_unlock(&p->mu);
grpc_connectivity_state_end_flush(&f);
while (cbs != NULL) {
grpc_iomgr_closure *next = cbs->next;
cbs->cb(cbs->cb_arg, 1);
cbs = next;
}
grpc_iomgr_call_list_run(call_list);
if (unref) {
GRPC_LB_POLICY_UNREF(&p->base, "pick_first_connectivity");
}
}
static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op) {
static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op,
grpc_iomgr_call_list *call_list) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
size_t i;
size_t n;
@ -339,7 +300,8 @@ static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op) {
gpr_free(subchannels);
}
static grpc_connectivity_state pf_check_connectivity(grpc_lb_policy *pol) {
static grpc_connectivity_state pf_check_connectivity(
grpc_lb_policy *pol, grpc_iomgr_call_list *call_list) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
grpc_connectivity_state st;
gpr_mu_lock(&p->mu);
@ -348,16 +310,15 @@ static grpc_connectivity_state pf_check_connectivity(grpc_lb_policy *pol) {
return st;
}
static grpc_connectivity_state_notify_on_state_change_result
pf_notify_on_state_change(grpc_lb_policy *pol, grpc_connectivity_state *current,
grpc_iomgr_closure *notify) {
void pf_notify_on_state_change(grpc_lb_policy *pol,
grpc_connectivity_state *current,
grpc_iomgr_closure *notify,
grpc_iomgr_call_list *call_list) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
grpc_connectivity_state_notify_on_state_change_result r;
gpr_mu_lock(&p->mu);
r = grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current,
notify);
grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current,
notify, call_list);
gpr_mu_unlock(&p->mu);
return r;
}
static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {

@ -63,33 +63,38 @@ void grpc_lb_policy_unref(grpc_lb_policy *policy) {
}
}
void grpc_lb_policy_shutdown(grpc_lb_policy *policy) {
policy->vtable->shutdown(policy);
void grpc_lb_policy_shutdown(grpc_lb_policy *policy,
grpc_iomgr_call_list *call_list) {
policy->vtable->shutdown(policy, call_list);
}
void grpc_lb_policy_pick(grpc_lb_policy *policy, grpc_pollset *pollset,
grpc_metadata_batch *initial_metadata,
grpc_subchannel **target,
grpc_iomgr_closure *on_complete) {
policy->vtable->pick(policy, pollset, initial_metadata, target, on_complete);
grpc_iomgr_closure *on_complete,
grpc_iomgr_call_list *call_list) {
policy->vtable->pick(policy, pollset, initial_metadata, target, on_complete,
call_list);
}
void grpc_lb_policy_broadcast(grpc_lb_policy *policy, grpc_transport_op *op) {
policy->vtable->broadcast(policy, op);
void grpc_lb_policy_broadcast(grpc_lb_policy *policy, grpc_transport_op *op,
grpc_iomgr_call_list *call_list) {
policy->vtable->broadcast(policy, op, call_list);
}
void grpc_lb_policy_exit_idle(grpc_lb_policy *policy) {
policy->vtable->exit_idle(policy);
void grpc_lb_policy_exit_idle(grpc_lb_policy *policy,
grpc_iomgr_call_list *call_list) {
policy->vtable->exit_idle(policy, call_list);
}
grpc_connectivity_state_notify_on_state_change_result
grpc_lb_policy_notify_on_state_change(grpc_lb_policy *policy,
grpc_connectivity_state *state,
grpc_iomgr_closure *closure) {
return policy->vtable->notify_on_state_change(policy, state, closure);
void grpc_lb_policy_notify_on_state_change(grpc_lb_policy *policy,
grpc_connectivity_state *state,
grpc_iomgr_closure *closure,
grpc_iomgr_call_list *call_list) {
policy->vtable->notify_on_state_change(policy, state, closure, call_list);
}
grpc_connectivity_state grpc_lb_policy_check_connectivity(
grpc_lb_policy *policy) {
return policy->vtable->check_connectivity(policy);
grpc_lb_policy *policy, grpc_iomgr_call_list *call_list) {
return policy->vtable->check_connectivity(policy, call_list);
}

@ -53,28 +53,31 @@ struct grpc_lb_policy {
struct grpc_lb_policy_vtable {
void (*destroy)(grpc_lb_policy *policy);
void (*shutdown)(grpc_lb_policy *policy);
void (*shutdown)(grpc_lb_policy *policy, grpc_iomgr_call_list *call_list);
/** implement grpc_lb_policy_pick */
void (*pick)(grpc_lb_policy *policy, grpc_pollset *pollset,
grpc_metadata_batch *initial_metadata, grpc_subchannel **target,
grpc_iomgr_closure *on_complete);
grpc_iomgr_closure *on_complete,
grpc_iomgr_call_list *call_list);
/** try to enter a READY connectivity state */
void (*exit_idle)(grpc_lb_policy *policy);
void (*exit_idle)(grpc_lb_policy *policy, grpc_iomgr_call_list *call_list);
/** broadcast a transport op to all subchannels */
void (*broadcast)(grpc_lb_policy *policy, grpc_transport_op *op);
void (*broadcast)(grpc_lb_policy *policy, grpc_transport_op *op,
grpc_iomgr_call_list *call_list);
/** check the current connectivity of the lb_policy */
grpc_connectivity_state (*check_connectivity)(grpc_lb_policy *policy);
grpc_connectivity_state (*check_connectivity)(
grpc_lb_policy *policy, grpc_iomgr_call_list *call_list);
/** call notify when the connectivity state of a channel changes from *state.
Updates *state with the new state of the policy */
grpc_connectivity_state_notify_on_state_change_result (
*notify_on_state_change)(grpc_lb_policy *policy,
grpc_connectivity_state *state,
grpc_iomgr_closure *closure);
void (*notify_on_state_change)(grpc_lb_policy *policy,
grpc_connectivity_state *state,
grpc_iomgr_closure *closure,
grpc_iomgr_call_list *call_list);
};
#ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG
@ -98,7 +101,8 @@ void grpc_lb_policy_init(grpc_lb_policy *policy,
const grpc_lb_policy_vtable *vtable);
/** Start shutting down (fail any pending picks) */
void grpc_lb_policy_shutdown(grpc_lb_policy *policy);
void grpc_lb_policy_shutdown(grpc_lb_policy *policy,
grpc_iomgr_call_list *call_list);
/** Given initial metadata in \a initial_metadata, find an appropriate
target for this rpc, and 'return' it by calling \a on_complete after setting
@ -107,18 +111,21 @@ void grpc_lb_policy_shutdown(grpc_lb_policy *policy);
void grpc_lb_policy_pick(grpc_lb_policy *policy, grpc_pollset *pollset,
grpc_metadata_batch *initial_metadata,
grpc_subchannel **target,
grpc_iomgr_closure *on_complete);
grpc_iomgr_closure *on_complete,
grpc_iomgr_call_list *call_list);
void grpc_lb_policy_broadcast(grpc_lb_policy *policy, grpc_transport_op *op);
void grpc_lb_policy_broadcast(grpc_lb_policy *policy, grpc_transport_op *op,
grpc_iomgr_call_list *call_list);
void grpc_lb_policy_exit_idle(grpc_lb_policy *policy);
void grpc_lb_policy_exit_idle(grpc_lb_policy *policy,
grpc_iomgr_call_list *call_list);
grpc_connectivity_state_notify_on_state_change_result
grpc_lb_policy_notify_on_state_change(
grpc_lb_policy *policy, grpc_connectivity_state *state,
grpc_iomgr_closure *closure) GRPC_MUST_USE_RESULT;
void grpc_lb_policy_notify_on_state_change(grpc_lb_policy *policy,
grpc_connectivity_state *state,
grpc_iomgr_closure *closure,
grpc_iomgr_call_list *call_list);
grpc_connectivity_state grpc_lb_policy_check_connectivity(
grpc_lb_policy *policy);
grpc_lb_policy *policy, grpc_iomgr_call_list *call_list);
#endif /* GRPC_INTERNAL_CORE_CONFIG_LB_POLICY_H */

@ -146,7 +146,8 @@ struct grpc_subchannel_call {
static grpc_subchannel_call *create_call(connection *con);
static void connectivity_state_changed_locked(grpc_subchannel *c,
const char *reason);
const char *reason,
grpc_iomgr_call_list *call_list);
static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c);
static gpr_timespec compute_connect_deadline(grpc_subchannel *c);
static void subchannel_connected(void *subchannel, int iomgr_success);
@ -333,16 +334,18 @@ static void start_connect(grpc_subchannel *c) {
static void continue_creating_call(void *arg, int iomgr_success) {
waiting_for_connect *w4c = arg;
grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
grpc_subchannel_del_interested_party(w4c->subchannel, w4c->pollset);
grpc_subchannel_create_call(w4c->subchannel, w4c->pollset, w4c->target,
w4c->notify);
w4c->notify, &call_list);
GRPC_SUBCHANNEL_UNREF(w4c->subchannel, "waiting_for_connect");
gpr_free(w4c);
}
void grpc_subchannel_create_call(grpc_subchannel *c, grpc_pollset *pollset,
grpc_subchannel_call **target,
grpc_iomgr_closure *notify) {
grpc_iomgr_closure *notify,
grpc_iomgr_call_list *call_list) {
connection *con;
gpr_mu_lock(&c->mu);
if (c->active != NULL) {
@ -365,15 +368,12 @@ void grpc_subchannel_create_call(grpc_subchannel *c, grpc_pollset *pollset,
c->waiting = w4c;
grpc_subchannel_add_interested_party(c, pollset);
if (!c->connecting) {
grpc_connectivity_state_flusher f;
c->connecting = 1;
connectivity_state_changed_locked(c, "create_call");
connectivity_state_changed_locked(c, "create_call", call_list);
/* released by connection */
SUBCHANNEL_REF_LOCKED(c, "connecting");
GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting");
grpc_connectivity_state_begin_flush(&c->state_tracker, &f);
gpr_mu_unlock(&c->mu);
grpc_connectivity_state_end_flush(&f);
start_connect(c);
} else {
@ -390,33 +390,26 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) {
return state;
}
grpc_connectivity_state_notify_on_state_change_result
grpc_subchannel_notify_on_state_change(grpc_subchannel *c,
grpc_connectivity_state *state,
grpc_iomgr_closure *notify) {
void grpc_subchannel_notify_on_state_change(grpc_subchannel *c,
grpc_connectivity_state *state,
grpc_iomgr_closure *notify,
grpc_iomgr_call_list *call_list) {
int do_connect = 0;
grpc_connectivity_state_notify_on_state_change_result r;
gpr_mu_lock(&c->mu);
r = grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state,
notify);
if (r.current_state_is_idle) {
grpc_connectivity_state_flusher f;
if (grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state,
notify, call_list)) {
do_connect = 1;
c->connecting = 1;
/* released by connection */
SUBCHANNEL_REF_LOCKED(c, "connecting");
GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting");
connectivity_state_changed_locked(c, "state_change");
grpc_connectivity_state_begin_flush(&c->state_tracker, &f);
gpr_mu_unlock(&c->mu);
grpc_connectivity_state_end_flush(&f);
} else {
gpr_mu_unlock(&c->mu);
connectivity_state_changed_locked(c, "state_change", call_list);
}
gpr_mu_unlock(&c->mu);
if (do_connect) {
start_connect(c);
}
return r;
}
void grpc_subchannel_process_transport_op(grpc_subchannel *c,
@ -424,24 +417,20 @@ void grpc_subchannel_process_transport_op(grpc_subchannel *c,
connection *con = NULL;
grpc_subchannel *destroy;
int cancel_alarm = 0;
grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
gpr_mu_lock(&c->mu);
if (c->active != NULL) {
con = c->active;
CONNECTION_REF_LOCKED(con, "transport-op");
}
if (op->disconnect) {
grpc_connectivity_state_flusher f;
c->disconnected = 1;
connectivity_state_changed_locked(c, "disconnect");
connectivity_state_changed_locked(c, "disconnect", &call_list);
if (c->have_alarm) {
cancel_alarm = 1;
}
grpc_connectivity_state_begin_flush(&c->state_tracker, &f);
gpr_mu_unlock(&c->mu);
grpc_connectivity_state_end_flush(&f);
} else {
gpr_mu_unlock(&c->mu);
}
gpr_mu_unlock(&c->mu);
if (con != NULL) {
grpc_channel_stack *channel_stack = CHANNEL_STACK_FROM_CONNECTION(con);
@ -464,6 +453,8 @@ void grpc_subchannel_process_transport_op(grpc_subchannel *c,
if (op->disconnect) {
grpc_connector_shutdown(c->connector);
}
grpc_iomgr_call_list_run(call_list);
}
static void on_state_changed(void *p, int iomgr_success) {
@ -474,7 +465,7 @@ static void on_state_changed(void *p, int iomgr_success) {
grpc_transport_op op;
grpc_channel_element *elem;
connection *destroy_connection = NULL;
grpc_connectivity_state_flusher f;
grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
gpr_mu_lock(mu);
@ -508,26 +499,26 @@ static void on_state_changed(void *p, int iomgr_success) {
grpc_connectivity_state_set(
&c->state_tracker, c->disconnected ? GRPC_CHANNEL_FATAL_FAILURE
: GRPC_CHANNEL_TRANSIENT_FAILURE,
"connection_failed");
"connection_failed", &call_list);
break;
}
done:
connectivity_state_changed_locked(c, "transport_state_changed");
connectivity_state_changed_locked(c, "transport_state_changed", &call_list);
destroy = SUBCHANNEL_UNREF_LOCKED(c, "state_watcher");
gpr_free(sw);
grpc_connectivity_state_begin_flush(&c->state_tracker, &f);
gpr_mu_unlock(mu);
grpc_connectivity_state_end_flush(&f);
if (destroy) {
subchannel_destroy(c);
}
if (destroy_connection != NULL) {
connection_destroy(destroy_connection);
}
grpc_iomgr_call_list_run(call_list);
}
static void publish_transport(grpc_subchannel *c) {
static void publish_transport(grpc_subchannel *c,
grpc_iomgr_call_list *call_list) {
size_t channel_stack_size;
connection *con;
grpc_channel_stack *stk;
@ -538,7 +529,6 @@ static void publish_transport(grpc_subchannel *c) {
state_watcher *sw;
connection *destroy_connection = NULL;
grpc_channel_element *elem;
grpc_connectivity_state_flusher f;
/* build final filter list */
num_filters = c->num_filters + c->connecting_result.num_filters + 1;
@ -601,17 +591,15 @@ static void publish_transport(grpc_subchannel *c) {
elem->filter->start_transport_op(elem, &op);
/* signal completion */
connectivity_state_changed_locked(c, "connected");
connectivity_state_changed_locked(c, "connected", call_list);
w4c = c->waiting;
c->waiting = NULL;
grpc_connectivity_state_begin_flush(&c->state_tracker, &f);
gpr_mu_unlock(&c->mu);
grpc_connectivity_state_end_flush(&f);
while (w4c != NULL) {
waiting_for_connect *next = w4c;
w4c->continuation.cb(w4c->continuation.cb_arg, 1);
grpc_iomgr_call_list_add(call_list, &w4c->continuation, 1);
w4c = next;
}
@ -653,16 +641,14 @@ static void update_reconnect_parameters(grpc_subchannel *c) {
static void on_alarm(void *arg, int iomgr_success) {
grpc_subchannel *c = arg;
grpc_connectivity_state_flusher f;
grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
gpr_mu_lock(&c->mu);
c->have_alarm = 0;
if (c->disconnected) {
iomgr_success = 0;
}
connectivity_state_changed_locked(c, "alarm");
grpc_connectivity_state_begin_flush(&c->state_tracker, &f);
connectivity_state_changed_locked(c, "alarm", &call_list);
gpr_mu_unlock(&c->mu);
grpc_connectivity_state_end_flush(&f);
if (iomgr_success) {
update_reconnect_parameters(c);
continue_connect(c);
@ -670,24 +656,24 @@ static void on_alarm(void *arg, int iomgr_success) {
GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting");
GRPC_SUBCHANNEL_UNREF(c, "connecting");
}
grpc_iomgr_call_list_run(call_list);
}
static void subchannel_connected(void *arg, int iomgr_success) {
grpc_subchannel *c = arg;
grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
if (c->connecting_result.transport != NULL) {
publish_transport(c);
publish_transport(c, &call_list);
} else {
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
grpc_connectivity_state_flusher f;
gpr_mu_lock(&c->mu);
GPR_ASSERT(!c->have_alarm);
c->have_alarm = 1;
connectivity_state_changed_locked(c, "connect_failed");
connectivity_state_changed_locked(c, "connect_failed", &call_list);
grpc_alarm_init(&c->alarm, c->next_attempt, on_alarm, c, now);
grpc_connectivity_state_begin_flush(&c->state_tracker, &f);
gpr_mu_unlock(&c->mu);
grpc_connectivity_state_end_flush(&f);
}
grpc_iomgr_call_list_run(call_list);
}
static gpr_timespec compute_connect_deadline(grpc_subchannel *c) {
@ -718,9 +704,10 @@ static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c) {
}
static void connectivity_state_changed_locked(grpc_subchannel *c,
const char *reason) {
const char *reason,
grpc_iomgr_call_list *call_list) {
grpc_connectivity_state current = compute_connectivity_locked(c);
grpc_connectivity_state_set(&c->state_tracker, current, reason);
grpc_connectivity_state_set(&c->state_tracker, current, reason, call_list);
}
/*

@ -76,7 +76,8 @@ void grpc_subchannel_call_unref(
void grpc_subchannel_create_call(grpc_subchannel *subchannel,
grpc_pollset *pollset,
grpc_subchannel_call **target,
grpc_iomgr_closure *notify);
grpc_iomgr_closure *notify,
grpc_iomgr_call_list *call_list);
/** process a transport level op */
void grpc_subchannel_process_transport_op(grpc_subchannel *subchannel,
@ -88,10 +89,10 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(
/** call notify when the connectivity state of a channel changes from *state.
Updates *state with the new state of the channel */
grpc_connectivity_state_notify_on_state_change_result
grpc_subchannel_notify_on_state_change(
grpc_subchannel *channel, grpc_connectivity_state *state,
grpc_iomgr_closure *notify) GRPC_MUST_USE_RESULT;
void grpc_subchannel_notify_on_state_change(grpc_subchannel *channel,
grpc_connectivity_state *state,
grpc_iomgr_closure *notify,
grpc_iomgr_call_list *call_list);
/** express interest in \a channel's activities through \a pollset. */
void grpc_subchannel_add_interested_party(grpc_subchannel *channel,

@ -157,3 +157,42 @@ void grpc_iomgr_closure_init(grpc_iomgr_closure *closure, grpc_iomgr_cb_func cb,
closure->cb_arg = cb_arg;
closure->next = NULL;
}
/** Append \a closure to the tail of \a call_list, recording \a success as the
    value that will be passed to the callback when the list is run.
    A NULL \a closure is ignored. */
void grpc_iomgr_call_list_add(grpc_iomgr_call_list *call_list,
                              grpc_iomgr_closure *closure, int success) {
  if (closure == NULL) return;
  closure->success = success;
  closure->next = NULL;
  if (call_list->head == NULL) {
    /* first element: becomes both head and tail */
    call_list->head = closure;
  } else {
    call_list->tail->next = closure;
  }
  call_list->tail = closure;
}
/** Execute every closure queued on \a call_list in insertion (FIFO) order,
    passing each its recorded success flag.  The list is taken by value and
    is consumed; the caller's copy must not be reused without
    reinitialization. */
void grpc_iomgr_call_list_run(grpc_iomgr_call_list call_list) {
  grpc_iomgr_closure *cur;
  grpc_iomgr_closure *nxt;
  for (cur = call_list.head; cur != NULL; cur = nxt) {
    /* capture the link first: the callback may requeue or free cur */
    nxt = cur->next;
    cur->cb(cur->cb_arg, cur->success);
  }
}
/** Return nonzero iff \a call_list holds no pending closures. */
int grpc_iomgr_call_list_empty(grpc_iomgr_call_list call_list) {
  return call_list.head == NULL ? 1 : 0;
}
/** Splice every closure queued on \a src onto the tail of \a dst, leaving
    \a src empty.
    Fix: the original left \a src untouched after the transfer (in both the
    fast path that assigned *dst = *src and the append path), so a caller
    that subsequently ran or re-moved \a src would execute and relink the
    same closures a second time.  A "move" must drain its source. */
void grpc_iomgr_call_list_move(grpc_iomgr_call_list *src,
                               grpc_iomgr_call_list *dst) {
  if (src->head == NULL) {
    /* nothing to move */
    return;
  }
  if (dst->head == NULL) {
    /* destination empty: take over the whole source list */
    *dst = *src;
  } else {
    dst->tail->next = src->head;
    dst->tail = src->tail;
  }
  src->head = NULL;
  src->tail = NULL;
}

@ -58,10 +58,25 @@ typedef struct grpc_iomgr_closure {
struct grpc_iomgr_closure *next;
} grpc_iomgr_closure;
typedef struct grpc_iomgr_call_list {
grpc_iomgr_closure *head;
grpc_iomgr_closure *tail;
} grpc_iomgr_call_list;
/** Initializes \a closure with \a cb and \a cb_arg. */
void grpc_iomgr_closure_init(grpc_iomgr_closure *closure, grpc_iomgr_cb_func cb,
void *cb_arg);
#define GRPC_IOMGR_CALL_LIST_INIT \
{ NULL, NULL }
void grpc_iomgr_call_list_add(grpc_iomgr_call_list *list,
grpc_iomgr_closure *closure, int success);
void grpc_iomgr_call_list_run(grpc_iomgr_call_list list);
void grpc_iomgr_call_list_move(grpc_iomgr_call_list *src,
grpc_iomgr_call_list *dst);
int grpc_iomgr_call_list_empty(grpc_iomgr_call_list list);
/** Initializes the iomgr. */
void grpc_iomgr_init(void);

@ -127,7 +127,7 @@ static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset,
GRPC_FD_REF(fd, "delayed_add");
grpc_iomgr_closure_init(&da->closure, perform_delayed_add, da);
pollset->in_flight_cbs++;
grpc_workqueue_push(fd->workqueue, &da->closure, 1);
grpc_pollset_add_unlock_job(pollset, &da->closure);
}
}

@ -136,6 +136,8 @@ void grpc_pollset_init(grpc_pollset *pollset) {
pollset->in_flight_cbs = 0;
pollset->shutting_down = 0;
pollset->called_shutdown = 0;
pollset->idle_jobs = NULL;
pollset->unlock_jobs = NULL;
become_basic_pollset(pollset, NULL);
}
@ -145,7 +147,6 @@ void grpc_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) {
}
gpr_mu_lock(&pollset->mu);
pollset->vtable->add_fd(pollset, fd, 1);
grpc_workqueue_flush(fd->workqueue, 0);
/* the following (enabled only in debug) will reacquire and then release
our lock - meaning that if the unlocking flag passed to del_fd above is
not respected, the code will deadlock (in a way that we have a chance of
@ -174,6 +175,33 @@ static void finish_shutdown(grpc_pollset *pollset) {
pollset->shutdown_done_cb(pollset->shutdown_done_arg);
}
/* Detach the entire job list rooted at *root, then run each closure with
   success=1 while the pollset lock is released.  pollset->mu must be held
   on entry; it is dropped around the callbacks (which may re-enter the
   pollset) and reacquired before returning. */
static void run_jobs(grpc_pollset *pollset, grpc_iomgr_closure **root) {
  grpc_iomgr_closure *job = *root;
  *root = NULL;
  gpr_mu_unlock(&pollset->mu);
  while (job != NULL) {
    grpc_iomgr_closure *rest = job->next;
    job->cb(job->cb_arg, 1);
    job = rest;
  }
  gpr_mu_lock(&pollset->mu);
}
/* Push \a closure onto the front of the singly linked job stack rooted at
   *root (LIFO order; run_jobs executes most-recently-added first). */
static void add_job(grpc_iomgr_closure **root, grpc_iomgr_closure *closure) {
  grpc_iomgr_closure *old_head = *root;
  closure->next = old_head;
  *root = closure;
}
/* Schedule \a closure to run the next time \a pollset has no active
   workers (see grpc_pollset_work).  Caller must hold pollset->mu. */
void grpc_pollset_add_idle_job(grpc_pollset *pollset,
                               grpc_iomgr_closure *closure) {
  closure->next = pollset->idle_jobs;
  pollset->idle_jobs = closure;
}
/* Schedule \a closure to run the next time \a pollset drops its lock
   inside grpc_pollset_work.  Caller must hold pollset->mu. */
void grpc_pollset_add_unlock_job(grpc_pollset *pollset,
                                 grpc_iomgr_closure *closure) {
  closure->next = pollset->unlock_jobs;
  pollset->unlock_jobs = closure;
}
void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
gpr_timespec now, gpr_timespec deadline) {
/* pollset->mu already held */
@ -182,6 +210,14 @@ void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
worker->next = worker->prev = NULL;
/* TODO(ctiller): pool these */
grpc_wakeup_fd_init(&worker->wakeup_fd);
if (!grpc_pollset_has_workers(pollset) && pollset->idle_jobs != NULL) {
run_jobs(pollset, &pollset->idle_jobs);
goto done;
}
if (pollset->unlock_jobs != NULL) {
run_jobs(pollset, &pollset->unlock_jobs);
goto done;
}
if (grpc_alarm_check(&pollset->mu, now, &deadline)) {
goto done;
}
@ -301,12 +337,7 @@ static void basic_do_promote(void *args, int success) {
gpr_mu_lock(&pollset->mu);
/* First we need to ensure that nobody is polling concurrently */
if (grpc_pollset_has_workers(pollset)) {
grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
grpc_workqueue_push(fd->workqueue, &up_args->promotion_closure, 1);
gpr_mu_unlock(&pollset->mu);
return;
}
GPR_ASSERT(!grpc_pollset_has_workers(pollset));
gpr_free(up_args);
/* At this point the pollset may no longer be a unary poller. In that case
@ -390,13 +421,12 @@ static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd,
GRPC_FD_REF(fd, "basicpoll_add");
pollset->in_flight_cbs++;
up_args = gpr_malloc(sizeof(*up_args));
up_args->pollset = pollset;
up_args->fd = fd;
up_args->original_vtable = pollset->vtable;
up_args->promotion_closure.cb = basic_do_promote;
up_args->promotion_closure.cb_arg = up_args;
grpc_workqueue_push(fd->workqueue, &up_args->promotion_closure, 1);
grpc_pollset_add_idle_job(pollset, &up_args->promotion_closure);
grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
exit:

@ -37,6 +37,7 @@
#include <poll.h>
#include <grpc/support/sync.h>
#include "src/core/iomgr/iomgr.h"
#include "src/core/iomgr/wakeup_fd_posix.h"
typedef struct grpc_pollset_vtable grpc_pollset_vtable;
@ -66,6 +67,8 @@ typedef struct grpc_pollset {
int kicked_without_pollers;
void (*shutdown_done_cb)(void *arg);
void *shutdown_done_arg;
grpc_iomgr_closure *unlock_jobs;
grpc_iomgr_closure *idle_jobs;
union {
int fd;
void *ptr;
@ -124,4 +127,11 @@ int grpc_pollset_has_workers(grpc_pollset *pollset);
typedef int (*grpc_poll_function_type)(struct pollfd *, nfds_t, int);
extern grpc_poll_function_type grpc_poll_function;
/** schedule a closure to be run next time there are no active workers */
void grpc_pollset_add_idle_job(grpc_pollset *pollset,
grpc_iomgr_closure *closure);
/** schedule a closure to be run next time the pollset is unlocked */
void grpc_pollset_add_unlock_job(grpc_pollset *pollset,
grpc_iomgr_closure *closure);
#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H */

@ -77,7 +77,6 @@ struct grpc_channel {
gpr_mu registered_call_mu;
registered_call *registered_calls;
grpc_iomgr_closure destroy_closure;
char *target;
grpc_workqueue *workqueue;
};
@ -273,8 +272,7 @@ void grpc_channel_internal_ref(grpc_channel *c) {
gpr_ref(&c->refs);
}
static void destroy_channel(void *p, int ok) {
grpc_channel *channel = p;
static void destroy_channel(grpc_channel *channel) {
size_t i;
grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CHANNEL(channel));
for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) {
@ -312,9 +310,7 @@ void grpc_channel_internal_unref(grpc_channel *channel, const char *reason) {
void grpc_channel_internal_unref(grpc_channel *channel) {
#endif
if (gpr_unref(&channel->refs)) {
channel->destroy_closure.cb = destroy_channel;
channel->destroy_closure.cb_arg = channel;
grpc_workqueue_push(channel->workqueue, &channel->destroy_closure, 1);
destroy_channel(channel);
}
}

@ -45,6 +45,8 @@ grpc_connectivity_state grpc_channel_check_connectivity_state(
/* forward through to the underlying client channel */
grpc_channel_element *client_channel_elem =
grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
grpc_connectivity_state state;
if (client_channel_elem->filter != &grpc_client_channel_filter) {
gpr_log(GPR_ERROR,
"grpc_channel_check_connectivity_state called on something that is "
@ -52,8 +54,10 @@ grpc_connectivity_state grpc_channel_check_connectivity_state(
client_channel_elem->filter->name);
return GRPC_CHANNEL_FATAL_FAILURE;
}
return grpc_client_channel_check_connectivity_state(client_channel_elem,
try_to_connect);
state = grpc_client_channel_check_connectivity_state(
client_channel_elem, try_to_connect, &call_list);
grpc_iomgr_call_list_run(call_list);
return state;
}
typedef enum {
@ -154,6 +158,7 @@ void grpc_channel_watch_connectivity_state(
gpr_timespec deadline, grpc_completion_queue *cq, void *tag) {
grpc_channel_element *client_channel_elem =
grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
state_watcher *w = gpr_malloc(sizeof(*w));
grpc_cq_begin_op(cq);
@ -176,13 +181,14 @@ void grpc_channel_watch_connectivity_state(
"grpc_channel_watch_connectivity_state called on something that is "
"not a client channel, but '%s'",
client_channel_elem->filter->name);
grpc_workqueue_push(grpc_channel_get_workqueue(channel), &w->on_complete,
1);
grpc_iomgr_call_list_add(&call_list, &w->on_complete, 1);
} else {
GRPC_CHANNEL_INTERNAL_REF(channel, "watch_connectivity");
grpc_client_channel_add_interested_party(client_channel_elem,
grpc_cq_pollset(cq));
grpc_client_channel_watch_connectivity_state(client_channel_elem, &w->state,
&w->on_complete);
&w->on_complete, &call_list);
}
grpc_iomgr_call_list_run(call_list);
}

@ -164,8 +164,7 @@ typedef struct {
/** data to write next write */
gpr_slice_buffer qbuf;
/** queued callbacks */
grpc_iomgr_closure *pending_closures_head;
grpc_iomgr_closure *pending_closures_tail;
grpc_iomgr_call_list run_at_unlock;
/** window available for us to send to peer */
gpr_int64 outgoing_window;
@ -568,11 +567,6 @@ int grpc_chttp2_list_pop_read_write_state_changed(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global);
/** schedule a closure to run without the transport lock taken */
void grpc_chttp2_schedule_closure(
grpc_chttp2_transport_global *transport_global, grpc_iomgr_closure *closure,
int success);
grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream(
grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id);
grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream(

@ -238,8 +238,8 @@ void grpc_chttp2_cleanup_writing(
stream_global->outgoing_sopb->nops == 0) {
GPR_ASSERT(stream_global->write_state != GRPC_WRITE_STATE_QUEUED_CLOSE);
stream_global->outgoing_sopb = NULL;
grpc_chttp2_schedule_closure(transport_global,
stream_global->send_done_closure, 1);
grpc_iomgr_call_list_add(&transport_global->run_at_unlock,
stream_global->send_done_closure, 1);
}
}
stream_global->writing_now = 0;

@ -499,32 +499,20 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream(
static void lock(grpc_chttp2_transport *t) { gpr_mu_lock(&t->mu); }
static void unlock(grpc_chttp2_transport *t) {
grpc_iomgr_closure *run_closures;
grpc_connectivity_state_flusher f;
grpc_iomgr_call_list run = GRPC_IOMGR_CALL_LIST_INIT;
unlock_check_read_write_state(t);
if (!t->writing_active && !t->closed &&
grpc_chttp2_unlocking_check_writes(&t->global, &t->writing)) {
t->writing_active = 1;
REF_TRANSPORT(t, "writing");
grpc_chttp2_schedule_closure(&t->global, &t->writing_action, 1);
grpc_iomgr_call_list_add(&t->global.run_at_unlock, &t->writing_action, 1);
prevent_endpoint_shutdown(t);
}
run_closures = t->global.pending_closures_head;
t->global.pending_closures_head = NULL;
t->global.pending_closures_tail = NULL;
grpc_connectivity_state_begin_flush(&t->channel_callback.state_tracker, &f);
GPR_SWAP(grpc_iomgr_call_list, run, t->global.run_at_unlock);
gpr_mu_unlock(&t->mu);
grpc_connectivity_state_end_flush(&f);
while (run_closures) {
grpc_iomgr_closure *next = run_closures->next;
run_closures->cb(run_closures->cb_arg, run_closures->success);
run_closures = next;
}
grpc_iomgr_call_list_run(run);
}
/*
@ -676,8 +664,8 @@ static void perform_stream_op_locked(
}
} else {
grpc_sopb_reset(op->send_ops);
grpc_chttp2_schedule_closure(transport_global,
stream_global->send_done_closure, 0);
grpc_iomgr_call_list_add(&transport_global->run_at_unlock,
stream_global->send_done_closure, 0);
}
}
@ -715,9 +703,8 @@ static void perform_stream_op_locked(
op->bind_pollset);
}
if (op->on_consumed) {
grpc_chttp2_schedule_closure(transport_global, op->on_consumed, 1);
}
grpc_iomgr_call_list_add(&transport_global->run_at_unlock, op->on_consumed,
1);
}
static void perform_stream_op(grpc_transport *gt, grpc_stream *gs,
@ -754,18 +741,12 @@ static void perform_transport_op(grpc_transport *gt, grpc_transport_op *op) {
lock(t);
if (op->on_consumed) {
grpc_chttp2_schedule_closure(&t->global, op->on_consumed, 1);
}
grpc_iomgr_call_list_add(&t->global.run_at_unlock, op->on_consumed, 1);
if (op->on_connectivity_state_change) {
if (grpc_connectivity_state_notify_on_state_change(
&t->channel_callback.state_tracker, op->connectivity_state,
op->on_connectivity_state_change)
.state_already_changed) {
grpc_chttp2_schedule_closure(&t->global, op->on_connectivity_state_change,
1);
}
grpc_connectivity_state_notify_on_state_change(
&t->channel_callback.state_tracker, op->connectivity_state,
op->on_connectivity_state_change, &t->global.run_at_unlock);
}
if (op->send_goaway) {
@ -887,8 +868,8 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) {
if (stream_global->outgoing_sopb != NULL) {
grpc_sopb_reset(stream_global->outgoing_sopb);
stream_global->outgoing_sopb = NULL;
grpc_chttp2_schedule_closure(transport_global,
stream_global->send_done_closure, 1);
grpc_iomgr_call_list_add(&transport_global->run_at_unlock,
stream_global->send_done_closure, 1);
}
stream_global->read_closed = 1;
if (!stream_global->published_cancelled) {
@ -938,8 +919,8 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) {
&stream_global->outstanding_metadata);
grpc_sopb_swap(stream_global->publish_sopb, &stream_global->incoming_sopb);
stream_global->published_state = *stream_global->publish_state = state;
grpc_chttp2_schedule_closure(transport_global,
stream_global->recv_done_closure, 1);
grpc_iomgr_call_list_add(&transport_global->run_at_unlock,
stream_global->recv_done_closure, 1);
stream_global->recv_done_closure = NULL;
stream_global->publish_sopb = NULL;
stream_global->publish_state = NULL;
@ -1200,21 +1181,7 @@ static void connectivity_state_set(
gpr_log(GPR_DEBUG, "set connectivity_state=%d", state));
grpc_connectivity_state_set(
&TRANSPORT_FROM_GLOBAL(transport_global)->channel_callback.state_tracker,
state, reason);
}
void grpc_chttp2_schedule_closure(
grpc_chttp2_transport_global *transport_global, grpc_iomgr_closure *closure,
int success) {
closure->success = success;
if (transport_global->pending_closures_tail == NULL) {
transport_global->pending_closures_head =
transport_global->pending_closures_tail = closure;
} else {
transport_global->pending_closures_tail->next = closure;
transport_global->pending_closures_tail = closure;
}
closure->next = NULL;
state, reason, &transport_global->run_at_unlock);
}
/*

@ -64,7 +64,6 @@ void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker,
tracker->current_state = init_state;
tracker->watchers = NULL;
tracker->name = gpr_strdup(name);
tracker->changed = 0;
}
void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker) {
@ -90,20 +89,17 @@ grpc_connectivity_state grpc_connectivity_state_check(
return tracker->current_state;
}
grpc_connectivity_state_notify_on_state_change_result
grpc_connectivity_state_notify_on_state_change(
int grpc_connectivity_state_notify_on_state_change(
grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current,
grpc_iomgr_closure *notify) {
grpc_connectivity_state_notify_on_state_change_result result;
memset(&result, 0, sizeof(result));
grpc_iomgr_closure *notify, grpc_iomgr_call_list *call_list) {
if (grpc_connectivity_state_trace) {
gpr_log(GPR_DEBUG, "CONWATCH: %s: from %s [cur=%s]", tracker->name,
grpc_connectivity_state_name(*current),
grpc_connectivity_state_name(tracker->current_state));
gpr_log(GPR_DEBUG, "CONWATCH: %s: from %s [cur=%s] notify=%p",
tracker->name, grpc_connectivity_state_name(*current),
grpc_connectivity_state_name(tracker->current_state), notify);
}
if (tracker->current_state != *current) {
*current = tracker->current_state;
result.state_already_changed = 1;
grpc_iomgr_call_list_add(call_list, notify, 1);
} else {
grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w));
w->current = current;
@ -111,13 +107,14 @@ grpc_connectivity_state_notify_on_state_change(
w->next = tracker->watchers;
tracker->watchers = w;
}
result.current_state_is_idle = tracker->current_state == GRPC_CHANNEL_IDLE;
return result;
return tracker->current_state == GRPC_CHANNEL_IDLE;
}
void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker,
grpc_connectivity_state state,
const char *reason) {
const char *reason,
grpc_iomgr_call_list *call_list) {
grpc_connectivity_state_watcher *w;
if (grpc_connectivity_state_trace) {
gpr_log(GPR_DEBUG, "SET: %s: %s --> %s [%s]", tracker->name,
grpc_connectivity_state_name(tracker->current_state),
@ -128,40 +125,11 @@ void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker,
}
GPR_ASSERT(tracker->current_state != GRPC_CHANNEL_FATAL_FAILURE);
tracker->current_state = state;
tracker->changed = 1;
}
void grpc_connectivity_state_begin_flush(
grpc_connectivity_state_tracker *tracker,
grpc_connectivity_state_flusher *flusher) {
grpc_connectivity_state_watcher *w;
flusher->cbs = NULL;
if (!tracker->changed) return;
w = tracker->watchers;
tracker->watchers = NULL;
while (w != NULL) {
grpc_connectivity_state_watcher *next = w->next;
if (tracker->current_state != *w->current) {
*w->current = tracker->current_state;
w->notify->next = flusher->cbs;
flusher->cbs = w->notify;
gpr_free(w);
} else {
w->next = tracker->watchers;
tracker->watchers = w;
}
w = next;
}
tracker->changed = 0;
}
void grpc_connectivity_state_end_flush(
grpc_connectivity_state_flusher *flusher) {
grpc_iomgr_closure *c = flusher->cbs;
while (c != NULL) {
grpc_iomgr_closure *next = c;
c->cb(c->cb_arg, 1);
c = next;
while ((w = tracker->watchers) != NULL) {
*w->current = tracker->current_state;
tracker->watchers = w->next;
grpc_iomgr_call_list_add(call_list, w->notify, 1);
gpr_free(w);
}
}

@ -54,12 +54,8 @@ typedef struct {
grpc_connectivity_state_watcher *watchers;
/** a name to help debugging */
char *name;
/** has this state been changed since the last flush? */
int changed;
} grpc_connectivity_state_tracker;
typedef struct { grpc_iomgr_closure *cbs; } grpc_connectivity_state_flusher;
extern int grpc_connectivity_state_trace;
void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker,
@ -71,35 +67,15 @@ void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker);
* external lock */
void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker,
grpc_connectivity_state state,
const char *reason);
/** Begin flushing callbacks; not thread safe; access must be serialized using
* the same external lock as grpc_connectivity_state_set. Initializes flusher.
*/
void grpc_connectivity_state_begin_flush(
grpc_connectivity_state_tracker *tracker,
grpc_connectivity_state_flusher *flusher);
/** Complete flushing updates: must not be called with any locks held */
void grpc_connectivity_state_end_flush(
grpc_connectivity_state_flusher *flusher);
const char *reason,
grpc_iomgr_call_list *call_list);
grpc_connectivity_state grpc_connectivity_state_check(
grpc_connectivity_state_tracker *tracker);
typedef struct {
/** 1 if the current state is idle (a hint to begin connecting), 0 otherwise
*/
int current_state_is_idle;
/** 1 if the state has already changed: in this case the closure passed to
* grpc_connectivity_state_notify_on_state_change will not be called */
int state_already_changed;
} grpc_connectivity_state_notify_on_state_change_result;
/** Return 1 if the channel should start connecting, 0 otherwise */
grpc_connectivity_state_notify_on_state_change_result
grpc_connectivity_state_notify_on_state_change(
int grpc_connectivity_state_notify_on_state_change(
grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current,
grpc_iomgr_closure *notify) GRPC_MUST_USE_RESULT;
grpc_iomgr_closure *notify, grpc_iomgr_call_list *call_list);
#endif /* GRPC_INTERNAL_CORE_TRANSPORT_CONNECTIVITY_STATE_H */

Loading…
Cancel
Save