diff --git a/tools/run_tests/run_xds_tests.py b/tools/run_tests/run_xds_tests.py index af83181f7a5..a043c65e117 100755 --- a/tools/run_tests/run_xds_tests.py +++ b/tools/run_tests/run_xds_tests.py @@ -84,6 +84,10 @@ _TEST_CASES = [ 'traffic_splitting', 'path_matching', 'header_matching', + 'api_listener', + 'forwarding_rule_port_match', + 'forwarding_rule_default_port', + 'metadata_filter', ] # Valid test cases, but not in all. So the tests can only run manually, and @@ -214,8 +218,9 @@ argp.add_argument( argp.add_argument('--network', default='global/networks/default', help='GCP network to use') +_DEFAULT_PORT_RANGE = '8080:8280' argp.add_argument('--service_port_range', - default='8080:8280', + default=_DEFAULT_PORT_RANGE, type=parse_port_range, help='Listening port for created gRPC backends. Specified as ' 'either a single int or as a range in the format min:max, in ' @@ -479,6 +484,21 @@ def wait_until_all_rpcs_go_to_given_backends(backends, allow_failures=False) +def wait_until_no_rpcs_go_to_given_backends(backends, timeout_sec): + start_time = time.time() + while time.time() - start_time <= timeout_sec: + stats = get_client_stats(_NUM_TEST_RPCS, timeout_sec) + error_msg = None + rpcs_by_peer = stats.rpcs_by_peer + for backend in backends: + if backend in rpcs_by_peer: + error_msg = 'Unexpected backend %s receives load' % backend + break + if not error_msg: + return + raise Exception('Unexpected RPCs going to given backends') + + def wait_until_rpcs_in_flight(rpc_type, timeout_sec, num_rpcs, threshold): '''Block until the test client reaches the state with the given number of RPCs being outstanding stably. @@ -951,6 +971,303 @@ def prepare_services_for_urlmap_tests(gcp, original_backend_service, return original_backend_instances, alternate_backend_instances +def test_metadata_filter(gcp, original_backend_service, instance_group, + alternate_backend_service, same_zone_instance_group): + logger.info("Running test_metadata_filter") + wait_for_healthy_backends(gcp, original_backend_service, instance_group) + original_backend_instances = get_instance_names(gcp, instance_group) + alternate_backend_instances = get_instance_names(gcp, + same_zone_instance_group) + patch_backend_service(gcp, alternate_backend_service, + [same_zone_instance_group]) + wait_for_healthy_backends(gcp, alternate_backend_service, + same_zone_instance_group) + try: + with open(bootstrap_path) as f: + md = json.load(f)['node']['metadata'] + match_labels = [] + for k, v in md.items(): + match_labels.append({'name': k, 'value': v}) + + not_match_labels = [{'name': 'fake', 'value': 'fail'}] + test_route_rules = [ + # test MATCH_ALL + [ + { + 'priority': 0, + 'matchRules': [{ + 'prefixMatch': + '/', + 'metadataFilters': [{ + 'filterMatchCriteria': 'MATCH_ALL', + 'filterLabels': not_match_labels + }] + }], + 'service': original_backend_service.url + }, + { + 'priority': 1, + 'matchRules': [{ + 'prefixMatch': + '/', + 'metadataFilters': [{ + 'filterMatchCriteria': 'MATCH_ALL', + 'filterLabels': match_labels + }] + }], + 'service': alternate_backend_service.url + }, + ], + # test mixing MATCH_ALL and MATCH_ANY + # test MATCH_ALL: super set labels won't match + [ + { + 'priority': 0, + 'matchRules': [{ + 'prefixMatch': + '/', + 'metadataFilters': [{ + 'filterMatchCriteria': 'MATCH_ALL', + 'filterLabels': not_match_labels + match_labels + }] + }], + 'service': original_backend_service.url + }, + { + 'priority': 1, + 'matchRules': [{ + 'prefixMatch': + '/', + 'metadataFilters': [{ + 'filterMatchCriteria': 'MATCH_ANY', + 'filterLabels': not_match_labels + match_labels + }] + }], + 'service': alternate_backend_service.url + }, + ], + # test MATCH_ANY + [ + { + 'priority': 0, + 'matchRules': [{ + 'prefixMatch': + '/', + 'metadataFilters': [{ + 'filterMatchCriteria': 'MATCH_ANY', + 'filterLabels': not_match_labels + }] + }], + 'service': original_backend_service.url + }, + { + 'priority': 1, + 'matchRules': [{ + 'prefixMatch': + '/', + 'metadataFilters': [{ + 'filterMatchCriteria': 'MATCH_ANY', + 'filterLabels': not_match_labels + match_labels + }] + }], + 'service': alternate_backend_service.url + }, + ], + # test match multiple route rules + [ + { + 'priority': 0, + 'matchRules': [{ + 'prefixMatch': + '/', + 'metadataFilters': [{ + 'filterMatchCriteria': 'MATCH_ANY', + 'filterLabels': match_labels + }] + }], + 'service': alternate_backend_service.url + }, + { + 'priority': 1, + 'matchRules': [{ + 'prefixMatch': + '/', + 'metadataFilters': [{ + 'filterMatchCriteria': 'MATCH_ALL', + 'filterLabels': match_labels + }] + }], + 'service': original_backend_service.url + }, + ] + ] + + for route_rules in test_route_rules: + wait_until_all_rpcs_go_to_given_backends(original_backend_instances, + _WAIT_FOR_STATS_SEC) + patch_url_map_backend_service(gcp, + original_backend_service, + route_rules=route_rules) + wait_until_no_rpcs_go_to_given_backends(original_backend_instances, + _WAIT_FOR_STATS_SEC) + wait_until_all_rpcs_go_to_given_backends( + alternate_backend_instances, _WAIT_FOR_STATS_SEC) + patch_url_map_backend_service(gcp, original_backend_service) + finally: + patch_backend_service(gcp, alternate_backend_service, []) + + +def test_api_listener(gcp, backend_service, instance_group, + alternate_backend_service): + logger.info("Running api_listener") + try: + wait_for_healthy_backends(gcp, backend_service, instance_group) + backend_instances = get_instance_names(gcp, instance_group) + wait_until_all_rpcs_go_to_given_backends(backend_instances, + _WAIT_FOR_STATS_SEC) + # create a second suite of map+tp+fr with the same host name in host rule + # and we have to disable proxyless validation because it needs `0.0.0.0` + # ip address in fr for proxyless and also we violate ip:port uniqueness + # for test purpose. See https://github.com/grpc/grpc-java/issues/8009 + new_config_suffix = '2' + create_url_map(gcp, url_map_name + new_config_suffix, backend_service, + service_host_name) + create_target_proxy(gcp, target_proxy_name + new_config_suffix, False) + if not gcp.service_port: + raise Exception( + 'Faied to find a valid port for the forwarding rule') + potential_ip_addresses = [] + max_attempts = 10 + for i in range(max_attempts): + potential_ip_addresses.append('10.10.10.%d' % + (random.randint(0, 255))) + create_global_forwarding_rule(gcp, + forwarding_rule_name + new_config_suffix, + [gcp.service_port], + potential_ip_addresses) + if gcp.service_port != _DEFAULT_SERVICE_PORT: + patch_url_map_host_rule_with_port(gcp, + url_map_name + new_config_suffix, + backend_service, + service_host_name) + wait_until_all_rpcs_go_to_given_backends(backend_instances, + _WAIT_FOR_STATS_SEC) + + delete_global_forwarding_rule(gcp, forwarding_rule_name) + delete_target_proxy(gcp, target_proxy_name) + delete_url_map(gcp, url_map_name) + verify_attempts = int(_WAIT_FOR_URL_MAP_PATCH_SEC / _NUM_TEST_RPCS * + args.qps) + for i in range(verify_attempts): + wait_until_all_rpcs_go_to_given_backends(backend_instances, + _WAIT_FOR_STATS_SEC) + # delete host rule for the original host name + patch_url_map_backend_service(gcp, alternate_backend_service) + wait_until_no_rpcs_go_to_given_backends(backend_instances, + _WAIT_FOR_STATS_SEC) + + finally: + delete_global_forwarding_rule(gcp, + forwarding_rule_name + new_config_suffix) + delete_target_proxy(gcp, target_proxy_name + new_config_suffix) + delete_url_map(gcp, url_map_name + new_config_suffix) + create_url_map(gcp, url_map_name, backend_service, service_host_name) + create_target_proxy(gcp, target_proxy_name) + create_global_forwarding_rule(gcp, forwarding_rule_name, + potential_service_ports) + if gcp.service_port != _DEFAULT_SERVICE_PORT: + patch_url_map_host_rule_with_port(gcp, url_map_name, + backend_service, + service_host_name) + server_uri = service_host_name + ':' + str(gcp.service_port) + else: + server_uri = service_host_name + return server_uri + + +def test_forwarding_rule_port_match(gcp, backend_service, instance_group): + logger.info("Running test_forwarding_rule_port_match") + try: + wait_for_healthy_backends(gcp, backend_service, instance_group) + backend_instances = get_instance_names(gcp, instance_group) + wait_until_all_rpcs_go_to_given_backends(backend_instances, + _WAIT_FOR_STATS_SEC) + delete_global_forwarding_rule(gcp) + create_global_forwarding_rule(gcp, forwarding_rule_name, [ + x for x in parse_port_range(_DEFAULT_PORT_RANGE) + if x != gcp.service_port + ]) + wait_until_no_rpcs_go_to_given_backends(backend_instances, + _WAIT_FOR_STATS_SEC) + finally: + delete_global_forwarding_rule(gcp) + create_global_forwarding_rule(gcp, forwarding_rule_name, + potential_service_ports) + if gcp.service_port != _DEFAULT_SERVICE_PORT: + patch_url_map_host_rule_with_port(gcp, url_map_name, + backend_service, + service_host_name) + server_uri = service_host_name + ':' + str(gcp.service_port) + else: + server_uri = service_host_name + return server_uri + + +def test_forwarding_rule_default_port(gcp, backend_service, instance_group): + logger.info("Running test_forwarding_rule_default_port") + try: + wait_for_healthy_backends(gcp, backend_service, instance_group) + backend_instances = get_instance_names(gcp, instance_group) + if gcp.service_port == _DEFAULT_SERVICE_PORT: + wait_until_all_rpcs_go_to_given_backends(backend_instances, + _WAIT_FOR_STATS_SEC) + delete_global_forwarding_rule(gcp) + create_global_forwarding_rule(gcp, forwarding_rule_name, + parse_port_range(_DEFAULT_PORT_RANGE)) + patch_url_map_host_rule_with_port(gcp, url_map_name, + backend_service, + service_host_name) + wait_until_no_rpcs_go_to_given_backends(backend_instances, + _WAIT_FOR_STATS_SEC) + # expect success when no port in client request service uri, and no port in url-map + delete_global_forwarding_rule(gcp) + delete_target_proxy(gcp) + delete_url_map(gcp) + create_url_map(gcp, url_map_name, backend_service, service_host_name) + create_target_proxy(gcp, gcp.target_proxy.name, False) + potential_ip_addresses = [] + max_attempts = 10 + for i in range(max_attempts): + potential_ip_addresses.append('10.10.10.%d' % + (random.randint(0, 255))) + create_global_forwarding_rule(gcp, forwarding_rule_name, [80], + potential_ip_addresses) + wait_until_all_rpcs_go_to_given_backends(backend_instances, + _WAIT_FOR_STATS_SEC) + + # expect failure when no port in client request uri, but specify port in url-map + patch_url_map_host_rule_with_port(gcp, url_map_name, backend_service, + service_host_name) + wait_until_no_rpcs_go_to_given_backends(backend_instances, + _WAIT_FOR_STATS_SEC) + finally: + delete_global_forwarding_rule(gcp) + delete_target_proxy(gcp) + delete_url_map(gcp) + create_url_map(gcp, url_map_name, backend_service, service_host_name) + create_target_proxy(gcp, target_proxy_name) + create_global_forwarding_rule(gcp, forwarding_rule_name, + potential_service_ports) + if gcp.service_port != _DEFAULT_SERVICE_PORT: + patch_url_map_host_rule_with_port(gcp, url_map_name, + backend_service, + service_host_name) + server_uri = service_host_name + ':' + str(gcp.service_port) + else: + server_uri = service_host_name + return server_uri + + def test_traffic_splitting(gcp, original_backend_service, instance_group, alternate_backend_service, same_zone_instance_group): # This test start with all traffic going to original_backend_service. Then @@ -2179,34 +2496,39 @@ def create_target_proxy(gcp, name, validate_for_proxyless=True): gcp.target_proxy = GcpResource(config['name'], result['targetLink']) -def create_global_forwarding_rule(gcp, name, potential_ports): +def create_global_forwarding_rule(gcp, + name, + potential_ports, + potential_ip_addresses=['0.0.0.0']): if gcp.alpha_compute: compute_to_use = gcp.alpha_compute else: compute_to_use = gcp.compute for port in potential_ports: - try: - config = { - 'name': name, - 'loadBalancingScheme': 'INTERNAL_SELF_MANAGED', - 'portRange': str(port), - 'IPAddress': '0.0.0.0', - 'network': args.network, - 'target': gcp.target_proxy.url, - } - logger.debug('Sending GCP request with body=%s', config) - result = compute_to_use.globalForwardingRules().insert( - project=gcp.project, - body=config).execute(num_retries=_GCP_API_RETRIES) - wait_for_global_operation(gcp, result['name']) - gcp.global_forwarding_rule = GcpResource(config['name'], - result['targetLink']) - gcp.service_port = port - return - except googleapiclient.errors.HttpError as http_error: - logger.warning( - 'Got error %s when attempting to create forwarding rule to ' - '0.0.0.0:%d. Retrying with another port.' % (http_error, port)) + for ip_address in potential_ip_addresses: + try: + config = { + 'name': name, + 'loadBalancingScheme': 'INTERNAL_SELF_MANAGED', + 'portRange': str(port), + 'IPAddress': ip_address, + 'network': args.network, + 'target': gcp.target_proxy.url, + } + logger.debug('Sending GCP request with body=%s', config) + result = compute_to_use.globalForwardingRules().insert( + project=gcp.project, + body=config).execute(num_retries=_GCP_API_RETRIES) + wait_for_global_operation(gcp, result['name']) + gcp.global_forwarding_rule = GcpResource( + config['name'], result['targetLink']) + gcp.service_port = port + return + except googleapiclient.errors.HttpError as http_error: + logger.warning( + 'Got error %s when attempting to create forwarding rule to ' + '%s:%d. Retrying with another port.' % + (http_error, ip_address, port)) def get_health_check(gcp, health_check_name): @@ -2270,39 +2592,49 @@ def get_instance_group(gcp, zone, instance_group_name): return instance_group -def delete_global_forwarding_rule(gcp): +def delete_global_forwarding_rule(gcp, name=None): + if name: + forwarding_rule_to_delete = name + else: + forwarding_rule_to_delete = gcp.global_forwarding_rule.name try: result = gcp.compute.globalForwardingRules().delete( project=gcp.project, - forwardingRule=gcp.global_forwarding_rule.name).execute( + forwardingRule=forwarding_rule_to_delete).execute( num_retries=_GCP_API_RETRIES) wait_for_global_operation(gcp, result['name']) except googleapiclient.errors.HttpError as http_error: logger.info('Delete failed: %s', http_error) -def delete_target_proxy(gcp): +def delete_target_proxy(gcp, name=None): + if name: + proxy_to_delete = name + else: + proxy_to_delete = gcp.target_proxy.name try: if gcp.alpha_compute: result = gcp.alpha_compute.targetGrpcProxies().delete( - project=gcp.project, - targetGrpcProxy=gcp.target_proxy.name).execute( + project=gcp.project, targetGrpcProxy=proxy_to_delete).execute( num_retries=_GCP_API_RETRIES) else: result = gcp.compute.targetHttpProxies().delete( - project=gcp.project, - targetHttpProxy=gcp.target_proxy.name).execute( + project=gcp.project, targetHttpProxy=proxy_to_delete).execute( num_retries=_GCP_API_RETRIES) wait_for_global_operation(gcp, result['name']) except googleapiclient.errors.HttpError as http_error: logger.info('Delete failed: %s', http_error) -def delete_url_map(gcp): +def delete_url_map(gcp, name=None): + if name: + url_map_to_delete = name + else: + url_map_to_delete = gcp.url_map.name try: result = gcp.compute.urlMaps().delete( project=gcp.project, - urlMap=gcp.url_map.name).execute(num_retries=_GCP_API_RETRIES) + urlMap=url_map_to_delete).execute(num_retries=_GCP_API_RETRIES) wait_for_global_operation(gcp, result['name']) except googleapiclient.errors.HttpError as http_error: logger.info('Delete failed: %s', http_error) @@ -2725,6 +3057,7 @@ try: if original_grpc_verbosity: client_env['GRPC_VERBOSITY'] = original_grpc_verbosity bootstrap_server_features = [] + if gcp.service_port == _DEFAULT_SERVICE_PORT: server_uri = service_host_name else: @@ -2759,6 +3092,17 @@ try: logger.info('skipping test %s due to missing alpha support', test_case) continue + if test_case in [ + 'api_listener', 'forwarding_rule_port_match', + 'forwarding_rule_default_port' + ] and CLIENT_HOSTS: + logger.info( + 'skipping test %s because test configuration is' + 'not compatible with client processes on existing' + 'client hosts', test_case) + continue + if test_case == 'forwarding_rule_default_port': + server_uri = service_host_name result = jobset.JobResult() log_dir = os.path.join(_TEST_LOG_BASE_DIR, test_case) if not os.path.exists(log_dir): @@ -2867,6 +3211,20 @@ try: test_timeout(gcp, backend_service, instance_group) elif test_case == 'fault_injection': test_fault_injection(gcp, backend_service, instance_group) + elif test_case == 'api_listener': + server_uri = test_api_listener(gcp, backend_service, + instance_group, + alternate_backend_service) + elif test_case == 'forwarding_rule_port_match': + server_uri = test_forwarding_rule_port_match( + gcp, backend_service, instance_group) + elif test_case == 'forwarding_rule_default_port': + server_uri = test_forwarding_rule_default_port( + gcp, backend_service, instance_group) + elif test_case == 'metadata_filter': + test_metadata_filter(gcp, backend_service, instance_group, + alternate_backend_service, + same_zone_instance_group) elif test_case == 'csds': test_csds(gcp, backend_service, instance_group, server_uri) else: