From b7e680ad4620996be56acdfda1afc9e836ffd229 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Tue, 5 Sep 2023 10:17:30 -0700 Subject: [PATCH] [health checking] move to generic health watch for dualstack design (#34222) Rolls forward part of the dualstack changes, mostly from #33427 and a little bit from #32692, both of which were reverted in #33718. Specifically: - For petiole policies, unconditionally start health watch on subchannels, even if client side health checking is not enabled; in this case, the health watch will report the subchannel's raw connectivity state. - Fix edge cases in health check reporting that occur when a watcher is started before the initial state is reported. - When client-side health checking fails, add the subchannel's address to the RPC failure status message. - Outlier detection now works only via the health checking watch, not via the raw connectivity state watch. - Remove now-unnecessary hack to ensure that outlier detection does not work for pick_first. --- src/core/BUILD | 6 +- .../lb_policy/health_check_client.cc | 47 +++++- .../lb_policy/health_check_client_internal.h | 5 +- .../outlier_detection/outlier_detection.cc | 113 ++----------- .../outlier_detection/outlier_detection.h | 7 - .../lb_policy/pick_first/pick_first.cc | 14 -- .../lb_policy/subchannel_list.h | 60 ++----- .../ext/filters/client_channel/subchannel.h | 2 + .../lb_policy/lb_policy_test_lib.h | 150 ++++++++++++++---- .../lb_policy/outlier_detection_test.cc | 7 +- test/cpp/end2end/client_lb_end2end_test.cc | 15 +- 11 files changed, 201 insertions(+), 225 deletions(-) diff --git a/src/core/BUILD b/src/core/BUILD index cb7ee5e9ff7..d996c7a01d9 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -4737,6 +4737,7 @@ grpc_cc_library( "//:grpc_trace", "//:orphanable", "//:ref_counted_ptr", + "//:sockaddr_utils", "//:work_serializer", ], ) @@ -4759,11 +4760,9 @@ grpc_cc_library( "iomgr_fwd", "lb_policy", "subchannel_interface", - "//:channel_arg_names", "//:debug_location", "//:gpr", "//:grpc_base", - "//:grpc_client_channel", "//:ref_counted_ptr", "//:server_address", "//:work_serializer", @@ -4789,7 +4788,6 @@ grpc_cc_library( language = "c++", deps = [ "channel_args", - "grpc_outlier_detection_header", "health_check_client", "iomgr_fwd", "json", @@ -4967,7 +4965,6 @@ grpc_cc_library( "time", "validation_errors", "//:gpr_platform", - "//:server_address", ], ) @@ -4994,7 +4991,6 @@ grpc_cc_library( "lb_policy", "lb_policy_factory", "lb_policy_registry", - "match", "pollset_set", "ref_counted", "subchannel_interface", diff --git a/src/core/ext/filters/client_channel/lb_policy/health_check_client.cc b/src/core/ext/filters/client_channel/lb_policy/health_check_client.cc index d434f1b5287..c33d23d9916 100644 --- a/src/core/ext/filters/client_channel/lb_policy/health_check_client.cc +++ b/src/core/ext/filters/client_channel/lb_policy/health_check_client.cc @@ -28,6 +28,7 @@ #include "absl/status/status.h" #include "absl/status/statusor.h" +#include "absl/strings/str_cat.h" #include "absl/strings/string_view.h" #include "absl/types/optional.h" #include "upb/base/string_view.h" @@ -44,6 +45,7 @@ #include "src/core/ext/filters/client_channel/lb_policy/health_check_client_internal.h" #include "src/core/ext/filters/client_channel/subchannel.h" #include "src/core/ext/filters/client_channel/subchannel_stream_client.h" +#include "src/core/lib/address_utils/sockaddr_utils.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_trace.h" #include "src/core/lib/debug/trace.h" @@ -114,7 +116,7 @@ void HealthProducer::HealthChecker::Orphan() { void HealthProducer::HealthChecker::AddWatcherLocked(HealthWatcher* watcher) { watchers_.insert(watcher); - watcher->Notify(state_, status_); + if (state_.has_value()) watcher->Notify(*state_, status_); } bool HealthProducer::HealthChecker::RemoveWatcherLocked( @@ -128,13 +130,18 @@ void HealthProducer::HealthChecker::OnConnectivityStateChangeLocked( if (state == GRPC_CHANNEL_READY) { // We should already be in CONNECTING, and we don't want to change // that until we see the initial response on the stream. - GPR_ASSERT(state_ == GRPC_CHANNEL_CONNECTING); + if (!state_.has_value()) { + state_ = GRPC_CHANNEL_CONNECTING; + status_ = absl::OkStatus(); + } else { + GPR_ASSERT(state_ == GRPC_CHANNEL_CONNECTING); + } // Start the health watch stream. StartHealthStreamLocked(); } else { state_ = state; status_ = status; - NotifyWatchersLocked(state_, status_); + NotifyWatchersLocked(*state_, status_); // We're not connected, so stop health checking. stream_client_.reset(); } @@ -177,12 +184,21 @@ void HealthProducer::HealthChecker::NotifyWatchersLocked( void HealthProducer::HealthChecker::OnHealthWatchStatusChange( grpc_connectivity_state state, const absl::Status& status) { if (state == GRPC_CHANNEL_SHUTDOWN) return; + // Prepend the subchannel's address to the status if needed. + absl::Status use_status; + if (!status.ok()) { + std::string address_str = + grpc_sockaddr_to_uri(&producer_->subchannel_->address()) + .value_or(""); + use_status = absl::Status( + status.code(), absl::StrCat(address_str, ": ", status.message())); + } work_serializer_->Schedule( - [self = Ref(), state, status]() { + [self = Ref(), state, status = std::move(use_status)]() mutable { MutexLock lock(&self->producer_->mu_); if (self->stream_client_ != nullptr) { self->state_ = state; - self->status_ = status; + self->status_ = std::move(status); for (HealthWatcher* watcher : self->watchers_) { watcher->Notify(state, self->status_); } @@ -364,7 +380,7 @@ void HealthProducer::AddWatcher( grpc_pollset_set_add_pollset_set(interested_parties_, watcher->interested_parties()); if (!health_check_service_name.has_value()) { - watcher->Notify(state_, status_); + if (state_.has_value()) watcher->Notify(*state_, status_); non_health_watchers_.insert(watcher); } else { auto it = @@ -421,6 +437,13 @@ void HealthProducer::OnConnectivityStateChange(grpc_connectivity_state state, // HealthWatcher::~HealthWatcher() { + if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) { + gpr_log(GPR_INFO, + "HealthWatcher %p: unregistering from producer %p " + "(health_check_service_name=\"%s\")", + this, producer_.get(), + health_check_service_name_.value_or("N/A").c_str()); + } if (producer_ != nullptr) { producer_->RemoveWatcher(this, health_check_service_name_); } @@ -447,6 +470,13 @@ void HealthWatcher::SetSubchannel(Subchannel* subchannel) { if (created) producer_->Start(subchannel->Ref()); // Register ourself with the producer. producer_->AddWatcher(this, health_check_service_name_); + if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) { + gpr_log(GPR_INFO, + "HealthWatcher %p: registered with producer %p (created=%d, " + "health_check_service_name=\"%s\")", + this, producer_.get(), created, + health_check_service_name_.value_or("N/A").c_str()); + } } void HealthWatcher::Notify(grpc_connectivity_state state, absl::Status status) { @@ -472,6 +502,11 @@ MakeHealthCheckWatcher( health_check_service_name = args.GetOwnedString(GRPC_ARG_HEALTH_CHECK_SERVICE_NAME); } + if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) { + gpr_log(GPR_INFO, + "creating HealthWatcher -- health_check_service_name=\"%s\"", + health_check_service_name.value_or("N/A").c_str()); + } return std::make_unique(std::move(work_serializer), std::move(health_check_service_name), std::move(watcher)); diff --git a/src/core/ext/filters/client_channel/lb_policy/health_check_client_internal.h b/src/core/ext/filters/client_channel/lb_policy/health_check_client_internal.h index d606e42ae87..eee94904ebb 100644 --- a/src/core/ext/filters/client_channel/lb_policy/health_check_client_internal.h +++ b/src/core/ext/filters/client_channel/lb_policy/health_check_client_internal.h @@ -127,7 +127,8 @@ class HealthProducer : public Subchannel::DataProducerInterface { std::shared_ptr work_serializer_ = std::make_shared(); - grpc_connectivity_state state_ ABSL_GUARDED_BY(&HealthProducer::mu_); + absl::optional state_ + ABSL_GUARDED_BY(&HealthProducer::mu_); absl::Status status_ ABSL_GUARDED_BY(&HealthProducer::mu_); OrphanablePtr stream_client_ ABSL_GUARDED_BY(&HealthProducer::mu_); @@ -143,7 +144,7 @@ class HealthProducer : public Subchannel::DataProducerInterface { grpc_pollset_set* interested_parties_; Mutex mu_; - grpc_connectivity_state state_ ABSL_GUARDED_BY(&mu_); + absl::optional state_ ABSL_GUARDED_BY(&mu_); absl::Status status_ ABSL_GUARDED_BY(&mu_); RefCountedPtr connected_subchannel_ ABSL_GUARDED_BY(&mu_); diff --git a/src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc b/src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc index 3cdf715b402..8b52dbe8108 100644 --- a/src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc +++ b/src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc @@ -50,7 +50,6 @@ #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/gprpp/debug_location.h" -#include "src/core/lib/gprpp/match.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" @@ -125,12 +124,9 @@ class OutlierDetectionLb : public LoadBalancingPolicy { class SubchannelWrapper : public DelegatingSubchannel { public: SubchannelWrapper(RefCountedPtr subchannel_state, - RefCountedPtr subchannel, - bool disable_via_raw_connectivity_watch) + RefCountedPtr subchannel) : DelegatingSubchannel(std::move(subchannel)), - subchannel_state_(std::move(subchannel_state)), - disable_via_raw_connectivity_watch_( - disable_via_raw_connectivity_watch) { + subchannel_state_(std::move(subchannel_state)) { if (subchannel_state_ != nullptr) { subchannel_state_->AddSubchannel(this); if (subchannel_state_->ejection_time().has_value()) { @@ -149,12 +145,6 @@ class OutlierDetectionLb : public LoadBalancingPolicy { void Uneject(); - void WatchConnectivityState( - std::unique_ptr watcher) override; - - void CancelConnectivityStateWatch( - ConnectivityStateWatcherInterface* watcher) override; - void AddDataWatcher(std::unique_ptr watcher) override; RefCountedPtr subchannel_state() const { @@ -162,11 +152,6 @@ class OutlierDetectionLb : public LoadBalancingPolicy { } private: - // TODO(roth): As a temporary hack, this needs to handle watchers - // stored as both unique_ptr<> and shared_ptr<>, since the former is - // used for raw connectivity state watches and the latter is used - // for health watches. This hack will go away as part of implementing - // dualstack backend support. class WatcherWrapper : public SubchannelInterface::ConnectivityStateWatcherInterface { public: @@ -176,16 +161,10 @@ class OutlierDetectionLb : public LoadBalancingPolicy { bool ejected) : watcher_(std::move(health_watcher)), ejected_(ejected) {} - WatcherWrapper(std::unique_ptr< - SubchannelInterface::ConnectivityStateWatcherInterface> - watcher, - bool ejected) - : watcher_(std::move(watcher)), ejected_(ejected) {} - void Eject() { ejected_ = true; if (last_seen_state_.has_value()) { - watcher()->OnConnectivityStateChange( + watcher_->OnConnectivityStateChange( GRPC_CHANNEL_TRANSIENT_FAILURE, absl::UnavailableError( "subchannel ejected by outlier detection")); @@ -195,8 +174,8 @@ class OutlierDetectionLb : public LoadBalancingPolicy { void Uneject() { ejected_ = false; if (last_seen_state_.has_value()) { - watcher()->OnConnectivityStateChange(*last_seen_state_, - last_seen_status_); + watcher_->OnConnectivityStateChange(*last_seen_state_, + last_seen_status_); } } @@ -211,30 +190,16 @@ class OutlierDetectionLb : public LoadBalancingPolicy { status = absl::UnavailableError( "subchannel ejected by outlier detection"); } - watcher()->OnConnectivityStateChange(new_state, status); + watcher_->OnConnectivityStateChange(new_state, status); } } grpc_pollset_set* interested_parties() override { - return watcher()->interested_parties(); + return watcher_->interested_parties(); } private: - SubchannelInterface::ConnectivityStateWatcherInterface* watcher() const { - return Match( - watcher_, - [](const std::shared_ptr< - SubchannelInterface::ConnectivityStateWatcherInterface>& - watcher) { return watcher.get(); }, - [](const std::unique_ptr< - SubchannelInterface::ConnectivityStateWatcherInterface>& - watcher) { return watcher.get(); }); - } - - absl::variant, - std::unique_ptr< - SubchannelInterface::ConnectivityStateWatcherInterface>> + std::shared_ptr watcher_; absl::optional last_seen_state_; absl::Status last_seen_status_; @@ -242,12 +207,8 @@ class OutlierDetectionLb : public LoadBalancingPolicy { }; RefCountedPtr subchannel_state_; - const bool disable_via_raw_connectivity_watch_; bool ejected_ = false; - std::map - watchers_; - WatcherWrapper* watcher_wrapper_ = nullptr; // For health watching. + WatcherWrapper* watcher_wrapper_ = nullptr; }; class SubchannelState : public RefCounted { @@ -428,50 +389,14 @@ class OutlierDetectionLb : public LoadBalancingPolicy { void OutlierDetectionLb::SubchannelWrapper::Eject() { ejected_ = true; - // Ejecting the subchannel may cause the child policy to cancel the watch, - // so we need to be prepared for the map to be modified while we are - // iterating. - for (auto it = watchers_.begin(); it != watchers_.end();) { - WatcherWrapper* watcher = it->second; - ++it; - watcher->Eject(); - } if (watcher_wrapper_ != nullptr) watcher_wrapper_->Eject(); } void OutlierDetectionLb::SubchannelWrapper::Uneject() { ejected_ = false; - for (auto& watcher : watchers_) { - watcher.second->Uneject(); - } if (watcher_wrapper_ != nullptr) watcher_wrapper_->Uneject(); } -void OutlierDetectionLb::SubchannelWrapper::WatchConnectivityState( - std::unique_ptr watcher) { - if (disable_via_raw_connectivity_watch_) { - wrapped_subchannel()->WatchConnectivityState(std::move(watcher)); - return; - } - ConnectivityStateWatcherInterface* watcher_ptr = watcher.get(); - auto watcher_wrapper = - std::make_unique(std::move(watcher), ejected_); - watchers_.emplace(watcher_ptr, watcher_wrapper.get()); - wrapped_subchannel()->WatchConnectivityState(std::move(watcher_wrapper)); -} - -void OutlierDetectionLb::SubchannelWrapper::CancelConnectivityStateWatch( - ConnectivityStateWatcherInterface* watcher) { - if (disable_via_raw_connectivity_watch_) { - wrapped_subchannel()->CancelConnectivityStateWatch(watcher); - return; - } - auto it = watchers_.find(watcher); - if (it == watchers_.end()) return; - wrapped_subchannel()->CancelConnectivityStateWatch(it->second); - watchers_.erase(it); -} - void OutlierDetectionLb::SubchannelWrapper::AddDataWatcher( std::unique_ptr watcher) { auto* w = static_cast(watcher.get()); @@ -777,22 +702,12 @@ OrphanablePtr OutlierDetectionLb::CreateChildPolicyLocked( RefCountedPtr OutlierDetectionLb::Helper::CreateSubchannel( ServerAddress address, const ChannelArgs& args) { if (parent()->shutting_down_) return nullptr; - // If the address has the DisableOutlierDetectionAttribute attribute, - // ignore it for raw connectivity state updates. - // TODO(roth): This is a hack to prevent outlier_detection from - // working with pick_first, as per discussion in - // https://github.com/grpc/grpc/issues/32967. Remove this as part of - // implementing dualstack backend support. - const bool disable_via_raw_connectivity_watch = - address.args().GetInt(GRPC_ARG_OUTLIER_DETECTION_DISABLE) == 1; RefCountedPtr subchannel_state; std::string key = MakeKeyForAddress(address); if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { gpr_log(GPR_INFO, - "[outlier_detection_lb %p] using key %s for subchannel " - "address %s, disable_via_raw_connectivity_watch=%d", - parent(), key.c_str(), address.ToString().c_str(), - disable_via_raw_connectivity_watch); + "[outlier_detection_lb %p] using key %s for subchannel address %s", + parent(), key.c_str(), address.ToString().c_str()); } if (!key.empty()) { auto it = parent()->subchannel_state_map_.find(key); @@ -801,10 +716,8 @@ RefCountedPtr OutlierDetectionLb::Helper::CreateSubchannel( } } auto subchannel = MakeRefCounted( - subchannel_state, - parent()->channel_control_helper()->CreateSubchannel(std::move(address), - args), - disable_via_raw_connectivity_watch); + subchannel_state, parent()->channel_control_helper()->CreateSubchannel( + std::move(address), args)); if (subchannel_state != nullptr) { subchannel_state->AddSubchannel(subchannel.get()); } diff --git a/src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.h b/src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.h index c8c7f52afd3..4118e99555c 100644 --- a/src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.h +++ b/src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.h @@ -28,7 +28,6 @@ #include "src/core/lib/json/json.h" #include "src/core/lib/json/json_args.h" #include "src/core/lib/json/json_object_loader.h" -#include "src/core/lib/resolver/server_address.h" namespace grpc_core { @@ -90,12 +89,6 @@ struct OutlierDetectionConfig { ValidationErrors* errors); }; -// TODO(roth): This is a horrible hack used to disable outlier detection -// when used with the pick_first policy. Remove this as part of -// implementing the dualstack backend design. -#define GRPC_ARG_OUTLIER_DETECTION_DISABLE \ - GRPC_ARG_NO_SUBCHANNEL_PREFIX "outlier_detection_disable" - } // namespace grpc_core #endif // GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_OUTLIER_DETECTION_OUTLIER_DETECTION_H diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 9b934a36819..be41867712a 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -39,7 +39,6 @@ #include #include "src/core/ext/filters/client_channel/lb_policy/health_check_client.h" -#include "src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/debug/trace.h" @@ -396,19 +395,6 @@ absl::Status PickFirst::UpdateLocked(UpdateArgs args) { absl::c_shuffle(*args.addresses, bit_gen_); } } - // TODO(roth): This is a hack to disable outlier_detection when used - // with pick_first, for the reasons described in - // https://github.com/grpc/grpc/issues/32967. Remove this when - // implementing the dualstack design. - if (args.addresses.ok()) { - ServerAddressList addresses; - for (const auto& address : *args.addresses) { - addresses.emplace_back( - address.address(), - address.args().Set(GRPC_ARG_OUTLIER_DETECTION_DISABLE, 1)); - } - args.addresses = std::move(addresses); - } // If the update contains a resolver error and we have a previous update // that was not a resolver error, keep using the previous addresses. if (!args.addresses.ok() && latest_update_args_.config != nullptr) { diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h index 4acd432d7eb..7e9b4df8648 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h @@ -30,11 +30,9 @@ #include "absl/status/status.h" #include "absl/types/optional.h" -#include #include #include -#include "src/core/ext/filters/client_channel/client_channel_internal.h" #include "src/core/ext/filters/client_channel/lb_policy/health_check_client.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gprpp/debug_location.h" @@ -171,8 +169,6 @@ class SubchannelData { // The subchannel. RefCountedPtr subchannel_; // Will be non-null when the subchannel's state is being watched. - SubchannelInterface::ConnectivityStateWatcherInterface* pending_watcher_ = - nullptr; SubchannelInterface::DataWatcherInterface* health_watcher_ = nullptr; // Data updated by the watcher. absl::optional connectivity_state_; @@ -230,8 +226,6 @@ class SubchannelList : public DualRefCounted { const char* tracer_; - absl::optional health_check_service_name_; - // The list of subchannels. // We use ManualConstructor here to support SubchannelDataType classes // that are not copyable. @@ -260,7 +254,7 @@ void SubchannelData::Watcher:: GPR_INFO, "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR " (subchannel %p): connectivity changed: old_state=%s, new_state=%s, " - "status=%s, shutting_down=%d, pending_watcher=%p, health_watcher=%p", + "status=%s, shutting_down=%d, health_watcher=%p", subchannel_list_->tracer(), subchannel_list_->policy(), subchannel_list_.get(), subchannel_data_->Index(), subchannel_list_->num_subchannels(), @@ -269,12 +263,10 @@ void SubchannelData::Watcher:: ? ConnectivityStateName(*subchannel_data_->connectivity_state_) : "N/A"), ConnectivityStateName(new_state), status.ToString().c_str(), - subchannel_list_->shutting_down(), subchannel_data_->pending_watcher_, - subchannel_data_->health_watcher_); + subchannel_list_->shutting_down(), subchannel_data_->health_watcher_); } if (!subchannel_list_->shutting_down() && - (subchannel_data_->pending_watcher_ != nullptr || - subchannel_data_->health_watcher_ != nullptr)) { + subchannel_data_->health_watcher_ != nullptr) { absl::optional old_state = subchannel_data_->connectivity_state_; subchannel_data_->connectivity_state_ = new_state; @@ -328,46 +320,26 @@ template void SubchannelData:: StartConnectivityWatchLocked(const ChannelArgs& args) { if (GPR_UNLIKELY(subchannel_list_->tracer() != nullptr)) { - gpr_log( - GPR_INFO, - "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR - " (subchannel %p): starting watch " - "(health_check_service_name=\"%s\")", - subchannel_list_->tracer(), subchannel_list_->policy(), - subchannel_list_, Index(), subchannel_list_->num_subchannels(), - subchannel_.get(), - subchannel_list()->health_check_service_name_.value_or("N/A").c_str()); + gpr_log(GPR_INFO, + "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR + " (subchannel %p): starting watch", + subchannel_list_->tracer(), subchannel_list_->policy(), + subchannel_list_, Index(), subchannel_list_->num_subchannels(), + subchannel_.get()); } - GPR_ASSERT(pending_watcher_ == nullptr); GPR_ASSERT(health_watcher_ == nullptr); auto watcher = std::make_unique( this, subchannel_list()->WeakRef(DEBUG_LOCATION, "Watcher")); - if (subchannel_list()->health_check_service_name_.has_value()) { - auto health_watcher = MakeHealthCheckWatcher( - subchannel_list_->work_serializer(), args, std::move(watcher)); - health_watcher_ = health_watcher.get(); - subchannel_->AddDataWatcher(std::move(health_watcher)); - } else { - pending_watcher_ = watcher.get(); - subchannel_->WatchConnectivityState(std::move(watcher)); - } + auto health_watcher = MakeHealthCheckWatcher( + subchannel_list_->work_serializer(), args, std::move(watcher)); + health_watcher_ = health_watcher.get(); + subchannel_->AddDataWatcher(std::move(health_watcher)); } template void SubchannelData:: CancelConnectivityWatchLocked(const char* reason) { - if (pending_watcher_ != nullptr) { - if (GPR_UNLIKELY(subchannel_list_->tracer() != nullptr)) { - gpr_log(GPR_INFO, - "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR - " (subchannel %p): canceling connectivity watch (%s)", - subchannel_list_->tracer(), subchannel_list_->policy(), - subchannel_list_, Index(), subchannel_list_->num_subchannels(), - subchannel_.get(), reason); - } - subchannel_->CancelConnectivityStateWatch(pending_watcher_); - pending_watcher_ = nullptr; - } else if (health_watcher_ != nullptr) { + if (health_watcher_ != nullptr) { if (GPR_UNLIKELY(subchannel_list_->tracer() != nullptr)) { gpr_log(GPR_INFO, "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR @@ -399,10 +371,6 @@ SubchannelList::SubchannelList( : DualRefCounted(tracer), policy_(policy), tracer_(tracer) { - if (!args.GetBool(GRPC_ARG_INHIBIT_HEALTH_CHECKING).value_or(false)) { - health_check_service_name_ = - args.GetOwnedString(GRPC_ARG_HEALTH_CHECK_SERVICE_NAME); - } if (GPR_UNLIKELY(tracer_ != nullptr)) { gpr_log(GPR_INFO, "[%s %p] Creating subchannel list %p for %" PRIuPTR " subchannels", diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index 48b8d9cf3e7..79258512647 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -221,6 +221,8 @@ class Subchannel : public DualRefCounted { channelz::SubchannelNode* channelz_node(); + const grpc_resolved_address& address() const { return key_.address(); } + // Starts watching the subchannel's connectivity state. // The first callback to the watcher will be delivered ~immediately. // Subsequent callbacks will be delivered as the subchannel's state diff --git a/test/core/client_channel/lb_policy/lb_policy_test_lib.h b/test/core/client_channel/lb_policy/lb_policy_test_lib.h index 1c6f0a5c616..b4d16ff5252 100644 --- a/test/core/client_channel/lb_policy/lb_policy_test_lib.h +++ b/test/core/client_channel/lb_policy/lb_policy_test_lib.h @@ -57,8 +57,10 @@ #include "src/core/ext/filters/client_channel/client_channel_internal.h" #include "src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h" +#include "src/core/ext/filters/client_channel/lb_policy/health_check_client_internal.h" #include "src/core/ext/filters/client_channel/lb_policy/oob_backend_metric.h" #include "src/core/ext/filters/client_channel/lb_policy/oob_backend_metric_internal.h" +#include "src/core/ext/filters/client_channel/subchannel_interface_internal.h" #include "src/core/ext/filters/client_channel/subchannel_pool_interface.h" #include "src/core/lib/address_utils/parse_address.h" #include "src/core/lib/address_utils/sockaddr_utils.h" @@ -111,7 +113,10 @@ class LoadBalancingPolicyTest : public ::testing::Test { ~FakeSubchannel() override { if (orca_watcher_ != nullptr) { MutexLock lock(&state_->backend_metric_watcher_mu_); - state_->watchers_.erase(orca_watcher_.get()); + state_->orca_watchers_.erase(orca_watcher_.get()); + } + for (const auto& p : watcher_map_) { + state_->state_tracker_.RemoveWatcher(p.second); } } @@ -121,6 +126,11 @@ class LoadBalancingPolicyTest : public ::testing::Test { // Converts between // SubchannelInterface::ConnectivityStateWatcherInterface and // ConnectivityStateWatcherInterface. + // + // We support both unique_ptr<> and shared_ptr<>, since raw + // connectivity watches use the latter but health watches use the + // former. + // TODO(roth): Clean this up. class WatcherWrapper : public AsyncConnectivityStateWatcherInterface { public: WatcherWrapper( @@ -132,33 +142,59 @@ class LoadBalancingPolicyTest : public ::testing::Test { std::move(work_serializer)), watcher_(std::move(watcher)) {} + WatcherWrapper( + std::shared_ptr work_serializer, + std::shared_ptr< + SubchannelInterface::ConnectivityStateWatcherInterface> + watcher) + : AsyncConnectivityStateWatcherInterface( + std::move(work_serializer)), + watcher_(std::move(watcher)) {} + void OnConnectivityStateChange(grpc_connectivity_state new_state, const absl::Status& status) override { - watcher_->OnConnectivityStateChange(new_state, status); + watcher()->OnConnectivityStateChange(new_state, status); } private: - std::unique_ptr + SubchannelInterface::ConnectivityStateWatcherInterface* watcher() + const { + return Match( + watcher_, + [](const std::unique_ptr< + SubchannelInterface::ConnectivityStateWatcherInterface>& + watcher) { return watcher.get(); }, + [](const std::shared_ptr< + SubchannelInterface::ConnectivityStateWatcherInterface>& + watcher) { return watcher.get(); }); + } + + absl::variant< + std::unique_ptr< + SubchannelInterface::ConnectivityStateWatcherInterface>, + std::shared_ptr< + SubchannelInterface::ConnectivityStateWatcherInterface>> watcher_; }; void WatchConnectivityState( std::unique_ptr< SubchannelInterface::ConnectivityStateWatcherInterface> - watcher) override { + watcher) override + ABSL_EXCLUSIVE_LOCKS_REQUIRED(*state_->work_serializer_) { + auto* watcher_ptr = watcher.get(); auto watcher_wrapper = MakeOrphanable( work_serializer_, std::move(watcher)); - watcher_map_[watcher.get()] = watcher_wrapper.get(); - MutexLock lock(&state_->mu_); + watcher_map_[watcher_ptr] = watcher_wrapper.get(); state_->state_tracker_.AddWatcher(GRPC_CHANNEL_SHUTDOWN, std::move(watcher_wrapper)); } void CancelConnectivityStateWatch( - ConnectivityStateWatcherInterface* watcher) override { + ConnectivityStateWatcherInterface* watcher) override + ABSL_EXCLUSIVE_LOCKS_REQUIRED(*state_->work_serializer_) { auto it = watcher_map_.find(watcher); if (it == watcher_map_.end()) return; - MutexLock lock(&state_->mu_); state_->state_tracker_.RemoveWatcher(it->second); watcher_map_.erase(it); } @@ -168,19 +204,56 @@ class LoadBalancingPolicyTest : public ::testing::Test { state_->requested_connection_ = true; } - void AddDataWatcher( - std::unique_ptr watcher) override { + void AddDataWatcher(std::unique_ptr watcher) + override ABSL_EXCLUSIVE_LOCKS_REQUIRED(*state_->work_serializer_) { MutexLock lock(&state_->backend_metric_watcher_mu_); - GPR_ASSERT(orca_watcher_ == nullptr); - orca_watcher_.reset(static_cast(watcher.release())); - state_->watchers_.insert(orca_watcher_.get()); + auto* w = + static_cast(watcher.get()); + if (w->type() == OrcaProducer::Type()) { + GPR_ASSERT(orca_watcher_ == nullptr); + orca_watcher_.reset(static_cast(watcher.release())); + state_->orca_watchers_.insert(orca_watcher_.get()); + } else if (w->type() == HealthProducer::Type()) { + // TODO(roth): Support health checking in test framework. + // For now, we just hard-code this to the raw connectivity state. + GPR_ASSERT(health_watcher_ == nullptr); + GPR_ASSERT(health_watcher_wrapper_ == nullptr); + health_watcher_.reset(static_cast(watcher.release())); + auto connectivity_watcher = health_watcher_->TakeWatcher(); + auto* connectivity_watcher_ptr = connectivity_watcher.get(); + auto watcher_wrapper = MakeOrphanable( + work_serializer_, std::move(connectivity_watcher)); + health_watcher_wrapper_ = watcher_wrapper.get(); + state_->state_tracker_.AddWatcher(GRPC_CHANNEL_SHUTDOWN, + std::move(watcher_wrapper)); + gpr_log(GPR_INFO, + "AddDataWatcher(): added HealthWatch=%p " + "connectivity_watcher=%p watcher_wrapper=%p", + health_watcher_.get(), connectivity_watcher_ptr, + health_watcher_wrapper_); + } } - void CancelDataWatcher(DataWatcherInterface* watcher) override { + void CancelDataWatcher(DataWatcherInterface* watcher) override + ABSL_EXCLUSIVE_LOCKS_REQUIRED(*state_->work_serializer_) { MutexLock lock(&state_->backend_metric_watcher_mu_); - if (orca_watcher_.get() != static_cast(watcher)) return; - state_->watchers_.erase(orca_watcher_.get()); - orca_watcher_.reset(); + auto* w = static_cast(watcher); + if (w->type() == OrcaProducer::Type()) { + if (orca_watcher_.get() != static_cast(watcher)) return; + state_->orca_watchers_.erase(orca_watcher_.get()); + orca_watcher_.reset(); + } else if (w->type() == HealthProducer::Type()) { + if (health_watcher_.get() != static_cast(watcher)) { + return; + } + gpr_log(GPR_INFO, + "CancelDataWatcher(): cancelling HealthWatch=%p " + "watcher_wrapper=%p", + health_watcher_.get(), health_watcher_wrapper_); + state_->state_tracker_.RemoveWatcher(health_watcher_wrapper_); + health_watcher_wrapper_ = nullptr; + health_watcher_.reset(); + } } // Don't need this method, so it's a no-op. @@ -191,11 +264,16 @@ class LoadBalancingPolicyTest : public ::testing::Test { std::map watcher_map_; + std::unique_ptr health_watcher_; + WatcherWrapper* health_watcher_wrapper_ = nullptr; std::unique_ptr orca_watcher_; }; - explicit SubchannelState(absl::string_view address) - : address_(address), state_tracker_("LoadBalancingPolicyTest") {} + SubchannelState(absl::string_view address, + std::shared_ptr work_serializer) + : address_(address), + work_serializer_(std::move(work_serializer)), + state_tracker_("LoadBalancingPolicyTest") {} const std::string& address() const { return address_; } @@ -252,12 +330,16 @@ class LoadBalancingPolicyTest : public ::testing::Test { << "bug in test: " << ConnectivityStateName(state) << " must have OK status: " << status; } - MutexLock lock(&mu_); - if (validate_state_transition) { - AssertValidConnectivityStateTransition(state_tracker_.state(), state, - location); - } - state_tracker_.SetState(state, status, "set from test"); + work_serializer_->Run( + [this, state, status, validate_state_transition, location]() + ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_) { + if (validate_state_transition) { + AssertValidConnectivityStateTransition(state_tracker_.state(), + state, location); + } + state_tracker_.SetState(state, status, "set from test"); + }, + DEBUG_LOCATION); } // Indicates if any of the associated SubchannelInterface objects @@ -277,7 +359,7 @@ class LoadBalancingPolicyTest : public ::testing::Test { // Sends an OOB backend metric report to all watchers. void SendOobBackendMetricReport(const BackendMetricData& backend_metrics) { MutexLock lock(&backend_metric_watcher_mu_); - for (const auto* watcher : watchers_) { + for (const auto* watcher : orca_watchers_) { watcher->watcher()->OnBackendMetricReport(backend_metrics); } } @@ -286,7 +368,7 @@ class LoadBalancingPolicyTest : public ::testing::Test { void CheckOobReportingPeriod(Duration expected, SourceLocation location = SourceLocation()) { MutexLock lock(&backend_metric_watcher_mu_); - for (const auto* watcher : watchers_) { + for (const auto* watcher : orca_watchers_) { EXPECT_EQ(watcher->report_interval(), expected) << location.file() << ":" << location.line(); } @@ -294,16 +376,15 @@ class LoadBalancingPolicyTest : public ::testing::Test { private: const std::string address_; - - Mutex mu_; - ConnectivityStateTracker state_tracker_ ABSL_GUARDED_BY(&mu_); + std::shared_ptr work_serializer_; + ConnectivityStateTracker state_tracker_ ABSL_GUARDED_BY(*work_serializer_); Mutex requested_connection_mu_; bool requested_connection_ ABSL_GUARDED_BY(&requested_connection_mu_) = false; Mutex backend_metric_watcher_mu_; - std::set watchers_ + std::set orca_watchers_ ABSL_GUARDED_BY(&backend_metric_watcher_mu_); }; @@ -421,7 +502,8 @@ class LoadBalancingPolicyTest : public ::testing::Test { GPR_ASSERT(address_uri.ok()); it = test_->subchannel_pool_ .emplace(std::piecewise_construct, std::forward_as_tuple(key), - std::forward_as_tuple(std::move(*address_uri))) + std::forward_as_tuple(std::move(*address_uri), + work_serializer_)) .first; } return it->second.CreateSubchannel(work_serializer_); @@ -932,7 +1014,6 @@ class LoadBalancingPolicyTest : public ::testing::Test { // Expect startup with RR with a set of addresses. RefCountedPtr ExpectRoundRobinStartup( absl::Span addresses) { - ExpectConnectingUpdate(); RefCountedPtr picker; for (size_t i = 0; i < addresses.size(); ++i) { auto* subchannel = FindSubchannel(addresses[i]); @@ -940,6 +1021,7 @@ class LoadBalancingPolicyTest : public ::testing::Test { if (subchannel == nullptr) return nullptr; EXPECT_TRUE(subchannel->ConnectionRequested()); subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING); + if (i == 0) ExpectConnectingUpdate(); subchannel->SetConnectivityState(GRPC_CHANNEL_READY); if (i == 0) { picker = WaitForConnected(); @@ -1009,7 +1091,7 @@ class LoadBalancingPolicyTest : public ::testing::Test { SubchannelKey key(MakeAddress(address), args); auto it = subchannel_pool_ .emplace(std::piecewise_construct, std::forward_as_tuple(key), - std::forward_as_tuple(address)) + std::forward_as_tuple(address, work_serializer_)) .first; return &it->second; } diff --git a/test/core/client_channel/lb_policy/outlier_detection_test.cc b/test/core/client_channel/lb_policy/outlier_detection_test.cc index 38b90e2f30f..ea3c0a477c2 100644 --- a/test/core/client_channel/lb_policy/outlier_detection_test.cc +++ b/test/core/client_channel/lb_policy/outlier_detection_test.cc @@ -183,8 +183,6 @@ TEST_F(OutlierDetectionTest, Basic) { absl::Status status = ApplyUpdate( BuildUpdate({kAddressUri}, ConfigBuilder().Build()), lb_policy_.get()); EXPECT_TRUE(status.ok()) << status; - // LB policy should have reported CONNECTING state. - ExpectConnectingUpdate(); // LB policy should have created a subchannel for the address. auto* subchannel = FindSubchannel(kAddressUri); ASSERT_NE(subchannel, nullptr); @@ -193,6 +191,8 @@ TEST_F(OutlierDetectionTest, Basic) { EXPECT_TRUE(subchannel->ConnectionRequested()); // This causes the subchannel to start to connect, so it reports CONNECTING. subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING); + // LB policy should have reported CONNECTING state. + ExpectConnectingUpdate(); // When the subchannel becomes connected, it reports READY. subchannel->SetConnectivityState(GRPC_CHANNEL_READY); // The LB policy will report CONNECTING some number of times (doesn't @@ -253,8 +253,7 @@ TEST_F(OutlierDetectionTest, DoesNotWorkWithPickFirst) { .Build()), lb_policy_.get()); EXPECT_TRUE(status.ok()) << status; - // LB policy should have created a subchannel for the first address with - // the GRPC_ARG_INHIBIT_HEALTH_CHECKING channel arg. + // LB policy should have created a subchannel for the first address. auto* subchannel = FindSubchannel(kAddresses[0]); ASSERT_NE(subchannel, nullptr); // When the LB policy receives the subchannel's initial connectivity diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc index 159572bb863..3dd450b673c 100644 --- a/test/cpp/end2end/client_lb_end2end_test.cc +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -2057,7 +2057,8 @@ TEST_F(RoundRobinTest, HealthChecking) { EXPECT_TRUE(WaitForChannelNotReady(channel.get())); CheckRpcSendFailure(DEBUG_LOCATION, stub, StatusCode::UNAVAILABLE, "connections to all backends failing; last error: " - "UNAVAILABLE: backend unhealthy"); + "UNAVAILABLE: (ipv6:%5B::1%5D|ipv4:127.0.0.1):[0-9]+: " + "backend unhealthy"); // Clean up. EnableDefaultHealthCheckService(false); } @@ -2115,7 +2116,8 @@ TEST_F(RoundRobinTest, WithHealthCheckingInhibitPerChannel) { EXPECT_FALSE(WaitForChannelReady(channel1.get(), 1)); CheckRpcSendFailure(DEBUG_LOCATION, stub1, StatusCode::UNAVAILABLE, "connections to all backends failing; last error: " - "UNAVAILABLE: backend unhealthy"); + "UNAVAILABLE: (ipv6:%5B::1%5D|ipv4:127.0.0.1):[0-9]+: " + "backend unhealthy"); // Second channel should be READY. EXPECT_TRUE(WaitForChannelReady(channel2.get(), 1)); CheckRpcSendOk(DEBUG_LOCATION, stub2); @@ -2160,7 +2162,8 @@ TEST_F(RoundRobinTest, HealthCheckingServiceNamePerChannel) { EXPECT_FALSE(WaitForChannelReady(channel1.get(), 1)); CheckRpcSendFailure(DEBUG_LOCATION, stub1, StatusCode::UNAVAILABLE, "connections to all backends failing; last error: " - "UNAVAILABLE: backend unhealthy"); + "UNAVAILABLE: (ipv6:%5B::1%5D|ipv4:127.0.0.1):[0-9]+: " + "backend unhealthy"); // Second channel should be READY. EXPECT_TRUE(WaitForChannelReady(channel2.get(), 1)); CheckRpcSendOk(DEBUG_LOCATION, stub2); @@ -2868,10 +2871,8 @@ TEST_F(ClientLbAddressTest, Basic) { // Make sure that the attributes wind up on the subchannels. std::vector expected; for (const int port : GetServersPorts()) { - expected.emplace_back(absl::StrCat( - ipv6_only_ ? "[::1]:" : "127.0.0.1:", port, - " args={grpc.internal.no_subchannel.outlier_detection_disable=1, " - "test_key=test_value}")); + expected.emplace_back(absl::StrCat(ipv6_only_ ? "[::1]:" : "127.0.0.1:", + port, " args={test_key=test_value}")); } EXPECT_EQ(addresses_seen(), expected); }