From 702dd6fd74f4bbe5dc08fa944c80a78f1b6f8999 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Tue, 7 Jul 2020 16:49:06 -0700 Subject: [PATCH 1/5] Store ref to the ExternalConnectivityWatcher in external_watchers_ map. --- .../filters/client_channel/client_channel.cc | 23 +++++++++++++------ 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 921de985972..f652a6113e6 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -171,12 +171,13 @@ class ChannelData { grpc_connectivity_state* state, grpc_closure* on_complete, grpc_closure* watcher_timer_init) { - auto* watcher = new ExternalConnectivityWatcher( + auto watcher = MakeRefCounted( this, pollent, state, on_complete, watcher_timer_init); { MutexLock lock(&external_watchers_mu_); // Will be deleted when the watch is complete. GPR_ASSERT(external_watchers_[on_complete] == nullptr); + // Pass a ref to the external_watchers_ map. external_watchers_[on_complete] = watcher; } watcher->Start(); @@ -184,12 +185,12 @@ class ChannelData { void RemoveExternalConnectivityWatcher(grpc_closure* on_complete, bool cancel) { - ExternalConnectivityWatcher* watcher = nullptr; + RefCountedPtr watcher; { MutexLock lock(&external_watchers_mu_); auto it = external_watchers_.find(on_complete); if (it != external_watchers_.end()) { - watcher = it->second; + watcher = std::move(it->second); external_watchers_.erase(it); } } @@ -360,7 +361,8 @@ class ChannelData { // synchronously via grpc_channel_num_external_connectivity_watchers(). // mutable Mutex external_watchers_mu_; - std::map external_watchers_; + std::map> + external_watchers_; }; // @@ -1181,8 +1183,14 @@ ChannelData::ExternalConnectivityWatcher::~ExternalConnectivityWatcher() { } void ChannelData::ExternalConnectivityWatcher::Start() { - chand_->work_serializer_->Run([this]() { AddWatcherLocked(); }, - DEBUG_LOCATION); + // Ref owned by the lambda + Ref(DEBUG_LOCATION, "Start").release(); + chand_->work_serializer_->Run( + [this]() { + AddWatcherLocked(); + Unref(DEBUG_LOCATION, "Start"); + }, + DEBUG_LOCATION); } void ChannelData::ExternalConnectivityWatcher::Notify( @@ -1222,7 +1230,8 @@ void ChannelData::ExternalConnectivityWatcher::AddWatcherLocked() { Closure::Run(DEBUG_LOCATION, watcher_timer_init_, GRPC_ERROR_NONE); // Add new watcher. chand_->state_tracker_.AddWatcher( - initial_state_, OrphanablePtr(this)); + initial_state_, + OrphanablePtr(Ref().release())); } void ChannelData::ExternalConnectivityWatcher::RemoveWatcherLocked() { From 4d0269f0fdfe64912855f22481d77a5d710fb876 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Tue, 7 Jul 2020 18:36:41 -0700 Subject: [PATCH 2/5] Add comment --- src/core/ext/filters/client_channel/client_channel.cc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index f652a6113e6..43a3688e2ab 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -177,7 +177,9 @@ class ChannelData { MutexLock lock(&external_watchers_mu_); // Will be deleted when the watch is complete. GPR_ASSERT(external_watchers_[on_complete] == nullptr); - // Pass a ref to the external_watchers_ map. + // Pass a ref to the external_watchers_ map. We are taking an additional + // ref on the watcher so that we can maintain lifetime guarantees when + // watcher->Start() is called after the critical section. external_watchers_[on_complete] = watcher; } watcher->Start(); From 98684b59f954bf6ffae89dbe95cc27fcf60fa7d5 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Wed, 8 Jul 2020 15:03:37 -0700 Subject: [PATCH 3/5] Reviewer comments --- .../filters/client_channel/client_channel.cc | 86 +++++++++++++------ 1 file changed, 61 insertions(+), 25 deletions(-) diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 43a3688e2ab..72d351fb915 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -171,34 +171,22 @@ class ChannelData { grpc_connectivity_state* state, grpc_closure* on_complete, grpc_closure* watcher_timer_init) { - auto watcher = MakeRefCounted( + auto watcher = new ExternalConnectivityWatcher( this, pollent, state, on_complete, watcher_timer_init); { MutexLock lock(&external_watchers_mu_); - // Will be deleted when the watch is complete. - GPR_ASSERT(external_watchers_[on_complete] == nullptr); - // Pass a ref to the external_watchers_ map. We are taking an additional - // ref on the watcher so that we can maintain lifetime guarantees when - // watcher->Start() is called after the critical section. - external_watchers_[on_complete] = watcher; + // Store a ref to the watcher in the external_watchers_ map. + watcher->AddWatcherToExternalWatchersMapLocked(&external_watchers_, + on_complete); } + // Pass the ref from creating the object to Start(). watcher->Start(); } void RemoveExternalConnectivityWatcher(grpc_closure* on_complete, bool cancel) { - RefCountedPtr watcher; - { - MutexLock lock(&external_watchers_mu_); - auto it = external_watchers_.find(on_complete); - if (it != external_watchers_.end()) { - watcher = std::move(it->second); - external_watchers_.erase(it); - } - } - // watcher->Cancel() will hop into the WorkSerializer, so we have to unlock - // the mutex before calling it. - if (watcher != nullptr && cancel) watcher->Cancel(); + ExternalConnectivityWatcher::RemoveWatcherFromExternalWatchersMap( + &external_watchers_mu_, &external_watchers_, on_complete, cancel); } int NumExternalConnectivityWatchers() const { @@ -229,6 +217,22 @@ class ChannelData { ~ExternalConnectivityWatcher(); + // Adds the watcher to the external_watchers_ map. Synchronized by + // external_watchers_mu_ + void AddWatcherToExternalWatchersMapLocked( + std::map>* + external_watchers, + grpc_closure* on_complete); + + // Removes the watcher from the external_watchers_ map. + static void RemoveWatcherFromExternalWatchersMap( + Mutex* external_watchers_mu, + std::map>* + external_watchers, + grpc_closure* on_complete, bool cancel); + + // Starts the watch. Consumes the ref from the creation of the + // ExternalConnectivityWatcher object. void Start(); void Notify(grpc_connectivity_state state) override; @@ -236,6 +240,8 @@ class ChannelData { void Cancel(); private: + // Adds the watcher to state_tracker_. Consumes the ref that is passed to it + // from Start(). void AddWatcherLocked(); void RemoveWatcherLocked(); @@ -1184,13 +1190,44 @@ ChannelData::ExternalConnectivityWatcher::~ExternalConnectivityWatcher() { "ExternalConnectivityWatcher"); } +void ChannelData::ExternalConnectivityWatcher:: + AddWatcherToExternalWatchersMapLocked( + std::map>* + external_watchers, + grpc_closure* on_complete) { + // Will be deleted when the watch is complete. + GPR_ASSERT((*external_watchers)[on_complete] == nullptr); + (*external_watchers)[on_complete] = + Ref(DEBUG_LOCATION, "AddWatcherToExternalWatchersMapLocked"); +} + +void ChannelData::ExternalConnectivityWatcher:: + RemoveWatcherFromExternalWatchersMap( + Mutex* external_watchers_mu, + std::map>* + external_watchers, + grpc_closure* on_complete, bool cancel) { + RefCountedPtr watcher; + { + MutexLock lock(external_watchers_mu); + auto it = (*external_watchers).find(on_complete); + if (it != (*external_watchers).end()) { + watcher = std::move(it->second); + (*external_watchers).erase(it); + } + } + if (watcher != nullptr && cancel) { + watcher->Cancel(); + } +} + void ChannelData::ExternalConnectivityWatcher::Start() { - // Ref owned by the lambda - Ref(DEBUG_LOCATION, "Start").release(); + // No need to take a ref since Start() consumes the ref from the + // creation of the object. chand_->work_serializer_->Run( [this]() { + // The ref is passed to AddWatcherLocked(). AddWatcherLocked(); - Unref(DEBUG_LOCATION, "Start"); }, DEBUG_LOCATION); } @@ -1230,10 +1267,9 @@ void ChannelData::ExternalConnectivityWatcher::Cancel() { void ChannelData::ExternalConnectivityWatcher::AddWatcherLocked() { Closure::Run(DEBUG_LOCATION, watcher_timer_init_, GRPC_ERROR_NONE); - // Add new watcher. + // Add new watcher. Pass the ref of the object from creation to OrphanablePtr. chand_->state_tracker_.AddWatcher( - initial_state_, - OrphanablePtr(Ref().release())); + initial_state_, OrphanablePtr(this)); } void ChannelData::ExternalConnectivityWatcher::RemoveWatcherLocked() { From e03b0b6deba187f1431f338cb9d93d22da2aa865 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Wed, 8 Jul 2020 19:09:14 -0700 Subject: [PATCH 4/5] Another comment --- src/core/ext/filters/client_channel/client_channel.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 72d351fb915..75d8c9e369a 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -1216,9 +1216,9 @@ void ChannelData::ExternalConnectivityWatcher:: (*external_watchers).erase(it); } } - if (watcher != nullptr && cancel) { - watcher->Cancel(); - } + // watcher->Cancel() will hop into the WorkSerializer, so we have to unlock + // the mutex before calling it. + if (watcher != nullptr && cancel) watcher->Cancel(); } void ChannelData::ExternalConnectivityWatcher::Start() { From 01f992f9d6d9e48c512f9bfc32e12273b0ef6644 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Thu, 9 Jul 2020 11:58:33 -0700 Subject: [PATCH 5/5] Reviewer comments --- .../filters/client_channel/client_channel.cc | 86 ++++++------------- 1 file changed, 28 insertions(+), 58 deletions(-) diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 75d8c9e369a..3bfd12280cf 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -171,22 +171,14 @@ class ChannelData { grpc_connectivity_state* state, grpc_closure* on_complete, grpc_closure* watcher_timer_init) { - auto watcher = new ExternalConnectivityWatcher( - this, pollent, state, on_complete, watcher_timer_init); - { - MutexLock lock(&external_watchers_mu_); - // Store a ref to the watcher in the external_watchers_ map. - watcher->AddWatcherToExternalWatchersMapLocked(&external_watchers_, - on_complete); - } - // Pass the ref from creating the object to Start(). - watcher->Start(); + new ExternalConnectivityWatcher(this, pollent, state, on_complete, + watcher_timer_init); } void RemoveExternalConnectivityWatcher(grpc_closure* on_complete, bool cancel) { ExternalConnectivityWatcher::RemoveWatcherFromExternalWatchersMap( - &external_watchers_mu_, &external_watchers_, on_complete, cancel); + this, on_complete, cancel); } int NumExternalConnectivityWatchers() const { @@ -217,23 +209,10 @@ class ChannelData { ~ExternalConnectivityWatcher(); - // Adds the watcher to the external_watchers_ map. Synchronized by - // external_watchers_mu_ - void AddWatcherToExternalWatchersMapLocked( - std::map>* - external_watchers, - grpc_closure* on_complete); - // Removes the watcher from the external_watchers_ map. - static void RemoveWatcherFromExternalWatchersMap( - Mutex* external_watchers_mu, - std::map>* - external_watchers, - grpc_closure* on_complete, bool cancel); - - // Starts the watch. Consumes the ref from the creation of the - // ExternalConnectivityWatcher object. - void Start(); + static void RemoveWatcherFromExternalWatchersMap(ChannelData* chand, + grpc_closure* on_complete, + bool cancel); void Notify(grpc_connectivity_state state) override; @@ -1181,6 +1160,21 @@ ChannelData::ExternalConnectivityWatcher::ExternalConnectivityWatcher( grpc_polling_entity_add_to_pollset_set(&pollent_, chand_->interested_parties_); GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ExternalConnectivityWatcher"); + { + MutexLock lock(&chand_->external_watchers_mu_); + // Will be deleted when the watch is complete. + GPR_ASSERT(chand->external_watchers_[on_complete] == nullptr); + // Store a ref to the watcher in the external_watchers_ map. + chand->external_watchers_[on_complete] = + Ref(DEBUG_LOCATION, "AddWatcherToExternalWatchersMapLocked"); + } + // Pass the ref from creating the object to Start(). + chand_->work_serializer_->Run( + [this]() { + // The ref is passed to AddWatcherLocked(). + AddWatcherLocked(); + }, + DEBUG_LOCATION); } ChannelData::ExternalConnectivityWatcher::~ExternalConnectivityWatcher() { @@ -1191,29 +1185,16 @@ ChannelData::ExternalConnectivityWatcher::~ExternalConnectivityWatcher() { } void ChannelData::ExternalConnectivityWatcher:: - AddWatcherToExternalWatchersMapLocked( - std::map>* - external_watchers, - grpc_closure* on_complete) { - // Will be deleted when the watch is complete. - GPR_ASSERT((*external_watchers)[on_complete] == nullptr); - (*external_watchers)[on_complete] = - Ref(DEBUG_LOCATION, "AddWatcherToExternalWatchersMapLocked"); -} - -void ChannelData::ExternalConnectivityWatcher:: - RemoveWatcherFromExternalWatchersMap( - Mutex* external_watchers_mu, - std::map>* - external_watchers, - grpc_closure* on_complete, bool cancel) { + RemoveWatcherFromExternalWatchersMap(ChannelData* chand, + grpc_closure* on_complete, + bool cancel) { RefCountedPtr watcher; { - MutexLock lock(external_watchers_mu); - auto it = (*external_watchers).find(on_complete); - if (it != (*external_watchers).end()) { + MutexLock lock(&chand->external_watchers_mu_); + auto it = chand->external_watchers_.find(on_complete); + if (it != chand->external_watchers_.end()) { watcher = std::move(it->second); - (*external_watchers).erase(it); + chand->external_watchers_.erase(it); } } // watcher->Cancel() will hop into the WorkSerializer, so we have to unlock @@ -1221,17 +1202,6 @@ void ChannelData::ExternalConnectivityWatcher:: if (watcher != nullptr && cancel) watcher->Cancel(); } -void ChannelData::ExternalConnectivityWatcher::Start() { - // No need to take a ref since Start() consumes the ref from the - // creation of the object. - chand_->work_serializer_->Run( - [this]() { - // The ref is passed to AddWatcherLocked(). - AddWatcherLocked(); - }, - DEBUG_LOCATION); -} - void ChannelData::ExternalConnectivityWatcher::Notify( grpc_connectivity_state state) { bool done = false;