reviewer comments

pull/22193/head
Eric Gribkoff 5 years ago
parent 0ddf5565e3
commit 033695da8b
  1. 153
      tools/run_tests/run_xds_tests.py

@ -55,7 +55,7 @@ argp.add_argument(
'ensure distinct names across test runs.') 'ensure distinct names across test runs.')
argp.add_argument( argp.add_argument(
'--test_case', '--test_case',
default=None, default='ping_pong',
choices=[ choices=[
'all', 'all',
'backends_restart', 'backends_restart',
@ -64,8 +64,8 @@ argp.add_argument(
'ping_pong', 'ping_pong',
'remove_instance_group', 'remove_instance_group',
'round_robin', 'round_robin',
'secondary_locality_gets_requests_on_primary_failure',
'secondary_locality_gets_no_requests_on_partial_primary_failure', 'secondary_locality_gets_no_requests_on_partial_primary_failure',
'secondary_locality_gets_requests_on_primary_failure',
]) ])
argp.add_argument( argp.add_argument(
'--client_cmd', '--client_cmd',
@ -184,13 +184,11 @@ def get_client_stats(num_rpcs, timeout_sec):
raise Exception('GetClientStats RPC failed') raise Exception('GetClientStats RPC failed')
def wait_until_only_given_instances_receive_load(backends, def _verify_rpcs_to_given_backends(backends, timeout_sec, num_rpcs,
timeout_sec, allow_failures):
num_rpcs=100,
allow_failures=False):
start_time = time.time() start_time = time.time()
error_msg = None error_msg = None
logger.debug('Waiting for %d sec until backends %s receive load' % logger.debug('Waiting for %d sec until backends %s receive load' %
(timeout_sec, backends)) (timeout_sec, backends))
while time.time() - start_time <= timeout_sec: while time.time() - start_time <= timeout_sec:
error_msg = None error_msg = None
@ -209,31 +207,50 @@ def wait_until_only_given_instances_receive_load(backends,
raise Exception(error_msg) raise Exception(error_msg)
def wait_until_all_rpcs_go_to_given_backends_or_fail(backends,
timeout_sec,
num_rpcs=100):
_verify_rpcs_to_given_backends(backends,
timeout_sec,
num_rpcs,
allow_failures=True)
def wait_until_all_rpcs_go_to_given_backends(backends,
timeout_sec,
num_rpcs=100):
_verify_rpcs_to_given_backends(backends,
timeout_sec,
num_rpcs,
allow_failures=False)
def test_backends_restart(gcp, backend_service, instance_group): def test_backends_restart(gcp, backend_service, instance_group):
instance_names = get_instance_names(gcp, instance_group) instance_names = get_instance_names(gcp, instance_group)
num_instances = len(instance_names) num_instances = len(instance_names)
start_time = time.time() start_time = time.time()
wait_until_only_given_instances_receive_load(instance_names, wait_until_all_rpcs_go_to_given_backends(instance_names,
_WAIT_FOR_STATS_SEC) _WAIT_FOR_STATS_SEC)
stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC) stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
try: try:
resize_instance_group(gcp, instance_group, 0) resize_instance_group(gcp, instance_group, 0)
wait_until_only_given_instances_receive_load([], wait_until_all_rpcs_go_to_given_backends_or_fail([],
_WAIT_FOR_BACKEND_SEC, _WAIT_FOR_BACKEND_SEC)
allow_failures=True)
finally: finally:
resize_instance_group(gcp, instance_group, num_instances) resize_instance_group(gcp, instance_group, num_instances)
wait_for_healthy_backends(gcp, backend_service, instance_group) wait_for_healthy_backends(gcp, backend_service, instance_group)
new_instance_names = get_instance_names(gcp, instance_group) new_instance_names = get_instance_names(gcp, instance_group)
wait_until_only_given_instances_receive_load(new_instance_names, wait_until_all_rpcs_go_to_given_backends(new_instance_names,
_WAIT_FOR_BACKEND_SEC) _WAIT_FOR_BACKEND_SEC)
new_stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC) new_stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
original_distribution = list(stats.rpcs_by_peer.values()) original_distribution = list(stats.rpcs_by_peer.values())
original_distribution.sort() original_distribution.sort()
new_distribution = list(new_stats.rpcs_by_peer.values()) new_distribution = list(new_stats.rpcs_by_peer.values())
new_distribution.sort() new_distribution.sort()
if original_distribution != new_distribution: error_threshold = 3
raise Exception('Distributions do not match: ', stats, new_stats) for i in range(len(original_distribution)):
if abs(original_distribution[i] - new_distribution[i]) > threshold:
raise Exception('Distributions do not match: ', stats, new_stats)
def test_change_backend_service(gcp, original_backend_service, instance_group, def test_change_backend_service(gcp, original_backend_service, instance_group,
@ -247,15 +264,15 @@ def test_change_backend_service(gcp, original_backend_service, instance_group,
wait_for_healthy_backends(gcp, original_backend_service, instance_group) wait_for_healthy_backends(gcp, original_backend_service, instance_group)
wait_for_healthy_backends(gcp, alternate_backend_service, wait_for_healthy_backends(gcp, alternate_backend_service,
same_zone_instance_group) same_zone_instance_group)
wait_until_only_given_instances_receive_load(original_backend_instances, wait_until_all_rpcs_go_to_given_backends(original_backend_instances,
_WAIT_FOR_STATS_SEC) _WAIT_FOR_STATS_SEC)
try: try:
patch_url_map_backend_service(gcp, alternate_backend_service) patch_url_map_backend_service(gcp, alternate_backend_service)
stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC) stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
if stats.num_failures > 0: if stats.num_failures > 0:
raise Exception('Unexpected failure: %s', stats) raise Exception('Unexpected failure: %s', stats)
wait_until_only_given_instances_receive_load( wait_until_all_rpcs_go_to_given_backends(alternate_backend_instances,
alternate_backend_instances, _WAIT_FOR_STATS_SEC) _WAIT_FOR_STATS_SEC)
finally: finally:
patch_url_map_backend_service(gcp, original_backend_service) patch_url_map_backend_service(gcp, original_backend_service)
patch_backend_instances(gcp, alternate_backend_service, []) patch_backend_instances(gcp, alternate_backend_service, [])
@ -265,8 +282,8 @@ def test_new_instance_group_receives_traffic(gcp, backend_service,
instance_group, instance_group,
same_zone_instance_group): same_zone_instance_group):
instance_names = get_instance_names(gcp, instance_group) instance_names = get_instance_names(gcp, instance_group)
wait_until_only_given_instances_receive_load(instance_names, wait_until_all_rpcs_go_to_given_backends(instance_names,
_WAIT_FOR_STATS_SEC) _WAIT_FOR_STATS_SEC)
try: try:
patch_backend_instances(gcp, patch_backend_instances(gcp,
backend_service, backend_service,
@ -277,8 +294,8 @@ def test_new_instance_group_receives_traffic(gcp, backend_service,
same_zone_instance_group) same_zone_instance_group)
combined_instance_names = instance_names + get_instance_names( combined_instance_names = instance_names + get_instance_names(
gcp, same_zone_instance_group) gcp, same_zone_instance_group)
wait_until_only_given_instances_receive_load(combined_instance_names, wait_until_all_rpcs_go_to_given_backends(combined_instance_names,
_WAIT_FOR_BACKEND_SEC) _WAIT_FOR_BACKEND_SEC)
finally: finally:
patch_backend_instances(gcp, backend_service, [instance_group]) patch_backend_instances(gcp, backend_service, [instance_group])
@ -286,20 +303,8 @@ def test_new_instance_group_receives_traffic(gcp, backend_service,
def test_ping_pong(gcp, backend_service, instance_group): def test_ping_pong(gcp, backend_service, instance_group):
wait_for_healthy_backends(gcp, backend_service, instance_group) wait_for_healthy_backends(gcp, backend_service, instance_group)
instance_names = get_instance_names(gcp, instance_group) instance_names = get_instance_names(gcp, instance_group)
start_time = time.time() wait_until_all_rpcs_go_to_given_backends(instance_names,
error_msg = None _WAIT_FOR_STATS_SEC)
while time.time() - start_time <= _WAIT_FOR_STATS_SEC:
error_msg = None
stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
rpcs_by_peer = stats.rpcs_by_peer
for instance in instance_names:
if instance not in rpcs_by_peer:
error_msg = 'Instance %s did not receive load' % instance
break
if not error_msg and len(rpcs_by_peer) > len(instance_names):
error_msg = 'Unexpected instance received load: %s' % rpcs_by_peer
if not error_msg:
return
raise Exception(error_msg) raise Exception(error_msg)
@ -316,29 +321,28 @@ def test_remove_instance_group(gcp, backend_service, instance_group,
instance_names = get_instance_names(gcp, instance_group) instance_names = get_instance_names(gcp, instance_group)
same_zone_instance_names = get_instance_names(gcp, same_zone_instance_names = get_instance_names(gcp,
same_zone_instance_group) same_zone_instance_group)
wait_until_only_given_instances_receive_load( wait_until_all_rpcs_go_to_given_backends(
instance_names + same_zone_instance_names, _WAIT_FOR_BACKEND_SEC) instance_names + same_zone_instance_names, _WAIT_FOR_BACKEND_SEC)
patch_backend_instances(gcp, patch_backend_instances(gcp,
backend_service, [same_zone_instance_group], backend_service, [same_zone_instance_group],
balancing_mode='RATE') balancing_mode='RATE')
wait_until_only_given_instances_receive_load(same_zone_instance_names, wait_until_all_rpcs_go_to_given_backends(same_zone_instance_names,
_WAIT_FOR_BACKEND_SEC) _WAIT_FOR_BACKEND_SEC)
finally: finally:
patch_backend_instances(gcp, backend_service, [instance_group]) patch_backend_instances(gcp, backend_service, [instance_group])
wait_until_only_given_instances_receive_load(instance_names, wait_until_all_rpcs_go_to_given_backends(instance_names,
_WAIT_FOR_BACKEND_SEC) _WAIT_FOR_BACKEND_SEC)
def test_round_robin(gcp, backend_service, instance_group): def test_round_robin(gcp, backend_service, instance_group):
wait_for_healthy_backends(gcp, backend_service, instance_group) wait_for_healthy_backends(gcp, backend_service, instance_group)
instance_names = get_instance_names(gcp, instance_group) instance_names = get_instance_names(gcp, instance_group)
threshold = 1 threshold = 1
wait_until_only_given_instances_receive_load(instance_names, wait_until_all_rpcs_go_to_given_backends(instance_names,
_WAIT_FOR_STATS_SEC) _WAIT_FOR_STATS_SEC)
stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC) stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
requests_received = [stats.rpcs_by_peer[x] for x in stats.rpcs_by_peer] requests_received = [stats.rpcs_by_peer[x] for x in stats.rpcs_by_peer]
total_requests_received = sum( total_requests_received = sum(requests_received)
[stats.rpcs_by_peer[x] for x in stats.rpcs_by_peer])
if total_requests_received != _NUM_TEST_RPCS: if total_requests_received != _NUM_TEST_RPCS:
raise Exception('Unexpected RPC failures', stats) raise Exception('Unexpected RPC failures', stats)
expected_requests = total_requests_received / len(instance_names) expected_requests = total_requests_received / len(instance_names)
@ -362,14 +366,14 @@ def test_secondary_locality_gets_no_requests_on_partial_primary_failure(
primary_instance_names = get_instance_names(gcp, instance_group) primary_instance_names = get_instance_names(gcp, instance_group)
secondary_instance_names = get_instance_names( secondary_instance_names = get_instance_names(
gcp, secondary_zone_instance_group) gcp, secondary_zone_instance_group)
wait_until_only_given_instances_receive_load(primary_instance_names, wait_until_all_rpcs_go_to_given_backends(primary_instance_names,
_WAIT_FOR_STATS_SEC) _WAIT_FOR_STATS_SEC)
original_size = len(primary_instance_names) original_size = len(primary_instance_names)
resize_instance_group(gcp, primary_instance_group, original_size - 1) resize_instance_group(gcp, primary_instance_group, original_size - 1)
remaining_instance_names = get_instance_names(gcp, remaining_instance_names = get_instance_names(gcp,
primary_instance_group) primary_instance_group)
wait_until_only_given_instances_receive_load(remaining_instance_names, wait_until_all_rpcs_go_to_given_backends(remaining_instance_names,
_WAIT_FOR_BACKEND_SEC) _WAIT_FOR_BACKEND_SEC)
finally: finally:
patch_backend_instances(gcp, backend_service, [primary_instance_group]) patch_backend_instances(gcp, backend_service, [primary_instance_group])
resize_instance_group(gcp, primary_instance_group, original_size) resize_instance_group(gcp, primary_instance_group, original_size)
@ -388,18 +392,18 @@ def test_secondary_locality_gets_requests_on_primary_failure(
primary_instance_names = get_instance_names(gcp, instance_group) primary_instance_names = get_instance_names(gcp, instance_group)
secondary_instance_names = get_instance_names( secondary_instance_names = get_instance_names(
gcp, secondary_zone_instance_group) gcp, secondary_zone_instance_group)
wait_until_only_given_instances_receive_load(primary_instance_names, wait_until_all_rpcs_go_to_given_backends(primary_instance_names,
_WAIT_FOR_BACKEND_SEC) _WAIT_FOR_BACKEND_SEC)
original_size = len(primary_instance_names) original_size = len(primary_instance_names)
resize_instance_group(gcp, primary_instance_group, 0) resize_instance_group(gcp, primary_instance_group, 0)
wait_until_only_given_instances_receive_load(secondary_instance_names, wait_until_all_rpcs_go_to_given_backends(secondary_instance_names,
_WAIT_FOR_BACKEND_SEC) _WAIT_FOR_BACKEND_SEC)
resize_instance_group(gcp, primary_instance_group, original_size) resize_instance_group(gcp, primary_instance_group, original_size)
new_instance_names = get_instance_names(gcp, primary_instance_group) new_instance_names = get_instance_names(gcp, primary_instance_group)
wait_for_healthy_backends(gcp, backend_service, primary_instance_group) wait_for_healthy_backends(gcp, backend_service, primary_instance_group)
wait_until_only_given_instances_receive_load(new_instance_names, wait_until_all_rpcs_go_to_given_backends(new_instance_names,
_WAIT_FOR_BACKEND_SEC) _WAIT_FOR_BACKEND_SEC)
finally: finally:
patch_backend_instances(gcp, backend_service, [primary_instance_group]) patch_backend_instances(gcp, backend_service, [primary_instance_group])
@ -802,6 +806,23 @@ def start_xds_client(cmd, service_port):
return client_process return client_process
def clean_up(gcp):
if gcp.global_forwarding_rule:
delete_global_forwarding_rule(gcp)
if gcp.target_http_proxy:
delete_target_http_proxy(gcp)
if gcp.url_map:
delete_url_map(gcp)
delete_backend_services(gcp)
if gcp.health_check_firewall_rule:
delete_firewall(gcp)
if gcp.health_check:
delete_health_check(gcp)
delete_instance_groups(gcp)
if gcp.instance_template:
delete_instance_template(gcp)
class InstanceGroup(object): class InstanceGroup(object):
def __init__(self, name, url, zone): def __init__(self, name, url, zone):
@ -832,22 +853,6 @@ class GcpState(object):
self.instance_template = None self.instance_template = None
self.instance_groups = [] self.instance_groups = []
def clean_up(self):
if self.global_forwarding_rule:
delete_global_forwarding_rule(self)
if self.target_http_proxy:
delete_target_http_proxy(self)
if self.url_map:
delete_url_map(self)
delete_backend_services(self)
if self.health_check_firewall_rule:
delete_firewall(self)
if self.health_check:
delete_health_check(self)
delete_instance_groups(self)
if self.instance_template:
delete_instance_template(self)
if args.compute_discovery_document: if args.compute_discovery_document:
with open(args.compute_discovery_document, 'r') as discovery_doc: with open(args.compute_discovery_document, 'r') as discovery_doc:
@ -1024,4 +1029,4 @@ finally:
client_process.terminate() client_process.terminate()
if not args.keep_gcp_resources: if not args.keep_gcp_resources:
logger.info('Cleaning up GCP resources. This may take some time.') logger.info('Cleaning up GCP resources. This may take some time.')
gcp.clean_up() clean_up(gcp)

Loading…
Cancel
Save