@ -129,11 +129,15 @@ class XdsLb : public LoadBalancingPolicy {
channelz : : ChildRefsList * child_channels ) override ;
private :
/// Contains a channel to the LB server and all the data related to the
/// channel.
class BalancerChannelState
: public InternallyRefCounted < BalancerChannelState > {
public :
/// Contains a call to the LB server and all the data related to the call.
class BalancerCallState : public InternallyRefCounted < BalancerCallState > {
public :
explicit BalancerCallState (
RefCountedPtr < LoadBalancingPolicy > parent_xdslb_policy ) ;
explicit BalancerCallState ( RefCountedPtr < BalancerChannelState > lb_chand ) ;
// It's the caller's responsibility to ensure that Orphan() is called from
// inside the combiner.
@ -141,7 +145,9 @@ class XdsLb : public LoadBalancingPolicy {
void StartQuery ( ) ;
XdsLbClientStats * client_stats ( ) const { return client_stats_ . get ( ) ; }
RefCountedPtr < XdsLbClientStats > client_stats ( ) const {
return client_stats_ ;
}
bool seen_initial_response ( ) const { return seen_initial_response_ ; }
@ -152,8 +158,10 @@ class XdsLb : public LoadBalancingPolicy {
~ BalancerCallState ( ) ;
XdsLb * xdslb_policy ( ) const {
return static_cast < XdsLb * > ( xdslb_policy_ . get ( ) ) ;
XdsLb * xdslb_policy ( ) const { return lb_chand_ - > xdslb_policy_ . get ( ) ; }
bool IsCurrentCallOnChannel ( ) const {
return this = = lb_chand_ - > lb_calld_ . get ( ) ;
}
void ScheduleNextClientLoadReportLocked ( ) ;
@ -166,8 +174,8 @@ class XdsLb : public LoadBalancingPolicy {
static void OnBalancerMessageReceivedLocked ( void * arg , grpc_error * error ) ;
static void OnBalancerStatusReceivedLocked ( void * arg , grpc_error * error ) ;
// The owning LB policy .
RefCountedPtr < LoadBalancingPolicy > xdslb_policy _;
// The owning LB channel .
RefCountedPtr < BalancerChannelState > lb_chand _;
// The streaming call to the LB server. Always non-NULL.
grpc_call * lb_call_ = nullptr ;
@ -203,6 +211,48 @@ class XdsLb : public LoadBalancingPolicy {
grpc_closure client_load_report_closure_ ;
} ;
BalancerChannelState ( const char * balancer_name ,
const grpc_channel_args & args ,
RefCountedPtr < XdsLb > parent_xdslb_policy ) ;
~ BalancerChannelState ( ) ;
void Orphan ( ) override ;
grpc_channel * channel ( ) const { return channel_ ; }
BalancerCallState * lb_calld ( ) const { return lb_calld_ . get ( ) ; }
bool IsCurrentChannel ( ) const {
return this = = xdslb_policy_ - > lb_chand_ . get ( ) ;
}
bool IsPendingChannel ( ) const {
return this = = xdslb_policy_ - > pending_lb_chand_ . get ( ) ;
}
bool HasActiveCall ( ) const { return lb_calld_ ! = nullptr ; }
void StartCallRetryTimerLocked ( ) ;
static void OnCallRetryTimerLocked ( void * arg , grpc_error * error ) ;
void StartCallLocked ( ) ;
private :
// The owning LB policy.
RefCountedPtr < XdsLb > xdslb_policy_ ;
// The channel and its status.
grpc_channel * channel_ ;
bool shutting_down_ = false ;
// The data associated with the current LB call. It holds a ref to this LB
// channel. It's instantiated every time we query for backends. It's reset
// whenever the current LB call is no longer needed (e.g., the LB policy is
// shutting down, or the LB call has ended). A non-NULL lb_calld_ always
// contains a non-NULL lb_call_.
OrphanablePtr < BalancerCallState > lb_calld_ ;
BackOff lb_call_backoff_ ;
grpc_timer lb_call_retry_timer_ ;
grpc_closure lb_on_call_retry_ ;
bool retry_timer_callback_pending_ = false ;
} ;
class Picker : public SubchannelPicker {
public :
Picker ( UniquePtr < SubchannelPicker > child_picker ,
@ -223,7 +273,6 @@ class XdsLb : public LoadBalancingPolicy {
Subchannel * CreateSubchannel ( const grpc_channel_args & args ) override ;
grpc_channel * CreateChannel ( const char * target ,
grpc_client_channel_type type ,
const grpc_channel_args & args ) override ;
void UpdateState ( grpc_connectivity_state state , grpc_error * state_error ,
UniquePtr < SubchannelPicker > picker ) override ;
@ -246,13 +295,13 @@ class XdsLb : public LoadBalancingPolicy {
// found. Does nothing upon failure.
void ParseLbConfig ( Config * xds_config ) ;
// Methods for dealing with the balancer channel and call.
void StartBalancerCallLocked ( ) ;
BalancerChannelState * LatestLbChannel ( ) const {
return pending_lb_chand_ ! = nullptr ? pending_lb_chand_ . get ( )
: lb_chand_ . get ( ) ;
}
// Callback to enter fallback mode.
static void OnFallbackTimerLocked ( void * arg , grpc_error * error ) ;
void StartBalancerCallRetryTimerLocked ( ) ;
static void OnBalancerCallRetryTimerLocked ( void * arg , grpc_error * error ) ;
static void OnBalancerChannelConnectivityChangedLocked ( void * arg ,
grpc_error * error ) ;
// Methods for dealing with the child policy.
void CreateOrUpdateChildPolicyLocked ( ) ;
@ -272,30 +321,15 @@ class XdsLb : public LoadBalancingPolicy {
bool shutting_down_ = false ;
// The channel for communicating with the LB server.
grpc_channel * lb_channel_ = nullptr ;
OrphanablePtr < BalancerChannelState > lb_chand_ ;
OrphanablePtr < BalancerChannelState > pending_lb_chand_ ;
// Mutex to protect the channel to the LB server. This is used when
// processing a channelz request.
gpr_mu lb_channel_mu_ ;
grpc_connectivity_state lb_channel_connectivity_ ;
grpc_closure lb_channel_on_connectivity_changed_ ;
// Are we already watching the LB channel's connectivity?
bool watching_lb_channel_ = false ;
// Response generator to inject address updates into lb_channel_.
RefCountedPtr < FakeResolverResponseGenerator > response_generator_ ;
// TODO(juanlishen): Replace this with atomic.
gpr_mu lb_chand_mu_ ;
// The data associated with the current LB call. It holds a ref to this LB
// policy. It's initialized every time we query for backends. It's reset to
// NULL whenever the current LB call is no longer needed (e.g., the LB policy
// is shutting down, or the LB call has ended). A non-NULL lb_calld_ always
// contains a non-NULL lb_call_.
OrphanablePtr < BalancerCallState > lb_calld_ ;
// Timeout in milliseconds for the LB call. 0 means no deadline.
int lb_call_timeout_ms_ = 0 ;
// Balancer call retry state.
BackOff lb_call_backoff_ ;
bool retry_timer_callback_pending_ = false ;
grpc_timer lb_call_retry_timer_ ;
grpc_closure lb_on_call_retry_ ;
// The deserialized response from the balancer. May be nullptr until one
// such response has arrived.
@ -323,11 +357,6 @@ class XdsLb : public LoadBalancingPolicy {
// XdsLb::Picker
//
// Destroy function used when embedding client stats in call context.
void DestroyClientStats ( void * arg ) {
static_cast < XdsLbClientStats * > ( arg ) - > Unref ( ) ;
}
XdsLb : : Picker : : PickResult XdsLb : : Picker : : Pick ( PickState * pick ,
grpc_error * * error ) {
// TODO(roth): Add support for drop handling.
@ -336,10 +365,7 @@ XdsLb::Picker::PickResult XdsLb::Picker::Pick(PickState* pick,
// If pick succeeded, add client stats.
if ( result = = PickResult : : PICK_COMPLETE & &
pick - > connected_subchannel ! = nullptr & & client_stats_ ! = nullptr ) {
pick - > subchannel_call_context [ GRPC_GRPCLB_CLIENT_STATS ] . value =
client_stats_ - > Ref ( ) . release ( ) ;
pick - > subchannel_call_context [ GRPC_GRPCLB_CLIENT_STATS ] . destroy =
DestroyClientStats ;
// TODO(roth): Add support for client stats.
}
return result ;
}
@ -354,10 +380,9 @@ Subchannel* XdsLb::Helper::CreateSubchannel(const grpc_channel_args& args) {
}
grpc_channel * XdsLb : : Helper : : CreateChannel ( const char * target ,
grpc_client_channel_type type ,
const grpc_channel_args & args ) {
if ( parent_ - > shutting_down_ ) return nullptr ;
return parent_ - > channel_control_helper ( ) - > CreateChannel ( target , type , args ) ;
return parent_ - > channel_control_helper ( ) - > CreateChannel ( target , args ) ;
}
void XdsLb : : Helper : : UpdateState ( grpc_connectivity_state state ,
@ -370,11 +395,11 @@ void XdsLb::Helper::UpdateState(grpc_connectivity_state state,
// TODO(juanlishen): When in fallback mode, pass the child picker
// through without wrapping it. (Or maybe use a different helper for
// the fallback policy?)
RefCountedPtr < XdsLbClientStats > client_stats ;
if ( parent_ - > lb_calld_ ! = nullptr & &
parent_ - > lb_calld_ - > client_stats ( ) ! = nullptr ) {
client_stats = parent_ - > lb_calld_ - > client_stats ( ) - > Ref ( ) ;
}
GPR_ASSERT ( parent_ - > lb_chand_ ! = nullptr ) ;
RefCountedPtr < XdsLbClientStats > client_stats =
parent_ - > lb_chand_ - > lb_calld ( ) = = nullptr
? nullptr
: parent_ - > lb_chand_ - > lb_calld ( ) - > client_stats ( ) ;
parent_ - > channel_control_helper ( ) - > UpdateState (
state , state_error ,
UniquePtr < SubchannelPicker > (
@ -389,12 +414,13 @@ void XdsLb::Helper::RequestReresolution() {
" (%p). " ,
parent_ . get ( ) , parent_ - > child_policy_ . get ( ) ) ;
}
GPR_ASSERT ( parent_ - > lb_chand_ ! = nullptr ) ;
// If we are talking to a balancer, we expect to get updated addresses
// from the balancer, so we can ignore the re-resolution request from
// the RR policy. Otherwise, pass the re-resolution request up to the
// the child policy. Otherwise, pass the re-resolution request up to the
// channel.
if ( parent_ - > lb_calld_ = = nullptr | |
! parent_ - > lb_calld_ - > seen_initial_response ( ) ) {
if ( parent_ - > lb_chand_ - > lb_calld ( ) = = nullptr | |
! parent_ - > lb_chand_ - > lb_calld ( ) - > seen_initial_response ( ) ) {
parent_ - > channel_control_helper ( ) - > RequestReresolution ( ) ;
}
}
@ -475,14 +501,98 @@ UniquePtr<ServerAddressList> ProcessServerlist(
}
//
// XdsLb::BalancerCallState
// XdsLb::BalancerChannelState
//
XdsLb : : BalancerChannelState : : BalancerChannelState (
const char * balancer_name , const grpc_channel_args & args ,
grpc_core : : RefCountedPtr < grpc_core : : XdsLb > parent_xdslb_policy )
: InternallyRefCounted < BalancerChannelState > ( & grpc_lb_xds_trace ) ,
xdslb_policy_ ( std : : move ( parent_xdslb_policy ) ) ,
lb_call_backoff_ (
BackOff : : Options ( )
. set_initial_backoff ( GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS *
1000 )
. set_multiplier ( GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER )
. set_jitter ( GRPC_XDS_RECONNECT_JITTER )
. set_max_backoff ( GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS * 1000 ) ) {
channel_ = xdslb_policy_ - > channel_control_helper ( ) - > CreateChannel (
balancer_name , args ) ;
GPR_ASSERT ( channel_ ! = nullptr ) ;
StartCallLocked ( ) ;
}
XdsLb : : BalancerChannelState : : ~ BalancerChannelState ( ) {
grpc_channel_destroy ( channel_ ) ;
}
void XdsLb : : BalancerChannelState : : Orphan ( ) {
shutting_down_ = true ;
lb_calld_ . reset ( ) ;
if ( retry_timer_callback_pending_ ) grpc_timer_cancel ( & lb_call_retry_timer_ ) ;
Unref ( DEBUG_LOCATION , " lb_channel_orphaned " ) ;
}
void XdsLb : : BalancerChannelState : : StartCallRetryTimerLocked ( ) {
grpc_millis next_try = lb_call_backoff_ . NextAttemptTime ( ) ;
if ( grpc_lb_xds_trace . enabled ( ) ) {
gpr_log ( GPR_INFO ,
" [xdslb %p] Failed to connect to LB server (lb_chand: %p)... " ,
xdslb_policy_ . get ( ) , this ) ;
grpc_millis timeout = next_try - ExecCtx : : Get ( ) - > Now ( ) ;
if ( timeout > 0 ) {
gpr_log ( GPR_INFO , " [xdslb %p] ... retry_timer_active in % " PRId64 " ms. " ,
xdslb_policy_ . get ( ) , timeout ) ;
} else {
gpr_log ( GPR_INFO , " [xdslb %p] ... retry_timer_active immediately. " ,
xdslb_policy_ . get ( ) ) ;
}
}
Ref ( DEBUG_LOCATION , " on_balancer_call_retry_timer " ) . release ( ) ;
GRPC_CLOSURE_INIT ( & lb_on_call_retry_ , & OnCallRetryTimerLocked , this ,
grpc_combiner_scheduler ( xdslb_policy_ - > combiner ( ) ) ) ;
grpc_timer_init ( & lb_call_retry_timer_ , next_try , & lb_on_call_retry_ ) ;
retry_timer_callback_pending_ = true ;
}
void XdsLb : : BalancerChannelState : : OnCallRetryTimerLocked ( void * arg ,
grpc_error * error ) {
BalancerChannelState * lb_chand = static_cast < BalancerChannelState * > ( arg ) ;
lb_chand - > retry_timer_callback_pending_ = false ;
if ( ! lb_chand - > shutting_down_ & & error = = GRPC_ERROR_NONE & &
lb_chand - > lb_calld_ = = nullptr ) {
if ( grpc_lb_xds_trace . enabled ( ) ) {
gpr_log ( GPR_INFO ,
" [xdslb %p] Restarting call to LB server (lb_chand: %p) " ,
lb_chand - > xdslb_policy_ . get ( ) , lb_chand ) ;
}
lb_chand - > StartCallLocked ( ) ;
}
lb_chand - > Unref ( DEBUG_LOCATION , " on_balancer_call_retry_timer " ) ;
}
void XdsLb : : BalancerChannelState : : StartCallLocked ( ) {
if ( shutting_down_ ) return ;
GPR_ASSERT ( channel_ ! = nullptr ) ;
GPR_ASSERT ( lb_calld_ = = nullptr ) ;
lb_calld_ = MakeOrphanable < BalancerCallState > ( Ref ( ) ) ;
if ( grpc_lb_xds_trace . enabled ( ) ) {
gpr_log ( GPR_INFO ,
" [xdslb %p] Query for backends (lb_chand: %p, lb_calld: %p) " ,
xdslb_policy_ . get ( ) , this , lb_calld_ . get ( ) ) ;
}
lb_calld_ - > StartQuery ( ) ;
}
//
// XdsLb::BalancerChannelState::BalancerCallState
//
XdsLb : : BalancerCallState : : BalancerCallState (
RefCountedPtr < LoadBalancingPolicy > parent_xdslb_policy )
XdsLb : : BalancerChannelState : : BalancerC allState : : BalancerCallState (
RefCountedPtr < BalancerChannelState > lb_chand )
: InternallyRefCounted < BalancerCallState > ( & grpc_lb_xds_trace ) ,
xdslb_policy_ ( std : : move ( parent_xdslb_policy ) ) {
GPR_ASSERT ( xdslb_policy_ ! = nullptr ) ;
lb_chand _( std : : move ( lb_chand ) ) {
GPR_ASSERT ( xdslb_policy ( ) ! = nullptr ) ;
GPR_ASSERT ( ! xdslb_policy ( ) - > shutting_down_ ) ;
// Init the LB call. Note that the LB call will progress every time there's
// activity in xdslb_policy_->interested_parties(), which is comprised of
@ -494,8 +604,8 @@ XdsLb::BalancerCallState::BalancerCallState(
? GRPC_MILLIS_INF_FUTURE
: ExecCtx : : Get ( ) - > Now ( ) + xdslb_policy ( ) - > lb_call_timeout_ms_ ;
lb_call_ = grpc_channel_create_pollset_set_call (
xdslb_policy ( ) - > lb_ channel_, nullptr , GRPC_PROPAGATE_DEFAULTS ,
xdslb_policy_ - > interested_parties ( ) ,
lb_chand_ - > channel_ , nullptr , GRPC_PROPAGATE_DEFAULTS ,
xdslb_policy ( ) - > interested_parties ( ) ,
GRPC_MDSTR_SLASH_GRPC_DOT_LB_DOT_V1_DOT_LOADBALANCER_SLASH_BALANCELOAD ,
nullptr , deadline , nullptr ) ;
// Init the LB call request payload.
@ -519,7 +629,7 @@ XdsLb::BalancerCallState::BalancerCallState(
grpc_combiner_scheduler ( xdslb_policy ( ) - > combiner ( ) ) ) ;
}
XdsLb : : BalancerCallState : : ~ BalancerCallState ( ) {
XdsLb : : BalancerChannelState : : BalancerC allState : : ~ BalancerCallState ( ) {
GPR_ASSERT ( lb_call_ ! = nullptr ) ;
grpc_call_unref ( lb_call_ ) ;
grpc_metadata_array_destroy ( & lb_initial_metadata_recv_ ) ;
@ -529,7 +639,7 @@ XdsLb::BalancerCallState::~BalancerCallState() {
grpc_slice_unref_internal ( lb_call_status_details_ ) ;
}
void XdsLb : : BalancerCallState : : Orphan ( ) {
void XdsLb : : BalancerChannelState : : BalancerC allState : : Orphan ( ) {
GPR_ASSERT ( lb_call_ ! = nullptr ) ;
// If we are here because xdslb_policy wants to cancel the call,
// lb_on_balancer_status_received_ will complete the cancellation and clean
@ -544,11 +654,11 @@ void XdsLb::BalancerCallState::Orphan() {
// in lb_on_balancer_status_received_ instead of here.
}
void XdsLb : : BalancerCallState : : StartQuery ( ) {
void XdsLb : : BalancerChannelState : : BalancerC allState : : StartQuery ( ) {
GPR_ASSERT ( lb_call_ ! = nullptr ) ;
if ( grpc_lb_xds_trace . enabled ( ) ) {
gpr_log ( GPR_INFO , " [xdslb %p] Starting LB call (lb_calld: %p, lb_call: %p) " ,
xdslb_policy_ . get ( ) , this , lb_call_ ) ;
xdslb_policy ( ) , this , lb_call_ ) ;
}
// Create the ops.
grpc_call_error call_error ;
@ -616,7 +726,8 @@ void XdsLb::BalancerCallState::StartQuery() {
GPR_ASSERT ( GRPC_CALL_OK = = call_error ) ;
}
void XdsLb : : BalancerCallState : : ScheduleNextClientLoadReportLocked ( ) {
void XdsLb : : BalancerChannelState : : BalancerCallState : :
ScheduleNextClientLoadReportLocked ( ) {
const grpc_millis next_client_load_report_time =
ExecCtx : : Get ( ) - > Now ( ) + client_stats_report_interval_ ;
GRPC_CLOSURE_INIT ( & client_load_report_closure_ ,
@ -627,12 +738,11 @@ void XdsLb::BalancerCallState::ScheduleNextClientLoadReportLocked() {
client_load_report_timer_callback_pending_ = true ;
}
void XdsLb : : BalancerCallState : : MaybeSendClientLoadReportLocked (
void * arg , grpc_error * error ) {
void XdsLb : : BalancerChannelState : : BalancerCallState : :
MaybeSendClientLoadReportLocked ( void * arg , grpc_error * error ) {
BalancerCallState * lb_calld = static_cast < BalancerCallState * > ( arg ) ;
XdsLb * xdslb_policy = lb_calld - > xdslb_policy ( ) ;
lb_calld - > client_load_report_timer_callback_pending_ = false ;
if ( error ! = GRPC_ERROR_NONE | | lb_calld ! = xdslb_policy - > lb_calld_ . get ( ) ) {
if ( error ! = GRPC_ERROR_NONE | | ! lb_calld - > IsCurrentCallOnChannel ( ) ) {
lb_calld - > Unref ( DEBUG_LOCATION , " client_load_report " ) ;
return ;
}
@ -646,7 +756,7 @@ void XdsLb::BalancerCallState::MaybeSendClientLoadReportLocked(
}
}
bool XdsLb : : BalancerCallState : : LoadReportCountersAreZero (
bool XdsLb : : BalancerChannelState : : BalancerC allState : : LoadReportCountersAreZero (
xds_grpclb_request * request ) {
XdsLbClientStats : : DroppedCallCounts * drop_entries =
static_cast < XdsLbClientStats : : DroppedCallCounts * > (
@ -660,7 +770,8 @@ bool XdsLb::BalancerCallState::LoadReportCountersAreZero(
}
// TODO(vpowar): Use LRS to send the client Load Report.
void XdsLb : : BalancerCallState : : SendClientLoadReportLocked ( ) {
void XdsLb : : BalancerChannelState : : BalancerCallState : :
SendClientLoadReportLocked ( ) {
// Construct message payload.
GPR_ASSERT ( send_message_payload_ = = nullptr ) ;
xds_grpclb_request * request =
@ -681,27 +792,27 @@ void XdsLb::BalancerCallState::SendClientLoadReportLocked() {
xds_grpclb_request_destroy ( request ) ;
}
void XdsLb : : BalancerCallState : : OnInitialRequestSentLocked ( void * arg ,
grpc_error * error ) {
void XdsLb : : BalancerChannelState : : BalancerC allState : : OnInitialRequestSentLocked (
void * arg , grpc_error * error ) {
BalancerCallState * lb_calld = static_cast < BalancerCallState * > ( arg ) ;
grpc_byte_buffer_destroy ( lb_calld - > send_message_payload_ ) ;
lb_calld - > send_message_payload_ = nullptr ;
// If we attempted to send a client load report before the initial request was
// sent (and this lb_calld is still in use), send the load report now.
if ( lb_calld - > client_load_report_is_due_ & &
lb_calld = = lb_calld - > xdslb_policy ( ) - > lb_calld_ . get ( ) ) {
lb_calld - > IsCurrentCallOnChannel ( ) ) {
lb_calld - > SendClientLoadReportLocked ( ) ;
lb_calld - > client_load_report_is_due_ = false ;
}
lb_calld - > Unref ( DEBUG_LOCATION , " on_initial_request_sent " ) ;
}
void XdsLb : : BalancerCallState : : OnBalancerMessageReceivedLocked (
void * arg , grpc_error * error ) {
void XdsLb : : BalancerChannelState : : BalancerCallState : :
OnBalancerMessageReceivedLocked ( void * arg , grpc_error * error ) {
BalancerCallState * lb_calld = static_cast < BalancerCallState * > ( arg ) ;
XdsLb * xdslb_policy = lb_calld - > xdslb_policy ( ) ;
// Empty payload means the LB call was cancelled.
if ( lb_calld ! = xdslb_policy - > lb_calld_ . get ( ) | |
if ( ! lb_calld - > IsCurrentCallOnChannel ( ) | |
lb_calld - > recv_message_payload_ = = nullptr ) {
lb_calld - > Unref ( DEBUG_LOCATION , " on_message_received " ) ;
return ;
@ -719,21 +830,26 @@ void XdsLb::BalancerCallState::OnBalancerMessageReceivedLocked(
nullptr ) {
// Have NOT seen initial response, look for initial response.
if ( initial_response - > has_client_stats_report_interval ) {
lb_calld - > client_stats_report_interval_ = GPR_MAX (
GPR_MS_PER_SEC , xds_grpclb_duration_to_millis (
& initial_response - > client_stats_report_interval ) ) ;
const grpc_millis interval = xds_grpclb_duration_to_millis (
& initial_response - > client_stats_report_interval ) ;
if ( interval > 0 ) {
lb_calld - > client_stats_report_interval_ =
GPR_MAX ( GPR_MS_PER_SEC , interval ) ;
}
}
if ( grpc_lb_xds_trace . enabled ( ) ) {
if ( lb_calld - > client_stats_report_interval_ ! = 0 ) {
gpr_log ( GPR_INFO ,
" [xdslb %p] Received initial LB response message; "
" client load reporting interval = % " PRId64 " milliseconds " ,
xdslb_policy , lb_calld - > client_stats_report_interval_ ) ;
}
} else if ( grpc_lb_xds_trace . enabled ( ) ) {
} else {
gpr_log ( GPR_INFO ,
" [xdslb %p] Received initial LB response message; client load "
" reporting NOT enabled " ,
xdslb_policy ) ;
}
}
xds_grpclb_initial_response_destroy ( initial_response ) ;
lb_calld - > seen_initial_response_ = true ;
} else if ( ( serverlist = xds_grpclb_response_parse_serverlist (
@ -755,7 +871,23 @@ void XdsLb::BalancerCallState::OnBalancerMessageReceivedLocked(
}
}
/* update serverlist */
// TODO(juanlishen): Don't ingore empty serverlist.
if ( serverlist - > num_servers > 0 ) {
// Pending LB channel receives a serverlist; promote it.
// Note that this call can't be on a discarded pending channel, because
// such channels don't have any current call but we have checked this call
// is a current call.
if ( ! lb_calld - > lb_chand_ - > IsCurrentChannel ( ) ) {
if ( grpc_lb_xds_trace . enabled ( ) ) {
gpr_log ( GPR_INFO ,
" [xdslb %p] Promoting pending LB channel %p to replace "
" current LB channel %p " ,
xdslb_policy , lb_calld - > lb_chand_ . get ( ) ,
lb_calld - > xdslb_policy ( ) - > lb_chand_ . get ( ) ) ;
}
lb_calld - > xdslb_policy ( ) - > lb_chand_ =
std : : move ( lb_calld - > xdslb_policy ( ) - > pending_lb_chand_ ) ;
}
// Start sending client load report only after we start using the
// serverlist returned from the current LB call.
if ( lb_calld - > client_stats_report_interval_ > 0 & &
@ -828,37 +960,53 @@ void XdsLb::BalancerCallState::OnBalancerMessageReceivedLocked(
}
}
void XdsLb : : BalancerCallState : : OnBalancerStatusReceivedLocked (
void * arg , grpc_error * error ) {
void XdsLb : : BalancerChannelState : : BalancerCallState : :
OnBalancerStatusReceivedLocked ( void * arg , grpc_error * error ) {
BalancerCallState * lb_calld = static_cast < BalancerCallState * > ( arg ) ;
XdsLb * xdslb_policy = lb_calld - > xdslb_policy ( ) ;
BalancerChannelState * lb_chand = lb_calld - > lb_chand_ . get ( ) ;
GPR_ASSERT ( lb_calld - > lb_call_ ! = nullptr ) ;
if ( grpc_lb_xds_trace . enabled ( ) ) {
char * status_details =
grpc_slice_to_c_string ( lb_calld - > lb_call_status_details_ ) ;
gpr_log ( GPR_INFO ,
" [xdslb %p] Status from LB server received. Status = %d, details "
" = '%s', (lb_calld: %p, lb_call: %p), error '%s' " ,
xdslb_policy , lb_calld - > lb_call_status_ , status_details , lb_call d ,
lb_calld - > lb_call_ , grpc_error_string ( error ) ) ;
" = '%s', (lb_chand: %p, lb_c alld: %p, lb_call: %p), error '%s' " ,
xdslb_policy , lb_calld - > lb_call_status_ , status_details , lb_chan d ,
lb_calld , lb_calld - > lb_call_ , grpc_error_string ( error ) ) ;
gpr_free ( status_details ) ;
}
// If this lb_calld is still in use, this call ended because of a failure so
// we want to retry connecting. Otherwise, we have deliberately ended this
// call and no further action is required.
if ( lb_calld = = xdslb_policy - > lb_calld_ . get ( ) ) {
xdslb_policy - > lb_calld_ . reset ( ) ;
// Ignore status from a stale call.
if ( lb_calld - > IsCurrentCallOnChannel ( ) ) {
// Because this call is the current one on the channel, the channel can't
// have been swapped out; otherwise, the call should have been reset.
GPR_ASSERT ( lb_chand - > IsCurrentChannel ( ) | | lb_chand - > IsPendingChannel ( ) ) ;
GPR_ASSERT ( ! xdslb_policy - > shutting_down_ ) ;
xdslb_policy - > channel_control_helper ( ) - > RequestReresolution ( ) ;
if ( lb_chand ! = xdslb_policy - > LatestLbChannel ( ) ) {
// This channel must be the current one and there is a pending one. Swap
// in the pending one and we are done.
if ( grpc_lb_xds_trace . enabled ( ) ) {
gpr_log ( GPR_INFO ,
" [xdslb %p] Promoting pending LB channel %p to replace "
" current LB channel %p " ,
xdslb_policy , lb_calld - > lb_chand_ . get ( ) ,
lb_calld - > xdslb_policy ( ) - > lb_chand_ . get ( ) ) ;
}
xdslb_policy - > lb_chand_ = std : : move ( xdslb_policy - > pending_lb_chand_ ) ;
} else {
// This channel is the most recently created one. Try to restart the call
// and reresolve.
lb_chand - > lb_calld_ . reset ( ) ;
if ( lb_calld - > seen_initial_response_ ) {
// If we lose connection to the LB server, reset the backoff and restart
// If we lost connection to the LB server, reset the backoff and restart
// the LB call immediately.
xdslb_policy - > lb_call_backoff_ . Reset ( ) ;
xdslb_policy - > StartBalancerCallLocked ( ) ;
lb_chand - > lb_call_backoff_ . Reset ( ) ;
lb_chand - > StartCallLocked ( ) ;
} else {
// If this LB call fails establishing any connection to the LB server,
// retry later.
xdslb_policy - > StartBalancerCallRetryTimerLocked ( ) ;
// If we failed to connect to the LB server, retry later.
lb_chand - > StartCallRetryTimerLocked ( ) ;
}
xdslb_policy - > channel_control_helper ( ) - > RequestReresolution ( ) ;
}
}
lb_calld - > Unref ( DEBUG_LOCATION , " lb_call_ended " ) ;
@ -868,53 +1016,23 @@ void XdsLb::BalancerCallState::OnBalancerStatusReceivedLocked(
// helper code for creating balancer channel
//
UniquePtr < ServerAddressList > ExtractBalancerAddresses (
const ServerAddressList & addresses ) {
auto balancer_addresses = MakeUnique < ServerAddressList > ( ) ;
for ( size_t i = 0 ; i < addresses . size ( ) ; + + i ) {
if ( addresses [ i ] . IsBalancer ( ) ) {
balancer_addresses - > emplace_back ( addresses [ i ] ) ;
}
}
return balancer_addresses ;
}
/* Returns the channel args for the LB channel, used to create a bidirectional
* stream for the reception of load balancing updates .
*
* Inputs :
* - \ a addresses : corresponding to the balancers .
* - \ a response_generator : in order to propagate updates from the resolver
* above the grpclb policy .
* - \ a args : other args inherited from the xds policy . */
grpc_channel_args * BuildBalancerChannelArgs (
const ServerAddressList & addresses ,
FakeResolverResponseGenerator * response_generator ,
const grpc_channel_args * args ) {
UniquePtr < ServerAddressList > balancer_addresses =
ExtractBalancerAddresses ( addresses ) ;
// Channel args to remove.
// Returns the channel args for the LB channel, used to create a bidirectional
// stream for the reception of load balancing updates.
grpc_channel_args * BuildBalancerChannelArgs ( const grpc_channel_args * args ) {
static const char * args_to_remove [ ] = {
// LB policy name, since we want to use the default (pick_first) in
// the LB channel.
GRPC_ARG_LB_POLICY_NAME ,
// The service config that contains the LB config. We don't want to
// recursively use xds in the LB channel.
GRPC_ARG_SERVICE_CONFIG ,
// The channel arg for the server URI, since that will be different for
// the LB channel than for the parent channel. The client channel
// factory will re-add this arg with the right value.
GRPC_ARG_SERVER_URI ,
// The resolved addresses, which will be generated by the name resolver
// used in the LB channel. Note that the LB channel will use the fake
// resolver, so this won't actually generate a query to DNS (or some
// other name service). However, the addresses returned by the fake
// resolver will have is_balancer=false, whereas our own addresses have
// is_balancer=true. We need the LB channel to return addresses with
// is_balancer=false so that it does not wind up recursively using the
// xds LB policy, as per the special case logic in client_channel.c.
// used in the LB channel.
GRPC_ARG_SERVER_ADDRESS_LIST ,
// The fake resolver response generator, because we are replacing it
// with the one from the xds policy, used to propagate updates to
// the LB channel.
GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR ,
// The LB channel should use the authority indicated by the target
// authority table (see \a grpc_lb_policy_xds_modify_lb_channel_args),
// as opposed to the authority from the parent channel.
@ -926,14 +1044,6 @@ grpc_channel_args* BuildBalancerChannelArgs(
} ;
// Channel args to add.
const grpc_arg args_to_add [ ] = {
// New server address list.
// Note that we pass these in both when creating the LB channel
// and via the fake resolver. The latter is what actually gets used.
CreateServerAddressListChannelArg ( balancer_addresses . get ( ) ) ,
// The fake resolver response generator, which we use to inject
// address updates into the LB channel.
grpc_core : : FakeResolverResponseGenerator : : MakeChannelArg (
response_generator ) ,
// A channel arg indicating the target is a xds load balancer.
grpc_channel_arg_integer_create (
const_cast < char * > ( GRPC_ARG_ADDRESS_IS_XDS_LOAD_BALANCER ) , 1 ) ,
@ -954,21 +1064,8 @@ grpc_channel_args* BuildBalancerChannelArgs(
// ctor and dtor
//
XdsLb : : XdsLb ( Args args )
: LoadBalancingPolicy ( std : : move ( args ) ) ,
response_generator_ ( MakeRefCounted < FakeResolverResponseGenerator > ( ) ) ,
lb_call_backoff_ (
BackOff : : Options ( )
. set_initial_backoff ( GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS *
1000 )
. set_multiplier ( GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER )
. set_jitter ( GRPC_XDS_RECONNECT_JITTER )
. set_max_backoff ( GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS * 1000 ) ) {
// Initialization.
gpr_mu_init ( & lb_channel_mu_ ) ;
GRPC_CLOSURE_INIT ( & lb_channel_on_connectivity_changed_ ,
& XdsLb : : OnBalancerChannelConnectivityChangedLocked , this ,
grpc_combiner_scheduler ( args . combiner ) ) ;
XdsLb : : XdsLb ( Args args ) : LoadBalancingPolicy ( std : : move ( args ) ) {
gpr_mu_init ( & lb_chand_mu_ ) ;
// Record server name.
const grpc_arg * arg = grpc_channel_args_find ( args . args , GRPC_ARG_SERVER_URI ) ;
const char * server_uri = grpc_channel_arg_get_string ( arg ) ;
@ -992,7 +1089,7 @@ XdsLb::XdsLb(Args args)
}
XdsLb : : ~ XdsLb ( ) {
gpr_mu_destroy ( & lb_channel _mu_ ) ;
gpr_mu_destroy ( & lb_chand _mu_ ) ;
gpr_free ( ( void * ) server_name_ ) ;
grpc_channel_args_destroy ( args_ ) ;
if ( serverlist_ ! = nullptr ) {
@ -1002,10 +1099,6 @@ XdsLb::~XdsLb() {
void XdsLb : : ShutdownLocked ( ) {
shutting_down_ = true ;
lb_calld_ . reset ( ) ;
if ( retry_timer_callback_pending_ ) {
grpc_timer_cancel ( & lb_call_retry_timer_ ) ;
}
if ( fallback_timer_callback_pending_ ) {
grpc_timer_cancel ( & lb_fallback_timer_ ) ;
}
@ -1014,11 +1107,10 @@ void XdsLb::ShutdownLocked() {
// destroying the channel triggers a last callback to
// OnBalancerChannelConnectivityChangedLocked(), and we need to be
// alive when that callback is invoked.
if ( lb_channel_ ! = nullptr ) {
gpr_mu_lock ( & lb_channel_mu_ ) ;
grpc_channel_destroy ( lb_channel_ ) ;
lb_channel_ = nullptr ;
gpr_mu_unlock ( & lb_channel_mu_ ) ;
{
MutexLock lock ( & lb_chand_mu_ ) ;
lb_chand_ . reset ( ) ;
pending_lb_chand_ . reset ( ) ;
}
}
@ -1027,8 +1119,11 @@ void XdsLb::ShutdownLocked() {
//
void XdsLb : : ResetBackoffLocked ( ) {
if ( lb_channel_ ! = nullptr ) {
grpc_channel_reset_connect_backoff ( lb_channel_ ) ;
if ( lb_chand_ ! = nullptr ) {
grpc_channel_reset_connect_backoff ( lb_chand_ - > channel ( ) ) ;
}
if ( pending_lb_chand_ ! = nullptr ) {
grpc_channel_reset_connect_backoff ( pending_lb_chand_ - > channel ( ) ) ;
}
if ( child_policy_ ! = nullptr ) {
child_policy_ - > ResetBackoffLocked ( ) ;
@ -1037,12 +1132,19 @@ void XdsLb::ResetBackoffLocked() {
void XdsLb : : FillChildRefsForChannelz ( channelz : : ChildRefsList * child_subchannels ,
channelz : : ChildRefsList * child_channels ) {
// d elegate to the child_policy_ to fill the children subchannels.
// D elegate to the child_policy_ to fill the children subchannels.
child_policy_ - > FillChildRefsForChannelz ( child_subchannels , child_channels ) ;
MutexLock lock ( & lb_channel_mu_ ) ;
if ( lb_channel_ ! = nullptr ) {
MutexLock lock ( & lb_chand_mu_ ) ;
if ( lb_chand_ ! = nullptr ) {
grpc_core : : channelz : : ChannelNode * channel_node =
grpc_channel_get_channelz_node ( lb_chand_ - > channel ( ) ) ;
if ( channel_node ! = nullptr ) {
child_channels - > push_back ( channel_node - > uuid ( ) ) ;
}
}
if ( pending_lb_chand_ ! = nullptr ) {
grpc_core : : channelz : : ChannelNode * channel_node =
grpc_channel_get_channelz_node ( lb_channel_ ) ;
grpc_channel_get_channelz_node ( pending_lb_chand_ - > channel ( ) ) ;
if ( channel_node ! = nullptr ) {
child_channels - > push_back ( channel_node - > uuid ( ) ) ;
}
@ -1069,22 +1171,29 @@ void XdsLb::ProcessChannelArgsLocked(const grpc_channel_args& args) {
args_ = grpc_channel_args_copy_and_add_and_remove (
& args , args_to_remove , GPR_ARRAY_SIZE ( args_to_remove ) , & new_arg , 1 ) ;
// Construct args for balancer channel.
grpc_channel_args * lb_channel_args =
BuildBalancerChannelArgs ( * addresses , response_generator_ . get ( ) , & args ) ;
// Create balancer channel if needed.
if ( lb_channel_ = = nullptr ) {
char * uri_str ;
gpr_asprintf ( & uri_str , " fake:///%s " , server_name_ ) ;
gpr_mu_lock ( & lb_channel_mu_ ) ;
lb_channel_ = channel_control_helper ( ) - > CreateChannel (
uri_str , GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING , * lb_channel_args ) ;
gpr_mu_unlock ( & lb_channel_mu_ ) ;
GPR_ASSERT ( lb_channel_ ! = nullptr ) ;
gpr_free ( uri_str ) ;
}
// Propagate updates to the LB channel (pick_first) through the fake
// resolver.
response_generator_ - > SetResponse ( lb_channel_args ) ;
grpc_channel_args * lb_channel_args = BuildBalancerChannelArgs ( & args ) ;
// Create an LB channel if we don't have one yet or the balancer name has
// changed from the last received one.
bool create_lb_channel = lb_chand_ = = nullptr ;
if ( lb_chand_ ! = nullptr ) {
UniquePtr < char > last_balancer_name (
grpc_channel_get_target ( LatestLbChannel ( ) - > channel ( ) ) ) ;
create_lb_channel =
strcmp ( last_balancer_name . get ( ) , balancer_name_ . get ( ) ) ! = 0 ;
}
if ( create_lb_channel ) {
OrphanablePtr < BalancerChannelState > lb_chand =
MakeOrphanable < BalancerChannelState > ( balancer_name_ . get ( ) ,
* lb_channel_args , Ref ( ) ) ;
if ( lb_chand_ = = nullptr | | ! lb_chand_ - > HasActiveCall ( ) ) {
GPR_ASSERT ( pending_lb_chand_ = = nullptr ) ;
// If we do not have a working LB channel yet, use the newly created one.
lb_chand_ = std : : move ( lb_chand ) ;
} else {
// Otherwise, wait until the new LB channel to be ready to swap it in.
pending_lb_chand_ = std : : move ( lb_chand ) ;
}
}
grpc_channel_args_destroy ( lb_channel_args ) ;
}
@ -1124,12 +1233,13 @@ void XdsLb::ParseLbConfig(Config* xds_config) {
void XdsLb : : UpdateLocked ( const grpc_channel_args & args ,
RefCountedPtr < Config > lb_config ) {
const bool is_initial_update = lb_channel _ = = nullptr ;
const bool is_initial_update = lb_chand _ = = nullptr ;
ParseLbConfig ( lb_config . get ( ) ) ;
// TODO(juanlishen): Pass fallback policy config update after fallback policy
// is added.
if ( balancer_name_ = = nullptr ) {
gpr_log ( GPR_ERROR , " [xdslb %p] LB config parsing fails. " , this ) ;
return ;
}
ProcessChannelArgsLocked ( args ) ;
// Update the existing child policy.
@ -1149,24 +1259,6 @@ void XdsLb::UpdateLocked(const grpc_channel_args& args,
fallback_timer_callback_pending_ = true ;
grpc_timer_init ( & lb_fallback_timer_ , deadline , & lb_on_fallback_ ) ;
}
StartBalancerCallLocked ( ) ;
} else if ( ! watching_lb_channel_ ) {
// If this is not the initial update and we're not already watching
// the LB channel's connectivity state, start a watch now. This
// ensures that we'll know when to switch to a new balancer call.
lb_channel_connectivity_ = grpc_channel_check_connectivity_state (
lb_channel_ , true /* try to connect */ ) ;
grpc_channel_element * client_channel_elem = grpc_channel_stack_last_element (
grpc_channel_get_channel_stack ( lb_channel_ ) ) ;
GPR_ASSERT ( client_channel_elem - > filter = = & grpc_client_channel_filter ) ;
watching_lb_channel_ = true ;
// Ref held by closure.
Ref ( DEBUG_LOCATION , " watch_lb_channel_connectivity " ) . release ( ) ;
grpc_client_channel_watch_connectivity_state (
client_channel_elem ,
grpc_polling_entity_create_from_pollset_set ( interested_parties ( ) ) ,
& lb_channel_connectivity_ , & lb_channel_on_connectivity_changed_ ,
nullptr ) ;
}
}
@ -1174,20 +1266,6 @@ void XdsLb::UpdateLocked(const grpc_channel_args& args,
// code for balancer channel and call
//
void XdsLb : : StartBalancerCallLocked ( ) {
GPR_ASSERT ( lb_channel_ ! = nullptr ) ;
if ( shutting_down_ ) return ;
// Init the LB call data.
GPR_ASSERT ( lb_calld_ = = nullptr ) ;
lb_calld_ = MakeOrphanable < BalancerCallState > ( Ref ( ) ) ;
if ( grpc_lb_xds_trace . enabled ( ) ) {
gpr_log ( GPR_INFO ,
" [xdslb %p] Query for backends (lb_channel: %p, lb_calld: %p) " ,
this , lb_channel_ , lb_calld_ . get ( ) ) ;
}
lb_calld_ - > StartQuery ( ) ;
}
void XdsLb : : OnFallbackTimerLocked ( void * arg , grpc_error * error ) {
XdsLb * xdslb_policy = static_cast < XdsLb * > ( arg ) ;
xdslb_policy - > fallback_timer_callback_pending_ = false ;
@ -1204,88 +1282,6 @@ void XdsLb::OnFallbackTimerLocked(void* arg, grpc_error* error) {
xdslb_policy - > Unref ( DEBUG_LOCATION , " on_fallback_timer " ) ;
}
void XdsLb : : StartBalancerCallRetryTimerLocked ( ) {
grpc_millis next_try = lb_call_backoff_ . NextAttemptTime ( ) ;
if ( grpc_lb_xds_trace . enabled ( ) ) {
gpr_log ( GPR_INFO , " [xdslb %p] Connection to LB server lost... " , this ) ;
grpc_millis timeout = next_try - ExecCtx : : Get ( ) - > Now ( ) ;
if ( timeout > 0 ) {
gpr_log ( GPR_INFO , " [xdslb %p] ... retry_timer_active in % " PRId64 " ms. " ,
this , timeout ) ;
} else {
gpr_log ( GPR_INFO , " [xdslb %p] ... retry_timer_active immediately. " , this ) ;
}
}
// TODO(roth): We currently track this ref manually. Once the
// ClosureRef API is ready, we should pass the RefCountedPtr<> along
// with the callback.
auto self = Ref ( DEBUG_LOCATION , " on_balancer_call_retry_timer " ) ;
self . release ( ) ;
GRPC_CLOSURE_INIT ( & lb_on_call_retry_ , & XdsLb : : OnBalancerCallRetryTimerLocked ,
this , grpc_combiner_scheduler ( combiner ( ) ) ) ;
retry_timer_callback_pending_ = true ;
grpc_timer_init ( & lb_call_retry_timer_ , next_try , & lb_on_call_retry_ ) ;
}
void XdsLb : : OnBalancerCallRetryTimerLocked ( void * arg , grpc_error * error ) {
XdsLb * xdslb_policy = static_cast < XdsLb * > ( arg ) ;
xdslb_policy - > retry_timer_callback_pending_ = false ;
if ( ! xdslb_policy - > shutting_down_ & & error = = GRPC_ERROR_NONE & &
xdslb_policy - > lb_calld_ = = nullptr ) {
if ( grpc_lb_xds_trace . enabled ( ) ) {
gpr_log ( GPR_INFO , " [xdslb %p] Restarting call to LB server " ,
xdslb_policy ) ;
}
xdslb_policy - > StartBalancerCallLocked ( ) ;
}
xdslb_policy - > Unref ( DEBUG_LOCATION , " on_balancer_call_retry_timer " ) ;
}
// Invoked as part of the update process. It continues watching the LB channel
// until it shuts down or becomes READY. It's invoked even if the LB channel
// stayed READY throughout the update (for example if the update is identical).
void XdsLb : : OnBalancerChannelConnectivityChangedLocked ( void * arg ,
grpc_error * error ) {
XdsLb * xdslb_policy = static_cast < XdsLb * > ( arg ) ;
if ( xdslb_policy - > shutting_down_ ) goto done ;
// Re-initialize the lb_call. This should also take care of updating the
// child policy. Note that the current child policy, if any, will
// stay in effect until an update from the new lb_call is received.
switch ( xdslb_policy - > lb_channel_connectivity_ ) {
case GRPC_CHANNEL_CONNECTING :
case GRPC_CHANNEL_TRANSIENT_FAILURE : {
// Keep watching the LB channel.
grpc_channel_element * client_channel_elem =
grpc_channel_stack_last_element (
grpc_channel_get_channel_stack ( xdslb_policy - > lb_channel_ ) ) ;
GPR_ASSERT ( client_channel_elem - > filter = = & grpc_client_channel_filter ) ;
grpc_client_channel_watch_connectivity_state (
client_channel_elem ,
grpc_polling_entity_create_from_pollset_set (
xdslb_policy - > interested_parties ( ) ) ,
& xdslb_policy - > lb_channel_connectivity_ ,
& xdslb_policy - > lb_channel_on_connectivity_changed_ , nullptr ) ;
break ;
}
// The LB channel may be IDLE because it's shut down before the update.
// Restart the LB call to kick the LB channel into gear.
case GRPC_CHANNEL_IDLE :
case GRPC_CHANNEL_READY :
xdslb_policy - > lb_calld_ . reset ( ) ;
if ( xdslb_policy - > retry_timer_callback_pending_ ) {
grpc_timer_cancel ( & xdslb_policy - > lb_call_retry_timer_ ) ;
}
xdslb_policy - > lb_call_backoff_ . Reset ( ) ;
xdslb_policy - > StartBalancerCallLocked ( ) ;
// Fall through.
case GRPC_CHANNEL_SHUTDOWN :
done :
xdslb_policy - > watching_lb_channel_ = false ;
xdslb_policy - > Unref ( DEBUG_LOCATION ,
" watch_lb_channel_connectivity_cb_shutdown " ) ;
}
}
//
// code for interacting with the child policy
//
@ -1307,11 +1303,14 @@ grpc_channel_args* XdsLb::CreateChildPolicyArgsLocked() {
grpc_channel_arg_integer_create (
const_cast < char * > ( GRPC_ARG_ADDRESS_IS_BACKEND_FROM_XDS_LOAD_BALANCER ) ,
1 ) ,
// Inhibit client-side health checking, since the balancer does
// this for us.
grpc_channel_arg_integer_create (
const_cast < char * > ( GRPC_ARG_INHIBIT_HEALTH_CHECKING ) , 1 ) ,
} ;
grpc_channel_args * args = grpc_channel_args_copy_and_add_and_remove (
return grpc_channel_args_copy_and_add_and_remove (
args_ , keys_to_remove , GPR_ARRAY_SIZE ( keys_to_remove ) , args_to_add ,
GPR_ARRAY_SIZE ( args_to_add ) ) ;
return args ;
}
void XdsLb : : CreateChildPolicyLocked ( const char * name , Args args ) {
@ -1367,18 +1366,6 @@ class XdsFactory : public LoadBalancingPolicyFactory {
public :
OrphanablePtr < LoadBalancingPolicy > CreateLoadBalancingPolicy (
LoadBalancingPolicy : : Args args ) const override {
/* Count the number of gRPC-LB addresses. There must be at least one. */
const ServerAddressList * addresses =
FindServerAddressListChannelArg ( args . args ) ;
if ( addresses = = nullptr ) return nullptr ;
bool found_balancer_address = false ;
for ( size_t i = 0 ; i < addresses - > size ( ) ; + + i ) {
if ( ( * addresses ) [ i ] . IsBalancer ( ) ) {
found_balancer_address = true ;
break ;
}
}
if ( ! found_balancer_address ) return nullptr ;
return OrphanablePtr < LoadBalancingPolicy > ( New < XdsLb > ( std : : move ( args ) ) ) ;
}