@ -18,8 +18,10 @@ import enum
import hashlib
import logging
import re
import signal
import time
from typing import List , Optional , Tuple
from types import FrameType
from typing import Any , Callable , List , Optional , Tuple , Union
from absl import flags
from absl . testing import absltest
@ -65,6 +67,9 @@ LoadBalancerStatsResponse = grpc_testing.LoadBalancerStatsResponse
_ChannelState = grpc_channelz . ChannelState
_timedelta = datetime . timedelta
ClientConfig = grpc_csds . ClientConfig
# pylint complains about signal.Signals for some reason.
_SignalNum = Union [ int , signal . Signals ] # pylint: disable=no-member
_SignalHandler = Callable [ [ _SignalNum , Optional [ FrameType ] ] , Any ]
_TD_CONFIG_MAX_WAIT_SEC = 600
@ -97,6 +102,8 @@ class XdsKubernetesBaseTestCase(absltest.TestCase):
server_xds_port : int
td : TrafficDirectorManager
td_bootstrap_image : str
_prev_sigint_handler : Optional [ _SignalHandler ] = None
_handling_sigint : bool = False
@staticmethod
def is_supported ( config : skips . TestConfig ) - > bool :
@ -173,13 +180,32 @@ class XdsKubernetesBaseTestCase(absltest.TestCase):
cls . secondary_k8s_api_manager . close ( )
cls . gcp_api_manager . close ( )
def setUp ( self ) :
self . _prev_sigint_handler = signal . signal ( signal . SIGINT ,
self . handle_sigint )
def handle_sigint ( self , signalnum : _SignalNum ,
frame : Optional [ FrameType ] ) - > None :
logger . info ( ' Caught Ctrl+C, cleaning up... ' )
self . _handling_sigint = True
# Force resource cleanup by their name. Addresses the case where ctrl-c
# is pressed while waiting for the resource creation.
self . force_cleanup = True
self . tearDown ( )
self . tearDownClass ( )
self . _handling_sigint = False
if self . _prev_sigint_handler is not None :
signal . signal ( signal . SIGINT , self . _prev_sigint_handler )
raise KeyboardInterrupt
@contextlib . contextmanager
def subTest ( self , msg , * * params ) : # noqa pylint: disable=signature-differs
logger . info ( ' --- Starting subTest %s . %s --- ' , self . id ( ) , msg )
try :
yield super ( ) . subTest ( msg , * * params )
finally :
logger . info ( ' --- Finished subTest %s . %s --- ' , self . id ( ) , msg )
if not self . _handling_sigint :
logger . info ( ' --- Finished subTest %s . %s --- ' , self . id ( ) , msg )
def setupTrafficDirectorGrpc ( self ) :
self . td . setup_for_grpc ( self . server_xds_host ,