@ -33,6 +33,7 @@
# include "src/core/ext/client_config/client_channel.h"
# include <stdbool.h>
# include <stdio.h>
# include <string.h>
@ -41,10 +42,11 @@
# include <grpc/support/sync.h>
# include <grpc/support/useful.h>
# include "src/core/ext/client_config/subchannel_call_holder .h"
# include "src/core/ext/client_config/subchannel.h"
# include "src/core/lib/channel/channel_args.h"
# include "src/core/lib/channel/connected_channel.h"
# include "src/core/lib/iomgr/iomgr.h"
# include "src/core/lib/iomgr/polling_entity.h"
# include "src/core/lib/profiling/timers.h"
# include "src/core/lib/support/string.h"
# include "src/core/lib/surface/channel.h"
@ -52,30 +54,31 @@
/* Client channel implementation */
typedef grpc_subchannel_call_holder call_data ;
/*************************************************************************
* CHANNEL - WIDE FUNCTIONS
*/
typedef struct client_channel_channel_data {
/** resolver for this channel */
grpc_resolver * resolver ;
/** have we started resolving this channel */
int started_resolving ;
bool started_resolving ;
/** mutex protecting client configuration, including all
variables below in this data structure */
gpr_mu mu_config ;
/** currently active load balancer - guarded by mu_config */
gpr_mu mu ;
/** currently active load balancer - guarded by mu */
grpc_lb_policy * lb_policy ;
/** incoming configuration - set by resolver.next
guarded by mu_config */
grpc_client_config * incoming_configuration ;
/** incoming resolver result - set by resolver.next(), guarded by mu */
grpc_resolver_result * resolver_result ;
/** a list of closures that are all waiting for config to come in */
grpc_closure_list waiting_for_config_closures ;
/** resolver callback */
grpc_closure on_config _changed ;
grpc_closure on_resolver_result _changed ;
/** connectivity state being tracked */
grpc_connectivity_state_tracker state_tracker ;
/** when an lb_policy arrives, should we try to exit idle */
int exit_idle_when_lb_policy_arrives ;
bool exit_idle_when_lb_policy_arrives ;
/** owning stack */
grpc_channel_stack * owning_stack ;
/** interested parties (owned) */
@ -83,10 +86,8 @@ typedef struct client_channel_channel_data {
} channel_data ;
/** We create one watcher for each new lb_policy that is returned from a
resolver ,
to watch for state changes from the lb_policy . When a state change is seen ,
we
update the channel , and create a new watcher */
resolver , to watch for state changes from the lb_policy . When a state
change is seen , we update the channel , and create a new watcher . */
typedef struct {
channel_data * chand ;
grpc_closure on_changed ;
@ -94,22 +95,6 @@ typedef struct {
grpc_lb_policy * lb_policy ;
} lb_policy_connectivity_watcher ;
typedef struct {
grpc_closure closure ;
grpc_call_element * elem ;
} waiting_call ;
static char * cc_get_peer ( grpc_exec_ctx * exec_ctx , grpc_call_element * elem ) {
return grpc_subchannel_call_holder_get_peer ( exec_ctx , elem - > call_data ) ;
}
static void cc_start_transport_stream_op ( grpc_exec_ctx * exec_ctx ,
grpc_call_element * elem ,
grpc_transport_stream_op * op ) {
GRPC_CALL_LOG_OP ( GPR_INFO , elem , op ) ;
grpc_subchannel_call_holder_perform_op ( exec_ctx , elem - > call_data , op ) ;
}
static void watch_lb_policy ( grpc_exec_ctx * exec_ctx , channel_data * chand ,
grpc_lb_policy * lb_policy ,
grpc_connectivity_state current_state ) ;
@ -156,9 +141,9 @@ static void on_lb_policy_state_changed(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error * error ) {
lb_policy_connectivity_watcher * w = arg ;
gpr_mu_lock ( & w - > chand - > mu_config ) ;
gpr_mu_lock ( & w - > chand - > mu ) ;
on_lb_policy_state_changed_locked ( exec_ctx , w , error ) ;
gpr_mu_unlock ( & w - > chand - > mu_config ) ;
gpr_mu_unlock ( & w - > chand - > mu ) ;
GRPC_CHANNEL_STACK_UNREF ( exec_ctx , w - > chand - > owning_stack , " watch_lb_policy " ) ;
gpr_free ( w ) ;
@ -178,17 +163,17 @@ static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand,
& w - > on_changed ) ;
}
static void cc_on_config _changed( grpc_exec_ctx * exec_ctx , void * arg ,
grpc_error * error ) {
static void on_resolver_result _changed( grpc_exec_ctx * exec_ctx , void * arg ,
grpc_error * error ) {
channel_data * chand = arg ;
grpc_lb_policy * lb_policy = NULL ;
grpc_lb_policy * old_lb_policy ;
grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE ;
int exit_idle = 0 ;
bool exit_idle = false ;
grpc_error * state_error = GRPC_ERROR_CREATE ( " No load balancing policy " ) ;
if ( chand - > incoming_configuration ! = NULL ) {
lb_policy = grpc_client_config _get_lb_policy ( chand - > incoming_configuration ) ;
if ( chand - > resolver_result ! = NULL ) {
lb_policy = grpc_resolver_result _get_lb_policy ( chand - > resolver_result ) ;
if ( lb_policy ! = NULL ) {
GRPC_LB_POLICY_REF ( lb_policy , " channel " ) ;
GRPC_LB_POLICY_REF ( lb_policy , " config_change " ) ;
@ -197,17 +182,17 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
grpc_lb_policy_check_connectivity ( exec_ctx , lb_policy , & state_error ) ;
}
grpc_client_config _unref ( exec_ctx , chand - > incoming_configuration ) ;
grpc_resolver_result _unref ( exec_ctx , chand - > resolver_result ) ;
}
chand - > incoming_configuration = NULL ;
chand - > resolver_result = NULL ;
if ( lb_policy ! = NULL ) {
grpc_pollset_set_add_pollset_set ( exec_ctx , lb_policy - > interested_parties ,
chand - > interested_parties ) ;
}
gpr_mu_lock ( & chand - > mu_config ) ;
gpr_mu_lock ( & chand - > mu ) ;
old_lb_policy = chand - > lb_policy ;
chand - > lb_policy = lb_policy ;
if ( lb_policy ! = NULL ) {
@ -222,8 +207,8 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
}
if ( lb_policy ! = NULL & & chand - > exit_idle_when_lb_policy_arrives ) {
GRPC_LB_POLICY_REF ( lb_policy , " exit_idle " ) ;
exit_idle = 1 ;
chand - > exit_idle_when_lb_policy_arrives = 0 ;
exit_idle = true ;
chand - > exit_idle_when_lb_policy_arrives = false ;
}
if ( error = = GRPC_ERROR_NONE & & chand - > resolver ) {
@ -233,10 +218,9 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
watch_lb_policy ( exec_ctx , chand , lb_policy , state ) ;
}
GRPC_CHANNEL_STACK_REF ( chand - > owning_stack , " resolver " ) ;
grpc_resolver_next ( exec_ctx , chand - > resolver ,
& chand - > incoming_configuration ,
& chand - > on_config_changed ) ;
gpr_mu_unlock ( & chand - > mu_config ) ;
grpc_resolver_next ( exec_ctx , chand - > resolver , & chand - > resolver_result ,
& chand - > on_resolver_result_changed ) ;
gpr_mu_unlock ( & chand - > mu ) ;
} else {
if ( chand - > resolver ! = NULL ) {
grpc_resolver_shutdown ( exec_ctx , chand - > resolver ) ;
@ -249,7 +233,7 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_ERROR_CREATE_REFERENCING ( " Got config after disconnection " , refs ,
GPR_ARRAY_SIZE ( refs ) ) ,
" resolver_gone " ) ;
gpr_mu_unlock ( & chand - > mu_config ) ;
gpr_mu_unlock ( & chand - > mu ) ;
}
if ( exit_idle ) {
@ -284,7 +268,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
op - > bind_pollset ) ;
}
gpr_mu_lock ( & chand - > mu_config ) ;
gpr_mu_lock ( & chand - > mu ) ;
if ( op - > on_connectivity_state_change ! = NULL ) {
grpc_connectivity_state_notify_on_state_change (
exec_ctx , & chand - > state_tracker , op - > connectivity_state ,
@ -329,7 +313,189 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
}
GRPC_ERROR_UNREF ( op - > disconnect_with_error ) ;
}
gpr_mu_unlock ( & chand - > mu_config ) ;
gpr_mu_unlock ( & chand - > mu ) ;
}
/* Constructor for channel_data */
static void cc_init_channel_elem ( grpc_exec_ctx * exec_ctx ,
grpc_channel_element * elem ,
grpc_channel_element_args * args ) {
channel_data * chand = elem - > channel_data ;
memset ( chand , 0 , sizeof ( * chand ) ) ;
GPR_ASSERT ( args - > is_last ) ;
GPR_ASSERT ( elem - > filter = = & grpc_client_channel_filter ) ;
gpr_mu_init ( & chand - > mu ) ;
grpc_closure_init ( & chand - > on_resolver_result_changed ,
on_resolver_result_changed , chand ) ;
chand - > owning_stack = args - > channel_stack ;
grpc_connectivity_state_init ( & chand - > state_tracker , GRPC_CHANNEL_IDLE ,
" client_channel " ) ;
chand - > interested_parties = grpc_pollset_set_create ( ) ;
}
/* Destructor for channel_data */
static void cc_destroy_channel_elem ( grpc_exec_ctx * exec_ctx ,
grpc_channel_element * elem ) {
channel_data * chand = elem - > channel_data ;
if ( chand - > resolver ! = NULL ) {
grpc_resolver_shutdown ( exec_ctx , chand - > resolver ) ;
GRPC_RESOLVER_UNREF ( exec_ctx , chand - > resolver , " channel " ) ;
}
if ( chand - > lb_policy ! = NULL ) {
grpc_pollset_set_del_pollset_set ( exec_ctx ,
chand - > lb_policy - > interested_parties ,
chand - > interested_parties ) ;
GRPC_LB_POLICY_UNREF ( exec_ctx , chand - > lb_policy , " channel " ) ;
}
grpc_connectivity_state_destroy ( exec_ctx , & chand - > state_tracker ) ;
grpc_pollset_set_destroy ( chand - > interested_parties ) ;
gpr_mu_destroy ( & chand - > mu ) ;
}
/*************************************************************************
* PER - CALL FUNCTIONS
*/
# define GET_CALL(call_data) \
( ( grpc_subchannel_call * ) ( gpr_atm_acq_load ( & ( call_data ) - > subchannel_call ) ) )
# define CANCELLED_CALL ((grpc_subchannel_call *)1)
typedef enum {
GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING ,
GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL
} subchannel_creation_phase ;
/** Call data. Holds a pointer to grpc_subchannel_call and the
associated machinery to create such a pointer .
Handles queueing of stream ops until a call object is ready , waiting
for initial metadata before trying to create a call object ,
and handling cancellation gracefully . */
typedef struct client_channel_call_data {
/** either 0 for no call, 1 for cancelled, or a pointer to a
grpc_subchannel_call */
gpr_atm subchannel_call ;
gpr_mu mu ;
subchannel_creation_phase creation_phase ;
grpc_connected_subchannel * connected_subchannel ;
grpc_polling_entity * pollent ;
grpc_transport_stream_op * waiting_ops ;
size_t waiting_ops_count ;
size_t waiting_ops_capacity ;
grpc_closure next_step ;
grpc_call_stack * owning_call ;
} call_data ;
static void add_waiting_locked ( call_data * calld , grpc_transport_stream_op * op ) {
GPR_TIMER_BEGIN ( " add_waiting_locked " , 0 ) ;
if ( calld - > waiting_ops_count = = calld - > waiting_ops_capacity ) {
calld - > waiting_ops_capacity = GPR_MAX ( 3 , 2 * calld - > waiting_ops_capacity ) ;
calld - > waiting_ops =
gpr_realloc ( calld - > waiting_ops ,
calld - > waiting_ops_capacity * sizeof ( * calld - > waiting_ops ) ) ;
}
calld - > waiting_ops [ calld - > waiting_ops_count + + ] = * op ;
GPR_TIMER_END ( " add_waiting_locked " , 0 ) ;
}
static void fail_locked ( grpc_exec_ctx * exec_ctx , call_data * calld ,
grpc_error * error ) {
size_t i ;
for ( i = 0 ; i < calld - > waiting_ops_count ; i + + ) {
grpc_transport_stream_op_finish_with_failure (
exec_ctx , & calld - > waiting_ops [ i ] , GRPC_ERROR_REF ( error ) ) ;
}
calld - > waiting_ops_count = 0 ;
GRPC_ERROR_UNREF ( error ) ;
}
typedef struct {
grpc_transport_stream_op * ops ;
size_t nops ;
grpc_subchannel_call * call ;
} retry_ops_args ;
static void retry_ops ( grpc_exec_ctx * exec_ctx , void * args , grpc_error * error ) {
retry_ops_args * a = args ;
size_t i ;
for ( i = 0 ; i < a - > nops ; i + + ) {
grpc_subchannel_call_process_op ( exec_ctx , a - > call , & a - > ops [ i ] ) ;
}
GRPC_SUBCHANNEL_CALL_UNREF ( exec_ctx , a - > call , " retry_ops " ) ;
gpr_free ( a - > ops ) ;
gpr_free ( a ) ;
}
static void retry_waiting_locked ( grpc_exec_ctx * exec_ctx , call_data * calld ) {
retry_ops_args * a = gpr_malloc ( sizeof ( * a ) ) ;
a - > ops = calld - > waiting_ops ;
a - > nops = calld - > waiting_ops_count ;
a - > call = GET_CALL ( calld ) ;
if ( a - > call = = CANCELLED_CALL ) {
gpr_free ( a ) ;
fail_locked ( exec_ctx , calld , GRPC_ERROR_CANCELLED ) ;
return ;
}
calld - > waiting_ops = NULL ;
calld - > waiting_ops_count = 0 ;
calld - > waiting_ops_capacity = 0 ;
GRPC_SUBCHANNEL_CALL_REF ( a - > call , " retry_ops " ) ;
grpc_exec_ctx_sched ( exec_ctx , grpc_closure_create ( retry_ops , a ) ,
GRPC_ERROR_NONE , NULL ) ;
}
static void subchannel_ready ( grpc_exec_ctx * exec_ctx , void * arg ,
grpc_error * error ) {
call_data * calld = arg ;
gpr_mu_lock ( & calld - > mu ) ;
GPR_ASSERT ( calld - > creation_phase = =
GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL ) ;
calld - > creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING ;
if ( calld - > connected_subchannel = = NULL ) {
gpr_atm_no_barrier_store ( & calld - > subchannel_call , 1 ) ;
fail_locked ( exec_ctx , calld , GRPC_ERROR_CREATE_REFERENCING (
" Failed to create subchannel " , & error , 1 ) ) ;
} else if ( 1 = = gpr_atm_acq_load ( & calld - > subchannel_call ) ) {
/* already cancelled before subchannel became ready */
fail_locked ( exec_ctx , calld ,
GRPC_ERROR_CREATE_REFERENCING (
" Cancelled before creating subchannel " , & error , 1 ) ) ;
} else {
grpc_subchannel_call * subchannel_call = NULL ;
grpc_error * new_error = grpc_connected_subchannel_create_call (
exec_ctx , calld - > connected_subchannel , calld - > pollent ,
& subchannel_call ) ;
if ( new_error ! = GRPC_ERROR_NONE ) {
new_error = grpc_error_add_child ( new_error , error ) ;
subchannel_call = CANCELLED_CALL ;
fail_locked ( exec_ctx , calld , new_error ) ;
}
gpr_atm_rel_store ( & calld - > subchannel_call ,
( gpr_atm ) ( uintptr_t ) subchannel_call ) ;
retry_waiting_locked ( exec_ctx , calld ) ;
}
gpr_mu_unlock ( & calld - > mu ) ;
GRPC_CALL_STACK_UNREF ( exec_ctx , calld - > owning_call , " pick_subchannel " ) ;
}
static char * cc_get_peer ( grpc_exec_ctx * exec_ctx , grpc_call_element * elem ) {
call_data * calld = elem - > call_data ;
grpc_subchannel_call * subchannel_call = GET_CALL ( calld ) ;
if ( subchannel_call = = NULL | | subchannel_call = = CANCELLED_CALL ) {
return NULL ;
} else {
return grpc_subchannel_call_get_peer ( exec_ctx , subchannel_call ) ;
}
}
typedef struct {
@ -341,11 +507,11 @@ typedef struct {
grpc_closure closure ;
} continue_picking_args ;
static int cc_ pick_subchannel( grpc_exec_ctx * exec_ctx , void * arg ,
grpc_metadata_batch * initial_metadata ,
uint32_t initial_metadata_flags ,
grpc_connected_subchannel * * connected_subchannel ,
grpc_closure * on_ready ) ;
static bool pick_subchannel ( grpc_exec_ctx * exec_ctx , grpc_call_element * elem ,
grpc_metadata_batch * initial_metadata ,
uint32_t initial_metadata_flags ,
grpc_connected_subchannel * * connected_subchannel ,
grpc_closure * on_ready ) ;
static void continue_picking ( grpc_exec_ctx * exec_ctx , void * arg ,
grpc_error * error ) {
@ -354,22 +520,21 @@ static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg,
/* cancelled, do nothing */
} else if ( error ! = GRPC_ERROR_NONE ) {
grpc_exec_ctx_sched ( exec_ctx , cpa - > on_ready , GRPC_ERROR_REF ( error ) , NULL ) ;
} else if ( cc_ pick_subchannel( exec_ctx , cpa - > elem , cpa - > initial_metadata ,
cpa - > initial_metadata_flags ,
cpa - > connected_subchannel , cpa - > on_ready ) ) {
} else if ( pick_subchannel ( exec_ctx , cpa - > elem , cpa - > initial_metadata ,
cpa - > initial_metadata_flags ,
cpa - > connected_subchannel , cpa - > on_ready ) ) {
grpc_exec_ctx_sched ( exec_ctx , cpa - > on_ready , GRPC_ERROR_NONE , NULL ) ;
}
gpr_free ( cpa ) ;
}
static int cc_ pick_subchannel( grpc_exec_ctx * exec_ctx , void * elemp ,
grpc_metadata_batch * initial_metadata ,
uint32_t initial_metadata_flags ,
grpc_connected_subchannel * * connected_subchannel ,
grpc_closure * on_ready ) {
GPR_TIMER_BEGIN ( " cc_ pick_subchannel" , 0 ) ;
static bool pick_subchannel ( grpc_exec_ctx * exec_ctx , grpc_call_element * elem ,
grpc_metadata_batch * initial_metadata ,
uint32_t initial_metadata_flags ,
grpc_connected_subchannel * * connected_subchannel ,
grpc_closure * on_ready ) {
GPR_TIMER_BEGIN ( " pick_subchannel " , 0 ) ;
grpc_call_element * elem = elemp ;
channel_data * chand = elem - > channel_data ;
call_data * calld = elem - > call_data ;
continue_picking_args * cpa ;
@ -377,7 +542,7 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
GPR_ASSERT ( connected_subchannel ) ;
gpr_mu_lock ( & chand - > mu_config ) ;
gpr_mu_lock ( & chand - > mu ) ;
if ( initial_metadata = = NULL ) {
if ( chand - > lb_policy ! = NULL ) {
grpc_lb_policy_cancel_pick ( exec_ctx , chand - > lb_policy ,
@ -392,28 +557,27 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
GRPC_ERROR_CREATE ( " Pick cancelled " ) , NULL ) ;
}
}
gpr_mu_unlock ( & chand - > mu_config ) ;
GPR_TIMER_END ( " cc_ pick_subchannel" , 0 ) ;
return 1 ;
gpr_mu_unlock ( & chand - > mu ) ;
GPR_TIMER_END ( " pick_subchannel " , 0 ) ;
return true ;
}
if ( chand - > lb_policy ! = NULL ) {
grpc_lb_policy * lb_policy = chand - > lb_policy ;
int r ;
GRPC_LB_POLICY_REF ( lb_policy , " cc_ pick_subchannel" ) ;
gpr_mu_unlock ( & chand - > mu_config ) ;
GRPC_LB_POLICY_REF ( lb_policy , " pick_subchannel " ) ;
gpr_mu_unlock ( & chand - > mu ) ;
r = grpc_lb_policy_pick ( exec_ctx , lb_policy , calld - > pollent ,
initial_metadata , initial_metadata_flags ,
connected_subchannel , on_ready ) ;
GRPC_LB_POLICY_UNREF ( exec_ctx , lb_policy , " cc_ pick_subchannel" ) ;
GPR_TIMER_END ( " cc_ pick_subchannel" , 0 ) ;
GRPC_LB_POLICY_UNREF ( exec_ctx , lb_policy , " pick_subchannel " ) ;
GPR_TIMER_END ( " pick_subchannel " , 0 ) ;
return r ;
}
if ( chand - > resolver ! = NULL & & ! chand - > started_resolving ) {
chand - > started_resolving = 1 ;
chand - > started_resolving = true ;
GRPC_CHANNEL_STACK_REF ( chand - > owning_stack , " resolver " ) ;
grpc_resolver_next ( exec_ctx , chand - > resolver ,
& chand - > incoming_configuration ,
& chand - > on_config_changed ) ;
grpc_resolver_next ( exec_ctx , chand - > resolver , & chand - > resolver_result ,
& chand - > on_resolver_result_changed ) ;
}
if ( chand - > resolver ! = NULL ) {
cpa = gpr_malloc ( sizeof ( * cpa ) ) ;
@ -429,67 +593,145 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
grpc_exec_ctx_sched ( exec_ctx , on_ready , GRPC_ERROR_CREATE ( " Disconnected " ) ,
NULL ) ;
}
gpr_mu_unlock ( & chand - > mu_config ) ;
gpr_mu_unlock ( & chand - > mu ) ;
GPR_TIMER_END ( " cc_pick_subchannel " , 0 ) ;
return 0 ;
GPR_TIMER_END ( " pick_subchannel " , 0 ) ;
return false ;
}
// The logic here is fairly complicated, due to (a) the fact that we
// need to handle the case where we receive the send op before the
// initial metadata op, and (b) the need for efficiency, especially in
// the streaming case.
// TODO(ctiller): Explain this more thoroughly.
static void cc_start_transport_stream_op ( grpc_exec_ctx * exec_ctx ,
grpc_call_element * elem ,
grpc_transport_stream_op * op ) {
call_data * calld = elem - > call_data ;
GRPC_CALL_LOG_OP ( GPR_INFO , elem , op ) ;
/* try to (atomically) get the call */
grpc_subchannel_call * call = GET_CALL ( calld ) ;
GPR_TIMER_BEGIN ( " cc_start_transport_stream_op " , 0 ) ;
if ( call = = CANCELLED_CALL ) {
grpc_transport_stream_op_finish_with_failure ( exec_ctx , op ,
GRPC_ERROR_CANCELLED ) ;
GPR_TIMER_END ( " cc_start_transport_stream_op " , 0 ) ;
return ;
}
if ( call ! = NULL ) {
grpc_subchannel_call_process_op ( exec_ctx , call , op ) ;
GPR_TIMER_END ( " cc_start_transport_stream_op " , 0 ) ;
return ;
}
/* we failed; lock and figure out what to do */
gpr_mu_lock ( & calld - > mu ) ;
retry :
/* need to recheck that another thread hasn't set the call */
call = GET_CALL ( calld ) ;
if ( call = = CANCELLED_CALL ) {
gpr_mu_unlock ( & calld - > mu ) ;
grpc_transport_stream_op_finish_with_failure ( exec_ctx , op ,
GRPC_ERROR_CANCELLED ) ;
GPR_TIMER_END ( " cc_start_transport_stream_op " , 0 ) ;
return ;
}
if ( call ! = NULL ) {
gpr_mu_unlock ( & calld - > mu ) ;
grpc_subchannel_call_process_op ( exec_ctx , call , op ) ;
GPR_TIMER_END ( " cc_start_transport_stream_op " , 0 ) ;
return ;
}
/* if this is a cancellation, then we can raise our cancelled flag */
if ( op - > cancel_error ! = GRPC_ERROR_NONE ) {
if ( ! gpr_atm_rel_cas ( & calld - > subchannel_call , 0 ,
( gpr_atm ) ( uintptr_t ) CANCELLED_CALL ) ) {
goto retry ;
} else {
switch ( calld - > creation_phase ) {
case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING :
fail_locked ( exec_ctx , calld , GRPC_ERROR_REF ( op - > cancel_error ) ) ;
break ;
case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL :
pick_subchannel ( exec_ctx , elem , NULL , 0 , & calld - > connected_subchannel ,
NULL ) ;
break ;
}
gpr_mu_unlock ( & calld - > mu ) ;
grpc_transport_stream_op_finish_with_failure ( exec_ctx , op ,
GRPC_ERROR_CANCELLED ) ;
GPR_TIMER_END ( " cc_start_transport_stream_op " , 0 ) ;
return ;
}
}
/* if we don't have a subchannel, try to get one */
if ( calld - > creation_phase = = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING & &
calld - > connected_subchannel = = NULL & &
op - > send_initial_metadata ! = NULL ) {
calld - > creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL ;
grpc_closure_init ( & calld - > next_step , subchannel_ready , calld ) ;
GRPC_CALL_STACK_REF ( calld - > owning_call , " pick_subchannel " ) ;
if ( pick_subchannel ( exec_ctx , elem , op - > send_initial_metadata ,
op - > send_initial_metadata_flags ,
& calld - > connected_subchannel , & calld - > next_step ) ) {
calld - > creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING ;
GRPC_CALL_STACK_UNREF ( exec_ctx , calld - > owning_call , " pick_subchannel " ) ;
}
}
/* if we've got a subchannel, then let's ask it to create a call */
if ( calld - > creation_phase = = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING & &
calld - > connected_subchannel ! = NULL ) {
grpc_subchannel_call * subchannel_call = NULL ;
grpc_error * error = grpc_connected_subchannel_create_call (
exec_ctx , calld - > connected_subchannel , calld - > pollent ,
& subchannel_call ) ;
if ( error ! = GRPC_ERROR_NONE ) {
subchannel_call = CANCELLED_CALL ;
fail_locked ( exec_ctx , calld , GRPC_ERROR_REF ( error ) ) ;
grpc_transport_stream_op_finish_with_failure ( exec_ctx , op , error ) ;
}
gpr_atm_rel_store ( & calld - > subchannel_call ,
( gpr_atm ) ( uintptr_t ) subchannel_call ) ;
retry_waiting_locked ( exec_ctx , calld ) ;
goto retry ;
}
/* nothing to be done but wait */
add_waiting_locked ( calld , op ) ;
gpr_mu_unlock ( & calld - > mu ) ;
GPR_TIMER_END ( " cc_start_transport_stream_op " , 0 ) ;
}
/* Constructor for call_data */
static grpc_error * init_call_elem ( grpc_exec_ctx * exec_ctx ,
grpc_call_element * elem ,
grpc_call_element_args * args ) {
grpc_subchannel_call_holder_init ( elem - > call_data , cc_pick_subchannel , elem ,
args - > call_stack ) ;
static grpc_error * cc_init_call_elem ( grpc_exec_ctx * exec_ctx ,
grpc_call_element * elem ,
grpc_call_element_args * args ) {
call_data * calld = elem - > call_data ;
gpr_atm_rel_store ( & calld - > subchannel_call , 0 ) ;
gpr_mu_init ( & calld - > mu ) ;
calld - > connected_subchannel = NULL ;
calld - > waiting_ops = NULL ;
calld - > waiting_ops_count = 0 ;
calld - > waiting_ops_capacity = 0 ;
calld - > creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING ;
calld - > owning_call = args - > call_stack ;
calld - > pollent = NULL ;
return GRPC_ERROR_NONE ;
}
/* Destructor for call_data */
static void destroy_call_elem ( grpc_exec_ctx * exec_ctx , grpc_call_element * elem ,
const grpc_call_final_info * final_info ,
void * and_free_memory ) {
grpc_subchannel_call_holder_destroy ( exec_ctx , elem - > call_data ) ;
gpr_free ( and_free_memory ) ;
}
/* Constructor for channel_data */
static void init_channel_elem ( grpc_exec_ctx * exec_ctx ,
grpc_channel_element * elem ,
grpc_channel_element_args * args ) {
channel_data * chand = elem - > channel_data ;
memset ( chand , 0 , sizeof ( * chand ) ) ;
GPR_ASSERT ( args - > is_last ) ;
GPR_ASSERT ( elem - > filter = = & grpc_client_channel_filter ) ;
gpr_mu_init ( & chand - > mu_config ) ;
grpc_closure_init ( & chand - > on_config_changed , cc_on_config_changed , chand ) ;
chand - > owning_stack = args - > channel_stack ;
grpc_connectivity_state_init ( & chand - > state_tracker , GRPC_CHANNEL_IDLE ,
" client_channel " ) ;
chand - > interested_parties = grpc_pollset_set_create ( ) ;
}
/* Destructor for channel_data */
static void destroy_channel_elem ( grpc_exec_ctx * exec_ctx ,
grpc_channel_element * elem ) {
channel_data * chand = elem - > channel_data ;
if ( chand - > resolver ! = NULL ) {
grpc_resolver_shutdown ( exec_ctx , chand - > resolver ) ;
GRPC_RESOLVER_UNREF ( exec_ctx , chand - > resolver , " channel " ) ;
}
if ( chand - > lb_policy ! = NULL ) {
grpc_pollset_set_del_pollset_set ( exec_ctx ,
chand - > lb_policy - > interested_parties ,
chand - > interested_parties ) ;
GRPC_LB_POLICY_UNREF ( exec_ctx , chand - > lb_policy , " channel " ) ;
static void cc_destroy_call_elem ( grpc_exec_ctx * exec_ctx ,
grpc_call_element * elem ,
const grpc_call_final_info * final_info ,
void * and_free_memory ) {
call_data * calld = elem - > call_data ;
grpc_subchannel_call * call = GET_CALL ( calld ) ;
if ( call ! = NULL & & call ! = CANCELLED_CALL ) {
GRPC_SUBCHANNEL_CALL_UNREF ( exec_ctx , call , " client_channel_destroy_call " ) ;
}
grpc_connectivity_state_destroy ( exec_ctx , & chand - > state_tracker ) ;
grpc_pollset_set_destroy ( chand - > interested_parties ) ;
gpr_mu_destroy ( & chand - > mu_config ) ;
GPR_ASSERT ( calld - > creation_phase = = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING ) ;
gpr_mu_destroy ( & calld - > mu ) ;
GPR_ASSERT ( calld - > waiting_ops_count = = 0 ) ;
gpr_free ( calld - > waiting_ops ) ;
gpr_free ( and_free_memory ) ;
}
static void cc_set_pollset_or_pollset_set ( grpc_exec_ctx * exec_ctx ,
@ -499,16 +741,20 @@ static void cc_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
calld - > pollent = pollent ;
}
/*************************************************************************
* EXPORTED SYMBOLS
*/
const grpc_channel_filter grpc_client_channel_filter = {
cc_start_transport_stream_op ,
cc_start_transport_op ,
sizeof ( call_data ) ,
init_call_elem ,
cc_ init_call_elem,
cc_set_pollset_or_pollset_set ,
destroy_call_elem ,
cc_ destroy_call_elem,
sizeof ( channel_data ) ,
init_channel_elem ,
destroy_channel_elem ,
cc_ init_channel_elem,
cc_ destroy_channel_elem,
cc_get_peer ,
" client-channel " ,
} ;
@ -519,41 +765,40 @@ void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx,
/* post construction initialization: set the transport setup pointer */
grpc_channel_element * elem = grpc_channel_stack_last_element ( channel_stack ) ;
channel_data * chand = elem - > channel_data ;
gpr_mu_lock ( & chand - > mu_config ) ;
gpr_mu_lock ( & chand - > mu ) ;
GPR_ASSERT ( ! chand - > resolver ) ;
chand - > resolver = resolver ;
GRPC_RESOLVER_REF ( resolver , " channel " ) ;
if ( ! grpc_closure_list_empty ( chand - > waiting_for_config_closures ) | |
chand - > exit_idle_when_lb_policy_arrives ) {
chand - > started_resolving = 1 ;
chand - > started_resolving = true ;
GRPC_CHANNEL_STACK_REF ( chand - > owning_stack , " resolver " ) ;
grpc_resolver_next ( exec_ctx , resolver , & chand - > incoming_configuration ,
& chand - > on_config _changed ) ;
grpc_resolver_next ( exec_ctx , resolver , & chand - > resolver_result ,
& chand - > on_resolver_result _changed ) ;
}
gpr_mu_unlock ( & chand - > mu_config ) ;
gpr_mu_unlock ( & chand - > mu ) ;
}
grpc_connectivity_state grpc_client_channel_check_connectivity_state (
grpc_exec_ctx * exec_ctx , grpc_channel_element * elem , int try_to_connect ) {
channel_data * chand = elem - > channel_data ;
grpc_connectivity_state out ;
gpr_mu_lock ( & chand - > mu_config ) ;
gpr_mu_lock ( & chand - > mu ) ;
out = grpc_connectivity_state_check ( & chand - > state_tracker , NULL ) ;
if ( out = = GRPC_CHANNEL_IDLE & & try_to_connect ) {
if ( chand - > lb_policy ! = NULL ) {
grpc_lb_policy_exit_idle ( exec_ctx , chand - > lb_policy ) ;
} else {
chand - > exit_idle_when_lb_policy_arrives = 1 ;
chand - > exit_idle_when_lb_policy_arrives = true ;
if ( ! chand - > started_resolving & & chand - > resolver ! = NULL ) {
GRPC_CHANNEL_STACK_REF ( chand - > owning_stack , " resolver " ) ;
chand - > started_resolving = 1 ;
grpc_resolver_next ( exec_ctx , chand - > resolver ,
& chand - > incoming_configuration ,
& chand - > on_config_changed ) ;
chand - > started_resolving = true ;
grpc_resolver_next ( exec_ctx , chand - > resolver , & chand - > resolver_result ,
& chand - > on_resolver_result_changed ) ;
}
}
}
gpr_mu_unlock ( & chand - > mu_config ) ;
gpr_mu_unlock ( & chand - > mu ) ;
return out ;
}
@ -588,8 +833,8 @@ void grpc_client_channel_watch_connectivity_state(
grpc_closure_init ( & w - > my_closure , on_external_watch_complete , w ) ;
GRPC_CHANNEL_STACK_REF ( w - > chand - > owning_stack ,
" external_connectivity_watcher " ) ;
gpr_mu_lock ( & chand - > mu_config ) ;
gpr_mu_lock ( & chand - > mu ) ;
grpc_connectivity_state_notify_on_state_change (
exec_ctx , & chand - > state_tracker , state , & w - > my_closure ) ;
gpr_mu_unlock ( & chand - > mu_config ) ;
gpr_mu_unlock ( & chand - > mu ) ;
}