Trying out a solution

pull/22169/head
Yash Tibrewal 5 years ago
parent 35697a8f4d
commit f9dd439421
  1. 57
      src/core/ext/filters/client_channel/client_channel.cc
  2. 46
      src/core/ext/filters/client_channel/subchannel.cc
  3. 31
      src/core/ext/filters/client_channel/subchannel.h

@ -1018,19 +1018,16 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
DEBUG_LOCATION); DEBUG_LOCATION);
} }
void OnConnectivityStateChange( void OnConnectivityStateChange() override {
grpc_connectivity_state new_state,
RefCountedPtr<ConnectedSubchannel> connected_subchannel) override {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"chand=%p: connectivity change for subchannel wrapper %p " "chand=%p: connectivity change for subchannel wrapper %p "
"subchannel %p (connected_subchannel=%p state=%s); " "subchannel %p; "
"hopping into work_serializer", "hopping into work_serializer",
parent_->chand_, parent_.get(), parent_->subchannel_, parent_->chand_, parent_.get(), parent_->subchannel_);
connected_subchannel.get(), ConnectivityStateName(new_state));
} }
// Will delete itself. // Will delete itself.
new Updater(Ref(), new_state, std::move(connected_subchannel)); new Updater(Ref());
} }
grpc_pollset_set* interested_parties() override { grpc_pollset_set* interested_parties() override {
@ -1052,20 +1049,11 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
private: private:
class Updater { class Updater {
public: public:
Updater(RefCountedPtr<WatcherWrapper> parent, Updater(RefCountedPtr<WatcherWrapper> parent)
grpc_connectivity_state new_state, : parent_(std::move(parent)) {
RefCountedPtr<ConnectedSubchannel> connected_subchannel)
: parent_(std::move(parent)),
state_(new_state),
connected_subchannel_(std::move(connected_subchannel)) {
gpr_log(GPR_ERROR, "updater run %p", parent_->mu());
running_exec_ctx_ = ExecCtx::Get();
parent_->parent_->chand_->work_serializer_->Run( parent_->parent_->chand_->work_serializer_->Run(
[this]() { ApplyUpdateInControlPlaneWorkSerializer(); }, [this]() { ApplyUpdateInControlPlaneWorkSerializer(); },
DEBUG_LOCATION); DEBUG_LOCATION);
if (!run_inline_) {
gpr_mu_unlock(parent_->mu()->get());
}
} }
private: private:
@ -1074,23 +1062,28 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"chand=%p: processing connectivity change in work serializer " "chand=%p: processing connectivity change in work serializer "
"for subchannel wrapper %p subchannel %p " "for subchannel wrapper %p subchannel %p "
"(connected_subchannel=%p state=%s): watcher=%p", "watcher=%p",
parent_->parent_->chand_, parent_->parent_.get(), parent_->parent_->chand_, parent_->parent_.get(),
parent_->parent_->subchannel_, connected_subchannel_.get(), parent_->parent_->subchannel_, parent_->watcher_.get());
ConnectivityStateName(state_), parent_->watcher_.get());
} }
if (ExecCtx::Get() == running_exec_ctx_) { while (true) {
// Running inline grpc_connectivity_state state;
gpr_mu_unlock(parent_->mu()->get()); RefCountedPtr<ConnectedSubchannel> connected_subchannel;
run_inline_ = true; gpr_log(GPR_ERROR, "bout to popping connectivity state change %d",
state);
if (!parent_->PopConnectivityStateChange(&state,
&connected_subchannel)) {
break;
}
gpr_log(GPR_ERROR, "popping connectivity state change %d", state);
// Ignore update if the parent WatcherWrapper has been replaced
// since this callback was scheduled.
if (parent_->watcher_ == nullptr) continue;
parent_->last_seen_state_ = state;
parent_->parent_->MaybeUpdateConnectedSubchannel(
std::move(connected_subchannel));
parent_->watcher_->OnConnectivityStateChange(state);
} }
// Ignore update if the parent WatcherWrapper has been replaced
// since this callback was scheduled.
if (parent_->watcher_ == nullptr) return;
parent_->last_seen_state_ = state_;
parent_->parent_->MaybeUpdateConnectedSubchannel(
std::move(connected_subchannel_));
parent_->watcher_->OnConnectivityStateChange(state_);
delete this; delete this;
} }

@ -371,40 +371,24 @@ class Subchannel::AsyncWatcherNotifierLocked {
Subchannel* subchannel, grpc_connectivity_state state) Subchannel* subchannel, grpc_connectivity_state state)
: subchannel_(subchannel), watcher_(std::move(watcher)) { : subchannel_(subchannel), watcher_(std::move(watcher)) {
gpr_log(GPR_ERROR, "pushing connectivity state change %d", state); gpr_log(GPR_ERROR, "pushing connectivity state change %d", state);
{ RefCountedPtr<ConnectedSubchannel> connected_subchannel;
MutexLock(watcher_->mu()); if (state == GRPC_CHANNEL_READY) {
watcher_->PushConnectivityStateChangeLocked(state); connected_subchannel = subchannel_->connected_subchannel_;
} }
watcher_->PushConnectivityStateChange(state,
std::move(connected_subchannel));
gpr_log(GPR_ERROR, "done pushing");
ExecCtx::Run( ExecCtx::Run(
DEBUG_LOCATION, DEBUG_LOCATION,
GRPC_CLOSURE_INIT( GRPC_CLOSURE_INIT(&closure_,
&closure_, [](void* arg, grpc_error* /*error*/) {
[](void* arg, grpc_error* /*error*/) { gpr_log(GPR_ERROR, "done conn state change exec");
auto* self = static_cast<AsyncWatcherNotifierLocked*>(arg); auto* self =
while (true) { static_cast<AsyncWatcherNotifierLocked*>(arg);
grpc_connectivity_state state; self->watcher_->OnConnectivityStateChange();
RefCountedPtr<ConnectedSubchannel> connected_subchannel; delete self;
gpr_log(GPR_ERROR, "lock %p", self->watcher_->mu()); },
gpr_mu_lock(self->watcher_->mu()->get()); this, nullptr),
{
if (!self->watcher_->PopConnectivityStateChangeLocked(
&state)) {
gpr_mu_unlock(self->watcher_->mu()->get());
break;
}
gpr_log(GPR_ERROR, "popping connectivity state change %d",
state);
if (state == GRPC_CHANNEL_READY) {
connected_subchannel =
self->subchannel_->connected_subchannel_;
}
}
self->watcher_->OnConnectivityStateChange(
state, std::move(connected_subchannel));
}
delete self;
},
this, nullptr),
GRPC_ERROR_NONE); GRPC_ERROR_NONE);
} }

@ -190,42 +190,49 @@ class Subchannel {
// contain a ref to the connected subchannel. When it changes from // contain a ref to the connected subchannel. When it changes from
// READY to some other state, the implementation must release its // READY to some other state, the implementation must release its
// ref to the connected subchannel. // ref to the connected subchannel.
virtual void OnConnectivityStateChange( virtual void OnConnectivityStateChange() // NOLINT
grpc_connectivity_state new_state,
RefCountedPtr<ConnectedSubchannel> connected_subchannel) // NOLINT
= 0; = 0;
virtual grpc_pollset_set* interested_parties() = 0; virtual grpc_pollset_set* interested_parties() = 0;
// Enqueues connectivity state change notifications.
// TODO(yashkt): This is currently needed to send the state updates in the // TODO(yashkt): This is currently needed to send the state updates in the
// right order when asynchronously notifying. This will no longer be // right order when asynchronously notifying. This will no longer be
// necessary when we have access to EventManager. Enqueues connectivity // necessary when we have access to EventManager.
// state change notifications. Does NOT void PushConnectivityStateChange(
void PushConnectivityStateChangeLocked(grpc_connectivity_state state) { grpc_connectivity_state state,
connectivity_state_queue_.push_back(state); RefCountedPtr<ConnectedSubchannel> connected_subchannel) {
MutexLock lock(&mu_);
connectivity_state_queue_.push_back(
std::make_pair(state, std::move(connected_subchannel)));
} }
// Dequeues connectivity state change notifications. If the queue is empty, // Dequeues connectivity state change notifications. If the queue is empty,
// it returns false, otherwise returns true and sets \a state to the popped // it returns false, otherwise returns true and sets \a state to the popped
// state change. // state change.
bool PopConnectivityStateChangeLocked(grpc_connectivity_state* state) { bool PopConnectivityStateChange(
grpc_connectivity_state* state,
RefCountedPtr<ConnectedSubchannel>* connected_subchannel) {
MutexLock lock(&mu_);
if (connectivity_state_queue_.empty()) { if (connectivity_state_queue_.empty()) {
return false; return false;
} else { } else {
*state = connectivity_state_queue_.front(); *state = connectivity_state_queue_.front().first;
*connected_subchannel =
std::move(connectivity_state_queue_.front().second);
connectivity_state_queue_.pop_front(); connectivity_state_queue_.pop_front();
return true; return true;
} }
} }
Mutex* mu() { return &mu_; }
private: private:
// Keeps track of the updates that the watcher instance must be notified of. // Keeps track of the updates that the watcher instance must be notified of.
// TODO(yashkt): This is currently needed to send the state updates in the // TODO(yashkt): This is currently needed to send the state updates in the
// right order when asynchronously notifying. This will no longer be // right order when asynchronously notifying. This will no longer be
// necessary when we have access to EventManager. // necessary when we have access to EventManager.
std::deque<grpc_connectivity_state> connectivity_state_queue_; std::deque<
std::pair<grpc_connectivity_state, RefCountedPtr<ConnectedSubchannel>>>
connectivity_state_queue_;
Mutex mu_; // protects the queue Mutex mu_; // protects the queue
}; };

Loading…
Cancel
Save