From bac25901d7125f30092fd63aba95ca9675654875 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Wed, 1 Apr 2020 08:28:46 -0700 Subject: [PATCH] Fix flakiness in grpclb SingleBalancerWithClientLoadReportingTest.Vanilla test. --- test/cpp/end2end/grpclb_end2end_test.cc | 56 +++++++++++++++---------- 1 file changed, 34 insertions(+), 22 deletions(-) diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc index 79969f9c5e3..5e42c5ee017 100644 --- a/test/cpp/end2end/grpclb_end2end_test.cc +++ b/test/cpp/end2end/grpclb_end2end_test.cc @@ -16,6 +16,7 @@ * */ +#include <deque> #include #include #include @@ -253,30 +254,31 @@ class BalancerServiceImpl : public BalancerService { if (client_load_reporting_interval_seconds_ > 0) { request.Clear(); - if (stream->Read(&request)) { + while (stream->Read(&request)) { gpr_log(GPR_INFO, "LB[%p]: received client load report message '%s'", this, request.DebugString().c_str()); GPR_ASSERT(request.has_client_stats()); - // We need to acquire the lock here in order to prevent the notify_one - // below from firing before its corresponding wait is executed. 
- grpc::internal::MutexLock lock(&mu_); - client_stats_.num_calls_started += + ClientStats load_report; + load_report.num_calls_started = request.client_stats().num_calls_started(); - client_stats_.num_calls_finished += + load_report.num_calls_finished = request.client_stats().num_calls_finished(); - client_stats_.num_calls_finished_with_client_failed_to_send += + load_report.num_calls_finished_with_client_failed_to_send = request.client_stats() .num_calls_finished_with_client_failed_to_send(); - client_stats_.num_calls_finished_known_received += + load_report.num_calls_finished_known_received = request.client_stats().num_calls_finished_known_received(); for (const auto& drop_token_count : request.client_stats().calls_finished_with_drop()) { - client_stats_ - .drop_token_counts[drop_token_count.load_balance_token()] += + load_report + .drop_token_counts[drop_token_count.load_balance_token()] = drop_token_count.num_calls(); } - load_report_ready_ = true; - load_report_cond_.Signal(); + // We need to acquire the lock here in order to prevent the notify_one + // below from firing before its corresponding wait is executed. 
+ grpc::internal::MutexLock lock(&mu_); + load_report_queue_.emplace_back(std::move(load_report)); + if (load_report_cond_ != nullptr) load_report_cond_->Signal(); } } } @@ -293,9 +295,8 @@ class BalancerServiceImpl : public BalancerService { void Start() { grpc::internal::MutexLock lock(&mu_); serverlist_done_ = false; - load_report_ready_ = false; responses_and_delays_.clear(); - client_stats_.Reset(); + load_report_queue_.clear(); } void Shutdown() { @@ -327,11 +328,18 @@ class BalancerServiceImpl : public BalancerService { return response; } - const ClientStats& WaitForLoadReport() { + ClientStats WaitForLoadReport() { grpc::internal::MutexLock lock(&mu_); - load_report_cond_.WaitUntil(&mu_, [this] { return load_report_ready_; }); - load_report_ready_ = false; - return client_stats_; + grpc::internal::CondVar cv; + if (load_report_queue_.empty()) { + load_report_cond_ = &cv; + load_report_cond_->WaitUntil( + &mu_, [this] { return !load_report_queue_.empty(); }); + load_report_cond_ = nullptr; + } + ClientStats load_report = std::move(load_report_queue_.front()); + load_report_queue_.pop_front(); + return load_report; } void NotifyDoneWithServerlists() { @@ -357,12 +365,12 @@ class BalancerServiceImpl : public BalancerService { const int client_load_reporting_interval_seconds_; std::vector responses_and_delays_; + grpc::internal::Mutex mu_; - grpc::internal::CondVar load_report_cond_; - bool load_report_ready_ = false; grpc::internal::CondVar serverlist_cond_; bool serverlist_done_ = false; - ClientStats client_stats_; + grpc::internal::CondVar* load_report_cond_ = nullptr; + std::deque<ClientStats> load_report_queue_; }; class GrpclbEnd2endTest : public ::testing::Test { @@ -1900,7 +1908,11 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) { // and sent a single response. 
EXPECT_EQ(1U, balancers_[0]->service_.response_count()); - const ClientStats client_stats = WaitForLoadReports(); + ClientStats client_stats; + do { + client_stats += WaitForLoadReports(); + } while (client_stats.num_calls_finished != + kNumRpcsPerAddress * num_backends_ + num_ok); EXPECT_EQ(kNumRpcsPerAddress * num_backends_ + num_ok, client_stats.num_calls_started); EXPECT_EQ(kNumRpcsPerAddress * num_backends_ + num_ok,