diff --git a/src/core/ext/xds/xds_endpoint.cc b/src/core/ext/xds/xds_endpoint.cc index aee1544c24d..4f79a3f6747 100644 --- a/src/core/ext/xds/xds_endpoint.cc +++ b/src/core/ext/xds/xds_endpoint.cc @@ -19,9 +19,11 @@ #include "src/core/ext/xds/xds_endpoint.h" #include +#include #include #include +#include #include #include "absl/status/status.h" @@ -44,6 +46,7 @@ #include "src/core/ext/xds/upb_utils.h" #include "src/core/ext/xds/xds_resource_type.h" #include "src/core/lib/address_utils/parse_address.h" +#include "src/core/lib/address_utils/sockaddr_utils.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/gprpp/validation_errors.h" @@ -221,9 +224,19 @@ struct ParsedLocality { XdsEndpointResource::Priority::Locality locality; }; +struct ResolvedAddressLessThan { + bool operator()(const grpc_resolved_address& a1, + const grpc_resolved_address& a2) const { + if (a1.len != a2.len) return a1.len < a2.len; + return memcmp(a1.addr, a2.addr, a1.len) < 0; + } +}; +using ResolvedAddressSet = + std::set; + absl::optional LocalityParse( const envoy_config_endpoint_v3_LocalityLbEndpoints* locality_lb_endpoints, - ValidationErrors* errors) { + ResolvedAddressSet* address_set, ValidationErrors* errors) { const size_t original_error_size = errors->size(); ParsedLocality parsed_locality; // load_balancing_weight @@ -265,6 +278,13 @@ absl::optional LocalityParse( absl::StrCat(".lb_endpoints[", i, "]")); auto address = ServerAddressParse(lb_endpoints[i], errors); if (address.has_value()) { + bool inserted = address_set->insert(address->address()).second; + if (!inserted) { + errors->AddError(absl::StrCat( + "duplicate endpoint address \"", + grpc_sockaddr_to_uri(&address->address()).value_or(""), + "\"")); + } parsed_locality.locality.endpoints.push_back(std::move(*address)); } } @@ -335,13 +355,14 @@ absl::StatusOr EdsResourceParse( // endpoints { ValidationErrors::ScopedField field(&errors, "endpoints"); + ResolvedAddressSet address_set; size_t locality_size; const envoy_config_endpoint_v3_LocalityLbEndpoints* const* endpoints = envoy_config_endpoint_v3_ClusterLoadAssignment_endpoints( cluster_load_assignment, &locality_size); for (size_t i = 0; i < locality_size; ++i) { ValidationErrors::ScopedField field(&errors, absl::StrCat("[", i, "]")); - auto parsed_locality = LocalityParse(endpoints[i], &errors); + auto parsed_locality = LocalityParse(endpoints[i], &address_set, &errors); if (parsed_locality.has_value()) { GPR_ASSERT(parsed_locality->locality.lb_weight != 0); // Make sure prorities is big enough. Note that they might not diff --git a/test/core/xds/xds_endpoint_resource_type_test.cc b/test/core/xds/xds_endpoint_resource_type_test.cc index c60de87fb39..f5dc9f003f6 100644 --- a/test/core/xds/xds_endpoint_resource_type_test.cc +++ b/test/core/xds/xds_endpoint_resource_type_test.cc @@ -562,7 +562,17 @@ TEST_F(XdsEndpointTest, DuplicateLocalityName) { socket_address->set_address("127.0.0.1"); socket_address->set_port_value(443); locality = cla.add_endpoints(); - *locality = cla.endpoints(0); + locality->mutable_load_balancing_weight()->set_value(1); + locality_name = locality->mutable_locality(); + locality_name->set_region("myregion"); + locality_name->set_zone("myzone"); + locality_name->set_sub_zone("mysubzone"); + socket_address = locality->add_lb_endpoints() + ->mutable_endpoint() + ->mutable_address() + ->mutable_socket_address(); + socket_address->set_address("127.0.0.2"); + socket_address->set_port_value(443); std::string serialized_resource; ASSERT_TRUE(cla.SerializeToString(&serialized_resource)); auto* resource_type = XdsEndpointResourceType::Get(); @@ -596,7 +606,17 @@ TEST_F(XdsEndpointTest, SparsePriorityList) { socket_address->set_port_value(443); locality->set_priority(1); locality = cla.add_endpoints(); - *locality = cla.endpoints(0); + locality->mutable_load_balancing_weight()->set_value(1); + locality_name = locality->mutable_locality(); + locality_name->set_region("myregion2"); + locality_name->set_zone("myzone"); + locality_name->set_sub_zone("mysubzone"); + socket_address = locality->add_lb_endpoints() + ->mutable_endpoint() + ->mutable_address() + ->mutable_socket_address(); + socket_address->set_address("127.0.0.2"); + socket_address->set_port_value(443); locality->set_priority(3); std::string serialized_resource; ASSERT_TRUE(cla.SerializeToString(&serialized_resource)); @@ -632,10 +652,19 @@ TEST_F(XdsEndpointTest, LocalityWeightsWithinPriorityExceedUint32Max) { locality->set_priority(0); // Second locality has weight of uint32 max. locality = cla.add_endpoints(); - *locality = cla.endpoints(0); // Copy first locality. - locality->mutable_locality()->set_region("myregion2"); locality->mutable_load_balancing_weight()->set_value( std::numeric_limits::max()); + locality_name = locality->mutable_locality(); + locality_name->set_region("myregion2"); + locality_name->set_zone("myzone"); + locality_name->set_sub_zone("mysubzone"); + socket_address = locality->add_lb_endpoints() + ->mutable_endpoint() + ->mutable_address() + ->mutable_socket_address(); + socket_address->set_address("127.0.0.2"); + socket_address->set_port_value(443); + locality->set_priority(0); std::string serialized_resource; ASSERT_TRUE(cla.SerializeToString(&serialized_resource)); auto* resource_type = XdsEndpointResourceType::Get(); @@ -652,6 +681,51 @@ TEST_F(XdsEndpointTest, LocalityWeightsWithinPriorityExceedUint32Max) { << decode_result.resource.status(); } +TEST_F(XdsEndpointTest, DuplicateAddresses) { + ClusterLoadAssignment cla; + cla.set_cluster_name("foo"); + auto* locality = cla.add_endpoints(); + locality->mutable_load_balancing_weight()->set_value(1); + auto* locality_name = locality->mutable_locality(); + locality_name->set_region("myregion"); + locality_name->set_zone("myzone"); + locality_name->set_sub_zone("mysubzone"); + auto* socket_address = locality->add_lb_endpoints() + ->mutable_endpoint() + ->mutable_address() + ->mutable_socket_address(); + socket_address->set_address("127.0.0.1"); + socket_address->set_port_value(443); + locality->set_priority(0); + locality = cla.add_endpoints(); + locality->mutable_load_balancing_weight()->set_value(1); + locality_name = locality->mutable_locality(); + locality_name->set_region("myregion2"); + locality_name->set_zone("myzone"); + locality_name->set_sub_zone("mysubzone"); + socket_address = locality->add_lb_endpoints() + ->mutable_endpoint() + ->mutable_address() + ->mutable_socket_address(); + socket_address->set_address("127.0.0.1"); + socket_address->set_port_value(443); + locality->set_priority(0); + std::string serialized_resource; + ASSERT_TRUE(cla.SerializeToString(&serialized_resource)); + auto* resource_type = XdsEndpointResourceType::Get(); + auto decode_result = resource_type->Decode( + decode_context_, serialized_resource, /*is_v2=*/false); + ASSERT_TRUE(decode_result.name.has_value()); + EXPECT_EQ(*decode_result.name, "foo"); + EXPECT_EQ(decode_result.resource.status().code(), + absl::StatusCode::kInvalidArgument); + EXPECT_EQ(decode_result.resource.status().message(), + "errors parsing EDS resource: [" + "field:endpoints[1].lb_endpoints[0] " + "error:duplicate endpoint address \"ipv4:127.0.0.1:443\"]") + << decode_result.resource.status(); +} + TEST_F(XdsEndpointTest, DropConfig) { ClusterLoadAssignment cla; cla.set_cluster_name("foo"); diff --git a/test/cpp/end2end/xds/xds_cluster_end2end_test.cc b/test/cpp/end2end/xds/xds_cluster_end2end_test.cc index 99d0cf13e29..8e35e7b15ed 100644 --- a/test/cpp/end2end/xds/xds_cluster_end2end_test.cc +++ b/test/cpp/end2end/xds/xds_cluster_end2end_test.cc @@ -421,27 +421,6 @@ TEST_P(EdsTest, IgnoresUnhealthyEndpoints) { } } -// Tests that subchannel sharing works when the same backend is listed -// multiple times. -TEST_P(EdsTest, SameBackendListedMultipleTimes) { - CreateAndStartBackends(1); - // Same backend listed twice. - auto endpoints = CreateEndpointsForBackends(); - endpoints.push_back(endpoints.front()); - EdsResourceArgs args({{"locality0", endpoints}}); - balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); - // We need to wait for the backend to come online. - WaitForAllBackends(DEBUG_LOCATION, /*start_index=*/0, /*stop_index=*/0, - /*check_status=*/nullptr, WaitForBackendOptions(), - RpcOptions().set_timeout_ms(2000)); - // Send kNumRpcsPerAddress RPCs per server. - const size_t kNumRpcsPerAddress = 10; - CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * endpoints.size()); - // Backend should have gotten 20 requests. - EXPECT_EQ(kNumRpcsPerAddress * endpoints.size(), - backends_[0]->backend_service()->request_count()); -} - TEST_P(EdsTest, OneLocalityWithNoEndpoints) { CreateAndStartBackends(1); // Initial EDS resource has one locality with no endpoints. @@ -712,28 +691,31 @@ TEST_P(EdsTest, LocalityContainingNoEndpoints) { // Tests that the locality map can work properly even when it contains a large // number of localities. TEST_P(EdsTest, ManyLocalitiesStressTest) { - CreateAndStartBackends(2); - const size_t kNumLocalities = 100; + const size_t kNumLocalities = 50; + CreateAndStartBackends(kNumLocalities + 1); const uint32_t kRpcTimeoutMs = 5000; // The first ADS response contains kNumLocalities localities, each of which - // contains backend 0. + // contains its own backend. EdsResourceArgs args; for (size_t i = 0; i < kNumLocalities; ++i) { std::string name = absl::StrCat("locality", i); - EdsResourceArgs::Locality locality(name, CreateEndpointsForBackends(0, 1)); + EdsResourceArgs::Locality locality(name, + CreateEndpointsForBackends(i, i + 1)); args.locality_list.emplace_back(std::move(locality)); } balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); - // Wait until backend 0 is ready. - WaitForBackend(DEBUG_LOCATION, 0, /*check_status=*/nullptr, - WaitForBackendOptions().set_reset_counters(false), - RpcOptions().set_timeout_ms(kRpcTimeoutMs)); - EXPECT_EQ(0U, backends_[1]->backend_service()->request_count()); - // The second ADS response contains 1 locality, which contains backend 1. - args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends(1, 2)}}); + // Wait until all backends are ready. + WaitForAllBackends(DEBUG_LOCATION, 0, kNumLocalities, + /*check_status=*/nullptr, + WaitForBackendOptions().set_reset_counters(false), + RpcOptions().set_timeout_ms(kRpcTimeoutMs)); + // The second ADS response contains 1 locality, which contains backend 50. + args = + EdsResourceArgs({{"locality0", CreateEndpointsForBackends( + kNumLocalities, kNumLocalities + 1)}}); balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); - // Wait until backend 1 is ready. - WaitForBackend(DEBUG_LOCATION, 1); + // Wait until backend 50 is ready. + WaitForBackend(DEBUG_LOCATION, kNumLocalities); } // Tests that the localities in a locality map are picked correctly after