Ruby: fix timeout flake, always start RPC in threads (#26426)

pull/25860/head
Stanley Cheung 4 years ago committed by GitHub
parent 743055193a
commit f89574cc4b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 59
      src/ruby/pb/test/xds_client.rb

@ -63,6 +63,8 @@ $shutdown = false
$rpc_config = RpcConfig.new
$rpc_config.init([:UNARY_CALL], {})
# These stats are shared across threads
$thread_results = Array.new
$thread_results_mu = Mutex.new
$accumulated_stats_mu = Mutex.new
$num_rpcs_started_by_method = {}
$num_rpcs_succeeded_by_method = {}
@ -188,54 +190,28 @@ class TestTarget < Grpc::Testing::LoadBalancerStatsService::Service
end
end
# execute 1 RPC and return remote hostname
def execute_rpc(op, fail_on_failed_rpcs, rpc_stats_key)
remote_peer = ""
status_code = 0
begin
op.execute
if op.metadata.key?('hostname')
remote_peer = op.metadata['hostname']
end
rescue GRPC::BadStatus => e
if fail_on_failed_rpcs
raise e
end
status_code = e.code
end
$accumulated_stats_mu.synchronize do
$accumulated_method_stats[rpc_stats_key].add_result(status_code)
if remote_peer.empty?
$num_rpcs_failed_by_method[rpc_stats_key] += 1
else
$num_rpcs_succeeded_by_method[rpc_stats_key] += 1
end
end
remote_peer
end
def execute_rpc_in_thread(op, rpc_stats_key)
def execute_rpc_in_thread(op, rpc)
Thread.new {
rpc_stats_key = rpc.to_s
remote_peer = ""
begin
op.execute
# The following should _not_ happen with the current spec
# because we are only executing RPCs in a thread if we expect it
# to be kept open, or deadline_exceeded, or dropped by the load
# balancing policy. These RPCs should not complete successfully.
# Doing this for consistency
if op.metadata.key?('hostname')
remote_peer = op.metadata['hostname']
end
$accumulated_stats_mu.synchronize do
$num_rpcs_succeeded_by_method[rpc_stats_key] += 1
$accumulated_method_stats[rpc_stats_key].add_result(0)
end
rescue GRPC::BadStatus => e
# Normal execution arrives here,
# either because of deadline_exceeded or "call dropped by load
# balancing policy"
$accumulated_stats_mu.synchronize do
$num_rpcs_failed_by_method[rpc_stats_key] += 1
$accumulated_method_stats[rpc_stats_key].add_result(e.code)
end
end
$thread_results_mu.synchronize do
$thread_results << [rpc, remote_peer]
end
}
end
@ -282,12 +258,15 @@ def run_test_loop(stub, target_seconds_between_rpcs, fail_on_failed_rpcs)
else
raise "Unsupported rpc #{rpc}"
end
rpc_stats_key = rpc.to_s
if metadata.key?('rpc-behavior') or metadata.key?('fi_testcase')
keep_open_threads << execute_rpc_in_thread(op, rpc_stats_key)
else
results[rpc] = execute_rpc(op, fail_on_failed_rpcs, rpc_stats_key)
keep_open_threads << execute_rpc_in_thread(op, rpc)
end
# collect results from threads
$thread_results_mu.synchronize do
$thread_results.each do |r|
rpc_name, remote_peer = r
results[rpc_name] = remote_peer
end
$thread_results = Array.new
end
$watchers_mutex.synchronize do
$watchers.each do |watcher|

Loading…
Cancel
Save