diff --git a/tools/run_tests/run_xds_tests.py b/tools/run_tests/run_xds_tests.py
index 613d38f84fc..5ebf4d10886 100755
--- a/tools/run_tests/run_xds_tests.py
+++ b/tools/run_tests/run_xds_tests.py
@@ -55,7 +55,7 @@ argp.add_argument(
     'ensure distinct names across test runs.')
 argp.add_argument(
     '--test_case',
-    default=None,
+    default='ping_pong',
     choices=[
         'all',
         'backends_restart',
@@ -64,8 +64,8 @@ argp.add_argument(
         'ping_pong',
         'remove_instance_group',
         'round_robin',
-        'secondary_locality_gets_requests_on_primary_failure',
         'secondary_locality_gets_no_requests_on_partial_primary_failure',
+        'secondary_locality_gets_requests_on_primary_failure',
     ])
 argp.add_argument(
     '--client_cmd',
@@ -184,13 +184,11 @@ def get_client_stats(num_rpcs, timeout_sec):
         raise Exception('GetClientStats RPC failed')
 
 
-def wait_until_only_given_instances_receive_load(backends,
-                                                 timeout_sec,
-                                                 num_rpcs=100,
-                                                 allow_failures=False):
+def _verify_rpcs_to_given_backends(backends, timeout_sec, num_rpcs,
+                                   allow_failures):
     start_time = time.time()
     error_msg = None
-    logger.debug('Waiting for %d sec until backends %s receive load' %
+    logger.debug('Waiting for %d sec until backends %s receive load' %
                  (timeout_sec, backends))
     while time.time() - start_time <= timeout_sec:
         error_msg = None
@@ -209,31 +207,50 @@ def wait_until_only_given_instances_receive_load(backends,
     raise Exception(error_msg)
 
 
+def wait_until_all_rpcs_go_to_given_backends_or_fail(backends,
+                                                     timeout_sec,
+                                                     num_rpcs=100):
+    _verify_rpcs_to_given_backends(backends,
+                                   timeout_sec,
+                                   num_rpcs,
+                                   allow_failures=True)
+
+
+def wait_until_all_rpcs_go_to_given_backends(backends,
+                                             timeout_sec,
+                                             num_rpcs=100):
+    _verify_rpcs_to_given_backends(backends,
+                                   timeout_sec,
+                                   num_rpcs,
+                                   allow_failures=False)
+
+
 def test_backends_restart(gcp, backend_service, instance_group):
     instance_names = get_instance_names(gcp, instance_group)
     num_instances = len(instance_names)
     start_time = time.time()
-    wait_until_only_given_instances_receive_load(instance_names,
-                                                 _WAIT_FOR_STATS_SEC)
+    wait_until_all_rpcs_go_to_given_backends(instance_names,
+                                             _WAIT_FOR_STATS_SEC)
     stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
     try:
         resize_instance_group(gcp, instance_group, 0)
-        wait_until_only_given_instances_receive_load([],
-                                                     _WAIT_FOR_BACKEND_SEC,
-                                                     allow_failures=True)
+        wait_until_all_rpcs_go_to_given_backends_or_fail([],
+                                                         _WAIT_FOR_BACKEND_SEC)
     finally:
         resize_instance_group(gcp, instance_group, num_instances)
     wait_for_healthy_backends(gcp, backend_service, instance_group)
     new_instance_names = get_instance_names(gcp, instance_group)
-    wait_until_only_given_instances_receive_load(new_instance_names,
-                                                 _WAIT_FOR_BACKEND_SEC)
+    wait_until_all_rpcs_go_to_given_backends(new_instance_names,
+                                             _WAIT_FOR_BACKEND_SEC)
     new_stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
     original_distribution = list(stats.rpcs_by_peer.values())
     original_distribution.sort()
     new_distribution = list(new_stats.rpcs_by_peer.values())
     new_distribution.sort()
-    if original_distribution != new_distribution:
-        raise Exception('Distributions do not match: ', stats, new_stats)
+    threshold = 3
+    for i in range(len(original_distribution)):
+        if abs(original_distribution[i] - new_distribution[i]) > threshold:
+            raise Exception('Distributions do not match: ', stats, new_stats)
 
 
 def test_change_backend_service(gcp, original_backend_service, instance_group,
@@ -247,15 +264,15 @@ def test_change_backend_service(gcp, original_backend_service, instance_group,
     wait_for_healthy_backends(gcp, original_backend_service, instance_group)
     wait_for_healthy_backends(gcp, alternate_backend_service,
                               same_zone_instance_group)
-    wait_until_only_given_instances_receive_load(original_backend_instances,
-                                                 _WAIT_FOR_STATS_SEC)
+    wait_until_all_rpcs_go_to_given_backends(original_backend_instances,
+                                             _WAIT_FOR_STATS_SEC)
     try:
         patch_url_map_backend_service(gcp, alternate_backend_service)
         stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
         if stats.num_failures > 0:
             raise Exception('Unexpected failure: %s', stats)
-        wait_until_only_given_instances_receive_load(
-            alternate_backend_instances, _WAIT_FOR_STATS_SEC)
+        wait_until_all_rpcs_go_to_given_backends(alternate_backend_instances,
+                                                 _WAIT_FOR_STATS_SEC)
     finally:
         patch_url_map_backend_service(gcp, original_backend_service)
         patch_backend_instances(gcp, alternate_backend_service, [])
@@ -265,8 +282,8 @@ def test_new_instance_group_receives_traffic(gcp, backend_service,
                                              instance_group,
                                              same_zone_instance_group):
     instance_names = get_instance_names(gcp, instance_group)
-    wait_until_only_given_instances_receive_load(instance_names,
-                                                 _WAIT_FOR_STATS_SEC)
+    wait_until_all_rpcs_go_to_given_backends(instance_names,
+                                             _WAIT_FOR_STATS_SEC)
     try:
         patch_backend_instances(gcp,
                                 backend_service,
@@ -277,8 +294,8 @@ def test_new_instance_group_receives_traffic(gcp, backend_service,
                                   same_zone_instance_group)
         combined_instance_names = instance_names + get_instance_names(
             gcp, same_zone_instance_group)
-        wait_until_only_given_instances_receive_load(combined_instance_names,
-                                                     _WAIT_FOR_BACKEND_SEC)
+        wait_until_all_rpcs_go_to_given_backends(combined_instance_names,
+                                                 _WAIT_FOR_BACKEND_SEC)
     finally:
         patch_backend_instances(gcp, backend_service, [instance_group])
 
@@ -286,20 +303,7 @@ def test_new_instance_group_receives_traffic(gcp, backend_service,
 def test_ping_pong(gcp, backend_service, instance_group):
     wait_for_healthy_backends(gcp, backend_service, instance_group)
     instance_names = get_instance_names(gcp, instance_group)
-    start_time = time.time()
-    error_msg = None
-    while time.time() - start_time <= _WAIT_FOR_STATS_SEC:
-        error_msg = None
-        stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
-        rpcs_by_peer = stats.rpcs_by_peer
-        for instance in instance_names:
-            if instance not in rpcs_by_peer:
-                error_msg = 'Instance %s did not receive load' % instance
-                break
-        if not error_msg and len(rpcs_by_peer) > len(instance_names):
-            error_msg = 'Unexpected instance received load: %s' % rpcs_by_peer
-        if not error_msg:
-            return
-    raise Exception(error_msg)
+    wait_until_all_rpcs_go_to_given_backends(instance_names,
+                                             _WAIT_FOR_STATS_SEC)
 
 
@@ -316,29 +320,28 @@ def test_remove_instance_group(gcp, backend_service, instance_group,
         instance_names = get_instance_names(gcp, instance_group)
         same_zone_instance_names = get_instance_names(
             gcp, same_zone_instance_group)
-        wait_until_only_given_instances_receive_load(
+        wait_until_all_rpcs_go_to_given_backends(
             instance_names + same_zone_instance_names, _WAIT_FOR_BACKEND_SEC)
         patch_backend_instances(gcp,
                                 backend_service, [same_zone_instance_group],
                                 balancing_mode='RATE')
-        wait_until_only_given_instances_receive_load(same_zone_instance_names,
-                                                     _WAIT_FOR_BACKEND_SEC)
+        wait_until_all_rpcs_go_to_given_backends(same_zone_instance_names,
+                                                 _WAIT_FOR_BACKEND_SEC)
     finally:
         patch_backend_instances(gcp, backend_service, [instance_group])
-    wait_until_only_given_instances_receive_load(instance_names,
-                                                 _WAIT_FOR_BACKEND_SEC)
+    wait_until_all_rpcs_go_to_given_backends(instance_names,
+                                             _WAIT_FOR_BACKEND_SEC)
 
 
 def test_round_robin(gcp, backend_service, instance_group):
     wait_for_healthy_backends(gcp, backend_service, instance_group)
     instance_names = get_instance_names(gcp, instance_group)
     threshold = 1
-    wait_until_only_given_instances_receive_load(instance_names,
-                                                 _WAIT_FOR_STATS_SEC)
+    wait_until_all_rpcs_go_to_given_backends(instance_names,
+                                             _WAIT_FOR_STATS_SEC)
     stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
     requests_received = [stats.rpcs_by_peer[x] for x in stats.rpcs_by_peer]
-    total_requests_received = sum(
-        [stats.rpcs_by_peer[x] for x in stats.rpcs_by_peer])
+    total_requests_received = sum(requests_received)
     if total_requests_received != _NUM_TEST_RPCS:
         raise Exception('Unexpected RPC failures', stats)
     expected_requests = total_requests_received / len(instance_names)
@@ -362,14 +365,14 @@ def test_secondary_locality_gets_no_requests_on_partial_primary_failure(
         primary_instance_names = get_instance_names(gcp, instance_group)
         secondary_instance_names = get_instance_names(
             gcp, secondary_zone_instance_group)
-        wait_until_only_given_instances_receive_load(primary_instance_names,
-                                                     _WAIT_FOR_STATS_SEC)
+        wait_until_all_rpcs_go_to_given_backends(primary_instance_names,
+                                                 _WAIT_FOR_STATS_SEC)
         original_size = len(primary_instance_names)
         resize_instance_group(gcp, primary_instance_group, original_size - 1)
         remaining_instance_names = get_instance_names(gcp,
                                                       primary_instance_group)
-        wait_until_only_given_instances_receive_load(remaining_instance_names,
-                                                     _WAIT_FOR_BACKEND_SEC)
+        wait_until_all_rpcs_go_to_given_backends(remaining_instance_names,
+                                                 _WAIT_FOR_BACKEND_SEC)
     finally:
         patch_backend_instances(gcp, backend_service, [primary_instance_group])
         resize_instance_group(gcp, primary_instance_group, original_size)
@@ -388,18 +391,18 @@ def test_secondary_locality_gets_requests_on_primary_failure(
         primary_instance_names = get_instance_names(gcp, instance_group)
         secondary_instance_names = get_instance_names(
             gcp, secondary_zone_instance_group)
-        wait_until_only_given_instances_receive_load(primary_instance_names,
-                                                     _WAIT_FOR_BACKEND_SEC)
+        wait_until_all_rpcs_go_to_given_backends(primary_instance_names,
+                                                 _WAIT_FOR_BACKEND_SEC)
         original_size = len(primary_instance_names)
         resize_instance_group(gcp, primary_instance_group, 0)
-        wait_until_only_given_instances_receive_load(secondary_instance_names,
-                                                     _WAIT_FOR_BACKEND_SEC)
+        wait_until_all_rpcs_go_to_given_backends(secondary_instance_names,
+                                                 _WAIT_FOR_BACKEND_SEC)
         resize_instance_group(gcp, primary_instance_group, original_size)
         new_instance_names = get_instance_names(gcp, primary_instance_group)
         wait_for_healthy_backends(gcp, backend_service,
                                   primary_instance_group)
-        wait_until_only_given_instances_receive_load(new_instance_names,
-                                                     _WAIT_FOR_BACKEND_SEC)
+        wait_until_all_rpcs_go_to_given_backends(new_instance_names,
+                                                 _WAIT_FOR_BACKEND_SEC)
     finally:
         patch_backend_instances(gcp, backend_service, [primary_instance_group])
 
@@ -802,6 +805,23 @@ def start_xds_client(cmd, service_port):
     return client_process
 
 
+def clean_up(gcp):
+    if gcp.global_forwarding_rule:
+        delete_global_forwarding_rule(gcp)
+    if gcp.target_http_proxy:
+        delete_target_http_proxy(gcp)
+    if gcp.url_map:
+        delete_url_map(gcp)
+    delete_backend_services(gcp)
+    if gcp.health_check_firewall_rule:
+        delete_firewall(gcp)
+    if gcp.health_check:
+        delete_health_check(gcp)
+    delete_instance_groups(gcp)
+    if gcp.instance_template:
+        delete_instance_template(gcp)
+
+
 class InstanceGroup(object):
 
     def __init__(self, name, url, zone):
@@ -832,22 +852,6 @@ class GcpState(object):
         self.instance_template = None
         self.instance_groups = []
 
-    def clean_up(self):
-        if self.global_forwarding_rule:
-            delete_global_forwarding_rule(self)
-        if self.target_http_proxy:
-            delete_target_http_proxy(self)
-        if self.url_map:
-            delete_url_map(self)
-        delete_backend_services(self)
-        if self.health_check_firewall_rule:
-            delete_firewall(self)
-        if self.health_check:
-            delete_health_check(self)
-        delete_instance_groups(self)
-        if self.instance_template:
-            delete_instance_template(self)
-
 
 if args.compute_discovery_document:
     with open(args.compute_discovery_document, 'r') as discovery_doc:
@@ -1024,4 +1028,4 @@ finally:
     client_process.terminate()
 if not args.keep_gcp_resources:
     logger.info('Cleaning up GCP resources. This may take some time.')
-    gcp.clean_up()
+    clean_up(gcp)