From 5222eafdc1aad755cf4cbadd6396d26c6609261a Mon Sep 17 00:00:00 2001 From: AJ Heller Date: Tue, 24 Jan 2023 12:03:49 -0800 Subject: [PATCH] Revert "xds_override_host LB: add support for draining state (#31985)" (#32189) This reverts commit e8e9514a11fb593dd606f1cd795a59efda93e16c. --- src/core/BUILD | 2 - .../lb_policy/xds/xds_override_host.cc | 367 +++++------------- .../lb_policy/xds/xds_override_host.h | 7 - src/core/ext/xds/xds_health_status.h | 27 -- .../lb_policy/lb_policy_test_lib.h | 4 - ...xds_override_host_lb_config_parser_test.cc | 104 +---- .../lb_policy/xds_override_host_test.cc | 213 +--------- 7 files changed, 113 insertions(+), 611 deletions(-) diff --git a/src/core/BUILD b/src/core/BUILD index 5d07a63d847..0eae2e9edc1 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -4593,7 +4593,6 @@ grpc_cc_library( "closure", "error", "grpc_stateful_session_filter", - "grpc_xds_client", "iomgr_fwd", "json", "json_args", @@ -4601,7 +4600,6 @@ grpc_cc_library( "lb_policy", "lb_policy_factory", "lb_policy_registry", - "match", "pollset_set", "subchannel_interface", "validation_errors", diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc index c3b5f09293a..09bf0998afa 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc @@ -18,23 +18,18 @@ #include "src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.h" -#include - -#include #include #include #include #include -#include #include -#include +#include #include #include #include "absl/base/thread_annotations.h" #include "absl/status/status.h" #include "absl/status/statusor.h" -#include "absl/strings/str_cat.h" #include "absl/strings/string_view.h" #include "absl/types/optional.h" #include "absl/types/variant.h" @@ -46,13 +41,11 @@ #include "src/core/ext/filters/client_channel/lb_call_state_internal.h" #include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h" #include "src/core/ext/filters/stateful_session/stateful_session_filter.h" -#include "src/core/ext/xds/xds_health_status.h" #include "src/core/lib/address_utils/sockaddr_utils.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/gprpp/debug_location.h" -#include "src/core/lib/gprpp/match.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/sync.h" @@ -77,31 +70,6 @@ namespace grpc_core { TraceFlag grpc_lb_xds_override_host_trace(false, "xds_override_host_lb"); namespace { -template -struct PtrLessThan { - using is_transparent = void; - - bool operator()(const std::unique_ptr& v1, - const std::unique_ptr& v2) const { - return v1 < v2; - } - bool operator()(const Value* v1, const Value* v2) const { return v1 < v2; } - bool operator()(const Value* v1, const std::unique_ptr& v2) const { - return v1 < v2.get(); - } - bool operator()(const std::unique_ptr& v1, const Value* v2) const { - return v1.get() < v2; - } -}; - -XdsHealthStatus GetAddressHealthStatus(const ServerAddress& address) { - auto attribute = address.GetAttribute(XdsEndpointHealthStatusAttribute::kKey); - if (attribute == nullptr) { - return XdsHealthStatus(XdsHealthStatus::HealthStatus::kUnknown); - } - return static_cast(attribute) - ->status(); -} // // xds_override_host LB policy @@ -123,7 +91,7 @@ class XdsOverrideHostLb : public LoadBalancingPolicy { public: SubchannelWrapper(RefCountedPtr subchannel, RefCountedPtr policy, - absl::string_view key); + absl::optional key); ~SubchannelWrapper() override; @@ -142,32 +110,31 @@ class XdsOverrideHostLb : public LoadBalancingPolicy { private: class ConnectivityStateWatcher : public ConnectivityStateWatcherInterface { public: - explicit ConnectivityStateWatcher( - WeakRefCountedPtr subchannel) - : subchannel_(std::move(subchannel)) {} + ConnectivityStateWatcher( + std::unique_ptr delegate, + RefCountedPtr subchannel) + : delegate_(std::move(delegate)), subchannel_(subchannel) {} void OnConnectivityStateChange(grpc_connectivity_state state, - absl::Status status) override; + absl::Status status) override { + delegate_->OnConnectivityStateChange(state, status); + subchannel_->connectivity_state_ = state; + } - grpc_pollset_set* interested_parties() override; + grpc_pollset_set* interested_parties() override { + return delegate_->interested_parties(); + } private: - WeakRefCountedPtr subchannel_; + std::unique_ptr delegate_; + RefCountedPtr subchannel_; }; - void Orphan() override; - - void UpdateConnectivityState(grpc_connectivity_state state, - absl::Status status); - - ConnectivityStateWatcher* watcher_; - absl::optional key_; + const absl::optional key_; RefCountedPtr policy_; - std::set, - PtrLessThan> + std::atomic connectivity_state_{GRPC_CHANNEL_IDLE}; + std::map watchers_; - std::atomic connectivity_state_ = { - GRPC_CHANNEL_IDLE}; }; // A picker that wraps the picker from the child for cases when cookie is @@ -175,8 +142,7 @@ class XdsOverrideHostLb : public LoadBalancingPolicy { class Picker : public SubchannelPicker { public: Picker(RefCountedPtr xds_override_host_lb, - RefCountedPtr picker, - XdsHealthStatusSet override_host_health_status_set); + RefCountedPtr picker); PickResult Pick(PickArgs args) override; @@ -193,9 +159,11 @@ class XdsOverrideHostLb : public LoadBalancingPolicy { } private: + XdsOverrideHostLb* policy() { return subchannel_->policy(); } + static void RunInExecCtx(void* arg, grpc_error_handle /*error*/) { auto* self = static_cast(arg); - self->subchannel_->policy()->work_serializer()->Run( + self->policy()->work_serializer()->Run( [self]() { self->subchannel_->RequestConnection(); delete self; @@ -212,7 +180,6 @@ class XdsOverrideHostLb : public LoadBalancingPolicy { RefCountedPtr policy_; RefCountedPtr picker_; - XdsHealthStatusSet override_host_health_status_set_; }; class Helper : public ChannelControlHelper { @@ -240,49 +207,25 @@ class XdsOverrideHostLb : public LoadBalancingPolicy { class SubchannelEntry { public: - explicit SubchannelEntry(XdsHealthStatus eds_health_status) - : eds_health_status_(eds_health_status) {} - void SetSubchannel(SubchannelWrapper* subchannel) { - if (eds_health_status_.status() == XdsHealthStatus::kDraining) { - subchannel_ = subchannel->Ref(); - } else { - subchannel_ = subchannel; - } + subchannel_ = subchannel; } - void UnsetSubchannel() { subchannel_ = nullptr; } - - SubchannelWrapper* GetSubchannel() const { - return Match( - subchannel_, - [](XdsOverrideHostLb::SubchannelWrapper* subchannel) { - return subchannel; - }, - [](RefCountedPtr subchannel) { - return subchannel.get(); - }); + void ResetSubchannel(SubchannelWrapper* expected) { + if (subchannel_ == expected) { + subchannel_ = nullptr; + } } - void SetEdsHealthStatus(XdsHealthStatus eds_health_status) { - eds_health_status_ = eds_health_status; - auto subchannel = GetSubchannel(); - if (subchannel == nullptr) { - return; - } - if (eds_health_status_.status() == XdsHealthStatus::kDraining) { - subchannel_ = subchannel->Ref(); - } else { - subchannel_ = subchannel; + RefCountedPtr GetSubchannel() { + if (subchannel_ == nullptr) { + return nullptr; } + return subchannel_->Ref(); } - XdsHealthStatus eds_health_status() const { return eds_health_status_; } - private: - absl::variant> - subchannel_; - XdsHealthStatus eds_health_status_; + SubchannelWrapper* subchannel_ = nullptr; }; ~XdsOverrideHostLb() override; @@ -294,21 +237,17 @@ class XdsOverrideHostLb : public LoadBalancingPolicy { void MaybeUpdatePickerLocked(); - absl::StatusOr UpdateAddressMap( - absl::StatusOr addresses); + RefCountedPtr LookupSubchannel(absl::string_view address); + + void UpdateAddressMap(const absl::StatusOr& addresses); RefCountedPtr AdoptSubchannel( ServerAddress address, RefCountedPtr subchannel); - void UnsetSubchannel(absl::string_view key, SubchannelWrapper* subchannel); + void ResetSubchannel(absl::string_view key, SubchannelWrapper* subchannel); RefCountedPtr GetSubchannelByAddress( - absl::string_view address, XdsHealthStatusSet overriden_health_statuses); - - void OnSubchannelConnectivityStateChange(absl::string_view subchannel_key) - ABSL_NO_THREAD_SAFETY_ANALYSIS; // Called from within the worker - // serializer and does not require - // additional synchronization + absl::string_view address); // Current config from the resolver. RefCountedPtr config_; @@ -333,11 +272,8 @@ class XdsOverrideHostLb : public LoadBalancingPolicy { XdsOverrideHostLb::Picker::Picker( RefCountedPtr xds_override_host_lb, - RefCountedPtr picker, - XdsHealthStatusSet override_host_health_status_set) - : policy_(std::move(xds_override_host_lb)), - picker_(std::move(picker)), - override_host_health_status_set_(override_host_health_status_set) { + RefCountedPtr picker) + : policy_(std::move(xds_override_host_lb)), picker_(std::move(picker)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) { gpr_log(GPR_INFO, "[xds_override_host_lb %p] constructed new picker %p", policy_.get(), this); @@ -349,8 +285,7 @@ XdsOverrideHostLb::Picker::PickOverridenHost(absl::string_view override_host) { if (override_host.length() == 0) { return absl::nullopt; } - auto subchannel = policy_->GetSubchannelByAddress( - override_host, override_host_health_status_set_); + auto subchannel = policy_->GetSubchannelByAddress(override_host); if (subchannel == nullptr) { return absl::nullopt; } @@ -413,10 +348,6 @@ void XdsOverrideHostLb::ShutdownLocked() { gpr_log(GPR_INFO, "[xds_override_host_lb %p] shutting down", this); } shutting_down_ = true; - { - MutexLock lock(&subchannel_map_mu_); - subchannel_map_.clear(); - } // Remove the child policy's interested_parties pollset_set from the // xDS policy. if (child_policy_ != nullptr) { @@ -451,9 +382,10 @@ absl::Status XdsOverrideHostLb::UpdateLocked(UpdateArgs args) { if (child_policy_ == nullptr) { child_policy_ = CreateChildPolicyLocked(args.args); } + UpdateAddressMap(args.addresses); // Update child policy. UpdateArgs update_args; - update_args.addresses = UpdateAddressMap(std::move(args.addresses)); + update_args.addresses = std::move(args.addresses); update_args.resolution_note = std::move(args.resolution_note); update_args.config = config_->child_config(); update_args.args = std::move(args.args); @@ -467,8 +399,7 @@ absl::Status XdsOverrideHostLb::UpdateLocked(UpdateArgs args) { void XdsOverrideHostLb::MaybeUpdatePickerLocked() { if (picker_ != nullptr) { - auto xds_override_host_picker = MakeRefCounted( - Ref(), picker_, config_->override_host_status_set()); + auto xds_override_host_picker = MakeRefCounted(Ref(), picker_); if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) { gpr_log(GPR_INFO, "[xds_override_host_lb %p] updating connectivity: state=%s " @@ -504,100 +435,71 @@ OrphanablePtr XdsOverrideHostLb::CreateChildPolicyLocked( return lb_policy; } -absl::StatusOr XdsOverrideHostLb::UpdateAddressMap( - absl::StatusOr addresses) { - if (!addresses.ok()) { - return addresses; - } - ServerAddressList return_value; - std::map addresses_for_map; - for (const auto& address : *addresses) { - XdsHealthStatus status = GetAddressHealthStatus(address); - if (status.status() != XdsHealthStatus::kDraining) { - return_value.push_back(address); - } else if (!config_->override_host_status_set().Contains(status)) { - // Skip draining hosts if not in the override status set. - continue; - } - auto key = grpc_sockaddr_to_string(&address.address(), false); - if (key.ok()) { - addresses_for_map.emplace(std::move(*key), status); +void XdsOverrideHostLb::UpdateAddressMap( + const absl::StatusOr& addresses) { + std::unordered_set keys(addresses->size()); + if (addresses.ok()) { + for (const auto& address : *addresses) { + auto key = grpc_sockaddr_to_string(&address.address(), false); + if (key.ok()) { + keys.insert(std::move(*key)); + } } } - { - MutexLock lock(&subchannel_map_mu_); - for (auto it = subchannel_map_.begin(); it != subchannel_map_.end();) { - if (addresses_for_map.find(it->first) == addresses_for_map.end()) { - it = subchannel_map_.erase(it); - } else { - ++it; - } + MutexLock lock(&subchannel_map_mu_); + for (auto it = subchannel_map_.begin(); it != subchannel_map_.end();) { + if (keys.find(it->first) == keys.end()) { + it = subchannel_map_.erase(it); + } else { + ++it; } - for (const auto& key_status : addresses_for_map) { - auto it = subchannel_map_.find(key_status.first); - if (it == subchannel_map_.end()) { - subchannel_map_.emplace(std::piecewise_construct, - std::forward_as_tuple(key_status.first), - std::forward_as_tuple(key_status.second)); - } else { - it->second.SetEdsHealthStatus(key_status.second); - } + } + for (const auto& key : keys) { + if (subchannel_map_.find(key) == subchannel_map_.end()) { + subchannel_map_.emplace(key, SubchannelEntry()); } } - return return_value; } RefCountedPtr XdsOverrideHostLb::AdoptSubchannel( ServerAddress address, RefCountedPtr subchannel) { - auto key = grpc_sockaddr_to_string(&address.address(), false); - if (!key.ok()) { - return subchannel; + auto subchannel_key = grpc_sockaddr_to_string(&address.address(), false); + absl::optional key; + if (subchannel_key.ok()) { + key = std::move(*subchannel_key); } auto wrapper = - MakeRefCounted(std::move(subchannel), Ref(), *key); - MutexLock lock(&subchannel_map_mu_); - auto it = subchannel_map_.find(*key); - if (it != subchannel_map_.end()) { - it->second.SetSubchannel(wrapper.get()); + MakeRefCounted(std::move(subchannel), Ref(), key); + if (key.has_value()) { + MutexLock lock(&subchannel_map_mu_); + auto it = subchannel_map_.find(*key); + if (it != subchannel_map_.end()) { + it->second.SetSubchannel(wrapper.get()); + } } return wrapper; } -void XdsOverrideHostLb::UnsetSubchannel(absl::string_view key, +void XdsOverrideHostLb::ResetSubchannel(absl::string_view key, SubchannelWrapper* subchannel) { MutexLock lock(&subchannel_map_mu_); auto it = subchannel_map_.find(key); if (it != subchannel_map_.end()) { - if (subchannel == it->second.GetSubchannel()) { - it->second.UnsetSubchannel(); - } + it->second.ResetSubchannel(subchannel); } } RefCountedPtr -XdsOverrideHostLb::GetSubchannelByAddress( - absl::string_view address, XdsHealthStatusSet overriden_health_statuses) { +XdsOverrideHostLb::GetSubchannelByAddress(absl::string_view address) { MutexLock lock(&subchannel_map_mu_); auto it = subchannel_map_.find(address); - if (it != subchannel_map_.end() && - overriden_health_statuses.Contains(it->second.eds_health_status())) { - return it->second.GetSubchannel()->Ref(); + if (it != subchannel_map_.end()) { + return it->second.GetSubchannel(); } return nullptr; } -void XdsOverrideHostLb::OnSubchannelConnectivityStateChange( - absl::string_view subchannel_key) { - auto it = subchannel_map_.find(subchannel_key); - if (it == subchannel_map_.end()) { - return; - } - if (it->second.eds_health_status().status() == XdsHealthStatus::kDraining) { - MaybeUpdatePickerLocked(); - } -} - // // XdsOverrideHostLb::Helper // @@ -649,70 +551,37 @@ void XdsOverrideHostLb::Helper::AddTraceEvent(TraceSeverity severity, XdsOverrideHostLb::SubchannelWrapper::SubchannelWrapper( RefCountedPtr subchannel, - RefCountedPtr policy, absl::string_view key) + RefCountedPtr policy, + absl::optional key) : DelegatingSubchannel(std::move(subchannel)), - key_(key), - policy_(std::move(policy)) { - auto watcher = std::make_unique(WeakRef()); - watcher_ = watcher.get(); - wrapped_subchannel()->WatchConnectivityState(std::move(watcher)); -} + key_(std::move(key)), + policy_(std::move(policy)) {} XdsOverrideHostLb::SubchannelWrapper::~SubchannelWrapper() { if (key_.has_value()) { - policy_->UnsetSubchannel(*key_, this); + policy_->ResetSubchannel(*key_, this); } } void XdsOverrideHostLb::SubchannelWrapper::WatchConnectivityState( std::unique_ptr watcher) { - watchers_.insert(std::move(watcher)); + auto watcher_id = watcher.get(); + auto wrapper = + std::make_unique(std::move(watcher), Ref()); + watchers_.emplace(watcher_id, wrapper.get()); + wrapped_subchannel()->WatchConnectivityState(std::move(wrapper)); } void XdsOverrideHostLb::SubchannelWrapper::CancelConnectivityStateWatch( ConnectivityStateWatcherInterface* watcher) { - auto it = watchers_.find(watcher); - if (it != watchers_.end()) { - watchers_.erase(it); - } -} - -void XdsOverrideHostLb::SubchannelWrapper::UpdateConnectivityState( - grpc_connectivity_state state, absl::Status status) { - connectivity_state_.store(state); - // Sending connectivity state notifications to the watchers may cause the set - // of watchers to change, so we can't be iterating over the set of watchers - // while we send the notifications - std::vector watchers(watchers_.size()); - for (const auto& watcher : watchers_) { - watchers.push_back(watcher.get()); - } - for (const auto& watcher : watchers) { - if (watchers_.find(watcher) != watchers_.end()) { - watcher->OnConnectivityStateChange(state, status); - } - } - if (key_.has_value()) { - policy_->OnSubchannelConnectivityStateChange(*key_); + auto original_watcher = watchers_.find(watcher); + if (original_watcher != watchers_.end()) { + wrapped_subchannel()->CancelConnectivityStateWatch( + original_watcher->second); + watchers_.erase(original_watcher); } } -void XdsOverrideHostLb::SubchannelWrapper::Orphan() { - key_.reset(); - wrapped_subchannel()->CancelConnectivityStateWatch(watcher_); -} - -grpc_pollset_set* XdsOverrideHostLb::SubchannelWrapper:: - ConnectivityStateWatcher::interested_parties() { - return subchannel_->policy_->interested_parties(); -} - -void XdsOverrideHostLb::SubchannelWrapper::ConnectivityStateWatcher:: - OnConnectivityStateChange(grpc_connectivity_state state, - absl::Status status) { - subchannel_->UpdateConnectivityState(state, status); -} - // // factory // @@ -761,46 +630,20 @@ const JsonLoaderInterface* XdsOverrideHostLbConfig::JsonLoader( return kJsonLoader; } -void XdsOverrideHostLbConfig::JsonPostLoad(const Json& json, - const JsonArgs& args, +void XdsOverrideHostLbConfig::JsonPostLoad(const Json& json, const JsonArgs&, ValidationErrors* errors) { - { - ValidationErrors::ScopedField field(errors, ".childPolicy"); - auto it = json.object_value().find("childPolicy"); - if (it == json.object_value().end()) { - errors->AddError("field not present"); - } else { - auto child_policy_config = CoreConfiguration::Get() - .lb_policy_registry() - .ParseLoadBalancingConfig(it->second); - if (!child_policy_config.ok()) { - errors->AddError(child_policy_config.status().message()); - } else { - child_config_ = std::move(*child_policy_config); - } - } - } - { - ValidationErrors::ScopedField field(errors, ".overrideHostStatus"); - auto host_status_list = LoadJsonObjectField>( - json.object_value(), args, "overrideHostStatus", errors, - /*required=*/false); - if (host_status_list.has_value()) { - for (size_t i = 0; i < host_status_list->size(); ++i) { - const std::string& host_status = (*host_status_list)[i]; - auto status = XdsHealthStatus::FromString(host_status); - if (!status.has_value()) { - ValidationErrors::ScopedField field(errors, - absl::StrCat("[", i, "]")); - errors->AddError("invalid host status"); - } else { - override_host_status_set_.Add(*status); - } - } + ValidationErrors::ScopedField field(errors, ".childPolicy"); + auto it = json.object_value().find("childPolicy"); + if (it == json.object_value().end()) { + errors->AddError("field not present"); + } else { + auto child_policy_config = + CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig( + it->second); + if (!child_policy_config.ok()) { + errors->AddError(child_policy_config.status().message()); } else { - override_host_status_set_ = XdsHealthStatusSet( - {XdsHealthStatus(XdsHealthStatus::HealthStatus::kHealthy), - XdsHealthStatus(XdsHealthStatus::HealthStatus::kUnknown)}); + child_config_ = std::move(*child_policy_config); } } } diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.h b/src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.h index 5e0d6dbb773..690bf666e7a 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.h +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.h @@ -21,7 +21,6 @@ #include "absl/strings/string_view.h" -#include "src/core/ext/xds/xds_health_status.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/validation_errors.h" #include "src/core/lib/json/json.h" @@ -50,18 +49,12 @@ class XdsOverrideHostLbConfig : public LoadBalancingPolicy::Config { return child_config_; } - XdsHealthStatusSet override_host_status_set() const { - return override_host_status_set_; - } - static const JsonLoaderInterface* JsonLoader(const JsonArgs&); void JsonPostLoad(const Json& json, const JsonArgs&, ValidationErrors* errors); private: RefCountedPtr child_config_; - XdsHealthStatusSet override_host_status_set_; }; - } // namespace grpc_core #endif // GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_XDS_OVERRIDE_HOST_H diff --git a/src/core/ext/xds/xds_health_status.h b/src/core/ext/xds/xds_health_status.h index c9c27315953..43802579cac 100644 --- a/src/core/ext/xds/xds_health_status.h +++ b/src/core/ext/xds/xds_health_status.h @@ -26,7 +26,6 @@ #include "absl/strings/string_view.h" #include "absl/types/optional.h" -#include "absl/types/span.h" #include "src/core/lib/resolver/server_address.h" @@ -54,32 +53,6 @@ class XdsHealthStatus { HealthStatus status_; }; -class XdsHealthStatusSet { - public: - XdsHealthStatusSet() = default; - - explicit XdsHealthStatusSet(absl::Span statuses) { - for (XdsHealthStatus status : statuses) { - Add(status); - } - } - - bool operator==(const XdsHealthStatusSet& other) const { - return status_mask_ == other.status_mask_; - } - - void Clear() { status_mask_ = 0; } - - void Add(XdsHealthStatus status) { status_mask_ |= (0x1 << status.status()); } - - bool Contains(XdsHealthStatus status) const { - return status_mask_ & (0x1 << status.status()); - } - - private: - int status_mask_ = 0; -}; - bool operator<(const XdsHealthStatus& hs1, const XdsHealthStatus& hs2); class XdsEndpointHealthStatusAttribute diff --git a/test/core/client_channel/lb_policy/lb_policy_test_lib.h b/test/core/client_channel/lb_policy/lb_policy_test_lib.h index cff0a04d27d..d48fbdb01e2 100644 --- a/test/core/client_channel/lb_policy/lb_policy_test_lib.h +++ b/test/core/client_channel/lb_policy/lb_policy_test_lib.h @@ -790,10 +790,6 @@ class LoadBalancingPolicyTest : public ::testing::Test { std::unique_ptr* subchannel_call_tracker = nullptr, SourceLocation location = SourceLocation()) { - EXPECT_NE(picker, nullptr); - if (picker == nullptr) { - return absl::nullopt; - } auto pick_result = DoPick(picker, call_attributes); auto* complete = absl::get_if( &pick_result.result); diff --git a/test/core/client_channel/lb_policy/xds_override_host_lb_config_parser_test.cc b/test/core/client_channel/lb_policy/xds_override_host_lb_config_parser_test.cc index e4fd323c575..e3f7cbe19b3 100644 --- a/test/core/client_channel/lb_policy/xds_override_host_lb_config_parser_test.cc +++ b/test/core/client_channel/lb_policy/xds_override_host_lb_config_parser_test.cc @@ -22,7 +22,6 @@ #include "src/core/ext/filters/client_channel/client_channel_service_config.h" #include "src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.h" -#include "src/core/ext/xds/xds_health_status.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/service_config/service_config.h" @@ -43,16 +42,13 @@ TEST(XdsOverrideHostConfigParsingTest, ValidConfig) { " \"xds_override_host_experimental\":{\n" " \"childPolicy\":[\n" " {\"grpclb\":{}}\n" - " ],\n" - " \"overrideHostStatus\": [\n" - " \"DRAINING\", \"HEALTHY\", \"UNKNOWN\"" - " ]" + " ]\n" " }\n" " }]\n" "}\n"; auto service_config = ServiceConfigImpl::Create(ChannelArgs(), service_config_json); - ASSERT_TRUE(service_config.ok()) << service_config.status(); + ASSERT_TRUE(service_config.ok()); EXPECT_NE(*service_config, nullptr); auto global_config = static_cast( (*service_config) @@ -64,12 +60,6 @@ TEST(XdsOverrideHostConfigParsingTest, ValidConfig) { ASSERT_EQ(lb_config->name(), XdsOverrideHostLbConfig::Name()); auto override_host_lb_config = static_cast>(lb_config); - EXPECT_EQ(override_host_lb_config->override_host_status_set(), - XdsHealthStatusSet({ - XdsHealthStatus(XdsHealthStatus::HealthStatus::kDraining), - XdsHealthStatus(XdsHealthStatus::HealthStatus::kHealthy), - XdsHealthStatus(XdsHealthStatus::HealthStatus::kUnknown), - })); ASSERT_NE(override_host_lb_config->child_config(), nullptr); ASSERT_EQ(override_host_lb_config->child_config()->name(), "grpclb"); } @@ -103,72 +93,6 @@ TEST(XdsOverrideHostConfigParsingTest, ValidConfigWithRR) { ASSERT_EQ(override_host_lb_config->child_config()->name(), "round_robin"); } -TEST(XdsOverrideHostConfigParsingTest, ValidConfigNoDraining) { - const char* service_config_json = - "{\n" - " \"loadBalancingConfig\":[{\n" - " \"xds_override_host_experimental\":{\n" - " \"childPolicy\":[\n" - " {\"grpclb\":{}}\n" - " ],\n" - " \"overrideHostStatus\": [\n" - " \"HEALTHY\", \"UNKNOWN\"" - " ]" - " }\n" - " }]\n" - "}\n"; - auto service_config = - ServiceConfigImpl::Create(ChannelArgs(), service_config_json); - ASSERT_TRUE(service_config.ok()); - EXPECT_NE(*service_config, nullptr); - auto global_config = static_cast( - (*service_config) - ->GetGlobalParsedConfig( - ClientChannelServiceConfigParser::ParserIndex())); - ASSERT_NE(global_config, nullptr); - auto lb_config = global_config->parsed_lb_config(); - ASSERT_NE(lb_config, nullptr); - ASSERT_EQ(lb_config->name(), XdsOverrideHostLbConfig::Name()); - auto override_host_lb_config = - static_cast>(lb_config); - EXPECT_EQ(override_host_lb_config->override_host_status_set(), - XdsHealthStatusSet( - {XdsHealthStatus(XdsHealthStatus::HealthStatus::kHealthy), - XdsHealthStatus(XdsHealthStatus::HealthStatus::kUnknown)})); - ASSERT_NE(override_host_lb_config->child_config(), nullptr); - ASSERT_EQ(override_host_lb_config->child_config()->name(), "grpclb"); -} - -TEST(XdsOverrideHostConfigParsingTest, ValidConfigNoOverrideHostStatuses) { - const char* service_config_json = - "{\n" - " \"loadBalancingConfig\":[{\n" - " \"xds_override_host_experimental\":{\n" - " \"childPolicy\":[\n" - " {\"grpclb\":{}}\n" - " ]" - " }\n" - " }]\n" - "}\n"; - auto service_config = - ServiceConfigImpl::Create(ChannelArgs(), service_config_json); - ASSERT_TRUE(service_config.ok()); - EXPECT_NE(*service_config, nullptr); - auto global_config = static_cast( - (*service_config)->GetGlobalParsedConfig(0)); - ASSERT_NE(global_config, nullptr); - auto lb_config = global_config->parsed_lb_config(); - ASSERT_NE(lb_config, nullptr); - auto override_host_lb_config = - static_cast>(lb_config); - EXPECT_EQ(override_host_lb_config->override_host_status_set(), - XdsHealthStatusSet( - {XdsHealthStatus(XdsHealthStatus::HealthStatus::kHealthy), - XdsHealthStatus(XdsHealthStatus::HealthStatus::kUnknown)})); - ASSERT_NE(override_host_lb_config->child_config(), nullptr); - EXPECT_EQ(override_host_lb_config->child_config()->name(), "grpclb"); -} - TEST(XdsOverrideHostConfigParsingTest, ReportsMissingChildPolicyField) { const char* service_config_json = "{\n" @@ -227,30 +151,6 @@ TEST(XdsOverrideHostConfigParsingTest, ReportsEmptyChildPolicyArray) { "error:errors validating xds_override_host LB policy config: " "[field:childPolicy error:No known policies in list: ]]")); } - -TEST(XdsOverrideHostConfigParsingTest, UnrecognizedHostStatus) { - const char* service_config_json = - "{\n" - " \"loadBalancingConfig\":[{\n" - " \"xds_override_host_experimental\":{\n" - " \"childPolicy\":[\n" - " {\"grpclb\":{}}\n" - " ],\n" - " \"overrideHostStatus\": [\n" - " \"NOTASTATUS\"" - " ]" - " }\n" - " }]\n" - "}\n"; - auto service_config = - ServiceConfigImpl::Create(ChannelArgs(), service_config_json); - ASSERT_FALSE(service_config.ok()) << service_config.status(); - EXPECT_EQ(service_config.status(), - absl::InvalidArgumentError( - "errors validating service config: [field:loadBalancingConfig " - "error:errors validating xds_override_host LB policy config: " - "[field:overrideHostStatus[0] error:invalid host status]]")); -} } // namespace } // namespace testing } // namespace grpc_core diff --git a/test/core/client_channel/lb_policy/xds_override_host_test.cc b/test/core/client_channel/lb_policy/xds_override_host_test.cc index 888d1a2d664..92b22777edf 100644 --- a/test/core/client_channel/lb_policy/xds_override_host_test.cc +++ b/test/core/client_channel/lb_policy/xds_override_host_test.cc @@ -18,13 +18,9 @@ #include #include -#include #include -#include -#include #include "absl/status/status.h" -#include "absl/status/statusor.h" #include "absl/strings/string_view.h" #include "absl/types/span.h" #include "gtest/gtest.h" @@ -32,40 +28,36 @@ #include #include "src/core/ext/filters/stateful_session/stateful_session_filter.h" -#include "src/core/ext/xds/xds_health_status.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/unique_type_name.h" #include "src/core/lib/json/json.h" #include "src/core/lib/load_balancing/lb_policy.h" -#include "src/core/lib/resolver/server_address.h" #include "test/core/client_channel/lb_policy/lb_policy_test_lib.h" #include "test/core/util/test_config.h" namespace grpc_core { namespace testing { namespace { + class XdsOverrideHostTest : public LoadBalancingPolicyTest { protected: XdsOverrideHostTest() : policy_(MakeLbPolicy("xds_override_host_experimental")) {} - static RefCountedPtr MakeXdsOverrideHostConfig( - Json::Array override_host_status = {"UNKNOWN", "HEALTHY"}, + RefCountedPtr MakeXdsOverrideHostConfig( std::string child_policy = "round_robin") { Json::Object child_policy_config = {{child_policy, Json::Object()}}; return MakeConfig(Json::Array{Json::Object{ {"xds_override_host_experimental", - Json::Object{{"childPolicy", Json::Array{{child_policy_config}}}, - {"overrideHostStatus", override_host_status}}}}}); + Json::Object{{"childPolicy", Json::Array{{child_policy_config}}}}}}}); } RefCountedPtr - ExpectStartupWithRoundRobin(absl::Span addresses, - RefCountedPtr - config = MakeXdsOverrideHostConfig()) { + ExpectStartupWithRoundRobin(absl::Span addresses) { RefCountedPtr picker; - EXPECT_EQ(ApplyUpdate(BuildUpdate(addresses, config), policy_.get()), + EXPECT_EQ(ApplyUpdate(BuildUpdate(addresses, MakeXdsOverrideHostConfig()), + policy_.get()), absl::OkStatus()); ExpectConnectingUpdate(); for (size_t i = 0; i < addresses.size(); ++i) { @@ -87,31 +79,6 @@ class XdsOverrideHostTest : public LoadBalancingPolicyTest { return picker; } - ServerAddress MakeAddressWithHealthStatus( - absl::string_view address, XdsHealthStatus::HealthStatus status) { - std::map> - attrs; - attrs.emplace(XdsEndpointHealthStatusAttribute::kKey, - std::make_unique( - XdsHealthStatus(status))); - return {MakeAddress(address), {}, std::move(attrs)}; - } - - void ApplyUpdateWithHealthStatuses( - absl::Span> - addresses_and_statuses, - Json::Array override_host_status = {"UNKNOWN", "HEALTHY"}) { - LoadBalancingPolicy::UpdateArgs update; - update.config = MakeXdsOverrideHostConfig(std::move(override_host_status)); - update.addresses.emplace(); - for (auto address_and_status : addresses_and_statuses) { - update.addresses->push_back(MakeAddressWithHealthStatus( - address_and_status.first, address_and_status.second)); - } - EXPECT_EQ(ApplyUpdate(update, policy_.get()), absl::OkStatus()); - } - OrphanablePtr policy_; }; @@ -246,174 +213,6 @@ TEST_F(XdsOverrideHostTest, ConnectingSubchannelIsQueued) { picker = ExpectState(GRPC_CHANNEL_READY); ExpectPickQueued(picker.get(), pick_arg); } - -TEST_F(XdsOverrideHostTest, DrainingState) { - // Send address list to LB policy. - const std::array kAddresses = { - "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"}; - ASSERT_NE(ExpectStartupWithRoundRobin(kAddresses), nullptr); - ApplyUpdateWithHealthStatuses( - {{kAddresses[0], XdsHealthStatus::HealthStatus::kUnknown}, - {kAddresses[1], XdsHealthStatus::HealthStatus::kDraining}, - {kAddresses[2], XdsHealthStatus::HealthStatus::kHealthy}}, - {"UNKNOWN", "HEALTHY", "DRAINING"}); - auto picker = ExpectState(GRPC_CHANNEL_READY); - ASSERT_NE(picker, nullptr); - ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]}); - ExpectQueueEmpty(); - // Draining subchannel is returned - std::map pick_arg{ - {XdsOverrideHostTypeName(), "127.0.0.1:442"}}; - EXPECT_EQ(ExpectPickComplete(picker.get(), pick_arg), kAddresses[1]); - ApplyUpdateWithHealthStatuses( - {{kAddresses[0], XdsHealthStatus::HealthStatus::kUnknown}, - {kAddresses[2], XdsHealthStatus::HealthStatus::kHealthy}}); - picker = ExpectState(GRPC_CHANNEL_READY); - ASSERT_NE(picker, nullptr); - // Gone! - ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]}, pick_arg); -} - -TEST_F(XdsOverrideHostTest, DrainingSubchannelIsConnecting) { - // Send address list to LB policy. - const std::array kAddresses = { - "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"}; - auto picker = ExpectStartupWithRoundRobin(kAddresses); - ASSERT_NE(picker, nullptr); - // Check that the host is overridden - std::map pick_arg{ - {XdsOverrideHostTypeName(), "127.0.0.1:442"}}; - EXPECT_EQ(ExpectPickComplete(picker.get(), pick_arg), kAddresses[1]); - ApplyUpdateWithHealthStatuses( - {{kAddresses[0], XdsHealthStatus::HealthStatus::kUnknown}, - {kAddresses[1], XdsHealthStatus::HealthStatus::kDraining}, - {kAddresses[2], XdsHealthStatus::HealthStatus::kHealthy}}, - {"UNKNOWN", "HEALTHY", "DRAINING"}); - auto subchannel = FindSubchannel(kAddresses[1]); - ASSERT_NE(subchannel, nullptr); - // There are two notifications - one from child policy and one from the parent - // policy due to draining channel update - picker = ExpectState(GRPC_CHANNEL_READY); - EXPECT_EQ(ExpectPickComplete(picker.get(), pick_arg), kAddresses[1]); - ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]}); - subchannel->SetConnectivityState(GRPC_CHANNEL_IDLE); - picker = ExpectState(GRPC_CHANNEL_READY); - ExpectPickQueued(picker.get(), pick_arg); - ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]}); - EXPECT_TRUE(subchannel->ConnectionRequested()); - ExpectQueueEmpty(); - subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING); - picker = ExpectState(GRPC_CHANNEL_READY); - ExpectPickQueued(picker.get(), pick_arg); - ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]}); - subchannel->SetConnectivityState(GRPC_CHANNEL_READY); - picker = ExpectState(GRPC_CHANNEL_READY); - EXPECT_EQ(ExpectPickComplete(picker.get(), pick_arg), kAddresses[1]); - ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]}); -} - -TEST_F(XdsOverrideHostTest, DrainingToHealthy) { - // Send address list to LB policy. - const std::array kAddresses = { - "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"}; - ASSERT_NE(ExpectStartupWithRoundRobin(kAddresses), nullptr); - ApplyUpdateWithHealthStatuses( - {{kAddresses[0], XdsHealthStatus::HealthStatus::kUnknown}, - {kAddresses[1], XdsHealthStatus::HealthStatus::kDraining}, - {kAddresses[2], XdsHealthStatus::HealthStatus::kHealthy}}, - {"UNKNOWN", "HEALTHY", "DRAINING"}); - auto picker = ExpectState(GRPC_CHANNEL_READY); - ASSERT_NE(picker, nullptr); - ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]}); - ExpectQueueEmpty(); - std::map pick_arg{ - {XdsOverrideHostTypeName(), "127.0.0.1:442"}}; - EXPECT_EQ(ExpectPickComplete(picker.get(), pick_arg), kAddresses[1]); - ApplyUpdateWithHealthStatuses( - {{kAddresses[0], XdsHealthStatus::HealthStatus::kHealthy}, - {kAddresses[1], XdsHealthStatus::HealthStatus::kHealthy}, - {kAddresses[2], XdsHealthStatus::HealthStatus::kHealthy}}, - {"UNKNOWN", "HEALTHY", "DRAINING"}); - picker = ExpectState(GRPC_CHANNEL_READY); - ASSERT_NE(picker, nullptr); - EXPECT_EQ(ExpectPickComplete(picker.get(), pick_arg), kAddresses[1]); - EXPECT_EQ(ExpectPickComplete(picker.get(), pick_arg), kAddresses[1]); -} - -TEST_F(XdsOverrideHostTest, OverrideHostStatus) { - const std::array kAddresses = { - "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"}; - ASSERT_NE(ExpectStartupWithRoundRobin(kAddresses), nullptr); - ApplyUpdateWithHealthStatuses( - {{kAddresses[0], XdsHealthStatus::HealthStatus::kUnknown}, - {kAddresses[1], XdsHealthStatus::HealthStatus::kHealthy}, - {kAddresses[2], XdsHealthStatus::HealthStatus::kDraining}}, - {"UNKNOWN", "HEALTHY", "DRAINING"}); - auto picker = ExpectState(GRPC_CHANNEL_READY); - ASSERT_NE(picker, nullptr); - ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]}); - EXPECT_EQ(ExpectPickComplete(picker.get(), - {{XdsOverrideHostTypeName(), "127.0.0.1:441"}}), - kAddresses[0]); - EXPECT_EQ(ExpectPickComplete(picker.get(), - {{XdsOverrideHostTypeName(), "127.0.0.1:442"}}), - kAddresses[1]); - EXPECT_EQ(ExpectPickComplete(picker.get(), - {{XdsOverrideHostTypeName(), "127.0.0.1:443"}}), - kAddresses[2]); - // UNKNOWN excluded - first chanel does not get overridden - ApplyUpdateWithHealthStatuses( - {{kAddresses[0], XdsHealthStatus::HealthStatus::kUnknown}, - {kAddresses[1], XdsHealthStatus::HealthStatus::kHealthy}, - {kAddresses[2], XdsHealthStatus::HealthStatus::kDraining}}, - {"HEALTHY", "DRAINING"}); - picker = ExpectState(GRPC_CHANNEL_READY); - ASSERT_NE(picker, nullptr); - ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]}); - ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]}, - {{XdsOverrideHostTypeName(), "127.0.0.1:441"}}); - EXPECT_EQ(ExpectPickComplete(picker.get(), - {{XdsOverrideHostTypeName(), "127.0.0.1:442"}}), - kAddresses[1]); - EXPECT_EQ(ExpectPickComplete(picker.get(), - {{XdsOverrideHostTypeName(), "127.0.0.1:443"}}), - kAddresses[2]); - // HEALTHY excluded - second chanel does not get overridden - ApplyUpdateWithHealthStatuses( - {{kAddresses[0], XdsHealthStatus::HealthStatus::kUnknown}, - {kAddresses[1], XdsHealthStatus::HealthStatus::kHealthy}, - {kAddresses[2], XdsHealthStatus::HealthStatus::kDraining}}, - {"UNKNOWN", "HEALTHY"}); - picker = ExpectState(GRPC_CHANNEL_READY); - ASSERT_NE(picker, nullptr); - ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]}); - EXPECT_EQ(ExpectPickComplete(picker.get(), - {{XdsOverrideHostTypeName(), "127.0.0.1:441"}}), - kAddresses[0]); - EXPECT_EQ(ExpectPickComplete(picker.get(), - {{XdsOverrideHostTypeName(), "127.0.0.1:442"}}), - kAddresses[1]); - ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]}, - {{XdsOverrideHostTypeName(), "127.0.0.1:443"}}); - // DRAINING excluded - third chanel does not get overridden - ApplyUpdateWithHealthStatuses( - {{kAddresses[0], XdsHealthStatus::HealthStatus::kUnknown}, - {kAddresses[1], XdsHealthStatus::HealthStatus::kHealthy}, - {kAddresses[2], XdsHealthStatus::HealthStatus::kDraining}}, - {"UNKNOWN", "HEALTHY"}); - picker = ExpectState(GRPC_CHANNEL_READY); - ASSERT_NE(picker, nullptr); - ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]}); - EXPECT_EQ(ExpectPickComplete(picker.get(), - {{XdsOverrideHostTypeName(), "127.0.0.1:441"}}), - kAddresses[0]); - EXPECT_EQ(ExpectPickComplete(picker.get(), - {{XdsOverrideHostTypeName(), "127.0.0.1:442"}}), - kAddresses[1]); - ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]}, - {{XdsOverrideHostTypeName(), "127.0.0.1:443"}}); -} - } // namespace } // namespace testing } // namespace grpc_core