Merge pull request #23439 from menghanl/xds_interop_routing

xds interop: add routing path/header matching to framework
pull/23657/head
Menghan Li 5 years ago committed by Eric Gribkoff
parent be38879301
commit f189419615
  1. 289
      tools/run_tests/run_xds_tests.py

@ -56,17 +56,28 @@ _TEST_CASES = [
'secondary_locality_gets_requests_on_primary_failure', 'secondary_locality_gets_requests_on_primary_failure',
'traffic_splitting', 'traffic_splitting',
] ]
# Valid test cases, but not in all. So the tests can only run manually, and
# aren't enabled automatically for all languages.
#
# TODO: Move them into _TEST_CASES when support is ready in all languages.
_ADDITIONAL_TEST_CASES = ['path_matching', 'header_matching']
def parse_test_cases(arg): def parse_test_cases(arg):
if arg == 'all':
return _TEST_CASES
if arg == '': if arg == '':
return [] return []
test_cases = arg.split(',') arg_split = arg.split(',')
if all([test_case in _TEST_CASES for test_case in test_cases]): test_cases = set()
return test_cases all_test_cases = _TEST_CASES + _ADDITIONAL_TEST_CASES
raise Exception('Failed to parse test cases %s' % arg) for arg in arg_split:
if arg == "all":
test_cases = test_cases.union(_TEST_CASES)
else:
test_cases = test_cases.union([arg])
if not all([test_case in all_test_cases for test_case in test_cases]):
raise Exception('Failed to parse test cases %s' % arg)
# Perserve order.
return [x for x in all_test_cases if x in test_cases]
def parse_port_range(port_arg): def parse_port_range(port_arg):
@ -89,8 +100,10 @@ argp.add_argument(
'--test_case', '--test_case',
default='ping_pong', default='ping_pong',
type=parse_test_cases, type=parse_test_cases,
help='Comma-separated list of test cases to run, or \'all\' to run every ' help='Comma-separated list of test cases to run. Available tests: %s, '
'test. Available tests: %s' % ' '.join(_TEST_CASES)) '(or \'all\' to run every test). '
'Alternative tests not included in \'all\': %s' %
(','.join(_TEST_CASES), ','.join(_ADDITIONAL_TEST_CASES)))
argp.add_argument( argp.add_argument(
'--bootstrap_file', '--bootstrap_file',
default='', default='',
@ -237,6 +250,12 @@ _BOOTSTRAP_TEMPLATE = """
_TESTS_TO_FAIL_ON_RPC_FAILURE = [ _TESTS_TO_FAIL_ON_RPC_FAILURE = [
'new_instance_group_receives_traffic', 'ping_pong', 'round_robin' 'new_instance_group_receives_traffic', 'ping_pong', 'round_robin'
] ]
# Tests that run UnaryCall and EmptyCall.
_TESTS_TO_RUN_MULTIPLE_RPCS = ['path_matching', 'header_matching']
# Tests that make UnaryCall with test metadata.
_TESTS_TO_SEND_METADATA = ['header_matching']
_TEST_METADATA_KEY = 'xds_md'
_TEST_METADATA_VALUE = 'exact_match'
_PATH_MATCHER_NAME = 'path-matcher' _PATH_MATCHER_NAME = 'path-matcher'
_BASE_TEMPLATE_NAME = 'test-template' _BASE_TEMPLATE_NAME = 'test-template'
_BASE_INSTANCE_GROUP_NAME = 'test-ig' _BASE_INSTANCE_GROUP_NAME = 'test-ig'
@ -348,6 +367,29 @@ def compare_distributions(actual_distribution, expected_distribution,
return True return True
def compare_expected_instances(stats, expected_instances):
"""Compare if stats have expected instances for each type of RPC.
Args:
stats: LoadBalancerStatsResponse reported by interop client.
expected_instances: a dict with key as the RPC type (string), value as
the expected backend instances (list of strings).
Returns:
Returns true if the instances are expected. False if not.
"""
for rpc_type, expected_peers in expected_instances.items():
rpcs_by_peer_for_type = stats.rpcs_by_method[rpc_type]
rpcs_by_peer = rpcs_by_peer_for_type.rpcs_by_peer if rpcs_by_peer_for_type else None
logger.debug('rpc: %s, by_peer: %s', rpc_type, rpcs_by_peer)
peers = list(rpcs_by_peer.keys())
if set(peers) != set(expected_peers):
logger.info('unexpected peers for %s, got %s, want %s', rpc_type,
peers, expected_peers)
return False
return True
def test_backends_restart(gcp, backend_service, instance_group): def test_backends_restart(gcp, backend_service, instance_group):
logger.info('Running test_backends_restart') logger.info('Running test_backends_restart')
instance_names = get_instance_names(gcp, instance_group) instance_names = get_instance_names(gcp, instance_group)
@ -629,19 +671,20 @@ def test_secondary_locality_gets_requests_on_primary_failure(
patch_backend_instances(gcp, backend_service, [primary_instance_group]) patch_backend_instances(gcp, backend_service, [primary_instance_group])
def test_traffic_splitting(gcp, original_backend_service, instance_group, def prepare_services_for_urlmap_tests(gcp, original_backend_service,
alternate_backend_service, same_zone_instance_group): instance_group, alternate_backend_service,
# This test start with all traffic going to original_backend_service. Then same_zone_instance_group):
# it updates URL-map to set default action to traffic splitting between '''
# original and alternate. It waits for all backends in both services to This function prepares the services to be ready for tests that modifies
# receive traffic, then verifies that weights are expected. urlmaps.
logger.info('Running test_traffic_splitting')
Returns:
Returns original and alternate backend names as lists of strings.
'''
# The config validation for proxyless doesn't allow setting # The config validation for proxyless doesn't allow setting
# default_route_action. To test traffic splitting, we need to set the # default_route_action or route_rules. Disable validate
# route action to weighted clusters. Disable validate # validate_for_proxyless for this test. This can be removed when validation
# validate_for_proxyless for this test. This can be removed when # accepts default_route_action.
# validation accepts default_route_action.
logger.info('disabling validate_for_proxyless in target proxy') logger.info('disabling validate_for_proxyless in target proxy')
set_validate_for_proxyless(gcp, False) set_validate_for_proxyless(gcp, False)
@ -665,6 +708,20 @@ def test_traffic_splitting(gcp, original_backend_service, instance_group,
logger.info('waiting for traffic to all go to original backends') logger.info('waiting for traffic to all go to original backends')
wait_until_all_rpcs_go_to_given_backends(original_backend_instances, wait_until_all_rpcs_go_to_given_backends(original_backend_instances,
_WAIT_FOR_STATS_SEC) _WAIT_FOR_STATS_SEC)
return original_backend_instances, alternate_backend_instances
def test_traffic_splitting(gcp, original_backend_service, instance_group,
alternate_backend_service, same_zone_instance_group):
# This test start with all traffic going to original_backend_service. Then
# it updates URL-map to set default action to traffic splitting between
# original and alternate. It waits for all backends in both services to
# receive traffic, then verifies that weights are expected.
logger.info('Running test_traffic_splitting')
original_backend_instances, alternate_backend_instances = prepare_services_for_urlmap_tests(
gcp, original_backend_service, instance_group,
alternate_backend_service, same_zone_instance_group)
try: try:
# Patch urlmap, change route action to traffic splitting between # Patch urlmap, change route action to traffic splitting between
@ -728,6 +785,157 @@ def test_traffic_splitting(gcp, original_backend_service, instance_group,
set_validate_for_proxyless(gcp, True) set_validate_for_proxyless(gcp, True)
def test_path_matching(gcp, original_backend_service, instance_group,
alternate_backend_service, same_zone_instance_group):
# This test start with all traffic (UnaryCall and EmptyCall) going to
# original_backend_service.
#
# Then it updates URL-map to add routes, to make UnaryCall and EmptyCall to
# go different backends. It waits for all backends in both services to
# receive traffic, then verifies that traffic goes to the expected
# backends.
logger.info('Running test_path_matching')
original_backend_instances, alternate_backend_instances = prepare_services_for_urlmap_tests(
gcp, original_backend_service, instance_group,
alternate_backend_service, same_zone_instance_group)
try:
# A list of tuples (route_rules, expected_instances).
test_cases = [
(
[{
'priority': 0,
# FullPath EmptyCall -> alternate_backend_service.
'matchRules': [{
'fullPathMatch': '/grpc.testing.TestService/EmptyCall'
}],
'service': alternate_backend_service.url
}],
{
"EmptyCall": alternate_backend_instances,
"UnaryCall": original_backend_instances
}),
(
[{
'priority': 0,
# Prefix UnaryCall -> alternate_backend_service.
'matchRules': [{
'prefixMatch': '/grpc.testing.TestService/Unary'
}],
'service': alternate_backend_service.url
}],
{
"UnaryCall": alternate_backend_instances,
"EmptyCall": original_backend_instances
})
]
for (route_rules, expected_instances) in test_cases:
logger.info('patching url map with %s -> alternative',
route_rules[0]['matchRules'])
patch_url_map_backend_service(gcp,
original_backend_service,
route_rules=route_rules)
# Wait for traffic to go to both services.
logger.info(
'waiting for traffic to go to all backends (including alternate)'
)
wait_until_all_rpcs_go_to_given_backends(
original_backend_instances + alternate_backend_instances,
_WAIT_FOR_STATS_SEC)
retry_count = 10
# Each attempt takes about 10 seconds, 10 retries is equivalent to 100
# seconds timeout.
for i in range(retry_count):
stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
if not stats.rpcs_by_method:
raise ValueError(
'stats.rpcs_by_method is None, the interop client stats service does not support this test case'
)
logger.info('attempt %d', i)
if compare_expected_instances(stats, expected_instances):
logger.info("success")
break
finally:
patch_url_map_backend_service(gcp, original_backend_service)
patch_backend_instances(gcp, alternate_backend_service, [])
set_validate_for_proxyless(gcp, True)
def test_header_matching(gcp, original_backend_service, instance_group,
alternate_backend_service, same_zone_instance_group):
# This test start with all traffic (UnaryCall and EmptyCall) going to
# original_backend_service.
#
# Then it updates URL-map to add routes, to make RPCs with test headers to
# go to different backends. It waits for all backends in both services to
# receive traffic, then verifies that traffic goes to the expected
# backends.
logger.info('Running test_header_matching')
original_backend_instances, alternate_backend_instances = prepare_services_for_urlmap_tests(
gcp, original_backend_service, instance_group,
alternate_backend_service, same_zone_instance_group)
try:
# A list of tuples (route_rules, expected_instances).
test_cases = [(
[{
'priority': 0,
# Header ExactMatch -> alternate_backend_service.
# EmptyCall is sent with the metadata.
'matchRules': [{
'prefixMatch':
'/',
'headerMatches': [{
'headerName': _TEST_METADATA_KEY,
'exactMatch': _TEST_METADATA_VALUE
}]
}],
'service': alternate_backend_service.url
}],
{
"EmptyCall": alternate_backend_instances,
"UnaryCall": original_backend_instances
})]
for (route_rules, expected_instances) in test_cases:
logger.info('patching url map with %s -> alternative',
route_rules[0]['matchRules'])
patch_url_map_backend_service(gcp,
original_backend_service,
route_rules=route_rules)
# Wait for traffic to go to both services.
logger.info(
'waiting for traffic to go to all backends (including alternate)'
)
wait_until_all_rpcs_go_to_given_backends(
original_backend_instances + alternate_backend_instances,
_WAIT_FOR_STATS_SEC)
retry_count = 10
# Each attempt takes about 10 seconds, 10 retries is equivalent to 100
# seconds timeout.
for i in range(retry_count):
stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
if not stats.rpcs_by_method:
raise ValueError(
'stats.rpcs_by_method is None, the interop client stats service does not support this test case'
)
logger.info('attempt %d', i)
if compare_expected_instances(stats, expected_instances):
logger.info("success")
break
finally:
patch_url_map_backend_service(gcp, original_backend_service)
patch_backend_instances(gcp, alternate_backend_service, [])
set_validate_for_proxyless(gcp, True)
def set_serving_status(instances, service_port, serving): def set_serving_status(instances, service_port, serving):
for instance in instances: for instance in instances:
with grpc.insecure_channel('%s:%d' % with grpc.insecure_channel('%s:%d' %
@ -1208,7 +1416,8 @@ def resize_instance_group(gcp,
def patch_url_map_backend_service(gcp, def patch_url_map_backend_service(gcp,
backend_service=None, backend_service=None,
services_with_weights=None): services_with_weights=None,
route_rules=None):
'''change url_map's backend service '''change url_map's backend service
Only one of backend_service and service_with_weights can be not None. Only one of backend_service and service_with_weights can be not None.
@ -1230,6 +1439,7 @@ def patch_url_map_backend_service(gcp,
'name': _PATH_MATCHER_NAME, 'name': _PATH_MATCHER_NAME,
'defaultService': default_service, 'defaultService': default_service,
'defaultRouteAction': default_route_action, 'defaultRouteAction': default_route_action,
'routeRules': route_rules,
}] }]
} }
logger.debug('Sending GCP request with body=%s', config) logger.debug('Sending GCP request with body=%s', config)
@ -1504,22 +1714,41 @@ try:
test_log_filename = os.path.join(log_dir, _SPONGE_LOG_NAME) test_log_filename = os.path.join(log_dir, _SPONGE_LOG_NAME)
test_log_file = open(test_log_filename, 'w+') test_log_file = open(test_log_filename, 'w+')
client_process = None client_process = None
if test_case in _TESTS_TO_RUN_MULTIPLE_RPCS:
rpcs_to_send = '--rpc="UnaryCall,EmptyCall"'
else:
rpcs_to_send = '--rpc="UnaryCall"'
if test_case in _TESTS_TO_SEND_METADATA:
metadata_to_send = '--metadata="EmptyCall:{key}:{value}"'.format(
key=_TEST_METADATA_KEY, value=_TEST_METADATA_VALUE)
else:
metadata_to_send = '--metadata=""'
if test_case in _TESTS_TO_FAIL_ON_RPC_FAILURE: if test_case in _TESTS_TO_FAIL_ON_RPC_FAILURE:
wait_for_config_propagation( wait_for_config_propagation(
gcp, instance_group, gcp, instance_group,
args.client_cmd.format(server_uri=server_uri, args.client_cmd.format(server_uri=server_uri,
stats_port=args.stats_port, stats_port=args.stats_port,
qps=args.qps, qps=args.qps,
fail_on_failed_rpc=False), fail_on_failed_rpc=False,
rpcs_to_send=rpcs_to_send,
metadata_to_send=metadata_to_send),
client_env) client_env)
fail_on_failed_rpc = '--fail_on_failed_rpc=true' fail_on_failed_rpc = '--fail_on_failed_rpc=true'
else: else:
fail_on_failed_rpc = '--fail_on_failed_rpc=false' fail_on_failed_rpc = '--fail_on_failed_rpc=false'
client_cmd = shlex.split(
args.client_cmd.format(server_uri=server_uri, client_cmd_formatted = args.client_cmd.format(
stats_port=args.stats_port, server_uri=server_uri,
qps=args.qps, stats_port=args.stats_port,
fail_on_failed_rpc=fail_on_failed_rpc)) qps=args.qps,
fail_on_failed_rpc=fail_on_failed_rpc,
rpcs_to_send=rpcs_to_send,
metadata_to_send=metadata_to_send)
logger.debug('running client: %s', client_cmd_formatted)
client_cmd = shlex.split(client_cmd_formatted)
try: try:
client_process = subprocess.Popen(client_cmd, client_process = subprocess.Popen(client_cmd,
env=client_env, env=client_env,
@ -1559,6 +1788,14 @@ try:
test_traffic_splitting(gcp, backend_service, instance_group, test_traffic_splitting(gcp, backend_service, instance_group,
alternate_backend_service, alternate_backend_service,
same_zone_instance_group) same_zone_instance_group)
elif test_case == 'path_matching':
test_path_matching(gcp, backend_service, instance_group,
alternate_backend_service,
same_zone_instance_group)
elif test_case == 'header_matching':
test_header_matching(gcp, backend_service, instance_group,
alternate_backend_service,
same_zone_instance_group)
else: else:
logger.error('Unknown test case: %s', test_case) logger.error('Unknown test case: %s', test_case)
sys.exit(1) sys.exit(1)

Loading…
Cancel
Save