[health checking] move to generic health watch for dualstack design (#34222)

Rolls forward part of the dualstack changes, mostly from #33427 and a
little bit from #32692, both of which were reverted in #33718.
Specifically:
- For petiole policies, unconditionally start health watch on
subchannels, even if client side health checking is not enabled; in this
case, the health watch will report the subchannel's raw connectivity
state.
- Fix edge cases in health check reporting that occur when a watcher is
started before the initial state is reported.
- When client-side health checking fails, add the subchannel's address
to the RPC failure status message.
- Outlier detection now works only via the health checking watch, not
via the raw connectivity state watch.
- Remove now-unnecessary hack to ensure that outlier detection does not
work for pick_first.
pull/34244/head^2
Mark D. Roth 2 years ago committed by GitHub
parent 0991b86619
commit b7e680ad46
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      src/core/BUILD
  2. 47
      src/core/ext/filters/client_channel/lb_policy/health_check_client.cc
  3. 5
      src/core/ext/filters/client_channel/lb_policy/health_check_client_internal.h
  4. 113
      src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc
  5. 7
      src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.h
  6. 14
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  7. 60
      src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
  8. 2
      src/core/ext/filters/client_channel/subchannel.h
  9. 150
      test/core/client_channel/lb_policy/lb_policy_test_lib.h
  10. 7
      test/core/client_channel/lb_policy/outlier_detection_test.cc
  11. 15
      test/cpp/end2end/client_lb_end2end_test.cc

@ -4737,6 +4737,7 @@ grpc_cc_library(
"//:grpc_trace",
"//:orphanable",
"//:ref_counted_ptr",
"//:sockaddr_utils",
"//:work_serializer",
],
)
@ -4759,11 +4760,9 @@ grpc_cc_library(
"iomgr_fwd",
"lb_policy",
"subchannel_interface",
"//:channel_arg_names",
"//:debug_location",
"//:gpr",
"//:grpc_base",
"//:grpc_client_channel",
"//:ref_counted_ptr",
"//:server_address",
"//:work_serializer",
@ -4789,7 +4788,6 @@ grpc_cc_library(
language = "c++",
deps = [
"channel_args",
"grpc_outlier_detection_header",
"health_check_client",
"iomgr_fwd",
"json",
@ -4967,7 +4965,6 @@ grpc_cc_library(
"time",
"validation_errors",
"//:gpr_platform",
"//:server_address",
],
)
@ -4994,7 +4991,6 @@ grpc_cc_library(
"lb_policy",
"lb_policy_factory",
"lb_policy_registry",
"match",
"pollset_set",
"ref_counted",
"subchannel_interface",

@ -28,6 +28,7 @@
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "upb/base/string_view.h"
@ -44,6 +45,7 @@
#include "src/core/ext/filters/client_channel/lb_policy/health_check_client_internal.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/client_channel/subchannel_stream_client.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_trace.h"
#include "src/core/lib/debug/trace.h"
@ -114,7 +116,7 @@ void HealthProducer::HealthChecker::Orphan() {
void HealthProducer::HealthChecker::AddWatcherLocked(HealthWatcher* watcher) {
watchers_.insert(watcher);
watcher->Notify(state_, status_);
if (state_.has_value()) watcher->Notify(*state_, status_);
}
bool HealthProducer::HealthChecker::RemoveWatcherLocked(
@ -128,13 +130,18 @@ void HealthProducer::HealthChecker::OnConnectivityStateChangeLocked(
if (state == GRPC_CHANNEL_READY) {
// We should already be in CONNECTING, and we don't want to change
// that until we see the initial response on the stream.
GPR_ASSERT(state_ == GRPC_CHANNEL_CONNECTING);
if (!state_.has_value()) {
state_ = GRPC_CHANNEL_CONNECTING;
status_ = absl::OkStatus();
} else {
GPR_ASSERT(state_ == GRPC_CHANNEL_CONNECTING);
}
// Start the health watch stream.
StartHealthStreamLocked();
} else {
state_ = state;
status_ = status;
NotifyWatchersLocked(state_, status_);
NotifyWatchersLocked(*state_, status_);
// We're not connected, so stop health checking.
stream_client_.reset();
}
@ -177,12 +184,21 @@ void HealthProducer::HealthChecker::NotifyWatchersLocked(
void HealthProducer::HealthChecker::OnHealthWatchStatusChange(
grpc_connectivity_state state, const absl::Status& status) {
if (state == GRPC_CHANNEL_SHUTDOWN) return;
// Prepend the subchannel's address to the status if needed.
absl::Status use_status;
if (!status.ok()) {
std::string address_str =
grpc_sockaddr_to_uri(&producer_->subchannel_->address())
.value_or("<unknown address type>");
use_status = absl::Status(
status.code(), absl::StrCat(address_str, ": ", status.message()));
}
work_serializer_->Schedule(
[self = Ref(), state, status]() {
[self = Ref(), state, status = std::move(use_status)]() mutable {
MutexLock lock(&self->producer_->mu_);
if (self->stream_client_ != nullptr) {
self->state_ = state;
self->status_ = status;
self->status_ = std::move(status);
for (HealthWatcher* watcher : self->watchers_) {
watcher->Notify(state, self->status_);
}
@ -364,7 +380,7 @@ void HealthProducer::AddWatcher(
grpc_pollset_set_add_pollset_set(interested_parties_,
watcher->interested_parties());
if (!health_check_service_name.has_value()) {
watcher->Notify(state_, status_);
if (state_.has_value()) watcher->Notify(*state_, status_);
non_health_watchers_.insert(watcher);
} else {
auto it =
@ -421,6 +437,13 @@ void HealthProducer::OnConnectivityStateChange(grpc_connectivity_state state,
//
HealthWatcher::~HealthWatcher() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
gpr_log(GPR_INFO,
"HealthWatcher %p: unregistering from producer %p "
"(health_check_service_name=\"%s\")",
this, producer_.get(),
health_check_service_name_.value_or("N/A").c_str());
}
if (producer_ != nullptr) {
producer_->RemoveWatcher(this, health_check_service_name_);
}
@ -447,6 +470,13 @@ void HealthWatcher::SetSubchannel(Subchannel* subchannel) {
if (created) producer_->Start(subchannel->Ref());
// Register ourself with the producer.
producer_->AddWatcher(this, health_check_service_name_);
if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
gpr_log(GPR_INFO,
"HealthWatcher %p: registered with producer %p (created=%d, "
"health_check_service_name=\"%s\")",
this, producer_.get(), created,
health_check_service_name_.value_or("N/A").c_str());
}
}
void HealthWatcher::Notify(grpc_connectivity_state state, absl::Status status) {
@ -472,6 +502,11 @@ MakeHealthCheckWatcher(
health_check_service_name =
args.GetOwnedString(GRPC_ARG_HEALTH_CHECK_SERVICE_NAME);
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
gpr_log(GPR_INFO,
"creating HealthWatcher -- health_check_service_name=\"%s\"",
health_check_service_name.value_or("N/A").c_str());
}
return std::make_unique<HealthWatcher>(std::move(work_serializer),
std::move(health_check_service_name),
std::move(watcher));

@ -127,7 +127,8 @@ class HealthProducer : public Subchannel::DataProducerInterface {
std::shared_ptr<WorkSerializer> work_serializer_ =
std::make_shared<WorkSerializer>();
grpc_connectivity_state state_ ABSL_GUARDED_BY(&HealthProducer::mu_);
absl::optional<grpc_connectivity_state> state_
ABSL_GUARDED_BY(&HealthProducer::mu_);
absl::Status status_ ABSL_GUARDED_BY(&HealthProducer::mu_);
OrphanablePtr<SubchannelStreamClient> stream_client_
ABSL_GUARDED_BY(&HealthProducer::mu_);
@ -143,7 +144,7 @@ class HealthProducer : public Subchannel::DataProducerInterface {
grpc_pollset_set* interested_parties_;
Mutex mu_;
grpc_connectivity_state state_ ABSL_GUARDED_BY(&mu_);
absl::optional<grpc_connectivity_state> state_ ABSL_GUARDED_BY(&mu_);
absl::Status status_ ABSL_GUARDED_BY(&mu_);
RefCountedPtr<ConnectedSubchannel> connected_subchannel_
ABSL_GUARDED_BY(&mu_);

@ -50,7 +50,6 @@
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/match.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
@ -125,12 +124,9 @@ class OutlierDetectionLb : public LoadBalancingPolicy {
class SubchannelWrapper : public DelegatingSubchannel {
public:
SubchannelWrapper(RefCountedPtr<SubchannelState> subchannel_state,
RefCountedPtr<SubchannelInterface> subchannel,
bool disable_via_raw_connectivity_watch)
RefCountedPtr<SubchannelInterface> subchannel)
: DelegatingSubchannel(std::move(subchannel)),
subchannel_state_(std::move(subchannel_state)),
disable_via_raw_connectivity_watch_(
disable_via_raw_connectivity_watch) {
subchannel_state_(std::move(subchannel_state)) {
if (subchannel_state_ != nullptr) {
subchannel_state_->AddSubchannel(this);
if (subchannel_state_->ejection_time().has_value()) {
@ -149,12 +145,6 @@ class OutlierDetectionLb : public LoadBalancingPolicy {
void Uneject();
void WatchConnectivityState(
std::unique_ptr<ConnectivityStateWatcherInterface> watcher) override;
void CancelConnectivityStateWatch(
ConnectivityStateWatcherInterface* watcher) override;
void AddDataWatcher(std::unique_ptr<DataWatcherInterface> watcher) override;
RefCountedPtr<SubchannelState> subchannel_state() const {
@ -162,11 +152,6 @@ class OutlierDetectionLb : public LoadBalancingPolicy {
}
private:
// TODO(roth): As a temporary hack, this needs to handle watchers
// stored as both unique_ptr<> and shared_ptr<>, since the former is
// used for raw connectivity state watches and the latter is used
// for health watches. This hack will go away as part of implementing
// dualstack backend support.
class WatcherWrapper
: public SubchannelInterface::ConnectivityStateWatcherInterface {
public:
@ -176,16 +161,10 @@ class OutlierDetectionLb : public LoadBalancingPolicy {
bool ejected)
: watcher_(std::move(health_watcher)), ejected_(ejected) {}
WatcherWrapper(std::unique_ptr<
SubchannelInterface::ConnectivityStateWatcherInterface>
watcher,
bool ejected)
: watcher_(std::move(watcher)), ejected_(ejected) {}
void Eject() {
ejected_ = true;
if (last_seen_state_.has_value()) {
watcher()->OnConnectivityStateChange(
watcher_->OnConnectivityStateChange(
GRPC_CHANNEL_TRANSIENT_FAILURE,
absl::UnavailableError(
"subchannel ejected by outlier detection"));
@ -195,8 +174,8 @@ class OutlierDetectionLb : public LoadBalancingPolicy {
void Uneject() {
ejected_ = false;
if (last_seen_state_.has_value()) {
watcher()->OnConnectivityStateChange(*last_seen_state_,
last_seen_status_);
watcher_->OnConnectivityStateChange(*last_seen_state_,
last_seen_status_);
}
}
@ -211,30 +190,16 @@ class OutlierDetectionLb : public LoadBalancingPolicy {
status = absl::UnavailableError(
"subchannel ejected by outlier detection");
}
watcher()->OnConnectivityStateChange(new_state, status);
watcher_->OnConnectivityStateChange(new_state, status);
}
}
grpc_pollset_set* interested_parties() override {
return watcher()->interested_parties();
return watcher_->interested_parties();
}
private:
SubchannelInterface::ConnectivityStateWatcherInterface* watcher() const {
return Match(
watcher_,
[](const std::shared_ptr<
SubchannelInterface::ConnectivityStateWatcherInterface>&
watcher) { return watcher.get(); },
[](const std::unique_ptr<
SubchannelInterface::ConnectivityStateWatcherInterface>&
watcher) { return watcher.get(); });
}
absl::variant<std::shared_ptr<
SubchannelInterface::ConnectivityStateWatcherInterface>,
std::unique_ptr<
SubchannelInterface::ConnectivityStateWatcherInterface>>
std::shared_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
watcher_;
absl::optional<grpc_connectivity_state> last_seen_state_;
absl::Status last_seen_status_;
@ -242,12 +207,8 @@ class OutlierDetectionLb : public LoadBalancingPolicy {
};
RefCountedPtr<SubchannelState> subchannel_state_;
const bool disable_via_raw_connectivity_watch_;
bool ejected_ = false;
std::map<SubchannelInterface::ConnectivityStateWatcherInterface*,
WatcherWrapper*>
watchers_;
WatcherWrapper* watcher_wrapper_ = nullptr; // For health watching.
WatcherWrapper* watcher_wrapper_ = nullptr;
};
class SubchannelState : public RefCounted<SubchannelState> {
@ -428,50 +389,14 @@ class OutlierDetectionLb : public LoadBalancingPolicy {
void OutlierDetectionLb::SubchannelWrapper::Eject() {
ejected_ = true;
// Ejecting the subchannel may cause the child policy to cancel the watch,
// so we need to be prepared for the map to be modified while we are
// iterating.
for (auto it = watchers_.begin(); it != watchers_.end();) {
WatcherWrapper* watcher = it->second;
++it;
watcher->Eject();
}
if (watcher_wrapper_ != nullptr) watcher_wrapper_->Eject();
}
void OutlierDetectionLb::SubchannelWrapper::Uneject() {
ejected_ = false;
for (auto& watcher : watchers_) {
watcher.second->Uneject();
}
if (watcher_wrapper_ != nullptr) watcher_wrapper_->Uneject();
}
void OutlierDetectionLb::SubchannelWrapper::WatchConnectivityState(
std::unique_ptr<ConnectivityStateWatcherInterface> watcher) {
if (disable_via_raw_connectivity_watch_) {
wrapped_subchannel()->WatchConnectivityState(std::move(watcher));
return;
}
ConnectivityStateWatcherInterface* watcher_ptr = watcher.get();
auto watcher_wrapper =
std::make_unique<WatcherWrapper>(std::move(watcher), ejected_);
watchers_.emplace(watcher_ptr, watcher_wrapper.get());
wrapped_subchannel()->WatchConnectivityState(std::move(watcher_wrapper));
}
void OutlierDetectionLb::SubchannelWrapper::CancelConnectivityStateWatch(
ConnectivityStateWatcherInterface* watcher) {
if (disable_via_raw_connectivity_watch_) {
wrapped_subchannel()->CancelConnectivityStateWatch(watcher);
return;
}
auto it = watchers_.find(watcher);
if (it == watchers_.end()) return;
wrapped_subchannel()->CancelConnectivityStateWatch(it->second);
watchers_.erase(it);
}
void OutlierDetectionLb::SubchannelWrapper::AddDataWatcher(
std::unique_ptr<DataWatcherInterface> watcher) {
auto* w = static_cast<InternalSubchannelDataWatcherInterface*>(watcher.get());
@ -777,22 +702,12 @@ OrphanablePtr<LoadBalancingPolicy> OutlierDetectionLb::CreateChildPolicyLocked(
RefCountedPtr<SubchannelInterface> OutlierDetectionLb::Helper::CreateSubchannel(
ServerAddress address, const ChannelArgs& args) {
if (parent()->shutting_down_) return nullptr;
// If the address has the DisableOutlierDetectionAttribute attribute,
// ignore it for raw connectivity state updates.
// TODO(roth): This is a hack to prevent outlier_detection from
// working with pick_first, as per discussion in
// https://github.com/grpc/grpc/issues/32967. Remove this as part of
// implementing dualstack backend support.
const bool disable_via_raw_connectivity_watch =
address.args().GetInt(GRPC_ARG_OUTLIER_DETECTION_DISABLE) == 1;
RefCountedPtr<SubchannelState> subchannel_state;
std::string key = MakeKeyForAddress(address);
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO,
"[outlier_detection_lb %p] using key %s for subchannel "
"address %s, disable_via_raw_connectivity_watch=%d",
parent(), key.c_str(), address.ToString().c_str(),
disable_via_raw_connectivity_watch);
"[outlier_detection_lb %p] using key %s for subchannel address %s",
parent(), key.c_str(), address.ToString().c_str());
}
if (!key.empty()) {
auto it = parent()->subchannel_state_map_.find(key);
@ -801,10 +716,8 @@ RefCountedPtr<SubchannelInterface> OutlierDetectionLb::Helper::CreateSubchannel(
}
}
auto subchannel = MakeRefCounted<SubchannelWrapper>(
subchannel_state,
parent()->channel_control_helper()->CreateSubchannel(std::move(address),
args),
disable_via_raw_connectivity_watch);
subchannel_state, parent()->channel_control_helper()->CreateSubchannel(
std::move(address), args));
if (subchannel_state != nullptr) {
subchannel_state->AddSubchannel(subchannel.get());
}

@ -28,7 +28,6 @@
#include "src/core/lib/json/json.h"
#include "src/core/lib/json/json_args.h"
#include "src/core/lib/json/json_object_loader.h"
#include "src/core/lib/resolver/server_address.h"
namespace grpc_core {
@ -90,12 +89,6 @@ struct OutlierDetectionConfig {
ValidationErrors* errors);
};
// TODO(roth): This is a horrible hack used to disable outlier detection
// when used with the pick_first policy. Remove this as part of
// implementing the dualstack backend design.
#define GRPC_ARG_OUTLIER_DETECTION_DISABLE \
GRPC_ARG_NO_SUBCHANNEL_PREFIX "outlier_detection_disable"
} // namespace grpc_core
#endif // GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_OUTLIER_DETECTION_OUTLIER_DETECTION_H

@ -39,7 +39,6 @@
#include <grpc/support/log.h>
#include "src/core/ext/filters/client_channel/lb_policy/health_check_client.h"
#include "src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
@ -396,19 +395,6 @@ absl::Status PickFirst::UpdateLocked(UpdateArgs args) {
absl::c_shuffle(*args.addresses, bit_gen_);
}
}
// TODO(roth): This is a hack to disable outlier_detection when used
// with pick_first, for the reasons described in
// https://github.com/grpc/grpc/issues/32967. Remove this when
// implementing the dualstack design.
if (args.addresses.ok()) {
ServerAddressList addresses;
for (const auto& address : *args.addresses) {
addresses.emplace_back(
address.address(),
address.args().Set(GRPC_ARG_OUTLIER_DETECTION_DISABLE, 1));
}
args.addresses = std::move(addresses);
}
// If the update contains a resolver error and we have a previous update
// that was not a resolver error, keep using the previous addresses.
if (!args.addresses.ok() && latest_update_args_.config != nullptr) {

@ -30,11 +30,9 @@
#include "absl/status/status.h"
#include "absl/types/optional.h"
#include <grpc/impl/channel_arg_names.h>
#include <grpc/impl/connectivity_state.h>
#include <grpc/support/log.h>
#include "src/core/ext/filters/client_channel/client_channel_internal.h"
#include "src/core/ext/filters/client_channel/lb_policy/health_check_client.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gprpp/debug_location.h"
@ -171,8 +169,6 @@ class SubchannelData {
// The subchannel.
RefCountedPtr<SubchannelInterface> subchannel_;
// Will be non-null when the subchannel's state is being watched.
SubchannelInterface::ConnectivityStateWatcherInterface* pending_watcher_ =
nullptr;
SubchannelInterface::DataWatcherInterface* health_watcher_ = nullptr;
// Data updated by the watcher.
absl::optional<grpc_connectivity_state> connectivity_state_;
@ -230,8 +226,6 @@ class SubchannelList : public DualRefCounted<SubchannelListType> {
const char* tracer_;
absl::optional<std::string> health_check_service_name_;
// The list of subchannels.
// We use ManualConstructor here to support SubchannelDataType classes
// that are not copyable.
@ -260,7 +254,7 @@ void SubchannelData<SubchannelListType, SubchannelDataType>::Watcher::
GPR_INFO,
"[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
" (subchannel %p): connectivity changed: old_state=%s, new_state=%s, "
"status=%s, shutting_down=%d, pending_watcher=%p, health_watcher=%p",
"status=%s, shutting_down=%d, health_watcher=%p",
subchannel_list_->tracer(), subchannel_list_->policy(),
subchannel_list_.get(), subchannel_data_->Index(),
subchannel_list_->num_subchannels(),
@ -269,12 +263,10 @@ void SubchannelData<SubchannelListType, SubchannelDataType>::Watcher::
? ConnectivityStateName(*subchannel_data_->connectivity_state_)
: "N/A"),
ConnectivityStateName(new_state), status.ToString().c_str(),
subchannel_list_->shutting_down(), subchannel_data_->pending_watcher_,
subchannel_data_->health_watcher_);
subchannel_list_->shutting_down(), subchannel_data_->health_watcher_);
}
if (!subchannel_list_->shutting_down() &&
(subchannel_data_->pending_watcher_ != nullptr ||
subchannel_data_->health_watcher_ != nullptr)) {
subchannel_data_->health_watcher_ != nullptr) {
absl::optional<grpc_connectivity_state> old_state =
subchannel_data_->connectivity_state_;
subchannel_data_->connectivity_state_ = new_state;
@ -328,46 +320,26 @@ template <typename SubchannelListType, typename SubchannelDataType>
void SubchannelData<SubchannelListType, SubchannelDataType>::
StartConnectivityWatchLocked(const ChannelArgs& args) {
if (GPR_UNLIKELY(subchannel_list_->tracer() != nullptr)) {
gpr_log(
GPR_INFO,
"[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
" (subchannel %p): starting watch "
"(health_check_service_name=\"%s\")",
subchannel_list_->tracer(), subchannel_list_->policy(),
subchannel_list_, Index(), subchannel_list_->num_subchannels(),
subchannel_.get(),
subchannel_list()->health_check_service_name_.value_or("N/A").c_str());
gpr_log(GPR_INFO,
"[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
" (subchannel %p): starting watch",
subchannel_list_->tracer(), subchannel_list_->policy(),
subchannel_list_, Index(), subchannel_list_->num_subchannels(),
subchannel_.get());
}
GPR_ASSERT(pending_watcher_ == nullptr);
GPR_ASSERT(health_watcher_ == nullptr);
auto watcher = std::make_unique<Watcher>(
this, subchannel_list()->WeakRef(DEBUG_LOCATION, "Watcher"));
if (subchannel_list()->health_check_service_name_.has_value()) {
auto health_watcher = MakeHealthCheckWatcher(
subchannel_list_->work_serializer(), args, std::move(watcher));
health_watcher_ = health_watcher.get();
subchannel_->AddDataWatcher(std::move(health_watcher));
} else {
pending_watcher_ = watcher.get();
subchannel_->WatchConnectivityState(std::move(watcher));
}
auto health_watcher = MakeHealthCheckWatcher(
subchannel_list_->work_serializer(), args, std::move(watcher));
health_watcher_ = health_watcher.get();
subchannel_->AddDataWatcher(std::move(health_watcher));
}
template <typename SubchannelListType, typename SubchannelDataType>
void SubchannelData<SubchannelListType, SubchannelDataType>::
CancelConnectivityWatchLocked(const char* reason) {
if (pending_watcher_ != nullptr) {
if (GPR_UNLIKELY(subchannel_list_->tracer() != nullptr)) {
gpr_log(GPR_INFO,
"[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
" (subchannel %p): canceling connectivity watch (%s)",
subchannel_list_->tracer(), subchannel_list_->policy(),
subchannel_list_, Index(), subchannel_list_->num_subchannels(),
subchannel_.get(), reason);
}
subchannel_->CancelConnectivityStateWatch(pending_watcher_);
pending_watcher_ = nullptr;
} else if (health_watcher_ != nullptr) {
if (health_watcher_ != nullptr) {
if (GPR_UNLIKELY(subchannel_list_->tracer() != nullptr)) {
gpr_log(GPR_INFO,
"[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
@ -399,10 +371,6 @@ SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
: DualRefCounted<SubchannelListType>(tracer),
policy_(policy),
tracer_(tracer) {
if (!args.GetBool(GRPC_ARG_INHIBIT_HEALTH_CHECKING).value_or(false)) {
health_check_service_name_ =
args.GetOwnedString(GRPC_ARG_HEALTH_CHECK_SERVICE_NAME);
}
if (GPR_UNLIKELY(tracer_ != nullptr)) {
gpr_log(GPR_INFO,
"[%s %p] Creating subchannel list %p for %" PRIuPTR " subchannels",

@ -221,6 +221,8 @@ class Subchannel : public DualRefCounted<Subchannel> {
channelz::SubchannelNode* channelz_node();
const grpc_resolved_address& address() const { return key_.address(); }
// Starts watching the subchannel's connectivity state.
// The first callback to the watcher will be delivered ~immediately.
// Subsequent callbacks will be delivered as the subchannel's state

@ -57,8 +57,10 @@
#include "src/core/ext/filters/client_channel/client_channel_internal.h"
#include "src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h"
#include "src/core/ext/filters/client_channel/lb_policy/health_check_client_internal.h"
#include "src/core/ext/filters/client_channel/lb_policy/oob_backend_metric.h"
#include "src/core/ext/filters/client_channel/lb_policy/oob_backend_metric_internal.h"
#include "src/core/ext/filters/client_channel/subchannel_interface_internal.h"
#include "src/core/ext/filters/client_channel/subchannel_pool_interface.h"
#include "src/core/lib/address_utils/parse_address.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
@ -111,7 +113,10 @@ class LoadBalancingPolicyTest : public ::testing::Test {
~FakeSubchannel() override {
if (orca_watcher_ != nullptr) {
MutexLock lock(&state_->backend_metric_watcher_mu_);
state_->watchers_.erase(orca_watcher_.get());
state_->orca_watchers_.erase(orca_watcher_.get());
}
for (const auto& p : watcher_map_) {
state_->state_tracker_.RemoveWatcher(p.second);
}
}
@ -121,6 +126,11 @@ class LoadBalancingPolicyTest : public ::testing::Test {
// Converts between
// SubchannelInterface::ConnectivityStateWatcherInterface and
// ConnectivityStateWatcherInterface.
//
// We support both unique_ptr<> and shared_ptr<>, since raw
// connectivity watches use the latter but health watches use the
// former.
// TODO(roth): Clean this up.
class WatcherWrapper : public AsyncConnectivityStateWatcherInterface {
public:
WatcherWrapper(
@ -132,33 +142,59 @@ class LoadBalancingPolicyTest : public ::testing::Test {
std::move(work_serializer)),
watcher_(std::move(watcher)) {}
WatcherWrapper(
std::shared_ptr<WorkSerializer> work_serializer,
std::shared_ptr<
SubchannelInterface::ConnectivityStateWatcherInterface>
watcher)
: AsyncConnectivityStateWatcherInterface(
std::move(work_serializer)),
watcher_(std::move(watcher)) {}
void OnConnectivityStateChange(grpc_connectivity_state new_state,
const absl::Status& status) override {
watcher_->OnConnectivityStateChange(new_state, status);
watcher()->OnConnectivityStateChange(new_state, status);
}
private:
std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
SubchannelInterface::ConnectivityStateWatcherInterface* watcher()
const {
return Match(
watcher_,
[](const std::unique_ptr<
SubchannelInterface::ConnectivityStateWatcherInterface>&
watcher) { return watcher.get(); },
[](const std::shared_ptr<
SubchannelInterface::ConnectivityStateWatcherInterface>&
watcher) { return watcher.get(); });
}
absl::variant<
std::unique_ptr<
SubchannelInterface::ConnectivityStateWatcherInterface>,
std::shared_ptr<
SubchannelInterface::ConnectivityStateWatcherInterface>>
watcher_;
};
void WatchConnectivityState(
std::unique_ptr<
SubchannelInterface::ConnectivityStateWatcherInterface>
watcher) override {
watcher) override
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*state_->work_serializer_) {
auto* watcher_ptr = watcher.get();
auto watcher_wrapper = MakeOrphanable<WatcherWrapper>(
work_serializer_, std::move(watcher));
watcher_map_[watcher.get()] = watcher_wrapper.get();
MutexLock lock(&state_->mu_);
watcher_map_[watcher_ptr] = watcher_wrapper.get();
state_->state_tracker_.AddWatcher(GRPC_CHANNEL_SHUTDOWN,
std::move(watcher_wrapper));
}
void CancelConnectivityStateWatch(
ConnectivityStateWatcherInterface* watcher) override {
ConnectivityStateWatcherInterface* watcher) override
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*state_->work_serializer_) {
auto it = watcher_map_.find(watcher);
if (it == watcher_map_.end()) return;
MutexLock lock(&state_->mu_);
state_->state_tracker_.RemoveWatcher(it->second);
watcher_map_.erase(it);
}
@ -168,19 +204,56 @@ class LoadBalancingPolicyTest : public ::testing::Test {
state_->requested_connection_ = true;
}
void AddDataWatcher(
std::unique_ptr<DataWatcherInterface> watcher) override {
void AddDataWatcher(std::unique_ptr<DataWatcherInterface> watcher)
override ABSL_EXCLUSIVE_LOCKS_REQUIRED(*state_->work_serializer_) {
MutexLock lock(&state_->backend_metric_watcher_mu_);
GPR_ASSERT(orca_watcher_ == nullptr);
orca_watcher_.reset(static_cast<OrcaWatcher*>(watcher.release()));
state_->watchers_.insert(orca_watcher_.get());
auto* w =
static_cast<InternalSubchannelDataWatcherInterface*>(watcher.get());
if (w->type() == OrcaProducer::Type()) {
GPR_ASSERT(orca_watcher_ == nullptr);
orca_watcher_.reset(static_cast<OrcaWatcher*>(watcher.release()));
state_->orca_watchers_.insert(orca_watcher_.get());
} else if (w->type() == HealthProducer::Type()) {
// TODO(roth): Support health checking in test framework.
// For now, we just hard-code this to the raw connectivity state.
GPR_ASSERT(health_watcher_ == nullptr);
GPR_ASSERT(health_watcher_wrapper_ == nullptr);
health_watcher_.reset(static_cast<HealthWatcher*>(watcher.release()));
auto connectivity_watcher = health_watcher_->TakeWatcher();
auto* connectivity_watcher_ptr = connectivity_watcher.get();
auto watcher_wrapper = MakeOrphanable<WatcherWrapper>(
work_serializer_, std::move(connectivity_watcher));
health_watcher_wrapper_ = watcher_wrapper.get();
state_->state_tracker_.AddWatcher(GRPC_CHANNEL_SHUTDOWN,
std::move(watcher_wrapper));
gpr_log(GPR_INFO,
"AddDataWatcher(): added HealthWatch=%p "
"connectivity_watcher=%p watcher_wrapper=%p",
health_watcher_.get(), connectivity_watcher_ptr,
health_watcher_wrapper_);
}
}
void CancelDataWatcher(DataWatcherInterface* watcher) override {
void CancelDataWatcher(DataWatcherInterface* watcher) override
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*state_->work_serializer_) {
MutexLock lock(&state_->backend_metric_watcher_mu_);
if (orca_watcher_.get() != static_cast<OrcaWatcher*>(watcher)) return;
state_->watchers_.erase(orca_watcher_.get());
orca_watcher_.reset();
auto* w = static_cast<InternalSubchannelDataWatcherInterface*>(watcher);
if (w->type() == OrcaProducer::Type()) {
if (orca_watcher_.get() != static_cast<OrcaWatcher*>(watcher)) return;
state_->orca_watchers_.erase(orca_watcher_.get());
orca_watcher_.reset();
} else if (w->type() == HealthProducer::Type()) {
if (health_watcher_.get() != static_cast<HealthWatcher*>(watcher)) {
return;
}
gpr_log(GPR_INFO,
"CancelDataWatcher(): cancelling HealthWatch=%p "
"watcher_wrapper=%p",
health_watcher_.get(), health_watcher_wrapper_);
state_->state_tracker_.RemoveWatcher(health_watcher_wrapper_);
health_watcher_wrapper_ = nullptr;
health_watcher_.reset();
}
}
// Don't need this method, so it's a no-op.
@ -191,11 +264,16 @@ class LoadBalancingPolicyTest : public ::testing::Test {
std::map<SubchannelInterface::ConnectivityStateWatcherInterface*,
WatcherWrapper*>
watcher_map_;
std::unique_ptr<HealthWatcher> health_watcher_;
WatcherWrapper* health_watcher_wrapper_ = nullptr;
std::unique_ptr<OrcaWatcher> orca_watcher_;
};
explicit SubchannelState(absl::string_view address)
: address_(address), state_tracker_("LoadBalancingPolicyTest") {}
SubchannelState(absl::string_view address,
std::shared_ptr<WorkSerializer> work_serializer)
: address_(address),
work_serializer_(std::move(work_serializer)),
state_tracker_("LoadBalancingPolicyTest") {}
const std::string& address() const { return address_; }
@ -252,12 +330,16 @@ class LoadBalancingPolicyTest : public ::testing::Test {
<< "bug in test: " << ConnectivityStateName(state)
<< " must have OK status: " << status;
}
MutexLock lock(&mu_);
if (validate_state_transition) {
AssertValidConnectivityStateTransition(state_tracker_.state(), state,
location);
}
state_tracker_.SetState(state, status, "set from test");
work_serializer_->Run(
[this, state, status, validate_state_transition, location]()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_) {
if (validate_state_transition) {
AssertValidConnectivityStateTransition(state_tracker_.state(),
state, location);
}
state_tracker_.SetState(state, status, "set from test");
},
DEBUG_LOCATION);
}
// Indicates if any of the associated SubchannelInterface objects
@ -277,7 +359,7 @@ class LoadBalancingPolicyTest : public ::testing::Test {
// Sends an OOB backend metric report to all watchers.
void SendOobBackendMetricReport(const BackendMetricData& backend_metrics) {
MutexLock lock(&backend_metric_watcher_mu_);
for (const auto* watcher : watchers_) {
for (const auto* watcher : orca_watchers_) {
watcher->watcher()->OnBackendMetricReport(backend_metrics);
}
}
@ -286,7 +368,7 @@ class LoadBalancingPolicyTest : public ::testing::Test {
void CheckOobReportingPeriod(Duration expected,
SourceLocation location = SourceLocation()) {
MutexLock lock(&backend_metric_watcher_mu_);
for (const auto* watcher : watchers_) {
for (const auto* watcher : orca_watchers_) {
EXPECT_EQ(watcher->report_interval(), expected)
<< location.file() << ":" << location.line();
}
@ -294,16 +376,15 @@ class LoadBalancingPolicyTest : public ::testing::Test {
private:
const std::string address_;
Mutex mu_;
ConnectivityStateTracker state_tracker_ ABSL_GUARDED_BY(&mu_);
std::shared_ptr<WorkSerializer> work_serializer_;
ConnectivityStateTracker state_tracker_ ABSL_GUARDED_BY(*work_serializer_);
Mutex requested_connection_mu_;
bool requested_connection_ ABSL_GUARDED_BY(&requested_connection_mu_) =
false;
Mutex backend_metric_watcher_mu_;
std::set<OrcaWatcher*> watchers_
std::set<OrcaWatcher*> orca_watchers_
ABSL_GUARDED_BY(&backend_metric_watcher_mu_);
};
@ -421,7 +502,8 @@ class LoadBalancingPolicyTest : public ::testing::Test {
GPR_ASSERT(address_uri.ok());
it = test_->subchannel_pool_
.emplace(std::piecewise_construct, std::forward_as_tuple(key),
std::forward_as_tuple(std::move(*address_uri)))
std::forward_as_tuple(std::move(*address_uri),
work_serializer_))
.first;
}
return it->second.CreateSubchannel(work_serializer_);
@ -932,7 +1014,6 @@ class LoadBalancingPolicyTest : public ::testing::Test {
// Expect startup with RR with a set of addresses.
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> ExpectRoundRobinStartup(
absl::Span<const absl::string_view> addresses) {
ExpectConnectingUpdate();
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker;
for (size_t i = 0; i < addresses.size(); ++i) {
auto* subchannel = FindSubchannel(addresses[i]);
@ -940,6 +1021,7 @@ class LoadBalancingPolicyTest : public ::testing::Test {
if (subchannel == nullptr) return nullptr;
EXPECT_TRUE(subchannel->ConnectionRequested());
subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
if (i == 0) ExpectConnectingUpdate();
subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
if (i == 0) {
picker = WaitForConnected();
@ -1009,7 +1091,7 @@ class LoadBalancingPolicyTest : public ::testing::Test {
SubchannelKey key(MakeAddress(address), args);
auto it = subchannel_pool_
.emplace(std::piecewise_construct, std::forward_as_tuple(key),
std::forward_as_tuple(address))
std::forward_as_tuple(address, work_serializer_))
.first;
return &it->second;
}

@ -183,8 +183,6 @@ TEST_F(OutlierDetectionTest, Basic) {
absl::Status status = ApplyUpdate(
BuildUpdate({kAddressUri}, ConfigBuilder().Build()), lb_policy_.get());
EXPECT_TRUE(status.ok()) << status;
// LB policy should have reported CONNECTING state.
ExpectConnectingUpdate();
// LB policy should have created a subchannel for the address.
auto* subchannel = FindSubchannel(kAddressUri);
ASSERT_NE(subchannel, nullptr);
@ -193,6 +191,8 @@ TEST_F(OutlierDetectionTest, Basic) {
EXPECT_TRUE(subchannel->ConnectionRequested());
// This causes the subchannel to start to connect, so it reports CONNECTING.
subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
// LB policy should have reported CONNECTING state.
ExpectConnectingUpdate();
// When the subchannel becomes connected, it reports READY.
subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
// The LB policy will report CONNECTING some number of times (doesn't
@ -253,8 +253,7 @@ TEST_F(OutlierDetectionTest, DoesNotWorkWithPickFirst) {
.Build()),
lb_policy_.get());
EXPECT_TRUE(status.ok()) << status;
// LB policy should have created a subchannel for the first address with
// the GRPC_ARG_INHIBIT_HEALTH_CHECKING channel arg.
// LB policy should have created a subchannel for the first address.
auto* subchannel = FindSubchannel(kAddresses[0]);
ASSERT_NE(subchannel, nullptr);
// When the LB policy receives the subchannel's initial connectivity

@ -2057,7 +2057,8 @@ TEST_F(RoundRobinTest, HealthChecking) {
EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
CheckRpcSendFailure(DEBUG_LOCATION, stub, StatusCode::UNAVAILABLE,
"connections to all backends failing; last error: "
"UNAVAILABLE: backend unhealthy");
"UNAVAILABLE: (ipv6:%5B::1%5D|ipv4:127.0.0.1):[0-9]+: "
"backend unhealthy");
// Clean up.
EnableDefaultHealthCheckService(false);
}
@ -2115,7 +2116,8 @@ TEST_F(RoundRobinTest, WithHealthCheckingInhibitPerChannel) {
EXPECT_FALSE(WaitForChannelReady(channel1.get(), 1));
CheckRpcSendFailure(DEBUG_LOCATION, stub1, StatusCode::UNAVAILABLE,
"connections to all backends failing; last error: "
"UNAVAILABLE: backend unhealthy");
"UNAVAILABLE: (ipv6:%5B::1%5D|ipv4:127.0.0.1):[0-9]+: "
"backend unhealthy");
// Second channel should be READY.
EXPECT_TRUE(WaitForChannelReady(channel2.get(), 1));
CheckRpcSendOk(DEBUG_LOCATION, stub2);
@ -2160,7 +2162,8 @@ TEST_F(RoundRobinTest, HealthCheckingServiceNamePerChannel) {
EXPECT_FALSE(WaitForChannelReady(channel1.get(), 1));
CheckRpcSendFailure(DEBUG_LOCATION, stub1, StatusCode::UNAVAILABLE,
"connections to all backends failing; last error: "
"UNAVAILABLE: backend unhealthy");
"UNAVAILABLE: (ipv6:%5B::1%5D|ipv4:127.0.0.1):[0-9]+: "
"backend unhealthy");
// Second channel should be READY.
EXPECT_TRUE(WaitForChannelReady(channel2.get(), 1));
CheckRpcSendOk(DEBUG_LOCATION, stub2);
@ -2868,10 +2871,8 @@ TEST_F(ClientLbAddressTest, Basic) {
// Make sure that the attributes wind up on the subchannels.
std::vector<std::string> expected;
for (const int port : GetServersPorts()) {
expected.emplace_back(absl::StrCat(
ipv6_only_ ? "[::1]:" : "127.0.0.1:", port,
" args={grpc.internal.no_subchannel.outlier_detection_disable=1, "
"test_key=test_value}"));
expected.emplace_back(absl::StrCat(ipv6_only_ ? "[::1]:" : "127.0.0.1:",
port, " args={test_key=test_value}"));
}
EXPECT_EQ(addresses_seen(), expected);
}

Loading…
Cancel
Save