|
|
|
@ -25,8 +25,8 @@ |
|
|
|
|
#include <unordered_map> |
|
|
|
|
#include <vector> |
|
|
|
|
|
|
|
|
|
#include "absl/log/check.h" |
|
|
|
|
#include "google/protobuf/timestamp.pb.h" |
|
|
|
|
#include "third_party/absl/log/check.h" |
|
|
|
|
|
|
|
|
|
#include <grpc/support/alloc.h> |
|
|
|
|
#include <grpc/support/log.h> |
|
|
|
@ -78,13 +78,13 @@ static deque<string> get_workers(const string& env_name) { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if (out.empty()) { |
|
|
|
|
gpr_log(GPR_ERROR, |
|
|
|
|
"Environment variable \"%s\" does not contain a list of QPS " |
|
|
|
|
"workers to use. Set it to a comma-separated list of " |
|
|
|
|
"hostname:port pairs, starting with hosts that should act as " |
|
|
|
|
"servers. E.g. export " |
|
|
|
|
"%s=\"serverhost1:1234,clienthost1:1234,clienthost2:1234\"", |
|
|
|
|
env_name.c_str(), env_name.c_str()); |
|
|
|
|
LOG(ERROR) << "Environment variable \"" << env_name |
|
|
|
|
<< "\" does not contain a list of QPS " |
|
|
|
|
"workers to use. Set it to a comma-separated list of " |
|
|
|
|
"hostname:port pairs, starting with hosts that should act as " |
|
|
|
|
"servers. E.g. export " |
|
|
|
|
<< env_name |
|
|
|
|
<< "=\"serverhost1:1234,clienthost1:1234,clienthost2:1234\""; |
|
|
|
|
} |
|
|
|
|
return out; |
|
|
|
|
} |
|
|
|
@ -237,7 +237,7 @@ struct ServerData { |
|
|
|
|
|
|
|
|
|
static void FinishClients(const std::vector<ClientData>& clients, |
|
|
|
|
const ClientArgs& client_mark) { |
|
|
|
|
gpr_log(GPR_INFO, "Finishing clients"); |
|
|
|
|
LOG(INFO) << "Finishing clients"; |
|
|
|
|
for (size_t i = 0, i_end = clients.size(); i < i_end; i++) { |
|
|
|
|
auto client = &clients[i]; |
|
|
|
|
if (!client->stream->Write(client_mark)) { |
|
|
|
@ -252,13 +252,13 @@ static void FinishClients(const std::vector<ClientData>& clients, |
|
|
|
|
static void ReceiveFinalStatusFromClients( |
|
|
|
|
const std::vector<ClientData>& clients, Histogram& merged_latencies, |
|
|
|
|
std::unordered_map<int, int64_t>& merged_statuses, ScenarioResult& result) { |
|
|
|
|
gpr_log(GPR_INFO, "Receiving final status from clients"); |
|
|
|
|
LOG(INFO) << "Receiving final status from clients"; |
|
|
|
|
ClientStatus client_status; |
|
|
|
|
for (size_t i = 0, i_end = clients.size(); i < i_end; i++) { |
|
|
|
|
auto client = &clients[i]; |
|
|
|
|
// Read the client final status
|
|
|
|
|
if (client->stream->Read(&client_status)) { |
|
|
|
|
gpr_log(GPR_INFO, "Received final status from client %zu", i); |
|
|
|
|
LOG(INFO) << "Received final status from client " << i; |
|
|
|
|
const auto& stats = client_status.stats(); |
|
|
|
|
merged_latencies.MergeProto(stats.latencies()); |
|
|
|
|
for (int i = 0; i < stats.request_results_size(); i++) { |
|
|
|
@ -282,7 +282,7 @@ static void ReceiveFinalStatusFromClients( |
|
|
|
|
|
|
|
|
|
static void ShutdownClients(const std::vector<ClientData>& clients, |
|
|
|
|
ScenarioResult& result) { |
|
|
|
|
gpr_log(GPR_INFO, "Shutdown clients"); |
|
|
|
|
LOG(INFO) << "Shutdown clients"; |
|
|
|
|
for (size_t i = 0, i_end = clients.size(); i < i_end; i++) { |
|
|
|
|
auto client = &clients[i]; |
|
|
|
|
Status s = client->stream->Finish(); |
|
|
|
@ -300,7 +300,7 @@ static void ShutdownClients(const std::vector<ClientData>& clients, |
|
|
|
|
|
|
|
|
|
static void FinishServers(const std::vector<ServerData>& servers, |
|
|
|
|
const ServerArgs& server_mark) { |
|
|
|
|
gpr_log(GPR_INFO, "Finishing servers"); |
|
|
|
|
LOG(INFO) << "Finishing servers"; |
|
|
|
|
for (size_t i = 0, i_end = servers.size(); i < i_end; i++) { |
|
|
|
|
auto server = &servers[i]; |
|
|
|
|
if (!server->stream->Write(server_mark)) { |
|
|
|
@ -314,13 +314,13 @@ static void FinishServers(const std::vector<ServerData>& servers, |
|
|
|
|
|
|
|
|
|
static void ReceiveFinalStatusFromServer(const std::vector<ServerData>& servers, |
|
|
|
|
ScenarioResult& result) { |
|
|
|
|
gpr_log(GPR_INFO, "Receiving final status from servers"); |
|
|
|
|
LOG(INFO) << "Receiving final status from servers"; |
|
|
|
|
ServerStatus server_status; |
|
|
|
|
for (size_t i = 0, i_end = servers.size(); i < i_end; i++) { |
|
|
|
|
auto server = &servers[i]; |
|
|
|
|
// Read the server final status
|
|
|
|
|
if (server->stream->Read(&server_status)) { |
|
|
|
|
gpr_log(GPR_INFO, "Received final status from server %zu", i); |
|
|
|
|
LOG(INFO) << "Received final status from server " << i; |
|
|
|
|
result.add_server_stats()->CopyFrom(server_status.stats()); |
|
|
|
|
result.add_server_cores(server_status.cores()); |
|
|
|
|
// That final status should be the last message on the server stream
|
|
|
|
@ -334,7 +334,7 @@ static void ReceiveFinalStatusFromServer(const std::vector<ServerData>& servers, |
|
|
|
|
|
|
|
|
|
static void ShutdownServers(const std::vector<ServerData>& servers, |
|
|
|
|
ScenarioResult& result) { |
|
|
|
|
gpr_log(GPR_INFO, "Shutdown servers"); |
|
|
|
|
LOG(INFO) << "Shutdown servers"; |
|
|
|
|
for (size_t i = 0, i_end = servers.size(); i < i_end; i++) { |
|
|
|
|
auto server = &servers[i]; |
|
|
|
|
Status s = server->stream->Finish(); |
|
|
|
@ -430,8 +430,8 @@ std::unique_ptr<ScenarioResult> RunScenario( |
|
|
|
|
ChannelArguments channel_args; |
|
|
|
|
|
|
|
|
|
for (size_t i = 0; i < num_servers; i++) { |
|
|
|
|
gpr_log(GPR_INFO, "Starting server on %s (worker #%" PRIuPTR ")", |
|
|
|
|
workers[i].c_str(), i); |
|
|
|
|
LOG(INFO) << "Starting server on " << workers[i] << " (worker #" << i |
|
|
|
|
<< ")"; |
|
|
|
|
if (!run_inproc) { |
|
|
|
|
servers[i].stub = WorkerService::NewStub(grpc::CreateTestChannel( |
|
|
|
|
workers[i], |
|
|
|
@ -487,8 +487,8 @@ std::unique_ptr<ScenarioResult> RunScenario( |
|
|
|
|
size_t channels_allocated = 0; |
|
|
|
|
for (size_t i = 0; i < num_clients; i++) { |
|
|
|
|
const auto& worker = workers[i + num_servers]; |
|
|
|
|
gpr_log(GPR_INFO, "Starting client on %s (worker #%" PRIuPTR ")", |
|
|
|
|
worker.c_str(), i + num_servers); |
|
|
|
|
LOG(INFO) << "Starting client on " << worker << " (worker #" |
|
|
|
|
<< i + num_servers << ")"; |
|
|
|
|
if (!run_inproc) { |
|
|
|
|
clients[i].stub = WorkerService::NewStub(grpc::CreateTestChannel( |
|
|
|
|
worker, |
|
|
|
@ -510,8 +510,7 @@ std::unique_ptr<ScenarioResult> RunScenario( |
|
|
|
|
(client_config.client_channels() - channels_allocated) / |
|
|
|
|
(num_clients - i); |
|
|
|
|
channels_allocated += num_channels; |
|
|
|
|
gpr_log(GPR_DEBUG, "Client %" PRIdPTR " gets %" PRIdPTR " channels", i, |
|
|
|
|
num_channels); |
|
|
|
|
VLOG(2) << "Client " << i << " gets " << num_channels << " channels"; |
|
|
|
|
per_client_config.set_client_channels(num_channels); |
|
|
|
|
|
|
|
|
|
ClientArgs args; |
|
|
|
@ -533,7 +532,7 @@ std::unique_ptr<ScenarioResult> RunScenario( |
|
|
|
|
|
|
|
|
|
// Send an initial mark: clients can use this to know that everything is ready
|
|
|
|
|
// to start
|
|
|
|
|
gpr_log(GPR_INFO, "Initiating"); |
|
|
|
|
LOG(INFO) << "Initiating"; |
|
|
|
|
ServerArgs server_mark; |
|
|
|
|
server_mark.mutable_mark()->set_reset(true); |
|
|
|
|
ClientArgs client_mark; |
|
|
|
@ -555,13 +554,13 @@ std::unique_ptr<ScenarioResult> RunScenario( |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Let everything warmup
|
|
|
|
|
gpr_log(GPR_INFO, "Warming up"); |
|
|
|
|
LOG(INFO) << "Warming up"; |
|
|
|
|
gpr_timespec start = gpr_now(GPR_CLOCK_REALTIME); |
|
|
|
|
gpr_sleep_until( |
|
|
|
|
gpr_time_add(start, gpr_time_from_seconds(warmup_seconds, GPR_TIMESPAN))); |
|
|
|
|
|
|
|
|
|
// Start a run
|
|
|
|
|
gpr_log(GPR_INFO, "Starting"); |
|
|
|
|
LOG(INFO) << "Starting"; |
|
|
|
|
|
|
|
|
|
auto start_time = time(nullptr); |
|
|
|
|
|
|
|
|
@ -593,7 +592,7 @@ std::unique_ptr<ScenarioResult> RunScenario( |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Wait some time
|
|
|
|
|
gpr_log(GPR_INFO, "Running"); |
|
|
|
|
LOG(INFO) << "Running"; |
|
|
|
|
// Use gpr_sleep_until rather than this_thread::sleep_until to support
|
|
|
|
|
// compilers that don't work with this_thread
|
|
|
|
|
gpr_sleep_until(gpr_time_add( |
|
|
|
@ -669,8 +668,8 @@ bool RunQuit( |
|
|
|
|
ctx.set_wait_for_ready(true); |
|
|
|
|
Status s = stub->QuitWorker(&ctx, phony, &phony); |
|
|
|
|
if (!s.ok()) { |
|
|
|
|
gpr_log(GPR_ERROR, "Worker %zu could not be properly quit because %s", i, |
|
|
|
|
s.error_message().c_str()); |
|
|
|
|
LOG(ERROR) << "Worker " << i << " could not be properly quit because " |
|
|
|
|
<< s.error_message(); |
|
|
|
|
result = false; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|