@ -120,6 +120,33 @@ constexpr char kXds[] = "xds_experimental";
constexpr char kDefaultLocalityName [ ] = " xds_default_locality " ;
constexpr uint32_t kDefaultLocalityWeight = 3 ;
class ParsedXdsConfig : public ParsedLoadBalancingConfig {
public :
ParsedXdsConfig ( const char * balancer_name ,
RefCountedPtr < ParsedLoadBalancingConfig > child_policy ,
RefCountedPtr < ParsedLoadBalancingConfig > fallback_policy )
: balancer_name_ ( balancer_name ) ,
child_policy_ ( std : : move ( child_policy ) ) ,
fallback_policy_ ( std : : move ( fallback_policy ) ) { }
const char * name ( ) const override { return kXds ; }
const char * balancer_name ( ) const { return balancer_name_ ; } ;
RefCountedPtr < ParsedLoadBalancingConfig > child_policy ( ) const {
return child_policy_ ;
}
RefCountedPtr < ParsedLoadBalancingConfig > fallback_policy ( ) const {
return fallback_policy_ ;
}
private :
const char * balancer_name_ = nullptr ;
RefCountedPtr < ParsedLoadBalancingConfig > child_policy_ ;
RefCountedPtr < ParsedLoadBalancingConfig > fallback_policy_ ;
} ;
class XdsLb : public LoadBalancingPolicy {
public :
explicit XdsLb ( Args args ) ;
@ -240,6 +267,10 @@ class XdsLb : public LoadBalancingPolicy {
static void OnCallRetryTimerLocked ( void * arg , grpc_error * error ) ;
void StartCallLocked ( ) ;
void StartConnectivityWatchLocked ( ) ;
void CancelConnectivityWatchLocked ( ) ;
static void OnConnectivityChangedLocked ( void * arg , grpc_error * error ) ;
private :
// The owning LB policy.
RefCountedPtr < XdsLb > xdslb_policy_ ;
@ -247,6 +278,8 @@ class XdsLb : public LoadBalancingPolicy {
// The channel and its status.
grpc_channel * channel_ ;
bool shutting_down_ = false ;
grpc_connectivity_state connectivity_ = GRPC_CHANNEL_IDLE ;
grpc_closure on_connectivity_changed_ ;
// The data associated with the current LB call. It holds a ref to this LB
// channel. It's instantiated every time we query for backends. It's reset
@ -299,6 +332,28 @@ class XdsLb : public LoadBalancingPolicy {
PickerList pickers_ ;
} ;
class FallbackHelper : public ChannelControlHelper {
public :
explicit FallbackHelper ( RefCountedPtr < XdsLb > parent )
: parent_ ( std : : move ( parent ) ) { }
Subchannel * CreateSubchannel ( const grpc_channel_args & args ) override ;
grpc_channel * CreateChannel ( const char * target ,
const grpc_channel_args & args ) override ;
void UpdateState ( grpc_connectivity_state state ,
UniquePtr < SubchannelPicker > picker ) override ;
void RequestReresolution ( ) override ;
void set_child ( LoadBalancingPolicy * child ) { child_ = child ; }
private :
bool CalledByPendingFallback ( ) const ;
bool CalledByCurrentFallback ( ) const ;
RefCountedPtr < XdsLb > parent_ ;
LoadBalancingPolicy * child_ = nullptr ;
} ;
class LocalityMap {
public :
class LocalityEntry : public InternallyRefCounted < LocalityEntry > {
@ -308,7 +363,7 @@ class XdsLb : public LoadBalancingPolicy {
~ LocalityEntry ( ) = default ;
void UpdateLocked ( xds_grpclb_serverlist * serverlist ,
LoadBalancingPolicy : : Config * child_policy_config ,
Parsed LoadBalancingConfig* child_policy_config ,
const grpc_channel_args * args ) ;
void ShutdownLocked ( ) ;
void ResetBackoffLocked ( ) ;
@ -355,7 +410,7 @@ class XdsLb : public LoadBalancingPolicy {
} ;
void UpdateLocked ( const LocalityList & locality_list ,
LoadBalancingPolicy : : Config * child_policy_config ,
Parsed LoadBalancingConfig* child_policy_config ,
const grpc_channel_args * args , XdsLb * parent ) ;
void ShutdownLocked ( ) ;
void ResetBackoffLocked ( ) ;
@ -395,15 +450,20 @@ class XdsLb : public LoadBalancingPolicy {
// If parsing succeeds, updates \a balancer_name, and updates \a
// child_policy_config_ and \a fallback_policy_config_ if they are also
// found. Does nothing upon failure.
void ParseLbConfig ( Config * xds_config ) ;
void ParseLbConfig ( const ParsedXds Config* xds_config ) ;
BalancerChannelState * LatestLbChannel ( ) const {
return pending_lb_chand_ ! = nullptr ? pending_lb_chand_ . get ( )
: lb_chand_ . get ( ) ;
}
// Callback to enter fallback mode.
// Methods for dealing with fallback state.
void MaybeCancelFallbackAtStartupChecks ( ) ;
static void OnFallbackTimerLocked ( void * arg , grpc_error * error ) ;
void UpdateFallbackPolicyLocked ( ) ;
OrphanablePtr < LoadBalancingPolicy > CreateFallbackPolicyLocked (
const char * name , const grpc_channel_args * args ) ;
void MaybeExitFallbackMode ( ) ;
// Who the client is trying to communicate with.
const char * server_name_ = nullptr ;
@ -428,19 +488,35 @@ class XdsLb : public LoadBalancingPolicy {
// Timeout in milliseconds for the LB call. 0 means no deadline.
int lb_call_timeout_ms_ = 0 ;
// Whether the checks for fallback at startup are ALL pending. There are
// several cases where this can be reset:
// 1. The fallback timer fires, we enter fallback mode.
// 2. Before the fallback timer fires, the LB channel becomes
// TRANSIENT_FAILURE or the LB call fails, we enter fallback mode.
// 3. Before the fallback timer fires, we receive a response from the
// balancer, we cancel the fallback timer and use the response to update the
// locality map.
bool fallback_at_startup_checks_pending_ = false ;
// Timeout in milliseconds for before using fallback backend addresses.
// 0 means not using fallback.
RefCountedPtr < Config > fallback_policy_config_ ;
int lb_fallback_timeout_ms_ = 0 ;
// The backend addresses from the resolver.
UniquePtr < ServerAddressList > fallback_backend_addresses_ ;
ServerAddressList fallback_backend_addresses_ ;
// Fallback timer.
bool fallback_timer_callback_pending_ = false ;
grpc_timer lb_fallback_timer_ ;
grpc_closure lb_on_fallback_ ;
// The policy to use for the fallback backends.
RefCountedPtr < ParsedLoadBalancingConfig > fallback_policy_config_ ;
// Lock held when modifying the value of fallback_policy_ or
// pending_fallback_policy_.
Mutex fallback_policy_mu_ ;
// Non-null iff we are in fallback mode.
OrphanablePtr < LoadBalancingPolicy > fallback_policy_ ;
OrphanablePtr < LoadBalancingPolicy > pending_fallback_policy_ ;
// The policy to use for the backends.
RefCountedPtr < Config > child_policy_config_ ;
RefCountedPtr < ParsedLoadBalancing Config> child_policy_config_ ;
// Map of policies to use in the backend
LocalityMap locality_map_ ;
// TODO(mhaidry) : Add support for multiple maps of localities
@ -494,17 +570,90 @@ XdsLb::PickResult XdsLb::Picker::PickFromLocality(const uint32_t key,
return pickers_ [ index ] . second - > Pick ( pick , error ) ;
}
//
// XdsLb::FallbackHelper
//
bool XdsLb : : FallbackHelper : : CalledByPendingFallback ( ) const {
GPR_ASSERT ( child_ ! = nullptr ) ;
return child_ = = parent_ - > pending_fallback_policy_ . get ( ) ;
}
bool XdsLb : : FallbackHelper : : CalledByCurrentFallback ( ) const {
GPR_ASSERT ( child_ ! = nullptr ) ;
return child_ = = parent_ - > fallback_policy_ . get ( ) ;
}
Subchannel * XdsLb : : FallbackHelper : : CreateSubchannel (
const grpc_channel_args & args ) {
if ( parent_ - > shutting_down_ | |
( ! CalledByPendingFallback ( ) & & ! CalledByCurrentFallback ( ) ) ) {
return nullptr ;
}
return parent_ - > channel_control_helper ( ) - > CreateSubchannel ( args ) ;
}
grpc_channel * XdsLb : : FallbackHelper : : CreateChannel (
const char * target , const grpc_channel_args & args ) {
if ( parent_ - > shutting_down_ | |
( ! CalledByPendingFallback ( ) & & ! CalledByCurrentFallback ( ) ) ) {
return nullptr ;
}
return parent_ - > channel_control_helper ( ) - > CreateChannel ( target , args ) ;
}
void XdsLb : : FallbackHelper : : UpdateState ( grpc_connectivity_state state ,
UniquePtr < SubchannelPicker > picker ) {
if ( parent_ - > shutting_down_ ) return ;
// If this request is from the pending fallback policy, ignore it until
// it reports READY, at which point we swap it into place.
if ( CalledByPendingFallback ( ) ) {
if ( grpc_lb_xds_trace . enabled ( ) ) {
gpr_log (
GPR_INFO ,
" [xdslb %p helper %p] pending fallback policy %p reports state=%s " ,
parent_ . get ( ) , this , parent_ - > pending_fallback_policy_ . get ( ) ,
grpc_connectivity_state_name ( state ) ) ;
}
if ( state ! = GRPC_CHANNEL_READY ) return ;
grpc_pollset_set_del_pollset_set (
parent_ - > fallback_policy_ - > interested_parties ( ) ,
parent_ - > interested_parties ( ) ) ;
MutexLock lock ( & parent_ - > fallback_policy_mu_ ) ;
parent_ - > fallback_policy_ = std : : move ( parent_ - > pending_fallback_policy_ ) ;
} else if ( ! CalledByCurrentFallback ( ) ) {
// This request is from an outdated fallback policy, so ignore it.
return ;
}
parent_ - > channel_control_helper ( ) - > UpdateState ( state , std : : move ( picker ) ) ;
}
void XdsLb : : FallbackHelper : : RequestReresolution ( ) {
if ( parent_ - > shutting_down_ ) return ;
const LoadBalancingPolicy * latest_fallback_policy =
parent_ - > pending_fallback_policy_ ! = nullptr
? parent_ - > pending_fallback_policy_ . get ( )
: parent_ - > fallback_policy_ . get ( ) ;
if ( child_ ! = latest_fallback_policy ) return ;
if ( grpc_lb_xds_trace . enabled ( ) ) {
gpr_log ( GPR_INFO ,
" [xdslb %p] Re-resolution requested from the fallback policy (%p). " ,
parent_ . get ( ) , child_ ) ;
}
GPR_ASSERT ( parent_ - > lb_chand_ ! = nullptr ) ;
parent_ - > channel_control_helper ( ) - > RequestReresolution ( ) ;
}
//
// serverlist parsing code
//
// Returns the backend addresses extracted from the given addresses.
UniquePtr < ServerAddressList > ExtractBackendAddresses (
const ServerAddressList & addresses ) {
auto backend_addresses = MakeUnique < ServerAddressList > ( ) ;
ServerAddressList ExtractBackendAddresses ( const ServerAddressList & addresses ) {
ServerAddressList backend_addresses ;
for ( size_t i = 0 ; i < addresses . size ( ) ; + + i ) {
if ( ! addresses [ i ] . IsBalancer ( ) ) {
backend_addresses - > emplace_back ( addresses [ i ] ) ;
backend_addresses . emplace_back ( addresses [ i ] ) ;
}
}
return backend_addresses ;
@ -584,6 +733,9 @@ XdsLb::BalancerChannelState::BalancerChannelState(
. set_multiplier ( GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER )
. set_jitter ( GRPC_XDS_RECONNECT_JITTER )
. set_max_backoff ( GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS * 1000 ) ) {
GRPC_CLOSURE_INIT ( & on_connectivity_changed_ ,
& XdsLb : : BalancerChannelState : : OnConnectivityChangedLocked ,
this , grpc_combiner_scheduler ( xdslb_policy_ - > combiner ( ) ) ) ;
channel_ = xdslb_policy_ - > channel_control_helper ( ) - > CreateChannel (
balancer_name , args ) ;
GPR_ASSERT ( channel_ ! = nullptr ) ;
@ -652,6 +804,62 @@ void XdsLb::BalancerChannelState::StartCallLocked() {
lb_calld_ - > StartQuery ( ) ;
}
void XdsLb : : BalancerChannelState : : StartConnectivityWatchLocked ( ) {
grpc_channel_element * client_channel_elem =
grpc_channel_stack_last_element ( grpc_channel_get_channel_stack ( channel_ ) ) ;
GPR_ASSERT ( client_channel_elem - > filter = = & grpc_client_channel_filter ) ;
// Ref held by callback.
Ref ( DEBUG_LOCATION , " watch_lb_channel_connectivity " ) . release ( ) ;
grpc_client_channel_watch_connectivity_state (
client_channel_elem ,
grpc_polling_entity_create_from_pollset_set (
xdslb_policy_ - > interested_parties ( ) ) ,
& connectivity_ , & on_connectivity_changed_ , nullptr ) ;
}
void XdsLb : : BalancerChannelState : : CancelConnectivityWatchLocked ( ) {
grpc_channel_element * client_channel_elem =
grpc_channel_stack_last_element ( grpc_channel_get_channel_stack ( channel_ ) ) ;
GPR_ASSERT ( client_channel_elem - > filter = = & grpc_client_channel_filter ) ;
grpc_client_channel_watch_connectivity_state (
client_channel_elem ,
grpc_polling_entity_create_from_pollset_set (
xdslb_policy_ - > interested_parties ( ) ) ,
nullptr , & on_connectivity_changed_ , nullptr ) ;
}
void XdsLb : : BalancerChannelState : : OnConnectivityChangedLocked (
void * arg , grpc_error * error ) {
BalancerChannelState * self = static_cast < BalancerChannelState * > ( arg ) ;
if ( ! self - > shutting_down_ & &
self - > xdslb_policy_ - > fallback_at_startup_checks_pending_ ) {
if ( self - > connectivity_ ! = GRPC_CHANNEL_TRANSIENT_FAILURE ) {
// Not in TRANSIENT_FAILURE. Renew connectivity watch.
grpc_channel_element * client_channel_elem =
grpc_channel_stack_last_element (
grpc_channel_get_channel_stack ( self - > channel_ ) ) ;
GPR_ASSERT ( client_channel_elem - > filter = = & grpc_client_channel_filter ) ;
grpc_client_channel_watch_connectivity_state (
client_channel_elem ,
grpc_polling_entity_create_from_pollset_set (
self - > xdslb_policy_ - > interested_parties ( ) ) ,
& self - > connectivity_ , & self - > on_connectivity_changed_ , nullptr ) ;
return ; // Early out so we don't drop the ref below.
}
// In TRANSIENT_FAILURE. Cancel the fallback timer and go into
// fallback mode immediately.
gpr_log ( GPR_INFO ,
" [xdslb %p] Balancer channel in state TRANSIENT_FAILURE; "
" entering fallback mode " ,
self ) ;
self - > xdslb_policy_ - > fallback_at_startup_checks_pending_ = false ;
grpc_timer_cancel ( & self - > xdslb_policy_ - > lb_fallback_timer_ ) ;
self - > xdslb_policy_ - > UpdateFallbackPolicyLocked ( ) ;
}
// Done watching connectivity state, so drop ref.
self - > Unref ( DEBUG_LOCATION , " watch_lb_channel_connectivity " ) ;
}
//
// XdsLb::BalancerChannelState::BalancerCallState
//
@ -897,6 +1105,14 @@ void XdsLb::BalancerChannelState::BalancerCallState::
( initial_response = xds_grpclb_initial_response_parse ( response_slice ) ) ! =
nullptr ) {
// Have NOT seen initial response, look for initial response.
// TODO(juanlishen): When we convert this to use the xds protocol, the
// balancer will send us a fallback timeout such that we should go into
// fallback mode if we have lost contact with the balancer after a certain
// period of time. We will need to save the timeout value here, and then
// when the balancer call ends, we will need to start a timer for the
// specified period of time, and if the timer fires, we go into fallback
// mode. We will also need to cancel the timer when we receive a serverlist
// from the balancer.
if ( initial_response - > has_client_stats_report_interval ) {
const grpc_millis interval = xds_grpclb_duration_to_millis (
& initial_response - > client_stats_report_interval ) ;
@ -938,9 +1154,6 @@ void XdsLb::BalancerChannelState::BalancerCallState::
gpr_free ( ipport ) ;
}
}
/* update serverlist */
// TODO(juanlishen): Don't ingore empty serverlist.
if ( serverlist - > num_servers > 0 ) {
// Pending LB channel receives a serverlist; promote it.
// Note that this call can't be on a discarded pending channel, because
// such channels don't have any current call but we have checked this call
@ -961,11 +1174,7 @@ void XdsLb::BalancerChannelState::BalancerCallState::
if ( lb_calld - > client_stats_report_interval_ > 0 & &
lb_calld - > client_stats_ = = nullptr ) {
lb_calld - > client_stats_ = MakeRefCounted < XdsLbClientStats > ( ) ;
// TODO(roth): We currently track this ref manually. Once the
// ClosureRef API is ready, we should pass the RefCountedPtr<> along
// with the callback.
auto self = lb_calld - > Ref ( DEBUG_LOCATION , " client_load_report " ) ;
self . release ( ) ;
lb_calld - > Ref ( DEBUG_LOCATION , " client_load_report " ) . release ( ) ;
lb_calld - > ScheduleNextClientLoadReportLocked ( ) ;
}
if ( ! xdslb_policy - > locality_serverlist_ . empty ( ) & &
@ -978,19 +1187,21 @@ void XdsLb::BalancerChannelState::BalancerCallState::
xdslb_policy ) ;
}
xds_grpclb_destroy_serverlist ( serverlist ) ;
} else { /* new serverlist */
} else { // New serverlist.
// If the balancer tells us to drop all the calls, we should exit fallback
// mode immediately.
// TODO(juanlishen): When we add EDS drop, we should change to check
// drop_percentage.
if ( serverlist - > num_servers = = 0 ) xdslb_policy - > MaybeExitFallbackMode ( ) ;
if ( ! xdslb_policy - > locality_serverlist_ . empty ( ) ) {
/* dispose of the old serverlist */
xds_grpclb_destroy_serverlist (
xdslb_policy - > locality_serverlist_ [ 0 ] - > serverlist ) ;
} else {
/* or dispose of the fallback */
xdslb_policy - > fallback_backend_addresses_ . reset ( ) ;
if ( xdslb_policy - > fallback_timer_callback_pending_ ) {
grpc_timer_cancel ( & xdslb_policy - > lb_fallback_timer_ ) ;
}
/* Initialize locality serverlist, currently the list only handles
* one child */
// This is the first serverlist we've received, don't enter fallback
// mode.
xdslb_policy - > MaybeCancelFallbackAtStartupChecks ( ) ;
// Initialize locality serverlist, currently the list only handles
// one child.
xdslb_policy - > locality_serverlist_ . emplace_back (
MakeUnique < LocalityServerlistEntry > ( ) ) ;
xdslb_policy - > locality_serverlist_ [ 0 ] - > locality_name =
@ -998,22 +1209,15 @@ void XdsLb::BalancerChannelState::BalancerCallState::
xdslb_policy - > locality_serverlist_ [ 0 ] - > locality_weight =
kDefaultLocalityWeight ;
}
// and update the copy in the XdsLb instance. This
// serverlist instance will be destroyed either upon the next
// update or when the XdsLb instance is destroyed.
// Update the serverlist in the XdsLb instance. This serverlist
// instance will be destroyed either upon the next update or when the
// XdsLb instance is destroyed.
xdslb_policy - > locality_serverlist_ [ 0 ] - > serverlist = serverlist ;
xdslb_policy - > locality_map_ . UpdateLocked (
xdslb_policy - > locality_serverlist_ ,
xdslb_policy - > child_policy_config_ . get ( ) , xdslb_policy - > args_ ,
xdslb_policy ) ;
}
} else {
if ( grpc_lb_xds_trace . enabled ( ) ) {
gpr_log ( GPR_INFO , " [xdslb %p] Received empty server list, ignoring. " ,
xdslb_policy ) ;
}
xds_grpclb_destroy_serverlist ( serverlist ) ;
}
} else {
// No valid initial response or serverlist found.
char * response_slice_str =
@ -1089,6 +1293,18 @@ void XdsLb::BalancerChannelState::BalancerCallState::
lb_chand - > StartCallRetryTimerLocked ( ) ;
}
xdslb_policy - > channel_control_helper ( ) - > RequestReresolution ( ) ;
// If the fallback-at-startup checks are pending, go into fallback mode
// immediately. This short-circuits the timeout for the
// fallback-at-startup case.
if ( xdslb_policy - > fallback_at_startup_checks_pending_ ) {
gpr_log ( GPR_INFO ,
" [xdslb %p] Balancer call finished; entering fallback mode " ,
xdslb_policy ) ;
xdslb_policy - > fallback_at_startup_checks_pending_ = false ;
grpc_timer_cancel ( & xdslb_policy - > lb_fallback_timer_ ) ;
lb_chand - > CancelConnectivityWatchLocked ( ) ;
xdslb_policy - > UpdateFallbackPolicyLocked ( ) ;
}
}
}
lb_calld - > Unref ( DEBUG_LOCATION , " lb_call_ended " ) ;
@ -1164,7 +1380,7 @@ XdsLb::XdsLb(Args args)
arg = grpc_channel_args_find ( args . args , GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS ) ;
lb_call_timeout_ms_ = grpc_channel_arg_get_integer ( arg , { 0 , 0 , INT_MAX } ) ;
// Record fallback timeout.
arg = grpc_channel_args_find ( args . args , GRPC_ARG_GRPCLB _FALLBACK_TIMEOUT_MS ) ;
arg = grpc_channel_args_find ( args . args , GRPC_ARG_XDS _FALLBACK_TIMEOUT_MS ) ;
lb_fallback_timeout_ms_ = grpc_channel_arg_get_integer (
arg , { GRPC_XDS_DEFAULT_FALLBACK_TIMEOUT_MS , 0 , INT_MAX } ) ;
}
@ -1177,14 +1393,25 @@ XdsLb::~XdsLb() {
void XdsLb : : ShutdownLocked ( ) {
shutting_down_ = true ;
if ( fallback_timer_callback _pending_ ) {
if ( fallback_at_startup_checks _pending_ ) {
grpc_timer_cancel ( & lb_fallback_timer_ ) ;
}
locality_map_ . ShutdownLocked ( ) ;
// We destroy the LB channel here instead of in our destructor because
// destroying the channel triggers a last callback to
// OnBalancerChannelConnectivityChangedLocked(), and we need to be
// alive when that callback is invoked.
if ( fallback_policy_ ! = nullptr ) {
grpc_pollset_set_del_pollset_set ( fallback_policy_ - > interested_parties ( ) ,
interested_parties ( ) ) ;
}
if ( pending_fallback_policy_ ! = nullptr ) {
grpc_pollset_set_del_pollset_set (
pending_fallback_policy_ - > interested_parties ( ) , interested_parties ( ) ) ;
}
{
MutexLock lock ( & fallback_policy_mu_ ) ;
fallback_policy_ . reset ( ) ;
pending_fallback_policy_ . reset ( ) ;
}
// We reset the LB channels here instead of in our destructor because they
// hold refs to XdsLb.
{
MutexLock lock ( & lb_chand_mu_ ) ;
lb_chand_ . reset ( ) ;
@ -1204,12 +1431,31 @@ void XdsLb::ResetBackoffLocked() {
grpc_channel_reset_connect_backoff ( pending_lb_chand_ - > channel ( ) ) ;
}
locality_map_ . ResetBackoffLocked ( ) ;
if ( fallback_policy_ ! = nullptr ) {
fallback_policy_ - > ResetBackoffLocked ( ) ;
}
if ( pending_fallback_policy_ ! = nullptr ) {
pending_fallback_policy_ - > ResetBackoffLocked ( ) ;
}
}
void XdsLb : : FillChildRefsForChannelz ( channelz : : ChildRefsList * child_subchannels ,
channelz : : ChildRefsList * child_channels ) {
// Delegate to the child_policy_ to fill the children subchannels.
// Delegate to the locality_map _ to fill the children subchannels.
locality_map_ . FillChildRefsForChannelz ( child_subchannels , child_channels ) ;
{
// This must be done holding fallback_policy_mu_, since this method does not
// run in the combiner.
MutexLock lock ( & fallback_policy_mu_ ) ;
if ( fallback_policy_ ! = nullptr ) {
fallback_policy_ - > FillChildRefsForChannelz ( child_subchannels ,
child_channels ) ;
}
if ( pending_fallback_policy_ ! = nullptr ) {
pending_fallback_policy_ - > FillChildRefsForChannelz ( child_subchannels ,
child_channels ) ;
}
}
MutexLock lock ( & lb_chand_mu_ ) ;
if ( lb_chand_ ! = nullptr ) {
grpc_core : : channelz : : ChannelNode * channel_node =
@ -1266,92 +1512,224 @@ void XdsLb::ProcessAddressesAndChannelArgsLocked(
grpc_channel_args_destroy ( lb_channel_args ) ;
}
void XdsLb : : ParseLbConfig ( Config * xds_config ) {
const grpc_json * xds_config_json = xds_config - > config ( ) ;
const char * balancer_name = nullptr ;
grpc_json * child_policy = nullptr ;
grpc_json * fallback_policy = nullptr ;
for ( const grpc_json * field = xds_config_json ; field ! = nullptr ;
field = field - > next ) {
if ( field - > key = = nullptr ) return ;
if ( strcmp ( field - > key , " balancerName " ) = = 0 ) {
if ( balancer_name ! = nullptr ) return ; // Duplicate.
if ( field - > type ! = GRPC_JSON_STRING ) return ;
balancer_name = field - > value ;
} else if ( strcmp ( field - > key , " childPolicy " ) = = 0 ) {
if ( child_policy ! = nullptr ) return ; // Duplicate.
child_policy = ParseLoadBalancingConfig ( field ) ;
} else if ( strcmp ( field - > key , " fallbackPolicy " ) = = 0 ) {
if ( fallback_policy ! = nullptr ) return ; // Duplicate.
fallback_policy = ParseLoadBalancingConfig ( field ) ;
}
}
if ( balancer_name = = nullptr ) return ; // Required field.
balancer_name_ = UniquePtr < char > ( gpr_strdup ( balancer_name ) ) ;
if ( child_policy ! = nullptr ) {
child_policy_config_ =
MakeRefCounted < Config > ( child_policy , xds_config - > service_config ( ) ) ;
}
if ( fallback_policy ! = nullptr ) {
fallback_policy_config_ =
MakeRefCounted < Config > ( fallback_policy , xds_config - > service_config ( ) ) ;
}
void XdsLb : : ParseLbConfig ( const ParsedXdsConfig * xds_config ) {
if ( xds_config = = nullptr | | xds_config - > balancer_name ( ) = = nullptr ) return ;
// TODO(yashykt) : does this need to be a gpr_strdup
balancer_name_ = UniquePtr < char > ( gpr_strdup ( xds_config - > balancer_name ( ) ) ) ;
child_policy_config_ = xds_config - > child_policy ( ) ;
fallback_policy_config_ = xds_config - > fallback_policy ( ) ;
}
void XdsLb : : UpdateLocked ( UpdateArgs args ) {
const bool is_initial_update = lb_chand_ = = nullptr ;
ParseLbConfig ( args . config . get ( ) ) ;
// TODO(juanlishen): Pass fallback policy config update after fallback policy
// is added.
ParseLbConfig ( static_cast < const ParsedXdsConfig * > ( args . config . get ( ) ) ) ;
if ( balancer_name_ = = nullptr ) {
gpr_log ( GPR_ERROR , " [xdslb %p] LB config parsing fails. " , this ) ;
return ;
}
ProcessAddressesAndChannelArgsLocked ( args . addresses , * args . args ) ;
// Update the existing child policy.
// Note: We have disabled fallback mode in the code, so this child policy must
// have been created from a serverlist.
// TODO(vpowar): Handle the fallback_address changes when we add support for
// fallback in xDS.
locality_map_ . UpdateLocked ( locality_serverlist_ , child_policy_config_ . get ( ) ,
args_ , this ) ;
// If this is the initial update, start the fallback timer.
// Update the existing fallback policy. The fallback policy config and/or the
// fallback addresses may be new.
if ( fallback_policy_ ! = nullptr ) UpdateFallbackPolicyLocked ( ) ;
// If this is the initial update, start the fallback-at-startup checks.
if ( is_initial_update ) {
if ( lb_fallback_timeout_ms_ > 0 & & locality_serverlist_ . empty ( ) & &
! fallback_timer_callback_pending_ ) {
grpc_millis deadline = ExecCtx : : Get ( ) - > Now ( ) + lb_fallback_timeout_ms_ ;
Ref ( DEBUG_LOCATION , " on_fallback_timer " ) . release ( ) ; // Held by closure
GRPC_CLOSURE_INIT ( & lb_on_fallback_ , & XdsLb : : OnFallbackTimerLocked , this ,
grpc_combiner_scheduler ( combiner ( ) ) ) ;
fallback_timer_callback _pending_ = true ;
fallback_at_startup_checks_pending_ = true ;
grpc_timer_init ( & lb_fallback_timer_ , deadline , & lb_on_fallback_ ) ;
// TODO(juanlishen): Monitor the connectivity state of the balancer
// channel. If the channel reports TRANSIENT_FAILURE before the
// fallback timeout expires, go into fallback mode early .
}
// Start watching the channel's connectivity state. If the channel
// goes into state TRANSIENT_FAILURE, we go into fallback mode even if
// the fallback timeout has not elapsed .
lb_chand_ - > StartConnectivityWatchLocked ( ) ;
}
}
//
// code for balancer channel and call
// fallback-related methods
//
void XdsLb : : MaybeCancelFallbackAtStartupChecks ( ) {
if ( ! fallback_at_startup_checks_pending_ ) return ;
gpr_log ( GPR_INFO ,
" [xdslb %p] Cancelling fallback timer and LB channel connectivity "
" watch " ,
this ) ;
grpc_timer_cancel ( & lb_fallback_timer_ ) ;
lb_chand_ - > CancelConnectivityWatchLocked ( ) ;
fallback_at_startup_checks_pending_ = false ;
}
void XdsLb : : OnFallbackTimerLocked ( void * arg , grpc_error * error ) {
XdsLb * xdslb_policy = static_cast < XdsLb * > ( arg ) ;
xdslb_policy - > fallback_timer_callback_pending_ = false ;
// If we receive a serverlist after the timer fires but before this callback
// actually runs, don't fall back.
if ( xdslb_policy - > locality_serverlist_ . empty ( ) & &
// If some fallback-at-startup check is done after the timer fires but before
// this callback actually runs, don't fall back.
if ( xdslb_policy - > fallback_at_startup_checks_pending_ & &
! xdslb_policy - > shutting_down_ & & error = = GRPC_ERROR_NONE ) {
if ( grpc_lb_xds_trace . enabled ( ) ) {
gpr_log ( GPR_INFO ,
" [xdslb %p] Fallback timer fired. Not using fallback backends " ,
" [xdslb %p] Child policy not ready after fallback timeout; "
" entering fallback mode " ,
xdslb_policy ) ;
}
xdslb_policy - > fallback_at_startup_checks_pending_ = false ;
xdslb_policy - > UpdateFallbackPolicyLocked ( ) ;
xdslb_policy - > lb_chand_ - > CancelConnectivityWatchLocked ( ) ;
}
xdslb_policy - > Unref ( DEBUG_LOCATION , " on_fallback_timer " ) ;
}
void XdsLb : : UpdateFallbackPolicyLocked ( ) {
if ( shutting_down_ ) return ;
// Construct update args.
UpdateArgs update_args ;
update_args . addresses = fallback_backend_addresses_ ;
update_args . config = fallback_policy_config_ = = nullptr
? nullptr
: fallback_policy_config_ - > Ref ( ) ;
update_args . args = grpc_channel_args_copy ( args_ ) ;
// If the child policy name changes, we need to create a new child
// policy. When this happens, we leave child_policy_ as-is and store
// the new child policy in pending_child_policy_. Once the new child
// policy transitions into state READY, we swap it into child_policy_,
// replacing the original child policy. So pending_child_policy_ is
// non-null only between when we apply an update that changes the child
// policy name and when the new child reports state READY.
//
// Updates can arrive at any point during this transition. We always
// apply updates relative to the most recently created child policy,
// even if the most recent one is still in pending_child_policy_. This
// is true both when applying the updates to an existing child policy
// and when determining whether we need to create a new policy.
//
// As a result of this, there are several cases to consider here:
//
// 1. We have no existing child policy (i.e., we have started up but
// have not yet received a serverlist from the balancer or gone
// into fallback mode; in this case, both child_policy_ and
// pending_child_policy_ are null). In this case, we create a
// new child policy and store it in child_policy_.
//
// 2. We have an existing child policy and have no pending child policy
// from a previous update (i.e., either there has not been a
// previous update that changed the policy name, or we have already
// finished swapping in the new policy; in this case, child_policy_
// is non-null but pending_child_policy_ is null). In this case:
// a. If child_policy_->name() equals child_policy_name, then we
// update the existing child policy.
// b. If child_policy_->name() does not equal child_policy_name,
// we create a new policy. The policy will be stored in
// pending_child_policy_ and will later be swapped into
// child_policy_ by the helper when the new child transitions
// into state READY.
//
// 3. We have an existing child policy and have a pending child policy
// from a previous update (i.e., a previous update set
// pending_child_policy_ as per case 2b above and that policy has
// not yet transitioned into state READY and been swapped into
// child_policy_; in this case, both child_policy_ and
// pending_child_policy_ are non-null). In this case:
// a. If pending_child_policy_->name() equals child_policy_name,
// then we update the existing pending child policy.
// b. If pending_child_policy->name() does not equal
// child_policy_name, then we create a new policy. The new
// policy is stored in pending_child_policy_ (replacing the one
// that was there before, which will be immediately shut down)
// and will later be swapped into child_policy_ by the helper
// when the new child transitions into state READY.
const char * fallback_policy_name = fallback_policy_config_ = = nullptr
? " round_robin "
: fallback_policy_config_ - > name ( ) ;
const bool create_policy =
// case 1
fallback_policy_ = = nullptr | |
// case 2b
( pending_fallback_policy_ = = nullptr & &
strcmp ( fallback_policy_ - > name ( ) , fallback_policy_name ) ! = 0 ) | |
// case 3b
( pending_fallback_policy_ ! = nullptr & &
strcmp ( pending_fallback_policy_ - > name ( ) , fallback_policy_name ) ! = 0 ) ;
LoadBalancingPolicy * policy_to_update = nullptr ;
if ( create_policy ) {
// Cases 1, 2b, and 3b: create a new child policy.
// If child_policy_ is null, we set it (case 1), else we set
// pending_child_policy_ (cases 2b and 3b).
if ( grpc_lb_xds_trace . enabled ( ) ) {
gpr_log ( GPR_INFO , " [xdslb %p] Creating new %sfallback policy %s " , this ,
fallback_policy_ = = nullptr ? " " : " pending " ,
fallback_policy_name ) ;
}
auto new_policy =
CreateFallbackPolicyLocked ( fallback_policy_name , update_args . args ) ;
auto & lb_policy = fallback_policy_ = = nullptr ? fallback_policy_
: pending_fallback_policy_ ;
{
MutexLock lock ( & fallback_policy_mu_ ) ;
lb_policy = std : : move ( new_policy ) ;
}
policy_to_update = lb_policy . get ( ) ;
} else {
// Cases 2a and 3a: update an existing policy.
// If we have a pending child policy, send the update to the pending
// policy (case 3a), else send it to the current policy (case 2a).
policy_to_update = pending_fallback_policy_ ! = nullptr
? pending_fallback_policy_ . get ( )
: fallback_policy_ . get ( ) ;
}
GPR_ASSERT ( policy_to_update ! = nullptr ) ;
// Update the policy.
if ( grpc_lb_xds_trace . enabled ( ) ) {
gpr_log (
GPR_INFO , " [xdslb %p] Updating %sfallback policy %p " , this ,
policy_to_update = = pending_fallback_policy_ . get ( ) ? " pending " : " " ,
policy_to_update ) ;
}
policy_to_update - > UpdateLocked ( std : : move ( update_args ) ) ;
}
OrphanablePtr < LoadBalancingPolicy > XdsLb : : CreateFallbackPolicyLocked (
const char * name , const grpc_channel_args * args ) {
FallbackHelper * helper = New < FallbackHelper > ( Ref ( ) ) ;
LoadBalancingPolicy : : Args lb_policy_args ;
lb_policy_args . combiner = combiner ( ) ;
lb_policy_args . args = args ;
lb_policy_args . channel_control_helper =
UniquePtr < ChannelControlHelper > ( helper ) ;
OrphanablePtr < LoadBalancingPolicy > lb_policy =
LoadBalancingPolicyRegistry : : CreateLoadBalancingPolicy (
name , std : : move ( lb_policy_args ) ) ;
if ( GPR_UNLIKELY ( lb_policy = = nullptr ) ) {
gpr_log ( GPR_ERROR , " [xdslb %p] Failure creating fallback policy %s " , this ,
name ) ;
return nullptr ;
}
helper - > set_child ( lb_policy . get ( ) ) ;
if ( grpc_lb_xds_trace . enabled ( ) ) {
gpr_log ( GPR_INFO , " [xdslb %p] Created new fallback policy %s (%p) " , this ,
name , lb_policy . get ( ) ) ;
}
// Add the xDS's interested_parties pollset_set to that of the newly created
// child policy. This will make the child policy progress upon activity on xDS
// LB, which in turn is tied to the application's call.
grpc_pollset_set_add_pollset_set ( lb_policy - > interested_parties ( ) ,
interested_parties ( ) ) ;
return lb_policy ;
}
void XdsLb : : MaybeExitFallbackMode ( ) {
if ( fallback_policy_ = = nullptr ) return ;
gpr_log ( GPR_INFO , " [xdslb %p] Exiting fallback mode " , this ) ;
fallback_policy_ . reset ( ) ;
pending_fallback_policy_ . reset ( ) ;
}
//
// XdsLb::LocalityMap
//
void XdsLb : : LocalityMap : : PruneLocalities ( const LocalityList & locality_list ) {
for ( auto iter = map_ . begin ( ) ; iter ! = map_ . end ( ) ; ) {
bool found = false ;
@ -1370,7 +1748,7 @@ void XdsLb::LocalityMap::PruneLocalities(const LocalityList& locality_list) {
void XdsLb : : LocalityMap : : UpdateLocked (
const LocalityList & locality_serverlist ,
LoadBalancingPolicy : : Config * child_policy_config ,
Parsed LoadBalancingConfig* child_policy_config ,
const grpc_channel_args * args , XdsLb * parent ) {
if ( parent - > shutting_down_ ) return ;
for ( size_t i = 0 ; i < locality_serverlist . size ( ) ; i + + ) {
@ -1391,18 +1769,18 @@ void XdsLb::LocalityMap::UpdateLocked(
PruneLocalities ( locality_serverlist ) ;
}
void grpc_core : : XdsLb : : LocalityMap : : ShutdownLocked ( ) {
void XdsLb : : LocalityMap : : ShutdownLocked ( ) {
MutexLock lock ( & child_refs_mu_ ) ;
map_ . clear ( ) ;
}
void grpc_core : : XdsLb : : LocalityMap : : ResetBackoffLocked ( ) {
void XdsLb : : LocalityMap : : ResetBackoffLocked ( ) {
for ( auto & p : map_ ) {
p . second - > ResetBackoffLocked ( ) ;
}
}
void grpc_core : : XdsLb : : LocalityMap : : FillChildRefsForChannelz (
void XdsLb : : LocalityMap : : FillChildRefsForChannelz (
channelz : : ChildRefsList * child_subchannels ,
channelz : : ChildRefsList * child_channels ) {
MutexLock lock ( & child_refs_mu_ ) ;
@ -1411,7 +1789,9 @@ void grpc_core::XdsLb::LocalityMap::FillChildRefsForChannelz(
}
}
// Locality Entry child policy methods
//
// XdsLb::LocalityMap::LocalityEntry
//
grpc_channel_args *
XdsLb : : LocalityMap : : LocalityEntry : : CreateChildPolicyArgsLocked (
@ -1463,21 +1843,15 @@ XdsLb::LocalityMap::LocalityEntry::CreateChildPolicyLocked(
void XdsLb : : LocalityMap : : LocalityEntry : : UpdateLocked (
xds_grpclb_serverlist * serverlist ,
LoadBalancingPolicy : : Config * child_policy_config ,
Parsed LoadBalancingConfig* child_policy_config ,
const grpc_channel_args * args_in ) {
if ( parent_ - > shutting_down_ ) return ;
// This should never be invoked if we do not have serverlist_, as fallback
// mode is disabled for xDS plugin.
// TODO(juanlishen): Change this as part of implementing fallback mode.
GPR_ASSERT ( serverlist ! = nullptr ) ;
GPR_ASSERT ( serverlist - > num_servers > 0 ) ;
// Construct update args.
UpdateArgs update_args ;
update_args . addresses = ProcessServerlist ( serverlist ) ;
update_args . config =
child_policy_config = = nullptr ? nullptr : child_policy_config - > Ref ( ) ;
update_args . args = CreateChildPolicyArgsLocked ( args_in ) ;
// If the child policy name changes, we need to create a new child
// policy. When this happens, we leave child_policy_ as-is and store
// the new child policy in pending_child_policy_. Once the new child
@ -1618,7 +1992,7 @@ void XdsLb::LocalityMap::LocalityEntry::Orphan() {
}
//
// LocalityEntry::Helper implementation
// XdsLb:: LocalityEntry::Helper
//
bool XdsLb : : LocalityMap : : LocalityEntry : : Helper : : CalledByPendingChild ( ) const {
@ -1671,9 +2045,10 @@ void XdsLb::LocalityMap::LocalityEntry::Helper::UpdateState(
// This request is from an outdated child, so ignore it.
return ;
}
// TODO(juanlishen): When in fallback mode, pass the child picker
// through without wrapping it. (Or maybe use a different helper for
// the fallback policy?)
// At this point, child_ must be the current child policy.
if ( state = = GRPC_CHANNEL_READY ) entry_ - > parent_ - > MaybeExitFallbackMode ( ) ;
// If we are in fallback mode, ignore update request from the child policy.
if ( entry_ - > parent_ - > fallback_policy_ ! = nullptr ) return ;
GPR_ASSERT ( entry_ - > parent_ - > lb_chand_ ! = nullptr ) ;
RefCountedPtr < XdsLbClientStats > client_stats =
entry_ - > parent_ - > lb_chand_ - > lb_calld ( ) = = nullptr
@ -1783,6 +2158,77 @@ class XdsFactory : public LoadBalancingPolicyFactory {
}
const char * name ( ) const override { return kXds ; }
RefCountedPtr < ParsedLoadBalancingConfig > ParseLoadBalancingConfig (
const grpc_json * json , grpc_error * * error ) const override {
GPR_DEBUG_ASSERT ( error ! = nullptr & & * error = = GRPC_ERROR_NONE ) ;
if ( json = = nullptr ) {
// xds was mentioned as a policy in the deprecated loadBalancingPolicy
// field or in the client API.
* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING (
" field:loadBalancingPolicy error:Xds Parser has required field - "
" balancerName. Please use loadBalancingConfig field of service "
" config instead. " ) ;
return nullptr ;
}
GPR_DEBUG_ASSERT ( strcmp ( json - > key , name ( ) ) = = 0 ) ;
InlinedVector < grpc_error * , 3 > error_list ;
const char * balancer_name = nullptr ;
RefCountedPtr < ParsedLoadBalancingConfig > child_policy ;
RefCountedPtr < ParsedLoadBalancingConfig > fallback_policy ;
for ( const grpc_json * field = json - > child ; field ! = nullptr ;
field = field - > next ) {
if ( field - > key = = nullptr ) continue ;
if ( strcmp ( field - > key , " balancerName " ) = = 0 ) {
if ( balancer_name ! = nullptr ) {
error_list . push_back ( GRPC_ERROR_CREATE_FROM_STATIC_STRING (
" field:balancerName error:Duplicate entry " ) ) ;
}
if ( field - > type ! = GRPC_JSON_STRING ) {
error_list . push_back ( GRPC_ERROR_CREATE_FROM_STATIC_STRING (
" field:balancerName error:type should be string " ) ) ;
continue ;
}
balancer_name = field - > value ;
} else if ( strcmp ( field - > key , " childPolicy " ) = = 0 ) {
if ( child_policy ! = nullptr ) {
error_list . push_back ( GRPC_ERROR_CREATE_FROM_STATIC_STRING (
" field:childPolicy error:Duplicate entry " ) ) ;
}
grpc_error * parse_error = GRPC_ERROR_NONE ;
child_policy = LoadBalancingPolicyRegistry : : ParseLoadBalancingConfig (
field , & parse_error ) ;
if ( child_policy = = nullptr ) {
GPR_DEBUG_ASSERT ( parse_error ! = GRPC_ERROR_NONE ) ;
error_list . push_back ( parse_error ) ;
}
} else if ( strcmp ( field - > key , " fallbackPolicy " ) = = 0 ) {
if ( fallback_policy ! = nullptr ) {
error_list . push_back ( GRPC_ERROR_CREATE_FROM_STATIC_STRING (
" field:fallbackPolicy error:Duplicate entry " ) ) ;
}
grpc_error * parse_error = GRPC_ERROR_NONE ;
fallback_policy = LoadBalancingPolicyRegistry : : ParseLoadBalancingConfig (
field , & parse_error ) ;
if ( fallback_policy = = nullptr ) {
GPR_DEBUG_ASSERT ( parse_error ! = GRPC_ERROR_NONE ) ;
error_list . push_back ( parse_error ) ;
}
}
}
if ( balancer_name = = nullptr ) {
error_list . push_back ( GRPC_ERROR_CREATE_FROM_STATIC_STRING (
" field:balancerName error:not found " ) ) ;
}
if ( error_list . empty ( ) ) {
return RefCountedPtr < ParsedLoadBalancingConfig > ( New < ParsedXdsConfig > (
balancer_name , std : : move ( child_policy ) , std : : move ( fallback_policy ) ) ) ;
} else {
* error = GRPC_ERROR_CREATE_FROM_VECTOR ( " Xds Parser " , & error_list ) ;
return nullptr ;
}
}
} ;
} // namespace