stop mixing gpr mutexes and the ruby gil to fix channel closing deadlock

pull/9986/head
Alexander Polcyn 8 years ago
parent 70bc4921e1
commit 563ec5324f
  1. 10
      src/ruby/end2end/channel_closing_client.rb
  2. 69
      src/ruby/ext/grpc/rb_channel.c

@ -36,9 +36,8 @@ class ChannelClosingClientController < ClientControl::ClientController::Service
@ch = ch
end
def shutdown(_, _)
STDERR.puts "about to close channel"
@ch.close
STDERR.puts "just closed channel"
ClientControl::Void.new
end
end
@ -63,12 +62,13 @@ def main
srv.run
end
# this should break out once the channel is closed
# this should break out with an exception once the channel is closed
loop do
state = ch.connectivity_state(true)
begin
state = ch.connectivity_state(true)
ch.watch_connectivity_state(state, Time.now + 360)
rescue RuntimeException => e
rescue RuntimeError => e
STDERR.puts "(expected) error occurred: #{e.inspect}"
break
end
end

@ -78,6 +78,7 @@ typedef struct grpc_rb_channel {
int safe_to_destroy;
grpc_connectivity_state current_connectivity_state;
int mu_init_done;
gpr_mu channel_mu;
gpr_cv channel_cv;
} grpc_rb_channel;
@ -106,6 +107,11 @@ static void grpc_rb_channel_free(void *p) {
ch->wrapped = NULL;
}
if (ch->mu_init_done) {
gpr_mu_destroy(&ch->channel_mu);
gpr_cv_destroy(&ch->channel_cv);
}
xfree(p);
}
@ -164,6 +170,7 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
rb_scan_args(argc, argv, "3", &target, &channel_args, &credentials);
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
wrapper->mu_init_done = 0;
target_chars = StringValueCStr(target);
grpc_rb_hash_convert_to_channel_args(channel_args, &args);
if (TYPE(credentials) == T_SYMBOL) {
@ -185,6 +192,7 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
gpr_mu_init(&wrapper->channel_mu);
gpr_cv_init(&wrapper->channel_cv);
wrapper->mu_init_done = 1;
gpr_mu_lock(&wrapper->channel_mu);
wrapper->current_connectivity_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0);
@ -244,13 +252,38 @@ static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv,
typedef struct watch_state_stack {
grpc_rb_channel *wrapper;
gpr_timespec deadline;
int last_state;
} watch_state_stack;
static void *watch_channel_state_without_gvl(void *arg) {
gpr_timespec deadline = ((watch_state_stack*)arg)->deadline;
grpc_rb_channel *wrapper = ((watch_state_stack*)arg)->wrapper;
watch_state_stack *stack = (watch_state_stack*)arg;
gpr_timespec deadline = stack->deadline;
grpc_rb_channel *wrapper = stack->wrapper;
int last_state = stack->last_state;
gpr_mu_lock(&wrapper->channel_mu);
if (wrapper->current_connectivity_state != last_state) {
gpr_mu_unlock(&wrapper->channel_mu);
return (void*)0;
}
if (wrapper->request_safe_destroy) {
gpr_mu_unlock(&wrapper->channel_mu);
return (void*)0;
}
if (wrapper->safe_to_destroy) {
gpr_mu_unlock(&wrapper->channel_mu);
return (void*)0;
}
gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, deadline);
return NULL;
if (wrapper->current_connectivity_state != last_state) {
gpr_mu_unlock(&wrapper->channel_mu);
return (void*)1;
}
gpr_mu_unlock(&wrapper->channel_mu);
return (void*)0;
}
static void watch_channel_state_unblocking_func(void *arg) {
@ -273,6 +306,7 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self,
VALUE deadline) {
grpc_rb_channel *wrapper = NULL;
watch_state_stack stack;
void* out;
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
@ -286,33 +320,13 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self,
return Qnil;
}
gpr_mu_lock(&wrapper->channel_mu);
if (wrapper->current_connectivity_state != NUM2LONG(last_state)) {
gpr_mu_unlock(&wrapper->channel_mu);
return Qtrue;
}
if (wrapper->request_safe_destroy) {
gpr_mu_unlock(&wrapper->channel_mu);
rb_raise(rb_eRuntimeError, "watch_connectivity_state called on closed channel");
return Qfalse;
}
if (wrapper->safe_to_destroy) {
gpr_mu_unlock(&wrapper->channel_mu);
return Qfalse;
}
stack.wrapper = wrapper;
stack.deadline = grpc_rb_time_timeval(deadline, 0);
rb_thread_call_without_gvl(watch_channel_state_without_gvl, &stack, watch_channel_state_unblocking_func, wrapper);
if (wrapper->request_safe_destroy) {
gpr_mu_unlock(&wrapper->channel_mu);
rb_raise(rb_eRuntimeError, "channel closed during call to watch_connectivity_state");
return Qfalse;
}
if (wrapper->current_connectivity_state != NUM2LONG(last_state)) {
gpr_mu_unlock(&wrapper->channel_mu);
stack.last_state = NUM2LONG(last_state);
out = rb_thread_call_without_gvl(watch_channel_state_without_gvl, &stack, watch_channel_state_unblocking_func, wrapper);
if (out) {
return Qtrue;
}
gpr_mu_unlock(&wrapper->channel_mu);
return Qfalse;
}
@ -460,9 +474,6 @@ static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper) {
GPR_ASSERT(wrapper->safe_to_destroy);
gpr_mu_unlock(&wrapper->channel_mu);
gpr_mu_destroy(&wrapper->channel_mu);
gpr_cv_destroy(&wrapper->channel_cv);
grpc_channel_destroy(wrapper->wrapped);
}

Loading…
Cancel
Save