[Interop] Tests for SSA and GAMMA (#34387)

This is just an initial scope of tests. Much of this code was written by
@ginayeh. I just did the final polish/integration step.

There are 3 main tests included:

1. The GAMMA baseline test, using the [actual GAMMA
API](https://gateway-api.sigs.k8s.io/geps/gep-1426/) rather than vendor
extensions.
2. Kubernetes-based stateful session affinity tests, where the mesh
(including SSA configuration) is configured using CRDs.
3. GCP-based stateful session affinity tests, where the mesh is
configured using the networkservices APIs directly.

Tests 1 and 2 will run in both prod and GKE staging, i.e.
`container.googleapis.com` and
`staging-container.sandbox.googleapis.com`. The latter of these will act
as an early detection mechanism for regressions in the controller that
translates Gateway resources into networkservices resources.

Test 3 will run against `staging-networkservices.sandbox.googleapis.com`
to act as an early detection mechanism for regressions in the control
plane SSA implementation.

The scope of the SSA tests is still fairly minimal. Session drain
testing is in progress but not included in this PR, though several
elements required for it are (termination grace period, pre-stop hook,
and the ability to kill a single pod in a deployment).
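
For reference, the first two of those elements live in the server pod spec. A minimal, illustrative fragment (the values and hook command here are invented, not the driver's actual manifest):

```yaml
# Illustrative pod spec fragment for drain testing:
spec:
  terminationGracePeriodSeconds: 600   # invented value
  containers:
    - name: psm-grpc-server
      lifecycle:
        preStop:
          exec:
            # Hypothetical hook; gives the server time to drain sessions.
            command: ["sh", "-c", "sleep 30"]
```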

---------

Co-authored-by: Jung-Yu (Gina) Yeh <ginayeh@google.com>
Co-authored-by: Sergii Tkachenko <sergiitk@google.com>
Committed by Richard Belleville via GitHub (commit 62521a889f, parent e57b32588b)
Files changed (diff line count in parentheses):

  1. tools/run_tests/xds_k8s_test_driver/README.md (4)
  2. tools/run_tests/xds_k8s_test_driver/framework/helpers/grpc.py (30)
  3. tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/network_services.py (154)
  4. tools/run_tests/xds_k8s_test_driver/framework/infrastructure/k8s.py (215)
  5. tools/run_tests/xds_k8s_test_driver/framework/infrastructure/traffic_director.py (23)
  6. tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc_testing.py (5)
  7. tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py (8)
  8. tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/gamma_server_runner.py (106)
  9. tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_base_runner.py (195)
  10. tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_client_runner.py (2)
  11. tools/run_tests/xds_k8s_test_driver/framework/test_cases/session_affinity_util.py (90)
  12. tools/run_tests/xds_k8s_test_driver/framework/xds_gamma_testcase.py (34)
  13. tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py (24)
  14. tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/client.deployment.yaml (3)
  15. tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/gamma/backend_policy.yaml (17)
  16. tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/gamma/frontend_service.yaml (10)
  17. tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/gamma/route_http.yaml (23)
  18. tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/gamma/route_http_ssafilter.yaml (29)
  19. tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/gamma/session_affinity_filter.yaml (10)
  20. tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/gamma/session_affinity_policy_route.yaml (15)
  21. tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/gamma/session_affinity_policy_service.yaml (15)
  22. tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/gamma/tdmesh.yaml (19)
  23. tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/server.deployment.yaml (9)
  24. tools/run_tests/xds_k8s_test_driver/tests/app_net_ssa_test.py (116)
  25. tools/run_tests/xds_k8s_test_driver/tests/gamma/affinity_test.py (170)

@@ -38,7 +38,7 @@ sudo apt-get install python3-venv
##### Getting Started
1. If you haven't, [initialize](https://cloud.google.com/sdk/docs/install-sdk) gcloud SDK
2. Activate gcloud [configuration](https://cloud.google.com/sdk/docs/configurations) with your project
3. Enable gcloud services:
```shell
gcloud services enable \
@@ -54,7 +54,6 @@ sudo apt-get install python3-venv
#### Configure GKE cluster
This is an example outlining minimal requirements to run the [baseline tests](xds-baseline-tests).
Update gcloud SDK:
```shell
gcloud -q components update
@@ -91,7 +90,6 @@ gcloud container clusters create "${CLUSTER_NAME}" \
  --workload-metadata=GKE_METADATA \
  --tags=allow-health-checks
```
For security tests you also need to create CAs and configure the cluster to use those CAs
as described
[here](https://cloud.google.com/traffic-director/docs/security-proxyless-setup#configure-cas).

@@ -23,6 +23,8 @@ from framework.rpc import grpc_testing
# Type aliases
RpcsByPeer: Dict[str, int]
RpcMetadata = grpc_testing.LoadBalancerStatsResponse.RpcMetadata
MetadataByPeer: list[str, RpcMetadata]


@functools.cache  # pylint: disable=no-member
@@ -131,6 +133,8 @@ class PrettyLoadBalancerStats:
    # }
    rpcs_by_method: Dict[str, "RpcsByPeer"]
    metadatas_by_peer: Dict[str, "MetadataByPeer"]

    @staticmethod
    def _parse_rpcs_by_peer(
        rpcs_by_peer: grpc_testing.RpcsByPeer,
@@ -140,6 +144,21 @@ class PrettyLoadBalancerStats:
            result[peer] = count
        return result

    @staticmethod
    def _parse_metadatas_by_peer(
        metadatas_by_peer: grpc_testing.LoadBalancerStatsResponse.MetadataByPeer,
    ) -> "MetadataByPeer":
        result = dict()
        for peer, metadatas in metadatas_by_peer.items():
            pretty_metadata = ""
            for rpc_metadatas in metadatas.rpc_metadata:
                for metadata in rpc_metadatas.metadata:
                    pretty_metadata += (
                        metadata.key + ": " + metadata.value + ", "
                    )
            result[peer] = pretty_metadata
        return result

    @classmethod
    def from_response(
        cls, lb_stats: grpc_testing.LoadBalancerStatsResponse
@@ -154,6 +173,9 @@ class PrettyLoadBalancerStats:
            num_failures=lb_stats.num_failures,
            rpcs_by_peer=cls._parse_rpcs_by_peer(lb_stats.rpcs_by_peer),
            rpcs_by_method=rpcs_by_method,
            metadatas_by_peer=cls._parse_metadatas_by_peer(
                lb_stats.metadatas_by_peer
            ),
        )
@@ -173,4 +195,10 @@ def lb_stats_pretty(lb: grpc_testing.LoadBalancerStatsResponse) -> str:
      psm-grpc-server-b: 42
    """
    pretty_lb_stats = PrettyLoadBalancerStats.from_response(lb)
    stats_as_dict = dataclasses.asdict(pretty_lb_stats)
    # Don't print metadatas_by_peer unless it has data
    if not stats_as_dict["metadatas_by_peer"]:
        stats_as_dict.pop("metadatas_by_peer")
    return yaml.dump(stats_as_dict, sort_keys=False)
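
The `_parse_metadatas_by_peer` helper above flattens each peer's RPC metadata into a single display string. A minimal sketch of that flattening logic, with plain dicts standing in for the proto messages (the nesting mirrors `metadatas_by_peer -> rpc_metadata -> metadata` from the code above; the sample data is invented):

```python
# Sketch of the metadata flattening, with dicts in place of the
# LoadBalancerStatsResponse proto messages.
def flatten_metadatas_by_peer(metadatas_by_peer):
    result = {}
    for peer, metadatas in metadatas_by_peer.items():
        pretty_metadata = ""
        for rpc_metadatas in metadatas["rpc_metadata"]:
            for metadata in rpc_metadatas["metadata"]:
                pretty_metadata += metadata["key"] + ": " + metadata["value"] + ", "
        result[peer] = pretty_metadata
    return result


# Invented sample: one peer that returned a session cookie header.
stats = {
    "psm-grpc-server-a": {
        "rpc_metadata": [
            {"metadata": [{"key": "set-cookie", "value": "GSSA=abc"}]},
        ],
    },
}
print(flatten_metadatas_by_peer(stats))
# {'psm-grpc-server-a': 'set-cookie: GSSA=abc, '}
```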

@@ -184,6 +184,139 @@ class GrpcRoute:
        )


@dataclasses.dataclass(frozen=True)
class HttpRoute:
    @dataclasses.dataclass(frozen=True)
    class MethodMatch:
        type: Optional[str]
        http_service: Optional[str]
        http_method: Optional[str]
        case_sensitive: Optional[bool]

        @classmethod
        def from_response(cls, d: Dict[str, Any]) -> "HttpRoute.MethodMatch":
            return cls(
                type=d.get("type"),
                http_service=d.get("httpService"),
                http_method=d.get("httpMethod"),
                case_sensitive=d.get("caseSensitive"),
            )

    @dataclasses.dataclass(frozen=True)
    class HeaderMatch:
        type: Optional[str]
        key: str
        value: str

        @classmethod
        def from_response(cls, d: Dict[str, Any]) -> "HttpRoute.HeaderMatch":
            return cls(
                type=d.get("type"),
                key=d["key"],
                value=d["value"],
            )

    @dataclasses.dataclass(frozen=True)
    class RouteMatch:
        method: Optional["HttpRoute.MethodMatch"]
        headers: Tuple["HttpRoute.HeaderMatch"]

        @classmethod
        def from_response(cls, d: Dict[str, Any]) -> "HttpRoute.RouteMatch":
            return cls(
                method=HttpRoute.MethodMatch.from_response(d["method"])
                if "method" in d
                else None,
                headers=tuple(
                    HttpRoute.HeaderMatch.from_response(h) for h in d["headers"]
                )
                if "headers" in d
                else (),
            )

    @dataclasses.dataclass(frozen=True)
    class Destination:
        service_name: str
        weight: Optional[int]

        @classmethod
        def from_response(cls, d: Dict[str, Any]) -> "HttpRoute.Destination":
            return cls(
                service_name=d["serviceName"],
                weight=d.get("weight"),
            )

    @dataclasses.dataclass(frozen=True)
    class RouteAction:
        destinations: List["HttpRoute.Destination"]
        stateful_session_affinity: Optional["HttpRoute.StatefulSessionAffinity"]

        @classmethod
        def from_response(cls, d: Dict[str, Any]) -> "HttpRoute.RouteAction":
            destinations = (
                [
                    HttpRoute.Destination.from_response(dest)
                    for dest in d["destinations"]
                ]
                if "destinations" in d
                else []
            )
            stateful_session_affinity = (
                HttpRoute.StatefulSessionAffinity.from_response(
                    d["statefulSessionAffinity"]
                )
                if "statefulSessionAffinity" in d
                else None
            )
            return cls(
                destinations=destinations,
                stateful_session_affinity=stateful_session_affinity,
            )

    @dataclasses.dataclass(frozen=True)
    class StatefulSessionAffinity:
        cookie_ttl: Optional[str]

        @classmethod
        def from_response(
            cls, d: Dict[str, Any]
        ) -> "HttpRoute.StatefulSessionAffinity":
            return cls(cookie_ttl=d.get("cookieTtl"))

    @dataclasses.dataclass(frozen=True)
    class RouteRule:
        matches: List["HttpRoute.RouteMatch"]
        action: "HttpRoute.RouteAction"

        @classmethod
        def from_response(cls, d: Dict[str, Any]) -> "HttpRoute.RouteRule":
            matches = (
                [HttpRoute.RouteMatch.from_response(m) for m in d["matches"]]
                if "matches" in d
                else []
            )
            return cls(
                matches=matches,
                action=HttpRoute.RouteAction.from_response(d["action"]),
            )

    name: str
    url: str
    hostnames: Tuple[str]
    rules: Tuple["HttpRoute.RouteRule"]
    meshes: Optional[Tuple[str]]

    @classmethod
    def from_response(cls, name: str, d: Dict[str, Any]) -> "HttpRoute":
        return cls(
            name=name,
            url=d["name"],
            hostnames=tuple(d["hostnames"]),
            rules=tuple(d["rules"]),
            meshes=None if d.get("meshes") is None else tuple(d["meshes"]),
        )
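
The parsing pattern used throughout `HttpRoute` above: required response fields are read with `d[...]` (raising `KeyError` if absent) and optional ones with `d.get(...)`, so a partial API response still parses. A trimmed, self-contained stand-in (not the real class) showing the same `from_response` shape:

```python
import dataclasses
from typing import Any, Dict, Optional


@dataclasses.dataclass(frozen=True)
class Destination:
    # Trimmed stand-in for HttpRoute.Destination.
    service_name: str
    weight: Optional[int]

    @classmethod
    def from_response(cls, d: Dict[str, Any]) -> "Destination":
        return cls(
            service_name=d["serviceName"],  # required: KeyError if missing
            weight=d.get("weight"),  # optional: defaults to None
        )


print(Destination.from_response({"serviceName": "svc-a"}))
# Destination(service_name='svc-a', weight=None)
```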
class _NetworkServicesBase(
    gcp.api.GcpStandardCloudApiResource, metaclass=abc.ABCMeta
):
@@ -261,6 +394,7 @@ class NetworkServicesV1Alpha1(NetworkServicesV1Beta1):
    v1alpha1 class can always override and reimplement incompatible methods.
    """

    HTTP_ROUTES = "httpRoutes"
    GRPC_ROUTES = "grpcRoutes"
    MESHES = "meshes"
@@ -293,6 +427,13 @@ class NetworkServicesV1Alpha1(NetworkServicesV1Beta1):
            grpcRouteId=name,
        )

    def create_http_route(self, name: str, body: dict) -> GcpResource:
        return self._create_resource(
            collection=self._api_locations.httpRoutes(),
            body=body,
            httpRouteId=name,
        )

    def get_grpc_route(self, name: str) -> GrpcRoute:
        full_name = self.resource_full_name(name, self.GRPC_ROUTES)
        result = self._get_resource(
@@ -300,8 +441,21 @@ class NetworkServicesV1Alpha1(NetworkServicesV1Beta1):
        )
        return GrpcRoute.from_response(name, result)

    def get_http_route(self, name: str) -> HttpRoute:
        full_name = self.resource_full_name(name, self.HTTP_ROUTES)
        result = self._get_resource(
            collection=self._api_locations.httpRoutes(), full_name=full_name
        )
        return HttpRoute.from_response(name, result)

    def delete_grpc_route(self, name: str) -> bool:
        return self._delete_resource(
            collection=self._api_locations.grpcRoutes(),
            full_name=self.resource_full_name(name, self.GRPC_ROUTES),
        )

    def delete_http_route(self, name: str) -> bool:
        return self._delete_resource(
            collection=self._api_locations.httpRoutes(),
            full_name=self.resource_full_name(name, self.HTTP_ROUTES),
        )

@@ -54,6 +54,10 @@ V1Namespace = client.V1Namespace
DynResourceInstance = dynamic_res.ResourceInstance
GammaMesh = DynResourceInstance
GammaGrpcRoute = DynResourceInstance
GammaHttpRoute = DynResourceInstance
GcpSessionAffinityPolicy = DynResourceInstance
GcpSessionAffinityFilter = DynResourceInstance
GcpBackendPolicy = DynResourceInstance

_timedelta = datetime.timedelta
_ApiException = client.ApiException
@@ -138,10 +142,12 @@ class KubernetesApiManager:
        return self._dynamic_client

    @functools.cache  # pylint: disable=no-member
    def grpc_route(self, version: str) -> dynamic_res.Resource:
        api_name = "gateway.networking.k8s.io"
        kind = "GRPCRoute"
        supported_versions = {
            "v1alpha2",
        }
        if version not in supported_versions:
            raise NotImplementedError(
                f"{kind} {api_name}/{version} not implemented."
@@ -150,10 +156,46 @@ class KubernetesApiManager:
        return self._load_dynamic_api(api_name, version, kind)

    @functools.cache  # pylint: disable=no-member
    def http_route(self, version: str) -> dynamic_res.Resource:
        api_name = "gateway.networking.k8s.io"
        kind = "HTTPRoute"
        supported_versions = {"v1alpha2", "v1beta1"}
        if version not in supported_versions:
            raise NotImplementedError(
                f"{kind} {api_name}/{version} not implemented."
            )
        return self._load_dynamic_api(api_name, version, kind)
    @functools.cache  # pylint: disable=no-member
    def gcp_session_affinity_filter(self, version: str) -> dynamic_res.Resource:
        api_name = "networking.gke.io"
        kind = "GCPSessionAffinityFilter"
        supported_versions = ("v1",)
        if version not in supported_versions:
            raise NotImplementedError(
                f"{kind} {api_name}/{version} not implemented."
            )
        return self._load_dynamic_api(api_name, version, kind)

    @functools.cache  # pylint: disable=no-member
    def gcp_session_affinity_policy(self, version: str) -> dynamic_res.Resource:
        api_name = "networking.gke.io"
        kind = "GCPSessionAffinityPolicy"
        supported_versions = ("v1",)
        if version not in supported_versions:
            raise NotImplementedError(
                f"{kind} {api_name}/{version} not implemented."
            )
        return self._load_dynamic_api(api_name, version, kind)

    @functools.cache  # pylint: disable=no-member
    def gcp_backend_policy(self, version: str) -> dynamic_res.Resource:
        api_name = "networking.gke.io"
        kind = "GCPBackendPolicy"
        supported_versions = ("v1",)
        if version not in supported_versions:
            raise NotImplementedError(
                f"{kind} {api_name}/{version} not implemented."
@@ -243,6 +285,34 @@ class KubernetesNamespace:  # pylint: disable=too-many-public-methods
            "GRPCRoute",
        )

    @functools.cached_property  # pylint: disable=no-member
    def api_http_route(self) -> dynamic_res.Resource:
        return self._get_dynamic_api(
            "gateway.networking.k8s.io/v1beta1",
            "HTTPRoute",
        )

    @functools.cached_property  # pylint: disable=no-member
    def api_session_affinity_filter(self) -> dynamic_res.Resource:
        return self._get_dynamic_api(
            "networking.gke.io/v1",
            "GCPSessionAffinityFilter",
        )

    @functools.cached_property  # pylint: disable=no-member
    def api_session_affinity_policy(self) -> dynamic_res.Resource:
        return self._get_dynamic_api(
            "networking.gke.io/v1",
            "GCPSessionAffinityPolicy",
        )

    @functools.cached_property  # pylint: disable=no-member
    def api_backend_policy(self) -> dynamic_res.Resource:
        return self._get_dynamic_api(
            "networking.gke.io/v1",
            "GCPBackendPolicy",
        )
    def _refresh_auth(self):
        logger.info("Reloading k8s api client to refresh the auth.")
        self._api.reload()
@@ -269,15 +339,22 @@ class KubernetesNamespace:  # pylint: disable=too-many-public-methods
        # TODO(sergiitk): [GAMMA] Needs to be improved. This all is very clunky
        # when considered together with _get_dynamic_api and api_gke_mesh,
        # api_grpc_route.
        if group == "networking.gke.io":
            if kind == "GCPSessionAffinityFilter":
                return self._api.gcp_session_affinity_filter(version)
            elif kind == "GCPSessionAffinityPolicy":
                return self._api.gcp_session_affinity_policy(version)
            elif kind == "GCPBackendPolicy":
                return self._api.gcp_backend_policy(version)
        elif group == "gateway.networking.k8s.io":
            if kind == "GRPCRoute":
                return self._api.grpc_route(version)
            elif kind == "HTTPRoute":
                return self._api.http_route(version)
        raise NotImplementedError(
            f"{kind} ({group}) {api_version} not implemented."
        )
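
The `_get_dynamic_api` dispatch above can be read as a (group, kind) -> supported-versions table. A sketch of that version gate in isolation (the loaders themselves are elided; the table values are taken from the methods above):

```python
# (group, kind) -> versions accepted by the corresponding loader above.
SUPPORTED = {
    ("networking.gke.io", "GCPSessionAffinityFilter"): {"v1"},
    ("networking.gke.io", "GCPSessionAffinityPolicy"): {"v1"},
    ("networking.gke.io", "GCPBackendPolicy"): {"v1"},
    ("gateway.networking.k8s.io", "GRPCRoute"): {"v1alpha2"},
    ("gateway.networking.k8s.io", "HTTPRoute"): {"v1alpha2", "v1beta1"},
}


def check_supported(group: str, kind: str, version: str) -> None:
    # Reject unknown kinds and unsupported versions with the same error,
    # mirroring the NotImplementedError raised in the dispatch above.
    versions = SUPPORTED.get((group, kind))
    if versions is None or version not in versions:
        raise NotImplementedError(f"{kind} ({group}) {version} not implemented.")


check_supported("gateway.networking.k8s.io", "HTTPRoute", "v1beta1")  # passes
```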
    def _get_resource(self, method: Callable[[Any], object], *args, **kwargs):
        try:
@@ -445,8 +522,21 @@ class KubernetesNamespace:  # pylint: disable=too-many-public-methods
    def get_gamma_mesh(self, name) -> Optional[GammaMesh]:
        return self._get_dyn_resource(self.api_gke_mesh, name)

    def get_gamma_route(self, name) -> Optional[GammaHttpRoute]:
        return self._get_dyn_resource(self.api_http_route, name)

    def get_session_affinity_policy(
        self, name
    ) -> Optional[GcpSessionAffinityPolicy]:
        return self._get_dyn_resource(self.api_session_affinity_policy, name)

    def get_session_affinity_filter(
        self, name
    ) -> Optional[GcpSessionAffinityFilter]:
        return self._get_dyn_resource(self.api_session_affinity_filter, name)

    def get_backend_policy(self, name) -> Optional[GcpBackendPolicy]:
        return self._get_dyn_resource(self.api_backend_policy, name)

    def get_service_account(self, name) -> V1Service:
        return self._get_resource(
@@ -502,13 +592,69 @@ class KubernetesNamespace:  # pylint: disable=too-many-public-methods
        # TODO(sergiitk): [GAMMA] Can we call delete on dynamic_res.ResourceList
        # to avoid no-member issues due to dynamic_res.Resource proxying calls?
        self._execute(
            self.api_http_route.delete,  # pylint: disable=no-member
            name=name,
            namespace=self.name,
            propagation_policy="Foreground",
            grace_period_seconds=grace_period_seconds,
        )

    def delete_session_affinity_policy(
        self,
        name: str,
        grace_period_seconds=DELETE_GRACE_PERIOD_SEC,
    ) -> None:
        self._execute(
            self.api_session_affinity_policy.delete,  # pylint: disable=no-member
            name=name,
            namespace=self.name,
            propagation_policy="Foreground",
            grace_period_seconds=grace_period_seconds,
        )

    def delete_session_affinity_filter(
        self,
        name: str,
        grace_period_seconds=DELETE_GRACE_PERIOD_SEC,
    ) -> None:
        self._execute(
            self.api_session_affinity_filter.delete,  # pylint: disable=no-member
            name=name,
            namespace=self.name,
            propagation_policy="Foreground",
            grace_period_seconds=grace_period_seconds,
        )

    def delete_backend_policy(
        self,
        name: str,
        grace_period_seconds=DELETE_GRACE_PERIOD_SEC,
    ) -> None:
        self._execute(
            self.api_backend_policy.delete,  # pylint: disable=no-member
            name=name,
            namespace=self.name,
            propagation_policy="Foreground",
            grace_period_seconds=grace_period_seconds,
        )

    def delete_pod_async(
        self,
        name: str,
        grace_period_seconds=DELETE_GRACE_PERIOD_SEC,
    ) -> None:
        # TODO(sergiitk): Do we need async? Won't it break error handling?
        self._execute(
            self._api.core.delete_namespaced_pod,
            name=name,
            namespace=self.name,
            body=client.V1DeleteOptions(
                propagation_policy="Foreground",
                grace_period_seconds=grace_period_seconds,
            ),
            async_req=True,
        )
    def get(self) -> V1Namespace:
        return self._get_resource(self._api.core.read_namespace, self.name)
@@ -561,6 +707,45 @@ class KubernetesNamespace:  # pylint: disable=too-many-public-methods
        )
        retryer(self.get_gamma_route, name)
    def wait_for_get_session_affinity_policy_deleted(
        self,
        name: str,
        timeout_sec: int = WAIT_MEDIUM_TIMEOUT_SEC,
        wait_sec: int = WAIT_MEDIUM_SLEEP_SEC,
    ) -> None:
        retryer = retryers.constant_retryer(
            wait_fixed=_timedelta(seconds=wait_sec),
            timeout=_timedelta(seconds=timeout_sec),
            check_result=lambda affinity_policy: affinity_policy is None,
        )
        retryer(self.get_session_affinity_policy, name)

    def wait_for_get_session_affinity_filter_deleted(
        self,
        name: str,
        timeout_sec: int = WAIT_MEDIUM_TIMEOUT_SEC,
        wait_sec: int = WAIT_MEDIUM_SLEEP_SEC,
    ) -> None:
        retryer = retryers.constant_retryer(
            wait_fixed=_timedelta(seconds=wait_sec),
            timeout=_timedelta(seconds=timeout_sec),
            check_result=lambda affinity_filter: affinity_filter is None,
        )
        retryer(self.get_session_affinity_filter, name)

    def wait_for_get_backend_policy_deleted(
        self,
        name: str,
        timeout_sec: int = WAIT_MEDIUM_TIMEOUT_SEC,
        wait_sec: int = WAIT_MEDIUM_SLEEP_SEC,
    ) -> None:
        retryer = retryers.constant_retryer(
            wait_fixed=_timedelta(seconds=wait_sec),
            timeout=_timedelta(seconds=timeout_sec),
            check_result=lambda backend_policy: backend_policy is None,
        )
        retryer(self.get_backend_policy, name)
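
Each `wait_for_get_*_deleted` helper above wraps the same idea: poll the getter at a fixed interval until it returns None or a timeout expires. A framework-free sketch of that loop (timeouts shortened for illustration; the real code uses the framework's `retryers.constant_retryer`):

```python
import time


def wait_until_deleted(get_fn, timeout_sec=1.0, wait_sec=0.01):
    # Poll get_fn until it reports the resource gone (None) or we time out.
    deadline = time.monotonic() + timeout_sec
    while time.monotonic() < deadline:
        if get_fn() is None:
            return True
        time.sleep(wait_sec)
    return False


# Fake getter: the "resource" disappears on the third poll.
calls = {"n": 0}


def fake_get():
    calls["n"] += 1
    return None if calls["n"] >= 3 else object()


print(wait_until_deleted(fake_get))  # True
```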
    def wait_for_service_account_deleted(
        self,
        name: str,

@@ -42,6 +42,7 @@ _NetworkServicesV1Alpha1 = gcp.network_services.NetworkServicesV1Alpha1
_NetworkServicesV1Beta1 = gcp.network_services.NetworkServicesV1Beta1
EndpointPolicy = gcp.network_services.EndpointPolicy
GrpcRoute = gcp.network_services.GrpcRoute
HttpRoute = gcp.network_services.HttpRoute
Mesh = gcp.network_services.Mesh

# Testing metadata consts
@@ -741,6 +742,7 @@ class TrafficDirectorManager:  # pylint: disable=too-many-public-methods
class TrafficDirectorAppNetManager(TrafficDirectorManager):
    GRPC_ROUTE_NAME = "grpc-route"
    HTTP_ROUTE_NAME = "http-route"
    MESH_NAME = "mesh"

    netsvc: _NetworkServicesV1Alpha1
@@ -770,6 +772,7 @@ class TrafficDirectorAppNetManager(TrafficDirectorManager):
        # Managed resources
        # TODO(gnossen) PTAL at the pylint error
        self.grpc_route: Optional[GrpcRoute] = None
        self.http_route: Optional[HttpRoute] = None
        self.mesh: Optional[Mesh] = None

    def create_mesh(self) -> GcpResource:
@@ -819,6 +822,14 @@ class TrafficDirectorAppNetManager(TrafficDirectorManager):
        logger.debug("Loaded GrpcRoute: %s", self.grpc_route)
        return resource

    def create_http_route_with_content(self, body: Any) -> GcpResource:
        name = self.make_resource_name(self.HTTP_ROUTE_NAME)
        logger.info("Creating HttpRoute %s", name)
        resource = self.netsvc.create_http_route(name, body)
        self.http_route = self.netsvc.get_http_route(name)
        logger.debug("Loaded HttpRoute: %s", self.http_route)
        return resource

    def delete_grpc_route(self, force=False):
        if force:
            name = self.make_resource_name(self.GRPC_ROUTE_NAME)
@@ -830,7 +841,19 @@ class TrafficDirectorAppNetManager(TrafficDirectorManager):
        self.netsvc.delete_grpc_route(name)
        self.grpc_route = None

    def delete_http_route(self, force=False):
        if force:
            name = self.make_resource_name(self.HTTP_ROUTE_NAME)
        elif self.http_route:
            name = self.http_route.name
        else:
            return
        logger.info("Deleting HttpRoute %s", name)
        self.netsvc.delete_http_route(name)
        self.http_route = None

    def cleanup(self, *, force=False):
        self.delete_http_route(force=force)
        self.delete_grpc_route(force=force)
        self.delete_mesh(force=force)
        super().cleanup(force=force)

@@ -59,6 +59,7 @@ class LoadBalancerStatsServiceClient(framework.rpc.grpc.GrpcClientHelper):
        *,
        num_rpcs: int,
        timeout_sec: Optional[int] = STATS_PARTIAL_RESULTS_TIMEOUT_SEC,
        metadata_keys: Optional[tuple[str, ...]] = None,
    ) -> LoadBalancerStatsResponse:
        if timeout_sec is None:
            timeout_sec = self.STATS_PARTIAL_RESULTS_TIMEOUT_SEC
@@ -66,7 +67,9 @@ class LoadBalancerStatsServiceClient(framework.rpc.grpc.GrpcClientHelper):
        return self.call_unary_with_deadline(
            rpc="GetClientStats",
            req=_LoadBalancerStatsRequest(
                num_rpcs=num_rpcs,
                timeout_sec=timeout_sec,
                metadata_keys=metadata_keys or None,
            ),
            deadline_sec=timeout_sec,
            log_level=logging.INFO,

@@ -41,6 +41,9 @@ _ChannelzSubchannel = grpc_channelz.Subchannel
_ChannelzSocket = grpc_channelz.Socket
_CsdsClient = grpc_csds.CsdsClient

# Use in get_load_balancer_stats request to request all metadata.
REQ_LB_STATS_METADATA_ALL = ("*",)


class XdsTestClient(framework.rpc.grpc.GrpcApp):
    """
@@ -104,13 +107,16 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp):
        self,
        *,
        num_rpcs: int,
        metadata_keys: Optional[tuple[str, ...]] = None,
        timeout_sec: Optional[int] = None,
    ) -> grpc_testing.LoadBalancerStatsResponse:
        """
        Shortcut to LoadBalancerStatsServiceClient.get_client_stats()
        """
        return self.load_balancer_stats.get_client_stats(
            num_rpcs=num_rpcs,
            timeout_sec=timeout_sec,
            metadata_keys=metadata_keys,
        )

    def get_load_balancer_accumulated_stats(

@ -30,20 +30,22 @@ KubernetesServerRunner = k8s_xds_server_runner.KubernetesServerRunner
class GammaServerRunner(KubernetesServerRunner): class GammaServerRunner(KubernetesServerRunner):
# Mutable state. # Mutable state.
mesh: Optional[k8s.GammaMesh] = None route: Optional[k8s.GammaHttpRoute] = None
route: Optional[k8s.GammaGrpcRoute] = None frontend_service: Optional[k8s.V1Service] = None
sa_filter: Optional[k8s.GcpSessionAffinityFilter] = None
sa_policy: Optional[k8s.GcpSessionAffinityPolicy] = None
be_policy: Optional[k8s.GcpBackendPolicy] = None
termination_grace_period_seconds: Optional[int] = None
pre_stop_hook: bool = False
# Mesh
server_xds_host: str
mesh_name: str
route_name: str route_name: str
frontend_service_name: str
def __init__( def __init__(
self, self,
k8s_namespace: k8s.KubernetesNamespace, k8s_namespace: k8s.KubernetesNamespace,
frontend_service_name: str,
*, *,
mesh_name: str,
server_xds_host: str,
deployment_name: str, deployment_name: str,
image_name: str, image_name: str,
td_bootstrap_image: str, td_bootstrap_image: str,
@ -64,6 +66,11 @@ class GammaServerRunner(KubernetesServerRunner):
namespace_template: Optional[str] = None, namespace_template: Optional[str] = None,
debug_use_port_forwarding: bool = False, debug_use_port_forwarding: bool = False,
enable_workload_identity: bool = True, enable_workload_identity: bool = True,
safilter_name: str = "ssa-filter",
sapolicy_name: str = "ssa-policy",
bepolicy_name: str = "backend-policy",
termination_grace_period_seconds: Optional[int] = None,
pre_stop_hook: bool = False,
): ):
# pylint: disable=too-many-locals # pylint: disable=too-many-locals
super().__init__( super().__init__(
@ -89,11 +96,15 @@ class GammaServerRunner(KubernetesServerRunner):
enable_workload_identity=enable_workload_identity, enable_workload_identity=enable_workload_identity,
) )
self.server_xds_host = server_xds_host self.frontend_service_name = frontend_service_name
self.mesh_name = mesh_name
self.route_name = route_name or f"route-{deployment_name}" self.route_name = route_name or f"route-{deployment_name}"
self.safilter_name = safilter_name
self.sapolicy_name = sapolicy_name
self.bepolicy_name = bepolicy_name
self.termination_grace_period_seconds = termination_grace_period_seconds
self.pre_stop_hook = pre_stop_hook
def run( def run( # pylint: disable=arguments-differ
self, self,
*, *,
test_port: int = KubernetesServerRunner.DEFAULT_TEST_PORT, test_port: int = KubernetesServerRunner.DEFAULT_TEST_PORT,
@ -102,6 +113,7 @@ class GammaServerRunner(KubernetesServerRunner):
replica_count: int = 1, replica_count: int = 1,
log_to_stdout: bool = False, log_to_stdout: bool = False,
bootstrap_version: Optional[str] = None, bootstrap_version: Optional[str] = None,
route_template: str = "gamma/route_http.yaml",
) -> List[XdsTestServer]: ) -> List[XdsTestServer]:
if not maintenance_port: if not maintenance_port:
maintenance_port = self._get_default_maintenance_port(secure_mode) maintenance_port = self._get_default_maintenance_port(secure_mode)
@ -128,14 +140,6 @@ class GammaServerRunner(KubernetesServerRunner):
self.namespace_template, namespace_name=self.k8s_namespace.name self.namespace_template, namespace_name=self.k8s_namespace.name
) )
-        # Create gamma mesh.
-        # Note: this will be pre-provisioned per cluster.
-        self.mesh = self._create_gamma_mesh(
-            "gamma/tdmesh.yaml",
-            mesh_name=self.mesh_name,
-            namespace_name=self.k8s_namespace.name,
-        )
         # Reuse existing if requested, create a new deployment when missing.
         # Useful for debugging to avoid NEG losing relation to deleted service.
         if self.reuse_service:
@@ -150,20 +154,23 @@ class GammaServerRunner(KubernetesServerRunner):
             test_port=test_port,
         )
+        # Create the parentref service
+        self.frontend_service = self._create_service(
+            "gamma/frontend_service.yaml",
+            service_name=self.frontend_service_name,
+            namespace_name=self.k8s_namespace.name,
+        )
+
         # Create the route.
         self.route = self._create_gamma_route(
-            "gamma/route_grpc.yaml",
-            xds_server_uri=self.server_xds_host,
+            route_template,
             route_name=self.route_name,
-            mesh_name=self.mesh_name,
             service_name=self.service_name,
             namespace_name=self.k8s_namespace.name,
             test_port=test_port,
+            frontend_service_name=self.frontend_service_name,
         )
-
-        # Surprised this just works.
-        self._wait_service_neg(self.service_name, test_port)
         if self.enable_workload_identity:
             # Allow Kubernetes service account to use the GCP service account
             # identity.
@@ -196,9 +203,11 @@ class GammaServerRunner(KubernetesServerRunner):
             maintenance_port=maintenance_port,
             secure_mode=secure_mode,
             bootstrap_version=bootstrap_version,
+            termination_grace_period_seconds=self.termination_grace_period_seconds,
+            pre_stop_hook=self.pre_stop_hook,
         )
-        return self._make_servers_for_deployment(
+        servers = self._make_servers_for_deployment(
             replica_count,
             test_port=test_port,
             maintenance_port=maintenance_port,
@@ -206,6 +215,36 @@ class GammaServerRunner(KubernetesServerRunner):
             secure_mode=secure_mode,
         )
+
+        # The controller will not populate the NEGs until there are
+        # endpoint slices.
+        self._wait_service_neg(self.service_name, test_port)
+
+        return servers
+    def createSessionAffinityPolicy(self, manifest):
+        self.sa_policy = self._create_session_affinity_policy(
+            manifest,
+            session_affinity_policy_name=self.sapolicy_name,
+            namespace_name=self.k8s_namespace.name,
+            route_name=self.route_name,
+            service_name=self.service_name,
+        )
+
+    def createSessionAffinityFilter(self):
+        self.sa_filter = self._create_session_affinity_filter(
+            "gamma/session_affinity_filter.yaml",
+            session_affinity_filter_name=self.safilter_name,
+            namespace_name=self.k8s_namespace.name,
+        )
+
+    def createBackendPolicy(self):
+        self.be_policy = self._create_backend_policy(
+            "gamma/backend_policy.yaml",
+            be_policy_name=self.bepolicy_name,
+            namespace_name=self.k8s_namespace.name,
+            service_name=self.service_name,
+        )

     # pylint: disable=arguments-differ
     def cleanup(self, *, force=False, force_namespace=False):
         try:
@@ -213,8 +252,9 @@ class GammaServerRunner(KubernetesServerRunner):
             self._delete_gamma_route(self.route_name)
             self.route = None

-            if self.mesh or force:
-                self._delete_gamma_mesh(self.mesh_name)
+            if self.frontend_service or force:
+                self._delete_service(self.frontend_service_name)
+                self.frontend_service = None

             if (self.service and not self.reuse_service) or force:
                 self._delete_service(self.service_name)
@@ -224,6 +264,18 @@ class GammaServerRunner(KubernetesServerRunner):
                 self._delete_deployment(self.deployment_name)
                 self.deployment = None

+            if self.sa_policy or force:
+                self._delete_session_affinity_policy(self.sapolicy_name)
+                self.sa_policy = None
+
+            if self.sa_filter or force:
+                self._delete_session_affinity_filter(self.safilter_name)
+                self.sa_filter = None
+
+            if self.be_policy or force:
+                self._delete_backend_policy(self.bepolicy_name)
+                self.be_policy = None
+
             if self.enable_workload_identity and (
                 self.service_account or force
             ):

@@ -399,6 +399,14 @@ class KubernetesBaseRunner(base_runner.BaseRunner, metaclass=ABCMeta):
             )
         return resource

+    def delete_pod_async(self, pod_name: str):
+        logger.info(
+            "Initiating deletion of pod %s in namespace %s",
+            pod_name,
+            self.k8s_namespace.name,
+        )
+        self.k8s_namespace.delete_pod_async(pod_name)
+
     def _create_deployment(self, template, **kwargs) -> k8s.V1Deployment:
         # Not making deployment_name an explicit kwarg to be consistent with
         # the rest of the _create_* methods, which pass kwargs as-is
@@ -443,75 +451,114 @@ class KubernetesBaseRunner(base_runner.BaseRunner, metaclass=ABCMeta):
         )
         return deployment
-    def _create_gamma_mesh(self, template, **kwargs) -> k8s.GammaMesh:
-        mesh = self._create_from_template(
-            template, custom_object=True, **kwargs
-        )
-        if not (isinstance(mesh, k8s.GammaMesh) and mesh.kind == "TDMesh"):
-            raise _RunnerError(
-                f"Expected ResourceInstance[TDMesh] to be created from"
-                f" manifest {template}"
-            )
-        if mesh.metadata.name != kwargs["mesh_name"]:
-            raise _RunnerError(
-                "ResourceInstance[TDMesh] created with unexpected name: "
-                f"{mesh.metadata.name}"
-            )
-        logger.debug(
-            "ResourceInstance[TDMesh] %s created at %s",
-            mesh.metadata.name,
-            mesh.metadata.creation_timestamp,
-        )
-        return mesh
+    def _create_gamma_route(self, template, **kwargs) -> k8s.GammaHttpRoute:
+        route = self._create_from_template(
+            template,
+            custom_object=True,
+            **kwargs,
+        )
+        if not (
+            isinstance(route, k8s.GammaHttpRoute) and route.kind == "HTTPRoute"
+        ):
+            raise _RunnerError(
+                f"Expected ResourceInstance[HTTPRoute] to be created from"
+                f" manifest {template}"
+            )
+        if route.metadata.name != kwargs["route_name"]:
+            raise _RunnerError(
+                "ResourceInstance[HTTPRoute] created with unexpected name: "
+                f"{route.metadata.name}"
+            )
+        logger.debug(
+            "ResourceInstance[HTTPRoute] %s created at %s",
+            route.metadata.name,
+            route.metadata.creation_timestamp,
+        )
+        return route

-    def _create_gamma_route(self, template, **kwargs) -> k8s.GammaGrpcRoute:
-        route = self._create_from_template(
-            template,
-            custom_object=True,
-            **kwargs,
-        )
-        if not (
-            isinstance(route, k8s.GammaGrpcRoute) and route.kind == "GRPCRoute"
-        ):
-            raise _RunnerError(
-                f"Expected ResourceInstance[GRPCRoute] to be created from"
-                f" manifest {template}"
-            )
-        if route.metadata.name != kwargs["route_name"]:
-            raise _RunnerError(
-                "ResourceInstance[GRPCRoute] created with unexpected name: "
-                f"{route.metadata.name}"
-            )
-        logger.debug(
-            "ResourceInstance[GRPCRoute] %s created at %s",
-            route.metadata.name,
-            route.metadata.creation_timestamp,
-        )
-        return route
+    def _create_session_affinity_policy(
+        self, template, **kwargs
+    ) -> k8s.GcpSessionAffinityPolicy:
+        saPolicy = self._create_from_template(
+            template,
+            custom_object=True,
+            **kwargs,
+        )
+        if not (
+            isinstance(saPolicy, k8s.GcpSessionAffinityPolicy)
+            and saPolicy.kind == "GCPSessionAffinityPolicy"
+        ):
+            raise _RunnerError(
+                f"Expected ResourceInstance[GCPSessionAffinityPolicy] to be"
+                f" created from manifest {template}"
+            )
+        if saPolicy.metadata.name != kwargs["session_affinity_policy_name"]:
+            raise _RunnerError(
+                "ResourceInstance[GCPSessionAffinityPolicy] created with"
+                f" unexpected name: {saPolicy.metadata.name}"
+            )
+        logger.debug(
+            "ResourceInstance[GCPSessionAffinityPolicy] %s created at %s",
+            saPolicy.metadata.name,
+            saPolicy.metadata.creation_timestamp,
+        )
+        return saPolicy

-    def _delete_gamma_mesh(self, name, wait_for_deletion=True):
-        logger.info("Deleting GAMMA mesh %s", name)
-        try:
-            self.k8s_namespace.delete_gamma_mesh(name)
-        except (retryers.RetryError, k8s.NotFound) as e:
-            logger.info("GAMMA mesh %s deletion failed: %s", name, e)
-            return
-        if wait_for_deletion:
-            self.k8s_namespace.wait_for_get_gamma_mesh_deleted(name)
-        logger.debug("GAMMA mesh %s deleted", name)
+    def _create_session_affinity_filter(
+        self, template, **kwargs
+    ) -> k8s.GcpSessionAffinityFilter:
+        saFilter = self._create_from_template(
+            template,
+            custom_object=True,
+            **kwargs,
+        )
+        if not (
+            isinstance(saFilter, k8s.GcpSessionAffinityFilter)
+            and saFilter.kind == "GCPSessionAffinityFilter"
+        ):
+            raise _RunnerError(
+                f"Expected ResourceInstance[GCPSessionAffinityFilter] to be"
+                f" created from manifest {template}"
+            )
+        if saFilter.metadata.name != kwargs["session_affinity_filter_name"]:
+            raise _RunnerError(
+                "ResourceInstance[GCPSessionAffinityFilter] created with"
+                f" unexpected name: {saFilter.metadata.name}"
+            )
+        logger.debug(
+            "ResourceInstance[GCPSessionAffinityFilter] %s created at %s",
+            saFilter.metadata.name,
+            saFilter.metadata.creation_timestamp,
+        )
+        return saFilter

-    def _delete_gamma_route(self, name, wait_for_deletion=True):
-        logger.info("Deleting GRPCRoute %s", name)
-        try:
-            self.k8s_namespace.delete_gamma_route(name)
-        except (retryers.RetryError, k8s.NotFound) as e:
-            logger.info("GRPCRoute %s deletion failed: %s", name, e)
-            return
-        if wait_for_deletion:
-            self.k8s_namespace.wait_for_get_gamma_route_deleted(name)
-        logger.debug("GRPCRoute %s deleted", name)
+    def _create_backend_policy(
+        self, template, **kwargs
+    ) -> k8s.GcpBackendPolicy:
+        be_policy = self._create_from_template(
+            template,
+            custom_object=True,
+            **kwargs,
+        )
+        if not (
+            isinstance(be_policy, k8s.GcpBackendPolicy)
+            and be_policy.kind == "GCPBackendPolicy"
+        ):
+            raise _RunnerError(
+                f"Expected ResourceInstance[GCPBackendPolicy] to be"
+                f" created from manifest {template}"
+            )
+        if be_policy.metadata.name != kwargs["be_policy_name"]:
+            raise _RunnerError(
+                "ResourceInstance[GCPBackendPolicy] created with"
+                f" unexpected name: {be_policy.metadata.name}"
+            )
+        logger.debug(
+            "ResourceInstance[GCPBackendPolicy] %s created at %s",
+            be_policy.metadata.name,
+            be_policy.metadata.creation_timestamp,
+        )
+        return be_policy
     def _create_service(self, template, **kwargs) -> k8s.V1Service:
         service = self._create_from_template(template, **kwargs)
@@ -531,6 +578,62 @@ class KubernetesBaseRunner(base_runner.BaseRunner, metaclass=ABCMeta):
         )
         return service
+    def _delete_gamma_route(self, name, wait_for_deletion=True):
+        logger.info("Deleting HTTPRoute %s", name)
+        try:
+            self.k8s_namespace.delete_gamma_route(name)
+        except (retryers.RetryError, k8s.NotFound) as e:
+            logger.info("HTTPRoute %s deletion failed: %s", name, e)
+            return
+        if wait_for_deletion:
+            self.k8s_namespace.wait_for_get_gamma_route_deleted(name)
+        logger.debug("HTTPRoute %s deleted", name)
+
+    def _delete_session_affinity_policy(self, name, wait_for_deletion=True):
+        logger.info("Deleting GCPSessionAffinityPolicy %s", name)
+        try:
+            self.k8s_namespace.delete_session_affinity_policy(name)
+        except (retryers.RetryError, k8s.NotFound) as e:
+            logger.info(
+                "GCPSessionAffinityPolicy %s deletion failed: %s", name, e
+            )
+            return
+        if wait_for_deletion:
+            self.k8s_namespace.wait_for_get_session_affinity_policy_deleted(
+                name
+            )
+        logger.debug("GCPSessionAffinityPolicy %s deleted", name)
+
+    def _delete_session_affinity_filter(self, name, wait_for_deletion=True):
+        logger.info("Deleting GCPSessionAffinityFilter %s", name)
+        try:
+            self.k8s_namespace.delete_session_affinity_filter(name)
+        except (retryers.RetryError, k8s.NotFound) as e:
+            logger.info(
+                "GCPSessionAffinityFilter %s deletion failed: %s", name, e
+            )
+            return
+        if wait_for_deletion:
+            self.k8s_namespace.wait_for_get_session_affinity_filter_deleted(
+                name
+            )
+        logger.debug("GCPSessionAffinityFilter %s deleted", name)
+
+    def _delete_backend_policy(self, name, wait_for_deletion=True):
+        logger.info("Deleting GCPBackendPolicy %s", name)
+        try:
+            self.k8s_namespace.delete_backend_policy(name)
+        except (retryers.RetryError, k8s.NotFound) as e:
+            logger.info("GCPBackendPolicy %s deletion failed: %s", name, e)
+            return
+        if wait_for_deletion:
+            self.k8s_namespace.wait_for_get_backend_policy_deleted(name)
+        logger.debug("GCPBackendPolicy %s deleted", name)
     def _delete_deployment(self, name, wait_for_deletion=True):
         logger.info("Deleting deployment %s", name)
         self.stop_pod_dependencies()

@@ -101,6 +101,7 @@ class KubernetesClientRunner(k8s_base_runner.KubernetesBaseRunner):
         metadata="",
         secure_mode=False,
         config_mesh=None,
+        generate_mesh_id=False,
         print_response=False,
         log_to_stdout: bool = False,
     ) -> XdsTestClient:
@@ -155,6 +156,7 @@ class KubernetesClientRunner(k8s_base_runner.KubernetesBaseRunner):
             metadata=metadata,
             secure_mode=secure_mode,
             config_mesh=config_mesh,
+            generate_mesh_id=generate_mesh_id,
             print_response=print_response,
         )

@@ -0,0 +1,90 @@
# Copyright 2023 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities for stateful session affinity tests.

These utilities are shared between test environments that configure SSA
via Kubernetes CRDs and environments that configure SSA directly through
the networkservices.googleapis.com API.
"""
import datetime
import logging
from typing import Sequence, Tuple

from framework import xds_k8s_testcase
from framework.helpers import retryers

_XdsKubernetesBaseTestCase = xds_k8s_testcase.XdsKubernetesBaseTestCase
_XdsTestServer = xds_k8s_testcase.XdsTestServer
_XdsTestClient = xds_k8s_testcase.XdsTestClient

_SET_COOKIE_MAX_WAIT_SEC = 300


def get_setcookie_headers(
    metadatas_by_peer: dict[str, "MetadataByPeer"]
) -> dict[str, str]:
    cookies = dict()
    for peer, metadatas in metadatas_by_peer.items():
        for rpc_metadatas in metadatas.rpc_metadata:
            for metadata in rpc_metadatas.metadata:
                if metadata.key.lower() == "set-cookie":
                    cookies[peer] = metadata.value
    return cookies
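`get_setcookie_headers` returns the full `set-cookie` header value per peer, attributes included. As an illustration only (not part of this PR), the bare `name=value` pair that later RPCs echo back as `cookie` request metadata can be recovered with the standard library; the `GSSA` cookie name below is a made-up placeholder:

```python
# Illustration only, not framework code: reduce a raw Set-Cookie header
# value (as returned by get_setcookie_headers) to the bare "name=value"
# pair that subsequent RPCs send back in "cookie" request metadata.
from http.cookies import SimpleCookie


def cookie_pair(set_cookie_value: str) -> str:
    parsed = SimpleCookie()
    parsed.load(set_cookie_value)
    # A single Set-Cookie header defines exactly one cookie (morsel).
    ((name, morsel),) = parsed.items()
    return f"{name}={morsel.value}"


# "GSSA" is a hypothetical cookie name used purely for this example.
print(cookie_pair("GSSA=abc123; Path=/; HttpOnly"))  # GSSA=abc123
```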
def assert_eventually_retrieve_cookie_and_server(
    test: _XdsKubernetesBaseTestCase,
    test_client: _XdsTestClient,
    servers: Sequence[_XdsTestServer],
) -> Tuple[str, _XdsTestServer]:
    """Retrieves the initial cookie and the corresponding server.

    Given a test client and a set of backends with SSA enabled, samples a
    single RPC from the test client to the backends, with metadata
    collection enabled. The "set-cookie" header is retrieved, and its
    contents are returned along with the server to which it corresponds.

    Since the SSA config is supplied as a separate resource from the Route
    resource, there may be periods when the SSA config is not yet applied.
    This function is therefore eventually consistent.
    """

    def _assert_retrieve_cookie_and_server():
        lb_stats = test.assertSuccessfulRpcs(test_client, 1)
        cookies = get_setcookie_headers(lb_stats.metadatas_by_peer)
        test.assertLen(cookies, 1)
        hostname = next(iter(cookies.keys()))
        cookie = cookies[hostname]

        chosen_server_candidates = tuple(
            srv for srv in servers if srv.hostname == hostname
        )
        test.assertLen(chosen_server_candidates, 1)
        chosen_server = chosen_server_candidates[0]
        return cookie, chosen_server

    retryer = retryers.constant_retryer(
        wait_fixed=datetime.timedelta(seconds=10),
        timeout=datetime.timedelta(seconds=_SET_COOKIE_MAX_WAIT_SEC),
        log_level=logging.INFO,
    )
    try:
        return retryer(_assert_retrieve_cookie_and_server)
    except retryers.RetryError as retry_error:
        logging.exception(
            "RPCs did not go to expected servers before timeout %s",
            _SET_COOKIE_MAX_WAIT_SEC,
        )
        raise retry_error
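The retryer wiring above amounts to a generic "assert eventually" loop: poll a nested assertion function at a fixed interval until it passes or a deadline expires. A stdlib-only sketch of the same pattern, with made-up names (`eventually`, `probe`) and toy intervals standing in for the framework's `retryers.constant_retryer`:

```python
# Stdlib-only sketch of the eventually-consistent assertion pattern used
# above: re-run a probe at a constant interval, propagating the last
# AssertionError once the deadline is exceeded.
import time


def eventually(probe, *, interval_sec=0.01, timeout_sec=1.0):
    deadline = time.monotonic() + timeout_sec
    while True:
        try:
            return probe()
        except AssertionError:
            if time.monotonic() >= deadline:
                raise
            time.sleep(interval_sec)


attempts = []


def probe():
    attempts.append(None)
    # Simulate SSA config propagation: the check passes on the third poll.
    assert len(attempts) >= 3, "config not applied yet"
    return "cookie-value"


print(eventually(probe))  # cookie-value
```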

@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
+from typing import Optional

 from framework.infrastructure import k8s
 import framework.infrastructure.traffic_director_gamma as td_gamma
@@ -28,12 +29,16 @@ XdsTestServer = server_app.XdsTestServer

 logger = logging.getLogger(__name__)

+# We never actually hit this timeout under normal circumstances, so this large
+# value is acceptable.
+_TERMINATION_GRACE_PERIOD_SECONDS = 600
+

 # TODO(sergiitk): [GAMMA] Move into framework/test_cases
 class GammaXdsKubernetesTestCase(xds_k8s_testcase.RegularXdsKubernetesTestCase):
     server_runner: GammaServerRunner
-    mesh_name: str
-    mesh_name_td: str
+    frontend_service_name: str
+    pre_stop_hook: Optional[bool] = None

     def setUp(self):
         """Hook method for setting up the test fixture before exercising it."""
@@ -47,6 +52,9 @@ class GammaXdsKubernetesTestCase(xds_k8s_testcase.RegularXdsKubernetesTestCase):
         super(xds_k8s_testcase.IsolatedXdsKubernetesTestCase, self).setUp()
         # pylint: enable=bad-super-call

+        if self.pre_stop_hook is None:
+            self.pre_stop_hook = False
+
         # Random suffix per test.
         self.createRandomSuffix()
@@ -55,12 +63,9 @@ class GammaXdsKubernetesTestCase(xds_k8s_testcase.RegularXdsKubernetesTestCase):
         self.td = self.initTrafficDirectorManager()

         # Generate unique mesh name too.
-        self.mesh_name = f"{self.resource_prefix}-mesh-{self.resource_suffix}"
-        self.mesh_name_td = f"gketd-{self.mesh_name}"
+        self.frontend_service_name = (
+            f"{self.resource_prefix}-{self.resource_suffix.lower()}"
+        )
-
-        # The gamma mesh doesn't use the port.
-        self.server_xds_host = f"{self.server_xds_host}-{self.resource_suffix}"
-        self.server_xds_port = None

         # Test Server runner
         self.server_namespace = GammaServerRunner.make_namespace_name(
@@ -96,8 +101,7 @@ class GammaXdsKubernetesTestCase(xds_k8s_testcase.RegularXdsKubernetesTestCase):
             k8s.KubernetesNamespace(
                 self.k8s_api_manager, self.server_namespace
             ),
-            mesh_name=self.mesh_name,
-            server_xds_host=self.server_xds_host,
+            self.frontend_service_name,
             deployment_name=self.server_name,
             image_name=self.server_image,
             td_bootstrap_image=self.td_bootstrap_image,
@@ -108,11 +112,17 @@ class GammaXdsKubernetesTestCase(xds_k8s_testcase.RegularXdsKubernetesTestCase):
             network=self.network,
             debug_use_port_forwarding=self.debug_use_port_forwarding,
             enable_workload_identity=self.enable_workload_identity,
+            termination_grace_period_seconds=_TERMINATION_GRACE_PERIOD_SECONDS,
+            pre_stop_hook=self.pre_stop_hook,
         )

     def startTestClient(
         self, test_server: XdsTestServer, **kwargs
     ) -> XdsTestClient:
-        return super().startTestClient(
-            test_server, config_mesh=self.mesh_name_td
-        )
+        server_target = (
+            f"xds:///{self.frontend_service_name}"
+            f".{self.server_namespace}.svc.cluster.local"
+            f":{test_server.rpc_port}"
+        )
+        kwargs.setdefault("generate_mesh_id", True)
+        return self._start_test_client(server_target=server_target, **kwargs)

@@ -74,6 +74,8 @@ _LoadBalancerAccumulatedStatsResponse = (
 _ChannelState = grpc_channelz.ChannelState
 _timedelta = datetime.timedelta
 ClientConfig = grpc_csds.ClientConfig
+RpcMetadata = grpc_testing.LoadBalancerStatsResponse.RpcMetadata
+MetadataByPeer: list[str, RpcMetadata]

 # pylint complains about signal.Signals for some reason.
 _SignalNum = Union[int, signal.Signals]  # pylint: disable=no-member
 _SignalHandler = Callable[[_SignalNum, Optional[FrameType]], Any]
@@ -312,7 +314,7 @@ class XdsKubernetesBaseTestCase(base_testcase.BaseTestCase):

     def assertSuccessfulRpcs(
         self, test_client: XdsTestClient, num_rpcs: int = 100
-    ):
+    ) -> _LoadBalancerStatsResponse:
         lb_stats = self.getClientRpcStats(test_client, num_rpcs)
         self.assertAllBackendsReceivedRpcs(lb_stats)
         failed = int(lb_stats.num_failures)
@@ -321,6 +323,7 @@ class XdsKubernetesBaseTestCase(base_testcase.BaseTestCase):
             0,
             msg=f"Expected all RPCs to succeed: {failed} of {num_rpcs} failed",
         )
+        return lb_stats

     @staticmethod
     def diffAccumulatedStatsPerMethod(
@@ -578,15 +581,21 @@ class XdsKubernetesBaseTestCase(base_testcase.BaseTestCase):
             msg=f"Expected all RPCs to fail: {failed} of {num_rpcs} failed",
         )

-    @classmethod
     def getClientRpcStats(
-        cls, test_client: XdsTestClient, num_rpcs: int
+        self,
+        test_client: XdsTestClient,
+        num_rpcs: int,
+        *,
+        metadata_keys: Optional[tuple[str, ...]] = None,
     ) -> _LoadBalancerStatsResponse:
-        lb_stats = test_client.get_load_balancer_stats(num_rpcs=num_rpcs)
+        lb_stats = test_client.get_load_balancer_stats(
+            num_rpcs=num_rpcs,
+            metadata_keys=metadata_keys,
+        )
         logger.info(
             "[%s] << Received LoadBalancerStatsResponse:\n%s",
             test_client.hostname,
-            cls._pretty_lb_stats(lb_stats),
+            self._pretty_lb_stats(lb_stats),
         )
         return lb_stats
@@ -805,8 +814,11 @@ class RegularXdsKubernetesTestCase(IsolatedXdsKubernetesTestCase):
     def startTestClient(
         self, test_server: XdsTestServer, **kwargs
     ) -> XdsTestClient:
+        return self._start_test_client(test_server.xds_uri, **kwargs)
+
+    def _start_test_client(self, server_target: str, **kwargs) -> XdsTestClient:
         test_client = self.client_runner.run(
-            server_target=test_server.xds_uri, **kwargs
+            server_target=server_target, **kwargs
         )
         test_client.wait_for_active_server_channel()
         return test_client

@@ -81,6 +81,9 @@ spec:
           % if config_mesh:
          - "--config-mesh-experimental=${config_mesh}"
           % endif
+          % if generate_mesh_id:
+          - "--generate-mesh-id-experimental"
+          % endif
         resources:
           limits:
             cpu: 100m

@@ -0,0 +1,17 @@
---
kind: GCPBackendPolicy
apiVersion: networking.gke.io/v1
metadata:
  name: ${be_policy_name}
  namespace: ${namespace_name}
  labels:
    owner: xds-k8s-interop-test
spec:
  targetRef:
    group: ""
    kind: Service
    name: ${service_name}
  default:
    connectionDraining:
      drainingTimeoutSec: 600
...

@@ -0,0 +1,10 @@
---
apiVersion: v1
kind: Service
metadata:
  name: ${service_name}
  namespace: ${namespace_name}
spec:
  ports:
    - port: 8080
      targetPort: 8080

@@ -0,0 +1,23 @@
---
kind: HTTPRoute
apiVersion: gateway.networking.k8s.io/v1beta1
metadata:
  name: ${route_name}
  namespace: ${namespace_name}
  labels:
    owner: xds-k8s-interop-test
spec:
  parentRefs:
    - name: ${frontend_service_name}
      namespace: ${namespace_name}
      group: ""
      kind: Service
  rules:
    - matches:
        - path:
            type: Exact
            value: /grpc.testing.TestService/UnaryCall
      backendRefs:
        - name: ${service_name}
          port: 8080
...

@@ -0,0 +1,29 @@
---
kind: HTTPRoute
apiVersion: gateway.networking.k8s.io/v1beta1
metadata:
  name: ${route_name}
  namespace: ${namespace_name}
  labels:
    owner: xds-k8s-interop-test
spec:
  parentRefs:
    - name: ${frontend_service_name}
      namespace: ${namespace_name}
      group: ""
      kind: Service
  rules:
    - matches:
        - path:
            type: Exact
            value: /grpc.testing.TestService/UnaryCall
      filters:
        - type: ExtensionRef
          extensionRef:
            group: networking.gke.io
            kind: GCPSessionAffinityFilter
            name: ssa-filter
      backendRefs:
        - name: ${service_name}
          port: 8080
...

@@ -0,0 +1,10 @@
---
apiVersion: networking.gke.io/v1
kind: GCPSessionAffinityFilter
metadata:
  name: ${session_affinity_filter_name}
  namespace: ${namespace_name}
spec:
  statefulGeneratedCookie:
    cookieTtlSeconds: 50
...

@@ -0,0 +1,15 @@
---
apiVersion: networking.gke.io/v1
kind: GCPSessionAffinityPolicy
metadata:
  name: ${session_affinity_policy_name}
  namespace: ${namespace_name}
spec:
  statefulGeneratedCookie:
    cookieTtlSeconds: 50
  targetRef:
    name: ${route_name}
    group: gateway.networking.k8s.io
    kind: HTTPRoute
    namespace: ${namespace_name}
...

@@ -0,0 +1,15 @@
---
apiVersion: networking.gke.io/v1
kind: GCPSessionAffinityPolicy
metadata:
  name: ${session_affinity_policy_name}
  namespace: ${namespace_name}
spec:
  statefulGeneratedCookie:
    cookieTtlSeconds: 50
  targetRef:
    name: ${service_name}
    kind: Service
    namespace: ${namespace_name}
    group: ""
...

@@ -1,19 +0,0 @@
---
kind: TDMesh
apiVersion: net.gke.io/v1alpha1
metadata:
  name: ${mesh_name}
  namespace: ${namespace_name}
  labels:
    owner: xds-k8s-interop-test
spec:
  gatewayClassName: gke-td
  allowedRoutes:
    namespaces:
      from: All
    kinds:
      - group: net.gke.io
        # This is intentionally incorrect and should be set to GRPCRoute.
        # TODO(sergiitk): [GAMMA] Change when the fix is ready.
        kind: TDGRPCRoute
...

@@ -24,6 +24,9 @@ spec:
      % if service_account_name:
      serviceAccountName: ${service_account_name}
      % endif
+      % if termination_grace_period_seconds:
+      terminationGracePeriodSeconds: ${termination_grace_period_seconds}
+      % endif
      containers:
        - name: ${deployment_name}
          image: ${image_name}
@@ -57,6 +60,12 @@ spec:
            requests:
              cpu: 100m
              memory: 512Mi
+          % if pre_stop_hook:
+          lifecycle:
+            preStop:
+              exec:
+                command: ["tail", "-f", "/dev/null"]
+          % endif
      initContainers:
        - name: grpc-td-init
          image: ${td_bootstrap_image}

@@ -0,0 +1,116 @@
# Copyright 2023 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import List

from absl import flags
from absl.testing import absltest

from framework import xds_k8s_testcase
from framework import xds_url_map_testcase
from framework.test_cases import session_affinity_util

logger = logging.getLogger(__name__)

flags.adopt_module_key_flags(xds_k8s_testcase)

_XdsTestServer = xds_k8s_testcase.XdsTestServer
_XdsTestClient = xds_k8s_testcase.XdsTestClient
RpcTypeUnaryCall = xds_url_map_testcase.RpcTypeUnaryCall

_REPLICA_COUNT = 3


# This is here temporarily to run this test separately from other app_net tests.
# TODO(sergiitk): Move into app_net_test.py
class AppNetSsaTest(xds_k8s_testcase.AppNetXdsKubernetesTestCase):
    def test_session_affinity_policy(self):
        test_servers: List[_XdsTestServer]

        with self.subTest("0_create_health_check"):
            self.td.create_health_check()

        with self.subTest("1_create_backend_service"):
            self.td.create_backend_service()

        with self.subTest("2_create_mesh"):
            self.td.create_mesh()

        with self.subTest("3_create_http_route"):
            self.td.create_http_route_with_content(
                {
                    "meshes": [self.td.mesh.url],
                    "hostnames": [
                        f"{self.server_xds_host}:{self.server_xds_port}"
                    ],
                    "rules": [
                        {
                            "action": {
                                "destinations": [
                                    {
                                        "serviceName": self.td.netsvc.resource_full_name(
                                            self.td.backend_service.name,
                                            "backendServices",
                                        ),
                                        "weight": 1,
                                    },
                                ],
                                "statefulSessionAffinity": {
                                    "cookieTtl": "50s",
                                },
                            },
                        },
                    ],
                }
            )

        with self.subTest("4_run_test_server"):
            test_servers = self.startTestServers(replica_count=_REPLICA_COUNT)

        with self.subTest("5_setup_server_backends"):
            self.setupServerBackends()

        # Default is round robin LB policy.

        with self.subTest("6_start_test_client"):
            test_client: _XdsTestClient = self.startTestClient(
                test_servers[0], config_mesh=self.td.mesh.name
            )

        with self.subTest("7_send_first_RPC_and_retrieve_cookie"):
            (
                cookie,
                chosen_server,
            ) = session_affinity_util.assert_eventually_retrieve_cookie_and_server(
                self, test_client, test_servers
            )

        with self.subTest("8_send_RPCs_with_cookie"):
            test_client.update_config.configure(
                rpc_types=(RpcTypeUnaryCall,),
                metadata=(
                    (
                        RpcTypeUnaryCall,
                        "cookie",
                        cookie,
                    ),
                ),
            )
            self.assertRpcsEventuallyGoToGivenServers(
                test_client, [chosen_server], 10
            )


if __name__ == "__main__":
    absltest.main(failfast=True)

@@ -0,0 +1,170 @@
# Copyright 2023 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import List, Optional
from absl import flags
from absl.testing import absltest
from framework import xds_gamma_testcase
from framework import xds_k8s_testcase
from framework import xds_url_map_testcase
from framework.rpc import grpc_testing
from framework.test_app import client_app
from framework.test_app import server_app
from framework.test_cases import session_affinity_util
logger = logging.getLogger(__name__)
flags.adopt_module_key_flags(xds_k8s_testcase)
_XdsTestServer = server_app.XdsTestServer
_XdsTestClient = client_app.XdsTestClient
RpcTypeUnaryCall = xds_url_map_testcase.RpcTypeUnaryCall
_REPLICA_COUNT = 3
class AffinityTest(xds_gamma_testcase.GammaXdsKubernetesTestCase):
def getClientRpcStats(
self,
test_client: _XdsTestClient,
num_rpcs: int,
*,
metadata_keys: Optional[tuple[str, ...]] = None,
) -> grpc_testing.LoadBalancerStatsResponse:
"""Load all metadata_keys by default."""
return super().getClientRpcStats(
test_client,
num_rpcs,
metadata_keys=metadata_keys or client_app.REQ_LB_STATS_METADATA_ALL,
)
def test_session_affinity_filter(self):
test_servers: List[_XdsTestServer]
with self.subTest("01_run_test_server"):
test_servers = self.startTestServers(
replica_count=_REPLICA_COUNT,
route_template="gamma/route_http_ssafilter.yaml",
)
with self.subTest("02_create_ssa_filter"):
self.server_runner.createSessionAffinityFilter()
# Default is round robin LB policy.
with self.subTest("03_start_test_client"):
test_client: _XdsTestClient = self.startTestClient(test_servers[0])
with self.subTest("04_send_first_RPC_and_retrieve_cookie"):
(
cookie,
chosen_server,
) = session_affinity_util.assert_eventually_retrieve_cookie_and_server(
self, test_client, test_servers
)
with self.subTest("05_send_RPCs_with_cookie"):
test_client.update_config.configure(
rpc_types=(RpcTypeUnaryCall,),
metadata=(
(
RpcTypeUnaryCall,
"cookie",
cookie,
),
),
)
self.assertRpcsEventuallyGoToGivenServers(
test_client, [chosen_server], 10
            )

    def test_session_affinity_policy_with_route_target(self):
test_servers: List[_XdsTestServer]
with self.subTest("01_run_test_server"):
test_servers = self.startTestServers(replica_count=_REPLICA_COUNT)
with self.subTest("02_create_ssa_policy"):
self.server_runner.createSessionAffinityPolicy(
"gamma/session_affinity_policy_route.yaml"
)
# Default is round robin LB policy.
with self.subTest("03_start_test_client"):
test_client: _XdsTestClient = self.startTestClient(test_servers[0])
with self.subTest("04_send_first_RPC_and_retrieve_cookie"):
(
cookie,
chosen_server,
) = session_affinity_util.assert_eventually_retrieve_cookie_and_server(
self, test_client, test_servers
)
with self.subTest("05_send_RPCs_with_cookie"):
test_client.update_config.configure(
rpc_types=(RpcTypeUnaryCall,),
metadata=(
(
RpcTypeUnaryCall,
"cookie",
cookie,
),
),
)
self.assertRpcsEventuallyGoToGivenServers(
test_client, [chosen_server], 10
            )

    def test_session_affinity_policy_with_service_target(self):
test_servers: List[_XdsTestServer]
with self.subTest("01_run_test_server"):
test_servers = self.startTestServers(replica_count=_REPLICA_COUNT)
with self.subTest("02_create_ssa_policy"):
self.server_runner.createSessionAffinityPolicy(
"gamma/session_affinity_policy_service.yaml"
)
# Default is round robin LB policy.
with self.subTest("03_start_test_client"):
test_client: _XdsTestClient = self.startTestClient(test_servers[0])
with self.subTest("04_send_first_RPC_and_retrieve_cookie"):
(
cookie,
chosen_server,
) = session_affinity_util.assert_eventually_retrieve_cookie_and_server(
self, test_client, test_servers
)
with self.subTest("05_send_RPCs_with_cookie"):
test_client.update_config.configure(
rpc_types=(RpcTypeUnaryCall,),
metadata=(
(
RpcTypeUnaryCall,
"cookie",
cookie,
),
),
)
self.assertRpcsEventuallyGoToGivenServers(
test_client, [chosen_server], 10
            )


if __name__ == "__main__":
absltest.main(failfast=True)
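The `assertRpcsEventuallyGoToGivenServers` checks used in each test rely on polling load-balancer stats until the RPC distribution converges on the expected backends. The framework's real implementation is not part of this diff; the pattern can be sketched as follows (function and parameter names are illustrative assumptions):

```python
import time


def assert_eventually_all_rpcs_go_to(
    get_hostname_counts, expected_hosts, timeout_sec=60, interval_sec=2
):
    """Poll an RPC-distribution source until every observed RPC lands on
    one of the expected hosts, or fail after timeout_sec."""
    deadline = time.time() + timeout_sec
    counts = {}
    while time.time() < deadline:
        # e.g. {"pod-a": 7, "pod-b": 3}, keyed by server hostname.
        counts = get_hostname_counts()
        if counts and set(counts) <= set(expected_hosts):
            return counts
        time.sleep(interval_sec)
    raise AssertionError(f"RPCs still reaching unexpected servers: {counts}")


# With session affinity working, all RPCs land on the chosen server.
stats = assert_eventually_all_rpcs_go_to(lambda: {"pod-a": 10}, ["pod-a"])
print(stats)  # {'pod-a': 10}
```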