@@ -178,8 +178,8 @@ typedef struct client_channel_channel_data {
  grpc_slice_hash_table *method_params_table;
  /** incoming resolver result - set by resolver.next() */
  grpc_channel_args *resolver_result;
  /** a list of closures that are all waiting for config to come in */
  grpc_closure_list waiting_for_config_closures;
  /** a list of closures that are all waiting for resolver result to come in */
  grpc_closure_list waiting_for_resolver_result_closures;
  /** resolver callback */
  grpc_closure on_resolver_result_changed;
  /** connectivity state being tracked */
@@ -342,49 +342,15 @@ static void parse_retry_throttle_params(const grpc_json *field, void *arg) {
  }
}

// Wrap a closure associated with \a lb_policy. The associated callback (\a
// wrapped_on_pick_closure_cb) is responsible for unref'ing \a lb_policy after
// scheduling \a wrapped_closure.
typedef struct wrapped_on_pick_closure_arg {
  /* the closure instance using this struct as argument */
  grpc_closure wrapper_closure;
  /* the original closure. Usually an on_complete/notify cb for pick() and
   * ping() calls against the internal RR instance, respectively. */
  grpc_closure *wrapped_closure;
  /* The policy instance related to the closure */
  grpc_lb_policy *lb_policy;
} wrapped_on_pick_closure_arg;

// Invoke \a arg->wrapped_closure, unref \a arg->lb_policy and free \a arg.
static void wrapped_on_pick_closure_cb(grpc_exec_ctx *exec_ctx, void *arg,
                                       grpc_error *error) {
  wrapped_on_pick_closure_arg *wc_arg = arg;
  GPR_ASSERT(wc_arg != NULL);
  GPR_ASSERT(wc_arg->wrapped_closure != NULL);
  GPR_ASSERT(wc_arg->lb_policy != NULL);
  GRPC_CLOSURE_RUN(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error));
  GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->lb_policy, "pick_subchannel_wrapping");
  gpr_free(wc_arg);
}
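
// Invoked in the combiner whenever the resolver returns a new result.
// Swaps the resulting LB policy, service config JSON, retry throttle
// data, and method params table into chand, schedules any closures
// waiting for the resolver result, and (unless the channel is shutting
// down) re-arms the resolver via grpc_resolver_next_locked().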
static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
                                              void *arg, grpc_error *error) {
  channel_data *chand = arg;
  // Extract the following fields from the resolver result, if non-NULL.
  char *lb_policy_name = NULL;
  grpc_lb_policy *lb_policy = NULL;
  grpc_lb_policy *old_lb_policy = NULL;
  grpc_slice_hash_table *method_params_table = NULL;
  grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
  bool exit_idle = false;
  grpc_error *state_error =
      GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy");
  grpc_lb_policy *new_lb_policy = NULL;
  char *service_config_json = NULL;
  service_config_parsing_state parsing_state;
  memset(&parsing_state, 0, sizeof(parsing_state));
  bool lb_policy_updated = false;
  grpc_server_retry_throttle_data *retry_throttle_data = NULL;
  grpc_slice_hash_table *method_params_table = NULL;
  if (chand->resolver_result != NULL) {
    // Find LB policy name.
    const grpc_arg *channel_arg =
@@ -419,32 +385,29 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
    // Use pick_first if nothing was specified and we didn't select grpclb
    // above.
    if (lb_policy_name == NULL) lb_policy_name = "pick_first";
    // Instantiate LB policy.
    grpc_lb_policy_args lb_policy_args;
    lb_policy_args.args = chand->resolver_result;
    lb_policy_args.client_channel_factory = chand->client_channel_factory;
    lb_policy_args.combiner = chand->combiner;
    // Check to see if we're already using the right LB policy.
    // Note: It's safe to use chand->info_lb_policy_name here without
    // taking a lock on chand->info_mu, because this function is the
    // only thing that modifies its value, and it can only be invoked
    // once at any given time.
    const bool lb_policy_type_changed =
        (chand->info_lb_policy_name == NULL) ||
        (strcmp(chand->info_lb_policy_name, lb_policy_name) != 0);
        chand->info_lb_policy_name == NULL ||
        strcmp(chand->info_lb_policy_name, lb_policy_name) != 0;
    if (chand->lb_policy != NULL && !lb_policy_type_changed) {
      // update
      lb_policy_updated = true;
      // Continue using the same LB policy. Update with new addresses.
      grpc_lb_policy_update_locked(exec_ctx, chand->lb_policy, &lb_policy_args);
    } else {
      lb_policy =
      // Instantiate new LB policy.
      new_lb_policy =
          grpc_lb_policy_create(exec_ctx, lb_policy_name, &lb_policy_args);
      if (lb_policy != NULL) {
        GRPC_LB_POLICY_REF(lb_policy, "config_change");
        GRPC_ERROR_UNREF(state_error);
        state = grpc_lb_policy_check_connectivity_locked(exec_ctx, lb_policy,
                                                         &state_error);
        old_lb_policy = chand->lb_policy;
        chand->lb_policy = lb_policy;
      if (new_lb_policy == NULL) {
        gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name);
      }
    }
    // Find service config.
    channel_arg =
        grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVICE_CONFIG);
@@ -461,12 +424,14 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
        grpc_uri *uri =
            grpc_uri_parse(exec_ctx, channel_arg->value.string, true);
        GPR_ASSERT(uri->path[0] != '\0');
        service_config_parsing_state parsing_state;
        memset(&parsing_state, 0, sizeof(parsing_state));
        parsing_state.server_name =
            uri->path[0] == '/' ? uri->path + 1 : uri->path;
        grpc_service_config_parse_global_params(
            service_config, parse_retry_throttle_params, &parsing_state);
        parsing_state.server_name = NULL;
        grpc_uri_destroy(uri);
        retry_throttle_data = parsing_state.retry_throttle_data;
        method_params_table = grpc_service_config_create_method_config_table(
            exec_ctx, service_config, method_parameters_create_from_json,
            method_parameters_free);
@@ -480,12 +445,11 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
    grpc_channel_args_destroy(exec_ctx, chand->resolver_result);
    chand->resolver_result = NULL;
  }
  if (lb_policy != NULL) {
    grpc_pollset_set_add_pollset_set(exec_ctx, lb_policy->interested_parties,
                                     chand->interested_parties);
  }
  // Now swap out fields in chand. Note that the new values may still
  // be NULL if (e.g.) the resolver failed to return results or the
  // results did not contain the necessary data.
  //
  // First, swap out the data used by cc_get_channel_info().
  gpr_mu_lock(&chand->info_mu);
  if (lb_policy_name != NULL) {
    gpr_free(chand->info_lb_policy_name);
@@ -496,75 +460,77 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
    chand->info_service_config_json = service_config_json;
  }
  gpr_mu_unlock(&chand->info_mu);
  // Swap out the retry throttle data.
  if (chand->retry_throttle_data != NULL) {
    grpc_server_retry_throttle_data_unref(chand->retry_throttle_data);
  }
  chand->retry_throttle_data = parsing_state.retry_throttle_data;
  chand->retry_throttle_data = retry_throttle_data;
  // Swap out the method params table.
  if (chand->method_params_table != NULL) {
    grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table);
  }
  chand->method_params_table = method_params_table;
  if (lb_policy != NULL) {
    GRPC_CLOSURE_LIST_SCHED(exec_ctx, &chand->waiting_for_config_closures);
  } else if (chand->resolver == NULL /* disconnected */) {
    grpc_closure_list_fail_all(&chand->waiting_for_config_closures,
                               GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                                   "Channel disconnected", &error, 1));
    GRPC_CLOSURE_LIST_SCHED(exec_ctx, &chand->waiting_for_config_closures);
  }
  if (!lb_policy_updated && lb_policy != NULL &&
      chand->exit_idle_when_lb_policy_arrives) {
    GRPC_LB_POLICY_REF(lb_policy, "exit_idle");
    exit_idle = true;
    chand->exit_idle_when_lb_policy_arrives = false;
  }
  if (error == GRPC_ERROR_NONE && chand->resolver) {
    if (!lb_policy_updated) {
      set_channel_connectivity_state_locked(exec_ctx, chand, state,
                                            GRPC_ERROR_REF(state_error),
                                            "new_lb+resolver");
      if (lb_policy != NULL) {
        watch_lb_policy_locked(exec_ctx, chand, lb_policy, state);
      }
  // If we have a new LB policy or are shutting down (in which case
  // new_lb_policy will be NULL), swap out the LB policy, unreffing the
  // old one and removing its fds from chand->interested_parties.
  // Note that we do NOT do this if either (a) we updated the existing
  // LB policy above or (b) we failed to create the new LB policy (in
  // which case we want to continue using the most recent one we had).
  if (new_lb_policy != NULL || error != GRPC_ERROR_NONE ||
      chand->resolver == NULL) {
    if (chand->lb_policy != NULL) {
      grpc_pollset_set_del_pollset_set(exec_ctx,
                                       chand->lb_policy->interested_parties,
                                       chand->interested_parties);
      GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
    }
    GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
    grpc_resolver_next_locked(exec_ctx, chand->resolver,
                              &chand->resolver_result,
                              &chand->on_resolver_result_changed);
  } else {
    chand->lb_policy = new_lb_policy;
  }
  // Now that we've swapped out the relevant fields of chand, check for
  // error or shutdown.
  if (error != GRPC_ERROR_NONE || chand->resolver == NULL) {
    if (chand->resolver != NULL) {
      grpc_resolver_shutdown_locked(exec_ctx, chand->resolver);
      GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
      chand->resolver = NULL;
    }
    grpc_error *refs[] = {error, state_error};
    set_channel_connectivity_state_locked(
        exec_ctx, chand, GRPC_CHANNEL_SHUTDOWN,
        GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
            "Got config after disconnection", refs, GPR_ARRAY_SIZE(refs)),
            "Got resolver result after disconnection", &error, 1),
        "resolver_gone");
    GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "resolver");
    grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures,
                               GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                                   "Channel disconnected", &error, 1));
    GRPC_CLOSURE_LIST_SCHED(exec_ctx,
                            &chand->waiting_for_resolver_result_closures);
  } else {  // Not shutting down.
    grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
    grpc_error *state_error =
        GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy");
    if (new_lb_policy != NULL) {
      GRPC_ERROR_UNREF(state_error);
      state = grpc_lb_policy_check_connectivity_locked(exec_ctx, new_lb_policy,
                                                       &state_error);
      grpc_pollset_set_add_pollset_set(exec_ctx,
                                       new_lb_policy->interested_parties,
                                       chand->interested_parties);
      GRPC_CLOSURE_LIST_SCHED(exec_ctx,
                              &chand->waiting_for_resolver_result_closures);
      if (chand->exit_idle_when_lb_policy_arrives) {
        grpc_lb_policy_exit_idle_locked(exec_ctx, new_lb_policy);
        chand->exit_idle_when_lb_policy_arrives = false;
      }
      watch_lb_policy_locked(exec_ctx, chand, new_lb_policy, state);
    }
    set_channel_connectivity_state_locked(
        exec_ctx, chand, state, GRPC_ERROR_REF(state_error), "new_lb+resolver");
    grpc_resolver_next_locked(exec_ctx, chand->resolver,
                              &chand->resolver_result,
                              &chand->on_resolver_result_changed);
    GRPC_ERROR_UNREF(state_error);
  }
  if (!lb_policy_updated && lb_policy != NULL && exit_idle) {
    grpc_lb_policy_exit_idle_locked(exec_ctx, lb_policy);
    GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "exit_idle");
  }
  if (old_lb_policy != NULL) {
    grpc_pollset_set_del_pollset_set(
        exec_ctx, old_lb_policy->interested_parties, chand->interested_parties);
    GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel");
    old_lb_policy = NULL;
  }
  if (!lb_policy_updated && lb_policy != NULL) {
    GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "config_change");
  }
  GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "resolver");
  GRPC_ERROR_UNREF(state_error);
}
static void start_transport_op_locked(grpc_exec_ctx *exec_ctx, void *arg,
@@ -602,9 +568,10 @@ static void start_transport_op_locked(grpc_exec_ctx *exec_ctx, void *arg,
    GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
    chand->resolver = NULL;
    if (!chand->started_resolving) {
      grpc_closure_list_fail_all(&chand->waiting_for_config_closures,
      grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures,
                                 GRPC_ERROR_REF(op->disconnect_with_error));
      GRPC_CLOSURE_LIST_SCHED(exec_ctx, &chand->waiting_for_config_closures);
      GRPC_CLOSURE_LIST_SCHED(exec_ctx,
                              &chand->waiting_for_resolver_result_closures);
    }
    if (chand->lb_policy != NULL) {
      grpc_pollset_set_del_pollset_set(exec_ctx,
@@ -770,6 +737,16 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
 * PER-CALL FUNCTIONS
 */

// Max number of batches that can be pending on a call at any given
// time. This includes:
//   recv_initial_metadata
//   send_initial_metadata
//   recv_message
//   send_message
//   recv_trailing_metadata
//   send_trailing_metadata
#define MAX_WAITING_BATCHES 6
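
// A call can have at most one batch of each of the six kinds above
// pending while its pick is outstanding, so a fixed-size array of
// MAX_WAITING_BATCHES entries suffices;
// waiting_for_pick_batches_add_locked() asserts this invariant.
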
/** Call data. Holds a pointer to grpc_subchannel_call and the
    associated machinery to create such a pointer.
    Handles queueing of stream ops until a call object is ready, waiting
@@ -800,11 +777,10 @@ typedef struct client_channel_call_data {
  grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT];
  grpc_polling_entity *pollent;
  grpc_transport_stream_op_batch **waiting_ops;
  size_t waiting_ops_count;
  size_t waiting_ops_capacity;
  grpc_transport_stream_op_batch *waiting_for_pick_batches[MAX_WAITING_BATCHES];
  size_t waiting_for_pick_batches_count;
  grpc_closure next_step;
  grpc_transport_stream_op_batch_payload *initial_metadata_payload;
  grpc_call_stack *owning_call;
@@ -853,57 +829,44 @@ grpc_subchannel_call *grpc_client_channel_get_subchannel_call(
  return get_call_or_error(call_elem->call_data).subchannel_call;
}

static void add_waiting_locked(call_data *calld,
                               grpc_transport_stream_op_batch *op) {
  GPR_TIMER_BEGIN("add_waiting_locked", 0);
  if (calld->waiting_ops_count == calld->waiting_ops_capacity) {
    calld->waiting_ops_capacity = GPR_MAX(3, 2 * calld->waiting_ops_capacity);
    calld->waiting_ops =
        gpr_realloc(calld->waiting_ops,
                    calld->waiting_ops_capacity * sizeof(*calld->waiting_ops));
  }
  calld->waiting_ops[calld->waiting_ops_count++] = op;
  GPR_TIMER_END("add_waiting_locked", 0);
static void waiting_for_pick_batches_add_locked(
    call_data *calld, grpc_transport_stream_op_batch *batch) {
  GPR_ASSERT(calld->waiting_for_pick_batches_count < MAX_WAITING_BATCHES);
  calld->waiting_for_pick_batches[calld->waiting_for_pick_batches_count++] =
      batch;
}
static void fail_locked(grpc_exec_ctx *exec_ctx, call_data *calld,
                        grpc_error *error) {
  size_t i;
  for (i = 0; i < calld->waiting_ops_count; i++) {
static void waiting_for_pick_batches_fail_locked(grpc_exec_ctx *exec_ctx,
                                                 call_data *calld,
                                                 grpc_error *error) {
  for (size_t i = 0; i < calld->waiting_for_pick_batches_count; ++i) {
    grpc_transport_stream_op_batch_finish_with_failure(
        exec_ctx, calld->waiting_ops[i], GRPC_ERROR_REF(error));
        exec_ctx, calld->waiting_for_pick_batches[i], GRPC_ERROR_REF(error));
  }
  calld->waiting_ops_count = 0;
  calld->waiting_for_pick_batches_count = 0;
  GRPC_ERROR_UNREF(error);
}
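
// Note: waiting_for_pick_batches_fail_locked() takes ownership of
// \a error; each pending batch is failed with its own GRPC_ERROR_REF()
// of it, and the original is unreffed before returning.
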
static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) {
  if (calld->waiting_ops_count == 0) {
    return;
  }
  call_or_error call = get_call_or_error(calld);
  grpc_transport_stream_op_batch **ops = calld->waiting_ops;
  size_t nops = calld->waiting_ops_count;
  if (call.error != GRPC_ERROR_NONE) {
    fail_locked(exec_ctx, calld, GRPC_ERROR_REF(call.error));
static void waiting_for_pick_batches_resume_locked(grpc_exec_ctx *exec_ctx,
                                                   call_data *calld) {
  if (calld->waiting_for_pick_batches_count == 0) return;
  call_or_error coe = get_call_or_error(calld);
  if (coe.error != GRPC_ERROR_NONE) {
    waiting_for_pick_batches_fail_locked(exec_ctx, calld,
                                         GRPC_ERROR_REF(coe.error));
    return;
  }
  calld->waiting_ops = NULL;
  calld->waiting_ops_count = 0;
  calld->waiting_ops_capacity = 0;
  for (size_t i = 0; i < nops; i++) {
    grpc_subchannel_call_process_op(exec_ctx, call.subchannel_call, ops[i]);
  for (size_t i = 0; i < calld->waiting_for_pick_batches_count; ++i) {
    grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call,
                                    calld->waiting_for_pick_batches[i]);
  }
  gpr_free(ops);
  calld->waiting_for_pick_batches_count = 0;
}
// Sets calld->method_params and calld->retry_throttle_data.
// If the method params specify a timeout, populates
// *per_method_deadline and returns true.
static bool set_call_method_params_from_service_config_locked(
    grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
    gpr_timespec *per_method_deadline) {
// Applies service config to the call. Must be invoked once we know
// that the resolver has returned results to the channel.
static void apply_service_config_to_call_locked(grpc_exec_ctx *exec_ctx,
                                                grpc_call_element *elem) {
  channel_data *chand = elem->channel_data;
  call_data *calld = elem->call_data;
  if (chand->retry_throttle_data != NULL) {
@@ -915,39 +878,48 @@ static bool set_call_method_params_from_service_config_locked(
      exec_ctx, chand->method_params_table, calld->path);
  if (calld->method_params != NULL) {
    method_parameters_ref(calld->method_params);
    if (gpr_time_cmp(calld->method_params->timeout,
    // If the deadline from the service config is shorter than the one
    // from the client API, reset the deadline timer.
    if (chand->deadline_checking_enabled &&
        gpr_time_cmp(calld->method_params->timeout,
                     gpr_time_0(GPR_TIMESPAN)) != 0) {
      *per_method_deadline =
      const gpr_timespec per_method_deadline =
          gpr_time_add(calld->call_start_time, calld->method_params->timeout);
      return true;
      if (gpr_time_cmp(per_method_deadline, calld->deadline) < 0) {
        calld->deadline = per_method_deadline;
        grpc_deadline_state_reset(exec_ctx, elem, calld->deadline);
      }
    }
  }
}
  return false;
}
static void apply_final_configuration_locked(grpc_exec_ctx *exec_ctx,
                                             grpc_call_element *elem) {
  /* apply service-config level configuration to the call (now that we're
   * certain it exists) */
  call_data *calld = elem->call_data;
  channel_data *chand = elem->channel_data;
  gpr_timespec per_method_deadline;
  if (set_call_method_params_from_service_config_locked(exec_ctx, elem,
                                                        &per_method_deadline)) {
    // If the deadline from the service config is shorter than the one
    // from the client API, reset the deadline timer.
    if (chand->deadline_checking_enabled &&
        gpr_time_cmp(per_method_deadline, calld->deadline) < 0) {
      calld->deadline = per_method_deadline;
      grpc_deadline_state_reset(exec_ctx, elem, calld->deadline);
    }

static void create_subchannel_call_locked(grpc_exec_ctx *exec_ctx,
                                          call_data *calld, grpc_error *error) {
  grpc_subchannel_call *subchannel_call = NULL;
  const grpc_connected_subchannel_call_args call_args = {
      .pollent = calld->pollent,
      .path = calld->path,
      .start_time = calld->call_start_time,
      .deadline = calld->deadline,
      .arena = calld->arena,
      .context = calld->subchannel_call_context};
  grpc_error *new_error = grpc_connected_subchannel_create_call(
      exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call);
  GPR_ASSERT(set_call_or_error(
      calld, (call_or_error){.subchannel_call = subchannel_call}));
  if (new_error != GRPC_ERROR_NONE) {
    new_error = grpc_error_add_child(new_error, error);
    waiting_for_pick_batches_fail_locked(exec_ctx, calld, new_error);
  } else {
    waiting_for_pick_batches_resume_locked(exec_ctx, calld);
  }
  GRPC_ERROR_UNREF(error);
}
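
// Note that create_subchannel_call_locked() consumes the error passed
// in: on failure it is attached as a child of the creation error used
// to fail the pending batches, and it is unreffed before returning.
// Callers therefore pass either GRPC_ERROR_NONE or a GRPC_ERROR_REF()'d
// error they own.
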
static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg,
static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx,
                                    grpc_call_element *elem,
                                    grpc_error *error) {
  grpc_call_element *elem = arg;
  call_data *calld = elem->call_data;
  channel_data *chand = elem->channel_data;
  GPR_ASSERT(calld->pick_pending);
@@ -956,6 +928,7 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg,
                                           chand->interested_parties);
  call_or_error coe = get_call_or_error(calld);
  if (calld->connected_subchannel == NULL) {
    // Failed to create subchannel.
    grpc_error *failure =
        error == GRPC_ERROR_NONE
            ? GRPC_ERROR_CREATE_FROM_STATIC_STRING(
@@ -963,7 +936,7 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg,
            : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                  "Failed to create subchannel", &error, 1);
    set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(failure)});
    fail_locked(exec_ctx, calld, failure);
    waiting_for_pick_batches_fail_locked(exec_ctx, calld, failure);
  } else if (coe.error != GRPC_ERROR_NONE) {
    /* already cancelled before subchannel became ready */
    grpc_error *child_errors[] = {error, coe.error};
@@ -977,29 +950,13 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg,
      grpc_error_set_int(cancellation_error, GRPC_ERROR_INT_GRPC_STATUS,
                         GRPC_STATUS_DEADLINE_EXCEEDED);
    }
    fail_locked(exec_ctx, calld, cancellation_error);
    waiting_for_pick_batches_fail_locked(exec_ctx, calld, cancellation_error);
  } else {
    /* Create call on subchannel. */
    grpc_subchannel_call *subchannel_call = NULL;
    const grpc_connected_subchannel_call_args call_args = {
        .pollent = calld->pollent,
        .path = calld->path,
        .start_time = calld->call_start_time,
        .deadline = calld->deadline,
        .arena = calld->arena,
        .context = calld->subchannel_call_context};
    grpc_error *new_error = grpc_connected_subchannel_create_call(
        exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call);
    GPR_ASSERT(set_call_or_error(
        calld, (call_or_error){.subchannel_call = subchannel_call}));
    if (new_error != GRPC_ERROR_NONE) {
      new_error = grpc_error_add_child(new_error, error);
      fail_locked(exec_ctx, calld, new_error);
    } else {
      retry_waiting_locked(exec_ctx, calld);
    }
    create_subchannel_call_locked(exec_ctx, calld, GRPC_ERROR_REF(error));
  }
  GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
  GRPC_ERROR_UNREF(error);
}
static char *cc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
@@ -1013,41 +970,32 @@ static char *cc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
  }
}

/** Return true if subchannel is available immediately (in which case
    subchannel_ready_locked() should not be called), or false otherwise (in
    which case subchannel_ready_locked() should be called when the subchannel
    is available). */
static bool pick_subchannel_locked(grpc_exec_ctx *exec_ctx,
                                   grpc_call_element *elem);

typedef struct {
  grpc_metadata_batch *initial_metadata;
  uint32_t initial_metadata_flags;
  grpc_connected_subchannel **connected_subchannel;
  grpc_call_context_element *subchannel_call_context;
  grpc_closure *on_ready;
  grpc_call_element *elem;
  bool cancelled;
  grpc_closure closure;
} continue_picking_args;
} pick_after_resolver_result_args;

/** Return true if subchannel is available immediately (in which case on_ready
    should not be called), or false otherwise (in which case on_ready should be
    called when the subchannel is available). */
static bool pick_subchannel_locked(
    grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
    grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags,
    grpc_connected_subchannel **connected_subchannel,
    grpc_call_context_element *subchannel_call_context, grpc_closure *on_ready);
static void continue_picking_locked(grpc_exec_ctx *exec_ctx, void *arg,
                                    grpc_error *error) {
  continue_picking_args *cpa = arg;
  if (cpa->connected_subchannel == NULL) {
static void continue_picking_after_resolver_result_locked(
    grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
  pick_after_resolver_result_args *args = arg;
  if (args->cancelled) {
    /* cancelled, do nothing */
  } else if (error != GRPC_ERROR_NONE) {
    GRPC_CLOSURE_SCHED(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error));
    subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_REF(error));
  } else {
    if (pick_subchannel_locked(exec_ctx, cpa->elem, cpa->initial_metadata,
                               cpa->initial_metadata_flags,
                               cpa->connected_subchannel,
                               cpa->subchannel_call_context, cpa->on_ready)) {
      GRPC_CLOSURE_SCHED(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE);
    if (pick_subchannel_locked(exec_ctx, args->elem)) {
      subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_NONE);
    }
  }
  gpr_free(cpa);
  gpr_free(args);
}
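
// In all three cases above (cancelled, resolver error, or pick
// attempted), the heap-allocated args struct is freed here;
// cancel_pick_locked() only sets args->cancelled and leaves the
// closure on the waiting list.
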
static void cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
@@ -1059,39 +1007,85 @@ static void cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
                                   &calld->connected_subchannel,
                                   GRPC_ERROR_REF(error));
  }
  for (grpc_closure *closure = chand->waiting_for_config_closures.head;
  // If we don't yet have a resolver result, then a closure for
  // continue_picking_after_resolver_result_locked() will have been added to
  // chand->waiting_for_resolver_result_closures, and it may not be invoked
  // until after this call has been destroyed. We mark the operation as
  // cancelled, so that when continue_picking_after_resolver_result_locked()
  // is called, it will be a no-op. We also immediately invoke
  // subchannel_ready_locked() to propagate the error back to the caller.
  for (grpc_closure *closure = chand->waiting_for_resolver_result_closures.head;
       closure != NULL; closure = closure->next_data.next) {
    continue_picking_args *cpa = closure->cb_arg;
    if (cpa->connected_subchannel == &calld->connected_subchannel) {
      cpa->connected_subchannel = NULL;
      GRPC_CLOSURE_SCHED(exec_ctx, cpa->on_ready,
                         GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                             "Pick cancelled", &error, 1));
    pick_after_resolver_result_args *args = closure->cb_arg;
    if (!args->cancelled && args->elem == elem) {
      args->cancelled = true;
      subchannel_ready_locked(exec_ctx, elem,
                              GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                                  "Pick cancelled", &error, 1));
    }
  }
  GRPC_ERROR_UNREF(error);
}
static bool pick_subchannel_locked(
    grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
    grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags,
    grpc_connected_subchannel **connected_subchannel,
    grpc_call_context_element *subchannel_call_context,
    grpc_closure *on_ready) {
  GPR_TIMER_BEGIN("pick_subchannel", 0);

// State for pick callback that holds a reference to the LB policy
// from which the pick was requested.
typedef struct {
  grpc_lb_policy *lb_policy;
  grpc_call_element *elem;
  grpc_closure closure;
} pick_callback_args;

// Callback invoked by grpc_lb_policy_pick_locked() for async picks.
// Unrefs the LB policy after invoking subchannel_ready_locked().
static void pick_callback_done_locked(grpc_exec_ctx *exec_ctx, void *arg,
                                      grpc_error *error) {
  pick_callback_args *args = arg;
  GPR_ASSERT(args != NULL);
  GPR_ASSERT(args->lb_policy != NULL);
  subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_REF(error));
  GRPC_LB_POLICY_UNREF(exec_ctx, args->lb_policy, "pick_subchannel");
  gpr_free(args);
}
// Takes a ref to chand->lb_policy and calls grpc_lb_policy_pick_locked().
// If the pick was completed synchronously, unrefs the LB policy and
// returns true.
static bool pick_callback_start_locked(grpc_exec_ctx *exec_ctx,
                                       grpc_call_element *elem,
                                       const grpc_lb_policy_pick_args *inputs) {
  channel_data *chand = elem->channel_data;
  call_data *calld = elem->call_data;
  pick_callback_args *pick_args = gpr_zalloc(sizeof(*pick_args));
  GRPC_LB_POLICY_REF(chand->lb_policy, "pick_subchannel");
  pick_args->lb_policy = chand->lb_policy;
  pick_args->elem = elem;
  GRPC_CLOSURE_INIT(&pick_args->closure, pick_callback_done_locked, pick_args,
                    grpc_combiner_scheduler(chand->combiner));
  const bool pick_done = grpc_lb_policy_pick_locked(
      exec_ctx, chand->lb_policy, inputs, &calld->connected_subchannel,
      calld->subchannel_call_context, NULL, &pick_args->closure);
  if (pick_done) {
    /* synchronous grpc_lb_policy_pick call. Unref the LB policy. */
    GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "pick_subchannel");
    gpr_free(pick_args);
  }
  return pick_done;
}
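
// If the pick is asynchronous, pick_callback_done_locked() will run in
// the combiner once the LB policy completes the pick; it propagates the
// result via subchannel_ready_locked() and then releases the ref taken
// on the LB policy above.
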
  GPR_ASSERT(connected_subchannel);

static bool pick_subchannel_locked(grpc_exec_ctx *exec_ctx,
                                   grpc_call_element *elem) {
  GPR_TIMER_BEGIN("pick_subchannel", 0);
  channel_data *chand = elem->channel_data;
  call_data *calld = elem->call_data;
  bool pick_done = false;
  if (chand->lb_policy != NULL) {
    apply_final_configuration_locked(exec_ctx, elem);
    grpc_lb_policy *lb_policy = chand->lb_policy;
    GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel");
    apply_service_config_to_call_locked(exec_ctx, elem);
    // If the application explicitly set wait_for_ready, use that.
    // Otherwise, if the service config specified a value for this
    // method, use that.
    uint32_t initial_metadata_flags =
        calld->initial_metadata_payload->send_initial_metadata
            .send_initial_metadata_flags;
    const bool wait_for_ready_set_from_api =
        initial_metadata_flags &
        GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
@@ -1107,78 +1101,57 @@ static bool pick_subchannel_locked(
    }
  }
    const grpc_lb_policy_pick_args inputs = {
        initial_metadata, initial_metadata_flags, &calld->lb_token_mdelem};
    // Wrap the user-provided callback in order to hold a strong reference to
    // the LB policy for the duration of the pick.
    wrapped_on_pick_closure_arg *w_on_pick_arg =
        gpr_zalloc(sizeof(*w_on_pick_arg));
    GRPC_CLOSURE_INIT(&w_on_pick_arg->wrapper_closure,
                      wrapped_on_pick_closure_cb, w_on_pick_arg,
                      grpc_schedule_on_exec_ctx);
    w_on_pick_arg->wrapped_closure = on_ready;
    GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel_wrapping");
    w_on_pick_arg->lb_policy = lb_policy;
    const bool pick_done = grpc_lb_policy_pick_locked(
        exec_ctx, lb_policy, &inputs, connected_subchannel,
        subchannel_call_context, NULL, &w_on_pick_arg->wrapper_closure);
    if (pick_done) {
      /* synchronous grpc_lb_policy_pick call. Unref the LB policy. */
      GRPC_LB_POLICY_UNREF(exec_ctx, w_on_pick_arg->lb_policy,
                           "pick_subchannel_wrapping");
      gpr_free(w_on_pick_arg);
        calld->initial_metadata_payload->send_initial_metadata
            .send_initial_metadata,
        initial_metadata_flags, &calld->lb_token_mdelem};
    pick_done = pick_callback_start_locked(exec_ctx, elem, &inputs);
  } else if (chand->resolver != NULL) {
    if (!chand->started_resolving) {
      chand->started_resolving = true;
      GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
      grpc_resolver_next_locked(exec_ctx, chand->resolver,
                                &chand->resolver_result,
                                &chand->on_resolver_result_changed);
    }
    GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick_subchannel");
    GPR_TIMER_END("pick_subchannel", 0);
    return pick_done;
  }
  if (chand->resolver != NULL && !chand->started_resolving) {
    chand->started_resolving = true;
    GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
    grpc_resolver_next_locked(exec_ctx, chand->resolver,
                              &chand->resolver_result,
                              &chand->on_resolver_result_changed);
  }
  if (chand->resolver != NULL) {
    continue_picking_args *cpa = gpr_malloc(sizeof(*cpa));
    cpa->initial_metadata = initial_metadata;
    cpa->initial_metadata_flags = initial_metadata_flags;
    cpa->connected_subchannel = connected_subchannel;
    cpa->subchannel_call_context = subchannel_call_context;
    cpa->on_ready = on_ready;
    cpa->elem = elem;
    GRPC_CLOSURE_INIT(&cpa->closure, continue_picking_locked, cpa,
    pick_after_resolver_result_args *args =
        (pick_after_resolver_result_args *)gpr_zalloc(sizeof(*args));
    args->elem = elem;
    GRPC_CLOSURE_INIT(&args->closure,
                      continue_picking_after_resolver_result_locked, args,
                      grpc_combiner_scheduler(chand->combiner));
    grpc_closure_list_append(&chand->waiting_for_config_closures, &cpa->closure,
                             GRPC_ERROR_NONE);
    grpc_closure_list_append(&chand->waiting_for_resolver_result_closures,
                             &args->closure, GRPC_ERROR_NONE);
  } else {
    GRPC_CLOSURE_SCHED(exec_ctx, on_ready,
                       GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
    subchannel_ready_locked(
        exec_ctx, elem, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
  }
  GPR_TIMER_END("pick_subchannel", 0);
  return false;
  return pick_done;
}
static void start_transport_stream_op_batch_locked_inner(
    grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *op,
    grpc_call_element *elem) {
  channel_data *chand = elem->channel_data;
static void start_transport_stream_op_batch_locked(grpc_exec_ctx *exec_ctx,
                                                   void *arg,
                                                   grpc_error *error_ignored) {
  GPR_TIMER_BEGIN("start_transport_stream_op_batch_locked", 0);
  grpc_transport_stream_op_batch *op = arg;
  grpc_call_element *elem = op->handler_private.extra_arg;
  call_data *calld = elem->call_data;
  channel_data *chand = elem->channel_data;
  /* need to recheck that another thread hasn't set the call */
  call_or_error coe = get_call_or_error(calld);
  if (coe.error != GRPC_ERROR_NONE) {
    grpc_transport_stream_op_batch_finish_with_failure(
        exec_ctx, op, GRPC_ERROR_REF(coe.error));
    /* early out */
    return;
    goto done;
  }
  if (coe.subchannel_call != NULL) {
    grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, op);
    /* early out */
    return;
    goto done;
  }
  // Add to waiting-for-pick list. If we succeed in getting a
  // subchannel call below, we'll handle this batch (along with any
  // other waiting batches) in waiting_for_pick_batches_resume_locked().
  waiting_for_pick_batches_add_locked(calld, op);
  /* if this is a cancellation, then we can raise our cancelled flag */
  if (op->cancel_stream) {
    grpc_error *error = op->payload->cancel_stream.cancel_error;
@@ -1190,30 +1163,22 @@ static void start_transport_stream_op_batch_locked_inner(
    set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(error)});
    if (calld->pick_pending) {
      cancel_pick_locked(exec_ctx, elem, GRPC_ERROR_REF(error));
    } else {
      fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error));
    }
    grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op,
                                                       GRPC_ERROR_REF(error));
    /* early out */
    return;
    waiting_for_pick_batches_fail_locked(exec_ctx, calld,
                                         GRPC_ERROR_REF(error));
    goto done;
  }
  /* if we don't have a subchannel, try to get one */
  if (!calld->pick_pending && calld->connected_subchannel == NULL &&
      op->send_initial_metadata) {
    calld->initial_metadata_payload = op->payload;
    calld->pick_pending = true;
    GRPC_CLOSURE_INIT(&calld->next_step, subchannel_ready_locked, elem,
                      grpc_combiner_scheduler(chand->combiner));
    GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel");
    /* If a subchannel is not available immediately, the polling entity from
       call_data should be provided to channel_data's interested_parties, so
       that IO of the lb_policy and resolver could be done under it. */
    if (pick_subchannel_locked(
            exec_ctx, elem,
            op->payload->send_initial_metadata.send_initial_metadata,
            op->payload->send_initial_metadata.send_initial_metadata_flags,
            &calld->connected_subchannel, calld->subchannel_call_context,
            &calld->next_step)) {
    if (pick_subchannel_locked(exec_ctx, elem)) {
      // Pick was returned synchronously.
      calld->pick_pending = false;
      GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
      if (calld->connected_subchannel == NULL) {
@@ -1221,42 +1186,20 @@ static void start_transport_stream_op_batch_locked_inner(
            "Call dropped by load balancing policy");
        set_call_or_error(calld,
                          (call_or_error){.error = GRPC_ERROR_REF(error)});
        fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error));
        grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
        return;  // Early out.
        waiting_for_pick_batches_fail_locked(exec_ctx, calld, error);
      } else {
        // Create subchannel call.
        create_subchannel_call_locked(exec_ctx, calld, GRPC_ERROR_NONE);
      }
    } else {
      grpc_polling_entity_add_to_pollset_set(exec_ctx, calld->pollent,
                                             chand->interested_parties);
    }
  }
  /* if we've got a subchannel, then let's ask it to create a call */
  if (!calld->pick_pending && calld->connected_subchannel != NULL) {
    grpc_subchannel_call *subchannel_call = NULL;
    const grpc_connected_subchannel_call_args call_args = {
        .pollent = calld->pollent,
        .path = calld->path,
        .start_time = calld->call_start_time,
        .deadline = calld->deadline,
        .arena = calld->arena,
        .context = calld->subchannel_call_context};
    grpc_error *error = grpc_connected_subchannel_create_call(
        exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call);
    GPR_ASSERT(set_call_or_error(
        calld, (call_or_error){.subchannel_call = subchannel_call}));
    if (error != GRPC_ERROR_NONE) {
      fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error));
      grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
    } else {
      retry_waiting_locked(exec_ctx, calld);
      /* recurse to retry */
      start_transport_stream_op_batch_locked_inner(exec_ctx, op, elem);
    }
    /* early out */
    return;
  }
  /* nothing to be done but wait */
  add_waiting_locked(calld, op);
done:
  GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call,
                        "start_transport_stream_op_batch");
  GPR_TIMER_END("start_transport_stream_op_batch_locked", 0);
}
static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
@@ -1279,30 +1222,6 @@ static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
                            GRPC_ERROR_REF(error));
}

static void start_transport_stream_op_batch_locked(grpc_exec_ctx *exec_ctx,
                                                   void *arg,
                                                   grpc_error *error_ignored) {
  GPR_TIMER_BEGIN("start_transport_stream_op_batch_locked", 0);
  grpc_transport_stream_op_batch *op = arg;
  grpc_call_element *elem = op->handler_private.extra_arg;
  call_data *calld = elem->call_data;
  if (op->recv_trailing_metadata) {
    GPR_ASSERT(op->on_complete != NULL);
    calld->original_on_complete = op->on_complete;
    GRPC_CLOSURE_INIT(&calld->on_complete, on_complete, elem,
                      grpc_schedule_on_exec_ctx);
    op->on_complete = &calld->on_complete;
  }
  start_transport_stream_op_batch_locked_inner(exec_ctx, op, elem);
  GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call,
                        "start_transport_stream_op_batch");
  GPR_TIMER_END("start_transport_stream_op_batch_locked", 0);
}
/* The logic here is fairly complicated, due to (a) the fact that we
   need to handle the case where we receive the send op before the
   initial metadata op, and (b) the need for efficiency, especially in
@@ -1321,6 +1240,15 @@ static void cc_start_transport_stream_op_batch(
    grpc_deadline_state_client_start_transport_stream_op_batch(exec_ctx, elem,
                                                               op);
  }
  // Intercept on_complete for recv_trailing_metadata so that we can
  // check retry throttle status.
  if (op->recv_trailing_metadata) {
    GPR_ASSERT(op->on_complete != NULL);
    calld->original_on_complete = op->on_complete;
    GRPC_CLOSURE_INIT(&calld->on_complete, on_complete, elem,
                      grpc_schedule_on_exec_ctx);
    op->on_complete = &calld->on_complete;
  }
  /* try to (atomically) get the call */
  call_or_error coe = get_call_or_error(calld);
  GPR_TIMER_BEGIN("cc_start_transport_stream_op_batch", 0);
@@ -1390,7 +1318,7 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
                               "client_channel_destroy_call");
  }
  GPR_ASSERT(!calld->pick_pending);
  GPR_ASSERT(calld->waiting_ops_count == 0);
  GPR_ASSERT(calld->waiting_for_pick_batches_count == 0);
  if (calld->connected_subchannel != NULL) {
    GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, calld->connected_subchannel,
                                    "picked");
@@ -1401,7 +1329,6 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
          calld->subchannel_call_context[i].value);
    }
  }
  gpr_free(calld->waiting_ops);
  GRPC_CLOSURE_SCHED(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE);
}