Addressed PR comments. Made Client::Thread public and removed the use of a void pointer to refer to it. Avoided overloading NextIssueTime by renaming it to NextRPCIssueTime.
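In outline, the diff below applies the pattern sketched here: the reactor's opaque thread handle becomes a typed Client::Thread* (possible now that Client::Thread is public), and the callback client's wrapper around the issue-time helper is renamed so it no longer collides with Client::NextIssueTime. This is only a condensed sketch of the hunks that follow, not additional code:

// Before: the reactor stored the worker thread as an untyped pointer and
// cast it back whenever a latency sample was recorded.
void* thread_ptr_;
((Client::Thread*)thread_ptr_)->UpdateHistogram(&entry);

// After: Client::Thread is public, so the field is typed and no cast is needed.
Client::Thread* thread_ptr_;
thread_ptr_->UpdateHistogram(&entry);

// The callback client's helper is renamed so it no longer overloads the
// base-class method it delegates to:
gpr_timespec NextRPCIssueTime() {
  std::lock_guard<std::mutex> l(next_issue_time_mu_);
  return Client::NextIssueTime(0);
}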

pull/17461/head
Moiz Haidry 6 years ago
parent 5ec78a286d
commit 7bb853ebdd
  1. test/cpp/qps/client.h (116 changed lines)
  2. test/cpp/qps/client_callback.cc (14 changed lines)

@@ -251,64 +251,6 @@ class Client {
return static_cast<bool>(gpr_atm_acq_load(&thread_pool_done_));
}
protected:
bool closed_loop_;
gpr_atm thread_pool_done_;
double median_latency_collection_interval_seconds_; // In seconds
void StartThreads(size_t num_threads) {
gpr_atm_rel_store(&thread_pool_done_, static_cast<gpr_atm>(false));
threads_remaining_ = num_threads;
for (size_t i = 0; i < num_threads; i++) {
threads_.emplace_back(new Thread(this, i));
}
}
void EndThreads() {
MaybeStartRequests();
threads_.clear();
}
virtual void DestroyMultithreading() = 0;
void SetupLoadTest(const ClientConfig& config, size_t num_threads) {
// Set up the load distribution based on the number of threads
const auto& load = config.load_params();
std::unique_ptr<RandomDistInterface> random_dist;
switch (load.load_case()) {
case LoadParams::kClosedLoop:
// Closed-loop doesn't use random dist at all
break;
case LoadParams::kPoisson:
random_dist.reset(
new ExpDist(load.poisson().offered_load() / num_threads));
break;
default:
GPR_ASSERT(false);
}
// Set closed_loop_ based on whether or not random_dist is set
if (!random_dist) {
closed_loop_ = true;
} else {
closed_loop_ = false;
// set up interarrival timer according to random dist
interarrival_timer_.init(*random_dist, num_threads);
const auto now = gpr_now(GPR_CLOCK_MONOTONIC);
for (size_t i = 0; i < num_threads; i++) {
next_time_.push_back(gpr_time_add(
now,
gpr_time_from_nanos(interarrival_timer_.next(i), GPR_TIMESPAN)));
}
}
}
std::function<gpr_timespec()> NextIssuer(int thread_idx) {
return closed_loop_ ? std::function<gpr_timespec()>()
: std::bind(&Client::NextIssueTime, this, thread_idx);
}
class Thread {
public:
Thread(Client* client, size_t idx)
@@ -387,6 +329,64 @@ class Client {
double interval_start_time_;
};
protected:
bool closed_loop_;
gpr_atm thread_pool_done_;
double median_latency_collection_interval_seconds_; // In seconds
void StartThreads(size_t num_threads) {
gpr_atm_rel_store(&thread_pool_done_, static_cast<gpr_atm>(false));
threads_remaining_ = num_threads;
for (size_t i = 0; i < num_threads; i++) {
threads_.emplace_back(new Thread(this, i));
}
}
void EndThreads() {
MaybeStartRequests();
threads_.clear();
}
virtual void DestroyMultithreading() = 0;
void SetupLoadTest(const ClientConfig& config, size_t num_threads) {
// Set up the load distribution based on the number of threads
const auto& load = config.load_params();
std::unique_ptr<RandomDistInterface> random_dist;
switch (load.load_case()) {
case LoadParams::kClosedLoop:
// Closed-loop doesn't use random dist at all
break;
case LoadParams::kPoisson:
random_dist.reset(
new ExpDist(load.poisson().offered_load() / num_threads));
break;
default:
GPR_ASSERT(false);
}
// Set closed_loop_ based on whether or not random_dist is set
if (!random_dist) {
closed_loop_ = true;
} else {
closed_loop_ = false;
// set up interarrival timer according to random dist
interarrival_timer_.init(*random_dist, num_threads);
const auto now = gpr_now(GPR_CLOCK_MONOTONIC);
for (size_t i = 0; i < num_threads; i++) {
next_time_.push_back(gpr_time_add(
now,
gpr_time_from_nanos(interarrival_timer_.next(i), GPR_TIMESPAN)));
}
}
}
std::function<gpr_timespec()> NextIssuer(int thread_idx) {
return closed_loop_ ? std::function<gpr_timespec()>()
: std::bind(&Client::NextIssueTime, this, thread_idx);
}
virtual void ThreadFunc(size_t thread_idx, Client::Thread* t) = 0;
std::vector<std::unique_ptr<Thread>> threads_;
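For context, the SetupLoadTest/NextIssuer code above is what the renamed NextRPCIssueTime ultimately builds on: in Poisson (open-loop) mode each thread gets a callable that yields its next issue time, while closed-loop mode gets an empty std::function. A minimal sketch of how a per-thread loop might consume that callable follows; RunWorkerLoop and IssueOneRpc are illustrative names and not part of this change:

// Hypothetical per-thread loop consuming the callable returned by
// NextIssuer(thread_idx); an empty callable means closed-loop operation.
void RunWorkerLoop(std::function<gpr_timespec()> next_issue) {
  while (!ThreadCompleted()) {  // ThreadCompleted() is the existing stop check on the client
    if (next_issue) {
      // Open loop: block until the precomputed issue time so the offered load
      // follows the configured Poisson interarrival distribution.
      gpr_sleep_until(next_issue());
    }
    IssueOneRpc();  // illustrative placeholder: fire one RPC and record its latency
  }
}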

@@ -90,7 +90,7 @@ class CallbackClient
}
}
gpr_timespec NextIssueTime() {
gpr_timespec NextRPCIssueTime() {
std::lock_guard<std::mutex> l(next_issue_time_mu_);
return Client::NextIssueTime(0);
}
@@ -166,7 +166,7 @@ class CallbackUnaryClient final : public CallbackClient {
private:
void ScheduleRpc(Thread* t, size_t vector_idx) {
if (!closed_loop_) {
gpr_timespec next_issue_time = NextIssueTime();
gpr_timespec next_issue_time = NextRPCIssueTime();
// Start an alarm callback to run the internal callback after
// next_issue_time
ctx_[vector_idx]->alarm_.experimental().Set(
@@ -221,13 +221,13 @@ class CallbackStreamingClient : public CallbackClient {
}
~CallbackStreamingClient() {}
void AddHistogramEntry(double start_, bool ok, void* thread_ptr) {
void AddHistogramEntry(double start_, bool ok, Thread* thread_ptr) {
// Update Histogram with data from the callback run
HistogramEntry entry;
if (ok) {
entry.set_value((UsageTimer::Now() - start_) * 1e9);
}
((Client::Thread*)thread_ptr)->UpdateHistogram(&entry);
thread_ptr->UpdateHistogram(&entry);
}
int messages_per_stream() { return messages_per_stream_; }
@@ -297,7 +297,7 @@ class CallbackStreamingPingPongReactor final
if (client_->ThreadCompleted()) return;
if (!client_->IsClosedLoop()) {
gpr_timespec next_issue_time = client_->NextIssueTime();
gpr_timespec next_issue_time = client_->NextRPCIssueTime();
// Start an alarm callback to run the internal callback after
// next_issue_time
ctx_->alarm_.experimental().Set(next_issue_time,
@@ -307,11 +307,11 @@ class CallbackStreamingPingPongReactor final
}
}
void set_thread_ptr(void* ptr) { thread_ptr_ = ptr; }
void set_thread_ptr(Client::Thread* ptr) { thread_ptr_ = ptr; }
CallbackStreamingPingPongClient* client_;
std::unique_ptr<CallbackClientRpcContext> ctx_;
void* thread_ptr_; // Needed to update histogram entries
Client::Thread* thread_ptr_; // Needed to update histogram entries
double start_; // Track message start time
int messages_issued_; // Messages issued by this stream
};
