Merge branch 'alarm_openloop' into loaded

pull/5240/head
Vijay Pai 9 years ago
commit c84cb3e00a
  1. 2
      Makefile
  2. 1
      build.yaml
  3. 2
      src/core/surface/alarm.c
  4. 63
      test/cpp/qps/client.h
  5. 469
      test/cpp/qps/client_async.cc
  6. 7
      test/cpp/qps/client_sync.cc
  7. 41
      test/cpp/qps/interarrival.h
  8. 10
      test/cpp/qps/qps_interarrival_test.cc
  9. 2
      test/cpp/qps/qps_openloop_test.cc
  10. 2
      test/cpp/qps/qps_test.cc
  11. 18
      tools/run_tests/tests.json

@ -1597,6 +1597,8 @@ test_cxx: test_zookeeper buildtests_cxx
$(Q) $(BINDIR)/$(CONFIG)/interop_test || ( echo test interop_test failed ; exit 1 ) $(Q) $(BINDIR)/$(CONFIG)/interop_test || ( echo test interop_test failed ; exit 1 )
$(E) "[RUN] Testing mock_test" $(E) "[RUN] Testing mock_test"
$(Q) $(BINDIR)/$(CONFIG)/mock_test || ( echo test mock_test failed ; exit 1 ) $(Q) $(BINDIR)/$(CONFIG)/mock_test || ( echo test mock_test failed ; exit 1 )
$(E) "[RUN] Testing qps_openloop_test"
$(Q) $(BINDIR)/$(CONFIG)/qps_openloop_test || ( echo test qps_openloop_test failed ; exit 1 )
$(E) "[RUN] Testing qps_test" $(E) "[RUN] Testing qps_test"
$(Q) $(BINDIR)/$(CONFIG)/qps_test || ( echo test qps_test failed ; exit 1 ) $(Q) $(BINDIR)/$(CONFIG)/qps_test || ( echo test qps_test failed ; exit 1 )
$(E) "[RUN] Testing secure_auth_context_test" $(E) "[RUN] Testing secure_auth_context_test"

@ -2299,7 +2299,6 @@ targets:
- posix - posix
- name: qps_openloop_test - name: qps_openloop_test
build: test build: test
run: false
language: c++ language: c++
src: src:
- test/cpp/qps/qps_openloop_test.cc - test/cpp/qps/qps_openloop_test.cc

@ -63,9 +63,9 @@ grpc_alarm *grpc_alarm_create(grpc_completion_queue *cq, gpr_timespec deadline,
alarm->cq = cq; alarm->cq = cq;
alarm->tag = tag; alarm->tag = tag;
grpc_cq_begin_op(cq, tag);
grpc_timer_init(&exec_ctx, &alarm->alarm, deadline, alarm_cb, alarm, grpc_timer_init(&exec_ctx, &alarm->alarm, deadline, alarm_cb, alarm,
gpr_now(GPR_CLOCK_MONOTONIC)); gpr_now(GPR_CLOCK_MONOTONIC));
grpc_cq_begin_op(cq, tag);
grpc_exec_ctx_finish(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx);
return alarm; return alarm;
} }

@ -41,6 +41,7 @@
#include <grpc++/support/byte_buffer.h> #include <grpc++/support/byte_buffer.h>
#include <grpc++/support/slice.h> #include <grpc++/support/slice.h>
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include <grpc/support/time.h>
#include "src/proto/grpc/testing/payloads.grpc.pb.h" #include "src/proto/grpc/testing/payloads.grpc.pb.h"
#include "src/proto/grpc/testing/services.grpc.pb.h" #include "src/proto/grpc/testing/services.grpc.pb.h"
@ -52,27 +53,8 @@
#include "test/cpp/util/create_test_channel.h" #include "test/cpp/util/create_test_channel.h"
namespace grpc { namespace grpc {
#if defined(__APPLE__)
// Specialize Timepoint for high res clock as we need that
template <>
class TimePoint<std::chrono::high_resolution_clock::time_point> {
public:
TimePoint(const std::chrono::high_resolution_clock::time_point& time) {
TimepointHR2Timespec(time, &time_);
}
gpr_timespec raw_time() const { return time_; }
private:
gpr_timespec time_;
};
#endif
namespace testing { namespace testing {
typedef std::chrono::high_resolution_clock grpc_time_source;
typedef std::chrono::time_point<grpc_time_source> grpc_time;
template <class RequestType> template <class RequestType>
class ClientRequestCreator { class ClientRequestCreator {
public: public:
@ -184,7 +166,7 @@ class Client {
// Set up the load distribution based on the number of threads // Set up the load distribution based on the number of threads
const auto& load = config.load_params(); const auto& load = config.load_params();
std::unique_ptr<RandomDist> random_dist; std::unique_ptr<RandomDistInterface> random_dist;
switch (load.load_case()) { switch (load.load_case()) {
case LoadParams::kClosedLoop: case LoadParams::kClosedLoop:
// Closed-loop doesn't use random dist at all // Closed-loop doesn't use random dist at all
@ -218,25 +200,26 @@ class Client {
closed_loop_ = false; closed_loop_ = false;
// set up interarrival timer according to random dist // set up interarrival timer according to random dist
interarrival_timer_.init(*random_dist, num_threads); interarrival_timer_.init(*random_dist, num_threads);
const auto now = gpr_now(GPR_CLOCK_MONOTONIC);
for (size_t i = 0; i < num_threads; i++) { for (size_t i = 0; i < num_threads; i++) {
next_time_.push_back( next_time_.push_back(gpr_time_add(
grpc_time_source::now() + now,
std::chrono::duration_cast<grpc_time_source::duration>( gpr_time_from_nanos(interarrival_timer_.next(i), GPR_TIMESPAN)));
interarrival_timer_(i)));
} }
} }
} }
bool NextIssueTime(int thread_idx, grpc_time* time_delay) { gpr_timespec NextIssueTime(int thread_idx) {
if (closed_loop_) { const gpr_timespec result = next_time_[thread_idx];
return false; next_time_[thread_idx] =
} else { gpr_time_add(next_time_[thread_idx],
*time_delay = next_time_[thread_idx]; gpr_time_from_nanos(interarrival_timer_.next(thread_idx),
next_time_[thread_idx] += GPR_TIMESPAN));
std::chrono::duration_cast<grpc_time_source::duration>( return result;
interarrival_timer_(thread_idx)); }
return true; std::function<gpr_timespec()> NextIssuer(int thread_idx) {
} return closed_loop_ ? std::function<gpr_timespec()>()
: std::bind(&Client::NextIssueTime, this, thread_idx);
} }
private: private:
@ -306,7 +289,7 @@ class Client {
Histogram* new_stats_; Histogram* new_stats_;
Histogram histogram_; Histogram histogram_;
Client* client_; Client* client_;
size_t idx_; const size_t idx_;
std::thread impl_; std::thread impl_;
}; };
@ -314,7 +297,7 @@ class Client {
std::unique_ptr<Timer> timer_; std::unique_ptr<Timer> timer_;
InterarrivalTimer interarrival_timer_; InterarrivalTimer interarrival_timer_;
std::vector<grpc_time> next_time_; std::vector<gpr_timespec> next_time_;
}; };
template <class StubType, class RequestType> template <class StubType, class RequestType>
@ -323,9 +306,9 @@ class ClientImpl : public Client {
ClientImpl(const ClientConfig& config, ClientImpl(const ClientConfig& config,
std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)> std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)>
create_stub) create_stub)
: channels_(config.client_channels()), create_stub_(create_stub) { : cores_(LimitCores(config.core_list().data(), config.core_list_size())),
cores_ = LimitCores(config.core_list().data(), config.core_list_size()); channels_(config.client_channels()),
create_stub_(create_stub) {
for (int i = 0; i < config.client_channels(); i++) { for (int i = 0; i < config.client_channels(); i++) {
channels_[i].init(config.server_targets(i % config.server_targets_size()), channels_[i].init(config.server_targets(i % config.server_targets_size()),
config, create_stub_); config, create_stub_);
@ -337,7 +320,7 @@ class ClientImpl : public Client {
virtual ~ClientImpl() {} virtual ~ClientImpl() {}
protected: protected:
int cores_; const int cores_;
RequestType request_; RequestType request_;
class ClientChannelInfo { class ClientChannelInfo {

@ -43,9 +43,9 @@
#include <vector> #include <vector>
#include <gflags/gflags.h> #include <gflags/gflags.h>
#include <grpc++/alarm.h>
#include <grpc++/channel.h> #include <grpc++/channel.h>
#include <grpc++/client_context.h> #include <grpc++/client_context.h>
#include <grpc++/client_context.h>
#include <grpc++/generic/generic_stub.h> #include <grpc++/generic/generic_stub.h>
#include <grpc/grpc.h> #include <grpc/grpc.h>
#include <grpc/support/cpu.h> #include <grpc/support/cpu.h>
@ -60,11 +60,9 @@
namespace grpc { namespace grpc {
namespace testing { namespace testing {
typedef std::list<grpc_time> deadline_list;
class ClientRpcContext { class ClientRpcContext {
public: public:
explicit ClientRpcContext(int ch) : channel_id_(ch) {} ClientRpcContext() {}
virtual ~ClientRpcContext() {} virtual ~ClientRpcContext() {}
// next state, return false if done. Collect stats when appropriate // next state, return false if done. Collect stats when appropriate
virtual bool RunNextState(bool, Histogram* hist) = 0; virtual bool RunNextState(bool, Histogram* hist) = 0;
@ -74,72 +72,73 @@ class ClientRpcContext {
return reinterpret_cast<ClientRpcContext*>(t); return reinterpret_cast<ClientRpcContext*>(t);
} }
deadline_list::iterator deadline_posn() const { return deadline_posn_; }
void set_deadline_posn(const deadline_list::iterator& it) {
deadline_posn_ = it;
}
virtual void Start(CompletionQueue* cq) = 0; virtual void Start(CompletionQueue* cq) = 0;
int channel_id() const { return channel_id_; }
protected:
int channel_id_;
private:
deadline_list::iterator deadline_posn_;
}; };
template <class RequestType, class ResponseType> template <class RequestType, class ResponseType>
class ClientRpcContextUnaryImpl : public ClientRpcContext { class ClientRpcContextUnaryImpl : public ClientRpcContext {
public: public:
ClientRpcContextUnaryImpl( ClientRpcContextUnaryImpl(
int channel_id, BenchmarkService::Stub* stub, const RequestType& req, BenchmarkService::Stub* stub, const RequestType& req,
std::function<gpr_timespec()> next_issue,
std::function< std::function<
std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>( std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&, BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
CompletionQueue*)> start_req, CompletionQueue*)> start_req,
std::function<void(grpc::Status, ResponseType*)> on_done) std::function<void(grpc::Status, ResponseType*)> on_done)
: ClientRpcContext(channel_id), : context_(),
context_(),
stub_(stub), stub_(stub),
cq_(nullptr),
req_(req), req_(req),
response_(), response_(),
next_state_(&ClientRpcContextUnaryImpl::RespDone), next_state_(State::READY),
callback_(on_done), callback_(on_done),
next_issue_(next_issue),
start_req_(start_req) {} start_req_(start_req) {}
~ClientRpcContextUnaryImpl() GRPC_OVERRIDE {}
void Start(CompletionQueue* cq) GRPC_OVERRIDE { void Start(CompletionQueue* cq) GRPC_OVERRIDE {
start_ = Timer::Now(); cq_ = cq;
response_reader_ = start_req_(stub_, &context_, req_, cq); if (!next_issue_) { // ready to issue
response_reader_->Finish(&response_, &status_, ClientRpcContext::tag(this)); RunNextState(true, nullptr);
} else { // wait for the issue time
alarm_.reset(new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this)));
}
} }
~ClientRpcContextUnaryImpl() GRPC_OVERRIDE {}
bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE { bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
bool ret = (this->*next_state_)(ok); switch (next_state_) {
if (!ret) { case State::READY:
hist->Add((Timer::Now() - start_) * 1e9); start_ = Timer::Now();
response_reader_ = start_req_(stub_, &context_, req_, cq_);
response_reader_->Finish(&response_, &status_,
ClientRpcContext::tag(this));
next_state_ = State::RESP_DONE;
return true;
case State::RESP_DONE:
hist->Add((Timer::Now() - start_) * 1e9);
callback_(status_, &response_);
next_state_ = State::INVALID;
return false;
default:
GPR_ASSERT(false);
return false;
} }
return ret;
} }
ClientRpcContext* StartNewClone() GRPC_OVERRIDE { ClientRpcContext* StartNewClone() GRPC_OVERRIDE {
return new ClientRpcContextUnaryImpl(channel_id_, stub_, req_, start_req_, return new ClientRpcContextUnaryImpl(stub_, req_, next_issue_, start_req_,
callback_); callback_);
} }
private: private:
bool RespDone(bool) {
next_state_ = &ClientRpcContextUnaryImpl::DoCallBack;
return false;
}
bool DoCallBack(bool) {
callback_(status_, &response_);
return true; // we're done, this'll be ignored
}
grpc::ClientContext context_; grpc::ClientContext context_;
BenchmarkService::Stub* stub_; BenchmarkService::Stub* stub_;
CompletionQueue* cq_;
std::unique_ptr<Alarm> alarm_;
RequestType req_; RequestType req_;
ResponseType response_; ResponseType response_;
bool (ClientRpcContextUnaryImpl::*next_state_)(bool); enum State { INVALID, READY, RESP_DONE };
State next_state_;
std::function<void(grpc::Status, ResponseType*)> callback_; std::function<void(grpc::Status, ResponseType*)> callback_;
std::function<gpr_timespec()> next_issue_;
std::function<std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>( std::function<std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&, BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
CompletionQueue*)> start_req_; CompletionQueue*)> start_req_;
@ -157,49 +156,35 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
// member name resolution until the template types are fully resolved // member name resolution until the template types are fully resolved
public: public:
using Client::SetupLoadTest; using Client::SetupLoadTest;
using Client::NextIssueTime;
using Client::closed_loop_; using Client::closed_loop_;
using Client::NextIssuer;
using ClientImpl<StubType, RequestType>::cores_; using ClientImpl<StubType, RequestType>::cores_;
using ClientImpl<StubType, RequestType>::channels_; using ClientImpl<StubType, RequestType>::channels_;
using ClientImpl<StubType, RequestType>::request_; using ClientImpl<StubType, RequestType>::request_;
AsyncClient(const ClientConfig& config, AsyncClient(const ClientConfig& config,
std::function<ClientRpcContext*(int, StubType*, std::function<ClientRpcContext*(
const RequestType&)> setup_ctx, StubType*, std::function<gpr_timespec()> next_issue,
const RequestType&)> setup_ctx,
std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)> std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)>
create_stub) create_stub)
: ClientImpl<StubType, RequestType>(config, create_stub), : ClientImpl<StubType, RequestType>(config, create_stub),
num_async_threads_(NumThreads(config)), num_async_threads_(NumThreads(config)) {
channel_lock_(new std::mutex[config.client_channels()]),
contexts_(config.client_channels()),
max_outstanding_per_channel_(config.outstanding_rpcs_per_channel()),
channel_count_(config.client_channels()),
pref_channel_inc_(num_async_threads_) {
SetupLoadTest(config, num_async_threads_); SetupLoadTest(config, num_async_threads_);
for (int i = 0; i < num_async_threads_; i++) { for (int i = 0; i < num_async_threads_; i++) {
cli_cqs_.emplace_back(new CompletionQueue); cli_cqs_.emplace_back(new CompletionQueue);
if (!closed_loop_) { next_issuers_.emplace_back(NextIssuer(i));
rpc_deadlines_.emplace_back();
next_channel_.push_back(i % channel_count_);
issue_allowed_.emplace_back(true);
grpc_time next_issue;
NextIssueTime(i, &next_issue);
next_issue_.push_back(next_issue);
}
} }
using namespace std::placeholders;
int t = 0; int t = 0;
for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) { for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
for (int ch = 0; ch < channel_count_; ch++) { for (int ch = 0; ch < config.client_channels(); ch++) {
auto* cq = cli_cqs_[t].get(); auto* cq = cli_cqs_[t].get();
auto ctx =
setup_ctx(channels_[ch].get_stub(), next_issuers_[t], request_);
ctx->Start(cq);
t = (t + 1) % cli_cqs_.size(); t = (t + 1) % cli_cqs_.size();
auto ctx = setup_ctx(ch, channels_[ch].get_stub(), request_);
if (closed_loop_) {
ctx->Start(cq);
} else {
contexts_[ch].push_front(ctx);
}
} }
} }
} }
@ -212,140 +197,34 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
delete ClientRpcContext::detag(got_tag); delete ClientRpcContext::detag(got_tag);
} }
} }
// Now clear out all the pre-allocated idle contexts
for (int ch = 0; ch < channel_count_; ch++) {
while (!contexts_[ch].empty()) {
// Get an idle context from the front of the list
auto* ctx = *(contexts_[ch].begin());
contexts_[ch].pop_front();
delete ctx;
}
}
delete[] channel_lock_;
} }
bool ThreadFunc(Histogram* histogram, bool ThreadFunc(Histogram* histogram,
size_t thread_idx) GRPC_OVERRIDE GRPC_FINAL { size_t thread_idx) GRPC_OVERRIDE GRPC_FINAL {
void* got_tag; void* got_tag;
bool ok; bool ok;
grpc_time deadline, short_deadline;
if (closed_loop_) {
deadline = grpc_time_source::now() + std::chrono::seconds(1);
short_deadline = deadline;
} else {
if (rpc_deadlines_[thread_idx].empty()) {
deadline = grpc_time_source::now() + std::chrono::seconds(1);
} else {
deadline = *(rpc_deadlines_[thread_idx].begin());
}
short_deadline =
issue_allowed_[thread_idx] ? next_issue_[thread_idx] : deadline;
}
bool got_event;
switch (cli_cqs_[thread_idx]->AsyncNext(&got_tag, &ok, short_deadline)) { if (cli_cqs_[thread_idx]->Next(&got_tag, &ok)) {
case CompletionQueue::SHUTDOWN: // Got a regular event, so process it
return false;
case CompletionQueue::TIMEOUT:
got_event = false;
break;
case CompletionQueue::GOT_EVENT:
got_event = true;
break;
default:
GPR_ASSERT(false);
break;
}
if (got_event) {
ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
if (ctx->RunNextState(ok, histogram) == false) { if (!ctx->RunNextState(ok, histogram)) {
// call the callback and then clone the ctx // The RPC and callback are done, so clone the ctx
ctx->RunNextState(ok, histogram); // and kickstart the new one
ClientRpcContext* clone_ctx = ctx->StartNewClone(); auto clone = ctx->StartNewClone();
if (closed_loop_) { clone->Start(cli_cqs_[thread_idx].get());
clone_ctx->Start(cli_cqs_[thread_idx].get());
} else {
// Remove the entry from the rpc deadlines list
rpc_deadlines_[thread_idx].erase(ctx->deadline_posn());
// Put the clone_ctx in the list of idle contexts for this channel
// Under lock
int ch = clone_ctx->channel_id();
std::lock_guard<std::mutex> g(channel_lock_[ch]);
contexts_[ch].push_front(clone_ctx);
}
// delete the old version // delete the old version
delete ctx; delete ctx;
} }
if (!closed_loop_) return true;
issue_allowed_[thread_idx] = } else { // queue is shutting down
true; // may be ok now even if it hadn't been return false;
} }
if (!closed_loop_ && issue_allowed_[thread_idx] &&
grpc_time_source::now() >= next_issue_[thread_idx]) {
// Attempt to issue
bool issued = false;
for (int num_attempts = 0, channel_attempt = next_channel_[thread_idx];
num_attempts < channel_count_ && !issued; num_attempts++) {
bool can_issue = false;
ClientRpcContext* ctx = nullptr;
{
std::lock_guard<std::mutex> g(channel_lock_[channel_attempt]);
if (!contexts_[channel_attempt].empty()) {
// Get an idle context from the front of the list
ctx = *(contexts_[channel_attempt].begin());
contexts_[channel_attempt].pop_front();
can_issue = true;
}
}
if (can_issue) {
// do the work to issue
rpc_deadlines_[thread_idx].emplace_back(grpc_time_source::now() +
std::chrono::seconds(1));
auto it = rpc_deadlines_[thread_idx].end();
--it;
ctx->set_deadline_posn(it);
ctx->Start(cli_cqs_[thread_idx].get());
issued = true;
// If we did issue, then next time, try our thread's next
// preferred channel
next_channel_[thread_idx] += pref_channel_inc_;
if (next_channel_[thread_idx] >= channel_count_)
next_channel_[thread_idx] = (thread_idx % channel_count_);
} else {
// Do a modular increment of channel attempt if we couldn't issue
channel_attempt = (channel_attempt + 1) % channel_count_;
}
}
if (issued) {
// We issued one; see when we can issue the next
grpc_time next_issue;
NextIssueTime(thread_idx, &next_issue);
next_issue_[thread_idx] = next_issue;
} else {
issue_allowed_[thread_idx] = false;
}
}
return true;
} }
protected: protected:
int num_async_threads_; const int num_async_threads_;
private: private:
class boolean { // exists only to avoid data-race on vector<bool>
public:
boolean() : val_(false) {}
boolean(bool b) : val_(b) {}
operator bool() const { return val_; }
boolean& operator=(bool b) {
val_ = b;
return *this;
}
private:
bool val_;
};
int NumThreads(const ClientConfig& config) { int NumThreads(const ClientConfig& config) {
int num_threads = config.async_client_threads(); int num_threads = config.async_client_threads();
if (num_threads <= 0) { // Use dynamic sizing if (num_threads <= 0) { // Use dynamic sizing
@ -356,18 +235,7 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
} }
std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_; std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
std::vector<std::function<gpr_timespec()>> next_issuers_;
std::vector<deadline_list> rpc_deadlines_; // per thread deadlines
std::vector<int> next_channel_; // per thread round-robin channel ctr
std::vector<boolean> issue_allowed_; // may this thread attempt to issue
std::vector<grpc_time> next_issue_; // when should it issue?
std::mutex*
channel_lock_; // a vector, but avoid std::vector for old compilers
std::vector<context_list> contexts_; // per-channel list of idle contexts
int max_outstanding_per_channel_;
int channel_count_;
int pref_channel_inc_;
}; };
static std::unique_ptr<BenchmarkService::Stub> BenchmarkStubCreator( static std::unique_ptr<BenchmarkService::Stub> BenchmarkStubCreator(
@ -391,11 +259,11 @@ class AsyncUnaryClient GRPC_FINAL
const SimpleRequest& request, CompletionQueue* cq) { const SimpleRequest& request, CompletionQueue* cq) {
return stub->AsyncUnaryCall(ctx, request, cq); return stub->AsyncUnaryCall(ctx, request, cq);
}; };
static ClientRpcContext* SetupCtx(int channel_id, static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub,
BenchmarkService::Stub* stub, std::function<gpr_timespec()> next_issue,
const SimpleRequest& req) { const SimpleRequest& req) {
return new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>( return new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
channel_id, stub, req, AsyncUnaryClient::StartReq, stub, req, next_issue, AsyncUnaryClient::StartReq,
AsyncUnaryClient::CheckDone); AsyncUnaryClient::CheckDone);
} }
}; };
@ -404,62 +272,94 @@ template <class RequestType, class ResponseType>
class ClientRpcContextStreamingImpl : public ClientRpcContext { class ClientRpcContextStreamingImpl : public ClientRpcContext {
public: public:
ClientRpcContextStreamingImpl( ClientRpcContextStreamingImpl(
int channel_id, BenchmarkService::Stub* stub, const RequestType& req, BenchmarkService::Stub* stub, const RequestType& req,
std::function<gpr_timespec()> next_issue,
std::function<std::unique_ptr< std::function<std::unique_ptr<
grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>( grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*, BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*,
void*)> start_req, void*)> start_req,
std::function<void(grpc::Status, ResponseType*)> on_done) std::function<void(grpc::Status, ResponseType*)> on_done)
: ClientRpcContext(channel_id), : context_(),
context_(),
stub_(stub), stub_(stub),
cq_(nullptr),
req_(req), req_(req),
response_(), response_(),
next_state_(&ClientRpcContextStreamingImpl::ReqSent), next_state_(State::INVALID),
callback_(on_done), callback_(on_done),
next_issue_(next_issue),
start_req_(start_req), start_req_(start_req),
start_(Timer::Now()) {} start_(Timer::Now()) {}
~ClientRpcContextStreamingImpl() GRPC_OVERRIDE {} ~ClientRpcContextStreamingImpl() GRPC_OVERRIDE {}
void Start(CompletionQueue* cq) GRPC_OVERRIDE {
cq_ = cq;
stream_ = start_req_(stub_, &context_, cq, ClientRpcContext::tag(this));
next_state_ = State::STREAM_IDLE;
}
bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE { bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
return (this->*next_state_)(ok, hist); while (true) {
switch (next_state_) {
case State::STREAM_IDLE:
if (!next_issue_) { // ready to issue
next_state_ = State::READY_TO_WRITE;
} else {
next_state_ = State::WAIT;
}
break; // loop around, don't return
case State::WAIT:
alarm_.reset(
new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this)));
next_state_ = State::READY_TO_WRITE;
return true;
case State::READY_TO_WRITE:
if (!ok) {
return false;
}
start_ = Timer::Now();
next_state_ = State::WRITE_DONE;
stream_->Write(req_, ClientRpcContext::tag(this));
return true;
case State::WRITE_DONE:
if (!ok) {
return false;
}
next_state_ = State::READ_DONE;
stream_->Read(&response_, ClientRpcContext::tag(this));
return true;
break;
case State::READ_DONE:
hist->Add((Timer::Now() - start_) * 1e9);
callback_(status_, &response_);
next_state_ = State::STREAM_IDLE;
break; // loop around
default:
GPR_ASSERT(false);
return false;
}
}
} }
ClientRpcContext* StartNewClone() GRPC_OVERRIDE { ClientRpcContext* StartNewClone() GRPC_OVERRIDE {
return new ClientRpcContextStreamingImpl(channel_id_, stub_, req_, return new ClientRpcContextStreamingImpl(stub_, req_, next_issue_,
start_req_, callback_); start_req_, callback_);
} }
void Start(CompletionQueue* cq) GRPC_OVERRIDE {
stream_ = start_req_(stub_, &context_, cq, ClientRpcContext::tag(this));
}
private: private:
bool ReqSent(bool ok, Histogram*) { return StartWrite(ok); }
bool StartWrite(bool ok) {
if (!ok) {
return (false);
}
start_ = Timer::Now();
next_state_ = &ClientRpcContextStreamingImpl::WriteDone;
stream_->Write(req_, ClientRpcContext::tag(this));
return true;
}
bool WriteDone(bool ok, Histogram*) {
if (!ok) {
return (false);
}
next_state_ = &ClientRpcContextStreamingImpl::ReadDone;
stream_->Read(&response_, ClientRpcContext::tag(this));
return true;
}
bool ReadDone(bool ok, Histogram* hist) {
hist->Add((Timer::Now() - start_) * 1e9);
return StartWrite(ok);
}
grpc::ClientContext context_; grpc::ClientContext context_;
BenchmarkService::Stub* stub_; BenchmarkService::Stub* stub_;
CompletionQueue* cq_;
std::unique_ptr<Alarm> alarm_;
RequestType req_; RequestType req_;
ResponseType response_; ResponseType response_;
bool (ClientRpcContextStreamingImpl::*next_state_)(bool, Histogram*); enum State {
INVALID,
STREAM_IDLE,
WAIT,
READY_TO_WRITE,
WRITE_DONE,
READ_DONE
};
State next_state_;
std::function<void(grpc::Status, ResponseType*)> callback_; std::function<void(grpc::Status, ResponseType*)> callback_;
std::function<gpr_timespec()> next_issue_;
std::function< std::function<
std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>( std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*, BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*,
@ -475,9 +375,6 @@ class AsyncStreamingClient GRPC_FINAL
public: public:
explicit AsyncStreamingClient(const ClientConfig& config) explicit AsyncStreamingClient(const ClientConfig& config)
: AsyncClient(config, SetupCtx, BenchmarkStubCreator) { : AsyncClient(config, SetupCtx, BenchmarkStubCreator) {
// async streaming currently only supports closed loop
GPR_ASSERT(closed_loop_);
StartThreads(num_async_threads_); StartThreads(num_async_threads_);
} }
@ -492,11 +389,11 @@ class AsyncStreamingClient GRPC_FINAL
auto stream = stub->AsyncStreamingCall(ctx, cq, tag); auto stream = stub->AsyncStreamingCall(ctx, cq, tag);
return stream; return stream;
}; };
static ClientRpcContext* SetupCtx(int channel_id, static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub,
BenchmarkService::Stub* stub, std::function<gpr_timespec()> next_issue,
const SimpleRequest& req) { const SimpleRequest& req) {
return new ClientRpcContextStreamingImpl<SimpleRequest, SimpleResponse>( return new ClientRpcContextStreamingImpl<SimpleRequest, SimpleResponse>(
channel_id, stub, req, AsyncStreamingClient::StartReq, stub, req, next_issue, AsyncStreamingClient::StartReq,
AsyncStreamingClient::CheckDone); AsyncStreamingClient::CheckDone);
} }
}; };
@ -504,64 +401,96 @@ class AsyncStreamingClient GRPC_FINAL
class ClientRpcContextGenericStreamingImpl : public ClientRpcContext { class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
public: public:
ClientRpcContextGenericStreamingImpl( ClientRpcContextGenericStreamingImpl(
int channel_id, grpc::GenericStub* stub, const ByteBuffer& req, grpc::GenericStub* stub, const ByteBuffer& req,
std::function<gpr_timespec()> next_issue,
std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>( std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>(
grpc::GenericStub*, grpc::ClientContext*, grpc::GenericStub*, grpc::ClientContext*,
const grpc::string& method_name, CompletionQueue*, void*)> start_req, const grpc::string& method_name, CompletionQueue*, void*)> start_req,
std::function<void(grpc::Status, ByteBuffer*)> on_done) std::function<void(grpc::Status, ByteBuffer*)> on_done)
: ClientRpcContext(channel_id), : context_(),
context_(),
stub_(stub), stub_(stub),
cq_(nullptr),
req_(req), req_(req),
response_(), response_(),
next_state_(&ClientRpcContextGenericStreamingImpl::ReqSent), next_state_(State::INVALID),
callback_(on_done), callback_(on_done),
next_issue_(next_issue),
start_req_(start_req), start_req_(start_req),
start_(Timer::Now()) {} start_(Timer::Now()) {}
~ClientRpcContextGenericStreamingImpl() GRPC_OVERRIDE {} ~ClientRpcContextGenericStreamingImpl() GRPC_OVERRIDE {}
bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
return (this->*next_state_)(ok, hist);
}
ClientRpcContext* StartNewClone() GRPC_OVERRIDE {
return new ClientRpcContextGenericStreamingImpl(channel_id_, stub_, req_,
start_req_, callback_);
}
void Start(CompletionQueue* cq) GRPC_OVERRIDE { void Start(CompletionQueue* cq) GRPC_OVERRIDE {
cq_ = cq;
const grpc::string kMethodName( const grpc::string kMethodName(
"/grpc.testing.BenchmarkService/StreamingCall"); "/grpc.testing.BenchmarkService/StreamingCall");
stream_ = start_req_(stub_, &context_, kMethodName, cq, stream_ = start_req_(stub_, &context_, kMethodName, cq,
ClientRpcContext::tag(this)); ClientRpcContext::tag(this));
next_state_ = State::STREAM_IDLE;
} }
bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
private: while (true) {
bool ReqSent(bool ok, Histogram*) { return StartWrite(ok); } switch (next_state_) {
bool StartWrite(bool ok) { case State::STREAM_IDLE:
if (!ok) { if (!next_issue_) { // ready to issue
return (false); next_state_ = State::READY_TO_WRITE;
} } else {
start_ = Timer::Now(); next_state_ = State::WAIT;
next_state_ = &ClientRpcContextGenericStreamingImpl::WriteDone; }
stream_->Write(req_, ClientRpcContext::tag(this)); break; // loop around, don't return
return true; case State::WAIT:
} alarm_.reset(
bool WriteDone(bool ok, Histogram*) { new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this)));
if (!ok) { next_state_ = State::READY_TO_WRITE;
return (false); return true;
case State::READY_TO_WRITE:
if (!ok) {
return false;
}
start_ = Timer::Now();
next_state_ = State::WRITE_DONE;
stream_->Write(req_, ClientRpcContext::tag(this));
return true;
case State::WRITE_DONE:
if (!ok) {
return false;
}
next_state_ = State::READ_DONE;
stream_->Read(&response_, ClientRpcContext::tag(this));
return true;
break;
case State::READ_DONE:
hist->Add((Timer::Now() - start_) * 1e9);
callback_(status_, &response_);
next_state_ = State::STREAM_IDLE;
break; // loop around
default:
GPR_ASSERT(false);
return false;
}
} }
next_state_ = &ClientRpcContextGenericStreamingImpl::ReadDone;
stream_->Read(&response_, ClientRpcContext::tag(this));
return true;
} }
bool ReadDone(bool ok, Histogram* hist) { ClientRpcContext* StartNewClone() GRPC_OVERRIDE {
hist->Add((Timer::Now() - start_) * 1e9); return new ClientRpcContextGenericStreamingImpl(stub_, req_, next_issue_,
return StartWrite(ok); start_req_, callback_);
} }
private:
grpc::ClientContext context_; grpc::ClientContext context_;
grpc::GenericStub* stub_; grpc::GenericStub* stub_;
CompletionQueue* cq_;
std::unique_ptr<Alarm> alarm_;
ByteBuffer req_; ByteBuffer req_;
ByteBuffer response_; ByteBuffer response_;
bool (ClientRpcContextGenericStreamingImpl::*next_state_)(bool, Histogram*); enum State {
INVALID,
STREAM_IDLE,
WAIT,
READY_TO_WRITE,
WRITE_DONE,
READ_DONE
};
State next_state_;
std::function<void(grpc::Status, ByteBuffer*)> callback_; std::function<void(grpc::Status, ByteBuffer*)> callback_;
std::function<gpr_timespec()> next_issue_;
std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>( std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>(
grpc::GenericStub*, grpc::ClientContext*, const grpc::string&, grpc::GenericStub*, grpc::ClientContext*, const grpc::string&,
CompletionQueue*, void*)> start_req_; CompletionQueue*, void*)> start_req_;
@ -580,9 +509,6 @@ class GenericAsyncStreamingClient GRPC_FINAL
public: public:
explicit GenericAsyncStreamingClient(const ClientConfig& config) explicit GenericAsyncStreamingClient(const ClientConfig& config)
: AsyncClient(config, SetupCtx, GenericStubCreator) { : AsyncClient(config, SetupCtx, GenericStubCreator) {
// async streaming currently only supports closed loop
GPR_ASSERT(closed_loop_);
StartThreads(num_async_threads_); StartThreads(num_async_threads_);
} }
@ -596,10 +522,11 @@ class GenericAsyncStreamingClient GRPC_FINAL
auto stream = stub->Call(ctx, method_name, cq, tag); auto stream = stub->Call(ctx, method_name, cq, tag);
return stream; return stream;
}; };
static ClientRpcContext* SetupCtx(int channel_id, grpc::GenericStub* stub, static ClientRpcContext* SetupCtx(grpc::GenericStub* stub,
std::function<gpr_timespec()> next_issue,
const ByteBuffer& req) { const ByteBuffer& req) {
return new ClientRpcContextGenericStreamingImpl( return new ClientRpcContextGenericStreamingImpl(
channel_id, stub, req, GenericAsyncStreamingClient::StartReq, stub, req, next_issue, GenericAsyncStreamingClient::StartReq,
GenericAsyncStreamingClient::CheckDone); GenericAsyncStreamingClient::CheckDone);
} }
}; };

@ -84,11 +84,8 @@ class SynchronousClient
protected: protected:
void WaitToIssue(int thread_idx) { void WaitToIssue(int thread_idx) {
grpc_time next_time; if (!closed_loop_) {
if (NextIssueTime(thread_idx, &next_time)) { gpr_sleep_until(NextIssueTime(thread_idx));
gpr_timespec next_timespec;
TimepointHR2Timespec(next_time, &next_timespec);
gpr_sleep_until(next_timespec);
} }
} }

@ -1,6 +1,6 @@
/* /*
* *
* Copyright 2015, Google Inc. * Copyright 2015-2016, Google Inc.
* All rights reserved. * All rights reserved.
* *
* Redistribution and use in source and binary forms, with or without * Redistribution and use in source and binary forms, with or without
@ -51,15 +51,15 @@ namespace testing {
// stacks. Thus, this code only uses a uniform distribution of doubles [0,1) // stacks. Thus, this code only uses a uniform distribution of doubles [0,1)
// and then provides the distribution functions itself. // and then provides the distribution functions itself.
class RandomDist { class RandomDistInterface {
public: public:
RandomDist() {} RandomDistInterface() {}
virtual ~RandomDist() = 0; virtual ~RandomDistInterface() = 0;
// Argument to operator() is a uniform double in the range [0,1) // Argument to transform is a uniform double in the range [0,1)
virtual double operator()(double uni) const = 0; virtual double transform(double uni) const = 0;
}; };
inline RandomDist::~RandomDist() {} inline RandomDistInterface::~RandomDistInterface() {}
// ExpDist implements an exponential distribution, which is the // ExpDist implements an exponential distribution, which is the
// interarrival distribution for a Poisson process. The parameter // interarrival distribution for a Poisson process. The parameter
@ -69,11 +69,11 @@ inline RandomDist::~RandomDist() {}
// independent identical stationary sources. For more information, // independent identical stationary sources. For more information,
// see http://en.wikipedia.org/wiki/Exponential_distribution // see http://en.wikipedia.org/wiki/Exponential_distribution
class ExpDist GRPC_FINAL : public RandomDist { class ExpDist GRPC_FINAL : public RandomDistInterface {
public: public:
explicit ExpDist(double lambda) : lambda_recip_(1.0 / lambda) {} explicit ExpDist(double lambda) : lambda_recip_(1.0 / lambda) {}
~ExpDist() GRPC_OVERRIDE {} ~ExpDist() GRPC_OVERRIDE {}
double operator()(double uni) const GRPC_OVERRIDE { double transform(double uni) const GRPC_OVERRIDE {
// Note: Use 1.0-uni above to avoid NaN if uni is 0 // Note: Use 1.0-uni above to avoid NaN if uni is 0
return lambda_recip_ * (-log(1.0 - uni)); return lambda_recip_ * (-log(1.0 - uni));
} }
@ -87,11 +87,11 @@ class ExpDist GRPC_FINAL : public RandomDist {
// mean interarrival time is (lo+hi)/2. For more information, // mean interarrival time is (lo+hi)/2. For more information,
// see http://en.wikipedia.org/wiki/Uniform_distribution_%28continuous%29 // see http://en.wikipedia.org/wiki/Uniform_distribution_%28continuous%29
class UniformDist GRPC_FINAL : public RandomDist { class UniformDist GRPC_FINAL : public RandomDistInterface {
public: public:
UniformDist(double lo, double hi) : lo_(lo), range_(hi - lo) {} UniformDist(double lo, double hi) : lo_(lo), range_(hi - lo) {}
~UniformDist() GRPC_OVERRIDE {} ~UniformDist() GRPC_OVERRIDE {}
double operator()(double uni) const GRPC_OVERRIDE { double transform(double uni) const GRPC_OVERRIDE {
return uni * range_ + lo_; return uni * range_ + lo_;
} }
@ -106,11 +106,11 @@ class UniformDist GRPC_FINAL : public RandomDist {
// clients) will not preserve any deterministic interarrival gap across // clients) will not preserve any deterministic interarrival gap across
// requests. // requests.
class DetDist GRPC_FINAL : public RandomDist { class DetDist GRPC_FINAL : public RandomDistInterface {
public: public:
explicit DetDist(double val) : val_(val) {} explicit DetDist(double val) : val_(val) {}
~DetDist() GRPC_OVERRIDE {} ~DetDist() GRPC_OVERRIDE {}
double operator()(double uni) const GRPC_OVERRIDE { return val_; } double transform(double uni) const GRPC_OVERRIDE { return val_; }
private: private:
double val_; double val_;
@ -123,12 +123,12 @@ class DetDist GRPC_FINAL : public RandomDist {
// good representation of the response times of data center jobs. See // good representation of the response times of data center jobs. See
// http://en.wikipedia.org/wiki/Pareto_distribution // http://en.wikipedia.org/wiki/Pareto_distribution
class ParetoDist GRPC_FINAL : public RandomDist { class ParetoDist GRPC_FINAL : public RandomDistInterface {
public: public:
ParetoDist(double base, double alpha) ParetoDist(double base, double alpha)
: base_(base), alpha_recip_(1.0 / alpha) {} : base_(base), alpha_recip_(1.0 / alpha) {}
~ParetoDist() GRPC_OVERRIDE {} ~ParetoDist() GRPC_OVERRIDE {}
double operator()(double uni) const GRPC_OVERRIDE { double transform(double uni) const GRPC_OVERRIDE {
// Note: Use 1.0-uni above to avoid div by zero if uni is 0 // Note: Use 1.0-uni above to avoid div by zero if uni is 0
return base_ / pow(1.0 - uni, alpha_recip_); return base_ / pow(1.0 - uni, alpha_recip_);
} }
@ -145,13 +145,14 @@ class ParetoDist GRPC_FINAL : public RandomDist {
class InterarrivalTimer { class InterarrivalTimer {
public: public:
InterarrivalTimer() {} InterarrivalTimer() {}
void init(const RandomDist& r, int threads, int entries = 1000000) { void init(const RandomDistInterface& r, int threads, int entries = 1000000) {
for (int i = 0; i < entries; i++) { for (int i = 0; i < entries; i++) {
// rand is the only choice that is portable across POSIX and Windows // rand is the only choice that is portable across POSIX and Windows
// and that supports new and old compilers // and that supports new and old compilers
const double uniform_0_1 = rand() / RAND_MAX; const double uniform_0_1 =
static_cast<double>(rand()) / static_cast<double>(RAND_MAX);
random_table_.push_back( random_table_.push_back(
std::chrono::nanoseconds(static_cast<int64_t>(1e9 * r(uniform_0_1)))); static_cast<int64_t>(1e9 * r.transform(uniform_0_1)));
} }
// Now set up the thread positions // Now set up the thread positions
for (int i = 0; i < threads; i++) { for (int i = 0; i < threads; i++) {
@ -160,7 +161,7 @@ class InterarrivalTimer {
} }
virtual ~InterarrivalTimer(){}; virtual ~InterarrivalTimer(){};
std::chrono::nanoseconds operator()(int thread_num) { int64_t next(int thread_num) {
auto ret = *(thread_posns_[thread_num]++); auto ret = *(thread_posns_[thread_num]++);
if (thread_posns_[thread_num] == random_table_.end()) if (thread_posns_[thread_num] == random_table_.end())
thread_posns_[thread_num] = random_table_.begin(); thread_posns_[thread_num] = random_table_.begin();
@ -168,7 +169,7 @@ class InterarrivalTimer {
} }
private: private:
typedef std::vector<std::chrono::nanoseconds> time_table; typedef std::vector<int64_t> time_table;
std::vector<time_table::const_iterator> thread_posns_; std::vector<time_table::const_iterator> thread_posns_;
time_table random_table_; time_table random_table_;
}; };

@ -1,6 +1,6 @@
/* /*
* *
* Copyright 2015, Google Inc. * Copyright 2015-2016, Google Inc.
* All rights reserved. * All rights reserved.
* *
* Redistribution and use in source and binary forms, with or without * Redistribution and use in source and binary forms, with or without
@ -39,17 +39,17 @@
#include "test/cpp/qps/interarrival.h" #include "test/cpp/qps/interarrival.h"
using grpc::testing::RandomDist; using grpc::testing::RandomDistInterface;
using grpc::testing::InterarrivalTimer; using grpc::testing::InterarrivalTimer;
static void RunTest(RandomDist &&r, int threads, std::string title) { static void RunTest(RandomDistInterface &&r, int threads, std::string title) {
InterarrivalTimer timer; InterarrivalTimer timer;
timer.init(r, threads); timer.init(r, threads);
gpr_histogram *h(gpr_histogram_create(0.01, 60e9)); gpr_histogram *h(gpr_histogram_create(0.01, 60e9));
for (int i = 0; i < 10000000; i++) { for (int i = 0; i < 10000000; i++) {
for (int j = 0; j < threads; j++) { for (int j = 0; j < threads; j++) {
gpr_histogram_add(h, timer(j).count()); gpr_histogram_add(h, timer.next(j));
} }
} }
@ -70,7 +70,7 @@ using grpc::testing::ParetoDist;
int main(int argc, char **argv) { int main(int argc, char **argv) {
RunTest(ExpDist(10.0), 5, std::string("Exponential(10)")); RunTest(ExpDist(10.0), 5, std::string("Exponential(10)"));
RunTest(DetDist(5.0), 5, std::string("Det(5)")); RunTest(DetDist(5.0), 5, std::string("Det(5)"));
RunTest(UniformDist(0.0, 10.0), 5, std::string("Uniform(1,10)")); RunTest(UniformDist(0.0, 10.0), 5, std::string("Uniform(0,10)"));
RunTest(ParetoDist(1.0, 1.0), 5, std::string("Pareto(1,1)")); RunTest(ParetoDist(1.0, 1.0), 5, std::string("Pareto(1,1)"));
return 0; return 0;
} }

@ -53,7 +53,7 @@ static void RunQPS() {
client_config.set_outstanding_rpcs_per_channel(1000); client_config.set_outstanding_rpcs_per_channel(1000);
client_config.set_client_channels(8); client_config.set_client_channels(8);
client_config.set_async_client_threads(8); client_config.set_async_client_threads(8);
client_config.set_rpc_type(UNARY); client_config.set_rpc_type(STREAMING);
client_config.mutable_load_params()->mutable_poisson()->set_offered_load( client_config.mutable_load_params()->mutable_poisson()->set_offered_load(
1000.0); 1000.0);

@ -53,7 +53,7 @@ static void RunQPS() {
client_config.set_outstanding_rpcs_per_channel(1000); client_config.set_outstanding_rpcs_per_channel(1000);
client_config.set_client_channels(8); client_config.set_client_channels(8);
client_config.set_async_client_threads(8); client_config.set_async_client_threads(8);
client_config.set_rpc_type(UNARY); client_config.set_rpc_type(STREAMING);
client_config.mutable_load_params()->mutable_closed_loop(); client_config.mutable_load_params()->mutable_closed_loop();
ServerConfig server_config; ServerConfig server_config;

@ -2077,6 +2077,24 @@
"windows" "windows"
] ]
}, },
{
"args": [],
"ci_platforms": [
"linux",
"mac",
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"flaky": false,
"language": "c++",
"name": "qps_openloop_test",
"platforms": [
"linux",
"mac",
"posix"
]
},
{ {
"args": [], "args": [],
"ci_platforms": [ "ci_platforms": [

Loading…
Cancel
Save