@ -73,23 +73,127 @@ class RoundRobin : public LoadBalancingPolicy {
private :
~ RoundRobin ( ) ;
void ShutdownLocked ( ) override ;
// Forward declaration.
class RoundRobinSubchannelList ;
// Data for a particular subchannel in a subchannel list.
// This subclass adds the following functionality:
// - Tracks user_data associated with each address, which will be
// returned along with picks that select the subchannel.
// - Tracks the previous connectivity state of the subchannel, so that
// we know how many subchannels are in each state.
class RoundRobinSubchannelData
: public SubchannelData < RoundRobinSubchannelList ,
RoundRobinSubchannelData > {
public :
RoundRobinSubchannelData ( RoundRobinSubchannelList * subchannel_list ,
const grpc_lb_user_data_vtable * user_data_vtable ,
const grpc_lb_address & address ,
grpc_subchannel * subchannel ,
grpc_combiner * combiner )
: SubchannelData ( subchannel_list , user_data_vtable , address , subchannel ,
combiner ) ,
user_data_vtable_ ( user_data_vtable ) ,
user_data_ ( user_data_vtable_ ! = nullptr
? user_data_vtable_ - > copy ( address . user_data )
: nullptr ) { }
void UnrefSubchannelLocked ( const char * reason ) override {
SubchannelData : : UnrefSubchannelLocked ( reason ) ;
if ( user_data_ ! = nullptr ) {
GPR_ASSERT ( user_data_vtable_ ! = nullptr ) ;
user_data_vtable_ - > destroy ( user_data_ ) ;
user_data_ = nullptr ;
}
}
void StartPickingLocked ( ) ;
size_t GetNextReadySubchannelIndexLocked ( ) ;
void UpdateLastReadySubchannelIndexLocked ( size_t last_ready_index ) ;
void UpdateConnectivityStatusLocked ( grpc_lb_subchannel_data * sd ,
grpc_error * error ) ;
void * user_data ( ) const { return user_data_ ; }
grpc_connectivity_state connectivity_state ( ) const {
return last_connectivity_state_ ;
}
static void OnConnectivityChangedLocked ( void * arg , grpc_error * error ) ;
void UpdateConnectivityStateLocked (
grpc_connectivity_state connectivity_state , grpc_error * error ) ;
private :
void ProcessConnectivityChangeLocked (
grpc_connectivity_state connectivity_state , grpc_error * error ) override ;
const grpc_lb_user_data_vtable * user_data_vtable_ ;
void * user_data_ = nullptr ;
grpc_connectivity_state last_connectivity_state_ = GRPC_CHANNEL_IDLE ;
} ;
// A list of subchannels.
class RoundRobinSubchannelList
: public SubchannelList < RoundRobinSubchannelList ,
RoundRobinSubchannelData > {
public :
RoundRobinSubchannelList (
RoundRobin * policy , TraceFlag * tracer ,
const grpc_lb_addresses * addresses , grpc_combiner * combiner ,
grpc_client_channel_factory * client_channel_factory ,
const grpc_channel_args & args )
: SubchannelList ( policy , tracer , addresses , combiner ,
client_channel_factory , args ) {
// Need to maintain a ref to the LB policy as long as we maintain
// any references to subchannels, since the subchannels'
// pollset_sets will include the LB policy's pollset_set.
policy - > Ref ( DEBUG_LOCATION , " subchannel_list " ) . release ( ) ;
}
void SubchannelListRefForConnectivityWatch (
grpc_lb_subchannel_list * subchannel_list , const char * reason ) ;
void SubchannelListUnrefForConnectivityWatch (
grpc_lb_subchannel_list * subchannel_list , const char * reason ) ;
~ RoundRobinSubchannelList ( ) {
GRPC_ERROR_UNREF ( last_transient_failure_error_ ) ;
RoundRobin * p = static_cast < RoundRobin * > ( policy ( ) ) ;
p - > Unref ( DEBUG_LOCATION , " subchannel_list " ) ;
}
// Starts watching the subchannels in this list.
void StartWatchingLocked ( ) ;
// Updates the counters of subchannels in each state when a
// subchannel transitions from old_state to new_state.
// transient_failure_error is the error that is reported when
// new_state is TRANSIENT_FAILURE.
void UpdateStateCountersLocked ( grpc_connectivity_state old_state ,
grpc_connectivity_state new_state ,
grpc_error * transient_failure_error ) ;
// If this subchannel list is the RR policy's current subchannel
// list, updates the RR policy's connectivity state based on the
// subchannel list's state counters.
void MaybeUpdateRoundRobinConnectivityStateLocked ( ) ;
// Updates the RR policy's overall state based on the counters of
// subchannels in each state.
void UpdateRoundRobinStateFromSubchannelStateCountsLocked ( ) ;
size_t GetNextReadySubchannelIndexLocked ( ) ;
void UpdateLastReadySubchannelIndexLocked ( size_t last_ready_index ) ;
private :
size_t num_ready_ = 0 ;
size_t num_connecting_ = 0 ;
size_t num_transient_failure_ = 0 ;
grpc_error * last_transient_failure_error_ = GRPC_ERROR_NONE ;
size_t last_ready_index_ = - 1 ; // Index into list of last pick.
} ;
void ShutdownLocked ( ) override ;
void StartPickingLocked ( ) ;
bool DoPickLocked ( PickState * pick ) ;
void DrainPendingPicksLocked ( ) ;
/** list of subchannels */
grpc_lb_subchannel_list * subchannel_list_ = nullptr ;
OrphanablePtr < RoundRobinSubchannelList > subchannel_list_ ;
/** Latest version of the subchannel list.
* Subchannel connectivity callbacks will only promote updated subchannel
* lists if they equal \ a latest_pending_subchannel_list . In other words ,
* racing callbacks that reference outdated subchannel lists won ' t perform any
* update . */
OrphanablePtr < RoundRobinSubchannelList > latest_pending_subchannel_list_ ;
/** have we started picking? */
bool started_picking_ = false ;
/** are we shutting down? */
@ -98,14 +202,6 @@ class RoundRobin : public LoadBalancingPolicy {
PickState * pending_picks_ = nullptr ;
/** our connectivity state tracker */
grpc_connectivity_state_tracker state_tracker_ ;
/** Index into subchannels for last pick. */
size_t last_ready_subchannel_index_ = 0 ;
/** Latest version of the subchannel list.
* Subchannel connectivity callbacks will only promote updated subchannel
* lists if they equal \ a latest_pending_subchannel_list . In other words ,
* racing callbacks that reference outdated subchannel lists won ' t perform any
* update . */
grpc_lb_subchannel_list * latest_pending_subchannel_list_ = nullptr ;
} ;
RoundRobin : : RoundRobin ( const Args & args ) : LoadBalancingPolicy ( args ) {
@ -114,15 +210,15 @@ RoundRobin::RoundRobin(const Args& args) : LoadBalancingPolicy(args) {
" round_robin " ) ;
UpdateLocked ( * args . args ) ;
if ( grpc_lb_round_robin_trace . enabled ( ) ) {
gpr_log ( GPR_DEBUG , " [RR %p] Created with % " PRIuPTR " subchannels " , this ,
subchannel_list_ - > num_subchannels ) ;
gpr_log ( GPR_INFO , " [RR %p] Created with % " PRIuPTR " subchannels " , this ,
subchannel_list_ - > num_subchannels ( ) ) ;
}
grpc_subchannel_index_ref ( ) ;
}
RoundRobin : : ~ RoundRobin ( ) {
if ( grpc_lb_round_robin_trace . enabled ( ) ) {
gpr_log ( GPR_DEBUG , " [RR %p] Destroying Round Robin policy " , this ) ;
gpr_log ( GPR_INFO , " [RR %p] Destroying Round Robin policy " , this ) ;
}
GPR_ASSERT ( subchannel_list_ = = nullptr ) ;
GPR_ASSERT ( latest_pending_subchannel_list_ = = nullptr ) ;
@ -131,68 +227,6 @@ RoundRobin::~RoundRobin() {
grpc_subchannel_index_unref ( ) ;
}
/** Returns the index into p->subchannel_list->subchannels of the next
* subchannel in READY state , or p - > subchannel_list - > num_subchannels if no
* subchannel is READY .
*
* Note that this function does * not * update p - > last_ready_subchannel_index .
* The caller must do that if it returns a pick . */
size_t RoundRobin : : GetNextReadySubchannelIndexLocked ( ) {
GPR_ASSERT ( subchannel_list_ ! = nullptr ) ;
if ( grpc_lb_round_robin_trace . enabled ( ) ) {
gpr_log ( GPR_INFO ,
" [RR %p] getting next ready subchannel (out of % " PRIuPTR
" ), "
" last_ready_subchannel_index=% " PRIuPTR ,
this , subchannel_list_ - > num_subchannels ,
last_ready_subchannel_index_ ) ;
}
for ( size_t i = 0 ; i < subchannel_list_ - > num_subchannels ; + + i ) {
const size_t index = ( i + last_ready_subchannel_index_ + 1 ) %
subchannel_list_ - > num_subchannels ;
if ( grpc_lb_round_robin_trace . enabled ( ) ) {
gpr_log (
GPR_DEBUG ,
" [RR %p] checking subchannel %p, subchannel_list %p, index % " PRIuPTR
" : state=%s " ,
this , subchannel_list_ - > subchannels [ index ] . subchannel ,
subchannel_list_ , index ,
grpc_connectivity_state_name (
subchannel_list_ - > subchannels [ index ] . curr_connectivity_state ) ) ;
}
if ( subchannel_list_ - > subchannels [ index ] . curr_connectivity_state = =
GRPC_CHANNEL_READY ) {
if ( grpc_lb_round_robin_trace . enabled ( ) ) {
gpr_log ( GPR_DEBUG ,
" [RR %p] found next ready subchannel (%p) at index % " PRIuPTR
" of subchannel_list %p " ,
this , subchannel_list_ - > subchannels [ index ] . subchannel , index ,
subchannel_list_ ) ;
}
return index ;
}
}
if ( grpc_lb_round_robin_trace . enabled ( ) ) {
gpr_log ( GPR_DEBUG , " [RR %p] no subchannels in ready state " , this ) ;
}
return subchannel_list_ - > num_subchannels ;
}
// Sets last_ready_subchannel_index_ to last_ready_index.
void RoundRobin : : UpdateLastReadySubchannelIndexLocked ( size_t last_ready_index ) {
GPR_ASSERT ( last_ready_index < subchannel_list_ - > num_subchannels ) ;
last_ready_subchannel_index_ = last_ready_index ;
if ( grpc_lb_round_robin_trace . enabled ( ) ) {
gpr_log ( GPR_DEBUG ,
" [RR %p] setting last_ready_subchannel_index=% " PRIuPTR
" (SC %p, CSC %p) " ,
this , last_ready_index ,
subchannel_list_ - > subchannels [ last_ready_index ] . subchannel ,
subchannel_list_ - > subchannels [ last_ready_index ]
. connected_subchannel . get ( ) ) ;
}
}
void RoundRobin : : HandOffPendingPicksLocked ( LoadBalancingPolicy * new_policy ) {
PickState * pick ;
while ( ( pick = pending_picks_ ) ! = nullptr ) {
@ -207,7 +241,7 @@ void RoundRobin::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
void RoundRobin : : ShutdownLocked ( ) {
grpc_error * error = GRPC_ERROR_CREATE_FROM_STATIC_STRING ( " Channel shutdown " ) ;
if ( grpc_lb_round_robin_trace . enabled ( ) ) {
gpr_log ( GPR_DEBUG , " [RR %p] Shutting down " , this ) ;
gpr_log ( GPR_INFO , " [RR %p] Shutting down " , this ) ;
}
shutdown_ = true ;
PickState * pick ;
@ -218,16 +252,8 @@ void RoundRobin::ShutdownLocked() {
}
grpc_connectivity_state_set ( & state_tracker_ , GRPC_CHANNEL_SHUTDOWN ,
GRPC_ERROR_REF ( error ) , " rr_shutdown " ) ;
if ( subchannel_list_ ! = nullptr ) {
grpc_lb_subchannel_list_shutdown_and_unref ( subchannel_list_ ,
" sl_shutdown_rr_shutdown " ) ;
subchannel_list_ = nullptr ;
}
if ( latest_pending_subchannel_list_ ! = nullptr ) {
grpc_lb_subchannel_list_shutdown_and_unref (
latest_pending_subchannel_list_ , " sl_shutdown_pending_rr_shutdown " ) ;
latest_pending_subchannel_list_ = nullptr ;
}
subchannel_list_ . reset ( ) ;
latest_pending_subchannel_list_ . reset ( ) ;
TryReresolutionLocked ( & grpc_lb_round_robin_trace , GRPC_ERROR_CANCELLED ) ;
GRPC_ERROR_UNREF ( error ) ;
}
@ -273,70 +299,59 @@ void RoundRobin::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
GRPC_ERROR_UNREF ( error ) ;
}
void RoundRobin : : SubchannelListRefForConnectivityWatch (
grpc_lb_subchannel_list * subchannel_list , const char * reason ) {
// TODO(roth): We currently track this ref manually. Once the new
// ClosureRef API is ready and the subchannel_list code has been
// converted to a C++ API, find a way to hold the RefCountedPtr<>
// somewhere (maybe in the subchannel_data object) instead of doing
// this manually.
auto self = Ref ( DEBUG_LOCATION , reason ) ;
self . release ( ) ;
grpc_lb_subchannel_list_ref ( subchannel_list , reason ) ;
void RoundRobin : : StartPickingLocked ( ) {
started_picking_ = true ;
subchannel_list_ - > StartWatchingLocked ( ) ;
}
void RoundRobin : : SubchannelListUnrefForConnectivityWatch (
grpc_lb_subchannel_list * subchannel_list , const char * reason ) {
Unref ( DEBUG_LOCATION , reason ) ;
grpc_lb_subchannel_list_unref ( subchannel_list , reason ) ;
void RoundRobin : : ExitIdleLocked ( ) {
if ( ! started_picking_ ) {
StartPickingLocked ( ) ;
}
}
void RoundRobin : : StartPickingLocked ( ) {
started_picking_ = true ;
for ( size_t i = 0 ; i < subchannel_list_ - > num_subchannels ; i + + ) {
if ( subchannel_list_ - > subchannels [ i ] . subchannel ! = nullptr ) {
SubchannelListRefForConnectivityWatch ( subchannel_list_ ,
" connectivity_watch " ) ;
grpc_lb_subchannel_data_start_connectivity_watch (
& subchannel_list_ - > subchannels [ i ] ) ;
bool RoundRobin : : DoPickLocked ( PickState * pick ) {
const size_t next_ready_index =
subchannel_list_ - > GetNextReadySubchannelIndexLocked ( ) ;
if ( next_ready_index < subchannel_list_ - > num_subchannels ( ) ) {
/* readily available, report right away */
RoundRobinSubchannelData * sd =
subchannel_list_ - > subchannel ( next_ready_index ) ;
GPR_ASSERT ( sd - > connected_subchannel ( ) ! = nullptr ) ;
pick - > connected_subchannel = sd - > connected_subchannel ( ) - > Ref ( ) ;
if ( pick - > user_data ! = nullptr ) {
* pick - > user_data = sd - > user_data ( ) ;
}
if ( grpc_lb_round_robin_trace . enabled ( ) ) {
gpr_log ( GPR_INFO ,
" [RR %p] Picked target <-- Subchannel %p (connected %p) (sl %p, "
" index % " PRIuPTR " ) " ,
this , sd - > subchannel ( ) , pick - > connected_subchannel . get ( ) ,
sd - > subchannel_list ( ) , next_ready_index ) ;
}
/* only advance the last picked pointer if the selection was used */
subchannel_list_ - > UpdateLastReadySubchannelIndexLocked ( next_ready_index ) ;
return true ;
}
return false ;
}
void RoundRobin : : ExitIdleLocked ( ) {
if ( ! started_picking_ ) {
StartPickingLocked ( ) ;
void RoundRobin : : DrainPendingPicksLocked ( ) {
PickState * pick ;
while ( ( pick = pending_picks_ ) ) {
pending_picks_ = pick - > next ;
GPR_ASSERT ( DoPickLocked ( pick ) ) ;
GRPC_CLOSURE_SCHED ( pick - > on_complete , GRPC_ERROR_NONE ) ;
}
}
bool RoundRobin : : PickLocked ( PickState * pick ) {
if ( grpc_lb_round_robin_trace . enabled ( ) ) {
gpr_log ( GPR_DEBUG , " [RR %p] Trying to pick (shutdown: %d) " , this ,
shutdown_ ) ;
gpr_log ( GPR_INFO , " [RR %p] Trying to pick (shutdown: %d) " , this , shutdown_ ) ;
}
GPR_ASSERT ( ! shutdown_ ) ;
if ( subchannel_list_ ! = nullptr ) {
const size_t next_ready_index = GetNextReadySubchannelIndexLocked ( ) ;
if ( next_ready_index < subchannel_list_ - > num_subchannels ) {
/* readily available, report right away */
grpc_lb_subchannel_data * sd =
& subchannel_list_ - > subchannels [ next_ready_index ] ;
pick - > connected_subchannel = sd - > connected_subchannel ;
if ( pick - > user_data ! = nullptr ) {
* pick - > user_data = sd - > user_data ;
}
if ( grpc_lb_round_robin_trace . enabled ( ) ) {
gpr_log (
GPR_DEBUG ,
" [RR %p] Picked target <-- Subchannel %p (connected %p) (sl %p, "
" index % " PRIuPTR " ) " ,
this , sd - > subchannel , pick - > connected_subchannel . get ( ) ,
sd - > subchannel_list , next_ready_index ) ;
}
/* only advance the last picked pointer if the selection was used */
UpdateLastReadySubchannelIndexLocked ( next_ready_index ) ;
return true ;
}
if ( DoPickLocked ( pick ) ) return true ;
}
/* no pick currently available. Save for later in list of pending picks */
if ( ! started_picking_ ) {
@ -347,36 +362,62 @@ bool RoundRobin::PickLocked(PickState* pick) {
return false ;
}
void UpdateStateCountersLocked ( grpc_lb_subchannel_data * sd ) {
grpc_lb_subchannel_list * subchannel_list = sd - > subchannel_list ;
GPR_ASSERT ( sd - > prev_connectivity_state ! = GRPC_CHANNEL_SHUTDOWN ) ;
GPR_ASSERT ( sd - > curr_connectivity_state ! = GRPC_CHANNEL_SHUTDOWN ) ;
if ( sd - > prev_connectivity_state = = GRPC_CHANNEL_READY ) {
GPR_ASSERT ( subchannel_list - > num_ready > 0 ) ;
- - subchannel_list - > num_ready ;
} else if ( sd - > prev_connectivity_state = = GRPC_CHANNEL_TRANSIENT_FAILURE ) {
GPR_ASSERT ( subchannel_list - > num_transient_failures > 0 ) ;
- - subchannel_list - > num_transient_failures ;
} else if ( sd - > prev_connectivity_state = = GRPC_CHANNEL_IDLE ) {
GPR_ASSERT ( subchannel_list - > num_idle > 0 ) ;
- - subchannel_list - > num_idle ;
void RoundRobin : : RoundRobinSubchannelList : : StartWatchingLocked ( ) {
if ( num_subchannels ( ) = = 0 ) return ;
// Check current state of each subchannel synchronously, since any
// subchannel already used by some other channel may have a non-IDLE
// state.
for ( size_t i = 0 ; i < num_subchannels ( ) ; + + i ) {
grpc_error * error = GRPC_ERROR_NONE ;
grpc_connectivity_state state =
subchannel ( i ) - > CheckConnectivityStateLocked ( & error ) ;
if ( state ! = GRPC_CHANNEL_IDLE ) {
subchannel ( i ) - > UpdateConnectivityStateLocked ( state , error ) ;
}
}
sd - > prev_connectivity_state = sd - > curr_connectivity_state ;
if ( sd - > curr_connectivity_state = = GRPC_CHANNEL_READY ) {
+ + subchannel_list - > num_ready ;
} else if ( sd - > curr_connectivity_state = = GRPC_CHANNEL_TRANSIENT_FAILURE ) {
+ + subchannel_list - > num_transient_failures ;
} else if ( sd - > curr_connectivity_state = = GRPC_CHANNEL_IDLE ) {
+ + subchannel_list - > num_idle ;
// Now set the LB policy's state based on the subchannels' states.
UpdateRoundRobinStateFromSubchannelStateCountsLocked ( ) ;
// Start connectivity watch for each subchannel.
for ( size_t i = 0 ; i < num_subchannels ( ) ; i + + ) {
if ( subchannel ( i ) - > subchannel ( ) ! = nullptr ) {
subchannel ( i ) - > StartConnectivityWatchLocked ( ) ;
}
}
}
/** Sets the policy's connectivity status based on that of the passed-in \a sd
* ( the grpc_lb_subchannel_data associated with the updated subchannel ) and the
* subchannel list \ a sd belongs to ( sd - > subchannel_list ) . \ a error will be used
* only if the policy transitions to state TRANSIENT_FAILURE . */
void RoundRobin : : UpdateConnectivityStatusLocked ( grpc_lb_subchannel_data * sd ,
grpc_error * error ) {
void RoundRobin : : RoundRobinSubchannelList : : UpdateStateCountersLocked (
grpc_connectivity_state old_state , grpc_connectivity_state new_state ,
grpc_error * transient_failure_error ) {
GPR_ASSERT ( old_state ! = GRPC_CHANNEL_SHUTDOWN ) ;
GPR_ASSERT ( new_state ! = GRPC_CHANNEL_SHUTDOWN ) ;
if ( old_state = = GRPC_CHANNEL_READY ) {
GPR_ASSERT ( num_ready_ > 0 ) ;
- - num_ready_ ;
} else if ( old_state = = GRPC_CHANNEL_CONNECTING ) {
GPR_ASSERT ( num_connecting_ > 0 ) ;
- - num_connecting_ ;
} else if ( old_state = = GRPC_CHANNEL_TRANSIENT_FAILURE ) {
GPR_ASSERT ( num_transient_failure_ > 0 ) ;
- - num_transient_failure_ ;
}
if ( new_state = = GRPC_CHANNEL_READY ) {
+ + num_ready_ ;
} else if ( new_state = = GRPC_CHANNEL_CONNECTING ) {
+ + num_connecting_ ;
} else if ( new_state = = GRPC_CHANNEL_TRANSIENT_FAILURE ) {
+ + num_transient_failure_ ;
}
GRPC_ERROR_UNREF ( last_transient_failure_error_ ) ;
last_transient_failure_error_ = transient_failure_error ;
}
// Sets the RR policy's connectivity state based on the current
// subchannel list.
void RoundRobin : : RoundRobinSubchannelList : :
MaybeUpdateRoundRobinConnectivityStateLocked ( ) {
RoundRobin * p = static_cast < RoundRobin * > ( policy ( ) ) ;
// Only set connectivity state if this is the current subchannel list.
if ( p - > subchannel_list_ . get ( ) ! = this ) return ;
/* In priority order. The first rule to match terminates the search (ie, if we
* are on rule n , all previous rules were unfulfilled ) .
*
@ -391,155 +432,151 @@ void RoundRobin::UpdateConnectivityStatusLocked(grpc_lb_subchannel_data* sd,
* CHECK : subchannel_list - > num_transient_failures = =
* subchannel_list - > num_subchannels .
*/
grpc_lb_subchannel_list * subchannel_list = sd - > subchannel_list ;
GPR_ASSERT ( sd - > curr_connectivity_state ! = GRPC_CHANNEL_IDLE ) ;
if ( subchannel_list - > num_ready > 0 ) {
if ( num_ready_ > 0 ) {
/* 1) READY */
grpc_connectivity_state_set ( & state_tracker_ , GRPC_CHANNEL_READY ,
grpc_connectivity_state_set ( & p - > state_tracker_ , GRPC_CHANNEL_READY ,
GRPC_ERROR_NONE , " rr_ready " ) ;
} else if ( sd - > curr_connectivity_state = = GRPC_CHANNEL_CONNECTING ) {
} else if ( num_connecting_ > 0 ) {
/* 2) CONNECTING */
grpc_connectivity_state_set ( & state_tracker_ , GRPC_CHANNEL_CONNECTING ,
grpc_connectivity_state_set ( & p - > state_tracker_ , GRPC_CHANNEL_CONNECTING ,
GRPC_ERROR_NONE , " rr_connecting " ) ;
} else if ( subchannel_list - > num_transient_failures = =
subchannel_list - > num_subchannels ) {
} else if ( num_transient_failure_ = = num_subchannels ( ) ) {
/* 3) TRANSIENT_FAILURE */
grpc_connectivity_state_set ( & state_tracker_ , GRPC_CHANNEL_TRANSIENT_FAILURE ,
GRPC_ERROR_REF ( error ) ,
grpc_connectivity_state_set ( & p - > state_tracker_ ,
GRPC_CHANNEL_TRANSIENT_FAILURE ,
GRPC_ERROR_REF ( last_transient_failure_error_ ) ,
" rr_exhausted_subchannels " ) ;
}
GRPC_ERROR_UNREF ( error ) ;
}
void RoundRobin : : OnConnectivityChangedLocked ( void * arg , grpc_error * error ) {
grpc_lb_subchannel_data * sd = static_cast < grpc_lb_subchannel_data * > ( arg ) ;
RoundRobin * p = static_cast < RoundRobin * > ( sd - > subchannel_list - > policy ) ;
void RoundRobin : : RoundRobinSubchannelList : :
UpdateRoundRobinStateFromSubchannelStateCountsLocked ( ) {
RoundRobin * p = static_cast < RoundRobin * > ( policy ( ) ) ;
if ( num_ready_ > 0 ) {
if ( p - > subchannel_list_ . get ( ) ! = this ) {
// Promote this list to p->subchannel_list_.
// This list must be p->latest_pending_subchannel_list_, because
// any previous update would have been shut down already and
// therefore we would not be receiving a notification for them.
GPR_ASSERT ( p - > latest_pending_subchannel_list_ . get ( ) = = this ) ;
GPR_ASSERT ( ! shutting_down ( ) ) ;
if ( grpc_lb_round_robin_trace . enabled ( ) ) {
const size_t old_num_subchannels =
p - > subchannel_list_ ! = nullptr
? p - > subchannel_list_ - > num_subchannels ( )
: 0 ;
gpr_log ( GPR_INFO ,
" [RR %p] phasing out subchannel list %p (size % " PRIuPTR
" ) in favor of %p (size % " PRIuPTR " ) " ,
p , p - > subchannel_list_ . get ( ) , old_num_subchannels , this ,
num_subchannels ( ) ) ;
}
p - > subchannel_list_ = std : : move ( p - > latest_pending_subchannel_list_ ) ;
}
// Drain pending picks.
p - > DrainPendingPicksLocked ( ) ;
}
// Update the RR policy's connectivity state if needed.
MaybeUpdateRoundRobinConnectivityStateLocked ( ) ;
}
void RoundRobin : : RoundRobinSubchannelData : : UpdateConnectivityStateLocked (
grpc_connectivity_state connectivity_state , grpc_error * error ) {
RoundRobin * p = static_cast < RoundRobin * > ( subchannel_list ( ) - > policy ( ) ) ;
if ( grpc_lb_round_robin_trace . enabled ( ) ) {
gpr_log (
GPR_DEBUG ,
" [RR %p] connectivity changed for subchannel %p, subchannel_list %p: "
" prev_state=%s new_state=%s p->shutdown=%d "
" sd->subchannel_list->shutting_down=%d error=%s " ,
p , sd - > subchannel , sd - > subchannel_list ,
grpc_connectivity_state_name ( sd - > prev_connectivity_state ) ,
grpc_connectivity_state_name ( sd - > pending_connectivity_state_unsafe ) ,
p - > shutdown_ , sd - > subchannel_list - > shutting_down ,
grpc_error_string ( error ) ) ;
}
GPR_ASSERT ( sd - > subchannel ! = nullptr ) ;
// If the policy is shutting down, unref and return.
if ( p - > shutdown_ ) {
grpc_lb_subchannel_data_stop_connectivity_watch ( sd ) ;
grpc_lb_subchannel_data_unref_subchannel ( sd , " rr_shutdown " ) ;
p - > SubchannelListUnrefForConnectivityWatch ( sd - > subchannel_list ,
" rr_shutdown " ) ;
return ;
GPR_INFO ,
" [RR %p] connectivity changed for subchannel %p, subchannel_list %p "
" (index % " PRIuPTR " of % " PRIuPTR " ): prev_state=%s new_state=%s " ,
p , subchannel ( ) , subchannel_list ( ) , Index ( ) ,
subchannel_list ( ) - > num_subchannels ( ) ,
grpc_connectivity_state_name ( last_connectivity_state_ ) ,
grpc_connectivity_state_name ( connectivity_state ) ) ;
}
subchannel_list ( ) - > UpdateStateCountersLocked ( last_connectivity_state_ ,
connectivity_state , error ) ;
last_connectivity_state_ = connectivity_state ;
}
void RoundRobin : : RoundRobinSubchannelData : : ProcessConnectivityChangeLocked (
grpc_connectivity_state connectivity_state , grpc_error * error ) {
RoundRobin * p = static_cast < RoundRobin * > ( subchannel_list ( ) - > policy ( ) ) ;
GPR_ASSERT ( subchannel ( ) ! = nullptr ) ;
// If the new state is TRANSIENT_FAILURE, re-resolve.
// Only do this if we've started watching, not at startup time.
// Otherwise, if the subchannel was already in state TRANSIENT_FAILURE
// when the subchannel list was created, we'd wind up in a constant
// loop of re-resolution.
if ( connectivity_state = = GRPC_CHANNEL_TRANSIENT_FAILURE ) {
if ( grpc_lb_round_robin_trace . enabled ( ) ) {
gpr_log ( GPR_INFO ,
" [RR %p] Subchannel %p has gone into TRANSIENT_FAILURE. "
" Requesting re-resolution " ,
p , subchannel ( ) ) ;
}
p - > TryReresolutionLocked ( & grpc_lb_round_robin_trace , GRPC_ERROR_NONE ) ;
}
// If the subchannel list is shutting down, stop watching.
if ( sd - > subchannel_list - > shutting_down | | error = = GRPC_ERROR_CANCELLED ) {
grpc_lb_subchannel_data_stop_connectivity_watch ( sd ) ;
grpc_lb_subchannel_data_unref_subchannel ( sd , " rr_sl_shutdown " ) ;
p - > SubchannelListUnrefForConnectivityWatch ( sd - > subchannel_list ,
" rr_sl_shutdown " ) ;
return ;
// Update state counters.
UpdateConnectivityStateLocked ( connectivity_state , error ) ;
// Update overall state and renew notification.
subchannel_list ( ) - > UpdateRoundRobinStateFromSubchannelStateCountsLocked ( ) ;
RenewConnectivityWatchLocked ( ) ;
}
/** Returns the index into p->subchannel_list->subchannels of the next
* subchannel in READY state , or p - > subchannel_list - > num_subchannels if no
* subchannel is READY .
*
* Note that this function does * not * update p - > last_ready_subchannel_index .
* The caller must do that if it returns a pick . */
size_t
RoundRobin : : RoundRobinSubchannelList : : GetNextReadySubchannelIndexLocked ( ) {
if ( grpc_lb_round_robin_trace . enabled ( ) ) {
gpr_log ( GPR_INFO ,
" [RR %p] getting next ready subchannel (out of % " PRIuPTR
" ), last_ready_index=% " PRIuPTR ,
policy ( ) , num_subchannels ( ) , last_ready_index_ ) ;
}
// If we're still here, the notification must be for a subchannel in
// either the current or latest pending subchannel lists.
GPR_ASSERT ( sd - > subchannel_list = = p - > subchannel_list_ | |
sd - > subchannel_list = = p - > latest_pending_subchannel_list_ ) ;
GPR_ASSERT ( sd - > pending_connectivity_state_unsafe ! = GRPC_CHANNEL_SHUTDOWN ) ;
// Now that we're inside the combiner, copy the pending connectivity
// state (which was set by the connectivity state watcher) to
// curr_connectivity_state, which is what we use inside of the combiner.
sd - > curr_connectivity_state = sd - > pending_connectivity_state_unsafe ;
// If the sd's new state is TRANSIENT_FAILURE, unref the *connected*
// subchannel, if any.
switch ( sd - > curr_connectivity_state ) {
case GRPC_CHANNEL_TRANSIENT_FAILURE : {
sd - > connected_subchannel . reset ( ) ;
if ( grpc_lb_round_robin_trace . enabled ( ) ) {
gpr_log ( GPR_DEBUG ,
" [RR %p] Subchannel %p has gone into TRANSIENT_FAILURE. "
" Requesting re-resolution " ,
p , sd - > subchannel ) ;
}
p - > TryReresolutionLocked ( & grpc_lb_round_robin_trace , GRPC_ERROR_NONE ) ;
break ;
for ( size_t i = 0 ; i < num_subchannels ( ) ; + + i ) {
const size_t index = ( i + last_ready_index_ + 1 ) % num_subchannels ( ) ;
if ( grpc_lb_round_robin_trace . enabled ( ) ) {
gpr_log (
GPR_INFO ,
" [RR %p] checking subchannel %p, subchannel_list %p, index % " PRIuPTR
" : state=%s " ,
policy ( ) , subchannel ( index ) - > subchannel ( ) , this , index ,
grpc_connectivity_state_name (
subchannel ( index ) - > connectivity_state ( ) ) ) ;
}
case GRPC_CHANNEL_READY : {
if ( sd - > connected_subchannel = = nullptr ) {
sd - > connected_subchannel =
grpc_subchannel_get_connected_subchannel ( sd - > subchannel ) ;
}
if ( sd - > subchannel_list ! = p - > subchannel_list_ ) {
// promote sd->subchannel_list to p->subchannel_list_.
// sd->subchannel_list must be equal to
// p->latest_pending_subchannel_list_ because we have already filtered
// for sds belonging to outdated subchannel lists.
GPR_ASSERT ( sd - > subchannel_list = = p - > latest_pending_subchannel_list_ ) ;
GPR_ASSERT ( ! sd - > subchannel_list - > shutting_down ) ;
if ( grpc_lb_round_robin_trace . enabled ( ) ) {
const size_t num_subchannels =
p - > subchannel_list_ ! = nullptr
? p - > subchannel_list_ - > num_subchannels
: 0 ;
gpr_log ( GPR_DEBUG ,
" [RR %p] phasing out subchannel list %p (size % " PRIuPTR
" ) in favor of %p (size % " PRIuPTR " ) " ,
p , p - > subchannel_list_ , num_subchannels , sd - > subchannel_list ,
num_subchannels ) ;
}
if ( p - > subchannel_list_ ! = nullptr ) {
// dispose of the current subchannel_list
grpc_lb_subchannel_list_shutdown_and_unref ( p - > subchannel_list_ ,
" sl_phase_out_shutdown " ) ;
}
p - > subchannel_list_ = p - > latest_pending_subchannel_list_ ;
p - > latest_pending_subchannel_list_ = nullptr ;
}
/* at this point we know there's at least one suitable subchannel. Go
* ahead and pick one and notify the pending suitors in
* p - > pending_picks . This preemptively replicates rr_pick ( ) ' s actions . */
const size_t next_ready_index = p - > GetNextReadySubchannelIndexLocked ( ) ;
GPR_ASSERT ( next_ready_index < p - > subchannel_list_ - > num_subchannels ) ;
grpc_lb_subchannel_data * selected =
& p - > subchannel_list_ - > subchannels [ next_ready_index ] ;
if ( p - > pending_picks_ ! = nullptr ) {
// if the selected subchannel is going to be used for the pending
// picks, update the last picked pointer
p - > UpdateLastReadySubchannelIndexLocked ( next_ready_index ) ;
}
PickState * pick ;
while ( ( pick = p - > pending_picks_ ) ) {
p - > pending_picks_ = pick - > next ;
pick - > connected_subchannel = selected - > connected_subchannel ;
if ( pick - > user_data ! = nullptr ) {
* pick - > user_data = selected - > user_data ;
}
if ( grpc_lb_round_robin_trace . enabled ( ) ) {
gpr_log ( GPR_DEBUG ,
" [RR %p] Fulfilling pending pick. Target <-- subchannel %p "
" (subchannel_list %p, index % " PRIuPTR " ) " ,
p , selected - > subchannel , p - > subchannel_list_ ,
next_ready_index ) ;
}
GRPC_CLOSURE_SCHED ( pick - > on_complete , GRPC_ERROR_NONE ) ;
if ( subchannel ( index ) - > connectivity_state ( ) = = GRPC_CHANNEL_READY ) {
if ( grpc_lb_round_robin_trace . enabled ( ) ) {
gpr_log ( GPR_INFO ,
" [RR %p] found next ready subchannel (%p) at index % " PRIuPTR
" of subchannel_list %p " ,
policy ( ) , subchannel ( index ) - > subchannel ( ) , index , this ) ;
}
break ;
return index ;
}
case GRPC_CHANNEL_SHUTDOWN :
GPR_UNREACHABLE_CODE ( return ) ;
case GRPC_CHANNEL_CONNECTING :
case GRPC_CHANNEL_IDLE : ; // fallthrough
}
// Update state counters.
UpdateStateCountersLocked ( sd ) ;
// Only update connectivity based on the selected subchannel list.
if ( sd - > subchannel_list = = p - > subchannel_list_ ) {
p - > UpdateConnectivityStatusLocked ( sd , GRPC_ERROR_REF ( error ) ) ;
if ( grpc_lb_round_robin_trace . enabled ( ) ) {
gpr_log ( GPR_INFO , " [RR %p] no subchannels in ready state " , this ) ;
}
return num_subchannels ( ) ;
}
// Sets last_ready_index_ to last_ready_index.
void RoundRobin : : RoundRobinSubchannelList : : UpdateLastReadySubchannelIndexLocked (
size_t last_ready_index ) {
GPR_ASSERT ( last_ready_index < num_subchannels ( ) ) ;
last_ready_index_ = last_ready_index ;
if ( grpc_lb_round_robin_trace . enabled ( ) ) {
gpr_log ( GPR_INFO ,
" [RR %p] setting last_ready_subchannel_index=% " PRIuPTR
" (SC %p, CSC %p) " ,
policy ( ) , last_ready_index ,
subchannel ( last_ready_index ) - > subchannel ( ) ,
subchannel ( last_ready_index ) - > connected_subchannel ( ) ) ;
}
// Renew notification.
grpc_lb_subchannel_data_start_connectivity_watch ( sd ) ;
}
grpc_connectivity_state RoundRobin : : CheckConnectivityLocked (
@ -555,11 +592,12 @@ void RoundRobin::NotifyOnStateChangeLocked(grpc_connectivity_state* current,
void RoundRobin : : PingOneLocked ( grpc_closure * on_initiate ,
grpc_closure * on_ack ) {
const size_t next_ready_index = GetNextReadySubchannelIndexLocked ( ) ;
if ( next_ready_index < subchannel_list_ - > num_subchannels ) {
grpc_lb_subchannel_data * selected =
& subchannel_list_ - > subchannels [ next_ready_index ] ;
selected - > connected_subchannel - > Ping ( on_initiate , on_ack ) ;
const size_t next_ready_index =
subchannel_list_ - > GetNextReadySubchannelIndexLocked ( ) ;
if ( next_ready_index < subchannel_list_ - > num_subchannels ( ) ) {
RoundRobinSubchannelData * selected =
subchannel_list_ - > subchannel ( next_ready_index ) ;
selected - > connected_subchannel ( ) - > Ping ( on_initiate , on_ack ) ;
} else {
GRPC_CLOSURE_SCHED ( on_initiate , GRPC_ERROR_CREATE_FROM_STATIC_STRING (
" Round Robin not connected " ) ) ;
@ -570,7 +608,7 @@ void RoundRobin::PingOneLocked(grpc_closure* on_initiate,
void RoundRobin : : UpdateLocked ( const grpc_channel_args & args ) {
const grpc_arg * arg = grpc_channel_args_find ( & args , GRPC_ARG_LB_ADDRESSES ) ;
if ( arg = = nullptr | | arg - > type ! = GRPC_ARG_POINTER ) {
if ( GPR_UNLIKELY ( arg = = nullptr | | arg - > type ! = GRPC_ARG_POINTER ) ) {
gpr_log ( GPR_ERROR , " [RR %p] update provided no addresses; ignoring " , this ) ;
// If we don't have a current subchannel list, go into TRANSIENT_FAILURE.
// Otherwise, keep using the current subchannel list (ignore this update).
@ -582,80 +620,37 @@ void RoundRobin::UpdateLocked(const grpc_channel_args& args) {
}
return ;
}
grpc_lb_addresses * addresses = ( grpc_lb_addresses * ) arg - > value . pointer . p ;
grpc_lb_addresses * addresses =
static_cast < grpc_lb_addresses * > ( arg - > value . pointer . p ) ;
if ( grpc_lb_round_robin_trace . enabled ( ) ) {
gpr_log ( GPR_DEBUG , " [RR %p] received update with % " PRIuPTR " addresses " ,
gpr_log ( GPR_INFO , " [RR %p] received update with % " PRIuPTR " addresses " ,
this , addresses - > num_addresses ) ;
}
grpc_lb_subchannel_list * subchannel_list = grpc_lb_subchannel_list_create (
this , & grpc_lb_round_robin_trace , addresses , combiner ( ) ,
client_channel_factory ( ) , args , & RoundRobin : : OnConnectivityChangedLocked ) ;
if ( subchannel_list - > num_subchannels = = 0 ) {
grpc_connectivity_state_set (
& state_tracker_ , GRPC_CHANNEL_TRANSIENT_FAILURE ,
GRPC_ERROR_CREATE_FROM_STATIC_STRING ( " Empty update " ) ,
" rr_update_empty " ) ;
if ( subchannel_list_ ! = nullptr ) {
grpc_lb_subchannel_list_shutdown_and_unref ( subchannel_list_ ,
" sl_shutdown_empty_update " ) ;
// Replace latest_pending_subchannel_list_.
if ( latest_pending_subchannel_list_ ! = nullptr ) {
if ( grpc_lb_round_robin_trace . enabled ( ) ) {
gpr_log ( GPR_INFO ,
" [RR %p] Shutting down previous pending subchannel list %p " , this ,
latest_pending_subchannel_list_ . get ( ) ) ;
}
subchannel_list_ = subchannel_list ; // empty list
return ;
}
if ( started_picking_ ) {
for ( size_t i = 0 ; i < subchannel_list - > num_subchannels ; + + i ) {
const grpc_connectivity_state subchannel_state =
grpc_subchannel_check_connectivity (
subchannel_list - > subchannels [ i ] . subchannel , nullptr ) ;
// Override the default setting of IDLE for connectivity notification
// purposes if the subchannel is already in transient failure. Otherwise
// we'd be immediately notified of the IDLE-TRANSIENT_FAILURE
// discrepancy, attempt to re-resolve and end up here again.
// TODO(roth): As part of C++-ifying the subchannel_list API, design a
// better API for notifying the LB policy of subchannel states, which can
// be used both for the subchannel's initial state and for subsequent
// state changes. This will allow us to handle this more generally instead
// of special-casing TRANSIENT_FAILURE (e.g., we can also distribute any
// pending picks across all READY subchannels rather than sending them all
// to the first one).
if ( subchannel_state = = GRPC_CHANNEL_TRANSIENT_FAILURE ) {
subchannel_list - > subchannels [ i ] . pending_connectivity_state_unsafe =
subchannel_list - > subchannels [ i ] . curr_connectivity_state =
subchannel_list - > subchannels [ i ] . prev_connectivity_state =
subchannel_state ;
- - subchannel_list - > num_idle ;
+ + subchannel_list - > num_transient_failures ;
}
}
if ( latest_pending_subchannel_list_ ! = nullptr ) {
if ( grpc_lb_round_robin_trace . enabled ( ) ) {
gpr_log ( GPR_DEBUG ,
" [RR %p] Shutting down latest pending subchannel list %p, "
" about to be replaced by newer latest %p " ,
this , latest_pending_subchannel_list_ , subchannel_list ) ;
}
grpc_lb_subchannel_list_shutdown_and_unref (
latest_pending_subchannel_list_ , " sl_outdated " ) ;
}
latest_pending_subchannel_list_ = subchannel_list ;
for ( size_t i = 0 ; i < subchannel_list - > num_subchannels ; + + i ) {
/* Watch every new subchannel. A subchannel list becomes active the
* moment one of its subchannels is READY . At that moment , we swap
* p - > subchannel_list for sd - > subchannel_list , provided the subchannel
* list is still valid ( ie , isn ' t shutting down ) */
SubchannelListRefForConnectivityWatch ( subchannel_list ,
" connectivity_watch " ) ;
grpc_lb_subchannel_data_start_connectivity_watch (
& subchannel_list - > subchannels [ i ] ) ;
latest_pending_subchannel_list_ = MakeOrphanable < RoundRobinSubchannelList > (
this , & grpc_lb_round_robin_trace , addresses , combiner ( ) ,
client_channel_factory ( ) , args ) ;
// If we haven't started picking yet or the new list is empty,
// immediately promote the new list to the current list.
if ( ! started_picking_ | |
latest_pending_subchannel_list_ - > num_subchannels ( ) = = 0 ) {
if ( latest_pending_subchannel_list_ - > num_subchannels ( ) = = 0 ) {
grpc_connectivity_state_set (
& state_tracker_ , GRPC_CHANNEL_TRANSIENT_FAILURE ,
GRPC_ERROR_CREATE_FROM_STATIC_STRING ( " Empty update " ) ,
" rr_update_empty " ) ;
}
subchannel_list_ = std : : move ( latest_pending_subchannel_list_ ) ;
} else {
// The policy isn't picking yet. Save the update for later, disposing of
// previous version if any.
if ( subchannel_list_ ! = nullptr ) {
grpc_lb_subchannel_list_shutdown_and_unref (
subchannel_list_ , " rr_update_before_started_picking " ) ;
}
subchannel_list_ = subchannel_list ;
// If we've started picking, start watching the new list.
latest_pending_subchannel_list_ - > StartWatchingLocked ( ) ;
}
}