|
|
|
@ -207,67 +207,74 @@ class BalancerServiceImpl : public BalancerService { |
|
|
|
|
client_load_reporting_interval_seconds) {} |
|
|
|
|
|
|
|
|
|
Status BalanceLoad(ServerContext* context, Stream* stream) override { |
|
|
|
|
// Balancer shouldn't receive the call credentials metadata.
|
|
|
|
|
EXPECT_EQ(context->client_metadata().find(g_kCallCredsMdKey), |
|
|
|
|
context->client_metadata().end()); |
|
|
|
|
gpr_log(GPR_INFO, "LB[%p]: BalanceLoad", this); |
|
|
|
|
LoadBalanceRequest request; |
|
|
|
|
std::vector<ResponseDelayPair> responses_and_delays; |
|
|
|
|
|
|
|
|
|
if (!stream->Read(&request)) { |
|
|
|
|
goto done; |
|
|
|
|
} |
|
|
|
|
IncreaseRequestCount(); |
|
|
|
|
gpr_log(GPR_INFO, "LB[%p]: received initial message '%s'", this, |
|
|
|
|
request.DebugString().c_str()); |
|
|
|
|
|
|
|
|
|
// TODO(juanlishen): Initial response should always be the first response.
|
|
|
|
|
if (client_load_reporting_interval_seconds_ > 0) { |
|
|
|
|
LoadBalanceResponse initial_response; |
|
|
|
|
initial_response.mutable_initial_response() |
|
|
|
|
->mutable_client_stats_report_interval() |
|
|
|
|
->set_seconds(client_load_reporting_interval_seconds_); |
|
|
|
|
stream->Write(initial_response); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
{ |
|
|
|
|
std::unique_lock<std::mutex> lock(mu_); |
|
|
|
|
responses_and_delays = responses_and_delays_; |
|
|
|
|
} |
|
|
|
|
for (const auto& response_and_delay : responses_and_delays) { |
|
|
|
|
SendResponse(stream, response_and_delay.first, response_and_delay.second); |
|
|
|
|
if (serverlist_done_) goto done; |
|
|
|
|
} |
|
|
|
|
{ |
|
|
|
|
std::unique_lock<std::mutex> lock(mu_); |
|
|
|
|
serverlist_cond_.wait(lock, [this] { return serverlist_done_; }); |
|
|
|
|
} |
|
|
|
|
// Balancer shouldn't receive the call credentials metadata.
|
|
|
|
|
EXPECT_EQ(context->client_metadata().find(g_kCallCredsMdKey), |
|
|
|
|
context->client_metadata().end()); |
|
|
|
|
LoadBalanceRequest request; |
|
|
|
|
std::vector<ResponseDelayPair> responses_and_delays; |
|
|
|
|
|
|
|
|
|
if (!stream->Read(&request)) { |
|
|
|
|
goto done; |
|
|
|
|
} |
|
|
|
|
IncreaseRequestCount(); |
|
|
|
|
gpr_log(GPR_INFO, "LB[%p]: received initial message '%s'", this, |
|
|
|
|
request.DebugString().c_str()); |
|
|
|
|
|
|
|
|
|
// TODO(juanlishen): Initial response should always be the first response.
|
|
|
|
|
if (client_load_reporting_interval_seconds_ > 0) { |
|
|
|
|
LoadBalanceResponse initial_response; |
|
|
|
|
initial_response.mutable_initial_response() |
|
|
|
|
->mutable_client_stats_report_interval() |
|
|
|
|
->set_seconds(client_load_reporting_interval_seconds_); |
|
|
|
|
stream->Write(initial_response); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
{ |
|
|
|
|
std::unique_lock<std::mutex> lock(mu_); |
|
|
|
|
responses_and_delays = responses_and_delays_; |
|
|
|
|
} |
|
|
|
|
for (const auto& response_and_delay : responses_and_delays) { |
|
|
|
|
SendResponse(stream, response_and_delay.first, |
|
|
|
|
response_and_delay.second); |
|
|
|
|
} |
|
|
|
|
{ |
|
|
|
|
std::unique_lock<std::mutex> lock(mu_); |
|
|
|
|
serverlist_cond_.wait(lock, [this] { return serverlist_done_; }); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (client_load_reporting_interval_seconds_ > 0) { |
|
|
|
|
request.Clear(); |
|
|
|
|
if (stream->Read(&request)) { |
|
|
|
|
gpr_log(GPR_INFO, "LB[%p]: received client load report message '%s'", |
|
|
|
|
this, request.DebugString().c_str()); |
|
|
|
|
GPR_ASSERT(request.has_client_stats()); |
|
|
|
|
// We need to acquire the lock here in order to prevent the notify_one
|
|
|
|
|
// below from firing before its corresponding wait is executed.
|
|
|
|
|
std::lock_guard<std::mutex> lock(mu_); |
|
|
|
|
client_stats_.num_calls_started += |
|
|
|
|
request.client_stats().num_calls_started(); |
|
|
|
|
client_stats_.num_calls_finished += |
|
|
|
|
request.client_stats().num_calls_finished(); |
|
|
|
|
client_stats_.num_calls_finished_with_client_failed_to_send += |
|
|
|
|
request.client_stats() |
|
|
|
|
.num_calls_finished_with_client_failed_to_send(); |
|
|
|
|
client_stats_.num_calls_finished_known_received += |
|
|
|
|
request.client_stats().num_calls_finished_known_received(); |
|
|
|
|
for (const auto& drop_token_count : |
|
|
|
|
request.client_stats().calls_finished_with_drop()) { |
|
|
|
|
client_stats_ |
|
|
|
|
.drop_token_counts[drop_token_count.load_balance_token()] += |
|
|
|
|
drop_token_count.num_calls(); |
|
|
|
|
if (client_load_reporting_interval_seconds_ > 0) { |
|
|
|
|
request.Clear(); |
|
|
|
|
if (stream->Read(&request)) { |
|
|
|
|
gpr_log(GPR_INFO, "LB[%p]: received client load report message '%s'", |
|
|
|
|
this, request.DebugString().c_str()); |
|
|
|
|
GPR_ASSERT(request.has_client_stats()); |
|
|
|
|
// We need to acquire the lock here in order to prevent the notify_one
|
|
|
|
|
// below from firing before its corresponding wait is executed.
|
|
|
|
|
std::lock_guard<std::mutex> lock(mu_); |
|
|
|
|
client_stats_.num_calls_started += |
|
|
|
|
request.client_stats().num_calls_started(); |
|
|
|
|
client_stats_.num_calls_finished += |
|
|
|
|
request.client_stats().num_calls_finished(); |
|
|
|
|
client_stats_.num_calls_finished_with_client_failed_to_send += |
|
|
|
|
request.client_stats() |
|
|
|
|
.num_calls_finished_with_client_failed_to_send(); |
|
|
|
|
client_stats_.num_calls_finished_known_received += |
|
|
|
|
request.client_stats().num_calls_finished_known_received(); |
|
|
|
|
for (const auto& drop_token_count : |
|
|
|
|
request.client_stats().calls_finished_with_drop()) { |
|
|
|
|
client_stats_ |
|
|
|
|
.drop_token_counts[drop_token_count.load_balance_token()] += |
|
|
|
|
drop_token_count.num_calls(); |
|
|
|
|
} |
|
|
|
|
load_report_ready_ = true; |
|
|
|
|
load_report_cond_.notify_one(); |
|
|
|
|
} |
|
|
|
|
load_report_ready_ = true; |
|
|
|
|
load_report_cond_.notify_one(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
done: |
|
|
|
@ -1365,7 +1372,7 @@ class UpdatesTest : public GrpclbEnd2endTest { |
|
|
|
|
UpdatesTest() : GrpclbEnd2endTest(4, 3, 0) {} |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
TEST_F(UpdatesTest, UpdateBalancers) { |
|
|
|
|
TEST_F(UpdatesTest, UpdateBalancersButKeepUsingOriginalBalancer) { |
|
|
|
|
SetNextResolutionAllBalancers(); |
|
|
|
|
const std::vector<int> first_backend{GetBackendPorts()[0]}; |
|
|
|
|
const std::vector<int> second_backend{GetBackendPorts()[1]}; |
|
|
|
@ -1385,9 +1392,6 @@ TEST_F(UpdatesTest, UpdateBalancers) { |
|
|
|
|
// All 10 requests should have gone to the first backend.
|
|
|
|
|
EXPECT_EQ(10U, backends_[0]->service_.request_count()); |
|
|
|
|
|
|
|
|
|
balancers_[0]->service_.NotifyDoneWithServerlists(); |
|
|
|
|
balancers_[1]->service_.NotifyDoneWithServerlists(); |
|
|
|
|
balancers_[2]->service_.NotifyDoneWithServerlists(); |
|
|
|
|
// Balancer 0 got a single request.
|
|
|
|
|
EXPECT_EQ(1U, balancers_[0]->service_.request_count()); |
|
|
|
|
// and sent a single response.
|
|
|
|
@ -1403,25 +1407,21 @@ TEST_F(UpdatesTest, UpdateBalancers) { |
|
|
|
|
SetNextResolution(addresses); |
|
|
|
|
gpr_log(GPR_INFO, "========= UPDATE 1 DONE =========="); |
|
|
|
|
|
|
|
|
|
// Wait until update has been processed, as signaled by the second backend
|
|
|
|
|
// receiving a request.
|
|
|
|
|
EXPECT_EQ(0U, backends_[1]->service_.request_count()); |
|
|
|
|
WaitForBackend(1); |
|
|
|
|
|
|
|
|
|
backends_[1]->service_.ResetCounters(); |
|
|
|
|
gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH =========="); |
|
|
|
|
CheckRpcSendOk(10); |
|
|
|
|
gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH =========="); |
|
|
|
|
// All 10 requests should have gone to the second backend.
|
|
|
|
|
EXPECT_EQ(10U, backends_[1]->service_.request_count()); |
|
|
|
|
gpr_timespec deadline = gpr_time_add( |
|
|
|
|
gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(10000, GPR_TIMESPAN)); |
|
|
|
|
// Send 10 seconds worth of RPCs
|
|
|
|
|
do { |
|
|
|
|
CheckRpcSendOk(); |
|
|
|
|
} while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0); |
|
|
|
|
// The current LB call is still working, so grpclb continued using it to the
|
|
|
|
|
// first balancer, which doesn't assign the second backend.
|
|
|
|
|
EXPECT_EQ(0U, backends_[1]->service_.request_count()); |
|
|
|
|
|
|
|
|
|
balancers_[0]->service_.NotifyDoneWithServerlists(); |
|
|
|
|
balancers_[1]->service_.NotifyDoneWithServerlists(); |
|
|
|
|
balancers_[2]->service_.NotifyDoneWithServerlists(); |
|
|
|
|
EXPECT_EQ(1U, balancers_[0]->service_.request_count()); |
|
|
|
|
EXPECT_EQ(1U, balancers_[0]->service_.response_count()); |
|
|
|
|
EXPECT_EQ(1U, balancers_[1]->service_.request_count()); |
|
|
|
|
EXPECT_EQ(1U, balancers_[1]->service_.response_count()); |
|
|
|
|
EXPECT_EQ(0U, balancers_[1]->service_.request_count()); |
|
|
|
|
EXPECT_EQ(0U, balancers_[1]->service_.response_count()); |
|
|
|
|
EXPECT_EQ(0U, balancers_[2]->service_.request_count()); |
|
|
|
|
EXPECT_EQ(0U, balancers_[2]->service_.response_count()); |
|
|
|
|
} |
|
|
|
@ -1532,9 +1532,6 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) { |
|
|
|
|
EXPECT_EQ(20U, backends_[0]->service_.request_count()); |
|
|
|
|
EXPECT_EQ(0U, backends_[1]->service_.request_count()); |
|
|
|
|
|
|
|
|
|
balancers_[0]->service_.NotifyDoneWithServerlists(); |
|
|
|
|
balancers_[1]->service_.NotifyDoneWithServerlists(); |
|
|
|
|
balancers_[2]->service_.NotifyDoneWithServerlists(); |
|
|
|
|
// Balancer 0 got a single request.
|
|
|
|
|
EXPECT_EQ(1U, balancers_[0]->service_.request_count()); |
|
|
|
|
// and sent a single response.
|
|
|
|
@ -1564,9 +1561,6 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) { |
|
|
|
|
// All 10 requests should have gone to the second backend.
|
|
|
|
|
EXPECT_EQ(10U, backends_[1]->service_.request_count()); |
|
|
|
|
|
|
|
|
|
balancers_[0]->service_.NotifyDoneWithServerlists(); |
|
|
|
|
balancers_[1]->service_.NotifyDoneWithServerlists(); |
|
|
|
|
balancers_[2]->service_.NotifyDoneWithServerlists(); |
|
|
|
|
EXPECT_EQ(1U, balancers_[0]->service_.request_count()); |
|
|
|
|
EXPECT_EQ(1U, balancers_[0]->service_.response_count()); |
|
|
|
|
// The second balancer, published as part of the first update, may end up
|
|
|
|
|