Fix child process port selection in ruby end-to-end tests

pull/25442/head
Alexander Polcyn 4 years ago
parent 90b565f17a
commit 27687563d6
  1. 9
      src/ruby/end2end/channel_closing_client.rb
  2. 14
      src/ruby/end2end/channel_closing_test.rb
  3. 6
      src/ruby/end2end/channel_state_client.rb
  4. 13
      src/ruby/end2end/channel_state_test.rb
  5. 6
      src/ruby/end2end/client_memory_usage_client.rb
  6. 5
      src/ruby/end2end/client_memory_usage_test.rb
  7. 108
      src/ruby/end2end/end2end_common.rb
  8. 9
      src/ruby/end2end/forking_client_client.rb
  9. 12
      src/ruby/end2end/forking_client_test.rb
  10. 15
      src/ruby/end2end/graceful_sig_handling_client.rb
  11. 13
      src/ruby/end2end/graceful_sig_handling_test.rb
  12. 14
      src/ruby/end2end/graceful_sig_stop_client.rb
  13. 11
      src/ruby/end2end/graceful_sig_stop_test.rb
  14. 6
      src/ruby/end2end/killed_client_thread_client.rb
  15. 30
      src/ruby/end2end/killed_client_thread_test.rb
  16. 18
      src/ruby/end2end/lib/client_control_pb.rb
  17. 18
      src/ruby/end2end/lib/client_control_services_pb.rb
  18. 16
      src/ruby/end2end/lib/echo_pb.rb
  19. 2
      src/ruby/end2end/lib/echo_services_pb.rb
  20. 8
      src/ruby/end2end/protos/client_control.proto
  21. 14
      src/ruby/end2end/sig_handling_client.rb
  22. 17
      src/ruby/end2end/sig_handling_test.rb
  23. 17
      src/ruby/end2end/sig_int_during_channel_watch_client.rb
  24. 17
      src/ruby/end2end/sig_int_during_channel_watch_test.rb

@ -30,11 +30,11 @@ class ChannelClosingClientController < ClientControl::ClientController::Service
end end
def main def main
client_control_port = '' parent_controller_port = ''
server_port = '' server_port = ''
OptionParser.new do |opts| OptionParser.new do |opts|
opts.on('--client_control_port=P', String) do |p| opts.on('--parent_controller_port=P', String) do |p|
client_control_port = p parent_controller_port = p
end end
opts.on('--server_port=P', String) do |p| opts.on('--server_port=P', String) do |p|
server_port = p server_port = p
@ -46,7 +46,8 @@ def main
srv = new_rpc_server_for_testing srv = new_rpc_server_for_testing
thd = Thread.new do thd = Thread.new do
srv.add_http2_port("0.0.0.0:#{client_control_port}", :this_port_is_insecure) port = srv.add_http2_port('localhost:0', :this_port_is_insecure)
report_controller_port_to_parent(parent_controller_port, port)
srv.handle(ChannelClosingClientController.new(ch)) srv.handle(ChannelClosingClientController.new(ch))
srv.run srv.run
end end

@ -24,8 +24,8 @@ def main
server_runner = ServerRunner.new(EchoServerImpl) server_runner = ServerRunner.new(EchoServerImpl)
server_port = server_runner.run server_port = server_runner.run
STDERR.puts 'start client' STDERR.puts 'start client'
control_stub, client_pid = start_client('channel_closing_client.rb', client_controller = ClientController.new(
server_port) 'channel_closing_client.rb', server_port)
# sleep to allow time for the client to get into # sleep to allow time for the client to get into
# the middle of a "watch connectivity state" call # the middle of a "watch connectivity state" call
sleep 3 sleep 3
@ -34,7 +34,7 @@ def main
Timeout.timeout(20) do Timeout.timeout(20) do
loop do loop do
begin begin
control_stub.shutdown(ClientControl::Void.new) client_controller.stub.shutdown(ClientControl::Void.new)
break break
rescue GRPC::BadStatus => e rescue GRPC::BadStatus => e
STDERR.puts "control_stub.shutdown RPC received error:|#{e}|. " \ STDERR.puts "control_stub.shutdown RPC received error:|#{e}|. " \
@ -42,12 +42,12 @@ def main
"so we'll retry the RPC" "so we'll retry the RPC"
end end
end end
Process.wait(client_pid) Process.wait(client_controller.client_pid)
end end
rescue Timeout::Error rescue Timeout::Error
STDERR.puts "timeout wait for client pid #{client_pid}" STDERR.puts "timeout wait for client pid #{client_controller.client_pid}"
Process.kill('SIGKILL', client_pid) Process.kill('SIGKILL', client_controller.client_pid)
Process.wait(client_pid) Process.wait(client_controller.client_pid)
STDERR.puts 'killed client child' STDERR.puts 'killed client child'
raise 'Timed out waiting for client process. It likely hangs when a ' \ raise 'Timed out waiting for client process. It likely hangs when a ' \
'channel is closed while connectivity is watched' 'channel is closed while connectivity is watched'

@ -17,15 +17,17 @@
require_relative './end2end_common' require_relative './end2end_common'
def main def main
parent_controller_port = ''
server_port = '' server_port = ''
OptionParser.new do |opts| OptionParser.new do |opts|
opts.on('--client_control_port=P', String) do opts.on('--parent_controller_port=P', String) do |p|
STDERR.puts 'client_control_port ignored' parent_controller_port = p
end end
opts.on('--server_port=P', String) do |p| opts.on('--server_port=P', String) do |p|
server_port = p server_port = p
end end
end.parse! end.parse!
report_controller_port_to_parent(parent_controller_port, 0)
ch = GRPC::Core::Channel.new("localhost:#{server_port}", {}, ch = GRPC::Core::Channel.new("localhost:#{server_port}", {},
:this_channel_is_insecure) :this_channel_is_insecure)

@ -23,18 +23,19 @@ def main
server_runner = ServerRunner.new(EchoServerImpl) server_runner = ServerRunner.new(EchoServerImpl)
server_port = server_runner.run server_port = server_runner.run
STDERR.puts 'start client' STDERR.puts 'start client'
_, client_pid = start_client('channel_state_client.rb', server_port) client_controller = ClientController.new(
'channel_state_client.rb', server_port)
# sleep to allow time for the client to get into # sleep to allow time for the client to get into
# the middle of a "watch connectivity state" call # the middle of a "watch connectivity state" call
sleep 3 sleep 3
Process.kill('SIGTERM', client_pid) Process.kill('SIGTERM', client_controller.client_pid)
begin begin
Timeout.timeout(10) { Process.wait(client_pid) } Timeout.timeout(10) { Process.wait(client_controller.client_pid) }
rescue Timeout::Error rescue Timeout::Error
STDERR.puts "timeout wait for client pid #{client_pid}" STDERR.puts "timeout wait for client pid #{client_controller.client_pid}"
Process.kill('SIGKILL', client_pid) Process.kill('SIGKILL', client_controller.client_pid)
Process.wait(client_pid) Process.wait(client_controller.client_pid)
STDERR.puts 'killed client child' STDERR.puts 'killed client child'
raise 'Timed out waiting for client process. ' \ raise 'Timed out waiting for client process. ' \
'It likely hangs when ended abruptly' 'It likely hangs when ended abruptly'

@ -18,17 +18,19 @@ require_relative './end2end_common'
require 'objspace' require 'objspace'
def main def main
parent_controller_port = ''
server_port = '' server_port = ''
loop_count = 200 loop_count = 200
OptionParser.new do |opts| OptionParser.new do |opts|
opts.on('--client_control_port=P', String) do opts.on('--parent_controller_port=P', String) do |p|
STDERR.puts 'client_control_port ignored' parent_controller_port = p
end end
opts.on('--server_port=P', String) do |p| opts.on('--server_port=P', String) do |p|
server_port = p server_port = p
end end
end.parse! end.parse!
report_controller_port_to_parent(parent_controller_port, 0)
loop_count.times do loop_count.times do
stub = Echo::EchoServer::Stub.new("localhost:#{server_port}", :this_channel_is_insecure) stub = Echo::EchoServer::Stub.new("localhost:#{server_port}", :this_channel_is_insecure)

@ -21,9 +21,10 @@ def main
server_runner = ServerRunner.new(EchoServerImpl) server_runner = ServerRunner.new(EchoServerImpl)
server_port = server_runner.run server_port = server_runner.run
STDERR.puts 'start client' STDERR.puts 'start client'
_, client_pid = start_client('client_memory_usage_client.rb', server_port) client_controller = ClientController.new(
'client_memory_usage_client.rb', server_port)
Process.wait(client_pid) Process.wait(client_controller.client_pid)
client_exit_code = $CHILD_STATUS client_exit_code = $CHILD_STATUS
if client_exit_code != 0 if client_exit_code != 0

@ -43,37 +43,11 @@ class MutableValue
end end
# GreeterServer is simple server that implements the Helloworld Greeter server. # GreeterServer is simple server that implements the Helloworld Greeter server.
# This service also has a mechanism to wait for a timeout until the first
# RPC has been received, which is useful for synchronizing between parent
# and child processes.
class EchoServerImpl < Echo::EchoServer::Service class EchoServerImpl < Echo::EchoServer::Service
def initialize
@first_rpc_received_mu = Mutex.new
@first_rpc_received_cv = ConditionVariable.new
@first_rpc_received = MutableValue.new(false)
end
# say_hello implements the SayHello rpc method. # say_hello implements the SayHello rpc method.
def echo(echo_req, _) def echo(echo_req, _)
@first_rpc_received_mu.synchronize do
@first_rpc_received.value = true
@first_rpc_received_cv.broadcast
end
Echo::EchoReply.new(response: echo_req.request) Echo::EchoReply.new(response: echo_req.request)
end end
def wait_for_first_rpc_received(timeout_seconds)
Timeout.timeout(timeout_seconds) do
@first_rpc_received_mu.synchronize do
until @first_rpc_received.value
@first_rpc_received_cv.wait(@first_rpc_received_mu)
end
end
end
rescue => e
fail "Received error:|#{e}| while waiting for #{timeout_seconds} " \
'seconds to receive the first RPC'
end
end end
# ServerRunner starts an "echo server" that test clients can make calls to # ServerRunner starts an "echo server" that test clients can make calls to
@ -105,32 +79,64 @@ class ServerRunner
end end
end end
def start_client(client_main, server_port) # ClientController is used to start a child process and communicate
this_dir = File.expand_path(File.dirname(__FILE__)) # with it for test orchestration purposes via RPCs.
class ClientController < ClientControl::ParentController::Service
tmp_server = TCPServer.new(0) attr_reader :stub, :client_pid
client_control_port = tmp_server.local_address.ip_port
tmp_server.close def initialize(client_main, server_port)
this_dir = File.expand_path(File.dirname(__FILE__))
client_path = File.join(this_dir, client_main) client_path = File.join(this_dir, client_main)
client_pid = Process.spawn(RbConfig.ruby, @server = new_rpc_server_for_testing(poll_period: 3)
client_path, port = @server.add_http2_port('localhost:0', :this_port_is_insecure)
"--client_control_port=#{client_control_port}", server_thread = Thread.new do
"--server_port=#{server_port}") @server.handle(self)
control_stub = ClientControl::ClientController::Stub.new( @server.run
"localhost:#{client_control_port}", :this_channel_is_insecure) end
[control_stub, client_pid] @server.wait_till_running
end @client_controller_port_mu = Mutex.new
@client_controller_port_cv = ConditionVariable.new
def cleanup(control_stub, client_pid, server_runner) @client_controller_port = nil
control_stub.shutdown(ClientControl::Void.new) @client_pid = Process.spawn(RbConfig.ruby,
Process.wait(client_pid) client_path,
"--parent_controller_port=#{port}",
client_exit_code = $CHILD_STATUS "--server_port=#{server_port}")
begin
Timeout.timeout(10) do
@client_controller_port_mu.synchronize do
while @client_controller_port.nil?
@client_controller_port_cv.wait(@client_controller_port_mu)
end
end
end
rescue => e
fail "timeout waiting for child process to report port. error: #{e}"
end
@server.stop
server_thread.join
@stub = ClientControl::ClientController::Stub.new(
"localhost:#{@client_controller_port}", :this_channel_is_insecure)
end
if client_exit_code != 0 def set_client_controller_port(req, _)
fail "term sig test failure: client exit code: #{client_exit_code}" @client_controller_port_mu.synchronize do
unless @client_controller_port.nil?
fail 'client controller port already set'
end
@client_controller_port = req.port
@client_controller_port_cv.broadcast
end
ClientControl::Void.new
end end
end
server_runner.stop def report_controller_port_to_parent(parent_controller_port, client_controller_port)
unless parent_controller_port.to_i > 0
fail "bad parent control port: |#{parent_controller_port}|"
end
stub = ClientControl::ParentController::Stub.new(
"localhost:#{parent_controller_port.to_i}", :this_channel_is_insecure)
m = ClientControl::Port.new
m.port = client_controller_port.to_i
stub.set_client_controller_port(m, deadline: Time.now + 10)
end end

@ -19,10 +19,11 @@
require_relative './end2end_common' require_relative './end2end_common'
def main def main
parent_controller_port = ''
server_port = '' server_port = ''
OptionParser.new do |opts| OptionParser.new do |opts|
opts.on('--client_control_port=P', String) do opts.on('--parent_controller_port=P', String) do |p|
STDERR.puts 'client control port not used' parent_controller_port = p
end end
opts.on('--server_port=P', String) do |p| opts.on('--server_port=P', String) do |p|
server_port = p server_port = p
@ -46,7 +47,9 @@ def main
raise 'Timed out waiting for client process. ' \ raise 'Timed out waiting for client process. ' \
'It likely hangs when using gRPC after loading it and then forking' 'It likely hangs when using gRPC after loading it and then forking'
end end
# don't report the port until now so as to not use grpc before forking
report_controller_port_to_parent(parent_controller_port, 0)
# check exit status of forked process
client_exit_code = $CHILD_STATUS client_exit_code = $CHILD_STATUS
fail "forked process failed #{client_exit_code}" if client_exit_code != 0 fail "forked process failed #{client_exit_code}" if client_exit_code != 0
end end

@ -21,17 +21,17 @@ def main
server_runner = ServerRunner.new(EchoServerImpl) server_runner = ServerRunner.new(EchoServerImpl)
server_port = server_runner.run server_port = server_runner.run
STDERR.puts 'start client' STDERR.puts 'start client'
_, client_pid = start_client('forking_client_client.rb', client_controller = ClientController.new(
server_port) 'forking_client_client.rb', server_port)
begin begin
Timeout.timeout(10) do Timeout.timeout(10) do
Process.wait(client_pid) Process.wait(client_controller.client_pid)
end end
rescue Timeout::Error rescue Timeout::Error
STDERR.puts "timeout wait for client pid #{client_pid}" STDERR.puts "timeout wait for client pid #{client_controller.client_pid}"
Process.kill('SIGKILL', client_pid) Process.kill('SIGKILL', client_controller.client_pid)
Process.wait(client_pid) Process.wait(client_controller.client_pid)
STDERR.puts 'killed client child' STDERR.puts 'killed client child'
raise 'Timed out waiting for client process. ' \ raise 'Timed out waiting for client process. ' \
'It likely hangs when requiring grpc, then forking, then using grpc ' 'It likely hangs when requiring grpc, then forking, then using grpc '

@ -30,21 +30,20 @@ class SigHandlingClientController < ClientControl::ClientController::Service
end end
def main def main
client_control_port = '' parent_controller_port = ''
server_port = '' server_port = ''
OptionParser.new do |opts| OptionParser.new do |opts|
opts.on('--client_control_port=P', String) do |p| opts.on('--parent_controller_port=P', String) do |p|
client_control_port = p parent_controller_port = p
end end
opts.on('--server_port=P', String) do |p| opts.on('--server_port=P', String) do |p|
server_port = p server_port = p
end end
end.parse! end.parse!
# Allow a few seconds to be safe.
srv = new_rpc_server_for_testing srv = new_rpc_server_for_testing
srv.add_http2_port("0.0.0.0:#{client_control_port}", port = srv.add_http2_port('localhost:0',
:this_port_is_insecure) :this_port_is_insecure)
stub = Echo::EchoServer::Stub.new("localhost:#{server_port}", stub = Echo::EchoServer::Stub.new("localhost:#{server_port}",
:this_channel_is_insecure) :this_channel_is_insecure)
control_service = SigHandlingClientController.new(stub) control_service = SigHandlingClientController.new(stub)
@ -53,8 +52,8 @@ def main
srv.run_till_terminated_or_interrupted(['int']) srv.run_till_terminated_or_interrupted(['int'])
end end
srv.wait_till_running srv.wait_till_running
# send a first RPC to notify the parent process that we've started # notify the parent process that we're ready to receive signals
stub.echo(Echo::EchoRequest.new(request: 'client/child started')) report_controller_port_to_parent(parent_controller_port, port)
server_thread.join server_thread.join
end end

@ -25,16 +25,14 @@ def main
server_runner = ServerRunner.new(echo_service) server_runner = ServerRunner.new(echo_service)
server_port = server_runner.run server_port = server_runner.run
STDERR.puts 'start client' STDERR.puts 'start client'
control_stub, client_pid = start_client('graceful_sig_handling_client.rb', server_port) client_controller = ClientController.new(
# use receipt of one RPC to indicate that the child process is 'graceful_sig_handling_client.rb', server_port)
# ready
echo_service.wait_for_first_rpc_received(20)
# now get the client to send an RPC # now get the client to send an RPC
control_stub.do_echo_rpc( client_controller.stub.do_echo_rpc(
ClientControl::DoEchoRpcRequest.new(request: 'hello')) ClientControl::DoEchoRpcRequest.new(request: 'hello'))
STDERR.puts 'killing client' STDERR.puts 'killing client'
Process.kill('SIGINT', client_pid) Process.kill('SIGINT', client_controller.client_pid)
Process.wait(client_pid) Process.wait(client_controller.client_pid)
client_exit_status = $CHILD_STATUS client_exit_status = $CHILD_STATUS
if client_exit_status.exited? if client_exit_status.exited?
if client_exit_status.exitstatus != 0 if client_exit_status.exitstatus != 0
@ -47,7 +45,6 @@ def main
end end
STDERR.puts 'Client ended gracefully' STDERR.puts 'Client ended gracefully'
# no need to call cleanup, client should already be dead
server_runner.stop server_runner.stop
end end

@ -45,11 +45,11 @@ class SigHandlingClientController < ClientControl::ClientController::Service
end end
def main def main
client_control_port = '' parent_controller_port = ''
server_port = '' server_port = ''
OptionParser.new do |opts| OptionParser.new do |opts|
opts.on('--client_control_port=P', String) do |p| opts.on('--parent_controller_port=P', String) do |p|
client_control_port = p parent_controller_port = p
end end
opts.on('--server_port=P', String) do |p| opts.on('--server_port=P', String) do |p|
server_port = p server_port = p
@ -59,8 +59,8 @@ def main
# The "shutdown" RPC should end very quickly. # The "shutdown" RPC should end very quickly.
# Allow a few seconds to be safe. # Allow a few seconds to be safe.
srv = new_rpc_server_for_testing(poll_period: 3) srv = new_rpc_server_for_testing(poll_period: 3)
srv.add_http2_port("0.0.0.0:#{client_control_port}", port = srv.add_http2_port('localhost:0',
:this_port_is_insecure) :this_port_is_insecure)
stub = Echo::EchoServer::Stub.new("localhost:#{server_port}", stub = Echo::EchoServer::Stub.new("localhost:#{server_port}",
:this_channel_is_insecure) :this_channel_is_insecure)
control_service = SigHandlingClientController.new(srv, stub) control_service = SigHandlingClientController.new(srv, stub)
@ -69,8 +69,8 @@ def main
srv.run_till_terminated_or_interrupted(['int']) srv.run_till_terminated_or_interrupted(['int'])
end end
srv.wait_till_running srv.wait_till_running
# send a first RPC to notify the parent process that we've started # notify the parent process that we're ready
stub.echo(Echo::EchoRequest.new(request: 'client/child started')) report_controller_port_to_parent(parent_controller_port, port)
server_thread.join server_thread.join
control_service.join_shutdown_thread control_service.join_shutdown_thread
end end

@ -25,11 +25,12 @@ def main
server_runner = ServerRunner.new(echo_service) server_runner = ServerRunner.new(echo_service)
server_port = server_runner.run server_port = server_runner.run
STDERR.puts 'start client' STDERR.puts 'start client'
control_stub, client_pid = start_client('./graceful_sig_stop_client.rb', server_port) client_controller = ClientController.new(
# use receipt of one RPC to indicate that the child process is './graceful_sig_stop_client.rb', server_port)
# ready client_controller.stub.shutdown(ClientControl::Void.new)
echo_service.wait_for_first_rpc_received(20) Process.wait(client_controller.client_pid)
cleanup(control_stub, client_pid, server_runner) fail "client exit code: #{$CHILD_STATUS}" unless $CHILD_STATUS.to_i.zero?
server_runner.stop
end end
main main

@ -20,15 +20,17 @@
require_relative './end2end_common' require_relative './end2end_common'
def main def main
parent_controller_port = ''
server_port = '' server_port = ''
OptionParser.new do |opts| OptionParser.new do |opts|
opts.on('--client_control_port=P', String) do opts.on('--parent_controller_port=P', String) do |p|
STDERR.puts 'client control port not used' parent_controller_port = p
end end
opts.on('--server_port=P', String) do |p| opts.on('--server_port=P', String) do |p|
server_port = p server_port = p
end end
end.parse! end.parse!
report_controller_port_to_parent(parent_controller_port, 0)
thd = Thread.new do thd = Thread.new do
stub = Echo::EchoServer::Stub.new("localhost:#{server_port}", stub = Echo::EchoServer::Stub.new("localhost:#{server_port}",

@ -35,13 +35,13 @@ end
def main def main
STDERR.puts 'start server' STDERR.puts 'start server'
client_started = false received_rpc = false
client_started_mu = Mutex.new received_rpc_mu = Mutex.new
client_started_cv = ConditionVariable.new received_rpc_cv = ConditionVariable.new
received_rpc_callback = proc do received_rpc_callback = proc do
client_started_mu.synchronize do received_rpc_mu.synchronize do
client_started = true received_rpc = true
client_started_cv.signal received_rpc_cv.signal
end end
end end
@ -52,26 +52,26 @@ def main
server_runner = ServerRunner.new(service_impl, rpc_server_args: rpc_server_args) server_runner = ServerRunner.new(service_impl, rpc_server_args: rpc_server_args)
server_port = server_runner.run server_port = server_runner.run
STDERR.puts 'start client' STDERR.puts 'start client'
_, client_pid = start_client('killed_client_thread_client.rb', client_controller = ClientController.new(
server_port) 'killed_client_thread_client.rb', server_port)
client_started_mu.synchronize do received_rpc_mu.synchronize do
client_started_cv.wait(client_started_mu) until client_started received_rpc_cv.wait(received_rpc_mu) until received_rpc
end end
# SIGTERM the child process now that it's # SIGTERM the child process now that it's
# in the middle of an RPC (happening on a non-main thread) # in the middle of an RPC (happening on a non-main thread)
Process.kill('SIGTERM', client_pid) Process.kill('SIGTERM', client_controller.client_pid)
STDERR.puts 'sent shutdown' STDERR.puts 'sent shutdown'
begin begin
Timeout.timeout(10) do Timeout.timeout(10) do
Process.wait(client_pid) Process.wait(client_controller.client_pid)
end end
rescue Timeout::Error rescue Timeout::Error
STDERR.puts "timeout wait for client pid #{client_pid}" STDERR.puts "timeout wait for client pid #{client_controller.client_pid}"
Process.kill('SIGKILL', client_pid) Process.kill('SIGKILL', client_controller.client_pid)
Process.wait(client_pid) Process.wait(client_controller.client_pid)
STDERR.puts 'killed client child' STDERR.puts 'killed client child'
raise 'Timed out waiting for client process. ' \ raise 'Timed out waiting for client process. ' \
'It likely hangs when killed while in the middle of an rpc' 'It likely hangs when killed while in the middle of an rpc'

@ -4,14 +4,20 @@
require 'google/protobuf' require 'google/protobuf'
Google::Protobuf::DescriptorPool.generated_pool.build do Google::Protobuf::DescriptorPool.generated_pool.build do
add_message "client_control.DoEchoRpcRequest" do add_file("client_control.proto", :syntax => :proto3) do
optional :request, :string, 1 add_message "client_control.DoEchoRpcRequest" do
end optional :request, :string, 1
add_message "client_control.Void" do end
add_message "client_control.Void" do
end
add_message "client_control.Port" do
optional :port, :int32, 1
end
end end
end end
module ClientControl module ClientControl
DoEchoRpcRequest = Google::Protobuf::DescriptorPool.generated_pool.lookup("client_control.DoEchoRpcRequest").msgclass DoEchoRpcRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("client_control.DoEchoRpcRequest").msgclass
Void = Google::Protobuf::DescriptorPool.generated_pool.lookup("client_control.Void").msgclass Void = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("client_control.Void").msgclass
Port = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("client_control.Port").msgclass
end end

@ -29,8 +29,22 @@ module ClientControl
self.unmarshal_class_method = :decode self.unmarshal_class_method = :decode
self.service_name = 'client_control.ClientController' self.service_name = 'client_control.ClientController'
rpc :DoEchoRpc, DoEchoRpcRequest, Void rpc :DoEchoRpc, ::ClientControl::DoEchoRpcRequest, ::ClientControl::Void
rpc :Shutdown, Void, Void rpc :Shutdown, ::ClientControl::Void, ::ClientControl::Void
end
Stub = Service.rpc_stub_class
end
module ParentController
class Service
include GRPC::GenericService
self.marshal_class_method = :encode
self.unmarshal_class_method = :decode
self.service_name = 'client_control.ParentController'
rpc :SetClientControllerPort, ::ClientControl::Port, ::ClientControl::Void
end end
Stub = Service.rpc_stub_class Stub = Service.rpc_stub_class

@ -4,15 +4,17 @@
require 'google/protobuf' require 'google/protobuf'
Google::Protobuf::DescriptorPool.generated_pool.build do Google::Protobuf::DescriptorPool.generated_pool.build do
add_message "echo.EchoRequest" do add_file("echo.proto", :syntax => :proto3) do
optional :request, :string, 1 add_message "echo.EchoRequest" do
end optional :request, :string, 1
add_message "echo.EchoReply" do end
optional :response, :string, 1 add_message "echo.EchoReply" do
optional :response, :string, 1
end
end end
end end
module Echo module Echo
EchoRequest = Google::Protobuf::DescriptorPool.generated_pool.lookup("echo.EchoRequest").msgclass EchoRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("echo.EchoRequest").msgclass
EchoReply = Google::Protobuf::DescriptorPool.generated_pool.lookup("echo.EchoReply").msgclass EchoReply = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("echo.EchoReply").msgclass
end end

@ -29,7 +29,7 @@ module Echo
self.unmarshal_class_method = :decode self.unmarshal_class_method = :decode
self.service_name = 'echo.EchoServer' self.service_name = 'echo.EchoServer'
rpc :Echo, EchoRequest, EchoReply rpc :Echo, ::Echo::EchoRequest, ::Echo::EchoReply
end end
Stub = Service.rpc_stub_class Stub = Service.rpc_stub_class

@ -26,3 +26,11 @@ message DoEchoRpcRequest {
} }
message Void{} message Void{}
service ParentController {
rpc SetClientControllerPort(Port) returns (Void) {}
}
message Port {
int32 port = 1;
}

@ -45,11 +45,11 @@ class SigHandlingClientController < ClientControl::ClientController::Service
end end
def main def main
client_control_port = '' parent_controller_port = ''
server_port = '' server_port = ''
OptionParser.new do |opts| OptionParser.new do |opts|
opts.on('--client_control_port=P', String) do |p| opts.on('--parent_controller_port=P', String) do |p|
client_control_port = p parent_controller_port = p
end end
opts.on('--server_port=P', String) do |p| opts.on('--server_port=P', String) do |p|
server_port = p server_port = p
@ -67,8 +67,8 @@ def main
# The "shutdown" RPC should end very quickly. # The "shutdown" RPC should end very quickly.
# Allow a few seconds to be safe. # Allow a few seconds to be safe.
srv = new_rpc_server_for_testing(poll_period: 3) srv = new_rpc_server_for_testing(poll_period: 3)
srv.add_http2_port("0.0.0.0:#{client_control_port}", port = srv.add_http2_port('localhost:0',
:this_port_is_insecure) :this_port_is_insecure)
stub = Echo::EchoServer::Stub.new("localhost:#{server_port}", stub = Echo::EchoServer::Stub.new("localhost:#{server_port}",
:this_channel_is_insecure) :this_channel_is_insecure)
control_service = SigHandlingClientController.new(srv, stub) control_service = SigHandlingClientController.new(srv, stub)
@ -77,8 +77,8 @@ def main
srv.run srv.run
end end
srv.wait_till_running srv.wait_till_running
# send a first RPC to notify the parent process that we've started # notify the parent process that we're ready
stub.echo(Echo::EchoRequest.new(request: 'client/child started')) report_controller_port_to_parent(parent_controller_port, port)
server_thread.join server_thread.join
control_service.join_shutdown_thread control_service.join_shutdown_thread
end end

@ -25,19 +25,20 @@ def main
server_runner = ServerRunner.new(echo_service) server_runner = ServerRunner.new(echo_service)
server_port = server_runner.run server_port = server_runner.run
STDERR.puts 'start client' STDERR.puts 'start client'
control_stub, client_pid = start_client('sig_handling_client.rb', server_port) client_controller = ClientController.new(
# use receipt of one RPC to indicate that the child process is 'sig_handling_client.rb', server_port)
# ready
echo_service.wait_for_first_rpc_received(20)
count = 0 count = 0
while count < 5 while count < 5
control_stub.do_echo_rpc( client_controller.stub.do_echo_rpc(
ClientControl::DoEchoRpcRequest.new(request: 'hello')) ClientControl::DoEchoRpcRequest.new(request: 'hello'))
Process.kill('SIGTERM', client_pid) Process.kill('SIGTERM', client_controller.client_pid)
Process.kill('SIGINT', client_pid) Process.kill('SIGINT', client_controller.client_pid)
count += 1 count += 1
end end
cleanup(control_stub, client_pid, server_runner) client_controller.stub.shutdown(ClientControl::Void.new)
Process.wait(client_controller.client_pid)
fail "client exit code: #{$CHILD_STATUS}" unless $CHILD_STATUS.to_i.zero?
server_runner.stop
end end
main main

@ -22,10 +22,11 @@ require_relative './end2end_common'
# middle of a blocking connectivity_state call. # middle of a blocking connectivity_state call.
def main def main
STDERR.puts 'sig_int_during_channel_watch_client.rb main' STDERR.puts 'sig_int_during_channel_watch_client.rb main'
parent_controller_port = ''
server_port = '' server_port = ''
OptionParser.new do |opts| OptionParser.new do |opts|
opts.on('--client_control_port=P', String) do opts.on('--parent_controller_port=P', String) do |p|
STDERR.puts 'client_control_port not used' parent_controller_port = p
end end
opts.on('--server_port=P', String) do |p| opts.on('--server_port=P', String) do |p|
server_port = p server_port = p
@ -34,16 +35,8 @@ def main
trap('SIGINT') { exit 0 } trap('SIGINT') { exit 0 }
STDERR.puts 'sig_int_during_channel_watch_client.rb: SIGINT trap has been set' STDERR.puts 'sig_int_during_channel_watch_client.rb: SIGINT trap has been set'
# First, notify the parent process that we're ready for a SIGINT by sending # notify the parent process that we're ready for signals
# an RPC report_controller_port_to_parent(parent_controller_port, 0)
begin
stub = Echo::EchoServer::Stub.new(
"localhost:#{server_port}", :this_channel_is_insecure)
stub.echo(ClientControl::DoEchoRpcRequest.new)
rescue => e
fail "received error:|#{e}| while sending an RPC to the parent process " \
'to indicate that the SIGINT trap has been set'
end
thd = Thread.new do thd = Thread.new do
child_thread_channel = GRPC::Core::Channel.new("localhost:#{server_port}", child_thread_channel = GRPC::Core::Channel.new("localhost:#{server_port}",

@ -25,23 +25,20 @@ def main
server_runner = ServerRunner.new(echo_service) server_runner = ServerRunner.new(echo_service)
server_port = server_runner.run server_port = server_runner.run
STDERR.puts 'start client' STDERR.puts 'start client'
_, client_pid = start_client('sig_int_during_channel_watch_client.rb', client_controller = ClientController.new(
server_port) 'sig_int_during_channel_watch_client.rb', server_port)
# use receipt of one RPC to indicate that the child process is
# ready for a SIGINT
echo_service.wait_for_first_rpc_received(20)
# give time for the client to get into the middle # give time for the client to get into the middle
# of a channel state watch call # of a channel state watch call
sleep 1 sleep 1
Process.kill('SIGINT', client_pid) Process.kill('SIGINT', client_controller.client_pid)
begin begin
Timeout.timeout(10) do Timeout.timeout(10) do
Process.wait(client_pid) Process.wait(client_controller.client_pid)
end end
rescue Timeout::Error rescue Timeout::Error
STDERR.puts "timeout wait for client pid #{client_pid}" STDERR.puts "timeout wait for client pid #{client_controller.client_pid}"
Process.kill('SIGKILL', client_pid) Process.kill('SIGKILL', client_controller.client_pid)
Process.wait(client_pid) Process.wait(client_controller.client_pid)
STDERR.puts 'killed client child' STDERR.puts 'killed client child'
raise 'Timed out waiting for client process. It likely hangs when a ' \ raise 'Timed out waiting for client process. It likely hangs when a ' \
'SIGINT is sent while there is an active connectivity_state call' 'SIGINT is sent while there is an active connectivity_state call'

Loading…
Cancel
Save