diff --git a/src/ruby/ext/grpc/rb_call_credentials.c b/src/ruby/ext/grpc/rb_call_credentials.c index be325975920..cea9620081f 100644 --- a/src/ruby/ext/grpc/rb_call_credentials.c +++ b/src/ruby/ext/grpc/rb_call_credentials.c @@ -134,8 +134,7 @@ static void grpc_rb_call_credentials_plugin_destroy(void* state) { // Not sure what needs to be done here } -/* Destroys the credentials instances. */ -static void grpc_rb_call_credentials_free(void* p) { +static void grpc_rb_call_credentials_free_internal(void* p) { grpc_rb_call_credentials* wrapper; if (p == NULL) { return; @@ -143,10 +142,15 @@ static void grpc_rb_call_credentials_free(void* p) { wrapper = (grpc_rb_call_credentials*)p; grpc_call_credentials_release(wrapper->wrapped); wrapper->wrapped = NULL; - xfree(p); } +/* Destroys the credentials instances. */ +static void grpc_rb_call_credentials_free(void* p) { + grpc_rb_call_credentials_free_internal(p); + grpc_ruby_shutdown(); +} + /* Protects the mark object from GC */ static void grpc_rb_call_credentials_mark(void* p) { grpc_rb_call_credentials* wrapper = NULL; @@ -175,6 +179,7 @@ static rb_data_type_t grpc_rb_call_credentials_data_type = { /* Allocates CallCredentials instances. Provides safe initial defaults for the instance fields. */ static VALUE grpc_rb_call_credentials_alloc(VALUE cls) { + grpc_ruby_init(); grpc_rb_call_credentials* wrapper = ALLOC(grpc_rb_call_credentials); wrapper->wrapped = NULL; wrapper->mark = Qnil; @@ -212,8 +217,6 @@ static VALUE grpc_rb_call_credentials_init(VALUE self, VALUE proc) { grpc_call_credentials* creds = NULL; grpc_metadata_credentials_plugin plugin; - grpc_ruby_once_init(); - TypedData_Get_Struct(self, grpc_rb_call_credentials, &grpc_rb_call_credentials_data_type, wrapper); diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c index 5bde962f788..d789e5a4362 100644 --- a/src/ruby/ext/grpc/rb_channel.c +++ b/src/ruby/ext/grpc/rb_channel.c @@ -143,14 +143,12 @@ static void* channel_safe_destroy_without_gil(void* arg) { return NULL; } -/* Destroys Channel instances. */ -static void grpc_rb_channel_free(void* p) { +static void grpc_rb_channel_free_internal(void* p) { grpc_rb_channel* ch = NULL; if (p == NULL) { return; }; ch = (grpc_rb_channel*)p; - if (ch->bg_wrapped != NULL) { /* assumption made here: it's ok to directly gpr_mu_lock the global * connection polling mutex because we're in a finalizer, @@ -159,10 +157,15 @@ static void grpc_rb_channel_free(void* p) { grpc_rb_channel_safe_destroy(ch->bg_wrapped); ch->bg_wrapped = NULL; } - xfree(p); } +/* Destroys Channel instances. */ +static void grpc_rb_channel_free(void* p) { + grpc_rb_channel_free_internal(p); + grpc_ruby_shutdown(); +} + /* Protects the mark object from GC */ static void grpc_rb_channel_mark(void* p) { grpc_rb_channel* channel = NULL; @@ -189,6 +192,7 @@ static rb_data_type_t grpc_channel_data_type = {"grpc_channel", /* Allocates grpc_rb_channel instances. */ static VALUE grpc_rb_channel_alloc(VALUE cls) { + grpc_ruby_init(); grpc_rb_channel* wrapper = ALLOC(grpc_rb_channel); wrapper->bg_wrapped = NULL; wrapper->credentials = Qnil; @@ -216,7 +220,6 @@ static VALUE grpc_rb_channel_init(int argc, VALUE* argv, VALUE self) { int stop_waiting_for_thread_start = 0; MEMZERO(&args, grpc_channel_args, 1); - grpc_ruby_once_init(); grpc_ruby_fork_guard(); rb_thread_call_without_gvl( wait_until_channel_polling_thread_started_no_gil, @@ -682,9 +685,10 @@ static VALUE run_poll_channels_loop(VALUE arg) { gpr_log( GPR_DEBUG, "GRPC_RUBY: run_poll_channels_loop - create connection polling thread"); + grpc_ruby_init(); rb_thread_call_without_gvl(run_poll_channels_loop_no_gil, NULL, run_poll_channels_loop_unblocking_func, NULL); - + grpc_ruby_shutdown(); return Qnil; } diff --git a/src/ruby/ext/grpc/rb_channel_credentials.c b/src/ruby/ext/grpc/rb_channel_credentials.c index 178224c6e00..970bc4eeb11 100644 --- a/src/ruby/ext/grpc/rb_channel_credentials.c +++ b/src/ruby/ext/grpc/rb_channel_credentials.c @@ -48,8 +48,7 @@ typedef struct grpc_rb_channel_credentials { grpc_channel_credentials* wrapped; } grpc_rb_channel_credentials; -/* Destroys the credentials instances. */ -static void grpc_rb_channel_credentials_free(void* p) { +static void grpc_rb_channel_credentials_free_internal(void* p) { grpc_rb_channel_credentials* wrapper = NULL; if (p == NULL) { return; @@ -61,6 +60,12 @@ static void grpc_rb_channel_credentials_free(void* p) { xfree(p); } +/* Destroys the credentials instances. */ +static void grpc_rb_channel_credentials_free(void* p) { + grpc_rb_channel_credentials_free_internal(p); + grpc_ruby_shutdown(); +} + /* Protects the mark object from GC */ static void grpc_rb_channel_credentials_mark(void* p) { grpc_rb_channel_credentials* wrapper = NULL; @@ -90,6 +95,7 @@ static rb_data_type_t grpc_rb_channel_credentials_data_type = { /* Allocates ChannelCredential instances. Provides safe initial defaults for the instance fields. */ static VALUE grpc_rb_channel_credentials_alloc(VALUE cls) { + grpc_ruby_init(); grpc_rb_channel_credentials* wrapper = ALLOC(grpc_rb_channel_credentials); wrapper->wrapped = NULL; wrapper->mark = Qnil; @@ -147,8 +153,6 @@ static VALUE grpc_rb_channel_credentials_init(int argc, VALUE* argv, const char* pem_root_certs_cstr = NULL; MEMZERO(&key_cert_pair, grpc_ssl_pem_key_cert_pair, 1); - grpc_ruby_once_init(); - /* "03" == no mandatory arg, 3 optional */ rb_scan_args(argc, argv, "03", &pem_root_certs, &pem_private_key, &pem_cert_chain); diff --git a/src/ruby/ext/grpc/rb_compression_options.c b/src/ruby/ext/grpc/rb_compression_options.c index 4ba6991ef66..d10c603460c 100644 --- a/src/ruby/ext/grpc/rb_compression_options.c +++ b/src/ruby/ext/grpc/rb_compression_options.c @@ -52,23 +52,26 @@ typedef struct grpc_rb_compression_options { grpc_compression_options* wrapped; } grpc_rb_compression_options; -/* Destroys the compression options instances and free the - * wrapped grpc compression options. */ -static void grpc_rb_compression_options_free(void* p) { +static void grpc_rb_compression_options_free_internal(void* p) { grpc_rb_compression_options* wrapper = NULL; if (p == NULL) { return; }; wrapper = (grpc_rb_compression_options*)p; - if (wrapper->wrapped != NULL) { gpr_free(wrapper->wrapped); wrapper->wrapped = NULL; } - xfree(p); } +/* Destroys the compression options instances and free the + * wrapped grpc compression options. */ +static void grpc_rb_compression_options_free(void* p) { + grpc_rb_compression_options_free_internal(p); + grpc_ruby_shutdown(); +} + /* Ruby recognized data type for the CompressionOptions class. */ static rb_data_type_t grpc_rb_compression_options_data_type = { "grpc_compression_options", @@ -87,10 +90,9 @@ static rb_data_type_t grpc_rb_compression_options_data_type = { Allocate the wrapped grpc compression options and initialize it here too. */ static VALUE grpc_rb_compression_options_alloc(VALUE cls) { + grpc_ruby_init(); grpc_rb_compression_options* wrapper = NULL; - grpc_ruby_once_init(); - wrapper = gpr_malloc(sizeof(grpc_rb_compression_options)); wrapper->wrapped = NULL; wrapper->wrapped = gpr_malloc(sizeof(grpc_compression_options)); diff --git a/src/ruby/ext/grpc/rb_event_thread.c b/src/ruby/ext/grpc/rb_event_thread.c index 281e41c9a88..c9ca14ed06a 100644 --- a/src/ruby/ext/grpc/rb_event_thread.c +++ b/src/ruby/ext/grpc/rb_event_thread.c @@ -115,6 +115,7 @@ static void grpc_rb_event_unblocking_func(void* arg) { static VALUE grpc_rb_event_thread(VALUE arg) { grpc_rb_event* event; (void)arg; + grpc_ruby_init(); while (true) { event = (grpc_rb_event*)rb_thread_call_without_gvl( grpc_rb_wait_for_event_no_gil, NULL, grpc_rb_event_unblocking_func, @@ -128,6 +129,7 @@ static VALUE grpc_rb_event_thread(VALUE arg) { } } grpc_rb_event_queue_destroy(); + grpc_ruby_shutdown(); return Qnil; } diff --git a/src/ruby/ext/grpc/rb_grpc.c b/src/ruby/ext/grpc/rb_grpc.c index 872aed0cfce..4916cee4f7c 100644 --- a/src/ruby/ext/grpc/rb_grpc.c +++ b/src/ruby/ext/grpc/rb_grpc.c @@ -276,10 +276,6 @@ static bool grpc_ruby_forked_after_init(void) { } #endif -static void grpc_rb_shutdown(void) { - if (!grpc_ruby_forked_after_init()) grpc_shutdown(); -} - /* Initialize the GRPC module structs */ /* grpc_rb_sNewServerRpc is the struct that holds new server rpc details. */ @@ -298,12 +294,6 @@ VALUE sym_metadata = Qundef; static gpr_once g_once_init = GPR_ONCE_INIT; -static void grpc_ruby_once_init_internal() { - grpc_ruby_set_init_pid(); - grpc_init(); - atexit(grpc_rb_shutdown); -} - void grpc_ruby_fork_guard() { if (grpc_ruby_forked_after_init()) { rb_raise(rb_eRuntimeError, "grpc cannot be used before and after forking"); @@ -313,19 +303,7 @@ void grpc_ruby_fork_guard() { static VALUE bg_thread_init_rb_mu = Qundef; static int bg_thread_init_done = 0; -void grpc_ruby_once_init() { - /* ruby_vm_at_exit doesn't seem to be working. It would crash once every - * blue moon, and some users are getting it repeatedly. See the discussions - * - https://github.com/grpc/grpc/pull/5337 - * - https://bugs.ruby-lang.org/issues/12095 - * - * In order to still be able to handle the (unlikely) situation where the - * extension is loaded by a first Ruby VM that is subsequently destroyed, - * then loaded again by another VM within the same process, we need to - * schedule our initialization and destruction only once. - */ - gpr_once_init(&g_once_init, grpc_ruby_once_init_internal); - +static void grpc_ruby_init_threads() { // Avoid calling calling into ruby library (when creating threads here) // in gpr_once_init. In general, it appears to be unsafe to call // into the ruby library while holding a non-ruby mutex, because a gil yield @@ -339,6 +317,27 @@ void grpc_ruby_once_init() { rb_mutex_unlock(bg_thread_init_rb_mu); } +static int64_t g_grpc_ruby_init_count; + +void grpc_ruby_init() { + gpr_once_init(&g_once_init, grpc_ruby_set_init_pid); + grpc_init(); + grpc_ruby_init_threads(); + // (only gpr_log after logging has been initialized) + gpr_log(GPR_DEBUG, + "GRPC_RUBY: grpc_ruby_init - prev g_grpc_ruby_init_count:%" PRId64, + g_grpc_ruby_init_count++); +} + +void grpc_ruby_shutdown() { + GPR_ASSERT(g_grpc_ruby_init_count > 0); + if (!grpc_ruby_forked_after_init()) grpc_shutdown(); + gpr_log( + GPR_DEBUG, + "GRPC_RUBY: grpc_ruby_shutdown - prev g_grpc_ruby_init_count:%" PRId64, + g_grpc_ruby_init_count--); +} + void Init_grpc_c() { if (!grpc_rb_load_core()) { rb_raise(rb_eLoadError, "Couldn't find or load gRPC's dynamic C core"); diff --git a/src/ruby/ext/grpc/rb_grpc.h b/src/ruby/ext/grpc/rb_grpc.h index 4118435ecf7..2c4675839ac 100644 --- a/src/ruby/ext/grpc/rb_grpc.h +++ b/src/ruby/ext/grpc/rb_grpc.h @@ -67,8 +67,10 @@ VALUE grpc_rb_cannot_init_copy(VALUE copy, VALUE self); /* grpc_rb_time_timeval creates a gpr_timespec from a ruby time object. */ gpr_timespec grpc_rb_time_timeval(VALUE time, int interval); -void grpc_ruby_once_init(); - void grpc_ruby_fork_guard(); +void grpc_ruby_init(); + +void grpc_ruby_shutdown(); + #endif /* GRPC_RB_H_ */ diff --git a/src/ruby/ext/grpc/rb_server.c b/src/ruby/ext/grpc/rb_server.c index 2931f344092..4396de1c335 100644 --- a/src/ruby/ext/grpc/rb_server.c +++ b/src/ruby/ext/grpc/rb_server.c @@ -86,8 +86,7 @@ static void grpc_rb_server_maybe_destroy(grpc_rb_server* server) { } } -/* Destroys server instances. */ -static void grpc_rb_server_free(void* p) { +static void grpc_rb_server_free_internal(void* p) { grpc_rb_server* svr = NULL; gpr_timespec deadline; if (p == NULL) { @@ -104,6 +103,12 @@ static void grpc_rb_server_free(void* p) { xfree(p); } +/* Destroys server instances. */ +static void grpc_rb_server_free(void* p) { + grpc_rb_server_free_internal(p); + grpc_ruby_shutdown(); +} + static const rb_data_type_t grpc_rb_server_data_type = { "grpc_server", {GRPC_RB_GC_NOT_MARKED, @@ -123,6 +128,7 @@ static const rb_data_type_t grpc_rb_server_data_type = { /* Allocates grpc_rb_server instances. */ static VALUE grpc_rb_server_alloc(VALUE cls) { + grpc_ruby_init(); grpc_rb_server* wrapper = ALLOC(grpc_rb_server); wrapper->wrapped = NULL; wrapper->destroy_done = 0; @@ -142,8 +148,6 @@ static VALUE grpc_rb_server_init(VALUE self, VALUE channel_args) { grpc_channel_args args; MEMZERO(&args, grpc_channel_args, 1); - grpc_ruby_once_init(); - cq = grpc_completion_queue_create_for_pluck(NULL); TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, wrapper);