@ -28,13 +28,14 @@ import grpc
from framework import xds_flags
from framework import xds_k8s_flags
from framework import xds_url_map_testcase
from framework . helpers import rand as helpers_rand
from framework . helpers import retryers
from framework . helpers import skips
import framework . helpers . rand
from framework . infrastructure import gcp
from framework . infrastructure import k8s
from framework . infrastructure import traffic_director
from framework . rpc import grpc_channelz
from framework . rpc import grpc_csds
from framework . rpc import grpc_testing
from framework . test_app import client_app
from framework . test_app import server_app
@ -60,27 +61,32 @@ KubernetesClientRunner = client_app.KubernetesClientRunner
LoadBalancerStatsResponse = grpc_testing . LoadBalancerStatsResponse
_ChannelState = grpc_channelz . ChannelState
_timedelta = datetime . timedelta
ClientConfig = framework . rpc . grpc_csds . ClientConfig
ClientConfig = grpc_csds . ClientConfig
_TD_CONFIG_MAX_WAIT_SEC = 600
class XdsKubernetesTestCase ( absltest . TestCase , metaclass = abc . ABCMeta ) :
_resource_suffix_randomize : bool = True
class TdPropagationRetryableError ( Exception ) :
""" Indicates that TD config hasn ' t propagated yet, and it ' s safe to retry """
class XdsKubernetesBaseTestCase ( absltest . TestCase ) :
client_namespace : str
client_runner : KubernetesClientRunner
gcp_api_manager : gcp . api . GcpApiManager
k8s_api_manager : k8s . KubernetesApiManager
secondary_k8s_api_manager : k8s . KubernetesApiManager
resource_prefix : str
resource_suffix : str = ' '
server_namespace : str
server_runner : KubernetesServerRunner
server_xds_port : int
server_maintenance_port : int
td : TrafficDirectorManager
@staticmethod
def isSupported ( config : skips . TestConfig ) - > bool :
""" Overrided by the test class to decide if the config is supported.
""" Overridden by the test class to decide if the config is supported.
Returns :
A bool indicates if the given config is supported .
@ -133,7 +139,8 @@ class XdsKubernetesTestCase(absltest.TestCase, metaclass=abc.ABCMeta):
cls . force_cleanup = xds_flags . FORCE_CLEANUP . value
cls . debug_use_port_forwarding = \
xds_k8s_flags . DEBUG_USE_PORT_FORWARDING . value
cls . enable_workload_identity = xds_k8s_flags . ENABLE_WORKLOAD_IDENTITY . value
cls . enable_workload_identity = \
xds_k8s_flags . ENABLE_WORKLOAD_IDENTITY . value
cls . check_local_certs = _CHECK_LOCAL_CERTS . value
# Resource managers
@ -143,78 +150,12 @@ class XdsKubernetesTestCase(absltest.TestCase, metaclass=abc.ABCMeta):
xds_k8s_flags . SECONDARY_KUBE_CONTEXT . value )
cls . gcp_api_manager = gcp . api . GcpApiManager ( )
def setUp ( self ) :
""" Hook method for setting up the test fixture before exercising it. """
super ( ) . setUp ( )
if self . _resource_suffix_randomize :
self . resource_suffix = framework . helpers . rand . random_resource_suffix (
)
logger . info ( ' Test run resource prefix: %s , suffix: %s ' ,
self . resource_prefix , self . resource_suffix )
# TD Manager
self . td = self . initTrafficDirectorManager ( )
# Test Server runner
self . server_namespace = KubernetesServerRunner . make_namespace_name (
self . resource_prefix , self . resource_suffix )
self . server_runner = self . initKubernetesServerRunner ( )
# Test Client runner
self . client_namespace = KubernetesClientRunner . make_namespace_name (
self . resource_prefix , self . resource_suffix )
self . client_runner = self . initKubernetesClientRunner ( )
# Ensures the firewall exist
if self . ensure_firewall :
self . td . create_firewall_rule (
allowed_ports = self . firewall_allowed_ports )
# Randomize xds port, when it's set to 0
if self . server_xds_port == 0 :
# TODO(sergiitk): this is prone to race conditions:
# The port might not me taken now, but there's not guarantee
# it won't be taken until the tests get to creating
# forwarding rule. This check is better than nothing,
# but we should find a better approach.
self . server_xds_port = self . td . find_unused_forwarding_rule_port ( )
logger . info ( ' Found unused xds port: %s ' , self . server_xds_port )
@abc . abstractmethod
def initTrafficDirectorManager ( self ) - > TrafficDirectorManager :
raise NotImplementedError
@abc . abstractmethod
def initKubernetesServerRunner ( self ) - > KubernetesServerRunner :
raise NotImplementedError
@abc . abstractmethod
def initKubernetesClientRunner ( self ) - > KubernetesClientRunner :
raise NotImplementedError
@classmethod
def tearDownClass ( cls ) :
cls . k8s_api_manager . close ( )
cls . secondary_k8s_api_manager . close ( )
cls . gcp_api_manager . close ( )
def tearDown ( self ) :
logger . info ( ' ----- TestMethod %s teardown ----- ' , self . id ( ) )
retryer = retryers . constant_retryer ( wait_fixed = _timedelta ( seconds = 10 ) ,
attempts = 3 ,
log_level = logging . INFO )
try :
retryer ( self . _cleanup )
except retryers . RetryError :
logger . exception ( ' Got error during teardown ' )
def _cleanup ( self ) :
self . td . cleanup ( force = self . force_cleanup )
self . client_runner . cleanup ( force = self . force_cleanup )
self . server_runner . cleanup ( force = self . force_cleanup ,
force_namespace = self . force_cleanup )
def setupTrafficDirectorGrpc ( self ) :
self . td . setup_for_grpc ( self . server_xds_host ,
self . server_xds_port ,
@ -424,11 +365,86 @@ class XdsKubernetesTestCase(absltest.TestCase, metaclass=abc.ABCMeta):
msg = f ' Backend { backend } did not receive a single RPC ' )
class TdPropagationRetryableError ( Exception ) :
pass
class IsolatedXdsKubernetesTestCase ( XdsKubernetesBaseTestCase ,
metaclass = abc . ABCMeta ) :
""" Isolated test case.
Base class for tests cases where infra resources are created before
each test , and destroyed after .
"""
# Whether to randomize resources names for each test by appending a
# unique suffix.
_resource_suffix_randomize : bool = True
def setUp ( self ) :
""" Hook method for setting up the test fixture before exercising it. """
super ( ) . setUp ( )
if self . _resource_suffix_randomize :
self . resource_suffix = helpers_rand . random_resource_suffix ( )
logger . info ( ' Test run resource prefix: %s , suffix: %s ' ,
self . resource_prefix , self . resource_suffix )
# TD Manager
self . td = self . initTrafficDirectorManager ( )
# Test Server runner
self . server_namespace = KubernetesServerRunner . make_namespace_name (
self . resource_prefix , self . resource_suffix )
self . server_runner = self . initKubernetesServerRunner ( )
# Test Client runner
self . client_namespace = KubernetesClientRunner . make_namespace_name (
self . resource_prefix , self . resource_suffix )
self . client_runner = self . initKubernetesClientRunner ( )
# Ensures the firewall exist
if self . ensure_firewall :
self . td . create_firewall_rule (
allowed_ports = self . firewall_allowed_ports )
# Randomize xds port, when it's set to 0
if self . server_xds_port == 0 :
# TODO(sergiitk): this is prone to race conditions:
# The port might not me taken now, but there's not guarantee
# it won't be taken until the tests get to creating
# forwarding rule. This check is better than nothing,
# but we should find a better approach.
self . server_xds_port = self . td . find_unused_forwarding_rule_port ( )
logger . info ( ' Found unused xds port: %s ' , self . server_xds_port )
@abc . abstractmethod
def initTrafficDirectorManager ( self ) - > TrafficDirectorManager :
raise NotImplementedError
@abc . abstractmethod
def initKubernetesServerRunner ( self ) - > KubernetesServerRunner :
raise NotImplementedError
@abc . abstractmethod
def initKubernetesClientRunner ( self ) - > KubernetesClientRunner :
raise NotImplementedError
def tearDown ( self ) :
logger . info ( ' ----- TestMethod %s teardown ----- ' , self . id ( ) )
retryer = retryers . constant_retryer ( wait_fixed = _timedelta ( seconds = 10 ) ,
attempts = 3 ,
log_level = logging . INFO )
try :
retryer ( self . _cleanup )
except retryers . RetryError :
logger . exception ( ' Got error during teardown ' )
def _cleanup ( self ) :
self . td . cleanup ( force = self . force_cleanup )
self . client_runner . cleanup ( force = self . force_cleanup )
self . server_runner . cleanup ( force = self . force_cleanup ,
force_namespace = self . force_cleanup )
class RegularXdsKubernetesTestCase ( XdsKubernetesTestCase ) :
class RegularXdsKubernetesTestCase ( IsolatedXdsKubernetesTestCase ) :
""" Regular test case base class for testing PSM features in isolation. """
@classmethod
def setUpClass ( cls ) :
@ -518,7 +534,8 @@ class AppNetXdsKubernetesTestCase(RegularXdsKubernetesTestCase):
compute_api_version = self . compute_api_version )
class SecurityXdsKubernetesTestCase ( XdsKubernetesTestCase ) :
class SecurityXdsKubernetesTestCase ( IsolatedXdsKubernetesTestCase ) :
""" Test case base class for testing PSM security features in isolation. """
td : TrafficDirectorSecureManager
class SecurityMode ( enum . Enum ) :