Merge github.com:grpc/grpc into new_op

reviewable/pr3993/r7
Craig Tiller 9 years ago
commit 61ed001585
  1. 140
      Makefile
  2. 23
      build.yaml
  3. 2
      include/grpc/support/histogram.h
  4. 2
      src/core/support/histogram.c
  5. 70
      test/core/network_benchmarks/low_level_ping_pong.c
  6. 4
      test/core/surface/byte_buffer_reader_test.c
  7. 9
      test/cpp/qps/async_streaming_ping_pong_test.cc
  8. 9
      test/cpp/qps/async_unary_ping_pong_test.cc
  9. 154
      test/cpp/qps/client.h
  10. 41
      test/cpp/qps/client_async.cc
  11. 2
      test/cpp/qps/client_sync.cc
  12. 27
      test/cpp/qps/driver.cc
  13. 8
      test/cpp/qps/driver.h
  14. 4
      test/cpp/qps/histogram.h
  15. 2
      test/cpp/qps/perf_db.proto
  16. 18
      test/cpp/qps/qps-sweep.sh
  17. 119
      test/cpp/qps/qps_driver.cc
  18. 2
      test/cpp/qps/qps_interarrival_test.cc
  19. 9
      test/cpp/qps/qps_openloop_test.cc
  20. 9
      test/cpp/qps/qps_test.cc
  21. 9
      test/cpp/qps/qps_test_with_poll.cc
  22. 54
      test/cpp/qps/qps_worker.cc
  23. 6
      test/cpp/qps/qps_worker.h
  24. 7
      test/cpp/qps/report.cc
  25. 1
      test/cpp/qps/report.h
  26. 84
      test/cpp/qps/secure_sync_unary_ping_pong_test.cc
  27. 52
      test/cpp/qps/server.h
  28. 35
      test/cpp/qps/server_async.cc
  29. 25
      test/cpp/qps/server_sync.cc
  30. 4
      test/cpp/qps/single_run_localhost.sh
  31. 12
      test/cpp/qps/sync_streaming_ping_pong_test.cc
  32. 12
      test/cpp/qps/sync_unary_ping_pong_test.cc
  33. 2
      test/cpp/qps/timer.cc
  34. 2
      test/cpp/qps/timer.h
  35. 5
      test/cpp/qps/worker.cc
  36. 148
      test/proto/benchmarks/control.proto
  37. 55
      test/proto/benchmarks/payloads.proto
  38. 55
      test/proto/benchmarks/services.proto
  39. 59
      test/proto/benchmarks/stats.proto
  40. 29
      tools/run_tests/sources_and_headers.json
  41. 16
      tools/run_tests/tests.json
  42. 40
      vsprojects/vcxproj/qps/qps.vcxproj
  43. 17
      vsprojects/vcxproj/qps/qps.vcxproj.filters

File diff suppressed because one or more lines are too long

@ -753,7 +753,11 @@ libs:
- test/cpp/qps/timer.h
- test/cpp/util/benchmark_config.h
src:
- test/proto/qpstest.proto
- test/proto/messages.proto
- test/proto/benchmarks/control.proto
- test/proto/benchmarks/payloads.proto
- test/proto/benchmarks/services.proto
- test/proto/benchmarks/stats.proto
- test/cpp/qps/perf_db.proto
- test/cpp/qps/client_async.cc
- test/cpp/qps/client_sync.cc
@ -2030,6 +2034,23 @@ targets:
- grpc
- gpr_test_util
- gpr
- name: secure_sync_unary_ping_pong_test
build: test
language: c++
src:
- test/cpp/qps/secure_sync_unary_ping_pong_test.cc
deps:
- qps
- grpc++_test_util
- grpc_test_util
- grpc++
- grpc
- gpr_test_util
- gpr
platforms:
- mac
- linux
- posix
- name: server_crash_test
build: test
language: c++

@ -50,7 +50,7 @@ void gpr_histogram_add(gpr_histogram *h, double x);
/* The following merges the second histogram into the first. It only works
if they have the same buckets and resolution. Returns 0 on failure, 1
on success */
int gpr_histogram_merge(gpr_histogram *dst, gpr_histogram *src);
int gpr_histogram_merge(gpr_histogram *dst, const gpr_histogram *src);
double gpr_histogram_percentile(gpr_histogram *histogram, double percentile);
double gpr_histogram_mean(gpr_histogram *histogram);

@ -125,7 +125,7 @@ void gpr_histogram_add(gpr_histogram *h, double x) {
h->buckets[bucket_for(h, x)]++;
}
int gpr_histogram_merge(gpr_histogram *dst, gpr_histogram *src) {
int gpr_histogram_merge(gpr_histogram *dst, const gpr_histogram *src) {
if ((dst->num_buckets != src->num_buckets) ||
(dst->multiplier != src->multiplier)) {
/* Fail because these histograms don't match */

@ -82,9 +82,9 @@ typedef struct thread_args {
/* Basic call to read() */
static int read_bytes(int fd, char *buf, size_t read_size, int spin) {
size_t bytes_read = 0;
int err;
ssize_t err;
do {
err = (int)read(fd, buf + bytes_read, read_size - bytes_read);
err = read(fd, buf + bytes_read, read_size - bytes_read);
if (err < 0) {
if (errno == EINTR) {
continue;
@ -115,6 +115,7 @@ static int poll_read_bytes(int fd, char *buf, size_t read_size, int spin) {
struct pollfd pfd;
size_t bytes_read = 0;
int err;
ssize_t err2;
pfd.fd = fd;
pfd.events = POLLIN;
@ -132,13 +133,13 @@ static int poll_read_bytes(int fd, char *buf, size_t read_size, int spin) {
GPR_ASSERT(err == 1);
GPR_ASSERT(pfd.revents == POLLIN);
do {
err = (int)read(fd, buf + bytes_read, read_size - bytes_read);
} while (err < 0 && errno == EINTR);
if (err < 0 && errno != EAGAIN) {
err2 = read(fd, buf + bytes_read, read_size - bytes_read);
} while (err2 < 0 && errno == EINTR);
if (err2 < 0 && errno != EAGAIN) {
gpr_log(GPR_ERROR, "Read failed: %s", strerror(errno));
return -1;
}
bytes_read += (size_t)err;
bytes_read += (size_t) err2;
} while (bytes_read < read_size);
return 0;
}
@ -157,6 +158,7 @@ static int epoll_read_bytes(struct thread_args *args, char *buf, int spin) {
struct epoll_event ev;
size_t bytes_read = 0;
int err;
ssize_t err2;
size_t read_size = args->msg_size;
do {
@ -172,11 +174,11 @@ static int epoll_read_bytes(struct thread_args *args, char *buf, int spin) {
GPR_ASSERT(ev.data.fd == args->fds.read_fd);
do {
do {
err = (int)read(args->fds.read_fd, buf + bytes_read,
read_size - bytes_read);
} while (err < 0 && errno == EINTR);
err2 = read(args->fds.read_fd, buf + bytes_read,
read_size - bytes_read);
} while (err2 < 0 && errno == EINTR);
if (errno == EAGAIN) break;
bytes_read += (size_t)err;
bytes_read += (size_t) err2;
/* TODO(klempner): This should really be doing an extra call after we are
done to ensure we see an EAGAIN */
} while (bytes_read < read_size);
@ -200,11 +202,11 @@ static int epoll_read_bytes_spin(struct thread_args *args, char *buf) {
*/
static int blocking_write_bytes(struct thread_args *args, char *buf) {
size_t bytes_written = 0;
int err;
ssize_t err;
size_t write_size = args->msg_size;
do {
err = (int)write(args->fds.write_fd, buf + bytes_written,
write_size - bytes_written);
err = write(args->fds.write_fd, buf + bytes_written,
write_size - bytes_written);
if (err < 0) {
if (errno == EINTR) {
continue;
@ -298,7 +300,7 @@ static void print_histogram(gpr_histogram *histogram) {
static double now(void) {
gpr_timespec tv = gpr_now(GPR_CLOCK_REALTIME);
return 1e9 * (double)tv.tv_sec + tv.tv_nsec;
return 1e9 * (double)tv.tv_sec + (double)tv.tv_nsec;
}
static void client_thread(thread_args *args) {
@ -374,7 +376,7 @@ error:
return -1;
}
static int connect_client(struct sockaddr *addr, int len) {
static int connect_client(struct sockaddr *addr, socklen_t len) {
int fd = socket(addr->sa_family, SOCK_STREAM, 0);
int err;
if (fd < 0) {
@ -388,7 +390,7 @@ static int connect_client(struct sockaddr *addr, int len) {
}
do {
err = (int)connect(fd, addr, (socklen_t)len);
err = connect(fd, addr, len);
} while (err < 0 && errno == EINTR);
if (err < 0) {
@ -587,27 +589,27 @@ static int run_benchmark(char *socket_type, thread_args *client_args,
return 0;
}
static int run_all_benchmarks(int msg_size) {
static int run_all_benchmarks(size_t msg_size) {
int error = 0;
size_t i;
for (i = 0; i < GPR_ARRAY_SIZE(test_strategies); ++i) {
test_strategy *ts = &test_strategies[i];
test_strategy *strategy = &test_strategies[i];
size_t j;
for (j = 0; j < GPR_ARRAY_SIZE(socket_types); ++j) {
thread_args *client_args = malloc(sizeof(thread_args));
thread_args *server_args = malloc(sizeof(thread_args));
char *socket_type = socket_types[j];
client_args->read_bytes = ts->read_strategy;
client_args->read_bytes = strategy->read_strategy;
client_args->write_bytes = blocking_write_bytes;
client_args->setup = ts->setup;
client_args->msg_size = (size_t)msg_size;
client_args->strategy_name = ts->name;
server_args->read_bytes = ts->read_strategy;
client_args->setup = strategy->setup;
client_args->msg_size = msg_size;
client_args->strategy_name = strategy->name;
server_args->read_bytes = strategy->read_strategy;
server_args->write_bytes = blocking_write_bytes;
server_args->setup = ts->setup;
server_args->msg_size = (size_t)msg_size;
server_args->strategy_name = ts->name;
server_args->setup = strategy->setup;
server_args->msg_size = msg_size;
server_args->strategy_name = strategy->name;
error = run_benchmark(socket_type, client_args, server_args);
if (error < 0) {
return error;
@ -624,7 +626,7 @@ int main(int argc, char **argv) {
char *read_strategy = NULL;
char *socket_type = NULL;
size_t i;
const test_strategy *ts = NULL;
const test_strategy *strategy = NULL;
int error = 0;
gpr_cmdline *cmdline =
@ -644,7 +646,7 @@ int main(int argc, char **argv) {
if (read_strategy == NULL) {
gpr_log(GPR_INFO, "No strategy specified, running all benchmarks");
return run_all_benchmarks(msg_size);
return run_all_benchmarks((size_t)msg_size);
}
if (socket_type == NULL) {
@ -658,22 +660,22 @@ int main(int argc, char **argv) {
for (i = 0; i < GPR_ARRAY_SIZE(test_strategies); ++i) {
if (strcmp(test_strategies[i].name, read_strategy) == 0) {
ts = &test_strategies[i];
strategy = &test_strategies[i];
}
}
if (ts == NULL) {
if (strategy == NULL) {
fprintf(stderr, "Invalid read strategy %s\n", read_strategy);
return -1;
}
client_args->read_bytes = ts->read_strategy;
client_args->read_bytes = strategy->read_strategy;
client_args->write_bytes = blocking_write_bytes;
client_args->setup = ts->setup;
client_args->setup = strategy->setup;
client_args->msg_size = (size_t)msg_size;
client_args->strategy_name = read_strategy;
server_args->read_bytes = ts->read_strategy;
server_args->read_bytes = strategy->read_strategy;
server_args->write_bytes = blocking_write_bytes;
server_args->setup = ts->setup;
server_args->setup = strategy->setup;
server_args->msg_size = (size_t)msg_size;
server_args->strategy_name = read_strategy;

@ -185,8 +185,8 @@ static void test_byte_buffer_from_reader(void) {
}
static void test_readall(void) {
const char* lotsa_as[512];
const char* lotsa_bs[1024];
char* lotsa_as[512];
char* lotsa_bs[1024];
gpr_slice slices[2];
grpc_byte_buffer *buffer;
grpc_byte_buffer_reader reader;

@ -35,8 +35,6 @@
#include <grpc/support/log.h>
#include <signal.h>
#include "test/cpp/qps/driver.h"
#include "test/cpp/qps/report.h"
#include "test/cpp/util/benchmark_config.h"
@ -52,17 +50,15 @@ static void RunAsyncStreamingPingPong() {
ClientConfig client_config;
client_config.set_client_type(ASYNC_CLIENT);
client_config.set_enable_ssl(false);
client_config.set_outstanding_rpcs_per_channel(1);
client_config.set_client_channels(1);
client_config.set_payload_size(1);
client_config.set_async_client_threads(1);
client_config.set_rpc_type(STREAMING);
client_config.mutable_load_params()->mutable_closed_loop();
ServerConfig server_config;
server_config.set_server_type(ASYNC_SERVER);
server_config.set_enable_ssl(false);
server_config.set_threads(1);
server_config.set_async_server_threads(1);
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
@ -77,7 +73,6 @@ static void RunAsyncStreamingPingPong() {
int main(int argc, char** argv) {
grpc::testing::InitBenchmark(&argc, &argv, true);
signal(SIGPIPE, SIG_IGN);
grpc::testing::RunAsyncStreamingPingPong();
return 0;
}

@ -35,8 +35,6 @@
#include <grpc/support/log.h>
#include <signal.h>
#include "test/cpp/qps/driver.h"
#include "test/cpp/qps/report.h"
#include "test/cpp/util/benchmark_config.h"
@ -52,17 +50,15 @@ static void RunAsyncUnaryPingPong() {
ClientConfig client_config;
client_config.set_client_type(ASYNC_CLIENT);
client_config.set_enable_ssl(false);
client_config.set_outstanding_rpcs_per_channel(1);
client_config.set_client_channels(1);
client_config.set_payload_size(1);
client_config.set_async_client_threads(1);
client_config.set_rpc_type(UNARY);
client_config.mutable_load_params()->mutable_closed_loop();
ServerConfig server_config;
server_config.set_server_type(ASYNC_SERVER);
server_config.set_enable_ssl(false);
server_config.set_threads(1);
server_config.set_async_server_threads(1);
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
@ -75,7 +71,6 @@ static void RunAsyncUnaryPingPong() {
int main(int argc, char** argv) {
grpc::testing::InitBenchmark(&argc, &argv, true);
signal(SIGPIPE, SIG_IGN);
grpc::testing::RunAsyncUnaryPingPong();
return 0;

@ -40,8 +40,9 @@
#include "test/cpp/qps/histogram.h"
#include "test/cpp/qps/interarrival.h"
#include "test/cpp/qps/timer.h"
#include "test/proto/qpstest.grpc.pb.h"
#include "test/cpp/util/create_test_channel.h"
#include "test/proto/benchmarks/payloads.grpc.pb.h"
#include "test/proto/benchmarks/services.grpc.pb.h"
namespace grpc {
@ -75,27 +76,54 @@ class Client {
channels_[i].init(config.server_targets(i % config.server_targets_size()),
config);
}
request_.set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
request_.set_response_size(config.payload_size());
if (config.payload_config().has_bytebuf_params()) {
GPR_ASSERT(false); // not yet implemented
} else if (config.payload_config().has_simple_params()) {
request_.set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
request_.set_response_size(
config.payload_config().simple_params().resp_size());
request_.mutable_payload()->set_type(
grpc::testing::PayloadType::COMPRESSABLE);
int size = config.payload_config().simple_params().req_size();
std::unique_ptr<char[]> body(new char[size]);
request_.mutable_payload()->set_body(body.get(), size);
} else if (config.payload_config().has_complex_params()) {
GPR_ASSERT(false); // not yet implemented
} else {
// default should be simple proto without payloads
request_.set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
request_.set_response_size(0);
request_.mutable_payload()->set_type(
grpc::testing::PayloadType::COMPRESSABLE);
}
}
virtual ~Client() {}
ClientStats Mark() {
ClientStats Mark(bool reset) {
Histogram latencies;
Timer::Result timer_result;
// avoid std::vector for old compilers that expect a copy constructor
Histogram* to_merge = new Histogram[threads_.size()];
for (size_t i = 0; i < threads_.size(); i++) {
threads_[i]->BeginSwap(&to_merge[i]);
}
std::unique_ptr<Timer> timer(new Timer);
timer_.swap(timer);
for (size_t i = 0; i < threads_.size(); i++) {
threads_[i]->EndSwap();
latencies.Merge(&to_merge[i]);
if (reset) {
Histogram* to_merge = new Histogram[threads_.size()];
for (size_t i = 0; i < threads_.size(); i++) {
threads_[i]->BeginSwap(&to_merge[i]);
}
std::unique_ptr<Timer> timer(new Timer);
timer_.swap(timer);
for (size_t i = 0; i < threads_.size(); i++) {
threads_[i]->EndSwap();
latencies.Merge(to_merge[i]);
}
delete[] to_merge;
timer_result = timer->Mark();
} else {
// merge snapshots of each thread histogram
for (size_t i = 0; i < threads_.size(); i++) {
threads_[i]->MergeStatsInto(&latencies);
}
timer_result = timer_->Mark();
}
delete[] to_merge;
auto timer_result = timer->Mark();
ClientStats stats;
latencies.FillProto(stats.mutable_latencies());
@ -122,15 +150,18 @@ class Client {
// We have to use a 2-phase init like this with a default
// constructor followed by an initializer function to make
// old compilers happy with using this in std::vector
channel_ = CreateTestChannel(target, config.enable_ssl());
stub_ = TestService::NewStub(channel_);
channel_ = CreateTestChannel(
target, config.security_params().server_host_override(),
config.has_security_params(),
!config.security_params().use_test_ca());
stub_ = BenchmarkService::NewStub(channel_);
}
Channel* get_channel() { return channel_.get(); }
TestService::Stub* get_stub() { return stub_.get(); }
BenchmarkService::Stub* get_stub() { return stub_.get(); }
private:
std::shared_ptr<Channel> channel_;
std::unique_ptr<TestService::Stub> stub_;
std::unique_ptr<BenchmarkService::Stub> stub_;
};
std::vector<ClientChannelInfo> channels_;
@ -146,37 +177,41 @@ class Client {
void SetupLoadTest(const ClientConfig& config, size_t num_threads) {
// Set up the load distribution based on the number of threads
if (config.load_type() == CLOSED_LOOP) {
const auto& load = config.load_params();
std::unique_ptr<RandomDist> random_dist;
switch (load.load_case()) {
case LoadParams::kClosedLoop:
// Closed-loop doesn't use random dist at all
break;
case LoadParams::kPoisson:
random_dist.reset(
new ExpDist(load.poisson().offered_load() / num_threads));
break;
case LoadParams::kUniform:
random_dist.reset(
new UniformDist(load.uniform().interarrival_lo() * num_threads,
load.uniform().interarrival_hi() * num_threads));
break;
case LoadParams::kDeterm:
random_dist.reset(
new DetDist(num_threads / load.determ().offered_load()));
break;
case LoadParams::kPareto:
random_dist.reset(
new ParetoDist(load.pareto().interarrival_base() * num_threads,
load.pareto().alpha()));
break;
default:
GPR_ASSERT(false);
}
// Set closed_loop_ based on whether or not random_dist is set
if (!random_dist) {
closed_loop_ = true;
} else {
closed_loop_ = false;
std::unique_ptr<RandomDist> random_dist;
const auto& load = config.load_params();
switch (config.load_type()) {
case POISSON:
random_dist.reset(
new ExpDist(load.poisson().offered_load() / num_threads));
break;
case UNIFORM:
random_dist.reset(
new UniformDist(load.uniform().interarrival_lo() * num_threads,
load.uniform().interarrival_hi() * num_threads));
break;
case DETERMINISTIC:
random_dist.reset(
new DetDist(num_threads / load.determ().offered_load()));
break;
case PARETO:
random_dist.reset(
new ParetoDist(load.pareto().interarrival_base() * num_threads,
load.pareto().alpha()));
break;
default:
GPR_ASSERT(false);
break;
}
// set up interarrival timer according to random dist
interarrival_timer_.init(*random_dist, num_threads);
for (size_t i = 0; i < num_threads; i++) {
next_time_.push_back(
@ -204,7 +239,7 @@ class Client {
public:
Thread(Client* client, size_t idx)
: done_(false),
new_(nullptr),
new_stats_(nullptr),
client_(client),
idx_(idx),
impl_(&Thread::ThreadFunc, this) {}
@ -219,16 +254,21 @@ class Client {
void BeginSwap(Histogram* n) {
std::lock_guard<std::mutex> g(mu_);
new_ = n;
new_stats_ = n;
}
void EndSwap() {
std::unique_lock<std::mutex> g(mu_);
while (new_ != nullptr) {
while (new_stats_ != nullptr) {
cv_.wait(g);
};
}
void MergeStatsInto(Histogram* hist) {
std::unique_lock<std::mutex> g(mu_);
hist->Merge(histogram_);
}
private:
Thread(const Thread&);
Thread& operator=(const Thread&);
@ -246,21 +286,21 @@ class Client {
if (done_) {
return;
}
// check if we're marking, swap out the histogram if so
if (new_) {
new_->Swap(&histogram_);
new_ = nullptr;
// check if we're resetting stats, swap out the histogram if so
if (new_stats_) {
new_stats_->Swap(&histogram_);
new_stats_ = nullptr;
cv_.notify_one();
}
}
}
TestService::Stub* stub_;
BenchmarkService::Stub* stub_;
ClientConfig config_;
std::mutex mu_;
std::condition_variable cv_;
bool done_;
Histogram* new_;
Histogram* new_stats_;
Histogram histogram_;
Client* client_;
size_t idx_;

@ -48,10 +48,10 @@
#include <gflags/gflags.h>
#include <grpc++/client_context.h>
#include "test/proto/qpstest.grpc.pb.h"
#include "test/cpp/qps/timer.h"
#include "test/cpp/qps/client.h"
#include "test/cpp/util/create_test_channel.h"
#include "test/proto/benchmarks/services.grpc.pb.h"
namespace grpc {
namespace testing {
@ -88,10 +88,10 @@ template <class RequestType, class ResponseType>
class ClientRpcContextUnaryImpl : public ClientRpcContext {
public:
ClientRpcContextUnaryImpl(
int channel_id, TestService::Stub* stub, const RequestType& req,
int channel_id, BenchmarkService::Stub* stub, const RequestType& req,
std::function<
std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
TestService::Stub*, grpc::ClientContext*, const RequestType&,
BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
CompletionQueue*)> start_req,
std::function<void(grpc::Status, ResponseType*)> on_done)
: ClientRpcContext(channel_id),
@ -131,13 +131,13 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
return true; // we're done, this'll be ignored
}
grpc::ClientContext context_;
TestService::Stub* stub_;
BenchmarkService::Stub* stub_;
RequestType req_;
ResponseType response_;
bool (ClientRpcContextUnaryImpl::*next_state_)(bool);
std::function<void(grpc::Status, ResponseType*)> callback_;
std::function<std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
TestService::Stub*, grpc::ClientContext*, const RequestType&,
BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
CompletionQueue*)> start_req_;
grpc::Status status_;
double start_;
@ -151,7 +151,7 @@ class AsyncClient : public Client {
public:
explicit AsyncClient(
const ClientConfig& config,
std::function<ClientRpcContext*(int, TestService::Stub*,
std::function<ClientRpcContext*(int, BenchmarkService::Stub*,
const SimpleRequest&)> setup_ctx)
: Client(config),
channel_lock_(new std::mutex[config.client_channels()]),
@ -354,11 +354,12 @@ class AsyncUnaryClient GRPC_FINAL : public AsyncClient {
private:
static void CheckDone(grpc::Status s, SimpleResponse* response) {}
static std::unique_ptr<grpc::ClientAsyncResponseReader<SimpleResponse>>
StartReq(TestService::Stub* stub, grpc::ClientContext* ctx,
StartReq(BenchmarkService::Stub* stub, grpc::ClientContext* ctx,
const SimpleRequest& request, CompletionQueue* cq) {
return stub->AsyncUnaryCall(ctx, request, cq);
};
static ClientRpcContext* SetupCtx(int channel_id, TestService::Stub* stub,
static ClientRpcContext* SetupCtx(int channel_id,
BenchmarkService::Stub* stub,
const SimpleRequest& req) {
return new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
channel_id, stub, req, AsyncUnaryClient::StartReq,
@ -370,10 +371,11 @@ template <class RequestType, class ResponseType>
class ClientRpcContextStreamingImpl : public ClientRpcContext {
public:
ClientRpcContextStreamingImpl(
int channel_id, TestService::Stub* stub, const RequestType& req,
std::function<std::unique_ptr<grpc::ClientAsyncReaderWriter<
RequestType, ResponseType>>(TestService::Stub*, grpc::ClientContext*,
CompletionQueue*, void*)> start_req,
int channel_id, BenchmarkService::Stub* stub, const RequestType& req,
std::function<std::unique_ptr<
grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*,
void*)> start_req,
std::function<void(grpc::Status, ResponseType*)> on_done)
: ClientRpcContext(channel_id),
context_(),
@ -420,15 +422,15 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
return StartWrite(ok);
}
grpc::ClientContext context_;
TestService::Stub* stub_;
BenchmarkService::Stub* stub_;
RequestType req_;
ResponseType response_;
bool (ClientRpcContextStreamingImpl::*next_state_)(bool, Histogram*);
std::function<void(grpc::Status, ResponseType*)> callback_;
std::function<
std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
TestService::Stub*, grpc::ClientContext*, CompletionQueue*, void*)>
start_req_;
BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*,
void*)> start_req_;
grpc::Status status_;
double start_;
std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>
@ -439,8 +441,8 @@ class AsyncStreamingClient GRPC_FINAL : public AsyncClient {
public:
explicit AsyncStreamingClient(const ClientConfig& config)
: AsyncClient(config, SetupCtx) {
// async streaming currently only supported closed loop
GPR_ASSERT(config.load_type() == CLOSED_LOOP);
// async streaming currently only supports closed loop
GPR_ASSERT(closed_loop_);
StartThreads(config.async_client_threads());
}
@ -451,12 +453,13 @@ class AsyncStreamingClient GRPC_FINAL : public AsyncClient {
static void CheckDone(grpc::Status s, SimpleResponse* response) {}
static std::unique_ptr<
grpc::ClientAsyncReaderWriter<SimpleRequest, SimpleResponse>>
StartReq(TestService::Stub* stub, grpc::ClientContext* ctx,
StartReq(BenchmarkService::Stub* stub, grpc::ClientContext* ctx,
CompletionQueue* cq, void* tag) {
auto stream = stub->AsyncStreamingCall(ctx, cq, tag);
return stream;
};
static ClientRpcContext* SetupCtx(int channel_id, TestService::Stub* stub,
static ClientRpcContext* SetupCtx(int channel_id,
BenchmarkService::Stub* stub,
const SimpleRequest& req) {
return new ClientRpcContextStreamingImpl<SimpleRequest, SimpleResponse>(
channel_id, stub, req, AsyncStreamingClient::StartReq,

@ -54,10 +54,10 @@
#include "test/cpp/util/create_test_channel.h"
#include "test/cpp/qps/client.h"
#include "test/proto/qpstest.grpc.pb.h"
#include "test/cpp/qps/histogram.h"
#include "test/cpp/qps/interarrival.h"
#include "test/cpp/qps/timer.h"
#include "test/proto/benchmarks/services.grpc.pb.h"
#include "src/core/profiling/timers.h"

@ -48,6 +48,7 @@
#include "test/cpp/qps/driver.h"
#include "test/cpp/qps/histogram.h"
#include "test/cpp/qps/qps_worker.h"
#include "test/proto/benchmarks/services.grpc.pb.h"
using std::list;
using std::thread;
@ -91,12 +92,12 @@ static ClientContext* AllocContext(list<ClientContext>* contexts, T deadline) {
}
struct ServerData {
unique_ptr<Worker::Stub> stub;
unique_ptr<WorkerService::Stub> stub;
unique_ptr<ClientReaderWriter<ServerArgs, ServerStatus>> stream;
};
struct ClientData {
unique_ptr<Worker::Stub> stub;
unique_ptr<WorkerService::Stub> stub;
unique_ptr<ClientReaderWriter<ClientArgs, ClientStatus>> stream;
};
} // namespace runsc
@ -131,8 +132,7 @@ std::unique_ptr<ScenarioResult> RunScenario(
}
int driver_port = grpc_pick_unused_port_or_die();
int benchmark_port = grpc_pick_unused_port_or_die();
local_workers.emplace_back(new QpsWorker(driver_port, benchmark_port));
local_workers.emplace_back(new QpsWorker(driver_port));
char addr[256];
sprintf(addr, "localhost:%d", driver_port);
if (spawn_local_worker_count < 0) {
@ -161,11 +161,10 @@ std::unique_ptr<ScenarioResult> RunScenario(
// where class contained in std::vector must have a copy constructor
auto* servers = new ServerData[num_servers];
for (size_t i = 0; i < num_servers; i++) {
servers[i].stub =
Worker::NewStub(CreateChannel(workers[i], InsecureCredentials()));
servers[i].stub = WorkerService::NewStub(
CreateChannel(workers[i], InsecureCredentials()));
ServerArgs args;
result_server_config = server_config;
result_server_config.set_host(workers[i]);
*args.mutable_setup() = server_config;
servers[i].stream =
servers[i].stub->RunServer(runsc::AllocContext(&contexts, deadline));
@ -189,14 +188,13 @@ std::unique_ptr<ScenarioResult> RunScenario(
// where class contained in std::vector must have a copy constructor
auto* clients = new ClientData[num_clients];
for (size_t i = 0; i < num_clients; i++) {
clients[i].stub = Worker::NewStub(
clients[i].stub = WorkerService::NewStub(
CreateChannel(workers[i + num_servers], InsecureCredentials()));
ClientArgs args;
result_client_config = client_config;
result_client_config.set_host(workers[i + num_servers]);
*args.mutable_setup() = client_config;
clients[i].stream =
clients[i].stub->RunTest(runsc::AllocContext(&contexts, deadline));
clients[i].stub->RunClient(runsc::AllocContext(&contexts, deadline));
GPR_ASSERT(clients[i].stream->Write(args));
ClientStatus init_status;
GPR_ASSERT(clients[i].stream->Read(&init_status));
@ -211,9 +209,9 @@ std::unique_ptr<ScenarioResult> RunScenario(
// Start a run
gpr_log(GPR_INFO, "Starting");
ServerArgs server_mark;
server_mark.mutable_mark();
server_mark.mutable_mark()->set_reset(true);
ClientArgs client_mark;
client_mark.mutable_mark();
client_mark.mutable_mark()->set_reset(true);
for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
GPR_ASSERT(server->stream->Write(server_mark));
}
@ -251,14 +249,15 @@ std::unique_ptr<ScenarioResult> RunScenario(
GPR_ASSERT(server->stream->Read(&server_status));
const auto& stats = server_status.stats();
result->server_resources.emplace_back(
stats.time_elapsed(), stats.time_user(), stats.time_system());
stats.time_elapsed(), stats.time_user(), stats.time_system(),
server_status.cores());
}
for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
GPR_ASSERT(client->stream->Read(&client_status));
const auto& stats = client_status.stats();
result->latencies.MergeProto(stats.latencies());
result->client_resources.emplace_back(
stats.time_elapsed(), stats.time_user(), stats.time_system());
stats.time_elapsed(), stats.time_user(), stats.time_system(), -1);
}
for (auto client = &clients[0]; client != &clients[num_clients]; client++) {

@ -37,22 +37,24 @@
#include <memory>
#include "test/cpp/qps/histogram.h"
#include "test/proto/qpstest.grpc.pb.h"
#include "test/proto/benchmarks/control.grpc.pb.h"
namespace grpc {
namespace testing {
class ResourceUsage {
public:
ResourceUsage(double w, double u, double s)
: wall_time_(w), user_time_(u), system_time_(s) {}
ResourceUsage(double w, double u, double s, int c)
: wall_time_(w), user_time_(u), system_time_(s), cores_(c) {}
double wall_time() const { return wall_time_; }
double user_time() const { return user_time_; }
double system_time() const { return system_time_; }
int cores() const { return cores_; }
private:
double wall_time_;
double user_time_;
double system_time_;
int cores_;
};
struct ScenarioResult {

@ -35,7 +35,7 @@
#define TEST_QPS_HISTOGRAM_H
#include <grpc/support/histogram.h>
#include "test/proto/qpstest.grpc.pb.h"
#include "test/proto/benchmarks/stats.grpc.pb.h"
namespace grpc {
namespace testing {
@ -48,7 +48,7 @@ class Histogram {
}
Histogram(Histogram&& other) : impl_(other.impl_) { other.impl_ = nullptr; }
void Merge(Histogram* h) { gpr_histogram_merge(impl_, h->impl_); }
void Merge(const Histogram& h) { gpr_histogram_merge(impl_, h.impl_); }
void Add(double value) { gpr_histogram_add(impl_, value); }
double Percentile(double pctile) const {
return gpr_histogram_percentile(impl_, pctile);

@ -29,7 +29,7 @@
syntax = "proto3";
import "test/proto/qpstest.proto";
import "test/proto/benchmarks/control.proto";
package grpc.testing;

@ -37,17 +37,21 @@ fi
bins=`find . .. ../.. ../../.. -name bins | head -1`
for channels in 1 2 4 8
for secure in true false
do
for client in SYNCHRONOUS_CLIENT ASYNC_CLIENT
for channels in 1 2 4 8
do
for server in SYNCHRONOUS_SERVER ASYNC_SERVER
for client in SYNC_CLIENT ASYNC_CLIENT
do
for rpc in UNARY STREAMING
for server in SYNC_SERVER ASYNC_SERVER
do
echo "Test $rpc $client $server , $channels channels"
"$bins"/opt/qps_driver --rpc_type=$rpc \
--client_type=$client --server_type=$server
for rpc in UNARY STREAMING
do
echo "Test $rpc $client $server, $channels channels, secure=$secure"
"$bins"/opt/qps_driver --rpc_type=$rpc \
--client_type=$client --server_type=$server \
--secure_test=$secure
done
done
done
done

@ -33,7 +33,6 @@
#include <memory>
#include <set>
#include <signal.h>
#include <gflags/gflags.h>
#include <grpc/support/log.h>
@ -50,31 +49,39 @@ DEFINE_int32(benchmark_seconds, 30, "Benchmark time (in seconds)");
DEFINE_int32(local_workers, 0, "Number of local workers to start");
// Common config
DEFINE_bool(enable_ssl, false, "Use SSL");
DEFINE_string(rpc_type, "UNARY", "Type of RPC: UNARY or STREAMING");
// Server config
DEFINE_int32(server_threads, 1, "Number of server threads");
DEFINE_string(server_type, "SYNCHRONOUS_SERVER", "Server type");
DEFINE_int32(async_server_threads, 1, "Number of threads for async servers");
DEFINE_string(server_type, "SYNC_SERVER", "Server type");
// Client config
DEFINE_int32(outstanding_rpcs_per_channel, 1,
"Number of outstanding rpcs per channel");
DEFINE_int32(client_channels, 1, "Number of client channels");
DEFINE_int32(payload_size, 1, "Payload size");
DEFINE_string(client_type, "SYNCHRONOUS_CLIENT", "Client type");
DEFINE_int32(simple_req_size, -1, "Simple proto request payload size");
DEFINE_int32(simple_resp_size, -1, "Simple proto response payload size");
DEFINE_string(client_type, "SYNC_CLIENT", "Client type");
DEFINE_int32(async_client_threads, 1, "Async client threads");
DEFINE_string(load_type, "CLOSED_LOOP", "Load type");
DEFINE_double(load_param_1, 0.0, "Load parameter 1");
DEFINE_double(load_param_2, 0.0, "Load parameter 2");
DEFINE_double(poisson_load, -1.0, "Poisson offered load (qps)");
DEFINE_double(uniform_lo, -1.0, "Uniform low interarrival time (us)");
DEFINE_double(uniform_hi, -1.0, "Uniform high interarrival time (us)");
DEFINE_double(determ_load, -1.0, "Deterministic offered load (qps)");
DEFINE_double(pareto_base, -1.0, "Pareto base interarrival time (us)");
DEFINE_double(pareto_alpha, -1.0, "Pareto alpha value");
DEFINE_bool(secure_test, false, "Run a secure test");
using grpc::testing::ClientConfig;
using grpc::testing::ServerConfig;
using grpc::testing::ClientType;
using grpc::testing::ServerType;
using grpc::testing::LoadType;
using grpc::testing::RpcType;
using grpc::testing::ResourceUsage;
using grpc::testing::SecurityParams;
namespace grpc {
namespace testing {
@ -85,72 +92,63 @@ static void QpsDriver() {
ClientType client_type;
ServerType server_type;
LoadType load_type;
GPR_ASSERT(ClientType_Parse(FLAGS_client_type, &client_type));
GPR_ASSERT(ServerType_Parse(FLAGS_server_type, &server_type));
GPR_ASSERT(LoadType_Parse(FLAGS_load_type, &load_type));
ClientConfig client_config;
client_config.set_client_type(client_type);
client_config.set_load_type(load_type);
client_config.set_enable_ssl(FLAGS_enable_ssl);
client_config.set_outstanding_rpcs_per_channel(
FLAGS_outstanding_rpcs_per_channel);
client_config.set_client_channels(FLAGS_client_channels);
client_config.set_payload_size(FLAGS_payload_size);
// Decide which type to use based on the response type
if (FLAGS_simple_resp_size >= 0) {
auto params =
client_config.mutable_payload_config()->mutable_simple_params();
params->set_resp_size(FLAGS_simple_resp_size);
if (FLAGS_simple_req_size >= 0) {
params->set_req_size(FLAGS_simple_req_size);
}
} else {
// set a reasonable default: proto but no payload
client_config.mutable_payload_config()->mutable_simple_params();
}
client_config.set_async_client_threads(FLAGS_async_client_threads);
client_config.set_rpc_type(rpc_type);
// set up the load parameters
switch (load_type) {
case grpc::testing::CLOSED_LOOP:
break;
case grpc::testing::POISSON: {
auto poisson = client_config.mutable_load_params()->mutable_poisson();
GPR_ASSERT(FLAGS_load_param_1 != 0.0);
poisson->set_offered_load(FLAGS_load_param_1);
break;
}
case grpc::testing::UNIFORM: {
auto uniform = client_config.mutable_load_params()->mutable_uniform();
GPR_ASSERT(FLAGS_load_param_1 != 0.0);
GPR_ASSERT(FLAGS_load_param_2 != 0.0);
uniform->set_interarrival_lo(FLAGS_load_param_1 / 1e6);
uniform->set_interarrival_hi(FLAGS_load_param_2 / 1e6);
break;
}
case grpc::testing::DETERMINISTIC: {
auto determ = client_config.mutable_load_params()->mutable_determ();
GPR_ASSERT(FLAGS_load_param_1 != 0.0);
determ->set_offered_load(FLAGS_load_param_1);
break;
}
case grpc::testing::PARETO: {
auto pareto = client_config.mutable_load_params()->mutable_pareto();
GPR_ASSERT(FLAGS_load_param_1 != 0.0);
GPR_ASSERT(FLAGS_load_param_2 != 0.0);
pareto->set_interarrival_base(FLAGS_load_param_1 / 1e6);
pareto->set_alpha(FLAGS_load_param_2);
break;
}
default:
GPR_ASSERT(false);
break;
if (FLAGS_poisson_load > 0.0) {
auto poisson = client_config.mutable_load_params()->mutable_poisson();
poisson->set_offered_load(FLAGS_poisson_load);
} else if (FLAGS_uniform_lo > 0.0) {
auto uniform = client_config.mutable_load_params()->mutable_uniform();
uniform->set_interarrival_lo(FLAGS_uniform_lo / 1e6);
uniform->set_interarrival_hi(FLAGS_uniform_hi / 1e6);
} else if (FLAGS_determ_load > 0.0) {
auto determ = client_config.mutable_load_params()->mutable_determ();
determ->set_offered_load(FLAGS_determ_load);
} else if (FLAGS_pareto_base > 0.0) {
auto pareto = client_config.mutable_load_params()->mutable_pareto();
pareto->set_interarrival_base(FLAGS_pareto_base / 1e6);
pareto->set_alpha(FLAGS_pareto_alpha);
} else {
client_config.mutable_load_params()->mutable_closed_loop();
// No further load parameters to set up for closed loop
}
ServerConfig server_config;
server_config.set_server_type(server_type);
server_config.set_threads(FLAGS_server_threads);
server_config.set_enable_ssl(FLAGS_enable_ssl);
// If we're running a sync-server streaming test, make sure
// that we have at least as many threads as the active streams
// or else threads will be blocked from forward progress and the
// client will deadlock on a timer.
GPR_ASSERT(!(server_type == grpc::testing::SYNCHRONOUS_SERVER &&
rpc_type == grpc::testing::STREAMING &&
FLAGS_server_threads <
FLAGS_client_channels * FLAGS_outstanding_rpcs_per_channel));
server_config.set_async_server_threads(FLAGS_async_server_threads);
if (FLAGS_secure_test) {
// Set up security params
SecurityParams security;
security.set_use_test_ca(true);
security.set_server_host_override("foo.test.google.fr");
client_config.mutable_security_params()->CopyFrom(security);
server_config.mutable_security_params()->CopyFrom(security);
}
const auto result = RunScenario(
client_config, FLAGS_num_clients, server_config, FLAGS_num_servers,
@ -168,7 +166,6 @@ static void QpsDriver() {
int main(int argc, char** argv) {
grpc::testing::InitBenchmark(&argc, &argv, true);
signal(SIGPIPE, SIG_IGN);
grpc::testing::QpsDriver();
return 0;

@ -42,7 +42,7 @@
using grpc::testing::RandomDist;
using grpc::testing::InterarrivalTimer;
void RunTest(RandomDist &&r, int threads, std::string title) {
static void RunTest(RandomDist &&r, int threads, std::string title) {
InterarrivalTimer timer;
timer.init(r, threads);
gpr_histogram *h(gpr_histogram_create(0.01, 60e9));

@ -31,8 +31,6 @@
*
*/
#include <signal.h>
#include <set>
#include <grpc/support/log.h>
@ -52,20 +50,16 @@ static void RunQPS() {
ClientConfig client_config;
client_config.set_client_type(ASYNC_CLIENT);
client_config.set_enable_ssl(false);
client_config.set_outstanding_rpcs_per_channel(1000);
client_config.set_client_channels(8);
client_config.set_payload_size(1);
client_config.set_async_client_threads(8);
client_config.set_rpc_type(UNARY);
client_config.set_load_type(POISSON);
client_config.mutable_load_params()->mutable_poisson()->set_offered_load(
1000.0);
ServerConfig server_config;
server_config.set_server_type(ASYNC_SERVER);
server_config.set_enable_ssl(false);
server_config.set_threads(4);
server_config.set_async_server_threads(4);
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
@ -80,7 +74,6 @@ static void RunQPS() {
int main(int argc, char** argv) {
grpc::testing::InitBenchmark(&argc, &argv, true);
signal(SIGPIPE, SIG_IGN);
grpc::testing::RunQPS();
return 0;

@ -31,8 +31,6 @@
*
*/
#include <signal.h>
#include <set>
#include <grpc/support/log.h>
@ -52,17 +50,15 @@ static void RunQPS() {
ClientConfig client_config;
client_config.set_client_type(ASYNC_CLIENT);
client_config.set_enable_ssl(false);
client_config.set_outstanding_rpcs_per_channel(1000);
client_config.set_client_channels(8);
client_config.set_payload_size(1);
client_config.set_async_client_threads(8);
client_config.set_rpc_type(UNARY);
client_config.mutable_load_params()->mutable_closed_loop();
ServerConfig server_config;
server_config.set_server_type(ASYNC_SERVER);
server_config.set_enable_ssl(false);
server_config.set_threads(8);
server_config.set_async_server_threads(8);
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
@ -77,7 +73,6 @@ static void RunQPS() {
int main(int argc, char** argv) {
grpc::testing::InitBenchmark(&argc, &argv, true);
signal(SIGPIPE, SIG_IGN);
grpc::testing::RunQPS();
return 0;

@ -31,8 +31,6 @@
*
*/
#include <signal.h>
#include <set>
#include <grpc/support/log.h>
@ -56,17 +54,15 @@ static void RunQPS() {
ClientConfig client_config;
client_config.set_client_type(ASYNC_CLIENT);
client_config.set_enable_ssl(false);
client_config.set_outstanding_rpcs_per_channel(1000);
client_config.set_client_channels(8);
client_config.set_payload_size(1);
client_config.set_async_client_threads(8);
client_config.set_rpc_type(UNARY);
client_config.mutable_load_params()->mutable_closed_loop();
ServerConfig server_config;
server_config.set_server_type(ASYNC_SERVER);
server_config.set_enable_ssl(false);
server_config.set_threads(4);
server_config.set_async_server_threads(4);
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
@ -83,7 +79,6 @@ int main(int argc, char** argv) {
grpc_platform_become_multipoller = grpc_poll_become_multipoller;
signal(SIGPIPE, SIG_IGN);
grpc::testing::RunQPS();
return 0;

@ -52,17 +52,17 @@
#include <grpc++/security/server_credentials.h>
#include "test/core/util/grpc_profiler.h"
#include "test/proto/qpstest.pb.h"
#include "test/cpp/qps/client.h"
#include "test/cpp/qps/server.h"
#include "test/cpp/util/create_test_channel.h"
#include "test/proto/benchmarks/services.pb.h"
namespace grpc {
namespace testing {
std::unique_ptr<Client> CreateClient(const ClientConfig& config) {
static std::unique_ptr<Client> CreateClient(const ClientConfig& config) {
switch (config.client_type()) {
case ClientType::SYNCHRONOUS_CLIENT:
case ClientType::SYNC_CLIENT:
return (config.rpc_type() == RpcType::UNARY)
? CreateSynchronousUnaryClient(config)
: CreateSynchronousStreamingClient(config);
@ -76,26 +76,29 @@ std::unique_ptr<Client> CreateClient(const ClientConfig& config) {
abort();
}
std::unique_ptr<Server> CreateServer(const ServerConfig& config,
int server_port) {
static void LimitCores(int cores) {}
static std::unique_ptr<Server> CreateServer(const ServerConfig& config) {
if (config.core_limit() > 0) {
LimitCores(config.core_limit());
}
switch (config.server_type()) {
case ServerType::SYNCHRONOUS_SERVER:
return CreateSynchronousServer(config, server_port);
case ServerType::SYNC_SERVER:
return CreateSynchronousServer(config);
case ServerType::ASYNC_SERVER:
return CreateAsyncServer(config, server_port);
return CreateAsyncServer(config);
default:
abort();
}
abort();
}
class WorkerImpl GRPC_FINAL : public Worker::Service {
class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
public:
explicit WorkerImpl(int server_port)
: server_port_(server_port), acquired_(false) {}
explicit WorkerServiceImpl() : acquired_(false) {}
Status RunTest(ServerContext* ctx,
ServerReaderWriter<ClientStatus, ClientArgs>* stream)
Status RunClient(ServerContext* ctx,
ServerReaderWriter<ClientStatus, ClientArgs>* stream)
GRPC_OVERRIDE {
InstanceGuard g(this);
if (!g.Acquired()) {
@ -103,7 +106,7 @@ class WorkerImpl GRPC_FINAL : public Worker::Service {
}
grpc_profiler_start("qps_client.prof");
Status ret = RunTestBody(ctx, stream);
Status ret = RunClientBody(ctx, stream);
grpc_profiler_stop();
return ret;
}
@ -126,7 +129,7 @@ class WorkerImpl GRPC_FINAL : public Worker::Service {
// Protect against multiple clients using this worker at once.
class InstanceGuard {
public:
InstanceGuard(WorkerImpl* impl)
InstanceGuard(WorkerServiceImpl* impl)
: impl_(impl), acquired_(impl->TryAcquireInstance()) {}
~InstanceGuard() {
if (acquired_) {
@ -137,7 +140,7 @@ class WorkerImpl GRPC_FINAL : public Worker::Service {
bool Acquired() const { return acquired_; }
private:
WorkerImpl* const impl_;
WorkerServiceImpl* const impl_;
const bool acquired_;
};
@ -154,8 +157,8 @@ class WorkerImpl GRPC_FINAL : public Worker::Service {
acquired_ = false;
}
Status RunTestBody(ServerContext* ctx,
ServerReaderWriter<ClientStatus, ClientArgs>* stream) {
Status RunClientBody(ServerContext* ctx,
ServerReaderWriter<ClientStatus, ClientArgs>* stream) {
ClientArgs args;
if (!stream->Read(&args)) {
return Status(StatusCode::INVALID_ARGUMENT, "");
@ -175,7 +178,7 @@ class WorkerImpl GRPC_FINAL : public Worker::Service {
if (!args.has_mark()) {
return Status(StatusCode::INVALID_ARGUMENT, "");
}
*status.mutable_stats() = client->Mark();
*status.mutable_stats() = client->Mark(args.mark().reset());
stream->Write(status);
}
@ -191,12 +194,13 @@ class WorkerImpl GRPC_FINAL : public Worker::Service {
if (!args.has_setup()) {
return Status(StatusCode::INVALID_ARGUMENT, "");
}
auto server = CreateServer(args.setup(), server_port_);
auto server = CreateServer(args.setup());
if (!server) {
return Status(StatusCode::INVALID_ARGUMENT, "");
}
ServerStatus status;
status.set_port(server_port_);
status.set_port(server->port());
status.set_cores(server->cores());
if (!stream->Write(status)) {
return Status(StatusCode::UNKNOWN, "");
}
@ -204,21 +208,19 @@ class WorkerImpl GRPC_FINAL : public Worker::Service {
if (!args.has_mark()) {
return Status(StatusCode::INVALID_ARGUMENT, "");
}
*status.mutable_stats() = server->Mark();
*status.mutable_stats() = server->Mark(args.mark().reset());
stream->Write(status);
}
return Status::OK;
}
const int server_port_;
std::mutex mu_;
bool acquired_;
};
QpsWorker::QpsWorker(int driver_port, int server_port) {
impl_.reset(new WorkerImpl(server_port));
QpsWorker::QpsWorker(int driver_port) {
impl_.reset(new WorkerServiceImpl());
char* server_address = NULL;
gpr_join_host_port(&server_address, "::", driver_port);

@ -42,15 +42,15 @@ class Server;
namespace testing {
class WorkerImpl;
class WorkerServiceImpl;
class QpsWorker {
public:
QpsWorker(int driver_port, int server_port);
explicit QpsWorker(int driver_port);
~QpsWorker();
private:
std::unique_ptr<WorkerImpl> impl_;
std::unique_ptr<WorkerServiceImpl> impl_;
std::unique_ptr<Server> server_;
};

@ -43,6 +43,7 @@ namespace testing {
static double WallTime(ResourceUsage u) { return u.wall_time(); }
static double UserTime(ResourceUsage u) { return u.user_time(); }
static double SystemTime(ResourceUsage u) { return u.system_time(); }
static int Cores(ResourceUsage u) { return u.cores(); }
void CompositeReporter::add(std::unique_ptr<Reporter> reporter) {
reporters_.emplace_back(std::move(reporter));
@ -83,7 +84,7 @@ void GprLogReporter::ReportQPSPerCore(const ScenarioResult& result) {
result.latencies.Count() / average(result.client_resources, WallTime);
gpr_log(GPR_INFO, "QPS: %.1f (%.1f/server core)", qps,
qps / result.server_config.threads());
qps / sum(result.server_resources, Cores));
}
void GprLogReporter::ReportLatency(const ScenarioResult& result) {
@ -123,10 +124,10 @@ void PerfDbReporter::ReportQPSPerCore(const ScenarioResult& result) {
auto qps =
result.latencies.Count() / average(result.client_resources, WallTime);
auto qpsPerCore = qps / result.server_config.threads();
auto qps_per_core = qps / sum(result.server_resources, Cores);
perf_db_client_.setQps(qps);
perf_db_client_.setQpsPerCore(qpsPerCore);
perf_db_client_.setQpsPerCore(qps_per_core);
perf_db_client_.setConfigs(result.client_config, result.server_config);
}

@ -41,7 +41,6 @@
#include <grpc++/support/config.h>
#include "test/cpp/qps/driver.h"
#include "test/proto/qpstest.grpc.pb.h"
#include "test/cpp/qps/perf_db_client.h"
namespace grpc {

@ -0,0 +1,84 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include <set>
#include <grpc/support/log.h>
#include "test/cpp/qps/driver.h"
#include "test/cpp/qps/report.h"
#include "test/cpp/util/benchmark_config.h"
namespace grpc {
namespace testing {
static const int WARMUP = 5;
static const int BENCHMARK = 10;
static void RunSynchronousUnaryPingPong() {
gpr_log(GPR_INFO, "Running Synchronous Unary Ping Pong");
ClientConfig client_config;
client_config.set_client_type(SYNC_CLIENT);
client_config.set_outstanding_rpcs_per_channel(1);
client_config.set_client_channels(1);
client_config.set_rpc_type(UNARY);
client_config.mutable_load_params()->mutable_closed_loop();
ServerConfig server_config;
server_config.set_server_type(SYNC_SERVER);
// Set up security params
SecurityParams security;
security.set_use_test_ca(true);
security.set_server_host_override("foo.test.google.fr");
client_config.mutable_security_params()->CopyFrom(security);
server_config.mutable_security_params()->CopyFrom(security);
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
GetReporter()->ReportQPS(*result);
GetReporter()->ReportLatency(*result);
}
} // namespace testing
} // namespace grpc
int main(int argc, char** argv) {
grpc::testing::InitBenchmark(&argc, &argv, true);
grpc::testing::RunSynchronousUnaryPingPong();
return 0;
}

@ -34,22 +34,38 @@
#ifndef TEST_QPS_SERVER_H
#define TEST_QPS_SERVER_H
#include <grpc/support/cpu.h>
#include <grpc++/security/server_credentials.h>
#include "test/core/end2end/data/ssl_test_data.h"
#include "test/core/util/port.h"
#include "test/cpp/qps/timer.h"
#include "test/proto/qpstest.grpc.pb.h"
#include "test/proto/messages.grpc.pb.h"
#include "test/proto/benchmarks/control.grpc.pb.h"
namespace grpc {
namespace testing {
class Server {
public:
Server() : timer_(new Timer) {}
explicit Server(const ServerConfig& config) : timer_(new Timer) {
if (config.port()) {
port_ = config.port();
} else {
port_ = grpc_pick_unused_port_or_die();
}
}
virtual ~Server() {}
ServerStats Mark() {
std::unique_ptr<Timer> timer(new Timer);
timer.swap(timer_);
auto timer_result = timer->Mark();
ServerStats Mark(bool reset) {
Timer::Result timer_result;
if (reset) {
std::unique_ptr<Timer> timer(new Timer);
timer.swap(timer_);
timer_result = timer->Mark();
} else {
timer_result = timer_->Mark();
}
ServerStats stats;
stats.set_time_elapsed(timer_result.wall);
@ -70,13 +86,29 @@ class Server {
return true;
}
int port() const { return port_; }
int cores() const { return gpr_cpu_num_cores(); }
static std::shared_ptr<ServerCredentials> CreateServerCredentials(
const ServerConfig& config) {
if (config.has_security_params()) {
SslServerCredentialsOptions::PemKeyCertPair pkcp = {test_server1_key,
test_server1_cert};
SslServerCredentialsOptions ssl_opts;
ssl_opts.pem_root_certs = "";
ssl_opts.pem_key_cert_pairs.push_back(pkcp);
return SslServerCredentials(ssl_opts);
} else {
return InsecureServerCredentials();
}
}
private:
int port_;
std::unique_ptr<Timer> timer_;
};
std::unique_ptr<Server> CreateSynchronousServer(const ServerConfig& config,
int port);
std::unique_ptr<Server> CreateAsyncServer(const ServerConfig& config, int port);
std::unique_ptr<Server> CreateSynchronousServer(const ServerConfig& config);
std::unique_ptr<Server> CreateAsyncServer(const ServerConfig& config);
} // namespace testing
} // namespace grpc

@ -49,38 +49,40 @@
#include <grpc++/security/server_credentials.h>
#include <gtest/gtest.h>
#include "test/proto/qpstest.grpc.pb.h"
#include "test/cpp/qps/server.h"
#include "test/proto/benchmarks/services.grpc.pb.h"
namespace grpc {
namespace testing {
class AsyncQpsServerTest : public Server {
public:
AsyncQpsServerTest(const ServerConfig &config, int port) {
explicit AsyncQpsServerTest(const ServerConfig &config) : Server(config) {
char *server_address = NULL;
gpr_join_host_port(&server_address, "::", port);
gpr_join_host_port(&server_address, "::", port());
ServerBuilder builder;
builder.AddListeningPort(server_address, InsecureServerCredentials());
builder.AddListeningPort(server_address,
Server::CreateServerCredentials(config));
gpr_free(server_address);
builder.RegisterAsyncService(&async_service_);
for (int i = 0; i < config.threads(); i++) {
for (int i = 0; i < config.async_server_threads(); i++) {
srv_cqs_.emplace_back(builder.AddCompletionQueue());
}
server_ = builder.BuildAndStart();
using namespace std::placeholders;
for (int i = 0; i < 10000 / config.threads(); i++) {
for (int j = 0; j < config.threads(); j++) {
for (int i = 0; i < 10000 / config.async_server_threads(); i++) {
for (int j = 0; j < config.async_server_threads(); j++) {
auto request_unary = std::bind(
&TestService::AsyncService::RequestUnaryCall, &async_service_, _1,
_2, _3, srv_cqs_[j].get(), srv_cqs_[j].get(), _4);
&BenchmarkService::AsyncService::RequestUnaryCall, &async_service_,
_1, _2, _3, srv_cqs_[j].get(), srv_cqs_[j].get(), _4);
auto request_streaming = std::bind(
&TestService::AsyncService::RequestStreamingCall, &async_service_,
_1, _2, srv_cqs_[j].get(), srv_cqs_[j].get(), _3);
&BenchmarkService::AsyncService::RequestStreamingCall,
&async_service_, _1, _2, srv_cqs_[j].get(), srv_cqs_[j].get(), _3);
contexts_.push_front(
new ServerRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
request_unary, ProcessRPC));
@ -89,10 +91,10 @@ class AsyncQpsServerTest : public Server {
request_streaming, ProcessRPC));
}
}
for (int i = 0; i < config.threads(); i++) {
for (int i = 0; i < config.async_server_threads(); i++) {
shutdown_state_.emplace_back(new PerThreadShutdownState());
}
for (int i = 0; i < config.threads(); i++) {
for (int i = 0; i < config.async_server_threads(); i++) {
threads_.emplace_back(&AsyncQpsServerTest::ThreadFunc, this, i);
}
}
@ -309,7 +311,7 @@ class AsyncQpsServerTest : public Server {
std::vector<std::thread> threads_;
std::unique_ptr<grpc::Server> server_;
std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> srv_cqs_;
TestService::AsyncService async_service_;
BenchmarkService::AsyncService async_service_;
std::forward_list<ServerRpcContext *> contexts_;
class PerThreadShutdownState {
@ -333,9 +335,8 @@ class AsyncQpsServerTest : public Server {
std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_;
};
std::unique_ptr<Server> CreateAsyncServer(const ServerConfig &config,
int port) {
return std::unique_ptr<Server>(new AsyncQpsServerTest(config, port));
std::unique_ptr<Server> CreateAsyncServer(const ServerConfig &config) {
return std::unique_ptr<Server>(new AsyncQpsServerTest(config));
}
} // namespace testing

@ -43,14 +43,14 @@
#include <grpc++/server_context.h>
#include <grpc++/security/server_credentials.h>
#include "test/proto/qpstest.grpc.pb.h"
#include "test/cpp/qps/server.h"
#include "test/cpp/qps/timer.h"
#include "test/proto/benchmarks/services.grpc.pb.h"
namespace grpc {
namespace testing {
class TestServiceImpl GRPC_FINAL : public TestService::Service {
class BenchmarkServiceImpl GRPC_FINAL : public BenchmarkService::Service {
public:
Status UnaryCall(ServerContext* context, const SimpleRequest* request,
SimpleResponse* response) GRPC_OVERRIDE {
@ -84,30 +84,29 @@ class TestServiceImpl GRPC_FINAL : public TestService::Service {
class SynchronousServer GRPC_FINAL : public grpc::testing::Server {
public:
SynchronousServer(const ServerConfig& config, int port)
: impl_(MakeImpl(port)) {}
private:
std::unique_ptr<grpc::Server> MakeImpl(int port) {
explicit SynchronousServer(const ServerConfig& config) : Server(config) {
ServerBuilder builder;
char* server_address = NULL;
gpr_join_host_port(&server_address, "::", port);
builder.AddListeningPort(server_address, InsecureServerCredentials());
gpr_join_host_port(&server_address, "::", port());
builder.AddListeningPort(server_address,
Server::CreateServerCredentials(config));
gpr_free(server_address);
builder.RegisterService(&service_);
return builder.BuildAndStart();
impl_ = builder.BuildAndStart();
}
TestServiceImpl service_;
private:
BenchmarkServiceImpl service_;
std::unique_ptr<grpc::Server> impl_;
};
std::unique_ptr<grpc::testing::Server> CreateSynchronousServer(
const ServerConfig& config, int port) {
return std::unique_ptr<Server>(new SynchronousServer(config, port));
const ServerConfig& config) {
return std::unique_ptr<Server>(new SynchronousServer(config));
}
} // namespace testing

@ -42,9 +42,9 @@ NUMCPUS=`python2.7 -c 'import multiprocessing; print multiprocessing.cpu_count()
make CONFIG=$config qps_worker qps_driver -j$NUMCPUS
bins/$config/qps_worker -driver_port 10000 -server_port 10001 &
bins/$config/qps_worker -driver_port 10000 &
PID1=$!
bins/$config/qps_worker -driver_port 10010 -server_port 10011 &
bins/$config/qps_worker -driver_port 10010 &
PID2=$!
export QPS_WORKERS="localhost:10000,localhost:10010"

@ -31,8 +31,6 @@
*
*/
#include <signal.h>
#include <set>
#include <grpc/support/log.h>
@ -51,17 +49,14 @@ static void RunSynchronousStreamingPingPong() {
gpr_log(GPR_INFO, "Running Synchronous Streaming Ping Pong");
ClientConfig client_config;
client_config.set_client_type(SYNCHRONOUS_CLIENT);
client_config.set_enable_ssl(false);
client_config.set_client_type(SYNC_CLIENT);
client_config.set_outstanding_rpcs_per_channel(1);
client_config.set_client_channels(1);
client_config.set_payload_size(1);
client_config.set_rpc_type(STREAMING);
client_config.mutable_load_params()->mutable_closed_loop();
ServerConfig server_config;
server_config.set_server_type(SYNCHRONOUS_SERVER);
server_config.set_enable_ssl(false);
server_config.set_threads(1);
server_config.set_server_type(SYNC_SERVER);
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
@ -75,7 +70,6 @@ static void RunSynchronousStreamingPingPong() {
int main(int argc, char** argv) {
grpc::testing::InitBenchmark(&argc, &argv, true);
signal(SIGPIPE, SIG_IGN);
grpc::testing::RunSynchronousStreamingPingPong();
return 0;

@ -31,8 +31,6 @@
*
*/
#include <signal.h>
#include <set>
#include <grpc/support/log.h>
@ -51,17 +49,14 @@ static void RunSynchronousUnaryPingPong() {
gpr_log(GPR_INFO, "Running Synchronous Unary Ping Pong");
ClientConfig client_config;
client_config.set_client_type(SYNCHRONOUS_CLIENT);
client_config.set_enable_ssl(false);
client_config.set_client_type(SYNC_CLIENT);
client_config.set_outstanding_rpcs_per_channel(1);
client_config.set_client_channels(1);
client_config.set_payload_size(1);
client_config.set_rpc_type(UNARY);
client_config.mutable_load_params()->mutable_closed_loop();
ServerConfig server_config;
server_config.set_server_type(SYNCHRONOUS_SERVER);
server_config.set_enable_ssl(false);
server_config.set_threads(1);
server_config.set_server_type(SYNC_SERVER);
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
@ -76,7 +71,6 @@ static void RunSynchronousUnaryPingPong() {
int main(int argc, char** argv) {
grpc::testing::InitBenchmark(&argc, &argv, true);
signal(SIGPIPE, SIG_IGN);
grpc::testing::RunSynchronousUnaryPingPong();
return 0;

@ -61,7 +61,7 @@ Timer::Result Timer::Sample() {
return r;
}
Timer::Result Timer::Mark() {
Timer::Result Timer::Mark() const {
Result s = Sample();
Result r;
r.wall = s.wall - start_.wall;

@ -44,7 +44,7 @@ class Timer {
double system;
};
Result Mark();
Result Mark() const;
static double Now();

@ -43,8 +43,7 @@
#include "test/cpp/qps/qps_worker.h"
#include "test/cpp/util/test_config.h"
DEFINE_int32(driver_port, 0, "Driver server port.");
DEFINE_int32(server_port, 0, "Spawned server port.");
DEFINE_int32(driver_port, 0, "Port for communication with driver");
static bool got_sigint = false;
@ -54,7 +53,7 @@ namespace grpc {
namespace testing {
static void RunServer() {
QpsWorker worker(FLAGS_driver_port, FLAGS_server_port);
QpsWorker worker(FLAGS_driver_port);
while (!got_sigint) {
gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),

@ -1,4 +1,3 @@
// Copyright 2015, Google Inc.
// All rights reserved.
//
@ -28,62 +27,20 @@
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
// An integration test service that covers all the method signature permutations
// of unary/streaming requests/responses.
syntax = "proto3";
package grpc.testing;
enum PayloadType {
// Compressable text format.
COMPRESSABLE = 0;
// Uncompressable binary format.
UNCOMPRESSABLE = 1;
// Randomly chosen from all other formats defined in this enum.
RANDOM = 2;
}
message StatsRequest {
// run number
int32 test_num = 1;
}
message ServerStats {
// wall clock time
double time_elapsed = 1;
// user time used by the server process and threads
double time_user = 2;
// server time used by the server process and all threads
double time_system = 3;
}
message Payload {
// The type of data in body.
PayloadType type = 1;
// Primary contents of payload.
bytes body = 2;
}
import "test/proto/benchmarks/payloads.proto";
import "test/proto/benchmarks/stats.proto";
message HistogramData {
repeated uint32 bucket = 1;
double min_seen = 2;
double max_seen = 3;
double sum = 4;
double sum_of_squares = 5;
double count = 6;
}
package grpc.testing;
enum ClientType {
SYNCHRONOUS_CLIENT = 0;
SYNC_CLIENT = 0;
ASYNC_CLIENT = 1;
}
enum ServerType {
SYNCHRONOUS_SERVER = 0;
SYNC_SERVER = 0;
ASYNC_SERVER = 1;
}
@ -92,14 +49,6 @@ enum RpcType {
STREAMING = 1;
}
enum LoadType {
CLOSED_LOOP = 0;
POISSON = 1;
UNIFORM = 2;
DETERMINISTIC = 3;
PARETO = 4;
}
message PoissonParams {
double offered_load = 1;
}
@ -118,32 +67,45 @@ message ParetoParams {
double alpha = 2;
}
message ClosedLoopParams {
}
message LoadParams {
oneof load {
PoissonParams poisson = 1;
UniformParams uniform = 2;
DeterministicParams determ = 3;
ParetoParams pareto = 4;
ClosedLoopParams closed_loop = 1;
PoissonParams poisson = 2;
UniformParams uniform = 3;
DeterministicParams determ = 4;
ParetoParams pareto = 5;
};
}
// presence of SecurityParams implies use of TLS
message SecurityParams {
bool use_test_ca = 1;
string server_host_override = 2;
}
message ClientConfig {
repeated string server_targets = 1;
ClientType client_type = 2;
bool enable_ssl = 3;
SecurityParams security_params = 3;
int32 outstanding_rpcs_per_channel = 4;
int32 client_channels = 5;
int32 payload_size = 6;
// only for async client:
int32 async_client_threads = 7;
RpcType rpc_type = 8;
string host = 9;
LoadType load_type = 10;
LoadParams load_params = 11;
LoadParams load_params = 10;
PayloadConfig payload_config = 11;
}
message ClientStatus {
ClientStats stats = 1;
}
// Request current stats
message Mark {
bool reset = 1;
}
message ClientArgs {
@ -153,22 +115,15 @@ message ClientArgs {
}
}
message ClientStats {
HistogramData latencies = 1;
double time_elapsed = 2;
double time_user = 3;
double time_system = 4;
}
message ClientStatus {
ClientStats stats = 1;
}
message ServerConfig {
ServerType server_type = 1;
int32 threads = 2;
bool enable_ssl = 3;
string host = 4;
SecurityParams security_params = 2;
int32 port = 4;
// only for async server
int32 async_server_threads = 7;
// restrict core usage
int32 core_limit = 8;
PayloadConfig payload_config = 9;
}
message ServerArgs {
@ -181,38 +136,5 @@ message ServerArgs {
message ServerStatus {
ServerStats stats = 1;
int32 port = 2;
}
message SimpleRequest {
// Desired payload type in the response from the server.
// If response_type is RANDOM, server randomly chooses one from other formats.
PayloadType response_type = 1;
// Desired payload size in the response from the server.
// If response_type is COMPRESSABLE, this denotes the size before compression.
int32 response_size = 2;
// Optional input payload sent along with the request.
Payload payload = 3;
}
message SimpleResponse {
Payload payload = 1;
}
service TestService {
// One request followed by one response.
// The server returns the client payload as-is.
rpc UnaryCall(SimpleRequest) returns (SimpleResponse);
// One request followed by one response.
// The server returns the client payload as-is.
rpc StreamingCall(stream SimpleRequest) returns (stream SimpleResponse);
}
service Worker {
// Start test with specified workload
rpc RunTest(stream ClientArgs) returns (stream ClientStatus);
// Start test with specified workload
rpc RunServer(stream ServerArgs) returns (stream ServerStatus);
int32 cores = 3;
}

@ -0,0 +1,55 @@
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
syntax = "proto3";
package grpc.testing;
message ByteBufferParams {
int32 req_size = 1;
int32 resp_size = 2;
}
message SimpleProtoParams {
int32 req_size = 1;
int32 resp_size = 2;
}
message ComplexProtoParams {
// TODO (vpai): Fill this in once the details of complex, representative
// protos are decided
}
message PayloadConfig {
oneof payload {
ByteBufferParams bytebuf_params = 1;
SimpleProtoParams simple_params = 2;
ComplexProtoParams complex_params = 3;
}
}

@ -0,0 +1,55 @@
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
// An integration test service that covers all the method signature permutations
// of unary/streaming requests/responses.
syntax = "proto3";
import "test/proto/messages.proto";
import "test/proto/benchmarks/control.proto";
package grpc.testing;
service BenchmarkService {
// One request followed by one response.
// The server returns the client payload as-is.
rpc UnaryCall(SimpleRequest) returns (SimpleResponse);
// One request followed by one response.
// The server returns the client payload as-is.
rpc StreamingCall(stream SimpleRequest) returns (stream SimpleResponse);
}
service WorkerService {
// Start server with specified workload
rpc RunServer(stream ServerArgs) returns (stream ServerStatus);
// Start client with specified workload
rpc RunClient(stream ClientArgs) returns (stream ClientStatus);
}

@ -0,0 +1,59 @@
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
syntax = "proto3";
package grpc.testing;
message ServerStats {
// wall clock time
double time_elapsed = 1;
// user time used by the server process and threads
double time_user = 2;
// server time used by the server process and all threads
double time_system = 3;
}
message HistogramData {
repeated uint32 bucket = 1;
double min_seen = 2;
double max_seen = 3;
double sum = 4;
double sum_of_squares = 5;
double count = 6;
}
message ClientStats {
HistogramData latencies = 1;
double time_elapsed = 2;
double time_user = 3;
double time_system = 4;
}

@ -1573,6 +1573,23 @@
"test/cpp/common/secure_auth_context_test.cc"
]
},
{
"deps": [
"gpr",
"gpr_test_util",
"grpc",
"grpc++",
"grpc++_test_util",
"grpc_test_util",
"qps"
],
"headers": [],
"language": "c++",
"name": "secure_sync_unary_ping_pong_test",
"src": [
"test/cpp/qps/secure_sync_unary_ping_pong_test.cc"
]
},
{
"deps": [
"gpr",
@ -15371,8 +15388,16 @@
"test/cpp/qps/stats.h",
"test/cpp/qps/timer.h",
"test/cpp/util/benchmark_config.h",
"test/proto/qpstest.grpc.pb.h",
"test/proto/qpstest.pb.h"
"test/proto/benchmarks/control.grpc.pb.h",
"test/proto/benchmarks/control.pb.h",
"test/proto/benchmarks/payloads.grpc.pb.h",
"test/proto/benchmarks/payloads.pb.h",
"test/proto/benchmarks/services.grpc.pb.h",
"test/proto/benchmarks/services.pb.h",
"test/proto/benchmarks/stats.grpc.pb.h",
"test/proto/benchmarks/stats.pb.h",
"test/proto/messages.grpc.pb.h",
"test/proto/messages.pb.h"
],
"language": "c++",
"name": "qps",

@ -1515,6 +1515,22 @@
"windows"
]
},
{
"ci_platforms": [
"linux",
"mac",
"posix"
],
"exclude_configs": [],
"flaky": false,
"language": "c++",
"name": "secure_sync_unary_ping_pong_test",
"platforms": [
"linux",
"mac",
"posix"
]
},
{
"ci_platforms": [
"linux",

@ -147,13 +147,45 @@
<ClInclude Include="..\..\..\test\cpp\util\benchmark_config.h" />
</ItemGroup>
<ItemGroup>
<ClCompile Include="..\..\..\test\proto\qpstest.pb.cc">
<ClCompile Include="..\..\..\test\proto\messages.pb.cc">
</ClCompile>
<ClInclude Include="..\..\..\test\proto\qpstest.pb.h">
<ClInclude Include="..\..\..\test\proto\messages.pb.h">
</ClInclude>
<ClCompile Include="..\..\..\test\proto\qpstest.grpc.pb.cc">
<ClCompile Include="..\..\..\test\proto\messages.grpc.pb.cc">
</ClCompile>
<ClInclude Include="..\..\..\test\proto\qpstest.grpc.pb.h">
<ClInclude Include="..\..\..\test\proto\messages.grpc.pb.h">
</ClInclude>
<ClCompile Include="..\..\..\test\proto\benchmarks\control.pb.cc">
</ClCompile>
<ClInclude Include="..\..\..\test\proto\benchmarks\control.pb.h">
</ClInclude>
<ClCompile Include="..\..\..\test\proto\benchmarks\control.grpc.pb.cc">
</ClCompile>
<ClInclude Include="..\..\..\test\proto\benchmarks\control.grpc.pb.h">
</ClInclude>
<ClCompile Include="..\..\..\test\proto\benchmarks\payloads.pb.cc">
</ClCompile>
<ClInclude Include="..\..\..\test\proto\benchmarks\payloads.pb.h">
</ClInclude>
<ClCompile Include="..\..\..\test\proto\benchmarks\payloads.grpc.pb.cc">
</ClCompile>
<ClInclude Include="..\..\..\test\proto\benchmarks\payloads.grpc.pb.h">
</ClInclude>
<ClCompile Include="..\..\..\test\proto\benchmarks\services.pb.cc">
</ClCompile>
<ClInclude Include="..\..\..\test\proto\benchmarks\services.pb.h">
</ClInclude>
<ClCompile Include="..\..\..\test\proto\benchmarks\services.grpc.pb.cc">
</ClCompile>
<ClInclude Include="..\..\..\test\proto\benchmarks\services.grpc.pb.h">
</ClInclude>
<ClCompile Include="..\..\..\test\proto\benchmarks\stats.pb.cc">
</ClCompile>
<ClInclude Include="..\..\..\test\proto\benchmarks\stats.pb.h">
</ClInclude>
<ClCompile Include="..\..\..\test\proto\benchmarks\stats.grpc.pb.cc">
</ClCompile>
<ClInclude Include="..\..\..\test\proto\benchmarks\stats.grpc.pb.h">
</ClInclude>
<ClCompile Include="..\..\..\test\cpp\qps\perf_db.pb.cc">
</ClCompile>

@ -1,9 +1,21 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<ItemGroup>
<ClCompile Include="..\..\..\test\proto\qpstest.proto">
<ClCompile Include="..\..\..\test\proto\messages.proto">
<Filter>test\proto</Filter>
</ClCompile>
<ClCompile Include="..\..\..\test\proto\benchmarks\control.proto">
<Filter>test\proto\benchmarks</Filter>
</ClCompile>
<ClCompile Include="..\..\..\test\proto\benchmarks\payloads.proto">
<Filter>test\proto\benchmarks</Filter>
</ClCompile>
<ClCompile Include="..\..\..\test\proto\benchmarks\services.proto">
<Filter>test\proto\benchmarks</Filter>
</ClCompile>
<ClCompile Include="..\..\..\test\proto\benchmarks\stats.proto">
<Filter>test\proto\benchmarks</Filter>
</ClCompile>
<ClCompile Include="..\..\..\test\cpp\qps\perf_db.proto">
<Filter>test\cpp\qps</Filter>
</ClCompile>
@ -90,6 +102,9 @@
<Filter Include="test\proto">
<UniqueIdentifier>{44e63a33-67f4-0575-e87a-711a7c9111e2}</UniqueIdentifier>
</Filter>
<Filter Include="test\proto\benchmarks">
<UniqueIdentifier>{4180a094-39b4-e46c-1576-940bfe87d284}</UniqueIdentifier>
</Filter>
</ItemGroup>
</Project>

Loading…
Cancel
Save