Merge pull request #12318 from dgquintas/fix_epoll1_flakiness_client_lb_e2e

{grpclb,client_lb}_end2end: Fix epoll1 flakes
pull/12456/merge
David G. Quintas 7 years ago committed by GitHub
commit 729448f2ed
  1. 51
      test/cpp/end2end/client_lb_end2end_test.cc
  2. 310
      test/cpp/end2end/grpclb_end2end_test.cc

@ -226,6 +226,31 @@ class ClientLbEnd2endTest : public ::testing::Test {
ResetCounters(); ResetCounters();
} }
bool SeenAllServers() {
for (const auto& server : servers_) {
if (server->service_.request_count() == 0) return false;
}
return true;
}
// Updates \a connection_order by appending to it the index of the newly
// connected server. Must be called after every single RPC.
void UpdateConnectionOrder(
const std::vector<std::unique_ptr<ServerData>>& servers,
std::vector<int>* connection_order) {
for (size_t i = 0; i < servers.size(); ++i) {
if (servers[i]->service_.request_count() == 1) {
// Was the server index known? If not, update connection_order.
const auto it =
std::find(connection_order->begin(), connection_order->end(), i);
if (it == connection_order->end()) {
connection_order->push_back(i);
return;
}
}
}
}
const grpc::string server_host_; const grpc::string server_host_;
std::shared_ptr<Channel> channel_; std::shared_ptr<Channel> channel_;
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_; std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
@ -370,13 +395,23 @@ TEST_F(ClientLbEnd2endTest, RoundRobin) {
ports.emplace_back(server->port_); ports.emplace_back(server->port_);
} }
SetNextResolution(ports); SetNextResolution(ports);
for (size_t i = 0; i < servers_.size(); ++i) { // Wait until all backends are ready.
do {
CheckRpcSendOk(); CheckRpcSendOk();
} } while (!SeenAllServers());
// One request should have gone to each server. ResetCounters();
// "Sync" to the end of the list. Next sequence of picks will start at the
// first server (index 0).
WaitForServer(servers_.size() - 1);
std::vector<int> connection_order;
for (size_t i = 0; i < servers_.size(); ++i) { for (size_t i = 0; i < servers_.size(); ++i) {
EXPECT_EQ(1, servers_[i]->service_.request_count()); CheckRpcSendOk();
UpdateConnectionOrder(servers_, &connection_order);
} }
// Backends should be iterated over in the order in which the addresses were
// given.
const auto expected = std::vector<int>{0, 1, 2};
EXPECT_EQ(expected, connection_order);
// Check LB policy name for the channel. // Check LB policy name for the channel.
EXPECT_EQ("round_robin", channel_->GetLoadBalancingPolicyName()); EXPECT_EQ("round_robin", channel_->GetLoadBalancingPolicyName());
} }
@ -529,13 +564,9 @@ TEST_F(ClientLbEnd2endTest, RoundRobinReresolve) {
StartServers(kNumServers, ports); StartServers(kNumServers, ports);
ResetStub("round_robin"); ResetStub("round_robin");
SetNextResolution(ports); SetNextResolution(ports);
// Send one RPC per backend and make sure they are used in order. // Send a number of RPCs, which succeed.
// Note: This relies on the fact that the subchannels are reported in for (size_t i = 0; i < 100; ++i) {
// state READY in the order in which the addresses are specified,
// which is only true because the backends are all local.
for (size_t i = 0; i < servers_.size(); ++i) {
CheckRpcSendOk(); CheckRpcSendOk();
EXPECT_EQ(1, servers_[i]->service_.request_count()) << "for backend #" << i;
} }
// Kill all servers // Kill all servers
for (size_t i = 0; i < servers_.size(); ++i) { for (size_t i = 0; i < servers_.size(); ++i) {

@ -332,7 +332,8 @@ class GrpclbEnd2endTest : public ::testing::Test {
num_backends_(num_backends), num_backends_(num_backends),
num_balancers_(num_balancers), num_balancers_(num_balancers),
client_load_reporting_interval_seconds_( client_load_reporting_interval_seconds_(
client_load_reporting_interval_seconds) {} client_load_reporting_interval_seconds),
kRequestMessage_("Live long and prosper.") {}
void SetUp() override { void SetUp() override {
response_generator_ = grpc_fake_resolver_response_generator_create(); response_generator_ = grpc_fake_resolver_response_generator_create();
@ -378,6 +379,10 @@ class GrpclbEnd2endTest : public ::testing::Test {
stub_ = grpc::testing::EchoTestService::NewStub(channel_); stub_ = grpc::testing::EchoTestService::NewStub(channel_);
} }
void ResetBackendCounters() {
for (const auto& backend : backends_) backend->ResetCounters();
}
ClientStats WaitForLoadReports() { ClientStats WaitForLoadReports() {
ClientStats client_stats; ClientStats client_stats;
for (const auto& balancer : balancers_) { for (const auto& balancer : balancers_) {
@ -386,6 +391,27 @@ class GrpclbEnd2endTest : public ::testing::Test {
return client_stats; return client_stats;
} }
bool SeenAllBackends() {
for (const auto& backend : backends_) {
if (backend->request_count() == 0) return false;
}
return true;
}
void WaitForAllBackends() {
while (!SeenAllBackends()) {
CheckRpcSendOk();
}
ResetBackendCounters();
}
void WaitForBackend(size_t backend_idx) {
do {
CheckRpcSendOk();
} while (backends_[backend_idx]->request_count() == 0);
ResetBackendCounters();
}
struct AddressData { struct AddressData {
int port; int port;
bool is_balancer; bool is_balancer;
@ -429,20 +455,31 @@ class GrpclbEnd2endTest : public ::testing::Test {
balancers_.at(i)->add_response(response, delay_ms); balancers_.at(i)->add_response(response, delay_ms);
} }
std::vector<std::pair<Status, EchoResponse>> SendRpc(const string& message, Status SendRpc(EchoResponse* response = nullptr, int timeout_ms = 1000) {
int num_rpcs, const bool local_response = (response == nullptr);
int timeout_ms = 1000) { if (local_response) response = new EchoResponse;
std::vector<std::pair<Status, EchoResponse>> results;
EchoRequest request; EchoRequest request;
EchoResponse response; request.set_message(kRequestMessage_);
request.set_message(message); ClientContext context;
for (int i = 0; i < num_rpcs; i++) { context.set_deadline(grpc_timeout_milliseconds_to_deadline(timeout_ms));
ClientContext context; Status status = stub_->Echo(&context, request, response);
context.set_deadline(grpc_timeout_milliseconds_to_deadline(timeout_ms)); if (local_response) delete response;
Status status = stub_->Echo(&context, request, &response); return status;
results.push_back(std::make_pair(status, response)); }
void CheckRpcSendOk(const size_t times = 1) {
for (size_t i = 0; i < times; ++i) {
EchoResponse response;
const Status status = SendRpc(&response);
EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
<< " message=" << status.error_message();
EXPECT_EQ(response.message(), kRequestMessage_);
} }
return results; }
void CheckRpcSendFailure() {
const Status status = SendRpc();
EXPECT_FALSE(status.ok());
} }
template <typename T> template <typename T>
@ -499,14 +536,12 @@ class GrpclbEnd2endTest : public ::testing::Test {
const int client_load_reporting_interval_seconds_; const int client_load_reporting_interval_seconds_;
std::shared_ptr<Channel> channel_; std::shared_ptr<Channel> channel_;
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_; std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
std::vector<std::unique_ptr<BackendServiceImpl>> backends_; std::vector<std::unique_ptr<BackendServiceImpl>> backends_;
std::vector<std::unique_ptr<BalancerServiceImpl>> balancers_; std::vector<std::unique_ptr<BalancerServiceImpl>> balancers_;
std::vector<ServerThread<BackendService>> backend_servers_; std::vector<ServerThread<BackendService>> backend_servers_;
std::vector<ServerThread<BalancerService>> balancer_servers_; std::vector<ServerThread<BalancerService>> balancer_servers_;
grpc_fake_resolver_response_generator* response_generator_; grpc_fake_resolver_response_generator* response_generator_;
const grpc::string kRequestMessage_;
}; };
class SingleBalancerTest : public GrpclbEnd2endTest { class SingleBalancerTest : public GrpclbEnd2endTest {
@ -521,17 +556,12 @@ TEST_F(SingleBalancerTest, Vanilla) {
0); 0);
// Make sure that trying to connect works without a call. // Make sure that trying to connect works without a call.
channel_->GetState(true /* try_to_connect */); channel_->GetState(true /* try_to_connect */);
// Send 100 RPCs per server.
const auto& statuses_and_responses = // We need to wait for all backends to come online.
SendRpc(kMessage_, kNumRpcsPerAddress * num_backends_); WaitForAllBackends();
for (const auto& status_and_response : statuses_and_responses) { // Send kNumRpcsPerAddress RPCs per server.
const Status& status = status_and_response.first; CheckRpcSendOk(kNumRpcsPerAddress * num_backends_);
const EchoResponse& response = status_and_response.second;
EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
<< " message=" << status.error_message();
EXPECT_EQ(response.message(), kMessage_);
}
// Each backend should have gotten 100 requests. // Each backend should have gotten 100 requests.
for (size_t i = 0; i < backends_.size(); ++i) { for (size_t i = 0; i < backends_.size(); ++i) {
@ -561,8 +591,7 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) {
const auto t0 = system_clock::now(); const auto t0 = system_clock::now();
// Client will block: LB will initially send empty serverlist. // Client will block: LB will initially send empty serverlist.
const auto& statuses_and_responses = CheckRpcSendOk(num_backends_);
SendRpc(kMessage_, num_backends_, kCallDeadlineMs);
const auto ellapsed_ms = const auto ellapsed_ms =
std::chrono::duration_cast<std::chrono::milliseconds>( std::chrono::duration_cast<std::chrono::milliseconds>(
system_clock::now() - t0); system_clock::now() - t0);
@ -576,13 +605,6 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) {
for (size_t i = 0; i < backends_.size(); ++i) { for (size_t i = 0; i < backends_.size(); ++i) {
EXPECT_EQ(1U, backend_servers_[i].service_->request_count()); EXPECT_EQ(1U, backend_servers_[i].service_->request_count());
} }
for (const auto& status_and_response : statuses_and_responses) {
const Status& status = status_and_response.first;
const EchoResponse& response = status_and_response.second;
EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
<< " message=" << status.error_message();
EXPECT_EQ(response.message(), kMessage_);
}
balancers_[0]->NotifyDoneWithServerlists(); balancers_[0]->NotifyDoneWithServerlists();
// The balancer got a single request. // The balancer got a single request.
EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
@ -593,70 +615,6 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) {
EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
} }
TEST_F(SingleBalancerTest, RepeatedServerlist) {
constexpr int kServerlistDelayMs = 100;
// Send a serverlist right away.
ScheduleResponseForBalancer(
0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
0);
// ... and the same one a bit later.
ScheduleResponseForBalancer(
0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
kServerlistDelayMs);
// Send num_backends/2 requests.
auto statuses_and_responses = SendRpc(kMessage_, num_backends_ / 2);
// only the first half of the backends will receive them.
for (size_t i = 0; i < backends_.size(); ++i) {
if (i < backends_.size() / 2)
EXPECT_EQ(1U, backend_servers_[i].service_->request_count())
<< "for backend #" << i;
else
EXPECT_EQ(0U, backend_servers_[i].service_->request_count())
<< "for backend #" << i;
}
EXPECT_EQ(statuses_and_responses.size(), num_backends_ / 2);
for (const auto& status_and_response : statuses_and_responses) {
const Status& status = status_and_response.first;
const EchoResponse& response = status_and_response.second;
EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
<< " message=" << status.error_message();
EXPECT_EQ(response.message(), kMessage_);
}
// Wait for the (duplicated) serverlist update.
gpr_sleep_until(gpr_time_add(
gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_millis(kServerlistDelayMs * 1.1, GPR_TIMESPAN)));
// Verify the LB has sent two responses.
EXPECT_EQ(2U, balancer_servers_[0].service_->response_count());
// Some more calls to complete the total number of backends.
statuses_and_responses = SendRpc(
kMessage_,
num_backends_ / 2 + (num_backends_ & 0x1) /* extra one if num_bes odd */);
// Because a duplicated serverlist should have no effect, all backends must
// have been hit once now.
for (size_t i = 0; i < backends_.size(); ++i) {
EXPECT_EQ(1U, backend_servers_[i].service_->request_count());
}
EXPECT_EQ(statuses_and_responses.size(), num_backends_ / 2);
for (const auto& status_and_response : statuses_and_responses) {
const Status& status = status_and_response.first;
const EchoResponse& response = status_and_response.second;
EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
<< " message=" << status.error_message();
EXPECT_EQ(response.message(), kMessage_);
}
balancers_[0]->NotifyDoneWithServerlists();
// The balancer got a single request.
EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
// Check LB policy name for the channel.
EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
}
TEST_F(SingleBalancerTest, BackendsRestart) { TEST_F(SingleBalancerTest, BackendsRestart) {
const size_t kNumRpcsPerAddress = 100; const size_t kNumRpcsPerAddress = 100;
ScheduleResponseForBalancer( ScheduleResponseForBalancer(
@ -664,21 +622,8 @@ TEST_F(SingleBalancerTest, BackendsRestart) {
0); 0);
// Make sure that trying to connect works without a call. // Make sure that trying to connect works without a call.
channel_->GetState(true /* try_to_connect */); channel_->GetState(true /* try_to_connect */);
// Send 100 RPCs per server. // Send kNumRpcsPerAddress RPCs per server.
auto statuses_and_responses = CheckRpcSendOk(kNumRpcsPerAddress * num_backends_);
SendRpc(kMessage_, kNumRpcsPerAddress * num_backends_);
for (const auto& status_and_response : statuses_and_responses) {
const Status& status = status_and_response.first;
const EchoResponse& response = status_and_response.second;
EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
<< " message=" << status.error_message();
EXPECT_EQ(response.message(), kMessage_);
}
// Each backend should have gotten 100 requests.
for (size_t i = 0; i < backends_.size(); ++i) {
EXPECT_EQ(kNumRpcsPerAddress,
backend_servers_[i].service_->request_count());
}
balancers_[0]->NotifyDoneWithServerlists(); balancers_[0]->NotifyDoneWithServerlists();
// The balancer got a single request. // The balancer got a single request.
EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
@ -687,11 +632,7 @@ TEST_F(SingleBalancerTest, BackendsRestart) {
for (size_t i = 0; i < backends_.size(); ++i) { for (size_t i = 0; i < backends_.size(); ++i) {
if (backends_[i]->Shutdown()) backend_servers_[i].Shutdown(); if (backends_[i]->Shutdown()) backend_servers_[i].Shutdown();
} }
statuses_and_responses = SendRpc(kMessage_, 1); CheckRpcSendFailure();
for (const auto& status_and_response : statuses_and_responses) {
const Status& status = status_and_response.first;
EXPECT_FALSE(status.ok());
}
for (size_t i = 0; i < num_backends_; ++i) { for (size_t i = 0; i < num_backends_; ++i) {
backends_.emplace_back(new BackendServiceImpl()); backends_.emplace_back(new BackendServiceImpl());
backend_servers_.emplace_back(ServerThread<BackendService>( backend_servers_.emplace_back(ServerThread<BackendService>(
@ -703,11 +644,7 @@ TEST_F(SingleBalancerTest, BackendsRestart) {
// TODO(dgq): implement the "backend restart" component as well. We need extra // TODO(dgq): implement the "backend restart" component as well. We need extra
// machinery to either update the LB responses "on the fly" or instruct // machinery to either update the LB responses "on the fly" or instruct
// backends which ports to restart on. // backends which ports to restart on.
statuses_and_responses = SendRpc(kMessage_, 1); CheckRpcSendFailure();
for (const auto& status_and_response : statuses_and_responses) {
const Status& status = status_and_response.first;
EXPECT_FALSE(status.ok());
}
// Check LB policy name for the channel. // Check LB policy name for the channel.
EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
} }
@ -727,13 +664,9 @@ TEST_F(UpdatesTest, UpdateBalancers) {
// Start servers and send 10 RPCs per server. // Start servers and send 10 RPCs per server.
gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH =========="); gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
auto statuses_and_responses = SendRpc(kMessage_, 10); CheckRpcSendOk(10);
gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH =========="); gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
for (const auto& status_and_response : statuses_and_responses) {
EXPECT_TRUE(status_and_response.first.ok());
EXPECT_EQ(status_and_response.second.message(), kMessage_);
}
// All 10 requests should have gone to the first backend. // All 10 requests should have gone to the first backend.
EXPECT_EQ(10U, backend_servers_[0].service_->request_count()); EXPECT_EQ(10U, backend_servers_[0].service_->request_count());
@ -758,22 +691,12 @@ TEST_F(UpdatesTest, UpdateBalancers) {
// Wait until update has been processed, as signaled by the second backend // Wait until update has been processed, as signaled by the second backend
// receiving a request. // receiving a request.
EXPECT_EQ(0U, backend_servers_[1].service_->request_count()); EXPECT_EQ(0U, backend_servers_[1].service_->request_count());
do { WaitForBackend(1);
auto statuses_and_responses = SendRpc(kMessage_, 1);
for (const auto& status_and_response : statuses_and_responses) {
EXPECT_TRUE(status_and_response.first.ok());
EXPECT_EQ(status_and_response.second.message(), kMessage_);
}
} while (backend_servers_[1].service_->request_count() == 0);
backend_servers_[1].service_->ResetCounters(); backend_servers_[1].service_->ResetCounters();
gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH =========="); gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
statuses_and_responses = SendRpc(kMessage_, 10); CheckRpcSendOk(10);
gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH =========="); gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
for (const auto& status_and_response : statuses_and_responses) {
EXPECT_TRUE(status_and_response.first.ok());
EXPECT_EQ(status_and_response.second.message(), kMessage_);
}
// All 10 requests should have gone to the second backend. // All 10 requests should have gone to the second backend.
EXPECT_EQ(10U, backend_servers_[1].service_->request_count()); EXPECT_EQ(10U, backend_servers_[1].service_->request_count());
@ -804,13 +727,9 @@ TEST_F(UpdatesTest, UpdateBalancersRepeated) {
// Start servers and send 10 RPCs per server. // Start servers and send 10 RPCs per server.
gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH =========="); gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
auto statuses_and_responses = SendRpc(kMessage_, 10); CheckRpcSendOk(10);
gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH =========="); gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
for (const auto& status_and_response : statuses_and_responses) {
EXPECT_TRUE(status_and_response.first.ok());
EXPECT_EQ(status_and_response.second.message(), kMessage_);
}
// All 10 requests should have gone to the first backend. // All 10 requests should have gone to the first backend.
EXPECT_EQ(10U, backend_servers_[0].service_->request_count()); EXPECT_EQ(10U, backend_servers_[0].service_->request_count());
@ -837,11 +756,7 @@ TEST_F(UpdatesTest, UpdateBalancersRepeated) {
gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(10000, GPR_TIMESPAN)); gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(10000, GPR_TIMESPAN));
// Send 10 seconds worth of RPCs // Send 10 seconds worth of RPCs
do { do {
statuses_and_responses = SendRpc(kMessage_, 1); CheckRpcSendOk();
for (const auto& status_and_response : statuses_and_responses) {
EXPECT_TRUE(status_and_response.first.ok());
EXPECT_EQ(status_and_response.second.message(), kMessage_);
}
} while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0); } while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0);
// grpclb continued using the original LB call to the first balancer, which // grpclb continued using the original LB call to the first balancer, which
// doesn't assign the second backend. // doesn't assign the second backend.
@ -860,11 +775,7 @@ TEST_F(UpdatesTest, UpdateBalancersRepeated) {
gpr_time_from_millis(10000, GPR_TIMESPAN)); gpr_time_from_millis(10000, GPR_TIMESPAN));
// Send 10 seconds worth of RPCs // Send 10 seconds worth of RPCs
do { do {
statuses_and_responses = SendRpc(kMessage_, 1); CheckRpcSendOk();
for (const auto& status_and_response : statuses_and_responses) {
EXPECT_TRUE(status_and_response.first.ok());
EXPECT_EQ(status_and_response.second.message(), kMessage_);
}
} while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0); } while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0);
// grpclb continued using the original LB call to the first balancer, which // grpclb continued using the original LB call to the first balancer, which
// doesn't assign the second backend. // doesn't assign the second backend.
@ -886,12 +797,8 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) {
// Start servers and send 10 RPCs per server. // Start servers and send 10 RPCs per server.
gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH =========="); gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
auto statuses_and_responses = SendRpc(kMessage_, 10); CheckRpcSendOk(10);
gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH =========="); gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
for (const auto& status_and_response : statuses_and_responses) {
EXPECT_TRUE(status_and_response.first.ok());
EXPECT_EQ(status_and_response.second.message(), kMessage_);
}
// All 10 requests should have gone to the first backend. // All 10 requests should have gone to the first backend.
EXPECT_EQ(10U, backend_servers_[0].service_->request_count()); EXPECT_EQ(10U, backend_servers_[0].service_->request_count());
@ -903,12 +810,8 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) {
// This is serviced by the existing RR policy // This is serviced by the existing RR policy
gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH =========="); gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
statuses_and_responses = SendRpc(kMessage_, 10); CheckRpcSendOk(10);
gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH =========="); gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
for (const auto& status_and_response : statuses_and_responses) {
EXPECT_TRUE(status_and_response.first.ok());
EXPECT_EQ(status_and_response.second.message(), kMessage_);
}
// All 10 requests should again have gone to the first backend. // All 10 requests should again have gone to the first backend.
EXPECT_EQ(20U, backend_servers_[0].service_->request_count()); EXPECT_EQ(20U, backend_servers_[0].service_->request_count());
EXPECT_EQ(0U, backend_servers_[1].service_->request_count()); EXPECT_EQ(0U, backend_servers_[1].service_->request_count());
@ -935,23 +838,13 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) {
// receiving a request. In the meantime, the client continues to be serviced // receiving a request. In the meantime, the client continues to be serviced
// (by the first backend) without interruption. // (by the first backend) without interruption.
EXPECT_EQ(0U, backend_servers_[1].service_->request_count()); EXPECT_EQ(0U, backend_servers_[1].service_->request_count());
do { WaitForBackend(1);
auto statuses_and_responses = SendRpc(kMessage_, 1);
for (const auto& status_and_response : statuses_and_responses) {
EXPECT_TRUE(status_and_response.first.ok());
EXPECT_EQ(status_and_response.second.message(), kMessage_);
}
} while (backend_servers_[1].service_->request_count() == 0);
// This is serviced by the existing RR policy // This is serviced by the existing RR policy
backend_servers_[1].service_->ResetCounters(); backend_servers_[1].service_->ResetCounters();
gpr_log(GPR_INFO, "========= BEFORE THIRD BATCH =========="); gpr_log(GPR_INFO, "========= BEFORE THIRD BATCH ==========");
statuses_and_responses = SendRpc(kMessage_, 10); CheckRpcSendOk(10);
gpr_log(GPR_INFO, "========= DONE WITH THIRD BATCH =========="); gpr_log(GPR_INFO, "========= DONE WITH THIRD BATCH ==========");
for (const auto& status_and_response : statuses_and_responses) {
EXPECT_TRUE(status_and_response.first.ok());
EXPECT_EQ(status_and_response.second.message(), kMessage_);
}
// All 10 requests should have gone to the second backend. // All 10 requests should have gone to the second backend.
EXPECT_EQ(10U, backend_servers_[1].service_->request_count()); EXPECT_EQ(10U, backend_servers_[1].service_->request_count());
@ -974,14 +867,11 @@ TEST_F(SingleBalancerTest, Drop) {
0, BalancerServiceImpl::BuildResponseForBackends( 0, BalancerServiceImpl::BuildResponseForBackends(
GetBackendPorts(), {{"rate_limiting", 1}, {"load_balancing", 2}}), GetBackendPorts(), {{"rate_limiting", 1}, {"load_balancing", 2}}),
0); 0);
// Send 100 RPCs for each server and drop address. // Send kNumRpcsPerAddress RPCs for each server and drop address.
const auto& statuses_and_responses =
SendRpc(kMessage_, kNumRpcsPerAddress * (num_backends_ + 3));
size_t num_drops = 0; size_t num_drops = 0;
for (const auto& status_and_response : statuses_and_responses) { for (size_t i = 0; i < kNumRpcsPerAddress * (num_backends_ + 3); ++i) {
const Status& status = status_and_response.first; EchoResponse response;
const EchoResponse& response = status_and_response.second; const Status status = SendRpc(&response);
if (!status.ok() && if (!status.ok() &&
status.error_message() == "Call dropped by load balancing policy") { status.error_message() == "Call dropped by load balancing policy") {
++num_drops; ++num_drops;
@ -1010,12 +900,9 @@ TEST_F(SingleBalancerTest, DropAllFirst) {
0, BalancerServiceImpl::BuildResponseForBackends( 0, BalancerServiceImpl::BuildResponseForBackends(
{}, {{"rate_limiting", 1}, {"load_balancing", 1}}), {}, {{"rate_limiting", 1}, {"load_balancing", 1}}),
0); 0);
const auto& statuses_and_responses = SendRpc(kMessage_, 1); const Status status = SendRpc();
for (const auto& status_and_response : statuses_and_responses) { EXPECT_FALSE(status.ok());
const Status& status = status_and_response.first; EXPECT_EQ(status.error_message(), "Call dropped by load balancing policy");
EXPECT_FALSE(status.ok());
EXPECT_EQ(status.error_message(), "Call dropped by load balancing policy");
}
} }
TEST_F(SingleBalancerTest, DropAll) { TEST_F(SingleBalancerTest, DropAll) {
@ -1028,21 +915,13 @@ TEST_F(SingleBalancerTest, DropAll) {
1000); 1000);
// First call succeeds. // First call succeeds.
auto statuses_and_responses = SendRpc(kMessage_, 1); CheckRpcSendOk();
for (const auto& status_and_response : statuses_and_responses) {
const Status& status = status_and_response.first;
const EchoResponse& response = status_and_response.second;
EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
<< " message=" << status.error_message();
EXPECT_EQ(response.message(), kMessage_);
}
// But eventually, the update with only dropped servers is processed and calls // But eventually, the update with only dropped servers is processed and calls
// fail. // fail.
Status status;
do { do {
statuses_and_responses = SendRpc(kMessage_, 1); status = SendRpc();
ASSERT_EQ(statuses_and_responses.size(), 1UL); } while (status.ok());
} while (statuses_and_responses[0].first.ok());
const Status& status = statuses_and_responses[0].first;
EXPECT_FALSE(status.ok()); EXPECT_FALSE(status.ok());
EXPECT_EQ(status.error_message(), "Call dropped by load balancing policy"); EXPECT_EQ(status.error_message(), "Call dropped by load balancing policy");
} }
@ -1057,18 +936,8 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) {
ScheduleResponseForBalancer( ScheduleResponseForBalancer(
0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}), 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
0); 0);
// Send 100 RPCs per server. // Send kNumRpcsPerAddress RPCs per server.
const auto& statuses_and_responses = CheckRpcSendOk(kNumRpcsPerAddress * num_backends_);
SendRpc(kMessage_, kNumRpcsPerAddress * num_backends_);
for (const auto& status_and_response : statuses_and_responses) {
const Status& status = status_and_response.first;
const EchoResponse& response = status_and_response.second;
EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
<< " message=" << status.error_message();
EXPECT_EQ(response.message(), kMessage_);
}
// Each backend should have gotten 100 requests. // Each backend should have gotten 100 requests.
for (size_t i = 0; i < backends_.size(); ++i) { for (size_t i = 0; i < backends_.size(); ++i) {
EXPECT_EQ(kNumRpcsPerAddress, EXPECT_EQ(kNumRpcsPerAddress,
@ -1096,14 +965,11 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) {
0, BalancerServiceImpl::BuildResponseForBackends( 0, BalancerServiceImpl::BuildResponseForBackends(
GetBackendPorts(), {{"rate_limiting", 2}, {"load_balancing", 1}}), GetBackendPorts(), {{"rate_limiting", 2}, {"load_balancing", 1}}),
0); 0);
// Send 100 RPCs for each server and drop address.
const auto& statuses_and_responses =
SendRpc(kMessage_, kNumRpcsPerAddress * (num_backends_ + 3));
size_t num_drops = 0; size_t num_drops = 0;
for (const auto& status_and_response : statuses_and_responses) { for (size_t i = 0; i < kNumRpcsPerAddress * (num_backends_ + 3); ++i) {
const Status& status = status_and_response.first; EchoResponse response;
const EchoResponse& response = status_and_response.second; const Status status = SendRpc(&response);
if (!status.ok() && if (!status.ok() &&
status.error_message() == "Call dropped by load balancing policy") { status.error_message() == "Call dropped by load balancing policy") {
++num_drops; ++num_drops;

Loading…
Cancel
Save