From 1c2f57ae1dd54ee94cc3b53c9683b7b392f645a9 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Wed, 8 Jul 2020 15:11:16 -0700 Subject: [PATCH] xds interop: add routing path matching to framework - prefix and full path - header matching - print client cmd to run - new test cases are not included in all --- tools/run_tests/run_xds_tests.py | 289 ++++++++++++++++++++++++++++--- 1 file changed, 263 insertions(+), 26 deletions(-) diff --git a/tools/run_tests/run_xds_tests.py b/tools/run_tests/run_xds_tests.py index 3073d301ded..5f6ffd431f0 100755 --- a/tools/run_tests/run_xds_tests.py +++ b/tools/run_tests/run_xds_tests.py @@ -56,17 +56,28 @@ _TEST_CASES = [ 'secondary_locality_gets_requests_on_primary_failure', 'traffic_splitting', ] +# Valid test cases, but not in all. So the tests can only run manually, and +# aren't enabled automatically for all languages. +# +# TODO: Move them into _TEST_CASES when support is ready in all languages. +_ADDITIONAL_TEST_CASES = ['path_matching', 'header_matching'] def parse_test_cases(arg): - if arg == 'all': - return _TEST_CASES if arg == '': return [] - test_cases = arg.split(',') - if all([test_case in _TEST_CASES for test_case in test_cases]): - return test_cases - raise Exception('Failed to parse test cases %s' % arg) + arg_split = arg.split(',') + test_cases = set() + all_test_cases = _TEST_CASES + _ADDITIONAL_TEST_CASES + for arg in arg_split: + if arg == "all": + test_cases = test_cases.union(_TEST_CASES) + else: + test_cases = test_cases.union([arg]) + if not all([test_case in all_test_cases for test_case in test_cases]): + raise Exception('Failed to parse test cases %s' % arg) + # Perserve order. + return [x for x in all_test_cases if x in test_cases] def parse_port_range(port_arg): @@ -89,8 +100,10 @@ argp.add_argument( '--test_case', default='ping_pong', type=parse_test_cases, - help='Comma-separated list of test cases to run, or \'all\' to run every ' - 'test. 
Available tests: %s' % ' '.join(_TEST_CASES)) + help='Comma-separated list of test cases to run. Available tests: %s, ' + '(or \'all\' to run every test). ' + 'Alternative tests not included in \'all\': %s' % + (','.join(_TEST_CASES), ','.join(_ADDITIONAL_TEST_CASES))) argp.add_argument( '--bootstrap_file', default='', @@ -237,6 +250,12 @@ _BOOTSTRAP_TEMPLATE = """ _TESTS_TO_FAIL_ON_RPC_FAILURE = [ 'new_instance_group_receives_traffic', 'ping_pong', 'round_robin' ] +# Tests that run UnaryCall and EmptyCall. +_TESTS_TO_RUN_MULTIPLE_RPCS = ['path_matching', 'header_matching'] +# Tests that make UnaryCall with test metadata. +_TESTS_TO_SEND_METADATA = ['header_matching'] +_TEST_METADATA_KEY = 'xds_md' +_TEST_METADATA_VALUE = 'exact_match' _PATH_MATCHER_NAME = 'path-matcher' _BASE_TEMPLATE_NAME = 'test-template' _BASE_INSTANCE_GROUP_NAME = 'test-ig' @@ -348,6 +367,29 @@ def compare_distributions(actual_distribution, expected_distribution, return True +def compare_expected_instances(stats, expected_instances): + """Compare if stats have expected instances for each type of RPC. + + Args: + stats: LoadBalancerStatsResponse reported by interop client. + expected_instances: a dict with key as the RPC type (string), value as + the expected backend instances (list of strings). + + Returns: + Returns true if the instances are expected. False if not. 
+ """ + for rpc_type, expected_peers in expected_instances.items(): + rpcs_by_peer_for_type = stats.rpcs_by_method[rpc_type] + rpcs_by_peer = rpcs_by_peer_for_type.rpcs_by_peer if rpcs_by_peer_for_type else None + logger.debug('rpc: %s, by_peer: %s', rpc_type, rpcs_by_peer) + peers = list(rpcs_by_peer.keys()) + if set(peers) != set(expected_peers): + logger.info('unexpected peers for %s, got %s, want %s', rpc_type, + peers, expected_peers) + return False + return True + + def test_backends_restart(gcp, backend_service, instance_group): logger.info('Running test_backends_restart') instance_names = get_instance_names(gcp, instance_group) @@ -629,19 +671,20 @@ def test_secondary_locality_gets_requests_on_primary_failure( patch_backend_instances(gcp, backend_service, [primary_instance_group]) -def test_traffic_splitting(gcp, original_backend_service, instance_group, - alternate_backend_service, same_zone_instance_group): - # This test start with all traffic going to original_backend_service. Then - # it updates URL-map to set default action to traffic splitting between - # original and alternate. It waits for all backends in both services to - # receive traffic, then verifies that weights are expected. - logger.info('Running test_traffic_splitting') +def prepare_services_for_urlmap_tests(gcp, original_backend_service, + instance_group, alternate_backend_service, + same_zone_instance_group): + ''' + This function prepares the services to be ready for tests that modifies + urlmaps. + Returns: + Returns original and alternate backend names as lists of strings. + ''' # The config validation for proxyless doesn't allow setting - # default_route_action. To test traffic splitting, we need to set the - # route action to weighted clusters. Disable validate - # validate_for_proxyless for this test. This can be removed when - # validation accepts default_route_action. + # default_route_action or route_rules. Disable validate + # validate_for_proxyless for this test. 
This can be removed when validation + # accepts default_route_action. logger.info('disabling validate_for_proxyless in target proxy') set_validate_for_proxyless(gcp, False) @@ -665,6 +708,20 @@ def test_traffic_splitting(gcp, original_backend_service, instance_group, logger.info('waiting for traffic to all go to original backends') wait_until_all_rpcs_go_to_given_backends(original_backend_instances, _WAIT_FOR_STATS_SEC) + return original_backend_instances, alternate_backend_instances + + +def test_traffic_splitting(gcp, original_backend_service, instance_group, + alternate_backend_service, same_zone_instance_group): + # This test start with all traffic going to original_backend_service. Then + # it updates URL-map to set default action to traffic splitting between + # original and alternate. It waits for all backends in both services to + # receive traffic, then verifies that weights are expected. + logger.info('Running test_traffic_splitting') + + original_backend_instances, alternate_backend_instances = prepare_services_for_urlmap_tests( + gcp, original_backend_service, instance_group, + alternate_backend_service, same_zone_instance_group) try: # Patch urlmap, change route action to traffic splitting between @@ -728,6 +785,157 @@ def test_traffic_splitting(gcp, original_backend_service, instance_group, set_validate_for_proxyless(gcp, True) +def test_path_matching(gcp, original_backend_service, instance_group, + alternate_backend_service, same_zone_instance_group): + # This test start with all traffic (UnaryCall and EmptyCall) going to + # original_backend_service. + # + # Then it updates URL-map to add routes, to make UnaryCall and EmptyCall to + # go different backends. It waits for all backends in both services to + # receive traffic, then verifies that traffic goes to the expected + # backends. 
+ logger.info('Running test_path_matching') + + original_backend_instances, alternate_backend_instances = prepare_services_for_urlmap_tests( + gcp, original_backend_service, instance_group, + alternate_backend_service, same_zone_instance_group) + + try: + # A list of tuples (route_rules, expected_instances). + test_cases = [ + ( + [{ + 'priority': 0, + # FullPath EmptyCall -> alternate_backend_service. + 'matchRules': [{ + 'fullPathMatch': '/grpc.testing.TestService/EmptyCall' + }], + 'service': alternate_backend_service.url + }], + { + "EmptyCall": alternate_backend_instances, + "UnaryCall": original_backend_instances + }), + ( + [{ + 'priority': 0, + # Prefix UnaryCall -> alternate_backend_service. + 'matchRules': [{ + 'prefixMatch': '/grpc.testing.TestService/Unary' + }], + 'service': alternate_backend_service.url + }], + { + "UnaryCall": alternate_backend_instances, + "EmptyCall": original_backend_instances + }) + ] + + for (route_rules, expected_instances) in test_cases: + logger.info('patching url map with %s -> alternative', + route_rules[0]['matchRules']) + patch_url_map_backend_service(gcp, + original_backend_service, + route_rules=route_rules) + + # Wait for traffic to go to both services. + logger.info( + 'waiting for traffic to go to all backends (including alternate)' + ) + wait_until_all_rpcs_go_to_given_backends( + original_backend_instances + alternate_backend_instances, + _WAIT_FOR_STATS_SEC) + + retry_count = 10 + # Each attempt takes about 10 seconds, 10 retries is equivalent to 100 + # seconds timeout. 
+            for i in range(retry_count):
+                stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
+                if not stats.rpcs_by_method:
+                    raise ValueError(
+                        'stats.rpcs_by_method is None, the interop client stats service does not support this test case'
+                    )
+                logger.info('attempt %d', i)
+                if compare_expected_instances(stats, expected_instances):
+                    logger.info("success")
+                    break
+    finally:
+        patch_url_map_backend_service(gcp, original_backend_service)
+        patch_backend_instances(gcp, alternate_backend_service, [])
+        set_validate_for_proxyless(gcp, True)
+
+
+def test_header_matching(gcp, original_backend_service, instance_group,
+                         alternate_backend_service, same_zone_instance_group):
+    # This test starts with all traffic (UnaryCall and EmptyCall) going to
+    # original_backend_service.
+    #
+    # Then it updates URL-map to add routes, to make RPCs with test headers to
+    # go to different backends. It waits for all backends in both services to
+    # receive traffic, then verifies that traffic goes to the expected
+    # backends.
+    logger.info('Running test_header_matching')
+
+    original_backend_instances, alternate_backend_instances = prepare_services_for_urlmap_tests(
+        gcp, original_backend_service, instance_group,
+        alternate_backend_service, same_zone_instance_group)
+
+    try:
+        # A list of tuples (route_rules, expected_instances).
+        test_cases = [(
+            [{
+                'priority': 0,
+                # Header ExactMatch -> alternate_backend_service.
+                # EmptyCall is sent with the metadata.
+ 'matchRules': [{ + 'prefixMatch': + '/', + 'headerMatches': [{ + 'headerName': _TEST_METADATA_KEY, + 'exactMatch': _TEST_METADATA_VALUE + }] + }], + 'service': alternate_backend_service.url + }], + { + "EmptyCall": alternate_backend_instances, + "UnaryCall": original_backend_instances + })] + + for (route_rules, expected_instances) in test_cases: + logger.info('patching url map with %s -> alternative', + route_rules[0]['matchRules']) + patch_url_map_backend_service(gcp, + original_backend_service, + route_rules=route_rules) + + # Wait for traffic to go to both services. + logger.info( + 'waiting for traffic to go to all backends (including alternate)' + ) + wait_until_all_rpcs_go_to_given_backends( + original_backend_instances + alternate_backend_instances, + _WAIT_FOR_STATS_SEC) + + retry_count = 10 + # Each attempt takes about 10 seconds, 10 retries is equivalent to 100 + # seconds timeout. + for i in range(retry_count): + stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC) + if not stats.rpcs_by_method: + raise ValueError( + 'stats.rpcs_by_method is None, the interop client stats service does not support this test case' + ) + logger.info('attempt %d', i) + if compare_expected_instances(stats, expected_instances): + logger.info("success") + break + finally: + patch_url_map_backend_service(gcp, original_backend_service) + patch_backend_instances(gcp, alternate_backend_service, []) + set_validate_for_proxyless(gcp, True) + + def set_serving_status(instances, service_port, serving): for instance in instances: with grpc.insecure_channel('%s:%d' % @@ -1208,7 +1416,8 @@ def resize_instance_group(gcp, def patch_url_map_backend_service(gcp, backend_service=None, - services_with_weights=None): + services_with_weights=None, + route_rules=None): '''change url_map's backend service Only one of backend_service and service_with_weights can be not None. 
@@ -1230,6 +1439,7 @@ def patch_url_map_backend_service(gcp, 'name': _PATH_MATCHER_NAME, 'defaultService': default_service, 'defaultRouteAction': default_route_action, + 'routeRules': route_rules, }] } logger.debug('Sending GCP request with body=%s', config) @@ -1504,22 +1714,41 @@ try: test_log_filename = os.path.join(log_dir, _SPONGE_LOG_NAME) test_log_file = open(test_log_filename, 'w+') client_process = None + + if test_case in _TESTS_TO_RUN_MULTIPLE_RPCS: + rpcs_to_send = '--rpc="UnaryCall,EmptyCall"' + else: + rpcs_to_send = '--rpc="UnaryCall"' + + if test_case in _TESTS_TO_SEND_METADATA: + metadata_to_send = '--metadata="EmptyCall:{key}:{value}"'.format( + key=_TEST_METADATA_KEY, value=_TEST_METADATA_VALUE) + else: + metadata_to_send = '--metadata=""' + if test_case in _TESTS_TO_FAIL_ON_RPC_FAILURE: wait_for_config_propagation( gcp, instance_group, args.client_cmd.format(server_uri=server_uri, stats_port=args.stats_port, qps=args.qps, - fail_on_failed_rpc=False), + fail_on_failed_rpc=False, + rpcs_to_send=rpcs_to_send, + metadata_to_send=metadata_to_send), client_env) fail_on_failed_rpc = '--fail_on_failed_rpc=true' else: fail_on_failed_rpc = '--fail_on_failed_rpc=false' - client_cmd = shlex.split( - args.client_cmd.format(server_uri=server_uri, - stats_port=args.stats_port, - qps=args.qps, - fail_on_failed_rpc=fail_on_failed_rpc)) + + client_cmd_formatted = args.client_cmd.format( + server_uri=server_uri, + stats_port=args.stats_port, + qps=args.qps, + fail_on_failed_rpc=fail_on_failed_rpc, + rpcs_to_send=rpcs_to_send, + metadata_to_send=metadata_to_send) + logger.debug('running client: %s', client_cmd_formatted) + client_cmd = shlex.split(client_cmd_formatted) try: client_process = subprocess.Popen(client_cmd, env=client_env, @@ -1559,6 +1788,14 @@ try: test_traffic_splitting(gcp, backend_service, instance_group, alternate_backend_service, same_zone_instance_group) + elif test_case == 'path_matching': + test_path_matching(gcp, backend_service, 
instance_group, + alternate_backend_service, + same_zone_instance_group) + elif test_case == 'header_matching': + test_header_matching(gcp, backend_service, instance_group, + alternate_backend_service, + same_zone_instance_group) else: logger.error('Unknown test case: %s', test_case) sys.exit(1)