diff --git a/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/compute.py b/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/compute.py
index 485b6fba6dd..8d128ef3f9e 100644
--- a/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/compute.py
+++ b/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/compute.py
@@ -142,11 +142,17 @@ class ComputeV1(gcp.api.GcpProjectApiResource):
                              body=body,
                              **kwargs)
 
-    def backend_service_add_backends(self, backend_service, backends):
+    def backend_service_patch_backends(
+            self,
+            backend_service,
+            backends,
+            max_rate_per_endpoint: Optional[int] = None):
+        if max_rate_per_endpoint is None:
+            max_rate_per_endpoint = 5
         backend_list = [{
             'group': backend.url,
             'balancingMode': 'RATE',
-            'maxRatePerEndpoint': 5
+            'maxRatePerEndpoint': max_rate_per_endpoint
         } for backend in backends]
 
         self._patch_resource(collection=self.api.backendServices(),
@@ -191,6 +197,12 @@ class ComputeV1(gcp.api.GcpProjectApiResource):
     def create_url_map_with_content(self, url_map_body: Any) -> GcpResource:
         return self._insert_resource(self.api.urlMaps(), url_map_body)
 
+    def patch_url_map(self, url_map: GcpResource, body, **kwargs):
+        self._patch_resource(collection=self.api.urlMaps(),
+                             urlMap=url_map.name,
+                             body=body,
+                             **kwargs)
+
     def delete_url_map(self, name):
         self._delete_resource(self.api.urlMaps(), 'urlMap', name)
 
diff --git a/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/traffic_director.py b/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/traffic_director.py
index 04a334518e0..ebbc06edf07 100644
--- a/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/traffic_director.py
+++ b/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/traffic_director.py
@@ -14,7 +14,7 @@
 import functools
 import logging
 import random
-from typing import Any, Iterable, List, Optional, Set
+from typing import Any, Dict, Iterable, List, Optional, Set
 
 from framework import xds_flags
 from framework.infrastructure import gcp
@@ -203,20 +203,35 @@ class TrafficDirectorManager:
         self.compute.delete_backend_service(name)
         self.backend_service = None
 
-    def backend_service_add_neg_backends(self, name, zones):
+    def backend_service_add_neg_backends(self,
+                                         name,
+                                         zones,
+                                         max_rate_per_endpoint: Optional[
+                                             int] = None):
         logger.info('Waiting for Network Endpoint Groups to load endpoints.')
         for zone in zones:
             backend = self.compute.wait_for_network_endpoint_group(name, zone)
             logger.info('Loaded NEG "%s" in zone %s', backend.name,
                         backend.zone)
             self.backends.add(backend)
-        self.backend_service_add_backends()
+        self.backend_service_patch_backends(max_rate_per_endpoint)
 
-    def backend_service_add_backends(self):
+    def backend_service_remove_neg_backends(self, name, zones):
+        logger.info('Waiting for Network Endpoint Groups to load endpoints.')
+        for zone in zones:
+            backend = self.compute.wait_for_network_endpoint_group(name, zone)
+            logger.info('Loaded NEG "%s" in zone %s', backend.name,
+                        backend.zone)
+            self.backends.remove(backend)
+        self.backend_service_patch_backends()
+
+    def backend_service_patch_backends(
+            self, max_rate_per_endpoint: Optional[int] = None):
         logging.info('Adding backends to Backend Service %s: %r',
                      self.backend_service.name, self.backends)
-        self.compute.backend_service_add_backends(self.backend_service,
-                                                  self.backends)
+        self.compute.backend_service_patch_backends(self.backend_service,
+                                                    self.backends,
+                                                    max_rate_per_endpoint)
 
     def backend_service_remove_all_backends(self):
         logging.info('Removing backends from Backend Service %s',
@@ -266,13 +281,13 @@ class TrafficDirectorManager:
             logger.info('Loaded NEG "%s" in zone %s', backend.name,
                         backend.zone)
             self.alternative_backends.add(backend)
-        self.alternative_backend_service_add_backends()
+        self.alternative_backend_service_patch_backends()
 
-    def alternative_backend_service_add_backends(self):
+    def alternative_backend_service_patch_backends(self):
         logging.info('Adding backends to Backend Service %s: %r',
                      self.alternative_backend_service.name,
                      self.alternative_backends)
-        self.compute.backend_service_add_backends(
+        self.compute.backend_service_patch_backends(
             self.alternative_backend_service, self.alternative_backends)
 
     def alternative_backend_service_remove_all_backends(self):
@@ -326,13 +341,13 @@ class TrafficDirectorManager:
             logger.info('Loaded NEG "%s" in zone %s', backend.name,
                         backend.zone)
             self.affinity_backends.add(backend)
-        self.affinity_backend_service_add_backends()
+        self.affinity_backend_service_patch_backends()
 
-    def affinity_backend_service_add_backends(self):
+    def affinity_backend_service_patch_backends(self):
         logging.info('Adding backends to Backend Service %s: %r',
                      self.affinity_backend_service.name, self.affinity_backends)
-        self.compute.backend_service_add_backends(self.affinity_backend_service,
-                                                  self.affinity_backends)
+        self.compute.backend_service_patch_backends(
+            self.affinity_backend_service, self.affinity_backends)
 
     def affinity_backend_service_remove_all_backends(self):
         logging.info('Removing backends from Backend Service %s',
@@ -347,6 +362,31 @@ class TrafficDirectorManager:
         self.compute.wait_for_backends_healthy_status(
             self.affinity_backend_service, self.affinity_backends)
 
+    def _generate_url_map_body(
+        self,
+        name: str,
+        matcher_name: str,
+        src_hosts,
+        dst_default_backend_service: GcpResource,
+        dst_host_rule_match_backend_service: Optional[GcpResource] = None,
+    ) -> Dict[str, Any]:
+        if dst_host_rule_match_backend_service is None:
+            dst_host_rule_match_backend_service = dst_default_backend_service
+        return {
+            'name':
+                name,
+            'defaultService':
+                dst_default_backend_service.url,
+            'hostRules': [{
+                'hosts': src_hosts,
+                'pathMatcher': matcher_name,
+            }],
+            'pathMatchers': [{
+                'name': matcher_name,
+                'defaultService': dst_host_rule_match_backend_service.url,
+            }],
+        }
+
     def create_url_map(
         self,
         src_host: str,
@@ -357,12 +397,24 @@ class TrafficDirectorManager:
         matcher_name = self.make_resource_name(self.URL_MAP_PATH_MATCHER_NAME)
         logger.info('Creating URL map "%s": %s -> %s', name, src_address,
                     self.backend_service.name)
-        resource = self.compute.create_url_map(name, matcher_name,
-                                               [src_address],
-                                               self.backend_service)
+        resource = self.compute.create_url_map_with_content(
+            self._generate_url_map_body(name, matcher_name, [src_address],
+                                        self.backend_service))
         self.url_map = resource
         return resource
 
+    def patch_url_map(self, src_host: str, src_port: int,
+                      backend_service: GcpResource):
+        src_address = f'{src_host}:{src_port}'
+        name = self.make_resource_name(self.URL_MAP_NAME)
+        matcher_name = self.make_resource_name(self.URL_MAP_PATH_MATCHER_NAME)
+        logger.info('Patching URL map "%s": %s -> %s', name, src_address,
+                    backend_service.name)
+        self.compute.patch_url_map(
+            self.url_map,
+            self._generate_url_map_body(name, matcher_name, [src_address],
+                                        backend_service))
+
     def create_url_map_with_content(self, url_map_body: Any) -> GcpResource:
         logger.info('Creating URL map: %s', url_map_body)
         resource = self.compute.create_url_map_with_content(url_map_body)
diff --git a/tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc_testing.py b/tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc_testing.py
index ac32d51aa79..7d010b33a32 100644
--- a/tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc_testing.py
+++ b/tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc_testing.py
@@ -19,8 +19,11 @@ import logging
 from typing import Iterable, Optional, Tuple
 
 import grpc
+from grpc_health.v1 import health_pb2
+from grpc_health.v1 import health_pb2_grpc
 
 import framework.rpc
+from src.proto.grpc.testing import empty_pb2
 from src.proto.grpc.testing import messages_pb2
 from src.proto.grpc.testing import test_pb2_grpc
 
@@ -107,3 +110,33 @@ class XdsUpdateClientConfigureServiceClient(framework.rpc.grpc.GrpcClientHelper
             req=request,
             deadline_sec=timeout_sec,
             log_level=logging.INFO)
+
+
+class XdsUpdateHealthServiceClient(framework.rpc.grpc.GrpcClientHelper):
+    stub: test_pb2_grpc.XdsUpdateHealthServiceStub
+
+    def __init__(self, channel: grpc.Channel):
+        super().__init__(channel, test_pb2_grpc.XdsUpdateHealthServiceStub)
+
+    def set_serving(self):
+        self.call_unary_with_deadline(rpc='SetServing',
+                                      req=empty_pb2.Empty(),
+                                      log_level=logging.INFO)
+
+    def set_not_serving(self):
+        self.call_unary_with_deadline(rpc='SetNotServing',
+                                      req=empty_pb2.Empty(),
+                                      log_level=logging.INFO)
+
+
+class HealthClient(framework.rpc.grpc.GrpcClientHelper):
+    stub: health_pb2_grpc.HealthStub
+
+    def __init__(self, channel: grpc.Channel):
+        super().__init__(channel, health_pb2_grpc.HealthStub)
+
+    def check_health(self):
+        return self.call_unary_with_deadline(
+            rpc='Check',
+            req=health_pb2.HealthCheckRequest(),
+            log_level=logging.INFO)
diff --git a/tools/run_tests/xds_k8s_test_driver/framework/test_app/server_app.py b/tools/run_tests/xds_k8s_test_driver/framework/test_app/server_app.py
index 80f9472435f..7fec8f1b696 100644
--- a/tools/run_tests/xds_k8s_test_driver/framework/test_app/server_app.py
+++ b/tools/run_tests/xds_k8s_test_driver/framework/test_app/server_app.py
@@ -26,12 +26,15 @@ from framework.infrastructure import gcp
 from framework.infrastructure import k8s
 import framework.rpc
 from framework.rpc import grpc_channelz
+from framework.rpc import grpc_testing
 from framework.test_app import base_runner
 
 logger = logging.getLogger(__name__)
 
 # Type aliases
 _ChannelzServiceClient = grpc_channelz.ChannelzServiceClient
+_XdsUpdateHealthServiceClient = grpc_testing.XdsUpdateHealthServiceClient
+_HealthClient = grpc_testing.HealthClient
 
 
 class XdsTestServer(framework.rpc.grpc.GrpcApp):
@@ -65,6 +68,27 @@ class XdsTestServer(framework.rpc.grpc.GrpcApp):
     def channelz(self) -> _ChannelzServiceClient:
         return _ChannelzServiceClient(
             self._make_channel(self.maintenance_port))
 
+    @property
+    @functools.lru_cache(None)
+    def update_health_service_client(self) -> _XdsUpdateHealthServiceClient:
+        return _XdsUpdateHealthServiceClient(
+            self._make_channel(self.maintenance_port))
+
+    @property
+    @functools.lru_cache(None)
+    def health_client(self) -> _HealthClient:
+        return _HealthClient(self._make_channel(self.maintenance_port))
+
+    def set_serving(self):
+        logger.info('Setting health status to serving')
+        self.update_health_service_client.set_serving()
+        logger.info('Server reports %s', self.health_client.check_health())
+
+    def set_not_serving(self):
+        logger.info('Setting health status to not serving')
+        self.update_health_service_client.set_not_serving()
+        logger.info('Server reports %s', self.health_client.check_health())
+
     def set_xds_address(self, xds_host, xds_port: Optional[int] = None):
         self.xds_host, self.xds_port = xds_host, xds_port
diff --git a/tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_flags.py b/tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_flags.py
index d7f3f0b4f98..801c83a4a8c 100644
--- a/tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_flags.py
+++ b/tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_flags.py
@@ -17,6 +17,10 @@ from absl import flags
 KUBE_CONTEXT = flags.DEFINE_string("kube_context",
                                    default=None,
                                    help="Kubectl context to use")
+SECONDARY_KUBE_CONTEXT = flags.DEFINE_string(
+    "secondary_kube_context",
+    default=None,
+    help="Secondary kubectl context to use for cluster in another region")
 GCP_SERVICE_ACCOUNT = flags.DEFINE_string(
     "gcp_service_account",
     default=None,
diff --git a/tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py b/tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py
index d1b59f74c1b..bf42b105dff 100644
--- a/tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py
+++ b/tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py
@@ -17,7 +17,7 @@ import enum
 import hashlib
 import logging
 import time
-from typing import Optional, Tuple
+from typing import List, Optional, Tuple
 
 from absl import flags
 from absl.testing import absltest
@@ -61,6 +61,8 @@ LoadBalancerStatsResponse = grpc_testing.LoadBalancerStatsResponse
 _ChannelState = grpc_channelz.ChannelState
 _timedelta = datetime.timedelta
 
+_TD_CONFIG_MAX_WAIT_SEC = 600
+
 
 class XdsKubernetesTestCase(absltest.TestCase, metaclass=abc.ABCMeta):
     _resource_suffix_randomize: bool = True
@@ -123,6 +125,8 @@ class XdsKubernetesTestCase(absltest.TestCase, metaclass=abc.ABCMeta):
         # Resource managers
         cls.k8s_api_manager = k8s.KubernetesApiManager(
             xds_k8s_flags.KUBE_CONTEXT.value)
+        cls.secondary_k8s_api_manager = k8s.KubernetesApiManager(
+            xds_k8s_flags.SECONDARY_KUBE_CONTEXT.value)
         cls.gcp_api_manager = gcp.api.GcpApiManager()
 
     def setUp(self):
@@ -178,6 +182,7 @@ class XdsKubernetesTestCase(absltest.TestCase, metaclass=abc.ABCMeta):
     @classmethod
     def tearDownClass(cls):
         cls.k8s_api_manager.close()
+        cls.secondary_k8s_api_manager.close()
         cls.gcp_api_manager.close()
 
     def tearDown(self):
@@ -201,20 +206,37 @@ class XdsKubernetesTestCase(absltest.TestCase, metaclass=abc.ABCMeta):
             self.server_xds_port,
             health_check_port=self.server_maintenance_port)
 
-    def setupServerBackends(self, *, wait_for_healthy_status=True):
+    def setupServerBackends(self,
+                            *,
+                            wait_for_healthy_status=True,
+                            server_runner=None,
+                            max_rate_per_endpoint: Optional[int] = None):
+        if server_runner is None:
+            server_runner = self.server_runner
         # Load Backends
-        neg_name, neg_zones = self.server_runner.k8s_namespace.get_service_neg(
-            self.server_runner.service_name, self.server_port)
+        neg_name, neg_zones = server_runner.k8s_namespace.get_service_neg(
+            server_runner.service_name, self.server_port)
 
         # Add backends to the Backend Service
-        self.td.backend_service_add_neg_backends(neg_name, neg_zones)
+        self.td.backend_service_add_neg_backends(
+            neg_name, neg_zones, max_rate_per_endpoint=max_rate_per_endpoint)
         if wait_for_healthy_status:
             self.td.wait_for_backends_healthy_status()
 
+    def removeServerBackends(self, *, server_runner=None):
+        if server_runner is None:
+            server_runner = self.server_runner
+        # Load Backends
+        neg_name, neg_zones = server_runner.k8s_namespace.get_service_neg(
+            server_runner.service_name, self.server_port)
+
+        # Remove backends from the Backend Service
+        self.td.backend_service_remove_neg_backends(neg_name, neg_zones)
+
     def assertSuccessfulRpcs(self,
                              test_client: XdsTestClient,
                              num_rpcs: int = 100):
-        lb_stats = self.sendRpcs(test_client, num_rpcs)
+        lb_stats = self.getClientRpcStats(test_client, num_rpcs)
         self.assertAllBackendsReceivedRpcs(lb_stats)
         failed = int(lb_stats.num_failures)
         self.assertLessEqual(
@@ -222,6 +244,40 @@ class XdsKubernetesTestCase(absltest.TestCase, metaclass=abc.ABCMeta):
             0,
             msg=f'Expected all RPCs to succeed: {failed} of {num_rpcs} failed')
 
+    def assertRpcsEventuallyGoToGivenServers(self,
+                                             test_client: XdsTestClient,
+                                             servers: List[XdsTestServer],
+                                             num_rpcs: int = 100):
+        retryer = retryers.constant_retryer(
+            wait_fixed=datetime.timedelta(seconds=1),
+            timeout=datetime.timedelta(seconds=_TD_CONFIG_MAX_WAIT_SEC),
+            log_level=logging.INFO)
+        try:
+            retryer(self._assertRpcsEventuallyGoToGivenServers, test_client,
+                    servers, num_rpcs)
+        except retryers.RetryError:
+            logger.exception(
+                'Rpcs did not go to expected servers before timeout %s',
+                _TD_CONFIG_MAX_WAIT_SEC)
+
+    def _assertRpcsEventuallyGoToGivenServers(self, test_client: XdsTestClient,
+                                              servers: List[XdsTestServer],
+                                              num_rpcs: int):
+        server_names = [server.pod_name for server in servers]
+        logger.info(f'Verifying RPCs go to {server_names}')
+        lb_stats = self.getClientRpcStats(test_client, num_rpcs)
+        failed = int(lb_stats.num_failures)
+        self.assertLessEqual(
+            failed,
+            0,
+            msg=f'Expected all RPCs to succeed: {failed} of {num_rpcs} failed')
+        for server_name in server_names:
+            self.assertIn(server_name, lb_stats.rpcs_by_peer,
+                          f'{server_name} did not receive RPCs')
+        for peer in lb_stats.rpcs_by_peer.keys():
+            self.assertIn(peer, server_names,
+                          f'Unexpected server {peer} received RPCs')
+
     def assertXdsConfigExists(self, test_client: XdsTestClient):
         config = test_client.csds.fetch_client_status(log_level=logging.INFO)
         self.assertIsNotNone(config)
@@ -241,7 +297,7 @@ class XdsKubernetesTestCase(absltest.TestCase, metaclass=abc.ABCMeta):
     def assertFailedRpcs(self,
                          test_client: XdsTestClient,
                          num_rpcs: Optional[int] = 100):
-        lb_stats = self.sendRpcs(test_client, num_rpcs)
+        lb_stats = self.getClientRpcStats(test_client, num_rpcs)
         failed = int(lb_stats.num_failures)
         self.assertEqual(
             failed,
@@ -249,8 +305,8 @@ class XdsKubernetesTestCase(absltest.TestCase, metaclass=abc.ABCMeta):
             msg=f'Expected all RPCs to fail: {failed} of {num_rpcs} failed')
 
     @staticmethod
-    def sendRpcs(test_client: XdsTestClient,
-                 num_rpcs: int) -> LoadBalancerStatsResponse:
+    def getClientRpcStats(test_client: XdsTestClient,
+                          num_rpcs: int) -> LoadBalancerStatsResponse:
         lb_stats = test_client.get_load_balancer_stats(num_rpcs=num_rpcs)
         logger.info(
             'Received LoadBalancerStatsResponse from test client %s:\n%s',
@@ -295,7 +351,8 @@ class RegularXdsKubernetesTestCase(XdsKubernetesTestCase):
             gcp_api_manager=self.gcp_api_manager,
             gcp_service_account=self.gcp_service_account,
             xds_server_uri=self.xds_server_uri,
-            network=self.network)
+            network=self.network,
+            debug_use_port_forwarding=self.debug_use_port_forwarding)
 
     def initKubernetesClientRunner(self) -> KubernetesClientRunner:
         return KubernetesClientRunner(
@@ -313,14 +370,21 @@ class RegularXdsKubernetesTestCase(XdsKubernetesTestCase):
             stats_port=self.client_port,
             reuse_namespace=self.server_namespace == self.client_namespace)
 
-    def startTestServer(self, replica_count=1, **kwargs) -> XdsTestServer:
-        test_server = self.server_runner.run(
+    def startTestServers(self,
+                         replica_count=1,
+                         server_runner=None,
+                         **kwargs) -> List[XdsTestServer]:
+        if server_runner is None:
+            server_runner = self.server_runner
+        test_servers = server_runner.run(
             replica_count=replica_count,
             test_port=self.server_port,
             maintenance_port=self.server_maintenance_port,
-            **kwargs)[0]
-        test_server.set_xds_address(self.server_xds_host, self.server_xds_port)
-        return test_server
+            **kwargs)
+        for test_server in test_servers:
+            test_server.set_xds_address(self.server_xds_host,
+                                        self.server_xds_port)
+        return test_servers
 
     def startTestClient(self, test_server: XdsTestServer,
                         **kwargs) -> XdsTestClient:
diff --git a/tools/run_tests/xds_k8s_test_driver/requirements.txt b/tools/run_tests/xds_k8s_test_driver/requirements.txt
index 7783cc32365..64a24a19a05 100644
--- a/tools/run_tests/xds_k8s_test_driver/requirements.txt
+++ b/tools/run_tests/xds_k8s_test_driver/requirements.txt
@@ -5,6 +5,7 @@ dataclasses~=0.8; python_version < '3.7'
 google-api-python-client~=1.12
 google-cloud-secret-manager~=2.1
 grpcio~=1.34
+grpcio-health-checking~=1.34
 grpcio-tools~=1.34
 grpcio-channelz~=1.34
 kubernetes~=12.0
diff --git a/tools/run_tests/xds_k8s_test_driver/tests/baseline_test.py b/tools/run_tests/xds_k8s_test_driver/tests/baseline_test.py
index 3a723121f73..8ea018a2c7e 100644
--- a/tools/run_tests/xds_k8s_test_driver/tests/baseline_test.py
+++ b/tools/run_tests/xds_k8s_test_driver/tests/baseline_test.py
@@ -45,13 +45,13 @@ class BaselineTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
             self.td.create_forwarding_rule(self.server_xds_port)
 
         with self.subTest('5_start_test_server'):
-            test_server: _XdsTestServer = self.startTestServer()
+            test_servers = self.startTestServers()
 
         with self.subTest('6_add_server_backends_to_backend_service'):
             self.setupServerBackends()
 
         with self.subTest('7_start_test_client'):
-            test_client: _XdsTestClient = self.startTestClient(test_server)
+            test_client: _XdsTestClient = self.startTestClient(test_servers[0])
 
         with self.subTest('8_test_client_xds_config_exists'):
             self.assertXdsConfigExists(test_client)
diff --git a/tools/run_tests/xds_k8s_test_driver/tests/change_backend_service_test.py b/tools/run_tests/xds_k8s_test_driver/tests/change_backend_service_test.py
new file mode 100644
index 00000000000..419a134e99e
--- /dev/null
+++ b/tools/run_tests/xds_k8s_test_driver/tests/change_backend_service_test.py
@@ -0,0 +1,106 @@
+# Copyright 2021 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+from typing import List, Optional
+
+from absl import flags
+from absl.testing import absltest
+
+from framework import xds_k8s_testcase
+from framework.infrastructure import k8s
+from framework.test_app import server_app
+
+logger = logging.getLogger(__name__)
+flags.adopt_module_key_flags(xds_k8s_testcase)
+
+# Type aliases
+_XdsTestServer = xds_k8s_testcase.XdsTestServer
+_XdsTestClient = xds_k8s_testcase.XdsTestClient
+
+
+class ChangeBackendServiceTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
+
+    def setUp(self):
+        super().setUp()
+        self.alternate_k8s_namespace = k8s.KubernetesNamespace(
+            self.k8s_api_manager, self.server_namespace)
+        self.alternate_server_runner = server_app.KubernetesServerRunner(
+            self.alternate_k8s_namespace,
+            deployment_name=self.server_name + '-alt',
+            image_name=self.server_image,
+            gcp_service_account=self.gcp_service_account,
+            td_bootstrap_image=self.td_bootstrap_image,
+            gcp_project=self.project,
+            gcp_api_manager=self.gcp_api_manager,
+            xds_server_uri=self.xds_server_uri,
+            network=self.network,
+            debug_use_port_forwarding=self.debug_use_port_forwarding,
+            reuse_namespace=True)
+
+    def tearDown(self):
+        if hasattr(self, 'alternate_server_runner'):
+            self.alternate_server_runner.cleanup()
+        super().tearDown()
+
+    def test_change_backend_service(self) -> None:
+        with self.subTest('00_create_health_check'):
+            self.td.create_health_check()
+
+        with self.subTest('01_create_backend_services'):
+            self.td.create_backend_service()
+            self.td.create_alternative_backend_service()
+
+        with self.subTest('02_create_url_map'):
+            self.td.create_url_map(self.server_xds_host, self.server_xds_port)
+
+        with self.subTest('03_create_target_proxy'):
+            self.td.create_target_proxy()
+
+        with self.subTest('04_create_forwarding_rule'):
+            self.td.create_forwarding_rule(self.server_xds_port)
+
+        with self.subTest('05_start_test_servers'):
+            self.default_test_servers: List[
+                _XdsTestServer] = self.startTestServers()
+            self.same_zone_test_servers: List[
+                _XdsTestServer] = self.startTestServers(
+                    server_runner=self.alternate_server_runner)
+
+        with self.subTest('06_add_server_backends_to_backend_services'):
+            self.setupServerBackends()
+            # Add backend to alternative backend service
+            neg_name_alt, neg_zones_alt = self.alternate_k8s_namespace.get_service_neg(
+                self.alternate_server_runner.service_name, self.server_port)
+            self.td.alternative_backend_service_add_neg_backends(
+                neg_name_alt, neg_zones_alt)
+
+        with self.subTest('07_start_test_client'):
+            self.test_client: _XdsTestClient = self.startTestClient(
+                self.default_test_servers[0])
+
+        with self.subTest('08_test_client_xds_config_exists'):
+            self.assertXdsConfigExists(self.test_client)
+
+        with self.subTest('09_test_server_received_rpcs_from_test_client'):
+            self.assertSuccessfulRpcs(self.test_client)
+
+        with self.subTest('10_change_backend_service'):
+            self.td.patch_url_map(self.server_xds_host, self.server_xds_port,
+                                  self.td.alternative_backend_service)
+            self.assertRpcsEventuallyGoToGivenServers(
+                self.test_client, self.same_zone_test_servers)
+
+
+if __name__ == '__main__':
+    absltest.main(failfast=True)
diff --git a/tools/run_tests/xds_k8s_test_driver/tests/failover_test.py b/tools/run_tests/xds_k8s_test_driver/tests/failover_test.py
new file mode 100644
index 00000000000..5c429488588
--- /dev/null
+++ b/tools/run_tests/xds_k8s_test_driver/tests/failover_test.py
@@ -0,0 +1,127 @@
+# Copyright 2021 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+from typing import List
+
+from absl import flags
+from absl.testing import absltest
+
+from framework import xds_k8s_testcase
+from framework.infrastructure import k8s
+from framework.test_app import server_app
+
+logger = logging.getLogger(__name__)
+flags.adopt_module_key_flags(xds_k8s_testcase)
+
+# Type aliases
+_XdsTestServer = xds_k8s_testcase.XdsTestServer
+_XdsTestClient = xds_k8s_testcase.XdsTestClient
+
+
+class FailoverTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
+    REPLICA_COUNT = 3
+    MAX_RATE_PER_ENDPOINT = 100
+
+    def setUp(self):
+        super().setUp()
+        self.secondary_server_runner = server_app.KubernetesServerRunner(
+            k8s.KubernetesNamespace(self.secondary_k8s_api_manager,
+                                    self.server_namespace),
+            deployment_name=self.server_name + '-alt',
+            image_name=self.server_image,
+            gcp_service_account=self.gcp_service_account,
+            td_bootstrap_image=self.td_bootstrap_image,
+            gcp_project=self.project,
+            gcp_api_manager=self.gcp_api_manager,
+            xds_server_uri=self.xds_server_uri,
+            network=self.network,
+            debug_use_port_forwarding=self.debug_use_port_forwarding,
+            reuse_namespace=True)
+
+    def tearDown(self):
+        if hasattr(self, 'secondary_server_runner'):
+            self.secondary_server_runner.cleanup()
+        super().tearDown()
+
+    def test_failover(self) -> None:
+        with self.subTest('00_create_health_check'):
+            self.td.create_health_check()
+
+        with self.subTest('01_create_backend_services'):
+            self.td.create_backend_service()
+
+        with self.subTest('02_create_url_map'):
+            self.td.create_url_map(self.server_xds_host, self.server_xds_port)
+
+        with self.subTest('03_create_target_proxy'):
+            self.td.create_target_proxy()
+
+        with self.subTest('04_create_forwarding_rule'):
+            self.td.create_forwarding_rule(self.server_xds_port)
+
+        with self.subTest('05_start_test_servers'):
+            self.default_test_servers: List[
+                _XdsTestServer] = self.startTestServers(
+                    replica_count=self.REPLICA_COUNT)
+
+            self.alternate_test_servers: List[
+                _XdsTestServer] = self.startTestServers(
+                    server_runner=self.secondary_server_runner)
+
+        with self.subTest('06_add_server_backends_to_backend_services'):
+            self.setupServerBackends(
+                max_rate_per_endpoint=self.MAX_RATE_PER_ENDPOINT)
+            self.setupServerBackends(
+                server_runner=self.secondary_server_runner,
+                max_rate_per_endpoint=self.MAX_RATE_PER_ENDPOINT)
+
+        with self.subTest('07_start_test_client'):
+            self.test_client: _XdsTestClient = self.startTestClient(
+                self.default_test_servers[0])
+
+        with self.subTest('08_test_client_xds_config_exists'):
+            self.assertXdsConfigExists(self.test_client)
+
+        with self.subTest('09_primary_locality_receives_requests'):
+            self.assertRpcsEventuallyGoToGivenServers(self.test_client,
+                                                      self.default_test_servers)
+
+        with self.subTest(
+                '10_secondary_locality_receives_no_requests_on_partial_primary_failure'
+        ):
+            self.default_test_servers[0].set_not_serving()
+            self.assertRpcsEventuallyGoToGivenServers(
+                self.test_client, self.default_test_servers[1:])
+
+        with self.subTest('11_gentle_failover'):
+            self.default_test_servers[1].set_not_serving()
+            self.assertRpcsEventuallyGoToGivenServers(
+                self.test_client,
+                self.default_test_servers[2:] + self.alternate_test_servers)
+
+        with self.subTest(
+                '12_secondary_locality_receives_requests_on_primary_failure'):
+            self.default_test_servers[2].set_not_serving()
+            self.assertRpcsEventuallyGoToGivenServers(
+                self.test_client, self.alternate_test_servers)
+
+        with self.subTest('13_traffic_resumes_to_healthy_backends'):
+            for i in range(self.REPLICA_COUNT):
+                self.default_test_servers[i].set_serving()
+            self.assertRpcsEventuallyGoToGivenServers(self.test_client,
+                                                      self.default_test_servers)
+
+
+if __name__ == '__main__':
+    absltest.main(failfast=True)
diff --git a/tools/run_tests/xds_k8s_test_driver/tests/remove_neg_test.py b/tools/run_tests/xds_k8s_test_driver/tests/remove_neg_test.py
new file mode 100644
index 00000000000..b7201026cf8
--- /dev/null
+++ b/tools/run_tests/xds_k8s_test_driver/tests/remove_neg_test.py
@@ -0,0 +1,103 @@
+# Copyright 2021 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+from typing import List, Optional
+
+from absl import flags
+from absl.testing import absltest
+
+from framework import xds_k8s_testcase
+from framework.infrastructure import k8s
+from framework.test_app import server_app
+
+logger = logging.getLogger(__name__)
+flags.adopt_module_key_flags(xds_k8s_testcase)
+
+# Type aliases
+_XdsTestServer = xds_k8s_testcase.XdsTestServer
+_XdsTestClient = xds_k8s_testcase.XdsTestClient
+
+
+class RemoveNegTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
+
+    def setUp(self):
+        super().setUp()
+        self.alternate_server_runner = server_app.KubernetesServerRunner(
+            k8s.KubernetesNamespace(self.k8s_api_manager,
+                                    self.server_namespace),
+            deployment_name=self.server_name + '-alt',
+            image_name=self.server_image,
+            gcp_service_account=self.gcp_service_account,
+            td_bootstrap_image=self.td_bootstrap_image,
+            gcp_project=self.project,
+            gcp_api_manager=self.gcp_api_manager,
+            xds_server_uri=self.xds_server_uri,
+            network=self.network,
+            debug_use_port_forwarding=self.debug_use_port_forwarding,
+            reuse_namespace=True)
+
+    def tearDown(self):
+        if hasattr(self, 'alternate_server_runner'):
+            self.alternate_server_runner.cleanup()
+        super().tearDown()
+
+    def test_remove_neg(self) -> None:
+        with self.subTest('00_create_health_check'):
+            self.td.create_health_check()
+
+        with self.subTest('01_create_backend_services'):
+            self.td.create_backend_service()
+
+        with self.subTest('02_create_url_map'):
+            self.td.create_url_map(self.server_xds_host, self.server_xds_port)
+
+        with self.subTest('03_create_target_proxy'):
+            self.td.create_target_proxy()
+
+        with self.subTest('04_create_forwarding_rule'):
+            self.td.create_forwarding_rule(self.server_xds_port)
+
+        with self.subTest('05_start_test_servers'):
+            self.default_test_servers: List[
+                _XdsTestServer] = self.startTestServers()
+            self.same_zone_test_servers: List[
+                _XdsTestServer] = self.startTestServers(
+                    server_runner=self.alternate_server_runner)
+
+        with self.subTest('06_add_server_backends_to_backend_services'):
+            self.setupServerBackends()
+            self.setupServerBackends(server_runner=self.alternate_server_runner)
+
+        with self.subTest('07_start_test_client'):
+            self.test_client: _XdsTestClient = self.startTestClient(
+                self.default_test_servers[0])
+
+        with self.subTest('08_test_client_xds_config_exists'):
+            self.assertXdsConfigExists(self.test_client)
+
+        with self.subTest('09_test_server_received_rpcs_from_test_client'):
+            self.assertSuccessfulRpcs(self.test_client)
+
+        with self.subTest('10_remove_neg'):
+            self.assertRpcsEventuallyGoToGivenServers(
+                self.test_client,
+                self.default_test_servers + self.same_zone_test_servers)
+            self.removeServerBackends(
+                server_runner=self.alternate_server_runner)
+            self.assertRpcsEventuallyGoToGivenServers(self.test_client,
+                                                      self.default_test_servers)
+
+
+if __name__ == '__main__':
+    absltest.main(failfast=True)
diff --git a/tools/run_tests/xds_k8s_test_driver/tests/round_robin_test.py b/tools/run_tests/xds_k8s_test_driver/tests/round_robin_test.py
new file mode 100644
index 00000000000..0daa095d065
--- /dev/null
+++ b/tools/run_tests/xds_k8s_test_driver/tests/round_robin_test.py
@@ -0,0 +1,88 @@
+# Copyright 2021 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+from typing import List, Optional
+
+from absl import flags
+from absl.testing import absltest
+
+from framework import xds_k8s_testcase
+from framework.infrastructure import k8s
+from framework.test_app import server_app
+
+logger = logging.getLogger(__name__)
+flags.adopt_module_key_flags(xds_k8s_testcase)
+
+# Type aliases
+_XdsTestServer = xds_k8s_testcase.XdsTestServer
+_XdsTestClient = xds_k8s_testcase.XdsTestClient
+
+
+class RoundRobinTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
+
+    def test_round_robin(self) -> None:
+        REPLICA_COUNT = 2
+
+        with self.subTest('00_create_health_check'):
+            self.td.create_health_check()
+
+        with self.subTest('01_create_backend_services'):
+            self.td.create_backend_service()
+
+        with self.subTest('02_create_url_map'):
+            self.td.create_url_map(self.server_xds_host, self.server_xds_port)
+
+        with self.subTest('03_create_target_proxy'):
+            self.td.create_target_proxy()
+
+        with self.subTest('04_create_forwarding_rule'):
+            self.td.create_forwarding_rule(self.server_xds_port)
+
+        with self.subTest('05_start_test_servers'):
+            self.test_servers: List[_XdsTestServer] = self.startTestServers(
+                replica_count=REPLICA_COUNT)
+
+        with self.subTest('06_add_server_backends_to_backend_services'):
+            self.setupServerBackends()
+
+        with self.subTest('07_start_test_client'):
+            self.test_client: _XdsTestClient = self.startTestClient(
+                self.test_servers[0])
+
+        with self.subTest('08_test_client_xds_config_exists'):
+            self.assertXdsConfigExists(self.test_client)
+
+        with self.subTest('09_test_server_received_rpcs_from_test_client'):
+            self.assertSuccessfulRpcs(self.test_client)
+
+        with self.subTest('10_round_robin'):
+            num_rpcs = 100
+            expected_rpcs_per_replica = num_rpcs / REPLICA_COUNT
+
+            rpcs_by_peer = self.getClientRpcStats(self.test_client,
+                                                  num_rpcs).rpcs_by_peer
+            total_requests_received = sum(rpcs_by_peer[x] for x in rpcs_by_peer)
+            self.assertEqual(total_requests_received, num_rpcs,
+                             'Wrong number of RPCS')
+            for server in self.test_servers:
+                pod_name = server.pod_name
+                self.assertIn(pod_name, rpcs_by_peer,
+                              f'pod {pod_name} did not receive RPCs')
+                self.assertLessEqual(
+                    abs(rpcs_by_peer[pod_name] - expected_rpcs_per_replica), 1,
+                    f'Wrong number of RPCs for {pod_name}')
+
+
+if __name__ == '__main__':
+    absltest.main(failfast=True)