Address PR feedback on comments/naming

pull/24983/head
Sergii Tkachenko 4 years ago
parent 9d5a7fad4d
commit a95f4288d9
  1. tools/run_tests/xds_test_driver/bin/run_td_setup.py (4 changes)
  2. tools/run_tests/xds_test_driver/framework/infrastructure/gcp/api.py (11 changes)
  3. tools/run_tests/xds_test_driver/framework/infrastructure/gcp/compute.py (8 changes)
  4. tools/run_tests/xds_test_driver/framework/infrastructure/gcp/network_security.py (3 changes)
  5. tools/run_tests/xds_test_driver/framework/infrastructure/gcp/network_services.py (3 changes)
  6. tools/run_tests/xds_test_driver/framework/infrastructure/k8s.py (4 changes)
  7. tools/run_tests/xds_test_driver/framework/infrastructure/traffic_director.py (12 changes)
  8. tools/run_tests/xds_test_driver/framework/rpc/__init__.py (2 changes)
  9. tools/run_tests/xds_test_driver/framework/test_app/base_runner.py (9 changes)
  10. tools/run_tests/xds_test_driver/framework/test_app/client_app.py (2 changes)
  11. tools/run_tests/xds_test_driver/framework/test_app/server_app.py (4 changes)
  12. tools/run_tests/xds_test_driver/framework/xds_k8s_testcase.py (8 changes)

@@ -134,11 +134,9 @@ def main(argv):
neg_name, neg_zones = k8s_namespace.get_service_neg(
server_name, server_port)
- # todo(sergiitk): figure out how to confirm NEG is ready to be added
- # time.sleep(30)
td.load_backend_service()
td.backend_service_add_neg_backends(neg_name, neg_zones)
- # todo(sergiitk): wait until client reports rpc health
+ # TODO(sergiitk): wait until client reports rpc health
elif command == 'backends-cleanup':
td.load_backend_service()
td.backend_service_remove_all_backends()
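The deleted `time.sleep(30)` was a stand-in for the open TODO above: confirming the NEG is ready before adding it as a backend. A minimal sketch of one possible readiness check, not part of this PR, reusing the `get_network_endpoint_group()` helper shown in the compute.py hunk below (the function name and thresholds here are hypothetical):

import time

def wait_for_neg_endpoints(compute, neg_name, zone,
                           timeout_sec=120, wait_sec=5):
    # Poll until GCP reports at least one endpoint attached to the NEG,
    # instead of sleeping a fixed 30 seconds.
    deadline = time.monotonic() + timeout_sec
    while time.monotonic() < deadline:
        neg = compute.get_network_endpoint_group(neg_name, zone)
        # `size` is the endpoint count on the NetworkEndpointGroups resource.
        if neg.get('size', 0) > 0:
            return neg
        time.sleep(wait_sec)
    raise TimeoutError(f'NEG {neg_name} has no endpoints after {timeout_sec}s')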

@@ -16,8 +16,9 @@ import functools
import logging
import os
- # For some reason without `import grpc`, google.protobuf.json_format produces
- # "Segmentation fault"
+ # Workaround: `grpc` must be imported before `google.protobuf.json_format`,
+ # to prevent "Segmentation fault". Ref https://github.com/grpc/grpc/issues/24897
+ # TODO(sergiitk): Remove after #24897 is solved
import grpc
from absl import flags
from google.longrunning import operations_pb2
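The rewritten comment turns an unexplained quirk into a documented workaround. The constraint it describes, in isolation (see grpc/grpc#24897):

# Importing grpc before google.protobuf.json_format is the workaround:
# with the reverse order, json_format calls can crash with a
# segmentation fault. Remove once grpc/grpc#24897 is fixed.
import grpc  # noqa: F401 -- imported only for its side effect
from google.protobuf import json_format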
@@ -145,9 +146,9 @@ class OperationError(Error):
class GcpProjectApiResource:
- # todo(sergiitk): move someplace better
+ # TODO(sergiitk): move someplace better
_WAIT_FOR_OPERATION_SEC = 60 * 5
- _WAIT_FIXES_SEC = 2
+ _WAIT_FIXED_SEC = 2
_GCP_API_RETRIES = 5
def __init__(self, api: discovery.Resource, project: str):
@@ -158,7 +159,7 @@ class GcpProjectApiResource:
def wait_for_operation(operation_request,
test_success_fn,
timeout_sec=_WAIT_FOR_OPERATION_SEC,
- wait_sec=_WAIT_FIXES_SEC):
+ wait_sec=_WAIT_FIXED_SEC):
retryer = tenacity.Retrying(
retry=(tenacity.retry_if_not_result(test_success_fn) |
tenacity.retry_if_exception_type()),
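The retryer above is truncated by the diff context. For orientation, a self-contained sketch of the polling pattern `wait_for_operation` appears to implement, assuming tenacity's stock API; the `stop` and `reraise` arguments are my guesses, not visible in this hunk:

import tenacity

def wait_for_operation(operation_request, test_success_fn,
                       timeout_sec=300, wait_sec=2):
    retryer = tenacity.Retrying(
        # Retry while the operation has not succeeded yet, and also
        # when the poll itself raises any exception.
        retry=(tenacity.retry_if_not_result(test_success_fn) |
               tenacity.retry_if_exception_type()),
        wait=tenacity.wait_fixed(wait_sec),
        stop=tenacity.stop_after_delay(timeout_sec),  # assumption
        reraise=True)  # assumption
    # Each attempt re-executes the same operations.get() request.
    return retryer(operation_request.execute)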

@@ -26,7 +26,7 @@ logger = logging.getLogger(__name__)
class ComputeV1(gcp.api.GcpProjectApiResource):
- # todo(sergiitk): move someplace better
+ # TODO(sergiitk): move someplace better
_WAIT_FOR_BACKEND_SEC = 1200
_WAIT_FOR_OPERATION_SEC = 1200
_GCP_API_RETRIES = 5
@@ -216,7 +216,7 @@ class ComputeV1(gcp.api.GcpProjectApiResource):
return neg
network_endpoint_group = _wait_for_network_endpoint_group_ready()
- # @todo(sergiitk): dataclass
+ # TODO(sergiitk): dataclass
return self.ZonalGcpResource(network_endpoint_group['name'],
network_endpoint_group['selfLink'], zone)
@@ -224,7 +224,7 @@ class ComputeV1(gcp.api.GcpProjectApiResource):
neg = self.api.networkEndpointGroups().get(project=self.project,
networkEndpointGroup=name,
zone=zone).execute()
- # @todo(sergiitk): dataclass
+ # TODO(sergiitk): dataclass
return neg
def wait_for_backends_healthy_status(
@@ -314,7 +314,7 @@ class ComputeV1(gcp.api.GcpProjectApiResource):
operation = request.execute(num_retries=self._GCP_API_RETRIES)
logger.debug('Response %s', operation)
- # todo(sergiitk) try using wait() here
+ # TODO(sergiitk) try using wait() here
# https://googleapis.github.io/google-api-python-client/docs/dyn/compute_v1.globalOperations.html#wait
operation_request = self.api.globalOperations().get(
project=self.project, operation=operation['name'])
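The linked `globalOperations.wait()` method blocks server-side until the operation completes (or roughly two minutes elapse), replacing client-side polling. A hedged sketch of what acting on the TODO could look like, using only methods documented in the linked compute v1 reference:

# Hypothetical replacement for the get()-and-retry loop. Note wait()
# can still return before the operation is DONE, so a thin outer
# retry may remain necessary for long-running operations.
operation_request = self.api.globalOperations().wait(
    project=self.project, operation=operation['name'])
result = operation_request.execute(num_retries=self._GCP_API_RETRIES)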

@@ -48,7 +48,7 @@ class NetworkSecurityV1Alpha1(gcp.api.GcpStandardCloudApiResource):
def __init__(self, api_manager: gcp.api.GcpApiManager, project: str):
super().__init__(api_manager.networksecurity(self.API_VERSION), project)
- # Shortcut
+ # Shortcut to projects/*/locations/ endpoints
self._api_locations = self.api.projects().locations()
def create_server_tls_policy(self, name, body: dict):
@@ -99,6 +99,7 @@ class NetworkSecurityV1Alpha1(gcp.api.GcpStandardCloudApiResource):
def _execute(self, *args, **kwargs):
# Workaround TD bug: throttled operations are reported as internal.
+ # Ref b/175345578
retryer = tenacity.Retrying(
retry=tenacity.retry_if_exception(self._operation_internal_error),
wait=tenacity.wait_fixed(10),
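`tenacity.retry_if_exception` takes a predicate over the raised exception; that is how `_operation_internal_error` (defined outside this diff) can single out the throttled-reported-as-internal case referenced by b/175345578. A standalone sketch with a hypothetical predicate:

import tenacity

def _operation_internal_error(exception: Exception) -> bool:
    # Hypothetical check; the real predicate inspects the failed
    # operation's error details. 13 is the INTERNAL status code.
    return 'code: 13' in str(exception)

retryer = tenacity.Retrying(
    retry=tenacity.retry_if_exception(_operation_internal_error),
    wait=tenacity.wait_fixed(10),
    stop=tenacity.stop_after_attempt(5),  # assumption: bound the retries
    reraise=True)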

@@ -43,7 +43,7 @@ class NetworkServicesV1Alpha1(gcp.api.GcpStandardCloudApiResource):
def __init__(self, api_manager: gcp.api.GcpApiManager, project: str):
super().__init__(api_manager.networkservices(self.API_VERSION), project)
- # Shortcut
+ # Shortcut to projects/*/locations/ endpoints
self._api_locations = self.api.projects().locations()
def create_endpoint_config_selector(self, name, body: dict):
@@ -76,6 +76,7 @@ class NetworkServicesV1Alpha1(gcp.api.GcpStandardCloudApiResource):
def _execute(self, *args, **kwargs):
# Workaround TD bug: throttled operations are reported as internal.
+ # Ref b/175345578
retryer = tenacity.Retrying(
retry=tenacity.retry_if_exception(self._operation_internal_error),
wait=tenacity.wait_fixed(10),

@@ -314,7 +314,7 @@ class KubernetesNamespace:
self.port_forward_stop(pf)
raise
- # todo(sergiitk): return new PortForwarder object
+ # TODO(sergiitk): return new PortForwarder object
return pf
@staticmethod
@@ -323,7 +323,7 @@ class KubernetesNamespace:
pf.kill()
stdout, _stderr = pf.communicate(timeout=5)
logger.info('Port forwarding stopped')
- # todo(sergiitk): make debug
+ # TODO(sergiitk): make debug
logger.info('Port forwarding remaining stdout: %s', stdout)
@staticmethod

@@ -66,7 +66,7 @@ class TrafficDirectorManager:
self.backend_service: Optional[GcpResource] = None
self.url_map: Optional[GcpResource] = None
self.target_proxy: Optional[GcpResource] = None
- # todo(sergiitk): fix
+ # TODO(sergiitk): fix
self.target_proxy_is_http: bool = False
self.forwarding_rule: Optional[GcpResource] = None
self.backends: Set[ZonalGcpResource] = set()
@@ -208,7 +208,7 @@ class TrafficDirectorManager:
self.url_map = None
def create_target_grpc_proxy(self):
- # todo: different kinds
+ # TODO(sergiitk): merge with create_target_http_proxy()
name = self._ns_name(self.TARGET_PROXY_NAME)
logger.info('Creating target GRPC proxy %s to url map %s', name,
self.url_map.name)
@@ -228,7 +228,7 @@ class TrafficDirectorManager:
self.target_proxy_is_http = False
def create_target_http_proxy(self):
- # todo: different kinds
+ # TODO(sergiitk): merge with create_target_grpc_proxy()
name = self._ns_name(self.TARGET_PROXY_NAME)
logger.info('Creating target HTTP proxy %s to url map %s', name,
self.url_map.name)
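The paired TODOs point at two near-identical methods. One shape the suggested merge could take, sketched here as a hypothesis (the `create_fn` helper names on the compute wrapper are assumed, not shown in this diff):

def create_target_proxy(self, *, is_http: bool):
    # Hypothetical merge of create_target_grpc_proxy() and
    # create_target_http_proxy(); also centralizes the
    # target_proxy_is_http bookkeeping the cleanup TODO complains about.
    name = self._ns_name(self.TARGET_PROXY_NAME)
    kind = 'HTTP' if is_http else 'GRPC'
    logger.info('Creating target %s proxy %s to url map %s', kind, name,
                self.url_map.name)
    create_fn = (self.compute.create_target_http_proxy if is_http
                 else self.compute.create_target_grpc_proxy)  # assumed names
    self.target_proxy = create_fn(name, self.url_map)
    self.target_proxy_is_http = is_http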
@@ -333,7 +333,8 @@ class TrafficDirectorSecureManager(TrafficDirectorManager):
def cleanup(self, *, force=False):
# Cleanup in the reverse order of creation
- # todo(sergiitk): todo: fix
+ # TODO(sergiitk): remove next line once proxy deletion is not dependent
+ # upon proxy type.
self.target_proxy_is_http = True
super().cleanup(force=force)
self.delete_endpoint_config_selector(force=force)
@@ -377,14 +378,11 @@ class TrafficDirectorSecureManager(TrafficDirectorManager):
server_port):
name = self._ns_name(self.ENDPOINT_CONFIG_SELECTOR_NAME)
logger.info('Creating Endpoint Config Selector %s', name)
- # todo(sergiitk): user server config value
endpoint_matcher_labels = [{
"labelName": "app",
"labelValue": f"{server_namespace}-{server_name}"
}]
port_selector = {"ports": [str(server_port)]}
label_matcher_all = {
"metadataLabelMatchCriteria": "MATCH_ALL",
"metadataLabels": endpoint_matcher_labels

@@ -15,6 +15,8 @@ import logging
import re
from typing import Optional, ClassVar, Dict
+ # Workaround: `grpc` must be imported before `google.protobuf.json_format`,
+ # to prevent "Segmentation fault". Ref https://github.com/grpc/grpc/issues/24897
import grpc
from google.protobuf import json_format
import google.protobuf.message

@@ -64,7 +64,6 @@ class KubernetesBaseRunner:
@staticmethod
def _manifests_from_yaml_file(yaml_file):
- # Parse yaml
with open(yaml_file) as f:
with contextlib.closing(yaml.safe_load_all(f)) as yml:
for manifest in yml:
@@ -94,27 +93,23 @@ class KubernetesBaseRunner:
if next(manifests, False):
raise RunnerError('Exactly one document expected in manifest '
f'{template_file}')
- # Apply the manifest
k8s_objects = self.k8s_namespace.apply_manifest(manifest)
- # Check correctness
if len(k8s_objects) != 1:
raise RunnerError('Expected exactly one object must created from '
f'manifest {template_file}')
logger.info('%s %s created', k8s_objects[0].kind,
k8s_objects[0].metadata.name)
return k8s_objects[0]
def _reuse_deployment(self, deployment_name) -> k8s.V1Deployment:
deployment = self.k8s_namespace.get_deployment(deployment_name)
- # todo(sergiitk): check if good or must be recreated
+ # TODO(sergiitk): check if good or must be recreated
return deployment
def _reuse_service(self, service_name) -> k8s.V1Service:
service = self.k8s_namespace.get_service(service_name)
- # todo(sergiitk): check if good or must be recreated
+ # TODO(sergiitk): check if good or must be recreated
return service
def _reuse_namespace(self) -> k8s.V1Namespace:
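Taken together, the two base_runner hunks describe a load-validate-apply pipeline: parse the YAML template, insist on exactly one document, apply it, insist on exactly one resulting object. The parsing half, as a self-contained sketch with PyYAML (mirrors the diff; only `load_exactly_one_manifest` is a name of my invention):

import contextlib
import yaml

def _manifests_from_yaml_file(yaml_file):
    with open(yaml_file) as f:
        # safe_load_all returns a generator; closing() finalizes it
        # even if the caller stops iterating early.
        with contextlib.closing(yaml.safe_load_all(f)) as yml:
            for manifest in yml:
                yield manifest

def load_exactly_one_manifest(yaml_file):
    manifests = _manifests_from_yaml_file(yaml_file)
    manifest = next(manifests)
    if next(manifests, False):
        raise ValueError(f'Exactly one document expected in {yaml_file}')
    return manifest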

@@ -153,7 +153,7 @@ class KubernetesClientRunner(base_runner.KubernetesBaseRunner):
secure_mode=False,
print_response=False) -> XdsTestClient:
super().run()
- # todo(sergiitk): make rpc UnaryCall enum or get it from proto
+ # TODO(sergiitk): make rpc UnaryCall enum or get it from proto
# Create service account
self.service_account = self._create_service_account(

@@ -151,7 +151,7 @@ class KubernetesServerRunner(base_runner.KubernetesBaseRunner):
secure_mode=False,
server_id=None,
replica_count=1) -> XdsTestServer:
- # todo(sergiitk): multiple replicas
+ # TODO(sergiitk): multiple replicas
if replica_count != 1:
raise NotImplementedError("Multiple replicas not yet supported")
@@ -217,7 +217,7 @@ class KubernetesServerRunner(base_runner.KubernetesBaseRunner):
for pod in pods:
self._wait_pod_started(pod.metadata.name)
- # todo(sergiitk): This is why multiple replicas not yet supported
+ # TODO(sergiitk): This is why multiple replicas not yet supported
pod = pods[0]
pod_ip = pod.status.pod_ip
rpc_host = None

@@ -53,7 +53,7 @@ class XdsKubernetesTestCase(absltest.TestCase):
cls.td_bootstrap_image = xds_k8s_flags.TD_BOOTSTRAP_IMAGE.value
# Base namespace
- # todo(sergiitk): generate for each test
+ # TODO(sergiitk): generate for each test
cls.namespace: str = xds_flags.NAMESPACE.value
# Test server
@@ -76,7 +76,7 @@ class XdsKubernetesTestCase(absltest.TestCase):
cls.gcp_api_manager = gcp.api.GcpApiManager()
def setUp(self):
- # todo(sergiitk): generate for each test
+ # TODO(sergiitk): generate for each test
self.server_namespace = self.namespace
self.client_namespace = self.namespace
@@ -84,7 +84,7 @@ class XdsKubernetesTestCase(absltest.TestCase):
self.server_runner = None
self.client_runner = None
self.td = None
- # todo(sergiitk): generate namespace with run id
+ # TODO(sergiitk): generate namespace with run id
@classmethod
def tearDownClass(cls):
@@ -118,7 +118,7 @@ class XdsKubernetesTestCase(absltest.TestCase):
self.assertFailedRpcsAtMost(lb_stats, 0)
def assertAllBackendsReceivedRpcs(self, lb_stats):
- # todo(sergiitk): assert backends length
+ # TODO(sergiitk): assert backends length
logger.info(lb_stats.rpcs_by_peer)
for backend, rpcs_count in lb_stats.rpcs_by_peer.items():
self.assertGreater(
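The length assertion the TODO asks for could be a one-liner, assuming absltest's `assertLen` and that `self.td.backends` holds the expected set (both names appear in earlier hunks; pairing them this way is my assumption):

# Hypothetical: every configured backend shows up in the LB stats.
self.assertLen(lb_stats.rpcs_by_peer, len(self.td.backends))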
