@ -26,30 +26,26 @@
/// channel that uses pick_first to select from the list of balancer
/// addresses.
///
/// The first time the policy gets a request for a pick, a ping, or to exit
/// the idle state, \a StartPickingLocked() is called. This method is
/// responsible for instantiating the internal *streaming* call to the LB
/// server (whichever address pick_first chose). The call will be complete
/// when either the balancer sends status or when we cancel the call (e.g.,
/// because we are shutting down). In needed, we retry the call. If we
/// received at least one valid message from the server, a new call attempt
/// will be made immediately; otherwise, we apply back-off delays between
/// attempts.
/// The first time the xDS policy gets a request for a pick or to exit the idle
/// state, \a StartPickingLocked() is called. This method is responsible for
/// instantiating the internal *streaming* call to the LB server (whichever
/// address pick_first chose). The call will be complete when either the
/// balancer sends status or when we cancel the call (e.g., because we are
/// shutting down). In needed, we retry the call. If we received at least one
/// valid message from the server, a new call attempt will be made immediately;
/// otherwise, we apply back-off delays between attempts.
///
/// We maintain an internal round_robin policy instance for distributing
/// We maintain an internal child policy (round_robin) instance for distributing
/// requests across backends. Whenever we receive a new serverlist from
/// the balancer, we update the round_robin policy with the new list of
/// addresses. If we cannot communicate with the balancer on startup,
/// however, we may enter fallback mode, in which case we will populate
/// the RR policy's addresses from the backend addresses returned by the
/// resolver.
/// the balancer, we update the child policy with the new list of
/// addresses.
///
/// Once an RR policy instance is in place (and getting updated as described),
/// calls for a pick, a ping , or a cancellation will be serviced right
/// away by forwarding them to the RR instance. Any time there's no RR
/// policy available (i.e., right after the creation of the gRPCLB policy),
/// pick and ping requests are added to a list of pending picks and ping s
/// to be flushed and serviced when the RR policy instance becomes available.
/// Once a child policy instance is in place (and getting updated as
/// described), calls for a pick, or a cancellation will be serviced right away
/// by forwarding them to the child policy instance. Any time there's no child
/// policy available (i.e., right after the creation of the xDS policy), pick
/// requests are added to a list of pending picks to be flushed and serviced
/// when the child policy instance becomes available.
///
/// \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the
/// high level design and details.
@ -141,10 +137,10 @@ class XdsLb : public LoadBalancingPolicy {
private :
/// Linked list of pending pick requests. It stores all information needed to
/// eventually call (Round Robin's) pick() on them. They mainly stay pending
/// waiting for the RR policy to be created.
/// eventually call pick() on them. They mainly stay pending waiting for the
/// child policy to be created.
///
/// Note that when a pick is sent to the RR policy, we inject our own
/// Note that when a pick is sent to the child policy, we inject our own
/// on_complete callback, so that we can intercept the result before
/// invoking the original on_complete callback. This allows us to set the
/// LB token metadata and add client_stats to the call context.
@ -266,18 +262,18 @@ class XdsLb : public LoadBalancingPolicy {
void AddPendingPick ( PendingPick * pp ) ;
static void OnPendingPickComplete ( void * arg , grpc_error * error ) ;
// Methods for dealing with the RR policy.
void CreateOrUpdateRoundRobin PolicyLocked ( ) ;
grpc_channel_args * CreateRoundRobin PolicyArgsLocked ( ) ;
void CreateRoundRobin PolicyLocked ( const Args & args ) ;
bool PickFromRoundRobin PolicyLocked ( bool force_async , PendingPick * pp ,
grpc_error * * error ) ;
void UpdateConnectivityStateFromRoundRobin PolicyLocked (
grpc_error * rr _state_error) ;
static void OnRoundRobin ConnectivityChangedLocked ( void * arg ,
grpc_error * error ) ;
static void OnRoundRobin RequestReresolutionLocked ( void * arg ,
grpc_error * error ) ;
// Methods for dealing with the child policy.
void CreateOrUpdateChild PolicyLocked ( ) ;
grpc_channel_args * CreateChild PolicyArgsLocked ( ) ;
void CreateChild PolicyLocked ( const Args & args ) ;
bool PickFromChild PolicyLocked ( bool force_async , PendingPick * pp ,
grpc_error * * error ) ;
void UpdateConnectivityStateFromChild PolicyLocked (
grpc_error * child _state_error) ;
static void OnChildPolicy ConnectivityChangedLocked ( void * arg ,
grpc_error * error ) ;
static void OnChildPolicy RequestReresolutionLocked ( void * arg ,
grpc_error * error ) ;
// Who the client is trying to communicate with.
const char * server_name_ = nullptr ;
@ -330,14 +326,14 @@ class XdsLb : public LoadBalancingPolicy {
grpc_timer lb_fallback_timer_ ;
grpc_closure lb_on_fallback_ ;
// Pending picks that are waiting on the RR policy's connectivity.
// Pending picks that are waiting on the xDS policy's connectivity.
PendingPick * pending_picks_ = nullptr ;
// The RR policy to use for the backends.
OrphanablePtr < LoadBalancingPolicy > rr _policy_;
grpc_connectivity_state rr _connectivity_state_;
grpc_closure on_rr _connectivity_changed_ ;
grpc_closure on_rr _request_reresolution_ ;
// The policy to use for the backends.
OrphanablePtr < LoadBalancingPolicy > child _policy_;
grpc_connectivity_state child _connectivity_state_;
grpc_closure on_child _connectivity_changed_ ;
grpc_closure on_child _request_reresolution_ ;
} ;
//
@ -444,7 +440,7 @@ grpc_lb_addresses* ProcessServerlist(const xds_grpclb_serverlist* serverlist) {
grpc_lb_addresses * lb_addresses =
grpc_lb_addresses_create ( num_valid , & lb_token_vtable ) ;
/* second pass: actually populate the addresses and LB tokens (aka user data
* to the outside world ) to be read by the RR policy during its creation .
* to the outside world ) to be read by the child policy during its creation .
* Given that the validity tests are very cheap , they are performed again
* instead of marking the valid ones during the first pass , as this would
* incurr in an allocation due to the arbitrary number of server */
@ -833,7 +829,7 @@ void XdsLb::BalancerCallState::OnBalancerMessageReceivedLocked(
// serverlist instance will be destroyed either upon the next
// update or when the XdsLb instance is destroyed.
xdslb_policy - > serverlist_ = serverlist ;
xdslb_policy - > CreateOrUpdateRoundRobin PolicyLocked ( ) ;
xdslb_policy - > CreateOrUpdateChild PolicyLocked ( ) ;
}
} else {
if ( grpc_lb_xds_trace . enabled ( ) ) {
@ -866,7 +862,7 @@ void XdsLb::BalancerCallState::OnBalancerMessageReceivedLocked(
& lb_calld - > lb_on_balancer_message_received_ ) ;
GPR_ASSERT ( GRPC_CALL_OK = = call_error ) ;
} else {
lb_calld - > Unref ( DEBUG_LOCATION , " on_message_received+grpclb _shutdown " ) ;
lb_calld - > Unref ( DEBUG_LOCATION , " on_message_received+xds _shutdown " ) ;
}
}
@ -944,7 +940,7 @@ grpc_lb_addresses* ExtractBalancerAddresses(
* - \ a addresses : corresponding to the balancers .
* - \ a response_generator : in order to propagate updates from the resolver
* above the grpclb policy .
* - \ a args : other args inherited from the grpclb policy . */
* - \ a args : other args inherited from the xds policy . */
grpc_channel_args * BuildBalancerChannelArgs (
const grpc_lb_addresses * addresses ,
FakeResolverResponseGenerator * response_generator ,
@ -966,10 +962,10 @@ grpc_channel_args* BuildBalancerChannelArgs(
// resolver will have is_balancer=false, whereas our own addresses have
// is_balancer=true. We need the LB channel to return addresses with
// is_balancer=false so that it does not wind up recursively using the
// grpclb LB policy, as per the special case logic in client_channel.c.
// xds LB policy, as per the special case logic in client_channel.c.
GRPC_ARG_LB_ADDRESSES ,
// The fake resolver response generator, because we are replacing it
// with the one from the grpclb policy, used to propagate updates to
// with the one from the xds policy, used to propagate updates to
// the LB channel.
GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR ,
// The LB channel should use the authority indicated by the target
@ -991,7 +987,7 @@ grpc_channel_args* BuildBalancerChannelArgs(
// address updates into the LB channel.
grpc_core : : FakeResolverResponseGenerator : : MakeChannelArg (
response_generator ) ,
// A channel arg indicating the target is a grpclb load balancer.
// A channel arg indicating the target is a xds load balancer.
grpc_channel_arg_integer_create (
const_cast < char * > ( GRPC_ARG_ADDRESS_IS_XDS_LOAD_BALANCER ) , 1 ) ,
// A channel arg indicating this is an internal channels, aka it is
@ -1031,11 +1027,11 @@ XdsLb::XdsLb(const grpc_lb_addresses* addresses,
GRPC_CLOSURE_INIT ( & lb_channel_on_connectivity_changed_ ,
& XdsLb : : OnBalancerChannelConnectivityChangedLocked , this ,
grpc_combiner_scheduler ( args . combiner ) ) ;
GRPC_CLOSURE_INIT ( & on_rr _connectivity_changed_ ,
& XdsLb : : OnRoundRobin ConnectivityChangedLocked , this ,
GRPC_CLOSURE_INIT ( & on_child _connectivity_changed_ ,
& XdsLb : : OnChildPolicy ConnectivityChangedLocked , this ,
grpc_combiner_scheduler ( args . combiner ) ) ;
GRPC_CLOSURE_INIT ( & on_rr _request_reresolution_ ,
& XdsLb : : OnRoundRobin RequestReresolutionLocked , this ,
GRPC_CLOSURE_INIT ( & on_child _request_reresolution_ ,
& XdsLb : : OnChildPolicy RequestReresolutionLocked , this ,
grpc_combiner_scheduler ( args . combiner ) ) ;
grpc_connectivity_state_init ( & state_tracker_ , GRPC_CHANNEL_IDLE , " xds " ) ;
// Record server name.
@ -1087,7 +1083,7 @@ void XdsLb::ShutdownLocked() {
if ( fallback_timer_callback_pending_ ) {
grpc_timer_cancel ( & lb_fallback_timer_ ) ;
}
rr _policy_. reset ( ) ;
child _policy_. reset ( ) ;
TryReresolutionLocked ( & grpc_lb_xds_trace , GRPC_ERROR_CANCELLED ) ;
// We destroy the LB channel here instead of in our destructor because
// destroying the channel triggers a last callback to
@ -1100,7 +1096,7 @@ void XdsLb::ShutdownLocked() {
gpr_mu_unlock ( & lb_channel_mu_ ) ;
}
grpc_connectivity_state_set ( & state_tracker_ , GRPC_CHANNEL_SHUTDOWN ,
GRPC_ERROR_REF ( error ) , " grpclb _shutdown" ) ;
GRPC_ERROR_REF ( error ) , " xds _shutdown" ) ;
// Clear pending picks.
PendingPick * pp ;
while ( ( pp = pending_picks_ ) ! = nullptr ) {
@ -1133,13 +1129,13 @@ void XdsLb::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
// Cancel a specific pending pick.
//
// A grpclb pick progresses as follows:
// - If there's a Round Robin policy (rr_policy_) available, it'll be
// handed over to the RR policy (in CreateRoundRobinPolicyLocked()). From
// that point onwards, it'll be RR's responsibility. For cancellations, that
// implies the pick needs also be cancelled by the RR instance.
// - Otherwise, without an RR instance, picks stay pending at this policy' s
// level (grpclb ), inside the pending_picks_ list. To cancel these,
// A pick progresses as follows:
// - If there's a child policy available, it'll be handed over to child policy
// (in CreateChildPolicyLocked()). From that point onwards, it'll be the
// child policy's responsibility. For cancellations, that implies the pick
// needs to be also cancelled by the child policy instance.
// - Otherwise, without a child policy instance, picks stay pending at thi s
// policy's level (xds ), inside the pending_picks_ list. To cancel these,
// we invoke the completion closure and set the pick's connected
// subchannel to nullptr right here.
void XdsLb : : CancelPickLocked ( PickState * pick , grpc_error * error ) {
@ -1159,21 +1155,21 @@ void XdsLb::CancelPickLocked(PickState* pick, grpc_error* error) {
}
pp = next ;
}
if ( rr _policy_ ! = nullptr ) {
rr _policy_- > CancelPickLocked ( pick , GRPC_ERROR_REF ( error ) ) ;
if ( child _policy_ ! = nullptr ) {
child _policy_- > CancelPickLocked ( pick , GRPC_ERROR_REF ( error ) ) ;
}
GRPC_ERROR_UNREF ( error ) ;
}
// Cancel all pending picks.
//
// A grpclb pick progresses as follows:
// - If there's a Round Robin policy (rr_policy_) available, it'll be
// handed over to the RR policy (in CreateRoundRobinPolicyLocked()). From
// that point onwards, it'll be RR's responsibility. For cancellations, that
// implies the pick needs also be cancelled by the RR instance.
// - Otherwise, without an RR instance, picks stay pending at this policy' s
// level (grpclb ), inside the pending_picks_ list. To cancel these,
// A pick progresses as follows:
// - If there's a child policy available, it'll be handed over to child policy
// (in CreateChildPolicyLocked()). From that point onwards, it'll be the
// child policy's responsibility. For cancellations, that implies the pick
// needs to be also cancelled by the child policy instance.
// - Otherwise, without a child policy instance, picks stay pending at thi s
// policy's level (xds ), inside the pending_picks_ list. To cancel these,
// we invoke the completion closure and set the pick's connected
// subchannel to nullptr right here.
void XdsLb : : CancelMatchingPicksLocked ( uint32_t initial_metadata_flags_mask ,
@ -1195,10 +1191,10 @@ void XdsLb::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
}
pp = next ;
}
if ( rr _policy_ ! = nullptr ) {
rr _policy_- > CancelMatchingPicksLocked ( initial_metadata_flags_mask ,
initial_metadata_flags_eq ,
GRPC_ERROR_REF ( error ) ) ;
if ( child _policy_ ! = nullptr ) {
child _policy_- > CancelMatchingPicksLocked ( initial_metadata_flags_mask ,
initial_metadata_flags_eq ,
GRPC_ERROR_REF ( error ) ) ;
}
GRPC_ERROR_UNREF ( error ) ;
}
@ -1213,22 +1209,21 @@ void XdsLb::ResetBackoffLocked() {
if ( lb_channel_ ! = nullptr ) {
grpc_channel_reset_connect_backoff ( lb_channel_ ) ;
}
if ( rr _policy_ ! = nullptr ) {
rr _policy_- > ResetBackoffLocked ( ) ;
if ( child _policy_ ! = nullptr ) {
child _policy_- > ResetBackoffLocked ( ) ;
}
}
bool XdsLb : : PickLocked ( PickState * pick , grpc_error * * error ) {
PendingPick * pp = PendingPickCreate ( pick ) ;
bool pick_done = false ;
if ( rr _policy_ ! = nullptr ) {
if ( child _policy_ ! = nullptr ) {
if ( grpc_lb_xds_trace . enabled ( ) ) {
gpr_log ( GPR_INFO , " [xdslb %p] about to PICK from RR %p " , this ,
rr _policy_. get ( ) ) ;
gpr_log ( GPR_INFO , " [xdslb %p] about to PICK from policy %p " , this ,
child _policy_. get ( ) ) ;
}
pick_done =
PickFromRoundRobinPolicyLocked ( false /* force_async */ , pp , error ) ;
} else { // rr_policy_ == NULL
pick_done = PickFromChildPolicyLocked ( false /* force_async */ , pp , error ) ;
} else { // child_policy_ == NULL
if ( pick - > on_complete = = nullptr ) {
* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING (
" No pick result available but synchronous result required. " ) ;
@ -1236,7 +1231,7 @@ bool XdsLb::PickLocked(PickState* pick, grpc_error** error) {
} else {
if ( grpc_lb_xds_trace . enabled ( ) ) {
gpr_log ( GPR_INFO ,
" [xdslb %p] No RR policy. Adding to grpclb 's pending picks " ,
" [xdslb %p] No child policy. Adding to xds 's pending picks " ,
this ) ;
}
AddPendingPick ( pp ) ;
@ -1251,8 +1246,8 @@ bool XdsLb::PickLocked(PickState* pick, grpc_error** error) {
void XdsLb : : FillChildRefsForChannelz ( channelz : : ChildRefsList * child_subchannels ,
channelz : : ChildRefsList * child_channels ) {
// delegate to the RoundRobin to fill the children subchannels.
rr _policy_- > FillChildRefsForChannelz ( child_subchannels , child_channels ) ;
// delegate to the child_policy_ to fill the children subchannels.
child _policy_- > FillChildRefsForChannelz ( child_subchannels , child_channels ) ;
MutexLock lock ( & lb_channel_mu_ ) ;
if ( lb_channel_ ! = nullptr ) {
grpc_core : : channelz : : ChannelNode * channel_node =
@ -1321,10 +1316,12 @@ void XdsLb::ProcessChannelArgsLocked(const grpc_channel_args& args) {
void XdsLb : : UpdateLocked ( const grpc_channel_args & args ) {
ProcessChannelArgsLocked ( args ) ;
// Note: We have disabled fallback mode in the code, so we don't need to
// handle fallback address changes.
// Update the existing child policy.
// Note: We have disabled fallback mode in the code, so this child policy must
// have been created from a serverlist.
// TODO(vpowar): Handle the fallback_address changes when we add support for
// fallback in xDS.
if ( child_policy_ ! = nullptr ) CreateOrUpdateChildPolicyLocked ( ) ;
// Start watching the LB channel connectivity for connection, if not
// already doing so.
if ( ! watching_lb_channel_ ) {
@ -1445,8 +1442,8 @@ void XdsLb::OnBalancerChannelConnectivityChangedLocked(void* arg,
XdsLb * xdslb_policy = static_cast < XdsLb * > ( arg ) ;
if ( xdslb_policy - > shutting_down_ ) goto done ;
// Re-initialize the lb_call. This should also take care of updating the
// embedded RR policy. Note that the current RR policy, if any, will stay in
// effect until an update from the new lb_call is received.
// child policy. Note that the current child policy, if any, will
// stay in effect until an update from the new lb_call is received.
switch ( xdslb_policy - > lb_channel_connectivity_ ) {
case GRPC_CHANNEL_CONNECTING :
case GRPC_CHANNEL_TRANSIENT_FAILURE : {
@ -1505,8 +1502,8 @@ void DestroyClientStats(void* arg) {
}
void XdsLb : : PendingPickSetMetadataAndContext ( PendingPick * pp ) {
/* if connected_subchannel is nullptr, no pick has been made by the RR
* policy ( e . g . , all addresses failed to connect ) . There won ' t be any
/* if connected_subchannel is nullptr, no pick has been made by the
* child policy ( e . g . , all addresses failed to connect ) . There won ' t be any
* user_data / token available */
if ( pp - > pick - > connected_subchannel ! = nullptr ) {
if ( GPR_LIKELY ( ! GRPC_MDISNULL ( pp - > lb_token ) ) ) {
@ -1532,8 +1529,8 @@ void XdsLb::PendingPickSetMetadataAndContext(PendingPick* pp) {
}
/* The \a on_complete closure passed as part of the pick requires keeping a
* reference to its associated round robin instance . We wrap this closure in
* order to unref the round robin instance upon its invocation */
* reference to its associated child policy instance . We wrap this closure in
* order to unref the child policy instance upon its invocation */
void XdsLb : : OnPendingPickComplete ( void * arg , grpc_error * error ) {
PendingPick * pp = static_cast < PendingPick * > ( arg ) ;
PendingPickSetMetadataAndContext ( pp ) ;
@ -1558,24 +1555,24 @@ void XdsLb::AddPendingPick(PendingPick* pp) {
}
//
// code for interacting with the RR policy
// code for interacting with the child policy
//
// Performs a pick over \a rr _policy_. Given that a pick can return
// Performs a pick over \a child _policy_. Given that a pick can return
// immediately (ignoring its completion callback), we need to perform the
// cleanups this callback would otherwise be responsible for.
// If \a force_async is true, then we will manually schedule the
// completion callback even if the pick is available immediately.
bool XdsLb : : PickFromRoundRobin PolicyLocked ( bool force_async , PendingPick * pp ,
grpc_error * * error ) {
bool XdsLb : : PickFromChild PolicyLocked ( bool force_async , PendingPick * pp ,
grpc_error * * error ) {
// Set client_stats and user_data.
if ( lb_calld_ ! = nullptr & & lb_calld_ - > client_stats ( ) ! = nullptr ) {
pp - > client_stats = lb_calld_ - > client_stats ( ) - > Ref ( ) ;
}
GPR_ASSERT ( pp - > pick - > user_data = = nullptr ) ;
pp - > pick - > user_data = ( void * * ) & pp - > lb_token ;
// Pick via the RR policy.
bool pick_done = rr _policy_- > PickLocked ( pp - > pick , error ) ;
// Pick via the child policy.
bool pick_done = child _policy_- > PickLocked ( pp - > pick , error ) ;
if ( pick_done ) {
PendingPickSetMetadataAndContext ( pp ) ;
if ( force_async ) {
@ -1586,57 +1583,59 @@ bool XdsLb::PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp,
Delete ( pp ) ;
}
// else, the pending pick will be registered and taken care of by the
// pending pick list inside the RR policy. Eventually,
// pending pick list inside the child policy. Eventually,
// OnPendingPickComplete() will be called, which will (among other
// things) add the LB token to the call's initial metadata.
return pick_done ;
}
void XdsLb : : CreateRoundRobin PolicyLocked ( const Args & args ) {
GPR_ASSERT ( rr _policy_ = = nullptr ) ;
rr _policy_ = LoadBalancingPolicyRegistry : : CreateLoadBalancingPolicy (
void XdsLb : : CreateChild PolicyLocked ( const Args & args ) {
GPR_ASSERT ( child _policy_ = = nullptr ) ;
child _policy_ = LoadBalancingPolicyRegistry : : CreateLoadBalancingPolicy (
" round_robin " , args ) ;
if ( GPR_UNLIKELY ( rr _policy_ = = nullptr ) ) {
gpr_log ( GPR_ERROR , " [xdslb %p] Failure creating a RoundRobin policy " , this ) ;
if ( GPR_UNLIKELY ( child _policy_ = = nullptr ) ) {
gpr_log ( GPR_ERROR , " [xdslb %p] Failure creating a child policy " , this ) ;
return ;
}
// TODO(roth): We currently track this ref manually. Once the new
// ClosureRef API is done, pass the RefCountedPtr<> along with the closure.
auto self = Ref ( DEBUG_LOCATION , " on_rr _reresolution_requested " ) ;
auto self = Ref ( DEBUG_LOCATION , " on_child _reresolution_requested " ) ;
self . release ( ) ;
rr_policy_ - > SetReresolutionClosureLocked ( & on_rr_request_reresolution_ ) ;
grpc_error * rr_state_error = nullptr ;
rr_connectivity_state_ = rr_policy_ - > CheckConnectivityLocked ( & rr_state_error ) ;
// Connectivity state is a function of the RR policy updated/created.
UpdateConnectivityStateFromRoundRobinPolicyLocked ( rr_state_error ) ;
// Add the gRPC LB's interested_parties pollset_set to that of the newly
// created RR policy. This will make the RR policy progress upon activity on
// gRPC LB, which in turn is tied to the application's call.
grpc_pollset_set_add_pollset_set ( rr_policy_ - > interested_parties ( ) ,
child_policy_ - > SetReresolutionClosureLocked ( & on_child_request_reresolution_ ) ;
grpc_error * child_state_error = nullptr ;
child_connectivity_state_ =
child_policy_ - > CheckConnectivityLocked ( & child_state_error ) ;
// Connectivity state is a function of the child policy updated/created.
UpdateConnectivityStateFromChildPolicyLocked ( child_state_error ) ;
// Add the xDS's interested_parties pollset_set to that of the newly created
// child policy. This will make the child policy progress upon activity on
// xDS LB, which in turn is tied to the application's call.
grpc_pollset_set_add_pollset_set ( child_policy_ - > interested_parties ( ) ,
interested_parties ( ) ) ;
// Subscribe to changes to the connectivity of the new RR .
// Subscribe to changes to the connectivity of the new child policy .
// TODO(roth): We currently track this ref manually. Once the new
// ClosureRef API is done, pass the RefCountedPtr<> along with the closure.
self = Ref ( DEBUG_LOCATION , " on_rr _connectivity_changed " ) ;
self = Ref ( DEBUG_LOCATION , " on_child _connectivity_changed " ) ;
self . release ( ) ;
rr _policy_- > NotifyOnStateChangeLocked ( & rr _connectivity_state_,
& on_rr _connectivity_changed_) ;
rr _policy_- > ExitIdleLocked ( ) ;
// Send pending picks to RR policy.
child _policy_- > NotifyOnStateChangeLocked ( & child _connectivity_state_,
& on_child _connectivity_changed_ ) ;
child _policy_- > ExitIdleLocked ( ) ;
// Send pending picks to child policy.
PendingPick * pp ;
while ( ( pp = pending_picks_ ) ) {
pending_picks_ = pp - > next ;
if ( grpc_lb_xds_trace . enabled ( ) ) {
gpr_log ( GPR_INFO ,
" [xdslb %p] Pending pick about to (async) PICK from RR %p " , this ,
rr_policy_ . get ( ) ) ;
gpr_log (
GPR_INFO ,
" [xdslb %p] Pending pick about to (async) PICK from child policy %p " ,
this , child_policy_ . get ( ) ) ;
}
grpc_error * error = GRPC_ERROR_NONE ;
PickFromRoundRobin PolicyLocked ( true /* force_async */ , pp , & error ) ;
PickFromChild PolicyLocked ( true /* force_async */ , pp , & error ) ;
}
}
grpc_channel_args * XdsLb : : CreateRoundRobin PolicyArgsLocked ( ) {
grpc_channel_args * XdsLb : : CreateChild PolicyArgsLocked ( ) {
grpc_lb_addresses * addresses ;
bool is_backend_from_grpclb_load_balancer = false ;
// This should never be invoked if we do not have serverlist_, as fallback
@ -1664,66 +1663,67 @@ grpc_channel_args* XdsLb::CreateRoundRobinPolicyArgsLocked() {
return args ;
}
void XdsLb : : CreateOrUpdateRoundRobin PolicyLocked ( ) {
void XdsLb : : CreateOrUpdateChild PolicyLocked ( ) {
if ( shutting_down_ ) return ;
grpc_channel_args * args = CreateRoundRobin PolicyArgsLocked ( ) ;
grpc_channel_args * args = CreateChild PolicyArgsLocked ( ) ;
GPR_ASSERT ( args ! = nullptr ) ;
if ( rr _policy_ ! = nullptr ) {
if ( child _policy_ ! = nullptr ) {
if ( grpc_lb_xds_trace . enabled ( ) ) {
gpr_log ( GPR_INFO , " [xdslb %p] Updating RR policy %p " , this ,
rr _policy_. get ( ) ) ;
gpr_log ( GPR_INFO , " [xdslb %p] Updating the child policy %p " , this ,
child _policy_. get ( ) ) ;
}
rr _policy_- > UpdateLocked ( * args ) ;
child _policy_- > UpdateLocked ( * args ) ;
} else {
LoadBalancingPolicy : : Args lb_policy_args ;
lb_policy_args . combiner = combiner ( ) ;
lb_policy_args . client_channel_factory = client_channel_factory ( ) ;
lb_policy_args . args = args ;
CreateRoundRobin PolicyLocked ( lb_policy_args ) ;
CreateChild PolicyLocked ( lb_policy_args ) ;
if ( grpc_lb_xds_trace . enabled ( ) ) {
gpr_log ( GPR_INFO , " [xdslb %p] Created new RR policy %p " , this ,
rr _policy_. get ( ) ) ;
gpr_log ( GPR_INFO , " [xdslb %p] Created a new child policy %p " , this ,
child _policy_. get ( ) ) ;
}
}
grpc_channel_args_destroy ( args ) ;
}
void XdsLb : : OnRoundRobin RequestReresolutionLocked ( void * arg ,
grpc_error * error ) {
void XdsLb : : OnChildPolicy RequestReresolutionLocked ( void * arg ,
grpc_error * error ) {
XdsLb * xdslb_policy = static_cast < XdsLb * > ( arg ) ;
if ( xdslb_policy - > shutting_down_ | | error ! = GRPC_ERROR_NONE ) {
xdslb_policy - > Unref ( DEBUG_LOCATION , " on_rr _reresolution_requested " ) ;
xdslb_policy - > Unref ( DEBUG_LOCATION , " on_child _reresolution_requested " ) ;
return ;
}
if ( grpc_lb_xds_trace . enabled ( ) ) {
gpr_log (
GPR_INFO ,
" [xdslb %p] Re-resolution requested from the internal RR policy (%p)." ,
xdslb_policy , xdslb_policy - > rr _policy_. get ( ) ) ;
gpr_log ( GPR_INFO ,
" [xdslb %p] Re-resolution requested from child policy "
" (%p). " ,
xdslb_policy , xdslb_policy - > child _policy_. get ( ) ) ;
}
// If we are talking to a balancer, we expect to get updated addresses form
// the balancer, so we can ignore the re-resolution request from the RR
// policy. Otherwise, handle the re-resolution request using the
// grpclb policy's original re-resolution closure.
// the balancer, so we can ignore the re-resolution request from the child
// policy.
// Otherwise, handle the re-resolution request using the xds policy's
// original re-resolution closure.
if ( xdslb_policy - > lb_calld_ = = nullptr | |
! xdslb_policy - > lb_calld_ - > seen_initial_response ( ) ) {
xdslb_policy - > TryReresolutionLocked ( & grpc_lb_xds_trace , GRPC_ERROR_NONE ) ;
}
// Give back the wrapper closure to the RR policy.
xdslb_policy - > rr _policy_- > SetReresolutionClosureLocked (
& xdslb_policy - > on_rr _request_reresolution_ ) ;
// Give back the wrapper closure to the child policy.
xdslb_policy - > child _policy_- > SetReresolutionClosureLocked (
& xdslb_policy - > on_child _request_reresolution_ ) ;
}
void XdsLb : : UpdateConnectivityStateFromRoundRobin PolicyLocked (
grpc_error * rr _state_error) {
void XdsLb : : UpdateConnectivityStateFromChild PolicyLocked (
grpc_error * child _state_error) {
const grpc_connectivity_state curr_glb_state =
grpc_connectivity_state_check ( & state_tracker_ ) ;
/* The new connectivity status is a function of the previous one and the new
* input coming from the status of the RR policy .
* input coming from the status of the child policy .
*
* current state ( grpclb ' s )
* current state ( xds ' s )
* |
* v | | I | C | R | TF | SD | < - new state ( RR ' s )
* v | | I | C | R | TF | SD | < - new state ( child policy ' s )
* = = = + + = = = = + = = = = = + = = = = = + = = = = = = + = = = = = = +
* I | | I | C | R | [ I ] | [ I ] |
* - - - + + - - - - + - - - - - + - - - - - + - - - - - - + - - - - - - +
@ -1736,52 +1736,51 @@ void XdsLb::UpdateConnectivityStateFromRoundRobinPolicyLocked(
* SD | | NA | NA | NA | NA | NA | ( * )
* - - - + + - - - - + - - - - - + - - - - - + - - - - - - + - - - - - - +
*
* A [ STATE ] indicates that the old RR policy is kept . In those cases , STATE
* is the current state of grpclb , which is left untouched .
* A [ STATE ] indicates that the old child policy is kept . In those cases ,
* STATE is the current state of xds , which is left untouched .
*
* In summary , if the new state is TRANSIENT_FAILURE or SHUTDOWN , stick to
* the previous RR instance .
* the previous child policy instance .
*
* Note that the status is never updated to SHUTDOWN as a result of calling
* this function . Only glb_shutdown ( ) has the power to set that state .
*
* ( * ) This function mustn ' t be called during shutting down . */
GPR_ASSERT ( curr_glb_state ! = GRPC_CHANNEL_SHUTDOWN ) ;
switch ( rr _connectivity_state_) {
switch ( child _connectivity_state_) {
case GRPC_CHANNEL_TRANSIENT_FAILURE :
case GRPC_CHANNEL_SHUTDOWN :
GPR_ASSERT ( rr _state_error ! = GRPC_ERROR_NONE ) ;
GPR_ASSERT ( child _state_error ! = GRPC_ERROR_NONE ) ;
break ;
case GRPC_CHANNEL_IDLE :
case GRPC_CHANNEL_CONNECTING :
case GRPC_CHANNEL_READY :
GPR_ASSERT ( rr _state_error = = GRPC_ERROR_NONE ) ;
GPR_ASSERT ( child _state_error = = GRPC_ERROR_NONE ) ;
}
if ( grpc_lb_xds_trace . enabled ( ) ) {
gpr_log (
GPR_INFO ,
" [xdslb %p] Setting grpclb's state to %s from new RR policy %p state. " ,
this , grpc_connectivity_state_name ( rr_connectivity_state_ ) ,
rr_policy_ . get ( ) ) ;
gpr_log ( GPR_INFO ,
" [xdslb %p] Setting xds's state to %s from child policy %p state. " ,
this , grpc_connectivity_state_name ( child_connectivity_state_ ) ,
child_policy_ . get ( ) ) ;
}
grpc_connectivity_state_set ( & state_tracker_ , rr _connectivity_state_,
rr _state_error,
grpc_connectivity_state_set ( & state_tracker_ , child _connectivity_state_,
child _state_error,
" update_lb_connectivity_status_locked " ) ;
}
void XdsLb : : OnRoundRobin ConnectivityChangedLocked ( void * arg ,
grpc_error * error ) {
void XdsLb : : OnChildPolicy ConnectivityChangedLocked ( void * arg ,
grpc_error * error ) {
XdsLb * xdslb_policy = static_cast < XdsLb * > ( arg ) ;
if ( xdslb_policy - > shutting_down_ ) {
xdslb_policy - > Unref ( DEBUG_LOCATION , " on_rr _connectivity_changed " ) ;
xdslb_policy - > Unref ( DEBUG_LOCATION , " on_child _connectivity_changed " ) ;
return ;
}
xdslb_policy - > UpdateConnectivityStateFromRoundRobin PolicyLocked (
xdslb_policy - > UpdateConnectivityStateFromChild PolicyLocked (
GRPC_ERROR_REF ( error ) ) ;
// Resubscribe. Reuse the "on_rr _connectivity_changed" ref.
xdslb_policy - > rr _policy_- > NotifyOnStateChangeLocked (
& xdslb_policy - > rr _connectivity_state_,
& xdslb_policy - > on_rr _connectivity_changed_ ) ;
// Resubscribe. Reuse the "on_child _connectivity_changed" ref.
xdslb_policy - > child _policy_- > NotifyOnStateChangeLocked (
& xdslb_policy - > child _connectivity_state_,
& xdslb_policy - > on_child _connectivity_changed_ ) ;
}
//