xDS interop: Improve retry logic and logging for the k8s retry operations (#30607)

- Changes the order of waiting for pods to start: wait for the pods first, then for the deployment to transition to active. This should provide more useful information in the logs, showing exactly why a pod didn't start, instead of the generic "Replicas not available" (ref b/200293121). This is also needed for https://github.com/grpc/grpc/pull/30594
- Adds support for a `check_result` callback in the retryer helpers; a usage sketch follows the changed-files list below
- Completely replaces `retrying` with `tenacity` (ref b/200293121). Retrying is no longer maintained.
- Improves the readability of timeout errors: they now state the timeout (or the number of attempts) that was exceeded, and why the retry ultimately failed (the last exception, or the check function returning False):
  Before:  
  > `tenacity.RetryError: RetryError[<Future at 0x7f8ce156bc18 state=finished returned dict>]`
  
  After:
  > `framework.helpers.retryers.RetryError: Retry error calling framework.infrastructure.k8s.KubernetesNamespace.get_pod: timeout 0:01:00 exceeded. Check result callback returned False.`
- Improves the readability of the k8s wait operation errors: the log now includes the colorized and formatted status of the k8s object being watched, instead of a dump of the full k8s object. For example, here's how an error caused by using an incorrect TD bootstrap image is reported:
Changed files (changed line counts in parentheses):

  1. tools/run_tests/xds_k8s_test_driver/README.md (2)
  2. tools/run_tests/xds_k8s_test_driver/bin/run_test_server.py (6)
  3. tools/run_tests/xds_k8s_test_driver/framework/helpers/retryers.py (155)
  4. tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/compute.py (130)
  5. tools/run_tests/xds_k8s_test_driver/framework/infrastructure/k8s.py (290)
  6. tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_base_runner.py (25)
  7. tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_client_runner.py (13)
  8. tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_server_runner.py (14)
  9. tools/run_tests/xds_k8s_test_driver/requirements.txt (3)
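As a quick illustration of the new retryer API (not part of the PR itself): the helpers in `framework/helpers/retryers.py` are plain callables, and `check_result` decides when the retried call has succeeded. The helper and parameter names below come from this change; the `get_pod` stub and the pod name are hypothetical.

```python
import datetime

from framework.helpers import retryers


# Hypothetical callable being retried; in the real driver this would be
# something like KubernetesNamespace.get_pod.
def get_pod(pod_name: str):
    ...


# Retry every 10 seconds, for up to 3 minutes, until check_result returns
# True (here: the pod left the 'Pending'/'Unknown' phases). Exceptions
# raised by get_pod are also retried by default.
retryer = retryers.constant_retryer(
    wait_fixed=datetime.timedelta(seconds=10),
    timeout=datetime.timedelta(minutes=3),
    check_result=lambda pod: pod and pod.status.phase not in ('Pending',
                                                              'Unknown'))
pod = retryer(get_pod, 'psm-grpc-client-example')
```

If the timeout elapses, the call raises `framework.helpers.retryers.RetryError` with the descriptive message shown in the "After" example above.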

tools/run_tests/xds_k8s_test_driver/README.md
@@ -7,7 +7,7 @@ Work in progress. Internal APIs may and will change. Please refrain from making
changes to this codebase at the moment.
### Stabilization roadmap
- [ ] Replace retrying with tenacity
- [x] Replace retrying with tenacity
- [x] Generate namespace for each test to prevent resource name conflicts and
allow running tests in parallel
- [x] Security: run server and client in separate namespaces

tools/run_tests/xds_k8s_test_driver/bin/run_test_server.py
@@ -34,6 +34,9 @@ _SECURE = flags.DEFINE_bool("secure",
_REUSE_NAMESPACE = flags.DEFINE_bool("reuse_namespace",
default=True,
help="Use existing namespace if exists")
_REUSE_SERVICE = flags.DEFINE_bool("reuse_service",
default=False,
help="Use existing service if exists")
_CLEANUP_NAMESPACE = flags.DEFINE_bool(
"cleanup_namespace",
default=False,
@@ -71,7 +74,8 @@ def main(argv):
gcp_api_manager=gcp.api.GcpApiManager(),
gcp_service_account=gcp_service_account,
network=xds_flags.NETWORK.value,
reuse_namespace=_REUSE_NAMESPACE.value)
reuse_namespace=_REUSE_NAMESPACE.value,
reuse_service=_REUSE_SERVICE.value)
if _SECURE.value:
runner_kwargs.update(

tools/run_tests/xds_k8s_test_driver/framework/helpers/retryers.py
@@ -22,30 +22,39 @@ We use tenacity as a general-purpose retrying library.
"""
import datetime
import logging
from typing import Any, Optional, Sequence
from typing import Any, Callable, List, Optional, Tuple, Type
import tenacity
from tenacity import _utils as tenacity_utils
from tenacity import stop
from tenacity import wait
from tenacity.retry import retry_base
retryers_logger = logging.getLogger(__name__)
# Type aliases
timedelta = datetime.timedelta
Retrying = tenacity.Retrying
RetryError = tenacity.RetryError
_after_log = tenacity.after_log
_before_sleep_log = tenacity.before_sleep_log
_retry_if_exception_type = tenacity.retry_if_exception_type
_stop_after_attempt = tenacity.stop_after_attempt
_stop_after_delay = tenacity.stop_after_delay
_stop_any = tenacity.stop_any
_wait_exponential = tenacity.wait_exponential
_wait_fixed = tenacity.wait_fixed
def _retry_on_exceptions(retry_on_exceptions: Optional[Sequence[Any]] = None):
CheckResultFn = Callable[[Any], bool]
_ExceptionClasses = Tuple[Type[Exception], ...]
def _build_retry_conditions(
*,
retry_on_exceptions: Optional[_ExceptionClasses] = None,
check_result: Optional[CheckResultFn] = None) -> List[retry_base]:
# Retry on all exceptions by default
if retry_on_exceptions is None:
retry_on_exceptions = (Exception,)
return _retry_if_exception_type(retry_on_exceptions)
retry_conditions = [tenacity.retry_if_exception_type(retry_on_exceptions)]
if check_result is not None:
if retry_on_exceptions:
# When retry_on_exceptions is set, also catch them while executing
# check_result callback.
check_result = _safe_check_result(check_result, retry_on_exceptions)
retry_conditions.append(tenacity.retry_if_not_result(check_result))
return retry_conditions
def exponential_retryer_with_timeout(
@@ -53,25 +62,33 @@ def exponential_retryer_with_timeout(
wait_min: timedelta,
wait_max: timedelta,
timeout: timedelta,
retry_on_exceptions: Optional[Sequence[Any]] = None,
retry_on_exceptions: Optional[_ExceptionClasses] = None,
check_result: Optional[CheckResultFn] = None,
logger: Optional[logging.Logger] = None,
log_level: Optional[int] = logging.DEBUG) -> Retrying:
if logger is None:
logger = retryers_logger
if log_level is None:
log_level = logging.DEBUG
return Retrying(retry=_retry_on_exceptions(retry_on_exceptions),
wait=_wait_exponential(min=wait_min.total_seconds(),
max=wait_max.total_seconds()),
stop=_stop_after_delay(timeout.total_seconds()),
before_sleep=_before_sleep_log(logger, log_level))
retry_conditions = _build_retry_conditions(
retry_on_exceptions=retry_on_exceptions, check_result=check_result)
retry_error_callback = _on_error_callback(timeout=timeout,
check_result=check_result)
return Retrying(retry=tenacity.retry_any(*retry_conditions),
wait=wait.wait_exponential(min=wait_min.total_seconds(),
max=wait_max.total_seconds()),
stop=stop.stop_after_delay(timeout.total_seconds()),
before_sleep=tenacity.before_sleep_log(logger, log_level),
retry_error_callback=retry_error_callback)
def constant_retryer(*,
wait_fixed: timedelta,
attempts: int = 0,
timeout: Optional[timedelta] = None,
retry_on_exceptions: Optional[Sequence[Any]] = None,
retry_on_exceptions: Optional[_ExceptionClasses] = None,
check_result: Optional[CheckResultFn] = None,
logger: Optional[logging.Logger] = None,
log_level: Optional[int] = logging.DEBUG) -> Retrying:
if logger is None:
@@ -82,11 +99,95 @@ def constant_retryer(*,
raise ValueError('The number of attempts or the timeout must be set')
stops = []
if attempts > 0:
stops.append(_stop_after_attempt(attempts))
stops.append(stop.stop_after_attempt(attempts))
if timeout is not None:
stops.append(_stop_after_delay(timeout.total_seconds()))
stops.append(stop.stop_after_delay(timeout.total_seconds()))
retry_conditions = _build_retry_conditions(
retry_on_exceptions=retry_on_exceptions, check_result=check_result)
retry_error_callback = _on_error_callback(timeout=timeout,
attempts=attempts,
check_result=check_result)
return Retrying(retry=tenacity.retry_any(*retry_conditions),
wait=wait.wait_fixed(wait_fixed.total_seconds()),
stop=stop.stop_any(*stops),
before_sleep=tenacity.before_sleep_log(logger, log_level),
retry_error_callback=retry_error_callback)
def _on_error_callback(*,
timeout: Optional[timedelta] = None,
attempts: int = 0,
check_result: Optional[CheckResultFn] = None):
"""A helper to propagate the initial state to the RetryError, so that
it can assemble a helpful message containing timeout/number of attempts.
"""
def error_handler(retry_state: tenacity.RetryCallState):
raise RetryError(retry_state,
timeout=timeout,
attempts=attempts,
check_result=check_result)
return error_handler
def _safe_check_result(check_result: CheckResultFn,
retry_on_exceptions: _ExceptionClasses) -> CheckResultFn:
"""Wraps check_result callback to catch and handle retry_on_exceptions.
Normally tenacity doesn't retry when retry_if_result/retry_if_not_result
raise an error. This wraps the callback to automatically catch Exceptions
specified in the retry_on_exceptions argument.
Ideally we should make all check_result callbacks not throw, but
in case one does, we'd rather be annoying in the logs than break the test.
"""
def _check_result_wrapped(result):
try:
return check_result(result)
except retry_on_exceptions:
retryers_logger.warning(
"Result check callback %s raised an exception."
"This shouldn't happen, please handle any exceptions and "
"return return a boolean.",
tenacity_utils.get_callback_name(check_result),
exc_info=True)
return False
return _check_result_wrapped
class RetryError(tenacity.RetryError):
def __init__(self,
retry_state,
*,
timeout: Optional[timedelta] = None,
attempts: int = 0,
check_result: Optional[CheckResultFn] = None):
super().__init__(retry_state.outcome)
callback_name = tenacity_utils.get_callback_name(retry_state.fn)
self.message = f'Retry error calling {callback_name}:'
if timeout:
self.message += f' timeout {timeout} (h:mm:ss) exceeded'
if attempts:
self.message += ' or'
if attempts:
self.message += f' {attempts} attempts exhausted'
self.message += '.'
if retry_state.outcome.failed:
ex = retry_state.outcome.exception()
self.message += f' Last exception: {type(ex).__name__}: {ex}'
elif check_result:
self.message += ' Check result callback returned False.'
def result(self, *, default=None):
return default if self.last_attempt.failed else self.last_attempt.result(
)
return Retrying(retry=_retry_on_exceptions(retry_on_exceptions),
wait=_wait_fixed(wait_fixed.total_seconds()),
stop=_stop_any(*stops),
before_sleep=_before_sleep_log(logger, log_level))
def __str__(self):
return self.message

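For context on the retryers.py changes above, here's a small self-contained sketch (my own, not from the PR) of the tenacity pattern they build on: combining an exception-based condition with a result-based condition via `retry_any`, and converting the final failure into a custom error via `retry_error_callback`.

```python
import tenacity


class MyRetryError(tenacity.RetryError):
    """Illustrative stand-in for framework.helpers.retryers.RetryError."""


def _raise_my_error(retry_state: tenacity.RetryCallState):
    # Called once the stop condition is reached; lets us raise a descriptive
    # error instead of tenacity's default "RetryError[<Future ...>]".
    raise MyRetryError(retry_state.outcome)


retryer = tenacity.Retrying(
    # Retry if the call raised an exception OR returned a falsy result.
    retry=tenacity.retry_any(tenacity.retry_if_exception_type((Exception,)),
                             tenacity.retry_if_not_result(bool)),
    wait=tenacity.wait_fixed(1),
    stop=tenacity.stop_after_delay(5),
    retry_error_callback=_raise_my_error)

# Always returns a falsy value, so this retries for ~5 seconds, then raises
# MyRetryError instead of the opaque default.
retryer(lambda: {}.get('size'))
```
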
tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/compute.py
@@ -12,15 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import dataclasses
import datetime
import enum
import logging
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Set
from googleapiclient import discovery
import googleapiclient.errors
# TODO(sergiitk): replace with tenacity
import retrying
from framework.helpers import retryers
from framework.infrastructure import gcp
logger = logging.getLogger(__name__)
@@ -29,6 +29,7 @@ logger = logging.getLogger(__name__)
class ComputeV1(gcp.api.GcpProjectApiResource): # pylint: disable=too-many-public-methods
# TODO(sergiitk): move someplace better
_WAIT_FOR_BACKEND_SEC = 60 * 10
_WAIT_FOR_BACKEND_SLEEP_SEC = 4
_WAIT_FOR_OPERATION_SEC = 60 * 10
@dataclasses.dataclass(frozen=True)
@@ -289,34 +290,36 @@ class ComputeV1(gcp.api.GcpProjectApiResource): # pylint: disable=too-many-publ
self._delete_resource(self.api.globalForwardingRules(),
'forwardingRule', name)
@staticmethod
def _network_endpoint_group_not_ready(neg):
return not neg or neg.get('size', 0) == 0
def wait_for_network_endpoint_group(self, name, zone):
@retrying.retry(retry_on_result=self._network_endpoint_group_not_ready,
stop_max_delay=60 * 1000,
wait_fixed=2 * 1000)
def _wait_for_network_endpoint_group_ready():
try:
neg = self.get_network_endpoint_group(name, zone)
logger.debug(
'Waiting for endpoints: NEG %s in zone %s, '
'current count %s', neg['name'], zone, neg.get('size'))
except googleapiclient.errors.HttpError as error:
# noinspection PyProtectedMember
reason = error._get_reason()
logger.debug('Retrying NEG load, got %s, details %s',
error.resp.status, reason)
raise
return neg
network_endpoint_group = _wait_for_network_endpoint_group_ready()
def wait_for_network_endpoint_group(self,
name: str,
zone: str,
*,
timeout_sec=_WAIT_FOR_BACKEND_SEC,
wait_sec=_WAIT_FOR_BACKEND_SLEEP_SEC):
retryer = retryers.constant_retryer(
wait_fixed=datetime.timedelta(seconds=wait_sec),
timeout=datetime.timedelta(seconds=timeout_sec),
check_result=lambda neg: neg and neg.get('size', 0) > 0)
network_endpoint_group = retryer(
self._retry_network_endpoint_group_ready, name, zone)
# TODO(sergiitk): dataclass
return self.ZonalGcpResource(network_endpoint_group['name'],
network_endpoint_group['selfLink'], zone)
def _retry_network_endpoint_group_ready(self, name: str, zone: str):
try:
neg = self.get_network_endpoint_group(name, zone)
logger.debug(
'Waiting for endpoints: NEG %s in zone %s, '
'current count %s', neg['name'], zone, neg.get('size'))
except googleapiclient.errors.HttpError as error:
# noinspection PyProtectedMember
reason = error._get_reason()
logger.debug('Retrying NEG load, got %s, details %s',
error.resp.status, reason)
raise
return neg
def get_network_endpoint_group(self, name, zone):
neg = self.api.networkEndpointGroups().get(project=self.project,
networkEndpointGroup=name,
@@ -325,44 +328,43 @@ class ComputeV1(gcp.api.GcpProjectApiResource): # pylint: disable=too-many-publ
return neg
def wait_for_backends_healthy_status(
self,
backend_service,
backends,
timeout_sec=_WAIT_FOR_BACKEND_SEC,
wait_sec=4,
):
self,
backend_service: GcpResource,
backends: Set[ZonalGcpResource],
*,
timeout_sec: int = _WAIT_FOR_BACKEND_SEC,
wait_sec: int = _WAIT_FOR_BACKEND_SLEEP_SEC):
retryer = retryers.constant_retryer(
wait_fixed=datetime.timedelta(seconds=wait_sec),
timeout=datetime.timedelta(seconds=timeout_sec),
check_result=lambda result: result)
pending = set(backends)
@retrying.retry(retry_on_result=lambda result: not result,
stop_max_delay=timeout_sec * 1000,
wait_fixed=wait_sec * 1000)
def _retry_backends_health():
for backend in pending:
result = self.get_backend_service_backend_health(
backend_service, backend)
if 'healthStatus' not in result:
logger.debug('Waiting for instances: backend %s, zone %s',
backend.name, backend.zone)
continue
backend_healthy = True
for instance in result['healthStatus']:
logger.debug(
'Backend %s in zone %s: instance %s:%s health: %s',
backend.name, backend.zone, instance['ipAddress'],
instance['port'], instance['healthState'])
if instance['healthState'] != 'HEALTHY':
backend_healthy = False
if backend_healthy:
logger.info('Backend %s in zone %s reported healthy',
backend.name, backend.zone)
pending.remove(backend)
return not pending
_retry_backends_health()
retryer(self._retry_backends_health, backend_service, pending)
def _retry_backends_health(self, backend_service: GcpResource,
pending: Set[ZonalGcpResource]):
for backend in pending:
result = self.get_backend_service_backend_health(
backend_service, backend)
if 'healthStatus' not in result:
logger.debug('Waiting for instances: backend %s, zone %s',
backend.name, backend.zone)
continue
backend_healthy = True
for instance in result['healthStatus']:
logger.debug('Backend %s in zone %s: instance %s:%s health: %s',
backend.name, backend.zone, instance['ipAddress'],
instance['port'], instance['healthState'])
if instance['healthState'] != 'HEALTHY':
backend_healthy = False
if backend_healthy:
logger.info('Backend %s in zone %s reported healthy',
backend.name, backend.zone)
pending.remove(backend)
return not pending
def get_backend_service_backend_health(self, backend_service, backend):
return self.api.backendServices().getHealth(

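To make the compute.py migration above easier to follow, here's a rough before/after sketch (the retry parameters mirror the old `wait_for_network_endpoint_group`; the mapping notes are mine). Note the predicate flips meaning: `retrying`'s `retry_on_result` says when to keep retrying, while `check_result` says when the result is good; the new code also defaults to the 10-minute `_WAIT_FOR_BACKEND_SEC` timeout with a 4-second sleep.

```python
import datetime

import retrying                          # old, unmaintained
from framework.helpers import retryers   # new


def _get_neg():
    """Illustrative stand-in for ComputeV1.get_network_endpoint_group."""
    ...


# Before: decorator-based, intervals in milliseconds, predicate returns True
# while we should KEEP retrying.
@retrying.retry(retry_on_result=lambda neg: not neg or neg.get('size', 0) == 0,
                stop_max_delay=60 * 1000,
                wait_fixed=2 * 1000)
def _old_wait_for_neg():
    return _get_neg()


# After: a reusable callable retryer, intervals as timedeltas, predicate
# returns True once the result is GOOD (retrying then stops).
retryer = retryers.constant_retryer(
    wait_fixed=datetime.timedelta(seconds=2),
    timeout=datetime.timedelta(minutes=1),
    check_result=lambda neg: neg and neg.get('size', 0) > 0)
neg = retryer(_get_neg)
```
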
tools/run_tests/xds_k8s_test_driver/framework/infrastructure/k8s.py
@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import functools
import json
import logging
@@ -22,11 +23,14 @@ from typing import List, Optional, Tuple
from kubernetes import client
from kubernetes import utils
import kubernetes.config
# TODO(sergiitk): replace with tenacity
import retrying
import yaml
from framework.helpers import retryers
import framework.helpers.highlighter
logger = logging.getLogger(__name__)
# Type aliases
_HighlighterYaml = framework.helpers.highlighter.HighlighterYaml
V1Deployment = client.V1Deployment
V1ServiceAccount = client.V1ServiceAccount
V1Pod = client.V1Pod
@@ -181,6 +185,7 @@ class KubernetesNamespace: # pylint: disable=too-many-public-methods
WAIT_LONG_SLEEP_SEC: int = 30
def __init__(self, api: KubernetesApiManager, name: str):
self._highlighter = _HighlighterYaml()
self.name = name
self.api = api
@@ -230,71 +235,51 @@ class KubernetesNamespace: # pylint: disable=too-many-public-methods
def wait_for_service_deleted(self,
name: str,
timeout_sec=WAIT_SHORT_TIMEOUT_SEC,
wait_sec=WAIT_SHORT_SLEEP_SEC):
@retrying.retry(retry_on_result=lambda r: r is not None,
stop_max_delay=timeout_sec * 1000,
wait_fixed=wait_sec * 1000)
def _wait_for_deleted_service_with_retry():
service = self.get_service(name)
if service is not None:
logger.debug('Waiting for service %s to be deleted',
service.metadata.name)
return service
_wait_for_deleted_service_with_retry()
def wait_for_service_account_deleted(self,
name: str,
timeout_sec=WAIT_SHORT_TIMEOUT_SEC,
wait_sec=WAIT_SHORT_SLEEP_SEC):
@retrying.retry(retry_on_result=lambda r: r is not None,
stop_max_delay=timeout_sec * 1000,
wait_fixed=wait_sec * 1000)
def _wait_for_deleted_service_account_with_retry():
service_account = self.get_service_account(name)
if service_account is not None:
logger.debug('Waiting for service account %s to be deleted',
service_account.metadata.name)
return service_account
_wait_for_deleted_service_account_with_retry()
timeout_sec: int = WAIT_SHORT_TIMEOUT_SEC,
wait_sec: int = WAIT_SHORT_SLEEP_SEC) -> None:
retryer = retryers.constant_retryer(
wait_fixed=datetime.timedelta(seconds=wait_sec),
timeout=datetime.timedelta(seconds=timeout_sec),
check_result=lambda service: service is None)
retryer(self.get_service, name)
def wait_for_service_account_deleted(
self,
name: str,
timeout_sec: int = WAIT_SHORT_TIMEOUT_SEC,
wait_sec: int = WAIT_SHORT_SLEEP_SEC) -> None:
retryer = retryers.constant_retryer(
wait_fixed=datetime.timedelta(seconds=wait_sec),
timeout=datetime.timedelta(seconds=timeout_sec),
check_result=lambda service_account: service_account is None)
retryer(self.get_service_account, name)
def wait_for_namespace_deleted(self,
timeout_sec=WAIT_LONG_TIMEOUT_SEC,
wait_sec=WAIT_LONG_SLEEP_SEC):
@retrying.retry(retry_on_result=lambda r: r is not None,
stop_max_delay=timeout_sec * 1000,
wait_fixed=wait_sec * 1000)
def _wait_for_deleted_namespace_with_retry():
namespace = self.get()
if namespace is not None:
logger.debug('Waiting for namespace %s to be deleted',
namespace.metadata.name)
return namespace
_wait_for_deleted_namespace_with_retry()
timeout_sec: int = WAIT_LONG_TIMEOUT_SEC,
wait_sec: int = WAIT_LONG_SLEEP_SEC) -> None:
retryer = retryers.constant_retryer(
wait_fixed=datetime.timedelta(seconds=wait_sec),
timeout=datetime.timedelta(seconds=timeout_sec),
check_result=lambda namespace: namespace is None)
retryer(self.get)
def wait_for_service_neg(self,
name: str,
timeout_sec=WAIT_SHORT_TIMEOUT_SEC,
wait_sec=WAIT_SHORT_SLEEP_SEC):
@retrying.retry(retry_on_result=lambda r: not r,
stop_max_delay=timeout_sec * 1000,
wait_fixed=wait_sec * 1000)
def _wait_for_service_neg():
service = self.get_service(name)
if self.NEG_STATUS_META not in service.metadata.annotations:
logger.debug('Waiting for service %s NEG',
service.metadata.name)
return False
return True
_wait_for_service_neg()
timeout_sec: int = WAIT_SHORT_TIMEOUT_SEC,
wait_sec: int = WAIT_SHORT_SLEEP_SEC) -> None:
timeout = datetime.timedelta(seconds=timeout_sec)
retryer = retryers.constant_retryer(
wait_fixed=datetime.timedelta(seconds=wait_sec),
timeout=timeout,
check_result=self._check_service_neg_annotation)
try:
retryer(self.get_service, name)
except retryers.RetryError as e:
logger.error(
'Timeout %s (h:mm:ss) waiting for service %s to report NEG '
'status. Last service status:\n%s', timeout, name,
self._pretty_format_status(e.result()))
raise
def get_service_neg(self, service_name: str,
service_port: int) -> Tuple[str, List[str]]:
@@ -309,9 +294,10 @@ class KubernetesNamespace: # pylint: disable=too-many-public-methods
def get_deployment(self, name) -> V1Deployment:
return self.api.apps.read_namespaced_deployment(name, self.name)
def delete_deployment(self,
name,
grace_period_seconds=DELETE_GRACE_PERIOD_SEC):
def delete_deployment(
self,
name: str,
grace_period_seconds: int = DELETE_GRACE_PERIOD_SEC) -> None:
self.api.apps.delete_namespaced_deployment(
name=name,
namespace=self.name,
@@ -325,67 +311,82 @@ class KubernetesNamespace: # pylint: disable=too-many-public-methods
def wait_for_deployment_available_replicas(
self,
name,
count=1,
timeout_sec=WAIT_MEDIUM_TIMEOUT_SEC,
wait_sec=WAIT_MEDIUM_SLEEP_SEC):
@retrying.retry(
retry_on_result=lambda r: not self._replicas_available(r, count),
stop_max_delay=timeout_sec * 1000,
wait_fixed=wait_sec * 1000)
def _wait_for_deployment_available_replicas():
deployment = self.get_deployment(name)
logger.debug(
'Waiting for deployment %s to have %s available '
'replicas, current count %s', deployment.metadata.name, count,
deployment.status.available_replicas)
return deployment
_wait_for_deployment_available_replicas()
def wait_for_deployment_deleted(self,
deployment_name: str,
timeout_sec=WAIT_MEDIUM_TIMEOUT_SEC,
wait_sec=WAIT_MEDIUM_SLEEP_SEC):
@retrying.retry(retry_on_result=lambda r: r is not None,
stop_max_delay=timeout_sec * 1000,
wait_fixed=wait_sec * 1000)
def _wait_for_deleted_deployment_with_retry():
deployment = self.get_deployment(deployment_name)
if deployment is not None:
logger.debug(
'Waiting for deployment %s to be deleted. '
'Non-terminated replicas: %s', deployment.metadata.name,
deployment.status.replicas)
return deployment
_wait_for_deleted_deployment_with_retry()
name: str,
count: int = 1,
timeout_sec: int = WAIT_MEDIUM_TIMEOUT_SEC,
wait_sec: int = WAIT_SHORT_SLEEP_SEC) -> None:
timeout = datetime.timedelta(seconds=timeout_sec)
retryer = retryers.constant_retryer(
wait_fixed=datetime.timedelta(seconds=wait_sec),
timeout=timeout,
check_result=lambda depl: self._replicas_available(depl, count))
try:
retryer(self.get_deployment, name)
except retryers.RetryError as e:
logger.error(
'Timeout %s (h:mm:ss) waiting for deployment %s to report %i '
'replicas available. Last status:\n%s', timeout, name, count,
self._pretty_format_status(e.result()))
raise
def wait_for_deployment_replica_count(
self,
deployment: V1Deployment,
count: int = 1,
*,
timeout_sec: int = WAIT_MEDIUM_TIMEOUT_SEC,
wait_sec: int = WAIT_SHORT_SLEEP_SEC) -> None:
timeout = datetime.timedelta(seconds=timeout_sec)
retryer = retryers.constant_retryer(
wait_fixed=datetime.timedelta(seconds=wait_sec),
timeout=timeout,
check_result=lambda pods: len(pods) == count)
try:
retryer(self.list_deployment_pods, deployment)
except retryers.RetryError as e:
result = e.result(default=[])
logger.error(
'Timeout %s (h:mm:ss) waiting for pod count %i, got: %i. '
'Pod statuses:\n%s', timeout, count, len(result),
self._pretty_format_statuses(result))
raise
def wait_for_deployment_deleted(
self,
deployment_name: str,
timeout_sec: int = WAIT_MEDIUM_TIMEOUT_SEC,
wait_sec: int = WAIT_MEDIUM_SLEEP_SEC) -> None:
retryer = retryers.constant_retryer(
wait_fixed=datetime.timedelta(seconds=wait_sec),
timeout=datetime.timedelta(seconds=timeout_sec),
check_result=lambda deployment: deployment is None)
retryer(self.get_deployment, deployment_name)
def list_pods_with_labels(self, labels: dict) -> List[V1Pod]:
pod_list: V1PodList = self.api.core.list_namespaced_pod(
self.name, label_selector=label_dict_to_selector(labels))
return pod_list.items
def get_pod(self, name) -> client.V1Pod:
def get_pod(self, name: str) -> V1Pod:
return self.api.core.read_namespaced_pod(name, self.name)
def wait_for_pod_started(self,
pod_name,
timeout_sec=WAIT_SHORT_TIMEOUT_SEC,
wait_sec=WAIT_SHORT_SLEEP_SEC):
@retrying.retry(retry_on_result=lambda r: not self._pod_started(r),
stop_max_delay=timeout_sec * 1000,
wait_fixed=wait_sec * 1000)
def _wait_for_pod_started():
pod = self.get_pod(pod_name)
logger.debug('Waiting for pod %s to start, current phase: %s',
pod.metadata.name, pod.status.phase)
return pod
_wait_for_pod_started()
pod_name: str,
timeout_sec: int = WAIT_SHORT_TIMEOUT_SEC,
wait_sec: int = WAIT_SHORT_SLEEP_SEC) -> None:
timeout = datetime.timedelta(seconds=timeout_sec)
retryer = retryers.constant_retryer(
wait_fixed=datetime.timedelta(seconds=wait_sec),
timeout=timeout,
check_result=self._pod_started)
try:
retryer(self.get_pod, pod_name)
except retryers.RetryError as e:
logger.error(
'Timeout %s (h:mm:ss) waiting for pod %s to start. '
'Pod status:\n%s', timeout, pod_name,
self._pretty_format_status(e.result()))
raise
def port_forward_pod(
self,
@@ -400,12 +401,55 @@ class KubernetesNamespace: # pylint: disable=too-many-public-methods
pf.connect()
return pf
@staticmethod
def _pod_started(pod: V1Pod):
return pod.status.phase not in ('Pending', 'Unknown')
def _pretty_format_statuses(self,
k8s_objects: List[Optional[object]]) -> str:
return '\n'.join(
self._pretty_format_status(k8s_object)
for k8s_object in k8s_objects)
def _pretty_format_status(self, k8s_object: Optional[object]) -> str:
if k8s_object is None:
return 'No data'
# Parse the name if present.
if hasattr(k8s_object, 'metadata') and hasattr(k8s_object.metadata,
'name'):
name = k8s_object.metadata.name
else:
name = 'Can\'t parse resource name'
# Pretty-print the status if present.
if hasattr(k8s_object, 'status'):
try:
status = self._pretty_format(k8s_object.status.to_dict())
except Exception as e: # pylint: disable=broad-except
# Catching all exceptions because not printing the status
# isn't as important as the system under test.
status = f'Can\'t parse resource status: {e}'
else:
status = 'Can\'t parse resource status'
@staticmethod
def _replicas_available(deployment, count):
return (deployment is not None and
# Return the name of k8s object, and its pretty-printed status.
return f'{name}:\n{status}\n'
def _pretty_format(self, data: dict) -> str:
"""Return a string with pretty-printed yaml data from a python dict."""
yaml_out: str = yaml.dump(data, explicit_start=True, explicit_end=True)
return self._highlighter.highlight(yaml_out)
@classmethod
def _check_service_neg_annotation(cls,
service: Optional[V1Service]) -> bool:
return (isinstance(service, V1Service) and
cls.NEG_STATUS_META in service.metadata.annotations)
@classmethod
def _pod_started(cls, pod: V1Pod) -> bool:
return (isinstance(pod, V1Pod) and
pod.status.phase not in ('Pending', 'Unknown'))
@classmethod
def _replicas_available(cls, deployment: V1Deployment, count: int) -> bool:
return (isinstance(deployment, V1Deployment) and
deployment.status.available_replicas is not None and
deployment.status.available_replicas >= count)

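A quick sketch (mine, with made-up field values) of what the new `_pretty_format_status` logging in k8s.py produces: the object's name followed by a YAML dump of its `.status`, which `framework.helpers.highlighter.HighlighterYaml` then colorizes for the terminal (the highlighter itself is not part of this diff).

```python
import yaml

# Stand-in for a k8s object's status.to_dict(); the values are invented.
pod_status = {
    'phase': 'Pending',
    'conditions': [{
        'type': 'PodScheduled',
        'status': 'True',
    }],
}

# Mirrors KubernetesNamespace._pretty_format: an explicit YAML document
# ('---' ... '...') appended to the timeout error log entry.
yaml_out = yaml.dump(pod_status, explicit_start=True, explicit_end=True)
print(f'psm-grpc-client-example:\n{yaml_out}\n')
```
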
tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_base_runner.py
@@ -18,7 +18,7 @@ import contextlib
import datetime
import logging
import pathlib
from typing import Optional
from typing import List, Optional
import mako.template
import yaml
@@ -282,8 +282,9 @@ class KubernetesBaseRunner(base_runner.BaseRunner):
logger.debug('Namespace %s deleted', self.k8s_namespace.name)
def _wait_deployment_with_available_replicas(self, name, count=1, **kwargs):
logger.info('Waiting for deployment %s to have %s available replica(s)',
name, count)
logger.info(
'Waiting for deployment %s to report %s '
'available replica(s)', name, count)
self.k8s_namespace.wait_for_deployment_available_replicas(
name, count, **kwargs)
deployment = self.k8s_namespace.get_deployment(name)
@@ -291,12 +292,28 @@ class KubernetesBaseRunner(base_runner.BaseRunner):
deployment.metadata.name,
deployment.status.available_replicas)
def _wait_pod_started(self, name, **kwargs):
def _wait_deployment_pod_count(self,
deployment: k8s.V1Deployment,
count: int = 1,
**kwargs) -> List[str]:
logger.info('Waiting for deployment %s to initialize %s pod(s)',
deployment.metadata.name, count)
self.k8s_namespace.wait_for_deployment_replica_count(
deployment, count, **kwargs)
pods = self.k8s_namespace.list_deployment_pods(deployment)
pod_names = [pod.metadata.name for pod in pods]
logger.info('Deployment %s initialized %i pod(s): %s',
deployment.metadata.name, count, pod_names)
# Pods may not be started yet, just return the names.
return pod_names
def _wait_pod_started(self, name, **kwargs) -> k8s.V1Pod:
logger.info('Waiting for pod %s to start', name)
self.k8s_namespace.wait_for_pod_started(name, **kwargs)
pod = self.k8s_namespace.get_pod(name)
logger.info('Pod %s ready, IP: %s', pod.metadata.name,
pod.status.pod_ip)
return pod
def _wait_service_neg(self, name, service_port, **kwargs):
logger.info('Waiting for NEG for service %s', name)

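The client and server runner changes below apply the reordering described at the top of this PR. As a hedged summary (the method names come from this diff; the wrapper method itself is hypothetical), startup now proceeds roughly like this:

```python
# Hypothetical helper on a KubernetesBaseRunner subclass; not in the PR.
def _start_and_wait(self, replica_count: int = 1):
    # 1. Wait for the deployment to create the expected number of pods.
    pod_names = self._wait_deployment_pod_count(self.deployment, replica_count)
    # 2. Wait for each pod to leave the Pending/Unknown phase; on timeout the
    #    pod's own status is logged, explaining why it didn't start.
    pods = [self._wait_pod_started(pod_name) for pod_name in pod_names]
    # 3. Only then verify the deployment reports its replicas as available.
    self._wait_deployment_with_available_replicas(self.deployment_name,
                                                  replica_count)
    return pods
```
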
tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_client_runner.py
@@ -140,16 +140,17 @@ class KubernetesClientRunner(k8s_base_runner.KubernetesBaseRunner):
config_mesh=config_mesh,
print_response=print_response)
# Load test client pod. We need only one client at the moment
pod_name = self._wait_deployment_pod_count(self.deployment)[0]
pod: k8s.V1Pod = self._wait_pod_started(pod_name)
# Verify the deployment reports all pods started as well.
self._wait_deployment_with_available_replicas(self.deployment_name)
# Load test client pod. We need only one client at the moment
pod = self.k8s_namespace.list_deployment_pods(self.deployment)[0]
self._wait_pod_started(pod.metadata.name)
# Experimental, for local debugging.
pod_ip = pod.status.pod_ip
rpc_port = self.stats_port
rpc_host = None
# Experimental, for local debugging.
if self.debug_use_port_forwarding:
logger.info('LOCAL DEV MODE: Enabling port forwarding to %s:%s',
pod_ip, self.stats_port)
@@ -162,7 +163,7 @@ class KubernetesClientRunner(k8s_base_runner.KubernetesBaseRunner):
rpc_port=rpc_port,
server_target=server_target,
rpc_host=rpc_host,
hostname=pod.metadata.name)
hostname=pod_name)
def cleanup(self, *, force=False, force_namespace=False): # pylint: disable=arguments-differ
if self.port_forwarder:

tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_server_runner.py
@@ -181,17 +181,15 @@ class KubernetesServerRunner(k8s_base_runner.KubernetesBaseRunner):
maintenance_port=maintenance_port,
secure_mode=secure_mode)
pod_names = self._wait_deployment_pod_count(self.deployment,
replica_count)
pods = [self._wait_pod_started(pod_name) for pod_name in pod_names]
# Verify the deployment reports all pods started as well.
self._wait_deployment_with_available_replicas(self.deployment_name,
replica_count)
# Wait for pods running
pods = self.k8s_namespace.list_deployment_pods(self.deployment)
servers = []
for pod in pods:
pod_name = pod.metadata.name
self._wait_pod_started(pod_name)
pod_ip = pod.status.pod_ip
rpc_host = None
# Experimental, for local debugging.
@@ -208,7 +206,7 @@ class KubernetesServerRunner(k8s_base_runner.KubernetesBaseRunner):
servers.append(
XdsTestServer(ip=pod_ip,
rpc_port=test_port,
hostname=pod_name,
hostname=pod.metadata.name,
maintenance_port=local_port,
secure_mode=secure_mode,
rpc_host=rpc_host))

tools/run_tests/xds_k8s_test_driver/requirements.txt
@@ -9,9 +9,6 @@ grpcio-health-checking~=1.34
grpcio-tools~=1.34
grpcio-channelz~=1.34
kubernetes~=12.0
# TODO(sergiitk): remove retrying when replaced with tenacity in code.
# Context: https://github.com/grpc/grpc/pull/24983#discussion_r543017022
retrying~=1.3
six~=1.13
tenacity~=6.2
packaging~=21.3
