xDS: NACK EDS resources with duplicate addresses (#31321)

* xDS: NACK EDS resources with duplicate addresses

* iwyu

* update e2e tests

* Automated change: Fix sanity tests

Co-authored-by: markdroth <markdroth@users.noreply.github.com>
pull/31338/head
Mark D. Roth 2 years ago committed by GitHub
parent a1cb2f3d6f
commit 3475489bf3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 25
      src/core/ext/xds/xds_endpoint.cc
  2. 82
      test/core/xds/xds_endpoint_resource_type_test.cc
  3. 50
      test/cpp/end2end/xds/xds_cluster_end2end_test.cc

@ -19,9 +19,11 @@
#include "src/core/ext/xds/xds_endpoint.h"
#include <stdlib.h>
#include <string.h>
#include <algorithm>
#include <limits>
#include <set>
#include <vector>
#include "absl/status/status.h"
@ -44,6 +46,7 @@
#include "src/core/ext/xds/upb_utils.h"
#include "src/core/ext/xds/xds_resource_type.h"
#include "src/core/lib/address_utils/parse_address.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/validation_errors.h"
@ -221,9 +224,19 @@ struct ParsedLocality {
XdsEndpointResource::Priority::Locality locality;
};
struct ResolvedAddressLessThan {
bool operator()(const grpc_resolved_address& a1,
const grpc_resolved_address& a2) const {
if (a1.len != a2.len) return a1.len < a2.len;
return memcmp(a1.addr, a2.addr, a1.len) < 0;
}
};
using ResolvedAddressSet =
std::set<grpc_resolved_address, ResolvedAddressLessThan>;
absl::optional<ParsedLocality> LocalityParse(
const envoy_config_endpoint_v3_LocalityLbEndpoints* locality_lb_endpoints,
ValidationErrors* errors) {
ResolvedAddressSet* address_set, ValidationErrors* errors) {
const size_t original_error_size = errors->size();
ParsedLocality parsed_locality;
// load_balancing_weight
@ -265,6 +278,13 @@ absl::optional<ParsedLocality> LocalityParse(
absl::StrCat(".lb_endpoints[", i, "]"));
auto address = ServerAddressParse(lb_endpoints[i], errors);
if (address.has_value()) {
bool inserted = address_set->insert(address->address()).second;
if (!inserted) {
errors->AddError(absl::StrCat(
"duplicate endpoint address \"",
grpc_sockaddr_to_uri(&address->address()).value_or("<unknown>"),
"\""));
}
parsed_locality.locality.endpoints.push_back(std::move(*address));
}
}
@ -335,13 +355,14 @@ absl::StatusOr<XdsEndpointResource> EdsResourceParse(
// endpoints
{
ValidationErrors::ScopedField field(&errors, "endpoints");
ResolvedAddressSet address_set;
size_t locality_size;
const envoy_config_endpoint_v3_LocalityLbEndpoints* const* endpoints =
envoy_config_endpoint_v3_ClusterLoadAssignment_endpoints(
cluster_load_assignment, &locality_size);
for (size_t i = 0; i < locality_size; ++i) {
ValidationErrors::ScopedField field(&errors, absl::StrCat("[", i, "]"));
auto parsed_locality = LocalityParse(endpoints[i], &errors);
auto parsed_locality = LocalityParse(endpoints[i], &address_set, &errors);
if (parsed_locality.has_value()) {
GPR_ASSERT(parsed_locality->locality.lb_weight != 0);
// Make sure prorities is big enough. Note that they might not

@ -562,7 +562,17 @@ TEST_F(XdsEndpointTest, DuplicateLocalityName) {
socket_address->set_address("127.0.0.1");
socket_address->set_port_value(443);
locality = cla.add_endpoints();
*locality = cla.endpoints(0);
locality->mutable_load_balancing_weight()->set_value(1);
locality_name = locality->mutable_locality();
locality_name->set_region("myregion");
locality_name->set_zone("myzone");
locality_name->set_sub_zone("mysubzone");
socket_address = locality->add_lb_endpoints()
->mutable_endpoint()
->mutable_address()
->mutable_socket_address();
socket_address->set_address("127.0.0.2");
socket_address->set_port_value(443);
std::string serialized_resource;
ASSERT_TRUE(cla.SerializeToString(&serialized_resource));
auto* resource_type = XdsEndpointResourceType::Get();
@ -596,7 +606,17 @@ TEST_F(XdsEndpointTest, SparsePriorityList) {
socket_address->set_port_value(443);
locality->set_priority(1);
locality = cla.add_endpoints();
*locality = cla.endpoints(0);
locality->mutable_load_balancing_weight()->set_value(1);
locality_name = locality->mutable_locality();
locality_name->set_region("myregion2");
locality_name->set_zone("myzone");
locality_name->set_sub_zone("mysubzone");
socket_address = locality->add_lb_endpoints()
->mutable_endpoint()
->mutable_address()
->mutable_socket_address();
socket_address->set_address("127.0.0.2");
socket_address->set_port_value(443);
locality->set_priority(3);
std::string serialized_resource;
ASSERT_TRUE(cla.SerializeToString(&serialized_resource));
@ -632,10 +652,19 @@ TEST_F(XdsEndpointTest, LocalityWeightsWithinPriorityExceedUint32Max) {
locality->set_priority(0);
// Second locality has weight of uint32 max.
locality = cla.add_endpoints();
*locality = cla.endpoints(0); // Copy first locality.
locality->mutable_locality()->set_region("myregion2");
locality->mutable_load_balancing_weight()->set_value(
std::numeric_limits<uint32_t>::max());
locality_name = locality->mutable_locality();
locality_name->set_region("myregion2");
locality_name->set_zone("myzone");
locality_name->set_sub_zone("mysubzone");
socket_address = locality->add_lb_endpoints()
->mutable_endpoint()
->mutable_address()
->mutable_socket_address();
socket_address->set_address("127.0.0.2");
socket_address->set_port_value(443);
locality->set_priority(0);
std::string serialized_resource;
ASSERT_TRUE(cla.SerializeToString(&serialized_resource));
auto* resource_type = XdsEndpointResourceType::Get();
@ -652,6 +681,51 @@ TEST_F(XdsEndpointTest, LocalityWeightsWithinPriorityExceedUint32Max) {
<< decode_result.resource.status();
}
TEST_F(XdsEndpointTest, DuplicateAddresses) {
ClusterLoadAssignment cla;
cla.set_cluster_name("foo");
auto* locality = cla.add_endpoints();
locality->mutable_load_balancing_weight()->set_value(1);
auto* locality_name = locality->mutable_locality();
locality_name->set_region("myregion");
locality_name->set_zone("myzone");
locality_name->set_sub_zone("mysubzone");
auto* socket_address = locality->add_lb_endpoints()
->mutable_endpoint()
->mutable_address()
->mutable_socket_address();
socket_address->set_address("127.0.0.1");
socket_address->set_port_value(443);
locality->set_priority(0);
locality = cla.add_endpoints();
locality->mutable_load_balancing_weight()->set_value(1);
locality_name = locality->mutable_locality();
locality_name->set_region("myregion2");
locality_name->set_zone("myzone");
locality_name->set_sub_zone("mysubzone");
socket_address = locality->add_lb_endpoints()
->mutable_endpoint()
->mutable_address()
->mutable_socket_address();
socket_address->set_address("127.0.0.1");
socket_address->set_port_value(443);
locality->set_priority(0);
std::string serialized_resource;
ASSERT_TRUE(cla.SerializeToString(&serialized_resource));
auto* resource_type = XdsEndpointResourceType::Get();
auto decode_result = resource_type->Decode(
decode_context_, serialized_resource, /*is_v2=*/false);
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
EXPECT_EQ(decode_result.resource.status().code(),
absl::StatusCode::kInvalidArgument);
EXPECT_EQ(decode_result.resource.status().message(),
"errors parsing EDS resource: ["
"field:endpoints[1].lb_endpoints[0] "
"error:duplicate endpoint address \"ipv4:127.0.0.1:443\"]")
<< decode_result.resource.status();
}
TEST_F(XdsEndpointTest, DropConfig) {
ClusterLoadAssignment cla;
cla.set_cluster_name("foo");

@ -421,27 +421,6 @@ TEST_P(EdsTest, IgnoresUnhealthyEndpoints) {
}
}
// Tests that subchannel sharing works when the same backend is listed
// multiple times.
TEST_P(EdsTest, SameBackendListedMultipleTimes) {
CreateAndStartBackends(1);
// Same backend listed twice.
auto endpoints = CreateEndpointsForBackends();
endpoints.push_back(endpoints.front());
EdsResourceArgs args({{"locality0", endpoints}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// We need to wait for the backend to come online.
WaitForAllBackends(DEBUG_LOCATION, /*start_index=*/0, /*stop_index=*/0,
/*check_status=*/nullptr, WaitForBackendOptions(),
RpcOptions().set_timeout_ms(2000));
// Send kNumRpcsPerAddress RPCs per server.
const size_t kNumRpcsPerAddress = 10;
CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * endpoints.size());
// Backend should have gotten 20 requests.
EXPECT_EQ(kNumRpcsPerAddress * endpoints.size(),
backends_[0]->backend_service()->request_count());
}
TEST_P(EdsTest, OneLocalityWithNoEndpoints) {
CreateAndStartBackends(1);
// Initial EDS resource has one locality with no endpoints.
@ -712,28 +691,31 @@ TEST_P(EdsTest, LocalityContainingNoEndpoints) {
// Tests that the locality map can work properly even when it contains a large
// number of localities.
TEST_P(EdsTest, ManyLocalitiesStressTest) {
CreateAndStartBackends(2);
const size_t kNumLocalities = 100;
const size_t kNumLocalities = 50;
CreateAndStartBackends(kNumLocalities + 1);
const uint32_t kRpcTimeoutMs = 5000;
// The first ADS response contains kNumLocalities localities, each of which
// contains backend 0.
// contains its own backend.
EdsResourceArgs args;
for (size_t i = 0; i < kNumLocalities; ++i) {
std::string name = absl::StrCat("locality", i);
EdsResourceArgs::Locality locality(name, CreateEndpointsForBackends(0, 1));
EdsResourceArgs::Locality locality(name,
CreateEndpointsForBackends(i, i + 1));
args.locality_list.emplace_back(std::move(locality));
}
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Wait until backend 0 is ready.
WaitForBackend(DEBUG_LOCATION, 0, /*check_status=*/nullptr,
WaitForBackendOptions().set_reset_counters(false),
RpcOptions().set_timeout_ms(kRpcTimeoutMs));
EXPECT_EQ(0U, backends_[1]->backend_service()->request_count());
// The second ADS response contains 1 locality, which contains backend 1.
args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends(1, 2)}});
// Wait until all backends are ready.
WaitForAllBackends(DEBUG_LOCATION, 0, kNumLocalities,
/*check_status=*/nullptr,
WaitForBackendOptions().set_reset_counters(false),
RpcOptions().set_timeout_ms(kRpcTimeoutMs));
// The second ADS response contains 1 locality, which contains backend 50.
args =
EdsResourceArgs({{"locality0", CreateEndpointsForBackends(
kNumLocalities, kNumLocalities + 1)}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Wait until backend 1 is ready.
WaitForBackend(DEBUG_LOCATION, 1);
// Wait until backend 50 is ready.
WaitForBackend(DEBUG_LOCATION, kNumLocalities);
}
// Tests that the localities in a locality map are picked correctly after

Loading…
Cancel
Save