@ -77,14 +77,9 @@ constexpr char kXds[] = "xds_experimental";
class ParsedXdsConfig : public LoadBalancingPolicy : : Config {
public :
ParsedXdsConfig ( RefCountedPtr < LoadBalancingPolicy : : Config > child_policy ,
RefCountedPtr < LoadBalancingPolicy : : Config > fallback_policy ,
UniquePtr < char > eds_service_name ,
UniquePtr < char > lrs_load_reporting_server_name )
RefCountedPtr < LoadBalancingPolicy : : Config > fallback_policy )
: child_policy_ ( std : : move ( child_policy ) ) ,
fallback_policy_ ( std : : move ( fallback_policy ) ) ,
eds_service_name_ ( std : : move ( eds_service_name ) ) ,
lrs_load_reporting_server_name_ (
std : : move ( lrs_load_reporting_server_name ) ) { }
fallback_policy_ ( std : : move ( fallback_policy ) ) { }
const char * name ( ) const override { return kXds ; }
@ -96,17 +91,9 @@ class ParsedXdsConfig : public LoadBalancingPolicy::Config {
return fallback_policy_ ;
}
const char * eds_service_name ( ) const { return eds_service_name_ . get ( ) ; } ;
const char * lrs_load_reporting_server_name ( ) const {
return lrs_load_reporting_server_name_ . get ( ) ;
} ;
private :
RefCountedPtr < LoadBalancingPolicy : : Config > child_policy_ ;
RefCountedPtr < LoadBalancingPolicy : : Config > fallback_policy_ ;
UniquePtr < char > eds_service_name_ ;
UniquePtr < char > lrs_load_reporting_server_name_ ;
} ;
class XdsLb : public LoadBalancingPolicy {
@ -124,17 +111,16 @@ class XdsLb : public LoadBalancingPolicy {
// We need this wrapper for the following reasons:
// 1. To process per-locality load reporting.
// 2. Since pickers are UniquePtrs we use this RefCounted wrapper to control
// references to it by the xds picker and the locality.
class Endpoint PickerWrapper : public RefCounted < Endpoint PickerWrapper> {
// references to it by the xds picker and the locality.
class PickerWrapper : public RefCounted < PickerWrapper > {
public :
EndpointPickerWrapper (
UniquePtr < SubchannelPicker > picker ,
RefCountedPtr < XdsClientStats : : LocalityStats > locality_stats )
PickerWrapper ( UniquePtr < SubchannelPicker > picker ,
RefCountedPtr < XdsClientStats : : LocalityStats > locality_stats )
: picker_ ( std : : move ( picker ) ) ,
locality_stats_ ( std : : move ( locality_stats ) ) {
locality_stats_ - > RefByPicker ( ) ;
}
~ Endpoint PickerWrapper( ) { locality_stats_ - > UnrefByPicker ( ) ; }
~ PickerWrapper ( ) { locality_stats_ - > UnrefByPicker ( ) ; }
PickResult Pick ( PickArgs args ) ;
@ -145,16 +131,15 @@ class XdsLb : public LoadBalancingPolicy {
// The picker will use a stateless weighting algorithm to pick the locality to
// use for each request.
class Locality Picker : public SubchannelPicker {
class Picker : public SubchannelPicker {
public :
// Maintains a weighted list of pickers from each locality that is in ready
// state. The first element in the pair represents the end of a range
// proportional to the locality's weight. The start of the range is the
// previous value in the vector and is 0 for the first element.
using PickerList =
InlinedVector < std : : pair < uint32_t , RefCountedPtr < EndpointPickerWrapper > > ,
1 > ;
LocalityPicker ( RefCountedPtr < XdsLb > xds_policy , PickerList pickers )
InlinedVector < std : : pair < uint32_t , RefCountedPtr < PickerWrapper > > , 1 > ;
Picker ( RefCountedPtr < XdsLb > xds_policy , PickerList pickers )
: xds_policy_ ( std : : move ( xds_policy ) ) ,
pickers_ ( std : : move ( pickers ) ) ,
drop_config_ ( xds_policy_ - > drop_config_ ) { }
@ -219,7 +204,7 @@ class XdsLb : public LoadBalancingPolicy {
return connectivity_state_ ;
}
uint32_t weight ( ) const { return weight_ ; }
RefCountedPtr < Endpoint PickerWrapper> picker_wrapper ( ) const {
RefCountedPtr < PickerWrapper > picker_wrapper ( ) const {
return picker_wrapper_ ;
}
@ -271,7 +256,7 @@ class XdsLb : public LoadBalancingPolicy {
RefCountedPtr < XdsLocalityName > name_ ;
OrphanablePtr < LoadBalancingPolicy > child_policy_ ;
OrphanablePtr < LoadBalancingPolicy > pending_child_policy_ ;
RefCountedPtr < Endpoint PickerWrapper> picker_wrapper_ ;
RefCountedPtr < PickerWrapper > picker_wrapper_ ;
grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_IDLE ;
uint32_t weight_ ;
@ -392,24 +377,16 @@ class XdsLb : public LoadBalancingPolicy {
const char * name , const grpc_channel_args * args ) ;
void MaybeExitFallbackMode ( ) ;
const char * eds_service_name ( ) const {
if ( config_ ! = nullptr & & config_ - > eds_service_name ( ) ! = nullptr ) {
return config_ - > eds_service_name ( ) ;
}
return server_name_ . get ( ) ;
}
XdsClient * xds_client ( ) const {
return xds_client_from_channel_ ! = nullptr ? xds_client_from_channel_ . get ( )
: xds_client_ . get ( ) ;
}
// Server name from target URI .
UniquePtr < char > server_name_ ;
// Name of the backend server to connect to.
const char * server_name_ = nullptr ;
// Current channel args and config from the resolver.
// Current channel args from the resolver.
const grpc_channel_args * args_ = nullptr ;
RefCountedPtr < ParsedXdsConfig > config_ ;
// Internal state.
bool shutting_down_ = false ;
@ -441,10 +418,14 @@ class XdsLb : public LoadBalancingPolicy {
grpc_timer lb_fallback_timer_ ;
grpc_closure lb_on_fallback_ ;
// The policy to use for the fallback backends.
RefCountedPtr < LoadBalancingPolicy : : Config > fallback_policy_config_ ;
// Non-null iff we are in fallback mode.
OrphanablePtr < LoadBalancingPolicy > fallback_policy_ ;
OrphanablePtr < LoadBalancingPolicy > pending_fallback_policy_ ;
// The policy to use for the backends.
RefCountedPtr < LoadBalancingPolicy : : Config > child_policy_config_ ;
const grpc_millis locality_retention_interval_ms_ ;
const grpc_millis locality_map_failover_timeout_ms_ ;
// A list of locality maps indexed by priority.
@ -460,10 +441,10 @@ class XdsLb : public LoadBalancingPolicy {
} ;
//
// XdsLb::Endpoint PickerWrapper
// XdsLb::PickerWrapper::Pick
//
LoadBalancingPolicy : : PickResult XdsLb : : Endpoint PickerWrapper: : Pick (
LoadBalancingPolicy : : PickResult XdsLb : : PickerWrapper : : Pick (
LoadBalancingPolicy : : PickArgs args ) {
// Forward the pick to the picker returned from the child policy.
PickResult result = picker_ - > Pick ( args ) ;
@ -479,8 +460,8 @@ LoadBalancingPolicy::PickResult XdsLb::EndpointPickerWrapper::Pick(
result . recv_trailing_metadata_ready =
// Note: This callback does not run in either the control plane
// combiner or in the data plane mutex.
[ locality_stats ] ( grpc_error * error , MetadataInterface * metadata ,
CallState * call_state ) {
[ locality_stats ] ( grpc_error * error , MetadataInterface * /*metadata*/ ,
CallState * /*call_state*/ ) {
const bool call_failed = error ! = GRPC_ERROR_NONE ;
locality_stats - > AddCallFinished ( call_failed ) ;
locality_stats - > Unref ( DEBUG_LOCATION , " LocalityStats+call " ) ;
@ -489,10 +470,10 @@ LoadBalancingPolicy::PickResult XdsLb::EndpointPickerWrapper::Pick(
}
//
// XdsLb::Locality Picker
// XdsLb::Picker
//
XdsLb : : PickResult XdsLb : : Locality Picker: : Pick ( PickArgs args ) {
XdsLb : : PickResult XdsLb : : Picker : : Pick ( PickArgs args ) {
// Handle drop.
const UniquePtr < char > * drop_category ;
if ( drop_config_ - > ShouldDrop ( & drop_category ) ) {
@ -508,8 +489,8 @@ XdsLb::PickResult XdsLb::LocalityPicker::Pick(PickArgs args) {
return PickFromLocality ( key , args ) ;
}
XdsLb : : PickResult XdsLb : : Locality Picker: : PickFromLocality ( const uint32_t key ,
PickArgs args ) {
XdsLb : : PickResult XdsLb : : Picker : : PickFromLocality ( const uint32_t key ,
PickArgs args ) {
size_t mid = 0 ;
size_t start_index = 0 ;
size_t end_index = pickers_ . size ( ) - 1 ;
@ -705,11 +686,11 @@ XdsLb::XdsLb(Args args)
GPR_ASSERT ( server_uri ! = nullptr ) ;
grpc_uri * uri = grpc_uri_parse ( server_uri , true ) ;
GPR_ASSERT ( uri - > path [ 0 ] ! = ' \0 ' ) ;
server_name_ . reset (
gpr_strdup ( uri - > path [ 0 ] = = ' / ' ? uri - > path + 1 : uri - > path ) ) ;
server_name_ = gpr_strdup ( uri - > path [ 0 ] = = ' / ' ? uri - > path + 1 : uri - > path ) ;
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_lb_xds_trace ) ) {
gpr_log ( GPR_INFO , " [xdslb %p] server name from channel: %s " , this ,
server_name_ . get ( ) ) ;
gpr_log ( GPR_INFO ,
" [xdslb %p] Will use '%s' as the server name for LB request. " , this ,
server_name_ ) ;
}
grpc_uri_destroy ( uri ) ;
}
@ -718,6 +699,7 @@ XdsLb::~XdsLb() {
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_lb_xds_trace ) ) {
gpr_log ( GPR_INFO , " [xdslb %p] destroying xds LB policy " , this ) ;
}
gpr_free ( ( void * ) server_name_ ) ;
grpc_channel_args_destroy ( args_ ) ;
}
@ -740,13 +722,9 @@ void XdsLb::ShutdownLocked() {
pending_fallback_policy_ . reset ( ) ;
// Cancel the endpoint watch here instead of in our dtor, because the
// watcher holds a ref to us.
xds_client ( ) - > CancelEndpointDataWatch ( StringView ( eds_service_name ( ) ) ,
xds_client ( ) - > CancelEndpointDataWatch ( StringView ( server_name_ ) ,
endpoint_watcher_ ) ;
if ( config_ - > lrs_load_reporting_server_name ( ) ! = nullptr ) {
xds_client ( ) - > RemoveClientStats (
StringView ( config_ - > lrs_load_reporting_server_name ( ) ) ,
StringView ( eds_service_name ( ) ) , & client_stats_ ) ;
}
xds_client ( ) - > RemoveClientStats ( StringView ( server_name_ ) , & client_stats_ ) ;
xds_client_from_channel_ . reset ( ) ;
xds_client_ . reset ( ) ;
}
@ -775,9 +753,9 @@ void XdsLb::UpdateLocked(UpdateArgs args) {
}
const bool is_initial_update = args_ = = nullptr ;
// Update config.
const char * old_eds_service_name = eds_service_name ( ) ;
auto old_config = std : : move ( config_ ) ;
config_ = std : : move ( args . config ) ;
auto * xds_config = static_cast < const ParsedXdsConfig * > ( args . config . get ( ) ) ;
child_policy_config_ = xds_config - > child_policy ( ) ;
fallback_policy_ config_ = xds_config - > fallback_policy ( ) ;
// Update fallback address list.
fallback_backend_addresses_ = std : : move ( args . addresses ) ;
// Update args.
@ -794,7 +772,7 @@ void XdsLb::UpdateLocked(UpdateArgs args) {
if ( xds_client_from_channel_ = = nullptr ) {
grpc_error * error = GRPC_ERROR_NONE ;
xds_client_ = MakeOrphanable < XdsClient > (
combiner ( ) , interested_parties ( ) , StringView ( eds_service_name ( ) ) ,
combiner ( ) , interested_parties ( ) , StringView ( server_name_ ) ,
nullptr /* service config watcher */ , * args_ , & error ) ;
// TODO(roth): If we decide that we care about fallback mode, add
// proper error handling here.
@ -804,6 +782,11 @@ void XdsLb::UpdateLocked(UpdateArgs args) {
xds_client_ . get ( ) ) ;
}
}
auto watcher = MakeUnique < EndpointWatcher > ( Ref ( ) ) ;
endpoint_watcher_ = watcher . get ( ) ;
xds_client ( ) - > WatchEndpointData ( StringView ( server_name_ ) ,
std : : move ( watcher ) ) ;
xds_client ( ) - > AddClientStats ( StringView ( server_name_ ) , & client_stats_ ) ;
// Start fallback-at-startup checks.
grpc_millis deadline = ExecCtx : : Get ( ) - > Now ( ) + lb_fallback_timeout_ms_ ;
Ref ( DEBUG_LOCATION , " on_fallback_timer " ) . release ( ) ; // Held by closure
@ -812,42 +795,6 @@ void XdsLb::UpdateLocked(UpdateArgs args) {
fallback_at_startup_checks_pending_ = true ;
grpc_timer_init ( & lb_fallback_timer_ , deadline , & lb_on_fallback_ ) ;
}
// Update endpoint watcher if needed.
if ( is_initial_update | |
strcmp ( old_eds_service_name , eds_service_name ( ) ) ! = 0 ) {
if ( ! is_initial_update ) {
xds_client ( ) - > CancelEndpointDataWatch ( StringView ( old_eds_service_name ) ,
endpoint_watcher_ ) ;
}
auto watcher = MakeUnique < EndpointWatcher > ( Ref ( ) ) ;
endpoint_watcher_ = watcher . get ( ) ;
xds_client ( ) - > WatchEndpointData ( StringView ( eds_service_name ( ) ) ,
std : : move ( watcher ) ) ;
}
// Update load reporting if needed.
// TODO(roth): Ideally, we should not collect any stats if load reporting
// is disabled, which would require changing this code to recreate
// all of the pickers whenever load reporting is enabled or disabled
// here.
if ( is_initial_update | |
( config_ - > lrs_load_reporting_server_name ( ) = = nullptr ) ! =
( old_config - > lrs_load_reporting_server_name ( ) = = nullptr ) | |
( config_ - > lrs_load_reporting_server_name ( ) ! = nullptr & &
old_config - > lrs_load_reporting_server_name ( ) ! = nullptr & &
strcmp ( config_ - > lrs_load_reporting_server_name ( ) ,
old_config - > lrs_load_reporting_server_name ( ) ) ! = 0 ) ) {
if ( old_config ! = nullptr & &
old_config - > lrs_load_reporting_server_name ( ) ! = nullptr ) {
xds_client ( ) - > RemoveClientStats (
StringView ( old_config - > lrs_load_reporting_server_name ( ) ) ,
StringView ( old_eds_service_name ) , & client_stats_ ) ;
}
if ( config_ - > lrs_load_reporting_server_name ( ) ! = nullptr ) {
xds_client ( ) - > AddClientStats (
StringView ( config_ - > lrs_load_reporting_server_name ( ) ) ,
StringView ( eds_service_name ( ) ) , & client_stats_ ) ;
}
}
}
//
@ -892,7 +839,9 @@ void XdsLb::UpdateFallbackPolicyLocked() {
// Construct update args.
UpdateArgs update_args ;
update_args . addresses = fallback_backend_addresses_ ;
update_args . config = config_ - > fallback_policy ( ) ;
update_args . config = fallback_policy_config_ = = nullptr
? nullptr
: fallback_policy_config_ - > Ref ( ) ;
update_args . args = grpc_channel_args_copy ( args_ ) ;
// If the child policy name changes, we need to create a new child
// policy. When this happens, we leave child_policy_ as-is and store
@ -943,9 +892,9 @@ void XdsLb::UpdateFallbackPolicyLocked() {
// that was there before, which will be immediately shut down)
// and will later be swapped into child_policy_ by the helper
// when the new child transitions into state READY.
const char * fallback_policy_name = update_args . config = = nullptr
const char * fallback_policy_name = fallback_policy_config_ = = nullptr
? " round_robin "
: update_args . config - > name ( ) ;
: fallback_policy_config_ - > name ( ) ;
const bool create_policy =
// case 1
fallback_policy_ = = nullptr | |
@ -1226,7 +1175,7 @@ void XdsLb::PriorityList::LocalityMap::UpdateXdsPickerLocked() {
// that are ready. Each locality is represented by a portion of the range
// proportional to its weight, such that the total range is the sum of the
// weights of all localities.
Locality Picker: : PickerList picker_list ;
Picker : : PickerList picker_list ;
uint32_t end = 0 ;
for ( const auto & p : localities_ ) {
const auto & locality_name = p . first ;
@ -1238,9 +1187,9 @@ void XdsLb::PriorityList::LocalityMap::UpdateXdsPickerLocked() {
picker_list . push_back ( std : : make_pair ( end , locality - > picker_wrapper ( ) ) ) ;
}
xds_policy ( ) - > channel_control_helper ( ) - > UpdateState (
GRPC_CHANNEL_READY , MakeUnique < LocalityPicker > (
xds_policy_ - > Ref ( DEBUG_LOCATION , " XdsLb+Picker " ) ,
std : : move ( picker_list ) ) ) ;
GRPC_CHANNEL_READY ,
MakeUnique < Picker > ( xds_policy_ - > Ref ( DEBUG_LOCATION , " XdsLb+Picker " ) ,
std : : move ( picker_list ) ) ) ;
}
OrphanablePtr < XdsLb : : PriorityList : : LocalityMap : : Locality >
@ -1541,7 +1490,9 @@ void XdsLb::PriorityList::LocalityMap::Locality::UpdateLocked(
// Construct update args.
UpdateArgs update_args ;
update_args . addresses = std : : move ( serverlist ) ;
update_args . config = xds_policy ( ) - > config_ - > child_policy ( ) ;
update_args . config = xds_policy ( ) - > child_policy_config_ = = nullptr
? nullptr
: xds_policy ( ) - > child_policy_config_ - > Ref ( ) ;
update_args . args = CreateChildPolicyArgsLocked ( xds_policy ( ) - > args_ ) ;
// If the child policy name changes, we need to create a new child
// policy. When this happens, we leave child_policy_ as-is and store
@ -1594,9 +1545,10 @@ void XdsLb::PriorityList::LocalityMap::Locality::UpdateLocked(
// when the new child transitions into state READY.
// TODO(juanlishen): If the child policy is not configured via service config,
// use whatever algorithm is specified by the balancer.
const char * child_policy_name = update_args . config = = nullptr
? " round_robin "
: update_args . config - > name ( ) ;
const char * child_policy_name =
xds_policy ( ) - > child_policy_config_ = = nullptr
? " round_robin "
: xds_policy ( ) - > child_policy_config_ - > name ( ) ;
const bool create_policy =
// case 1
child_policy_ = = nullptr | |
@ -1763,11 +1715,7 @@ void XdsLb::PriorityList::LocalityMap::Locality::Helper::UpdateState(
return ;
}
// Cache the picker and its state in the locality.
// TODO(roth): If load reporting is not configured, we should ideally
// pass a null LocalityStats ref to the EndpointPickerWrapper and have it
// not collect any stats, since they're not going to be used. This would
// require recreating all of the pickers whenever we get a config update.
locality_ - > picker_wrapper_ = MakeRefCounted < EndpointPickerWrapper > (
locality_ - > picker_wrapper_ = MakeRefCounted < PickerWrapper > (
std : : move ( picker ) ,
locality_ - > xds_policy ( ) - > client_stats_ . FindLocalityStats (
locality_ - > name_ ) ) ;
@ -1814,8 +1762,6 @@ class XdsFactory : public LoadBalancingPolicyFactory {
InlinedVector < grpc_error * , 3 > error_list ;
RefCountedPtr < LoadBalancingPolicy : : Config > child_policy ;
RefCountedPtr < LoadBalancingPolicy : : Config > fallback_policy ;
const char * eds_service_name = nullptr ;
const char * lrs_load_reporting_server_name = nullptr ;
for ( const grpc_json * field = json - > child ; field ! = nullptr ;
field = field - > next ) {
if ( field - > key = = nullptr ) continue ;
@ -1843,35 +1789,11 @@ class XdsFactory : public LoadBalancingPolicyFactory {
GPR_DEBUG_ASSERT ( parse_error ! = GRPC_ERROR_NONE ) ;
error_list . push_back ( parse_error ) ;
}
} else if ( strcmp ( field - > key , " edsServiceName " ) = = 0 ) {
if ( eds_service_name ! = nullptr ) {
error_list . push_back ( GRPC_ERROR_CREATE_FROM_STATIC_STRING (
" field:edsServiceName error:Duplicate entry " ) ) ;
}
if ( field - > type ! = GRPC_JSON_STRING ) {
error_list . push_back ( GRPC_ERROR_CREATE_FROM_STATIC_STRING (
" field:edsServiceName error:type should be string " ) ) ;
continue ;
}
eds_service_name = field - > value ;
} else if ( strcmp ( field - > key , " lrsLoadReportingServerName " ) = = 0 ) {
if ( lrs_load_reporting_server_name ! = nullptr ) {
error_list . push_back ( GRPC_ERROR_CREATE_FROM_STATIC_STRING (
" field:lrsLoadReportingServerName error:Duplicate entry " ) ) ;
}
if ( field - > type ! = GRPC_JSON_STRING ) {
error_list . push_back ( GRPC_ERROR_CREATE_FROM_STATIC_STRING (
" field:lrsLoadReportingServerName error:type should be string " ) ) ;
continue ;
}
lrs_load_reporting_server_name = field - > value ;
}
}
if ( error_list . empty ( ) ) {
return MakeRefCounted < ParsedXdsConfig > (
std : : move ( child_policy ) , std : : move ( fallback_policy ) ,
UniquePtr < char > ( gpr_strdup ( eds_service_name ) ) ,
UniquePtr < char > ( gpr_strdup ( lrs_load_reporting_server_name ) ) ) ;
return RefCountedPtr < LoadBalancingPolicy : : Config > ( New < ParsedXdsConfig > (
std : : move ( child_policy ) , std : : move ( fallback_policy ) ) ) ;
} else {
* error = GRPC_ERROR_CREATE_FROM_VECTOR ( " Xds Parser " , & error_list ) ;
return nullptr ;