Cleanup some names

Remove some unused options and make server_threads relevant
Start stubbing out better qps/core calculations
reviewable/pr3905/r6
vjpai 9 years ago
parent 9b14e15165
commit d08a738166
  1. 4
      test/cpp/qps/async_streaming_ping_pong_test.cc
  2. 4
      test/cpp/qps/async_unary_ping_pong_test.cc
  3. 4
      test/cpp/qps/client.h
  4. 2
      test/cpp/qps/client_async.cc
  5. 2
      test/cpp/qps/client_sync.cc
  6. 15
      test/cpp/qps/driver.cc
  7. 8
      test/cpp/qps/driver.h
  8. 4
      test/cpp/qps/qps-sweep.sh
  9. 19
      test/cpp/qps/qps_driver.cc
  10. 2
      test/cpp/qps/qps_openloop_test.cc
  11. 4
      test/cpp/qps/qps_test.cc
  12. 17
      test/cpp/qps/qps_worker.cc
  13. 7
      test/cpp/qps/report.cc
  14. 5
      test/cpp/qps/server.h
  15. 12
      test/cpp/qps/server_async.cc
  16. 2
      test/cpp/qps/server_sync.cc
  17. 7
      test/cpp/qps/sync_streaming_ping_pong_test.cc
  18. 7
      test/cpp/qps/sync_unary_ping_pong_test.cc
  19. 14
      test/proto/benchmarks/control.proto

@ -58,12 +58,12 @@ static void RunAsyncStreamingPingPong() {
client_config.set_payload_size(1);
client_config.set_async_client_threads(1);
client_config.set_rpc_type(STREAMING);
client_config.mutable_load_params()->mutable_closed();
client_config.mutable_load_params()->mutable_closed_loop();
ServerConfig server_config;
server_config.set_server_type(ASYNC_SERVER);
server_config.set_use_tls(false);
server_config.set_threads(1);
server_config.set_async_server_threads(1);
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);

@ -58,12 +58,12 @@ static void RunAsyncUnaryPingPong() {
client_config.set_payload_size(1);
client_config.set_async_client_threads(1);
client_config.set_rpc_type(UNARY);
client_config.mutable_load_params()->mutable_closed();
client_config.mutable_load_params()->mutable_closed_loop();
ServerConfig server_config;
server_config.set_server_type(ASYNC_SERVER);
server_config.set_use_tls(false);
server_config.set_threads(1);
server_config.set_async_server_threads(1);
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);

@ -41,7 +41,7 @@
#include "test/cpp/qps/interarrival.h"
#include "test/cpp/qps/timer.h"
#include "test/cpp/util/create_test_channel.h"
#include "test/proto/perf_tests/perf_services.grpc.pb.h"
#include "test/proto/benchmarks/services.grpc.pb.h"
namespace grpc {
@ -171,7 +171,7 @@ class Client {
} else if (load.has_pareto()) {
random_dist.reset(new ParetoDist(load.pareto().interarrival_base() * num_threads,
load.pareto().alpha()));
} else if (load.has_closed()) {
} else if (load.has_closed_loop()) {
// Closed-loop doesn't use random dist at all
} else { // invalid load type
GPR_ASSERT(false);

@ -51,7 +51,7 @@
#include "test/cpp/qps/timer.h"
#include "test/cpp/qps/client.h"
#include "test/cpp/util/create_test_channel.h"
#include "test/proto/perf_tests/perf_services.grpc.pb.h"
#include "test/proto/benchmarks/services.grpc.pb.h"
namespace grpc {
namespace testing {

@ -57,7 +57,7 @@
#include "test/cpp/qps/histogram.h"
#include "test/cpp/qps/interarrival.h"
#include "test/cpp/qps/timer.h"
#include "test/proto/perf_tests/perf_services.grpc.pb.h"
#include "test/proto/benchmarks/services.grpc.pb.h"
#include "src/core/profiling/timers.h"

@ -48,7 +48,7 @@
#include "test/cpp/qps/driver.h"
#include "test/cpp/qps/histogram.h"
#include "test/cpp/qps/qps_worker.h"
#include "test/proto/perf_tests/perf_services.grpc.pb.h"
#include "test/proto/benchmarks/services.grpc.pb.h"
using std::list;
using std::thread;
@ -165,7 +165,6 @@ std::unique_ptr<ScenarioResult> RunScenario(
WorkerService::NewStub(CreateChannel(workers[i], InsecureCredentials()));
ServerArgs args;
result_server_config = server_config;
result_server_config.set_host(workers[i]);
*args.mutable_setup() = server_config;
servers[i].stream =
servers[i].stub->RunServer(runsc::AllocContext(&contexts, deadline));
@ -193,7 +192,6 @@ std::unique_ptr<ScenarioResult> RunScenario(
CreateChannel(workers[i + num_servers], InsecureCredentials()));
ClientArgs args;
result_client_config = client_config;
result_client_config.set_host(workers[i + num_servers]);
*args.mutable_setup() = client_config;
clients[i].stream =
clients[i].stub->RunClient(runsc::AllocContext(&contexts, deadline));
@ -250,15 +248,18 @@ std::unique_ptr<ScenarioResult> RunScenario(
for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
GPR_ASSERT(server->stream->Read(&server_status));
const auto& stats = server_status.stats();
result->server_resources.emplace_back(
stats.time_elapsed(), stats.time_user(), stats.time_system());
result->server_resources.emplace_back(stats.time_elapsed(),
stats.time_user(),
stats.time_system(),
server_status.cores());
}
for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
GPR_ASSERT(client->stream->Read(&client_status));
const auto& stats = client_status.stats();
result->latencies.MergeProto(stats.latencies());
result->client_resources.emplace_back(
stats.time_elapsed(), stats.time_user(), stats.time_system());
result->client_resources.emplace_back(stats.time_elapsed(),
stats.time_user(),
stats.time_system(), -1);
}
for (auto client = &clients[0]; client != &clients[num_clients]; client++) {

@ -37,22 +37,24 @@
#include <memory>
#include "test/cpp/qps/histogram.h"
#include "test/proto/perf_tests/perf_control.grpc.pb.h"
#include "test/proto/benchmarks/control.grpc.pb.h"
namespace grpc {
namespace testing {
class ResourceUsage {
public:
ResourceUsage(double w, double u, double s)
: wall_time_(w), user_time_(u), system_time_(s) {}
ResourceUsage(double w, double u, double s, int c)
: wall_time_(w), user_time_(u), system_time_(s), cores_(c) {}
double wall_time() const { return wall_time_; }
double user_time() const { return user_time_; }
double system_time() const { return system_time_; }
int cores() const { return cores_; }
private:
double wall_time_;
double user_time_;
double system_time_;
int cores_;
};
struct ScenarioResult {

@ -39,9 +39,9 @@ bins=`find . .. ../.. ../../.. -name bins | head -1`
for channels in 1 2 4 8
do
for client in SYNCHRONOUS_CLIENT ASYNC_CLIENT
for client in SYNC_CLIENT ASYNC_CLIENT
do
for server in SYNCHRONOUS_SERVER ASYNC_SERVER
for server in SYNC_SERVER ASYNC_SERVER
do
for rpc in UNARY STREAMING
do

@ -54,15 +54,15 @@ DEFINE_bool(use_tls, false, "Use TLS");
DEFINE_string(rpc_type, "UNARY", "Type of RPC: UNARY or STREAMING");
// Server config
DEFINE_int32(server_threads, 1, "Number of server threads");
DEFINE_string(server_type, "SYNCHRONOUS_SERVER", "Server type");
DEFINE_int32(async_server_threads, 1, "Number of threads for async servers");
DEFINE_string(server_type, "SYNC_SERVER", "Server type");
// Client config
DEFINE_int32(outstanding_rpcs_per_channel, 1,
"Number of outstanding rpcs per channel");
DEFINE_int32(client_channels, 1, "Number of client channels");
DEFINE_int32(payload_size, 1, "Payload size");
DEFINE_string(client_type, "SYNCHRONOUS_CLIENT", "Client type");
DEFINE_string(client_type, "SYNC_CLIENT", "Client type");
DEFINE_int32(async_client_threads, 1, "Async client threads");
DEFINE_double(poisson_load, -1.0, "Poisson offered load (qps)");
@ -117,23 +117,14 @@ static void QpsDriver() {
pareto->set_interarrival_base(FLAGS_pareto_base / 1e6);
pareto->set_alpha(FLAGS_pareto_alpha);
} else {
client_config.mutable_load_params()->mutable_closed();
client_config.mutable_load_params()->mutable_closed_loop();
// No further load parameters to set up for closed loop
}
ServerConfig server_config;
server_config.set_server_type(server_type);
server_config.set_threads(FLAGS_server_threads);
server_config.set_use_tls(FLAGS_use_tls);
// If we're running a sync-server streaming test, make sure
// that we have at least as many threads as the active streams
// or else threads will be blocked from forward progress and the
// client will deadlock on a timer.
GPR_ASSERT(!(server_type == grpc::testing::SYNCHRONOUS_SERVER &&
rpc_type == grpc::testing::STREAMING &&
FLAGS_server_threads <
FLAGS_client_channels * FLAGS_outstanding_rpcs_per_channel));
server_config.set_async_server_threads(FLAGS_async_server_threads);
const auto result = RunScenario(
client_config, FLAGS_num_clients, server_config, FLAGS_num_servers,

@ -64,7 +64,7 @@ static void RunQPS() {
ServerConfig server_config;
server_config.set_server_type(ASYNC_SERVER);
server_config.set_use_tls(false);
server_config.set_threads(4);
server_config.set_async_server_threads(4);
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);

@ -58,12 +58,12 @@ static void RunQPS() {
client_config.set_payload_size(1);
client_config.set_async_client_threads(8);
client_config.set_rpc_type(UNARY);
client_config.mutable_load_params()->mutable_closed();
client_config.mutable_load_params()->mutable_closed_loop();
ServerConfig server_config;
server_config.set_server_type(ASYNC_SERVER);
server_config.set_use_tls(false);
server_config.set_threads(8);
server_config.set_async_server_threads(8);
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);

@ -55,14 +55,14 @@
#include "test/cpp/qps/client.h"
#include "test/cpp/qps/server.h"
#include "test/cpp/util/create_test_channel.h"
#include "test/proto/perf_tests/perf_services.pb.h"
#include "test/proto/benchmarks/services.pb.h"
namespace grpc {
namespace testing {
std::unique_ptr<Client> CreateClient(const ClientConfig& config) {
static std::unique_ptr<Client> CreateClient(const ClientConfig& config) {
switch (config.client_type()) {
case ClientType::SYNCHRONOUS_CLIENT:
case ClientType::SYNC_CLIENT:
return (config.rpc_type() == RpcType::UNARY)
? CreateSynchronousUnaryClient(config)
: CreateSynchronousStreamingClient(config);
@ -76,9 +76,15 @@ std::unique_ptr<Client> CreateClient(const ClientConfig& config) {
abort();
}
std::unique_ptr<Server> CreateServer(const ServerConfig& config) {
static void LimitCores(int cores) {
}
static std::unique_ptr<Server> CreateServer(const ServerConfig& config) {
if (config.core_limit() > 0) {
LimitCores(config.core_limit());
}
switch (config.server_type()) {
case ServerType::SYNCHRONOUS_SERVER:
case ServerType::SYNC_SERVER:
return CreateSynchronousServer(config);
case ServerType::ASYNC_SERVER:
return CreateAsyncServer(config);
@ -195,6 +201,7 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
}
ServerStatus status;
status.set_port(server->Port());
status.set_cores(server->Cores());
if (!stream->Write(status)) {
return Status(StatusCode::UNKNOWN, "");
}

@ -43,6 +43,7 @@ namespace testing {
static double WallTime(ResourceUsage u) { return u.wall_time(); }
static double UserTime(ResourceUsage u) { return u.user_time(); }
static double SystemTime(ResourceUsage u) { return u.system_time(); }
static int Cores(ResourceUsage u) { return u.cores(); }
void CompositeReporter::add(std::unique_ptr<Reporter> reporter) {
reporters_.emplace_back(std::move(reporter));
@ -83,7 +84,7 @@ void GprLogReporter::ReportQPSPerCore(const ScenarioResult& result) {
result.latencies.Count() / average(result.client_resources, WallTime);
gpr_log(GPR_INFO, "QPS: %.1f (%.1f/server core)", qps,
qps / result.server_config.threads());
qps / sum(result.server_resources, Cores));
}
void GprLogReporter::ReportLatency(const ScenarioResult& result) {
@ -123,10 +124,10 @@ void PerfDbReporter::ReportQPSPerCore(const ScenarioResult& result) {
auto qps =
result.latencies.Count() / average(result.client_resources, WallTime);
auto qpsPerCore = qps / result.server_config.threads();
auto qps_per_core = qps / sum(result.server_resources, Cores);
perf_db_client_.setQps(qps);
perf_db_client_.setQpsPerCore(qpsPerCore);
perf_db_client_.setQpsPerCore(qps_per_core);
perf_db_client_.setConfigs(result.client_config, result.server_config);
}

@ -34,10 +34,12 @@
#ifndef TEST_QPS_SERVER_H
#define TEST_QPS_SERVER_H
#include <grpc/support/cpu.h>
#include "test/core/util/port.h"
#include "test/cpp/qps/timer.h"
#include "test/proto/messages.grpc.pb.h"
#include "test/proto/perf_tests/perf_control.grpc.pb.h"
#include "test/proto/benchmarks/control.grpc.pb.h"
namespace grpc {
namespace testing {
@ -83,6 +85,7 @@ class Server {
}
int Port() const {return port_;}
int Cores() const {return gpr_cpu_num_cores();}
private:
int port_;
std::unique_ptr<Timer> timer_;

@ -50,7 +50,7 @@
#include <gtest/gtest.h>
#include "test/cpp/qps/server.h"
#include "test/proto/perf_tests/perf_services.grpc.pb.h"
#include "test/proto/benchmarks/services.grpc.pb.h"
namespace grpc {
namespace testing {
@ -67,15 +67,15 @@ class AsyncQpsServerTest : public Server {
gpr_free(server_address);
builder.RegisterAsyncService(&async_service_);
for (int i = 0; i < config.threads(); i++) {
for (int i = 0; i < config.async_server_threads(); i++) {
srv_cqs_.emplace_back(builder.AddCompletionQueue());
}
server_ = builder.BuildAndStart();
using namespace std::placeholders;
for (int i = 0; i < 10000 / config.threads(); i++) {
for (int j = 0; j < config.threads(); j++) {
for (int i = 0; i < 10000 / config.async_server_threads(); i++) {
for (int j = 0; j < config.async_server_threads(); j++) {
auto request_unary = std::bind(
&BenchmarkService::AsyncService::RequestUnaryCall, &async_service_, _1,
_2, _3, srv_cqs_[j].get(), srv_cqs_[j].get(), _4);
@ -90,10 +90,10 @@ class AsyncQpsServerTest : public Server {
request_streaming, ProcessRPC));
}
}
for (int i = 0; i < config.threads(); i++) {
for (int i = 0; i < config.async_server_threads(); i++) {
shutdown_state_.emplace_back(new PerThreadShutdownState());
}
for (int i = 0; i < config.threads(); i++) {
for (int i = 0; i < config.async_server_threads(); i++) {
threads_.emplace_back(&AsyncQpsServerTest::ThreadFunc, this, i);
}
}

@ -45,7 +45,7 @@
#include "test/cpp/qps/server.h"
#include "test/cpp/qps/timer.h"
#include "test/proto/perf_tests/perf_services.grpc.pb.h"
#include "test/proto/benchmarks/services.grpc.pb.h"
namespace grpc {
namespace testing {

@ -51,18 +51,17 @@ static void RunSynchronousStreamingPingPong() {
gpr_log(GPR_INFO, "Running Synchronous Streaming Ping Pong");
ClientConfig client_config;
client_config.set_client_type(SYNCHRONOUS_CLIENT);
client_config.set_client_type(SYNC_CLIENT);
client_config.set_use_tls(false);
client_config.set_outstanding_rpcs_per_channel(1);
client_config.set_client_channels(1);
client_config.set_payload_size(1);
client_config.set_rpc_type(STREAMING);
client_config.mutable_load_params()->mutable_closed();
client_config.mutable_load_params()->mutable_closed_loop();
ServerConfig server_config;
server_config.set_server_type(SYNCHRONOUS_SERVER);
server_config.set_server_type(SYNC_SERVER);
server_config.set_use_tls(false);
server_config.set_threads(1);
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);

@ -51,18 +51,17 @@ static void RunSynchronousUnaryPingPong() {
gpr_log(GPR_INFO, "Running Synchronous Unary Ping Pong");
ClientConfig client_config;
client_config.set_client_type(SYNCHRONOUS_CLIENT);
client_config.set_client_type(SYNC_CLIENT);
client_config.set_use_tls(false);
client_config.set_outstanding_rpcs_per_channel(1);
client_config.set_client_channels(1);
client_config.set_payload_size(1);
client_config.set_rpc_type(UNARY);
client_config.mutable_load_params()->mutable_closed();
client_config.mutable_load_params()->mutable_closed_loop();
ServerConfig server_config;
server_config.set_server_type(SYNCHRONOUS_SERVER);
server_config.set_server_type(SYNC_SERVER);
server_config.set_use_tls(false);
server_config.set_threads(1);
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);

@ -91,8 +91,7 @@ message ClientConfig {
// only for async client:
int32 async_client_threads = 7;
RpcType rpc_type = 8;
string host = 9;
LoadParams load_params = 11;
LoadParams load_params = 10;
}
message ClientStatus {
@ -113,10 +112,12 @@ message ClientArgs {
message ServerConfig {
ServerType server_type = 1;
int32 threads = 2;
bool use_tls = 3;
string host = 4;
int32 port = 5;
bool use_tls = 2;
int32 port = 4;
// only for async server
int32 async_server_threads = 7;
// restrict core usage
int32 core_limit = 8;
}
message ServerArgs {
@ -129,4 +130,5 @@ message ServerArgs {
message ServerStatus {
ServerStats stats = 1;
int32 port = 2;
int32 cores = 3;
}

Loading…
Cancel
Save