|
|
|
@ -98,12 +98,12 @@ namespace { |
|
|
|
|
template <typename ServiceType> |
|
|
|
|
class CountedService : public ServiceType { |
|
|
|
|
public: |
|
|
|
|
int request_count() { |
|
|
|
|
size_t request_count() { |
|
|
|
|
std::unique_lock<std::mutex> lock(mu_); |
|
|
|
|
return request_count_; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
int response_count() { |
|
|
|
|
size_t response_count() { |
|
|
|
|
std::unique_lock<std::mutex> lock(mu_); |
|
|
|
|
return response_count_; |
|
|
|
|
} |
|
|
|
@ -121,8 +121,8 @@ class CountedService : public ServiceType { |
|
|
|
|
std::mutex mu_; |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
int request_count_ = 0; |
|
|
|
|
int response_count_ = 0; |
|
|
|
|
size_t request_count_ = 0; |
|
|
|
|
size_t response_count_ = 0; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
using BackendService = CountedService<TestServiceImpl>; |
|
|
|
@ -243,9 +243,18 @@ class BalancerServiceImpl : public BalancerService { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static LoadBalanceResponse BuildResponseForBackends( |
|
|
|
|
const std::vector<int>& backend_ports) { |
|
|
|
|
const std::vector<int>& backend_ports, int num_drops_for_rate_limiting, |
|
|
|
|
int num_drops_for_load_balancing) { |
|
|
|
|
LoadBalanceResponse response; |
|
|
|
|
for (const int backend_port : backend_ports) { |
|
|
|
|
for (int i = 0; i < num_drops_for_rate_limiting; ++i) { |
|
|
|
|
auto* server = response.mutable_server_list()->add_servers(); |
|
|
|
|
server->set_drop_for_rate_limiting(true); |
|
|
|
|
} |
|
|
|
|
for (int i = 0; i < num_drops_for_load_balancing; ++i) { |
|
|
|
|
auto* server = response.mutable_server_list()->add_servers(); |
|
|
|
|
server->set_drop_for_load_balancing(true); |
|
|
|
|
} |
|
|
|
|
for (const int& backend_port : backend_ports) { |
|
|
|
|
auto* server = response.mutable_server_list()->add_servers(); |
|
|
|
|
server->set_ip_address(Ip4ToPackedString("127.0.0.1")); |
|
|
|
|
server->set_port(backend_port); |
|
|
|
@ -327,10 +336,8 @@ class GrpclbEnd2endTest : public ::testing::Test { |
|
|
|
|
ChannelArguments args; |
|
|
|
|
args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR, |
|
|
|
|
response_generator_); |
|
|
|
|
std::ostringstream uri; |
|
|
|
|
uri << "test:///servername_not_used"; |
|
|
|
|
channel_ = |
|
|
|
|
CreateCustomChannel(uri.str(), InsecureChannelCredentials(), args); |
|
|
|
|
channel_ = CreateCustomChannel("test:///not_used", |
|
|
|
|
InsecureChannelCredentials(), args); |
|
|
|
|
stub_ = grpc::testing::EchoTestService::NewStub(channel_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -467,26 +474,33 @@ class SingleBalancerTest : public GrpclbEnd2endTest { |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
TEST_F(SingleBalancerTest, Vanilla) { |
|
|
|
|
const size_t kNumRpcsPerAddress = 100; |
|
|
|
|
ScheduleResponseForBalancer( |
|
|
|
|
0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts()), 0); |
|
|
|
|
0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0), |
|
|
|
|
0); |
|
|
|
|
// Make sure that trying to connect works without a call.
|
|
|
|
|
channel_->GetState(true /* try_to_connect */); |
|
|
|
|
// Start servers and send 100 RPCs per server.
|
|
|
|
|
const auto& statuses_and_responses = SendRpc(kMessage_, 100 * num_backends_); |
|
|
|
|
// Send 100 RPCs per server.
|
|
|
|
|
const auto& statuses_and_responses = |
|
|
|
|
SendRpc(kMessage_, kNumRpcsPerAddress * num_backends_); |
|
|
|
|
|
|
|
|
|
for (const auto& status_and_response : statuses_and_responses) { |
|
|
|
|
EXPECT_TRUE(status_and_response.first.ok()); |
|
|
|
|
EXPECT_EQ(status_and_response.second.message(), kMessage_); |
|
|
|
|
const Status& status = status_and_response.first; |
|
|
|
|
const EchoResponse& response = status_and_response.second; |
|
|
|
|
EXPECT_TRUE(status.ok()) << "code=" << status.error_code() |
|
|
|
|
<< " message=" << status.error_message(); |
|
|
|
|
EXPECT_EQ(response.message(), kMessage_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Each backend should have gotten 100 requests.
|
|
|
|
|
for (size_t i = 0; i < backends_.size(); ++i) { |
|
|
|
|
EXPECT_EQ(100, backend_servers_[i].service_->request_count()); |
|
|
|
|
EXPECT_EQ(kNumRpcsPerAddress, |
|
|
|
|
backend_servers_[i].service_->request_count()); |
|
|
|
|
} |
|
|
|
|
// The balancer got a single request.
|
|
|
|
|
EXPECT_EQ(1, balancer_servers_[0].service_->request_count()); |
|
|
|
|
EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); |
|
|
|
|
// and sent a single response.
|
|
|
|
|
EXPECT_EQ(1, balancer_servers_[0].service_->response_count()); |
|
|
|
|
EXPECT_EQ(1U, balancer_servers_[0].service_->response_count()); |
|
|
|
|
|
|
|
|
|
// Check LB policy name for the channel.
|
|
|
|
|
EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); |
|
|
|
@ -500,7 +514,7 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) { |
|
|
|
|
ScheduleResponseForBalancer(0, LoadBalanceResponse(), 0); |
|
|
|
|
// Send non-empty serverlist only after kServerlistDelayMs
|
|
|
|
|
ScheduleResponseForBalancer( |
|
|
|
|
0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts()), |
|
|
|
|
0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0), |
|
|
|
|
kServerlistDelayMs); |
|
|
|
|
|
|
|
|
|
const auto t0 = system_clock::now(); |
|
|
|
@ -518,17 +532,20 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) { |
|
|
|
|
|
|
|
|
|
// Each backend should have gotten 1 request.
|
|
|
|
|
for (size_t i = 0; i < backends_.size(); ++i) { |
|
|
|
|
EXPECT_EQ(1, backend_servers_[i].service_->request_count()); |
|
|
|
|
EXPECT_EQ(1U, backend_servers_[i].service_->request_count()); |
|
|
|
|
} |
|
|
|
|
for (const auto& status_and_response : statuses_and_responses) { |
|
|
|
|
EXPECT_TRUE(status_and_response.first.ok()); |
|
|
|
|
EXPECT_EQ(status_and_response.second.message(), kMessage_); |
|
|
|
|
const Status& status = status_and_response.first; |
|
|
|
|
const EchoResponse& response = status_and_response.second; |
|
|
|
|
EXPECT_TRUE(status.ok()) << "code=" << status.error_code() |
|
|
|
|
<< " message=" << status.error_message(); |
|
|
|
|
EXPECT_EQ(response.message(), kMessage_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// The balancer got a single request.
|
|
|
|
|
EXPECT_EQ(1, balancer_servers_[0].service_->request_count()); |
|
|
|
|
EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); |
|
|
|
|
// and sent two responses.
|
|
|
|
|
EXPECT_EQ(2, balancer_servers_[0].service_->response_count()); |
|
|
|
|
EXPECT_EQ(2U, balancer_servers_[0].service_->response_count()); |
|
|
|
|
|
|
|
|
|
// Check LB policy name for the channel.
|
|
|
|
|
EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); |
|
|
|
@ -539,10 +556,11 @@ TEST_F(SingleBalancerTest, RepeatedServerlist) { |
|
|
|
|
|
|
|
|
|
// Send a serverlist right away.
|
|
|
|
|
ScheduleResponseForBalancer( |
|
|
|
|
0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts()), 0); |
|
|
|
|
0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0), |
|
|
|
|
0); |
|
|
|
|
// ... and the same one a bit later.
|
|
|
|
|
ScheduleResponseForBalancer( |
|
|
|
|
0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts()), |
|
|
|
|
0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0), |
|
|
|
|
kServerlistDelayMs); |
|
|
|
|
|
|
|
|
|
// Send num_backends/2 requests.
|
|
|
|
@ -550,14 +568,17 @@ TEST_F(SingleBalancerTest, RepeatedServerlist) { |
|
|
|
|
// only the first half of the backends will receive them.
|
|
|
|
|
for (size_t i = 0; i < backends_.size(); ++i) { |
|
|
|
|
if (i < backends_.size() / 2) |
|
|
|
|
EXPECT_EQ(1, backend_servers_[i].service_->request_count()); |
|
|
|
|
EXPECT_EQ(1U, backend_servers_[i].service_->request_count()); |
|
|
|
|
else |
|
|
|
|
EXPECT_EQ(0, backend_servers_[i].service_->request_count()); |
|
|
|
|
EXPECT_EQ(0U, backend_servers_[i].service_->request_count()); |
|
|
|
|
} |
|
|
|
|
EXPECT_EQ(statuses_and_responses.size(), num_backends_ / 2); |
|
|
|
|
for (const auto& status_and_response : statuses_and_responses) { |
|
|
|
|
EXPECT_TRUE(status_and_response.first.ok()); |
|
|
|
|
EXPECT_EQ(status_and_response.second.message(), kMessage_); |
|
|
|
|
const Status& status = status_and_response.first; |
|
|
|
|
const EchoResponse& response = status_and_response.second; |
|
|
|
|
EXPECT_TRUE(status.ok()) << "code=" << status.error_code() |
|
|
|
|
<< " message=" << status.error_message(); |
|
|
|
|
EXPECT_EQ(response.message(), kMessage_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Wait for the (duplicated) serverlist update.
|
|
|
|
@ -566,7 +587,7 @@ TEST_F(SingleBalancerTest, RepeatedServerlist) { |
|
|
|
|
gpr_time_from_millis(kServerlistDelayMs * 1.1, GPR_TIMESPAN))); |
|
|
|
|
|
|
|
|
|
// Verify the LB has sent two responses.
|
|
|
|
|
EXPECT_EQ(2, balancer_servers_[0].service_->response_count()); |
|
|
|
|
EXPECT_EQ(2U, balancer_servers_[0].service_->response_count()); |
|
|
|
|
|
|
|
|
|
// Some more calls to complete the total number of backends.
|
|
|
|
|
statuses_and_responses = SendRpc( |
|
|
|
@ -575,52 +596,146 @@ TEST_F(SingleBalancerTest, RepeatedServerlist) { |
|
|
|
|
// Because a duplicated serverlist should have no effect, all backends must
|
|
|
|
|
// have been hit once now.
|
|
|
|
|
for (size_t i = 0; i < backends_.size(); ++i) { |
|
|
|
|
EXPECT_EQ(1, backend_servers_[i].service_->request_count()); |
|
|
|
|
EXPECT_EQ(1U, backend_servers_[i].service_->request_count()); |
|
|
|
|
} |
|
|
|
|
EXPECT_EQ(statuses_and_responses.size(), num_backends_ / 2); |
|
|
|
|
for (const auto& status_and_response : statuses_and_responses) { |
|
|
|
|
EXPECT_TRUE(status_and_response.first.ok()); |
|
|
|
|
EXPECT_EQ(status_and_response.second.message(), kMessage_); |
|
|
|
|
const Status& status = status_and_response.first; |
|
|
|
|
const EchoResponse& response = status_and_response.second; |
|
|
|
|
EXPECT_TRUE(status.ok()) << "code=" << status.error_code() |
|
|
|
|
<< " message=" << status.error_message(); |
|
|
|
|
EXPECT_EQ(response.message(), kMessage_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// The balancer got a single request.
|
|
|
|
|
EXPECT_EQ(1, balancer_servers_[0].service_->request_count()); |
|
|
|
|
EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); |
|
|
|
|
// Check LB policy name for the channel.
|
|
|
|
|
EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
TEST_F(SingleBalancerTest, Drop) { |
|
|
|
|
const size_t kNumRpcsPerAddress = 100; |
|
|
|
|
ScheduleResponseForBalancer( |
|
|
|
|
0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 1, 2), |
|
|
|
|
0); |
|
|
|
|
// Send 100 RPCs for each server and drop address.
|
|
|
|
|
const auto& statuses_and_responses = |
|
|
|
|
SendRpc(kMessage_, kNumRpcsPerAddress * (num_backends_ + 3)); |
|
|
|
|
|
|
|
|
|
size_t num_drops = 0; |
|
|
|
|
for (const auto& status_and_response : statuses_and_responses) { |
|
|
|
|
const Status& status = status_and_response.first; |
|
|
|
|
const EchoResponse& response = status_and_response.second; |
|
|
|
|
if (!status.ok() && |
|
|
|
|
status.error_message() == "Call dropped by load balancing policy") { |
|
|
|
|
++num_drops; |
|
|
|
|
} else { |
|
|
|
|
EXPECT_TRUE(status.ok()) << "code=" << status.error_code() |
|
|
|
|
<< " message=" << status.error_message(); |
|
|
|
|
EXPECT_EQ(response.message(), kMessage_); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
EXPECT_EQ(kNumRpcsPerAddress * 3, num_drops); |
|
|
|
|
|
|
|
|
|
// Each backend should have gotten 100 requests.
|
|
|
|
|
for (size_t i = 0; i < backends_.size(); ++i) { |
|
|
|
|
EXPECT_EQ(kNumRpcsPerAddress, |
|
|
|
|
backend_servers_[i].service_->request_count()); |
|
|
|
|
} |
|
|
|
|
// The balancer got a single request.
|
|
|
|
|
EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); |
|
|
|
|
// and sent a single response.
|
|
|
|
|
EXPECT_EQ(1U, balancer_servers_[0].service_->response_count()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
class SingleBalancerWithClientLoadReportingTest : public GrpclbEnd2endTest { |
|
|
|
|
public: |
|
|
|
|
SingleBalancerWithClientLoadReportingTest() : GrpclbEnd2endTest(4, 1, 2) {} |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) { |
|
|
|
|
const size_t kNumRpcsPerAddress = 100; |
|
|
|
|
ScheduleResponseForBalancer( |
|
|
|
|
0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts()), 0); |
|
|
|
|
// Start servers and send 100 RPCs per server.
|
|
|
|
|
const auto& statuses_and_responses = SendRpc(kMessage_, 100 * num_backends_); |
|
|
|
|
0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0), |
|
|
|
|
0); |
|
|
|
|
// Send 100 RPCs per server.
|
|
|
|
|
const auto& statuses_and_responses = |
|
|
|
|
SendRpc(kMessage_, kNumRpcsPerAddress * num_backends_); |
|
|
|
|
|
|
|
|
|
for (const auto& status_and_response : statuses_and_responses) { |
|
|
|
|
EXPECT_TRUE(status_and_response.first.ok()); |
|
|
|
|
EXPECT_EQ(status_and_response.second.message(), kMessage_); |
|
|
|
|
const Status& status = status_and_response.first; |
|
|
|
|
const EchoResponse& response = status_and_response.second; |
|
|
|
|
EXPECT_TRUE(status.ok()) << "code=" << status.error_code() |
|
|
|
|
<< " message=" << status.error_message(); |
|
|
|
|
EXPECT_EQ(response.message(), kMessage_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Each backend should have gotten 100 requests.
|
|
|
|
|
for (size_t i = 0; i < backends_.size(); ++i) { |
|
|
|
|
EXPECT_EQ(100, backend_servers_[i].service_->request_count()); |
|
|
|
|
EXPECT_EQ(kNumRpcsPerAddress, |
|
|
|
|
backend_servers_[i].service_->request_count()); |
|
|
|
|
} |
|
|
|
|
// The balancer got a single request.
|
|
|
|
|
EXPECT_EQ(1, balancer_servers_[0].service_->request_count()); |
|
|
|
|
EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); |
|
|
|
|
// and sent a single response.
|
|
|
|
|
EXPECT_EQ(1, balancer_servers_[0].service_->response_count()); |
|
|
|
|
EXPECT_EQ(1U, balancer_servers_[0].service_->response_count()); |
|
|
|
|
|
|
|
|
|
const ClientStats client_stats = WaitForLoadReports(); |
|
|
|
|
EXPECT_EQ(100 * num_backends_, client_stats.num_calls_started); |
|
|
|
|
EXPECT_EQ(100 * num_backends_, client_stats.num_calls_finished); |
|
|
|
|
EXPECT_EQ(kNumRpcsPerAddress * num_backends_, client_stats.num_calls_started); |
|
|
|
|
EXPECT_EQ(kNumRpcsPerAddress * num_backends_, |
|
|
|
|
client_stats.num_calls_finished); |
|
|
|
|
EXPECT_EQ(0U, client_stats.num_calls_finished_with_drop_for_rate_limiting); |
|
|
|
|
EXPECT_EQ(0U, client_stats.num_calls_finished_with_drop_for_load_balancing); |
|
|
|
|
EXPECT_EQ(0U, client_stats.num_calls_finished_with_client_failed_to_send); |
|
|
|
|
EXPECT_EQ(100 * num_backends_, |
|
|
|
|
EXPECT_EQ(kNumRpcsPerAddress * num_backends_, |
|
|
|
|
client_stats.num_calls_finished_known_received); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) { |
|
|
|
|
const size_t kNumRpcsPerAddress = 3; |
|
|
|
|
ScheduleResponseForBalancer( |
|
|
|
|
0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 2, 1), |
|
|
|
|
0); |
|
|
|
|
// Send 100 RPCs for each server and drop address.
|
|
|
|
|
const auto& statuses_and_responses = |
|
|
|
|
SendRpc(kMessage_, kNumRpcsPerAddress * (num_backends_ + 3)); |
|
|
|
|
|
|
|
|
|
size_t num_drops = 0; |
|
|
|
|
for (const auto& status_and_response : statuses_and_responses) { |
|
|
|
|
const Status& status = status_and_response.first; |
|
|
|
|
const EchoResponse& response = status_and_response.second; |
|
|
|
|
if (!status.ok() && |
|
|
|
|
status.error_message() == "Call dropped by load balancing policy") { |
|
|
|
|
++num_drops; |
|
|
|
|
} else { |
|
|
|
|
EXPECT_TRUE(status.ok()) << "code=" << status.error_code() |
|
|
|
|
<< " message=" << status.error_message(); |
|
|
|
|
EXPECT_EQ(response.message(), kMessage_); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
EXPECT_EQ(kNumRpcsPerAddress * 3, num_drops); |
|
|
|
|
|
|
|
|
|
// Each backend should have gotten 100 requests.
|
|
|
|
|
for (size_t i = 0; i < backends_.size(); ++i) { |
|
|
|
|
EXPECT_EQ(kNumRpcsPerAddress, |
|
|
|
|
backend_servers_[i].service_->request_count()); |
|
|
|
|
} |
|
|
|
|
// The balancer got a single request.
|
|
|
|
|
EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); |
|
|
|
|
// and sent a single response.
|
|
|
|
|
EXPECT_EQ(1U, balancer_servers_[0].service_->response_count()); |
|
|
|
|
|
|
|
|
|
const ClientStats client_stats = WaitForLoadReports(); |
|
|
|
|
EXPECT_EQ(kNumRpcsPerAddress * (num_backends_ + 3), |
|
|
|
|
client_stats.num_calls_started); |
|
|
|
|
EXPECT_EQ(kNumRpcsPerAddress * (num_backends_ + 3), |
|
|
|
|
client_stats.num_calls_finished); |
|
|
|
|
EXPECT_EQ(kNumRpcsPerAddress * 2, |
|
|
|
|
client_stats.num_calls_finished_with_drop_for_rate_limiting); |
|
|
|
|
EXPECT_EQ(kNumRpcsPerAddress, |
|
|
|
|
client_stats.num_calls_finished_with_drop_for_load_balancing); |
|
|
|
|
EXPECT_EQ(0U, client_stats.num_calls_finished_with_client_failed_to_send); |
|
|
|
|
EXPECT_EQ(kNumRpcsPerAddress * num_backends_, |
|
|
|
|
client_stats.num_calls_finished_known_received); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|