diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index 28cd32a1974..1c4f46328f9 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -41,6 +41,7 @@
 #include <condition_variable>
 #include <mutex>
+#include <grpc/support/log.h>
 
 namespace grpc {
 
@@ -67,10 +68,12 @@ typedef std::chrono::time_point<grpc_time_source> grpc_time;
 
 class Client {
  public:
   explicit Client(const ClientConfig& config)
-      : timer_(new Timer), interarrival_timer_() {
+      : channels_(config.client_channels()),
+        timer_(new Timer),
+        interarrival_timer_() {
     for (int i = 0; i < config.client_channels(); i++) {
-      channels_.push_back(ClientChannelInfo(
-          config.server_targets(i % config.server_targets_size()), config));
+      channels_[i].init(config.server_targets(i % config.server_targets_size()),
+                        config);
     }
     request_.set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
     request_.set_response_size(config.payload_size());
@@ -79,7 +82,8 @@ class Client {
 
   ClientStats Mark() {
     Histogram latencies;
-    std::vector<Histogram> to_merge(threads_.size());
+    // avoid std::vector for old compilers that expect a copy constructor
+    Histogram* to_merge = new Histogram[threads_.size()];
     for (size_t i = 0; i < threads_.size(); i++) {
       threads_[i]->BeginSwap(&to_merge[i]);
     }
@@ -89,6 +93,7 @@
       threads_[i]->EndSwap();
       latencies.Merge(&to_merge[i]);
     }
+    delete[] to_merge;
 
     auto timer_result = timer->Mark();
 
@@ -106,9 +111,20 @@
   class ClientChannelInfo {
    public:
-    ClientChannelInfo(const grpc::string& target, const ClientConfig& config)
-        : channel_(CreateTestChannel(target, config.enable_ssl())),
-          stub_(TestService::NewStub(channel_)) {}
+    ClientChannelInfo() {}
+    ClientChannelInfo(const ClientChannelInfo& i) {
+      // The copy constructor is to satisfy old compilers
+      // that need it for using std::vector<ClientChannelInfo>. It is only ever
+      // used for empty entries
+      GPR_ASSERT(!i.channel_ && !i.stub_);
+    }
+    void init(const grpc::string& target, const ClientConfig& config) {
+      // We have to use a 2-phase init like this with a default
+      // constructor followed by an initializer function to make
+      // old compilers happy with using this in std::vector
+      channel_ = CreateTestChannel(target, config.enable_ssl());
+      stub_ = TestService::NewStub(channel_);
+    }
     ChannelInterface* get_channel() { return channel_.get(); }
     TestService::Stub* get_stub() { return stub_.get(); }
 
@@ -189,27 +205,9 @@ class Client {
     Thread(Client* client, size_t idx)
         : done_(false),
           new_(nullptr),
-          impl_([this, idx, client]() {
-            for (;;) {
-              // run the loop body
-              bool thread_still_ok = client->ThreadFunc(&histogram_, idx);
-              // lock, see if we're done
-              std::lock_guard<std::mutex> g(mu_);
-              if (!thread_still_ok) {
-                gpr_log(GPR_ERROR, "Finishing client thread due to RPC error");
-                done_ = true;
-              }
-              if (done_) {
-                return;
-              }
-              // check if we're marking, swap out the histogram if so
-              if (new_) {
-                new_->Swap(&histogram_);
-                new_ = nullptr;
-                cv_.notify_one();
-              }
-            }
-          }) {}
+          client_(client),
+          idx_(idx),
+          impl_(&Thread::ThreadFunc, this) {}
 
     ~Thread() {
       {
@@ -226,13 +224,37 @@
 
     void EndSwap() {
       std::unique_lock<std::mutex> g(mu_);
-      cv_.wait(g, [this]() { return new_ == nullptr; });
+      while (new_ != nullptr) {
+        cv_.wait(g);
+      }
     }
 
    private:
     Thread(const Thread&);
     Thread& operator=(const Thread&);
 
+    void ThreadFunc() {
+      for (;;) {
+        // run the loop body
+        const bool thread_still_ok = client_->ThreadFunc(&histogram_, idx_);
+        // lock, see if we're done
+        std::lock_guard<std::mutex> g(mu_);
+        if (!thread_still_ok) {
+          gpr_log(GPR_ERROR, "Finishing client thread due to RPC error");
+          done_ = true;
+        }
+        if (done_) {
+          return;
+        }
+        // check if we're marking, swap out the histogram if so
+        if (new_) {
+          new_->Swap(&histogram_);
+          new_ = nullptr;
+          cv_.notify_one();
+        }
+      }
+    }
+
     TestService::Stub* stub_;
     ClientConfig config_;
     std::mutex mu_;
@@ -240,6 +262,8 @@ class Client {
     bool done_;
     Histogram* new_;
     Histogram histogram_;
+    Client* client_;
+    size_t idx_;
     std::thread impl_;
   };
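
A note on the client.h hunks: gcc-4.4's std::vector requires element types to be copy-constructible even when the program only ever default-constructs elements in place, which is what forces the construct-then-init() two-phase pattern for ClientChannelInfo. A minimal standalone sketch of the same workaround (Slot and main() here are illustrative, not part of the patch):

    #include <cassert>
    #include <memory>
    #include <vector>

    // Real resources are acquired in init(), not in the constructor, so the
    // copy constructor only ever has to deal with empty shells.
    class Slot {
     public:
      Slot() {}
      Slot(const Slot& other) { assert(!other.p_); }  // copies must be empty
      void init(int v) { p_.reset(new int(v)); }
      int value() const { return *p_; }

     private:
      std::unique_ptr<int> p_;
    };

    int main() {
      std::vector<Slot> slots(4);  // default-constructs 4 empty shells
      for (int i = 0; i < 4; i++) slots[i].init(i);  // second phase
      assert(slots[3].value() == 3);
      return 0;
    }
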
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index e1e44f9ac0d..a337610cbf7 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -156,7 +156,7 @@
               std::function<ClientRpcContext*(
                   int, TestService::Stub*, const SimpleRequest&)> setup_ctx)
       : Client(config),
-        channel_lock_(config.client_channels()),
+        channel_lock_(new std::mutex[config.client_channels()]),
         contexts_(config.client_channels()),
         max_outstanding_per_channel_(config.outstanding_rpcs_per_channel()),
         channel_count_(config.client_channels()),
@@ -208,6 +208,7 @@
         delete ctx;
       }
     }
+    delete[] channel_lock_;
   }
 
   bool ThreadFunc(Histogram* histogram,
@@ -316,23 +317,28 @@
   }
 
  private:
-  class boolean { // exists only to avoid data-race on vector<bool>
+  class boolean {  // exists only to avoid data-race on vector<bool>
    public:
-    boolean(): val_(false) {}
-    boolean(bool b): val_(b) {}
-    operator bool() const {return val_;}
-    boolean& operator=(bool b) {val_=b; return *this;}
+    boolean() : val_(false) {}
+    boolean(bool b) : val_(b) {}
+    operator bool() const { return val_; }
+    boolean& operator=(bool b) {
+      val_ = b;
+      return *this;
+    }
+
    private:
     bool val_;
   };
   std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
 
   std::vector<grpc_time> rpc_deadlines_;  // per thread deadlines
-  std::vector<int> next_channel_;       // per thread round-robin channel ctr
-  std::vector<boolean> issue_allowed_;  // may this thread attempt to issue
-  std::vector<grpc_time> next_issue_;   // when should it issue?
+  std::vector<int> next_channel_;         // per thread round-robin channel ctr
+  std::vector<boolean> issue_allowed_;    // may this thread attempt to issue
+  std::vector<grpc_time> next_issue_;     // when should it issue?
-  std::vector<std::mutex> channel_lock_;
+  std::mutex*
+      channel_lock_;  // a vector, but avoid std::vector for old compilers
  std::vector<std::deque<ClientRpcContext*>> contexts_;  // per-channel list of idle contexts
   int max_outstanding_per_channel_;
   int channel_count_;
@@ -348,15 +354,17 @@ class AsyncUnaryClient GRPC_FINAL : public AsyncClient {
   ~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); }
 
  private:
+  static void CheckDone(grpc::Status s, SimpleResponse* response) {}
+  static std::unique_ptr<grpc::ClientAsyncResponseReader<SimpleResponse>>
+  StartReq(TestService::Stub* stub, grpc::ClientContext* ctx,
+           const SimpleRequest& request, CompletionQueue* cq) {
+    return stub->AsyncUnaryCall(ctx, request, cq);
+  }
   static ClientRpcContext* SetupCtx(int channel_id, TestService::Stub* stub,
                                     const SimpleRequest& req) {
-    auto check_done = [](grpc::Status s, SimpleResponse* response) {};
-    auto start_req = [](TestService::Stub* stub, grpc::ClientContext* ctx,
-                        const SimpleRequest& request, CompletionQueue* cq) {
-      return stub->AsyncUnaryCall(ctx, request, cq);
-    };
     return new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
-        channel_id, stub, req, start_req, check_done);
+        channel_id, stub, req, AsyncUnaryClient::StartReq,
+        AsyncUnaryClient::CheckDone);
   }
 };
 
@@ -442,16 +450,19 @@ class AsyncStreamingClient GRPC_FINAL : public AsyncClient {
   ~AsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); }
 
  private:
+  static void CheckDone(grpc::Status s, SimpleResponse* response) {}
+  static std::unique_ptr<
+      grpc::ClientAsyncReaderWriter<SimpleRequest, SimpleResponse>>
+  StartReq(TestService::Stub* stub, grpc::ClientContext* ctx,
+           CompletionQueue* cq, void* tag) {
+    auto stream = stub->AsyncStreamingCall(ctx, cq, tag);
+    return stream;
+  }
   static ClientRpcContext* SetupCtx(int channel_id, TestService::Stub* stub,
                                     const SimpleRequest& req) {
-    auto check_done = [](grpc::Status s, SimpleResponse* response) {};
-    auto start_req = [](TestService::Stub* stub, grpc::ClientContext* ctx,
-                        CompletionQueue* cq, void* tag) {
-      auto stream = stub->AsyncStreamingCall(ctx, cq, tag);
-      return stream;
-    };
     return new ClientRpcContextStreamingImpl<SimpleRequest, SimpleResponse>(
-        channel_id, stub, req, start_req, check_done);
+        channel_id, stub, req, AsyncStreamingClient::StartReq,
+        AsyncStreamingClient::CheckDone);
   }
 };
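
Most of the client_async.cc rewrite is delambdafication: lambda support only arrived in gcc-4.5, so code that must also build on gcc-4.4 has to pass named functions (or functors) wherever a lambda literal used to be handed to a std::function parameter. The substitution in isolation, with illustrative names:

    #include <functional>
    #include <iostream>

    // Was: auto doubler = [](int x) { return x * 2; };
    static int Double(int x) { return x * 2; }

    int main() {
      std::function<int(int)> doubler = Double;  // function pointer binds fine
      std::cout << doubler(21) << std::endl;     // prints 42
      return 0;
    }
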
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc
index 718698bfe1d..db5416a707e 100644
--- a/test/cpp/qps/client_sync.cc
+++ b/test/cpp/qps/client_sync.cc
@@ -45,8 +45,9 @@
 #include <chrono>
 #include <memory>
 #include <mutex>
-#include <thread>
 #include <vector>
+#include <grpc/support/time.h>
+#include <grpc++/time.h>
 #include <grpc/grpc.h>
 #include <grpc/support/histogram.h>
 #include <grpc/support/log.h>
@@ -79,7 +80,9 @@ class SynchronousClient : public Client {
   void WaitToIssue(int thread_idx) {
     grpc_time next_time;
     if (NextIssueTime(thread_idx, &next_time)) {
-      std::this_thread::sleep_until(next_time);
+      gpr_timespec next_timespec;
+      TimepointHR2Timespec(next_time, &next_timespec);
+      gpr_sleep_until(next_timespec);
     }
   }
 
@@ -110,9 +113,10 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient {
 class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient {
  public:
   SynchronousStreamingClient(const ClientConfig& config)
-      : SynchronousClient(config),
-        context_(num_threads_),
-        stream_(num_threads_) {
+      : SynchronousClient(config) {
+    context_ = new grpc::ClientContext[num_threads_];
+    stream_ = new std::unique_ptr<
+        grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>>[num_threads_];
     for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) {
       auto* stub = channels_[thread_idx % channels_.size()].get_stub();
       stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
@@ -121,12 +125,15 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient {
   }
 
   ~SynchronousStreamingClient() {
     EndThreads();
-    for (auto stream = stream_.begin(); stream != stream_.end(); stream++) {
+    for (auto stream = &stream_[0]; stream != &stream_[num_threads_];
+         stream++) {
       if (*stream) {
         (*stream)->WritesDone();
         EXPECT_TRUE((*stream)->Finish().ok());
       }
     }
+    delete[] stream_;
+    delete[] context_;
   }
 
   bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE {
@@ -141,9 +148,11 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient {
   }
 
  private:
-  std::vector<grpc::ClientContext> context_;
-  std::vector<std::unique_ptr<grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>>> stream_;
+  // These are both conceptually std::vector but cannot be for old compilers
+  // that expect contained classes to support copy constructors
+  grpc::ClientContext* context_;
+  std::unique_ptr<grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>>*
+      stream_;
 };
 
 std::unique_ptr<Client> CreateSynchronousUnaryClient(
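
gpr_sleep_until replaces std::this_thread::sleep_until because, as the comments in this patch note, some compilers of that era shipped <thread> without a usable this_thread. Outside gRPC, the same absolute-deadline sleep can be written against POSIX directly; a sketch that assumes clock_nanosleep is available (e.g. glibc/Linux):

    #include <chrono>
    #include <time.h>

    // Sleep until an absolute std::chrono deadline without std::this_thread.
    static void SleepUntil(std::chrono::system_clock::time_point tp) {
      const std::chrono::nanoseconds ns =
          std::chrono::duration_cast<std::chrono::nanoseconds>(
              tp.time_since_epoch());
      timespec ts;
      ts.tv_sec = static_cast<time_t>(ns.count() / 1000000000);
      ts.tv_nsec = static_cast<long>(ns.count() % 1000000000);
      clock_nanosleep(CLOCK_REALTIME, TIMER_ABSTIME, &ts, nullptr);
    }

    int main() {
      SleepUntil(std::chrono::system_clock::now() +
                 std::chrono::milliseconds(10));
      return 0;
    }
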
diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc
index a0360295e09..78e37209382 100644
--- a/test/cpp/qps/driver.cc
+++ b/test/cpp/qps/driver.cc
@@ -77,16 +77,34 @@ static deque<string> get_hosts(const string& name) {
   }
 }
 
+// Namespace for classes and functions used only in RunScenario
+// Using this rather than local definitions to workaround gcc-4.4 limitations
+// regarding using templates without linkage
+namespace runsc {
+
+// ClientContext allocator
+static ClientContext* AllocContext(list<ClientContext>* contexts) {
+  contexts->emplace_back();
+  return &contexts->back();
+}
+
+struct ServerData {
+  unique_ptr<Worker::Stub> stub;
+  unique_ptr<ClientReaderWriter<ServerArgs, ServerStatus>> stream;
+};
+
+struct ClientData {
+  unique_ptr<Worker::Stub> stub;
+  unique_ptr<ClientReaderWriter<ClientArgs, ClientStatus>> stream;
+};
+}  // namespace runsc
+
 std::unique_ptr<ScenarioResult> RunScenario(
     const ClientConfig& initial_client_config, size_t num_clients,
     const ServerConfig& server_config, size_t num_servers, int warmup_seconds,
     int benchmark_seconds, int spawn_local_worker_count) {
-  // ClientContext allocator (all are destroyed at scope exit)
+  // ClientContext allocations (all are destroyed at scope exit)
   list<ClientContext> contexts;
-  auto alloc_context = [&contexts]() {
-    contexts.emplace_back();
-    return &contexts.back();
-  };
 
   // To be added to the result, containing the final configuration used for
   // client and config (including host, etc.)
@@ -131,23 +149,22 @@ std::unique_ptr<ScenarioResult> RunScenario(
   workers.resize(num_clients + num_servers);
 
   // Start servers
-  struct ServerData {
-    unique_ptr<Worker::Stub> stub;
-    unique_ptr<ClientReaderWriter<ServerArgs, ServerStatus>> stream;
-  };
-  vector<ServerData> servers;
+  using runsc::ServerData;
+  // servers is array rather than std::vector to avoid gcc-4.4 issues
+  // where class contained in std::vector must have a copy constructor
+  auto* servers = new ServerData[num_servers];
   for (size_t i = 0; i < num_servers; i++) {
-    ServerData sd;
-    sd.stub = std::move(Worker::NewStub(
+    servers[i].stub = std::move(Worker::NewStub(
         CreateChannel(workers[i], InsecureCredentials(), ChannelArguments())));
     ServerArgs args;
     result_server_config = server_config;
     result_server_config.set_host(workers[i]);
     *args.mutable_setup() = server_config;
-    sd.stream = std::move(sd.stub->RunServer(alloc_context()));
-    GPR_ASSERT(sd.stream->Write(args));
+    servers[i].stream =
+        std::move(servers[i].stub->RunServer(runsc::AllocContext(&contexts)));
+    GPR_ASSERT(servers[i].stream->Write(args));
     ServerStatus init_status;
-    GPR_ASSERT(sd.stream->Read(&init_status));
+    GPR_ASSERT(servers[i].stream->Read(&init_status));
     char* host;
     char* driver_port;
     char* cli_target;
@@ -157,30 +174,25 @@ std::unique_ptr<ScenarioResult> RunScenario(
     gpr_free(host);
     gpr_free(driver_port);
     gpr_free(cli_target);
-
-    servers.push_back(std::move(sd));
   }
 
   // Start clients
-  struct ClientData {
-    unique_ptr<Worker::Stub> stub;
-    unique_ptr<ClientReaderWriter<ClientArgs, ClientStatus>> stream;
-  };
-  vector<ClientData> clients;
+  using runsc::ClientData;
+  // clients is array rather than std::vector to avoid gcc-4.4 issues
+  // where class contained in std::vector must have a copy constructor
+  auto* clients = new ClientData[num_clients];
   for (size_t i = 0; i < num_clients; i++) {
-    ClientData cd;
-    cd.stub = std::move(Worker::NewStub(CreateChannel(
+    clients[i].stub = std::move(Worker::NewStub(CreateChannel(
         workers[i + num_servers], InsecureCredentials(), ChannelArguments())));
     ClientArgs args;
     result_client_config = client_config;
     result_client_config.set_host(workers[i + num_servers]);
     *args.mutable_setup() = client_config;
-    cd.stream = std::move(cd.stub->RunTest(alloc_context()));
-    GPR_ASSERT(cd.stream->Write(args));
+    clients[i].stream =
+        std::move(clients[i].stub->RunTest(runsc::AllocContext(&contexts)));
+    GPR_ASSERT(clients[i].stream->Write(args));
     ClientStatus init_status;
-    GPR_ASSERT(cd.stream->Read(&init_status));
-
-    clients.push_back(std::move(cd));
+    GPR_ASSERT(clients[i].stream->Read(&init_status));
   }
 
   // Let everything warmup
@@ -195,23 +207,25 @@ std::unique_ptr<ScenarioResult> RunScenario(
   server_mark.mutable_mark();
   ClientArgs client_mark;
   client_mark.mutable_mark();
-  for (auto server = servers.begin(); server != servers.end(); server++) {
+  for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
     GPR_ASSERT(server->stream->Write(server_mark));
   }
-  for (auto client = clients.begin(); client != clients.end(); client++) {
+  for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
     GPR_ASSERT(client->stream->Write(client_mark));
   }
   ServerStatus server_status;
   ClientStatus client_status;
-  for (auto server = servers.begin(); server != servers.end(); server++) {
+  for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
     GPR_ASSERT(server->stream->Read(&server_status));
   }
-  for (auto client = clients.begin(); client != clients.end(); client++) {
+  for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
     GPR_ASSERT(client->stream->Read(&client_status));
   }
 
   // Wait some time
   gpr_log(GPR_INFO, "Running");
+  // Use gpr_sleep_until rather than this_thread::sleep_until to support
+  // compilers that don't work with this_thread
   gpr_sleep_until(gpr_time_add(
       start, gpr_time_from_seconds(benchmark_seconds, GPR_TIMESPAN)));
 
@@ -220,34 +234,36 @@ std::unique_ptr<ScenarioResult> RunScenario(
   result->client_config = result_client_config;
   result->server_config = result_server_config;
   gpr_log(GPR_INFO, "Finishing");
-  for (auto server = servers.begin(); server != servers.end(); server++) {
+  for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
     GPR_ASSERT(server->stream->Write(server_mark));
   }
-  for (auto client = clients.begin(); client != clients.end(); client++) {
+  for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
     GPR_ASSERT(client->stream->Write(client_mark));
   }
-  for (auto server = servers.begin(); server != servers.end(); server++) {
+  for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
     GPR_ASSERT(server->stream->Read(&server_status));
     const auto& stats = server_status.stats();
-    result->server_resources.push_back(ResourceUsage{
-        stats.time_elapsed(), stats.time_user(), stats.time_system()});
+    result->server_resources.emplace_back(
+        stats.time_elapsed(), stats.time_user(), stats.time_system());
   }
-  for (auto client = clients.begin(); client != clients.end(); client++) {
+  for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
     GPR_ASSERT(client->stream->Read(&client_status));
     const auto& stats = client_status.stats();
     result->latencies.MergeProto(stats.latencies());
-    result->client_resources.push_back(ResourceUsage{
-        stats.time_elapsed(), stats.time_user(), stats.time_system()});
+    result->client_resources.emplace_back(
+        stats.time_elapsed(), stats.time_user(), stats.time_system());
   }
-  for (auto client = clients.begin(); client != clients.end(); client++) {
+  for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
     GPR_ASSERT(client->stream->WritesDone());
     GPR_ASSERT(client->stream->Finish().ok());
   }
-  for (auto server = servers.begin(); server != servers.end(); server++) {
+  for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
     GPR_ASSERT(server->stream->WritesDone());
     GPR_ASSERT(server->stream->Finish().ok());
   }
+  delete[] clients;
+  delete[] servers;
   return result;
 }
 
 }  // namespace testing
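
The runsc namespace exists because C++03, and therefore gcc-4.4, rejects types with no linkage (such as structs declared inside a function) as template arguments, so a function-local ServerData could not be used to instantiate templates like unique_ptr or NewStub's return types. A compressed illustration of the rule (detail::Point and Local are illustrative names):

    #include <vector>

    namespace detail {
    struct Point { int x; int y; };  // namespace scope: has linkage
    }

    int main() {
      // struct Local { int x; int y; };
      // std::vector<Local> v1;  // ill-formed in C++03: Local has no linkage
      std::vector<detail::Point> v2;  // fine on both old and new compilers
      v2.push_back(detail::Point());
      return static_cast<int>(v2.size()) - 1;  // exits 0
    }
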
diff --git a/test/cpp/qps/driver.h b/test/cpp/qps/driver.h
index 5e9d4b3cb92..9a29df8d494 100644
--- a/test/cpp/qps/driver.h
+++ b/test/cpp/qps/driver.h
@@ -41,10 +41,18 @@
 namespace grpc {
 namespace testing {
 
-struct ResourceUsage {
-  double wall_time;
-  double user_time;
-  double system_time;
+class ResourceUsage {
+ public:
+  ResourceUsage(double w, double u, double s)
+      : wall_time_(w), user_time_(u), system_time_(s) {}
+  double wall_time() const { return wall_time_; }
+  double user_time() const { return user_time_; }
+  double system_time() const { return system_time_; }
+
+ private:
+  double wall_time_;
+  double user_time_;
+  double system_time_;
 };
 
 struct ScenarioResult {
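
Giving ResourceUsage a real constructor is what lets driver.cc swap push_back(ResourceUsage{...}), which brace-initializes a temporary and is a C++11-ism, for emplace_back(w, u, s), which only needs a matching constructor. The pattern in isolation (Usage is an illustrative stand-in):

    #include <vector>

    class Usage {
     public:
      Usage(double w, double u, double s)
          : wall_(w), user_(u), system_(s) {}
      double wall() const { return wall_; }

     private:
      double wall_;
      double user_;
      double system_;
    };

    int main() {
      std::vector<Usage> v;
      v.emplace_back(10.0, 4.0, 1.0);  // constructed in place, no braced temp
      return v[0].wall() == 10.0 ? 0 : 1;
    }
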
diff --git a/test/cpp/qps/interarrival.h b/test/cpp/qps/interarrival.h
index f90a17a8945..04d14f689fc 100644
--- a/test/cpp/qps/interarrival.h
+++ b/test/cpp/qps/interarrival.h
@@ -36,7 +36,8 @@
 
 #include <chrono>
 #include <functional>
-#include <random>
+#include <cmath>
+#include <cstdlib>
 #include <vector>
 
@@ -141,17 +142,16 @@ class ParetoDist GRPC_FINAL : public RandomDist {
 // in an efficient re-entrant way. The random table is built at construction
 // time, and each call must include the thread id of the invoker
 
-typedef std::default_random_engine qps_random_engine;
-
 class InterarrivalTimer {
  public:
   InterarrivalTimer() {}
   void init(const RandomDist& r, int threads, int entries = 1000000) {
-    qps_random_engine gen;
-    std::uniform_real_distribution<double> uniform(0.0, 1.0);
     for (int i = 0; i < entries; i++) {
-      random_table_.push_back(std::chrono::nanoseconds(
-          static_cast<int64_t>(1e9 * r(uniform(gen)))));
+      // rand is the only choice that is portable across POSIX and Windows
+      // and that supports new and old compilers
+      const double uniform_0_1 = rand() / static_cast<double>(RAND_MAX);
+      random_table_.push_back(
+          std::chrono::nanoseconds(static_cast<int64_t>(1e9 * r(uniform_0_1))));
     }
     // Now set up the thread positions
     for (int i = 0; i < threads; i++) {
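
One subtlety in the rand()-based table fill: rand() and RAND_MAX are both int, so the quotient has to be forced into floating point (hence the static_cast in the hunk above) or integer division silently maps almost every draw to 0.0. A self-contained demonstration:

    #include <cassert>
    #include <cstdlib>

    int main() {
      srand(42);
      const int r = rand();
      const double truncated = r / RAND_MAX;  // int division: 0 unless r == RAND_MAX
      const double uniform = r / static_cast<double>(RAND_MAX);  // in [0, 1]
      assert(truncated == 0.0 || r == RAND_MAX);
      assert(uniform >= 0.0 && uniform <= 1.0);
      return 0;
    }
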
diff --git a/test/cpp/qps/qps_driver.cc b/test/cpp/qps/qps_driver.cc
index d5348463650..b1463be8f62 100644
--- a/test/cpp/qps/qps_driver.cc
+++ b/test/cpp/qps/qps_driver.cc
@@ -33,6 +33,7 @@
 
 #include <memory>
 #include <set>
+#include <grpc/support/log.h>
 #include <gflags/gflags.h>
 #include <grpc/grpc.h>
diff --git a/test/cpp/qps/report.cc b/test/cpp/qps/report.cc
index ff01ec15014..e03e8e1fb08 100644
--- a/test/cpp/qps/report.cc
+++ b/test/cpp/qps/report.cc
@@ -34,11 +34,16 @@
 #include "test/cpp/qps/report.h"
 
 #include <grpc/support/log.h>
+#include "test/cpp/qps/driver.h"
 #include "test/cpp/qps/stats.h"
 
 namespace grpc {
 namespace testing {
 
+static double WallTime(ResourceUsage u) { return u.wall_time(); }
+static double UserTime(ResourceUsage u) { return u.user_time(); }
+static double SystemTime(ResourceUsage u) { return u.system_time(); }
+
 void CompositeReporter::add(std::unique_ptr<Reporter> reporter) {
   reporters_.emplace_back(std::move(reporter));
 }
@@ -68,16 +73,14 @@ void CompositeReporter::ReportTimes(const ScenarioResult& result) {
 }
 
 void GprLogReporter::ReportQPS(const ScenarioResult& result) {
-  gpr_log(GPR_INFO, "QPS: %.1f",
-          result.latencies.Count() /
-              average(result.client_resources,
-                      [](ResourceUsage u) { return u.wall_time; }));
+  gpr_log(
+      GPR_INFO, "QPS: %.1f",
+      result.latencies.Count() / average(result.client_resources, WallTime));
 }
 
 void GprLogReporter::ReportQPSPerCore(const ScenarioResult& result) {
-  auto qps = result.latencies.Count() /
-             average(result.client_resources,
-                     [](ResourceUsage u) { return u.wall_time; });
+  auto qps =
+      result.latencies.Count() / average(result.client_resources, WallTime);
 
   gpr_log(GPR_INFO, "QPS: %.1f (%.1f/server core)", qps,
           qps / result.server_config.threads());
@@ -95,40 +98,30 @@ void GprLogReporter::ReportLatency(const ScenarioResult& result) {
 
 void GprLogReporter::ReportTimes(const ScenarioResult& result) {
   gpr_log(GPR_INFO, "Server system time: %.2f%%",
-          100.0 * sum(result.server_resources,
-                      [](ResourceUsage u) { return u.system_time; }) /
-              sum(result.server_resources,
-                  [](ResourceUsage u) { return u.wall_time; }));
+          100.0 * sum(result.server_resources, SystemTime) /
+              sum(result.server_resources, WallTime));
   gpr_log(GPR_INFO, "Server user time: %.2f%%",
-          100.0 * sum(result.server_resources,
-                      [](ResourceUsage u) { return u.user_time; }) /
-              sum(result.server_resources,
-                  [](ResourceUsage u) { return u.wall_time; }));
+          100.0 * sum(result.server_resources, UserTime) /
+              sum(result.server_resources, WallTime));
   gpr_log(GPR_INFO, "Client system time: %.2f%%",
-          100.0 * sum(result.client_resources,
-                      [](ResourceUsage u) { return u.system_time; }) /
-              sum(result.client_resources,
-                  [](ResourceUsage u) { return u.wall_time; }));
+          100.0 * sum(result.client_resources, SystemTime) /
+              sum(result.client_resources, WallTime));
   gpr_log(GPR_INFO, "Client user time: %.2f%%",
-          100.0 * sum(result.client_resources,
-                      [](ResourceUsage u) { return u.user_time; }) /
-              sum(result.client_resources,
-                  [](ResourceUsage u) { return u.wall_time; }));
+          100.0 * sum(result.client_resources, UserTime) /
+              sum(result.client_resources, WallTime));
 }
 
 void PerfDbReporter::ReportQPS(const ScenarioResult& result) {
-  auto qps = result.latencies.Count() /
-             average(result.client_resources,
-                     [](ResourceUsage u) { return u.wall_time; });
+  auto qps =
+      result.latencies.Count() / average(result.client_resources, WallTime);
 
   perf_db_client_.setQps(qps);
   perf_db_client_.setConfigs(result.client_config, result.server_config);
 }
 
 void PerfDbReporter::ReportQPSPerCore(const ScenarioResult& result) {
-  auto qps = result.latencies.Count() /
-             average(result.client_resources,
-                     [](ResourceUsage u) { return u.wall_time; });
+  auto qps =
+      result.latencies.Count() / average(result.client_resources, WallTime);
 
   auto qpsPerCore = qps / result.server_config.threads();
 
@@ -139,33 +132,29 @@ void PerfDbReporter::ReportLatency(const ScenarioResult& result) {
   perf_db_client_.setLatencies(result.latencies.Percentile(50) / 1000,
-                              result.latencies.Percentile(90) / 1000,
-                              result.latencies.Percentile(95) / 1000,
-                              result.latencies.Percentile(99) / 1000,
-                              result.latencies.Percentile(99.9) / 1000);
+                               result.latencies.Percentile(90) / 1000,
+                               result.latencies.Percentile(95) / 1000,
+                               result.latencies.Percentile(99) / 1000,
+                               result.latencies.Percentile(99.9) / 1000);
   perf_db_client_.setConfigs(result.client_config, result.server_config);
 }
 
 void PerfDbReporter::ReportTimes(const ScenarioResult& result) {
-  double server_system_time =
-      100.0 * sum(result.server_resources,
-                  [](ResourceUsage u) { return u.system_time; }) /
-      sum(result.server_resources, [](ResourceUsage u) { return u.wall_time; });
-  double server_user_time =
-      100.0 * sum(result.server_resources,
-                  [](ResourceUsage u) { return u.user_time; }) /
-      sum(result.server_resources, [](ResourceUsage u) { return u.wall_time; });
-  double client_system_time =
-      100.0 * sum(result.client_resources,
-                  [](ResourceUsage u) { return u.system_time; }) /
-      sum(result.client_resources, [](ResourceUsage u) { return u.wall_time; });
-  double client_user_time =
-      100.0 * sum(result.client_resources,
-                  [](ResourceUsage u) { return u.user_time; }) /
-      sum(result.client_resources, [](ResourceUsage u) { return u.wall_time; });
-
-  perf_db_client_.setTimes(server_system_time, server_user_time, client_system_time,
-                           client_user_time);
+  const double server_system_time = 100.0 *
+      sum(result.server_resources, SystemTime) /
+      sum(result.server_resources, WallTime);
+  const double server_user_time = 100.0 *
+      sum(result.server_resources, UserTime) /
+      sum(result.server_resources, WallTime);
+  const double client_system_time = 100.0 *
+      sum(result.client_resources, SystemTime) /
+      sum(result.client_resources, WallTime);
+  const double client_user_time = 100.0 *
+      sum(result.client_resources, UserTime) /
+      sum(result.client_resources, WallTime);
+
+  perf_db_client_.setTimes(server_system_time, server_user_time,
+                           client_system_time, client_user_time);
   perf_db_client_.setConfigs(result.client_config, result.server_config);
 }
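
The WallTime/UserTime/SystemTime functions work because the sum() and average() helpers in stats.h are templated on the callable, so a plain function pointer fits wherever a lambda previously did. A reduced model of that shape (sum, R, and Wall here are illustrative, not the stats.h definitions):

    #include <cstddef>
    #include <vector>

    // Templated on the callable: accepts lambdas, functors, or plain functions.
    template <class T, class F>
    double sum(const std::vector<T>& c, F f) {
      double total = 0.0;
      for (std::size_t i = 0; i < c.size(); i++) total += f(c[i]);
      return total;
    }

    struct R { double wall; };
    static double Wall(R r) { return r.wall; }

    int main() {
      std::vector<R> v(3);
      v[0].wall = 1.0; v[1].wall = 2.0; v[2].wall = 3.0;
      return sum(v, Wall) == 6.0 ? 0 : 1;  // exits 0
    }
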
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc
index 33b6fa55c38..b4fc49c31c3 100644
--- a/test/cpp/qps/server_async.cc
+++ b/test/cpp/qps/server_async.cc
@@ -99,25 +99,7 @@ class AsyncQpsServerTest : public Server {
       shutdown_state_.emplace_back(new PerThreadShutdownState());
     }
     for (int i = 0; i < config.threads(); i++) {
-      threads_.push_back(std::thread([=]() {
-        // Wait until work is available or we are shutting down
-        bool ok;
-        void *got_tag;
-        while (srv_cqs_[i]->Next(&got_tag, &ok)) {
-          ServerRpcContext *ctx = detag(got_tag);
-          // The tag is a pointer to an RPC context to invoke
-          bool still_going = ctx->RunNextState(ok);
-          if (!shutdown_state_[i]->shutdown()) {
-            // this RPC context is done, so refresh it
-            if (!still_going) {
-              ctx->Reset();
-            }
-          } else {
-            return;
-          }
-        }
-        return;
-      }));
+      threads_.emplace_back(&AsyncQpsServerTest::ThreadFunc, this, i);
     }
   }
   ~AsyncQpsServerTest() {
@@ -142,6 +124,26 @@ class AsyncQpsServerTest : public Server {
   }
 
  private:
+  void ThreadFunc(int rank) {
+    // Wait until work is available or we are shutting down
+    bool ok;
+    void *got_tag;
+    while (srv_cqs_[rank]->Next(&got_tag, &ok)) {
+      ServerRpcContext *ctx = detag(got_tag);
+      // The tag is a pointer to an RPC context to invoke
+      const bool still_going = ctx->RunNextState(ok);
+      if (!shutdown_state_[rank]->shutdown()) {
+        // this RPC context is done, so refresh it
+        if (!still_going) {
+          ctx->Reset();
+        }
+      } else {
+        return;
+      }
+    }
+    return;
+  }
+
   class ServerRpcContext {
    public:
     ServerRpcContext() {}
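
The server-side change mirrors the client: std::thread can be constructed directly from a pointer-to-member, an object pointer, and trailing arguments, so no lambda is required. A minimal sketch of that constructor form (Pool is an illustrative name):

    #include <iostream>
    #include <thread>
    #include <vector>

    class Pool {
     public:
      void Start(int n) {
        for (int i = 0; i < n; i++) {
          // Same effect as a capturing lambda, without requiring one
          threads_.emplace_back(&Pool::ThreadFunc, this, i);
        }
      }
      void Join() {
        for (std::size_t i = 0; i < threads_.size(); i++) threads_[i].join();
      }

     private:
      void ThreadFunc(int rank) { std::cout << "worker " << rank << "\n"; }
      std::vector<std::thread> threads_;
    };

    int main() {
      Pool p;
      p.Start(2);
      p.Join();
      return 0;
    }
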