[xDS] add support for multiple addresses per endpoint (#34506)

Co-authored-by: markdroth <markdroth@users.noreply.github.com>
pull/34550/head
Mark D. Roth 1 year ago committed by GitHub
parent 784e3b1c40
commit 24f995538c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      build_autogenerated.yaml
  2. 103
      src/core/ext/xds/xds_endpoint.cc
  3. 12
      src/proto/grpc/testing/xds/v3/endpoint.proto
  4. 1
      test/core/xds/BUILD
  5. 250
      test/core/xds/xds_endpoint_resource_type_test.cc
  6. 1
      test/cpp/end2end/xds/BUILD
  7. 93
      test/cpp/end2end/xds/xds_cluster_end2end_test.cc
  8. 24
      test/cpp/end2end/xds/xds_end2end_test_lib.cc
  9. 19
      test/cpp/end2end/xds/xds_end2end_test_lib.h

@ -17430,6 +17430,7 @@ targets:
run: false
language: c++
headers:
- test/core/util/scoped_env_var.h
- test/cpp/end2end/connection_attempt_injector.h
- test/cpp/end2end/counted_service.h
- test/cpp/end2end/test_service_impl.h
@ -17831,7 +17832,8 @@ targets:
gtest: true
build: test
language: c++
headers: []
headers:
- test/core/util/scoped_env_var.h
src:
- src/proto/grpc/testing/xds/v3/address.proto
- src/proto/grpc/testing/xds/v3/base.proto

@ -50,11 +50,26 @@
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/env.h"
#include "src/core/lib/gprpp/validation_errors.h"
#include "src/core/lib/iomgr/resolved_address.h"
namespace grpc_core {
namespace {
// TODO(roth): Remove this once dualstack support is stable.
bool XdsDualstackEndpointsEnabled() {
auto value = GetEnv("GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS");
if (!value.has_value()) return false;
bool parsed_value;
bool parse_succeeded = gpr_parse_bool_value(value->c_str(), &parsed_value);
return parse_succeeded && parsed_value;
}
} // namespace
//
// XdsEndpointResource
//
@ -150,6 +165,38 @@ void MaybeLogClusterLoadAssignment(
}
}
absl::optional<grpc_resolved_address> ParseCoreAddress(
const envoy_config_core_v3_Address* address, ValidationErrors* errors) {
if (address == nullptr) {
errors->AddError("field not present");
return absl::nullopt;
}
ValidationErrors::ScopedField field(errors, ".socket_address");
const envoy_config_core_v3_SocketAddress* socket_address =
envoy_config_core_v3_Address_socket_address(address);
if (socket_address == nullptr) {
errors->AddError("field not present");
return absl::nullopt;
}
std::string address_str = UpbStringToStdString(
envoy_config_core_v3_SocketAddress_address(socket_address));
uint32_t port;
{
ValidationErrors::ScopedField field(errors, ".port_value");
port = envoy_config_core_v3_SocketAddress_port_value(socket_address);
if (GPR_UNLIKELY(port >> 16) != 0) {
errors->AddError("invalid port");
return absl::nullopt;
}
}
auto addr = StringToSockaddr(address_str, port);
if (!addr.ok()) {
errors->AddError(addr.status().message());
return absl::nullopt;
}
return *addr;
}
absl::optional<EndpointAddresses> EndpointAddressesParse(
const envoy_config_endpoint_v3_LbEndpoint* lb_endpoint,
ValidationErrors* errors) {
@ -172,8 +219,7 @@ absl::optional<EndpointAddresses> EndpointAddressesParse(
}
}
// endpoint
// TODO(roth): add support for multiple addresses per endpoint
grpc_resolved_address grpc_address;
std::vector<grpc_resolved_address> addresses;
{
ValidationErrors::ScopedField field(errors, ".endpoint");
const envoy_config_endpoint_v3_Endpoint* endpoint =
@ -182,43 +228,34 @@ absl::optional<EndpointAddresses> EndpointAddressesParse(
errors->AddError("field not present");
return absl::nullopt;
}
ValidationErrors::ScopedField field2(errors, ".address");
const envoy_config_core_v3_Address* address =
envoy_config_endpoint_v3_Endpoint_address(endpoint);
if (address == nullptr) {
errors->AddError("field not present");
return absl::nullopt;
}
ValidationErrors::ScopedField field3(errors, ".socket_address");
const envoy_config_core_v3_SocketAddress* socket_address =
envoy_config_core_v3_Address_socket_address(address);
if (socket_address == nullptr) {
errors->AddError("field not present");
return absl::nullopt;
}
std::string address_str = UpbStringToStdString(
envoy_config_core_v3_SocketAddress_address(socket_address));
uint32_t port;
{
ValidationErrors::ScopedField field(errors, ".port_value");
port = envoy_config_core_v3_SocketAddress_port_value(socket_address);
if (GPR_UNLIKELY(port >> 16) != 0) {
errors->AddError("invalid port");
return absl::nullopt;
}
ValidationErrors::ScopedField field(errors, ".address");
auto address = ParseCoreAddress(
envoy_config_endpoint_v3_Endpoint_address(endpoint), errors);
if (address.has_value()) addresses.push_back(*address);
}
auto addr = StringToSockaddr(address_str, port);
if (!addr.ok()) {
errors->AddError(addr.status().message());
} else {
grpc_address = *addr;
if (XdsDualstackEndpointsEnabled()) {
size_t size;
auto* additional_addresses =
envoy_config_endpoint_v3_Endpoint_additional_addresses(endpoint,
&size);
for (size_t i = 0; i < size; ++i) {
ValidationErrors::ScopedField field(
errors, absl::StrCat(".additional_addresses[", i, "].address"));
auto address = ParseCoreAddress(
envoy_config_endpoint_v3_Endpoint_AdditionalAddress_address(
additional_addresses[i]),
errors);
if (address.has_value()) addresses.push_back(*address);
}
}
}
if (addresses.empty()) return absl::nullopt;
// Convert to EndpointAddresses.
return EndpointAddresses(
grpc_address, ChannelArgs()
.Set(GRPC_ARG_ADDRESS_WEIGHT, weight)
.Set(GRPC_ARG_XDS_HEALTH_STATUS, status->status()));
addresses, ChannelArgs()
.Set(GRPC_ARG_ADDRESS_WEIGHT, weight)
.Set(GRPC_ARG_XDS_HEALTH_STATUS, status->status()));
}
struct ParsedLocality {

@ -29,6 +29,11 @@ import "google/protobuf/wrappers.proto";
// Upstream host identifier.
message Endpoint {
message AdditionalAddress {
// Additional address that is associated with the endpoint.
core.v3.Address address = 1;
}
// The upstream host address.
//
// .. attention::
@ -39,6 +44,13 @@ message Endpoint {
// in the Address). For LOGICAL or STRICT DNS, it is expected to be hostname,
// and will be resolved via DNS.
core.v3.Address address = 1;
// An ordered list of addresses that together with `address` comprise the
// list of addresses for an endpoint. The address given in the `address` is
// prepended to this list. It is assumed that the list must already be
// sorted by preference order of the addresses. This will only be supported
// for STATIC and EDS clusters.
repeated AdditionalAddress additional_addresses = 4;
}
// An Endpoint that Envoy can route traffic to.

@ -328,5 +328,6 @@ grpc_cc_test(
"//src/core:grpc_xds_client",
"//src/proto/grpc/testing/xds/v3:endpoint_proto",
"//test/core/util:grpc_test_util",
"//test/core/util:scoped_env_var",
],
)

@ -22,6 +22,7 @@
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include <google/protobuf/wrappers.pb.h>
@ -54,6 +55,7 @@
#include "src/proto/grpc/testing/xds/v3/endpoint.pb.h"
#include "src/proto/grpc/testing/xds/v3/health_check.pb.h"
#include "src/proto/grpc/testing/xds/v3/percent.pb.h"
#include "test/core/util/scoped_env_var.h"
#include "test/core/util/test_config.h"
using envoy::config::endpoint::v3::ClusterLoadAssignment;
@ -489,6 +491,254 @@ TEST_F(XdsEndpointTest, MissingAddress) {
<< decode_result.resource.status();
}
TEST_F(XdsEndpointTest, MultipleAddressesPerEndpoint) {
testing::ScopedExperimentalEnvVar env(
"GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS");
ClusterLoadAssignment cla;
cla.set_cluster_name("foo");
auto* locality = cla.add_endpoints();
locality->mutable_load_balancing_weight()->set_value(1);
auto* locality_name = locality->mutable_locality();
locality_name->set_region("myregion");
locality_name->set_zone("myzone");
locality_name->set_sub_zone("mysubzone");
auto* ep = locality->add_lb_endpoints()->mutable_endpoint();
auto* socket_address = ep->mutable_address()->mutable_socket_address();
socket_address->set_address("127.0.0.1");
socket_address->set_port_value(443);
socket_address = ep->add_additional_addresses()
->mutable_address()
->mutable_socket_address();
socket_address->set_address("127.0.0.1");
socket_address->set_port_value(444);
std::string serialized_resource;
ASSERT_TRUE(cla.SerializeToString(&serialized_resource));
auto* resource_type = XdsEndpointResourceType::Get();
auto decode_result =
resource_type->Decode(decode_context_, serialized_resource);
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource =
static_cast<const XdsEndpointResource&>(**decode_result.resource);
ASSERT_EQ(resource.priorities.size(), 1);
const auto& priority = resource.priorities[0];
ASSERT_EQ(priority.localities.size(), 1);
const auto& p = *priority.localities.begin();
ASSERT_EQ(p.first, p.second.name.get());
EXPECT_EQ(p.first->region(), "myregion");
EXPECT_EQ(p.first->zone(), "myzone");
EXPECT_EQ(p.first->sub_zone(), "mysubzone");
EXPECT_EQ(p.second.lb_weight, 1);
ASSERT_EQ(p.second.endpoints.size(), 1);
const auto& endpoint = p.second.endpoints.front();
ASSERT_EQ(endpoint.addresses().size(), 2);
auto addr =
grpc_sockaddr_to_string(&endpoint.addresses()[0], /*normalize=*/false);
ASSERT_TRUE(addr.ok()) << addr.status();
EXPECT_EQ(*addr, "127.0.0.1:443");
addr = grpc_sockaddr_to_string(&endpoint.addresses()[1], /*normalize=*/false);
ASSERT_TRUE(addr.ok()) << addr.status();
EXPECT_EQ(*addr, "127.0.0.1:444");
EXPECT_EQ(endpoint.args(), ChannelArgs()
.Set(GRPC_ARG_ADDRESS_WEIGHT, 1)
.Set(GRPC_ARG_XDS_HEALTH_STATUS,
XdsHealthStatus::HealthStatus::kUnknown));
ASSERT_NE(resource.drop_config, nullptr);
EXPECT_TRUE(resource.drop_config->drop_category_list().empty());
}
TEST_F(XdsEndpointTest, AdditionalAddressesMissingAddress) {
testing::ScopedExperimentalEnvVar env(
"GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS");
ClusterLoadAssignment cla;
cla.set_cluster_name("foo");
auto* locality = cla.add_endpoints();
locality->mutable_load_balancing_weight()->set_value(1);
auto* locality_name = locality->mutable_locality();
locality_name->set_region("myregion");
locality_name->set_zone("myzone");
locality_name->set_sub_zone("mysubzone");
auto* ep = locality->add_lb_endpoints()->mutable_endpoint();
auto* socket_address = ep->mutable_address()->mutable_socket_address();
socket_address->set_address("127.0.0.1");
socket_address->set_port_value(443);
ep->add_additional_addresses();
std::string serialized_resource;
ASSERT_TRUE(cla.SerializeToString(&serialized_resource));
auto* resource_type = XdsEndpointResourceType::Get();
auto decode_result =
resource_type->Decode(decode_context_, serialized_resource);
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
EXPECT_EQ(decode_result.resource.status().code(),
absl::StatusCode::kInvalidArgument);
EXPECT_EQ(decode_result.resource.status().message(),
"errors parsing EDS resource: ["
"field:endpoints[0].lb_endpoints[0].endpoint"
".additional_addresses[0].address error:field not present]")
<< decode_result.resource.status();
}
TEST_F(XdsEndpointTest, AdditionalAddressesMissingSocketAddress) {
testing::ScopedExperimentalEnvVar env(
"GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS");
ClusterLoadAssignment cla;
cla.set_cluster_name("foo");
auto* locality = cla.add_endpoints();
locality->mutable_load_balancing_weight()->set_value(1);
auto* locality_name = locality->mutable_locality();
locality_name->set_region("myregion");
locality_name->set_zone("myzone");
locality_name->set_sub_zone("mysubzone");
auto* ep = locality->add_lb_endpoints()->mutable_endpoint();
auto* socket_address = ep->mutable_address()->mutable_socket_address();
socket_address->set_address("127.0.0.1");
socket_address->set_port_value(443);
ep->add_additional_addresses()->mutable_address();
std::string serialized_resource;
ASSERT_TRUE(cla.SerializeToString(&serialized_resource));
auto* resource_type = XdsEndpointResourceType::Get();
auto decode_result =
resource_type->Decode(decode_context_, serialized_resource);
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
EXPECT_EQ(decode_result.resource.status().code(),
absl::StatusCode::kInvalidArgument);
EXPECT_EQ(decode_result.resource.status().message(),
"errors parsing EDS resource: ["
"field:endpoints[0].lb_endpoints[0].endpoint"
".additional_addresses[0].address.socket_address "
"error:field not present]")
<< decode_result.resource.status();
}
TEST_F(XdsEndpointTest, AdditionalAddressesInvalidPort) {
testing::ScopedExperimentalEnvVar env(
"GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS");
ClusterLoadAssignment cla;
cla.set_cluster_name("foo");
auto* locality = cla.add_endpoints();
locality->mutable_load_balancing_weight()->set_value(1);
auto* locality_name = locality->mutable_locality();
locality_name->set_region("myregion");
locality_name->set_zone("myzone");
locality_name->set_sub_zone("mysubzone");
auto* ep = locality->add_lb_endpoints()->mutable_endpoint();
auto* socket_address = ep->mutable_address()->mutable_socket_address();
socket_address->set_address("127.0.0.1");
socket_address->set_port_value(443);
socket_address = ep->add_additional_addresses()
->mutable_address()
->mutable_socket_address();
socket_address->set_address("127.0.0.1");
socket_address->set_port_value(65537);
std::string serialized_resource;
ASSERT_TRUE(cla.SerializeToString(&serialized_resource));
auto* resource_type = XdsEndpointResourceType::Get();
auto decode_result =
resource_type->Decode(decode_context_, serialized_resource);
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
EXPECT_EQ(decode_result.resource.status().code(),
absl::StatusCode::kInvalidArgument);
EXPECT_EQ(decode_result.resource.status().message(),
"errors parsing EDS resource: ["
"field:endpoints[0].lb_endpoints[0].endpoint"
".additional_addresses[0].address.socket_address.port_value "
"error:invalid port]")
<< decode_result.resource.status();
}
TEST_F(XdsEndpointTest, AdditionalAddressesInvalidAddress) {
testing::ScopedExperimentalEnvVar env(
"GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS");
ClusterLoadAssignment cla;
cla.set_cluster_name("foo");
auto* locality = cla.add_endpoints();
locality->mutable_load_balancing_weight()->set_value(1);
auto* locality_name = locality->mutable_locality();
locality_name->set_region("myregion");
locality_name->set_zone("myzone");
locality_name->set_sub_zone("mysubzone");
auto* ep = locality->add_lb_endpoints()->mutable_endpoint();
auto* socket_address = ep->mutable_address()->mutable_socket_address();
socket_address->set_address("127.0.0.1");
socket_address->set_port_value(443);
socket_address = ep->add_additional_addresses()
->mutable_address()
->mutable_socket_address();
socket_address->set_address("not_an_ip_address");
socket_address->set_port_value(444);
std::string serialized_resource;
ASSERT_TRUE(cla.SerializeToString(&serialized_resource));
auto* resource_type = XdsEndpointResourceType::Get();
auto decode_result =
resource_type->Decode(decode_context_, serialized_resource);
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
EXPECT_EQ(decode_result.resource.status().code(),
absl::StatusCode::kInvalidArgument);
EXPECT_EQ(decode_result.resource.status().message(),
"errors parsing EDS resource: ["
"field:endpoints[0].lb_endpoints[0].endpoint"
".additional_addresses[0].address.socket_address error:"
"Failed to parse address:not_an_ip_address:444]")
<< decode_result.resource.status();
}
TEST_F(XdsEndpointTest, IgnoresMultipleAddressesPerEndpointWhenNotEnabled) {
ClusterLoadAssignment cla;
cla.set_cluster_name("foo");
auto* locality = cla.add_endpoints();
locality->mutable_load_balancing_weight()->set_value(1);
auto* locality_name = locality->mutable_locality();
locality_name->set_region("myregion");
locality_name->set_zone("myzone");
locality_name->set_sub_zone("mysubzone");
auto* ep = locality->add_lb_endpoints()->mutable_endpoint();
auto* socket_address = ep->mutable_address()->mutable_socket_address();
socket_address->set_address("127.0.0.1");
socket_address->set_port_value(443);
socket_address = ep->add_additional_addresses()
->mutable_address()
->mutable_socket_address();
socket_address->set_address("127.0.0.1");
socket_address->set_port_value(444);
std::string serialized_resource;
ASSERT_TRUE(cla.SerializeToString(&serialized_resource));
auto* resource_type = XdsEndpointResourceType::Get();
auto decode_result =
resource_type->Decode(decode_context_, serialized_resource);
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource =
static_cast<const XdsEndpointResource&>(**decode_result.resource);
ASSERT_EQ(resource.priorities.size(), 1);
const auto& priority = resource.priorities[0];
ASSERT_EQ(priority.localities.size(), 1);
const auto& p = *priority.localities.begin();
ASSERT_EQ(p.first, p.second.name.get());
EXPECT_EQ(p.first->region(), "myregion");
EXPECT_EQ(p.first->zone(), "myzone");
EXPECT_EQ(p.first->sub_zone(), "mysubzone");
EXPECT_EQ(p.second.lb_weight, 1);
ASSERT_EQ(p.second.endpoints.size(), 1);
const auto& endpoint = p.second.endpoints.front();
ASSERT_EQ(endpoint.addresses().size(), 1);
auto addr =
grpc_sockaddr_to_string(&endpoint.addresses()[0], /*normalize=*/false);
ASSERT_TRUE(addr.ok()) << addr.status();
EXPECT_EQ(*addr, "127.0.0.1:443");
EXPECT_EQ(endpoint.args(), ChannelArgs()
.Set(GRPC_ARG_ADDRESS_WEIGHT, 1)
.Set(GRPC_ARG_XDS_HEALTH_STATUS,
XdsHealthStatus::HealthStatus::kUnknown));
ASSERT_NE(resource.drop_config, nullptr);
EXPECT_TRUE(resource.drop_config->drop_category_list().empty());
}
TEST_F(XdsEndpointTest, MissingEndpoint) {
ClusterLoadAssignment cla;
cla.set_cluster_name("foo");

@ -156,6 +156,7 @@ grpc_cc_test(
"//:grpc",
"//:grpc++",
"//test/core/util:grpc_test_util",
"//test/core/util:scoped_env_var",
"//test/cpp/end2end:connection_attempt_injector",
],
)

@ -26,7 +26,9 @@
#include "src/core/ext/filters/client_channel/backup_poller.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/config/config_vars.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/proto/grpc/testing/xds/v3/orca_load_report.pb.h"
#include "test/core/util/scoped_env_var.h"
#include "test/cpp/end2end/connection_attempt_injector.h"
#include "test/cpp/end2end/xds/xds_end2end_test_lib.h"
@ -411,6 +413,43 @@ TEST_P(EdsTest, Vanilla) {
channel_->GetLoadBalancingPolicyName());
}
TEST_P(EdsTest, MultipleAddressesPerEndpoint) {
if (!grpc_core::IsRoundRobinDelegateToPickFirstEnabled()) return;
grpc_core::testing::ScopedExperimentalEnvVar env(
"GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS");
const size_t kNumRpcsPerAddress = 10;
// Create 3 backends, but leave backend 0 unstarted.
CreateBackends(3);
StartBackend(1);
StartBackend(2);
// The first endpoint is backends 0 and 1, the second endpoint is backend 2.
EdsResourceArgs args({
{"locality0",
{CreateEndpoint(0, HealthStatus::UNKNOWN, 1, {1}), CreateEndpoint(2)}},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Initially, backend 0 is offline, so the first endpoint should
// connect to backend 1 instead. Traffic should round-robin across
// backends 1 and 2.
WaitForAllBackends(DEBUG_LOCATION, 1); // Wait for backends 1 and 2.
CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * 2);
EXPECT_EQ(kNumRpcsPerAddress,
backends_[1]->backend_service()->request_count());
EXPECT_EQ(kNumRpcsPerAddress,
backends_[2]->backend_service()->request_count());
// Now start backend 0 and shutdown backend 1.
StartBackend(0);
ShutdownBackend(1);
// Wait for traffic to go to backend 0.
WaitForBackend(DEBUG_LOCATION, 0);
// Traffic should now round-robin across backends 0 and 2.
CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * 2);
EXPECT_EQ(kNumRpcsPerAddress,
backends_[0]->backend_service()->request_count());
EXPECT_EQ(kNumRpcsPerAddress,
backends_[2]->backend_service()->request_count());
}
TEST_P(EdsTest, IgnoresUnhealthyEndpoints) {
CreateAndStartBackends(2);
const size_t kNumRpcsPerAddress = 100;
@ -435,32 +474,6 @@ TEST_P(EdsTest, IgnoresUnhealthyEndpoints) {
}
}
TEST_P(EdsTest, OneLocalityWithNoEndpoints) {
CreateAndStartBackends(1);
// Initial EDS resource has one locality with no endpoints.
EdsResourceArgs::Locality empty_locality("locality0", {});
EdsResourceArgs args({std::move(empty_locality)});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// RPCs should fail.
constexpr char kErrorMessage[] =
"no children in weighted_target policy: "
"EDS resource eds_service_name contains empty localities: "
"\\[\\{region=\"xds_default_locality_region\", "
"zone=\"xds_default_locality_zone\", sub_zone=\"locality0\"\\}\\]";
CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE, kErrorMessage);
// Send EDS resource that has an endpoint.
args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// RPCs should eventually succeed.
WaitForAllBackends(DEBUG_LOCATION, 0, 1, [&](const RpcResult& result) {
if (!result.status.ok()) {
EXPECT_EQ(result.status.error_code(), StatusCode::UNAVAILABLE);
EXPECT_THAT(result.status.error_message(),
::testing::MatchesRegex(kErrorMessage));
}
});
}
// This tests the bug described in https://github.com/grpc/grpc/issues/32486.
TEST_P(EdsTest, LocalityBecomesEmptyWithDeactivatedChildStateUpdate) {
CreateAndStartBackends(1);
@ -650,7 +663,7 @@ TEST_P(EdsTest, KeepUsingLastDataIfBalancerGoesDown) {
// Tests that the localities in a locality map are picked according to their
// weights.
TEST_P(EdsTest, WeightedRoundRobin) {
TEST_P(EdsTest, LocalityWeights) {
CreateAndStartBackends(2);
const int kLocalityWeight0 = 2;
const int kLocalityWeight1 = 8;
@ -724,6 +737,32 @@ TEST_P(EdsTest, NoIntegerOverflowInLocalityWeights) {
::testing::DoubleNear(kLocalityWeightRate1, kErrorTolerance));
}
TEST_P(EdsTest, OneLocalityWithNoEndpoints) {
CreateAndStartBackends(1);
// Initial EDS resource has one locality with no endpoints.
EdsResourceArgs::Locality empty_locality("locality0", {});
EdsResourceArgs args({std::move(empty_locality)});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// RPCs should fail.
constexpr char kErrorMessage[] =
"no children in weighted_target policy: "
"EDS resource eds_service_name contains empty localities: "
"\\[\\{region=\"xds_default_locality_region\", "
"zone=\"xds_default_locality_zone\", sub_zone=\"locality0\"\\}\\]";
CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE, kErrorMessage);
// Send EDS resource that has an endpoint.
args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// RPCs should eventually succeed.
WaitForAllBackends(DEBUG_LOCATION, 0, 1, [&](const RpcResult& result) {
if (!result.status.ok()) {
EXPECT_EQ(result.status.error_code(), StatusCode::UNAVAILABLE);
EXPECT_THAT(result.status.error_message(),
::testing::MatchesRegex(kErrorMessage));
}
});
}
// Tests that we correctly handle a locality containing no endpoints.
TEST_P(EdsTest, LocalityContainingNoEndpoints) {
CreateAndStartBackends(2);

@ -641,22 +641,28 @@ ClusterLoadAssignment XdsEnd2endTest::BuildEdsResource(
endpoints->mutable_locality()->set_zone(kDefaultLocalityZone);
endpoints->mutable_locality()->set_sub_zone(locality.sub_zone);
for (size_t i = 0; i < locality.endpoints.size(); ++i) {
const int& port = locality.endpoints[i].port;
const auto& endpoint = locality.endpoints[i];
auto* lb_endpoints = endpoints->add_lb_endpoints();
if (locality.endpoints.size() > i &&
locality.endpoints[i].health_status != HealthStatus::UNKNOWN) {
lb_endpoints->set_health_status(locality.endpoints[i].health_status);
lb_endpoints->set_health_status(endpoint.health_status);
}
if (locality.endpoints.size() > i &&
locality.endpoints[i].lb_weight >= 1) {
if (locality.endpoints.size() > i && endpoint.lb_weight >= 1) {
lb_endpoints->mutable_load_balancing_weight()->set_value(
locality.endpoints[i].lb_weight);
endpoint.lb_weight);
}
auto* endpoint = lb_endpoints->mutable_endpoint();
auto* address = endpoint->mutable_address();
auto* socket_address = address->mutable_socket_address();
auto* endpoint_proto = lb_endpoints->mutable_endpoint();
auto* socket_address =
endpoint_proto->mutable_address()->mutable_socket_address();
socket_address->set_address(ipv6_only_ ? "::1" : "127.0.0.1");
socket_address->set_port_value(port);
socket_address->set_port_value(endpoint.port);
for (int port : endpoint.additional_ports) {
socket_address = endpoint_proto->add_additional_addresses()
->mutable_address()
->mutable_socket_address();
socket_address->set_address(ipv6_only_ ? "::1" : "127.0.0.1");
socket_address->set_port_value(port);
}
}
}
if (!args.drop_categories.empty()) {

@ -592,12 +592,17 @@ class XdsEnd2endTest : public ::testing::TestWithParam<XdsTestType> {
explicit Endpoint(int port,
::envoy::config::core::v3::HealthStatus health_status =
::envoy::config::core::v3::HealthStatus::UNKNOWN,
int lb_weight = 1)
: port(port), health_status(health_status), lb_weight(lb_weight) {}
int lb_weight = 1,
std::vector<int> additional_ports = {})
: port(port),
health_status(health_status),
lb_weight(lb_weight),
additional_ports(std::move(additional_ports)) {}
int port;
::envoy::config::core::v3::HealthStatus health_status;
int lb_weight;
std::vector<int> additional_ports;
};
// A locality.
@ -632,9 +637,15 @@ class XdsEnd2endTest : public ::testing::TestWithParam<XdsTestType> {
size_t backend_idx,
::envoy::config::core::v3::HealthStatus health_status =
::envoy::config::core::v3::HealthStatus::UNKNOWN,
int lb_weight = 1) {
int lb_weight = 1, std::vector<size_t> additional_backend_indxees = {}) {
std::vector<int> additional_ports;
additional_ports.reserve(additional_backend_indxees.size());
for (size_t idx : additional_backend_indxees) {
additional_ports.push_back(backends_[idx]->port());
}
return EdsResourceArgs::Endpoint(backends_[backend_idx]->port(),
health_status, lb_weight);
health_status, lb_weight,
additional_ports);
}
// Creates a vector of endpoints for a specified range of backends,

Loading…
Cancel
Save