Revert "xds_override_host LB: add support for draining state (#31985)" (#32189)

This reverts commit e8e9514a11.
pull/32203/head
AJ Heller 2 years ago committed by GitHub
parent 3980ed706a
commit 5222eafdc1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      src/core/BUILD
  2. 367
      src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc
  3. 7
      src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.h
  4. 27
      src/core/ext/xds/xds_health_status.h
  5. 4
      test/core/client_channel/lb_policy/lb_policy_test_lib.h
  6. 104
      test/core/client_channel/lb_policy/xds_override_host_lb_config_parser_test.cc
  7. 213
      test/core/client_channel/lb_policy/xds_override_host_test.cc

@ -4593,7 +4593,6 @@ grpc_cc_library(
"closure",
"error",
"grpc_stateful_session_filter",
"grpc_xds_client",
"iomgr_fwd",
"json",
"json_args",
@ -4601,7 +4600,6 @@ grpc_cc_library(
"lb_policy",
"lb_policy_factory",
"lb_policy_registry",
"match",
"pollset_set",
"subchannel_interface",
"validation_errors",

@ -18,23 +18,18 @@
#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.h"
#include <stddef.h>
#include <algorithm>
#include <atomic>
#include <functional>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <tuple>
#include <unordered_set>
#include <utility>
#include <vector>
#include "absl/base/thread_annotations.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "absl/types/variant.h"
@ -46,13 +41,11 @@
#include "src/core/ext/filters/client_channel/lb_call_state_internal.h"
#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
#include "src/core/ext/filters/stateful_session/stateful_session_filter.h"
#include "src/core/ext/xds/xds_health_status.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/match.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
@ -77,31 +70,6 @@ namespace grpc_core {
TraceFlag grpc_lb_xds_override_host_trace(false, "xds_override_host_lb");
namespace {
template <typename Value>
struct PtrLessThan {
using is_transparent = void;
bool operator()(const std::unique_ptr<Value>& v1,
const std::unique_ptr<Value>& v2) const {
return v1 < v2;
}
bool operator()(const Value* v1, const Value* v2) const { return v1 < v2; }
bool operator()(const Value* v1, const std::unique_ptr<Value>& v2) const {
return v1 < v2.get();
}
bool operator()(const std::unique_ptr<Value>& v1, const Value* v2) const {
return v1.get() < v2;
}
};
XdsHealthStatus GetAddressHealthStatus(const ServerAddress& address) {
auto attribute = address.GetAttribute(XdsEndpointHealthStatusAttribute::kKey);
if (attribute == nullptr) {
return XdsHealthStatus(XdsHealthStatus::HealthStatus::kUnknown);
}
return static_cast<const XdsEndpointHealthStatusAttribute*>(attribute)
->status();
}
//
// xds_override_host LB policy
@ -123,7 +91,7 @@ class XdsOverrideHostLb : public LoadBalancingPolicy {
public:
SubchannelWrapper(RefCountedPtr<SubchannelInterface> subchannel,
RefCountedPtr<XdsOverrideHostLb> policy,
absl::string_view key);
absl::optional<const std::string> key);
~SubchannelWrapper() override;
@ -142,32 +110,31 @@ class XdsOverrideHostLb : public LoadBalancingPolicy {
private:
class ConnectivityStateWatcher : public ConnectivityStateWatcherInterface {
public:
explicit ConnectivityStateWatcher(
WeakRefCountedPtr<SubchannelWrapper> subchannel)
: subchannel_(std::move(subchannel)) {}
ConnectivityStateWatcher(
std::unique_ptr<ConnectivityStateWatcherInterface> delegate,
RefCountedPtr<SubchannelWrapper> subchannel)
: delegate_(std::move(delegate)), subchannel_(subchannel) {}
void OnConnectivityStateChange(grpc_connectivity_state state,
absl::Status status) override;
absl::Status status) override {
delegate_->OnConnectivityStateChange(state, status);
subchannel_->connectivity_state_ = state;
}
grpc_pollset_set* interested_parties() override;
grpc_pollset_set* interested_parties() override {
return delegate_->interested_parties();
}
private:
WeakRefCountedPtr<SubchannelWrapper> subchannel_;
std::unique_ptr<ConnectivityStateWatcherInterface> delegate_;
RefCountedPtr<SubchannelWrapper> subchannel_;
};
void Orphan() override;
void UpdateConnectivityState(grpc_connectivity_state state,
absl::Status status);
ConnectivityStateWatcher* watcher_;
absl::optional<std::string> key_;
const absl::optional<const std::string> key_;
RefCountedPtr<XdsOverrideHostLb> policy_;
std::set<std::unique_ptr<ConnectivityStateWatcherInterface>,
PtrLessThan<ConnectivityStateWatcherInterface>>
std::atomic<grpc_connectivity_state> connectivity_state_{GRPC_CHANNEL_IDLE};
std::map<ConnectivityStateWatcherInterface*, ConnectivityStateWatcher*>
watchers_;
std::atomic<grpc_connectivity_state> connectivity_state_ = {
GRPC_CHANNEL_IDLE};
};
// A picker that wraps the picker from the child for cases when cookie is
@ -175,8 +142,7 @@ class XdsOverrideHostLb : public LoadBalancingPolicy {
class Picker : public SubchannelPicker {
public:
Picker(RefCountedPtr<XdsOverrideHostLb> xds_override_host_lb,
RefCountedPtr<SubchannelPicker> picker,
XdsHealthStatusSet override_host_health_status_set);
RefCountedPtr<SubchannelPicker> picker);
PickResult Pick(PickArgs args) override;
@ -193,9 +159,11 @@ class XdsOverrideHostLb : public LoadBalancingPolicy {
}
private:
XdsOverrideHostLb* policy() { return subchannel_->policy(); }
static void RunInExecCtx(void* arg, grpc_error_handle /*error*/) {
auto* self = static_cast<SubchannelConnectionRequester*>(arg);
self->subchannel_->policy()->work_serializer()->Run(
self->policy()->work_serializer()->Run(
[self]() {
self->subchannel_->RequestConnection();
delete self;
@ -212,7 +180,6 @@ class XdsOverrideHostLb : public LoadBalancingPolicy {
RefCountedPtr<XdsOverrideHostLb> policy_;
RefCountedPtr<SubchannelPicker> picker_;
XdsHealthStatusSet override_host_health_status_set_;
};
class Helper : public ChannelControlHelper {
@ -240,49 +207,25 @@ class XdsOverrideHostLb : public LoadBalancingPolicy {
class SubchannelEntry {
public:
explicit SubchannelEntry(XdsHealthStatus eds_health_status)
: eds_health_status_(eds_health_status) {}
void SetSubchannel(SubchannelWrapper* subchannel) {
if (eds_health_status_.status() == XdsHealthStatus::kDraining) {
subchannel_ = subchannel->Ref();
} else {
subchannel_ = subchannel;
}
subchannel_ = subchannel;
}
void UnsetSubchannel() { subchannel_ = nullptr; }
SubchannelWrapper* GetSubchannel() const {
return Match(
subchannel_,
[](XdsOverrideHostLb::SubchannelWrapper* subchannel) {
return subchannel;
},
[](RefCountedPtr<XdsOverrideHostLb::SubchannelWrapper> subchannel) {
return subchannel.get();
});
void ResetSubchannel(SubchannelWrapper* expected) {
if (subchannel_ == expected) {
subchannel_ = nullptr;
}
}
void SetEdsHealthStatus(XdsHealthStatus eds_health_status) {
eds_health_status_ = eds_health_status;
auto subchannel = GetSubchannel();
if (subchannel == nullptr) {
return;
}
if (eds_health_status_.status() == XdsHealthStatus::kDraining) {
subchannel_ = subchannel->Ref();
} else {
subchannel_ = subchannel;
RefCountedPtr<SubchannelWrapper> GetSubchannel() {
if (subchannel_ == nullptr) {
return nullptr;
}
return subchannel_->Ref();
}
XdsHealthStatus eds_health_status() const { return eds_health_status_; }
private:
absl::variant<SubchannelWrapper*, RefCountedPtr<SubchannelWrapper>>
subchannel_;
XdsHealthStatus eds_health_status_;
SubchannelWrapper* subchannel_ = nullptr;
};
~XdsOverrideHostLb() override;
@ -294,21 +237,17 @@ class XdsOverrideHostLb : public LoadBalancingPolicy {
void MaybeUpdatePickerLocked();
absl::StatusOr<ServerAddressList> UpdateAddressMap(
absl::StatusOr<ServerAddressList> addresses);
RefCountedPtr<SubchannelWrapper> LookupSubchannel(absl::string_view address);
void UpdateAddressMap(const absl::StatusOr<ServerAddressList>& addresses);
RefCountedPtr<SubchannelWrapper> AdoptSubchannel(
ServerAddress address, RefCountedPtr<SubchannelInterface> subchannel);
void UnsetSubchannel(absl::string_view key, SubchannelWrapper* subchannel);
void ResetSubchannel(absl::string_view key, SubchannelWrapper* subchannel);
RefCountedPtr<SubchannelWrapper> GetSubchannelByAddress(
absl::string_view address, XdsHealthStatusSet overriden_health_statuses);
void OnSubchannelConnectivityStateChange(absl::string_view subchannel_key)
ABSL_NO_THREAD_SAFETY_ANALYSIS; // Called from within the worker
// serializer and does not require
// additional synchronization
absl::string_view address);
// Current config from the resolver.
RefCountedPtr<XdsOverrideHostLbConfig> config_;
@ -333,11 +272,8 @@ class XdsOverrideHostLb : public LoadBalancingPolicy {
XdsOverrideHostLb::Picker::Picker(
RefCountedPtr<XdsOverrideHostLb> xds_override_host_lb,
RefCountedPtr<SubchannelPicker> picker,
XdsHealthStatusSet override_host_health_status_set)
: policy_(std::move(xds_override_host_lb)),
picker_(std::move(picker)),
override_host_health_status_set_(override_host_health_status_set) {
RefCountedPtr<SubchannelPicker> picker)
: policy_(std::move(xds_override_host_lb)), picker_(std::move(picker)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO, "[xds_override_host_lb %p] constructed new picker %p",
policy_.get(), this);
@ -349,8 +285,7 @@ XdsOverrideHostLb::Picker::PickOverridenHost(absl::string_view override_host) {
if (override_host.length() == 0) {
return absl::nullopt;
}
auto subchannel = policy_->GetSubchannelByAddress(
override_host, override_host_health_status_set_);
auto subchannel = policy_->GetSubchannelByAddress(override_host);
if (subchannel == nullptr) {
return absl::nullopt;
}
@ -413,10 +348,6 @@ void XdsOverrideHostLb::ShutdownLocked() {
gpr_log(GPR_INFO, "[xds_override_host_lb %p] shutting down", this);
}
shutting_down_ = true;
{
MutexLock lock(&subchannel_map_mu_);
subchannel_map_.clear();
}
// Remove the child policy's interested_parties pollset_set from the
// xDS policy.
if (child_policy_ != nullptr) {
@ -451,9 +382,10 @@ absl::Status XdsOverrideHostLb::UpdateLocked(UpdateArgs args) {
if (child_policy_ == nullptr) {
child_policy_ = CreateChildPolicyLocked(args.args);
}
UpdateAddressMap(args.addresses);
// Update child policy.
UpdateArgs update_args;
update_args.addresses = UpdateAddressMap(std::move(args.addresses));
update_args.addresses = std::move(args.addresses);
update_args.resolution_note = std::move(args.resolution_note);
update_args.config = config_->child_config();
update_args.args = std::move(args.args);
@ -467,8 +399,7 @@ absl::Status XdsOverrideHostLb::UpdateLocked(UpdateArgs args) {
void XdsOverrideHostLb::MaybeUpdatePickerLocked() {
if (picker_ != nullptr) {
auto xds_override_host_picker = MakeRefCounted<Picker>(
Ref(), picker_, config_->override_host_status_set());
auto xds_override_host_picker = MakeRefCounted<Picker>(Ref(), picker_);
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO,
"[xds_override_host_lb %p] updating connectivity: state=%s "
@ -504,100 +435,71 @@ OrphanablePtr<LoadBalancingPolicy> XdsOverrideHostLb::CreateChildPolicyLocked(
return lb_policy;
}
absl::StatusOr<ServerAddressList> XdsOverrideHostLb::UpdateAddressMap(
absl::StatusOr<ServerAddressList> addresses) {
if (!addresses.ok()) {
return addresses;
}
ServerAddressList return_value;
std::map<const std::string, XdsHealthStatus> addresses_for_map;
for (const auto& address : *addresses) {
XdsHealthStatus status = GetAddressHealthStatus(address);
if (status.status() != XdsHealthStatus::kDraining) {
return_value.push_back(address);
} else if (!config_->override_host_status_set().Contains(status)) {
// Skip draining hosts if not in the override status set.
continue;
}
auto key = grpc_sockaddr_to_string(&address.address(), false);
if (key.ok()) {
addresses_for_map.emplace(std::move(*key), status);
void XdsOverrideHostLb::UpdateAddressMap(
const absl::StatusOr<ServerAddressList>& addresses) {
std::unordered_set<std::string> keys(addresses->size());
if (addresses.ok()) {
for (const auto& address : *addresses) {
auto key = grpc_sockaddr_to_string(&address.address(), false);
if (key.ok()) {
keys.insert(std::move(*key));
}
}
}
{
MutexLock lock(&subchannel_map_mu_);
for (auto it = subchannel_map_.begin(); it != subchannel_map_.end();) {
if (addresses_for_map.find(it->first) == addresses_for_map.end()) {
it = subchannel_map_.erase(it);
} else {
++it;
}
MutexLock lock(&subchannel_map_mu_);
for (auto it = subchannel_map_.begin(); it != subchannel_map_.end();) {
if (keys.find(it->first) == keys.end()) {
it = subchannel_map_.erase(it);
} else {
++it;
}
for (const auto& key_status : addresses_for_map) {
auto it = subchannel_map_.find(key_status.first);
if (it == subchannel_map_.end()) {
subchannel_map_.emplace(std::piecewise_construct,
std::forward_as_tuple(key_status.first),
std::forward_as_tuple(key_status.second));
} else {
it->second.SetEdsHealthStatus(key_status.second);
}
}
for (const auto& key : keys) {
if (subchannel_map_.find(key) == subchannel_map_.end()) {
subchannel_map_.emplace(key, SubchannelEntry());
}
}
return return_value;
}
RefCountedPtr<XdsOverrideHostLb::SubchannelWrapper>
XdsOverrideHostLb::AdoptSubchannel(
ServerAddress address, RefCountedPtr<SubchannelInterface> subchannel) {
auto key = grpc_sockaddr_to_string(&address.address(), false);
if (!key.ok()) {
return subchannel;
auto subchannel_key = grpc_sockaddr_to_string(&address.address(), false);
absl::optional<std::string> key;
if (subchannel_key.ok()) {
key = std::move(*subchannel_key);
}
auto wrapper =
MakeRefCounted<SubchannelWrapper>(std::move(subchannel), Ref(), *key);
MutexLock lock(&subchannel_map_mu_);
auto it = subchannel_map_.find(*key);
if (it != subchannel_map_.end()) {
it->second.SetSubchannel(wrapper.get());
MakeRefCounted<SubchannelWrapper>(std::move(subchannel), Ref(), key);
if (key.has_value()) {
MutexLock lock(&subchannel_map_mu_);
auto it = subchannel_map_.find(*key);
if (it != subchannel_map_.end()) {
it->second.SetSubchannel(wrapper.get());
}
}
return wrapper;
}
void XdsOverrideHostLb::UnsetSubchannel(absl::string_view key,
void XdsOverrideHostLb::ResetSubchannel(absl::string_view key,
SubchannelWrapper* subchannel) {
MutexLock lock(&subchannel_map_mu_);
auto it = subchannel_map_.find(key);
if (it != subchannel_map_.end()) {
if (subchannel == it->second.GetSubchannel()) {
it->second.UnsetSubchannel();
}
it->second.ResetSubchannel(subchannel);
}
}
RefCountedPtr<XdsOverrideHostLb::SubchannelWrapper>
XdsOverrideHostLb::GetSubchannelByAddress(
absl::string_view address, XdsHealthStatusSet overriden_health_statuses) {
XdsOverrideHostLb::GetSubchannelByAddress(absl::string_view address) {
MutexLock lock(&subchannel_map_mu_);
auto it = subchannel_map_.find(address);
if (it != subchannel_map_.end() &&
overriden_health_statuses.Contains(it->second.eds_health_status())) {
return it->second.GetSubchannel()->Ref();
if (it != subchannel_map_.end()) {
return it->second.GetSubchannel();
}
return nullptr;
}
void XdsOverrideHostLb::OnSubchannelConnectivityStateChange(
absl::string_view subchannel_key) {
auto it = subchannel_map_.find(subchannel_key);
if (it == subchannel_map_.end()) {
return;
}
if (it->second.eds_health_status().status() == XdsHealthStatus::kDraining) {
MaybeUpdatePickerLocked();
}
}
//
// XdsOverrideHostLb::Helper
//
@ -649,70 +551,37 @@ void XdsOverrideHostLb::Helper::AddTraceEvent(TraceSeverity severity,
XdsOverrideHostLb::SubchannelWrapper::SubchannelWrapper(
RefCountedPtr<SubchannelInterface> subchannel,
RefCountedPtr<XdsOverrideHostLb> policy, absl::string_view key)
RefCountedPtr<XdsOverrideHostLb> policy,
absl::optional<const std::string> key)
: DelegatingSubchannel(std::move(subchannel)),
key_(key),
policy_(std::move(policy)) {
auto watcher = std::make_unique<ConnectivityStateWatcher>(WeakRef());
watcher_ = watcher.get();
wrapped_subchannel()->WatchConnectivityState(std::move(watcher));
}
key_(std::move(key)),
policy_(std::move(policy)) {}
XdsOverrideHostLb::SubchannelWrapper::~SubchannelWrapper() {
if (key_.has_value()) {
policy_->UnsetSubchannel(*key_, this);
policy_->ResetSubchannel(*key_, this);
}
}
void XdsOverrideHostLb::SubchannelWrapper::WatchConnectivityState(
std::unique_ptr<ConnectivityStateWatcherInterface> watcher) {
watchers_.insert(std::move(watcher));
auto watcher_id = watcher.get();
auto wrapper =
std::make_unique<ConnectivityStateWatcher>(std::move(watcher), Ref());
watchers_.emplace(watcher_id, wrapper.get());
wrapped_subchannel()->WatchConnectivityState(std::move(wrapper));
}
void XdsOverrideHostLb::SubchannelWrapper::CancelConnectivityStateWatch(
ConnectivityStateWatcherInterface* watcher) {
auto it = watchers_.find(watcher);
if (it != watchers_.end()) {
watchers_.erase(it);
}
}
void XdsOverrideHostLb::SubchannelWrapper::UpdateConnectivityState(
grpc_connectivity_state state, absl::Status status) {
connectivity_state_.store(state);
// Sending connectivity state notifications to the watchers may cause the set
// of watchers to change, so we can't be iterating over the set of watchers
// while we send the notifications
std::vector<ConnectivityStateWatcherInterface*> watchers(watchers_.size());
for (const auto& watcher : watchers_) {
watchers.push_back(watcher.get());
}
for (const auto& watcher : watchers) {
if (watchers_.find(watcher) != watchers_.end()) {
watcher->OnConnectivityStateChange(state, status);
}
}
if (key_.has_value()) {
policy_->OnSubchannelConnectivityStateChange(*key_);
auto original_watcher = watchers_.find(watcher);
if (original_watcher != watchers_.end()) {
wrapped_subchannel()->CancelConnectivityStateWatch(
original_watcher->second);
watchers_.erase(original_watcher);
}
}
void XdsOverrideHostLb::SubchannelWrapper::Orphan() {
key_.reset();
wrapped_subchannel()->CancelConnectivityStateWatch(watcher_);
}
grpc_pollset_set* XdsOverrideHostLb::SubchannelWrapper::
ConnectivityStateWatcher::interested_parties() {
return subchannel_->policy_->interested_parties();
}
void XdsOverrideHostLb::SubchannelWrapper::ConnectivityStateWatcher::
OnConnectivityStateChange(grpc_connectivity_state state,
absl::Status status) {
subchannel_->UpdateConnectivityState(state, status);
}
//
// factory
//
@ -761,46 +630,20 @@ const JsonLoaderInterface* XdsOverrideHostLbConfig::JsonLoader(
return kJsonLoader;
}
void XdsOverrideHostLbConfig::JsonPostLoad(const Json& json,
const JsonArgs& args,
void XdsOverrideHostLbConfig::JsonPostLoad(const Json& json, const JsonArgs&,
ValidationErrors* errors) {
{
ValidationErrors::ScopedField field(errors, ".childPolicy");
auto it = json.object_value().find("childPolicy");
if (it == json.object_value().end()) {
errors->AddError("field not present");
} else {
auto child_policy_config = CoreConfiguration::Get()
.lb_policy_registry()
.ParseLoadBalancingConfig(it->second);
if (!child_policy_config.ok()) {
errors->AddError(child_policy_config.status().message());
} else {
child_config_ = std::move(*child_policy_config);
}
}
}
{
ValidationErrors::ScopedField field(errors, ".overrideHostStatus");
auto host_status_list = LoadJsonObjectField<std::vector<std::string>>(
json.object_value(), args, "overrideHostStatus", errors,
/*required=*/false);
if (host_status_list.has_value()) {
for (size_t i = 0; i < host_status_list->size(); ++i) {
const std::string& host_status = (*host_status_list)[i];
auto status = XdsHealthStatus::FromString(host_status);
if (!status.has_value()) {
ValidationErrors::ScopedField field(errors,
absl::StrCat("[", i, "]"));
errors->AddError("invalid host status");
} else {
override_host_status_set_.Add(*status);
}
}
ValidationErrors::ScopedField field(errors, ".childPolicy");
auto it = json.object_value().find("childPolicy");
if (it == json.object_value().end()) {
errors->AddError("field not present");
} else {
auto child_policy_config =
CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
it->second);
if (!child_policy_config.ok()) {
errors->AddError(child_policy_config.status().message());
} else {
override_host_status_set_ = XdsHealthStatusSet(
{XdsHealthStatus(XdsHealthStatus::HealthStatus::kHealthy),
XdsHealthStatus(XdsHealthStatus::HealthStatus::kUnknown)});
child_config_ = std::move(*child_policy_config);
}
}
}

@ -21,7 +21,6 @@
#include "absl/strings/string_view.h"
#include "src/core/ext/xds/xds_health_status.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/validation_errors.h"
#include "src/core/lib/json/json.h"
@ -50,18 +49,12 @@ class XdsOverrideHostLbConfig : public LoadBalancingPolicy::Config {
return child_config_;
}
XdsHealthStatusSet override_host_status_set() const {
return override_host_status_set_;
}
static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
void JsonPostLoad(const Json& json, const JsonArgs&,
ValidationErrors* errors);
private:
RefCountedPtr<LoadBalancingPolicy::Config> child_config_;
XdsHealthStatusSet override_host_status_set_;
};
} // namespace grpc_core
#endif // GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_XDS_OVERRIDE_HOST_H

@ -26,7 +26,6 @@
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "absl/types/span.h"
#include "src/core/lib/resolver/server_address.h"
@ -54,32 +53,6 @@ class XdsHealthStatus {
HealthStatus status_;
};
class XdsHealthStatusSet {
public:
XdsHealthStatusSet() = default;
explicit XdsHealthStatusSet(absl::Span<const XdsHealthStatus> statuses) {
for (XdsHealthStatus status : statuses) {
Add(status);
}
}
bool operator==(const XdsHealthStatusSet& other) const {
return status_mask_ == other.status_mask_;
}
void Clear() { status_mask_ = 0; }
void Add(XdsHealthStatus status) { status_mask_ |= (0x1 << status.status()); }
bool Contains(XdsHealthStatus status) const {
return status_mask_ & (0x1 << status.status());
}
private:
int status_mask_ = 0;
};
bool operator<(const XdsHealthStatus& hs1, const XdsHealthStatus& hs2);
class XdsEndpointHealthStatusAttribute

@ -790,10 +790,6 @@ class LoadBalancingPolicyTest : public ::testing::Test {
std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>*
subchannel_call_tracker = nullptr,
SourceLocation location = SourceLocation()) {
EXPECT_NE(picker, nullptr);
if (picker == nullptr) {
return absl::nullopt;
}
auto pick_result = DoPick(picker, call_attributes);
auto* complete = absl::get_if<LoadBalancingPolicy::PickResult::Complete>(
&pick_result.result);

@ -22,7 +22,6 @@
#include "src/core/ext/filters/client_channel/client_channel_service_config.h"
#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.h"
#include "src/core/ext/xds/xds_health_status.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/service_config/service_config.h"
@ -43,16 +42,13 @@ TEST(XdsOverrideHostConfigParsingTest, ValidConfig) {
" \"xds_override_host_experimental\":{\n"
" \"childPolicy\":[\n"
" {\"grpclb\":{}}\n"
" ],\n"
" \"overrideHostStatus\": [\n"
" \"DRAINING\", \"HEALTHY\", \"UNKNOWN\""
" ]"
" ]\n"
" }\n"
" }]\n"
"}\n";
auto service_config =
ServiceConfigImpl::Create(ChannelArgs(), service_config_json);
ASSERT_TRUE(service_config.ok()) << service_config.status();
ASSERT_TRUE(service_config.ok());
EXPECT_NE(*service_config, nullptr);
auto global_config = static_cast<ClientChannelGlobalParsedConfig*>(
(*service_config)
@ -64,12 +60,6 @@ TEST(XdsOverrideHostConfigParsingTest, ValidConfig) {
ASSERT_EQ(lb_config->name(), XdsOverrideHostLbConfig::Name());
auto override_host_lb_config =
static_cast<RefCountedPtr<XdsOverrideHostLbConfig>>(lb_config);
EXPECT_EQ(override_host_lb_config->override_host_status_set(),
XdsHealthStatusSet({
XdsHealthStatus(XdsHealthStatus::HealthStatus::kDraining),
XdsHealthStatus(XdsHealthStatus::HealthStatus::kHealthy),
XdsHealthStatus(XdsHealthStatus::HealthStatus::kUnknown),
}));
ASSERT_NE(override_host_lb_config->child_config(), nullptr);
ASSERT_EQ(override_host_lb_config->child_config()->name(), "grpclb");
}
@ -103,72 +93,6 @@ TEST(XdsOverrideHostConfigParsingTest, ValidConfigWithRR) {
ASSERT_EQ(override_host_lb_config->child_config()->name(), "round_robin");
}
TEST(XdsOverrideHostConfigParsingTest, ValidConfigNoDraining) {
const char* service_config_json =
"{\n"
" \"loadBalancingConfig\":[{\n"
" \"xds_override_host_experimental\":{\n"
" \"childPolicy\":[\n"
" {\"grpclb\":{}}\n"
" ],\n"
" \"overrideHostStatus\": [\n"
" \"HEALTHY\", \"UNKNOWN\""
" ]"
" }\n"
" }]\n"
"}\n";
auto service_config =
ServiceConfigImpl::Create(ChannelArgs(), service_config_json);
ASSERT_TRUE(service_config.ok());
EXPECT_NE(*service_config, nullptr);
auto global_config = static_cast<ClientChannelGlobalParsedConfig*>(
(*service_config)
->GetGlobalParsedConfig(
ClientChannelServiceConfigParser::ParserIndex()));
ASSERT_NE(global_config, nullptr);
auto lb_config = global_config->parsed_lb_config();
ASSERT_NE(lb_config, nullptr);
ASSERT_EQ(lb_config->name(), XdsOverrideHostLbConfig::Name());
auto override_host_lb_config =
static_cast<RefCountedPtr<XdsOverrideHostLbConfig>>(lb_config);
EXPECT_EQ(override_host_lb_config->override_host_status_set(),
XdsHealthStatusSet(
{XdsHealthStatus(XdsHealthStatus::HealthStatus::kHealthy),
XdsHealthStatus(XdsHealthStatus::HealthStatus::kUnknown)}));
ASSERT_NE(override_host_lb_config->child_config(), nullptr);
ASSERT_EQ(override_host_lb_config->child_config()->name(), "grpclb");
}
TEST(XdsOverrideHostConfigParsingTest, ValidConfigNoOverrideHostStatuses) {
const char* service_config_json =
"{\n"
" \"loadBalancingConfig\":[{\n"
" \"xds_override_host_experimental\":{\n"
" \"childPolicy\":[\n"
" {\"grpclb\":{}}\n"
" ]"
" }\n"
" }]\n"
"}\n";
auto service_config =
ServiceConfigImpl::Create(ChannelArgs(), service_config_json);
ASSERT_TRUE(service_config.ok());
EXPECT_NE(*service_config, nullptr);
auto global_config = static_cast<internal::ClientChannelGlobalParsedConfig*>(
(*service_config)->GetGlobalParsedConfig(0));
ASSERT_NE(global_config, nullptr);
auto lb_config = global_config->parsed_lb_config();
ASSERT_NE(lb_config, nullptr);
auto override_host_lb_config =
static_cast<RefCountedPtr<XdsOverrideHostLbConfig>>(lb_config);
EXPECT_EQ(override_host_lb_config->override_host_status_set(),
XdsHealthStatusSet(
{XdsHealthStatus(XdsHealthStatus::HealthStatus::kHealthy),
XdsHealthStatus(XdsHealthStatus::HealthStatus::kUnknown)}));
ASSERT_NE(override_host_lb_config->child_config(), nullptr);
EXPECT_EQ(override_host_lb_config->child_config()->name(), "grpclb");
}
TEST(XdsOverrideHostConfigParsingTest, ReportsMissingChildPolicyField) {
const char* service_config_json =
"{\n"
@ -227,30 +151,6 @@ TEST(XdsOverrideHostConfigParsingTest, ReportsEmptyChildPolicyArray) {
"error:errors validating xds_override_host LB policy config: "
"[field:childPolicy error:No known policies in list: ]]"));
}
TEST(XdsOverrideHostConfigParsingTest, UnrecognizedHostStatus) {
const char* service_config_json =
"{\n"
" \"loadBalancingConfig\":[{\n"
" \"xds_override_host_experimental\":{\n"
" \"childPolicy\":[\n"
" {\"grpclb\":{}}\n"
" ],\n"
" \"overrideHostStatus\": [\n"
" \"NOTASTATUS\""
" ]"
" }\n"
" }]\n"
"}\n";
auto service_config =
ServiceConfigImpl::Create(ChannelArgs(), service_config_json);
ASSERT_FALSE(service_config.ok()) << service_config.status();
EXPECT_EQ(service_config.status(),
absl::InvalidArgumentError(
"errors validating service config: [field:loadBalancingConfig "
"error:errors validating xds_override_host LB policy config: "
"[field:overrideHostStatus[0] error:invalid host status]]"));
}
} // namespace
} // namespace testing
} // namespace grpc_core

@ -18,13 +18,9 @@
#include <array>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "absl/types/span.h"
#include "gtest/gtest.h"
@ -32,40 +28,36 @@
#include <grpc/grpc.h>
#include "src/core/ext/filters/stateful_session/stateful_session_filter.h"
#include "src/core/ext/xds/xds_health_status.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/unique_type_name.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/load_balancing/lb_policy.h"
#include "src/core/lib/resolver/server_address.h"
#include "test/core/client_channel/lb_policy/lb_policy_test_lib.h"
#include "test/core/util/test_config.h"
namespace grpc_core {
namespace testing {
namespace {
class XdsOverrideHostTest : public LoadBalancingPolicyTest {
protected:
XdsOverrideHostTest()
: policy_(MakeLbPolicy("xds_override_host_experimental")) {}
static RefCountedPtr<LoadBalancingPolicy::Config> MakeXdsOverrideHostConfig(
Json::Array override_host_status = {"UNKNOWN", "HEALTHY"},
RefCountedPtr<LoadBalancingPolicy::Config> MakeXdsOverrideHostConfig(
std::string child_policy = "round_robin") {
Json::Object child_policy_config = {{child_policy, Json::Object()}};
return MakeConfig(Json::Array{Json::Object{
{"xds_override_host_experimental",
Json::Object{{"childPolicy", Json::Array{{child_policy_config}}},
{"overrideHostStatus", override_host_status}}}}});
Json::Object{{"childPolicy", Json::Array{{child_policy_config}}}}}}});
}
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>
ExpectStartupWithRoundRobin(absl::Span<const absl::string_view> addresses,
RefCountedPtr<LoadBalancingPolicy::Config>
config = MakeXdsOverrideHostConfig()) {
ExpectStartupWithRoundRobin(absl::Span<const absl::string_view> addresses) {
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker;
EXPECT_EQ(ApplyUpdate(BuildUpdate(addresses, config), policy_.get()),
EXPECT_EQ(ApplyUpdate(BuildUpdate(addresses, MakeXdsOverrideHostConfig()),
policy_.get()),
absl::OkStatus());
ExpectConnectingUpdate();
for (size_t i = 0; i < addresses.size(); ++i) {
@ -87,31 +79,6 @@ class XdsOverrideHostTest : public LoadBalancingPolicyTest {
return picker;
}
ServerAddress MakeAddressWithHealthStatus(
absl::string_view address, XdsHealthStatus::HealthStatus status) {
std::map<const char*, std::unique_ptr<ServerAddress::AttributeInterface>>
attrs;
attrs.emplace(XdsEndpointHealthStatusAttribute::kKey,
std::make_unique<XdsEndpointHealthStatusAttribute>(
XdsHealthStatus(status)));
return {MakeAddress(address), {}, std::move(attrs)};
}
void ApplyUpdateWithHealthStatuses(
absl::Span<const std::pair<const absl::string_view,
XdsHealthStatus::HealthStatus>>
addresses_and_statuses,
Json::Array override_host_status = {"UNKNOWN", "HEALTHY"}) {
LoadBalancingPolicy::UpdateArgs update;
update.config = MakeXdsOverrideHostConfig(std::move(override_host_status));
update.addresses.emplace();
for (auto address_and_status : addresses_and_statuses) {
update.addresses->push_back(MakeAddressWithHealthStatus(
address_and_status.first, address_and_status.second));
}
EXPECT_EQ(ApplyUpdate(update, policy_.get()), absl::OkStatus());
}
OrphanablePtr<LoadBalancingPolicy> policy_;
};
@ -246,174 +213,6 @@ TEST_F(XdsOverrideHostTest, ConnectingSubchannelIsQueued) {
picker = ExpectState(GRPC_CHANNEL_READY);
ExpectPickQueued(picker.get(), pick_arg);
}
TEST_F(XdsOverrideHostTest, DrainingState) {
// Send address list to LB policy.
const std::array<absl::string_view, 3> kAddresses = {
"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
ASSERT_NE(ExpectStartupWithRoundRobin(kAddresses), nullptr);
ApplyUpdateWithHealthStatuses(
{{kAddresses[0], XdsHealthStatus::HealthStatus::kUnknown},
{kAddresses[1], XdsHealthStatus::HealthStatus::kDraining},
{kAddresses[2], XdsHealthStatus::HealthStatus::kHealthy}},
{"UNKNOWN", "HEALTHY", "DRAINING"});
auto picker = ExpectState(GRPC_CHANNEL_READY);
ASSERT_NE(picker, nullptr);
ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
ExpectQueueEmpty();
// Draining subchannel is returned
std::map<UniqueTypeName, std::string> pick_arg{
{XdsOverrideHostTypeName(), "127.0.0.1:442"}};
EXPECT_EQ(ExpectPickComplete(picker.get(), pick_arg), kAddresses[1]);
ApplyUpdateWithHealthStatuses(
{{kAddresses[0], XdsHealthStatus::HealthStatus::kUnknown},
{kAddresses[2], XdsHealthStatus::HealthStatus::kHealthy}});
picker = ExpectState(GRPC_CHANNEL_READY);
ASSERT_NE(picker, nullptr);
// Gone!
ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]}, pick_arg);
}
TEST_F(XdsOverrideHostTest, DrainingSubchannelIsConnecting) {
// Send address list to LB policy.
const std::array<absl::string_view, 3> kAddresses = {
"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
auto picker = ExpectStartupWithRoundRobin(kAddresses);
ASSERT_NE(picker, nullptr);
// Check that the host is overridden
std::map<UniqueTypeName, std::string> pick_arg{
{XdsOverrideHostTypeName(), "127.0.0.1:442"}};
EXPECT_EQ(ExpectPickComplete(picker.get(), pick_arg), kAddresses[1]);
ApplyUpdateWithHealthStatuses(
{{kAddresses[0], XdsHealthStatus::HealthStatus::kUnknown},
{kAddresses[1], XdsHealthStatus::HealthStatus::kDraining},
{kAddresses[2], XdsHealthStatus::HealthStatus::kHealthy}},
{"UNKNOWN", "HEALTHY", "DRAINING"});
auto subchannel = FindSubchannel(kAddresses[1]);
ASSERT_NE(subchannel, nullptr);
// There are two notifications - one from child policy and one from the parent
// policy due to draining channel update
picker = ExpectState(GRPC_CHANNEL_READY);
EXPECT_EQ(ExpectPickComplete(picker.get(), pick_arg), kAddresses[1]);
ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
subchannel->SetConnectivityState(GRPC_CHANNEL_IDLE);
picker = ExpectState(GRPC_CHANNEL_READY);
ExpectPickQueued(picker.get(), pick_arg);
ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
EXPECT_TRUE(subchannel->ConnectionRequested());
ExpectQueueEmpty();
subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
picker = ExpectState(GRPC_CHANNEL_READY);
ExpectPickQueued(picker.get(), pick_arg);
ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
picker = ExpectState(GRPC_CHANNEL_READY);
EXPECT_EQ(ExpectPickComplete(picker.get(), pick_arg), kAddresses[1]);
ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
}
TEST_F(XdsOverrideHostTest, DrainingToHealthy) {
// Send address list to LB policy.
const std::array<absl::string_view, 3> kAddresses = {
"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
ASSERT_NE(ExpectStartupWithRoundRobin(kAddresses), nullptr);
ApplyUpdateWithHealthStatuses(
{{kAddresses[0], XdsHealthStatus::HealthStatus::kUnknown},
{kAddresses[1], XdsHealthStatus::HealthStatus::kDraining},
{kAddresses[2], XdsHealthStatus::HealthStatus::kHealthy}},
{"UNKNOWN", "HEALTHY", "DRAINING"});
auto picker = ExpectState(GRPC_CHANNEL_READY);
ASSERT_NE(picker, nullptr);
ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
ExpectQueueEmpty();
std::map<UniqueTypeName, std::string> pick_arg{
{XdsOverrideHostTypeName(), "127.0.0.1:442"}};
EXPECT_EQ(ExpectPickComplete(picker.get(), pick_arg), kAddresses[1]);
ApplyUpdateWithHealthStatuses(
{{kAddresses[0], XdsHealthStatus::HealthStatus::kHealthy},
{kAddresses[1], XdsHealthStatus::HealthStatus::kHealthy},
{kAddresses[2], XdsHealthStatus::HealthStatus::kHealthy}},
{"UNKNOWN", "HEALTHY", "DRAINING"});
picker = ExpectState(GRPC_CHANNEL_READY);
ASSERT_NE(picker, nullptr);
EXPECT_EQ(ExpectPickComplete(picker.get(), pick_arg), kAddresses[1]);
EXPECT_EQ(ExpectPickComplete(picker.get(), pick_arg), kAddresses[1]);
}
TEST_F(XdsOverrideHostTest, OverrideHostStatus) {
const std::array<absl::string_view, 3> kAddresses = {
"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
ASSERT_NE(ExpectStartupWithRoundRobin(kAddresses), nullptr);
ApplyUpdateWithHealthStatuses(
{{kAddresses[0], XdsHealthStatus::HealthStatus::kUnknown},
{kAddresses[1], XdsHealthStatus::HealthStatus::kHealthy},
{kAddresses[2], XdsHealthStatus::HealthStatus::kDraining}},
{"UNKNOWN", "HEALTHY", "DRAINING"});
auto picker = ExpectState(GRPC_CHANNEL_READY);
ASSERT_NE(picker, nullptr);
ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]});
EXPECT_EQ(ExpectPickComplete(picker.get(),
{{XdsOverrideHostTypeName(), "127.0.0.1:441"}}),
kAddresses[0]);
EXPECT_EQ(ExpectPickComplete(picker.get(),
{{XdsOverrideHostTypeName(), "127.0.0.1:442"}}),
kAddresses[1]);
EXPECT_EQ(ExpectPickComplete(picker.get(),
{{XdsOverrideHostTypeName(), "127.0.0.1:443"}}),
kAddresses[2]);
// UNKNOWN excluded - first chanel does not get overridden
ApplyUpdateWithHealthStatuses(
{{kAddresses[0], XdsHealthStatus::HealthStatus::kUnknown},
{kAddresses[1], XdsHealthStatus::HealthStatus::kHealthy},
{kAddresses[2], XdsHealthStatus::HealthStatus::kDraining}},
{"HEALTHY", "DRAINING"});
picker = ExpectState(GRPC_CHANNEL_READY);
ASSERT_NE(picker, nullptr);
ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]});
ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]},
{{XdsOverrideHostTypeName(), "127.0.0.1:441"}});
EXPECT_EQ(ExpectPickComplete(picker.get(),
{{XdsOverrideHostTypeName(), "127.0.0.1:442"}}),
kAddresses[1]);
EXPECT_EQ(ExpectPickComplete(picker.get(),
{{XdsOverrideHostTypeName(), "127.0.0.1:443"}}),
kAddresses[2]);
// HEALTHY excluded - second chanel does not get overridden
ApplyUpdateWithHealthStatuses(
{{kAddresses[0], XdsHealthStatus::HealthStatus::kUnknown},
{kAddresses[1], XdsHealthStatus::HealthStatus::kHealthy},
{kAddresses[2], XdsHealthStatus::HealthStatus::kDraining}},
{"UNKNOWN", "HEALTHY"});
picker = ExpectState(GRPC_CHANNEL_READY);
ASSERT_NE(picker, nullptr);
ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]});
EXPECT_EQ(ExpectPickComplete(picker.get(),
{{XdsOverrideHostTypeName(), "127.0.0.1:441"}}),
kAddresses[0]);
EXPECT_EQ(ExpectPickComplete(picker.get(),
{{XdsOverrideHostTypeName(), "127.0.0.1:442"}}),
kAddresses[1]);
ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]},
{{XdsOverrideHostTypeName(), "127.0.0.1:443"}});
// DRAINING excluded - third chanel does not get overridden
ApplyUpdateWithHealthStatuses(
{{kAddresses[0], XdsHealthStatus::HealthStatus::kUnknown},
{kAddresses[1], XdsHealthStatus::HealthStatus::kHealthy},
{kAddresses[2], XdsHealthStatus::HealthStatus::kDraining}},
{"UNKNOWN", "HEALTHY"});
picker = ExpectState(GRPC_CHANNEL_READY);
ASSERT_NE(picker, nullptr);
ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]});
EXPECT_EQ(ExpectPickComplete(picker.get(),
{{XdsOverrideHostTypeName(), "127.0.0.1:441"}}),
kAddresses[0]);
EXPECT_EQ(ExpectPickComplete(picker.get(),
{{XdsOverrideHostTypeName(), "127.0.0.1:442"}}),
kAddresses[1]);
ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]},
{{XdsOverrideHostTypeName(), "127.0.0.1:443"}});
}
} // namespace
} // namespace testing
} // namespace grpc_core

Loading…
Cancel
Save