diff --git a/src/core/BUILD b/src/core/BUILD index 0eae2e9edc1..9979f981bc7 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -3761,6 +3761,7 @@ grpc_cc_library( "absl/strings:str_format", "absl/synchronization", "absl/types:optional", + "absl/types:span", "absl/types:variant", "upb_lib", "upb_textformat_lib", @@ -4593,6 +4594,7 @@ grpc_cc_library( "closure", "error", "grpc_stateful_session_filter", + "grpc_xds_client", "iomgr_fwd", "json", "json_args", @@ -4600,6 +4602,7 @@ grpc_cc_library( "lb_policy", "lb_policy_factory", "lb_policy_registry", + "match", "pollset_set", "subchannel_interface", "validation_errors", diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc index 09bf0998afa..c3b5f09293a 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc @@ -18,18 +18,23 @@ #include "src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.h" +#include + +#include #include #include #include #include +#include #include -#include +#include #include #include #include "absl/base/thread_annotations.h" #include "absl/status/status.h" #include "absl/status/statusor.h" +#include "absl/strings/str_cat.h" #include "absl/strings/string_view.h" #include "absl/types/optional.h" #include "absl/types/variant.h" @@ -41,11 +46,13 @@ #include "src/core/ext/filters/client_channel/lb_call_state_internal.h" #include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h" #include "src/core/ext/filters/stateful_session/stateful_session_filter.h" +#include "src/core/ext/xds/xds_health_status.h" #include "src/core/lib/address_utils/sockaddr_utils.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/gprpp/debug_location.h" +#include "src/core/lib/gprpp/match.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/sync.h" @@ -70,6 +77,31 @@ namespace grpc_core { TraceFlag grpc_lb_xds_override_host_trace(false, "xds_override_host_lb"); namespace { +template +struct PtrLessThan { + using is_transparent = void; + + bool operator()(const std::unique_ptr& v1, + const std::unique_ptr& v2) const { + return v1 < v2; + } + bool operator()(const Value* v1, const Value* v2) const { return v1 < v2; } + bool operator()(const Value* v1, const std::unique_ptr& v2) const { + return v1 < v2.get(); + } + bool operator()(const std::unique_ptr& v1, const Value* v2) const { + return v1.get() < v2; + } +}; + +XdsHealthStatus GetAddressHealthStatus(const ServerAddress& address) { + auto attribute = address.GetAttribute(XdsEndpointHealthStatusAttribute::kKey); + if (attribute == nullptr) { + return XdsHealthStatus(XdsHealthStatus::HealthStatus::kUnknown); + } + return static_cast(attribute) + ->status(); +} // // xds_override_host LB policy @@ -91,7 +123,7 @@ class XdsOverrideHostLb : public LoadBalancingPolicy { public: SubchannelWrapper(RefCountedPtr subchannel, RefCountedPtr policy, - absl::optional key); + absl::string_view key); ~SubchannelWrapper() override; @@ -110,31 +142,32 @@ class XdsOverrideHostLb : public LoadBalancingPolicy { private: class ConnectivityStateWatcher : public ConnectivityStateWatcherInterface { public: - ConnectivityStateWatcher( - std::unique_ptr delegate, - RefCountedPtr subchannel) - : delegate_(std::move(delegate)), subchannel_(subchannel) {} + explicit ConnectivityStateWatcher( + WeakRefCountedPtr subchannel) + : subchannel_(std::move(subchannel)) {} void OnConnectivityStateChange(grpc_connectivity_state state, - absl::Status status) override { - delegate_->OnConnectivityStateChange(state, status); - subchannel_->connectivity_state_ = state; - } + absl::Status status) override; - grpc_pollset_set* interested_parties() override { - return delegate_->interested_parties(); - } + grpc_pollset_set* interested_parties() override; private: - std::unique_ptr delegate_; - RefCountedPtr subchannel_; + WeakRefCountedPtr subchannel_; }; - const absl::optional key_; + void Orphan() override; + + void UpdateConnectivityState(grpc_connectivity_state state, + absl::Status status); + + ConnectivityStateWatcher* watcher_; + absl::optional key_; RefCountedPtr policy_; - std::atomic connectivity_state_{GRPC_CHANNEL_IDLE}; - std::map + std::set, + PtrLessThan> watchers_; + std::atomic connectivity_state_ = { + GRPC_CHANNEL_IDLE}; }; // A picker that wraps the picker from the child for cases when cookie is @@ -142,7 +175,8 @@ class XdsOverrideHostLb : public LoadBalancingPolicy { class Picker : public SubchannelPicker { public: Picker(RefCountedPtr xds_override_host_lb, - RefCountedPtr picker); + RefCountedPtr picker, + XdsHealthStatusSet override_host_health_status_set); PickResult Pick(PickArgs args) override; @@ -159,11 +193,9 @@ class XdsOverrideHostLb : public LoadBalancingPolicy { } private: - XdsOverrideHostLb* policy() { return subchannel_->policy(); } - static void RunInExecCtx(void* arg, grpc_error_handle /*error*/) { auto* self = static_cast(arg); - self->policy()->work_serializer()->Run( + self->subchannel_->policy()->work_serializer()->Run( [self]() { self->subchannel_->RequestConnection(); delete self; @@ -180,6 +212,7 @@ class XdsOverrideHostLb : public LoadBalancingPolicy { RefCountedPtr policy_; RefCountedPtr picker_; + XdsHealthStatusSet override_host_health_status_set_; }; class Helper : public ChannelControlHelper { @@ -207,25 +240,49 @@ class XdsOverrideHostLb : public LoadBalancingPolicy { class SubchannelEntry { public: + explicit SubchannelEntry(XdsHealthStatus eds_health_status) + : eds_health_status_(eds_health_status) {} + void SetSubchannel(SubchannelWrapper* subchannel) { - subchannel_ = subchannel; + if (eds_health_status_.status() == XdsHealthStatus::kDraining) { + subchannel_ = subchannel->Ref(); + } else { + subchannel_ = subchannel; + } } - void ResetSubchannel(SubchannelWrapper* expected) { - if (subchannel_ == expected) { - subchannel_ = nullptr; - } + void UnsetSubchannel() { subchannel_ = nullptr; } + + SubchannelWrapper* GetSubchannel() const { + return Match( + subchannel_, + [](XdsOverrideHostLb::SubchannelWrapper* subchannel) { + return subchannel; + }, + [](RefCountedPtr subchannel) { + return subchannel.get(); + }); } - RefCountedPtr GetSubchannel() { - if (subchannel_ == nullptr) { - return nullptr; + void SetEdsHealthStatus(XdsHealthStatus eds_health_status) { + eds_health_status_ = eds_health_status; + auto subchannel = GetSubchannel(); + if (subchannel == nullptr) { + return; + } + if (eds_health_status_.status() == XdsHealthStatus::kDraining) { + subchannel_ = subchannel->Ref(); + } else { + subchannel_ = subchannel; } - return subchannel_->Ref(); } + XdsHealthStatus eds_health_status() const { return eds_health_status_; } + private: - SubchannelWrapper* subchannel_ = nullptr; + absl::variant> + subchannel_; + XdsHealthStatus eds_health_status_; }; ~XdsOverrideHostLb() override; @@ -237,17 +294,21 @@ class XdsOverrideHostLb : public LoadBalancingPolicy { void MaybeUpdatePickerLocked(); - RefCountedPtr LookupSubchannel(absl::string_view address); - - void UpdateAddressMap(const absl::StatusOr& addresses); + absl::StatusOr UpdateAddressMap( + absl::StatusOr addresses); RefCountedPtr AdoptSubchannel( ServerAddress address, RefCountedPtr subchannel); - void ResetSubchannel(absl::string_view key, SubchannelWrapper* subchannel); + void UnsetSubchannel(absl::string_view key, SubchannelWrapper* subchannel); RefCountedPtr GetSubchannelByAddress( - absl::string_view address); + absl::string_view address, XdsHealthStatusSet overriden_health_statuses); + + void OnSubchannelConnectivityStateChange(absl::string_view subchannel_key) + ABSL_NO_THREAD_SAFETY_ANALYSIS; // Called from within the worker + // serializer and does not require + // additional synchronization // Current config from the resolver. RefCountedPtr config_; @@ -272,8 +333,11 @@ class XdsOverrideHostLb : public LoadBalancingPolicy { XdsOverrideHostLb::Picker::Picker( RefCountedPtr xds_override_host_lb, - RefCountedPtr picker) - : policy_(std::move(xds_override_host_lb)), picker_(std::move(picker)) { + RefCountedPtr picker, + XdsHealthStatusSet override_host_health_status_set) + : policy_(std::move(xds_override_host_lb)), + picker_(std::move(picker)), + override_host_health_status_set_(override_host_health_status_set) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) { gpr_log(GPR_INFO, "[xds_override_host_lb %p] constructed new picker %p", policy_.get(), this); @@ -285,7 +349,8 @@ XdsOverrideHostLb::Picker::PickOverridenHost(absl::string_view override_host) { if (override_host.length() == 0) { return absl::nullopt; } - auto subchannel = policy_->GetSubchannelByAddress(override_host); + auto subchannel = policy_->GetSubchannelByAddress( + override_host, override_host_health_status_set_); if (subchannel == nullptr) { return absl::nullopt; } @@ -348,6 +413,10 @@ void XdsOverrideHostLb::ShutdownLocked() { gpr_log(GPR_INFO, "[xds_override_host_lb %p] shutting down", this); } shutting_down_ = true; + { + MutexLock lock(&subchannel_map_mu_); + subchannel_map_.clear(); + } // Remove the child policy's interested_parties pollset_set from the // xDS policy. if (child_policy_ != nullptr) { @@ -382,10 +451,9 @@ absl::Status XdsOverrideHostLb::UpdateLocked(UpdateArgs args) { if (child_policy_ == nullptr) { child_policy_ = CreateChildPolicyLocked(args.args); } - UpdateAddressMap(args.addresses); // Update child policy. UpdateArgs update_args; - update_args.addresses = std::move(args.addresses); + update_args.addresses = UpdateAddressMap(std::move(args.addresses)); update_args.resolution_note = std::move(args.resolution_note); update_args.config = config_->child_config(); update_args.args = std::move(args.args); @@ -399,7 +467,8 @@ absl::Status XdsOverrideHostLb::UpdateLocked(UpdateArgs args) { void XdsOverrideHostLb::MaybeUpdatePickerLocked() { if (picker_ != nullptr) { - auto xds_override_host_picker = MakeRefCounted(Ref(), picker_); + auto xds_override_host_picker = MakeRefCounted( + Ref(), picker_, config_->override_host_status_set()); if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) { gpr_log(GPR_INFO, "[xds_override_host_lb %p] updating connectivity: state=%s " @@ -435,71 +504,100 @@ OrphanablePtr XdsOverrideHostLb::CreateChildPolicyLocked( return lb_policy; } -void XdsOverrideHostLb::UpdateAddressMap( - const absl::StatusOr& addresses) { - std::unordered_set keys(addresses->size()); - if (addresses.ok()) { - for (const auto& address : *addresses) { - auto key = grpc_sockaddr_to_string(&address.address(), false); - if (key.ok()) { - keys.insert(std::move(*key)); - } - } +absl::StatusOr XdsOverrideHostLb::UpdateAddressMap( + absl::StatusOr addresses) { + if (!addresses.ok()) { + return addresses; } - MutexLock lock(&subchannel_map_mu_); - for (auto it = subchannel_map_.begin(); it != subchannel_map_.end();) { - if (keys.find(it->first) == keys.end()) { - it = subchannel_map_.erase(it); - } else { - ++it; + ServerAddressList return_value; + std::map addresses_for_map; + for (const auto& address : *addresses) { + XdsHealthStatus status = GetAddressHealthStatus(address); + if (status.status() != XdsHealthStatus::kDraining) { + return_value.push_back(address); + } else if (!config_->override_host_status_set().Contains(status)) { + // Skip draining hosts if not in the override status set. + continue; + } + auto key = grpc_sockaddr_to_string(&address.address(), false); + if (key.ok()) { + addresses_for_map.emplace(std::move(*key), status); } } - for (const auto& key : keys) { - if (subchannel_map_.find(key) == subchannel_map_.end()) { - subchannel_map_.emplace(key, SubchannelEntry()); + { + MutexLock lock(&subchannel_map_mu_); + for (auto it = subchannel_map_.begin(); it != subchannel_map_.end();) { + if (addresses_for_map.find(it->first) == addresses_for_map.end()) { + it = subchannel_map_.erase(it); + } else { + ++it; + } + } + for (const auto& key_status : addresses_for_map) { + auto it = subchannel_map_.find(key_status.first); + if (it == subchannel_map_.end()) { + subchannel_map_.emplace(std::piecewise_construct, + std::forward_as_tuple(key_status.first), + std::forward_as_tuple(key_status.second)); + } else { + it->second.SetEdsHealthStatus(key_status.second); + } } } + return return_value; } RefCountedPtr XdsOverrideHostLb::AdoptSubchannel( ServerAddress address, RefCountedPtr subchannel) { - auto subchannel_key = grpc_sockaddr_to_string(&address.address(), false); - absl::optional key; - if (subchannel_key.ok()) { - key = std::move(*subchannel_key); + auto key = grpc_sockaddr_to_string(&address.address(), false); + if (!key.ok()) { + return subchannel; } auto wrapper = - MakeRefCounted(std::move(subchannel), Ref(), key); - if (key.has_value()) { - MutexLock lock(&subchannel_map_mu_); - auto it = subchannel_map_.find(*key); - if (it != subchannel_map_.end()) { - it->second.SetSubchannel(wrapper.get()); - } + MakeRefCounted(std::move(subchannel), Ref(), *key); + MutexLock lock(&subchannel_map_mu_); + auto it = subchannel_map_.find(*key); + if (it != subchannel_map_.end()) { + it->second.SetSubchannel(wrapper.get()); } return wrapper; } -void XdsOverrideHostLb::ResetSubchannel(absl::string_view key, +void XdsOverrideHostLb::UnsetSubchannel(absl::string_view key, SubchannelWrapper* subchannel) { MutexLock lock(&subchannel_map_mu_); auto it = subchannel_map_.find(key); if (it != subchannel_map_.end()) { - it->second.ResetSubchannel(subchannel); + if (subchannel == it->second.GetSubchannel()) { + it->second.UnsetSubchannel(); + } } } RefCountedPtr -XdsOverrideHostLb::GetSubchannelByAddress(absl::string_view address) { +XdsOverrideHostLb::GetSubchannelByAddress( + absl::string_view address, XdsHealthStatusSet overriden_health_statuses) { MutexLock lock(&subchannel_map_mu_); auto it = subchannel_map_.find(address); - if (it != subchannel_map_.end()) { - return it->second.GetSubchannel(); + if (it != subchannel_map_.end() && + overriden_health_statuses.Contains(it->second.eds_health_status())) { + return it->second.GetSubchannel()->Ref(); } return nullptr; } +void XdsOverrideHostLb::OnSubchannelConnectivityStateChange( + absl::string_view subchannel_key) { + auto it = subchannel_map_.find(subchannel_key); + if (it == subchannel_map_.end()) { + return; + } + if (it->second.eds_health_status().status() == XdsHealthStatus::kDraining) { + MaybeUpdatePickerLocked(); + } +} + // // XdsOverrideHostLb::Helper // @@ -551,37 +649,70 @@ void XdsOverrideHostLb::Helper::AddTraceEvent(TraceSeverity severity, XdsOverrideHostLb::SubchannelWrapper::SubchannelWrapper( RefCountedPtr subchannel, - RefCountedPtr policy, - absl::optional key) + RefCountedPtr policy, absl::string_view key) : DelegatingSubchannel(std::move(subchannel)), - key_(std::move(key)), - policy_(std::move(policy)) {} + key_(key), + policy_(std::move(policy)) { + auto watcher = std::make_unique(WeakRef()); + watcher_ = watcher.get(); + wrapped_subchannel()->WatchConnectivityState(std::move(watcher)); +} XdsOverrideHostLb::SubchannelWrapper::~SubchannelWrapper() { if (key_.has_value()) { - policy_->ResetSubchannel(*key_, this); + policy_->UnsetSubchannel(*key_, this); } } void XdsOverrideHostLb::SubchannelWrapper::WatchConnectivityState( std::unique_ptr watcher) { - auto watcher_id = watcher.get(); - auto wrapper = - std::make_unique(std::move(watcher), Ref()); - watchers_.emplace(watcher_id, wrapper.get()); - wrapped_subchannel()->WatchConnectivityState(std::move(wrapper)); + watchers_.insert(std::move(watcher)); } void XdsOverrideHostLb::SubchannelWrapper::CancelConnectivityStateWatch( ConnectivityStateWatcherInterface* watcher) { - auto original_watcher = watchers_.find(watcher); - if (original_watcher != watchers_.end()) { - wrapped_subchannel()->CancelConnectivityStateWatch( - original_watcher->second); - watchers_.erase(original_watcher); + auto it = watchers_.find(watcher); + if (it != watchers_.end()) { + watchers_.erase(it); + } +} + +void XdsOverrideHostLb::SubchannelWrapper::UpdateConnectivityState( + grpc_connectivity_state state, absl::Status status) { + connectivity_state_.store(state); + // Sending connectivity state notifications to the watchers may cause the set + // of watchers to change, so we can't be iterating over the set of watchers + // while we send the notifications + std::vector watchers(watchers_.size()); + for (const auto& watcher : watchers_) { + watchers.push_back(watcher.get()); + } + for (const auto& watcher : watchers) { + if (watchers_.find(watcher) != watchers_.end()) { + watcher->OnConnectivityStateChange(state, status); + } + } + if (key_.has_value()) { + policy_->OnSubchannelConnectivityStateChange(*key_); } } +void XdsOverrideHostLb::SubchannelWrapper::Orphan() { + key_.reset(); + wrapped_subchannel()->CancelConnectivityStateWatch(watcher_); +} + +grpc_pollset_set* XdsOverrideHostLb::SubchannelWrapper:: + ConnectivityStateWatcher::interested_parties() { + return subchannel_->policy_->interested_parties(); +} + +void XdsOverrideHostLb::SubchannelWrapper::ConnectivityStateWatcher:: + OnConnectivityStateChange(grpc_connectivity_state state, + absl::Status status) { + subchannel_->UpdateConnectivityState(state, status); +} + // // factory // @@ -630,20 +761,46 @@ const JsonLoaderInterface* XdsOverrideHostLbConfig::JsonLoader( return kJsonLoader; } -void XdsOverrideHostLbConfig::JsonPostLoad(const Json& json, const JsonArgs&, +void XdsOverrideHostLbConfig::JsonPostLoad(const Json& json, + const JsonArgs& args, ValidationErrors* errors) { - ValidationErrors::ScopedField field(errors, ".childPolicy"); - auto it = json.object_value().find("childPolicy"); - if (it == json.object_value().end()) { - errors->AddError("field not present"); - } else { - auto child_policy_config = - CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig( - it->second); - if (!child_policy_config.ok()) { - errors->AddError(child_policy_config.status().message()); + { + ValidationErrors::ScopedField field(errors, ".childPolicy"); + auto it = json.object_value().find("childPolicy"); + if (it == json.object_value().end()) { + errors->AddError("field not present"); + } else { + auto child_policy_config = CoreConfiguration::Get() + .lb_policy_registry() + .ParseLoadBalancingConfig(it->second); + if (!child_policy_config.ok()) { + errors->AddError(child_policy_config.status().message()); + } else { + child_config_ = std::move(*child_policy_config); + } + } + } + { + ValidationErrors::ScopedField field(errors, ".overrideHostStatus"); + auto host_status_list = LoadJsonObjectField>( + json.object_value(), args, "overrideHostStatus", errors, + /*required=*/false); + if (host_status_list.has_value()) { + for (size_t i = 0; i < host_status_list->size(); ++i) { + const std::string& host_status = (*host_status_list)[i]; + auto status = XdsHealthStatus::FromString(host_status); + if (!status.has_value()) { + ValidationErrors::ScopedField field(errors, + absl::StrCat("[", i, "]")); + errors->AddError("invalid host status"); + } else { + override_host_status_set_.Add(*status); + } + } } else { - child_config_ = std::move(*child_policy_config); + override_host_status_set_ = XdsHealthStatusSet( + {XdsHealthStatus(XdsHealthStatus::HealthStatus::kHealthy), + XdsHealthStatus(XdsHealthStatus::HealthStatus::kUnknown)}); } } } diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.h b/src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.h index 690bf666e7a..5e0d6dbb773 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.h +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.h @@ -21,6 +21,7 @@ #include "absl/strings/string_view.h" +#include "src/core/ext/xds/xds_health_status.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/validation_errors.h" #include "src/core/lib/json/json.h" @@ -49,12 +50,18 @@ class XdsOverrideHostLbConfig : public LoadBalancingPolicy::Config { return child_config_; } + XdsHealthStatusSet override_host_status_set() const { + return override_host_status_set_; + } + static const JsonLoaderInterface* JsonLoader(const JsonArgs&); void JsonPostLoad(const Json& json, const JsonArgs&, ValidationErrors* errors); private: RefCountedPtr child_config_; + XdsHealthStatusSet override_host_status_set_; }; + } // namespace grpc_core #endif // GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_XDS_OVERRIDE_HOST_H diff --git a/src/core/ext/xds/xds_health_status.h b/src/core/ext/xds/xds_health_status.h index 43802579cac..c9c27315953 100644 --- a/src/core/ext/xds/xds_health_status.h +++ b/src/core/ext/xds/xds_health_status.h @@ -26,6 +26,7 @@ #include "absl/strings/string_view.h" #include "absl/types/optional.h" +#include "absl/types/span.h" #include "src/core/lib/resolver/server_address.h" @@ -53,6 +54,32 @@ class XdsHealthStatus { HealthStatus status_; }; +class XdsHealthStatusSet { + public: + XdsHealthStatusSet() = default; + + explicit XdsHealthStatusSet(absl::Span statuses) { + for (XdsHealthStatus status : statuses) { + Add(status); + } + } + + bool operator==(const XdsHealthStatusSet& other) const { + return status_mask_ == other.status_mask_; + } + + void Clear() { status_mask_ = 0; } + + void Add(XdsHealthStatus status) { status_mask_ |= (0x1 << status.status()); } + + bool Contains(XdsHealthStatus status) const { + return status_mask_ & (0x1 << status.status()); + } + + private: + int status_mask_ = 0; +}; + bool operator<(const XdsHealthStatus& hs1, const XdsHealthStatus& hs2); class XdsEndpointHealthStatusAttribute diff --git a/test/core/client_channel/lb_policy/lb_policy_test_lib.h b/test/core/client_channel/lb_policy/lb_policy_test_lib.h index d48fbdb01e2..cff0a04d27d 100644 --- a/test/core/client_channel/lb_policy/lb_policy_test_lib.h +++ b/test/core/client_channel/lb_policy/lb_policy_test_lib.h @@ -790,6 +790,10 @@ class LoadBalancingPolicyTest : public ::testing::Test { std::unique_ptr* subchannel_call_tracker = nullptr, SourceLocation location = SourceLocation()) { + EXPECT_NE(picker, nullptr); + if (picker == nullptr) { + return absl::nullopt; + } auto pick_result = DoPick(picker, call_attributes); auto* complete = absl::get_if( &pick_result.result); diff --git a/test/core/client_channel/lb_policy/xds_override_host_lb_config_parser_test.cc b/test/core/client_channel/lb_policy/xds_override_host_lb_config_parser_test.cc index e3f7cbe19b3..e4fd323c575 100644 --- a/test/core/client_channel/lb_policy/xds_override_host_lb_config_parser_test.cc +++ b/test/core/client_channel/lb_policy/xds_override_host_lb_config_parser_test.cc @@ -22,6 +22,7 @@ #include "src/core/ext/filters/client_channel/client_channel_service_config.h" #include "src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.h" +#include "src/core/ext/xds/xds_health_status.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/service_config/service_config.h" @@ -42,13 +43,16 @@ TEST(XdsOverrideHostConfigParsingTest, ValidConfig) { " \"xds_override_host_experimental\":{\n" " \"childPolicy\":[\n" " {\"grpclb\":{}}\n" - " ]\n" + " ],\n" + " \"overrideHostStatus\": [\n" + " \"DRAINING\", \"HEALTHY\", \"UNKNOWN\"" + " ]" " }\n" " }]\n" "}\n"; auto service_config = ServiceConfigImpl::Create(ChannelArgs(), service_config_json); - ASSERT_TRUE(service_config.ok()); + ASSERT_TRUE(service_config.ok()) << service_config.status(); EXPECT_NE(*service_config, nullptr); auto global_config = static_cast( (*service_config) @@ -60,6 +64,12 @@ TEST(XdsOverrideHostConfigParsingTest, ValidConfig) { ASSERT_EQ(lb_config->name(), XdsOverrideHostLbConfig::Name()); auto override_host_lb_config = static_cast>(lb_config); + EXPECT_EQ(override_host_lb_config->override_host_status_set(), + XdsHealthStatusSet({ + XdsHealthStatus(XdsHealthStatus::HealthStatus::kDraining), + XdsHealthStatus(XdsHealthStatus::HealthStatus::kHealthy), + XdsHealthStatus(XdsHealthStatus::HealthStatus::kUnknown), + })); ASSERT_NE(override_host_lb_config->child_config(), nullptr); ASSERT_EQ(override_host_lb_config->child_config()->name(), "grpclb"); } @@ -93,6 +103,72 @@ TEST(XdsOverrideHostConfigParsingTest, ValidConfigWithRR) { ASSERT_EQ(override_host_lb_config->child_config()->name(), "round_robin"); } +TEST(XdsOverrideHostConfigParsingTest, ValidConfigNoDraining) { + const char* service_config_json = + "{\n" + " \"loadBalancingConfig\":[{\n" + " \"xds_override_host_experimental\":{\n" + " \"childPolicy\":[\n" + " {\"grpclb\":{}}\n" + " ],\n" + " \"overrideHostStatus\": [\n" + " \"HEALTHY\", \"UNKNOWN\"" + " ]" + " }\n" + " }]\n" + "}\n"; + auto service_config = + ServiceConfigImpl::Create(ChannelArgs(), service_config_json); + ASSERT_TRUE(service_config.ok()); + EXPECT_NE(*service_config, nullptr); + auto global_config = static_cast( + (*service_config) + ->GetGlobalParsedConfig( + ClientChannelServiceConfigParser::ParserIndex())); + ASSERT_NE(global_config, nullptr); + auto lb_config = global_config->parsed_lb_config(); + ASSERT_NE(lb_config, nullptr); + ASSERT_EQ(lb_config->name(), XdsOverrideHostLbConfig::Name()); + auto override_host_lb_config = + static_cast>(lb_config); + EXPECT_EQ(override_host_lb_config->override_host_status_set(), + XdsHealthStatusSet( + {XdsHealthStatus(XdsHealthStatus::HealthStatus::kHealthy), + XdsHealthStatus(XdsHealthStatus::HealthStatus::kUnknown)})); + ASSERT_NE(override_host_lb_config->child_config(), nullptr); + ASSERT_EQ(override_host_lb_config->child_config()->name(), "grpclb"); +} + +TEST(XdsOverrideHostConfigParsingTest, ValidConfigNoOverrideHostStatuses) { + const char* service_config_json = + "{\n" + " \"loadBalancingConfig\":[{\n" + " \"xds_override_host_experimental\":{\n" + " \"childPolicy\":[\n" + " {\"grpclb\":{}}\n" + " ]" + " }\n" + " }]\n" + "}\n"; + auto service_config = + ServiceConfigImpl::Create(ChannelArgs(), service_config_json); + ASSERT_TRUE(service_config.ok()); + EXPECT_NE(*service_config, nullptr); + auto global_config = static_cast( + (*service_config)->GetGlobalParsedConfig(0)); + ASSERT_NE(global_config, nullptr); + auto lb_config = global_config->parsed_lb_config(); + ASSERT_NE(lb_config, nullptr); + auto override_host_lb_config = + static_cast>(lb_config); + EXPECT_EQ(override_host_lb_config->override_host_status_set(), + XdsHealthStatusSet( + {XdsHealthStatus(XdsHealthStatus::HealthStatus::kHealthy), + XdsHealthStatus(XdsHealthStatus::HealthStatus::kUnknown)})); + ASSERT_NE(override_host_lb_config->child_config(), nullptr); + EXPECT_EQ(override_host_lb_config->child_config()->name(), "grpclb"); +} + TEST(XdsOverrideHostConfigParsingTest, ReportsMissingChildPolicyField) { const char* service_config_json = "{\n" @@ -151,6 +227,30 @@ TEST(XdsOverrideHostConfigParsingTest, ReportsEmptyChildPolicyArray) { "error:errors validating xds_override_host LB policy config: " "[field:childPolicy error:No known policies in list: ]]")); } + +TEST(XdsOverrideHostConfigParsingTest, UnrecognizedHostStatus) { + const char* service_config_json = + "{\n" + " \"loadBalancingConfig\":[{\n" + " \"xds_override_host_experimental\":{\n" + " \"childPolicy\":[\n" + " {\"grpclb\":{}}\n" + " ],\n" + " \"overrideHostStatus\": [\n" + " \"NOTASTATUS\"" + " ]" + " }\n" + " }]\n" + "}\n"; + auto service_config = + ServiceConfigImpl::Create(ChannelArgs(), service_config_json); + ASSERT_FALSE(service_config.ok()) << service_config.status(); + EXPECT_EQ(service_config.status(), + absl::InvalidArgumentError( + "errors validating service config: [field:loadBalancingConfig " + "error:errors validating xds_override_host LB policy config: " + "[field:overrideHostStatus[0] error:invalid host status]]")); +} } // namespace } // namespace testing } // namespace grpc_core diff --git a/test/core/client_channel/lb_policy/xds_override_host_test.cc b/test/core/client_channel/lb_policy/xds_override_host_test.cc index 92b22777edf..888d1a2d664 100644 --- a/test/core/client_channel/lb_policy/xds_override_host_test.cc +++ b/test/core/client_channel/lb_policy/xds_override_host_test.cc @@ -18,9 +18,13 @@ #include #include +#include #include +#include +#include #include "absl/status/status.h" +#include "absl/status/statusor.h" #include "absl/strings/string_view.h" #include "absl/types/span.h" #include "gtest/gtest.h" @@ -28,36 +32,40 @@ #include #include "src/core/ext/filters/stateful_session/stateful_session_filter.h" +#include "src/core/ext/xds/xds_health_status.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/unique_type_name.h" #include "src/core/lib/json/json.h" #include "src/core/lib/load_balancing/lb_policy.h" +#include "src/core/lib/resolver/server_address.h" #include "test/core/client_channel/lb_policy/lb_policy_test_lib.h" #include "test/core/util/test_config.h" namespace grpc_core { namespace testing { namespace { - class XdsOverrideHostTest : public LoadBalancingPolicyTest { protected: XdsOverrideHostTest() : policy_(MakeLbPolicy("xds_override_host_experimental")) {} - RefCountedPtr MakeXdsOverrideHostConfig( + static RefCountedPtr MakeXdsOverrideHostConfig( + Json::Array override_host_status = {"UNKNOWN", "HEALTHY"}, std::string child_policy = "round_robin") { Json::Object child_policy_config = {{child_policy, Json::Object()}}; return MakeConfig(Json::Array{Json::Object{ {"xds_override_host_experimental", - Json::Object{{"childPolicy", Json::Array{{child_policy_config}}}}}}}); + Json::Object{{"childPolicy", Json::Array{{child_policy_config}}}, + {"overrideHostStatus", override_host_status}}}}}); } RefCountedPtr - ExpectStartupWithRoundRobin(absl::Span addresses) { + ExpectStartupWithRoundRobin(absl::Span addresses, + RefCountedPtr + config = MakeXdsOverrideHostConfig()) { RefCountedPtr picker; - EXPECT_EQ(ApplyUpdate(BuildUpdate(addresses, MakeXdsOverrideHostConfig()), - policy_.get()), + EXPECT_EQ(ApplyUpdate(BuildUpdate(addresses, config), policy_.get()), absl::OkStatus()); ExpectConnectingUpdate(); for (size_t i = 0; i < addresses.size(); ++i) { @@ -79,6 +87,31 @@ class XdsOverrideHostTest : public LoadBalancingPolicyTest { return picker; } + ServerAddress MakeAddressWithHealthStatus( + absl::string_view address, XdsHealthStatus::HealthStatus status) { + std::map> + attrs; + attrs.emplace(XdsEndpointHealthStatusAttribute::kKey, + std::make_unique( + XdsHealthStatus(status))); + return {MakeAddress(address), {}, std::move(attrs)}; + } + + void ApplyUpdateWithHealthStatuses( + absl::Span> + addresses_and_statuses, + Json::Array override_host_status = {"UNKNOWN", "HEALTHY"}) { + LoadBalancingPolicy::UpdateArgs update; + update.config = MakeXdsOverrideHostConfig(std::move(override_host_status)); + update.addresses.emplace(); + for (auto address_and_status : addresses_and_statuses) { + update.addresses->push_back(MakeAddressWithHealthStatus( + address_and_status.first, address_and_status.second)); + } + EXPECT_EQ(ApplyUpdate(update, policy_.get()), absl::OkStatus()); + } + OrphanablePtr policy_; }; @@ -213,6 +246,174 @@ TEST_F(XdsOverrideHostTest, ConnectingSubchannelIsQueued) { picker = ExpectState(GRPC_CHANNEL_READY); ExpectPickQueued(picker.get(), pick_arg); } + +TEST_F(XdsOverrideHostTest, DrainingState) { + // Send address list to LB policy. + const std::array kAddresses = { + "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"}; + ASSERT_NE(ExpectStartupWithRoundRobin(kAddresses), nullptr); + ApplyUpdateWithHealthStatuses( + {{kAddresses[0], XdsHealthStatus::HealthStatus::kUnknown}, + {kAddresses[1], XdsHealthStatus::HealthStatus::kDraining}, + {kAddresses[2], XdsHealthStatus::HealthStatus::kHealthy}}, + {"UNKNOWN", "HEALTHY", "DRAINING"}); + auto picker = ExpectState(GRPC_CHANNEL_READY); + ASSERT_NE(picker, nullptr); + ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]}); + ExpectQueueEmpty(); + // Draining subchannel is returned + std::map pick_arg{ + {XdsOverrideHostTypeName(), "127.0.0.1:442"}}; + EXPECT_EQ(ExpectPickComplete(picker.get(), pick_arg), kAddresses[1]); + ApplyUpdateWithHealthStatuses( + {{kAddresses[0], XdsHealthStatus::HealthStatus::kUnknown}, + {kAddresses[2], XdsHealthStatus::HealthStatus::kHealthy}}); + picker = ExpectState(GRPC_CHANNEL_READY); + ASSERT_NE(picker, nullptr); + // Gone! + ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]}, pick_arg); +} + +TEST_F(XdsOverrideHostTest, DrainingSubchannelIsConnecting) { + // Send address list to LB policy. + const std::array kAddresses = { + "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"}; + auto picker = ExpectStartupWithRoundRobin(kAddresses); + ASSERT_NE(picker, nullptr); + // Check that the host is overridden + std::map pick_arg{ + {XdsOverrideHostTypeName(), "127.0.0.1:442"}}; + EXPECT_EQ(ExpectPickComplete(picker.get(), pick_arg), kAddresses[1]); + ApplyUpdateWithHealthStatuses( + {{kAddresses[0], XdsHealthStatus::HealthStatus::kUnknown}, + {kAddresses[1], XdsHealthStatus::HealthStatus::kDraining}, + {kAddresses[2], XdsHealthStatus::HealthStatus::kHealthy}}, + {"UNKNOWN", "HEALTHY", "DRAINING"}); + auto subchannel = FindSubchannel(kAddresses[1]); + ASSERT_NE(subchannel, nullptr); + // There are two notifications - one from child policy and one from the parent + // policy due to draining channel update + picker = ExpectState(GRPC_CHANNEL_READY); + EXPECT_EQ(ExpectPickComplete(picker.get(), pick_arg), kAddresses[1]); + ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]}); + subchannel->SetConnectivityState(GRPC_CHANNEL_IDLE); + picker = ExpectState(GRPC_CHANNEL_READY); + ExpectPickQueued(picker.get(), pick_arg); + ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]}); + EXPECT_TRUE(subchannel->ConnectionRequested()); + ExpectQueueEmpty(); + subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING); + picker = ExpectState(GRPC_CHANNEL_READY); + ExpectPickQueued(picker.get(), pick_arg); + ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]}); + subchannel->SetConnectivityState(GRPC_CHANNEL_READY); + picker = ExpectState(GRPC_CHANNEL_READY); + EXPECT_EQ(ExpectPickComplete(picker.get(), pick_arg), kAddresses[1]); + ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]}); +} + +TEST_F(XdsOverrideHostTest, DrainingToHealthy) { + // Send address list to LB policy. + const std::array kAddresses = { + "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"}; + ASSERT_NE(ExpectStartupWithRoundRobin(kAddresses), nullptr); + ApplyUpdateWithHealthStatuses( + {{kAddresses[0], XdsHealthStatus::HealthStatus::kUnknown}, + {kAddresses[1], XdsHealthStatus::HealthStatus::kDraining}, + {kAddresses[2], XdsHealthStatus::HealthStatus::kHealthy}}, + {"UNKNOWN", "HEALTHY", "DRAINING"}); + auto picker = ExpectState(GRPC_CHANNEL_READY); + ASSERT_NE(picker, nullptr); + ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]}); + ExpectQueueEmpty(); + std::map pick_arg{ + {XdsOverrideHostTypeName(), "127.0.0.1:442"}}; + EXPECT_EQ(ExpectPickComplete(picker.get(), pick_arg), kAddresses[1]); + ApplyUpdateWithHealthStatuses( + {{kAddresses[0], XdsHealthStatus::HealthStatus::kHealthy}, + {kAddresses[1], XdsHealthStatus::HealthStatus::kHealthy}, + {kAddresses[2], XdsHealthStatus::HealthStatus::kHealthy}}, + {"UNKNOWN", "HEALTHY", "DRAINING"}); + picker = ExpectState(GRPC_CHANNEL_READY); + ASSERT_NE(picker, nullptr); + EXPECT_EQ(ExpectPickComplete(picker.get(), pick_arg), kAddresses[1]); + EXPECT_EQ(ExpectPickComplete(picker.get(), pick_arg), kAddresses[1]); +} + +TEST_F(XdsOverrideHostTest, OverrideHostStatus) { + const std::array kAddresses = { + "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"}; + ASSERT_NE(ExpectStartupWithRoundRobin(kAddresses), nullptr); + ApplyUpdateWithHealthStatuses( + {{kAddresses[0], XdsHealthStatus::HealthStatus::kUnknown}, + {kAddresses[1], XdsHealthStatus::HealthStatus::kHealthy}, + {kAddresses[2], XdsHealthStatus::HealthStatus::kDraining}}, + {"UNKNOWN", "HEALTHY", "DRAINING"}); + auto picker = ExpectState(GRPC_CHANNEL_READY); + ASSERT_NE(picker, nullptr); + ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]}); + EXPECT_EQ(ExpectPickComplete(picker.get(), + {{XdsOverrideHostTypeName(), "127.0.0.1:441"}}), + kAddresses[0]); + EXPECT_EQ(ExpectPickComplete(picker.get(), + {{XdsOverrideHostTypeName(), "127.0.0.1:442"}}), + kAddresses[1]); + EXPECT_EQ(ExpectPickComplete(picker.get(), + {{XdsOverrideHostTypeName(), "127.0.0.1:443"}}), + kAddresses[2]); + // UNKNOWN excluded - first chanel does not get overridden + ApplyUpdateWithHealthStatuses( + {{kAddresses[0], XdsHealthStatus::HealthStatus::kUnknown}, + {kAddresses[1], XdsHealthStatus::HealthStatus::kHealthy}, + {kAddresses[2], XdsHealthStatus::HealthStatus::kDraining}}, + {"HEALTHY", "DRAINING"}); + picker = ExpectState(GRPC_CHANNEL_READY); + ASSERT_NE(picker, nullptr); + ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]}); + ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]}, + {{XdsOverrideHostTypeName(), "127.0.0.1:441"}}); + EXPECT_EQ(ExpectPickComplete(picker.get(), + {{XdsOverrideHostTypeName(), "127.0.0.1:442"}}), + kAddresses[1]); + EXPECT_EQ(ExpectPickComplete(picker.get(), + {{XdsOverrideHostTypeName(), "127.0.0.1:443"}}), + kAddresses[2]); + // HEALTHY excluded - second chanel does not get overridden + ApplyUpdateWithHealthStatuses( + {{kAddresses[0], XdsHealthStatus::HealthStatus::kUnknown}, + {kAddresses[1], XdsHealthStatus::HealthStatus::kHealthy}, + {kAddresses[2], XdsHealthStatus::HealthStatus::kDraining}}, + {"UNKNOWN", "HEALTHY"}); + picker = ExpectState(GRPC_CHANNEL_READY); + ASSERT_NE(picker, nullptr); + ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]}); + EXPECT_EQ(ExpectPickComplete(picker.get(), + {{XdsOverrideHostTypeName(), "127.0.0.1:441"}}), + kAddresses[0]); + EXPECT_EQ(ExpectPickComplete(picker.get(), + {{XdsOverrideHostTypeName(), "127.0.0.1:442"}}), + kAddresses[1]); + ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]}, + {{XdsOverrideHostTypeName(), "127.0.0.1:443"}}); + // DRAINING excluded - third chanel does not get overridden + ApplyUpdateWithHealthStatuses( + {{kAddresses[0], XdsHealthStatus::HealthStatus::kUnknown}, + {kAddresses[1], XdsHealthStatus::HealthStatus::kHealthy}, + {kAddresses[2], XdsHealthStatus::HealthStatus::kDraining}}, + {"UNKNOWN", "HEALTHY"}); + picker = ExpectState(GRPC_CHANNEL_READY); + ASSERT_NE(picker, nullptr); + ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]}); + EXPECT_EQ(ExpectPickComplete(picker.get(), + {{XdsOverrideHostTypeName(), "127.0.0.1:441"}}), + kAddresses[0]); + EXPECT_EQ(ExpectPickComplete(picker.get(), + {{XdsOverrideHostTypeName(), "127.0.0.1:442"}}), + kAddresses[1]); + ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]}, + {{XdsOverrideHostTypeName(), "127.0.0.1:443"}}); +} + } // namespace } // namespace testing } // namespace grpc_core