Merge pull request #2771 from vjpai/asklipion2

Remove C++11 dependencies in QPS test that break old compilers
David G. Quintas 9 years ago
commit 46e786ba60
Files changed (9):

  1. test/cpp/qps/client.h (82)
  2. test/cpp/qps/client_async.cc (57)
  3. test/cpp/qps/client_sync.cc (27)
  4. test/cpp/qps/driver.cc (102)
  5. test/cpp/qps/driver.h (16)
  6. test/cpp/qps/interarrival.h (14)
  7. test/cpp/qps/qps_driver.cc (1)
  8. test/cpp/qps/report.cc (93)
  9. test/cpp/qps/server_async.cc (40)

test/cpp/qps/client.h

@@ -41,6 +41,7 @@
 #include <condition_variable>
 #include <mutex>
+#include <grpc++/config.h>

 namespace grpc {
@@ -67,10 +68,12 @@ typedef std::chrono::time_point<grpc_time_source> grpc_time;
 class Client {
  public:
   explicit Client(const ClientConfig& config)
-      : timer_(new Timer), interarrival_timer_() {
+      : channels_(config.client_channels()),
+        timer_(new Timer),
+        interarrival_timer_() {
     for (int i = 0; i < config.client_channels(); i++) {
-      channels_.push_back(ClientChannelInfo(
-          config.server_targets(i % config.server_targets_size()), config));
+      channels_[i].init(config.server_targets(i % config.server_targets_size()),
+                        config);
     }
     request_.set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
     request_.set_response_size(config.payload_size());
@@ -79,7 +82,8 @@ class Client {
   ClientStats Mark() {
     Histogram latencies;
-    std::vector<Histogram> to_merge(threads_.size());
+    // avoid std::vector for old compilers that expect a copy constructor
+    Histogram* to_merge = new Histogram[threads_.size()];
     for (size_t i = 0; i < threads_.size(); i++) {
       threads_[i]->BeginSwap(&to_merge[i]);
     }
@@ -89,6 +93,7 @@ class Client {
       threads_[i]->EndSwap();
       latencies.Merge(&to_merge[i]);
     }
+    delete[] to_merge;

     auto timer_result = timer->Mark();
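Editor's note on the hunk above: C++03 std::vector requires CopyConstructible elements even for `vector<T> v(n)`, and the QPS Histogram type is not copyable, hence the raw new[]/delete[]. A minimal sketch of the constraint, with Histogram as a stand-in for the real test/cpp/qps type:

    // Why vector<Histogram> fails on a C++03 library: the element type
    // must be copyable, but Histogram owns state and forbids copies.
    #include <cstddef>

    class Histogram {
     public:
      Histogram() : buckets_(new int[64]()) {}
      ~Histogram() { delete[] buckets_; }
     private:
      Histogram(const Histogram&);             // non-copyable: vector<Histogram> v(n) fails in C++03
      Histogram& operator=(const Histogram&);
      int* buckets_;
    };

    void Mark(std::size_t n) {
      Histogram* to_merge = new Histogram[n];  // only needs a default constructor
      // ... hand &to_merge[i] to each thread, merge the results ...
      delete[] to_merge;                       // manual cleanup replaces vector's RAII
    }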
@@ -106,9 +111,20 @@ class Client {
   class ClientChannelInfo {
    public:
-    ClientChannelInfo(const grpc::string& target, const ClientConfig& config)
-        : channel_(CreateTestChannel(target, config.enable_ssl())),
-          stub_(TestService::NewStub(channel_)) {}
+    ClientChannelInfo() {}
+    ClientChannelInfo(const ClientChannelInfo& i) {
+      // The copy constructor is to satisfy old compilers
+      // that need it for using std::vector. It is only ever
+      // used for empty entries
+      GPR_ASSERT(!i.channel_ && !i.stub_);
+    }
+    void init(const grpc::string& target, const ClientConfig& config) {
+      // We have to use a 2-phase init like this with a default
+      // constructor followed by an initializer function to make
+      // old compilers happy with using this in std::vector
+      channel_ = CreateTestChannel(target, config.enable_ssl());
+      stub_ = TestService::NewStub(channel_);
+    }
     ChannelInterface* get_channel() { return channel_.get(); }
     TestService::Stub* get_stub() { return stub_.get(); }
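The 2-phase init deserves a gloss: C++03 vector growth and `vector<T> v(n)` both copy elements, so the element type must be copyable; the trick is to keep the copy constructor trivial-but-asserting and do the real work in init(). A sketch of the pattern (Slot is a hypothetical stand-in):

    #include <cstddef>
    #include <string>
    #include <vector>

    class Slot {
     public:
      Slot() {}                  // phase 1: cheap default construction
      Slot(const Slot&) {}       // required by C++03 std::vector; only
                                 // ever invoked on empty slots
      void init(const std::string& target) { target_ = target; }  // phase 2
     private:
      std::string target_;
    };

    void Example() {
      std::vector<Slot> slots(4);          // copies a default-constructed Slot
      for (std::size_t i = 0; i < slots.size(); i++) {
        slots[i].init("localhost:50051");  // real initialization, in place
      }
    }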
@@ -189,27 +205,9 @@ class Client {
     Thread(Client* client, size_t idx)
         : done_(false),
           new_(nullptr),
-          impl_([this, idx, client]() {
-            for (;;) {
-              // run the loop body
-              bool thread_still_ok = client->ThreadFunc(&histogram_, idx);
-              // lock, see if we're done
-              std::lock_guard<std::mutex> g(mu_);
-              if (!thread_still_ok) {
-                gpr_log(GPR_ERROR, "Finishing client thread due to RPC error");
-                done_ = true;
-              }
-              if (done_) {
-                return;
-              }
-              // check if we're marking, swap out the histogram if so
-              if (new_) {
-                new_->Swap(&histogram_);
-                new_ = nullptr;
-                cv_.notify_one();
-              }
-            }
-          }) {}
+          client_(client),
+          idx_(idx),
+          impl_(&Thread::ThreadFunc, this) {}

     ~Thread() {
       {
@@ -226,13 +224,37 @@ class Client {
     void EndSwap() {
       std::unique_lock<std::mutex> g(mu_);
-      cv_.wait(g, [this]() { return new_ == nullptr; });
+      while (new_ != nullptr) {
+        cv_.wait(g);
+      };
     }

    private:
     Thread(const Thread&);
     Thread& operator=(const Thread&);

+    void ThreadFunc() {
+      for (;;) {
+        // run the loop body
+        const bool thread_still_ok = client_->ThreadFunc(&histogram_, idx_);
+        // lock, see if we're done
+        std::lock_guard<std::mutex> g(mu_);
+        if (!thread_still_ok) {
+          gpr_log(GPR_ERROR, "Finishing client thread due to RPC error");
+          done_ = true;
+        }
+        if (done_) {
+          return;
+        }
+        // check if we're marking, swap out the histogram if so
+        if (new_) {
+          new_->Swap(&histogram_);
+          new_ = nullptr;
+          cv_.notify_one();
+        }
+      }
+    }
+
     TestService::Stub* stub_;
     ClientConfig config_;
     std::mutex mu_;
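The EndSwap change swaps the predicate overload of condition_variable::wait for the classic loop. The two are behaviorally identical; the loop form just avoids handing a lambda to wait() and makes the spurious-wakeup handling explicit. Side by side:

    #include <condition_variable>
    #include <mutex>

    std::mutex mu;
    std::condition_variable cv;
    bool ready = false;

    void WaitPredicate() {             // C++11 style the diff removes
      std::unique_lock<std::mutex> g(mu);
      cv.wait(g, []() { return ready; });
    }

    void WaitLoop() {                  // loop style the diff adds
      std::unique_lock<std::mutex> g(mu);
      while (!ready) {                 // re-checks after spurious wakeups
        cv.wait(g);
      }
    }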
@@ -240,6 +262,8 @@ class Client {
     bool done_;
     Histogram* new_;
     Histogram histogram_;
+    Client* client_;
+    size_t idx_;
     std::thread impl_;
   };
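Worth noting why the Thread class gained client_ and idx_ members: std::thread's (pointer-to-member, object, args...) constructor replaces the capturing lambda, so the state the lambda used to capture has to live in the object itself. A minimal sketch:

    #include <thread>

    class Worker {
     public:
      Worker(int idx) : idx_(idx), impl_(&Worker::Run, this) {}
      ~Worker() { impl_.join(); }
     private:
      void Run() { /* loop body; reads idx_ */ }
      int idx_;
      std::thread impl_;  // declared last: the thread starts in the ctor,
                          // so members it reads must already be initialized
    };

Declaring impl_ last matters for exactly the reason the comment gives, which is why the diff adds client_ and idx_ above std::thread impl_ in the member list.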

@ -156,7 +156,7 @@ class AsyncClient : public Client {
std::function<ClientRpcContext*(int, TestService::Stub*, std::function<ClientRpcContext*(int, TestService::Stub*,
const SimpleRequest&)> setup_ctx) const SimpleRequest&)> setup_ctx)
: Client(config), : Client(config),
channel_lock_(config.client_channels()), channel_lock_(new std::mutex[config.client_channels()]),
contexts_(config.client_channels()), contexts_(config.client_channels()),
max_outstanding_per_channel_(config.outstanding_rpcs_per_channel()), max_outstanding_per_channel_(config.outstanding_rpcs_per_channel()),
channel_count_(config.client_channels()), channel_count_(config.client_channels()),
@@ -208,6 +208,7 @@ class AsyncClient : public Client {
         delete ctx;
       }
     }
+    delete[] channel_lock_;
   }

   bool ThreadFunc(Histogram* histogram,

@@ -316,23 +317,28 @@ class AsyncClient : public Client {
   }

  private:
   class boolean {  // exists only to avoid data-race on vector<bool>
    public:
-    boolean(): val_(false) {}
-    boolean(bool b): val_(b) {}
-    operator bool() const {return val_;}
-    boolean& operator=(bool b) {val_=b; return *this;}
+    boolean() : val_(false) {}
+    boolean(bool b) : val_(b) {}
+    operator bool() const { return val_; }
+    boolean& operator=(bool b) {
+      val_ = b;
+      return *this;
+    }

    private:
     bool val_;
   };

   std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;

   std::vector<deadline_list> rpc_deadlines_;  // per thread deadlines
   std::vector<int> next_channel_;       // per thread round-robin channel ctr
   std::vector<boolean> issue_allowed_;  // may this thread attempt to issue
   std::vector<grpc_time> next_issue_;   // when should it issue?

-  std::vector<std::mutex> channel_lock_;
+  std::mutex*
+      channel_lock_;  // a vector, but avoid std::vector for old compilers
   std::vector<context_list> contexts_;  // per-channel list of idle contexts
   int max_outstanding_per_channel_;
   int channel_count_;
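On the channel_lock_ change: std::mutex is non-copyable, so std::vector<std::mutex> cannot be built by a library whose vector copies elements; the heap array, paired with the delete[] added to the destructor above, is the workaround. Sketch:

    #include <mutex>

    class PerChannelLocks {
     public:
      explicit PerChannelLocks(int channels)
          : locks_(new std::mutex[channels]) {}  // default-constructs each mutex
      ~PerChannelLocks() { delete[] locks_; }    // mirrors the dtor change above
      std::mutex& lock(int channel) { return locks_[channel]; }
     private:
      std::mutex* locks_;
    };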
@@ -348,15 +354,17 @@ class AsyncUnaryClient GRPC_FINAL : public AsyncClient {
   ~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); }

  private:
+  static void CheckDone(grpc::Status s, SimpleResponse* response) {}
+  static std::unique_ptr<grpc::ClientAsyncResponseReader<SimpleResponse>>
+  StartReq(TestService::Stub* stub, grpc::ClientContext* ctx,
+           const SimpleRequest& request, CompletionQueue* cq) {
+    return stub->AsyncUnaryCall(ctx, request, cq);
+  };
   static ClientRpcContext* SetupCtx(int channel_id, TestService::Stub* stub,
                                     const SimpleRequest& req) {
-    auto check_done = [](grpc::Status s, SimpleResponse* response) {};
-    auto start_req = [](TestService::Stub* stub, grpc::ClientContext* ctx,
-                        const SimpleRequest& request, CompletionQueue* cq) {
-      return stub->AsyncUnaryCall(ctx, request, cq);
-    };
     return new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
-        channel_id, stub, req, start_req, check_done);
+        channel_id, stub, req, AsyncUnaryClient::StartReq,
+        AsyncUnaryClient::CheckDone);
   }
 };
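The lambda-to-static-function rewrite here (and in AsyncStreamingClient below) works because both lambdas were captureless: a captureless callable and a plain function with the same signature are interchangeable wherever a std::function or function pointer is expected. Minimal sketch, with a hypothetical Register() as the sink:

    #include <functional>

    // hypothetical sink that accepts any int(int) callable
    static void Register(std::function<int(int)> cb) { cb(41); }

    // C++11 spelling:  Register([](int x) { return x + 1; });
    // Old-compiler spelling, as in the diff:
    struct Callbacks {
      static int Incr(int x) { return x + 1; }
    };
    static void Setup() { Register(&Callbacks::Incr); }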
@@ -442,16 +450,19 @@ class AsyncStreamingClient GRPC_FINAL : public AsyncClient {
   ~AsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); }

  private:
+  static void CheckDone(grpc::Status s, SimpleResponse* response) {}
+  static std::unique_ptr<
+      grpc::ClientAsyncReaderWriter<SimpleRequest, SimpleResponse>>
+  StartReq(TestService::Stub* stub, grpc::ClientContext* ctx,
+           CompletionQueue* cq, void* tag) {
+    auto stream = stub->AsyncStreamingCall(ctx, cq, tag);
+    return stream;
+  };
   static ClientRpcContext* SetupCtx(int channel_id, TestService::Stub* stub,
                                     const SimpleRequest& req) {
-    auto check_done = [](grpc::Status s, SimpleResponse* response) {};
-    auto start_req = [](TestService::Stub* stub, grpc::ClientContext* ctx,
-                        CompletionQueue* cq, void* tag) {
-      auto stream = stub->AsyncStreamingCall(ctx, cq, tag);
-      return stream;
-    };
     return new ClientRpcContextStreamingImpl<SimpleRequest, SimpleResponse>(
-        channel_id, stub, req, start_req, check_done);
+        channel_id, stub, req, AsyncStreamingClient::StartReq,
+        AsyncStreamingClient::CheckDone);
   }
 };

test/cpp/qps/client_sync.cc

@@ -45,8 +45,9 @@
 #include <grpc/grpc.h>
 #include <grpc/support/alloc.h>
 #include <grpc/support/histogram.h>
-#include <grpc/support/log.h>
 #include <grpc/support/host_port.h>
+#include <grpc/support/log.h>
+#include <grpc/support/time.h>
 #include <gflags/gflags.h>
 #include <grpc++/client_context.h>
 #include <grpc++/server.h>
@@ -79,7 +80,9 @@ class SynchronousClient : public Client {
   void WaitToIssue(int thread_idx) {
     grpc_time next_time;
     if (NextIssueTime(thread_idx, &next_time)) {
-      std::this_thread::sleep_until(next_time);
+      gpr_timespec next_timespec;
+      TimepointHR2Timespec(next_time, &next_timespec);
+      gpr_sleep_until(next_timespec);
     }
   }
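WaitToIssue now sleeps via gpr_sleep_until instead of std::this_thread::sleep_until, with TimepointHR2Timespec (a QPS-test helper defined elsewhere in this tree) bridging the clock types. A hedged sketch of what such a conversion amounts to, using a stand-in struct rather than the real gpr_timespec:

    #include <chrono>
    #include <cstdint>

    struct Timespec {   // stand-in for gpr_timespec; field names assumed
      int64_t tv_sec;
      int32_t tv_nsec;
    };

    template <class TimePoint>
    Timespec ToTimespec(const TimePoint& tp) {
      const int64_t ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
                             tp.time_since_epoch()).count();
      Timespec ts;
      ts.tv_sec = ns / 1000000000;
      ts.tv_nsec = static_cast<int32_t>(ns % 1000000000);
      return ts;
    }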
@@ -110,9 +113,10 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient {
 class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient {
  public:
   SynchronousStreamingClient(const ClientConfig& config)
-      : SynchronousClient(config),
-        context_(num_threads_),
-        stream_(num_threads_) {
+      : SynchronousClient(config) {
+    context_ = new grpc::ClientContext[num_threads_];
+    stream_ = new std::unique_ptr<
+        grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>>[num_threads_];
     for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) {
       auto* stub = channels_[thread_idx % channels_.size()].get_stub();
       stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);

@@ -121,12 +125,15 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient {
   }

   ~SynchronousStreamingClient() {
     EndThreads();
-    for (auto stream = stream_.begin(); stream != stream_.end(); stream++) {
+    for (auto stream = &stream_[0]; stream != &stream_[num_threads_];
+         stream++) {
       if (*stream) {
         (*stream)->WritesDone();
         EXPECT_TRUE((*stream)->Finish().ok());
       }
     }
+    delete[] stream_;
+    delete[] context_;
   }

   bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE {
@@ -141,9 +148,11 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient {
   }

  private:
-  std::vector<grpc::ClientContext> context_;
-  std::vector<std::unique_ptr<
-      grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>>> stream_;
+  // These are both conceptually std::vector but cannot be for old compilers
+  // that expect contained classes to support copy constructors
+  grpc::ClientContext* context_;
+  std::unique_ptr<grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>>*
+      stream_;
 };

 std::unique_ptr<Client> CreateSynchronousUnaryClient(

test/cpp/qps/driver.cc

@@ -77,16 +77,34 @@ static deque<string> get_hosts(const string& name) {
   }
 }

+// Namespace for classes and functions used only in RunScenario
+// Using this rather than local definitions to workaround gcc-4.4 limitations
+// regarding using templates without linkage
+namespace runsc {
+
+// ClientContext allocator
+static ClientContext* AllocContext(list<ClientContext>* contexts) {
+  contexts->emplace_back();
+  return &contexts->back();
+}
+
+struct ServerData {
+  unique_ptr<Worker::Stub> stub;
+  unique_ptr<ClientReaderWriter<ServerArgs, ServerStatus>> stream;
+};
+
+struct ClientData {
+  unique_ptr<Worker::Stub> stub;
+  unique_ptr<ClientReaderWriter<ClientArgs, ClientStatus>> stream;
+};
+}  // namespace runsc
+
 std::unique_ptr<ScenarioResult> RunScenario(
     const ClientConfig& initial_client_config, size_t num_clients,
     const ServerConfig& server_config, size_t num_servers, int warmup_seconds,
     int benchmark_seconds, int spawn_local_worker_count) {
-  // ClientContext allocator (all are destroyed at scope exit)
+  // ClientContext allocations (all are destroyed at scope exit)
   list<ClientContext> contexts;
-  auto alloc_context = [&contexts]() {
-    contexts.emplace_back();
-    return &contexts.back();
-  };

   // To be added to the result, containing the final configuration used for
   // client and config (including host, etc.)
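The runsc namespace exists because C++03 ([temp.arg.type]) forbids types without linkage, i.e. local classes, as template arguments, and gcc-4.4 enforces that rule; moving ServerData/ClientData (and the allocator that replaces the alloc_context lambda) to namespace scope restores linkage. An illustration of the rule:

    #include <vector>

    namespace detail {
    struct Ok { int x; };            // namespace scope: has linkage
    }

    void Demo() {
      struct Local { int x; };       // local type: no linkage
      // std::vector<Local> bad;     // ill-formed in C++03 (e.g. gcc-4.4)
      std::vector<detail::Ok> good;  // fine
      good.push_back(detail::Ok());
    }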
@@ -131,23 +149,22 @@ std::unique_ptr<ScenarioResult> RunScenario(
   workers.resize(num_clients + num_servers);

   // Start servers
-  struct ServerData {
-    unique_ptr<Worker::Stub> stub;
-    unique_ptr<ClientReaderWriter<ServerArgs, ServerStatus>> stream;
-  };
-  vector<ServerData> servers;
+  using runsc::ServerData;
+  // servers is array rather than std::vector to avoid gcc-4.4 issues
+  // where class contained in std::vector must have a copy constructor
+  auto* servers = new ServerData[num_servers];
   for (size_t i = 0; i < num_servers; i++) {
-    ServerData sd;
-    sd.stub = std::move(Worker::NewStub(
+    servers[i].stub = std::move(Worker::NewStub(
         CreateChannel(workers[i], InsecureCredentials(), ChannelArguments())));
     ServerArgs args;
     result_server_config = server_config;
     result_server_config.set_host(workers[i]);
     *args.mutable_setup() = server_config;
-    sd.stream = std::move(sd.stub->RunServer(alloc_context()));
-    GPR_ASSERT(sd.stream->Write(args));
+    servers[i].stream =
+        std::move(servers[i].stub->RunServer(runsc::AllocContext(&contexts)));
+    GPR_ASSERT(servers[i].stream->Write(args));
     ServerStatus init_status;
-    GPR_ASSERT(sd.stream->Read(&init_status));
+    GPR_ASSERT(servers[i].stream->Read(&init_status));
     char* host;
     char* driver_port;
     char* cli_target;
@@ -157,30 +174,25 @@ std::unique_ptr<ScenarioResult> RunScenario(
     gpr_free(host);
     gpr_free(driver_port);
     gpr_free(cli_target);
-    servers.push_back(std::move(sd));
   }

   // Start clients
-  struct ClientData {
-    unique_ptr<Worker::Stub> stub;
-    unique_ptr<ClientReaderWriter<ClientArgs, ClientStatus>> stream;
-  };
-  vector<ClientData> clients;
+  using runsc::ClientData;
+  // clients is array rather than std::vector to avoid gcc-4.4 issues
+  // where class contained in std::vector must have a copy constructor
+  auto* clients = new ClientData[num_clients];
   for (size_t i = 0; i < num_clients; i++) {
-    ClientData cd;
-    cd.stub = std::move(Worker::NewStub(CreateChannel(
+    clients[i].stub = std::move(Worker::NewStub(CreateChannel(
         workers[i + num_servers], InsecureCredentials(), ChannelArguments())));
     ClientArgs args;
     result_client_config = client_config;
     result_client_config.set_host(workers[i + num_servers]);
     *args.mutable_setup() = client_config;
-    cd.stream = std::move(cd.stub->RunTest(alloc_context()));
-    GPR_ASSERT(cd.stream->Write(args));
+    clients[i].stream =
+        std::move(clients[i].stub->RunTest(runsc::AllocContext(&contexts)));
+    GPR_ASSERT(clients[i].stream->Write(args));
     ClientStatus init_status;
-    GPR_ASSERT(cd.stream->Read(&init_status));
-    clients.push_back(std::move(cd));
+    GPR_ASSERT(clients[i].stream->Read(&init_status));
   }

   // Let everything warmup
@@ -195,23 +207,25 @@ std::unique_ptr<ScenarioResult> RunScenario(
   server_mark.mutable_mark();
   ClientArgs client_mark;
   client_mark.mutable_mark();
-  for (auto server = servers.begin(); server != servers.end(); server++) {
+  for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
     GPR_ASSERT(server->stream->Write(server_mark));
   }
-  for (auto client = clients.begin(); client != clients.end(); client++) {
+  for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
     GPR_ASSERT(client->stream->Write(client_mark));
   }
   ServerStatus server_status;
   ClientStatus client_status;
-  for (auto server = servers.begin(); server != servers.end(); server++) {
+  for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
     GPR_ASSERT(server->stream->Read(&server_status));
   }
-  for (auto client = clients.begin(); client != clients.end(); client++) {
+  for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
     GPR_ASSERT(client->stream->Read(&client_status));
   }

   // Wait some time
   gpr_log(GPR_INFO, "Running");
+  // Use gpr_sleep_until rather than this_thread::sleep_until to support
+  // compilers that don't work with this_thread
   gpr_sleep_until(gpr_time_add(
       start, gpr_time_from_seconds(benchmark_seconds, GPR_TIMESPAN)));
@@ -220,34 +234,36 @@ std::unique_ptr<ScenarioResult> RunScenario(
   result->client_config = result_client_config;
   result->server_config = result_server_config;
   gpr_log(GPR_INFO, "Finishing");
-  for (auto server = servers.begin(); server != servers.end(); server++) {
+  for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
     GPR_ASSERT(server->stream->Write(server_mark));
   }
-  for (auto client = clients.begin(); client != clients.end(); client++) {
+  for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
     GPR_ASSERT(client->stream->Write(client_mark));
   }
-  for (auto server = servers.begin(); server != servers.end(); server++) {
+  for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
     GPR_ASSERT(server->stream->Read(&server_status));
     const auto& stats = server_status.stats();
-    result->server_resources.push_back(ResourceUsage{
-        stats.time_elapsed(), stats.time_user(), stats.time_system()});
+    result->server_resources.emplace_back(
+        stats.time_elapsed(), stats.time_user(), stats.time_system());
   }
-  for (auto client = clients.begin(); client != clients.end(); client++) {
+  for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
     GPR_ASSERT(client->stream->Read(&client_status));
     const auto& stats = client_status.stats();
     result->latencies.MergeProto(stats.latencies());
-    result->client_resources.push_back(ResourceUsage{
-        stats.time_elapsed(), stats.time_user(), stats.time_system()});
+    result->client_resources.emplace_back(
+        stats.time_elapsed(), stats.time_user(), stats.time_system());
   }
-  for (auto client = clients.begin(); client != clients.end(); client++) {
+  for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
     GPR_ASSERT(client->stream->WritesDone());
     GPR_ASSERT(client->stream->Finish().ok());
   }
-  for (auto server = servers.begin(); server != servers.end(); server++) {
+  for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
     GPR_ASSERT(server->stream->WritesDone());
     GPR_ASSERT(server->stream->Finish().ok());
   }
+  delete[] clients;
+  delete[] servers;
   return result;
 }

 }  // namespace testing

@ -41,10 +41,18 @@
namespace grpc { namespace grpc {
namespace testing { namespace testing {
struct ResourceUsage { class ResourceUsage {
double wall_time; public:
double user_time; ResourceUsage(double w, double u, double s)
double system_time; : wall_time_(w), user_time_(u), system_time_(s) {}
double wall_time() const { return wall_time_; }
double user_time() const { return user_time_; }
double system_time() const { return system_time_; }
private:
double wall_time_;
double user_time_;
double system_time_;
}; };
struct ScenarioResult { struct ScenarioResult {
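ResourceUsage turns from an aggregate into a class with a constructor because the old call sites brace-initialized it into push_back (ResourceUsage{...}), and brace list-initialization is C++11-only; with a real constructor, driver.cc can construct elements directly instead, as in this sketch:

    #include <vector>

    class ResourceUsage {
     public:
      ResourceUsage(double w, double u, double s)
          : wall_time_(w), user_time_(u), system_time_(s) {}
      double wall_time() const { return wall_time_; }
     private:
      double wall_time_, user_time_, system_time_;
    };

    void Record(std::vector<ResourceUsage>* out) {
      // constructs in place from plain arguments, matching the new
      // emplace_back calls in driver.cc
      out->emplace_back(1.5, 0.7, 0.2);
    }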

test/cpp/qps/interarrival.h

@@ -36,7 +36,8 @@
 #include <chrono>
 #include <cmath>
-#include <random>
+#include <cstdlib>
+#include <vector>

 #include <grpc++/config.h>

@@ -141,17 +142,16 @@ class ParetoDist GRPC_FINAL : public RandomDist {
 // in an efficient re-entrant way. The random table is built at construction
 // time, and each call must include the thread id of the invoker

-typedef std::default_random_engine qps_random_engine;
-
 class InterarrivalTimer {
  public:
   InterarrivalTimer() {}
   void init(const RandomDist& r, int threads, int entries = 1000000) {
-    qps_random_engine gen;
-    std::uniform_real_distribution<double> uniform(0.0, 1.0);
     for (int i = 0; i < entries; i++) {
-      random_table_.push_back(std::chrono::nanoseconds(
-          static_cast<int64_t>(1e9 * r(uniform(gen)))));
+      // rand is the only choice that is portable across POSIX and Windows
+      // and that supports new and old compilers
+      const double uniform_0_1 = rand() / RAND_MAX;
+      random_table_.push_back(
+          std::chrono::nanoseconds(static_cast<int64_t>(1e9 * r(uniform_0_1))));
     }
     // Now set up the thread positions
     for (int i = 0; i < threads; i++) {
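One pitfall worth flagging in the new code: rand() and RAND_MAX are both int, so `rand() / RAND_MAX` is integer division and evaluates to 0 for every draw except rand() == RAND_MAX, which would feed r() an essentially constant input. The intended uniform draw needs a floating-point divide:

    #include <cstdlib>

    double Uniform01() {
      // cast forces floating-point division; without it the quotient
      // truncates to 0 almost always
      return rand() / static_cast<double>(RAND_MAX);
    }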

test/cpp/qps/qps_driver.cc

@@ -33,6 +33,7 @@
 #include <memory>
 #include <set>
+#include <signal.h>

 #include <gflags/gflags.h>
 #include <grpc/support/log.h>

@ -34,11 +34,16 @@
#include "test/cpp/qps/report.h" #include "test/cpp/qps/report.h"
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include "test/cpp/qps/driver.h"
#include "test/cpp/qps/stats.h" #include "test/cpp/qps/stats.h"
namespace grpc { namespace grpc {
namespace testing { namespace testing {
static double WallTime(ResourceUsage u) { return u.wall_time(); }
static double UserTime(ResourceUsage u) { return u.user_time(); }
static double SystemTime(ResourceUsage u) { return u.system_time(); }
void CompositeReporter::add(std::unique_ptr<Reporter> reporter) { void CompositeReporter::add(std::unique_ptr<Reporter> reporter) {
reporters_.emplace_back(std::move(reporter)); reporters_.emplace_back(std::move(reporter));
} }
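The new WallTime/UserTime/SystemTime helpers can stand in for the old lambdas on the assumption that stats.h's sum() and average() are templated on the functor type, so a plain function pointer binds just as well as a lambda. A reduced sketch of that shape:

    #include <vector>

    // assumed shape of the stats.h helpers: generic over the functor type
    template <class Container, class F>
    double sum(const Container& c, F f) {
      double total = 0;
      for (typename Container::const_iterator it = c.begin(); it != c.end();
           ++it) {
        total += f(*it);
      }
      return total;
    }

    struct Usage { double wall; };
    static double WallTime(Usage u) { return u.wall; }

    double TotalWall(const std::vector<Usage>& v) { return sum(v, WallTime); }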
@@ -68,16 +73,14 @@ void CompositeReporter::ReportTimes(const ScenarioResult& result) {
 }

 void GprLogReporter::ReportQPS(const ScenarioResult& result) {
-  gpr_log(GPR_INFO, "QPS: %.1f",
-          result.latencies.Count() /
-              average(result.client_resources,
-                      [](ResourceUsage u) { return u.wall_time; }));
+  gpr_log(
+      GPR_INFO, "QPS: %.1f",
+      result.latencies.Count() / average(result.client_resources, WallTime));
 }

 void GprLogReporter::ReportQPSPerCore(const ScenarioResult& result) {
-  auto qps = result.latencies.Count() /
-             average(result.client_resources,
-                     [](ResourceUsage u) { return u.wall_time; });
+  auto qps =
+      result.latencies.Count() / average(result.client_resources, WallTime);

   gpr_log(GPR_INFO, "QPS: %.1f (%.1f/server core)", qps,
           qps / result.server_config.threads());
@@ -95,40 +98,30 @@ void GprLogReporter::ReportLatency(const ScenarioResult& result) {
 void GprLogReporter::ReportTimes(const ScenarioResult& result) {
   gpr_log(GPR_INFO, "Server system time: %.2f%%",
-          100.0 * sum(result.server_resources,
-                      [](ResourceUsage u) { return u.system_time; }) /
-              sum(result.server_resources,
-                  [](ResourceUsage u) { return u.wall_time; }));
+          100.0 * sum(result.server_resources, SystemTime) /
+              sum(result.server_resources, WallTime));
   gpr_log(GPR_INFO, "Server user time: %.2f%%",
-          100.0 * sum(result.server_resources,
-                      [](ResourceUsage u) { return u.user_time; }) /
-              sum(result.server_resources,
-                  [](ResourceUsage u) { return u.wall_time; }));
+          100.0 * sum(result.server_resources, UserTime) /
+              sum(result.server_resources, WallTime));
   gpr_log(GPR_INFO, "Client system time: %.2f%%",
-          100.0 * sum(result.client_resources,
-                      [](ResourceUsage u) { return u.system_time; }) /
-              sum(result.client_resources,
-                  [](ResourceUsage u) { return u.wall_time; }));
+          100.0 * sum(result.client_resources, SystemTime) /
+              sum(result.client_resources, WallTime));
   gpr_log(GPR_INFO, "Client user time: %.2f%%",
-          100.0 * sum(result.client_resources,
-                      [](ResourceUsage u) { return u.user_time; }) /
-              sum(result.client_resources,
-                  [](ResourceUsage u) { return u.wall_time; }));
+          100.0 * sum(result.client_resources, UserTime) /
+              sum(result.client_resources, WallTime));
 }

 void PerfDbReporter::ReportQPS(const ScenarioResult& result) {
-  auto qps = result.latencies.Count() /
-             average(result.client_resources,
-                     [](ResourceUsage u) { return u.wall_time; });
+  auto qps =
+      result.latencies.Count() / average(result.client_resources, WallTime);

   perf_db_client_.setQps(qps);
   perf_db_client_.setConfigs(result.client_config, result.server_config);
 }

 void PerfDbReporter::ReportQPSPerCore(const ScenarioResult& result) {
-  auto qps = result.latencies.Count() /
-             average(result.client_resources,
-                     [](ResourceUsage u) { return u.wall_time; });
+  auto qps =
+      result.latencies.Count() / average(result.client_resources, WallTime);

   auto qpsPerCore = qps / result.server_config.threads();
@@ -139,33 +132,29 @@ void PerfDbReporter::ReportQPSPerCore(const ScenarioResult& result) {
 void PerfDbReporter::ReportLatency(const ScenarioResult& result) {
   perf_db_client_.setLatencies(result.latencies.Percentile(50) / 1000,
                                result.latencies.Percentile(90) / 1000,
                                result.latencies.Percentile(95) / 1000,
                                result.latencies.Percentile(99) / 1000,
                                result.latencies.Percentile(99.9) / 1000);
   perf_db_client_.setConfigs(result.client_config, result.server_config);
 }

 void PerfDbReporter::ReportTimes(const ScenarioResult& result) {
-  double server_system_time =
-      100.0 * sum(result.server_resources,
-                  [](ResourceUsage u) { return u.system_time; }) /
-      sum(result.server_resources, [](ResourceUsage u) { return u.wall_time; });
-  double server_user_time =
-      100.0 * sum(result.server_resources,
-                  [](ResourceUsage u) { return u.user_time; }) /
-      sum(result.server_resources, [](ResourceUsage u) { return u.wall_time; });
-  double client_system_time =
-      100.0 * sum(result.client_resources,
-                  [](ResourceUsage u) { return u.system_time; }) /
-      sum(result.client_resources, [](ResourceUsage u) { return u.wall_time; });
-  double client_user_time =
-      100.0 * sum(result.client_resources,
-                  [](ResourceUsage u) { return u.user_time; }) /
-      sum(result.client_resources, [](ResourceUsage u) { return u.wall_time; });
-
-  perf_db_client_.setTimes(server_system_time, server_user_time, client_system_time,
-                           client_user_time);
+  const double server_system_time = 100.0 *
+                                    sum(result.server_resources, SystemTime) /
+                                    sum(result.server_resources, WallTime);
+  const double server_user_time = 100.0 *
+                                  sum(result.server_resources, UserTime) /
+                                  sum(result.server_resources, WallTime);
+  const double client_system_time = 100.0 *
+                                    sum(result.client_resources, SystemTime) /
+                                    sum(result.client_resources, WallTime);
+  const double client_user_time = 100.0 *
+                                  sum(result.client_resources, UserTime) /
+                                  sum(result.client_resources, WallTime);
+
+  perf_db_client_.setTimes(server_system_time, server_user_time,
+                           client_system_time, client_user_time);
   perf_db_client_.setConfigs(result.client_config, result.server_config);
 }

@ -99,25 +99,7 @@ class AsyncQpsServerTest : public Server {
shutdown_state_.emplace_back(new PerThreadShutdownState()); shutdown_state_.emplace_back(new PerThreadShutdownState());
} }
for (int i = 0; i < config.threads(); i++) { for (int i = 0; i < config.threads(); i++) {
threads_.push_back(std::thread([=]() { threads_.emplace_back(&AsyncQpsServerTest::ThreadFunc, this, i);
// Wait until work is available or we are shutting down
bool ok;
void *got_tag;
while (srv_cqs_[i]->Next(&got_tag, &ok)) {
ServerRpcContext *ctx = detag(got_tag);
// The tag is a pointer to an RPC context to invoke
bool still_going = ctx->RunNextState(ok);
if (!shutdown_state_[i]->shutdown()) {
// this RPC context is done, so refresh it
if (!still_going) {
ctx->Reset();
}
} else {
return;
}
}
return;
}));
} }
} }
~AsyncQpsServerTest() { ~AsyncQpsServerTest() {
@@ -142,6 +124,26 @@ class AsyncQpsServerTest : public Server {
   }

  private:
+  void ThreadFunc(int rank) {
+    // Wait until work is available or we are shutting down
+    bool ok;
+    void *got_tag;
+    while (srv_cqs_[rank]->Next(&got_tag, &ok)) {
+      ServerRpcContext *ctx = detag(got_tag);
+      // The tag is a pointer to an RPC context to invoke
+      const bool still_going = ctx->RunNextState(ok);
+      if (!shutdown_state_[rank]->shutdown()) {
+        // this RPC context is done, so refresh it
+        if (!still_going) {
+          ctx->Reset();
+        }
+      } else {
+        return;
+      }
+    }
+    return;
+  }
+
   class ServerRpcContext {
    public:
     ServerRpcContext() {}
