Update by review

pull/22984/head
Esun Kim 5 years ago
parent 0eefd8b209
commit ed2037d343
  1. 260
      test/cpp/qps/driver.cc

@ -227,6 +227,132 @@ static void postprocess_scenario_result(ScenarioResult* result) {
client_queries_per_cpu_sec);
}
struct ClientData {
unique_ptr<WorkerService::Stub> stub;
unique_ptr<ClientReaderWriter<ClientArgs, ClientStatus>> stream;
};
struct ServerData {
unique_ptr<WorkerService::Stub> stub;
unique_ptr<ClientReaderWriter<ServerArgs, ServerStatus>> stream;
};
static void FinishClients(const std::vector<ClientData>& clients,
const ClientArgs& client_mark) {
gpr_log(GPR_INFO, "Finishing clients");
for (size_t i = 0, i_end = clients.size(); i < i_end; i++) {
auto client = &clients[i];
if (!client->stream->Write(client_mark)) {
gpr_log(GPR_ERROR, "Couldn't write mark to client %zu", i);
GPR_ASSERT(false);
}
if (!client->stream->WritesDone()) {
gpr_log(GPR_ERROR, "Failed WritesDone for client %zu", i);
GPR_ASSERT(false);
}
}
}
static void ReceiveFinalStatusFromClients(
const std::vector<ClientData>& clients, Histogram& merged_latencies,
std::unordered_map<int, int64_t>& merged_statuses, ScenarioResult& result) {
gpr_log(GPR_INFO, "Receiving final status from clients");
ClientStatus client_status;
for (size_t i = 0, i_end = clients.size(); i < i_end; i++) {
auto client = &clients[i];
// Read the client final status
if (client->stream->Read(&client_status)) {
gpr_log(GPR_INFO, "Received final status from client %zu", i);
const auto& stats = client_status.stats();
merged_latencies.MergeProto(stats.latencies());
for (int i = 0; i < stats.request_results_size(); i++) {
merged_statuses[stats.request_results(i).status_code()] +=
stats.request_results(i).count();
}
result.add_client_stats()->CopyFrom(stats);
// That final status should be the last message on the client stream
GPR_ASSERT(!client->stream->Read(&client_status));
} else {
gpr_log(GPR_ERROR, "Couldn't get final status from client %zu", i);
GPR_ASSERT(false);
}
}
}
static void ShutdownClients(const std::vector<ClientData>& clients,
ScenarioResult& result) {
gpr_log(GPR_INFO, "Shutdown clients");
for (size_t i = 0, i_end = clients.size(); i < i_end; i++) {
auto client = &clients[i];
Status s = client->stream->Finish();
// Since we shutdown servers and clients at the same time, clients can
// observe cancellation. Thus, we consider both OK and CANCELLED as good
// status.
const bool success = IsSuccess(s);
result.add_client_success(success);
if (!success) {
gpr_log(GPR_ERROR, "Client %zu had an error %s", i,
s.error_message().c_str());
GPR_ASSERT(false);
}
}
}
static void FinishServers(const std::vector<ServerData>& servers,
const ServerArgs& server_mark) {
gpr_log(GPR_INFO, "Finishing servers");
for (size_t i = 0, i_end = servers.size(); i < i_end; i++) {
auto server = &servers[i];
if (!server->stream->Write(server_mark)) {
gpr_log(GPR_ERROR, "Couldn't write mark to server %zu", i);
GPR_ASSERT(false);
}
if (!server->stream->WritesDone()) {
gpr_log(GPR_ERROR, "Failed WritesDone for server %zu", i);
GPR_ASSERT(false);
}
}
}
static void ReceiveFinalStatusFromServer(const std::vector<ServerData>& servers,
ScenarioResult& result) {
gpr_log(GPR_INFO, "Receiving final status from servers");
ServerStatus server_status;
for (size_t i = 0, i_end = servers.size(); i < i_end; i++) {
auto server = &servers[i];
// Read the server final status
if (server->stream->Read(&server_status)) {
gpr_log(GPR_INFO, "Received final status from server %zu", i);
result.add_server_stats()->CopyFrom(server_status.stats());
result.add_server_cores(server_status.cores());
// That final status should be the last message on the server stream
GPR_ASSERT(!server->stream->Read(&server_status));
} else {
gpr_log(GPR_ERROR, "Couldn't get final status from server %zu", i);
GPR_ASSERT(false);
}
}
}
static void ShutdownServers(const std::vector<ServerData>& servers,
ScenarioResult& result) {
gpr_log(GPR_INFO, "Shutdown servers");
for (size_t i = 0, i_end = servers.size(); i < i_end; i++) {
auto server = &servers[i];
Status s = server->stream->Finish();
// Since we shutdown servers and clients at the same time, servers can
// observe cancellation. Thus, we consider both OK and CANCELLED as good
// status.
const bool success = IsSuccess(s);
result.add_server_success(success);
if (!success) {
gpr_log(GPR_ERROR, "Server %zu had an error %s", i,
s.error_message().c_str());
GPR_ASSERT(false);
}
}
}
std::vector<grpc::testing::Server*>* g_inproc_servers = nullptr;
std::unique_ptr<ScenarioResult> RunScenario(
@ -301,10 +427,6 @@ std::unique_ptr<ScenarioResult> RunScenario(
workers.resize(num_clients + num_servers);
// Start servers
struct ServerData {
unique_ptr<WorkerService::Stub> stub;
unique_ptr<ClientReaderWriter<ServerArgs, ServerStatus>> stream;
};
std::vector<ServerData> servers(num_servers);
std::unordered_map<string, std::deque<int>> hosts_cores;
ChannelArguments channel_args;
@ -363,10 +485,6 @@ std::unique_ptr<ScenarioResult> RunScenario(
// Targets are all set by now
result_client_config = client_config;
// Start clients
struct ClientData {
unique_ptr<WorkerService::Stub> stub;
unique_ptr<ClientReaderWriter<ClientArgs, ClientStatus>> stream;
};
std::vector<ClientData> clients(num_clients);
size_t channels_allocated = 0;
for (size_t i = 0; i < num_clients; i++) {
@ -492,70 +610,32 @@ std::unique_ptr<ScenarioResult> RunScenario(
Histogram merged_latencies;
std::unordered_map<int, int64_t> merged_statuses;
// For the case where clients leads the test, it's going to finish
// clients first and wait until it's completely done for generaous termination.
bool client_finish_first = (client_config.rpc_type() != STREAMING_FROM_SERVER);
// For the case where clients lead the test such as UNARY and
// STREAMING_FROM_CLIENT, clients need to finish completely while a server
// is running to prevent the clients from being stuck while waiting for
// the result.
bool client_finish_first =
(client_config.rpc_type() != STREAMING_FROM_SERVER);
gpr_log(GPR_INFO, "Finishing clients");
for (size_t i = 0; i < num_clients; i++) {
auto client = &clients[i];
if (!client->stream->Write(client_mark)) {
gpr_log(GPR_ERROR, "Couldn't write mark to client %zu", i);
GPR_ASSERT(false);
}
if (!client->stream->WritesDone()) {
gpr_log(GPR_ERROR, "Failed WritesDone for client %zu", i);
GPR_ASSERT(false);
}
}
FinishClients(clients, client_mark);
if (!client_finish_first) {
gpr_log(GPR_INFO, "Finishing servers");
for (size_t i = 0; i < num_servers; i++) {
auto server = &servers[i];
if (!server->stream->Write(server_mark)) {
gpr_log(GPR_ERROR, "Couldn't write mark to server %zu", i);
GPR_ASSERT(false);
}
if (!server->stream->WritesDone()) {
gpr_log(GPR_ERROR, "Failed WritesDone for server %zu", i);
GPR_ASSERT(false);
}
}
FinishServers(servers, server_mark);
}
for (size_t i = 0; i < num_clients; i++) {
auto client = &clients[i];
// Read the client final status
if (client->stream->Read(&client_status)) {
gpr_log(GPR_INFO, "Received final status from client %zu", i);
const auto& stats = client_status.stats();
merged_latencies.MergeProto(stats.latencies());
for (int i = 0; i < stats.request_results_size(); i++) {
merged_statuses[stats.request_results(i).status_code()] +=
stats.request_results(i).count();
}
result->add_client_stats()->CopyFrom(stats);
// That final status should be the last message on the client stream
GPR_ASSERT(!client->stream->Read(&client_status));
} else {
gpr_log(GPR_ERROR, "Couldn't get final status from client %zu", i);
GPR_ASSERT(false);
}
ReceiveFinalStatusFromClients(clients, merged_latencies, merged_statuses,
*result);
ShutdownClients(clients, *result);
if (client_finish_first) {
FinishServers(servers, server_mark);
}
for (size_t i = 0; i < num_clients; i++) {
auto client = &clients[i];
Status s = client->stream->Finish();
// Since we shutdown servers and clients at the same time, clients can
// observe cancellation. Thus, we consider both OK and CANCELLED as good
// status.
const bool success = IsSuccess(s);
result->add_client_success(success);
if (!success) {
gpr_log(GPR_ERROR, "Client %zu had an error %s", i,
s.error_message().c_str());
GPR_ASSERT(false);
}
ReceiveFinalStatusFromServer(servers, *result);
ShutdownServers(servers, *result);
if (g_inproc_servers != nullptr) {
delete g_inproc_servers;
}
merged_latencies.FillProto(result->mutable_latencies());
@ -565,54 +645,6 @@ std::unique_ptr<ScenarioResult> RunScenario(
rrc->set_status_code(it->first);
rrc->set_count(it->second);
}
if (client_finish_first) {
gpr_log(GPR_INFO, "Finishing servers");
for (size_t i = 0; i < num_servers; i++) {
auto server = &servers[i];
if (!server->stream->Write(server_mark)) {
gpr_log(GPR_ERROR, "Couldn't write mark to server %zu", i);
GPR_ASSERT(false);
}
if (!server->stream->WritesDone()) {
gpr_log(GPR_ERROR, "Failed WritesDone for server %zu", i);
GPR_ASSERT(false);
}
}
}
for (size_t i = 0; i < num_servers; i++) {
auto server = &servers[i];
// Read the server final status
if (server->stream->Read(&server_status)) {
gpr_log(GPR_INFO, "Received final status from server %zu", i);
result->add_server_stats()->CopyFrom(server_status.stats());
result->add_server_cores(server_status.cores());
// That final status should be the last message on the server stream
GPR_ASSERT(!server->stream->Read(&server_status));
} else {
gpr_log(GPR_ERROR, "Couldn't get final status from server %zu", i);
GPR_ASSERT(false);
}
}
for (size_t i = 0; i < num_servers; i++) {
auto server = &servers[i];
Status s = server->stream->Finish();
// Since we shutdown servers and clients at the same time, servers can
// observe cancellation. Thus, we consider both OK and CANCELLED as good
// status.
const bool success = IsSuccess(s);
result->add_server_success(success);
if (!success) {
gpr_log(GPR_ERROR, "Server %zu had an error %s", i,
s.error_message().c_str());
GPR_ASSERT(false);
}
}
if (g_inproc_servers != nullptr) {
delete g_inproc_servers;
}
postprocess_scenario_result(result.get());
return result;
}

Loading…
Cancel
Save