From 09e4bce53e3170d6a68d9973e55841109173f322 Mon Sep 17 00:00:00 2001 From: Stanley Cheung Date: Tue, 22 Dec 2020 02:25:48 +0000 Subject: [PATCH] Review comment fixes: need synchronization --- src/ruby/pb/test/xds_client.rb | 63 ++++++++++++++++++++-------------- 1 file changed, 37 insertions(+), 26 deletions(-) diff --git a/src/ruby/pb/test/xds_client.rb b/src/ruby/pb/test/xds_client.rb index a6ff9cd0ed0..2ab24b629cd 100755 --- a/src/ruby/pb/test/xds_client.rb +++ b/src/ruby/pb/test/xds_client.rb @@ -67,13 +67,10 @@ $shutdown = false $rpc_config = RpcConfig.new $rpc_config.init([:UNARY_CALL], {}) # These stats are shared across threads +$accumulated_stats_mu = Mutex.new $num_rpcs_started_by_method = {} $num_rpcs_succeeded_by_method = {} $num_rpcs_failed_by_method = {} -# Some RPCs are meant to be "kept open". Since Ruby does not have an -# async API, we are executing those RPCs in a thread so that they don't -# block. -$keep_open_threads = Array.new # RubyLogger defines a logger for gRPC based on the standard ruby logger. module RubyLogger @@ -166,11 +163,13 @@ class TestTarget < Grpc::Testing::LoadBalancerStatsService::Service end def get_client_accumulated_stats(req, _call) - LoadBalancerAccumulatedStatsResponse.new( - num_rpcs_started_by_method: $num_rpcs_started_by_method, - num_rpcs_succeeded_by_method: $num_rpcs_succeeded_by_method, - num_rpcs_failed_by_method: $num_rpcs_failed_by_method - ) + $accumulated_stats_mu.synchronize do + LoadBalancerAccumulatedStatsResponse.new( + num_rpcs_started_by_method: $num_rpcs_started_by_method, + num_rpcs_succeeded_by_method: $num_rpcs_succeeded_by_method, + num_rpcs_failed_by_method: $num_rpcs_failed_by_method + ) + end end end @@ -187,20 +186,18 @@ def execute_rpc(op, fail_on_failed_rpcs, rpc_stats_key) raise e end end - if remote_peer.empty? - $num_rpcs_failed_by_method[rpc_stats_key] += 1 - else - $num_rpcs_succeeded_by_method[rpc_stats_key] += 1 + $accumulated_stats_mu.synchronize do + if remote_peer.empty? + $num_rpcs_failed_by_method[rpc_stats_key] += 1 + else + $num_rpcs_succeeded_by_method[rpc_stats_key] += 1 + end end remote_peer end def execute_rpc_in_thread(op, rpc_stats_key) - num_open_threads = $keep_open_threads.size - if num_open_threads % 50 == 0 - GRPC.logger.info("number of keep_open_threads = #{num_open_threads}") - end - $keep_open_threads << Thread.new { + Thread.new { begin op.execute # The following should _not_ happen with the current spec @@ -208,12 +205,16 @@ def execute_rpc_in_thread(op, rpc_stats_key) # to be kept open, or deadline_exceeded, or dropped by the load # balancing policy. These RPCs should not complete successfully. # Doing this for consistency - $num_rpcs_succeeded_by_method[rpc_stats_key] += 1 + $accumulated_stats_mu.synchronize do + $num_rpcs_succeeded_by_method[rpc_stats_key] += 1 + end rescue GRPC::BadStatus => e # Normal execution arrives here, # either because of deadline_exceeded or "call dropped by load # balancing policy" - $num_rpcs_failed_by_method[rpc_stats_key] += 1 + $accumulated_stats_mu.synchronize do + $num_rpcs_failed_by_method[rpc_stats_key] += 1 + end end } end @@ -224,6 +225,10 @@ def run_test_loop(stub, target_seconds_between_rpcs, fail_on_failed_rpcs) simple_req = SimpleRequest.new() empty_req = Empty.new() target_next_start = Process.clock_gettime(Process::CLOCK_MONOTONIC) + # Some RPCs are meant to be "kept open". Since Ruby does not have an + # async API, we are executing those RPCs in a thread so that they don't + # block. + keep_open_threads = Array.new while !$shutdown now = Process.clock_gettime(Process::CLOCK_MONOTONIC) sleep_seconds = target_next_start - now @@ -239,10 +244,12 @@ def run_test_loop(stub, target_seconds_between_rpcs, fail_on_failed_rpcs) # rpc is in the form of :UNARY_CALL or :EMPTY_CALL here metadata = $rpc_config.metadata_to_send.key?(rpc) ? $rpc_config.metadata_to_send[rpc] : {} - $num_rpcs_started_by_method[rpc.to_s] += 1 - num_started = $num_rpcs_started_by_method[rpc.to_s] - if num_started % 100 == 0 - GRPC.logger.info("Started #{num_started} of #{rpc}") + $accumulated_stats_mu.synchronize do + $num_rpcs_started_by_method[rpc.to_s] += 1 + num_started = $num_rpcs_started_by_method[rpc.to_s] + if num_started % 100 == 0 + GRPC.logger.info("Started #{num_started} of #{rpc}") + end end if rpc == :UNARY_CALL op = stub.unary_call(simple_req, @@ -260,7 +267,11 @@ def run_test_loop(stub, target_seconds_between_rpcs, fail_on_failed_rpcs) rpc_stats_key = rpc.to_s if metadata.key?('rpc-behavior') and (metadata['rpc-behavior'] == 'keep-open') - execute_rpc_in_thread(op, rpc_stats_key) + num_open_threads = keep_open_threads.size + if num_open_threads % 50 == 0 + GRPC.logger.info("number of keep_open_threads = #{num_open_threads}") + end + keep_open_threads << execute_rpc_in_thread(op, rpc_stats_key) else results[rpc] = execute_rpc(op, fail_on_failed_rpcs, rpc_stats_key) end @@ -290,6 +301,7 @@ def run_test_loop(stub, target_seconds_between_rpcs, fail_on_failed_rpcs) $watchers_cv.broadcast end end + keep_open_threads.each { |thd| thd.join } end # Args is used to hold the command line info. @@ -403,7 +415,6 @@ def main server_thread.join $shutdown = true client_threads.each { |thd| thd.join } - $keep_open_threads.each { |thd| thd.join } end if __FILE__ == $0