Merge pull request #7407 from ctiller/delayed-write

Benchmark fixes
pull/7462/head
Craig Tiller 9 years ago
parent 5ca7e47493
commit 77c7f9fd62
  1. src/proto/grpc/testing/control.proto (3 changes)
  2. test/cpp/qps/client.h (68 changes)
  3. test/cpp/qps/client_async.cc (92 changes)
  4. test/cpp/qps/client_sync.cc (28 changes)
  5. test/cpp/qps/driver.cc (135 changes)
  6. test/cpp/qps/driver.h (2 changes)
  7. test/cpp/qps/qps_json_driver.cc (18 changes)
  8. test/cpp/qps/qps_worker.cc (40 changes)
  9. test/cpp/qps/server_async.cc (56 changes)

@ -229,4 +229,7 @@ message ScenarioResult {
repeated int32 server_cores = 5;
// An after-the-fact computed summary
ScenarioResultSummary summary = 6;
// Information on success or failure of each worker
repeated bool client_success = 7;
repeated bool server_success = 8;
}
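
The two new repeated bool fields carry one entry per worker: the driver fills them from each stream's final Finish() status (the add_client_success / add_server_success calls in driver.cc below), and qps_json_driver.cc folds them into an overall verdict. A minimal sketch of that consumption, assuming the C++ classes generated from this proto are available at the usual path:

// Sketch only: collapse the per-worker flags into a single pass/fail.
// Assumes the generated header for control.proto is on the include path.
#include "src/proto/grpc/testing/control.pb.h"

bool AllWorkersSucceeded(const grpc::testing::ScenarioResult& result) {
  bool ok = true;
  for (int i = 0; ok && i < result.client_success_size(); i++) {
    ok = result.client_success(i);
  }
  for (int i = 0; ok && i < result.server_success_size(); i++) {
    ok = result.server_success(i);
  }
  return ok;
}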

@ -112,6 +112,21 @@ class ClientRequestCreator<ByteBuffer> {
}
};
class HistogramEntry GRPC_FINAL {
public:
HistogramEntry() : used_(false) {}
bool used() const { return used_; }
double value() const { return value_; }
void set_value(double v) {
used_ = true;
value_ = v;
}
private:
bool used_;
double value_;
};
class Client {
public:
Client() : timer_(new UsageTimer), interarrival_timer_() {}
@ -151,10 +166,21 @@ class Client {
return stats;
}
// Must call AwaitThreadsCompletion before destructor to avoid a race
// between destructor and invocation of virtual ThreadFunc
void AwaitThreadsCompletion() {
DestroyMultithreading();
std::unique_lock<std::mutex> g(thread_completion_mu_);
while (threads_remaining_ != 0) {
threads_complete_.wait(g);
}
}
protected:
bool closed_loop_;
void StartThreads(size_t num_threads) {
threads_remaining_ = num_threads;
for (size_t i = 0; i < num_threads; i++) {
threads_.emplace_back(new Thread(this, i));
}
@ -162,7 +188,8 @@ class Client {
void EndThreads() { threads_.clear(); }
virtual bool ThreadFunc(Histogram* histogram, size_t thread_idx) = 0;
virtual void DestroyMultithreading() = 0;
virtual bool ThreadFunc(HistogramEntry* histogram, size_t thread_idx) = 0;
void SetupLoadTest(const ClientConfig& config, size_t num_threads) {
// Set up the load distribution based on the number of threads
@ -215,7 +242,6 @@ class Client {
public:
Thread(Client* client, size_t idx)
: done_(false),
new_stats_(nullptr),
client_(client),
idx_(idx),
impl_(&Thread::ThreadFunc, this) {}
@ -230,15 +256,10 @@ class Client {
void BeginSwap(Histogram* n) {
std::lock_guard<std::mutex> g(mu_);
new_stats_ = n;
n->Swap(&histogram_);
}
void EndSwap() {
std::unique_lock<std::mutex> g(mu_);
while (new_stats_ != nullptr) {
cv_.wait(g);
};
}
void EndSwap() {}
void MergeStatsInto(Histogram* hist) {
std::unique_lock<std::mutex> g(mu_);
@ -252,29 +273,26 @@ class Client {
void ThreadFunc() {
for (;;) {
// run the loop body
const bool thread_still_ok = client_->ThreadFunc(&histogram_, idx_);
// lock, see if we're done
HistogramEntry entry;
const bool thread_still_ok = client_->ThreadFunc(&entry, idx_);
// lock, update histogram if needed and see if we're done
std::lock_guard<std::mutex> g(mu_);
if (entry.used()) {
histogram_.Add(entry.value());
}
if (!thread_still_ok) {
gpr_log(GPR_ERROR, "Finishing client thread due to RPC error");
done_ = true;
}
if (done_) {
client_->CompleteThread();
return;
}
// check if we're resetting stats, swap out the histogram if so
if (new_stats_) {
new_stats_->Swap(&histogram_);
new_stats_ = nullptr;
cv_.notify_one();
}
}
}
std::mutex mu_;
std::condition_variable cv_;
bool done_;
Histogram* new_stats_;
Histogram histogram_;
Client* client_;
const size_t idx_;
@ -286,6 +304,18 @@ class Client {
InterarrivalTimer interarrival_timer_;
std::vector<gpr_timespec> next_time_;
std::mutex thread_completion_mu_;
size_t threads_remaining_;
std::condition_variable threads_complete_;
void CompleteThread() {
std::lock_guard<std::mutex> g(thread_completion_mu_);
threads_remaining_--;
if (threads_remaining_ == 0) {
threads_complete_.notify_all();
}
}
};
template <class StubType, class RequestType>
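
Taken together, HistogramEntry, CompleteThread and AwaitThreadsCompletion replace the old swap-and-wait stats handoff: each worker records at most one latency per loop iteration into a stack-local entry, merges it into its histogram under the thread's mutex, and decrements a shared counter on exit so the worker service can wait for every thread before the Client is destroyed. A standalone simplification of that pattern (the types below are illustrative stand-ins, not the benchmark's classes):

#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Illustrative stand-ins for HistogramEntry / Histogram.
struct Entry { bool used = false; double value = 0.0; };
struct Hist {
  void Add(double v) { values.push_back(v); }
  std::vector<double> values;
};

class Workers {
 public:
  void Start(size_t n) {
    remaining_ = n;
    for (size_t i = 0; i < n; i++) threads_.emplace_back([this] { Loop(); });
  }
  // Analogue of DestroyMultithreading() + AwaitThreadsCompletion().
  void Shutdown() {
    stop_ = true;
    std::unique_lock<std::mutex> g(done_mu_);
    while (remaining_ != 0) done_cv_.wait(g);
    g.unlock();
    for (auto& t : threads_) t.join();
  }
 private:
  void Loop() {
    while (!stop_) {
      Entry e;                         // one measurement, built with no lock held
      e.used = true;
      e.value = 1.0;                   // a real loop would time an RPC here
      std::lock_guard<std::mutex> g(hist_mu_);
      if (e.used) hist_.Add(e.value);  // merge under the lock, as in Thread::ThreadFunc
    }
    std::lock_guard<std::mutex> g(done_mu_);
    if (--remaining_ == 0) done_cv_.notify_all();  // CompleteThread()
  }
  std::atomic<bool> stop_{false};
  std::mutex hist_mu_;
  std::mutex done_mu_;
  std::condition_variable done_cv_;
  size_t remaining_ = 0;
  Hist hist_;
  std::vector<std::thread> threads_;
};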

@ -31,7 +31,6 @@
*
*/
#include <cassert>
#include <forward_list>
#include <functional>
#include <list>
@ -48,7 +47,6 @@
#include <grpc++/generic/generic_stub.h>
#include <grpc/grpc.h>
#include <grpc/support/cpu.h>
#include <grpc/support/histogram.h>
#include <grpc/support/log.h>
#include "src/proto/grpc/testing/services.grpc.pb.h"
@ -64,7 +62,7 @@ class ClientRpcContext {
ClientRpcContext() {}
virtual ~ClientRpcContext() {}
// next state, return false if done. Collect stats when appropriate
virtual bool RunNextState(bool, Histogram* hist) = 0;
virtual bool RunNextState(bool, HistogramEntry* entry) = 0;
virtual ClientRpcContext* StartNewClone() = 0;
static void* tag(ClientRpcContext* c) { return reinterpret_cast<void*>(c); }
static ClientRpcContext* detag(void* t) {
@ -104,7 +102,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
alarm_.reset(new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this)));
}
}
bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
bool RunNextState(bool ok, HistogramEntry* entry) GRPC_OVERRIDE {
switch (next_state_) {
case State::READY:
start_ = UsageTimer::Now();
@ -114,7 +112,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
next_state_ = State::RESP_DONE;
return true;
case State::RESP_DONE:
hist->Add((UsageTimer::Now() - start_) * 1e9);
entry->set_value((UsageTimer::Now() - start_) * 1e9);
callback_(status_, &response_);
next_state_ = State::INVALID;
return false;
@ -176,6 +174,7 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
for (int i = 0; i < num_async_threads_; i++) {
cli_cqs_.emplace_back(new CompletionQueue);
next_issuers_.emplace_back(NextIssuer(i));
shutdown_state_.emplace_back(new PerThreadShutdownState());
}
using namespace std::placeholders;
@ -192,7 +191,6 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
}
virtual ~AsyncClient() {
for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
(*cq)->Shutdown();
void* got_tag;
bool ok;
while ((*cq)->Next(&got_tag, &ok)) {
@ -201,7 +199,36 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
}
}
bool ThreadFunc(Histogram* histogram,
protected:
const int num_async_threads_;
private:
struct PerThreadShutdownState {
mutable std::mutex mutex;
bool shutdown;
PerThreadShutdownState() : shutdown(false) {}
};
int NumThreads(const ClientConfig& config) {
int num_threads = config.async_client_threads();
if (num_threads <= 0) { // Use dynamic sizing
num_threads = cores_;
gpr_log(GPR_INFO, "Sizing async client to %d threads", num_threads);
}
return num_threads;
}
void DestroyMultithreading() GRPC_OVERRIDE GRPC_FINAL {
for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) {
std::lock_guard<std::mutex> lock((*ss)->mutex);
(*ss)->shutdown = true;
}
for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
(*cq)->Shutdown();
}
this->EndThreads();  // 'this->' needed so the call resolves to the dependent base class (Client) in this template
}
bool ThreadFunc(HistogramEntry* entry,
size_t thread_idx) GRPC_OVERRIDE GRPC_FINAL {
void* got_tag;
bool ok;
@ -209,12 +236,15 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
switch (cli_cqs_[thread_idx]->AsyncNext(
&got_tag, &ok,
std::chrono::system_clock::now() + std::chrono::milliseconds(10))) {
case CompletionQueue::SHUTDOWN:
return false;
case CompletionQueue::GOT_EVENT: {
// Got a regular event, so process it
ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
if (!ctx->RunNextState(ok, histogram)) {
// Proceed while holding a lock to make sure that
// this thread isn't supposed to shut down
std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex);
if (shutdown_state_[thread_idx]->shutdown) {
return true;
} else if (!ctx->RunNextState(ok, entry)) {
// The RPC and callback are done, so clone the ctx
// and kickstart the new one
auto clone = ctx->StartNewClone();
@ -224,29 +254,23 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
}
return true;
}
case CompletionQueue::TIMEOUT:
// TODO(ctiller): do something here to track how frequently we pass
// through this codepath.
case CompletionQueue::TIMEOUT: {
std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex);
if (shutdown_state_[thread_idx]->shutdown) {
return true;
}
return true;
}
case CompletionQueue::SHUTDOWN: // queue is shutting down, so we must be
// done
return true;
}
GPR_UNREACHABLE_CODE(return false);
}
protected:
const int num_async_threads_;
private:
int NumThreads(const ClientConfig& config) {
int num_threads = config.async_client_threads();
if (num_threads <= 0) { // Use dynamic sizing
num_threads = cores_;
gpr_log(GPR_INFO, "Sizing async client to %d threads", num_threads);
}
return num_threads;
GPR_UNREACHABLE_CODE(return true);
}
std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
std::vector<std::function<gpr_timespec()>> next_issuers_;
std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_;
};
static std::unique_ptr<BenchmarkService::Stub> BenchmarkStubCreator(
@ -262,7 +286,7 @@ class AsyncUnaryClient GRPC_FINAL
config, SetupCtx, BenchmarkStubCreator) {
StartThreads(num_async_threads_);
}
~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); }
~AsyncUnaryClient() GRPC_OVERRIDE {}
private:
static void CheckDone(grpc::Status s, SimpleResponse* response) {}
@ -307,7 +331,7 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
stream_ = start_req_(stub_, &context_, cq, ClientRpcContext::tag(this));
next_state_ = State::STREAM_IDLE;
}
bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
bool RunNextState(bool ok, HistogramEntry* entry) GRPC_OVERRIDE {
while (true) {
switch (next_state_) {
case State::STREAM_IDLE:
@ -339,7 +363,7 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
return true;
break;
case State::READ_DONE:
hist->Add((UsageTimer::Now() - start_) * 1e9);
entry->set_value((UsageTimer::Now() - start_) * 1e9);
callback_(status_, &response_);
next_state_ = State::STREAM_IDLE;
break; // loop around
@ -391,7 +415,7 @@ class AsyncStreamingClient GRPC_FINAL
StartThreads(num_async_threads_);
}
~AsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); }
~AsyncStreamingClient() GRPC_OVERRIDE {}
private:
static void CheckDone(grpc::Status s, SimpleResponse* response) {}
@ -439,7 +463,7 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
ClientRpcContext::tag(this));
next_state_ = State::STREAM_IDLE;
}
bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
bool RunNextState(bool ok, HistogramEntry* entry) GRPC_OVERRIDE {
while (true) {
switch (next_state_) {
case State::STREAM_IDLE:
@ -471,7 +495,7 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
return true;
break;
case State::READ_DONE:
hist->Add((UsageTimer::Now() - start_) * 1e9);
entry->set_value((UsageTimer::Now() - start_) * 1e9);
callback_(status_, &response_);
next_state_ = State::STREAM_IDLE;
break; // loop around
@ -527,7 +551,7 @@ class GenericAsyncStreamingClient GRPC_FINAL
StartThreads(num_async_threads_);
}
~GenericAsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); }
~GenericAsyncStreamingClient() GRPC_OVERRIDE {}
private:
static void CheckDone(grpc::Status s, ByteBuffer* response) {}
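
The async client's new ThreadFunc takes the per-thread shutdown mutex before acting on any AsyncNext outcome, so DestroyMultithreading can flip every flag, shut the completion queues down and join the threads without racing a context that is mid-teardown. A standalone sketch of that check-under-lock polling pattern, with a timed queue standing in for grpc::CompletionQueue (all names below are illustrative):

#include <chrono>
#include <condition_variable>
#include <mutex>
#include <queue>

struct Event { int tag; };

// Mirrors PerThreadShutdownState from client_async.cc.
struct ShutdownState {
  std::mutex mu;
  bool shutdown = false;
};

class Poller {
 public:
  // One loop iteration of a worker thread; returns true to keep looping.
  bool PollOnce() {
    Event ev;
    if (!WaitForEvent(&ev, std::chrono::milliseconds(10))) {
      std::lock_guard<std::mutex> l(state_.mu);  // TIMEOUT: still re-check the flag
      return true;
    }
    std::lock_guard<std::mutex> l(state_.mu);    // GOT_EVENT
    if (state_.shutdown) return true;            // teardown has begun: drop the event
    Process(ev);                                 // safe: shutdown can't flip while we hold mu
    return true;
  }
  // Analogue of DestroyMultithreading() flipping the flag before queue shutdown.
  void BeginShutdown() {
    std::lock_guard<std::mutex> l(state_.mu);
    state_.shutdown = true;
  }
 private:
  bool WaitForEvent(Event* ev, std::chrono::milliseconds timeout) {
    std::unique_lock<std::mutex> l(q_mu_);
    if (!q_cv_.wait_for(l, timeout, [this] { return !q_.empty(); })) return false;
    *ev = q_.front();
    q_.pop();
    return true;
  }
  void Process(const Event&) {}
  ShutdownState state_;
  std::mutex q_mu_;
  std::condition_variable q_cv_;
  std::queue<Event> q_;
};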

@ -31,7 +31,6 @@
*
*/
#include <cassert>
#include <chrono>
#include <memory>
#include <mutex>
@ -46,7 +45,6 @@
#include <grpc++/server_builder.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/histogram.h>
#include <grpc/support/host_port.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
@ -55,7 +53,6 @@
#include "src/core/lib/profiling/timers.h"
#include "src/proto/grpc/testing/services.grpc.pb.h"
#include "test/cpp/qps/client.h"
#include "test/cpp/qps/histogram.h"
#include "test/cpp/qps/interarrival.h"
#include "test/cpp/qps/usage_timer.h"
@ -90,6 +87,9 @@ class SynchronousClient
size_t num_threads_;
std::vector<SimpleResponse> responses_;
private:
void DestroyMultithreading() GRPC_OVERRIDE GRPC_FINAL { EndThreads(); }
};
class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient {
@ -98,9 +98,9 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient {
: SynchronousClient(config) {
StartThreads(num_threads_);
}
~SynchronousUnaryClient() { EndThreads(); }
~SynchronousUnaryClient() {}
bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE {
bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) GRPC_OVERRIDE {
WaitToIssue(thread_idx);
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
double start = UsageTimer::Now();
@ -108,7 +108,7 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient {
grpc::ClientContext context;
grpc::Status s =
stub->UnaryCall(&context, request_, &responses_[thread_idx]);
histogram->Add((UsageTimer::Now() - start) * 1e9);
entry->set_value((UsageTimer::Now() - start) * 1e9);
return s.ok();
}
};
@ -127,25 +127,29 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient {
StartThreads(num_threads_);
}
~SynchronousStreamingClient() {
EndThreads();
for (auto stream = &stream_[0]; stream != &stream_[num_threads_];
stream++) {
for (size_t i = 0; i < num_threads_; i++) {
auto stream = &stream_[i];
if (*stream) {
(*stream)->WritesDone();
EXPECT_TRUE((*stream)->Finish().ok());
Status s = (*stream)->Finish();
EXPECT_TRUE(s.ok());
if (!s.ok()) {
gpr_log(GPR_ERROR, "Stream %zu received an error %s", i,
s.error_message().c_str());
}
}
}
delete[] stream_;
delete[] context_;
}
bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE {
bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) GRPC_OVERRIDE {
WaitToIssue(thread_idx);
GPR_TIMER_SCOPE("SynchronousStreamingClient::ThreadFunc", 0);
double start = UsageTimer::Now();
if (stream_[thread_idx]->Write(request_) &&
stream_[thread_idx]->Read(&responses_[thread_idx])) {
histogram->Add((UsageTimer::Now() - start) * 1e9);
entry->set_value((UsageTimer::Now() - start) * 1e9);
return true;
}
return false;
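
Both synchronous clients record latency in nanoseconds: judging by the 1e9 scale factor, UsageTimer::Now() yields wall time in seconds as a double, and the elapsed time is scaled before entry->set_value(). A hedged std::chrono equivalent of that timing convention (helper names are illustrative):

#include <chrono>

// Illustrative seconds-as-double clock, mirroring what UsageTimer::Now() appears to return.
static double NowSeconds() {
  using namespace std::chrono;
  return duration<double>(steady_clock::now().time_since_epoch()).count();
}

// Times one operation and reports it in nanoseconds, matching the
// (UsageTimer::Now() - start) * 1e9 expression in the clients above.
template <class Op>
double TimedNanos(Op&& op) {
  const double start = NowSeconds();
  op();
  return (NowSeconds() - start) * 1e9;
}

// Usage sketch: entry->set_value(TimedNanos([&] { /* issue one RPC */ }));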

@ -87,7 +87,7 @@ static std::unordered_map<string, std::deque<int>> get_hosts_and_cores(
CoreRequest dummy;
CoreResponse cores;
grpc::Status s = stub->CoreCount(&ctx, dummy, &cores);
assert(s.ok());
GPR_ASSERT(s.ok());
std::deque<int> dq;
for (int i = 0; i < cores.cores(); i++) {
dq.push_back(i);
@ -289,9 +289,13 @@ std::unique_ptr<ScenarioResult> RunScenario(
*args.mutable_setup() = server_config;
servers[i].stream =
servers[i].stub->RunServer(runsc::AllocContext(&contexts));
GPR_ASSERT(servers[i].stream->Write(args));
if (!servers[i].stream->Write(args)) {
gpr_log(GPR_ERROR, "Could not write args to server %zu", i);
}
ServerStatus init_status;
GPR_ASSERT(servers[i].stream->Read(&init_status));
if (!servers[i].stream->Read(&init_status)) {
gpr_log(GPR_ERROR, "Server %zu did not yield initial status", i);
}
gpr_join_host_port(&cli_target, host, init_status.port());
client_config.add_server_targets(cli_target);
gpr_free(host);
@ -345,9 +349,13 @@ std::unique_ptr<ScenarioResult> RunScenario(
*args.mutable_setup() = per_client_config;
clients[i].stream =
clients[i].stub->RunClient(runsc::AllocContext(&contexts));
GPR_ASSERT(clients[i].stream->Write(args));
if (!clients[i].stream->Write(args)) {
gpr_log(GPR_ERROR, "Could not write args to client %zu", i);
}
ClientStatus init_status;
GPR_ASSERT(clients[i].stream->Read(&init_status));
if (!clients[i].stream->Read(&init_status)) {
gpr_log(GPR_ERROR, "Client %zu did not yield initial status", i);
}
}
// Let everything warmup
@ -362,19 +370,31 @@ std::unique_ptr<ScenarioResult> RunScenario(
server_mark.mutable_mark()->set_reset(true);
ClientArgs client_mark;
client_mark.mutable_mark()->set_reset(true);
for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
GPR_ASSERT(server->stream->Write(server_mark));
for (size_t i = 0; i < num_servers; i++) {
auto server = &servers[i];
if (!server->stream->Write(server_mark)) {
gpr_log(GPR_ERROR, "Couldn't write mark to server %zu", i);
}
}
for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
GPR_ASSERT(client->stream->Write(client_mark));
for (size_t i = 0; i < num_clients; i++) {
auto client = &clients[i];
if (!client->stream->Write(client_mark)) {
gpr_log(GPR_ERROR, "Couldn't write mark to client %zu", i);
}
}
ServerStatus server_status;
ClientStatus client_status;
for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
GPR_ASSERT(server->stream->Read(&server_status));
for (size_t i = 0; i < num_servers; i++) {
auto server = &servers[i];
if (!server->stream->Read(&server_status)) {
gpr_log(GPR_ERROR, "Couldn't get status from server %zu", i);
}
}
for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
GPR_ASSERT(client->stream->Read(&client_status));
for (size_t i = 0; i < num_clients; i++) {
auto client = &clients[i];
if (!client->stream->Read(&client_status)) {
gpr_log(GPR_ERROR, "Couldn't get status from client %zu", i);
}
}
// Wait some time
@ -390,37 +410,73 @@ std::unique_ptr<ScenarioResult> RunScenario(
Histogram merged_latencies;
gpr_log(GPR_INFO, "Finishing clients");
for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
GPR_ASSERT(client->stream->Write(client_mark));
GPR_ASSERT(client->stream->WritesDone());
for (size_t i = 0; i < num_clients; i++) {
auto client = &clients[i];
if (!client->stream->Write(client_mark)) {
gpr_log(GPR_ERROR, "Couldn't write mark to client %zu", i);
}
if (!client->stream->WritesDone()) {
gpr_log(GPR_ERROR, "Failed WritesDone for client %zu", i);
}
}
for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
GPR_ASSERT(client->stream->Read(&client_status));
const auto& stats = client_status.stats();
merged_latencies.MergeProto(stats.latencies());
result->add_client_stats()->CopyFrom(stats);
GPR_ASSERT(!client->stream->Read(&client_status));
for (size_t i = 0; i < num_clients; i++) {
auto client = &clients[i];
// Read the client final status
if (client->stream->Read(&client_status)) {
gpr_log(GPR_INFO, "Received final status from client %zu", i);
const auto& stats = client_status.stats();
merged_latencies.MergeProto(stats.latencies());
result->add_client_stats()->CopyFrom(stats);
// That final status should be the last message on the client stream
GPR_ASSERT(!client->stream->Read(&client_status));
} else {
gpr_log(GPR_ERROR, "Couldn't get final status from client %zu", i);
}
}
for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
GPR_ASSERT(client->stream->Finish().ok());
for (size_t i = 0; i < num_clients; i++) {
auto client = &clients[i];
Status s = client->stream->Finish();
result->add_client_success(s.ok());
if (!s.ok()) {
gpr_log(GPR_ERROR, "Client %zu had an error %s", i,
s.error_message().c_str());
}
}
delete[] clients;
merged_latencies.FillProto(result->mutable_latencies());
gpr_log(GPR_INFO, "Finishing servers");
for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
GPR_ASSERT(server->stream->Write(server_mark));
GPR_ASSERT(server->stream->WritesDone());
for (size_t i = 0; i < num_servers; i++) {
auto server = &servers[i];
if (!server->stream->Write(server_mark)) {
gpr_log(GPR_ERROR, "Couldn't write mark to server %zu", i);
}
if (!server->stream->WritesDone()) {
gpr_log(GPR_ERROR, "Failed WritesDone for server %zu", i);
}
}
for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
GPR_ASSERT(server->stream->Read(&server_status));
result->add_server_stats()->CopyFrom(server_status.stats());
result->add_server_cores(server_status.cores());
GPR_ASSERT(!server->stream->Read(&server_status));
for (size_t i = 0; i < num_servers; i++) {
auto server = &servers[i];
// Read the server final status
if (server->stream->Read(&server_status)) {
gpr_log(GPR_INFO, "Received final status from server %zu", i);
result->add_server_stats()->CopyFrom(server_status.stats());
result->add_server_cores(server_status.cores());
// That final status should be the last message on the server stream
GPR_ASSERT(!server->stream->Read(&server_status));
} else {
gpr_log(GPR_ERROR, "Couldn't get final status from server %zu", i);
}
}
for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
GPR_ASSERT(server->stream->Finish().ok());
for (size_t i = 0; i < num_servers; i++) {
auto server = &servers[i];
Status s = server->stream->Finish();
result->add_server_success(s.ok());
if (!s.ok()) {
gpr_log(GPR_ERROR, "Server %zu had an error %s", i,
s.error_message().c_str());
}
}
delete[] servers;
@ -429,8 +485,9 @@ std::unique_ptr<ScenarioResult> RunScenario(
return result;
}
void RunQuit() {
bool RunQuit() {
// Get client, server lists
bool result = true;
auto workers = get_workers("QPS_WORKERS");
for (size_t i = 0; i < workers.size(); i++) {
auto stub = WorkerService::NewStub(
@ -438,8 +495,14 @@ void RunQuit() {
Void dummy;
grpc::ClientContext ctx;
ctx.set_fail_fast(false);
GPR_ASSERT(stub->QuitWorker(&ctx, dummy, &dummy).ok());
Status s = stub->QuitWorker(&ctx, dummy, &dummy);
if (!s.ok()) {
gpr_log(GPR_ERROR, "Worker %zu could not be properly quit because %s", i,
s.error_message().c_str());
result = false;
}
}
return result;
}
} // namespace testing
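
The driver's former GPR_ASSERTs on every stream Write/Read/Finish are now log-and-continue checks, with each worker's final Finish() status recorded via add_client_success / add_server_success so one flaky worker no longer aborts the whole scenario. A small sketch of that shape (the TryStep helper is illustrative, not part of the driver):

#include <cstddef>
#include <cstdio>

// Illustrative: run one step that may fail, log instead of asserting,
// and return the outcome so the caller can record it in ScenarioResult.
template <class Op>
bool TryStep(const char* what, size_t worker, Op&& op) {
  const bool ok = op();
  if (!ok) std::fprintf(stderr, "%s failed for worker %zu\n", what, worker);
  return ok;
}

// e.g., per client i in a driver-like loop:
//   TryStep("write mark", i, [&] { return clients[i].stream->Write(client_mark); });
//   result->add_client_success(clients[i].stream->Finish().ok());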

@ -47,7 +47,7 @@ std::unique_ptr<ScenarioResult> RunScenario(
const grpc::testing::ServerConfig& server_config, size_t num_servers,
int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count);
void RunQuit();
bool RunQuit();
} // namespace testing
} // namespace grpc

@ -53,7 +53,7 @@ DEFINE_bool(quit, false, "Quit the workers");
namespace grpc {
namespace testing {
static void QpsDriver() {
static bool QpsDriver() {
grpc::string json;
bool scfile = (FLAGS_scenarios_file != "");
@ -81,13 +81,13 @@ static void QpsDriver() {
} else if (scjson) {
json = FLAGS_scenarios_json.c_str();
} else if (FLAGS_quit) {
RunQuit();
return;
return RunQuit();
}
// Parse into an array of scenarios
Scenarios scenarios;
ParseJson(json.c_str(), "grpc.testing.Scenarios", &scenarios);
bool success = true;
// Make sure that there is at least some valid scenario here
GPR_ASSERT(scenarios.scenarios_size() > 0);
@ -109,7 +109,15 @@ static void QpsDriver() {
GetReporter()->ReportQPSPerCore(*result);
GetReporter()->ReportLatency(*result);
GetReporter()->ReportTimes(*result);
for (int i = 0; success && i < result->client_success_size(); i++) {
success = result->client_success(i);
}
for (int i = 0; success && i < result->server_success_size(); i++) {
success = result->server_success(i);
}
}
return success;
}
} // namespace testing
@ -118,7 +126,7 @@ static void QpsDriver() {
int main(int argc, char **argv) {
grpc::testing::InitBenchmark(&argc, &argv, true);
grpc::testing::QpsDriver();
bool ok = grpc::testing::QpsDriver();
return 0;
return ok ? 0 : 1;
}

@ -33,7 +33,6 @@
#include "test/cpp/qps/qps_worker.h"
#include <cassert>
#include <memory>
#include <mutex>
#include <sstream>
@ -124,11 +123,12 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
GRPC_OVERRIDE {
InstanceGuard g(this);
if (!g.Acquired()) {
return Status(StatusCode::RESOURCE_EXHAUSTED, "");
return Status(StatusCode::RESOURCE_EXHAUSTED, "Client worker busy");
}
ScopedProfile profile("qps_client.prof", false);
Status ret = RunClientBody(ctx, stream);
gpr_log(GPR_INFO, "RunClient: Returning");
return ret;
}
@ -137,11 +137,12 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
GRPC_OVERRIDE {
InstanceGuard g(this);
if (!g.Acquired()) {
return Status(StatusCode::RESOURCE_EXHAUSTED, "");
return Status(StatusCode::RESOURCE_EXHAUSTED, "Server worker busy");
}
ScopedProfile profile("qps_server.prof", false);
Status ret = RunServerBody(ctx, stream);
gpr_log(GPR_INFO, "RunServer: Returning");
return ret;
}
@ -154,7 +155,7 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
Status QuitWorker(ServerContext* ctx, const Void*, Void*) GRPC_OVERRIDE {
InstanceGuard g(this);
if (!g.Acquired()) {
return Status(StatusCode::RESOURCE_EXHAUSTED, "");
return Status(StatusCode::RESOURCE_EXHAUSTED, "Quitting worker busy");
}
worker_->MarkDone();
@ -197,33 +198,38 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
ServerReaderWriter<ClientStatus, ClientArgs>* stream) {
ClientArgs args;
if (!stream->Read(&args)) {
return Status(StatusCode::INVALID_ARGUMENT, "");
return Status(StatusCode::INVALID_ARGUMENT, "Couldn't read args");
}
if (!args.has_setup()) {
return Status(StatusCode::INVALID_ARGUMENT, "");
return Status(StatusCode::INVALID_ARGUMENT, "Invalid setup arg");
}
gpr_log(GPR_INFO, "RunClientBody: about to create client");
auto client = CreateClient(args.setup());
if (!client) {
return Status(StatusCode::INVALID_ARGUMENT, "");
return Status(StatusCode::INVALID_ARGUMENT, "Couldn't create client");
}
gpr_log(GPR_INFO, "RunClientBody: client created");
ClientStatus status;
if (!stream->Write(status)) {
return Status(StatusCode::UNKNOWN, "");
return Status(StatusCode::UNKNOWN, "Client couldn't report init status");
}
gpr_log(GPR_INFO, "RunClientBody: creation status reported");
while (stream->Read(&args)) {
gpr_log(GPR_INFO, "RunClientBody: Message read");
if (!args.has_mark()) {
gpr_log(GPR_INFO, "RunClientBody: Message is not a mark!");
return Status(StatusCode::INVALID_ARGUMENT, "");
return Status(StatusCode::INVALID_ARGUMENT, "Invalid mark");
}
*status.mutable_stats() = client->Mark(args.mark().reset());
stream->Write(status);
if (!stream->Write(status)) {
return Status(StatusCode::UNKNOWN, "Client couldn't respond to mark");
}
gpr_log(GPR_INFO, "RunClientBody: Mark response given");
}
gpr_log(GPR_INFO, "RunClientBody: Awaiting Threads Completion");
client->AwaitThreadsCompletion();
gpr_log(GPR_INFO, "RunClientBody: Returning");
return Status::OK;
}
@ -232,10 +238,10 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
ServerReaderWriter<ServerStatus, ServerArgs>* stream) {
ServerArgs args;
if (!stream->Read(&args)) {
return Status(StatusCode::INVALID_ARGUMENT, "");
return Status(StatusCode::INVALID_ARGUMENT, "Couldn't read server args");
}
if (!args.has_setup()) {
return Status(StatusCode::INVALID_ARGUMENT, "");
return Status(StatusCode::INVALID_ARGUMENT, "Bad server creation args");
}
if (server_port_ != 0) {
args.mutable_setup()->set_port(server_port_);
@ -243,24 +249,26 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
gpr_log(GPR_INFO, "RunServerBody: about to create server");
auto server = CreateServer(args.setup());
if (!server) {
return Status(StatusCode::INVALID_ARGUMENT, "");
return Status(StatusCode::INVALID_ARGUMENT, "Couldn't create server");
}
gpr_log(GPR_INFO, "RunServerBody: server created");
ServerStatus status;
status.set_port(server->port());
status.set_cores(server->cores());
if (!stream->Write(status)) {
return Status(StatusCode::UNKNOWN, "");
return Status(StatusCode::UNKNOWN, "Server couldn't report init status");
}
gpr_log(GPR_INFO, "RunServerBody: creation status reported");
while (stream->Read(&args)) {
gpr_log(GPR_INFO, "RunServerBody: Message read");
if (!args.has_mark()) {
gpr_log(GPR_INFO, "RunServerBody: Message not a mark!");
return Status(StatusCode::INVALID_ARGUMENT, "");
return Status(StatusCode::INVALID_ARGUMENT, "Invalid mark");
}
*status.mutable_stats() = server->Mark(args.mark().reset());
stream->Write(status);
if (!stream->Write(status)) {
return Status(StatusCode::UNKNOWN, "Server couldn't respond to mark");
}
gpr_log(GPR_INFO, "RunServerBody: Mark response given");
}
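
RunClientBody now follows a fixed protocol: read the setup args, create the client, report an initial status, answer every subsequent Mark with fresh stats, and only once the driver half-closes the stream call AwaitThreadsCompletion() before returning OK. A compressed sketch of that control flow against a toy stream type (FakeStream and Msg are illustrative; the real code uses grpc::ServerReaderWriter and the generated protos):

#include <cstdio>
#include <deque>
#include <string>

struct Msg {
  bool has_setup;
  bool has_mark;
};

struct FakeStream {
  std::deque<Msg> incoming;
  bool Read(Msg* m) {
    if (incoming.empty()) return false;  // the driver called WritesDone()
    *m = incoming.front();
    incoming.pop_front();
    return true;
  }
  bool Write(const std::string& status) {
    std::printf("status: %s\n", status.c_str());
    return true;
  }
};

// Mirrors the shape of RunClientBody: setup, init status, mark loop, drain threads.
bool RunClientBodySketch(FakeStream* stream) {
  Msg args;
  if (!stream->Read(&args) || !args.has_setup) return false;  // "Couldn't read args"
  // ... create the client from args here ...
  if (!stream->Write("created")) return false;                // initial status
  while (stream->Read(&args)) {
    if (!args.has_mark) return false;                         // "Invalid mark"
    if (!stream->Write("mark stats")) return false;           // respond to the mark
  }
  // Stream closed: wait for the client's worker threads
  // (client->AwaitThreadsCompletion() in the real code) before returning OK.
  return true;
}

int main() {
  FakeStream s;
  s.incoming.push_back(Msg{true, false});  // setup
  s.incoming.push_back(Msg{false, true});  // one mark
  return RunClientBodySketch(&s) ? 0 : 1;
}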

@ -123,22 +123,24 @@ class AsyncQpsServerTest : public Server {
for (int i = 0; i < num_threads; i++) {
shutdown_state_.emplace_back(new PerThreadShutdownState());
}
for (int i = 0; i < num_threads; i++) {
threads_.emplace_back(&AsyncQpsServerTest::ThreadFunc, this, i);
}
}
~AsyncQpsServerTest() {
for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) {
(*ss)->set_shutdown();
std::lock_guard<std::mutex> lock((*ss)->mutex);
(*ss)->shutdown = true;
}
// TODO (vpai): Remove this deadline and allow Shutdown to finish properly
auto deadline = std::chrono::system_clock::now() + std::chrono::seconds(3);
server_->Shutdown(deadline);
for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); ++cq) {
(*cq)->Shutdown();
}
server_->Shutdown(std::chrono::system_clock::now() +
std::chrono::seconds(3));
for (auto thr = threads_.begin(); thr != threads_.end(); thr++) {
thr->join();
}
for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); ++cq) {
(*cq)->Shutdown();
bool ok;
void *got_tag;
while ((*cq)->Next(&got_tag, &ok))
@ -151,22 +153,24 @@ class AsyncQpsServerTest : public Server {
}
private:
void ThreadFunc(int rank) {
void ThreadFunc(int thread_idx) {
// Wait until work is available or we are shutting down
bool ok;
void *got_tag;
while (srv_cqs_[rank]->Next(&got_tag, &ok)) {
while (srv_cqs_[thread_idx]->Next(&got_tag, &ok)) {
ServerRpcContext *ctx = detag(got_tag);
// The tag is a pointer to an RPC context to invoke
const bool still_going = ctx->RunNextState(ok);
if (!shutdown_state_[rank]->shutdown()) {
// this RPC context is done, so refresh it
if (!still_going) {
ctx->Reset();
}
} else {
// Proceed while holding a lock to make sure that
// this thread isn't supposed to shut down
std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex);
if (shutdown_state_[thread_idx]->shutdown) {
return;
}
const bool still_going = ctx->RunNextState(ok);
// if this RPC context is done, refresh it
if (!still_going) {
ctx->Reset();
}
}
return;
}
@ -334,24 +338,12 @@ class AsyncQpsServerTest : public Server {
ServiceType async_service_;
std::forward_list<ServerRpcContext *> contexts_;
class PerThreadShutdownState {
public:
PerThreadShutdownState() : shutdown_(false) {}
bool shutdown() const {
std::lock_guard<std::mutex> lock(mutex_);
return shutdown_;
}
void set_shutdown() {
std::lock_guard<std::mutex> lock(mutex_);
shutdown_ = true;
}
private:
mutable std::mutex mutex_;
bool shutdown_;
struct PerThreadShutdownState {
mutable std::mutex mutex;
bool shutdown;
PerThreadShutdownState() : shutdown(false) {}
};
std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_;
};
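
The async server's destructor now tears things down in a deliberate order: mark every per-thread shutdown flag under its mutex, shut the server down with a 3-second deadline, shut the completion queues, join the worker threads, and only then drain whatever events remain. A skeleton of that ordering with the gRPC pieces stubbed out (illustrative only):

#include <memory>
#include <mutex>
#include <thread>
#include <vector>

struct PerThreadShutdown {
  std::mutex mu;
  bool shutdown = false;
};

class ServerSketch {
 public:
  ~ServerSketch() {
    // 1. Tell each worker, under its own lock, that teardown has begun.
    for (auto& s : states_) {
      std::lock_guard<std::mutex> l(s->mu);
      s->shutdown = true;
    }
    StopServerAndQueues();              // 2. Server::Shutdown(deadline), then cq->Shutdown()
    for (auto& t : threads_) t.join();  // 3. workers exit once the queues stop delivering
    DrainQueues();                      // 4. discard any events that were still queued
  }
 private:
  void StopServerAndQueues() {}
  void DrainQueues() {}
  std::vector<std::unique_ptr<PerThreadShutdown>> states_;
  std::vector<std::thread> threads_;
};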
