diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index 2dc5b3860f3..45481a39180 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -35,6 +35,7 @@ #define TEST_QPS_CLIENT_H #include "test/cpp/qps/histogram.h" +#include "test/cpp/qps/interarrival.h" #include "test/cpp/qps/timer.h" #include "test/cpp/qps/qpstest.grpc.pb.h" @@ -46,7 +47,8 @@ namespace testing { class Client { public: - explicit Client(const ClientConfig& config) : timer_(new Timer) { + explicit Client(const ClientConfig& config) : timer_(new Timer), + interarrival_timer_() { for (int i = 0; i < config.client_channels(); i++) { channels_.push_back(ClientChannelInfo( config.server_targets(i % config.server_targets_size()), config)); @@ -105,7 +107,60 @@ class Client { void EndThreads() { threads_.clear(); } virtual bool ThreadFunc(Histogram* histogram, size_t thread_idx) = 0; + + void SetupLoadTest(const ClientConfig& config, size_t num_threads) { + // Set up the load distribution based on the number of threads + if (config.load_type() == CLOSED_LOOP) { + closed_loop_ = true; + } + else { + closed_loop_ = false; + + std::unique_ptr random_dist; + auto& load = config.load_params(); + switch (config.load_type()) { + case POISSON: + random_dist.reset + (new ExpDist(load.poisson().offered_load()/num_threads)); + break; + case UNIFORM: + random_dist.reset + (new UniformDist(load.uniform().interarrival_lo()*num_threads, + load.uniform().interarrival_hi()*num_threads)); + break; + case DETERMINISTIC: + random_dist.reset + (new DetDist(num_threads/load.determ().offered_load())); + break; + case PARETO: + random_dist.reset + (new ParetoDist(load.pareto().interarrival_base()*num_threads, + load.pareto().alpha())); + break; + default: + GPR_ASSERT(false); + break; + } + interarrival_timer_.init(*random_dist, num_threads); + for (size_t i = 0; i + bool NextIssueTime(int thread_idx, Timepoint *time_delay) { + if (closed_loop_) { + return false; + } + else { + *time_delay = next_time_[thread_idx]; + next_time_[thread_idx] += interarrival_timer_(thread_idx); + return true; + } + } + private: class Thread { public: @@ -166,6 +221,11 @@ class Client { std::vector> threads_; std::unique_ptr timer_; + + bool closed_loop_; + InterarrivalTimer interarrival_timer_; + std::vector> next_time_; }; std::unique_ptr diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index 609d0034603..8573ddb542c 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -66,70 +66,25 @@ namespace testing { class SynchronousClient : public Client { public: - SynchronousClient(const ClientConfig& config) : Client(config), - interarrival_timer_() { + SynchronousClient(const ClientConfig& config) : Client(config) { num_threads_ = config.outstanding_rpcs_per_channel() * config.client_channels(); responses_.resize(num_threads_); - - // Now sort out the load test type - if (config.load_type() == CLOSED_LOOP) { - closed_loop_ = true; - } - else { - closed_loop_ = false; - - std::unique_ptr random_dist; - auto& load = config.load_params(); - switch (config.load_type()) { - case POISSON: - random_dist.reset - (new ExpDist(load.poisson().offered_load()/num_threads_)); - break; - case UNIFORM: - random_dist.reset - (new UniformDist(load.uniform().interarrival_lo()*num_threads_, - load.uniform().interarrival_hi()*num_threads_)); - break; - case DETERMINISTIC: - random_dist.reset - (new DetDist(num_threads_/load.determ().offered_load())); - break; - case PARETO: - random_dist.reset - (new ParetoDist(load.pareto().interarrival_base()*num_threads_, - load.pareto().alpha())); - break; - default: - GPR_ASSERT(false); - break; - } - - interarrival_timer_.init(*random_dist, num_threads_); - for (size_t i = 0; i next_time; + if (NextIssueTime(thread_idx, &next_time)) { + std::this_thread::sleep_until(next_time); } } size_t num_threads_; std::vector responses_; - private: - bool closed_loop_; - InterarrivalTimer interarrival_timer_; - std::vector> next_time_; }; class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient {