yapf and isort

pull/26360/head
Eric Gribkoff 4 years ago
parent 7ee7f506f9
commit 237ec6fba7
  1. 6
      tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/compute.py
  2. 4
      tools/run_tests/xds_k8s_test_driver/framework/infrastructure/k8s.py
  3. 60
      tools/run_tests/xds_k8s_test_driver/framework/infrastructure/traffic_director.py
  4. 12
      tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc_testing.py
  5. 8
      tools/run_tests/xds_k8s_test_driver/framework/test_app/server_app.py
  6. 7
      tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_flags.py
  7. 43
      tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py
  8. 22
      tools/run_tests/xds_k8s_test_driver/tests/change_backend_service_test.py
  9. 37
      tools/run_tests/xds_k8s_test_driver/tests/failover_test.py
  10. 22
      tools/run_tests/xds_k8s_test_driver/tests/remove_neg_test.py
  11. 11
      tools/run_tests/xds_k8s_test_driver/tests/round_robin_test.py

@ -142,7 +142,11 @@ class ComputeV1(gcp.api.GcpProjectApiResource):
body=body,
**kwargs)
def backend_service_patch_backends(self, backend_service, backends, max_rate_per_endpoint: Optional[int] = None):
def backend_service_patch_backends(
self,
backend_service,
backends,
max_rate_per_endpoint: Optional[int] = None):
if max_rate_per_endpoint is None:
max_rate_per_endpoint = 5
backend_list = [{

@ -241,8 +241,8 @@ class KubernetesNamespace:
@retrying.retry(
retry_on_result=lambda r: not self._replicas_available(r, count),
stop_max_delay=timeout_sec * 1000 * 10, # TODO
wait_fixed=wait_sec * 1000 * 10) # TODO
stop_max_delay=timeout_sec * 1000 * 10, # TODO
wait_fixed=wait_sec * 1000 * 10) # TODO
def _wait_for_deployment_available_replicas():
deployment = self.get_deployment(name)
logger.debug(

@ -203,7 +203,11 @@ class TrafficDirectorManager:
self.compute.delete_backend_service(name)
self.backend_service = None
def backend_service_add_neg_backends(self, name, zones, max_rate_per_endpoint: Optional[int] = None):
def backend_service_add_neg_backends(self,
name,
zones,
max_rate_per_endpoint: Optional[
int] = None):
logger.info('Waiting for Network Endpoint Groups to load endpoints.')
for zone in zones:
backend = self.compute.wait_for_network_endpoint_group(name, zone)
@ -221,11 +225,13 @@ class TrafficDirectorManager:
self.backends.remove(backend)
self.backend_service_patch_backends()
def backend_service_patch_backends(self, max_rate_per_endpoint: Optional[int] = None):
def backend_service_patch_backends(
self, max_rate_per_endpoint: Optional[int] = None):
logging.info('Adding backends to Backend Service %s: %r',
self.backend_service.name, self.backends)
self.compute.backend_service_patch_backends(self.backend_service,
self.backends, max_rate_per_endpoint)
self.backends,
max_rate_per_endpoint)
def backend_service_remove_all_backends(self):
logging.info('Removing backends from Backend Service %s',
@ -340,8 +346,8 @@ class TrafficDirectorManager:
def affinity_backend_service_patch_backends(self):
logging.info('Adding backends to Backend Service %s: %r',
self.affinity_backend_service.name, self.affinity_backends)
self.compute.backend_service_patch_backends(self.affinity_backend_service,
self.affinity_backends)
self.compute.backend_service_patch_backends(
self.affinity_backend_service, self.affinity_backends)
def affinity_backend_service_remove_all_backends(self):
logging.info('Removing backends from Backend Service %s',
@ -367,19 +373,19 @@ class TrafficDirectorManager:
if dst_host_rule_match_backend_service is None:
dst_host_rule_match_backend_service = dst_default_backend_service
return {
'name':
name,
'defaultService':
dst_default_backend_service.url,
'hostRules': [{
'hosts': src_hosts,
'pathMatcher': matcher_name,
}],
'pathMatchers': [{
'name': matcher_name,
'defaultService': dst_host_rule_match_backend_service.url,
}],
}
'name':
name,
'defaultService':
dst_default_backend_service.url,
'hostRules': [{
'hosts': src_hosts,
'pathMatcher': matcher_name,
}],
'pathMatchers': [{
'name': matcher_name,
'defaultService': dst_host_rule_match_backend_service.url,
}],
}
def create_url_map(
self,
@ -391,23 +397,23 @@ class TrafficDirectorManager:
matcher_name = self.make_resource_name(self.URL_MAP_PATH_MATCHER_NAME)
logger.info('Creating URL map "%s": %s -> %s', name, src_address,
self.backend_service.name)
resource = self.compute.create_url_map_with_content(self._generate_url_map_body(name, matcher_name,
[src_address],
self.backend_service))
resource = self.compute.create_url_map_with_content(
self._generate_url_map_body(name, matcher_name, [src_address],
self.backend_service))
self.url_map = resource
return resource
def patch_url_map(self, src_host: str, src_port: int, backend_service: GcpResource):
def patch_url_map(self, src_host: str, src_port: int,
backend_service: GcpResource):
src_address = f'{src_host}:{src_port}'
name = self.make_resource_name(self.URL_MAP_NAME)
matcher_name = self.make_resource_name(self.URL_MAP_PATH_MATCHER_NAME)
logger.info('Patching URL map "%s": %s -> %s', name, src_address,
backend_service.name)
self.compute.patch_url_map(self.url_map,
self._generate_url_map_body(name,
matcher_name,
[src_address],
backend_service))
self.compute.patch_url_map(
self.url_map,
self._generate_url_map_body(name, matcher_name, [src_address],
backend_service))
def create_url_map_with_content(self, url_map_body: Any) -> GcpResource:
logger.info('Creating URL map: %s', url_map_body)

@ -19,10 +19,10 @@ import logging
from typing import Iterable, Optional, Tuple
import grpc
import framework.rpc
from grpc_health.v1 import health_pb2
from grpc_health.v1 import health_pb2_grpc
import framework.rpc
from src.proto.grpc.testing import empty_pb2
from src.proto.grpc.testing import messages_pb2
from src.proto.grpc.testing import test_pb2_grpc
@ -111,6 +111,7 @@ class XdsUpdateClientConfigureServiceClient(framework.rpc.grpc.GrpcClientHelper
deadline_sec=timeout_sec,
log_level=logging.INFO)
class XdsUpdateHealthServiceClient(framework.rpc.grpc.GrpcClientHelper):
stub: test_pb2_grpc.XdsUpdateHealthServiceStub
@ -135,6 +136,7 @@ class HealthClient(framework.rpc.grpc.GrpcClientHelper):
super().__init__(channel, health_pb2_grpc.HealthStub)
def check_health(self):
return self.call_unary_with_deadline(rpc='Check',
req=health_pb2.HealthCheckRequest(),
log_level=logging.INFO)
return self.call_unary_with_deadline(
rpc='Check',
req=health_pb2.HealthCheckRequest(),
log_level=logging.INFO)

@ -36,6 +36,7 @@ _ChannelzServiceClient = grpc_channelz.ChannelzServiceClient
_XdsUpdateHealthServiceClient = grpc_testing.XdsUpdateHealthServiceClient
_HealthClient = grpc_testing.HealthClient
class XdsTestServer(framework.rpc.grpc.GrpcApp):
"""
Represents RPC services implemented in Server component of the xDS test app.
@ -70,14 +71,13 @@ class XdsTestServer(framework.rpc.grpc.GrpcApp):
@property
@functools.lru_cache(None)
def update_health_service_client(self) -> _XdsUpdateHealthServiceClient:
return _XdsUpdateHealthServiceClient(self._make_channel(
self.maintenance_port))
return _XdsUpdateHealthServiceClient(
self._make_channel(self.maintenance_port))
@property
@functools.lru_cache(None)
def health_client(self) -> _HealthClient:
return _HealthClient(self._make_channel(
self.maintenance_port))
return _HealthClient(self._make_channel(self.maintenance_port))
def set_serving(self):
logger.info('Setting health status to serving')

@ -17,9 +17,10 @@ from absl import flags
KUBE_CONTEXT = flags.DEFINE_string("kube_context",
default=None,
help="Kubectl context to use")
SECONDARY_KUBE_CONTEXT = flags.DEFINE_string("secondary_kube_context",
default=None,
help="Secondary kubectl context to use for cluster in another region")
SECONDARY_KUBE_CONTEXT = flags.DEFINE_string(
"secondary_kube_context",
default=None,
help="Secondary kubectl context to use for cluster in another region")
GCP_SERVICE_ACCOUNT = flags.DEFINE_string(
"gcp_service_account",
default=None,

@ -62,6 +62,7 @@ _timedelta = datetime.timedelta
_TD_CONFIG_MAX_WAIT_SEC = 600
class XdsKubernetesTestCase(absltest.TestCase, metaclass=abc.ABCMeta):
_resource_suffix_randomize: bool = True
client_namespace: str
@ -204,7 +205,11 @@ class XdsKubernetesTestCase(absltest.TestCase, metaclass=abc.ABCMeta):
self.server_xds_port,
health_check_port=self.server_maintenance_port)
def setupServerBackends(self, *, wait_for_healthy_status=True, server_runner=None, max_rate_per_endpoint: Optional[int] = None):
def setupServerBackends(self,
*,
wait_for_healthy_status=True,
server_runner=None,
max_rate_per_endpoint: Optional[int] = None):
if server_runner is None:
server_runner = self.server_runner
# Load Backends
@ -212,7 +217,8 @@ class XdsKubernetesTestCase(absltest.TestCase, metaclass=abc.ABCMeta):
server_runner.service_name, self.server_port)
# Add backends to the Backend Service
self.td.backend_service_add_neg_backends(neg_name, neg_zones, max_rate_per_endpoint=max_rate_per_endpoint)
self.td.backend_service_add_neg_backends(
neg_name, neg_zones, max_rate_per_endpoint=max_rate_per_endpoint)
if wait_for_healthy_status:
self.td.wait_for_backends_healthy_status()
@ -238,21 +244,24 @@ class XdsKubernetesTestCase(absltest.TestCase, metaclass=abc.ABCMeta):
msg=f'Expected all RPCs to succeed: {failed} of {num_rpcs} failed')
def assertRpcsEventuallyGoToGivenServers(self,
test_client: XdsTestClient,
servers: List[XdsTestServer],
num_rpcs: int = 100):
retryer = retryers.constant_retryer(wait_fixed=datetime.timedelta(
seconds=1),
test_client: XdsTestClient,
servers: List[XdsTestServer],
num_rpcs: int = 100):
retryer = retryers.constant_retryer(
wait_fixed=datetime.timedelta(seconds=1),
timeout=datetime.timedelta(seconds=_TD_CONFIG_MAX_WAIT_SEC),
log_level=logging.INFO)
log_level=logging.INFO)
try:
retryer(self._assertRpcsEventuallyGoToGivenServers, test_client, servers, num_rpcs)
retryer(self._assertRpcsEventuallyGoToGivenServers, test_client,
servers, num_rpcs)
except retryers.RetryError:
logger.exception('Rpcs did not go to expected servers before timeout %s', _TD_CONFIG_MAX_WAIT_SEC)
logger.exception(
'Rpcs did not go to expected servers before timeout %s',
_TD_CONFIG_MAX_WAIT_SEC)
def _assertRpcsEventuallyGoToGivenServers(self, test_client: XdsTestClient,
servers: List[XdsTestServer],
num_rpcs: int):
servers: List[XdsTestServer],
num_rpcs: int):
server_names = [server.pod_name for server in servers]
logger.info(f'Verifying RPCs go to {server_names}')
lb_stats = self.getClientRpcStats(test_client, num_rpcs)
@ -296,7 +305,7 @@ class XdsKubernetesTestCase(absltest.TestCase, metaclass=abc.ABCMeta):
@staticmethod
def getClientRpcStats(test_client: XdsTestClient,
num_rpcs: int) -> LoadBalancerStatsResponse:
num_rpcs: int) -> LoadBalancerStatsResponse:
lb_stats = test_client.get_load_balancer_stats(num_rpcs=num_rpcs)
logger.info(
'Received LoadBalancerStatsResponse from test client %s:\n%s',
@ -360,7 +369,10 @@ class RegularXdsKubernetesTestCase(XdsKubernetesTestCase):
stats_port=self.client_port,
reuse_namespace=self.server_namespace == self.client_namespace)
def startTestServers(self, replica_count=1, server_runner=None, **kwargs) -> List[XdsTestServer]:
def startTestServers(self,
replica_count=1,
server_runner=None,
**kwargs) -> List[XdsTestServer]:
if server_runner is None:
server_runner = self.server_runner
test_servers = server_runner.run(
@ -369,7 +381,8 @@ class RegularXdsKubernetesTestCase(XdsKubernetesTestCase):
maintenance_port=self.server_maintenance_port,
**kwargs)
for test_server in test_servers:
test_server.set_xds_address(self.server_xds_host, self.server_xds_port)
test_server.set_xds_address(self.server_xds_host,
self.server_xds_port)
return test_servers
def startTestClient(self, test_server: XdsTestServer,

@ -12,13 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import Optional
from typing import List, Optional
from absl import flags
from absl.testing import absltest
from typing import List
from typing import List
from framework import xds_k8s_testcase
from framework.infrastructure import k8s
@ -33,10 +30,11 @@ _XdsTestClient = xds_k8s_testcase.XdsTestClient
class ChangeBackendServiceTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
def setUp(self):
super().setUp()
self.alternate_k8s_namespace = k8s.KubernetesNamespace(self.k8s_api_manager,
self.server_namespace)
self.alternate_k8s_namespace = k8s.KubernetesNamespace(
self.k8s_api_manager, self.server_namespace)
self.alternate_server_runner = server_app.KubernetesServerRunner(
self.alternate_k8s_namespace,
deployment_name=self.server_name + '-alt',
@ -52,7 +50,8 @@ class ChangeBackendServiceTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
def tearDown(self):
if hasattr(self, 'alternate_server_runner'):
self.alternate_server_runner.cleanup(force=self.force_cleanup, force_namespace=self.force_cleanup)
self.alternate_server_runner.cleanup(
force=self.force_cleanup, force_namespace=self.force_cleanup)
super().tearDown()
def test_change_backend_service(self) -> None:
@ -64,8 +63,7 @@ class ChangeBackendServiceTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
self.td.create_alternative_backend_service()
with self.subTest('02_create_url_map'):
self.td.create_url_map(self.server_xds_host,
self.server_xds_port)
self.td.create_url_map(self.server_xds_host, self.server_xds_port)
with self.subTest('03_create_target_proxy'):
self.td.create_target_proxy()
@ -78,7 +76,7 @@ class ChangeBackendServiceTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
_XdsTestServer] = self.startTestServers()
self.same_zone_test_servers: List[
_XdsTestServer] = self.startTestServers(
server_runner=self.alternate_server_runner)
server_runner=self.alternate_server_runner)
with self.subTest('06_add_server_backends_to_backend_services'):
self.setupServerBackends()
@ -101,8 +99,8 @@ class ChangeBackendServiceTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
with self.subTest('10_change_backend_service'):
self.td.patch_url_map(self.server_xds_host, self.server_xds_port,
self.td.alternative_backend_service)
self.assertRpcsEventuallyGoToGivenServers(self.test_client,
self.same_zone_test_servers)
self.assertRpcsEventuallyGoToGivenServers(
self.test_client, self.same_zone_test_servers)
if __name__ == '__main__':

@ -12,12 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import List
from absl import flags
from absl.testing import absltest
from typing import List
from framework import xds_k8s_testcase
from framework.infrastructure import k8s
from framework.test_app import server_app
@ -52,7 +51,8 @@ class FailoverTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
def tearDown(self):
if hasattr(self, 'secondary_server_runner'):
self.secondary_server_runner.cleanup(force=self.force_cleanup, force_namespace=self.force_cleanup)
self.secondary_server_runner.cleanup(
force=self.force_cleanup, force_namespace=self.force_cleanup)
super().tearDown()
def test_failover(self) -> None:
@ -63,8 +63,7 @@ class FailoverTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
self.td.create_backend_service()
with self.subTest('02_create_url_map'):
self.td.create_url_map(self.server_xds_host,
self.server_xds_port)
self.td.create_url_map(self.server_xds_host, self.server_xds_port)
with self.subTest('03_create_target_proxy'):
self.td.create_target_proxy()
@ -75,14 +74,15 @@ class FailoverTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
with self.subTest('05_start_test_servers'):
self.default_test_servers: List[
_XdsTestServer] = self.startTestServers(
replica_count=self.REPLICA_COUNT)
replica_count=self.REPLICA_COUNT)
self.alternate_test_servers: List[
_XdsTestServer] = self.startTestServers(
server_runner=self.secondary_server_runner)
server_runner=self.secondary_server_runner)
with self.subTest('06_add_server_backends_to_backend_services'):
self.setupServerBackends(max_rate_per_endpoint=self.MAX_RATE_PER_ENDPOINT)
self.setupServerBackends(
max_rate_per_endpoint=self.MAX_RATE_PER_ENDPOINT)
self.setupServerBackends(
server_runner=self.secondary_server_runner,
max_rate_per_endpoint=self.MAX_RATE_PER_ENDPOINT)
@ -99,23 +99,23 @@ class FailoverTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
self.default_test_servers)
with self.subTest(
'10_secondary_locality_receives_no_requests_on_partial_primary_failure'):
'10_secondary_locality_receives_no_requests_on_partial_primary_failure'
):
self.default_test_servers[0].set_not_serving()
self.assertRpcsEventuallyGoToGivenServers(self.test_client,
self.default_test_servers[
1:])
self.assertRpcsEventuallyGoToGivenServers(
self.test_client, self.default_test_servers[1:])
with self.subTest('11_gentle_failover'):
self.default_test_servers[1].set_not_serving()
self.assertRpcsEventuallyGoToGivenServers(self.test_client,
self.default_test_servers[
2:] + self.alternate_test_servers)
self.assertRpcsEventuallyGoToGivenServers(
self.test_client,
self.default_test_servers[2:] + self.alternate_test_servers)
with self.subTest(
'12_secondary_locality_receives_requests_on_primary_failure'):
'12_secondary_locality_receives_requests_on_primary_failure'):
self.default_test_servers[2].set_not_serving()
self.assertRpcsEventuallyGoToGivenServers(self.test_client,
self.alternate_test_servers)
self.assertRpcsEventuallyGoToGivenServers(
self.test_client, self.alternate_test_servers)
with self.subTest('13_traffic_resumes_to_healthy_backends'):
for i in range(self.REPLICA_COUNT):
@ -124,6 +124,5 @@ class FailoverTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
self.default_test_servers)
if __name__ == '__main__':
absltest.main(failfast=True)

@ -12,13 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import Optional
from typing import List, Optional
from absl import flags
from absl.testing import absltest
from typing import List
from typing import List
from framework import xds_k8s_testcase
from framework.infrastructure import k8s
@ -33,6 +30,7 @@ _XdsTestClient = xds_k8s_testcase.XdsTestClient
class RemoveNegTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
def setUp(self):
super().setUp()
self.alternate_server_runner = server_app.KubernetesServerRunner(
@ -51,7 +49,8 @@ class RemoveNegTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
def tearDown(self):
if hasattr(self, 'alternate_server_runner'):
self.alternate_server_runner.cleanup(force=self.force_cleanup, force_namespace=self.force_cleanup)
self.alternate_server_runner.cleanup(
force=self.force_cleanup, force_namespace=self.force_cleanup)
super().tearDown()
def test_remove_neg(self) -> None:
@ -62,8 +61,7 @@ class RemoveNegTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
self.td.create_backend_service()
with self.subTest('02_create_url_map'):
self.td.create_url_map(self.server_xds_host,
self.server_xds_port)
self.td.create_url_map(self.server_xds_host, self.server_xds_port)
with self.subTest('03_create_target_proxy'):
self.td.create_target_proxy()
@ -76,12 +74,11 @@ class RemoveNegTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
_XdsTestServer] = self.startTestServers()
self.same_zone_test_servers: List[
_XdsTestServer] = self.startTestServers(
server_runner=self.alternate_server_runner)
server_runner=self.alternate_server_runner)
with self.subTest('06_add_server_backends_to_backend_services'):
self.setupServerBackends()
self.setupServerBackends(
server_runner=self.alternate_server_runner)
self.setupServerBackends(server_runner=self.alternate_server_runner)
with self.subTest('07_start_test_client'):
self.test_client: _XdsTestClient = self.startTestClient(
@ -94,8 +91,9 @@ class RemoveNegTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
self.assertSuccessfulRpcs(self.test_client)
with self.subTest('10_remove_neg'):
self.assertRpcsEventuallyGoToGivenServers(self.test_client,
self.default_test_servers + self.same_zone_test_servers)
self.assertRpcsEventuallyGoToGivenServers(
self.test_client,
self.default_test_servers + self.same_zone_test_servers)
self.removeServerBackends(
server_runner=self.alternate_server_runner)
self.assertRpcsEventuallyGoToGivenServers(self.test_client,

@ -12,13 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import Optional
from typing import List, Optional
from absl import flags
from absl.testing import absltest
from typing import List
from typing import List
from framework import xds_k8s_testcase
from framework.infrastructure import k8s
@ -44,8 +41,7 @@ class RoundRobinTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
self.td.create_backend_service()
with self.subTest('02_create_url_map'):
self.td.create_url_map(self.server_xds_host,
self.server_xds_port)
self.td.create_url_map(self.server_xds_host, self.server_xds_port)
with self.subTest('03_create_target_proxy'):
self.td.create_target_proxy()
@ -54,8 +50,7 @@ class RoundRobinTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
self.td.create_forwarding_rule(self.server_xds_port)
with self.subTest('05_start_test_servers'):
self.test_servers: List[
_XdsTestServer] = self.startTestServers(
self.test_servers: List[_XdsTestServer] = self.startTestServers(
replica_count=REPLICA_COUNT)
with self.subTest('06_add_server_backends_to_backend_services'):

Loading…
Cancel
Save