From f9dd439421c659d06a3e898bbf6e4b3a9ecfcb1b Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Thu, 27 Feb 2020 12:36:29 -0800 Subject: [PATCH] Trying out a solution --- .../filters/client_channel/client_channel.cc | 57 ++++++++----------- .../ext/filters/client_channel/subchannel.cc | 46 +++++---------- .../ext/filters/client_channel/subchannel.h | 31 ++++++---- 3 files changed, 59 insertions(+), 75 deletions(-) diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 4b403a38e4d..f68b02e526a 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -1018,19 +1018,16 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface { DEBUG_LOCATION); } - void OnConnectivityStateChange( - grpc_connectivity_state new_state, - RefCountedPtr connected_subchannel) override { + void OnConnectivityStateChange() override { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { gpr_log(GPR_INFO, "chand=%p: connectivity change for subchannel wrapper %p " - "subchannel %p (connected_subchannel=%p state=%s); " + "subchannel %p; " "hopping into work_serializer", - parent_->chand_, parent_.get(), parent_->subchannel_, - connected_subchannel.get(), ConnectivityStateName(new_state)); + parent_->chand_, parent_.get(), parent_->subchannel_); } // Will delete itself. - new Updater(Ref(), new_state, std::move(connected_subchannel)); + new Updater(Ref()); } grpc_pollset_set* interested_parties() override { @@ -1052,20 +1049,11 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface { private: class Updater { public: - Updater(RefCountedPtr parent, - grpc_connectivity_state new_state, - RefCountedPtr connected_subchannel) - : parent_(std::move(parent)), - state_(new_state), - connected_subchannel_(std::move(connected_subchannel)) { - gpr_log(GPR_ERROR, "updater run %p", parent_->mu()); - running_exec_ctx_ = ExecCtx::Get(); + Updater(RefCountedPtr parent) + : parent_(std::move(parent)) { parent_->parent_->chand_->work_serializer_->Run( [this]() { ApplyUpdateInControlPlaneWorkSerializer(); }, DEBUG_LOCATION); - if (!run_inline_) { - gpr_mu_unlock(parent_->mu()->get()); - } } private: @@ -1074,23 +1062,28 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface { gpr_log(GPR_INFO, "chand=%p: processing connectivity change in work serializer " "for subchannel wrapper %p subchannel %p " - "(connected_subchannel=%p state=%s): watcher=%p", + "watcher=%p", parent_->parent_->chand_, parent_->parent_.get(), - parent_->parent_->subchannel_, connected_subchannel_.get(), - ConnectivityStateName(state_), parent_->watcher_.get()); + parent_->parent_->subchannel_, parent_->watcher_.get()); } - if (ExecCtx::Get() == running_exec_ctx_) { - // Running inline - gpr_mu_unlock(parent_->mu()->get()); - run_inline_ = true; + while (true) { + grpc_connectivity_state state; + RefCountedPtr connected_subchannel; + gpr_log(GPR_ERROR, "bout to popping connectivity state change %d", + state); + if (!parent_->PopConnectivityStateChange(&state, + &connected_subchannel)) { + break; + } + gpr_log(GPR_ERROR, "popping connectivity state change %d", state); + // Ignore update if the parent WatcherWrapper has been replaced + // since this callback was scheduled. + if (parent_->watcher_ == nullptr) continue; + parent_->last_seen_state_ = state; + parent_->parent_->MaybeUpdateConnectedSubchannel( + std::move(connected_subchannel)); + parent_->watcher_->OnConnectivityStateChange(state); } - // Ignore update if the parent WatcherWrapper has been replaced - // since this callback was scheduled. - if (parent_->watcher_ == nullptr) return; - parent_->last_seen_state_ = state_; - parent_->parent_->MaybeUpdateConnectedSubchannel( - std::move(connected_subchannel_)); - parent_->watcher_->OnConnectivityStateChange(state_); delete this; } diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 47288c7c198..51fb70a7945 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -371,40 +371,24 @@ class Subchannel::AsyncWatcherNotifierLocked { Subchannel* subchannel, grpc_connectivity_state state) : subchannel_(subchannel), watcher_(std::move(watcher)) { gpr_log(GPR_ERROR, "pushing connectivity state change %d", state); - { - MutexLock(watcher_->mu()); - watcher_->PushConnectivityStateChangeLocked(state); + RefCountedPtr connected_subchannel; + if (state == GRPC_CHANNEL_READY) { + connected_subchannel = subchannel_->connected_subchannel_; } + watcher_->PushConnectivityStateChange(state, + std::move(connected_subchannel)); + gpr_log(GPR_ERROR, "done pushing"); ExecCtx::Run( DEBUG_LOCATION, - GRPC_CLOSURE_INIT( - &closure_, - [](void* arg, grpc_error* /*error*/) { - auto* self = static_cast(arg); - while (true) { - grpc_connectivity_state state; - RefCountedPtr connected_subchannel; - gpr_log(GPR_ERROR, "lock %p", self->watcher_->mu()); - gpr_mu_lock(self->watcher_->mu()->get()); - { - if (!self->watcher_->PopConnectivityStateChangeLocked( - &state)) { - gpr_mu_unlock(self->watcher_->mu()->get()); - break; - } - gpr_log(GPR_ERROR, "popping connectivity state change %d", - state); - if (state == GRPC_CHANNEL_READY) { - connected_subchannel = - self->subchannel_->connected_subchannel_; - } - } - self->watcher_->OnConnectivityStateChange( - state, std::move(connected_subchannel)); - } - delete self; - }, - this, nullptr), + GRPC_CLOSURE_INIT(&closure_, + [](void* arg, grpc_error* /*error*/) { + gpr_log(GPR_ERROR, "done conn state change exec"); + auto* self = + static_cast(arg); + self->watcher_->OnConnectivityStateChange(); + delete self; + }, + this, nullptr), GRPC_ERROR_NONE); } diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index 34dce735768..87192aa989d 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -190,42 +190,49 @@ class Subchannel { // contain a ref to the connected subchannel. When it changes from // READY to some other state, the implementation must release its // ref to the connected subchannel. - virtual void OnConnectivityStateChange( - grpc_connectivity_state new_state, - RefCountedPtr connected_subchannel) // NOLINT + virtual void OnConnectivityStateChange() // NOLINT = 0; virtual grpc_pollset_set* interested_parties() = 0; + // Enqueues connectivity state change notifications. // TODO(yashkt): This is currently needed to send the state updates in the // right order when asynchronously notifying. This will no longer be - // necessary when we have access to EventManager. Enqueues connectivity - // state change notifications. Does NOT - void PushConnectivityStateChangeLocked(grpc_connectivity_state state) { - connectivity_state_queue_.push_back(state); + // necessary when we have access to EventManager. + void PushConnectivityStateChange( + grpc_connectivity_state state, + RefCountedPtr connected_subchannel) { + MutexLock lock(&mu_); + connectivity_state_queue_.push_back( + std::make_pair(state, std::move(connected_subchannel))); } // Dequeues connectivity state change notifications. If the queue is empty, // it returns false, otherwise returns true and sets \a state to the popped // state change. - bool PopConnectivityStateChangeLocked(grpc_connectivity_state* state) { + bool PopConnectivityStateChange( + grpc_connectivity_state* state, + RefCountedPtr* connected_subchannel) { + MutexLock lock(&mu_); if (connectivity_state_queue_.empty()) { return false; } else { - *state = connectivity_state_queue_.front(); + *state = connectivity_state_queue_.front().first; + *connected_subchannel = + std::move(connectivity_state_queue_.front().second); connectivity_state_queue_.pop_front(); return true; } } - Mutex* mu() { return &mu_; } - private: // Keeps track of the updates that the watcher instance must be notified of. // TODO(yashkt): This is currently needed to send the state updates in the // right order when asynchronously notifying. This will no longer be // necessary when we have access to EventManager. - std::deque connectivity_state_queue_; + std::deque< + std::pair>> + connectivity_state_queue_; Mutex mu_; // protects the queue };