Merge pull request #23418 from yashykt/dataraceexternalcancel

Store ref to the ExternalConnectivityWatcher in external_watchers_ map.
pull/23440/head
Yash Tibrewal 5 years ago committed by GitHub
commit 6dc44b33c7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 71
      src/core/ext/filters/client_channel/client_channel.cc

@ -171,31 +171,14 @@ class ChannelData {
grpc_connectivity_state* state, grpc_connectivity_state* state,
grpc_closure* on_complete, grpc_closure* on_complete,
grpc_closure* watcher_timer_init) { grpc_closure* watcher_timer_init) {
auto* watcher = new ExternalConnectivityWatcher( new ExternalConnectivityWatcher(this, pollent, state, on_complete,
this, pollent, state, on_complete, watcher_timer_init); watcher_timer_init);
{
MutexLock lock(&external_watchers_mu_);
// Will be deleted when the watch is complete.
GPR_ASSERT(external_watchers_[on_complete] == nullptr);
external_watchers_[on_complete] = watcher;
}
watcher->Start();
} }
void RemoveExternalConnectivityWatcher(grpc_closure* on_complete, void RemoveExternalConnectivityWatcher(grpc_closure* on_complete,
bool cancel) { bool cancel) {
ExternalConnectivityWatcher* watcher = nullptr; ExternalConnectivityWatcher::RemoveWatcherFromExternalWatchersMap(
{ this, on_complete, cancel);
MutexLock lock(&external_watchers_mu_);
auto it = external_watchers_.find(on_complete);
if (it != external_watchers_.end()) {
watcher = it->second;
external_watchers_.erase(it);
}
}
// watcher->Cancel() will hop into the WorkSerializer, so we have to unlock
// the mutex before calling it.
if (watcher != nullptr && cancel) watcher->Cancel();
} }
int NumExternalConnectivityWatchers() const { int NumExternalConnectivityWatchers() const {
@ -226,13 +209,18 @@ class ChannelData {
~ExternalConnectivityWatcher(); ~ExternalConnectivityWatcher();
void Start(); // Removes the watcher from the external_watchers_ map.
static void RemoveWatcherFromExternalWatchersMap(ChannelData* chand,
grpc_closure* on_complete,
bool cancel);
void Notify(grpc_connectivity_state state) override; void Notify(grpc_connectivity_state state) override;
void Cancel(); void Cancel();
private: private:
// Adds the watcher to state_tracker_. Consumes the ref that is passed to it
// from Start().
void AddWatcherLocked(); void AddWatcherLocked();
void RemoveWatcherLocked(); void RemoveWatcherLocked();
@ -360,7 +348,8 @@ class ChannelData {
// synchronously via grpc_channel_num_external_connectivity_watchers(). // synchronously via grpc_channel_num_external_connectivity_watchers().
// //
mutable Mutex external_watchers_mu_; mutable Mutex external_watchers_mu_;
std::map<grpc_closure*, ExternalConnectivityWatcher*> external_watchers_; std::map<grpc_closure*, RefCountedPtr<ExternalConnectivityWatcher>>
external_watchers_;
}; };
// //
@ -1180,6 +1169,21 @@ ChannelData::ExternalConnectivityWatcher::ExternalConnectivityWatcher(
grpc_polling_entity_add_to_pollset_set(&pollent_, grpc_polling_entity_add_to_pollset_set(&pollent_,
chand_->interested_parties_); chand_->interested_parties_);
GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ExternalConnectivityWatcher"); GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ExternalConnectivityWatcher");
{
MutexLock lock(&chand_->external_watchers_mu_);
// Will be deleted when the watch is complete.
GPR_ASSERT(chand->external_watchers_[on_complete] == nullptr);
// Store a ref to the watcher in the external_watchers_ map.
chand->external_watchers_[on_complete] =
Ref(DEBUG_LOCATION, "AddWatcherToExternalWatchersMapLocked");
}
// Pass the ref from creating the object to Start().
chand_->work_serializer_->Run(
[this]() {
// The ref is passed to AddWatcherLocked().
AddWatcherLocked();
},
DEBUG_LOCATION);
} }
ChannelData::ExternalConnectivityWatcher::~ExternalConnectivityWatcher() { ChannelData::ExternalConnectivityWatcher::~ExternalConnectivityWatcher() {
@ -1189,9 +1193,22 @@ ChannelData::ExternalConnectivityWatcher::~ExternalConnectivityWatcher() {
"ExternalConnectivityWatcher"); "ExternalConnectivityWatcher");
} }
void ChannelData::ExternalConnectivityWatcher::Start() { void ChannelData::ExternalConnectivityWatcher::
chand_->work_serializer_->Run([this]() { AddWatcherLocked(); }, RemoveWatcherFromExternalWatchersMap(ChannelData* chand,
DEBUG_LOCATION); grpc_closure* on_complete,
bool cancel) {
RefCountedPtr<ExternalConnectivityWatcher> watcher;
{
MutexLock lock(&chand->external_watchers_mu_);
auto it = chand->external_watchers_.find(on_complete);
if (it != chand->external_watchers_.end()) {
watcher = std::move(it->second);
chand->external_watchers_.erase(it);
}
}
// watcher->Cancel() will hop into the WorkSerializer, so we have to unlock
// the mutex before calling it.
if (watcher != nullptr && cancel) watcher->Cancel();
} }
void ChannelData::ExternalConnectivityWatcher::Notify( void ChannelData::ExternalConnectivityWatcher::Notify(
@ -1229,7 +1246,7 @@ void ChannelData::ExternalConnectivityWatcher::Cancel() {
void ChannelData::ExternalConnectivityWatcher::AddWatcherLocked() { void ChannelData::ExternalConnectivityWatcher::AddWatcherLocked() {
Closure::Run(DEBUG_LOCATION, watcher_timer_init_, GRPC_ERROR_NONE); Closure::Run(DEBUG_LOCATION, watcher_timer_init_, GRPC_ERROR_NONE);
// Add new watcher. // Add new watcher. Pass the ref of the object from creation to OrphanablePtr.
chand_->state_tracker_.AddWatcher( chand_->state_tracker_.AddWatcher(
initial_state_, OrphanablePtr<ConnectivityStateWatcherInterface>(this)); initial_state_, OrphanablePtr<ConnectivityStateWatcherInterface>(this));
} }

Loading…
Cancel
Save