@ -139,8 +139,7 @@ struct grpc_subchannel_call {
# define CALLSTACK_TO_SUBCHANNEL_CALL(callstack) \
( ( ( grpc_subchannel_call * ) ( callstack ) ) - 1 )
static void subchannel_connected ( grpc_exec_ctx * exec_ctx , void * subchannel ,
grpc_error * error ) ;
static void subchannel_connected ( void * subchannel , grpc_error * error ) ;
# ifndef NDEBUG
# define REF_REASON reason
@ -157,10 +156,9 @@ static void subchannel_connected(grpc_exec_ctx* exec_ctx, void* subchannel,
* connection implementation
*/
static void connection_destroy ( grpc_exec_ctx * exec_ctx , void * arg ,
grpc_error * error ) {
static void connection_destroy ( void * arg , grpc_error * error ) {
grpc_connected_subchannel * c = ( grpc_connected_subchannel * ) arg ;
grpc_channel_stack_destroy ( exec_ctx , CHANNEL_STACK_FROM_CONNECTION ( c ) ) ;
grpc_channel_stack_destroy ( CHANNEL_STACK_FROM_CONNECTION ( c ) ) ;
gpr_free ( c ) ;
}
@ -170,26 +168,23 @@ grpc_connected_subchannel* grpc_connected_subchannel_ref(
return c ;
}
void grpc_connected_subchannel_unref ( grpc_exec_ctx * exec_ctx ,
grpc_connected_subchannel * c
GRPC_SUBCHANNEL_REF_EXTRA_ARGS ) {
GRPC_CHANNEL_STACK_UNREF ( exec_ctx , CHANNEL_STACK_FROM_CONNECTION ( c ) ,
REF_REASON ) ;
void grpc_connected_subchannel_unref (
grpc_connected_subchannel * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS ) {
GRPC_CHANNEL_STACK_UNREF ( CHANNEL_STACK_FROM_CONNECTION ( c ) , REF_REASON ) ;
}
/*
* grpc_subchannel implementation
*/
static void subchannel_destroy ( grpc_exec_ctx * exec_ctx , void * arg ,
grpc_error * error ) {
static void subchannel_destroy ( void * arg , grpc_error * error ) {
grpc_subchannel * c = ( grpc_subchannel * ) arg ;
gpr_free ( ( void * ) c - > filters ) ;
grpc_channel_args_destroy ( exec_ctx , c - > args ) ;
grpc_connectivity_state_destroy ( exec_ctx , & c - > state_tracker ) ;
grpc_connector_unref ( exec_ctx , c - > connector ) ;
grpc_pollset_set_destroy ( exec_ctx , c - > pollset_set ) ;
grpc_subchannel_key_destroy ( exec_ctx , c - > key ) ;
grpc_channel_args_destroy ( c - > args ) ;
grpc_connectivity_state_destroy ( & c - > state_tracker ) ;
grpc_connector_unref ( c - > connector ) ;
grpc_pollset_set_destroy ( c - > pollset_set ) ;
grpc_subchannel_key_destroy ( c - > key ) ;
gpr_mu_destroy ( & c - > mu ) ;
gpr_free ( c ) ;
}
@ -241,59 +236,54 @@ grpc_subchannel* grpc_subchannel_ref_from_weak_ref(
}
}
static void disconnect ( grpc_exec_ctx * exec_ctx , grpc_ subchannel * c ) {
static void disconnect ( grpc_subchannel * c ) {
grpc_connected_subchannel * con ;
grpc_subchannel_index_unregister ( exec_ctx , c - > key , c ) ;
grpc_subchannel_index_unregister ( c - > key , c ) ;
gpr_mu_lock ( & c - > mu ) ;
GPR_ASSERT ( ! c - > disconnected ) ;
c - > disconnected = true ;
grpc_connector_shutdown (
exec_ctx , c - > connector ,
GRPC_ERROR_CREATE_FROM_STATIC_STRING ( " Subchannel disconnected " ) ) ;
grpc_connector_shutdown ( c - > connector , GRPC_ERROR_CREATE_FROM_STATIC_STRING (
" Subchannel disconnected " ) ) ;
con = GET_CONNECTED_SUBCHANNEL ( c , no_barrier ) ;
if ( con ! = nullptr ) {
GRPC_CONNECTED_SUBCHANNEL_UNREF ( exec_ctx , con , " connection " ) ;
GRPC_CONNECTED_SUBCHANNEL_UNREF ( con , " connection " ) ;
gpr_atm_no_barrier_store ( & c - > connected_subchannel , ( gpr_atm ) 0xdeadbeef ) ;
}
gpr_mu_unlock ( & c - > mu ) ;
}
void grpc_subchannel_unref ( grpc_exec_ctx * exec_ctx ,
grpc_subchannel * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS ) {
void grpc_subchannel_unref ( grpc_subchannel * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS ) {
gpr_atm old_refs ;
// add a weak ref and subtract a strong ref (atomically)
old_refs = ref_mutate ( c , ( gpr_atm ) 1 - ( gpr_atm ) ( 1 < < INTERNAL_REF_BITS ) ,
1 REF_MUTATE_PURPOSE ( " STRONG_UNREF " ) ) ;
if ( ( old_refs & STRONG_REF_MASK ) = = ( 1 < < INTERNAL_REF_BITS ) ) {
disconnect ( exec_ctx , c ) ;
disconnect ( c ) ;
}
GRPC_SUBCHANNEL_WEAK_UNREF ( exec_ctx , c , " strong-unref " ) ;
GRPC_SUBCHANNEL_WEAK_UNREF ( c , " strong-unref " ) ;
}
void grpc_subchannel_weak_unref ( grpc_exec_ctx * exec_ctx ,
grpc_subchannel * c
GRPC_SUBCHANNEL_REF_EXTRA_ARGS ) {
void grpc_subchannel_weak_unref (
grpc_subchannel * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS ) {
gpr_atm old_refs ;
old_refs = ref_mutate ( c , - ( gpr_atm ) 1 , 1 REF_MUTATE_PURPOSE ( " WEAK_UNREF " ) ) ;
if ( old_refs = = 1 ) {
GRPC_CLOSURE_SCHED (
exec_ctx ,
GRPC_CLOSURE_CREATE ( subchannel_destroy , c , grpc_schedule_on_exec_ctx ) ,
GRPC_ERROR_NONE ) ;
}
}
grpc_subchannel * grpc_subchannel_create ( grpc_exec_ctx * exec_ctx ,
grpc_connector * connector ,
grpc_subchannel * grpc_subchannel_create ( grpc_connector * connector ,
const grpc_subchannel_args * args ) {
grpc_subchannel_key * key = grpc_subchannel_key_create ( args ) ;
grpc_subchannel * c = grpc_subchannel_index_find ( exec_ctx , key ) ;
grpc_subchannel * c = grpc_subchannel_index_find ( key ) ;
if ( c ) {
grpc_subchannel_key_destroy ( exec_ctx , key ) ;
grpc_subchannel_key_destroy ( key ) ;
return c ;
}
GRPC_STATS_INC_CLIENT_SUBCHANNELS_CREATED ( exec_ctx ) ;
GRPC_STATS_INC_CLIENT_SUBCHANNELS_CREATED ( ) ;
c = ( grpc_subchannel * ) gpr_zalloc ( sizeof ( * c ) ) ;
c - > key = key ;
gpr_atm_no_barrier_store ( & c - > ref_pair , 1 < < INTERNAL_REF_BITS ) ;
@ -311,10 +301,10 @@ grpc_subchannel* grpc_subchannel_create(grpc_exec_ctx* exec_ctx,
c - > pollset_set = grpc_pollset_set_create ( ) ;
grpc_resolved_address * addr =
( grpc_resolved_address * ) gpr_malloc ( sizeof ( * addr ) ) ;
grpc_get_subchannel_address_arg ( exec_ctx , args - > args , addr ) ;
grpc_get_subchannel_address_arg ( args - > args , addr ) ;
grpc_resolved_address * new_address = nullptr ;
grpc_channel_args * new_args = nullptr ;
if ( grpc_proxy_mappers_map_address ( exec_ctx , addr , args - > args , & new_address ,
if ( grpc_proxy_mappers_map_address ( addr , args - > args , & new_address ,
& new_args ) ) {
GPR_ASSERT ( new_address ! = nullptr ) ;
gpr_free ( addr ) ;
@ -327,7 +317,7 @@ grpc_subchannel* grpc_subchannel_create(grpc_exec_ctx* exec_ctx,
new_args ! = nullptr ? new_args : args - > args , keys_to_remove ,
GPR_ARRAY_SIZE ( keys_to_remove ) , & new_arg , 1 ) ;
gpr_free ( new_arg . value . string ) ;
if ( new_args ! = nullptr ) grpc_channel_args_destroy ( exec_ctx , new_args ) ;
if ( new_args ! = nullptr ) grpc_channel_args_destroy ( new_args ) ;
c - > root_external_state_watcher . next = c - > root_external_state_watcher . prev =
& c - > root_external_state_watcher ;
GRPC_CLOSURE_INIT ( & c - > connected , subchannel_connected , c ,
@ -373,21 +363,19 @@ grpc_subchannel* grpc_subchannel_create(grpc_exec_ctx* exec_ctx,
min_backoff_ms , max_backoff_ms ) ;
gpr_mu_init ( & c - > mu ) ;
return grpc_subchannel_index_register ( exec_ctx , key , c ) ;
return grpc_subchannel_index_register ( key , c ) ;
}
static void continue_connect_locked ( grpc_exec_ctx * exec_ctx ,
grpc_subchannel * c ) {
static void continue_connect_locked ( grpc_subchannel * c ) {
grpc_connect_in_args args ;
args . interested_parties = c - > pollset_set ;
args . deadline = c - > backoff_result . current_deadline ;
args . channel_args = c - > args ;
grpc_connectivity_state_set ( exec_ctx , & c - > state_tracker ,
GRPC_CHANNEL_CONNECTING , GRPC_ERROR_NONE ,
" state_change " ) ;
grpc_connector_connect ( exec_ctx , c - > connector , & args , & c - > connecting_result ,
grpc_connectivity_state_set ( & c - > state_tracker , GRPC_CHANNEL_CONNECTING ,
GRPC_ERROR_NONE , " state_change " ) ;
grpc_connector_connect ( c - > connector , & args , & c - > connecting_result ,
& c - > connected ) ;
}
@ -400,24 +388,23 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel* c,
return state ;
}
static void on_external_state_watcher_done ( grpc_exec_ctx * exec_ctx , void * arg ,
grpc_error * error ) {
static void on_external_state_watcher_done ( void * arg , grpc_error * error ) {
external_state_watcher * w = ( external_state_watcher * ) arg ;
grpc_closure * follow_up = w - > notify ;
if ( w - > pollset_set ! = nullptr ) {
grpc_pollset_set_del_pollset_set ( exec_ctx , w - > subchannel - > pollset_set ,
grpc_pollset_set_del_pollset_set ( w - > subchannel - > pollset_set ,
w - > pollset_set ) ;
}
gpr_mu_lock ( & w - > subchannel - > mu ) ;
w - > next - > prev = w - > prev ;
w - > prev - > next = w - > next ;
gpr_mu_unlock ( & w - > subchannel - > mu ) ;
GRPC_SUBCHANNEL_WEAK_UNREF ( exec_ctx , w - > subchannel , " external_state_watcher " ) ;
GRPC_SUBCHANNEL_WEAK_UNREF ( w - > subchannel , " external_state_watcher " ) ;
gpr_free ( w ) ;
GRPC_CLOSURE_RUN ( exec_ctx , follow_up , GRPC_ERROR_REF ( error ) ) ;
GRPC_CLOSURE_RUN ( follow_up , GRPC_ERROR_REF ( error ) ) ;
}
static void on_alarm ( grpc_exec_ctx * exec_ctx , void * arg , grpc_error * error ) {
static void on_alarm ( void * arg , grpc_error * error ) {
grpc_subchannel * c = ( grpc_subchannel * ) arg ;
gpr_mu_lock ( & c - > mu ) ;
c - > have_alarm = false ;
@ -429,18 +416,17 @@ static void on_alarm(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
}
if ( error = = GRPC_ERROR_NONE ) {
gpr_log ( GPR_INFO , " Failed to connect to channel, retrying " ) ;
c - > backoff_result = grpc_backoff_step ( exec_ctx , & c - > backoff_state ) ;
continue_connect_locked ( exec_ctx , c ) ;
c - > backoff_result = grpc_backoff_step ( & c - > backoff_state ) ;
continue_connect_locked ( c ) ;
gpr_mu_unlock ( & c - > mu ) ;
} else {
gpr_mu_unlock ( & c - > mu ) ;
GRPC_SUBCHANNEL_WEAK_UNREF ( exec_ctx , c , " connecting " ) ;
GRPC_SUBCHANNEL_WEAK_UNREF ( c , " connecting " ) ;
}
GRPC_ERROR_UNREF ( error ) ;
}
static void maybe_start_connecting_locked ( grpc_exec_ctx * exec_ctx ,
grpc_subchannel * c ) {
static void maybe_start_connecting_locked ( grpc_subchannel * c ) {
if ( c - > disconnected ) {
/* Don't try to connect if we're already disconnected */
return ;
@ -466,28 +452,28 @@ static void maybe_start_connecting_locked(grpc_exec_ctx* exec_ctx,
if ( ! c - > backoff_begun ) {
c - > backoff_begun = true ;
c - > backoff_result = grpc_backoff_begin ( exec_ctx , & c - > backoff_state ) ;
continue_connect_locked ( exec_ctx , c ) ;
c - > backoff_result = grpc_backoff_begin ( & c - > backoff_state ) ;
continue_connect_locked ( c ) ;
} else {
GPR_ASSERT ( ! c - > have_alarm ) ;
c - > have_alarm = true ;
const grpc_millis time_til_next =
c - > backoff_result . next_attempt_start_time - grpc_exec_ctx_now ( exec_ctx ) ;
c - > backoff_result . next_attempt_start_time -
grpc_core : : ExecCtx : : Get ( ) - > Now ( ) ;
if ( time_til_next < = 0 ) {
gpr_log ( GPR_INFO , " Retry immediately " ) ;
} else {
gpr_log ( GPR_INFO , " Retry in % " PRIdPTR " milliseconds " , time_til_next ) ;
}
GRPC_CLOSURE_INIT ( & c - > on_alarm , on_alarm , c , grpc_schedule_on_exec_ctx ) ;
grpc_timer_init ( exec_ctx , & c - > alarm ,
c - > backoff_result . next_attempt_start_time , & c - > on_alarm ) ;
grpc_timer_init ( & c - > alarm , c - > backoff_result . next_attempt_start_time ,
& c - > on_alarm ) ;
}
}
void grpc_subchannel_notify_on_state_change (
grpc_exec_ctx * exec_ctx , grpc_subchannel * c ,
grpc_pollset_set * interested_parties , grpc_connectivity_state * state ,
grpc_closure * notify ) {
grpc_subchannel * c , grpc_pollset_set * interested_parties ,
grpc_connectivity_state * state , grpc_closure * notify ) {
external_state_watcher * w ;
if ( state = = nullptr ) {
@ -495,8 +481,8 @@ void grpc_subchannel_notify_on_state_change(
for ( w = c - > root_external_state_watcher . next ;
w ! = & c - > root_external_state_watcher ; w = w - > next ) {
if ( w - > notify = = notify ) {
grpc_connectivity_state_notify_on_state_change (
exec_ctx , & c - > state_tracker , nullptr , & w - > closure ) ;
grpc_connectivity_state_notify_on_state_change ( & c - > state_tracker ,
nullptr , & w - > closure ) ;
}
}
gpr_mu_unlock ( & c - > mu ) ;
@ -508,31 +494,28 @@ void grpc_subchannel_notify_on_state_change(
GRPC_CLOSURE_INIT ( & w - > closure , on_external_state_watcher_done , w ,
grpc_schedule_on_exec_ctx ) ;
if ( interested_parties ! = nullptr ) {
grpc_pollset_set_add_pollset_set ( exec_ctx , c - > pollset_set ,
interested_parties ) ;
grpc_pollset_set_add_pollset_set ( c - > pollset_set , interested_parties ) ;
}
GRPC_SUBCHANNEL_WEAK_REF ( c , " external_state_watcher " ) ;
gpr_mu_lock ( & c - > mu ) ;
w - > next = & c - > root_external_state_watcher ;
w - > prev = w - > next - > prev ;
w - > next - > prev = w - > prev - > next = w ;
grpc_connectivity_state_notify_on_state_change ( exec_ctx , & c - > state_tracker ,
state , & w - > closure ) ;
maybe_start_connecting_locked ( exec_ctx , c ) ;
grpc_connectivity_state_notify_on_state_change ( & c - > state_tracker , state ,
& w - > closure ) ;
maybe_start_connecting_locked ( c ) ;
gpr_mu_unlock ( & c - > mu ) ;
}
}
void grpc_connected_subchannel_process_transport_op (
grpc_exec_ctx * exec_ctx , grpc_connected_subchannel * con ,
grpc_transport_op * op ) {
grpc_connected_subchannel * con , grpc_transport_op * op ) {
grpc_channel_stack * channel_stack = CHANNEL_STACK_FROM_CONNECTION ( con ) ;
grpc_channel_element * top_elem = grpc_channel_stack_element ( channel_stack , 0 ) ;
top_elem - > filter - > start_transport_op ( exec_ctx , top_elem , op ) ;
top_elem - > filter - > start_transport_op ( top_elem , op ) ;
}
static void subchannel_on_child_state_changed ( grpc_exec_ctx * exec_ctx , void * p ,
grpc_error * error ) {
static void subchannel_on_child_state_changed ( void * p , grpc_error * error ) {
state_watcher * sw = ( state_watcher * ) p ;
grpc_subchannel * c = sw - > subchannel ;
gpr_mu * mu = & c - > mu ;
@ -544,24 +527,22 @@ static void subchannel_on_child_state_changed(grpc_exec_ctx* exec_ctx, void* p,
/* any errors on a subchannel ==> we're done, create a new one */
sw - > connectivity_state = GRPC_CHANNEL_SHUTDOWN ;
}
grpc_connectivity_state_set ( exec_ctx , & c - > state_tracker ,
sw - > connectivity_state , GRPC_ERROR_REF ( error ) ,
" reflect_child " ) ;
grpc_connectivity_state_set ( & c - > state_tracker , sw - > connectivity_state ,
GRPC_ERROR_REF ( error ) , " reflect_child " ) ;
if ( sw - > connectivity_state ! = GRPC_CHANNEL_SHUTDOWN ) {
grpc_connected_subchannel_notify_on_state_change (
exec_ctx , GET_CONNECTED_SUBCHANNEL ( c , no_barrier ) , nullptr ,
GET_CONNECTED_SUBCHANNEL ( c , no_barrier ) , nullptr ,
& sw - > connectivity_state , & sw - > closure ) ;
GRPC_SUBCHANNEL_WEAK_REF ( c , " state_watcher " ) ;
sw = nullptr ;
}
gpr_mu_unlock ( mu ) ;
GRPC_SUBCHANNEL_WEAK_UNREF ( exec_ctx , c , " state_watcher " ) ;
GRPC_SUBCHANNEL_WEAK_UNREF ( c , " state_watcher " ) ;
gpr_free ( sw ) ;
}
static void connected_subchannel_state_op ( grpc_exec_ctx * exec_ctx ,
grpc_connected_subchannel * con ,
static void connected_subchannel_state_op ( grpc_connected_subchannel * con ,
grpc_pollset_set * interested_parties ,
grpc_connectivity_state * state ,
grpc_closure * closure ) {
@ -571,29 +552,27 @@ static void connected_subchannel_state_op(grpc_exec_ctx* exec_ctx,
op - > on_connectivity_state_change = closure ;
op - > bind_pollset_set = interested_parties ;
elem = grpc_channel_stack_element ( CHANNEL_STACK_FROM_CONNECTION ( con ) , 0 ) ;
elem - > filter - > start_transport_op ( exec_ctx , e lem , op ) ;
elem - > filter - > start_transport_op ( elem , op ) ;
}
void grpc_connected_subchannel_notify_on_state_change (
grpc_exec_ctx * exec_ctx , grpc_connected_subchannel * con ,
grpc_pollset_set * interested_parties , grpc_connectivity_state * state ,
grpc_closure * closure ) {
connected_subchannel_state_op ( exec_ctx , con , interested_parties , state ,
closure ) ;
grpc_connected_subchannel * con , grpc_pollset_set * interested_parties ,
grpc_connectivity_state * state , grpc_closure * closure ) {
connected_subchannel_state_op ( con , interested_parties , state , closure ) ;
}
void grpc_connected_subchannel_ping ( grpc_exec_ctx * exec_ctx ,
grpc_connected_subchannel * con ,
grpc_closure * closure ) {
void grpc_connected_subchannel_ping ( grpc_connected_subchannel * con ,
grpc_closure * on_initiate ,
grpc_closure * on_ack ) {
grpc_transport_op * op = grpc_make_transport_op ( nullptr ) ;
grpc_channel_element * elem ;
op - > send_ping = closure ;
op - > send_ping . on_initiate = on_initiate ;
op - > send_ping . on_ack = on_ack ;
elem = grpc_channel_stack_element ( CHANNEL_STACK_FROM_CONNECTION ( con ) , 0 ) ;
elem - > filter - > start_transport_op ( exec_ctx , e lem , op ) ;
elem - > filter - > start_transport_op ( elem , op ) ;
}
static bool publish_transport_locked ( grpc_exec_ctx * exec_ctx ,
grpc_subchannel * c ) {
static bool publish_transport_locked ( grpc_subchannel * c ) {
grpc_connected_subchannel * con ;
grpc_channel_stack * stk ;
state_watcher * sw_subchannel ;
@ -601,19 +580,18 @@ static bool publish_transport_locked(grpc_exec_ctx* exec_ctx,
/* construct channel stack */
grpc_channel_stack_builder * builder = grpc_channel_stack_builder_create ( ) ;
grpc_channel_stack_builder_set_channel_arguments (
exec_ctx , builder , c - > connecting_result . channel_args ) ;
builder , c - > connecting_result . channel_args ) ;
grpc_channel_stack_builder_set_transport ( builder ,
c - > connecting_result . transport ) ;
if ( ! grpc_channel_init_create_stack ( exec_ctx , builder ,
GRPC_CLIENT_SUBCHANNEL ) ) {
grpc_channel_stack_builder_destroy ( exec_ctx , builder ) ;
if ( ! grpc_channel_init_create_stack ( builder , GRPC_CLIENT_SUBCHANNEL ) ) {
grpc_channel_stack_builder_destroy ( builder ) ;
return false ;
}
grpc_error * error = grpc_channel_stack_builder_finish (
exec_ctx , builder , 0 , 1 , connection_destroy , nullptr , ( void * * ) & con ) ;
builder , 0 , 1 , connection_destroy , nullptr , ( void * * ) & con ) ;
if ( error ! = GRPC_ERROR_NONE ) {
grpc_transport_destroy ( exec_ctx , c - > connecting_result . transport ) ;
grpc_transport_destroy ( c - > connecting_result . transport ) ;
gpr_log ( GPR_ERROR , " error initializing subchannel stack: %s " ,
grpc_error_string ( error ) ) ;
GRPC_ERROR_UNREF ( error ) ;
@ -631,7 +609,7 @@ static bool publish_transport_locked(grpc_exec_ctx* exec_ctx,
if ( c - > disconnected ) {
gpr_free ( sw_subchannel ) ;
grpc_channel_stack_destroy ( exec_ctx , stk ) ;
grpc_channel_stack_destroy ( stk ) ;
gpr_free ( con ) ;
return false ;
}
@ -647,19 +625,18 @@ static bool publish_transport_locked(grpc_exec_ctx* exec_ctx,
/* setup subchannel watching connected subchannel for changes; subchannel
ref for connecting is donated to the state watcher */
GRPC_SUBCHANNEL_WEAK_REF ( c , " state_watcher " ) ;
GRPC_SUBCHANNEL_WEAK_UNREF ( exec_ctx , c , " connecting " ) ;
GRPC_SUBCHANNEL_WEAK_UNREF ( c , " connecting " ) ;
grpc_connected_subchannel_notify_on_state_change (
exec_ctx , con , c - > pollset_set , & sw_subchannel - > connectivity_state ,
con , c - > pollset_set , & sw_subchannel - > connectivity_state ,
& sw_subchannel - > closure ) ;
/* signal completion */
grpc_connectivity_state_set ( exec_ctx , & c - > state_tracker , GRPC_CHANNEL_READY ,
grpc_connectivity_state_set ( & c - > state_tracker , GRPC_CHANNEL_READY ,
GRPC_ERROR_NONE , " connected " ) ;
return true ;
}
static void subchannel_connected ( grpc_exec_ctx * exec_ctx , void * arg ,
grpc_error * error ) {
static void subchannel_connected ( void * arg , grpc_error * error ) {
grpc_subchannel * c = ( grpc_subchannel * ) arg ;
grpc_channel_args * delete_channel_args = c - > connecting_result . channel_args ;
@ -667,13 +644,13 @@ static void subchannel_connected(grpc_exec_ctx* exec_ctx, void* arg,
gpr_mu_lock ( & c - > mu ) ;
c - > connecting = false ;
if ( c - > connecting_result . transport ! = nullptr & &
publish_transport_locked ( exec_ctx , c ) ) {
publish_transport_locked ( c ) ) {
/* do nothing, transport was published */
} else if ( c - > disconnected ) {
GRPC_SUBCHANNEL_WEAK_UNREF ( exec_ctx , c , " connecting " ) ;
GRPC_SUBCHANNEL_WEAK_UNREF ( c , " connecting " ) ;
} else {
grpc_connectivity_state_set (
exec_ctx , & c - > state_tracker , GRPC_CHANNEL_TRANSIENT_FAILURE ,
& c - > state_tracker , GRPC_CHANNEL_TRANSIENT_FAILURE ,
grpc_error_set_int ( GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING (
" Connect Failed " , & error , 1 ) ,
GRPC_ERROR_INT_GRPC_STATUS , GRPC_STATUS_UNAVAILABLE ) ,
@ -682,27 +659,26 @@ static void subchannel_connected(grpc_exec_ctx* exec_ctx, void* arg,
const char * errmsg = grpc_error_string ( error ) ;
gpr_log ( GPR_INFO , " Connect failed: %s " , errmsg ) ;
maybe_start_connecting_locked ( exec_ctx , c ) ;
GRPC_SUBCHANNEL_WEAK_UNREF ( exec_ctx , c , " connecting " ) ;
maybe_start_connecting_locked ( c ) ;
GRPC_SUBCHANNEL_WEAK_UNREF ( c , " connecting " ) ;
}
gpr_mu_unlock ( & c - > mu ) ;
GRPC_SUBCHANNEL_WEAK_UNREF ( exec_ctx , c , " connected " ) ;
grpc_channel_args_destroy ( exec_ctx , delete_channel_args ) ;
GRPC_SUBCHANNEL_WEAK_UNREF ( c , " connected " ) ;
grpc_channel_args_destroy ( delete_channel_args ) ;
}
/*
* grpc_subchannel_call implementation
*/
static void subchannel_call_destroy ( grpc_exec_ctx * exec_ctx , void * call ,
grpc_error * error ) {
static void subchannel_call_destroy ( void * call , grpc_error * error ) {
grpc_subchannel_call * c = ( grpc_subchannel_call * ) call ;
GPR_ASSERT ( c - > schedule_closure_after_destroy ! = nullptr ) ;
GPR_TIMER_BEGIN ( " grpc_subchannel_call_unref.destroy " , 0 ) ;
grpc_connected_subchannel * connection = c - > connection ;
grpc_call_stack_destroy ( exec_ctx , SUBCHANNEL_CALL_TO_CALL_STACK ( c ) , nullptr ,
grpc_call_stack_destroy ( SUBCHANNEL_CALL_TO_CALL_STACK ( c ) , nullptr ,
c - > schedule_closure_after_destroy ) ;
GRPC_CONNECTED_SUBCHANNEL_UNREF ( exec_ctx , connection , " subchannel_call " ) ;
GRPC_CONNECTED_SUBCHANNEL_UNREF ( connection , " subchannel_call " ) ;
GPR_TIMER_END ( " grpc_subchannel_call_unref.destroy " , 0 ) ;
}
@ -718,20 +694,18 @@ void grpc_subchannel_call_ref(
GRPC_CALL_STACK_REF ( SUBCHANNEL_CALL_TO_CALL_STACK ( c ) , REF_REASON ) ;
}
void grpc_subchannel_call_unref ( grpc_exec_ctx * exec_ctx ,
grpc_subchannel_call * c
GRPC_SUBCHANNEL_REF_EXTRA_ARGS ) {
GRPC_CALL_STACK_UNREF ( exec_ctx , SUBCHANNEL_CALL_TO_CALL_STACK ( c ) , REF_REASON ) ;
void grpc_subchannel_call_unref (
grpc_subchannel_call * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS ) {
GRPC_CALL_STACK_UNREF ( SUBCHANNEL_CALL_TO_CALL_STACK ( c ) , REF_REASON ) ;
}
void grpc_subchannel_call_process_op ( grpc_exec_ctx * exec_ctx ,
grpc_subchannel_call * call ,
void grpc_subchannel_call_process_op ( grpc_subchannel_call * call ,
grpc_transport_stream_op_batch * batch ) {
GPR_TIMER_BEGIN ( " grpc_subchannel_call_process_op " , 0 ) ;
grpc_call_stack * call_stack = SUBCHANNEL_CALL_TO_CALL_STACK ( call ) ;
grpc_call_element * top_elem = grpc_call_stack_element ( call_stack , 0 ) ;
GRPC_CALL_LOG_OP ( GPR_INFO , top_elem , batch ) ;
top_elem - > filter - > start_transport_stream_op_batch ( exec_ctx , top_elem , batch ) ;
top_elem - > filter - > start_transport_stream_op_batch ( top_elem , batch ) ;
GPR_TIMER_END ( " grpc_subchannel_call_process_op " , 0 ) ;
}
@ -746,7 +720,7 @@ const grpc_subchannel_key* grpc_subchannel_get_key(
}
grpc_error * grpc_connected_subchannel_create_call (
grpc_exec_ctx * exec_ctx , grpc_ connected_subchannel * con ,
grpc_connected_subchannel * con ,
const grpc_connected_subchannel_call_args * args ,
grpc_subchannel_call * * call ) {
grpc_channel_stack * chanstk = CHANNEL_STACK_FROM_CONNECTION ( con ) ;
@ -764,14 +738,14 @@ grpc_error* grpc_connected_subchannel_create_call(
args - > arena , /* arena */
args - > call_combiner /* call_combiner */
} ;
grpc_error * error = grpc_call_stack_init (
exec_ctx , chanstk , 1 , subchannel_call_destroy , * call , & call_args ) ;
grpc_error * error = grpc_call_stack_init ( chanstk , 1 , subchannel_call_destroy ,
* call , & call_args ) ;
if ( error ! = GRPC_ERROR_NONE ) {
const char * error_string = grpc_error_string ( error ) ;
gpr_log ( GPR_ERROR , " error: %s " , error_string ) ;
return error ;
}
grpc_call_stack_set_pollset_or_pollset_set ( exec_ctx , callstk , args - > pollent ) ;
grpc_call_stack_set_pollset_or_pollset_set ( callstk , args - > pollent ) ;
return GRPC_ERROR_NONE ;
}
@ -780,21 +754,20 @@ grpc_call_stack* grpc_subchannel_call_get_call_stack(
return SUBCHANNEL_CALL_TO_CALL_STACK ( subchannel_call ) ;
}
static void grpc_uri_to_sockaddr ( grpc_exec_ctx * exec_ctx , const char * uri_str ,
static void grpc_uri_to_sockaddr ( const char * uri_str ,
grpc_resolved_address * addr ) {
grpc_uri * uri = grpc_uri_parse ( exec_ctx , uri_str , 0 /* suppress_errors */ ) ;
grpc_uri * uri = grpc_uri_parse ( uri_str , 0 /* suppress_errors */ ) ;
GPR_ASSERT ( uri ! = nullptr ) ;
if ( ! grpc_parse_uri ( uri , addr ) ) memset ( addr , 0 , sizeof ( * addr ) ) ;
grpc_uri_destroy ( uri ) ;
}
void grpc_get_subchannel_address_arg ( grpc_exec_ctx * exec_ctx ,
const grpc_channel_args * args ,
void grpc_get_subchannel_address_arg ( const grpc_channel_args * args ,
grpc_resolved_address * addr ) {
const char * addr_uri_str = grpc_get_subchannel_address_uri_arg ( args ) ;
memset ( addr , 0 , sizeof ( * addr ) ) ;
if ( * addr_uri_str ! = ' \0 ' ) {
grpc_uri_to_sockaddr ( exec_ctx , addr_uri_str , addr ) ;
grpc_uri_to_sockaddr ( addr_uri_str , addr ) ;
}
}