use WorkSerializer for subchannel connectivity state notifications (#28111)

pull/28247/head
Mark D. Roth 3 years ago committed by GitHub
parent 9be7ca5a55
commit cfca3e5419
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 24
      src/core/ext/filters/client_channel/client_channel.cc
  2. 281
      src/core/ext/filters/client_channel/subchannel.cc
  3. 48
      src/core/ext/filters/client_channel/subchannel.h

@ -578,7 +578,8 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface {
DEBUG_LOCATION);
}
void OnConnectivityStateChange() override {
void OnConnectivityStateChange(grpc_connectivity_state state,
const absl::Status& status) override {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO,
"chand=%p: connectivity change for subchannel wrapper %p "
@ -587,9 +588,9 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface {
}
Ref().release(); // ref owned by lambda
parent_->chand_->work_serializer_->Run(
[this]()
[this, state, status]()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(parent_->chand_->work_serializer_) {
ApplyUpdateInControlPlaneWorkSerializer();
ApplyUpdateInControlPlaneWorkSerializer(state, status);
Unref();
},
DEBUG_LOCATION);
@ -612,19 +613,20 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface {
grpc_connectivity_state last_seen_state() const { return last_seen_state_; }
private:
void ApplyUpdateInControlPlaneWorkSerializer()
void ApplyUpdateInControlPlaneWorkSerializer(grpc_connectivity_state state,
const absl::Status& status)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(parent_->chand_->work_serializer_) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO,
"chand=%p: processing connectivity change in work serializer "
"for subchannel wrapper %p subchannel %p "
"watcher=%p",
"for subchannel wrapper %p subchannel %p watcher=%p: "
"state=%s status=%s",
parent_->chand_, parent_.get(), parent_->subchannel_.get(),
watcher_.get());
watcher_.get(), ConnectivityStateName(state),
status.ToString().c_str());
}
ConnectivityStateChange state_change = PopConnectivityStateChange();
absl::optional<absl::Cord> keepalive_throttling =
state_change.status.GetPayload(kKeepaliveThrottlingKey);
status.GetPayload(kKeepaliveThrottlingKey);
if (keepalive_throttling.has_value()) {
int new_keepalive_time = -1;
if (absl::SimpleAtoi(std::string(keepalive_throttling.value()),
@ -652,8 +654,8 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface {
// Ignore update if the parent WatcherWrapper has been replaced
// since this callback was scheduled.
if (watcher_ != nullptr) {
last_seen_state_ = state_change.state;
watcher_->OnConnectivityStateChange(state_change.state);
last_seen_state_ = state;
watcher_->OnConnectivityStateChange(state);
}
}

@ -311,77 +311,54 @@ class Subchannel::ConnectedSubchannelStateWatcher
void OnConnectivityStateChange(grpc_connectivity_state new_state,
const absl::Status& status) override {
Subchannel* c = subchannel_.get();
MutexLock lock(&c->mu_);
switch (new_state) {
case GRPC_CHANNEL_TRANSIENT_FAILURE:
case GRPC_CHANNEL_SHUTDOWN: {
if (!c->disconnected_ && c->connected_subchannel_ != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_subchannel)) {
gpr_log(GPR_INFO,
"subchannel %p %s: Connected subchannel %p has gone into "
"%s. Attempting to reconnect.",
c, c->key_.ToString().c_str(),
c->connected_subchannel_.get(),
ConnectivityStateName(new_state));
}
c->connected_subchannel_.reset();
if (c->channelz_node() != nullptr) {
c->channelz_node()->SetChildSocket(nullptr);
{
MutexLock lock(&c->mu_);
switch (new_state) {
case GRPC_CHANNEL_TRANSIENT_FAILURE:
case GRPC_CHANNEL_SHUTDOWN: {
if (!c->disconnected_ && c->connected_subchannel_ != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_subchannel)) {
gpr_log(GPR_INFO,
"subchannel %p %s: Connected subchannel %p has gone into "
"%s. Attempting to reconnect.",
c, c->key_.ToString().c_str(),
c->connected_subchannel_.get(),
ConnectivityStateName(new_state));
}
c->connected_subchannel_.reset();
if (c->channelz_node() != nullptr) {
c->channelz_node()->SetChildSocket(nullptr);
}
// We need to construct our own status if the underlying state was
// shutdown since the accompanying status will be StatusCode::OK
// otherwise.
c->SetConnectivityStateLocked(
GRPC_CHANNEL_TRANSIENT_FAILURE,
new_state == GRPC_CHANNEL_SHUTDOWN
? absl::Status(absl::StatusCode::kUnavailable,
"Subchannel has disconnected.")
: status);
c->backoff_begun_ = false;
c->backoff_.Reset();
}
// We need to construct our own status if the underlying state was
// shutdown since the accompanying status will be StatusCode::OK
// otherwise.
c->SetConnectivityStateLocked(
GRPC_CHANNEL_TRANSIENT_FAILURE,
new_state == GRPC_CHANNEL_SHUTDOWN
? absl::Status(absl::StatusCode::kUnavailable,
"Subchannel has disconnected.")
: status);
c->backoff_begun_ = false;
c->backoff_.Reset();
break;
}
default: {
// In principle, this should never happen. We should not get
// a callback for READY, because that was the state we started
// this watch from. And a connected subchannel should never go
// from READY to CONNECTING or IDLE.
c->SetConnectivityStateLocked(new_state, status);
}
break;
}
default: {
// In principle, this should never happen. We should not get
// a callback for READY, because that was the state we started
// this watch from. And a connected subchannel should never go
// from READY to CONNECTING or IDLE.
c->SetConnectivityStateLocked(new_state, status);
}
}
// Drain any connectivity state notifications after releasing the mutex.
c->work_serializer_.DrainQueue();
}
WeakRefCountedPtr<Subchannel> subchannel_;
};
// Asynchronously notifies the \a watcher of a change in the connectivity state
// of \a subchannel to the current \a state. Deletes itself when done.
class Subchannel::AsyncWatcherNotifierLocked {
public:
AsyncWatcherNotifierLocked(
RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface> watcher,
grpc_connectivity_state state, const absl::Status& status)
: watcher_(std::move(watcher)) {
watcher_->PushConnectivityStateChange({state, status});
ExecCtx::Run(DEBUG_LOCATION,
GRPC_CLOSURE_INIT(
&closure_,
[](void* arg, grpc_error_handle /*error*/) {
auto* self =
static_cast<AsyncWatcherNotifierLocked*>(arg);
self->watcher_->OnConnectivityStateChange();
delete self;
},
this, nullptr),
GRPC_ERROR_NONE);
}
private:
RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface> watcher_;
grpc_closure closure_;
};
//
// Subchannel::ConnectivityStateWatcherList
//
@ -399,7 +376,13 @@ void Subchannel::ConnectivityStateWatcherList::RemoveWatcherLocked(
void Subchannel::ConnectivityStateWatcherList::NotifyLocked(
grpc_connectivity_state state, const absl::Status& status) {
for (const auto& p : watchers_) {
new AsyncWatcherNotifierLocked(p.second, state, status);
auto* watcher = p.second->Ref().release();
subchannel_->work_serializer_.Schedule(
[watcher, state, status]() {
watcher->OnConnectivityStateChange(state, status);
watcher->Unref();
},
DEBUG_LOCATION);
}
}
@ -418,7 +401,8 @@ class Subchannel::HealthWatcherMap::HealthWatcher
health_check_service_name_(std::move(health_check_service_name)),
state_(subchannel_->state_ == GRPC_CHANNEL_READY
? GRPC_CHANNEL_CONNECTING
: subchannel_->state_) {
: subchannel_->state_),
watcher_list_(subchannel_.get()) {
// If the subchannel is already connected, start health checking.
if (subchannel_->state_ == GRPC_CHANNEL_READY) StartHealthCheckingLocked();
}
@ -437,7 +421,15 @@ class Subchannel::HealthWatcherMap::HealthWatcher
grpc_connectivity_state initial_state,
RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface> watcher) {
if (state_ != initial_state) {
new AsyncWatcherNotifierLocked(watcher, state_, status_);
auto* watcher_ptr = watcher->Ref().release();
auto state = state_;
auto status = status_;
subchannel_->work_serializer_.Schedule(
[watcher_ptr, state, status]() {
watcher_ptr->OnConnectivityStateChange(state, status);
watcher_ptr->Unref();
},
DEBUG_LOCATION);
}
watcher_list_.AddWatcherLocked(std::move(watcher));
}
@ -481,12 +473,17 @@ class Subchannel::HealthWatcherMap::HealthWatcher
private:
void OnConnectivityStateChange(grpc_connectivity_state new_state,
const absl::Status& status) override {
MutexLock lock(&subchannel_->mu_);
if (new_state != GRPC_CHANNEL_SHUTDOWN && health_check_client_ != nullptr) {
state_ = new_state;
status_ = status;
watcher_list_.NotifyLocked(new_state, status);
{
MutexLock lock(&subchannel_->mu_);
if (new_state != GRPC_CHANNEL_SHUTDOWN &&
health_check_client_ != nullptr) {
state_ = new_state;
status_ = status;
watcher_list_.NotifyLocked(new_state, status);
}
}
// Drain any connectivity state notifications after releasing the mutex.
subchannel_->work_serializer_.DrainQueue();
}
void StartHealthCheckingLocked()
@ -623,21 +620,6 @@ BackOff::Options ParseArgsForBackoffValues(
} // namespace
void Subchannel::ConnectivityStateWatcherInterface::PushConnectivityStateChange(
ConnectivityStateChange state_change) {
MutexLock lock(&mu_);
connectivity_state_queue_.push_back(std::move(state_change));
}
Subchannel::ConnectivityStateWatcherInterface::ConnectivityStateChange
Subchannel::ConnectivityStateWatcherInterface::PopConnectivityStateChange() {
MutexLock lock(&mu_);
GPR_ASSERT(!connectivity_state_queue_.empty());
ConnectivityStateChange state_change = connectivity_state_queue_.front();
connectivity_state_queue_.pop_front();
return state_change;
}
Subchannel::Subchannel(SubchannelKey key,
OrphanablePtr<SubchannelConnector> connector,
const grpc_channel_args* args)
@ -647,6 +629,7 @@ Subchannel::Subchannel(SubchannelKey key,
key_(std::move(key)),
pollset_set_(grpc_pollset_set_create()),
connector_(std::move(connector)),
watcher_list_(this),
backoff_(ParseArgsForBackoffValues(args, &min_connect_timeout_ms_)) {
GRPC_STATS_INC_CLIENT_SUBCHANNELS_CREATED();
GRPC_CLOSURE_INIT(&on_connecting_finished_, OnConnectingFinished, this,
@ -755,21 +738,33 @@ void Subchannel::WatchConnectivityState(
grpc_connectivity_state initial_state,
const absl::optional<std::string>& health_check_service_name,
RefCountedPtr<ConnectivityStateWatcherInterface> watcher) {
MutexLock lock(&mu_);
grpc_pollset_set* interested_parties = watcher->interested_parties();
if (interested_parties != nullptr) {
grpc_pollset_set_add_pollset_set(pollset_set_, interested_parties);
}
if (!health_check_service_name.has_value()) {
if (state_ != initial_state) {
new AsyncWatcherNotifierLocked(watcher, state_, status_);
{
MutexLock lock(&mu_);
grpc_pollset_set* interested_parties = watcher->interested_parties();
if (interested_parties != nullptr) {
grpc_pollset_set_add_pollset_set(pollset_set_, interested_parties);
}
if (!health_check_service_name.has_value()) {
if (state_ != initial_state) {
auto* watcher_ptr = watcher->Ref().release();
auto state = state_;
auto status = status_;
work_serializer_.Schedule(
[watcher_ptr, state, status]() {
watcher_ptr->OnConnectivityStateChange(state, status);
watcher_ptr->Unref();
},
DEBUG_LOCATION);
}
watcher_list_.AddWatcherLocked(std::move(watcher));
} else {
health_watcher_map_.AddWatcherLocked(
WeakRef(DEBUG_LOCATION, "health_watcher"), initial_state,
*health_check_service_name, std::move(watcher));
}
watcher_list_.AddWatcherLocked(std::move(watcher));
} else {
health_watcher_map_.AddWatcherLocked(
WeakRef(DEBUG_LOCATION, "health_watcher"), initial_state,
*health_check_service_name, std::move(watcher));
}
// Drain any connectivity state notifications after releasing the mutex.
work_serializer_.DrainQueue();
}
void Subchannel::CancelConnectivityStateWatch(
@ -789,20 +784,28 @@ void Subchannel::CancelConnectivityStateWatch(
}
void Subchannel::AttemptToConnect() {
MutexLock lock(&mu_);
MaybeStartConnectingLocked();
{
MutexLock lock(&mu_);
MaybeStartConnectingLocked();
}
// Drain any connectivity state notifications after releasing the mutex.
work_serializer_.DrainQueue();
}
void Subchannel::ResetBackoff() {
MutexLock lock(&mu_);
backoff_.Reset();
if (have_retry_alarm_) {
retry_immediately_ = true;
grpc_timer_cancel(&retry_alarm_);
} else {
backoff_begun_ = false;
MaybeStartConnectingLocked();
{
MutexLock lock(&mu_);
backoff_.Reset();
if (have_retry_alarm_) {
retry_immediately_ = true;
grpc_timer_cancel(&retry_alarm_);
} else {
backoff_begun_ = false;
MaybeStartConnectingLocked();
}
}
// Drain any connectivity state notifications after releasing the mutex.
work_serializer_.DrainQueue();
}
void Subchannel::Orphan() {
@ -812,12 +815,16 @@ void Subchannel::Orphan() {
subchannel_pool_->UnregisterSubchannel(key_, this);
subchannel_pool_.reset();
}
MutexLock lock(&mu_);
GPR_ASSERT(!disconnected_);
disconnected_ = true;
connector_.reset();
connected_subchannel_.reset();
health_watcher_map_.ShutdownLocked();
{
MutexLock lock(&mu_);
GPR_ASSERT(!disconnected_);
disconnected_ = true;
connector_.reset();
connected_subchannel_.reset();
health_watcher_map_.ShutdownLocked();
}
// Drain any connectivity state notifications after releasing the mutex.
work_serializer_.DrainQueue();
}
namespace {
@ -900,27 +907,31 @@ void Subchannel::MaybeStartConnectingLocked() {
void Subchannel::OnRetryAlarm(void* arg, grpc_error_handle error) {
WeakRefCountedPtr<Subchannel> c(static_cast<Subchannel*>(arg));
MutexLock lock(&c->mu_);
c->have_retry_alarm_ = false;
if (c->disconnected_) {
error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Disconnected",
&error, 1);
} else if (c->retry_immediately_) {
c->retry_immediately_ = false;
error = GRPC_ERROR_NONE;
} else {
(void)GRPC_ERROR_REF(error);
}
if (error == GRPC_ERROR_NONE) {
gpr_log(GPR_INFO,
"subchannel %p %s: failed to connect to channel, retrying", c.get(),
c->key_.ToString().c_str());
c->ContinueConnectingLocked();
// Still connecting, keep ref around. Note that this stolen ref won't
// be dropped without first acquiring c->mu_.
c.release();
{
MutexLock lock(&c->mu_);
c->have_retry_alarm_ = false;
if (c->disconnected_) {
error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Disconnected",
&error, 1);
} else if (c->retry_immediately_) {
c->retry_immediately_ = false;
error = GRPC_ERROR_NONE;
} else {
(void)GRPC_ERROR_REF(error);
}
if (error == GRPC_ERROR_NONE) {
gpr_log(GPR_INFO,
"subchannel %p %s: failed to connect to channel, retrying",
c.get(), c->key_.ToString().c_str());
c->ContinueConnectingLocked();
// Still connecting, keep ref around. Note that this stolen ref won't
// be dropped without first acquiring c->mu_.
c.release();
}
GRPC_ERROR_UNREF(error);
}
GRPC_ERROR_UNREF(error);
// Drain any connectivity state notifications after releasing the mutex.
static_cast<Subchannel*>(arg)->work_serializer_.DrainQueue();
}
void Subchannel::ContinueConnectingLocked() {
@ -954,6 +965,8 @@ void Subchannel::OnConnectingFinished(void* arg, grpc_error_handle error) {
}
}
grpc_channel_args_destroy(delete_channel_args);
// Drain any connectivity state notifications after releasing the mutex.
c->work_serializer_.DrainQueue();
c.reset(DEBUG_LOCATION, "connecting");
}

@ -34,6 +34,7 @@
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/iomgr/work_serializer.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/metadata.h"
@ -145,43 +146,13 @@ class Subchannel : public DualRefCounted<Subchannel> {
class ConnectivityStateWatcherInterface
: public RefCounted<ConnectivityStateWatcherInterface> {
public:
struct ConnectivityStateChange {
grpc_connectivity_state state;
absl::Status status;
};
~ConnectivityStateWatcherInterface() override = default;
// Will be invoked whenever the subchannel's connectivity state
// changes. There will be only one invocation of this method on a
// given watcher instance at any given time.
// Implementations should call PopConnectivityStateChange to get the next
// connectivity state change.
virtual void OnConnectivityStateChange() = 0;
// Invoked whenever the subchannel's connectivity state changes.
// There will be only one invocation of this method on a given watcher
// instance at any given time.
virtual void OnConnectivityStateChange(grpc_connectivity_state state,
const absl::Status& status) = 0;
virtual grpc_pollset_set* interested_parties() = 0;
// Enqueues connectivity state change notifications.
// When the state changes to READY, connected_subchannel will
// contain a ref to the connected subchannel. When it changes from
// READY to some other state, the implementation must release its
// ref to the connected subchannel.
// TODO(yashkt): This is currently needed to send the state updates in the
// right order when asynchronously notifying. This will no longer be
// necessary when we have access to EventManager.
void PushConnectivityStateChange(ConnectivityStateChange state_change);
// Dequeues connectivity state change notifications.
ConnectivityStateChange PopConnectivityStateChange();
private:
Mutex mu_; // protects the queue
// Keeps track of the updates that the watcher instance must be notified of.
// TODO(yashkt): This is currently needed to send the state updates in the
// right order when asynchronously notifying. This will no longer be
// necessary when we have access to EventManager.
std::deque<ConnectivityStateChange> connectivity_state_queue_
ABSL_GUARDED_BY(&mu_);
};
// Creates a subchannel.
@ -251,6 +222,8 @@ class Subchannel : public DualRefCounted<Subchannel> {
// the subchannel's state.
class ConnectivityStateWatcherList {
public:
explicit ConnectivityStateWatcherList(Subchannel* subchannel)
: subchannel_(subchannel) {}
~ConnectivityStateWatcherList() { Clear(); }
void AddWatcherLocked(
@ -266,6 +239,7 @@ class Subchannel : public DualRefCounted<Subchannel> {
bool empty() const { return watchers_.empty(); }
private:
Subchannel* subchannel_;
// TODO(roth): Once we can use C++14 heterogeneous lookups, this can
// be a set instead of a map.
std::map<ConnectivityStateWatcherInterface*,
@ -310,8 +284,6 @@ class Subchannel : public DualRefCounted<Subchannel> {
class ConnectedSubchannelStateWatcher;
class AsyncWatcherNotifierLocked;
// Sets the subchannel's connectivity state to \a state.
void SetConnectivityStateLocked(grpc_connectivity_state state,
const absl::Status& status)
@ -360,6 +332,8 @@ class Subchannel : public DualRefCounted<Subchannel> {
ConnectivityStateWatcherList watcher_list_ ABSL_GUARDED_BY(mu_);
// The map of watchers with health check service names.
HealthWatcherMap health_watcher_map_ ABSL_GUARDED_BY(mu_);
// Used for sending connectivity state notifications.
WorkSerializer work_serializer_;
// Backoff state.
BackOff backoff_ ABSL_GUARDED_BY(mu_);

Loading…
Cancel
Save