From e4ebd44cd666b179bc7f76174e1f349403d941e9 Mon Sep 17 00:00:00 2001 From: Stanley Cheung Date: Thu, 17 Dec 2020 21:51:50 +0000 Subject: [PATCH] Ruby: add support for circuit_breaking xds interop test case --- src/ruby/pb/test/xds_client.rb | 135 +++++++++++++++--- .../linux/grpc_xds_ruby_test_in_docker.sh | 6 +- tools/run_tests/run_xds_tests.py | 2 + 3 files changed, 121 insertions(+), 22 deletions(-) diff --git a/src/ruby/pb/test/xds_client.rb b/src/ruby/pb/test/xds_client.rb index 2f15c5fa522..66a34e3feea 100755 --- a/src/ruby/pb/test/xds_client.rb +++ b/src/ruby/pb/test/xds_client.rb @@ -39,11 +39,28 @@ require_relative '../src/proto/grpc/testing/empty_pb' require_relative '../src/proto/grpc/testing/messages_pb' require_relative '../src/proto/grpc/testing/test_services_pb' +# Some global constant mappings +$RPC_MAP = { + 'UnaryCall' => :UNARY_CALL, + 'EmptyCall' => :EMPTY_CALL, +} + # Some global variables to be shared by server and client $watchers = Array.new $watchers_mutex = Mutex.new $watchers_cv = ConditionVariable.new $shutdown = false +# These can be configured by the test runner dynamically +$rpcs_to_send = [:UNARY_CALL] +$metadata_to_send = {} +# These stats are shared across threads +$num_rpcs_started_by_method = {} +$num_rpcs_succeeded_by_method = {} +$num_rpcs_failed_by_method = {} +# Some RPCs are meant to be "kept open". Since Ruby does not have an +# async API, we are executing those RPCs in a thread so that they don't +# block. +$keep_open_threads = Array.new # RubyLogger defines a logger for gRPC based on the standard ruby logger. 
module RubyLogger @@ -71,6 +88,29 @@ def create_stub(opts) ) end +class ConfigureTarget < Grpc::Testing::XdsUpdateClientConfigureService::Service + include Grpc::Testing + + def configure(req, _call) + $rpcs_to_send = req['types']; + metadata_to_send = {} + req['metadata'].each do |m| + rpc = m.type + if !metadata_to_send.key?(rpc) + metadata_to_send[rpc] = {} + end + metadata_key = m.key + metadata_value = m.value + metadata_to_send[rpc][metadata_key] = metadata_value + end + $metadata_to_send = metadata_to_send + GRPC.logger.info("Configuring new rpcs_to_send and metadata_to_send...") + GRPC.logger.info($rpcs_to_send) + GRPC.logger.info($metadata_to_send) + ClientConfigureResponse.new(); + end +end + # This implements LoadBalancerStatsService required by the test runner class TestTarget < Grpc::Testing::LoadBalancerStatsService::Service include Grpc::Testing @@ -109,10 +149,18 @@ class TestTarget < Grpc::Testing::LoadBalancerStatsService::Service num_failures: watcher['no_remote_peer'] + watcher['rpcs_needed'] ); end + + def get_client_accumulated_stats(req, _call) + LoadBalancerAccumulatedStatsResponse.new( + num_rpcs_started_by_method: $num_rpcs_started_by_method, + num_rpcs_succeeded_by_method: $num_rpcs_succeeded_by_method, + num_rpcs_failed_by_method: $num_rpcs_failed_by_method + ) + end end # execute 1 RPC and return remote hostname -def execute_rpc(op, fail_on_failed_rpcs) +def execute_rpc(op, fail_on_failed_rpcs, rpc_stats_key) remote_peer = "" begin op.execute @@ -120,18 +168,43 @@ def execute_rpc(op, fail_on_failed_rpcs) remote_peer = op.metadata['hostname'] end rescue GRPC::BadStatus => e - GRPC.logger.info("ruby xds: rpc failed:|#{e.message}|, " \ - "this may or may not be expected") if fail_on_failed_rpcs raise e end end + if remote_peer.empty? 
+ $num_rpcs_failed_by_method[rpc_stats_key] += 1 + else + $num_rpcs_succeeded_by_method[rpc_stats_key] += 1 + end remote_peer end +def execute_rpc_in_thread(op, rpc_stats_key) + num_open_threads = $keep_open_threads.size + if num_open_threads % 50 == 0 + GRPC.logger.info("number of keep_open_threads = #{num_open_threads}") + end + $keep_open_threads << Thread.new { + begin + op.execute + # The following should _not_ happen with the current spec + # because we are only executing RPCs in a thread if we expect it + # to be kept open, or deadline_exceeded, or dropped by the load + # balancing policy. These RPCs should not complete successfully. + # Doing this for consistency + $num_rpcs_succeeded_by_method[rpc_stats_key] += 1 + rescue GRPC::BadStatus => e + # Normal execution arrives here, + # either because of deadline_exceeded or "call dropped by load + # balancing policy" + $num_rpcs_failed_by_method[rpc_stats_key] += 1 + end + } +end + # send 1 rpc every 1/qps second -def run_test_loop(stub, target_seconds_between_rpcs, fail_on_failed_rpcs, - rpcs_to_send, metadata_to_send) +def run_test_loop(stub, target_seconds_between_rpcs, fail_on_failed_rpcs) include Grpc::Testing simple_req = SimpleRequest.new() empty_req = Empty.new() @@ -141,39 +214,49 @@ def run_test_loop(stub, target_seconds_between_rpcs, fail_on_failed_rpcs, sleep_seconds = target_next_start - now if sleep_seconds < 0 target_next_start = now + target_seconds_between_rpcs - GRPC.logger.info( - "ruby xds: warning, rpc takes too long to finish. " \ - "Deficit = %.1fms. " \ - "If you consistently see this, the qps is too high." \ - % [(sleep_seconds * 1000).abs().round(1)]) else target_next_start += target_seconds_between_rpcs sleep(sleep_seconds) end deadline = GRPC::Core::TimeConsts::from_relative_time(30) # 30 seconds results = {} - rpcs_to_send.each do |rpc| - metadata = metadata_to_send.key?(rpc) ? 
metadata_to_send[rpc] : {} - if rpc == 'UnaryCall' + $rpcs_to_send.each do |rpc| + # rpc is in the form of :UNARY_CALL or :EMPTY_CALL here + metadata = $metadata_to_send.key?(rpc) ? $metadata_to_send[rpc] : {} + $num_rpcs_started_by_method[rpc.to_s] += 1 + num_started = $num_rpcs_started_by_method[rpc.to_s] + if num_started % 100 == 0 + GRPC.logger.info("Started #{num_started} of #{rpc}") + end + if rpc == :UNARY_CALL op = stub.unary_call(simple_req, metadata: metadata, deadline: deadline, return_op: true) - elsif rpc == 'EmptyCall' + elsif rpc == :EMPTY_CALL op = stub.empty_call(empty_req, metadata: metadata, deadline: deadline, return_op: true) else - raise "Unsupported rpc %s" % [rpc] + raise "Unsupported rpc #{rpc}" + end + rpc_stats_key = rpc.to_s + if metadata.key?('rpc-behavior') and + (metadata['rpc-behavior'] == 'keep-open') + execute_rpc_in_thread(op, rpc_stats_key) + else + results[rpc] = execute_rpc(op, fail_on_failed_rpcs, rpc_stats_key) end - results[rpc] = execute_rpc(op, fail_on_failed_rpcs) end $watchers_mutex.synchronize do $watchers.each do |watcher| # this is counted once when each group of all rpcs_to_send were done watcher['rpcs_needed'] -= 1 results.each do |rpc_name, remote_peer| + # These stats expect rpc_name to be in the form of + # UnaryCall or EmptyCall, not the underscore-case all-caps form + rpc_name = $RPC_MAP.invert()[rpc_name] if remote_peer.strip.empty? 
# error is counted per individual RPC watcher['no_remote_peer'] += 1 @@ -242,18 +325,30 @@ def main s = GRPC::RpcServer.new s.add_http2_port(host, :this_port_is_insecure) s.handle(TestTarget) + s.handle(ConfigureTarget) server_thread = Thread.new { # run the server until the main test runner terminates this process s.run_till_terminated_or_interrupted(['TERM']) } - # The client just sends unary rpcs continuously in a regular interval + # Initialize stats + $RPC_MAP.values.each do |rpc| + $num_rpcs_started_by_method[rpc.to_s] = 0 + $num_rpcs_succeeded_by_method[rpc.to_s] = 0 + $num_rpcs_failed_by_method[rpc.to_s] = 0 + end + + # The client just sends rpcs continuously in a regular interval stub = create_stub(opts) target_seconds_between_rpcs = (1.0 / opts['qps'].to_f) rpcs_to_send = [] if opts['rpc'] rpcs_to_send = opts['rpc'].split(',') end + if rpcs_to_send.size > 0 + rpcs_to_send.map! { |rpc| $RPC_MAP[rpc] } + $rpcs_to_send = rpcs_to_send + end # Convert 'metadata' input in the form of # rpc1:k1:v1,rpc2:k2:v2,rpc1:k3:v3 # into @@ -271,25 +366,27 @@ def main metadata_entries = opts['metadata'].split(',') metadata_entries.each do |e| (rpc_name, metadata_key, metadata_value) = e.split(':') + rpc_name = $RPC_MAP[rpc_name] # initialize if we haven't seen this rpc_name yet if !metadata_to_send.key?(rpc_name) metadata_to_send[rpc_name] = {} end metadata_to_send[rpc_name][metadata_key] = metadata_value end + $metadata_to_send = metadata_to_send end client_threads = Array.new opts['num_channels'].times { client_threads << Thread.new { run_test_loop(stub, target_seconds_between_rpcs, - opts['fail_on_failed_rpcs'], - rpcs_to_send, metadata_to_send) + opts['fail_on_failed_rpcs']) } } server_thread.join $shutdown = true client_threads.each { |thd| thd.join } + $keep_open_threads.each { |thd| thd.join } end if __FILE__ == $0 diff --git a/tools/internal_ci/linux/grpc_xds_ruby_test_in_docker.sh b/tools/internal_ci/linux/grpc_xds_ruby_test_in_docker.sh index 
dfacac3da90..5d03721edf2 100644 --- a/tools/internal_ci/linux/grpc_xds_ruby_test_in_docker.sh +++ b/tools/internal_ci/linux/grpc_xds_ruby_test_in_docker.sh @@ -60,11 +60,11 @@ touch "$TOOLS_DIR"/src/proto/grpc/health/v1/__init__.py (cd src/ruby && bundle && rake compile) -GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_cluster_manager_lb,cds_lb,xds_cluster_resolver_lb,priority_lb,xds_cluster_impl_lb,weighted_target_lb "$PYTHON" \ +GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_cluster_manager_lb,cds_lb,priority_lb,xds_cluster_impl_lb,weighted_target_lb "$PYTHON" \ tools/run_tests/run_xds_tests.py \ - --test_case="all,path_matching,header_matching" \ + --test_case="all,path_matching,header_matching,circuit_breaking" \ --project_id=grpc-testing \ - --source_image=projects/grpc-testing/global/images/xds-test-server-2 \ + --source_image=projects/grpc-testing/global/images/xds-test-server-3 \ --path_to_server_binary=/java_server/grpc-java/interop-testing/build/install/grpc-interop-testing/bin/xds-test-server \ --gcp_suffix=$(date '+%s') \ --verbose \ diff --git a/tools/run_tests/run_xds_tests.py b/tools/run_tests/run_xds_tests.py index 58dfd00b44e..f903799dc7e 100755 --- a/tools/run_tests/run_xds_tests.py +++ b/tools/run_tests/run_xds_tests.py @@ -1282,6 +1282,8 @@ def test_circuit_breaking(gcp, original_backend_service, instance_group, logger.info('UNARY_CALL reached stable state after increase (%d)', extra_backend_service_max_requests) logger.info('success') + configure_client([ + messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL], []) finally: patch_url_map_backend_service(gcp, original_backend_service) patch_backend_service(gcp, original_backend_service, [instance_group])