diff --git a/tools/run_tests/xds_k8s_test_driver/config/common.cfg b/tools/run_tests/xds_k8s_test_driver/config/common.cfg index 8c46151e278..9e253aa4fd7 100644 --- a/tools/run_tests/xds_k8s_test_driver/config/common.cfg +++ b/tools/run_tests/xds_k8s_test_driver/config/common.cfg @@ -1,4 +1,4 @@ --namespace=interop-psm-security --td_bootstrap_image=gcr.io/trafficdirector-prod/td-grpc-bootstrap:0.10.0 ---logger_levels=__main__:DEBUG,framework:DEBUG +--logger_levels=__main__:DEBUG,framework:INFO --verbosity=0 diff --git a/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/api.py b/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/api.py index bb2cd65d064..bbf66f26b33 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/api.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/api.py @@ -11,15 +11,16 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import abc import contextlib import functools import logging -import os +from typing import Optional # Workaround: `grpc` must be imported before `google.protobuf.json_format`, # to prevent "Segmentation fault". Ref https://github.com/grpc/grpc/issues/24897 # TODO(sergiitk): Remove after #24897 is solved -import grpc +import grpc # noqa # pylint: disable=unused-import from absl import flags from google.cloud import secretmanager_v1 from google.longrunning import operations_pb2 @@ -28,6 +29,7 @@ from google.rpc import code_pb2 from googleapiclient import discovery import googleapiclient.errors import tenacity +import yaml logger = logging.getLogger(__name__) PRIVATE_API_KEY_SECRET_NAME = flags.DEFINE_string( @@ -64,6 +66,7 @@ class GcpApiManager: COMPUTE_V1_DISCOVERY_FILE.value) self.private_api_key_secret_name = (private_api_key_secret_name or PRIVATE_API_KEY_SECRET_NAME.value) + # TODO(sergiitk): add options to pass google Credentials self._exit_stack = contextlib.ExitStack() def close(self): @@ -208,13 +211,18 @@ class GcpProjectApiResource: reraise=True) return retryer(operation_request.execute) + @staticmethod + def _resource_pretty_format(body: dict) -> str: + """Return a string with pretty-printed resource body.""" + return yaml.dump(body, explicit_start=True, explicit_end=True) + -class GcpStandardCloudApiResource(GcpProjectApiResource): - DEFAULT_GLOBAL = 'global' +class GcpStandardCloudApiResource(GcpProjectApiResource, metaclass=abc.ABCMeta): + GLOBAL_LOCATION = 'global' - def parent(self, location=None): - if not location: - location = self.DEFAULT_GLOBAL + def parent(self, location: Optional[str] = GLOBAL_LOCATION): + if location is None: + location = self.GLOBAL_LOCATION return f'projects/{self.project}/locations/{location}' def resource_full_name(self, name, collection_name): @@ -222,27 +230,41 @@ class GcpStandardCloudApiResource(GcpProjectApiResource): def _create_resource(self, collection: discovery.Resource, body: dict, **kwargs): - logger.debug("Creating %s", body) + logger.info("Creating %s resource:\n%s", self.api_name, + self._resource_pretty_format(body)) create_req = collection.create(parent=self.parent(), body=body, **kwargs) self._execute(create_req) - @staticmethod - def _get_resource(collection: discovery.Resource, full_name): + @property + @abc.abstractmethod + def api_name(self) -> str: + raise NotImplementedError + + @property + @abc.abstractmethod + def api_version(self) -> str: + raise NotImplementedError + + def _get_resource(self, collection: discovery.Resource, full_name): resource = collection.get(name=full_name).execute() - logger.debug("Loaded %r", resource) + logger.info('Loaded %s:\n%s', full_name, + self._resource_pretty_format(resource)) return resource - def _delete_resource(self, collection: discovery.Resource, full_name: str): + def _delete_resource(self, collection: discovery.Resource, + full_name: str) -> bool: logger.debug("Deleting %s", full_name) try: self._execute(collection.delete(name=full_name)) + return True except googleapiclient.errors.HttpError as error: - # noinspection PyProtectedMember - reason = error._get_reason() - logger.info('Delete failed. Error: %s %s', error.resp.status, - reason) + if error.resp and error.resp.status == 404: + logger.info('%s not deleted since it does not exist', full_name) + else: + logger.warning('Failed to delete %s, %r', full_name, error) + return False def _execute(self, request, @@ -255,7 +277,7 @@ class GcpStandardCloudApiResource(GcpProjectApiResource): timeout_sec=GcpProjectApiResource._WAIT_FOR_OPERATION_SEC): op_name = operation['name'] logger.debug('Waiting for %s operation, timeout %s sec: %s', - self.__class__.__name__, timeout_sec, op_name) + self.api_name, timeout_sec, op_name) op_request = self.api.projects().locations().operations().get( name=op_name) @@ -266,4 +288,4 @@ class GcpStandardCloudApiResource(GcpProjectApiResource): logger.debug('Completed operation: %s', operation) if 'error' in operation: - raise OperationError(self.__class__.__name__, operation) + raise OperationError(self.api_name, operation) diff --git a/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/compute.py b/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/compute.py index d02499ba6e8..5fa7738c3d6 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/compute.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/compute.py @@ -30,7 +30,6 @@ class ComputeV1(gcp.api.GcpProjectApiResource): # TODO(sergiitk): move someplace better _WAIT_FOR_BACKEND_SEC = 1200 _WAIT_FOR_OPERATION_SEC = 1200 - _GCP_API_RETRIES = 5 @dataclasses.dataclass(frozen=True) class GcpResource: @@ -64,7 +63,7 @@ class ComputeV1(gcp.api.GcpProjectApiResource): }) def delete_health_check(self, name): - self._delete_resource(self.api.healthChecks(), healthCheck=name) + self._delete_resource(self.api.healthChecks(), 'healthCheck', name) def create_backend_service_traffic_director( self, @@ -110,7 +109,8 @@ class ComputeV1(gcp.api.GcpProjectApiResource): backendService=backend_service.name) def delete_backend_service(self, name): - self._delete_resource(self.api.backendServices(), backendService=name) + self._delete_resource(self.api.backendServices(), 'backendService', + name) def create_url_map( self, @@ -139,7 +139,7 @@ class ComputeV1(gcp.api.GcpProjectApiResource): }) def delete_url_map(self, name): - self._delete_resource(self.api.urlMaps(), urlMap=name) + self._delete_resource(self.api.urlMaps(), 'urlMap', name) def create_target_grpc_proxy( self, @@ -153,8 +153,8 @@ class ComputeV1(gcp.api.GcpProjectApiResource): }) def delete_target_grpc_proxy(self, name): - self._delete_resource(self.api.targetGrpcProxies(), - targetGrpcProxy=name) + self._delete_resource(self.api.targetGrpcProxies(), 'targetGrpcProxy', + name) def create_target_http_proxy( self, @@ -167,8 +167,8 @@ class ComputeV1(gcp.api.GcpProjectApiResource): }) def delete_target_http_proxy(self, name): - self._delete_resource(self.api.targetHttpProxies(), - targetHttpProxy=name) + self._delete_resource(self.api.targetHttpProxies(), 'targetHttpProxy', + name) def create_forwarding_rule( self, @@ -191,7 +191,7 @@ class ComputeV1(gcp.api.GcpProjectApiResource): def delete_forwarding_rule(self, name): self._delete_resource(self.api.globalForwardingRules(), - forwardingRule=name) + 'forwardingRule', name) @staticmethod def _network_endpoint_group_not_ready(neg): @@ -279,29 +279,38 @@ class ComputeV1(gcp.api.GcpProjectApiResource): def _get_resource(self, collection: discovery.Resource, **kwargs) -> GcpResource: resp = collection.get(project=self.project, **kwargs).execute() - logger.debug("Loaded %r", resp) + logger.info('Loaded compute resource:\n%s', + self._resource_pretty_format(resp)) return self.GcpResource(resp['name'], resp['selfLink']) def _insert_resource(self, collection: discovery.Resource, body: Dict[str, Any]) -> GcpResource: - logger.debug("Creating %s", body) + logger.info('Creating compute resource:\n%s', + self._resource_pretty_format(body)) resp = self._execute(collection.insert(project=self.project, body=body)) return self.GcpResource(body['name'], resp['targetLink']) def _patch_resource(self, collection, body, **kwargs): - logger.debug("Patching %s", body) + logger.info('Patching compute resource:\n%s', + self._resource_pretty_format(body)) self._execute( collection.patch(project=self.project, body=body, **kwargs)) - def _delete_resource(self, collection, **kwargs): + def _delete_resource(self, collection: discovery.Resource, + resource_type: str, resource_name: str) -> bool: try: - self._execute(collection.delete(project=self.project, **kwargs)) + params = {"project": self.project, resource_type: resource_name} + self._execute(collection.delete(**params)) return True except googleapiclient.errors.HttpError as error: - # noinspection PyProtectedMember - reason = error._get_reason() - logger.info('Delete failed. Error: %s %s', error.resp.status, - reason) + if error.resp and error.resp.status == 404: + logger.info( + 'Resource %s "%s" not deleted since it does not exist', + resource_type, resource_name) + else: + logger.warning('Failed to delete %s "%s", %r', resource_type, + resource_name, error) + return False @staticmethod def _operation_status_done(operation): diff --git a/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/network_security.py b/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/network_security.py index 6ca8a051301..cb60840b376 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/network_security.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/network_security.py @@ -23,8 +23,6 @@ logger = logging.getLogger(__name__) class NetworkSecurityV1Alpha1(gcp.api.GcpStandardCloudApiResource): - API_NAME = 'networksecurity' - API_VERSION = 'v1alpha1' SERVER_TLS_POLICIES = 'serverTlsPolicies' CLIENT_TLS_POLICIES = 'clientTlsPolicies' @@ -47,10 +45,18 @@ class NetworkSecurityV1Alpha1(gcp.api.GcpStandardCloudApiResource): create_time: str def __init__(self, api_manager: gcp.api.GcpApiManager, project: str): - super().__init__(api_manager.networksecurity(self.API_VERSION), project) + super().__init__(api_manager.networksecurity(self.api_version), project) # Shortcut to projects/*/locations/ endpoints self._api_locations = self.api.projects().locations() + @property + def api_name(self) -> str: + return 'networksecurity' + + @property + def api_version(self) -> str: + return 'v1alpha1' + def create_server_tls_policy(self, name, body: dict): return self._create_resource(self._api_locations.serverTlsPolicies(), body, @@ -97,7 +103,7 @@ class NetworkSecurityV1Alpha1(gcp.api.GcpStandardCloudApiResource): collection=self._api_locations.clientTlsPolicies(), full_name=self.resource_full_name(name, self.CLIENT_TLS_POLICIES)) - def _execute(self, *args, **kwargs): + def _execute(self, *args, **kwargs): # pylint: disable=signature-differs # Workaround TD bug: throttled operations are reported as internal. # Ref b/175345578 retryer = tenacity.Retrying( diff --git a/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/network_services.py b/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/network_services.py index 7b85787a2cb..b331ade69c6 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/network_services.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/network_services.py @@ -24,9 +24,6 @@ logger = logging.getLogger(__name__) class NetworkServicesV1Alpha1(gcp.api.GcpStandardCloudApiResource): - API_NAME = 'networkservices' - API_VERSION = 'v1alpha1' - DEFAULT_GLOBAL = 'global' ENDPOINT_CONFIG_SELECTORS = 'endpointConfigSelectors' @dataclasses.dataclass(frozen=True) @@ -42,10 +39,18 @@ class NetworkServicesV1Alpha1(gcp.api.GcpStandardCloudApiResource): create_time: str def __init__(self, api_manager: gcp.api.GcpApiManager, project: str): - super().__init__(api_manager.networkservices(self.API_VERSION), project) + super().__init__(api_manager.networkservices(self.api_version), project) # Shortcut to projects/*/locations/ endpoints self._api_locations = self.api.projects().locations() + @property + def api_name(self) -> str: + return 'networkservices' + + @property + def api_version(self) -> str: + return 'v1alpha1' + def create_endpoint_config_selector(self, name, body: dict): return self._create_resource( self._api_locations.endpointConfigSelectors(), @@ -74,7 +79,7 @@ class NetworkServicesV1Alpha1(gcp.api.GcpStandardCloudApiResource): full_name=self.resource_full_name(name, self.ENDPOINT_CONFIG_SELECTORS)) - def _execute(self, *args, **kwargs): + def _execute(self, *args, **kwargs): # pylint: disable=signature-differs # Workaround TD bug: throttled operations are reported as internal. # Ref b/175345578 retryer = tenacity.Retrying( diff --git a/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/k8s.py b/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/k8s.py index 287e525ca5e..ca446bc19d6 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/k8s.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/k8s.py @@ -67,7 +67,11 @@ class KubernetesApiManager: @classmethod @functools.lru_cache(None) def _cached_api_client_for_context(cls, context: str) -> client.ApiClient: - return kubernetes.config.new_client_from_config(context=context) + client_instance = kubernetes.config.new_client_from_config( + context=context) + logger.info('Using kubernetes context "%s", active host: %s', context, + client_instance.configuration.host) + return client_instance class PortForwardingError(Exception): @@ -134,8 +138,8 @@ class KubernetesNamespace: def _wait_for_deleted_service_with_retry(): service = self.get_service(name) if service is not None: - logger.info('Waiting for service %s to be deleted', - service.metadata.name) + logger.debug('Waiting for service %s to be deleted', + service.metadata.name) return service _wait_for_deleted_service_with_retry() @@ -151,13 +155,13 @@ class KubernetesNamespace: def _wait_for_deleted_service_account_with_retry(): service_account = self.get_service_account(name) if service_account is not None: - logger.info('Waiting for service account %s to be deleted', - service_account.metadata.name) + logger.debug('Waiting for service account %s to be deleted', + service_account.metadata.name) return service_account _wait_for_deleted_service_account_with_retry() - def wait_for_namespace_deleted(self, timeout_sec=240, wait_sec=2): + def wait_for_namespace_deleted(self, timeout_sec=240, wait_sec=5): @retrying.retry(retry_on_result=lambda r: r is not None, stop_max_delay=timeout_sec * 1000, @@ -165,8 +169,8 @@ class KubernetesNamespace: def _wait_for_deleted_namespace_with_retry(): namespace = self.get() if namespace is not None: - logger.info('Waiting for namespace %s to be deleted', - namespace.metadata.name) + logger.debug('Waiting for namespace %s to be deleted', + namespace.metadata.name) return namespace _wait_for_deleted_namespace_with_retry() @@ -179,7 +183,8 @@ class KubernetesNamespace: def _wait_for_service_neg(): service = self.get_service(name) if self.NEG_STATUS_META not in service.metadata.annotations: - logger.info('Waiting for service %s NEG', service.metadata.name) + logger.debug('Waiting for service %s NEG', + service.metadata.name) return False return True @@ -216,7 +221,7 @@ class KubernetesNamespace: name, count=1, timeout_sec=60, - wait_sec=1): + wait_sec=3): @retrying.retry( retry_on_result=lambda r: not self._replicas_available(r, count), @@ -224,7 +229,7 @@ class KubernetesNamespace: wait_fixed=wait_sec * 1000) def _wait_for_deployment_available_replicas(): deployment = self.get_deployment(name) - logger.info( + logger.debug( 'Waiting for deployment %s to have %s available ' 'replicas, current count %s', deployment.metadata.name, count, deployment.status.available_replicas) @@ -243,7 +248,7 @@ class KubernetesNamespace: def _wait_for_deleted_deployment_with_retry(): deployment = self.get_deployment(deployment_name) if deployment is not None: - logger.info( + logger.debug( 'Waiting for deployment %s to be deleted. ' 'Non-terminated replicas: %s', deployment.metadata.name, deployment.status.replicas) @@ -266,8 +271,8 @@ class KubernetesNamespace: wait_fixed=wait_sec * 1000) def _wait_for_pod_started(): pod = self.get_pod(pod_name) - logger.info('Waiting for pod %s to start, current phase: %s', - pod.metadata.name, pod.status.phase) + logger.debug('Waiting for pod %s to start, current phase: %s', + pod.metadata.name, pod.status.phase) return pod _wait_for_pod_started() @@ -324,8 +329,7 @@ class KubernetesNamespace: pf.kill() stdout, _stderr = pf.communicate(timeout=5) logger.info('Port forwarding stopped') - # TODO(sergiitk): make debug - logger.info('Port forwarding remaining stdout: %s', stdout) + logger.debug('Port forwarding remaining stdout: %s', stdout) @staticmethod def _pod_started(pod: V1Pod): diff --git a/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/traffic_director.py b/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/traffic_director.py index 8f5954de0dd..80c8d2e98bf 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/traffic_director.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/traffic_director.py @@ -66,7 +66,7 @@ class TrafficDirectorManager: self.backend_service: Optional[GcpResource] = None self.url_map: Optional[GcpResource] = None self.target_proxy: Optional[GcpResource] = None - # TODO(sergiitk): fix + # TODO(sergiitk): remove this flag once target proxy resource loaded self.target_proxy_is_http: bool = False self.forwarding_rule: Optional[GcpResource] = None self.backends: Set[ZonalGcpResource] = set() @@ -108,7 +108,7 @@ class TrafficDirectorManager: raise ValueError('Health check %s already created, delete it first', self.health_check.name) name = self._ns_name(self.HEALTH_CHECK_NAME) - logger.info('Creating %s Health Check %s', protocol.name, name) + logger.info('Creating %s Health Check "%s"', protocol.name, name) if protocol is HealthCheckProtocol.TCP: resource = self.compute.create_health_check_tcp( name, use_serving_port=True) @@ -123,7 +123,7 @@ class TrafficDirectorManager: name = self.health_check.name else: return - logger.info('Deleting Health Check %s', name) + logger.info('Deleting Health Check "%s"', name) self.compute.delete_health_check(name) self.health_check = None @@ -131,7 +131,7 @@ class TrafficDirectorManager: self, protocol: BackendServiceProtocol = BackendServiceProtocol.GRPC): name = self._ns_name(self.BACKEND_SERVICE_NAME) - logger.info('Creating %s Backend Service %s', protocol.name, name) + logger.info('Creating %s Backend Service "%s"', protocol.name, name) resource = self.compute.create_backend_service_traffic_director( name, health_check=self.health_check, protocol=protocol) self.backend_service = resource @@ -148,15 +148,16 @@ class TrafficDirectorManager: name = self.backend_service.name else: return - logger.info('Deleting Backend Service %s', name) + logger.info('Deleting Backend Service "%s"', name) self.compute.delete_backend_service(name) self.backend_service = None def backend_service_add_neg_backends(self, name, zones): - logger.info('Loading NEGs') + logger.info('Waiting for Network Endpoint Groups recognize endpoints.') for zone in zones: backend = self.compute.wait_for_network_endpoint_group(name, zone) - logger.info('Loaded NEG %s in zone %s', backend.name, backend.zone) + logger.info('Loaded NEG "%s" in zone %s', backend.name, + backend.zone) self.backends.add(backend) self.backend_service_add_backends() @@ -188,7 +189,7 @@ class TrafficDirectorManager: src_address = f'{src_host}:{src_port}' name = self._ns_name(self.URL_MAP_NAME) matcher_name = self._ns_name(self.URL_MAP_PATH_MATCHER_NAME) - logger.info('Creating URL map %s %s -> %s', name, src_address, + logger.info('Creating URL map "%s": %s -> %s', name, src_address, self.backend_service.name) resource = self.compute.create_url_map(name, matcher_name, [src_address], @@ -203,14 +204,14 @@ class TrafficDirectorManager: name = self.url_map.name else: return - logger.info('Deleting URL Map %s', name) + logger.info('Deleting URL Map "%s"', name) self.compute.delete_url_map(name) self.url_map = None def create_target_grpc_proxy(self): # TODO(sergiitk): merge with create_target_http_proxy() name = self._ns_name(self.TARGET_PROXY_NAME) - logger.info('Creating target GRPC proxy %s to url map %s', name, + logger.info('Creating target GRPC proxy "%s" to URL map %s', name, self.url_map.name) resource = self.compute.create_target_grpc_proxy(name, self.url_map) self.target_proxy = resource @@ -222,7 +223,7 @@ class TrafficDirectorManager: name = self.target_proxy.name else: return - logger.info('Deleting Target GRPC proxy %s', name) + logger.info('Deleting Target GRPC proxy "%s"', name) self.compute.delete_target_grpc_proxy(name) self.target_proxy = None self.target_proxy_is_http = False @@ -230,7 +231,7 @@ class TrafficDirectorManager: def create_target_http_proxy(self): # TODO(sergiitk): merge with create_target_grpc_proxy() name = self._ns_name(self.TARGET_PROXY_NAME) - logger.info('Creating target HTTP proxy %s to url map %s', name, + logger.info('Creating target HTTP proxy "%s" to url map %s', name, self.url_map.name) resource = self.compute.create_target_http_proxy(name, self.url_map) self.target_proxy = resource @@ -243,7 +244,7 @@ class TrafficDirectorManager: name = self.target_proxy.name else: return - logger.info('Deleting HTTP Target proxy %s', name) + logger.info('Deleting HTTP Target proxy "%s"', name) self.compute.delete_target_http_proxy(name) self.target_proxy = None self.target_proxy_is_http = False @@ -251,8 +252,9 @@ class TrafficDirectorManager: def create_forwarding_rule(self, src_port: int): name = self._ns_name(self.FORWARDING_RULE_NAME) src_port = int(src_port) - logging.info('Creating forwarding rule %s 0.0.0.0:%s -> %s in %s', name, - src_port, self.target_proxy.url, self.network) + logging.info( + 'Creating forwarding rule "%s" in network "%s": 0.0.0.0:%s -> %s', + name, self.network, src_port, self.target_proxy.url) resource = self.compute.create_forwarding_rule(name, src_port, self.target_proxy, self.network_url) @@ -266,7 +268,7 @@ class TrafficDirectorManager: name = self.forwarding_rule.name else: return - logger.info('Deleting Forwarding rule %s', name) + logger.info('Deleting Forwarding rule "%s"', name) self.compute.delete_forwarding_rule(name) self.forwarding_rule = None diff --git a/tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc.py b/tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc.py index ce0f7659914..ef7e0d2a2ae 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc.py @@ -46,7 +46,8 @@ class GrpcClientHelper: req: Message, wait_for_ready_sec: Optional[int] = DEFAULT_WAIT_FOR_READY_SEC, connection_timeout_sec: Optional[ - int] = DEFAULT_CONNECTION_TIMEOUT_SEC) -> Message: + int] = DEFAULT_CONNECTION_TIMEOUT_SEC, + log_level: Optional[int] = logging.DEBUG) -> Message: if wait_for_ready_sec is None: wait_for_ready_sec = self.DEFAULT_WAIT_FOR_READY_SEC if connection_timeout_sec is None: @@ -56,14 +57,14 @@ class GrpcClientHelper: rpc_callable: grpc.UnaryUnaryMultiCallable = getattr(self.stub, rpc) call_kwargs = dict(wait_for_ready=True, timeout=timeout_sec) - self._log_debug(rpc, req, call_kwargs) + self._log_rpc_request(rpc, req, call_kwargs, log_level) return rpc_callable(req, **call_kwargs) - def _log_debug(self, rpc, req, call_kwargs): - logger.debug('RPC %s.%s(request=%s(%r), %s)', - self.log_service_name, rpc, req.__class__.__name__, - json_format.MessageToDict(req), - ', '.join({f'{k}={v}' for k, v in call_kwargs.items()})) + def _log_rpc_request(self, rpc, req, call_kwargs, log_level=logging.DEBUG): + logger.log(logging.DEBUG if log_level is None else log_level, + 'RPC %s.%s(request=%s(%r), %s)', self.log_service_name, rpc, + req.__class__.__name__, json_format.MessageToDict(req), + ', '.join({f'{k}={v}' for k, v in call_kwargs.items()})) class GrpcApp: diff --git a/tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc_testing.py b/tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc_testing.py index aea22e8bf96..58fff6418a1 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc_testing.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc_testing.py @@ -15,6 +15,7 @@ This contains helpers for gRPC services defined in https://github.com/grpc/grpc/blob/master/src/proto/grpc/testing/test.proto """ +import logging from typing import Optional import grpc @@ -25,7 +26,7 @@ from src.proto.grpc.testing import messages_pb2 # Type aliases _LoadBalancerStatsRequest = messages_pb2.LoadBalancerStatsRequest -_LoadBalancerStatsResponse = messages_pb2.LoadBalancerStatsResponse +LoadBalancerStatsResponse = messages_pb2.LoadBalancerStatsResponse class LoadBalancerStatsServiceClient(framework.rpc.grpc.GrpcClientHelper): @@ -40,12 +41,13 @@ class LoadBalancerStatsServiceClient(framework.rpc.grpc.GrpcClientHelper): *, num_rpcs: int, timeout_sec: Optional[int] = STATS_PARTIAL_RESULTS_TIMEOUT_SEC, - ) -> _LoadBalancerStatsResponse: + ) -> LoadBalancerStatsResponse: if timeout_sec is None: timeout_sec = self.STATS_PARTIAL_RESULTS_TIMEOUT_SEC return self.call_unary_with_deadline(rpc='GetClientStats', - wait_for_ready_sec=timeout_sec, req=_LoadBalancerStatsRequest( num_rpcs=num_rpcs, - timeout_sec=timeout_sec)) + timeout_sec=timeout_sec), + wait_for_ready_sec=timeout_sec, + log_level=logging.INFO) diff --git a/tools/run_tests/xds_k8s_test_driver/framework/test_app/base_runner.py b/tools/run_tests/xds_k8s_test_driver/framework/test_app/base_runner.py index 85d7dbe0897..58a73d44a41 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/test_app/base_runner.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/test_app/base_runner.py @@ -28,10 +28,9 @@ class RunnerError(Exception): """Error running app""" -TEMPLATE_DIR = '../../kubernetes-manifests' - - class KubernetesBaseRunner: + TEMPLATE_DIR_NAME = 'kubernetes-manifests' + TEMPLATE_DIR_RELATIVE_PATH = f'../../{TEMPLATE_DIR_NAME}' def __init__(self, k8s_namespace, @@ -75,17 +74,19 @@ class KubernetesBaseRunner: for manifest in yml: yield manifest - @staticmethod - def _template_file_from_name(template_name): - templates_path = pathlib.Path(__file__).parent / TEMPLATE_DIR - return templates_path.joinpath(template_name).absolute() + @classmethod + def _template_file_from_name(cls, template_name): + templates_path = (pathlib.Path(__file__).parent / + cls.TEMPLATE_DIR_RELATIVE_PATH) + return templates_path.joinpath(template_name).resolve() def _create_from_template(self, template_name, **kwargs): template_file = self._template_file_from_name(template_name) - logger.info("Loading template: %s", template_file) + logger.debug("Loading k8s manifest template: %s", template_file) yaml_doc = self._render_template(template_file, **kwargs) - logger.info("Rendered template:\n%s\n", yaml_doc) + logger.info("Rendered template %s/%s:\n%s", self.TEMPLATE_DIR_NAME, + template_name, yaml_doc) manifests = self._manifests_from_str(yaml_doc) manifest = next(manifests) @@ -121,10 +122,11 @@ class KubernetesBaseRunner: raise RunnerError('Expected V1Namespace to be created ' f'from manifest {template}') if namespace.metadata.name != kwargs['namespace_name']: - raise RunnerError('Namespace created with unexpected name: ' + raise RunnerError('V1Namespace created with unexpected name: ' f'{namespace.metadata.name}') - logger.info('Deployment %s created at %s', namespace.metadata.self_link, - namespace.metadata.creation_timestamp) + logger.debug('V1Namespace %s created at %s', + namespace.metadata.self_link, + namespace.metadata.creation_timestamp) return namespace def _create_service_account(self, template, @@ -136,9 +138,9 @@ class KubernetesBaseRunner: if resource.metadata.name != kwargs['service_account_name']: raise RunnerError('V1ServiceAccount created with unexpected name: ' f'{resource.metadata.name}') - logger.info('V1ServiceAccount %s created at %s', - resource.metadata.self_link, - resource.metadata.creation_timestamp) + logger.debug('V1ServiceAccount %s created at %s', + resource.metadata.self_link, + resource.metadata.creation_timestamp) return resource def _create_deployment(self, template, **kwargs) -> k8s.V1Deployment: @@ -147,11 +149,11 @@ class KubernetesBaseRunner: raise RunnerError('Expected V1Deployment to be created ' f'from manifest {template}') if deployment.metadata.name != kwargs['deployment_name']: - raise RunnerError('Deployment created with unexpected name: ' + raise RunnerError('V1Deployment created with unexpected name: ' f'{deployment.metadata.name}') - logger.info('Deployment %s created at %s', - deployment.metadata.self_link, - deployment.metadata.creation_timestamp) + logger.debug('V1Deployment %s created at %s', + deployment.metadata.self_link, + deployment.metadata.creation_timestamp) return deployment def _create_service(self, template, **kwargs) -> k8s.V1Service: @@ -160,13 +162,14 @@ class KubernetesBaseRunner: raise RunnerError('Expected V1Service to be created ' f'from manifest {template}') if service.metadata.name != kwargs['service_name']: - raise RunnerError('Service created with unexpected name: ' + raise RunnerError('V1Service created with unexpected name: ' f'{service.metadata.name}') - logger.info('Service %s created at %s', service.metadata.self_link, - service.metadata.creation_timestamp) + logger.debug('V1Service %s created at %s', service.metadata.self_link, + service.metadata.creation_timestamp) return service def _delete_deployment(self, name, wait_for_deletion=True): + logger.info('Deleting deployment %s', name) try: self.k8s_namespace.delete_deployment(name) except k8s.ApiException as e: @@ -176,9 +179,10 @@ class KubernetesBaseRunner: if wait_for_deletion: self.k8s_namespace.wait_for_deployment_deleted(name) - logger.info('Deployment %s deleted', name) + logger.debug('Deployment %s deleted', name) def _delete_service(self, name, wait_for_deletion=True): + logger.info('Deleting service %s', name) try: self.k8s_namespace.delete_service(name) except k8s.ApiException as e: @@ -188,9 +192,10 @@ class KubernetesBaseRunner: if wait_for_deletion: self.k8s_namespace.wait_for_service_deleted(name) - logger.info('Service %s deleted', name) + logger.debug('Service %s deleted', name) def _delete_service_account(self, name, wait_for_deletion=True): + logger.info('Deleting service account %s', name) try: self.k8s_namespace.delete_service_account(name) except k8s.ApiException as e: @@ -200,9 +205,10 @@ class KubernetesBaseRunner: if wait_for_deletion: self.k8s_namespace.wait_for_service_account_deleted(name) - logger.info('Service account %s deleted', name) + logger.debug('Service account %s deleted', name) def _delete_namespace(self, wait_for_deletion=True): + logger.info('Deleting namespace %s', self.k8s_namespace.name) try: self.k8s_namespace.delete() except k8s.ApiException as e: @@ -212,10 +218,10 @@ class KubernetesBaseRunner: if wait_for_deletion: self.k8s_namespace.wait_for_namespace_deleted() - logger.info('Namespace %s deleted', self.k8s_namespace.name) + logger.debug('Namespace %s deleted', self.k8s_namespace.name) def _wait_deployment_with_available_replicas(self, name, count=1, **kwargs): - logger.info('Waiting for deployment %s to have %s available replicas', + logger.info('Waiting for deployment %s to have %s available replica(s)', name, count) self.k8s_namespace.wait_for_deployment_available_replicas( name, count, **kwargs) diff --git a/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py b/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py index 1a63d28f56c..6d6eaa714b1 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py @@ -72,7 +72,7 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp): *, num_rpcs: int, timeout_sec: Optional[int] = None, - ) -> grpc_testing._LoadBalancerStatsResponse: + ) -> grpc_testing.LoadBalancerStatsResponse: """ Shortcut to LoadBalancerStatsServiceClient.get_client_stats() """ @@ -86,19 +86,22 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp): retryer = tenacity.Retrying( retry=(tenacity.retry_if_result(lambda r: r is None) | tenacity.retry_if_exception_type()), - wait=tenacity.wait_exponential(max=10), + wait=tenacity.wait_exponential(min=10, max=25), stop=tenacity.stop_after_delay(60 * 3), reraise=True) + logger.info( + 'Waiting for client %s to establish READY gRPC channel with %s', + self.ip, self.server_target) channel = retryer(self.get_active_server_channel) - logger.info('Active server channel found: channel_id: %s, %s', - channel.ref.channel_id, channel.ref.name) - logger.debug('Server channel:\n%r', channel) + logger.info( + 'gRPC channel between client %s and %s transitioned to READY:\n%s', + self.ip, self.server_target, channel) def get_active_server_channel(self) -> Optional[grpc_channelz.Channel]: for channel in self.get_server_channels(): state: _ChannelConnectivityState = channel.data.state - logger.debug('Server channel: %s, state: %s', channel.ref.name, - _ChannelConnectivityState.State.Name(state.state)) + logger.info('Server channel: %s, state: %s', channel.ref.name, + _ChannelConnectivityState.State.Name(state.state)) if state.state is _ChannelConnectivityState.READY: return channel raise self.NotFound('Client has no active channel with the server') @@ -198,8 +201,8 @@ class KubernetesClientRunner(base_runner.KubernetesBaseRunner): # Experimental, for local debugging. if self.debug_use_port_forwarding: - logger.info('Enabling port forwarding from %s:%s', pod_ip, - self.stats_port) + logger.info('LOCAL DEV MODE: Enabling port forwarding to %s:%s', + pod_ip, self.stats_port) self.port_forwarder = self.k8s_namespace.port_forward_pod( pod, remote_port=self.stats_port) rpc_host = self.k8s_namespace.PORT_FORWARD_LOCAL_ADDRESS diff --git a/tools/run_tests/xds_k8s_test_driver/framework/test_app/server_app.py b/tools/run_tests/xds_k8s_test_driver/framework/test_app/server_app.py index 05810c66027..6f4fdc6b6cd 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/test_app/server_app.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/test_app/server_app.py @@ -233,8 +233,8 @@ class KubernetesServerRunner(base_runner.KubernetesBaseRunner): rpc_host = None # Experimental, for local debugging. if self.debug_use_port_forwarding: - logger.info('Enabling port forwarding from %s:%s', pod_ip, - maintenance_port) + logger.info('LOCAL DEV MODE: Enabling port forwarding to %s:%s', + pod_ip, maintenance_port) self.port_forwarder = self.k8s_namespace.port_forward_pod( pod, remote_port=maintenance_port) rpc_host = self.k8s_namespace.PORT_FORWARD_LOCAL_ADDRESS diff --git a/tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py b/tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py index 4a39988fb0d..02bf89715f4 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py @@ -21,10 +21,11 @@ from absl.testing import absltest from framework import xds_flags from framework import xds_k8s_flags -from framework.infrastructure import k8s from framework.infrastructure import gcp +from framework.infrastructure import k8s from framework.infrastructure import traffic_director from framework.rpc import grpc_channelz +from framework.rpc import grpc_testing from framework.test_app import client_app from framework.test_app import server_app @@ -39,6 +40,7 @@ flags.adopt_module_key_flags(xds_k8s_flags) # Type aliases XdsTestServer = server_app.XdsTestServer XdsTestClient = client_app.XdsTestClient +_LoadBalancerStatsResponse = grpc_testing.LoadBalancerStatsResponse class XdsKubernetesTestCase(absltest.TestCase): @@ -99,7 +101,7 @@ class XdsKubernetesTestCase(absltest.TestCase): cls.gcp_api_manager.close() def tearDown(self): - logger.debug('######## tearDown(): resource cleanup initiated ########') + logger.info('----- TestMethod %s teardown -----', self.id()) self.td.cleanup(force=self.force_cleanup) self.client_runner.cleanup(force=self.force_cleanup) self.server_runner.cleanup(force=self.force_cleanup, @@ -120,19 +122,22 @@ class XdsKubernetesTestCase(absltest.TestCase): test_client: XdsTestClient, num_rpcs: int = 100): # Run the test + lb_stats: _LoadBalancerStatsResponse lb_stats = test_client.get_load_balancer_stats(num_rpcs=num_rpcs) + logger.info( + 'Received LoadBalancerStatsResponse from test client %s:\n%s', + test_client.ip, lb_stats) # Check the results self.assertAllBackendsReceivedRpcs(lb_stats) self.assertFailedRpcsAtMost(lb_stats, 0) def assertAllBackendsReceivedRpcs(self, lb_stats): # TODO(sergiitk): assert backends length - logger.info(lb_stats.rpcs_by_peer) for backend, rpcs_count in lb_stats.rpcs_by_peer.items(): self.assertGreater( int(rpcs_count), 0, - msg='Backend {backend} did not receive a single RPC') + msg=f'Backend {backend} did not receive a single RPC') def assertFailedRpcsAtMost(self, lb_stats, limit): failed = int(lb_stats.num_failures) @@ -188,8 +193,6 @@ class RegularXdsKubernetesTestCase(XdsKubernetesTestCase): **kwargs) -> XdsTestClient: test_client = self.client_runner.run(server_target=test_server.xds_uri, **kwargs) - logger.debug('Waiting fot the client to establish healthy channel with ' - 'the server') test_client.wait_for_active_server_channel() return test_client @@ -263,8 +266,6 @@ class SecurityXdsKubernetesTestCase(XdsKubernetesTestCase): test_client = self.client_runner.run(server_target=test_server.xds_uri, secure_mode=True, **kwargs) - logger.debug('Waiting fot the client to establish healthy channel with ' - 'the server') test_client.wait_for_active_server_channel() return test_client diff --git a/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/client-secure.deployment.yaml b/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/client-secure.deployment.yaml index d583d6eda25..18630ad5ce9 100644 --- a/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/client-secure.deployment.yaml +++ b/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/client-secure.deployment.yaml @@ -1,3 +1,4 @@ +--- apiVersion: apps/v1 kind: Deployment metadata: @@ -78,3 +79,4 @@ spec: - name: gke-spiffe-certs-volume csi: driver: certs.spiffe.gke.io +... diff --git a/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/client.deployment.yaml b/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/client.deployment.yaml index ef1d30401b9..793bef61ffa 100644 --- a/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/client.deployment.yaml +++ b/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/client.deployment.yaml @@ -1,3 +1,4 @@ +--- apiVersion: apps/v1 kind: Deployment metadata: @@ -65,3 +66,4 @@ spec: - name: grpc-td-conf emptyDir: medium: Memory +... diff --git a/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/namespace.yaml b/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/namespace.yaml index a29aed9681f..8b8153a2042 100644 --- a/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/namespace.yaml +++ b/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/namespace.yaml @@ -1,3 +1,4 @@ +--- apiVersion: v1 kind: Namespace metadata: @@ -5,3 +6,4 @@ metadata: labels: name: ${namespace_name} owner: xds-k8s-interop-test +... diff --git a/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/server-secure.deployment.yaml b/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/server-secure.deployment.yaml index bf5fb0e2aa4..0687aa00067 100644 --- a/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/server-secure.deployment.yaml +++ b/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/server-secure.deployment.yaml @@ -1,3 +1,4 @@ +--- apiVersion: apps/v1 kind: Deployment metadata: @@ -77,3 +78,4 @@ spec: - name: gke-spiffe-certs-volume csi: driver: certs.spiffe.gke.io +... diff --git a/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/server.deployment.yaml b/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/server.deployment.yaml index 17c742bf57f..e3ace2ebbe8 100644 --- a/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/server.deployment.yaml +++ b/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/server.deployment.yaml @@ -1,3 +1,4 @@ +--- apiVersion: apps/v1 kind: Deployment metadata: @@ -32,3 +33,4 @@ spec: requests: cpu: 100m memory: 512Mi +... diff --git a/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/server.service.yaml b/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/server.service.yaml index 40eb2aebd28..376de175015 100644 --- a/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/server.service.yaml +++ b/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/server.service.yaml @@ -1,3 +1,4 @@ +--- apiVersion: v1 kind: Service metadata: @@ -15,3 +16,4 @@ spec: - port: ${test_port} protocol: TCP targetPort: ${test_port} +... diff --git a/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/service-account.yaml b/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/service-account.yaml index 773577d04da..35d99dfae51 100644 --- a/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/service-account.yaml +++ b/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/service-account.yaml @@ -1,3 +1,4 @@ +--- apiVersion: v1 kind: ServiceAccount metadata: @@ -7,3 +8,4 @@ metadata: owner: xds-k8s-interop-test annotations: iam.gke.io/gcp-service-account: ${gcp_service_account} +...