Add xDS TD API interop tests: metadata_filter, forwarding_rule_port_match, api_listener (#26065)

yifeizhuang authored 4 years ago, committed by GitHub
parent 4c40ee3f78
commit 68aed165a7
1 changed file with 424 additions: tools/run_tests/run_xds_tests.py

@@ -84,6 +84,10 @@ _TEST_CASES = [
'traffic_splitting',
'path_matching',
'header_matching',
'api_listener',
'forwarding_rule_port_match',
'forwarding_rule_default_port',
'metadata_filter',
]
# Valid test cases, but not in all. So the tests can only run manually, and
@@ -214,8 +218,9 @@ argp.add_argument(
argp.add_argument('--network',
default='global/networks/default',
help='GCP network to use')
_DEFAULT_PORT_RANGE = '8080:8280'
argp.add_argument('--service_port_range',
default='8080:8280',
default=_DEFAULT_PORT_RANGE,
type=parse_port_range,
help='Listening port for created gRPC backends. Specified as '
'either a single int or as a range in the format min:max, in '
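For reference, parse_port_range is defined elsewhere in this script and turns the flag value into the list of candidate ports. A minimal sketch of the behavior these call sites rely on (an approximation; the real helper may differ in validation):

    # Approximate sketch of parse_port_range; the actual helper lives
    # elsewhere in run_xds_tests.py.
    def parse_port_range(port_arg):
        # Accepts a single port ('8080') or an inclusive range ('8080:8280')
        # and returns the candidate ports to try.
        if ':' in port_arg:
            begin, end = port_arg.split(':')
            return list(range(int(begin), int(end) + 1))
        return [int(port_arg)]

    assert parse_port_range('8080:8082') == [8080, 8081, 8082]
    assert parse_port_range('8080') == [8080]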
@@ -479,6 +484,21 @@ def wait_until_all_rpcs_go_to_given_backends(backends,
allow_failures=False)
def wait_until_no_rpcs_go_to_given_backends(backends, timeout_sec):
start_time = time.time()
while time.time() - start_time <= timeout_sec:
stats = get_client_stats(_NUM_TEST_RPCS, timeout_sec)
error_msg = None
rpcs_by_peer = stats.rpcs_by_peer
for backend in backends:
if backend in rpcs_by_peer:
error_msg = 'Unexpected backend %s receives load' % backend
break
if not error_msg:
return
raise Exception('Unexpected RPCs going to given backends')
def wait_until_rpcs_in_flight(rpc_type, timeout_sec, num_rpcs, threshold):
'''Block until the test client reaches the state with the given number
of RPCs being outstanding stably.
@@ -951,6 +971,303 @@ def prepare_services_for_urlmap_tests(gcp, original_backend_service,
return original_backend_instances, alternate_backend_instances
def test_metadata_filter(gcp, original_backend_service, instance_group,
alternate_backend_service, same_zone_instance_group):
logger.info("Running test_metadata_filter")
wait_for_healthy_backends(gcp, original_backend_service, instance_group)
original_backend_instances = get_instance_names(gcp, instance_group)
alternate_backend_instances = get_instance_names(gcp,
same_zone_instance_group)
patch_backend_service(gcp, alternate_backend_service,
[same_zone_instance_group])
wait_for_healthy_backends(gcp, alternate_backend_service,
same_zone_instance_group)
try:
with open(bootstrap_path) as f:
md = json.load(f)['node']['metadata']
match_labels = []
for k, v in md.items():
match_labels.append({'name': k, 'value': v})
not_match_labels = [{'name': 'fake', 'value': 'fail'}]
test_route_rules = [
# test MATCH_ALL
[
{
'priority': 0,
'matchRules': [{
'prefixMatch':
'/',
'metadataFilters': [{
'filterMatchCriteria': 'MATCH_ALL',
'filterLabels': not_match_labels
}]
}],
'service': original_backend_service.url
},
{
'priority': 1,
'matchRules': [{
'prefixMatch':
'/',
'metadataFilters': [{
'filterMatchCriteria': 'MATCH_ALL',
'filterLabels': match_labels
}]
}],
'service': alternate_backend_service.url
},
],
# test mixing MATCH_ALL and MATCH_ANY
# test MATCH_ALL: superset labels won't match
[
{
'priority': 0,
'matchRules': [{
'prefixMatch':
'/',
'metadataFilters': [{
'filterMatchCriteria': 'MATCH_ALL',
'filterLabels': not_match_labels + match_labels
}]
}],
'service': original_backend_service.url
},
{
'priority': 1,
'matchRules': [{
'prefixMatch':
'/',
'metadataFilters': [{
'filterMatchCriteria': 'MATCH_ANY',
'filterLabels': not_match_labels + match_labels
}]
}],
'service': alternate_backend_service.url
},
],
# test MATCH_ANY
[
{
'priority': 0,
'matchRules': [{
'prefixMatch':
'/',
'metadataFilters': [{
'filterMatchCriteria': 'MATCH_ANY',
'filterLabels': not_match_labels
}]
}],
'service': original_backend_service.url
},
{
'priority': 1,
'matchRules': [{
'prefixMatch':
'/',
'metadataFilters': [{
'filterMatchCriteria': 'MATCH_ANY',
'filterLabels': not_match_labels + match_labels
}]
}],
'service': alternate_backend_service.url
},
],
# test match multiple route rules
[
{
'priority': 0,
'matchRules': [{
'prefixMatch':
'/',
'metadataFilters': [{
'filterMatchCriteria': 'MATCH_ANY',
'filterLabels': match_labels
}]
}],
'service': alternate_backend_service.url
},
{
'priority': 1,
'matchRules': [{
'prefixMatch':
'/',
'metadataFilters': [{
'filterMatchCriteria': 'MATCH_ALL',
'filterLabels': match_labels
}]
}],
'service': original_backend_service.url
},
]
]
for route_rules in test_route_rules:
wait_until_all_rpcs_go_to_given_backends(original_backend_instances,
_WAIT_FOR_STATS_SEC)
patch_url_map_backend_service(gcp,
original_backend_service,
route_rules=route_rules)
wait_until_no_rpcs_go_to_given_backends(original_backend_instances,
_WAIT_FOR_STATS_SEC)
wait_until_all_rpcs_go_to_given_backends(
alternate_backend_instances, _WAIT_FOR_STATS_SEC)
patch_url_map_backend_service(gcp, original_backend_service)
finally:
patch_backend_service(gcp, alternate_backend_service, [])
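For context, the match labels mirror the client's xDS bootstrap: the test reads the node metadata that Traffic Director already associates with this client and turns it into filterLabels, so MATCH_ALL against those labels selects exactly this client. A stand-alone illustration with hypothetical bootstrap content (the real file is read from bootstrap_path):

    import json

    # Hypothetical bootstrap content; the real file comes from bootstrap_path.
    bootstrap_json = '''
    {
      "node": {
        "metadata": {
          "TRAFFICDIRECTOR_NETWORK_NAME": "default",
          "app": "xds-interop-client"
        }
      }
    }
    '''
    md = json.loads(bootstrap_json)['node']['metadata']
    match_labels = [{'name': k, 'value': v} for k, v in md.items()]
    assert {'name': 'app', 'value': 'xds-interop-client'} in match_labels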
def test_api_listener(gcp, backend_service, instance_group,
alternate_backend_service):
logger.info("Running api_listener")
try:
wait_for_healthy_backends(gcp, backend_service, instance_group)
backend_instances = get_instance_names(gcp, instance_group)
wait_until_all_rpcs_go_to_given_backends(backend_instances,
_WAIT_FOR_STATS_SEC)
# Create a second suite of url-map + target-proxy + forwarding-rule with
# the same host name in the host rule. Proxyless validation must be
# disabled because it requires a `0.0.0.0` IP address in the forwarding
# rule, and this test intentionally violates ip:port uniqueness.
# See https://github.com/grpc/grpc-java/issues/8009
new_config_suffix = '2'
create_url_map(gcp, url_map_name + new_config_suffix, backend_service,
service_host_name)
create_target_proxy(gcp, target_proxy_name + new_config_suffix, False)
if not gcp.service_port:
raise Exception(
'Failed to find a valid port for the forwarding rule')
potential_ip_addresses = []
max_attempts = 10
for i in range(max_attempts):
potential_ip_addresses.append('10.10.10.%d' %
(random.randint(0, 255)))
create_global_forwarding_rule(gcp,
forwarding_rule_name + new_config_suffix,
[gcp.service_port],
potential_ip_addresses)
if gcp.service_port != _DEFAULT_SERVICE_PORT:
patch_url_map_host_rule_with_port(gcp,
url_map_name + new_config_suffix,
backend_service,
service_host_name)
wait_until_all_rpcs_go_to_given_backends(backend_instances,
_WAIT_FOR_STATS_SEC)
delete_global_forwarding_rule(gcp, forwarding_rule_name)
delete_target_proxy(gcp, target_proxy_name)
delete_url_map(gcp, url_map_name)
verify_attempts = int(_WAIT_FOR_URL_MAP_PATCH_SEC / _NUM_TEST_RPCS *
args.qps)
for i in range(verify_attempts):
wait_until_all_rpcs_go_to_given_backends(backend_instances,
_WAIT_FOR_STATS_SEC)
# delete host rule for the original host name
patch_url_map_backend_service(gcp, alternate_backend_service)
wait_until_no_rpcs_go_to_given_backends(backend_instances,
_WAIT_FOR_STATS_SEC)
finally:
delete_global_forwarding_rule(gcp,
forwarding_rule_name + new_config_suffix)
delete_target_proxy(gcp, target_proxy_name + new_config_suffix)
delete_url_map(gcp, url_map_name + new_config_suffix)
create_url_map(gcp, url_map_name, backend_service, service_host_name)
create_target_proxy(gcp, target_proxy_name)
create_global_forwarding_rule(gcp, forwarding_rule_name,
potential_service_ports)
if gcp.service_port != _DEFAULT_SERVICE_PORT:
patch_url_map_host_rule_with_port(gcp, url_map_name,
backend_service,
service_host_name)
server_uri = service_host_name + ':' + str(gcp.service_port)
else:
server_uri = service_host_name
return server_uri
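A note on the verification loop above: each wait_until_all_rpcs_go_to_given_backends round samples _NUM_TEST_RPCS RPCs, which takes roughly _NUM_TEST_RPCS / qps seconds, so verify_attempts rounds cover about _WAIT_FOR_URL_MAP_PATCH_SEC of wall time while the url-map deletion propagates. Worked arithmetic with hypothetical values (the real constants are defined elsewhere in the script):

    # Hypothetical values; the real constants are defined near the top of
    # run_xds_tests.py.
    _WAIT_FOR_URL_MAP_PATCH_SEC = 300
    _NUM_TEST_RPCS = 100
    qps = 100  # stands in for args.qps

    # One stats sample covers _NUM_TEST_RPCS / qps = 1 second of traffic,
    # so 300 attempts span the full 300-second propagation window.
    verify_attempts = int(_WAIT_FOR_URL_MAP_PATCH_SEC / _NUM_TEST_RPCS * qps)
    assert verify_attempts == 300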
def test_forwarding_rule_port_match(gcp, backend_service, instance_group):
logger.info("Running test_forwarding_rule_port_match")
try:
wait_for_healthy_backends(gcp, backend_service, instance_group)
backend_instances = get_instance_names(gcp, instance_group)
wait_until_all_rpcs_go_to_given_backends(backend_instances,
_WAIT_FOR_STATS_SEC)
delete_global_forwarding_rule(gcp)
create_global_forwarding_rule(gcp, forwarding_rule_name, [
x for x in parse_port_range(_DEFAULT_PORT_RANGE)
if x != gcp.service_port
])
wait_until_no_rpcs_go_to_given_backends(backend_instances,
_WAIT_FOR_STATS_SEC)
finally:
delete_global_forwarding_rule(gcp)
create_global_forwarding_rule(gcp, forwarding_rule_name,
potential_service_ports)
if gcp.service_port != _DEFAULT_SERVICE_PORT:
patch_url_map_host_rule_with_port(gcp, url_map_name,
backend_service,
service_host_name)
server_uri = service_host_name + ':' + str(gcp.service_port)
else:
server_uri = service_host_name
return server_uri
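The mismatch step above moves the forwarding rule to any port in the default range other than the one the client was configured against, so traffic should stop reaching the backends. A concrete illustration of the exclusion (8080 stands in for gcp.service_port):

    # 8080 stands in for gcp.service_port; the range mirrors
    # _DEFAULT_PORT_RANGE = '8080:8280'.
    service_port = 8080
    candidate_ports = [p for p in range(8080, 8281) if p != service_port]
    assert service_port not in candidate_ports
    assert candidate_ports[0] == 8081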
def test_forwarding_rule_default_port(gcp, backend_service, instance_group):
logger.info("Running test_forwarding_rule_default_port")
try:
wait_for_healthy_backends(gcp, backend_service, instance_group)
backend_instances = get_instance_names(gcp, instance_group)
if gcp.service_port == _DEFAULT_SERVICE_PORT:
wait_until_all_rpcs_go_to_given_backends(backend_instances,
_WAIT_FOR_STATS_SEC)
delete_global_forwarding_rule(gcp)
create_global_forwarding_rule(gcp, forwarding_rule_name,
parse_port_range(_DEFAULT_PORT_RANGE))
patch_url_map_host_rule_with_port(gcp, url_map_name,
backend_service,
service_host_name)
wait_until_no_rpcs_go_to_given_backends(backend_instances,
_WAIT_FOR_STATS_SEC)
# Expect success when there is no port in the client request URI and no port in the url-map.
delete_global_forwarding_rule(gcp)
delete_target_proxy(gcp)
delete_url_map(gcp)
create_url_map(gcp, url_map_name, backend_service, service_host_name)
create_target_proxy(gcp, gcp.target_proxy.name, False)
potential_ip_addresses = []
max_attempts = 10
for i in range(max_attempts):
potential_ip_addresses.append('10.10.10.%d' %
(random.randint(0, 255)))
create_global_forwarding_rule(gcp, forwarding_rule_name, [80],
potential_ip_addresses)
wait_until_all_rpcs_go_to_given_backends(backend_instances,
_WAIT_FOR_STATS_SEC)
# Expect failure when there is no port in the client request URI but the url-map specifies a port.
patch_url_map_host_rule_with_port(gcp, url_map_name, backend_service,
service_host_name)
wait_until_no_rpcs_go_to_given_backends(backend_instances,
_WAIT_FOR_STATS_SEC)
finally:
delete_global_forwarding_rule(gcp)
delete_target_proxy(gcp)
delete_url_map(gcp)
create_url_map(gcp, url_map_name, backend_service, service_host_name)
create_target_proxy(gcp, target_proxy_name)
create_global_forwarding_rule(gcp, forwarding_rule_name,
potential_service_ports)
if gcp.service_port != _DEFAULT_SERVICE_PORT:
patch_url_map_host_rule_with_port(gcp, url_map_name,
backend_service,
service_host_name)
server_uri = service_host_name + ':' + str(gcp.service_port)
else:
server_uri = service_host_name
return server_uri
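All three new tests share this epilogue: restore the original url-map, target proxy, and forwarding rule, then return the server URI the remaining cases should dial. A small illustration of the convention (hypothetical values; _DEFAULT_SERVICE_PORT is defined elsewhere in the script):

    # Hypothetical values; _DEFAULT_SERVICE_PORT is defined elsewhere in
    # the script.
    _DEFAULT_SERVICE_PORT = 80
    service_host_name = 'grpc-test-server'

    def server_uri_for(service_port):
        # Append the port only when it differs from the default, matching
        # the epilogue of the new test cases.
        if service_port != _DEFAULT_SERVICE_PORT:
            return service_host_name + ':' + str(service_port)
        return service_host_name

    assert server_uri_for(8081) == 'grpc-test-server:8081'
    assert server_uri_for(80) == 'grpc-test-server'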
def test_traffic_splitting(gcp, original_backend_service, instance_group,
alternate_backend_service, same_zone_instance_group):
# This test starts with all traffic going to original_backend_service. Then
@@ -2179,34 +2496,39 @@ def create_target_proxy(gcp, name, validate_for_proxyless=True):
gcp.target_proxy = GcpResource(config['name'], result['targetLink'])
def create_global_forwarding_rule(gcp, name, potential_ports):
def create_global_forwarding_rule(gcp,
name,
potential_ports,
potential_ip_addresses=['0.0.0.0']):
if gcp.alpha_compute:
compute_to_use = gcp.alpha_compute
else:
compute_to_use = gcp.compute
for port in potential_ports:
try:
config = {
'name': name,
'loadBalancingScheme': 'INTERNAL_SELF_MANAGED',
'portRange': str(port),
'IPAddress': '0.0.0.0',
'network': args.network,
'target': gcp.target_proxy.url,
}
logger.debug('Sending GCP request with body=%s', config)
result = compute_to_use.globalForwardingRules().insert(
project=gcp.project,
body=config).execute(num_retries=_GCP_API_RETRIES)
wait_for_global_operation(gcp, result['name'])
gcp.global_forwarding_rule = GcpResource(config['name'],
result['targetLink'])
gcp.service_port = port
return
except googleapiclient.errors.HttpError as http_error:
logger.warning(
'Got error %s when attempting to create forwarding rule to '
'0.0.0.0:%d. Retrying with another port.' % (http_error, port))
for ip_address in potential_ip_addresses:
try:
config = {
'name': name,
'loadBalancingScheme': 'INTERNAL_SELF_MANAGED',
'portRange': str(port),
'IPAddress': ip_address,
'network': args.network,
'target': gcp.target_proxy.url,
}
logger.debug('Sending GCP request with body=%s', config)
result = compute_to_use.globalForwardingRules().insert(
project=gcp.project,
body=config).execute(num_retries=_GCP_API_RETRIES)
wait_for_global_operation(gcp, result['name'])
gcp.global_forwarding_rule = GcpResource(
config['name'], result['targetLink'])
gcp.service_port = port
return
except googleapiclient.errors.HttpError as http_error:
logger.warning(
'Got error %s when attempting to create forwarding rule to '
'%s:%d. Retrying with another port.' %
(http_error, ip_address, port))
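The reworked helper keeps existing call sites working (the default argument tries only 0.0.0.0) while letting api_listener and forwarding_rule_default_port supply candidate IPs. A minimal sketch of the resulting try-each-(port, ip)-until-success pattern, with the GCP insert stubbed out (RuntimeError stands in for googleapiclient's HttpError):

    import itertools

    def create_first_available(ports, ips, insert):
        # Mirrors the nested loops above: try each (port, ip) pair in
        # order; 'insert' raises on conflict.
        for port, ip in itertools.product(ports, ips):
            try:
                insert(port, ip)
                return port, ip
            except RuntimeError:
                continue
        return None  # every combination failed

    # Stub that rejects the first pair, as an already-taken ip:port would.
    attempts = []
    def fake_insert(port, ip):
        attempts.append((port, ip))
        if len(attempts) == 1:
            raise RuntimeError('ip:port already in use')

    assert create_first_available([8080, 8081], ['0.0.0.0'],
                                  fake_insert) == (8081, '0.0.0.0')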
def get_health_check(gcp, health_check_name):
@@ -2270,39 +2592,49 @@ def get_instance_group(gcp, zone, instance_group_name):
return instance_group
def delete_global_forwarding_rule(gcp):
def delete_global_forwarding_rule(gcp, name=None):
if name:
forwarding_rule_to_delete = name
else:
forwarding_rule_to_delete = gcp.global_forwarding_rule.name
try:
result = gcp.compute.globalForwardingRules().delete(
project=gcp.project,
forwardingRule=gcp.global_forwarding_rule.name).execute(
forwardingRule=forwarding_rule_to_delete).execute(
num_retries=_GCP_API_RETRIES)
wait_for_global_operation(gcp, result['name'])
except googleapiclient.errors.HttpError as http_error:
logger.info('Delete failed: %s', http_error)
def delete_target_proxy(gcp):
def delete_target_proxy(gcp, name=None):
if name:
proxy_to_delete = name
else:
proxy_to_delete = gcp.target_proxy.name
try:
if gcp.alpha_compute:
result = gcp.alpha_compute.targetGrpcProxies().delete(
project=gcp.project,
targetGrpcProxy=gcp.target_proxy.name).execute(
project=gcp.project, targetGrpcProxy=proxy_to_delete).execute(
num_retries=_GCP_API_RETRIES)
else:
result = gcp.compute.targetHttpProxies().delete(
project=gcp.project,
targetHttpProxy=gcp.target_proxy.name).execute(
project=gcp.project, targetHttpProxy=proxy_to_delete).execute(
num_retries=_GCP_API_RETRIES)
wait_for_global_operation(gcp, result['name'])
except googleapiclient.errors.HttpError as http_error:
logger.info('Delete failed: %s', http_error)
def delete_url_map(gcp):
def delete_url_map(gcp, name=None):
if name:
url_map_to_delete = name
else:
url_map_to_delete = gcp.url_map.name
try:
result = gcp.compute.urlMaps().delete(
project=gcp.project,
urlMap=gcp.url_map.name).execute(num_retries=_GCP_API_RETRIES)
urlMap=url_map_to_delete).execute(num_retries=_GCP_API_RETRIES)
wait_for_global_operation(gcp, result['name'])
except googleapiclient.errors.HttpError as http_error:
logger.info('Delete failed: %s', http_error)
@@ -2725,6 +3057,7 @@ try:
if original_grpc_verbosity:
client_env['GRPC_VERBOSITY'] = original_grpc_verbosity
bootstrap_server_features = []
if gcp.service_port == _DEFAULT_SERVICE_PORT:
server_uri = service_host_name
else:
@@ -2759,6 +3092,17 @@ try:
logger.info('skipping test %s due to missing alpha support',
test_case)
continue
if test_case in [
'api_listener', 'forwarding_rule_port_match',
'forwarding_rule_default_port'
] and CLIENT_HOSTS:
logger.info(
'skipping test %s because test configuration is '
'not compatible with client processes on existing '
'client hosts', test_case)
continue
if test_case == 'forwarding_rule_default_port':
server_uri = service_host_name
result = jobset.JobResult()
log_dir = os.path.join(_TEST_LOG_BASE_DIR, test_case)
if not os.path.exists(log_dir):
@@ -2867,6 +3211,20 @@ try:
test_timeout(gcp, backend_service, instance_group)
elif test_case == 'fault_injection':
test_fault_injection(gcp, backend_service, instance_group)
elif test_case == 'api_listener':
server_uri = test_api_listener(gcp, backend_service,
instance_group,
alternate_backend_service)
elif test_case == 'forwarding_rule_port_match':
server_uri = test_forwarding_rule_port_match(
gcp, backend_service, instance_group)
elif test_case == 'forwarding_rule_default_port':
server_uri = test_forwarding_rule_default_port(
gcp, backend_service, instance_group)
elif test_case == 'metadata_filter':
test_metadata_filter(gcp, backend_service, instance_group,
alternate_backend_service,
same_zone_instance_group)
elif test_case == 'csds':
test_csds(gcp, backend_service, instance_group, server_uri)
else:
