From 5b881460d24bc930339d1cfd37805a7eadeee5c0 Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Tue, 21 Mar 2017 18:31:29 -0700 Subject: [PATCH] make fewer lock/unlock calls and loop on cv_wait in watch conn state --- src/ruby/ext/grpc/rb_channel.c | 40 +++++++++++------------- src/ruby/spec/channel_connection_spec.rb | 2 +- 2 files changed, 20 insertions(+), 22 deletions(-) diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c index 1fe825efd64..c12ea921c98 100644 --- a/src/ruby/ext/grpc/rb_channel.c +++ b/src/ruby/ext/grpc/rb_channel.c @@ -78,6 +78,7 @@ typedef struct grpc_rb_channel { grpc_connectivity_state current_connectivity_state; int mu_init_done; + int abort_watch_connectivity_state; gpr_mu channel_mu; gpr_cv channel_cv; } grpc_rb_channel; @@ -193,6 +194,7 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) { wrapper->mu_init_done = 1; gpr_mu_lock(&wrapper->channel_mu); + wrapper->abort_watch_connectivity_state = 0; wrapper->current_connectivity_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0); wrapper->safe_to_destroy = 0; wrapper->request_safe_destroy = 0; @@ -242,8 +244,7 @@ static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv, rb_raise(rb_eRuntimeError, "closed!"); return Qnil; } - return LONG2NUM( - grpc_channel_check_connectivity_state(ch, grpc_try_to_connect)); + return LONG2NUM(grpc_channel_check_connectivity_state(wrapper->wrapped, grpc_try_to_connect)); } typedef struct watch_state_stack { @@ -254,39 +255,35 @@ typedef struct watch_state_stack { static void *watch_channel_state_without_gvl(void *arg) { watch_state_stack *stack = (watch_state_stack*)arg; - gpr_timespec deadline = stack->deadline; grpc_rb_channel *wrapper = stack->wrapper; int last_state = stack->last_state; + void *return_value = (void*)0; + gpr_timespec time_check_increment = gpr_time_add( + gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(20, GPR_TIMESPAN)); + gpr_mu_lock(&wrapper->channel_mu); - if (wrapper->current_connectivity_state != last_state) { - gpr_mu_unlock(&wrapper->channel_mu); - return (void*)0; - } - if (wrapper->request_safe_destroy) { - gpr_mu_unlock(&wrapper->channel_mu); - return (void*)0; + while(wrapper->current_connectivity_state == last_state && + !wrapper->request_safe_destroy && + !wrapper->safe_to_destroy && + !wrapper->abort_watch_connectivity_state && + gpr_time_cmp(deadline, gpr_now(GPR_CLOCK_REALTIME)) > 0) { + gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, time_check_increment); } - if (wrapper->safe_to_destroy) { - gpr_mu_unlock(&wrapper->channel_mu); - return (void*)0; - } - - gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, deadline); - if (wrapper->current_connectivity_state != last_state) { - gpr_mu_unlock(&wrapper->channel_mu); - return (void*)1; + return_value = (void*)1; } gpr_mu_unlock(&wrapper->channel_mu); - return (void*)0; + + return return_value; } static void watch_channel_state_unblocking_func(void *arg) { grpc_rb_channel *wrapper = (grpc_rb_channel*)arg; gpr_log(GPR_DEBUG, "GRPC_RUBY: watch channel state unblocking func called"); gpr_mu_lock(&wrapper->channel_mu); + wrapper->abort_watch_connectivity_state = 1; gpr_cv_broadcast(&wrapper->channel_cv); gpr_mu_unlock(&wrapper->channel_mu); } @@ -461,8 +458,9 @@ static void grpc_rb_channel_try_register_connection_polling( // Note requires wrapper->wrapped, wrapper->channel_mu/cv initialized static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper) { gpr_mu_lock(&wrapper->channel_mu); + wrapper->request_safe_destroy = 1; + while (!wrapper->safe_to_destroy) { - wrapper->request_safe_destroy = 1; gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, gpr_inf_future(GPR_CLOCK_REALTIME)); } diff --git a/src/ruby/spec/channel_connection_spec.rb b/src/ruby/spec/channel_connection_spec.rb index b344052a215..940d68b9b04 100644 --- a/src/ruby/spec/channel_connection_spec.rb +++ b/src/ruby/spec/channel_connection_spec.rb @@ -92,7 +92,7 @@ describe 'channel connection behavior' do end it 'observably connects and reconnects to transient server' \ - 'when using the channel state API' do + ' when using the channel state API' do port = start_server ch = GRPC::Core::Channel.new("localhost:#{port}", {}, :this_channel_is_insecure)