xds_override_host LB: add support for draining state (#31985)

Co-authored-by: Mark D. Roth <roth@google.com>
pull/32189/head
Eugene Ostroukhov 2 years ago committed by GitHub
parent 7b02b7c253
commit e8e9514a11
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      src/core/BUILD
  2. 367
      src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc
  3. 7
      src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.h
  4. 27
      src/core/ext/xds/xds_health_status.h
  5. 4
      test/core/client_channel/lb_policy/lb_policy_test_lib.h
  6. 104
      test/core/client_channel/lb_policy/xds_override_host_lb_config_parser_test.cc
  7. 213
      test/core/client_channel/lb_policy/xds_override_host_test.cc

@ -4593,6 +4593,7 @@ grpc_cc_library(
"closure",
"error",
"grpc_stateful_session_filter",
"grpc_xds_client",
"iomgr_fwd",
"json",
"json_args",
@ -4600,6 +4601,7 @@ grpc_cc_library(
"lb_policy",
"lb_policy_factory",
"lb_policy_registry",
"match",
"pollset_set",
"subchannel_interface",
"validation_errors",

@ -18,18 +18,23 @@
#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.h"
#include <stddef.h>
#include <algorithm>
#include <atomic>
#include <functional>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <unordered_set>
#include <tuple>
#include <utility>
#include <vector>
#include "absl/base/thread_annotations.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "absl/types/variant.h"
@ -41,11 +46,13 @@
#include "src/core/ext/filters/client_channel/lb_call_state_internal.h"
#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
#include "src/core/ext/filters/stateful_session/stateful_session_filter.h"
#include "src/core/ext/xds/xds_health_status.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/match.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
@ -70,6 +77,31 @@ namespace grpc_core {
TraceFlag grpc_lb_xds_override_host_trace(false, "xds_override_host_lb");
namespace {
template <typename Value>
struct PtrLessThan {
using is_transparent = void;
bool operator()(const std::unique_ptr<Value>& v1,
const std::unique_ptr<Value>& v2) const {
return v1 < v2;
}
bool operator()(const Value* v1, const Value* v2) const { return v1 < v2; }
bool operator()(const Value* v1, const std::unique_ptr<Value>& v2) const {
return v1 < v2.get();
}
bool operator()(const std::unique_ptr<Value>& v1, const Value* v2) const {
return v1.get() < v2;
}
};
XdsHealthStatus GetAddressHealthStatus(const ServerAddress& address) {
auto attribute = address.GetAttribute(XdsEndpointHealthStatusAttribute::kKey);
if (attribute == nullptr) {
return XdsHealthStatus(XdsHealthStatus::HealthStatus::kUnknown);
}
return static_cast<const XdsEndpointHealthStatusAttribute*>(attribute)
->status();
}
//
// xds_override_host LB policy
@ -91,7 +123,7 @@ class XdsOverrideHostLb : public LoadBalancingPolicy {
public:
SubchannelWrapper(RefCountedPtr<SubchannelInterface> subchannel,
RefCountedPtr<XdsOverrideHostLb> policy,
absl::optional<const std::string> key);
absl::string_view key);
~SubchannelWrapper() override;
@ -110,31 +142,32 @@ class XdsOverrideHostLb : public LoadBalancingPolicy {
private:
class ConnectivityStateWatcher : public ConnectivityStateWatcherInterface {
public:
ConnectivityStateWatcher(
std::unique_ptr<ConnectivityStateWatcherInterface> delegate,
RefCountedPtr<SubchannelWrapper> subchannel)
: delegate_(std::move(delegate)), subchannel_(subchannel) {}
explicit ConnectivityStateWatcher(
WeakRefCountedPtr<SubchannelWrapper> subchannel)
: subchannel_(std::move(subchannel)) {}
void OnConnectivityStateChange(grpc_connectivity_state state,
absl::Status status) override {
delegate_->OnConnectivityStateChange(state, status);
subchannel_->connectivity_state_ = state;
}
absl::Status status) override;
grpc_pollset_set* interested_parties() override {
return delegate_->interested_parties();
}
grpc_pollset_set* interested_parties() override;
private:
std::unique_ptr<ConnectivityStateWatcherInterface> delegate_;
RefCountedPtr<SubchannelWrapper> subchannel_;
WeakRefCountedPtr<SubchannelWrapper> subchannel_;
};
const absl::optional<const std::string> key_;
void Orphan() override;
void UpdateConnectivityState(grpc_connectivity_state state,
absl::Status status);
ConnectivityStateWatcher* watcher_;
absl::optional<std::string> key_;
RefCountedPtr<XdsOverrideHostLb> policy_;
std::atomic<grpc_connectivity_state> connectivity_state_{GRPC_CHANNEL_IDLE};
std::map<ConnectivityStateWatcherInterface*, ConnectivityStateWatcher*>
std::set<std::unique_ptr<ConnectivityStateWatcherInterface>,
PtrLessThan<ConnectivityStateWatcherInterface>>
watchers_;
std::atomic<grpc_connectivity_state> connectivity_state_ = {
GRPC_CHANNEL_IDLE};
};
// A picker that wraps the picker from the child for cases when cookie is
@ -142,7 +175,8 @@ class XdsOverrideHostLb : public LoadBalancingPolicy {
class Picker : public SubchannelPicker {
public:
Picker(RefCountedPtr<XdsOverrideHostLb> xds_override_host_lb,
RefCountedPtr<SubchannelPicker> picker);
RefCountedPtr<SubchannelPicker> picker,
XdsHealthStatusSet override_host_health_status_set);
PickResult Pick(PickArgs args) override;
@ -159,11 +193,9 @@ class XdsOverrideHostLb : public LoadBalancingPolicy {
}
private:
XdsOverrideHostLb* policy() { return subchannel_->policy(); }
static void RunInExecCtx(void* arg, grpc_error_handle /*error*/) {
auto* self = static_cast<SubchannelConnectionRequester*>(arg);
self->policy()->work_serializer()->Run(
self->subchannel_->policy()->work_serializer()->Run(
[self]() {
self->subchannel_->RequestConnection();
delete self;
@ -180,6 +212,7 @@ class XdsOverrideHostLb : public LoadBalancingPolicy {
RefCountedPtr<XdsOverrideHostLb> policy_;
RefCountedPtr<SubchannelPicker> picker_;
XdsHealthStatusSet override_host_health_status_set_;
};
class Helper : public ChannelControlHelper {
@ -207,25 +240,49 @@ class XdsOverrideHostLb : public LoadBalancingPolicy {
class SubchannelEntry {
public:
explicit SubchannelEntry(XdsHealthStatus eds_health_status)
: eds_health_status_(eds_health_status) {}
void SetSubchannel(SubchannelWrapper* subchannel) {
subchannel_ = subchannel;
if (eds_health_status_.status() == XdsHealthStatus::kDraining) {
subchannel_ = subchannel->Ref();
} else {
subchannel_ = subchannel;
}
}
void ResetSubchannel(SubchannelWrapper* expected) {
if (subchannel_ == expected) {
subchannel_ = nullptr;
}
void UnsetSubchannel() { subchannel_ = nullptr; }
SubchannelWrapper* GetSubchannel() const {
return Match(
subchannel_,
[](XdsOverrideHostLb::SubchannelWrapper* subchannel) {
return subchannel;
},
[](RefCountedPtr<XdsOverrideHostLb::SubchannelWrapper> subchannel) {
return subchannel.get();
});
}
RefCountedPtr<SubchannelWrapper> GetSubchannel() {
if (subchannel_ == nullptr) {
return nullptr;
void SetEdsHealthStatus(XdsHealthStatus eds_health_status) {
eds_health_status_ = eds_health_status;
auto subchannel = GetSubchannel();
if (subchannel == nullptr) {
return;
}
if (eds_health_status_.status() == XdsHealthStatus::kDraining) {
subchannel_ = subchannel->Ref();
} else {
subchannel_ = subchannel;
}
return subchannel_->Ref();
}
XdsHealthStatus eds_health_status() const { return eds_health_status_; }
private:
SubchannelWrapper* subchannel_ = nullptr;
absl::variant<SubchannelWrapper*, RefCountedPtr<SubchannelWrapper>>
subchannel_;
XdsHealthStatus eds_health_status_;
};
~XdsOverrideHostLb() override;
@ -237,17 +294,21 @@ class XdsOverrideHostLb : public LoadBalancingPolicy {
void MaybeUpdatePickerLocked();
RefCountedPtr<SubchannelWrapper> LookupSubchannel(absl::string_view address);
void UpdateAddressMap(const absl::StatusOr<ServerAddressList>& addresses);
absl::StatusOr<ServerAddressList> UpdateAddressMap(
absl::StatusOr<ServerAddressList> addresses);
RefCountedPtr<SubchannelWrapper> AdoptSubchannel(
ServerAddress address, RefCountedPtr<SubchannelInterface> subchannel);
void ResetSubchannel(absl::string_view key, SubchannelWrapper* subchannel);
void UnsetSubchannel(absl::string_view key, SubchannelWrapper* subchannel);
RefCountedPtr<SubchannelWrapper> GetSubchannelByAddress(
absl::string_view address);
absl::string_view address, XdsHealthStatusSet overriden_health_statuses);
void OnSubchannelConnectivityStateChange(absl::string_view subchannel_key)
ABSL_NO_THREAD_SAFETY_ANALYSIS; // Called from within the worker
// serializer and does not require
// additional synchronization
// Current config from the resolver.
RefCountedPtr<XdsOverrideHostLbConfig> config_;
@ -272,8 +333,11 @@ class XdsOverrideHostLb : public LoadBalancingPolicy {
XdsOverrideHostLb::Picker::Picker(
RefCountedPtr<XdsOverrideHostLb> xds_override_host_lb,
RefCountedPtr<SubchannelPicker> picker)
: policy_(std::move(xds_override_host_lb)), picker_(std::move(picker)) {
RefCountedPtr<SubchannelPicker> picker,
XdsHealthStatusSet override_host_health_status_set)
: policy_(std::move(xds_override_host_lb)),
picker_(std::move(picker)),
override_host_health_status_set_(override_host_health_status_set) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO, "[xds_override_host_lb %p] constructed new picker %p",
policy_.get(), this);
@ -285,7 +349,8 @@ XdsOverrideHostLb::Picker::PickOverridenHost(absl::string_view override_host) {
if (override_host.length() == 0) {
return absl::nullopt;
}
auto subchannel = policy_->GetSubchannelByAddress(override_host);
auto subchannel = policy_->GetSubchannelByAddress(
override_host, override_host_health_status_set_);
if (subchannel == nullptr) {
return absl::nullopt;
}
@ -348,6 +413,10 @@ void XdsOverrideHostLb::ShutdownLocked() {
gpr_log(GPR_INFO, "[xds_override_host_lb %p] shutting down", this);
}
shutting_down_ = true;
{
MutexLock lock(&subchannel_map_mu_);
subchannel_map_.clear();
}
// Remove the child policy's interested_parties pollset_set from the
// xDS policy.
if (child_policy_ != nullptr) {
@ -382,10 +451,9 @@ absl::Status XdsOverrideHostLb::UpdateLocked(UpdateArgs args) {
if (child_policy_ == nullptr) {
child_policy_ = CreateChildPolicyLocked(args.args);
}
UpdateAddressMap(args.addresses);
// Update child policy.
UpdateArgs update_args;
update_args.addresses = std::move(args.addresses);
update_args.addresses = UpdateAddressMap(std::move(args.addresses));
update_args.resolution_note = std::move(args.resolution_note);
update_args.config = config_->child_config();
update_args.args = std::move(args.args);
@ -399,7 +467,8 @@ absl::Status XdsOverrideHostLb::UpdateLocked(UpdateArgs args) {
void XdsOverrideHostLb::MaybeUpdatePickerLocked() {
if (picker_ != nullptr) {
auto xds_override_host_picker = MakeRefCounted<Picker>(Ref(), picker_);
auto xds_override_host_picker = MakeRefCounted<Picker>(
Ref(), picker_, config_->override_host_status_set());
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO,
"[xds_override_host_lb %p] updating connectivity: state=%s "
@ -435,71 +504,100 @@ OrphanablePtr<LoadBalancingPolicy> XdsOverrideHostLb::CreateChildPolicyLocked(
return lb_policy;
}
void XdsOverrideHostLb::UpdateAddressMap(
const absl::StatusOr<ServerAddressList>& addresses) {
std::unordered_set<std::string> keys(addresses->size());
if (addresses.ok()) {
for (const auto& address : *addresses) {
auto key = grpc_sockaddr_to_string(&address.address(), false);
if (key.ok()) {
keys.insert(std::move(*key));
}
}
absl::StatusOr<ServerAddressList> XdsOverrideHostLb::UpdateAddressMap(
absl::StatusOr<ServerAddressList> addresses) {
if (!addresses.ok()) {
return addresses;
}
MutexLock lock(&subchannel_map_mu_);
for (auto it = subchannel_map_.begin(); it != subchannel_map_.end();) {
if (keys.find(it->first) == keys.end()) {
it = subchannel_map_.erase(it);
} else {
++it;
ServerAddressList return_value;
std::map<const std::string, XdsHealthStatus> addresses_for_map;
for (const auto& address : *addresses) {
XdsHealthStatus status = GetAddressHealthStatus(address);
if (status.status() != XdsHealthStatus::kDraining) {
return_value.push_back(address);
} else if (!config_->override_host_status_set().Contains(status)) {
// Skip draining hosts if not in the override status set.
continue;
}
auto key = grpc_sockaddr_to_string(&address.address(), false);
if (key.ok()) {
addresses_for_map.emplace(std::move(*key), status);
}
}
for (const auto& key : keys) {
if (subchannel_map_.find(key) == subchannel_map_.end()) {
subchannel_map_.emplace(key, SubchannelEntry());
{
MutexLock lock(&subchannel_map_mu_);
for (auto it = subchannel_map_.begin(); it != subchannel_map_.end();) {
if (addresses_for_map.find(it->first) == addresses_for_map.end()) {
it = subchannel_map_.erase(it);
} else {
++it;
}
}
for (const auto& key_status : addresses_for_map) {
auto it = subchannel_map_.find(key_status.first);
if (it == subchannel_map_.end()) {
subchannel_map_.emplace(std::piecewise_construct,
std::forward_as_tuple(key_status.first),
std::forward_as_tuple(key_status.second));
} else {
it->second.SetEdsHealthStatus(key_status.second);
}
}
}
return return_value;
}
RefCountedPtr<XdsOverrideHostLb::SubchannelWrapper>
XdsOverrideHostLb::AdoptSubchannel(
ServerAddress address, RefCountedPtr<SubchannelInterface> subchannel) {
auto subchannel_key = grpc_sockaddr_to_string(&address.address(), false);
absl::optional<std::string> key;
if (subchannel_key.ok()) {
key = std::move(*subchannel_key);
auto key = grpc_sockaddr_to_string(&address.address(), false);
if (!key.ok()) {
return subchannel;
}
auto wrapper =
MakeRefCounted<SubchannelWrapper>(std::move(subchannel), Ref(), key);
if (key.has_value()) {
MutexLock lock(&subchannel_map_mu_);
auto it = subchannel_map_.find(*key);
if (it != subchannel_map_.end()) {
it->second.SetSubchannel(wrapper.get());
}
MakeRefCounted<SubchannelWrapper>(std::move(subchannel), Ref(), *key);
MutexLock lock(&subchannel_map_mu_);
auto it = subchannel_map_.find(*key);
if (it != subchannel_map_.end()) {
it->second.SetSubchannel(wrapper.get());
}
return wrapper;
}
void XdsOverrideHostLb::ResetSubchannel(absl::string_view key,
void XdsOverrideHostLb::UnsetSubchannel(absl::string_view key,
SubchannelWrapper* subchannel) {
MutexLock lock(&subchannel_map_mu_);
auto it = subchannel_map_.find(key);
if (it != subchannel_map_.end()) {
it->second.ResetSubchannel(subchannel);
if (subchannel == it->second.GetSubchannel()) {
it->second.UnsetSubchannel();
}
}
}
RefCountedPtr<XdsOverrideHostLb::SubchannelWrapper>
XdsOverrideHostLb::GetSubchannelByAddress(absl::string_view address) {
XdsOverrideHostLb::GetSubchannelByAddress(
absl::string_view address, XdsHealthStatusSet overriden_health_statuses) {
MutexLock lock(&subchannel_map_mu_);
auto it = subchannel_map_.find(address);
if (it != subchannel_map_.end()) {
return it->second.GetSubchannel();
if (it != subchannel_map_.end() &&
overriden_health_statuses.Contains(it->second.eds_health_status())) {
return it->second.GetSubchannel()->Ref();
}
return nullptr;
}
void XdsOverrideHostLb::OnSubchannelConnectivityStateChange(
absl::string_view subchannel_key) {
auto it = subchannel_map_.find(subchannel_key);
if (it == subchannel_map_.end()) {
return;
}
if (it->second.eds_health_status().status() == XdsHealthStatus::kDraining) {
MaybeUpdatePickerLocked();
}
}
//
// XdsOverrideHostLb::Helper
//
@ -551,37 +649,70 @@ void XdsOverrideHostLb::Helper::AddTraceEvent(TraceSeverity severity,
XdsOverrideHostLb::SubchannelWrapper::SubchannelWrapper(
RefCountedPtr<SubchannelInterface> subchannel,
RefCountedPtr<XdsOverrideHostLb> policy,
absl::optional<const std::string> key)
RefCountedPtr<XdsOverrideHostLb> policy, absl::string_view key)
: DelegatingSubchannel(std::move(subchannel)),
key_(std::move(key)),
policy_(std::move(policy)) {}
key_(key),
policy_(std::move(policy)) {
auto watcher = std::make_unique<ConnectivityStateWatcher>(WeakRef());
watcher_ = watcher.get();
wrapped_subchannel()->WatchConnectivityState(std::move(watcher));
}
XdsOverrideHostLb::SubchannelWrapper::~SubchannelWrapper() {
if (key_.has_value()) {
policy_->ResetSubchannel(*key_, this);
policy_->UnsetSubchannel(*key_, this);
}
}
void XdsOverrideHostLb::SubchannelWrapper::WatchConnectivityState(
std::unique_ptr<ConnectivityStateWatcherInterface> watcher) {
auto watcher_id = watcher.get();
auto wrapper =
std::make_unique<ConnectivityStateWatcher>(std::move(watcher), Ref());
watchers_.emplace(watcher_id, wrapper.get());
wrapped_subchannel()->WatchConnectivityState(std::move(wrapper));
watchers_.insert(std::move(watcher));
}
void XdsOverrideHostLb::SubchannelWrapper::CancelConnectivityStateWatch(
ConnectivityStateWatcherInterface* watcher) {
auto original_watcher = watchers_.find(watcher);
if (original_watcher != watchers_.end()) {
wrapped_subchannel()->CancelConnectivityStateWatch(
original_watcher->second);
watchers_.erase(original_watcher);
auto it = watchers_.find(watcher);
if (it != watchers_.end()) {
watchers_.erase(it);
}
}
void XdsOverrideHostLb::SubchannelWrapper::UpdateConnectivityState(
grpc_connectivity_state state, absl::Status status) {
connectivity_state_.store(state);
// Sending connectivity state notifications to the watchers may cause the set
// of watchers to change, so we can't be iterating over the set of watchers
// while we send the notifications
std::vector<ConnectivityStateWatcherInterface*> watchers(watchers_.size());
for (const auto& watcher : watchers_) {
watchers.push_back(watcher.get());
}
for (const auto& watcher : watchers) {
if (watchers_.find(watcher) != watchers_.end()) {
watcher->OnConnectivityStateChange(state, status);
}
}
if (key_.has_value()) {
policy_->OnSubchannelConnectivityStateChange(*key_);
}
}
void XdsOverrideHostLb::SubchannelWrapper::Orphan() {
key_.reset();
wrapped_subchannel()->CancelConnectivityStateWatch(watcher_);
}
grpc_pollset_set* XdsOverrideHostLb::SubchannelWrapper::
ConnectivityStateWatcher::interested_parties() {
return subchannel_->policy_->interested_parties();
}
void XdsOverrideHostLb::SubchannelWrapper::ConnectivityStateWatcher::
OnConnectivityStateChange(grpc_connectivity_state state,
absl::Status status) {
subchannel_->UpdateConnectivityState(state, status);
}
//
// factory
//
@ -630,20 +761,46 @@ const JsonLoaderInterface* XdsOverrideHostLbConfig::JsonLoader(
return kJsonLoader;
}
void XdsOverrideHostLbConfig::JsonPostLoad(const Json& json, const JsonArgs&,
void XdsOverrideHostLbConfig::JsonPostLoad(const Json& json,
const JsonArgs& args,
ValidationErrors* errors) {
ValidationErrors::ScopedField field(errors, ".childPolicy");
auto it = json.object_value().find("childPolicy");
if (it == json.object_value().end()) {
errors->AddError("field not present");
} else {
auto child_policy_config =
CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
it->second);
if (!child_policy_config.ok()) {
errors->AddError(child_policy_config.status().message());
{
ValidationErrors::ScopedField field(errors, ".childPolicy");
auto it = json.object_value().find("childPolicy");
if (it == json.object_value().end()) {
errors->AddError("field not present");
} else {
auto child_policy_config = CoreConfiguration::Get()
.lb_policy_registry()
.ParseLoadBalancingConfig(it->second);
if (!child_policy_config.ok()) {
errors->AddError(child_policy_config.status().message());
} else {
child_config_ = std::move(*child_policy_config);
}
}
}
{
ValidationErrors::ScopedField field(errors, ".overrideHostStatus");
auto host_status_list = LoadJsonObjectField<std::vector<std::string>>(
json.object_value(), args, "overrideHostStatus", errors,
/*required=*/false);
if (host_status_list.has_value()) {
for (size_t i = 0; i < host_status_list->size(); ++i) {
const std::string& host_status = (*host_status_list)[i];
auto status = XdsHealthStatus::FromString(host_status);
if (!status.has_value()) {
ValidationErrors::ScopedField field(errors,
absl::StrCat("[", i, "]"));
errors->AddError("invalid host status");
} else {
override_host_status_set_.Add(*status);
}
}
} else {
child_config_ = std::move(*child_policy_config);
override_host_status_set_ = XdsHealthStatusSet(
{XdsHealthStatus(XdsHealthStatus::HealthStatus::kHealthy),
XdsHealthStatus(XdsHealthStatus::HealthStatus::kUnknown)});
}
}
}

@ -21,6 +21,7 @@
#include "absl/strings/string_view.h"
#include "src/core/ext/xds/xds_health_status.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/validation_errors.h"
#include "src/core/lib/json/json.h"
@ -49,12 +50,18 @@ class XdsOverrideHostLbConfig : public LoadBalancingPolicy::Config {
return child_config_;
}
XdsHealthStatusSet override_host_status_set() const {
return override_host_status_set_;
}
static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
void JsonPostLoad(const Json& json, const JsonArgs&,
ValidationErrors* errors);
private:
RefCountedPtr<LoadBalancingPolicy::Config> child_config_;
XdsHealthStatusSet override_host_status_set_;
};
} // namespace grpc_core
#endif // GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_XDS_OVERRIDE_HOST_H

@ -26,6 +26,7 @@
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "absl/types/span.h"
#include "src/core/lib/resolver/server_address.h"
@ -53,6 +54,32 @@ class XdsHealthStatus {
HealthStatus status_;
};
class XdsHealthStatusSet {
public:
XdsHealthStatusSet() = default;
explicit XdsHealthStatusSet(absl::Span<const XdsHealthStatus> statuses) {
for (XdsHealthStatus status : statuses) {
Add(status);
}
}
bool operator==(const XdsHealthStatusSet& other) const {
return status_mask_ == other.status_mask_;
}
void Clear() { status_mask_ = 0; }
void Add(XdsHealthStatus status) { status_mask_ |= (0x1 << status.status()); }
bool Contains(XdsHealthStatus status) const {
return status_mask_ & (0x1 << status.status());
}
private:
int status_mask_ = 0;
};
bool operator<(const XdsHealthStatus& hs1, const XdsHealthStatus& hs2);
class XdsEndpointHealthStatusAttribute

@ -790,6 +790,10 @@ class LoadBalancingPolicyTest : public ::testing::Test {
std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>*
subchannel_call_tracker = nullptr,
SourceLocation location = SourceLocation()) {
EXPECT_NE(picker, nullptr);
if (picker == nullptr) {
return absl::nullopt;
}
auto pick_result = DoPick(picker, call_attributes);
auto* complete = absl::get_if<LoadBalancingPolicy::PickResult::Complete>(
&pick_result.result);

@ -22,6 +22,7 @@
#include "src/core/ext/filters/client_channel/client_channel_service_config.h"
#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.h"
#include "src/core/ext/xds/xds_health_status.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/service_config/service_config.h"
@ -42,13 +43,16 @@ TEST(XdsOverrideHostConfigParsingTest, ValidConfig) {
" \"xds_override_host_experimental\":{\n"
" \"childPolicy\":[\n"
" {\"grpclb\":{}}\n"
" ]\n"
" ],\n"
" \"overrideHostStatus\": [\n"
" \"DRAINING\", \"HEALTHY\", \"UNKNOWN\""
" ]"
" }\n"
" }]\n"
"}\n";
auto service_config =
ServiceConfigImpl::Create(ChannelArgs(), service_config_json);
ASSERT_TRUE(service_config.ok());
ASSERT_TRUE(service_config.ok()) << service_config.status();
EXPECT_NE(*service_config, nullptr);
auto global_config = static_cast<ClientChannelGlobalParsedConfig*>(
(*service_config)
@ -60,6 +64,12 @@ TEST(XdsOverrideHostConfigParsingTest, ValidConfig) {
ASSERT_EQ(lb_config->name(), XdsOverrideHostLbConfig::Name());
auto override_host_lb_config =
static_cast<RefCountedPtr<XdsOverrideHostLbConfig>>(lb_config);
EXPECT_EQ(override_host_lb_config->override_host_status_set(),
XdsHealthStatusSet({
XdsHealthStatus(XdsHealthStatus::HealthStatus::kDraining),
XdsHealthStatus(XdsHealthStatus::HealthStatus::kHealthy),
XdsHealthStatus(XdsHealthStatus::HealthStatus::kUnknown),
}));
ASSERT_NE(override_host_lb_config->child_config(), nullptr);
ASSERT_EQ(override_host_lb_config->child_config()->name(), "grpclb");
}
@ -93,6 +103,72 @@ TEST(XdsOverrideHostConfigParsingTest, ValidConfigWithRR) {
ASSERT_EQ(override_host_lb_config->child_config()->name(), "round_robin");
}
TEST(XdsOverrideHostConfigParsingTest, ValidConfigNoDraining) {
const char* service_config_json =
"{\n"
" \"loadBalancingConfig\":[{\n"
" \"xds_override_host_experimental\":{\n"
" \"childPolicy\":[\n"
" {\"grpclb\":{}}\n"
" ],\n"
" \"overrideHostStatus\": [\n"
" \"HEALTHY\", \"UNKNOWN\""
" ]"
" }\n"
" }]\n"
"}\n";
auto service_config =
ServiceConfigImpl::Create(ChannelArgs(), service_config_json);
ASSERT_TRUE(service_config.ok());
EXPECT_NE(*service_config, nullptr);
auto global_config = static_cast<ClientChannelGlobalParsedConfig*>(
(*service_config)
->GetGlobalParsedConfig(
ClientChannelServiceConfigParser::ParserIndex()));
ASSERT_NE(global_config, nullptr);
auto lb_config = global_config->parsed_lb_config();
ASSERT_NE(lb_config, nullptr);
ASSERT_EQ(lb_config->name(), XdsOverrideHostLbConfig::Name());
auto override_host_lb_config =
static_cast<RefCountedPtr<XdsOverrideHostLbConfig>>(lb_config);
EXPECT_EQ(override_host_lb_config->override_host_status_set(),
XdsHealthStatusSet(
{XdsHealthStatus(XdsHealthStatus::HealthStatus::kHealthy),
XdsHealthStatus(XdsHealthStatus::HealthStatus::kUnknown)}));
ASSERT_NE(override_host_lb_config->child_config(), nullptr);
ASSERT_EQ(override_host_lb_config->child_config()->name(), "grpclb");
}
TEST(XdsOverrideHostConfigParsingTest, ValidConfigNoOverrideHostStatuses) {
const char* service_config_json =
"{\n"
" \"loadBalancingConfig\":[{\n"
" \"xds_override_host_experimental\":{\n"
" \"childPolicy\":[\n"
" {\"grpclb\":{}}\n"
" ]"
" }\n"
" }]\n"
"}\n";
auto service_config =
ServiceConfigImpl::Create(ChannelArgs(), service_config_json);
ASSERT_TRUE(service_config.ok());
EXPECT_NE(*service_config, nullptr);
auto global_config = static_cast<internal::ClientChannelGlobalParsedConfig*>(
(*service_config)->GetGlobalParsedConfig(0));
ASSERT_NE(global_config, nullptr);
auto lb_config = global_config->parsed_lb_config();
ASSERT_NE(lb_config, nullptr);
auto override_host_lb_config =
static_cast<RefCountedPtr<XdsOverrideHostLbConfig>>(lb_config);
EXPECT_EQ(override_host_lb_config->override_host_status_set(),
XdsHealthStatusSet(
{XdsHealthStatus(XdsHealthStatus::HealthStatus::kHealthy),
XdsHealthStatus(XdsHealthStatus::HealthStatus::kUnknown)}));
ASSERT_NE(override_host_lb_config->child_config(), nullptr);
EXPECT_EQ(override_host_lb_config->child_config()->name(), "grpclb");
}
TEST(XdsOverrideHostConfigParsingTest, ReportsMissingChildPolicyField) {
const char* service_config_json =
"{\n"
@ -151,6 +227,30 @@ TEST(XdsOverrideHostConfigParsingTest, ReportsEmptyChildPolicyArray) {
"error:errors validating xds_override_host LB policy config: "
"[field:childPolicy error:No known policies in list: ]]"));
}
TEST(XdsOverrideHostConfigParsingTest, UnrecognizedHostStatus) {
const char* service_config_json =
"{\n"
" \"loadBalancingConfig\":[{\n"
" \"xds_override_host_experimental\":{\n"
" \"childPolicy\":[\n"
" {\"grpclb\":{}}\n"
" ],\n"
" \"overrideHostStatus\": [\n"
" \"NOTASTATUS\""
" ]"
" }\n"
" }]\n"
"}\n";
auto service_config =
ServiceConfigImpl::Create(ChannelArgs(), service_config_json);
ASSERT_FALSE(service_config.ok()) << service_config.status();
EXPECT_EQ(service_config.status(),
absl::InvalidArgumentError(
"errors validating service config: [field:loadBalancingConfig "
"error:errors validating xds_override_host LB policy config: "
"[field:overrideHostStatus[0] error:invalid host status]]"));
}
} // namespace
} // namespace testing
} // namespace grpc_core

@ -18,9 +18,13 @@
#include <array>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "absl/types/span.h"
#include "gtest/gtest.h"
@ -28,36 +32,40 @@
#include <grpc/grpc.h>
#include "src/core/ext/filters/stateful_session/stateful_session_filter.h"
#include "src/core/ext/xds/xds_health_status.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/unique_type_name.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/load_balancing/lb_policy.h"
#include "src/core/lib/resolver/server_address.h"
#include "test/core/client_channel/lb_policy/lb_policy_test_lib.h"
#include "test/core/util/test_config.h"
namespace grpc_core {
namespace testing {
namespace {
class XdsOverrideHostTest : public LoadBalancingPolicyTest {
protected:
XdsOverrideHostTest()
: policy_(MakeLbPolicy("xds_override_host_experimental")) {}
RefCountedPtr<LoadBalancingPolicy::Config> MakeXdsOverrideHostConfig(
static RefCountedPtr<LoadBalancingPolicy::Config> MakeXdsOverrideHostConfig(
Json::Array override_host_status = {"UNKNOWN", "HEALTHY"},
std::string child_policy = "round_robin") {
Json::Object child_policy_config = {{child_policy, Json::Object()}};
return MakeConfig(Json::Array{Json::Object{
{"xds_override_host_experimental",
Json::Object{{"childPolicy", Json::Array{{child_policy_config}}}}}}});
Json::Object{{"childPolicy", Json::Array{{child_policy_config}}},
{"overrideHostStatus", override_host_status}}}}});
}
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>
ExpectStartupWithRoundRobin(absl::Span<const absl::string_view> addresses) {
ExpectStartupWithRoundRobin(absl::Span<const absl::string_view> addresses,
RefCountedPtr<LoadBalancingPolicy::Config>
config = MakeXdsOverrideHostConfig()) {
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker;
EXPECT_EQ(ApplyUpdate(BuildUpdate(addresses, MakeXdsOverrideHostConfig()),
policy_.get()),
EXPECT_EQ(ApplyUpdate(BuildUpdate(addresses, config), policy_.get()),
absl::OkStatus());
ExpectConnectingUpdate();
for (size_t i = 0; i < addresses.size(); ++i) {
@ -79,6 +87,31 @@ class XdsOverrideHostTest : public LoadBalancingPolicyTest {
return picker;
}
ServerAddress MakeAddressWithHealthStatus(
absl::string_view address, XdsHealthStatus::HealthStatus status) {
std::map<const char*, std::unique_ptr<ServerAddress::AttributeInterface>>
attrs;
attrs.emplace(XdsEndpointHealthStatusAttribute::kKey,
std::make_unique<XdsEndpointHealthStatusAttribute>(
XdsHealthStatus(status)));
return {MakeAddress(address), {}, std::move(attrs)};
}
void ApplyUpdateWithHealthStatuses(
absl::Span<const std::pair<const absl::string_view,
XdsHealthStatus::HealthStatus>>
addresses_and_statuses,
Json::Array override_host_status = {"UNKNOWN", "HEALTHY"}) {
LoadBalancingPolicy::UpdateArgs update;
update.config = MakeXdsOverrideHostConfig(std::move(override_host_status));
update.addresses.emplace();
for (auto address_and_status : addresses_and_statuses) {
update.addresses->push_back(MakeAddressWithHealthStatus(
address_and_status.first, address_and_status.second));
}
EXPECT_EQ(ApplyUpdate(update, policy_.get()), absl::OkStatus());
}
OrphanablePtr<LoadBalancingPolicy> policy_;
};
@ -213,6 +246,174 @@ TEST_F(XdsOverrideHostTest, ConnectingSubchannelIsQueued) {
picker = ExpectState(GRPC_CHANNEL_READY);
ExpectPickQueued(picker.get(), pick_arg);
}
TEST_F(XdsOverrideHostTest, DrainingState) {
// Send address list to LB policy.
const std::array<absl::string_view, 3> kAddresses = {
"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
ASSERT_NE(ExpectStartupWithRoundRobin(kAddresses), nullptr);
ApplyUpdateWithHealthStatuses(
{{kAddresses[0], XdsHealthStatus::HealthStatus::kUnknown},
{kAddresses[1], XdsHealthStatus::HealthStatus::kDraining},
{kAddresses[2], XdsHealthStatus::HealthStatus::kHealthy}},
{"UNKNOWN", "HEALTHY", "DRAINING"});
auto picker = ExpectState(GRPC_CHANNEL_READY);
ASSERT_NE(picker, nullptr);
ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
ExpectQueueEmpty();
// Draining subchannel is returned
std::map<UniqueTypeName, std::string> pick_arg{
{XdsOverrideHostTypeName(), "127.0.0.1:442"}};
EXPECT_EQ(ExpectPickComplete(picker.get(), pick_arg), kAddresses[1]);
ApplyUpdateWithHealthStatuses(
{{kAddresses[0], XdsHealthStatus::HealthStatus::kUnknown},
{kAddresses[2], XdsHealthStatus::HealthStatus::kHealthy}});
picker = ExpectState(GRPC_CHANNEL_READY);
ASSERT_NE(picker, nullptr);
// Gone!
ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]}, pick_arg);
}
TEST_F(XdsOverrideHostTest, DrainingSubchannelIsConnecting) {
// Send address list to LB policy.
const std::array<absl::string_view, 3> kAddresses = {
"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
auto picker = ExpectStartupWithRoundRobin(kAddresses);
ASSERT_NE(picker, nullptr);
// Check that the host is overridden
std::map<UniqueTypeName, std::string> pick_arg{
{XdsOverrideHostTypeName(), "127.0.0.1:442"}};
EXPECT_EQ(ExpectPickComplete(picker.get(), pick_arg), kAddresses[1]);
ApplyUpdateWithHealthStatuses(
{{kAddresses[0], XdsHealthStatus::HealthStatus::kUnknown},
{kAddresses[1], XdsHealthStatus::HealthStatus::kDraining},
{kAddresses[2], XdsHealthStatus::HealthStatus::kHealthy}},
{"UNKNOWN", "HEALTHY", "DRAINING"});
auto subchannel = FindSubchannel(kAddresses[1]);
ASSERT_NE(subchannel, nullptr);
// There are two notifications - one from child policy and one from the parent
// policy due to draining channel update
picker = ExpectState(GRPC_CHANNEL_READY);
EXPECT_EQ(ExpectPickComplete(picker.get(), pick_arg), kAddresses[1]);
ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
subchannel->SetConnectivityState(GRPC_CHANNEL_IDLE);
picker = ExpectState(GRPC_CHANNEL_READY);
ExpectPickQueued(picker.get(), pick_arg);
ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
EXPECT_TRUE(subchannel->ConnectionRequested());
ExpectQueueEmpty();
subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
picker = ExpectState(GRPC_CHANNEL_READY);
ExpectPickQueued(picker.get(), pick_arg);
ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
picker = ExpectState(GRPC_CHANNEL_READY);
EXPECT_EQ(ExpectPickComplete(picker.get(), pick_arg), kAddresses[1]);
ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
}
TEST_F(XdsOverrideHostTest, DrainingToHealthy) {
// Send address list to LB policy.
const std::array<absl::string_view, 3> kAddresses = {
"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
ASSERT_NE(ExpectStartupWithRoundRobin(kAddresses), nullptr);
ApplyUpdateWithHealthStatuses(
{{kAddresses[0], XdsHealthStatus::HealthStatus::kUnknown},
{kAddresses[1], XdsHealthStatus::HealthStatus::kDraining},
{kAddresses[2], XdsHealthStatus::HealthStatus::kHealthy}},
{"UNKNOWN", "HEALTHY", "DRAINING"});
auto picker = ExpectState(GRPC_CHANNEL_READY);
ASSERT_NE(picker, nullptr);
ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
ExpectQueueEmpty();
std::map<UniqueTypeName, std::string> pick_arg{
{XdsOverrideHostTypeName(), "127.0.0.1:442"}};
EXPECT_EQ(ExpectPickComplete(picker.get(), pick_arg), kAddresses[1]);
ApplyUpdateWithHealthStatuses(
{{kAddresses[0], XdsHealthStatus::HealthStatus::kHealthy},
{kAddresses[1], XdsHealthStatus::HealthStatus::kHealthy},
{kAddresses[2], XdsHealthStatus::HealthStatus::kHealthy}},
{"UNKNOWN", "HEALTHY", "DRAINING"});
picker = ExpectState(GRPC_CHANNEL_READY);
ASSERT_NE(picker, nullptr);
EXPECT_EQ(ExpectPickComplete(picker.get(), pick_arg), kAddresses[1]);
EXPECT_EQ(ExpectPickComplete(picker.get(), pick_arg), kAddresses[1]);
}
TEST_F(XdsOverrideHostTest, OverrideHostStatus) {
const std::array<absl::string_view, 3> kAddresses = {
"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
ASSERT_NE(ExpectStartupWithRoundRobin(kAddresses), nullptr);
ApplyUpdateWithHealthStatuses(
{{kAddresses[0], XdsHealthStatus::HealthStatus::kUnknown},
{kAddresses[1], XdsHealthStatus::HealthStatus::kHealthy},
{kAddresses[2], XdsHealthStatus::HealthStatus::kDraining}},
{"UNKNOWN", "HEALTHY", "DRAINING"});
auto picker = ExpectState(GRPC_CHANNEL_READY);
ASSERT_NE(picker, nullptr);
ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]});
EXPECT_EQ(ExpectPickComplete(picker.get(),
{{XdsOverrideHostTypeName(), "127.0.0.1:441"}}),
kAddresses[0]);
EXPECT_EQ(ExpectPickComplete(picker.get(),
{{XdsOverrideHostTypeName(), "127.0.0.1:442"}}),
kAddresses[1]);
EXPECT_EQ(ExpectPickComplete(picker.get(),
{{XdsOverrideHostTypeName(), "127.0.0.1:443"}}),
kAddresses[2]);
// UNKNOWN excluded - first chanel does not get overridden
ApplyUpdateWithHealthStatuses(
{{kAddresses[0], XdsHealthStatus::HealthStatus::kUnknown},
{kAddresses[1], XdsHealthStatus::HealthStatus::kHealthy},
{kAddresses[2], XdsHealthStatus::HealthStatus::kDraining}},
{"HEALTHY", "DRAINING"});
picker = ExpectState(GRPC_CHANNEL_READY);
ASSERT_NE(picker, nullptr);
ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]});
ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]},
{{XdsOverrideHostTypeName(), "127.0.0.1:441"}});
EXPECT_EQ(ExpectPickComplete(picker.get(),
{{XdsOverrideHostTypeName(), "127.0.0.1:442"}}),
kAddresses[1]);
EXPECT_EQ(ExpectPickComplete(picker.get(),
{{XdsOverrideHostTypeName(), "127.0.0.1:443"}}),
kAddresses[2]);
// HEALTHY excluded - second chanel does not get overridden
ApplyUpdateWithHealthStatuses(
{{kAddresses[0], XdsHealthStatus::HealthStatus::kUnknown},
{kAddresses[1], XdsHealthStatus::HealthStatus::kHealthy},
{kAddresses[2], XdsHealthStatus::HealthStatus::kDraining}},
{"UNKNOWN", "HEALTHY"});
picker = ExpectState(GRPC_CHANNEL_READY);
ASSERT_NE(picker, nullptr);
ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]});
EXPECT_EQ(ExpectPickComplete(picker.get(),
{{XdsOverrideHostTypeName(), "127.0.0.1:441"}}),
kAddresses[0]);
EXPECT_EQ(ExpectPickComplete(picker.get(),
{{XdsOverrideHostTypeName(), "127.0.0.1:442"}}),
kAddresses[1]);
ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]},
{{XdsOverrideHostTypeName(), "127.0.0.1:443"}});
// DRAINING excluded - third chanel does not get overridden
ApplyUpdateWithHealthStatuses(
{{kAddresses[0], XdsHealthStatus::HealthStatus::kUnknown},
{kAddresses[1], XdsHealthStatus::HealthStatus::kHealthy},
{kAddresses[2], XdsHealthStatus::HealthStatus::kDraining}},
{"UNKNOWN", "HEALTHY"});
picker = ExpectState(GRPC_CHANNEL_READY);
ASSERT_NE(picker, nullptr);
ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]});
EXPECT_EQ(ExpectPickComplete(picker.get(),
{{XdsOverrideHostTypeName(), "127.0.0.1:441"}}),
kAddresses[0]);
EXPECT_EQ(ExpectPickComplete(picker.get(),
{{XdsOverrideHostTypeName(), "127.0.0.1:442"}}),
kAddresses[1]);
ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]},
{{XdsOverrideHostTypeName(), "127.0.0.1:443"}});
}
} // namespace
} // namespace testing
} // namespace grpc_core

Loading…
Cancel
Save