diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 43a3688e2ab..72d351fb915 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -171,34 +171,22 @@ class ChannelData { grpc_connectivity_state* state, grpc_closure* on_complete, grpc_closure* watcher_timer_init) { - auto watcher = MakeRefCounted( + auto watcher = new ExternalConnectivityWatcher( this, pollent, state, on_complete, watcher_timer_init); { MutexLock lock(&external_watchers_mu_); - // Will be deleted when the watch is complete. - GPR_ASSERT(external_watchers_[on_complete] == nullptr); - // Pass a ref to the external_watchers_ map. We are taking an additional - // ref on the watcher so that we can maintain lifetime guarantees when - // watcher->Start() is called after the critical section. - external_watchers_[on_complete] = watcher; + // Store a ref to the watcher in the external_watchers_ map. + watcher->AddWatcherToExternalWatchersMapLocked(&external_watchers_, + on_complete); } + // Pass the ref from creating the object to Start(). watcher->Start(); } void RemoveExternalConnectivityWatcher(grpc_closure* on_complete, bool cancel) { - RefCountedPtr watcher; - { - MutexLock lock(&external_watchers_mu_); - auto it = external_watchers_.find(on_complete); - if (it != external_watchers_.end()) { - watcher = std::move(it->second); - external_watchers_.erase(it); - } - } - // watcher->Cancel() will hop into the WorkSerializer, so we have to unlock - // the mutex before calling it. - if (watcher != nullptr && cancel) watcher->Cancel(); + ExternalConnectivityWatcher::RemoveWatcherFromExternalWatchersMap( + &external_watchers_mu_, &external_watchers_, on_complete, cancel); } int NumExternalConnectivityWatchers() const { @@ -229,6 +217,22 @@ class ChannelData { ~ExternalConnectivityWatcher(); + // Adds the watcher to the external_watchers_ map. Synchronized by + // external_watchers_mu_ + void AddWatcherToExternalWatchersMapLocked( + std::map>* + external_watchers, + grpc_closure* on_complete); + + // Removes the watcher from the external_watchers_ map. + static void RemoveWatcherFromExternalWatchersMap( + Mutex* external_watchers_mu, + std::map>* + external_watchers, + grpc_closure* on_complete, bool cancel); + + // Starts the watch. Consumes the ref from the creation of the + // ExternalConnectivityWatcher object. void Start(); void Notify(grpc_connectivity_state state) override; @@ -236,6 +240,8 @@ class ChannelData { void Cancel(); private: + // Adds the watcher to state_tracker_. Consumes the ref that is passed to it + // from Start(). void AddWatcherLocked(); void RemoveWatcherLocked(); @@ -1184,13 +1190,44 @@ ChannelData::ExternalConnectivityWatcher::~ExternalConnectivityWatcher() { "ExternalConnectivityWatcher"); } +void ChannelData::ExternalConnectivityWatcher:: + AddWatcherToExternalWatchersMapLocked( + std::map>* + external_watchers, + grpc_closure* on_complete) { + // Will be deleted when the watch is complete. + GPR_ASSERT((*external_watchers)[on_complete] == nullptr); + (*external_watchers)[on_complete] = + Ref(DEBUG_LOCATION, "AddWatcherToExternalWatchersMapLocked"); +} + +void ChannelData::ExternalConnectivityWatcher:: + RemoveWatcherFromExternalWatchersMap( + Mutex* external_watchers_mu, + std::map>* + external_watchers, + grpc_closure* on_complete, bool cancel) { + RefCountedPtr watcher; + { + MutexLock lock(external_watchers_mu); + auto it = (*external_watchers).find(on_complete); + if (it != (*external_watchers).end()) { + watcher = std::move(it->second); + (*external_watchers).erase(it); + } + } + if (watcher != nullptr && cancel) { + watcher->Cancel(); + } +} + void ChannelData::ExternalConnectivityWatcher::Start() { - // Ref owned by the lambda - Ref(DEBUG_LOCATION, "Start").release(); + // No need to take a ref since Start() consumes the ref from the + // creation of the object. chand_->work_serializer_->Run( [this]() { + // The ref is passed to AddWatcherLocked(). AddWatcherLocked(); - Unref(DEBUG_LOCATION, "Start"); }, DEBUG_LOCATION); } @@ -1230,10 +1267,9 @@ void ChannelData::ExternalConnectivityWatcher::Cancel() { void ChannelData::ExternalConnectivityWatcher::AddWatcherLocked() { Closure::Run(DEBUG_LOCATION, watcher_timer_init_, GRPC_ERROR_NONE); - // Add new watcher. + // Add new watcher. Pass the ref of the object from creation to OrphanablePtr. chand_->state_tracker_.AddWatcher( - initial_state_, - OrphanablePtr(Ref().release())); + initial_state_, OrphanablePtr(this)); } void ChannelData::ExternalConnectivityWatcher::RemoveWatcherLocked() {