diff --git a/tools/run_tests/run_xds_tests.py b/tools/run_tests/run_xds_tests.py index 762743e0993..d0a5a66662c 100755 --- a/tools/run_tests/run_xds_tests.py +++ b/tools/run_tests/run_xds_tests.py @@ -58,6 +58,8 @@ _TEST_CASES = [ def parse_test_cases(arg): if arg == 'all': return _TEST_CASES + if arg == '': + return [] test_cases = arg.split(',') if all([test_case in _TEST_CASES for test_case in test_cases]): return test_cases @@ -108,6 +110,13 @@ argp.add_argument( type=int, help='Time limit for waiting for created backend services to report ' 'healthy when launching or updated GCP resources') +argp.add_argument( + '--use_existing_gcp_resources', + default=False, + action='store_true', + help= + 'If set, find and use already created GCP resources instead of creating new' + ' ones.') argp.add_argument( '--keep_gcp_resources', default=False, @@ -164,14 +173,6 @@ argp.add_argument( help='Number of VMs to create per instance group. Certain test cases (e.g., ' 'round_robin) may not give meaningful results if this is set to a value ' 'less than 2.') -argp.add_argument( - '--tolerate_gcp_errors', - default=False, - action='store_true', - help= - 'Continue with test even when an error occurs during setup. Intended for ' - 'manual testing, where attempts to recreate any GCP resources already ' - 'existing will result in an error') argp.add_argument('--verbose', help='verbose log output', default=False, @@ -255,7 +256,7 @@ def get_client_stats(num_rpcs, timeout_sec): logger.debug('Invoked GetClientStats RPC: %s', response) return response except grpc.RpcError as rpc_error: - raise Exception('GetClientStats RPC failed') + logger.exception('GetClientStats RPC failed') def _verify_rpcs_to_given_backends(backends, timeout_sec, num_rpcs, @@ -732,6 +733,67 @@ def create_global_forwarding_rule(gcp, name, potential_ports): '0.0.0.0:%d. Retrying with another port.' % (http_error, port)) +def get_health_check(gcp, health_check_name): + result = gcp.compute.healthChecks().get( + project=gcp.project, healthCheck=health_check_name).execute() + gcp.health_check = GcpResource(health_check_name, result['selfLink']) + + +def get_health_check_firewall_rule(gcp, firewall_name): + result = gcp.compute.firewalls().get(project=gcp.project, + firewall=firewall_name).execute() + gcp.health_check_firewall_rule = GcpResource(firewall_name, + result['selfLink']) + + +def get_backend_service(gcp, backend_service_name): + result = gcp.compute.backendServices().get( + project=gcp.project, backendService=backend_service_name).execute() + backend_service = GcpResource(backend_service_name, result['selfLink']) + gcp.backend_services.append(backend_service) + return backend_service + + +def get_url_map(gcp, url_map_name): + result = gcp.compute.urlMaps().get(project=gcp.project, + urlMap=url_map_name).execute() + gcp.url_map = GcpResource(url_map_name, result['selfLink']) + + +def get_target_proxy(gcp, target_proxy_name): + if gcp.alpha_compute: + result = gcp.alpha_compute.targetGrpcProxies().get( + project=gcp.project, targetGrpcProxy=target_proxy_name).execute() + else: + result = gcp.compute.targetHttpProxies().get( + project=gcp.project, targetHttpProxy=target_proxy_name).execute() + gcp.target_proxy = GcpResource(target_proxy_name, result['selfLink']) + + +def get_global_forwarding_rule(gcp, forwarding_rule_name): + result = gcp.compute.globalForwardingRules().get( + project=gcp.project, forwardingRule=forwarding_rule_name).execute() + gcp.global_forwarding_rule = GcpResource(forwarding_rule_name, + result['selfLink']) + + +def get_instance_template(gcp, template_name): + result = gcp.compute.instanceTemplates().get( + project=gcp.project, instanceTemplate=template_name).execute() + gcp.instance_template = GcpResource(template_name, result['selfLink']) + + +def get_instance_group(gcp, zone, instance_group_name): + result = gcp.compute.instanceGroups().get( + project=gcp.project, zone=zone, + instanceGroup=instance_group_name).execute() + gcp.service_port = result['namedPorts'][0]['port'] + instance_group = InstanceGroup(instance_group_name, result['selfLink'], + zone) + gcp.instance_groups.append(instance_group) + return instance_group + + def delete_global_forwarding_rule(gcp): try: result = gcp.compute.globalForwardingRules().delete( @@ -1040,7 +1102,30 @@ try: same_zone_instance_group_name = _BASE_INSTANCE_GROUP_NAME + '-same-zone' + args.gcp_suffix if _USE_SECONDARY_IG: secondary_zone_instance_group_name = _BASE_INSTANCE_GROUP_NAME + '-secondary-zone' + args.gcp_suffix - try: + if args.use_existing_gcp_resources: + logger.info('Reusing existing GCP resources') + get_health_check(gcp, health_check_name) + try: + get_health_check_firewall_rule(gcp, firewall_name) + except googleapiclient.errors.HttpError as http_error: + # Firewall rule may be auto-deleted periodically depending on GCP + # project settings. + logger.exception('Failed to find firewall rule, recreating') + create_health_check_firewall_rule(gcp, firewall_name) + backend_service = get_backend_service(gcp, backend_service_name) + alternate_backend_service = get_backend_service( + gcp, alternate_backend_service_name) + get_url_map(gcp, url_map_name) + get_target_proxy(gcp, target_proxy_name) + get_global_forwarding_rule(gcp, forwarding_rule_name) + get_instance_template(gcp, template_name) + instance_group = get_instance_group(gcp, args.zone, instance_group_name) + same_zone_instance_group = get_instance_group( + gcp, args.zone, same_zone_instance_group_name) + if _USE_SECONDARY_IG: + secondary_zone_instance_group = get_instance_group( + gcp, args.secondary_zone, secondary_zone_instance_group_name) + else: create_health_check(gcp, health_check_name) create_health_check_firewall_rule(gcp, firewall_name) backend_service = add_backend_service(gcp, backend_service_name) @@ -1073,166 +1158,104 @@ try: secondary_zone_instance_group = add_instance_group( gcp, args.secondary_zone, secondary_zone_instance_group_name, _INSTANCE_GROUP_SIZE) - except googleapiclient.errors.HttpError as http_error: - if args.tolerate_gcp_errors: - logger.warning( - 'Failed to set up backends: %s. Attempting to continue since ' - '--tolerate_gcp_errors=true', http_error) - if not gcp.instance_template: - result = compute.instanceTemplates().get( - project=args.project_id, - instanceTemplate=template_name).execute() - gcp.instance_template = GcpResource(template_name, - result['selfLink']) - if not gcp.backend_services: - result = compute.backendServices().get( - project=args.project_id, - backendService=backend_service_name).execute() - backend_service = GcpResource(backend_service_name, - result['selfLink']) - gcp.backend_services.append(backend_service) - result = compute.backendServices().get( - project=args.project_id, - backendService=alternate_backend_service_name).execute() - alternate_backend_service = GcpResource( - alternate_backend_service_name, result['selfLink']) - gcp.backend_services.append(alternate_backend_service) - if not gcp.instance_groups: - result = compute.instanceGroups().get( - project=args.project_id, - zone=args.zone, - instanceGroup=instance_group_name).execute() - instance_group = InstanceGroup(instance_group_name, - result['selfLink'], args.zone) - gcp.instance_groups.append(instance_group) - result = compute.instanceGroups().get( - project=args.project_id, - zone=args.zone, - instanceGroup=same_zone_instance_group_name).execute() - same_zone_instance_group = InstanceGroup( - same_zone_instance_group_name, result['selfLink'], - args.zone) - gcp.instance_groups.append(same_zone_instance_group) - if _USE_SECONDARY_IG: - result = compute.instanceGroups().get( - project=args.project_id, - zone=args.secondary_zone, - instanceGroup=secondary_zone_instance_group_name - ).execute() - secondary_zone_instance_group = InstanceGroup( - secondary_zone_instance_group_name, result['selfLink'], - args.secondary_zone) - gcp.instance_groups.append(secondary_zone_instance_group) - if not gcp.health_check: - result = compute.healthChecks().get( - project=args.project_id, - healthCheck=health_check_name).execute() - gcp.health_check = GcpResource(health_check_name, - result['selfLink']) - if not gcp.url_map: - result = compute.urlMaps().get(project=args.project_id, - urlMap=url_map_name).execute() - gcp.url_map = GcpResource(url_map_name, result['selfLink']) - if not gcp.service_port: - gcp.service_port = args.service_port_range[0] - logger.warning('Using arbitrary service port in range: %d' % - gcp.service_port) - else: - raise http_error wait_for_healthy_backends(gcp, backend_service, instance_group) - if gcp.service_port == _DEFAULT_SERVICE_PORT: - server_uri = service_host_name - else: - server_uri = service_host_name + ':' + str(gcp.service_port) - if args.bootstrap_file: - bootstrap_path = os.path.abspath(args.bootstrap_file) - else: - with tempfile.NamedTemporaryFile(delete=False) as bootstrap_file: - bootstrap_file.write( - _BOOTSTRAP_TEMPLATE.format( - node_id=socket.gethostname()).encode('utf-8')) - bootstrap_path = bootstrap_file.name - client_env = dict(os.environ, GRPC_XDS_BOOTSTRAP=bootstrap_path) - client_cmd = shlex.split( - args.client_cmd.format(server_uri=server_uri, - stats_port=args.stats_port, - qps=args.qps)) - - test_results = {} - failed_tests = [] - for test_case in args.test_case: - result = jobset.JobResult() - log_dir = os.path.join(_TEST_LOG_BASE_DIR, test_case) - if not os.path.exists(log_dir): - os.makedirs(log_dir) - test_log_filename = os.path.join(log_dir, _SPONGE_LOG_NAME) - test_log_file = open(test_log_filename, 'w+') - client_process = None - try: - client_process = subprocess.Popen(client_cmd, - env=client_env, - stderr=subprocess.STDOUT, - stdout=test_log_file) - if test_case == 'backends_restart': - test_backends_restart(gcp, backend_service, instance_group) - elif test_case == 'change_backend_service': - test_change_backend_service(gcp, backend_service, - instance_group, - alternate_backend_service, - same_zone_instance_group) - elif test_case == 'new_instance_group_receives_traffic': - test_new_instance_group_receives_traffic( - gcp, backend_service, instance_group, - same_zone_instance_group) - elif test_case == 'ping_pong': - test_ping_pong(gcp, backend_service, instance_group) - elif test_case == 'remove_instance_group': - test_remove_instance_group(gcp, backend_service, instance_group, - same_zone_instance_group) - elif test_case == 'round_robin': - test_round_robin(gcp, backend_service, instance_group) - elif test_case == 'secondary_locality_gets_no_requests_on_partial_primary_failure': - test_secondary_locality_gets_no_requests_on_partial_primary_failure( - gcp, backend_service, instance_group, - secondary_zone_instance_group) - elif test_case == 'secondary_locality_gets_requests_on_primary_failure': - test_secondary_locality_gets_requests_on_primary_failure( - gcp, backend_service, instance_group, - secondary_zone_instance_group) - else: - logger.error('Unknown test case: %s', test_case) - sys.exit(1) - result.state = 'PASSED' - result.returncode = 0 - except Exception as e: - logger.error('Test case %s failed: %s', test_case, e) - failed_tests.append(test_case) - result.state = 'FAILED' - result.message = str(e) - finally: - if client_process: - client_process.terminate() - test_log_file.close() - # Workaround for Python 3, as report_utils will invoke decode() on - # result.message, which has a default value of ''. - result.message = result.message.encode('UTF-8') - test_results[test_case] = [result] - if args.log_client_output: - logger.info('Client output:') - with open(test_log_filename, 'r') as client_output: - logger.info(client_output.read()) - if not os.path.exists(_TEST_LOG_BASE_DIR): - os.makedirs(_TEST_LOG_BASE_DIR) - report_utils.render_junit_xml_report(test_results, - os.path.join(_TEST_LOG_BASE_DIR, - _SPONGE_XML_NAME), - suite_name='xds_tests', - multi_target=True) - if failed_tests: - logger.error('Test case(s) %s failed', failed_tests) - sys.exit(1) + if args.test_case: + + if gcp.service_port == _DEFAULT_SERVICE_PORT: + server_uri = service_host_name + else: + server_uri = service_host_name + ':' + str(gcp.service_port) + if args.bootstrap_file: + bootstrap_path = os.path.abspath(args.bootstrap_file) + else: + with tempfile.NamedTemporaryFile(delete=False) as bootstrap_file: + bootstrap_file.write( + _BOOTSTRAP_TEMPLATE.format( + node_id=socket.gethostname()).encode('utf-8')) + bootstrap_path = bootstrap_file.name + client_env = dict(os.environ, GRPC_XDS_BOOTSTRAP=bootstrap_path) + client_cmd = shlex.split( + args.client_cmd.format(server_uri=server_uri, + stats_port=args.stats_port, + qps=args.qps)) + + test_results = {} + failed_tests = [] + for test_case in args.test_case: + result = jobset.JobResult() + log_dir = os.path.join(_TEST_LOG_BASE_DIR, test_case) + if not os.path.exists(log_dir): + os.makedirs(log_dir) + test_log_filename = os.path.join(log_dir, _SPONGE_LOG_NAME) + test_log_file = open(test_log_filename, 'w+') + client_process = None + try: + client_process = subprocess.Popen(client_cmd, + env=client_env, + stderr=subprocess.STDOUT, + stdout=test_log_file) + if test_case == 'backends_restart': + test_backends_restart(gcp, backend_service, instance_group) + elif test_case == 'change_backend_service': + test_change_backend_service(gcp, backend_service, + instance_group, + alternate_backend_service, + same_zone_instance_group) + elif test_case == 'new_instance_group_receives_traffic': + test_new_instance_group_receives_traffic( + gcp, backend_service, instance_group, + same_zone_instance_group) + elif test_case == 'ping_pong': + test_ping_pong(gcp, backend_service, instance_group) + elif test_case == 'remove_instance_group': + test_remove_instance_group(gcp, backend_service, + instance_group, + same_zone_instance_group) + elif test_case == 'round_robin': + test_round_robin(gcp, backend_service, instance_group) + elif test_case == 'secondary_locality_gets_no_requests_on_partial_primary_failure': + test_secondary_locality_gets_no_requests_on_partial_primary_failure( + gcp, backend_service, instance_group, + secondary_zone_instance_group) + elif test_case == 'secondary_locality_gets_requests_on_primary_failure': + test_secondary_locality_gets_requests_on_primary_failure( + gcp, backend_service, instance_group, + secondary_zone_instance_group) + else: + logger.error('Unknown test case: %s', test_case) + sys.exit(1) + result.state = 'PASSED' + result.returncode = 0 + except Exception as e: + logger.exception('Test case %s failed', test_case) + failed_tests.append(test_case) + result.state = 'FAILED' + result.message = str(e) + finally: + if client_process: + client_process.terminate() + test_log_file.close() + # Workaround for Python 3, as report_utils will invoke decode() on + # result.message, which has a default value of ''. + result.message = result.message.encode('UTF-8') + test_results[test_case] = [result] + if args.log_client_output: + logger.info('Client output:') + with open(test_log_filename, 'r') as client_output: + logger.info(client_output.read()) + if not os.path.exists(_TEST_LOG_BASE_DIR): + os.makedirs(_TEST_LOG_BASE_DIR) + report_utils.render_junit_xml_report(test_results, + os.path.join( + _TEST_LOG_BASE_DIR, + _SPONGE_XML_NAME), + suite_name='xds_tests', + multi_target=True) + if failed_tests: + logger.error('Test case(s) %s failed', failed_tests) + sys.exit(1) finally: if not args.keep_gcp_resources: logger.info('Cleaning up GCP resources. This may take some time.')