@ -26,6 +26,8 @@
# include <stdio.h>
# include <string.h>
# include <set>
# include <grpc/support/alloc.h>
# include <grpc/support/log.h>
# include <grpc/support/string_util.h>
@ -53,7 +55,6 @@
# include "src/core/lib/gprpp/inlined_vector.h"
# include "src/core/lib/gprpp/manual_constructor.h"
# include "src/core/lib/gprpp/map.h"
# include "src/core/lib/gprpp/set.h"
# include "src/core/lib/gprpp/sync.h"
# include "src/core/lib/iomgr/combiner.h"
# include "src/core/lib/iomgr/iomgr.h"
@ -161,7 +162,7 @@ class ChannelData {
MutexLock lock ( & external_watchers_mu_ ) ;
// Will be deleted when the watch is complete.
GPR_ASSERT ( external_watchers_ [ on_complete ] = = nullptr ) ;
external_watchers_ [ on_complete ] = New < ExternalConnectivityWatcher > (
external_watchers_ [ on_complete ] = new ExternalConnectivityWatcher (
this , pollent , state , on_complete , watcher_timer_init ) ;
}
@ -227,7 +228,7 @@ class ChannelData {
void UpdateStateAndPickerLocked (
grpc_connectivity_state state , const char * reason ,
UniqueP tr< LoadBalancingPolicy : : SubchannelPicker > picker ) ;
std : : unique_p tr< LoadBalancingPolicy : : SubchannelPicker > picker ) ;
void UpdateServiceConfigLocked (
RefCountedPtr < ServerRetryThrottleData > retry_throttle_data ,
@ -251,7 +252,7 @@ class ChannelData {
void ProcessLbPolicy (
const Resolver : : Result & resolver_result ,
const internal : : ClientChannelGlobalParsedConfig * parsed_service_config ,
UniqueP tr< char > * lb_policy_name ,
std : : unique_p tr< char > * lb_policy_name ,
RefCountedPtr < LoadBalancingPolicy : : Config > * lb_policy_config ) ;
//
@ -264,15 +265,15 @@ class ChannelData {
ClientChannelFactory * client_channel_factory_ ;
const grpc_channel_args * channel_args_ ;
RefCountedPtr < ServiceConfig > default_service_config_ ;
UniqueP tr< char > server_name_ ;
UniqueP tr< char > target_uri_ ;
std : : unique_p tr< char > server_name_ ;
std : : unique_p tr< char > target_uri_ ;
channelz : : ChannelNode * channelz_node_ ;
//
// Fields used in the data plane. Guarded by data_plane_mu.
//
mutable Mutex data_plane_mu_ ;
UniqueP tr< LoadBalancingPolicy : : SubchannelPicker > picker_ ;
std : : unique_p tr< LoadBalancingPolicy : : SubchannelPicker > picker_ ;
QueuedPick * queued_picks_ = nullptr ; // Linked list of queued picks.
// Data from service config.
bool received_service_config_data_ = false ;
@ -287,20 +288,20 @@ class ChannelData {
RefCountedPtr < SubchannelPoolInterface > subchannel_pool_ ;
OrphanablePtr < ResolvingLoadBalancingPolicy > resolving_lb_policy_ ;
ConnectivityStateTracker state_tracker_ ;
UniqueP tr< char > health_check_service_name_ ;
std : : unique_p tr< char > health_check_service_name_ ;
RefCountedPtr < ServiceConfig > saved_service_config_ ;
bool received_first_resolver_result_ = false ;
// The number of SubchannelWrapper instances referencing a given Subchannel.
M ap< Subchannel * , int > subchannel_refcount_map_ ;
std : : m ap< Subchannel * , int > subchannel_refcount_map_ ;
// The set of SubchannelWrappers that currently exist.
// No need to hold a ref, since the map is updated in the control-plane
// combiner when the SubchannelWrappers are created and destroyed.
S et< SubchannelWrapper * > subchannel_wrappers_ ;
std : : s et< SubchannelWrapper * > subchannel_wrappers_ ;
// Pending ConnectedSubchannel updates for each SubchannelWrapper.
// Updates are queued here in the control plane combiner and then applied
// in the data plane mutex when the picker is updated.
M ap< RefCountedPtr < SubchannelWrapper > , RefCountedPtr < ConnectedSubchannel > ,
RefCountedPtrLess < SubchannelWrapper > >
std : : m ap< RefCountedPtr < SubchannelWrapper > , RefCountedPtr < ConnectedSubchannel > ,
RefCountedPtrLess < SubchannelWrapper > >
pending_subchannel_updates_ ;
//
@ -313,15 +314,15 @@ class ChannelData {
// synchronously via get_channel_info().
//
gpr_mu info_mu_ ;
UniqueP tr< char > info_lb_policy_name_ ;
UniqueP tr< char > info_service_config_json_ ;
std : : unique_p tr< char > info_lb_policy_name_ ;
std : : unique_p tr< char > info_service_config_json_ ;
//
// Fields guarded by a mutex, since they need to be accessed
// synchronously via grpc_channel_num_external_connectivity_watchers().
//
mutable Mutex external_watchers_mu_ ;
M ap< grpc_closure * , ExternalConnectivityWatcher * > external_watchers_ ;
std : : m ap< grpc_closure * , ExternalConnectivityWatcher * > external_watchers_ ;
} ;
//
@ -403,8 +404,9 @@ class CallData {
intptr_t handle ) const override {
grpc_linked_mdelem * linked_mdelem =
reinterpret_cast < grpc_linked_mdelem * > ( handle ) ;
return std : : make_pair ( StringView ( GRPC_MDKEY ( linked_mdelem - > md ) ) ,
StringView ( GRPC_MDVALUE ( linked_mdelem - > md ) ) ) ;
return std : : make_pair (
StringViewFromSlice ( GRPC_MDKEY ( linked_mdelem - > md ) ) ,
StringViewFromSlice ( GRPC_MDVALUE ( linked_mdelem - > md ) ) ) ;
}
CallData * calld_ ;
@ -841,7 +843,7 @@ class CallData {
class ChannelData : : SubchannelWrapper : public SubchannelInterface {
public :
SubchannelWrapper ( ChannelData * chand , Subchannel * subchannel ,
UniqueP tr< char > health_check_service_name )
std : : unique_p tr< char > health_check_service_name )
: SubchannelInterface ( & grpc_client_channel_routing_trace ) ,
chand_ ( chand ) ,
subchannel_ ( subchannel ) ,
@ -896,15 +898,15 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
void WatchConnectivityState (
grpc_connectivity_state initial_state ,
UniqueP tr< ConnectivityStateWatcherInterface > watcher ) override {
std : : unique_p tr< ConnectivityStateWatcherInterface > watcher ) override {
auto & watcher_wrapper = watcher_map_ [ watcher . get ( ) ] ;
GPR_ASSERT ( watcher_wrapper = = nullptr ) ;
watcher_wrapper = New < WatcherWrapper > ( std : : move ( watcher ) ,
Ref ( DEBUG_LOCATION , " WatcherWrapper " ) ,
initial_state ) ;
watcher_wrapper = new WatcherWrapper ( std : : move ( watcher ) ,
Ref ( DEBUG_LOCATION , " WatcherWrapper " ) ,
initial_state ) ;
subchannel_ - > WatchConnectivityState (
initial_state ,
UniqueP tr< char > ( gpr_strdup ( health_check_service_name_ . get ( ) ) ) ,
std : : unique_p tr< char > ( gpr_strdup ( health_check_service_name_ . get ( ) ) ) ,
OrphanablePtr < Subchannel : : ConnectivityStateWatcherInterface > (
watcher_wrapper ) ) ;
}
@ -926,7 +928,8 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
return subchannel_ - > channel_args ( ) ;
}
void UpdateHealthCheckServiceName ( UniquePtr < char > health_check_service_name ) {
void UpdateHealthCheckServiceName (
std : : unique_ptr < char > health_check_service_name ) {
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_routing_trace ) ) {
gpr_log ( GPR_INFO ,
" chand=%p: subchannel wrapper %p: updating health check service "
@ -952,7 +955,7 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
watcher_wrapper = replacement ;
subchannel_ - > WatchConnectivityState (
replacement - > last_seen_state ( ) ,
UniqueP tr< char > ( gpr_strdup ( health_check_service_name . get ( ) ) ) ,
std : : unique_p tr< char > ( gpr_strdup ( health_check_service_name . get ( ) ) ) ,
OrphanablePtr < Subchannel : : ConnectivityStateWatcherInterface > (
replacement ) ) ;
}
@ -992,7 +995,7 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
class WatcherWrapper : public Subchannel : : ConnectivityStateWatcherInterface {
public :
WatcherWrapper (
UniqueP tr< SubchannelInterface : : ConnectivityStateWatcherInterface >
std : : unique_p tr< SubchannelInterface : : ConnectivityStateWatcherInterface >
watcher ,
RefCountedPtr < SubchannelWrapper > parent ,
grpc_connectivity_state initial_state )
@ -1016,7 +1019,7 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
connected_subchannel . get ( ) , ConnectivityStateName ( new_state ) ) ;
}
// Will delete itself.
New < Updater > ( Ref ( ) , new_state , std : : move ( connected_subchannel ) ) ;
new Updater ( Ref ( ) , new_state , std : : move ( connected_subchannel ) ) ;
}
grpc_pollset_set * interested_parties ( ) override {
@ -1028,7 +1031,7 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
WatcherWrapper * MakeReplacement ( ) {
auto * replacement =
New < WatcherWrapper > ( std : : move ( watcher_ ) , parent_ , last_seen_state_ ) ;
new WatcherWrapper ( std : : move ( watcher_ ) , parent_ , last_seen_state_ ) ;
replacement_ = replacement ;
return replacement ;
}
@ -1072,7 +1075,7 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
self - > parent_ - > parent_ - > MaybeUpdateConnectedSubchannel (
std : : move ( self - > connected_subchannel_ ) ) ;
self - > parent_ - > watcher_ - > OnConnectivityStateChange ( self - > state_ ) ;
Delete ( self ) ;
delete self ;
}
RefCountedPtr < WatcherWrapper > parent_ ;
@ -1081,7 +1084,8 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
grpc_closure closure_ ;
} ;
UniquePtr < SubchannelInterface : : ConnectivityStateWatcherInterface > watcher_ ;
std : : unique_ptr < SubchannelInterface : : ConnectivityStateWatcherInterface >
watcher_ ;
RefCountedPtr < SubchannelWrapper > parent_ ;
grpc_connectivity_state last_seen_state_ ;
WatcherWrapper * replacement_ = nullptr ;
@ -1110,13 +1114,13 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
ChannelData * chand_ ;
Subchannel * subchannel_ ;
UniqueP tr< char > health_check_service_name_ ;
std : : unique_p tr< char > health_check_service_name_ ;
// Maps from the address of the watcher passed to us by the LB policy
// to the address of the WrapperWatcher that we passed to the underlying
// subchannel. This is needed so that when the LB policy calls
// CancelConnectivityStateWatch() with its watcher, we know the
// corresponding WrapperWatcher to cancel on the underlying subchannel.
M ap< ConnectivityStateWatcherInterface * , WatcherWrapper * > watcher_map_ ;
std : : m ap< ConnectivityStateWatcherInterface * , WatcherWrapper * > watcher_map_ ;
// To be accessed only in the control plane combiner.
RefCountedPtr < ConnectedSubchannel > connected_subchannel_ ;
// To be accessed only in the data plane mutex.
@ -1163,7 +1167,7 @@ void ChannelData::ExternalConnectivityWatcher::Notify(
chand_ - > RemoveExternalConnectivityWatcher ( on_complete_ , /*cancel=*/ false ) ;
// Report new state to the user.
* state_ = state ;
GRPC_CLOSURE_SCHED ( on_complete_ , GRPC_ERROR_NONE ) ;
ExecCtx : : Run ( DEBUG_LOCATION , on_complete_ , GRPC_ERROR_NONE ) ;
// Hop back into the combiner to clean up.
// Not needed in state SHUTDOWN, because the tracker will
// automatically remove all watchers in that case.
@ -1180,7 +1184,7 @@ void ChannelData::ExternalConnectivityWatcher::Cancel() {
MemoryOrder : : RELAXED ) ) {
return ; // Already done.
}
GRPC_CLOSURE_SCHED ( on_complete_ , GRPC_ERROR_CANCELLED ) ;
ExecCtx : : Run ( DEBUG_LOCATION , on_complete_ , GRPC_ERROR_CANCELLED ) ;
// Hop back into the combiner to clean up.
chand_ - > combiner_ - > Run (
GRPC_CLOSURE_INIT ( & remove_closure_ , RemoveWatcherLocked , this , nullptr ) ,
@ -1234,7 +1238,7 @@ class ChannelData::ConnectivityWatcherAdder {
std : : move ( self - > watcher_ ) ) ;
GRPC_CHANNEL_STACK_UNREF ( self - > chand_ - > owning_stack_ ,
" ConnectivityWatcherAdder " ) ;
Delete ( self ) ;
delete self ;
}
ChannelData * chand_ ;
@ -1267,7 +1271,7 @@ class ChannelData::ConnectivityWatcherRemover {
self - > chand_ - > state_tracker_ . RemoveWatcher ( self - > watcher_ ) ;
GRPC_CHANNEL_STACK_UNREF ( self - > chand_ - > owning_stack_ ,
" ConnectivityWatcherRemover " ) ;
Delete ( self ) ;
delete self ;
}
ChannelData * chand_ ;
@ -1295,7 +1299,7 @@ class ChannelData::ClientChannelControlHelper
const grpc_channel_args & args ) override {
bool inhibit_health_checking = grpc_channel_arg_get_bool (
grpc_channel_args_find ( & args , GRPC_ARG_INHIBIT_HEALTH_CHECKING ) , false ) ;
UniqueP tr< char > health_check_service_name ;
std : : unique_p tr< char > health_check_service_name ;
if ( ! inhibit_health_checking ) {
health_check_service_name . reset (
gpr_strdup ( chand_ - > health_check_service_name_ . get ( ) ) ) ;
@ -1318,7 +1322,7 @@ class ChannelData::ClientChannelControlHelper
void UpdateState (
grpc_connectivity_state state ,
UniqueP tr< LoadBalancingPolicy : : SubchannelPicker > picker ) override {
std : : unique_p tr< LoadBalancingPolicy : : SubchannelPicker > picker ) override {
grpc_error * disconnect_error = chand_ - > disconnect_error ( ) ;
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_client_channel_routing_trace ) ) {
const char * extra = disconnect_error = = GRPC_ERROR_NONE
@ -1491,7 +1495,7 @@ ChannelData::~ChannelData() {
void ChannelData : : UpdateStateAndPickerLocked (
grpc_connectivity_state state , const char * reason ,
UniqueP tr< LoadBalancingPolicy : : SubchannelPicker > picker ) {
std : : unique_p tr< LoadBalancingPolicy : : SubchannelPicker > picker ) {
// Clean the control plane when entering IDLE.
if ( picker_ = = nullptr ) {
health_check_service_name_ . reset ( ) ;
@ -1591,8 +1595,8 @@ void ChannelData::CreateResolvingLoadBalancingPolicyLocked() {
lb_args . combiner = combiner_ ;
lb_args . channel_control_helper = MakeUnique < ClientChannelControlHelper > ( this ) ;
lb_args . args = channel_args_ ;
UniqueP tr< char > target_uri ( gpr_strdup ( target_uri_ . get ( ) ) ) ;
resolving_lb_policy_ . reset ( New < ResolvingLoadBalancingPolicy > (
std : : unique_p tr< char > target_uri ( gpr_strdup ( target_uri_ . get ( ) ) ) ;
resolving_lb_policy_ . reset ( new ResolvingLoadBalancingPolicy (
std : : move ( lb_args ) , & grpc_client_channel_routing_trace ,
std : : move ( target_uri ) , ProcessResolverResultLocked , this ) ) ;
grpc_pollset_set_add_pollset_set ( resolving_lb_policy_ - > interested_parties ( ) ,
@ -1614,7 +1618,7 @@ void ChannelData::DestroyResolvingLoadBalancingPolicyLocked() {
void ChannelData : : ProcessLbPolicy (
const Resolver : : Result & resolver_result ,
const internal : : ClientChannelGlobalParsedConfig * parsed_service_config ,
UniqueP tr< char > * lb_policy_name ,
std : : unique_p tr< char > * lb_policy_name ,
RefCountedPtr < LoadBalancingPolicy : : Config > * lb_policy_config ) {
// Prefer the LB policy name found in the service config.
if ( parsed_service_config ! = nullptr & &
@ -1709,7 +1713,7 @@ bool ChannelData::ProcessResolverResultLocked(
return false ;
}
// Process service config.
UniqueP tr< char > service_config_json ;
std : : unique_p tr< char > service_config_json ;
const internal : : ClientChannelGlobalParsedConfig * parsed_service_config =
nullptr ;
if ( service_config ! = nullptr ) {
@ -1743,8 +1747,8 @@ bool ChannelData::ProcessResolverResultLocked(
}
// Update health check service name used by existing subchannel wrappers.
for ( auto * subchannel_wrapper : chand - > subchannel_wrappers_ ) {
subchannel_wrapper - > UpdateHealthCheckServiceName (
UniquePtr < char > ( gpr_strdup ( chand - > health_check_service_name_ . get ( ) ) ) ) ;
subchannel_wrapper - > UpdateHealthCheckServiceName ( std : : unique_ptr < char > (
gpr_strdup ( chand - > health_check_service_name_ . get ( ) ) ) ) ;
}
// Save service config.
chand - > saved_service_config_ = std : : move ( service_config ) ;
@ -1769,7 +1773,7 @@ bool ChannelData::ProcessResolverResultLocked(
chand - > UpdateServiceConfigLocked ( std : : move ( retry_throttle_data ) ,
chand - > saved_service_config_ ) ;
}
UniqueP tr< char > processed_lb_policy_name ;
std : : unique_p tr< char > processed_lb_policy_name ;
chand - > ProcessLbPolicy ( result , parsed_service_config ,
& processed_lb_policy_name , lb_policy_config ) ;
// Swap out the data used by GetChannelInfo().
@ -1825,8 +1829,9 @@ void ChannelData::StartTransportOpLocked(void* arg, grpc_error* /*ignored*/) {
if ( op - > send_ping . on_initiate ! = nullptr | | op - > send_ping . on_ack ! = nullptr ) {
grpc_error * error = chand - > DoPingLocked ( op ) ;
if ( error ! = GRPC_ERROR_NONE ) {
GRPC_CLOSURE_SCHED ( op - > send_ping . on_initiate , GRPC_ERROR_REF ( error ) ) ;
GRPC_CLOSURE_SCHED ( op - > send_ping . on_ack , error ) ;
ExecCtx : : Run ( DEBUG_LOCATION , op - > send_ping . on_initiate ,
GRPC_ERROR_REF ( error ) ) ;
ExecCtx : : Run ( DEBUG_LOCATION , op - > send_ping . on_ack , error ) ;
}
op - > bind_pollset = nullptr ;
op - > send_ping . on_initiate = nullptr ;
@ -1868,7 +1873,7 @@ void ChannelData::StartTransportOpLocked(void* arg, grpc_error* /*ignored*/) {
}
}
GRPC_CHANNEL_STACK_UNREF ( chand - > owning_stack_ , " start_transport_op " ) ;
GRPC_CLOSURE_SCHED ( op - > on_consumed , GRPC_ERROR_NONE ) ;
ExecCtx : : Run ( DEBUG_LOCATION , op - > on_consumed , GRPC_ERROR_NONE ) ;
}
void ChannelData : : StartTransportOp ( grpc_channel_element * elem ,
@ -1960,12 +1965,12 @@ grpc_connectivity_state ChannelData::CheckConnectivityState(
void ChannelData : : AddConnectivityWatcher (
grpc_connectivity_state initial_state ,
OrphanablePtr < AsyncConnectivityStateWatcherInterface > watcher ) {
New < ConnectivityWatcherAdder > ( this , initial_state , std : : move ( watcher ) ) ;
new ConnectivityWatcherAdder ( this , initial_state , std : : move ( watcher ) ) ;
}
void ChannelData : : RemoveConnectivityWatcher (
AsyncConnectivityStateWatcherInterface * watcher ) {
New < ConnectivityWatcherRemover > ( this , watcher ) ;
new ConnectivityWatcherRemover ( this , watcher ) ;
}
//
@ -2057,7 +2062,8 @@ void CallData::Destroy(grpc_call_element* elem,
then_schedule_closure = nullptr ;
}
calld - > ~ CallData ( ) ;
GRPC_CLOSURE_SCHED ( then_schedule_closure , GRPC_ERROR_NONE ) ;
// TODO(yashkt) : This can potentially be a Closure::Run
ExecCtx : : Run ( DEBUG_LOCATION , then_schedule_closure , GRPC_ERROR_NONE ) ;
}
void CallData : : StartTransportStreamOpBatch (
@ -3681,7 +3687,7 @@ void CallData::CreateSubchannelCall(grpc_call_element* elem) {
void CallData : : AsyncPickDone ( grpc_call_element * elem , grpc_error * error ) {
GRPC_CLOSURE_INIT ( & pick_closure_ , PickDone , elem , grpc_schedule_on_exec_ctx ) ;
GRPC_CLOSURE_SCHED ( & pick_closure_ , error ) ;
ExecCtx : : Run ( DEBUG_LOCATION , & pick_closure_ , error ) ;
}
void CallData : : PickDone ( void * arg , grpc_error * error ) {
@ -3733,7 +3739,7 @@ class CallData::QueuedPickCanceller {
YieldCallCombinerIfPendingBatchesFound ) ;
}
GRPC_CALL_STACK_UNREF ( calld - > owning_call_ , " QueuedPickCanceller " ) ;
Delete ( self ) ;
delete self ;
}
grpc_call_element * elem_ ;
@ -3762,7 +3768,7 @@ void CallData::AddCallToQueuedPicksLocked(grpc_call_element* elem) {
pick_ . elem = elem ;
chand - > AddQueuedPick ( & pick_ , pollent_ ) ;
// Register call combiner cancellation callback.
pick_canceller_ = New < QueuedPickCanceller > ( elem ) ;
pick_canceller_ = new QueuedPickCanceller ( elem ) ;
}
void CallData : : ApplyServiceConfigToCallLocked ( grpc_call_element * elem ) {