Implement Simple Ping-Pong Interop Test for Application Networking APIs (#26769)

* Create GrpcRoute resources

* Pass ping pong test

* Dumb down type annotations for 3.6

* Yapf

* Address review comments

* Review comments

* s/from_dict/from_response/g

* Yapf

* an empty commit to force EasyCLA rerun
pull/26360/head
Richard Belleville 3 years ago committed by GitHub
parent d140f14caf
commit 4271d1a482
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 163
      tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/network_services.py
  2. 96
      tools/run_tests/xds_k8s_test_driver/framework/infrastructure/traffic_director.py
  3. 13
      tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py
  4. 61
      tools/run_tests/xds_k8s_test_driver/tests/app_net_test.py

@ -11,9 +11,10 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import dataclasses
import logging
from typing import Optional
from typing import Any, Dict, List, Optional, Tuple
from google.rpc import code_pb2
import tenacity
@ -22,9 +23,135 @@ from framework.infrastructure import gcp
logger = logging.getLogger(__name__)
_ComputeV1 = gcp.compute.ComputeV1
GcpResource = _ComputeV1.GcpResource
@dataclasses.dataclass(frozen=True)
class Router:
name: str
url: str
type: str
network: Optional[str]
routes: Optional[List[str]]
@classmethod
def from_response(cls, name: str, d: Dict[str, Any]) -> 'Router':
return cls(
name=name,
url=d["name"],
type=d["type"],
network=d.get("network"),
routes=list(d["routes"]) if "routes" in d else None,
)
@dataclasses.dataclass(frozen=True)
class GrpcRoute:
@dataclasses.dataclass(frozen=True)
class MethodMatch:
type: Optional[str]
grpc_service: Optional[str]
grpc_method: Optional[str]
case_sensitive: Optional[bool]
@classmethod
def from_response(cls, d: Dict[str, Any]) -> 'MethodMatch':
return cls(
type=d.get("type"),
grpc_service=d.get("grpcService"),
grpc_method=d.get("grpcMethod"),
case_sensitive=d.get("caseSensitive"),
)
@dataclasses.dataclass(frozen=True)
class HeaderMatch:
type: Optional[str]
key: str
value: str
@classmethod
def from_response(cls, d: Dict[str, Any]) -> 'HeaderMatch':
return cls(
type=d.get("type"),
key=d["key"],
value=d["value"],
)
@dataclasses.dataclass(frozen=True)
class RouteMatch:
method: Optional['MethodMatch']
headers: Tuple['HeaderMatch']
@classmethod
def from_response(cls, d: Dict[str, Any]) -> 'RouteMatch':
return cls(
method=MethodMatch.from_response(d["method"])
if "method" in d else None,
headers=tuple(
HeaderMatch.from_response(h) for h in d["headers"])
if "headers" in d else (),
)
@dataclasses.dataclass(frozen=True)
class Destination:
service_name: str
weight: Optional[int]
@classmethod
def from_response(cls, d: Dict[str, Any]) -> 'Destination':
return cls(
service_name=d["serviceName"],
weight=d.get("weight"),
)
@dataclasses.dataclass(frozen=True)
class RouteAction:
destination: Optional['Destination']
drop: Optional[int]
@classmethod
def from_response(cls, d: Dict[str, Any]) -> 'RouteAction':
return cls(
destination=Destination.from_response(d["destination"])
if "destination" in d else None,
drop=d.get("drop"),
)
@dataclasses.dataclass(frozen=True)
class RouteRule:
match: Optional['RouteMatch']
action: 'RouteAction'
@classmethod
def from_response(cls, d: Dict[str, Any]) -> 'RouteRule':
return cls(
match=RouteMatch.from_response(d["match"])
if "match" in d else "",
action=RouteAction.from_response(d["action"]),
)
name: str
url: str
hostnames: Tuple[str]
rules: Tuple['RouteRule']
@classmethod
def from_response(cls, name: str, d: Dict[str, Any]) -> 'RouteRule':
return cls(
name=name,
url=d["name"],
hostnames=tuple(d["hostnames"]),
rules=tuple(d["rules"]),
)
class NetworkServicesV1Alpha1(gcp.api.GcpStandardCloudApiResource):
ENDPOINT_CONFIG_SELECTORS = 'endpointConfigSelectors'
GRPC_ROUTES = 'grpcRoutes'
ROUTERS = 'routers'
@dataclasses.dataclass(frozen=True)
class EndpointConfigSelector:
@ -90,6 +217,40 @@ class NetworkServicesV1Alpha1(gcp.api.GcpStandardCloudApiResource):
reraise=True)
retryer(super()._execute, *args, **kwargs)
def create_router(self, name: str, body: dict) -> GcpResource:
return self._create_resource(
self._api_locations.routers(),
body,
routerId=name,
)
def get_router(self, name: str) -> Router:
result = self._get_resource(collection=self._api_locations.routers(),
full_name=self.resource_full_name(
name, self.ROUTERS))
return Router.from_response(name, result)
def delete_router(self, name: str) -> bool:
return self._delete_resource(collection=self._api_locations.routers(),
full_name=self.resource_full_name(
name, self.ROUTERS))
def create_grpc_route(self, name: str, body: dict) -> GcpResource:
return self._create_resource(self._api_locations.grpcRoutes(),
body,
grpcRouteId=name)
def get_grpc_route(self, name: str) -> GrpcRoute:
result = self._get_resource(collection=self._api_locations.grpcRoutes(),
full_name=self.resource_full_name(
name, self.GRPC_ROUTES))
return GrpcRoute.from_response(name, result)
def delete_grpc_route(self, name: str) -> bool:
return self._delete_resource(
collection=self._api_locations.grpcRoutes(),
full_name=self.resource_full_name(name, self.GRPC_ROUTES))
@staticmethod
def _operation_internal_error(exception):
return (isinstance(exception, gcp.api.OperationError) and

@ -14,7 +14,7 @@
import functools
import logging
import random
from typing import Any, List, Optional, Set
from typing import Any, Iterable, List, Optional, Set
from framework import xds_flags
from framework.infrastructure import gcp
@ -480,6 +480,100 @@ class TrafficDirectorManager:
self.firewall_rule = None
class TrafficDirectorAppNetManager(TrafficDirectorManager):
GRPC_ROUTE_NAME = "grpc-route"
ROUTER_NAME = "router"
def __init__(self,
gcp_api_manager: gcp.api.GcpApiManager,
project: str,
*,
resource_prefix: str,
resource_suffix: Optional[str] = None,
network: str = 'default'):
super().__init__(gcp_api_manager,
project,
resource_prefix=resource_prefix,
resource_suffix=resource_suffix,
network=network)
# API
self.netsvc = _NetworkServicesV1Alpha1(gcp_api_manager, project)
# Managed resources
self.grpc_route: Optional[_NetworkServicesV1Alpha1.GrpcRoute] = None
self.router: Optional[_NetworkServicesV1Alpha1.Router] = None
def create_router(self) -> GcpResource:
name = self.make_resource_name(self.ROUTER_NAME)
logger.info("Creating Router %s", name)
body = {
"type": "PROXYLESS_GRPC",
"routes": [self.grpc_route.url],
"network": "default",
}
resource = self.netsvc.create_router(name, body)
self.router = self.netsvc.get_router(name)
logger.debug("Loaded Router: %s", self.router)
return resource
def delete_router(self, force=False):
if force:
name = self.make_resource_name(self.ROUTER_NAME)
elif self.router:
name = self.router.name
else:
return
logger.info('Deleting Router %s', name)
self.netsvc.delete_router(name)
self.router = None
def create_grpc_route(self, src_host: str, src_port: int) -> GcpResource:
host = f'{src_host}:{src_port}'
body = {
"hostnames":
host,
"rules": [{
"action": {
"destination": {
"serviceName": self.backend_service.name
}
}
}],
}
name = self.make_resource_name(self.GRPC_ROUTE_NAME)
logger.info("Creating GrpcRoute %s", name)
resource = self.netsvc.create_grpc_route(name, body)
self.grpc_route = self.netsvc.get_grpc_route(name)
logger.debug("Loaded GrpcRoute: %s", self.grpc_route)
return resource
def create_grpc_route_with_content(self, body: Any) -> GcpResource:
name = self.make_resource_name(self.GRPC_ROUTE_NAME)
logger.info("Creating GrpcRoute %s", name)
resource = self.netsvc.create_grpc_route(name, body)
self.grpc_route = self.netsvc.get_grpc_route(name)
logger.debug("Loaded GrpcRoute: %s", self.grpc_route)
return resource
def delete_grpc_route(self, force=False):
if force:
name = self.make_resource_name(self.GRPC_ROUTE_NAME)
elif self.grpc_route:
name = self.grpc_route.name
else:
return
logger.info('Deleting GrpcRoute %s', name)
self.netsvc.delete_grpc_route(name)
self.grpc_route = None
def cleanup(self, *, force=False):
self.delete_router(force=force)
self.delete_grpc_route(force=force)
super().cleanup(force=force)
class TrafficDirectorSecureManager(TrafficDirectorManager):
netsec: Optional[_NetworkSecurityV1Alpha1]
SERVER_TLS_POLICY_NAME = "server-tls-policy"

@ -51,6 +51,7 @@ flags.adopt_module_key_flags(xds_k8s_flags)
# Type aliases
TrafficDirectorManager = traffic_director.TrafficDirectorManager
TrafficDirectorAppNetManager = traffic_director.TrafficDirectorAppNetManager
TrafficDirectorSecureManager = traffic_director.TrafficDirectorSecureManager
XdsTestServer = server_app.XdsTestServer
XdsTestClient = client_app.XdsTestClient
@ -329,6 +330,18 @@ class RegularXdsKubernetesTestCase(XdsKubernetesTestCase):
return test_client
class AppNetXdsKubernetesTestCase(RegularXdsKubernetesTestCase):
td: TrafficDirectorAppNetManager
def initTrafficDirectorManager(self) -> TrafficDirectorAppNetManager:
return TrafficDirectorAppNetManager(
self.gcp_api_manager,
project=self.project,
resource_prefix=self.resource_prefix,
resource_suffix=self.resource_suffix,
network=self.network)
class SecurityXdsKubernetesTestCase(XdsKubernetesTestCase):
td: TrafficDirectorSecureManager

@ -0,0 +1,61 @@
# Copyright 2021 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from absl import flags
from absl.testing import absltest
from framework import xds_k8s_testcase
logger = logging.getLogger(__name__)
flags.adopt_module_key_flags(xds_k8s_testcase)
_XdsTestServer = xds_k8s_testcase.XdsTestServer
_XdsTestClient = xds_k8s_testcase.XdsTestClient
class AppNetTest(xds_k8s_testcase.AppNetXdsKubernetesTestCase):
def test_ping_pong(self):
with self.subTest('0_create_health_check'):
self.td.create_health_check()
with self.subTest('1_create_backend_service'):
self.td.create_backend_service()
with self.subTest('2_create_grpc_route'):
self.td.create_grpc_route(self.server_xds_host,
self.server_xds_port)
with self.subTest('3_create_router'):
self.td.create_router()
with self.subTest('4_start_test_server'):
test_server: _XdsTestServer = self.startTestServer()
with self.subTest('5_setup_server_backends'):
self.setupServerBackends()
with self.subTest('6_start_test_client'):
test_client: _XdsTestClient = self.startTestClient(test_server)
with self.subTest('7_assert_xds_config_exists'):
self.assertXdsConfigExists(test_client)
with self.subTest('8_assert_successful_rpcs'):
self.assertSuccessfulRpcs(test_client)
if __name__ == '__main__':
absltest.main(failfast=True)
Loading…
Cancel
Save