@ -125,8 +125,7 @@ class GrpcLb : public LoadBalancingPolicy {
const char * name ( ) const override { return kGrpclb ; }
void UpdateLocked ( const grpc_channel_args & args ,
RefCountedPtr < Config > lb_config ) override ;
void UpdateLocked ( UpdateArgs args ) override ;
void ResetBackoffLocked ( ) override ;
void FillChildRefsForChannelz (
channelz : : ChildRefsList * child_subchannels ,
@ -295,7 +294,8 @@ class GrpcLb : public LoadBalancingPolicy {
void ShutdownLocked ( ) override ;
// Helper functions used in UpdateLocked().
void ProcessChannelArgsLocked ( const grpc_channel_args & args ) ;
void ProcessAddressesAndChannelArgsLocked ( const ServerAddressList & addresses ,
const grpc_channel_args & args ) ;
void ParseLbConfig ( Config * grpclb_config ) ;
static void OnBalancerChannelConnectivityChangedLocked ( void * arg ,
grpc_error * error ) ;
@ -311,7 +311,8 @@ class GrpcLb : public LoadBalancingPolicy {
static void OnBalancerCallRetryTimerLocked ( void * arg , grpc_error * error ) ;
// Methods for dealing with the child policy.
grpc_channel_args * CreateChildPolicyArgsLocked ( ) ;
grpc_channel_args * CreateChildPolicyArgsLocked (
bool is_backend_from_grpclb_load_balancer ) ;
OrphanablePtr < LoadBalancingPolicy > CreateChildPolicyLocked (
const char * name , const grpc_channel_args * args ) ;
void CreateOrUpdateChildPolicyLocked ( ) ;
@ -1204,7 +1205,6 @@ grpc_channel_args* BuildBalancerChannelArgs(
const ServerAddressList & addresses ,
FakeResolverResponseGenerator * response_generator ,
const grpc_channel_args * args ) {
ServerAddressList balancer_addresses = ExtractBalancerAddresses ( addresses ) ;
// Channel args to remove.
static const char * args_to_remove [ ] = {
// LB policy name, since we want to use the default (pick_first) in
@ -1217,15 +1217,6 @@ grpc_channel_args* BuildBalancerChannelArgs(
// the LB channel than for the parent channel. The client channel
// factory will re-add this arg with the right value.
GRPC_ARG_SERVER_URI ,
// The resolved addresses, which will be generated by the name resolver
// used in the LB channel. Note that the LB channel will use the fake
// resolver, so this won't actually generate a query to DNS (or some
// other name service). However, the addresses returned by the fake
// resolver will have is_balancer=false, whereas our own addresses have
// is_balancer=true. We need the LB channel to return addresses with
// is_balancer=false so that it does not wind up recursively using the
// grpclb LB policy.
GRPC_ARG_SERVER_ADDRESS_LIST ,
// The fake resolver response generator, because we are replacing it
// with the one from the grpclb policy, used to propagate updates to
// the LB channel.
@ -1241,10 +1232,6 @@ grpc_channel_args* BuildBalancerChannelArgs(
} ;
// Channel args to add.
const grpc_arg args_to_add [ ] = {
// New address list.
// Note that we pass these in both when creating the LB channel
// and via the fake resolver. The latter is what actually gets used.
CreateServerAddressListChannelArg ( & balancer_addresses ) ,
// The fake resolver response generator, which we use to inject
// address updates into the LB channel.
grpc_core : : FakeResolverResponseGenerator : : MakeChannelArg (
@ -1262,7 +1249,7 @@ grpc_channel_args* BuildBalancerChannelArgs(
args , args_to_remove , GPR_ARRAY_SIZE ( args_to_remove ) , args_to_add ,
GPR_ARRAY_SIZE ( args_to_add ) ) ;
// Make any necessary modifications for security.
return grpc_lb_policy_grpclb_modify_lb_channel_args ( new_args ) ;
return grpc_lb_policy_grpclb_modify_lb_channel_args ( addresses , new_args ) ;
}
//
@ -1388,11 +1375,10 @@ void GrpcLb::FillChildRefsForChannelz(
}
}
void GrpcLb : : UpdateLocked ( const grpc_channel_args & args ,
RefCountedPtr < Config > lb_config ) {
void GrpcLb : : UpdateLocked ( UpdateArgs args ) {
const bool is_initial_update = lb_channel_ = = nullptr ;
ParseLbConfig ( lb_ config. get ( ) ) ;
ProcessChannelArgsLocked ( args ) ;
ParseLbConfig ( args . config . get ( ) ) ;
ProcessAddressesAnd ChannelArgsLocked ( args . addresses , * args . args ) ;
// Update the existing child policy.
if ( child_policy_ ! = nullptr ) CreateOrUpdateChildPolicyLocked ( ) ;
// If this is the initial update, start the fallback-at-startup checks
@ -1442,18 +1428,10 @@ ServerAddressList ExtractBackendAddresses(const ServerAddressList& addresses) {
return backend_addresses ;
}
void GrpcLb : : ProcessChannelArgsLocked ( const grpc_channel_args & args ) {
const ServerAddressList * addresses = FindServerAddressListChannelArg ( & args ) ;
if ( addresses = = nullptr ) {
// Ignore this update.
gpr_log (
GPR_ERROR ,
" [grpclb %p] No valid LB addresses channel arg in update, ignoring. " ,
this ) ;
return ;
}
void GrpcLb : : ProcessAddressesAndChannelArgsLocked (
const ServerAddressList & addresses , const grpc_channel_args & args ) {
// Update fallback address list.
fallback_backend_addresses_ = ExtractBackendAddresses ( * addresses ) ;
fallback_backend_addresses_ = ExtractBackendAddresses ( addresses ) ;
// Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
// since we use this to trigger the client_load_reporting filter.
static const char * args_to_remove [ ] = { GRPC_ARG_LB_POLICY_NAME } ;
@ -1463,8 +1441,9 @@ void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) {
args_ = grpc_channel_args_copy_and_add_and_remove (
& args , args_to_remove , GPR_ARRAY_SIZE ( args_to_remove ) , & new_arg , 1 ) ;
// Construct args for balancer channel.
grpc_channel_args * lb_channel_args =
BuildBalancerChannelArgs ( * addresses , response_generator_ . get ( ) , & args ) ;
ServerAddressList balancer_addresses = ExtractBalancerAddresses ( addresses ) ;
grpc_channel_args * lb_channel_args = BuildBalancerChannelArgs (
balancer_addresses , response_generator_ . get ( ) , & args ) ;
// Create balancer channel if needed.
if ( lb_channel_ = = nullptr ) {
char * uri_str ;
@ -1481,8 +1460,10 @@ void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) {
}
// Propagate updates to the LB channel (pick_first) through the fake
// resolver.
response_generator_ - > SetResponse ( lb_channel_args ) ;
grpc_channel_args_destroy ( lb_channel_args ) ;
Resolver : : Result result ;
result . addresses = std : : move ( balancer_addresses ) ;
result . args = lb_channel_args ;
response_generator_ - > SetResponse ( std : : move ( result ) ) ;
}
void GrpcLb : : ParseLbConfig ( Config * grpclb_config ) {
@ -1649,25 +1630,9 @@ void GrpcLb::OnFallbackTimerLocked(void* arg, grpc_error* error) {
// code for interacting with the child policy
//
grpc_channel_args * GrpcLb : : CreateChildPolicyArgsLocked ( ) {
ServerAddressList tmp_addresses ;
ServerAddressList * addresses = & tmp_addresses ;
bool is_backend_from_grpclb_load_balancer = false ;
if ( fallback_mode_ ) {
// Note: If fallback backend address list is empty, the child policy
// will go into state TRANSIENT_FAILURE.
addresses = & fallback_backend_addresses_ ;
} else {
tmp_addresses = serverlist_ - > GetServerAddressList (
lb_calld_ = = nullptr ? nullptr : lb_calld_ - > client_stats ( ) ) ;
is_backend_from_grpclb_load_balancer = true ;
}
GPR_ASSERT ( addresses ! = nullptr ) ;
// Replace the server address list in the channel args that we pass down to
// the subchannel.
static const char * keys_to_remove [ ] = { GRPC_ARG_SERVER_ADDRESS_LIST } ;
grpc_arg args_to_add [ 3 ] = {
CreateServerAddressListChannelArg ( addresses ) ,
grpc_channel_args * GrpcLb : : CreateChildPolicyArgsLocked (
bool is_backend_from_grpclb_load_balancer ) {
grpc_arg args_to_add [ 2 ] = {
// A channel arg indicating if the target is a backend inferred from a
// grpclb load balancer.
grpc_channel_arg_integer_create (
@ -1675,15 +1640,12 @@ grpc_channel_args* GrpcLb::CreateChildPolicyArgsLocked() {
GRPC_ARG_ADDRESS_IS_BACKEND_FROM_GRPCLB_LOAD_BALANCER ) ,
is_backend_from_grpclb_load_balancer ) ,
} ;
size_t num_args_to_add = 2 ;
size_t num_args_to_add = 1 ;
if ( is_backend_from_grpclb_load_balancer ) {
args_to_add [ 2 ] = grpc_channel_arg_integer_create (
args_to_add [ num_args_to_add + + ] = grpc_channel_arg_integer_create (
const_cast < char * > ( GRPC_ARG_INHIBIT_HEALTH_CHECKING ) , 1 ) ;
+ + num_args_to_add ;
}
return grpc_channel_args_copy_and_add_and_remove (
args_ , keys_to_remove , GPR_ARRAY_SIZE ( keys_to_remove ) , args_to_add ,
num_args_to_add ) ;
return grpc_channel_args_copy_and_add ( args_ , args_to_add , num_args_to_add ) ;
}
OrphanablePtr < LoadBalancingPolicy > GrpcLb : : CreateChildPolicyLocked (
@ -1717,8 +1679,25 @@ OrphanablePtr<LoadBalancingPolicy> GrpcLb::CreateChildPolicyLocked(
void GrpcLb : : CreateOrUpdateChildPolicyLocked ( ) {
if ( shutting_down_ ) return ;
grpc_channel_args * args = CreateChildPolicyArgsLocked ( ) ;
GPR_ASSERT ( args ! = nullptr ) ;
// Construct update args.
UpdateArgs update_args ;
bool is_backend_from_grpclb_load_balancer = false ;
if ( fallback_mode_ ) {
// If CreateOrUpdateChildPolicyLocked() is invoked when we haven't
// received any serverlist from the balancer, we use the fallback backends
// returned by the resolver. Note that the fallback backend list may be
// empty, in which case the new round_robin policy will keep the requested
// picks pending.
update_args . addresses = fallback_backend_addresses_ ;
} else {
update_args . addresses = serverlist_ - > GetServerAddressList (
lb_calld_ = = nullptr ? nullptr : lb_calld_ - > client_stats ( ) ) ;
is_backend_from_grpclb_load_balancer = true ;
}
update_args . args =
CreateChildPolicyArgsLocked ( is_backend_from_grpclb_load_balancer ) ;
GPR_ASSERT ( update_args . args ! = nullptr ) ;
update_args . config = child_policy_config_ ;
// If the child policy name changes, we need to create a new child
// policy. When this happens, we leave child_policy_ as-is and store
// the new child policy in pending_child_policy_. Once the new child
@ -1789,7 +1768,8 @@ void GrpcLb::CreateOrUpdateChildPolicyLocked() {
gpr_log ( GPR_INFO , " [grpclb %p] Creating new %schild policy %s " , this ,
child_policy_ = = nullptr ? " " : " pending " , child_policy_name ) ;
}
auto new_policy = CreateChildPolicyLocked ( child_policy_name , args ) ;
auto new_policy =
CreateChildPolicyLocked ( child_policy_name , update_args . args ) ;
// Swap the policy into place.
auto & lb_policy =
child_policy_ = = nullptr ? child_policy_ : pending_child_policy_ ;
@ -1813,9 +1793,7 @@ void GrpcLb::CreateOrUpdateChildPolicyLocked() {
policy_to_update = = pending_child_policy_ . get ( ) ? " pending " : " " ,
policy_to_update ) ;
}
policy_to_update - > UpdateLocked ( * args , child_policy_config_ ) ;
// Clean up.
grpc_channel_args_destroy ( args ) ;
policy_to_update - > UpdateLocked ( std : : move ( update_args ) ) ;
}
//