[PSM interop] Introduce isort/pylint to the GKE framework (#29476)

* [PSM interop] Introduce isort/pylint to the GKE framework

* Update the rpc_types

* Update variable type annotation style in tests/*.py
pull/29538/head^2
Lidi Zheng 3 years ago committed by GitHub
parent ad3f591af3
commit 697c438df6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      .pylintrc
  2. 2
      .pylintrc-examples
  3. 2
      .pylintrc-tests
  4. 1
      tools/distrib/isort_code.sh
  5. 3
      tools/distrib/pylint_code.sh
  6. 1
      tools/run_tests/xds_k8s_test_driver/bin/run_channelz.py
  7. 2
      tools/run_tests/xds_k8s_test_driver/bin/run_td_setup.py
  8. 4
      tools/run_tests/xds_k8s_test_driver/framework/helpers/datetime.py
  9. 2
      tools/run_tests/xds_k8s_test_driver/framework/helpers/retryers.py
  10. 8
      tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/api.py
  11. 49
      tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/compute.py
  12. 15
      tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/iam.py
  13. 5
      tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/network_security.py
  14. 45
      tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/network_services.py
  15. 7
      tools/run_tests/xds_k8s_test_driver/framework/infrastructure/k8s.py
  16. 15
      tools/run_tests/xds_k8s_test_driver/framework/infrastructure/traffic_director.py
  17. 4
      tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc.py
  18. 13
      tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc_csds.py
  19. 3
      tools/run_tests/xds_k8s_test_driver/framework/test_app/base_runner.py
  20. 8
      tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py
  21. 9
      tools/run_tests/xds_k8s_test_driver/framework/test_app/server_app.py
  22. 26
      tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py
  23. 7
      tools/run_tests/xds_k8s_test_driver/framework/xds_url_map_test_resources.py
  24. 35
      tools/run_tests/xds_k8s_test_driver/framework/xds_url_map_testcase.py
  25. 41
      tools/run_tests/xds_k8s_test_driver/tests/affinity_test.py
  26. 12
      tools/run_tests/xds_k8s_test_driver/tests/api_listener_test.py
  27. 9
      tools/run_tests/xds_k8s_test_driver/tests/app_net_test.py
  28. 6
      tools/run_tests/xds_k8s_test_driver/tests/baseline_test.py
  29. 22
      tools/run_tests/xds_k8s_test_driver/tests/change_backend_service_test.py
  30. 41
      tools/run_tests/xds_k8s_test_driver/tests/failover_test.py
  31. 25
      tools/run_tests/xds_k8s_test_driver/tests/remove_neg_test.py
  32. 20
      tools/run_tests/xds_k8s_test_driver/tests/round_robin_test.py
  33. 10
      tools/run_tests/xds_k8s_test_driver/tests/subsetting_test.py
  34. 4
      tools/run_tests/xds_k8s_test_driver/tests/url_map/affinity_test.py
  35. 25
      tools/run_tests/xds_k8s_test_driver/tests/url_map/fault_injection_test.py
  36. 10
      tools/run_tests/xds_k8s_test_driver/tests/url_map/header_matching_test.py
  37. 11
      tools/run_tests/xds_k8s_test_driver/tests/url_map/metadata_filter_test.py
  38. 14
      tools/run_tests/xds_k8s_test_driver/tests/url_map/path_matching_test.py
  39. 25
      tools/run_tests/xds_k8s_test_driver/tests/url_map/retry_test.py
  40. 10
      tools/run_tests/xds_k8s_test_driver/tests/url_map/timeout_test.py

@ -97,5 +97,7 @@ disable=
useless-object-inheritance, useless-object-inheritance,
# NOTE(lidiz): the import order will be enforced by isort instead # NOTE(lidiz): the import order will be enforced by isort instead
wrong-import-order, wrong-import-order,
# TODO(https://github.com/PyCQA/pylint/issues/3882): Upgrade Pylint
unsubscriptable-object,
# NOTE(sergiitk): yapf compatibility, ref #25071 # NOTE(sergiitk): yapf compatibility, ref #25071
bad-continuation, bad-continuation,

@ -100,5 +100,7 @@ disable=
useless-object-inheritance, useless-object-inheritance,
# NOTE(lidiz): the import order will be enforced by isort instead # NOTE(lidiz): the import order will be enforced by isort instead
wrong-import-order, wrong-import-order,
# TODO(https://github.com/PyCQA/pylint/issues/3882): Upgrade Pylint
unsubscriptable-object,
# NOTE(sergiitk): yapf compatibility, ref #25071 # NOTE(sergiitk): yapf compatibility, ref #25071
bad-continuation, bad-continuation,

@ -126,5 +126,7 @@ disable=
useless-object-inheritance, useless-object-inheritance,
# NOTE(lidiz): the import order will be enforced by isort instead # NOTE(lidiz): the import order will be enforced by isort instead
wrong-import-order, wrong-import-order,
# TODO(https://github.com/PyCQA/pylint/issues/3882): Upgrade Pylint
unsubscriptable-object,
# NOTE(sergiitk): yapf compatibility, ref #25071 # NOTE(sergiitk): yapf compatibility, ref #25071
bad-continuation, bad-continuation,

@ -31,6 +31,7 @@ DIRS=(
'test' 'test'
'tools' 'tools'
'setup.py' 'setup.py'
'tools/run_tests/xds_k8s_test_driver'
) )
VIRTUALENV=isort_virtual_environment VIRTUALENV=isort_virtual_environment

@ -28,11 +28,14 @@ DIRS=(
'src/python/grpcio_reflection/grpc_reflection' 'src/python/grpcio_reflection/grpc_reflection'
'src/python/grpcio_testing/grpc_testing' 'src/python/grpcio_testing/grpc_testing'
'src/python/grpcio_status/grpc_status' 'src/python/grpcio_status/grpc_status'
'tools/run_tests/xds_k8s_test_driver/bin'
'tools/run_tests/xds_k8s_test_driver/framework'
) )
TEST_DIRS=( TEST_DIRS=(
'src/python/grpcio_tests/tests' 'src/python/grpcio_tests/tests'
'src/python/grpcio_tests/tests_gevent' 'src/python/grpcio_tests/tests_gevent'
'tools/run_tests/xds_k8s_test_driver/tests'
) )
VIRTUALENV=python_pylint_venv VIRTUALENV=python_pylint_venv

@ -178,7 +178,6 @@ def main(argv):
# Resource names. # Resource names.
resource_prefix: str = xds_flags.RESOURCE_PREFIX.value resource_prefix: str = xds_flags.RESOURCE_PREFIX.value
resource_suffix: str = xds_flags.RESOURCE_SUFFIX.value
# Server # Server
server_name = xds_flags.SERVER_NAME.value server_name = xds_flags.SERVER_NAME.value

@ -67,7 +67,7 @@ flags.mark_flag_as_required("resource_suffix")
KubernetesServerRunner = server_app.KubernetesServerRunner KubernetesServerRunner = server_app.KubernetesServerRunner
def main(argv): def main(argv): # pylint: disable=too-many-locals,too-many-branches,too-many-statements
if len(argv) > 1: if len(argv) > 1:
raise app.UsageError('Too many command-line arguments.') raise app.UsageError('Too many command-line arguments.')

@ -14,7 +14,7 @@
"""This contains common helpers for working with dates and time.""" """This contains common helpers for working with dates and time."""
import datetime import datetime
import re import re
from typing import Pattern from typing import Optional, Pattern
RE_ZERO_OFFSET: Pattern[str] = re.compile(r'[+\-]00:?00$') RE_ZERO_OFFSET: Pattern[str] = re.compile(r'[+\-]00:?00$')
@ -29,7 +29,7 @@ def shorten_utc_zone(utc_datetime_str: str) -> str:
return RE_ZERO_OFFSET.sub('Z', utc_datetime_str) return RE_ZERO_OFFSET.sub('Z', utc_datetime_str)
def iso8601_utc_time(timedelta: datetime.timedelta = None) -> str: def iso8601_utc_time(timedelta: Optional[datetime.timedelta] = None) -> str:
"""Return datetime relative to current in ISO-8601 format, UTC tz.""" """Return datetime relative to current in ISO-8601 format, UTC tz."""
time: datetime.datetime = utc_now() time: datetime.datetime = utc_now()
if timedelta: if timedelta:

@ -70,7 +70,7 @@ def exponential_retryer_with_timeout(
def constant_retryer(*, def constant_retryer(*,
wait_fixed: timedelta, wait_fixed: timedelta,
attempts: int = 0, attempts: int = 0,
timeout: timedelta = None, timeout: Optional[timedelta] = None,
retry_on_exceptions: Optional[Sequence[Any]] = None, retry_on_exceptions: Optional[Sequence[Any]] = None,
logger: Optional[logging.Logger] = None, logger: Optional[logging.Logger] = None,
log_level: Optional[int] = logging.DEBUG) -> Retrying: log_level: Optional[int] = logging.DEBUG) -> Retrying:

@ -149,8 +149,9 @@ class GcpApiManager:
raise NotImplementedError(f'Network Services {version} not supported') raise NotImplementedError(f'Network Services {version} not supported')
@staticmethod
@functools.lru_cache(None) @functools.lru_cache(None)
def secrets(self, version): def secrets(version: str):
if version == 'v1': if version == 'v1':
return secretmanager_v1.SecretManagerServiceClient() return secretmanager_v1.SecretManagerServiceClient()
@ -263,7 +264,7 @@ class OperationError(Error):
""" """
api_name: str api_name: str
name: str name: str
metadata: str metadata: Any
code_name: code_pb2.Code code_name: code_pb2.Code
error: status_pb2.Status error: status_pb2.Status
@ -431,7 +432,8 @@ class GcpStandardCloudApiResource(GcpProjectApiResource, metaclass=abc.ABCMeta):
return False return False
# TODO(sergiitk): Use ResponseError and TransportError # TODO(sergiitk): Use ResponseError and TransportError
def _execute(self, def _execute( # pylint: disable=arguments-differ
self,
request: HttpRequest, request: HttpRequest,
timeout_sec=GcpProjectApiResource._WAIT_FOR_OPERATION_SEC): timeout_sec=GcpProjectApiResource._WAIT_FOR_OPERATION_SEC):
operation = request.execute(num_retries=self._GCP_API_RETRIES) operation = request.execute(num_retries=self._GCP_API_RETRIES)

@ -26,7 +26,7 @@ from framework.infrastructure import gcp
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class ComputeV1(gcp.api.GcpProjectApiResource): class ComputeV1(gcp.api.GcpProjectApiResource): # pylint: disable=too-many-public-methods
# TODO(sergiitk): move someplace better # TODO(sergiitk): move someplace better
_WAIT_FOR_BACKEND_SEC = 60 * 10 _WAIT_FOR_BACKEND_SEC = 60 * 10
_WAIT_FOR_OPERATION_SEC = 60 * 10 _WAIT_FOR_OPERATION_SEC = 60 * 10
@ -58,7 +58,7 @@ class ComputeV1(gcp.api.GcpProjectApiResource):
name: str, name: str,
protocol: HealthCheckProtocol, protocol: HealthCheckProtocol,
*, *,
port: Optional[int] = None) -> GcpResource: port: Optional[int] = None) -> 'GcpResource':
if protocol is self.HealthCheckProtocol.TCP: if protocol is self.HealthCheckProtocol.TCP:
health_check_field = 'tcpHealthCheck' health_check_field = 'tcpHealthCheck'
elif protocol is self.HealthCheckProtocol.GRPC: elif protocol is self.HealthCheckProtocol.GRPC:
@ -88,7 +88,7 @@ class ComputeV1(gcp.api.GcpProjectApiResource):
def create_firewall_rule(self, name: str, network_url: str, def create_firewall_rule(self, name: str, network_url: str,
source_ranges: List[str], source_ranges: List[str],
ports: List[str]) -> Optional[GcpResource]: ports: List[str]) -> Optional['GcpResource']:
try: try:
return self._insert_resource( return self._insert_resource(
self.api.firewalls(), { self.api.firewalls(), {
@ -107,7 +107,7 @@ class ComputeV1(gcp.api.GcpProjectApiResource):
# TODO(lidiz) use status_code() when we upgrade googleapiclient # TODO(lidiz) use status_code() when we upgrade googleapiclient
if http_error.resp.status == 409: if http_error.resp.status == 409:
logger.debug('Firewall rule %s already existed', name) logger.debug('Firewall rule %s already existed', name)
return return None
else: else:
raise raise
@ -117,10 +117,10 @@ class ComputeV1(gcp.api.GcpProjectApiResource):
def create_backend_service_traffic_director( def create_backend_service_traffic_director(
self, self,
name: str, name: str,
health_check: GcpResource, health_check: 'GcpResource',
affinity_header: str = None, affinity_header: Optional[str] = None,
protocol: Optional[BackendServiceProtocol] = None, protocol: Optional[BackendServiceProtocol] = None,
subset_size: Optional[int] = None) -> GcpResource: subset_size: Optional[int] = None) -> 'GcpResource':
if not isinstance(protocol, self.BackendServiceProtocol): if not isinstance(protocol, self.BackendServiceProtocol):
raise TypeError(f'Unexpected Backend Service protocol: {protocol}') raise TypeError(f'Unexpected Backend Service protocol: {protocol}')
body = { body = {
@ -144,7 +144,7 @@ class ComputeV1(gcp.api.GcpProjectApiResource):
} }
return self._insert_resource(self.api.backendServices(), body) return self._insert_resource(self.api.backendServices(), body)
def get_backend_service_traffic_director(self, name: str) -> GcpResource: def get_backend_service_traffic_director(self, name: str) -> 'GcpResource':
return self._get_resource(self.api.backendServices(), return self._get_resource(self.api.backendServices(),
backendService=name) backendService=name)
@ -185,9 +185,9 @@ class ComputeV1(gcp.api.GcpProjectApiResource):
name: str, name: str,
matcher_name: str, matcher_name: str,
src_hosts, src_hosts,
dst_default_backend_service: GcpResource, dst_default_backend_service: 'GcpResource',
dst_host_rule_match_backend_service: Optional[GcpResource] = None, dst_host_rule_match_backend_service: Optional['GcpResource'] = None,
) -> GcpResource: ) -> 'GcpResource':
if dst_host_rule_match_backend_service is None: if dst_host_rule_match_backend_service is None:
dst_host_rule_match_backend_service = dst_default_backend_service dst_host_rule_match_backend_service = dst_default_backend_service
return self._insert_resource( return self._insert_resource(
@ -206,10 +206,10 @@ class ComputeV1(gcp.api.GcpProjectApiResource):
}], }],
}) })
def create_url_map_with_content(self, url_map_body: Any) -> GcpResource: def create_url_map_with_content(self, url_map_body: Any) -> 'GcpResource':
return self._insert_resource(self.api.urlMaps(), url_map_body) return self._insert_resource(self.api.urlMaps(), url_map_body)
def patch_url_map(self, url_map: GcpResource, body, **kwargs): def patch_url_map(self, url_map: 'GcpResource', body, **kwargs):
self._patch_resource(collection=self.api.urlMaps(), self._patch_resource(collection=self.api.urlMaps(),
urlMap=url_map.name, urlMap=url_map.name,
body=body, body=body,
@ -221,9 +221,9 @@ class ComputeV1(gcp.api.GcpProjectApiResource):
def create_target_grpc_proxy( def create_target_grpc_proxy(
self, self,
name: str, name: str,
url_map: GcpResource, url_map: 'GcpResource',
validate_for_proxyless: bool = True, validate_for_proxyless: bool = True,
) -> GcpResource: ) -> 'GcpResource':
return self._insert_resource( return self._insert_resource(
self.api.targetGrpcProxies(), { self.api.targetGrpcProxies(), {
'name': name, 'name': name,
@ -238,8 +238,8 @@ class ComputeV1(gcp.api.GcpProjectApiResource):
def create_target_http_proxy( def create_target_http_proxy(
self, self,
name: str, name: str,
url_map: GcpResource, url_map: 'GcpResource',
) -> GcpResource: ) -> 'GcpResource':
return self._insert_resource(self.api.targetHttpProxies(), { return self._insert_resource(self.api.targetHttpProxies(), {
'name': name, 'name': name,
'url_map': url_map.url, 'url_map': url_map.url,
@ -252,10 +252,10 @@ class ComputeV1(gcp.api.GcpProjectApiResource):
def create_forwarding_rule(self, def create_forwarding_rule(self,
name: str, name: str,
src_port: int, src_port: int,
target_proxy: GcpResource, target_proxy: 'GcpResource',
network_url: str, network_url: str,
*, *,
ip_address: str = '0.0.0.0') -> GcpResource: ip_address: str = '0.0.0.0') -> 'GcpResource':
return self._insert_resource( return self._insert_resource(
self.api.globalForwardingRules(), self.api.globalForwardingRules(),
{ {
@ -367,14 +367,14 @@ class ComputeV1(gcp.api.GcpProjectApiResource):
}).execute() }).execute()
def _get_resource(self, collection: discovery.Resource, def _get_resource(self, collection: discovery.Resource,
**kwargs) -> GcpResource: **kwargs) -> 'GcpResource':
resp = collection.get(project=self.project, **kwargs).execute() resp = collection.get(project=self.project, **kwargs).execute()
logger.info('Loaded compute resource:\n%s', logger.info('Loaded compute resource:\n%s',
self.resource_pretty_format(resp)) self.resource_pretty_format(resp))
return self.GcpResource(resp['name'], resp['selfLink']) return self.GcpResource(resp['name'], resp['selfLink'])
def _exists_resource(self, collection: discovery.Resource, def _exists_resource(
filter: str) -> bool: self, collection: discovery.Resource, filter: str) -> bool: # pylint: disable=redefined-builtin
resp = collection.list( resp = collection.list(
project=self.project, filter=filter, project=self.project, filter=filter,
maxResults=1).execute(num_retries=self._GCP_API_RETRIES) maxResults=1).execute(num_retries=self._GCP_API_RETRIES)
@ -384,7 +384,7 @@ class ComputeV1(gcp.api.GcpProjectApiResource):
return 'items' in resp and resp['items'] return 'items' in resp and resp['items']
def _insert_resource(self, collection: discovery.Resource, def _insert_resource(self, collection: discovery.Resource,
body: Dict[str, Any]) -> GcpResource: body: Dict[str, Any]) -> 'GcpResource':
logger.info('Creating compute resource:\n%s', logger.info('Creating compute resource:\n%s',
self.resource_pretty_format(body)) self.resource_pretty_format(body))
resp = self._execute(collection.insert(project=self.project, body=body)) resp = self._execute(collection.insert(project=self.project, body=body))
@ -420,7 +420,8 @@ class ComputeV1(gcp.api.GcpProjectApiResource):
def _operation_status_done(operation): def _operation_status_done(operation):
return 'status' in operation and operation['status'] == 'DONE' return 'status' in operation and operation['status'] == 'DONE'
def _execute(self, def _execute( # pylint: disable=arguments-differ
self,
request, request,
*, *,
test_success_fn=None, test_success_fn=None,

@ -33,7 +33,6 @@ class EtagConflict(gcp.api.Error):
https://cloud.google.com/iam/docs/policies#etag https://cloud.google.com/iam/docs/policies#etag
""" """
pass
def handle_etag_conflict(func): def handle_etag_conflict(func):
@ -54,7 +53,7 @@ def _replace_binding(policy: 'Policy', binding: 'Policy.Binding',
new_bindings = set(policy.bindings) new_bindings = set(policy.bindings)
new_bindings.discard(binding) new_bindings.discard(binding)
new_bindings.add(new_binding) new_bindings.add(new_binding)
return dataclasses.replace(policy, bindings=frozenset(new_bindings)) return dataclasses.replace(policy, bindings=frozenset(new_bindings)) # pylint: disable=too-many-function-args
@dataclasses.dataclass(frozen=True) @dataclasses.dataclass(frozen=True)
@ -190,7 +189,7 @@ class IamV1(gcp.api.GcpProjectApiResource):
# Otherwise conditions are omitted, and role names returned with a suffix, # Otherwise conditions are omitted, and role names returned with a suffix,
# f.e. roles/iam.workloadIdentityUser_withcond_f1ec33c9beb41857dbf0 # f.e. roles/iam.workloadIdentityUser_withcond_f1ec33c9beb41857dbf0
# https://cloud.google.com/iam/docs/reference/rest/v1/Policy#FIELDS.version # https://cloud.google.com/iam/docs/reference/rest/v1/Policy#FIELDS.version
POLICY_VERSION: str = 3 POLICY_VERSION: int = 3
def __init__(self, api_manager: gcp.api.GcpApiManager, project: str): def __init__(self, api_manager: gcp.api.GcpApiManager, project: str):
super().__init__(api_manager.iam('v1'), project) super().__init__(api_manager.iam('v1'), project)
@ -271,8 +270,9 @@ class IamV1(gcp.api.GcpProjectApiResource):
updated_binding = Policy.Binding(role, frozenset([member])) updated_binding = Policy.Binding(role, frozenset([member]))
else: else:
updated_members: FrozenSet[str] = binding.members.union({member}) updated_members: FrozenSet[str] = binding.members.union({member})
updated_binding: Policy.Binding = dataclasses.replace( updated_binding: Policy.Binding = dataclasses.replace( # pylint: disable=too-many-function-args
binding, members=updated_members) binding,
members=updated_members)
updated_policy: Policy = _replace_binding(policy, binding, updated_policy: Policy = _replace_binding(policy, binding,
updated_binding) updated_binding)
@ -303,8 +303,9 @@ class IamV1(gcp.api.GcpProjectApiResource):
return return
updated_members: FrozenSet[str] = binding.members.difference({member}) updated_members: FrozenSet[str] = binding.members.difference({member})
updated_binding: Policy.Binding = dataclasses.replace( updated_binding: Policy.Binding = dataclasses.replace( # pylint: disable=too-many-function-args
binding, members=updated_members) binding,
members=updated_members)
updated_policy: Policy = _replace_binding(policy, binding, updated_policy: Policy = _replace_binding(policy, binding,
updated_binding) updated_binding)
self.set_service_account_iam_policy(account, updated_policy) self.set_service_account_iam_policy(account, updated_policy)

@ -92,6 +92,9 @@ class _NetworkSecurityBase(gcp.api.GcpStandardCloudApiResource,
metaclass=abc.ABCMeta): metaclass=abc.ABCMeta):
"""Base class for NetworkSecurity APIs.""" """Base class for NetworkSecurity APIs."""
# TODO(https://github.com/grpc/grpc/issues/29532) remove pylint disable
# pylint: disable=abstract-method
def __init__(self, api_manager: gcp.api.GcpApiManager, project: str): def __init__(self, api_manager: gcp.api.GcpApiManager, project: str):
super().__init__(api_manager.networksecurity(self.api_version), project) super().__init__(api_manager.networksecurity(self.api_version), project)
# Shortcut to projects/*/locations/ endpoints # Shortcut to projects/*/locations/ endpoints
@ -101,7 +104,7 @@ class _NetworkSecurityBase(gcp.api.GcpStandardCloudApiResource,
def api_name(self) -> str: def api_name(self) -> str:
return 'networksecurity' return 'networksecurity'
def _execute(self, *args, **kwargs): # pylint: disable=signature-differs def _execute(self, *args, **kwargs): # pylint: disable=signature-differs,arguments-differ
# Workaround TD bug: throttled operations are reported as internal. # Workaround TD bug: throttled operations are reported as internal.
# Ref b/175345578 # Ref b/175345578
retryer = tenacity.Retrying( retryer = tenacity.Retrying(

@ -81,7 +81,7 @@ class GrpcRoute:
case_sensitive: Optional[bool] case_sensitive: Optional[bool]
@classmethod @classmethod
def from_response(cls, d: Dict[str, Any]) -> 'MethodMatch': def from_response(cls, d: Dict[str, Any]) -> 'GrpcRoute.MethodMatch':
return cls( return cls(
type=d.get("type"), type=d.get("type"),
grpc_service=d.get("grpcService"), grpc_service=d.get("grpcService"),
@ -96,7 +96,7 @@ class GrpcRoute:
value: str value: str
@classmethod @classmethod
def from_response(cls, d: Dict[str, Any]) -> 'HeaderMatch': def from_response(cls, d: Dict[str, Any]) -> 'GrpcRoute.HeaderMatch':
return cls( return cls(
type=d.get("type"), type=d.get("type"),
key=d["key"], key=d["key"],
@ -105,17 +105,17 @@ class GrpcRoute:
@dataclasses.dataclass(frozen=True) @dataclasses.dataclass(frozen=True)
class RouteMatch: class RouteMatch:
method: Optional['MethodMatch'] method: Optional['GrpcRoute.MethodMatch']
headers: Tuple['HeaderMatch'] headers: Tuple['GrpcRoute.HeaderMatch']
@classmethod @classmethod
def from_response(cls, d: Dict[str, Any]) -> 'RouteMatch': def from_response(cls, d: Dict[str, Any]) -> 'GrpcRoute.RouteMatch':
return cls( return cls(
method=MethodMatch.from_response(d["method"]) method=GrpcRoute.MethodMatch.from_response(d["method"])
if "method" in d else None, if "method" in d else None,
headers=tuple( headers=tuple(
HeaderMatch.from_response(h) for h in d["headers"]) GrpcRoute.HeaderMatch.from_response(h)
if "headers" in d else (), for h in d["headers"]) if "headers" in d else (),
) )
@dataclasses.dataclass(frozen=True) @dataclasses.dataclass(frozen=True)
@ -124,7 +124,7 @@ class GrpcRoute:
weight: Optional[int] weight: Optional[int]
@classmethod @classmethod
def from_response(cls, d: Dict[str, Any]) -> 'Destination': def from_response(cls, d: Dict[str, Any]) -> 'GrpcRoute.Destination':
return cls( return cls(
service_name=d["serviceName"], service_name=d["serviceName"],
weight=d.get("weight"), weight=d.get("weight"),
@ -132,37 +132,39 @@ class GrpcRoute:
@dataclasses.dataclass(frozen=True) @dataclasses.dataclass(frozen=True)
class RouteAction: class RouteAction:
destinations: List['Destination']
@classmethod @classmethod
def from_response(cls, d: Dict[str, Any]) -> 'RouteAction': def from_response(cls, d: Dict[str, Any]) -> 'GrpcRoute.RouteAction':
destinations = [ destinations = [
Destination.from_response(dest) for dest in d["destinations"] GrpcRoute.Destination.from_response(dest)
for dest in d["destinations"]
] if "destinations" in d else [] ] if "destinations" in d else []
return cls(destinations=destinations) return cls(destinations=destinations)
@dataclasses.dataclass(frozen=True) @dataclasses.dataclass(frozen=True)
class RouteRule: class RouteRule:
matches: List['RouteMatch'] matches: List['GrpcRoute.RouteMatch']
action: 'RouteAction' action: 'GrpcRoute.RouteAction'
@classmethod @classmethod
def from_response(cls, d: Dict[str, Any]) -> 'RouteRule': def from_response(cls, d: Dict[str, Any]) -> 'GrpcRoute.RouteRule':
matches = [RouteMatch.from_response(m) for m in d["matches"] matches = [
GrpcRoute.RouteMatch.from_response(m) for m in d["matches"]
] if "matches" in d else [] ] if "matches" in d else []
return cls( return cls(
matches=matches, matches=matches,
action=RouteAction.from_response(d["action"]), action=GrpcRoute.RouteAction.from_response(d["action"]),
) )
name: str name: str
url: str url: str
hostnames: Tuple[str] hostnames: Tuple[str]
rules: Tuple['RouteRule'] rules: Tuple['GrpcRoute.RouteRule']
meshes: Optional[Tuple[str]] meshes: Optional[Tuple[str]]
@classmethod @classmethod
def from_response(cls, name: str, d: Dict[str, Any]) -> 'RouteRule': def from_response(cls, name: str, d: Dict[str,
Any]) -> 'GrpcRoute.RouteRule':
return cls( return cls(
name=name, name=name,
url=d["name"], url=d["name"],
@ -176,6 +178,9 @@ class _NetworkServicesBase(gcp.api.GcpStandardCloudApiResource,
metaclass=abc.ABCMeta): metaclass=abc.ABCMeta):
"""Base class for NetworkServices APIs.""" """Base class for NetworkServices APIs."""
# TODO(https://github.com/grpc/grpc/issues/29532) remove pylint disable
# pylint: disable=abstract-method
def __init__(self, api_manager: gcp.api.GcpApiManager, project: str): def __init__(self, api_manager: gcp.api.GcpApiManager, project: str):
super().__init__(api_manager.networkservices(self.api_version), project) super().__init__(api_manager.networkservices(self.api_version), project)
# Shortcut to projects/*/locations/ endpoints # Shortcut to projects/*/locations/ endpoints
@ -185,7 +190,7 @@ class _NetworkServicesBase(gcp.api.GcpStandardCloudApiResource,
def api_name(self) -> str: def api_name(self) -> str:
return 'networkservices' return 'networkservices'
def _execute(self, *args, **kwargs): # pylint: disable=signature-differs def _execute(self, *args, **kwargs): # pylint: disable=signature-differs,arguments-differ
# Workaround TD bug: throttled operations are reported as internal. # Workaround TD bug: throttled operations are reported as internal.
# Ref b/175345578 # Ref b/175345578
retryer = tenacity.Retrying( retryer = tenacity.Retrying(

@ -98,7 +98,10 @@ class PortForwarder:
self.subprocess: Optional[subprocess.Popen] = None self.subprocess: Optional[subprocess.Popen] = None
def connect(self) -> None: def connect(self) -> None:
port_mapping = f"{self.local_port}:{self.remote_port}" if self.local_port else f":{self.remote_port}" if self.local_port:
port_mapping = f"{self.local_port}:{self.remote_port}"
else:
port_mapping = f":{self.remote_port}"
cmd = [ cmd = [
"kubectl", "--context", self.context, "--namespace", self.namespace, "kubectl", "--context", self.context, "--namespace", self.namespace,
"port-forward", "--address", self.local_address, self.destination, "port-forward", "--address", self.local_address, self.destination,
@ -167,7 +170,7 @@ class PortForwarder:
self.subprocess = None self.subprocess = None
class KubernetesNamespace: class KubernetesNamespace: # pylint: disable=too-many-public-methods
NEG_STATUS_META = 'cloud.google.com/neg-status' NEG_STATUS_META = 'cloud.google.com/neg-status'
DELETE_GRACE_PERIOD_SEC: int = 5 DELETE_GRACE_PERIOD_SEC: int = 5
WAIT_SHORT_TIMEOUT_SEC: int = 60 WAIT_SHORT_TIMEOUT_SEC: int = 60

@ -41,12 +41,14 @@ AuthorizationPolicy = gcp.network_security.AuthorizationPolicy
_NetworkServicesV1Alpha1 = gcp.network_services.NetworkServicesV1Alpha1 _NetworkServicesV1Alpha1 = gcp.network_services.NetworkServicesV1Alpha1
_NetworkServicesV1Beta1 = gcp.network_services.NetworkServicesV1Beta1 _NetworkServicesV1Beta1 = gcp.network_services.NetworkServicesV1Beta1
EndpointPolicy = gcp.network_services.EndpointPolicy EndpointPolicy = gcp.network_services.EndpointPolicy
GrpcRoute = gcp.network_services.GrpcRoute
Mesh = gcp.network_services.Mesh
# Testing metadata consts # Testing metadata consts
TEST_AFFINITY_METADATA_KEY = 'xds_md' TEST_AFFINITY_METADATA_KEY = 'xds_md'
class TrafficDirectorManager: class TrafficDirectorManager: # pylint: disable=too-many-public-methods
compute: _ComputeV1 compute: _ComputeV1
resource_prefix: str resource_prefix: str
resource_suffix: str resource_suffix: str
@ -383,8 +385,8 @@ class TrafficDirectorManager:
self.compute.wait_for_backends_healthy_status( self.compute.wait_for_backends_healthy_status(
self.affinity_backend_service, self.affinity_backends) self.affinity_backend_service, self.affinity_backends)
@staticmethod
def _generate_url_map_body( def _generate_url_map_body(
self,
name: str, name: str,
matcher_name: str, matcher_name: str,
src_hosts, src_hosts,
@ -547,9 +549,9 @@ class TrafficDirectorManager:
lo: int = 1024, # To avoid confusion, skip well-known ports. lo: int = 1024, # To avoid confusion, skip well-known ports.
hi: int = 65535, hi: int = 65535,
attempts: int = 25) -> int: attempts: int = 25) -> int:
for attempts in range(attempts): for _ in range(attempts):
src_port = random.randint(lo, hi) src_port = random.randint(lo, hi)
if not (self.compute.exists_forwarding_rule(src_port)): if not self.compute.exists_forwarding_rule(src_port):
return src_port return src_port
# TODO(sergiitk): custom exception # TODO(sergiitk): custom exception
raise RuntimeError("Couldn't find unused forwarding rule port") raise RuntimeError("Couldn't find unused forwarding rule port")
@ -656,8 +658,9 @@ class TrafficDirectorAppNetManager(TrafficDirectorManager):
self.netsvc = _NetworkServicesV1Alpha1(gcp_api_manager, project) self.netsvc = _NetworkServicesV1Alpha1(gcp_api_manager, project)
# Managed resources # Managed resources
self.grpc_route: Optional[_NetworkServicesV1Alpha1.GrpcRoute] = None # TODO(gnossen) PTAL at the pylint error
self.mesh: Optional[_NetworkServicesV1Alpha1.Mesh] = None self.grpc_route: Optional[GrpcRoute] = None
self.mesh: Optional[Mesh] = None
def create_mesh(self) -> GcpResource: def create_mesh(self) -> GcpResource:
name = self.make_resource_name(self.MESH_NAME) name = self.make_resource_name(self.MESH_NAME)

@ -13,7 +13,7 @@
# limitations under the License. # limitations under the License.
import logging import logging
import re import re
from typing import ClassVar, Dict, Optional from typing import Any, Dict, Optional
from google.protobuf import json_format from google.protobuf import json_format
import google.protobuf.message import google.protobuf.message
@ -29,7 +29,7 @@ class GrpcClientHelper:
channel: grpc.Channel channel: grpc.Channel
DEFAULT_RPC_DEADLINE_SEC = 90 DEFAULT_RPC_DEADLINE_SEC = 90
def __init__(self, channel: grpc.Channel, stub_class: ClassVar): def __init__(self, channel: grpc.Channel, stub_class: Any):
self.channel = channel self.channel = channel
self.stub = stub_class(channel) self.stub = stub_class(channel)
# This is purely cosmetic to make RPC logs look like method calls. # This is purely cosmetic to make RPC logs look like method calls.

@ -17,16 +17,17 @@ https://github.com/envoyproxy/envoy/blob/main/api/envoy/service/status/v3/csds.p
""" """
import logging import logging
import queue from typing import Optional
from typing import Callable, Optional
from envoy.extensions.filters.common.fault.v3 import fault_pb2
from envoy.extensions.filters.http.fault.v3 import fault_pb2
from envoy.extensions.filters.http.router.v3 import router_pb2
# Envoy protos provided by PyPI package xds-protos # Envoy protos provided by PyPI package xds-protos
# Needs to import the generated Python file to load descriptors # Needs to import the generated Python file to load descriptors
# pylint: disable=unused-import
from envoy.extensions.filters.common.fault.v3 import fault_pb2 as _
from envoy.extensions.filters.http.fault.v3 import fault_pb2 as _
from envoy.extensions.filters.http.router.v3 import router_pb2 as _
from envoy.extensions.filters.network.http_connection_manager.v3 import \ from envoy.extensions.filters.network.http_connection_manager.v3 import \
http_connection_manager_pb2 http_connection_manager_pb2 as _
# pylint: enable=unused-import
from envoy.service.status.v3 import csds_pb2 from envoy.service.status.v3 import csds_pb2
from envoy.service.status.v3 import csds_pb2_grpc from envoy.service.status.v3 import csds_pb2_grpc
import grpc import grpc

@ -70,6 +70,7 @@ class KubernetesBaseRunner:
self.namespace: Optional[k8s.V1Namespace] = None self.namespace: Optional[k8s.V1Namespace] = None
def run(self, **kwargs): def run(self, **kwargs):
del kwargs
if self.reuse_namespace: if self.reuse_namespace:
self.namespace = self._reuse_namespace() self.namespace = self._reuse_namespace()
if not self.namespace: if not self.namespace:
@ -316,7 +317,7 @@ class KubernetesBaseRunner:
namespace_name: str, namespace_name: str,
gcp_project: str, gcp_project: str,
gcp_ui_url: str, gcp_ui_url: str,
end_delta: timedelta = None) -> None: end_delta: Optional[timedelta] = None) -> None:
"""Output the link to test server/client logs in GCP Logs Explorer.""" """Output the link to test server/client logs in GCP Logs Explorer."""
if end_delta is None: if end_delta is None:
end_delta = timedelta(hours=1) end_delta = timedelta(hours=1)

@ -227,7 +227,8 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp):
class KubernetesClientRunner(base_runner.KubernetesBaseRunner): class KubernetesClientRunner(base_runner.KubernetesBaseRunner):
def __init__(self, def __init__( # pylint: disable=too-many-locals
self,
k8s_namespace, k8s_namespace,
*, *,
deployment_name, deployment_name,
@ -282,7 +283,8 @@ class KubernetesClientRunner(base_runner.KubernetesBaseRunner):
self.port_forwarder: Optional[k8s.PortForwarder] = None self.port_forwarder: Optional[k8s.PortForwarder] = None
# TODO(sergiitk): make rpc UnaryCall enum or get it from proto # TODO(sergiitk): make rpc UnaryCall enum or get it from proto
def run(self, def run( # pylint: disable=arguments-differ
self,
*, *,
server_target, server_target,
rpc='UnaryCall', rpc='UnaryCall',
@ -360,7 +362,7 @@ class KubernetesClientRunner(base_runner.KubernetesBaseRunner):
server_target=server_target, server_target=server_target,
rpc_host=rpc_host) rpc_host=rpc_host)
def cleanup(self, *, force=False, force_namespace=False): def cleanup(self, *, force=False, force_namespace=False): # pylint: disable=arguments-differ
if self.port_forwarder: if self.port_forwarder:
self.port_forwarder.close() self.port_forwarder.close()
self.port_forwarder = None self.port_forwarder = None

@ -19,7 +19,6 @@ modules.
""" """
import functools import functools
import logging import logging
import threading
from typing import Iterator, List, Optional from typing import Iterator, List, Optional
from framework.infrastructure import gcp from framework.infrastructure import gcp
@ -158,7 +157,8 @@ class KubernetesServerRunner(base_runner.KubernetesBaseRunner):
DEFAULT_MAINTENANCE_PORT = 8080 DEFAULT_MAINTENANCE_PORT = 8080
DEFAULT_SECURE_MODE_MAINTENANCE_PORT = 8081 DEFAULT_SECURE_MODE_MAINTENANCE_PORT = 8081
def __init__(self, def __init__( # pylint: disable=too-many-locals
self,
k8s_namespace, k8s_namespace,
*, *,
deployment_name, deployment_name,
@ -223,7 +223,8 @@ class KubernetesServerRunner(base_runner.KubernetesBaseRunner):
self.service: Optional[k8s.V1Service] = None self.service: Optional[k8s.V1Service] = None
self.port_forwarders: List[k8s.PortForwarder] = [] self.port_forwarders: List[k8s.PortForwarder] = []
def run(self, def run( # pylint: disable=arguments-differ
self,
*, *,
test_port=DEFAULT_TEST_PORT, test_port=DEFAULT_TEST_PORT,
maintenance_port=None, maintenance_port=None,
@ -343,7 +344,7 @@ class KubernetesServerRunner(base_runner.KubernetesBaseRunner):
pod_name=pod_name)) pod_name=pod_name))
return servers return servers
def cleanup(self, *, force=False, force_namespace=False): def cleanup(self, *, force=False, force_namespace=False): # pylint: disable=arguments-differ
if self.port_forwarders: if self.port_forwarders:
for port_forwarder in self.port_forwarders: for port_forwarder in self.port_forwarders:
port_forwarder.close() port_forwarder.close()

@ -85,6 +85,7 @@ class XdsKubernetesTestCase(absltest.TestCase, metaclass=abc.ABCMeta):
Returns: Returns:
A bool indicates if the given config is supported. A bool indicates if the given config is supported.
""" """
del config
return True return True
@classmethod @classmethod
@ -275,14 +276,13 @@ class XdsKubernetesTestCase(absltest.TestCase, metaclass=abc.ABCMeta):
"""Assert all RPCs for a method are completing with a certain status.""" """Assert all RPCs for a method are completing with a certain status."""
# Sending with pre-set QPS for a period of time # Sending with pre-set QPS for a period of time
before_stats = test_client.get_load_balancer_accumulated_stats() before_stats = test_client.get_load_balancer_accumulated_stats()
logging.info( response_type = 'LoadBalancerAccumulatedStatsResponse'
'Received LoadBalancerAccumulatedStatsResponse from test client %s: before:\n%s', logging.info('Received %s from test client %s: before:\n%s',
test_client.ip, before_stats) response_type, test_client.ip, before_stats)
time.sleep(duration.total_seconds()) time.sleep(duration.total_seconds())
after_stats = test_client.get_load_balancer_accumulated_stats() after_stats = test_client.get_load_balancer_accumulated_stats()
logging.info( logging.info('Received %s from test client %s: after:\n%s',
'Received LoadBalancerAccumulatedStatsResponse from test client %s: after:\n%s', response_type, test_client.ip, after_stats)
test_client.ip, after_stats)
diff_stats = self.diffAccumulatedStatsPerMethod(before_stats, diff_stats = self.diffAccumulatedStatsPerMethod(before_stats,
after_stats) after_stats)
@ -314,7 +314,7 @@ class XdsKubernetesTestCase(absltest.TestCase, metaclass=abc.ABCMeta):
servers: List[XdsTestServer], servers: List[XdsTestServer],
num_rpcs: int): num_rpcs: int):
server_names = [server.pod_name for server in servers] server_names = [server.pod_name for server in servers]
logger.info(f'Verifying RPCs go to {server_names}') logger.info('Verifying RPCs go to %s', server_names)
lb_stats = self.getClientRpcStats(test_client, num_rpcs) lb_stats = self.getClientRpcStats(test_client, num_rpcs)
failed = int(lb_stats.num_failures) failed = int(lb_stats.num_failures)
self.assertLessEqual( self.assertLessEqual(
@ -383,11 +383,13 @@ class XdsKubernetesTestCase(absltest.TestCase, metaclass=abc.ABCMeta):
else: else:
self.assertSuccessfulRpcs(test_client) self.assertSuccessfulRpcs(test_client)
logger.info( logger.info(
'[SUCCESS] Confirmed successful RPC with the updated routing config, version=%s', ('[SUCCESS] Confirmed successful RPC with the '
'updated routing config, version=%s'),
route_config_version) route_config_version)
except retryers.RetryError as retry_error: except retryers.RetryError as retry_error:
logger.info( logger.info(
'Retry exhausted. TD routing config propagation failed after timeout %ds. Last seen client config dump: %s', ('Retry exhausted. TD routing config propagation failed after '
'timeout %ds. Last seen client config dump: %s'),
timeout_second, dumped_config) timeout_second, dumped_config)
raise retry_error raise retry_error
@ -432,7 +434,8 @@ class RegularXdsKubernetesTestCase(XdsKubernetesTestCase):
""" """
super().setUpClass() super().setUpClass()
if cls.server_maintenance_port is None: if cls.server_maintenance_port is None:
cls.server_maintenance_port = KubernetesServerRunner.DEFAULT_MAINTENANCE_PORT cls.server_maintenance_port = \
KubernetesServerRunner.DEFAULT_MAINTENANCE_PORT
def initTrafficDirectorManager(self) -> TrafficDirectorManager: def initTrafficDirectorManager(self) -> TrafficDirectorManager:
return TrafficDirectorManager( return TrafficDirectorManager(
@ -532,7 +535,8 @@ class SecurityXdsKubernetesTestCase(XdsKubernetesTestCase):
# Health Checks and Channelz tests available. # Health Checks and Channelz tests available.
# When not provided, use explicit numeric port value, so # When not provided, use explicit numeric port value, so
# Backend Health Checks are created on a fixed port. # Backend Health Checks are created on a fixed port.
cls.server_maintenance_port = KubernetesServerRunner.DEFAULT_SECURE_MODE_MAINTENANCE_PORT cls.server_maintenance_port = \
KubernetesServerRunner.DEFAULT_SECURE_MODE_MAINTENANCE_PORT
def initTrafficDirectorManager(self) -> TrafficDirectorSecureManager: def initTrafficDirectorManager(self) -> TrafficDirectorSecureManager:
return TrafficDirectorSecureManager( return TrafficDirectorSecureManager(

@ -15,8 +15,7 @@
import functools import functools
import inspect import inspect
import time from typing import Any, Iterable, Mapping, Tuple
from typing import Any, Iterable, List, Mapping, Tuple
from absl import flags from absl import flags
from absl import logging from absl import logging
@ -65,8 +64,8 @@ class _UrlMapChangeAggregator:
*self._get_test_case_url_map(test_case)) *self._get_test_case_url_map(test_case))
self._set_test_case_url_map(*url_map_parts) self._set_test_case_url_map(*url_map_parts)
@staticmethod
def _get_test_case_url_map( def _get_test_case_url_map(
self,
test_case: 'XdsUrlMapTestCase') -> Tuple[HostRule, PathMatcher]: test_case: 'XdsUrlMapTestCase') -> Tuple[HostRule, PathMatcher]:
host_rule = { host_rule = {
"hosts": [test_case.hostname()], "hosts": [test_case.hostname()],
@ -140,7 +139,7 @@ class GcpResourceManager(metaclass=_MetaSingletonAndAbslFlags):
for key in absl_flags: for key in absl_flags:
setattr(self, key, absl_flags[key]) setattr(self, key, absl_flags[key])
# Pick a client_namespace_suffix if not set # Pick a client_namespace_suffix if not set
if self.resource_suffix is None: if getattr(self, 'resource_suffix', None) is None:
self.resource_suffix = "" self.resource_suffix = ""
else: else:
raise NotImplementedError( raise NotImplementedError(

@ -75,7 +75,7 @@ class DumpedXdsConfig(dict):
Feel free to add more pre-compute fields. Feel free to add more pre-compute fields.
""" """
def __init__(self, xds_json: JsonType): def __init__(self, xds_json: JsonType): # pylint: disable=too-many-branches
super().__init__(xds_json) super().__init__(xds_json)
self.json_config = xds_json self.json_config = xds_json
self.lds = None self.lds = None
@ -102,7 +102,8 @@ class DumpedXdsConfig(dict):
for endpoint in xds_config['endpointConfig'][ for endpoint in xds_config['endpointConfig'][
'dynamicEndpointConfigs']: 'dynamicEndpointConfigs']:
self.eds.append(endpoint['endpointConfig']) self.eds.append(endpoint['endpointConfig'])
except Exception as e: # TODO(lidiz) reduce the catch to LookupError
except Exception as e: # pylint: disable=broad-except
logging.debug('Parsing dumped xDS config failed with %s: %s', logging.debug('Parsing dumped xDS config failed with %s: %s',
type(e), e) type(e), e)
for generic_xds_config in self.get('genericXdsConfigs', []): for generic_xds_config in self.get('genericXdsConfigs', []):
@ -118,7 +119,8 @@ class DumpedXdsConfig(dict):
elif re.search(r'\.ClusterLoadAssignment$', elif re.search(r'\.ClusterLoadAssignment$',
generic_xds_config['typeUrl']): generic_xds_config['typeUrl']):
self.eds.append(generic_xds_config["xdsConfig"]) self.eds.append(generic_xds_config["xdsConfig"])
except Exception as e: # TODO(lidiz) reduce the catch to LookupError
except Exception as e: # pylint: disable=broad-except
logging.debug('Parsing dumped xDS config failed with %s: %s', logging.debug('Parsing dumped xDS config failed with %s: %s',
type(e), e) type(e), e)
for endpoint_config in self.eds: for endpoint_config in self.eds:
@ -131,7 +133,8 @@ class DumpedXdsConfig(dict):
['socketAddress']['address'], ['socketAddress']['address'],
lb_endpoint['endpoint']['address'] lb_endpoint['endpoint']['address']
['socketAddress']['portValue'])) ['socketAddress']['portValue']))
except Exception as e: # TODO(lidiz) reduce the catch to LookupError
except Exception as e: # pylint: disable=broad-except
logging.debug('Parse endpoint failed with %s: %s', logging.debug('Parse endpoint failed with %s: %s',
type(e), e) type(e), e)
@ -250,6 +253,7 @@ class XdsUrlMapTestCase(absltest.TestCase, metaclass=_MetaXdsUrlMapTestCase):
Returns: Returns:
A bool indicates if the given config is supported. A bool indicates if the given config is supported.
""" """
del config
return True return True
@staticmethod @staticmethod
@ -293,7 +297,6 @@ class XdsUrlMapTestCase(absltest.TestCase, metaclass=_MetaXdsUrlMapTestCase):
A tuple contains the updated version of given HostRule and A tuple contains the updated version of given HostRule and
PathMatcher. PathMatcher.
""" """
pass
@abc.abstractmethod @abc.abstractmethod
def xds_config_validate(self, xds_config: DumpedXdsConfig) -> None: def xds_config_validate(self, xds_config: DumpedXdsConfig) -> None:
@ -306,16 +309,14 @@ class XdsUrlMapTestCase(absltest.TestCase, metaclass=_MetaXdsUrlMapTestCase):
xds_config: A DumpedXdsConfig instance can be used as a JSON dict, xds_config: A DumpedXdsConfig instance can be used as a JSON dict,
but also provides helper fields for commonly checked xDS config. but also provides helper fields for commonly checked xDS config.
""" """
pass
@abc.abstractmethod @abc.abstractmethod
def rpc_distribution_validate(self, client: XdsTestClient) -> None: def rpc_distribution_validate(self, test_client: XdsTestClient) -> None:
"""Validates the routing behavior, if any is wrong, raise. """Validates the routing behavior, if any is wrong, raise.
Args: Args:
client: A XdsTestClient instance for all sorts of end2end testing. test_client: A XdsTestClient instance for all sorts of end2end testing.
""" """
pass
@classmethod @classmethod
def hostname(cls): def hostname(cls):
@ -361,14 +362,15 @@ class XdsUrlMapTestCase(absltest.TestCase, metaclass=_MetaXdsUrlMapTestCase):
GcpResourceManager().cleanup() GcpResourceManager().cleanup()
def _fetch_and_check_xds_config(self): def _fetch_and_check_xds_config(self):
# TODO(lidiz) find another way to store last seen xDS config
# Cleanup state for this attempt # Cleanup state for this attempt
self._xds_json_config = None self._xds_json_config = None # pylint: disable=attribute-defined-outside-init
# Fetch client config # Fetch client config
config = self.test_client.csds.fetch_client_status( config = self.test_client.csds.fetch_client_status(
log_level=logging.INFO) log_level=logging.INFO)
self.assertIsNotNone(config) self.assertIsNotNone(config)
# Found client config, test it. # Found client config, test it.
self._xds_json_config = json_format.MessageToDict(config) self._xds_json_config = json_format.MessageToDict(config) # pylint: disable=attribute-defined-outside-init
# Execute the child class provided validation logic # Execute the child class provided validation logic
self.xds_config_validate(DumpedXdsConfig(self._xds_json_config)) self.xds_config_validate(DumpedXdsConfig(self._xds_json_config))
@ -427,7 +429,8 @@ class XdsUrlMapTestCase(absltest.TestCase, metaclass=_MetaXdsUrlMapTestCase):
f'insufficient endpoints in EDS: want={k} seen={xds_config.endpoints}' f'insufficient endpoints in EDS: want={k} seen={xds_config.endpoints}'
) )
def assertRpcStatusCode(self, test_client: XdsTestClient, *, def assertRpcStatusCode( # pylint: disable=too-many-locals
self, test_client: XdsTestClient, *,
expected: Iterable[ExpectedResult], length: int, expected: Iterable[ExpectedResult], length: int,
tolerance: float) -> None: tolerance: float) -> None:
"""Assert the distribution of RPC statuses over a period of time.""" """Assert the distribution of RPC statuses over a period of time."""
@ -468,6 +471,8 @@ class XdsUrlMapTestCase(absltest.TestCase, metaclass=_MetaXdsUrlMapTestCase):
diff_ratio = abs(seen - want) / total diff_ratio = abs(seen - want) / total
self.assertLessEqual( self.assertLessEqual(
diff_ratio, tolerance, diff_ratio, tolerance,
'Expect rpc [%s] to return [%s] at %.2f ratio: seen=%d want=%d total=%d diff_ratio=%.4f > %.2f' (f'Expect rpc [{rpc}] to return '
% (rpc, expected_result.status_code, expected_result.ratio, f'[{expected_result.status_code}] at '
seen, want, total, diff_ratio, tolerance)) f'{expected_result.ratio:.2f} ratio: '
f'seen={seen} want={want} total={total} '
f'diff_ratio={diff_ratio:.4f} > {tolerance:.2f}'))

@ -13,7 +13,7 @@
# limitations under the License. # limitations under the License.
import logging import logging
import time import time
from typing import List, Optional from typing import List
from absl import flags from absl import flags
from absl.testing import absltest from absl.testing import absltest
@ -22,9 +22,7 @@ from google.protobuf import json_format
from framework import xds_k8s_testcase from framework import xds_k8s_testcase
from framework import xds_url_map_testcase from framework import xds_url_map_testcase
from framework.helpers import skips from framework.helpers import skips
from framework.infrastructure import k8s
from framework.rpc import grpc_channelz from framework.rpc import grpc_channelz
from framework.test_app import server_app
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
flags.adopt_module_key_flags(xds_k8s_testcase) flags.adopt_module_key_flags(xds_k8s_testcase)
@ -50,7 +48,7 @@ class AffinityTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
return config.version_ge('v1.40.x') return config.version_ge('v1.40.x')
return False return False
def test_affinity(self) -> None: def test_affinity(self) -> None: # pylint: disable=too-many-statements
with self.subTest('00_create_health_check'): with self.subTest('00_create_health_check'):
self.td.create_health_check() self.td.create_health_check()
@ -68,20 +66,21 @@ class AffinityTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
with self.subTest('04_create_forwarding_rule'): with self.subTest('04_create_forwarding_rule'):
self.td.create_forwarding_rule(self.server_xds_port) self.td.create_forwarding_rule(self.server_xds_port)
test_servers: List[_XdsTestServer]
with self.subTest('05_start_test_servers'): with self.subTest('05_start_test_servers'):
self.test_servers: List[_XdsTestServer] = self.startTestServers( test_servers = self.startTestServers(replica_count=_REPLICA_COUNT)
replica_count=_REPLICA_COUNT)
with self.subTest('06_add_server_backends_to_backend_services'): with self.subTest('06_add_server_backends_to_backend_services'):
self.setupServerBackends() self.setupServerBackends()
test_client: _XdsTestClient
with self.subTest('07_start_test_client'): with self.subTest('07_start_test_client'):
self.test_client: _XdsTestClient = self.startTestClient( test_client = self.startTestClient(test_servers[0],
self.test_servers[0],
rpc='EmptyCall', rpc='EmptyCall',
metadata='EmptyCall:%s:123' % _TEST_AFFINITY_METADATA_KEY) metadata='EmptyCall:%s:123' %
_TEST_AFFINITY_METADATA_KEY)
# Validate the number of received endpoints and affinity configs. # Validate the number of received endpoints and affinity configs.
config = self.test_client.csds.fetch_client_status( config = test_client.csds.fetch_client_status(
log_level=logging.INFO) log_level=logging.INFO)
self.assertIsNotNone(config) self.assertIsNotNone(config)
json_config = json_format.MessageToDict(config) json_config = json_format.MessageToDict(config)
@ -95,34 +94,34 @@ class AffinityTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
self.assertEqual(parsed.cds[0]['lbPolicy'], 'RING_HASH') self.assertEqual(parsed.cds[0]['lbPolicy'], 'RING_HASH')
with self.subTest('08_test_client_xds_config_exists'): with self.subTest('08_test_client_xds_config_exists'):
self.assertXdsConfigExists(self.test_client) self.assertXdsConfigExists(test_client)
with self.subTest('09_test_server_received_rpcs_from_test_client'): with self.subTest('09_test_server_received_rpcs_from_test_client'):
self.assertSuccessfulRpcs(self.test_client) self.assertSuccessfulRpcs(test_client)
with self.subTest('10_first_100_affinity_rpcs_pick_same_backend'): with self.subTest('10_first_100_affinity_rpcs_pick_same_backend'):
rpc_stats = self.getClientRpcStats(self.test_client, _RPC_COUNT) rpc_stats = self.getClientRpcStats(test_client, _RPC_COUNT)
json_lb_stats = json_format.MessageToDict(rpc_stats) json_lb_stats = json_format.MessageToDict(rpc_stats)
rpc_distribution = xds_url_map_testcase.RpcDistributionStats( rpc_distribution = xds_url_map_testcase.RpcDistributionStats(
json_lb_stats) json_lb_stats)
self.assertEqual(1, rpc_distribution.num_peers) self.assertEqual(1, rpc_distribution.num_peers)
self.assertLen( self.assertLen(
self.test_client.find_subchannels_with_state( test_client.find_subchannels_with_state(
_ChannelzChannelState.READY), _ChannelzChannelState.READY),
1, 1,
) )
self.assertLen( self.assertLen(
self.test_client.find_subchannels_with_state( test_client.find_subchannels_with_state(
_ChannelzChannelState.IDLE), _ChannelzChannelState.IDLE),
2, 2,
) )
# Remember the backend inuse, and turn it down later. # Remember the backend inuse, and turn it down later.
self.first_backend_inuse = list( first_backend_inuse = list(
rpc_distribution.raw['rpcsByPeer'].keys())[0] rpc_distribution.raw['rpcsByPeer'].keys())[0]
with self.subTest('11_turn_down_server_in_use'): with self.subTest('11_turn_down_server_in_use'):
for s in self.test_servers: for s in test_servers:
if s.pod_name == self.first_backend_inuse: if s.pod_name == first_backend_inuse:
logging.info('setting backend %s to NOT_SERVING', logging.info('setting backend %s to NOT_SERVING',
s.pod_name) s.pod_name)
s.set_not_serving() s.set_not_serving()
@ -132,7 +131,7 @@ class AffinityTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
parsed = None parsed = None
try: try:
while time.time() < deadline: while time.time() < deadline:
config = self.test_client.csds.fetch_client_status( config = test_client.csds.fetch_client_status(
log_level=logging.INFO) log_level=logging.INFO)
self.assertIsNotNone(config) self.assertIsNotNone(config)
json_config = json_format.MessageToDict(config) json_config = json_format.MessageToDict(config)
@ -150,14 +149,14 @@ class AffinityTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
logging.info('Client received CSDS response: %s', parsed) logging.info('Client received CSDS response: %s', parsed)
with self.subTest('12_next_100_affinity_rpcs_pick_different_backend'): with self.subTest('12_next_100_affinity_rpcs_pick_different_backend'):
rpc_stats = self.getClientRpcStats(self.test_client, _RPC_COUNT) rpc_stats = self.getClientRpcStats(test_client, _RPC_COUNT)
json_lb_stats = json_format.MessageToDict(rpc_stats) json_lb_stats = json_format.MessageToDict(rpc_stats)
rpc_distribution = xds_url_map_testcase.RpcDistributionStats( rpc_distribution = xds_url_map_testcase.RpcDistributionStats(
json_lb_stats) json_lb_stats)
self.assertEqual(1, rpc_distribution.num_peers) self.assertEqual(1, rpc_distribution.num_peers)
new_backend_inuse = list( new_backend_inuse = list(
rpc_distribution.raw['rpcsByPeer'].keys())[0] rpc_distribution.raw['rpcsByPeer'].keys())[0]
self.assertNotEqual(new_backend_inuse, self.first_backend_inuse) self.assertNotEqual(new_backend_inuse, first_backend_inuse)
if __name__ == '__main__': if __name__ == '__main__':

@ -49,14 +49,16 @@ class ApiListenerTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
with self.subTest('04_create_default_forwarding_rule'): with self.subTest('04_create_default_forwarding_rule'):
self.td.create_forwarding_rule(self.server_xds_port) self.td.create_forwarding_rule(self.server_xds_port)
test_server: _XdsTestServer
with self.subTest('05_start_test_server'): with self.subTest('05_start_test_server'):
test_server: _XdsTestServer = self.startTestServers()[0] test_server = self.startTestServers()[0]
with self.subTest('06_add_server_backends_to_backend_services'): with self.subTest('06_add_server_backends_to_backend_services'):
self.setupServerBackends() self.setupServerBackends()
test_client: _XdsTestClient
with self.subTest('07_start_test_client'): with self.subTest('07_start_test_client'):
test_client: _XdsTestClient = self.startTestClient(test_server) test_client = self.startTestClient(test_server)
with self.subTest('08_test_client_xds_config_exists'): with self.subTest('08_test_client_xds_config_exists'):
self.assertXdsConfigExists(test_client) self.assertXdsConfigExists(test_client)
@ -91,9 +93,9 @@ class ApiListenerTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
dumped_config = _DumpedXdsConfig( dumped_config = _DumpedXdsConfig(
json_format.MessageToDict(raw_config)) json_format.MessageToDict(raw_config))
previous_route_config_version = dumped_config.rds_version previous_route_config_version = dumped_config.rds_version
logger.info( logger.info(('received client config from CSDS with two url maps, '
'received client config from CSDS with two url maps, dump config: %s, rds version: %s', 'dump config: %s, rds version: %s'), dumped_config,
dumped_config, previous_route_config_version) previous_route_config_version)
with self.subTest('14_delete_one_url_map_target_proxy_forwarding_rule'): with self.subTest('14_delete_one_url_map_target_proxy_forwarding_rule'):
self.td.delete_forwarding_rule() self.td.delete_forwarding_rule()

@ -41,16 +41,17 @@ class AppNetTest(xds_k8s_testcase.AppNetXdsKubernetesTestCase):
self.td.create_grpc_route(self.server_xds_host, self.td.create_grpc_route(self.server_xds_host,
self.server_xds_port) self.server_xds_port)
test_server: _XdsTestServer
with self.subTest('4_start_test_server'): with self.subTest('4_start_test_server'):
test_server: _XdsTestServer = self.startTestServers( test_server = self.startTestServers(replica_count=1)[0]
replica_count=1)[0]
with self.subTest('5_setup_server_backends'): with self.subTest('5_setup_server_backends'):
self.setupServerBackends() self.setupServerBackends()
test_client: _XdsTestClient
with self.subTest('6_start_test_client'): with self.subTest('6_start_test_client'):
test_client: _XdsTestClient = self.startTestClient( test_client = self.startTestClient(test_server,
test_server, config_mesh=self.td.mesh.name) config_mesh=self.td.mesh.name)
with self.subTest('7_assert_xds_config_exists'): with self.subTest('7_assert_xds_config_exists'):
self.assertXdsConfigExists(test_client) self.assertXdsConfigExists(test_client)

@ -44,14 +44,16 @@ class BaselineTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
with self.subTest('4_create_forwarding_rule'): with self.subTest('4_create_forwarding_rule'):
self.td.create_forwarding_rule(self.server_xds_port) self.td.create_forwarding_rule(self.server_xds_port)
test_servers: _XdsTestServer
with self.subTest('5_start_test_server'): with self.subTest('5_start_test_server'):
test_servers: _XdsTestServer = self.startTestServers() test_servers = self.startTestServers()
with self.subTest('6_add_server_backends_to_backend_service'): with self.subTest('6_add_server_backends_to_backend_service'):
self.setupServerBackends() self.setupServerBackends()
test_client: _XdsTestClient
with self.subTest('7_start_test_client'): with self.subTest('7_start_test_client'):
test_client: _XdsTestClient = self.startTestClient(test_servers[0]) test_client = self.startTestClient(test_servers[0])
with self.subTest('8_test_client_xds_config_exists'): with self.subTest('8_test_client_xds_config_exists'):
self.assertXdsConfigExists(test_client) self.assertXdsConfigExists(test_client)

@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import logging import logging
from typing import List, Optional from typing import List
from absl import flags from absl import flags
from absl.testing import absltest from absl.testing import absltest
@ -70,11 +70,11 @@ class ChangeBackendServiceTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
with self.subTest('04_create_forwarding_rule'): with self.subTest('04_create_forwarding_rule'):
self.td.create_forwarding_rule(self.server_xds_port) self.td.create_forwarding_rule(self.server_xds_port)
default_test_servers: List[_XdsTestServer]
same_zone_test_servers: List[_XdsTestServer]
with self.subTest('05_start_test_servers'): with self.subTest('05_start_test_servers'):
self.default_test_servers: List[ default_test_servers = self.startTestServers()
_XdsTestServer] = self.startTestServers() same_zone_test_servers = self.startTestServers(
self.same_zone_test_servers: List[
_XdsTestServer] = self.startTestServers(
server_runner=self.alternate_server_runner) server_runner=self.alternate_server_runner)
with self.subTest('06_add_server_backends_to_backend_services'): with self.subTest('06_add_server_backends_to_backend_services'):
@ -85,21 +85,21 @@ class ChangeBackendServiceTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
self.td.alternative_backend_service_add_neg_backends( self.td.alternative_backend_service_add_neg_backends(
neg_name_alt, neg_zones_alt) neg_name_alt, neg_zones_alt)
test_client: _XdsTestClient
with self.subTest('07_start_test_client'): with self.subTest('07_start_test_client'):
self.test_client: _XdsTestClient = self.startTestClient( test_client = self.startTestClient(default_test_servers[0])
self.default_test_servers[0])
with self.subTest('08_test_client_xds_config_exists'): with self.subTest('08_test_client_xds_config_exists'):
self.assertXdsConfigExists(self.test_client) self.assertXdsConfigExists(test_client)
with self.subTest('09_test_server_received_rpcs_from_test_client'): with self.subTest('09_test_server_received_rpcs_from_test_client'):
self.assertSuccessfulRpcs(self.test_client) self.assertSuccessfulRpcs(test_client)
with self.subTest('10_change_backend_service'): with self.subTest('10_change_backend_service'):
self.td.patch_url_map(self.server_xds_host, self.server_xds_port, self.td.patch_url_map(self.server_xds_host, self.server_xds_port,
self.td.alternative_backend_service) self.td.alternative_backend_service)
self.assertRpcsEventuallyGoToGivenServers( self.assertRpcsEventuallyGoToGivenServers(test_client,
self.test_client, self.same_zone_test_servers) same_zone_test_servers)
if __name__ == '__main__': if __name__ == '__main__':

@ -70,13 +70,13 @@ class FailoverTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
with self.subTest('04_create_forwarding_rule'): with self.subTest('04_create_forwarding_rule'):
self.td.create_forwarding_rule(self.server_xds_port) self.td.create_forwarding_rule(self.server_xds_port)
default_test_servers: List[_XdsTestServer]
alternate_test_servers: List[_XdsTestServer]
with self.subTest('05_start_test_servers'): with self.subTest('05_start_test_servers'):
self.default_test_servers: List[ default_test_servers = self.startTestServers(
_XdsTestServer] = self.startTestServers(
replica_count=self.REPLICA_COUNT) replica_count=self.REPLICA_COUNT)
self.alternate_test_servers: List[ alternate_test_servers = self.startTestServers(
_XdsTestServer] = self.startTestServers(
server_runner=self.secondary_server_runner) server_runner=self.secondary_server_runner)
with self.subTest('06_add_server_backends_to_backend_services'): with self.subTest('06_add_server_backends_to_backend_services'):
@ -86,41 +86,40 @@ class FailoverTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
server_runner=self.secondary_server_runner, server_runner=self.secondary_server_runner,
max_rate_per_endpoint=self.MAX_RATE_PER_ENDPOINT) max_rate_per_endpoint=self.MAX_RATE_PER_ENDPOINT)
test_client: _XdsTestClient
with self.subTest('07_start_test_client'): with self.subTest('07_start_test_client'):
self.test_client: _XdsTestClient = self.startTestClient( test_client = self.startTestClient(default_test_servers[0])
self.default_test_servers[0])
with self.subTest('08_test_client_xds_config_exists'): with self.subTest('08_test_client_xds_config_exists'):
self.assertXdsConfigExists(self.test_client) self.assertXdsConfigExists(test_client)
with self.subTest('09_primary_locality_receives_requests'): with self.subTest('09_primary_locality_receives_requests'):
self.assertRpcsEventuallyGoToGivenServers(self.test_client, self.assertRpcsEventuallyGoToGivenServers(test_client,
self.default_test_servers) default_test_servers)
with self.subTest( with self.subTest(
'10_secondary_locality_receives_no_requests_on_partial_primary_failure' '10_secondary_locality_receives_no_requests_on_partial_primary_failure'
): ):
self.default_test_servers[0].set_not_serving() default_test_servers[0].set_not_serving()
self.assertRpcsEventuallyGoToGivenServers( self.assertRpcsEventuallyGoToGivenServers(test_client,
self.test_client, self.default_test_servers[1:]) default_test_servers[1:])
with self.subTest('11_gentle_failover'): with self.subTest('11_gentle_failover'):
self.default_test_servers[1].set_not_serving() default_test_servers[1].set_not_serving()
self.assertRpcsEventuallyGoToGivenServers( self.assertRpcsEventuallyGoToGivenServers(
self.test_client, test_client, default_test_servers[2:] + alternate_test_servers)
self.default_test_servers[2:] + self.alternate_test_servers)
with self.subTest( with self.subTest(
'12_secondary_locality_receives_requests_on_primary_failure'): '12_secondary_locality_receives_requests_on_primary_failure'):
self.default_test_servers[2].set_not_serving() default_test_servers[2].set_not_serving()
self.assertRpcsEventuallyGoToGivenServers( self.assertRpcsEventuallyGoToGivenServers(test_client,
self.test_client, self.alternate_test_servers) alternate_test_servers)
with self.subTest('13_traffic_resumes_to_healthy_backends'): with self.subTest('13_traffic_resumes_to_healthy_backends'):
for i in range(self.REPLICA_COUNT): for i in range(self.REPLICA_COUNT):
self.default_test_servers[i].set_serving() default_test_servers[i].set_serving()
self.assertRpcsEventuallyGoToGivenServers(self.test_client, self.assertRpcsEventuallyGoToGivenServers(test_client,
self.default_test_servers) default_test_servers)
if __name__ == '__main__': if __name__ == '__main__':

@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import logging import logging
from typing import List, Optional from typing import List
from absl import flags from absl import flags
from absl.testing import absltest from absl.testing import absltest
@ -68,35 +68,34 @@ class RemoveNegTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
with self.subTest('04_create_forwarding_rule'): with self.subTest('04_create_forwarding_rule'):
self.td.create_forwarding_rule(self.server_xds_port) self.td.create_forwarding_rule(self.server_xds_port)
default_test_servers: List[_XdsTestServer]
same_zone_test_servers: List[_XdsTestServer]
with self.subTest('05_start_test_servers'): with self.subTest('05_start_test_servers'):
self.default_test_servers: List[ default_test_servers = self.startTestServers()
_XdsTestServer] = self.startTestServers() same_zone_test_servers = self.startTestServers(
self.same_zone_test_servers: List[
_XdsTestServer] = self.startTestServers(
server_runner=self.alternate_server_runner) server_runner=self.alternate_server_runner)
with self.subTest('06_add_server_backends_to_backend_services'): with self.subTest('06_add_server_backends_to_backend_services'):
self.setupServerBackends() self.setupServerBackends()
self.setupServerBackends(server_runner=self.alternate_server_runner) self.setupServerBackends(server_runner=self.alternate_server_runner)
test_client: _XdsTestClient
with self.subTest('07_start_test_client'): with self.subTest('07_start_test_client'):
self.test_client: _XdsTestClient = self.startTestClient( test_client = self.startTestClient(default_test_servers[0])
self.default_test_servers[0])
with self.subTest('08_test_client_xds_config_exists'): with self.subTest('08_test_client_xds_config_exists'):
self.assertXdsConfigExists(self.test_client) self.assertXdsConfigExists(test_client)
with self.subTest('09_test_server_received_rpcs_from_test_client'): with self.subTest('09_test_server_received_rpcs_from_test_client'):
self.assertSuccessfulRpcs(self.test_client) self.assertSuccessfulRpcs(test_client)
with self.subTest('10_remove_neg'): with self.subTest('10_remove_neg'):
self.assertRpcsEventuallyGoToGivenServers( self.assertRpcsEventuallyGoToGivenServers(
self.test_client, test_client, default_test_servers + same_zone_test_servers)
self.default_test_servers + self.same_zone_test_servers)
self.removeServerBackends( self.removeServerBackends(
server_runner=self.alternate_server_runner) server_runner=self.alternate_server_runner)
self.assertRpcsEventuallyGoToGivenServers(self.test_client, self.assertRpcsEventuallyGoToGivenServers(test_client,
self.default_test_servers) default_test_servers)
if __name__ == '__main__': if __name__ == '__main__':

@ -12,14 +12,12 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import logging import logging
from typing import List, Optional from typing import List
from absl import flags from absl import flags
from absl.testing import absltest from absl.testing import absltest
from framework import xds_k8s_testcase from framework import xds_k8s_testcase
from framework.infrastructure import k8s
from framework.test_app import server_app
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
flags.adopt_module_key_flags(xds_k8s_testcase) flags.adopt_module_key_flags(xds_k8s_testcase)
@ -49,33 +47,33 @@ class RoundRobinTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
with self.subTest('04_create_forwarding_rule'): with self.subTest('04_create_forwarding_rule'):
self.td.create_forwarding_rule(self.server_xds_port) self.td.create_forwarding_rule(self.server_xds_port)
test_servers: List[_XdsTestServer]
with self.subTest('05_start_test_servers'): with self.subTest('05_start_test_servers'):
self.test_servers: List[_XdsTestServer] = self.startTestServers( test_servers = self.startTestServers(replica_count=REPLICA_COUNT)
replica_count=REPLICA_COUNT)
with self.subTest('06_add_server_backends_to_backend_services'): with self.subTest('06_add_server_backends_to_backend_services'):
self.setupServerBackends() self.setupServerBackends()
test_client: _XdsTestClient
with self.subTest('07_start_test_client'): with self.subTest('07_start_test_client'):
self.test_client: _XdsTestClient = self.startTestClient( test_client = self.startTestClient(test_servers[0])
self.test_servers[0])
with self.subTest('08_test_client_xds_config_exists'): with self.subTest('08_test_client_xds_config_exists'):
self.assertXdsConfigExists(self.test_client) self.assertXdsConfigExists(test_client)
with self.subTest('09_test_server_received_rpcs_from_test_client'): with self.subTest('09_test_server_received_rpcs_from_test_client'):
self.assertSuccessfulRpcs(self.test_client) self.assertSuccessfulRpcs(test_client)
with self.subTest('10_round_robin'): with self.subTest('10_round_robin'):
num_rpcs = 100 num_rpcs = 100
expected_rpcs_per_replica = num_rpcs / REPLICA_COUNT expected_rpcs_per_replica = num_rpcs / REPLICA_COUNT
rpcs_by_peer = self.getClientRpcStats(self.test_client, rpcs_by_peer = self.getClientRpcStats(test_client,
num_rpcs).rpcs_by_peer num_rpcs).rpcs_by_peer
total_requests_received = sum(rpcs_by_peer[x] for x in rpcs_by_peer) total_requests_received = sum(rpcs_by_peer[x] for x in rpcs_by_peer)
self.assertEqual(total_requests_received, num_rpcs, self.assertEqual(total_requests_received, num_rpcs,
'Wrong number of RPCS') 'Wrong number of RPCS')
for server in self.test_servers: for server in test_servers:
pod_name = server.pod_name pod_name = server.pod_name
self.assertIn(pod_name, rpcs_by_peer, self.assertIn(pod_name, rpcs_by_peer,
f'pod {pod_name} did not receive RPCs') f'pod {pod_name} did not receive RPCs')

@ -13,7 +13,7 @@
# limitations under the License. # limitations under the License.
import collections import collections
from typing import List, Optional from typing import List
from absl import flags from absl import flags
from absl import logging from absl import logging
@ -22,8 +22,6 @@ from google.protobuf import json_format
from framework import xds_k8s_testcase from framework import xds_k8s_testcase
from framework import xds_url_map_testcase from framework import xds_url_map_testcase
from framework.infrastructure import k8s
from framework.test_app import server_app
flags.adopt_module_key_flags(xds_k8s_testcase) flags.adopt_module_key_flags(xds_k8s_testcase)
@ -54,9 +52,9 @@ class SubsettingTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
with self.subTest('04_create_forwarding_rule'): with self.subTest('04_create_forwarding_rule'):
self.td.create_forwarding_rule(self.server_xds_port) self.td.create_forwarding_rule(self.server_xds_port)
test_servers: List[_XdsTestServer]
with self.subTest('05_start_test_servers'): with self.subTest('05_start_test_servers'):
self.test_servers: List[_XdsTestServer] = self.startTestServers( test_servers = self.startTestServers(replica_count=_NUM_BACKENDS)
replica_count=_NUM_BACKENDS)
with self.subTest('06_add_server_backends_to_backend_services'): with self.subTest('06_add_server_backends_to_backend_services'):
self.setupServerBackends() self.setupServerBackends()
@ -68,7 +66,7 @@ class SubsettingTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
self.client_runner.cleanup(force=True) self.client_runner.cleanup(force=True)
# Create a test client # Create a test client
test_client: _XdsTestClient = self.startTestClient( test_client: _XdsTestClient = self.startTestClient(
self.test_servers[0]) test_servers[0])
# Validate the number of received endpoints # Validate the number of received endpoints
config = test_client.csds.fetch_client_status( config = test_client.csds.fetch_client_status(
log_level=logging.INFO) log_level=logging.INFO)

@ -208,8 +208,8 @@ class TestHeaderBasedAffinityMultipleHeaders(
break break
self.assertTrue( self.assertTrue(
different_peer_picked, different_peer_picked,
"the same endpoint was picked for all the headers, expect a different endpoint to be picked" ("the same endpoint was picked for all the headers, expect a "
) "different endpoint to be picked"))
self.assertLen( self.assertLen(
test_client.find_subchannels_with_state( test_client.find_subchannels_with_state(
_ChannelzChannelState.READY), _ChannelzChannelState.READY),

@ -20,7 +20,6 @@ from absl.testing import absltest
import grpc import grpc
from framework import xds_url_map_testcase from framework import xds_url_map_testcase
from framework.rpc import grpc_testing
from framework.test_app import client_app from framework.test_app import client_app
# Type aliases # Type aliases
@ -114,7 +113,7 @@ def _wait_until_backlog_cleared(test_client: XdsTestClient,
# Both backlog of both types of RPCs is clear, success, return. # Both backlog of both types of RPCs is clear, success, return.
return return
raise RuntimeError('failed to clear RPC backlog in %s seconds', timeout) raise RuntimeError('failed to clear RPC backlog in %s seconds' % timeout)
class TestZeroPercentFaultInjection(xds_url_map_testcase.XdsUrlMapTestCase): class TestZeroPercentFaultInjection(xds_url_map_testcase.XdsUrlMapTestCase):
@ -145,8 +144,8 @@ class TestZeroPercentFaultInjection(xds_url_map_testcase.XdsUrlMapTestCase):
filter_config['abort']['percentage']['denominator']) filter_config['abort']['percentage']['denominator'])
def rpc_distribution_validate(self, test_client: XdsTestClient): def rpc_distribution_validate(self, test_client: XdsTestClient):
rpc_distribution = self.configure_and_send(test_client, self.configure_and_send(test_client,
rpc_types=[RpcTypeUnaryCall], rpc_types=(RpcTypeUnaryCall,),
num_rpcs=_NUM_RPCS) num_rpcs=_NUM_RPCS)
self.assertRpcStatusCode(test_client, self.assertRpcStatusCode(test_client,
expected=(ExpectedResult( expected=(ExpectedResult(
@ -236,9 +235,8 @@ class TestAlwaysDelay(xds_url_map_testcase.XdsUrlMapTestCase):
filter_config['delay']['percentage']['denominator']) filter_config['delay']['percentage']['denominator'])
def rpc_distribution_validate(self, test_client: XdsTestClient): def rpc_distribution_validate(self, test_client: XdsTestClient):
rpc_distribution = self.configure_and_send( self.configure_and_send(test_client,
test_client, rpc_types=(RpcTypeUnaryCall,),
rpc_types=[RpcTypeUnaryCall],
num_rpcs=_NUM_RPCS, num_rpcs=_NUM_RPCS,
app_timeout=_DELAY_CASE_APPLICATION_TIMEOUT_SEC) app_timeout=_DELAY_CASE_APPLICATION_TIMEOUT_SEC)
_wait_until_backlog_cleared(test_client) _wait_until_backlog_cleared(test_client)
@ -275,8 +273,8 @@ class TestAlwaysAbort(xds_url_map_testcase.XdsUrlMapTestCase):
filter_config['abort']['percentage']['denominator']) filter_config['abort']['percentage']['denominator'])
def rpc_distribution_validate(self, test_client: XdsTestClient): def rpc_distribution_validate(self, test_client: XdsTestClient):
rpc_distribution = self.configure_and_send(test_client, self.configure_and_send(test_client,
rpc_types=[RpcTypeUnaryCall], rpc_types=(RpcTypeUnaryCall,),
num_rpcs=_NUM_RPCS) num_rpcs=_NUM_RPCS)
self.assertRpcStatusCode( self.assertRpcStatusCode(
test_client, test_client,
@ -311,9 +309,8 @@ class TestDelayHalf(xds_url_map_testcase.XdsUrlMapTestCase):
filter_config['delay']['percentage']['denominator']) filter_config['delay']['percentage']['denominator'])
def rpc_distribution_validate(self, test_client: XdsTestClient): def rpc_distribution_validate(self, test_client: XdsTestClient):
rpc_distribution = self.configure_and_send( self.configure_and_send(test_client,
test_client, rpc_types=(RpcTypeUnaryCall,),
rpc_types=[RpcTypeUnaryCall],
num_rpcs=_NUM_RPCS, num_rpcs=_NUM_RPCS,
app_timeout=_DELAY_CASE_APPLICATION_TIMEOUT_SEC) app_timeout=_DELAY_CASE_APPLICATION_TIMEOUT_SEC)
_wait_until_backlog_cleared(test_client) _wait_until_backlog_cleared(test_client)
@ -350,8 +347,8 @@ class TestAbortHalf(xds_url_map_testcase.XdsUrlMapTestCase):
filter_config['abort']['percentage']['denominator']) filter_config['abort']['percentage']['denominator'])
def rpc_distribution_validate(self, test_client: XdsTestClient): def rpc_distribution_validate(self, test_client: XdsTestClient):
rpc_distribution = self.configure_and_send(test_client, self.configure_and_send(test_client,
rpc_types=[RpcTypeUnaryCall], rpc_types=(RpcTypeUnaryCall,),
num_rpcs=_NUM_RPCS) num_rpcs=_NUM_RPCS)
self.assertRpcStatusCode( self.assertRpcStatusCode(
test_client, test_client,

@ -121,8 +121,9 @@ class TestPrefixMatch(xds_url_map_testcase.XdsUrlMapTestCase):
[0]['prefixMatch'], _TEST_METADATA_VALUE_UNARY[:2]) [0]['prefixMatch'], _TEST_METADATA_VALUE_UNARY[:2])
def rpc_distribution_validate(self, test_client: XdsTestClient): def rpc_distribution_validate(self, test_client: XdsTestClient):
rpc_distribution = self.configure_and_send(test_client, rpc_distribution = self.configure_and_send(
rpc_types=[RpcTypeUnaryCall], test_client,
rpc_types=(RpcTypeUnaryCall,),
metadata=_TEST_METADATA, metadata=_TEST_METADATA,
num_rpcs=_NUM_RPCS) num_rpcs=_NUM_RPCS)
self.assertEqual( self.assertEqual(
@ -203,8 +204,9 @@ class TestPresentMatch(xds_url_map_testcase.XdsUrlMapTestCase):
[0]['presentMatch'], True) [0]['presentMatch'], True)
def rpc_distribution_validate(self, test_client: XdsTestClient): def rpc_distribution_validate(self, test_client: XdsTestClient):
rpc_distribution = self.configure_and_send(test_client, rpc_distribution = self.configure_and_send(
rpc_types=[RpcTypeUnaryCall], test_client,
rpc_types=(RpcTypeUnaryCall,),
metadata=_TEST_METADATA, metadata=_TEST_METADATA,
num_rpcs=_NUM_RPCS) num_rpcs=_NUM_RPCS)
self.assertEqual( self.assertEqual(

@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import json
import logging import logging
from typing import Tuple from typing import Tuple
@ -153,9 +152,8 @@ class TestMetadataFilterMatchAny(xds_url_map_testcase.XdsUrlMapTestCase):
"") "")
def rpc_distribution_validate(self, test_client: XdsTestClient): def rpc_distribution_validate(self, test_client: XdsTestClient):
rpc_distribution = self.configure_and_send(test_client, rpc_distribution = self.configure_and_send(
rpc_types=[RpcTypeUnaryCall], test_client, rpc_types=(RpcTypeUnaryCall,), num_rpcs=_NUM_RPCS)
num_rpcs=_NUM_RPCS)
self.assertEqual( self.assertEqual(
_NUM_RPCS, _NUM_RPCS,
rpc_distribution.unary_call_alternative_service_rpc_count) rpc_distribution.unary_call_alternative_service_rpc_count)
@ -203,9 +201,8 @@ class TestMetadataFilterMatchAnyAndAll(xds_url_map_testcase.XdsUrlMapTestCase):
"") "")
def rpc_distribution_validate(self, test_client: XdsTestClient): def rpc_distribution_validate(self, test_client: XdsTestClient):
rpc_distribution = self.configure_and_send(test_client, rpc_distribution = self.configure_and_send(
rpc_types=[RpcTypeUnaryCall], test_client, rpc_types=(RpcTypeUnaryCall,), num_rpcs=_NUM_RPCS)
num_rpcs=_NUM_RPCS)
self.assertEqual( self.assertEqual(
_NUM_RPCS, _NUM_RPCS,
rpc_distribution.unary_call_alternative_service_rpc_count) rpc_distribution.unary_call_alternative_service_rpc_count)

@ -89,9 +89,8 @@ class TestFullPathMatchUnaryCall(xds_url_map_testcase.XdsUrlMapTestCase):
"/grpc.testing.TestService/UnaryCall") "/grpc.testing.TestService/UnaryCall")
def rpc_distribution_validate(self, test_client: XdsTestClient): def rpc_distribution_validate(self, test_client: XdsTestClient):
rpc_distribution = self.configure_and_send(test_client, rpc_distribution = self.configure_and_send(
rpc_types=[RpcTypeUnaryCall], test_client, rpc_types=(RpcTypeUnaryCall,), num_rpcs=_NUM_RPCS)
num_rpcs=_NUM_RPCS)
self.assertEqual( self.assertEqual(
_NUM_RPCS, _NUM_RPCS,
rpc_distribution.unary_call_alternative_service_rpc_count) rpc_distribution.unary_call_alternative_service_rpc_count)
@ -159,7 +158,7 @@ class TestRegexMatch(xds_url_map_testcase.XdsUrlMapTestCase):
# Regex UnaryCall -> alternate_backend_service. # Regex UnaryCall -> alternate_backend_service.
'matchRules': [{ 'matchRules': [{
'regexMatch': 'regexMatch':
'^\/.*\/UnaryCall$' # Unary methods with any services. r'^\/.*\/UnaryCall$' # Unary methods with any services.
}], }],
'service': GcpResourceManager().alternative_backend_service() 'service': GcpResourceManager().alternative_backend_service()
}] }]
@ -169,12 +168,11 @@ class TestRegexMatch(xds_url_map_testcase.XdsUrlMapTestCase):
self.assertNumEndpoints(xds_config, 2) self.assertNumEndpoints(xds_config, 2)
self.assertEqual( self.assertEqual(
xds_config.rds['virtualHosts'][0]['routes'][0]['match']['safeRegex'] xds_config.rds['virtualHosts'][0]['routes'][0]['match']['safeRegex']
['regex'], '^\/.*\/UnaryCall$') ['regex'], r'^\/.*\/UnaryCall$')
def rpc_distribution_validate(self, test_client: XdsTestClient): def rpc_distribution_validate(self, test_client: XdsTestClient):
rpc_distribution = self.configure_and_send(test_client, rpc_distribution = self.configure_and_send(
rpc_types=[RpcTypeUnaryCall], test_client, rpc_types=(RpcTypeUnaryCall,), num_rpcs=_NUM_RPCS)
num_rpcs=_NUM_RPCS)
self.assertEqual( self.assertEqual(
_NUM_RPCS, _NUM_RPCS,
rpc_distribution.unary_call_alternative_service_rpc_count) rpc_distribution.unary_call_alternative_service_rpc_count)

@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import logging import logging
import time
from typing import Tuple from typing import Tuple
from absl import flags from absl import flags
@ -91,11 +90,13 @@ class TestRetryUpTo3AttemptsAndFail(xds_url_map_testcase.XdsUrlMapTestCase):
self.assertEqual('unavailable', retry_config['retryOn']) self.assertEqual('unavailable', retry_config['retryOn'])
def rpc_distribution_validate(self, test_client: XdsTestClient): def rpc_distribution_validate(self, test_client: XdsTestClient):
rpc_distribution = self.configure_and_send( self.configure_and_send(test_client,
test_client, rpc_types=(RpcTypeUnaryCall,),
rpc_types=[RpcTypeUnaryCall], metadata=[
metadata=[(RpcTypeUnaryCall, _RPC_BEHAVIOR_HEADER_NAME, (RpcTypeUnaryCall,
'error-code-14,succeed-on-retry-attempt-4')], _RPC_BEHAVIOR_HEADER_NAME,
'error-code-14,succeed-on-retry-attempt-4')
],
num_rpcs=_NUM_RPCS) num_rpcs=_NUM_RPCS)
self.assertRpcStatusCode(test_client, self.assertRpcStatusCode(test_client,
expected=(ExpectedResult( expected=(ExpectedResult(
@ -134,11 +135,13 @@ class TestRetryUpTo4AttemptsAndSucceed(xds_url_map_testcase.XdsUrlMapTestCase):
self.assertEqual('unavailable', retry_config['retryOn']) self.assertEqual('unavailable', retry_config['retryOn'])
def rpc_distribution_validate(self, test_client: XdsTestClient): def rpc_distribution_validate(self, test_client: XdsTestClient):
rpc_distribution = self.configure_and_send( self.configure_and_send(test_client,
test_client, rpc_types=(RpcTypeUnaryCall,),
rpc_types=[RpcTypeUnaryCall], metadata=[
metadata=[(RpcTypeUnaryCall, _RPC_BEHAVIOR_HEADER_NAME, (RpcTypeUnaryCall,
'error-code-14,succeed-on-retry-attempt-4')], _RPC_BEHAVIOR_HEADER_NAME,
'error-code-14,succeed-on-retry-attempt-4')
],
num_rpcs=_NUM_RPCS) num_rpcs=_NUM_RPCS)
self.assertRpcStatusCode(test_client, self.assertRpcStatusCode(test_client,
expected=(ExpectedResult( expected=(ExpectedResult(

@ -86,7 +86,7 @@ class TestTimeoutInRouteRule(_BaseXdsTimeOutTestCase):
return config.server_lang == 'java' return config.server_lang == 'java'
def rpc_distribution_validate(self, test_client: XdsTestClient): def rpc_distribution_validate(self, test_client: XdsTestClient):
rpc_distribution = self.configure_and_send( self.configure_and_send(
test_client, test_client,
rpc_types=[RpcTypeUnaryCall, RpcTypeEmptyCall], rpc_types=[RpcTypeUnaryCall, RpcTypeEmptyCall],
# UnaryCall and EmptyCall both sleep-4. # UnaryCall and EmptyCall both sleep-4.
@ -115,9 +115,9 @@ class TestTimeoutInApplication(_BaseXdsTimeOutTestCase):
return config.server_lang == 'java' return config.server_lang == 'java'
def rpc_distribution_validate(self, test_client: XdsTestClient): def rpc_distribution_validate(self, test_client: XdsTestClient):
rpc_distribution = self.configure_and_send( self.configure_and_send(
test_client, test_client,
rpc_types=[RpcTypeUnaryCall], rpc_types=(RpcTypeUnaryCall,),
# UnaryCall only with sleep-2; timeout=1s; calls timeout. # UnaryCall only with sleep-2; timeout=1s; calls timeout.
metadata=((RpcTypeUnaryCall, 'rpc-behavior', 'sleep-2'),), metadata=((RpcTypeUnaryCall, 'rpc-behavior', 'sleep-2'),),
app_timeout=1, app_timeout=1,
@ -134,10 +134,10 @@ class TestTimeoutInApplication(_BaseXdsTimeOutTestCase):
class TestTimeoutNotExceeded(_BaseXdsTimeOutTestCase): class TestTimeoutNotExceeded(_BaseXdsTimeOutTestCase):
def rpc_distribution_validate(self, test_client: XdsTestClient): def rpc_distribution_validate(self, test_client: XdsTestClient):
rpc_distribution = self.configure_and_send( self.configure_and_send(
test_client, test_client,
# UnaryCall only with no sleep; calls succeed. # UnaryCall only with no sleep; calls succeed.
rpc_types=[RpcTypeUnaryCall], rpc_types=(RpcTypeUnaryCall,),
num_rpcs=_NUM_RPCS) num_rpcs=_NUM_RPCS)
self.assertRpcStatusCode(test_client, self.assertRpcStatusCode(test_client,
expected=(ExpectedResult( expected=(ExpectedResult(

Loading…
Cancel
Save