Merge pull request #11787 from guantaol/qps_driver

Add a new metric and enable internal credential types in QPS Driver
pull/11883/head
Guantao Liu 8 years ago committed by GitHub
commit 1932a50e50
  1. 5
      src/proto/grpc/testing/control.proto
  2. 4
      test/cpp/qps/BUILD
  3. 14
      test/cpp/qps/client.h
  4. 39
      test/cpp/qps/driver.cc
  5. 5
      test/cpp/qps/driver.h
  6. 9
      test/cpp/qps/qps_json_driver.cc
  7. 5
      test/cpp/qps/qps_openloop_test.cc
  8. 8
      test/cpp/qps/qps_worker.cc
  9. 4
      test/cpp/qps/qps_worker.h
  10. 21
      test/cpp/qps/report.cc
  11. 7
      test/cpp/qps/report.h
  12. 5
      test/cpp/qps/secure_sync_unary_ping_pong_test.cc
  13. 15
      test/cpp/qps/server.h
  14. 5
      test/cpp/qps/worker.cc
  15. 40
      test/cpp/util/create_test_channel.cc
  16. 6
      test/cpp/util/create_test_channel.h

@ -64,6 +64,7 @@ message LoadParams {
message SecurityParams {
bool use_test_ca = 1;
string server_host_override = 2;
string cred_type = 3;
}
message ChannelArg {
@ -240,6 +241,10 @@ message ScenarioResultSummary
// Number of polls called inside completion queue per request
double client_polls_per_request = 15;
double server_polls_per_request = 16;
// Queries per CPU-sec over all servers or clients
double server_queries_per_cpu_sec = 17;
double client_queries_per_cpu_sec = 18;
}
// Results of a single benchmark scenario.

@ -81,6 +81,7 @@ grpc_cc_library(
"//src/proto/grpc/testing:services_proto",
"//test/core/util:gpr_test_util",
"//test/core/util:grpc_test_util",
"//test/cpp/util:test_util",
],
)
@ -148,6 +149,7 @@ grpc_cc_binary(
":driver_impl",
"//:grpc++",
"//test/cpp/util:test_config",
"//test/cpp/util:test_util",
],
external_deps = [
"gflags",
@ -162,6 +164,7 @@ grpc_cc_test(
":driver_impl",
":qps_worker_impl",
"//test/cpp/util:test_config",
"//test/cpp/util:test_util",
],
)
@ -173,6 +176,7 @@ grpc_cc_test(
":driver_impl",
"//:grpc++",
"//test/cpp/util:test_config",
"//test/cpp/util:test_util",
],
)

@ -39,6 +39,7 @@
#include "test/cpp/qps/interarrival.h"
#include "test/cpp/qps/usage_timer.h"
#include "test/cpp/util/create_test_channel.h"
#include "test/cpp/util/test_credentials_provider.h"
namespace grpc {
namespace testing {
@ -405,9 +406,18 @@ class ClientImpl : public Client {
ChannelArguments args;
args.SetInt("shard_to_ensure_no_subchannel_merges", shard);
set_channel_args(config, &args);
grpc::string type;
if (config.has_security_params() &&
config.security_params().cred_type().empty()) {
type = kTlsCredentialsType;
} else {
type = config.security_params().cred_type();
}
channel_ = CreateTestChannel(
target, config.security_params().server_host_override(),
config.has_security_params(), !config.security_params().use_test_ca(),
target, type, config.security_params().server_host_override(),
!config.security_params().use_test_ca(),
std::shared_ptr<CallCredentials>(), args);
gpr_log(GPR_INFO, "Connecting to %s", target.c_str());
GPR_ASSERT(channel_->WaitForConnected(

@ -40,6 +40,7 @@
#include "test/cpp/qps/histogram.h"
#include "test/cpp/qps/qps_worker.h"
#include "test/cpp/qps/stats.h"
#include "test/cpp/util/test_credentials_provider.h"
using std::list;
using std::thread;
@ -172,13 +173,26 @@ static void postprocess_scenario_result(ScenarioResult* result) {
sum(result->client_stats(), CliPollCount) / histogram.Count());
result->mutable_summary()->set_server_polls_per_request(
sum(result->server_stats(), SvrPollCount) / histogram.Count());
auto server_queries_per_cpu_sec =
histogram.Count() / (sum(result->server_stats(), ServerSystemTime) +
sum(result->server_stats(), ServerUserTime));
auto client_queries_per_cpu_sec =
histogram.Count() / (sum(result->client_stats(), SystemTime) +
sum(result->client_stats(), UserTime));
result->mutable_summary()->set_server_queries_per_cpu_sec(
server_queries_per_cpu_sec);
result->mutable_summary()->set_client_queries_per_cpu_sec(
client_queries_per_cpu_sec);
}
std::unique_ptr<ScenarioResult> RunScenario(
const ClientConfig& initial_client_config, size_t num_clients,
const ServerConfig& initial_server_config, size_t num_servers,
int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count,
const char* qps_server_target_override) {
const grpc::string& qps_server_target_override,
const grpc::string& credential_type) {
// Log everything from the driver
gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG);
@ -214,7 +228,7 @@ std::unique_ptr<ScenarioResult> RunScenario(
}
int driver_port = grpc_pick_unused_port_or_die();
local_workers.emplace_back(new QpsWorker(driver_port));
local_workers.emplace_back(new QpsWorker(driver_port, 0, credential_type));
char addr[256];
sprintf(addr, "localhost:%d", driver_port);
if (spawn_local_worker_count < 0) {
@ -246,12 +260,14 @@ std::unique_ptr<ScenarioResult> RunScenario(
};
std::vector<ServerData> servers(num_servers);
std::unordered_map<string, std::deque<int>> hosts_cores;
ChannelArguments channel_args;
for (size_t i = 0; i < num_servers; i++) {
gpr_log(GPR_INFO, "Starting server on %s (worker #%" PRIuPTR ")",
workers[i].c_str(), i);
servers[i].stub = WorkerService::NewStub(
CreateChannel(workers[i], InsecureChannelCredentials()));
servers[i].stub = WorkerService::NewStub(CreateChannel(
workers[i], GetCredentialsProvider()->GetChannelCredentials(
credential_type, &channel_args)));
ServerConfig server_config = initial_server_config;
if (server_config.core_limit() != 0) {
@ -269,8 +285,7 @@ std::unique_ptr<ScenarioResult> RunScenario(
if (!servers[i].stream->Read(&init_status)) {
gpr_log(GPR_ERROR, "Server %zu did not yield initial status", i);
}
if (qps_server_target_override != NULL &&
strlen(qps_server_target_override) > 0) {
if (qps_server_target_override.length() > 0) {
// overriding the qps server target only works if there is 1 server
GPR_ASSERT(num_servers == 1);
client_config.add_server_targets(qps_server_target_override);
@ -298,7 +313,8 @@ std::unique_ptr<ScenarioResult> RunScenario(
gpr_log(GPR_INFO, "Starting client on %s (worker #%" PRIuPTR ")",
worker.c_str(), i + num_servers);
clients[i].stub = WorkerService::NewStub(
CreateChannel(worker, InsecureChannelCredentials()));
CreateChannel(worker, GetCredentialsProvider()->GetChannelCredentials(
credential_type, &channel_args)));
ClientConfig per_client_config = client_config;
if (initial_client_config.core_limit() != 0) {
@ -483,16 +499,19 @@ std::unique_ptr<ScenarioResult> RunScenario(
return result;
}
bool RunQuit() {
bool RunQuit(const grpc::string& credential_type) {
// Get client, server lists
bool result = true;
auto workers = get_workers("QPS_WORKERS");
if (workers.size() == 0) {
return false;
}
ChannelArguments channel_args;
for (size_t i = 0; i < workers.size(); i++) {
auto stub = WorkerService::NewStub(
CreateChannel(workers[i], InsecureChannelCredentials()));
auto stub = WorkerService::NewStub(CreateChannel(
workers[i], GetCredentialsProvider()->GetChannelCredentials(
credential_type, &channel_args)));
Void dummy;
grpc::ClientContext ctx;
ctx.set_wait_for_ready(true);

@ -31,9 +31,10 @@ std::unique_ptr<ScenarioResult> RunScenario(
const grpc::testing::ClientConfig& client_config, size_t num_clients,
const grpc::testing::ServerConfig& server_config, size_t num_servers,
int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count,
const char* qps_server_target_override = "");
const grpc::string& qps_server_target_override,
const grpc::string& credential_type);
bool RunQuit();
bool RunQuit(const grpc::string& credential_type);
} // namespace testing
} // namespace grpc

@ -31,6 +31,7 @@
#include "test/cpp/qps/parse_json.h"
#include "test/cpp/qps/report.h"
#include "test/cpp/util/test_config.h"
#include "test/cpp/util/test_credentials_provider.h"
DEFINE_string(scenarios_file, "",
"JSON file containing an array of Scenario objects");
@ -61,6 +62,9 @@ DEFINE_string(qps_server_target_override, "",
DEFINE_string(json_file_out, "", "File to write the JSON output to.");
DEFINE_string(credential_type, grpc::testing::kInsecureCredentialsType,
"Credential type for communication with workers");
namespace grpc {
namespace testing {
@ -72,7 +76,7 @@ static std::unique_ptr<ScenarioResult> RunAndReport(const Scenario& scenario,
scenario.server_config(), scenario.num_servers(),
scenario.warmup_seconds(), scenario.benchmark_seconds(),
scenario.spawn_local_worker_count(),
FLAGS_qps_server_target_override.c_str());
FLAGS_qps_server_target_override, FLAGS_credential_type);
// Amend the result with scenario config. Eventually we should adjust
// RunScenario contract so we don't need to touch the result here.
@ -84,6 +88,7 @@ static std::unique_ptr<ScenarioResult> RunAndReport(const Scenario& scenario,
GetReporter()->ReportTimes(*result);
GetReporter()->ReportCpuUsage(*result);
GetReporter()->ReportPollCount(*result);
GetReporter()->ReportQueriesPerCpuSec(*result);
for (int i = 0; *success && i < result->client_success_size(); i++) {
*success = result->client_success(i);
@ -185,7 +190,7 @@ static bool QpsDriver() {
} else if (scjson) {
json = FLAGS_scenarios_json.c_str();
} else if (FLAGS_quit) {
return RunQuit();
return RunQuit(FLAGS_credential_type);
}
// Parse into an array of scenarios

@ -25,6 +25,7 @@
#include "test/cpp/qps/driver.h"
#include "test/cpp/qps/report.h"
#include "test/cpp/util/test_config.h"
#include "test/cpp/util/test_credentials_provider.h"
namespace grpc {
namespace testing {
@ -48,8 +49,8 @@ static void RunQPS() {
server_config.set_server_type(ASYNC_SERVER);
server_config.set_async_server_threads(8);
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
const auto result = RunScenario(client_config, 1, server_config, 1, WARMUP,
BENCHMARK, -2, "", kInsecureCredentialsType);
GetReporter()->ReportQPSPerCore(*result);
GetReporter()->ReportLatency(*result);

@ -41,6 +41,7 @@
#include "test/cpp/qps/client.h"
#include "test/cpp/qps/server.h"
#include "test/cpp/util/create_test_channel.h"
#include "test/cpp/util/test_credentials_provider.h"
namespace grpc {
namespace testing {
@ -263,7 +264,8 @@ class WorkerServiceImpl final : public WorkerService::Service {
QpsWorker* worker_;
};
QpsWorker::QpsWorker(int driver_port, int server_port) {
QpsWorker::QpsWorker(int driver_port, int server_port,
const grpc::string& credential_type) {
impl_.reset(new WorkerServiceImpl(server_port, this));
gpr_atm_rel_store(&done_, static_cast<gpr_atm>(0));
@ -271,7 +273,9 @@ QpsWorker::QpsWorker(int driver_port, int server_port) {
gpr_join_host_port(&server_address, "::", driver_port);
ServerBuilder builder;
builder.AddListeningPort(server_address, InsecureServerCredentials());
builder.AddListeningPort(
server_address,
GetCredentialsProvider()->GetServerCredentials(credential_type));
builder.RegisterService(impl_.get());
gpr_free(server_address);

@ -21,6 +21,7 @@
#include <memory>
#include <grpc++/support/config.h>
#include <grpc/support/atm.h>
namespace grpc {
@ -33,7 +34,8 @@ class WorkerServiceImpl;
class QpsWorker {
public:
explicit QpsWorker(int driver_port, int server_port = 0);
explicit QpsWorker(int driver_port, int server_port,
const grpc::string& credential_type);
~QpsWorker();
bool Done() const;

@ -71,6 +71,12 @@ void CompositeReporter::ReportPollCount(const ScenarioResult& result) {
}
}
void CompositeReporter::ReportQueriesPerCpuSec(const ScenarioResult& result) {
for (size_t i = 0; i < reporters_.size(); ++i) {
reporters_[i]->ReportQueriesPerCpuSec(result);
}
}
void GprLogReporter::ReportQPS(const ScenarioResult& result) {
gpr_log(GPR_INFO, "QPS: %.1f", result.summary().qps());
if (result.summary().failed_requests_per_second() > 0) {
@ -119,6 +125,13 @@ void GprLogReporter::ReportPollCount(const ScenarioResult& result) {
result.summary().server_polls_per_request());
}
void GprLogReporter::ReportQueriesPerCpuSec(const ScenarioResult& result) {
gpr_log(GPR_INFO, "Server Queries/CPU-sec: %.2f",
result.summary().server_queries_per_cpu_sec());
gpr_log(GPR_INFO, "Client Queries/CPU-sec: %.2f",
result.summary().client_queries_per_cpu_sec());
}
void JsonReporter::ReportQPS(const ScenarioResult& result) {
grpc::string json_string =
SerializeJson(result, "type.googleapis.com/grpc.testing.ScenarioResult");
@ -147,6 +160,10 @@ void JsonReporter::ReportPollCount(const ScenarioResult& result) {
// NOP - all reporting is handled by ReportQPS.
}
void JsonReporter::ReportQueriesPerCpuSec(const ScenarioResult& result) {
// NOP - all reporting is handled by ReportQPS.
}
void RpcReporter::ReportQPS(const ScenarioResult& result) {
grpc::ClientContext context;
grpc::Status status;
@ -183,5 +200,9 @@ void RpcReporter::ReportPollCount(const ScenarioResult& result) {
// NOP - all reporting is handled by ReportQPS.
}
void RpcReporter::ReportQueriesPerCpuSec(const ScenarioResult& result) {
// NOP - all reporting is handled by ReportQPS.
}
} // namespace testing
} // namespace grpc

@ -64,6 +64,9 @@ class Reporter {
/** Reports client and server poll usage inside completion queue. */
virtual void ReportPollCount(const ScenarioResult& result) = 0;
/** Reports queries per cpu-sec. */
virtual void ReportQueriesPerCpuSec(const ScenarioResult& result) = 0;
private:
const string name_;
};
@ -82,6 +85,7 @@ class CompositeReporter : public Reporter {
void ReportTimes(const ScenarioResult& result) override;
void ReportCpuUsage(const ScenarioResult& result) override;
void ReportPollCount(const ScenarioResult& result) override;
void ReportQueriesPerCpuSec(const ScenarioResult& result) override;
private:
std::vector<std::unique_ptr<Reporter> > reporters_;
@ -99,6 +103,7 @@ class GprLogReporter : public Reporter {
void ReportTimes(const ScenarioResult& result) override;
void ReportCpuUsage(const ScenarioResult& result) override;
void ReportPollCount(const ScenarioResult& result) override;
void ReportQueriesPerCpuSec(const ScenarioResult& result) override;
};
/** Dumps the report to a JSON file. */
@ -114,6 +119,7 @@ class JsonReporter : public Reporter {
void ReportTimes(const ScenarioResult& result) override;
void ReportCpuUsage(const ScenarioResult& result) override;
void ReportPollCount(const ScenarioResult& result) override;
void ReportQueriesPerCpuSec(const ScenarioResult& result) override;
const string report_file_;
};
@ -130,6 +136,7 @@ class RpcReporter : public Reporter {
void ReportTimes(const ScenarioResult& result) override;
void ReportCpuUsage(const ScenarioResult& result) override;
void ReportPollCount(const ScenarioResult& result) override;
void ReportQueriesPerCpuSec(const ScenarioResult& result) override;
std::unique_ptr<ReportQpsScenarioService::Stub> stub_;
};

@ -24,6 +24,7 @@
#include "test/cpp/qps/driver.h"
#include "test/cpp/qps/report.h"
#include "test/cpp/util/test_config.h"
#include "test/cpp/util/test_credentials_provider.h"
namespace grpc {
namespace testing {
@ -51,8 +52,8 @@ static void RunSynchronousUnaryPingPong() {
client_config.mutable_security_params()->CopyFrom(security);
server_config.mutable_security_params()->CopyFrom(security);
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
const auto result = RunScenario(client_config, 1, server_config, 1, WARMUP,
BENCHMARK, -2, "", kInsecureCredentialsType);
GetReporter()->ReportQPS(*result);
GetReporter()->ReportLatency(*result);

@ -32,6 +32,7 @@
#include "test/core/end2end/data/ssl_test_data.h"
#include "test/core/util/port.h"
#include "test/cpp/qps/usage_timer.h"
#include "test/cpp/util/test_credentials_provider.h"
namespace grpc {
namespace testing {
@ -89,12 +90,14 @@ class Server {
static std::shared_ptr<ServerCredentials> CreateServerCredentials(
const ServerConfig& config) {
if (config.has_security_params()) {
SslServerCredentialsOptions::PemKeyCertPair pkcp = {test_server1_key,
test_server1_cert};
SslServerCredentialsOptions ssl_opts;
ssl_opts.pem_root_certs = "";
ssl_opts.pem_key_cert_pairs.push_back(pkcp);
return SslServerCredentials(ssl_opts);
grpc::string type;
if (config.security_params().cred_type().empty()) {
type = kTlsCredentialsType;
} else {
type = config.security_params().cred_type();
}
return GetCredentialsProvider()->GetServerCredentials(type);
} else {
return InsecureServerCredentials();
}

@ -27,9 +27,12 @@
#include "test/cpp/qps/qps_worker.h"
#include "test/cpp/util/test_config.h"
#include "test/cpp/util/test_credentials_provider.h"
DEFINE_int32(driver_port, 0, "Port for communication with driver");
DEFINE_int32(server_port, 0, "Port for operation as a server");
DEFINE_string(credential_type, grpc::testing::kInsecureCredentialsType,
"Credential type for communication with driver");
static bool got_sigint = false;
@ -39,7 +42,7 @@ namespace grpc {
namespace testing {
static void RunServer() {
QpsWorker worker(FLAGS_driver_port, FLAGS_server_port);
QpsWorker worker(FLAGS_driver_port, FLAGS_server_port, FLAGS_credential_type);
while (!got_sigint && !worker.Done()) {
gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),

@ -51,29 +51,31 @@ void AddProdSslType() {
} // namespace
// When ssl is enabled, if server is empty, override_hostname is used to
// When cred_type is 'ssl', if server is empty, override_hostname is used to
// create channel. Otherwise, connect to server and override hostname if
// override_hostname is provided.
// When ssl is not enabled, override_hostname is ignored.
// When cred_type is not 'ssl', override_hostname is ignored.
// Set use_prod_root to true to use the SSL root for connecting to google.
// In this case, path to the roots pem file must be set via environment variable
// GRPC_DEFAULT_SSL_ROOTS_FILE_PATH.
// Otherwise, root for test SSL cert will be used.
// creds will be used to create a channel when enable_ssl is true.
// creds will be used to create a channel when cred_type is 'ssl'.
// Use examples:
// CreateTestChannel(
// "1.1.1.1:12345", "override.hostname.com", true, false, creds);
// CreateTestChannel("test.google.com:443", "", true, true, creds);
// "1.1.1.1:12345", "ssl", "override.hostname.com", false, creds);
// CreateTestChannel("test.google.com:443", "ssl", "", true, creds);
// same as above
// CreateTestChannel("", "test.google.com:443", true, true, creds);
// CreateTestChannel("", "ssl", "test.google.com:443", true, creds);
std::shared_ptr<Channel> CreateTestChannel(
const grpc::string& server, const grpc::string& override_hostname,
bool enable_ssl, bool use_prod_roots,
const grpc::string& server, const grpc::string& cred_type,
const grpc::string& override_hostname, bool use_prod_roots,
const std::shared_ptr<CallCredentials>& creds,
const ChannelArguments& args) {
ChannelArguments channel_args(args);
std::shared_ptr<ChannelCredentials> channel_creds;
if (enable_ssl) {
if (cred_type.empty()) {
return CreateChannel(server, InsecureChannelCredentials());
} else if (cred_type == testing::kTlsCredentialsType) { // cred_type == "ssl"
if (use_prod_roots) {
gpr_once_init(&g_once_init_add_prod_ssl_provider, &AddProdSslType);
channel_creds = testing::GetCredentialsProvider()->GetChannelCredentials(
@ -95,8 +97,26 @@ std::shared_ptr<Channel> CreateTestChannel(
}
return CreateCustomChannel(connect_to, channel_creds, channel_args);
} else {
return CreateChannel(server, InsecureChannelCredentials());
channel_creds = testing::GetCredentialsProvider()->GetChannelCredentials(
cred_type, &channel_args);
GPR_ASSERT(channel_creds != nullptr);
return CreateChannel(server, channel_creds);
}
}
std::shared_ptr<Channel> CreateTestChannel(
const grpc::string& server, const grpc::string& override_hostname,
bool enable_ssl, bool use_prod_roots,
const std::shared_ptr<CallCredentials>& creds,
const ChannelArguments& args) {
grpc::string type;
if (enable_ssl) {
type = testing::kTlsCredentialsType;
}
return CreateTestChannel(server, type, override_hostname, use_prod_roots,
creds, args);
}
std::shared_ptr<Channel> CreateTestChannel(

@ -44,6 +44,12 @@ std::shared_ptr<Channel> CreateTestChannel(
const std::shared_ptr<CallCredentials>& creds,
const ChannelArguments& args);
std::shared_ptr<Channel> CreateTestChannel(
const grpc::string& server, const grpc::string& cred_type,
const grpc::string& override_hostname, bool use_prod_roots,
const std::shared_ptr<CallCredentials>& creds,
const ChannelArguments& args);
std::shared_ptr<Channel> CreateTestChannel(
const grpc::string& server, const grpc::string& credential_type,
const std::shared_ptr<CallCredentials>& creds);

Loading…
Cancel
Save