diff --git a/tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py b/tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py index d331db6c520..7b7561580ca 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py @@ -28,13 +28,14 @@ import grpc from framework import xds_flags from framework import xds_k8s_flags from framework import xds_url_map_testcase +from framework.helpers import rand as helpers_rand from framework.helpers import retryers from framework.helpers import skips -import framework.helpers.rand from framework.infrastructure import gcp from framework.infrastructure import k8s from framework.infrastructure import traffic_director from framework.rpc import grpc_channelz +from framework.rpc import grpc_csds from framework.rpc import grpc_testing from framework.test_app import client_app from framework.test_app import server_app @@ -60,27 +61,32 @@ KubernetesClientRunner = client_app.KubernetesClientRunner LoadBalancerStatsResponse = grpc_testing.LoadBalancerStatsResponse _ChannelState = grpc_channelz.ChannelState _timedelta = datetime.timedelta -ClientConfig = framework.rpc.grpc_csds.ClientConfig +ClientConfig = grpc_csds.ClientConfig _TD_CONFIG_MAX_WAIT_SEC = 600 -class XdsKubernetesTestCase(absltest.TestCase, metaclass=abc.ABCMeta): - _resource_suffix_randomize: bool = True +class TdPropagationRetryableError(Exception): + """Indicates that TD config hasn't propagated yet, and it's safe to retry""" + + +class XdsKubernetesBaseTestCase(absltest.TestCase): client_namespace: str client_runner: KubernetesClientRunner gcp_api_manager: gcp.api.GcpApiManager k8s_api_manager: k8s.KubernetesApiManager + secondary_k8s_api_manager: k8s.KubernetesApiManager resource_prefix: str resource_suffix: str = '' server_namespace: str server_runner: KubernetesServerRunner server_xds_port: int + server_maintenance_port: int td: TrafficDirectorManager @staticmethod def isSupported(config: skips.TestConfig) -> bool: - """Overrided by the test class to decide if the config is supported. + """Overridden by the test class to decide if the config is supported. Returns: A bool indicates if the given config is supported. @@ -133,7 +139,8 @@ class XdsKubernetesTestCase(absltest.TestCase, metaclass=abc.ABCMeta): cls.force_cleanup = xds_flags.FORCE_CLEANUP.value cls.debug_use_port_forwarding = \ xds_k8s_flags.DEBUG_USE_PORT_FORWARDING.value - cls.enable_workload_identity = xds_k8s_flags.ENABLE_WORKLOAD_IDENTITY.value + cls.enable_workload_identity = \ + xds_k8s_flags.ENABLE_WORKLOAD_IDENTITY.value cls.check_local_certs = _CHECK_LOCAL_CERTS.value # Resource managers @@ -143,78 +150,12 @@ class XdsKubernetesTestCase(absltest.TestCase, metaclass=abc.ABCMeta): xds_k8s_flags.SECONDARY_KUBE_CONTEXT.value) cls.gcp_api_manager = gcp.api.GcpApiManager() - def setUp(self): - """Hook method for setting up the test fixture before exercising it.""" - super().setUp() - - if self._resource_suffix_randomize: - self.resource_suffix = framework.helpers.rand.random_resource_suffix( - ) - logger.info('Test run resource prefix: %s, suffix: %s', - self.resource_prefix, self.resource_suffix) - - # TD Manager - self.td = self.initTrafficDirectorManager() - - # Test Server runner - self.server_namespace = KubernetesServerRunner.make_namespace_name( - self.resource_prefix, self.resource_suffix) - self.server_runner = self.initKubernetesServerRunner() - - # Test Client runner - self.client_namespace = KubernetesClientRunner.make_namespace_name( - self.resource_prefix, self.resource_suffix) - self.client_runner = self.initKubernetesClientRunner() - - # Ensures the firewall exist - if self.ensure_firewall: - self.td.create_firewall_rule( - allowed_ports=self.firewall_allowed_ports) - - # Randomize xds port, when it's set to 0 - if self.server_xds_port == 0: - # TODO(sergiitk): this is prone to race conditions: - # The port might not me taken now, but there's not guarantee - # it won't be taken until the tests get to creating - # forwarding rule. This check is better than nothing, - # but we should find a better approach. - self.server_xds_port = self.td.find_unused_forwarding_rule_port() - logger.info('Found unused xds port: %s', self.server_xds_port) - - @abc.abstractmethod - def initTrafficDirectorManager(self) -> TrafficDirectorManager: - raise NotImplementedError - - @abc.abstractmethod - def initKubernetesServerRunner(self) -> KubernetesServerRunner: - raise NotImplementedError - - @abc.abstractmethod - def initKubernetesClientRunner(self) -> KubernetesClientRunner: - raise NotImplementedError - @classmethod def tearDownClass(cls): cls.k8s_api_manager.close() cls.secondary_k8s_api_manager.close() cls.gcp_api_manager.close() - def tearDown(self): - logger.info('----- TestMethod %s teardown -----', self.id()) - retryer = retryers.constant_retryer(wait_fixed=_timedelta(seconds=10), - attempts=3, - log_level=logging.INFO) - try: - retryer(self._cleanup) - except retryers.RetryError: - logger.exception('Got error during teardown') - - def _cleanup(self): - self.td.cleanup(force=self.force_cleanup) - self.client_runner.cleanup(force=self.force_cleanup) - self.server_runner.cleanup(force=self.force_cleanup, - force_namespace=self.force_cleanup) - def setupTrafficDirectorGrpc(self): self.td.setup_for_grpc(self.server_xds_host, self.server_xds_port, @@ -424,11 +365,86 @@ class XdsKubernetesTestCase(absltest.TestCase, metaclass=abc.ABCMeta): msg=f'Backend {backend} did not receive a single RPC') -class TdPropagationRetryableError(Exception): - pass +class IsolatedXdsKubernetesTestCase(XdsKubernetesBaseTestCase, + metaclass=abc.ABCMeta): + """Isolated test case. + + Base class for tests cases where infra resources are created before + each test, and destroyed after. + """ + + # Whether to randomize resources names for each test by appending a + # unique suffix. + _resource_suffix_randomize: bool = True + + def setUp(self): + """Hook method for setting up the test fixture before exercising it.""" + super().setUp() + + if self._resource_suffix_randomize: + self.resource_suffix = helpers_rand.random_resource_suffix() + logger.info('Test run resource prefix: %s, suffix: %s', + self.resource_prefix, self.resource_suffix) + + # TD Manager + self.td = self.initTrafficDirectorManager() + + # Test Server runner + self.server_namespace = KubernetesServerRunner.make_namespace_name( + self.resource_prefix, self.resource_suffix) + self.server_runner = self.initKubernetesServerRunner() + + # Test Client runner + self.client_namespace = KubernetesClientRunner.make_namespace_name( + self.resource_prefix, self.resource_suffix) + self.client_runner = self.initKubernetesClientRunner() + + # Ensures the firewall exist + if self.ensure_firewall: + self.td.create_firewall_rule( + allowed_ports=self.firewall_allowed_ports) + + # Randomize xds port, when it's set to 0 + if self.server_xds_port == 0: + # TODO(sergiitk): this is prone to race conditions: + # The port might not me taken now, but there's not guarantee + # it won't be taken until the tests get to creating + # forwarding rule. This check is better than nothing, + # but we should find a better approach. + self.server_xds_port = self.td.find_unused_forwarding_rule_port() + logger.info('Found unused xds port: %s', self.server_xds_port) + + @abc.abstractmethod + def initTrafficDirectorManager(self) -> TrafficDirectorManager: + raise NotImplementedError + + @abc.abstractmethod + def initKubernetesServerRunner(self) -> KubernetesServerRunner: + raise NotImplementedError + + @abc.abstractmethod + def initKubernetesClientRunner(self) -> KubernetesClientRunner: + raise NotImplementedError + + def tearDown(self): + logger.info('----- TestMethod %s teardown -----', self.id()) + retryer = retryers.constant_retryer(wait_fixed=_timedelta(seconds=10), + attempts=3, + log_level=logging.INFO) + try: + retryer(self._cleanup) + except retryers.RetryError: + logger.exception('Got error during teardown') + + def _cleanup(self): + self.td.cleanup(force=self.force_cleanup) + self.client_runner.cleanup(force=self.force_cleanup) + self.server_runner.cleanup(force=self.force_cleanup, + force_namespace=self.force_cleanup) -class RegularXdsKubernetesTestCase(XdsKubernetesTestCase): +class RegularXdsKubernetesTestCase(IsolatedXdsKubernetesTestCase): + """Regular test case base class for testing PSM features in isolation.""" @classmethod def setUpClass(cls): @@ -518,7 +534,8 @@ class AppNetXdsKubernetesTestCase(RegularXdsKubernetesTestCase): compute_api_version=self.compute_api_version) -class SecurityXdsKubernetesTestCase(XdsKubernetesTestCase): +class SecurityXdsKubernetesTestCase(IsolatedXdsKubernetesTestCase): + """Test case base class for testing PSM security features in isolation.""" td: TrafficDirectorSecureManager class SecurityMode(enum.Enum):