From 5ec78a286d7be61aec929b133c031a7a1af262df Mon Sep 17 00:00:00 2001 From: Moiz Haidry Date: Fri, 14 Dec 2018 10:36:51 -0800 Subject: [PATCH] Added support for fixed load benchmarks, all the rpcs access one requestor to the get the next issue time for the RPC --- test/cpp/qps/client_callback.cc | 35 ++++++++++++++++++++------------- 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/test/cpp/qps/client_callback.cc b/test/cpp/qps/client_callback.cc index 00d5853a8e8..1880f46d43d 100644 --- a/test/cpp/qps/client_callback.cc +++ b/test/cpp/qps/client_callback.cc @@ -66,7 +66,10 @@ class CallbackClient config, BenchmarkStubCreator) { num_threads_ = NumThreads(config); rpcs_done_ = 0; - SetupLoadTest(config, num_threads_); + + // Don't divide the fixed load among threads as the user threads + // only bootstrap the RPCs + SetupLoadTest(config, 1); total_outstanding_rpcs_ = config.client_channels() * config.outstanding_rpcs_per_channel(); } @@ -87,6 +90,11 @@ class CallbackClient } } + gpr_timespec NextIssueTime() { + std::lock_guard l(next_issue_time_mu_); + return Client::NextIssueTime(0); + } + protected: size_t num_threads_; size_t total_outstanding_rpcs_; @@ -108,6 +116,8 @@ class CallbackClient } private: + std::mutex next_issue_time_mu_; // Used by next issue time + int NumThreads(const ClientConfig& config) { int num_threads = config.async_client_threads(); if (num_threads <= 0) { // Use dynamic sizing @@ -146,7 +156,7 @@ class CallbackUnaryClient final : public CallbackClient { bool ThreadFuncImpl(Thread* t, size_t thread_idx) override { for (size_t vector_idx = thread_idx; vector_idx < total_outstanding_rpcs_; vector_idx += num_threads_) { - ScheduleRpc(t, thread_idx, vector_idx); + ScheduleRpc(t, vector_idx); } return true; } @@ -154,26 +164,26 @@ class CallbackUnaryClient final : public CallbackClient { void InitThreadFuncImpl(size_t thread_idx) override { return; } private: - void ScheduleRpc(Thread* t, size_t thread_idx, size_t vector_idx) { + void ScheduleRpc(Thread* t, size_t vector_idx) { if (!closed_loop_) { - gpr_timespec next_issue_time = NextIssueTime(thread_idx); + gpr_timespec next_issue_time = NextIssueTime(); // Start an alarm callback to run the internal callback after // next_issue_time ctx_[vector_idx]->alarm_.experimental().Set( - next_issue_time, [this, t, thread_idx, vector_idx](bool ok) { - IssueUnaryCallbackRpc(t, thread_idx, vector_idx); + next_issue_time, [this, t, vector_idx](bool ok) { + IssueUnaryCallbackRpc(t, vector_idx); }); } else { - IssueUnaryCallbackRpc(t, thread_idx, vector_idx); + IssueUnaryCallbackRpc(t, vector_idx); } } - void IssueUnaryCallbackRpc(Thread* t, size_t thread_idx, size_t vector_idx) { + void IssueUnaryCallbackRpc(Thread* t, size_t vector_idx) { GPR_TIMER_SCOPE("CallbackUnaryClient::ThreadFunc", 0); double start = UsageTimer::Now(); ctx_[vector_idx]->stub_->experimental_async()->UnaryCall( (&ctx_[vector_idx]->context_), &request_, &ctx_[vector_idx]->response_, - [this, t, thread_idx, start, vector_idx](grpc::Status s) { + [this, t, start, vector_idx](grpc::Status s) { // Update Histogram with data from the callback run HistogramEntry entry; if (s.ok()) { @@ -190,7 +200,7 @@ class CallbackUnaryClient final : public CallbackClient { ctx_[vector_idx].reset( new CallbackClientRpcContext(ctx_[vector_idx]->stub_)); // Schedule a new RPC - ScheduleRpc(t, thread_idx, vector_idx); + ScheduleRpc(t, vector_idx); } }); } @@ -287,7 +297,7 @@ class CallbackStreamingPingPongReactor final if (client_->ThreadCompleted()) return; if (!client_->IsClosedLoop()) { - gpr_timespec next_issue_time = client_->NextIssueTime(thread_idx_); + gpr_timespec next_issue_time = client_->NextIssueTime(); // Start an alarm callback to run the internal callback after // next_issue_time ctx_->alarm_.experimental().Set(next_issue_time, @@ -298,11 +308,9 @@ class CallbackStreamingPingPongReactor final } void set_thread_ptr(void* ptr) { thread_ptr_ = ptr; } - void set_thread_idx(int thread_idx) { thread_idx_ = thread_idx; } CallbackStreamingPingPongClient* client_; std::unique_ptr ctx_; - int thread_idx_; // Needed to update histogram entries void* thread_ptr_; // Needed to update histogram entries double start_; // Track message start time int messages_issued_; // Messages issued by this stream @@ -323,7 +331,6 @@ class CallbackStreamingPingPongClientImpl final for (size_t vector_idx = thread_idx; vector_idx < total_outstanding_rpcs_; vector_idx += num_threads_) { reactor_[vector_idx]->set_thread_ptr(t); - reactor_[vector_idx]->set_thread_idx(thread_idx); reactor_[vector_idx]->ScheduleRpc(); } return true;