Added support for fixed-load benchmarks: all RPCs access a single requestor to get the next issue time for the RPC

Branch: pull/17461/head
Author: Moiz Haidry, 6 years ago
Parent: e6e1081499
Commit: 5ec78a286d

Changed file: test/cpp/qps/client_callback.cc (35 changed lines)

@@ -66,7 +66,10 @@ class CallbackClient
             config, BenchmarkStubCreator) {
     num_threads_ = NumThreads(config);
     rpcs_done_ = 0;
-    SetupLoadTest(config, num_threads_);
+    // Don't divide the fixed load among threads as the user threads
+    // only bootstrap the RPCs
+    SetupLoadTest(config, 1);
     total_outstanding_rpcs_ =
         config.client_channels() * config.outstanding_rpcs_per_channel();
   }
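
Note: passing 1 to SetupLoadTest keeps the entire offered load on a single issue-time schedule instead of splitting it across num_threads_, because the user threads only bootstrap the RPCs and the completion callbacks keep them going afterwards. Below is a minimal, self-contained sketch of such a single fixed-load (Poisson) schedule; it is illustrative only, not the gRPC qps implementation, and every name in it is hypothetical.

    // Hypothetical sketch: one Poisson schedule carrying the full offered load.
    #include <chrono>
    #include <random>

    class PoissonIssueTimes {
     public:
      explicit PoissonIssueTimes(double offered_load_rps)
          : gap_(offered_load_rps),  // mean gap = 1 / offered_load_rps seconds
            next_(std::chrono::steady_clock::now()) {}

      // Absolute time at which the next RPC should be issued; each call
      // advances the schedule by one exponentially distributed gap.
      std::chrono::steady_clock::time_point Next() {
        auto issue_at = next_;
        next_ += std::chrono::duration_cast<std::chrono::steady_clock::duration>(
            std::chrono::duration<double>(gap_(gen_)));
        return issue_at;
      }

     private:
      std::mt19937_64 gen_{std::random_device{}()};
      std::exponential_distribution<double> gap_;
      std::chrono::steady_clock::time_point next_;
    };
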
@@ -87,6 +90,11 @@ class CallbackClient
     }
   }
 
+  gpr_timespec NextIssueTime() {
+    std::lock_guard<std::mutex> l(next_issue_time_mu_);
+    return Client::NextIssueTime(0);
+  }
+
  protected:
   size_t num_threads_;
   size_t total_outstanding_rpcs_;
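
Note: NextIssueTime() above is the single requestor mentioned in the commit message. Every RPC, no matter which callback thread resumes it, asks slot 0 of the load generator for its next issue time, and the new next_issue_time_mu_ serializes those concurrent calls. A small sketch of the same pattern with illustrative names (the requestor argument stands in for Client::NextIssueTime(0), which is not safe to call concurrently for the same slot):

    // Hypothetical sketch: serialize concurrent access to one issue-time requestor.
    #include <chrono>
    #include <functional>
    #include <mutex>
    #include <utility>

    class SingleRequestorIssueTimes {
     public:
      using Clock = std::chrono::steady_clock;

      explicit SingleRequestorIssueTimes(std::function<Clock::time_point()> requestor)
          : requestor_(std::move(requestor)) {}

      // Completion callbacks on any thread may call this; the lock guarantees
      // the underlying schedule advances exactly once per returned time.
      Clock::time_point NextIssueTime() {
        std::lock_guard<std::mutex> l(mu_);
        return requestor_();
      }

     private:
      std::mutex mu_;
      std::function<Clock::time_point()> requestor_;
    };
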
@@ -108,6 +116,8 @@ class CallbackClient
   }
 
  private:
+  std::mutex next_issue_time_mu_;  // Used by next issue time
+
   int NumThreads(const ClientConfig& config) {
     int num_threads = config.async_client_threads();
     if (num_threads <= 0) {  // Use dynamic sizing
@@ -146,7 +156,7 @@ class CallbackUnaryClient final : public CallbackClient {
   bool ThreadFuncImpl(Thread* t, size_t thread_idx) override {
     for (size_t vector_idx = thread_idx; vector_idx < total_outstanding_rpcs_;
          vector_idx += num_threads_) {
-      ScheduleRpc(t, thread_idx, vector_idx);
+      ScheduleRpc(t, vector_idx);
     }
     return true;
   }
@@ -154,26 +164,26 @@ class CallbackUnaryClient final : public CallbackClient {
   void InitThreadFuncImpl(size_t thread_idx) override { return; }
 
  private:
-  void ScheduleRpc(Thread* t, size_t thread_idx, size_t vector_idx) {
+  void ScheduleRpc(Thread* t, size_t vector_idx) {
     if (!closed_loop_) {
-      gpr_timespec next_issue_time = NextIssueTime(thread_idx);
+      gpr_timespec next_issue_time = NextIssueTime();
       // Start an alarm callback to run the internal callback after
       // next_issue_time
       ctx_[vector_idx]->alarm_.experimental().Set(
-          next_issue_time, [this, t, thread_idx, vector_idx](bool ok) {
-            IssueUnaryCallbackRpc(t, thread_idx, vector_idx);
+          next_issue_time, [this, t, vector_idx](bool ok) {
+            IssueUnaryCallbackRpc(t, vector_idx);
           });
     } else {
-      IssueUnaryCallbackRpc(t, thread_idx, vector_idx);
+      IssueUnaryCallbackRpc(t, vector_idx);
     }
   }
 
-  void IssueUnaryCallbackRpc(Thread* t, size_t thread_idx, size_t vector_idx) {
+  void IssueUnaryCallbackRpc(Thread* t, size_t vector_idx) {
     GPR_TIMER_SCOPE("CallbackUnaryClient::ThreadFunc", 0);
     double start = UsageTimer::Now();
     ctx_[vector_idx]->stub_->experimental_async()->UnaryCall(
         (&ctx_[vector_idx]->context_), &request_, &ctx_[vector_idx]->response_,
-        [this, t, thread_idx, start, vector_idx](grpc::Status s) {
+        [this, t, start, vector_idx](grpc::Status s) {
           // Update Histogram with data from the callback run
           HistogramEntry entry;
           if (s.ok()) {
@@ -190,7 +200,7 @@ class CallbackUnaryClient final : public CallbackClient {
             ctx_[vector_idx].reset(
                 new CallbackClientRpcContext(ctx_[vector_idx]->stub_));
             // Schedule a new RPC
-            ScheduleRpc(t, thread_idx, vector_idx);
+            ScheduleRpc(t, vector_idx);
           }
         });
   }
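
Note: for a fixed load, ScheduleRpc above never blocks a benchmark thread; it arms a grpc::Alarm with the absolute next_issue_time and issues the RPC from the alarm's callback, and the completion callback then calls ScheduleRpc again so each outstanding-RPC slot keeps re-arming itself. A stripped-down sketch of that deferral, using the same experimental Alarm::Set callback overload the change itself relies on (DeferUntil and issue_rpc are illustrative names; issue_rpc stands in for IssueUnaryCallbackRpc):

    // Hypothetical sketch: defer one RPC until next_issue_time with grpc::Alarm.
    #include <functional>
    #include <grpc/support/time.h>
    #include <grpcpp/alarm.h>

    void DeferUntil(grpc::Alarm* alarm, gpr_timespec next_issue_time,
                    std::function<void()> issue_rpc) {
      // The alarm fires once the deadline is reached (or immediately if it is
      // already in the past); the bool reports whether it fired or was cancelled.
      // The alarm object must outlive the callback, which the change guarantees
      // by keeping it in the per-RPC context.
      alarm->experimental().Set(next_issue_time,
                                [issue_rpc](bool /*fired*/) { issue_rpc(); });
    }
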
@@ -287,7 +297,7 @@ class CallbackStreamingPingPongReactor final
     if (client_->ThreadCompleted()) return;
     if (!client_->IsClosedLoop()) {
-      gpr_timespec next_issue_time = client_->NextIssueTime(thread_idx_);
+      gpr_timespec next_issue_time = client_->NextIssueTime();
       // Start an alarm callback to run the internal callback after
       // next_issue_time
       ctx_->alarm_.experimental().Set(next_issue_time,
@@ -298,11 +308,9 @@ class CallbackStreamingPingPongReactor final
   }
 
   void set_thread_ptr(void* ptr) { thread_ptr_ = ptr; }
-  void set_thread_idx(int thread_idx) { thread_idx_ = thread_idx; }
 
   CallbackStreamingPingPongClient* client_;
   std::unique_ptr<CallbackClientRpcContext> ctx_;
-  int thread_idx_;       // Needed to update histogram entries
   void* thread_ptr_;     // Needed to update histogram entries
   double start_;         // Track message start time
   int messages_issued_;  // Messages issued by this stream
@@ -323,7 +331,6 @@ class CallbackStreamingPingPongClientImpl final
     for (size_t vector_idx = thread_idx; vector_idx < total_outstanding_rpcs_;
          vector_idx += num_threads_) {
       reactor_[vector_idx]->set_thread_ptr(t);
-      reactor_[vector_idx]->set_thread_idx(thread_idx);
       reactor_[vector_idx]->ScheduleRpc();
     }
     return true;
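
Note: thread_idx_ fed client_->NextIssueTime(thread_idx_) in the streaming reactor; with the single locked requestor above that per-thread index is no longer needed, so set_thread_idx() goes away and the reactor keeps only thread_ptr_ for attributing histogram entries.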
