Move channel connection pollset to be across all attempts

Slightly increases the amount of polling we have to do, but lets us move
interest registration under the channel lock, simplifying our code (and
fixing a bug)
pull/2105/head
Craig Tiller 10 years ago
parent a603a450a9
commit 4cf08fbd55
  1. 12
      src/core/channel/client_channel.c
  2. 24
      src/core/channel/client_setup.c

@ -102,10 +102,17 @@ struct call_data {
static int prepare_activate(grpc_call_element *elem, static int prepare_activate(grpc_call_element *elem,
grpc_child_channel *on_child) { grpc_child_channel *on_child) {
call_data *calld = elem->call_data; call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
if (calld->state == CALL_CANCELLED) return 0; if (calld->state == CALL_CANCELLED) return 0;
/* no more access to calld->s.waiting allowed */ /* no more access to calld->s.waiting allowed */
GPR_ASSERT(calld->state == CALL_WAITING); GPR_ASSERT(calld->state == CALL_WAITING);
if (calld->s.waiting_op.bind_pollset) {
grpc_transport_setup_del_interested_party(chand->transport_setup,
calld->s.waiting_op.bind_pollset);
}
calld->state = CALL_ACTIVE; calld->state = CALL_ACTIVE;
/* create a child call */ /* create a child call */
@ -199,6 +206,7 @@ static void cc_start_transport_op(grpc_call_element *elem,
handle_op_after_cancellation(elem, op); handle_op_after_cancellation(elem, op);
} else { } else {
calld->state = CALL_WAITING; calld->state = CALL_WAITING;
calld->s.waiting_op.bind_pollset = NULL;
if (chand->active_child) { if (chand->active_child) {
/* channel is connected - use the connected stack */ /* channel is connected - use the connected stack */
if (prepare_activate(elem, chand->active_child)) { if (prepare_activate(elem, chand->active_child)) {
@ -230,14 +238,14 @@ static void cc_start_transport_op(grpc_call_element *elem,
} }
calld->s.waiting_op = *op; calld->s.waiting_op = *op;
chand->waiting_children[chand->waiting_child_count++] = calld; chand->waiting_children[chand->waiting_child_count++] = calld;
grpc_transport_setup_add_interested_party(chand->transport_setup,
op->bind_pollset);
gpr_mu_unlock(&chand->mu); gpr_mu_unlock(&chand->mu);
/* finally initiate transport setup if needed */ /* finally initiate transport setup if needed */
if (initiate_transport_setup) { if (initiate_transport_setup) {
grpc_transport_setup_initiate(chand->transport_setup); grpc_transport_setup_initiate(chand->transport_setup);
} }
grpc_transport_setup_add_interested_party(chand->transport_setup,
op->bind_pollset);
} }
} }
break; break;

@ -56,12 +56,12 @@ struct grpc_client_setup {
gpr_cv cv; gpr_cv cv;
grpc_client_setup_request *active_request; grpc_client_setup_request *active_request;
int refs; int refs;
grpc_pollset_set interested_parties;
}; };
struct grpc_client_setup_request { struct grpc_client_setup_request {
/* pointer back to the setup object */ /* pointer back to the setup object */
grpc_client_setup *setup; grpc_client_setup *setup;
grpc_pollset_set interested_parties;
gpr_timespec deadline; gpr_timespec deadline;
}; };
@ -71,7 +71,7 @@ gpr_timespec grpc_client_setup_request_deadline(grpc_client_setup_request *r) {
grpc_pollset_set *grpc_client_setup_get_interested_parties( grpc_pollset_set *grpc_client_setup_get_interested_parties(
grpc_client_setup_request *r) { grpc_client_setup_request *r) {
return &r->interested_parties; return &r->setup->interested_parties;
} }
static void destroy_setup(grpc_client_setup *s) { static void destroy_setup(grpc_client_setup *s) {
@ -79,11 +79,11 @@ static void destroy_setup(grpc_client_setup *s) {
gpr_cv_destroy(&s->cv); gpr_cv_destroy(&s->cv);
s->done(s->user_data); s->done(s->user_data);
grpc_channel_args_destroy(s->args); grpc_channel_args_destroy(s->args);
grpc_pollset_set_destroy(&s->interested_parties);
gpr_free(s); gpr_free(s);
} }
static void destroy_request(grpc_client_setup_request *r) { static void destroy_request(grpc_client_setup_request *r) {
grpc_pollset_set_destroy(&r->interested_parties);
gpr_free(r); gpr_free(r);
} }
@ -94,7 +94,6 @@ static void setup_initiate(grpc_transport_setup *sp) {
int in_alarm = 0; int in_alarm = 0;
r->setup = s; r->setup = s;
grpc_pollset_set_init(&r->interested_parties);
/* TODO(klempner): Actually set a deadline */ /* TODO(klempner): Actually set a deadline */
r->deadline = gpr_time_add(gpr_now(), gpr_time_from_seconds(60)); r->deadline = gpr_time_add(gpr_now(), gpr_time_from_seconds(60));
@ -125,12 +124,10 @@ static void setup_add_interested_party(grpc_transport_setup *sp,
grpc_client_setup *s = (grpc_client_setup *)sp; grpc_client_setup *s = (grpc_client_setup *)sp;
gpr_mu_lock(&s->mu); gpr_mu_lock(&s->mu);
if (!s->active_request) {
gpr_mu_unlock(&s->mu);
return;
}
grpc_pollset_set_add_pollset(&s->active_request->interested_parties, pollset); gpr_log(GPR_DEBUG, "addip: %p %p", sp, pollset);
grpc_pollset_set_add_pollset(&s->interested_parties, pollset);
gpr_mu_unlock(&s->mu); gpr_mu_unlock(&s->mu);
} }
@ -140,12 +137,10 @@ static void setup_del_interested_party(grpc_transport_setup *sp,
grpc_client_setup *s = (grpc_client_setup *)sp; grpc_client_setup *s = (grpc_client_setup *)sp;
gpr_mu_lock(&s->mu); gpr_mu_lock(&s->mu);
if (!s->active_request) {
gpr_mu_unlock(&s->mu);
return;
}
grpc_pollset_set_del_pollset(&s->active_request->interested_parties, pollset); gpr_log(GPR_DEBUG, "delip: %p %p", sp, pollset);
grpc_pollset_set_del_pollset(&s->interested_parties, pollset);
gpr_mu_unlock(&s->mu); gpr_mu_unlock(&s->mu);
} }
@ -225,6 +220,7 @@ void grpc_client_setup_create_and_attach(
s->in_alarm = 0; s->in_alarm = 0;
s->in_cb = 0; s->in_cb = 0;
s->cancelled = 0; s->cancelled = 0;
grpc_pollset_set_init(&s->interested_parties);
grpc_client_channel_set_transport_setup(newly_minted_channel, &s->base); grpc_client_channel_set_transport_setup(newly_minted_channel, &s->base);
} }

Loading…
Cancel
Save