diff --git a/src/proto/grpc/testing/test.proto b/src/proto/grpc/testing/test.proto index 0b198d8c260..1da43f48989 100644 --- a/src/proto/grpc/testing/test.proto +++ b/src/proto/grpc/testing/test.proto @@ -84,3 +84,9 @@ service LoadBalancerStatsService { rpc GetClientStats(LoadBalancerStatsRequest) returns (LoadBalancerStatsResponse) {} } + +// A service to remotely control health status of an xDS test server. +service XdsUpdateHealthService { + rpc SetServing(grpc.testing.Empty) returns (grpc.testing.Empty); + rpc SetNotServing(grpc.testing.Empty) returns (grpc.testing.Empty); +} diff --git a/tools/run_tests/run_xds_tests.py b/tools/run_tests/run_xds_tests.py index ffb029f5f04..81c913f5795 100755 --- a/tools/run_tests/run_xds_tests.py +++ b/tools/run_tests/run_xds_tests.py @@ -32,6 +32,7 @@ from oauth2client.client import GoogleCredentials import python_utils.jobset as jobset import python_utils.report_utils as report_utils +from src.proto.grpc.testing import empty_pb2 from src.proto.grpc.testing import messages_pb2 from src.proto.grpc.testing import test_pb2_grpc @@ -46,6 +47,7 @@ logger.setLevel(logging.WARNING) _TEST_CASES = [ 'backends_restart', 'change_backend_service', + 'gentle_failover', 'new_instance_group_receives_traffic', 'ping_pong', 'remove_instance_group', @@ -231,12 +233,6 @@ _BOOTSTRAP_TEMPLATE = """ _TESTS_TO_FAIL_ON_RPC_FAILURE = [ 'new_instance_group_receives_traffic', 'ping_pong', 'round_robin' ] -_TESTS_USING_SECONDARY_IG = [ - 'secondary_locality_gets_no_requests_on_partial_primary_failure', - 'secondary_locality_gets_requests_on_primary_failure' -] -_USE_SECONDARY_IG = any( - [t in args.test_case for t in _TESTS_USING_SECONDARY_IG]) _PATH_MATCHER_NAME = 'path-matcher' _BASE_TEMPLATE_NAME = 'test-template' _BASE_INSTANCE_GROUP_NAME = 'test-ig' @@ -267,6 +263,10 @@ def get_client_stats(num_rpcs, timeout_sec): return response +class RpcDistributionError(Exception): + pass + + def _verify_rpcs_to_given_backends(backends, timeout_sec, num_rpcs, allow_failures): start_time = time.time() @@ -287,7 +287,7 @@ def _verify_rpcs_to_given_backends(backends, timeout_sec, num_rpcs, error_msg = '%d RPCs failed' % stats.num_failures if not error_msg: return - raise Exception(error_msg) + raise RpcDistributionError(error_msg) def wait_until_all_rpcs_go_to_given_backends_or_fail(backends, @@ -396,6 +396,62 @@ def test_change_backend_service(gcp, original_backend_service, instance_group, patch_backend_instances(gcp, alternate_backend_service, []) +def test_gentle_failover(gcp, + backend_service, + primary_instance_group, + secondary_instance_group, + swapped_primary_and_secondary=False): + logger.info('Running test_gentle_failover') + num_primary_instances = len(get_instance_names(gcp, primary_instance_group)) + min_instances_for_gentle_failover = 3 # Need >50% failure to start failover + try: + if num_primary_instances < min_instances_for_gentle_failover: + resize_instance_group(gcp, primary_instance_group, + min_instances_for_gentle_failover) + patch_backend_instances( + gcp, backend_service, + [primary_instance_group, secondary_instance_group]) + primary_instance_names = get_instance_names(gcp, primary_instance_group) + secondary_instance_names = get_instance_names(gcp, + secondary_instance_group) + wait_for_healthy_backends(gcp, backend_service, primary_instance_group) + wait_for_healthy_backends(gcp, backend_service, + secondary_instance_group) + wait_until_all_rpcs_go_to_given_backends(primary_instance_names, + _WAIT_FOR_STATS_SEC) + instances_to_stop = primary_instance_names[:-1] + remaining_instances = primary_instance_names[-1:] + try: + set_serving_status(instances_to_stop, + gcp.service_port, + serving=False) + wait_until_all_rpcs_go_to_given_backends( + remaining_instances + secondary_instance_names, + _WAIT_FOR_BACKEND_SEC) + finally: + set_serving_status(primary_instance_names, + gcp.service_port, + serving=True) + except RpcDistributionError as e: + if not swapped_primary_and_secondary and is_primary_instance_group( + gcp, secondary_instance_group): + # Swap expectation of primary and secondary instance groups. + test_gentle_failover(gcp, + backend_service, + secondary_instance_group, + primary_instance_group, + swapped_primary_and_secondary=True) + else: + raise e + finally: + patch_backend_instances(gcp, backend_service, [primary_instance_group]) + resize_instance_group(gcp, primary_instance_group, + num_primary_instances) + instance_names = get_instance_names(gcp, primary_instance_group) + wait_until_all_rpcs_go_to_given_backends(instance_names, + _WAIT_FOR_BACKEND_SEC) + + def test_new_instance_group_receives_traffic(gcp, backend_service, instance_group, same_zone_instance_group): @@ -478,61 +534,93 @@ def test_round_robin(gcp, backend_service, instance_group): def test_secondary_locality_gets_no_requests_on_partial_primary_failure( - gcp, backend_service, primary_instance_group, - secondary_zone_instance_group): + gcp, + backend_service, + primary_instance_group, + secondary_instance_group, + swapped_primary_and_secondary=False): logger.info( - 'Running test_secondary_locality_gets_no_requests_on_partial_primary_failure' + 'Running secondary_locality_gets_no_requests_on_partial_primary_failure' ) try: patch_backend_instances( gcp, backend_service, - [primary_instance_group, secondary_zone_instance_group]) + [primary_instance_group, secondary_instance_group]) wait_for_healthy_backends(gcp, backend_service, primary_instance_group) wait_for_healthy_backends(gcp, backend_service, - secondary_zone_instance_group) - primary_instance_names = get_instance_names(gcp, instance_group) - secondary_instance_names = get_instance_names( - gcp, secondary_zone_instance_group) + secondary_instance_group) + primary_instance_names = get_instance_names(gcp, primary_instance_group) wait_until_all_rpcs_go_to_given_backends(primary_instance_names, _WAIT_FOR_STATS_SEC) - original_size = len(primary_instance_names) - resize_instance_group(gcp, primary_instance_group, original_size - 1) - remaining_instance_names = get_instance_names(gcp, - primary_instance_group) - wait_until_all_rpcs_go_to_given_backends(remaining_instance_names, - _WAIT_FOR_BACKEND_SEC) + instances_to_stop = primary_instance_names[:1] + remaining_instances = primary_instance_names[1:] + try: + set_serving_status(instances_to_stop, + gcp.service_port, + serving=False) + wait_until_all_rpcs_go_to_given_backends(remaining_instances, + _WAIT_FOR_BACKEND_SEC) + finally: + set_serving_status(primary_instance_names, + gcp.service_port, + serving=True) + except RpcDistributionError as e: + if not swapped_primary_and_secondary and is_primary_instance_group( + gcp, secondary_instance_group): + # Swap expectation of primary and secondary instance groups. + test_secondary_locality_gets_no_requests_on_partial_primary_failure( + gcp, + backend_service, + secondary_instance_group, + primary_instance_group, + swapped_primary_and_secondary=True) + else: + raise e finally: patch_backend_instances(gcp, backend_service, [primary_instance_group]) - resize_instance_group(gcp, primary_instance_group, original_size) def test_secondary_locality_gets_requests_on_primary_failure( - gcp, backend_service, primary_instance_group, - secondary_zone_instance_group): - logger.info( - 'Running test_secondary_locality_gets_requests_on_primary_failure') + gcp, + backend_service, + primary_instance_group, + secondary_instance_group, + swapped_primary_and_secondary=False): + logger.info('Running secondary_locality_gets_requests_on_primary_failure') try: patch_backend_instances( gcp, backend_service, - [primary_instance_group, secondary_zone_instance_group]) + [primary_instance_group, secondary_instance_group]) wait_for_healthy_backends(gcp, backend_service, primary_instance_group) wait_for_healthy_backends(gcp, backend_service, - secondary_zone_instance_group) - primary_instance_names = get_instance_names(gcp, instance_group) - secondary_instance_names = get_instance_names( - gcp, secondary_zone_instance_group) + secondary_instance_group) + primary_instance_names = get_instance_names(gcp, primary_instance_group) + secondary_instance_names = get_instance_names(gcp, + secondary_instance_group) wait_until_all_rpcs_go_to_given_backends(primary_instance_names, - _WAIT_FOR_BACKEND_SEC) - original_size = len(primary_instance_names) - resize_instance_group(gcp, primary_instance_group, 0) - wait_until_all_rpcs_go_to_given_backends(secondary_instance_names, - _WAIT_FOR_BACKEND_SEC) - - resize_instance_group(gcp, primary_instance_group, original_size) - new_instance_names = get_instance_names(gcp, primary_instance_group) - wait_for_healthy_backends(gcp, backend_service, primary_instance_group) - wait_until_all_rpcs_go_to_given_backends(new_instance_names, - _WAIT_FOR_BACKEND_SEC) + _WAIT_FOR_STATS_SEC) + try: + set_serving_status(primary_instance_names, + gcp.service_port, + serving=False) + wait_until_all_rpcs_go_to_given_backends(secondary_instance_names, + _WAIT_FOR_BACKEND_SEC) + finally: + set_serving_status(primary_instance_names, + gcp.service_port, + serving=True) + except RpcDistributionError as e: + if not swapped_primary_and_secondary and is_primary_instance_group( + gcp, secondary_instance_group): + # Swap expectation of primary and secondary instance groups. + test_secondary_locality_gets_requests_on_primary_failure( + gcp, + backend_service, + secondary_instance_group, + primary_instance_group, + swapped_primary_and_secondary=True) + else: + raise e finally: patch_backend_instances(gcp, backend_service, [primary_instance_group]) @@ -636,6 +724,26 @@ def test_traffic_splitting(gcp, original_backend_service, instance_group, set_validate_for_proxyless(gcp, True) +def set_serving_status(instances, service_port, serving): + for instance in instances: + with grpc.insecure_channel('%s:%d' % + (instance, service_port)) as channel: + stub = test_pb2_grpc.XdsUpdateHealthServiceStub(channel) + if serving: + stub.SetServing(empty_pb2.Empty()) + else: + stub.SetNotServing(empty_pb2.Empty()) + + +def is_primary_instance_group(gcp, instance_group): + # Clients may connect to a TD instance in a different region than the + # client, in which case primary/secondary assignments may not be based on + # the client's actual locality. + instance_names = get_instance_names(gcp, instance_group) + stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC) + return all(peer in instance_names for peer in stats.rpcs_by_peer.keys()) + + def get_startup_script(path_to_server_binary, service_port): if path_to_server_binary: return "nohup %s --port=%d 1>/dev/null &" % (path_to_server_binary, @@ -1182,18 +1290,20 @@ def wait_for_healthy_backends(gcp, timeout_sec=_WAIT_FOR_BACKEND_SEC): start_time = time.time() config = {'group': instance_group.url} + expected_size = len(get_instance_names(gcp, instance_group)) while time.time() - start_time <= timeout_sec: result = gcp.compute.backendServices().getHealth( project=gcp.project, backendService=backend_service.name, body=config).execute(num_retries=_GCP_API_RETRIES) if 'healthStatus' in result: + logger.info('received healthStatus: %s', result['healthStatus']) healthy = True for instance in result['healthStatus']: if instance['healthState'] != 'HEALTHY': healthy = False break - if healthy: + if healthy and expected_size == len(result['healthStatus']): return time.sleep(2) raise Exception('Not all backends became healthy within %d seconds: %s' % @@ -1227,6 +1337,7 @@ def get_instance_names(gcp, instance_group): # just extract the name manually. instance_name = item['instance'].split('/')[-1] instance_names.append(instance_name) + logger.info('retrieved instance names: %s', instance_names) return instance_names @@ -1306,8 +1417,7 @@ try: template_name = _BASE_TEMPLATE_NAME + args.gcp_suffix instance_group_name = _BASE_INSTANCE_GROUP_NAME + args.gcp_suffix same_zone_instance_group_name = _BASE_INSTANCE_GROUP_NAME + '-same-zone' + args.gcp_suffix - if _USE_SECONDARY_IG: - secondary_zone_instance_group_name = _BASE_INSTANCE_GROUP_NAME + '-secondary-zone' + args.gcp_suffix + secondary_zone_instance_group_name = _BASE_INSTANCE_GROUP_NAME + '-secondary-zone' + args.gcp_suffix if args.use_existing_gcp_resources: logger.info('Reusing existing GCP resources') get_health_check(gcp, health_check_name) @@ -1328,9 +1438,8 @@ try: instance_group = get_instance_group(gcp, args.zone, instance_group_name) same_zone_instance_group = get_instance_group( gcp, args.zone, same_zone_instance_group_name) - if _USE_SECONDARY_IG: - secondary_zone_instance_group = get_instance_group( - gcp, args.secondary_zone, secondary_zone_instance_group_name) + secondary_zone_instance_group = get_instance_group( + gcp, args.secondary_zone, secondary_zone_instance_group_name) else: create_health_check(gcp, health_check_name) create_health_check_firewall_rule(gcp, firewall_name) @@ -1360,10 +1469,9 @@ try: patch_backend_instances(gcp, backend_service, [instance_group]) same_zone_instance_group = add_instance_group( gcp, args.zone, same_zone_instance_group_name, _INSTANCE_GROUP_SIZE) - if _USE_SECONDARY_IG: - secondary_zone_instance_group = add_instance_group( - gcp, args.secondary_zone, secondary_zone_instance_group_name, - _INSTANCE_GROUP_SIZE) + secondary_zone_instance_group = add_instance_group( + gcp, args.secondary_zone, secondary_zone_instance_group_name, + _INSTANCE_GROUP_SIZE) wait_for_healthy_backends(gcp, backend_service, instance_group) @@ -1420,6 +1528,9 @@ try: instance_group, alternate_backend_service, same_zone_instance_group) + elif test_case == 'gentle_failover': + test_gentle_failover(gcp, backend_service, instance_group, + secondary_zone_instance_group) elif test_case == 'new_instance_group_receives_traffic': test_new_instance_group_receives_traffic( gcp, backend_service, instance_group,