Merge branch 'verbose_errors' into histo

pull/7285/head
Vijay Pai 9 years ago
commit aa8bbeb6c6
  1. 3
      src/proto/grpc/testing/control.proto
  2. 1
      test/cpp/qps/client_async.cc
  3. 12
      test/cpp/qps/client_sync.cc
  4. 137
      test/cpp/qps/driver.cc
  5. 2
      test/cpp/qps/driver.h
  6. 18
      test/cpp/qps/qps_json_driver.cc
  7. 35
      test/cpp/qps/qps_worker.cc

@ -229,4 +229,7 @@ message ScenarioResult {
repeated int32 server_cores = 5;
// An after-the-fact computed summary
ScenarioResultSummary summary = 6;
// Information on success or failure of each worker
repeated bool client_success = 7;
repeated bool server_success = 8;
}

@ -31,7 +31,6 @@
*
*/
#include <cassert>
#include <forward_list>
#include <functional>
#include <list>

@ -31,7 +31,6 @@
*
*/
#include <cassert>
#include <chrono>
#include <memory>
#include <mutex>
@ -126,11 +125,16 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient {
}
~SynchronousStreamingClient() {
EndThreads();
for (auto stream = &stream_[0]; stream != &stream_[num_threads_];
stream++) {
for (size_t i = 0; i < num_threads_; i++) {
auto stream = &stream_[i];
if (*stream) {
(*stream)->WritesDone();
EXPECT_TRUE((*stream)->Finish().ok());
Status s = (*stream)->Finish();
EXPECT_TRUE(s.ok());
if (!s.ok()) {
gpr_log(GPR_ERROR, "Stream %zu received an error %s", i,
s.error_message().c_str());
}
}
}
delete[] stream_;

@ -87,7 +87,7 @@ static std::unordered_map<string, std::deque<int>> get_hosts_and_cores(
CoreRequest dummy;
CoreResponse cores;
grpc::Status s = stub->CoreCount(&ctx, dummy, &cores);
assert(s.ok());
GPR_ASSERT(s.ok());
std::deque<int> dq;
for (int i = 0; i < cores.cores(); i++) {
dq.push_back(i);
@ -289,9 +289,13 @@ std::unique_ptr<ScenarioResult> RunScenario(
*args.mutable_setup() = server_config;
servers[i].stream =
servers[i].stub->RunServer(runsc::AllocContext(&contexts));
GPR_ASSERT(servers[i].stream->Write(args));
if (!servers[i].stream->Write(args)) {
gpr_log(GPR_ERROR, "Could not write args to server %zu", i);
}
ServerStatus init_status;
GPR_ASSERT(servers[i].stream->Read(&init_status));
if (!servers[i].stream->Read(&init_status)) {
gpr_log(GPR_ERROR, "Server %zu did not yield initial status", i);
}
gpr_join_host_port(&cli_target, host, init_status.port());
client_config.add_server_targets(cli_target);
gpr_free(host);
@ -344,10 +348,14 @@ std::unique_ptr<ScenarioResult> RunScenario(
ClientArgs args;
*args.mutable_setup() = per_client_config;
clients[i].stream =
clients[i].stub->RunClient(runsc::AllocContext(&contexts));
GPR_ASSERT(clients[i].stream->Write(args));
clients[i].stub->RunClient(runsc::AllocContext(&contexts));
if (!clients[i].stream->Write(args)) {
gpr_log(GPR_ERROR, "Could not write args to client %zu", i);
}
ClientStatus init_status;
GPR_ASSERT(clients[i].stream->Read(&init_status));
if (!clients[i].stream->Read(&init_status)) {
gpr_log(GPR_ERROR, "Client %zu did not yield initial status", i);
}
}
// Let everything warmup
@ -362,19 +370,31 @@ std::unique_ptr<ScenarioResult> RunScenario(
server_mark.mutable_mark()->set_reset(true);
ClientArgs client_mark;
client_mark.mutable_mark()->set_reset(true);
for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
GPR_ASSERT(server->stream->Write(server_mark));
for (size_t i = 0; i < num_servers; i++) {
auto server = &servers[i];
if (!server->stream->Write(server_mark)) {
gpr_log(GPR_ERROR, "Couldn't write mark to server %zu", i);
}
}
for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
GPR_ASSERT(client->stream->Write(client_mark));
for (size_t i = 0; i < num_clients; i++) {
auto client = &clients[i];
if (!client->stream->Write(client_mark)) {
gpr_log(GPR_ERROR, "Couldn't write mark to client %zu", i);
}
}
ServerStatus server_status;
ClientStatus client_status;
for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
GPR_ASSERT(server->stream->Read(&server_status));
for (size_t i = 0; i < num_servers; i++) {
auto server = &servers[i];
if (!server->stream->Read(&server_status)) {
gpr_log(GPR_ERROR, "Couldn't get status from server %zu", i);
}
}
for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
GPR_ASSERT(client->stream->Read(&client_status));
for (size_t i = 0; i < num_clients; i++) {
auto client = &clients[i];
if (!client->stream->Read(&client_status)) {
gpr_log(GPR_ERROR, "Couldn't get status from client %zu", i);
}
}
// Wait some time
@ -390,37 +410,73 @@ std::unique_ptr<ScenarioResult> RunScenario(
Histogram merged_latencies;
gpr_log(GPR_INFO, "Finishing clients");
for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
GPR_ASSERT(client->stream->Write(client_mark));
GPR_ASSERT(client->stream->WritesDone());
for (size_t i = 0; i < num_clients; i++) {
auto client = &clients[i];
if (!client->stream->Write(client_mark)) {
gpr_log(GPR_ERROR, "Couldn't write mark to client %zu", i);
}
if (!client->stream->WritesDone()) {
gpr_log(GPR_ERROR, "Failed WritesDone for client %zu", i);
}
}
for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
GPR_ASSERT(client->stream->Read(&client_status));
const auto& stats = client_status.stats();
merged_latencies.MergeProto(stats.latencies());
result->add_client_stats()->CopyFrom(stats);
GPR_ASSERT(!client->stream->Read(&client_status));
for (size_t i = 0; i < num_clients; i++) {
auto client = &clients[i];
// Read the client final status
if (client->stream->Read(&client_status)) {
gpr_log(GPR_INFO, "Received final status from client %zu", i);
const auto& stats = client_status.stats();
merged_latencies.MergeProto(stats.latencies());
result->add_client_stats()->CopyFrom(stats);
// That final status should be the last message on the client stream
GPR_ASSERT(!client->stream->Read(&client_status));
} else {
gpr_log(GPR_ERROR, "Couldn't get final status from client %zu", i);
}
}
for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
GPR_ASSERT(client->stream->Finish().ok());
for (size_t i = 0; i < num_clients; i++) {
auto client = &clients[i];
Status s = client->stream->Finish();
result->add_client_success(s.ok());
if (!s.ok()) {
gpr_log(GPR_ERROR, "Client %zu had an error %s", i,
s.error_message().c_str());
}
}
delete[] clients;
merged_latencies.FillProto(result->mutable_latencies());
gpr_log(GPR_INFO, "Finishing servers");
for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
GPR_ASSERT(server->stream->Write(server_mark));
GPR_ASSERT(server->stream->WritesDone());
for (size_t i = 0; i < num_servers; i++) {
auto server = &servers[i];
if (!server->stream->Write(server_mark)) {
gpr_log(GPR_ERROR, "Couldn't write mark to server %zu", i);
}
if (!server->stream->WritesDone()) {
gpr_log(GPR_ERROR, "Failed WritesDone for server %zu", i);
}
}
for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
GPR_ASSERT(server->stream->Read(&server_status));
result->add_server_stats()->CopyFrom(server_status.stats());
result->add_server_cores(server_status.cores());
GPR_ASSERT(!server->stream->Read(&server_status));
for (size_t i = 0; i < num_servers; i++) {
auto server = &servers[i];
// Read the server final status
if (server->stream->Read(&server_status)) {
gpr_log(GPR_INFO, "Received final status from server %zu", i);
result->add_server_stats()->CopyFrom(server_status.stats());
result->add_server_cores(server_status.cores());
// That final status should be the last message on the server stream
GPR_ASSERT(!server->stream->Read(&server_status));
} else {
gpr_log(GPR_ERROR, "Couldn't get final status from server %zu", i);
}
}
for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
GPR_ASSERT(server->stream->Finish().ok());
for (size_t i = 0; i < num_servers; i++) {
auto server = &servers[i];
Status s = server->stream->Finish();
result->add_server_success(s.ok());
if (!s.ok()) {
gpr_log(GPR_ERROR, "Server %zu had an error %s", i,
s.error_message().c_str());
}
}
delete[] servers;
@ -429,8 +485,9 @@ std::unique_ptr<ScenarioResult> RunScenario(
return result;
}
void RunQuit() {
bool RunQuit() {
// Get client, server lists
bool result = true;
auto workers = get_workers("QPS_WORKERS");
for (size_t i = 0; i < workers.size(); i++) {
auto stub = WorkerService::NewStub(
@ -438,8 +495,14 @@ void RunQuit() {
Void dummy;
grpc::ClientContext ctx;
ctx.set_fail_fast(false);
GPR_ASSERT(stub->QuitWorker(&ctx, dummy, &dummy).ok());
Status s = stub->QuitWorker(&ctx, dummy, &dummy);
if (!s.ok()) {
gpr_log(GPR_ERROR, "Worker %zu could not be properly quit because %s",
i, s.error_message().c_str());
result = false;
}
}
return result;
}
} // namespace testing

@ -47,7 +47,7 @@ std::unique_ptr<ScenarioResult> RunScenario(
const grpc::testing::ServerConfig& server_config, size_t num_servers,
int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count);
void RunQuit();
bool RunQuit();
} // namespace testing
} // namespace grpc

@ -53,7 +53,7 @@ DEFINE_bool(quit, false, "Quit the workers");
namespace grpc {
namespace testing {
static void QpsDriver() {
static bool QpsDriver() {
grpc::string json;
bool scfile = (FLAGS_scenarios_file != "");
@ -81,13 +81,13 @@ static void QpsDriver() {
} else if (scjson) {
json = FLAGS_scenarios_json.c_str();
} else if (FLAGS_quit) {
RunQuit();
return;
return RunQuit();
}
// Parse into an array of scenarios
Scenarios scenarios;
ParseJson(json.c_str(), "grpc.testing.Scenarios", &scenarios);
bool success = true;
// Make sure that there is at least some valid scenario here
GPR_ASSERT(scenarios.scenarios_size() > 0);
@ -109,7 +109,15 @@ static void QpsDriver() {
GetReporter()->ReportQPSPerCore(*result);
GetReporter()->ReportLatency(*result);
GetReporter()->ReportTimes(*result);
for (int i = 0; success && i < result->client_success_size(); i++) {
success = result->client_success(i);
}
for (int i = 0; success && i < result->server_success_size(); i++) {
success = result->server_success(i);
}
}
return success;
}
} // namespace testing
@ -118,7 +126,7 @@ static void QpsDriver() {
int main(int argc, char **argv) {
grpc::testing::InitBenchmark(&argc, &argv, true);
grpc::testing::QpsDriver();
bool ok = grpc::testing::QpsDriver();
return 0;
return ok ? 0 : 1;
}

@ -33,7 +33,6 @@
#include "test/cpp/qps/qps_worker.h"
#include <cassert>
#include <memory>
#include <mutex>
#include <sstream>
@ -124,7 +123,7 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
GRPC_OVERRIDE {
InstanceGuard g(this);
if (!g.Acquired()) {
return Status(StatusCode::RESOURCE_EXHAUSTED, "");
return Status(StatusCode::RESOURCE_EXHAUSTED, "Client worker busy");
}
ScopedProfile profile("qps_client.prof", false);
@ -137,7 +136,7 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
GRPC_OVERRIDE {
InstanceGuard g(this);
if (!g.Acquired()) {
return Status(StatusCode::RESOURCE_EXHAUSTED, "");
return Status(StatusCode::RESOURCE_EXHAUSTED, "Server worker busy");
}
ScopedProfile profile("qps_server.prof", false);
@ -154,7 +153,7 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
Status QuitWorker(ServerContext* ctx, const Void*, Void*) GRPC_OVERRIDE {
InstanceGuard g(this);
if (!g.Acquired()) {
return Status(StatusCode::RESOURCE_EXHAUSTED, "");
return Status(StatusCode::RESOURCE_EXHAUSTED, "Quitting worker busy");
}
worker_->MarkDone();
@ -197,30 +196,32 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
ServerReaderWriter<ClientStatus, ClientArgs>* stream) {
ClientArgs args;
if (!stream->Read(&args)) {
return Status(StatusCode::INVALID_ARGUMENT, "");
return Status(StatusCode::INVALID_ARGUMENT, "Couldn't read args");
}
if (!args.has_setup()) {
return Status(StatusCode::INVALID_ARGUMENT, "");
return Status(StatusCode::INVALID_ARGUMENT, "Invalid setup arg");
}
gpr_log(GPR_INFO, "RunClientBody: about to create client");
auto client = CreateClient(args.setup());
if (!client) {
return Status(StatusCode::INVALID_ARGUMENT, "");
return Status(StatusCode::INVALID_ARGUMENT, "Couldn't create client");
}
gpr_log(GPR_INFO, "RunClientBody: client created");
ClientStatus status;
if (!stream->Write(status)) {
return Status(StatusCode::UNKNOWN, "");
return Status(StatusCode::UNKNOWN, "Client couldn't report init status");
}
gpr_log(GPR_INFO, "RunClientBody: creation status reported");
while (stream->Read(&args)) {
gpr_log(GPR_INFO, "RunClientBody: Message read");
if (!args.has_mark()) {
gpr_log(GPR_INFO, "RunClientBody: Message is not a mark!");
return Status(StatusCode::INVALID_ARGUMENT, "");
return Status(StatusCode::INVALID_ARGUMENT, "Invalid mark");
}
*status.mutable_stats() = client->Mark(args.mark().reset());
stream->Write(status);
if (!stream->Write(status)) {
return Status(StatusCode::UNKNOWN, "Client couldn't respond to mark");
}
gpr_log(GPR_INFO, "RunClientBody: Mark response given");
}
@ -232,10 +233,10 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
ServerReaderWriter<ServerStatus, ServerArgs>* stream) {
ServerArgs args;
if (!stream->Read(&args)) {
return Status(StatusCode::INVALID_ARGUMENT, "");
return Status(StatusCode::INVALID_ARGUMENT, "Couldn't read server args");
}
if (!args.has_setup()) {
return Status(StatusCode::INVALID_ARGUMENT, "");
return Status(StatusCode::INVALID_ARGUMENT, "Bad server creation args");
}
if (server_port_ != 0) {
args.mutable_setup()->set_port(server_port_);
@ -243,24 +244,26 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
gpr_log(GPR_INFO, "RunServerBody: about to create server");
auto server = CreateServer(args.setup());
if (!server) {
return Status(StatusCode::INVALID_ARGUMENT, "");
return Status(StatusCode::INVALID_ARGUMENT, "Couldn't create server");
}
gpr_log(GPR_INFO, "RunServerBody: server created");
ServerStatus status;
status.set_port(server->port());
status.set_cores(server->cores());
if (!stream->Write(status)) {
return Status(StatusCode::UNKNOWN, "");
return Status(StatusCode::UNKNOWN, "Server couldn't report init status");
}
gpr_log(GPR_INFO, "RunServerBody: creation status reported");
while (stream->Read(&args)) {
gpr_log(GPR_INFO, "RunServerBody: Message read");
if (!args.has_mark()) {
gpr_log(GPR_INFO, "RunServerBody: Message not a mark!");
return Status(StatusCode::INVALID_ARGUMENT, "");
return Status(StatusCode::INVALID_ARGUMENT, "Invalid mark");
}
*status.mutable_stats() = server->Mark(args.mark().reset());
stream->Write(status);
if (!stream->Write(status)) {
return Status(StatusCode::UNKNOWN, "Server couldn't respond to mark");
}
gpr_log(GPR_INFO, "RunServerBody: Mark response given");
}

Loading…
Cancel
Save