@@ -796,7 +796,8 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
// send_message
// recv_trailing_metadata
// send_trailing_metadata
#define MAX_WAITING_BATCHES 6
// We also add room for a single cancel_stream batch.
#define MAX_WAITING_BATCHES 7
/** Call data.  Holds a pointer to grpc_subchannel_call and the
    associated machinery to create such a pointer.
@@ -808,23 +809,25 @@ typedef struct client_channel_call_data {
// The code in deadline_filter.c requires this to be the first field.
// TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
// and this struct both independently store a pointer to the call
// stack and each has its own mutex. If/when we have time, find a way
// to avoid this without breaking the grpc_deadline_state abstraction.
// combiner. If/when we have time, find a way to avoid this without
// breaking the grpc_deadline_state abstraction.
  grpc_deadline_state deadline_state;
  grpc_slice path;  // Request path.
  gpr_timespec call_start_time;
  gpr_timespec deadline;
  gpr_arena *arena;
  grpc_call_combiner *call_combiner;
  grpc_server_retry_throttle_data *retry_throttle_data;
  method_parameters *method_params;
  /** either 0 for no call, a pointer to a grpc_subchannel_call (if the lowest
      bit is 0), or a pointer to an error (if the lowest bit is 1) */
  gpr_atm subchannel_call_or_error;
  gpr_arena *arena;
  grpc_subchannel_call *subchannel_call;
  grpc_error *error;
  grpc_lb_policy *lb_policy;  // Holds ref while LB pick is pending.
  grpc_closure lb_pick_closure;
  grpc_closure cancel_closure;
  grpc_connected_subchannel *connected_subchannel;
  grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT];
@@ -832,10 +835,9 @@ typedef struct client_channel_call_data {
  grpc_transport_stream_op_batch *waiting_for_pick_batches[MAX_WAITING_BATCHES];
  size_t waiting_for_pick_batches_count;
  grpc_closure handle_pending_batch_in_call_combiner[MAX_WAITING_BATCHES];
  grpc_transport_stream_op_batch_payload *initial_metadata_payload;
  grpc_call_stack *owning_call;
  grpc_transport_stream_op_batch *initial_metadata_batch;
  grpc_linked_mdelem lb_token_mdelem;
@@ -843,55 +845,42 @@ typedef struct client_channel_call_data {
  grpc_closure *original_on_complete;
} call_data;
typedef struct {
  grpc_subchannel_call *subchannel_call;
  grpc_error *error;
} call_or_error;
static call_or_error get_call_or_error(call_data *p) {
  gpr_atm c = gpr_atm_acq_load(&p->subchannel_call_or_error);
  if (c == 0)
    return (call_or_error){NULL, NULL};
  else if (c & 1)
    return (call_or_error){NULL, (grpc_error *)((c) & ~(gpr_atm)1)};
  else
    return (call_or_error){(grpc_subchannel_call *)c, NULL};
grpc_subchannel_call *grpc_client_channel_get_subchannel_call(
    grpc_call_element *elem) {
  call_data *calld = elem->call_data;
  return calld->subchannel_call;
}
static bool set_call_or_error(call_data *p, call_or_error coe) {
  // this should always be under a lock
  call_or_error existing = get_call_or_error(p);
  if (existing.error != GRPC_ERROR_NONE) {
    GRPC_ERROR_UNREF(coe.error);
    return false;
  }
  GPR_ASSERT(existing.subchannel_call == NULL);
  if (coe.error != GRPC_ERROR_NONE) {
    GPR_ASSERT(coe.subchannel_call == NULL);
    gpr_atm_rel_store(&p->subchannel_call_or_error, 1 | (gpr_atm)coe.error);
// This is called via the call combiner, so access to calld is synchronized.
static void waiting_for_pick_batches_add(
    call_data *calld, grpc_transport_stream_op_batch *batch) {
  if (batch->send_initial_metadata) {
    GPR_ASSERT(calld->initial_metadata_batch == NULL);
    calld->initial_metadata_batch = batch;
  } else {
    GPR_ASSERT(coe.subchannel_call != NULL);
    gpr_atm_rel_store(&p->subchannel_call_or_error,
                      (gpr_atm)coe.subchannel_call);
    GPR_ASSERT(calld->waiting_for_pick_batches_count < MAX_WAITING_BATCHES);
    calld->waiting_for_pick_batches[calld->waiting_for_pick_batches_count++] =
        batch;
  }
  return true;
}
grpc_subchannel_call *grpc_client_channel_get_subchannel_call(
    grpc_call_element *call_elem) {
  return get_call_or_error(call_elem->call_data).subchannel_call;
}
static void waiting_for_pick_batches_add_locked(
    call_data *calld, grpc_transport_stream_op_batch *batch) {
  GPR_ASSERT(calld->waiting_for_pick_batches_count < MAX_WAITING_BATCHES);
  calld->waiting_for_pick_batches[calld->waiting_for_pick_batches_count++] =
      batch;
// This is called via the call combiner, so access to calld is synchronized.
static void fail_pending_batch_in_call_combiner(grpc_exec_ctx *exec_ctx,
                                                void *arg, grpc_error *error) {
  call_data *calld = arg;
  if (calld->waiting_for_pick_batches_count > 0) {
    --calld->waiting_for_pick_batches_count;
    grpc_transport_stream_op_batch_finish_with_failure(
        exec_ctx,
        calld->waiting_for_pick_batches[calld->waiting_for_pick_batches_count],
        GRPC_ERROR_REF(error), calld->call_combiner);
  }
}
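// Editor's sketch (not part of the patch): the call combiner contract that
// the callbacks in this file rely on.  A closure scheduled via
// GRPC_CALL_COMBINER_START() runs with exclusive access to the call's data
// and must release the combiner exactly once -- either by handing it to a
// helper that consumes it (as finish_with_failure does above) or by calling
// GRPC_CALL_COMBINER_STOP().  The callback below is a hypothetical minimal
// example and is not used anywhere in this file.
static void example_call_combiner_callback(grpc_exec_ctx *exec_ctx, void *arg,
                                           grpc_error *error) {
  call_data *calld = (call_data *)arg;
  (void)error;  // Unused in this sketch.
  // Any work on calld here is serialized by the call combiner.
  GRPC_CALL_COMBINER_STOP(exec_ctx, calld->call_combiner,
                          "example_call_combiner_callback");
}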
static void waiting_for_pick_batches_fail_locked(grpc_exec_ctx *exec_ctx,
                                                 grpc_call_element *elem,
                                                 grpc_error *error) {
// This is called via the call combiner, so access to calld is synchronized.
static void waiting_for_pick_batches_fail(grpc_exec_ctx *exec_ctx,
                                          grpc_call_element *elem,
                                          grpc_error *error) {
  call_data *calld = elem->call_data;
  if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
    gpr_log(GPR_DEBUG,
@@ -900,34 +889,60 @@ static void waiting_for_pick_batches_fail_locked(grpc_exec_ctx *exec_ctx,
            grpc_error_string(error));
  }
  for (size_t i = 0; i < calld->waiting_for_pick_batches_count; ++i) {
    GRPC_CLOSURE_INIT(&calld->handle_pending_batch_in_call_combiner[i],
                      fail_pending_batch_in_call_combiner, calld,
                      grpc_schedule_on_exec_ctx);
    GRPC_CALL_COMBINER_START(exec_ctx, calld->call_combiner,
                             &calld->handle_pending_batch_in_call_combiner[i],
                             GRPC_ERROR_REF(error),
                             "waiting_for_pick_batches_fail");
  }
  if (calld->initial_metadata_batch != NULL) {
    grpc_transport_stream_op_batch_finish_with_failure(
        exec_ctx, calld->waiting_for_pick_batches[i], GRPC_ERROR_REF(error));
        exec_ctx, calld->initial_metadata_batch, GRPC_ERROR_REF(error),
        calld->call_combiner);
  } else {
    GRPC_CALL_COMBINER_STOP(exec_ctx, calld->call_combiner,
                            "waiting_for_pick_batches_fail");
  }
  calld->waiting_for_pick_batches_count = 0;
  GRPC_ERROR_UNREF(error);
}
static void waiting_for_pick_batches_resume_locked(grpc_exec_ctx *exec_ctx,
                                                   grpc_call_element *elem) {
  call_data *calld = elem->call_data;
  if (calld->waiting_for_pick_batches_count == 0) return;
  call_or_error coe = get_call_or_error(calld);
  if (coe.error != GRPC_ERROR_NONE) {
    waiting_for_pick_batches_fail_locked(exec_ctx, elem,
                                         GRPC_ERROR_REF(coe.error));
    return;
// This is called via the call combiner, so access to calld is synchronized.
static void run_pending_batch_in_call_combiner(grpc_exec_ctx *exec_ctx,
                                               void *arg, grpc_error *ignored) {
  call_data *calld = arg;
  if (calld->waiting_for_pick_batches_count > 0) {
    --calld->waiting_for_pick_batches_count;
    grpc_subchannel_call_process_op(
        exec_ctx, calld->subchannel_call,
        calld->waiting_for_pick_batches[calld->waiting_for_pick_batches_count]);
  }
}
// This is called via the call combiner, so access to calld is synchronized.
static void waiting_for_pick_batches_resume(grpc_exec_ctx *exec_ctx,
                                            grpc_call_element *elem) {
  channel_data *chand = elem->channel_data;
  call_data *calld = elem->call_data;
  if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
    gpr_log(GPR_DEBUG, "chand=%p calld=%p: sending %" PRIdPTR
                       " pending batches to subchannel_call=%p",
            elem->channel_data, calld, calld->waiting_for_pick_batches_count,
            coe.subchannel_call);
            chand, calld, calld->waiting_for_pick_batches_count,
            calld->subchannel_call);
  }
  for (size_t i = 0; i < calld->waiting_for_pick_batches_count; ++i) {
    grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call,
                                    calld->waiting_for_pick_batches[i]);
  }
  calld->waiting_for_pick_batches_count = 0;
    GRPC_CLOSURE_INIT(&calld->handle_pending_batch_in_call_combiner[i],
                      run_pending_batch_in_call_combiner, calld,
                      grpc_schedule_on_exec_ctx);
    GRPC_CALL_COMBINER_START(exec_ctx, calld->call_combiner,
                             &calld->handle_pending_batch_in_call_combiner[i],
                             GRPC_ERROR_NONE,
                             "waiting_for_pick_batches_resume");
  }
  GPR_ASSERT(calld->initial_metadata_batch != NULL);
  grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call,
                                  calld->initial_metadata_batch);
}
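// Editor's sketch (not part of the patch): how the two drains above are
// intended to be used once a pick resolves.  This helper is hypothetical;
// create_subchannel_call_locked() below plays this role in the real code.
static void example_drain_after_pick(grpc_exec_ctx *exec_ctx,
                                     grpc_call_element *elem,
                                     grpc_error *pick_error) {
  call_data *calld = (call_data *)elem->call_data;
  if (pick_error != GRPC_ERROR_NONE || calld->subchannel_call == NULL) {
    // Pick failed: fail the queued batches, one per combiner invocation.
    // waiting_for_pick_batches_fail() takes ownership of the error passed in.
    waiting_for_pick_batches_fail(exec_ctx, elem, GRPC_ERROR_REF(pick_error));
  } else {
    // Pick succeeded: replay the queued batches on the new subchannel call.
    waiting_for_pick_batches_resume(exec_ctx, elem);
  }
}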
// Applies service config to the call. Must be invoked once we know
@@ -968,29 +983,28 @@ static void apply_service_config_to_call_locked(grpc_exec_ctx *exec_ctx,
static void create_subchannel_call_locked(grpc_exec_ctx *exec_ctx,
                                          grpc_call_element *elem,
                                          grpc_error *error) {
  channel_data *chand = elem->channel_data;
  call_data *calld = elem->call_data;
  grpc_subchannel_call *subchannel_call = NULL;
  const grpc_connected_subchannel_call_args call_args = {
      .pollent = calld->pollent,
      .path = calld->path,
      .start_time = calld->call_start_time,
      .deadline = calld->deadline,
      .arena = calld->arena,
      .context = calld->subchannel_call_context};
      .context = calld->subchannel_call_context,
      .call_combiner = calld->call_combiner};
  grpc_error *new_error = grpc_connected_subchannel_create_call(
      exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call);
      exec_ctx, calld->connected_subchannel, &call_args,
      &calld->subchannel_call);
  if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
    gpr_log(GPR_DEBUG, "chand=%p calld=%p: create subchannel_call=%p: error=%s",
            elem->channel_data, calld, subchannel_call,
            grpc_error_string(new_error));
            chand, calld, calld->subchannel_call, grpc_error_string(new_error));
  }
  GPR_ASSERT(set_call_or_error(
      calld, (call_or_error){.subchannel_call = subchannel_call}));
  if (new_error != GRPC_ERROR_NONE) {
    new_error = grpc_error_add_child(new_error, error);
    waiting_for_pick_batches_fail_locked(exec_ctx, elem, new_error);
    waiting_for_pick_batches_fail(exec_ctx, elem, new_error);
  } else {
    waiting_for_pick_batches_resume_locked(exec_ctx, elem);
    waiting_for_pick_batches_resume(exec_ctx, elem);
  }
  GRPC_ERROR_UNREF(error);
}
@@ -1002,60 +1016,27 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx,
  channel_data *chand = elem->channel_data;
  grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent,
                                           chand->interested_parties);
  call_or_error coe = get_call_or_error(calld);
  if (calld->connected_subchannel == NULL) {
    // Failed to create subchannel.
    grpc_error *failure =
        error == GRPC_ERROR_NONE
            ? GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                  "Call dropped by load balancing policy")
            : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                  "Failed to create subchannel", &error, 1);
    GRPC_ERROR_UNREF(calld->error);
    calld->error = error == GRPC_ERROR_NONE
                       ? GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                             "Call dropped by load balancing policy")
                       : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                             "Failed to create subchannel", &error, 1);
    if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
      gpr_log(GPR_DEBUG,
              "chand=%p calld=%p: failed to create subchannel: error=%s", chand,
              calld, grpc_error_string(failure));
    }
    set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(failure)});
    waiting_for_pick_batches_fail_locked(exec_ctx, elem, failure);
  } else if (coe.error != GRPC_ERROR_NONE) {
    /* already cancelled before subchannel became ready */
    grpc_error *child_errors[] = {error, coe.error};
    grpc_error *cancellation_error =
        GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
            "Cancelled before creating subchannel", child_errors,
            GPR_ARRAY_SIZE(child_errors));
    /* if due to deadline, attach the deadline exceeded status to the error */
    if (gpr_time_cmp(calld->deadline, gpr_now(GPR_CLOCK_MONOTONIC)) < 0) {
      cancellation_error =
          grpc_error_set_int(cancellation_error, GRPC_ERROR_INT_GRPC_STATUS,
                             GRPC_STATUS_DEADLINE_EXCEEDED);
              calld, grpc_error_string(calld->error));
    }
    if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
      gpr_log(GPR_DEBUG,
              "chand=%p calld=%p: cancelled before subchannel became ready: %s",
              chand, calld, grpc_error_string(cancellation_error));
    }
    waiting_for_pick_batches_fail_locked(exec_ctx, elem, cancellation_error);
    waiting_for_pick_batches_fail(exec_ctx, elem, GRPC_ERROR_REF(calld->error));
  } else {
    /* Create call on subchannel. */
    create_subchannel_call_locked(exec_ctx, elem, GRPC_ERROR_REF(error));
  }
  GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
  GRPC_ERROR_UNREF(error);
}
static char *cc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
  call_data *calld = elem->call_data;
  grpc_subchannel_call *subchannel_call =
      get_call_or_error(calld).subchannel_call;
  if (subchannel_call == NULL) {
    return NULL;
  } else {
    return grpc_subchannel_call_get_peer(exec_ctx, subchannel_call);
  }
}
/** Return true if subchannel is available immediately (in which case
    subchannel_ready_locked() should not be called), or false otherwise (in
    which case subchannel_ready_locked() should be called when the subchannel
@@ -1069,6 +1050,44 @@ typedef struct {
  grpc_closure closure;
} pick_after_resolver_result_args;
// Note: This runs under the client_channel combiner, but will NOT be
// holding the call combiner.
static void pick_after_resolver_result_cancel_locked(grpc_exec_ctx *exec_ctx,
                                                     void *arg,
                                                     grpc_error *error) {
  grpc_call_element *elem = arg;
  channel_data *chand = elem->channel_data;
  call_data *calld = elem->call_data;
  // If we don't yet have a resolver result, then a closure for
  // pick_after_resolver_result_done_locked() will have been added to
  // chand->waiting_for_resolver_result_closures, and it may not be invoked
  // until after this call has been destroyed.  We mark the operation as
  // cancelled, so that when pick_after_resolver_result_done_locked()
  // is called, it will be a no-op.  We also immediately invoke
  // subchannel_ready_locked() to propagate the error back to the caller.
  for (grpc_closure *closure = chand->waiting_for_resolver_result_closures.head;
       closure != NULL; closure = closure->next_data.next) {
    pick_after_resolver_result_args *args = closure->cb_arg;
    if (!args->cancelled && args->elem == elem) {
      if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
        gpr_log(GPR_DEBUG,
                "chand=%p calld=%p: "
                "cancelling pick waiting for resolver result",
                chand, calld);
      }
      args->cancelled = true;
      // Note: Although we are not in the call combiner here, we are
      // basically stealing the call combiner from the pending pick, so
      // it's safe to call subchannel_ready_locked() here -- we are
      // essentially calling it here instead of calling it in
      // pick_after_resolver_result_done_locked().
      subchannel_ready_locked(exec_ctx, elem,
                              GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                                  "Pick cancelled", &error, 1));
    }
  }
}
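// Editor's sketch (not part of the patch): the notify-on-cancel handshake
// that pairs with the callback above.  Arming registers a cancellation
// closure with the call combiner; disarming (passing NULL) happens on the
// completion path, as in pick_after_resolver_result_done_locked() below.
// This helper is hypothetical.
static void example_arm_cancellation(grpc_exec_ctx *exec_ctx,
                                     grpc_call_element *elem, bool arm) {
  channel_data *chand = (channel_data *)elem->channel_data;
  call_data *calld = (call_data *)elem->call_data;
  grpc_call_combiner_set_notify_on_cancel(
      exec_ctx, calld->call_combiner,
      arm ? GRPC_CLOSURE_INIT(&calld->cancel_closure,
                              pick_after_resolver_result_cancel_locked, elem,
                              grpc_combiner_scheduler(chand->combiner))
          : NULL);
}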
static void pick_after_resolver_result_done_locked(grpc_exec_ctx *exec_ctx,
                                                   void *arg,
                                                   grpc_error *error) {
@@ -1079,21 +1098,24 @@ static void pick_after_resolver_result_done_locked(grpc_exec_ctx *exec_ctx,
      gpr_log(GPR_DEBUG, "call cancelled before resolver result");
    }
  } else {
    channel_data *chand = args->elem->channel_data;
    call_data *calld = args->elem->call_data;
    grpc_call_element *elem = args->elem;
    channel_data *chand = elem->channel_data;
    call_data *calld = elem->call_data;
    grpc_call_combiner_set_notify_on_cancel(exec_ctx, calld->call_combiner,
                                            NULL);
    if (error != GRPC_ERROR_NONE) {
      if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
        gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver failed to return data",
                chand, calld);
      }
      subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_REF(error));
      subchannel_ready_locked(exec_ctx, elem, GRPC_ERROR_REF(error));
    } else {
      if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
        gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver returned, doing pick",
                chand, calld);
      }
      if (pick_subchannel_locked(exec_ctx, args->elem)) {
        subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_NONE);
      if (pick_subchannel_locked(exec_ctx, elem)) {
        subchannel_ready_locked(exec_ctx, elem, GRPC_ERROR_NONE);
      }
    }
  }
@@ -1116,41 +1138,33 @@ static void pick_after_resolver_result_start_locked(grpc_exec_ctx *exec_ctx,
                    args, grpc_combiner_scheduler(chand->combiner));
  grpc_closure_list_append(&chand->waiting_for_resolver_result_closures,
                           &args->closure, GRPC_ERROR_NONE);
  grpc_call_combiner_set_notify_on_cancel(
      exec_ctx, calld->call_combiner,
      GRPC_CLOSURE_INIT(&calld->cancel_closure,
                        pick_after_resolver_result_cancel_locked, elem,
                        grpc_combiner_scheduler(chand->combiner)));
}
static void pick_after_resolver_result_cancel_locked(grpc_exec_ctx *exec_ctx,
                                                     grpc_call_element *elem,
                                                     grpc_error *error) {
// Note: This runs under the client_channel combiner, but will NOT be
// holding the call combiner.
static void pick_callback_cancel_locked(grpc_exec_ctx *exec_ctx, void *arg,
                                        grpc_error *error) {
  grpc_call_element *elem = arg;
  channel_data *chand = elem->channel_data;
  call_data *calld = elem->call_data;
  // If we don't yet have a resolver result, then a closure for
  // pick_after_resolver_result_done_locked() will have been added to
  // chand->waiting_for_resolver_result_closures, and it may not be invoked
  // until after this call has been destroyed.  We mark the operation as
  // cancelled, so that when pick_after_resolver_result_done_locked()
  // is called, it will be a no-op.  We also immediately invoke
  // subchannel_ready_locked() to propagate the error back to the caller.
  for (grpc_closure *closure = chand->waiting_for_resolver_result_closures.head;
       closure != NULL; closure = closure->next_data.next) {
    pick_after_resolver_result_args *args = closure->cb_arg;
    if (!args->cancelled && args->elem == elem) {
      if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
        gpr_log(GPR_DEBUG,
                "chand=%p calld=%p: "
                "cancelling pick waiting for resolver result",
                chand, calld);
      }
      args->cancelled = true;
      subchannel_ready_locked(exec_ctx, elem,
                              GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                                  "Pick cancelled", &error, 1));
  if (calld->lb_policy != NULL) {
    if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
      gpr_log(GPR_DEBUG, "chand=%p calld=%p: cancelling pick from LB policy %p",
              chand, calld, calld->lb_policy);
    }
    grpc_lb_policy_cancel_pick_locked(exec_ctx, calld->lb_policy,
                                      &calld->connected_subchannel,
                                      GRPC_ERROR_REF(error));
  }
  GRPC_ERROR_UNREF(error);
}
// Callback invoked by grpc_lb_policy_pick_locked() for async picks.
// Unrefs the LB policy after invoking subchannel_ready_locked().
// Unrefs the LB policy and invokes subchannel_ready_locked().
static void pick_callback_done_locked(grpc_exec_ctx *exec_ctx, void *arg,
                                      grpc_error *error) {
  grpc_call_element *elem = arg;
@@ -1160,6 +1174,7 @@ static void pick_callback_done_locked(grpc_exec_ctx *exec_ctx, void *arg,
    gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed asynchronously",
            chand, calld);
  }
  grpc_call_combiner_set_notify_on_cancel(exec_ctx, calld->call_combiner, NULL);
  GPR_ASSERT(calld->lb_policy != NULL);
  GRPC_LB_POLICY_UNREF(exec_ctx, calld->lb_policy, "pick_subchannel");
  calld->lb_policy = NULL;
@@ -1194,24 +1209,15 @@ static bool pick_callback_start_locked(grpc_exec_ctx *exec_ctx,
    }
    GRPC_LB_POLICY_UNREF(exec_ctx, calld->lb_policy, "pick_subchannel");
    calld->lb_policy = NULL;
  } else {
    grpc_call_combiner_set_notify_on_cancel(
        exec_ctx, calld->call_combiner,
        GRPC_CLOSURE_INIT(&calld->cancel_closure, pick_callback_cancel_locked,
                          elem, grpc_combiner_scheduler(chand->combiner)));
  }
  return pick_done;
}
static void pick_callback_cancel_locked(grpc_exec_ctx *exec_ctx,
                                        grpc_call_element *elem,
                                        grpc_error *error) {
  channel_data *chand = elem->channel_data;
  call_data *calld = elem->call_data;
  GPR_ASSERT(calld->lb_policy != NULL);
  if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
    gpr_log(GPR_DEBUG, "chand=%p calld=%p: cancelling pick from LB policy %p",
            chand, calld, calld->lb_policy);
  }
  grpc_lb_policy_cancel_pick_locked(exec_ctx, calld->lb_policy,
                                    &calld->connected_subchannel, error);
}
static bool pick_subchannel_locked(grpc_exec_ctx *exec_ctx,
                                   grpc_call_element *elem) {
  GPR_TIMER_BEGIN("pick_subchannel", 0);
@@ -1224,7 +1230,7 @@ static bool pick_subchannel_locked(grpc_exec_ctx *exec_ctx,
// Otherwise, if the service config specified a value for this
// method, use that.
  uint32_t initial_metadata_flags =
      calld->initial_metadata_payload->send_initial_metadata
      calld->initial_metadata_batch->payload->send_initial_metadata
          .send_initial_metadata_flags;
  const bool wait_for_ready_set_from_api =
      initial_metadata_flags &
@@ -1241,7 +1247,7 @@ static bool pick_subchannel_locked(grpc_exec_ctx *exec_ctx,
    }
  }
  const grpc_lb_policy_pick_args inputs = {
      calld->initial_metadata_payload->send_initial_metadata
      calld->initial_metadata_batch->payload->send_initial_metadata
          .send_initial_metadata,
      initial_metadata_flags, &calld->lb_token_mdelem};
  pick_done = pick_callback_start_locked(exec_ctx, elem, &inputs);
@@ -1258,91 +1264,33 @@ static bool pick_subchannel_locked(grpc_exec_ctx *exec_ctx,
  return pick_done;
}
static void start_transport_stream_op_batch_locked(grpc_exec_ctx *exec_ctx,
                                                   void *arg,
                                                   grpc_error *error_ignored) {
  GPR_TIMER_BEGIN("start_transport_stream_op_batch_locked", 0);
  grpc_transport_stream_op_batch *batch = arg;
  grpc_call_element *elem = batch->handler_private.extra_arg;
  call_data *calld = elem->call_data;
  channel_data *chand = elem->channel_data;
  /* need to recheck that another thread hasn't set the call */
  call_or_error coe = get_call_or_error(calld);
  if (coe.error != GRPC_ERROR_NONE) {
    if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
      gpr_log(GPR_DEBUG, "chand=%p calld=%p: failing batch with error: %s",
              chand, calld, grpc_error_string(coe.error));
    }
    grpc_transport_stream_op_batch_finish_with_failure(
        exec_ctx, batch, GRPC_ERROR_REF(coe.error));
    goto done;
  }
  if (coe.subchannel_call != NULL) {
    if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
      gpr_log(GPR_DEBUG,
              "chand=%p calld=%p: sending batch to subchannel_call=%p", chand,
              calld, coe.subchannel_call);
    }
    grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, batch);
    goto done;
  }
  // Add to waiting-for-pick list.  If we succeed in getting a
  // subchannel call below, we'll handle this batch (along with any
  // other waiting batches) in waiting_for_pick_batches_resume_locked().
  waiting_for_pick_batches_add_locked(calld, batch);
  // If this is a cancellation, cancel the pending pick (if any) and
  // fail any pending batches.
  if (batch->cancel_stream) {
    grpc_error *error = batch->payload->cancel_stream.cancel_error;
    if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
      gpr_log(GPR_DEBUG, "chand=%p calld=%p: recording cancel_error=%s", chand,
              calld, grpc_error_string(error));
    }
    /* Stash a copy of cancel_error in our call data, so that we can use
       it for subsequent operations.  This ensures that if the call is
       cancelled before any batches are passed down (e.g., if the deadline
       is in the past when the call starts), we can return the right
       error to the caller when the first batch does get passed down. */
    set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(error)});
    if (calld->lb_policy != NULL) {
      pick_callback_cancel_locked(exec_ctx, elem, GRPC_ERROR_REF(error));
static void start_pick_locked(grpc_exec_ctx *exec_ctx, void *arg,
                              grpc_error *error_ignored) {
  GPR_TIMER_BEGIN("start_pick_locked", 0);
  grpc_call_element *elem = (grpc_call_element *)arg;
  call_data *calld = (call_data *)elem->call_data;
  channel_data *chand = (channel_data *)elem->channel_data;
  GPR_ASSERT(calld->connected_subchannel == NULL);
  if (pick_subchannel_locked(exec_ctx, elem)) {
    // Pick was returned synchronously.
    if (calld->connected_subchannel == NULL) {
      GRPC_ERROR_UNREF(calld->error);
      calld->error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
          "Call dropped by load balancing policy");
      waiting_for_pick_batches_fail(exec_ctx, elem,
                                    GRPC_ERROR_REF(calld->error));
    } else {
      pick_after_resolver_result_cancel_locked(exec_ctx, elem,
                                               GRPC_ERROR_REF(error));
    }
    waiting_for_pick_batches_fail_locked(exec_ctx, elem, GRPC_ERROR_REF(error));
    goto done;
  }
  /* if we don't have a subchannel, try to get one */
  if (batch->send_initial_metadata) {
    GPR_ASSERT(calld->connected_subchannel == NULL);
    calld->initial_metadata_payload = batch->payload;
    GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel");
    /* If a subchannel is not available immediately, the polling entity from
       call_data should be provided to channel_data's interested_parties, so
       that IO of the lb_policy and resolver could be done under it. */
    if (pick_subchannel_locked(exec_ctx, elem)) {
      // Pick was returned synchronously.
      GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
      if (calld->connected_subchannel == NULL) {
        grpc_error *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
            "Call dropped by load balancing policy");
        set_call_or_error(calld,
                          (call_or_error){.error = GRPC_ERROR_REF(error)});
        waiting_for_pick_batches_fail_locked(exec_ctx, elem, error);
      } else {
        // Create subchannel call.
        create_subchannel_call_locked(exec_ctx, elem, GRPC_ERROR_NONE);
      }
    } else {
      grpc_polling_entity_add_to_pollset_set(exec_ctx, calld->pollent,
                                             chand->interested_parties);
      // Create subchannel call.
      create_subchannel_call_locked(exec_ctx, elem, GRPC_ERROR_NONE);
    }
  } else {
    // Pick will be done asynchronously.  Add the call's polling entity to
    // the channel's interested_parties, so that I/O for the resolver
    // and LB policy can be done under it.
    grpc_polling_entity_add_to_pollset_set(exec_ctx, calld->pollent,
                                           chand->interested_parties);
  }
done:
  GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call,
                        "start_transport_stream_op_batch");
  GPR_TIMER_END("start_transport_stream_op_batch_locked", 0);
  GPR_TIMER_END("start_pick_locked", 0);
}
static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
@@ -1365,27 +1313,49 @@ static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
                     GRPC_ERROR_REF(error));
}
/* The logic here is fairly complicated, due to (a) the fact that we
   need to handle the case where we receive the send op before the
   initial metadata op, and (b) the need for efficiency, especially in
   the streaming case.
   We use double-checked locking to initially see if initialization has been
   performed.  If it has not, we acquire the combiner and perform
   initialization.  If it has, we proceed on the fast path. */
static void cc_start_transport_stream_op_batch(
    grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
    grpc_transport_stream_op_batch *batch) {
  call_data *calld = elem->call_data;
  channel_data *chand = elem->channel_data;
  if (GRPC_TRACER_ON(grpc_client_channel_trace) ||
      GRPC_TRACER_ON(grpc_trace_channel)) {
    grpc_call_log_op(GPR_INFO, elem, batch);
  }
  if (chand->deadline_checking_enabled) {
    grpc_deadline_state_client_start_transport_stream_op_batch(exec_ctx, elem,
                                                               batch);
  }
  GPR_TIMER_BEGIN("cc_start_transport_stream_op_batch", 0);
  // If we've previously been cancelled, immediately fail any new batches.
  if (calld->error != GRPC_ERROR_NONE) {
    if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
      gpr_log(GPR_DEBUG, "chand=%p calld=%p: failing batch with error: %s",
              chand, calld, grpc_error_string(calld->error));
    }
    grpc_transport_stream_op_batch_finish_with_failure(
        exec_ctx, batch, GRPC_ERROR_REF(calld->error), calld->call_combiner);
    goto done;
  }
  if (batch->cancel_stream) {
    // Stash a copy of cancel_error in our call data, so that we can use
    // it for subsequent operations.  This ensures that if the call is
    // cancelled before any batches are passed down (e.g., if the deadline
    // is in the past when the call starts), we can return the right
    // error to the caller when the first batch does get passed down.
    GRPC_ERROR_UNREF(calld->error);
    calld->error = GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
    if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
      gpr_log(GPR_DEBUG, "chand=%p calld=%p: recording cancel_error=%s", chand,
              calld, grpc_error_string(calld->error));
    }
    // If we have a subchannel call, send the cancellation batch down.
    // Otherwise, fail all pending batches.
    if (calld->subchannel_call != NULL) {
      grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call, batch);
    } else {
      waiting_for_pick_batches_add(calld, batch);
      waiting_for_pick_batches_fail(exec_ctx, elem,
                                    GRPC_ERROR_REF(calld->error));
    }
    goto done;
  }
  // Intercept on_complete for recv_trailing_metadata so that we can
  // check retry throttle status.
  if (batch->recv_trailing_metadata) {
@@ -1395,38 +1365,43 @@ static void cc_start_transport_stream_op_batch(
                      grpc_schedule_on_exec_ctx);
    batch->on_complete = &calld->on_complete;
  }
  /* try to (atomically) get the call */
  call_or_error coe = get_call_or_error(calld);
  GPR_TIMER_BEGIN("cc_start_transport_stream_op_batch", 0);
  if (coe.error != GRPC_ERROR_NONE) {
  // Check if we've already gotten a subchannel call.
  // Note that once we have completed the pick, we do not need to enter
  // the channel combiner, which is more efficient (especially for
  // streaming calls).
  if (calld->subchannel_call != NULL) {
    if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
      gpr_log(GPR_DEBUG, "chand=%p calld=%p: failing batch with error: %s",
              chand, calld, grpc_error_string(coe.error));
      gpr_log(GPR_DEBUG,
              "chand=%p calld=%p: sending batch to subchannel_call=%p", chand,
              calld, calld->subchannel_call);
    }
    grpc_transport_stream_op_batch_finish_with_failure(
        exec_ctx, batch, GRPC_ERROR_REF(coe.error));
    grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call, batch);
    goto done;
  }
  if (coe.subchannel_call != NULL) {
  // We do not yet have a subchannel call.
  // Add the batch to the waiting-for-pick list.
  waiting_for_pick_batches_add(calld, batch);
  // For batches containing a send_initial_metadata op, enter the channel
  // combiner to start a pick.
  if (batch->send_initial_metadata) {
    if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
      gpr_log(GPR_DEBUG, "chand=%p calld=%p: entering combiner", chand, calld);
    }
    GRPC_CLOSURE_SCHED(
        exec_ctx,
        GRPC_CLOSURE_INIT(&batch->handler_private.closure, start_pick_locked,
                          elem, grpc_combiner_scheduler(chand->combiner)),
        GRPC_ERROR_NONE);
  } else {
    // For all other batches, release the call combiner.
    if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
      gpr_log(GPR_DEBUG,
              "chand=%p calld=%p: sending batch to subchannel_call=%p", chand,
              calld, coe.subchannel_call);
              "chand=%p calld=%p: saved batch, yielding call combiner", chand,
              calld);
    }
    grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, batch);
    goto done;
  }
  /* we failed; lock and figure out what to do */
  if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
    gpr_log(GPR_DEBUG, "chand=%p calld=%p: entering combiner", chand, calld);
    GRPC_CALL_COMBINER_STOP(exec_ctx, calld->call_combiner,
                            "batch does not include send_initial_metadata");
  }
  GRPC_CALL_STACK_REF(calld->owning_call, "start_transport_stream_op_batch");
  batch->handler_private.extra_arg = elem;
  GRPC_CLOSURE_SCHED(
      exec_ctx, GRPC_CLOSURE_INIT(&batch->handler_private.closure,
                                  start_transport_stream_op_batch_locked, batch,
                                  grpc_combiner_scheduler(chand->combiner)),
      GRPC_ERROR_NONE);
done:
  GPR_TIMER_END("cc_start_transport_stream_op_batch", 0);
}
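// Editor's sketch (not part of the patch): the dispatch order implemented by
// cc_start_transport_stream_op_batch() above, written out as a hypothetical
// routing helper for readability.  Once the pick has completed, batches take
// the subchannel-call branch without entering the channel combiner, which is
// the fast path for streaming calls.
static const char *example_batch_route(call_data *calld,
                                       grpc_transport_stream_op_batch *batch) {
  if (calld->error != GRPC_ERROR_NONE) return "fail batch immediately";
  if (batch->cancel_stream) return "record error; forward or fail pending";
  if (calld->subchannel_call != NULL) return "send to subchannel call";
  if (batch->send_initial_metadata) return "queue batch; start pick in combiner";
  return "queue batch; yield call combiner";
}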
@@ -1441,10 +1416,11 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx,
  calld->path = grpc_slice_ref_internal(args->path);
  calld->call_start_time = args->start_time;
  calld->deadline = gpr_convert_clock_type(args->deadline, GPR_CLOCK_MONOTONIC);
  calld->owning_call = args->call_stack;
  calld->arena = args->arena;
  calld->call_combiner = args->call_combiner;
  if (chand->deadline_checking_enabled) {
    grpc_deadline_state_init(exec_ctx, elem, args->call_stack, calld->deadline);
    grpc_deadline_state_init(exec_ctx, elem, args->call_stack,
                             args->call_combiner, calld->deadline);
  }
  return GRPC_ERROR_NONE;
}
@@ -1463,13 +1439,12 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
  if (calld->method_params != NULL) {
    method_parameters_unref(calld->method_params);
  }
  call_or_error coe = get_call_or_error(calld);
  GRPC_ERROR_UNREF(coe.error);
  if (coe.subchannel_call != NULL) {
    grpc_subchannel_call_set_cleanup_closure(coe.subchannel_call,
  GRPC_ERROR_UNREF(calld->error);
  if (calld->subchannel_call != NULL) {
    grpc_subchannel_call_set_cleanup_closure(calld->subchannel_call,
                                             then_schedule_closure);
    then_schedule_closure = NULL;
    GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, coe.subchannel_call,
    GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, calld->subchannel_call,
                               "client_channel_destroy_call");
  }
  GPR_ASSERT(calld->lb_policy == NULL);
@@ -1508,7 +1483,6 @@ const grpc_channel_filter grpc_client_channel_filter = {
    sizeof(channel_data),
    cc_init_channel_elem,
    cc_destroy_channel_elem,
    cc_get_peer,
    cc_get_channel_info,
    "client-channel",
};