[xDS] fix aggregate cluster design (#35313)

Implements the aggregate cluster changes described in gRFC A75 (https://github.com/grpc/proposal/pull/405).

Closes #35313

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/35313 from markdroth:xds_aggregate_cluster_fix 85cfd70c59
PiperOrigin-RevId: 597326624
pull/35503/head
Mark D. Roth 1 year ago committed by Copybara-Service
parent d8d67f3e91
commit aa326c947c
  1. 1
      build_autogenerated.yaml
  2. 1
      src/core/BUILD
  3. 718
      src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
  4. 8
      src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc
  5. 1
      test/cpp/end2end/xds/BUILD
  6. 139
      test/cpp/end2end/xds/xds_cluster_type_end2end_test.cc
  7. 82
      test/cpp/end2end/xds/xds_outlier_detection_end2end_test.cc
  8. 89
      test/cpp/end2end/xds/xds_override_host_end2end_test.cc
  9. 7
      test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc

@ -18101,6 +18101,7 @@ targets:
run: false
language: c++
headers:
- test/core/util/scoped_env_var.h
- test/cpp/end2end/connection_attempt_injector.h
- test/cpp/end2end/counted_service.h
- test/cpp/end2end/test_service_impl.h

@ -4740,6 +4740,7 @@ grpc_cc_library(
deps = [
"channel_args",
"delegating_helper",
"env",
"grpc_lb_address_filtering",
"grpc_lb_xds_channel_args",
"grpc_outlier_detection_header",

@ -48,6 +48,7 @@
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/env.h"
#include "src/core/lib/gprpp/match.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
@ -70,6 +71,15 @@ TraceFlag grpc_cds_lb_trace(false, "cds_lb");
namespace {
// Returns true iff the GRPC_XDS_AGGREGATE_CLUSTER_BACKWARD_COMPAT env var is
// set to a value that parses as boolean true; unset or unparsable means false.
// TODO(roth): Remove this after the 1.63 release.
bool XdsAggregateClusterBackwardCompatibilityEnabled() {
  auto env_value = GetEnv("GRPC_XDS_AGGREGATE_CLUSTER_BACKWARD_COMPAT");
  if (!env_value.has_value()) return false;
  bool enabled = false;
  if (!gpr_parse_bool_value(env_value->c_str(), &enabled)) return false;
  return enabled;
}
using XdsConfig = XdsDependencyManager::XdsConfig;
constexpr absl::string_view kCds = "cds_experimental";
@ -123,29 +133,33 @@ class CdsLb : public LoadBalancingPolicy {
// Tracks the child-number assignment for each priority in the current
// cluster, so child policy names can be reused across updates to avoid
// unnecessary churn.
struct ChildNameState {
  // Child number assigned to each priority, indexed by priority.
  std::vector<size_t /*child_number*/> priority_child_numbers;
  // Lowest child number to try next when no existing number can be reused.
  size_t next_available_child_number = 0;
  // Drops all assignments, returning to the default-constructed state.
  void Reset() {
    priority_child_numbers.clear();
    next_available_child_number = 0;
  }
};
~CdsLb() override;
void ShutdownLocked() override;
// Computes child numbers for new_cluster_list, reusing child numbers
// from old_cluster_list and child_name_state_list_ in an intelligent
// Computes child numbers for new_cluster, reusing child numbers
// from old_cluster and child_name_state_ in an intelligent
// way to avoid unnecessary churn.
std::vector<ChildNameState> ComputeChildNames(
const std::vector<const XdsConfig::ClusterConfig*>& old_cluster_list,
const std::vector<const XdsConfig::ClusterConfig*>& new_cluster_list)
const;
ChildNameState ComputeChildNames(
const XdsConfig::ClusterConfig* old_cluster,
const XdsConfig::ClusterConfig& new_cluster,
const XdsConfig::ClusterConfig::EndpointConfig& endpoint_config) const;
std::string GetChildPolicyName(const std::string& cluster, size_t priority);
Json CreateChildPolicyConfig(
const Json::Array& lb_policy_config,
const std::vector<const XdsConfig::ClusterConfig*>& new_cluster_list);
std::shared_ptr<EndpointAddressesIterator> CreateChildPolicyAddresses(
const std::vector<const XdsConfig::ClusterConfig*>& new_cluster_list);
std::string CreateChildPolicyResolutionNote(
const std::vector<const XdsConfig::ClusterConfig*>& new_cluster_list);
Json CreateChildPolicyConfigForLeafCluster(
const XdsConfig::ClusterConfig& new_cluster,
const XdsConfig::ClusterConfig::EndpointConfig& endpoint_config,
const XdsClusterResource* aggregate_cluster_resource);
Json CreateChildPolicyConfigForAggregateCluster(
const XdsConfig::ClusterConfig::AggregateConfig& aggregate_config);
void ResetState();
@ -157,9 +171,7 @@ class CdsLb : public LoadBalancingPolicy {
// Cluster subscription, for dynamic clusters (e.g., RLS).
RefCountedPtr<XdsDependencyManager::ClusterSubscription> subscription_;
// The elements in this vector correspond to those in
// xds_config_->clusters[cluster_name_].
std::vector<ChildNameState> child_name_state_list_;
ChildNameState child_name_state_;
// Child LB policy.
OrphanablePtr<LoadBalancingPolicy> child_policy_;
@ -214,43 +226,58 @@ const XdsEndpointResource::PriorityList& GetUpdatePriorityList(
return update->priorities;
}
absl::StatusOr<std::vector<const XdsConfig::ClusterConfig*>>
BuildLeafClusterConfigList(const XdsConfig* xds_config,
const XdsConfig::ClusterConfig* cluster_config) {
if (cluster_config == nullptr) {
return std::vector<const XdsConfig::ClusterConfig*>();
}
GPR_ASSERT(xds_config != nullptr);
std::vector<absl::string_view> tmp_leaf_clusters;
const std::vector<absl::string_view>& leaf_clusters = Match(
cluster_config->children,
[&](const XdsConfig::ClusterConfig::EndpointConfig&) {
tmp_leaf_clusters.push_back(cluster_config->cluster_name);
return tmp_leaf_clusters;
},
[&](const XdsConfig::ClusterConfig::AggregateConfig& aggregate_config) {
return aggregate_config.leaf_clusters;
});
std::vector<const XdsConfig::ClusterConfig*> leaf_cluster_configs;
leaf_cluster_configs.reserve(leaf_clusters.size());
for (const absl::string_view cluster_name : leaf_clusters) {
auto it = xds_config->clusters.find(cluster_name);
if (it == xds_config->clusters.end() || !it->second.ok() ||
it->second->cluster == nullptr) {
return absl::InternalError(absl::StrCat(
"xDS config does not contain an entry for cluster ", cluster_name));
}
const XdsConfig::ClusterConfig& cluster_config = *it->second;
if (!absl::holds_alternative<XdsConfig::ClusterConfig::EndpointConfig>(
cluster_config.children)) {
return absl::InternalError(absl::StrCat("xDS config entry for cluster ",
cluster_name,
" has no endpoint config"));
// Builds the unique child policy name for a (cluster, child_number) pair,
// e.g. "{cluster=foo, child_number=3}".
std::string MakeChildPolicyName(absl::string_view cluster,
                                size_t child_number) {
  std::string child_name =
      absl::StrCat("{cluster=", cluster, ", child_number=", child_number, "}");
  return child_name;
}
class PriorityEndpointIterator : public EndpointAddressesIterator {
public:
PriorityEndpointIterator(
std::string cluster_name,
std::shared_ptr<const XdsEndpointResource> endpoints,
std::vector<size_t /*child_number*/> priority_child_numbers)
: cluster_name_(std::move(cluster_name)),
endpoints_(std::move(endpoints)),
priority_child_numbers_(std::move(priority_child_numbers)) {}
void ForEach(absl::FunctionRef<void(const EndpointAddresses&)> callback)
const override {
const auto& priority_list = GetUpdatePriorityList(endpoints_.get());
for (size_t priority = 0; priority < priority_list.size(); ++priority) {
const auto& priority_entry = priority_list[priority];
std::string priority_child_name =
MakeChildPolicyName(cluster_name_, priority_child_numbers_[priority]);
for (const auto& p : priority_entry.localities) {
const auto& locality_name = p.first;
const auto& locality = p.second;
std::vector<RefCountedStringValue> hierarchical_path = {
RefCountedStringValue(priority_child_name),
RefCountedStringValue(locality_name->AsHumanReadableString())};
auto hierarchical_path_attr =
MakeRefCounted<HierarchicalPathArg>(std::move(hierarchical_path));
for (const auto& endpoint : locality.endpoints) {
uint32_t endpoint_weight =
locality.lb_weight *
endpoint.args().GetInt(GRPC_ARG_ADDRESS_WEIGHT).value_or(1);
callback(EndpointAddresses(
endpoint.addresses(),
endpoint.args()
.SetObject(hierarchical_path_attr)
.Set(GRPC_ARG_ADDRESS_WEIGHT, endpoint_weight)
.SetObject(locality_name->Ref())
.Set(GRPC_ARG_XDS_LOCALITY_WEIGHT, locality.lb_weight)));
}
}
}
leaf_cluster_configs.push_back(&cluster_config);
}
return leaf_cluster_configs;
}
private:
std::string cluster_name_;
std::shared_ptr<const XdsEndpointResource> endpoints_;
std::vector<size_t /*child_number*/> priority_child_numbers_;
};
absl::Status CdsLb::UpdateLocked(UpdateArgs args) {
// Get new config.
@ -313,13 +340,13 @@ absl::Status CdsLb::UpdateLocked(UpdateArgs args) {
return status;
}
auto& new_cluster_config = it->second;
// If new config is not OK, report TRANSIENT_FAILURE.
// If new list is not OK, report TRANSIENT_FAILURE.
if (!new_cluster_config.ok()) {
ReportTransientFailure(new_cluster_config.status());
return new_cluster_config.status();
}
GPR_ASSERT(new_cluster_config->cluster != nullptr);
// Find old cluster config, if any.
// Find old cluster, if any.
const XdsConfig::ClusterConfig* old_cluster_config = nullptr;
if (xds_config_ != nullptr) {
auto it_old = xds_config_->clusters.find(cluster_name_);
@ -335,29 +362,68 @@ absl::Status CdsLb::UpdateLocked(UpdateArgs args) {
}
}
}
// Construct lists of old and new leaf cluster configs.
auto old_leaf_cluster_configs =
BuildLeafClusterConfigList(xds_config_.get(), old_cluster_config);
if (!old_leaf_cluster_configs.ok()) {
ReportTransientFailure(old_leaf_cluster_configs.status());
return old_leaf_cluster_configs.status();
}
auto new_leaf_cluster_configs =
BuildLeafClusterConfigList(new_xds_config.get(), &*new_cluster_config);
if (!new_leaf_cluster_configs.ok()) {
ReportTransientFailure(new_leaf_cluster_configs.status());
return new_leaf_cluster_configs.status();
// TODO(roth): Remove this after the 1.63 release.
const XdsClusterResource* aggregate_cluster_resource = nullptr;
static constexpr absl::string_view kArgXdsAggregateClusterName =
GRPC_ARG_NO_SUBCHANNEL_PREFIX "xds_aggregate_cluster_name";
if (XdsAggregateClusterBackwardCompatibilityEnabled()) {
if (absl::holds_alternative<XdsConfig::ClusterConfig::EndpointConfig>(
new_cluster_config->children)) {
auto aggregate_cluster = args.args.GetString(kArgXdsAggregateClusterName);
if (aggregate_cluster.has_value()) {
auto it = new_xds_config->clusters.find(*aggregate_cluster);
if (it == new_xds_config->clusters.end()) {
// Cluster not present. This should never happen.
absl::Status status = absl::UnavailableError(
absl::StrCat("xDS config has no entry for aggregate cluster ",
*aggregate_cluster));
ReportTransientFailure(status);
return status;
}
auto& aggregate_cluster_config = it->second;
if (!aggregate_cluster_config.ok()) {
ReportTransientFailure(aggregate_cluster_config.status());
return aggregate_cluster_config.status();
}
GPR_ASSERT(aggregate_cluster_config->cluster != nullptr);
aggregate_cluster_resource = aggregate_cluster_config->cluster.get();
}
} else {
args.args = args.args.Set(kArgXdsAggregateClusterName, cluster_name_);
}
}
// Swap in new config and compute new child numbers.
child_name_state_list_ =
ComputeChildNames(*old_leaf_cluster_configs, *new_leaf_cluster_configs);
// Construct child policy config and update state based on the cluster type.
Json child_policy_config_json;
UpdateArgs update_args;
Match(
new_cluster_config->children,
// Leaf cluster.
[&](const XdsConfig::ClusterConfig::EndpointConfig& endpoint_config) {
// Compute new child numbers.
child_name_state_ = ComputeChildNames(
old_cluster_config, *new_cluster_config, endpoint_config);
// Populate addresses and resolution_note for child policy.
update_args.addresses = std::make_shared<PriorityEndpointIterator>(
new_cluster_config->cluster_name, endpoint_config.endpoints,
child_name_state_.priority_child_numbers);
update_args.resolution_note = endpoint_config.resolution_note;
// Construct child policy config.
child_policy_config_json = CreateChildPolicyConfigForLeafCluster(
*new_cluster_config, endpoint_config, aggregate_cluster_resource);
},
// Aggregate cluster.
[&](const XdsConfig::ClusterConfig::AggregateConfig& aggregate_config) {
child_name_state_.Reset();
// Construct child policy config.
child_policy_config_json =
CreateChildPolicyConfigForAggregateCluster(aggregate_config);
});
// Swap in new xDS config, now that we're done with the old one.
xds_config_ = std::move(new_xds_config);
// Construct child policy config.
Json json = CreateChildPolicyConfig(
new_cluster_config->cluster->lb_policy_config, *new_leaf_cluster_configs);
// Validate child policy config.
auto child_config =
CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
json);
child_policy_config_json);
if (!child_config.ok()) {
// Should never happen.
absl::Status status = absl::InternalError(
@ -392,342 +458,256 @@ absl::Status CdsLb::UpdateLocked(UpdateArgs args) {
}
}
// Update child policy.
UpdateArgs update_args;
update_args.config = std::move(*child_config);
update_args.addresses = CreateChildPolicyAddresses(*new_leaf_cluster_configs);
update_args.resolution_note =
CreateChildPolicyResolutionNote(*new_leaf_cluster_configs);
update_args.args = args.args;
return child_policy_->UpdateLocked(std::move(update_args));
}
std::vector<CdsLb::ChildNameState> CdsLb::ComputeChildNames(
const std::vector<const XdsConfig::ClusterConfig*>& old_cluster_list,
const std::vector<const XdsConfig::ClusterConfig*>& new_cluster_list)
const {
CdsLb::ChildNameState CdsLb::ComputeChildNames(
const XdsConfig::ClusterConfig* old_cluster,
const XdsConfig::ClusterConfig& new_cluster,
const XdsConfig::ClusterConfig::EndpointConfig& endpoint_config) const {
GPR_ASSERT(
!absl::holds_alternative<XdsConfig::ClusterConfig::AggregateConfig>(
new_cluster.children));
// First, build some maps from locality to child number and the reverse
// from old_cluster_list and child_name_state_list_.
struct LocalityChildNumberMapping {
std::map<XdsLocalityName*, size_t /*child_number*/, XdsLocalityName::Less>
locality_child_map;
std::map<size_t, std::set<XdsLocalityName*, XdsLocalityName::Less>>
child_locality_map;
size_t next_available_child_number;
};
std::map<absl::string_view, LocalityChildNumberMapping> cluster_mappings;
size_t old_index = 0;
for (const auto* cluster_config : old_cluster_list) {
GPR_ASSERT(old_index < child_name_state_list_.size());
const auto& old_numbers = child_name_state_list_[old_index++];
const auto& endpoint_config =
absl::get<XdsConfig::ClusterConfig::EndpointConfig>(
cluster_config->children);
const auto& prev_priority_list =
GetUpdatePriorityList(endpoint_config.endpoints.get());
auto& mappings = cluster_mappings[cluster_config->cluster_name];
mappings.next_available_child_number =
old_numbers.next_available_child_number;
for (size_t priority = 0; priority < prev_priority_list.size();
++priority) {
size_t child_number = old_numbers.priority_child_numbers[priority];
const auto& localities = prev_priority_list[priority].localities;
for (const auto& p : localities) {
XdsLocalityName* locality_name = p.first;
mappings.locality_child_map[locality_name] = child_number;
mappings.child_locality_map[child_number].insert(locality_name);
// from old_cluster and child_name_state_.
std::map<XdsLocalityName*, size_t /*child_number*/, XdsLocalityName::Less>
locality_child_map;
std::map<size_t, std::set<XdsLocalityName*, XdsLocalityName::Less>>
child_locality_map;
if (old_cluster != nullptr &&
old_cluster->cluster_name == new_cluster.cluster_name) {
auto* old_endpoint_config =
absl::get_if<XdsConfig::ClusterConfig::EndpointConfig>(
&old_cluster->children);
if (old_endpoint_config != nullptr) {
const auto& prev_priority_list =
GetUpdatePriorityList(old_endpoint_config->endpoints.get());
for (size_t priority = 0; priority < prev_priority_list.size();
++priority) {
size_t child_number =
child_name_state_.priority_child_numbers[priority];
const auto& localities = prev_priority_list[priority].localities;
for (const auto& p : localities) {
XdsLocalityName* locality_name = p.first;
locality_child_map[locality_name] = child_number;
child_locality_map[child_number].insert(locality_name);
}
}
}
}
// Now construct a new list containing priority child numbers for the new
// list based on cluster_mappings.
std::vector<ChildNameState> new_numbers_list;
for (const auto* cluster_config : new_cluster_list) {
auto& mappings = cluster_mappings[cluster_config->cluster_name];
new_numbers_list.emplace_back();
auto& new_numbers = new_numbers_list.back();
new_numbers.next_available_child_number =
mappings.next_available_child_number;
const auto& endpoint_config =
absl::get<XdsConfig::ClusterConfig::EndpointConfig>(
cluster_config->children);
const XdsEndpointResource::PriorityList& priority_list =
GetUpdatePriorityList(endpoint_config.endpoints.get());
for (size_t priority = 0; priority < priority_list.size(); ++priority) {
const auto& localities = priority_list[priority].localities;
absl::optional<size_t> child_number;
// If one of the localities in this priority already existed, reuse its
// child number.
for (const auto& p : localities) {
XdsLocalityName* locality_name = p.first;
if (!child_number.has_value()) {
auto it = mappings.locality_child_map.find(locality_name);
if (it != mappings.locality_child_map.end()) {
child_number = it->second;
mappings.locality_child_map.erase(it);
// Remove localities that *used* to be in this child number, so
// that we don't incorrectly reuse this child number for a
// subsequent priority.
for (XdsLocalityName* old_locality :
mappings.child_locality_map[*child_number]) {
mappings.locality_child_map.erase(old_locality);
}
}
} else {
// Remove all localities that are now in this child number, so
// that we don't accidentally reuse this child number for a
// Now construct new state containing priority child numbers for the new
// cluster based on the maps constructed above.
ChildNameState new_child_name_state;
new_child_name_state.next_available_child_number =
child_name_state_.next_available_child_number;
const XdsEndpointResource::PriorityList& priority_list =
GetUpdatePriorityList(endpoint_config.endpoints.get());
for (size_t priority = 0; priority < priority_list.size(); ++priority) {
const auto& localities = priority_list[priority].localities;
absl::optional<size_t> child_number;
// If one of the localities in this priority already existed, reuse its
// child number.
for (const auto& p : localities) {
XdsLocalityName* locality_name = p.first;
if (!child_number.has_value()) {
auto it = locality_child_map.find(locality_name);
if (it != locality_child_map.end()) {
child_number = it->second;
locality_child_map.erase(it);
// Remove localities that *used* to be in this child number, so
// that we don't incorrectly reuse this child number for a
// subsequent priority.
mappings.locality_child_map.erase(locality_name);
for (XdsLocalityName* old_locality :
child_locality_map[*child_number]) {
locality_child_map.erase(old_locality);
}
}
} else {
// Remove all localities that are now in this child number, so
// that we don't accidentally reuse this child number for a
// subsequent priority.
locality_child_map.erase(locality_name);
}
// If we didn't find an existing child number, assign a new one.
if (!child_number.has_value()) {
for (child_number = new_numbers.next_available_child_number;
mappings.child_locality_map.find(*child_number) !=
mappings.child_locality_map.end();
++(*child_number)) {
}
new_numbers.next_available_child_number = *child_number + 1;
// Add entry so we know that the child number is in use.
// (Don't need to add the list of localities, since we won't use them.)
mappings.child_locality_map[*child_number];
}
// If we didn't find an existing child number, assign a new one.
if (!child_number.has_value()) {
for (child_number = new_child_name_state.next_available_child_number;
child_locality_map.find(*child_number) != child_locality_map.end();
++(*child_number)) {
}
new_numbers.priority_child_numbers.push_back(*child_number);
new_child_name_state.next_available_child_number = *child_number + 1;
// Add entry so we know that the child number is in use.
// (Don't need to add the list of localities, since we won't use them.)
child_locality_map[*child_number];
}
new_child_name_state.priority_child_numbers.push_back(*child_number);
}
return new_numbers_list;
}
std::string MakeChildPolicyName(absl::string_view cluster,
size_t child_number) {
return absl::StrCat("{cluster=", cluster, ", child_number=", child_number,
"}");
return new_child_name_state;
}
Json CdsLb::CreateChildPolicyConfig(
const Json::Array& lb_policy_config,
const std::vector<const XdsConfig::ClusterConfig*>& new_cluster_list) {
Json CdsLb::CreateChildPolicyConfigForLeafCluster(
const XdsConfig::ClusterConfig& new_cluster,
const XdsConfig::ClusterConfig::EndpointConfig& endpoint_config,
const XdsClusterResource* aggregate_cluster_resource) {
const auto& cluster_resource = *new_cluster.cluster;
const bool is_logical_dns =
absl::holds_alternative<XdsClusterResource::LogicalDns>(
cluster_resource.type);
// Determine what xDS LB policy to use.
Json xds_lb_policy;
if (is_logical_dns) {
xds_lb_policy = Json::FromArray({
Json::FromObject({
{"pick_first", Json::FromObject({})},
}),
});
}
// TODO(roth): Remove this "else if" block after the 1.63 release.
else if (XdsAggregateClusterBackwardCompatibilityEnabled() &&
aggregate_cluster_resource != nullptr) {
xds_lb_policy =
Json::FromArray(aggregate_cluster_resource->lb_policy_config);
} else {
xds_lb_policy = Json::FromArray(new_cluster.cluster->lb_policy_config);
}
// Wrap it in the priority policy.
Json::Object priority_children;
Json::Array priority_priorities;
size_t numbers_index = 0;
for (const auto* cluster_config : new_cluster_list) {
const bool is_logical_dns =
absl::holds_alternative<XdsClusterResource::LogicalDns>(
cluster_config->cluster->type);
const auto& endpoint_config =
absl::get<XdsConfig::ClusterConfig::EndpointConfig>(
cluster_config->children);
const auto& priority_list =
GetUpdatePriorityList(endpoint_config.endpoints.get());
const auto& cluster_resource = *cluster_config->cluster;
GPR_ASSERT(numbers_index < child_name_state_list_.size());
const auto& child_numbers = child_name_state_list_[numbers_index++];
for (size_t priority = 0; priority < priority_list.size(); ++priority) {
// Determine what xDS LB policy to use.
Json child_policy;
if (is_logical_dns) {
child_policy = Json::FromArray({
Json::FromObject({
{"pick_first", Json::FromObject({})},
}),
});
} else {
child_policy = Json::FromArray(lb_policy_config);
}
// Wrap the xDS LB policy in the xds_override_host policy.
Json::Object xds_override_host_lb_config = {
{"clusterName", Json::FromString(cluster_config->cluster_name)},
{"childPolicy", std::move(child_policy)},
};
Json::Array xds_override_host_config = {Json::FromObject({
{"xds_override_host_experimental",
Json::FromObject(std::move(xds_override_host_lb_config))},
})};
// Wrap it in the xds_cluster_impl policy.
Json::Array xds_cluster_impl_config = {Json::FromObject(
{{"xds_cluster_impl_experimental",
Json::FromObject({
{"clusterName", Json::FromString(cluster_config->cluster_name)},
{"childPolicy",
Json::FromArray(std::move(xds_override_host_config))},
})}})};
// Wrap it in the outlier_detection policy.
Json::Object outlier_detection_config;
if (cluster_resource.outlier_detection.has_value()) {
auto& outlier_detection_update = *cluster_resource.outlier_detection;
outlier_detection_config["interval"] =
Json::FromString(outlier_detection_update.interval.ToJsonString());
outlier_detection_config["baseEjectionTime"] = Json::FromString(
outlier_detection_update.base_ejection_time.ToJsonString());
outlier_detection_config["maxEjectionTime"] = Json::FromString(
outlier_detection_update.max_ejection_time.ToJsonString());
outlier_detection_config["maxEjectionPercent"] =
Json::FromNumber(outlier_detection_update.max_ejection_percent);
if (outlier_detection_update.success_rate_ejection.has_value()) {
outlier_detection_config["successRateEjection"] = Json::FromObject({
{"stdevFactor",
Json::FromNumber(outlier_detection_update.success_rate_ejection
->stdev_factor)},
{"enforcementPercentage",
Json::FromNumber(outlier_detection_update.success_rate_ejection
->enforcement_percentage)},
{"minimumHosts",
Json::FromNumber(outlier_detection_update.success_rate_ejection
->minimum_hosts)},
{"requestVolume",
Json::FromNumber(outlier_detection_update.success_rate_ejection
->request_volume)},
});
}
if (outlier_detection_update.failure_percentage_ejection.has_value()) {
outlier_detection_config["failurePercentageEjection"] =
Json::FromObject({
{"threshold",
Json::FromNumber(
outlier_detection_update.failure_percentage_ejection
->threshold)},
{"enforcementPercentage",
Json::FromNumber(
outlier_detection_update.failure_percentage_ejection
->enforcement_percentage)},
{"minimumHosts",
Json::FromNumber(
outlier_detection_update.failure_percentage_ejection
->minimum_hosts)},
{"requestVolume",
Json::FromNumber(
outlier_detection_update.failure_percentage_ejection
->request_volume)},
});
}
}
outlier_detection_config["childPolicy"] =
Json::FromArray(std::move(xds_cluster_impl_config));
Json locality_picking_policy = Json::FromArray({Json::FromObject({
{"outlier_detection_experimental",
Json::FromObject(std::move(outlier_detection_config))},
})});
// Add priority entry, with the appropriate child name.
std::string child_name =
MakeChildPolicyName(cluster_config->cluster_name,
child_numbers.priority_child_numbers[priority]);
priority_priorities.emplace_back(Json::FromString(child_name));
Json::Object child_config = {
{"config", std::move(locality_picking_policy)},
};
if (!is_logical_dns) {
child_config["ignore_reresolution_requests"] = Json::FromBool(true);
}
priority_children[child_name] = Json::FromObject(std::move(child_config));
const auto& priority_list =
GetUpdatePriorityList(endpoint_config.endpoints.get());
for (size_t priority = 0; priority < priority_list.size(); ++priority) {
// Add priority entry, with the appropriate child name.
std::string child_name =
MakeChildPolicyName(new_cluster.cluster_name,
child_name_state_.priority_child_numbers[priority]);
priority_priorities.emplace_back(Json::FromString(child_name));
Json::Object child_config = {{"config", xds_lb_policy}};
if (!is_logical_dns) {
child_config["ignore_reresolution_requests"] = Json::FromBool(true);
}
priority_children[child_name] = Json::FromObject(std::move(child_config));
}
Json json = Json::FromArray({Json::FromObject({
Json priority_policy = Json::FromArray({Json::FromObject({
{"priority_experimental",
Json::FromObject({
{"children", Json::FromObject(std::move(priority_children))},
{"priorities", Json::FromArray(std::move(priority_priorities))},
})},
})});
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
gpr_log(GPR_INFO, "[cdslb %p] generated config for child policy: %s", this,
JsonDump(json, /*indent=*/1).c_str());
}
return json;
}
class PriorityEndpointIterator : public EndpointAddressesIterator {
public:
struct ClusterEntry {
std::string cluster_name;
std::shared_ptr<const XdsEndpointResource> endpoints;
std::vector<size_t /*child_number*/> priority_child_numbers;
ClusterEntry(std::string cluster,
std::shared_ptr<const XdsEndpointResource> resource,
std::vector<size_t> child_numbers)
: cluster_name(std::move(cluster)),
endpoints(std::move(resource)),
priority_child_numbers(std::move(child_numbers)) {}
std::string GetChildPolicyName(size_t priority) const {
return MakeChildPolicyName(cluster_name,
priority_child_numbers[priority]);
}
// Wrap the priority policy in the xds_override_host policy.
Json xds_override_host_policy = Json::FromArray({Json::FromObject({
{"xds_override_host_experimental",
Json::FromObject({
{"clusterName", Json::FromString(new_cluster.cluster_name)},
{"childPolicy", std::move(priority_policy)},
})},
})});
// Wrap the xds_override_host policy in the xds_cluster_impl policy.
Json xds_cluster_impl_policy = Json::FromArray({Json::FromObject({
{"xds_cluster_impl_experimental",
Json::FromObject({
{"clusterName", Json::FromString(new_cluster.cluster_name)},
{"childPolicy", std::move(xds_override_host_policy)},
})},
})});
// Wrap the xds_cluster_impl policy in the outlier_detection policy.
Json::Object outlier_detection_config = {
{"childPolicy", std::move(xds_cluster_impl_policy)},
};
explicit PriorityEndpointIterator(std::vector<ClusterEntry> results)
: results_(std::move(results)) {}
void ForEach(absl::FunctionRef<void(const EndpointAddresses&)> callback)
const override {
for (const auto& entry : results_) {
const auto& priority_list = GetUpdatePriorityList(entry.endpoints.get());
for (size_t priority = 0; priority < priority_list.size(); ++priority) {
const auto& priority_entry = priority_list[priority];
std::string priority_child_name = entry.GetChildPolicyName(priority);
for (const auto& p : priority_entry.localities) {
const auto& locality_name = p.first;
const auto& locality = p.second;
std::vector<RefCountedStringValue> hierarchical_path = {
RefCountedStringValue(priority_child_name),
RefCountedStringValue(locality_name->AsHumanReadableString())};
auto hierarchical_path_attr =
MakeRefCounted<HierarchicalPathArg>(std::move(hierarchical_path));
for (const auto& endpoint : locality.endpoints) {
uint32_t endpoint_weight =
locality.lb_weight *
endpoint.args().GetInt(GRPC_ARG_ADDRESS_WEIGHT).value_or(1);
callback(EndpointAddresses(
endpoint.addresses(),
endpoint.args()
.SetObject(hierarchical_path_attr)
.Set(GRPC_ARG_ADDRESS_WEIGHT, endpoint_weight)
.SetObject(locality_name->Ref())
.Set(GRPC_ARG_XDS_LOCALITY_WEIGHT, locality.lb_weight)));
}
}
}
if (cluster_resource.outlier_detection.has_value()) {
auto& outlier_detection_update = *cluster_resource.outlier_detection;
outlier_detection_config["interval"] =
Json::FromString(outlier_detection_update.interval.ToJsonString());
outlier_detection_config["baseEjectionTime"] = Json::FromString(
outlier_detection_update.base_ejection_time.ToJsonString());
outlier_detection_config["maxEjectionTime"] = Json::FromString(
outlier_detection_update.max_ejection_time.ToJsonString());
outlier_detection_config["maxEjectionPercent"] =
Json::FromNumber(outlier_detection_update.max_ejection_percent);
if (outlier_detection_update.success_rate_ejection.has_value()) {
outlier_detection_config["successRateEjection"] = Json::FromObject({
{"stdevFactor",
Json::FromNumber(
outlier_detection_update.success_rate_ejection->stdev_factor)},
{"enforcementPercentage",
Json::FromNumber(outlier_detection_update.success_rate_ejection
->enforcement_percentage)},
{"minimumHosts",
Json::FromNumber(
outlier_detection_update.success_rate_ejection->minimum_hosts)},
{"requestVolume",
Json::FromNumber(
outlier_detection_update.success_rate_ejection->request_volume)},
});
}
if (outlier_detection_update.failure_percentage_ejection.has_value()) {
outlier_detection_config["failurePercentageEjection"] = Json::FromObject({
{"threshold",
Json::FromNumber(outlier_detection_update
.failure_percentage_ejection->threshold)},
{"enforcementPercentage",
Json::FromNumber(
outlier_detection_update.failure_percentage_ejection
->enforcement_percentage)},
{"minimumHosts",
Json::FromNumber(outlier_detection_update
.failure_percentage_ejection->minimum_hosts)},
{"requestVolume",
Json::FromNumber(outlier_detection_update
.failure_percentage_ejection->request_volume)},
});
}
}
private:
std::vector<ClusterEntry> results_;
};
std::shared_ptr<EndpointAddressesIterator> CdsLb::CreateChildPolicyAddresses(
const std::vector<const XdsConfig::ClusterConfig*>& new_cluster_list) {
std::vector<PriorityEndpointIterator::ClusterEntry> entries;
entries.reserve(new_cluster_list.size());
size_t numbers_index = 0;
for (const auto* cluster_config : new_cluster_list) {
GPR_ASSERT(numbers_index < child_name_state_list_.size());
const auto& endpoint_config =
absl::get<XdsConfig::ClusterConfig::EndpointConfig>(
cluster_config->children);
entries.emplace_back(
cluster_config->cluster_name, endpoint_config.endpoints,
child_name_state_list_[numbers_index++].priority_child_numbers);
Json outlier_detection_policy = Json::FromArray({Json::FromObject({
{"outlier_detection_experimental",
Json::FromObject(std::move(outlier_detection_config))},
})});
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
gpr_log(GPR_INFO, "[cdslb %p] generated config for child policy: %s", this,
JsonDump(outlier_detection_policy, /*indent=*/1).c_str());
}
return std::make_shared<PriorityEndpointIterator>(std::move(entries));
return outlier_detection_policy;
}
std::string CdsLb::CreateChildPolicyResolutionNote(
const std::vector<const XdsConfig::ClusterConfig*>& new_cluster_list) {
std::vector<absl::string_view> resolution_notes;
for (const auto* cluster_config : new_cluster_list) {
const auto& endpoint_config =
absl::get<XdsConfig::ClusterConfig::EndpointConfig>(
cluster_config->children);
if (!endpoint_config.resolution_note.empty()) {
resolution_notes.push_back(endpoint_config.resolution_note);
}
Json CdsLb::CreateChildPolicyConfigForAggregateCluster(
const XdsConfig::ClusterConfig::AggregateConfig& aggregate_config) {
Json::Object priority_children;
Json::Array priority_priorities;
for (const absl::string_view& leaf_cluster : aggregate_config.leaf_clusters) {
priority_children[std::string(leaf_cluster)] = Json::FromObject({
{"config",
Json::FromArray({
Json::FromObject({
{"cds_experimental",
Json::FromObject({
{"cluster", Json::FromString(std::string(leaf_cluster))},
})},
}),
})},
});
priority_priorities.emplace_back(
Json::FromString(std::string(leaf_cluster)));
}
Json json = Json::FromArray({Json::FromObject({
{"priority_experimental",
Json::FromObject({
{"children", Json::FromObject(std::move(priority_children))},
{"priorities", Json::FromArray(std::move(priority_priorities))},
})},
})});
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
gpr_log(GPR_INFO, "[cdslb %p] generated config for child policy: %s", this,
JsonDump(json, /*indent=*/1).c_str());
}
return absl::StrJoin(resolution_notes, "; ");
return json;
}
void CdsLb::ResetState() {
cluster_name_.clear();
xds_config_.reset();
child_name_state_list_.clear();
child_name_state_.Reset();
if (child_policy_ != nullptr) {
grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
interested_parties());

@ -889,12 +889,14 @@ void XdsOverrideHostLb::UpdateAddressMap(
addresses.reserve(endpoint.addresses().size());
for (const auto& address : endpoint.addresses()) {
auto key = grpc_sockaddr_to_string(&address, /*normalize=*/false);
if (key.ok()) {
if (!key.ok()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO,
"[xds_override_host_lb %p] endpoint %s: adding map key %s",
this, endpoint.ToString().c_str(), key->c_str());
"[xds_override_host_lb %p] no key for endpoint address; "
"not adding to map",
this);
}
} else {
addresses.push_back(*std::move(key));
}
}

@ -197,6 +197,7 @@ grpc_cc_test(
"//:grpc_resolver_fake",
"//src/proto/grpc/testing/xds/v3:aggregate_cluster_proto",
"//test/core/util:grpc_test_util",
"//test/core/util:scoped_env_var",
"//test/cpp/end2end:connection_attempt_injector",
],
)

@ -32,6 +32,7 @@
#include "src/core/lib/resolver/endpoint_addresses.h"
#include "src/proto/grpc/testing/xds/v3/aggregate_cluster.grpc.pb.h"
#include "test/core/util/resolve_localhost_ip46.h"
#include "test/core/util/scoped_env_var.h"
#include "test/cpp/end2end/connection_attempt_injector.h"
#include "test/cpp/end2end/xds/xds_end2end_test_lib.h"
@ -172,6 +173,144 @@ TEST_P(AggregateClusterTest, Basic) {
WaitForBackend(DEBUG_LOCATION, 0);
}
// Verifies the gRFC A75 behavior: each underlying cluster in an
// aggregate cluster uses its own LB policy (here RING_HASH for the
// first cluster and the default ROUND_ROBIN for the second), instead
// of inheriting a policy from the aggregate cluster itself.
TEST_P(AggregateClusterTest, LoadBalancingPolicyComesFromUnderlyingCluster) {
  CreateAndStartBackends(4);
  const char* kNewCluster1Name = "new_cluster_1";
  const char* kNewEdsService1Name = "new_eds_service_name_1";
  const char* kNewCluster2Name = "new_cluster_2";
  const char* kNewEdsService2Name = "new_eds_service_name_2";
  // Populate new EDS resources.
  // Cluster 1 gets backends 0-1; cluster 2 gets backends 2-3.
  EdsResourceArgs args1({
      {"locality0", CreateEndpointsForBackends(0, 2)},
  });
  EdsResourceArgs args2({
      {"locality0", CreateEndpointsForBackends(2, 4)},
  });
  balancer_->ads_service()->SetEdsResource(
      BuildEdsResource(args1, kNewEdsService1Name));
  balancer_->ads_service()->SetEdsResource(
      BuildEdsResource(args2, kNewEdsService2Name));
  // Populate new CDS resources.
  // First cluster uses RING_HASH, second cluster uses ROUND_ROBIN.
  Cluster new_cluster1 = default_cluster_;
  new_cluster1.set_name(kNewCluster1Name);
  new_cluster1.mutable_eds_cluster_config()->set_service_name(
      kNewEdsService1Name);
  new_cluster1.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(new_cluster1);
  Cluster new_cluster2 = default_cluster_;
  new_cluster2.set_name(kNewCluster2Name);
  new_cluster2.mutable_eds_cluster_config()->set_service_name(
      kNewEdsService2Name);
  balancer_->ads_service()->SetCdsResource(new_cluster2);
  // Create Aggregate Cluster
  // The aggregate cluster lists cluster 1 first, so it is the higher
  // priority; cluster 2 is the failover target.
  auto cluster = default_cluster_;
  CustomClusterType* custom_cluster = cluster.mutable_cluster_type();
  custom_cluster->set_name("envoy.clusters.aggregate");
  ClusterConfig cluster_config;
  cluster_config.add_clusters(kNewCluster1Name);
  cluster_config.add_clusters(kNewCluster2Name);
  custom_cluster->mutable_typed_config()->PackFrom(cluster_config);
  balancer_->ads_service()->SetCdsResource(cluster);
  // Set up route with channel id hashing
  // With channel-id hashing, every RPC on this channel produces the
  // same hash, so RING_HASH will pin all traffic to a single endpoint.
  auto new_route_config = default_route_config_;
  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
  auto* hash_policy = route->mutable_route()->add_hash_policy();
  hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
  SetRouteConfiguration(balancer_.get(), new_route_config);
  // Traffic should all go to one of the two backends in the first
  // cluster, because we're using RING_HASH.
  CheckRpcSendOk(DEBUG_LOCATION, 100);
  bool found = false;
  for (size_t i = 0; i < 2; ++i) {
    if (backends_[i]->backend_service()->request_count() > 0) {
      EXPECT_EQ(backends_[i]->backend_service()->request_count(), 100)
          << "backend " << i;
      // Exactly one of the two backends may have received traffic.
      EXPECT_FALSE(found) << "backend " << i;
      found = true;
    }
  }
  EXPECT_TRUE(found);
  // Now shut down backends 0 and 1, so that we fail over to the second cluster.
  backends_[0]->StopListeningAndSendGoaways();
  backends_[1]->StopListeningAndSendGoaways();
  WaitForAllBackends(DEBUG_LOCATION, 2, 4);
  // Traffic should be evenly split between the two backends, since the
  // second cluster uses ROUND_ROBIN.
  CheckRpcSendOk(DEBUG_LOCATION, 100);
  EXPECT_EQ(backends_[2]->backend_service()->request_count(), 50);
  EXPECT_EQ(backends_[3]->backend_service()->request_count(), 50);
}
// TODO(roth): Remove this after the 1.63 release.
// Verifies the pre-A75 backward-compatibility mode: when the
// GRPC_XDS_AGGREGATE_CLUSTER_BACKWARD_COMPAT env var is set, the
// aggregate cluster's own LB policy (ROUND_ROBIN here) overrides the
// policies of the underlying clusters (both RING_HASH here).
TEST_P(AggregateClusterTest, LoadBalancingPolicyComesFromAggregateCluster) {
  grpc_core::testing::ScopedExperimentalEnvVar env(
      "GRPC_XDS_AGGREGATE_CLUSTER_BACKWARD_COMPAT");
  CreateAndStartBackends(4);
  const char* kNewCluster1Name = "new_cluster_1";
  const char* kNewEdsService1Name = "new_eds_service_name_1";
  const char* kNewCluster2Name = "new_cluster_2";
  const char* kNewEdsService2Name = "new_eds_service_name_2";
  // Populate new EDS resources.
  // Cluster 1 gets backends 0-1; cluster 2 gets backends 2-3.
  EdsResourceArgs args1({
      {"locality0", CreateEndpointsForBackends(0, 2)},
  });
  EdsResourceArgs args2({
      {"locality0", CreateEndpointsForBackends(2, 4)},
  });
  balancer_->ads_service()->SetEdsResource(
      BuildEdsResource(args1, kNewEdsService1Name));
  balancer_->ads_service()->SetEdsResource(
      BuildEdsResource(args2, kNewEdsService2Name));
  // Populate new CDS resources.
  // First cluster uses RING_HASH, second cluster uses ROUND_ROBIN.
  Cluster new_cluster1 = default_cluster_;
  new_cluster1.set_name(kNewCluster1Name);
  new_cluster1.mutable_eds_cluster_config()->set_service_name(
      kNewEdsService1Name);
  new_cluster1.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(new_cluster1);
  Cluster new_cluster2 = default_cluster_;
  new_cluster2.set_name(kNewCluster2Name);
  new_cluster2.mutable_eds_cluster_config()->set_service_name(
      kNewEdsService2Name);
  new_cluster2.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(new_cluster2);
  // Create Aggregate Cluster
  // The aggregate cluster itself specifies ROUND_ROBIN, which in
  // backward-compat mode is what the child clusters must use.
  auto cluster = default_cluster_;
  CustomClusterType* custom_cluster = cluster.mutable_cluster_type();
  custom_cluster->set_name("envoy.clusters.aggregate");
  ClusterConfig cluster_config;
  cluster_config.add_clusters(kNewCluster1Name);
  cluster_config.add_clusters(kNewCluster2Name);
  custom_cluster->mutable_typed_config()->PackFrom(cluster_config);
  cluster.set_lb_policy(Cluster::ROUND_ROBIN);
  balancer_->ads_service()->SetCdsResource(cluster);
  // Set up route with channel id hashing, so that if we use ring_hash,
  // all RPCs will go to the same endpoint.
  auto new_route_config = default_route_config_;
  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
  auto* hash_policy = route->mutable_route()->add_hash_policy();
  hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
  SetRouteConfiguration(balancer_.get(), new_route_config);
  // We should initially use the first cluster.
  WaitForAllBackends(DEBUG_LOCATION, 0, 2);
  // Traffic should be evenly split between the two backends in the
  // first cluster, because we're using ROUND_ROBIN.
  CheckRpcSendOk(DEBUG_LOCATION, 100);
  EXPECT_EQ(backends_[0]->backend_service()->request_count(), 50);
  EXPECT_EQ(backends_[1]->backend_service()->request_count(), 50);
  // Now shut down backends 0 and 1, so that we fail over to the second cluster.
  backends_[0]->StopListeningAndSendGoaways();
  backends_[1]->StopListeningAndSendGoaways();
  WaitForAllBackends(DEBUG_LOCATION, 2, 4);
  // Traffic should be evenly split between the two backends in the
  // second cluster as well.
  CheckRpcSendOk(DEBUG_LOCATION, 100);
  EXPECT_EQ(backends_[2]->backend_service()->request_count(), 50);
  EXPECT_EQ(backends_[3]->backend_service()->request_count(), 50);
}
// This test covers a bug found in the following scenario:
// 1. P0 reports TRANSIENT_FAILURE, so we start connecting to P1.
// 2. While P1 is still in CONNECTING, P0 goes back to READY, so we

@ -80,7 +80,7 @@ TEST_P(OutlierDetectionTest, SuccessRateEjectionAndUnejection) {
new_route_config);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Note each type of RPC will contains a header value that will always be
// Note each type of RPC will contain a header value that will always be
// hashed to a specific backend as the header value matches the value used
// to create the entry in the ring.
std::vector<std::pair<std::string, std::string>> metadata = {
@ -140,7 +140,7 @@ TEST_P(OutlierDetectionTest, SuccessRateMaxPercent) {
new_route_config);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Note each type of RPC will contains a header value that will always be
// Note each type of RPC will contain a header value that will always be
// hashed to a specific backend as the header value matches the value used
// to create the entry in the ring.
std::vector<std::pair<std::string, std::string>> metadata = {
@ -243,7 +243,7 @@ TEST_P(OutlierDetectionTest, SuccessRateStdevFactor) {
new_route_config);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Note each type of RPC will contains a header value that will always be
// Note each type of RPC will contain a header value that will always be
// hashed to a specific backend as the header value matches the value used
// to create the entry in the ring.
std::vector<std::pair<std::string, std::string>> metadata = {
@ -303,7 +303,7 @@ TEST_P(OutlierDetectionTest, SuccessRateEnforcementPercentage) {
new_route_config);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Note each type of RPC will contains a header value that will always be
// Note each type of RPC will contain a header value that will always be
// hashed to a specific backend as the header value matches the value used
// to create the entry in the ring.
std::vector<std::pair<std::string, std::string>> metadata = {
@ -363,7 +363,7 @@ TEST_P(OutlierDetectionTest, SuccessRateMinimumHosts) {
new_route_config);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Note each type of RPC will contains a header value that will always be
// Note each type of RPC will contain a header value that will always be
// hashed to a specific backend as the header value matches the value used
// to create the entry in the ring.
std::vector<std::pair<std::string, std::string>> metadata = {
@ -424,7 +424,7 @@ TEST_P(OutlierDetectionTest, SuccessRateRequestVolume) {
new_route_config);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Note each type of RPC will contains a header value that will always be
// Note each type of RPC will contain a header value that will always be
// hashed to a specific backend as the header value matches the value used
// to create the entry in the ring.
std::vector<std::pair<std::string, std::string>> metadata = {
@ -487,7 +487,7 @@ TEST_P(OutlierDetectionTest, FailurePercentageEjectionAndUnejection) {
new_route_config);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Note each type of RPC will contains a header value that will always be
// Note each type of RPC will contain a header value that will always be
// hashed to a specific backend as the header value matches the value used
// to create the entry in the ring.
std::vector<std::pair<std::string, std::string>> metadata = {
@ -554,7 +554,7 @@ TEST_P(OutlierDetectionTest, FailurePercentageMaxPercentage) {
new_route_config);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Note each type of RPC will contains a header value that will always be
// Note each type of RPC will contain a header value that will always be
// hashed to a specific backend as the header value matches the value used
// to create the entry in the ring.
std::vector<std::pair<std::string, std::string>> metadata = {
@ -653,7 +653,7 @@ TEST_P(OutlierDetectionTest, FailurePercentageThreshold) {
new_route_config);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Note each type of RPC will contains a header value that will always be
// Note each type of RPC will contain a header value that will always be
// hashed to a specific backend as the header value matches the value used
// to create the entry in the ring.
std::vector<std::pair<std::string, std::string>> metadata = {
@ -714,7 +714,7 @@ TEST_P(OutlierDetectionTest, FailurePercentageEnforcementPercentage) {
new_route_config);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Note each type of RPC will contains a header value that will always be
// Note each type of RPC will contain a header value that will always be
// hashed to a specific backend as the header value matches the value used
// to create the entry in the ring.
std::vector<std::pair<std::string, std::string>> metadata = {
@ -779,7 +779,7 @@ TEST_P(OutlierDetectionTest, FailurePercentageMinimumHosts) {
new_route_config);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Note each type of RPC will contains a header value that will always be
// Note each type of RPC will contain a header value that will always be
// hashed to a specific backend as the header value matches the value used
// to create the entry in the ring.
std::vector<std::pair<std::string, std::string>> metadata = {
@ -842,7 +842,7 @@ TEST_P(OutlierDetectionTest, FailurePercentageRequestVolume) {
new_route_config);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Note each type of RPC will contains a header value that will always be
// Note each type of RPC will contain a header value that will always be
// hashed to a specific backend as the header value matches the value used
// to create the entry in the ring.
std::vector<std::pair<std::string, std::string>> metadata = {
@ -907,7 +907,7 @@ TEST_P(OutlierDetectionTest, SuccessRateAndFailurePercentage) {
new_route_config);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Note each type of RPC will contains a header value that will always be
// Note each type of RPC will contain a header value that will always be
// hashed to a specific backend as the header value matches the value used
// to create the entry in the ring.
std::vector<std::pair<std::string, std::string>> metadata = {
@ -1014,7 +1014,7 @@ TEST_P(OutlierDetectionTest, SuccessRateAndFailurePercentageBothDisabled) {
new_route_config);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Note each type of RPC will contains a header value that will always be
// Note each type of RPC will contain a header value that will always be
// hashed to a specific backend as the header value matches the value used
// to create the entry in the ring.
std::vector<std::pair<std::string, std::string>> metadata = {
@ -1070,7 +1070,7 @@ TEST_P(OutlierDetectionTest, DisableOutlierDetectionWhileAddressesAreEjected) {
new_route_config);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Note each type of RPC will contains a header value that will always be
// Note each type of RPC will contain a header value that will always be
// hashed to a specific backend as the header value matches the value used
// to create the entry in the ring.
std::vector<std::pair<std::string, std::string>> metadata = {
@ -1115,6 +1115,58 @@ TEST_P(OutlierDetectionTest, DisableOutlierDetectionWhileAddressesAreEjected) {
.set_server_expected_error(StatusCode::CANCELLED));
}
// Verifies that an outlier-detection ejection is retained when an EDS
// update moves the ejected endpoint to a different priority.
TEST_P(OutlierDetectionTest, EjectionRetainedAcrossPriorities) {
  CreateAndStartBackends(3);
  auto cluster = default_cluster_;
  // Setup outlier failure percentage parameters.
  // Any failure will cause a potential ejection with the probability of 100%
  // (to eliminate flakiness of the test).
  // base_ejection_time is 10 minutes, so the endpoint stays ejected for
  // the remainder of the test.
  auto* outlier_detection = cluster.mutable_outlier_detection();
  SetProtoDuration(grpc_core::Duration::Seconds(1),
                   outlier_detection->mutable_interval());
  SetProtoDuration(grpc_core::Duration::Minutes(10),
                   outlier_detection->mutable_base_ejection_time());
  outlier_detection->mutable_failure_percentage_threshold()->set_value(0);
  outlier_detection->mutable_enforcing_failure_percentage()->set_value(100);
  outlier_detection->mutable_failure_percentage_minimum_hosts()->set_value(1);
  outlier_detection->mutable_failure_percentage_request_volume()->set_value(1);
  balancer_->ads_service()->SetCdsResource(cluster);
  // Priority 0: backend 0 and a non-existent backend.
  // Priority 1: backend 1.
  EdsResourceArgs args({
      {"locality0", {CreateEndpoint(0), MakeNonExistantEndpoint()}},
      {"locality1", {CreateEndpoint(1)}, kDefaultLocalityWeight, 1},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  WaitForBackend(DEBUG_LOCATION, 0);
  // Trigger an error to backend 0.
  // The failure percentage enforcement_percentage is 100%, so this will
  // cause the backend to be ejected when the ejection timer fires.
  CheckRpcSendFailure(
      DEBUG_LOCATION, StatusCode::CANCELLED, "",
      RpcOptions().set_server_expected_error(StatusCode::CANCELLED));
  // Wait for traffic aimed at backend 0 to start going to backend 1.
  // This tells us that backend 0 has been ejected.
  // It should take no more than one ejection timer interval.
  WaitForBackend(DEBUG_LOCATION, 1, /*check_status=*/nullptr,
                 WaitForBackendOptions().set_timeout_ms(
                     3000 * grpc_test_slowdown_factor()));
  // Now send an EDS update that moves backend 0 to priority 1.
  // We also add backend 2, so that we know when the client sees the update.
  args = EdsResourceArgs({
      {"locality0", {MakeNonExistantEndpoint()}},
      {"locality1", CreateEndpointsForBackends(), kDefaultLocalityWeight, 1},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  WaitForBackend(DEBUG_LOCATION, 2);
  // Now send 100 RPCs and make sure they all go to backends 1 and 2,
  // because backend 0 should still be ejected.
  CheckRpcSendOk(DEBUG_LOCATION, 100);
  EXPECT_EQ(0, backends_[0]->backend_service()->request_count());
  EXPECT_EQ(50, backends_[1]->backend_service()->request_count());
  EXPECT_EQ(50, backends_[2]->backend_service()->request_count());
}
} // namespace
} // namespace testing
} // namespace grpc

@ -268,7 +268,7 @@ TEST_P(OverrideHostTest, HappyPath) {
WaitForAllBackends(DEBUG_LOCATION);
// Get cookie for backend #0.
auto cookies = GetCookiesForBackend(DEBUG_LOCATION, 0);
EXPECT_THAT(cookies,
ASSERT_THAT(cookies,
::testing::ElementsAre(::testing::AllOf(
::testing::Field("name", &Cookie::name, kCookieName),
::testing::Field("attributes", &Cookie::attributes,
@ -293,6 +293,91 @@ TEST_P(OverrideHostTest, HappyPath) {
EXPECT_EQ(backends_[0]->backend_service2()->request_count(), 5);
}
// Verifies that stateful-session cookie affinity keeps routing RPCs to
// the same backend even after an EDS update moves that backend's
// locality to a lower priority.
TEST_P(OverrideHostTest, AffinityWorksAcrossPriorities) {
  CreateAndStartBackends(3);
  SetListenerAndRouteConfiguration(balancer_.get(),
                                   BuildListenerWithStatefulSessionFilter(),
                                   default_route_config_);
  // Locality 0 contains backends 0 and 1.  We start with this locality
  // in priority 0.
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(
      EdsResourceArgs({{"locality0", CreateEndpointsForBackends(0, 2)}})));
  WaitForAllBackends(DEBUG_LOCATION, 0, 2);
  // Get cookie for backend 1.
  // ASSERT (not EXPECT): the rest of the test dereferences the cookie.
  auto cookies = GetCookiesForBackend(DEBUG_LOCATION, 1);
  ASSERT_THAT(cookies,
              ::testing::ElementsAre(::testing::AllOf(
                  ::testing::Field("name", &Cookie::name, kCookieName),
                  ::testing::Field("attributes", &Cookie::attributes,
                                   ::testing::ElementsAre("HttpOnly")),
                  ::testing::Field("value", &Cookie::value,
                                   ::testing::Not(::testing::IsEmpty())))));
  // The cookie should send all traffic to backend 1.
  CheckRpcSendOk(DEBUG_LOCATION, 5,
                 RpcOptions().set_metadata({cookies.front().Header()}));
  EXPECT_EQ(backends_[1]->backend_service()->request_count(), 5);
  // Send an update that moves locality 0 to priority 1.
  // Add a new locality in priority 0 containing backend 2.
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(EdsResourceArgs({
      {"locality1", CreateEndpointsForBackends(2, 3)},
      {"locality0", CreateEndpointsForBackends(0, 2), kDefaultLocalityWeight,
       /*priority=*/1},
  })));
  // Backend 2 seeing traffic confirms the client processed the update.
  WaitForBackend(DEBUG_LOCATION, 2);
  // Using the cookie should continue to send traffic to backend 1.
  CheckRpcSendOk(DEBUG_LOCATION, 5,
                 RpcOptions().set_metadata({cookies.front().Header()}));
  EXPECT_EQ(backends_[1]->backend_service()->request_count(), 5);
}
// Same as AffinityWorksAcrossPriorities, but constructs the EDS update
// so that the priority-child-name heuristic assigns the moved locality
// a different child name; cookie affinity must still hold.
TEST_P(OverrideHostTest,
       AffinityWorksAcrossPrioritiesHeuristicChangesChildName) {
  CreateAndStartBackends(3);
  SetListenerAndRouteConfiguration(balancer_.get(),
                                   BuildListenerWithStatefulSessionFilter(),
                                   default_route_config_);
  // Priority 0:
  // - locality 0: backend 0
  // - locality 1: backend 1
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(EdsResourceArgs({
      {"locality0", CreateEndpointsForBackends(0, 1)},
      {"locality1", CreateEndpointsForBackends(1, 2)},
  })));
  WaitForAllBackends(DEBUG_LOCATION, 0, 2);
  // Get cookie for backend 1.
  // It may take more requests than usual to hit the backend we want,
  // since the weighted_target policy does not do a strict round-robin.
  auto cookies =
      GetCookiesForBackend(DEBUG_LOCATION, 1, /*max_requests_per_backend=*/10);
  // ASSERT (not EXPECT): the rest of the test dereferences the cookie.
  ASSERT_THAT(cookies,
              ::testing::ElementsAre(::testing::AllOf(
                  ::testing::Field("name", &Cookie::name, kCookieName),
                  ::testing::Field("attributes", &Cookie::attributes,
                                   ::testing::ElementsAre("HttpOnly")),
                  ::testing::Field("value", &Cookie::value,
                                   ::testing::Not(::testing::IsEmpty())))));
  // The cookie should send all traffic to backend 1.
  CheckRpcSendOk(DEBUG_LOCATION, 5,
                 RpcOptions().set_metadata({cookies.front().Header()}));
  EXPECT_EQ(backends_[1]->backend_service()->request_count(), 5);
  // Priority 0:
  // - locality 0: backend 0
  // - locality 2: backend 2
  // Priority 1:
  // - locality 1: backend 1
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(EdsResourceArgs({
      {"locality0", CreateEndpointsForBackends(0, 1)},
      {"locality2", CreateEndpointsForBackends(2, 3)},
      {"locality1", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
       /*priority=*/1},
  })));
  // Backend 2 seeing traffic confirms the client processed the update.
  WaitForBackend(DEBUG_LOCATION, 2);
  // Using the cookie should continue to send traffic to backend 1.
  CheckRpcSendOk(DEBUG_LOCATION, 5,
                 RpcOptions().set_metadata({cookies.front().Header()}));
  EXPECT_EQ(backends_[1]->backend_service()->request_count(), 5);
}
TEST_P(OverrideHostTest, DrainingIncludedFromOverrideSet) {
CreateAndStartBackends(3);
Cluster cluster = default_cluster_;
@ -641,7 +726,7 @@ TEST_P(OverrideHostTest, MultipleAddressesPerEndpoint) {
ResetBackendCounters();
// Get cookie for backend 1.
auto cookies = GetCookiesForBackend(DEBUG_LOCATION, 1);
EXPECT_THAT(cookies,
ASSERT_THAT(cookies,
::testing::ElementsAre(::testing::AllOf(
::testing::Field("name", &Cookie::name, kCookieName),
::testing::Field("attributes", &Cookie::attributes,

@ -117,15 +117,16 @@ TEST_P(RingHashTest, AggregateClusterFallBackFromRingHashAtStartup) {
new_cluster1.set_name(kNewCluster1Name);
new_cluster1.mutable_eds_cluster_config()->set_service_name(
kNewEdsService1Name);
new_cluster1.set_lb_policy(Cluster::RING_HASH);
balancer_->ads_service()->SetCdsResource(new_cluster1);
Cluster new_cluster2 = default_cluster_;
new_cluster2.set_name(kNewCluster2Name);
new_cluster2.mutable_eds_cluster_config()->set_service_name(
kNewEdsService2Name);
new_cluster2.set_lb_policy(Cluster::RING_HASH);
balancer_->ads_service()->SetCdsResource(new_cluster2);
// Create Aggregate Cluster
auto cluster = default_cluster_;
cluster.set_lb_policy(Cluster::RING_HASH);
CustomClusterType* custom_cluster = cluster.mutable_cluster_type();
custom_cluster->set_name("envoy.clusters.aggregate");
ClusterConfig cluster_config;
@ -175,6 +176,7 @@ TEST_P(RingHashTest,
// Populate new CDS resources.
Cluster eds_cluster = default_cluster_;
eds_cluster.set_name(kEdsClusterName);
eds_cluster.set_lb_policy(Cluster::RING_HASH);
balancer_->ads_service()->SetCdsResource(eds_cluster);
// Populate LOGICAL_DNS cluster.
auto logical_dns_cluster = default_cluster_;
@ -191,7 +193,6 @@ TEST_P(RingHashTest,
balancer_->ads_service()->SetCdsResource(logical_dns_cluster);
// Create Aggregate Cluster
auto cluster = default_cluster_;
cluster.set_lb_policy(Cluster::RING_HASH);
CustomClusterType* custom_cluster = cluster.mutable_cluster_type();
custom_cluster->set_name("envoy.clusters.aggregate");
ClusterConfig cluster_config;
@ -243,6 +244,7 @@ TEST_P(RingHashTest,
// Populate new CDS resources.
Cluster eds_cluster = default_cluster_;
eds_cluster.set_name(kEdsClusterName);
eds_cluster.set_lb_policy(Cluster::RING_HASH);
balancer_->ads_service()->SetCdsResource(eds_cluster);
// Populate LOGICAL_DNS cluster.
auto logical_dns_cluster = default_cluster_;
@ -259,7 +261,6 @@ TEST_P(RingHashTest,
balancer_->ads_service()->SetCdsResource(logical_dns_cluster);
// Create Aggregate Cluster
auto cluster = default_cluster_;
cluster.set_lb_policy(Cluster::RING_HASH);
CustomClusterType* custom_cluster = cluster.mutable_cluster_type();
custom_cluster->set_name("envoy.clusters.aggregate");
ClusterConfig cluster_config;

Loading…
Cancel
Save