diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index 2dc83f0f292..6a822e35c78 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -123,15 +123,13 @@ class Client { if (reset) { Histogram* to_merge = new Histogram[threads_.size()]; for (size_t i = 0; i < threads_.size(); i++) { - threads_[i]->BeginSwap(&to_merge[i]); - } - std::unique_ptr timer(new UsageTimer); - timer_.swap(timer); - for (size_t i = 0; i < threads_.size(); i++) { - threads_[i]->EndSwap(); + threads_[i]->Swap(&to_merge[i]); latencies.Merge(to_merge[i]); } delete[] to_merge; + + std::unique_ptr timer(new UsageTimer); + timer_.swap(timer); timer_result = timer->Mark(); } else { // merge snapshots of each thread histogram @@ -227,7 +225,6 @@ class Client { public: Thread(Client* client, size_t idx) : done_(false), - new_stats_(nullptr), client_(client), idx_(idx), impl_(&Thread::ThreadFunc, this) {} @@ -240,16 +237,9 @@ class Client { impl_.join(); } - void BeginSwap(Histogram* n) { + void Swap(Histogram* n) { std::lock_guard g(mu_); - new_stats_ = n; - } - - void EndSwap() { - std::unique_lock g(mu_); - while (new_stats_ != nullptr) { - cv_.wait(g); - }; + n->Swap(&histogram_); } void MergeStatsInto(Histogram* hist) { @@ -263,10 +253,11 @@ class Client { void ThreadFunc() { for (;;) { + // lock since the thread should only be doing one thing at a time + std::lock_guard g(mu_); // run the loop body const bool thread_still_ok = client_->ThreadFunc(&histogram_, idx_); - // lock, see if we're done - std::lock_guard g(mu_); + // see if we're done if (!thread_still_ok) { gpr_log(GPR_ERROR, "Finishing client thread due to RPC error"); done_ = true; @@ -274,19 +265,11 @@ class Client { if (done_) { return; } - // check if we're resetting stats, swap out the histogram if so - if (new_stats_) { - new_stats_->Swap(&histogram_); - new_stats_ = nullptr; - cv_.notify_one(); - } } } std::mutex mu_; - std::condition_variable cv_; bool done_; - Histogram* new_stats_; Histogram histogram_; Client* client_; const size_t idx_;