[PSM Interop] Initial support for GAMMA tests (#34151)

Adds initial support for K8s
[GAMMA](https://gateway-api.sigs.k8s.io/concepts/gamma/) (Gateway API
for Service Mesh) initiative.

- Add framework support for loading CRD-based APIs using k8s python
dynamic client
- Add basic mesh baseline test (aka ping-pong) using GAMMA setup
- Implement initial framework changes needed to run PSM tests on
GAMMA-enabled cluster using
[TDMesh](https://cloud.google.com/traffic-director/docs/gke-gateway-overview#gateway-api)
and GRPCRoute.

Based on https://github.com/grpc/grpc/pull/33504.
pull/34166/head
Sergii Tkachenko 2 years ago committed by GitHub
parent a6689e6444
commit 2c5abd316d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 10
      tools/run_tests/xds_k8s_test_driver/README.md
  2. 6
      tools/run_tests/xds_k8s_test_driver/bin/cleanup.sh
  3. 26
      tools/run_tests/xds_k8s_test_driver/bin/lib/common.py
  4. 4
      tools/run_tests/xds_k8s_test_driver/bin/run_channelz.py
  5. 19
      tools/run_tests/xds_k8s_test_driver/bin/run_ping_pong.py
  6. 37
      tools/run_tests/xds_k8s_test_driver/bin/run_test_client.py
  7. 20
      tools/run_tests/xds_k8s_test_driver/bin/run_test_server.py
  8. 193
      tools/run_tests/xds_k8s_test_driver/framework/infrastructure/k8s.py
  9. 22
      tools/run_tests/xds_k8s_test_driver/framework/infrastructure/traffic_director_gamma.py
  10. 242
      tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/gamma_server_runner.py
  11. 98
      tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_base_runner.py
  12. 17
      tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_server_runner.py
  13. 118
      tools/run_tests/xds_k8s_test_driver/framework/xds_gamma_testcase.py
  14. 21
      tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py
  15. 22
      tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/gamma/route_grpc.yaml
  16. 17
      tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/gamma/service.yaml
  17. 19
      tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/gamma/tdmesh.yaml
  18. 13
      tools/run_tests/xds_k8s_test_driver/tests/gamma/__init__.py
  19. 44
      tools/run_tests/xds_k8s_test_driver/tests/gamma/gamma_baseline_test.py

@ -366,13 +366,13 @@ XDS_K8S_CONFIG=./path-to-flagfile.cfg ./run.sh bin/run_td_setup.py --resource_su
./run.sh bin/run_td_setup.py --security=mtls
# Start test server in a secure mode
./run.sh bin/run_test_server.py --secure
./run.sh bin/run_test_server.py --mode=secure
# Add test server to the backend service
./run.sh bin/run_td_setup.py --cmd=backends-add
# Start test client in a secure mode --secure
./run.sh bin/run_test_client.py --secure
# Start test client in a secure mode --mode=secure
./run.sh bin/run_test_client.py --mode=secure
```
### Sending RPCs
@ -432,9 +432,9 @@ Cleanup regular and security-specific resources:
# Cleanup TD resources, with security
./run.sh bin/run_td_setup.py --cmd=cleanup --security=mtls
# Stop test client (secure)
./run.sh bin/run_test_client.py --cmd=cleanup --secure
./run.sh bin/run_test_client.py --cmd=cleanup --mode=secure
# Stop test server (secure), and remove the namespace
./run.sh bin/run_test_server.py --cmd=cleanup --cleanup_namespace --secure
./run.sh bin/run_test_server.py --cmd=cleanup --cleanup_namespace --mode=secure
```
In addition, here's some other helpful partial cleanup commands:

@ -31,7 +31,7 @@ ENVIRONMENT:
Default: $XDS_K8S_DRIVER_DIR/venv
EXAMPLES:
$0
$0 --secure
$0 --nosecure
XDS_K8S_CONFIG=./path-to-flagfile.cfg $0 --resource_suffix=override-suffix
EOF
exit 1
@ -54,6 +54,6 @@ if [[ "$1" == "--nosecure" ]]; then
./run.sh bin/run_test_server.py --cmd=cleanup --cleanup_namespace "$@"
else
./run.sh bin/run_td_setup.py --cmd=cleanup --security=mtls "$@" && \
./run.sh bin/run_test_client.py --cmd=cleanup --cleanup_namespace --secure "$@" && \
./run.sh bin/run_test_server.py --cmd=cleanup --cleanup_namespace --secure "$@"
./run.sh bin/run_test_client.py --cmd=cleanup --cleanup_namespace --mode=secure "$@" && \
./run.sh bin/run_test_server.py --cmd=cleanup --cleanup_namespace --mode=secure "$@"
fi

@ -24,6 +24,7 @@ from framework.infrastructure import gcp
from framework.infrastructure import k8s
from framework.test_app import client_app
from framework.test_app import server_app
from framework.test_app.runners.k8s import gamma_server_runner
from framework.test_app.runners.k8s import k8s_xds_client_runner
from framework.test_app.runners.k8s import k8s_xds_server_runner
@ -32,16 +33,19 @@ logger = logging.get_absl_logger()
# Type aliases
KubernetesClientRunner = k8s_xds_client_runner.KubernetesClientRunner
KubernetesServerRunner = k8s_xds_server_runner.KubernetesServerRunner
GammaServerRunner = gamma_server_runner.GammaServerRunner
_XdsTestServer = server_app.XdsTestServer
_XdsTestClient = client_app.XdsTestClient
def make_client_namespace(
k8s_api_manager: k8s.KubernetesApiManager,
namespace_name: str = None,
) -> k8s.KubernetesNamespace:
namespace_name: str = KubernetesClientRunner.make_namespace_name(
xds_flags.RESOURCE_PREFIX.value, xds_flags.RESOURCE_SUFFIX.value
)
if not namespace_name:
namespace_name: str = KubernetesClientRunner.make_namespace_name(
xds_flags.RESOURCE_PREFIX.value, xds_flags.RESOURCE_SUFFIX.value
)
return k8s.KubernetesNamespace(k8s_api_manager, namespace_name)
@ -50,7 +54,7 @@ def make_client_runner(
gcp_api_manager: gcp.api.GcpApiManager,
port_forwarding: bool = False,
reuse_namespace: bool = True,
secure: bool = False,
mode: str = "default",
) -> KubernetesClientRunner:
# KubernetesClientRunner arguments.
runner_kwargs = dict(
@ -67,7 +71,7 @@ def make_client_runner(
debug_use_port_forwarding=port_forwarding,
)
if secure:
if mode == "secure":
runner_kwargs.update(
deployment_template="client-secure.deployment.yaml"
)
@ -76,8 +80,9 @@ def make_client_runner(
def make_server_namespace(
k8s_api_manager: k8s.KubernetesApiManager,
server_runner: KubernetesServerRunner = KubernetesServerRunner,
) -> k8s.KubernetesNamespace:
namespace_name: str = KubernetesServerRunner.make_namespace_name(
namespace_name: str = server_runner.make_namespace_name(
xds_flags.RESOURCE_PREFIX.value, xds_flags.RESOURCE_SUFFIX.value
)
return k8s.KubernetesNamespace(k8s_api_manager, namespace_name)
@ -89,7 +94,7 @@ def make_server_runner(
port_forwarding: bool = False,
reuse_namespace: bool = True,
reuse_service: bool = False,
secure: bool = False,
mode: str = "default",
) -> KubernetesServerRunner:
# KubernetesServerRunner arguments.
runner_kwargs = dict(
@ -106,10 +111,13 @@ def make_server_runner(
debug_use_port_forwarding=port_forwarding,
)
if secure:
server_runner = KubernetesServerRunner
if mode == "secure":
runner_kwargs["deployment_template"] = "server-secure.deployment.yaml"
elif mode == "gamma":
server_runner = GammaServerRunner
return KubernetesServerRunner(namespace, **runner_kwargs)
return server_runner(namespace, **runner_kwargs)
def _ensure_atexit(signum, frame):

@ -212,7 +212,7 @@ def main(argv):
server_namespace,
gcp_api_manager,
port_forwarding=should_port_forward,
secure=is_secure,
mode="secure",
)
# Find server pod.
server_pod: k8s.V1Pod = common.get_server_pod(
@ -225,7 +225,7 @@ def main(argv):
client_namespace,
gcp_api_manager,
port_forwarding=should_port_forward,
secure=is_secure,
mode="secure",
)
# Find client pod.
client_pod: k8s.V1Pod = common.get_client_pod(

@ -28,13 +28,11 @@ from framework.test_app import client_app
from framework.test_app import server_app
# Flags
_SECURE = flags.DEFINE_bool(
"secure",
default=False,
help=(
"Set to True if the the client/server were started "
"with the PSM security enabled."
),
_MODE = flags.DEFINE_enum(
"mode",
default="default",
enum_values=["default", "secure", "gamma"],
help="Select a deployment of the client/server",
)
_NUM_RPCS = flags.DEFINE_integer(
"num_rpcs",
@ -105,7 +103,6 @@ def main(argv):
# Flags.
should_port_forward: bool = xds_k8s_flags.DEBUG_USE_PORT_FORWARDING.value
is_secure: bool = _SECURE.value
# Setup.
gcp_api_manager = gcp.api.GcpApiManager()
@ -117,7 +114,7 @@ def main(argv):
server_namespace,
gcp_api_manager,
port_forwarding=should_port_forward,
secure=is_secure,
mode=_MODE.value,
)
# Find server pod.
server_pod: k8s.V1Pod = common.get_server_pod(
@ -130,7 +127,7 @@ def main(argv):
client_namespace,
gcp_api_manager,
port_forwarding=should_port_forward,
secure=is_secure,
mode=_MODE.value,
)
# Find client pod.
client_pod: k8s.V1Pod = common.get_client_pod(
@ -145,7 +142,7 @@ def main(argv):
server_runner,
server_pod,
test_port=xds_flags.SERVER_PORT.value,
secure_mode=is_secure,
secure_mode=_MODE.value == "secure",
)
test_server.set_xds_address(
xds_flags.SERVER_XDS_HOST.value, xds_flags.SERVER_XDS_PORT.value

@ -11,6 +11,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Run test xds client.
Gamma example:
./run.sh bin/run_test_client.py --server_xds_host=psm-grpc-server \
--server_xds_port=80 \
--config_mesh=gketd-psm-grpc-server
"""
import logging
import signal
@ -28,8 +38,16 @@ logger = logging.getLogger(__name__)
_CMD = flags.DEFINE_enum(
"cmd", default="run", enum_values=["run", "cleanup"], help="Command"
)
_SECURE = flags.DEFINE_bool(
"secure", default=False, help="Run client in the secure mode"
_MODE = flags.DEFINE_enum(
"mode",
default="default",
enum_values=[
"default",
"secure",
# Uncomment if gamma-specific changes added to the client.
# "gamma",
],
help="Select client mode",
)
_QPS = flags.DEFINE_integer("qps", default=25, help="Queries per second")
_PRINT_RESPONSE = flags.DEFINE_bool(
@ -43,7 +61,7 @@ _FOLLOW = flags.DEFINE_bool(
" --debug_use_port_forwarding"
),
)
_CONFIG_MESH = flags.DEFINE_bool(
_CONFIG_MESH = flags.DEFINE_string(
"config_mesh",
default=None,
help="Optional. Supplied to bootstrap generator to indicate AppNet mesh.",
@ -105,21 +123,22 @@ def main(argv):
client_namespace,
gcp_api_manager,
reuse_namespace=_REUSE_NAMESPACE.value,
secure=_SECURE.value,
mode=_MODE.value,
port_forwarding=should_port_forward,
)
# Server target
server_xds_host = xds_flags.SERVER_XDS_HOST.value
server_xds_port = xds_flags.SERVER_XDS_PORT.value
server_target = f"xds:///{xds_flags.SERVER_XDS_HOST.value}"
if xds_flags.SERVER_XDS_PORT.value != 80:
server_target = f"{server_target}:{xds_flags.SERVER_XDS_PORT.value}"
if _CMD.value == "run":
logger.info("Run client, secure_mode=%s", _SECURE.value)
logger.info("Run client, mode=%s", _MODE.value)
client_runner.run(
server_target=f"xds:///{server_xds_host}:{server_xds_port}",
server_target=server_target,
qps=_QPS.value,
print_response=_PRINT_RESPONSE.value,
secure_mode=_SECURE.value,
secure_mode=_MODE.value == "secure",
config_mesh=_CONFIG_MESH.value,
log_to_stdout=_FOLLOW.value,
)

@ -11,6 +11,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Run test xds server.
Gamma example:
./run.sh bin/run_test_server.py --mode=gamma
"""
import logging
import signal
@ -28,8 +34,11 @@ logger = logging.getLogger(__name__)
_CMD = flags.DEFINE_enum(
"cmd", default="run", enum_values=["run", "cleanup"], help="Command"
)
_SECURE = flags.DEFINE_bool(
"secure", default=False, help="Run server in the secure mode"
_MODE = flags.DEFINE_enum(
"mode",
default="default",
enum_values=["default", "secure", "gamma"],
help="Select server mode",
)
_REUSE_NAMESPACE = flags.DEFINE_bool(
"reuse_namespace", default=True, help="Use existing namespace if exists"
@ -76,21 +85,22 @@ def main(argv):
gcp_api_manager = gcp.api.GcpApiManager()
k8s_api_manager = k8s.KubernetesApiManager(xds_k8s_flags.KUBE_CONTEXT.value)
server_namespace = common.make_server_namespace(k8s_api_manager)
server_runner = common.make_server_runner(
server_namespace,
gcp_api_manager,
reuse_namespace=_REUSE_NAMESPACE.value,
reuse_service=_REUSE_SERVICE.value,
secure=_SECURE.value,
mode=_MODE.value,
port_forwarding=should_port_forward,
)
if _CMD.value == "run":
logger.info("Run server, secure_mode=%s", _SECURE.value)
logger.info("Run server, mode=%s", _MODE.value)
server_runner.run(
test_port=xds_flags.SERVER_PORT.value,
maintenance_port=xds_flags.SERVER_MAINTENANCE_PORT.value,
secure_mode=_SECURE.value,
secure_mode=_MODE.value == "secure",
log_to_stdout=_FOLLOW.value,
)
if should_follow_logs:

@ -15,6 +15,7 @@
# added to get around circular dependencies caused by k8s.py clashing with
# k8s/__init__.py
import datetime
import functools
import json
import logging
import pathlib
@ -23,8 +24,11 @@ from typing import Any, Callable, List, Optional, Tuple
import warnings
from kubernetes import client
from kubernetes import dynamic
from kubernetes import utils
import kubernetes.config
from kubernetes.dynamic import exceptions as dynamic_exc
from kubernetes.dynamic import resource as dynamic_res
import urllib3.exceptions
import yaml
@ -40,7 +44,6 @@ logger = logging.getLogger(__name__)
_HighlighterYaml = framework.helpers.highlighter.HighlighterYaml
PodLogCollector = k8s_log_collector.PodLogCollector
PortForwarder = k8s_port_forwarder.PortForwarder
ApiClient = client.ApiClient
V1Deployment = client.V1Deployment
V1ServiceAccount = client.V1ServiceAccount
V1Pod = client.V1Pod
@ -48,6 +51,10 @@ V1PodList = client.V1PodList
V1Service = client.V1Service
V1Namespace = client.V1Namespace
DynResourceInstance = dynamic_res.ResourceInstance
GammaMesh = DynResourceInstance
GammaGrpcRoute = DynResourceInstance
_timedelta = datetime.timedelta
_ApiException = client.ApiException
_FailToCreateError = utils.FailToCreateError
@ -94,18 +101,22 @@ class NotFound(Exception):
class KubernetesApiManager:
_client: ApiClient
_client: client.ApiClient
_dynamic_client: dynamic.DynamicClient
context: str
apps: client.AppsV1Api
core: client.CoreV1Api
_apis: set
_apis: set[object]
_dynamic_apis: set[str]
def __init__(self, context: str):
self.context = context
self._client = self._new_client_from_context(context)
self._dynamic_client = dynamic.DynamicClient(self._client)
self.apps = client.AppsV1Api(self.client)
self.core = client.CoreV1Api(self.client)
self._apis = {self.apps, self.core}
self._dynamic_apis = set()
# TODO(https://github.com/kubernetes-client/python/issues/2101): remove
# when the issue is solved, and the kubernetes dependency is bumped.
warnings.filterwarnings(
@ -119,10 +130,39 @@ class KubernetesApiManager:
)
@property
def client(self) -> ApiClient:
def client(self) -> client.ApiClient:
return self._client
@property
def dynamic_client(self) -> dynamic.DynamicClient:
return self._dynamic_client
@functools.cache  # pylint: disable=no-member
def gke_tdmesh(self, version: str) -> dynamic_res.Resource:
    """Return the dynamic API resource for the GKE ``TDMesh`` kind.

    Args:
        version: Version of the ``net.gke.io`` API group. Only
            ``v1alpha1`` is accepted.

    Returns:
        The resource returned by ``_load_dynamic_api`` for this kind.

    Raises:
        NotImplementedError: ``version`` is not a supported version.
    """
    group, resource_kind = "net.gke.io", "TDMesh"
    if version in {"v1alpha1"}:
        return self._load_dynamic_api(group, version, resource_kind)
    raise NotImplementedError(
        f"{resource_kind} {group}/{version} not implemented."
    )
@functools.cache  # pylint: disable=no-member
def grpc_route(self, version: str) -> dynamic_res.Resource:
    """Return the dynamic API resource for the Gateway API ``GRPCRoute``.

    Args:
        version: Version of the ``gateway.networking.k8s.io`` API group.
            Only ``v1alpha2`` is accepted.

    Returns:
        The resource returned by ``_load_dynamic_api`` for this kind.

    Raises:
        NotImplementedError: ``version`` is not a supported version.
    """
    group, resource_kind = "gateway.networking.k8s.io", "GRPCRoute"
    if version in {"v1alpha2"}:
        return self._load_dynamic_api(group, version, resource_kind)
    raise NotImplementedError(
        f"{resource_kind} {group}/{version} not implemented."
    )
def close(self):
# TODO(sergiitk): [GAMMA] what to do with dynamic clients?
self.client.close()
def reload(self):
@ -134,8 +174,10 @@ class KubernetesApiManager:
for api in self._apis:
api.api_client = self._client
@staticmethod
def _new_client_from_context(context: str) -> ApiClient:
# TODO(sergiitk): [GAMMA] what to do with dynamic apis?
@classmethod
def _new_client_from_context(cls, context: str) -> "client.ApiClient":
client_instance = kubernetes.config.new_client_from_config(
context=context
)
@ -148,6 +190,23 @@ class KubernetesApiManager:
client_instance.configuration.retries = 10
return client_instance
def _load_dynamic_api(
    self, api_name: str, version: str, kind: str
) -> dynamic_res.Resource:
    """Look up a resource API via the dynamic client and remember it.

    Args:
        api_name: API group name, e.g. ``net.gke.io``.
        version: API version within the group.
        kind: Resource kind to look up.

    Returns:
        The resource object found by the dynamic client.

    Raises:
        RuntimeError: the dynamic client could not find the resource.
    """
    group_version = f"{api_name}/{version}"
    try:
        discovered: dynamic_res.Resource = self.dynamic_client.resources.get(
            api_version=group_version, kind=kind
        )
    except dynamic_exc.ResourceNotFoundError as err:
        # TODO(sergiitk): [GAMMA] add retries if static client
        #   retries not apply.
        raise RuntimeError(
            f"Couldn't discover k8s API {group_version}, resource {kind}",
        ) from err
    # Keep track of every group/version loaded through the dynamic client.
    self._dynamic_apis.add(discovered.group_version)
    return discovered
class KubernetesNamespace: # pylint: disable=too-many-public-methods
_highlighter: framework.helpers.highlighter.Highlighter
@ -173,14 +232,52 @@ class KubernetesNamespace: # pylint: disable=too-many-public-methods
def name(self):
return self._name
@functools.cached_property  # pylint: disable=no-member
def api_gke_mesh(self) -> dynamic_res.Resource:
    """Dynamic API for ``TDMesh`` in ``net.gke.io/v1alpha1``."""
    api_version, kind = "net.gke.io/v1alpha1", "TDMesh"
    return self._get_dynamic_api(api_version, kind)
@functools.cached_property  # pylint: disable=no-member
def api_grpc_route(self) -> dynamic_res.Resource:
    """Dynamic API for ``GRPCRoute`` in ``gateway.networking.k8s.io/v1alpha2``."""
    api_version, kind = "gateway.networking.k8s.io/v1alpha2", "GRPCRoute"
    return self._get_dynamic_api(api_version, kind)
def _refresh_auth(self):
logger.info("Reloading k8s api client to refresh the auth.")
self._api.reload()
def _apply_manifest(self, manifest):
return utils.create_from_dict(
def _apply_manifest_single(self, manifest) -> object:
k8s_objects = utils.create_from_dict(
self._api.client, manifest, namespace=self.name
)
if len(k8s_objects) != 1:
raise ValueError(
f"Expected exactly one k8s object created from"
f" manifest {manifest}"
)
return k8s_objects[0]
def _apply_manifest_custom_object(self, manifest) -> DynResourceInstance:
    """Create a custom object from ``manifest`` using its dynamic API.

    The API is selected from the manifest's ``apiVersion`` and ``kind``
    entries; the manifest itself is passed to that API's ``create``.
    """
    api_version = manifest["apiVersion"]
    kind = manifest["kind"]
    dynamic_api = self._get_dynamic_api(api_version, kind)
    return dynamic_api.create(manifest)
@functools.cache  # pylint: disable=no-member
def _get_dynamic_api(self, api_version, kind) -> dynamic_res.Resource:
    """Map an ``apiVersion``/``kind`` pair to a dynamic API resource.

    Args:
        api_version: ``<group>/<version>`` string as found in a manifest.
        kind: Resource kind as found in a manifest.

    Raises:
        NotImplementedError: the group/kind combination is not handled
            here.
    """
    group, _, version = api_version.partition("/")

    # TODO(sergiitk): [GAMMA] Needs to be improved. This all is very clunky
    #   when considered together with _get_dynamic_api and api_gke_mesh,
    #   api_grpc_route.
    if (group, kind) == ("net.gke.io", "TDMesh"):
        return self._api.gke_tdmesh(version)
    if (group, kind) == ("gateway.networking.k8s.io", "GRPCRoute"):
        return self._api.grpc_route(version)

    raise NotImplementedError(f"{kind} {api_version} not implemented.")
def _get_resource(self, method: Callable[[Any], object], *args, **kwargs):
try:
@ -190,6 +287,16 @@ class KubernetesNamespace: # pylint: disable=too-many-public-methods
# just return None.
return None
def _get_dyn_resource(
    self, api: dynamic_res.Resource, name, *args, **kwargs
) -> Optional[DynResourceInstance]:
    """Fetch a named resource from this namespace via a dynamic API.

    Args:
        api: Dynamic API resource to query.
        name: Name of the resource to fetch.
        *args: Extra positional arguments forwarded to ``api.get``.
        **kwargs: Extra keyword arguments forwarded to ``api.get``.

    Returns:
        The resource instance, or None when the API reports NotFoundError.
    """
    try:
        found = api.get(*args, name=name, namespace=self.name, **kwargs)
    except dynamic_exc.NotFoundError:
        # A missing resource is reported as None rather than by throwing.
        found = None
    return found
def _execute(self, method: Callable[[Any], object], *args, **kwargs):
# Note: Intentionally leaving return type as unspecified to not confuse
# pytype for methods that delegate calls to this wrapper.
@ -259,6 +366,7 @@ class KubernetesNamespace: # pylint: disable=too-many-public-methods
err.headers,
)
# TODO(sergiitk): [GAMMA] let dynamic/exception parse this instead?
code: int = err.status
body = err.body.lower() if err.body else ""
@ -323,14 +431,23 @@ class KubernetesNamespace: # pylint: disable=too-many-public-methods
except (KeyError, ValueError):
return body
def create_single_resource(self, manifest):
return self._execute(self._apply_manifest, manifest)
def create_single_resource(self, manifest, custom_object: bool = False):
if custom_object:
return self._execute(self._apply_manifest_custom_object, manifest)
return self._execute(self._apply_manifest_single, manifest)
def get_service(self, name) -> V1Service:
return self._get_resource(
self._api.core.read_namespaced_service, name, self.name
)
def get_gamma_mesh(self, name) -> Optional[GammaMesh]:
    """Return the mesh called ``name`` from this namespace, if it exists."""
    mesh_api = self.api_gke_mesh
    return self._get_dyn_resource(mesh_api, name)
def get_gamma_route(self, name) -> Optional[GammaGrpcRoute]:
    """Return the GRPCRoute called ``name`` from this namespace, if it exists."""
    route_api = self.api_grpc_route
    return self._get_dyn_resource(route_api, name)
def get_service_account(self, name) -> V1Service:
return self._get_resource(
self._api.core.read_namespaced_service_account, name, self.name
@ -362,6 +479,36 @@ class KubernetesNamespace: # pylint: disable=too-many-public-methods
),
)
def delete_gamma_mesh(
    self,
    name: str,
    grace_period_seconds=DELETE_GRACE_PERIOD_SEC,
) -> None:
    """Request deletion of the mesh ``name`` in this namespace.

    The delete call is issued through ``_execute`` with the "Foreground"
    propagation policy and the given grace period.
    """
    # TODO(sergiitk): [GAMMA] Can we call delete on dynamic_res.ResourceList
    #   to avoid no-member issues due to dynamic_res.Resource proxying calls?
    delete_op = self.api_gke_mesh.delete  # pylint: disable=no-member
    self._execute(
        delete_op,
        name=name,
        namespace=self.name,
        propagation_policy="Foreground",
        grace_period_seconds=grace_period_seconds,
    )
def delete_gamma_route(
    self,
    name: str,
    grace_period_seconds=DELETE_GRACE_PERIOD_SEC,
) -> None:
    """Request deletion of the GRPCRoute ``name`` in this namespace.

    The delete call is issued through ``_execute`` with the "Foreground"
    propagation policy and the given grace period.
    """
    # TODO(sergiitk): [GAMMA] Can we call delete on dynamic_res.ResourceList
    #   to avoid no-member issues due to dynamic_res.Resource proxying calls?
    delete_op = self.api_grpc_route.delete  # pylint: disable=no-member
    self._execute(
        delete_op,
        name=name,
        namespace=self.name,
        propagation_policy="Foreground",
        grace_period_seconds=grace_period_seconds,
    )
def get(self) -> V1Namespace:
return self._get_resource(self._api.core.read_namespace, self.name)
@ -388,6 +535,32 @@ class KubernetesNamespace: # pylint: disable=too-many-public-methods
)
retryer(self.get_service, name)
def wait_for_get_gamma_mesh_deleted(
    self,
    name: str,
    timeout_sec: int = WAIT_MEDIUM_TIMEOUT_SEC,
    wait_sec: int = WAIT_MEDIUM_SLEEP_SEC,
) -> None:
    """Poll ``get_gamma_mesh`` until it returns None for ``name``.

    Args:
        name: Name of the mesh expected to disappear.
        timeout_sec: Retry timeout, in seconds.
        wait_sec: Fixed delay between attempts, in seconds.
    """

    def _is_gone(mesh) -> bool:
        return mesh is None

    poll_until_gone = retryers.constant_retryer(
        wait_fixed=_timedelta(seconds=wait_sec),
        timeout=_timedelta(seconds=timeout_sec),
        check_result=_is_gone,
    )
    poll_until_gone(self.get_gamma_mesh, name)
def wait_for_get_gamma_route_deleted(
    self,
    name: str,
    timeout_sec: int = WAIT_SHORT_TIMEOUT_SEC,
    wait_sec: int = WAIT_SHORT_SLEEP_SEC,
) -> None:
    """Poll ``get_gamma_route`` until it returns None for ``name``.

    Args:
        name: Name of the GRPCRoute expected to disappear.
        timeout_sec: Retry timeout, in seconds.
        wait_sec: Fixed delay between attempts, in seconds.
    """

    def _is_gone(route) -> bool:
        return route is None

    poll_until_gone = retryers.constant_retryer(
        wait_fixed=_timedelta(seconds=wait_sec),
        timeout=_timedelta(seconds=timeout_sec),
        check_result=_is_gone,
    )
    poll_until_gone(self.get_gamma_route, name)
def wait_for_service_account_deleted(
self,
name: str,

@ -0,0 +1,22 @@
# Copyright 2023 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import framework.infrastructure.traffic_director as td_base
# TODO(sergiitk): [GAMMA] make a TD-manager-less base test case.
class TrafficDirectorGammaManager(td_base.TrafficDirectorManager):
    """Traffic Director manager variant used with GAMMA.

    Overrides ``cleanup`` so that it does no work and reports success.
    """

    def cleanup(self, *, force=False):  # pylint: disable=unused-argument
        """Do nothing and return True; ``force`` is accepted but ignored."""
        return True

@ -0,0 +1,242 @@
# Copyright 2023 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Run xDS Test Server on Kubernetes using Gamma
"""
import logging
from typing import List, Optional
from framework.infrastructure import gcp
from framework.infrastructure import k8s
from framework.test_app.runners.k8s import k8s_xds_server_runner
from framework.test_app.server_app import XdsTestServer
logger = logging.getLogger(__name__)
KubernetesServerRunner = k8s_xds_server_runner.KubernetesServerRunner
class GammaServerRunner(KubernetesServerRunner):
    """Server runner that also provisions GAMMA mesh and route resources.

    On top of the resources handled by ``KubernetesServerRunner``, ``run``
    creates a TDMesh from ``gamma/tdmesh.yaml`` and a GRPCRoute from
    ``gamma/route_grpc.yaml``; ``cleanup`` deletes them.
    """

    # Mutable state.
    mesh: Optional[k8s.GammaMesh] = None
    route: Optional[k8s.GammaGrpcRoute] = None

    # Mesh
    server_xds_host: str
    mesh_name: str
    route_name: str

    def __init__(
        self,
        k8s_namespace: k8s.KubernetesNamespace,
        *,
        mesh_name: str,
        server_xds_host: str,
        deployment_name: str,
        image_name: str,
        td_bootstrap_image: str,
        network: str = "default",
        xds_server_uri: Optional[str] = None,
        gcp_api_manager: gcp.api.GcpApiManager,
        gcp_project: str,
        gcp_service_account: str,
        service_account_name: Optional[str] = None,
        service_name: Optional[str] = None,
        route_name: Optional[str] = None,
        neg_name: Optional[str] = None,
        deployment_template: str = "server.deployment.yaml",
        service_account_template: str = "service-account.yaml",
        service_template: str = "gamma/service.yaml",
        reuse_service: bool = False,
        reuse_namespace: bool = False,
        namespace_template: Optional[str] = None,
        debug_use_port_forwarding: bool = False,
        enable_workload_identity: bool = True,
    ):
        """Initialize the runner.

        All arguments except ``mesh_name``, ``server_xds_host`` and
        ``route_name`` are forwarded unchanged to the parent constructor.

        Args:
            mesh_name: Name used for the TDMesh resource.
            server_xds_host: Value passed to the route template as
                ``xds_server_uri``.
            route_name: Name used for the GRPCRoute resource. Defaults to
                ``route-<deployment_name>`` when not set.
        """
        # pylint: disable=too-many-locals
        super().__init__(
            k8s_namespace,
            deployment_name=deployment_name,
            image_name=image_name,
            td_bootstrap_image=td_bootstrap_image,
            network=network,
            xds_server_uri=xds_server_uri,
            gcp_api_manager=gcp_api_manager,
            gcp_project=gcp_project,
            gcp_service_account=gcp_service_account,
            service_account_name=service_account_name,
            service_name=service_name,
            neg_name=neg_name,
            deployment_template=deployment_template,
            service_account_template=service_account_template,
            service_template=service_template,
            reuse_service=reuse_service,
            reuse_namespace=reuse_namespace,
            namespace_template=namespace_template,
            debug_use_port_forwarding=debug_use_port_forwarding,
            enable_workload_identity=enable_workload_identity,
        )

        self.server_xds_host = server_xds_host
        self.mesh_name = mesh_name
        self.route_name = route_name or f"route-{deployment_name}"

    def run(
        self,
        *,
        test_port: int = KubernetesServerRunner.DEFAULT_TEST_PORT,
        maintenance_port: Optional[int] = None,
        secure_mode: bool = False,
        replica_count: int = 1,
        log_to_stdout: bool = False,
        bootstrap_version: Optional[str] = None,
    ) -> List[XdsTestServer]:
        """Deploy the test server together with its GAMMA resources.

        Steps, in order: reuse or create the namespace, create the mesh,
        reuse or create the service, create the route, wait for the service
        NEG, optionally grant workload identity, create the service account,
        create the deployment.

        Args:
            test_port: Port passed to the service, route and deployment
                templates.
            maintenance_port: Maintenance port; when falsy, the value from
                ``_get_default_maintenance_port(secure_mode)`` is used.
            secure_mode: Forwarded to the deployment template and to
                ``_make_servers_for_deployment``.
            replica_count: Forwarded to the deployment template and to
                ``_make_servers_for_deployment``.
            log_to_stdout: Forwarded to ``_make_servers_for_deployment``.
            bootstrap_version: Forwarded to the deployment template.

        Returns:
            The result of ``_make_servers_for_deployment``.
        """
        if not maintenance_port:
            maintenance_port = self._get_default_maintenance_port(secure_mode)

        logger.info(
            (
                'Deploying GAMMA xDS test server "%s" to k8s namespace %s:'
                " test_port=%s maintenance_port=%s secure_mode=%s"
                " replica_count=%s"
            ),
            self.deployment_name,
            self.k8s_namespace.name,
            test_port,
            maintenance_port,
            # Log the value actually passed to the deployment below, rather
            # than a hard-coded False.
            secure_mode,
            replica_count,
        )
        # super(k8s_base_runner.KubernetesBaseRunner, self).run()

        if self.reuse_namespace:
            self.namespace = self._reuse_namespace()
        if not self.namespace:
            self.namespace = self._create_namespace(
                self.namespace_template, namespace_name=self.k8s_namespace.name
            )

        # Create gamma mesh.
        # Note: this will be pre-provisioned per cluster.
        self.mesh = self._create_gamma_mesh(
            "gamma/tdmesh.yaml",
            mesh_name=self.mesh_name,
            namespace_name=self.k8s_namespace.name,
        )

        # Reuse existing if requested, create a new deployment when missing.
        # Useful for debugging to avoid NEG losing relation to deleted service.
        if self.reuse_service:
            self.service = self._reuse_service(self.service_name)
        if not self.service:
            self.service = self._create_service(
                self.service_template,
                service_name=self.service_name,
                namespace_name=self.k8s_namespace.name,
                deployment_name=self.deployment_name,
                neg_name=self.gcp_neg_name,
                test_port=test_port,
            )

        # Create the route.
        self.route = self._create_gamma_route(
            "gamma/route_grpc.yaml",
            xds_server_uri=self.server_xds_host,
            route_name=self.route_name,
            mesh_name=self.mesh_name,
            service_name=self.service_name,
            namespace_name=self.k8s_namespace.name,
            test_port=test_port,
        )

        # Surprised this just works.
        self._wait_service_neg(self.service_name, test_port)

        if self.enable_workload_identity:
            # Allow Kubernetes service account to use the GCP service account
            # identity.
            self._grant_workload_identity_user(
                gcp_iam=self.gcp_iam,
                gcp_service_account=self.gcp_service_account,
                service_account_name=self.service_account_name,
            )

        # Create service account
        self.service_account = self._create_service_account(
            self.service_account_template,
            service_account_name=self.service_account_name,
            namespace_name=self.k8s_namespace.name,
            gcp_service_account=self.gcp_service_account,
        )

        # Always create a new deployment
        self.deployment = self._create_deployment(
            self.deployment_template,
            deployment_name=self.deployment_name,
            image_name=self.image_name,
            namespace_name=self.k8s_namespace.name,
            service_account_name=self.service_account_name,
            td_bootstrap_image=self.td_bootstrap_image,
            xds_server_uri=self.xds_server_uri,
            network=self.network,
            replica_count=replica_count,
            test_port=test_port,
            maintenance_port=maintenance_port,
            secure_mode=secure_mode,
            bootstrap_version=bootstrap_version,
        )

        return self._make_servers_for_deployment(
            replica_count,
            test_port=test_port,
            maintenance_port=maintenance_port,
            log_to_stdout=log_to_stdout,
            secure_mode=secure_mode,
        )

    # pylint: disable=arguments-differ
    def cleanup(self, *, force=False, force_namespace=False):
        """Delete the resources created by ``run``.

        Each resource is deleted when this runner holds a reference to it or
        when ``force`` is set, and the reference is then cleared. The service
        is kept when ``reuse_service`` is set, unless ``force`` is set.
        ``_stop()`` is always called, even when a deletion raises.

        Args:
            force: Attempt deletion even for resources this runner holds no
                reference to.
            force_namespace: Combined with ``force`` (logical AND) to form
                the ``force`` argument of ``_cleanup_namespace``.
        """
        try:
            if self.route or force:
                self._delete_gamma_route(self.route_name)
                self.route = None

            if self.mesh or force:
                self._delete_gamma_mesh(self.mesh_name)
                # Clear the reference, as is done for every other resource,
                # so a repeated cleanup does not try to delete the mesh again.
                self.mesh = None

            if (self.service and not self.reuse_service) or force:
                self._delete_service(self.service_name)
                self.service = None

            if self.deployment or force:
                self._delete_deployment(self.deployment_name)
                self.deployment = None

            if self.enable_workload_identity and (
                self.service_account or force
            ):
                self._revoke_workload_identity_user(
                    gcp_iam=self.gcp_iam,
                    gcp_service_account=self.gcp_service_account,
                    service_account_name=self.service_account_name,
                )
                self._delete_service_account(self.service_account_name)
                self.service_account = None

            self._cleanup_namespace(force=(force_namespace and force))
        finally:
            self._stop()

    # pylint: enable=arguments-differ

@ -250,7 +250,13 @@ class KubernetesBaseRunner(base_runner.BaseRunner, metaclass=ABCMeta):
)
return templates_path.joinpath(template_name).resolve()
def _create_from_template(self, template_name, **kwargs) -> object:
def _create_from_template(
self,
template_name,
*,
custom_object: bool = False,
**kwargs,
) -> object:
template_file = self._template_file_from_name(template_name)
logger.debug("Loading k8s manifest template: %s", template_file)
@ -270,17 +276,13 @@ class KubernetesBaseRunner(base_runner.BaseRunner, metaclass=ABCMeta):
f"Exactly one document expected in manifest {template_file}"
)
k8s_objects = self.k8s_namespace.create_single_resource(manifest)
if len(k8s_objects) != 1:
raise _RunnerError(
"Expected exactly one object must created from "
f"manifest {template_file}"
)
logger.info(
"%s %s created", k8s_objects[0].kind, k8s_objects[0].metadata.name
k8s_object = self.k8s_namespace.create_single_resource(
manifest,
custom_object=custom_object,
)
return k8s_objects[0]
logger.info("%s %s created", k8s_object.kind, k8s_object.metadata.name)
return k8s_object
def _reuse_deployment(self, deployment_name) -> k8s.V1Deployment:
deployment = self.k8s_namespace.get_deployment(deployment_name)
@ -289,10 +291,12 @@ class KubernetesBaseRunner(base_runner.BaseRunner, metaclass=ABCMeta):
def _reuse_service(self, service_name) -> k8s.V1Service:
service = self.k8s_namespace.get_service(service_name)
logger.info("Reusing service: %s", service_name)
# TODO(sergiitk): check if good or must be recreated
return service
def _reuse_namespace(self) -> k8s.V1Namespace:
logger.info("Reusing namespace: %s", self.k8s_namespace.name)
return self.k8s_namespace.get()
def _create_namespace(self, template, **kwargs) -> k8s.V1Namespace:
@ -439,6 +443,76 @@ class KubernetesBaseRunner(base_runner.BaseRunner, metaclass=ABCMeta):
)
return deployment
def _create_gamma_mesh(self, template, **kwargs) -> k8s.GammaMesh:
    """Create a TDMesh custom object from ``template`` and validate it.

    Args:
        template: Manifest template name.
        **kwargs: Template arguments; ``mesh_name`` is required because
            the created object's name is checked against it.

    Raises:
        _RunnerError: the created object is not a TDMesh resource
            instance, or its name differs from ``mesh_name``.
    """
    created = self._create_from_template(
        template, custom_object=True, **kwargs
    )
    is_td_mesh = (
        isinstance(created, k8s.GammaMesh) and created.kind == "TDMesh"
    )
    if not is_td_mesh:
        raise _RunnerError(
            "Expected ResourceInstance[TDMesh] to be created from"
            f" manifest {template}"
        )

    created_name = created.metadata.name
    if created_name != kwargs["mesh_name"]:
        raise _RunnerError(
            "ResourceInstance[TDMesh] created with unexpected name: "
            f"{created_name}"
        )

    logger.debug(
        "ResourceInstance[TDMesh] %s created at %s",
        created_name,
        created.metadata.creation_timestamp,
    )
    return created
def _create_gamma_route(self, template, **kwargs) -> k8s.GammaGrpcRoute:
    """Create a GRPCRoute custom resource from a manifest template.

    Args:
        template: Name of the manifest template to create the route from.
        **kwargs: Template arguments; must include ``route_name``.

    Returns:
        The created GRPCRoute resource.

    Raises:
        _RunnerError: If the created object is not a GRPCRoute, or if its
            name differs from ``kwargs["route_name"]``.
    """
    grpc_route = self._create_from_template(
        template, custom_object=True, **kwargs
    )
    # Both the python type and the reported kind must match.
    if (
        not isinstance(grpc_route, k8s.GammaGrpcRoute)
        or grpc_route.kind != "GRPCRoute"
    ):
        raise _RunnerError(
            "Expected ResourceInstance[GRPCRoute] to be created from"
            f" manifest {template}"
        )

    route_meta = grpc_route.metadata
    if route_meta.name != kwargs["route_name"]:
        raise _RunnerError(
            "ResourceInstance[GRPCRoute] created with unexpected name: "
            f"{route_meta.name}"
        )

    logger.debug(
        "ResourceInstance[GRPCRoute] %s created at %s",
        route_meta.name,
        route_meta.creation_timestamp,
    )
    return grpc_route
# Delete the GAMMA mesh resource `name` through the runner's namespace helper.
# retryers.RetryError and k8s.NotFound raised by the delete call are logged
# and not re-raised; the method returns right after logging them.
# When wait_for_deletion is set, wait_for_get_gamma_mesh_deleted() is called
# for the same name.
def _delete_gamma_mesh(self, name, wait_for_deletion=True):
logger.info("Deleting GAMMA mesh %s", name)
try:
self.k8s_namespace.delete_gamma_mesh(name)
except (retryers.RetryError, k8s.NotFound) as e:
# The failure is only reported at info level; nothing else runs afterwards.
logger.info("GAMMA mesh %s deletion failed: %s", name, e)
return
if wait_for_deletion:
self.k8s_namespace.wait_for_get_gamma_mesh_deleted(name)
logger.debug("GAMMA mesh %s deleted", name)
# Delete the GRPCRoute resource `name` through the runner's namespace helper.
# retryers.RetryError and k8s.NotFound raised by the delete call are logged
# and not re-raised; the method returns right after logging them.
# When wait_for_deletion is set, wait_for_get_gamma_route_deleted() is called
# for the same name.
def _delete_gamma_route(self, name, wait_for_deletion=True):
logger.info("Deleting GRPCRoute %s", name)
try:
self.k8s_namespace.delete_gamma_route(name)
except (retryers.RetryError, k8s.NotFound) as e:
# The failure is only reported at info level; nothing else runs afterwards.
logger.info("GRPCRoute %s deletion failed: %s", name, e)
return
if wait_for_deletion:
self.k8s_namespace.wait_for_get_gamma_route_deleted(name)
logger.debug("GRPCRoute %s deleted", name)
def _create_service(self, template, **kwargs) -> k8s.V1Service:
service = self._create_from_template(template, **kwargs)
if not isinstance(service, k8s.V1Service):
@ -458,8 +532,8 @@ class KubernetesBaseRunner(base_runner.BaseRunner, metaclass=ABCMeta):
return service
def _delete_deployment(self, name, wait_for_deletion=True):
self.stop_pod_dependencies()
logger.info("Deleting deployment %s", name)
self.stop_pod_dependencies()
try:
self.k8s_namespace.delete_deployment(name)
except (retryers.RetryError, k8s.NotFound) as e:

@ -206,6 +206,23 @@ class KubernetesServerRunner(k8s_base_runner.KubernetesBaseRunner):
bootstrap_version=bootstrap_version,
)
return self._make_servers_for_deployment(
replica_count,
test_port=test_port,
maintenance_port=maintenance_port,
log_to_stdout=log_to_stdout,
secure_mode=secure_mode,
)
def _make_servers_for_deployment(
self,
replica_count,
*,
test_port: int,
maintenance_port: int,
log_to_stdout: bool,
secure_mode: bool = False,
) -> List[XdsTestServer]:
pod_names = self._wait_deployment_pod_count(
self.deployment, replica_count
)

@ -0,0 +1,118 @@
# Copyright 2023 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from framework.infrastructure import k8s
import framework.infrastructure.traffic_director_gamma as td_gamma
from framework.test_app import client_app
from framework.test_app import server_app
from framework.test_app.runners.k8s import gamma_server_runner
from framework.test_app.runners.k8s import k8s_xds_client_runner
import framework.xds_k8s_testcase as xds_k8s_testcase
GammaServerRunner = gamma_server_runner.GammaServerRunner
KubernetesClientRunner = k8s_xds_client_runner.KubernetesClientRunner
XdsTestClient = client_app.XdsTestClient
XdsTestServer = server_app.XdsTestServer
logger = logging.getLogger(__name__)
# TODO(sergiitk): [GAMMA] Move into framework/test_cases
class GammaXdsKubernetesTestCase(xds_k8s_testcase.RegularXdsKubernetesTestCase):
    """Base test case for xDS interop tests that run against a GAMMA mesh.

    Replaces the fixture setup inherited from RegularXdsKubernetesTestCase:
    the test server is managed by GammaServerRunner, the Traffic Director
    manager is a TrafficDirectorGammaManager, and the test client is started
    with the mesh name passed as ``config_mesh``.
    """

    server_runner: GammaServerRunner
    # Per-test mesh name; built from the resource prefix/suffix in setUp().
    mesh_name: str
    # mesh_name with the "gketd-" prefix; given to the client as config_mesh.
    mesh_name_td: str

    def setUp(self):
        """Hook method for setting up the test fixture before exercising it."""
        # TODO(sergiitk): [GAMMA] Remove when refactored to be TD-manager-less.
        # pylint: disable=bad-super-call
        # Skips RegularXdsKubernetesTestCase and IsolatedXdsKubernetesTestCase
        # and calls setUp on XdsKubernetesBaseTestCase.
        # IsolatedXdsKubernetesTestCase randomizes server_xds_port when it's 0,
        # and in GAMMA we always need it unset.
        # Calls XdsKubernetesBaseTestCase.setUp():
        super(xds_k8s_testcase.IsolatedXdsKubernetesTestCase, self).setUp()
        # pylint: enable=bad-super-call

        # Random suffix per test.
        self.createRandomSuffix()

        # TODO(sergiitk): [GAMMA] Make a TD-manager-less base test case
        # TD Manager
        self.td = self.initTrafficDirectorManager()

        # Generate unique mesh name too.
        self.mesh_name = f"{self.resource_prefix}-mesh-{self.resource_suffix}"
        self.mesh_name_td = f"gketd-{self.mesh_name}"

        # The gamma mesh doesn't use the port.
        self.server_xds_host = f"{self.server_xds_host}-{self.resource_suffix}"
        self.server_xds_port = None

        # Test Server runner
        self.server_namespace = GammaServerRunner.make_namespace_name(
            self.resource_prefix, self.resource_suffix
        )
        self.server_runner = self.initKubernetesServerRunner()

        # Test Client runner
        self.client_namespace = KubernetesClientRunner.make_namespace_name(
            self.resource_prefix, self.resource_suffix
        )
        self.client_runner = self.initKubernetesClientRunner()

        # Cleanup.
        self.force_cleanup = True
        self.force_cleanup_namespace = True

    # TODO(sergiitk): [GAMMA] Make a TD-manager-less base test case
    def initTrafficDirectorManager(
        self,
    ) -> td_gamma.TrafficDirectorGammaManager:
        """Build the GAMMA Traffic Director manager from the test settings.

        Returns:
            A TrafficDirectorGammaManager configured with this test's project,
            resource prefix/suffix, network, and compute API version.
        """
        return td_gamma.TrafficDirectorGammaManager(
            self.gcp_api_manager,
            project=self.project,
            resource_prefix=self.resource_prefix,
            resource_suffix=self.resource_suffix,
            network=self.network,
            compute_api_version=self.compute_api_version,
        )

    def initKubernetesServerRunner(self) -> GammaServerRunner:
        """Build the GAMMA server runner bound to the server namespace.

        Returns:
            A GammaServerRunner for ``self.server_namespace`` that is given
            this test's mesh name and xDS host.
        """
        return GammaServerRunner(
            k8s.KubernetesNamespace(
                self.k8s_api_manager, self.server_namespace
            ),
            mesh_name=self.mesh_name,
            server_xds_host=self.server_xds_host,
            deployment_name=self.server_name,
            image_name=self.server_image,
            td_bootstrap_image=self.td_bootstrap_image,
            gcp_project=self.project,
            gcp_api_manager=self.gcp_api_manager,
            gcp_service_account=self.gcp_service_account,
            xds_server_uri=self.xds_server_uri,
            network=self.network,
            debug_use_port_forwarding=self.debug_use_port_forwarding,
            enable_workload_identity=self.enable_workload_identity,
        )

    def startTestClient(
        self, test_server: XdsTestServer, **kwargs
    ) -> XdsTestClient:
        """Start the test client configured for this test's mesh.

        Args:
            test_server: The test server the client is started against.
            **kwargs: Extra arguments forwarded to the parent implementation.
                ``config_mesh`` is always supplied by this method, so it must
                not be passed here.

        Returns:
            The test client returned by the parent implementation.
        """
        # Forward kwargs: previously they were accepted but silently dropped.
        return super().startTestClient(
            test_server, config_mesh=self.mesh_name_td, **kwargs
        )

@ -130,7 +130,7 @@ class XdsKubernetesBaseTestCase(base_testcase.BaseTestCase):
server_namespace: str
server_runner: KubernetesServerRunner
server_xds_host: str
server_xds_port: int
server_xds_port: Optional[int]
td: TrafficDirectorManager
td_bootstrap_image: str
_prev_sigint_handler: Optional[_SignalHandler] = None
@ -193,6 +193,7 @@ class XdsKubernetesBaseTestCase(base_testcase.BaseTestCase):
# Test suite settings
cls.force_cleanup = xds_flags.FORCE_CLEANUP.value
cls.force_cleanup_namespace = xds_flags.FORCE_CLEANUP.value
cls.debug_use_port_forwarding = (
xds_k8s_flags.DEBUG_USE_PORT_FORWARDING.value
)
@ -612,13 +613,8 @@ class IsolatedXdsKubernetesTestCase(
"""Hook method for setting up the test fixture before exercising it."""
super().setUp()
if self.resource_suffix_randomize:
self.resource_suffix = helpers_rand.random_resource_suffix()
logger.info(
"Test run resource prefix: %s, suffix: %s",
self.resource_prefix,
self.resource_suffix,
)
# Random suffix per test.
self.createRandomSuffix()
# TD Manager
self.td = self.initTrafficDirectorManager()
@ -651,6 +647,15 @@ class IsolatedXdsKubernetesTestCase(
self.server_xds_port = self.td.find_unused_forwarding_rule_port()
logger.info("Found unused xds port: %s", self.server_xds_port)
# Assigns self.resource_suffix from helpers_rand.random_resource_suffix()
# when self.resource_suffix_randomize is set, and logs the resource prefix
# and suffix of the test run.
def createRandomSuffix(self):
if self.resource_suffix_randomize:
self.resource_suffix = helpers_rand.random_resource_suffix()
logger.info(
"Test run resource prefix: %s, suffix: %s",
self.resource_prefix,
self.resource_suffix,
)
@abc.abstractmethod
def initTrafficDirectorManager(self) -> TrafficDirectorManager:
    """Create the Traffic Director manager for the test.

    Abstract: concrete test case classes must provide the implementation.

    Raises:
        NotImplementedError: Always, when this base version is invoked.
    """
    raise NotImplementedError

@ -0,0 +1,22 @@
---
# GRPCRoute manifest template: route name, namespace, mesh, hostname, backend
# service and port are all template placeholders.
kind: GRPCRoute
apiVersion: gateway.networking.k8s.io/v1alpha2
metadata:
name: ${route_name}
namespace: ${namespace_name}
labels:
owner: xds-k8s-interop-test
spec:
# The route's parent is a TDMesh (group net.gke.io) in the same namespace.
parentRefs:
- name: ${mesh_name}
namespace: ${namespace_name}
group: net.gke.io
kind: TDMesh
hostnames:
- ${xds_server_uri}
rules:
# Single rule with one backend: the test service on the test port.
- backendRefs:
- name: ${service_name}
port: ${test_port}
namespace: ${namespace_name}
...

@ -0,0 +1,17 @@
---
# Service manifest template: a ClusterIP service whose name, namespace,
# selector and port are template placeholders.
apiVersion: v1
kind: Service
metadata:
name: ${service_name}
namespace: ${namespace_name}
labels:
owner: xds-k8s-interop-test
spec:
type: ClusterIP
# Selects pods labeled with the deployment name.
selector:
app: ${deployment_name}
ports:
# The same placeholder is used for the service port and the target port.
- port: ${test_port}
protocol: TCP
targetPort: ${test_port}
...

@ -0,0 +1,19 @@
---
# TDMesh manifest template: mesh name and namespace are template placeholders.
kind: TDMesh
apiVersion: net.gke.io/v1alpha1
metadata:
name: ${mesh_name}
namespace: ${namespace_name}
labels:
owner: xds-k8s-interop-test
spec:
gatewayClassName: gke-td
# Routes from all namespaces are allowed to attach.
allowedRoutes:
namespaces:
from: All
kinds:
- group: net.gke.io
# This is intentionally incorrect and should be set to GRPCRoute.
# TODO(sergiitk): [GAMMA] Change when the fix is ready.
kind: TDGRPCRoute
...

@ -0,0 +1,13 @@
# Copyright 2023 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

@ -0,0 +1,44 @@
# Copyright 2023 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from absl import flags
from absl.testing import absltest
from framework import xds_gamma_testcase
from framework import xds_k8s_testcase
logger = logging.getLogger(__name__)
flags.adopt_module_key_flags(xds_k8s_testcase)
_XdsTestServer = xds_k8s_testcase.XdsTestServer
_XdsTestClient = xds_k8s_testcase.XdsTestClient
class GammaBaselineTest(xds_gamma_testcase.GammaXdsKubernetesTestCase):
    """Baseline GAMMA test: one test server, one test client, RPCs succeed."""

    def test_ping_pong(self):
        """Start a server and a client, then assert the client's RPCs succeed.

        Each phase runs in its own numbered subTest.
        """
        # TODO(sergiitk): [GAMMA] Consider moving out custom gamma
        #   resource creation out of self.startTestServers()
        with self.subTest("1_run_test_server"):
            started_servers = self.startTestServers()
            # Only the first started server is used by the later phases.
            server: _XdsTestServer = started_servers[0]

        with self.subTest("2_start_test_client"):
            client: _XdsTestClient = self.startTestClient(server)

        with self.subTest("3_test_server_received_rpcs_from_test_client"):
            self.assertSuccessfulRpcs(client)
if __name__ == "__main__":
absltest.main()
Loading…
Cancel
Save