ruby: Raise instead of hanging if grpc is used before and after fork

pull/16332/head
Dylan Thacker-Smith 6 years ago
parent 07eecd8421
commit 78f3f44efb
  1. 1
      src/ruby/ext/grpc/rb_call.c
  2. 3
      src/ruby/ext/grpc/rb_channel.c
  3. 32
      src/ruby/ext/grpc/rb_grpc.c
  4. 2
      src/ruby/ext/grpc/rb_grpc.h
  5. 2
      src/ruby/ext/grpc/rb_server.c
  6. 44
      src/ruby/spec/channel_spec.rb

@ -819,6 +819,7 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE ops_hash) {
unsigned write_flag = 0;
void* tag = (void*)&st;
grpc_ruby_fork_guard();
if (RTYPEDDATA_DATA(self) == NULL) {
rb_raise(grpc_rb_eCallError, "Cannot run batch on closed call");
return Qnil;

@ -217,6 +217,7 @@ static VALUE grpc_rb_channel_init(int argc, VALUE* argv, VALUE self) {
MEMZERO(&args, grpc_channel_args, 1);
grpc_ruby_once_init();
grpc_ruby_fork_guard();
rb_thread_call_without_gvl(
wait_until_channel_polling_thread_started_no_gil,
&stop_waiting_for_thread_start,
@ -374,6 +375,7 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self,
watch_state_stack stack;
void* op_success = 0;
grpc_ruby_fork_guard();
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
if (wrapper->bg_wrapped == NULL) {
@ -415,6 +417,7 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, VALUE mask,
grpc_slice* host_slice_ptr = NULL;
char* tmp_str = NULL;
grpc_ruby_fork_guard();
if (host != Qnil) {
host_slice =
grpc_slice_from_copied_buffer(RSTRING_PTR(host), RSTRING_LEN(host));

@ -23,9 +23,13 @@
#include <math.h>
#include <ruby/vm.h>
#include <stdbool.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include "rb_call.h"
#include "rb_call_credentials.h"
@ -255,7 +259,26 @@ static void Init_grpc_time_consts() {
id_tv_nsec = rb_intern("tv_nsec");
}
static void grpc_rb_shutdown(void) { grpc_shutdown(); }
#if GPR_WINDOWS
static void grpc_ruby_set_init_pid(void) {}
static bool grpc_ruby_forked_after_init(void) { return false; }
#else
static pid_t grpc_init_pid;
static void grpc_ruby_set_init_pid(void) {
GPR_ASSERT(grpc_init_pid == 0);
grpc_init_pid = getpid();
}
static bool grpc_ruby_forked_after_init(void) {
GPR_ASSERT(grpc_init_pid != 0);
return grpc_init_pid != getpid();
}
#endif
static void grpc_rb_shutdown(void) {
if (!grpc_ruby_forked_after_init()) grpc_shutdown();
}
/* Initialize the GRPC module structs */
@ -276,10 +299,17 @@ VALUE sym_metadata = Qundef;
static gpr_once g_once_init = GPR_ONCE_INIT;
static void grpc_ruby_once_init_internal() {
grpc_ruby_set_init_pid();
grpc_init();
atexit(grpc_rb_shutdown);
}
void grpc_ruby_fork_guard() {
if (grpc_ruby_forked_after_init()) {
rb_raise(rb_eRuntimeError, "grpc cannot be used before and after forking");
}
}
static VALUE bg_thread_init_rb_mu = Qundef;
static int bg_thread_init_done = 0;

@ -69,4 +69,6 @@ gpr_timespec grpc_rb_time_timeval(VALUE time, int interval);
void grpc_ruby_once_init();
void grpc_ruby_fork_guard();
#endif /* GRPC_RB_H_ */

@ -243,6 +243,8 @@ static VALUE grpc_rb_server_request_call(VALUE self) {
static VALUE grpc_rb_server_start(VALUE self) {
grpc_rb_server* s = NULL;
TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s);
grpc_ruby_fork_guard();
if (s->wrapped == NULL) {
rb_raise(rb_eRuntimeError, "destroyed!");
} else {

@ -13,6 +13,7 @@
# limitations under the License.
require 'spec_helper'
require 'English'
def load_test_certs
test_root = File.join(File.dirname(__FILE__), 'testdata')
@ -27,6 +28,28 @@ describe GRPC::Core::Channel do
GRPC::Core::ChannelCredentials.new(load_test_certs[0])
end
def fork_with_propagated_error_message
pipe_read, pipe_write = IO.pipe
pid = fork do
pipe_read.close
begin
yield
rescue => exc
pipe_write.syswrite(exc.message)
end
pipe_write.close
end
pipe_write.close
exc_message = pipe_read.read
Process.wait(pid)
unless $CHILD_STATUS.success?
raise "forked process failed with #{$CHILD_STATUS}"
end
raise exc_message unless exc_message.empty?
end
shared_examples '#new' do
it 'take a host name without channel args' do
blk = proc do
@ -79,6 +102,14 @@ describe GRPC::Core::Channel do
blk = construct_with_args(args)
expect(&blk).to_not raise_error
end
it 'raises if grpc was initialized in another process' do
blk = construct_with_args({})
expect(&blk).not_to raise_error
expect do
fork_with_propagated_error_message(&blk)
end.to raise_error(RuntimeError, 'grpc cannot be used before and after forking')
end
end
describe '#new for secure channels' do
@ -121,6 +152,19 @@ describe GRPC::Core::Channel do
end
expect(&blk).to raise_error(RuntimeError)
end
it 'raises if grpc was initialized in another process' do
ch = GRPC::Core::Channel.new(fake_host, nil, :this_channel_is_insecure)
deadline = Time.now + 5
blk = proc do
fork_with_propagated_error_message do
ch.create_call(nil, nil, 'dummy_method', nil, deadline)
end
end
expect(&blk).to raise_error(RuntimeError, 'grpc cannot be used before and after forking')
end
end
describe '#destroy' do

Loading…
Cancel
Save