Stop holding histogram for a long time

Branch: pull/7285/head
Author: Vijay Pai (9 years ago)
Parent: c28a6c1b3b
Commit: f373f2cf8b
Files changed:
  test/cpp/qps/client.h        (35 changed lines)
  test/cpp/qps/client_async.cc (19 changed lines)
  test/cpp/qps/client_sync.cc  (10 changed lines)
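
Previously, each worker thread handed its long-lived Histogram directly into ThreadFunc, and the stats-reset path had to rendezvous with the worker through new_stats_ and a condition variable, so the histogram could effectively be held across an entire RPC. After this change, each loop iteration records at most one latency sample into a small stack-allocated HistogramEntry, and the shared histogram is touched only briefly, under the thread's mutex, for a single Add. A minimal sketch of the resulting pattern (the names below are stand-ins, not the actual qps types):

    #include <mutex>
    #include <vector>

    // Stand-ins for the qps Histogram/HistogramEntry types.
    struct Entry {
      bool used = false;
      double value = 0.0;
    };

    class PerThreadStats {
     public:
      // Fold one iteration's result in; the lock is held only for the append.
      void Fold(const Entry& e) {
        if (!e.used) return;  // some iterations produce no sample
        std::lock_guard<std::mutex> g(mu_);
        samples_.push_back(e.value);
      }

     private:
      std::mutex mu_;
      std::vector<double> samples_;
    };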

diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -112,6 +112,17 @@ class ClientRequestCreator<ByteBuffer> {
   }
 };

+class HistogramEntry GRPC_FINAL {
+ public:
+  HistogramEntry() : used_(false) {}
+  bool used() const { return used_; }
+  double value() const { return value_; }
+  void set_value(double v) { used_ = true; value_ = v; }
+
+ private:
+  bool used_;
+  double value_;
+};
+
 class Client {
  public:
   Client() : timer_(new UsageTimer), interarrival_timer_() {}
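
HistogramEntry is a plain value type carrying at most one sample: used_ starts out false and flips only when set_value is called, which is what later lets the reporting loop tell "this iteration measured nothing" apart from a real sample. A standalone sketch of that contract (mirroring the class above, with GRPC_FINAL dropped so it compiles on its own):

    #include <cassert>

    class HistogramEntry {
     public:
      HistogramEntry() : used_(false) {}
      bool used() const { return used_; }
      double value() const { return value_; }
      void set_value(double v) { used_ = true; value_ = v; }

     private:
      bool used_;
      double value_;
    };

    int main() {
      HistogramEntry e;
      assert(!e.used());      // fresh entry: no sample yet
      e.set_value(250000.0);  // one latency sample (nanoseconds in the qps code)
      assert(e.used() && e.value() == 250000.0);
      return 0;
    }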
@@ -162,7 +173,7 @@ class Client {
   void EndThreads() { threads_.clear(); }

-  virtual bool ThreadFunc(Histogram* histogram, size_t thread_idx) = 0;
+  virtual bool ThreadFunc(HistogramEntry* histogram, size_t thread_idx) = 0;

   void SetupLoadTest(const ClientConfig& config, size_t num_threads) {
     // Set up the load distribution based on the number of threads
@@ -215,7 +226,6 @@ class Client {
    public:
     Thread(Client* client, size_t idx)
         : done_(false),
-          new_stats_(nullptr),
           client_(client),
           idx_(idx),
           impl_(&Thread::ThreadFunc, this) {}
@@ -230,14 +240,10 @@ class Client {
     void BeginSwap(Histogram* n) {
       std::lock_guard<std::mutex> g(mu_);
-      new_stats_ = n;
+      n->Swap(&histogram_);
     }

     void EndSwap() {
-      std::unique_lock<std::mutex> g(mu_);
-      while (new_stats_ != nullptr) {
-        cv_.wait(g);
-      };
     }

     void MergeStatsInto(Histogram* hist) {
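
With new_stats_ gone, BeginSwap does the exchange on the spot: the caller passes in an empty histogram and, as soon as the worker's mutex is free, walks away with everything accumulated so far, while the worker keeps collecting into the now-empty one. EndSwap is left with nothing to wait for, which is exactly the removed rendezvous where the reporter used to block on cv_ until the worker's loop noticed new_stats_. A sketch of the swap-based handoff, with std::vector standing in for the qps Histogram:

    #include <mutex>
    #include <vector>

    class ThreadStats {
     public:
      void Add(double v) {
        std::lock_guard<std::mutex> g(mu_);
        samples_.push_back(v);
      }

      // The reporter hands in an empty container and leaves with the samples;
      // there is no waiting for the worker to reach a particular point.
      void BeginSwap(std::vector<double>* n) {
        std::lock_guard<std::mutex> g(mu_);
        n->swap(samples_);
      }

     private:
      std::mutex mu_;
      std::vector<double> samples_;
    };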
@@ -252,9 +258,13 @@ class Client {
     void ThreadFunc() {
       for (;;) {
         // run the loop body
-        const bool thread_still_ok = client_->ThreadFunc(&histogram_, idx_);
-        // lock, see if we're done
+        HistogramEntry entry;
+        const bool thread_still_ok = client_->ThreadFunc(&entry, idx_);
+        // lock, update histogram if needed and see if we're done
         std::lock_guard<std::mutex> g(mu_);
+        if (entry.used()) {
+          histogram_.Add(entry.value());
+        }
         if (!thread_still_ok) {
           gpr_log(GPR_ERROR, "Finishing client thread due to RPC error");
           done_ = true;
@@ -262,17 +272,10 @@ class Client {
         if (done_) {
           return;
         }
-        // check if we're resetting stats, swap out the histogram if so
-        if (new_stats_) {
-          new_stats_->Swap(&histogram_);
-          new_stats_ = nullptr;
-          cv_.notify_one();
-        }
       }
     }

     std::mutex mu_;
-    std::condition_variable cv_;
     bool done_;
-    Histogram* new_stats_;
     Histogram histogram_;
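
Put together, the worker loop now runs each RPC with no lock held and takes the mutex only for the single Add plus the done_ check. A condensed, self-contained sketch of that loop shape (DoOneRpc is a hypothetical stand-in for client_->ThreadFunc, given a fixed workload so the sketch terminates):

    #include <mutex>
    #include <vector>

    struct HistogramEntry {
      bool used = false;
      double value = 0.0;
      void set_value(double v) { used = true; value = v; }
    };

    struct Histogram {
      std::vector<double> samples;
      void Add(double v) { samples.push_back(v); }
    };

    std::mutex mu;
    Histogram histogram;
    bool done = false;

    bool DoOneRpc(HistogramEntry* e) {  // stand-in for client_->ThreadFunc
      static int remaining = 100;       // pretend workload
      e->set_value(250000.0);           // pretend latency, in nanoseconds
      return --remaining > 0;
    }

    int main() {
      for (;;) {
        HistogramEntry entry;              // stack-local, lives one iteration
        const bool ok = DoOneRpc(&entry);  // no lock held during the "RPC"
        std::lock_guard<std::mutex> g(mu);
        if (entry.used) histogram.Add(entry.value);  // brief critical section
        if (!ok || done) return 0;
      }
    }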

diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -48,7 +48,6 @@
 #include <grpc++/generic/generic_stub.h>
 #include <grpc/grpc.h>
 #include <grpc/support/cpu.h>
-#include <grpc/support/histogram.h>
 #include <grpc/support/log.h>

 #include "src/proto/grpc/testing/services.grpc.pb.h"
@@ -64,7 +63,7 @@ class ClientRpcContext {
   ClientRpcContext() {}
   virtual ~ClientRpcContext() {}
   // next state, return false if done. Collect stats when appropriate
-  virtual bool RunNextState(bool, Histogram* hist) = 0;
+  virtual bool RunNextState(bool, HistogramEntry* entry) = 0;
   virtual ClientRpcContext* StartNewClone() = 0;
   static void* tag(ClientRpcContext* c) { return reinterpret_cast<void*>(c); }
   static ClientRpcContext* detag(void* t) {
@@ -104,7 +103,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
       alarm_.reset(new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this)));
     }
   }
-  bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
+  bool RunNextState(bool ok, HistogramEntry* entry) GRPC_OVERRIDE {
     switch (next_state_) {
       case State::READY:
         start_ = UsageTimer::Now();
@@ -114,7 +113,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
         next_state_ = State::RESP_DONE;
         return true;
       case State::RESP_DONE:
-        hist->Add((UsageTimer::Now() - start_) * 1e9);
+        entry->set_value((UsageTimer::Now() - start_) * 1e9);
         callback_(status_, &response_);
         next_state_ = State::INVALID;
         return false;
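
The latency math itself is unchanged: start_ is stamped at State::READY when the call goes out, and at State::RESP_DONE the elapsed time is recorded exactly once. Assuming UsageTimer::Now() returns seconds as a double (consistent with the * 1e9 scaling used throughout), the recorded value is in nanoseconds:

    #include <cstdio>

    int main() {
      double start = 12.000000;  // hypothetical Now() at State::READY
      double end = 12.000250;    // hypothetical Now() at State::RESP_DONE
      double latency_ns = (end - start) * 1e9;
      std::printf("%.0f ns\n", latency_ns);  // 250000 ns == 250 us
      return 0;
    }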
@@ -201,7 +200,7 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
     }
   }

-  bool ThreadFunc(Histogram* histogram,
+  bool ThreadFunc(HistogramEntry* entry,
                   size_t thread_idx) GRPC_OVERRIDE GRPC_FINAL {
     void* got_tag;
     bool ok;
@@ -209,7 +208,7 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
     if (cli_cqs_[thread_idx]->Next(&got_tag, &ok)) {
       // Got a regular event, so process it
       ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
-      if (!ctx->RunNextState(ok, histogram)) {
+      if (!ctx->RunNextState(ok, entry)) {
         // The RPC and callback are done, so clone the ctx
         // and kickstart the new one
         auto clone = ctx->StartNewClone();
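
This is where HistogramEntry's used_ flag earns its keep: each completion-queue event advances one context's state machine, but only the transition that finishes an RPC records a sample. Events for intermediate states leave the entry untouched, and the loop in client.h then skips the histogram update. A reduced sketch of that shape (states and values are stand-ins, not the real context class):

    struct HistogramEntry {
      bool used = false;
      double value = 0.0;
      void set_value(double v) { used = true; value = v; }
    };

    enum class State { READY, RESP_DONE };

    // Returns false once the RPC is finished, mirroring RunNextState above.
    bool RunNextState(State* s, HistogramEntry* entry) {
      switch (*s) {
        case State::READY:
          *s = State::RESP_DONE;       // request issued; nothing measured yet,
          return true;                 // so the entry stays unused this time
        case State::RESP_DONE:
          entry->set_value(250000.0);  // pretend latency, in nanoseconds
          return false;                // done; caller clones a fresh context
      }
      return false;
    }

    int main() {
      State s = State::READY;
      HistogramEntry e;
      while (RunNextState(&s, &e)) {
      }                     // first event records nothing, second records once
      return e.used ? 0 : 1;
    }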
@@ -298,7 +297,7 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
     stream_ = start_req_(stub_, &context_, cq, ClientRpcContext::tag(this));
     next_state_ = State::STREAM_IDLE;
   }
-  bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
+  bool RunNextState(bool ok, HistogramEntry* entry) GRPC_OVERRIDE {
     while (true) {
       switch (next_state_) {
         case State::STREAM_IDLE:
@@ -330,7 +329,7 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
           return true;
           break;
         case State::READ_DONE:
-          hist->Add((UsageTimer::Now() - start_) * 1e9);
+          entry->set_value((UsageTimer::Now() - start_) * 1e9);
           callback_(status_, &response_);
           next_state_ = State::STREAM_IDLE;
           break;  // loop around
@@ -430,7 +429,7 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
                            ClientRpcContext::tag(this));
     next_state_ = State::STREAM_IDLE;
   }
-  bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
+  bool RunNextState(bool ok, HistogramEntry* entry) GRPC_OVERRIDE {
     while (true) {
       switch (next_state_) {
         case State::STREAM_IDLE:
@@ -462,7 +461,7 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
           return true;
           break;
         case State::READ_DONE:
-          hist->Add((UsageTimer::Now() - start_) * 1e9);
+          entry->set_value((UsageTimer::Now() - start_) * 1e9);
           callback_(status_, &response_);
           next_state_ = State::STREAM_IDLE;
           break;  // loop around

diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc
--- a/test/cpp/qps/client_sync.cc
+++ b/test/cpp/qps/client_sync.cc
@@ -46,7 +46,6 @@
 #include <grpc++/server_builder.h>
 #include <grpc/grpc.h>
 #include <grpc/support/alloc.h>
-#include <grpc/support/histogram.h>
 #include <grpc/support/host_port.h>
 #include <grpc/support/log.h>
 #include <grpc/support/time.h>
@@ -55,7 +54,6 @@
 #include "src/core/lib/profiling/timers.h"
 #include "src/proto/grpc/testing/services.grpc.pb.h"
 #include "test/cpp/qps/client.h"
-#include "test/cpp/qps/histogram.h"
 #include "test/cpp/qps/interarrival.h"
 #include "test/cpp/qps/usage_timer.h"
@@ -100,7 +98,7 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient {
   }
   ~SynchronousUnaryClient() { EndThreads(); }

-  bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE {
+  bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) GRPC_OVERRIDE {
     WaitToIssue(thread_idx);
     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
     double start = UsageTimer::Now();
@@ -108,7 +106,7 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient {
     grpc::ClientContext context;
     grpc::Status s =
         stub->UnaryCall(&context, request_, &responses_[thread_idx]);
-    histogram->Add((UsageTimer::Now() - start) * 1e9);
+    entry->set_value((UsageTimer::Now() - start) * 1e9);
     return s.ok();
   }
 };
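
The synchronous client is the simplest case: time one blocking call, hand the single sample back through the out-parameter, and let the return value (s.ok() in the real code) tell the loop in client.h whether the thread should keep running. A self-contained sketch, with a sleep standing in for the RPC and a chrono-based stand-in for UsageTimer::Now():

    #include <chrono>
    #include <thread>

    struct HistogramEntry {
      bool used = false;
      double value = 0.0;
      void set_value(double v) { used = true; value = v; }
    };

    static double NowSeconds() {  // stand-in for UsageTimer::Now()
      using namespace std::chrono;
      return duration<double>(steady_clock::now().time_since_epoch()).count();
    }

    bool TimedCall(HistogramEntry* entry) {
      double start = NowSeconds();
      std::this_thread::sleep_for(std::chrono::microseconds(250));  // the "RPC"
      entry->set_value((NowSeconds() - start) * 1e9);  // seconds -> nanoseconds
      return true;  // would be s.ok() for a real RPC
    }

    int main() {
      HistogramEntry e;
      return TimedCall(&e) && e.used ? 0 : 1;
    }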
@@ -139,13 +137,13 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient {
     delete[] context_;
   }

-  bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE {
+  bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) GRPC_OVERRIDE {
     WaitToIssue(thread_idx);
     GPR_TIMER_SCOPE("SynchronousStreamingClient::ThreadFunc", 0);
     double start = UsageTimer::Now();
     if (stream_[thread_idx]->Write(request_) &&
         stream_[thread_idx]->Read(&responses_[thread_idx])) {
-      histogram->Add((UsageTimer::Now() - start) * 1e9);
+      entry->set_value((UsageTimer::Now() - start) * 1e9);
       return true;
     }
     return false;
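
The streaming client records one sample per Write/Read round trip; if either half fails it records nothing, so the entry comes back unused, and the false return is what the owning thread in client.h logs as an RPC error before winding the thread down. The same round trip in sketch form, with a trivial in-memory stream standing in for the gRPC reader/writer:

    #include <chrono>
    #include <string>

    struct HistogramEntry {
      bool used = false;
      double value = 0.0;
      void set_value(double v) { used = true; value = v; }
    };

    struct FakeStream {  // trivial stand-in for the gRPC stream
      bool Write(const std::string&) { return true; }
      bool Read(std::string* out) { *out = "pong"; return true; }
    };

    bool OneRoundTrip(FakeStream* stream, HistogramEntry* entry) {
      using clock = std::chrono::steady_clock;
      auto start = clock::now();
      std::string response;
      if (stream->Write("ping") && stream->Read(&response)) {
        std::chrono::duration<double, std::nano> dt = clock::now() - start;
        entry->set_value(dt.count());  // one sample per round trip
        return true;
      }
      return false;  // entry stays unused; the caller stops the thread
    }

    int main() {
      FakeStream stream;
      HistogramEntry e;
      return OneRoundTrip(&stream, &e) && e.used ? 0 : 1;
    }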
