[ruby] remove unnecessary background thread startup wait logic that interferes with forking (#33805)

Alternative to https://github.com/grpc/grpc/pull/33804 - this takes the
approach in
https://github.com/grpc/grpc/pull/33804#issuecomment-1645792677

Fix https://github.com/grpc/grpc/issues/33802

cc @casperisfine
pull/33849/head
apolcyn 2 years ago committed by GitHub
parent 29dd271d44
commit 7c21997dba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 44
      src/ruby/end2end/prefork_postfork_loop_test.rb
  2. 54
      src/ruby/ext/grpc/rb_channel.c
  3. 2
      tools/run_tests/run_tests.py

@ -0,0 +1,44 @@
#!/usr/bin/env ruby
#
# Copyright 2016 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
ENV['GRPC_ENABLE_FORK_SUPPORT'] = "1"
fail "forking only supported on linux" unless RUBY_PLATFORM =~ /linux/
this_dir = File.expand_path(File.dirname(__FILE__))
protos_lib_dir = File.join(this_dir, 'lib')
grpc_lib_dir = File.join(File.dirname(this_dir), 'lib')
$LOAD_PATH.unshift(grpc_lib_dir) unless $LOAD_PATH.include?(grpc_lib_dir)
$LOAD_PATH.unshift(protos_lib_dir) unless $LOAD_PATH.include?(protos_lib_dir)
$LOAD_PATH.unshift(this_dir) unless $LOAD_PATH.include?(this_dir)
require 'grpc'
require 'end2end_common'
def main
10_000.times do
# The prefork and postfork APIs are meant to be used before and after a
# fork. So this is not technically correct usage of the API. However, the
# current implementation doesn't actually care about a "fork" call happening
# in between prefork and postfork_parent, and this is unlikely to change anytime
# soon. Also note the goal of this test is mainly to test the background thread startup
# and shutdown that happens in prefork and postfork_parent. If we were to actually
# fork in this test, it would take much longer to run.
GRPC.prefork
GRPC.postfork_parent
end
end
main

@ -97,8 +97,6 @@ static bg_watched_channel* bg_watched_channel_list_head = NULL;
static void grpc_rb_channel_try_register_connection_polling(
bg_watched_channel* bg);
static void* wait_until_channel_polling_thread_started_no_gil(void*);
static void wait_until_channel_polling_thread_started_unblocking_func(void*);
static void* channel_init_try_register_connection_polling_without_gil(
void* arg);
@ -111,7 +109,6 @@ static grpc_completion_queue* g_channel_polling_cq;
static gpr_mu global_connection_polling_mu;
static gpr_cv global_connection_polling_cv;
static int g_abort_channel_polling = 0;
static int g_channel_polling_thread_started = 0;
static gpr_once g_once_init = GPR_ONCE_INIT;
static VALUE g_channel_polling_thread = Qnil;
@ -224,12 +221,6 @@ static VALUE grpc_rb_channel_init(int argc, VALUE* argv, VALUE self) {
channel_init_try_register_stack stack;
grpc_ruby_fork_guard();
int stop_waiting_for_thread_start = 0;
rb_thread_call_without_gvl(
wait_until_channel_polling_thread_started_no_gil,
&stop_waiting_for_thread_start,
wait_until_channel_polling_thread_started_unblocking_func,
&stop_waiting_for_thread_start);
/* "3" == 3 mandatory args */
rb_scan_args(argc, argv, "3", &target, &channel_args, &credentials);
@ -286,7 +277,6 @@ static void* get_state_without_gil(void* arg) {
get_state_stack* stack = (get_state_stack*)arg;
gpr_mu_lock(&global_connection_polling_mu);
GPR_ASSERT(g_abort_channel_polling || g_channel_polling_thread_started);
if (stack->bg->channel_destroyed) {
stack->out = GRPC_CHANNEL_SHUTDOWN;
} else {
@ -423,15 +413,8 @@ static void grpc_rb_channel_maybe_recreate_channel_after_fork(
bg_watched_channel* bg = wrapper->bg_wrapped;
if (bg->channel_destroyed) {
// There must be one ref at this point, held by the ruby-level channel
// object.
// object, drop this one here.
GPR_ASSERT(bg->refcount == 1);
// Wait for channel polling thread to re-initialize
int stop_waiting_for_thread_start = 0;
rb_thread_call_without_gvl(
wait_until_channel_polling_thread_started_no_gil,
&stop_waiting_for_thread_start,
wait_until_channel_polling_thread_started_unblocking_func,
&stop_waiting_for_thread_start);
rb_thread_call_without_gvl(channel_safe_destroy_without_gil, bg, NULL,
NULL);
// re-create C-core channel
@ -635,9 +618,6 @@ static void grpc_rb_channel_try_register_connection_polling(
bg_watched_channel* bg) {
grpc_connectivity_state conn_state;
watch_state_op* op = NULL;
GPR_ASSERT(g_channel_polling_thread_started || g_abort_channel_polling);
if (bg->refcount == 0) {
GPR_ASSERT(bg->channel_destroyed);
bg_watched_channel_list_free_and_remove(bg);
@ -647,7 +627,6 @@ static void grpc_rb_channel_try_register_connection_polling(
if (bg->channel_destroyed || g_abort_channel_polling) {
return;
}
conn_state = grpc_channel_check_connectivity_state(bg->channel, 0);
if (conn_state == GRPC_CHANNEL_SHUTDOWN) {
return;
@ -655,7 +634,6 @@ static void grpc_rb_channel_try_register_connection_polling(
GPR_ASSERT(bg_watched_channel_list_lookup(bg));
// prevent bg from being free'd by GC while background thread is watching it
bg->refcount++;
op = gpr_zalloc(sizeof(watch_state_op));
op->op_type = CONTINUOUS_WATCH;
op->op.continuous_watch_callback_args.bg = bg;
@ -678,9 +656,6 @@ static void* run_poll_channels_loop_no_gil(void* arg) {
gpr_log(GPR_DEBUG, "GRPC_RUBY: run_poll_channels_loop_no_gil - begin");
gpr_mu_lock(&global_connection_polling_mu);
GPR_ASSERT(!g_abort_channel_polling);
GPR_ASSERT(!g_channel_polling_thread_started);
g_channel_polling_thread_started = 1;
gpr_cv_broadcast(&global_connection_polling_cv);
gpr_mu_unlock(&global_connection_polling_mu);
@ -761,31 +736,6 @@ static VALUE run_poll_channels_loop(VALUE arg) {
return Qnil;
}
static void* wait_until_channel_polling_thread_started_no_gil(void* arg) {
int* stop_waiting = (int*)arg;
gpr_log(GPR_DEBUG, "GRPC_RUBY: wait for channel polling thread to start");
gpr_mu_lock(&global_connection_polling_mu);
while (!g_channel_polling_thread_started && !g_abort_channel_polling &&
!*stop_waiting) {
gpr_cv_wait(&global_connection_polling_cv, &global_connection_polling_mu,
gpr_inf_future(GPR_CLOCK_REALTIME));
}
gpr_mu_unlock(&global_connection_polling_mu);
return NULL;
}
static void wait_until_channel_polling_thread_started_unblocking_func(
void* arg) {
int* stop_waiting = (int*)arg;
gpr_mu_lock(&global_connection_polling_mu);
gpr_log(GPR_DEBUG,
"GRPC_RUBY: interrupt wait for channel polling thread to start");
*stop_waiting = 1;
gpr_cv_broadcast(&global_connection_polling_cv);
gpr_mu_unlock(&global_connection_polling_mu);
}
static void* set_abort_channel_polling_without_gil(void* arg) {
(void)arg;
gpr_mu_lock(&global_connection_polling_mu);
@ -814,7 +764,6 @@ void grpc_rb_channel_polling_thread_start() {
gpr_once_init(&g_once_init, do_basic_init);
GPR_ASSERT(!RTEST(g_channel_polling_thread));
GPR_ASSERT(!g_abort_channel_polling);
GPR_ASSERT(!g_channel_polling_thread_started);
GPR_ASSERT(g_channel_polling_cq == NULL);
g_channel_polling_cq = grpc_completion_queue_create_for_next(NULL);
@ -841,7 +790,6 @@ void grpc_rb_channel_polling_thread_stop() {
// we can start again later
g_channel_polling_thread = Qnil;
g_abort_channel_polling = false;
g_channel_polling_thread_started = false;
g_channel_polling_cq = NULL;
}

@ -945,6 +945,7 @@ class RubyLanguage(object):
"src/ruby/end2end/fork_test.rb",
"src/ruby/end2end/simple_fork_test.rb",
"src/ruby/end2end/prefork_without_using_grpc_test.rb",
"src/ruby/end2end/prefork_postfork_loop_test.rb",
"src/ruby/end2end/secure_fork_test.rb",
"src/ruby/end2end/bad_usage_fork_test.rb",
"src/ruby/end2end/sig_handling_test.rb",
@ -969,6 +970,7 @@ class RubyLanguage(object):
"src/ruby/end2end/secure_fork_test.rb",
"src/ruby/end2end/bad_usage_fork_test.rb",
"src/ruby/end2end/prefork_without_using_grpc_test.rb",
"src/ruby/end2end/prefork_postfork_loop_test.rb",
]:
if platform_string() == "mac":
# Skip fork tests on mac, it's only supported on linux.

Loading…
Cancel
Save