diff --git a/doc/xds-test-descriptions.md b/doc/xds-test-descriptions.md index 8365f16da02..b64431fe029 100644 --- a/doc/xds-test-descriptions.md +++ b/doc/xds-test-descriptions.md @@ -42,6 +42,47 @@ Clients should accept these arguments: * --rpc_timeout_sec=SEC * The timeout to set on all outbound RPCs. Default is 20. +### XdsUpdateClientConfigureService + +The xDS test client's behavior can be dynamically changed in the middle of tests. +This is achieved by invoking the `XdsUpdateClientConfigureService` gRPC service +on the test client. This can be useful for tests requiring special client behaviors +that are not desirable at test initialization and client warmup. The service is +defined as: + +``` +message ClientConfigureRequest { + // Type of RPCs to send. + enum RpcType { + EMPTY_CALL = 0; + UNARY_CALL = 1; + } + + // Metadata to be attached for the given type of RPCs. + message Metadata { + RpcType type = 1; + string key = 2; + string value = 3; + } + + // The types of RPCs the client sends. + repeated RpcType types = 1; + // The collection of custom metadata to be attached to RPCs sent by the client. + repeated Metadata metadata = 2; +} + +message ClientConfigureResponse {} + +service XdsUpdateClientConfigureService { + // Update the tes client's configuration. + rpc Configure(ClientConfigureRequest) returns (ClientConfigureResponse); +} +``` + +The test client changes its behavior right after receiving the +`ClientConfigureRequest`. Currently it only supports configuring the type(s) +of RPCs sent by the test client and metadata attached to each type of RPCs. + ## Test Driver Note that, unlike our other interop tests, neither the client nor the server has @@ -70,10 +111,24 @@ message LoadBalancerStatsResponse { int32 num_failures = 2; } +message LoadBalancerAccumulatedStatsRequest {} + +message LoadBalancerAccumulatedStatsResponse { + // The total number of RPCs have ever issued for each type. 
+  map<string, int32> num_rpcs_started_by_method = 1; + // The total number of RPCs that have ever completed successfully for each type. + map<string, int32> num_rpcs_succeeded_by_method = 2; + // The total number of RPCs that have ever failed for each type. + map<string, int32> num_rpcs_failed_by_method = 3;
\ No newline at end of file diff --git a/tools/run_tests/run_xds_tests.py b/tools/run_tests/run_xds_tests.py index 3a140939133..0c57b06292b 100755 --- a/tools/run_tests/run_xds_tests.py +++ b/tools/run_tests/run_xds_tests.py @@ -62,7 +62,11 @@ _TEST_CASES = [ # aren't enabled automatically for all languages. # # TODO: Move them into _TEST_CASES when support is ready in all languages. -_ADDITIONAL_TEST_CASES = ['path_matching', 'header_matching'] +_ADDITIONAL_TEST_CASES = [ + 'path_matching', + 'header_matching', + 'circuit_breaking', +] def parse_test_cases(arg): @@ -312,6 +316,51 @@ def get_client_stats(num_rpcs, timeout_sec): return response +def get_client_accumulated_stats(): + if CLIENT_HOSTS: + hosts = CLIENT_HOSTS + else: + hosts = ['localhost'] + for host in hosts: + with grpc.insecure_channel('%s:%d' % + (host, args.stats_port)) as channel: + stub = test_pb2_grpc.LoadBalancerStatsServiceStub(channel) + request = messages_pb2.LoadBalancerAccumulatedStatsRequest() + logger.debug('Invoking GetClientAccumulatedStats RPC to %s:%d:', + host, args.stats_port) + response = stub.GetClientAccumulatedStats( + request, wait_for_ready=True, timeout=_CONNECTION_TIMEOUT_SEC) + logger.debug('Invoked GetClientAccumulatedStats RPC to %s: %s', + host, response) + return response + + +def configure_client(rpc_types, metadata): + if CLIENT_HOSTS: + hosts = CLIENT_HOSTS + else: + hosts = ['localhost'] + for host in hosts: + with grpc.insecure_channel('%s:%d' % + (host, args.stats_port)) as channel: + stub = test_pb2_grpc.XdsUpdateClientConfigureServiceStub(channel) + request = messages_pb2.ClientConfigureRequest() + request.types.extend(rpc_types) + for rpc_type, md_key, md_value in metadata: + md = request.metadata.add() + md.type = rpc_type + md.key = md_key + md.value = md_value + logger.debug( + 'Invoking XdsUpdateClientConfigureService RPC to %s:%d: %s', + host, args.stats_port, request) + stub.Configure(request, + wait_for_ready=True, + timeout=_CONNECTION_TIMEOUT_SEC) + 
logger.debug('Invoked XdsUpdateClientConfigureService RPC to %s', + host) + + class RpcDistributionError(Exception): pass @@ -357,6 +406,60 @@ def wait_until_all_rpcs_go_to_given_backends(backends, allow_failures=False) +def wait_until_rpcs_in_flight(rpc_type, timeout_sec, num_rpcs, threshold): + '''Block until the test client reaches the state with the given number + of RPCs being outstanding stably. + + Args: + rpc_type: A string indicating the RPC method to check for. Either + 'UnaryCall' or 'EmptyCall'. + timeout_sec: Maximum number of seconds to wait until the desired state + is reached. + num_rpcs: Expected number of RPCs to be in-flight. + threshold: Number within [0,100], the tolerable percentage by which + the actual number of RPCs in-flight can differ from the expected number. + ''' + if threshold < 0 or threshold > 100: + raise ValueError('Value error: Threshold should be between 0 to 100') + threshold_fraction = threshold / 100.0 + start_time = time.time() + error_msg = None + logger.debug( + 'Waiting for %d sec until %d %s RPCs (with %d%% tolerance) in-flight' % + (timeout_sec, num_rpcs, rpc_type, threshold)) + while time.time() - start_time <= timeout_sec: + error_msg = _check_rpcs_in_flight(rpc_type, num_rpcs, threshold, + threshold_fraction) + if error_msg: + time.sleep(2) + else: + break + # Ensure the number of outstanding RPCs is stable. 
+ if not error_msg: + time.sleep(5) + error_msg = _check_rpcs_in_flight(rpc_type, num_rpcs, threshold, + threshold_fraction) + if error_msg: + raise Exception("Wrong number of %s RPCs in-flight: %s" % + (rpc_type, error_msg)) + + +def _check_rpcs_in_flight(rpc_type, num_rpcs, threshold, threshold_fraction): + error_msg = None + stats = get_client_accumulated_stats() + rpcs_started = stats.num_rpcs_started_by_method[rpc_type] + rpcs_succeeded = stats.num_rpcs_succeeded_by_method[rpc_type] + rpcs_failed = stats.num_rpcs_failed_by_method[rpc_type] + rpcs_in_flight = rpcs_started - rpcs_succeeded - rpcs_failed + if rpcs_in_flight < (num_rpcs * (1 - threshold_fraction)): + error_msg = ('actual(%d) < expected(%d - %d%%)' % + (rpcs_in_flight, num_rpcs, threshold)) + elif rpcs_in_flight > (num_rpcs * (1 + threshold_fraction)): + error_msg = ('actual(%d) > expected(%d + %d%%)' % + (rpcs_in_flight, num_rpcs, threshold)) + return error_msg + + def compare_distributions(actual_distribution, expected_distribution, threshold): """Compare if two distributions are similar. 
@@ -442,8 +545,8 @@ def test_change_backend_service(gcp, original_backend_service, instance_group, original_backend_instances = get_instance_names(gcp, instance_group) alternate_backend_instances = get_instance_names(gcp, same_zone_instance_group) - patch_backend_instances(gcp, alternate_backend_service, - [same_zone_instance_group]) + patch_backend_service(gcp, alternate_backend_service, + [same_zone_instance_group]) wait_for_healthy_backends(gcp, original_backend_service, instance_group) wait_for_healthy_backends(gcp, alternate_backend_service, same_zone_instance_group) @@ -455,7 +558,7 @@ def test_change_backend_service(gcp, original_backend_service, instance_group, _WAIT_FOR_URL_MAP_PATCH_SEC) finally: patch_url_map_backend_service(gcp, original_backend_service) - patch_backend_instances(gcp, alternate_backend_service, []) + patch_backend_service(gcp, alternate_backend_service, []) def test_gentle_failover(gcp, @@ -470,7 +573,7 @@ def test_gentle_failover(gcp, if num_primary_instances < min_instances_for_gentle_failover: resize_instance_group(gcp, primary_instance_group, min_instances_for_gentle_failover) - patch_backend_instances( + patch_backend_service( gcp, backend_service, [primary_instance_group, secondary_instance_group]) primary_instance_names = get_instance_names(gcp, primary_instance_group) @@ -506,7 +609,7 @@ def test_gentle_failover(gcp, else: raise e finally: - patch_backend_instances(gcp, backend_service, [primary_instance_group]) + patch_backend_service(gcp, backend_service, [primary_instance_group]) resize_instance_group(gcp, primary_instance_group, num_primary_instances) instance_names = get_instance_names(gcp, primary_instance_group) @@ -526,10 +629,10 @@ def test_remove_instance_group(gcp, backend_service, instance_group, same_zone_instance_group): logger.info('Running test_remove_instance_group') try: - patch_backend_instances(gcp, - backend_service, - [instance_group, same_zone_instance_group], - balancing_mode='RATE') + 
patch_backend_service(gcp, + backend_service, + [instance_group, same_zone_instance_group], + balancing_mode='RATE') wait_for_healthy_backends(gcp, backend_service, instance_group) wait_for_healthy_backends(gcp, backend_service, same_zone_instance_group) @@ -556,13 +659,13 @@ def test_remove_instance_group(gcp, backend_service, instance_group, same_zone_instance_names, _WAIT_FOR_STATS_SEC) remaining_instance_group = instance_group remaining_instance_names = instance_names - patch_backend_instances(gcp, - backend_service, [remaining_instance_group], - balancing_mode='RATE') + patch_backend_service(gcp, + backend_service, [remaining_instance_group], + balancing_mode='RATE') wait_until_all_rpcs_go_to_given_backends(remaining_instance_names, _WAIT_FOR_BACKEND_SEC) finally: - patch_backend_instances(gcp, backend_service, [instance_group]) + patch_backend_service(gcp, backend_service, [instance_group]) wait_until_all_rpcs_go_to_given_backends(instance_names, _WAIT_FOR_BACKEND_SEC) @@ -609,7 +712,7 @@ def test_secondary_locality_gets_no_requests_on_partial_primary_failure( 'Running secondary_locality_gets_no_requests_on_partial_primary_failure' ) try: - patch_backend_instances( + patch_backend_service( gcp, backend_service, [primary_instance_group, secondary_instance_group]) wait_for_healthy_backends(gcp, backend_service, primary_instance_group) @@ -643,7 +746,7 @@ def test_secondary_locality_gets_no_requests_on_partial_primary_failure( else: raise e finally: - patch_backend_instances(gcp, backend_service, [primary_instance_group]) + patch_backend_service(gcp, backend_service, [primary_instance_group]) def test_secondary_locality_gets_requests_on_primary_failure( @@ -654,7 +757,7 @@ def test_secondary_locality_gets_requests_on_primary_failure( swapped_primary_and_secondary=False): logger.info('Running secondary_locality_gets_requests_on_primary_failure') try: - patch_backend_instances( + patch_backend_service( gcp, backend_service, [primary_instance_group, 
secondary_instance_group]) wait_for_healthy_backends(gcp, backend_service, primary_instance_group) @@ -688,7 +791,7 @@ def test_secondary_locality_gets_requests_on_primary_failure( else: raise e finally: - patch_backend_instances(gcp, backend_service, [primary_instance_group]) + patch_backend_service(gcp, backend_service, [primary_instance_group]) def prepare_services_for_urlmap_tests(gcp, original_backend_service, @@ -704,8 +807,8 @@ def prepare_services_for_urlmap_tests(gcp, original_backend_service, logger.info('waiting for original backends to become healthy') wait_for_healthy_backends(gcp, original_backend_service, instance_group) - patch_backend_instances(gcp, alternate_backend_service, - [same_zone_instance_group]) + patch_backend_service(gcp, alternate_backend_service, + [same_zone_instance_group]) logger.info('waiting for alternate to become healthy') wait_for_healthy_backends(gcp, alternate_backend_service, same_zone_instance_group) @@ -794,7 +897,7 @@ def test_traffic_splitting(gcp, original_backend_service, instance_group, break finally: patch_url_map_backend_service(gcp, original_backend_service) - patch_backend_instances(gcp, alternate_backend_service, []) + patch_backend_service(gcp, alternate_backend_service, []) def test_path_matching(gcp, original_backend_service, instance_group, @@ -901,7 +1004,7 @@ def test_path_matching(gcp, original_backend_service, instance_group, break finally: patch_url_map_backend_service(gcp, original_backend_service) - patch_backend_instances(gcp, alternate_backend_service, []) + patch_backend_service(gcp, alternate_backend_service, []) def test_header_matching(gcp, original_backend_service, instance_group, @@ -971,7 +1074,123 @@ def test_header_matching(gcp, original_backend_service, instance_group, break finally: patch_url_map_backend_service(gcp, original_backend_service) - patch_backend_instances(gcp, alternate_backend_service, []) + patch_backend_service(gcp, alternate_backend_service, []) + + +def 
test_circuit_breaking(gcp, original_backend_service, instance_group, + alternate_backend_service, same_zone_instance_group): + logger.info('Running test_circuit_breaking') + # The config validation for proxyless doesn't allow setting + # circuit_breakers. Disable validate validate_for_proxyless + # for this test. This can be removed when validation + # accepts circuit_breakers. + logger.info('disabling validate_for_proxyless in target proxy') + set_validate_for_proxyless(gcp, False) + original_backend_service_max_requests = 500 + alternate_backend_service_max_requests = 1000 + patch_backend_service( + gcp, + original_backend_service, [instance_group], + circuit_breakers={'maxRequests': original_backend_service_max_requests}) + logger.info('Waiting for original backends to become healthy') + wait_for_healthy_backends(gcp, original_backend_service, instance_group) + patch_backend_service(gcp, + alternate_backend_service, [same_zone_instance_group], + circuit_breakers={ + 'maxRequests': + alternate_backend_service_max_requests + }) + logger.info('Waiting for alternate to become healthy') + wait_for_healthy_backends(gcp, alternate_backend_service, + same_zone_instance_group) + original_backend_instances = get_instance_names(gcp, instance_group) + alternate_backend_instances = get_instance_names(gcp, + same_zone_instance_group) + route_rules = [ + { + 'priority': 0, + # UnaryCall -> original_backend_service + 'matchRules': [{ + 'fullPathMatch': '/grpc.testing.TestService/UnaryCall' + }], + 'service': original_backend_service.url + }, + { + 'priority': 1, + # EmptyCall -> alternate_backend_service + 'matchRules': [{ + 'fullPathMatch': '/grpc.testing.TestService/EmptyCall' + }], + 'service': alternate_backend_service.url + }, + ] + try: + # Make client send UNARY_CALL and EMPTY_CALL. 
+ configure_client([ + messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, + messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL + ], []) + logger.info('Patching url map with %s', route_rules) + patch_url_map_backend_service(gcp, + original_backend_service, + route_rules=route_rules) + logger.info('Waiting for traffic to go to all backends') + wait_until_all_rpcs_go_to_given_backends( + original_backend_instances + alternate_backend_instances, + _WAIT_FOR_STATS_SEC) + + # Make all calls keep-open. + configure_client([ + messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, + messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL + ], [(messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, + 'rpc-behavior', 'keep-open'), + (messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL, + 'rpc-behavior', 'keep-open')]) + wait_until_rpcs_in_flight( + 'UNARY_CALL', + (_WAIT_FOR_BACKEND_SEC + + int(original_backend_service_max_requests / args.qps)), + original_backend_service_max_requests, 1) + wait_until_rpcs_in_flight( + 'EMPTY_CALL', + (_WAIT_FOR_BACKEND_SEC + + int(alternate_backend_service_max_requests / args.qps)), + alternate_backend_service_max_requests, 1) + + # Increment circuit breakers max_requests threshold. 
+ original_backend_service_max_requests = 800 + patch_backend_service(gcp, + original_backend_service, [instance_group], + circuit_breakers={ + 'maxRequests': + original_backend_service_max_requests + }) + wait_until_rpcs_in_flight( + 'UNARY_CALL', + (_WAIT_FOR_BACKEND_SEC + + int(original_backend_service_max_requests / args.qps)), + original_backend_service_max_requests, 1) + finally: + patch_url_map_backend_service(gcp, original_backend_service) + patch_backend_service(gcp, original_backend_service, [instance_group]) + patch_backend_service(gcp, alternate_backend_service, []) + set_validate_for_proxyless(gcp, True) + + +def set_validate_for_proxyless(gcp, validate_for_proxyless): + if not gcp.alpha_compute: + logger.debug( + 'Not setting validateForProxy because alpha is not enabled') + return + # This function deletes global_forwarding_rule and target_proxy, then + # recreate target_proxy with validateForProxyless=False. This is necessary + # because patching target_grpc_proxy isn't supported. 
+ delete_global_forwarding_rule(gcp) + delete_target_proxy(gcp) + create_target_proxy(gcp, gcp.target_proxy.name, validate_for_proxyless) + create_global_forwarding_rule(gcp, gcp.global_forwarding_rule.name, + [gcp.service_port]) def get_serving_status(instance, service_port): @@ -1202,12 +1421,12 @@ def patch_url_map_host_rule_with_port(gcp, name, backend_service, host_name): wait_for_global_operation(gcp, result['name']) -def create_target_proxy(gcp, name): +def create_target_proxy(gcp, name, validate_for_proxyless=True): if gcp.alpha_compute: config = { 'name': name, 'url_map': gcp.url_map.url, - 'validate_for_proxyless': True, + 'validate_for_proxyless': validate_for_proxyless } logger.debug('Sending GCP request with body=%s', config) result = gcp.alpha_compute.targetGrpcProxies().insert( @@ -1415,10 +1634,11 @@ def delete_instance_template(gcp): logger.info('Delete failed: %s', http_error) -def patch_backend_instances(gcp, - backend_service, - instance_groups, - balancing_mode='UTILIZATION'): +def patch_backend_service(gcp, + backend_service, + instance_groups, + balancing_mode='UTILIZATION', + circuit_breakers=None): if gcp.alpha_compute: compute_to_use = gcp.alpha_compute else: @@ -1429,6 +1649,7 @@ def patch_backend_instances(gcp, 'balancingMode': balancing_mode, 'maxRate': 1 if balancing_mode == 'RATE' else None } for instance_group in instance_groups], + 'circuitBreakers': circuit_breakers, } logger.debug('Sending GCP request with body=%s', config) result = compute_to_use.backendServices().patch( @@ -1742,7 +1963,7 @@ try: startup_script) instance_group = add_instance_group(gcp, args.zone, instance_group_name, _INSTANCE_GROUP_SIZE) - patch_backend_instances(gcp, backend_service, [instance_group]) + patch_backend_service(gcp, backend_service, [instance_group]) same_zone_instance_group = add_instance_group( gcp, args.zone, same_zone_instance_group_name, _INSTANCE_GROUP_SIZE) secondary_zone_instance_group = add_instance_group( @@ -1867,6 +2088,10 @@ try: 
test_header_matching(gcp, backend_service, instance_group, alternate_backend_service, same_zone_instance_group) + elif test_case == 'circuit_breaking': + test_circuit_breaking(gcp, backend_service, instance_group, + alternate_backend_service, + same_zone_instance_group) else: logger.error('Unknown test case: %s', test_case) sys.exit(1)