@ -19,11 +19,12 @@ import json
import logging
import pathlib
import threading
from typing import List , Optional , Tuple
from typing import Any , Callable , List , Optional , Tuple
from kubernetes import client
from kubernetes import utils
import kubernetes . config
import urllib3 . exceptions
import yaml
from framework . helpers import retryers
@ -46,43 +47,32 @@ V1Service = client.V1Service
V1Namespace = client . V1Namespace
ApiException = client . ApiException
FailToCreateError = utils . FailToCreateError
_timedelta = datetime . timedelta
_RETRY_ON_EXCEPTIONS = ( urllib3 . exceptions . HTTPError , ApiException ,
FailToCreateError )
def _simple_resource_get ( func ) :
def _wrap_simple_resource_get ( self : ' KubernetesNamespace ' , * args , * * kwargs ) :
try :
return func ( self , * args , * * kwargs )
except ApiException as e :
if e . status == 404 :
# Instead of trowing an error when a resource doesn't exist,
# just return None.
return None
elif e . status == 401 :
# 401 Unauthorized: token might be expired, attempt auth refresh
self . refresh_auth ( )
return func ( self , * args , * * kwargs )
# Reraise for anything else.
raise
return _wrap_simple_resource_get
def _server_restart_retryer ( ) - > retryers . Retrying :
return retryers . exponential_retryer_with_timeout (
retry_on_exceptions = _RETRY_ON_EXCEPTIONS ,
wait_min = _timedelta ( seconds = 1 ) ,
wait_max = _timedelta ( seconds = 10 ) ,
timeout = _timedelta ( minutes = 3 ) )
def _simple_resource_delete ( func ) :
def _too_many_requests_retryer ( ) - > retryers . Retrying :
return retryers . exponential_retryer_with_timeout (
retry_on_exceptions = _RETRY_ON_EXCEPTIONS ,
wait_min = _timedelta ( seconds = 10 ) ,
wait_max = _timedelta ( seconds = 30 ) ,
timeout = _timedelta ( minutes = 3 ) )
def _wrap_simple_resource_delete ( self : ' KubernetesNamespace ' , * args ,
* * kwargs ) :
try :
return func ( self , * args , * * kwargs )
except ApiException as e :
if e . status == 401 :
# 401 Unauthorized: token might be expired, attempt auth refresh
self . refresh_auth ( )
return func ( self , * args , * * kwargs )
# Reraise for anything else.
raise
return _wrap_simple_resource_delete
def _quick_recovery_retryer ( ) - > retryers . Retrying :
return retryers . constant_retryer ( wait_fixed = _timedelta ( seconds = 1 ) ,
attempts = 3 ,
retry_on_exceptions = _RETRY_ON_EXCEPTIONS )
def label_dict_to_selector ( labels : dict ) - > str :
@ -125,10 +115,16 @@ class KubernetesApiManager:
context = context )
logger . info ( ' Using kubernetes context " %s " , active host: %s ' , context ,
client_instance . configuration . host )
# TODO(sergiitk): fine-tune if we see the total wait unreasonably long.
client_instance . configuration . retries = 10
return client_instance
class KubernetesNamespace : # pylint: disable=too-many-public-methods
_highlighter : framework . helpers . highlighter . Highlighter
_api : KubernetesApiManager
_name : str
NEG_STATUS_META = ' cloud.google.com/neg-status '
DELETE_GRACE_PERIOD_SEC : int = 5
WAIT_SHORT_TIMEOUT_SEC : int = 60
@ -140,68 +136,188 @@ class KubernetesNamespace: # pylint: disable=too-many-public-methods
WAIT_POD_START_TIMEOUT_SEC : int = 3 * 60
def __init__ ( self , api : KubernetesApiManager , name : str ) :
self . _api = api
self . _name = name
self . _highlighter = _HighlighterYaml ( )
self . name = name
self . api = api
def refresh_auth ( self ) :
@property
def name ( self ) :
return self . _name
def _refresh_auth ( self ) :
logger . info ( ' Reloading k8s api client to refresh the auth. ' )
self . api . reload ( )
self . _ api. reload ( )
def apply_manifest ( self , manifest ) :
return utils . create_from_dict ( self . api . client ,
def _ apply_manifest( self , manifest ) :
return utils . create_from_dict ( self . _ api. client ,
manifest ,
namespace = self . name )
@_simple_resource_get
def _get_resource ( self , method : Callable [ [ Any ] , object ] , * args , * * kwargs ) :
try :
return self . _execute ( method , * args , * * kwargs )
except ApiException as err :
if err . status == 404 :
# Instead of trowing an error when a resource doesn't exist,
# just return None.
return None
raise
def _execute ( self , method : Callable [ [ Any ] , object ] , * args , * * kwargs ) :
# Note: Intentionally leaving return type as unspecified to not confuse
# pytype for methods that delegate calls to this wrapper.
try :
return method ( * args , * * kwargs )
except _RETRY_ON_EXCEPTIONS as err :
retryer = self . _handle_exception ( err )
if retryer is not None :
return retryer ( method , * args , * * kwargs )
raise
def _handle_exception ( self , err : Exception ) - > Optional [ retryers . Retrying ] :
# TODO(sergiitk): replace returns with match/case when we use to py3.10.
# pylint: disable=too-many-return-statements
# Unwrap MaxRetryError.
if isinstance ( err , urllib3 . exceptions . MaxRetryError ) :
return self . _handle_exception ( err . reason ) if err . reason else None
# We consider all `NewConnectionError`s as caused by a k8s
# API server restart. `NewConnectionError`s we've seen:
# - [Errno 110] Connection timed out
# - [Errno 111] Connection refused
if isinstance ( err , urllib3 . exceptions . NewConnectionError ) :
return _server_restart_retryer ( )
# We consider all `ProtocolError`s with "Connection aborted" message
# as caused by a k8s API server restart.
# `ProtocolError`s we've seen:
# - RemoteDisconnected('Remote end closed connection
# without response')
# - ConnectionResetError(104, 'Connection reset by peer')
if isinstance ( err , urllib3 . exceptions . ProtocolError ) :
if ' connection aborted ' in str ( err ) . lower ( ) :
return _server_restart_retryer ( )
else :
# To cover other cases we didn't account for, and haven't
# seen in the wild, f.e. "Connection broken"
return _quick_recovery_retryer ( )
# ApiException means the server has received our request and responded
# with an error we can parse (except a few corner cases, f.e. SSLError).
if isinstance ( err , ApiException ) :
return self . _handle_api_exception ( err )
# Unwrap FailToCreateError.
if isinstance ( err , FailToCreateError ) :
# We're always sending a single document, so we expect
# a single wrapped exception in return.
if len ( err . api_exceptions ) == 1 :
return self . _handle_exception ( err . api_exceptions [ 0 ] )
return None
def _handle_api_exception ( self ,
err : ApiException ) - > Optional [ retryers . Retrying ] :
# TODO(sergiitk): replace returns with match/case when we use to py3.10.
# pylint: disable=too-many-return-statements
# TODO(sergiitk): can I chain the retryers?
logger . debug (
' Handling k8s.ApiException: status= %s reason= %s body= %s headers= %s ' ,
err . status , err . reason , err . body , err . headers )
code : int = err . status
body = err . body . lower ( ) if err . body else ' '
# 401 Unauthorized: token might be expired, attempt auth refresh.
if code == 401 :
self . _refresh_auth ( )
return _quick_recovery_retryer ( )
# 409 Conflict
# "Operation cannot be fulfilled on resourcequotas "foo": the object
# has been modified; please apply your changes to the latest version
# and try again".
# See https://github.com/kubernetes/kubernetes/issues/67761
if code == 409 :
return _quick_recovery_retryer ( )
# 429 Too Many Requests: "Too many requests, please try again later"
if code == 429 :
return _too_many_requests_retryer ( )
# 500 Internal Server Error
if code == 500 :
# Observed when using `kubectl proxy`.
# "dial tcp 127.0.0.1:8080: connect: connection refused"
if ' connection refused ' in body :
return _server_restart_retryer ( )
# Known 500 errors that should be treated as 429:
# - Internal Server Error: "/api/v1/namespaces": the server has
# received too many requests and has asked us
# to try again later
# - Internal Server Error: "/api/v1/namespaces/foo/services":
# the server is currently unable to handle the request
if ( ' too many requests ' in body or
' currently unable to handle the request ' in body ) :
return _too_many_requests_retryer ( )
# In other cases, just retry a few times in case the server
# resumes normal operation.
return _quick_recovery_retryer ( )
return None
def create_single_resource ( self , manifest ) :
return self . _execute ( self . _apply_manifest , manifest )
def get_service ( self , name ) - > V1Service :
return self . api . core . read_namespaced_service ( name , self . name )
return self . _get_resource ( self . _api . core . read_namespaced_service , name ,
self . name )
@_simple_resource_get
def get_service_account ( self , name ) - > V1Service :
return self . api . core . read_namespaced_service_account ( name , self . name )
return self . _get_resource (
self . _api . core . read_namespaced_service_account , name , self . name )
@_simple_resource_delete
def delete_service ( self ,
name ,
grace_period_seconds = DELETE_GRACE_PERIOD_SEC ) :
self . api . core . delete_namespaced_service (
name = name ,
namespace = self . name ,
body = client . V1DeleteOptions (
propagation_policy = ' Foreground ' ,
grace_period_seconds = grace_period_seconds ) )
@_simple_resource_delete
self . _execute ( self . _api . core . delete_namespaced_service ,
name = name ,
namespace = self . name ,
body = client . V1DeleteOptions (
propagation_policy = ' Foreground ' ,
grace_period_seconds = grace_period_seconds ) )
def delete_service_account ( self ,
name ,
grace_period_seconds = DELETE_GRACE_PERIOD_SEC ) :
self . api . core . delete_namespaced_service_account (
name = name ,
namespace = self . name ,
body = client . V1DeleteOptions (
propagation_policy = ' Foreground ' ,
grace_period_seconds = grace_period_seconds ) )
@_simple_resource_get
self . _execute ( self . _api . core . delete_namespaced_service_account ,
name = name ,
namespace = self . name ,
body = client . V1DeleteOptions (
propagation_policy = ' Foreground ' ,
grace_period_seconds = grace_period_seconds ) )
def get ( self ) - > V1Namespace :
return self . api . core . read_namespace ( self . name )
return self . _get_resource ( self . _ api. core . read_namespace , self . name )
@_simple_resource_delete
def delete ( self , grace_period_seconds = DELETE_GRACE_PERIOD_SEC ) :
self . api . core . delete_namespace (
name = self . name ,
body = client . V1DeleteOptions (
propagation_policy = ' Foreground ' ,
grace_period_seconds = grace_period_seconds ) )
self . _execute ( self . _ api. core . delete_namespace ,
name = self . name ,
body = client . V1DeleteOptions (
propagation_policy = ' Foreground ' ,
grace_period_seconds = grace_period_seconds ) )
def wait_for_service_deleted ( self ,
name : str ,
timeout_sec : int = WAIT_SHORT_TIMEOUT_SEC ,
wait_sec : int = WAIT_SHORT_SLEEP_SEC ) - > None :
retryer = retryers . constant_retryer (
wait_fixed = datetime . timedelta ( seconds = wait_sec ) ,
timeout = datetime . timedelta ( seconds = timeout_sec ) ,
wait_fixed = _ timedelta( seconds = wait_sec ) ,
timeout = _ timedelta( seconds = timeout_sec ) ,
check_result = lambda service : service is None )
retryer ( self . get_service , name )
@ -211,8 +327,8 @@ class KubernetesNamespace: # pylint: disable=too-many-public-methods
timeout_sec : int = WAIT_SHORT_TIMEOUT_SEC ,
wait_sec : int = WAIT_SHORT_SLEEP_SEC ) - > None :
retryer = retryers . constant_retryer (
wait_fixed = datetime . timedelta ( seconds = wait_sec ) ,
timeout = datetime . timedelta ( seconds = timeout_sec ) ,
wait_fixed = _ timedelta( seconds = wait_sec ) ,
timeout = _ timedelta( seconds = timeout_sec ) ,
check_result = lambda service_account : service_account is None )
retryer ( self . get_service_account , name )
@ -220,8 +336,8 @@ class KubernetesNamespace: # pylint: disable=too-many-public-methods
timeout_sec : int = WAIT_LONG_TIMEOUT_SEC ,
wait_sec : int = WAIT_LONG_SLEEP_SEC ) - > None :
retryer = retryers . constant_retryer (
wait_fixed = datetime . timedelta ( seconds = wait_sec ) ,
timeout = datetime . timedelta ( seconds = timeout_sec ) ,
wait_fixed = _ timedelta( seconds = wait_sec ) ,
timeout = _ timedelta( seconds = timeout_sec ) ,
check_result = lambda namespace : namespace is None )
retryer ( self . get )
@ -229,9 +345,9 @@ class KubernetesNamespace: # pylint: disable=too-many-public-methods
name : str ,
timeout_sec : int = WAIT_SHORT_TIMEOUT_SEC ,
wait_sec : int = WAIT_SHORT_SLEEP_SEC ) - > None :
timeout = datetime . timedelta ( seconds = timeout_sec )
timeout = _ timedelta( seconds = timeout_sec )
retryer = retryers . constant_retryer (
wait_fixed = datetime . timedelta ( seconds = wait_sec ) ,
wait_fixed = _ timedelta( seconds = wait_sec ) ,
timeout = timeout ,
check_result = self . _check_service_neg_annotation )
try :
@ -252,21 +368,20 @@ class KubernetesNamespace: # pylint: disable=too-many-public-methods
neg_zones : List [ str ] = neg_info [ ' zones ' ]
return neg_name , neg_zones
@_simple_resource_get
def get_deployment ( self , name ) - > V1Deployment :
return self . api . apps . read_namespaced_deployment ( name , self . name )
return self . _get_resource ( self . _api . apps . read_namespaced_deployment ,
name , self . name )
@_simple_resource_delete
def delete_deployment (
self ,
name : str ,
grace_period_seconds : int = DELETE_GRACE_PERIOD_SEC ) - > None :
self . api . apps . delete_namespaced_deployment (
name = name ,
namespace = self . name ,
body = client . V1DeleteOptions (
propagation_policy = ' Foreground ' ,
grace_period_seconds = grace_period_seconds ) )
self . _execute ( self . _ api. apps . delete_namespaced_deployment ,
name = name ,
namespace = self . name ,
body = client . V1DeleteOptions (
propagation_policy = ' Foreground ' ,
grace_period_seconds = grace_period_seconds ) )
def list_deployment_pods ( self , deployment : V1Deployment ) - > List [ V1Pod ] :
# V1LabelSelector.match_expressions not supported at the moment
@ -278,9 +393,9 @@ class KubernetesNamespace: # pylint: disable=too-many-public-methods
count : int = 1 ,
timeout_sec : int = WAIT_MEDIUM_TIMEOUT_SEC ,
wait_sec : int = WAIT_SHORT_SLEEP_SEC ) - > None :
timeout = datetime . timedelta ( seconds = timeout_sec )
timeout = _ timedelta( seconds = timeout_sec )
retryer = retryers . constant_retryer (
wait_fixed = datetime . timedelta ( seconds = wait_sec ) ,
wait_fixed = _ timedelta( seconds = wait_sec ) ,
timeout = timeout ,
check_result = lambda depl : self . _replicas_available ( depl , count ) )
try :
@ -299,9 +414,9 @@ class KubernetesNamespace: # pylint: disable=too-many-public-methods
* ,
timeout_sec : int = WAIT_MEDIUM_TIMEOUT_SEC ,
wait_sec : int = WAIT_SHORT_SLEEP_SEC ) - > None :
timeout = datetime . timedelta ( seconds = timeout_sec )
timeout = _ timedelta( seconds = timeout_sec )
retryer = retryers . constant_retryer (
wait_fixed = datetime . timedelta ( seconds = wait_sec ) ,
wait_fixed = _ timedelta( seconds = wait_sec ) ,
timeout = timeout ,
check_result = lambda pods : len ( pods ) == count )
try :
@ -320,28 +435,29 @@ class KubernetesNamespace: # pylint: disable=too-many-public-methods
timeout_sec : int = WAIT_MEDIUM_TIMEOUT_SEC ,
wait_sec : int = WAIT_MEDIUM_SLEEP_SEC ) - > None :
retryer = retryers . constant_retryer (
wait_fixed = datetime . timedelta ( seconds = wait_sec ) ,
timeout = datetime . timedelta ( seconds = timeout_sec ) ,
wait_fixed = _ timedelta( seconds = wait_sec ) ,
timeout = _ timedelta( seconds = timeout_sec ) ,
check_result = lambda deployment : deployment is None )
retryer ( self . get_deployment , deployment_name )
@_simple_resource_get
def list_pods_with_labels ( self , labels : dict ) - > List [ V1Pod ] :
pod_list : V1PodList = self . api . core . list_namespaced_pod (
self . name , label_selector = label_dict_to_selector ( labels ) )
pod_list : V1PodList = self . _execute (
self . _api . core . list_namespaced_pod ,
self . name ,
label_selector = label_dict_to_selector ( labels ) )
return pod_list . items
@_simple_resource_get
def get_pod ( self , name : str ) - > V1Pod :
return self . api . core . read_namespaced_pod ( name , self . name )
return self . _get_resource ( self . _api . core . read_namespaced_pod , name ,
self . name )
def wait_for_pod_started ( self ,
pod_name : str ,
timeout_sec : int = WAIT_POD_START_TIMEOUT_SEC ,
wait_sec : int = WAIT_SHORT_SLEEP_SEC ) - > None :
timeout = datetime . timedelta ( seconds = timeout_sec )
timeout = _ timedelta( seconds = timeout_sec )
retryer = retryers . constant_retryer (
wait_fixed = datetime . timedelta ( seconds = wait_sec ) ,
wait_fixed = _ timedelta( seconds = wait_sec ) ,
timeout = timeout ,
check_result = self . _pod_started )
try :
@ -360,7 +476,7 @@ class KubernetesNamespace: # pylint: disable=too-many-public-methods
local_port : Optional [ int ] = None ,
local_address : Optional [ str ] = None ,
) - > k8s_port_forwarder . PortForwarder :
pf = k8s_port_forwarder . PortForwarder ( self . api . context , self . name ,
pf = k8s_port_forwarder . PortForwarder ( self . _ api. context , self . name ,
f " pod/ { pod . metadata . name } " ,
remote_port , local_port ,
local_address )
@ -377,7 +493,7 @@ class KubernetesNamespace: # pylint: disable=too-many-public-methods
pod_log_collector = PodLogCollector (
pod_name = pod_name ,
namespace_name = self . name ,
read_pod_log_fn = self . api . core . read_namespaced_pod_log ,
read_pod_log_fn = self . _ api. core . read_namespaced_pod_log ,
stop_event = log_stop_event ,
log_path = log_path ,
log_to_stdout = log_to_stdout ,