Port basic non-security tests to k8s (#26360)

* wip: port non-security tests to k8s

* port forwarding offset

* combine and log health status changes

* split tests

* add remove_neg method

* implement assertRpcsEventuallyGoToGivenServers

* remove neg and change backend

* make maxRatePerEndpoint configurable

* _test suffix

* "rebase" onto master

* yapf and isort

* add to grpc_xds_k8s.sh

* undo change to run_test_server.py

* fix (avoid?) cleanup error with reuse_namespace=True when run with force_cleanup flag

Otherwise, cleaning up the secondary KubernetesServerRunner tries to
delete the namespace as well, since force overrides the reuse_namespace
flag in
https://github.com/grpc/grpc/blob/master/tools/run_tests/xds_k8s_test_driver/framework/test_app/base_runner.py#L76

I'm not sure of the intended semantics of reuse_namespace and force, so
it's unclear whether that conditional in base_runner.py should be changed
so that the cleanup calls on the secondary KubernetesServerRunners can
continue to forward the values of the cleanup/force_cleanup flags. (A
hedged sketch of this interaction follows the commit list below.)

* fix dates

* Add support for secondary kube context for failover test

* Revert "Add support for secondary kube context for failover test"

This reverts commit b7455f2a92.

* Revert "add to grpc_xds_k8s.sh"

This reverts commit 737f13fdc8.
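
To illustrate the reuse_namespace/force_cleanup note above: a minimal sketch of the interaction, assuming a simplified paraphrase of the conditional in base_runner.py. The class shape, cleanup() signature, and namespace.delete() call here are illustrative assumptions, not the driver's actual API; only the reuse_namespace and force/force_cleanup flag names come from the commit message and the diff below.

    # Sketch only (assumed shapes, not the real framework.test_app.base_runner).
    class RunnerCleanupSketch:

        def __init__(self, namespace, reuse_namespace=False):
            self.namespace = namespace
            self.reuse_namespace = reuse_namespace

        def cleanup(self, *, force=False):
            # The conditional in question: force wins over reuse_namespace, so a
            # forced cleanup deletes the namespace even when it is shared with
            # another runner.
            if force or not self.reuse_namespace:
                self.namespace.delete()

    # Workaround apparently taken in the tests added below: the secondary server
    # runner is constructed with reuse_namespace=True and its cleanup() is called
    # in tearDown() without forwarding force_cleanup, so only the primary
    # runner's cleanup ever deletes the shared namespace.
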
Author: Eric Gribkoff (committed via GitHub)
Parent: 639b7acdfa
Commit: 239acada8d
12 changed files (changed-line counts in parentheses):
  1. tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/compute.py (16)
  2. tools/run_tests/xds_k8s_test_driver/framework/infrastructure/traffic_director.py (84)
  3. tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc_testing.py (33)
  4. tools/run_tests/xds_k8s_test_driver/framework/test_app/server_app.py (24)
  5. tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_flags.py (4)
  6. tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py (94)
  7. tools/run_tests/xds_k8s_test_driver/requirements.txt (1)
  8. tools/run_tests/xds_k8s_test_driver/tests/baseline_test.py (4)
  9. tools/run_tests/xds_k8s_test_driver/tests/change_backend_service_test.py (106)
  10. tools/run_tests/xds_k8s_test_driver/tests/failover_test.py (127)
  11. tools/run_tests/xds_k8s_test_driver/tests/remove_neg_test.py (103)
  12. tools/run_tests/xds_k8s_test_driver/tests/round_robin_test.py (88)

@@ -142,11 +142,17 @@ class ComputeV1(gcp.api.GcpProjectApiResource):
body=body,
**kwargs)
def backend_service_add_backends(self, backend_service, backends):
def backend_service_patch_backends(
self,
backend_service,
backends,
max_rate_per_endpoint: Optional[int] = None):
if max_rate_per_endpoint is None:
max_rate_per_endpoint = 5
backend_list = [{
'group': backend.url,
'balancingMode': 'RATE',
'maxRatePerEndpoint': 5
'maxRatePerEndpoint': max_rate_per_endpoint
} for backend in backends]
self._patch_resource(collection=self.api.backendServices(),
@@ -191,6 +197,12 @@ class ComputeV1(gcp.api.GcpProjectApiResource):
def create_url_map_with_content(self, url_map_body: Any) -> GcpResource:
return self._insert_resource(self.api.urlMaps(), url_map_body)
def patch_url_map(self, url_map: GcpResource, body, **kwargs):
self._patch_resource(collection=self.api.urlMaps(),
urlMap=url_map.name,
body=body,
**kwargs)
def delete_url_map(self, name):
self._delete_resource(self.api.urlMaps(), 'urlMap', name)

@@ -14,7 +14,7 @@
import functools
import logging
import random
from typing import Any, Iterable, List, Optional, Set
from typing import Any, Dict, Iterable, List, Optional, Set
from framework import xds_flags
from framework.infrastructure import gcp
@@ -203,20 +203,35 @@ class TrafficDirectorManager:
self.compute.delete_backend_service(name)
self.backend_service = None
def backend_service_add_neg_backends(self, name, zones):
def backend_service_add_neg_backends(self,
name,
zones,
max_rate_per_endpoint: Optional[
int] = None):
logger.info('Waiting for Network Endpoint Groups to load endpoints.')
for zone in zones:
backend = self.compute.wait_for_network_endpoint_group(name, zone)
logger.info('Loaded NEG "%s" in zone %s', backend.name,
backend.zone)
self.backends.add(backend)
self.backend_service_add_backends()
self.backend_service_patch_backends(max_rate_per_endpoint)
def backend_service_add_backends(self):
def backend_service_remove_neg_backends(self, name, zones):
logger.info('Waiting for Network Endpoint Groups to load endpoints.')
for zone in zones:
backend = self.compute.wait_for_network_endpoint_group(name, zone)
logger.info('Loaded NEG "%s" in zone %s', backend.name,
backend.zone)
self.backends.remove(backend)
self.backend_service_patch_backends()
def backend_service_patch_backends(
self, max_rate_per_endpoint: Optional[int] = None):
logging.info('Adding backends to Backend Service %s: %r',
self.backend_service.name, self.backends)
self.compute.backend_service_add_backends(self.backend_service,
self.backends)
self.compute.backend_service_patch_backends(self.backend_service,
self.backends,
max_rate_per_endpoint)
def backend_service_remove_all_backends(self):
logging.info('Removing backends from Backend Service %s',
@@ -266,13 +281,13 @@ class TrafficDirectorManager:
logger.info('Loaded NEG "%s" in zone %s', backend.name,
backend.zone)
self.alternative_backends.add(backend)
self.alternative_backend_service_add_backends()
self.alternative_backend_service_patch_backends()
def alternative_backend_service_add_backends(self):
def alternative_backend_service_patch_backends(self):
logging.info('Adding backends to Backend Service %s: %r',
self.alternative_backend_service.name,
self.alternative_backends)
self.compute.backend_service_add_backends(
self.compute.backend_service_patch_backends(
self.alternative_backend_service, self.alternative_backends)
def alternative_backend_service_remove_all_backends(self):
@@ -326,13 +341,13 @@ class TrafficDirectorManager:
logger.info('Loaded NEG "%s" in zone %s', backend.name,
backend.zone)
self.affinity_backends.add(backend)
self.affinity_backend_service_add_backends()
self.affinity_backend_service_patch_backends()
def affinity_backend_service_add_backends(self):
def affinity_backend_service_patch_backends(self):
logging.info('Adding backends to Backend Service %s: %r',
self.affinity_backend_service.name, self.affinity_backends)
self.compute.backend_service_add_backends(self.affinity_backend_service,
self.affinity_backends)
self.compute.backend_service_patch_backends(
self.affinity_backend_service, self.affinity_backends)
def affinity_backend_service_remove_all_backends(self):
logging.info('Removing backends from Backend Service %s',
@@ -347,6 +362,31 @@ class TrafficDirectorManager:
self.compute.wait_for_backends_healthy_status(
self.affinity_backend_service, self.affinity_backends)
def _generate_url_map_body(
self,
name: str,
matcher_name: str,
src_hosts,
dst_default_backend_service: GcpResource,
dst_host_rule_match_backend_service: Optional[GcpResource] = None,
) -> Dict[str, Any]:
if dst_host_rule_match_backend_service is None:
dst_host_rule_match_backend_service = dst_default_backend_service
return {
'name':
name,
'defaultService':
dst_default_backend_service.url,
'hostRules': [{
'hosts': src_hosts,
'pathMatcher': matcher_name,
}],
'pathMatchers': [{
'name': matcher_name,
'defaultService': dst_host_rule_match_backend_service.url,
}],
}
def create_url_map(
self,
src_host: str,
@@ -357,12 +397,24 @@ class TrafficDirectorManager:
matcher_name = self.make_resource_name(self.URL_MAP_PATH_MATCHER_NAME)
logger.info('Creating URL map "%s": %s -> %s', name, src_address,
self.backend_service.name)
resource = self.compute.create_url_map(name, matcher_name,
[src_address],
self.backend_service)
resource = self.compute.create_url_map_with_content(
self._generate_url_map_body(name, matcher_name, [src_address],
self.backend_service))
self.url_map = resource
return resource
def patch_url_map(self, src_host: str, src_port: int,
backend_service: GcpResource):
src_address = f'{src_host}:{src_port}'
name = self.make_resource_name(self.URL_MAP_NAME)
matcher_name = self.make_resource_name(self.URL_MAP_PATH_MATCHER_NAME)
logger.info('Patching URL map "%s": %s -> %s', name, src_address,
backend_service.name)
self.compute.patch_url_map(
self.url_map,
self._generate_url_map_body(name, matcher_name, [src_address],
backend_service))
def create_url_map_with_content(self, url_map_body: Any) -> GcpResource:
logger.info('Creating URL map: %s', url_map_body)
resource = self.compute.create_url_map_with_content(url_map_body)

@@ -19,8 +19,11 @@ import logging
from typing import Iterable, Optional, Tuple
import grpc
from grpc_health.v1 import health_pb2
from grpc_health.v1 import health_pb2_grpc
import framework.rpc
from src.proto.grpc.testing import empty_pb2
from src.proto.grpc.testing import messages_pb2
from src.proto.grpc.testing import test_pb2_grpc
@@ -107,3 +110,33 @@ class XdsUpdateClientConfigureServiceClient(framework.rpc.grpc.GrpcClientHelper
req=request,
deadline_sec=timeout_sec,
log_level=logging.INFO)
class XdsUpdateHealthServiceClient(framework.rpc.grpc.GrpcClientHelper):
stub: test_pb2_grpc.XdsUpdateHealthServiceStub
def __init__(self, channel: grpc.Channel):
super().__init__(channel, test_pb2_grpc.XdsUpdateHealthServiceStub)
def set_serving(self):
self.call_unary_with_deadline(rpc='SetServing',
req=empty_pb2.Empty(),
log_level=logging.INFO)
def set_not_serving(self):
self.call_unary_with_deadline(rpc='SetNotServing',
req=empty_pb2.Empty(),
log_level=logging.INFO)
class HealthClient(framework.rpc.grpc.GrpcClientHelper):
stub: health_pb2_grpc.HealthStub
def __init__(self, channel: grpc.Channel):
super().__init__(channel, health_pb2_grpc.HealthStub)
def check_health(self):
return self.call_unary_with_deadline(
rpc='Check',
req=health_pb2.HealthCheckRequest(),
log_level=logging.INFO)

@@ -26,12 +26,15 @@ from framework.infrastructure import gcp
from framework.infrastructure import k8s
import framework.rpc
from framework.rpc import grpc_channelz
from framework.rpc import grpc_testing
from framework.test_app import base_runner
logger = logging.getLogger(__name__)
# Type aliases
_ChannelzServiceClient = grpc_channelz.ChannelzServiceClient
_XdsUpdateHealthServiceClient = grpc_testing.XdsUpdateHealthServiceClient
_HealthClient = grpc_testing.HealthClient
class XdsTestServer(framework.rpc.grpc.GrpcApp):
@@ -65,6 +68,27 @@ class XdsTestServer(framework.rpc.grpc.GrpcApp):
def channelz(self) -> _ChannelzServiceClient:
return _ChannelzServiceClient(self._make_channel(self.maintenance_port))
@property
@functools.lru_cache(None)
def update_health_service_client(self) -> _XdsUpdateHealthServiceClient:
return _XdsUpdateHealthServiceClient(
self._make_channel(self.maintenance_port))
@property
@functools.lru_cache(None)
def health_client(self) -> _HealthClient:
return _HealthClient(self._make_channel(self.maintenance_port))
def set_serving(self):
logger.info('Setting health status to serving')
self.update_health_service_client.set_serving()
logger.info('Server reports %s', self.health_client.check_health())
def set_not_serving(self):
logger.info('Setting health status to not serving')
self.update_health_service_client.set_not_serving()
logger.info('Server reports %s', self.health_client.check_health())
def set_xds_address(self, xds_host, xds_port: Optional[int] = None):
self.xds_host, self.xds_port = xds_host, xds_port

@@ -17,6 +17,10 @@ from absl import flags
KUBE_CONTEXT = flags.DEFINE_string("kube_context",
default=None,
help="Kubectl context to use")
SECONDARY_KUBE_CONTEXT = flags.DEFINE_string(
"secondary_kube_context",
default=None,
help="Secondary kubectl context to use for cluster in another region")
GCP_SERVICE_ACCOUNT = flags.DEFINE_string(
"gcp_service_account",
default=None,

@@ -17,7 +17,7 @@ import enum
import hashlib
import logging
import time
from typing import Optional, Tuple
from typing import List, Optional, Tuple
from absl import flags
from absl.testing import absltest
@@ -61,6 +61,8 @@ LoadBalancerStatsResponse = grpc_testing.LoadBalancerStatsResponse
_ChannelState = grpc_channelz.ChannelState
_timedelta = datetime.timedelta
_TD_CONFIG_MAX_WAIT_SEC = 600
class XdsKubernetesTestCase(absltest.TestCase, metaclass=abc.ABCMeta):
_resource_suffix_randomize: bool = True
@@ -123,6 +125,8 @@ class XdsKubernetesTestCase(absltest.TestCase, metaclass=abc.ABCMeta):
# Resource managers
cls.k8s_api_manager = k8s.KubernetesApiManager(
xds_k8s_flags.KUBE_CONTEXT.value)
cls.secondary_k8s_api_manager = k8s.KubernetesApiManager(
xds_k8s_flags.SECONDARY_KUBE_CONTEXT.value)
cls.gcp_api_manager = gcp.api.GcpApiManager()
def setUp(self):
@@ -178,6 +182,7 @@ class XdsKubernetesTestCase(absltest.TestCase, metaclass=abc.ABCMeta):
@classmethod
def tearDownClass(cls):
cls.k8s_api_manager.close()
cls.secondary_k8s_api_manager.close()
cls.gcp_api_manager.close()
def tearDown(self):
@@ -201,20 +206,37 @@ class XdsKubernetesTestCase(absltest.TestCase, metaclass=abc.ABCMeta):
self.server_xds_port,
health_check_port=self.server_maintenance_port)
def setupServerBackends(self, *, wait_for_healthy_status=True):
def setupServerBackends(self,
*,
wait_for_healthy_status=True,
server_runner=None,
max_rate_per_endpoint: Optional[int] = None):
if server_runner is None:
server_runner = self.server_runner
# Load Backends
neg_name, neg_zones = self.server_runner.k8s_namespace.get_service_neg(
self.server_runner.service_name, self.server_port)
neg_name, neg_zones = server_runner.k8s_namespace.get_service_neg(
server_runner.service_name, self.server_port)
# Add backends to the Backend Service
self.td.backend_service_add_neg_backends(neg_name, neg_zones)
self.td.backend_service_add_neg_backends(
neg_name, neg_zones, max_rate_per_endpoint=max_rate_per_endpoint)
if wait_for_healthy_status:
self.td.wait_for_backends_healthy_status()
def removeServerBackends(self, *, server_runner=None):
if server_runner is None:
server_runner = self.server_runner
# Load Backends
neg_name, neg_zones = server_runner.k8s_namespace.get_service_neg(
server_runner.service_name, self.server_port)
# Remove backends from the Backend Service
self.td.backend_service_remove_neg_backends(neg_name, neg_zones)
def assertSuccessfulRpcs(self,
test_client: XdsTestClient,
num_rpcs: int = 100):
lb_stats = self.sendRpcs(test_client, num_rpcs)
lb_stats = self.getClientRpcStats(test_client, num_rpcs)
self.assertAllBackendsReceivedRpcs(lb_stats)
failed = int(lb_stats.num_failures)
self.assertLessEqual(
@@ -222,6 +244,40 @@ class XdsKubernetesTestCase(absltest.TestCase, metaclass=abc.ABCMeta):
0,
msg=f'Expected all RPCs to succeed: {failed} of {num_rpcs} failed')
def assertRpcsEventuallyGoToGivenServers(self,
test_client: XdsTestClient,
servers: List[XdsTestServer],
num_rpcs: int = 100):
retryer = retryers.constant_retryer(
wait_fixed=datetime.timedelta(seconds=1),
timeout=datetime.timedelta(seconds=_TD_CONFIG_MAX_WAIT_SEC),
log_level=logging.INFO)
try:
retryer(self._assertRpcsEventuallyGoToGivenServers, test_client,
servers, num_rpcs)
except retryers.RetryError:
logger.exception(
'Rpcs did not go to expected servers before timeout %s',
_TD_CONFIG_MAX_WAIT_SEC)
def _assertRpcsEventuallyGoToGivenServers(self, test_client: XdsTestClient,
servers: List[XdsTestServer],
num_rpcs: int):
server_names = [server.pod_name for server in servers]
logger.info(f'Verifying RPCs go to {server_names}')
lb_stats = self.getClientRpcStats(test_client, num_rpcs)
failed = int(lb_stats.num_failures)
self.assertLessEqual(
failed,
0,
msg=f'Expected all RPCs to succeed: {failed} of {num_rpcs} failed')
for server_name in server_names:
self.assertIn(server_name, lb_stats.rpcs_by_peer,
f'{server_name} did not receive RPCs')
for peer in lb_stats.rpcs_by_peer.keys():
self.assertIn(peer, server_names,
f'Unexpected server {peer} received RPCs')
def assertXdsConfigExists(self, test_client: XdsTestClient):
config = test_client.csds.fetch_client_status(log_level=logging.INFO)
self.assertIsNotNone(config)
@@ -241,7 +297,7 @@ class XdsKubernetesTestCase(absltest.TestCase, metaclass=abc.ABCMeta):
def assertFailedRpcs(self,
test_client: XdsTestClient,
num_rpcs: Optional[int] = 100):
lb_stats = self.sendRpcs(test_client, num_rpcs)
lb_stats = self.getClientRpcStats(test_client, num_rpcs)
failed = int(lb_stats.num_failures)
self.assertEqual(
failed,
@@ -249,8 +305,8 @@ class XdsKubernetesTestCase(absltest.TestCase, metaclass=abc.ABCMeta):
msg=f'Expected all RPCs to fail: {failed} of {num_rpcs} failed')
@staticmethod
def sendRpcs(test_client: XdsTestClient,
num_rpcs: int) -> LoadBalancerStatsResponse:
def getClientRpcStats(test_client: XdsTestClient,
num_rpcs: int) -> LoadBalancerStatsResponse:
lb_stats = test_client.get_load_balancer_stats(num_rpcs=num_rpcs)
logger.info(
'Received LoadBalancerStatsResponse from test client %s:\n%s',
@@ -295,7 +351,8 @@ class RegularXdsKubernetesTestCase(XdsKubernetesTestCase):
gcp_api_manager=self.gcp_api_manager,
gcp_service_account=self.gcp_service_account,
xds_server_uri=self.xds_server_uri,
network=self.network)
network=self.network,
debug_use_port_forwarding=self.debug_use_port_forwarding)
def initKubernetesClientRunner(self) -> KubernetesClientRunner:
return KubernetesClientRunner(
@@ -313,14 +370,21 @@ class RegularXdsKubernetesTestCase(XdsKubernetesTestCase):
stats_port=self.client_port,
reuse_namespace=self.server_namespace == self.client_namespace)
def startTestServer(self, replica_count=1, **kwargs) -> XdsTestServer:
test_server = self.server_runner.run(
def startTestServers(self,
replica_count=1,
server_runner=None,
**kwargs) -> List[XdsTestServer]:
if server_runner is None:
server_runner = self.server_runner
test_servers = server_runner.run(
replica_count=replica_count,
test_port=self.server_port,
maintenance_port=self.server_maintenance_port,
**kwargs)[0]
test_server.set_xds_address(self.server_xds_host, self.server_xds_port)
return test_server
**kwargs)
for test_server in test_servers:
test_server.set_xds_address(self.server_xds_host,
self.server_xds_port)
return test_servers
def startTestClient(self, test_server: XdsTestServer,
**kwargs) -> XdsTestClient:

@@ -5,6 +5,7 @@ dataclasses~=0.8; python_version < '3.7'
google-api-python-client~=1.12
google-cloud-secret-manager~=2.1
grpcio~=1.34
grpcio-health-checking~=1.34
grpcio-tools~=1.34
grpcio-channelz~=1.34
kubernetes~=12.0

@@ -45,13 +45,13 @@ class BaselineTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
self.td.create_forwarding_rule(self.server_xds_port)
with self.subTest('5_start_test_server'):
test_server: _XdsTestServer = self.startTestServer()
test_servers: _XdsTestServer = self.startTestServers()
with self.subTest('6_add_server_backends_to_backend_service'):
self.setupServerBackends()
with self.subTest('7_start_test_client'):
test_client: _XdsTestClient = self.startTestClient(test_server)
test_client: _XdsTestClient = self.startTestClient(test_servers[0])
with self.subTest('8_test_client_xds_config_exists'):
self.assertXdsConfigExists(test_client)

@@ -0,0 +1,106 @@
# Copyright 2021 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import List, Optional
from absl import flags
from absl.testing import absltest
from framework import xds_k8s_testcase
from framework.infrastructure import k8s
from framework.test_app import server_app
logger = logging.getLogger(__name__)
flags.adopt_module_key_flags(xds_k8s_testcase)
# Type aliases
_XdsTestServer = xds_k8s_testcase.XdsTestServer
_XdsTestClient = xds_k8s_testcase.XdsTestClient
class ChangeBackendServiceTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
def setUp(self):
super().setUp()
self.alternate_k8s_namespace = k8s.KubernetesNamespace(
self.k8s_api_manager, self.server_namespace)
self.alternate_server_runner = server_app.KubernetesServerRunner(
self.alternate_k8s_namespace,
deployment_name=self.server_name + '-alt',
image_name=self.server_image,
gcp_service_account=self.gcp_service_account,
td_bootstrap_image=self.td_bootstrap_image,
gcp_project=self.project,
gcp_api_manager=self.gcp_api_manager,
xds_server_uri=self.xds_server_uri,
network=self.network,
debug_use_port_forwarding=self.debug_use_port_forwarding,
reuse_namespace=True)
def tearDown(self):
if hasattr(self, 'alternate_server_runner'):
self.alternate_server_runner.cleanup()
super().tearDown()
def test_change_backend_service(self) -> None:
with self.subTest('00_create_health_check'):
self.td.create_health_check()
with self.subTest('01_create_backend_services'):
self.td.create_backend_service()
self.td.create_alternative_backend_service()
with self.subTest('02_create_url_map'):
self.td.create_url_map(self.server_xds_host, self.server_xds_port)
with self.subTest('03_create_target_proxy'):
self.td.create_target_proxy()
with self.subTest('04_create_forwarding_rule'):
self.td.create_forwarding_rule(self.server_xds_port)
with self.subTest('05_start_test_servers'):
self.default_test_servers: List[
_XdsTestServer] = self.startTestServers()
self.same_zone_test_servers: List[
_XdsTestServer] = self.startTestServers(
server_runner=self.alternate_server_runner)
with self.subTest('06_add_server_backends_to_backend_services'):
self.setupServerBackends()
# Add backend to alternative backend service
neg_name_alt, neg_zones_alt = self.alternate_k8s_namespace.get_service_neg(
self.alternate_server_runner.service_name, self.server_port)
self.td.alternative_backend_service_add_neg_backends(
neg_name_alt, neg_zones_alt)
with self.subTest('07_start_test_client'):
self.test_client: _XdsTestClient = self.startTestClient(
self.default_test_servers[0])
with self.subTest('08_test_client_xds_config_exists'):
self.assertXdsConfigExists(self.test_client)
with self.subTest('09_test_server_received_rpcs_from_test_client'):
self.assertSuccessfulRpcs(self.test_client)
with self.subTest('10_change_backend_service'):
self.td.patch_url_map(self.server_xds_host, self.server_xds_port,
self.td.alternative_backend_service)
self.assertRpcsEventuallyGoToGivenServers(
self.test_client, self.same_zone_test_servers)
if __name__ == '__main__':
absltest.main(failfast=True)

@@ -0,0 +1,127 @@
# Copyright 2021 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import List
from absl import flags
from absl.testing import absltest
from framework import xds_k8s_testcase
from framework.infrastructure import k8s
from framework.test_app import server_app
logger = logging.getLogger(__name__)
flags.adopt_module_key_flags(xds_k8s_testcase)
# Type aliases
_XdsTestServer = xds_k8s_testcase.XdsTestServer
_XdsTestClient = xds_k8s_testcase.XdsTestClient
class FailoverTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
REPLICA_COUNT = 3
MAX_RATE_PER_ENDPOINT = 100
def setUp(self):
super().setUp()
self.secondary_server_runner = server_app.KubernetesServerRunner(
k8s.KubernetesNamespace(self.secondary_k8s_api_manager,
self.server_namespace),
deployment_name=self.server_name + '-alt',
image_name=self.server_image,
gcp_service_account=self.gcp_service_account,
td_bootstrap_image=self.td_bootstrap_image,
gcp_project=self.project,
gcp_api_manager=self.gcp_api_manager,
xds_server_uri=self.xds_server_uri,
network=self.network,
debug_use_port_forwarding=self.debug_use_port_forwarding,
reuse_namespace=True)
def tearDown(self):
if hasattr(self, 'secondary_server_runner'):
self.secondary_server_runner.cleanup()
super().tearDown()
def test_failover(self) -> None:
with self.subTest('00_create_health_check'):
self.td.create_health_check()
with self.subTest('01_create_backend_services'):
self.td.create_backend_service()
with self.subTest('02_create_url_map'):
self.td.create_url_map(self.server_xds_host, self.server_xds_port)
with self.subTest('03_create_target_proxy'):
self.td.create_target_proxy()
with self.subTest('04_create_forwarding_rule'):
self.td.create_forwarding_rule(self.server_xds_port)
with self.subTest('05_start_test_servers'):
self.default_test_servers: List[
_XdsTestServer] = self.startTestServers(
replica_count=self.REPLICA_COUNT)
self.alternate_test_servers: List[
_XdsTestServer] = self.startTestServers(
server_runner=self.secondary_server_runner)
with self.subTest('06_add_server_backends_to_backend_services'):
self.setupServerBackends(
max_rate_per_endpoint=self.MAX_RATE_PER_ENDPOINT)
self.setupServerBackends(
server_runner=self.secondary_server_runner,
max_rate_per_endpoint=self.MAX_RATE_PER_ENDPOINT)
with self.subTest('07_start_test_client'):
self.test_client: _XdsTestClient = self.startTestClient(
self.default_test_servers[0])
with self.subTest('08_test_client_xds_config_exists'):
self.assertXdsConfigExists(self.test_client)
with self.subTest('09_primary_locality_receives_requests'):
self.assertRpcsEventuallyGoToGivenServers(self.test_client,
self.default_test_servers)
with self.subTest(
'10_secondary_locality_receives_no_requests_on_partial_primary_failure'
):
self.default_test_servers[0].set_not_serving()
self.assertRpcsEventuallyGoToGivenServers(
self.test_client, self.default_test_servers[1:])
with self.subTest('11_gentle_failover'):
self.default_test_servers[1].set_not_serving()
self.assertRpcsEventuallyGoToGivenServers(
self.test_client,
self.default_test_servers[2:] + self.alternate_test_servers)
with self.subTest(
'12_secondary_locality_receives_requests_on_primary_failure'):
self.default_test_servers[2].set_not_serving()
self.assertRpcsEventuallyGoToGivenServers(
self.test_client, self.alternate_test_servers)
with self.subTest('13_traffic_resumes_to_healthy_backends'):
for i in range(self.REPLICA_COUNT):
self.default_test_servers[i].set_serving()
self.assertRpcsEventuallyGoToGivenServers(self.test_client,
self.default_test_servers)
if __name__ == '__main__':
absltest.main(failfast=True)

@@ -0,0 +1,103 @@
# Copyright 2021 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import List, Optional
from absl import flags
from absl.testing import absltest
from framework import xds_k8s_testcase
from framework.infrastructure import k8s
from framework.test_app import server_app
logger = logging.getLogger(__name__)
flags.adopt_module_key_flags(xds_k8s_testcase)
# Type aliases
_XdsTestServer = xds_k8s_testcase.XdsTestServer
_XdsTestClient = xds_k8s_testcase.XdsTestClient
class RemoveNegTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
def setUp(self):
super().setUp()
self.alternate_server_runner = server_app.KubernetesServerRunner(
k8s.KubernetesNamespace(self.k8s_api_manager,
self.server_namespace),
deployment_name=self.server_name + '-alt',
image_name=self.server_image,
gcp_service_account=self.gcp_service_account,
td_bootstrap_image=self.td_bootstrap_image,
gcp_project=self.project,
gcp_api_manager=self.gcp_api_manager,
xds_server_uri=self.xds_server_uri,
network=self.network,
debug_use_port_forwarding=self.debug_use_port_forwarding,
reuse_namespace=True)
def tearDown(self):
if hasattr(self, 'alternate_server_runner'):
self.alternate_server_runner.cleanup()
super().tearDown()
def test_remove_neg(self) -> None:
with self.subTest('00_create_health_check'):
self.td.create_health_check()
with self.subTest('01_create_backend_services'):
self.td.create_backend_service()
with self.subTest('02_create_url_map'):
self.td.create_url_map(self.server_xds_host, self.server_xds_port)
with self.subTest('03_create_target_proxy'):
self.td.create_target_proxy()
with self.subTest('04_create_forwarding_rule'):
self.td.create_forwarding_rule(self.server_xds_port)
with self.subTest('05_start_test_servers'):
self.default_test_servers: List[
_XdsTestServer] = self.startTestServers()
self.same_zone_test_servers: List[
_XdsTestServer] = self.startTestServers(
server_runner=self.alternate_server_runner)
with self.subTest('06_add_server_backends_to_backend_services'):
self.setupServerBackends()
self.setupServerBackends(server_runner=self.alternate_server_runner)
with self.subTest('07_start_test_client'):
self.test_client: _XdsTestClient = self.startTestClient(
self.default_test_servers[0])
with self.subTest('08_test_client_xds_config_exists'):
self.assertXdsConfigExists(self.test_client)
with self.subTest('09_test_server_received_rpcs_from_test_client'):
self.assertSuccessfulRpcs(self.test_client)
with self.subTest('10_remove_neg'):
self.assertRpcsEventuallyGoToGivenServers(
self.test_client,
self.default_test_servers + self.same_zone_test_servers)
self.removeServerBackends(
server_runner=self.alternate_server_runner)
self.assertRpcsEventuallyGoToGivenServers(self.test_client,
self.default_test_servers)
if __name__ == '__main__':
absltest.main(failfast=True)

@@ -0,0 +1,88 @@
# Copyright 2021 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import List, Optional
from absl import flags
from absl.testing import absltest
from framework import xds_k8s_testcase
from framework.infrastructure import k8s
from framework.test_app import server_app
logger = logging.getLogger(__name__)
flags.adopt_module_key_flags(xds_k8s_testcase)
# Type aliases
_XdsTestServer = xds_k8s_testcase.XdsTestServer
_XdsTestClient = xds_k8s_testcase.XdsTestClient
class RoundRobinTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
def test_round_robin(self) -> None:
REPLICA_COUNT = 2
with self.subTest('00_create_health_check'):
self.td.create_health_check()
with self.subTest('01_create_backend_services'):
self.td.create_backend_service()
with self.subTest('02_create_url_map'):
self.td.create_url_map(self.server_xds_host, self.server_xds_port)
with self.subTest('03_create_target_proxy'):
self.td.create_target_proxy()
with self.subTest('04_create_forwarding_rule'):
self.td.create_forwarding_rule(self.server_xds_port)
with self.subTest('05_start_test_servers'):
self.test_servers: List[_XdsTestServer] = self.startTestServers(
replica_count=REPLICA_COUNT)
with self.subTest('06_add_server_backends_to_backend_services'):
self.setupServerBackends()
with self.subTest('07_start_test_client'):
self.test_client: _XdsTestClient = self.startTestClient(
self.test_servers[0])
with self.subTest('08_test_client_xds_config_exists'):
self.assertXdsConfigExists(self.test_client)
with self.subTest('09_test_server_received_rpcs_from_test_client'):
self.assertSuccessfulRpcs(self.test_client)
with self.subTest('10_round_robin'):
num_rpcs = 100
expected_rpcs_per_replica = num_rpcs / REPLICA_COUNT
rpcs_by_peer = self.getClientRpcStats(self.test_client,
num_rpcs).rpcs_by_peer
total_requests_received = sum(rpcs_by_peer[x] for x in rpcs_by_peer)
self.assertEqual(total_requests_received, num_rpcs,
'Wrong number of RPCS')
for server in self.test_servers:
pod_name = server.pod_name
self.assertIn(pod_name, rpcs_by_peer,
f'pod {pod_name} did not receive RPCs')
self.assertLessEqual(
abs(rpcs_by_peer[pod_name] - expected_rpcs_per_replica), 1,
f'Wrong number of RPCs for {pod_name}')
if __name__ == '__main__':
absltest.main(failfast=True)