@ -113,10 +113,6 @@ namespace grpc_core {
using internal : : ClientChannelMethodParsedConfig ;
TraceFlag grpc_client_channel_trace ( false , " client_channel " ) ;
TraceFlag grpc_client_channel_call_trace ( false , " client_channel_call " ) ;
TraceFlag grpc_client_channel_lb_call_trace ( false , " client_channel_lb_call " ) ;
//
// ClientChannelFilter::CallData definition
//
@ -411,11 +407,9 @@ class DynamicTerminationFilter::CallData final {
args , pollent , nullptr ,
[ service_config_call_data ] ( ) { service_config_call_data - > Commit ( ) ; } ,
/*is_transparent_retry=*/ false ) ;
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_call_trace ) ) {
gpr_log ( GPR_INFO ,
" chand=%p dynamic_termination_calld=%p: create lb_call=%p " , chand ,
client_channel , calld - > lb_call_ . get ( ) ) ;
}
GRPC_TRACE_LOG ( client_channel_call , INFO )
< < " chand= " < < chand < < " dynamic_termination_calld= " < < client_channel
< < " : create lb_call= " < < calld - > lb_call_ . get ( ) ;
}
private :
@ -466,9 +460,8 @@ class ClientChannelFilter::ResolverResultHandler final
}
~ ResolverResultHandler ( ) override {
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_INFO , " chand=%p: resolver shutdown complete " , chand_ ) ;
}
GRPC_TRACE_LOG ( client_channel , INFO )
< < " chand= " < < chand_ < < " : resolver shutdown complete " ;
GRPC_CHANNEL_STACK_UNREF ( chand_ - > owning_stack_ , " ResolverResultHandler " ) ;
}
@ -498,16 +491,14 @@ class ClientChannelFilter::SubchannelWrapper final
public :
SubchannelWrapper ( ClientChannelFilter * chand ,
RefCountedPtr < Subchannel > subchannel )
: SubchannelInterface ( GRPC_TRACE_FLAG_ENABLED ( grpc_ client_channel_trace )
: SubchannelInterface ( GRPC_TRACE_FLAG_ENABLED ( client_channel )
? " SubchannelWrapper "
: nullptr ) ,
chand_ ( chand ) ,
subchannel_ ( std : : move ( subchannel ) ) {
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_INFO ,
" chand=%p: creating subchannel wrapper %p for subchannel %p " ,
chand , this , subchannel_ . get ( ) ) ;
}
GRPC_TRACE_LOG ( client_channel , INFO )
< < " chand= " < < chand < < " : creating subchannel wrapper " < < this
< < " for subchannel " < < subchannel_ . get ( ) ;
GRPC_CHANNEL_STACK_REF ( chand_ - > owning_stack_ , " SubchannelWrapper " ) ;
# ifndef NDEBUG
DCHECK ( chand_ - > work_serializer_ - > RunningInWorkSerializer ( ) ) ;
@ -528,11 +519,9 @@ class ClientChannelFilter::SubchannelWrapper final
}
~ SubchannelWrapper ( ) override {
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_INFO ,
" chand=%p: destroying subchannel wrapper %p for subchannel %p " ,
chand_ , this , subchannel_ . get ( ) ) ;
}
GRPC_TRACE_LOG ( client_channel , INFO )
< < " chand= " < < chand_ < < " : destroying subchannel wrapper " < < this
< < " for subchannel " < < subchannel_ . get ( ) ;
if ( ! IsWorkSerializerDispatchEnabled ( ) ) {
chand_ - > subchannel_wrappers_ . erase ( this ) ;
if ( chand_ - > channelz_node_ ! = nullptr ) {
@ -665,12 +654,11 @@ class ClientChannelFilter::SubchannelWrapper final
void OnConnectivityStateChange (
RefCountedPtr < ConnectivityStateWatcherInterface > self ,
grpc_connectivity_state state , const absl : : Status & status ) override {
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_INFO ,
" chand=%p: connectivity change for subchannel wrapper %p "
" subchannel %p; hopping into work_serializer " ,
parent_ - > chand_ , parent_ . get ( ) , parent_ - > subchannel_ . get ( ) ) ;
}
GRPC_TRACE_LOG ( client_channel , INFO )
< < " chand= " < < parent_ - > chand_
< < " : connectivity change for subchannel wrapper " < < parent_ . get ( )
< < " subchannel " < < parent_ - > subchannel_ . get ( )
< < " hopping into work_serializer " ;
self . release ( ) ; // Held by callback.
parent_ - > chand_ - > work_serializer_ - > Run (
[ this , state , status ] ( ) ABSL_EXCLUSIVE_LOCKS_REQUIRED (
@ -689,15 +677,13 @@ class ClientChannelFilter::SubchannelWrapper final
void ApplyUpdateInControlPlaneWorkSerializer ( grpc_connectivity_state state ,
const absl : : Status & status )
ABSL_EXCLUSIVE_LOCKS_REQUIRED ( * parent_ - > chand_ - > work_serializer_ ) {
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_INFO ,
" chand=%p: processing connectivity change in work serializer "
" for subchannel wrapper %p subchannel %p watcher=%p "
" state=%s status=%s " ,
parent_ - > chand_ , parent_ . get ( ) , parent_ - > subchannel_ . get ( ) ,
watcher_ . get ( ) , ConnectivityStateName ( state ) ,
status . ToString ( ) . c_str ( ) ) ;
}
GRPC_TRACE_LOG ( client_channel , INFO )
< < " chand= " < < parent_ - > chand_
< < " : processing connectivity change in work serializer for "
" subchannel wrapper "
< < parent_ . get ( ) < < " subchannel " < < parent_ - > subchannel_ . get ( )
< < " watcher= " < < watcher_ . get ( )
< < " state= " < < ConnectivityStateName ( state ) < < " status= " < < status ;
absl : : optional < absl : : Cord > keepalive_throttling =
status . GetPayload ( kKeepaliveThrottlingKey ) ;
if ( keepalive_throttling . has_value ( ) ) {
@ -706,10 +692,10 @@ class ClientChannelFilter::SubchannelWrapper final
& new_keepalive_time ) ) {
if ( new_keepalive_time > parent_ - > chand_ - > keepalive_time_ ) {
parent_ - > chand_ - > keepalive_time_ = new_keepalive_time ;
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_INFO , " chand=%p: throttling keepalive time to %d " ,
parent_ - > chand_ , parent_ - > chand_ - > keepalive_time_ ) ;
}
GRPC_TRACE_LOG ( client_channel , INFO )
< < " chand= " < < parent_ - > chand_
< < " : throttling keepalive time to "
< < parent_ - > chand_ - > keepalive_time_ ;
// Propagate the new keepalive time to all subchannels. This is so
// that new transports created by any subchannel (and not just the
// subchannel that received the GOAWAY), use the new keepalive time.
@ -992,14 +978,13 @@ class ClientChannelFilter::ClientChannelControlHelper final
RefCountedPtr < LoadBalancingPolicy : : SubchannelPicker > picker )
override ABSL_EXCLUSIVE_LOCKS_REQUIRED ( * chand_ - > work_serializer_ ) {
if ( chand_ - > resolver_ = = nullptr ) return ; // Shutting down.
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_trace ) ) {
const char * extra = chand_ - > disconnect_error_ . ok ( )
GRPC_TRACE_LOG ( client_channel , INFO )
< < " chand= " < < chand_
< < " : update: state= " < < ConnectivityStateName ( state ) < < " status=( "
< < status < < " ) picker= " < < picker . get ( )
< < ( chand_ - > disconnect_error_ . ok ( )
? " "
: " (ignoring -- channel shutting down) " ;
gpr_log ( GPR_INFO , " chand=%p: update: state=%s status=(%s) picker=%p%s " ,
chand_ , ConnectivityStateName ( state ) , status . ToString ( ) . c_str ( ) ,
picker . get ( ) , extra ) ;
}
: " (ignoring -- channel shutting down) " ) ;
// Do update only if not shutting down.
if ( chand_ - > disconnect_error_ . ok ( ) ) {
chand_ - > UpdateStateAndPickerLocked ( state , status , " helper " ,
@ -1010,9 +995,8 @@ class ClientChannelFilter::ClientChannelControlHelper final
void RequestReresolution ( ) override
ABSL_EXCLUSIVE_LOCKS_REQUIRED ( * chand_ - > work_serializer_ ) {
if ( chand_ - > resolver_ = = nullptr ) return ; // Shutting down.
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_INFO , " chand=%p: started name re-resolving " , chand_ ) ;
}
GRPC_TRACE_LOG ( client_channel , INFO )
< < " chand= " < < chand_ < < " : started name re-resolving " ;
chand_ - > resolver_ - > RequestReresolutionLocked ( ) ;
}
@ -1104,10 +1088,9 @@ ClientChannelFilter::ClientChannelFilter(grpc_channel_element_args* args,
std : : make_shared < WorkSerializer > ( * args - > channel_stack - > event_engine ) ) ,
state_tracker_ ( " client_channel " , GRPC_CHANNEL_IDLE ) ,
subchannel_pool_ ( GetSubchannelPool ( channel_args_ ) ) {
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_INFO , " chand=%p: creating client_channel for channel stack %p " ,
this , owning_stack_ ) ;
}
GRPC_TRACE_LOG ( client_channel , INFO )
< < " chand= " < < this < < " : creating client_channel for channel stack "
< < owning_stack_ ;
// Start backup polling.
grpc_client_channel_start_backup_polling ( interested_parties_ ) ;
// Check client channel factory.
@ -1176,9 +1159,8 @@ ClientChannelFilter::ClientChannelFilter(grpc_channel_element_args* args,
}
ClientChannelFilter : : ~ ClientChannelFilter ( ) {
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_INFO , " chand=%p: destroying channel " , this ) ;
}
GRPC_TRACE_LOG ( client_channel , INFO )
< < " chand= " < < this < < " : destroying channel " ;
DestroyResolverAndLbPolicyLocked ( ) ;
// Stop backup polling.
grpc_client_channel_stop_backup_polling ( interested_parties_ ) ;
@ -1270,9 +1252,8 @@ void ClientChannelFilter::OnResolverResultChangedLocked(
Resolver : : Result result ) {
// Handle race conditions.
if ( resolver_ = = nullptr ) return ;
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_INFO , " chand=%p: got resolver result " , this ) ;
}
GRPC_TRACE_LOG ( client_channel , INFO )
< < " chand= " < < this < < " : got resolver result " ;
// Grab resolver result health callback.
auto resolver_callback = std : : move ( result . result_health_callback ) ;
absl : : Status resolver_result_status ;
@ -1306,19 +1287,16 @@ void ClientChannelFilter::OnResolverResultChangedLocked(
RefCountedPtr < ServiceConfig > service_config ;
RefCountedPtr < ConfigSelector > config_selector ;
if ( ! result . service_config . ok ( ) ) {
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_INFO , " chand=%p: resolver returned service config error: %s " ,
this , result . service_config . status ( ) . ToString ( ) . c_str ( ) ) ;
}
GRPC_TRACE_LOG ( client_channel , INFO )
< < " chand= " < < this < < " : resolver returned service config error: "
< < result . service_config . status ( ) ;
// If the service config was invalid, then fallback to the
// previously returned service config.
if ( saved_service_config_ ! = nullptr ) {
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_INFO ,
" chand=%p: resolver returned invalid service config. "
" Continuing to use previous service config. " ,
this ) ;
}
GRPC_TRACE_LOG ( client_channel , INFO )
< < " chand= " < < this
< < " : resolver returned invalid service config. "
" Continuing to use previous service config. " ;
service_config = saved_service_config_ ;
config_selector = saved_config_selector_ ;
} else {
@ -1332,12 +1310,10 @@ void ClientChannelFilter::OnResolverResultChangedLocked(
}
} else if ( * result . service_config = = nullptr ) {
// Resolver did not return any service config.
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_INFO ,
" chand=%p: resolver returned no service config. Using default "
" service config for channel. " ,
this ) ;
}
GRPC_TRACE_LOG ( client_channel , INFO )
< < " chand= " < < this
< < " : resolver returned no service config. Using default service "
" config for channel. " ;
service_config = default_service_config_ ;
} else {
// Use ServiceConfig and ConfigSelector returned by resolver.
@ -1369,8 +1345,9 @@ void ClientChannelFilter::OnResolverResultChangedLocked(
UpdateServiceConfigInControlPlaneLocked (
std : : move ( service_config ) , std : : move ( config_selector ) ,
std : : string ( lb_policy_config - > name ( ) ) ) ;
} else if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_INFO , " chand=%p: service config not changed " , this ) ;
} else {
GRPC_TRACE_LOG ( client_channel , INFO )
< < " chand= " < < this < < " : service config not changed " ;
}
// Create or update LB policy, as needed.
resolver_result_status = CreateOrUpdateLbPolicyLocked (
@ -1404,10 +1381,8 @@ void ClientChannelFilter::OnResolverResultChangedLocked(
void ClientChannelFilter : : OnResolverErrorLocked ( absl : : Status status ) {
if ( resolver_ = = nullptr ) return ;
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_INFO , " chand=%p: resolver transient failure: %s " , this ,
status . ToString ( ) . c_str ( ) ) ;
}
GRPC_TRACE_LOG ( client_channel , INFO )
< < " chand= " < < this < < " : resolver transient failure: " < < status ;
// If we already have an LB policy from a previous resolution
// result, then we continue to let it set the connectivity state.
// Otherwise, we go into TRANSIENT_FAILURE.
@ -1453,10 +1428,8 @@ absl::Status ClientChannelFilter::CreateOrUpdateLbPolicyLocked(
lb_policy_ = CreateLbPolicyLocked ( update_args . args ) ;
}
// Update the policy.
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_INFO , " chand=%p: Updating child policy %p " , this ,
lb_policy_ . get ( ) ) ;
}
GRPC_TRACE_LOG ( client_channel , INFO )
< < " chand= " < < this < < " : Updating child policy " < < lb_policy_ . get ( ) ;
return lb_policy_ - > UpdateLocked ( std : : move ( update_args ) ) ;
}
@ -1478,11 +1451,9 @@ OrphanablePtr<LoadBalancingPolicy> ClientChannelFilter::CreateLbPolicyLocked(
lb_policy_args . args = args ;
OrphanablePtr < LoadBalancingPolicy > lb_policy =
MakeOrphanable < ChildPolicyHandler > ( std : : move ( lb_policy_args ) ,
& grpc_client_channel_trace ) ;
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_INFO , " chand=%p: created new LB policy %p " , this ,
lb_policy . get ( ) ) ;
}
& client_channel_trace ) ;
GRPC_TRACE_LOG ( client_channel , INFO )
< < " chand= " < < this < < " : created new LB policy " < < lb_policy . get ( ) ;
grpc_pollset_set_add_pollset_set ( lb_policy - > interested_parties ( ) ,
interested_parties_ ) ;
return lb_policy ;
@ -1492,10 +1463,9 @@ void ClientChannelFilter::UpdateServiceConfigInControlPlaneLocked(
RefCountedPtr < ServiceConfig > service_config ,
RefCountedPtr < ConfigSelector > config_selector , std : : string lb_policy_name ) {
std : : string service_config_json ( service_config - > json_string ( ) ) ;
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_INFO , " chand=%p: using service config: \" %s \" " , this ,
service_config_json . c_str ( ) ) ;
}
GRPC_TRACE_LOG ( client_channel , INFO )
< < " chand= " < < this < < " : using service config: \" " < < service_config_json
< < " \" " ;
// Save service config.
saved_service_config_ = std : : move ( service_config ) ;
// Swap out the data used by GetChannelInfo().
@ -1506,10 +1476,9 @@ void ClientChannelFilter::UpdateServiceConfigInControlPlaneLocked(
}
// Save config selector.
saved_config_selector_ = std : : move ( config_selector ) ;
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_INFO , " chand=%p: using ConfigSelector %p " , this ,
saved_config_selector_ . get ( ) ) ;
}
GRPC_TRACE_LOG ( client_channel , INFO )
< < " chand= " < < this < < " : using ConfigSelector "
< < saved_config_selector_ . get ( ) ;
}
void ClientChannelFilter : : UpdateServiceConfigInDataPlaneLocked ( ) {
@ -1517,10 +1486,9 @@ void ClientChannelFilter::UpdateServiceConfigInDataPlaneLocked() {
RefCountedPtr < ServiceConfig > service_config = saved_service_config_ ;
// Grab ref to config selector. Use default if resolver didn't supply one.
RefCountedPtr < ConfigSelector > config_selector = saved_config_selector_ ;
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_INFO , " chand=%p: switching to ConfigSelector %p " , this ,
saved_config_selector_ . get ( ) ) ;
}
GRPC_TRACE_LOG ( client_channel , INFO )
< < " chand= " < < this < < " : switching to ConfigSelector "
< < saved_config_selector_ . get ( ) ;
if ( config_selector = = nullptr ) {
config_selector =
MakeRefCounted < DefaultConfigSelector > ( saved_service_config_ ) ;
@ -1562,10 +1530,9 @@ void ClientChannelFilter::UpdateServiceConfigInDataPlaneLocked() {
}
void ClientChannelFilter : : CreateResolverLocked ( ) {
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_INFO , " chand=%p: starting name resolution for %s " , this ,
uri_to_resolve_ . c_str ( ) ) ;
}
GRPC_TRACE_LOG ( client_channel , INFO )
< < " chand= " < < this < < " : starting name resolution for "
< < uri_to_resolve_ ;
resolver_ = CoreConfiguration : : Get ( ) . resolver_registry ( ) . CreateResolver (
uri_to_resolve_ , channel_args_ , interested_parties_ , work_serializer_ ,
std : : make_unique < ResolverResultHandler > ( this ) ) ;
@ -1575,17 +1542,14 @@ void ClientChannelFilter::CreateResolverLocked() {
UpdateStateLocked ( GRPC_CHANNEL_CONNECTING , absl : : Status ( ) ,
" started resolving " ) ;
resolver_ - > StartLocked ( ) ;
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_INFO , " chand=%p: created resolver=%p " , this , resolver_ . get ( ) ) ;
}
GRPC_TRACE_LOG ( client_channel , INFO )
< < " chand= " < < this < < " : created resolver= " < < resolver_ . get ( ) ;
}
void ClientChannelFilter : : DestroyResolverAndLbPolicyLocked ( ) {
if ( resolver_ ! = nullptr ) {
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_INFO , " chand=%p: shutting down resolver=%p " , this ,
resolver_ . get ( ) ) ;
}
GRPC_TRACE_LOG ( client_channel , INFO )
< < " chand= " < < this < < " : shutting down resolver= " < < resolver_ . get ( ) ;
resolver_ . reset ( ) ;
// Clear resolution state.
saved_service_config_ . reset ( ) ;
@ -1605,10 +1569,9 @@ void ClientChannelFilter::DestroyResolverAndLbPolicyLocked() {
}
// Clear LB policy if set.
if ( lb_policy_ ! = nullptr ) {
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_INFO , " chand=%p: shutting down lb_policy=%p " , this ,
lb_policy_ . get ( ) ) ;
}
GRPC_TRACE_LOG ( client_channel , INFO )
< < " chand= " < < this
< < " : shutting down lb_policy= " < < lb_policy_ . get ( ) ;
grpc_pollset_set_del_pollset_set ( lb_policy_ - > interested_parties ( ) ,
interested_parties_ ) ;
lb_policy_ . reset ( ) ;
@ -1754,10 +1717,9 @@ void ClientChannelFilter::StartTransportOpLocked(grpc_transport_op* op) {
}
// Disconnect or enter IDLE.
if ( ! op - > disconnect_with_error . ok ( ) ) {
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_trace ) ) {
gpr_log ( GPR_INFO , " chand=%p: disconnect_with_error: %s " , this ,
StatusToString ( op - > disconnect_with_error ) . c_str ( ) ) ;
}
GRPC_TRACE_LOG ( client_channel , INFO )
< < " chand= " < < this < < " : disconnect_with_error: "
< < StatusToString ( op - > disconnect_with_error ) ;
DestroyResolverAndLbPolicyLocked ( ) ;
intptr_t value ;
if ( grpc_error_get_int ( op - > disconnect_with_error ,
@ -1862,11 +1824,9 @@ void ClientChannelFilter::RemoveConnectivityWatcher(
//
void ClientChannelFilter : : CallData : : RemoveCallFromResolverQueuedCallsLocked ( ) {
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_call_trace ) ) {
gpr_log ( GPR_INFO ,
" chand=%p calld=%p: removing from resolver queued picks list " ,
chand ( ) , this ) ;
}
GRPC_TRACE_LOG ( client_channel_call , INFO )
< < " chand= " < < chand ( ) < < " calld= " < < this
< < " : removing from resolver queued picks list " ;
// Remove call's pollent from channel's interested_parties.
grpc_polling_entity_del_from_pollset_set ( pollent ( ) ,
chand ( ) - > interested_parties_ ) ;
@ -1877,12 +1837,10 @@ void ClientChannelFilter::CallData::RemoveCallFromResolverQueuedCallsLocked() {
}
void ClientChannelFilter : : CallData : : AddCallToResolverQueuedCallsLocked ( ) {
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_call_trace ) ) {
gpr_log (
GPR_INFO ,
" chand=%p calld=%p: adding to resolver queued picks list; pollent=%s " ,
chand ( ) , this , grpc_polling_entity_string ( pollent ( ) ) . c_str ( ) ) ;
}
GRPC_TRACE_LOG ( client_channel_call , INFO )
< < " chand= " < < chand ( ) < < " calld= " < < this
< < " : adding to resolver queued picks list; pollent= "
< < grpc_polling_entity_string ( pollent ( ) ) ;
// Add call's pollent to channel's interested_parties, so that I/O
// can be done under the call's CQ.
grpc_polling_entity_add_to_pollset_set ( pollent ( ) ,
@ -1894,10 +1852,9 @@ void ClientChannelFilter::CallData::AddCallToResolverQueuedCallsLocked() {
grpc_error_handle ClientChannelFilter : : CallData : : ApplyServiceConfigToCallLocked (
const absl : : StatusOr < RefCountedPtr < ConfigSelector > > & config_selector ) {
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_call_trace ) ) {
gpr_log ( GPR_INFO , " chand=%p calld=%p: applying service config to call " ,
chand ( ) , this ) ;
}
GRPC_TRACE_LOG ( client_channel_call , INFO )
< < " chand= " < < chand ( ) < < " calld= " < < this
< < " : applying service config to call " ;
if ( ! config_selector . ok ( ) ) return config_selector . status ( ) ;
// Create a ClientChannelServiceConfigCallData for the call. This stores
// a ref to the ServiceConfig and caches the right set of parsed configs
@ -1962,11 +1919,9 @@ absl::optional<absl::Status> ClientChannelFilter::CallData::CheckResolution(
}
// Handle errors.
if ( ! error . ok ( ) ) {
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_call_trace ) ) {
gpr_log ( GPR_INFO ,
" chand=%p calld=%p: error applying config to call: error=%s " ,
chand ( ) , this , StatusToString ( error ) . c_str ( ) ) ;
}
GRPC_TRACE_LOG ( client_channel_call , INFO )
< < " chand= " < < chand ( ) < < " calld= " < < this
< < " : error applying config to call: error= " < < StatusToString ( error ) ;
return error ;
}
// If the call was queued, add trace annotation.
@ -1989,20 +1944,18 @@ bool ClientChannelFilter::CallData::CheckResolutionLocked(
absl : : Status resolver_error = chand ( ) - > resolver_transient_failure_error_ ;
if ( ! resolver_error . ok ( ) & &
! send_initial_metadata ( ) - > GetOrCreatePointer ( WaitForReady ( ) ) - > value ) {
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_call_trace ) ) {
gpr_log ( GPR_INFO , " chand=%p calld=%p: resolution failed, failing call " ,
chand ( ) , this ) ;
}
GRPC_TRACE_LOG ( client_channel_call , INFO )
< < " chand= " < < chand ( ) < < " calld= " < < this
< < " : resolution failed, failing call " ;
* config_selector = absl_status_to_grpc_error ( resolver_error ) ;
return true ;
}
// Either the resolver has not yet returned a result, or it has
// returned transient failure but the call is wait_for_ready. In
// either case, queue the call.
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_call_trace ) ) {
gpr_log ( GPR_INFO , " chand=%p calld=%p: no resolver result yet " , chand ( ) ,
this ) ;
}
GRPC_TRACE_LOG ( client_channel_call , INFO )
< < " chand= " < < chand ( ) < < " calld= " < < this
< < " : no resolver result yet " ;
return false ;
}
// Result found.
@ -2024,9 +1977,8 @@ ClientChannelFilter::FilterBasedCallData::FilterBasedCallData(
elem_ ( elem ) ,
owning_call_ ( args . call_stack ) ,
call_combiner_ ( args . call_combiner ) {
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_call_trace ) ) {
gpr_log ( GPR_INFO , " chand=%p calld=%p: created call " , chand ( ) , this ) ;
}
GRPC_TRACE_LOG ( client_channel_call , INFO )
< < " chand= " < < chand ( ) < < " calld= " < < this < < " : created call " ;
}
ClientChannelFilter : : FilterBasedCallData : : ~ FilterBasedCallData ( ) {
@ -2062,8 +2014,8 @@ void ClientChannelFilter::FilterBasedCallData::StartTransportStreamOpBatch(
grpc_call_element * elem , grpc_transport_stream_op_batch * batch ) {
auto * calld = static_cast < FilterBasedCallData * > ( elem - > call_data ) ;
auto * chand = static_cast < ClientChannelFilter * > ( elem - > channel_data ) ;
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_ client_channel_call_trace ) & &
! GRPC_TRACE_FLAG_ENABLED ( grpc_trace_ channel) ) {
if ( GRPC_TRACE_FLAG_ENABLED ( client_channel_call ) & &
! GRPC_TRACE_FLAG_ENABLED ( channel ) ) {
gpr_log ( GPR_INFO , " chand=%p calld=%p: batch started from above: %s " , chand ,
calld , grpc_transport_stream_op_batch_string ( batch , false ) . c_str ( ) ) ;
}
@ -2082,10 +2034,9 @@ void ClientChannelFilter::FilterBasedCallData::StartTransportStreamOpBatch(
// Note that once we have done so, we do not need to acquire the channel's
// resolution mutex, which is more efficient (especially for streaming calls).
if ( calld - > dynamic_call_ ! = nullptr ) {
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_call_trace ) ) {
gpr_log ( GPR_INFO , " chand=%p calld=%p: starting batch on dynamic_call=%p " ,
chand , calld , calld - > dynamic_call_ . get ( ) ) ;
}
GRPC_TRACE_LOG ( client_channel_call , INFO )
< < " chand= " < < chand < < " calld= " < < calld
< < " : starting batch on dynamic_call= " < < calld - > dynamic_call_ . get ( ) ;
calld - > dynamic_call_ - > StartTransportStreamOpBatch ( batch ) ;
return ;
}
@ -2093,10 +2044,10 @@ void ClientChannelFilter::FilterBasedCallData::StartTransportStreamOpBatch(
//
// If we've previously been cancelled, immediately fail any new batches.
if ( GPR_UNLIKELY ( ! calld - > cancel_error_ . ok ( ) ) ) {
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_call_trace ) ) {
gpr_log ( GPR_INFO , " chand=%p calld=%p: failing batch with error: %s " ,
chand , calld , StatusToString ( calld - > cancel_error_ ) . c_str ( ) ) ;
}
GRPC_TRACE_LOG ( client_channel_call , INFO )
< < " chand= " < < chand < < " calld= " < < calld
< < " : failing batch with error: "
< < StatusToString ( calld - > cancel_error_ ) ;
// Note: This will release the call combiner.
grpc_transport_stream_op_batch_finish_with_failure (
batch , calld - > cancel_error_ , calld - > call_combiner ( ) ) ;
@ -2110,10 +2061,9 @@ void ClientChannelFilter::FilterBasedCallData::StartTransportStreamOpBatch(
// is in the past when the call starts), we can return the right
// error to the caller when the first batch does get passed down.
calld - > cancel_error_ = batch - > payload - > cancel_stream . cancel_error ;
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_call_trace ) ) {
gpr_log ( GPR_INFO , " chand=%p calld=%p: recording cancel_error=%s " , chand ,
calld , StatusToString ( calld - > cancel_error_ ) . c_str ( ) ) ;
}
GRPC_TRACE_LOG ( client_channel_call , INFO )
< < " chand= " < < chand < < " calld= " < < calld
< < " : recording cancel_error= " < < StatusToString ( calld - > cancel_error_ ) ;
// Fail all pending batches.
calld - > PendingBatchesFail ( calld - > cancel_error_ , NoYieldCallCombiner ) ;
// Note: This will release the call combiner.
@ -2127,19 +2077,15 @@ void ClientChannelFilter::FilterBasedCallData::StartTransportStreamOpBatch(
// channel's resolution mutex to apply the service config to the call,
// after which we will create a dynamic call.
if ( GPR_LIKELY ( batch - > send_initial_metadata ) ) {
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_call_trace ) ) {
gpr_log ( GPR_INFO ,
" chand=%p calld=%p: grabbing resolution mutex to apply service "
" config " ,
chand , calld ) ;
}
GRPC_TRACE_LOG ( client_channel_call , INFO )
< < " chand= " < < chand < < " calld= " < < calld
< < " : grabbing resolution mutex to apply service " ;
// If we're still in IDLE, we need to start resolving.
if ( GPR_UNLIKELY ( chand - > CheckConnectivityState ( false ) = =
GRPC_CHANNEL_IDLE ) ) {
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_call_trace ) ) {
gpr_log ( GPR_INFO , " chand=%p calld=%p: triggering exit idle " , chand ,
calld ) ;
}
GRPC_TRACE_LOG ( client_channel_call , INFO )
< < " chand= " < < chand < < " calld= " < < calld
< < " : triggering exit idle " ;
// Bounce into the control plane work serializer to start resolving.
GRPC_CHANNEL_STACK_REF ( chand - > owning_stack_ , " ExitIdle " ) ;
chand - > work_serializer_ - > Run (
@ -2152,11 +2098,9 @@ void ClientChannelFilter::FilterBasedCallData::StartTransportStreamOpBatch(
calld - > TryCheckResolution ( /*was_queued=*/ false ) ;
} else {
// For all other batches, release the call combiner.
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_call_trace ) ) {
gpr_log ( GPR_INFO ,
" chand=%p calld=%p: saved batch, yielding call combiner " , chand ,
calld ) ;
}
GRPC_TRACE_LOG ( client_channel_call , INFO )
< < " chand= " < < chand < < " calld= " < < calld
< < " : saved batch, yielding call combiner " ;
GRPC_CALL_COMBINER_STOP ( calld - > call_combiner ( ) ,
" batch does not include send_initial_metadata " ) ;
}
@ -2185,11 +2129,9 @@ size_t ClientChannelFilter::FilterBasedCallData::GetBatchIndex(
void ClientChannelFilter : : FilterBasedCallData : : PendingBatchesAdd (
grpc_transport_stream_op_batch * batch ) {
const size_t idx = GetBatchIndex ( batch ) ;
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_call_trace ) ) {
gpr_log ( GPR_INFO ,
" chand=%p calld=%p: adding pending batch at index % " PRIuPTR ,
chand ( ) , this , idx ) ;
}
GRPC_TRACE_LOG ( client_channel_call , INFO )
< < " chand= " < < chand ( ) < < " calld= " < < this
< < " : adding pending batch at index " < < idx ;
grpc_transport_stream_op_batch * & pending = pending_batches_ [ idx ] ;
CHECK_EQ ( pending , nullptr ) ;
pending = batch ;
@ -2212,7 +2154,7 @@ void ClientChannelFilter::FilterBasedCallData::PendingBatchesFail(
grpc_error_handle error ,
YieldCallCombinerPredicate yield_call_combiner_predicate ) {
CHECK ( ! error . ok ( ) ) ;
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_ client_channel_call_trace ) ) {
if ( GRPC_TRACE_FLAG_ENABLED ( client_channel_call ) ) {
size_t num_batches = 0 ;
for ( size_t i = 0 ; i < GPR_ARRAY_SIZE ( pending_batches_ ) ; + + i ) {
if ( pending_batches_ [ i ] ! = nullptr ) + + num_batches ;
@ -2255,7 +2197,7 @@ void ClientChannelFilter::FilterBasedCallData::ResumePendingBatchInCallCombiner(
// This is called via the call combiner, so access to calld is synchronized.
void ClientChannelFilter : : FilterBasedCallData : : PendingBatchesResume ( ) {
// Retries not enabled; send down batches as-is.
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_ client_channel_call_trace ) ) {
if ( GRPC_TRACE_FLAG_ENABLED ( client_channel_call ) ) {
size_t num_batches = 0 ;
for ( size_t i = 0 ; i < GPR_ARRAY_SIZE ( pending_batches_ ) ; + + i ) {
if ( pending_batches_ [ i ] ! = nullptr ) + + num_batches ;
@ -2301,13 +2243,13 @@ class ClientChannelFilter::FilterBasedCallData::ResolverQueuedCallCanceller
auto * chand = calld - > chand ( ) ;
{
MutexLock lock ( & chand - > resolution_mu_ ) ;
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_call_trace ) ) {
gpr_log ( GPR_INFO ,
" chand=%p calld=%p : cancelling resolver queued pick: "
" error=%s self=%p calld->resolver_pick_canceller=%p " ,
chand , calld , StatusToString ( error ) . c_str ( ) , self ,
calld - > resolver_call_canceller_ ) ;
}
GRPC_TRACE_LOG ( client_channel_call , INFO )
< < " chand= " < < chand < < " calld= " < < calld
< < " : cancelling resolver queued pick: "
" error= "
< < StatusToString ( error ) < < " self= " < < self
< < " calld->resolver_pick_canceller= "
< < calld - > resolver_call_canceller_ ;
if ( calld - > resolver_call_canceller_ = = self & & ! error . ok ( ) ) {
// Remove pick from list of queued picks.
calld - > RemoveCallFromResolverQueuedCallsLocked ( ) ;
@ -2360,19 +2302,14 @@ void ClientChannelFilter::FilterBasedCallData::CreateDynamicCall() {
call_combiner ( ) } ;
grpc_error_handle error ;
DynamicFilters * channel_stack = args . channel_stack . get ( ) ;
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_call_trace ) ) {
gpr_log (
GPR_INFO ,
" chand=%p calld=%p: creating dynamic call stack on channel_stack=%p " ,
chand ( ) , this , channel_stack ) ;
}
GRPC_TRACE_LOG ( client_channel_call , INFO )
< < " chand= " < < chand ( ) < < " calld= " < < this
< < " : creating dynamic call stack on channel_stack= " < < channel_stack ;
dynamic_call_ = channel_stack - > CreateCall ( std : : move ( args ) , & error ) ;
if ( ! error . ok ( ) ) {
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_call_trace ) ) {
gpr_log ( GPR_INFO ,
" chand=%p calld=%p: failed to create dynamic call: error=%s " ,
chand ( ) , this , StatusToString ( error ) . c_str ( ) ) ;
}
GRPC_TRACE_LOG ( client_channel_call , INFO )
< < " chand= " < < chand ( ) < < " calld= " < < this
< < " : failed to create dynamic call: error= " < < StatusToString ( error ) ;
PendingBatchesFail ( error , YieldCallCombiner ) ;
return ;
}
@ -2385,13 +2322,10 @@ void ClientChannelFilter::FilterBasedCallData::
auto * calld = static_cast < FilterBasedCallData * > ( arg ) ;
auto * chand = calld - > chand ( ) ;
auto * service_config_call_data = GetServiceConfigCallData ( calld - > arena ( ) ) ;
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_call_trace ) ) {
gpr_log ( GPR_INFO ,
" chand=%p calld=%p: got recv_trailing_metadata_ready: error=%s "
" service_config_call_data=%p " ,
chand , calld , StatusToString ( error ) . c_str ( ) ,
service_config_call_data ) ;
}
GRPC_TRACE_LOG ( client_channel_call , INFO )
< < " chand= " < < chand < < " calld= " < < calld
< < " : got recv_trailing_metadata_ready: error= " < < StatusToString ( error )
< < " service_config_call_data= " < < service_config_call_data ;
if ( service_config_call_data ! = nullptr ) {
service_config_call_data - > Commit ( ) ;
}
@ -2578,17 +2512,15 @@ void CreateCallAttemptTracer(Arena* arena, bool is_transparent_retry) {
ClientChannelFilter : : LoadBalancedCall : : LoadBalancedCall (
ClientChannelFilter * chand , Arena * arena ,
absl : : AnyInvocable < void ( ) > on_commit , bool is_transparent_retry )
: InternallyRefCounted (
GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_lb_call_trace )
: InternallyRefCounted ( GRPC_TRACE_FLAG_ENABLED ( client_channel_lb_call )
? " LoadBalancedCall "
: nullptr ) ,
chand_ ( chand ) ,
on_commit_ ( std : : move ( on_commit ) ) ,
arena_ ( arena ) {
CreateCallAttemptTracer ( arena , is_transparent_retry ) ;
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_lb_call_trace ) ) {
gpr_log ( GPR_INFO , " chand=%p lb_call=%p: created " , chand_ , this ) ;
}
GRPC_TRACE_LOG ( client_channel_lb_call , INFO )
< < " chand= " < < chand_ < < " lb_call= " < < this < < " : created " ;
}
ClientChannelFilter : : LoadBalancedCall : : ~ LoadBalancedCall ( ) {
@ -2629,10 +2561,9 @@ void ClientChannelFilter::LoadBalancedCall::RecordLatency() {
void ClientChannelFilter : : LoadBalancedCall : :
RemoveCallFromLbQueuedCallsLocked ( ) {
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_lb_call_trace ) ) {
gpr_log ( GPR_INFO , " chand=%p lb_call=%p: removing from queued picks list " ,
chand_ , this ) ;
}
GRPC_TRACE_LOG ( client_channel_lb_call , INFO )
< < " chand= " < < chand_ < < " lb_call= " < < this
< < " : removing from queued picks list " ;
// Remove pollset_set linkage.
grpc_polling_entity_del_from_pollset_set ( pollent ( ) ,
chand_ - > interested_parties_ ) ;
@ -2643,10 +2574,9 @@ void ClientChannelFilter::LoadBalancedCall::
}
void ClientChannelFilter : : LoadBalancedCall : : AddCallToLbQueuedCallsLocked ( ) {
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_lb_call_trace ) ) {
gpr_log ( GPR_INFO , " chand=%p lb_call=%p: adding to queued picks list " ,
chand_ , this ) ;
}
GRPC_TRACE_LOG ( client_channel_lb_call , INFO )
< < " chand= " < < chand_ < < " lb_call= " < < this
< < " : adding to queued picks list " ;
// Add call's pollent to channel's interested_parties, so that I/O
// can be done under the call's CQ.
grpc_polling_entity_add_to_pollset_set ( pollent ( ) ,
@ -2688,10 +2618,9 @@ ClientChannelFilter::LoadBalancedCall::PickSubchannel(bool was_queued) {
} ;
}
// Grab mutex and take a ref to the picker.
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_lb_call_trace ) ) {
gpr_log ( GPR_INFO , " chand=%p lb_call=%p: grabbing LB mutex to get picker " ,
chand_ , this ) ;
}
GRPC_TRACE_LOG ( client_channel_lb_call , INFO )
< < " chand= " < < chand_ < < " lb_call= " < < this
< < " : grabbing LB mutex to get picker " ;
RefCountedPtr < LoadBalancingPolicy : : SubchannelPicker > picker ;
{
MutexLock lock ( & chand_ - > lb_mu_ ) ;
@ -2701,17 +2630,15 @@ ClientChannelFilter::LoadBalancedCall::PickSubchannel(bool was_queued) {
// TODO(roth): Fix race condition in channel_idle filter and any
// other possible causes of this.
if ( pickers . back ( ) = = nullptr ) {
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_lb_call_trace ) ) {
gpr_log ( GPR_ERROR , " chand=%p lb_call=%p: picker is null, failing call " ,
chand_ , this ) ;
}
GRPC_TRACE_LOG ( client_channel_lb_call , INFO )
< < " chand= " < < chand_ < < " lb_call= " < < this
< < " : picker is null, failing call " ;
return absl : : InternalError ( " picker is null -- shouldn't happen " ) ;
}
// Do pick.
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_lb_call_trace ) ) {
gpr_log ( GPR_INFO , " chand=%p lb_call=%p: performing pick with picker=%p " ,
chand_ , this , pickers . back ( ) . get ( ) ) ;
}
GRPC_TRACE_LOG ( client_channel_lb_call , INFO )
< < " chand= " < < chand_ < < " lb_call= " < < this
< < " : performing pick with picker= " < < pickers . back ( ) . get ( ) ;
grpc_error_handle error ;
bool pick_complete = PickSubchannelImpl ( pickers . back ( ) . get ( ) , & error ) ;
if ( ! pick_complete ) {
@ -2719,11 +2646,9 @@ ClientChannelFilter::LoadBalancedCall::PickSubchannel(bool was_queued) {
MutexLock lock ( & chand_ - > lb_mu_ ) ;
// If picker has been swapped out since we grabbed it, try again.
if ( pickers . back ( ) ! = chand_ - > picker_ ) {
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_lb_call_trace ) ) {
gpr_log ( GPR_INFO ,
" chand=%p lb_call=%p: pick not complete, but picker changed " ,
chand_ , this ) ;
}
GRPC_TRACE_LOG ( client_channel_lb_call , INFO )
< < " chand= " < < chand_ < < " lb_call= " < < this
< < " : pick not complete, but picker changed " ;
if ( IsWorkSerializerDispatchEnabled ( ) ) {
// Don't unref until after we release the mutex.
old_picker = std : : move ( pickers . back ( ) ) ;
@ -2742,11 +2667,9 @@ ClientChannelFilter::LoadBalancedCall::PickSubchannel(bool was_queued) {
}
// If the pick failed, fail the call.
if ( ! error . ok ( ) ) {
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_lb_call_trace ) ) {
gpr_log ( GPR_INFO ,
" chand=%p lb_call=%p: failed to pick subchannel: error=%s " ,
chand_ , this , StatusToString ( error ) . c_str ( ) ) ;
}
GRPC_TRACE_LOG ( client_channel_lb_call , INFO )
< < " chand= " < < chand_ < < " lb_call= " < < this
< < " : failed to pick subchannel: error= " < < StatusToString ( error ) ;
return error ;
}
// Pick succeeded.
@ -2772,11 +2695,10 @@ bool ClientChannelFilter::LoadBalancedCall::PickSubchannelImpl(
& result ,
// CompletePick
[ this ] ( LoadBalancingPolicy : : PickResult : : Complete * complete_pick ) {
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_lb_call_trace ) ) {
gpr_log ( GPR_INFO ,
" chand=%p lb_call=%p: LB pick succeeded: subchannel=%p " ,
chand_ , this , complete_pick - > subchannel . get ( ) ) ;
}
GRPC_TRACE_LOG ( client_channel_lb_call , INFO )
< < " chand= " < < chand_ < < " lb_call= " < < this
< < " : LB pick succeeded: subchannel= "
< < complete_pick - > subchannel . get ( ) ;
CHECK ( complete_pick - > subchannel ! = nullptr ) ;
// Grab a ref to the connected subchannel while we're still
// holding the data plane mutex.
@ -2788,12 +2710,10 @@ bool ClientChannelFilter::LoadBalancedCall::PickSubchannelImpl(
// yet seen that change and given us a new picker), then just
// queue the pick. We'll try again as soon as we get a new picker.
if ( connected_subchannel_ = = nullptr ) {
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_lb_call_trace ) ) {
gpr_log ( GPR_INFO ,
" chand=%p lb_call=%p: subchannel returned by LB picker "
" has no connected subchannel; queueing pick " ,
chand_ , this ) ;
}
GRPC_TRACE_LOG ( client_channel_lb_call , INFO )
< < " chand= " < < chand_ < < " lb_call= " < < this
< < " : subchannel returned by LB picker "
" has no connected subchannel; queueing pick " ;
return false ;
}
lb_subchannel_call_tracker_ =
@ -2805,18 +2725,15 @@ bool ClientChannelFilter::LoadBalancedCall::PickSubchannelImpl(
} ,
// QueuePick
[ this ] ( LoadBalancingPolicy : : PickResult : : Queue * /*queue_pick*/ ) {
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_lb_call_trace ) ) {
gpr_log ( GPR_INFO , " chand=%p lb_call=%p: LB pick queued " , chand_ ,
this ) ;
}
GRPC_TRACE_LOG ( client_channel_lb_call , INFO )
< < " chand= " < < chand_ < < " lb_call= " < < this < < " : LB pick queued " ;
return false ;
} ,
// FailPick
[ this , & error ] ( LoadBalancingPolicy : : PickResult : : Fail * fail_pick ) {
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_lb_call_trace ) ) {
gpr_log ( GPR_INFO , " chand=%p lb_call=%p: LB pick failed: %s " , chand_ ,
this , fail_pick - > status . ToString ( ) . c_str ( ) ) ;
}
GRPC_TRACE_LOG ( client_channel_lb_call , INFO )
< < " chand= " < < chand_ < < " lb_call= " < < this
< < " : LB pick failed: " < < fail_pick - > status ;
// If wait_for_ready is false, then the error indicates the RPC
// attempt's final status.
if ( ! send_initial_metadata ( )
@ -2832,10 +2749,9 @@ bool ClientChannelFilter::LoadBalancedCall::PickSubchannelImpl(
} ,
// DropPick
[ this , & error ] ( LoadBalancingPolicy : : PickResult : : Drop * drop_pick ) {
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_lb_call_trace ) ) {
gpr_log ( GPR_INFO , " chand=%p lb_call=%p: LB pick dropped: %s " , chand_ ,
this , drop_pick - > status . ToString ( ) . c_str ( ) ) ;
}
GRPC_TRACE_LOG ( client_channel_lb_call , INFO )
< < " chand= " < < chand_ < < " lb_call= " < < this
< < " : LB pick dropped: " < < drop_pick - > status ;
* error = grpc_error_set_int (
absl_status_to_grpc_error ( MaybeRewriteIllegalStatusCode (
std : : move ( drop_pick - > status ) , " LB drop " ) ) ,
@ -2901,11 +2817,9 @@ size_t ClientChannelFilter::FilterBasedLoadBalancedCall::GetBatchIndex(
void ClientChannelFilter : : FilterBasedLoadBalancedCall : : PendingBatchesAdd (
grpc_transport_stream_op_batch * batch ) {
const size_t idx = GetBatchIndex ( batch ) ;
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_lb_call_trace ) ) {
gpr_log ( GPR_INFO ,
" chand=%p lb_call=%p: adding pending batch at index % " PRIuPTR ,
chand ( ) , this , idx ) ;
}
GRPC_TRACE_LOG ( client_channel_lb_call , INFO )
< < " chand= " < < chand ( ) < < " lb_call= " < < this
< < " : adding pending batch at index " < < idx ;
CHECK_EQ ( pending_batches_ [ idx ] , nullptr ) ;
pending_batches_ [ idx ] = batch ;
}
@ -2928,7 +2842,7 @@ void ClientChannelFilter::FilterBasedLoadBalancedCall::PendingBatchesFail(
YieldCallCombinerPredicate yield_call_combiner_predicate ) {
CHECK ( ! error . ok ( ) ) ;
failure_error_ = error ;
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_ client_channel_lb_call_trace ) ) {
if ( GRPC_TRACE_FLAG_ENABLED ( client_channel_lb_call ) ) {
size_t num_batches = 0 ;
for ( size_t i = 0 ; i < GPR_ARRAY_SIZE ( pending_batches_ ) ; + + i ) {
if ( pending_batches_ [ i ] ! = nullptr ) + + num_batches ;
@ -2970,7 +2884,7 @@ void ClientChannelFilter::FilterBasedLoadBalancedCall::
// This is called via the call combiner, so access to calld is synchronized.
void ClientChannelFilter : : FilterBasedLoadBalancedCall : : PendingBatchesResume ( ) {
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_ client_channel_lb_call_trace ) ) {
if ( GRPC_TRACE_FLAG_ENABLED ( client_channel_lb_call ) ) {
size_t num_batches = 0 ;
for ( size_t i = 0 ; i < GPR_ARRAY_SIZE ( pending_batches_ ) ; + + i ) {
if ( pending_batches_ [ i ] ! = nullptr ) + + num_batches ;
@ -2999,8 +2913,8 @@ void ClientChannelFilter::FilterBasedLoadBalancedCall::PendingBatchesResume() {
void ClientChannelFilter : : FilterBasedLoadBalancedCall : :
StartTransportStreamOpBatch ( grpc_transport_stream_op_batch * batch ) {
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_ client_channel_lb_call_trace ) | |
GRPC_TRACE_FLAG_ENABLED ( grpc_trace_ channel) ) {
if ( GRPC_TRACE_FLAG_ENABLED ( client_channel_lb_call ) | |
GRPC_TRACE_FLAG_ENABLED ( channel ) ) {
gpr_log ( GPR_INFO ,
" chand=%p lb_call=%p: batch started from above: %s, "
" call_attempt_tracer()=%p " ,
@ -3054,11 +2968,9 @@ void ClientChannelFilter::FilterBasedLoadBalancedCall::
// the channel's data plane mutex, which is more efficient (especially for
// streaming calls).
if ( subchannel_call_ ! = nullptr ) {
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_lb_call_trace ) ) {
gpr_log ( GPR_INFO ,
" chand=%p lb_call=%p: starting batch on subchannel_call=%p " ,
chand ( ) , this , subchannel_call_ . get ( ) ) ;
}
GRPC_TRACE_LOG ( client_channel_lb_call , INFO )
< < " chand= " < < chand ( ) < < " lb_call= " < < this
< < " : starting batch on subchannel_call= " < < subchannel_call_ . get ( ) ;
subchannel_call_ - > StartTransportStreamOpBatch ( batch ) ;
return ;
}
@ -3066,10 +2978,9 @@ void ClientChannelFilter::FilterBasedLoadBalancedCall::
//
// If we've previously been cancelled, immediately fail any new batches.
if ( GPR_UNLIKELY ( ! cancel_error_ . ok ( ) ) ) {
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_lb_call_trace ) ) {
gpr_log ( GPR_INFO , " chand=%p lb_call=%p: failing batch with error: %s " ,
chand ( ) , this , StatusToString ( cancel_error_ ) . c_str ( ) ) ;
}
GRPC_TRACE_LOG ( client_channel_lb_call , INFO )
< < " chand= " < < chand ( ) < < " lb_call= " < < this
< < " : failing batch with error: " < < StatusToString ( cancel_error_ ) ;
// Note: This will release the call combiner.
grpc_transport_stream_op_batch_finish_with_failure ( batch , cancel_error_ ,
call_combiner_ ) ;
@ -3083,10 +2994,9 @@ void ClientChannelFilter::FilterBasedLoadBalancedCall::
// is in the past when the call starts), we can return the right
// error to the caller when the first batch does get passed down.
cancel_error_ = batch - > payload - > cancel_stream . cancel_error ;
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_lb_call_trace ) ) {
gpr_log ( GPR_INFO , " chand=%p lb_call=%p: recording cancel_error=%s " ,
chand ( ) , this , StatusToString ( cancel_error_ ) . c_str ( ) ) ;
}
GRPC_TRACE_LOG ( client_channel_lb_call , INFO )
< < " chand= " < < chand ( ) < < " lb_call= " < < this
< < " : recording cancel_error= " < < StatusToString ( cancel_error_ ) . c_str ( ) ;
// Fail all pending batches.
PendingBatchesFail ( cancel_error_ , NoYieldCallCombiner ) ;
// Note: This will release the call combiner.
@ -3102,11 +3012,9 @@ void ClientChannelFilter::FilterBasedLoadBalancedCall::
TryPick ( /*was_queued=*/ false ) ;
} else {
// For all other batches, release the call combiner.
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_lb_call_trace ) ) {
gpr_log ( GPR_INFO ,
" chand=%p lb_call=%p: saved batch, yielding call combiner " ,
chand ( ) , this ) ;
}
GRPC_TRACE_LOG ( client_channel_lb_call , INFO )
< < " chand= " < < chand ( ) < < " lb_call= " < < this
< < " : saved batch, yielding call combiner " ;
GRPC_CALL_COMBINER_STOP ( call_combiner_ ,
" batch does not include send_initial_metadata " ) ;
}
@ -3115,11 +3023,9 @@ void ClientChannelFilter::FilterBasedLoadBalancedCall::
void ClientChannelFilter : : FilterBasedLoadBalancedCall : : RecvInitialMetadataReady (
void * arg , grpc_error_handle error ) {
auto * self = static_cast < FilterBasedLoadBalancedCall * > ( arg ) ;
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_lb_call_trace ) ) {
gpr_log ( GPR_INFO ,
" chand=%p lb_call=%p: got recv_initial_metadata_ready: error=%s " ,
self - > chand ( ) , self , StatusToString ( error ) . c_str ( ) ) ;
}
GRPC_TRACE_LOG ( client_channel_lb_call , INFO )
< < " chand= " < < self - > chand ( ) < < " lb_call= " < < self
< < " : got recv_initial_metadata_ready: error= " < < StatusToString ( error ) ;
if ( error . ok ( ) ) {
// recv_initial_metadata_flags is not populated for clients
self - > call_attempt_tracer ( ) - > RecordReceivedInitialMetadata (
@ -3134,15 +3040,12 @@ void ClientChannelFilter::FilterBasedLoadBalancedCall::RecvInitialMetadataReady(
void ClientChannelFilter : : FilterBasedLoadBalancedCall : :
RecvTrailingMetadataReady ( void * arg , grpc_error_handle error ) {
auto * self = static_cast < FilterBasedLoadBalancedCall * > ( arg ) ;
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_lb_call_trace ) ) {
gpr_log ( GPR_INFO ,
" chand=%p lb_call=%p: got recv_trailing_metadata_ready: error=%s "
" call_attempt_tracer()=%p lb_subchannel_call_tracker_=%p "
" failure_error_=%s " ,
self - > chand ( ) , self , StatusToString ( error ) . c_str ( ) ,
self - > call_attempt_tracer ( ) , self - > lb_subchannel_call_tracker ( ) ,
StatusToString ( self - > failure_error_ ) . c_str ( ) ) ;
}
GRPC_TRACE_LOG ( client_channel_lb_call , INFO )
< < " chand= " < < self - > chand ( ) < < " lb_call= " < < self
< < " : got recv_trailing_metadata_ready: error= " < < StatusToString ( error )
< < " call_attempt_tracer()= " < < self - > call_attempt_tracer ( )
< < " lb_subchannel_call_tracker_= " < < self - > lb_subchannel_call_tracker ( )
< < " failure_error_= " < < StatusToString ( self - > failure_error_ ) ;
// Check if we have a tracer or an LB callback to invoke.
if ( self - > call_attempt_tracer ( ) ! = nullptr | |
self - > lb_subchannel_call_tracker ( ) ! = nullptr ) {
@ -3210,13 +3113,11 @@ class ClientChannelFilter::FilterBasedLoadBalancedCall::LbQueuedCallCanceller
auto * chand = lb_call - > chand ( ) ;
{
MutexLock lock ( & chand - > lb_mu_ ) ;
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_lb_call_trace ) ) {
gpr_log ( GPR_INFO ,
" chand=%p lb_call=%p: cancelling queued pick: "
" error=%s self=%p calld->pick_canceller=%p " ,
chand , lb_call , StatusToString ( error ) . c_str ( ) , self ,
lb_call - > lb_call_canceller_ ) ;
}
GRPC_TRACE_LOG ( client_channel_lb_call , INFO )
< < " chand= " < < chand < < " lb_call= " < < lb_call
< < " : cancelling queued pick: error= " < < StatusToString ( error )
< < " self= " < < self
< < " calld->pick_canceller= " < < lb_call - > lb_call_canceller_ ;
if ( lb_call - > lb_call_canceller_ = = self & & ! error . ok ( ) ) {
lb_call - > Commit ( ) ;
// Remove pick from list of queued picks.
@ -3298,11 +3199,10 @@ void ClientChannelFilter::FilterBasedLoadBalancedCall::CreateSubchannelCall() {
arena ( ) , call_combiner_ } ;
grpc_error_handle error ;
subchannel_call_ = SubchannelCall : : Create ( std : : move ( call_args ) , & error ) ;
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_lb_call_trace ) ) {
gpr_log ( GPR_INFO ,
" chand=%p lb_call=%p: create subchannel_call=%p: error=%s " , chand ( ) ,
this , subchannel_call_ . get ( ) , StatusToString ( error ) . c_str ( ) ) ;
}
GRPC_TRACE_LOG ( client_channel_lb_call , INFO )
< < " chand= " < < chand ( ) < < " lb_call= " < < this
< < " : create subchannel_call= " < < subchannel_call_ . get ( )
< < " : error= " < < StatusToString ( error ) ;
if ( on_call_destruction_complete_ ! = nullptr ) {
subchannel_call_ - > SetAfterCallStackDestroy ( on_call_destruction_complete_ ) ;
on_call_destruction_complete_ = nullptr ;