diff --git a/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc b/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc index c75a8d1a397..84afb0bbc40 100644 --- a/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc +++ b/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc @@ -25,6 +25,7 @@ #include #include #include +#include #include #include @@ -379,8 +380,9 @@ RingHash::Ring::Ring(RingHash* parent, AddressWeight address_weight; address_weight.address = grpc_sockaddr_to_string(&sd->address().address(), false).value(); - if (weight_attribute != nullptr) { - GPR_ASSERT(weight_attribute->weight() != 0); + // Weight should never be zero, but ignore it just in case, since + // that value would screw up the ring-building algorithm. + if (weight_attribute != nullptr && weight_attribute->weight() > 0) { address_weight.weight = weight_attribute->weight(); } sum += address_weight.weight; @@ -800,16 +802,7 @@ void RingHash::UpdateLocked(UpdateArgs args) { gpr_log(GPR_INFO, "[RH %p] received update with %" PRIuPTR " addresses", this, args.addresses->size()); } - // Filter out any address with weight 0. - addresses.reserve(args.addresses->size()); - for (ServerAddress& address : *args.addresses) { - const ServerAddressWeightAttribute* weight_attribute = - static_cast(address.GetAttribute( - ServerAddressWeightAttribute::kServerAddressWeightAttributeKey)); - if (weight_attribute == nullptr || weight_attribute->weight() > 0) { - addresses.emplace_back(std::move(address)); - } - } + addresses = *std::move(args.addresses); } else { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) { gpr_log(GPR_INFO, "[RH %p] received update with addresses error: %s", diff --git a/src/core/ext/xds/xds_endpoint.cc b/src/core/ext/xds/xds_endpoint.cc index 1c23cad7619..9228d204b8c 100644 --- a/src/core/ext/xds/xds_endpoint.cc +++ b/src/core/ext/xds/xds_endpoint.cc @@ -164,15 +164,15 @@ grpc_error_handle ServerAddressParseAndAppend( return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Invalid port."); } // Find load_balancing_weight for the endpoint. + uint32_t weight = 1; const google_protobuf_UInt32Value* load_balancing_weight = envoy_config_endpoint_v3_LbEndpoint_load_balancing_weight(lb_endpoint); - const int32_t weight = - load_balancing_weight != nullptr - ? google_protobuf_UInt32Value_value(load_balancing_weight) - : 500; - if (weight == 0) { - return GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "Invalid endpoint weight of 0."); + if (load_balancing_weight != nullptr) { + weight = google_protobuf_UInt32Value_value(load_balancing_weight); + if (weight == 0) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Invalid endpoint weight of 0."); + } } // Populate grpc_resolved_address. grpc_resolved_address addr; diff --git a/test/cpp/end2end/xds/xds_cluster_end2end_test.cc b/test/cpp/end2end/xds/xds_cluster_end2end_test.cc index 8776e3ec42a..f2f84f5cfeb 100644 --- a/test/cpp/end2end/xds/xds_cluster_end2end_test.cc +++ b/test/cpp/end2end/xds/xds_cluster_end2end_test.cc @@ -609,6 +609,20 @@ TEST_P(EdsTest, NacksDuplicateLocalityInSamePriority) { "found in priority 0")); } +TEST_P(EdsTest, NacksEndpointWeightZero) { + EdsResourceArgs args({{"locality0", {MakeNonExistantEndpoint()}}}); + auto eds_resource = BuildEdsResource(args); + eds_resource.mutable_endpoints(0) + ->mutable_lb_endpoints(0) + ->mutable_load_balancing_weight() + ->set_value(0); + balancer_->ads_service()->SetEdsResource(eds_resource); + const auto response_state = WaitForEdsNack(DEBUG_LOCATION); + ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; + EXPECT_THAT(response_state->error_message, + ::testing::HasSubstr("Invalid endpoint weight of 0.")); +} + // Tests that if the balancer is down, the RPCs will still be sent to the // backends according to the last balancer response, until a new balancer is // reachable. diff --git a/test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc b/test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc index c8bf07cf763..c94ee35a50d 100644 --- a/test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc +++ b/test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc @@ -368,6 +368,52 @@ TEST_P(RingHashTest, NoHashPolicy) { ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance)); } +// Tests that we observe endpoint weights. +TEST_P(RingHashTest, EndpointWeights) { + CreateAndStartBackends(3); + const double kDistribution50Percent = 0.5; + const double kDistribution25Percent = 0.25; + const double kErrorTolerance = 0.05; + const uint32_t kRpcTimeoutMs = 10000; + const size_t kNumRpcs = + ComputeIdealNumRpcs(kDistribution50Percent, kErrorTolerance); + auto cluster = default_cluster_; + // Increasing min ring size for random distribution. + cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value( + 100000); + cluster.set_lb_policy(Cluster::RING_HASH); + balancer_->ads_service()->SetCdsResource(cluster); + // Endpoint 0 has weight 0, will be treated as weight 1. + // Endpoint 1 has weight 1. + // Endpoint 2 has weight 2. + EdsResourceArgs args( + {{"locality0", + {CreateEndpoint(0, ::envoy::config::endpoint::v3::HealthStatus::UNKNOWN, + 0), + CreateEndpoint(1, ::envoy::config::endpoint::v3::HealthStatus::UNKNOWN, + 1), + CreateEndpoint(2, ::envoy::config::endpoint::v3::HealthStatus::UNKNOWN, + 2)}}}); + balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); + // TODO(donnadionne): remove extended timeout after ring creation + // optimization. + WaitForAllBackends(DEBUG_LOCATION, 0, 3, /*check_status=*/nullptr, + WaitForBackendOptions(), + RpcOptions().set_timeout_ms(kRpcTimeoutMs)); + CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs); + // Endpoint 2 should see 50% of traffic, and endpoints 0 and 1 should + // each see 25% of traffic. + const int request_count_0 = backends_[0]->backend_service()->request_count(); + const int request_count_1 = backends_[1]->backend_service()->request_count(); + const int request_count_2 = backends_[2]->backend_service()->request_count(); + EXPECT_THAT(static_cast(request_count_0) / kNumRpcs, + ::testing::DoubleNear(kDistribution25Percent, kErrorTolerance)); + EXPECT_THAT(static_cast(request_count_1) / kNumRpcs, + ::testing::DoubleNear(kDistribution25Percent, kErrorTolerance)); + EXPECT_THAT(static_cast(request_count_2) / kNumRpcs, + ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance)); +} + // Test that ring hash policy evaluation will continue past the terminal // policy if no results are produced yet. TEST_P(RingHashTest, ContinuesPastTerminalPolicyThatDoesNotProduceResult) {