xDS interop: Move k8s-specific logic out of the test app (#30591)

Separates xDS Test Client/Server (represent an interface to corresponding workload running remotely) from their runners (kubernetes-specific logic to provision the workloads with prerequisites).

This is a refactoring, should not change the behavior.
pull/30596/head
Sergii Tkachenko 2 years ago committed by GitHub
parent 103f4c2f1e
commit 3817db13b6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      tools/run_tests/xds_k8s_test_driver/README.md
  2. 16
      tools/run_tests/xds_k8s_test_driver/bin/cleanup/cleanup.py
  3. 10
      tools/run_tests/xds_k8s_test_driver/bin/run_td_setup.py
  4. 8
      tools/run_tests/xds_k8s_test_driver/bin/run_test_client.py
  5. 9
      tools/run_tests/xds_k8s_test_driver/bin/run_test_server.py
  6. 8
      tools/run_tests/xds_k8s_test_driver/framework/bootstrap_generator_testcase.py
  7. 176
      tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py
  8. 13
      tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/__init__.py
  9. 63
      tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/base_runner.py
  10. 13
      tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/__init__.py
  11. 116
      tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_base_runner.py
  12. 194
      tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_client_runner.py
  13. 252
      tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_server_runner.py
  14. 238
      tools/run_tests/xds_k8s_test_driver/framework/test_app/server_app.py
  15. 6
      tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py
  16. 21
      tools/run_tests/xds_k8s_test_driver/framework/xds_url_map_test_resources.py
  17. 8
      tools/run_tests/xds_k8s_test_driver/tests/bootstrap_generator_test.py
  18. 5
      tools/run_tests/xds_k8s_test_driver/tests/change_backend_service_test.py
  19. 5
      tools/run_tests/xds_k8s_test_driver/tests/failover_test.py
  20. 5
      tools/run_tests/xds_k8s_test_driver/tests/remove_neg_test.py

@ -16,7 +16,7 @@ changes to this codebase at the moment.
simpler CRUD
- [x] Security: manage `roles/iam.workloadIdentityUser` role grant lifecycle for
dynamically-named namespaces
- [ ] Restructure `framework.test_app` and `framework.xds_k8s*` into a module
- [x] Restructure `framework.test_app` and `framework.xds_k8s*` into a module
containing xDS-interop-specific logic
- [ ] Address inline TODOs in code
- [x] Improve README.md documentation, explain helpers in bin/ folder

@ -42,13 +42,13 @@ from framework import xds_k8s_flags
from framework.infrastructure import gcp
from framework.infrastructure import k8s
from framework.infrastructure import traffic_director
from framework.test_app import client_app
from framework.test_app import server_app
from framework.test_app.runners.k8s import k8s_xds_client_runner
from framework.test_app.runners.k8s import k8s_xds_server_runner
logger = logging.getLogger(__name__)
Json = Any
KubernetesClientRunner = client_app.KubernetesClientRunner
KubernetesServerRunner = server_app.KubernetesServerRunner
_KubernetesClientRunner = k8s_xds_client_runner.KubernetesClientRunner
_KubernetesServerRunner = k8s_xds_server_runner.KubernetesServerRunner
GCLOUD = os.environ.get('GCLOUD', 'gcloud')
GCLOUD_CMD_TIMEOUT_S = datetime.timedelta(seconds=5).total_seconds()
@ -226,9 +226,9 @@ def cleanup_client(project, network, k8s_api_manager, resource_prefix,
network=network,
stats_port=xds_flags.CLIENT_PORT.value)
client_namespace = KubernetesClientRunner.make_namespace_name(
client_namespace = _KubernetesClientRunner.make_namespace_name(
resource_prefix, resource_suffix)
client_runner = KubernetesClientRunner(
client_runner = _KubernetesClientRunner(
k8s.KubernetesNamespace(k8s_api_manager, client_namespace),
**runner_kwargs)
@ -248,9 +248,9 @@ def cleanup_server(project, network, k8s_api_manager, resource_prefix,
gcp_service_account=gcp_service_account,
network=network)
server_namespace = KubernetesServerRunner.make_namespace_name(
server_namespace = _KubernetesServerRunner.make_namespace_name(
resource_prefix, resource_suffix)
server_runner = KubernetesServerRunner(
server_runner = _KubernetesServerRunner(
k8s.KubernetesNamespace(k8s_api_manager, server_namespace),
**runner_kwargs)

@ -41,7 +41,7 @@ from framework.helpers import rand
from framework.infrastructure import gcp
from framework.infrastructure import k8s
from framework.infrastructure import traffic_director
from framework.test_app import server_app
from framework.test_app.runners.k8s import k8s_xds_server_runner
logger = logging.getLogger(__name__)
# Flags
@ -64,7 +64,8 @@ flags.adopt_module_key_flags(xds_k8s_flags)
# Running outside of a test suite, so require explicit resource_suffix.
flags.mark_flag_as_required("resource_suffix")
KubernetesServerRunner = server_app.KubernetesServerRunner
# Type aliases
_KubernetesServerRunner = k8s_xds_server_runner.KubernetesServerRunner
def main(argv): # pylint: disable=too-many-locals,too-many-branches,too-many-statements
@ -90,7 +91,7 @@ def main(argv): # pylint: disable=too-many-locals,too-many-branches,too-many-st
server_maintenance_port = xds_flags.SERVER_MAINTENANCE_PORT.value
server_xds_host = xds_flags.SERVER_XDS_HOST.value
server_xds_port = xds_flags.SERVER_XDS_PORT.value
server_namespace = KubernetesServerRunner.make_namespace_name(
server_namespace = _KubernetesServerRunner.make_namespace_name(
resource_prefix, resource_suffix)
gcp_api_manager = gcp.api.GcpApiManager()
@ -110,7 +111,8 @@ def main(argv): # pylint: disable=too-many-locals,too-many-branches,too-many-st
resource_prefix=resource_prefix,
resource_suffix=resource_suffix)
if server_maintenance_port is None:
server_maintenance_port = KubernetesServerRunner.DEFAULT_SECURE_MODE_MAINTENANCE_PORT
server_maintenance_port = \
_KubernetesServerRunner.DEFAULT_SECURE_MODE_MAINTENANCE_PORT
try:
if command in ('create', 'cycle'):

@ -20,7 +20,7 @@ from framework import xds_flags
from framework import xds_k8s_flags
from framework.infrastructure import gcp
from framework.infrastructure import k8s
from framework.test_app import client_app
from framework.test_app.runners.k8s import k8s_xds_client_runner
logger = logging.getLogger(__name__)
# Flags
@ -52,7 +52,7 @@ flags.adopt_module_key_flags(xds_k8s_flags)
flags.mark_flag_as_required("resource_suffix")
# Type aliases
KubernetesClientRunner = client_app.KubernetesClientRunner
_KubernetesClientRunner = k8s_xds_client_runner.KubernetesClientRunner
def main(argv):
@ -84,9 +84,9 @@ def main(argv):
deployment_template='client-secure.deployment.yaml')
k8s_api_manager = k8s.KubernetesApiManager(xds_k8s_flags.KUBE_CONTEXT.value)
client_namespace = KubernetesClientRunner.make_namespace_name(
client_namespace = _KubernetesClientRunner.make_namespace_name(
xds_flags.RESOURCE_PREFIX.value, xds_flags.RESOURCE_SUFFIX.value)
client_runner = KubernetesClientRunner(
client_runner = _KubernetesClientRunner(
k8s.KubernetesNamespace(k8s_api_manager, client_namespace),
**runner_kwargs)

@ -20,7 +20,7 @@ from framework import xds_flags
from framework import xds_k8s_flags
from framework.infrastructure import gcp
from framework.infrastructure import k8s
from framework.test_app import server_app
from framework.test_app.runners.k8s import k8s_xds_server_runner
logger = logging.getLogger(__name__)
# Flags
@ -43,7 +43,8 @@ flags.adopt_module_key_flags(xds_k8s_flags)
# Running outside of a test suite, so require explicit resource_suffix.
flags.mark_flag_as_required("resource_suffix")
KubernetesServerRunner = server_app.KubernetesServerRunner
# Type aliases
_KubernetesServerRunner = k8s_xds_server_runner.KubernetesServerRunner
def main(argv):
@ -78,9 +79,9 @@ def main(argv):
deployment_template='server-secure.deployment.yaml')
k8s_api_manager = k8s.KubernetesApiManager(xds_k8s_flags.KUBE_CONTEXT.value)
server_namespace = KubernetesServerRunner.make_namespace_name(
server_namespace = _KubernetesServerRunner.make_namespace_name(
resource_prefix, resource_suffix)
server_runner = KubernetesServerRunner(
server_runner = _KubernetesServerRunner(
k8s.KubernetesNamespace(k8s_api_manager, server_namespace),
**runner_kwargs)

@ -18,8 +18,8 @@ from framework import xds_k8s_testcase
from framework.helpers import rand as helpers_rand
from framework.infrastructure import k8s
from framework.infrastructure import traffic_director
from framework.test_app import client_app
from framework.test_app import server_app
from framework.test_app.runners.k8s import k8s_xds_client_runner
from framework.test_app.runners.k8s import k8s_xds_server_runner
logger = logging.getLogger(__name__)
@ -27,8 +27,8 @@ logger = logging.getLogger(__name__)
TrafficDirectorManager = traffic_director.TrafficDirectorManager
XdsTestServer = xds_k8s_testcase.XdsTestServer
XdsTestClient = xds_k8s_testcase.XdsTestClient
KubernetesServerRunner = server_app.KubernetesServerRunner
KubernetesClientRunner = client_app.KubernetesClientRunner
KubernetesServerRunner = k8s_xds_server_runner.KubernetesServerRunner
KubernetesClientRunner = k8s_xds_client_runner.KubernetesClientRunner
class BootstrapGeneratorBaseTest(xds_k8s_testcase.XdsKubernetesBaseTestCase):

@ -12,10 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""
xDS Test Client.
TODO(sergiitk): separate XdsTestClient and KubernetesClientRunner to individual
modules.
Provides an interface to xDS Test Client running remotely.
"""
import datetime
import functools
@ -23,13 +20,10 @@ import logging
from typing import Iterable, List, Optional
from framework.helpers import retryers
from framework.infrastructure import gcp
from framework.infrastructure import k8s
import framework.rpc
from framework.rpc import grpc_channelz
from framework.rpc import grpc_csds
from framework.rpc import grpc_testing
from framework.test_app import base_runner
logger = logging.getLogger(__name__)
@ -223,171 +217,3 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp):
if subchannel.data.state.state is state:
subchannels.append(subchannel)
return subchannels
class KubernetesClientRunner(base_runner.KubernetesBaseRunner):
def __init__( # pylint: disable=too-many-locals
self,
k8s_namespace,
*,
deployment_name,
image_name,
td_bootstrap_image,
gcp_api_manager: gcp.api.GcpApiManager,
gcp_project: str,
gcp_service_account: str,
xds_server_uri=None,
network='default',
service_account_name=None,
stats_port=8079,
deployment_template='client.deployment.yaml',
service_account_template='service-account.yaml',
reuse_namespace=False,
namespace_template=None,
debug_use_port_forwarding=False,
enable_workload_identity=True):
super().__init__(k8s_namespace, namespace_template, reuse_namespace)
# Settings
self.deployment_name = deployment_name
self.image_name = image_name
self.stats_port = stats_port
# xDS bootstrap generator
self.td_bootstrap_image = td_bootstrap_image
self.xds_server_uri = xds_server_uri
self.network = network
self.deployment_template = deployment_template
self.debug_use_port_forwarding = debug_use_port_forwarding
self.enable_workload_identity = enable_workload_identity
# Service account settings:
# Kubernetes service account
if self.enable_workload_identity:
self.service_account_name = service_account_name or deployment_name
self.service_account_template = service_account_template
else:
self.service_account_name = None
self.service_account_template = None
# GCP.
self.gcp_project = gcp_project
self.gcp_ui_url = gcp_api_manager.gcp_ui_url
# GCP service account to map to Kubernetes service account
self.gcp_service_account = gcp_service_account
# GCP IAM API used to grant allow workload service accounts permission
# to use GCP service account identity.
self.gcp_iam = gcp.iam.IamV1(gcp_api_manager, gcp_project)
# Mutable state
self.deployment: Optional[k8s.V1Deployment] = None
self.service_account: Optional[k8s.V1ServiceAccount] = None
self.port_forwarder: Optional[k8s.PortForwarder] = None
# TODO(sergiitk): make rpc UnaryCall enum or get it from proto
def run( # pylint: disable=arguments-differ
self,
*,
server_target,
rpc='UnaryCall',
qps=25,
metadata='',
secure_mode=False,
config_mesh=None,
print_response=False) -> XdsTestClient:
logger.info(
'Deploying xDS test client "%s" to k8s namespace %s: '
'server_target=%s rpc=%s qps=%s metadata=%r secure_mode=%s '
'print_response=%s', self.deployment_name, self.k8s_namespace.name,
server_target, rpc, qps, metadata, secure_mode, print_response)
self._logs_explorer_link(deployment_name=self.deployment_name,
namespace_name=self.k8s_namespace.name,
gcp_project=self.gcp_project,
gcp_ui_url=self.gcp_ui_url)
super().run()
if self.enable_workload_identity:
# Allow Kubernetes service account to use the GCP service account
# identity.
self._grant_workload_identity_user(
gcp_iam=self.gcp_iam,
gcp_service_account=self.gcp_service_account,
service_account_name=self.service_account_name)
# Create service account
self.service_account = self._create_service_account(
self.service_account_template,
service_account_name=self.service_account_name,
namespace_name=self.k8s_namespace.name,
gcp_service_account=self.gcp_service_account)
# Always create a new deployment
self.deployment = self._create_deployment(
self.deployment_template,
deployment_name=self.deployment_name,
image_name=self.image_name,
namespace_name=self.k8s_namespace.name,
service_account_name=self.service_account_name,
td_bootstrap_image=self.td_bootstrap_image,
xds_server_uri=self.xds_server_uri,
network=self.network,
stats_port=self.stats_port,
server_target=server_target,
rpc=rpc,
qps=qps,
metadata=metadata,
secure_mode=secure_mode,
config_mesh=config_mesh,
print_response=print_response)
self._wait_deployment_with_available_replicas(self.deployment_name)
# Load test client pod. We need only one client at the moment
pod = self.k8s_namespace.list_deployment_pods(self.deployment)[0]
self._wait_pod_started(pod.metadata.name)
pod_ip = pod.status.pod_ip
rpc_port = self.stats_port
rpc_host = None
# Experimental, for local debugging.
if self.debug_use_port_forwarding:
logger.info('LOCAL DEV MODE: Enabling port forwarding to %s:%s',
pod_ip, self.stats_port)
self.port_forwarder = self.k8s_namespace.port_forward_pod(
pod, remote_port=self.stats_port)
rpc_port = self.port_forwarder.local_port
rpc_host = self.port_forwarder.local_address
return XdsTestClient(ip=pod_ip,
rpc_port=rpc_port,
server_target=server_target,
rpc_host=rpc_host)
def cleanup(self, *, force=False, force_namespace=False): # pylint: disable=arguments-differ
if self.port_forwarder:
self.port_forwarder.close()
self.port_forwarder = None
if self.deployment or force:
self._delete_deployment(self.deployment_name)
self.deployment = None
if self.enable_workload_identity and (self.service_account or force):
self._revoke_workload_identity_user(
gcp_iam=self.gcp_iam,
gcp_service_account=self.gcp_service_account,
service_account_name=self.service_account_name)
self._delete_service_account(self.service_account_name)
self.service_account = None
super().cleanup(force=force_namespace and force)
@classmethod
def make_namespace_name(cls,
resource_prefix: str,
resource_suffix: str,
name: str = 'client') -> str:
"""A helper to make consistent XdsTestClient kubernetes namespace name
for given resource prefix and suffix.
Note: the idea is to intentionally produce different namespace name for
the test server, and the test client, as that closely mimics real-world
deployments.
"""
return cls._make_namespace_name(resource_prefix, resource_suffix, name)

@ -0,0 +1,13 @@
# Copyright 2022 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

@ -0,0 +1,63 @@
# Copyright 2022 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Common functionality for running xDS Test Client and Server remotely.
"""
from abc import ABCMeta
from abc import abstractmethod
from typing import Dict, Optional
import urllib.parse
class RunnerError(Exception):
"""Error running xDS Test App running remotely."""
class BaseRunner(metaclass=ABCMeta):
@abstractmethod
def run(self, **kwargs):
pass
@abstractmethod
def cleanup(self, *, force=False):
pass
@classmethod
def _logs_explorer_link_from_params(
cls,
*,
gcp_ui_url: str,
gcp_project: str,
query: Dict[str, str],
request: Optional[Dict[str, str]] = None) -> str:
req_merged = {'query': cls._logs_explorer_query(query)}
if request is not None:
req_merged.update(request)
req = cls._logs_explorer_request(req_merged)
return f'https://{gcp_ui_url}/logs/query;{req}?project={gcp_project}'
@classmethod
def _logs_explorer_query(cls, query: Dict[str, str]) -> str:
return '\n'.join(f'{k}="{v}"' for k, v in query.items())
@classmethod
def _logs_explorer_request(cls, req: Dict[str, str]) -> str:
return ';'.join(
f'{k}={cls._logs_explorer_quote(v)}' for k, v in req.items())
@classmethod
def _logs_explorer_quote(cls, value: str) -> str:
return urllib.parse.quote_plus(value, safe=':')

@ -0,0 +1,13 @@
# Copyright 2022 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

@ -1,4 +1,4 @@
# Copyright 2020 gRPC authors.
# Copyright 2022 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -11,12 +11,14 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Common functionality for running xDS Test Client and Server on Kubernetes.
"""
import contextlib
import datetime
import logging
import pathlib
from typing import Dict, Optional
import urllib.parse
from typing import Optional
import mako.template
import yaml
@ -25,34 +27,20 @@ import framework.helpers.datetime
import framework.helpers.highlighter
from framework.infrastructure import gcp
from framework.infrastructure import k8s
from framework.test_app.runners import base_runner
logger = logging.getLogger(__name__)
# Type aliases
_RunnerError = base_runner.RunnerError
_HighlighterYaml = framework.helpers.highlighter.HighlighterYaml
_helper_datetime = framework.helpers.datetime
timedelta = datetime.timedelta
_timedelta = datetime.timedelta
def _logs_explorer_query(query: Dict[str, str]) -> str:
return '\n'.join(f'{k}="{v}"' for k, v in query.items())
def _logs_explorer_request(req: Dict[str, str]) -> str:
return ';'.join(f'{k}={_logs_explorer_quote(v)}' for k, v in req.items())
def _logs_explorer_quote(value: str):
return urllib.parse.quote_plus(value, safe=':')
class RunnerError(Exception):
"""Error running app"""
class KubernetesBaseRunner:
class KubernetesBaseRunner(base_runner.BaseRunner):
TEMPLATE_DIR_NAME = 'kubernetes-manifests'
TEMPLATE_DIR_RELATIVE_PATH = f'../../{TEMPLATE_DIR_NAME}'
TEMPLATE_DIR_RELATIVE_PATH = f'../../../../{TEMPLATE_DIR_NAME}'
ROLE_WORKLOAD_IDENTITY_USER = 'roles/iam.workloadIdentityUser'
def __init__(self,
@ -82,20 +70,20 @@ class KubernetesBaseRunner:
self.delete_namespace()
self.namespace = None
@staticmethod
def _render_template(template_file, **kwargs):
@classmethod
def _render_template(cls, template_file, **kwargs):
template = mako.template.Template(filename=str(template_file))
return template.render(**kwargs)
@staticmethod
def _manifests_from_yaml_file(yaml_file):
@classmethod
def _manifests_from_yaml_file(cls, yaml_file):
with open(yaml_file) as f:
with contextlib.closing(yaml.safe_load_all(f)) as yml:
for manifest in yml:
yield manifest
@staticmethod
def _manifests_from_str(document):
@classmethod
def _manifests_from_str(cls, document):
with contextlib.closing(yaml.safe_load_all(document)) as yml:
for manifest in yml:
yield manifest
@ -118,12 +106,12 @@ class KubernetesBaseRunner:
manifest = next(manifests)
# Error out on multi-document yaml
if next(manifests, False):
raise RunnerError('Exactly one document expected in manifest '
f'{template_file}')
raise _RunnerError('Exactly one document expected in manifest '
f'{template_file}')
k8s_objects = self.k8s_namespace.apply_manifest(manifest)
if len(k8s_objects) != 1:
raise RunnerError('Expected exactly one object must created from '
f'manifest {template_file}')
raise _RunnerError('Expected exactly one object must created from '
f'manifest {template_file}')
logger.info('%s %s created', k8s_objects[0].kind,
k8s_objects[0].metadata.name)
@ -145,18 +133,18 @@ class KubernetesBaseRunner:
def _create_namespace(self, template, **kwargs) -> k8s.V1Namespace:
namespace = self._create_from_template(template, **kwargs)
if not isinstance(namespace, k8s.V1Namespace):
raise RunnerError('Expected V1Namespace to be created '
f'from manifest {template}')
raise _RunnerError('Expected V1Namespace to be created '
f'from manifest {template}')
if namespace.metadata.name != kwargs['namespace_name']:
raise RunnerError('V1Namespace created with unexpected name: '
f'{namespace.metadata.name}')
raise _RunnerError('V1Namespace created with unexpected name: '
f'{namespace.metadata.name}')
logger.debug('V1Namespace %s created at %s',
namespace.metadata.self_link,
namespace.metadata.creation_timestamp)
return namespace
@staticmethod
def _get_workload_identity_member_name(project, namespace_name,
@classmethod
def _get_workload_identity_member_name(cls, project, namespace_name,
service_account_name):
"""
Returns workload identity member name used to authenticate Kubernetes
@ -199,11 +187,11 @@ class KubernetesBaseRunner:
**kwargs) -> k8s.V1ServiceAccount:
resource = self._create_from_template(template, **kwargs)
if not isinstance(resource, k8s.V1ServiceAccount):
raise RunnerError('Expected V1ServiceAccount to be created '
f'from manifest {template}')
raise _RunnerError('Expected V1ServiceAccount to be created '
f'from manifest {template}')
if resource.metadata.name != kwargs['service_account_name']:
raise RunnerError('V1ServiceAccount created with unexpected name: '
f'{resource.metadata.name}')
raise _RunnerError('V1ServiceAccount created with unexpected name: '
f'{resource.metadata.name}')
logger.debug('V1ServiceAccount %s created at %s',
resource.metadata.self_link,
resource.metadata.creation_timestamp)
@ -212,11 +200,11 @@ class KubernetesBaseRunner:
def _create_deployment(self, template, **kwargs) -> k8s.V1Deployment:
deployment = self._create_from_template(template, **kwargs)
if not isinstance(deployment, k8s.V1Deployment):
raise RunnerError('Expected V1Deployment to be created '
f'from manifest {template}')
raise _RunnerError('Expected V1Deployment to be created '
f'from manifest {template}')
if deployment.metadata.name != kwargs['deployment_name']:
raise RunnerError('V1Deployment created with unexpected name: '
f'{deployment.metadata.name}')
raise _RunnerError('V1Deployment created with unexpected name: '
f'{deployment.metadata.name}')
logger.debug('V1Deployment %s created at %s',
deployment.metadata.self_link,
deployment.metadata.creation_timestamp)
@ -225,11 +213,11 @@ class KubernetesBaseRunner:
def _create_service(self, template, **kwargs) -> k8s.V1Service:
service = self._create_from_template(template, **kwargs)
if not isinstance(service, k8s.V1Service):
raise RunnerError('Expected V1Service to be created '
f'from manifest {template}')
raise _RunnerError('Expected V1Service to be created '
f'from manifest {template}')
if service.metadata.name != kwargs['service_name']:
raise RunnerError('V1Service created with unexpected name: '
f'{service.metadata.name}')
raise _RunnerError('V1Service created with unexpected name: '
f'{service.metadata.name}')
logger.debug('V1Service %s created at %s', service.metadata.self_link,
service.metadata.creation_timestamp)
return service
@ -311,36 +299,36 @@ class KubernetesBaseRunner:
logger.info("Service %s: detected NEG=%s in zones=%s", name, neg_name,
neg_zones)
@staticmethod
def _logs_explorer_link(*,
@classmethod
def _logs_explorer_link(cls,
*,
deployment_name: str,
namespace_name: str,
gcp_project: str,
gcp_ui_url: str,
end_delta: Optional[timedelta] = None) -> None:
end_delta: Optional[_timedelta] = None) -> None:
"""Output the link to test server/client logs in GCP Logs Explorer."""
if end_delta is None:
end_delta = timedelta(hours=1)
end_delta = _timedelta(hours=1)
time_now = _helper_datetime.iso8601_utc_time()
time_end = _helper_datetime.iso8601_utc_time(end_delta)
query = _logs_explorer_query({
request = {'timeRange': f'{time_now}/{time_end}'}
query = {
'resource.type': 'k8s_container',
'resource.labels.project_id': gcp_project,
'resource.labels.container_name': deployment_name,
'resource.labels.namespace_name': namespace_name,
})
req = _logs_explorer_request({
'query': query,
'timeRange': f'{time_now}/{time_end}',
})
}
link = f'https://{gcp_ui_url}/logs/query;{req}?project={gcp_project}'
link = cls._logs_explorer_link_from_params(gcp_ui_url=gcp_ui_url,
gcp_project=gcp_project,
query=query,
request=request)
# A whitespace at the end to indicate the end of the url.
logger.info("GCP Logs Explorer link to %s:\n%s ", deployment_name, link)
@staticmethod
def _make_namespace_name(resource_prefix: str, resource_suffix: str,
@classmethod
def _make_namespace_name(cls, resource_prefix: str, resource_suffix: str,
name: str) -> str:
"""A helper to make consistent test app kubernetes namespace name
for given resource prefix and suffix."""

@ -0,0 +1,194 @@
# Copyright 2022 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Run xDS Test Client on Kubernetes.
"""
import logging
from typing import Optional
from framework.infrastructure import gcp
from framework.infrastructure import k8s
from framework.test_app.client_app import XdsTestClient
from framework.test_app.runners.k8s import k8s_base_runner
logger = logging.getLogger(__name__)
class KubernetesClientRunner(k8s_base_runner.KubernetesBaseRunner):
def __init__( # pylint: disable=too-many-locals
self,
k8s_namespace,
*,
deployment_name,
image_name,
td_bootstrap_image,
gcp_api_manager: gcp.api.GcpApiManager,
gcp_project: str,
gcp_service_account: str,
xds_server_uri=None,
network='default',
service_account_name=None,
stats_port=8079,
deployment_template='client.deployment.yaml',
service_account_template='service-account.yaml',
reuse_namespace=False,
namespace_template=None,
debug_use_port_forwarding=False,
enable_workload_identity=True):
super().__init__(k8s_namespace, namespace_template, reuse_namespace)
# Settings
self.deployment_name = deployment_name
self.image_name = image_name
self.stats_port = stats_port
# xDS bootstrap generator
self.td_bootstrap_image = td_bootstrap_image
self.xds_server_uri = xds_server_uri
self.network = network
self.deployment_template = deployment_template
self.debug_use_port_forwarding = debug_use_port_forwarding
self.enable_workload_identity = enable_workload_identity
# Service account settings:
# Kubernetes service account
if self.enable_workload_identity:
self.service_account_name = service_account_name or deployment_name
self.service_account_template = service_account_template
else:
self.service_account_name = None
self.service_account_template = None
# GCP.
self.gcp_project = gcp_project
self.gcp_ui_url = gcp_api_manager.gcp_ui_url
# GCP service account to map to Kubernetes service account
self.gcp_service_account = gcp_service_account
# GCP IAM API used to grant allow workload service accounts permission
# to use GCP service account identity.
self.gcp_iam = gcp.iam.IamV1(gcp_api_manager, gcp_project)
# Mutable state
self.deployment: Optional[k8s.V1Deployment] = None
self.service_account: Optional[k8s.V1ServiceAccount] = None
self.port_forwarder: Optional[k8s.PortForwarder] = None
# TODO(sergiitk): make rpc UnaryCall enum or get it from proto
def run( # pylint: disable=arguments-differ
self,
*,
server_target,
rpc='UnaryCall',
qps=25,
metadata='',
secure_mode=False,
config_mesh=None,
print_response=False) -> XdsTestClient:
logger.info(
'Deploying xDS test client "%s" to k8s namespace %s: '
'server_target=%s rpc=%s qps=%s metadata=%r secure_mode=%s '
'print_response=%s', self.deployment_name, self.k8s_namespace.name,
server_target, rpc, qps, metadata, secure_mode, print_response)
self._logs_explorer_link(deployment_name=self.deployment_name,
namespace_name=self.k8s_namespace.name,
gcp_project=self.gcp_project,
gcp_ui_url=self.gcp_ui_url)
super().run()
if self.enable_workload_identity:
# Allow Kubernetes service account to use the GCP service account
# identity.
self._grant_workload_identity_user(
gcp_iam=self.gcp_iam,
gcp_service_account=self.gcp_service_account,
service_account_name=self.service_account_name)
# Create service account
self.service_account = self._create_service_account(
self.service_account_template,
service_account_name=self.service_account_name,
namespace_name=self.k8s_namespace.name,
gcp_service_account=self.gcp_service_account)
# Always create a new deployment
self.deployment = self._create_deployment(
self.deployment_template,
deployment_name=self.deployment_name,
image_name=self.image_name,
namespace_name=self.k8s_namespace.name,
service_account_name=self.service_account_name,
td_bootstrap_image=self.td_bootstrap_image,
xds_server_uri=self.xds_server_uri,
network=self.network,
stats_port=self.stats_port,
server_target=server_target,
rpc=rpc,
qps=qps,
metadata=metadata,
secure_mode=secure_mode,
config_mesh=config_mesh,
print_response=print_response)
self._wait_deployment_with_available_replicas(self.deployment_name)
# Load test client pod. We need only one client at the moment
pod = self.k8s_namespace.list_deployment_pods(self.deployment)[0]
self._wait_pod_started(pod.metadata.name)
pod_ip = pod.status.pod_ip
rpc_port = self.stats_port
rpc_host = None
# Experimental, for local debugging.
if self.debug_use_port_forwarding:
logger.info('LOCAL DEV MODE: Enabling port forwarding to %s:%s',
pod_ip, self.stats_port)
self.port_forwarder = self.k8s_namespace.port_forward_pod(
pod, remote_port=self.stats_port)
rpc_port = self.port_forwarder.local_port
rpc_host = self.port_forwarder.local_address
return XdsTestClient(ip=pod_ip,
rpc_port=rpc_port,
server_target=server_target,
rpc_host=rpc_host)
def cleanup(self, *, force=False, force_namespace=False): # pylint: disable=arguments-differ
if self.port_forwarder:
self.port_forwarder.close()
self.port_forwarder = None
if self.deployment or force:
self._delete_deployment(self.deployment_name)
self.deployment = None
if self.enable_workload_identity and (self.service_account or force):
self._revoke_workload_identity_user(
gcp_iam=self.gcp_iam,
gcp_service_account=self.gcp_service_account,
service_account_name=self.service_account_name)
self._delete_service_account(self.service_account_name)
self.service_account = None
super().cleanup(force=force_namespace and force)
@classmethod
def make_namespace_name(cls,
resource_prefix: str,
resource_suffix: str,
name: str = 'client') -> str:
"""A helper to make consistent XdsTestClient kubernetes namespace name
for given resource prefix and suffix.
Note: the idea is to intentionally produce different namespace name for
the test server, and the test client, as that closely mimics real-world
deployments.
"""
return cls._make_namespace_name(resource_prefix, resource_suffix, name)

@ -0,0 +1,252 @@
# Copyright 2022 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Run xDS Test Client on Kubernetes.
"""
import logging
from typing import List, Optional
from framework.infrastructure import gcp
from framework.infrastructure import k8s
from framework.test_app.runners.k8s import k8s_base_runner
from framework.test_app.server_app import XdsTestServer
logger = logging.getLogger(__name__)
class KubernetesServerRunner(k8s_base_runner.KubernetesBaseRunner):
DEFAULT_TEST_PORT = 8080
DEFAULT_MAINTENANCE_PORT = 8080
DEFAULT_SECURE_MODE_MAINTENANCE_PORT = 8081
def __init__( # pylint: disable=too-many-locals
self,
k8s_namespace,
*,
deployment_name,
image_name,
td_bootstrap_image,
gcp_api_manager: gcp.api.GcpApiManager,
gcp_project: str,
gcp_service_account: str,
service_account_name=None,
service_name=None,
neg_name=None,
xds_server_uri=None,
network='default',
deployment_template='server.deployment.yaml',
service_account_template='service-account.yaml',
service_template='server.service.yaml',
reuse_service=False,
reuse_namespace=False,
namespace_template=None,
debug_use_port_forwarding=False,
enable_workload_identity=True):
super().__init__(k8s_namespace, namespace_template, reuse_namespace)
# Settings
self.deployment_name = deployment_name
self.image_name = image_name
self.service_name = service_name or deployment_name
# xDS bootstrap generator
self.td_bootstrap_image = td_bootstrap_image
self.xds_server_uri = xds_server_uri
# This only works in k8s >= 1.18.10-gke.600
# https://cloud.google.com/kubernetes-engine/docs/how-to/standalone-neg#naming_negs
self.neg_name = neg_name or (f'{self.k8s_namespace.name}-'
f'{self.service_name}')
self.network = network
self.deployment_template = deployment_template
self.service_template = service_template
self.reuse_service = reuse_service
self.debug_use_port_forwarding = debug_use_port_forwarding
self.enable_workload_identity = enable_workload_identity
# Service account settings:
# Kubernetes service account
if self.enable_workload_identity:
self.service_account_name = service_account_name or deployment_name
self.service_account_template = service_account_template
else:
self.service_account_name = None
self.service_account_template = None
# GCP.
self.gcp_project = gcp_project
self.gcp_ui_url = gcp_api_manager.gcp_ui_url
# GCP service account to map to Kubernetes service account
self.gcp_service_account = gcp_service_account
# GCP IAM API used to grant allow workload service accounts permission
# to use GCP service account identity.
self.gcp_iam = gcp.iam.IamV1(gcp_api_manager, gcp_project)
# Mutable state
self.deployment: Optional[k8s.V1Deployment] = None
self.service_account: Optional[k8s.V1ServiceAccount] = None
self.service: Optional[k8s.V1Service] = None
self.port_forwarders: List[k8s.PortForwarder] = []
def run( # pylint: disable=arguments-differ
self,
*,
test_port=DEFAULT_TEST_PORT,
maintenance_port=None,
secure_mode=False,
server_id=None,
replica_count=1) -> List[XdsTestServer]:
# Implementation detail: in secure mode, maintenance ("backchannel")
# port must be different from the test port so communication with
# maintenance services can be reached independently from the security
# configuration under test.
if maintenance_port is None:
if not secure_mode:
maintenance_port = self.DEFAULT_MAINTENANCE_PORT
else:
maintenance_port = self.DEFAULT_SECURE_MODE_MAINTENANCE_PORT
if secure_mode and maintenance_port == test_port:
raise ValueError('port and maintenance_port must be different '
'when running test server in secure mode')
# To avoid bugs with comparing wrong types.
if not (isinstance(test_port, int) and
isinstance(maintenance_port, int)):
raise TypeError('Port numbers must be integer')
if secure_mode and not self.enable_workload_identity:
raise ValueError('Secure mode requires Workload Identity enabled.')
logger.info(
'Deploying xDS test server "%s" to k8s namespace %s: test_port=%s '
'maintenance_port=%s secure_mode=%s server_id=%s replica_count=%s',
self.deployment_name, self.k8s_namespace.name, test_port,
maintenance_port, secure_mode, server_id, replica_count)
self._logs_explorer_link(deployment_name=self.deployment_name,
namespace_name=self.k8s_namespace.name,
gcp_project=self.gcp_project,
gcp_ui_url=self.gcp_ui_url)
# Create namespace.
super().run()
# Reuse existing if requested, create a new deployment when missing.
# Useful for debugging to avoid NEG loosing relation to deleted service.
if self.reuse_service:
self.service = self._reuse_service(self.service_name)
if not self.service:
self.service = self._create_service(
self.service_template,
service_name=self.service_name,
namespace_name=self.k8s_namespace.name,
deployment_name=self.deployment_name,
neg_name=self.neg_name,
test_port=test_port)
self._wait_service_neg(self.service_name, test_port)
if self.enable_workload_identity:
# Allow Kubernetes service account to use the GCP service account
# identity.
self._grant_workload_identity_user(
gcp_iam=self.gcp_iam,
gcp_service_account=self.gcp_service_account,
service_account_name=self.service_account_name)
# Create service account
self.service_account = self._create_service_account(
self.service_account_template,
service_account_name=self.service_account_name,
namespace_name=self.k8s_namespace.name,
gcp_service_account=self.gcp_service_account)
# Always create a new deployment
self.deployment = self._create_deployment(
self.deployment_template,
deployment_name=self.deployment_name,
image_name=self.image_name,
namespace_name=self.k8s_namespace.name,
service_account_name=self.service_account_name,
td_bootstrap_image=self.td_bootstrap_image,
xds_server_uri=self.xds_server_uri,
network=self.network,
replica_count=replica_count,
test_port=test_port,
maintenance_port=maintenance_port,
server_id=server_id,
secure_mode=secure_mode)
self._wait_deployment_with_available_replicas(self.deployment_name,
replica_count)
# Wait for pods running
pods = self.k8s_namespace.list_deployment_pods(self.deployment)
servers = []
for pod in pods:
pod_name = pod.metadata.name
self._wait_pod_started(pod_name)
pod_ip = pod.status.pod_ip
rpc_host = None
# Experimental, for local debugging.
local_port = maintenance_port
if self.debug_use_port_forwarding:
logger.info('LOCAL DEV MODE: Enabling port forwarding to %s:%s',
pod_ip, maintenance_port)
port_forwarder = self.k8s_namespace.port_forward_pod(
pod, remote_port=maintenance_port)
self.port_forwarders.append(port_forwarder)
local_port = port_forwarder.local_port
rpc_host = port_forwarder.local_address
servers.append(
XdsTestServer(ip=pod_ip,
rpc_port=test_port,
maintenance_port=local_port,
secure_mode=secure_mode,
server_id=server_id,
rpc_host=rpc_host,
pod_name=pod_name))
return servers
def cleanup(self, *, force=False, force_namespace=False): # pylint: disable=arguments-differ
if self.port_forwarders:
for port_forwarder in self.port_forwarders:
port_forwarder.close()
self.port_forwarders = []
if self.deployment or force:
self._delete_deployment(self.deployment_name)
self.deployment = None
if (self.service and not self.reuse_service) or force:
self._delete_service(self.service_name)
self.service = None
if self.enable_workload_identity and (self.service_account or force):
self._revoke_workload_identity_user(
gcp_iam=self.gcp_iam,
gcp_service_account=self.gcp_service_account,
service_account_name=self.service_account_name)
self._delete_service_account(self.service_account_name)
self.service_account = None
super().cleanup(force=(force_namespace and force))
@classmethod
def make_namespace_name(cls,
resource_prefix: str,
resource_suffix: str,
name: str = 'server') -> str:
"""A helper to make consistent XdsTestServer kubernetes namespace name
for given resource prefix and suffix.
Note: the idea is to intentionally produce different namespace name for
the test server, and the test client, as that closely mimics real-world
deployments.
"""
return cls._make_namespace_name(resource_prefix, resource_suffix, name)

@ -12,21 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""
xDS Test Server.
TODO(sergiitk): separate XdsTestServer and KubernetesServerRunner to individual
modules.
Provides an interface to xDS Test Server running remotely.
"""
import functools
import logging
from typing import Iterator, List, Optional
from typing import Iterator, Optional
from framework.infrastructure import gcp
from framework.infrastructure import k8s
import framework.rpc
from framework.rpc import grpc_channelz
from framework.rpc import grpc_testing
from framework.test_app import base_runner
logger = logging.getLogger(__name__)
@ -150,231 +144,3 @@ class XdsTestServer(framework.rpc.grpc.GrpcApp):
self.channelz.sock_addresses_pretty(server_socket),
self.channelz.sock_addresses_pretty(client_socket))
return server_socket
class KubernetesServerRunner(base_runner.KubernetesBaseRunner):
DEFAULT_TEST_PORT = 8080
DEFAULT_MAINTENANCE_PORT = 8080
DEFAULT_SECURE_MODE_MAINTENANCE_PORT = 8081
def __init__( # pylint: disable=too-many-locals
self,
k8s_namespace,
*,
deployment_name,
image_name,
td_bootstrap_image,
gcp_api_manager: gcp.api.GcpApiManager,
gcp_project: str,
gcp_service_account: str,
service_account_name=None,
service_name=None,
neg_name=None,
xds_server_uri=None,
network='default',
deployment_template='server.deployment.yaml',
service_account_template='service-account.yaml',
service_template='server.service.yaml',
reuse_service=False,
reuse_namespace=False,
namespace_template=None,
debug_use_port_forwarding=False,
enable_workload_identity=True):
super().__init__(k8s_namespace, namespace_template, reuse_namespace)
# Settings
self.deployment_name = deployment_name
self.image_name = image_name
self.service_name = service_name or deployment_name
# xDS bootstrap generator
self.td_bootstrap_image = td_bootstrap_image
self.xds_server_uri = xds_server_uri
# This only works in k8s >= 1.18.10-gke.600
# https://cloud.google.com/kubernetes-engine/docs/how-to/standalone-neg#naming_negs
self.neg_name = neg_name or (f'{self.k8s_namespace.name}-'
f'{self.service_name}')
self.network = network
self.deployment_template = deployment_template
self.service_template = service_template
self.reuse_service = reuse_service
self.debug_use_port_forwarding = debug_use_port_forwarding
self.enable_workload_identity = enable_workload_identity
# Service account settings:
# Kubernetes service account
if self.enable_workload_identity:
self.service_account_name = service_account_name or deployment_name
self.service_account_template = service_account_template
else:
self.service_account_name = None
self.service_account_template = None
# GCP.
self.gcp_project = gcp_project
self.gcp_ui_url = gcp_api_manager.gcp_ui_url
# GCP service account to map to Kubernetes service account
self.gcp_service_account = gcp_service_account
# GCP IAM API used to grant allow workload service accounts permission
# to use GCP service account identity.
self.gcp_iam = gcp.iam.IamV1(gcp_api_manager, gcp_project)
# Mutable state
self.deployment: Optional[k8s.V1Deployment] = None
self.service_account: Optional[k8s.V1ServiceAccount] = None
self.service: Optional[k8s.V1Service] = None
self.port_forwarders: List[k8s.PortForwarder] = []
def run( # pylint: disable=arguments-differ
self,
*,
test_port=DEFAULT_TEST_PORT,
maintenance_port=None,
secure_mode=False,
server_id=None,
replica_count=1) -> List[XdsTestServer]:
# Implementation detail: in secure mode, maintenance ("backchannel")
# port must be different from the test port so communication with
# maintenance services can be reached independently from the security
# configuration under test.
if maintenance_port is None:
if not secure_mode:
maintenance_port = self.DEFAULT_MAINTENANCE_PORT
else:
maintenance_port = self.DEFAULT_SECURE_MODE_MAINTENANCE_PORT
if secure_mode and maintenance_port == test_port:
raise ValueError('port and maintenance_port must be different '
'when running test server in secure mode')
# To avoid bugs with comparing wrong types.
if not (isinstance(test_port, int) and
isinstance(maintenance_port, int)):
raise TypeError('Port numbers must be integer')
if secure_mode and not self.enable_workload_identity:
raise ValueError('Secure mode requires Workload Identity enabled.')
logger.info(
'Deploying xDS test server "%s" to k8s namespace %s: test_port=%s '
'maintenance_port=%s secure_mode=%s server_id=%s replica_count=%s',
self.deployment_name, self.k8s_namespace.name, test_port,
maintenance_port, secure_mode, server_id, replica_count)
self._logs_explorer_link(deployment_name=self.deployment_name,
namespace_name=self.k8s_namespace.name,
gcp_project=self.gcp_project,
gcp_ui_url=self.gcp_ui_url)
# Create namespace.
super().run()
# Reuse existing if requested, create a new deployment when missing.
# Useful for debugging to avoid NEG loosing relation to deleted service.
if self.reuse_service:
self.service = self._reuse_service(self.service_name)
if not self.service:
self.service = self._create_service(
self.service_template,
service_name=self.service_name,
namespace_name=self.k8s_namespace.name,
deployment_name=self.deployment_name,
neg_name=self.neg_name,
test_port=test_port)
self._wait_service_neg(self.service_name, test_port)
if self.enable_workload_identity:
# Allow Kubernetes service account to use the GCP service account
# identity.
self._grant_workload_identity_user(
gcp_iam=self.gcp_iam,
gcp_service_account=self.gcp_service_account,
service_account_name=self.service_account_name)
# Create service account
self.service_account = self._create_service_account(
self.service_account_template,
service_account_name=self.service_account_name,
namespace_name=self.k8s_namespace.name,
gcp_service_account=self.gcp_service_account)
# Always create a new deployment
self.deployment = self._create_deployment(
self.deployment_template,
deployment_name=self.deployment_name,
image_name=self.image_name,
namespace_name=self.k8s_namespace.name,
service_account_name=self.service_account_name,
td_bootstrap_image=self.td_bootstrap_image,
xds_server_uri=self.xds_server_uri,
network=self.network,
replica_count=replica_count,
test_port=test_port,
maintenance_port=maintenance_port,
server_id=server_id,
secure_mode=secure_mode)
self._wait_deployment_with_available_replicas(self.deployment_name,
replica_count)
# Wait for pods running
pods = self.k8s_namespace.list_deployment_pods(self.deployment)
servers = []
for pod in pods:
pod_name = pod.metadata.name
self._wait_pod_started(pod_name)
pod_ip = pod.status.pod_ip
rpc_host = None
# Experimental, for local debugging.
local_port = maintenance_port
if self.debug_use_port_forwarding:
logger.info('LOCAL DEV MODE: Enabling port forwarding to %s:%s',
pod_ip, maintenance_port)
port_forwarder = self.k8s_namespace.port_forward_pod(
pod, remote_port=maintenance_port)
self.port_forwarders.append(port_forwarder)
local_port = port_forwarder.local_port
rpc_host = port_forwarder.local_address
servers.append(
XdsTestServer(ip=pod_ip,
rpc_port=test_port,
maintenance_port=local_port,
secure_mode=secure_mode,
server_id=server_id,
rpc_host=rpc_host,
pod_name=pod_name))
return servers
def cleanup(self, *, force=False, force_namespace=False): # pylint: disable=arguments-differ
if self.port_forwarders:
for port_forwarder in self.port_forwarders:
port_forwarder.close()
self.port_forwarders = []
if self.deployment or force:
self._delete_deployment(self.deployment_name)
self.deployment = None
if (self.service and not self.reuse_service) or force:
self._delete_service(self.service_name)
self.service = None
if self.enable_workload_identity and (self.service_account or force):
self._revoke_workload_identity_user(
gcp_iam=self.gcp_iam,
gcp_service_account=self.gcp_service_account,
service_account_name=self.service_account_name)
self._delete_service_account(self.service_account_name)
self.service_account = None
super().cleanup(force=(force_namespace and force))
@classmethod
def make_namespace_name(cls,
resource_prefix: str,
resource_suffix: str,
name: str = 'server') -> str:
"""A helper to make consistent XdsTestServer kubernetes namespace name
for given resource prefix and suffix.
Note: the idea is to intentionally produce different namespace name for
the test server, and the test client, as that closely mimics real-world
deployments.
:rtype: object
"""
return cls._make_namespace_name(resource_prefix, resource_suffix, name)

@ -40,6 +40,8 @@ from framework.rpc import grpc_csds
from framework.rpc import grpc_testing
from framework.test_app import client_app
from framework.test_app import server_app
from framework.test_app.runners.k8s import k8s_xds_client_runner
from framework.test_app.runners.k8s import k8s_xds_server_runner
logger = logging.getLogger(__name__)
# TODO(yashkt): We will no longer need this flag once Core exposes local certs
@ -57,8 +59,8 @@ TrafficDirectorAppNetManager = traffic_director.TrafficDirectorAppNetManager
TrafficDirectorSecureManager = traffic_director.TrafficDirectorSecureManager
XdsTestServer = server_app.XdsTestServer
XdsTestClient = client_app.XdsTestClient
KubernetesServerRunner = server_app.KubernetesServerRunner
KubernetesClientRunner = client_app.KubernetesClientRunner
KubernetesServerRunner = k8s_xds_server_runner.KubernetesServerRunner
KubernetesClientRunner = k8s_xds_client_runner.KubernetesClientRunner
LoadBalancerStatsResponse = grpc_testing.LoadBalancerStatsResponse
_ChannelState = grpc_channelz.ChannelState
_timedelta = datetime.timedelta

@ -26,8 +26,8 @@ import framework.helpers.rand
from framework.infrastructure import gcp
from framework.infrastructure import k8s
from framework.infrastructure import traffic_director
from framework.test_app import client_app
from framework.test_app import server_app
from framework.test_app.runners.k8s import k8s_xds_client_runner
from framework.test_app.runners.k8s import k8s_xds_server_runner
flags.adopt_module_key_flags(xds_flags)
flags.adopt_module_key_flags(xds_k8s_flags)
@ -38,6 +38,8 @@ STRATEGY = flags.DEFINE_enum('strategy',
help='Strategy of GCP resources management')
# Type alias
_KubernetesServerRunner = k8s_xds_server_runner.KubernetesServerRunner
_KubernetesClientRunner = k8s_xds_client_runner.KubernetesClientRunner
UrlMapType = Any
HostRule = Any
PathMatcher = Any
@ -165,7 +167,7 @@ class GcpResourceManager(metaclass=_MetaSingletonAndAbslFlags):
self.k8s_namespace = k8s.KubernetesNamespace(self.k8s_api_manager,
self.resource_prefix)
# Kubernetes Test Servers
self.test_server_runner = server_app.KubernetesServerRunner(
self.test_server_runner = _KubernetesServerRunner(
self.k8s_namespace,
deployment_name=self.server_name,
image_name=self.server_image,
@ -176,7 +178,7 @@ class GcpResourceManager(metaclass=_MetaSingletonAndAbslFlags):
xds_server_uri=self.xds_server_uri,
network=self.network,
enable_workload_identity=self.enable_workload_identity)
self.test_server_alternative_runner = server_app.KubernetesServerRunner(
self.test_server_alternative_runner = _KubernetesServerRunner(
self.k8s_namespace,
deployment_name=self.server_name + '-alternative',
image_name=self.server_image,
@ -188,7 +190,7 @@ class GcpResourceManager(metaclass=_MetaSingletonAndAbslFlags):
network=self.network,
enable_workload_identity=self.enable_workload_identity,
reuse_namespace=True)
self.test_server_affinity_runner = server_app.KubernetesServerRunner(
self.test_server_affinity_runner = _KubernetesServerRunner(
self.k8s_namespace,
deployment_name=self.server_name + '-affinity',
image_name=self.server_image,
@ -211,11 +213,10 @@ class GcpResourceManager(metaclass=_MetaSingletonAndAbslFlags):
logging.info('GcpResourceManager: client_namespace_suffix=%s',
client_namespace_suffix)
# Kubernetes Test Client
return client_app.KubernetesClientRunner(
k8s.KubernetesNamespace(
self.k8s_api_manager,
client_app.KubernetesClientRunner.make_namespace_name(
self.resource_prefix, client_namespace_suffix)),
namespace_name = _KubernetesClientRunner.make_namespace_name(
self.resource_prefix, client_namespace_suffix)
return _KubernetesClientRunner(
k8s.KubernetesNamespace(self.k8s_api_manager, namespace_name),
deployment_name=self.client_name,
image_name=self.client_image,
gcp_project=self.project,

@ -22,8 +22,8 @@ from absl.testing import parameterized
from framework import bootstrap_generator_testcase
from framework import xds_k8s_testcase
from framework.helpers import retryers
from framework.test_app import client_app
from framework.test_app import server_app
from framework.test_app.runners.k8s import k8s_xds_client_runner
from framework.test_app.runners.k8s import k8s_xds_server_runner
logger = logging.getLogger(__name__)
flags.adopt_module_key_flags(xds_k8s_testcase)
@ -31,8 +31,8 @@ flags.adopt_module_key_flags(xds_k8s_testcase)
# Type aliases
XdsTestServer = xds_k8s_testcase.XdsTestServer
XdsTestClient = xds_k8s_testcase.XdsTestClient
KubernetesServerRunner = server_app.KubernetesServerRunner
KubernetesClientRunner = client_app.KubernetesClientRunner
KubernetesServerRunner = k8s_xds_server_runner.KubernetesServerRunner
KubernetesClientRunner = k8s_xds_client_runner.KubernetesClientRunner
_timedelta = datetime.timedelta

@ -19,7 +19,7 @@ from absl.testing import absltest
from framework import xds_k8s_testcase
from framework.infrastructure import k8s
from framework.test_app import server_app
from framework.test_app.runners.k8s import k8s_xds_server_runner
logger = logging.getLogger(__name__)
flags.adopt_module_key_flags(xds_k8s_testcase)
@ -27,6 +27,7 @@ flags.adopt_module_key_flags(xds_k8s_testcase)
# Type aliases
_XdsTestServer = xds_k8s_testcase.XdsTestServer
_XdsTestClient = xds_k8s_testcase.XdsTestClient
_KubernetesServerRunner = k8s_xds_server_runner.KubernetesServerRunner
class ChangeBackendServiceTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
@ -35,7 +36,7 @@ class ChangeBackendServiceTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
super().setUp()
self.alternate_k8s_namespace = k8s.KubernetesNamespace(
self.k8s_api_manager, self.server_namespace)
self.alternate_server_runner = server_app.KubernetesServerRunner(
self.alternate_server_runner = _KubernetesServerRunner(
self.alternate_k8s_namespace,
deployment_name=self.server_name + '-alt',
image_name=self.server_image,

@ -19,7 +19,7 @@ from absl.testing import absltest
from framework import xds_k8s_testcase
from framework.infrastructure import k8s
from framework.test_app import server_app
from framework.test_app.runners.k8s import k8s_xds_server_runner
logger = logging.getLogger(__name__)
flags.adopt_module_key_flags(xds_k8s_testcase)
@ -27,6 +27,7 @@ flags.adopt_module_key_flags(xds_k8s_testcase)
# Type aliases
_XdsTestServer = xds_k8s_testcase.XdsTestServer
_XdsTestClient = xds_k8s_testcase.XdsTestClient
_KubernetesServerRunner = k8s_xds_server_runner.KubernetesServerRunner
class FailoverTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
@ -35,7 +36,7 @@ class FailoverTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
def setUp(self):
super().setUp()
self.secondary_server_runner = server_app.KubernetesServerRunner(
self.secondary_server_runner = _KubernetesServerRunner(
k8s.KubernetesNamespace(self.secondary_k8s_api_manager,
self.server_namespace),
deployment_name=self.server_name + '-alt',

@ -19,7 +19,7 @@ from absl.testing import absltest
from framework import xds_k8s_testcase
from framework.infrastructure import k8s
from framework.test_app import server_app
from framework.test_app.runners.k8s import k8s_xds_server_runner
logger = logging.getLogger(__name__)
flags.adopt_module_key_flags(xds_k8s_testcase)
@ -27,13 +27,14 @@ flags.adopt_module_key_flags(xds_k8s_testcase)
# Type aliases
_XdsTestServer = xds_k8s_testcase.XdsTestServer
_XdsTestClient = xds_k8s_testcase.XdsTestClient
_KubernetesServerRunner = k8s_xds_server_runner.KubernetesServerRunner
class RemoveNegTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
def setUp(self):
super().setUp()
self.alternate_server_runner = server_app.KubernetesServerRunner(
self.alternate_server_runner = _KubernetesServerRunner(
k8s.KubernetesNamespace(self.k8s_api_manager,
self.server_namespace),
deployment_name=self.server_name + '-alt',

Loading…
Cancel
Save