From 697c438df6adcd1acbd9c4436466ccc7e28fbf3c Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Tue, 3 May 2022 11:05:04 -0700 Subject: [PATCH] [PSM interop] Introduce isort/pylint to the GKE framework (#29476) * [PSM interop] Introduce isort/pylint to the GKE framework * Update the rpc_types * Update variable type annotation style in tests/*.py --- .pylintrc | 2 + .pylintrc-examples | 2 + .pylintrc-tests | 2 + tools/distrib/isort_code.sh | 1 + tools/distrib/pylint_code.sh | 3 + .../xds_k8s_test_driver/bin/run_channelz.py | 1 - .../xds_k8s_test_driver/bin/run_td_setup.py | 2 +- .../framework/helpers/datetime.py | 4 +- .../framework/helpers/retryers.py | 2 +- .../framework/infrastructure/gcp/api.py | 12 ++-- .../framework/infrastructure/gcp/compute.py | 57 ++++++++++--------- .../framework/infrastructure/gcp/iam.py | 15 ++--- .../infrastructure/gcp/network_security.py | 5 +- .../infrastructure/gcp/network_services.py | 47 ++++++++------- .../framework/infrastructure/k8s.py | 7 ++- .../infrastructure/traffic_director.py | 15 +++-- .../xds_k8s_test_driver/framework/rpc/grpc.py | 4 +- .../framework/rpc/grpc_csds.py | 13 +++-- .../framework/test_app/base_runner.py | 3 +- .../framework/test_app/client_app.py | 44 +++++++------- .../framework/test_app/server_app.py | 51 +++++++++-------- .../framework/xds_k8s_testcase.py | 26 +++++---- .../framework/xds_url_map_test_resources.py | 7 +-- .../framework/xds_url_map_testcase.py | 39 +++++++------ .../tests/affinity_test.py | 43 +++++++------- .../tests/api_listener_test.py | 12 ++-- .../xds_k8s_test_driver/tests/app_net_test.py | 9 +-- .../tests/baseline_test.py | 6 +- .../tests/change_backend_service_test.py | 24 ++++---- .../tests/failover_test.py | 45 +++++++-------- .../tests/remove_neg_test.py | 27 +++++---- .../tests/round_robin_test.py | 20 +++---- .../tests/subsetting_test.py | 10 ++-- .../tests/url_map/affinity_test.py | 4 +- .../tests/url_map/fault_injection_test.py | 39 ++++++------- .../tests/url_map/header_matching_test.py | 18 +++--- .../tests/url_map/metadata_filter_test.py | 11 ++-- .../tests/url_map/path_matching_test.py | 14 ++--- .../tests/url_map/retry_test.py | 29 +++++----- .../tests/url_map/timeout_test.py | 10 ++-- 40 files changed, 360 insertions(+), 325 deletions(-) diff --git a/.pylintrc b/.pylintrc index 3f2384e0674..414505fe34b 100644 --- a/.pylintrc +++ b/.pylintrc @@ -97,5 +97,7 @@ disable= useless-object-inheritance, # NOTE(lidiz): the import order will be enforced by isort instead wrong-import-order, + # TODO(https://github.com/PyCQA/pylint/issues/3882): Upgrade Pylint + unsubscriptable-object, # NOTE(sergiitk): yapf compatibility, ref #25071 bad-continuation, diff --git a/.pylintrc-examples b/.pylintrc-examples index ba9bded89fd..fbd137b55de 100644 --- a/.pylintrc-examples +++ b/.pylintrc-examples @@ -100,5 +100,7 @@ disable= useless-object-inheritance, # NOTE(lidiz): the import order will be enforced by isort instead wrong-import-order, + # TODO(https://github.com/PyCQA/pylint/issues/3882): Upgrade Pylint + unsubscriptable-object, # NOTE(sergiitk): yapf compatibility, ref #25071 bad-continuation, diff --git a/.pylintrc-tests b/.pylintrc-tests index 14512019601..fffb08783a0 100644 --- a/.pylintrc-tests +++ b/.pylintrc-tests @@ -126,5 +126,7 @@ disable= useless-object-inheritance, # NOTE(lidiz): the import order will be enforced by isort instead wrong-import-order, + # TODO(https://github.com/PyCQA/pylint/issues/3882): Upgrade Pylint + unsubscriptable-object, # NOTE(sergiitk): yapf compatibility, ref #25071 bad-continuation, diff --git a/tools/distrib/isort_code.sh b/tools/distrib/isort_code.sh index 08b4e5a18cc..4bdc7271972 100755 --- a/tools/distrib/isort_code.sh +++ b/tools/distrib/isort_code.sh @@ -31,6 +31,7 @@ DIRS=( 'test' 'tools' 'setup.py' + 'tools/run_tests/xds_k8s_test_driver' ) VIRTUALENV=isort_virtual_environment diff --git a/tools/distrib/pylint_code.sh b/tools/distrib/pylint_code.sh index 83faa932ba9..7ea443842e9 100755 --- a/tools/distrib/pylint_code.sh +++ b/tools/distrib/pylint_code.sh @@ -28,11 +28,14 @@ DIRS=( 'src/python/grpcio_reflection/grpc_reflection' 'src/python/grpcio_testing/grpc_testing' 'src/python/grpcio_status/grpc_status' + 'tools/run_tests/xds_k8s_test_driver/bin' + 'tools/run_tests/xds_k8s_test_driver/framework' ) TEST_DIRS=( 'src/python/grpcio_tests/tests' 'src/python/grpcio_tests/tests_gevent' + 'tools/run_tests/xds_k8s_test_driver/tests' ) VIRTUALENV=python_pylint_venv diff --git a/tools/run_tests/xds_k8s_test_driver/bin/run_channelz.py b/tools/run_tests/xds_k8s_test_driver/bin/run_channelz.py index 5c2c0ac0cea..6af0ee241f3 100755 --- a/tools/run_tests/xds_k8s_test_driver/bin/run_channelz.py +++ b/tools/run_tests/xds_k8s_test_driver/bin/run_channelz.py @@ -178,7 +178,6 @@ def main(argv): # Resource names. resource_prefix: str = xds_flags.RESOURCE_PREFIX.value - resource_suffix: str = xds_flags.RESOURCE_SUFFIX.value # Server server_name = xds_flags.SERVER_NAME.value diff --git a/tools/run_tests/xds_k8s_test_driver/bin/run_td_setup.py b/tools/run_tests/xds_k8s_test_driver/bin/run_td_setup.py index efcb5916bef..2968d9647ce 100755 --- a/tools/run_tests/xds_k8s_test_driver/bin/run_td_setup.py +++ b/tools/run_tests/xds_k8s_test_driver/bin/run_td_setup.py @@ -67,7 +67,7 @@ flags.mark_flag_as_required("resource_suffix") KubernetesServerRunner = server_app.KubernetesServerRunner -def main(argv): +def main(argv): # pylint: disable=too-many-locals,too-many-branches,too-many-statements if len(argv) > 1: raise app.UsageError('Too many command-line arguments.') diff --git a/tools/run_tests/xds_k8s_test_driver/framework/helpers/datetime.py b/tools/run_tests/xds_k8s_test_driver/framework/helpers/datetime.py index a4b27849160..0e2f9d1d261 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/helpers/datetime.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/helpers/datetime.py @@ -14,7 +14,7 @@ """This contains common helpers for working with dates and time.""" import datetime import re -from typing import Pattern +from typing import Optional, Pattern RE_ZERO_OFFSET: Pattern[str] = re.compile(r'[+\-]00:?00$') @@ -29,7 +29,7 @@ def shorten_utc_zone(utc_datetime_str: str) -> str: return RE_ZERO_OFFSET.sub('Z', utc_datetime_str) -def iso8601_utc_time(timedelta: datetime.timedelta = None) -> str: +def iso8601_utc_time(timedelta: Optional[datetime.timedelta] = None) -> str: """Return datetime relative to current in ISO-8601 format, UTC tz.""" time: datetime.datetime = utc_now() if timedelta: diff --git a/tools/run_tests/xds_k8s_test_driver/framework/helpers/retryers.py b/tools/run_tests/xds_k8s_test_driver/framework/helpers/retryers.py index 94c7cf1ce03..45588d7d65a 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/helpers/retryers.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/helpers/retryers.py @@ -70,7 +70,7 @@ def exponential_retryer_with_timeout( def constant_retryer(*, wait_fixed: timedelta, attempts: int = 0, - timeout: timedelta = None, + timeout: Optional[timedelta] = None, retry_on_exceptions: Optional[Sequence[Any]] = None, logger: Optional[logging.Logger] = None, log_level: Optional[int] = logging.DEBUG) -> Retrying: diff --git a/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/api.py b/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/api.py index 475dfd69598..116a2f21079 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/api.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/api.py @@ -149,8 +149,9 @@ class GcpApiManager: raise NotImplementedError(f'Network Services {version} not supported') + @staticmethod @functools.lru_cache(None) - def secrets(self, version): + def secrets(version: str): if version == 'v1': return secretmanager_v1.SecretManagerServiceClient() @@ -263,7 +264,7 @@ class OperationError(Error): """ api_name: str name: str - metadata: str + metadata: Any code_name: code_pb2.Code error: status_pb2.Status @@ -431,9 +432,10 @@ class GcpStandardCloudApiResource(GcpProjectApiResource, metaclass=abc.ABCMeta): return False # TODO(sergiitk): Use ResponseError and TransportError - def _execute(self, - request: HttpRequest, - timeout_sec=GcpProjectApiResource._WAIT_FOR_OPERATION_SEC): + def _execute( # pylint: disable=arguments-differ + self, + request: HttpRequest, + timeout_sec=GcpProjectApiResource._WAIT_FOR_OPERATION_SEC): operation = request.execute(num_retries=self._GCP_API_RETRIES) self._wait(operation, timeout_sec) diff --git a/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/compute.py b/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/compute.py index 700c9c07cb1..1f9a3d4a4b5 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/compute.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/compute.py @@ -26,7 +26,7 @@ from framework.infrastructure import gcp logger = logging.getLogger(__name__) -class ComputeV1(gcp.api.GcpProjectApiResource): +class ComputeV1(gcp.api.GcpProjectApiResource): # pylint: disable=too-many-public-methods # TODO(sergiitk): move someplace better _WAIT_FOR_BACKEND_SEC = 60 * 10 _WAIT_FOR_OPERATION_SEC = 60 * 10 @@ -58,7 +58,7 @@ class ComputeV1(gcp.api.GcpProjectApiResource): name: str, protocol: HealthCheckProtocol, *, - port: Optional[int] = None) -> GcpResource: + port: Optional[int] = None) -> 'GcpResource': if protocol is self.HealthCheckProtocol.TCP: health_check_field = 'tcpHealthCheck' elif protocol is self.HealthCheckProtocol.GRPC: @@ -88,7 +88,7 @@ class ComputeV1(gcp.api.GcpProjectApiResource): def create_firewall_rule(self, name: str, network_url: str, source_ranges: List[str], - ports: List[str]) -> Optional[GcpResource]: + ports: List[str]) -> Optional['GcpResource']: try: return self._insert_resource( self.api.firewalls(), { @@ -107,7 +107,7 @@ class ComputeV1(gcp.api.GcpProjectApiResource): # TODO(lidiz) use status_code() when we upgrade googleapiclient if http_error.resp.status == 409: logger.debug('Firewall rule %s already existed', name) - return + return None else: raise @@ -117,10 +117,10 @@ class ComputeV1(gcp.api.GcpProjectApiResource): def create_backend_service_traffic_director( self, name: str, - health_check: GcpResource, - affinity_header: str = None, + health_check: 'GcpResource', + affinity_header: Optional[str] = None, protocol: Optional[BackendServiceProtocol] = None, - subset_size: Optional[int] = None) -> GcpResource: + subset_size: Optional[int] = None) -> 'GcpResource': if not isinstance(protocol, self.BackendServiceProtocol): raise TypeError(f'Unexpected Backend Service protocol: {protocol}') body = { @@ -144,7 +144,7 @@ class ComputeV1(gcp.api.GcpProjectApiResource): } return self._insert_resource(self.api.backendServices(), body) - def get_backend_service_traffic_director(self, name: str) -> GcpResource: + def get_backend_service_traffic_director(self, name: str) -> 'GcpResource': return self._get_resource(self.api.backendServices(), backendService=name) @@ -185,9 +185,9 @@ class ComputeV1(gcp.api.GcpProjectApiResource): name: str, matcher_name: str, src_hosts, - dst_default_backend_service: GcpResource, - dst_host_rule_match_backend_service: Optional[GcpResource] = None, - ) -> GcpResource: + dst_default_backend_service: 'GcpResource', + dst_host_rule_match_backend_service: Optional['GcpResource'] = None, + ) -> 'GcpResource': if dst_host_rule_match_backend_service is None: dst_host_rule_match_backend_service = dst_default_backend_service return self._insert_resource( @@ -206,10 +206,10 @@ class ComputeV1(gcp.api.GcpProjectApiResource): }], }) - def create_url_map_with_content(self, url_map_body: Any) -> GcpResource: + def create_url_map_with_content(self, url_map_body: Any) -> 'GcpResource': return self._insert_resource(self.api.urlMaps(), url_map_body) - def patch_url_map(self, url_map: GcpResource, body, **kwargs): + def patch_url_map(self, url_map: 'GcpResource', body, **kwargs): self._patch_resource(collection=self.api.urlMaps(), urlMap=url_map.name, body=body, @@ -221,9 +221,9 @@ class ComputeV1(gcp.api.GcpProjectApiResource): def create_target_grpc_proxy( self, name: str, - url_map: GcpResource, + url_map: 'GcpResource', validate_for_proxyless: bool = True, - ) -> GcpResource: + ) -> 'GcpResource': return self._insert_resource( self.api.targetGrpcProxies(), { 'name': name, @@ -238,8 +238,8 @@ class ComputeV1(gcp.api.GcpProjectApiResource): def create_target_http_proxy( self, name: str, - url_map: GcpResource, - ) -> GcpResource: + url_map: 'GcpResource', + ) -> 'GcpResource': return self._insert_resource(self.api.targetHttpProxies(), { 'name': name, 'url_map': url_map.url, @@ -252,10 +252,10 @@ class ComputeV1(gcp.api.GcpProjectApiResource): def create_forwarding_rule(self, name: str, src_port: int, - target_proxy: GcpResource, + target_proxy: 'GcpResource', network_url: str, *, - ip_address: str = '0.0.0.0') -> GcpResource: + ip_address: str = '0.0.0.0') -> 'GcpResource': return self._insert_resource( self.api.globalForwardingRules(), { @@ -367,14 +367,14 @@ class ComputeV1(gcp.api.GcpProjectApiResource): }).execute() def _get_resource(self, collection: discovery.Resource, - **kwargs) -> GcpResource: + **kwargs) -> 'GcpResource': resp = collection.get(project=self.project, **kwargs).execute() logger.info('Loaded compute resource:\n%s', self.resource_pretty_format(resp)) return self.GcpResource(resp['name'], resp['selfLink']) - def _exists_resource(self, collection: discovery.Resource, - filter: str) -> bool: + def _exists_resource( + self, collection: discovery.Resource, filter: str) -> bool: # pylint: disable=redefined-builtin resp = collection.list( project=self.project, filter=filter, maxResults=1).execute(num_retries=self._GCP_API_RETRIES) @@ -384,7 +384,7 @@ class ComputeV1(gcp.api.GcpProjectApiResource): return 'items' in resp and resp['items'] def _insert_resource(self, collection: discovery.Resource, - body: Dict[str, Any]) -> GcpResource: + body: Dict[str, Any]) -> 'GcpResource': logger.info('Creating compute resource:\n%s', self.resource_pretty_format(body)) resp = self._execute(collection.insert(project=self.project, body=body)) @@ -420,11 +420,12 @@ class ComputeV1(gcp.api.GcpProjectApiResource): def _operation_status_done(operation): return 'status' in operation and operation['status'] == 'DONE' - def _execute(self, - request, - *, - test_success_fn=None, - timeout_sec=_WAIT_FOR_OPERATION_SEC): + def _execute( # pylint: disable=arguments-differ + self, + request, + *, + test_success_fn=None, + timeout_sec=_WAIT_FOR_OPERATION_SEC): operation = request.execute(num_retries=self._GCP_API_RETRIES) logger.debug('Response %s', operation) diff --git a/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/iam.py b/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/iam.py index b1f6ed4ed21..acd319519d9 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/iam.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/iam.py @@ -33,7 +33,6 @@ class EtagConflict(gcp.api.Error): https://cloud.google.com/iam/docs/policies#etag """ - pass def handle_etag_conflict(func): @@ -54,7 +53,7 @@ def _replace_binding(policy: 'Policy', binding: 'Policy.Binding', new_bindings = set(policy.bindings) new_bindings.discard(binding) new_bindings.add(new_binding) - return dataclasses.replace(policy, bindings=frozenset(new_bindings)) + return dataclasses.replace(policy, bindings=frozenset(new_bindings)) # pylint: disable=too-many-function-args @dataclasses.dataclass(frozen=True) @@ -190,7 +189,7 @@ class IamV1(gcp.api.GcpProjectApiResource): # Otherwise conditions are omitted, and role names returned with a suffix, # f.e. roles/iam.workloadIdentityUser_withcond_f1ec33c9beb41857dbf0 # https://cloud.google.com/iam/docs/reference/rest/v1/Policy#FIELDS.version - POLICY_VERSION: str = 3 + POLICY_VERSION: int = 3 def __init__(self, api_manager: gcp.api.GcpApiManager, project: str): super().__init__(api_manager.iam('v1'), project) @@ -271,8 +270,9 @@ class IamV1(gcp.api.GcpProjectApiResource): updated_binding = Policy.Binding(role, frozenset([member])) else: updated_members: FrozenSet[str] = binding.members.union({member}) - updated_binding: Policy.Binding = dataclasses.replace( - binding, members=updated_members) + updated_binding: Policy.Binding = dataclasses.replace( # pylint: disable=too-many-function-args + binding, + members=updated_members) updated_policy: Policy = _replace_binding(policy, binding, updated_binding) @@ -303,8 +303,9 @@ class IamV1(gcp.api.GcpProjectApiResource): return updated_members: FrozenSet[str] = binding.members.difference({member}) - updated_binding: Policy.Binding = dataclasses.replace( - binding, members=updated_members) + updated_binding: Policy.Binding = dataclasses.replace( # pylint: disable=too-many-function-args + binding, + members=updated_members) updated_policy: Policy = _replace_binding(policy, binding, updated_binding) self.set_service_account_iam_policy(account, updated_policy) diff --git a/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/network_security.py b/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/network_security.py index 225969f9428..1c2d7dbc7e3 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/network_security.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/network_security.py @@ -92,6 +92,9 @@ class _NetworkSecurityBase(gcp.api.GcpStandardCloudApiResource, metaclass=abc.ABCMeta): """Base class for NetworkSecurity APIs.""" + # TODO(https://github.com/grpc/grpc/issues/29532) remove pylint disable + # pylint: disable=abstract-method + def __init__(self, api_manager: gcp.api.GcpApiManager, project: str): super().__init__(api_manager.networksecurity(self.api_version), project) # Shortcut to projects/*/locations/ endpoints @@ -101,7 +104,7 @@ class _NetworkSecurityBase(gcp.api.GcpStandardCloudApiResource, def api_name(self) -> str: return 'networksecurity' - def _execute(self, *args, **kwargs): # pylint: disable=signature-differs + def _execute(self, *args, **kwargs): # pylint: disable=signature-differs,arguments-differ # Workaround TD bug: throttled operations are reported as internal. # Ref b/175345578 retryer = tenacity.Retrying( diff --git a/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/network_services.py b/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/network_services.py index 58851c68ebb..f080aaa62c3 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/network_services.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/network_services.py @@ -81,7 +81,7 @@ class GrpcRoute: case_sensitive: Optional[bool] @classmethod - def from_response(cls, d: Dict[str, Any]) -> 'MethodMatch': + def from_response(cls, d: Dict[str, Any]) -> 'GrpcRoute.MethodMatch': return cls( type=d.get("type"), grpc_service=d.get("grpcService"), @@ -96,7 +96,7 @@ class GrpcRoute: value: str @classmethod - def from_response(cls, d: Dict[str, Any]) -> 'HeaderMatch': + def from_response(cls, d: Dict[str, Any]) -> 'GrpcRoute.HeaderMatch': return cls( type=d.get("type"), key=d["key"], @@ -105,17 +105,17 @@ class GrpcRoute: @dataclasses.dataclass(frozen=True) class RouteMatch: - method: Optional['MethodMatch'] - headers: Tuple['HeaderMatch'] + method: Optional['GrpcRoute.MethodMatch'] + headers: Tuple['GrpcRoute.HeaderMatch'] @classmethod - def from_response(cls, d: Dict[str, Any]) -> 'RouteMatch': + def from_response(cls, d: Dict[str, Any]) -> 'GrpcRoute.RouteMatch': return cls( - method=MethodMatch.from_response(d["method"]) + method=GrpcRoute.MethodMatch.from_response(d["method"]) if "method" in d else None, headers=tuple( - HeaderMatch.from_response(h) for h in d["headers"]) - if "headers" in d else (), + GrpcRoute.HeaderMatch.from_response(h) + for h in d["headers"]) if "headers" in d else (), ) @dataclasses.dataclass(frozen=True) @@ -124,7 +124,7 @@ class GrpcRoute: weight: Optional[int] @classmethod - def from_response(cls, d: Dict[str, Any]) -> 'Destination': + def from_response(cls, d: Dict[str, Any]) -> 'GrpcRoute.Destination': return cls( service_name=d["serviceName"], weight=d.get("weight"), @@ -132,37 +132,39 @@ class GrpcRoute: @dataclasses.dataclass(frozen=True) class RouteAction: - destinations: List['Destination'] @classmethod - def from_response(cls, d: Dict[str, Any]) -> 'RouteAction': + def from_response(cls, d: Dict[str, Any]) -> 'GrpcRoute.RouteAction': destinations = [ - Destination.from_response(dest) for dest in d["destinations"] + GrpcRoute.Destination.from_response(dest) + for dest in d["destinations"] ] if "destinations" in d else [] return cls(destinations=destinations) @dataclasses.dataclass(frozen=True) class RouteRule: - matches: List['RouteMatch'] - action: 'RouteAction' + matches: List['GrpcRoute.RouteMatch'] + action: 'GrpcRoute.RouteAction' @classmethod - def from_response(cls, d: Dict[str, Any]) -> 'RouteRule': - matches = [RouteMatch.from_response(m) for m in d["matches"] - ] if "matches" in d else [] + def from_response(cls, d: Dict[str, Any]) -> 'GrpcRoute.RouteRule': + matches = [ + GrpcRoute.RouteMatch.from_response(m) for m in d["matches"] + ] if "matches" in d else [] return cls( matches=matches, - action=RouteAction.from_response(d["action"]), + action=GrpcRoute.RouteAction.from_response(d["action"]), ) name: str url: str hostnames: Tuple[str] - rules: Tuple['RouteRule'] + rules: Tuple['GrpcRoute.RouteRule'] meshes: Optional[Tuple[str]] @classmethod - def from_response(cls, name: str, d: Dict[str, Any]) -> 'RouteRule': + def from_response(cls, name: str, d: Dict[str, + Any]) -> 'GrpcRoute.RouteRule': return cls( name=name, url=d["name"], @@ -176,6 +178,9 @@ class _NetworkServicesBase(gcp.api.GcpStandardCloudApiResource, metaclass=abc.ABCMeta): """Base class for NetworkServices APIs.""" + # TODO(https://github.com/grpc/grpc/issues/29532) remove pylint disable + # pylint: disable=abstract-method + def __init__(self, api_manager: gcp.api.GcpApiManager, project: str): super().__init__(api_manager.networkservices(self.api_version), project) # Shortcut to projects/*/locations/ endpoints @@ -185,7 +190,7 @@ class _NetworkServicesBase(gcp.api.GcpStandardCloudApiResource, def api_name(self) -> str: return 'networkservices' - def _execute(self, *args, **kwargs): # pylint: disable=signature-differs + def _execute(self, *args, **kwargs): # pylint: disable=signature-differs,arguments-differ # Workaround TD bug: throttled operations are reported as internal. # Ref b/175345578 retryer = tenacity.Retrying( diff --git a/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/k8s.py b/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/k8s.py index 3d389696f17..68689892d8b 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/k8s.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/k8s.py @@ -98,7 +98,10 @@ class PortForwarder: self.subprocess: Optional[subprocess.Popen] = None def connect(self) -> None: - port_mapping = f"{self.local_port}:{self.remote_port}" if self.local_port else f":{self.remote_port}" + if self.local_port: + port_mapping = f"{self.local_port}:{self.remote_port}" + else: + port_mapping = f":{self.remote_port}" cmd = [ "kubectl", "--context", self.context, "--namespace", self.namespace, "port-forward", "--address", self.local_address, self.destination, @@ -167,7 +170,7 @@ class PortForwarder: self.subprocess = None -class KubernetesNamespace: +class KubernetesNamespace: # pylint: disable=too-many-public-methods NEG_STATUS_META = 'cloud.google.com/neg-status' DELETE_GRACE_PERIOD_SEC: int = 5 WAIT_SHORT_TIMEOUT_SEC: int = 60 diff --git a/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/traffic_director.py b/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/traffic_director.py index d3629c1d9fc..ff395301d33 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/traffic_director.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/traffic_director.py @@ -41,12 +41,14 @@ AuthorizationPolicy = gcp.network_security.AuthorizationPolicy _NetworkServicesV1Alpha1 = gcp.network_services.NetworkServicesV1Alpha1 _NetworkServicesV1Beta1 = gcp.network_services.NetworkServicesV1Beta1 EndpointPolicy = gcp.network_services.EndpointPolicy +GrpcRoute = gcp.network_services.GrpcRoute +Mesh = gcp.network_services.Mesh # Testing metadata consts TEST_AFFINITY_METADATA_KEY = 'xds_md' -class TrafficDirectorManager: +class TrafficDirectorManager: # pylint: disable=too-many-public-methods compute: _ComputeV1 resource_prefix: str resource_suffix: str @@ -383,8 +385,8 @@ class TrafficDirectorManager: self.compute.wait_for_backends_healthy_status( self.affinity_backend_service, self.affinity_backends) + @staticmethod def _generate_url_map_body( - self, name: str, matcher_name: str, src_hosts, @@ -547,9 +549,9 @@ class TrafficDirectorManager: lo: int = 1024, # To avoid confusion, skip well-known ports. hi: int = 65535, attempts: int = 25) -> int: - for attempts in range(attempts): + for _ in range(attempts): src_port = random.randint(lo, hi) - if not (self.compute.exists_forwarding_rule(src_port)): + if not self.compute.exists_forwarding_rule(src_port): return src_port # TODO(sergiitk): custom exception raise RuntimeError("Couldn't find unused forwarding rule port") @@ -656,8 +658,9 @@ class TrafficDirectorAppNetManager(TrafficDirectorManager): self.netsvc = _NetworkServicesV1Alpha1(gcp_api_manager, project) # Managed resources - self.grpc_route: Optional[_NetworkServicesV1Alpha1.GrpcRoute] = None - self.mesh: Optional[_NetworkServicesV1Alpha1.Mesh] = None + # TODO(gnossen) PTAL at the pylint error + self.grpc_route: Optional[GrpcRoute] = None + self.mesh: Optional[Mesh] = None def create_mesh(self) -> GcpResource: name = self.make_resource_name(self.MESH_NAME) diff --git a/tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc.py b/tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc.py index c93a8f41723..5ee953406c0 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc.py @@ -13,7 +13,7 @@ # limitations under the License. import logging import re -from typing import ClassVar, Dict, Optional +from typing import Any, Dict, Optional from google.protobuf import json_format import google.protobuf.message @@ -29,7 +29,7 @@ class GrpcClientHelper: channel: grpc.Channel DEFAULT_RPC_DEADLINE_SEC = 90 - def __init__(self, channel: grpc.Channel, stub_class: ClassVar): + def __init__(self, channel: grpc.Channel, stub_class: Any): self.channel = channel self.stub = stub_class(channel) # This is purely cosmetic to make RPC logs look like method calls. diff --git a/tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc_csds.py b/tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc_csds.py index 2278c5bce9e..622ba60a7b6 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc_csds.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc_csds.py @@ -17,16 +17,17 @@ https://github.com/envoyproxy/envoy/blob/main/api/envoy/service/status/v3/csds.p """ import logging -import queue -from typing import Callable, Optional +from typing import Optional -from envoy.extensions.filters.common.fault.v3 import fault_pb2 -from envoy.extensions.filters.http.fault.v3 import fault_pb2 -from envoy.extensions.filters.http.router.v3 import router_pb2 # Envoy protos provided by PyPI package xds-protos # Needs to import the generated Python file to load descriptors +# pylint: disable=unused-import +from envoy.extensions.filters.common.fault.v3 import fault_pb2 as _ +from envoy.extensions.filters.http.fault.v3 import fault_pb2 as _ +from envoy.extensions.filters.http.router.v3 import router_pb2 as _ from envoy.extensions.filters.network.http_connection_manager.v3 import \ - http_connection_manager_pb2 + http_connection_manager_pb2 as _ +# pylint: enable=unused-import from envoy.service.status.v3 import csds_pb2 from envoy.service.status.v3 import csds_pb2_grpc import grpc diff --git a/tools/run_tests/xds_k8s_test_driver/framework/test_app/base_runner.py b/tools/run_tests/xds_k8s_test_driver/framework/test_app/base_runner.py index 5c9e32c88f7..c0b8fd8a2bb 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/test_app/base_runner.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/test_app/base_runner.py @@ -70,6 +70,7 @@ class KubernetesBaseRunner: self.namespace: Optional[k8s.V1Namespace] = None def run(self, **kwargs): + del kwargs if self.reuse_namespace: self.namespace = self._reuse_namespace() if not self.namespace: @@ -316,7 +317,7 @@ class KubernetesBaseRunner: namespace_name: str, gcp_project: str, gcp_ui_url: str, - end_delta: timedelta = None) -> None: + end_delta: Optional[timedelta] = None) -> None: """Output the link to test server/client logs in GCP Logs Explorer.""" if end_delta is None: end_delta = timedelta(hours=1) diff --git a/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py b/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py index de3564545e3..803f27c95bc 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py @@ -227,25 +227,26 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp): class KubernetesClientRunner(base_runner.KubernetesBaseRunner): - def __init__(self, - k8s_namespace, - *, - deployment_name, - image_name, - td_bootstrap_image, - gcp_api_manager: gcp.api.GcpApiManager, - gcp_project: str, - gcp_service_account: str, - xds_server_uri=None, - network='default', - service_account_name=None, - stats_port=8079, - deployment_template='client.deployment.yaml', - service_account_template='service-account.yaml', - reuse_namespace=False, - namespace_template=None, - debug_use_port_forwarding=False, - enable_workload_identity=True): + def __init__( # pylint: disable=too-many-locals + self, + k8s_namespace, + *, + deployment_name, + image_name, + td_bootstrap_image, + gcp_api_manager: gcp.api.GcpApiManager, + gcp_project: str, + gcp_service_account: str, + xds_server_uri=None, + network='default', + service_account_name=None, + stats_port=8079, + deployment_template='client.deployment.yaml', + service_account_template='service-account.yaml', + reuse_namespace=False, + namespace_template=None, + debug_use_port_forwarding=False, + enable_workload_identity=True): super().__init__(k8s_namespace, namespace_template, reuse_namespace) # Settings @@ -282,7 +283,8 @@ class KubernetesClientRunner(base_runner.KubernetesBaseRunner): self.port_forwarder: Optional[k8s.PortForwarder] = None # TODO(sergiitk): make rpc UnaryCall enum or get it from proto - def run(self, + def run( # pylint: disable=arguments-differ + self, *, server_target, rpc='UnaryCall', @@ -360,7 +362,7 @@ class KubernetesClientRunner(base_runner.KubernetesBaseRunner): server_target=server_target, rpc_host=rpc_host) - def cleanup(self, *, force=False, force_namespace=False): + def cleanup(self, *, force=False, force_namespace=False): # pylint: disable=arguments-differ if self.port_forwarder: self.port_forwarder.close() self.port_forwarder = None diff --git a/tools/run_tests/xds_k8s_test_driver/framework/test_app/server_app.py b/tools/run_tests/xds_k8s_test_driver/framework/test_app/server_app.py index 3d583f66d80..dbeea70918a 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/test_app/server_app.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/test_app/server_app.py @@ -19,7 +19,6 @@ modules. """ import functools import logging -import threading from typing import Iterator, List, Optional from framework.infrastructure import gcp @@ -158,28 +157,29 @@ class KubernetesServerRunner(base_runner.KubernetesBaseRunner): DEFAULT_MAINTENANCE_PORT = 8080 DEFAULT_SECURE_MODE_MAINTENANCE_PORT = 8081 - def __init__(self, - k8s_namespace, - *, - deployment_name, - image_name, - td_bootstrap_image, - gcp_api_manager: gcp.api.GcpApiManager, - gcp_project: str, - gcp_service_account: str, - service_account_name=None, - service_name=None, - neg_name=None, - xds_server_uri=None, - network='default', - deployment_template='server.deployment.yaml', - service_account_template='service-account.yaml', - service_template='server.service.yaml', - reuse_service=False, - reuse_namespace=False, - namespace_template=None, - debug_use_port_forwarding=False, - enable_workload_identity=True): + def __init__( # pylint: disable=too-many-locals + self, + k8s_namespace, + *, + deployment_name, + image_name, + td_bootstrap_image, + gcp_api_manager: gcp.api.GcpApiManager, + gcp_project: str, + gcp_service_account: str, + service_account_name=None, + service_name=None, + neg_name=None, + xds_server_uri=None, + network='default', + deployment_template='server.deployment.yaml', + service_account_template='service-account.yaml', + service_template='server.service.yaml', + reuse_service=False, + reuse_namespace=False, + namespace_template=None, + debug_use_port_forwarding=False, + enable_workload_identity=True): super().__init__(k8s_namespace, namespace_template, reuse_namespace) # Settings @@ -223,7 +223,8 @@ class KubernetesServerRunner(base_runner.KubernetesBaseRunner): self.service: Optional[k8s.V1Service] = None self.port_forwarders: List[k8s.PortForwarder] = [] - def run(self, + def run( # pylint: disable=arguments-differ + self, *, test_port=DEFAULT_TEST_PORT, maintenance_port=None, @@ -343,7 +344,7 @@ class KubernetesServerRunner(base_runner.KubernetesBaseRunner): pod_name=pod_name)) return servers - def cleanup(self, *, force=False, force_namespace=False): + def cleanup(self, *, force=False, force_namespace=False): # pylint: disable=arguments-differ if self.port_forwarders: for port_forwarder in self.port_forwarders: port_forwarder.close() diff --git a/tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py b/tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py index deacd984dad..a27dbbe5cb8 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py @@ -85,6 +85,7 @@ class XdsKubernetesTestCase(absltest.TestCase, metaclass=abc.ABCMeta): Returns: A bool indicates if the given config is supported. """ + del config return True @classmethod @@ -275,14 +276,13 @@ class XdsKubernetesTestCase(absltest.TestCase, metaclass=abc.ABCMeta): """Assert all RPCs for a method are completing with a certain status.""" # Sending with pre-set QPS for a period of time before_stats = test_client.get_load_balancer_accumulated_stats() - logging.info( - 'Received LoadBalancerAccumulatedStatsResponse from test client %s: before:\n%s', - test_client.ip, before_stats) + response_type = 'LoadBalancerAccumulatedStatsResponse' + logging.info('Received %s from test client %s: before:\n%s', + response_type, test_client.ip, before_stats) time.sleep(duration.total_seconds()) after_stats = test_client.get_load_balancer_accumulated_stats() - logging.info( - 'Received LoadBalancerAccumulatedStatsResponse from test client %s: after:\n%s', - test_client.ip, after_stats) + logging.info('Received %s from test client %s: after:\n%s', + response_type, test_client.ip, after_stats) diff_stats = self.diffAccumulatedStatsPerMethod(before_stats, after_stats) @@ -314,7 +314,7 @@ class XdsKubernetesTestCase(absltest.TestCase, metaclass=abc.ABCMeta): servers: List[XdsTestServer], num_rpcs: int): server_names = [server.pod_name for server in servers] - logger.info(f'Verifying RPCs go to {server_names}') + logger.info('Verifying RPCs go to %s', server_names) lb_stats = self.getClientRpcStats(test_client, num_rpcs) failed = int(lb_stats.num_failures) self.assertLessEqual( @@ -383,11 +383,13 @@ class XdsKubernetesTestCase(absltest.TestCase, metaclass=abc.ABCMeta): else: self.assertSuccessfulRpcs(test_client) logger.info( - '[SUCCESS] Confirmed successful RPC with the updated routing config, version=%s', + ('[SUCCESS] Confirmed successful RPC with the ' + 'updated routing config, version=%s'), route_config_version) except retryers.RetryError as retry_error: logger.info( - 'Retry exhausted. TD routing config propagation failed after timeout %ds. Last seen client config dump: %s', + ('Retry exhausted. TD routing config propagation failed after ' + 'timeout %ds. Last seen client config dump: %s'), timeout_second, dumped_config) raise retry_error @@ -432,7 +434,8 @@ class RegularXdsKubernetesTestCase(XdsKubernetesTestCase): """ super().setUpClass() if cls.server_maintenance_port is None: - cls.server_maintenance_port = KubernetesServerRunner.DEFAULT_MAINTENANCE_PORT + cls.server_maintenance_port = \ + KubernetesServerRunner.DEFAULT_MAINTENANCE_PORT def initTrafficDirectorManager(self) -> TrafficDirectorManager: return TrafficDirectorManager( @@ -532,7 +535,8 @@ class SecurityXdsKubernetesTestCase(XdsKubernetesTestCase): # Health Checks and Channelz tests available. # When not provided, use explicit numeric port value, so # Backend Health Checks are created on a fixed port. - cls.server_maintenance_port = KubernetesServerRunner.DEFAULT_SECURE_MODE_MAINTENANCE_PORT + cls.server_maintenance_port = \ + KubernetesServerRunner.DEFAULT_SECURE_MODE_MAINTENANCE_PORT def initTrafficDirectorManager(self) -> TrafficDirectorSecureManager: return TrafficDirectorSecureManager( diff --git a/tools/run_tests/xds_k8s_test_driver/framework/xds_url_map_test_resources.py b/tools/run_tests/xds_k8s_test_driver/framework/xds_url_map_test_resources.py index 17b64eeede4..198f0296ea9 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/xds_url_map_test_resources.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/xds_url_map_test_resources.py @@ -15,8 +15,7 @@ import functools import inspect -import time -from typing import Any, Iterable, List, Mapping, Tuple +from typing import Any, Iterable, Mapping, Tuple from absl import flags from absl import logging @@ -65,8 +64,8 @@ class _UrlMapChangeAggregator: *self._get_test_case_url_map(test_case)) self._set_test_case_url_map(*url_map_parts) + @staticmethod def _get_test_case_url_map( - self, test_case: 'XdsUrlMapTestCase') -> Tuple[HostRule, PathMatcher]: host_rule = { "hosts": [test_case.hostname()], @@ -140,7 +139,7 @@ class GcpResourceManager(metaclass=_MetaSingletonAndAbslFlags): for key in absl_flags: setattr(self, key, absl_flags[key]) # Pick a client_namespace_suffix if not set - if self.resource_suffix is None: + if getattr(self, 'resource_suffix', None) is None: self.resource_suffix = "" else: raise NotImplementedError( diff --git a/tools/run_tests/xds_k8s_test_driver/framework/xds_url_map_testcase.py b/tools/run_tests/xds_k8s_test_driver/framework/xds_url_map_testcase.py index 2d9ca9f73c0..f7281db02a7 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/xds_url_map_testcase.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/xds_url_map_testcase.py @@ -75,7 +75,7 @@ class DumpedXdsConfig(dict): Feel free to add more pre-compute fields. """ - def __init__(self, xds_json: JsonType): + def __init__(self, xds_json: JsonType): # pylint: disable=too-many-branches super().__init__(xds_json) self.json_config = xds_json self.lds = None @@ -102,7 +102,8 @@ class DumpedXdsConfig(dict): for endpoint in xds_config['endpointConfig'][ 'dynamicEndpointConfigs']: self.eds.append(endpoint['endpointConfig']) - except Exception as e: + # TODO(lidiz) reduce the catch to LookupError + except Exception as e: # pylint: disable=broad-except logging.debug('Parsing dumped xDS config failed with %s: %s', type(e), e) for generic_xds_config in self.get('genericXdsConfigs', []): @@ -118,7 +119,8 @@ class DumpedXdsConfig(dict): elif re.search(r'\.ClusterLoadAssignment$', generic_xds_config['typeUrl']): self.eds.append(generic_xds_config["xdsConfig"]) - except Exception as e: + # TODO(lidiz) reduce the catch to LookupError + except Exception as e: # pylint: disable=broad-except logging.debug('Parsing dumped xDS config failed with %s: %s', type(e), e) for endpoint_config in self.eds: @@ -131,7 +133,8 @@ class DumpedXdsConfig(dict): ['socketAddress']['address'], lb_endpoint['endpoint']['address'] ['socketAddress']['portValue'])) - except Exception as e: + # TODO(lidiz) reduce the catch to LookupError + except Exception as e: # pylint: disable=broad-except logging.debug('Parse endpoint failed with %s: %s', type(e), e) @@ -250,6 +253,7 @@ class XdsUrlMapTestCase(absltest.TestCase, metaclass=_MetaXdsUrlMapTestCase): Returns: A bool indicates if the given config is supported. """ + del config return True @staticmethod @@ -293,7 +297,6 @@ class XdsUrlMapTestCase(absltest.TestCase, metaclass=_MetaXdsUrlMapTestCase): A tuple contains the updated version of given HostRule and PathMatcher. """ - pass @abc.abstractmethod def xds_config_validate(self, xds_config: DumpedXdsConfig) -> None: @@ -306,16 +309,14 @@ class XdsUrlMapTestCase(absltest.TestCase, metaclass=_MetaXdsUrlMapTestCase): xds_config: A DumpedXdsConfig instance can be used as a JSON dict, but also provides helper fields for commonly checked xDS config. """ - pass @abc.abstractmethod - def rpc_distribution_validate(self, client: XdsTestClient) -> None: + def rpc_distribution_validate(self, test_client: XdsTestClient) -> None: """Validates the routing behavior, if any is wrong, raise. Args: - client: A XdsTestClient instance for all sorts of end2end testing. + test_client: A XdsTestClient instance for all sorts of end2end testing. """ - pass @classmethod def hostname(cls): @@ -361,14 +362,15 @@ class XdsUrlMapTestCase(absltest.TestCase, metaclass=_MetaXdsUrlMapTestCase): GcpResourceManager().cleanup() def _fetch_and_check_xds_config(self): + # TODO(lidiz) find another way to store last seen xDS config # Cleanup state for this attempt - self._xds_json_config = None + self._xds_json_config = None # pylint: disable=attribute-defined-outside-init # Fetch client config config = self.test_client.csds.fetch_client_status( log_level=logging.INFO) self.assertIsNotNone(config) # Found client config, test it. - self._xds_json_config = json_format.MessageToDict(config) + self._xds_json_config = json_format.MessageToDict(config) # pylint: disable=attribute-defined-outside-init # Execute the child class provided validation logic self.xds_config_validate(DumpedXdsConfig(self._xds_json_config)) @@ -427,9 +429,10 @@ class XdsUrlMapTestCase(absltest.TestCase, metaclass=_MetaXdsUrlMapTestCase): f'insufficient endpoints in EDS: want={k} seen={xds_config.endpoints}' ) - def assertRpcStatusCode(self, test_client: XdsTestClient, *, - expected: Iterable[ExpectedResult], length: int, - tolerance: float) -> None: + def assertRpcStatusCode( # pylint: disable=too-many-locals + self, test_client: XdsTestClient, *, + expected: Iterable[ExpectedResult], length: int, + tolerance: float) -> None: """Assert the distribution of RPC statuses over a period of time.""" # Sending with pre-set QPS for a period of time before_stats = test_client.get_load_balancer_accumulated_stats() @@ -468,6 +471,8 @@ class XdsUrlMapTestCase(absltest.TestCase, metaclass=_MetaXdsUrlMapTestCase): diff_ratio = abs(seen - want) / total self.assertLessEqual( diff_ratio, tolerance, - 'Expect rpc [%s] to return [%s] at %.2f ratio: seen=%d want=%d total=%d diff_ratio=%.4f > %.2f' - % (rpc, expected_result.status_code, expected_result.ratio, - seen, want, total, diff_ratio, tolerance)) + (f'Expect rpc [{rpc}] to return ' + f'[{expected_result.status_code}] at ' + f'{expected_result.ratio:.2f} ratio: ' + f'seen={seen} want={want} total={total} ' + f'diff_ratio={diff_ratio:.4f} > {tolerance:.2f}')) diff --git a/tools/run_tests/xds_k8s_test_driver/tests/affinity_test.py b/tools/run_tests/xds_k8s_test_driver/tests/affinity_test.py index 6032eb3d902..eef8f15e26b 100644 --- a/tools/run_tests/xds_k8s_test_driver/tests/affinity_test.py +++ b/tools/run_tests/xds_k8s_test_driver/tests/affinity_test.py @@ -13,7 +13,7 @@ # limitations under the License. import logging import time -from typing import List, Optional +from typing import List from absl import flags from absl.testing import absltest @@ -22,9 +22,7 @@ from google.protobuf import json_format from framework import xds_k8s_testcase from framework import xds_url_map_testcase from framework.helpers import skips -from framework.infrastructure import k8s from framework.rpc import grpc_channelz -from framework.test_app import server_app logger = logging.getLogger(__name__) flags.adopt_module_key_flags(xds_k8s_testcase) @@ -50,7 +48,7 @@ class AffinityTest(xds_k8s_testcase.RegularXdsKubernetesTestCase): return config.version_ge('v1.40.x') return False - def test_affinity(self) -> None: + def test_affinity(self) -> None: # pylint: disable=too-many-statements with self.subTest('00_create_health_check'): self.td.create_health_check() @@ -68,20 +66,21 @@ class AffinityTest(xds_k8s_testcase.RegularXdsKubernetesTestCase): with self.subTest('04_create_forwarding_rule'): self.td.create_forwarding_rule(self.server_xds_port) + test_servers: List[_XdsTestServer] with self.subTest('05_start_test_servers'): - self.test_servers: List[_XdsTestServer] = self.startTestServers( - replica_count=_REPLICA_COUNT) + test_servers = self.startTestServers(replica_count=_REPLICA_COUNT) with self.subTest('06_add_server_backends_to_backend_services'): self.setupServerBackends() + test_client: _XdsTestClient with self.subTest('07_start_test_client'): - self.test_client: _XdsTestClient = self.startTestClient( - self.test_servers[0], - rpc='EmptyCall', - metadata='EmptyCall:%s:123' % _TEST_AFFINITY_METADATA_KEY) + test_client = self.startTestClient(test_servers[0], + rpc='EmptyCall', + metadata='EmptyCall:%s:123' % + _TEST_AFFINITY_METADATA_KEY) # Validate the number of received endpoints and affinity configs. - config = self.test_client.csds.fetch_client_status( + config = test_client.csds.fetch_client_status( log_level=logging.INFO) self.assertIsNotNone(config) json_config = json_format.MessageToDict(config) @@ -95,34 +94,34 @@ class AffinityTest(xds_k8s_testcase.RegularXdsKubernetesTestCase): self.assertEqual(parsed.cds[0]['lbPolicy'], 'RING_HASH') with self.subTest('08_test_client_xds_config_exists'): - self.assertXdsConfigExists(self.test_client) + self.assertXdsConfigExists(test_client) with self.subTest('09_test_server_received_rpcs_from_test_client'): - self.assertSuccessfulRpcs(self.test_client) + self.assertSuccessfulRpcs(test_client) with self.subTest('10_first_100_affinity_rpcs_pick_same_backend'): - rpc_stats = self.getClientRpcStats(self.test_client, _RPC_COUNT) + rpc_stats = self.getClientRpcStats(test_client, _RPC_COUNT) json_lb_stats = json_format.MessageToDict(rpc_stats) rpc_distribution = xds_url_map_testcase.RpcDistributionStats( json_lb_stats) self.assertEqual(1, rpc_distribution.num_peers) self.assertLen( - self.test_client.find_subchannels_with_state( + test_client.find_subchannels_with_state( _ChannelzChannelState.READY), 1, ) self.assertLen( - self.test_client.find_subchannels_with_state( + test_client.find_subchannels_with_state( _ChannelzChannelState.IDLE), 2, ) # Remember the backend inuse, and turn it down later. - self.first_backend_inuse = list( + first_backend_inuse = list( rpc_distribution.raw['rpcsByPeer'].keys())[0] with self.subTest('11_turn_down_server_in_use'): - for s in self.test_servers: - if s.pod_name == self.first_backend_inuse: + for s in test_servers: + if s.pod_name == first_backend_inuse: logging.info('setting backend %s to NOT_SERVING', s.pod_name) s.set_not_serving() @@ -132,7 +131,7 @@ class AffinityTest(xds_k8s_testcase.RegularXdsKubernetesTestCase): parsed = None try: while time.time() < deadline: - config = self.test_client.csds.fetch_client_status( + config = test_client.csds.fetch_client_status( log_level=logging.INFO) self.assertIsNotNone(config) json_config = json_format.MessageToDict(config) @@ -150,14 +149,14 @@ class AffinityTest(xds_k8s_testcase.RegularXdsKubernetesTestCase): logging.info('Client received CSDS response: %s', parsed) with self.subTest('12_next_100_affinity_rpcs_pick_different_backend'): - rpc_stats = self.getClientRpcStats(self.test_client, _RPC_COUNT) + rpc_stats = self.getClientRpcStats(test_client, _RPC_COUNT) json_lb_stats = json_format.MessageToDict(rpc_stats) rpc_distribution = xds_url_map_testcase.RpcDistributionStats( json_lb_stats) self.assertEqual(1, rpc_distribution.num_peers) new_backend_inuse = list( rpc_distribution.raw['rpcsByPeer'].keys())[0] - self.assertNotEqual(new_backend_inuse, self.first_backend_inuse) + self.assertNotEqual(new_backend_inuse, first_backend_inuse) if __name__ == '__main__': diff --git a/tools/run_tests/xds_k8s_test_driver/tests/api_listener_test.py b/tools/run_tests/xds_k8s_test_driver/tests/api_listener_test.py index ace26688e9c..74d8ec99d4a 100644 --- a/tools/run_tests/xds_k8s_test_driver/tests/api_listener_test.py +++ b/tools/run_tests/xds_k8s_test_driver/tests/api_listener_test.py @@ -49,14 +49,16 @@ class ApiListenerTest(xds_k8s_testcase.RegularXdsKubernetesTestCase): with self.subTest('04_create_default_forwarding_rule'): self.td.create_forwarding_rule(self.server_xds_port) + test_server: _XdsTestServer with self.subTest('05_start_test_server'): - test_server: _XdsTestServer = self.startTestServers()[0] + test_server = self.startTestServers()[0] with self.subTest('06_add_server_backends_to_backend_services'): self.setupServerBackends() + test_client: _XdsTestClient with self.subTest('07_start_test_client'): - test_client: _XdsTestClient = self.startTestClient(test_server) + test_client = self.startTestClient(test_server) with self.subTest('08_test_client_xds_config_exists'): self.assertXdsConfigExists(test_client) @@ -91,9 +93,9 @@ class ApiListenerTest(xds_k8s_testcase.RegularXdsKubernetesTestCase): dumped_config = _DumpedXdsConfig( json_format.MessageToDict(raw_config)) previous_route_config_version = dumped_config.rds_version - logger.info( - 'received client config from CSDS with two url maps, dump config: %s, rds version: %s', - dumped_config, previous_route_config_version) + logger.info(('received client config from CSDS with two url maps, ' + 'dump config: %s, rds version: %s'), dumped_config, + previous_route_config_version) with self.subTest('14_delete_one_url_map_target_proxy_forwarding_rule'): self.td.delete_forwarding_rule() diff --git a/tools/run_tests/xds_k8s_test_driver/tests/app_net_test.py b/tools/run_tests/xds_k8s_test_driver/tests/app_net_test.py index 2303b643d1a..1b1e6ead2af 100644 --- a/tools/run_tests/xds_k8s_test_driver/tests/app_net_test.py +++ b/tools/run_tests/xds_k8s_test_driver/tests/app_net_test.py @@ -41,16 +41,17 @@ class AppNetTest(xds_k8s_testcase.AppNetXdsKubernetesTestCase): self.td.create_grpc_route(self.server_xds_host, self.server_xds_port) + test_server: _XdsTestServer with self.subTest('4_start_test_server'): - test_server: _XdsTestServer = self.startTestServers( - replica_count=1)[0] + test_server = self.startTestServers(replica_count=1)[0] with self.subTest('5_setup_server_backends'): self.setupServerBackends() + test_client: _XdsTestClient with self.subTest('6_start_test_client'): - test_client: _XdsTestClient = self.startTestClient( - test_server, config_mesh=self.td.mesh.name) + test_client = self.startTestClient(test_server, + config_mesh=self.td.mesh.name) with self.subTest('7_assert_xds_config_exists'): self.assertXdsConfigExists(test_client) diff --git a/tools/run_tests/xds_k8s_test_driver/tests/baseline_test.py b/tools/run_tests/xds_k8s_test_driver/tests/baseline_test.py index 8ea018a2c7e..4f2901b3a8d 100644 --- a/tools/run_tests/xds_k8s_test_driver/tests/baseline_test.py +++ b/tools/run_tests/xds_k8s_test_driver/tests/baseline_test.py @@ -44,14 +44,16 @@ class BaselineTest(xds_k8s_testcase.RegularXdsKubernetesTestCase): with self.subTest('4_create_forwarding_rule'): self.td.create_forwarding_rule(self.server_xds_port) + test_servers: _XdsTestServer with self.subTest('5_start_test_server'): - test_servers: _XdsTestServer = self.startTestServers() + test_servers = self.startTestServers() with self.subTest('6_add_server_backends_to_backend_service'): self.setupServerBackends() + test_client: _XdsTestClient with self.subTest('7_start_test_client'): - test_client: _XdsTestClient = self.startTestClient(test_servers[0]) + test_client = self.startTestClient(test_servers[0]) with self.subTest('8_test_client_xds_config_exists'): self.assertXdsConfigExists(test_client) diff --git a/tools/run_tests/xds_k8s_test_driver/tests/change_backend_service_test.py b/tools/run_tests/xds_k8s_test_driver/tests/change_backend_service_test.py index 419a134e99e..78e1940a40b 100644 --- a/tools/run_tests/xds_k8s_test_driver/tests/change_backend_service_test.py +++ b/tools/run_tests/xds_k8s_test_driver/tests/change_backend_service_test.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import List, Optional +from typing import List from absl import flags from absl.testing import absltest @@ -70,12 +70,12 @@ class ChangeBackendServiceTest(xds_k8s_testcase.RegularXdsKubernetesTestCase): with self.subTest('04_create_forwarding_rule'): self.td.create_forwarding_rule(self.server_xds_port) + default_test_servers: List[_XdsTestServer] + same_zone_test_servers: List[_XdsTestServer] with self.subTest('05_start_test_servers'): - self.default_test_servers: List[ - _XdsTestServer] = self.startTestServers() - self.same_zone_test_servers: List[ - _XdsTestServer] = self.startTestServers( - server_runner=self.alternate_server_runner) + default_test_servers = self.startTestServers() + same_zone_test_servers = self.startTestServers( + server_runner=self.alternate_server_runner) with self.subTest('06_add_server_backends_to_backend_services'): self.setupServerBackends() @@ -85,21 +85,21 @@ class ChangeBackendServiceTest(xds_k8s_testcase.RegularXdsKubernetesTestCase): self.td.alternative_backend_service_add_neg_backends( neg_name_alt, neg_zones_alt) + test_client: _XdsTestClient with self.subTest('07_start_test_client'): - self.test_client: _XdsTestClient = self.startTestClient( - self.default_test_servers[0]) + test_client = self.startTestClient(default_test_servers[0]) with self.subTest('08_test_client_xds_config_exists'): - self.assertXdsConfigExists(self.test_client) + self.assertXdsConfigExists(test_client) with self.subTest('09_test_server_received_rpcs_from_test_client'): - self.assertSuccessfulRpcs(self.test_client) + self.assertSuccessfulRpcs(test_client) with self.subTest('10_change_backend_service'): self.td.patch_url_map(self.server_xds_host, self.server_xds_port, self.td.alternative_backend_service) - self.assertRpcsEventuallyGoToGivenServers( - self.test_client, self.same_zone_test_servers) + self.assertRpcsEventuallyGoToGivenServers(test_client, + same_zone_test_servers) if __name__ == '__main__': diff --git a/tools/run_tests/xds_k8s_test_driver/tests/failover_test.py b/tools/run_tests/xds_k8s_test_driver/tests/failover_test.py index 5c429488588..e7e68a1b3e6 100644 --- a/tools/run_tests/xds_k8s_test_driver/tests/failover_test.py +++ b/tools/run_tests/xds_k8s_test_driver/tests/failover_test.py @@ -70,14 +70,14 @@ class FailoverTest(xds_k8s_testcase.RegularXdsKubernetesTestCase): with self.subTest('04_create_forwarding_rule'): self.td.create_forwarding_rule(self.server_xds_port) + default_test_servers: List[_XdsTestServer] + alternate_test_servers: List[_XdsTestServer] with self.subTest('05_start_test_servers'): - self.default_test_servers: List[ - _XdsTestServer] = self.startTestServers( - replica_count=self.REPLICA_COUNT) + default_test_servers = self.startTestServers( + replica_count=self.REPLICA_COUNT) - self.alternate_test_servers: List[ - _XdsTestServer] = self.startTestServers( - server_runner=self.secondary_server_runner) + alternate_test_servers = self.startTestServers( + server_runner=self.secondary_server_runner) with self.subTest('06_add_server_backends_to_backend_services'): self.setupServerBackends( @@ -86,41 +86,40 @@ class FailoverTest(xds_k8s_testcase.RegularXdsKubernetesTestCase): server_runner=self.secondary_server_runner, max_rate_per_endpoint=self.MAX_RATE_PER_ENDPOINT) + test_client: _XdsTestClient with self.subTest('07_start_test_client'): - self.test_client: _XdsTestClient = self.startTestClient( - self.default_test_servers[0]) + test_client = self.startTestClient(default_test_servers[0]) with self.subTest('08_test_client_xds_config_exists'): - self.assertXdsConfigExists(self.test_client) + self.assertXdsConfigExists(test_client) with self.subTest('09_primary_locality_receives_requests'): - self.assertRpcsEventuallyGoToGivenServers(self.test_client, - self.default_test_servers) + self.assertRpcsEventuallyGoToGivenServers(test_client, + default_test_servers) with self.subTest( '10_secondary_locality_receives_no_requests_on_partial_primary_failure' ): - self.default_test_servers[0].set_not_serving() - self.assertRpcsEventuallyGoToGivenServers( - self.test_client, self.default_test_servers[1:]) + default_test_servers[0].set_not_serving() + self.assertRpcsEventuallyGoToGivenServers(test_client, + default_test_servers[1:]) with self.subTest('11_gentle_failover'): - self.default_test_servers[1].set_not_serving() + default_test_servers[1].set_not_serving() self.assertRpcsEventuallyGoToGivenServers( - self.test_client, - self.default_test_servers[2:] + self.alternate_test_servers) + test_client, default_test_servers[2:] + alternate_test_servers) with self.subTest( '12_secondary_locality_receives_requests_on_primary_failure'): - self.default_test_servers[2].set_not_serving() - self.assertRpcsEventuallyGoToGivenServers( - self.test_client, self.alternate_test_servers) + default_test_servers[2].set_not_serving() + self.assertRpcsEventuallyGoToGivenServers(test_client, + alternate_test_servers) with self.subTest('13_traffic_resumes_to_healthy_backends'): for i in range(self.REPLICA_COUNT): - self.default_test_servers[i].set_serving() - self.assertRpcsEventuallyGoToGivenServers(self.test_client, - self.default_test_servers) + default_test_servers[i].set_serving() + self.assertRpcsEventuallyGoToGivenServers(test_client, + default_test_servers) if __name__ == '__main__': diff --git a/tools/run_tests/xds_k8s_test_driver/tests/remove_neg_test.py b/tools/run_tests/xds_k8s_test_driver/tests/remove_neg_test.py index b7201026cf8..67f56514422 100644 --- a/tools/run_tests/xds_k8s_test_driver/tests/remove_neg_test.py +++ b/tools/run_tests/xds_k8s_test_driver/tests/remove_neg_test.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import List, Optional +from typing import List from absl import flags from absl.testing import absltest @@ -68,35 +68,34 @@ class RemoveNegTest(xds_k8s_testcase.RegularXdsKubernetesTestCase): with self.subTest('04_create_forwarding_rule'): self.td.create_forwarding_rule(self.server_xds_port) + default_test_servers: List[_XdsTestServer] + same_zone_test_servers: List[_XdsTestServer] with self.subTest('05_start_test_servers'): - self.default_test_servers: List[ - _XdsTestServer] = self.startTestServers() - self.same_zone_test_servers: List[ - _XdsTestServer] = self.startTestServers( - server_runner=self.alternate_server_runner) + default_test_servers = self.startTestServers() + same_zone_test_servers = self.startTestServers( + server_runner=self.alternate_server_runner) with self.subTest('06_add_server_backends_to_backend_services'): self.setupServerBackends() self.setupServerBackends(server_runner=self.alternate_server_runner) + test_client: _XdsTestClient with self.subTest('07_start_test_client'): - self.test_client: _XdsTestClient = self.startTestClient( - self.default_test_servers[0]) + test_client = self.startTestClient(default_test_servers[0]) with self.subTest('08_test_client_xds_config_exists'): - self.assertXdsConfigExists(self.test_client) + self.assertXdsConfigExists(test_client) with self.subTest('09_test_server_received_rpcs_from_test_client'): - self.assertSuccessfulRpcs(self.test_client) + self.assertSuccessfulRpcs(test_client) with self.subTest('10_remove_neg'): self.assertRpcsEventuallyGoToGivenServers( - self.test_client, - self.default_test_servers + self.same_zone_test_servers) + test_client, default_test_servers + same_zone_test_servers) self.removeServerBackends( server_runner=self.alternate_server_runner) - self.assertRpcsEventuallyGoToGivenServers(self.test_client, - self.default_test_servers) + self.assertRpcsEventuallyGoToGivenServers(test_client, + default_test_servers) if __name__ == '__main__': diff --git a/tools/run_tests/xds_k8s_test_driver/tests/round_robin_test.py b/tools/run_tests/xds_k8s_test_driver/tests/round_robin_test.py index 0daa095d065..4965ac13794 100644 --- a/tools/run_tests/xds_k8s_test_driver/tests/round_robin_test.py +++ b/tools/run_tests/xds_k8s_test_driver/tests/round_robin_test.py @@ -12,14 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import List, Optional +from typing import List from absl import flags from absl.testing import absltest from framework import xds_k8s_testcase -from framework.infrastructure import k8s -from framework.test_app import server_app logger = logging.getLogger(__name__) flags.adopt_module_key_flags(xds_k8s_testcase) @@ -49,33 +47,33 @@ class RoundRobinTest(xds_k8s_testcase.RegularXdsKubernetesTestCase): with self.subTest('04_create_forwarding_rule'): self.td.create_forwarding_rule(self.server_xds_port) + test_servers: List[_XdsTestServer] with self.subTest('05_start_test_servers'): - self.test_servers: List[_XdsTestServer] = self.startTestServers( - replica_count=REPLICA_COUNT) + test_servers = self.startTestServers(replica_count=REPLICA_COUNT) with self.subTest('06_add_server_backends_to_backend_services'): self.setupServerBackends() + test_client: _XdsTestClient with self.subTest('07_start_test_client'): - self.test_client: _XdsTestClient = self.startTestClient( - self.test_servers[0]) + test_client = self.startTestClient(test_servers[0]) with self.subTest('08_test_client_xds_config_exists'): - self.assertXdsConfigExists(self.test_client) + self.assertXdsConfigExists(test_client) with self.subTest('09_test_server_received_rpcs_from_test_client'): - self.assertSuccessfulRpcs(self.test_client) + self.assertSuccessfulRpcs(test_client) with self.subTest('10_round_robin'): num_rpcs = 100 expected_rpcs_per_replica = num_rpcs / REPLICA_COUNT - rpcs_by_peer = self.getClientRpcStats(self.test_client, + rpcs_by_peer = self.getClientRpcStats(test_client, num_rpcs).rpcs_by_peer total_requests_received = sum(rpcs_by_peer[x] for x in rpcs_by_peer) self.assertEqual(total_requests_received, num_rpcs, 'Wrong number of RPCS') - for server in self.test_servers: + for server in test_servers: pod_name = server.pod_name self.assertIn(pod_name, rpcs_by_peer, f'pod {pod_name} did not receive RPCs') diff --git a/tools/run_tests/xds_k8s_test_driver/tests/subsetting_test.py b/tools/run_tests/xds_k8s_test_driver/tests/subsetting_test.py index 2b652240450..8ed8fc907a6 100644 --- a/tools/run_tests/xds_k8s_test_driver/tests/subsetting_test.py +++ b/tools/run_tests/xds_k8s_test_driver/tests/subsetting_test.py @@ -13,7 +13,7 @@ # limitations under the License. import collections -from typing import List, Optional +from typing import List from absl import flags from absl import logging @@ -22,8 +22,6 @@ from google.protobuf import json_format from framework import xds_k8s_testcase from framework import xds_url_map_testcase -from framework.infrastructure import k8s -from framework.test_app import server_app flags.adopt_module_key_flags(xds_k8s_testcase) @@ -54,9 +52,9 @@ class SubsettingTest(xds_k8s_testcase.RegularXdsKubernetesTestCase): with self.subTest('04_create_forwarding_rule'): self.td.create_forwarding_rule(self.server_xds_port) + test_servers: List[_XdsTestServer] with self.subTest('05_start_test_servers'): - self.test_servers: List[_XdsTestServer] = self.startTestServers( - replica_count=_NUM_BACKENDS) + test_servers = self.startTestServers(replica_count=_NUM_BACKENDS) with self.subTest('06_add_server_backends_to_backend_services'): self.setupServerBackends() @@ -68,7 +66,7 @@ class SubsettingTest(xds_k8s_testcase.RegularXdsKubernetesTestCase): self.client_runner.cleanup(force=True) # Create a test client test_client: _XdsTestClient = self.startTestClient( - self.test_servers[0]) + test_servers[0]) # Validate the number of received endpoints config = test_client.csds.fetch_client_status( log_level=logging.INFO) diff --git a/tools/run_tests/xds_k8s_test_driver/tests/url_map/affinity_test.py b/tools/run_tests/xds_k8s_test_driver/tests/url_map/affinity_test.py index 7eefa2d241b..fe079bbc5e3 100644 --- a/tools/run_tests/xds_k8s_test_driver/tests/url_map/affinity_test.py +++ b/tools/run_tests/xds_k8s_test_driver/tests/url_map/affinity_test.py @@ -208,8 +208,8 @@ class TestHeaderBasedAffinityMultipleHeaders( break self.assertTrue( different_peer_picked, - "the same endpoint was picked for all the headers, expect a different endpoint to be picked" - ) + ("the same endpoint was picked for all the headers, expect a " + "different endpoint to be picked")) self.assertLen( test_client.find_subchannels_with_state( _ChannelzChannelState.READY), diff --git a/tools/run_tests/xds_k8s_test_driver/tests/url_map/fault_injection_test.py b/tools/run_tests/xds_k8s_test_driver/tests/url_map/fault_injection_test.py index f866020f480..ae9fdc83392 100644 --- a/tools/run_tests/xds_k8s_test_driver/tests/url_map/fault_injection_test.py +++ b/tools/run_tests/xds_k8s_test_driver/tests/url_map/fault_injection_test.py @@ -20,7 +20,6 @@ from absl.testing import absltest import grpc from framework import xds_url_map_testcase -from framework.rpc import grpc_testing from framework.test_app import client_app # Type aliases @@ -114,7 +113,7 @@ def _wait_until_backlog_cleared(test_client: XdsTestClient, # Both backlog of both types of RPCs is clear, success, return. return - raise RuntimeError('failed to clear RPC backlog in %s seconds', timeout) + raise RuntimeError('failed to clear RPC backlog in %s seconds' % timeout) class TestZeroPercentFaultInjection(xds_url_map_testcase.XdsUrlMapTestCase): @@ -145,9 +144,9 @@ class TestZeroPercentFaultInjection(xds_url_map_testcase.XdsUrlMapTestCase): filter_config['abort']['percentage']['denominator']) def rpc_distribution_validate(self, test_client: XdsTestClient): - rpc_distribution = self.configure_and_send(test_client, - rpc_types=[RpcTypeUnaryCall], - num_rpcs=_NUM_RPCS) + self.configure_and_send(test_client, + rpc_types=(RpcTypeUnaryCall,), + num_rpcs=_NUM_RPCS) self.assertRpcStatusCode(test_client, expected=(ExpectedResult( rpc_type=RpcTypeUnaryCall, @@ -236,11 +235,10 @@ class TestAlwaysDelay(xds_url_map_testcase.XdsUrlMapTestCase): filter_config['delay']['percentage']['denominator']) def rpc_distribution_validate(self, test_client: XdsTestClient): - rpc_distribution = self.configure_and_send( - test_client, - rpc_types=[RpcTypeUnaryCall], - num_rpcs=_NUM_RPCS, - app_timeout=_DELAY_CASE_APPLICATION_TIMEOUT_SEC) + self.configure_and_send(test_client, + rpc_types=(RpcTypeUnaryCall,), + num_rpcs=_NUM_RPCS, + app_timeout=_DELAY_CASE_APPLICATION_TIMEOUT_SEC) _wait_until_backlog_cleared(test_client) self.assertRpcStatusCode( test_client, @@ -275,9 +273,9 @@ class TestAlwaysAbort(xds_url_map_testcase.XdsUrlMapTestCase): filter_config['abort']['percentage']['denominator']) def rpc_distribution_validate(self, test_client: XdsTestClient): - rpc_distribution = self.configure_and_send(test_client, - rpc_types=[RpcTypeUnaryCall], - num_rpcs=_NUM_RPCS) + self.configure_and_send(test_client, + rpc_types=(RpcTypeUnaryCall,), + num_rpcs=_NUM_RPCS) self.assertRpcStatusCode( test_client, expected=(ExpectedResult( @@ -311,11 +309,10 @@ class TestDelayHalf(xds_url_map_testcase.XdsUrlMapTestCase): filter_config['delay']['percentage']['denominator']) def rpc_distribution_validate(self, test_client: XdsTestClient): - rpc_distribution = self.configure_and_send( - test_client, - rpc_types=[RpcTypeUnaryCall], - num_rpcs=_NUM_RPCS, - app_timeout=_DELAY_CASE_APPLICATION_TIMEOUT_SEC) + self.configure_and_send(test_client, + rpc_types=(RpcTypeUnaryCall,), + num_rpcs=_NUM_RPCS, + app_timeout=_DELAY_CASE_APPLICATION_TIMEOUT_SEC) _wait_until_backlog_cleared(test_client) self.assertRpcStatusCode( test_client, @@ -350,9 +347,9 @@ class TestAbortHalf(xds_url_map_testcase.XdsUrlMapTestCase): filter_config['abort']['percentage']['denominator']) def rpc_distribution_validate(self, test_client: XdsTestClient): - rpc_distribution = self.configure_and_send(test_client, - rpc_types=[RpcTypeUnaryCall], - num_rpcs=_NUM_RPCS) + self.configure_and_send(test_client, + rpc_types=(RpcTypeUnaryCall,), + num_rpcs=_NUM_RPCS) self.assertRpcStatusCode( test_client, expected=(ExpectedResult( diff --git a/tools/run_tests/xds_k8s_test_driver/tests/url_map/header_matching_test.py b/tools/run_tests/xds_k8s_test_driver/tests/url_map/header_matching_test.py index 9eb3439a9d8..159587e88ed 100644 --- a/tools/run_tests/xds_k8s_test_driver/tests/url_map/header_matching_test.py +++ b/tools/run_tests/xds_k8s_test_driver/tests/url_map/header_matching_test.py @@ -121,10 +121,11 @@ class TestPrefixMatch(xds_url_map_testcase.XdsUrlMapTestCase): [0]['prefixMatch'], _TEST_METADATA_VALUE_UNARY[:2]) def rpc_distribution_validate(self, test_client: XdsTestClient): - rpc_distribution = self.configure_and_send(test_client, - rpc_types=[RpcTypeUnaryCall], - metadata=_TEST_METADATA, - num_rpcs=_NUM_RPCS) + rpc_distribution = self.configure_and_send( + test_client, + rpc_types=(RpcTypeUnaryCall,), + metadata=_TEST_METADATA, + num_rpcs=_NUM_RPCS) self.assertEqual( _NUM_RPCS, rpc_distribution.unary_call_alternative_service_rpc_count) @@ -203,10 +204,11 @@ class TestPresentMatch(xds_url_map_testcase.XdsUrlMapTestCase): [0]['presentMatch'], True) def rpc_distribution_validate(self, test_client: XdsTestClient): - rpc_distribution = self.configure_and_send(test_client, - rpc_types=[RpcTypeUnaryCall], - metadata=_TEST_METADATA, - num_rpcs=_NUM_RPCS) + rpc_distribution = self.configure_and_send( + test_client, + rpc_types=(RpcTypeUnaryCall,), + metadata=_TEST_METADATA, + num_rpcs=_NUM_RPCS) self.assertEqual( _NUM_RPCS, rpc_distribution.unary_call_alternative_service_rpc_count) diff --git a/tools/run_tests/xds_k8s_test_driver/tests/url_map/metadata_filter_test.py b/tools/run_tests/xds_k8s_test_driver/tests/url_map/metadata_filter_test.py index 615cff61d92..c9e054e09e9 100644 --- a/tools/run_tests/xds_k8s_test_driver/tests/url_map/metadata_filter_test.py +++ b/tools/run_tests/xds_k8s_test_driver/tests/url_map/metadata_filter_test.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import json import logging from typing import Tuple @@ -153,9 +152,8 @@ class TestMetadataFilterMatchAny(xds_url_map_testcase.XdsUrlMapTestCase): "") def rpc_distribution_validate(self, test_client: XdsTestClient): - rpc_distribution = self.configure_and_send(test_client, - rpc_types=[RpcTypeUnaryCall], - num_rpcs=_NUM_RPCS) + rpc_distribution = self.configure_and_send( + test_client, rpc_types=(RpcTypeUnaryCall,), num_rpcs=_NUM_RPCS) self.assertEqual( _NUM_RPCS, rpc_distribution.unary_call_alternative_service_rpc_count) @@ -203,9 +201,8 @@ class TestMetadataFilterMatchAnyAndAll(xds_url_map_testcase.XdsUrlMapTestCase): "") def rpc_distribution_validate(self, test_client: XdsTestClient): - rpc_distribution = self.configure_and_send(test_client, - rpc_types=[RpcTypeUnaryCall], - num_rpcs=_NUM_RPCS) + rpc_distribution = self.configure_and_send( + test_client, rpc_types=(RpcTypeUnaryCall,), num_rpcs=_NUM_RPCS) self.assertEqual( _NUM_RPCS, rpc_distribution.unary_call_alternative_service_rpc_count) diff --git a/tools/run_tests/xds_k8s_test_driver/tests/url_map/path_matching_test.py b/tools/run_tests/xds_k8s_test_driver/tests/url_map/path_matching_test.py index 9b64b32472f..4e263fb7257 100644 --- a/tools/run_tests/xds_k8s_test_driver/tests/url_map/path_matching_test.py +++ b/tools/run_tests/xds_k8s_test_driver/tests/url_map/path_matching_test.py @@ -89,9 +89,8 @@ class TestFullPathMatchUnaryCall(xds_url_map_testcase.XdsUrlMapTestCase): "/grpc.testing.TestService/UnaryCall") def rpc_distribution_validate(self, test_client: XdsTestClient): - rpc_distribution = self.configure_and_send(test_client, - rpc_types=[RpcTypeUnaryCall], - num_rpcs=_NUM_RPCS) + rpc_distribution = self.configure_and_send( + test_client, rpc_types=(RpcTypeUnaryCall,), num_rpcs=_NUM_RPCS) self.assertEqual( _NUM_RPCS, rpc_distribution.unary_call_alternative_service_rpc_count) @@ -159,7 +158,7 @@ class TestRegexMatch(xds_url_map_testcase.XdsUrlMapTestCase): # Regex UnaryCall -> alternate_backend_service. 'matchRules': [{ 'regexMatch': - '^\/.*\/UnaryCall$' # Unary methods with any services. + r'^\/.*\/UnaryCall$' # Unary methods with any services. }], 'service': GcpResourceManager().alternative_backend_service() }] @@ -169,12 +168,11 @@ class TestRegexMatch(xds_url_map_testcase.XdsUrlMapTestCase): self.assertNumEndpoints(xds_config, 2) self.assertEqual( xds_config.rds['virtualHosts'][0]['routes'][0]['match']['safeRegex'] - ['regex'], '^\/.*\/UnaryCall$') + ['regex'], r'^\/.*\/UnaryCall$') def rpc_distribution_validate(self, test_client: XdsTestClient): - rpc_distribution = self.configure_and_send(test_client, - rpc_types=[RpcTypeUnaryCall], - num_rpcs=_NUM_RPCS) + rpc_distribution = self.configure_and_send( + test_client, rpc_types=(RpcTypeUnaryCall,), num_rpcs=_NUM_RPCS) self.assertEqual( _NUM_RPCS, rpc_distribution.unary_call_alternative_service_rpc_count) diff --git a/tools/run_tests/xds_k8s_test_driver/tests/url_map/retry_test.py b/tools/run_tests/xds_k8s_test_driver/tests/url_map/retry_test.py index 28c28739dcc..8929e99c1b2 100644 --- a/tools/run_tests/xds_k8s_test_driver/tests/url_map/retry_test.py +++ b/tools/run_tests/xds_k8s_test_driver/tests/url_map/retry_test.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -import time from typing import Tuple from absl import flags @@ -91,12 +90,14 @@ class TestRetryUpTo3AttemptsAndFail(xds_url_map_testcase.XdsUrlMapTestCase): self.assertEqual('unavailable', retry_config['retryOn']) def rpc_distribution_validate(self, test_client: XdsTestClient): - rpc_distribution = self.configure_and_send( - test_client, - rpc_types=[RpcTypeUnaryCall], - metadata=[(RpcTypeUnaryCall, _RPC_BEHAVIOR_HEADER_NAME, - 'error-code-14,succeed-on-retry-attempt-4')], - num_rpcs=_NUM_RPCS) + self.configure_and_send(test_client, + rpc_types=(RpcTypeUnaryCall,), + metadata=[ + (RpcTypeUnaryCall, + _RPC_BEHAVIOR_HEADER_NAME, + 'error-code-14,succeed-on-retry-attempt-4') + ], + num_rpcs=_NUM_RPCS) self.assertRpcStatusCode(test_client, expected=(ExpectedResult( rpc_type=RpcTypeUnaryCall, @@ -134,12 +135,14 @@ class TestRetryUpTo4AttemptsAndSucceed(xds_url_map_testcase.XdsUrlMapTestCase): self.assertEqual('unavailable', retry_config['retryOn']) def rpc_distribution_validate(self, test_client: XdsTestClient): - rpc_distribution = self.configure_and_send( - test_client, - rpc_types=[RpcTypeUnaryCall], - metadata=[(RpcTypeUnaryCall, _RPC_BEHAVIOR_HEADER_NAME, - 'error-code-14,succeed-on-retry-attempt-4')], - num_rpcs=_NUM_RPCS) + self.configure_and_send(test_client, + rpc_types=(RpcTypeUnaryCall,), + metadata=[ + (RpcTypeUnaryCall, + _RPC_BEHAVIOR_HEADER_NAME, + 'error-code-14,succeed-on-retry-attempt-4') + ], + num_rpcs=_NUM_RPCS) self.assertRpcStatusCode(test_client, expected=(ExpectedResult( rpc_type=RpcTypeUnaryCall, diff --git a/tools/run_tests/xds_k8s_test_driver/tests/url_map/timeout_test.py b/tools/run_tests/xds_k8s_test_driver/tests/url_map/timeout_test.py index 70c580d6bbb..c43dcd00b24 100644 --- a/tools/run_tests/xds_k8s_test_driver/tests/url_map/timeout_test.py +++ b/tools/run_tests/xds_k8s_test_driver/tests/url_map/timeout_test.py @@ -86,7 +86,7 @@ class TestTimeoutInRouteRule(_BaseXdsTimeOutTestCase): return config.server_lang == 'java' def rpc_distribution_validate(self, test_client: XdsTestClient): - rpc_distribution = self.configure_and_send( + self.configure_and_send( test_client, rpc_types=[RpcTypeUnaryCall, RpcTypeEmptyCall], # UnaryCall and EmptyCall both sleep-4. @@ -115,9 +115,9 @@ class TestTimeoutInApplication(_BaseXdsTimeOutTestCase): return config.server_lang == 'java' def rpc_distribution_validate(self, test_client: XdsTestClient): - rpc_distribution = self.configure_and_send( + self.configure_and_send( test_client, - rpc_types=[RpcTypeUnaryCall], + rpc_types=(RpcTypeUnaryCall,), # UnaryCall only with sleep-2; timeout=1s; calls timeout. metadata=((RpcTypeUnaryCall, 'rpc-behavior', 'sleep-2'),), app_timeout=1, @@ -134,10 +134,10 @@ class TestTimeoutInApplication(_BaseXdsTimeOutTestCase): class TestTimeoutNotExceeded(_BaseXdsTimeOutTestCase): def rpc_distribution_validate(self, test_client: XdsTestClient): - rpc_distribution = self.configure_and_send( + self.configure_and_send( test_client, # UnaryCall only with no sleep; calls succeed. - rpc_types=[RpcTypeUnaryCall], + rpc_types=(RpcTypeUnaryCall,), num_rpcs=_NUM_RPCS) self.assertRpcStatusCode(test_client, expected=(ExpectedResult(