@ -15,7 +15,6 @@
# added to get around circular dependencies caused by k8s.py clashing with
# k8s/__init__.py
import datetime
import functools
import json
import logging
import pathlib
@ -33,10 +32,12 @@ from framework.infrastructure.k8s_internal import k8s_log_collector
from framework . infrastructure . k8s_internal import k8s_port_forwarder
logger = logging . getLogger ( __name__ )
# Type aliases
_HighlighterYaml = framework . helpers . highlighter . HighlighterYaml
PodLogCollector = k8s_log_collector . PodLogCollector
PortForwarder = k8s_port_forwarder . PortForwarder
ApiClient = client . ApiClient
V1Deployment = client . V1Deployment
V1ServiceAccount = client . V1ServiceAccount
V1Pod = client . V1Pod
@ -44,20 +45,27 @@ V1PodList = client.V1PodList
V1Service = client . V1Service
V1Namespace = client . V1Namespace
ApiException = client . ApiException
FailToCreateError = utils . FailToCreateError
def simple_resource_get ( func ) :
def _ simple_resource_get( func ) :
def wrap_not_found_return_none ( * args , * * kwargs ) :
def _wrap_simple_resource_get ( self : ' KubernetesNamespace ' , * args , * * kwargs ) :
try :
return func ( * args , * * kwargs )
except client . ApiException as e :
return func ( self , * args , * * kwargs )
except ApiException as e :
if e . status == 404 :
# Ignore 404
# Instead of trowing an error when a resource doesn't exist,
# just return None.
return None
elif e . status == 401 :
# 401 Unauthorized: token might be expired, attempt auth refresh
self . refresh_auth ( )
return func ( self , * args , * * kwargs )
# Reraise for anything else.
raise
return wrap_not_found_return_none
return _wrap_simple_resource_get
def label_dict_to_selector ( labels : dict ) - > str :
@ -65,19 +73,37 @@ def label_dict_to_selector(labels: dict) -> str:
class KubernetesApiManager :
_client : ApiClient
context : str
apps : client . AppsV1Api
core : client . CoreV1Api
_apis : set
def __init__ ( self , context ) :
def __init__ ( self , context : str ) :
self . context = context
self . client = self . _cached_api_client_for_context ( context )
self . _ client = self . _new_client_from _context ( context )
self . apps = client . AppsV1Api ( self . client )
self . core = client . CoreV1Api ( self . client )
self . _apis = { self . apps , self . core }
@property
def client ( self ) - > ApiClient :
return self . _client
def close ( self ) :
self . client . close ( )
@classmethod
@functools . lru_cache ( None )
def _cached_api_client_for_context ( cls , context : str ) - > client . ApiClient :
def reload ( self ) :
self . close ( )
self . _client = self . _new_client_from_context ( self . context )
# Update default configuration so that modules that initialize
# ApiClient implicitly (e.g. kubernetes.watch.Watch) get the updates.
client . Configuration . set_default ( self . _client . configuration )
for api in self . _apis :
api . api_client = self . _client
@staticmethod
def _new_client_from_context ( context : str ) - > ApiClient :
client_instance = kubernetes . config . new_client_from_config (
context = context )
logger . info ( ' Using kubernetes context " %s " , active host: %s ' , context ,
@ -101,16 +127,20 @@ class KubernetesNamespace: # pylint: disable=too-many-public-methods
self . name = name
self . api = api
def refresh_auth ( self ) :
logger . info ( ' Reloading k8s api client to refresh the auth. ' )
self . api . reload ( )
def apply_manifest ( self , manifest ) :
return utils . create_from_dict ( self . api . client ,
manifest ,
namespace = self . name )
@simple_resource_get
@_ simple_resource_get
def get_service ( self , name ) - > V1Service :
return self . api . core . read_namespaced_service ( name , self . name )
@simple_resource_get
@_ simple_resource_get
def get_service_account ( self , name ) - > V1Service :
return self . api . core . read_namespaced_service_account ( name , self . name )
@ -134,7 +164,7 @@ class KubernetesNamespace: # pylint: disable=too-many-public-methods
propagation_policy = ' Foreground ' ,
grace_period_seconds = grace_period_seconds ) )
@simple_resource_get
@_ simple_resource_get
def get ( self ) - > V1Namespace :
return self . api . core . read_namespace ( self . name )
@ -202,7 +232,7 @@ class KubernetesNamespace: # pylint: disable=too-many-public-methods
neg_zones : List [ str ] = neg_info [ ' zones ' ]
return neg_name , neg_zones
@simple_resource_get
@_ simple_resource_get
def get_deployment ( self , name ) - > V1Deployment :
return self . api . apps . read_namespaced_deployment ( name , self . name )