From f89574cc4ba6b2a2cc81d9dd307814f463ff147a Mon Sep 17 00:00:00 2001 From: Stanley Cheung Date: Wed, 9 Jun 2021 11:27:12 -0700 Subject: [PATCH] Ruby: fix timeout flake, always start RPC in threads (#26426) --- src/ruby/pb/test/xds_client.rb | 59 +++++++++++----------------------- 1 file changed, 19 insertions(+), 40 deletions(-) diff --git a/src/ruby/pb/test/xds_client.rb b/src/ruby/pb/test/xds_client.rb index ed111b0d106..ec283b90adb 100755 --- a/src/ruby/pb/test/xds_client.rb +++ b/src/ruby/pb/test/xds_client.rb @@ -63,6 +63,8 @@ $shutdown = false $rpc_config = RpcConfig.new $rpc_config.init([:UNARY_CALL], {}) # These stats are shared across threads +$thread_results = Array.new +$thread_results_mu = Mutex.new $accumulated_stats_mu = Mutex.new $num_rpcs_started_by_method = {} $num_rpcs_succeeded_by_method = {} @@ -188,54 +190,28 @@ class TestTarget < Grpc::Testing::LoadBalancerStatsService::Service end end -# execute 1 RPC and return remote hostname -def execute_rpc(op, fail_on_failed_rpcs, rpc_stats_key) - remote_peer = "" - status_code = 0 - begin - op.execute - if op.metadata.key?('hostname') - remote_peer = op.metadata['hostname'] - end - rescue GRPC::BadStatus => e - if fail_on_failed_rpcs - raise e - end - status_code = e.code - end - $accumulated_stats_mu.synchronize do - $accumulated_method_stats[rpc_stats_key].add_result(status_code) - if remote_peer.empty? - $num_rpcs_failed_by_method[rpc_stats_key] += 1 - else - $num_rpcs_succeeded_by_method[rpc_stats_key] += 1 - end - end - remote_peer -end - -def execute_rpc_in_thread(op, rpc_stats_key) +def execute_rpc_in_thread(op, rpc) Thread.new { + rpc_stats_key = rpc.to_s + remote_peer = "" begin op.execute - # The following should _not_ happen with the current spec - # because we are only executing RPCs in a thread if we expect it - # to be kept open, or deadline_exceeded, or dropped by the load - # balancing policy. These RPCs should not complete successfully. - # Doing this for consistency + if op.metadata.key?('hostname') + remote_peer = op.metadata['hostname'] + end $accumulated_stats_mu.synchronize do $num_rpcs_succeeded_by_method[rpc_stats_key] += 1 $accumulated_method_stats[rpc_stats_key].add_result(0) end rescue GRPC::BadStatus => e - # Normal execution arrives here, - # either because of deadline_exceeded or "call dropped by load - # balancing policy" $accumulated_stats_mu.synchronize do $num_rpcs_failed_by_method[rpc_stats_key] += 1 $accumulated_method_stats[rpc_stats_key].add_result(e.code) end end + $thread_results_mu.synchronize do + $thread_results << [rpc, remote_peer] + end } end @@ -282,12 +258,15 @@ def run_test_loop(stub, target_seconds_between_rpcs, fail_on_failed_rpcs) else raise "Unsupported rpc #{rpc}" end - rpc_stats_key = rpc.to_s - if metadata.key?('rpc-behavior') or metadata.key?('fi_testcase') - keep_open_threads << execute_rpc_in_thread(op, rpc_stats_key) - else - results[rpc] = execute_rpc(op, fail_on_failed_rpcs, rpc_stats_key) + keep_open_threads << execute_rpc_in_thread(op, rpc) + end + # collect results from threads + $thread_results_mu.synchronize do + $thread_results.each do |r| + rpc_name, remote_peer = r + results[rpc_name] = remote_peer end + $thread_results = Array.new end $watchers_mutex.synchronize do $watchers.each do |watcher|