@ -123,7 +123,7 @@ class GrpcLb : public LoadBalancingPolicy {
GrpcLb ( const grpc_lb_addresses * addresses , const Args & args ) ;
void UpdateLocked ( const grpc_channel_args & args ) override ;
bool PickLocked ( PickState * pick ) override ;
bool PickLocked ( PickState * pick , grpc_error * * error ) override ;
void CancelPickLocked ( PickState * pick , grpc_error * error ) override ;
void CancelMatchingPicksLocked ( uint32_t initial_metadata_flags_mask ,
uint32_t initial_metadata_flags_eq ,
@ -133,7 +133,6 @@ class GrpcLb : public LoadBalancingPolicy {
grpc_connectivity_state CheckConnectivityLocked (
grpc_error * * connectivity_error ) override ;
void HandOffPendingPicksLocked ( LoadBalancingPolicy * new_policy ) override ;
void PingOneLocked ( grpc_closure * on_initiate , grpc_closure * on_ack ) override ;
void ExitIdleLocked ( ) override ;
void FillChildRefsForChannelz ( ChildRefsList * child_subchannels ,
ChildRefsList * child_channels ) override ;
@ -167,13 +166,6 @@ class GrpcLb : public LoadBalancingPolicy {
PendingPick * next = nullptr ;
} ;
/// A linked list of pending pings waiting for the RR policy to be created.
struct PendingPing {
grpc_closure * on_initiate ;
grpc_closure * on_ack ;
PendingPing * next = nullptr ;
} ;
/// Contains a call to the LB server and all the data related to the call.
class BalancerCallState
: public InternallyRefCountedWithTracing < BalancerCallState > {
@ -272,14 +264,12 @@ class GrpcLb : public LoadBalancingPolicy {
void AddPendingPick ( PendingPick * pp ) ;
static void OnPendingPickComplete ( void * arg , grpc_error * error ) ;
// Pending ping methods.
void AddPendingPing ( grpc_closure * on_initiate , grpc_closure * on_ack ) ;
// Methods for dealing with the RR policy.
void CreateOrUpdateRoundRobinPolicyLocked ( ) ;
grpc_channel_args * CreateRoundRobinPolicyArgsLocked ( ) ;
void CreateRoundRobinPolicyLocked ( const Args & args ) ;
bool PickFromRoundRobinPolicyLocked ( bool force_async , PendingPick * pp ) ;
bool PickFromRoundRobinPolicyLocked ( bool force_async , PendingPick * pp ,
grpc_error * * error ) ;
void UpdateConnectivityStateFromRoundRobinPolicyLocked (
grpc_error * rr_state_error ) ;
static void OnRoundRobinConnectivityChangedLocked ( void * arg ,
@ -342,9 +332,8 @@ class GrpcLb : public LoadBalancingPolicy {
grpc_timer lb_fallback_timer_ ;
grpc_closure lb_on_fallback_ ;
// Pending picks and pings that are waiting on the RR policy's connectivity.
// Pending picks that are waiting on the RR policy's connectivity.
PendingPick * pending_picks_ = nullptr ;
PendingPing * pending_pings_ = nullptr ;
// The RR policy to use for the backends.
OrphanablePtr < LoadBalancingPolicy > rr_policy_ ;
@ -1080,7 +1069,6 @@ GrpcLb::GrpcLb(const grpc_lb_addresses* addresses,
GrpcLb : : ~ GrpcLb ( ) {
GPR_ASSERT ( pending_picks_ = = nullptr ) ;
GPR_ASSERT ( pending_pings_ = = nullptr ) ;
gpr_mu_destroy ( & lb_channel_mu_ ) ;
gpr_free ( ( void * ) server_name_ ) ;
grpc_channel_args_destroy ( args_ ) ;
@ -1126,14 +1114,6 @@ void GrpcLb::ShutdownLocked() {
// Note: pp is deleted in this callback.
GRPC_CLOSURE_SCHED ( & pp - > on_complete , GRPC_ERROR_REF ( error ) ) ;
}
// Clear pending pings.
PendingPing * pping ;
while ( ( pping = pending_pings_ ) ! = nullptr ) {
pending_pings_ = pping - > next ;
GRPC_CLOSURE_SCHED ( pping - > on_initiate , GRPC_ERROR_REF ( error ) ) ;
GRPC_CLOSURE_SCHED ( pping - > on_ack , GRPC_ERROR_REF ( error ) ) ;
Delete ( pping ) ;
}
GRPC_ERROR_UNREF ( error ) ;
}
@ -1147,9 +1127,10 @@ void GrpcLb::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
pending_picks_ = pp - > next ;
pp - > pick - > on_complete = pp - > original_on_complete ;
pp - > pick - > user_data = nullptr ;
if ( new_policy - > PickLocked ( pp - > pick ) ) {
grpc_error * error = GRPC_ERROR_NONE ;
if ( new_policy - > PickLocked ( pp - > pick , & error ) ) {
// Synchronous return; schedule closure.
GRPC_CLOSURE_SCHED ( pp - > pick - > on_complete , GRPC_ERROR_NONE ) ;
GRPC_CLOSURE_SCHED ( pp - > pick - > on_complete , error ) ;
}
Delete ( pp ) ;
}
@ -1233,58 +1214,37 @@ void GrpcLb::ExitIdleLocked() {
}
}
bool GrpcLb : : PickLocked ( PickState * pick ) {
bool GrpcLb : : PickLocked ( PickState * pick , grpc_error * * error ) {
PendingPick * pp = PendingPickCreate ( pick ) ;
bool pick_done = false ;
if ( rr_policy_ ! = nullptr ) {
const grpc_connectivity_state rr_connectivity_state =
rr_policy_ - > CheckConnectivityLocked ( nullptr ) ;
// The RR policy may have transitioned to SHUTDOWN but the callback
// registered to capture this event (on_rr_connectivity_changed_) may not
// have been invoked yet. We need to make sure we aren't trying to pick
// from an RR policy instance that's in shutdown.
if ( rr_connectivity_state = = GRPC_CHANNEL_SHUTDOWN ) {
if ( grpc_lb_glb_trace . enabled ( ) ) {
gpr_log ( GPR_INFO , " [grpclb %p] about to PICK from RR %p " , this ,
rr_policy_ . get ( ) ) ;
}
pick_done =
PickFromRoundRobinPolicyLocked ( false /* force_async */ , pp , error ) ;
} else { // rr_policy_ == NULL
if ( pick - > on_complete = = nullptr ) {
* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING (
" No pick result available but synchronous result required. " ) ;
pick_done = true ;
} else {
if ( grpc_lb_glb_trace . enabled ( ) ) {
gpr_log ( GPR_INFO ,
" [grpclb %p] NOT picking from from RR %p: RR conn state=%s " ,
this , rr_policy_ . get ( ) ,
grpc_connectivity_state_name ( rr_connectivity_state ) ) ;
" [grpclb %p] No RR policy. Adding to grpclb's pending picks " ,
this ) ;
}
AddPendingPick ( pp ) ;
pick_done = false ;
} else { // RR not in shutdown
if ( grpc_lb_glb_trace . enabled ( ) ) {
gpr_log ( GPR_INFO , " [grpclb %p] about to PICK from RR %p " , this ,
rr_policy_ . get ( ) ) ;
if ( ! started_picking_ ) {
StartPickingLocked ( ) ;
}
pick_done = PickFromRoundRobinPolicyLocked ( false /* force_async */ , pp ) ;
}
} else { // rr_policy_ == NULL
if ( grpc_lb_glb_trace . enabled ( ) ) {
gpr_log ( GPR_INFO ,
" [grpclb %p] No RR policy. Adding to grpclb's pending picks " ,
this ) ;
}
AddPendingPick ( pp ) ;
if ( ! started_picking_ ) {
StartPickingLocked ( ) ;
pick_done = false ;
}
pick_done = false ;
}
return pick_done ;
}
void GrpcLb : : PingOneLocked ( grpc_closure * on_initiate , grpc_closure * on_ack ) {
if ( rr_policy_ ! = nullptr ) {
rr_policy_ - > PingOneLocked ( on_initiate , on_ack ) ;
} else {
AddPendingPing ( on_initiate , on_ack ) ;
if ( ! started_picking_ ) {
StartPickingLocked ( ) ;
}
}
}
void GrpcLb : : FillChildRefsForChannelz ( ChildRefsList * child_subchannels ,
ChildRefsList * child_channels ) {
// delegate to the RoundRobin to fill the children subchannels.
@ -1598,18 +1558,6 @@ void GrpcLb::AddPendingPick(PendingPick* pp) {
pending_picks_ = pp ;
}
//
// PendingPing
//
void GrpcLb : : AddPendingPing ( grpc_closure * on_initiate , grpc_closure * on_ack ) {
PendingPing * pping = New < PendingPing > ( ) ;
pping - > on_initiate = on_initiate ;
pping - > on_ack = on_ack ;
pping - > next = pending_pings_ ;
pending_pings_ = pping ;
}
//
// code for interacting with the RR policy
//
@ -1619,7 +1567,8 @@ void GrpcLb::AddPendingPing(grpc_closure* on_initiate, grpc_closure* on_ack) {
// cleanups this callback would otherwise be responsible for.
// If \a force_async is true, then we will manually schedule the
// completion callback even if the pick is available immediately.
bool GrpcLb : : PickFromRoundRobinPolicyLocked ( bool force_async , PendingPick * pp ) {
bool GrpcLb : : PickFromRoundRobinPolicyLocked ( bool force_async , PendingPick * pp ,
grpc_error * * error ) {
// Check for drops if we are not using fallback backend addresses.
if ( serverlist_ ! = nullptr ) {
// Look at the index into the serverlist to see if we should drop this call.
@ -1653,11 +1602,12 @@ bool GrpcLb::PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp) {
GPR_ASSERT ( pp - > pick - > user_data = = nullptr ) ;
pp - > pick - > user_data = ( void * * ) & pp - > lb_token ;
// Pick via the RR policy.
bool pick_done = rr_policy_ - > PickLocked ( pp - > pick ) ;
bool pick_done = rr_policy_ - > PickLocked ( pp - > pick , error ) ;
if ( pick_done ) {
PendingPickSetMetadataAndContext ( pp ) ;
if ( force_async ) {
GRPC_CLOSURE_SCHED ( pp - > original_on_complete , GRPC_ERROR_NONE ) ;
GRPC_CLOSURE_SCHED ( pp - > original_on_complete , * error ) ;
* error = GRPC_ERROR_NONE ;
pick_done = false ;
}
Delete ( pp ) ;
@ -1709,18 +1659,8 @@ void GrpcLb::CreateRoundRobinPolicyLocked(const Args& args) {
" [grpclb %p] Pending pick about to (async) PICK from RR %p " , this ,
rr_policy_ . get ( ) ) ;
}
PickFromRoundRobinPolicyLocked ( true /* force_async */ , pp ) ;
}
// Send pending pings to RR policy.
PendingPing * pping ;
while ( ( pping = pending_pings_ ) ) {
pending_pings_ = pping - > next ;
if ( grpc_lb_glb_trace . enabled ( ) ) {
gpr_log ( GPR_INFO , " [grpclb %p] Pending ping about to PING from RR %p " ,
this , rr_policy_ . get ( ) ) ;
}
rr_policy_ - > PingOneLocked ( pping - > on_initiate , pping - > on_ack ) ;
Delete ( pping ) ;
grpc_error * error = GRPC_ERROR_NONE ;
PickFromRoundRobinPolicyLocked ( true /* force_async */ , pp , & error ) ;
}
}