@ -16,6 +16,8 @@
# include <grpc/support/port_platform.h>
# include "absl/strings/string_view.h"
# include <grpc/grpc.h>
# include "src/core/ext/filters/client_channel/lb_policy.h"
@ -25,60 +27,61 @@
# include "src/core/ext/xds/xds_client.h"
# include "src/core/ext/xds/xds_client_stats.h"
# include "src/core/lib/channel/channel_args.h"
# include "src/core/lib/gpr/string.h"
# include "src/core/lib/gprpp/orphanable.h"
# include "src/core/lib/gprpp/ref_counted_ptr.h"
# include "src/core/lib/iomgr/work_serializer.h"
namespace grpc_core {
TraceFlag grpc_lb_lrs _trace ( false , " lrs _lb" ) ;
TraceFlag grpc_eds_drop_lb _trace ( false , " eds_drop _lb" ) ;
namespace {
constexpr char kLrs [ ] = " lrs _experimental" ;
constexpr char kEdsDrop [ ] = " eds_drop _experimental" ;
// Config for LRS LB policy.
class Lrs LbConfig : public LoadBalancingPolicy : : Config {
// Config for EDS drop LB policy.
class EdsDrop LbConfig : public LoadBalancingPolicy : : Config {
public :
Lrs LbConfig( RefCountedPtr < LoadBalancingPolicy : : Config > child_policy ,
std : : string cluster_name , std : : string eds_service_name ,
std : : string lrs_load_reporting_server_name ,
RefCountedPtr < XdsLocalityName > locality_name )
EdsDrop LbConfig( RefCountedPtr < LoadBalancingPolicy : : Config > child_policy ,
std : : string cluster_name , std : : string eds_service_name ,
absl : : optional < std : : string > lrs_load_reporting_server_name ,
RefCountedPtr < XdsApi : : EdsUpdate : : DropConfig > drop_config )
: child_policy_ ( std : : move ( child_policy ) ) ,
cluster_name_ ( std : : move ( cluster_name ) ) ,
eds_service_name_ ( std : : move ( eds_service_name ) ) ,
lrs_load_reporting_server_name_ (
std : : move ( lrs_load_reporting_server_name ) ) ,
locality_name _( std : : move ( locality_name ) ) { }
drop_config _( std : : move ( drop_config ) ) { }
const char * name ( ) const override { return kLrs ; }
const char * name ( ) const override { return kEdsDrop ; }
RefCountedPtr < LoadBalancingPolicy : : Config > child_policy ( ) const {
return child_policy_ ;
}
const std : : string & cluster_name ( ) const { return cluster_name_ ; }
const std : : string & eds_service_name ( ) const { return eds_service_name_ ; }
const std : : string & lrs_load_reporting_server_name ( ) const {
const absl : : optional < std : : string > & lrs_load_reporting_server_name ( ) const {
return lrs_load_reporting_server_name_ ;
} ;
RefCountedPtr < XdsLocalityName > locality_name ( ) const {
return locality_name _;
RefCountedPtr < XdsApi : : EdsUpdate : : DropConfig > drop_config ( ) const {
return drop_config _;
}
private :
RefCountedPtr < LoadBalancingPolicy : : Config > child_policy_ ;
std : : string cluster_name_ ;
std : : string eds_service_name_ ;
std : : string lrs_load_reporting_server_name_ ;
RefCountedPtr < XdsLocalityName > locality_name _ ;
absl : : optional < std : : string > lrs_load_reporting_server_name_ ;
RefCountedPtr < XdsApi : : EdsUpdate : : DropConfig > drop_config _ ;
} ;
// LRS LB policy.
class Lrs Lb : public LoadBalancingPolicy {
// EDS Drop LB policy.
class EdsDrop Lb : public LoadBalancingPolicy {
public :
Lrs Lb( RefCountedPtr < XdsClient > xds_client , Args args ) ;
EdsDrop Lb( RefCountedPtr < XdsClient > xds_client , Args args ) ;
const char * name ( ) const override { return kLrs ; }
const char * name ( ) const override { return kEdsDrop ; }
void UpdateLocked ( UpdateArgs args ) override ;
void ExitIdleLocked ( ) override ;
@ -96,27 +99,28 @@ class LrsLb : public LoadBalancingPolicy {
std : : unique_ptr < SubchannelPicker > picker_ ;
} ;
// A picker that wraps the picker from the child to perform load reporting .
class LoadReporting Picker : public SubchannelPicker {
// A picker that wraps the picker from the child to perform drops .
class Drop Picker : public SubchannelPicker {
public :
LoadReportingPicker ( RefCountedPtr < RefCountedPicker > picker ,
RefCountedPtr < XdsClusterLocalityStats > locality_stats )
: picker_ ( std : : move ( picker ) ) ,
locality_stats _( std : : move ( locality_stats ) ) { }
DropPicker ( EdsDropLb * eds_drop_lb , RefCountedPtr < RefCountedPicker > picker )
: drop_config_ ( eds_drop_lb - > config_ - > drop_config ( ) ) ,
drop_stats_ ( eds_drop_lb - > drop_stats_ ) ,
picker _( std : : move ( picker ) ) { }
PickResult Pick ( PickArgs args ) ;
private :
RefCountedPtr < XdsApi : : EdsUpdate : : DropConfig > drop_config_ ;
RefCountedPtr < XdsClusterDropStats > drop_stats_ ;
RefCountedPtr < RefCountedPicker > picker_ ;
RefCountedPtr < XdsClusterLocalityStats > locality_stats_ ;
} ;
class Helper : public ChannelControlHelper {
public :
explicit Helper ( RefCountedPtr < LrsLb > lrs _policy)
: lrs _policy_( std : : move ( lrs _policy) ) { }
explicit Helper ( RefCountedPtr < EdsDropLb > eds_drop _policy)
: eds_drop _policy_( std : : move ( eds_drop _policy) ) { }
~ Helper ( ) { lrs _policy_. reset ( DEBUG_LOCATION , " Helper " ) ; }
~ Helper ( ) { eds_drop _policy_. reset ( DEBUG_LOCATION , " Helper " ) ; }
RefCountedPtr < SubchannelInterface > CreateSubchannel (
ServerAddress address , const grpc_channel_args & args ) override ;
@ -127,10 +131,10 @@ class LrsLb : public LoadBalancingPolicy {
absl : : string_view message ) override ;
private :
RefCountedPtr < LrsLb > lrs _policy_;
RefCountedPtr < EdsDropLb > eds_drop _policy_;
} ;
~ Lrs Lb( ) ;
~ EdsDrop Lb( ) ;
void ShutdownLocked ( ) override ;
@ -142,7 +146,7 @@ class LrsLb : public LoadBalancingPolicy {
void MaybeUpdatePickerLocked ( ) ;
// Current config from the resolver.
RefCountedPtr < Lrs LbConfig> config_ ;
RefCountedPtr < EdsDrop LbConfig> config_ ;
// Internal state.
bool shutting_down_ = false ;
@ -151,7 +155,7 @@ class LrsLb : public LoadBalancingPolicy {
RefCountedPtr < XdsClient > xds_client_ ;
// The stats for client-side load reporting.
RefCountedPtr < XdsClusterLocalityStats > locality _stats_ ;
RefCountedPtr < XdsClusterDropStats > drop _stats_ ;
OrphanablePtr < LoadBalancingPolicy > child_policy_ ;
@ -162,54 +166,54 @@ class LrsLb : public LoadBalancingPolicy {
} ;
//
// LrsLb::LoadReporting Picker
// EdsDropLb::Drop Picker
//
LoadBalancingPolicy : : PickResult LrsLb : : LoadReporting Picker: : Pick (
LoadBalancingPolicy : : PickResult EdsDropLb : : Drop Picker: : Pick (
LoadBalancingPolicy : : PickArgs args ) {
// Forward the pick to the picker returned from the child policy.
PickResult result = picker_ - > Pick ( args ) ;
if ( result . type = = PickResult : : PICK_COMPLETE & &
result . subchannel ! = nullptr ) {
// Record a call started.
locality_stats_ - > AddCallStarted ( ) ;
// Intercept the recv_trailing_metadata op to record call completion.
XdsClusterLocalityStats * locality_stats =
locality_stats_ - > Ref ( DEBUG_LOCATION , " LocalityStats+call " ) . release ( ) ;
result . recv_trailing_metadata_ready =
// Note: This callback does not run in either the control plane
// work serializer or in the data plane mutex.
[ locality_stats ] ( grpc_error * error , MetadataInterface * /*metadata*/ ,
CallState * /*call_state*/ ) {
const bool call_failed = error ! = GRPC_ERROR_NONE ;
locality_stats - > AddCallFinished ( call_failed ) ;
locality_stats - > Unref ( DEBUG_LOCATION , " LocalityStats+call " ) ;
} ;
// Handle drop.
const std : : string * drop_category ;
if ( drop_config_ - > ShouldDrop ( & drop_category ) ) {
if ( drop_stats_ ! = nullptr ) drop_stats_ - > AddCallDropped ( * drop_category ) ;
PickResult result ;
result . type = PickResult : : PICK_COMPLETE ;
return result ;
}
// If we're not dropping the call, we should always have a child picker.
if ( picker_ = = nullptr ) { // Should never happen.
PickResult result ;
result . type = PickResult : : PICK_FAILED ;
result . error =
grpc_error_set_int ( GRPC_ERROR_CREATE_FROM_STATIC_STRING (
" eds_drop picker not given any child picker " ) ,
GRPC_ERROR_INT_GRPC_STATUS , GRPC_STATUS_INTERNAL ) ;
return result ;
}
return result ;
// Not dropping, so delegate to child picker.
return picker_ - > Pick ( args ) ;
}
//
// Lrs Lb
// EdsDrop Lb
//
LrsLb : : Lrs Lb( RefCountedPtr < XdsClient > xds_client , Args args )
EdsDropLb : : EdsDrop Lb( RefCountedPtr < XdsClient > xds_client , Args args )
: LoadBalancingPolicy ( std : : move ( args ) ) , xds_client_ ( std : : move ( xds_client ) ) {
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_lb_lrs _trace ) ) {
gpr_log ( GPR_INFO , " [lrs _lb %p] created -- using xds client %p " , this ,
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_eds_drop_lb _trace ) ) {
gpr_log ( GPR_INFO , " [eds_drop _lb %p] created -- using xds client %p " , this ,
xds_client_ . get ( ) ) ;
}
}
LrsLb : : ~ Lrs Lb( ) {
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_lb_lrs _trace ) ) {
gpr_log ( GPR_INFO , " [lrs _lb %p] destroying xds LB policy " , this ) ;
EdsDropLb : : ~ EdsDrop Lb( ) {
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_eds_drop_lb _trace ) ) {
gpr_log ( GPR_INFO , " [eds_drop _lb %p] destroying xds LB policy " , this ) ;
}
}
void Lrs Lb: : ShutdownLocked ( ) {
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_lb_lrs _trace ) ) {
gpr_log ( GPR_INFO , " [lrs _lb %p] shutting down " , this ) ;
void EdsDrop Lb: : ShutdownLocked ( ) {
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_eds_drop_lb _trace ) ) {
gpr_log ( GPR_INFO , " [eds_drop _lb %p] shutting down " , this ) ;
}
shutting_down_ = true ;
// Remove the child policy's interested_parties pollset_set from the
@ -222,23 +226,23 @@ void LrsLb::ShutdownLocked() {
// Drop our ref to the child's picker, in case it's holding a ref to
// the child.
picker_ . reset ( ) ;
locality _stats_. reset ( ) ;
drop _stats_. reset ( ) ;
xds_client_ . reset ( ) ;
}
void Lrs Lb: : ExitIdleLocked ( ) {
void EdsDrop Lb: : ExitIdleLocked ( ) {
if ( child_policy_ ! = nullptr ) child_policy_ - > ExitIdleLocked ( ) ;
}
void Lrs Lb: : ResetBackoffLocked ( ) {
void EdsDrop Lb: : ResetBackoffLocked ( ) {
// The XdsClient will have its backoff reset by the xds resolver, so we
// don't need to do it here.
if ( child_policy_ ! = nullptr ) child_policy_ - > ResetBackoffLocked ( ) ;
}
void Lrs Lb: : UpdateLocked ( UpdateArgs args ) {
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_lb_lrs _trace ) ) {
gpr_log ( GPR_INFO , " [lrs _lb %p] Received update " , this ) ;
void EdsDrop Lb: : UpdateLocked ( UpdateArgs args ) {
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_eds_drop_lb _trace ) ) {
gpr_log ( GPR_INFO , " [eds_drop _lb %p] Received update " , this ) ;
}
// Update config.
auto old_config = std : : move ( config_ ) ;
@ -248,11 +252,13 @@ void LrsLb::UpdateLocked(UpdateArgs args) {
config_ - > lrs_load_reporting_server_name ( ) ! =
old_config - > lrs_load_reporting_server_name ( ) | |
config_ - > cluster_name ( ) ! = old_config - > cluster_name ( ) | |
config_ - > eds_service_name ( ) ! = old_config - > eds_service_name ( ) | |
* config_ - > locality_name ( ) ! = * old_config - > locality_name ( ) ) {
locality_stats_ = xds_client_ - > AddClusterLocalityStats (
config_ - > lrs_load_reporting_server_name ( ) , config_ - > cluster_name ( ) ,
config_ - > eds_service_name ( ) , config_ - > locality_name ( ) ) ;
config_ - > eds_service_name ( ) ! = old_config - > eds_service_name ( ) ) {
drop_stats_ . reset ( ) ;
if ( config_ - > lrs_load_reporting_server_name ( ) . has_value ( ) ) {
drop_stats_ = xds_client_ - > AddClusterDropStats (
config_ - > lrs_load_reporting_server_name ( ) . value ( ) ,
config_ - > cluster_name ( ) , config_ - > eds_service_name ( ) ) ;
}
MaybeUpdatePickerLocked ( ) ;
}
// Update child policy.
@ -260,23 +266,37 @@ void LrsLb::UpdateLocked(UpdateArgs args) {
args . args = nullptr ;
}
void LrsLb : : MaybeUpdatePickerLocked ( ) {
void EdsDropLb : : MaybeUpdatePickerLocked ( ) {
// If we're dropping all calls, report READY, regardless of what (or
// whether) the child has reported.
if ( config_ - > drop_config ( ) ! = nullptr & & config_ - > drop_config ( ) - > drop_all ( ) ) {
auto drop_picker = absl : : make_unique < DropPicker > ( this , picker_ ) ;
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_eds_drop_lb_trace ) ) {
gpr_log ( GPR_INFO ,
" [eds_drop_lb %p] updating connectivity (drop all): state=READY "
" picker=%p " ,
this , drop_picker . get ( ) ) ;
}
channel_control_helper ( ) - > UpdateState ( GRPC_CHANNEL_READY , absl : : Status ( ) ,
std : : move ( drop_picker ) ) ;
return ;
}
// Otherwise, update only if we have a child picker.
if ( picker_ ! = nullptr ) {
auto lrs_picker =
absl : : make_unique < LoadReportingPicker > ( picker_ , locality_stats_ ) ;
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_lb_lrs_trace ) ) {
gpr_log (
GPR_INFO ,
" [lrs_lb %p] updating connectivity: state=%s status=(%s) picker=%p " ,
this , ConnectivityStateName ( state_ ) , status_ . ToString ( ) . c_str ( ) ,
lrs_picker . get ( ) ) ;
auto drop_picker = absl : : make_unique < DropPicker > ( this , picker_ ) ;
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_eds_drop_lb_trace ) ) {
gpr_log ( GPR_INFO ,
" [eds_drop_lb %p] updating connectivity: state=%s status=(%s) "
" picker=%p " ,
this , ConnectivityStateName ( state_ ) , status_ . ToString ( ) . c_str ( ) ,
drop_picker . get ( ) ) ;
}
channel_control_helper ( ) - > UpdateState ( state_ , status_ ,
std : : move ( lrs_picker ) ) ;
std : : move ( drop _picker) ) ;
}
}
OrphanablePtr < LoadBalancingPolicy > Lrs Lb: : CreateChildPolicyLocked (
OrphanablePtr < LoadBalancingPolicy > EdsDrop Lb: : CreateChildPolicyLocked (
const grpc_channel_args * args ) {
LoadBalancingPolicy : : Args lb_policy_args ;
lb_policy_args . work_serializer = work_serializer ( ) ;
@ -285,10 +305,10 @@ OrphanablePtr<LoadBalancingPolicy> LrsLb::CreateChildPolicyLocked(
absl : : make_unique < Helper > ( Ref ( DEBUG_LOCATION , " Helper " ) ) ;
OrphanablePtr < LoadBalancingPolicy > lb_policy =
MakeOrphanable < ChildPolicyHandler > ( std : : move ( lb_policy_args ) ,
& grpc_lb_lrs _trace ) ;
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_lb_lrs _trace ) ) {
gpr_log ( GPR_INFO , " [lrs _lb %p] Created new child policy handler %p " , this ,
lb_policy . get ( ) ) ;
& grpc_eds_drop_lb _trace ) ;
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_eds_drop_lb _trace ) ) {
gpr_log ( GPR_INFO , " [eds_drop _lb %p] Created new child policy handler %p " ,
this , lb_policy . get ( ) ) ;
}
// Add our interested_parties pollset_set to that of the newly created
// child policy. This will make the child policy progress upon activity on
@ -298,8 +318,8 @@ OrphanablePtr<LoadBalancingPolicy> LrsLb::CreateChildPolicyLocked(
return lb_policy ;
}
void Lrs Lb: : UpdateChildPolicyLocked ( ServerAddressList addresses ,
const grpc_channel_args * args ) {
void EdsDrop Lb: : UpdateChildPolicyLocked ( ServerAddressList addresses ,
const grpc_channel_args * args ) {
// Create policy if needed.
if ( child_policy_ = = nullptr ) {
child_policy_ = CreateChildPolicyLocked ( args ) ;
@ -310,59 +330,60 @@ void LrsLb::UpdateChildPolicyLocked(ServerAddressList addresses,
update_args . config = config_ - > child_policy ( ) ;
update_args . args = args ;
// Update the policy.
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_lb_lrs _trace ) ) {
gpr_log ( GPR_INFO , " [lrs _lb %p] Updating child policy handler %p " , this ,
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_eds_drop_lb _trace ) ) {
gpr_log ( GPR_INFO , " [eds_drop _lb %p] Updating child policy handler %p " , this ,
child_policy_ . get ( ) ) ;
}
child_policy_ - > UpdateLocked ( std : : move ( update_args ) ) ;
}
//
// Lrs Lb::Helper
// EdsDrop Lb::Helper
//
RefCountedPtr < SubchannelInterface > Lrs Lb: : Helper : : CreateSubchannel (
RefCountedPtr < SubchannelInterface > EdsDrop Lb: : Helper : : CreateSubchannel (
ServerAddress address , const grpc_channel_args & args ) {
if ( lrs _policy_- > shutting_down_ ) return nullptr ;
return lrs _policy_- > channel_control_helper ( ) - > CreateSubchannel (
if ( eds_drop _policy_- > shutting_down_ ) return nullptr ;
return eds_drop _policy_- > channel_control_helper ( ) - > CreateSubchannel (
std : : move ( address ) , args ) ;
}
void Lrs Lb: : Helper : : UpdateState ( grpc_connectivity_state state ,
const absl : : Status & status ,
std : : unique_ptr < SubchannelPicker > picker ) {
if ( lrs _policy_- > shutting_down_ ) return ;
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_lb_lrs _trace ) ) {
gpr_log (
GPR_INFO ,
" [lrs_lb %p] child connectivity state update: state=%s (%s) picker=%p" ,
lrs _policy_. get ( ) , ConnectivityStateName ( state ) ,
status . ToString ( ) . c_str ( ) , picker . get ( ) ) ;
void EdsDrop Lb: : Helper : : UpdateState ( grpc_connectivity_state state ,
const absl : : Status & status ,
std : : unique_ptr < SubchannelPicker > picker ) {
if ( eds_drop _policy_- > shutting_down_ ) return ;
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_eds_drop_lb _trace ) ) {
gpr_log ( GPR_INFO ,
" [eds_drop_lb %p] child connectivity state update: state=%s (%s) "
" picker=%p " ,
eds_drop _policy_ . get ( ) , ConnectivityStateName ( state ) ,
status . ToString ( ) . c_str ( ) , picker . get ( ) ) ;
}
// Save the state and picker.
lrs_policy_ - > state_ = state ;
lrs_policy_ - > status_ = status ;
lrs_policy_ - > picker_ = MakeRefCounted < RefCountedPicker > ( std : : move ( picker ) ) ;
eds_drop_policy_ - > state_ = state ;
eds_drop_policy_ - > status_ = status ;
eds_drop_policy_ - > picker_ =
MakeRefCounted < RefCountedPicker > ( std : : move ( picker ) ) ;
// Wrap the picker and return it to the channel.
lrs _policy_- > MaybeUpdatePickerLocked ( ) ;
eds_drop _policy_- > MaybeUpdatePickerLocked ( ) ;
}
void Lrs Lb: : Helper : : RequestReresolution ( ) {
if ( lrs _policy_- > shutting_down_ ) return ;
lrs _policy_- > channel_control_helper ( ) - > RequestReresolution ( ) ;
void EdsDrop Lb: : Helper : : RequestReresolution ( ) {
if ( eds_drop _policy_- > shutting_down_ ) return ;
eds_drop _policy_- > channel_control_helper ( ) - > RequestReresolution ( ) ;
}
void Lrs Lb: : Helper : : AddTraceEvent ( TraceSeverity severity ,
absl : : string_view message ) {
if ( lrs _policy_- > shutting_down_ ) return ;
lrs _policy_- > channel_control_helper ( ) - > AddTraceEvent ( severity , message ) ;
void EdsDrop Lb: : Helper : : AddTraceEvent ( TraceSeverity severity ,
absl : : string_view message ) {
if ( eds_drop _policy_- > shutting_down_ ) return ;
eds_drop _policy_- > channel_control_helper ( ) - > AddTraceEvent ( severity , message ) ;
}
//
// factory
//
class Lrs LbFactory : public LoadBalancingPolicyFactory {
class EdsDrop LbFactory : public LoadBalancingPolicyFactory {
public :
OrphanablePtr < LoadBalancingPolicy > CreateLoadBalancingPolicy (
LoadBalancingPolicy : : Args args ) const override {
@ -370,25 +391,26 @@ class LrsLbFactory : public LoadBalancingPolicyFactory {
RefCountedPtr < XdsClient > xds_client = XdsClient : : GetOrCreate ( & error ) ;
if ( error ! = GRPC_ERROR_NONE ) {
gpr_log ( GPR_ERROR ,
" cannot get XdsClient to instantiate lrs LB policy: %s " ,
" cannot get XdsClient to instantiate eds_drop LB policy: %s " ,
grpc_error_string ( error ) ) ;
GRPC_ERROR_UNREF ( error ) ;
return nullptr ;
}
return MakeOrphanable < Lrs Lb> ( std : : move ( xds_client ) , std : : move ( args ) ) ;
return MakeOrphanable < EdsDrop Lb> ( std : : move ( xds_client ) , std : : move ( args ) ) ;
}
const char * name ( ) const override { return kLrs ; }
const char * name ( ) const override { return kEdsDrop ; }
RefCountedPtr < LoadBalancingPolicy : : Config > ParseLoadBalancingConfig (
const Json & json , grpc_error * * error ) const override {
GPR_DEBUG_ASSERT ( error ! = nullptr & & * error = = GRPC_ERROR_NONE ) ;
if ( json . type ( ) = = Json : : Type : : JSON_NULL ) {
// lrs was mentioned as a policy in the deprecated loadBalancingPolicy
// This policy was configured in the deprecated loadBalancingPolicy
// field or in the client API.
* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING (
" field:loadBalancingPolicy error:lrs policy requires configuration. "
" Please use loadBalancingConfig field of service config instead. " ) ;
" field:loadBalancingPolicy error:eds_drop policy requires "
" configuration. Please use loadBalancingConfig field of service "
" config instead. " ) ;
return nullptr ;
}
std : : vector < grpc_error * > error_list ;
@ -433,88 +455,100 @@ class LrsLbFactory : public LoadBalancingPolicyFactory {
eds_service_name = it - > second . string_value ( ) ;
}
}
// Locality.
RefCountedPtr < XdsLocalityName > locality_name ;
it = json . object_value ( ) . find ( " locality " ) ;
// LRS load reporting server name.
absl : : optional < std : : string > lrs_load_reporting_server_name ;
it = json . object_value ( ) . find ( " lrsLoadReportingServerName " ) ;
if ( it ! = json . object_value ( ) . end ( ) ) {
if ( it - > second . type ( ) ! = Json : : Type : : STRING ) {
error_list . push_back ( GRPC_ERROR_CREATE_FROM_STATIC_STRING (
" field:lrsLoadReportingServerName error:type should be string " ) ) ;
} else {
lrs_load_reporting_server_name = it - > second . string_value ( ) ;
}
}
// Drop config.
auto drop_config = MakeRefCounted < XdsApi : : EdsUpdate : : DropConfig > ( ) ;
it = json . object_value ( ) . find ( " dropCategories " ) ;
if ( it = = json . object_value ( ) . end ( ) ) {
error_list . push_back ( GRPC_ERROR_CREATE_FROM_STATIC_STRING (
" field:locality error:required field missing " ) ) ;
" field:dropCategories error:required field missing " ) ) ;
} else {
std : : vector < grpc_error * > child_errors =
ParseLocality ( it - > second , & locality_name ) ;
ParseDropCategories ( it - > second , drop_config . get ( ) ) ;
if ( ! child_errors . empty ( ) ) {
error_list . push_back (
GRPC_ERROR_CREATE_FROM_VECTOR ( " field:locality " , & child_errors ) ) ;
error_list . push_back ( GRPC_ERROR_CREATE_FROM_VECTOR (
" field:dropCategories " , & child_errors ) ) ;
}
}
// LRS load reporting server name.
std : : string lrs_load_reporting_server_name ;
it = json . object_value ( ) . find ( " lrsLoadReportingServerName " ) ;
if ( it = = json . object_value ( ) . end ( ) ) {
error_list . push_back ( GRPC_ERROR_CREATE_FROM_STATIC_STRING (
" field:lrsLoadReportingServerName error:required field missing " ) ) ;
} else if ( it - > second . type ( ) ! = Json : : Type : : STRING ) {
error_list . push_back ( GRPC_ERROR_CREATE_FROM_STATIC_STRING (
" field:lrsLoadReportingServerName error:type should be string " ) ) ;
} else {
lrs_load_reporting_server_name = it - > second . string_value ( ) ;
}
if ( ! error_list . empty ( ) ) {
* error = GRPC_ERROR_CREATE_FROM_VECTOR (
" lrs _experimental LB policy config" , & error_list ) ;
" eds_drop_experimental LB policy config " , & error_list ) ;
return nullptr ;
}
return MakeRefCounted < Lrs LbConfig> (
return MakeRefCounted < EdsDropLbConfig > (
std : : move ( child_policy ) , std : : move ( cluster_name ) ,
std : : move ( eds_service_name ) , std : : move ( lrs_load_reporting_server_name ) ,
std : : move ( locality_name ) ) ;
std : : move ( drop_config ) ) ;
}
private :
static std : : vector < grpc_error * > ParseLocality (
const Json & json , RefCountedPtr < XdsLocalityName > * name ) {
static std : : vector < grpc_error * > ParseDropCategories (
const Json & json , XdsApi : : EdsUpdate : : DropConfig * drop_config ) {
std : : vector < grpc_error * > error_list ;
if ( json . type ( ) ! = Json : : Type : : OBJECT ) {
if ( json . type ( ) ! = Json : : Type : : ARRAY ) {
error_list . push_back ( GRPC_ERROR_CREATE_FROM_STATIC_STRING (
" locality field is not an object " ) ) ;
" dropCategories field is not an array " ) ) ;
return error_list ;
}
std : : string region ;
auto it = json . object_value ( ) . find ( " region " ) ;
if ( it ! = json . object_value ( ) . end ( ) ) {
if ( it - > second . type ( ) ! = Json : : Type : : STRING ) {
error_list . push_back ( GRPC_ERROR_CREATE_FROM_STATIC_STRING (
" \" region \" field is not a string " ) ) ;
} else {
region = it - > second . string_value ( ) ;
for ( size_t i = 0 ; i < json . array_value ( ) . size ( ) ; + + i ) {
const Json & entry = json . array_value ( ) [ i ] ;
std : : vector < grpc_error * > child_errors =
ParseDropCategory ( entry , drop_config ) ;
if ( ! child_errors . empty ( ) ) {
grpc_error * error = GRPC_ERROR_CREATE_FROM_COPIED_STRING (
absl : : StrCat ( " errors parsing index " , i ) . c_str ( ) ) ;
for ( size_t i = 0 ; i < child_errors . size ( ) ; + + i ) {
error = grpc_error_add_child ( error , child_errors [ i ] ) ;
}
error_list . push_back ( error ) ;
}
}
std : : string zone ;
it = json . object_value ( ) . find ( " zone " ) ;
if ( it ! = json . object_value ( ) . end ( ) ) {
if ( it - > second . type ( ) ! = Json : : Type : : STRING ) {
error_list . push_back ( GRPC_ERROR_CREATE_FROM_STATIC_STRING (
" \" zone \" field is not a string " ) ) ;
} else {
zone = it - > second . string_value ( ) ;
}
return error_list ;
}
static std : : vector < grpc_error * > ParseDropCategory (
const Json & json , XdsApi : : EdsUpdate : : DropConfig * drop_config ) {
std : : vector < grpc_error * > error_list ;
if ( json . type ( ) ! = Json : : Type : : OBJECT ) {
error_list . push_back ( GRPC_ERROR_CREATE_FROM_STATIC_STRING (
" dropCategories entry is not an object " ) ) ;
return error_list ;
}
std : : string subzone ;
it = json . object_value ( ) . find ( " subzone " ) ;
if ( it ! = json . object_value ( ) . end ( ) ) {
if ( it - > second . type ( ) ! = Json : : Type : : STRING ) {
error_list . push_back ( GRPC_ERROR_CREATE_FROM_STATIC_STRING (
" \" subzone \" field is not a string " ) ) ;
} else {
subzone = it - > second . string_value ( ) ;
}
std : : string category ;
auto it = json . object_value ( ) . find ( " category " ) ;
if ( it = = json . object_value ( ) . end ( ) ) {
error_list . push_back ( GRPC_ERROR_CREATE_FROM_STATIC_STRING (
" \" category \" field not present " ) ) ;
} else if ( it - > second . type ( ) ! = Json : : Type : : STRING ) {
error_list . push_back ( GRPC_ERROR_CREATE_FROM_STATIC_STRING (
" \" category \" field is not a string " ) ) ;
} else {
category = it - > second . string_value ( ) ;
}
if ( region . empty ( ) & & zone . empty ( ) & & subzone . empty ( ) ) {
uint32_t requests_per_million = 0 ;
it = json . object_value ( ) . find ( " requests_per_million " ) ;
if ( it = = json . object_value ( ) . end ( ) ) {
error_list . push_back ( GRPC_ERROR_CREATE_FROM_STATIC_STRING (
" \" requests_per_million \" field is not present " ) ) ;
} else if ( it - > second . type ( ) ! = Json : : Type : : NUMBER ) {
error_list . push_back ( GRPC_ERROR_CREATE_FROM_STATIC_STRING (
" at least one of region, zone, or subzone must be set " ) ) ;
" \" requests_per_million \" field is not a number " ) ) ;
} else {
requests_per_million =
gpr_parse_nonnegative_int ( it - > second . string_value ( ) . c_str ( ) ) ;
}
if ( error_list . empty ( ) ) {
* name = MakeRefCounted < XdsLocalityName > ( region , zone , subzone ) ;
drop_config - > AddCategory ( std : : move ( category ) , requests_per_million ) ;
}
return error_list ;
}
@ -528,10 +562,10 @@ class LrsLbFactory : public LoadBalancingPolicyFactory {
// Plugin registration
//
void grpc_lb_policy_lrs _init ( ) {
void grpc_lb_policy_eds_drop _init ( ) {
grpc_core : : LoadBalancingPolicyRegistry : : Builder : :
RegisterLoadBalancingPolicyFactory (
absl : : make_unique < grpc_core : : Lrs LbFactory> ( ) ) ;
absl : : make_unique < grpc_core : : EdsDrop LbFactory> ( ) ) ;
}
void grpc_lb_policy_lrs _shutdown ( ) { }
void grpc_lb_policy_eds_drop _shutdown ( ) { }