diff --git a/tools/run_tests/run_xds_tests.py b/tools/run_tests/run_xds_tests.py index 1b5a771474d..b5991d1d8a0 100755 --- a/tools/run_tests/run_xds_tests.py +++ b/tools/run_tests/run_xds_tests.py @@ -51,11 +51,22 @@ argp.add_argument('--project_id', help='GCP project id') argp.add_argument( '--gcp_suffix', default='', - help='Optional suffix for all generated GCP resource names. Useful to ensure ' - 'distinct names across test runs.') -argp.add_argument('--test_case', - default=None, - choices=['all', 'ping_pong', 'round_robin']) + help='Optional suffix for all generated GCP resource names. Useful to ' + 'ensure distinct names across test runs.') +argp.add_argument( + '--test_case', + default='ping_pong', + choices=[ + 'all', + 'backends_restart', + 'change_backend_service', + 'new_instance_group_receives_traffic', + 'ping_pong', + 'remove_instance_group', + 'round_robin', + 'secondary_locality_gets_no_requests_on_partial_primary_failure', + 'secondary_locality_gets_requests_on_primary_failure', + ]) argp.add_argument( '--client_cmd', default=None, @@ -63,12 +74,15 @@ argp.add_argument( '{service_host}, {service_port},{stats_port} and {qps} parameters using ' 'str.format(), and generate the GRPC_XDS_BOOTSTRAP file.') argp.add_argument('--zone', default='us-central1-a') +argp.add_argument('--secondary_zone', + default='us-west1-b', + help='Zone to use for secondary TD locality tests') argp.add_argument('--qps', default=10, help='Client QPS') argp.add_argument( '--wait_for_backend_sec', - default=900, - help='Time limit for waiting for created backend services to report healthy ' - 'when launching test suite') + default=600, + help='Time limit for waiting for created backend services to report ' + 'healthy when launching or updated GCP resources') argp.add_argument( '--keep_gcp_resources', default=False, @@ -81,13 +95,13 @@ argp.add_argument( default=None, type=str, help= - 'If provided, uses this file instead of retrieving via the GCP discovery API' -) + 'If 
provided, uses this file instead of retrieving via the GCP discovery ' + 'API') argp.add_argument('--network', default='global/networks/default', help='GCP network to use') argp.add_argument('--service_port_range', - default='8080:8180', + default='8080:8100', type=parse_port_range, help='Listening port for created gRPC backends. Specified as ' 'either a single int or as a range in the format min:max, in ' @@ -115,35 +129,18 @@ argp.add_argument( argp.add_argument('--verbose', help='verbose log output', default=False, - action="store_true") + action='store_true') args = argp.parse_args() if args.verbose: logger.setLevel(logging.DEBUG) -PROJECT_ID = args.project_id -ZONE = args.zone -QPS = args.qps -TEST_CASE = args.test_case -CLIENT_CMD = args.client_cmd -WAIT_FOR_BACKEND_SEC = args.wait_for_backend_sec -TEMPLATE_NAME = 'test-template' + args.gcp_suffix -INSTANCE_GROUP_NAME = 'test-ig' + args.gcp_suffix -HEALTH_CHECK_NAME = 'test-hc' + args.gcp_suffix -FIREWALL_RULE_NAME = 'test-fw-rule' + args.gcp_suffix -BACKEND_SERVICE_NAME = 'test-backend-service' + args.gcp_suffix -URL_MAP_NAME = 'test-map' + args.gcp_suffix -SERVICE_HOST = 'grpc-test' + args.gcp_suffix -TARGET_PROXY_NAME = 'test-target-proxy' + args.gcp_suffix -FORWARDING_RULE_NAME = 'test-forwarding-rule' + args.gcp_suffix -KEEP_GCP_RESOURCES = args.keep_gcp_resources -TOLERATE_GCP_ERRORS = args.tolerate_gcp_errors -STATS_PORT = args.stats_port -INSTANCE_GROUP_SIZE = 2 -WAIT_FOR_OPERATION_SEC = 60 -NUM_TEST_RPCS = 10 * QPS -WAIT_FOR_STATS_SEC = 30 -BOOTSTRAP_TEMPLATE = """ +_WAIT_FOR_BACKEND_SEC = args.wait_for_backend_sec +_WAIT_FOR_OPERATION_SEC = 60 +_INSTANCE_GROUP_SIZE = 2 +_NUM_TEST_RPCS = 10 * args.qps +_WAIT_FOR_STATS_SEC = 60 +_BOOTSTRAP_TEMPLATE = """ {{ "node": {{ "id": "{node_id}" @@ -158,10 +155,20 @@ BOOTSTRAP_TEMPLATE = """ ] }}] }}""" % args.xds_server +_PATH_MATCHER_NAME = 'path-matcher' +_BASE_TEMPLATE_NAME = 'test-template' +_BASE_INSTANCE_GROUP_NAME = 'test-ig' +_BASE_HEALTH_CHECK_NAME = 
'test-hc' +_BASE_FIREWALL_RULE_NAME = 'test-fw-rule' +_BASE_BACKEND_SERVICE_NAME = 'test-backend-service' +_BASE_URL_MAP_NAME = 'test-map' +_BASE_SERVICE_HOST = 'grpc-test' +_BASE_TARGET_PROXY_NAME = 'test-target-proxy' +_BASE_FORWARDING_RULE_NAME = 'test-forwarding-rule' def get_client_stats(num_rpcs, timeout_sec): - with grpc.insecure_channel('localhost:%d' % STATS_PORT) as channel: + with grpc.insecure_channel('localhost:%d' % args.stats_port) as channel: stub = test_pb2_grpc.LoadBalancerStatsServiceStub(channel) request = messages_pb2.LoadBalancerStatsRequest() request.num_rpcs = num_rpcs @@ -177,12 +184,15 @@ def get_client_stats(num_rpcs, timeout_sec): raise Exception('GetClientStats RPC failed') -def wait_until_only_given_backends_receive_load(backends, timeout_sec): +def _verify_rpcs_to_given_backends(backends, timeout_sec, num_rpcs, + allow_failures): start_time = time.time() error_msg = None + logger.debug('Waiting for %d sec until backends %s receive load' % + (timeout_sec, backends)) while time.time() - start_time <= timeout_sec: error_msg = None - stats = get_client_stats(max(len(backends), 1), timeout_sec) + stats = get_client_stats(num_rpcs, timeout_sec) rpcs_by_peer = stats.rpcs_by_peer for backend in backends: if backend not in rpcs_by_peer: @@ -190,52 +200,219 @@ def wait_until_only_given_backends_receive_load(backends, timeout_sec): break if not error_msg and len(rpcs_by_peer) > len(backends): error_msg = 'Unexpected backend received load: %s' % rpcs_by_peer + if not allow_failures and stats.num_failures > 0: + error_msg = '%d RPCs failed' % stats.num_failures if not error_msg: return raise Exception(error_msg) -def test_ping_pong(backends, num_rpcs, stats_timeout_sec): - start_time = time.time() - error_msg = None - while time.time() - start_time <= stats_timeout_sec: - error_msg = None - stats = get_client_stats(num_rpcs, stats_timeout_sec) - rpcs_by_peer = stats.rpcs_by_peer - for backend in backends: - if backend not in rpcs_by_peer: - 
error_msg = 'Backend %s did not receive load' % backend - break - if not error_msg and len(rpcs_by_peer) > len(backends): - error_msg = 'Unexpected backend received load: %s' % rpcs_by_peer - if not error_msg: - return - raise Exception(error_msg) +def wait_until_all_rpcs_go_to_given_backends_or_fail(backends, + timeout_sec, + num_rpcs=100): + _verify_rpcs_to_given_backends(backends, + timeout_sec, + num_rpcs, + allow_failures=True) + +def wait_until_all_rpcs_go_to_given_backends(backends, + timeout_sec, + num_rpcs=100): + _verify_rpcs_to_given_backends(backends, + timeout_sec, + num_rpcs, + allow_failures=False) -def test_round_robin(backends, num_rpcs, stats_timeout_sec): + +def test_backends_restart(gcp, backend_service, instance_group): + instance_names = get_instance_names(gcp, instance_group) + num_instances = len(instance_names) + start_time = time.time() + wait_until_all_rpcs_go_to_given_backends(instance_names, + _WAIT_FOR_STATS_SEC) + stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC) + try: + resize_instance_group(gcp, instance_group, 0) + wait_until_all_rpcs_go_to_given_backends_or_fail([], + _WAIT_FOR_BACKEND_SEC) + finally: + resize_instance_group(gcp, instance_group, num_instances) + wait_for_healthy_backends(gcp, backend_service, instance_group) + new_instance_names = get_instance_names(gcp, instance_group) + wait_until_all_rpcs_go_to_given_backends(new_instance_names, + _WAIT_FOR_BACKEND_SEC) + new_stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC) + original_distribution = list(stats.rpcs_by_peer.values()) + original_distribution.sort() + new_distribution = list(new_stats.rpcs_by_peer.values()) + new_distribution.sort() + threshold = 3 + for i in range(len(original_distribution)): + if abs(original_distribution[i] - new_distribution[i]) > threshold: + raise Exception('Distributions do not match: ', stats, new_stats) + + +def test_change_backend_service(gcp, original_backend_service, instance_group, + 
alternate_backend_service, + same_zone_instance_group): + original_backend_instances = get_instance_names(gcp, instance_group) + alternate_backend_instances = get_instance_names(gcp, + same_zone_instance_group) + patch_backend_instances(gcp, alternate_backend_service, + [same_zone_instance_group]) + wait_for_healthy_backends(gcp, original_backend_service, instance_group) + wait_for_healthy_backends(gcp, alternate_backend_service, + same_zone_instance_group) + wait_until_all_rpcs_go_to_given_backends(original_backend_instances, + _WAIT_FOR_STATS_SEC) + try: + patch_url_map_backend_service(gcp, alternate_backend_service) + stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC) + if stats.num_failures > 0: + raise Exception('Unexpected failure: %s', stats) + wait_until_all_rpcs_go_to_given_backends(alternate_backend_instances, + _WAIT_FOR_STATS_SEC) + finally: + patch_url_map_backend_service(gcp, original_backend_service) + patch_backend_instances(gcp, alternate_backend_service, []) + + +def test_new_instance_group_receives_traffic(gcp, backend_service, + instance_group, + same_zone_instance_group): + instance_names = get_instance_names(gcp, instance_group) + wait_until_all_rpcs_go_to_given_backends(instance_names, + _WAIT_FOR_STATS_SEC) + try: + patch_backend_instances(gcp, + backend_service, + [instance_group, same_zone_instance_group], + balancing_mode='RATE') + wait_for_healthy_backends(gcp, backend_service, instance_group) + wait_for_healthy_backends(gcp, backend_service, + same_zone_instance_group) + combined_instance_names = instance_names + get_instance_names( + gcp, same_zone_instance_group) + wait_until_all_rpcs_go_to_given_backends(combined_instance_names, + _WAIT_FOR_BACKEND_SEC) + finally: + patch_backend_instances(gcp, backend_service, [instance_group]) + + +def test_ping_pong(gcp, backend_service, instance_group): + wait_for_healthy_backends(gcp, backend_service, instance_group) + instance_names = get_instance_names(gcp, instance_group) + 
wait_until_all_rpcs_go_to_given_backends(instance_names, + _WAIT_FOR_STATS_SEC) + + +def test_remove_instance_group(gcp, backend_service, instance_group, + same_zone_instance_group): + try: + patch_backend_instances(gcp, + backend_service, + [instance_group, same_zone_instance_group], + balancing_mode='RATE') + wait_for_healthy_backends(gcp, backend_service, instance_group) + wait_for_healthy_backends(gcp, backend_service, + same_zone_instance_group) + instance_names = get_instance_names(gcp, instance_group) + same_zone_instance_names = get_instance_names(gcp, + same_zone_instance_group) + wait_until_all_rpcs_go_to_given_backends( + instance_names + same_zone_instance_names, _WAIT_FOR_BACKEND_SEC) + patch_backend_instances(gcp, + backend_service, [same_zone_instance_group], + balancing_mode='RATE') + wait_until_all_rpcs_go_to_given_backends(same_zone_instance_names, + _WAIT_FOR_BACKEND_SEC) + finally: + patch_backend_instances(gcp, backend_service, [instance_group]) + wait_until_all_rpcs_go_to_given_backends(instance_names, + _WAIT_FOR_BACKEND_SEC) + + +def test_round_robin(gcp, backend_service, instance_group): + wait_for_healthy_backends(gcp, backend_service, instance_group) + instance_names = get_instance_names(gcp, instance_group) threshold = 1 - wait_until_only_given_backends_receive_load(backends, stats_timeout_sec) - stats = get_client_stats(num_rpcs, stats_timeout_sec) + wait_until_all_rpcs_go_to_given_backends(instance_names, + _WAIT_FOR_STATS_SEC) + stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC) requests_received = [stats.rpcs_by_peer[x] for x in stats.rpcs_by_peer] - total_requests_received = sum( - [stats.rpcs_by_peer[x] for x in stats.rpcs_by_peer]) - if total_requests_received != num_rpcs: + total_requests_received = sum(requests_received) + if total_requests_received != _NUM_TEST_RPCS: raise Exception('Unexpected RPC failures', stats) - expected_requests = total_requests_received / len(backends) - for backend in backends: - if 
abs(stats.rpcs_by_peer[backend] - expected_requests) > threshold: + expected_requests = total_requests_received / len(instance_names) + for instance in instance_names: + if abs(stats.rpcs_by_peer[instance] - expected_requests) > threshold: raise Exception( - 'RPC peer distribution differs from expected by more than %d for backend %s (%s)', - threshold, backend, stats) + 'RPC peer distribution differs from expected by more than %d ' + 'for instance %s (%s)', threshold, instance, stats) -def create_instance_template(compute, project, name, grpc_port): +def test_secondary_locality_gets_no_requests_on_partial_primary_failure( + gcp, backend_service, primary_instance_group, + secondary_zone_instance_group): + try: + patch_backend_instances( + gcp, backend_service, + [primary_instance_group, secondary_zone_instance_group]) + wait_for_healthy_backends(gcp, backend_service, primary_instance_group) + wait_for_healthy_backends(gcp, backend_service, + secondary_zone_instance_group) + primary_instance_names = get_instance_names(gcp, instance_group) + secondary_instance_names = get_instance_names( + gcp, secondary_zone_instance_group) + wait_until_all_rpcs_go_to_given_backends(primary_instance_names, + _WAIT_FOR_STATS_SEC) + original_size = len(primary_instance_names) + resize_instance_group(gcp, primary_instance_group, original_size - 1) + remaining_instance_names = get_instance_names(gcp, + primary_instance_group) + wait_until_all_rpcs_go_to_given_backends(remaining_instance_names, + _WAIT_FOR_BACKEND_SEC) + finally: + patch_backend_instances(gcp, backend_service, [primary_instance_group]) + resize_instance_group(gcp, primary_instance_group, original_size) + + +def test_secondary_locality_gets_requests_on_primary_failure( + gcp, backend_service, primary_instance_group, + secondary_zone_instance_group): + try: + patch_backend_instances( + gcp, backend_service, + [primary_instance_group, secondary_zone_instance_group]) + wait_for_healthy_backends(gcp, backend_service, 
primary_instance_group) + wait_for_healthy_backends(gcp, backend_service, + secondary_zone_instance_group) + primary_instance_names = get_instance_names(gcp, instance_group) + secondary_instance_names = get_instance_names( + gcp, secondary_zone_instance_group) + wait_until_all_rpcs_go_to_given_backends(primary_instance_names, + _WAIT_FOR_BACKEND_SEC) + original_size = len(primary_instance_names) + resize_instance_group(gcp, primary_instance_group, 0) + wait_until_all_rpcs_go_to_given_backends(secondary_instance_names, + _WAIT_FOR_BACKEND_SEC) + + resize_instance_group(gcp, primary_instance_group, original_size) + new_instance_names = get_instance_names(gcp, primary_instance_group) + wait_for_healthy_backends(gcp, backend_service, primary_instance_group) + wait_until_all_rpcs_go_to_given_backends(new_instance_names, + _WAIT_FOR_BACKEND_SEC) + finally: + patch_backend_instances(gcp, backend_service, [primary_instance_group]) + + +def create_instance_template(gcp, name, network, source_image): config = { 'name': name, 'properties': { 'tags': { - 'items': ['grpc-allow-healthcheck'] + 'items': ['allow-health-checks'] }, 'machineType': 'e2-standard-2', 'serviceAccounts': [{ @@ -246,12 +423,12 @@ def create_instance_template(compute, project, name, grpc_port): 'accessConfigs': [{ 'type': 'ONE_TO_ONE_NAT' }], - 'network': args.network + 'network': network }], 'disks': [{ 'boot': True, 'initializeParams': { - 'sourceImage': args.source_image + 'sourceImage': source_image } }], 'metadata': { @@ -260,7 +437,6 @@ def create_instance_template(compute, project, name, grpc_port): 'startup-script', 'value': """#!/bin/bash - sudo apt update sudo apt install -y git default-jdk mkdir java_server @@ -271,40 +447,43 @@ pushd interop-testing ../gradlew installDist -x test -PskipCodegen=true -PskipAndroid=true nohup build/install/grpc-interop-testing/bin/xds-test-server --port=%d 1>/dev/null &""" - % grpc_port + % gcp.service_port }] } } } - result = 
compute.instanceTemplates().insert(project=project, - body=config).execute() - wait_for_global_operation(compute, project, result['name']) - return result['targetLink'] + result = gcp.compute.instanceTemplates().insert(project=gcp.project, + body=config).execute() + wait_for_global_operation(gcp, result['name']) + gcp.instance_template = GcpResource(config['name'], result['targetLink']) -def create_instance_group(compute, project, zone, name, size, grpc_port, - template_url): +def add_instance_group(gcp, zone, name, size): config = { 'name': name, - 'instanceTemplate': template_url, + 'instanceTemplate': gcp.instance_template.url, 'targetSize': size, 'namedPorts': [{ 'name': 'grpc', - 'port': grpc_port + 'port': gcp.service_port }] } - result = compute.instanceGroupManagers().insert(project=project, - zone=zone, - body=config).execute() - wait_for_zone_operation(compute, project, zone, result['name']) - result = compute.instanceGroupManagers().get( - project=PROJECT_ID, zone=ZONE, instanceGroupManager=name).execute() - return result['instanceGroup'] + result = gcp.compute.instanceGroupManagers().insert(project=gcp.project, + zone=zone, + body=config).execute() + wait_for_zone_operation(gcp, zone, result['name']) + result = gcp.compute.instanceGroupManagers().get( + project=gcp.project, zone=zone, + instanceGroupManager=config['name']).execute() + instance_group = InstanceGroup(config['name'], result['instanceGroup'], + zone) + gcp.instance_groups.append(instance_group) + return instance_group -def create_health_check(compute, project, name): +def create_health_check(gcp, name): config = { 'name': name, 'type': 'TCP', @@ -312,13 +491,13 @@ def create_health_check(compute, project, name): 'portName': 'grpc' } } - result = compute.healthChecks().insert(project=project, - body=config).execute() - wait_for_global_operation(compute, project, result['name']) - return result['targetLink'] + result = gcp.compute.healthChecks().insert(project=gcp.project, + 
body=config).execute() + wait_for_global_operation(gcp, result['name']) + gcp.health_check = GcpResource(config['name'], result['targetLink']) -def create_health_check_firewall_rule(compute, project, name): +def create_health_check_firewall_rule(gcp, name): config = { 'name': name, 'direction': 'INGRESS', @@ -326,169 +505,220 @@ def create_health_check_firewall_rule(compute, project, name): 'IPProtocol': 'tcp' }], 'sourceRanges': ['35.191.0.0/16', '130.211.0.0/22'], - 'targetTags': ['grpc-allow-healthcheck'], + 'targetTags': ['allow-health-checks'], } - result = compute.firewalls().insert(project=project, body=config).execute() - wait_for_global_operation(compute, project, result['name']) + result = gcp.compute.firewalls().insert(project=gcp.project, + body=config).execute() + wait_for_global_operation(gcp, result['name']) + gcp.health_check_firewall_rule = GcpResource(config['name'], + result['targetLink']) -def create_backend_service(compute, project, name, health_check): +def add_backend_service(gcp, name): config = { 'name': name, 'loadBalancingScheme': 'INTERNAL_SELF_MANAGED', - 'healthChecks': [health_check], + 'healthChecks': [gcp.health_check.url], 'portName': 'grpc', 'protocol': 'HTTP2' } - result = compute.backendServices().insert(project=project, - body=config).execute() - wait_for_global_operation(compute, project, result['name']) - return result['targetLink'] + result = gcp.compute.backendServices().insert(project=gcp.project, + body=config).execute() + wait_for_global_operation(gcp, result['name']) + backend_service = GcpResource(config['name'], result['targetLink']) + gcp.backend_services.append(backend_service) + return backend_service -def create_url_map(compute, project, name, backend_service_url, host_name): - path_matcher_name = 'path-matcher' +def create_url_map(gcp, name, backend_service, host_name): config = { 'name': name, - 'defaultService': backend_service_url, + 'defaultService': backend_service.url, 'pathMatchers': [{ - 'name': 
path_matcher_name, - 'defaultService': backend_service_url, + 'name': _PATH_MATCHER_NAME, + 'defaultService': backend_service.url, }], 'hostRules': [{ 'hosts': [host_name], - 'pathMatcher': path_matcher_name + 'pathMatcher': _PATH_MATCHER_NAME }] } - result = compute.urlMaps().insert(project=project, body=config).execute() - wait_for_global_operation(compute, project, result['name']) - return result['targetLink'] + result = gcp.compute.urlMaps().insert(project=gcp.project, + body=config).execute() + wait_for_global_operation(gcp, result['name']) + gcp.url_map = GcpResource(config['name'], result['targetLink']) -def create_target_http_proxy(compute, project, name, url_map_url): +def create_target_http_proxy(gcp, name): config = { 'name': name, - 'url_map': url_map_url, + 'url_map': gcp.url_map.url, } - result = compute.targetHttpProxies().insert(project=project, - body=config).execute() - wait_for_global_operation(compute, project, result['name']) - return result['targetLink'] + result = gcp.compute.targetHttpProxies().insert(project=gcp.project, + body=config).execute() + wait_for_global_operation(gcp, result['name']) + gcp.target_http_proxy = GcpResource(config['name'], result['targetLink']) -def create_global_forwarding_rule(compute, project, name, grpc_port, - target_http_proxy_url): +def create_global_forwarding_rule(gcp, name, port): config = { 'name': name, 'loadBalancingScheme': 'INTERNAL_SELF_MANAGED', - 'portRange': str(grpc_port), + 'portRange': str(port), 'IPAddress': '0.0.0.0', 'network': args.network, - 'target': target_http_proxy_url, + 'target': gcp.target_http_proxy.url, } - result = compute.globalForwardingRules().insert(project=project, - body=config).execute() - wait_for_global_operation(compute, project, result['name']) + result = gcp.compute.globalForwardingRules().insert(project=gcp.project, + body=config).execute() + wait_for_global_operation(gcp, result['name']) + gcp.global_forwarding_rule = GcpResource(config['name'], + 
result['targetLink']) -def delete_global_forwarding_rule(compute, project, forwarding_rule): +def delete_global_forwarding_rule(gcp): try: - result = compute.globalForwardingRules().delete( - project=project, forwardingRule=forwarding_rule).execute() - wait_for_global_operation(compute, project, result['name']) + result = gcp.compute.globalForwardingRules().delete( + project=gcp.project, + forwardingRule=gcp.global_forwarding_rule.name).execute() + wait_for_global_operation(gcp, result['name']) except googleapiclient.errors.HttpError as http_error: logger.info('Delete failed: %s', http_error) -def delete_target_http_proxy(compute, project, target_http_proxy): +def delete_target_http_proxy(gcp): try: - result = compute.targetHttpProxies().delete( - project=project, targetHttpProxy=target_http_proxy).execute() - wait_for_global_operation(compute, project, result['name']) + result = gcp.compute.targetHttpProxies().delete( + project=gcp.project, + targetHttpProxy=gcp.target_http_proxy.name).execute() + wait_for_global_operation(gcp, result['name']) except googleapiclient.errors.HttpError as http_error: logger.info('Delete failed: %s', http_error) -def delete_url_map(compute, project, url_map): +def delete_url_map(gcp): try: - result = compute.urlMaps().delete(project=project, - urlMap=url_map).execute() - wait_for_global_operation(compute, project, result['name']) + result = gcp.compute.urlMaps().delete( + project=gcp.project, urlMap=gcp.url_map.name).execute() + wait_for_global_operation(gcp, result['name']) except googleapiclient.errors.HttpError as http_error: logger.info('Delete failed: %s', http_error) -def delete_backend_service(compute, project, backend_service): - try: - result = compute.backendServices().delete( - project=project, backendService=backend_service).execute() - wait_for_global_operation(compute, project, result['name']) - except googleapiclient.errors.HttpError as http_error: - logger.info('Delete failed: %s', http_error) +def 
delete_backend_services(gcp): + for backend_service in gcp.backend_services: + try: + result = gcp.compute.backendServices().delete( + project=gcp.project, + backendService=backend_service.name).execute() + wait_for_global_operation(gcp, result['name']) + except googleapiclient.errors.HttpError as http_error: + logger.info('Delete failed: %s', http_error) -def delete_firewall(compute, project, firewall_rule): +def delete_firewall(gcp): try: - result = compute.firewalls().delete(project=project, - firewall=firewall_rule).execute() - wait_for_global_operation(compute, project, result['name']) + result = gcp.compute.firewalls().delete( + project=gcp.project, + firewall=gcp.health_check_firewall_rule.name).execute() + wait_for_global_operation(gcp, result['name']) except googleapiclient.errors.HttpError as http_error: logger.info('Delete failed: %s', http_error) -def delete_health_check(compute, project, health_check): +def delete_health_check(gcp): try: - result = compute.healthChecks().delete( - project=project, healthCheck=health_check).execute() - wait_for_global_operation(compute, project, result['name']) + result = gcp.compute.healthChecks().delete( + project=gcp.project, healthCheck=gcp.health_check.name).execute() + wait_for_global_operation(gcp, result['name']) except googleapiclient.errors.HttpError as http_error: logger.info('Delete failed: %s', http_error) -def delete_instance_group(compute, project, zone, instance_group): +def delete_instance_groups(gcp): + for instance_group in gcp.instance_groups: + try: + result = gcp.compute.instanceGroupManagers().delete( + project=gcp.project, + zone=instance_group.zone, + instanceGroupManager=instance_group.name).execute() + wait_for_zone_operation(gcp, + instance_group.zone, + result['name'], + timeout_sec=_WAIT_FOR_BACKEND_SEC) + except googleapiclient.errors.HttpError as http_error: + logger.info('Delete failed: %s', http_error) + + +def delete_instance_template(gcp): try: - result = 
compute.instanceGroupManagers().delete( - project=project, zone=zone, - instanceGroupManager=instance_group).execute() - timeout_sec = 180 # Deleting an instance group can be slow - wait_for_zone_operation(compute, - project, - ZONE, - result['name'], - timeout_sec=timeout_sec) + result = gcp.compute.instanceTemplates().delete( + project=gcp.project, + instanceTemplate=gcp.instance_template.name).execute() + wait_for_global_operation(gcp, result['name']) except googleapiclient.errors.HttpError as http_error: logger.info('Delete failed: %s', http_error) -def delete_instance_template(compute, project, instance_template): - try: - result = compute.instanceTemplates().delete( - project=project, instanceTemplate=instance_template).execute() - wait_for_global_operation(compute, project, result['name']) - except googleapiclient.errors.HttpError as http_error: - logger.info('Delete failed: %s', http_error) +def patch_backend_instances(gcp, + backend_service, + instance_groups, + balancing_mode='UTILIZATION'): + config = { + 'backends': [{ + 'group': instance_group.url, + 'balancingMode': balancing_mode, + 'maxRate': 1 if balancing_mode == 'RATE' else None + } for instance_group in instance_groups], + } + result = gcp.compute.backendServices().patch( + project=gcp.project, backendService=backend_service.name, + body=config).execute() + wait_for_global_operation(gcp, result['name']) + + +def resize_instance_group(gcp, instance_group, new_size, timeout_sec=120): + result = gcp.compute.instanceGroupManagers().resize( + project=gcp.project, + zone=instance_group.zone, + instanceGroupManager=instance_group.name, + size=new_size).execute() + wait_for_zone_operation(gcp, + instance_group.zone, + result['name'], + timeout_sec=360) + start_time = time.time() + while True: + current_size = len(get_instance_names(gcp, instance_group)) + if current_size == new_size: + break + if time.time() - start_time > timeout_sec: + raise Exception('Failed to resize primary instance group') + 
time.sleep(1) -def add_instances_to_backend(compute, project, backend_service, instance_group): +def patch_url_map_backend_service(gcp, backend_service): config = { - 'backends': [{ - 'group': instance_group, - }], + 'defaultService': + backend_service.url, + 'pathMatchers': [{ + 'name': _PATH_MATCHER_NAME, + 'defaultService': backend_service.url, + }] } - result = compute.backendServices().patch(project=project, - backendService=backend_service, - body=config).execute() - wait_for_global_operation(compute, project, result['name']) + result = gcp.compute.urlMaps().patch(project=gcp.project, + urlMap=gcp.url_map.name, + body=config).execute() + wait_for_global_operation(gcp, result['name']) -def wait_for_global_operation(compute, - project, +def wait_for_global_operation(gcp, operation, - timeout_sec=WAIT_FOR_OPERATION_SEC): + timeout_sec=_WAIT_FOR_OPERATION_SEC): start_time = time.time() while time.time() - start_time <= timeout_sec: - result = compute.globalOperations().get(project=project, - operation=operation).execute() + result = gcp.compute.globalOperations().get( + project=gcp.project, operation=operation).execute() if result['status'] == 'DONE': if 'error' in result: raise Exception(result['error']) @@ -498,16 +728,14 @@ def wait_for_global_operation(compute, timeout_sec) -def wait_for_zone_operation(compute, - project, +def wait_for_zone_operation(gcp, zone, operation, - timeout_sec=WAIT_FOR_OPERATION_SEC): + timeout_sec=_WAIT_FOR_OPERATION_SEC): start_time = time.time() while time.time() - start_time <= timeout_sec: - result = compute.zoneOperations().get(project=project, - zone=zone, - operation=operation).execute() + result = gcp.compute.zoneOperations().get( + project=gcp.project, zone=zone, operation=operation).execute() if result['status'] == 'DONE': if 'error' in result: raise Exception(result['error']) @@ -517,13 +745,16 @@ def wait_for_zone_operation(compute, timeout_sec) -def wait_for_healthy_backends(compute, project_id, backend_service, - 
instance_group_url, timeout_sec): +def wait_for_healthy_backends(gcp, + backend_service, + instance_group, + timeout_sec=_WAIT_FOR_BACKEND_SEC): start_time = time.time() - config = {'group': instance_group_url} + config = {'group': instance_group.url} while time.time() - start_time <= timeout_sec: - result = compute.backendServices().getHealth( - project=project_id, backendService=backend_service, + result = gcp.compute.backendServices().getHealth( + project=gcp.project, + backendService=backend_service.name, body=config).execute() if 'healthStatus' in result: healthy = True @@ -538,15 +769,32 @@ def wait_for_healthy_backends(compute, project_id, backend_service, (timeout_sec, result)) -def start_xds_client(service_port): - cmd = CLIENT_CMD.format(service_host=SERVICE_HOST, - service_port=service_port, - stats_port=STATS_PORT, - qps=QPS) +def get_instance_names(gcp, instance_group): + instance_names = [] + result = gcp.compute.instanceGroups().listInstances( + project=gcp.project, + zone=instance_group.zone, + instanceGroup=instance_group.name, + body={ + 'instanceState': 'ALL' + }).execute() + if 'items' not in result: + return [] + for item in result['items']: + # listInstances() returns the full URL of the instance, which ends with + # the instance name. compute.instances().get() requires using the + # instance name (not the full URL) to look up instance details, so we + # just extract the name manually. 
+ instance_name = item['instance'].split('/')[-1] + instance_names.append(instance_name) + return instance_names + + +def start_xds_client(cmd): bootstrap_path = None with tempfile.NamedTemporaryFile(delete=False) as bootstrap_file: bootstrap_file.write( - BOOTSTRAP_TEMPLATE.format( + _BOOTSTRAP_TEMPLATE.format( node_id=socket.gethostname()).encode('utf-8')) bootstrap_path = bootstrap_file.name @@ -557,6 +805,54 @@ def start_xds_client(service_port): return client_process +def clean_up(gcp): + if gcp.global_forwarding_rule: + delete_global_forwarding_rule(gcp) + if gcp.target_http_proxy: + delete_target_http_proxy(gcp) + if gcp.url_map: + delete_url_map(gcp) + delete_backend_services(gcp) + if gcp.health_check_firewall_rule: + delete_firewall(gcp) + if gcp.health_check: + delete_health_check(gcp) + delete_instance_groups(gcp) + if gcp.instance_template: + delete_instance_template(gcp) + + +class InstanceGroup(object): + + def __init__(self, name, url, zone): + self.name = name + self.url = url + self.zone = zone + + +class GcpResource(object): + + def __init__(self, name, url): + self.name = name + self.url = url + + +class GcpState(object): + + def __init__(self, compute, project): + self.compute = compute + self.project = project + self.health_check = None + self.health_check_firewall_rule = None + self.backend_services = [] + self.url_map = None + self.target_http_proxy = None + self.global_forwarding_rule = None + self.service_port = None + self.instance_template = None + self.instance_groups = [] + + if args.compute_discovery_document: with open(args.compute_discovery_document, 'r') as discovery_doc: compute = googleapiclient.discovery.build_from_document( @@ -564,107 +860,172 @@ if args.compute_discovery_document: else: compute = googleapiclient.discovery.build('compute', 'v1') -service_port = None client_process = None try: - instance_group_url = None + gcp = GcpState(compute, args.project_id) + health_check_name = _BASE_HEALTH_CHECK_NAME + args.gcp_suffix + 
firewall_name = _BASE_FIREWALL_RULE_NAME + args.gcp_suffix + backend_service_name = _BASE_BACKEND_SERVICE_NAME + args.gcp_suffix + alternate_backend_service_name = _BASE_BACKEND_SERVICE_NAME + '-alternate' + args.gcp_suffix + url_map_name = _BASE_URL_MAP_NAME + args.gcp_suffix + service_host_name = _BASE_SERVICE_HOST + args.gcp_suffix + target_http_proxy_name = _BASE_TARGET_PROXY_NAME + args.gcp_suffix + forwarding_rule_name = _BASE_FORWARDING_RULE_NAME + args.gcp_suffix + template_name = _BASE_TEMPLATE_NAME + args.gcp_suffix + instance_group_name = _BASE_INSTANCE_GROUP_NAME + args.gcp_suffix + same_zone_instance_group_name = _BASE_INSTANCE_GROUP_NAME + '-same-zone' + args.gcp_suffix + secondary_zone_instance_group_name = _BASE_INSTANCE_GROUP_NAME + '-secondary-zone' + args.gcp_suffix try: - health_check_url = create_health_check(compute, PROJECT_ID, - HEALTH_CHECK_NAME) - create_health_check_firewall_rule(compute, PROJECT_ID, - FIREWALL_RULE_NAME) - backend_service_url = create_backend_service(compute, PROJECT_ID, - BACKEND_SERVICE_NAME, - health_check_url) - url_map_url = create_url_map(compute, PROJECT_ID, URL_MAP_NAME, - backend_service_url, SERVICE_HOST) - target_http_proxy_url = create_target_http_proxy( - compute, PROJECT_ID, TARGET_PROXY_NAME, url_map_url) + create_health_check(gcp, health_check_name) + create_health_check_firewall_rule(gcp, firewall_name) + backend_service = add_backend_service(gcp, backend_service_name) + alternate_backend_service = add_backend_service( + gcp, alternate_backend_service_name) + create_url_map(gcp, url_map_name, backend_service, service_host_name) + create_target_http_proxy(gcp, target_http_proxy_name) potential_service_ports = list(args.service_port_range) random.shuffle(potential_service_ports) for port in potential_service_ports: try: - create_global_forwarding_rule( - compute, - PROJECT_ID, - FORWARDING_RULE_NAME, - port, - target_http_proxy_url, - ) - service_port = port + create_global_forwarding_rule(gcp, 
forwarding_rule_name, port) + gcp.service_port = port break except googleapiclient.errors.HttpError as http_error: logger.warning( - 'Got error %s when attempting to create forwarding rule to port %d. Retrying with another port.' - % (http_error, port)) - if not service_port: + 'Got error %s when attempting to create forwarding rule to ' + 'port %d. Retrying with another port.' % (http_error, port)) + if not gcp.service_port: raise Exception('Failed to pick a service port in the range %s' % args.service_port_range) - template_url = create_instance_template(compute, PROJECT_ID, - TEMPLATE_NAME, service_port) - instance_group_url = create_instance_group(compute, PROJECT_ID, ZONE, - INSTANCE_GROUP_NAME, - INSTANCE_GROUP_SIZE, - service_port, template_url) - add_instances_to_backend(compute, PROJECT_ID, BACKEND_SERVICE_NAME, - instance_group_url) + create_instance_template(gcp, template_name, args.network, + args.source_image) + instance_group = add_instance_group(gcp, args.zone, instance_group_name, + _INSTANCE_GROUP_SIZE) + patch_backend_instances(gcp, backend_service, [instance_group]) + same_zone_instance_group = add_instance_group( + gcp, args.zone, same_zone_instance_group_name, _INSTANCE_GROUP_SIZE) + secondary_zone_instance_group = add_instance_group( + gcp, args.secondary_zone, secondary_zone_instance_group_name, + _INSTANCE_GROUP_SIZE) except googleapiclient.errors.HttpError as http_error: - if TOLERATE_GCP_ERRORS: + if args.tolerate_gcp_errors: logger.warning( - 'Failed to set up backends: %s. Continuing since ' + 'Failed to set up backends: %s. 
Attempting to continue since ' '--tolerate_gcp_errors=true', http_error) + if not gcp.instance_template: + result = compute.instanceTemplates().get( + project=args.project_id, + instanceTemplate=template_name).execute() + gcp.instance_template = GcpResource(template_name, + result['selfLink']) + if not gcp.backend_services: + result = compute.backendServices().get( + project=args.project_id, + backendService=backend_service_name).execute() + backend_service = GcpResource(backend_service_name, + result['selfLink']) + gcp.backend_services.append(backend_service) + result = compute.backendServices().get( + project=args.project_id, + backendService=alternate_backend_service_name).execute() + alternate_backend_service = GcpResource( + alternate_backend_service_name, result['selfLink']) + gcp.backend_services.append(alternate_backend_service) + if not gcp.instance_groups: + result = compute.instanceGroups().get( + project=args.project_id, + zone=args.zone, + instanceGroup=instance_group_name).execute() + instance_group = InstanceGroup(instance_group_name, + result['selfLink'], args.zone) + gcp.instance_groups.append(instance_group) + result = compute.instanceGroups().get( + project=args.project_id, + zone=args.zone, + instanceGroup=same_zone_instance_group_name).execute() + same_zone_instance_group = InstanceGroup( + same_zone_instance_group_name, result['selfLink'], + args.zone) + gcp.instance_groups.append(same_zone_instance_group) + result = compute.instanceGroups().get( + project=args.project_id, + zone=args.secondary_zone, + instanceGroup=secondary_zone_instance_group_name).execute() + secondary_zone_instance_group = InstanceGroup( + secondary_zone_instance_group_name, result['selfLink'], + args.secondary_zone) + gcp.instance_groups.append(secondary_zone_instance_group) + if not gcp.health_check: + result = compute.healthChecks().get( + project=args.project_id, + healthCheck=health_check_name).execute() + gcp.health_check = GcpResource(health_check_name, + 
result['selfLink']) + if not gcp.url_map: + result = compute.urlMaps().get(project=args.project_id, + urlMap=url_map_name).execute() + gcp.url_map = GcpResource(url_map_name, result['selfLink']) + if not gcp.service_port: + gcp.service_port = args.service_port_range[0] + logger.warning('Using arbitrary service port in range: %d' % + gcp.service_port) else: raise http_error - if instance_group_url is None: - # Look up the instance group URL, which may be unset if we are running - # with --tolerate_gcp_errors=true. - result = compute.instanceGroups().get( - project=PROJECT_ID, zone=ZONE, - instanceGroup=INSTANCE_GROUP_NAME).execute() - instance_group_url = result['selfLink'] - wait_for_healthy_backends(compute, PROJECT_ID, BACKEND_SERVICE_NAME, - instance_group_url, WAIT_FOR_BACKEND_SEC) - - backends = [] - result = compute.instanceGroups().listInstances( - project=PROJECT_ID, - zone=ZONE, - instanceGroup=INSTANCE_GROUP_NAME, - body={ - 'instanceState': 'ALL' - }).execute() - for item in result['items']: - # listInstances() returns the full URL of the instance, which ends with - # the instance name. compute.instances().get() requires using the - # instance name (not the full URL) to look up instance details, so we - # just extract the name manually. 
- instance_name = item['instance'].split('/')[-1] - backends.append(instance_name) - - client_process = start_xds_client(service_port) - - if TEST_CASE == 'all': - test_ping_pong(backends, NUM_TEST_RPCS, WAIT_FOR_STATS_SEC) - test_round_robin(backends, NUM_TEST_RPCS, WAIT_FOR_STATS_SEC) - elif TEST_CASE == 'ping_pong': - test_ping_pong(backends, NUM_TEST_RPCS, WAIT_FOR_STATS_SEC) - elif TEST_CASE == 'round_robin': - test_round_robin(backends, NUM_TEST_RPCS, WAIT_FOR_STATS_SEC) + wait_for_healthy_backends(gcp, backend_service, instance_group) + + cmd = args.client_cmd.format(service_host=service_host_name, + service_port=gcp.service_port, + stats_port=args.stats_port, + qps=args.qps) + client_process = start_xds_client(cmd) + + if args.test_case == 'all': + test_backends_restart(gcp, backend_service, instance_group) + test_change_backend_service(gcp, backend_service, instance_group, + alternate_backend_service, + same_zone_instance_group) + test_new_instance_group_receives_traffic(gcp, backend_service, + instance_group, + same_zone_instance_group) + test_ping_pong(gcp, backend_service, instance_group) + test_remove_instance_group(gcp, backend_service, instance_group, + same_zone_instance_group) + test_round_robin(gcp, backend_service, instance_group) + test_secondary_locality_gets_no_requests_on_partial_primary_failure( + gcp, backend_service, instance_group, secondary_zone_instance_group) + test_secondary_locality_gets_requests_on_primary_failure( + gcp, backend_service, instance_group, secondary_zone_instance_group) + elif args.test_case == 'backends_restart': + test_backends_restart(gcp, backend_service, instance_group) + elif args.test_case == 'change_backend_service': + test_change_backend_service(gcp, backend_service, instance_group, + alternate_backend_service, + same_zone_instance_group) + elif args.test_case == 'new_instance_group_receives_traffic': + test_new_instance_group_receives_traffic(gcp, backend_service, + instance_group, + 
same_zone_instance_group) + elif args.test_case == 'ping_pong': + test_ping_pong(gcp, backend_service, instance_group) + elif args.test_case == 'remove_instance_group': + test_remove_instance_group(gcp, backend_service, instance_group, + same_zone_instance_group) + elif args.test_case == 'round_robin': + test_round_robin(gcp, backend_service, instance_group) + elif args.test_case == 'secondary_locality_gets_no_requests_on_partial_primary_failure': + test_secondary_locality_gets_no_requests_on_partial_primary_failure( + gcp, backend_service, instance_group, secondary_zone_instance_group) + elif args.test_case == 'secondary_locality_gets_requests_on_primary_failure': + test_secondary_locality_gets_requests_on_primary_failure( + gcp, backend_service, instance_group, secondary_zone_instance_group) else: - logger.error('Unknown test case: %s', TEST_CASE) + logger.error('Unknown test case: %s', args.test_case) sys.exit(1) finally: if client_process: client_process.terminate() - if not KEEP_GCP_RESOURCES: + if not args.keep_gcp_resources: logger.info('Cleaning up GCP resources. This may take some time.') - delete_global_forwarding_rule(compute, PROJECT_ID, FORWARDING_RULE_NAME) - delete_target_http_proxy(compute, PROJECT_ID, TARGET_PROXY_NAME) - delete_url_map(compute, PROJECT_ID, URL_MAP_NAME) - delete_backend_service(compute, PROJECT_ID, BACKEND_SERVICE_NAME) - delete_firewall(compute, PROJECT_ID, FIREWALL_RULE_NAME) - delete_health_check(compute, PROJECT_ID, HEALTH_CHECK_NAME) - delete_instance_group(compute, PROJECT_ID, ZONE, INSTANCE_GROUP_NAME) - delete_instance_template(compute, PROJECT_ID, TEMPLATE_NAME) + clean_up(gcp)