From 1576de5d35ca7c053301ec73daac341d7074689e Mon Sep 17 00:00:00 2001 From: Eric Gribkoff Date: Wed, 26 Feb 2020 20:37:59 -0800 Subject: [PATCH 01/10] Refactor xds test driver to prepare for additional test cases The bulk of this change is the introduction of a GcpResource object to track the created VMs, backend services, URL maps, etc. The additional test cases for xDS integration greatly increase the complexity of the resources required (e.g., multiple backend services, instance groups in different zones) and the GcpResource construct makes keeping track of - and cleaning up - these resources much cleaner. --- tools/run_tests/run_xds_tests.py | 653 ++++++++++++++++++------------- 1 file changed, 383 insertions(+), 270 deletions(-) diff --git a/tools/run_tests/run_xds_tests.py b/tools/run_tests/run_xds_tests.py index 1b5a771474d..0a696655d7a 100755 --- a/tools/run_tests/run_xds_tests.py +++ b/tools/run_tests/run_xds_tests.py @@ -51,11 +51,15 @@ argp.add_argument('--project_id', help='GCP project id') argp.add_argument( '--gcp_suffix', default='', - help='Optional suffix for all generated GCP resource names. Useful to ensure ' - 'distinct names across test runs.') + help='Optional suffix for all generated GCP resource names. Useful to ' + 'ensure distinct names across test runs.') argp.add_argument('--test_case', default=None, - choices=['all', 'ping_pong', 'round_robin']) + choices=[ + 'all', + 'ping_pong', + 'round_robin', + ]) argp.add_argument( '--client_cmd', default=None, @@ -63,12 +67,15 @@ argp.add_argument( '{service_host}, {service_port},{stats_port} and {qps} parameters using ' 'str.format(), and generate the GRPC_XDS_BOOTSTRAP file.') argp.add_argument('--zone', default='us-central1-a') +argp.add_argument('--secondary_zone', + default='us-west1-b', + help='Zone to use for secondary TD locality tests') argp.add_argument('--qps', default=10, help='Client QPS') argp.add_argument( '--wait_for_backend_sec', - default=900, - help='Time limit for waiting for created backend services to report healthy ' - 'when launching test suite') + default=600, + help='Time limit for waiting for created backend services to report ' + 'healthy when launching or updated GCP resources') argp.add_argument( '--keep_gcp_resources', default=False, @@ -81,13 +88,13 @@ argp.add_argument( default=None, type=str, help= - 'If provided, uses this file instead of retrieving via the GCP discovery API' -) + 'If provided, uses this file instead of retrieving via the GCP discovery ' + 'API') argp.add_argument('--network', default='global/networks/default', help='GCP network to use') argp.add_argument('--service_port_range', - default='8080:8180', + default='8080:8100', type=parse_port_range, help='Listening port for created gRPC backends. 
Specified as ' 'either a single int or as a range in the format min:max, in ' @@ -115,35 +122,18 @@ argp.add_argument( argp.add_argument('--verbose', help='verbose log output', default=False, - action="store_true") + action='store_true') args = argp.parse_args() if args.verbose: logger.setLevel(logging.DEBUG) -PROJECT_ID = args.project_id -ZONE = args.zone -QPS = args.qps -TEST_CASE = args.test_case -CLIENT_CMD = args.client_cmd -WAIT_FOR_BACKEND_SEC = args.wait_for_backend_sec -TEMPLATE_NAME = 'test-template' + args.gcp_suffix -INSTANCE_GROUP_NAME = 'test-ig' + args.gcp_suffix -HEALTH_CHECK_NAME = 'test-hc' + args.gcp_suffix -FIREWALL_RULE_NAME = 'test-fw-rule' + args.gcp_suffix -BACKEND_SERVICE_NAME = 'test-backend-service' + args.gcp_suffix -URL_MAP_NAME = 'test-map' + args.gcp_suffix -SERVICE_HOST = 'grpc-test' + args.gcp_suffix -TARGET_PROXY_NAME = 'test-target-proxy' + args.gcp_suffix -FORWARDING_RULE_NAME = 'test-forwarding-rule' + args.gcp_suffix -KEEP_GCP_RESOURCES = args.keep_gcp_resources -TOLERATE_GCP_ERRORS = args.tolerate_gcp_errors -STATS_PORT = args.stats_port -INSTANCE_GROUP_SIZE = 2 -WAIT_FOR_OPERATION_SEC = 60 -NUM_TEST_RPCS = 10 * QPS -WAIT_FOR_STATS_SEC = 30 -BOOTSTRAP_TEMPLATE = """ +_WAIT_FOR_BACKEND_SEC = args.wait_for_backend_sec +_WAIT_FOR_OPERATION_SEC = 60 +_INSTANCE_GROUP_SIZE = 2 +_NUM_TEST_RPCS = 10 * args.qps +_WAIT_FOR_STATS_SEC = 60 +_BOOTSTRAP_TEMPLATE = """ {{ "node": {{ "id": "{node_id}" @@ -158,10 +148,20 @@ BOOTSTRAP_TEMPLATE = """ ] }}] }}""" % args.xds_server +_PATH_MATCHER_NAME = 'path-matcher' +_BASE_TEMPLATE_NAME = 'test-template' +_BASE_INSTANCE_GROUP_NAME = 'test-ig' +_BASE_HEALTH_CHECK_NAME = 'test-hc' +_BASE_FIREWALL_RULE_NAME = 'test-fw-rule' +_BASE_BACKEND_SERVICE_NAME = 'test-backend-service' +_BASE_URL_MAP_NAME = 'test-map' +_BASE_SERVICE_HOST = 'grpc-test' +_BASE_TARGET_PROXY_NAME = 'test-target-proxy' +_BASE_FORWARDING_RULE_NAME = 'test-forwarding-rule' def get_client_stats(num_rpcs, timeout_sec): - with grpc.insecure_channel('localhost:%d' % STATS_PORT) as channel: + with grpc.insecure_channel('localhost:%d' % args.stats_port) as channel: stub = test_pb2_grpc.LoadBalancerStatsServiceStub(channel) request = messages_pb2.LoadBalancerStatsRequest() request.num_rpcs = num_rpcs @@ -177,12 +177,17 @@ def get_client_stats(num_rpcs, timeout_sec): raise Exception('GetClientStats RPC failed') -def wait_until_only_given_backends_receive_load(backends, timeout_sec): +def wait_until_only_given_instances_receive_load(backends, + timeout_sec, + num_rpcs=100, + allow_failures=False): start_time = time.time() error_msg = None + logger.debug('Waiting for %d sec until backends %s receive load' % + (timeout_sec, backends)) while time.time() - start_time <= timeout_sec: error_msg = None - stats = get_client_stats(max(len(backends), 1), timeout_sec) + stats = get_client_stats(num_rpcs, timeout_sec) rpcs_by_peer = stats.rpcs_by_peer for backend in backends: if backend not in rpcs_by_peer: @@ -190,52 +195,59 @@ def wait_until_only_given_backends_receive_load(backends, timeout_sec): break if not error_msg and len(rpcs_by_peer) > len(backends): error_msg = 'Unexpected backend received load: %s' % rpcs_by_peer + if not allow_failures and stats.num_failures > 0: + error_msg = '%d RPCs failed' % stats.num_failures if not error_msg: return raise Exception(error_msg) -def test_ping_pong(backends, num_rpcs, stats_timeout_sec): +def test_ping_pong(gcp, backend_service, instance_group): + wait_for_healthy_backends(gcp, backend_service, instance_group) + instance_names 
= get_instance_names(gcp, instance_group) start_time = time.time() error_msg = None - while time.time() - start_time <= stats_timeout_sec: + while time.time() - start_time <= _WAIT_FOR_STATS_SEC: error_msg = None - stats = get_client_stats(num_rpcs, stats_timeout_sec) + stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC) rpcs_by_peer = stats.rpcs_by_peer - for backend in backends: - if backend not in rpcs_by_peer: - error_msg = 'Backend %s did not receive load' % backend + for instance in instance_names: + if instance not in rpcs_by_peer: + error_msg = 'Instance %s did not receive load' % instance break - if not error_msg and len(rpcs_by_peer) > len(backends): - error_msg = 'Unexpected backend received load: %s' % rpcs_by_peer + if not error_msg and len(rpcs_by_peer) > len(instance_names): + error_msg = 'Unexpected instance received load: %s' % rpcs_by_peer if not error_msg: return raise Exception(error_msg) -def test_round_robin(backends, num_rpcs, stats_timeout_sec): +def test_round_robin(gcp, backend_service, instance_group): + wait_for_healthy_backends(gcp, backend_service, instance_group) + instance_names = get_instance_names(gcp, instance_group) threshold = 1 - wait_until_only_given_backends_receive_load(backends, stats_timeout_sec) - stats = get_client_stats(num_rpcs, stats_timeout_sec) + wait_until_only_given_instances_receive_load(instance_names, + _WAIT_FOR_STATS_SEC) + stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC) requests_received = [stats.rpcs_by_peer[x] for x in stats.rpcs_by_peer] total_requests_received = sum( [stats.rpcs_by_peer[x] for x in stats.rpcs_by_peer]) - if total_requests_received != num_rpcs: + if total_requests_received != _NUM_TEST_RPCS: raise Exception('Unexpected RPC failures', stats) - expected_requests = total_requests_received / len(backends) - for backend in backends: - if abs(stats.rpcs_by_peer[backend] - expected_requests) > threshold: + expected_requests = total_requests_received / len(instance_names) + for instance in instance_names: + if abs(stats.rpcs_by_peer[instance] - expected_requests) > threshold: raise Exception( - 'RPC peer distribution differs from expected by more than %d for backend %s (%s)', - threshold, backend, stats) + 'RPC peer distribution differs from expected by more than %d ' + 'for instance %s (%s)', threshold, instance, stats) -def create_instance_template(compute, project, name, grpc_port): +def create_instance_template(gcp, name, network, source_image): config = { 'name': name, 'properties': { 'tags': { - 'items': ['grpc-allow-healthcheck'] + 'items': ['allow-health-checks'] }, 'machineType': 'e2-standard-2', 'serviceAccounts': [{ @@ -246,12 +258,12 @@ def create_instance_template(compute, project, name, grpc_port): 'accessConfigs': [{ 'type': 'ONE_TO_ONE_NAT' }], - 'network': args.network + 'network': network }], 'disks': [{ 'boot': True, 'initializeParams': { - 'sourceImage': args.source_image + 'sourceImage': source_image } }], 'metadata': { @@ -260,7 +272,6 @@ def create_instance_template(compute, project, name, grpc_port): 'startup-script', 'value': """#!/bin/bash - sudo apt update sudo apt install -y git default-jdk mkdir java_server @@ -271,40 +282,43 @@ pushd interop-testing ../gradlew installDist -x test -PskipCodegen=true -PskipAndroid=true nohup build/install/grpc-interop-testing/bin/xds-test-server --port=%d 1>/dev/null &""" - % grpc_port + % gcp.service_port }] } } } - result = compute.instanceTemplates().insert(project=project, - body=config).execute() - 
wait_for_global_operation(compute, project, result['name']) - return result['targetLink'] + result = gcp.compute.instanceTemplates().insert(project=gcp.project, + body=config).execute() + wait_for_global_operation(gcp, result['name']) + gcp.instance_template = GcpResource(config['name'], result['targetLink']) -def create_instance_group(compute, project, zone, name, size, grpc_port, - template_url): +def add_instance_group(gcp, zone, name, size): config = { 'name': name, - 'instanceTemplate': template_url, + 'instanceTemplate': gcp.instance_template.url, 'targetSize': size, 'namedPorts': [{ 'name': 'grpc', - 'port': grpc_port + 'port': gcp.service_port }] } - result = compute.instanceGroupManagers().insert(project=project, - zone=zone, - body=config).execute() - wait_for_zone_operation(compute, project, zone, result['name']) - result = compute.instanceGroupManagers().get( - project=PROJECT_ID, zone=ZONE, instanceGroupManager=name).execute() - return result['instanceGroup'] + result = gcp.compute.instanceGroupManagers().insert(project=gcp.project, + zone=zone, + body=config).execute() + wait_for_zone_operation(gcp, zone, result['name']) + result = gcp.compute.instanceGroupManagers().get( + project=gcp.project, zone=zone, + instanceGroupManager=config['name']).execute() + instance_group = InstanceGroup(config['name'], result['instanceGroup'], + zone) + gcp.instance_groups.append(instance_group) + return instance_group -def create_health_check(compute, project, name): +def create_health_check(gcp, name): config = { 'name': name, 'type': 'TCP', @@ -312,13 +326,13 @@ def create_health_check(compute, project, name): 'portName': 'grpc' } } - result = compute.healthChecks().insert(project=project, - body=config).execute() - wait_for_global_operation(compute, project, result['name']) - return result['targetLink'] + result = gcp.compute.healthChecks().insert(project=gcp.project, + body=config).execute() + wait_for_global_operation(gcp, result['name']) + gcp.health_check = GcpResource(config['name'], result['targetLink']) -def create_health_check_firewall_rule(compute, project, name): +def create_health_check_firewall_rule(gcp, name): config = { 'name': name, 'direction': 'INGRESS', @@ -326,169 +340,169 @@ def create_health_check_firewall_rule(compute, project, name): 'IPProtocol': 'tcp' }], 'sourceRanges': ['35.191.0.0/16', '130.211.0.0/22'], - 'targetTags': ['grpc-allow-healthcheck'], + 'targetTags': ['allow-health-checks'], } - result = compute.firewalls().insert(project=project, body=config).execute() - wait_for_global_operation(compute, project, result['name']) + result = gcp.compute.firewalls().insert(project=gcp.project, + body=config).execute() + wait_for_global_operation(gcp, result['name']) + gcp.health_check_firewall_rule = GcpResource(config['name'], + result['targetLink']) -def create_backend_service(compute, project, name, health_check): +def add_backend_service(gcp, name): config = { 'name': name, 'loadBalancingScheme': 'INTERNAL_SELF_MANAGED', - 'healthChecks': [health_check], + 'healthChecks': [gcp.health_check.url], 'portName': 'grpc', 'protocol': 'HTTP2' } - result = compute.backendServices().insert(project=project, - body=config).execute() - wait_for_global_operation(compute, project, result['name']) - return result['targetLink'] + result = gcp.compute.backendServices().insert(project=gcp.project, + body=config).execute() + wait_for_global_operation(gcp, result['name']) + backend_service = GcpResource(config['name'], result['targetLink']) + 
gcp.backend_services.append(backend_service) + return backend_service -def create_url_map(compute, project, name, backend_service_url, host_name): - path_matcher_name = 'path-matcher' +def create_url_map(gcp, name, backend_service, host_name): config = { 'name': name, - 'defaultService': backend_service_url, + 'defaultService': backend_service.url, 'pathMatchers': [{ - 'name': path_matcher_name, - 'defaultService': backend_service_url, + 'name': _PATH_MATCHER_NAME, + 'defaultService': backend_service.url, }], 'hostRules': [{ 'hosts': [host_name], - 'pathMatcher': path_matcher_name + 'pathMatcher': _PATH_MATCHER_NAME }] } - result = compute.urlMaps().insert(project=project, body=config).execute() - wait_for_global_operation(compute, project, result['name']) - return result['targetLink'] + result = gcp.compute.urlMaps().insert(project=gcp.project, + body=config).execute() + wait_for_global_operation(gcp, result['name']) + gcp.url_map = GcpResource(config['name'], result['targetLink']) -def create_target_http_proxy(compute, project, name, url_map_url): +def create_target_http_proxy(gcp, name): config = { 'name': name, - 'url_map': url_map_url, + 'url_map': gcp.url_map.url, } - result = compute.targetHttpProxies().insert(project=project, - body=config).execute() - wait_for_global_operation(compute, project, result['name']) - return result['targetLink'] + result = gcp.compute.targetHttpProxies().insert(project=gcp.project, + body=config).execute() + wait_for_global_operation(gcp, result['name']) + gcp.target_http_proxy = GcpResource(config['name'], result['targetLink']) -def create_global_forwarding_rule(compute, project, name, grpc_port, - target_http_proxy_url): +def create_global_forwarding_rule(gcp, name, port): config = { 'name': name, 'loadBalancingScheme': 'INTERNAL_SELF_MANAGED', - 'portRange': str(grpc_port), + 'portRange': str(port), 'IPAddress': '0.0.0.0', 'network': args.network, - 'target': target_http_proxy_url, + 'target': gcp.target_http_proxy.url, } - result = compute.globalForwardingRules().insert(project=project, - body=config).execute() - wait_for_global_operation(compute, project, result['name']) - - -def delete_global_forwarding_rule(compute, project, forwarding_rule): - try: - result = compute.globalForwardingRules().delete( - project=project, forwardingRule=forwarding_rule).execute() - wait_for_global_operation(compute, project, result['name']) - except googleapiclient.errors.HttpError as http_error: - logger.info('Delete failed: %s', http_error) + result = gcp.compute.globalForwardingRules().insert(project=gcp.project, + body=config).execute() + wait_for_global_operation(gcp, result['name']) + gcp.global_forwarding_rule = GcpResource(config['name'], + result['targetLink']) -def delete_target_http_proxy(compute, project, target_http_proxy): +def delete_global_forwarding_rule(gcp): try: - result = compute.targetHttpProxies().delete( - project=project, targetHttpProxy=target_http_proxy).execute() - wait_for_global_operation(compute, project, result['name']) + result = gcp.compute.globalForwardingRules().delete( + project=gcp.project, + forwardingRule=gcp.global_forwarding_rule.name).execute() + wait_for_global_operation(gcp, result['name']) except googleapiclient.errors.HttpError as http_error: logger.info('Delete failed: %s', http_error) -def delete_url_map(compute, project, url_map): +def delete_target_http_proxy(gcp): try: - result = compute.urlMaps().delete(project=project, - urlMap=url_map).execute() - wait_for_global_operation(compute, project, result['name']) + 
result = gcp.compute.targetHttpProxies().delete( + project=gcp.project, + targetHttpProxy=gcp.target_http_proxy.name).execute() + wait_for_global_operation(gcp, result['name']) except googleapiclient.errors.HttpError as http_error: logger.info('Delete failed: %s', http_error) -def delete_backend_service(compute, project, backend_service): +def delete_url_map(gcp): try: - result = compute.backendServices().delete( - project=project, backendService=backend_service).execute() - wait_for_global_operation(compute, project, result['name']) + result = gcp.compute.urlMaps().delete( + project=gcp.project, urlMap=gcp.url_map.name).execute() + wait_for_global_operation(gcp, result['name']) except googleapiclient.errors.HttpError as http_error: logger.info('Delete failed: %s', http_error) -def delete_firewall(compute, project, firewall_rule): - try: - result = compute.firewalls().delete(project=project, - firewall=firewall_rule).execute() - wait_for_global_operation(compute, project, result['name']) - except googleapiclient.errors.HttpError as http_error: - logger.info('Delete failed: %s', http_error) +def delete_backend_services(gcp): + for backend_service in gcp.backend_services: + try: + result = gcp.compute.backendServices().delete( + project=gcp.project, + backendService=gcp.backend_service.name).execute() + wait_for_global_operation(gcp, result['name']) + except googleapiclient.errors.HttpError as http_error: + logger.info('Delete failed: %s', http_error) -def delete_health_check(compute, project, health_check): +def delete_firewall(gcp): try: - result = compute.healthChecks().delete( - project=project, healthCheck=health_check).execute() - wait_for_global_operation(compute, project, result['name']) + result = gcp.compute.firewalls().delete( + project=gcp.project, + firewall=gcp.health_check_firewall_rule.name).execute() + wait_for_global_operation(gcp, result['name']) except googleapiclient.errors.HttpError as http_error: logger.info('Delete failed: %s', http_error) -def delete_instance_group(compute, project, zone, instance_group): +def delete_health_check(gcp): try: - result = compute.instanceGroupManagers().delete( - project=project, zone=zone, - instanceGroupManager=instance_group).execute() - timeout_sec = 180 # Deleting an instance group can be slow - wait_for_zone_operation(compute, - project, - ZONE, - result['name'], - timeout_sec=timeout_sec) + result = gcp.compute.healthChecks().delete( + project=gcp.project, healthCheck=gcp.health_check.name).execute() + wait_for_global_operation(gcp, result['name']) except googleapiclient.errors.HttpError as http_error: logger.info('Delete failed: %s', http_error) -def delete_instance_template(compute, project, instance_template): +def delete_instance_groups(gcp): + for instance_group in gcp.instance_groups: + try: + result = gcp.compute.instanceGroupManagers().delete( + project=gcp.project, + zone=instance_group.zone, + instanceGroupManager=instance_group.name).execute() + timeout_sec = 180 # Deleting an instance group can be slow + wait_for_zone_operation(gcp, + instance_group.zone, + result['name'], + timeout_sec=timeout_sec) + except googleapiclient.errors.HttpError as http_error: + logger.info('Delete failed: %s', http_error) + + +def delete_instance_template(gcp): try: - result = compute.instanceTemplates().delete( - project=project, instanceTemplate=instance_template).execute() - wait_for_global_operation(compute, project, result['name']) + result = gcp.compute.instanceTemplates().delete( + project=gcp.project, + 
instanceTemplate=gcp.instance_template.name).execute() + wait_for_global_operation(gcp, result['name']) except googleapiclient.errors.HttpError as http_error: logger.info('Delete failed: %s', http_error) -def add_instances_to_backend(compute, project, backend_service, instance_group): - config = { - 'backends': [{ - 'group': instance_group, - }], - } - result = compute.backendServices().patch(project=project, - backendService=backend_service, - body=config).execute() - wait_for_global_operation(compute, project, result['name']) - - -def wait_for_global_operation(compute, - project, +def wait_for_global_operation(gcp, operation, - timeout_sec=WAIT_FOR_OPERATION_SEC): + timeout_sec=_WAIT_FOR_OPERATION_SEC): start_time = time.time() while time.time() - start_time <= timeout_sec: - result = compute.globalOperations().get(project=project, - operation=operation).execute() + result = gcp.compute.globalOperations().get( + project=gcp.project, operation=operation).execute() if result['status'] == 'DONE': if 'error' in result: raise Exception(result['error']) @@ -498,16 +512,14 @@ def wait_for_global_operation(compute, timeout_sec) -def wait_for_zone_operation(compute, - project, +def wait_for_zone_operation(gcp, zone, operation, - timeout_sec=WAIT_FOR_OPERATION_SEC): + timeout_sec=_WAIT_FOR_OPERATION_SEC): start_time = time.time() while time.time() - start_time <= timeout_sec: - result = compute.zoneOperations().get(project=project, - zone=zone, - operation=operation).execute() + result = gcp.compute.zoneOperations().get( + project=gcp.project, zone=zone, operation=operation).execute() if result['status'] == 'DONE': if 'error' in result: raise Exception(result['error']) @@ -517,13 +529,16 @@ def wait_for_zone_operation(compute, timeout_sec) -def wait_for_healthy_backends(compute, project_id, backend_service, - instance_group_url, timeout_sec): +def wait_for_healthy_backends(gcp, + backend_service, + instance_group, + timeout_sec=_WAIT_FOR_BACKEND_SEC): start_time = time.time() - config = {'group': instance_group_url} + config = {'group': instance_group.url} while time.time() - start_time <= timeout_sec: - result = compute.backendServices().getHealth( - project=project_id, backendService=backend_service, + result = gcp.compute.backendServices().getHealth( + project=gcp.project, + backendService=backend_service.name, body=config).execute() if 'healthStatus' in result: healthy = True @@ -538,15 +553,32 @@ def wait_for_healthy_backends(compute, project_id, backend_service, (timeout_sec, result)) -def start_xds_client(service_port): - cmd = CLIENT_CMD.format(service_host=SERVICE_HOST, - service_port=service_port, - stats_port=STATS_PORT, - qps=QPS) +def get_instance_names(gcp, instance_group): + instance_names = [] + result = gcp.compute.instanceGroups().listInstances( + project=gcp.project, + zone=instance_group.zone, + instanceGroup=instance_group.name, + body={ + 'instanceState': 'ALL' + }).execute() + if 'items' not in result: + return [] + for item in result['items']: + # listInstances() returns the full URL of the instance, which ends with + # the instance name. compute.instances().get() requires using the + # instance name (not the full URL) to look up instance details, so we + # just extract the name manually. 
+ instance_name = item['instance'].split('/')[-1] + instance_names.append(instance_name) + return instance_names + + +def start_xds_client(cmd, service_port): bootstrap_path = None with tempfile.NamedTemporaryFile(delete=False) as bootstrap_file: bootstrap_file.write( - BOOTSTRAP_TEMPLATE.format( + _BOOTSTRAP_TEMPLATE.format( node_id=socket.gethostname()).encode('utf-8')) bootstrap_path = bootstrap_file.name @@ -557,6 +589,53 @@ def start_xds_client(service_port): return client_process +class InstanceGroup(object): + + def __init__(self, name, url, zone): + self.name = name + self.url = url + self.zone = zone + + +class GcpResource(object): + + def __init__(self, name, url): + self.name = name + self.url = url + + +class GcpState(object): + + def __init__(self, compute, project): + self.compute = compute + self.project = project + self.health_check = None + self.health_check_firewall_rule = None + self.backend_services = [] + self.url_map = None + self.target_http_proxy = None + self.global_forwarding_rule = None + self.service_port = None + self.instance_template = None + self.instance_groups = [] + + def clean_up(self): + if self.global_forwarding_rule: + delete_global_forwarding_rule(self) + if self.target_http_proxy: + delete_target_http_proxy(self) + if self.url_map: + delete_url_map(self) + delete_backend_services(self) + if self.health_check_firewall_rule: + delete_firewall(self) + if self.health_check: + delete_health_check(self) + delete_instance_groups(self) + if self.instance_template: + delete_instance_template(self) + + if args.compute_discovery_document: with open(args.compute_discovery_document, 'r') as discovery_doc: compute = googleapiclient.discovery.build_from_document( @@ -564,107 +643,141 @@ if args.compute_discovery_document: else: compute = googleapiclient.discovery.build('compute', 'v1') -service_port = None client_process = None try: - instance_group_url = None + gcp = GcpState(compute, args.project_id) + health_check_name = _BASE_HEALTH_CHECK_NAME + args.gcp_suffix + firewall_name = _BASE_FIREWALL_RULE_NAME + args.gcp_suffix + backend_service_name = _BASE_BACKEND_SERVICE_NAME + args.gcp_suffix + alternate_backend_service_name = _BASE_BACKEND_SERVICE_NAME + '-alternate' + args.gcp_suffix + url_map_name = _BASE_URL_MAP_NAME + args.gcp_suffix + service_host_name = _BASE_SERVICE_HOST + args.gcp_suffix + target_http_proxy_name = _BASE_TARGET_PROXY_NAME + args.gcp_suffix + forwarding_rule_name = _BASE_FORWARDING_RULE_NAME + args.gcp_suffix + template_name = _BASE_TARGET_PROXY_NAME + args.gcp_suffix + instance_group_name = _BASE_INSTANCE_GROUP_NAME + args.gcp_suffix + same_zone_instance_group_name = _BASE_INSTANCE_GROUP_NAME + '-same-zone' + args.gcp_suffix + secondary_zone_instance_group_name = _BASE_INSTANCE_GROUP_NAME + '-secondary-zone' + args.gcp_suffix try: - health_check_url = create_health_check(compute, PROJECT_ID, - HEALTH_CHECK_NAME) - create_health_check_firewall_rule(compute, PROJECT_ID, - FIREWALL_RULE_NAME) - backend_service_url = create_backend_service(compute, PROJECT_ID, - BACKEND_SERVICE_NAME, - health_check_url) - url_map_url = create_url_map(compute, PROJECT_ID, URL_MAP_NAME, - backend_service_url, SERVICE_HOST) - target_http_proxy_url = create_target_http_proxy( - compute, PROJECT_ID, TARGET_PROXY_NAME, url_map_url) + create_health_check(gcp, health_check_name) + create_health_check_firewall_rule(gcp, firewall_name) + backend_service = add_backend_service(gcp, backend_service_name) + alternate_backend_service = add_backend_service( + gcp, 
alternate_backend_service_name) + create_url_map(gcp, url_map_name, gcp.backend_services[0], + service_host_name) + create_target_http_proxy(gcp, target_http_proxy_name) potential_service_ports = list(args.service_port_range) random.shuffle(potential_service_ports) for port in potential_service_ports: try: - create_global_forwarding_rule( - compute, - PROJECT_ID, - FORWARDING_RULE_NAME, - port, - target_http_proxy_url, - ) - service_port = port + create_global_forwarding_rule(gcp, forwarding_rule_name, port) + gcp.service_port = port break except googleapiclient.errors.HttpError as http_error: logger.warning( - 'Got error %s when attempting to create forwarding rule to port %d. Retrying with another port.' - % (http_error, port)) - if not service_port: + 'Got error %s when attempting to create forwarding rule to ' + 'port %d. Retrying with another port.' % (http_error, port)) + if not gcp.service_port: raise Exception('Failed to pick a service port in the range %s' % args.service_port_range) - template_url = create_instance_template(compute, PROJECT_ID, - TEMPLATE_NAME, service_port) - instance_group_url = create_instance_group(compute, PROJECT_ID, ZONE, - INSTANCE_GROUP_NAME, - INSTANCE_GROUP_SIZE, - service_port, template_url) - add_instances_to_backend(compute, PROJECT_ID, BACKEND_SERVICE_NAME, - instance_group_url) + create_instance_template(gcp, template_name, args.network, + args.source_image) + instance_group = add_instance_group(gcp, args.zone, instance_group_name, + _INSTANCE_GROUP_SIZE) + patch_backend_instances(gcp, backend_service, [instance_group]) + same_zone_instance_group = add_instance_group( + gcp, args.zone, same_zone_instance_group_name, _INSTANCE_GROUP_SIZE) + secondary_zone_instance_group = add_instance_group( + gcp, args.secondary_zone, secondary_zone_instance_group_name, + _INSTANCE_GROUP_SIZE) except googleapiclient.errors.HttpError as http_error: - if TOLERATE_GCP_ERRORS: + if args.tolerate_gcp_errors: logger.warning( - 'Failed to set up backends: %s. Continuing since ' + 'Failed to set up backends: %s. 
Attempting to continue since ' '--tolerate_gcp_errors=true', http_error) + if not gcp.instance_template: + result = compute.instanceTemplates().get( + project=args.project_id, + instanceTemplate=template_name).execute() + gcp.instance_template = GcpResource(template_name, + result['selfLink']) + if not gcp.backend_services: + result = compute.backendServices().get( + project=args.project_id, + backendService=backend_service_name).execute() + backend_service = GcpResource(backend_service_name, + result['selfLink']) + gcp.backend_services.append(backend_service) + result = compute.backendServices().get( + project=args.project_id, + backendService=alternate_backend_service_name).execute() + alternate_backend_service = GcpResource( + alternate_backend_service_name, result['selfLink']) + gcp.backend_services.append(alternate_backend_service) + if not gcp.instance_groups: + result = compute.instanceGroups().get( + project=args.project_id, + zone=args.zone, + instanceGroup=instance_group_name).execute() + instance_group = InstanceGroup(instance_group_name, + result['selfLink'], args.zone) + gcp.instance_groups.append(instance_group) + result = compute.instanceGroups().get( + project=args.project_id, + zone=args.zone, + instanceGroup=same_zone_instance_group_name).execute() + same_zone_instance_group = InstanceGroup( + same_zone_instance_group_name, result['selfLink'], + args.zone) + gcp.instance_groups.append(same_zone_instance_group) + result = compute.instanceGroups().get( + project=args.project_id, + zone=args.secondary_zone, + instanceGroup=secondary_zone_instance_group_name).execute() + secondary_zone_instance_group = InstanceGroup( + secondary_zone_instance_group_name, result['selfLink'], + args.secondary_zone) + gcp.instance_groups.append(secondary_zone_instance_group) + if not gcp.health_check: + result = compute.healthChecks().get( + project=args.project_id, + healthCheck=health_check_name).execute() + gcp.health_check = GcpResource(health_check_name, + result['selfLink']) + if not gcp.url_map: + result = compute.urlMaps().get(project=args.project_id, + urlMap=url_map_name).execute() + gcp.url_map = GcpResource(url_map_name, result['selfLink']) + if not gcp.service_port: + gcp.service_port = args.service_port_range[0] + logger.warning('Using arbitrary service port in range: %d' % + gcp.service_port) else: raise http_error - if instance_group_url is None: - # Look up the instance group URL, which may be unset if we are running - # with --tolerate_gcp_errors=true. - result = compute.instanceGroups().get( - project=PROJECT_ID, zone=ZONE, - instanceGroup=INSTANCE_GROUP_NAME).execute() - instance_group_url = result['selfLink'] - wait_for_healthy_backends(compute, PROJECT_ID, BACKEND_SERVICE_NAME, - instance_group_url, WAIT_FOR_BACKEND_SEC) - - backends = [] - result = compute.instanceGroups().listInstances( - project=PROJECT_ID, - zone=ZONE, - instanceGroup=INSTANCE_GROUP_NAME, - body={ - 'instanceState': 'ALL' - }).execute() - for item in result['items']: - # listInstances() returns the full URL of the instance, which ends with - # the instance name. compute.instances().get() requires using the - # instance name (not the full URL) to look up instance details, so we - # just extract the name manually. 
- instance_name = item['instance'].split('/')[-1] - backends.append(instance_name) - - client_process = start_xds_client(service_port) - - if TEST_CASE == 'all': - test_ping_pong(backends, NUM_TEST_RPCS, WAIT_FOR_STATS_SEC) - test_round_robin(backends, NUM_TEST_RPCS, WAIT_FOR_STATS_SEC) - elif TEST_CASE == 'ping_pong': - test_ping_pong(backends, NUM_TEST_RPCS, WAIT_FOR_STATS_SEC) - elif TEST_CASE == 'round_robin': - test_round_robin(backends, NUM_TEST_RPCS, WAIT_FOR_STATS_SEC) + wait_for_healthy_backends(gcp, backend_service, instance_group) + + cmd = args.client_cmd.format(service_host=service_host_name, + service_port=gcp.service_port, + stats_port=args.stats_port, + qps=args.qps) + client_process = start_xds_client(cmd, gcp.service_port) + + if args.test_case == 'all': + test_ping_pong(gcp, backend_service, instance_group) + test_round_robin(gcp, backend_service, instance_group) + elif args.test_case == 'ping_pong': + test_ping_pong(gcp, backend_service, instance_group) + elif args.test_case == 'round_robin': + test_round_robin(gcp, backend_service, instance_group) else: - logger.error('Unknown test case: %s', TEST_CASE) + logger.error('Unknown test case: %s', args.test_case) sys.exit(1) finally: if client_process: client_process.terminate() - if not KEEP_GCP_RESOURCES: + if not args.keep_gcp_resources: logger.info('Cleaning up GCP resources. This may take some time.') - delete_global_forwarding_rule(compute, PROJECT_ID, FORWARDING_RULE_NAME) - delete_target_http_proxy(compute, PROJECT_ID, TARGET_PROXY_NAME) - delete_url_map(compute, PROJECT_ID, URL_MAP_NAME) - delete_backend_service(compute, PROJECT_ID, BACKEND_SERVICE_NAME) - delete_firewall(compute, PROJECT_ID, FIREWALL_RULE_NAME) - delete_health_check(compute, PROJECT_ID, HEALTH_CHECK_NAME) - delete_instance_group(compute, PROJECT_ID, ZONE, INSTANCE_GROUP_NAME) - delete_instance_template(compute, PROJECT_ID, TEMPLATE_NAME) + gcp.clean_up() From ac6a79a108c16707b76cc109a0125b9dfba064ae Mon Sep 17 00:00:00 2001 From: Eric Gribkoff Date: Sat, 29 Feb 2020 01:21:05 -0800 Subject: [PATCH 02/10] Add additional xds test cases --- tools/run_tests/run_xds_tests.py | 261 +++++++++++++++++++++++++++++-- 1 file changed, 252 insertions(+), 9 deletions(-) diff --git a/tools/run_tests/run_xds_tests.py b/tools/run_tests/run_xds_tests.py index 0a696655d7a..e36b7c8b748 100755 --- a/tools/run_tests/run_xds_tests.py +++ b/tools/run_tests/run_xds_tests.py @@ -53,13 +53,20 @@ argp.add_argument( default='', help='Optional suffix for all generated GCP resource names. 
Useful to ' 'ensure distinct names across test runs.') -argp.add_argument('--test_case', - default=None, - choices=[ - 'all', - 'ping_pong', - 'round_robin', - ]) +argp.add_argument( + '--test_case', + default=None, + choices=[ + 'all', + 'backends_restart', + 'change_backend_service', + 'new_instance_group_receives_traffic', + 'ping_pong', + 'remove_instance_group', + 'round_robin', + 'secondary_locality_gets_requests_on_primary_failure', + 'secondary_locality_gets_no_requests_on_partial_primary_failure', + ]) argp.add_argument( '--client_cmd', default=None, @@ -202,6 +209,78 @@ def wait_until_only_given_instances_receive_load(backends, raise Exception(error_msg) +def test_backends_restart(gcp, backend_service, instance_group): + instance_names = get_instance_names(gcp, instance_group) + num_instances = len(instance_names) + start_time = time.time() + wait_until_only_given_instances_receive_load(instance_names, + _WAIT_FOR_STATS_SEC) + stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC) + resize_instance_group(gcp, instance_group, 0) + wait_until_only_given_instances_receive_load([], + _WAIT_FOR_BACKEND_SEC, + allow_failures=True) + resize_instance_group(gcp, instance_group, num_instances) + wait_for_healthy_backends(gcp, backend_service, instance_group) + new_instance_names = get_instance_names(gcp, instance_group) + wait_until_only_given_instances_receive_load(new_instance_names, + _WAIT_FOR_BACKEND_SEC) + new_stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC) + original_distribution = list(stats.rpcs_by_peer.values()) + original_distribution.sort() + new_distribution = list(new_stats.rpcs_by_peer.values()) + new_distribution.sort() + if original_distribution != new_distribution: + raise Exception('Distributions do not match: ', stats, new_stats) + + +def test_change_backend_service(gcp, original_backend_service, instance_group, + alternate_backend_service, + same_zone_instance_group): + original_backend_instances = get_instance_names(gcp, instance_group) + alternate_backend_instances = get_instance_names(gcp, + same_zone_instance_group) + patch_backend_instances(gcp, alternate_backend_service, + [same_zone_instance_group]) + wait_for_healthy_backends(gcp, original_backend_instances, instance_group) + wait_for_healthy_backends(gcp, alternate_backend_service, + same_zone_instance_group) + wait_until_only_given_instances_receive_load(original_backend_instances, + _WAIT_FOR_STATS_SEC) + try: + patch_url_map_backend_service(gcp, alternate_backend_service) + stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC) + if stats.num_failures > 0: + raise Exception('Unexpected failure: %s', stats) + wait_until_only_given_instances_receive_load( + alternate_backend_instances, _WAIT_FOR_STATS_SEC) + finally: + patch_url_map_backend_service(gcp, original_backend_service) + patch_backend_instances(gcp, alternate_backend_service, []) + + +def test_new_instance_group_receives_traffic(gcp, backend_service, + instance_group, + same_zone_instance_group): + instance_names = get_instance_names(gcp, instance_group) + wait_until_only_given_instances_receive_load(instance_names, + _WAIT_FOR_STATS_SEC) + try: + patch_backend_instances(gcp, + backend_service, + [instance_group, same_zone_instance_group], + balancing_mode='RATE') + wait_for_healthy_backends(gcp, backend_service, instance_group) + wait_for_healthy_backends(gcp, backend_service, + same_zone_instance_group) + combined_instance_names = instance_names + get_instance_names( + gcp, same_zone_instance_group) + 
wait_until_only_given_instances_receive_load(combined_instance_names, + _WAIT_FOR_BACKEND_SEC) + finally: + patch_backend_instances(gcp, backend_service, [instance_group]) + + def test_ping_pong(gcp, backend_service, instance_group): wait_for_healthy_backends(gcp, backend_service, instance_group) instance_names = get_instance_names(gcp, instance_group) @@ -222,6 +301,32 @@ def test_ping_pong(gcp, backend_service, instance_group): raise Exception(error_msg) +def test_remove_instance_group(gcp, backend_service, instance_group, + same_zone_instance_group): + try: + patch_backend_instances(gcp, + backend_service, + [instance_group, same_zone_instance_group], + balancing_mode='RATE') + wait_for_healthy_backends(gcp, backend_service, instance_group) + wait_for_healthy_backends(gcp, backend_service, + same_zone_instance_group) + instance_names = get_instance_names(gcp, instance_group) + same_zone_instance_names = get_instance_names(gcp, + same_zone_instance_group) + wait_until_only_given_instances_receive_load( + instance_names + same_zone_instance_names, _WAIT_FOR_BACKEND_SEC) + patch_backend_instances(gcp, + backend_service, [same_zone_instance_group], + balancing_mode='RATE') + wait_until_only_given_instances_receive_load(same_zone_instance_names, + _WAIT_FOR_BACKEND_SEC) + finally: + patch_backend_instances(gcp, backend_service, [instance_group]) + wait_until_only_given_instances_receive_load(instance_names, + _WAIT_FOR_BACKEND_SEC) + + def test_round_robin(gcp, backend_service, instance_group): wait_for_healthy_backends(gcp, backend_service, instance_group) instance_names = get_instance_names(gcp, instance_group) @@ -242,6 +347,61 @@ def test_round_robin(gcp, backend_service, instance_group): 'for instance %s (%s)', threshold, instance, stats) +def test_secondary_locality_gets_no_requests_on_partial_primary_failure( + gcp, backend_service, primary_instance_group, + secondary_zone_instance_group): + try: + patch_backend_instances( + gcp, backend_service, + [primary_instance_group, secondary_zone_instance_group]) + wait_for_healthy_backends(gcp, backend_service, primary_instance_group) + wait_for_healthy_backends(gcp, backend_service, + secondary_zone_instance_group) + primary_instance_names = get_instance_names(gcp, instance_group) + secondary_instance_names = get_instance_names( + gcp, secondary_zone_instance_group) + wait_until_only_given_instances_receive_load(primary_instance_names, + _WAIT_FOR_STATS_SEC) + original_size = len(primary_instance_names) + resize_instance_group(gcp, primary_instance_group, original_size - 1) + remaining_instance_names = get_instance_names(gcp, + primary_instance_group) + wait_until_only_given_instances_receive_load(remaining_instance_names, + _WAIT_FOR_BACKEND_SEC) + finally: + patch_backend_instances(gcp, backend_service, [primary_instance_group]) + resize_instance_group(gcp, primary_instance_group, original_size) + + +def test_secondary_locality_gets_requests_on_primary_failure( + gcp, backend_service, primary_instance_group, + secondary_zone_instance_group): + try: + patch_backend_instances( + gcp, backend_service, + [primary_instance_group, secondary_zone_instance_group]) + wait_for_healthy_backends(gcp, backend_service, primary_instance_group) + wait_for_healthy_backends(gcp, backend_service, + secondary_zone_instance_group) + primary_instance_names = get_instance_names(gcp, instance_group) + secondary_instance_names = get_instance_names( + gcp, secondary_zone_instance_group) + wait_until_only_given_instances_receive_load(primary_instance_names, + 
_WAIT_FOR_BACKEND_SEC) + original_size = len(primary_instance_names) + resize_instance_group(gcp, primary_instance_group, 0) + wait_until_only_given_instances_receive_load(secondary_instance_names, + _WAIT_FOR_BACKEND_SEC) + + resize_instance_group(gcp, primary_instance_group, original_size) + new_instance_names = get_instance_names(gcp, primary_instance_group) + wait_for_healthy_backends(gcp, backend_service, primary_instance_group) + wait_until_only_given_instances_receive_load(new_instance_names, + _WAIT_FOR_BACKEND_SEC) + finally: + patch_backend_instances(gcp, backend_service, [primary_instance_group]) + + def create_instance_template(gcp, name, network, source_image): config = { 'name': name, @@ -496,6 +656,58 @@ def delete_instance_template(gcp): logger.info('Delete failed: %s', http_error) +def patch_backend_instances(gcp, + backend_service, + instance_groups, + balancing_mode='UTILIZATION'): + config = { + 'backends': [{ + 'group': instance_group.url, + 'balancingMode': balancing_mode, + 'maxRate': 1 if balancing_mode == 'RATE' else None + } for instance_group in instance_groups], + } + result = gcp.compute.backendServices().patch( + project=gcp.project, backendService=backend_service.name, + body=config).execute() + wait_for_global_operation(gcp, result['name']) + + +def resize_instance_group(gcp, instance_group, new_size, timeout_sec=120): + result = gcp.compute.instanceGroupManagers().resize( + project=gcp.project, + zone=instance_group.zone, + instanceGroupManager=instance_group.name, + size=new_size).execute() + wait_for_zone_operation(gcp, + instance_group.zone, + result['name'], + timeout_sec=360) + start_time = time.time() + while True: + current_size = len(get_instance_names(gcp, instance_group)) + if current_size == new_size: + break + if time.time() - start_time > timeout_sec: + raise Exception('Failed to resize primary instance group') + time.sleep(1) + + +def patch_url_map_backend_service(gcp, backend_service): + config = { + 'defaultService': + backend_service.url, + 'pathMatchers': [{ + 'name': _PATH_MATCHER_NAME, + 'defaultService': backend_service.url, + }] + } + result = gcp.compute.urlMaps().patch(project=gcp.project, + urlMap=gcp.url_map.name, + body=config).execute() + wait_for_global_operation(gcp, result['name']) + + def wait_for_global_operation(gcp, operation, timeout_sec=_WAIT_FOR_OPERATION_SEC): @@ -665,8 +877,7 @@ try: backend_service = add_backend_service(gcp, backend_service_name) alternate_backend_service = add_backend_service( gcp, alternate_backend_service_name) - create_url_map(gcp, url_map_name, gcp.backend_services[0], - service_host_name) + create_url_map(gcp, url_map_name, backend_service, service_host_name) create_target_http_proxy(gcp, target_http_proxy_name) potential_service_ports = list(args.service_port_range) random.shuffle(potential_service_ports) @@ -766,12 +977,44 @@ try: client_process = start_xds_client(cmd, gcp.service_port) if args.test_case == 'all': + test_backends_restart(gcp, backend_service, instance_group) + test_change_backend_service(gcp, backend_service, instance_group, + alternate_backend_service, + same_zone_instance_group) + test_new_instance_group_receives_traffic(gcp, backend_service, + instance_group, + same_zone_instance_group) test_ping_pong(gcp, backend_service, instance_group) + test_remove_instance_group(gcp, backend_service, instance_group, + same_zone_instance_group) test_round_robin(gcp, backend_service, instance_group) + test_secondary_locality_gets_no_requests_on_partial_primary_failure( + gcp, 
backend_service, instance_group, secondary_zone_instance_group) + test_secondary_locality_gets_requests_on_primary_failure( + gcp, backend_service, instance_group, secondary_zone_instance_group) + elif args.test_case == 'backends_restart': + test_backends_restart(gcp, backend_service, instance_group) + elif args.test_case == 'change_backend_service': + test_change_backend_service(gcp, backend_service, instance_group, + alternate_backend_service, + same_zone_instance_group) + elif args.test_case == 'new_instance_group_receives_traffic': + test_new_instance_group_receives_traffic(gcp, backend_service, + instance_group, + same_zone_instance_group) elif args.test_case == 'ping_pong': test_ping_pong(gcp, backend_service, instance_group) + elif args.test_case == 'remove_instance_group': + test_remove_instance_group(gcp, backend_service, instance_group, + same_zone_instance_group) elif args.test_case == 'round_robin': test_round_robin(gcp, backend_service, instance_group) + elif args.test_case == 'secondary_locality_gets_no_requests_on_partial_primary_failure': + test_secondary_locality_gets_no_requests_on_partial_primary_failure( + gcp, backend_service, instance_group, secondary_zone_instance_group) + elif args.test_case == 'secondary_locality_gets_requests_on_primary_failure': + test_secondary_locality_gets_requests_on_primary_failure( + gcp, backend_service, instance_group, secondary_zone_instance_group) else: logger.error('Unknown test case: %s', args.test_case) sys.exit(1) From 4a594a101179e4604274892ebf2120f89f67eda8 Mon Sep 17 00:00:00 2001 From: Eric Gribkoff Date: Sat, 29 Feb 2020 02:42:07 -0800 Subject: [PATCH 03/10] fix typo --- tools/run_tests/run_xds_tests.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/run_tests/run_xds_tests.py b/tools/run_tests/run_xds_tests.py index e36b7c8b748..1a407ea11f5 100755 --- a/tools/run_tests/run_xds_tests.py +++ b/tools/run_tests/run_xds_tests.py @@ -242,7 +242,7 @@ def test_change_backend_service(gcp, original_backend_service, instance_group, same_zone_instance_group) patch_backend_instances(gcp, alternate_backend_service, [same_zone_instance_group]) - wait_for_healthy_backends(gcp, original_backend_instances, instance_group) + wait_for_healthy_backends(gcp, original_backend_service, instance_group) wait_for_healthy_backends(gcp, alternate_backend_service, same_zone_instance_group) wait_until_only_given_instances_receive_load(original_backend_instances, From 1cec654ac1abdf25b790a0bd44b5aa6c9259fd07 Mon Sep 17 00:00:00 2001 From: Eric Gribkoff Date: Mon, 2 Mar 2020 15:47:56 -0800 Subject: [PATCH 04/10] wait longer for deletion --- tools/run_tests/run_xds_tests.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tools/run_tests/run_xds_tests.py b/tools/run_tests/run_xds_tests.py index 1a407ea11f5..044428add2a 100755 --- a/tools/run_tests/run_xds_tests.py +++ b/tools/run_tests/run_xds_tests.py @@ -637,11 +637,10 @@ def delete_instance_groups(gcp): project=gcp.project, zone=instance_group.zone, instanceGroupManager=instance_group.name).execute() - timeout_sec = 180 # Deleting an instance group can be slow wait_for_zone_operation(gcp, instance_group.zone, result['name'], - timeout_sec=timeout_sec) + timeout_sec=_WAIT_FOR_BACKEND_SEC) except googleapiclient.errors.HttpError as http_error: logger.info('Delete failed: %s', http_error) From c1677cd8a4001196a45fe5acdbfbfd02c419ca3b Mon Sep 17 00:00:00 2001 From: Eric Gribkoff Date: Mon, 2 Mar 2020 16:25:21 -0800 Subject: [PATCH 05/10] try-finally --- 
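[Reviewer note, outside the commit message: the hunk below wraps the scale-to-zero step of test_backends_restart in try/finally so the instance group is restored to its original size even when the intermediate verification raises. A minimal sketch of that restore-on-exit pattern, using hypothetical resize/verify callables rather than the script's real helpers:

    def scale_down_then_restore(resize, verify, original_size):
        try:
            resize(0)
            verify()  # may raise; the finally clause still restores the size
        finally:
            resize(original_size)
]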
tools/run_tests/run_xds_tests.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/tools/run_tests/run_xds_tests.py b/tools/run_tests/run_xds_tests.py index 044428add2a..bdf61d37d23 100755 --- a/tools/run_tests/run_xds_tests.py +++ b/tools/run_tests/run_xds_tests.py @@ -216,11 +216,13 @@ def test_backends_restart(gcp, backend_service, instance_group): wait_until_only_given_instances_receive_load(instance_names, _WAIT_FOR_STATS_SEC) stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC) - resize_instance_group(gcp, instance_group, 0) - wait_until_only_given_instances_receive_load([], - _WAIT_FOR_BACKEND_SEC, - allow_failures=True) - resize_instance_group(gcp, instance_group, num_instances) + try: + resize_instance_group(gcp, instance_group, 0) + wait_until_only_given_instances_receive_load([], + _WAIT_FOR_BACKEND_SEC, + allow_failures=True) + finally: + resize_instance_group(gcp, instance_group, num_instances) wait_for_healthy_backends(gcp, backend_service, instance_group) new_instance_names = get_instance_names(gcp, instance_group) wait_until_only_given_instances_receive_load(new_instance_names, From 0ddf5565e3ad9174e44057bad0267a518ec3083c Mon Sep 17 00:00:00 2001 From: Eric Gribkoff Date: Mon, 2 Mar 2020 18:20:44 -0800 Subject: [PATCH 06/10] yapf --- tools/run_tests/run_xds_tests.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tools/run_tests/run_xds_tests.py b/tools/run_tests/run_xds_tests.py index bdf61d37d23..613d38f84fc 100755 --- a/tools/run_tests/run_xds_tests.py +++ b/tools/run_tests/run_xds_tests.py @@ -350,8 +350,8 @@ def test_round_robin(gcp, backend_service, instance_group): def test_secondary_locality_gets_no_requests_on_partial_primary_failure( - gcp, backend_service, primary_instance_group, - secondary_zone_instance_group): + gcp, backend_service, primary_instance_group, + secondary_zone_instance_group): try: patch_backend_instances( gcp, backend_service, @@ -376,8 +376,8 @@ def test_secondary_locality_gets_no_requests_on_partial_primary_failure( def test_secondary_locality_gets_requests_on_primary_failure( - gcp, backend_service, primary_instance_group, - secondary_zone_instance_group): + gcp, backend_service, primary_instance_group, + secondary_zone_instance_group): try: patch_backend_instances( gcp, backend_service, From 033695da8b1683375267834362d51771766de21f Mon Sep 17 00:00:00 2001 From: Eric Gribkoff Date: Wed, 4 Mar 2020 12:16:30 -0800 Subject: [PATCH 07/10] reviewer comments --- tools/run_tests/run_xds_tests.py | 153 ++++++++++++++++--------------- 1 file changed, 79 insertions(+), 74 deletions(-) diff --git a/tools/run_tests/run_xds_tests.py b/tools/run_tests/run_xds_tests.py index 613d38f84fc..5ebf4d10886 100755 --- a/tools/run_tests/run_xds_tests.py +++ b/tools/run_tests/run_xds_tests.py @@ -55,7 +55,7 @@ argp.add_argument( 'ensure distinct names across test runs.') argp.add_argument( '--test_case', - default=None, + default='ping_pong', choices=[ 'all', 'backends_restart', @@ -64,8 +64,8 @@ argp.add_argument( 'ping_pong', 'remove_instance_group', 'round_robin', - 'secondary_locality_gets_requests_on_primary_failure', 'secondary_locality_gets_no_requests_on_partial_primary_failure', + 'secondary_locality_gets_requests_on_primary_failure', ]) argp.add_argument( '--client_cmd', @@ -184,13 +184,11 @@ def get_client_stats(num_rpcs, timeout_sec): raise Exception('GetClientStats RPC failed') -def wait_until_only_given_instances_receive_load(backends, - timeout_sec, - num_rpcs=100, - 
allow_failures=False):
+def _verify_rpcs_to_given_backends(backends, timeout_sec, num_rpcs,
+                                   allow_failures):
     start_time = time.time()
     error_msg = None
-    logger.debug('Waiting for %d sec until backends %s receive load' %
+    logger.debug('Waiting for %d sec until backends %s receive load' %
                  (timeout_sec, backends))
     while time.time() - start_time <= timeout_sec:
         error_msg = None
@@ -209,31 +207,50 @@ def wait_until_only_given_instances_receive_load(backends,
     raise Exception(error_msg)


+def wait_until_all_rpcs_go_to_given_backends_or_fail(backends,
+                                                     timeout_sec,
+                                                     num_rpcs=100):
+    _verify_rpcs_to_given_backends(backends,
+                                   timeout_sec,
+                                   num_rpcs,
+                                   allow_failures=True)
+
+
+def wait_until_all_rpcs_go_to_given_backends(backends,
+                                             timeout_sec,
+                                             num_rpcs=100):
+    _verify_rpcs_to_given_backends(backends,
+                                   timeout_sec,
+                                   num_rpcs,
+                                   allow_failures=False)
+
+
 def test_backends_restart(gcp, backend_service, instance_group):
     instance_names = get_instance_names(gcp, instance_group)
     num_instances = len(instance_names)
     start_time = time.time()
-    wait_until_only_given_instances_receive_load(instance_names,
-                                                 _WAIT_FOR_STATS_SEC)
+    wait_until_all_rpcs_go_to_given_backends(instance_names,
+                                             _WAIT_FOR_STATS_SEC)
     stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
     try:
         resize_instance_group(gcp, instance_group, 0)
-        wait_until_only_given_instances_receive_load([],
-                                                     _WAIT_FOR_BACKEND_SEC,
-                                                     allow_failures=True)
+        wait_until_all_rpcs_go_to_given_backends_or_fail([],
+                                                         _WAIT_FOR_BACKEND_SEC)
     finally:
         resize_instance_group(gcp, instance_group, num_instances)
         wait_for_healthy_backends(gcp, backend_service, instance_group)
         new_instance_names = get_instance_names(gcp, instance_group)
-        wait_until_only_given_instances_receive_load(new_instance_names,
-                                                     _WAIT_FOR_BACKEND_SEC)
+        wait_until_all_rpcs_go_to_given_backends(new_instance_names,
+                                                 _WAIT_FOR_BACKEND_SEC)
     new_stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
     original_distribution = list(stats.rpcs_by_peer.values())
     original_distribution.sort()
     new_distribution = list(new_stats.rpcs_by_peer.values())
     new_distribution.sort()
-    if original_distribution != new_distribution:
-        raise Exception('Distributions do not match: ', stats, new_stats)
+    error_threshold = 3
+    for i in range(len(original_distribution)):
+        if abs(original_distribution[i] - new_distribution[i]) > threshold:
+            raise Exception('Distributions do not match: ', stats, new_stats)
@@ -247,15 +264,15 @@ def test_change_backend_service(gcp, original_backend_service, instance_group,
     wait_for_healthy_backends(gcp, original_backend_service, instance_group)
     wait_for_healthy_backends(gcp, alternate_backend_service,
                               same_zone_instance_group)
-    wait_until_only_given_instances_receive_load(original_backend_instances,
-                                                 _WAIT_FOR_STATS_SEC)
+    wait_until_all_rpcs_go_to_given_backends(original_backend_instances,
+                                             _WAIT_FOR_STATS_SEC)
     try:
         patch_url_map_backend_service(gcp, alternate_backend_service)
         stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
         if stats.num_failures > 0:
             raise Exception('Unexpected failure: %s', stats)
-        wait_until_only_given_instances_receive_load(
-            alternate_backend_instances, _WAIT_FOR_STATS_SEC)
+        wait_until_all_rpcs_go_to_given_backends(alternate_backend_instances,
+                                                 _WAIT_FOR_STATS_SEC)
     finally:
         patch_url_map_backend_service(gcp, original_backend_service)
         patch_backend_instances(gcp, alternate_backend_service, [])
@@ -265,8 +282,8 @@ def test_new_instance_group_receives_traffic(gcp, backend_service,
                                              instance_group,
                                              same_zone_instance_group):
     instance_names = get_instance_names(gcp, instance_group)
-    wait_until_only_given_instances_receive_load(instance_names,
-                                                 _WAIT_FOR_STATS_SEC)
+    wait_until_all_rpcs_go_to_given_backends(instance_names,
+                                             _WAIT_FOR_STATS_SEC)
     try:
         patch_backend_instances(gcp, backend_service,
@@ -277,8 +294,8 @@ def test_new_instance_group_receives_traffic(gcp, backend_service,
                                 same_zone_instance_group)
         combined_instance_names = instance_names + get_instance_names(
             gcp, same_zone_instance_group)
-        wait_until_only_given_instances_receive_load(combined_instance_names,
-                                                     _WAIT_FOR_BACKEND_SEC)
+        wait_until_all_rpcs_go_to_given_backends(combined_instance_names,
+                                                 _WAIT_FOR_BACKEND_SEC)
     finally:
         patch_backend_instances(gcp, backend_service, [instance_group])
@@ -286,20 +303,8 @@ def test_new_instance_group_receives_traffic(gcp, backend_service,
 def test_ping_pong(gcp, backend_service, instance_group):
     wait_for_healthy_backends(gcp, backend_service, instance_group)
     instance_names = get_instance_names(gcp, instance_group)
-    start_time = time.time()
-    error_msg = None
-    while time.time() - start_time <= _WAIT_FOR_STATS_SEC:
-        error_msg = None
-        stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
-        rpcs_by_peer = stats.rpcs_by_peer
-        for instance in instance_names:
-            if instance not in rpcs_by_peer:
-                error_msg = 'Instance %s did not receive load' % instance
-                break
-        if not error_msg and len(rpcs_by_peer) > len(instance_names):
-            error_msg = 'Unexpected instance received load: %s' % rpcs_by_peer
-        if not error_msg:
-            return
+    wait_until_all_rpcs_go_to_given_backends(instance_names,
+                                             _WAIT_FOR_STATS_SEC)
     raise Exception(error_msg)
@@ -316,29 +321,28 @@ def test_remove_instance_group(gcp, backend_service, instance_group,
         instance_names = get_instance_names(gcp, instance_group)
         same_zone_instance_names = get_instance_names(gcp,
                                                       same_zone_instance_group)
-        wait_until_only_given_instances_receive_load(
+        wait_until_all_rpcs_go_to_given_backends(
            instance_names + same_zone_instance_names, _WAIT_FOR_BACKEND_SEC)
         patch_backend_instances(gcp,
                                 backend_service, [same_zone_instance_group],
                                 balancing_mode='RATE')
-        wait_until_only_given_instances_receive_load(same_zone_instance_names,
-                                                     _WAIT_FOR_BACKEND_SEC)
+        wait_until_all_rpcs_go_to_given_backends(same_zone_instance_names,
+                                                 _WAIT_FOR_BACKEND_SEC)
     finally:
         patch_backend_instances(gcp, backend_service, [instance_group])
-        wait_until_only_given_instances_receive_load(instance_names,
-                                                     _WAIT_FOR_BACKEND_SEC)
+        wait_until_all_rpcs_go_to_given_backends(instance_names,
+                                                 _WAIT_FOR_BACKEND_SEC)


 def test_round_robin(gcp, backend_service, instance_group):
     wait_for_healthy_backends(gcp, backend_service, instance_group)
     instance_names = get_instance_names(gcp, instance_group)
     threshold = 1
-    wait_until_only_given_instances_receive_load(instance_names,
-                                                 _WAIT_FOR_STATS_SEC)
+    wait_until_all_rpcs_go_to_given_backends(instance_names,
+                                             _WAIT_FOR_STATS_SEC)
     stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
     requests_received = [stats.rpcs_by_peer[x] for x in stats.rpcs_by_peer]
-    total_requests_received = sum(
-        [stats.rpcs_by_peer[x] for x in stats.rpcs_by_peer])
+    total_requests_received = sum(requests_received)
     if total_requests_received != _NUM_TEST_RPCS:
         raise Exception('Unexpected RPC failures', stats)
     expected_requests = total_requests_received / len(instance_names)
@@ -362,14 +366,14 @@ def test_secondary_locality_gets_no_requests_on_partial_primary_failure(
         primary_instance_names = get_instance_names(gcp, instance_group)
         secondary_instance_names = get_instance_names(
             gcp, secondary_zone_instance_group)
-        wait_until_only_given_instances_receive_load(primary_instance_names,
-                                                     _WAIT_FOR_STATS_SEC)
+        wait_until_all_rpcs_go_to_given_backends(primary_instance_names,
+                                                 _WAIT_FOR_STATS_SEC)
         original_size = len(primary_instance_names)
         resize_instance_group(gcp, primary_instance_group, original_size - 1)
         remaining_instance_names = get_instance_names(gcp,
                                                       primary_instance_group)
-        wait_until_only_given_instances_receive_load(remaining_instance_names,
-                                                     _WAIT_FOR_BACKEND_SEC)
+        wait_until_all_rpcs_go_to_given_backends(remaining_instance_names,
+                                                 _WAIT_FOR_BACKEND_SEC)
     finally:
         patch_backend_instances(gcp, backend_service, [primary_instance_group])
         resize_instance_group(gcp, primary_instance_group, original_size)
@@ -388,18 +392,18 @@ def test_secondary_locality_gets_requests_on_primary_failure(
         primary_instance_names = get_instance_names(gcp, instance_group)
         secondary_instance_names = get_instance_names(
             gcp, secondary_zone_instance_group)
-        wait_until_only_given_instances_receive_load(primary_instance_names,
-                                                     _WAIT_FOR_BACKEND_SEC)
+        wait_until_all_rpcs_go_to_given_backends(primary_instance_names,
+                                                 _WAIT_FOR_BACKEND_SEC)
         original_size = len(primary_instance_names)
         resize_instance_group(gcp, primary_instance_group, 0)
-        wait_until_only_given_instances_receive_load(secondary_instance_names,
-                                                     _WAIT_FOR_BACKEND_SEC)
+        wait_until_all_rpcs_go_to_given_backends(secondary_instance_names,
+                                                 _WAIT_FOR_BACKEND_SEC)
         resize_instance_group(gcp, primary_instance_group, original_size)
         new_instance_names = get_instance_names(gcp, primary_instance_group)
         wait_for_healthy_backends(gcp, backend_service, primary_instance_group)
-        wait_until_only_given_instances_receive_load(new_instance_names,
-                                                     _WAIT_FOR_BACKEND_SEC)
+        wait_until_all_rpcs_go_to_given_backends(new_instance_names,
+                                                 _WAIT_FOR_BACKEND_SEC)
     finally:
         patch_backend_instances(gcp, backend_service, [primary_instance_group])
@@ -802,6 +806,23 @@ def start_xds_client(cmd, service_port):
     return client_process


+def clean_up(gcp):
+    if gcp.global_forwarding_rule:
+        delete_global_forwarding_rule(gcp)
+    if gcp.target_http_proxy:
+        delete_target_http_proxy(gcp)
+    if gcp.url_map:
+        delete_url_map(gcp)
+    delete_backend_services(gcp)
+    if gcp.health_check_firewall_rule:
+        delete_firewall(gcp)
+    if gcp.health_check:
+        delete_health_check(gcp)
+    delete_instance_groups(gcp)
+    if gcp.instance_template:
+        delete_instance_template(gcp)
+
+
 class InstanceGroup(object):

     def __init__(self, name, url, zone):
@@ -832,22 +853,6 @@ class GcpState(object):
         self.instance_template = None
         self.instance_groups = []

-    def clean_up(self):
-        if self.global_forwarding_rule:
-            delete_global_forwarding_rule(self)
-        if self.target_http_proxy:
-            delete_target_http_proxy(self)
-        if self.url_map:
-            delete_url_map(self)
-        delete_backend_services(self)
-        if self.health_check_firewall_rule:
-            delete_firewall(self)
-        if self.health_check:
-            delete_health_check(self)
-        delete_instance_groups(self)
-        if self.instance_template:
-            delete_instance_template(self)
-

 if args.compute_discovery_document:
     with open(args.compute_discovery_document, 'r') as discovery_doc:
@@ -1024,4 +1029,4 @@ finally:
     client_process.terminate()
     if not args.keep_gcp_resources:
         logger.info('Cleaning up GCP resources. This may take some time.')
-        gcp.clean_up()
+        clean_up(gcp)

From 58bdf6995c12df30cd6f6faaf908bb3149fbcd49 Mon Sep 17 00:00:00 2001
From: Eric Gribkoff
Date: Wed, 4 Mar 2020 13:05:00 -0800
Subject: [PATCH 08/10] typo

---
 tools/run_tests/run_xds_tests.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tools/run_tests/run_xds_tests.py b/tools/run_tests/run_xds_tests.py
index 5ebf4d10886..3904dd1f4f1 100755
--- a/tools/run_tests/run_xds_tests.py
+++ b/tools/run_tests/run_xds_tests.py
@@ -247,7 +247,7 @@ def test_backends_restart(gcp, backend_service, instance_group):
     original_distribution.sort()
     new_distribution = list(new_stats.rpcs_by_peer.values())
     new_distribution.sort()
-    error_threshold = 3
+    threshold = 3
     for i in range(len(original_distribution)):
         if abs(original_distribution[i] - new_distribution[i]) > threshold:
             raise Exception('Distributions do not match: ', stats, new_stats)
@@ -611,7 +611,7 @@ def delete_backend_services(gcp):
         try:
             result = gcp.compute.backendServices().delete(
                 project=gcp.project,
-                backendService=gcp.backend_service.name).execute()
+                backendService=backend_service.name).execute()
             wait_for_global_operation(gcp, result['name'])
         except googleapiclient.errors.HttpError as http_error:
             logger.info('Delete failed: %s', http_error)

From 8562a13fab0ef6e1be1be9c92087bdf49c64793c Mon Sep 17 00:00:00 2001
From: Eric Gribkoff
Date: Wed, 4 Mar 2020 13:31:06 -0800
Subject: [PATCH 09/10] fix line

---
 tools/run_tests/run_xds_tests.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/tools/run_tests/run_xds_tests.py b/tools/run_tests/run_xds_tests.py
index 3904dd1f4f1..1bf3d9a4cbc 100755
--- a/tools/run_tests/run_xds_tests.py
+++ b/tools/run_tests/run_xds_tests.py
@@ -305,7 +305,6 @@ def test_ping_pong(gcp, backend_service, instance_group):
     instance_names = get_instance_names(gcp, instance_group)
     wait_until_all_rpcs_go_to_given_backends(instance_names,
                                              _WAIT_FOR_STATS_SEC)
-    raise Exception(error_msg)


 def test_remove_instance_group(gcp, backend_service, instance_group,

From 1816401abfbde542a6e644fa0234ef02ab639b6e Mon Sep 17 00:00:00 2001
From: Eric Gribkoff
Date: Mon, 9 Mar 2020 10:40:37 -0700
Subject: [PATCH 10/10] remove unused param

---
 tools/run_tests/run_xds_tests.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tools/run_tests/run_xds_tests.py b/tools/run_tests/run_xds_tests.py
index 1bf3d9a4cbc..b5991d1d8a0 100755
--- a/tools/run_tests/run_xds_tests.py
+++ b/tools/run_tests/run_xds_tests.py
@@ -790,7 +790,7 @@ def get_instance_names(gcp, instance_group):
     return instance_names


-def start_xds_client(cmd, service_port):
+def start_xds_client(cmd):
     bootstrap_path = None
     with tempfile.NamedTemporaryFile(delete=False) as bootstrap_file:
         bootstrap_file.write(
@@ -979,7 +979,7 @@ try:
                                  service_port=gcp.service_port,
                                  stats_port=args.stats_port,
                                  qps=args.qps)
-    client_process = start_xds_client(cmd, gcp.service_port)
+    client_process = start_xds_client(cmd)

     if args.test_case == 'all':
         test_backends_restart(gcp, backend_service, instance_group)