From 3817db13b642dc963bf4d2bf9284e357b91c8613 Mon Sep 17 00:00:00 2001 From: Sergii Tkachenko Date: Tue, 16 Aug 2022 13:40:55 -0700 Subject: [PATCH] xDS interop: Move k8s-specific logic out of the test app (#30591) Separates xDS Test Client/Server (represent an interface to corresponding workload running remotely) from their runners (kubernetes-specific logic to provision the workloads with prerequisites). This is a refactoring, should not change the behavior. --- tools/run_tests/xds_k8s_test_driver/README.md | 2 +- .../bin/cleanup/cleanup.py | 16 +- .../xds_k8s_test_driver/bin/run_td_setup.py | 10 +- .../bin/run_test_client.py | 8 +- .../bin/run_test_server.py | 9 +- .../framework/bootstrap_generator_testcase.py | 8 +- .../framework/test_app/client_app.py | 176 +----------- .../framework/test_app/runners/__init__.py | 13 + .../framework/test_app/runners/base_runner.py | 63 +++++ .../test_app/runners/k8s/__init__.py | 13 + .../k8s/k8s_base_runner.py} | 116 ++++---- .../runners/k8s/k8s_xds_client_runner.py | 194 ++++++++++++++ .../runners/k8s/k8s_xds_server_runner.py | 252 ++++++++++++++++++ .../framework/test_app/server_app.py | 238 +---------------- .../framework/xds_k8s_testcase.py | 6 +- .../framework/xds_url_map_test_resources.py | 21 +- .../tests/bootstrap_generator_test.py | 8 +- .../tests/change_backend_service_test.py | 5 +- .../tests/failover_test.py | 5 +- .../tests/remove_neg_test.py | 5 +- 20 files changed, 646 insertions(+), 522 deletions(-) create mode 100644 tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/__init__.py create mode 100644 tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/base_runner.py create mode 100644 tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/__init__.py rename tools/run_tests/xds_k8s_test_driver/framework/test_app/{base_runner.py => runners/k8s/k8s_base_runner.py} (82%) create mode 100644 tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_client_runner.py 
create mode 100644 tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_server_runner.py diff --git a/tools/run_tests/xds_k8s_test_driver/README.md b/tools/run_tests/xds_k8s_test_driver/README.md index 0c767784779..ea93d4cb852 100644 --- a/tools/run_tests/xds_k8s_test_driver/README.md +++ b/tools/run_tests/xds_k8s_test_driver/README.md @@ -16,7 +16,7 @@ changes to this codebase at the moment. simpler CRUD - [x] Security: manage `roles/iam.workloadIdentityUser` role grant lifecycle for dynamically-named namespaces -- [ ] Restructure `framework.test_app` and `framework.xds_k8s*` into a module +- [x] Restructure `framework.test_app` and `framework.xds_k8s*` into a module containing xDS-interop-specific logic - [ ] Address inline TODOs in code - [x] Improve README.md documentation, explain helpers in bin/ folder diff --git a/tools/run_tests/xds_k8s_test_driver/bin/cleanup/cleanup.py b/tools/run_tests/xds_k8s_test_driver/bin/cleanup/cleanup.py index 1d39cb87066..1726227d373 100755 --- a/tools/run_tests/xds_k8s_test_driver/bin/cleanup/cleanup.py +++ b/tools/run_tests/xds_k8s_test_driver/bin/cleanup/cleanup.py @@ -42,13 +42,13 @@ from framework import xds_k8s_flags from framework.infrastructure import gcp from framework.infrastructure import k8s from framework.infrastructure import traffic_director -from framework.test_app import client_app -from framework.test_app import server_app +from framework.test_app.runners.k8s import k8s_xds_client_runner +from framework.test_app.runners.k8s import k8s_xds_server_runner logger = logging.getLogger(__name__) Json = Any -KubernetesClientRunner = client_app.KubernetesClientRunner -KubernetesServerRunner = server_app.KubernetesServerRunner +_KubernetesClientRunner = k8s_xds_client_runner.KubernetesClientRunner +_KubernetesServerRunner = k8s_xds_server_runner.KubernetesServerRunner GCLOUD = os.environ.get('GCLOUD', 'gcloud') GCLOUD_CMD_TIMEOUT_S = datetime.timedelta(seconds=5).total_seconds() @@ -226,9 +226,9 @@ 
def cleanup_client(project, network, k8s_api_manager, resource_prefix, network=network, stats_port=xds_flags.CLIENT_PORT.value) - client_namespace = KubernetesClientRunner.make_namespace_name( + client_namespace = _KubernetesClientRunner.make_namespace_name( resource_prefix, resource_suffix) - client_runner = KubernetesClientRunner( + client_runner = _KubernetesClientRunner( k8s.KubernetesNamespace(k8s_api_manager, client_namespace), **runner_kwargs) @@ -248,9 +248,9 @@ def cleanup_server(project, network, k8s_api_manager, resource_prefix, gcp_service_account=gcp_service_account, network=network) - server_namespace = KubernetesServerRunner.make_namespace_name( + server_namespace = _KubernetesServerRunner.make_namespace_name( resource_prefix, resource_suffix) - server_runner = KubernetesServerRunner( + server_runner = _KubernetesServerRunner( k8s.KubernetesNamespace(k8s_api_manager, server_namespace), **runner_kwargs) diff --git a/tools/run_tests/xds_k8s_test_driver/bin/run_td_setup.py b/tools/run_tests/xds_k8s_test_driver/bin/run_td_setup.py index 016b92f9c0f..c315db6c18f 100755 --- a/tools/run_tests/xds_k8s_test_driver/bin/run_td_setup.py +++ b/tools/run_tests/xds_k8s_test_driver/bin/run_td_setup.py @@ -41,7 +41,7 @@ from framework.helpers import rand from framework.infrastructure import gcp from framework.infrastructure import k8s from framework.infrastructure import traffic_director -from framework.test_app import server_app +from framework.test_app.runners.k8s import k8s_xds_server_runner logger = logging.getLogger(__name__) # Flags @@ -64,7 +64,8 @@ flags.adopt_module_key_flags(xds_k8s_flags) # Running outside of a test suite, so require explicit resource_suffix. 
flags.mark_flag_as_required("resource_suffix") -KubernetesServerRunner = server_app.KubernetesServerRunner +# Type aliases +_KubernetesServerRunner = k8s_xds_server_runner.KubernetesServerRunner def main(argv): # pylint: disable=too-many-locals,too-many-branches,too-many-statements @@ -90,7 +91,7 @@ def main(argv): # pylint: disable=too-many-locals,too-many-branches,too-many-st server_maintenance_port = xds_flags.SERVER_MAINTENANCE_PORT.value server_xds_host = xds_flags.SERVER_XDS_HOST.value server_xds_port = xds_flags.SERVER_XDS_PORT.value - server_namespace = KubernetesServerRunner.make_namespace_name( + server_namespace = _KubernetesServerRunner.make_namespace_name( resource_prefix, resource_suffix) gcp_api_manager = gcp.api.GcpApiManager() @@ -110,7 +111,8 @@ def main(argv): # pylint: disable=too-many-locals,too-many-branches,too-many-st resource_prefix=resource_prefix, resource_suffix=resource_suffix) if server_maintenance_port is None: - server_maintenance_port = KubernetesServerRunner.DEFAULT_SECURE_MODE_MAINTENANCE_PORT + server_maintenance_port = \ + _KubernetesServerRunner.DEFAULT_SECURE_MODE_MAINTENANCE_PORT try: if command in ('create', 'cycle'): diff --git a/tools/run_tests/xds_k8s_test_driver/bin/run_test_client.py b/tools/run_tests/xds_k8s_test_driver/bin/run_test_client.py index 044855a04b5..cf7ffd85dff 100755 --- a/tools/run_tests/xds_k8s_test_driver/bin/run_test_client.py +++ b/tools/run_tests/xds_k8s_test_driver/bin/run_test_client.py @@ -20,7 +20,7 @@ from framework import xds_flags from framework import xds_k8s_flags from framework.infrastructure import gcp from framework.infrastructure import k8s -from framework.test_app import client_app +from framework.test_app.runners.k8s import k8s_xds_client_runner logger = logging.getLogger(__name__) # Flags @@ -52,7 +52,7 @@ flags.adopt_module_key_flags(xds_k8s_flags) flags.mark_flag_as_required("resource_suffix") # Type aliases -KubernetesClientRunner = client_app.KubernetesClientRunner 
+_KubernetesClientRunner = k8s_xds_client_runner.KubernetesClientRunner def main(argv): @@ -84,9 +84,9 @@ def main(argv): deployment_template='client-secure.deployment.yaml') k8s_api_manager = k8s.KubernetesApiManager(xds_k8s_flags.KUBE_CONTEXT.value) - client_namespace = KubernetesClientRunner.make_namespace_name( + client_namespace = _KubernetesClientRunner.make_namespace_name( xds_flags.RESOURCE_PREFIX.value, xds_flags.RESOURCE_SUFFIX.value) - client_runner = KubernetesClientRunner( + client_runner = _KubernetesClientRunner( k8s.KubernetesNamespace(k8s_api_manager, client_namespace), **runner_kwargs) diff --git a/tools/run_tests/xds_k8s_test_driver/bin/run_test_server.py b/tools/run_tests/xds_k8s_test_driver/bin/run_test_server.py index dd183bab287..f49b13a20d4 100755 --- a/tools/run_tests/xds_k8s_test_driver/bin/run_test_server.py +++ b/tools/run_tests/xds_k8s_test_driver/bin/run_test_server.py @@ -20,7 +20,7 @@ from framework import xds_flags from framework import xds_k8s_flags from framework.infrastructure import gcp from framework.infrastructure import k8s -from framework.test_app import server_app +from framework.test_app.runners.k8s import k8s_xds_server_runner logger = logging.getLogger(__name__) # Flags @@ -43,7 +43,8 @@ flags.adopt_module_key_flags(xds_k8s_flags) # Running outside of a test suite, so require explicit resource_suffix. 
flags.mark_flag_as_required("resource_suffix") -KubernetesServerRunner = server_app.KubernetesServerRunner +# Type aliases +_KubernetesServerRunner = k8s_xds_server_runner.KubernetesServerRunner def main(argv): @@ -78,9 +79,9 @@ def main(argv): deployment_template='server-secure.deployment.yaml') k8s_api_manager = k8s.KubernetesApiManager(xds_k8s_flags.KUBE_CONTEXT.value) - server_namespace = KubernetesServerRunner.make_namespace_name( + server_namespace = _KubernetesServerRunner.make_namespace_name( resource_prefix, resource_suffix) - server_runner = KubernetesServerRunner( + server_runner = _KubernetesServerRunner( k8s.KubernetesNamespace(k8s_api_manager, server_namespace), **runner_kwargs) diff --git a/tools/run_tests/xds_k8s_test_driver/framework/bootstrap_generator_testcase.py b/tools/run_tests/xds_k8s_test_driver/framework/bootstrap_generator_testcase.py index 0a197a429d6..4622428d910 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/bootstrap_generator_testcase.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/bootstrap_generator_testcase.py @@ -18,8 +18,8 @@ from framework import xds_k8s_testcase from framework.helpers import rand as helpers_rand from framework.infrastructure import k8s from framework.infrastructure import traffic_director -from framework.test_app import client_app -from framework.test_app import server_app +from framework.test_app.runners.k8s import k8s_xds_client_runner +from framework.test_app.runners.k8s import k8s_xds_server_runner logger = logging.getLogger(__name__) @@ -27,8 +27,8 @@ logger = logging.getLogger(__name__) TrafficDirectorManager = traffic_director.TrafficDirectorManager XdsTestServer = xds_k8s_testcase.XdsTestServer XdsTestClient = xds_k8s_testcase.XdsTestClient -KubernetesServerRunner = server_app.KubernetesServerRunner -KubernetesClientRunner = client_app.KubernetesClientRunner +KubernetesServerRunner = k8s_xds_server_runner.KubernetesServerRunner +KubernetesClientRunner = 
k8s_xds_client_runner.KubernetesClientRunner class BootstrapGeneratorBaseTest(xds_k8s_testcase.XdsKubernetesBaseTestCase): diff --git a/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py b/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py index 803f27c95bc..b28d5b77ada 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py @@ -12,10 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. """ -xDS Test Client. - -TODO(sergiitk): separate XdsTestClient and KubernetesClientRunner to individual -modules. +Provides an interface to xDS Test Client running remotely. """ import datetime import functools @@ -23,13 +20,10 @@ import logging from typing import Iterable, List, Optional from framework.helpers import retryers -from framework.infrastructure import gcp -from framework.infrastructure import k8s import framework.rpc from framework.rpc import grpc_channelz from framework.rpc import grpc_csds from framework.rpc import grpc_testing -from framework.test_app import base_runner logger = logging.getLogger(__name__) @@ -223,171 +217,3 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp): if subchannel.data.state.state is state: subchannels.append(subchannel) return subchannels - - -class KubernetesClientRunner(base_runner.KubernetesBaseRunner): - - def __init__( # pylint: disable=too-many-locals - self, - k8s_namespace, - *, - deployment_name, - image_name, - td_bootstrap_image, - gcp_api_manager: gcp.api.GcpApiManager, - gcp_project: str, - gcp_service_account: str, - xds_server_uri=None, - network='default', - service_account_name=None, - stats_port=8079, - deployment_template='client.deployment.yaml', - service_account_template='service-account.yaml', - reuse_namespace=False, - namespace_template=None, - debug_use_port_forwarding=False, - enable_workload_identity=True): - 
super().__init__(k8s_namespace, namespace_template, reuse_namespace) - - # Settings - self.deployment_name = deployment_name - self.image_name = image_name - self.stats_port = stats_port - # xDS bootstrap generator - self.td_bootstrap_image = td_bootstrap_image - self.xds_server_uri = xds_server_uri - self.network = network - self.deployment_template = deployment_template - self.debug_use_port_forwarding = debug_use_port_forwarding - self.enable_workload_identity = enable_workload_identity - # Service account settings: - # Kubernetes service account - if self.enable_workload_identity: - self.service_account_name = service_account_name or deployment_name - self.service_account_template = service_account_template - else: - self.service_account_name = None - self.service_account_template = None - # GCP. - self.gcp_project = gcp_project - self.gcp_ui_url = gcp_api_manager.gcp_ui_url - # GCP service account to map to Kubernetes service account - self.gcp_service_account = gcp_service_account - # GCP IAM API used to grant allow workload service accounts permission - # to use GCP service account identity. 
- self.gcp_iam = gcp.iam.IamV1(gcp_api_manager, gcp_project) - - # Mutable state - self.deployment: Optional[k8s.V1Deployment] = None - self.service_account: Optional[k8s.V1ServiceAccount] = None - self.port_forwarder: Optional[k8s.PortForwarder] = None - - # TODO(sergiitk): make rpc UnaryCall enum or get it from proto - def run( # pylint: disable=arguments-differ - self, - *, - server_target, - rpc='UnaryCall', - qps=25, - metadata='', - secure_mode=False, - config_mesh=None, - print_response=False) -> XdsTestClient: - logger.info( - 'Deploying xDS test client "%s" to k8s namespace %s: ' - 'server_target=%s rpc=%s qps=%s metadata=%r secure_mode=%s ' - 'print_response=%s', self.deployment_name, self.k8s_namespace.name, - server_target, rpc, qps, metadata, secure_mode, print_response) - self._logs_explorer_link(deployment_name=self.deployment_name, - namespace_name=self.k8s_namespace.name, - gcp_project=self.gcp_project, - gcp_ui_url=self.gcp_ui_url) - - super().run() - - if self.enable_workload_identity: - # Allow Kubernetes service account to use the GCP service account - # identity. 
- self._grant_workload_identity_user( - gcp_iam=self.gcp_iam, - gcp_service_account=self.gcp_service_account, - service_account_name=self.service_account_name) - - # Create service account - self.service_account = self._create_service_account( - self.service_account_template, - service_account_name=self.service_account_name, - namespace_name=self.k8s_namespace.name, - gcp_service_account=self.gcp_service_account) - - # Always create a new deployment - self.deployment = self._create_deployment( - self.deployment_template, - deployment_name=self.deployment_name, - image_name=self.image_name, - namespace_name=self.k8s_namespace.name, - service_account_name=self.service_account_name, - td_bootstrap_image=self.td_bootstrap_image, - xds_server_uri=self.xds_server_uri, - network=self.network, - stats_port=self.stats_port, - server_target=server_target, - rpc=rpc, - qps=qps, - metadata=metadata, - secure_mode=secure_mode, - config_mesh=config_mesh, - print_response=print_response) - - self._wait_deployment_with_available_replicas(self.deployment_name) - - # Load test client pod. We need only one client at the moment - pod = self.k8s_namespace.list_deployment_pods(self.deployment)[0] - self._wait_pod_started(pod.metadata.name) - pod_ip = pod.status.pod_ip - rpc_port = self.stats_port - rpc_host = None - - # Experimental, for local debugging. 
- if self.debug_use_port_forwarding: - logger.info('LOCAL DEV MODE: Enabling port forwarding to %s:%s', - pod_ip, self.stats_port) - self.port_forwarder = self.k8s_namespace.port_forward_pod( - pod, remote_port=self.stats_port) - rpc_port = self.port_forwarder.local_port - rpc_host = self.port_forwarder.local_address - - return XdsTestClient(ip=pod_ip, - rpc_port=rpc_port, - server_target=server_target, - rpc_host=rpc_host) - - def cleanup(self, *, force=False, force_namespace=False): # pylint: disable=arguments-differ - if self.port_forwarder: - self.port_forwarder.close() - self.port_forwarder = None - if self.deployment or force: - self._delete_deployment(self.deployment_name) - self.deployment = None - if self.enable_workload_identity and (self.service_account or force): - self._revoke_workload_identity_user( - gcp_iam=self.gcp_iam, - gcp_service_account=self.gcp_service_account, - service_account_name=self.service_account_name) - self._delete_service_account(self.service_account_name) - self.service_account = None - super().cleanup(force=force_namespace and force) - - @classmethod - def make_namespace_name(cls, - resource_prefix: str, - resource_suffix: str, - name: str = 'client') -> str: - """A helper to make consistent XdsTestClient kubernetes namespace name - for given resource prefix and suffix. - - Note: the idea is to intentionally produce different namespace name for - the test server, and the test client, as that closely mimics real-world - deployments. - """ - return cls._make_namespace_name(resource_prefix, resource_suffix, name) diff --git a/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/__init__.py b/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/__init__.py new file mode 100644 index 00000000000..a87ddec1edd --- /dev/null +++ b/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2022 gRPC authors. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/base_runner.py b/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/base_runner.py new file mode 100644 index 00000000000..ca3e7b0f2b3 --- /dev/null +++ b/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/base_runner.py @@ -0,0 +1,63 @@ +# Copyright 2022 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Common functionality for running xDS Test Client and Server remotely. 
+""" +from abc import ABCMeta +from abc import abstractmethod +from typing import Dict, Optional +import urllib.parse + + +class RunnerError(Exception): + """Error running xDS Test App running remotely.""" + + +class BaseRunner(metaclass=ABCMeta): + + @abstractmethod + def run(self, **kwargs): + pass + + @abstractmethod + def cleanup(self, *, force=False): + pass + + @classmethod + def _logs_explorer_link_from_params( + cls, + *, + gcp_ui_url: str, + gcp_project: str, + query: Dict[str, str], + request: Optional[Dict[str, str]] = None) -> str: + req_merged = {'query': cls._logs_explorer_query(query)} + if request is not None: + req_merged.update(request) + + req = cls._logs_explorer_request(req_merged) + return f'https://{gcp_ui_url}/logs/query;{req}?project={gcp_project}' + + @classmethod + def _logs_explorer_query(cls, query: Dict[str, str]) -> str: + return '\n'.join(f'{k}="{v}"' for k, v in query.items()) + + @classmethod + def _logs_explorer_request(cls, req: Dict[str, str]) -> str: + return ';'.join( + f'{k}={cls._logs_explorer_quote(v)}' for k, v in req.items()) + + @classmethod + def _logs_explorer_quote(cls, value: str) -> str: + return urllib.parse.quote_plus(value, safe=':') diff --git a/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/__init__.py b/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/__init__.py new file mode 100644 index 00000000000..a87ddec1edd --- /dev/null +++ b/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2022 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/tools/run_tests/xds_k8s_test_driver/framework/test_app/base_runner.py b/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_base_runner.py similarity index 82% rename from tools/run_tests/xds_k8s_test_driver/framework/test_app/base_runner.py rename to tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_base_runner.py index 2623b726fe0..ec45b4b32c0 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/test_app/base_runner.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_base_runner.py @@ -1,4 +1,4 @@ -# Copyright 2020 gRPC authors. +# Copyright 2022 gRPC authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -11,12 +11,14 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +""" +Common functionality for running xDS Test Client and Server on Kubernetes. 
+""" import contextlib import datetime import logging import pathlib -from typing import Dict, Optional -import urllib.parse +from typing import Optional import mako.template import yaml @@ -25,34 +27,20 @@ import framework.helpers.datetime import framework.helpers.highlighter from framework.infrastructure import gcp from framework.infrastructure import k8s +from framework.test_app.runners import base_runner logger = logging.getLogger(__name__) # Type aliases +_RunnerError = base_runner.RunnerError _HighlighterYaml = framework.helpers.highlighter.HighlighterYaml _helper_datetime = framework.helpers.datetime -timedelta = datetime.timedelta +_timedelta = datetime.timedelta -def _logs_explorer_query(query: Dict[str, str]) -> str: - return '\n'.join(f'{k}="{v}"' for k, v in query.items()) - - -def _logs_explorer_request(req: Dict[str, str]) -> str: - return ';'.join(f'{k}={_logs_explorer_quote(v)}' for k, v in req.items()) - - -def _logs_explorer_quote(value: str): - return urllib.parse.quote_plus(value, safe=':') - - -class RunnerError(Exception): - """Error running app""" - - -class KubernetesBaseRunner: +class KubernetesBaseRunner(base_runner.BaseRunner): TEMPLATE_DIR_NAME = 'kubernetes-manifests' - TEMPLATE_DIR_RELATIVE_PATH = f'../../{TEMPLATE_DIR_NAME}' + TEMPLATE_DIR_RELATIVE_PATH = f'../../../../{TEMPLATE_DIR_NAME}' ROLE_WORKLOAD_IDENTITY_USER = 'roles/iam.workloadIdentityUser' def __init__(self, @@ -82,20 +70,20 @@ class KubernetesBaseRunner: self.delete_namespace() self.namespace = None - @staticmethod - def _render_template(template_file, **kwargs): + @classmethod + def _render_template(cls, template_file, **kwargs): template = mako.template.Template(filename=str(template_file)) return template.render(**kwargs) - @staticmethod - def _manifests_from_yaml_file(yaml_file): + @classmethod + def _manifests_from_yaml_file(cls, yaml_file): with open(yaml_file) as f: with contextlib.closing(yaml.safe_load_all(f)) as yml: for manifest in yml: yield manifest - 
@staticmethod - def _manifests_from_str(document): + @classmethod + def _manifests_from_str(cls, document): with contextlib.closing(yaml.safe_load_all(document)) as yml: for manifest in yml: yield manifest @@ -118,12 +106,12 @@ class KubernetesBaseRunner: manifest = next(manifests) # Error out on multi-document yaml if next(manifests, False): - raise RunnerError('Exactly one document expected in manifest ' - f'{template_file}') + raise _RunnerError('Exactly one document expected in manifest ' + f'{template_file}') k8s_objects = self.k8s_namespace.apply_manifest(manifest) if len(k8s_objects) != 1: - raise RunnerError('Expected exactly one object must created from ' - f'manifest {template_file}') + raise _RunnerError('Expected exactly one object must created from ' + f'manifest {template_file}') logger.info('%s %s created', k8s_objects[0].kind, k8s_objects[0].metadata.name) @@ -145,18 +133,18 @@ class KubernetesBaseRunner: def _create_namespace(self, template, **kwargs) -> k8s.V1Namespace: namespace = self._create_from_template(template, **kwargs) if not isinstance(namespace, k8s.V1Namespace): - raise RunnerError('Expected V1Namespace to be created ' - f'from manifest {template}') + raise _RunnerError('Expected V1Namespace to be created ' + f'from manifest {template}') if namespace.metadata.name != kwargs['namespace_name']: - raise RunnerError('V1Namespace created with unexpected name: ' - f'{namespace.metadata.name}') + raise _RunnerError('V1Namespace created with unexpected name: ' + f'{namespace.metadata.name}') logger.debug('V1Namespace %s created at %s', namespace.metadata.self_link, namespace.metadata.creation_timestamp) return namespace - @staticmethod - def _get_workload_identity_member_name(project, namespace_name, + @classmethod + def _get_workload_identity_member_name(cls, project, namespace_name, service_account_name): """ Returns workload identity member name used to authenticate Kubernetes @@ -199,11 +187,11 @@ class KubernetesBaseRunner: **kwargs) -> 
k8s.V1ServiceAccount: resource = self._create_from_template(template, **kwargs) if not isinstance(resource, k8s.V1ServiceAccount): - raise RunnerError('Expected V1ServiceAccount to be created ' - f'from manifest {template}') + raise _RunnerError('Expected V1ServiceAccount to be created ' + f'from manifest {template}') if resource.metadata.name != kwargs['service_account_name']: - raise RunnerError('V1ServiceAccount created with unexpected name: ' - f'{resource.metadata.name}') + raise _RunnerError('V1ServiceAccount created with unexpected name: ' + f'{resource.metadata.name}') logger.debug('V1ServiceAccount %s created at %s', resource.metadata.self_link, resource.metadata.creation_timestamp) @@ -212,11 +200,11 @@ class KubernetesBaseRunner: def _create_deployment(self, template, **kwargs) -> k8s.V1Deployment: deployment = self._create_from_template(template, **kwargs) if not isinstance(deployment, k8s.V1Deployment): - raise RunnerError('Expected V1Deployment to be created ' - f'from manifest {template}') + raise _RunnerError('Expected V1Deployment to be created ' + f'from manifest {template}') if deployment.metadata.name != kwargs['deployment_name']: - raise RunnerError('V1Deployment created with unexpected name: ' - f'{deployment.metadata.name}') + raise _RunnerError('V1Deployment created with unexpected name: ' + f'{deployment.metadata.name}') logger.debug('V1Deployment %s created at %s', deployment.metadata.self_link, deployment.metadata.creation_timestamp) @@ -225,11 +213,11 @@ class KubernetesBaseRunner: def _create_service(self, template, **kwargs) -> k8s.V1Service: service = self._create_from_template(template, **kwargs) if not isinstance(service, k8s.V1Service): - raise RunnerError('Expected V1Service to be created ' - f'from manifest {template}') + raise _RunnerError('Expected V1Service to be created ' + f'from manifest {template}') if service.metadata.name != kwargs['service_name']: - raise RunnerError('V1Service created with unexpected name: ' - 
f'{service.metadata.name}') + raise _RunnerError('V1Service created with unexpected name: ' + f'{service.metadata.name}') logger.debug('V1Service %s created at %s', service.metadata.self_link, service.metadata.creation_timestamp) return service @@ -311,36 +299,36 @@ class KubernetesBaseRunner: logger.info("Service %s: detected NEG=%s in zones=%s", name, neg_name, neg_zones) - @staticmethod - def _logs_explorer_link(*, + @classmethod + def _logs_explorer_link(cls, + *, deployment_name: str, namespace_name: str, gcp_project: str, gcp_ui_url: str, - end_delta: Optional[timedelta] = None) -> None: + end_delta: Optional[_timedelta] = None) -> None: """Output the link to test server/client logs in GCP Logs Explorer.""" if end_delta is None: - end_delta = timedelta(hours=1) - + end_delta = _timedelta(hours=1) time_now = _helper_datetime.iso8601_utc_time() time_end = _helper_datetime.iso8601_utc_time(end_delta) - query = _logs_explorer_query({ + request = {'timeRange': f'{time_now}/{time_end}'} + query = { 'resource.type': 'k8s_container', 'resource.labels.project_id': gcp_project, 'resource.labels.container_name': deployment_name, 'resource.labels.namespace_name': namespace_name, - }) - req = _logs_explorer_request({ - 'query': query, - 'timeRange': f'{time_now}/{time_end}', - }) + } - link = f'https://{gcp_ui_url}/logs/query;{req}?project={gcp_project}' + link = cls._logs_explorer_link_from_params(gcp_ui_url=gcp_ui_url, + gcp_project=gcp_project, + query=query, + request=request) # A whitespace at the end to indicate the end of the url. 
logger.info("GCP Logs Explorer link to %s:\n%s ", deployment_name, link) - @staticmethod - def _make_namespace_name(resource_prefix: str, resource_suffix: str, + @classmethod + def _make_namespace_name(cls, resource_prefix: str, resource_suffix: str, name: str) -> str: """A helper to make consistent test app kubernetes namespace name for given resource prefix and suffix.""" diff --git a/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_client_runner.py b/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_client_runner.py new file mode 100644 index 00000000000..591022608b2 --- /dev/null +++ b/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_client_runner.py @@ -0,0 +1,194 @@ +# Copyright 2022 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Run xDS Test Client on Kubernetes. 
+""" + +import logging +from typing import Optional + +from framework.infrastructure import gcp +from framework.infrastructure import k8s +from framework.test_app.client_app import XdsTestClient +from framework.test_app.runners.k8s import k8s_base_runner + +logger = logging.getLogger(__name__) + + +class KubernetesClientRunner(k8s_base_runner.KubernetesBaseRunner): + + def __init__( # pylint: disable=too-many-locals + self, + k8s_namespace, + *, + deployment_name, + image_name, + td_bootstrap_image, + gcp_api_manager: gcp.api.GcpApiManager, + gcp_project: str, + gcp_service_account: str, + xds_server_uri=None, + network='default', + service_account_name=None, + stats_port=8079, + deployment_template='client.deployment.yaml', + service_account_template='service-account.yaml', + reuse_namespace=False, + namespace_template=None, + debug_use_port_forwarding=False, + enable_workload_identity=True): + super().__init__(k8s_namespace, namespace_template, reuse_namespace) + + # Settings + self.deployment_name = deployment_name + self.image_name = image_name + self.stats_port = stats_port + # xDS bootstrap generator + self.td_bootstrap_image = td_bootstrap_image + self.xds_server_uri = xds_server_uri + self.network = network + self.deployment_template = deployment_template + self.debug_use_port_forwarding = debug_use_port_forwarding + self.enable_workload_identity = enable_workload_identity + # Service account settings: + # Kubernetes service account + if self.enable_workload_identity: + self.service_account_name = service_account_name or deployment_name + self.service_account_template = service_account_template + else: + self.service_account_name = None + self.service_account_template = None + # GCP. 
+ self.gcp_project = gcp_project + self.gcp_ui_url = gcp_api_manager.gcp_ui_url + # GCP service account to map to Kubernetes service account + self.gcp_service_account = gcp_service_account + # GCP IAM API used to grant workload service accounts permission + # to use GCP service account identity. + self.gcp_iam = gcp.iam.IamV1(gcp_api_manager, gcp_project) + + # Mutable state + self.deployment: Optional[k8s.V1Deployment] = None + self.service_account: Optional[k8s.V1ServiceAccount] = None + self.port_forwarder: Optional[k8s.PortForwarder] = None + + # TODO(sergiitk): make rpc UnaryCall enum or get it from proto + def run( # pylint: disable=arguments-differ + self, + *, + server_target, + rpc='UnaryCall', + qps=25, + metadata='', + secure_mode=False, + config_mesh=None, + print_response=False) -> XdsTestClient: + logger.info( + 'Deploying xDS test client "%s" to k8s namespace %s: ' + 'server_target=%s rpc=%s qps=%s metadata=%r secure_mode=%s ' + 'print_response=%s', self.deployment_name, self.k8s_namespace.name, + server_target, rpc, qps, metadata, secure_mode, print_response) + self._logs_explorer_link(deployment_name=self.deployment_name, + namespace_name=self.k8s_namespace.name, + gcp_project=self.gcp_project, + gcp_ui_url=self.gcp_ui_url) + + super().run() + + if self.enable_workload_identity: + # Allow Kubernetes service account to use the GCP service account + # identity. 
+ self._grant_workload_identity_user( + gcp_iam=self.gcp_iam, + gcp_service_account=self.gcp_service_account, + service_account_name=self.service_account_name) + + # Create service account + self.service_account = self._create_service_account( + self.service_account_template, + service_account_name=self.service_account_name, + namespace_name=self.k8s_namespace.name, + gcp_service_account=self.gcp_service_account) + + # Always create a new deployment + self.deployment = self._create_deployment( + self.deployment_template, + deployment_name=self.deployment_name, + image_name=self.image_name, + namespace_name=self.k8s_namespace.name, + service_account_name=self.service_account_name, + td_bootstrap_image=self.td_bootstrap_image, + xds_server_uri=self.xds_server_uri, + network=self.network, + stats_port=self.stats_port, + server_target=server_target, + rpc=rpc, + qps=qps, + metadata=metadata, + secure_mode=secure_mode, + config_mesh=config_mesh, + print_response=print_response) + + self._wait_deployment_with_available_replicas(self.deployment_name) + + # Load test client pod. We need only one client at the moment + pod = self.k8s_namespace.list_deployment_pods(self.deployment)[0] + self._wait_pod_started(pod.metadata.name) + pod_ip = pod.status.pod_ip + rpc_port = self.stats_port + rpc_host = None + + # Experimental, for local debugging. 
+ if self.debug_use_port_forwarding: + logger.info('LOCAL DEV MODE: Enabling port forwarding to %s:%s', + pod_ip, self.stats_port) + self.port_forwarder = self.k8s_namespace.port_forward_pod( + pod, remote_port=self.stats_port) + rpc_port = self.port_forwarder.local_port + rpc_host = self.port_forwarder.local_address + + return XdsTestClient(ip=pod_ip, + rpc_port=rpc_port, + server_target=server_target, + rpc_host=rpc_host) + + def cleanup(self, *, force=False, force_namespace=False): # pylint: disable=arguments-differ + if self.port_forwarder: + self.port_forwarder.close() + self.port_forwarder = None + if self.deployment or force: + self._delete_deployment(self.deployment_name) + self.deployment = None + if self.enable_workload_identity and (self.service_account or force): + self._revoke_workload_identity_user( + gcp_iam=self.gcp_iam, + gcp_service_account=self.gcp_service_account, + service_account_name=self.service_account_name) + self._delete_service_account(self.service_account_name) + self.service_account = None + super().cleanup(force=force_namespace and force) + + @classmethod + def make_namespace_name(cls, + resource_prefix: str, + resource_suffix: str, + name: str = 'client') -> str: + """A helper to make consistent XdsTestClient kubernetes namespace name + for given resource prefix and suffix. + + Note: the idea is to intentionally produce different namespace name for + the test server, and the test client, as that closely mimics real-world + deployments. + """ + return cls._make_namespace_name(resource_prefix, resource_suffix, name) diff --git a/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_server_runner.py b/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_server_runner.py new file mode 100644 index 00000000000..e10114841ba --- /dev/null +++ b/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_server_runner.py @@ -0,0 +1,252 @@ +# Copyright 2022 gRPC authors. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Run xDS Test Server on Kubernetes. +""" +import logging +from typing import List, Optional + +from framework.infrastructure import gcp +from framework.infrastructure import k8s +from framework.test_app.runners.k8s import k8s_base_runner +from framework.test_app.server_app import XdsTestServer + +logger = logging.getLogger(__name__) + + +class KubernetesServerRunner(k8s_base_runner.KubernetesBaseRunner): + DEFAULT_TEST_PORT = 8080 + DEFAULT_MAINTENANCE_PORT = 8080 + DEFAULT_SECURE_MODE_MAINTENANCE_PORT = 8081 + + def __init__( # pylint: disable=too-many-locals + self, + k8s_namespace, + *, + deployment_name, + image_name, + td_bootstrap_image, + gcp_api_manager: gcp.api.GcpApiManager, + gcp_project: str, + gcp_service_account: str, + service_account_name=None, + service_name=None, + neg_name=None, + xds_server_uri=None, + network='default', + deployment_template='server.deployment.yaml', + service_account_template='service-account.yaml', + service_template='server.service.yaml', + reuse_service=False, + reuse_namespace=False, + namespace_template=None, + debug_use_port_forwarding=False, + enable_workload_identity=True): + super().__init__(k8s_namespace, namespace_template, reuse_namespace) + + # Settings + self.deployment_name = deployment_name + self.image_name = image_name + self.service_name = service_name or deployment_name + # xDS bootstrap generator + self.td_bootstrap_image = td_bootstrap_image + 
self.xds_server_uri = xds_server_uri + # This only works in k8s >= 1.18.10-gke.600 + # https://cloud.google.com/kubernetes-engine/docs/how-to/standalone-neg#naming_negs + self.neg_name = neg_name or (f'{self.k8s_namespace.name}-' + f'{self.service_name}') + self.network = network + self.deployment_template = deployment_template + self.service_template = service_template + self.reuse_service = reuse_service + self.debug_use_port_forwarding = debug_use_port_forwarding + self.enable_workload_identity = enable_workload_identity + # Service account settings: + # Kubernetes service account + if self.enable_workload_identity: + self.service_account_name = service_account_name or deployment_name + self.service_account_template = service_account_template + else: + self.service_account_name = None + self.service_account_template = None + + # GCP. + self.gcp_project = gcp_project + self.gcp_ui_url = gcp_api_manager.gcp_ui_url + # GCP service account to map to Kubernetes service account + self.gcp_service_account = gcp_service_account + # GCP IAM API used to grant workload service accounts permission + # to use GCP service account identity. + self.gcp_iam = gcp.iam.IamV1(gcp_api_manager, gcp_project) + + # Mutable state + self.deployment: Optional[k8s.V1Deployment] = None + self.service_account: Optional[k8s.V1ServiceAccount] = None + self.service: Optional[k8s.V1Service] = None + self.port_forwarders: List[k8s.PortForwarder] = [] + + def run( # pylint: disable=arguments-differ + self, + *, + test_port=DEFAULT_TEST_PORT, + maintenance_port=None, + secure_mode=False, + server_id=None, + replica_count=1) -> List[XdsTestServer]: + # Implementation detail: in secure mode, maintenance ("backchannel") + # port must be different from the test port so communication with + # maintenance services can be reached independently from the security + # configuration under test. 
+ if maintenance_port is None: + if not secure_mode: + maintenance_port = self.DEFAULT_MAINTENANCE_PORT + else: + maintenance_port = self.DEFAULT_SECURE_MODE_MAINTENANCE_PORT + + if secure_mode and maintenance_port == test_port: + raise ValueError('port and maintenance_port must be different ' + 'when running test server in secure mode') + # To avoid bugs with comparing wrong types. + if not (isinstance(test_port, int) and + isinstance(maintenance_port, int)): + raise TypeError('Port numbers must be integer') + + if secure_mode and not self.enable_workload_identity: + raise ValueError('Secure mode requires Workload Identity enabled.') + + logger.info( + 'Deploying xDS test server "%s" to k8s namespace %s: test_port=%s ' + 'maintenance_port=%s secure_mode=%s server_id=%s replica_count=%s', + self.deployment_name, self.k8s_namespace.name, test_port, + maintenance_port, secure_mode, server_id, replica_count) + self._logs_explorer_link(deployment_name=self.deployment_name, + namespace_name=self.k8s_namespace.name, + gcp_project=self.gcp_project, + gcp_ui_url=self.gcp_ui_url) + + # Create namespace. + super().run() + + # Reuse existing service if requested, create a new one when missing. + # Useful for debugging to avoid NEG losing relation to deleted service. + if self.reuse_service: + self.service = self._reuse_service(self.service_name) + if not self.service: + self.service = self._create_service( + self.service_template, + service_name=self.service_name, + namespace_name=self.k8s_namespace.name, + deployment_name=self.deployment_name, + neg_name=self.neg_name, + test_port=test_port) + self._wait_service_neg(self.service_name, test_port) + + if self.enable_workload_identity: + # Allow Kubernetes service account to use the GCP service account + # identity. 
+ self._grant_workload_identity_user( + gcp_iam=self.gcp_iam, + gcp_service_account=self.gcp_service_account, + service_account_name=self.service_account_name) + + # Create service account + self.service_account = self._create_service_account( + self.service_account_template, + service_account_name=self.service_account_name, + namespace_name=self.k8s_namespace.name, + gcp_service_account=self.gcp_service_account) + + # Always create a new deployment + self.deployment = self._create_deployment( + self.deployment_template, + deployment_name=self.deployment_name, + image_name=self.image_name, + namespace_name=self.k8s_namespace.name, + service_account_name=self.service_account_name, + td_bootstrap_image=self.td_bootstrap_image, + xds_server_uri=self.xds_server_uri, + network=self.network, + replica_count=replica_count, + test_port=test_port, + maintenance_port=maintenance_port, + server_id=server_id, + secure_mode=secure_mode) + + self._wait_deployment_with_available_replicas(self.deployment_name, + replica_count) + + # Wait for pods running + pods = self.k8s_namespace.list_deployment_pods(self.deployment) + + servers = [] + for pod in pods: + pod_name = pod.metadata.name + self._wait_pod_started(pod_name) + + pod_ip = pod.status.pod_ip + rpc_host = None + # Experimental, for local debugging. 
+ local_port = maintenance_port + if self.debug_use_port_forwarding: + logger.info('LOCAL DEV MODE: Enabling port forwarding to %s:%s', + pod_ip, maintenance_port) + port_forwarder = self.k8s_namespace.port_forward_pod( + pod, remote_port=maintenance_port) + self.port_forwarders.append(port_forwarder) + local_port = port_forwarder.local_port + rpc_host = port_forwarder.local_address + + servers.append( + XdsTestServer(ip=pod_ip, + rpc_port=test_port, + maintenance_port=local_port, + secure_mode=secure_mode, + server_id=server_id, + rpc_host=rpc_host, + pod_name=pod_name)) + return servers + + def cleanup(self, *, force=False, force_namespace=False): # pylint: disable=arguments-differ + if self.port_forwarders: + for port_forwarder in self.port_forwarders: + port_forwarder.close() + self.port_forwarders = [] + if self.deployment or force: + self._delete_deployment(self.deployment_name) + self.deployment = None + if (self.service and not self.reuse_service) or force: + self._delete_service(self.service_name) + self.service = None + if self.enable_workload_identity and (self.service_account or force): + self._revoke_workload_identity_user( + gcp_iam=self.gcp_iam, + gcp_service_account=self.gcp_service_account, + service_account_name=self.service_account_name) + self._delete_service_account(self.service_account_name) + self.service_account = None + super().cleanup(force=(force_namespace and force)) + + @classmethod + def make_namespace_name(cls, + resource_prefix: str, + resource_suffix: str, + name: str = 'server') -> str: + """A helper to make consistent XdsTestServer kubernetes namespace name + for given resource prefix and suffix. + + Note: the idea is to intentionally produce different namespace name for + the test server, and the test client, as that closely mimics real-world + deployments. 
+ """ + return cls._make_namespace_name(resource_prefix, resource_suffix, name) diff --git a/tools/run_tests/xds_k8s_test_driver/framework/test_app/server_app.py b/tools/run_tests/xds_k8s_test_driver/framework/test_app/server_app.py index dbeea70918a..8349c6bb6a3 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/test_app/server_app.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/test_app/server_app.py @@ -12,21 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. """ -xDS Test Server. - -TODO(sergiitk): separate XdsTestServer and KubernetesServerRunner to individual -modules. +Provides an interface to xDS Test Server running remotely. """ import functools import logging -from typing import Iterator, List, Optional +from typing import Iterator, Optional -from framework.infrastructure import gcp -from framework.infrastructure import k8s import framework.rpc from framework.rpc import grpc_channelz from framework.rpc import grpc_testing -from framework.test_app import base_runner logger = logging.getLogger(__name__) @@ -150,231 +144,3 @@ class XdsTestServer(framework.rpc.grpc.GrpcApp): self.channelz.sock_addresses_pretty(server_socket), self.channelz.sock_addresses_pretty(client_socket)) return server_socket - - -class KubernetesServerRunner(base_runner.KubernetesBaseRunner): - DEFAULT_TEST_PORT = 8080 - DEFAULT_MAINTENANCE_PORT = 8080 - DEFAULT_SECURE_MODE_MAINTENANCE_PORT = 8081 - - def __init__( # pylint: disable=too-many-locals - self, - k8s_namespace, - *, - deployment_name, - image_name, - td_bootstrap_image, - gcp_api_manager: gcp.api.GcpApiManager, - gcp_project: str, - gcp_service_account: str, - service_account_name=None, - service_name=None, - neg_name=None, - xds_server_uri=None, - network='default', - deployment_template='server.deployment.yaml', - service_account_template='service-account.yaml', - service_template='server.service.yaml', - reuse_service=False, - 
reuse_namespace=False, - namespace_template=None, - debug_use_port_forwarding=False, - enable_workload_identity=True): - super().__init__(k8s_namespace, namespace_template, reuse_namespace) - - # Settings - self.deployment_name = deployment_name - self.image_name = image_name - self.service_name = service_name or deployment_name - # xDS bootstrap generator - self.td_bootstrap_image = td_bootstrap_image - self.xds_server_uri = xds_server_uri - # This only works in k8s >= 1.18.10-gke.600 - # https://cloud.google.com/kubernetes-engine/docs/how-to/standalone-neg#naming_negs - self.neg_name = neg_name or (f'{self.k8s_namespace.name}-' - f'{self.service_name}') - self.network = network - self.deployment_template = deployment_template - self.service_template = service_template - self.reuse_service = reuse_service - self.debug_use_port_forwarding = debug_use_port_forwarding - self.enable_workload_identity = enable_workload_identity - # Service account settings: - # Kubernetes service account - if self.enable_workload_identity: - self.service_account_name = service_account_name or deployment_name - self.service_account_template = service_account_template - else: - self.service_account_name = None - self.service_account_template = None - - # GCP. - self.gcp_project = gcp_project - self.gcp_ui_url = gcp_api_manager.gcp_ui_url - # GCP service account to map to Kubernetes service account - self.gcp_service_account = gcp_service_account - # GCP IAM API used to grant allow workload service accounts permission - # to use GCP service account identity. 
- self.gcp_iam = gcp.iam.IamV1(gcp_api_manager, gcp_project) - - # Mutable state - self.deployment: Optional[k8s.V1Deployment] = None - self.service_account: Optional[k8s.V1ServiceAccount] = None - self.service: Optional[k8s.V1Service] = None - self.port_forwarders: List[k8s.PortForwarder] = [] - - def run( # pylint: disable=arguments-differ - self, - *, - test_port=DEFAULT_TEST_PORT, - maintenance_port=None, - secure_mode=False, - server_id=None, - replica_count=1) -> List[XdsTestServer]: - # Implementation detail: in secure mode, maintenance ("backchannel") - # port must be different from the test port so communication with - # maintenance services can be reached independently from the security - # configuration under test. - if maintenance_port is None: - if not secure_mode: - maintenance_port = self.DEFAULT_MAINTENANCE_PORT - else: - maintenance_port = self.DEFAULT_SECURE_MODE_MAINTENANCE_PORT - - if secure_mode and maintenance_port == test_port: - raise ValueError('port and maintenance_port must be different ' - 'when running test server in secure mode') - # To avoid bugs with comparing wrong types. - if not (isinstance(test_port, int) and - isinstance(maintenance_port, int)): - raise TypeError('Port numbers must be integer') - - if secure_mode and not self.enable_workload_identity: - raise ValueError('Secure mode requires Workload Identity enabled.') - - logger.info( - 'Deploying xDS test server "%s" to k8s namespace %s: test_port=%s ' - 'maintenance_port=%s secure_mode=%s server_id=%s replica_count=%s', - self.deployment_name, self.k8s_namespace.name, test_port, - maintenance_port, secure_mode, server_id, replica_count) - self._logs_explorer_link(deployment_name=self.deployment_name, - namespace_name=self.k8s_namespace.name, - gcp_project=self.gcp_project, - gcp_ui_url=self.gcp_ui_url) - - # Create namespace. - super().run() - - # Reuse existing if requested, create a new deployment when missing. 
- # Useful for debugging to avoid NEG loosing relation to deleted service. - if self.reuse_service: - self.service = self._reuse_service(self.service_name) - if not self.service: - self.service = self._create_service( - self.service_template, - service_name=self.service_name, - namespace_name=self.k8s_namespace.name, - deployment_name=self.deployment_name, - neg_name=self.neg_name, - test_port=test_port) - self._wait_service_neg(self.service_name, test_port) - - if self.enable_workload_identity: - # Allow Kubernetes service account to use the GCP service account - # identity. - self._grant_workload_identity_user( - gcp_iam=self.gcp_iam, - gcp_service_account=self.gcp_service_account, - service_account_name=self.service_account_name) - - # Create service account - self.service_account = self._create_service_account( - self.service_account_template, - service_account_name=self.service_account_name, - namespace_name=self.k8s_namespace.name, - gcp_service_account=self.gcp_service_account) - - # Always create a new deployment - self.deployment = self._create_deployment( - self.deployment_template, - deployment_name=self.deployment_name, - image_name=self.image_name, - namespace_name=self.k8s_namespace.name, - service_account_name=self.service_account_name, - td_bootstrap_image=self.td_bootstrap_image, - xds_server_uri=self.xds_server_uri, - network=self.network, - replica_count=replica_count, - test_port=test_port, - maintenance_port=maintenance_port, - server_id=server_id, - secure_mode=secure_mode) - - self._wait_deployment_with_available_replicas(self.deployment_name, - replica_count) - - # Wait for pods running - pods = self.k8s_namespace.list_deployment_pods(self.deployment) - - servers = [] - for pod in pods: - pod_name = pod.metadata.name - self._wait_pod_started(pod_name) - - pod_ip = pod.status.pod_ip - rpc_host = None - # Experimental, for local debugging. 
- local_port = maintenance_port - if self.debug_use_port_forwarding: - logger.info('LOCAL DEV MODE: Enabling port forwarding to %s:%s', - pod_ip, maintenance_port) - port_forwarder = self.k8s_namespace.port_forward_pod( - pod, remote_port=maintenance_port) - self.port_forwarders.append(port_forwarder) - local_port = port_forwarder.local_port - rpc_host = port_forwarder.local_address - - servers.append( - XdsTestServer(ip=pod_ip, - rpc_port=test_port, - maintenance_port=local_port, - secure_mode=secure_mode, - server_id=server_id, - rpc_host=rpc_host, - pod_name=pod_name)) - return servers - - def cleanup(self, *, force=False, force_namespace=False): # pylint: disable=arguments-differ - if self.port_forwarders: - for port_forwarder in self.port_forwarders: - port_forwarder.close() - self.port_forwarders = [] - if self.deployment or force: - self._delete_deployment(self.deployment_name) - self.deployment = None - if (self.service and not self.reuse_service) or force: - self._delete_service(self.service_name) - self.service = None - if self.enable_workload_identity and (self.service_account or force): - self._revoke_workload_identity_user( - gcp_iam=self.gcp_iam, - gcp_service_account=self.gcp_service_account, - service_account_name=self.service_account_name) - self._delete_service_account(self.service_account_name) - self.service_account = None - super().cleanup(force=(force_namespace and force)) - - @classmethod - def make_namespace_name(cls, - resource_prefix: str, - resource_suffix: str, - name: str = 'server') -> str: - """A helper to make consistent XdsTestServer kubernetes namespace name - for given resource prefix and suffix. - - Note: the idea is to intentionally produce different namespace name for - the test server, and the test client, as that closely mimics real-world - deployments. 
- :rtype: object - """ - return cls._make_namespace_name(resource_prefix, resource_suffix, name) diff --git a/tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py b/tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py index 5730e0e34d7..61df158b10e 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py @@ -40,6 +40,8 @@ from framework.rpc import grpc_csds from framework.rpc import grpc_testing from framework.test_app import client_app from framework.test_app import server_app +from framework.test_app.runners.k8s import k8s_xds_client_runner +from framework.test_app.runners.k8s import k8s_xds_server_runner logger = logging.getLogger(__name__) # TODO(yashkt): We will no longer need this flag once Core exposes local certs @@ -57,8 +59,8 @@ TrafficDirectorAppNetManager = traffic_director.TrafficDirectorAppNetManager TrafficDirectorSecureManager = traffic_director.TrafficDirectorSecureManager XdsTestServer = server_app.XdsTestServer XdsTestClient = client_app.XdsTestClient -KubernetesServerRunner = server_app.KubernetesServerRunner -KubernetesClientRunner = client_app.KubernetesClientRunner +KubernetesServerRunner = k8s_xds_server_runner.KubernetesServerRunner +KubernetesClientRunner = k8s_xds_client_runner.KubernetesClientRunner LoadBalancerStatsResponse = grpc_testing.LoadBalancerStatsResponse _ChannelState = grpc_channelz.ChannelState _timedelta = datetime.timedelta diff --git a/tools/run_tests/xds_k8s_test_driver/framework/xds_url_map_test_resources.py b/tools/run_tests/xds_k8s_test_driver/framework/xds_url_map_test_resources.py index 08fe6d5f5c1..180c7ca52e1 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/xds_url_map_test_resources.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/xds_url_map_test_resources.py @@ -26,8 +26,8 @@ import framework.helpers.rand from framework.infrastructure import gcp from framework.infrastructure 
import k8s from framework.infrastructure import traffic_director -from framework.test_app import client_app -from framework.test_app import server_app +from framework.test_app.runners.k8s import k8s_xds_client_runner +from framework.test_app.runners.k8s import k8s_xds_server_runner flags.adopt_module_key_flags(xds_flags) flags.adopt_module_key_flags(xds_k8s_flags) @@ -38,6 +38,8 @@ STRATEGY = flags.DEFINE_enum('strategy', help='Strategy of GCP resources management') # Type alias +_KubernetesServerRunner = k8s_xds_server_runner.KubernetesServerRunner +_KubernetesClientRunner = k8s_xds_client_runner.KubernetesClientRunner UrlMapType = Any HostRule = Any PathMatcher = Any @@ -165,7 +167,7 @@ class GcpResourceManager(metaclass=_MetaSingletonAndAbslFlags): self.k8s_namespace = k8s.KubernetesNamespace(self.k8s_api_manager, self.resource_prefix) # Kubernetes Test Servers - self.test_server_runner = server_app.KubernetesServerRunner( + self.test_server_runner = _KubernetesServerRunner( self.k8s_namespace, deployment_name=self.server_name, image_name=self.server_image, @@ -176,7 +178,7 @@ class GcpResourceManager(metaclass=_MetaSingletonAndAbslFlags): xds_server_uri=self.xds_server_uri, network=self.network, enable_workload_identity=self.enable_workload_identity) - self.test_server_alternative_runner = server_app.KubernetesServerRunner( + self.test_server_alternative_runner = _KubernetesServerRunner( self.k8s_namespace, deployment_name=self.server_name + '-alternative', image_name=self.server_image, @@ -188,7 +190,7 @@ class GcpResourceManager(metaclass=_MetaSingletonAndAbslFlags): network=self.network, enable_workload_identity=self.enable_workload_identity, reuse_namespace=True) - self.test_server_affinity_runner = server_app.KubernetesServerRunner( + self.test_server_affinity_runner = _KubernetesServerRunner( self.k8s_namespace, deployment_name=self.server_name + '-affinity', image_name=self.server_image, @@ -211,11 +213,10 @@ class 
GcpResourceManager(metaclass=_MetaSingletonAndAbslFlags): logging.info('GcpResourceManager: client_namespace_suffix=%s', client_namespace_suffix) # Kubernetes Test Client - return client_app.KubernetesClientRunner( - k8s.KubernetesNamespace( - self.k8s_api_manager, - client_app.KubernetesClientRunner.make_namespace_name( - self.resource_prefix, client_namespace_suffix)), + namespace_name = _KubernetesClientRunner.make_namespace_name( + self.resource_prefix, client_namespace_suffix) + return _KubernetesClientRunner( + k8s.KubernetesNamespace(self.k8s_api_manager, namespace_name), deployment_name=self.client_name, image_name=self.client_image, gcp_project=self.project, diff --git a/tools/run_tests/xds_k8s_test_driver/tests/bootstrap_generator_test.py b/tools/run_tests/xds_k8s_test_driver/tests/bootstrap_generator_test.py index 22a13aecc87..3fe80085b79 100644 --- a/tools/run_tests/xds_k8s_test_driver/tests/bootstrap_generator_test.py +++ b/tools/run_tests/xds_k8s_test_driver/tests/bootstrap_generator_test.py @@ -22,8 +22,8 @@ from absl.testing import parameterized from framework import bootstrap_generator_testcase from framework import xds_k8s_testcase from framework.helpers import retryers -from framework.test_app import client_app -from framework.test_app import server_app +from framework.test_app.runners.k8s import k8s_xds_client_runner +from framework.test_app.runners.k8s import k8s_xds_server_runner logger = logging.getLogger(__name__) flags.adopt_module_key_flags(xds_k8s_testcase) @@ -31,8 +31,8 @@ flags.adopt_module_key_flags(xds_k8s_testcase) # Type aliases XdsTestServer = xds_k8s_testcase.XdsTestServer XdsTestClient = xds_k8s_testcase.XdsTestClient -KubernetesServerRunner = server_app.KubernetesServerRunner -KubernetesClientRunner = client_app.KubernetesClientRunner +KubernetesServerRunner = k8s_xds_server_runner.KubernetesServerRunner +KubernetesClientRunner = k8s_xds_client_runner.KubernetesClientRunner _timedelta = datetime.timedelta diff --git 
a/tools/run_tests/xds_k8s_test_driver/tests/change_backend_service_test.py b/tools/run_tests/xds_k8s_test_driver/tests/change_backend_service_test.py index 8c382a44ff6..d7814c9136b 100644 --- a/tools/run_tests/xds_k8s_test_driver/tests/change_backend_service_test.py +++ b/tools/run_tests/xds_k8s_test_driver/tests/change_backend_service_test.py @@ -19,7 +19,7 @@ from absl.testing import absltest from framework import xds_k8s_testcase from framework.infrastructure import k8s -from framework.test_app import server_app +from framework.test_app.runners.k8s import k8s_xds_server_runner logger = logging.getLogger(__name__) flags.adopt_module_key_flags(xds_k8s_testcase) @@ -27,6 +27,7 @@ flags.adopt_module_key_flags(xds_k8s_testcase) # Type aliases _XdsTestServer = xds_k8s_testcase.XdsTestServer _XdsTestClient = xds_k8s_testcase.XdsTestClient +_KubernetesServerRunner = k8s_xds_server_runner.KubernetesServerRunner class ChangeBackendServiceTest(xds_k8s_testcase.RegularXdsKubernetesTestCase): @@ -35,7 +36,7 @@ class ChangeBackendServiceTest(xds_k8s_testcase.RegularXdsKubernetesTestCase): super().setUp() self.alternate_k8s_namespace = k8s.KubernetesNamespace( self.k8s_api_manager, self.server_namespace) - self.alternate_server_runner = server_app.KubernetesServerRunner( + self.alternate_server_runner = _KubernetesServerRunner( self.alternate_k8s_namespace, deployment_name=self.server_name + '-alt', image_name=self.server_image, diff --git a/tools/run_tests/xds_k8s_test_driver/tests/failover_test.py b/tools/run_tests/xds_k8s_test_driver/tests/failover_test.py index f9388c2280d..aa3926f82d6 100644 --- a/tools/run_tests/xds_k8s_test_driver/tests/failover_test.py +++ b/tools/run_tests/xds_k8s_test_driver/tests/failover_test.py @@ -19,7 +19,7 @@ from absl.testing import absltest from framework import xds_k8s_testcase from framework.infrastructure import k8s -from framework.test_app import server_app +from framework.test_app.runners.k8s import k8s_xds_server_runner logger = 
logging.getLogger(__name__) flags.adopt_module_key_flags(xds_k8s_testcase) @@ -27,6 +27,7 @@ flags.adopt_module_key_flags(xds_k8s_testcase) # Type aliases _XdsTestServer = xds_k8s_testcase.XdsTestServer _XdsTestClient = xds_k8s_testcase.XdsTestClient +_KubernetesServerRunner = k8s_xds_server_runner.KubernetesServerRunner class FailoverTest(xds_k8s_testcase.RegularXdsKubernetesTestCase): @@ -35,7 +36,7 @@ class FailoverTest(xds_k8s_testcase.RegularXdsKubernetesTestCase): def setUp(self): super().setUp() - self.secondary_server_runner = server_app.KubernetesServerRunner( + self.secondary_server_runner = _KubernetesServerRunner( k8s.KubernetesNamespace(self.secondary_k8s_api_manager, self.server_namespace), deployment_name=self.server_name + '-alt', diff --git a/tools/run_tests/xds_k8s_test_driver/tests/remove_neg_test.py b/tools/run_tests/xds_k8s_test_driver/tests/remove_neg_test.py index 9fb44088e99..8b1a5a4b4d4 100644 --- a/tools/run_tests/xds_k8s_test_driver/tests/remove_neg_test.py +++ b/tools/run_tests/xds_k8s_test_driver/tests/remove_neg_test.py @@ -19,7 +19,7 @@ from absl.testing import absltest from framework import xds_k8s_testcase from framework.infrastructure import k8s -from framework.test_app import server_app +from framework.test_app.runners.k8s import k8s_xds_server_runner logger = logging.getLogger(__name__) flags.adopt_module_key_flags(xds_k8s_testcase) @@ -27,13 +27,14 @@ flags.adopt_module_key_flags(xds_k8s_testcase) # Type aliases _XdsTestServer = xds_k8s_testcase.XdsTestServer _XdsTestClient = xds_k8s_testcase.XdsTestClient +_KubernetesServerRunner = k8s_xds_server_runner.KubernetesServerRunner class RemoveNegTest(xds_k8s_testcase.RegularXdsKubernetesTestCase): def setUp(self): super().setUp() - self.alternate_server_runner = server_app.KubernetesServerRunner( + self.alternate_server_runner = _KubernetesServerRunner( k8s.KubernetesNamespace(self.k8s_api_manager, self.server_namespace), deployment_name=self.server_name + '-alt',