xDS: fix endpoint weight defaults (#30079)

* xDS: fix endpoint weight defaults

* add test

* NACK endpoint weight of 0

* iwyu

* clang-tidy

* fix test
pull/30001/head^2
Mark D. Roth 3 years ago committed by GitHub
parent e026f5759c
commit d0c8b29ef7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 17
      src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc
  2. 14
      src/core/ext/xds/xds_endpoint.cc
  3. 14
      test/cpp/end2end/xds/xds_cluster_end2end_test.cc
  4. 46
      test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc

@ -25,6 +25,7 @@
#include <map> #include <map>
#include <memory> #include <memory>
#include <string> #include <string>
#include <type_traits>
#include <utility> #include <utility>
#include <vector> #include <vector>
@ -379,8 +380,9 @@ RingHash::Ring::Ring(RingHash* parent,
AddressWeight address_weight; AddressWeight address_weight;
address_weight.address = address_weight.address =
grpc_sockaddr_to_string(&sd->address().address(), false).value(); grpc_sockaddr_to_string(&sd->address().address(), false).value();
if (weight_attribute != nullptr) { // Weight should never be zero, but ignore it just in case, since
GPR_ASSERT(weight_attribute->weight() != 0); // that value would screw up the ring-building algorithm.
if (weight_attribute != nullptr && weight_attribute->weight() > 0) {
address_weight.weight = weight_attribute->weight(); address_weight.weight = weight_attribute->weight();
} }
sum += address_weight.weight; sum += address_weight.weight;
@ -800,16 +802,7 @@ void RingHash::UpdateLocked(UpdateArgs args) {
gpr_log(GPR_INFO, "[RH %p] received update with %" PRIuPTR " addresses", gpr_log(GPR_INFO, "[RH %p] received update with %" PRIuPTR " addresses",
this, args.addresses->size()); this, args.addresses->size());
} }
// Filter out any address with weight 0. addresses = *std::move(args.addresses);
addresses.reserve(args.addresses->size());
for (ServerAddress& address : *args.addresses) {
const ServerAddressWeightAttribute* weight_attribute =
static_cast<const ServerAddressWeightAttribute*>(address.GetAttribute(
ServerAddressWeightAttribute::kServerAddressWeightAttributeKey));
if (weight_attribute == nullptr || weight_attribute->weight() > 0) {
addresses.emplace_back(std::move(address));
}
}
} else { } else {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
gpr_log(GPR_INFO, "[RH %p] received update with addresses error: %s", gpr_log(GPR_INFO, "[RH %p] received update with addresses error: %s",

@ -164,15 +164,15 @@ grpc_error_handle ServerAddressParseAndAppend(
return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Invalid port."); return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Invalid port.");
} }
// Find load_balancing_weight for the endpoint. // Find load_balancing_weight for the endpoint.
uint32_t weight = 1;
const google_protobuf_UInt32Value* load_balancing_weight = const google_protobuf_UInt32Value* load_balancing_weight =
envoy_config_endpoint_v3_LbEndpoint_load_balancing_weight(lb_endpoint); envoy_config_endpoint_v3_LbEndpoint_load_balancing_weight(lb_endpoint);
const int32_t weight = if (load_balancing_weight != nullptr) {
load_balancing_weight != nullptr weight = google_protobuf_UInt32Value_value(load_balancing_weight);
? google_protobuf_UInt32Value_value(load_balancing_weight) if (weight == 0) {
: 500; return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
if (weight == 0) { "Invalid endpoint weight of 0.");
return GRPC_ERROR_CREATE_FROM_STATIC_STRING( }
"Invalid endpoint weight of 0.");
} }
// Populate grpc_resolved_address. // Populate grpc_resolved_address.
grpc_resolved_address addr; grpc_resolved_address addr;

@ -609,6 +609,20 @@ TEST_P(EdsTest, NacksDuplicateLocalityInSamePriority) {
"found in priority 0")); "found in priority 0"));
} }
TEST_P(EdsTest, NacksEndpointWeightZero) {
EdsResourceArgs args({{"locality0", {MakeNonExistantEndpoint()}}});
auto eds_resource = BuildEdsResource(args);
eds_resource.mutable_endpoints(0)
->mutable_lb_endpoints(0)
->mutable_load_balancing_weight()
->set_value(0);
balancer_->ads_service()->SetEdsResource(eds_resource);
const auto response_state = WaitForEdsNack(DEBUG_LOCATION);
ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK";
EXPECT_THAT(response_state->error_message,
::testing::HasSubstr("Invalid endpoint weight of 0."));
}
// Tests that if the balancer is down, the RPCs will still be sent to the // Tests that if the balancer is down, the RPCs will still be sent to the
// backends according to the last balancer response, until a new balancer is // backends according to the last balancer response, until a new balancer is
// reachable. // reachable.

@ -368,6 +368,52 @@ TEST_P(RingHashTest, NoHashPolicy) {
::testing::DoubleNear(kDistribution50Percent, kErrorTolerance)); ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
} }
// Tests that we observe endpoint weights.
TEST_P(RingHashTest, EndpointWeights) {
CreateAndStartBackends(3);
const double kDistribution50Percent = 0.5;
const double kDistribution25Percent = 0.25;
const double kErrorTolerance = 0.05;
const uint32_t kRpcTimeoutMs = 10000;
const size_t kNumRpcs =
ComputeIdealNumRpcs(kDistribution50Percent, kErrorTolerance);
auto cluster = default_cluster_;
// Increasing min ring size for random distribution.
cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
100000);
cluster.set_lb_policy(Cluster::RING_HASH);
balancer_->ads_service()->SetCdsResource(cluster);
// Endpoint 0 has weight 0, will be treated as weight 1.
// Endpoint 1 has weight 1.
// Endpoint 2 has weight 2.
EdsResourceArgs args(
{{"locality0",
{CreateEndpoint(0, ::envoy::config::endpoint::v3::HealthStatus::UNKNOWN,
0),
CreateEndpoint(1, ::envoy::config::endpoint::v3::HealthStatus::UNKNOWN,
1),
CreateEndpoint(2, ::envoy::config::endpoint::v3::HealthStatus::UNKNOWN,
2)}}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// TODO(donnadionne): remove extended timeout after ring creation
// optimization.
WaitForAllBackends(DEBUG_LOCATION, 0, 3, /*check_status=*/nullptr,
WaitForBackendOptions(),
RpcOptions().set_timeout_ms(kRpcTimeoutMs));
CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
// Endpoint 2 should see 50% of traffic, and endpoints 0 and 1 should
// each see 25% of traffic.
const int request_count_0 = backends_[0]->backend_service()->request_count();
const int request_count_1 = backends_[1]->backend_service()->request_count();
const int request_count_2 = backends_[2]->backend_service()->request_count();
EXPECT_THAT(static_cast<double>(request_count_0) / kNumRpcs,
::testing::DoubleNear(kDistribution25Percent, kErrorTolerance));
EXPECT_THAT(static_cast<double>(request_count_1) / kNumRpcs,
::testing::DoubleNear(kDistribution25Percent, kErrorTolerance));
EXPECT_THAT(static_cast<double>(request_count_2) / kNumRpcs,
::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
}
// Test that ring hash policy evaluation will continue past the terminal // Test that ring hash policy evaluation will continue past the terminal
// policy if no results are produced yet. // policy if no results are produced yet.
TEST_P(RingHashTest, ContinuesPastTerminalPolicyThatDoesNotProduceResult) { TEST_P(RingHashTest, ContinuesPastTerminalPolicyThatDoesNotProduceResult) {

Loading…
Cancel
Save