@ -68,7 +68,9 @@
# include <grpc/support/string_util.h>
# include <grpc/support/time.h>
# include "include/grpc/support/alloc.h"
# include "src/core/ext/filters/client_channel/client_channel.h"
# include "src/core/ext/filters/client_channel/lb_policy.h"
# include "src/core/ext/filters/client_channel/lb_policy/xds/xds.h"
# include "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.h"
# include "src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.h"
@ -78,12 +80,14 @@
# include "src/core/ext/filters/client_channel/parse_address.h"
# include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
# include "src/core/ext/filters/client_channel/server_address.h"
# include "src/core/ext/filters/client_channel/service_config.h"
# include "src/core/lib/backoff/backoff.h"
# include "src/core/lib/channel/channel_args.h"
# include "src/core/lib/channel/channel_stack.h"
# include "src/core/lib/gpr/host_port.h"
# include "src/core/lib/gpr/string.h"
# include "src/core/lib/gprpp/manual_constructor.h"
# include "src/core/lib/gprpp/map.h"
# include "src/core/lib/gprpp/memory.h"
# include "src/core/lib/gprpp/mutex_lock.h"
# include "src/core/lib/gprpp/orphanable.h"
@ -98,7 +102,6 @@
# include "src/core/lib/surface/call.h"
# include "src/core/lib/surface/channel.h"
# include "src/core/lib/surface/channel_init.h"
# include "src/core/lib/transport/service_config.h"
# include "src/core/lib/transport/static_metadata.h"
# define GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS 1
@ -114,6 +117,7 @@ TraceFlag grpc_lb_xds_trace(false, "xds");
namespace {
constexpr char kXds [ ] = " xds_experimental " ;
constexpr char kDefaultLocalityName [ ] = " xds_default_locality " ;
class XdsLb : public LoadBalancingPolicy {
public :
@ -121,14 +125,16 @@ class XdsLb : public LoadBalancingPolicy {
const char * name ( ) const override { return kXds ; }
void UpdateLocked ( const grpc_channel_args & args ,
RefCountedPtr < Config > lb_config ) override ;
void UpdateLocked ( UpdateArgs args ) override ;
void ResetBackoffLocked ( ) override ;
void FillChildRefsForChannelz (
channelz : : ChildRefsList * child_subchannels ,
channelz : : ChildRefsList * child_channels ) override ;
private :
struct LocalityServerlistEntry ;
using LocalityList = InlinedVector < UniquePtr < LocalityServerlistEntry > , 1 > ;
/// Contains a channel to the LB server and all the data related to the
/// channel.
class BalancerChannelState
@ -267,25 +273,88 @@ class XdsLb : public LoadBalancingPolicy {
RefCountedPtr < XdsLbClientStats > client_stats_ ;
} ;
class Helper : public ChannelControlHelper {
class LocalityMap {
public :
explicit Helper ( RefCountedPtr < XdsLb > parent ) : parent_ ( std : : move ( parent ) ) { }
class LocalityEntry : public InternallyRefCounted < LocalityEntry > {
public :
explicit LocalityEntry ( RefCountedPtr < XdsLb > parent )
: parent_ ( std : : move ( parent ) ) {
gpr_mu_init ( & child_policy_mu_ ) ;
}
~ LocalityEntry ( ) { gpr_mu_destroy ( & child_policy_mu_ ) ; }
void UpdateLocked ( xds_grpclb_serverlist * serverlist ,
LoadBalancingPolicy : : Config * child_policy_config ,
const grpc_channel_args * args ) ;
void ShutdownLocked ( ) ;
void ResetBackoffLocked ( ) ;
void FillChildRefsForChannelz ( channelz : : ChildRefsList * child_subchannels ,
channelz : : ChildRefsList * child_channels ) ;
void Orphan ( ) override ;
private :
class Helper : public ChannelControlHelper {
public :
explicit Helper ( RefCountedPtr < LocalityEntry > entry )
: entry_ ( std : : move ( entry ) ) { }
Subchannel * CreateSubchannel ( const grpc_channel_args & args ) override ;
grpc_channel * CreateChannel ( const char * target ,
const grpc_channel_args & args ) override ;
void UpdateState ( grpc_connectivity_state state , grpc_error * state_error ,
UniquePtr < SubchannelPicker > picker ) override ;
void RequestReresolution ( ) override ;
void set_child ( LoadBalancingPolicy * child ) { child_ = child ; }
private :
bool CalledByPendingChild ( ) const ;
bool CalledByCurrentChild ( ) const ;
RefCountedPtr < LocalityEntry > entry_ ;
LoadBalancingPolicy * child_ = nullptr ;
} ;
// Methods for dealing with the child policy.
OrphanablePtr < LoadBalancingPolicy > CreateChildPolicyLocked (
const char * name , const grpc_channel_args * args ) ;
grpc_channel_args * CreateChildPolicyArgsLocked (
const grpc_channel_args * args ) ;
OrphanablePtr < LoadBalancingPolicy > child_policy_ ;
OrphanablePtr < LoadBalancingPolicy > pending_child_policy_ ;
// Lock held when modifying the value of child_policy_ or
// pending_child_policy_.
gpr_mu child_policy_mu_ ;
RefCountedPtr < XdsLb > parent_ ;
} ;
Subchannel * CreateSubchannel ( const grpc_channel_args & args ) override ;
grpc_channel * CreateChannel ( const char * target ,
const grpc_channel_args & args ) override ;
void UpdateState ( grpc_connectivity_state state , grpc_error * state_error ,
UniquePtr < SubchannelPicker > picker ) override ;
void RequestReresolution ( ) override ;
LocalityMap ( ) { gpr_mu_init ( & child_refs_mu_ ) ; }
~ LocalityMap ( ) { gpr_mu_destroy ( & child_refs_mu_ ) ; }
void set_child ( LoadBalancingPolicy * child ) { child_ = child ; }
void UpdateLocked ( const LocalityList & locality_list ,
LoadBalancingPolicy : : Config * child_policy_config ,
const grpc_channel_args * args , XdsLb * parent ) ;
void ShutdownLocked ( ) ;
void ResetBackoffLocked ( ) ;
void FillChildRefsForChannelz ( channelz : : ChildRefsList * child_subchannels ,
channelz : : ChildRefsList * child_channels ) ;
private :
bool CalledByPendingChild ( ) const ;
bool CalledByCurrentChild ( ) const ;
void PruneLocalities ( const LocalityList & locality_list ) ;
Map < UniquePtr < char > , OrphanablePtr < LocalityEntry > , StringLess > map_ ;
// Lock held while filling child refs for all localities
// inside the map
gpr_mu child_refs_mu_ ;
} ;
RefCountedPtr < XdsLb > parent_ ;
LoadBalancingPolicy * child_ = nullptr ;
struct LocalityServerlistEntry {
~ LocalityServerlistEntry ( ) {
gpr_free ( locality_name ) ;
xds_grpclb_destroy_serverlist ( serverlist ) ;
}
char * locality_name ;
// The deserialized response from the balancer. May be nullptr until one
// such response has arrived.
xds_grpclb_serverlist * serverlist ;
} ;
~ XdsLb ( ) ;
@ -293,7 +362,8 @@ class XdsLb : public LoadBalancingPolicy {
void ShutdownLocked ( ) override ;
// Helper function used in UpdateLocked().
void ProcessChannelArgsLocked ( const grpc_channel_args & args ) ;
void ProcessAddressesAndChannelArgsLocked ( const ServerAddressList & addresses ,
const grpc_channel_args & args ) ;
// Parses the xds config given the JSON node of the first child of XdsConfig.
// If parsing succeeds, updates \a balancer_name, and updates \a
@ -309,12 +379,6 @@ class XdsLb : public LoadBalancingPolicy {
// Callback to enter fallback mode.
static void OnFallbackTimerLocked ( void * arg , grpc_error * error ) ;
// Methods for dealing with the child policy.
void CreateOrUpdateChildPolicyLocked ( ) ;
grpc_channel_args * CreateChildPolicyArgsLocked ( ) ;
OrphanablePtr < LoadBalancingPolicy > CreateChildPolicyLocked (
const char * name , const grpc_channel_args * args ) ;
// Who the client is trying to communicate with.
const char * server_name_ = nullptr ;
@ -338,10 +402,6 @@ class XdsLb : public LoadBalancingPolicy {
// Timeout in milliseconds for the LB call. 0 means no deadline.
int lb_call_timeout_ms_ = 0 ;
// The deserialized response from the balancer. May be nullptr until one
// such response has arrived.
xds_grpclb_serverlist * serverlist_ = nullptr ;
// Timeout in milliseconds for before using fallback backend addresses.
// 0 means not using fallback.
RefCountedPtr < Config > fallback_policy_config_ ;
@ -355,11 +415,12 @@ class XdsLb : public LoadBalancingPolicy {
// The policy to use for the backends.
RefCountedPtr < Config > child_policy_config_ ;
OrphanablePtr < LoadBalancingPolicy > child_policy_ ;
OrphanablePtr < LoadBalancingPolicy > pending_child_policy_ ;
// Lock held when modifying the value of child_policy_ or
// pending_child_policy_.
gpr_mu child_policy_mu_ ;
// Map of policies to use in the backend
LocalityMap locality_map_ ;
LocalityList locality_serverlist_ ;
// TODO(mhaidry) : Add a pending locality map that may be swapped with the
// the current one when new localities in the pending map are ready
// to accept connections
} ;
//
@ -378,105 +439,6 @@ XdsLb::PickResult XdsLb::Picker::Pick(PickArgs* pick, grpc_error** error) {
return result ;
}
//
// XdsLb::Helper
//
bool XdsLb : : Helper : : CalledByPendingChild ( ) const {
GPR_ASSERT ( child_ ! = nullptr ) ;
return child_ = = parent_ - > pending_child_policy_ . get ( ) ;
}
bool XdsLb : : Helper : : CalledByCurrentChild ( ) const {
GPR_ASSERT ( child_ ! = nullptr ) ;
return child_ = = parent_ - > child_policy_ . get ( ) ;
}
Subchannel * XdsLb : : Helper : : CreateSubchannel ( const grpc_channel_args & args ) {
if ( parent_ - > shutting_down_ | |
( ! CalledByPendingChild ( ) & & ! CalledByCurrentChild ( ) ) ) {
return nullptr ;
}
return parent_ - > channel_control_helper ( ) - > CreateSubchannel ( args ) ;
}
grpc_channel * XdsLb : : Helper : : CreateChannel ( const char * target ,
const grpc_channel_args & args ) {
if ( parent_ - > shutting_down_ | |
( ! CalledByPendingChild ( ) & & ! CalledByCurrentChild ( ) ) ) {
return nullptr ;
}
return parent_ - > channel_control_helper ( ) - > CreateChannel ( target , args ) ;
}
void XdsLb : : Helper : : UpdateState ( grpc_connectivity_state state ,
grpc_error * state_error ,
UniquePtr < SubchannelPicker > picker ) {
if ( parent_ - > shutting_down_ ) {
GRPC_ERROR_UNREF ( state_error ) ;
return ;
}
// If this request is from the pending child policy, ignore it until
// it reports READY, at which point we swap it into place.
if ( CalledByPendingChild ( ) ) {
if ( grpc_lb_xds_trace . enabled ( ) ) {
gpr_log ( GPR_INFO ,
" [xdslb %p helper %p] pending child policy %p reports state=%s " ,
parent_ . get ( ) , this , parent_ - > pending_child_policy_ . get ( ) ,
grpc_connectivity_state_name ( state ) ) ;
}
if ( state ! = GRPC_CHANNEL_READY ) {
GRPC_ERROR_UNREF ( state_error ) ;
return ;
}
grpc_pollset_set_del_pollset_set (
parent_ - > child_policy_ - > interested_parties ( ) ,
parent_ - > interested_parties ( ) ) ;
MutexLock lock ( & parent_ - > child_policy_mu_ ) ;
parent_ - > child_policy_ = std : : move ( parent_ - > pending_child_policy_ ) ;
} else if ( ! CalledByCurrentChild ( ) ) {
// This request is from an outdated child, so ignore it.
GRPC_ERROR_UNREF ( state_error ) ;
return ;
}
// TODO(juanlishen): When in fallback mode, pass the child picker
// through without wrapping it. (Or maybe use a different helper for
// the fallback policy?)
GPR_ASSERT ( parent_ - > lb_chand_ ! = nullptr ) ;
RefCountedPtr < XdsLbClientStats > client_stats =
parent_ - > lb_chand_ - > lb_calld ( ) = = nullptr
? nullptr
: parent_ - > lb_chand_ - > lb_calld ( ) - > client_stats ( ) ;
parent_ - > channel_control_helper ( ) - > UpdateState (
state , state_error ,
UniquePtr < SubchannelPicker > (
New < Picker > ( std : : move ( picker ) , std : : move ( client_stats ) ) ) ) ;
}
void XdsLb : : Helper : : RequestReresolution ( ) {
if ( parent_ - > shutting_down_ ) return ;
// If there is a pending child policy, ignore re-resolution requests
// from the current child policy (or any outdated child).
if ( parent_ - > pending_child_policy_ ! = nullptr & & ! CalledByPendingChild ( ) ) {
return ;
}
if ( grpc_lb_xds_trace . enabled ( ) ) {
gpr_log ( GPR_INFO ,
" [xdslb %p] Re-resolution requested from the internal RR policy "
" (%p). " ,
parent_ . get ( ) , parent_ - > child_policy_ . get ( ) ) ;
}
GPR_ASSERT ( parent_ - > lb_chand_ ! = nullptr ) ;
// If we are talking to a balancer, we expect to get updated addresses
// from the balancer, so we can ignore the re-resolution request from
// the child policy. Otherwise, pass the re-resolution request up to the
// channel.
if ( parent_ - > lb_chand_ - > lb_calld ( ) = = nullptr | |
! parent_ - > lb_chand_ - > lb_calld ( ) - > seen_initial_response ( ) ) {
parent_ - > channel_control_helper ( ) - > RequestReresolution ( ) ;
}
}
//
// serverlist parsing code
//
@ -539,15 +501,14 @@ void ParseServer(const xds_grpclb_server* server, grpc_resolved_address* addr) {
}
// Returns addresses extracted from \a serverlist.
UniquePtr < ServerAddressList > ProcessServerlist (
const xds_grpclb_serverlist * serverlist ) {
auto addresses = MakeUnique < ServerAddressList > ( ) ;
ServerAddressList ProcessServerlist ( const xds_grpclb_serverlist * serverlist ) {
ServerAddressList addresses ;
for ( size_t i = 0 ; i < serverlist - > num_servers ; + + i ) {
const xds_grpclb_server * server = serverlist - > servers [ i ] ;
if ( ! IsServerValid ( serverlist - > servers [ i ] , i , false ) ) continue ;
grpc_resolved_address addr ;
ParseServer ( server , & addr ) ;
addresses - > emplace_back ( addr , nullptr ) ;
addresses . emplace_back ( addr , nullptr ) ;
}
return addresses ;
}
@ -952,7 +913,9 @@ void XdsLb::BalancerChannelState::BalancerCallState::
self . release ( ) ;
lb_calld - > ScheduleNextClientLoadReportLocked ( ) ;
}
if ( xds_grpclb_serverlist_equals ( xdslb_policy - > serverlist_ , serverlist ) ) {
if ( ! xdslb_policy - > locality_serverlist_ . empty ( ) & &
xds_grpclb_serverlist_equals (
xdslb_policy - > locality_serverlist_ [ 0 ] - > serverlist , serverlist ) ) {
if ( grpc_lb_xds_trace . enabled ( ) ) {
gpr_log ( GPR_INFO ,
" [xdslb %p] Incoming server list identical to current, "
@ -961,21 +924,31 @@ void XdsLb::BalancerChannelState::BalancerCallState::
}
xds_grpclb_destroy_serverlist ( serverlist ) ;
} else { /* new serverlist */
if ( xdslb_policy - > serverlist_ ! = nullptr ) {
if ( ! xdslb_policy - > locality_serverlist_ . empty ( ) ) {
/* dispose of the old serverlist */
xds_grpclb_destroy_serverlist ( xdslb_policy - > serverlist_ ) ;
xds_grpclb_destroy_serverlist (
xdslb_policy - > locality_serverlist_ [ 0 ] - > serverlist ) ;
} else {
/* or dispose of the fallback */
xdslb_policy - > fallback_backend_addresses_ . reset ( ) ;
if ( xdslb_policy - > fallback_timer_callback_pending_ ) {
grpc_timer_cancel ( & xdslb_policy - > lb_fallback_timer_ ) ;
}
/* Initialize locality serverlist, currently the list only handles
* one child */
xdslb_policy - > locality_serverlist_ . emplace_back (
MakeUnique < LocalityServerlistEntry > ( ) ) ;
xdslb_policy - > locality_serverlist_ [ 0 ] - > locality_name =
static_cast < char * > ( gpr_strdup ( kDefaultLocalityName ) ) ;
}
// and update the copy in the XdsLb instance. This
// serverlist instance will be destroyed either upon the next
// update or when the XdsLb instance is destroyed.
xdslb_policy - > serverlist_ = serverlist ;
xdslb_policy - > CreateOrUpdateChildPolicyLocked ( ) ;
xdslb_policy - > locality_serverlist_ [ 0 ] - > serverlist = serverlist ;
xdslb_policy - > locality_map_ . UpdateLocked (
xdslb_policy - > locality_serverlist_ ,
xdslb_policy - > child_policy_config_ . get ( ) , xdslb_policy - > args_ ,
xdslb_policy ) ;
}
} else {
if ( grpc_lb_xds_trace . enabled ( ) ) {
@ -1082,9 +1055,6 @@ grpc_channel_args* BuildBalancerChannelArgs(const grpc_channel_args* args) {
// the LB channel than for the parent channel. The client channel
// factory will re-add this arg with the right value.
GRPC_ARG_SERVER_URI ,
// The resolved addresses, which will be generated by the name resolver
// used in the LB channel.
GRPC_ARG_SERVER_ADDRESS_LIST ,
// The LB channel should use the authority indicated by the target
// authority table (see \a grpc_lb_policy_xds_modify_lb_channel_args),
// as opposed to the authority from the parent channel.
@ -1116,9 +1086,11 @@ grpc_channel_args* BuildBalancerChannelArgs(const grpc_channel_args* args) {
// ctor and dtor
//
XdsLb : : XdsLb ( Args args ) : LoadBalancingPolicy ( std : : move ( args ) ) {
XdsLb : : XdsLb ( Args args )
: LoadBalancingPolicy ( std : : move ( args ) ) ,
locality_map_ ( ) ,
locality_serverlist_ ( ) {
gpr_mu_init ( & lb_chand_mu_ ) ;
gpr_mu_init ( & child_policy_mu_ ) ;
// Record server name.
const grpc_arg * arg = grpc_channel_args_find ( args . args , GRPC_ARG_SERVER_URI ) ;
const char * server_uri = grpc_channel_arg_get_string ( arg ) ;
@ -1145,10 +1117,7 @@ XdsLb::~XdsLb() {
gpr_mu_destroy ( & lb_chand_mu_ ) ;
gpr_free ( ( void * ) server_name_ ) ;
grpc_channel_args_destroy ( args_ ) ;
if ( serverlist_ ! = nullptr ) {
xds_grpclb_destroy_serverlist ( serverlist_ ) ;
}
gpr_mu_destroy ( & child_policy_mu_ ) ;
locality_serverlist_ . clear ( ) ;
}
void XdsLb : : ShutdownLocked ( ) {
@ -1156,19 +1125,7 @@ void XdsLb::ShutdownLocked() {
if ( fallback_timer_callback_pending_ ) {
grpc_timer_cancel ( & lb_fallback_timer_ ) ;
}
if ( child_policy_ ! = nullptr ) {
grpc_pollset_set_del_pollset_set ( child_policy_ - > interested_parties ( ) ,
interested_parties ( ) ) ;
}
if ( pending_child_policy_ ! = nullptr ) {
grpc_pollset_set_del_pollset_set (
pending_child_policy_ - > interested_parties ( ) , interested_parties ( ) ) ;
}
{
MutexLock lock ( & child_policy_mu_ ) ;
child_policy_ . reset ( ) ;
pending_child_policy_ . reset ( ) ;
}
locality_map_ . ShutdownLocked ( ) ;
// We destroy the LB channel here instead of in our destructor because
// destroying the channel triggers a last callback to
// OnBalancerChannelConnectivityChangedLocked(), and we need to be
@ -1191,30 +1148,13 @@ void XdsLb::ResetBackoffLocked() {
if ( pending_lb_chand_ ! = nullptr ) {
grpc_channel_reset_connect_backoff ( pending_lb_chand_ - > channel ( ) ) ;
}
if ( child_policy_ ! = nullptr ) {
child_policy_ - > ResetBackoffLocked ( ) ;
}
if ( pending_child_policy_ ! = nullptr ) {
pending_child_policy_ - > ResetBackoffLocked ( ) ;
}
locality_map_ . ResetBackoffLocked ( ) ;
}
void XdsLb : : FillChildRefsForChannelz ( channelz : : ChildRefsList * child_subchannels ,
channelz : : ChildRefsList * child_channels ) {
{
// Delegate to the child_policy_ to fill the children subchannels.
// This must be done holding child_policy_mu_, since this method does not
// run in the combiner.
MutexLock lock ( & child_policy_mu_ ) ;
if ( child_policy_ ! = nullptr ) {
child_policy_ - > FillChildRefsForChannelz ( child_subchannels ,
child_channels ) ;
}
if ( pending_child_policy_ ! = nullptr ) {
pending_child_policy_ - > FillChildRefsForChannelz ( child_subchannels ,
child_channels ) ;
}
}
// Delegate to the child_policy_ to fill the children subchannels.
locality_map_ . FillChildRefsForChannelz ( child_subchannels , child_channels ) ;
MutexLock lock ( & lb_chand_mu_ ) ;
if ( lb_chand_ ! = nullptr ) {
grpc_core : : channelz : : ChannelNode * channel_node =
@ -1232,17 +1172,10 @@ void XdsLb::FillChildRefsForChannelz(channelz::ChildRefsList* child_subchannels,
}
}
void XdsLb : : ProcessChannelArgsLocked ( const grpc_channel_args & args ) {
const ServerAddressList * addresses = FindServerAddressListChannelArg ( & args ) ;
if ( addresses = = nullptr ) {
// Ignore this update.
gpr_log ( GPR_ERROR ,
" [xdslb %p] No valid LB addresses channel arg in update, ignoring. " ,
this ) ;
return ;
}
void XdsLb : : ProcessAddressesAndChannelArgsLocked (
const ServerAddressList & addresses , const grpc_channel_args & args ) {
// Update fallback address list.
fallback_backend_addresses_ = ExtractBackendAddresses ( * addresses ) ;
fallback_backend_addresses_ = ExtractBackendAddresses ( addresses ) ;
// Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
// since we use this to trigger the client_load_reporting filter.
static const char * args_to_remove [ ] = { GRPC_ARG_LB_POLICY_NAME } ;
@ -1310,26 +1243,26 @@ void XdsLb::ParseLbConfig(Config* xds_config) {
}
}
void XdsLb : : UpdateLocked ( const grpc_channel_args & args ,
RefCountedPtr < Config > lb_config ) {
void XdsLb : : UpdateLocked ( UpdateArgs args ) {
const bool is_initial_update = lb_chand_ = = nullptr ;
ParseLbConfig ( lb_ config. get ( ) ) ;
ParseLbConfig ( args . config . get ( ) ) ;
// TODO(juanlishen): Pass fallback policy config update after fallback policy
// is added.
if ( balancer_name_ = = nullptr ) {
gpr_log ( GPR_ERROR , " [xdslb %p] LB config parsing fails. " , this ) ;
return ;
}
ProcessChannelArgsLocked ( args ) ;
ProcessAddressesAnd ChannelArgsLocked ( args . addresses , * args . args ) ;
// Update the existing child policy.
// Note: We have disabled fallback mode in the code, so this child policy must
// have been created from a serverlist.
// TODO(vpowar): Handle the fallback_address changes when we add support for
// fallback in xDS.
if ( child_policy_ ! = nullptr ) CreateOrUpdateChildPolicyLocked ( ) ;
locality_map_ . UpdateLocked ( locality_serverlist_ , child_policy_config_ . get ( ) ,
args_ , this ) ;
// If this is the initial update, start the fallback timer.
if ( is_initial_update ) {
if ( lb_fallback_timeout_ms_ > 0 & & serverlist_ = = nullptr & &
if ( lb_fallback_timeout_ms_ > 0 & & locality_serverlist_ . empty ( ) & &
! fallback_timer_callback_pending_ ) {
grpc_millis deadline = ExecCtx : : Get ( ) - > Now ( ) + lb_fallback_timeout_ms_ ;
Ref ( DEBUG_LOCATION , " on_fallback_timer " ) . release ( ) ; // Held by closure
@ -1353,8 +1286,8 @@ void XdsLb::OnFallbackTimerLocked(void* arg, grpc_error* error) {
xdslb_policy - > fallback_timer_callback_pending_ = false ;
// If we receive a serverlist after the timer fires but before this callback
// actually runs, don't fall back.
if ( xdslb_policy - > serverlist_ = = nullptr & & ! xdslb_policy - > shutting_down_ & &
error = = GRPC_ERROR_NONE ) {
if ( xdslb_policy - > locality_serverlist_ . empty ( ) & &
! xdslb_policy - > shutting_down_ & & error = = GRPC_ERROR_NONE ) {
if ( grpc_lb_xds_trace . enabled ( ) ) {
gpr_log ( GPR_INFO ,
" [xdslb %p] Fallback timer fired. Not using fallback backends " ,
@ -1364,22 +1297,71 @@ void XdsLb::OnFallbackTimerLocked(void* arg, grpc_error* error) {
xdslb_policy - > Unref ( DEBUG_LOCATION , " on_fallback_timer " ) ;
}
//
// code for interacting with the child policy
//
void XdsLb : : LocalityMap : : PruneLocalities ( const LocalityList & locality_list ) {
for ( auto iter = map_ . begin ( ) ; iter ! = map_ . end ( ) ; ) {
bool found = false ;
for ( size_t i = 0 ; i < locality_list . size ( ) ; i + + ) {
if ( ! gpr_stricmp ( locality_list [ i ] - > locality_name , iter - > first . get ( ) ) ) {
found = true ;
}
}
if ( ! found ) { // Remove entries not present in the locality list
MutexLock lock ( & child_refs_mu_ ) ;
iter = map_ . erase ( iter ) ;
} else
iter + + ;
}
}
grpc_channel_args * XdsLb : : CreateChildPolicyArgsLocked ( ) {
// This should never be invoked if we do not have serverlist_, as fallback
// mode is disabled for xDS plugin.
GPR_ASSERT ( serverlist_ ! = nullptr ) ;
GPR_ASSERT ( serverlist_ - > num_servers > 0 ) ;
UniquePtr < ServerAddressList > addresses = ProcessServerlist ( serverlist_ ) ;
GPR_ASSERT ( addresses ! = nullptr ) ;
// Replace the server address list in the channel args that we pass down to
// the subchannel.
static const char * keys_to_remove [ ] = { GRPC_ARG_SERVER_ADDRESS_LIST } ;
void XdsLb : : LocalityMap : : UpdateLocked (
const LocalityList & locality_serverlist ,
LoadBalancingPolicy : : Config * child_policy_config ,
const grpc_channel_args * args , XdsLb * parent ) {
if ( parent - > shutting_down_ ) return ;
for ( size_t i = 0 ; i < locality_serverlist . size ( ) ; i + + ) {
UniquePtr < char > locality_name (
gpr_strdup ( locality_serverlist [ i ] - > locality_name ) ) ;
auto iter = map_ . find ( locality_name ) ;
if ( iter = = map_ . end ( ) ) {
OrphanablePtr < LocalityEntry > new_entry =
MakeOrphanable < LocalityEntry > ( parent - > Ref ( ) ) ;
MutexLock lock ( & child_refs_mu_ ) ;
iter = map_ . emplace ( std : : move ( locality_name ) , std : : move ( new_entry ) ) . first ;
}
// Don't create new child policies if not directed to
xds_grpclb_serverlist * serverlist =
parent - > locality_serverlist_ [ i ] - > serverlist ;
iter - > second - > UpdateLocked ( serverlist , child_policy_config , args ) ;
}
PruneLocalities ( locality_serverlist ) ;
}
void grpc_core : : XdsLb : : LocalityMap : : ShutdownLocked ( ) {
MutexLock lock ( & child_refs_mu_ ) ;
map_ . clear ( ) ;
}
void grpc_core : : XdsLb : : LocalityMap : : ResetBackoffLocked ( ) {
for ( auto iter = map_ . begin ( ) ; iter ! = map_ . end ( ) ; iter + + ) {
iter - > second - > ResetBackoffLocked ( ) ;
}
}
void grpc_core : : XdsLb : : LocalityMap : : FillChildRefsForChannelz (
channelz : : ChildRefsList * child_subchannels ,
channelz : : ChildRefsList * child_channels ) {
MutexLock lock ( & child_refs_mu_ ) ;
for ( auto iter = map_ . begin ( ) ; iter ! = map_ . end ( ) ; iter + + ) {
iter - > second - > FillChildRefsForChannelz ( child_subchannels , child_channels ) ;
}
}
// Locality Entry child policy methods
grpc_channel_args *
XdsLb : : LocalityMap : : LocalityEntry : : CreateChildPolicyArgsLocked (
const grpc_channel_args * args_in ) {
const grpc_arg args_to_add [ ] = {
CreateServerAddressListChannelArg ( addresses . get ( ) ) ,
// A channel arg indicating if the target is a backend inferred from a
// grpclb load balancer.
grpc_channel_arg_integer_create (
@ -1390,16 +1372,16 @@ grpc_channel_args* XdsLb::CreateChildPolicyArgsLocked() {
grpc_channel_arg_integer_create (
const_cast < char * > ( GRPC_ARG_INHIBIT_HEALTH_CHECKING ) , 1 ) ,
} ;
return grpc_channel_args_copy_and_add_and_remove (
args_ , keys_to_remove , GPR_ARRAY_SIZE ( keys_to_remove ) , args_to_add ,
GPR_ARRAY_SIZE ( args_to_add ) ) ;
return grpc_channel_args_copy_and_add ( args_in , args_to_add ,
GPR_ARRAY_SIZE ( args_to_add ) ) ;
}
OrphanablePtr < LoadBalancingPolicy > XdsLb : : CreateChildPolicyLocked (
OrphanablePtr < LoadBalancingPolicy >
XdsLb : : LocalityMap : : LocalityEntry : : CreateChildPolicyLocked (
const char * name , const grpc_channel_args * args ) {
Helper * helper = New < Helper > ( Ref ( ) ) ;
Helper * helper = New < Helper > ( this - > Ref ( ) ) ;
LoadBalancingPolicy : : Args lb_policy_args ;
lb_policy_args . combiner = combiner ( ) ;
lb_policy_args . combiner = parent_ - > combiner ( ) ;
lb_policy_args . args = args ;
lb_policy_args . channel_control_helper =
UniquePtr < ChannelControlHelper > ( helper ) ;
@ -1420,14 +1402,27 @@ OrphanablePtr<LoadBalancingPolicy> XdsLb::CreateChildPolicyLocked(
// child policy. This will make the child policy progress upon activity on xDS
// LB, which in turn is tied to the application's call.
grpc_pollset_set_add_pollset_set ( lb_policy - > interested_parties ( ) ,
interested_parties ( ) ) ;
parent_ - > interested_parties ( ) ) ;
return lb_policy ;
}
void XdsLb : : CreateOrUpdateChildPolicyLocked ( ) {
if ( shutting_down_ ) return ;
grpc_channel_args * args = CreateChildPolicyArgsLocked ( ) ;
GPR_ASSERT ( args ! = nullptr ) ;
void XdsLb : : LocalityMap : : LocalityEntry : : UpdateLocked (
xds_grpclb_serverlist * serverlist ,
LoadBalancingPolicy : : Config * child_policy_config ,
const grpc_channel_args * args_in ) {
if ( parent_ - > shutting_down_ ) return ;
// This should never be invoked if we do not have serverlist_, as fallback
// mode is disabled for xDS plugin.
// TODO(juanlishen): Change this as part of implementing fallback mode.
GPR_ASSERT ( serverlist ! = nullptr ) ;
GPR_ASSERT ( serverlist - > num_servers > 0 ) ;
// Construct update args.
UpdateArgs update_args ;
update_args . addresses = ProcessServerlist ( serverlist ) ;
update_args . config =
child_policy_config = = nullptr ? nullptr : child_policy_config - > Ref ( ) ;
update_args . args = CreateChildPolicyArgsLocked ( args_in ) ;
// If the child policy name changes, we need to create a new child
// policy. When this happens, we leave child_policy_ as-is and store
// the new child policy in pending_child_policy_. Once the new child
@ -1479,9 +1474,9 @@ void XdsLb::CreateOrUpdateChildPolicyLocked() {
// when the new child transitions into state READY.
// TODO(juanlishen): If the child policy is not configured via service config,
// use whatever algorithm is specified by the balancer.
const char * child_policy_name = child_policy_config_ = = nullptr
const char * child_policy_name = child_policy_config = = nullptr
? " round_robin "
: child_policy_config_ - > name ( ) ;
: child_policy_config - > name ( ) ;
const bool create_policy =
// case 1
child_policy_ = = nullptr | |
@ -1500,7 +1495,8 @@ void XdsLb::CreateOrUpdateChildPolicyLocked() {
gpr_log ( GPR_INFO , " [xdslb %p] Creating new %schild policy %s " , this ,
child_policy_ = = nullptr ? " " : " pending " , child_policy_name ) ;
}
auto new_policy = CreateChildPolicyLocked ( child_policy_name , args ) ;
auto new_policy =
CreateChildPolicyLocked ( child_policy_name , update_args . args ) ;
auto & lb_policy =
child_policy_ = = nullptr ? child_policy_ : pending_child_policy_ ;
{
@ -1523,9 +1519,146 @@ void XdsLb::CreateOrUpdateChildPolicyLocked() {
policy_to_update = = pending_child_policy_ . get ( ) ? " pending " : " " ,
policy_to_update ) ;
}
policy_to_update - > UpdateLocked ( * args , child_policy_config_ ) ;
// Clean up.
grpc_channel_args_destroy ( args ) ;
policy_to_update - > UpdateLocked ( std : : move ( update_args ) ) ;
}
void XdsLb : : LocalityMap : : LocalityEntry : : ShutdownLocked ( ) {
// Remove the child policy's interested_parties pollset_set from the
// xDS policy.
grpc_pollset_set_del_pollset_set ( child_policy_ - > interested_parties ( ) ,
parent_ - > interested_parties ( ) ) ;
if ( pending_child_policy_ ! = nullptr ) {
grpc_pollset_set_del_pollset_set (
pending_child_policy_ - > interested_parties ( ) ,
parent_ - > interested_parties ( ) ) ;
}
{
MutexLock lock ( & child_policy_mu_ ) ;
child_policy_ . reset ( ) ;
pending_child_policy_ . reset ( ) ;
}
}
void XdsLb : : LocalityMap : : LocalityEntry : : ResetBackoffLocked ( ) {
child_policy_ - > ResetBackoffLocked ( ) ;
if ( pending_child_policy_ ! = nullptr ) {
pending_child_policy_ - > ResetBackoffLocked ( ) ;
}
}
void XdsLb : : LocalityMap : : LocalityEntry : : FillChildRefsForChannelz (
channelz : : ChildRefsList * child_subchannels ,
channelz : : ChildRefsList * child_channels ) {
MutexLock lock ( & child_policy_mu_ ) ;
child_policy_ - > FillChildRefsForChannelz ( child_subchannels , child_channels ) ;
if ( pending_child_policy_ ! = nullptr ) {
pending_child_policy_ - > FillChildRefsForChannelz ( child_subchannels ,
child_channels ) ;
}
}
void XdsLb : : LocalityMap : : LocalityEntry : : Orphan ( ) {
ShutdownLocked ( ) ;
Unref ( ) ;
}
//
// LocalityEntry::Helper implementation
//
bool XdsLb : : LocalityMap : : LocalityEntry : : Helper : : CalledByPendingChild ( ) const {
GPR_ASSERT ( child_ ! = nullptr ) ;
return child_ = = entry_ - > pending_child_policy_ . get ( ) ;
}
bool XdsLb : : LocalityMap : : LocalityEntry : : Helper : : CalledByCurrentChild ( ) const {
GPR_ASSERT ( child_ ! = nullptr ) ;
return child_ = = entry_ - > child_policy_ . get ( ) ;
}
Subchannel * XdsLb : : LocalityMap : : LocalityEntry : : Helper : : CreateSubchannel (
const grpc_channel_args & args ) {
if ( entry_ - > parent_ - > shutting_down_ | |
( ! CalledByPendingChild ( ) & & ! CalledByCurrentChild ( ) ) ) {
return nullptr ;
}
return entry_ - > parent_ - > channel_control_helper ( ) - > CreateSubchannel ( args ) ;
}
grpc_channel * XdsLb : : LocalityMap : : LocalityEntry : : Helper : : CreateChannel (
const char * target , const grpc_channel_args & args ) {
if ( entry_ - > parent_ - > shutting_down_ | |
( ! CalledByPendingChild ( ) & & ! CalledByCurrentChild ( ) ) ) {
return nullptr ;
}
return entry_ - > parent_ - > channel_control_helper ( ) - > CreateChannel ( target , args ) ;
}
void XdsLb : : LocalityMap : : LocalityEntry : : Helper : : UpdateState (
grpc_connectivity_state state , grpc_error * state_error ,
UniquePtr < SubchannelPicker > picker ) {
if ( entry_ - > parent_ - > shutting_down_ ) {
GRPC_ERROR_UNREF ( state_error ) ;
return ;
}
// If this request is from the pending child policy, ignore it until
// it reports READY, at which point we swap it into place.
if ( CalledByPendingChild ( ) ) {
if ( grpc_lb_xds_trace . enabled ( ) ) {
gpr_log ( GPR_INFO ,
" [xdslb %p helper %p] pending child policy %p reports state=%s " ,
entry_ - > parent_ . get ( ) , this , entry_ - > pending_child_policy_ . get ( ) ,
grpc_connectivity_state_name ( state ) ) ;
}
if ( state ! = GRPC_CHANNEL_READY ) {
GRPC_ERROR_UNREF ( state_error ) ;
return ;
}
grpc_pollset_set_del_pollset_set (
entry_ - > child_policy_ - > interested_parties ( ) ,
entry_ - > parent_ - > interested_parties ( ) ) ;
MutexLock lock ( & entry_ - > child_policy_mu_ ) ;
entry_ - > child_policy_ = std : : move ( entry_ - > pending_child_policy_ ) ;
} else if ( ! CalledByCurrentChild ( ) ) {
// This request is from an outdated child, so ignore it.
GRPC_ERROR_UNREF ( state_error ) ;
return ;
}
// TODO(juanlishen): When in fallback mode, pass the child picker
// through without wrapping it. (Or maybe use a different helper for
// the fallback policy?)
GPR_ASSERT ( entry_ - > parent_ - > lb_chand_ ! = nullptr ) ;
RefCountedPtr < XdsLbClientStats > client_stats =
entry_ - > parent_ - > lb_chand_ - > lb_calld ( ) = = nullptr
? nullptr
: entry_ - > parent_ - > lb_chand_ - > lb_calld ( ) - > client_stats ( ) ;
entry_ - > parent_ - > channel_control_helper ( ) - > UpdateState (
state , state_error ,
UniquePtr < SubchannelPicker > (
New < Picker > ( std : : move ( picker ) , std : : move ( client_stats ) ) ) ) ;
}
void XdsLb : : LocalityMap : : LocalityEntry : : Helper : : RequestReresolution ( ) {
if ( entry_ - > parent_ - > shutting_down_ ) return ;
// If there is a pending child policy, ignore re-resolution requests
// from the current child policy (or any outdated child).
if ( entry_ - > pending_child_policy_ ! = nullptr & & ! CalledByPendingChild ( ) ) {
return ;
}
if ( grpc_lb_xds_trace . enabled ( ) ) {
gpr_log ( GPR_INFO ,
" [xdslb %p] Re-resolution requested from the internal RR policy "
" (%p). " ,
entry_ - > parent_ . get ( ) , entry_ - > child_policy_ . get ( ) ) ;
}
GPR_ASSERT ( entry_ - > parent_ - > lb_chand_ ! = nullptr ) ;
// If we are talking to a balancer, we expect to get updated addresses
// from the balancer, so we can ignore the re-resolution request from
// the child policy. Otherwise, pass the re-resolution request up to the
// channel.
if ( entry_ - > parent_ - > lb_chand_ - > lb_calld ( ) = = nullptr | |
! entry_ - > parent_ - > lb_chand_ - > lb_calld ( ) - > seen_initial_response ( ) ) {
entry_ - > parent_ - > channel_control_helper ( ) - > RequestReresolution ( ) ;
}
}
//