Simplify Mark processing code

pull/5444/head
vjpai 9 years ago
parent 02eda41d21
commit e7042b5ddf
  1. 35
      test/cpp/qps/client.h

@ -123,15 +123,13 @@ class Client {
if (reset) {
Histogram* to_merge = new Histogram[threads_.size()];
for (size_t i = 0; i < threads_.size(); i++) {
threads_[i]->BeginSwap(&to_merge[i]);
}
std::unique_ptr<UsageTimer> timer(new UsageTimer);
timer_.swap(timer);
for (size_t i = 0; i < threads_.size(); i++) {
threads_[i]->EndSwap();
threads_[i]->Swap(&to_merge[i]);
latencies.Merge(to_merge[i]);
}
delete[] to_merge;
std::unique_ptr<UsageTimer> timer(new UsageTimer);
timer_.swap(timer);
timer_result = timer->Mark();
} else {
// merge snapshots of each thread histogram
@ -227,7 +225,6 @@ class Client {
public:
Thread(Client* client, size_t idx)
: done_(false),
new_stats_(nullptr),
client_(client),
idx_(idx),
impl_(&Thread::ThreadFunc, this) {}
@ -240,16 +237,9 @@ class Client {
impl_.join();
}
void BeginSwap(Histogram* n) {
void Swap(Histogram* n) {
std::lock_guard<std::mutex> g(mu_);
new_stats_ = n;
}
void EndSwap() {
std::unique_lock<std::mutex> g(mu_);
while (new_stats_ != nullptr) {
cv_.wait(g);
};
n->Swap(&histogram_);
}
void MergeStatsInto(Histogram* hist) {
@ -263,10 +253,11 @@ class Client {
void ThreadFunc() {
for (;;) {
// lock since the thread should only be doing one thing at a time
std::lock_guard<std::mutex> g(mu_);
// run the loop body
const bool thread_still_ok = client_->ThreadFunc(&histogram_, idx_);
// lock, see if we're done
std::lock_guard<std::mutex> g(mu_);
// see if we're done
if (!thread_still_ok) {
gpr_log(GPR_ERROR, "Finishing client thread due to RPC error");
done_ = true;
@ -274,19 +265,11 @@ class Client {
if (done_) {
return;
}
// check if we're resetting stats, swap out the histogram if so
if (new_stats_) {
new_stats_->Swap(&histogram_);
new_stats_ = nullptr;
cv_.notify_one();
}
}
}
std::mutex mu_;
std::condition_variable cv_;
bool done_;
Histogram* new_stats_;
Histogram histogram_;
Client* client_;
const size_t idx_;

Loading…
Cancel
Save