allow disable core_list setting and override qps server in benchmarks

pull/9168/head
Alexander Polcyn 8 years ago
parent 258d678096
commit 4873d30ea2
  1. 84
      test/cpp/qps/driver.cc
  2. 4
      test/cpp/qps/driver.h
  3. 18
      test/cpp/qps/qps_json_driver.cc

@ -195,7 +195,8 @@ static void postprocess_scenario_result(ScenarioResult* result) {
std::unique_ptr<ScenarioResult> RunScenario( std::unique_ptr<ScenarioResult> RunScenario(
const ClientConfig& initial_client_config, size_t num_clients, const ClientConfig& initial_client_config, size_t num_clients,
const ServerConfig& initial_server_config, size_t num_servers, const ServerConfig& initial_server_config, size_t num_servers,
int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count) { int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count,
const char* qps_server_target_override, bool configure_core_lists) {
// Log everything from the driver // Log everything from the driver
gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG); gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG);
@ -241,9 +242,6 @@ std::unique_ptr<ScenarioResult> RunScenario(
} }
} }
// Setup the hosts and core counts
auto hosts_cores = get_hosts_and_cores(workers);
// if num_clients is set to <=0, do dynamic sizing: all workers // if num_clients is set to <=0, do dynamic sizing: all workers
// except for servers are clients // except for servers are clients
if (num_clients <= 0) { if (num_clients <= 0) {
@ -264,6 +262,11 @@ std::unique_ptr<ScenarioResult> RunScenario(
unique_ptr<ClientReaderWriter<ServerArgs, ServerStatus>> stream; unique_ptr<ClientReaderWriter<ServerArgs, ServerStatus>> stream;
}; };
std::vector<ServerData> servers(num_servers); std::vector<ServerData> servers(num_servers);
std::unordered_map<string, std::deque<int>> hosts_cores;
if (configure_core_lists) {
hosts_cores = get_hosts_and_cores(workers);
}
for (size_t i = 0; i < num_servers; i++) { for (size_t i = 0; i < num_servers; i++) {
gpr_log(GPR_INFO, "Starting server on %s (worker #%" PRIuPTR ")", gpr_log(GPR_INFO, "Starting server on %s (worker #%" PRIuPTR ")",
workers[i].c_str(), i); workers[i].c_str(), i);
@ -271,37 +274,36 @@ std::unique_ptr<ScenarioResult> RunScenario(
CreateChannel(workers[i], InsecureChannelCredentials())); CreateChannel(workers[i], InsecureChannelCredentials()));
ServerConfig server_config = initial_server_config; ServerConfig server_config = initial_server_config;
char* host;
char* driver_port;
char* cli_target;
gpr_split_host_port(workers[i].c_str(), &host, &driver_port);
string host_str(host);
int server_core_limit = initial_server_config.core_limit(); int server_core_limit = initial_server_config.core_limit();
int client_core_limit = initial_client_config.core_limit(); int client_core_limit = initial_client_config.core_limit();
if (server_core_limit == 0 && client_core_limit > 0) { if (configure_core_lists) {
// In this case, limit the server cores if it matches the string host_str(get_host(workers[i]));
// same host as one or more clients if (server_core_limit == 0 && client_core_limit > 0) {
const auto& dq = hosts_cores.at(host_str); // In this case, limit the server cores if it matches the
bool match = false; // same host as one or more clients
int limit = dq.size(); const auto& dq = hosts_cores.at(host_str);
for (size_t cli = 0; cli < num_clients; cli++) { bool match = false;
if (host_str == get_host(workers[cli + num_servers])) { int limit = dq.size();
limit -= client_core_limit; for (size_t cli = 0; cli < num_clients; cli++) {
match = true; if (host_str == get_host(workers[cli + num_servers])) {
limit -= client_core_limit;
match = true;
}
}
if (match) {
GPR_ASSERT(limit > 0);
server_core_limit = limit;
} }
} }
if (match) { if (server_core_limit > 0) {
GPR_ASSERT(limit > 0); auto& dq = hosts_cores.at(host_str);
server_core_limit = limit; GPR_ASSERT(dq.size() >= static_cast<size_t>(server_core_limit));
} gpr_log(GPR_INFO, "Setting server core_list");
} for (int core = 0; core < server_core_limit; core++) {
if (server_core_limit > 0) { server_config.add_core_list(dq.front());
auto& dq = hosts_cores.at(host_str); dq.pop_front();
GPR_ASSERT(dq.size() >= static_cast<size_t>(server_core_limit)); }
for (int core = 0; core < server_core_limit; core++) {
server_config.add_core_list(dq.front());
dq.pop_front();
} }
} }
@ -315,11 +317,19 @@ std::unique_ptr<ScenarioResult> RunScenario(
if (!servers[i].stream->Read(&init_status)) { if (!servers[i].stream->Read(&init_status)) {
gpr_log(GPR_ERROR, "Server %zu did not yield initial status", i); gpr_log(GPR_ERROR, "Server %zu did not yield initial status", i);
} }
gpr_join_host_port(&cli_target, host, init_status.port()); if (qps_server_target_override != NULL &&
client_config.add_server_targets(cli_target); strlen(qps_server_target_override) > 0) {
gpr_free(host); // overriding the qps server target only works if there is 1 server
gpr_free(driver_port); GPR_ASSERT(num_servers == 1);
gpr_free(cli_target); client_config.add_server_targets(qps_server_target_override);
} else {
std::string host;
char* cli_target;
host = get_host(workers[i]);
gpr_join_host_port(&cli_target, host.c_str(), init_status.port());
client_config.add_server_targets(cli_target);
gpr_free(cli_target);
}
} }
// Targets are all set by now // Targets are all set by now
@ -341,7 +351,8 @@ std::unique_ptr<ScenarioResult> RunScenario(
int server_core_limit = initial_server_config.core_limit(); int server_core_limit = initial_server_config.core_limit();
int client_core_limit = initial_client_config.core_limit(); int client_core_limit = initial_client_config.core_limit();
if ((server_core_limit > 0) || (client_core_limit > 0)) { if (configure_core_lists &&
((server_core_limit > 0) || (client_core_limit > 0))) {
auto& dq = hosts_cores.at(get_host(worker)); auto& dq = hosts_cores.at(get_host(worker));
if (client_core_limit == 0) { if (client_core_limit == 0) {
// limit client cores if it matches a server host // limit client cores if it matches a server host
@ -359,6 +370,7 @@ std::unique_ptr<ScenarioResult> RunScenario(
} }
if (client_core_limit > 0) { if (client_core_limit > 0) {
GPR_ASSERT(dq.size() >= static_cast<size_t>(client_core_limit)); GPR_ASSERT(dq.size() >= static_cast<size_t>(client_core_limit));
gpr_log(GPR_INFO, "Setting client core_list");
for (int core = 0; core < client_core_limit; core++) { for (int core = 0; core < client_core_limit; core++) {
per_client_config.add_core_list(dq.front()); per_client_config.add_core_list(dq.front());
dq.pop_front(); dq.pop_front();

@ -45,7 +45,9 @@ namespace testing {
std::unique_ptr<ScenarioResult> RunScenario( std::unique_ptr<ScenarioResult> RunScenario(
const grpc::testing::ClientConfig& client_config, size_t num_clients, const grpc::testing::ClientConfig& client_config, size_t num_clients,
const grpc::testing::ServerConfig& server_config, size_t num_servers, const grpc::testing::ServerConfig& server_config, size_t num_servers,
int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count); int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count,
const char* qps_server_target_override = "",
bool configure_core_lists = true);
bool RunQuit(); bool RunQuit();
} // namespace testing } // namespace testing

@ -67,17 +67,25 @@ DEFINE_double(error_tolerance, 0.01,
"range is narrower than the error_tolerance computed range, we " "range is narrower than the error_tolerance computed range, we "
"stop the search."); "stop the search.");
DEFINE_string(qps_server_target_override, "",
"Override QPS server target to configure in client configs."
"Only applicable if there is a single benchmark server.");
DEFINE_bool(configure_core_lists, true,
"Provide 'core_list' parameters to workers. Value determined "
"by cores available and 'core_limit' parameters of the scenarios.");
namespace grpc { namespace grpc {
namespace testing { namespace testing {
static std::unique_ptr<ScenarioResult> RunAndReport(const Scenario& scenario, static std::unique_ptr<ScenarioResult> RunAndReport(const Scenario& scenario,
bool* success) { bool* success) {
std::cerr << "RUNNING SCENARIO: " << scenario.name() << "\n"; std::cerr << "RUNNING SCENARIO: " << scenario.name() << "\n";
auto result = auto result = RunScenario(
RunScenario(scenario.client_config(), scenario.num_clients(), scenario.client_config(), scenario.num_clients(),
scenario.server_config(), scenario.num_servers(), scenario.server_config(), scenario.num_servers(),
scenario.warmup_seconds(), scenario.benchmark_seconds(), scenario.warmup_seconds(), scenario.benchmark_seconds(),
scenario.spawn_local_worker_count()); scenario.spawn_local_worker_count(),
FLAGS_qps_server_target_override.c_str(), FLAGS_configure_core_lists);
// Amend the result with scenario config. Eventually we should adjust // Amend the result with scenario config. Eventually we should adjust
// RunScenario contract so we don't need to touch the result here. // RunScenario contract so we don't need to touch the result here.

Loading…
Cancel
Save