Merge pull request #23657 from ericgribkoff/backport_new_xds_tests

Backport new xds tests
Eric Gribkoff committed via GitHub (commit 33a80989ad)
4 changed files:
  1. doc/xds-test-descriptions.md (4 lines changed)
  2. test/cpp/interop/xds_interop_client.cc (9 lines changed)
  3. tools/internal_ci/linux/grpc_xds_bazel_test_in_docker.sh (13 lines changed)
  4. tools/run_tests/run_xds_tests.py (301 lines changed)

@@ -27,7 +27,9 @@ Clients should accept these arguments:
* --fail_on_failed_rpcs=BOOL
* If true, the client should exit with a non-zero return code if any RPCs
fail. Default is false.
fail after at least one RPC has succeeded, indicating a valid xDS config
was received. This accounts for any startup-related delays in receiving
an initial config from the load balancer. Default is false.
* --num_channels=CHANNELS
* The number of channels to create to the server.
* --qps=QPS

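A minimal sketch of the flag semantics described above, in Python for illustration only (the client touched by this PR is the C++ one in the next hunks; the names here are not from the PR):

```python
import threading

FAIL_ON_FAILED_RPCS = True             # stand-in for --fail_on_failed_rpcs
one_rpc_succeeded = threading.Event()  # set after the first successful RPC

def on_rpc_finished(ok):
    if ok:
        one_rpc_succeeded.set()
    elif FAIL_ON_FAILED_RPCS and one_rpc_succeeded.is_set():
        # Failures before the first success are tolerated as startup delay in
        # receiving the initial config from the load balancer.
        raise SystemExit('RPC failed after a valid xDS config was received')
```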
@@ -16,6 +16,7 @@
*
*/
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <map>
@@ -41,7 +42,8 @@
#include "test/core/util/test_config.h"
#include "test/cpp/util/test_config.h"
DEFINE_bool(fail_on_failed_rpc, false, "Fail client if any RPCs fail.");
DEFINE_bool(fail_on_failed_rpc, false,
"Fail client if any RPCs fail after first successful RPC.");
DEFINE_int32(num_channels, 1, "Number of channels.");
DEFINE_bool(print_response, false, "Write RPC response to stdout.");
DEFINE_int32(qps, 1, "Qps per channel.");
@@ -80,6 +82,8 @@ int global_request_id;
std::set<XdsStatsWatcher*> watchers;
// Mutex for global_request_id and watchers
std::mutex mu;
// Whether at least one RPC has succeeded, indicating xDS resolution completed.
std::atomic<bool> one_rpc_succeeded(false);
/** Records the remote peer distribution for a given range of RPCs. */
class XdsStatsWatcher {
@@ -223,7 +227,7 @@ class TestClient {
std::cout << "RPC failed: " << call->status.error_code() << ": "
<< call->status.error_message() << std::endl;
}
if (FLAGS_fail_on_failed_rpc) {
if (FLAGS_fail_on_failed_rpc && one_rpc_succeeded.load()) {
abort();
}
} else {
@@ -239,6 +243,7 @@ class TestClient {
std::cout << "Greeting: Hello world, this is " << hostname
<< ", from " << call->context.peer() << std::endl;
}
one_rpc_succeeded = true;
}
delete call;

@@ -48,12 +48,19 @@ touch "$TOOLS_DIR"/src/proto/grpc/testing/__init__.py
bazel build test/cpp/interop:xds_interop_client
# Test cases "path_matching" and "header_matching" are not included in "all",
# because not all interop clients in all languages support these new tests.
#
# TODO: remove "path_matching" and "header_matching" from --test_case after
# they are added into "all".
GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_routing_lb,cds_lb,eds_lb,priority_lb,weighted_target_lb,lrs_lb "$PYTHON" \
tools/run_tests/run_xds_tests.py \
--test_case=all \
--test_case="all,path_matching,header_matching" \
--project_id=grpc-testing \
--source_image=projects/grpc-testing/global/images/xds-test-server \
--source_image=projects/grpc-testing/global/images/xds-test-server-2 \
--path_to_server_binary=/java_server/grpc-java/interop-testing/build/install/grpc-interop-testing/bin/xds-test-server \
--gcp_suffix=$(date '+%s') \
--verbose \
--client_cmd='bazel-bin/test/cpp/interop/xds_interop_client --server=xds:///{server_uri} --stats_port={stats_port} --qps={qps} {fail_on_failed_rpc}'
--client_cmd='bazel-bin/test/cpp/interop/xds_interop_client --server=xds:///{server_uri} --stats_port={stats_port} --qps={qps} {fail_on_failed_rpc} \
{rpcs_to_send} \
{metadata_to_send}'

@@ -56,17 +56,28 @@ _TEST_CASES = [
'secondary_locality_gets_requests_on_primary_failure',
'traffic_splitting',
]
# Valid test cases that are not included in 'all'. They only run when named
# explicitly, and aren't enabled automatically for all languages.
#
# TODO: Move them into _TEST_CASES when support is ready in all languages.
_ADDITIONAL_TEST_CASES = ['path_matching', 'header_matching']
def parse_test_cases(arg):
if arg == 'all':
return _TEST_CASES
if arg == '':
return []
test_cases = arg.split(',')
if all([test_case in _TEST_CASES for test_case in test_cases]):
return test_cases
raise Exception('Failed to parse test cases %s' % arg)
arg_split = arg.split(',')
test_cases = set()
all_test_cases = _TEST_CASES + _ADDITIONAL_TEST_CASES
for token in arg_split:
if token == 'all':
test_cases = test_cases.union(_TEST_CASES)
else:
test_cases = test_cases.union([token])
if not all([test_case in all_test_cases for test_case in test_cases]):
raise Exception('Failed to parse test cases %s' % arg)
# Preserve order.
return [x for x in all_test_cases if x in test_cases]
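To make the 'all' handling and order preservation concrete, here is a standalone sketch of the new parse_test_cases semantics with abridged test-case lists (the real lists are defined earlier in run_xds_tests.py):

```python
# Abridged lists, for illustration only.
_TEST_CASES = ['backends_restart', 'ping_pong', 'round_robin']
_ADDITIONAL_TEST_CASES = ['path_matching', 'header_matching']

def parse_test_cases(arg):
    if arg == '':
        return []
    selected = set()
    for token in arg.split(','):
        selected |= set(_TEST_CASES) if token == 'all' else {token}
    all_test_cases = _TEST_CASES + _ADDITIONAL_TEST_CASES
    if not selected.issubset(all_test_cases):
        raise Exception('Failed to parse test cases %s' % arg)
    return [x for x in all_test_cases if x in selected]  # preserve order

assert parse_test_cases('all') == _TEST_CASES
assert parse_test_cases('all,header_matching') == _TEST_CASES + ['header_matching']
assert parse_test_cases('header_matching,ping_pong') == ['ping_pong', 'header_matching']
```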
def parse_port_range(port_arg):
@@ -89,8 +100,10 @@ argp.add_argument(
'--test_case',
default='ping_pong',
type=parse_test_cases,
help='Comma-separated list of test cases to run, or \'all\' to run every '
'test. Available tests: %s' % ' '.join(_TEST_CASES))
help='Comma-separated list of test cases to run. Available tests: %s, '
'(or \'all\' to run every test). '
'Alternative tests not included in \'all\': %s' %
(','.join(_TEST_CASES), ','.join(_ADDITIONAL_TEST_CASES)))
argp.add_argument(
'--bootstrap_file',
default='',
@@ -237,6 +250,12 @@ _BOOTSTRAP_TEMPLATE = """
_TESTS_TO_FAIL_ON_RPC_FAILURE = [
'new_instance_group_receives_traffic', 'ping_pong', 'round_robin'
]
# Tests that run UnaryCall and EmptyCall.
_TESTS_TO_RUN_MULTIPLE_RPCS = ['path_matching', 'header_matching']
# Tests that make UnaryCall with test metadata.
_TESTS_TO_SEND_METADATA = ['header_matching']
_TEST_METADATA_KEY = 'xds_md'
_TEST_METADATA_VALUE = 'exact_match'
_PATH_MATCHER_NAME = 'path-matcher'
_BASE_TEMPLATE_NAME = 'test-template'
_BASE_INSTANCE_GROUP_NAME = 'test-ig'
@@ -348,6 +367,29 @@ def compare_distributions(actual_distribution, expected_distribution,
return True
def compare_expected_instances(stats, expected_instances):
"""Compare if stats have expected instances for each type of RPC.
Args:
stats: LoadBalancerStatsResponse reported by interop client.
expected_instances: a dict with key as the RPC type (string), value as
the expected backend instances (list of strings).
Returns:
Returns true if the instances are expected. False if not.
"""
for rpc_type, expected_peers in expected_instances.items():
rpcs_by_peer_for_type = stats.rpcs_by_method[rpc_type]
rpcs_by_peer = rpcs_by_peer_for_type.rpcs_by_peer if rpcs_by_peer_for_type else None
logger.debug('rpc: %s, by_peer: %s', rpc_type, rpcs_by_peer)
peers = list(rpcs_by_peer.keys())
if set(peers) != set(expected_peers):
logger.info('unexpected peers for %s, got %s, want %s', rpc_type,
peers, expected_peers)
return False
return True
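A hedged illustration of the shapes compare_expected_instances consumes; the real stats argument is a messages_pb2.LoadBalancerStatsResponse, and the SimpleNamespace stand-ins below exist only for this sketch:

```python
from types import SimpleNamespace

# Stand-in for LoadBalancerStatsResponse: rpcs_by_method maps RPC type to a
# per-peer RPC count map.
stats = SimpleNamespace(rpcs_by_method={
    'UnaryCall': SimpleNamespace(rpcs_by_peer={'backend-a': 48, 'backend-b': 52}),
    'EmptyCall': SimpleNamespace(rpcs_by_peer={'backend-c': 100}),
})
expected_instances = {
    'UnaryCall': ['backend-a', 'backend-b'],
    'EmptyCall': ['backend-c'],
}
# compare_expected_instances(stats, expected_instances) would return True here;
# it would return False if, say, EmptyCall traffic had also reached backend-a.
```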
def test_backends_restart(gcp, backend_service, instance_group):
logger.info('Running test_backends_restart')
instance_names = get_instance_names(gcp, instance_group)
@@ -629,19 +671,20 @@ def test_secondary_locality_gets_requests_on_primary_failure(
patch_backend_instances(gcp, backend_service, [primary_instance_group])
def test_traffic_splitting(gcp, original_backend_service, instance_group,
alternate_backend_service, same_zone_instance_group):
# This test starts with all traffic going to original_backend_service. Then
# it updates URL-map to set default action to traffic splitting between
# original and alternate. It waits for all backends in both services to
# receive traffic, then verifies that weights are expected.
logger.info('Running test_traffic_splitting')
def prepare_services_for_urlmap_tests(gcp, original_backend_service,
instance_group, alternate_backend_service,
same_zone_instance_group):
'''
This function prepares the services for tests that modify URL maps.
Returns:
The original and alternate backend instance names, as two lists of strings.
'''
# The config validation for proxyless doesn't allow setting
# default_route_action. To test traffic splitting, we need to set the
# route action to weighted clusters. Disable
# validate_for_proxyless for this test. This can be removed when
# validation accepts default_route_action.
# default_route_action or route_rules. Disable
# validate_for_proxyless for this test. This can be removed when validation
# accepts default_route_action.
logger.info('disabling validate_for_proxyless in target proxy')
set_validate_for_proxyless(gcp, False)
@@ -665,6 +708,20 @@ def test_traffic_splitting(gcp, original_backend_service, instance_group,
logger.info('waiting for traffic to all go to original backends')
wait_until_all_rpcs_go_to_given_backends(original_backend_instances,
_WAIT_FOR_STATS_SEC)
return original_backend_instances, alternate_backend_instances
def test_traffic_splitting(gcp, original_backend_service, instance_group,
alternate_backend_service, same_zone_instance_group):
# This test starts with all traffic going to original_backend_service. Then
# it updates URL-map to set default action to traffic splitting between
# original and alternate. It waits for all backends in both services to
# receive traffic, then verifies that weights are expected.
logger.info('Running test_traffic_splitting')
original_backend_instances, alternate_backend_instances = prepare_services_for_urlmap_tests(
gcp, original_backend_service, instance_group,
alternate_backend_service, same_zone_instance_group)
try:
# Patch urlmap, change route action to traffic splitting between
@@ -728,6 +785,157 @@ def test_traffic_splitting(gcp, original_backend_service, instance_group,
set_validate_for_proxyless(gcp, True)
def test_path_matching(gcp, original_backend_service, instance_group,
alternate_backend_service, same_zone_instance_group):
# This test starts with all traffic (UnaryCall and EmptyCall) going to
# original_backend_service.
#
# Then it updates the URL map to add routes that send UnaryCall and EmptyCall
# to different backends. It waits for all backends in both services to
# receive traffic, then verifies that traffic goes to the expected
# backends.
logger.info('Running test_path_matching')
original_backend_instances, alternate_backend_instances = prepare_services_for_urlmap_tests(
gcp, original_backend_service, instance_group,
alternate_backend_service, same_zone_instance_group)
try:
# A list of tuples (route_rules, expected_instances).
test_cases = [
(
[{
'priority': 0,
# FullPath EmptyCall -> alternate_backend_service.
'matchRules': [{
'fullPathMatch': '/grpc.testing.TestService/EmptyCall'
}],
'service': alternate_backend_service.url
}],
{
"EmptyCall": alternate_backend_instances,
"UnaryCall": original_backend_instances
}),
(
[{
'priority': 0,
# Prefix UnaryCall -> alternate_backend_service.
'matchRules': [{
'prefixMatch': '/grpc.testing.TestService/Unary'
}],
'service': alternate_backend_service.url
}],
{
"UnaryCall": alternate_backend_instances,
"EmptyCall": original_backend_instances
})
]
for (route_rules, expected_instances) in test_cases:
logger.info('patching url map with %s -> alternative',
route_rules[0]['matchRules'])
patch_url_map_backend_service(gcp,
original_backend_service,
route_rules=route_rules)
# Wait for traffic to go to both services.
logger.info(
'waiting for traffic to go to all backends (including alternate)'
)
wait_until_all_rpcs_go_to_given_backends(
original_backend_instances + alternate_backend_instances,
_WAIT_FOR_STATS_SEC)
retry_count = 10
# Each attempt takes about 10 seconds, so 10 retries amounts to roughly a
# 100-second timeout.
for i in range(retry_count):
stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
if not stats.rpcs_by_method:
raise ValueError(
'stats.rpcs_by_method is None, the interop client stats service does not support this test case'
)
logger.info('attempt %d', i)
if compare_expected_instances(stats, expected_instances):
logger.info("success")
break
finally:
patch_url_map_backend_service(gcp, original_backend_service)
patch_backend_instances(gcp, alternate_backend_service, [])
set_validate_for_proxyless(gcp, True)
def test_header_matching(gcp, original_backend_service, instance_group,
alternate_backend_service, same_zone_instance_group):
# This test starts with all traffic (UnaryCall and EmptyCall) going to
# original_backend_service.
#
# Then it updates the URL map to add routes that send RPCs carrying the test
# header to different backends. It waits for all backends in both services to
# receive traffic, then verifies that traffic goes to the expected
# backends.
logger.info('Running test_header_matching')
original_backend_instances, alternate_backend_instances = prepare_services_for_urlmap_tests(
gcp, original_backend_service, instance_group,
alternate_backend_service, same_zone_instance_group)
try:
# A list of tuples (route_rules, expected_instances).
test_cases = [(
[{
'priority': 0,
# Header ExactMatch -> alternate_backend_service.
# EmptyCall is sent with the metadata.
'matchRules': [{
'prefixMatch':
'/',
'headerMatches': [{
'headerName': _TEST_METADATA_KEY,
'exactMatch': _TEST_METADATA_VALUE
}]
}],
'service': alternate_backend_service.url
}],
{
"EmptyCall": alternate_backend_instances,
"UnaryCall": original_backend_instances
})]
for (route_rules, expected_instances) in test_cases:
logger.info('patching url map with %s -> alternative',
route_rules[0]['matchRules'])
patch_url_map_backend_service(gcp,
original_backend_service,
route_rules=route_rules)
# Wait for traffic to go to both services.
logger.info(
'waiting for traffic to go to all backends (including alternate)'
)
wait_until_all_rpcs_go_to_given_backends(
original_backend_instances + alternate_backend_instances,
_WAIT_FOR_STATS_SEC)
retry_count = 10
# Each attempt takes about 10 seconds, so 10 retries amounts to roughly a
# 100-second timeout.
for i in range(retry_count):
stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
if not stats.rpcs_by_method:
raise ValueError(
'stats.rpcs_by_method is None, the interop client stats service does not support this test case'
)
logger.info('attempt %d', i)
if compare_expected_instances(stats, expected_instances):
logger.info("success")
break
finally:
patch_url_map_backend_service(gcp, original_backend_service)
patch_backend_instances(gcp, alternate_backend_service, [])
set_validate_for_proxyless(gcp, True)
def set_serving_status(instances, service_port, serving):
for instance in instances:
with grpc.insecure_channel('%s:%d' %
@@ -1208,7 +1416,8 @@ def resize_instance_group(gcp,
def patch_url_map_backend_service(gcp,
backend_service=None,
services_with_weights=None):
services_with_weights=None,
route_rules=None):
'''Change the url_map's backend service.
Only one of backend_service, services_with_weights, and route_rules can be
not None.
@@ -1230,6 +1439,7 @@ def patch_url_map_backend_service(gcp,
'name': _PATH_MATCHER_NAME,
'defaultService': default_service,
'defaultRouteAction': default_route_action,
'routeRules': route_rules,
}]
}
logger.debug('Sending GCP request with body=%s', config)
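A hedged usage sketch of the new route_rules parameter, mirroring how the path-matching test above calls it (gcp and the backend service objects come from the test harness; the service URL is a placeholder):

```python
# Route EmptyCall to the alternate backend service; everything else falls back
# to the path matcher's defaultService.
route_rules = [{
    'priority': 0,
    'matchRules': [{'fullPathMatch': '/grpc.testing.TestService/EmptyCall'}],
    'service': 'projects/<project>/global/backendServices/<alternate>',  # placeholder
}]
# patch_url_map_backend_service(gcp, original_backend_service,
#                               route_rules=route_rules)
# Calling patch_url_map_backend_service(gcp, original_backend_service) again,
# as the finally: blocks above do, drops the route rules and restores the
# default URL map.
```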
@@ -1314,15 +1524,6 @@ def wait_for_healthy_backends(gcp,
(timeout_sec, result))
def wait_for_config_propagation(gcp, instance_group, client_cmd, client_env):
"""Use client to verify config propagation from GCP->TD->client"""
instance_names = get_instance_names(gcp, instance_group)
client_process = subprocess.Popen(shlex.split(client_cmd), env=client_env)
wait_until_all_rpcs_go_to_given_backends(instance_names,
_WAIT_FOR_VALID_CONFIG_SEC)
client_process.terminate()
def get_instance_names(gcp, instance_group):
instance_names = []
result = gcp.compute.instanceGroups().listInstances(
@@ -1504,22 +1705,32 @@ try:
test_log_filename = os.path.join(log_dir, _SPONGE_LOG_NAME)
test_log_file = open(test_log_filename, 'w+')
client_process = None
if test_case in _TESTS_TO_RUN_MULTIPLE_RPCS:
rpcs_to_send = '--rpc="UnaryCall,EmptyCall"'
else:
rpcs_to_send = '--rpc="UnaryCall"'
if test_case in _TESTS_TO_SEND_METADATA:
metadata_to_send = '--metadata="EmptyCall:{key}:{value}"'.format(
key=_TEST_METADATA_KEY, value=_TEST_METADATA_VALUE)
else:
metadata_to_send = '--metadata=""'
if test_case in _TESTS_TO_FAIL_ON_RPC_FAILURE:
wait_for_config_propagation(
gcp, instance_group,
args.client_cmd.format(server_uri=server_uri,
stats_port=args.stats_port,
qps=args.qps,
fail_on_failed_rpc=False),
client_env)
fail_on_failed_rpc = '--fail_on_failed_rpc=true'
else:
fail_on_failed_rpc = '--fail_on_failed_rpc=false'
client_cmd = shlex.split(
args.client_cmd.format(server_uri=server_uri,
stats_port=args.stats_port,
qps=args.qps,
fail_on_failed_rpc=fail_on_failed_rpc))
client_cmd_formatted = args.client_cmd.format(
server_uri=server_uri,
stats_port=args.stats_port,
qps=args.qps,
fail_on_failed_rpc=fail_on_failed_rpc,
rpcs_to_send=rpcs_to_send,
metadata_to_send=metadata_to_send)
logger.debug('running client: %s', client_cmd_formatted)
client_cmd = shlex.split(client_cmd_formatted)
try:
client_process = subprocess.Popen(client_cmd,
env=client_env,
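For reference, a sketch of how the --client_cmd template from the shell script gets formatted for a header_matching run; the server URI, ports, and QPS values below are placeholders:

```python
template = ('bazel-bin/test/cpp/interop/xds_interop_client '
            '--server=xds:///{server_uri} --stats_port={stats_port} '
            '--qps={qps} {fail_on_failed_rpc} {rpcs_to_send} {metadata_to_send}')
print(template.format(
    server_uri='xds-test-server:8080',   # placeholder
    stats_port=8079,                     # placeholder
    qps=100,                             # placeholder
    # header_matching is not in _TESTS_TO_FAIL_ON_RPC_FAILURE:
    fail_on_failed_rpc='--fail_on_failed_rpc=false',
    # header_matching is in _TESTS_TO_RUN_MULTIPLE_RPCS and _TESTS_TO_SEND_METADATA:
    rpcs_to_send='--rpc="UnaryCall,EmptyCall"',
    metadata_to_send='--metadata="EmptyCall:xds_md:exact_match"'))
```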
@@ -1559,6 +1770,14 @@ try:
test_traffic_splitting(gcp, backend_service, instance_group,
alternate_backend_service,
same_zone_instance_group)
elif test_case == 'path_matching':
test_path_matching(gcp, backend_service, instance_group,
alternate_backend_service,
same_zone_instance_group)
elif test_case == 'header_matching':
test_header_matching(gcp, backend_service, instance_group,
alternate_backend_service,
same_zone_instance_group)
else:
logger.error('Unknown test case: %s', test_case)
sys.exit(1)
