Refactor response building in xds test

pull/20022/head
Juanli Shen 6 years ago
parent 76ac4595e3
commit a1ca6a099d
  1. 390
      test/cpp/end2end/xds_end2end_test.cc

@ -96,7 +96,6 @@ constexpr char kEdsTypeUrl[] =
"type.googleapis.com/envoy.api.v2.ClusterLoadAssignment";
constexpr char kDefaultLocalityRegion[] = "xds_default_locality_region";
constexpr char kDefaultLocalityZone[] = "xds_default_locality_zone";
constexpr char kDefaultLocalitySubzone[] = "xds_default_locality_subzone";
constexpr char kLbDropType[] = "lb";
constexpr char kThrottleDropType[] = "throttle";
constexpr int kDefaultLocalityWeight = 3;
@ -260,6 +259,31 @@ class ClientStats {
class EdsServiceImpl : public EdsService {
public:
struct ResponseArgs {
struct Locality {
Locality(const grpc::string& sub_zone, std::vector<int> ports,
int lb_weight = kDefaultLocalityWeight, int priority = 0)
: sub_zone(std::move(sub_zone)),
ports(std::move(ports)),
lb_weight(lb_weight),
priority(priority) {}
const grpc::string sub_zone;
std::vector<int> ports;
int lb_weight;
int priority;
};
ResponseArgs() = default;
explicit ResponseArgs(std::vector<Locality> locality_list)
: locality_list(std::move(locality_list)) {}
std::vector<Locality> locality_list;
std::map<grpc::string, uint32_t> drop_categories;
FractionalPercent::DenominatorType drop_denominator =
FractionalPercent::MILLION;
};
using Stream = ServerReaderWriter<DiscoveryResponse, DiscoveryRequest>;
using ResponseDelayPair = std::pair<DiscoveryResponse, int>;
@ -317,47 +341,35 @@ class EdsServiceImpl : public EdsService {
gpr_log(GPR_INFO, "LB[%p]: shut down", this);
}
// TODO(juanlishen): Put the args into a struct.
static DiscoveryResponse BuildResponse(
const std::vector<std::vector<int>>& backend_ports,
const std::vector<int>& lb_weights = {},
size_t first_locality_name_index = 0,
const std::map<grpc::string, uint32_t>& drop_categories = {},
const FractionalPercent::DenominatorType denominator =
FractionalPercent::MILLION) {
static DiscoveryResponse BuildResponse(const ResponseArgs& args) {
ClusterLoadAssignment assignment;
assignment.set_cluster_name("service name");
for (size_t i = 0; i < backend_ports.size(); ++i) {
for (const auto& locality : args.locality_list) {
auto* endpoints = assignment.add_endpoints();
const int lb_weight =
lb_weights.empty() ? kDefaultLocalityWeight : lb_weights[i];
endpoints->mutable_load_balancing_weight()->set_value(lb_weight);
endpoints->set_priority(0);
endpoints->mutable_load_balancing_weight()->set_value(locality.lb_weight);
endpoints->set_priority(locality.priority);
endpoints->mutable_locality()->set_region(kDefaultLocalityRegion);
endpoints->mutable_locality()->set_zone(kDefaultLocalityZone);
std::ostringstream sub_zone;
sub_zone << kDefaultLocalitySubzone << '_'
<< first_locality_name_index + i;
endpoints->mutable_locality()->set_sub_zone(sub_zone.str());
for (const int& backend_port : backend_ports[i]) {
endpoints->mutable_locality()->set_sub_zone(locality.sub_zone);
for (const int& port : locality.ports) {
auto* lb_endpoints = endpoints->add_lb_endpoints();
auto* endpoint = lb_endpoints->mutable_endpoint();
auto* address = endpoint->mutable_address();
auto* socket_address = address->mutable_socket_address();
socket_address->set_address("127.0.0.1");
socket_address->set_port_value(backend_port);
socket_address->set_port_value(port);
}
}
if (!drop_categories.empty()) {
if (!args.drop_categories.empty()) {
auto* policy = assignment.mutable_policy();
for (const auto& p : drop_categories) {
for (const auto& p : args.drop_categories) {
const grpc::string& name = p.first;
const uint32_t parts_per_million = p.second;
auto* drop_overload = policy->add_drop_overloads();
drop_overload->set_category(name);
auto* drop_percentage = drop_overload->mutable_drop_percentage();
drop_percentage->set_numerator(parts_per_million);
drop_percentage->set_denominator(denominator);
drop_percentage->set_denominator(args.drop_denominator);
}
}
DiscoveryResponse response;
@ -729,24 +741,6 @@ class XdsEnd2endTest : public ::testing::Test {
return backend_ports;
}
const std::vector<std::vector<int>> GetBackendPortsInGroups(
size_t start_index = 0, size_t stop_index = 0,
size_t num_group = 1) const {
if (stop_index == 0) stop_index = backends_.size();
size_t group_size = (stop_index - start_index) / num_group;
std::vector<std::vector<int>> backend_ports;
for (size_t i = 0; i < num_group; ++i) {
backend_ports.emplace_back();
size_t group_start = group_size * i + start_index;
size_t group_stop =
i == num_group - 1 ? stop_index : group_start + group_size;
for (size_t j = group_start; j < group_stop; ++j) {
backend_ports[i].push_back(backends_[j]->port());
}
}
return backend_ports;
}
void ScheduleResponseForBalancer(size_t i, const DiscoveryResponse& response,
int delay_ms) {
balancers_[i]->eds_service()->add_response(response, delay_ms);
@ -938,8 +932,10 @@ TEST_F(SingleBalancerTest, Vanilla) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannelAllBalancers();
const size_t kNumRpcsPerAddress = 100;
ScheduleResponseForBalancer(
0, EdsServiceImpl::BuildResponse(GetBackendPortsInGroups()), 0);
EdsServiceImpl::ResponseArgs args({
{"locality0", GetBackendPorts()},
});
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(args), 0);
// Make sure that trying to connect works without a call.
channel_->GetState(true /* try_to_connect */);
// We need to wait for all backends to come online.
@ -962,17 +958,18 @@ TEST_F(SingleBalancerTest, SameBackendListedMultipleTimes) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannelAllBalancers();
// Same backend listed twice.
std::vector<int> ports;
ports.push_back(backends_[0]->port());
ports.push_back(backends_[0]->port());
std::vector<int> ports(2, backends_[0]->port());
EdsServiceImpl::ResponseArgs args({
{"locality0", ports},
});
const size_t kNumRpcsPerAddress = 10;
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse({ports}), 0);
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(args), 0);
// We need to wait for the backend to come online.
WaitForBackend(0);
// Send kNumRpcsPerAddress RPCs per server.
CheckRpcSendOk(kNumRpcsPerAddress * ports.size());
// Backend should have gotten 20 requests.
EXPECT_EQ(kNumRpcsPerAddress * 2,
EXPECT_EQ(kNumRpcsPerAddress * ports.size(),
backends_[0]->backend_service()->request_count());
// And they should have come from a single client port, because of
// subchannel sharing.
@ -985,8 +982,10 @@ TEST_F(SingleBalancerTest, SecureNaming) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannel({balancers_[0]->port()});
const size_t kNumRpcsPerAddress = 100;
ScheduleResponseForBalancer(
0, EdsServiceImpl::BuildResponse(GetBackendPortsInGroups()), 0);
EdsServiceImpl::ResponseArgs args({
{"locality0", GetBackendPorts()},
});
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(args), 0);
// Make sure that trying to connect works without a call.
channel_->GetState(true /* try_to_connect */);
// We need to wait for all backends to come online.
@ -1031,11 +1030,17 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) {
const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor();
const int kCallDeadlineMs = kServerlistDelayMs * 2;
// First response is an empty serverlist, sent right away.
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse({{}}), 0);
// Send non-empty serverlist only after kServerlistDelayMs
ScheduleResponseForBalancer(
0, EdsServiceImpl::BuildResponse(GetBackendPortsInGroups()),
kServerlistDelayMs);
EdsServiceImpl::ResponseArgs::Locality empty_locality("locality0", {});
EdsServiceImpl::ResponseArgs args({
empty_locality,
});
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(args), 0);
// Send non-empty serverlist only after kServerlistDelayMs.
args = EdsServiceImpl::ResponseArgs({
{"locality0", GetBackendPorts()},
});
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(args),
kServerlistDelayMs);
const auto t0 = system_clock::now();
// Client will block: LB will initially send empty serverlist.
CheckRpcSendOk(1, kCallDeadlineMs, true /* wait_for_ready */);
@ -1061,7 +1066,10 @@ TEST_F(SingleBalancerTest, AllServersUnreachableFailFast) {
for (size_t i = 0; i < kNumUnreachableServers; ++i) {
ports.push_back(grpc_pick_unused_port_or_die());
}
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse({ports}), 0);
EdsServiceImpl::ResponseArgs args({
{"locality0", ports},
});
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(args), 0);
const Status status = SendRpc();
// The error shouldn't be DEADLINE_EXCEEDED.
EXPECT_EQ(StatusCode::UNAVAILABLE, status.error_code());
@ -1082,11 +1090,11 @@ TEST_F(SingleBalancerTest, LocalityMapWeightedRoundRobin) {
const double kLocalityWeightRate1 =
static_cast<double>(kLocalityWeight1) / kTotalLocalityWeight;
// EDS response contains 2 localities, each of which contains 1 backend.
ScheduleResponseForBalancer(
0,
EdsServiceImpl::BuildResponse(GetBackendPortsInGroups(0, 2, 2),
{kLocalityWeight0, kLocalityWeight1}),
0);
EdsServiceImpl::ResponseArgs args({
{"locality0", GetBackendPorts(0, 1), kLocalityWeight0},
{"locality1", GetBackendPorts(1, 2), kLocalityWeight1},
});
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(args), 0);
// Wait for both backends to be ready.
WaitForAllBackends(1, 0, 2);
// Send kNumRpcs RPCs.
@ -1118,14 +1126,19 @@ TEST_F(SingleBalancerTest, LocalityMapStressTest) {
const size_t kNumLocalities = 100;
// The first EDS response contains kNumLocalities localities, each of which
// contains backend 0.
const std::vector<std::vector<int>> locality_list_0(kNumLocalities,
{backends_[0]->port()});
EdsServiceImpl::ResponseArgs args;
for (size_t i = 0; i < kNumLocalities; ++i) {
grpc::string name = "locality" + std::to_string(i);
EdsServiceImpl::ResponseArgs::Locality locality(name,
{backends_[0]->port()});
args.locality_list.emplace_back(std::move(locality));
}
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(args), 0);
// The second EDS response contains 1 locality, which contains backend 1.
const std::vector<std::vector<int>> locality_list_1 =
GetBackendPortsInGroups(1, 2);
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(locality_list_0),
0);
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(locality_list_1),
args = EdsServiceImpl::ResponseArgs({
{"locality0", GetBackendPorts(1, 2)},
});
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(args),
60 * 1000);
// Wait until backend 0 is ready, before which kNumLocalities localities are
// received and handled by the xds policy.
@ -1162,20 +1175,18 @@ TEST_F(SingleBalancerTest, LocalityMapUpdate) {
for (int weight : kLocalityWeights1) {
locality_weight_rate_1.push_back(weight / kTotalLocalityWeight1);
}
ScheduleResponseForBalancer(
0,
EdsServiceImpl::BuildResponse(
GetBackendPortsInGroups(0 /*start_index*/, 3 /*stop_index*/,
3 /*num_group*/),
kLocalityWeights0),
0);
ScheduleResponseForBalancer(
0,
EdsServiceImpl::BuildResponse(
GetBackendPortsInGroups(1 /*start_index*/, 4 /*stop_index*/,
3 /*num_group*/),
kLocalityWeights1, 1 /*first_locality_name_index*/),
5000);
EdsServiceImpl::ResponseArgs args({
{"locality0", GetBackendPorts(0, 1), 2},
{"locality1", GetBackendPorts(1, 2), 3},
{"locality2", GetBackendPorts(2, 3), 4},
});
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(args), 0);
args = EdsServiceImpl::ResponseArgs({
{"locality1", GetBackendPorts(1, 2), 3},
{"locality2", GetBackendPorts(2, 3), 2},
{"locality3", GetBackendPorts(3, 4), 6},
});
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(args), 5000);
// Wait for the first 3 backends to be ready.
WaitForAllBackends(1, 0, 3);
gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
@ -1244,13 +1255,12 @@ TEST_F(SingleBalancerTest, Drop) {
const double KDropRateForLbAndThrottle =
kDropRateForLb + (1 - kDropRateForLb) * kDropRateForThrottle;
// The EDS response contains two drop categories.
ScheduleResponseForBalancer(
0,
EdsServiceImpl::BuildResponse(
GetBackendPortsInGroups(), {}, 0,
{{kLbDropType, kDropPerMillionForLb},
{kThrottleDropType, kDropPerMillionForThrottle}}),
0);
EdsServiceImpl::ResponseArgs args({
{"locality0", GetBackendPorts()},
});
args.drop_categories = {{kLbDropType, kDropPerMillionForLb},
{kThrottleDropType, kDropPerMillionForThrottle}};
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(args), 0);
WaitForAllBackends();
// Send kNumRpcs RPCs and count the drops.
size_t num_drops = 0;
@ -1286,12 +1296,12 @@ TEST_F(SingleBalancerTest, DropPerHundred) {
const uint32_t kDropPerHundredForLb = 10;
const double kDropRateForLb = kDropPerHundredForLb / 100.0;
// The EDS response contains one drop category.
ScheduleResponseForBalancer(
0,
EdsServiceImpl::BuildResponse(GetBackendPortsInGroups(), {}, 0,
{{kLbDropType, kDropPerHundredForLb}},
FractionalPercent::HUNDRED),
0);
EdsServiceImpl::ResponseArgs args({
{"locality0", GetBackendPorts()},
});
args.drop_categories = {{kLbDropType, kDropPerHundredForLb}};
args.drop_denominator = FractionalPercent::HUNDRED;
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(args), 0);
WaitForAllBackends();
// Send kNumRpcs RPCs and count the drops.
size_t num_drops = 0;
@ -1326,12 +1336,12 @@ TEST_F(SingleBalancerTest, DropPerTenThousand) {
const uint32_t kDropPerTenThousandForLb = 1000;
const double kDropRateForLb = kDropPerTenThousandForLb / 10000.0;
// The EDS response contains one drop category.
ScheduleResponseForBalancer(
0,
EdsServiceImpl::BuildResponse(GetBackendPortsInGroups(), {}, 0,
{{kLbDropType, kDropPerTenThousandForLb}},
FractionalPercent::TEN_THOUSAND),
0);
EdsServiceImpl::ResponseArgs args({
{"locality0", GetBackendPorts()},
});
args.drop_categories = {{kLbDropType, kDropPerTenThousandForLb}};
args.drop_denominator = FractionalPercent::TEN_THOUSAND;
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(args), 0);
WaitForAllBackends();
// Send kNumRpcs RPCs and count the drops.
size_t num_drops = 0;
@ -1370,22 +1380,18 @@ TEST_F(SingleBalancerTest, DropUpdate) {
const double KDropRateForLbAndThrottle =
kDropRateForLb + (1 - kDropRateForLb) * kDropRateForThrottle;
// The first EDS response contains one drop category.
ScheduleResponseForBalancer(
0,
EdsServiceImpl::BuildResponse(GetBackendPortsInGroups(), {}, 0,
{{kLbDropType, kDropPerMillionForLb}}),
0);
EdsServiceImpl::ResponseArgs args({
{"locality0", GetBackendPorts()},
});
args.drop_categories = {{kLbDropType, kDropPerMillionForLb}};
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(args), 0);
// The second EDS response contains two drop categories.
// TODO(juanlishen): Change the EDS response sending to deterministic style
// (e.g., by using condition variable) so that we can shorten the test
// duration.
ScheduleResponseForBalancer(
0,
EdsServiceImpl::BuildResponse(
GetBackendPortsInGroups(), {}, 0,
{{kLbDropType, kDropPerMillionForLb},
{kThrottleDropType, kDropPerMillionForThrottle}}),
10000);
args.drop_categories = {{kLbDropType, kDropPerMillionForLb},
{kThrottleDropType, kDropPerMillionForThrottle}};
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(args), 10000);
WaitForAllBackends();
// Send kNumRpcs RPCs and count the drops.
size_t num_drops = 0;
@ -1465,13 +1471,12 @@ TEST_F(SingleBalancerTest, DropAll) {
const uint32_t kDropPerMillionForLb = 100000;
const uint32_t kDropPerMillionForThrottle = 1000000;
// The EDS response contains two drop categories.
ScheduleResponseForBalancer(
0,
EdsServiceImpl::BuildResponse(
GetBackendPortsInGroups(), {}, 0,
{{kLbDropType, kDropPerMillionForLb},
{kThrottleDropType, kDropPerMillionForThrottle}}),
0);
EdsServiceImpl::ResponseArgs args({
{"locality0", GetBackendPorts()},
});
args.drop_categories = {{kLbDropType, kDropPerMillionForLb},
{kThrottleDropType, kDropPerMillionForThrottle}};
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(args), 0);
// Send kNumRpcs RPCs and all of them are dropped.
for (size_t i = 0; i < kNumRpcs; ++i) {
EchoResponse response;
@ -1493,11 +1498,11 @@ TEST_F(SingleBalancerTest, Fallback) {
kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannelAllBalancers();
// Send non-empty serverlist only after kServerlistDelayMs.
ScheduleResponseForBalancer(
0,
EdsServiceImpl::BuildResponse(
GetBackendPortsInGroups(kNumBackendsInResolution /* start_index */)),
kServerlistDelayMs);
EdsServiceImpl::ResponseArgs args({
{"locality0", GetBackendPorts(kNumBackendsInResolution)},
});
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(args),
kServerlistDelayMs);
// Wait until all the fallback backends are reachable.
WaitForAllBackends(1 /* num_requests_multiple_of */, 0 /* start_index */,
kNumBackendsInResolution /* stop_index */);
@ -1542,12 +1547,12 @@ TEST_F(SingleBalancerTest, FallbackUpdate) {
kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannelAllBalancers();
// Send non-empty serverlist only after kServerlistDelayMs.
ScheduleResponseForBalancer(
0,
EdsServiceImpl::BuildResponse(GetBackendPortsInGroups(
kNumBackendsInResolution +
kNumBackendsInResolutionUpdate /* start_index */)),
kServerlistDelayMs);
EdsServiceImpl::ResponseArgs args({
{"locality0", GetBackendPorts(kNumBackendsInResolution +
kNumBackendsInResolutionUpdate)},
});
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(args),
kServerlistDelayMs);
// Wait until all the fallback backends are reachable.
WaitForAllBackends(1 /* num_requests_multiple_of */, 0 /* start_index */,
kNumBackendsInResolution /* stop_index */);
@ -1645,8 +1650,10 @@ TEST_F(SingleBalancerTest, FallbackIfResponseReceivedButChildNotReady) {
SetNextResolutionForLbChannelAllBalancers();
// Send a serverlist that only contains an unreachable backend before fallback
// timeout.
ScheduleResponseForBalancer(
0, EdsServiceImpl::BuildResponse({{grpc_pick_unused_port_or_die()}}), 0);
EdsServiceImpl::ResponseArgs args({
{"locality0", {grpc_pick_unused_port_or_die()}},
});
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(args), 0);
// Because no child policy is ready before fallback timeout, we enter fallback
// mode.
WaitForBackend(0);
@ -1659,11 +1666,11 @@ TEST_F(SingleBalancerTest, FallbackModeIsExitedWhenBalancerSaysToDropAllCalls) {
// Enter fallback mode because the LB channel fails to connect.
WaitForBackend(0);
// Return a new balancer that sends a response to drop all calls.
ScheduleResponseForBalancer(
0,
EdsServiceImpl::BuildResponse(GetBackendPortsInGroups(), {}, 0,
{{kLbDropType, 1000000}}),
0);
EdsServiceImpl::ResponseArgs args({
{"locality0", GetBackendPorts()},
});
args.drop_categories = {{kLbDropType, 1000000}};
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(args), 0);
SetNextResolutionForLbChannelAllBalancers();
// Send RPCs until failure.
gpr_timespec deadline = gpr_time_add(
@ -1683,8 +1690,10 @@ TEST_F(SingleBalancerTest, FallbackModeIsExitedAfterChildRready) {
WaitForBackend(0);
// Return a new balancer that sends a dead backend.
ShutdownBackend(1);
ScheduleResponseForBalancer(
0, EdsServiceImpl::BuildResponse({{backends_[1]->port()}}), 0);
EdsServiceImpl::ResponseArgs args({
{"locality0", {backends_[1]->port()}},
});
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(args), 0);
SetNextResolutionForLbChannelAllBalancers();
// The state (TRANSIENT_FAILURE) update from the child policy will be ignored
// because we are still in fallback mode.
@ -1708,8 +1717,10 @@ TEST_F(SingleBalancerTest, FallbackModeIsExitedAfterChildRready) {
TEST_F(SingleBalancerTest, BackendsRestart) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannelAllBalancers();
ScheduleResponseForBalancer(
0, EdsServiceImpl::BuildResponse(GetBackendPortsInGroups()), 0);
EdsServiceImpl::ResponseArgs args({
{"locality0", GetBackendPorts()},
});
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(args), 0);
WaitForAllBackends();
// Stop backends. RPCs should fail.
ShutdownAllBackends();
@ -1728,12 +1739,14 @@ class UpdatesTest : public XdsEnd2endTest {
TEST_F(UpdatesTest, UpdateBalancersButKeepUsingOriginalBalancer) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannelAllBalancers();
auto first_backend = GetBackendPortsInGroups(0, 1);
auto second_backend = GetBackendPortsInGroups(1, 2);
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(first_backend),
0);
ScheduleResponseForBalancer(1, EdsServiceImpl::BuildResponse(second_backend),
0);
EdsServiceImpl::ResponseArgs args({
{"locality0", {backends_[0]->port()}},
});
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(args), 0);
args = EdsServiceImpl::ResponseArgs({
{"locality0", {backends_[1]->port()}},
});
ScheduleResponseForBalancer(1, EdsServiceImpl::BuildResponse(args), 0);
// Wait until the first backend is ready.
WaitForBackend(0);
@ -1781,12 +1794,14 @@ TEST_F(UpdatesTest, UpdateBalancersButKeepUsingOriginalBalancer) {
TEST_F(UpdatesTest, UpdateBalancerName) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannelAllBalancers();
auto first_backend = GetBackendPortsInGroups(0, 1);
auto second_backend = GetBackendPortsInGroups(1, 2);
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(first_backend),
0);
ScheduleResponseForBalancer(1, EdsServiceImpl::BuildResponse(second_backend),
0);
EdsServiceImpl::ResponseArgs args({
{"locality0", {backends_[0]->port()}},
});
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(args), 0);
args = EdsServiceImpl::ResponseArgs({
{"locality0", {backends_[1]->port()}},
});
ScheduleResponseForBalancer(1, EdsServiceImpl::BuildResponse(args), 0);
// Wait until the first backend is ready.
WaitForBackend(0);
@ -1852,12 +1867,14 @@ TEST_F(UpdatesTest, UpdateBalancerName) {
TEST_F(UpdatesTest, UpdateBalancersRepeated) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannelAllBalancers();
auto first_backend = GetBackendPortsInGroups(0, 1);
auto second_backend = GetBackendPortsInGroups(1, 2);
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(first_backend),
0);
ScheduleResponseForBalancer(1, EdsServiceImpl::BuildResponse(second_backend),
0);
EdsServiceImpl::ResponseArgs args({
{"locality0", {backends_[0]->port()}},
});
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(args), 0);
args = EdsServiceImpl::ResponseArgs({
{"locality0", {backends_[1]->port()}},
});
ScheduleResponseForBalancer(1, EdsServiceImpl::BuildResponse(args), 0);
// Wait until the first backend is ready.
WaitForBackend(0);
@ -1920,12 +1937,14 @@ TEST_F(UpdatesTest, UpdateBalancersRepeated) {
TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannel({balancers_[0]->port()});
auto first_backend = GetBackendPortsInGroups(0, 1);
auto second_backend = GetBackendPortsInGroups(1, 2);
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(first_backend),
0);
ScheduleResponseForBalancer(1, EdsServiceImpl::BuildResponse(second_backend),
0);
EdsServiceImpl::ResponseArgs args({
{"locality0", {backends_[0]->port()}},
});
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(args), 0);
args = EdsServiceImpl::ResponseArgs({
{"locality0", {backends_[1]->port()}},
});
ScheduleResponseForBalancer(1, EdsServiceImpl::BuildResponse(args), 0);
// Start servers and send 10 RPCs per server.
gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
@ -2007,10 +2026,10 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) {
const size_t kNumRpcsPerAddress = 100;
// TODO(juanlishen): Partition the backends after multiple localities is
// tested.
ScheduleResponseForBalancer(0,
EdsServiceImpl::BuildResponse(
GetBackendPortsInGroups(0, backends_.size())),
0);
EdsServiceImpl::ResponseArgs args({
{"locality0", GetBackendPorts()},
});
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(args), 0);
// Wait until all backends are ready.
int num_ok = 0;
int num_failure = 0;
@ -2046,11 +2065,10 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, BalancerRestart) {
const size_t kNumBackendsFirstPass = backends_.size() / 2;
const size_t kNumBackendsSecondPass =
backends_.size() - kNumBackendsFirstPass;
ScheduleResponseForBalancer(
0,
EdsServiceImpl::BuildResponse(
GetBackendPortsInGroups(0, kNumBackendsFirstPass)),
0);
EdsServiceImpl::ResponseArgs args({
{"locality0", GetBackendPorts(0, kNumBackendsFirstPass)},
});
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(args), 0);
// Wait until all backends returned by the balancer are ready.
int num_ok = 0;
int num_failure = 0;
@ -2077,11 +2095,10 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, BalancerRestart) {
}
// Now restart the balancer, this time pointing to the new backends.
balancers_[0]->Start(server_host_);
ScheduleResponseForBalancer(
0,
EdsServiceImpl::BuildResponse(
GetBackendPortsInGroups(kNumBackendsFirstPass)),
0);
args = EdsServiceImpl::ResponseArgs({
{"locality0", GetBackendPorts(kNumBackendsFirstPass)},
});
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(args), 0);
// Wait for queries to start going to one of the new backends.
// This tells us that we're now using the new serverlist.
std::tie(num_ok, num_failure, num_drops) =
@ -2116,13 +2133,12 @@ TEST_F(SingleBalancerWithClientLoadReportingAndDropTest, Vanilla) {
const double KDropRateForLbAndThrottle =
kDropRateForLb + (1 - kDropRateForLb) * kDropRateForThrottle;
// The EDS response contains two drop categories.
ScheduleResponseForBalancer(
0,
EdsServiceImpl::BuildResponse(
GetBackendPortsInGroups(), {}, 0,
{{kLbDropType, kDropPerMillionForLb},
{kThrottleDropType, kDropPerMillionForThrottle}}),
0);
EdsServiceImpl::ResponseArgs args({
{"locality0", GetBackendPorts()},
});
args.drop_categories = {{kLbDropType, kDropPerMillionForLb},
{kThrottleDropType, kDropPerMillionForThrottle}};
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(args), 0);
int num_ok = 0;
int num_failure = 0;
int num_drops = 0;

Loading…
Cancel
Save