pull/837/head
Craig Tiller 10 years ago
parent 6af9ed0bf7
commit 10923c2fb5
  1. 61
      test/cpp/qps/client.cc
  2. 10
      test/cpp/qps/driver.cc
  3. 6
      test/cpp/qps/driver.h
  4. 7
      test/cpp/qps/histogram.h
  5. 14
      test/cpp/qps/qps_driver.cc
  6. 8
      test/cpp/qps/server.cc
  7. 3
      test/cpp/qps/server.h
  8. 8
      test/cpp/qps/timer.h
  9. 123
      test/cpp/qps/worker.cc

@ -65,7 +65,8 @@ class SynchronousClient GRPC_FINAL : public Client {
public:
SynchronousClient(const ClientConfig& config) : timer_(new Timer) {
for (int i = 0; i < config.client_channels(); i++) {
channels_.push_back(ClientChannelInfo(config.server_targets(i % config.server_targets_size()), config));
channels_.push_back(ClientChannelInfo(
config.server_targets(i % config.server_targets_size()), config));
auto* stub = channels_.back().get_stub();
for (int j = 0; j < config.outstanding_rpcs_per_channel(); j++) {
threads_.emplace_back(new Thread(stub, config));
@ -104,29 +105,32 @@ class SynchronousClient GRPC_FINAL : public Client {
private:
class Thread {
public:
Thread(TestService::Stub* stub, const ClientConfig& config) : stub_(stub), config_(config), done_(false), new_(nullptr), impl_([this]() {
SimpleRequest request;
SimpleResponse response;
request.set_response_type(
grpc::testing::PayloadType::COMPRESSABLE);
request.set_response_size(config_.payload_size());
for (;;) {
{
std::lock_guard<std::mutex> g(mu_);
if (done_) return;
if (new_) {
new_->Swap(&histogram_);
new_ = nullptr;
cv_.notify_one();
}
}
double start = Timer::Now();
grpc::ClientContext context;
grpc::Status s =
stub_->UnaryCall(&context, request, &response);
histogram_.Add((Timer::Now() - start) * 1e9);
}
}) {}
Thread(TestService::Stub* stub, const ClientConfig& config)
: stub_(stub),
config_(config),
done_(false),
new_(nullptr),
impl_([this]() {
SimpleRequest request;
SimpleResponse response;
request.set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
request.set_response_size(config_.payload_size());
for (;;) {
{
std::lock_guard<std::mutex> g(mu_);
if (done_) return;
if (new_) {
new_->Swap(&histogram_);
new_ = nullptr;
cv_.notify_one();
}
}
double start = Timer::Now();
grpc::ClientContext context;
grpc::Status s = stub_->UnaryCall(&context, request, &response);
histogram_.Add((Timer::Now() - start) * 1e9);
}
}) {}
~Thread() {
{
@ -155,18 +159,19 @@ class SynchronousClient GRPC_FINAL : public Client {
std::mutex mu_;
std::condition_variable cv_;
bool done_;
Histogram *new_;
Histogram* new_;
Histogram histogram_;
std::thread impl_;
};
class ClientChannelInfo {
public:
explicit ClientChannelInfo(const grpc::string& target, const ClientConfig& config)
explicit ClientChannelInfo(const grpc::string& target,
const ClientConfig& config)
: channel_(CreateTestChannel(target, config.enable_ssl())),
stub_(TestService::NewStub(channel_)) {}
ChannelInterface *get_channel() { return channel_.get(); }
TestService::Stub *get_stub() { return stub_.get(); }
ChannelInterface* get_channel() { return channel_.get(); }
TestService::Stub* get_stub() { return stub_.get(); }
private:
std::shared_ptr<ChannelInterface> channel_;

@ -83,7 +83,8 @@ void RunScenario(const ClientConfig& initial_client_config, size_t num_clients,
auto workers = get_hosts("QPS_WORKERS");
ClientConfig client_config = initial_client_config;
// TODO(ctiller): support running multiple configurations, and binpack client/server pairs
// TODO(ctiller): support running multiple configurations, and binpack
// client/server pairs
// to available workers
GPR_ASSERT(workers.size() >= num_clients + num_servers);
@ -98,7 +99,8 @@ void RunScenario(const ClientConfig& initial_client_config, size_t num_clients,
vector<ServerData> servers;
for (size_t i = 0; i < num_servers; i++) {
ServerData sd;
sd.stub = std::move(Worker::NewStub(CreateChannelDeprecated(workers[i], ChannelArguments())));
sd.stub = std::move(Worker::NewStub(
CreateChannelDeprecated(workers[i], ChannelArguments())));
ServerArgs args;
*args.mutable_setup() = server_config;
sd.stream = std::move(sd.stub->RunServer(alloc_context()));
@ -126,7 +128,8 @@ void RunScenario(const ClientConfig& initial_client_config, size_t num_clients,
vector<ClientData> clients;
for (size_t i = 0; i < num_clients; i++) {
ClientData cd;
cd.stub = std::move(Worker::NewStub(CreateChannelDeprecated(workers[i + num_servers], ChannelArguments())));
cd.stub = std::move(Worker::NewStub(
CreateChannelDeprecated(workers[i + num_servers], ChannelArguments())));
ClientArgs args;
*args.mutable_setup() = client_config;
cd.stream = std::move(cd.stub->RunTest(alloc_context()));
@ -191,6 +194,5 @@ void RunScenario(const ClientConfig& initial_client_config, size_t num_clients,
GPR_ASSERT(server.stream->Finish().IsOk());
}
}
}
}

@ -38,8 +38,10 @@
namespace grpc {
namespace testing {
void RunScenario(const grpc::testing::ClientConfig& client_config, size_t num_clients,
const grpc::testing::ServerConfig& server_config, size_t num_servers);
void RunScenario(const grpc::testing::ClientConfig& client_config,
size_t num_clients,
const grpc::testing::ServerConfig& server_config,
size_t num_servers);
}
}

@ -46,7 +46,9 @@ class Histogram {
void Merge(Histogram* h) { gpr_histogram_merge(impl_, h->impl_); }
void Add(double value) { gpr_histogram_add(impl_, value); }
double Percentile(double pctile) { return gpr_histogram_percentile(impl_, pctile); }
double Percentile(double pctile) {
return gpr_histogram_percentile(impl_, pctile);
}
double Count() { return gpr_histogram_count(impl_); }
void Swap(Histogram* other) { std::swap(impl_, other->impl_); }
@ -56,8 +58,7 @@ class Histogram {
gpr_histogram* impl_;
};
}
}
#endif /* TEST_QPS_HISTOGRAM_H */
#endif /* TEST_QPS_HISTOGRAM_H */

@ -47,7 +47,8 @@ DEFINE_int32(server_threads, 1, "Number of server threads");
DEFINE_string(server_type, "SYNCHRONOUS_SERVER", "Server type");
// Client config
DEFINE_int32(outstanding_rpcs_per_channel, 1, "Number of outstanding rpcs per channel");
DEFINE_int32(outstanding_rpcs_per_channel, 1,
"Number of outstanding rpcs per channel");
DEFINE_int32(client_channels, 1, "Number of client channels");
DEFINE_int32(payload_size, 1, "Payload size");
DEFINE_string(client_type, "SYNCHRONOUS_CLIENT", "Client type");
@ -59,8 +60,8 @@ using grpc::testing::ServerType;
// In some distros, gflags is in the namespace google, and in some others,
// in gflags. This hack is enabling us to find both.
namespace google { }
namespace gflags { }
namespace google {}
namespace gflags {}
using namespace google;
using namespace gflags;
@ -76,7 +77,8 @@ int main(int argc, char **argv) {
ClientConfig client_config;
client_config.set_client_type(client_type);
client_config.set_enable_ssl(FLAGS_enable_ssl);
client_config.set_outstanding_rpcs_per_channel(FLAGS_outstanding_rpcs_per_channel);
client_config.set_outstanding_rpcs_per_channel(
FLAGS_outstanding_rpcs_per_channel);
client_config.set_client_channels(FLAGS_client_channels);
client_config.set_payload_size(FLAGS_payload_size);
@ -85,9 +87,9 @@ int main(int argc, char **argv) {
server_config.set_threads(FLAGS_server_threads);
server_config.set_enable_ssl(FLAGS_enable_ssl);
RunScenario(client_config, FLAGS_num_clients, server_config, FLAGS_num_servers);
RunScenario(client_config, FLAGS_num_clients, server_config,
FLAGS_num_servers);
grpc_shutdown();
return 0;
}

@ -85,7 +85,10 @@ class TestServiceImpl GRPC_FINAL : public TestService::Service {
class SynchronousServer GRPC_FINAL : public grpc::testing::Server {
public:
SynchronousServer(const ServerConfig& config, int port) : thread_pool_(config.threads()), impl_(MakeImpl(port)), timer_(new Timer) {}
SynchronousServer(const ServerConfig& config, int port)
: thread_pool_(config.threads()),
impl_(MakeImpl(port)),
timer_(new Timer) {}
ServerStats Mark() GRPC_OVERRIDE {
std::unique_ptr<Timer> timer(new Timer);
@ -120,7 +123,8 @@ class SynchronousServer GRPC_FINAL : public grpc::testing::Server {
std::unique_ptr<Timer> timer_;
};
std::unique_ptr<grpc::testing::Server> CreateSynchronousServer(const ServerConfig& config, int port) {
std::unique_ptr<grpc::testing::Server> CreateSynchronousServer(
const ServerConfig& config, int port) {
return std::unique_ptr<Server>(new SynchronousServer(config, port));
}

@ -46,7 +46,8 @@ class Server {
virtual ServerStats Mark() = 0;
};
std::unique_ptr<Server> CreateSynchronousServer(const ServerConfig& config, int port);
std::unique_ptr<Server> CreateSynchronousServer(const ServerConfig& config,
int port);
std::unique_ptr<Server> CreateAsyncServer(const ServerConfig& config, int port);
} // namespace testing

@ -39,9 +39,9 @@ class Timer {
Timer();
struct Result {
double wall;
double user;
double system;
double wall;
double user;
double system;
};
Result Mark();
@ -54,4 +54,4 @@ class Timer {
const Result start_;
};
#endif // TEST_QPS_TIMER_H
#endif // TEST_QPS_TIMER_H

@ -63,8 +63,8 @@ DEFINE_int32(server_port, 0, "Spawned server port.");
// In some distros, gflags is in the namespace google, and in some others,
// in gflags. This hack is enabling us to find both.
namespace google { }
namespace gflags { }
namespace google {}
namespace gflags {}
using namespace google;
using namespace gflags;
@ -77,16 +77,20 @@ namespace testing {
std::unique_ptr<Client> CreateClient(const ClientConfig& config) {
switch (config.client_type()) {
case ClientType::SYNCHRONOUS_CLIENT: return CreateSynchronousClient(config);
case ClientType::ASYNC_CLIENT: abort(); //return CreateAsyncClient(config);
case ClientType::SYNCHRONOUS_CLIENT:
return CreateSynchronousClient(config);
case ClientType::ASYNC_CLIENT:
abort(); // return CreateAsyncClient(config);
}
abort();
}
std::unique_ptr<Server> CreateServer(const ServerConfig& config) {
switch (config.server_type()) {
case ServerType::SYNCHRONOUS_SERVER: return CreateSynchronousServer(config, FLAGS_server_port);
case ServerType::ASYNC_SERVER: abort(); //return CreateAsyncServer(config, FLAGS_server_port);
case ServerType::SYNCHRONOUS_SERVER:
return CreateSynchronousServer(config, FLAGS_server_port);
case ServerType::ASYNC_SERVER:
abort(); // return CreateAsyncServer(config, FLAGS_server_port);
}
abort();
}
@ -95,23 +99,25 @@ class WorkerImpl final : public Worker::Service {
public:
WorkerImpl() : acquired_(false) {}
Status RunTest(ServerContext* ctx, ServerReaderWriter<ClientStatus, ClientArgs>* stream) GRPC_OVERRIDE {
InstanceGuard g(this);
if (!g.Acquired()) {
return Status(RESOURCE_EXHAUSTED);
}
ClientArgs args;
if (!stream->Read(&args)) {
return Status(INVALID_ARGUMENT);
}
if (!args.has_setup()) {
return Status(INVALID_ARGUMENT);
}
auto client = CreateClient(args.setup());
if (!client) {
return Status(INVALID_ARGUMENT);
}
Status RunTest(ServerContext* ctx,
ServerReaderWriter<ClientStatus, ClientArgs>* stream)
GRPC_OVERRIDE {
InstanceGuard g(this);
if (!g.Acquired()) {
return Status(RESOURCE_EXHAUSTED);
}
ClientArgs args;
if (!stream->Read(&args)) {
return Status(INVALID_ARGUMENT);
}
if (!args.has_setup()) {
return Status(INVALID_ARGUMENT);
}
auto client = CreateClient(args.setup());
if (!client) {
return Status(INVALID_ARGUMENT);
}
ClientStatus status;
if (!stream->Write(status)) {
return Status(UNKNOWN);
@ -127,23 +133,25 @@ class WorkerImpl final : public Worker::Service {
return Status::OK;
}
Status RunServer(ServerContext* ctx, ServerReaderWriter<ServerStatus, ServerArgs>* stream) GRPC_OVERRIDE {
InstanceGuard g(this);
if (!g.Acquired()) {
return Status(RESOURCE_EXHAUSTED);
}
ServerArgs args;
if (!stream->Read(&args)) {
return Status(INVALID_ARGUMENT);
}
if (!args.has_setup()) {
return Status(INVALID_ARGUMENT);
}
auto server = CreateServer(args.setup());
if (!server) {
return Status(INVALID_ARGUMENT);
}
Status RunServer(ServerContext* ctx,
ServerReaderWriter<ServerStatus, ServerArgs>* stream)
GRPC_OVERRIDE {
InstanceGuard g(this);
if (!g.Acquired()) {
return Status(RESOURCE_EXHAUSTED);
}
ServerArgs args;
if (!stream->Read(&args)) {
return Status(INVALID_ARGUMENT);
}
if (!args.has_setup()) {
return Status(INVALID_ARGUMENT);
}
auto server = CreateServer(args.setup());
if (!server) {
return Status(INVALID_ARGUMENT);
}
ServerStatus status;
status.set_port(FLAGS_server_port);
if (!stream->Write(status)) {
@ -163,27 +171,32 @@ class WorkerImpl final : public Worker::Service {
private:
class InstanceGuard {
public:
InstanceGuard(WorkerImpl* impl) : impl_(impl), acquired_(impl->TryAcquireInstance()) {}
~InstanceGuard() { if (acquired_) { impl_->ReleaseInstance(); } }
InstanceGuard(WorkerImpl* impl)
: impl_(impl), acquired_(impl->TryAcquireInstance()) {}
~InstanceGuard() {
if (acquired_) {
impl_->ReleaseInstance();
}
}
bool Acquired() const { return acquired_; }
bool Acquired() const { return acquired_; }
private:
WorkerImpl* const impl_;
const bool acquired_;
WorkerImpl* const impl_;
const bool acquired_;
};
bool TryAcquireInstance() {
std::lock_guard<std::mutex> g(mu_);
if (acquired_) return false;
acquired_ = true;
return true;
std::lock_guard<std::mutex> g(mu_);
if (acquired_) return false;
acquired_ = true;
return true;
}
void ReleaseInstance() {
std::lock_guard<std::mutex> g(mu_);
GPR_ASSERT(acquired_);
acquired_ = false;
std::lock_guard<std::mutex> g(mu_);
GPR_ASSERT(acquired_);
acquired_ = false;
}
std::mutex mu_;
@ -209,10 +222,10 @@ static void RunServer() {
}
}
} // namespace testing
} // namespace grpc
} // namespace testing
} // namespace grpc
int main(int argc, char **argv) {
int main(int argc, char** argv) {
signal(SIGINT, sigint_handler);
grpc_init();

Loading…
Cancel
Save