Merge branch 'master' of https://github.com/grpc/grpc into leaderboard

pull/2151/head
Siddharth Rakesh 10 years ago
commit 9865e0d8a3
  1. 66
      src/core/transport/chttp2_transport.c
  2. 3
      test/cpp/qps/server_async.cc

@ -230,7 +230,10 @@ struct transport {
/* basic state management - what are we doing at the moment? */
gpr_uint8 reading;
gpr_uint8 writing;
gpr_uint8 calling_back;
/** are we calling back (via cb) with a channel-level event */
gpr_uint8 calling_back_channel;
/** are we calling back any grpc_transport_op completion events */
gpr_uint8 calling_back_ops;
gpr_uint8 destroying;
gpr_uint8 closed;
error_state error_state;
@ -357,7 +360,7 @@ static void push_setting(transport *t, grpc_chttp2_setting_id id,
gpr_uint32 value);
static int prepare_callbacks(transport *t);
static void run_callbacks(transport *t, const grpc_transport_callbacks *cb);
static void run_callbacks(transport *t);
static void call_cb_closed(transport *t, const grpc_transport_callbacks *cb);
static int prepare_write(transport *t);
@ -565,7 +568,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
}
gpr_mu_lock(&t->mu);
t->calling_back = 1;
t->calling_back_channel = 1;
ref_transport(t); /* matches unref at end of this function */
gpr_mu_unlock(&t->mu);
@ -574,7 +577,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
lock(t);
t->cb = sr.callbacks;
t->cb_user_data = sr.user_data;
t->calling_back = 0;
t->calling_back_channel = 0;
if (t->destroying) gpr_cv_signal(&t->cv);
unlock(t);
@ -595,7 +598,7 @@ static void destroy_transport(grpc_transport *gt) {
We need to be not writing as cancellation finalization may produce some
callbacks that NEED to be made to close out some streams when t->writing
becomes 0. */
while (t->calling_back || t->writing) {
while (t->calling_back_channel || t->writing) {
gpr_cv_wait(&t->cv, &t->mu, gpr_inf_future);
}
drop_connection(t);
@ -830,28 +833,29 @@ static void unlock(transport *t) {
finish_reads(t);
/* gather any callbacks that need to be made */
if (!t->calling_back) {
t->calling_back = perform_callbacks = prepare_callbacks(t);
if (cb) {
if (t->error_state == ERROR_STATE_SEEN && !t->writing) {
call_closed = 1;
t->calling_back = 1;
t->cb = NULL; /* no more callbacks */
t->error_state = ERROR_STATE_NOTIFIED;
}
if (t->num_pending_goaways) {
goaways = t->pending_goaways;
num_goaways = t->num_pending_goaways;
t->pending_goaways = NULL;
t->num_pending_goaways = 0;
t->cap_pending_goaways = 0;
t->calling_back = 1;
}
}
if (!t->calling_back_ops) {
t->calling_back_ops = perform_callbacks = prepare_callbacks(t);
if (perform_callbacks) ref_transport(t);
}
if (perform_callbacks || call_closed || num_goaways) {
ref_transport(t);
if (!t->calling_back_channel && cb) {
if (t->error_state == ERROR_STATE_SEEN && !t->writing) {
call_closed = 1;
t->calling_back_channel = 1;
t->cb = NULL; /* no more callbacks */
t->error_state = ERROR_STATE_NOTIFIED;
}
if (t->num_pending_goaways) {
goaways = t->pending_goaways;
num_goaways = t->num_pending_goaways;
t->pending_goaways = NULL;
t->num_pending_goaways = 0;
t->cap_pending_goaways = 0;
t->calling_back_channel = 1;
}
if (call_closed || num_goaways) {
ref_transport(t);
}
}
/* finally unlock */
@ -865,7 +869,11 @@ static void unlock(transport *t) {
}
if (perform_callbacks) {
run_callbacks(t, cb);
run_callbacks(t);
lock(t);
t->calling_back_ops = 0;
unlock(t);
unref_transport(t);
}
if (call_closed) {
@ -878,9 +886,9 @@ static void unlock(transport *t) {
perform_write(t, ep);
}
if (perform_callbacks || call_closed || num_goaways) {
if (call_closed || num_goaways) {
lock(t);
t->calling_back = 0;
t->calling_back_channel = 0;
if (t->destroying) gpr_cv_signal(&t->cv);
unlock(t);
unref_transport(t);
@ -2101,7 +2109,7 @@ static int prepare_callbacks(transport *t) {
return t->executing_callbacks.count > 0;
}
static void run_callbacks(transport *t, const grpc_transport_callbacks *cb) {
static void run_callbacks(transport *t) {
size_t i;
for (i = 0; i < t->executing_callbacks.count; i++) {
op_closure c = t->executing_callbacks.callbacks[i];

@ -101,10 +101,11 @@ class AsyncQpsServerTest : public Server {
ServerRpcContext *ctx = detag(got_tag);
// The tag is a pointer to an RPC context to invoke
bool still_going = ctx->RunNextState(ok);
std::lock_guard<std::mutex> g(shutdown_mutex_);
std::unique_lock<std::mutex> g(shutdown_mutex_);
if (!shutdown_) {
// this RPC context is done, so refresh it
if (!still_going) {
g.unlock();
ctx->Reset();
}
} else {

Loading…
Cancel
Save