From cf3fb092cc595066e70f780a6ffc5c79f4531148 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Fri, 5 Jun 2015 03:41:30 -0700 Subject: [PATCH] clang-format --- include/grpc++/config.h | 22 +- include/grpc++/time.h | 18 +- src/cpp/util/time.cc | 2 +- test/cpp/qps/client.h | 49 ++--- test/cpp/qps/client_async.cc | 287 +++++++++++++------------- test/cpp/qps/interarrival.h | 90 ++++---- test/cpp/qps/qps_interarrival_test.cc | 20 +- test/cpp/qps/qpstest.proto | 19 +- 8 files changed, 260 insertions(+), 247 deletions(-) diff --git a/include/grpc++/config.h b/include/grpc++/config.h index 8d674efef85..ca74064be2d 100644 --- a/include/grpc++/config.h +++ b/include/grpc++/config.h @@ -99,24 +99,28 @@ ::google::protobuf::io::ZeroCopyOutputStream #define GRPC_CUSTOM_ZEROCOPYINPUTSTREAM \ ::google::protobuf::io::ZeroCopyInputStream -#define GRPC_CUSTOM_CODEDINPUTSTREAM \ - ::google::protobuf::io::CodedInputStream +#define GRPC_CUSTOM_CODEDINPUTSTREAM ::google::protobuf::io::CodedInputStream #endif - #ifdef GRPC_CXX0X_NO_NULLPTR #include const class { -public: - template operator T*() const {return static_cast(0);} - template operator std::unique_ptr() const { + public: + template + operator T *() const { + return static_cast(0); + } + template + operator std::unique_ptr() const { return std::unique_ptr(static_cast(0)); } - template operator std::shared_ptr() const { + template + operator std::shared_ptr() const { return std::shared_ptr(static_cast(0)); } - operator bool() const {return false;} -private: + operator bool() const { return false; } + + private: void operator&() const = delete; } nullptr = {}; #endif diff --git a/include/grpc++/time.h b/include/grpc++/time.h index da22bca47ac..8fb2f8505cd 100644 --- a/include/grpc++/time.h +++ b/include/grpc++/time.h @@ -52,22 +52,22 @@ namespace grpc { template class TimePoint { public: - TimePoint(const T& time) { - you_need_a_specialization_of_TimePoint(); - } + TimePoint(const T& time) { you_need_a_specialization_of_TimePoint(); } gpr_timespec raw_time() { gpr_timespec t; return t; } + private: void you_need_a_specialization_of_TimePoint(); }; -template<> +template <> class TimePoint { public: - TimePoint(const gpr_timespec& time) : time_(time) { } + TimePoint(const gpr_timespec& time) : time_(time) {} gpr_timespec raw_time() { return time_; } + private: gpr_timespec time_; }; @@ -85,8 +85,9 @@ namespace grpc { // from and to should be absolute time. void Timepoint2Timespec(const std::chrono::system_clock::time_point& from, gpr_timespec* to); -void TimepointHR2Timespec(const std::chrono::high_resolution_clock::time_point& from, - gpr_timespec* to); +void TimepointHR2Timespec( + const std::chrono::high_resolution_clock::time_point& from, + gpr_timespec* to); std::chrono::system_clock::time_point Timespec2Timepoint(gpr_timespec t); @@ -94,9 +95,10 @@ template <> class TimePoint { public: TimePoint(const std::chrono::system_clock::time_point& time) { - Timepoint2Timespec(time, &time_); + Timepoint2Timespec(time, &time_); } gpr_timespec raw_time() const { return time_; } + private: gpr_timespec time_; }; diff --git a/src/cpp/util/time.cc b/src/cpp/util/time.cc index beb58ef3f79..fd94d00b329 100644 --- a/src/cpp/util/time.cc +++ b/src/cpp/util/time.cc @@ -61,7 +61,7 @@ void Timepoint2Timespec(const system_clock::time_point& from, } void TimepointHR2Timespec(const high_resolution_clock::time_point& from, - gpr_timespec* to) { + gpr_timespec* to) { high_resolution_clock::duration deadline = from.time_since_epoch(); seconds secs = duration_cast(deadline); if (from == high_resolution_clock::time_point::max() || diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index 7d75ed7d742..309e60d9703 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -50,9 +50,10 @@ template <> class TimePoint { public: TimePoint(const std::chrono::high_resolution_clock::time_point& time) { - TimepointHR2Timespec(time, &time_); + TimepointHR2Timespec(time, &time_); } gpr_timespec raw_time() const { return time_; } + private: gpr_timespec time_; }; @@ -65,8 +66,8 @@ typedef std::chrono::time_point grpc_time; class Client { public: - explicit Client(const ClientConfig& config) : timer_(new Timer), - interarrival_timer_() { + explicit Client(const ClientConfig& config) + : timer_(new Timer), interarrival_timer_() { for (int i = 0; i < config.client_channels(); i++) { channels_.push_back(ClientChannelInfo( config.server_targets(i % config.server_targets_size()), config)); @@ -131,30 +132,29 @@ class Client { // Set up the load distribution based on the number of threads if (config.load_type() == CLOSED_LOOP) { closed_loop_ = true; - } - else { + } else { closed_loop_ = false; std::unique_ptr random_dist; auto& load = config.load_params(); switch (config.load_type()) { case POISSON: - random_dist.reset - (new ExpDist(load.poisson().offered_load()/num_threads)); + random_dist.reset( + new ExpDist(load.poisson().offered_load() / num_threads)); break; case UNIFORM: - random_dist.reset - (new UniformDist(load.uniform().interarrival_lo()*num_threads, - load.uniform().interarrival_hi()*num_threads)); + random_dist.reset( + new UniformDist(load.uniform().interarrival_lo() * num_threads, + load.uniform().interarrival_hi() * num_threads)); break; case DETERMINISTIC: - random_dist.reset - (new DetDist(num_threads/load.determ().offered_load())); + random_dist.reset( + new DetDist(num_threads / load.determ().offered_load())); break; case PARETO: - random_dist.reset - (new ParetoDist(load.pareto().interarrival_base()*num_threads, - load.pareto().alpha())); + random_dist.reset( + new ParetoDist(load.pareto().interarrival_base() * num_threads, + load.pareto().alpha())); break; default: GPR_ASSERT(false); @@ -162,23 +162,26 @@ class Client { } interarrival_timer_.init(*random_dist, num_threads); - for (size_t i = 0; i(interarrival_timer_(i))); + for (size_t i = 0; i < num_threads; i++) { + next_time_.push_back( + grpc_time_source::now() + + std::chrono::duration_cast( + interarrival_timer_(i))); } } } - bool NextIssueTime(int thread_idx, grpc_time *time_delay) { + bool NextIssueTime(int thread_idx, grpc_time* time_delay) { if (closed_loop_) { return false; - } - else { + } else { *time_delay = next_time_[thread_idx]; - next_time_[thread_idx] += std::chrono::duration_cast(interarrival_timer_(thread_idx)); + next_time_[thread_idx] += + std::chrono::duration_cast( + interarrival_timer_(thread_idx)); return true; } } - + private: class Thread { public: diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 24d8b3751d4..b0752a97925 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -62,7 +62,7 @@ typedef std::list deadline_list; class ClientRpcContext { public: - ClientRpcContext(int ch): channel_id_(ch) {} + ClientRpcContext(int ch) : channel_id_(ch) {} virtual ~ClientRpcContext() {} // next state, return false if done. Collect stats when appropriate virtual bool RunNextState(bool, Histogram* hist) = 0; @@ -72,12 +72,16 @@ class ClientRpcContext { return reinterpret_cast(t); } - deadline_list::iterator deadline_posn() const {return deadline_posn_;} - void set_deadline_posn(const deadline_list::iterator& it) {deadline_posn_ = it;} - virtual void Start(CompletionQueue *cq) = 0; - int channel_id() const {return channel_id_;} + deadline_list::iterator deadline_posn() const { return deadline_posn_; } + void set_deadline_posn(const deadline_list::iterator& it) { + deadline_posn_ = it; + } + virtual void Start(CompletionQueue* cq) = 0; + int channel_id() const { return channel_id_; } + protected: int channel_id_; + private: deadline_list::iterator deadline_posn_; }; @@ -85,23 +89,22 @@ class ClientRpcContext { template class ClientRpcContextUnaryImpl : public ClientRpcContext { public: - ClientRpcContextUnaryImpl(int channel_id, - TestService::Stub* stub, const RequestType& req, + ClientRpcContextUnaryImpl( + int channel_id, TestService::Stub* stub, const RequestType& req, std::function< std::unique_ptr>( TestService::Stub*, grpc::ClientContext*, const RequestType&, - CompletionQueue*)> - start_req, + CompletionQueue*)> start_req, std::function on_done) - : ClientRpcContext(channel_id), context_(), + : ClientRpcContext(channel_id), + context_(), stub_(stub), req_(req), response_(), next_state_(&ClientRpcContextUnaryImpl::RespDone), callback_(on_done), - start_req_(start_req) { - } - void Start(CompletionQueue *cq) GRPC_OVERRIDE { + start_req_(start_req) {} + void Start(CompletionQueue* cq) GRPC_OVERRIDE { start_ = Timer::Now(); response_reader_ = start_req_(stub_, &context_, req_, cq); response_reader_->Finish(&response_, &status_, ClientRpcContext::tag(this)); @@ -116,8 +119,8 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { } ClientRpcContext* StartNewClone() GRPC_OVERRIDE { - return new ClientRpcContextUnaryImpl(channel_id_, - stub_, req_, start_req_, callback_); + return new ClientRpcContextUnaryImpl(channel_id_, stub_, req_, start_req_, + callback_); } private: @@ -125,9 +128,9 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { next_state_ = &ClientRpcContextUnaryImpl::DoCallBack; return false; } - bool DoCallBack (bool) { + bool DoCallBack(bool) { callback_(status_, &response_); - return true; // we're done, this'll be ignored + return true; // we're done, this'll be ignored } grpc::ClientContext context_; TestService::Stub* stub_; @@ -136,27 +139,28 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { bool (ClientRpcContextUnaryImpl::*next_state_)(bool); std::function callback_; std::function>( - TestService::Stub*, grpc::ClientContext*, - const RequestType&, CompletionQueue *)> start_req_; + TestService::Stub*, grpc::ClientContext*, const RequestType&, + CompletionQueue*)> start_req_; grpc::Status status_; double start_; std::unique_ptr> response_reader_; }; -typedef std::forward_list context_list; +typedef std::forward_list context_list; class AsyncClient : public Client { public: - explicit AsyncClient(const ClientConfig& config, - std::function setup_ctx) : - Client(config), channel_lock_(config.client_channels()), - contexts_(config.client_channels()), - max_outstanding_per_channel_(config.outstanding_rpcs_per_channel()), - channel_count_(config.client_channels()), - pref_channel_inc_(config.async_client_threads()) { - + explicit AsyncClient( + const ClientConfig& config, + std::function setup_ctx) + : Client(config), + channel_lock_(config.client_channels()), + contexts_(config.client_channels()), + max_outstanding_per_channel_(config.outstanding_rpcs_per_channel()), + channel_count_(config.client_channels()), + pref_channel_inc_(config.async_client_threads()) { SetupLoadTest(config, config.async_client_threads()); for (int i = 0; i < config.async_client_threads(); i++) { @@ -181,8 +185,7 @@ class AsyncClient : public Client { auto ctx = setup_ctx(ch, channel.get_stub(), request_); if (closed_loop_) { ctx->Start(cq); - } - else { + } else { contexts_[ch].push_front(ctx); } } @@ -210,24 +213,24 @@ class AsyncClient : public Client { } else { if (rpc_deadlines_[thread_idx].empty()) { deadline = grpc_time_source::now() + std::chrono::seconds(1); - } - else { + } else { deadline = *(rpc_deadlines_[thread_idx].begin()); } - short_deadline = issue_allowed_[thread_idx] ? - next_issue_[thread_idx] : deadline; + short_deadline = + issue_allowed_[thread_idx] ? next_issue_[thread_idx] : deadline; } bool got_event; switch (cli_cqs_[thread_idx]->AsyncNext(&got_tag, &ok, short_deadline)) { - case CompletionQueue::SHUTDOWN: return false; + case CompletionQueue::SHUTDOWN: + return false; case CompletionQueue::TIMEOUT: - got_event = false; - break; + got_event = false; + break; case CompletionQueue::GOT_EVENT: - got_event = true; - break; + got_event = true; + break; default: GPR_ASSERT(false); break; @@ -239,89 +242,87 @@ class AsyncClient : public Client { return false; } if (got_event) { - ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); - if (ctx->RunNextState(ok, histogram) == false) { - // call the callback and then clone the ctx - ctx->RunNextState(ok, histogram); - ClientRpcContext *clone_ctx = ctx->StartNewClone(); - if (closed_loop_) { - clone_ctx->Start(cli_cqs_[thread_idx].get()); - } - else { - // Remove the entry from the rpc deadlines list - rpc_deadlines_[thread_idx].erase(ctx->deadline_posn()); - // Put the clone_ctx in the list of idle contexts for this channel - // Under lock - int ch = clone_ctx->channel_id(); - std::lock_guard g(channel_lock_[ch]); - contexts_[ch].push_front(clone_ctx); - } - // delete the old version - delete ctx; - } - if (!closed_loop_) - issue_allowed_[thread_idx] = true; // may be ok now even if it hadn't been - } - if (!closed_loop_ && issue_allowed_[thread_idx] && - grpc_time_source::now() >= next_issue_[thread_idx]) { - // Attempt to issue - bool issued = false; - for (int num_attempts = 0, channel_attempt = next_channel_[thread_idx]; - num_attempts < channel_count_ && !issued; num_attempts++) { - bool can_issue = false; - ClientRpcContext* ctx = nullptr; - { - std::lock_guard - g(channel_lock_[channel_attempt]); - if (!contexts_[channel_attempt].empty()) { - // Get an idle context from the front of the list - ctx = *(contexts_[channel_attempt].begin()); - contexts_[channel_attempt].pop_front(); - can_issue = true; - } - } - if (can_issue) { - // do the work to issue - rpc_deadlines_[thread_idx].emplace_back( - grpc_time_source::now() + std::chrono::seconds(1)); - auto it = rpc_deadlines_[thread_idx].end(); - --it; - ctx->set_deadline_posn(it); - ctx->Start(cli_cqs_[thread_idx].get()); - issued = true; - // If we did issue, then next time, try our thread's next - // preferred channel - next_channel_[thread_idx] += pref_channel_inc_; - if (next_channel_[thread_idx] >= channel_count_) - next_channel_[thread_idx] = (thread_idx % channel_count_); - } else { - // Do a modular increment of channel attempt if we couldn't issue - channel_attempt = (channel_attempt+1) % channel_count_; - } - } - if (issued) { - // We issued one; see when we can issue the next - grpc_time next_issue; - NextIssueTime(thread_idx, &next_issue); - next_issue_[thread_idx]=next_issue; - } - else { - issue_allowed_[thread_idx] = false; - } - } - return true; + ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); + if (ctx->RunNextState(ok, histogram) == false) { + // call the callback and then clone the ctx + ctx->RunNextState(ok, histogram); + ClientRpcContext* clone_ctx = ctx->StartNewClone(); + if (closed_loop_) { + clone_ctx->Start(cli_cqs_[thread_idx].get()); + } else { + // Remove the entry from the rpc deadlines list + rpc_deadlines_[thread_idx].erase(ctx->deadline_posn()); + // Put the clone_ctx in the list of idle contexts for this channel + // Under lock + int ch = clone_ctx->channel_id(); + std::lock_guard g(channel_lock_[ch]); + contexts_[ch].push_front(clone_ctx); + } + // delete the old version + delete ctx; + } + if (!closed_loop_) + issue_allowed_[thread_idx] = + true; // may be ok now even if it hadn't been + } + if (!closed_loop_ && issue_allowed_[thread_idx] && + grpc_time_source::now() >= next_issue_[thread_idx]) { + // Attempt to issue + bool issued = false; + for (int num_attempts = 0, channel_attempt = next_channel_[thread_idx]; + num_attempts < channel_count_ && !issued; num_attempts++) { + bool can_issue = false; + ClientRpcContext* ctx = nullptr; + { + std::lock_guard g(channel_lock_[channel_attempt]); + if (!contexts_[channel_attempt].empty()) { + // Get an idle context from the front of the list + ctx = *(contexts_[channel_attempt].begin()); + contexts_[channel_attempt].pop_front(); + can_issue = true; + } + } + if (can_issue) { + // do the work to issue + rpc_deadlines_[thread_idx].emplace_back(grpc_time_source::now() + + std::chrono::seconds(1)); + auto it = rpc_deadlines_[thread_idx].end(); + --it; + ctx->set_deadline_posn(it); + ctx->Start(cli_cqs_[thread_idx].get()); + issued = true; + // If we did issue, then next time, try our thread's next + // preferred channel + next_channel_[thread_idx] += pref_channel_inc_; + if (next_channel_[thread_idx] >= channel_count_) + next_channel_[thread_idx] = (thread_idx % channel_count_); + } else { + // Do a modular increment of channel attempt if we couldn't issue + channel_attempt = (channel_attempt + 1) % channel_count_; + } + } + if (issued) { + // We issued one; see when we can issue the next + grpc_time next_issue; + NextIssueTime(thread_idx, &next_issue); + next_issue_[thread_idx] = next_issue; + } else { + issue_allowed_[thread_idx] = false; + } + } + return true; } private: std::vector> cli_cqs_; - std::vector rpc_deadlines_; // per thread deadlines - std::vector next_channel_; // per thread round-robin channel ctr - std::vector issue_allowed_; // may this thread attempt to issue - std::vector next_issue_; // when should it issue? + std::vector rpc_deadlines_; // per thread deadlines + std::vector next_channel_; // per thread round-robin channel ctr + std::vector issue_allowed_; // may this thread attempt to issue + std::vector next_issue_; // when should it issue? std::vector channel_lock_; - std::vector contexts_; // per-channel list of idle contexts + std::vector contexts_; // per-channel list of idle contexts int max_outstanding_per_channel_; int channel_count_; int pref_channel_inc_; @@ -334,34 +335,31 @@ class AsyncUnaryClient GRPC_FINAL : public AsyncClient { StartThreads(config.async_client_threads()); } ~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); } -private: - static ClientRpcContext *SetupCtx(int channel_id, - TestService::Stub* stub, - const SimpleRequest& req) { + + private: + static ClientRpcContext* SetupCtx(int channel_id, TestService::Stub* stub, + const SimpleRequest& req) { auto check_done = [](grpc::Status s, SimpleResponse* response) {}; auto start_req = [](TestService::Stub* stub, grpc::ClientContext* ctx, const SimpleRequest& request, CompletionQueue* cq) { return stub->AsyncUnaryCall(ctx, request, cq); }; - return new ClientRpcContextUnaryImpl(channel_id, stub, req, - start_req, check_done); + return new ClientRpcContextUnaryImpl( + channel_id, stub, req, start_req, check_done); } - }; template class ClientRpcContextStreamingImpl : public ClientRpcContext { public: - ClientRpcContextStreamingImpl(int channel_id, - TestService::Stub* stub, const RequestType& req, - std::function>( - TestService::Stub*, grpc::ClientContext*, CompletionQueue*, - void*)> start_req, + ClientRpcContextStreamingImpl( + int channel_id, TestService::Stub* stub, const RequestType& req, + std::function>(TestService::Stub*, grpc::ClientContext*, + CompletionQueue*, void*)> start_req, std::function on_done) : ClientRpcContext(channel_id), - context_(), + context_(), stub_(stub), req_(req), response_(), @@ -374,12 +372,13 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext { return (this->*next_state_)(ok, hist); } ClientRpcContext* StartNewClone() GRPC_OVERRIDE { - return new ClientRpcContextStreamingImpl(channel_id_, - stub_, req_, start_req_, callback_); + return new ClientRpcContextStreamingImpl(channel_id_, stub_, req_, + start_req_, callback_); } - void Start(CompletionQueue *cq) GRPC_OVERRIDE { + void Start(CompletionQueue* cq) GRPC_OVERRIDE { stream_ = start_req_(stub_, &context_, cq, ClientRpcContext::tag(this)); } + private: bool ReqSent(bool ok, Histogram*) { return StartWrite(ok); } bool StartWrite(bool ok) { @@ -411,8 +410,8 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext { std::function callback_; std::function< std::unique_ptr>( - TestService::Stub*, grpc::ClientContext*, - CompletionQueue *, void*)> start_req_; + TestService::Stub*, grpc::ClientContext*, CompletionQueue*, void*)> + start_req_; grpc::Status status_; double start_; std::unique_ptr> @@ -430,20 +429,18 @@ class AsyncStreamingClient GRPC_FINAL : public AsyncClient { } ~AsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); } -private: - static ClientRpcContext *SetupCtx(int channel_id, - TestService::Stub* stub, - const SimpleRequest& req) { + + private: + static ClientRpcContext* SetupCtx(int channel_id, TestService::Stub* stub, + const SimpleRequest& req) { auto check_done = [](grpc::Status s, SimpleResponse* response) {}; auto start_req = [](TestService::Stub* stub, grpc::ClientContext* ctx, - CompletionQueue *cq, void* tag) { + CompletionQueue* cq, void* tag) { auto stream = stub->AsyncStreamingCall(ctx, cq, tag); return stream; }; - return new ClientRpcContextStreamingImpl(channel_id, stub, - req, start_req, - check_done); + return new ClientRpcContextStreamingImpl( + channel_id, stub, req, start_req, check_done); } }; diff --git a/test/cpp/qps/interarrival.h b/test/cpp/qps/interarrival.h index 98f4def1f2a..d8accc1d8ab 100644 --- a/test/cpp/qps/interarrival.h +++ b/test/cpp/qps/interarrival.h @@ -55,53 +55,60 @@ class RandomDist { RandomDist() {} virtual ~RandomDist() = 0; // Argument to operator() is a uniform double in the range [0,1) - virtual double operator() (double uni) const = 0; + virtual double operator()(double uni) const = 0; }; inline RandomDist::~RandomDist() {} -class UniformDist GRPC_FINAL: public RandomDist { -public: - UniformDist(double lo, double hi): lo_(lo), range_(hi-lo) {} - ~UniformDist() GRPC_OVERRIDE {} - double operator() (double uni) const GRPC_OVERRIDE {return uni*range_+lo_;} -private: +class UniformDist GRPC_FINAL : public RandomDist { + public: + UniformDist(double lo, double hi) : lo_(lo), range_(hi - lo) {} + ~UniformDist() GRPC_OVERRIDE {} + double operator()(double uni) const GRPC_OVERRIDE { + return uni * range_ + lo_; + } + + private: double lo_; double range_; }; class ExpDist GRPC_FINAL : public RandomDist { -public: - explicit ExpDist(double lambda): lambda_recip_(1.0/lambda) {} - ~ExpDist() GRPC_OVERRIDE {} - double operator() (double uni) const GRPC_OVERRIDE { - // Note: Use 1.0-uni above to avoid NaN if uni is 0 - return lambda_recip_ * (-log(1.0-uni)); - } -private: + public: + explicit ExpDist(double lambda) : lambda_recip_(1.0 / lambda) {} + ~ExpDist() GRPC_OVERRIDE {} + double operator()(double uni) const GRPC_OVERRIDE { + // Note: Use 1.0-uni above to avoid NaN if uni is 0 + return lambda_recip_ * (-log(1.0 - uni)); + } + + private: double lambda_recip_; }; class DetDist GRPC_FINAL : public RandomDist { -public: - explicit DetDist(double val): val_(val) {} - ~DetDist() GRPC_OVERRIDE {} - double operator() (double uni) const GRPC_OVERRIDE {return val_;} -private: + public: + explicit DetDist(double val) : val_(val) {} + ~DetDist() GRPC_OVERRIDE {} + double operator()(double uni) const GRPC_OVERRIDE { return val_; } + + private: double val_; }; class ParetoDist GRPC_FINAL : public RandomDist { -public: - ParetoDist(double base, double alpha): base_(base), alpha_recip_(1.0/alpha) {} + public: + ParetoDist(double base, double alpha) + : base_(base), alpha_recip_(1.0 / alpha) {} ~ParetoDist() GRPC_OVERRIDE {} - double operator() (double uni) const GRPC_OVERRIDE { - // Note: Use 1.0-uni above to avoid div by zero if uni is 0 - return base_ / pow(1.0-uni, alpha_recip_); - } -private: - double base_; - double alpha_recip_; + double operator()(double uni) const GRPC_OVERRIDE { + // Note: Use 1.0-uni above to avoid div by zero if uni is 0 + return base_ / pow(1.0 - uni, alpha_recip_); + } + + private: + double base_; + double alpha_recip_; }; // A class library for generating pseudo-random interarrival times @@ -111,38 +118,37 @@ private: using qps_random_engine = std::default_random_engine; class InterarrivalTimer { -public: + public: InterarrivalTimer() {} - InterarrivalTimer(const RandomDist& r, int threads, int entries=1000000) { + InterarrivalTimer(const RandomDist& r, int threads, int entries = 1000000) { init(r, threads, entries); } - void init(const RandomDist& r, int threads, int entries=1000000) { + void init(const RandomDist& r, int threads, int entries = 1000000) { qps_random_engine gen; - std::uniform_real_distribution uniform(0.0,1.0); - for (int i=0; i(1e9*r(uniform(gen))))); + std::uniform_real_distribution uniform(0.0, 1.0); + for (int i = 0; i < entries; i++) { + random_table_.push_back(std::chrono::nanoseconds( + static_cast(1e9 * r(uniform(gen))))); } // Now set up the thread positions - for (int i=0; i time_table; std::vector thread_posns_; time_table random_table_; }; - } } diff --git a/test/cpp/qps/qps_interarrival_test.cc b/test/cpp/qps/qps_interarrival_test.cc index 14af4c65067..95fe1b8b0d6 100644 --- a/test/cpp/qps/qps_interarrival_test.cc +++ b/test/cpp/qps/qps_interarrival_test.cc @@ -42,21 +42,21 @@ using grpc::testing::ExpDist; using grpc::testing::InterarrivalTimer; -void RunTest(InterarrivalTimer&& timer, std::string title) { - gpr_histogram *h(gpr_histogram_create(0.01,60e9)); - - for (int i=0; i<10000000; i++) { - for (int j=0; j<5; j++) { +void RunTest(InterarrivalTimer &&timer, std::string title) { + gpr_histogram *h(gpr_histogram_create(0.01, 60e9)); + + for (int i = 0; i < 10000000; i++) { + for (int j = 0; j < 5; j++) { gpr_histogram_add(h, timer(j).count()); } } - - std::cout << title << " Distribution" << std::endl; + + std::cout << title << " Distribution" << std::endl; std::cout << "Value, Percentile" << std::endl; for (double pct = 0.0; pct < 100.0; pct += 1.0) { std::cout << gpr_histogram_percentile(h, pct) << "," << pct << std::endl; } - + gpr_histogram_destroy(h); } @@ -68,9 +68,9 @@ using grpc::testing::ParetoDist; int main(int argc, char **argv) { RunTest(InterarrivalTimer(ExpDist(10.0), 5), std::string("Exponential(10)")); RunTest(InterarrivalTimer(DetDist(5.0), 5), std::string("Det(5)")); - RunTest(InterarrivalTimer(UniformDist(0.0,10.0), 5), + RunTest(InterarrivalTimer(UniformDist(0.0, 10.0), 5), std::string("Uniform(1,10)")); - RunTest(InterarrivalTimer(ParetoDist(1.0,1.0), 5), + RunTest(InterarrivalTimer(ParetoDist(1.0, 1.0), 5), std::string("Pareto(1,1)")); return 0; diff --git a/test/cpp/qps/qpstest.proto b/test/cpp/qps/qpstest.proto index b8661f3f4fa..d977c9b48b0 100644 --- a/test/cpp/qps/qpstest.proto +++ b/test/cpp/qps/qpstest.proto @@ -36,7 +36,7 @@ package grpc.testing; enum PayloadType { // Compressable text format. - COMPRESSABLE= 1; + COMPRESSABLE = 1; // Uncompressable binary format. UNCOMPRESSABLE = 2; @@ -130,20 +130,21 @@ message LoadParams { message ClientConfig { repeated string server_targets = 1; required ClientType client_type = 2; - optional bool enable_ssl = 3 [default=false]; + optional bool enable_ssl = 3 [default = false]; required int32 outstanding_rpcs_per_channel = 4; required int32 client_channels = 5; required int32 payload_size = 6; // only for async client: optional int32 async_client_threads = 7; - optional RpcType rpc_type = 8 [default=UNARY]; + optional RpcType rpc_type = 8 [default = UNARY]; optional string host = 9; - optional LoadType load_type = 10 [default=CLOSED_LOOP]; + optional LoadType load_type = 10 [default = CLOSED_LOOP]; optional LoadParams load_params = 11; } // Request current stats -message Mark {} +message Mark { +} message ClientArgs { oneof argtype { @@ -165,8 +166,8 @@ message ClientStatus { message ServerConfig { required ServerType server_type = 1; - optional int32 threads = 2 [default=1]; - optional bool enable_ssl = 3 [default=false]; + optional int32 threads = 2 [default = 1]; + optional bool enable_ssl = 3 [default = false]; optional string host = 4; } @@ -185,11 +186,11 @@ message ServerStatus { message SimpleRequest { // Desired payload type in the response from the server. // If response_type is RANDOM, server randomly chooses one from other formats. - optional PayloadType response_type = 1 [default=COMPRESSABLE]; + optional PayloadType response_type = 1 [default = COMPRESSABLE]; // Desired payload size in the response from the server. // If response_type is COMPRESSABLE, this denotes the size before compression. - optional int32 response_size = 2 [default=0]; + optional int32 response_size = 2 [default = 0]; // Optional input payload sent along with the request. optional Payload payload = 3;