@ -54,7 +54,7 @@
* operations in progress over the old RR instance . This is done by
* decreasing the reference count on the old policy . The moment no more
* references are held on the old RR policy , it ' ll be destroyed and \ a
* on _rr_connectivity_changed notified with a \ a GRPC_CHANNEL_SHUTDOWN
* glb _rr_connectivity_changed notified with a \ a GRPC_CHANNEL_SHUTDOWN
* state . At this point we can transition to a new RR instance safely , which
* is done once again via \ a rr_handover_locked ( ) .
*
@ -128,48 +128,187 @@
grpc_core : : TraceFlag grpc_lb_glb_trace ( false , " glb " ) ;
struct glb_lb_policy ;
/* add lb_token of selected subchannel (address) to the call's initial
* metadata */
static grpc_error * initial_metadata_add_lb_token (
grpc_metadata_batch * initial_metadata ,
grpc_linked_mdelem * lb_token_mdelem_storage , grpc_mdelem lb_token ) {
GPR_ASSERT ( lb_token_mdelem_storage ! = nullptr ) ;
GPR_ASSERT ( ! GRPC_MDISNULL ( lb_token ) ) ;
return grpc_metadata_batch_add_tail ( initial_metadata , lb_token_mdelem_storage ,
lb_token ) ;
}
namespace {
static void destroy_client_stats ( void * arg ) {
grpc_grpclb_client_stats_unref ( ( grpc_grpclb_client_stats * ) arg ) ;
}
/// Linked list of pending pick requests. It stores all information needed to
/// eventually call (Round Robin's) pick() on them. They mainly stay pending
/// waiting for the RR policy to be created.
///
/// Note that when a pick is sent to the RR policy, we inject our own
/// on_complete callback, so that we can intercept the result before
/// invoking the original on_complete callback. This allows us to set the
/// LB token metadata and add client_stats to the call context.
/// See \a pending_pick_complete() for details.
struct pending_pick {
// Our on_complete closure and the original one.
grpc_closure on_complete ;
grpc_closure * original_on_complete ;
// The original pick.
grpc_lb_policy_pick_state * pick ;
// Stats for client-side load reporting. Note that this holds a
// reference, which must be either passed on via context or unreffed.
typedef struct wrapped_rr_closure_arg {
/* the closure instance using this struct as argument */
grpc_closure wrapper_closure ;
/* the original closure. Usually a on_complete/notify cb for pick() and ping()
* calls against the internal RR instance , respectively . */
grpc_closure * wrapped_closure ;
/* the pick's initial metadata, kept in order to append the LB token for the
* pick */
grpc_metadata_batch * initial_metadata ;
/* the picked target, used to determine which LB token to add to the pick's
* initial metadata */
grpc_connected_subchannel * * target ;
/* the context to be populated for the subchannel call */
grpc_call_context_element * context ;
/* Stats for client-side load reporting. Note that this holds a
* reference , which must be either passed on via context or unreffed . */
grpc_grpclb_client_stats * client_stats ;
// The LB token associated with the pick. This is set via user_data in
// the pick.
/* the LB token associated with the pick */
grpc_mdelem lb_token ;
// The grpclb instance that created the wrapping. This instance is not owned,
// reference counts are untouched. It's used only for logging purposes.
glb_lb_policy * glb_policy ;
// Next pending pick.
/* storage for the lb token initial metadata mdelem */
grpc_linked_mdelem * lb_token_mdelem_storage ;
/* The RR instance related to the closure */
grpc_lb_policy * rr_policy ;
/* The grpclb instance that created the wrapping. This instance is not owned,
* reference counts are untouched . It ' s used only for logging purposes . */
grpc_lb_policy * glb_policy ;
/* heap memory to be freed upon closure execution. */
void * free_when_done ;
} wrapped_rr_closure_arg ;
/* The \a on_complete closure passed as part of the pick requires keeping a
* reference to its associated round robin instance . We wrap this closure in
* order to unref the round robin instance upon its invocation */
static void wrapped_rr_closure ( void * arg , grpc_error * error ) {
wrapped_rr_closure_arg * wc_arg = ( wrapped_rr_closure_arg * ) arg ;
GPR_ASSERT ( wc_arg - > wrapped_closure ! = nullptr ) ;
GRPC_CLOSURE_SCHED ( wc_arg - > wrapped_closure , GRPC_ERROR_REF ( error ) ) ;
if ( wc_arg - > rr_policy ! = nullptr ) {
/* if *target is nullptr, no pick has been made by the RR policy (eg, all
* addresses failed to connect ) . There won ' t be any user_data / token
* available */
if ( * wc_arg - > target ! = nullptr ) {
if ( ! GRPC_MDISNULL ( wc_arg - > lb_token ) ) {
initial_metadata_add_lb_token ( wc_arg - > initial_metadata ,
wc_arg - > lb_token_mdelem_storage ,
GRPC_MDELEM_REF ( wc_arg - > lb_token ) ) ;
} else {
gpr_log (
GPR_ERROR ,
" [grpclb %p] No LB token for connected subchannel pick %p (from RR "
" instance %p). " ,
wc_arg - > glb_policy , * wc_arg - > target , wc_arg - > rr_policy ) ;
abort ( ) ;
}
// Pass on client stats via context. Passes ownership of the reference.
GPR_ASSERT ( wc_arg - > client_stats ! = nullptr ) ;
wc_arg - > context [ GRPC_GRPCLB_CLIENT_STATS ] . value = wc_arg - > client_stats ;
wc_arg - > context [ GRPC_GRPCLB_CLIENT_STATS ] . destroy = destroy_client_stats ;
} else {
grpc_grpclb_client_stats_unref ( wc_arg - > client_stats ) ;
}
if ( grpc_lb_glb_trace . enabled ( ) ) {
gpr_log ( GPR_INFO , " [grpclb %p] Unreffing RR %p " , wc_arg - > glb_policy ,
wc_arg - > rr_policy ) ;
}
GRPC_LB_POLICY_UNREF ( wc_arg - > rr_policy , " wrapped_rr_closure " ) ;
}
GPR_ASSERT ( wc_arg - > free_when_done ! = nullptr ) ;
gpr_free ( wc_arg - > free_when_done ) ;
}
namespace {
/* Linked list of pending pick requests. It stores all information needed to
* eventually call ( Round Robin ' s ) pick ( ) on them . They mainly stay pending
* waiting for the RR policy to be created / updated .
*
* One particularity is the wrapping of the user - provided \ a on_complete closure
* ( in \ a wrapped_on_complete and \ a wrapped_on_complete_arg ) . This is needed in
* order to correctly unref the RR policy instance upon completion of the pick .
* See \ a wrapped_rr_closure for details . */
struct pending_pick {
struct pending_pick * next ;
/* original pick()'s arguments */
grpc_lb_policy_pick_args pick_args ;
/* output argument where to store the pick()ed connected subchannel, or
* nullptr upon error . */
grpc_connected_subchannel * * target ;
/* args for wrapped_on_complete */
wrapped_rr_closure_arg wrapped_on_complete_arg ;
} ;
} // namespace
/// A linked list of pending pings waiting for the RR policy to be created.
struct pending_ping {
grpc_closure * on_initiate ;
grpc_closure * on_ack ;
static void add_pending_pick ( pending_pick * * root ,
const grpc_lb_policy_pick_args * pick_args ,
grpc_connected_subchannel * * target ,
grpc_call_context_element * context ,
grpc_closure * on_complete ) {
pending_pick * pp = ( pending_pick * ) gpr_zalloc ( sizeof ( * pp ) ) ;
pp - > next = * root ;
pp - > pick_args = * pick_args ;
pp - > target = target ;
pp - > wrapped_on_complete_arg . wrapped_closure = on_complete ;
pp - > wrapped_on_complete_arg . target = target ;
pp - > wrapped_on_complete_arg . context = context ;
pp - > wrapped_on_complete_arg . initial_metadata = pick_args - > initial_metadata ;
pp - > wrapped_on_complete_arg . lb_token_mdelem_storage =
pick_args - > lb_token_mdelem_storage ;
pp - > wrapped_on_complete_arg . free_when_done = pp ;
GRPC_CLOSURE_INIT ( & pp - > wrapped_on_complete_arg . wrapper_closure ,
wrapped_rr_closure , & pp - > wrapped_on_complete_arg ,
grpc_schedule_on_exec_ctx ) ;
* root = pp ;
}
/* Same as the \a pending_pick struct but for ping operations */
typedef struct pending_ping {
struct pending_ping * next ;
} ;
} // namespace
/* args for sending the ping */
wrapped_rr_closure_arg * on_initiate ;
wrapped_rr_closure_arg * on_ack ;
} pending_ping ;
static void add_pending_ping ( pending_ping * * root , grpc_closure * on_initiate ,
grpc_closure * on_ack ) {
pending_ping * pping = ( pending_ping * ) gpr_zalloc ( sizeof ( * pping ) ) ;
if ( on_initiate ! = nullptr ) {
pping - > on_initiate =
( wrapped_rr_closure_arg * ) gpr_zalloc ( sizeof ( * pping - > on_initiate ) ) ;
pping - > on_initiate - > wrapped_closure = on_initiate ;
pping - > on_initiate - > free_when_done = pping - > on_initiate ;
GRPC_CLOSURE_INIT ( & pping - > on_initiate - > wrapper_closure , wrapped_rr_closure ,
& pping - > on_initiate , grpc_schedule_on_exec_ctx ) ;
}
if ( on_ack ! = nullptr ) {
pping - > on_ack = ( wrapped_rr_closure_arg * ) gpr_zalloc ( sizeof ( * pping - > on_ack ) ) ;
pping - > on_ack - > wrapped_closure = on_ack ;
pping - > on_ack - > free_when_done = pping - > on_ack ;
GRPC_CLOSURE_INIT ( & pping - > on_ack - > wrapper_closure , wrapped_rr_closure ,
& pping - > on_ack , grpc_schedule_on_exec_ctx ) ;
}
pping - > next = * root ;
* root = pping ;
}
/*
* glb_lb_policy
*/
typedef struct rr_connectivity_data rr_connectivity_data ;
struct glb_lb_policy {
typedef struct glb_lb_policy {
/** base policy: must be first */
grpc_lb_policy base ;
@ -194,9 +333,6 @@ struct glb_lb_policy {
/** the RR policy to use of the backend servers returned by the LB server */
grpc_lb_policy * rr_policy ;
grpc_closure on_rr_connectivity_changed ;
grpc_connectivity_state rr_connectivity_state ;
bool started_picking ;
/** our connectivity state tracker */
@ -301,84 +437,14 @@ struct glb_lb_policy {
grpc_closure client_load_report_closure ;
/* Client load report message payload. */
grpc_byte_buffer * client_load_report_payload ;
} ;
/* add lb_token of selected subchannel (address) to the call's initial
* metadata */
static grpc_error * initial_metadata_add_lb_token (
grpc_metadata_batch * initial_metadata ,
grpc_linked_mdelem * lb_token_mdelem_storage , grpc_mdelem lb_token ) {
GPR_ASSERT ( lb_token_mdelem_storage ! = nullptr ) ;
GPR_ASSERT ( ! GRPC_MDISNULL ( lb_token ) ) ;
return grpc_metadata_batch_add_tail ( initial_metadata , lb_token_mdelem_storage ,
lb_token ) ;
}
static void destroy_client_stats ( void * arg ) {
grpc_grpclb_client_stats_unref ( ( grpc_grpclb_client_stats * ) arg ) ;
}
} glb_lb_policy ;
static void pending_pick_set_metadata_and_context ( pending_pick * pp ) {
/* if connected_subchannel is nullptr, no pick has been made by the RR
* policy ( e . g . , all addresses failed to connect ) . There won ' t be any
* user_data / token available */
if ( pp - > pick - > connected_subchannel ! = nullptr ) {
if ( ! GRPC_MDISNULL ( pp - > lb_token ) ) {
initial_metadata_add_lb_token ( pp - > pick - > initial_metadata ,
& pp - > pick - > lb_token_mdelem_storage ,
GRPC_MDELEM_REF ( pp - > lb_token ) ) ;
} else {
gpr_log ( GPR_ERROR ,
" [grpclb %p] No LB token for connected subchannel pick %p " ,
pp - > glb_policy , pp - > pick ) ;
abort ( ) ;
}
// Pass on client stats via context. Passes ownership of the reference.
GPR_ASSERT ( pp - > client_stats ! = nullptr ) ;
pp - > pick - > subchannel_call_context [ GRPC_GRPCLB_CLIENT_STATS ] . value =
pp - > client_stats ;
pp - > pick - > subchannel_call_context [ GRPC_GRPCLB_CLIENT_STATS ] . destroy =
destroy_client_stats ;
} else {
grpc_grpclb_client_stats_unref ( pp - > client_stats ) ;
}
}
/* The \a on_complete closure passed as part of the pick requires keeping a
* reference to its associated round robin instance . We wrap this closure in
* order to unref the round robin instance upon its invocation */
static void pending_pick_complete ( void * arg , grpc_error * error ) {
pending_pick * pp = ( pending_pick * ) arg ;
pending_pick_set_metadata_and_context ( pp ) ;
GRPC_CLOSURE_SCHED ( pp - > original_on_complete , GRPC_ERROR_REF ( error ) ) ;
gpr_free ( pp ) ;
}
static pending_pick * pending_pick_create ( glb_lb_policy * glb_policy ,
grpc_lb_policy_pick_state * pick ) {
pending_pick * pp = ( pending_pick * ) gpr_zalloc ( sizeof ( * pp ) ) ;
pp - > pick = pick ;
pp - > glb_policy = glb_policy ;
GRPC_CLOSURE_INIT ( & pp - > on_complete , pending_pick_complete , pp ,
grpc_schedule_on_exec_ctx ) ;
pp - > original_on_complete = pick - > on_complete ;
pp - > pick - > on_complete = & pp - > on_complete ;
return pp ;
}
static void pending_pick_add ( pending_pick * * root , pending_pick * new_pp ) {
new_pp - > next = * root ;
* root = new_pp ;
}
static void pending_ping_add ( pending_ping * * root , grpc_closure * on_initiate ,
grpc_closure * on_ack ) {
pending_ping * pping = ( pending_ping * ) gpr_zalloc ( sizeof ( * pping ) ) ;
pping - > on_initiate = on_initiate ;
pping - > on_ack = on_ack ;
pping - > next = * root ;
* root = pping ;
}
/* Keeps track and reacts to changes in connectivity of the RR instance */
struct rr_connectivity_data {
grpc_closure on_change ;
grpc_connectivity_state state ;
glb_lb_policy * glb_policy ;
} ;
static bool is_server_valid ( const grpc_grpclb_server * server , size_t idx ,
bool log ) {
@ -491,6 +557,7 @@ static grpc_lb_addresses* process_serverlist_locked(
gpr_free ( uri ) ;
user_data = ( void * ) GRPC_MDELEM_LB_TOKEN_EMPTY . payload ;
}
grpc_lb_addresses_set_address ( lb_addresses , addr_idx , & addr . addr , addr . len ,
false /* is_balancer */ ,
nullptr /* balancer_name */ , user_data ) ;
@ -531,6 +598,7 @@ static void update_lb_connectivity_status_locked(
grpc_error * rr_state_error ) {
const grpc_connectivity_state curr_glb_state =
grpc_connectivity_state_check ( & glb_policy - > state_tracker ) ;
/* The new connectivity status is a function of the previous one and the new
* input coming from the status of the RR policy .
*
@ -560,6 +628,7 @@ static void update_lb_connectivity_status_locked(
*
* ( * ) This function mustn ' t be called during shutting down . */
GPR_ASSERT ( curr_glb_state ! = GRPC_CHANNEL_SHUTDOWN ) ;
switch ( rr_state ) {
case GRPC_CHANNEL_TRANSIENT_FAILURE :
case GRPC_CHANNEL_SHUTDOWN :
@ -570,6 +639,7 @@ static void update_lb_connectivity_status_locked(
case GRPC_CHANNEL_READY :
GPR_ASSERT ( rr_state_error = = GRPC_ERROR_NONE ) ;
}
if ( grpc_lb_glb_trace . enabled ( ) ) {
gpr_log (
GPR_INFO ,
@ -587,8 +657,10 @@ static void update_lb_connectivity_status_locked(
* cleanups this callback would otherwise be responsible for .
* If \ a force_async is true , then we will manually schedule the
* completion callback even if the pick is available immediately . */
static bool pick_from_internal_rr_locked ( glb_lb_policy * glb_policy ,
bool force_async , pending_pick * pp ) {
static bool pick_from_internal_rr_locked (
glb_lb_policy * glb_policy , const grpc_lb_policy_pick_args * pick_args ,
bool force_async , grpc_connected_subchannel * * target ,
wrapped_rr_closure_arg * wc_arg ) {
// Check for drops if we are not using fallback backend addresses.
if ( glb_policy - > serverlist ! = nullptr ) {
// Look at the index into the serverlist to see if we should drop this call.
@ -598,36 +670,57 @@ static bool pick_from_internal_rr_locked(glb_lb_policy* glb_policy,
glb_policy - > serverlist_index = 0 ; // Wrap-around.
}
if ( server - > drop ) {
// Not using the RR policy, so unref it.
if ( grpc_lb_glb_trace . enabled ( ) ) {
gpr_log ( GPR_INFO , " [grpclb %p] Unreffing RR %p for drop " , glb_policy ,
wc_arg - > rr_policy ) ;
}
GRPC_LB_POLICY_UNREF ( wc_arg - > rr_policy , " glb_pick_sync " ) ;
// Update client load reporting stats to indicate the number of
// dropped calls. Note that we have to do this here instead of in
// the client_load_reporting filter, because we do not create a
// subchannel call (and therefore no client_load_reporting filter)
// for dropped calls.
GPR_ASSERT ( glb_policy - > client_stats ! = nullptr ) ;
GPR_ASSERT ( wc_arg - > client_stats ! = nullptr ) ;
grpc_grpclb_client_stats_add_call_dropped_locked (
server - > load_balance_token , glb_policy - > client_stats ) ;
server - > load_balance_token , wc_arg - > client_stats ) ;
grpc_grpclb_client_stats_unref ( wc_arg - > client_stats ) ;
if ( force_async ) {
GRPC_CLOSURE_SCHED ( pp - > original_on_complete , GRPC_ERROR_NONE ) ;
gpr_free ( pp ) ;
GPR_ASSERT ( wc_arg - > wrapped_closure ! = nullptr ) ;
GRPC_CLOSURE_SCHED ( wc_arg - > wrapped_closure , GRPC_ERROR_NONE ) ;
gpr_free ( wc_arg - > free_when_done ) ;
return false ;
}
gpr_free ( pp ) ;
gpr_free ( wc_arg - > free_when_done ) ;
return true ;
}
}
// Set client_stats and user_data.
pp - > client_stats = grpc_grpclb_client_stats_ref ( glb_policy - > client_stats ) ;
GPR_ASSERT ( pp - > pick - > user_data = = nullptr ) ;
pp - > pick - > user_data = ( void * * ) & pp - > lb_token ;
// Pick via the RR policy.
bool pick_done = grpc_lb_policy_pick_locked ( glb_policy - > rr_policy , pp - > pick ) ;
const bool pick_done = grpc_lb_policy_pick_locked (
wc_arg - > rr_policy , pick_args , target , wc_arg - > context ,
( void * * ) & wc_arg - > lb_token , & wc_arg - > wrapper_closure ) ;
if ( pick_done ) {
pending_pick_set_metadata_and_context ( pp ) ;
/* synchronous grpc_lb_policy_pick call. Unref the RR policy. */
if ( grpc_lb_glb_trace . enabled ( ) ) {
gpr_log ( GPR_INFO , " [grpclb %p] Unreffing RR %p " , glb_policy ,
wc_arg - > rr_policy ) ;
}
GRPC_LB_POLICY_UNREF ( wc_arg - > rr_policy , " glb_pick_sync " ) ;
/* add the load reporting initial metadata */
initial_metadata_add_lb_token ( pick_args - > initial_metadata ,
pick_args - > lb_token_mdelem_storage ,
GRPC_MDELEM_REF ( wc_arg - > lb_token ) ) ;
// Pass on client stats via context. Passes ownership of the reference.
GPR_ASSERT ( wc_arg - > client_stats ! = nullptr ) ;
wc_arg - > context [ GRPC_GRPCLB_CLIENT_STATS ] . value = wc_arg - > client_stats ;
wc_arg - > context [ GRPC_GRPCLB_CLIENT_STATS ] . destroy = destroy_client_stats ;
if ( force_async ) {
GRPC_CLOSURE_SCHED ( pp - > original_on_complete , GRPC_ERROR_NONE ) ;
pick_done = false ;
GPR_ASSERT ( wc_arg - > wrapped_closure ! = nullptr ) ;
GRPC_CLOSURE_SCHED ( wc_arg - > wrapped_closure , GRPC_ERROR_NONE ) ;
gpr_free ( wc_arg - > free_when_done ) ;
return false ;
}
gpr_free ( pp ) ;
gpr_free ( wc_arg - > free_when_done ) ;
}
/* else, the pending pick will be registered and taken care of by the
* pending pick list inside the RR policy ( glb_policy - > rr_policy ) .
@ -669,7 +762,7 @@ static void lb_policy_args_destroy(grpc_lb_policy_args* args) {
gpr_free ( args ) ;
}
static void on _rr_connectivity_changed_locked( void * arg , grpc_error * error ) ;
static void glb _rr_connectivity_changed_locked( void * arg , grpc_error * error ) ;
static void create_rr_locked ( glb_lb_policy * glb_policy ,
grpc_lb_policy_args * args ) {
GPR_ASSERT ( glb_policy - > rr_policy = = nullptr ) ;
@ -691,46 +784,72 @@ static void create_rr_locked(glb_lb_policy* glb_policy,
glb_policy - > base . request_reresolution = nullptr ;
glb_policy - > rr_policy = new_rr_policy ;
grpc_error * rr_state_error = nullptr ;
glb_policy - > rr_connectivity_state = grpc_lb_policy_check_connectivity_locked (
glb_policy - > rr_policy , & rr_state_error ) ;
const grpc_connectivity_state rr_state =
grpc_lb_policy_check_connectivity_locked ( glb_policy - > rr_policy ,
& rr_state_error ) ;
/* Connectivity state is a function of the RR policy updated/created */
update_lb_connectivity_status_locked (
glb_policy , glb_policy - > rr_connectivity_state , rr_state_error ) ;
update_lb_connectivity_status_locked ( glb_policy , rr_state , rr_state_error ) ;
/* Add the gRPC LB's interested_parties pollset_set to that of the newly
* created RR policy . This will make the RR policy progress upon activity on
* gRPC LB , which in turn is tied to the application ' s call */
grpc_pollset_set_add_pollset_set ( glb_policy - > rr_policy - > interested_parties ,
glb_policy - > base . interested_parties ) ;
GRPC_CLOSURE_INIT ( & glb_policy - > on_rr_connectivity_changed ,
on_rr_connectivity_changed_locked , glb_policy ,
/* Allocate the data for the tracking of the new RR policy's connectivity.
* It ' ll be deallocated in glb_rr_connectivity_changed ( ) */
rr_connectivity_data * rr_connectivity =
( rr_connectivity_data * ) gpr_zalloc ( sizeof ( rr_connectivity_data ) ) ;
GRPC_CLOSURE_INIT ( & rr_connectivity - > on_change ,
glb_rr_connectivity_changed_locked , rr_connectivity ,
grpc_combiner_scheduler ( glb_policy - > base . combiner ) ) ;
rr_connectivity - > glb_policy = glb_policy ;
rr_connectivity - > state = rr_state ;
/* Subscribe to changes to the connectivity of the new RR */
GRPC_LB_POLICY_REF ( & glb_policy - > base , " glb_rr_connectivity_cb " ) ;
grpc_lb_policy_notify_on_state_change_locked (
glb_policy - > rr_policy , & glb_policy - > rr_connectivity_state ,
& glb_polic y- > on_rr_connectivity_ changed ) ;
GRPC_LB_POLICY_WEAK_ REF ( & glb_policy - > base , " glb_rr_connectivity_cb " ) ;
grpc_lb_policy_notify_on_state_change_locked ( glb_policy - > rr_policy ,
& rr_connectivit y - > state ,
& rr_connectivit y - > on_change ) ;
grpc_lb_policy_exit_idle_locked ( glb_policy - > rr_policy ) ;
// Send pending picks to RR policy.
/* Update picks and pings in wait */
pending_pick * pp ;
while ( ( pp = glb_policy - > pending_picks ) ) {
glb_policy - > pending_picks = pp - > next ;
GRPC_LB_POLICY_REF ( glb_policy - > rr_policy , " rr_handover_pending_pick " ) ;
pp - > wrapped_on_complete_arg . rr_policy = glb_policy - > rr_policy ;
pp - > wrapped_on_complete_arg . client_stats =
grpc_grpclb_client_stats_ref ( glb_policy - > client_stats ) ;
if ( grpc_lb_glb_trace . enabled ( ) ) {
gpr_log ( GPR_INFO ,
" [grpclb %p] Pending pick about to (async) PICK from RR %p " ,
glb_policy , glb_policy - > rr_policy ) ;
}
pick_from_internal_rr_locked ( glb_policy , true /* force_async */ , pp ) ;
pick_from_internal_rr_locked ( glb_policy , & pp - > pick_args ,
true /* force_async */ , pp - > target ,
& pp - > wrapped_on_complete_arg ) ;
}
// Send pending pings to RR policy.
pending_ping * pping ;
while ( ( pping = glb_policy - > pending_pings ) ) {
glb_policy - > pending_pings = pping - > next ;
grpc_closure * on_initiate = nullptr ;
grpc_closure * on_ack = nullptr ;
if ( pping - > on_initiate ! = nullptr ) {
GRPC_LB_POLICY_REF ( glb_policy - > rr_policy , " rr_handover_pending_ping " ) ;
pping - > on_initiate - > rr_policy = glb_policy - > rr_policy ;
on_initiate = & pping - > on_initiate - > wrapper_closure ;
}
if ( pping - > on_ack ! = nullptr ) {
GRPC_LB_POLICY_REF ( glb_policy - > rr_policy , " rr_handover_pending_ping " ) ;
pping - > on_ack - > rr_policy = glb_policy - > rr_policy ;
on_ack = & pping - > on_ack - > wrapper_closure ;
}
if ( grpc_lb_glb_trace . enabled ( ) ) {
gpr_log ( GPR_INFO , " [grpclb %p] Pending ping about to PING from RR %p " ,
glb_policy , glb_policy - > rr_policy ) ;
}
grpc_lb_policy_ping_one_locked ( glb_policy - > rr_policy , pping - > on_initiate ,
pping - > on_ack ) ;
grpc_lb_policy_ping_one_locked ( glb_policy - > rr_policy , on_initiate , on_ack ) ;
gpr_free ( pping ) ;
}
}
@ -756,28 +875,31 @@ static void rr_handover_locked(glb_lb_policy* glb_policy) {
lb_policy_args_destroy ( args ) ;
}
static void on_rr_connectivity_changed_locked ( void * arg , grpc_error * error ) {
glb_lb_policy * glb_policy = ( glb_lb_policy * ) arg ;
static void glb_rr_connectivity_changed_locked ( void * arg , grpc_error * error ) {
rr_connectivity_data * rr_connectivity = ( rr_connectivity_data * ) arg ;
glb_lb_policy * glb_policy = rr_connectivity - > glb_policy ;
if ( glb_policy - > shutting_down ) {
GRPC_LB_POLICY_UNREF ( & glb_policy - > base , " glb_rr_connectivity_cb " ) ;
GRPC_LB_POLICY_WEAK_UNREF ( & glb_policy - > base , " glb_rr_connectivity_cb " ) ;
gpr_free ( rr_connectivity ) ;
return ;
}
if ( glb_polic y- > rr_connectivity_ state = = GRPC_CHANNEL_SHUTDOWN ) {
if ( rr_connectivit y- > state = = GRPC_CHANNEL_SHUTDOWN ) {
/* An RR policy that has transitioned into the SHUTDOWN connectivity state
* should not be considered for picks or updates : the SHUTDOWN state is a
* sink , policies can ' t transition back from it . . */
GRPC_LB_POLICY_UNREF ( glb_policy - > rr_policy , " rr_connectivity_shutdown " ) ;
glb_policy - > rr_policy = nullptr ;
GRPC_LB_POLICY_UNREF ( & glb_policy - > base , " glb_rr_connectivity_cb " ) ;
GRPC_LB_POLICY_WEAK_UNREF ( & glb_policy - > base , " glb_rr_connectivity_cb " ) ;
gpr_free ( rr_connectivity ) ;
return ;
}
/* rr state != SHUTDOWN && !glb_policy->shutting down: biz as usual */
update_lb_connectivity_status_locked (
glb_policy , glb_policy - > rr_connectivity_state , GRPC_ERROR_REF ( error ) ) ;
/* Resubscribe. Reuse the "glb_rr_connectivity_cb" ref. */
grpc_lb_policy_notify_on_state_change_locked (
glb_policy - > rr_policy , & glb_polic y- > rr_connectivity_ state,
& glb_polic y- > on_rr_connectivity_ changed ) ;
update_lb_connectivity_status_locked ( glb_policy , rr_connectivity - > state ,
GRPC_ERROR_REF ( error ) ) ;
/* Resubscribe. Reuse the "glb_rr_connectivity_cb" weak ref. */
grpc_lb_policy_notify_on_state_change_locked ( glb_policy - > rr_policy ,
& rr_connectivit y - > state ,
& rr_connectivit y - > on_change ) ;
}
static void destroy_balancer_name ( void * balancer_name ) {
@ -885,17 +1007,22 @@ static void glb_destroy(grpc_lb_policy* pol) {
gpr_free ( glb_policy ) ;
}
static void glb_shutdown_locked ( grpc_lb_policy * pol ,
grpc_lb_policy * new_policy ) {
static void glb_shutdown_locked ( grpc_lb_policy * pol ) {
glb_lb_policy * glb_policy = ( glb_lb_policy * ) pol ;
grpc_error * error = GRPC_ERROR_CREATE_FROM_STATIC_STRING ( " Channel shutdown " ) ;
glb_policy - > shutting_down = true ;
/* We need a copy of the lb_call pointer because we can't cancell the call
* while holding glb_policy - > mu : lb_on_server_status_received , invoked due to
* the cancel , needs to acquire that same lock */
grpc_call * lb_call = glb_policy - > lb_call ;
/* glb_policy->lb_call and this local lb_call must be consistent at this point
* because glb_policy - > lb_call is only assigned in lb_call_init_locked as part
* of query_for_backends_locked , which can only be invoked while
* glb_policy - > shutting_down is false . */
if ( glb_policy - > lb_call ! = nullptr ) {
grpc_call_cancel ( glb_policy - > lb_call , nullptr ) ;
if ( lb_call ! = nullptr ) {
grpc_call_cancel ( lb_call , nullptr ) ;
/* lb_on_server_status_received will pick up the cancel and clean up */
}
if ( glb_policy - > retry_timer_callback_pending ) {
@ -904,8 +1031,12 @@ static void glb_shutdown_locked(grpc_lb_policy* pol,
if ( glb_policy - > fallback_timer_callback_pending ) {
grpc_timer_cancel ( & glb_policy - > lb_fallback_timer ) ;
}
pending_pick * pp = glb_policy - > pending_picks ;
glb_policy - > pending_picks = nullptr ;
pending_ping * pping = glb_policy - > pending_pings ;
glb_policy - > pending_pings = nullptr ;
if ( glb_policy - > rr_policy ! = nullptr ) {
grpc_lb_policy_shutdown_locked ( glb_policy - > rr_policy , nullptr ) ;
GRPC_LB_POLICY_UNREF ( glb_policy - > rr_policy , " glb_shutdown " ) ;
} else {
grpc_lb_policy_try_reresolve ( pol , & grpc_lb_glb_trace , GRPC_ERROR_CANCELLED ) ;
@ -920,33 +1051,28 @@ static void glb_shutdown_locked(grpc_lb_policy* pol,
}
grpc_connectivity_state_set ( & glb_policy - > state_tracker , GRPC_CHANNEL_SHUTDOWN ,
GRPC_ERROR_REF ( error ) , " glb_shutdown " ) ;
// Clear pending picks.
pending_pick * pp = glb_policy - > pending_picks ;
glb_policy - > pending_picks = nullptr ;
while ( pp ! = nullptr ) {
pending_pick * next = pp - > next ;
if ( new_policy ! = nullptr ) {
// Hand pick over to new policy.
grpc_grpclb_client_stats_unref ( pp - > client_stats ) ;
pp - > pick - > on_complete = pp - > original_on_complete ;
if ( grpc_lb_policy_pick_locked ( new_policy , pp - > pick ) ) {
// Synchronous return; schedule callback.
GRPC_CLOSURE_SCHED ( pp - > pick - > on_complete , GRPC_ERROR_NONE ) ;
}
* pp - > target = nullptr ;
GRPC_CLOSURE_SCHED ( & pp - > wrapped_on_complete_arg . wrapper_closure ,
GRPC_ERROR_REF ( error ) ) ;
gpr_free ( pp ) ;
} else {
pp - > pick - > connected_subchannel = nullptr ;
GRPC_CLOSURE_SCHED ( & pp - > on_complete , GRPC_ERROR_REF ( error ) ) ;
}
pp = next ;
}
// Clear pending pings.
pending_ping * pping = glb_policy - > pending_pings ;
glb_policy - > pending_pings = nullptr ;
while ( pping ! = nullptr ) {
pending_ping * next = pping - > next ;
GRPC_CLOSURE_SCHED ( pping - > on_initiate , GRPC_ERROR_REF ( error ) ) ;
GRPC_CLOSURE_SCHED ( pping - > on_ack , GRPC_ERROR_REF ( error ) ) ;
if ( pping - > on_initiate ! = nullptr ) {
GRPC_CLOSURE_SCHED ( & pping - > on_initiate - > wrapper_closure ,
GRPC_ERROR_REF ( error ) ) ;
gpr_free ( pping - > on_initiate ) ;
}
if ( pping - > on_ack ! = nullptr ) {
GRPC_CLOSURE_SCHED ( & pping - > on_ack - > wrapper_closure ,
GRPC_ERROR_REF ( error ) ) ;
gpr_free ( pping - > on_ack ) ;
}
gpr_free ( pping ) ;
pping = next ;
}
@ -964,16 +1090,16 @@ static void glb_shutdown_locked(grpc_lb_policy* pol,
// level (grpclb), inside the glb_policy->pending_picks list. To cancel these,
// we invoke the completion closure and set *target to nullptr right here.
static void glb_cancel_pick_locked ( grpc_lb_policy * pol ,
grpc_lb_policy_pick_state * pick ,
grpc_connected_subchannel * * target ,
grpc_error * error ) {
glb_lb_policy * glb_policy = ( glb_lb_policy * ) pol ;
pending_pick * pp = glb_policy - > pending_picks ;
glb_policy - > pending_picks = nullptr ;
while ( pp ! = nullptr ) {
pending_pick * next = pp - > next ;
if ( pp - > pick = = pick ) {
pick - > connected_subchannel = nullptr ;
GRPC_CLOSURE_SCHED ( & pp - > on_complete ,
if ( pp - > target = = target ) {
* target = nullptr ;
GRPC_CLOSURE_SCHED ( & pp - > wrapped_ on_complete_arg . wrapper_closur e,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING (
" Pick Cancelled " , & error , 1 ) ) ;
} else {
@ -983,7 +1109,7 @@ static void glb_cancel_pick_locked(grpc_lb_policy* pol,
pp = next ;
}
if ( glb_policy - > rr_policy ! = nullptr ) {
grpc_lb_policy_cancel_pick_locked ( glb_policy - > rr_policy , pick ,
grpc_lb_policy_cancel_pick_locked ( glb_policy - > rr_policy , target ,
GRPC_ERROR_REF ( error ) ) ;
}
GRPC_ERROR_UNREF ( error ) ;
@ -1008,9 +1134,9 @@ static void glb_cancel_picks_locked(grpc_lb_policy* pol,
glb_policy - > pending_picks = nullptr ;
while ( pp ! = nullptr ) {
pending_pick * next = pp - > next ;
if ( ( pp - > pick - > initial_metadata_flags & initial_metadata_flags_mask ) = =
if ( ( pp - > pick_args . initial_metadata_flags & initial_metadata_flags_mask ) = =
initial_metadata_flags_eq ) {
GRPC_CLOSURE_SCHED ( & pp - > on_complete ,
GRPC_CLOSURE_SCHED ( & pp - > wrapped_ on_complete_arg . wrapper_closur e,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING (
" Pick Cancelled " , & error , 1 ) ) ;
} else {
@ -1036,7 +1162,7 @@ static void start_picking_locked(glb_lb_policy* glb_policy) {
! glb_policy - > fallback_timer_callback_pending ) {
grpc_millis deadline =
grpc_core : : ExecCtx : : Get ( ) - > Now ( ) + glb_policy - > lb_fallback_timeout_ms ;
GRPC_LB_POLICY_REF ( & glb_policy - > base , " grpclb_fallback_timer " ) ;
GRPC_LB_POLICY_WEAK_ REF ( & glb_policy - > base , " grpclb_fallback_timer " ) ;
GRPC_CLOSURE_INIT ( & glb_policy - > lb_on_fallback , lb_on_fallback_timer_locked ,
glb_policy ,
grpc_combiner_scheduler ( glb_policy - > base . combiner ) ) ;
@ -1058,9 +1184,19 @@ static void glb_exit_idle_locked(grpc_lb_policy* pol) {
}
static int glb_pick_locked ( grpc_lb_policy * pol ,
grpc_lb_policy_pick_state * pick ) {
const grpc_lb_policy_pick_args * pick_args ,
grpc_connected_subchannel * * target ,
grpc_call_context_element * context , void * * user_data ,
grpc_closure * on_complete ) {
if ( pick_args - > lb_token_mdelem_storage = = nullptr ) {
* target = nullptr ;
GRPC_CLOSURE_SCHED ( on_complete ,
GRPC_ERROR_CREATE_FROM_STATIC_STRING (
" No mdelem storage for the LB token. Load reporting "
" won't work without it. Failing " ) ) ;
return 0 ;
}
glb_lb_policy * glb_policy = ( glb_lb_policy * ) pol ;
pending_pick * pp = pending_pick_create ( glb_policy , pick ) ;
bool pick_done = false ;
if ( glb_policy - > rr_policy ! = nullptr ) {
const grpc_connectivity_state rr_connectivity_state =
@ -1068,7 +1204,7 @@ static int glb_pick_locked(grpc_lb_policy* pol,
nullptr ) ;
// The glb_policy->rr_policy may have transitioned to SHUTDOWN but the
// callback registered to capture this event
// (on _rr_connectivity_changed_locked) may not have been invoked yet. We
// (glb _rr_connectivity_changed_locked) may not have been invoked yet. We
// need to make sure we aren't trying to pick from a RR policy instance
// that's in shutdown.
if ( rr_connectivity_state = = GRPC_CHANNEL_SHUTDOWN ) {
@ -1078,16 +1214,32 @@ static int glb_pick_locked(grpc_lb_policy* pol,
glb_policy , glb_policy - > rr_policy ,
grpc_connectivity_state_name ( rr_connectivity_state ) ) ;
}
pending_pick_add ( & glb_policy - > pending_picks , pp ) ;
add_pending_pick ( & glb_policy - > pending_picks , pick_args , target , context ,
on_complete ) ;
pick_done = false ;
} else { // RR not in shutdown
if ( grpc_lb_glb_trace . enabled ( ) ) {
gpr_log ( GPR_INFO , " [grpclb %p] about to PICK from RR %p " , glb_policy ,
glb_policy - > rr_policy ) ;
}
GRPC_LB_POLICY_REF ( glb_policy - > rr_policy , " glb_pick " ) ;
wrapped_rr_closure_arg * wc_arg =
( wrapped_rr_closure_arg * ) gpr_zalloc ( sizeof ( wrapped_rr_closure_arg ) ) ;
GRPC_CLOSURE_INIT ( & wc_arg - > wrapper_closure , wrapped_rr_closure , wc_arg ,
grpc_schedule_on_exec_ctx ) ;
wc_arg - > rr_policy = glb_policy - > rr_policy ;
wc_arg - > target = target ;
wc_arg - > context = context ;
GPR_ASSERT ( glb_policy - > client_stats ! = nullptr ) ;
pick_done =
pick_from_internal_rr_locked ( glb_policy , false /* force_async */ , pp ) ;
wc_arg - > client_stats =
grpc_grpclb_client_stats_ref ( glb_policy - > client_stats ) ;
wc_arg - > wrapped_closure = on_complete ;
wc_arg - > lb_token_mdelem_storage = pick_args - > lb_token_mdelem_storage ;
wc_arg - > initial_metadata = pick_args - > initial_metadata ;
wc_arg - > free_when_done = wc_arg ;
wc_arg - > glb_policy = pol ;
pick_done = pick_from_internal_rr_locked (
glb_policy , pick_args , false /* force_async */ , target , wc_arg ) ;
}
} else { // glb_policy->rr_policy == NULL
if ( grpc_lb_glb_trace . enabled ( ) ) {
@ -1095,7 +1247,8 @@ static int glb_pick_locked(grpc_lb_policy* pol,
" [grpclb %p] No RR policy. Adding to grpclb's pending picks " ,
glb_policy ) ;
}
pending_pick_add ( & glb_policy - > pending_picks , pp ) ;
add_pending_pick ( & glb_policy - > pending_picks , pick_args , target , context ,
on_complete ) ;
if ( ! glb_policy - > started_picking ) {
start_picking_locked ( glb_policy ) ;
}
@ -1117,7 +1270,7 @@ static void glb_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate,
if ( glb_policy - > rr_policy ) {
grpc_lb_policy_ping_one_locked ( glb_policy - > rr_policy , on_initiate , on_ack ) ;
} else {
pending_ping_add ( & glb_policy - > pending_pings , on_initiate , on_ack ) ;
add_ pending_ping( & glb_policy - > pending_pings , on_initiate , on_ack ) ;
if ( ! glb_policy - > started_picking ) {
start_picking_locked ( glb_policy ) ;
}
@ -1142,7 +1295,7 @@ static void lb_call_on_retry_timer_locked(void* arg, grpc_error* error) {
}
query_for_backends_locked ( glb_policy ) ;
}
GRPC_LB_POLICY_UNREF ( & glb_policy - > base , " grpclb_retry_timer " ) ;
GRPC_LB_POLICY_WEAK_ UNREF ( & glb_policy - > base , " grpclb_retry_timer " ) ;
}
static void maybe_restart_lb_call ( glb_lb_policy * glb_policy ) {
@ -1168,7 +1321,7 @@ static void maybe_restart_lb_call(glb_lb_policy* glb_policy) {
glb_policy ) ;
}
}
GRPC_LB_POLICY_REF ( & glb_policy - > base , " grpclb_retry_timer " ) ;
GRPC_LB_POLICY_WEAK_ REF ( & glb_policy - > base , " grpclb_retry_timer " ) ;
GRPC_CLOSURE_INIT ( & glb_policy - > lb_on_call_retry ,
lb_call_on_retry_timer_locked , glb_policy ,
grpc_combiner_scheduler ( glb_policy - > base . combiner ) ) ;
@ -1176,7 +1329,7 @@ static void maybe_restart_lb_call(glb_lb_policy* glb_policy) {
grpc_timer_init ( & glb_policy - > lb_call_retry_timer , next_try ,
& glb_policy - > lb_on_call_retry ) ;
}
GRPC_LB_POLICY_UNREF ( & glb_policy - > base ,
GRPC_LB_POLICY_WEAK_ UNREF ( & glb_policy - > base ,
" lb_on_server_status_received_locked " ) ;
}
@ -1200,7 +1353,7 @@ static void client_load_report_done_locked(void* arg, grpc_error* error) {
glb_policy - > client_load_report_payload = nullptr ;
if ( error ! = GRPC_ERROR_NONE | | glb_policy - > lb_call = = nullptr ) {
glb_policy - > client_load_report_timer_callback_pending = false ;
GRPC_LB_POLICY_UNREF ( & glb_policy - > base , " client_load_report " ) ;
GRPC_LB_POLICY_WEAK_ UNREF ( & glb_policy - > base , " client_load_report " ) ;
if ( glb_policy - > lb_call = = nullptr ) {
maybe_restart_lb_call ( glb_policy ) ;
}
@ -1241,7 +1394,7 @@ static void send_client_load_report_locked(void* arg, grpc_error* error) {
glb_lb_policy * glb_policy = ( glb_lb_policy * ) arg ;
if ( error = = GRPC_ERROR_CANCELLED | | glb_policy - > lb_call = = nullptr ) {
glb_policy - > client_load_report_timer_callback_pending = false ;
GRPC_LB_POLICY_UNREF ( & glb_policy - > base , " client_load_report " ) ;
GRPC_LB_POLICY_WEAK_ UNREF ( & glb_policy - > base , " client_load_report " ) ;
if ( glb_policy - > lb_call = = nullptr ) {
maybe_restart_lb_call ( glb_policy ) ;
}
@ -1394,8 +1547,10 @@ static void query_for_backends_locked(glb_lb_policy* glb_policy) {
op - > flags = 0 ;
op - > reserved = nullptr ;
op + + ;
/* take a ref to be released in lb_on_sent_initial_request_locked() */
GRPC_LB_POLICY_REF ( & glb_policy - > base , " lb_on_sent_initial_request_locked " ) ;
/* take a weak ref (won't prevent calling of \a glb_shutdown if the strong ref
* count goes to zero ) to be unref ' d in lb_on_sent_initial_request_locked ( ) */
GRPC_LB_POLICY_WEAK_REF ( & glb_policy - > base ,
" lb_on_sent_initial_request_locked " ) ;
call_error = grpc_call_start_batch_and_execute (
glb_policy - > lb_call , ops , ( size_t ) ( op - ops ) ,
& glb_policy - > lb_on_sent_initial_request ) ;
@ -1411,8 +1566,10 @@ static void query_for_backends_locked(glb_lb_policy* glb_policy) {
op - > flags = 0 ;
op - > reserved = nullptr ;
op + + ;
/* take a ref to be released in lb_on_server_status_received_locked() */
GRPC_LB_POLICY_REF ( & glb_policy - > base , " lb_on_server_status_received_locked " ) ;
/* take a weak ref (won't prevent calling of \a glb_shutdown if the strong ref
* count goes to zero ) to be unref ' d in lb_on_server_status_received_locked */
GRPC_LB_POLICY_WEAK_REF ( & glb_policy - > base ,
" lb_on_server_status_received_locked " ) ;
call_error = grpc_call_start_batch_and_execute (
glb_policy - > lb_call , ops , ( size_t ) ( op - ops ) ,
& glb_policy - > lb_on_server_status_received ) ;
@ -1424,8 +1581,9 @@ static void query_for_backends_locked(glb_lb_policy* glb_policy) {
op - > flags = 0 ;
op - > reserved = nullptr ;
op + + ;
/* take a ref to be unref'd/reused in lb_on_response_received_locked() */
GRPC_LB_POLICY_REF ( & glb_policy - > base , " lb_on_response_received_locked " ) ;
/* take another weak ref to be unref'd/reused in
* lb_on_response_received_locked */
GRPC_LB_POLICY_WEAK_REF ( & glb_policy - > base , " lb_on_response_received_locked " ) ;
call_error = grpc_call_start_batch_and_execute (
glb_policy - > lb_call , ops , ( size_t ) ( op - ops ) ,
& glb_policy - > lb_on_response_received ) ;
@ -1440,7 +1598,8 @@ static void lb_on_sent_initial_request_locked(void* arg, grpc_error* error) {
if ( glb_policy - > client_load_report_payload ! = nullptr ) {
do_send_client_load_report_locked ( glb_policy ) ;
}
GRPC_LB_POLICY_UNREF ( & glb_policy - > base , " lb_on_sent_initial_request_locked " ) ;
GRPC_LB_POLICY_WEAK_UNREF ( & glb_policy - > base ,
" lb_on_sent_initial_request_locked " ) ;
}
static void lb_on_response_received_locked ( void * arg , grpc_error * error ) {
@ -1472,9 +1631,11 @@ static void lb_on_response_received_locked(void* arg, grpc_error* error) {
" client load reporting interval = % " PRIdPTR " milliseconds " ,
glb_policy , glb_policy - > client_stats_report_interval ) ;
}
/* take a ref to be unref'd in send_client_load_report_locked() */
/* take a weak ref (won't prevent calling of \a glb_shutdown() if the
* strong ref count goes to zero ) to be unref ' d in
* send_client_load_report_locked ( ) */
glb_policy - > client_load_report_timer_callback_pending = true ;
GRPC_LB_POLICY_REF ( & glb_policy - > base , " client_load_report " ) ;
GRPC_LB_POLICY_WEAK_ REF ( & glb_policy - > base , " client_load_report " ) ;
schedule_next_client_load_report ( glb_policy ) ;
} else if ( grpc_lb_glb_trace . enabled ( ) ) {
gpr_log ( GPR_INFO ,
@ -1556,20 +1717,20 @@ static void lb_on_response_received_locked(void* arg, grpc_error* error) {
op - > flags = 0 ;
op - > reserved = nullptr ;
op + + ;
/* reuse the "lb_on_response_received_locked" ref taken in
/* reuse the "lb_on_response_received_locked" weak ref taken in
* query_for_backends_locked ( ) */
const grpc_call_error call_error = grpc_call_start_batch_and_execute (
glb_policy - > lb_call , ops , ( size_t ) ( op - ops ) ,
& glb_policy - > lb_on_response_received ) ; /* loop */
GPR_ASSERT ( GRPC_CALL_OK = = call_error ) ;
} else {
GRPC_LB_POLICY_UNREF ( & glb_policy - > base ,
GRPC_LB_POLICY_WEAK_ UNREF ( & glb_policy - > base ,
" lb_on_response_received_locked_shutdown " ) ;
}
} else { /* empty payload: call cancelled. */
/* dispose of the "lb_on_response_received_locked" ref taken in
/* dispose of the "lb_on_response_received_locked" weak ref taken in
* query_for_backends_locked ( ) and reused in every reception loop */
GRPC_LB_POLICY_UNREF ( & glb_policy - > base ,
GRPC_LB_POLICY_WEAK_ UNREF ( & glb_policy - > base ,
" lb_on_response_received_locked_empty_payload " ) ;
}
}
@ -1590,7 +1751,7 @@ static void lb_on_fallback_timer_locked(void* arg, grpc_error* error) {
rr_handover_locked ( glb_policy ) ;
}
}
GRPC_LB_POLICY_UNREF ( & glb_policy - > base , " grpclb_fallback_timer " ) ;
GRPC_LB_POLICY_WEAK_ UNREF ( & glb_policy - > base , " grpclb_fallback_timer " ) ;
}
static void lb_on_server_status_received_locked ( void * arg , grpc_error * error ) {
@ -1674,7 +1835,7 @@ static void glb_update_locked(grpc_lb_policy* policy,
grpc_channel_get_channel_stack ( glb_policy - > lb_channel ) ) ;
GPR_ASSERT ( client_channel_elem - > filter = = & grpc_client_channel_filter ) ;
glb_policy - > watching_lb_channel = true ;
GRPC_LB_POLICY_REF ( & glb_policy - > base , " watch_lb_channel_connectivity " ) ;
GRPC_LB_POLICY_WEAK_ REF ( & glb_policy - > base , " watch_lb_channel_connectivity " ) ;
grpc_client_channel_watch_connectivity_state (
client_channel_elem ,
grpc_polling_entity_create_from_pollset_set (
@ -1730,7 +1891,7 @@ static void glb_lb_channel_on_connectivity_changed_cb(void* arg,
case GRPC_CHANNEL_SHUTDOWN :
done :
glb_policy - > watching_lb_channel = false ;
GRPC_LB_POLICY_UNREF ( & glb_policy - > base ,
GRPC_LB_POLICY_WEAK_ UNREF ( & glb_policy - > base ,
" watch_lb_channel_connectivity_cb_shutdown " ) ;
break ;
}