diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index 0b9837660be..4b8ac9bd94e 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -251,64 +251,6 @@ class Client { return static_cast(gpr_atm_acq_load(&thread_pool_done_)); } - protected: - bool closed_loop_; - gpr_atm thread_pool_done_; - double median_latency_collection_interval_seconds_; // In seconds - - void StartThreads(size_t num_threads) { - gpr_atm_rel_store(&thread_pool_done_, static_cast(false)); - threads_remaining_ = num_threads; - for (size_t i = 0; i < num_threads; i++) { - threads_.emplace_back(new Thread(this, i)); - } - } - - void EndThreads() { - MaybeStartRequests(); - threads_.clear(); - } - - virtual void DestroyMultithreading() = 0; - - void SetupLoadTest(const ClientConfig& config, size_t num_threads) { - // Set up the load distribution based on the number of threads - const auto& load = config.load_params(); - - std::unique_ptr random_dist; - switch (load.load_case()) { - case LoadParams::kClosedLoop: - // Closed-loop doesn't use random dist at all - break; - case LoadParams::kPoisson: - random_dist.reset( - new ExpDist(load.poisson().offered_load() / num_threads)); - break; - default: - GPR_ASSERT(false); - } - - // Set closed_loop_ based on whether or not random_dist is set - if (!random_dist) { - closed_loop_ = true; - } else { - closed_loop_ = false; - // set up interarrival timer according to random dist - interarrival_timer_.init(*random_dist, num_threads); - const auto now = gpr_now(GPR_CLOCK_MONOTONIC); - for (size_t i = 0; i < num_threads; i++) { - next_time_.push_back(gpr_time_add( - now, - gpr_time_from_nanos(interarrival_timer_.next(i), GPR_TIMESPAN))); - } - } - } - - std::function NextIssuer(int thread_idx) { - return closed_loop_ ? std::function() - : std::bind(&Client::NextIssueTime, this, thread_idx); - } - class Thread { public: Thread(Client* client, size_t idx) @@ -387,6 +329,64 @@ class Client { double interval_start_time_; }; + protected: + bool closed_loop_; + gpr_atm thread_pool_done_; + double median_latency_collection_interval_seconds_; // In seconds + + void StartThreads(size_t num_threads) { + gpr_atm_rel_store(&thread_pool_done_, static_cast(false)); + threads_remaining_ = num_threads; + for (size_t i = 0; i < num_threads; i++) { + threads_.emplace_back(new Thread(this, i)); + } + } + + void EndThreads() { + MaybeStartRequests(); + threads_.clear(); + } + + virtual void DestroyMultithreading() = 0; + + void SetupLoadTest(const ClientConfig& config, size_t num_threads) { + // Set up the load distribution based on the number of threads + const auto& load = config.load_params(); + + std::unique_ptr random_dist; + switch (load.load_case()) { + case LoadParams::kClosedLoop: + // Closed-loop doesn't use random dist at all + break; + case LoadParams::kPoisson: + random_dist.reset( + new ExpDist(load.poisson().offered_load() / num_threads)); + break; + default: + GPR_ASSERT(false); + } + + // Set closed_loop_ based on whether or not random_dist is set + if (!random_dist) { + closed_loop_ = true; + } else { + closed_loop_ = false; + // set up interarrival timer according to random dist + interarrival_timer_.init(*random_dist, num_threads); + const auto now = gpr_now(GPR_CLOCK_MONOTONIC); + for (size_t i = 0; i < num_threads; i++) { + next_time_.push_back(gpr_time_add( + now, + gpr_time_from_nanos(interarrival_timer_.next(i), GPR_TIMESPAN))); + } + } + } + + std::function NextIssuer(int thread_idx) { + return closed_loop_ ? std::function() + : std::bind(&Client::NextIssueTime, this, thread_idx); + } + virtual void ThreadFunc(size_t thread_idx, Client::Thread* t) = 0; std::vector> threads_; diff --git a/test/cpp/qps/client_callback.cc b/test/cpp/qps/client_callback.cc index 1880f46d43d..4a06325f2b7 100644 --- a/test/cpp/qps/client_callback.cc +++ b/test/cpp/qps/client_callback.cc @@ -90,7 +90,7 @@ class CallbackClient } } - gpr_timespec NextIssueTime() { + gpr_timespec NextRPCIssueTime() { std::lock_guard l(next_issue_time_mu_); return Client::NextIssueTime(0); } @@ -166,7 +166,7 @@ class CallbackUnaryClient final : public CallbackClient { private: void ScheduleRpc(Thread* t, size_t vector_idx) { if (!closed_loop_) { - gpr_timespec next_issue_time = NextIssueTime(); + gpr_timespec next_issue_time = NextRPCIssueTime(); // Start an alarm callback to run the internal callback after // next_issue_time ctx_[vector_idx]->alarm_.experimental().Set( @@ -221,13 +221,13 @@ class CallbackStreamingClient : public CallbackClient { } ~CallbackStreamingClient() {} - void AddHistogramEntry(double start_, bool ok, void* thread_ptr) { + void AddHistogramEntry(double start_, bool ok, Thread* thread_ptr) { // Update Histogram with data from the callback run HistogramEntry entry; if (ok) { entry.set_value((UsageTimer::Now() - start_) * 1e9); } - ((Client::Thread*)thread_ptr)->UpdateHistogram(&entry); + thread_ptr->UpdateHistogram(&entry); } int messages_per_stream() { return messages_per_stream_; } @@ -297,7 +297,7 @@ class CallbackStreamingPingPongReactor final if (client_->ThreadCompleted()) return; if (!client_->IsClosedLoop()) { - gpr_timespec next_issue_time = client_->NextIssueTime(); + gpr_timespec next_issue_time = client_->NextRPCIssueTime(); // Start an alarm callback to run the internal callback after // next_issue_time ctx_->alarm_.experimental().Set(next_issue_time, @@ -307,13 +307,13 @@ class CallbackStreamingPingPongReactor final } } - void set_thread_ptr(void* ptr) { thread_ptr_ = ptr; } + void set_thread_ptr(Client::Thread* ptr) { thread_ptr_ = ptr; } CallbackStreamingPingPongClient* client_; std::unique_ptr ctx_; - void* thread_ptr_; // Needed to update histogram entries - double start_; // Track message start time - int messages_issued_; // Messages issued by this stream + Client::Thread* thread_ptr_; // Needed to update histogram entries + double start_; // Track message start time + int messages_issued_; // Messages issued by this stream }; class CallbackStreamingPingPongClientImpl final