@ -52,6 +52,8 @@
/* Client channel implementation */
grpc_tracer_flag grpc_client_channel_trace = GRPC_TRACER_INITIALIZER ( false ) ;
/*************************************************************************
* METHOD - CONFIG TABLE
*/
@ -241,6 +243,10 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_REF ( error ) ) ;
}
}
if ( GRPC_TRACER_ON ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_DEBUG , " chand=%p: setting connectivity state to %s " , chand ,
grpc_connectivity_state_name ( state ) ) ;
}
grpc_connectivity_state_set ( exec_ctx , & chand - > state_tracker , state , error ,
reason ) ;
}
@ -251,6 +257,10 @@ static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx,
grpc_connectivity_state publish_state = w - > state ;
/* check if the notification is for the latest policy */
if ( w - > lb_policy = = w - > chand - > lb_policy ) {
if ( GRPC_TRACER_ON ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_DEBUG , " chand=%p: lb_policy=%p state changed to %s " , w - > chand ,
w - > lb_policy , grpc_connectivity_state_name ( w - > state ) ) ;
}
if ( publish_state = = GRPC_CHANNEL_SHUTDOWN & & w - > chand - > resolver ! = NULL ) {
publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE ;
grpc_resolver_channel_saw_error_locked ( exec_ctx , w - > chand - > resolver ) ;
@ -263,7 +273,6 @@ static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx,
watch_lb_policy_locked ( exec_ctx , w - > chand , w - > lb_policy , w - > state ) ;
}
}
GRPC_CHANNEL_STACK_UNREF ( exec_ctx , w - > chand - > owning_stack , " watch_lb_policy " ) ;
gpr_free ( w ) ;
}
@ -273,7 +282,6 @@ static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand,
grpc_connectivity_state current_state ) {
lb_policy_connectivity_watcher * w = gpr_malloc ( sizeof ( * w ) ) ;
GRPC_CHANNEL_STACK_REF ( chand - > owning_stack , " watch_lb_policy " ) ;
w - > chand = chand ;
GRPC_CLOSURE_INIT ( & w - > on_changed , on_lb_policy_state_changed_locked , w ,
grpc_combiner_scheduler ( chand - > combiner ) ) ;
@ -283,6 +291,18 @@ static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand,
& w - > on_changed ) ;
}
static void start_resolving_locked ( grpc_exec_ctx * exec_ctx ,
channel_data * chand ) {
if ( GRPC_TRACER_ON ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_DEBUG , " chand=%p: starting name resolution " , chand ) ;
}
GPR_ASSERT ( ! chand - > started_resolving ) ;
chand - > started_resolving = true ;
GRPC_CHANNEL_STACK_REF ( chand - > owning_stack , " resolver " ) ;
grpc_resolver_next_locked ( exec_ctx , chand - > resolver , & chand - > resolver_result ,
& chand - > on_resolver_result_changed ) ;
}
typedef struct {
char * server_name ;
grpc_server_retry_throttle_data * retry_throttle_data ;
@ -345,8 +365,13 @@ static void parse_retry_throttle_params(const grpc_json *field, void *arg) {
static void on_resolver_result_changed_locked ( grpc_exec_ctx * exec_ctx ,
void * arg , grpc_error * error ) {
channel_data * chand = arg ;
if ( GRPC_TRACER_ON ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_DEBUG , " chand=%p: got resolver result: error=%s " , chand ,
grpc_error_string ( error ) ) ;
}
// Extract the following fields from the resolver result, if non-NULL.
char * lb_policy_name = NULL ;
bool lb_policy_name_changed = false ;
grpc_lb_policy * new_lb_policy = NULL ;
char * service_config_json = NULL ;
grpc_server_retry_throttle_data * retry_throttle_data = NULL ;
@ -394,10 +419,10 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
// taking a lock on chand->info_mu, because this function is the
// only thing that modifies its value, and it can only be invoked
// once at any given time.
const bool lb_policy_typ e_changed =
lb_policy_nam e_changed =
chand - > info_lb_policy_name = = NULL | |
strcmp ( chand - > info_lb_policy_name , lb_policy_name ) ! = 0 ;
if ( chand - > lb_policy ! = NULL & & ! lb_policy_typ e_changed ) {
if ( chand - > lb_policy ! = NULL & & ! lb_policy_nam e_changed ) {
// Continue using the same LB policy. Update with new addresses.
grpc_lb_policy_update_locked ( exec_ctx , chand - > lb_policy , & lb_policy_args ) ;
} else {
@ -445,6 +470,13 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
grpc_channel_args_destroy ( exec_ctx , chand - > resolver_result ) ;
chand - > resolver_result = NULL ;
}
if ( GRPC_TRACER_ON ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_DEBUG ,
" chand=%p: resolver result: lb_policy_name= \" %s \" %s, "
" service_config= \" %s \" " ,
chand , lb_policy_name , lb_policy_name_changed ? " (changed) " : " " ,
service_config_json ) ;
}
// Now swap out fields in chand. Note that the new values may still
// be NULL if (e.g.) the resolver failed to return results or the
// results did not contain the necessary data.
@ -479,6 +511,10 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
if ( new_lb_policy ! = NULL | | error ! = GRPC_ERROR_NONE | |
chand - > resolver = = NULL ) {
if ( chand - > lb_policy ! = NULL ) {
if ( GRPC_TRACER_ON ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_DEBUG , " chand=%p: unreffing lb_policy=%p " , chand ,
chand - > lb_policy ) ;
}
grpc_pollset_set_del_pollset_set ( exec_ctx ,
chand - > lb_policy - > interested_parties ,
chand - > interested_parties ) ;
@ -489,7 +525,13 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
// Now that we've swapped out the relevant fields of chand, check for
// error or shutdown.
if ( error ! = GRPC_ERROR_NONE | | chand - > resolver = = NULL ) {
if ( GRPC_TRACER_ON ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_DEBUG , " chand=%p: shutting down " , chand ) ;
}
if ( chand - > resolver ! = NULL ) {
if ( GRPC_TRACER_ON ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_DEBUG , " chand=%p: shutting down resolver " , chand ) ;
}
grpc_resolver_shutdown_locked ( exec_ctx , chand - > resolver ) ;
GRPC_RESOLVER_UNREF ( exec_ctx , chand - > resolver , " channel " ) ;
chand - > resolver = NULL ;
@ -510,6 +552,9 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
grpc_error * state_error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING ( " No load balancing policy " ) ;
if ( new_lb_policy ! = NULL ) {
if ( GRPC_TRACER_ON ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_DEBUG , " chand=%p: initializing new LB policy " , chand ) ;
}
GRPC_ERROR_UNREF ( state_error ) ;
state = grpc_lb_policy_check_connectivity_locked ( exec_ctx , new_lb_policy ,
& state_error ) ;
@ -772,7 +817,9 @@ typedef struct client_channel_call_data {
gpr_atm subchannel_call_or_error ;
gpr_arena * arena ;
bool pick_pending ;
grpc_lb_policy * lb_policy ; // Holds ref while LB pick is pending.
grpc_closure lb_pick_closure ;
grpc_connected_subchannel * connected_subchannel ;
grpc_call_context_element subchannel_call_context [ GRPC_CONTEXT_COUNT ] ;
grpc_polling_entity * pollent ;
@ -837,8 +884,15 @@ static void waiting_for_pick_batches_add_locked(
}
static void waiting_for_pick_batches_fail_locked ( grpc_exec_ctx * exec_ctx ,
call_data * calld ,
grpc_call_element * elem ,
grpc_error * error ) {
call_data * calld = elem - > call_data ;
if ( GRPC_TRACER_ON ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_DEBUG ,
" chand=%p calld=%p: failing % " PRIdPTR " pending batches: %s " ,
elem - > channel_data , calld , calld - > waiting_for_pick_batches_count ,
grpc_error_string ( error ) ) ;
}
for ( size_t i = 0 ; i < calld - > waiting_for_pick_batches_count ; + + i ) {
grpc_transport_stream_op_batch_finish_with_failure (
exec_ctx , calld - > waiting_for_pick_batches [ i ] , GRPC_ERROR_REF ( error ) ) ;
@ -848,14 +902,21 @@ static void waiting_for_pick_batches_fail_locked(grpc_exec_ctx *exec_ctx,
}
static void waiting_for_pick_batches_resume_locked ( grpc_exec_ctx * exec_ctx ,
call_data * calld ) {
grpc_call_element * elem ) {
call_data * calld = elem - > call_data ;
if ( calld - > waiting_for_pick_batches_count = = 0 ) return ;
call_or_error coe = get_call_or_error ( calld ) ;
if ( coe . error ! = GRPC_ERROR_NONE ) {
waiting_for_pick_batches_fail_locked ( exec_ctx , calld ,
waiting_for_pick_batches_fail_locked ( exec_ctx , elem ,
GRPC_ERROR_REF ( coe . error ) ) ;
return ;
}
if ( GRPC_TRACER_ON ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_DEBUG , " chand=%p calld=%p: sending % " PRIdPTR
" pending batches to subchannel_call=%p " ,
elem - > channel_data , calld , calld - > waiting_for_pick_batches_count ,
coe . subchannel_call ) ;
}
for ( size_t i = 0 ; i < calld - > waiting_for_pick_batches_count ; + + i ) {
grpc_subchannel_call_process_op ( exec_ctx , coe . subchannel_call ,
calld - > waiting_for_pick_batches [ i ] ) ;
@ -869,6 +930,10 @@ static void apply_service_config_to_call_locked(grpc_exec_ctx *exec_ctx,
grpc_call_element * elem ) {
channel_data * chand = elem - > channel_data ;
call_data * calld = elem - > call_data ;
if ( GRPC_TRACER_ON ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_DEBUG , " chand=%p calld=%p: applying service config to call " ,
chand , calld ) ;
}
if ( chand - > retry_throttle_data ! = NULL ) {
calld - > retry_throttle_data =
grpc_server_retry_throttle_data_ref ( chand - > retry_throttle_data ) ;
@ -895,7 +960,9 @@ static void apply_service_config_to_call_locked(grpc_exec_ctx *exec_ctx,
}
static void create_subchannel_call_locked ( grpc_exec_ctx * exec_ctx ,
call_data * calld , grpc_error * error ) {
grpc_call_element * elem ,
grpc_error * error ) {
call_data * calld = elem - > call_data ;
grpc_subchannel_call * subchannel_call = NULL ;
const grpc_connected_subchannel_call_args call_args = {
. pollent = calld - > pollent ,
@ -906,13 +973,18 @@ static void create_subchannel_call_locked(grpc_exec_ctx *exec_ctx,
. context = calld - > subchannel_call_context } ;
grpc_error * new_error = grpc_connected_subchannel_create_call (
exec_ctx , calld - > connected_subchannel , & call_args , & subchannel_call ) ;
if ( GRPC_TRACER_ON ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_DEBUG , " chand=%p calld=%p: create subchannel_call=%p: error=%s " ,
elem - > channel_data , calld , subchannel_call ,
grpc_error_string ( new_error ) ) ;
}
GPR_ASSERT ( set_call_or_error (
calld , ( call_or_error ) { . subchannel_call = subchannel_call } ) ) ;
if ( new_error ! = GRPC_ERROR_NONE ) {
new_error = grpc_error_add_child ( new_error , error ) ;
waiting_for_pick_batches_fail_locked ( exec_ctx , calld , new_error ) ;
waiting_for_pick_batches_fail_locked ( exec_ctx , elem , new_error ) ;
} else {
waiting_for_pick_batches_resume_locked ( exec_ctx , calld ) ;
waiting_for_pick_batches_resume_locked ( exec_ctx , elem ) ;
}
GRPC_ERROR_UNREF ( error ) ;
}
@ -922,8 +994,6 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx,
grpc_error * error ) {
call_data * calld = elem - > call_data ;
channel_data * chand = elem - > channel_data ;
GPR_ASSERT ( calld - > pick_pending ) ;
calld - > pick_pending = false ;
grpc_polling_entity_del_from_pollset_set ( exec_ctx , calld - > pollent ,
chand - > interested_parties ) ;
call_or_error coe = get_call_or_error ( calld ) ;
@ -935,8 +1005,13 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx,
" Call dropped by load balancing policy " )
: GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING (
" Failed to create subchannel " , & error , 1 ) ;
if ( GRPC_TRACER_ON ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_DEBUG ,
" chand=%p calld=%p: failed to create subchannel: error=%s " , chand ,
calld , grpc_error_string ( failure ) ) ;
}
set_call_or_error ( calld , ( call_or_error ) { . error = GRPC_ERROR_REF ( failure ) } ) ;
waiting_for_pick_batches_fail_locked ( exec_ctx , calld , failure ) ;
waiting_for_pick_batches_fail_locked ( exec_ctx , elem , failure ) ;
} else if ( coe . error ! = GRPC_ERROR_NONE ) {
/* already cancelled before subchannel became ready */
grpc_error * child_errors [ ] = { error , coe . error } ;
@ -950,10 +1025,15 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx,
grpc_error_set_int ( cancellation_error , GRPC_ERROR_INT_GRPC_STATUS ,
GRPC_STATUS_DEADLINE_EXCEEDED ) ;
}
waiting_for_pick_batches_fail_locked ( exec_ctx , calld , cancellation_error ) ;
if ( GRPC_TRACER_ON ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_DEBUG ,
" chand=%p calld=%p: cancelled before subchannel became ready: %s " ,
chand , calld , grpc_error_string ( cancellation_error ) ) ;
}
waiting_for_pick_batches_fail_locked ( exec_ctx , elem , cancellation_error ) ;
} else {
/* Create call on subchannel. */
create_subchannel_call_locked ( exec_ctx , calld , GRPC_ERROR_REF ( error ) ) ;
create_subchannel_call_locked ( exec_ctx , elem , GRPC_ERROR_REF ( error ) ) ;
}
GRPC_CALL_STACK_UNREF ( exec_ctx , calld - > owning_call , " pick_subchannel " ) ;
GRPC_ERROR_UNREF ( error ) ;
@ -983,41 +1063,77 @@ typedef struct {
grpc_closure closure ;
} pick_after_resolver_result_args ;
static void continue_picking_after_resolver_result_locked (
grpc_exec_ctx * exec_ctx , void * arg , grpc_error * error ) {
static void pick_after_resolver_result_done_locked ( grpc_exec_ctx * exec_ctx ,
void * arg ,
grpc_error * error ) {
pick_after_resolver_result_args * args = arg ;
if ( args - > cancelled ) {
/* cancelled, do nothing */
} else if ( error ! = GRPC_ERROR_NONE ) {
subchannel_ready_locked ( exec_ctx , args - > elem , GRPC_ERROR_REF ( error ) ) ;
if ( GRPC_TRACER_ON ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_DEBUG , " call cancelled before resolver result " ) ;
}
} else {
if ( pick_subchannel_locked ( exec_ctx , args - > elem ) ) {
subchannel_ready_locked ( exec_ctx , args - > elem , GRPC_ERROR_NONE ) ;
channel_data * chand = args - > elem - > channel_data ;
call_data * calld = args - > elem - > call_data ;
if ( error ! = GRPC_ERROR_NONE ) {
if ( GRPC_TRACER_ON ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_DEBUG , " chand=%p calld=%p: resolver failed to return data " ,
chand , calld ) ;
}
subchannel_ready_locked ( exec_ctx , args - > elem , GRPC_ERROR_REF ( error ) ) ;
} else {
if ( GRPC_TRACER_ON ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_DEBUG , " chand=%p calld=%p: resolver returned, doing pick " ,
chand , calld ) ;
}
if ( pick_subchannel_locked ( exec_ctx , args - > elem ) ) {
subchannel_ready_locked ( exec_ctx , args - > elem , GRPC_ERROR_NONE ) ;
}
}
}
gpr_free ( args ) ;
}
static void cancel_pick_locked ( grpc_exec_ctx * exec_ctx , grpc_call_element * elem ,
grpc_error * error ) {
static void pick_after_resolver_result_start_locked ( grpc_exec_ctx * exec_ctx ,
grpc_call_element * elem ) {
channel_data * chand = elem - > channel_data ;
call_data * calld = elem - > call_data ;
if ( GRPC_TRACER_ON ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_DEBUG ,
" chand=%p calld=%p: deferring pick pending resolver result " , chand ,
calld ) ;
}
pick_after_resolver_result_args * args =
( pick_after_resolver_result_args * ) gpr_zalloc ( sizeof ( * args ) ) ;
args - > elem = elem ;
GRPC_CLOSURE_INIT ( & args - > closure , pick_after_resolver_result_done_locked ,
args , grpc_combiner_scheduler ( chand - > combiner ) ) ;
grpc_closure_list_append ( & chand - > waiting_for_resolver_result_closures ,
& args - > closure , GRPC_ERROR_NONE ) ;
}
static void pick_after_resolver_result_cancel_locked ( grpc_exec_ctx * exec_ctx ,
grpc_call_element * elem ,
grpc_error * error ) {
channel_data * chand = elem - > channel_data ;
call_data * calld = elem - > call_data ;
if ( chand - > lb_policy ! = NULL ) {
grpc_lb_policy_cancel_pick_locked ( exec_ctx , chand - > lb_policy ,
& calld - > connected_subchannel ,
GRPC_ERROR_REF ( error ) ) ;
}
// If we don't yet have a resolver result, then a closure for
// continue_picking_after_resolver_result_locked() will have been added to
// pick_after_resolver_result_done_locked() will have been added to
// chand->waiting_for_resolver_result_closures, and it may not be invoked
// until after this call has been destroyed. We mark the operation as
// cancelled, so that when continue_picking_after_resolver_result_locked()
// cancelled, so that when pick_after_resolver_result_done_locked()
// is called, it will be a no-op. We also immediately invoke
// subchannel_ready_locked() to propagate the error back to the caller.
for ( grpc_closure * closure = chand - > waiting_for_resolver_result_closures . head ;
closure ! = NULL ; closure = closure - > next_data . next ) {
pick_after_resolver_result_args * args = closure - > cb_arg ;
if ( ! args - > cancelled & & args - > elem = = elem ) {
if ( GRPC_TRACER_ON ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_DEBUG ,
" chand=%p calld=%p: "
" cancelling pick waiting for resolver result " ,
chand , calld ) ;
}
args - > cancelled = true ;
subchannel_ready_locked ( exec_ctx , elem ,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING (
@ -1027,24 +1143,21 @@ static void cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
GRPC_ERROR_UNREF ( error ) ;
}
// State for pick callback that holds a reference to the LB policy
// from which the pick was requested.
typedef struct {
grpc_lb_policy * lb_policy ;
grpc_call_element * elem ;
grpc_closure closure ;
} pick_callback_args ;
// Callback invoked by grpc_lb_policy_pick_locked() for async picks.
// Unrefs the LB policy after invoking subchannel_ready_locked().
static void pick_callback_done_locked ( grpc_exec_ctx * exec_ctx , void * arg ,
grpc_error * error ) {
pick_callback_args * args = arg ;
GPR_ASSERT ( args ! = NULL ) ;
GPR_ASSERT ( args - > lb_policy ! = NULL ) ;
subchannel_ready_locked ( exec_ctx , args - > elem , GRPC_ERROR_REF ( error ) ) ;
GRPC_LB_POLICY_UNREF ( exec_ctx , args - > lb_policy , " pick_subchannel " ) ;
gpr_free ( args ) ;
grpc_call_element * elem = arg ;
channel_data * chand = elem - > channel_data ;
call_data * calld = elem - > call_data ;
if ( GRPC_TRACER_ON ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_DEBUG , " chand=%p calld=%p: pick completed asynchronously " ,
chand , calld ) ;
}
GPR_ASSERT ( calld - > lb_policy ! = NULL ) ;
GRPC_LB_POLICY_UNREF ( exec_ctx , calld - > lb_policy , " pick_subchannel " ) ;
calld - > lb_policy = NULL ;
subchannel_ready_locked ( exec_ctx , elem , GRPC_ERROR_REF ( error ) ) ;
}
// Takes a ref to chand->lb_policy and calls grpc_lb_policy_pick_locked().
@ -1055,23 +1168,44 @@ static bool pick_callback_start_locked(grpc_exec_ctx *exec_ctx,
const grpc_lb_policy_pick_args * inputs ) {
channel_data * chand = elem - > channel_data ;
call_data * calld = elem - > call_data ;
pick_callback_args * pick_args = gpr_zalloc ( sizeof ( * pick_args ) ) ;
if ( GRPC_TRACER_ON ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_DEBUG , " chand=%p calld=%p: starting pick on lb_policy=%p " ,
chand , calld , chand - > lb_policy ) ;
}
// Keep a ref to the LB policy in calld while the pick is pending.
GRPC_LB_POLICY_REF ( chand - > lb_policy , " pick_subchannel " ) ;
pick_args - > lb_policy = chand - > lb_policy ;
pick_args - > elem = elem ;
GRPC_CLOSURE_INIT ( & pick_args - > closure , pick_callback_done_locked , pick_args ,
calld - > lb_policy = chand - > lb_policy ;
GRPC_CLOSURE_INIT ( & calld - > lb_pick_closure , pick_callback_done_locked , elem ,
grpc_combiner_scheduler ( chand - > combiner ) ) ;
const bool pick_done = grpc_lb_policy_pick_locked (
exec_ctx , chand - > lb_policy , inputs , & calld - > connected_subchannel ,
calld - > subchannel_call_context , NULL , & pick_args - > closure ) ;
calld - > subchannel_call_context , NULL , & calld - > lb_pick_ closure) ;
if ( pick_done ) {
/* synchronous grpc_lb_policy_pick call. Unref the LB policy. */
GRPC_LB_POLICY_UNREF ( exec_ctx , chand - > lb_policy , " pick_subchannel " ) ;
gpr_free ( pick_args ) ;
if ( GRPC_TRACER_ON ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_DEBUG , " chand=%p calld=%p: pick completed synchronously " ,
chand , calld ) ;
}
GRPC_LB_POLICY_UNREF ( exec_ctx , calld - > lb_policy , " pick_subchannel " ) ;
calld - > lb_policy = NULL ;
}
return pick_done ;
}
static void pick_callback_cancel_locked ( grpc_exec_ctx * exec_ctx ,
grpc_call_element * elem ,
grpc_error * error ) {
channel_data * chand = elem - > channel_data ;
call_data * calld = elem - > call_data ;
GPR_ASSERT ( calld - > lb_policy ! = NULL ) ;
if ( GRPC_TRACER_ON ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_DEBUG , " chand=%p calld=%p: cancelling pick from LB policy %p " ,
chand , calld , calld - > lb_policy ) ;
}
grpc_lb_policy_cancel_pick_locked ( exec_ctx , calld - > lb_policy ,
& calld - > connected_subchannel , error ) ;
}
static bool pick_subchannel_locked ( grpc_exec_ctx * exec_ctx ,
grpc_call_element * elem ) {
GPR_TIMER_BEGIN ( " pick_subchannel " , 0 ) ;
@ -1107,20 +1241,9 @@ static bool pick_subchannel_locked(grpc_exec_ctx *exec_ctx,
pick_done = pick_callback_start_locked ( exec_ctx , elem , & inputs ) ;
} else if ( chand - > resolver ! = NULL ) {
if ( ! chand - > started_resolving ) {
chand - > started_resolving = true ;
GRPC_CHANNEL_STACK_REF ( chand - > owning_stack , " resolver " ) ;
grpc_resolver_next_locked ( exec_ctx , chand - > resolver ,
& chand - > resolver_result ,
& chand - > on_resolver_result_changed ) ;
start_resolving_locked ( exec_ctx , chand ) ;
}
pick_after_resolver_result_args * args =
( pick_after_resolver_result_args * ) gpr_zalloc ( sizeof ( * args ) ) ;
args - > elem = elem ;
GRPC_CLOSURE_INIT ( & args - > closure ,
continue_picking_after_resolver_result_locked , args ,
grpc_combiner_scheduler ( chand - > combiner ) ) ;
grpc_closure_list_append ( & chand - > waiting_for_resolver_result_closures ,
& args - > closure , GRPC_ERROR_NONE ) ;
pick_after_resolver_result_start_locked ( exec_ctx , elem ) ;
} else {
subchannel_ready_locked (
exec_ctx , elem , GRPC_ERROR_CREATE_FROM_STATIC_STRING ( " Disconnected " ) ) ;
@ -1133,63 +1256,77 @@ static void start_transport_stream_op_batch_locked(grpc_exec_ctx *exec_ctx,
void * arg ,
grpc_error * error_ignored ) {
GPR_TIMER_BEGIN ( " start_transport_stream_op_batch_locked " , 0 ) ;
grpc_transport_stream_op_batch * op = arg ;
grpc_call_element * elem = op - > handler_private . extra_arg ;
grpc_transport_stream_op_batch * batch = arg ;
grpc_call_element * elem = batch - > handler_private . extra_arg ;
call_data * calld = elem - > call_data ;
channel_data * chand = elem - > channel_data ;
/* need to recheck that another thread hasn't set the call */
call_or_error coe = get_call_or_error ( calld ) ;
if ( coe . error ! = GRPC_ERROR_NONE ) {
if ( GRPC_TRACER_ON ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_DEBUG , " chand=%p calld=%p: failing batch with error: %s " ,
chand , calld , grpc_error_string ( coe . error ) ) ;
}
grpc_transport_stream_op_batch_finish_with_failure (
exec_ctx , op , GRPC_ERROR_REF ( coe . error ) ) ;
exec_ctx , batch , GRPC_ERROR_REF ( coe . error ) ) ;
goto done ;
}
if ( coe . subchannel_call ! = NULL ) {
grpc_subchannel_call_process_op ( exec_ctx , coe . subchannel_call , op ) ;
if ( GRPC_TRACER_ON ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_DEBUG ,
" chand=%p calld=%p: sending batch to subchannel_call=%p " , chand ,
calld , coe . subchannel_call ) ;
}
grpc_subchannel_call_process_op ( exec_ctx , coe . subchannel_call , batch ) ;
goto done ;
}
// Add to waiting-for-pick list. If we succeed in getting a
// subchannel call below, we'll handle this batch (along with any
// other waiting batches) in waiting_for_pick_batches_resume_locked().
waiting_for_pick_batches_add_locked ( calld , op ) ;
/* if this is a cancellation, then we can raise our cancelled flag */
if ( op - > cancel_stream ) {
grpc_error * error = op - > payload - > cancel_stream . cancel_error ;
waiting_for_pick_batches_add_locked ( calld , batch ) ;
// If this is a cancellation, cancel the pending pick (if any) and
// fail any pending batches.
if ( batch - > cancel_stream ) {
grpc_error * error = batch - > payload - > cancel_stream . cancel_error ;
if ( GRPC_TRACER_ON ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_DEBUG , " chand=%p calld=%p: recording cancel_error=%s " , chand ,
calld , grpc_error_string ( error ) ) ;
}
/* Stash a copy of cancel_error in our call data, so that we can use
it for subsequent operations . This ensures that if the call is
cancelled before any ops are passed down ( e . g . , if the deadline
cancelled before any batche s are passed down ( e . g . , if the deadline
is in the past when the call starts ) , we can return the right
error to the caller when the first op does get passed down . */
error to the caller when the first batch does get passed down . */
set_call_or_error ( calld , ( call_or_error ) { . error = GRPC_ERROR_REF ( error ) } ) ;
if ( calld - > pick_pending ) {
cancel_pick_locked ( exec_ctx , elem , GRPC_ERROR_REF ( error ) ) ;
if ( calld - > lb_policy ! = NULL ) {
pick_callback_cancel_locked ( exec_ctx , elem , GRPC_ERROR_REF ( error ) ) ;
} else {
pick_after_resolver_result_cancel_locked ( exec_ctx , elem ,
GRPC_ERROR_REF ( error ) ) ;
}
waiting_for_pick_batches_fail_locked ( exec_ctx , calld ,
GRPC_ERROR_REF ( error ) ) ;
waiting_for_pick_batches_fail_locked ( exec_ctx , elem , GRPC_ERROR_REF ( error ) ) ;
goto done ;
}
/* if we don't have a subchannel, try to get one */
if ( ! calld - > pick_pending & & calld - > connected_subchannel = = NULL & &
op - > send_initial_metadata ) {
calld - > initial_metadata_payload = op - > payload ;
calld - > pick_pending = true ;
if ( batch - > send_initial_metadata ) {
GPR_ASSERT ( calld - > connected_subchannel = = NULL ) ;
calld - > initial_metadata_payload = batch - > payload ;
GRPC_CALL_STACK_REF ( calld - > owning_call , " pick_subchannel " ) ;
/* If a subchannel is not available immediately, the polling entity from
call_data should be provided to channel_data ' s interested_parties , so
that IO of the lb_policy and resolver could be done under it . */
if ( pick_subchannel_locked ( exec_ctx , elem ) ) {
// Pick was returned synchronously.
calld - > pick_pending = false ;
GRPC_CALL_STACK_UNREF ( exec_ctx , calld - > owning_call , " pick_subchannel " ) ;
if ( calld - > connected_subchannel = = NULL ) {
grpc_error * error = GRPC_ERROR_CREATE_FROM_STATIC_STRING (
" Call dropped by load balancing policy " ) ;
set_call_or_error ( calld ,
( call_or_error ) { . error = GRPC_ERROR_REF ( error ) } ) ;
waiting_for_pick_batches_fail_locked ( exec_ctx , calld , error ) ;
waiting_for_pick_batches_fail_locked ( exec_ctx , elem , error ) ;
} else {
// Create subchannel call.
create_subchannel_call_locked ( exec_ctx , calld , GRPC_ERROR_NONE ) ;
create_subchannel_call_locked ( exec_ctx , elem , GRPC_ERROR_NONE ) ;
}
} else {
grpc_polling_entity_add_to_pollset_set ( exec_ctx , calld - > pollent ,
@ -1232,47 +1369,59 @@ static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
If it has , we proceed on the fast path . */
static void cc_start_transport_stream_op_batch (
grpc_exec_ctx * exec_ctx , grpc_call_element * elem ,
grpc_transport_stream_op_batch * op ) {
grpc_transport_stream_op_batch * batch ) {
call_data * calld = elem - > call_data ;
channel_data * chand = elem - > channel_data ;
GRPC_CALL_LOG_OP ( GPR_INFO , elem , op ) ;
if ( GRPC_TRACER_ON ( grpc_client_channel_trace ) | |
GRPC_TRACER_ON ( grpc_trace_channel ) ) {
grpc_call_log_op ( GPR_INFO , elem , batch ) ;
}
if ( chand - > deadline_checking_enabled ) {
grpc_deadline_state_client_start_transport_stream_op_batch ( exec_ctx , elem ,
op ) ;
batch ) ;
}
// Intercept on_complete for recv_trailing_metadata so that we can
// check retry throttle status.
if ( op - > recv_trailing_metadata ) {
GPR_ASSERT ( op - > on_complete ! = NULL ) ;
calld - > original_on_complete = op - > on_complete ;
if ( batch - > recv_trailing_metadata ) {
GPR_ASSERT ( batch - > on_complete ! = NULL ) ;
calld - > original_on_complete = batch - > on_complete ;
GRPC_CLOSURE_INIT ( & calld - > on_complete , on_complete , elem ,
grpc_schedule_on_exec_ctx ) ;
op - > on_complete = & calld - > on_complete ;
batch - > on_complete = & calld - > on_complete ;
}
/* try to (atomically) get the call */
call_or_error coe = get_call_or_error ( calld ) ;
GPR_TIMER_BEGIN ( " cc_start_transport_stream_op_batch " , 0 ) ;
if ( coe . error ! = GRPC_ERROR_NONE ) {
if ( GRPC_TRACER_ON ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_DEBUG , " chand=%p calld=%p: failing batch with error: %s " ,
chand , calld , grpc_error_string ( coe . error ) ) ;
}
grpc_transport_stream_op_batch_finish_with_failure (
exec_ctx , op , GRPC_ERROR_REF ( coe . error ) ) ;
GPR_TIMER_END ( " cc_start_transport_stream_op_batch " , 0 ) ;
/* early out */
return ;
exec_ctx , batch , GRPC_ERROR_REF ( coe . error ) ) ;
goto done ;
}
if ( coe . subchannel_call ! = NULL ) {
grpc_subchannel_call_process_op ( exec_ctx , coe . subchannel_call , op ) ;
GPR_TIMER_END ( " cc_start_transport_stream_op_batch " , 0 ) ;
/* early out */
return ;
if ( GRPC_TRACER_ON ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_DEBUG ,
" chand=%p calld=%p: sending batch to subchannel_call=%p " , chand ,
calld , coe . subchannel_call ) ;
}
grpc_subchannel_call_process_op ( exec_ctx , coe . subchannel_call , batch ) ;
goto done ;
}
/* we failed; lock and figure out what to do */
if ( GRPC_TRACER_ON ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_DEBUG , " chand=%p calld=%p: entering combiner " , chand , calld ) ;
}
GRPC_CALL_STACK_REF ( calld - > owning_call , " start_transport_stream_op_batch " ) ;
op - > handler_private . extra_arg = elem ;
batch - > handler_private . extra_arg = elem ;
GRPC_CLOSURE_SCHED (
exec_ctx , GRPC_CLOSURE_INIT ( & op - > handler_private . closure ,
start_transport_stream_op_batch_locked , op ,
exec_ctx , GRPC_CLOSURE_INIT ( & batch - > handler_private . closure ,
start_transport_stream_op_batch_locked , batch ,
grpc_combiner_scheduler ( chand - > combiner ) ) ,
GRPC_ERROR_NONE ) ;
done :
GPR_TIMER_END ( " cc_start_transport_stream_op_batch " , 0 ) ;
}
@ -1317,7 +1466,7 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
GRPC_SUBCHANNEL_CALL_UNREF ( exec_ctx , coe . subchannel_call ,
" client_channel_destroy_call " ) ;
}
GPR_ASSERT ( ! calld - > pick_pending ) ;
GPR_ASSERT ( calld - > lb_policy = = NULL ) ;
GPR_ASSERT ( calld - > waiting_for_pick_batches_count = = 0 ) ;
if ( calld - > connected_subchannel ! = NULL ) {
GRPC_CONNECTED_SUBCHANNEL_UNREF ( exec_ctx , calld - > connected_subchannel ,
@ -1366,11 +1515,7 @@ static void try_to_connect_locked(grpc_exec_ctx *exec_ctx, void *arg,
} else {
chand - > exit_idle_when_lb_policy_arrives = true ;
if ( ! chand - > started_resolving & & chand - > resolver ! = NULL ) {
GRPC_CHANNEL_STACK_REF ( chand - > owning_stack , " resolver " ) ;
chand - > started_resolving = true ;
grpc_resolver_next_locked ( exec_ctx , chand - > resolver ,
& chand - > resolver_result ,
& chand - > on_resolver_result_changed ) ;
start_resolving_locked ( exec_ctx , chand ) ;
}
}
GRPC_CHANNEL_STACK_UNREF ( exec_ctx , chand - > owning_stack , " try_to_connect " ) ;