|
|
|
@@ -463,6 +463,7 @@ std::unique_ptr<ScenarioResult> RunScenario( |
|
|
|
|
gpr_timer_set_enabled(0); |
|
|
|
|
|
|
|
|
|
// Finish a run
|
|
|
|
|
// Finish clients
|
|
|
|
|
std::unique_ptr<ScenarioResult> result(new ScenarioResult); |
|
|
|
|
Histogram merged_latencies; |
|
|
|
|
std::unordered_map<int, int64_t> merged_statuses; |
|
|
|
@@ -477,17 +478,9 @@ std::unique_ptr<ScenarioResult> RunScenario( |
|
|
|
|
gpr_log(GPR_ERROR, "Failed WritesDone for client %zu", i); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
gpr_log(GPR_INFO, "Finishing servers"); |
|
|
|
|
for (size_t i = 0; i < num_servers; i++) { |
|
|
|
|
auto server = &servers[i]; |
|
|
|
|
if (!server->stream->Write(server_mark)) { |
|
|
|
|
gpr_log(GPR_ERROR, "Couldn't write mark to server %zu", i); |
|
|
|
|
} |
|
|
|
|
if (!server->stream->WritesDone()) { |
|
|
|
|
gpr_log(GPR_ERROR, "Failed WritesDone for server %zu", i); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Collect the client final run results before finish server
|
|
|
|
|
// otherwise, we will include client shutdown process in benchmark results
|
|
|
|
|
for (size_t i = 0; i < num_clients; i++) { |
|
|
|
|
auto client = &clients[i]; |
|
|
|
|
// Read the client final status
|
|
|
|
@@ -506,28 +499,21 @@ std::unique_ptr<ScenarioResult> RunScenario( |
|
|
|
|
gpr_log(GPR_ERROR, "Couldn't get final status from client %zu", i); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
for (size_t i = 0; i < num_clients; i++) { |
|
|
|
|
auto client = &clients[i]; |
|
|
|
|
Status s = client->stream->Finish(); |
|
|
|
|
// Since we shutdown servers and clients at the same time, clients can
|
|
|
|
|
// observe cancellation. Thus, we consider both OK and CANCELLED as good
|
|
|
|
|
// status.
|
|
|
|
|
const bool success = IsSuccess(s); |
|
|
|
|
result->add_client_success(success); |
|
|
|
|
if (!success) { |
|
|
|
|
gpr_log(GPR_ERROR, "Client %zu had an error %s", i, |
|
|
|
|
s.error_message().c_str()); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
merged_latencies.FillProto(result->mutable_latencies()); |
|
|
|
|
for (std::unordered_map<int, int64_t>::iterator it = merged_statuses.begin(); |
|
|
|
|
it != merged_statuses.end(); ++it) { |
|
|
|
|
RequestResultCount* rrc = result->add_request_results(); |
|
|
|
|
rrc->set_status_code(it->first); |
|
|
|
|
rrc->set_count(it->second); |
|
|
|
|
// Finish servers
|
|
|
|
|
gpr_log(GPR_INFO, "Finishing servers"); |
|
|
|
|
for (size_t i = 0; i < num_servers; i++) { |
|
|
|
|
auto server = &servers[i]; |
|
|
|
|
if (!server->stream->Write(server_mark)) { |
|
|
|
|
gpr_log(GPR_ERROR, "Couldn't write mark to server %zu", i); |
|
|
|
|
} |
|
|
|
|
if (!server->stream->WritesDone()) { |
|
|
|
|
gpr_log(GPR_ERROR, "Failed WritesDone for server %zu", i); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Collect the server final run results before checking client status
|
|
|
|
|
// otherwise, we will wait for the benchmark
|
|
|
|
|
for (size_t i = 0; i < num_servers; i++) { |
|
|
|
|
auto server = &servers[i]; |
|
|
|
|
// Read the server final status
|
|
|
|
@@ -541,6 +527,23 @@ std::unique_ptr<ScenarioResult> RunScenario( |
|
|
|
|
gpr_log(GPR_ERROR, "Couldn't get final status from server %zu", i); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Get final rpc status from clients
|
|
|
|
|
for (size_t i = 0; i < num_clients; i++) { |
|
|
|
|
auto client = &clients[i]; |
|
|
|
|
Status s = client->stream->Finish(); |
|
|
|
|
// Since we shutdown servers and clients at the same time, clients can
|
|
|
|
|
// observe cancellation. Thus, we consider both OK and CANCELLED as good
|
|
|
|
|
// status.
|
|
|
|
|
const bool success = IsSuccess(s); |
|
|
|
|
result->add_client_success(success); |
|
|
|
|
if (!success) { |
|
|
|
|
gpr_log(GPR_ERROR, "Client %zu had an error %s", i, |
|
|
|
|
s.error_message().c_str()); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Get final rpc status from servers
|
|
|
|
|
for (size_t i = 0; i < num_servers; i++) { |
|
|
|
|
auto server = &servers[i]; |
|
|
|
|
Status s = server->stream->Finish(); |
|
|
|
@@ -558,6 +561,15 @@ std::unique_ptr<ScenarioResult> RunScenario( |
|
|
|
|
if (g_inproc_servers != nullptr) { |
|
|
|
|
delete g_inproc_servers; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Post-processing the results summary
|
|
|
|
|
merged_latencies.FillProto(result->mutable_latencies()); |
|
|
|
|
for (std::unordered_map<int, int64_t>::iterator it = merged_statuses.begin(); |
|
|
|
|
it != merged_statuses.end(); ++it) { |
|
|
|
|
RequestResultCount* rrc = result->add_request_results(); |
|
|
|
|
rrc->set_status_code(it->first); |
|
|
|
|
rrc->set_count(it->second); |
|
|
|
|
} |
|
|
|
|
postprocess_scenario_result(result.get()); |
|
|
|
|
return result; |
|
|
|
|
} |
|
|
|
|