From 66feb8c2d99f675ff876f67314cd8175c0e71b9e Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Thu, 5 Nov 2020 11:28:55 -0800 Subject: [PATCH] Implement the basic test case for limiting max number of concurrent RPCs. --- tools/run_tests/run_xds_tests.py | 183 +++++++++++++++++++++++++++---- 1 file changed, 164 insertions(+), 19 deletions(-) diff --git a/tools/run_tests/run_xds_tests.py b/tools/run_tests/run_xds_tests.py index 3a140939133..7bbec830e12 100755 --- a/tools/run_tests/run_xds_tests.py +++ b/tools/run_tests/run_xds_tests.py @@ -57,6 +57,7 @@ _TEST_CASES = [ 'secondary_locality_gets_no_requests_on_partial_primary_failure', 'secondary_locality_gets_requests_on_primary_failure', 'traffic_splitting', + 'circuit_breaking', ] # Valid test cases, but not in all. So the tests can only run manually, and # aren't enabled automatically for all languages. @@ -311,6 +312,63 @@ def get_client_stats(num_rpcs, timeout_sec): logger.debug('Invoked GetClientStats RPC to %s: %s', host, response) return response +def get_client_accumulated_stats(): + if CLIENT_HOSTS: + hosts = CLIENT_HOSTS + else: + hosts = ['localhost'] + for host in hosts: + with grpc.insecure_channel('%s:%d' % + (host, args.stats_port)) as channel: + stub = test_pb2_grpc.LoadBalancerStatsServiceStub(channel) + request = messages_pb2.LoadBalancerAccumulatedStatsRequest() + logger.debug('Invoking GetClientAccumulatedStats RPC to %s:%d:', + host, args.stats_port) + response = stub.GetClientAccumulatedStats(request, + wait_for_ready=True, + timeout=_CONNECTION_TIMEOUT_SEC) + logger.debug('Invoked GetClientAccumulatedStats RPC to %s: %s', + host, + response) + return response + +def configure_client(rpc_types, metadata): + if CLIENT_HOSTS: + hosts = CLIENT_HOSTS + else: + hosts = ['localhost'] + for host in hosts: + with grpc.insecure_channel('%s:%d' % + (host, args.stats_port)) as channel: + stub = test_pb2_grpc.XdsUpdateClientConfigureServiceStub(channel) + request = messages_pb2.ClientConfigureRequest() + for rpc_type in rpc_types: + if rpc_type not in ['empty_call', 'unary_call']: + continue + request.types.append( + messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL + if rpc_type == 'empty_call' + else messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL + ) + for rpc_type, md_key, md_value in metadata: + if rpc_type not in ['empty_call', 'unary_call']: + continue + md = request.metadata.add() + md.type = ( + messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL + if rpc_type == 'empty_call' + else messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL + ) + md.key = md_key + md.value = md_value + logger.debug('Invoking XdsUpdateClientConfigureService RPC to %s:%d: %s', + host, + args.stats_port, + request) + stub.Configure(request, + wait_for_ready=True, + timeout=_CONNECTION_TIMEOUT_SEC) + logger.debug('Invoked XdsUpdateClientConfigureService RPC to %s', host) class RpcDistributionError(Exception): pass @@ -356,6 +414,39 @@ def wait_until_all_rpcs_go_to_given_backends(backends, num_rpcs, allow_failures=False) +def wait_until_all_rpcs_fail(timeout_sec, num_rpcs): + start_time = time.time() + error_msg = None + logger.debug('Waiting for %d sec until all of next %d RPCs fail' % + (timeout_sec, num_rpcs)) + while time.time() - start_time <= timeout_sec: + error_msg = None + stats = get_client_stats(num_rpcs, timeout_sec) + diff = num_rpcs - stats.num_failures + if not diff : + error_msg = 'Unexpected completion for %d RPCs' % diff + time.sleep(2) + else: + return + raise RpcDistributionError(error_msg) + +def wait_until_rpcs_in_flight(timeout_sec, num_rpcs): + start_time = time.time() + error_msg = None + logger.debug('Waiting for %d sec until %d RPCs in-flight' % (timeout_sec, num_rpcs)) + while time.time() - start_time <= timeout_sec: + error_msg = None + stats = get_client_accumulated_stats() + rpcs_in_flight = (stats.num_rpcs_started + - stats.num_rpcs_succeeded + - stats.num_rpcs_failed) + if rpcs_in_flight < num_rpcs: + error_msg = ('Expected %d RPCs in-flight, actual: %d' % + (num_rpcs, rpcs_in_flight)) + time.sleep(2) + else: + return + raise RpcDistributionError(error_msg) def compare_distributions(actual_distribution, expected_distribution, threshold): @@ -442,7 +533,7 @@ def test_change_backend_service(gcp, original_backend_service, instance_group, original_backend_instances = get_instance_names(gcp, instance_group) alternate_backend_instances = get_instance_names(gcp, same_zone_instance_group) - patch_backend_instances(gcp, alternate_backend_service, + patch_backend_service(gcp, alternate_backend_service, [same_zone_instance_group]) wait_for_healthy_backends(gcp, original_backend_service, instance_group) wait_for_healthy_backends(gcp, alternate_backend_service, @@ -455,7 +546,7 @@ def test_change_backend_service(gcp, original_backend_service, instance_group, _WAIT_FOR_URL_MAP_PATCH_SEC) finally: patch_url_map_backend_service(gcp, original_backend_service) - patch_backend_instances(gcp, alternate_backend_service, []) + patch_backend_service(gcp, alternate_backend_service, []) def test_gentle_failover(gcp, @@ -470,7 +561,7 @@ def test_gentle_failover(gcp, if num_primary_instances < min_instances_for_gentle_failover: resize_instance_group(gcp, primary_instance_group, min_instances_for_gentle_failover) - patch_backend_instances( + patch_backend_service( gcp, backend_service, [primary_instance_group, secondary_instance_group]) primary_instance_names = get_instance_names(gcp, primary_instance_group) @@ -506,7 +597,7 @@ def test_gentle_failover(gcp, else: raise e finally: - patch_backend_instances(gcp, backend_service, [primary_instance_group]) + patch_backend_service(gcp, backend_service, [primary_instance_group]) resize_instance_group(gcp, primary_instance_group, num_primary_instances) instance_names = get_instance_names(gcp, primary_instance_group) @@ -526,7 +617,7 @@ def test_remove_instance_group(gcp, backend_service, instance_group, same_zone_instance_group): logger.info('Running test_remove_instance_group') try: - patch_backend_instances(gcp, + patch_backend_service(gcp, backend_service, [instance_group, same_zone_instance_group], balancing_mode='RATE') @@ -556,13 +647,13 @@ def test_remove_instance_group(gcp, backend_service, instance_group, same_zone_instance_names, _WAIT_FOR_STATS_SEC) remaining_instance_group = instance_group remaining_instance_names = instance_names - patch_backend_instances(gcp, + patch_backend_service(gcp, backend_service, [remaining_instance_group], balancing_mode='RATE') wait_until_all_rpcs_go_to_given_backends(remaining_instance_names, _WAIT_FOR_BACKEND_SEC) finally: - patch_backend_instances(gcp, backend_service, [instance_group]) + patch_backend_service(gcp, backend_service, [instance_group]) wait_until_all_rpcs_go_to_given_backends(instance_names, _WAIT_FOR_BACKEND_SEC) @@ -609,7 +700,7 @@ def test_secondary_locality_gets_no_requests_on_partial_primary_failure( 'Running secondary_locality_gets_no_requests_on_partial_primary_failure' ) try: - patch_backend_instances( + patch_backend_service( gcp, backend_service, [primary_instance_group, secondary_instance_group]) wait_for_healthy_backends(gcp, backend_service, primary_instance_group) @@ -643,7 +734,7 @@ def test_secondary_locality_gets_no_requests_on_partial_primary_failure( else: raise e finally: - patch_backend_instances(gcp, backend_service, [primary_instance_group]) + patch_backend_service(gcp, backend_service, [primary_instance_group]) def test_secondary_locality_gets_requests_on_primary_failure( @@ -654,7 +745,7 @@ def test_secondary_locality_gets_requests_on_primary_failure( swapped_primary_and_secondary=False): logger.info('Running secondary_locality_gets_requests_on_primary_failure') try: - patch_backend_instances( + patch_backend_service( gcp, backend_service, [primary_instance_group, secondary_instance_group]) wait_for_healthy_backends(gcp, backend_service, primary_instance_group) @@ -688,7 +779,7 @@ def test_secondary_locality_gets_requests_on_primary_failure( else: raise e finally: - patch_backend_instances(gcp, backend_service, [primary_instance_group]) + patch_backend_service(gcp, backend_service, [primary_instance_group]) def prepare_services_for_urlmap_tests(gcp, original_backend_service, @@ -704,7 +795,7 @@ def prepare_services_for_urlmap_tests(gcp, original_backend_service, logger.info('waiting for original backends to become healthy') wait_for_healthy_backends(gcp, original_backend_service, instance_group) - patch_backend_instances(gcp, alternate_backend_service, + patch_backend_service(gcp, alternate_backend_service, [same_zone_instance_group]) logger.info('waiting for alternate to become healthy') wait_for_healthy_backends(gcp, alternate_backend_service, @@ -794,7 +885,7 @@ def test_traffic_splitting(gcp, original_backend_service, instance_group, break finally: patch_url_map_backend_service(gcp, original_backend_service) - patch_backend_instances(gcp, alternate_backend_service, []) + patch_backend_service(gcp, alternate_backend_service, []) def test_path_matching(gcp, original_backend_service, instance_group, @@ -901,7 +992,7 @@ def test_path_matching(gcp, original_backend_service, instance_group, break finally: patch_url_map_backend_service(gcp, original_backend_service) - patch_backend_instances(gcp, alternate_backend_service, []) + patch_backend_service(gcp, alternate_backend_service, []) def test_header_matching(gcp, original_backend_service, instance_group, @@ -971,8 +1062,56 @@ def test_header_matching(gcp, original_backend_service, instance_group, break finally: patch_url_map_backend_service(gcp, original_backend_service) - patch_backend_instances(gcp, alternate_backend_service, []) + patch_backend_service(gcp, alternate_backend_service, []) + + +def test_circuit_breaking(gcp, + original_backend_service, + instance_group, + alternate_backend_service, + same_zone_instance_group): + logger.info('Running test_circuit_breaking') + max_requests = _NUM_TEST_RPCS + alternate_backend_instances = get_instance_names(gcp, + same_zone_instance_group) + try: + # Switch to a new backend_service configured with circuit breakers. + patch_backend_service(gcp, alternate_backend_service, + [same_zone_instance_group], + circuit_breakers={'maxRequests': max_requests}) + wait_for_healthy_backends(gcp, alternate_backend_service, + same_zone_instance_group) + patch_url_map_backend_service(gcp, alternate_backend_service) + wait_until_all_rpcs_go_to_given_backends(alternate_backend_instances, + _WAIT_FOR_URL_MAP_PATCH_SEC) + + # Make unary calls open. + configure_client(rpc_types=['unary_call'], + metadata=[('unary_call', 'rpc-behavior', 'keep-open')]) + wait_until_all_rpcs_fail(int(_WAIT_FOR_STATS_SEC + _NUM_TEST_RPCS / args.qps), + _NUM_TEST_RPCS) + _assert_rpcs_in_flight(max_requests) + + # Increment circuit breakers max_requests threshold. + max_requests = _NUM_TEST_RPCS * 2 + patch_backend_service(gcp, alternate_backend_service, + [same_zone_instance_group], + circuit_breakers={'maxRequests': max_requests}) + wait_until_rpcs_in_flight(int(_WAIT_FOR_STATS_SEC + max_requests / args.qps), + max_requests) + wait_until_all_rpcs_fail(int(_WAIT_FOR_STATS_SEC + _NUM_TEST_RPCS / args.qps), + _NUM_TEST_RPCS) + _assert_rpcs_in_flight(max_requests) + finally: + patch_url_map_backend_service(gcp, original_backend_service) + patch_backend_service(gcp, alternate_backend_service, []) +def _assert_rpcs_in_flight(num_rpcs): + stats = get_client_accumulated_stats() + rpcs_in_flight = (stats.num_rpcs_started + - stats.num_rpcs_succeeded + - stats.num_rpcs_failed) + compare_distributions([rpcs_in_flight], [num_rpcs], threshold=2) def get_serving_status(instance, service_port): with grpc.insecure_channel('%s:%d' % (instance, service_port)) as channel: @@ -1207,7 +1346,6 @@ def create_target_proxy(gcp, name): config = { 'name': name, 'url_map': gcp.url_map.url, - 'validate_for_proxyless': True, } logger.debug('Sending GCP request with body=%s', config) result = gcp.alpha_compute.targetGrpcProxies().insert( @@ -1415,10 +1553,11 @@ def delete_instance_template(gcp): logger.info('Delete failed: %s', http_error) -def patch_backend_instances(gcp, +def patch_backend_service(gcp, backend_service, instance_groups, - balancing_mode='UTILIZATION'): + balancing_mode='UTILIZATION', + circuit_breakers=None): if gcp.alpha_compute: compute_to_use = gcp.alpha_compute else: @@ -1429,6 +1568,7 @@ def patch_backend_instances(gcp, 'balancingMode': balancing_mode, 'maxRate': 1 if balancing_mode == 'RATE' else None } for instance_group in instance_groups], + 'circuitBreakers': circuit_breakers, } logger.debug('Sending GCP request with body=%s', config) result = compute_to_use.backendServices().patch( @@ -1742,7 +1882,7 @@ try: startup_script) instance_group = add_instance_group(gcp, args.zone, instance_group_name, _INSTANCE_GROUP_SIZE) - patch_backend_instances(gcp, backend_service, [instance_group]) + patch_backend_service(gcp, backend_service, [instance_group]) same_zone_instance_group = add_instance_group( gcp, args.zone, same_zone_instance_group_name, _INSTANCE_GROUP_SIZE) secondary_zone_instance_group = add_instance_group( @@ -1867,6 +2007,11 @@ try: test_header_matching(gcp, backend_service, instance_group, alternate_backend_service, same_zone_instance_group) + elif test_case == 'circuit_breaking': + test_circuit_breaking(gcp, backend_service, + instance_group, + alternate_backend_service, + same_zone_instance_group) else: logger.error('Unknown test case: %s', test_case) sys.exit(1)