Merge pull request #12710 from apolcyn/speedup_ruby_end2end_tests

Fix a race in ruby server shutdown and cleanup for ruby tests
pull/13264/merge
apolcyn 7 years ago committed by GitHub
commit 1972e5ce74
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      src/ruby/end2end/channel_closing_driver.rb
  2. 7
      src/ruby/end2end/channel_state_driver.rb
  3. 6
      src/ruby/end2end/end2end_common.rb
  4. 6
      src/ruby/end2end/forking_client_driver.rb
  5. 2
      src/ruby/end2end/grpc_class_init_client.rb
  6. 56
      src/ruby/end2end/killed_client_thread_driver.rb
  7. 2
      src/ruby/end2end/multiple_killed_watching_threads_driver.rb
  8. 30
      src/ruby/end2end/sig_handling_client.rb
  9. 35
      src/ruby/end2end/sig_handling_driver.rb
  10. 4
      src/ruby/end2end/sig_int_during_channel_watch_driver.rb
  11. 37
      src/ruby/lib/grpc/generic/rpc_server.rb
  12. 12
      src/ruby/qps/worker.rb

@ -23,13 +23,11 @@ def main
STDERR.puts 'start server'
server_runner = ServerRunner.new(EchoServerImpl)
server_port = server_runner.run
sleep 1
STDERR.puts 'start client'
control_stub, client_pid = start_client('channel_closing_client.rb',
server_port)
# sleep to allow time for the client to get into
# the middle of a "watch connectivity state" call
sleep 3
begin

@ -22,14 +22,11 @@ def main
STDERR.puts 'start server'
server_runner = ServerRunner.new(EchoServerImpl)
server_port = server_runner.run
sleep 1
STDERR.puts 'start client'
_, client_pid = start_client('channel_state_client.rb', server_port)
# sleep to allow time for the client to get into
# the middle of a "watch connectivity state" call
sleep 3
Process.kill('SIGTERM', client_pid)
begin

@ -40,12 +40,13 @@ end
# ServerRunner starts an "echo server" that test clients can make calls to
class ServerRunner
def initialize(service_impl)
def initialize(service_impl, rpc_server_args: {})
@service_impl = service_impl
@rpc_server_args = rpc_server_args
end
def run
@srv = GRPC::RpcServer.new
@srv = GRPC::RpcServer.new(@rpc_server_args)
port = @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
@srv.handle(@service_impl)
@ -75,7 +76,6 @@ def start_client(client_main, server_port)
client_path,
"--client_control_port=#{client_control_port}",
"--server_port=#{server_port}")
sleep 1
control_stub = ClientControl::ClientController::Stub.new(
"localhost:#{client_control_port}", :this_channel_is_insecure)
[control_stub, client_pid]

@ -20,12 +20,6 @@ def main
STDERR.puts 'start server'
server_runner = ServerRunner.new(EchoServerImpl)
server_port = server_runner.run
# TODO(apolcyn) Can we get rid of this sleep?
# Without it, an immediate call to the just started EchoServer
# fails with UNAVAILABLE
sleep 1
STDERR.puts 'start client'
_, client_pid = start_client('forking_client_client.rb',
server_port)

@ -54,7 +54,7 @@ def run_concurrency_stress_test(test_proc)
test_proc.call
fail 'exception thrown while child thread initing class'
fail '(expected) exception thrown while child thread initing class'
end
# default (no gc_stress and no concurrency_stress)

@ -17,56 +17,46 @@
require_relative './end2end_common'
# Service that sleeps for a long time upon receiving an 'echo request'
# Also, this notifies @call_started_cv once it has received a request.
# Also, this calls it's callback upon receiving an RPC as a method
# of synchronization/waiting for the child to start.
class SleepingEchoServerImpl < Echo::EchoServer::Service
def initialize(call_started, call_started_mu, call_started_cv)
@call_started = call_started
@call_started_mu = call_started_mu
@call_started_cv = call_started_cv
def initialize(received_rpc_callback)
@received_rpc_callback = received_rpc_callback
end
def echo(echo_req, _)
@call_started_mu.synchronize do
@call_started.set_true
@call_started_cv.signal
end
sleep 1000
@received_rpc_callback.call
# sleep forever to get the client stuck waiting
sleep
Echo::EchoReply.new(response: echo_req.request)
end
end
# Mutable boolean
class BoolHolder
attr_reader :val
def init
@val = false
end
def set_true
@val = true
end
end
def main
STDERR.puts 'start server'
call_started = BoolHolder.new
call_started_mu = Mutex.new
call_started_cv = ConditionVariable.new
client_started = false
client_started_mu = Mutex.new
client_started_cv = ConditionVariable.new
received_rpc_callback = proc do
client_started_mu.synchronize do
client_started = true
client_started_cv.signal
end
end
service_impl = SleepingEchoServerImpl.new(call_started,
call_started_mu,
call_started_cv)
server_runner = ServerRunner.new(service_impl)
service_impl = SleepingEchoServerImpl.new(received_rpc_callback)
# RPCs against the server will all be hanging, so kill thread
# pool workers immediately rather than after waiting for a second.
rpc_server_args = { poll_period: 0, pool_keep_alive: 0 }
server_runner = ServerRunner.new(service_impl, rpc_server_args: rpc_server_args)
server_port = server_runner.run
STDERR.puts 'start client'
_, client_pid = start_client('killed_client_thread_client.rb',
server_port)
call_started_mu.synchronize do
call_started_cv.wait(call_started_mu) until call_started.val
client_started_mu.synchronize do
client_started_cv.wait(client_started_mu) until client_started
end
# SIGTERM the child process now that it's

@ -26,6 +26,8 @@ def watch_state(ch)
fail "non-idle state: #{state}" unless state == IDLE
ch.watch_connectivity_state(IDLE, Time.now + 360)
end
# sleep to get the thread into the middle of a
# "watch connectivity state" call
sleep 0.1
thd.kill
end

@ -30,16 +30,18 @@ class SigHandlingClientController < ClientControl::ClientController::Service
end
def shutdown(_, _)
Thread.new do
# TODO(apolcyn) There is a race between stopping the
# server and the "shutdown" rpc completing,
# See if stop method on server can end active RPC cleanly, to
# avoid this sleep.
sleep 3
# Spawn a new thread because RpcServer#stop is
# synchronous and blocks until either this RPC has finished,
# or the server's "poll_period" seconds have passed.
@shutdown_thread = Thread.new do
@srv.stop
end
ClientControl::Void.new
end
def join_shutdown_thread
@shutdown_thread.join
end
end
def main
@ -62,13 +64,23 @@ def main
STDERR.puts 'SIGINT received'
end
srv = GRPC::RpcServer.new
# The "shutdown" RPC should end very quickly.
# Allow a few seconds to be safe.
srv = GRPC::RpcServer.new(poll_period: 3)
srv.add_http2_port("0.0.0.0:#{client_control_port}",
:this_port_is_insecure)
stub = Echo::EchoServer::Stub.new("localhost:#{server_port}",
:this_channel_is_insecure)
srv.handle(SigHandlingClientController.new(srv, stub))
srv.run
control_service = SigHandlingClientController.new(srv, stub)
srv.handle(control_service)
server_thread = Thread.new do
srv.run
end
srv.wait_till_running
# send a first RPC to notify the parent process that we've started
stub.echo(Echo::EchoRequest.new(request: 'client/child started'))
server_thread.join
control_service.join_shutdown_thread
end
main

@ -19,17 +19,42 @@
require_relative './end2end_common'
# A service that calls back it's received_rpc_callback
# upon receiving an RPC. Used for synchronization/waiting
# for child process to start.
class ClientStartedService < Echo::EchoServer::Service
def initialize(received_rpc_callback)
@received_rpc_callback = received_rpc_callback
end
def echo(echo_req, _)
@received_rpc_callback.call unless @received_rpc_callback.nil?
@received_rpc_callback = nil
Echo::EchoReply.new(response: echo_req.request)
end
end
def main
STDERR.puts 'start server'
server_runner = ServerRunner.new(EchoServerImpl)
server_port = server_runner.run
sleep 1
client_started = false
client_started_mu = Mutex.new
client_started_cv = ConditionVariable.new
received_rpc_callback = proc do
client_started_mu.synchronize do
client_started = true
client_started_cv.signal
end
end
client_started_service = ClientStartedService.new(received_rpc_callback)
server_runner = ServerRunner.new(client_started_service)
server_port = server_runner.run
STDERR.puts 'start client'
control_stub, client_pid = start_client('sig_handling_client.rb', server_port)
sleep 1
client_started_mu.synchronize do
client_started_cv.wait(client_started_mu) until client_started
end
count = 0
while count < 5

@ -23,13 +23,9 @@ def main
STDERR.puts 'start server'
server_runner = ServerRunner.new(EchoServerImpl)
server_port = server_runner.run
sleep 1
STDERR.puts 'start client'
_, client_pid = start_client('sig_int_during_channel_watch_client.rb',
server_port)
# give time for the client to get into the middle
# of a channel state watch call
sleep 1

@ -92,9 +92,13 @@ module GRPC
# Stops the jobs in the pool
def stop
GRPC.logger.info('stopping, will wait for all the workers to exit')
schedule { throw :exit } while ready_for_work?
@stop_mutex.synchronize do # wait @keep_alive for works to stop
@stop_mutex.synchronize do # wait @keep_alive seconds for workers to stop
@stopped = true
loop do
break unless ready_for_work?
worker_queue = @ready_workers.pop
worker_queue << [proc { throw :exit }, []]
end
@stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0
end
forcibly_stop_workers
@ -138,7 +142,10 @@ module GRPC
end
# there shouldn't be any work given to this thread while its busy
fail('received a task while busy') unless worker_queue.empty?
@ready_workers << worker_queue
@stop_mutex.synchronize do
return if @stopped
@ready_workers << worker_queue
end
end
end
end
@ -186,8 +193,13 @@ module GRPC
# * max_waiting_requests: Deprecated due to internal changes to the thread
# pool. This is still an argument for compatibility but is ignored.
#
# * poll_period: when present, the server polls for new events with this
# period
# * poll_period: The amount of time in seconds to wait for
# currently-serviced RPC's to finish before cancelling them when shutting
# down the server.
#
# * pool_keep_alive: The amount of time in seconds to wait
# for currently busy thread-pool threads to finish before
# forcing an abrupt exit to each thread.
#
# * connect_md_proc:
# when non-nil is a proc for determining metadata to to send back the client
@ -202,17 +214,18 @@ module GRPC
# intercepting server handlers to provide extra functionality.
# Interceptors are an EXPERIMENTAL API.
#
def initialize(pool_size:DEFAULT_POOL_SIZE,
max_waiting_requests:DEFAULT_MAX_WAITING_REQUESTS,
poll_period:DEFAULT_POLL_PERIOD,
connect_md_proc:nil,
server_args:{},
interceptors:[])
def initialize(pool_size: DEFAULT_POOL_SIZE,
max_waiting_requests: DEFAULT_MAX_WAITING_REQUESTS,
poll_period: DEFAULT_POLL_PERIOD,
pool_keep_alive: GRPC::RpcServer::DEFAULT_POOL_SIZE,
connect_md_proc: nil,
server_args: {},
interceptors: [])
@connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc)
@max_waiting_requests = max_waiting_requests
@poll_period = poll_period
@pool_size = pool_size
@pool = Pool.new(@pool_size)
@pool = Pool.new(@pool_size, keep_alive: pool_keep_alive)
@run_cond = ConditionVariable.new
@run_mutex = Mutex.new
# running_state can take 4 values: :not_started, :running, :stopping, and

@ -77,8 +77,7 @@ class WorkerServiceImpl < Grpc::Testing::WorkerService::Service
Grpc::Testing::CoreResponse.new(cores: cpu_cores)
end
def quit_worker(_args, _call)
Thread.new {
sleep 3
@shutdown_thread = Thread.new {
@server.stop
}
Grpc::Testing::Void.new
@ -87,6 +86,9 @@ class WorkerServiceImpl < Grpc::Testing::WorkerService::Service
@server = s
@server_port = sp
end
def join_shutdown_thread
@shutdown_thread.join
end
end
def main
@ -107,11 +109,13 @@ def main
# Configure any errors with client or server child threads to surface
Thread.abort_on_exception = true
s = GRPC::RpcServer.new
s = GRPC::RpcServer.new(poll_period: 3)
s.add_http2_port("0.0.0.0:" + options['driver_port'].to_s,
:this_port_is_insecure)
s.handle(WorkerServiceImpl.new(s, options['server_port'].to_i))
worker_service = WorkerServiceImpl.new(s, options['server_port'].to_i)
s.handle(worker_service)
s.run
worker_service.join_shutdown_thread
end
main

Loading…
Cancel
Save