Rework QPS client/server

Now setup as a driver and N anonymous workers that may become clients or servers.
Will convert async soon.
pull/837/head
Craig Tiller 10 years ago
parent 32083bd771
commit 6af9ed0bf7
  1. 256
      test/cpp/qps/client.cc
  2. 2
      test/cpp/qps/client.h
  3. 168
      test/cpp/qps/driver.cc
  4. 4
      test/cpp/qps/driver.h
  5. 63
      test/cpp/qps/histogram.h
  6. 18
      test/cpp/qps/qps_driver.cc
  7. 26
      test/cpp/qps/qpstest.proto
  8. 130
      test/cpp/qps/server.cc
  9. 2
      test/cpp/qps/server.h
  10. 18
      test/cpp/qps/single_run_localhost.sh
  11. 6
      test/cpp/qps/timer.cc
  12. 2
      test/cpp/qps/timer.h
  13. 31
      test/cpp/qps/worker.cc

@ -53,193 +53,133 @@
#include <grpc++/server_builder.h>
#include "test/core/util/grpc_profiler.h"
#include "test/cpp/util/create_test_channel.h"
#include "test/cpp/qps/client.h"
#include "test/cpp/qps/qpstest.pb.h"
#include "test/cpp/qps/histogram.h"
#include "test/cpp/qps/timer.h"
DEFINE_int32(driver_port, 0, "Client driver port.");
using grpc::ChannelInterface;
using grpc::CreateTestChannel;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::Status;
using grpc::testing::ClientArgs;
using grpc::testing::ClientConfig;
using grpc::testing::ClientResult;
using grpc::testing::QpsClient;
using grpc::testing::SimpleRequest;
using grpc::testing::SimpleResponse;
using grpc::testing::StatsRequest;
using grpc::testing::TestService;
// In some distros, gflags is in the namespace google, and in some others,
// in gflags. This hack is enabling us to find both.
namespace google { }
namespace gflags { }
using namespace google;
using namespace gflags;
static double now() {
gpr_timespec tv = gpr_now();
return 1e9 * tv.tv_sec + tv.tv_nsec;
}
static bool got_sigint = false;
static void sigint_handler(int x) { got_sigint = 1; }
ClientResult RunTest(const ClientArgs& args) {
const auto& config = args.config();
namespace grpc {
namespace testing {
gpr_log(GPR_INFO,
"QPS test with parameters\n"
"enable_ssl = %d\n"
"client_channels = %d\n"
"client_threads = %d\n"
"num_rpcs = %d\n"
"payload_size = %d\n",
config.enable_ssl(), config.client_channels(), config.client_threads(), config.num_rpcs(),
config.payload_size());
class ClientChannelInfo {
class SynchronousClient GRPC_FINAL : public Client {
public:
explicit ClientChannelInfo(const grpc::string& target, const ClientConfig& config)
: channel_(CreateTestChannel(target, config.enable_ssl())),
stub_(TestService::NewStub(channel_)) {}
ChannelInterface *get_channel() { return channel_.get(); }
TestService::Stub *get_stub() { return stub_.get(); }
private:
std::shared_ptr<ChannelInterface> channel_;
std::unique_ptr<TestService::Stub> stub_;
};
std::vector<ClientChannelInfo> channels;
SynchronousClient(const ClientConfig& config) : timer_(new Timer) {
for (int i = 0; i < config.client_channels(); i++) {
channels.push_back(ClientChannelInfo(args.server_targets(i % args.server_targets_size()), config));
channels_.push_back(ClientChannelInfo(config.server_targets(i % config.server_targets_size()), config));
auto* stub = channels_.back().get_stub();
for (int j = 0; j < config.outstanding_rpcs_per_channel(); j++) {
threads_.emplace_back(new Thread(stub, config));
}
}
}
std::vector<std::thread> threads; // Will add threads when ready to execute
std::vector< ::gpr_histogram *> thread_stats(config.client_threads());
grpc::ClientContext context_stats_begin;
grpc_profiler_start("qps_client.prof");
Timer timer;
ClientStats Mark() {
Histogram latencies;
std::vector<Histogram> to_merge(threads_.size());
for (size_t i = 0; i < threads_.size(); i++) {
threads_[i]->BeginSwap(&to_merge[i]);
}
std::unique_ptr<Timer> timer(new Timer);
timer_.swap(timer);
for (size_t i = 0; i < threads_.size(); i++) {
threads_[i]->EndSwap();
latencies.Merge(&to_merge[i]);
}
for (int i = 0; i < config.client_threads(); i++) {
gpr_histogram *hist = gpr_histogram_create(0.01, 60e9);
GPR_ASSERT(hist != NULL);
thread_stats[i] = hist;
auto timer_result = timer->Mark();
ClientStats stats;
auto* l = stats.mutable_latencies();
l->set_l_50(latencies.Percentile(50));
l->set_l_90(latencies.Percentile(90));
l->set_l_99(latencies.Percentile(99));
l->set_l_999(latencies.Percentile(99.9));
stats.set_num_rpcs(latencies.Count());
stats.set_time_elapsed(timer_result.wall);
stats.set_time_system(timer_result.system);
stats.set_time_user(timer_result.user);
return stats;
}
threads.push_back(
std::thread([hist, config, &channels](int channel_num) {
private:
class Thread {
public:
Thread(TestService::Stub* stub, const ClientConfig& config) : stub_(stub), config_(config), done_(false), new_(nullptr), impl_([this]() {
SimpleRequest request;
SimpleResponse response;
request.set_response_type(
grpc::testing::PayloadType::COMPRESSABLE);
request.set_response_size(config.payload_size());
for (int j = 0; j < config.num_rpcs(); j++) {
TestService::Stub *stub =
channels[channel_num].get_stub();
double start = now();
request.set_response_size(config_.payload_size());
for (;;) {
{
std::lock_guard<std::mutex> g(mu_);
if (done_) return;
if (new_) {
new_->Swap(&histogram_);
new_ = nullptr;
cv_.notify_one();
}
}
double start = Timer::Now();
grpc::ClientContext context;
grpc::Status s =
stub->UnaryCall(&context, request, &response);
gpr_histogram_add(hist, now() - start);
GPR_ASSERT((s.code() == grpc::StatusCode::OK) &&
(response.payload().type() ==
grpc::testing::PayloadType::COMPRESSABLE) &&
(response.payload().body().length() ==
static_cast<size_t>(config.payload_size())));
// Now do runtime round-robin assignment of the next
// channel number
channel_num += config.client_threads();
channel_num %= config.client_channels();
}
},
i % config.client_channels()));
stub_->UnaryCall(&context, request, &response);
histogram_.Add((Timer::Now() - start) * 1e9);
}
}) {}
for (auto &t : threads) {
t.join();
~Thread() {
{
std::lock_guard<std::mutex> g(mu_);
done_ = true;
}
auto timer_result = timer.Mark();
grpc_profiler_stop();
gpr_histogram *hist = gpr_histogram_create(0.01, 60e9);
GPR_ASSERT(hist != NULL);
for (int i = 0; i < config.client_threads(); i++) {
gpr_histogram *h = thread_stats[i];
gpr_log(GPR_INFO, "latency at thread %d (50/90/95/99/99.9): %f/%f/%f/%f/%f",
i, gpr_histogram_percentile(h, 50), gpr_histogram_percentile(h, 90),
gpr_histogram_percentile(h, 95), gpr_histogram_percentile(h, 99),
gpr_histogram_percentile(h, 99.9));
gpr_histogram_merge(hist, h);
gpr_histogram_destroy(h);
impl_.join();
}
ClientResult result;
auto* latencies = result.mutable_latencies();
latencies->set_l_50(gpr_histogram_percentile(hist, 50));
latencies->set_l_90(gpr_histogram_percentile(hist, 90));
latencies->set_l_99(gpr_histogram_percentile(hist, 99));
latencies->set_l_999(gpr_histogram_percentile(hist, 99.9));
result.set_num_rpcs(config.client_threads() * config.num_rpcs());
result.set_time_elapsed(timer_result.wall);
result.set_time_system(timer_result.system);
result.set_time_user(timer_result.user);
gpr_histogram_destroy(hist);
return result;
void BeginSwap(Histogram* n) {
std::lock_guard<std::mutex> g(mu_);
new_ = n;
}
class ClientImpl final : public QpsClient::Service {
public:
Status RunTest(ServerContext* ctx, const ClientArgs* args, ClientResult* result) override {
*result = ::RunTest(*args);
return Status::OK;
void EndSwap() {
std::unique_lock<std::mutex> g(mu_);
cv_.wait(g, [this]() { return new_ == nullptr; });
}
private:
std::mutex client_mu_;
Thread(const Thread&);
Thread& operator=(const Thread&);
TestService::Stub* stub_;
ClientConfig config_;
std::mutex mu_;
std::condition_variable cv_;
bool done_;
Histogram *new_;
Histogram histogram_;
std::thread impl_;
};
static void RunServer() {
char* server_address = NULL;
gpr_join_host_port(&server_address, "::", FLAGS_driver_port);
ClientImpl service;
ServerBuilder builder;
builder.AddPort(server_address);
builder.RegisterService(&service);
gpr_free(server_address);
class ClientChannelInfo {
public:
explicit ClientChannelInfo(const grpc::string& target, const ClientConfig& config)
: channel_(CreateTestChannel(target, config.enable_ssl())),
stub_(TestService::NewStub(channel_)) {}
ChannelInterface *get_channel() { return channel_.get(); }
TestService::Stub *get_stub() { return stub_.get(); }
auto server = builder.BuildAndStart();
private:
std::shared_ptr<ChannelInterface> channel_;
std::unique_ptr<TestService::Stub> stub_;
};
std::vector<ClientChannelInfo> channels_;
std::vector<std::unique_ptr<Thread>> threads_;
std::unique_ptr<Timer> timer_;
};
while (!got_sigint) {
std::this_thread::sleep_for(std::chrono::seconds(5));
std::unique_ptr<Client> CreateSynchronousClient(const ClientConfig& config) {
return std::unique_ptr<Client>(new SynchronousClient(config));
}
}
int main(int argc, char **argv) {
signal(SIGINT, sigint_handler);
grpc_init();
ParseCommandLineFlags(&argc, &argv, true);
RunServer();
grpc_shutdown();
return 0;
}
} // namespace testing
} // namespace grpc

@ -42,6 +42,8 @@ namespace testing {
class Client {
public:
virtual ~Client() {}
virtual ClientStats Mark() = 0;
};
std::unique_ptr<Client> CreateSynchronousClient(const ClientConfig& args);

@ -48,21 +48,9 @@ using std::list;
using std::thread;
using std::unique_ptr;
using std::vector;
using grpc::string;
using grpc::ChannelArguments;
using grpc::ClientContext;
using grpc::ClientReaderWriter;
using grpc::CreateChannelDeprecated;
using grpc::Status;
using grpc::testing::ClientArgs;
using grpc::testing::ClientConfig;
using grpc::testing::ClientResult;
using grpc::testing::Worker;
using grpc::testing::ServerArgs;
using grpc::testing::ServerConfig;
using grpc::testing::ServerStatus;
#if 0
namespace grpc {
namespace testing {
static vector<string> get_hosts(const string& name) {
char* env = gpr_getenv(name.c_str());
if (!env) return vector<string>();
@ -82,7 +70,7 @@ static vector<string> get_hosts(const string& name) {
}
}
void RunScenario(const ClientConfig& client_config, size_t num_clients,
void RunScenario(const ClientConfig& initial_client_config, size_t num_clients,
const ServerConfig& server_config, size_t num_servers) {
// ClientContext allocator (all are destroyed at scope exit)
list<ClientContext> contexts;
@ -93,96 +81,116 @@ void RunScenario(const ClientConfig& client_config, size_t num_clients,
// Get client, server lists
auto workers = get_hosts("QPS_WORKERS");
ClientConfig client_config = initial_client_config;
GPR_ASSERT(clients.size() >= num_clients);
GPR_ASSERT(servers.size() >= num_servers);
// TODO(ctiller): support running multiple configurations, and binpack client/server pairs
// to available workers
GPR_ASSERT(workers.size() >= num_clients + num_servers);
// Trim to just what we need
clients.resize(num_clients);
servers.resize(num_servers);
workers.resize(num_clients + num_servers);
// Start servers
vector<unique_ptr<QpsServer::Stub>> server_stubs;
vector<unique_ptr<ClientReaderWriter<ServerArgs, ServerStatus>>> server_streams;
vector<string> server_targets;
for (const auto& target : servers) {
server_stubs.push_back(QpsServer::NewStub(CreateChannelDeprecated(target, ChannelArguments())));
auto* stub = server_stubs.back().get();
struct ServerData {
unique_ptr<Worker::Stub> stub;
unique_ptr<ClientReaderWriter<ServerArgs, ServerStatus>> stream;
};
vector<ServerData> servers;
for (size_t i = 0; i < num_servers; i++) {
ServerData sd;
sd.stub = std::move(Worker::NewStub(CreateChannelDeprecated(workers[i], ChannelArguments())));
ServerArgs args;
*args.mutable_config() = server_config;
server_streams.push_back(stub->RunServer(alloc_context()));
auto* stream = server_streams.back().get();
if (!stream->Write(args)) {
gpr_log(GPR_ERROR, "Failed starting server");
return;
}
*args.mutable_setup() = server_config;
sd.stream = std::move(sd.stub->RunServer(alloc_context()));
GPR_ASSERT(sd.stream->Write(args));
ServerStatus init_status;
if (!stream->Read(&init_status)) {
gpr_log(GPR_ERROR, "Failed starting server");
return;
}
GPR_ASSERT(sd.stream->Read(&init_status));
char* host;
char* driver_port;
char* cli_target;
gpr_split_host_port(target.c_str(), &host, &driver_port);
gpr_split_host_port(workers[i].c_str(), &host, &driver_port);
gpr_join_host_port(&cli_target, host, init_status.port());
server_targets.push_back(cli_target);
client_config.add_server_targets(cli_target);
gpr_free(host);
gpr_free(driver_port);
gpr_free(cli_target);
servers.push_back(std::move(sd));
}
// Start clients
class Client {
public:
Client(ClientContext* ctx, const string& target, const ClientArgs& args)
: thread_([ctx, target, args, this]() {
auto stub = QpsClient::NewStub(CreateChannelDeprecated(target, ChannelArguments()));
status_ = stub->RunTest(ctx, args, &result_);
}) {}
~Client() { join(); }
void join() { if (!joined_) { thread_.join(); joined_ = true; } }
const Status& status() const { return status_; }
const ClientResult& result() const { return result_; }
private:
bool joined_ = false;
Status status_;
ClientResult result_;
thread thread_;
struct ClientData {
unique_ptr<Worker::Stub> stub;
unique_ptr<ClientReaderWriter<ClientArgs, ClientStatus>> stream;
};
list<Client> running_clients;
size_t svr_idx = 0;
for (const auto& target : clients) {
vector<ClientData> clients;
for (size_t i = 0; i < num_clients; i++) {
ClientData cd;
cd.stub = std::move(Worker::NewStub(CreateChannelDeprecated(workers[i + num_servers], ChannelArguments())));
ClientArgs args;
*args.mutable_config() = client_config;
for (size_t i = 0; i < num_servers; i++) {
args.add_server_targets(server_targets[svr_idx]);
svr_idx = (svr_idx + 1) % num_servers;
*args.mutable_setup() = client_config;
cd.stream = std::move(cd.stub->RunTest(alloc_context()));
GPR_ASSERT(cd.stream->Write(args));
ClientStatus init_status;
GPR_ASSERT(cd.stream->Read(&init_status));
clients.push_back(std::move(cd));
}
running_clients.emplace_back(alloc_context(), target, args);
// Let everything warmup
gpr_log(GPR_INFO, "Warming up");
gpr_timespec start = gpr_now();
gpr_sleep_until(gpr_time_add(start, gpr_time_from_seconds(5)));
// Start a run
gpr_log(GPR_INFO, "Starting");
ServerArgs server_mark;
server_mark.mutable_mark();
ClientArgs client_mark;
client_mark.mutable_mark();
for (auto& server : servers) {
GPR_ASSERT(server.stream->Write(server_mark));
}
for (auto& client : clients) {
GPR_ASSERT(client.stream->Write(client_mark));
}
ServerStatus server_status;
ClientStatus client_status;
for (auto& server : servers) {
GPR_ASSERT(server.stream->Read(&server_status));
}
for (auto& client : clients) {
GPR_ASSERT(client.stream->Read(&client_status));
}
// Finish clients
for (auto& client : running_clients) {
client.join();
if (!client.status().IsOk()) {
gpr_log(GPR_ERROR, "Client failed");
return;
// Wait some time
gpr_log(GPR_INFO, "Running");
gpr_sleep_until(gpr_time_add(start, gpr_time_from_seconds(15)));
// Finish a run
gpr_log(GPR_INFO, "Finishing");
for (auto& server : servers) {
GPR_ASSERT(server.stream->Write(server_mark));
}
for (auto& client : clients) {
GPR_ASSERT(client.stream->Write(client_mark));
}
for (auto& server : servers) {
GPR_ASSERT(server.stream->Read(&server_status));
}
for (auto& client : clients) {
GPR_ASSERT(client.stream->Read(&client_status));
}
// Finish servers
for (auto& stream : server_streams) {
ServerStatus final_status;
ServerStatus dummy;
if (!stream->WritesDone() || !stream->Read(&final_status) || stream->Read(&dummy) || !stream->Finish().IsOk()) {
gpr_log(GPR_ERROR, "Server protocol error");
for (auto& client : clients) {
GPR_ASSERT(client.stream->WritesDone());
GPR_ASSERT(client.stream->Finish().IsOk());
}
for (auto& server : servers) {
GPR_ASSERT(server.stream->WritesDone());
GPR_ASSERT(server.stream->Finish().IsOk());
}
}
}
}
#endif

@ -36,7 +36,11 @@
#include "test/cpp/qps/qpstest.pb.h"
namespace grpc {
namespace testing {
void RunScenario(const grpc::testing::ClientConfig& client_config, size_t num_clients,
const grpc::testing::ServerConfig& server_config, size_t num_servers);
}
}
#endif

@ -0,0 +1,63 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef TEST_QPS_HISTOGRAM_H
#define TEST_QPS_HISTOGRAM_H
#include <grpc/support/histogram.h>
namespace grpc {
namespace testing {
class Histogram {
public:
Histogram() : impl_(gpr_histogram_create(0.01, 60e9)) {}
~Histogram() { gpr_histogram_destroy(impl_); }
void Merge(Histogram* h) { gpr_histogram_merge(impl_, h->impl_); }
void Add(double value) { gpr_histogram_add(impl_, value); }
double Percentile(double pctile) { return gpr_histogram_percentile(impl_, pctile); }
double Count() { return gpr_histogram_count(impl_); }
void Swap(Histogram* other) { std::swap(impl_, other->impl_); }
private:
Histogram(const Histogram&);
Histogram& operator=(const Histogram&);
gpr_histogram* impl_;
};
}
}
#endif /* TEST_QPS_HISTOGRAM_H */

@ -32,6 +32,7 @@
*/
#include <gflags/gflags.h>
#include <grpc/support/log.h>
#include "test/cpp/qps/driver.h"
@ -43,15 +44,18 @@ DEFINE_bool(enable_ssl, false, "Use SSL");
// Server config
DEFINE_int32(server_threads, 1, "Number of server threads");
DEFINE_string(server_type, "SYNCHRONOUS_SERVER", "Server type");
// Client config
DEFINE_int32(client_threads, 1, "Number of client threads");
DEFINE_int32(outstanding_rpcs_per_channel, 1, "Number of outstanding rpcs per channel");
DEFINE_int32(client_channels, 1, "Number of client channels");
DEFINE_int32(num_rpcs, 10000, "Number of rpcs per client thread");
DEFINE_int32(payload_size, 1, "Payload size");
DEFINE_string(client_type, "SYNCHRONOUS_CLIENT", "Client type");
using grpc::testing::ClientConfig;
using grpc::testing::ServerConfig;
using grpc::testing::ClientType;
using grpc::testing::ServerType;
// In some distros, gflags is in the namespace google, and in some others,
// in gflags. This hack is enabling us to find both.
@ -64,14 +68,20 @@ int main(int argc, char **argv) {
grpc_init();
ParseCommandLineFlags(&argc, &argv, true);
ClientType client_type;
ServerType server_type;
GPR_ASSERT(ClientType_Parse(FLAGS_client_type, &client_type));
GPR_ASSERT(ServerType_Parse(FLAGS_server_type, &server_type));
ClientConfig client_config;
client_config.set_client_type(client_type);
client_config.set_enable_ssl(FLAGS_enable_ssl);
client_config.set_client_threads(FLAGS_client_threads);
client_config.set_outstanding_rpcs_per_channel(FLAGS_outstanding_rpcs_per_channel);
client_config.set_client_channels(FLAGS_client_channels);
client_config.set_num_rpcs(FLAGS_num_rpcs);
client_config.set_payload_size(FLAGS_payload_size);
ServerConfig server_config;
server_config.set_server_type(server_type);
server_config.set_threads(FLAGS_server_threads);
server_config.set_enable_ssl(FLAGS_enable_ssl);

@ -89,30 +89,21 @@ message ClientConfig {
repeated string server_targets = 1;
required ClientType client_type = 2;
required bool enable_ssl = 3;
required int32 client_threads = 4;
// We have a configurable number of channels for sending RPCs.
// RPCs are sent round-robin on the available channels by the
// various threads. Interesting cases are 1 global channel or
// 1 per-thread channel, but we can support any number.
// The channels are assigned round-robin on an RPC by RPC basis
// rather than just at initialization time in order to also measure the
// impact of cache thrashing caused by channel changes. This is an issue
// if you are not in one of the above "interesting cases"
required int32 outstanding_rpcs_per_channel = 4;
required int32 client_channels = 5;
required int32 num_rpcs = 6;
required int32 payload_size = 7;
required int32 payload_size = 6;
}
message ClientStart {}
message Mark {}
message ClientArgs {
oneof argtype {
ClientConfig setup = 1;
ClientStart start = 2;
Mark mark = 2;
}
}
message ClientResult {
message ClientStats {
required Latencies latencies = 1;
required int32 num_rpcs = 2;
required double time_elapsed = 3;
@ -121,7 +112,7 @@ message ClientResult {
}
message ClientStatus {
optional ClientResult result = 1;
optional ClientStats stats = 1;
}
message ServerConfig {
@ -131,7 +122,10 @@ message ServerConfig {
}
message ServerArgs {
required ServerConfig config = 1;
oneof argtype {
ServerConfig setup = 1;
Mark mark = 2;
}
}
message ServerStatus {

@ -48,42 +48,14 @@
#include "src/cpp/server/thread_pool.h"
#include "test/core/util/grpc_profiler.h"
#include "test/cpp/qps/qpstest.pb.h"
#include "test/cpp/qps/server.h"
#include "test/cpp/qps/timer.h"
#include <grpc/grpc.h>
#include <grpc/support/log.h>
DEFINE_int32(port, 0, "Server port.");
DEFINE_int32(driver_port, 0, "Server driver port.");
using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::ServerReaderWriter;
using grpc::ThreadPool;
using grpc::testing::Payload;
using grpc::testing::PayloadType;
using grpc::testing::ServerStats;
using grpc::testing::SimpleRequest;
using grpc::testing::SimpleResponse;
using grpc::testing::StatsRequest;
using grpc::testing::TestService;
using grpc::testing::QpsServer;
using grpc::testing::ServerArgs;
using grpc::testing::ServerStats;
using grpc::testing::ServerStatus;
using grpc::Status;
// In some distros, gflags is in the namespace google, and in some others,
// in gflags. This hack is enabling us to find both.
namespace google { }
namespace gflags { }
using namespace google;
using namespace gflags;
static bool got_sigint = false;
static void sigint_handler(int x) { got_sigint = 1; }
namespace grpc {
namespace testing {
static bool SetPayload(PayloadType type, int size, Payload* payload) {
PayloadType response_type = type;
@ -97,8 +69,6 @@ static bool SetPayload(PayloadType type, int size, Payload* payload) {
return true;
}
namespace {
class TestServiceImpl GRPC_FINAL : public TestService::Service {
public:
Status UnaryCall(ServerContext* context, const SimpleRequest* request,
@ -113,88 +83,46 @@ class TestServiceImpl GRPC_FINAL : public TestService::Service {
}
};
} // namespace
class ServerImpl : public QpsServer::Service {
class SynchronousServer GRPC_FINAL : public grpc::testing::Server {
public:
Status RunServer(ServerContext* ctx, ServerReaderWriter<ServerStatus, ServerArgs>* stream) {
ServerArgs args;
if (!stream->Read(&args)) return Status::OK;
std::lock_guard<std::mutex> lock(server_mu_);
char* server_address = NULL;
gpr_join_host_port(&server_address, "::", FLAGS_port);
TestServiceImpl service;
ServerBuilder builder;
builder.AddPort(server_address);
builder.RegisterService(&service);
std::unique_ptr<ThreadPool> pool(new ThreadPool(args.config().threads()));
builder.SetThreadPool(pool.get());
auto server = builder.BuildAndStart();
gpr_log(GPR_INFO, "Server listening on %s\n", server_address);
gpr_free(server_address);
ServerStatus status;
status.set_port(FLAGS_port);
if (!stream->Write(status)) return Status(grpc::UNKNOWN);
SynchronousServer(const ServerConfig& config, int port) : thread_pool_(config.threads()), impl_(MakeImpl(port)), timer_(new Timer) {}
grpc_profiler_start("qps_server.prof");
Timer timer;
ServerStats Mark() GRPC_OVERRIDE {
std::unique_ptr<Timer> timer(new Timer);
timer.swap(timer_);
if (stream->Read(&args)) {
gpr_log(GPR_ERROR, "Got a server request, but not expecting one");
return Status(grpc::UNKNOWN);
}
auto timer_result = timer.Mark();
grpc_profiler_stop();
auto timer_result = timer->Mark();
auto* stats = status.mutable_stats();
stats->set_time_elapsed(timer_result.wall);
stats->set_time_system(timer_result.system);
stats->set_time_user(timer_result.user);
stream->Write(status);
return Status::OK;
ServerStats stats;
stats.set_time_elapsed(timer_result.wall);
stats.set_time_system(timer_result.system);
stats.set_time_user(timer_result.user);
return stats;
}
private:
std::mutex server_mu_;
};
std::unique_ptr<grpc::Server> MakeImpl(int port) {
ServerBuilder builder;
static void RunServer() {
char* server_address = NULL;
gpr_join_host_port(&server_address, "::", FLAGS_driver_port);
ServerImpl service;
ServerBuilder builder;
gpr_join_host_port(&server_address, "::", port);
builder.AddPort(server_address);
builder.RegisterService(&service);
gpr_free(server_address);
auto server = builder.BuildAndStart();
builder.RegisterService(&service_);
while (!got_sigint) {
sleep(5);
return builder.BuildAndStart();
}
}
int main(int argc, char** argv) {
signal(SIGINT, sigint_handler);
grpc_init();
ParseCommandLineFlags(&argc, &argv, true);
GPR_ASSERT(FLAGS_port != 0);
RunServer();
TestServiceImpl service_;
ThreadPool thread_pool_;
std::unique_ptr<grpc::Server> impl_;
std::unique_ptr<Timer> timer_;
};
grpc_shutdown();
return 0;
std::unique_ptr<grpc::testing::Server> CreateSynchronousServer(const ServerConfig& config, int port) {
return std::unique_ptr<Server>(new SynchronousServer(config, port));
}
} // namespace testing
} // namespace grpc

@ -42,6 +42,8 @@ namespace testing {
class Server {
public:
virtual ~Server() {}
virtual ServerStats Mark() = 0;
};
std::unique_ptr<Server> CreateSynchronousServer(const ServerConfig& config, int port);

@ -6,25 +6,23 @@ set -ex
cd $(dirname $0)/../../..
killall qps_server qps_client || true
killall qps_worker || true
config=opt
NUMCPUS=`python2.7 -c 'import multiprocessing; print multiprocessing.cpu_count()'`
make CONFIG=$config qps_client qps_server qps_driver -j$NUMCPUS
make CONFIG=$config qps_worker qps_driver -j$NUMCPUS
bins/$config/qps_server -driver_port 10000 -port 10002 &
SERVER_PID=$!
bins/$config/qps_client -driver_port 10001 &
CLIENT_PID=$!
bins/$config/qps_worker -driver_port 10000 -server_port 10001 &
PID1=$!
bins/$config/qps_worker -driver_port 10010 -server_port 10011 &
PID2=$!
export QPS_SERVERS=localhost:10000
export QPS_CLIENTS=localhost:10001
export QPS_WORKERS="localhost:10000,localhost:10010"
bins/$config/qps_driver $*
kill -2 $CLIENT_PID
kill -2 $SERVER_PID
kill -2 $PID1 $PID2
wait

@ -35,9 +35,15 @@
#include <sys/time.h>
#include <sys/resource.h>
#include <grpc/support/time.h>
Timer::Timer() : start_(Sample()) {}
double Timer::Now() {
auto ts = gpr_now();
return ts.tv_sec + 1e-9 * ts.tv_nsec;
}
static double time_double(struct timeval* tv) {
return tv->tv_sec + 1e-6 * tv->tv_usec;
}

@ -46,6 +46,8 @@ class Timer {
Result Mark();
static double Now();
private:
static Result Sample();

@ -78,7 +78,7 @@ namespace testing {
std::unique_ptr<Client> CreateClient(const ClientConfig& config) {
switch (config.client_type()) {
case ClientType::SYNCHRONOUS_CLIENT: return CreateSynchronousClient(config);
case ClientType::ASYNC_CLIENT: return CreateAsyncClient(config);
case ClientType::ASYNC_CLIENT: abort(); //return CreateAsyncClient(config);
}
abort();
}
@ -86,7 +86,7 @@ std::unique_ptr<Client> CreateClient(const ClientConfig& config) {
std::unique_ptr<Server> CreateServer(const ServerConfig& config) {
switch (config.server_type()) {
case ServerType::SYNCHRONOUS_SERVER: return CreateSynchronousServer(config, FLAGS_server_port);
case ServerType::ASYNC_SERVER: return CreateAsyncServer(config, FLAGS_server_port);
case ServerType::ASYNC_SERVER: abort(); //return CreateAsyncServer(config, FLAGS_server_port);
}
abort();
}
@ -112,6 +112,17 @@ class WorkerImpl final : public Worker::Service {
if (!client) {
return Status(INVALID_ARGUMENT);
}
ClientStatus status;
if (!stream->Write(status)) {
return Status(UNKNOWN);
}
while (stream->Read(&args)) {
if (!args.has_mark()) {
return Status(INVALID_ARGUMENT);
}
*status.mutable_stats() = client->Mark();
stream->Write(status);
}
return Status::OK;
}
@ -126,13 +137,25 @@ class WorkerImpl final : public Worker::Service {
if (!stream->Read(&args)) {
return Status(INVALID_ARGUMENT);
}
if (!args.has_config()) {
if (!args.has_setup()) {
return Status(INVALID_ARGUMENT);
}
auto server = CreateServer(args.config());
auto server = CreateServer(args.setup());
if (!server) {
return Status(INVALID_ARGUMENT);
}
ServerStatus status;
status.set_port(FLAGS_server_port);
if (!stream->Write(status)) {
return Status(UNKNOWN);
}
while (stream->Read(&args)) {
if (!args.has_mark()) {
return Status(INVALID_ARGUMENT);
}
*status.mutable_stats() = server->Mark();
stream->Write(status);
}
return Status::OK;
}

Loading…
Cancel
Save