|
|
|
@ -152,41 +152,43 @@ class ChannelData { |
|
|
|
|
SubchannelInterface* subchannel) const; |
|
|
|
|
|
|
|
|
|
grpc_connectivity_state CheckConnectivityState(bool try_to_connect); |
|
|
|
|
|
|
|
|
|
void AddExternalConnectivityWatcher(grpc_polling_entity pollent, |
|
|
|
|
grpc_connectivity_state* state, |
|
|
|
|
grpc_closure* on_complete, |
|
|
|
|
grpc_closure* watcher_timer_init) { |
|
|
|
|
MutexLock lock(&external_watchers_mu_); |
|
|
|
|
// Will be deleted when the watch is complete.
|
|
|
|
|
GPR_ASSERT(external_watchers_[on_complete] == nullptr); |
|
|
|
|
external_watchers_[on_complete] = New<ExternalConnectivityWatcher>( |
|
|
|
|
this, pollent, state, on_complete, watcher_timer_init); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void RemoveExternalConnectivityWatcher(grpc_closure* on_complete, |
|
|
|
|
bool cancel) { |
|
|
|
|
MutexLock lock(&external_watchers_mu_); |
|
|
|
|
auto it = external_watchers_.find(on_complete); |
|
|
|
|
if (it != external_watchers_.end()) { |
|
|
|
|
if (cancel) it->second->Cancel(); |
|
|
|
|
external_watchers_.erase(it); |
|
|
|
|
} |
|
|
|
|
// Will delete itself.
|
|
|
|
|
New<ExternalConnectivityWatcher>(this, pollent, state, on_complete, |
|
|
|
|
watcher_timer_init); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
int NumExternalConnectivityWatchers() const { |
|
|
|
|
MutexLock lock(&external_watchers_mu_); |
|
|
|
|
return static_cast<int>(external_watchers_.size()); |
|
|
|
|
return external_connectivity_watcher_list_.size(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
class SubchannelWrapper; |
|
|
|
|
class ClientChannelControlHelper; |
|
|
|
|
|
|
|
|
|
// Represents a pending connectivity callback from an external caller
|
|
|
|
|
// via grpc_client_channel_watch_connectivity_state().
|
|
|
|
|
class ExternalConnectivityWatcher : public ConnectivityStateWatcherInterface { |
|
|
|
|
class ExternalConnectivityWatcher { |
|
|
|
|
public: |
|
|
|
|
class WatcherList { |
|
|
|
|
public: |
|
|
|
|
WatcherList() { gpr_mu_init(&mu_); } |
|
|
|
|
~WatcherList() { gpr_mu_destroy(&mu_); } |
|
|
|
|
|
|
|
|
|
int size() const; |
|
|
|
|
ExternalConnectivityWatcher* Lookup(grpc_closure* on_complete) const; |
|
|
|
|
void Add(ExternalConnectivityWatcher* watcher); |
|
|
|
|
void Remove(const ExternalConnectivityWatcher* watcher); |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
// head_ is guarded by a mutex, since the size() method needs to
|
|
|
|
|
// iterate over the list, and it's called from the C-core API
|
|
|
|
|
// function grpc_channel_num_external_connectivity_watchers(), which
|
|
|
|
|
// is synchronous and therefore cannot run in the combiner.
|
|
|
|
|
mutable gpr_mu mu_; |
|
|
|
|
ExternalConnectivityWatcher* head_ = nullptr; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
ExternalConnectivityWatcher(ChannelData* chand, grpc_polling_entity pollent, |
|
|
|
|
grpc_connectivity_state* state, |
|
|
|
|
grpc_closure* on_complete, |
|
|
|
@ -194,23 +196,17 @@ class ChannelData { |
|
|
|
|
|
|
|
|
|
~ExternalConnectivityWatcher(); |
|
|
|
|
|
|
|
|
|
void Notify(grpc_connectivity_state state) override; |
|
|
|
|
|
|
|
|
|
void Cancel(); |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
static void AddWatcherLocked(void* arg, grpc_error* ignored); |
|
|
|
|
static void RemoveWatcherLocked(void* arg, grpc_error* ignored); |
|
|
|
|
static void OnWatchCompleteLocked(void* arg, grpc_error* error); |
|
|
|
|
static void WatchConnectivityStateLocked(void* arg, grpc_error* ignored); |
|
|
|
|
|
|
|
|
|
ChannelData* chand_; |
|
|
|
|
grpc_polling_entity pollent_; |
|
|
|
|
grpc_connectivity_state initial_state_; |
|
|
|
|
grpc_connectivity_state* state_; |
|
|
|
|
grpc_closure* on_complete_; |
|
|
|
|
grpc_closure* watcher_timer_init_; |
|
|
|
|
grpc_closure add_closure_; |
|
|
|
|
grpc_closure remove_closure_; |
|
|
|
|
Atomic<bool> done_{false}; |
|
|
|
|
grpc_closure my_closure_; |
|
|
|
|
ExternalConnectivityWatcher* next_ = nullptr; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
ChannelData(grpc_channel_element_args* args, grpc_error** error); |
|
|
|
@ -277,7 +273,8 @@ class ChannelData { |
|
|
|
|
grpc_pollset_set* interested_parties_; |
|
|
|
|
RefCountedPtr<SubchannelPoolInterface> subchannel_pool_; |
|
|
|
|
OrphanablePtr<ResolvingLoadBalancingPolicy> resolving_lb_policy_; |
|
|
|
|
ConnectivityStateTracker state_tracker_; |
|
|
|
|
grpc_connectivity_state_tracker state_tracker_; |
|
|
|
|
ExternalConnectivityWatcher::WatcherList external_connectivity_watcher_list_; |
|
|
|
|
UniquePtr<char> health_check_service_name_; |
|
|
|
|
RefCountedPtr<ServiceConfig> saved_service_config_; |
|
|
|
|
bool received_first_resolver_result_ = false; |
|
|
|
@ -308,13 +305,6 @@ class ChannelData { |
|
|
|
|
gpr_mu info_mu_; |
|
|
|
|
UniquePtr<char> info_lb_policy_name_; |
|
|
|
|
UniquePtr<char> info_service_config_json_; |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
|
// Fields guarded by a mutex, since they need to be accessed
|
|
|
|
|
// synchronously via grpc_channel_num_external_connectivity_watchers().
|
|
|
|
|
//
|
|
|
|
|
mutable Mutex external_watchers_mu_; |
|
|
|
|
Map<grpc_closure*, ExternalConnectivityWatcher*> external_watchers_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
@ -1004,7 +994,8 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface { |
|
|
|
|
"subchannel %p (connected_subchannel=%p state=%s); " |
|
|
|
|
"hopping into combiner", |
|
|
|
|
parent_->chand_, parent_.get(), parent_->subchannel_, |
|
|
|
|
connected_subchannel.get(), ConnectivityStateName(new_state)); |
|
|
|
|
connected_subchannel.get(), |
|
|
|
|
grpc_connectivity_state_name(new_state)); |
|
|
|
|
} |
|
|
|
|
// Will delete itself.
|
|
|
|
|
New<Updater>(Ref(), new_state, std::move(connected_subchannel)); |
|
|
|
@ -1053,7 +1044,7 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface { |
|
|
|
|
self->parent_->parent_->chand_, self->parent_->parent_.get(), |
|
|
|
|
self->parent_->parent_->subchannel_, |
|
|
|
|
self->connected_subchannel_.get(), |
|
|
|
|
ConnectivityStateName(self->state_), |
|
|
|
|
grpc_connectivity_state_name(self->state_), |
|
|
|
|
self->parent_->watcher_.get()); |
|
|
|
|
} |
|
|
|
|
// Ignore update if the parent WatcherWrapper has been replaced
|
|
|
|
@ -1114,6 +1105,55 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface { |
|
|
|
|
RefCountedPtr<ConnectedSubchannel> connected_subchannel_in_data_plane_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
|
// ChannelData::ExternalConnectivityWatcher::WatcherList
|
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
int ChannelData::ExternalConnectivityWatcher::WatcherList::size() const { |
|
|
|
|
MutexLock lock(&mu_); |
|
|
|
|
int count = 0; |
|
|
|
|
for (ExternalConnectivityWatcher* w = head_; w != nullptr; w = w->next_) { |
|
|
|
|
++count; |
|
|
|
|
} |
|
|
|
|
return count; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
ChannelData::ExternalConnectivityWatcher* |
|
|
|
|
ChannelData::ExternalConnectivityWatcher::WatcherList::Lookup( |
|
|
|
|
grpc_closure* on_complete) const { |
|
|
|
|
MutexLock lock(&mu_); |
|
|
|
|
ExternalConnectivityWatcher* w = head_; |
|
|
|
|
while (w != nullptr && w->on_complete_ != on_complete) { |
|
|
|
|
w = w->next_; |
|
|
|
|
} |
|
|
|
|
return w; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void ChannelData::ExternalConnectivityWatcher::WatcherList::Add( |
|
|
|
|
ExternalConnectivityWatcher* watcher) { |
|
|
|
|
GPR_ASSERT(Lookup(watcher->on_complete_) == nullptr); |
|
|
|
|
MutexLock lock(&mu_); |
|
|
|
|
GPR_ASSERT(watcher->next_ == nullptr); |
|
|
|
|
watcher->next_ = head_; |
|
|
|
|
head_ = watcher; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void ChannelData::ExternalConnectivityWatcher::WatcherList::Remove( |
|
|
|
|
const ExternalConnectivityWatcher* watcher) { |
|
|
|
|
MutexLock lock(&mu_); |
|
|
|
|
if (watcher == head_) { |
|
|
|
|
head_ = watcher->next_; |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
for (ExternalConnectivityWatcher* w = head_; w != nullptr; w = w->next_) { |
|
|
|
|
if (w->next_ == watcher) { |
|
|
|
|
w->next_ = w->next_->next_; |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
GPR_UNREACHABLE_CODE(return ); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
|
// ChannelData::ExternalConnectivityWatcher
|
|
|
|
|
//
|
|
|
|
@ -1124,7 +1164,6 @@ ChannelData::ExternalConnectivityWatcher::ExternalConnectivityWatcher( |
|
|
|
|
grpc_closure* watcher_timer_init) |
|
|
|
|
: chand_(chand), |
|
|
|
|
pollent_(pollent), |
|
|
|
|
initial_state_(*state), |
|
|
|
|
state_(state), |
|
|
|
|
on_complete_(on_complete), |
|
|
|
|
watcher_timer_init_(watcher_timer_init) { |
|
|
|
@ -1132,7 +1171,7 @@ ChannelData::ExternalConnectivityWatcher::ExternalConnectivityWatcher( |
|
|
|
|
chand_->interested_parties_); |
|
|
|
|
GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ExternalConnectivityWatcher"); |
|
|
|
|
GRPC_CLOSURE_SCHED( |
|
|
|
|
GRPC_CLOSURE_INIT(&add_closure_, AddWatcherLocked, this, |
|
|
|
|
GRPC_CLOSURE_INIT(&my_closure_, WatchConnectivityStateLocked, this, |
|
|
|
|
grpc_combiner_scheduler(chand_->combiner_)), |
|
|
|
|
GRPC_ERROR_NONE); |
|
|
|
|
} |
|
|
|
@ -1144,61 +1183,42 @@ ChannelData::ExternalConnectivityWatcher::~ExternalConnectivityWatcher() { |
|
|
|
|
"ExternalConnectivityWatcher"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void ChannelData::ExternalConnectivityWatcher::Notify( |
|
|
|
|
grpc_connectivity_state state) { |
|
|
|
|
bool done = false; |
|
|
|
|
if (!done_.CompareExchangeStrong(&done, true, MemoryOrder::RELAXED, |
|
|
|
|
MemoryOrder::RELAXED)) { |
|
|
|
|
return; // Already done.
|
|
|
|
|
} |
|
|
|
|
// Report new state to the user.
|
|
|
|
|
*state_ = state; |
|
|
|
|
GRPC_CLOSURE_SCHED(on_complete_, GRPC_ERROR_NONE); |
|
|
|
|
// Remove external watcher.
|
|
|
|
|
chand_->RemoveExternalConnectivityWatcher(on_complete_, /*cancel=*/false); |
|
|
|
|
// Hop back into the combiner to clean up.
|
|
|
|
|
// Not needed in state SHUTDOWN, because the tracker will
|
|
|
|
|
// automatically remove all watchers in that case.
|
|
|
|
|
if (state != GRPC_CHANNEL_SHUTDOWN) { |
|
|
|
|
GRPC_CLOSURE_SCHED( |
|
|
|
|
GRPC_CLOSURE_INIT(&remove_closure_, RemoveWatcherLocked, this, |
|
|
|
|
grpc_combiner_scheduler(chand_->combiner_)), |
|
|
|
|
GRPC_ERROR_NONE); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void ChannelData::ExternalConnectivityWatcher::Cancel() { |
|
|
|
|
bool done = false; |
|
|
|
|
if (!done_.CompareExchangeStrong(&done, true, MemoryOrder::RELAXED, |
|
|
|
|
MemoryOrder::RELAXED)) { |
|
|
|
|
return; // Already done.
|
|
|
|
|
} |
|
|
|
|
GRPC_CLOSURE_SCHED(on_complete_, GRPC_ERROR_CANCELLED); |
|
|
|
|
// Hop back into the combiner to clean up.
|
|
|
|
|
GRPC_CLOSURE_SCHED( |
|
|
|
|
GRPC_CLOSURE_INIT(&remove_closure_, RemoveWatcherLocked, this, |
|
|
|
|
grpc_combiner_scheduler(chand_->combiner_)), |
|
|
|
|
GRPC_ERROR_NONE); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void ChannelData::ExternalConnectivityWatcher::AddWatcherLocked( |
|
|
|
|
void* arg, grpc_error* ignored) { |
|
|
|
|
void ChannelData::ExternalConnectivityWatcher::OnWatchCompleteLocked( |
|
|
|
|
void* arg, grpc_error* error) { |
|
|
|
|
ExternalConnectivityWatcher* self = |
|
|
|
|
static_cast<ExternalConnectivityWatcher*>(arg); |
|
|
|
|
// This assumes that the closure is scheduled on the ExecCtx scheduler
|
|
|
|
|
// and that GRPC_CLOSURE_RUN() will run the closure immediately.
|
|
|
|
|
GRPC_CLOSURE_RUN(self->watcher_timer_init_, GRPC_ERROR_NONE); |
|
|
|
|
// Add new watcher.
|
|
|
|
|
self->chand_->state_tracker_.AddWatcher( |
|
|
|
|
self->initial_state_, |
|
|
|
|
OrphanablePtr<ConnectivityStateWatcherInterface>(self)); |
|
|
|
|
grpc_closure* on_complete = self->on_complete_; |
|
|
|
|
self->chand_->external_connectivity_watcher_list_.Remove(self); |
|
|
|
|
Delete(self); |
|
|
|
|
GRPC_CLOSURE_SCHED(on_complete, GRPC_ERROR_REF(error)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void ChannelData::ExternalConnectivityWatcher::RemoveWatcherLocked( |
|
|
|
|
void ChannelData::ExternalConnectivityWatcher::WatchConnectivityStateLocked( |
|
|
|
|
void* arg, grpc_error* ignored) { |
|
|
|
|
ExternalConnectivityWatcher* self = |
|
|
|
|
static_cast<ExternalConnectivityWatcher*>(arg); |
|
|
|
|
self->chand_->state_tracker_.RemoveWatcher(self); |
|
|
|
|
if (self->state_ == nullptr) { |
|
|
|
|
// Handle cancellation.
|
|
|
|
|
GPR_ASSERT(self->watcher_timer_init_ == nullptr); |
|
|
|
|
ExternalConnectivityWatcher* found = |
|
|
|
|
self->chand_->external_connectivity_watcher_list_.Lookup( |
|
|
|
|
self->on_complete_); |
|
|
|
|
if (found != nullptr) { |
|
|
|
|
grpc_connectivity_state_notify_on_state_change( |
|
|
|
|
&found->chand_->state_tracker_, nullptr, &found->my_closure_); |
|
|
|
|
} |
|
|
|
|
Delete(self); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
// New watcher.
|
|
|
|
|
self->chand_->external_connectivity_watcher_list_.Add(self); |
|
|
|
|
// This assumes that the closure is scheduled on the ExecCtx scheduler
|
|
|
|
|
// and that GRPC_CLOSURE_RUN would run the closure immediately.
|
|
|
|
|
GRPC_CLOSURE_RUN(self->watcher_timer_init_, GRPC_ERROR_NONE); |
|
|
|
|
GRPC_CLOSURE_INIT(&self->my_closure_, OnWatchCompleteLocked, self, |
|
|
|
|
grpc_combiner_scheduler(self->chand_->combiner_)); |
|
|
|
|
grpc_connectivity_state_notify_on_state_change( |
|
|
|
|
&self->chand_->state_tracker_, self->state_, &self->my_closure_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
@ -1251,7 +1271,7 @@ class ChannelData::ClientChannelControlHelper |
|
|
|
|
? "" |
|
|
|
|
: " (ignoring -- channel shutting down)"; |
|
|
|
|
gpr_log(GPR_INFO, "chand=%p: update: state=%s picker=%p%s", chand_, |
|
|
|
|
ConnectivityStateName(state), picker.get(), extra); |
|
|
|
|
grpc_connectivity_state_name(state), picker.get(), extra); |
|
|
|
|
} |
|
|
|
|
// Do update only if not shutting down.
|
|
|
|
|
if (disconnect_error == GRPC_ERROR_NONE) { |
|
|
|
@ -1342,13 +1362,14 @@ ChannelData::ChannelData(grpc_channel_element_args* args, grpc_error** error) |
|
|
|
|
combiner_(grpc_combiner_create()), |
|
|
|
|
interested_parties_(grpc_pollset_set_create()), |
|
|
|
|
subchannel_pool_(GetSubchannelPool(args->channel_args)), |
|
|
|
|
state_tracker_("client_channel", GRPC_CHANNEL_IDLE), |
|
|
|
|
disconnect_error_(GRPC_ERROR_NONE) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "chand=%p: creating client_channel for channel stack %p", |
|
|
|
|
this, owning_stack_); |
|
|
|
|
} |
|
|
|
|
// Initialize data members.
|
|
|
|
|
grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE, |
|
|
|
|
"client_channel"); |
|
|
|
|
gpr_mu_init(&info_mu_); |
|
|
|
|
// Start backup polling.
|
|
|
|
|
grpc_client_channel_start_backup_polling(interested_parties_); |
|
|
|
@ -1412,6 +1433,7 @@ ChannelData::~ChannelData() { |
|
|
|
|
grpc_pollset_set_destroy(interested_parties_); |
|
|
|
|
GRPC_COMBINER_UNREF(combiner_, "client_channel"); |
|
|
|
|
GRPC_ERROR_UNREF(disconnect_error_.Load(MemoryOrder::RELAXED)); |
|
|
|
|
grpc_connectivity_state_destroy(&state_tracker_); |
|
|
|
|
gpr_mu_destroy(&info_mu_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -1425,7 +1447,7 @@ void ChannelData::UpdateStateAndPickerLocked( |
|
|
|
|
received_first_resolver_result_ = false; |
|
|
|
|
} |
|
|
|
|
// Update connectivity state.
|
|
|
|
|
state_tracker_.SetState(state, reason); |
|
|
|
|
grpc_connectivity_state_set(&state_tracker_, state, reason); |
|
|
|
|
if (channelz_node_ != nullptr) { |
|
|
|
|
channelz_node_->SetConnectivityState(state); |
|
|
|
|
channelz_node_->AddTraceEvent( |
|
|
|
@ -1714,7 +1736,7 @@ bool ChannelData::ProcessResolverResultLocked( |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
grpc_error* ChannelData::DoPingLocked(grpc_transport_op* op) { |
|
|
|
|
if (state_tracker_.state() != GRPC_CHANNEL_READY) { |
|
|
|
|
if (grpc_connectivity_state_check(&state_tracker_) != GRPC_CHANNEL_READY) { |
|
|
|
|
return GRPC_ERROR_CREATE_FROM_STATIC_STRING("channel not connected"); |
|
|
|
|
} |
|
|
|
|
LoadBalancingPolicy::PickResult result = |
|
|
|
@ -1742,12 +1764,12 @@ void ChannelData::StartTransportOpLocked(void* arg, grpc_error* ignored) { |
|
|
|
|
static_cast<grpc_channel_element*>(op->handler_private.extra_arg); |
|
|
|
|
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data); |
|
|
|
|
// Connectivity watch.
|
|
|
|
|
if (op->start_connectivity_watch != nullptr) { |
|
|
|
|
chand->state_tracker_.AddWatcher(op->start_connectivity_watch_state, |
|
|
|
|
std::move(op->start_connectivity_watch)); |
|
|
|
|
} |
|
|
|
|
if (op->stop_connectivity_watch != nullptr) { |
|
|
|
|
chand->state_tracker_.RemoveWatcher(op->stop_connectivity_watch); |
|
|
|
|
if (op->on_connectivity_state_change != nullptr) { |
|
|
|
|
grpc_connectivity_state_notify_on_state_change( |
|
|
|
|
&chand->state_tracker_, op->connectivity_state, |
|
|
|
|
op->on_connectivity_state_change); |
|
|
|
|
op->on_connectivity_state_change = nullptr; |
|
|
|
|
op->connectivity_state = nullptr; |
|
|
|
|
} |
|
|
|
|
// Ping.
|
|
|
|
|
if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) { |
|
|
|
@ -1878,7 +1900,7 @@ void ChannelData::TryToConnectLocked(void* arg, grpc_error* error_ignored) { |
|
|
|
|
|
|
|
|
|
grpc_connectivity_state ChannelData::CheckConnectivityState( |
|
|
|
|
bool try_to_connect) { |
|
|
|
|
grpc_connectivity_state out = state_tracker_.state(); |
|
|
|
|
grpc_connectivity_state out = grpc_connectivity_state_check(&state_tracker_); |
|
|
|
|
if (out == GRPC_CHANNEL_IDLE && try_to_connect) { |
|
|
|
|
GRPC_CHANNEL_STACK_REF(owning_stack_, "TryToConnect"); |
|
|
|
|
GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(TryToConnectLocked, this, |
|
|
|
@ -3928,13 +3950,6 @@ void grpc_client_channel_watch_connectivity_state( |
|
|
|
|
grpc_connectivity_state* state, grpc_closure* closure, |
|
|
|
|
grpc_closure* watcher_timer_init) { |
|
|
|
|
auto* chand = static_cast<ChannelData*>(elem->channel_data); |
|
|
|
|
if (state == nullptr) { |
|
|
|
|
// Handle cancellation.
|
|
|
|
|
GPR_ASSERT(watcher_timer_init == nullptr); |
|
|
|
|
chand->RemoveExternalConnectivityWatcher(closure, /*cancel=*/true); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
// Handle addition.
|
|
|
|
|
return chand->AddExternalConnectivityWatcher(pollent, state, closure, |
|
|
|
|
watcher_timer_init); |
|
|
|
|
} |
|
|
|
|