From 66feb8c2d99f675ff876f67314cd8175c0e71b9e Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Thu, 5 Nov 2020 11:28:55 -0800 Subject: [PATCH 01/18] Implement the basic test case for limiting max number of concurrent RPCs. --- tools/run_tests/run_xds_tests.py | 183 +++++++++++++++++++++++++++---- 1 file changed, 164 insertions(+), 19 deletions(-) diff --git a/tools/run_tests/run_xds_tests.py b/tools/run_tests/run_xds_tests.py index 3a140939133..7bbec830e12 100755 --- a/tools/run_tests/run_xds_tests.py +++ b/tools/run_tests/run_xds_tests.py @@ -57,6 +57,7 @@ _TEST_CASES = [ 'secondary_locality_gets_no_requests_on_partial_primary_failure', 'secondary_locality_gets_requests_on_primary_failure', 'traffic_splitting', + 'circuit_breaking', ] # Valid test cases, but not in all. So the tests can only run manually, and # aren't enabled automatically for all languages. @@ -311,6 +312,63 @@ def get_client_stats(num_rpcs, timeout_sec): logger.debug('Invoked GetClientStats RPC to %s: %s', host, response) return response +def get_client_accumulated_stats(): + if CLIENT_HOSTS: + hosts = CLIENT_HOSTS + else: + hosts = ['localhost'] + for host in hosts: + with grpc.insecure_channel('%s:%d' % + (host, args.stats_port)) as channel: + stub = test_pb2_grpc.LoadBalancerStatsServiceStub(channel) + request = messages_pb2.LoadBalancerAccumulatedStatsRequest() + logger.debug('Invoking GetClientAccumulatedStats RPC to %s:%d:', + host, args.stats_port) + response = stub.GetClientAccumulatedStats(request, + wait_for_ready=True, + timeout=_CONNECTION_TIMEOUT_SEC) + logger.debug('Invoked GetClientAccumulatedStats RPC to %s: %s', + host, + response) + return response + +def configure_client(rpc_types, metadata): + if CLIENT_HOSTS: + hosts = CLIENT_HOSTS + else: + hosts = ['localhost'] + for host in hosts: + with grpc.insecure_channel('%s:%d' % + (host, args.stats_port)) as channel: + stub = test_pb2_grpc.XdsUpdateClientConfigureServiceStub(channel) + request = messages_pb2.ClientConfigureRequest() + for rpc_type in rpc_types: + if rpc_type not in ['empty_call', 'unary_call']: + continue + request.types.append( + messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL + if rpc_type == 'empty_call' + else messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL + ) + for rpc_type, md_key, md_value in metadata: + if rpc_type not in ['empty_call', 'unary_call']: + continue + md = request.metadata.add() + md.type = ( + messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL + if rpc_type == 'empty_call' + else messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL + ) + md.key = md_key + md.value = md_value + logger.debug('Invoking XdsUpdateClientConfigureService RPC to %s:%d: %s', + host, + args.stats_port, + request) + stub.Configure(request, + wait_for_ready=True, + timeout=_CONNECTION_TIMEOUT_SEC) + logger.debug('Invoked XdsUpdateClientConfigureService RPC to %s', host) class RpcDistributionError(Exception): pass @@ -356,6 +414,39 @@ def wait_until_all_rpcs_go_to_given_backends(backends, num_rpcs, allow_failures=False) +def wait_until_all_rpcs_fail(timeout_sec, num_rpcs): + start_time = time.time() + error_msg = None + logger.debug('Waiting for %d sec until all of next %d RPCs fail' % + (timeout_sec, num_rpcs)) + while time.time() - start_time <= timeout_sec: + error_msg = None + stats = get_client_stats(num_rpcs, timeout_sec) + diff = num_rpcs - stats.num_failures + if not diff : + error_msg = 'Unexpected completion for %d RPCs' % diff + time.sleep(2) + else: + return + raise RpcDistributionError(error_msg) + +def wait_until_rpcs_in_flight(timeout_sec, num_rpcs): + start_time = time.time() + error_msg = None + logger.debug('Waiting for %d sec until %d RPCs in-flight' % (timeout_sec, num_rpcs)) + while time.time() - start_time <= timeout_sec: + error_msg = None + stats = get_client_accumulated_stats() + rpcs_in_flight = (stats.num_rpcs_started + - stats.num_rpcs_succeeded + - stats.num_rpcs_failed) + if rpcs_in_flight < num_rpcs: + error_msg = ('Expected %d RPCs in-flight, actual: %d' % + (num_rpcs, rpcs_in_flight)) + time.sleep(2) + else: + return + raise RpcDistributionError(error_msg) def compare_distributions(actual_distribution, expected_distribution, threshold): @@ -442,7 +533,7 @@ def test_change_backend_service(gcp, original_backend_service, instance_group, original_backend_instances = get_instance_names(gcp, instance_group) alternate_backend_instances = get_instance_names(gcp, same_zone_instance_group) - patch_backend_instances(gcp, alternate_backend_service, + patch_backend_service(gcp, alternate_backend_service, [same_zone_instance_group]) wait_for_healthy_backends(gcp, original_backend_service, instance_group) wait_for_healthy_backends(gcp, alternate_backend_service, @@ -455,7 +546,7 @@ def test_change_backend_service(gcp, original_backend_service, instance_group, _WAIT_FOR_URL_MAP_PATCH_SEC) finally: patch_url_map_backend_service(gcp, original_backend_service) - patch_backend_instances(gcp, alternate_backend_service, []) + patch_backend_service(gcp, alternate_backend_service, []) def test_gentle_failover(gcp, @@ -470,7 +561,7 @@ def test_gentle_failover(gcp, if num_primary_instances < min_instances_for_gentle_failover: resize_instance_group(gcp, primary_instance_group, min_instances_for_gentle_failover) - patch_backend_instances( + patch_backend_service( gcp, backend_service, [primary_instance_group, secondary_instance_group]) primary_instance_names = get_instance_names(gcp, primary_instance_group) @@ -506,7 +597,7 @@ def test_gentle_failover(gcp, else: raise e finally: - patch_backend_instances(gcp, backend_service, [primary_instance_group]) + patch_backend_service(gcp, backend_service, [primary_instance_group]) resize_instance_group(gcp, primary_instance_group, num_primary_instances) instance_names = get_instance_names(gcp, primary_instance_group) @@ -526,7 +617,7 @@ def test_remove_instance_group(gcp, backend_service, instance_group, same_zone_instance_group): logger.info('Running test_remove_instance_group') try: - patch_backend_instances(gcp, + patch_backend_service(gcp, backend_service, [instance_group, same_zone_instance_group], balancing_mode='RATE') @@ -556,13 +647,13 @@ def test_remove_instance_group(gcp, backend_service, instance_group, same_zone_instance_names, _WAIT_FOR_STATS_SEC) remaining_instance_group = instance_group remaining_instance_names = instance_names - patch_backend_instances(gcp, + patch_backend_service(gcp, backend_service, [remaining_instance_group], balancing_mode='RATE') wait_until_all_rpcs_go_to_given_backends(remaining_instance_names, _WAIT_FOR_BACKEND_SEC) finally: - patch_backend_instances(gcp, backend_service, [instance_group]) + patch_backend_service(gcp, backend_service, [instance_group]) wait_until_all_rpcs_go_to_given_backends(instance_names, _WAIT_FOR_BACKEND_SEC) @@ -609,7 +700,7 @@ def test_secondary_locality_gets_no_requests_on_partial_primary_failure( 'Running secondary_locality_gets_no_requests_on_partial_primary_failure' ) try: - patch_backend_instances( + patch_backend_service( gcp, backend_service, [primary_instance_group, secondary_instance_group]) wait_for_healthy_backends(gcp, backend_service, primary_instance_group) @@ -643,7 +734,7 @@ def test_secondary_locality_gets_no_requests_on_partial_primary_failure( else: raise e finally: - patch_backend_instances(gcp, backend_service, [primary_instance_group]) + patch_backend_service(gcp, backend_service, [primary_instance_group]) def test_secondary_locality_gets_requests_on_primary_failure( @@ -654,7 +745,7 @@ def test_secondary_locality_gets_requests_on_primary_failure( swapped_primary_and_secondary=False): logger.info('Running secondary_locality_gets_requests_on_primary_failure') try: - patch_backend_instances( + patch_backend_service( gcp, backend_service, [primary_instance_group, secondary_instance_group]) wait_for_healthy_backends(gcp, backend_service, primary_instance_group) @@ -688,7 +779,7 @@ def test_secondary_locality_gets_requests_on_primary_failure( else: raise e finally: - patch_backend_instances(gcp, backend_service, [primary_instance_group]) + patch_backend_service(gcp, backend_service, [primary_instance_group]) def prepare_services_for_urlmap_tests(gcp, original_backend_service, @@ -704,7 +795,7 @@ def prepare_services_for_urlmap_tests(gcp, original_backend_service, logger.info('waiting for original backends to become healthy') wait_for_healthy_backends(gcp, original_backend_service, instance_group) - patch_backend_instances(gcp, alternate_backend_service, + patch_backend_service(gcp, alternate_backend_service, [same_zone_instance_group]) logger.info('waiting for alternate to become healthy') wait_for_healthy_backends(gcp, alternate_backend_service, @@ -794,7 +885,7 @@ def test_traffic_splitting(gcp, original_backend_service, instance_group, break finally: patch_url_map_backend_service(gcp, original_backend_service) - patch_backend_instances(gcp, alternate_backend_service, []) + patch_backend_service(gcp, alternate_backend_service, []) def test_path_matching(gcp, original_backend_service, instance_group, @@ -901,7 +992,7 @@ def test_path_matching(gcp, original_backend_service, instance_group, break finally: patch_url_map_backend_service(gcp, original_backend_service) - patch_backend_instances(gcp, alternate_backend_service, []) + patch_backend_service(gcp, alternate_backend_service, []) def test_header_matching(gcp, original_backend_service, instance_group, @@ -971,8 +1062,56 @@ def test_header_matching(gcp, original_backend_service, instance_group, break finally: patch_url_map_backend_service(gcp, original_backend_service) - patch_backend_instances(gcp, alternate_backend_service, []) + patch_backend_service(gcp, alternate_backend_service, []) + + +def test_circuit_breaking(gcp, + original_backend_service, + instance_group, + alternate_backend_service, + same_zone_instance_group): + logger.info('Running test_circuit_breaking') + max_requests = _NUM_TEST_RPCS + alternate_backend_instances = get_instance_names(gcp, + same_zone_instance_group) + try: + # Switch to a new backend_service configured with circuit breakers. + patch_backend_service(gcp, alternate_backend_service, + [same_zone_instance_group], + circuit_breakers={'maxRequests': max_requests}) + wait_for_healthy_backends(gcp, alternate_backend_service, + same_zone_instance_group) + patch_url_map_backend_service(gcp, alternate_backend_service) + wait_until_all_rpcs_go_to_given_backends(alternate_backend_instances, + _WAIT_FOR_URL_MAP_PATCH_SEC) + + # Make unary calls open. + configure_client(rpc_types=['unary_call'], + metadata=[('unary_call', 'rpc-behavior', 'keep-open')]) + wait_until_all_rpcs_fail(int(_WAIT_FOR_STATS_SEC + _NUM_TEST_RPCS / args.qps), + _NUM_TEST_RPCS) + _assert_rpcs_in_flight(max_requests) + + # Increment circuit breakers max_requests threshold. + max_requests = _NUM_TEST_RPCS * 2 + patch_backend_service(gcp, alternate_backend_service, + [same_zone_instance_group], + circuit_breakers={'maxRequests': max_requests}) + wait_until_rpcs_in_flight(int(_WAIT_FOR_STATS_SEC + max_requests / args.qps), + max_requests) + wait_until_all_rpcs_fail(int(_WAIT_FOR_STATS_SEC + _NUM_TEST_RPCS / args.qps), + _NUM_TEST_RPCS) + _assert_rpcs_in_flight(max_requests) + finally: + patch_url_map_backend_service(gcp, original_backend_service) + patch_backend_service(gcp, alternate_backend_service, []) +def _assert_rpcs_in_flight(num_rpcs): + stats = get_client_accumulated_stats() + rpcs_in_flight = (stats.num_rpcs_started + - stats.num_rpcs_succeeded + - stats.num_rpcs_failed) + compare_distributions([rpcs_in_flight], [num_rpcs], threshold=2) def get_serving_status(instance, service_port): with grpc.insecure_channel('%s:%d' % (instance, service_port)) as channel: @@ -1207,7 +1346,6 @@ def create_target_proxy(gcp, name): config = { 'name': name, 'url_map': gcp.url_map.url, - 'validate_for_proxyless': True, } logger.debug('Sending GCP request with body=%s', config) result = gcp.alpha_compute.targetGrpcProxies().insert( @@ -1415,10 +1553,11 @@ def delete_instance_template(gcp): logger.info('Delete failed: %s', http_error) -def patch_backend_instances(gcp, +def patch_backend_service(gcp, backend_service, instance_groups, - balancing_mode='UTILIZATION'): + balancing_mode='UTILIZATION', + circuit_breakers=None): if gcp.alpha_compute: compute_to_use = gcp.alpha_compute else: @@ -1429,6 +1568,7 @@ def patch_backend_instances(gcp, 'balancingMode': balancing_mode, 'maxRate': 1 if balancing_mode == 'RATE' else None } for instance_group in instance_groups], + 'circuitBreakers': circuit_breakers, } logger.debug('Sending GCP request with body=%s', config) result = compute_to_use.backendServices().patch( @@ -1742,7 +1882,7 @@ try: startup_script) instance_group = add_instance_group(gcp, args.zone, instance_group_name, _INSTANCE_GROUP_SIZE) - patch_backend_instances(gcp, backend_service, [instance_group]) + patch_backend_service(gcp, backend_service, [instance_group]) same_zone_instance_group = add_instance_group( gcp, args.zone, same_zone_instance_group_name, _INSTANCE_GROUP_SIZE) secondary_zone_instance_group = add_instance_group( @@ -1867,6 +2007,11 @@ try: test_header_matching(gcp, backend_service, instance_group, alternate_backend_service, same_zone_instance_group) + elif test_case == 'circuit_breaking': + test_circuit_breaking(gcp, backend_service, + instance_group, + alternate_backend_service, + same_zone_instance_group) else: logger.error('Unknown test case: %s', test_case) sys.exit(1) From b221b5e16efa3004fd23b328a15146df45595e15 Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Fri, 6 Nov 2020 12:34:35 -0800 Subject: [PATCH 02/18] Update test description. --- doc/xds-test-descriptions.md | 83 ++++++++++++++++++++++++++++++++++++ 1 file changed, 83 insertions(+) diff --git a/doc/xds-test-descriptions.md b/doc/xds-test-descriptions.md index bea4a677aa5..d00cced0454 100644 --- a/doc/xds-test-descriptions.md +++ b/doc/xds-test-descriptions.md @@ -42,6 +42,47 @@ Clients should accept these arguments: * --rpc_timeout_sec=SEC * The timeout to set on all outbound RPCs. Default is 20. +### XdsUpdateClientConfigureService + +The xDS test client's behavior can be dynamically changed in the middle of tests. +This is achieved by invoking the `XdsUpdateClientConfigureService` gRPC service +on the test client. This can be useful for tests requiring special client behaviors +that are not desirable at test initialization and client warmup. The service is +defined as: + +``` +message ClientConfigureRequest { + // Type of RPCs to send. + enum RpcType { + EMPTY_CALL = 0; + UNARY_CALL = 1; + } + + // Metadata to be attached for the given type of RPCs. + message Metadata { + RpcType type = 1; + string key = 2; + string value = 3; + } + + // The types of RPCs the client sends. + repeated RpcType types = 1; + // The collection of custom metadata to be attached to RPCs sent by the client. + repeated Metadata metadata = 2; +} + +message ClientConfigureResponse {} + +service XdsUpdateClientConfigureService { + // Update the tes client's configuration. + rpc Configure(ClientConfigureRequest) returns (ClientConfigureResponse); +} +``` + +The test client changes its behavior right after receiving the +`ClientConfigureRequest`. Currently it only supports configuring the type(s) +of RPCs sent by the test client and metadata attached to each type of RPCs. + ## Test Driver Note that, unlike our other interop tests, neither the client nor the server has @@ -70,10 +111,24 @@ message LoadBalancerStatsResponse { int32 num_failures = 2; } +message LoadBalancerAccumulatedStatsRequest {} + +message LoadBalancerAccumulatedStatsResponse { + // The total number of RPCs have ever issued. + int32 num_rpcs_started = 1; + // The total number of RPCs have ever completed successfully. + int32 num_rpcs_succeeded = 2; + // The total number of RPCs have ever failed. + int32 num_rpcs_failed = 3; +} + service LoadBalancerStatsService { // Gets the backend distribution for RPCs sent by a test client. rpc GetClientStats(LoadBalancerStatsRequest) returns (LoadBalancerStatsResponse) {} + // Gets the accumulated stats for RPCs sent by a test client. + rpc GetClientAccumulatedStats(LoadBalancerAccumulatedStatsRequest) + returns (LoadBalancerAccumulatedStatsResponse) {} } ``` @@ -331,3 +386,31 @@ Test driver asserts: 1. All backends in the primary locality receive at least 1 RPC. 1. No backends in the secondary locality receive RPCs. + +### circuit_breaking + +This test verifies that the maximum number of outstanding requests is limited +by circuit breakers of the backend service. + +Client parameters: + +1. --num_channels=1 +1. --qps=100 + +Load balancer configuration: + +1. One MIG with two backends +1. The backend service has circuit breakers of maximum concurrent requests + being 1000 + +The test driver configures the test client's behavior to keep RPCs open. + +Test driver asserts: + +1. All RPCs fail after reaching quota of 1000 RPCs in-flight. + +Update the maximum concurrent requests of the breakers backend service to 2000. + +Test driver asserts: + +1. All RPCs fail after reaching quota of 2000 RPCs in-flight. \ No newline at end of file From 6c1907aa4212027ea9c2bbd07eeb33bf7e9f97c6 Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Fri, 6 Nov 2020 12:51:11 -0800 Subject: [PATCH 03/18] Move circuit_breaking from _TEST_CASES to _ADDITIONAL_TEST_CASES as not all languages have the feature ready. --- tools/run_tests/run_xds_tests.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tools/run_tests/run_xds_tests.py b/tools/run_tests/run_xds_tests.py index 7bbec830e12..ede07dd15ee 100755 --- a/tools/run_tests/run_xds_tests.py +++ b/tools/run_tests/run_xds_tests.py @@ -57,13 +57,12 @@ _TEST_CASES = [ 'secondary_locality_gets_no_requests_on_partial_primary_failure', 'secondary_locality_gets_requests_on_primary_failure', 'traffic_splitting', - 'circuit_breaking', ] # Valid test cases, but not in all. So the tests can only run manually, and # aren't enabled automatically for all languages. # # TODO: Move them into _TEST_CASES when support is ready in all languages. -_ADDITIONAL_TEST_CASES = ['path_matching', 'header_matching'] +_ADDITIONAL_TEST_CASES = ['path_matching', 'header_matching', 'circuit_breaking'] def parse_test_cases(arg): From 9a2a75f83018f3829bdd6fd03f119cb76fc47cc8 Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Fri, 6 Nov 2020 14:33:49 -0800 Subject: [PATCH 04/18] Directly use enum values defined in the message instead of doing string conversion. --- tools/run_tests/run_xds_tests.py | 28 ++++++++-------------------- 1 file changed, 8 insertions(+), 20 deletions(-) diff --git a/tools/run_tests/run_xds_tests.py b/tools/run_tests/run_xds_tests.py index ede07dd15ee..fe49271f5b9 100755 --- a/tools/run_tests/run_xds_tests.py +++ b/tools/run_tests/run_xds_tests.py @@ -341,23 +341,10 @@ def configure_client(rpc_types, metadata): (host, args.stats_port)) as channel: stub = test_pb2_grpc.XdsUpdateClientConfigureServiceStub(channel) request = messages_pb2.ClientConfigureRequest() - for rpc_type in rpc_types: - if rpc_type not in ['empty_call', 'unary_call']: - continue - request.types.append( - messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL - if rpc_type == 'empty_call' - else messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL - ) + request.types.extend(rpc_types) for rpc_type, md_key, md_value in metadata: - if rpc_type not in ['empty_call', 'unary_call']: - continue md = request.metadata.add() - md.type = ( - messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL - if rpc_type == 'empty_call' - else messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL - ) + md.type = rpc_type md.key = md_key md.value = md_value logger.debug('Invoking XdsUpdateClientConfigureService RPC to %s:%d: %s', @@ -1083,14 +1070,15 @@ def test_circuit_breaking(gcp, patch_url_map_backend_service(gcp, alternate_backend_service) wait_until_all_rpcs_go_to_given_backends(alternate_backend_instances, _WAIT_FOR_URL_MAP_PATCH_SEC) - - # Make unary calls open. - configure_client(rpc_types=['unary_call'], - metadata=[('unary_call', 'rpc-behavior', 'keep-open')]) + + # Make unary calls keep-open. + configure_client([messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL], + [(messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, + 'rpc-behavior', 'keep-open')]) wait_until_all_rpcs_fail(int(_WAIT_FOR_STATS_SEC + _NUM_TEST_RPCS / args.qps), _NUM_TEST_RPCS) _assert_rpcs_in_flight(max_requests) - + # Increment circuit breakers max_requests threshold. max_requests = _NUM_TEST_RPCS * 2 patch_backend_service(gcp, alternate_backend_service, From 1746fae5658b3735eb1ac424a97d1ce26cfbc13f Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Fri, 6 Nov 2020 14:42:03 -0800 Subject: [PATCH 05/18] Eliminate unnecessary wait_until_all_rpcs_fail --- tools/run_tests/run_xds_tests.py | 22 +++------------------- 1 file changed, 3 insertions(+), 19 deletions(-) diff --git a/tools/run_tests/run_xds_tests.py b/tools/run_tests/run_xds_tests.py index fe49271f5b9..5ed11501cdf 100755 --- a/tools/run_tests/run_xds_tests.py +++ b/tools/run_tests/run_xds_tests.py @@ -400,21 +400,6 @@ def wait_until_all_rpcs_go_to_given_backends(backends, num_rpcs, allow_failures=False) -def wait_until_all_rpcs_fail(timeout_sec, num_rpcs): - start_time = time.time() - error_msg = None - logger.debug('Waiting for %d sec until all of next %d RPCs fail' % - (timeout_sec, num_rpcs)) - while time.time() - start_time <= timeout_sec: - error_msg = None - stats = get_client_stats(num_rpcs, timeout_sec) - diff = num_rpcs - stats.num_failures - if not diff : - error_msg = 'Unexpected completion for %d RPCs' % diff - time.sleep(2) - else: - return - raise RpcDistributionError(error_msg) def wait_until_rpcs_in_flight(timeout_sec, num_rpcs): start_time = time.time() @@ -434,6 +419,7 @@ def wait_until_rpcs_in_flight(timeout_sec, num_rpcs): return raise RpcDistributionError(error_msg) + def compare_distributions(actual_distribution, expected_distribution, threshold): """Compare if two distributions are similar. @@ -1075,8 +1061,7 @@ def test_circuit_breaking(gcp, configure_client([messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL], [(messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, 'rpc-behavior', 'keep-open')]) - wait_until_all_rpcs_fail(int(_WAIT_FOR_STATS_SEC + _NUM_TEST_RPCS / args.qps), - _NUM_TEST_RPCS) + wait_until_all_rpcs_go_to_given_backends_or_fail([], _WAIT_FOR_BACKEND_SEC) _assert_rpcs_in_flight(max_requests) # Increment circuit breakers max_requests threshold. @@ -1086,8 +1071,7 @@ def test_circuit_breaking(gcp, circuit_breakers={'maxRequests': max_requests}) wait_until_rpcs_in_flight(int(_WAIT_FOR_STATS_SEC + max_requests / args.qps), max_requests) - wait_until_all_rpcs_fail(int(_WAIT_FOR_STATS_SEC + _NUM_TEST_RPCS / args.qps), - _NUM_TEST_RPCS) + wait_until_all_rpcs_go_to_given_backends_or_fail([], _WAIT_FOR_BACKEND_SEC) _assert_rpcs_in_flight(max_requests) finally: patch_url_map_backend_service(gcp, original_backend_service) From 993050099d223e2bdd82703f42fed96329130b5d Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Fri, 6 Nov 2020 14:52:57 -0800 Subject: [PATCH 06/18] Use _WAIT_FOR_BACKEND_SEC as the timeout for waiting a specific number of RPCs to be in-flight. --- tools/run_tests/run_xds_tests.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/run_tests/run_xds_tests.py b/tools/run_tests/run_xds_tests.py index 5ed11501cdf..ccd0db3cbc0 100755 --- a/tools/run_tests/run_xds_tests.py +++ b/tools/run_tests/run_xds_tests.py @@ -1069,7 +1069,7 @@ def test_circuit_breaking(gcp, patch_backend_service(gcp, alternate_backend_service, [same_zone_instance_group], circuit_breakers={'maxRequests': max_requests}) - wait_until_rpcs_in_flight(int(_WAIT_FOR_STATS_SEC + max_requests / args.qps), + wait_until_rpcs_in_flight(_WAIT_FOR_BACKEND_SEC + int(max_requests / args.qps), max_requests) wait_until_all_rpcs_go_to_given_backends_or_fail([], _WAIT_FOR_BACKEND_SEC) _assert_rpcs_in_flight(max_requests) From 1d0bed27d6c1495c506facaea05b8189e7c07973 Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Sun, 8 Nov 2020 14:52:11 -0800 Subject: [PATCH 07/18] Simplify test logic by only checking the number of RPCs being in-flight. --- tools/run_tests/run_xds_tests.py | 48 ++++++++++++++++++++------------ 1 file changed, 30 insertions(+), 18 deletions(-) diff --git a/tools/run_tests/run_xds_tests.py b/tools/run_tests/run_xds_tests.py index ccd0db3cbc0..19e74ea4631 100755 --- a/tools/run_tests/run_xds_tests.py +++ b/tools/run_tests/run_xds_tests.py @@ -401,23 +401,41 @@ def wait_until_all_rpcs_go_to_given_backends(backends, allow_failures=False) -def wait_until_rpcs_in_flight(timeout_sec, num_rpcs): +def wait_until_rpcs_in_flight(timeout_sec, num_rpcs, threshold): + '''Block until the test client reaches the state with the given number + of RPCs being outstanding. + + Args: + timeout_sec: Maximum number of seconds to wait until the desired state + is reached. + num_rpcs: Expected number of RPCs to be in-flight. + threshold: Number within [0,100], the tolerable percentage by which + the actual number of RPCs in-flight can differ from the expected number. + ''' + if threshold < 0 or threshold > 100: + raise ValueError('Value error: Threshold should be between 0 to 100') + threshold_fraction = threshold / 100.0 start_time = time.time() error_msg = None - logger.debug('Waiting for %d sec until %d RPCs in-flight' % (timeout_sec, num_rpcs)) + logger.debug('Waiting for %d sec until %d RPCs (with %d%% tolerance) in-flight' + % (timeout_sec, num_rpcs, threshold)) while time.time() - start_time <= timeout_sec: error_msg = None stats = get_client_accumulated_stats() rpcs_in_flight = (stats.num_rpcs_started - stats.num_rpcs_succeeded - stats.num_rpcs_failed) - if rpcs_in_flight < num_rpcs: - error_msg = ('Expected %d RPCs in-flight, actual: %d' % - (num_rpcs, rpcs_in_flight)) + if rpcs_in_flight < (num_rpcs * (1 - threshold_fraction)): + error_msg = ('actual(%d) < expected(%d - %d%%)' % + (rpcs_in_flight, num_rpcs, threshold)) + time.sleep(2) + elif rpcs_in_flight > (num_rpcs * (1 + threshold_fraction)): + error_msg = ('actual(%d) > expected(%d + %d%%)' % + (rpcs_in_flight, num_rpcs, threshold)) time.sleep(2) else: return - raise RpcDistributionError(error_msg) + raise Exception(error_msg) def compare_distributions(actual_distribution, expected_distribution, @@ -1061,28 +1079,22 @@ def test_circuit_breaking(gcp, configure_client([messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL], [(messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, 'rpc-behavior', 'keep-open')]) - wait_until_all_rpcs_go_to_given_backends_or_fail([], _WAIT_FOR_BACKEND_SEC) - _assert_rpcs_in_flight(max_requests) + wait_until_rpcs_in_flight((_WAIT_FOR_BACKEND_SEC + + int(max_requests / args.qps)), + max_requests, 1) # Increment circuit breakers max_requests threshold. max_requests = _NUM_TEST_RPCS * 2 patch_backend_service(gcp, alternate_backend_service, [same_zone_instance_group], circuit_breakers={'maxRequests': max_requests}) - wait_until_rpcs_in_flight(_WAIT_FOR_BACKEND_SEC + int(max_requests / args.qps), - max_requests) - wait_until_all_rpcs_go_to_given_backends_or_fail([], _WAIT_FOR_BACKEND_SEC) - _assert_rpcs_in_flight(max_requests) + wait_until_rpcs_in_flight((_WAIT_FOR_BACKEND_SEC + + int(max_requests / args.qps)), + max_requests, 1) finally: patch_url_map_backend_service(gcp, original_backend_service) patch_backend_service(gcp, alternate_backend_service, []) -def _assert_rpcs_in_flight(num_rpcs): - stats = get_client_accumulated_stats() - rpcs_in_flight = (stats.num_rpcs_started - - stats.num_rpcs_succeeded - - stats.num_rpcs_failed) - compare_distributions([rpcs_in_flight], [num_rpcs], threshold=2) def get_serving_status(instance, service_port): with grpc.insecure_channel('%s:%d' % (instance, service_port)) as channel: From 172b59be3314f403461af5b1335acd3b0dba2e47 Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Sun, 8 Nov 2020 14:55:20 -0800 Subject: [PATCH 08/18] Fix formatting. --- tools/run_tests/run_xds_tests.py | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/tools/run_tests/run_xds_tests.py b/tools/run_tests/run_xds_tests.py index 19e74ea4631..d9f4ab26e20 100755 --- a/tools/run_tests/run_xds_tests.py +++ b/tools/run_tests/run_xds_tests.py @@ -524,7 +524,7 @@ def test_change_backend_service(gcp, original_backend_service, instance_group, alternate_backend_instances = get_instance_names(gcp, same_zone_instance_group) patch_backend_service(gcp, alternate_backend_service, - [same_zone_instance_group]) + [same_zone_instance_group]) wait_for_healthy_backends(gcp, original_backend_service, instance_group) wait_for_healthy_backends(gcp, alternate_backend_service, same_zone_instance_group) @@ -608,9 +608,9 @@ def test_remove_instance_group(gcp, backend_service, instance_group, logger.info('Running test_remove_instance_group') try: patch_backend_service(gcp, - backend_service, - [instance_group, same_zone_instance_group], - balancing_mode='RATE') + backend_service, + [instance_group, same_zone_instance_group], + balancing_mode='RATE') wait_for_healthy_backends(gcp, backend_service, instance_group) wait_for_healthy_backends(gcp, backend_service, same_zone_instance_group) @@ -638,8 +638,8 @@ def test_remove_instance_group(gcp, backend_service, instance_group, remaining_instance_group = instance_group remaining_instance_names = instance_names patch_backend_service(gcp, - backend_service, [remaining_instance_group], - balancing_mode='RATE') + backend_service, [remaining_instance_group], + balancing_mode='RATE') wait_until_all_rpcs_go_to_given_backends(remaining_instance_names, _WAIT_FOR_BACKEND_SEC) finally: @@ -786,7 +786,7 @@ def prepare_services_for_urlmap_tests(gcp, original_backend_service, wait_for_healthy_backends(gcp, original_backend_service, instance_group) patch_backend_service(gcp, alternate_backend_service, - [same_zone_instance_group]) + [same_zone_instance_group]) logger.info('waiting for alternate to become healthy') wait_for_healthy_backends(gcp, alternate_backend_service, same_zone_instance_group) @@ -1067,8 +1067,8 @@ def test_circuit_breaking(gcp, try: # Switch to a new backend_service configured with circuit breakers. patch_backend_service(gcp, alternate_backend_service, - [same_zone_instance_group], - circuit_breakers={'maxRequests': max_requests}) + [same_zone_instance_group], + circuit_breakers={'maxRequests': max_requests}) wait_for_healthy_backends(gcp, alternate_backend_service, same_zone_instance_group) patch_url_map_backend_service(gcp, alternate_backend_service) @@ -1086,8 +1086,8 @@ def test_circuit_breaking(gcp, # Increment circuit breakers max_requests threshold. max_requests = _NUM_TEST_RPCS * 2 patch_backend_service(gcp, alternate_backend_service, - [same_zone_instance_group], - circuit_breakers={'maxRequests': max_requests}) + [same_zone_instance_group], + circuit_breakers={'maxRequests': max_requests}) wait_until_rpcs_in_flight((_WAIT_FOR_BACKEND_SEC + int(max_requests / args.qps)), max_requests, 1) @@ -1537,10 +1537,10 @@ def delete_instance_template(gcp): def patch_backend_service(gcp, - backend_service, - instance_groups, - balancing_mode='UTILIZATION', - circuit_breakers=None): + backend_service, + instance_groups, + balancing_mode='UTILIZATION', + circuit_breakers=None): if gcp.alpha_compute: compute_to_use = gcp.alpha_compute else: From 4f5d6e44a691ad38f07ec71fe00e224b8d80f375 Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Sun, 8 Nov 2020 15:45:12 -0800 Subject: [PATCH 09/18] Formatting. --- tools/run_tests/run_xds_tests.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tools/run_tests/run_xds_tests.py b/tools/run_tests/run_xds_tests.py index d9f4ab26e20..207b78085e6 100755 --- a/tools/run_tests/run_xds_tests.py +++ b/tools/run_tests/run_xds_tests.py @@ -311,6 +311,7 @@ def get_client_stats(num_rpcs, timeout_sec): logger.debug('Invoked GetClientStats RPC to %s: %s', host, response) return response + def get_client_accumulated_stats(): if CLIENT_HOSTS: hosts = CLIENT_HOSTS @@ -331,6 +332,7 @@ def get_client_accumulated_stats(): response) return response + def configure_client(rpc_types, metadata): if CLIENT_HOSTS: hosts = CLIENT_HOSTS @@ -356,6 +358,7 @@ def configure_client(rpc_types, metadata): timeout=_CONNECTION_TIMEOUT_SEC) logger.debug('Invoked XdsUpdateClientConfigureService RPC to %s', host) + class RpcDistributionError(Exception): pass From d25d8d64a3c6d351edf5412cb0fcd711d699784e Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Sun, 8 Nov 2020 15:57:20 -0800 Subject: [PATCH 10/18] Ensure number of outstanding RPCs being stable. --- tools/run_tests/run_xds_tests.py | 40 ++++++++++++++++++++------------ 1 file changed, 25 insertions(+), 15 deletions(-) diff --git a/tools/run_tests/run_xds_tests.py b/tools/run_tests/run_xds_tests.py index 207b78085e6..f4459b2142e 100755 --- a/tools/run_tests/run_xds_tests.py +++ b/tools/run_tests/run_xds_tests.py @@ -406,7 +406,7 @@ def wait_until_all_rpcs_go_to_given_backends(backends, def wait_until_rpcs_in_flight(timeout_sec, num_rpcs, threshold): '''Block until the test client reaches the state with the given number - of RPCs being outstanding. + of RPCs being outstanding stably. Args: timeout_sec: Maximum number of seconds to wait until the desired state @@ -423,22 +423,32 @@ def wait_until_rpcs_in_flight(timeout_sec, num_rpcs, threshold): logger.debug('Waiting for %d sec until %d RPCs (with %d%% tolerance) in-flight' % (timeout_sec, num_rpcs, threshold)) while time.time() - start_time <= timeout_sec: - error_msg = None - stats = get_client_accumulated_stats() - rpcs_in_flight = (stats.num_rpcs_started - - stats.num_rpcs_succeeded - - stats.num_rpcs_failed) - if rpcs_in_flight < (num_rpcs * (1 - threshold_fraction)): - error_msg = ('actual(%d) < expected(%d - %d%%)' % - (rpcs_in_flight, num_rpcs, threshold)) - time.sleep(2) - elif rpcs_in_flight > (num_rpcs * (1 + threshold_fraction)): - error_msg = ('actual(%d) > expected(%d + %d%%)' % - (rpcs_in_flight, num_rpcs, threshold)) + error_msg = _check_rpcs_in_flight(num_rpcs, threshold, threshold_fraction) + if error_msg: time.sleep(2) else: - return - raise Exception(error_msg) + break + # Ensure the number of outstanding RPCs is stable. + if not error_msg: + time.sleep(5) + error_msg = _check_rpcs_in_flight(num_rpcs, threshold, threshold_fraction) + if error_msg: + raise Exception(error_msg) + + +def _check_rpcs_in_flight(num_rpcs, threshold, threshold_fraction): + error_msg = None + stats = get_client_accumulated_stats() + rpcs_in_flight = (stats.num_rpcs_started + - stats.num_rpcs_succeeded + - stats.num_rpcs_failed) + if rpcs_in_flight < (num_rpcs * (1 - threshold_fraction)): + error_msg = ('actual(%d) < expected(%d - %d%%)' % + (rpcs_in_flight, num_rpcs, threshold)) + elif rpcs_in_flight > (num_rpcs * (1 + threshold_fraction)): + error_msg = ('actual(%d) > expected(%d + %d%%)' % + (rpcs_in_flight, num_rpcs, threshold)) + return error_msg def compare_distributions(actual_distribution, expected_distribution, From c7506e9b41ca9ed444ae8d7290f9b79309e9d3ac Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Sun, 8 Nov 2020 21:23:06 -0800 Subject: [PATCH 11/18] Implement advanced circuit breaking test --- tools/run_tests/run_xds_tests.py | 114 ++++++++++++++++++++++++++++--- 1 file changed, 103 insertions(+), 11 deletions(-) diff --git a/tools/run_tests/run_xds_tests.py b/tools/run_tests/run_xds_tests.py index f4459b2142e..be411f16006 100755 --- a/tools/run_tests/run_xds_tests.py +++ b/tools/run_tests/run_xds_tests.py @@ -404,7 +404,7 @@ def wait_until_all_rpcs_go_to_given_backends(backends, allow_failures=False) -def wait_until_rpcs_in_flight(timeout_sec, num_rpcs, threshold): +def wait_until_rpcs_in_flight(rpc_type, timeout_sec, num_rpcs, threshold): '''Block until the test client reaches the state with the given number of RPCs being outstanding stably. @@ -423,7 +423,10 @@ def wait_until_rpcs_in_flight(timeout_sec, num_rpcs, threshold): logger.debug('Waiting for %d sec until %d RPCs (with %d%% tolerance) in-flight' % (timeout_sec, num_rpcs, threshold)) while time.time() - start_time <= timeout_sec: - error_msg = _check_rpcs_in_flight(num_rpcs, threshold, threshold_fraction) + error_msg = _check_rpcs_in_flight(rpc_type, + num_rpcs, + threshold, + threshold_fraction) if error_msg: time.sleep(2) else: @@ -431,17 +434,21 @@ def wait_until_rpcs_in_flight(timeout_sec, num_rpcs, threshold): # Ensure the number of outstanding RPCs is stable. if not error_msg: time.sleep(5) - error_msg = _check_rpcs_in_flight(num_rpcs, threshold, threshold_fraction) + error_msg = _check_rpcs_in_flight(rpc_type, + num_rpcs, + threshold, + threshold_fraction) if error_msg: raise Exception(error_msg) -def _check_rpcs_in_flight(num_rpcs, threshold, threshold_fraction): +def _check_rpcs_in_flight(rpc_type, num_rpcs, threshold, threshold_fraction): error_msg = None stats = get_client_accumulated_stats() - rpcs_in_flight = (stats.num_rpcs_started - - stats.num_rpcs_succeeded - - stats.num_rpcs_failed) + rpcs_started = stats.num_rpcs_started_by_method[rpc_type] + rpcs_succeeded = stats.num_rpcs_succeeded_by_method[rpc_type] + rpcs_failed = stats.num_rpcs_failed_by_method[rpc_type] + rpcs_in_flight = rpcs_started - rpcs_succeeded - rpcs_failed if rpcs_in_flight < (num_rpcs * (1 - threshold_fraction)): error_msg = ('actual(%d) < expected(%d - %d%%)' % (rpcs_in_flight, num_rpcs, threshold)) @@ -1092,8 +1099,8 @@ def test_circuit_breaking(gcp, configure_client([messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL], [(messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, 'rpc-behavior', 'keep-open')]) - wait_until_rpcs_in_flight((_WAIT_FOR_BACKEND_SEC + - int(max_requests / args.qps)), + wait_until_rpcs_in_flight('UNARY_CALL', + _WAIT_FOR_BACKEND_SEC + int(max_requests / args.qps), max_requests, 1) # Increment circuit breakers max_requests threshold. @@ -1101,14 +1108,99 @@ def test_circuit_breaking(gcp, patch_backend_service(gcp, alternate_backend_service, [same_zone_instance_group], circuit_breakers={'maxRequests': max_requests}) - wait_until_rpcs_in_flight((_WAIT_FOR_BACKEND_SEC + - int(max_requests / args.qps)), + wait_until_rpcs_in_flight('UNARY_CALL', + _WAIT_FOR_BACKEND_SEC + int(max_requests / args.qps), max_requests, 1) finally: patch_url_map_backend_service(gcp, original_backend_service) patch_backend_service(gcp, alternate_backend_service, []) +def test_circuit_breaking_advanced(gcp, + original_backend_service, + instance_group, + alternate_backend_service, + same_zone_instance_group): + logger.info('Running test_circuit_breaking_advanced') + patch_backend_service(gcp, + original_backend_service, + [instance_group], + circuit_breakers={'maxRequests': 500}) + logger.info('Waiting for original backends to become healthy') + wait_for_healthy_backends(gcp, original_backend_service, instance_group) + patch_backend_service(gcp, + alternate_backend_service, + [same_zone_instance_group], + circuit_breakers={'maxRequests': 1000}) + logger.info('Waiting for alternate to become healthy') + wait_for_healthy_backends(gcp, alternate_backend_service, + same_zone_instance_group) + original_backend_instances = get_instance_names(gcp, instance_group) + alternate_backend_instances = get_instance_names(gcp,same_zone_instance_group) + route_rules = [ + { + 'priority': 0, + # UnaryCall -> original_backend_service + 'matchRules': [{ + 'fullPathMatch': '/grpc.testing.TestService/UnaryCall' + }], + 'service': original_backend_service.url + }, + { + 'priority': 1, + # EmptyCall -> alternate_backend_service + 'matchRules': [{ + 'fullPathMatch': '/grpc.testing.TestService/EmptyCall' + }], + 'service': alternate_backend_service.url + }, + ] + try: + logger.info('Patching url map with %s', route_rules) + patch_url_map_backend_service(gcp, + original_backend_service, + route_rules=route_rules) + logger.info('Waiting for traffic to go to all backends') + wait_until_all_rpcs_go_to_given_backends( + original_backend_instances + alternate_backend_instances, + _WAIT_FOR_STATS_SEC) + + # Make all calls keep-open. + configure_client( + [messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, + messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL], + [(messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, + 'rpc-behavior', 'keep-open'), + (messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL, + 'rpc-behavior', 'keep-open')]) + wait_until_rpcs_in_flight( + 'UNARY_CALL', + _WAIT_FOR_BACKEND_SEC + int(500 / args.qps), + 500, + 1) + wait_until_rpcs_in_flight( + 'EMPTY_CALL', + _WAIT_FOR_BACKEND_SEC + int(1000 / args.qps), + 1000, + 1) + + # Increment circuit breakers max_requests threshold. + patch_backend_service(gcp, + original_backend_service, + [instance_group], + circuit_breakers={'maxRequests': 1000}) + wait_until_rpcs_in_flight( + 'UNARY_CALL', + _WAIT_FOR_BACKEND_SEC + int(1000 / args.qps), + 1000, + 1) + finally: + patch_url_map_backend_service(gcp, original_backend_service) + patch_backend_service(gcp, original_backend_service, [instance_group]) + patch_backend_service(gcp, alternate_backend_service, []) + + + def get_serving_status(instance, service_port): with grpc.insecure_channel('%s:%d' % (instance, service_port)) as channel: health_stub = health_pb2_grpc.HealthStub(channel) From 8c24041c37ba3a63bae83f880a8b134fe5f924f4 Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Sun, 8 Nov 2020 21:45:23 -0800 Subject: [PATCH 12/18] Enable running circuit_breaking_test_advanced. --- tools/run_tests/run_xds_tests.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/tools/run_tests/run_xds_tests.py b/tools/run_tests/run_xds_tests.py index be411f16006..c2a32306e5e 100755 --- a/tools/run_tests/run_xds_tests.py +++ b/tools/run_tests/run_xds_tests.py @@ -62,7 +62,12 @@ _TEST_CASES = [ # aren't enabled automatically for all languages. # # TODO: Move them into _TEST_CASES when support is ready in all languages. -_ADDITIONAL_TEST_CASES = ['path_matching', 'header_matching', 'circuit_breaking'] +_ADDITIONAL_TEST_CASES = [ + 'path_matching', + 'header_matching', + 'circuit_breaking', + 'circuit_breaking_advanced', +] def parse_test_cases(arg): @@ -2100,6 +2105,11 @@ try: instance_group, alternate_backend_service, same_zone_instance_group) + elif test_case == 'circuit_breaking_advanced': + test_circuit_breaking_advanced(gcp, backend_service, + instance_group, + alternate_backend_service, + same_zone_instance_group) else: logger.error('Unknown test case: %s', test_case) sys.exit(1) From df64f452de0f45ce8d4ceae681697f286d8edca8 Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Sun, 8 Nov 2020 21:58:31 -0800 Subject: [PATCH 13/18] Configure client to send both unary and empty calls. --- tools/run_tests/run_xds_tests.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tools/run_tests/run_xds_tests.py b/tools/run_tests/run_xds_tests.py index c2a32306e5e..baf923cac23 100755 --- a/tools/run_tests/run_xds_tests.py +++ b/tools/run_tests/run_xds_tests.py @@ -1161,6 +1161,11 @@ def test_circuit_breaking_advanced(gcp, }, ] try: + # Make client send UNARY_CALL and EMPTY_CALL. + configure_client( + [messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, + messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL], + []) logger.info('Patching url map with %s', route_rules) patch_url_map_backend_service(gcp, original_backend_service, From bd0b4bb367ab9177d5b7bb7ddc1831691d925561 Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Sun, 8 Nov 2020 23:48:32 -0800 Subject: [PATCH 14/18] Implement more advanced circuit breaking test with two backend services. --- tools/run_tests/run_xds_tests.py | 86 ++++++++++---------------------- 1 file changed, 27 insertions(+), 59 deletions(-) diff --git a/tools/run_tests/run_xds_tests.py b/tools/run_tests/run_xds_tests.py index baf923cac23..c7bce976b68 100755 --- a/tools/run_tests/run_xds_tests.py +++ b/tools/run_tests/run_xds_tests.py @@ -66,7 +66,6 @@ _ADDITIONAL_TEST_CASES = [ 'path_matching', 'header_matching', 'circuit_breaking', - 'circuit_breaking_advanced', ] @@ -414,6 +413,8 @@ def wait_until_rpcs_in_flight(rpc_type, timeout_sec, num_rpcs, threshold): of RPCs being outstanding stably. Args: + rpc_type: A string indicating the RPC method to check for. Either + 'UnaryCall' or 'EmptyCall'. timeout_sec: Maximum number of seconds to wait until the desired state is reached. num_rpcs: Expected number of RPCs to be in-flight. @@ -429,7 +430,7 @@ def wait_until_rpcs_in_flight(rpc_type, timeout_sec, num_rpcs, threshold): % (timeout_sec, num_rpcs, threshold)) while time.time() - start_time <= timeout_sec: error_msg = _check_rpcs_in_flight(rpc_type, - num_rpcs, + num_rpcs, threshold, threshold_fraction) if error_msg: @@ -1086,57 +1087,24 @@ def test_circuit_breaking(gcp, alternate_backend_service, same_zone_instance_group): logger.info('Running test_circuit_breaking') - max_requests = _NUM_TEST_RPCS - alternate_backend_instances = get_instance_names(gcp, - same_zone_instance_group) - try: - # Switch to a new backend_service configured with circuit breakers. - patch_backend_service(gcp, alternate_backend_service, - [same_zone_instance_group], - circuit_breakers={'maxRequests': max_requests}) - wait_for_healthy_backends(gcp, alternate_backend_service, - same_zone_instance_group) - patch_url_map_backend_service(gcp, alternate_backend_service) - wait_until_all_rpcs_go_to_given_backends(alternate_backend_instances, - _WAIT_FOR_URL_MAP_PATCH_SEC) - - # Make unary calls keep-open. - configure_client([messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL], - [(messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, - 'rpc-behavior', 'keep-open')]) - wait_until_rpcs_in_flight('UNARY_CALL', - _WAIT_FOR_BACKEND_SEC + int(max_requests / args.qps), - max_requests, 1) - - # Increment circuit breakers max_requests threshold. - max_requests = _NUM_TEST_RPCS * 2 - patch_backend_service(gcp, alternate_backend_service, - [same_zone_instance_group], - circuit_breakers={'maxRequests': max_requests}) - wait_until_rpcs_in_flight('UNARY_CALL', - _WAIT_FOR_BACKEND_SEC + int(max_requests / args.qps), - max_requests, 1) - finally: - patch_url_map_backend_service(gcp, original_backend_service) - patch_backend_service(gcp, alternate_backend_service, []) - - -def test_circuit_breaking_advanced(gcp, - original_backend_service, - instance_group, - alternate_backend_service, - same_zone_instance_group): - logger.info('Running test_circuit_breaking_advanced') + original_backend_service_max_requests = 500 + alternate_backend_service_max_requests = 1000 patch_backend_service(gcp, original_backend_service, [instance_group], - circuit_breakers={'maxRequests': 500}) + circuit_breakers={ + 'maxRequests': + original_backend_service_max_requests + }) logger.info('Waiting for original backends to become healthy') wait_for_healthy_backends(gcp, original_backend_service, instance_group) patch_backend_service(gcp, alternate_backend_service, [same_zone_instance_group], - circuit_breakers={'maxRequests': 1000}) + circuit_breakers={ + 'maxRequests': + alternate_backend_service_max_requests + }) logger.info('Waiting for alternate to become healthy') wait_for_healthy_backends(gcp, alternate_backend_service, same_zone_instance_group) @@ -1163,7 +1131,7 @@ def test_circuit_breaking_advanced(gcp, try: # Make client send UNARY_CALL and EMPTY_CALL. configure_client( - [messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, + [messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL], []) logger.info('Patching url map with %s', route_rules) @@ -1185,24 +1153,30 @@ def test_circuit_breaking_advanced(gcp, 'rpc-behavior', 'keep-open')]) wait_until_rpcs_in_flight( 'UNARY_CALL', - _WAIT_FOR_BACKEND_SEC + int(500 / args.qps), - 500, + (_WAIT_FOR_BACKEND_SEC + + int(original_backend_service_max_requests / args.qps)), + original_backend_service_max_requests, 1) wait_until_rpcs_in_flight( 'EMPTY_CALL', - _WAIT_FOR_BACKEND_SEC + int(1000 / args.qps), - 1000, + (_WAIT_FOR_BACKEND_SEC + + int(alternate_backend_service_max_requests / args.qps)), + alternate_backend_service_max_requests, 1) # Increment circuit breakers max_requests threshold. + original_backend_service_max_requests = 800 patch_backend_service(gcp, original_backend_service, [instance_group], - circuit_breakers={'maxRequests': 1000}) + circuit_breakers={ + 'maxRequests': + original_backend_service_max_requests}) wait_until_rpcs_in_flight( 'UNARY_CALL', - _WAIT_FOR_BACKEND_SEC + int(1000 / args.qps), - 1000, + (_WAIT_FOR_BACKEND_SEC + + int(original_backend_service_max_requests / args.qps)), + original_backend_service_max_requests, 1) finally: patch_url_map_backend_service(gcp, original_backend_service) @@ -1210,7 +1184,6 @@ def test_circuit_breaking_advanced(gcp, patch_backend_service(gcp, alternate_backend_service, []) - def get_serving_status(instance, service_port): with grpc.insecure_channel('%s:%d' % (instance, service_port)) as channel: health_stub = health_pb2_grpc.HealthStub(channel) @@ -2110,11 +2083,6 @@ try: instance_group, alternate_backend_service, same_zone_instance_group) - elif test_case == 'circuit_breaking_advanced': - test_circuit_breaking_advanced(gcp, backend_service, - instance_group, - alternate_backend_service, - same_zone_instance_group) else: logger.error('Unknown test case: %s', test_case) sys.exit(1) From 8d023a478ca2fd664b31c0b9a40855bd16c12a4a Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Mon, 9 Nov 2020 00:01:23 -0800 Subject: [PATCH 15/18] Update test description. --- doc/xds-test-descriptions.md | 35 +++++++++++++++++++++-------------- 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/doc/xds-test-descriptions.md b/doc/xds-test-descriptions.md index d00cced0454..dcc74073d88 100644 --- a/doc/xds-test-descriptions.md +++ b/doc/xds-test-descriptions.md @@ -114,12 +114,12 @@ message LoadBalancerStatsResponse { message LoadBalancerAccumulatedStatsRequest {} message LoadBalancerAccumulatedStatsResponse { - // The total number of RPCs have ever issued. - int32 num_rpcs_started = 1; - // The total number of RPCs have ever completed successfully. - int32 num_rpcs_succeeded = 2; - // The total number of RPCs have ever failed. - int32 num_rpcs_failed = 3; + // The total number of RPCs have ever issued for each type. + map num_rpcs_started_by_method = 1; + // The total number of RPCs have ever completed successfully for each type. + map num_rpcs_succeeded_by_method = 2; + // The total number of RPCs have ever failed for each type. + map num_rpcs_failed_by_method = 3; } service LoadBalancerStatsService { @@ -399,18 +399,25 @@ Client parameters: Load balancer configuration: -1. One MIG with two backends -1. The backend service has circuit breakers of maximum concurrent requests - being 1000 +1. Two MIGs with each having two backends. -The test driver configures the test client's behavior to keep RPCs open. +The test driver configures the backend services with: -Test driver asserts: +1. path{“/grpc.testing.TestService/UnaryCall"}: MIG_1 +1. path{“/grpc.testing.TestService/EmptyCall"}: MIG_2 +1. MIG_1 circuit_breakers with max_requests = 500 +1. MIG_2 circuit breakers with max_requests = 1000 + +The test driver configures the test client to send both UnaryCall and EmptyCall, +with all RPCs keep-open. + +Assert: -1. All RPCs fail after reaching quota of 1000 RPCs in-flight. +1. After reaching steady state, there are 500 UnaryCall RPCs in-flight +and 1000 EmptyCall RPCs in-flight. -Update the maximum concurrent requests of the breakers backend service to 2000. +The test driver updates MIG_1's circuit breakers with max_request = 800. Test driver asserts: -1. All RPCs fail after reaching quota of 2000 RPCs in-flight. \ No newline at end of file +1. After reaching steady state, there are 800 UnaryCall RPCs in-flight. \ No newline at end of file From 003ce563f4b0d5549f84ed2869c851b1847656d5 Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Tue, 10 Nov 2020 10:45:21 -0800 Subject: [PATCH 16/18] Only disable config validation for circuit breaking test. --- tools/run_tests/run_xds_tests.py | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/tools/run_tests/run_xds_tests.py b/tools/run_tests/run_xds_tests.py index c7bce976b68..13b9aee0744 100755 --- a/tools/run_tests/run_xds_tests.py +++ b/tools/run_tests/run_xds_tests.py @@ -1087,6 +1087,12 @@ def test_circuit_breaking(gcp, alternate_backend_service, same_zone_instance_group): logger.info('Running test_circuit_breaking') + # The config validation for proxyless doesn't allow setting + # circuit_breakers. Disable validate validate_for_proxyless + # for this test. This can be removed when validation + # accepts circuit_breakers. + logger.info('disabling validate_for_proxyless in target proxy') + set_validate_for_proxyless(gcp, False) original_backend_service_max_requests = 500 alternate_backend_service_max_requests = 1000 patch_backend_service(gcp, @@ -1182,6 +1188,22 @@ def test_circuit_breaking(gcp, patch_url_map_backend_service(gcp, original_backend_service) patch_backend_service(gcp, original_backend_service, [instance_group]) patch_backend_service(gcp, alternate_backend_service, []) + set_validate_for_proxyless(gcp, True) + + +def set_validate_for_proxyless(gcp, validate_for_proxyless): + if not gcp.alpha_compute: + logger.debug( + 'Not setting validateForProxy because alpha is not enabled') + return + # This function deletes global_forwarding_rule and target_proxy, then + # recreate target_proxy with validateForProxyless=False. This is necessary + # because patching target_grpc_proxy isn't supported. + delete_global_forwarding_rule(gcp) + delete_target_proxy(gcp) + create_target_proxy(gcp, gcp.target_proxy.name, validate_for_proxyless) + create_global_forwarding_rule(gcp, gcp.global_forwarding_rule.name, + [gcp.service_port]) def get_serving_status(instance, service_port): @@ -1412,11 +1434,12 @@ def patch_url_map_host_rule_with_port(gcp, name, backend_service, host_name): wait_for_global_operation(gcp, result['name']) -def create_target_proxy(gcp, name): +def create_target_proxy(gcp, name, validate_for_proxyless=True): if gcp.alpha_compute: config = { 'name': name, 'url_map': gcp.url_map.url, + 'validate_for_proxyless': validate_for_proxyless } logger.debug('Sending GCP request with body=%s', config) result = gcp.alpha_compute.targetGrpcProxies().insert( From 07ddf4eaefe67741d12987e22f1c1e6057d28b95 Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Tue, 10 Nov 2020 10:46:15 -0800 Subject: [PATCH 17/18] Enhance logging messages. --- tools/run_tests/run_xds_tests.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tools/run_tests/run_xds_tests.py b/tools/run_tests/run_xds_tests.py index 13b9aee0744..faa73a019e7 100755 --- a/tools/run_tests/run_xds_tests.py +++ b/tools/run_tests/run_xds_tests.py @@ -426,8 +426,8 @@ def wait_until_rpcs_in_flight(rpc_type, timeout_sec, num_rpcs, threshold): threshold_fraction = threshold / 100.0 start_time = time.time() error_msg = None - logger.debug('Waiting for %d sec until %d RPCs (with %d%% tolerance) in-flight' - % (timeout_sec, num_rpcs, threshold)) + logger.debug('Waiting for %d sec until %d %s RPCs (with %d%% tolerance) in-flight' + % (timeout_sec, num_rpcs, rpc_type, threshold)) while time.time() - start_time <= timeout_sec: error_msg = _check_rpcs_in_flight(rpc_type, num_rpcs, @@ -445,7 +445,8 @@ def wait_until_rpcs_in_flight(rpc_type, timeout_sec, num_rpcs, threshold): threshold, threshold_fraction) if error_msg: - raise Exception(error_msg) + raise Exception("Wrong number of %s RPCs in-flight: %s" + % (rpc_type, error_msg)) def _check_rpcs_in_flight(rpc_type, num_rpcs, threshold, threshold_fraction): From 16d6b8044ff52ee2ef1dee496c1e455dcad730e7 Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Tue, 10 Nov 2020 16:25:24 -0800 Subject: [PATCH 18/18] Fix lint, formatting. --- tools/run_tests/run_xds_tests.py | 103 +++++++++++++------------------ 1 file changed, 44 insertions(+), 59 deletions(-) diff --git a/tools/run_tests/run_xds_tests.py b/tools/run_tests/run_xds_tests.py index faa73a019e7..0c57b06292b 100755 --- a/tools/run_tests/run_xds_tests.py +++ b/tools/run_tests/run_xds_tests.py @@ -328,12 +328,10 @@ def get_client_accumulated_stats(): request = messages_pb2.LoadBalancerAccumulatedStatsRequest() logger.debug('Invoking GetClientAccumulatedStats RPC to %s:%d:', host, args.stats_port) - response = stub.GetClientAccumulatedStats(request, - wait_for_ready=True, - timeout=_CONNECTION_TIMEOUT_SEC) + response = stub.GetClientAccumulatedStats( + request, wait_for_ready=True, timeout=_CONNECTION_TIMEOUT_SEC) logger.debug('Invoked GetClientAccumulatedStats RPC to %s: %s', - host, - response) + host, response) return response @@ -353,14 +351,14 @@ def configure_client(rpc_types, metadata): md.type = rpc_type md.key = md_key md.value = md_value - logger.debug('Invoking XdsUpdateClientConfigureService RPC to %s:%d: %s', - host, - args.stats_port, - request) + logger.debug( + 'Invoking XdsUpdateClientConfigureService RPC to %s:%d: %s', + host, args.stats_port, request) stub.Configure(request, wait_for_ready=True, timeout=_CONNECTION_TIMEOUT_SEC) - logger.debug('Invoked XdsUpdateClientConfigureService RPC to %s', host) + logger.debug('Invoked XdsUpdateClientConfigureService RPC to %s', + host) class RpcDistributionError(Exception): @@ -426,12 +424,11 @@ def wait_until_rpcs_in_flight(rpc_type, timeout_sec, num_rpcs, threshold): threshold_fraction = threshold / 100.0 start_time = time.time() error_msg = None - logger.debug('Waiting for %d sec until %d %s RPCs (with %d%% tolerance) in-flight' - % (timeout_sec, num_rpcs, rpc_type, threshold)) + logger.debug( + 'Waiting for %d sec until %d %s RPCs (with %d%% tolerance) in-flight' % + (timeout_sec, num_rpcs, rpc_type, threshold)) while time.time() - start_time <= timeout_sec: - error_msg = _check_rpcs_in_flight(rpc_type, - num_rpcs, - threshold, + error_msg = _check_rpcs_in_flight(rpc_type, num_rpcs, threshold, threshold_fraction) if error_msg: time.sleep(2) @@ -440,13 +437,11 @@ def wait_until_rpcs_in_flight(rpc_type, timeout_sec, num_rpcs, threshold): # Ensure the number of outstanding RPCs is stable. if not error_msg: time.sleep(5) - error_msg = _check_rpcs_in_flight(rpc_type, - num_rpcs, - threshold, + error_msg = _check_rpcs_in_flight(rpc_type, num_rpcs, threshold, threshold_fraction) if error_msg: - raise Exception("Wrong number of %s RPCs in-flight: %s" - % (rpc_type, error_msg)) + raise Exception("Wrong number of %s RPCs in-flight: %s" % + (rpc_type, error_msg)) def _check_rpcs_in_flight(rpc_type, num_rpcs, threshold, threshold_fraction): @@ -1082,41 +1077,35 @@ def test_header_matching(gcp, original_backend_service, instance_group, patch_backend_service(gcp, alternate_backend_service, []) -def test_circuit_breaking(gcp, - original_backend_service, - instance_group, - alternate_backend_service, - same_zone_instance_group): +def test_circuit_breaking(gcp, original_backend_service, instance_group, + alternate_backend_service, same_zone_instance_group): logger.info('Running test_circuit_breaking') # The config validation for proxyless doesn't allow setting - # circuit_breakers. Disable validate validate_for_proxyless + # circuit_breakers. Disable validate validate_for_proxyless # for this test. This can be removed when validation # accepts circuit_breakers. logger.info('disabling validate_for_proxyless in target proxy') set_validate_for_proxyless(gcp, False) original_backend_service_max_requests = 500 alternate_backend_service_max_requests = 1000 - patch_backend_service(gcp, - original_backend_service, - [instance_group], - circuit_breakers={ - 'maxRequests': - original_backend_service_max_requests - }) + patch_backend_service( + gcp, + original_backend_service, [instance_group], + circuit_breakers={'maxRequests': original_backend_service_max_requests}) logger.info('Waiting for original backends to become healthy') wait_for_healthy_backends(gcp, original_backend_service, instance_group) patch_backend_service(gcp, - alternate_backend_service, - [same_zone_instance_group], + alternate_backend_service, [same_zone_instance_group], circuit_breakers={ 'maxRequests': - alternate_backend_service_max_requests + alternate_backend_service_max_requests }) logger.info('Waiting for alternate to become healthy') wait_for_healthy_backends(gcp, alternate_backend_service, same_zone_instance_group) original_backend_instances = get_instance_names(gcp, instance_group) - alternate_backend_instances = get_instance_names(gcp,same_zone_instance_group) + alternate_backend_instances = get_instance_names(gcp, + same_zone_instance_group) route_rules = [ { 'priority': 0, @@ -1137,10 +1126,10 @@ def test_circuit_breaking(gcp, ] try: # Make client send UNARY_CALL and EMPTY_CALL. - configure_client( - [messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, - messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL], - []) + configure_client([ + messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, + messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL + ], []) logger.info('Patching url map with %s', route_rules) patch_url_map_backend_service(gcp, original_backend_service, @@ -1151,40 +1140,37 @@ def test_circuit_breaking(gcp, _WAIT_FOR_STATS_SEC) # Make all calls keep-open. - configure_client( - [messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, - messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL], - [(messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, - 'rpc-behavior', 'keep-open'), - (messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL, - 'rpc-behavior', 'keep-open')]) + configure_client([ + messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, + messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL + ], [(messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, + 'rpc-behavior', 'keep-open'), + (messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL, + 'rpc-behavior', 'keep-open')]) wait_until_rpcs_in_flight( 'UNARY_CALL', (_WAIT_FOR_BACKEND_SEC + int(original_backend_service_max_requests / args.qps)), - original_backend_service_max_requests, - 1) + original_backend_service_max_requests, 1) wait_until_rpcs_in_flight( 'EMPTY_CALL', (_WAIT_FOR_BACKEND_SEC + int(alternate_backend_service_max_requests / args.qps)), - alternate_backend_service_max_requests, - 1) + alternate_backend_service_max_requests, 1) # Increment circuit breakers max_requests threshold. original_backend_service_max_requests = 800 patch_backend_service(gcp, - original_backend_service, - [instance_group], + original_backend_service, [instance_group], circuit_breakers={ 'maxRequests': - original_backend_service_max_requests}) + original_backend_service_max_requests + }) wait_until_rpcs_in_flight( 'UNARY_CALL', (_WAIT_FOR_BACKEND_SEC + int(original_backend_service_max_requests / args.qps)), - original_backend_service_max_requests, - 1) + original_backend_service_max_requests, 1) finally: patch_url_map_backend_service(gcp, original_backend_service) patch_backend_service(gcp, original_backend_service, [instance_group]) @@ -2103,8 +2089,7 @@ try: alternate_backend_service, same_zone_instance_group) elif test_case == 'circuit_breaking': - test_circuit_breaking(gcp, backend_service, - instance_group, + test_circuit_breaking(gcp, backend_service, instance_group, alternate_backend_service, same_zone_instance_group) else: