Merge pull request #19858 from AspirinSJL/locality_map_test

Add tests for xds locality map
pull/19936/head
Juanli Shen 5 years ago committed by GitHub
commit 58aca9a2be
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
  2. 98
      test/cpp/end2end/xds_end2end_test.cc

@ -509,7 +509,7 @@ class XdsLb : public LoadBalancingPolicy {
OrphanablePtr<LoadBalancingPolicy> child_policy_; OrphanablePtr<LoadBalancingPolicy> child_policy_;
OrphanablePtr<LoadBalancingPolicy> pending_child_policy_; OrphanablePtr<LoadBalancingPolicy> pending_child_policy_;
RefCountedPtr<PickerWrapper> picker_wrapper_; RefCountedPtr<PickerWrapper> picker_wrapper_;
grpc_connectivity_state connectivity_state_; grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_IDLE;
uint32_t locality_weight_; uint32_t locality_weight_;
}; };

@ -98,6 +98,7 @@ constexpr char kDefaultLocalityZone[] = "xds_default_locality_zone";
constexpr char kDefaultLocalitySubzone[] = "xds_default_locality_subzone"; constexpr char kDefaultLocalitySubzone[] = "xds_default_locality_subzone";
constexpr char kLbDropType[] = "lb"; constexpr char kLbDropType[] = "lb";
constexpr char kThrottleDropType[] = "throttle"; constexpr char kThrottleDropType[] = "throttle";
constexpr int kDefaultLocalityWeight = 3;
template <typename ServiceType> template <typename ServiceType>
class CountedService : public ServiceType { class CountedService : public ServiceType {
@ -317,6 +318,7 @@ class EdsServiceImpl : public EdsService {
static DiscoveryResponse BuildResponse( static DiscoveryResponse BuildResponse(
const std::vector<std::vector<int>>& backend_ports, const std::vector<std::vector<int>>& backend_ports,
const std::vector<int>& lb_weights = {},
const std::map<grpc::string, uint32_t>& drop_categories = {}, const std::map<grpc::string, uint32_t>& drop_categories = {},
const FractionalPercent::DenominatorType denominator = const FractionalPercent::DenominatorType denominator =
FractionalPercent::MILLION) { FractionalPercent::MILLION) {
@ -324,7 +326,9 @@ class EdsServiceImpl : public EdsService {
assignment.set_cluster_name("service name"); assignment.set_cluster_name("service name");
for (size_t i = 0; i < backend_ports.size(); ++i) { for (size_t i = 0; i < backend_ports.size(); ++i) {
auto* endpoints = assignment.add_endpoints(); auto* endpoints = assignment.add_endpoints();
endpoints->mutable_load_balancing_weight()->set_value(3); const int lb_weight =
lb_weights.empty() ? kDefaultLocalityWeight : lb_weights[i];
endpoints->mutable_load_balancing_weight()->set_value(lb_weight);
endpoints->set_priority(0); endpoints->set_priority(0);
endpoints->mutable_locality()->set_region(kDefaultLocalityRegion); endpoints->mutable_locality()->set_region(kDefaultLocalityRegion);
endpoints->mutable_locality()->set_zone(kDefaultLocalityZone); endpoints->mutable_locality()->set_zone(kDefaultLocalityZone);
@ -620,11 +624,14 @@ class XdsEnd2endTest : public ::testing::Test {
return std::make_tuple(num_ok, num_failure, num_drops); return std::make_tuple(num_ok, num_failure, num_drops);
} }
void WaitForBackend(size_t backend_idx) { void WaitForBackend(size_t backend_idx, bool reset_counters = true) {
gpr_log(GPR_INFO,
"========= WAITING FOR BACKEND %lu ==========", backend_idx);
do { do {
(void)SendRpc(); (void)SendRpc();
} while (backends_[backend_idx]->backend_service()->request_count() == 0); } while (backends_[backend_idx]->backend_service()->request_count() == 0);
ResetBackendCounters(); if (reset_counters) ResetBackendCounters();
gpr_log(GPR_INFO, "========= BACKEND %lu READY ==========", backend_idx);
} }
grpc_core::ServerAddressList CreateLbAddressesFromPortList( grpc_core::ServerAddressList CreateLbAddressesFromPortList(
@ -1043,6 +1050,75 @@ TEST_F(SingleBalancerTest, AllServersUnreachableFailFast) {
EXPECT_EQ(1U, balancers_[0]->eds_service()->response_count()); EXPECT_EQ(1U, balancers_[0]->eds_service()->response_count());
} }
TEST_F(SingleBalancerTest, LocalityMapWeightedRoundRobin) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannelAllBalancers();
const size_t kNumRpcs = 5000;
const int kLocalityWeight0 = 2;
const int kLocalityWeight1 = 8;
const int kTotalLocalityWeight = kLocalityWeight0 + kLocalityWeight1;
const double kLocalityWeightRate0 =
static_cast<double>(kLocalityWeight0) / kTotalLocalityWeight;
const double kLocalityWeightRate1 =
static_cast<double>(kLocalityWeight1) / kTotalLocalityWeight;
// EDS response contains 2 localities, each of which contains 1 backend.
ScheduleResponseForBalancer(
0,
EdsServiceImpl::BuildResponse(GetBackendPortsInGroups(0, 2, 2),
{kLocalityWeight0, kLocalityWeight1}),
0);
// Wait for both backends to be ready.
WaitForAllBackends(1, 0, 2);
// Send kNumRpcs RPCs.
CheckRpcSendOk(kNumRpcs);
// The locality picking rates should be roughly equal to the expectation.
const double locality_picked_rate_0 =
static_cast<double>(backends_[0]->backend_service()->request_count()) /
kNumRpcs;
const double locality_picked_rate_1 =
static_cast<double>(backends_[1]->backend_service()->request_count()) /
kNumRpcs;
const double kErrorTolerance = 0.2;
EXPECT_THAT(locality_picked_rate_0,
::testing::AllOf(
::testing::Ge(kLocalityWeightRate0 * (1 - kErrorTolerance)),
::testing::Le(kLocalityWeightRate0 * (1 + kErrorTolerance))));
EXPECT_THAT(locality_picked_rate_1,
::testing::AllOf(
::testing::Ge(kLocalityWeightRate1 * (1 - kErrorTolerance)),
::testing::Le(kLocalityWeightRate1 * (1 + kErrorTolerance))));
// The EDS service got a single request, and sent a single response.
EXPECT_EQ(1U, balancers_[0]->eds_service()->request_count());
EXPECT_EQ(1U, balancers_[0]->eds_service()->response_count());
}
TEST_F(SingleBalancerTest, LocalityMapStressTest) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannelAllBalancers();
const size_t kNumLocalities = 100;
// The first EDS response contains kNumLocalities localities, each of which
// contains backend 0.
const std::vector<std::vector<int>> locality_list_0(kNumLocalities,
{backends_[0]->port()});
// The second EDS response contains 1 locality, which contains backend 1.
const std::vector<std::vector<int>> locality_list_1 =
GetBackendPortsInGroups(1, 2);
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(locality_list_0),
0);
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(locality_list_1),
60 * 1000);
// Wait until backend 0 is ready, before which kNumLocalities localities are
// received and handled by the xds policy.
WaitForBackend(0, /*reset_counters=*/false);
EXPECT_EQ(0U, backends_[1]->backend_service()->request_count());
// Wait until backend 1 is ready, before which kNumLocalities localities are
// removed by the xds policy.
WaitForBackend(1);
// The EDS service got a single request, and sent a single response.
EXPECT_EQ(1U, balancers_[0]->eds_service()->request_count());
EXPECT_EQ(2U, balancers_[0]->eds_service()->response_count());
}
TEST_F(SingleBalancerTest, Drop) { TEST_F(SingleBalancerTest, Drop) {
SetNextResolution({}, kDefaultServiceConfig_.c_str()); SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannelAllBalancers(); SetNextResolutionForLbChannelAllBalancers();
@ -1057,7 +1133,7 @@ TEST_F(SingleBalancerTest, Drop) {
ScheduleResponseForBalancer( ScheduleResponseForBalancer(
0, 0,
EdsServiceImpl::BuildResponse( EdsServiceImpl::BuildResponse(
GetBackendPortsInGroups(), GetBackendPortsInGroups(), {},
{{kLbDropType, kDropPerMillionForLb}, {{kLbDropType, kDropPerMillionForLb},
{kThrottleDropType, kDropPerMillionForThrottle}}), {kThrottleDropType, kDropPerMillionForThrottle}}),
0); 0);
@ -1098,7 +1174,7 @@ TEST_F(SingleBalancerTest, DropPerHundred) {
// The EDS response contains one drop category. // The EDS response contains one drop category.
ScheduleResponseForBalancer( ScheduleResponseForBalancer(
0, 0,
EdsServiceImpl::BuildResponse(GetBackendPortsInGroups(), EdsServiceImpl::BuildResponse(GetBackendPortsInGroups(), {},
{{kLbDropType, kDropPerHundredForLb}}, {{kLbDropType, kDropPerHundredForLb}},
FractionalPercent::HUNDRED), FractionalPercent::HUNDRED),
0); 0);
@ -1138,7 +1214,7 @@ TEST_F(SingleBalancerTest, DropPerTenThousand) {
// The EDS response contains one drop category. // The EDS response contains one drop category.
ScheduleResponseForBalancer( ScheduleResponseForBalancer(
0, 0,
EdsServiceImpl::BuildResponse(GetBackendPortsInGroups(), EdsServiceImpl::BuildResponse(GetBackendPortsInGroups(), {},
{{kLbDropType, kDropPerTenThousandForLb}}, {{kLbDropType, kDropPerTenThousandForLb}},
FractionalPercent::TEN_THOUSAND), FractionalPercent::TEN_THOUSAND),
0); 0);
@ -1182,7 +1258,7 @@ TEST_F(SingleBalancerTest, DropUpdate) {
// The first EDS response contains one drop category. // The first EDS response contains one drop category.
ScheduleResponseForBalancer( ScheduleResponseForBalancer(
0, 0,
EdsServiceImpl::BuildResponse(GetBackendPortsInGroups(), EdsServiceImpl::BuildResponse(GetBackendPortsInGroups(), {},
{{kLbDropType, kDropPerMillionForLb}}), {{kLbDropType, kDropPerMillionForLb}}),
0); 0);
// The second EDS response contains two drop categories. // The second EDS response contains two drop categories.
@ -1192,7 +1268,7 @@ TEST_F(SingleBalancerTest, DropUpdate) {
ScheduleResponseForBalancer( ScheduleResponseForBalancer(
0, 0,
EdsServiceImpl::BuildResponse( EdsServiceImpl::BuildResponse(
GetBackendPortsInGroups(), GetBackendPortsInGroups(), {},
{{kLbDropType, kDropPerMillionForLb}, {{kLbDropType, kDropPerMillionForLb},
{kThrottleDropType, kDropPerMillionForThrottle}}), {kThrottleDropType, kDropPerMillionForThrottle}}),
10000); 10000);
@ -1278,7 +1354,7 @@ TEST_F(SingleBalancerTest, DropAll) {
ScheduleResponseForBalancer( ScheduleResponseForBalancer(
0, 0,
EdsServiceImpl::BuildResponse( EdsServiceImpl::BuildResponse(
GetBackendPortsInGroups(), GetBackendPortsInGroups(), {},
{{kLbDropType, kDropPerMillionForLb}, {{kLbDropType, kDropPerMillionForLb},
{kThrottleDropType, kDropPerMillionForThrottle}}), {kThrottleDropType, kDropPerMillionForThrottle}}),
0); 0);
@ -1471,7 +1547,7 @@ TEST_F(SingleBalancerTest, FallbackModeIsExitedWhenBalancerSaysToDropAllCalls) {
// Return a new balancer that sends a response to drop all calls. // Return a new balancer that sends a response to drop all calls.
ScheduleResponseForBalancer( ScheduleResponseForBalancer(
0, 0,
EdsServiceImpl::BuildResponse(GetBackendPortsInGroups(), EdsServiceImpl::BuildResponse(GetBackendPortsInGroups(), {},
{{kLbDropType, 1000000}}), {{kLbDropType, 1000000}}),
0); 0);
SetNextResolutionForLbChannelAllBalancers(); SetNextResolutionForLbChannelAllBalancers();
@ -1929,7 +2005,7 @@ TEST_F(SingleBalancerWithClientLoadReportingAndDropTest, Vanilla) {
ScheduleResponseForBalancer( ScheduleResponseForBalancer(
0, 0,
EdsServiceImpl::BuildResponse( EdsServiceImpl::BuildResponse(
GetBackendPortsInGroups(), GetBackendPortsInGroups(), {},
{{kLbDropType, kDropPerMillionForLb}, {{kLbDropType, kDropPerMillionForLb},
{kThrottleDropType, kDropPerMillionForThrottle}}), {kThrottleDropType, kDropPerMillionForThrottle}}),
0); 0);

Loading…
Cancel
Save