diff --git a/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/k8s.py b/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/k8s.py index 869346e169c..731ded345d7 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/k8s.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/k8s.py @@ -15,7 +15,6 @@ # added to get around circular dependencies caused by k8s.py clashing with # k8s/__init__.py import datetime -import functools import json import logging import pathlib @@ -33,10 +32,12 @@ from framework.infrastructure.k8s_internal import k8s_log_collector from framework.infrastructure.k8s_internal import k8s_port_forwarder logger = logging.getLogger(__name__) + # Type aliases _HighlighterYaml = framework.helpers.highlighter.HighlighterYaml PodLogCollector = k8s_log_collector.PodLogCollector PortForwarder = k8s_port_forwarder.PortForwarder +ApiClient = client.ApiClient V1Deployment = client.V1Deployment V1ServiceAccount = client.V1ServiceAccount V1Pod = client.V1Pod @@ -44,20 +45,27 @@ V1PodList = client.V1PodList V1Service = client.V1Service V1Namespace = client.V1Namespace ApiException = client.ApiException +FailToCreateError = utils.FailToCreateError -def simple_resource_get(func): +def _simple_resource_get(func): - def wrap_not_found_return_none(*args, **kwargs): + def _wrap_simple_resource_get(self: 'KubernetesNamespace', *args, **kwargs): try: - return func(*args, **kwargs) - except client.ApiException as e: + return func(self, *args, **kwargs) + except ApiException as e: if e.status == 404: - # Ignore 404 + # Instead of trowing an error when a resource doesn't exist, + # just return None. return None + elif e.status == 401: + # 401 Unauthorized: token might be expired, attempt auth refresh + self.refresh_auth() + return func(self, *args, **kwargs) + # Reraise for anything else. raise - return wrap_not_found_return_none + return _wrap_simple_resource_get def label_dict_to_selector(labels: dict) -> str: @@ -65,19 +73,37 @@ def label_dict_to_selector(labels: dict) -> str: class KubernetesApiManager: + _client: ApiClient + context: str + apps: client.AppsV1Api + core: client.CoreV1Api + _apis: set - def __init__(self, context): + def __init__(self, context: str): self.context = context - self.client = self._cached_api_client_for_context(context) + self._client = self._new_client_from_context(context) self.apps = client.AppsV1Api(self.client) self.core = client.CoreV1Api(self.client) + self._apis = {self.apps, self.core} + + @property + def client(self) -> ApiClient: + return self._client def close(self): self.client.close() - @classmethod - @functools.lru_cache(None) - def _cached_api_client_for_context(cls, context: str) -> client.ApiClient: + def reload(self): + self.close() + self._client = self._new_client_from_context(self.context) + # Update default configuration so that modules that initialize + # ApiClient implicitly (e.g. kubernetes.watch.Watch) get the updates. + client.Configuration.set_default(self._client.configuration) + for api in self._apis: + api.api_client = self._client + + @staticmethod + def _new_client_from_context(context: str) -> ApiClient: client_instance = kubernetes.config.new_client_from_config( context=context) logger.info('Using kubernetes context "%s", active host: %s', context, @@ -101,16 +127,20 @@ class KubernetesNamespace: # pylint: disable=too-many-public-methods self.name = name self.api = api + def refresh_auth(self): + logger.info('Reloading k8s api client to refresh the auth.') + self.api.reload() + def apply_manifest(self, manifest): return utils.create_from_dict(self.api.client, manifest, namespace=self.name) - @simple_resource_get + @_simple_resource_get def get_service(self, name) -> V1Service: return self.api.core.read_namespaced_service(name, self.name) - @simple_resource_get + @_simple_resource_get def get_service_account(self, name) -> V1Service: return self.api.core.read_namespaced_service_account(name, self.name) @@ -134,7 +164,7 @@ class KubernetesNamespace: # pylint: disable=too-many-public-methods propagation_policy='Foreground', grace_period_seconds=grace_period_seconds)) - @simple_resource_get + @_simple_resource_get def get(self) -> V1Namespace: return self.api.core.read_namespace(self.name) @@ -202,7 +232,7 @@ class KubernetesNamespace: # pylint: disable=too-many-public-methods neg_zones: List[str] = neg_info['zones'] return neg_name, neg_zones - @simple_resource_get + @_simple_resource_get def get_deployment(self, name) -> V1Deployment: return self.api.apps.read_namespaced_deployment(name, self.name) diff --git a/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_base_runner.py b/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_base_runner.py index a7c42797277..7ef152093e4 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_base_runner.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_base_runner.py @@ -121,7 +121,7 @@ class KubernetesBaseRunner(base_runner.BaseRunner): cls.TEMPLATE_DIR_RELATIVE_PATH) return templates_path.joinpath(template_name).resolve() - def _create_from_template(self, template_name, **kwargs): + def _create_from_template(self, template_name, **kwargs) -> object: template_file = self._template_file_from_name(template_name) logger.debug("Loading k8s manifest template: %s", template_file) @@ -135,7 +135,26 @@ class KubernetesBaseRunner(base_runner.BaseRunner): if next(manifests, False): raise _RunnerError('Exactly one document expected in manifest ' f'{template_file}') - k8s_objects = self.k8s_namespace.apply_manifest(manifest) + + # TODO(sergiitk, b/178378578): add a retryer. + try: + k8s_objects = self.k8s_namespace.apply_manifest(manifest) + except k8s.FailToCreateError as err_create: + # Since we verified this is not a multi-doc yaml, we should + # expect a single exception. Otherwise, something went horribly + # wrong, or API promises got broken. + if len(err_create.api_exceptions) != 1: + raise + + api_exception: k8s.ApiException = err_create.api_exceptions[0] + if api_exception.status == 401: + # 401 Unauthorized: token might be expired, attempt auth refresh + self.k8s_namespace.refresh_auth() + k8s_objects = self.k8s_namespace.apply_manifest(manifest) + else: + # Reraise for anything else. + raise + if len(k8s_objects) != 1: raise _RunnerError('Expected exactly one object must created from ' f'manifest {template_file}')