LB policy API: add status to subchannel connectivity state notifications (#29867)

* ring_hash: don't recreate ring when individual subchannel states change

* client_channel: remove synchronous subchannel connectivity state API

* change subchannel list to automatically start watching all subchannels

* use a separate loop to start watches, so list size is logged correctly

* fix RR to re-resolve on IDLE again

* fix ring_hash to delay promoting new subchannel list

* fix pick_first to wait for all subchannels to report state

* clean up SubchannelList API

* fix unused argument error

* fix another unused argument error

* clang-format

* fix RR to not re-resolve on initial IDLE state

* also don't re-resolve in initial TF state; same for ring_hash

* clang-format

* change RR and PF to initially report CONNECTING, and add second loop to priority policy

* simplify priority logic a bit

* fix grpclb to drop ref to stats object even if the subchannel call is never started

* fix memory leak in ring_hash

* fix tsan failure in grpclb code

* iwyu

* add missing BUILD deps

* update outlier_detection policy

* fix test

* fix pick_first to not report TF prematurely due to subchannel sharing

* add status to SubchannelInterface connectivity state notifications

* fix test to not depend on timing

* fix dumb overloaded method name bug

* fix sanity

* fix include path
pull/29436/head
Mark D. Roth 3 years ago committed by GitHub
parent 35b7d88654
commit 7c8d1b335c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      BUILD
  2. 11
      src/core/ext/filters/client_channel/client_channel.cc
  3. 28
      src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc
  4. 14
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  5. 38
      src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc
  6. 4
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
  7. 16
      src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
  8. 13
      src/core/ext/filters/client_channel/subchannel_interface.h

@ -4118,6 +4118,7 @@ grpc_cc_library(
],
external_deps = [
"absl/container:inlined_vector",
"absl/status",
"absl/types:optional",
],
language = "c++",

@ -30,6 +30,7 @@
#include "absl/container/inlined_vector.h"
#include "absl/memory/memory.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/cord.h"
#include "absl/strings/numbers.h"
@ -656,7 +657,15 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface {
// Ignore update if the parent WatcherWrapper has been replaced
// since this callback was scheduled.
if (watcher_ != nullptr) {
watcher_->OnConnectivityStateChange(state_change.state);
// Propagate status only in state TF.
// We specifically want to avoid propagating the status for
// state IDLE that the real subchannel gave us only for the
// purpose of keepalive propagation.
if (state_change.state != GRPC_CHANNEL_TRANSIENT_FAILURE) {
state_change.status = absl::OkStatus();
}
watcher_->OnConnectivityStateChange(state_change.state,
state_change.status);
}
}

@ -176,27 +176,34 @@ class OutlierDetectionLb : public LoadBalancingPolicy {
void Eject() {
ejected_ = true;
if (last_seen_state_.has_value() &&
*last_seen_state_ != GRPC_CHANNEL_TRANSIENT_FAILURE) {
watcher_->OnConnectivityStateChange(GRPC_CHANNEL_TRANSIENT_FAILURE);
if (last_seen_state_.has_value()) {
watcher_->OnConnectivityStateChange(
GRPC_CHANNEL_TRANSIENT_FAILURE,
absl::UnavailableError(
"subchannel ejected by outlier detection"));
}
}
void Uneject() {
ejected_ = false;
if (last_seen_state_.has_value() &&
*last_seen_state_ != GRPC_CHANNEL_TRANSIENT_FAILURE) {
watcher_->OnConnectivityStateChange(*last_seen_state_);
if (last_seen_state_.has_value()) {
watcher_->OnConnectivityStateChange(*last_seen_state_,
last_seen_status_);
}
}
void OnConnectivityStateChange(
grpc_connectivity_state new_state) override {
void OnConnectivityStateChange(grpc_connectivity_state new_state,
absl::Status status) override {
const bool send_update = !last_seen_state_.has_value() || !ejected_;
last_seen_state_ = new_state;
last_seen_status_ = status;
if (send_update) {
watcher_->OnConnectivityStateChange(
ejected_ ? GRPC_CHANNEL_TRANSIENT_FAILURE : new_state);
if (ejected_) {
new_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
status = absl::UnavailableError(
"subchannel ejected by outlier detection");
}
watcher_->OnConnectivityStateChange(new_state, status);
}
}
@ -208,6 +215,7 @@ class OutlierDetectionLb : public LoadBalancingPolicy {
std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
watcher_;
absl::optional<grpc_connectivity_state> last_seen_state_;
absl::Status last_seen_status_;
bool ejected_;
};

@ -324,8 +324,13 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
// Set our state to that of the pending subchannel list.
if (p->subchannel_list_->in_transient_failure()) {
absl::Status status = absl::UnavailableError(
"selected subchannel failed; switching to pending update");
absl::Status status = absl::UnavailableError(absl::StrCat(
"selected subchannel failed; switching to pending update; "
"last failure: ",
p->subchannel_list_
->subchannel(p->subchannel_list_->num_subchannels())
->connectivity_status()
.ToString()));
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, status,
absl::make_unique<TransientFailurePicker>(status));
@ -419,8 +424,9 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
// be the current list), re-resolve and report new state.
if (subchannel_list() == p->subchannel_list_.get()) {
p->channel_control_helper()->RequestReresolution();
absl::Status status =
absl::UnavailableError("failed to connect to all addresses");
absl::Status status = absl::UnavailableError(
absl::StrCat("failed to connect to all addresses; last error: ",
connectivity_status().ToString()));
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, status,
absl::make_unique<TransientFailurePicker>(status));

@ -29,6 +29,7 @@
#include <vector>
#include "absl/base/attributes.h"
#include "absl/base/thread_annotations.h"
#include "absl/container/inlined_vector.h"
#include "absl/memory/memory.h"
#include "absl/status/status.h"
@ -38,8 +39,6 @@
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "src/core/lib/gprpp/unique_type_name.h"
#define XXH_INLINE_ALL
#include "xxhash.h"
@ -60,6 +59,8 @@
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/unique_type_name.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
@ -177,6 +178,11 @@ class RingHash : public LoadBalancingPolicy {
return connectivity_state_.load(std::memory_order_relaxed);
}
absl::Status GetConnectivityStatus() const {
MutexLock lock(&mu_);
return connectivity_status_;
}
private:
// Performs connectivity state updates that need to be done only
// after we have started watching.
@ -193,6 +199,9 @@ class RingHash : public LoadBalancingPolicy {
// so we skip any interim stops in CONNECTING.
// Uses an atomic so that it can be accessed outside of the WorkSerializer.
std::atomic<grpc_connectivity_state> connectivity_state_{GRPC_CHANNEL_IDLE};
mutable Mutex mu_;
absl::Status connectivity_status_ ABSL_GUARDED_BY(&mu_);
};
// A list of subchannels.
@ -458,7 +467,7 @@ RingHash::PickResult RingHash::Picker::Pick(PickArgs args) {
uint64_t h;
if (!absl::SimpleAtoi(hash, &h)) {
return PickResult::Fail(
absl::InternalError("xds ring hash value is not a number"));
absl::InternalError("ring hash value is not a number"));
}
const std::vector<Ring::Entry>& ring = ring_->ring();
// Ported from https://github.com/RJ/ketama/blob/master/libketama/ketama.c
@ -553,8 +562,9 @@ RingHash::PickResult RingHash::Picker::Pick(PickArgs args) {
}
}
}
return PickResult::Fail(absl::UnavailableError(
"xds ring hash found a subchannel that is in TRANSIENT_FAILURE state"));
return PickResult::Fail(absl::UnavailableError(absl::StrCat(
"ring hash cannot find a connected subchannel; first failure: ",
ring[first_index].subchannel->GetConnectivityStatus().ToString())));
}
//
@ -719,20 +729,34 @@ void RingHash::RingHashSubchannelData::ProcessConnectivityChangeLocked(
// picker behavior.
// If the last recorded state was TRANSIENT_FAILURE, ignore the update
// unless the new state is READY.
bool update_status = true;
absl::Status status = connectivity_status();
if (last_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE &&
new_state != GRPC_CHANNEL_READY) {
new_state != GRPC_CHANNEL_READY &&
new_state != GRPC_CHANNEL_TRANSIENT_FAILURE) {
new_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
{
MutexLock lock(&mu_);
status = connectivity_status_;
}
update_status = false;
}
// Update state counters used for aggregation.
subchannel_list()->UpdateStateCountersLocked(last_connectivity_state,
new_state);
// Update status seen by picker if needed.
if (update_status) {
MutexLock lock(&mu_);
connectivity_status_ = connectivity_status();
}
// Update last seen state, also used by picker.
connectivity_state_.store(new_state, std::memory_order_relaxed);
// Update the RH policy's connectivity state, creating new picker and new
// ring.
subchannel_list()->UpdateRingHashConnectivityStateLocked(
Index(), connection_attempt_complete,
absl::UnavailableError("connections to backends failing"));
absl::UnavailableError(absl::StrCat(
"no reachable subchannels; last error: ", status.ToString())));
}
//

@ -435,7 +435,9 @@ void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked(
// If it changed, update the policy state.
if (UpdateLogicalConnectivityStateLocked(new_state)) {
subchannel_list()->MaybeUpdateRoundRobinConnectivityStateLocked(
absl::UnavailableError("connections to all backends failing"));
absl::UnavailableError(
absl::StrCat("connections to all backends failing; last error: ",
connectivity_status().ToString())));
}
}

@ -27,6 +27,7 @@
#include <utility>
#include "absl/container/inlined_vector.h"
#include "absl/status/status.h"
#include "absl/types/optional.h"
#include <grpc/impl/codegen/connectivity_state.h>
@ -97,6 +98,7 @@ class SubchannelData {
absl::optional<grpc_connectivity_state> connectivity_state() {
return connectivity_state_;
}
absl::Status connectivity_status() { return connectivity_status_; }
// Resets the connection backoff.
void ResetBackoffLocked();
@ -137,7 +139,8 @@ class SubchannelData {
subchannel_list_.reset(DEBUG_LOCATION, "Watcher dtor");
}
void OnConnectivityStateChange(grpc_connectivity_state new_state) override;
void OnConnectivityStateChange(grpc_connectivity_state new_state,
absl::Status status) override;
grpc_pollset_set* interested_parties() override {
return subchannel_list_->policy()->interested_parties();
@ -168,6 +171,7 @@ class SubchannelData {
nullptr;
// Data updated by the watcher.
absl::optional<grpc_connectivity_state> connectivity_state_;
absl::Status connectivity_status_;
};
// A list of subchannels.
@ -240,13 +244,14 @@ class SubchannelList : public InternallyRefCounted<SubchannelListType> {
template <typename SubchannelListType, typename SubchannelDataType>
void SubchannelData<SubchannelListType, SubchannelDataType>::Watcher::
OnConnectivityStateChange(grpc_connectivity_state new_state) {
OnConnectivityStateChange(grpc_connectivity_state new_state,
absl::Status status) {
if (GPR_UNLIKELY(subchannel_list_->tracer() != nullptr)) {
gpr_log(
GPR_INFO,
"[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
" (subchannel %p): connectivity changed: old_state=%s, new_state=%s, "
"shutting_down=%d, pending_watcher=%p",
"status=%s, shutting_down=%d, pending_watcher=%p",
subchannel_list_->tracer(), subchannel_list_->policy(),
subchannel_list_.get(), subchannel_data_->Index(),
subchannel_list_->num_subchannels(),
@ -254,14 +259,15 @@ void SubchannelData<SubchannelListType, SubchannelDataType>::Watcher::
(subchannel_data_->connectivity_state_.has_value()
? ConnectivityStateName(*subchannel_data_->connectivity_state_)
: "N/A"),
ConnectivityStateName(new_state), subchannel_list_->shutting_down(),
subchannel_data_->pending_watcher_);
ConnectivityStateName(new_state), status.ToString().c_str(),
subchannel_list_->shutting_down(), subchannel_data_->pending_watcher_);
}
if (!subchannel_list_->shutting_down() &&
subchannel_data_->pending_watcher_ != nullptr) {
absl::optional<grpc_connectivity_state> old_state =
subchannel_data_->connectivity_state_;
subchannel_data_->connectivity_state_ = new_state;
subchannel_data_->connectivity_status_ = status;
// Call the subclass's ProcessConnectivityChangeLocked() method.
subchannel_data_->ProcessConnectivityChangeLocked(old_state, new_state);
}

@ -22,6 +22,8 @@
#include <memory>
#include <utility>
#include "absl/status/status.h"
#include <grpc/impl/codegen/connectivity_state.h>
#include <grpc/impl/codegen/grpc_types.h>
@ -38,11 +40,12 @@ class SubchannelInterface : public RefCounted<SubchannelInterface> {
public:
virtual ~ConnectivityStateWatcherInterface() = default;
// Will be invoked whenever the subchannel's connectivity state
// changes. There will be only one invocation of this method on a
// given watcher instance at any given time.
virtual void OnConnectivityStateChange(
grpc_connectivity_state new_state) = 0;
// Will be invoked whenever the subchannel's connectivity state changes.
// If the new state is TRANSIENT_FAILURE, status indicates the reason
// for the failure. There will be only one invocation of this method
// on a given watcher instance at any given time.
virtual void OnConnectivityStateChange(grpc_connectivity_state new_state,
absl::Status status) = 0;
// TODO(roth): Remove this as soon as we move to EventManager-based
// polling.

Loading…
Cancel
Save