@@ -130,7 +130,7 @@ class ChannelData {
     return disconnect_error_.Load(MemoryOrder::ACQUIRE);
   }
 
-  grpc_combiner* data_plane_combiner() const { return data_plane_combiner_; }
+  Mutex* data_plane_mu() const { return &data_plane_mu_; }
 
   LoadBalancingPolicy::SubchannelPicker* picker() const {
     return picker_.get();
@@ -166,8 +166,6 @@ class ChannelData {
  private:
   class SubchannelWrapper;
-  class ConnectivityStateAndPickerSetter;
-  class ServiceConfigSetter;
   class ClientChannelControlHelper;
 
   class ExternalConnectivityWatcher {
@@ -214,6 +212,14 @@ class ChannelData {
   ChannelData(grpc_channel_element_args* args, grpc_error** error);
   ~ChannelData();
 
+  void UpdateStateAndPickerLocked(
+      grpc_connectivity_state state, const char* reason,
+      UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker);
+
+  void UpdateServiceConfigLocked(
+      RefCountedPtr<ServerRetryThrottleData> retry_throttle_data,
+      RefCountedPtr<ServiceConfig> service_config);
+
   void CreateResolvingLoadBalancingPolicyLocked();
 
   void DestroyResolvingLoadBalancingPolicyLocked();
@@ -250,9 +256,9 @@ class ChannelData {
   channelz::ChannelNode* channelz_node_;
 
   //
-  // Fields used in the data plane. Guarded by data_plane_combiner.
+  // Fields used in the data plane. Guarded by data_plane_mu.
   //
-  grpc_combiner* data_plane_combiner_;
+  mutable Mutex data_plane_mu_;
   UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker_;
   QueuedPick* queued_picks_ = nullptr;  // Linked list of queued picks.
   // Data from service config.
@@ -282,13 +288,13 @@ class ChannelData {
   Map<SubchannelWrapper*, bool> subchannel_wrappers_;
   // Pending ConnectedSubchannel updates for each SubchannelWrapper.
   // Updates are queued here in the control plane combiner and then applied
-  // in the data plane combiner when the picker is updated.
+  // in the data plane mutex when the picker is updated.
   Map<RefCountedPtr<SubchannelWrapper>, RefCountedPtr<ConnectedSubchannel>,
       RefCountedPtrLess<SubchannelWrapper>>
       pending_subchannel_updates_;
 
   //
-  // Fields accessed from both data plane and control plane combiners.
+  // Fields accessed from both data plane mutex and control plane combiner.
   //
   Atomic<grpc_error*> disconnect_error_;
@@ -322,7 +328,16 @@ class CallData {
   void MaybeApplyServiceConfigToCallLocked(grpc_call_element* elem);
 
   // Invoked by channel for queued picks when the picker is updated.
-  static void StartPickLocked(void* arg, grpc_error* error);
+  static void PickSubchannel(void* arg, grpc_error* error);
+
+  // Helper function for performing a pick while holding the data plane
+  // mutex. Returns true if the pick is complete, in which case the caller
+  // must invoke PickDone() or AsyncPickDone() with the returned error.
+  bool PickSubchannelLocked(grpc_call_element* elem, grpc_error** error);
+
+  // Schedules a callback to process the completed pick. The callback
+  // will not run until after this method returns.
+  void AsyncPickDone(grpc_call_element* elem, grpc_error* error);
 
  private:
   class QueuedPickCanceller;
@@ -705,7 +720,7 @@ class CallData {
   grpc_deadline_state deadline_state_;
 
   grpc_slice path_;  // Request path.
-  gpr_timespec call_start_time_;
+  gpr_cycle_counter call_start_time_;
   grpc_millis deadline_;
   Arena* arena_;
   grpc_call_stack* owning_call_;
@@ -931,7 +946,7 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
     return connected_subchannel_.get();
   }
 
-  // Caller must be holding the data-plane combiner.
+  // Caller must be holding the data-plane mutex.
   ConnectedSubchannel* connected_subchannel_in_data_plane() const {
     return connected_subchannel_in_data_plane_.get();
   }
@@ -1059,7 +1074,7 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
     // Update the connected subchannel only if the channel is not shutting
     // down. This is because once the channel is shutting down, we
     // ignore picker updates from the LB policy, which means that
-    // ConnectivityStateAndPickerSetter will never process the entries
+    // UpdateStateAndPickerLocked() will never process the entries
     // in chand_->pending_subchannel_updates_. So we don't want to add
     // entries there that will never be processed, since that would
     // leave dangling refs to the channel and prevent its destruction.
@@ -1069,7 +1084,7 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
     if (connected_subchannel_ != connected_subchannel) {
       connected_subchannel_ = std::move(connected_subchannel);
       // Record the new connected subchannel so that it can be updated
-      // in the data plane combiner the next time the picker is updated.
+      // in the data plane mutex the next time the picker is updated.
       chand_->pending_subchannel_updates_[Ref(
           DEBUG_LOCATION, "ConnectedSubchannelUpdate")] = connected_subchannel_;
     }
@@ -1086,159 +1101,10 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
   Map<ConnectivityStateWatcherInterface*, WatcherWrapper*> watcher_map_;
   // To be accessed only in the control plane combiner.
   RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
-  // To be accessed only in the data plane combiner.
+  // To be accessed only in the data plane mutex.
   RefCountedPtr<ConnectedSubchannel> connected_subchannel_in_data_plane_;
 };
 
-//
-// ChannelData::ConnectivityStateAndPickerSetter
-//
-
-// A fire-and-forget class that sets the channel's connectivity state
-// and then hops into the data plane combiner to update the picker.
-// Must be instantiated while holding the control plane combiner.
-// Deletes itself when done.
-class ChannelData::ConnectivityStateAndPickerSetter {
- public:
-  ConnectivityStateAndPickerSetter(
-      ChannelData* chand, grpc_connectivity_state state, const char* reason,
-      UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker)
-      : chand_(chand), picker_(std::move(picker)) {
-    // Clean the control plane when entering IDLE, while holding control plane
-    // combiner.
-    if (picker_ == nullptr) {
-      chand->health_check_service_name_.reset();
-      chand->saved_service_config_.reset();
-      chand->received_first_resolver_result_ = false;
-    }
-    // Update connectivity state here, while holding control plane combiner.
-    grpc_connectivity_state_set(&chand->state_tracker_, state, reason);
-    if (chand->channelz_node_ != nullptr) {
-      chand->channelz_node_->SetConnectivityState(state);
-      chand->channelz_node_->AddTraceEvent(
-          channelz::ChannelTrace::Severity::Info,
-          grpc_slice_from_static_string(
-              channelz::ChannelNode::GetChannelConnectivityStateChangeString(
-                  state)));
-    }
-    // Grab any pending subchannel updates.
-    pending_subchannel_updates_ =
-        std::move(chand_->pending_subchannel_updates_);
-    // Bounce into the data plane combiner to reset the picker.
-    GRPC_CHANNEL_STACK_REF(chand->owning_stack_,
-                           "ConnectivityStateAndPickerSetter");
-    GRPC_CLOSURE_INIT(&closure_, SetPickerInDataPlane, this,
-                      grpc_combiner_scheduler(chand->data_plane_combiner_));
-    GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE);
-  }
-
- private:
-  static void SetPickerInDataPlane(void* arg, grpc_error* ignored) {
-    auto* self = static_cast<ConnectivityStateAndPickerSetter*>(arg);
-    // Handle subchannel updates.
-    for (auto& p : self->pending_subchannel_updates_) {
-      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
-        gpr_log(GPR_INFO,
-                "chand=%p: updating subchannel wrapper %p data plane "
-                "connected_subchannel to %p",
-                self->chand_, p.first.get(), p.second.get());
-      }
-      p.first->set_connected_subchannel_in_data_plane(std::move(p.second));
-    }
-    // Swap out the picker. We hang on to the old picker so that it can
-    // be deleted in the control-plane combiner, since that's where we need
-    // to unref the subchannel wrappers that are reffed by the picker.
-    self->picker_.swap(self->chand_->picker_);
-    // Clean the data plane if the updated picker is nullptr.
-    if (self->chand_->picker_ == nullptr) {
-      self->chand_->received_service_config_data_ = false;
-      self->chand_->retry_throttle_data_.reset();
-      self->chand_->service_config_.reset();
-    }
-    // Re-process queued picks.
-    for (QueuedPick* pick = self->chand_->queued_picks_; pick != nullptr;
-         pick = pick->next) {
-      CallData::StartPickLocked(pick->elem, GRPC_ERROR_NONE);
-    }
-    // Pop back into the control plane combiner to delete ourself, so
-    // that we make sure to unref subchannel wrappers there. This
-    // includes both the ones reffed by the old picker (now stored in
-    // self->picker_) and the ones in self->pending_subchannel_updates_.
-    GRPC_CLOSURE_INIT(&self->closure_, CleanUpInControlPlane, self,
-                      grpc_combiner_scheduler(self->chand_->combiner_));
-    GRPC_CLOSURE_SCHED(&self->closure_, GRPC_ERROR_NONE);
-  }
-
-  static void CleanUpInControlPlane(void* arg, grpc_error* ignored) {
-    auto* self = static_cast<ConnectivityStateAndPickerSetter*>(arg);
-    GRPC_CHANNEL_STACK_UNREF(self->chand_->owning_stack_,
-                             "ConnectivityStateAndPickerSetter");
-    Delete(self);
-  }
-
-  ChannelData* chand_;
-  UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker_;
-  Map<RefCountedPtr<SubchannelWrapper>, RefCountedPtr<ConnectedSubchannel>,
-      RefCountedPtrLess<SubchannelWrapper>>
-      pending_subchannel_updates_;
-  grpc_closure closure_;
-};
-
-//
-// ChannelData::ServiceConfigSetter
-//
-
-// A fire-and-forget class that sets the channel's service config data
-// in the data plane combiner. Deletes itself when done.
-class ChannelData::ServiceConfigSetter {
- public:
-  ServiceConfigSetter(
-      ChannelData* chand,
-      Optional<internal::ClientChannelGlobalParsedConfig::RetryThrottling>
-          retry_throttle_data,
-      RefCountedPtr<ServiceConfig> service_config)
-      : chand_(chand),
-        retry_throttle_data_(retry_throttle_data),
-        service_config_(std::move(service_config)) {
-    GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "ServiceConfigSetter");
-    GRPC_CLOSURE_INIT(&closure_, SetServiceConfigData, this,
-                      grpc_combiner_scheduler(chand->data_plane_combiner_));
-    GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE);
-  }
-
- private:
-  static void SetServiceConfigData(void* arg, grpc_error* ignored) {
-    ServiceConfigSetter* self = static_cast<ServiceConfigSetter*>(arg);
-    ChannelData* chand = self->chand_;
-    // Update channel state.
-    chand->received_service_config_data_ = true;
-    if (self->retry_throttle_data_.has_value()) {
-      chand->retry_throttle_data_ =
-          internal::ServerRetryThrottleMap::GetDataForServer(
-              chand->server_name_.get(),
-              self->retry_throttle_data_.value().max_milli_tokens,
-              self->retry_throttle_data_.value().milli_token_ratio);
-    }
-    chand->service_config_ = std::move(self->service_config_);
-    // Apply service config to queued picks.
-    for (QueuedPick* pick = chand->queued_picks_; pick != nullptr;
-         pick = pick->next) {
-      CallData* calld = static_cast<CallData*>(pick->elem->call_data);
-      calld->MaybeApplyServiceConfigToCallLocked(pick->elem);
-    }
-    // Clean up.
-    GRPC_CHANNEL_STACK_UNREF(self->chand_->owning_stack_,
-                             "ServiceConfigSetter");
-    Delete(self);
-  }
-
-  ChannelData* chand_;
-  Optional<internal::ClientChannelGlobalParsedConfig::RetryThrottling>
-      retry_throttle_data_;
-  RefCountedPtr<ServiceConfig> service_config_;
-  grpc_closure closure_;
-};
-
 //
 // ChannelData::ExternalConnectivityWatcher::WatcherList
 //
@@ -1409,9 +1275,7 @@ class ChannelData::ClientChannelControlHelper
     }
     // Do update only if not shutting down.
     if (disconnect_error == GRPC_ERROR_NONE) {
-      // Will delete itself.
-      New<ConnectivityStateAndPickerSetter>(chand_, state, "helper",
-                                            std::move(picker));
+      chand_->UpdateStateAndPickerLocked(state, "helper", std::move(picker));
     }
   }
@@ -1495,7 +1359,6 @@ ChannelData::ChannelData(grpc_channel_element_args* args, grpc_error** error)
       client_channel_factory_(
           ClientChannelFactory::GetFromChannelArgs(args->channel_args)),
       channelz_node_(GetChannelzNode(args->channel_args)),
-      data_plane_combiner_(grpc_combiner_create()),
       combiner_(grpc_combiner_create()),
       interested_parties_(grpc_pollset_set_create()),
       subchannel_pool_(GetSubchannelPool(args->channel_args)),
@@ -1568,13 +1431,108 @@ ChannelData::~ChannelData() {
   // Stop backup polling.
   grpc_client_channel_stop_backup_polling(interested_parties_);
   grpc_pollset_set_destroy(interested_parties_);
-  GRPC_COMBINER_UNREF(data_plane_combiner_, "client_channel");
   GRPC_COMBINER_UNREF(combiner_, "client_channel");
   GRPC_ERROR_UNREF(disconnect_error_.Load(MemoryOrder::RELAXED));
   grpc_connectivity_state_destroy(&state_tracker_);
   gpr_mu_destroy(&info_mu_);
 }
+
+void ChannelData::UpdateStateAndPickerLocked(
+    grpc_connectivity_state state, const char* reason,
+    UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker) {
+  // Clean the control plane when entering IDLE.
+  if (picker_ == nullptr) {
+    health_check_service_name_.reset();
+    saved_service_config_.reset();
+    received_first_resolver_result_ = false;
+  }
+  // Update connectivity state.
+  grpc_connectivity_state_set(&state_tracker_, state, reason);
+  if (channelz_node_ != nullptr) {
+    channelz_node_->SetConnectivityState(state);
+    channelz_node_->AddTraceEvent(
+        channelz::ChannelTrace::Severity::Info,
+        grpc_slice_from_static_string(
+            channelz::ChannelNode::GetChannelConnectivityStateChangeString(
+                state)));
+  }
+  // Grab data plane lock to do subchannel updates and update the picker.
+  //
+  // Note that we want to minimize the work done while holding the data
+  // plane lock, to keep the critical section small. So, for all of the
+  // objects that we might wind up unreffing here, we actually hold onto
+  // the refs until after we release the lock, and then unref them at
+  // that point. This includes the following:
+  // - refs to subchannel wrappers in the keys of pending_subchannel_updates_
+  // - ref stored in retry_throttle_data_
+  // - ref stored in service_config_
+  // - ownership of the existing picker in picker_
+  RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_to_unref;
+  RefCountedPtr<ServiceConfig> service_config_to_unref;
+  {
+    MutexLock lock(&data_plane_mu_);
+    // Handle subchannel updates.
+    for (auto& p : pending_subchannel_updates_) {
+      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+        gpr_log(GPR_INFO,
+                "chand=%p: updating subchannel wrapper %p data plane "
+                "connected_subchannel to %p",
+                this, p.first.get(), p.second.get());
+      }
+      // Note: We do not remove the entry from pending_subchannel_updates_
+      // here, since this would unref the subchannel wrapper; instead,
+      // we wait until we've released the lock to clear the map.
+      p.first->set_connected_subchannel_in_data_plane(std::move(p.second));
+    }
+    // Swap out the picker.
+    // Note: Original value will be destroyed after the lock is released.
+    picker_.swap(picker);
+    // Clean the data plane if the updated picker is nullptr.
+    if (picker_ == nullptr) {
+      received_service_config_data_ = false;
+      // Note: We save the objects to unref until after the lock is released.
+      retry_throttle_data_to_unref = std::move(retry_throttle_data_);
+      service_config_to_unref = std::move(service_config_);
+    }
+    // Re-process queued picks.
+    for (QueuedPick* pick = queued_picks_; pick != nullptr; pick = pick->next) {
+      grpc_call_element* elem = pick->elem;
+      CallData* calld = static_cast<CallData*>(elem->call_data);
+      grpc_error* error = GRPC_ERROR_NONE;
+      if (calld->PickSubchannelLocked(elem, &error)) {
+        calld->AsyncPickDone(elem, error);
+      }
+    }
+  }
+  // Clear the pending update map after releasing the lock, to keep the
+  // critical section small.
+  pending_subchannel_updates_.clear();
+}
+
+void ChannelData::UpdateServiceConfigLocked(
+    RefCountedPtr<ServerRetryThrottleData> retry_throttle_data,
+    RefCountedPtr<ServiceConfig> service_config) {
+  // Grab data plane lock to update service config.
+  //
+  // We defer unreffing the old values (and deallocating memory) until
+  // after releasing the lock to keep the critical section small.
+  {
+    MutexLock lock(&data_plane_mu_);
+    // Update service config.
+    received_service_config_data_ = true;
+    // Old values will be unreffed after lock is released.
+    retry_throttle_data_.swap(retry_throttle_data);
+    service_config_.swap(service_config);
+    // Apply service config to queued picks.
+    for (QueuedPick* pick = queued_picks_; pick != nullptr; pick = pick->next) {
+      CallData* calld = static_cast<CallData*>(pick->elem->call_data);
+      calld->MaybeApplyServiceConfigToCallLocked(pick->elem);
+    }
+  }
+  // Old values will be unreffed after lock is released when they go out
+  // of scope.
+}
+
 void ChannelData::CreateResolvingLoadBalancingPolicyLocked() {
   // Instantiate resolving LB policy.
   LoadBalancingPolicy::Args lb_args;
@@ -1746,15 +1704,20 @@ bool ChannelData::ProcessResolverResultLocked(
   // if we feel it is unnecessary.
   if (service_config_changed || !chand->received_first_resolver_result_) {
     chand->received_first_resolver_result_ = true;
-    Optional<internal::ClientChannelGlobalParsedConfig::RetryThrottling>
-        retry_throttle_data;
+    RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
     if (parsed_service_config != nullptr) {
-      retry_throttle_data = parsed_service_config->retry_throttling();
+      Optional<internal::ClientChannelGlobalParsedConfig::RetryThrottling>
+          retry_throttle_config = parsed_service_config->retry_throttling();
+      if (retry_throttle_config.has_value()) {
+        retry_throttle_data =
+            internal::ServerRetryThrottleMap::GetDataForServer(
+                chand->server_name_.get(),
+                retry_throttle_config.value().max_milli_tokens,
+                retry_throttle_config.value().milli_token_ratio);
+      }
     }
-    // Create service config setter to update channel state in the data
-    // plane combiner. Destroys itself when done.
-    New<ServiceConfigSetter>(chand, retry_throttle_data,
-                             chand->saved_service_config_);
+    chand->UpdateServiceConfigLocked(std::move(retry_throttle_data),
+                                     chand->saved_service_config_);
   }
   UniquePtr<char> processed_lb_policy_name;
   chand->ProcessLbPolicy(result, parsed_service_config,
@@ -1838,8 +1801,8 @@ void ChannelData::StartTransportOpLocked(void* arg, grpc_error* ignored) {
         static_cast<grpc_connectivity_state>(value) == GRPC_CHANNEL_IDLE) {
       if (chand->disconnect_error() == GRPC_ERROR_NONE) {
         // Enter IDLE state.
-        New<ConnectivityStateAndPickerSetter>(chand, GRPC_CHANNEL_IDLE,
-                                              "channel entering IDLE", nullptr);
+        chand->UpdateStateAndPickerLocked(GRPC_CHANNEL_IDLE,
+                                          "channel entering IDLE", nullptr);
       }
       GRPC_ERROR_UNREF(op->disconnect_with_error);
     } else {
@@ -1848,8 +1811,8 @@ void ChannelData::StartTransportOpLocked(void* arg, grpc_error* ignored) {
                  GRPC_ERROR_NONE);
       chand->disconnect_error_.Store(op->disconnect_with_error,
                                      MemoryOrder::RELEASE);
-      New<ConnectivityStateAndPickerSetter>(
-          chand, GRPC_CHANNEL_SHUTDOWN, "shutdown from API",
+      chand->UpdateStateAndPickerLocked(
+          GRPC_CHANNEL_SHUTDOWN, "shutdown from API",
           UniquePtr<LoadBalancingPolicy::SubchannelPicker>(
               New<LoadBalancingPolicy::TransientFailurePicker>(
                   GRPC_ERROR_REF(op->disconnect_with_error))));
@@ -2092,8 +2055,8 @@ void CallData::StartTransportStreamOpBatch(
   // Add the batch to the pending list.
   calld->PendingBatchesAdd(elem, batch);
   // Check if we've already gotten a subchannel call.
-  // Note that once we have completed the pick, we do not need to enter
-  // the channel combiner, which is more efficient (especially for
+  // Note that once we have picked a subchannel, we do not need to acquire
+  // the channel's data plane mutex, which is more efficient (especially for
   // streaming calls).
   if (calld->subchannel_call_ != nullptr) {
     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
@@ -2105,18 +2068,15 @@ void CallData::StartTransportStreamOpBatch(
     return;
   }
   // We do not yet have a subchannel call.
-  // For batches containing a send_initial_metadata op, enter the channel
-  // combiner to start a pick.
+  // For batches containing a send_initial_metadata op, acquire the
+  // channel's data plane mutex to pick a subchannel.
   if (GPR_LIKELY(batch->send_initial_metadata)) {
     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
-      gpr_log(GPR_INFO, "chand=%p calld=%p: entering client_channel combiner",
+      gpr_log(GPR_INFO,
+              "chand=%p calld=%p: grabbing data plane mutex to perform pick",
              chand, calld);
     }
-    GRPC_CLOSURE_SCHED(
-        GRPC_CLOSURE_INIT(
-            &batch->handler_private.closure, StartPickLocked, elem,
-            grpc_combiner_scheduler(chand->data_plane_combiner())),
-        GRPC_ERROR_NONE);
+    PickSubchannel(elem, GRPC_ERROR_NONE);
   } else {
     // For all other batches, release the call combiner.
     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
@@ -2544,8 +2504,8 @@ void CallData::DoRetry(grpc_call_element* elem,
             this, next_attempt_time - ExecCtx::Get()->Now());
   }
   // Schedule retry after computed delay.
-  GRPC_CLOSURE_INIT(&pick_closure_, StartPickLocked, elem,
-                    grpc_combiner_scheduler(chand->data_plane_combiner()));
+  GRPC_CLOSURE_INIT(&pick_closure_, PickSubchannel, elem,
+                    grpc_schedule_on_exec_ctx);
   grpc_timer_init(&retry_timer_, next_attempt_time, &pick_closure_);
   // Update bookkeeping.
   if (retry_state != nullptr) retry_state->retry_dispatched = true;
@@ -3660,6 +3620,11 @@ void CallData::CreateSubchannelCall(grpc_call_element* elem) {
   }
 }
 
+void CallData::AsyncPickDone(grpc_call_element* elem, grpc_error* error) {
+  GRPC_CLOSURE_INIT(&pick_closure_, PickDone, elem, grpc_schedule_on_exec_ctx);
+  GRPC_CLOSURE_SCHED(&pick_closure_, error);
+}
+
 void CallData::PickDone(void* arg, grpc_error* error) {
   grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
@@ -3682,10 +3647,9 @@ class CallData::QueuedPickCanceller {
  public:
   explicit QueuedPickCanceller(grpc_call_element* elem) : elem_(elem) {
     auto* calld = static_cast<CallData*>(elem->call_data);
-    auto* chand = static_cast<ChannelData*>(elem->channel_data);
     GRPC_CALL_STACK_REF(calld->owning_call_, "QueuedPickCanceller");
     GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this,
-                      grpc_combiner_scheduler(chand->data_plane_combiner()));
+                      grpc_schedule_on_exec_ctx);
     calld->call_combiner_->SetNotifyOnCancel(&closure_);
   }
@@ -3694,6 +3658,7 @@ class CallData::QueuedPickCanceller {
     auto* self = static_cast<QueuedPickCanceller*>(arg);
     auto* chand = static_cast<ChannelData*>(self->elem_->channel_data);
     auto* calld = static_cast<CallData*>(self->elem_->call_data);
+    MutexLock lock(chand->data_plane_mu());
     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
       gpr_log(GPR_INFO,
               "chand=%p calld=%p: cancelling queued pick: "
@@ -3765,7 +3730,7 @@ void CallData::ApplyServiceConfigToCallLocked(grpc_call_element* elem) {
     // from the client API, reset the deadline timer.
     if (chand->deadline_checking_enabled() && method_params_->timeout() != 0) {
       const grpc_millis per_method_deadline =
-          grpc_timespec_to_millis_round_up(call_start_time_) +
+          grpc_cycle_counter_to_millis_round_up(call_start_time_) +
           method_params_->timeout();
       if (per_method_deadline < deadline_) {
         deadline_ = per_method_deadline;
@@ -3818,23 +3783,38 @@ const char* PickResultTypeName(
   GPR_UNREACHABLE_CODE(return "UNKNOWN");
 }
 
-void CallData::StartPickLocked(void* arg, grpc_error* error) {
+void CallData::PickSubchannel(void* arg, grpc_error* error) {
   grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
   CallData* calld = static_cast<CallData*>(elem->call_data);
   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
-  GPR_ASSERT(calld->connected_subchannel_ == nullptr);
-  GPR_ASSERT(calld->subchannel_call_ == nullptr);
-  // picker's being null means the channel is currently in IDLE state. The
-  // incoming call will make the channel exit IDLE and queue itself.
+  bool pick_complete;
+  {
+    MutexLock lock(chand->data_plane_mu());
+    pick_complete = calld->PickSubchannelLocked(elem, &error);
+  }
+  if (pick_complete) {
+    PickDone(elem, error);
+    GRPC_ERROR_UNREF(error);
+  }
+}
+
+bool CallData::PickSubchannelLocked(grpc_call_element* elem,
+                                    grpc_error** error) {
+  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+  GPR_ASSERT(connected_subchannel_ == nullptr);
+  GPR_ASSERT(subchannel_call_ == nullptr);
+  // The picker being null means that the channel is currently in IDLE state.
+  // The incoming call will make the channel exit IDLE.
   if (chand->picker() == nullptr) {
-    // We are currently in the data plane.
-    // Bounce into the control plane to exit IDLE.
-    chand->CheckConnectivityState(true);
-    calld->AddCallToQueuedPicksLocked(elem);
-    return;
+    // Bounce into the control plane combiner to exit IDLE.
+    chand->CheckConnectivityState(/*try_to_connect=*/true);
+    // Queue the pick, so that it will be attempted once the channel
+    // becomes connected.
+    AddCallToQueuedPicksLocked(elem);
+    return false;
   }
   // Apply service config to call if needed.
-  calld->MaybeApplyServiceConfigToCallLocked(elem);
+  MaybeApplyServiceConfigToCallLocked(elem);
   // If this is a retry, use the send_initial_metadata payload that
   // we've cached; otherwise, use the pending batch. The
   // send_initial_metadata batch will be the first pending batch in the
@@ -3846,31 +3826,27 @@ void CallData::StartPickLocked(void* arg, grpc_error* error) {
   // subchannel's copy of the metadata batch (which is copied for each
   // attempt) to the LB policy instead the one from the parent channel.
   LoadBalancingPolicy::PickArgs pick_args;
-  pick_args.call_state = &calld->lb_call_state_;
+  pick_args.call_state = &lb_call_state_;
   Metadata initial_metadata(
-      calld,
-      calld->seen_send_initial_metadata_
-          ? &calld->send_initial_metadata_
-          : calld->pending_batches_[0]
+      this,
+      seen_send_initial_metadata_
+          ? &send_initial_metadata_
+          : pending_batches_[0]
                 .batch->payload->send_initial_metadata.send_initial_metadata);
   pick_args.initial_metadata = &initial_metadata;
   // Grab initial metadata flags so that we can check later if the call has
   // wait_for_ready enabled.
   const uint32_t send_initial_metadata_flags =
-      calld->seen_send_initial_metadata_
-          ? calld->send_initial_metadata_flags_
-          : calld->pending_batches_[0]
-                .batch->payload->send_initial_metadata
-                .send_initial_metadata_flags;
-  // When done, we schedule this closure to leave the data plane combiner.
-  GRPC_CLOSURE_INIT(&calld->pick_closure_, PickDone, elem,
-                    grpc_schedule_on_exec_ctx);
+      seen_send_initial_metadata_ ? send_initial_metadata_flags_
+                                  : pending_batches_[0]
+                                        .batch->payload->send_initial_metadata
+                                        .send_initial_metadata_flags;
   // Attempt pick.
   auto result = chand->picker()->Pick(pick_args);
   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
     gpr_log(GPR_INFO,
             "chand=%p calld=%p: LB pick returned %s (subchannel=%p, error=%s)",
-            chand, calld, PickResultTypeName(result.type),
+            chand, this, PickResultTypeName(result.type),
             result.subchannel.get(), grpc_error_string(result.error));
   }
   switch (result.type) {
@@ -3879,10 +3855,9 @@ void CallData::StartPickLocked(void* arg, grpc_error* error) {
       grpc_error* disconnect_error = chand->disconnect_error();
       if (disconnect_error != GRPC_ERROR_NONE) {
         GRPC_ERROR_UNREF(result.error);
-        GRPC_CLOSURE_SCHED(&calld->pick_closure_,
-                           GRPC_ERROR_REF(disconnect_error));
-        if (calld->pick_queued_) calld->RemoveCallFromQueuedPicksLocked(elem);
-        break;
+        if (pick_queued_) RemoveCallFromQueuedPicksLocked(elem);
+        *error = GRPC_ERROR_REF(disconnect_error);
+        return true;
       }
       // If wait_for_ready is false, then the error indicates the RPC
       // attempt's final status.
@@ -3890,19 +3865,20 @@ void CallData::StartPickLocked(void* arg, grpc_error* error) {
           GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) {
         // Retry if appropriate; otherwise, fail.
         grpc_status_code status = GRPC_STATUS_OK;
-        grpc_error_get_status(result.error, calld->deadline_, &status, nullptr,
+        grpc_error_get_status(result.error, deadline_, &status, nullptr,
                               nullptr, nullptr);
-        if (!calld->enable_retries_ ||
-            !calld->MaybeRetry(elem, nullptr /* batch_data */, status,
-                               nullptr /* server_pushback_md */)) {
+        const bool retried = enable_retries_ &&
+                             MaybeRetry(elem, nullptr /* batch_data */, status,
+                                        nullptr /* server_pushback_md */);
+        if (!retried) {
          grpc_error* new_error =
              GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                  "Failed to pick subchannel", &result.error, 1);
          GRPC_ERROR_UNREF(result.error);
-          GRPC_CLOSURE_SCHED(&calld->pick_closure_, new_error);
+          *error = new_error;
         }
-        if (calld->pick_queued_) calld->RemoveCallFromQueuedPicksLocked(elem);
-        break;
+        if (pick_queued_) RemoveCallFromQueuedPicksLocked(elem);
+        return !retried;
       }
       // If wait_for_ready is true, then queue to retry when we get a new
       // picker.
@@ -3910,26 +3886,26 @@ void CallData::StartPickLocked(void* arg, grpc_error* error) {
       }
       // Fallthrough
     case LoadBalancingPolicy::PickResult::PICK_QUEUE:
-      if (!calld->pick_queued_) calld->AddCallToQueuedPicksLocked(elem);
-      break;
+      if (!pick_queued_) AddCallToQueuedPicksLocked(elem);
+      return false;
     default:  // PICK_COMPLETE
+      if (pick_queued_) RemoveCallFromQueuedPicksLocked(elem);
       // Handle drops.
       if (GPR_UNLIKELY(result.subchannel == nullptr)) {
         result.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
             "Call dropped by load balancing policy");
       } else {
         // Grab a ref to the connected subchannel while we're still
-        // holding the data plane combiner.
-        calld->connected_subchannel_ =
+        // holding the data plane mutex.
+        connected_subchannel_ =
             chand->GetConnectedSubchannelInDataPlane(result.subchannel.get());
-        GPR_ASSERT(calld->connected_subchannel_ != nullptr);
+        GPR_ASSERT(connected_subchannel_ != nullptr);
       }
-      calld->lb_recv_trailing_metadata_ready_ =
-          result.recv_trailing_metadata_ready;
-      calld->lb_recv_trailing_metadata_ready_user_data_ =
+      lb_recv_trailing_metadata_ready_ = result.recv_trailing_metadata_ready;
+      lb_recv_trailing_metadata_ready_user_data_ =
          result.recv_trailing_metadata_ready_user_data;
-      GRPC_CLOSURE_SCHED(&calld->pick_closure_, result.error);
-      if (calld->pick_queued_) calld->RemoveCallFromQueuedPicksLocked(elem);
+      *error = result.error;
+      return true;
   }
 }