xds-k8s driver: Improve INFO-level logging

Ref: pull/25063/head
Author: Sergii Tkachenko, 4 years ago
Parent: 8934661278
Commit: 3b66fb6ead
  1. tools/run_tests/xds_k8s_test_driver/config/common.cfg (2)
  2. tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/api.py (58)
  3. tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/compute.py (45)
  4. tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/network_security.py (14)
  5. tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/network_services.py (15)
  6. tools/run_tests/xds_k8s_test_driver/framework/infrastructure/k8s.py (36)
  7. tools/run_tests/xds_k8s_test_driver/framework/infrastructure/traffic_director.py (34)
  8. tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc.py (15)
  9. tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc_testing.py (10)
  10. tools/run_tests/xds_k8s_test_driver/framework/test_app/base_runner.py (60)
  11. tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py (21)
  12. tools/run_tests/xds_k8s_test_driver/framework/test_app/server_app.py (4)
  13. tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py (17)
  14. tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/client-secure.deployment.yaml (2)
  15. tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/client.deployment.yaml (2)
  16. tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/namespace.yaml (2)
  17. tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/server-secure.deployment.yaml (2)
  18. tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/server.deployment.yaml (2)
  19. tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/server.service.yaml (2)
  20. tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/service-account.yaml (2)

@ -1,4 +1,4 @@
--namespace=interop-psm-security
--td_bootstrap_image=gcr.io/trafficdirector-prod/td-grpc-bootstrap:0.10.0
--logger_levels=__main__:DEBUG,framework:DEBUG
--logger_levels=__main__:DEBUG,framework:INFO
--verbosity=0
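
The config change above scopes DEBUG output to the test module itself, while the framework package now logs at INFO. A minimal sketch of how such a comma-separated "logger:LEVEL" spec maps onto the standard logging module (the parsing helper is illustrative, not the driver's actual flag handler):

    import logging

    def apply_logger_levels(spec: str) -> None:
        # Each item is 'logger_name:LEVEL', e.g. '__main__:DEBUG'.
        for item in spec.split(','):
            logger_name, _, level_name = item.partition(':')
            logging.getLogger(logger_name).setLevel(level_name)

    apply_logger_levels('__main__:DEBUG,framework:INFO')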

@ -11,15 +11,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import contextlib
import functools
import logging
import os
from typing import Optional
# Workaround: `grpc` must be imported before `google.protobuf.json_format`,
# to prevent "Segmentation fault". Ref https://github.com/grpc/grpc/issues/24897
# TODO(sergiitk): Remove after #24897 is solved
import grpc
import grpc # noqa # pylint: disable=unused-import
from absl import flags
from google.cloud import secretmanager_v1
from google.longrunning import operations_pb2
@ -28,6 +29,7 @@ from google.rpc import code_pb2
from googleapiclient import discovery
import googleapiclient.errors
import tenacity
import yaml
logger = logging.getLogger(__name__)
PRIVATE_API_KEY_SECRET_NAME = flags.DEFINE_string(
@ -64,6 +66,7 @@ class GcpApiManager:
COMPUTE_V1_DISCOVERY_FILE.value)
self.private_api_key_secret_name = (private_api_key_secret_name or
PRIVATE_API_KEY_SECRET_NAME.value)
# TODO(sergiitk): add options to pass google Credentials
self._exit_stack = contextlib.ExitStack()
def close(self):
@ -208,13 +211,18 @@ class GcpProjectApiResource:
reraise=True)
return retryer(operation_request.execute)
@staticmethod
def _resource_pretty_format(body: dict) -> str:
"""Return a string with pretty-printed resource body."""
return yaml.dump(body, explicit_start=True, explicit_end=True)
class GcpStandardCloudApiResource(GcpProjectApiResource):
DEFAULT_GLOBAL = 'global'
class GcpStandardCloudApiResource(GcpProjectApiResource, metaclass=abc.ABCMeta):
GLOBAL_LOCATION = 'global'
def parent(self, location=None):
if not location:
location = self.DEFAULT_GLOBAL
def parent(self, location: Optional[str] = GLOBAL_LOCATION):
if location is None:
location = self.GLOBAL_LOCATION
return f'projects/{self.project}/locations/{location}'
def resource_full_name(self, name, collection_name):
@ -222,27 +230,41 @@ class GcpStandardCloudApiResource(GcpProjectApiResource):
def _create_resource(self, collection: discovery.Resource, body: dict,
**kwargs):
logger.debug("Creating %s", body)
logger.info("Creating %s resource:\n%s", self.api_name,
self._resource_pretty_format(body))
create_req = collection.create(parent=self.parent(),
body=body,
**kwargs)
self._execute(create_req)
@staticmethod
def _get_resource(collection: discovery.Resource, full_name):
@property
@abc.abstractmethod
def api_name(self) -> str:
raise NotImplementedError
@property
@abc.abstractmethod
def api_version(self) -> str:
raise NotImplementedError
def _get_resource(self, collection: discovery.Resource, full_name):
resource = collection.get(name=full_name).execute()
logger.debug("Loaded %r", resource)
logger.info('Loaded %s:\n%s', full_name,
self._resource_pretty_format(resource))
return resource
def _delete_resource(self, collection: discovery.Resource, full_name: str):
def _delete_resource(self, collection: discovery.Resource,
full_name: str) -> bool:
logger.debug("Deleting %s", full_name)
try:
self._execute(collection.delete(name=full_name))
return True
except googleapiclient.errors.HttpError as error:
# noinspection PyProtectedMember
reason = error._get_reason()
logger.info('Delete failed. Error: %s %s', error.resp.status,
reason)
if error.resp and error.resp.status == 404:
logger.info('%s not deleted since it does not exist', full_name)
else:
logger.warning('Failed to delete %s, %r', full_name, error)
return False
def _execute(self,
request,
@ -255,7 +277,7 @@ class GcpStandardCloudApiResource(GcpProjectApiResource):
timeout_sec=GcpProjectApiResource._WAIT_FOR_OPERATION_SEC):
op_name = operation['name']
logger.debug('Waiting for %s operation, timeout %s sec: %s',
self.__class__.__name__, timeout_sec, op_name)
self.api_name, timeout_sec, op_name)
op_request = self.api.projects().locations().operations().get(
name=op_name)
@ -266,4 +288,4 @@ class GcpStandardCloudApiResource(GcpProjectApiResource):
logger.debug('Completed operation: %s', operation)
if 'error' in operation:
raise OperationError(self.__class__.__name__, operation)
raise OperationError(self.api_name, operation)
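
The new _resource_pretty_format helper renders resource bodies as standalone YAML documents, so multi-line dumps stand out in INFO logs. The same call in isolation (the sample body is hypothetical):

    import yaml

    body = {'name': 'psm-health-check', 'protocol': 'TCP'}
    # explicit_start/explicit_end wrap the dump in '---' and '...' markers.
    print(yaml.dump(body, explicit_start=True, explicit_end=True))
    # ---
    # name: psm-health-check
    # protocol: TCP
    # ...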

@ -30,7 +30,6 @@ class ComputeV1(gcp.api.GcpProjectApiResource):
# TODO(sergiitk): move someplace better
_WAIT_FOR_BACKEND_SEC = 1200
_WAIT_FOR_OPERATION_SEC = 1200
_GCP_API_RETRIES = 5
@dataclasses.dataclass(frozen=True)
class GcpResource:
@ -64,7 +63,7 @@ class ComputeV1(gcp.api.GcpProjectApiResource):
})
def delete_health_check(self, name):
self._delete_resource(self.api.healthChecks(), healthCheck=name)
self._delete_resource(self.api.healthChecks(), 'healthCheck', name)
def create_backend_service_traffic_director(
self,
@ -110,7 +109,8 @@ class ComputeV1(gcp.api.GcpProjectApiResource):
backendService=backend_service.name)
def delete_backend_service(self, name):
self._delete_resource(self.api.backendServices(), backendService=name)
self._delete_resource(self.api.backendServices(), 'backendService',
name)
def create_url_map(
self,
@ -139,7 +139,7 @@ class ComputeV1(gcp.api.GcpProjectApiResource):
})
def delete_url_map(self, name):
self._delete_resource(self.api.urlMaps(), urlMap=name)
self._delete_resource(self.api.urlMaps(), 'urlMap', name)
def create_target_grpc_proxy(
self,
@ -153,8 +153,8 @@ class ComputeV1(gcp.api.GcpProjectApiResource):
})
def delete_target_grpc_proxy(self, name):
self._delete_resource(self.api.targetGrpcProxies(),
targetGrpcProxy=name)
self._delete_resource(self.api.targetGrpcProxies(), 'targetGrpcProxy',
name)
def create_target_http_proxy(
self,
@ -167,8 +167,8 @@ class ComputeV1(gcp.api.GcpProjectApiResource):
})
def delete_target_http_proxy(self, name):
self._delete_resource(self.api.targetHttpProxies(),
targetHttpProxy=name)
self._delete_resource(self.api.targetHttpProxies(), 'targetHttpProxy',
name)
def create_forwarding_rule(
self,
@ -191,7 +191,7 @@ class ComputeV1(gcp.api.GcpProjectApiResource):
def delete_forwarding_rule(self, name):
self._delete_resource(self.api.globalForwardingRules(),
forwardingRule=name)
'forwardingRule', name)
@staticmethod
def _network_endpoint_group_not_ready(neg):
@ -279,29 +279,38 @@ class ComputeV1(gcp.api.GcpProjectApiResource):
def _get_resource(self, collection: discovery.Resource,
**kwargs) -> GcpResource:
resp = collection.get(project=self.project, **kwargs).execute()
logger.debug("Loaded %r", resp)
logger.info('Loaded compute resource:\n%s',
self._resource_pretty_format(resp))
return self.GcpResource(resp['name'], resp['selfLink'])
def _insert_resource(self, collection: discovery.Resource,
body: Dict[str, Any]) -> GcpResource:
logger.debug("Creating %s", body)
logger.info('Creating compute resource:\n%s',
self._resource_pretty_format(body))
resp = self._execute(collection.insert(project=self.project, body=body))
return self.GcpResource(body['name'], resp['targetLink'])
def _patch_resource(self, collection, body, **kwargs):
logger.debug("Patching %s", body)
logger.info('Patching compute resource:\n%s',
self._resource_pretty_format(body))
self._execute(
collection.patch(project=self.project, body=body, **kwargs))
def _delete_resource(self, collection, **kwargs):
def _delete_resource(self, collection: discovery.Resource,
resource_type: str, resource_name: str) -> bool:
try:
self._execute(collection.delete(project=self.project, **kwargs))
params = {"project": self.project, resource_type: resource_name}
self._execute(collection.delete(**params))
return True
except googleapiclient.errors.HttpError as error:
# noinspection PyProtectedMember
reason = error._get_reason()
logger.info('Delete failed. Error: %s %s', error.resp.status,
reason)
if error.resp and error.resp.status == 404:
logger.info(
'Resource %s "%s" not deleted since it does not exist',
resource_type, resource_name)
else:
logger.warning('Failed to delete %s "%s", %r', resource_type,
resource_name, error)
return False
@staticmethod
def _operation_status_done(operation):
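
Compute collections name their delete parameter after the resource type (healthCheck=, urlMap=, targetGrpcProxy=, ...), which is why the reworked _delete_resource builds the keyword argument dynamically instead of taking **kwargs. The pattern in isolation, with hypothetical names:

    def delete_params(project: str, resource_type: str, resource_name: str) -> dict:
        # The resource type string becomes the keyword argument name,
        # e.g. {'project': 'my-project', 'urlMap': 'psm-url-map'}.
        return {'project': project, resource_type: resource_name}

    kwargs = delete_params('my-project', 'urlMap', 'psm-url-map')
    # collection.delete(**kwargs) is then equivalent to
    # collection.delete(project='my-project', urlMap='psm-url-map')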

@ -23,8 +23,6 @@ logger = logging.getLogger(__name__)
class NetworkSecurityV1Alpha1(gcp.api.GcpStandardCloudApiResource):
API_NAME = 'networksecurity'
API_VERSION = 'v1alpha1'
SERVER_TLS_POLICIES = 'serverTlsPolicies'
CLIENT_TLS_POLICIES = 'clientTlsPolicies'
@ -47,10 +45,18 @@ class NetworkSecurityV1Alpha1(gcp.api.GcpStandardCloudApiResource):
create_time: str
def __init__(self, api_manager: gcp.api.GcpApiManager, project: str):
super().__init__(api_manager.networksecurity(self.API_VERSION), project)
super().__init__(api_manager.networksecurity(self.api_version), project)
# Shortcut to projects/*/locations/ endpoints
self._api_locations = self.api.projects().locations()
@property
def api_name(self) -> str:
return 'networksecurity'
@property
def api_version(self) -> str:
return 'v1alpha1'
def create_server_tls_policy(self, name, body: dict):
return self._create_resource(self._api_locations.serverTlsPolicies(),
body,
@ -97,7 +103,7 @@ class NetworkSecurityV1Alpha1(gcp.api.GcpStandardCloudApiResource):
collection=self._api_locations.clientTlsPolicies(),
full_name=self.resource_full_name(name, self.CLIENT_TLS_POLICIES))
def _execute(self, *args, **kwargs):
def _execute(self, *args, **kwargs): # pylint: disable=signature-differs
# Workaround TD bug: throttled operations are reported as internal.
# Ref b/175345578
retryer = tenacity.Retrying(
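
GcpStandardCloudApiResource is now an abstract base class, and subclasses such as NetworkSecurityV1Alpha1 supply api_name/api_version as read-only properties instead of class constants. The contract, reduced to a sketch:

    import abc

    class CloudApiResource(abc.ABC):

        @property
        @abc.abstractmethod
        def api_name(self) -> str:
            raise NotImplementedError

    class NetworkSecurity(CloudApiResource):

        @property
        def api_name(self) -> str:
            return 'networksecurity'

    # CloudApiResource() raises TypeError; the concrete subclass works.
    print(NetworkSecurity().api_name)  # networksecurity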

@ -24,9 +24,6 @@ logger = logging.getLogger(__name__)
class NetworkServicesV1Alpha1(gcp.api.GcpStandardCloudApiResource):
API_NAME = 'networkservices'
API_VERSION = 'v1alpha1'
DEFAULT_GLOBAL = 'global'
ENDPOINT_CONFIG_SELECTORS = 'endpointConfigSelectors'
@dataclasses.dataclass(frozen=True)
@ -42,10 +39,18 @@ class NetworkServicesV1Alpha1(gcp.api.GcpStandardCloudApiResource):
create_time: str
def __init__(self, api_manager: gcp.api.GcpApiManager, project: str):
super().__init__(api_manager.networkservices(self.API_VERSION), project)
super().__init__(api_manager.networkservices(self.api_version), project)
# Shortcut to projects/*/locations/ endpoints
self._api_locations = self.api.projects().locations()
@property
def api_name(self) -> str:
return 'networkservices'
@property
def api_version(self) -> str:
return 'v1alpha1'
def create_endpoint_config_selector(self, name, body: dict):
return self._create_resource(
self._api_locations.endpointConfigSelectors(),
@ -74,7 +79,7 @@ class NetworkServicesV1Alpha1(gcp.api.GcpStandardCloudApiResource):
full_name=self.resource_full_name(name,
self.ENDPOINT_CONFIG_SELECTORS))
def _execute(self, *args, **kwargs):
def _execute(self, *args, **kwargs): # pylint: disable=signature-differs
# Workaround TD bug: throttled operations are reported as internal.
# Ref b/175345578
retryer = tenacity.Retrying(

@ -67,7 +67,11 @@ class KubernetesApiManager:
@classmethod
@functools.lru_cache(None)
def _cached_api_client_for_context(cls, context: str) -> client.ApiClient:
return kubernetes.config.new_client_from_config(context=context)
client_instance = kubernetes.config.new_client_from_config(
context=context)
logger.info('Using kubernetes context "%s", active host: %s', context,
client_instance.configuration.host)
return client_instance
class PortForwardingError(Exception):
@ -134,8 +138,8 @@ class KubernetesNamespace:
def _wait_for_deleted_service_with_retry():
service = self.get_service(name)
if service is not None:
logger.info('Waiting for service %s to be deleted',
service.metadata.name)
logger.debug('Waiting for service %s to be deleted',
service.metadata.name)
return service
_wait_for_deleted_service_with_retry()
@ -151,13 +155,13 @@ class KubernetesNamespace:
def _wait_for_deleted_service_account_with_retry():
service_account = self.get_service_account(name)
if service_account is not None:
logger.info('Waiting for service account %s to be deleted',
service_account.metadata.name)
logger.debug('Waiting for service account %s to be deleted',
service_account.metadata.name)
return service_account
_wait_for_deleted_service_account_with_retry()
def wait_for_namespace_deleted(self, timeout_sec=240, wait_sec=2):
def wait_for_namespace_deleted(self, timeout_sec=240, wait_sec=5):
@retrying.retry(retry_on_result=lambda r: r is not None,
stop_max_delay=timeout_sec * 1000,
@ -165,8 +169,8 @@ class KubernetesNamespace:
def _wait_for_deleted_namespace_with_retry():
namespace = self.get()
if namespace is not None:
logger.info('Waiting for namespace %s to be deleted',
namespace.metadata.name)
logger.debug('Waiting for namespace %s to be deleted',
namespace.metadata.name)
return namespace
_wait_for_deleted_namespace_with_retry()
@ -179,7 +183,8 @@ class KubernetesNamespace:
def _wait_for_service_neg():
service = self.get_service(name)
if self.NEG_STATUS_META not in service.metadata.annotations:
logger.info('Waiting for service %s NEG', service.metadata.name)
logger.debug('Waiting for service %s NEG',
service.metadata.name)
return False
return True
@ -216,7 +221,7 @@ class KubernetesNamespace:
name,
count=1,
timeout_sec=60,
wait_sec=1):
wait_sec=3):
@retrying.retry(
retry_on_result=lambda r: not self._replicas_available(r, count),
@ -224,7 +229,7 @@ class KubernetesNamespace:
wait_fixed=wait_sec * 1000)
def _wait_for_deployment_available_replicas():
deployment = self.get_deployment(name)
logger.info(
logger.debug(
'Waiting for deployment %s to have %s available '
'replicas, current count %s', deployment.metadata.name, count,
deployment.status.available_replicas)
@ -243,7 +248,7 @@ class KubernetesNamespace:
def _wait_for_deleted_deployment_with_retry():
deployment = self.get_deployment(deployment_name)
if deployment is not None:
logger.info(
logger.debug(
'Waiting for deployment %s to be deleted. '
'Non-terminated replicas: %s', deployment.metadata.name,
deployment.status.replicas)
@ -266,8 +271,8 @@ class KubernetesNamespace:
wait_fixed=wait_sec * 1000)
def _wait_for_pod_started():
pod = self.get_pod(pod_name)
logger.info('Waiting for pod %s to start, current phase: %s',
pod.metadata.name, pod.status.phase)
logger.debug('Waiting for pod %s to start, current phase: %s',
pod.metadata.name, pod.status.phase)
return pod
_wait_for_pod_started()
@ -324,8 +329,7 @@ class KubernetesNamespace:
pf.kill()
stdout, _stderr = pf.communicate(timeout=5)
logger.info('Port forwarding stopped')
# TODO(sergiitk): make debug
logger.info('Port forwarding remaining stdout: %s', stdout)
logger.debug('Port forwarding remaining stdout: %s', stdout)
@staticmethod
def _pod_started(pod: V1Pod):
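
Alongside the info-to-debug demotions, the k8s.py waiters above get gentler poll intervals (namespace deletion 2s to 5s, deployment replicas 1s to 3s). They are built on the retrying package, which retries while a predicate on the result holds. The decorator pattern with a stand-in probe:

    import retrying

    def get_namespace():
        # Stand-in for the Kubernetes read; None means deletion completed.
        return None

    @retrying.retry(retry_on_result=lambda r: r is not None,
                    stop_max_delay=240 * 1000,  # overall timeout, in milliseconds
                    wait_fixed=5 * 1000)        # poll every 5 seconds
    def wait_for_namespace_deleted():
        return get_namespace()

    wait_for_namespace_deleted()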

@ -66,7 +66,7 @@ class TrafficDirectorManager:
self.backend_service: Optional[GcpResource] = None
self.url_map: Optional[GcpResource] = None
self.target_proxy: Optional[GcpResource] = None
# TODO(sergiitk): fix
# TODO(sergiitk): remove this flag once target proxy resource loaded
self.target_proxy_is_http: bool = False
self.forwarding_rule: Optional[GcpResource] = None
self.backends: Set[ZonalGcpResource] = set()
@ -108,7 +108,7 @@ class TrafficDirectorManager:
raise ValueError('Health check %s already created, delete it first',
self.health_check.name)
name = self._ns_name(self.HEALTH_CHECK_NAME)
logger.info('Creating %s Health Check %s', protocol.name, name)
logger.info('Creating %s Health Check "%s"', protocol.name, name)
if protocol is HealthCheckProtocol.TCP:
resource = self.compute.create_health_check_tcp(
name, use_serving_port=True)
@ -123,7 +123,7 @@ class TrafficDirectorManager:
name = self.health_check.name
else:
return
logger.info('Deleting Health Check %s', name)
logger.info('Deleting Health Check "%s"', name)
self.compute.delete_health_check(name)
self.health_check = None
@ -131,7 +131,7 @@ class TrafficDirectorManager:
self,
protocol: BackendServiceProtocol = BackendServiceProtocol.GRPC):
name = self._ns_name(self.BACKEND_SERVICE_NAME)
logger.info('Creating %s Backend Service %s', protocol.name, name)
logger.info('Creating %s Backend Service "%s"', protocol.name, name)
resource = self.compute.create_backend_service_traffic_director(
name, health_check=self.health_check, protocol=protocol)
self.backend_service = resource
@ -148,15 +148,16 @@ class TrafficDirectorManager:
name = self.backend_service.name
else:
return
logger.info('Deleting Backend Service %s', name)
logger.info('Deleting Backend Service "%s"', name)
self.compute.delete_backend_service(name)
self.backend_service = None
def backend_service_add_neg_backends(self, name, zones):
logger.info('Loading NEGs')
logger.info('Waiting for Network Endpoint Groups recognize endpoints.')
for zone in zones:
backend = self.compute.wait_for_network_endpoint_group(name, zone)
logger.info('Loaded NEG %s in zone %s', backend.name, backend.zone)
logger.info('Loaded NEG "%s" in zone %s', backend.name,
backend.zone)
self.backends.add(backend)
self.backend_service_add_backends()
@ -188,7 +189,7 @@ class TrafficDirectorManager:
src_address = f'{src_host}:{src_port}'
name = self._ns_name(self.URL_MAP_NAME)
matcher_name = self._ns_name(self.URL_MAP_PATH_MATCHER_NAME)
logger.info('Creating URL map %s %s -> %s', name, src_address,
logger.info('Creating URL map "%s": %s -> %s', name, src_address,
self.backend_service.name)
resource = self.compute.create_url_map(name, matcher_name,
[src_address],
@ -203,14 +204,14 @@ class TrafficDirectorManager:
name = self.url_map.name
else:
return
logger.info('Deleting URL Map %s', name)
logger.info('Deleting URL Map "%s"', name)
self.compute.delete_url_map(name)
self.url_map = None
def create_target_grpc_proxy(self):
# TODO(sergiitk): merge with create_target_http_proxy()
name = self._ns_name(self.TARGET_PROXY_NAME)
logger.info('Creating target GRPC proxy %s to url map %s', name,
logger.info('Creating target GRPC proxy "%s" to URL map %s', name,
self.url_map.name)
resource = self.compute.create_target_grpc_proxy(name, self.url_map)
self.target_proxy = resource
@ -222,7 +223,7 @@ class TrafficDirectorManager:
name = self.target_proxy.name
else:
return
logger.info('Deleting Target GRPC proxy %s', name)
logger.info('Deleting Target GRPC proxy "%s"', name)
self.compute.delete_target_grpc_proxy(name)
self.target_proxy = None
self.target_proxy_is_http = False
@ -230,7 +231,7 @@ class TrafficDirectorManager:
def create_target_http_proxy(self):
# TODO(sergiitk): merge with create_target_grpc_proxy()
name = self._ns_name(self.TARGET_PROXY_NAME)
logger.info('Creating target HTTP proxy %s to url map %s', name,
logger.info('Creating target HTTP proxy "%s" to url map %s', name,
self.url_map.name)
resource = self.compute.create_target_http_proxy(name, self.url_map)
self.target_proxy = resource
@ -243,7 +244,7 @@ class TrafficDirectorManager:
name = self.target_proxy.name
else:
return
logger.info('Deleting HTTP Target proxy %s', name)
logger.info('Deleting HTTP Target proxy "%s"', name)
self.compute.delete_target_http_proxy(name)
self.target_proxy = None
self.target_proxy_is_http = False
@ -251,8 +252,9 @@ class TrafficDirectorManager:
def create_forwarding_rule(self, src_port: int):
name = self._ns_name(self.FORWARDING_RULE_NAME)
src_port = int(src_port)
logging.info('Creating forwarding rule %s 0.0.0.0:%s -> %s in %s', name,
src_port, self.target_proxy.url, self.network)
logging.info(
'Creating forwarding rule "%s" in network "%s": 0.0.0.0:%s -> %s',
name, self.network, src_port, self.target_proxy.url)
resource = self.compute.create_forwarding_rule(name, src_port,
self.target_proxy,
self.network_url)
@ -266,7 +268,7 @@ class TrafficDirectorManager:
name = self.forwarding_rule.name
else:
return
logger.info('Deleting Forwarding rule %s', name)
logger.info('Deleting Forwarding rule "%s"', name)
self.compute.delete_forwarding_rule(name)
self.forwarding_rule = None

@ -46,7 +46,8 @@ class GrpcClientHelper:
req: Message,
wait_for_ready_sec: Optional[int] = DEFAULT_WAIT_FOR_READY_SEC,
connection_timeout_sec: Optional[
int] = DEFAULT_CONNECTION_TIMEOUT_SEC) -> Message:
int] = DEFAULT_CONNECTION_TIMEOUT_SEC,
log_level: Optional[int] = logging.DEBUG) -> Message:
if wait_for_ready_sec is None:
wait_for_ready_sec = self.DEFAULT_WAIT_FOR_READY_SEC
if connection_timeout_sec is None:
@ -56,14 +57,14 @@ class GrpcClientHelper:
rpc_callable: grpc.UnaryUnaryMultiCallable = getattr(self.stub, rpc)
call_kwargs = dict(wait_for_ready=True, timeout=timeout_sec)
self._log_debug(rpc, req, call_kwargs)
self._log_rpc_request(rpc, req, call_kwargs, log_level)
return rpc_callable(req, **call_kwargs)
def _log_debug(self, rpc, req, call_kwargs):
logger.debug('RPC %s.%s(request=%s(%r), %s)',
self.log_service_name, rpc, req.__class__.__name__,
json_format.MessageToDict(req),
', '.join({f'{k}={v}' for k, v in call_kwargs.items()}))
def _log_rpc_request(self, rpc, req, call_kwargs, log_level=logging.DEBUG):
logger.log(logging.DEBUG if log_level is None else log_level,
'RPC %s.%s(request=%s(%r), %s)', self.log_service_name, rpc,
req.__class__.__name__, json_format.MessageToDict(req),
', '.join({f'{k}={v}' for k, v in call_kwargs.items()}))
class GrpcApp:
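
call_unary_with_deadline now accepts a log_level, and the renamed _log_rpc_request forwards it to logger.log, letting individual RPCs (such as GetClientStats below) promote their request logging to INFO while the rest stay at DEBUG. The mechanism in isolation:

    import logging

    logging.basicConfig(level=logging.DEBUG)
    logger = logging.getLogger(__name__)

    def log_rpc_request(rpc: str, log_level: int = logging.DEBUG) -> None:
        # logger.log() takes the level as its first argument, so one
        # call site serves both DEBUG and INFO messages.
        logger.log(logging.DEBUG if log_level is None else log_level,
                   'RPC %s(...)', rpc)

    log_rpc_request('EmptyCall')                     # logged at DEBUG
    log_rpc_request('GetClientStats', logging.INFO)  # logged at INFO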

@ -15,6 +15,7 @@
This contains helpers for gRPC services defined in
https://github.com/grpc/grpc/blob/master/src/proto/grpc/testing/test.proto
"""
import logging
from typing import Optional
import grpc
@ -25,7 +26,7 @@ from src.proto.grpc.testing import messages_pb2
# Type aliases
_LoadBalancerStatsRequest = messages_pb2.LoadBalancerStatsRequest
_LoadBalancerStatsResponse = messages_pb2.LoadBalancerStatsResponse
LoadBalancerStatsResponse = messages_pb2.LoadBalancerStatsResponse
class LoadBalancerStatsServiceClient(framework.rpc.grpc.GrpcClientHelper):
@ -40,12 +41,13 @@ class LoadBalancerStatsServiceClient(framework.rpc.grpc.GrpcClientHelper):
*,
num_rpcs: int,
timeout_sec: Optional[int] = STATS_PARTIAL_RESULTS_TIMEOUT_SEC,
) -> _LoadBalancerStatsResponse:
) -> LoadBalancerStatsResponse:
if timeout_sec is None:
timeout_sec = self.STATS_PARTIAL_RESULTS_TIMEOUT_SEC
return self.call_unary_with_deadline(rpc='GetClientStats',
wait_for_ready_sec=timeout_sec,
req=_LoadBalancerStatsRequest(
num_rpcs=num_rpcs,
timeout_sec=timeout_sec))
timeout_sec=timeout_sec),
wait_for_ready_sec=timeout_sec,
log_level=logging.INFO)

@ -28,10 +28,9 @@ class RunnerError(Exception):
"""Error running app"""
TEMPLATE_DIR = '../../kubernetes-manifests'
class KubernetesBaseRunner:
TEMPLATE_DIR_NAME = 'kubernetes-manifests'
TEMPLATE_DIR_RELATIVE_PATH = f'../../{TEMPLATE_DIR_NAME}'
def __init__(self,
k8s_namespace,
@ -75,17 +74,19 @@ class KubernetesBaseRunner:
for manifest in yml:
yield manifest
@staticmethod
def _template_file_from_name(template_name):
templates_path = pathlib.Path(__file__).parent / TEMPLATE_DIR
return templates_path.joinpath(template_name).absolute()
@classmethod
def _template_file_from_name(cls, template_name):
templates_path = (pathlib.Path(__file__).parent /
cls.TEMPLATE_DIR_RELATIVE_PATH)
return templates_path.joinpath(template_name).resolve()
def _create_from_template(self, template_name, **kwargs):
template_file = self._template_file_from_name(template_name)
logger.info("Loading template: %s", template_file)
logger.debug("Loading k8s manifest template: %s", template_file)
yaml_doc = self._render_template(template_file, **kwargs)
logger.info("Rendered template:\n%s\n", yaml_doc)
logger.info("Rendered template %s/%s:\n%s", self.TEMPLATE_DIR_NAME,
template_name, yaml_doc)
manifests = self._manifests_from_str(yaml_doc)
manifest = next(manifests)
@ -121,10 +122,11 @@ class KubernetesBaseRunner:
raise RunnerError('Expected V1Namespace to be created '
f'from manifest {template}')
if namespace.metadata.name != kwargs['namespace_name']:
raise RunnerError('Namespace created with unexpected name: '
raise RunnerError('V1Namespace created with unexpected name: '
f'{namespace.metadata.name}')
logger.info('Deployment %s created at %s', namespace.metadata.self_link,
namespace.metadata.creation_timestamp)
logger.debug('V1Namespace %s created at %s',
namespace.metadata.self_link,
namespace.metadata.creation_timestamp)
return namespace
def _create_service_account(self, template,
@ -136,9 +138,9 @@ class KubernetesBaseRunner:
if resource.metadata.name != kwargs['service_account_name']:
raise RunnerError('V1ServiceAccount created with unexpected name: '
f'{resource.metadata.name}')
logger.info('V1ServiceAccount %s created at %s',
resource.metadata.self_link,
resource.metadata.creation_timestamp)
logger.debug('V1ServiceAccount %s created at %s',
resource.metadata.self_link,
resource.metadata.creation_timestamp)
return resource
def _create_deployment(self, template, **kwargs) -> k8s.V1Deployment:
@ -147,11 +149,11 @@ class KubernetesBaseRunner:
raise RunnerError('Expected V1Deployment to be created '
f'from manifest {template}')
if deployment.metadata.name != kwargs['deployment_name']:
raise RunnerError('Deployment created with unexpected name: '
raise RunnerError('V1Deployment created with unexpected name: '
f'{deployment.metadata.name}')
logger.info('Deployment %s created at %s',
deployment.metadata.self_link,
deployment.metadata.creation_timestamp)
logger.debug('V1Deployment %s created at %s',
deployment.metadata.self_link,
deployment.metadata.creation_timestamp)
return deployment
def _create_service(self, template, **kwargs) -> k8s.V1Service:
@ -160,13 +162,14 @@ class KubernetesBaseRunner:
raise RunnerError('Expected V1Service to be created '
f'from manifest {template}')
if service.metadata.name != kwargs['service_name']:
raise RunnerError('Service created with unexpected name: '
raise RunnerError('V1Service created with unexpected name: '
f'{service.metadata.name}')
logger.info('Service %s created at %s', service.metadata.self_link,
service.metadata.creation_timestamp)
logger.debug('V1Service %s created at %s', service.metadata.self_link,
service.metadata.creation_timestamp)
return service
def _delete_deployment(self, name, wait_for_deletion=True):
logger.info('Deleting deployment %s', name)
try:
self.k8s_namespace.delete_deployment(name)
except k8s.ApiException as e:
@ -176,9 +179,10 @@ class KubernetesBaseRunner:
if wait_for_deletion:
self.k8s_namespace.wait_for_deployment_deleted(name)
logger.info('Deployment %s deleted', name)
logger.debug('Deployment %s deleted', name)
def _delete_service(self, name, wait_for_deletion=True):
logger.info('Deleting service %s', name)
try:
self.k8s_namespace.delete_service(name)
except k8s.ApiException as e:
@ -188,9 +192,10 @@ class KubernetesBaseRunner:
if wait_for_deletion:
self.k8s_namespace.wait_for_service_deleted(name)
logger.info('Service %s deleted', name)
logger.debug('Service %s deleted', name)
def _delete_service_account(self, name, wait_for_deletion=True):
logger.info('Deleting service account %s', name)
try:
self.k8s_namespace.delete_service_account(name)
except k8s.ApiException as e:
@ -200,9 +205,10 @@ class KubernetesBaseRunner:
if wait_for_deletion:
self.k8s_namespace.wait_for_service_account_deleted(name)
logger.info('Service account %s deleted', name)
logger.debug('Service account %s deleted', name)
def _delete_namespace(self, wait_for_deletion=True):
logger.info('Deleting namespace %s', self.k8s_namespace.name)
try:
self.k8s_namespace.delete()
except k8s.ApiException as e:
@ -212,10 +218,10 @@ class KubernetesBaseRunner:
if wait_for_deletion:
self.k8s_namespace.wait_for_namespace_deleted()
logger.info('Namespace %s deleted', self.k8s_namespace.name)
logger.debug('Namespace %s deleted', self.k8s_namespace.name)
def _wait_deployment_with_available_replicas(self, name, count=1, **kwargs):
logger.info('Waiting for deployment %s to have %s available replicas',
logger.info('Waiting for deployment %s to have %s available replica(s)',
name, count)
self.k8s_namespace.wait_for_deployment_available_replicas(
name, count, **kwargs)
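
The template lookup in base_runner.py is also rebuilt: the relative path moves into class constants, and absolute() becomes resolve(), which collapses the '../..' segments into a canonical path. A minimal illustration of the difference (the repository layout here is hypothetical):

    import pathlib

    p = pathlib.Path('/repo/framework/test_app/../../kubernetes-manifests')
    print(p.absolute())  # already absolute, so the '..' segments remain
    print(p.resolve())   # /repo/kubernetes-manifests -- '..' collapsed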

@ -72,7 +72,7 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp):
*,
num_rpcs: int,
timeout_sec: Optional[int] = None,
) -> grpc_testing._LoadBalancerStatsResponse:
) -> grpc_testing.LoadBalancerStatsResponse:
"""
Shortcut to LoadBalancerStatsServiceClient.get_client_stats()
"""
@ -86,19 +86,22 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp):
retryer = tenacity.Retrying(
retry=(tenacity.retry_if_result(lambda r: r is None) |
tenacity.retry_if_exception_type()),
wait=tenacity.wait_exponential(max=10),
wait=tenacity.wait_exponential(min=10, max=25),
stop=tenacity.stop_after_delay(60 * 3),
reraise=True)
logger.info(
'Waiting for client %s to establish READY gRPC channel with %s',
self.ip, self.server_target)
channel = retryer(self.get_active_server_channel)
logger.info('Active server channel found: channel_id: %s, %s',
channel.ref.channel_id, channel.ref.name)
logger.debug('Server channel:\n%r', channel)
logger.info(
'gRPC channel between client %s and %s transitioned to READY:\n%s',
self.ip, self.server_target, channel)
def get_active_server_channel(self) -> Optional[grpc_channelz.Channel]:
for channel in self.get_server_channels():
state: _ChannelConnectivityState = channel.data.state
logger.debug('Server channel: %s, state: %s', channel.ref.name,
_ChannelConnectivityState.State.Name(state.state))
logger.info('Server channel: %s, state: %s', channel.ref.name,
_ChannelConnectivityState.State.Name(state.state))
if state.state is _ChannelConnectivityState.READY:
return channel
raise self.NotFound('Client has no active channel with the server')
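
The channel waiter above widens its exponential backoff window to wait_exponential(min=10, max=25), trading a slower first poll for fewer channelz round trips within the same 3-minute deadline. The same retryer reduced to a sketch, with a stand-in probe:

    import tenacity

    def get_active_channel():
        # Stand-in for the channelz lookup; None means "not READY yet".
        return 'channel'

    retryer = tenacity.Retrying(
        retry=(tenacity.retry_if_result(lambda r: r is None) |
               tenacity.retry_if_exception_type()),
        wait=tenacity.wait_exponential(min=10, max=25),  # seconds between attempts
        stop=tenacity.stop_after_delay(60 * 3),          # give up after 3 minutes
        reraise=True)
    channel = retryer(get_active_channel)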
@ -198,8 +201,8 @@ class KubernetesClientRunner(base_runner.KubernetesBaseRunner):
# Experimental, for local debugging.
if self.debug_use_port_forwarding:
logger.info('Enabling port forwarding from %s:%s', pod_ip,
self.stats_port)
logger.info('LOCAL DEV MODE: Enabling port forwarding to %s:%s',
pod_ip, self.stats_port)
self.port_forwarder = self.k8s_namespace.port_forward_pod(
pod, remote_port=self.stats_port)
rpc_host = self.k8s_namespace.PORT_FORWARD_LOCAL_ADDRESS

@ -233,8 +233,8 @@ class KubernetesServerRunner(base_runner.KubernetesBaseRunner):
rpc_host = None
# Experimental, for local debugging.
if self.debug_use_port_forwarding:
logger.info('Enabling port forwarding from %s:%s', pod_ip,
maintenance_port)
logger.info('LOCAL DEV MODE: Enabling port forwarding to %s:%s',
pod_ip, maintenance_port)
self.port_forwarder = self.k8s_namespace.port_forward_pod(
pod, remote_port=maintenance_port)
rpc_host = self.k8s_namespace.PORT_FORWARD_LOCAL_ADDRESS

@ -21,10 +21,11 @@ from absl.testing import absltest
from framework import xds_flags
from framework import xds_k8s_flags
from framework.infrastructure import k8s
from framework.infrastructure import gcp
from framework.infrastructure import k8s
from framework.infrastructure import traffic_director
from framework.rpc import grpc_channelz
from framework.rpc import grpc_testing
from framework.test_app import client_app
from framework.test_app import server_app
@ -39,6 +40,7 @@ flags.adopt_module_key_flags(xds_k8s_flags)
# Type aliases
XdsTestServer = server_app.XdsTestServer
XdsTestClient = client_app.XdsTestClient
_LoadBalancerStatsResponse = grpc_testing.LoadBalancerStatsResponse
class XdsKubernetesTestCase(absltest.TestCase):
@ -99,7 +101,7 @@ class XdsKubernetesTestCase(absltest.TestCase):
cls.gcp_api_manager.close()
def tearDown(self):
logger.debug('######## tearDown(): resource cleanup initiated ########')
logger.info('----- TestMethod %s teardown -----', self.id())
self.td.cleanup(force=self.force_cleanup)
self.client_runner.cleanup(force=self.force_cleanup)
self.server_runner.cleanup(force=self.force_cleanup,
@ -120,19 +122,22 @@ class XdsKubernetesTestCase(absltest.TestCase):
test_client: XdsTestClient,
num_rpcs: int = 100):
# Run the test
lb_stats: _LoadBalancerStatsResponse
lb_stats = test_client.get_load_balancer_stats(num_rpcs=num_rpcs)
logger.info(
'Received LoadBalancerStatsResponse from test client %s:\n%s',
test_client.ip, lb_stats)
# Check the results
self.assertAllBackendsReceivedRpcs(lb_stats)
self.assertFailedRpcsAtMost(lb_stats, 0)
def assertAllBackendsReceivedRpcs(self, lb_stats):
# TODO(sergiitk): assert backends length
logger.info(lb_stats.rpcs_by_peer)
for backend, rpcs_count in lb_stats.rpcs_by_peer.items():
self.assertGreater(
int(rpcs_count),
0,
msg='Backend {backend} did not receive a single RPC')
msg=f'Backend {backend} did not receive a single RPC')
def assertFailedRpcsAtMost(self, lb_stats, limit):
failed = int(lb_stats.num_failures)
@ -188,8 +193,6 @@ class RegularXdsKubernetesTestCase(XdsKubernetesTestCase):
**kwargs) -> XdsTestClient:
test_client = self.client_runner.run(server_target=test_server.xds_uri,
**kwargs)
logger.debug('Waiting fot the client to establish healthy channel with '
'the server')
test_client.wait_for_active_server_channel()
return test_client
@ -263,8 +266,6 @@ class SecurityXdsKubernetesTestCase(XdsKubernetesTestCase):
test_client = self.client_runner.run(server_target=test_server.xds_uri,
secure_mode=True,
**kwargs)
logger.debug('Waiting fot the client to establish healthy channel with '
'the server')
test_client.wait_for_active_server_channel()
return test_client

@ -1,3 +1,4 @@
---
apiVersion: apps/v1
kind: Deployment
metadata:
@ -78,3 +79,4 @@ spec:
- name: gke-spiffe-certs-volume
csi:
driver: certs.spiffe.gke.io
...

@ -1,3 +1,4 @@
---
apiVersion: apps/v1
kind: Deployment
metadata:
@ -65,3 +66,4 @@ spec:
- name: grpc-td-conf
emptyDir:
medium: Memory
...

@ -1,3 +1,4 @@
---
apiVersion: v1
kind: Namespace
metadata:
@ -5,3 +6,4 @@ metadata:
labels:
name: ${namespace_name}
owner: xds-k8s-interop-test
...

@ -1,3 +1,4 @@
---
apiVersion: apps/v1
kind: Deployment
metadata:
@ -77,3 +78,4 @@ spec:
- name: gke-spiffe-certs-volume
csi:
driver: certs.spiffe.gke.io
...

@ -1,3 +1,4 @@
---
apiVersion: apps/v1
kind: Deployment
metadata:
@ -32,3 +33,4 @@ spec:
requests:
cpu: 100m
memory: 512Mi
...

@ -1,3 +1,4 @@
---
apiVersion: v1
kind: Service
metadata:
@ -15,3 +16,4 @@ spec:
- port: ${test_port}
protocol: TCP
targetPort: ${test_port}
...

@ -1,3 +1,4 @@
---
apiVersion: v1
kind: ServiceAccount
metadata:
@ -7,3 +8,4 @@ metadata:
owner: xds-k8s-interop-test
annotations:
iam.gke.io/gcp-service-account: ${gcp_service_account}
...
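
Each manifest now carries an explicit YAML document start marker (---) and end marker (...). This matches the explicit_start/explicit_end format the driver uses when pretty-printing resources, and keeps multi-document streams unambiguous for YAML loaders; for instance, safe_load_all treats every ----delimited block as a separate document:

    import yaml

    rendered = '---\napiVersion: v1\nkind: Namespace\n...\n'
    for doc in yaml.safe_load_all(rendered):
        print(doc['kind'])  # Namespace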
