Merge pull request #17997 from apolcyn/fix_ruby_windows

Ruby: refactor init/shutdown logic to avoid using atexit; fix windows
pull/17957/head
apolcyn 6 years ago committed by GitHub
commit d0d93bdab8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 13
      src/ruby/ext/grpc/rb_call_credentials.c
  2. 16
      src/ruby/ext/grpc/rb_channel.c
  3. 12
      src/ruby/ext/grpc/rb_channel_credentials.c
  4. 16
      src/ruby/ext/grpc/rb_compression_options.c
  5. 2
      src/ruby/ext/grpc/rb_event_thread.c
  6. 45
      src/ruby/ext/grpc/rb_grpc.c
  7. 6
      src/ruby/ext/grpc/rb_grpc.h
  8. 12
      src/ruby/ext/grpc/rb_server.c

@ -134,8 +134,7 @@ static void grpc_rb_call_credentials_plugin_destroy(void* state) {
// Not sure what needs to be done here // Not sure what needs to be done here
} }
/* Destroys the credentials instances. */ static void grpc_rb_call_credentials_free_internal(void* p) {
static void grpc_rb_call_credentials_free(void* p) {
grpc_rb_call_credentials* wrapper; grpc_rb_call_credentials* wrapper;
if (p == NULL) { if (p == NULL) {
return; return;
@ -143,10 +142,15 @@ static void grpc_rb_call_credentials_free(void* p) {
wrapper = (grpc_rb_call_credentials*)p; wrapper = (grpc_rb_call_credentials*)p;
grpc_call_credentials_release(wrapper->wrapped); grpc_call_credentials_release(wrapper->wrapped);
wrapper->wrapped = NULL; wrapper->wrapped = NULL;
xfree(p); xfree(p);
} }
/* Destroys the credentials instances. */
static void grpc_rb_call_credentials_free(void* p) {
grpc_rb_call_credentials_free_internal(p);
grpc_ruby_shutdown();
}
/* Protects the mark object from GC */ /* Protects the mark object from GC */
static void grpc_rb_call_credentials_mark(void* p) { static void grpc_rb_call_credentials_mark(void* p) {
grpc_rb_call_credentials* wrapper = NULL; grpc_rb_call_credentials* wrapper = NULL;
@ -175,6 +179,7 @@ static rb_data_type_t grpc_rb_call_credentials_data_type = {
/* Allocates CallCredentials instances. /* Allocates CallCredentials instances.
Provides safe initial defaults for the instance fields. */ Provides safe initial defaults for the instance fields. */
static VALUE grpc_rb_call_credentials_alloc(VALUE cls) { static VALUE grpc_rb_call_credentials_alloc(VALUE cls) {
grpc_ruby_init();
grpc_rb_call_credentials* wrapper = ALLOC(grpc_rb_call_credentials); grpc_rb_call_credentials* wrapper = ALLOC(grpc_rb_call_credentials);
wrapper->wrapped = NULL; wrapper->wrapped = NULL;
wrapper->mark = Qnil; wrapper->mark = Qnil;
@ -212,8 +217,6 @@ static VALUE grpc_rb_call_credentials_init(VALUE self, VALUE proc) {
grpc_call_credentials* creds = NULL; grpc_call_credentials* creds = NULL;
grpc_metadata_credentials_plugin plugin; grpc_metadata_credentials_plugin plugin;
grpc_ruby_once_init();
TypedData_Get_Struct(self, grpc_rb_call_credentials, TypedData_Get_Struct(self, grpc_rb_call_credentials,
&grpc_rb_call_credentials_data_type, wrapper); &grpc_rb_call_credentials_data_type, wrapper);

@ -143,14 +143,12 @@ static void* channel_safe_destroy_without_gil(void* arg) {
return NULL; return NULL;
} }
/* Destroys Channel instances. */ static void grpc_rb_channel_free_internal(void* p) {
static void grpc_rb_channel_free(void* p) {
grpc_rb_channel* ch = NULL; grpc_rb_channel* ch = NULL;
if (p == NULL) { if (p == NULL) {
return; return;
}; };
ch = (grpc_rb_channel*)p; ch = (grpc_rb_channel*)p;
if (ch->bg_wrapped != NULL) { if (ch->bg_wrapped != NULL) {
/* assumption made here: it's ok to directly gpr_mu_lock the global /* assumption made here: it's ok to directly gpr_mu_lock the global
* connection polling mutex because we're in a finalizer, * connection polling mutex because we're in a finalizer,
@ -159,10 +157,15 @@ static void grpc_rb_channel_free(void* p) {
grpc_rb_channel_safe_destroy(ch->bg_wrapped); grpc_rb_channel_safe_destroy(ch->bg_wrapped);
ch->bg_wrapped = NULL; ch->bg_wrapped = NULL;
} }
xfree(p); xfree(p);
} }
/* Destroys Channel instances. */
static void grpc_rb_channel_free(void* p) {
grpc_rb_channel_free_internal(p);
grpc_ruby_shutdown();
}
/* Protects the mark object from GC */ /* Protects the mark object from GC */
static void grpc_rb_channel_mark(void* p) { static void grpc_rb_channel_mark(void* p) {
grpc_rb_channel* channel = NULL; grpc_rb_channel* channel = NULL;
@ -189,6 +192,7 @@ static rb_data_type_t grpc_channel_data_type = {"grpc_channel",
/* Allocates grpc_rb_channel instances. */ /* Allocates grpc_rb_channel instances. */
static VALUE grpc_rb_channel_alloc(VALUE cls) { static VALUE grpc_rb_channel_alloc(VALUE cls) {
grpc_ruby_init();
grpc_rb_channel* wrapper = ALLOC(grpc_rb_channel); grpc_rb_channel* wrapper = ALLOC(grpc_rb_channel);
wrapper->bg_wrapped = NULL; wrapper->bg_wrapped = NULL;
wrapper->credentials = Qnil; wrapper->credentials = Qnil;
@ -216,7 +220,6 @@ static VALUE grpc_rb_channel_init(int argc, VALUE* argv, VALUE self) {
int stop_waiting_for_thread_start = 0; int stop_waiting_for_thread_start = 0;
MEMZERO(&args, grpc_channel_args, 1); MEMZERO(&args, grpc_channel_args, 1);
grpc_ruby_once_init();
grpc_ruby_fork_guard(); grpc_ruby_fork_guard();
rb_thread_call_without_gvl( rb_thread_call_without_gvl(
wait_until_channel_polling_thread_started_no_gil, wait_until_channel_polling_thread_started_no_gil,
@ -682,9 +685,10 @@ static VALUE run_poll_channels_loop(VALUE arg) {
gpr_log( gpr_log(
GPR_DEBUG, GPR_DEBUG,
"GRPC_RUBY: run_poll_channels_loop - create connection polling thread"); "GRPC_RUBY: run_poll_channels_loop - create connection polling thread");
grpc_ruby_init();
rb_thread_call_without_gvl(run_poll_channels_loop_no_gil, NULL, rb_thread_call_without_gvl(run_poll_channels_loop_no_gil, NULL,
run_poll_channels_loop_unblocking_func, NULL); run_poll_channels_loop_unblocking_func, NULL);
grpc_ruby_shutdown();
return Qnil; return Qnil;
} }

@ -48,8 +48,7 @@ typedef struct grpc_rb_channel_credentials {
grpc_channel_credentials* wrapped; grpc_channel_credentials* wrapped;
} grpc_rb_channel_credentials; } grpc_rb_channel_credentials;
/* Destroys the credentials instances. */ static void grpc_rb_channel_credentials_free_internal(void* p) {
static void grpc_rb_channel_credentials_free(void* p) {
grpc_rb_channel_credentials* wrapper = NULL; grpc_rb_channel_credentials* wrapper = NULL;
if (p == NULL) { if (p == NULL) {
return; return;
@ -61,6 +60,12 @@ static void grpc_rb_channel_credentials_free(void* p) {
xfree(p); xfree(p);
} }
/* Destroys the credentials instances. */
static void grpc_rb_channel_credentials_free(void* p) {
grpc_rb_channel_credentials_free_internal(p);
grpc_ruby_shutdown();
}
/* Protects the mark object from GC */ /* Protects the mark object from GC */
static void grpc_rb_channel_credentials_mark(void* p) { static void grpc_rb_channel_credentials_mark(void* p) {
grpc_rb_channel_credentials* wrapper = NULL; grpc_rb_channel_credentials* wrapper = NULL;
@ -90,6 +95,7 @@ static rb_data_type_t grpc_rb_channel_credentials_data_type = {
/* Allocates ChannelCredential instances. /* Allocates ChannelCredential instances.
Provides safe initial defaults for the instance fields. */ Provides safe initial defaults for the instance fields. */
static VALUE grpc_rb_channel_credentials_alloc(VALUE cls) { static VALUE grpc_rb_channel_credentials_alloc(VALUE cls) {
grpc_ruby_init();
grpc_rb_channel_credentials* wrapper = ALLOC(grpc_rb_channel_credentials); grpc_rb_channel_credentials* wrapper = ALLOC(grpc_rb_channel_credentials);
wrapper->wrapped = NULL; wrapper->wrapped = NULL;
wrapper->mark = Qnil; wrapper->mark = Qnil;
@ -147,8 +153,6 @@ static VALUE grpc_rb_channel_credentials_init(int argc, VALUE* argv,
const char* pem_root_certs_cstr = NULL; const char* pem_root_certs_cstr = NULL;
MEMZERO(&key_cert_pair, grpc_ssl_pem_key_cert_pair, 1); MEMZERO(&key_cert_pair, grpc_ssl_pem_key_cert_pair, 1);
grpc_ruby_once_init();
/* "03" == no mandatory arg, 3 optional */ /* "03" == no mandatory arg, 3 optional */
rb_scan_args(argc, argv, "03", &pem_root_certs, &pem_private_key, rb_scan_args(argc, argv, "03", &pem_root_certs, &pem_private_key,
&pem_cert_chain); &pem_cert_chain);

@ -52,23 +52,26 @@ typedef struct grpc_rb_compression_options {
grpc_compression_options* wrapped; grpc_compression_options* wrapped;
} grpc_rb_compression_options; } grpc_rb_compression_options;
/* Destroys the compression options instances and free the static void grpc_rb_compression_options_free_internal(void* p) {
* wrapped grpc compression options. */
static void grpc_rb_compression_options_free(void* p) {
grpc_rb_compression_options* wrapper = NULL; grpc_rb_compression_options* wrapper = NULL;
if (p == NULL) { if (p == NULL) {
return; return;
}; };
wrapper = (grpc_rb_compression_options*)p; wrapper = (grpc_rb_compression_options*)p;
if (wrapper->wrapped != NULL) { if (wrapper->wrapped != NULL) {
gpr_free(wrapper->wrapped); gpr_free(wrapper->wrapped);
wrapper->wrapped = NULL; wrapper->wrapped = NULL;
} }
xfree(p); xfree(p);
} }
/* Destroys the compression options instances and free the
* wrapped grpc compression options. */
static void grpc_rb_compression_options_free(void* p) {
grpc_rb_compression_options_free_internal(p);
grpc_ruby_shutdown();
}
/* Ruby recognized data type for the CompressionOptions class. */ /* Ruby recognized data type for the CompressionOptions class. */
static rb_data_type_t grpc_rb_compression_options_data_type = { static rb_data_type_t grpc_rb_compression_options_data_type = {
"grpc_compression_options", "grpc_compression_options",
@ -87,10 +90,9 @@ static rb_data_type_t grpc_rb_compression_options_data_type = {
Allocate the wrapped grpc compression options and Allocate the wrapped grpc compression options and
initialize it here too. */ initialize it here too. */
static VALUE grpc_rb_compression_options_alloc(VALUE cls) { static VALUE grpc_rb_compression_options_alloc(VALUE cls) {
grpc_ruby_init();
grpc_rb_compression_options* wrapper = NULL; grpc_rb_compression_options* wrapper = NULL;
grpc_ruby_once_init();
wrapper = gpr_malloc(sizeof(grpc_rb_compression_options)); wrapper = gpr_malloc(sizeof(grpc_rb_compression_options));
wrapper->wrapped = NULL; wrapper->wrapped = NULL;
wrapper->wrapped = gpr_malloc(sizeof(grpc_compression_options)); wrapper->wrapped = gpr_malloc(sizeof(grpc_compression_options));

@ -115,6 +115,7 @@ static void grpc_rb_event_unblocking_func(void* arg) {
static VALUE grpc_rb_event_thread(VALUE arg) { static VALUE grpc_rb_event_thread(VALUE arg) {
grpc_rb_event* event; grpc_rb_event* event;
(void)arg; (void)arg;
grpc_ruby_init();
while (true) { while (true) {
event = (grpc_rb_event*)rb_thread_call_without_gvl( event = (grpc_rb_event*)rb_thread_call_without_gvl(
grpc_rb_wait_for_event_no_gil, NULL, grpc_rb_event_unblocking_func, grpc_rb_wait_for_event_no_gil, NULL, grpc_rb_event_unblocking_func,
@ -128,6 +129,7 @@ static VALUE grpc_rb_event_thread(VALUE arg) {
} }
} }
grpc_rb_event_queue_destroy(); grpc_rb_event_queue_destroy();
grpc_ruby_shutdown();
return Qnil; return Qnil;
} }

@ -276,10 +276,6 @@ static bool grpc_ruby_forked_after_init(void) {
} }
#endif #endif
static void grpc_rb_shutdown(void) {
if (!grpc_ruby_forked_after_init()) grpc_shutdown();
}
/* Initialize the GRPC module structs */ /* Initialize the GRPC module structs */
/* grpc_rb_sNewServerRpc is the struct that holds new server rpc details. */ /* grpc_rb_sNewServerRpc is the struct that holds new server rpc details. */
@ -298,12 +294,6 @@ VALUE sym_metadata = Qundef;
static gpr_once g_once_init = GPR_ONCE_INIT; static gpr_once g_once_init = GPR_ONCE_INIT;
static void grpc_ruby_once_init_internal() {
grpc_ruby_set_init_pid();
grpc_init();
atexit(grpc_rb_shutdown);
}
void grpc_ruby_fork_guard() { void grpc_ruby_fork_guard() {
if (grpc_ruby_forked_after_init()) { if (grpc_ruby_forked_after_init()) {
rb_raise(rb_eRuntimeError, "grpc cannot be used before and after forking"); rb_raise(rb_eRuntimeError, "grpc cannot be used before and after forking");
@ -313,19 +303,7 @@ void grpc_ruby_fork_guard() {
static VALUE bg_thread_init_rb_mu = Qundef; static VALUE bg_thread_init_rb_mu = Qundef;
static int bg_thread_init_done = 0; static int bg_thread_init_done = 0;
void grpc_ruby_once_init() { static void grpc_ruby_init_threads() {
/* ruby_vm_at_exit doesn't seem to be working. It would crash once every
* blue moon, and some users are getting it repeatedly. See the discussions
* - https://github.com/grpc/grpc/pull/5337
* - https://bugs.ruby-lang.org/issues/12095
*
* In order to still be able to handle the (unlikely) situation where the
* extension is loaded by a first Ruby VM that is subsequently destroyed,
* then loaded again by another VM within the same process, we need to
* schedule our initialization and destruction only once.
*/
gpr_once_init(&g_once_init, grpc_ruby_once_init_internal);
// Avoid calling calling into ruby library (when creating threads here) // Avoid calling calling into ruby library (when creating threads here)
// in gpr_once_init. In general, it appears to be unsafe to call // in gpr_once_init. In general, it appears to be unsafe to call
// into the ruby library while holding a non-ruby mutex, because a gil yield // into the ruby library while holding a non-ruby mutex, because a gil yield
@ -339,6 +317,27 @@ void grpc_ruby_once_init() {
rb_mutex_unlock(bg_thread_init_rb_mu); rb_mutex_unlock(bg_thread_init_rb_mu);
} }
static int64_t g_grpc_ruby_init_count;
void grpc_ruby_init() {
gpr_once_init(&g_once_init, grpc_ruby_set_init_pid);
grpc_init();
grpc_ruby_init_threads();
// (only gpr_log after logging has been initialized)
gpr_log(GPR_DEBUG,
"GRPC_RUBY: grpc_ruby_init - prev g_grpc_ruby_init_count:%" PRId64,
g_grpc_ruby_init_count++);
}
void grpc_ruby_shutdown() {
GPR_ASSERT(g_grpc_ruby_init_count > 0);
if (!grpc_ruby_forked_after_init()) grpc_shutdown();
gpr_log(
GPR_DEBUG,
"GRPC_RUBY: grpc_ruby_shutdown - prev g_grpc_ruby_init_count:%" PRId64,
g_grpc_ruby_init_count--);
}
void Init_grpc_c() { void Init_grpc_c() {
if (!grpc_rb_load_core()) { if (!grpc_rb_load_core()) {
rb_raise(rb_eLoadError, "Couldn't find or load gRPC's dynamic C core"); rb_raise(rb_eLoadError, "Couldn't find or load gRPC's dynamic C core");

@ -67,8 +67,10 @@ VALUE grpc_rb_cannot_init_copy(VALUE copy, VALUE self);
/* grpc_rb_time_timeval creates a gpr_timespec from a ruby time object. */ /* grpc_rb_time_timeval creates a gpr_timespec from a ruby time object. */
gpr_timespec grpc_rb_time_timeval(VALUE time, int interval); gpr_timespec grpc_rb_time_timeval(VALUE time, int interval);
void grpc_ruby_once_init();
void grpc_ruby_fork_guard(); void grpc_ruby_fork_guard();
void grpc_ruby_init();
void grpc_ruby_shutdown();
#endif /* GRPC_RB_H_ */ #endif /* GRPC_RB_H_ */

@ -86,8 +86,7 @@ static void grpc_rb_server_maybe_destroy(grpc_rb_server* server) {
} }
} }
/* Destroys server instances. */ static void grpc_rb_server_free_internal(void* p) {
static void grpc_rb_server_free(void* p) {
grpc_rb_server* svr = NULL; grpc_rb_server* svr = NULL;
gpr_timespec deadline; gpr_timespec deadline;
if (p == NULL) { if (p == NULL) {
@ -104,6 +103,12 @@ static void grpc_rb_server_free(void* p) {
xfree(p); xfree(p);
} }
/* Destroys server instances. */
static void grpc_rb_server_free(void* p) {
grpc_rb_server_free_internal(p);
grpc_ruby_shutdown();
}
static const rb_data_type_t grpc_rb_server_data_type = { static const rb_data_type_t grpc_rb_server_data_type = {
"grpc_server", "grpc_server",
{GRPC_RB_GC_NOT_MARKED, {GRPC_RB_GC_NOT_MARKED,
@ -123,6 +128,7 @@ static const rb_data_type_t grpc_rb_server_data_type = {
/* Allocates grpc_rb_server instances. */ /* Allocates grpc_rb_server instances. */
static VALUE grpc_rb_server_alloc(VALUE cls) { static VALUE grpc_rb_server_alloc(VALUE cls) {
grpc_ruby_init();
grpc_rb_server* wrapper = ALLOC(grpc_rb_server); grpc_rb_server* wrapper = ALLOC(grpc_rb_server);
wrapper->wrapped = NULL; wrapper->wrapped = NULL;
wrapper->destroy_done = 0; wrapper->destroy_done = 0;
@ -142,8 +148,6 @@ static VALUE grpc_rb_server_init(VALUE self, VALUE channel_args) {
grpc_channel_args args; grpc_channel_args args;
MEMZERO(&args, grpc_channel_args, 1); MEMZERO(&args, grpc_channel_args, 1);
grpc_ruby_once_init();
cq = grpc_completion_queue_create_for_pluck(NULL); cq = grpc_completion_queue_create_for_pluck(NULL);
TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type,
wrapper); wrapper);

Loading…
Cancel
Save