Merge pull request #23206 from ericgribkoff/dynamic_server_health

Add gentle failover test
pull/23250/head
Eric Gribkoff 5 years ago
parent 118282682a
commit c7e0d323e0
  1. 6
      src/proto/grpc/testing/test.proto
  2. 197
      tools/run_tests/run_xds_tests.py

@ -84,3 +84,9 @@ service LoadBalancerStatsService {
rpc GetClientStats(LoadBalancerStatsRequest)
returns (LoadBalancerStatsResponse) {}
}
// A service to remotely control health status of an xDS test server.
service XdsUpdateHealthService {
// Presumably switches the server's reported health to serving; the request
// and response are both grpc.testing.Empty. (Semantics inferred from the
// RPC name -- confirm against the server implementation.)
rpc SetServing(grpc.testing.Empty) returns (grpc.testing.Empty);
// Presumably switches the server's reported health to not-serving; the
// request and response are both grpc.testing.Empty. (Semantics inferred from
// the RPC name -- confirm against the server implementation.)
rpc SetNotServing(grpc.testing.Empty) returns (grpc.testing.Empty);
}

@ -32,6 +32,7 @@ from oauth2client.client import GoogleCredentials
import python_utils.jobset as jobset
import python_utils.report_utils as report_utils
from src.proto.grpc.testing import empty_pb2
from src.proto.grpc.testing import messages_pb2
from src.proto.grpc.testing import test_pb2_grpc
@ -46,6 +47,7 @@ logger.setLevel(logging.WARNING)
_TEST_CASES = [
'backends_restart',
'change_backend_service',
'gentle_failover',
'new_instance_group_receives_traffic',
'ping_pong',
'remove_instance_group',
@ -231,12 +233,6 @@ _BOOTSTRAP_TEMPLATE = """
_TESTS_TO_FAIL_ON_RPC_FAILURE = [
'new_instance_group_receives_traffic', 'ping_pong', 'round_robin'
]
_TESTS_USING_SECONDARY_IG = [
'secondary_locality_gets_no_requests_on_partial_primary_failure',
'secondary_locality_gets_requests_on_primary_failure'
]
_USE_SECONDARY_IG = any(
[t in args.test_case for t in _TESTS_USING_SECONDARY_IG])
_PATH_MATCHER_NAME = 'path-matcher'
_BASE_TEMPLATE_NAME = 'test-template'
_BASE_INSTANCE_GROUP_NAME = 'test-ig'
@ -267,6 +263,10 @@ def get_client_stats(num_rpcs, timeout_sec):
return response
class RpcDistributionError(Exception):
    """Signals that RPCs were not distributed to backends as a test expected.

    Tests catch this type specifically so that a distribution mismatch can be
    told apart from any other failure.
    """
def _verify_rpcs_to_given_backends(backends, timeout_sec, num_rpcs,
allow_failures):
start_time = time.time()
@ -287,7 +287,7 @@ def _verify_rpcs_to_given_backends(backends, timeout_sec, num_rpcs,
error_msg = '%d RPCs failed' % stats.num_failures
if not error_msg:
return
raise Exception(error_msg)
raise RpcDistributionError(error_msg)
def wait_until_all_rpcs_go_to_given_backends_or_fail(backends,
@ -396,6 +396,62 @@ def test_change_backend_service(gcp, original_backend_service, instance_group,
patch_backend_instances(gcp, alternate_backend_service, [])
def test_gentle_failover(gcp,
                         backend_service,
                         primary_instance_group,
                         secondary_instance_group,
                         swapped_primary_and_secondary=False):
    """Verifies traffic spreads to the secondary group on partial failure.

    The primary group is grown to at least three instances if it is smaller,
    both groups are attached to the backend service, and all RPCs are first
    expected on the primary instances. Every primary instance except the last
    is then switched to not-serving, after which RPCs are expected on the
    remaining primary instance plus all secondary instances.

    Args:
      gcp: GCP state holder; `gcp.service_port` is the port used to reach the
        health-control service on each instance.
      backend_service: Backend service whose backends are patched.
      primary_instance_group: Group expected to receive traffic initially.
      secondary_instance_group: Group expected to absorb failover traffic.
      swapped_primary_and_secondary: True when this call is the retry with
        the two groups exchanged; prevents a second swap.

    Raises:
      RpcDistributionError: If the expected distribution is not observed and
        either the groups were already swapped or the secondary group is not
        the one actually receiving all traffic.
    """
    logger.info('Running test_gentle_failover')
    original_primary_size = len(
        get_instance_names(gcp, primary_instance_group))
    # Need >50% failure to start failover
    required_primary_size = 3
    try:
        if original_primary_size < required_primary_size:
            resize_instance_group(gcp, primary_instance_group,
                                  required_primary_size)
        both_groups = [primary_instance_group, secondary_instance_group]
        patch_backend_instances(gcp, backend_service, both_groups)
        primary_names = get_instance_names(gcp, primary_instance_group)
        secondary_names = get_instance_names(gcp, secondary_instance_group)
        # Primary first, then secondary.
        for group in both_groups:
            wait_for_healthy_backends(gcp, backend_service, group)
        wait_until_all_rpcs_go_to_given_backends(primary_names,
                                                 _WAIT_FOR_STATS_SEC)
        # Leave exactly one primary instance serving.
        stopped_names = primary_names[:-1]
        surviving_names = primary_names[-1:]
        try:
            set_serving_status(stopped_names,
                               gcp.service_port,
                               serving=False)
            expected_backends = surviving_names + secondary_names
            wait_until_all_rpcs_go_to_given_backends(expected_backends,
                                                     _WAIT_FOR_BACKEND_SEC)
        finally:
            # Always restore serving status on every primary instance.
            set_serving_status(primary_names,
                               gcp.service_port,
                               serving=True)
    except RpcDistributionError as e:
        if swapped_primary_and_secondary or not is_primary_instance_group(
                gcp, secondary_instance_group):
            raise e
        # Swap expectation of primary and secondary instance groups.
        test_gentle_failover(gcp,
                             backend_service,
                             secondary_instance_group,
                             primary_instance_group,
                             swapped_primary_and_secondary=True)
    finally:
        # Return to the single-group configuration at the original size.
        patch_backend_instances(gcp, backend_service, [primary_instance_group])
        resize_instance_group(gcp, primary_instance_group,
                              original_primary_size)
        restored_names = get_instance_names(gcp, primary_instance_group)
        wait_until_all_rpcs_go_to_given_backends(restored_names,
                                                 _WAIT_FOR_BACKEND_SEC)
def test_new_instance_group_receives_traffic(gcp, backend_service,
instance_group,
same_zone_instance_group):
@ -478,61 +534,93 @@ def test_round_robin(gcp, backend_service, instance_group):
def test_secondary_locality_gets_no_requests_on_partial_primary_failure(
gcp, backend_service, primary_instance_group,
secondary_zone_instance_group):
gcp,
backend_service,
primary_instance_group,
secondary_instance_group,
swapped_primary_and_secondary=False):
logger.info(
'Running test_secondary_locality_gets_no_requests_on_partial_primary_failure'
'Running secondary_locality_gets_no_requests_on_partial_primary_failure'
)
try:
patch_backend_instances(
gcp, backend_service,
[primary_instance_group, secondary_zone_instance_group])
[primary_instance_group, secondary_instance_group])
wait_for_healthy_backends(gcp, backend_service, primary_instance_group)
wait_for_healthy_backends(gcp, backend_service,
secondary_zone_instance_group)
primary_instance_names = get_instance_names(gcp, instance_group)
secondary_instance_names = get_instance_names(
gcp, secondary_zone_instance_group)
secondary_instance_group)
primary_instance_names = get_instance_names(gcp, primary_instance_group)
wait_until_all_rpcs_go_to_given_backends(primary_instance_names,
_WAIT_FOR_STATS_SEC)
original_size = len(primary_instance_names)
resize_instance_group(gcp, primary_instance_group, original_size - 1)
remaining_instance_names = get_instance_names(gcp,
primary_instance_group)
wait_until_all_rpcs_go_to_given_backends(remaining_instance_names,
instances_to_stop = primary_instance_names[:1]
remaining_instances = primary_instance_names[1:]
try:
set_serving_status(instances_to_stop,
gcp.service_port,
serving=False)
wait_until_all_rpcs_go_to_given_backends(remaining_instances,
_WAIT_FOR_BACKEND_SEC)
finally:
set_serving_status(primary_instance_names,
gcp.service_port,
serving=True)
except RpcDistributionError as e:
if not swapped_primary_and_secondary and is_primary_instance_group(
gcp, secondary_instance_group):
# Swap expectation of primary and secondary instance groups.
test_secondary_locality_gets_no_requests_on_partial_primary_failure(
gcp,
backend_service,
secondary_instance_group,
primary_instance_group,
swapped_primary_and_secondary=True)
else:
raise e
finally:
patch_backend_instances(gcp, backend_service, [primary_instance_group])
resize_instance_group(gcp, primary_instance_group, original_size)
def test_secondary_locality_gets_requests_on_primary_failure(
gcp, backend_service, primary_instance_group,
secondary_zone_instance_group):
logger.info(
'Running test_secondary_locality_gets_requests_on_primary_failure')
gcp,
backend_service,
primary_instance_group,
secondary_instance_group,
swapped_primary_and_secondary=False):
logger.info('Running secondary_locality_gets_requests_on_primary_failure')
try:
patch_backend_instances(
gcp, backend_service,
[primary_instance_group, secondary_zone_instance_group])
[primary_instance_group, secondary_instance_group])
wait_for_healthy_backends(gcp, backend_service, primary_instance_group)
wait_for_healthy_backends(gcp, backend_service,
secondary_zone_instance_group)
primary_instance_names = get_instance_names(gcp, instance_group)
secondary_instance_names = get_instance_names(
gcp, secondary_zone_instance_group)
secondary_instance_group)
primary_instance_names = get_instance_names(gcp, primary_instance_group)
secondary_instance_names = get_instance_names(gcp,
secondary_instance_group)
wait_until_all_rpcs_go_to_given_backends(primary_instance_names,
_WAIT_FOR_BACKEND_SEC)
original_size = len(primary_instance_names)
resize_instance_group(gcp, primary_instance_group, 0)
_WAIT_FOR_STATS_SEC)
try:
set_serving_status(primary_instance_names,
gcp.service_port,
serving=False)
wait_until_all_rpcs_go_to_given_backends(secondary_instance_names,
_WAIT_FOR_BACKEND_SEC)
resize_instance_group(gcp, primary_instance_group, original_size)
new_instance_names = get_instance_names(gcp, primary_instance_group)
wait_for_healthy_backends(gcp, backend_service, primary_instance_group)
wait_until_all_rpcs_go_to_given_backends(new_instance_names,
_WAIT_FOR_BACKEND_SEC)
finally:
set_serving_status(primary_instance_names,
gcp.service_port,
serving=True)
except RpcDistributionError as e:
if not swapped_primary_and_secondary and is_primary_instance_group(
gcp, secondary_instance_group):
# Swap expectation of primary and secondary instance groups.
test_secondary_locality_gets_requests_on_primary_failure(
gcp,
backend_service,
secondary_instance_group,
primary_instance_group,
swapped_primary_and_secondary=True)
else:
raise e
finally:
patch_backend_instances(gcp, backend_service, [primary_instance_group])
@ -636,6 +724,26 @@ def test_traffic_splitting(gcp, original_backend_service, instance_group,
set_validate_for_proxyless(gcp, True)
def set_serving_status(instances, service_port, serving):
    """Calls SetServing or SetNotServing on each instance's health service.

    Args:
      instances: Iterable of instance host names to contact.
      service_port: Integer port on which each instance is reached.
      serving: If truthy, SetServing is invoked; otherwise SetNotServing.
    """
    for instance in instances:
        target = '%s:%d' % (instance, service_port)
        # One short-lived insecure channel per instance; closed on exit.
        with grpc.insecure_channel(target) as channel:
            stub = test_pb2_grpc.XdsUpdateHealthServiceStub(channel)
            update_rpc = stub.SetServing if serving else stub.SetNotServing
            update_rpc(empty_pb2.Empty())
def is_primary_instance_group(gcp, instance_group):
    """Reports whether all sampled RPCs were served by `instance_group`.

    Fetches client stats for `_NUM_TEST_RPCS` RPCs and checks every peer that
    appears in them against the group's instance names.

    Returns:
      True if every peer in the stats belongs to `instance_group` (also True
      when the stats list no peers at all), False otherwise.
    """
    # Clients may connect to a TD instance in a different region than the
    # client, in which case primary/secondary assignments may not be based on
    # the client's actual locality.
    group_members = get_instance_names(gcp, instance_group)
    client_stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
    for peer in client_stats.rpcs_by_peer.keys():
        if peer not in group_members:
            return False
    return True
def get_startup_script(path_to_server_binary, service_port):
if path_to_server_binary:
return "nohup %s --port=%d 1>/dev/null &" % (path_to_server_binary,
@ -1182,18 +1290,20 @@ def wait_for_healthy_backends(gcp,
timeout_sec=_WAIT_FOR_BACKEND_SEC):
start_time = time.time()
config = {'group': instance_group.url}
expected_size = len(get_instance_names(gcp, instance_group))
while time.time() - start_time <= timeout_sec:
result = gcp.compute.backendServices().getHealth(
project=gcp.project,
backendService=backend_service.name,
body=config).execute(num_retries=_GCP_API_RETRIES)
if 'healthStatus' in result:
logger.info('received healthStatus: %s', result['healthStatus'])
healthy = True
for instance in result['healthStatus']:
if instance['healthState'] != 'HEALTHY':
healthy = False
break
if healthy:
if healthy and expected_size == len(result['healthStatus']):
return
time.sleep(2)
raise Exception('Not all backends became healthy within %d seconds: %s' %
@ -1227,6 +1337,7 @@ def get_instance_names(gcp, instance_group):
# just extract the name manually.
instance_name = item['instance'].split('/')[-1]
instance_names.append(instance_name)
logger.info('retrieved instance names: %s', instance_names)
return instance_names
@ -1306,7 +1417,6 @@ try:
template_name = _BASE_TEMPLATE_NAME + args.gcp_suffix
instance_group_name = _BASE_INSTANCE_GROUP_NAME + args.gcp_suffix
same_zone_instance_group_name = _BASE_INSTANCE_GROUP_NAME + '-same-zone' + args.gcp_suffix
if _USE_SECONDARY_IG:
secondary_zone_instance_group_name = _BASE_INSTANCE_GROUP_NAME + '-secondary-zone' + args.gcp_suffix
if args.use_existing_gcp_resources:
logger.info('Reusing existing GCP resources')
@ -1328,7 +1438,6 @@ try:
instance_group = get_instance_group(gcp, args.zone, instance_group_name)
same_zone_instance_group = get_instance_group(
gcp, args.zone, same_zone_instance_group_name)
if _USE_SECONDARY_IG:
secondary_zone_instance_group = get_instance_group(
gcp, args.secondary_zone, secondary_zone_instance_group_name)
else:
@ -1360,7 +1469,6 @@ try:
patch_backend_instances(gcp, backend_service, [instance_group])
same_zone_instance_group = add_instance_group(
gcp, args.zone, same_zone_instance_group_name, _INSTANCE_GROUP_SIZE)
if _USE_SECONDARY_IG:
secondary_zone_instance_group = add_instance_group(
gcp, args.secondary_zone, secondary_zone_instance_group_name,
_INSTANCE_GROUP_SIZE)
@ -1420,6 +1528,9 @@ try:
instance_group,
alternate_backend_service,
same_zone_instance_group)
elif test_case == 'gentle_failover':
test_gentle_failover(gcp, backend_service, instance_group,
secondary_zone_instance_group)
elif test_case == 'new_instance_group_receives_traffic':
test_new_instance_group_receives_traffic(
gcp, backend_service, instance_group,

Loading…
Cancel
Save