Merge pull request #1982 from ctiller/cereal-will-make-you-limp

Split channel level and call level callback serialization
pull/1994/head
David G. Quintas 10 years ago
commit 5ce0469d3f
  1. 66
      src/core/transport/chttp2_transport.c

@ -230,7 +230,10 @@ struct transport {
/* basic state management - what are we doing at the moment? */ /* basic state management - what are we doing at the moment? */
gpr_uint8 reading; gpr_uint8 reading;
gpr_uint8 writing; gpr_uint8 writing;
gpr_uint8 calling_back; /** are we calling back (via cb) with a channel-level event */
gpr_uint8 calling_back_channel;
/** are we calling back any grpc_transport_op completion events */
gpr_uint8 calling_back_ops;
gpr_uint8 destroying; gpr_uint8 destroying;
gpr_uint8 closed; gpr_uint8 closed;
error_state error_state; error_state error_state;
@ -357,7 +360,7 @@ static void push_setting(transport *t, grpc_chttp2_setting_id id,
gpr_uint32 value); gpr_uint32 value);
static int prepare_callbacks(transport *t); static int prepare_callbacks(transport *t);
static void run_callbacks(transport *t, const grpc_transport_callbacks *cb); static void run_callbacks(transport *t);
static void call_cb_closed(transport *t, const grpc_transport_callbacks *cb); static void call_cb_closed(transport *t, const grpc_transport_callbacks *cb);
static int prepare_write(transport *t); static int prepare_write(transport *t);
@ -565,7 +568,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
} }
gpr_mu_lock(&t->mu); gpr_mu_lock(&t->mu);
t->calling_back = 1; t->calling_back_channel = 1;
ref_transport(t); /* matches unref at end of this function */ ref_transport(t); /* matches unref at end of this function */
gpr_mu_unlock(&t->mu); gpr_mu_unlock(&t->mu);
@ -574,7 +577,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
lock(t); lock(t);
t->cb = sr.callbacks; t->cb = sr.callbacks;
t->cb_user_data = sr.user_data; t->cb_user_data = sr.user_data;
t->calling_back = 0; t->calling_back_channel = 0;
if (t->destroying) gpr_cv_signal(&t->cv); if (t->destroying) gpr_cv_signal(&t->cv);
unlock(t); unlock(t);
@ -595,7 +598,7 @@ static void destroy_transport(grpc_transport *gt) {
We need to be not writing as cancellation finalization may produce some We need to be not writing as cancellation finalization may produce some
callbacks that NEED to be made to close out some streams when t->writing callbacks that NEED to be made to close out some streams when t->writing
becomes 0. */ becomes 0. */
while (t->calling_back || t->writing) { while (t->calling_back_channel || t->writing) {
gpr_cv_wait(&t->cv, &t->mu, gpr_inf_future); gpr_cv_wait(&t->cv, &t->mu, gpr_inf_future);
} }
drop_connection(t); drop_connection(t);
@ -830,28 +833,29 @@ static void unlock(transport *t) {
finish_reads(t); finish_reads(t);
/* gather any callbacks that need to be made */ /* gather any callbacks that need to be made */
if (!t->calling_back) { if (!t->calling_back_ops) {
t->calling_back = perform_callbacks = prepare_callbacks(t); t->calling_back_ops = perform_callbacks = prepare_callbacks(t);
if (cb) { if (perform_callbacks) ref_transport(t);
if (t->error_state == ERROR_STATE_SEEN && !t->writing) {
call_closed = 1;
t->calling_back = 1;
t->cb = NULL; /* no more callbacks */
t->error_state = ERROR_STATE_NOTIFIED;
}
if (t->num_pending_goaways) {
goaways = t->pending_goaways;
num_goaways = t->num_pending_goaways;
t->pending_goaways = NULL;
t->num_pending_goaways = 0;
t->cap_pending_goaways = 0;
t->calling_back = 1;
}
}
} }
if (perform_callbacks || call_closed || num_goaways) { if (!t->calling_back_channel && cb) {
ref_transport(t); if (t->error_state == ERROR_STATE_SEEN && !t->writing) {
call_closed = 1;
t->calling_back_channel = 1;
t->cb = NULL; /* no more callbacks */
t->error_state = ERROR_STATE_NOTIFIED;
}
if (t->num_pending_goaways) {
goaways = t->pending_goaways;
num_goaways = t->num_pending_goaways;
t->pending_goaways = NULL;
t->num_pending_goaways = 0;
t->cap_pending_goaways = 0;
t->calling_back_channel = 1;
}
if (call_closed || num_goaways) {
ref_transport(t);
}
} }
/* finally unlock */ /* finally unlock */
@ -865,7 +869,11 @@ static void unlock(transport *t) {
} }
if (perform_callbacks) { if (perform_callbacks) {
run_callbacks(t, cb); run_callbacks(t);
lock(t);
t->calling_back_ops = 0;
unlock(t);
unref_transport(t);
} }
if (call_closed) { if (call_closed) {
@ -878,9 +886,9 @@ static void unlock(transport *t) {
perform_write(t, ep); perform_write(t, ep);
} }
if (perform_callbacks || call_closed || num_goaways) { if (call_closed || num_goaways) {
lock(t); lock(t);
t->calling_back = 0; t->calling_back_channel = 0;
if (t->destroying) gpr_cv_signal(&t->cv); if (t->destroying) gpr_cv_signal(&t->cv);
unlock(t); unlock(t);
unref_transport(t); unref_transport(t);
@ -2101,7 +2109,7 @@ static int prepare_callbacks(transport *t) {
return t->executing_callbacks.count > 0; return t->executing_callbacks.count > 0;
} }
static void run_callbacks(transport *t, const grpc_transport_callbacks *cb) { static void run_callbacks(transport *t) {
size_t i; size_t i;
for (i = 0; i < t->executing_callbacks.count; i++) { for (i = 0; i < t->executing_callbacks.count; i++) {
op_closure c = t->executing_callbacks.callbacks[i]; op_closure c = t->executing_callbacks.callbacks[i];

Loading…
Cancel
Save