diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index 2a89eb80182..047bd164082 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -125,13 +125,15 @@ class Client { if (reset) { Histogram* to_merge = new Histogram[threads_.size()]; for (size_t i = 0; i < threads_.size(); i++) { - threads_[i]->Swap(&to_merge[i]); - latencies.Merge(to_merge[i]); + threads_[i]->BeginSwap(&to_merge[i]); } - delete[] to_merge; - std::unique_ptr timer(new UsageTimer); timer_.swap(timer); + for (size_t i = 0; i < threads_.size(); i++) { + threads_[i]->EndSwap(); + latencies.Merge(to_merge[i]); + } + delete[] to_merge; timer_result = timer->Mark(); } else { // merge snapshots of each thread histogram @@ -213,6 +215,7 @@ class Client { public: Thread(Client* client, size_t idx) : done_(false), + new_stats_(nullptr), client_(client), idx_(idx), impl_(&Thread::ThreadFunc, this) {} @@ -225,9 +228,16 @@ class Client { impl_.join(); } - void Swap(Histogram* n) { + void BeginSwap(Histogram* n) { std::lock_guard g(mu_); - n->Swap(&histogram_); + new_stats_ = n; + } + + void EndSwap() { + std::unique_lock g(mu_); + while (new_stats_ != nullptr) { + cv_.wait(g); + }; } void MergeStatsInto(Histogram* hist) { @@ -241,11 +251,10 @@ class Client { void ThreadFunc() { for (;;) { - // lock since the thread should only be doing one thing at a time - std::lock_guard g(mu_); // run the loop body const bool thread_still_ok = client_->ThreadFunc(&histogram_, idx_); - // see if we're done + // lock, see if we're done + std::lock_guard g(mu_); if (!thread_still_ok) { gpr_log(GPR_ERROR, "Finishing client thread due to RPC error"); done_ = true; @@ -253,11 +262,19 @@ class Client { if (done_) { return; } + // check if we're resetting stats, swap out the histogram if so + if (new_stats_) { + new_stats_->Swap(&histogram_); + new_stats_ = nullptr; + cv_.notify_one(); + } } } std::mutex mu_; + std::condition_variable cv_; bool done_; + Histogram* new_stats_; Histogram histogram_; Client* client_; const size_t idx_;