From c7506e9b41ca9ed444ae8d7290f9b79309e9d3ac Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Sun, 8 Nov 2020 21:23:06 -0800 Subject: [PATCH] Implement advanced circuit breaking test --- tools/run_tests/run_xds_tests.py | 114 ++++++++++++++++++++++++++++--- 1 file changed, 103 insertions(+), 11 deletions(-) diff --git a/tools/run_tests/run_xds_tests.py b/tools/run_tests/run_xds_tests.py index f4459b2142e..be411f16006 100755 --- a/tools/run_tests/run_xds_tests.py +++ b/tools/run_tests/run_xds_tests.py @@ -404,7 +404,7 @@ def wait_until_all_rpcs_go_to_given_backends(backends, allow_failures=False) -def wait_until_rpcs_in_flight(timeout_sec, num_rpcs, threshold): +def wait_until_rpcs_in_flight(rpc_type, timeout_sec, num_rpcs, threshold): '''Block until the test client reaches the state with the given number of RPCs being outstanding stably. @@ -423,7 +423,10 @@ def wait_until_rpcs_in_flight(timeout_sec, num_rpcs, threshold): logger.debug('Waiting for %d sec until %d RPCs (with %d%% tolerance) in-flight' % (timeout_sec, num_rpcs, threshold)) while time.time() - start_time <= timeout_sec: - error_msg = _check_rpcs_in_flight(num_rpcs, threshold, threshold_fraction) + error_msg = _check_rpcs_in_flight(rpc_type, + num_rpcs, + threshold, + threshold_fraction) if error_msg: time.sleep(2) else: @@ -431,17 +434,21 @@ def wait_until_rpcs_in_flight(timeout_sec, num_rpcs, threshold): # Ensure the number of outstanding RPCs is stable. if not error_msg: time.sleep(5) - error_msg = _check_rpcs_in_flight(num_rpcs, threshold, threshold_fraction) + error_msg = _check_rpcs_in_flight(rpc_type, + num_rpcs, + threshold, + threshold_fraction) if error_msg: raise Exception(error_msg) -def _check_rpcs_in_flight(num_rpcs, threshold, threshold_fraction): +def _check_rpcs_in_flight(rpc_type, num_rpcs, threshold, threshold_fraction): error_msg = None stats = get_client_accumulated_stats() - rpcs_in_flight = (stats.num_rpcs_started - - stats.num_rpcs_succeeded - - stats.num_rpcs_failed) + rpcs_started = stats.num_rpcs_started_by_method[rpc_type] + rpcs_succeeded = stats.num_rpcs_succeeded_by_method[rpc_type] + rpcs_failed = stats.num_rpcs_failed_by_method[rpc_type] + rpcs_in_flight = rpcs_started - rpcs_succeeded - rpcs_failed if rpcs_in_flight < (num_rpcs * (1 - threshold_fraction)): error_msg = ('actual(%d) < expected(%d - %d%%)' % (rpcs_in_flight, num_rpcs, threshold)) @@ -1092,8 +1099,8 @@ def test_circuit_breaking(gcp, configure_client([messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL], [(messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, 'rpc-behavior', 'keep-open')]) - wait_until_rpcs_in_flight((_WAIT_FOR_BACKEND_SEC + - int(max_requests / args.qps)), + wait_until_rpcs_in_flight('UNARY_CALL', + _WAIT_FOR_BACKEND_SEC + int(max_requests / args.qps), max_requests, 1) # Increment circuit breakers max_requests threshold. @@ -1101,14 +1108,99 @@ def test_circuit_breaking(gcp, patch_backend_service(gcp, alternate_backend_service, [same_zone_instance_group], circuit_breakers={'maxRequests': max_requests}) - wait_until_rpcs_in_flight((_WAIT_FOR_BACKEND_SEC + - int(max_requests / args.qps)), + wait_until_rpcs_in_flight('UNARY_CALL', + _WAIT_FOR_BACKEND_SEC + int(max_requests / args.qps), max_requests, 1) finally: patch_url_map_backend_service(gcp, original_backend_service) patch_backend_service(gcp, alternate_backend_service, []) +def test_circuit_breaking_advanced(gcp, + original_backend_service, + instance_group, + alternate_backend_service, + same_zone_instance_group): + logger.info('Running test_circuit_breaking_advanced') + patch_backend_service(gcp, + original_backend_service, + [instance_group], + circuit_breakers={'maxRequests': 500}) + logger.info('Waiting for original backends to become healthy') + wait_for_healthy_backends(gcp, original_backend_service, instance_group) + patch_backend_service(gcp, + alternate_backend_service, + [same_zone_instance_group], + circuit_breakers={'maxRequests': 1000}) + logger.info('Waiting for alternate to become healthy') + wait_for_healthy_backends(gcp, alternate_backend_service, + same_zone_instance_group) + original_backend_instances = get_instance_names(gcp, instance_group) + alternate_backend_instances = get_instance_names(gcp,same_zone_instance_group) + route_rules = [ + { + 'priority': 0, + # UnaryCall -> original_backend_service + 'matchRules': [{ + 'fullPathMatch': '/grpc.testing.TestService/UnaryCall' + }], + 'service': original_backend_service.url + }, + { + 'priority': 1, + # EmptyCall -> alternate_backend_service + 'matchRules': [{ + 'fullPathMatch': '/grpc.testing.TestService/EmptyCall' + }], + 'service': alternate_backend_service.url + }, + ] + try: + logger.info('Patching url map with %s', route_rules) + patch_url_map_backend_service(gcp, + original_backend_service, + route_rules=route_rules) + logger.info('Waiting for traffic to go to all backends') + wait_until_all_rpcs_go_to_given_backends( + original_backend_instances + alternate_backend_instances, + _WAIT_FOR_STATS_SEC) + + # Make all calls keep-open. + configure_client( + [messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, + messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL], + [(messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, + 'rpc-behavior', 'keep-open'), + (messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL, + 'rpc-behavior', 'keep-open')]) + wait_until_rpcs_in_flight( + 'UNARY_CALL', + _WAIT_FOR_BACKEND_SEC + int(500 / args.qps), + 500, + 1) + wait_until_rpcs_in_flight( + 'EMPTY_CALL', + _WAIT_FOR_BACKEND_SEC + int(1000 / args.qps), + 1000, + 1) + + # Increment circuit breakers max_requests threshold. + patch_backend_service(gcp, + original_backend_service, + [instance_group], + circuit_breakers={'maxRequests': 1000}) + wait_until_rpcs_in_flight( + 'UNARY_CALL', + _WAIT_FOR_BACKEND_SEC + int(1000 / args.qps), + 1000, + 1) + finally: + patch_url_map_backend_service(gcp, original_backend_service) + patch_backend_service(gcp, original_backend_service, [instance_group]) + patch_backend_service(gcp, alternate_backend_service, []) + + + def get_serving_status(instance, service_port): with grpc.insecure_channel('%s:%d' % (instance, service_port)) as channel: health_stub = health_pb2_grpc.HealthStub(channel)