@ -57,6 +57,7 @@ GcpResourceManager = xds_url_map_test_resources.GcpResourceManager
HostRule = xds_url_map_test_resources . HostRule
PathMatcher = xds_url_map_test_resources . PathMatcher
JsonType = Any
_timedelta = datetime . timedelta
# ProtoBuf translatable RpcType enums
RpcTypeUnaryCall = ' UNARY_CALL '
@ -362,11 +363,26 @@ class XdsUrlMapTestCase(absltest.TestCase, metaclass=_MetaXdsUrlMapTestCase):
@classmethod
def cleanupAfterTests ( cls ) :
logging . info ( ' ----- Doing cleanup after %s ----- ' , cls . __name__ )
cls . test_client_runner . cleanup ( force = True , force_namespace = True )
logging . info ( ' ----- TestCase %s teardown ----- ' , cls . __name__ )
retryer = retryers . constant_retryer ( wait_fixed = _timedelta ( seconds = 10 ) ,
attempts = 3 ,
log_level = logging . INFO )
cls . finished_test_cases . add ( cls . __name__ )
if cls . finished_test_cases == cls . test_case_names :
# Tear down the GCP resource after all tests finished
# Whether to clean up shared pre-provisioned infrastructure too.
# We only do it after all tests are finished.
cleanup_all = cls . finished_test_cases == cls . test_case_names
# Graceful cleanup: try three times, and don't fail the test on
# a cleanup failure.
try :
retryer ( cls . _cleanup , cleanup_all )
except retryers . RetryError :
logging . exception ( ' Got error during teardown ' )
@classmethod
def _cleanup ( cls , cleanup_all : bool = False ) :
cls . test_client_runner . cleanup ( force = True , force_namespace = True )
if cleanup_all :
GcpResourceManager ( ) . cleanup ( )
def _fetch_and_check_xds_config ( self ) :