@ -149,45 +149,48 @@ struct QueuedPick {
} ;
struct client_channel_channel_data {
//
// Fields set at construction and never modified.
//
bool deadline_checking_enabled ;
bool enable_retries ;
size_t per_rpc_retry_buffer_size ;
/** combiner protecting all variables below in this data structure */
grpc_combiner * combiner ;
/** owning stack */
grpc_channel_stack * owning_stack ;
/** interested parties (owned) */
grpc_pollset_set * interested_parties ;
// Client channel factory.
grpc_core : : ClientChannelFactory * client_channel_factory ;
// Subchannel pool.
grpc_core : : RefCountedPtr < grpc_core : : SubchannelPoolInterface > subchannel_pool ;
grpc_core : : channelz : : ClientChannelNode * channelz_node ;
// Resolving LB policy.
grpc_core : : OrphanablePtr < LoadBalancingPolicy > resolving_lb_policy ;
// Subchannel picker from LB policy.
//
// Fields used in the data plane. Protected by data_plane_combiner.
//
grpc_combiner * data_plane_combiner ;
grpc_core : : UniquePtr < LoadBalancingPolicy : : SubchannelPicker > picker ;
// Linked list of queued picks.
QueuedPick * queued_picks ;
bool have_service_config ;
/** retry throttle data from service config */
QueuedPick * queued_picks ; // Linked list of queued picks.
// Data from service config.
bool received_service_config_data ;
grpc_core : : RefCountedPtr < ServerRetryThrottleData > retry_throttle_data ;
/** per-method service config data */
grpc_core : : RefCountedPtr < ClientChannelMethodParamsTable > method_params_table ;
/* the following properties are guarded by a mutex since APIs require them
to be instantaneously available */
//
// Fields used in the control plane. Protected by combiner.
//
grpc_combiner * combiner ;
grpc_pollset_set * interested_parties ;
grpc_core : : RefCountedPtr < grpc_core : : SubchannelPoolInterface > subchannel_pool ;
grpc_core : : OrphanablePtr < LoadBalancingPolicy > resolving_lb_policy ;
grpc_connectivity_state_tracker state_tracker ;
//
// Fields accessed from both data plane and control plane combiners.
//
grpc_core : : Atomic < grpc_error * > disconnect_error ;
// The following properties are guarded by a mutex since APIs require them
// to be instantaneously available.
gpr_mu info_mu ;
grpc_core : : UniquePtr < char > info_lb_policy_name ;
grpc_core : : UniquePtr < char > info_service_config_json ;
grpc_connectivity_state_tracker state_tracker ;
grpc_error * disconnect_error ;
grpc_core : : ManualConstructor <
grpc_core : : ExternalConnectivityWatcher : : WatcherList >
external_connectivity_watcher_list ;
@ -214,30 +217,98 @@ static const char* get_channel_connectivity_state_change_string(
GPR_UNREACHABLE_CODE ( return " UNKNOWN " ) ;
}
static void set_connectivity_state_and_picker_locked (
channel_data * chand , grpc_connectivity_state state , grpc_error * state_error ,
const char * reason ,
grpc_core : : UniquePtr < LoadBalancingPolicy : : SubchannelPicker > picker ) {
// Update connectivity state.
grpc_connectivity_state_set ( & chand - > state_tracker , state , state_error ,
reason ) ;
if ( chand - > channelz_node ! = nullptr ) {
chand - > channelz_node - > AddTraceEvent (
grpc_core : : channelz : : ChannelTrace : : Severity : : Info ,
grpc_slice_from_static_string (
get_channel_connectivity_state_change_string ( state ) ) ) ;
namespace grpc_core {
namespace {
// A fire-and-forget class that sets the channel's connectivity state
// and then hops into the data plane combiner to update the picker.
// Must be instantiated while holding the control plane combiner.
// Deletes itself when done.
class ConnectivityStateAndPickerSetter {
public :
ConnectivityStateAndPickerSetter (
channel_data * chand , grpc_connectivity_state state ,
grpc_error * state_error , const char * reason ,
UniquePtr < LoadBalancingPolicy : : SubchannelPicker > picker )
: chand_ ( chand ) , picker_ ( std : : move ( picker ) ) {
// Update connectivity state here, while holding control plane combiner.
grpc_connectivity_state_set ( & chand - > state_tracker , state , state_error ,
reason ) ;
if ( chand - > channelz_node ! = nullptr ) {
chand - > channelz_node - > AddTraceEvent (
channelz : : ChannelTrace : : Severity : : Info ,
grpc_slice_from_static_string (
get_channel_connectivity_state_change_string ( state ) ) ) ;
}
// Bounce into the data plane combiner to reset the picker.
GRPC_CHANNEL_STACK_REF ( chand - > owning_stack ,
" ConnectivityStateAndPickerSetter " ) ;
GRPC_CLOSURE_INIT ( & closure_ , SetPicker , this ,
grpc_combiner_scheduler ( chand - > data_plane_combiner ) ) ;
GRPC_CLOSURE_SCHED ( & closure_ , GRPC_ERROR_NONE ) ;
}
// Update picker.
chand - > picker = std : : move ( picker ) ;
// Re-process queued picks.
for ( QueuedPick * pick = chand - > queued_picks ; pick ! = nullptr ;
pick = pick - > next ) {
start_pick_locked ( pick - > elem , GRPC_ERROR_NONE ) ;
private :
static void SetPicker ( void * arg , grpc_error * ignored ) {
auto * self = static_cast < ConnectivityStateAndPickerSetter * > ( arg ) ;
// Update picker.
self - > chand_ - > picker = std : : move ( self - > picker_ ) ;
// Re-process queued picks.
for ( QueuedPick * pick = self - > chand_ - > queued_picks ; pick ! = nullptr ;
pick = pick - > next ) {
start_pick_locked ( pick - > elem , GRPC_ERROR_NONE ) ;
}
// Clean up.
GRPC_CHANNEL_STACK_UNREF ( self - > chand_ - > owning_stack ,
" ConnectivityStateAndPickerSetter " ) ;
Delete ( self ) ;
}
}
namespace grpc_core {
namespace {
channel_data * chand_ ;
UniquePtr < LoadBalancingPolicy : : SubchannelPicker > picker_ ;
grpc_closure closure_ ;
} ;
// A fire-and-forget class that sets the channel's service config data
// in the data plane combiner. Deletes itself when done.
class ServiceConfigSetter {
public :
ServiceConfigSetter (
channel_data * chand ,
RefCountedPtr < ServerRetryThrottleData > retry_throttle_data ,
RefCountedPtr < ClientChannelMethodParamsTable > method_params_table )
: chand_ ( chand ) ,
retry_throttle_data_ ( std : : move ( retry_throttle_data ) ) ,
method_params_table_ ( std : : move ( method_params_table ) ) {
GRPC_CHANNEL_STACK_REF ( chand - > owning_stack , " ServiceConfigSetter " ) ;
GRPC_CLOSURE_INIT ( & closure_ , SetServiceConfigData , this ,
grpc_combiner_scheduler ( chand - > data_plane_combiner ) ) ;
GRPC_CLOSURE_SCHED ( & closure_ , GRPC_ERROR_NONE ) ;
}
private :
static void SetServiceConfigData ( void * arg , grpc_error * ignored ) {
ServiceConfigSetter * self = static_cast < ServiceConfigSetter * > ( arg ) ;
channel_data * chand = self - > chand_ ;
// Update channel state.
chand - > received_service_config_data = true ;
chand - > retry_throttle_data = std : : move ( self - > retry_throttle_data_ ) ;
chand - > method_params_table = std : : move ( self - > method_params_table_ ) ;
// Apply service config to queued picks.
for ( QueuedPick * pick = chand - > queued_picks ; pick ! = nullptr ;
pick = pick - > next ) {
maybe_apply_service_config_to_call_locked ( pick - > elem ) ;
}
// Clean up.
GRPC_CHANNEL_STACK_UNREF ( self - > chand_ - > owning_stack , " ServiceConfigSetter " ) ;
Delete ( self ) ;
}
channel_data * chand_ ;
RefCountedPtr < ServerRetryThrottleData > retry_throttle_data_ ;
RefCountedPtr < ClientChannelMethodParamsTable > method_params_table_ ;
grpc_closure closure_ ;
} ;
//
// ExternalConnectivityWatcher::WatcherList
@ -387,8 +458,10 @@ class ClientChannelControlHelper
void UpdateState (
grpc_connectivity_state state , grpc_error * state_error ,
UniquePtr < LoadBalancingPolicy : : SubchannelPicker > picker ) override {
grpc_error * disconnect_error =
chand_ - > disconnect_error . Load ( grpc_core : : MemoryOrder : : ACQUIRE ) ;
if ( grpc_client_channel_routing_trace . enabled ( ) ) {
const char * extra = chand_ - > disconnect_error = = GRPC_ERROR_NONE
const char * extra = disconnect_error = = GRPC_ERROR_NONE
? " "
: " (ignoring -- channel shutting down) " ;
gpr_log ( GPR_INFO , " chand=%p: update: state=%s error=%s picker=%p%s " ,
@ -396,9 +469,10 @@ class ClientChannelControlHelper
grpc_error_string ( state_error ) , picker . get ( ) , extra ) ;
}
// Do update only if not shutting down.
if ( chand_ - > disconnect_error = = GRPC_ERROR_NONE ) {
set_connectivity_state_and_picker_locked ( chand_ , state , state_error ,
" helper " , std : : move ( picker ) ) ;
if ( disconnect_error = = GRPC_ERROR_NONE ) {
// Will delete itself.
New < ConnectivityStateAndPickerSetter > ( chand_ , state , state_error ,
" helper " , std : : move ( picker ) ) ;
} else {
GRPC_ERROR_UNREF ( state_error ) ;
}
@ -420,7 +494,6 @@ static bool process_resolver_result_locked(
void * arg , grpc_core : : Resolver : : Result * result , const char * * lb_policy_name ,
grpc_core : : RefCountedPtr < LoadBalancingPolicy : : Config > * lb_policy_config ) {
channel_data * chand = static_cast < channel_data * > ( arg ) ;
chand - > have_service_config = true ;
ProcessedResolverResult resolver_result ( result , chand - > enable_retries ) ;
grpc_core : : UniquePtr < char > service_config_json =
resolver_result . service_config_json ( ) ;
@ -428,9 +501,11 @@ static bool process_resolver_result_locked(
gpr_log ( GPR_INFO , " chand=%p: resolver returned service config: \" %s \" " ,
chand , service_config_json . get ( ) ) ;
}
// Update channel state.
chand - > retry_throttle_data = resolver_result . retry_throttle_data ( ) ;
chand - > method_params_table = resolver_result . method_params_table ( ) ;
// Create service config setter to update channel state in the data
// plane combiner. Destroys itself when done.
grpc_core : : New < grpc_core : : ServiceConfigSetter > (
chand , resolver_result . retry_throttle_data ( ) ,
resolver_result . method_params_table ( ) ) ;
// Swap out the data used by cc_get_channel_info().
gpr_mu_lock ( & chand - > info_mu ) ;
chand - > info_lb_policy_name = resolver_result . lb_policy_name ( ) ;
@ -445,11 +520,6 @@ static bool process_resolver_result_locked(
// Return results.
* lb_policy_name = chand - > info_lb_policy_name . get ( ) ;
* lb_policy_config = resolver_result . lb_policy_config ( ) ;
// Apply service config to queued picks.
for ( QueuedPick * pick = chand - > queued_picks ; pick ! = nullptr ;
pick = pick - > next ) {
maybe_apply_service_config_to_call_locked ( pick - > elem ) ;
}
return service_config_changed ;
}
@ -507,12 +577,16 @@ static void start_transport_op_locked(void* arg, grpc_error* error_ignored) {
}
if ( op - > disconnect_with_error ! = GRPC_ERROR_NONE ) {
chand - > disconnect_error = op - > disconnect_with_error ;
grpc_error * error = GRPC_ERROR_NONE ;
GPR_ASSERT ( chand - > disconnect_error . CompareExchangeStrong (
& error , op - > disconnect_with_error , grpc_core : : MemoryOrder : : ACQ_REL ,
grpc_core : : MemoryOrder : : ACQUIRE ) ) ;
grpc_pollset_set_del_pollset_set (
chand - > resolving_lb_policy - > interested_parties ( ) ,
chand - > interested_parties ) ;
chand - > resolving_lb_policy . reset ( ) ;
set_connectivity_state_and_picker_locked (
// Will delete itself.
grpc_core : : New < grpc_core : : ConnectivityStateAndPickerSetter > (
chand , GRPC_CHANNEL_SHUTDOWN , GRPC_ERROR_REF ( op - > disconnect_with_error ) ,
" shutdown from API " ,
grpc_core : : UniquePtr < LoadBalancingPolicy : : SubchannelPicker > (
@ -562,10 +636,12 @@ static grpc_error* cc_init_channel_elem(grpc_channel_element* elem,
GPR_ASSERT ( args - > is_last ) ;
GPR_ASSERT ( elem - > filter = = & grpc_client_channel_filter ) ;
// Initialize data members.
chand - > data_plane_combiner = grpc_combiner_create ( ) ;
chand - > combiner = grpc_combiner_create ( ) ;
grpc_connectivity_state_init ( & chand - > state_tracker , GRPC_CHANNEL_IDLE ,
" client_channel " ) ;
chand - > disconnect_error = GRPC_ERROR_NONE ;
chand - > disconnect_error . Store ( GRPC_ERROR_NONE ,
grpc_core : : MemoryOrder : : RELAXED ) ;
gpr_mu_init ( & chand - > info_mu ) ;
chand - > external_connectivity_watcher_list . Init ( ) ;
chand - > owning_stack = args - > channel_stack ;
@ -671,8 +747,10 @@ static void cc_destroy_channel_elem(grpc_channel_element* elem) {
chand - > method_params_table . reset ( ) ;
grpc_client_channel_stop_backup_polling ( chand - > interested_parties ) ;
grpc_pollset_set_destroy ( chand - > interested_parties ) ;
GRPC_COMBINER_UNREF ( chand - > data_plane_combiner , " client_channel " ) ;
GRPC_COMBINER_UNREF ( chand - > combiner , " client_channel " ) ;
GRPC_ERROR_UNREF ( chand - > disconnect_error ) ;
GRPC_ERROR_UNREF (
chand - > disconnect_error . Load ( grpc_core : : MemoryOrder : : RELAXED ) ) ;
grpc_connectivity_state_destroy ( & chand - > state_tracker ) ;
gpr_mu_destroy ( & chand - > info_mu ) ;
chand - > external_connectivity_watcher_list . Destroy ( ) ;
@ -1421,7 +1499,7 @@ static void do_retry(grpc_call_element* elem,
}
// Schedule retry after computed delay.
GRPC_CLOSURE_INIT ( & calld - > pick_closure , start_pick_locked , elem ,
grpc_combiner_scheduler ( chand - > combiner ) ) ;
grpc_combiner_scheduler ( chand - > data_plane_ combiner) ) ;
grpc_timer_init ( & calld - > retry_timer , next_attempt_time , & calld - > pick_closure ) ;
// Update bookkeeping.
if ( retry_state ! = nullptr ) retry_state - > retry_dispatched = true ;
@ -2648,7 +2726,7 @@ class QueuedPickCanceller {
auto * chand = static_cast < channel_data * > ( elem - > channel_data ) ;
GRPC_CALL_STACK_REF ( calld - > owning_call , " QueuedPickCanceller " ) ;
GRPC_CLOSURE_INIT ( & closure_ , & CancelLocked , this ,
grpc_combiner_scheduler ( chand - > combiner ) ) ;
grpc_combiner_scheduler ( chand - > data_plane_ combiner) ) ;
grpc_call_combiner_set_notify_on_cancel ( calld - > call_combiner , & closure_ ) ;
}
@ -2788,7 +2866,7 @@ static void maybe_apply_service_config_to_call_locked(grpc_call_element* elem) {
call_data * calld = static_cast < call_data * > ( elem - > call_data ) ;
// Apply service config data to the call only once, and only if the
// channel has the data available.
if ( GPR_LIKELY ( chand - > have_service_config & &
if ( GPR_LIKELY ( chand - > received_service_config_data & &
! calld - > service_config_applied ) ) {
calld - > service_config_applied = true ;
apply_service_config_to_call_locked ( elem ) ;
@ -2836,7 +2914,7 @@ static void start_pick_locked(void* arg, grpc_error* error) {
. send_initial_metadata_flags ;
// Apply service config to call if needed.
maybe_apply_service_config_to_call_locked ( elem ) ;
// When done, we schedule this closure to leave the channel combiner.
// When done, we schedule this closure to leave the data plane combiner.
GRPC_CLOSURE_INIT ( & calld - > pick_closure , pick_done , elem ,
grpc_schedule_on_exec_ctx ) ;
// Attempt pick.
@ -2851,12 +2929,14 @@ static void start_pick_locked(void* arg, grpc_error* error) {
grpc_error_string ( error ) ) ;
}
switch ( pick_result ) {
case LoadBalancingPolicy : : PICK_TRANSIENT_FAILURE :
case LoadBalancingPolicy : : PICK_TRANSIENT_FAILURE : {
// If we're shutting down, fail all RPCs.
if ( chand - > disconnect_error ! = GRPC_ERROR_NONE ) {
grpc_error * disconnect_error =
chand - > disconnect_error . Load ( grpc_core : : MemoryOrder : : ACQUIRE ) ;
if ( disconnect_error ! = GRPC_ERROR_NONE ) {
GRPC_ERROR_UNREF ( error ) ;
GRPC_CLOSURE_SCHED ( & calld - > pick_closure ,
GRPC_ERROR_REF ( chand - > disconnect_error ) ) ;
GRPC_ERROR_REF ( disconnect_error ) ) ;
break ;
}
// If wait_for_ready is false, then the error indicates the RPC
@ -2882,7 +2962,8 @@ static void start_pick_locked(void* arg, grpc_error* error) {
// If wait_for_ready is true, then queue to retry when we get a new
// picker.
GRPC_ERROR_UNREF ( error ) ;
// Fallthrough
}
// Fallthrough
case LoadBalancingPolicy : : PICK_QUEUE :
if ( ! calld - > pick_queued ) add_call_to_queued_picks_locked ( elem ) ;
break ;
@ -2976,7 +3057,8 @@ static void cc_start_transport_stream_op_batch(
}
GRPC_CLOSURE_SCHED (
GRPC_CLOSURE_INIT ( & batch - > handler_private . closure , start_pick_locked ,
elem , grpc_combiner_scheduler ( chand - > combiner ) ) ,
elem ,
grpc_combiner_scheduler ( chand - > data_plane_combiner ) ) ,
GRPC_ERROR_NONE ) ;
} else {
// For all other batches, release the call combiner.