|
|
@ -31,7 +31,6 @@ |
|
|
|
* |
|
|
|
* |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
|
|
|
|
|
|
|
|
#include <cassert> |
|
|
|
|
|
|
|
#include <forward_list> |
|
|
|
#include <forward_list> |
|
|
|
#include <functional> |
|
|
|
#include <functional> |
|
|
|
#include <list> |
|
|
|
#include <list> |
|
|
@ -48,7 +47,6 @@ |
|
|
|
#include <grpc++/generic/generic_stub.h> |
|
|
|
#include <grpc++/generic/generic_stub.h> |
|
|
|
#include <grpc/grpc.h> |
|
|
|
#include <grpc/grpc.h> |
|
|
|
#include <grpc/support/cpu.h> |
|
|
|
#include <grpc/support/cpu.h> |
|
|
|
#include <grpc/support/histogram.h> |
|
|
|
|
|
|
|
#include <grpc/support/log.h> |
|
|
|
#include <grpc/support/log.h> |
|
|
|
|
|
|
|
|
|
|
|
#include "src/proto/grpc/testing/services.grpc.pb.h" |
|
|
|
#include "src/proto/grpc/testing/services.grpc.pb.h" |
|
|
@ -64,7 +62,7 @@ class ClientRpcContext { |
|
|
|
ClientRpcContext() {} |
|
|
|
ClientRpcContext() {} |
|
|
|
virtual ~ClientRpcContext() {} |
|
|
|
virtual ~ClientRpcContext() {} |
|
|
|
// next state, return false if done. Collect stats when appropriate
|
|
|
|
// next state, return false if done. Collect stats when appropriate
|
|
|
|
virtual bool RunNextState(bool, Histogram* hist) = 0; |
|
|
|
virtual bool RunNextState(bool, HistogramEntry* entry) = 0; |
|
|
|
virtual ClientRpcContext* StartNewClone() = 0; |
|
|
|
virtual ClientRpcContext* StartNewClone() = 0; |
|
|
|
static void* tag(ClientRpcContext* c) { return reinterpret_cast<void*>(c); } |
|
|
|
static void* tag(ClientRpcContext* c) { return reinterpret_cast<void*>(c); } |
|
|
|
static ClientRpcContext* detag(void* t) { |
|
|
|
static ClientRpcContext* detag(void* t) { |
|
|
@ -104,7 +102,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { |
|
|
|
alarm_.reset(new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this))); |
|
|
|
alarm_.reset(new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this))); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE { |
|
|
|
bool RunNextState(bool ok, HistogramEntry* entry) GRPC_OVERRIDE { |
|
|
|
switch (next_state_) { |
|
|
|
switch (next_state_) { |
|
|
|
case State::READY: |
|
|
|
case State::READY: |
|
|
|
start_ = UsageTimer::Now(); |
|
|
|
start_ = UsageTimer::Now(); |
|
|
@ -114,7 +112,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { |
|
|
|
next_state_ = State::RESP_DONE; |
|
|
|
next_state_ = State::RESP_DONE; |
|
|
|
return true; |
|
|
|
return true; |
|
|
|
case State::RESP_DONE: |
|
|
|
case State::RESP_DONE: |
|
|
|
hist->Add((UsageTimer::Now() - start_) * 1e9); |
|
|
|
entry->set_value((UsageTimer::Now() - start_) * 1e9); |
|
|
|
callback_(status_, &response_); |
|
|
|
callback_(status_, &response_); |
|
|
|
next_state_ = State::INVALID; |
|
|
|
next_state_ = State::INVALID; |
|
|
|
return false; |
|
|
|
return false; |
|
|
@ -176,6 +174,7 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { |
|
|
|
for (int i = 0; i < num_async_threads_; i++) { |
|
|
|
for (int i = 0; i < num_async_threads_; i++) { |
|
|
|
cli_cqs_.emplace_back(new CompletionQueue); |
|
|
|
cli_cqs_.emplace_back(new CompletionQueue); |
|
|
|
next_issuers_.emplace_back(NextIssuer(i)); |
|
|
|
next_issuers_.emplace_back(NextIssuer(i)); |
|
|
|
|
|
|
|
shutdown_state_.emplace_back(new PerThreadShutdownState()); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
using namespace std::placeholders; |
|
|
|
using namespace std::placeholders; |
|
|
@ -192,7 +191,6 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { |
|
|
|
} |
|
|
|
} |
|
|
|
virtual ~AsyncClient() { |
|
|
|
virtual ~AsyncClient() { |
|
|
|
for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) { |
|
|
|
for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) { |
|
|
|
(*cq)->Shutdown(); |
|
|
|
|
|
|
|
void* got_tag; |
|
|
|
void* got_tag; |
|
|
|
bool ok; |
|
|
|
bool ok; |
|
|
|
while ((*cq)->Next(&got_tag, &ok)) { |
|
|
|
while ((*cq)->Next(&got_tag, &ok)) { |
|
|
@ -201,7 +199,36 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
bool ThreadFunc(Histogram* histogram, |
|
|
|
protected: |
|
|
|
|
|
|
|
const int num_async_threads_; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
|
|
|
struct PerThreadShutdownState { |
|
|
|
|
|
|
|
mutable std::mutex mutex; |
|
|
|
|
|
|
|
bool shutdown; |
|
|
|
|
|
|
|
PerThreadShutdownState() : shutdown(false) {} |
|
|
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
int NumThreads(const ClientConfig& config) { |
|
|
|
|
|
|
|
int num_threads = config.async_client_threads(); |
|
|
|
|
|
|
|
if (num_threads <= 0) { // Use dynamic sizing
|
|
|
|
|
|
|
|
num_threads = cores_; |
|
|
|
|
|
|
|
gpr_log(GPR_INFO, "Sizing async client to %d threads", num_threads); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
return num_threads; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
void DestroyMultithreading() GRPC_OVERRIDE GRPC_FINAL { |
|
|
|
|
|
|
|
for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) { |
|
|
|
|
|
|
|
std::lock_guard<std::mutex> lock((*ss)->mutex); |
|
|
|
|
|
|
|
(*ss)->shutdown = true; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) { |
|
|
|
|
|
|
|
(*cq)->Shutdown(); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
this->EndThreads(); // this needed for resolution
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
bool ThreadFunc(HistogramEntry* entry, |
|
|
|
size_t thread_idx) GRPC_OVERRIDE GRPC_FINAL { |
|
|
|
size_t thread_idx) GRPC_OVERRIDE GRPC_FINAL { |
|
|
|
void* got_tag; |
|
|
|
void* got_tag; |
|
|
|
bool ok; |
|
|
|
bool ok; |
|
|
@ -209,12 +236,15 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { |
|
|
|
switch (cli_cqs_[thread_idx]->AsyncNext( |
|
|
|
switch (cli_cqs_[thread_idx]->AsyncNext( |
|
|
|
&got_tag, &ok, |
|
|
|
&got_tag, &ok, |
|
|
|
std::chrono::system_clock::now() + std::chrono::milliseconds(10))) { |
|
|
|
std::chrono::system_clock::now() + std::chrono::milliseconds(10))) { |
|
|
|
case CompletionQueue::SHUTDOWN: |
|
|
|
|
|
|
|
return false; |
|
|
|
|
|
|
|
case CompletionQueue::GOT_EVENT: { |
|
|
|
case CompletionQueue::GOT_EVENT: { |
|
|
|
// Got a regular event, so process it
|
|
|
|
// Got a regular event, so process it
|
|
|
|
ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); |
|
|
|
ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); |
|
|
|
if (!ctx->RunNextState(ok, histogram)) { |
|
|
|
// Proceed while holding a lock to make sure that
|
|
|
|
|
|
|
|
// this thread isn't supposed to shut down
|
|
|
|
|
|
|
|
std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex); |
|
|
|
|
|
|
|
if (shutdown_state_[thread_idx]->shutdown) { |
|
|
|
|
|
|
|
return true; |
|
|
|
|
|
|
|
} else if (!ctx->RunNextState(ok, entry)) { |
|
|
|
// The RPC and callback are done, so clone the ctx
|
|
|
|
// The RPC and callback are done, so clone the ctx
|
|
|
|
// and kickstart the new one
|
|
|
|
// and kickstart the new one
|
|
|
|
auto clone = ctx->StartNewClone(); |
|
|
|
auto clone = ctx->StartNewClone(); |
|
|
@ -224,29 +254,23 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { |
|
|
|
} |
|
|
|
} |
|
|
|
return true; |
|
|
|
return true; |
|
|
|
} |
|
|
|
} |
|
|
|
case CompletionQueue::TIMEOUT: |
|
|
|
case CompletionQueue::TIMEOUT: { |
|
|
|
// TODO(ctiller): do something here to track how frequently we pass
|
|
|
|
std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex); |
|
|
|
// through this codepath.
|
|
|
|
if (shutdown_state_[thread_idx]->shutdown) { |
|
|
|
|
|
|
|
return true; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
return true; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
case CompletionQueue::SHUTDOWN: // queue is shutting down, so we must be
|
|
|
|
|
|
|
|
// done
|
|
|
|
return true; |
|
|
|
return true; |
|
|
|
} |
|
|
|
} |
|
|
|
GPR_UNREACHABLE_CODE(return false); |
|
|
|
GPR_UNREACHABLE_CODE(return true); |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
protected: |
|
|
|
|
|
|
|
const int num_async_threads_; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
|
|
|
int NumThreads(const ClientConfig& config) { |
|
|
|
|
|
|
|
int num_threads = config.async_client_threads(); |
|
|
|
|
|
|
|
if (num_threads <= 0) { // Use dynamic sizing
|
|
|
|
|
|
|
|
num_threads = cores_; |
|
|
|
|
|
|
|
gpr_log(GPR_INFO, "Sizing async client to %d threads", num_threads); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
return num_threads; |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_; |
|
|
|
std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_; |
|
|
|
std::vector<std::function<gpr_timespec()>> next_issuers_; |
|
|
|
std::vector<std::function<gpr_timespec()>> next_issuers_; |
|
|
|
|
|
|
|
std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_; |
|
|
|
}; |
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
static std::unique_ptr<BenchmarkService::Stub> BenchmarkStubCreator( |
|
|
|
static std::unique_ptr<BenchmarkService::Stub> BenchmarkStubCreator( |
|
|
@ -262,7 +286,7 @@ class AsyncUnaryClient GRPC_FINAL |
|
|
|
config, SetupCtx, BenchmarkStubCreator) { |
|
|
|
config, SetupCtx, BenchmarkStubCreator) { |
|
|
|
StartThreads(num_async_threads_); |
|
|
|
StartThreads(num_async_threads_); |
|
|
|
} |
|
|
|
} |
|
|
|
~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); } |
|
|
|
~AsyncUnaryClient() GRPC_OVERRIDE {} |
|
|
|
|
|
|
|
|
|
|
|
private: |
|
|
|
private: |
|
|
|
static void CheckDone(grpc::Status s, SimpleResponse* response) {} |
|
|
|
static void CheckDone(grpc::Status s, SimpleResponse* response) {} |
|
|
@ -307,7 +331,7 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext { |
|
|
|
stream_ = start_req_(stub_, &context_, cq, ClientRpcContext::tag(this)); |
|
|
|
stream_ = start_req_(stub_, &context_, cq, ClientRpcContext::tag(this)); |
|
|
|
next_state_ = State::STREAM_IDLE; |
|
|
|
next_state_ = State::STREAM_IDLE; |
|
|
|
} |
|
|
|
} |
|
|
|
bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE { |
|
|
|
bool RunNextState(bool ok, HistogramEntry* entry) GRPC_OVERRIDE { |
|
|
|
while (true) { |
|
|
|
while (true) { |
|
|
|
switch (next_state_) { |
|
|
|
switch (next_state_) { |
|
|
|
case State::STREAM_IDLE: |
|
|
|
case State::STREAM_IDLE: |
|
|
@ -339,7 +363,7 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext { |
|
|
|
return true; |
|
|
|
return true; |
|
|
|
break; |
|
|
|
break; |
|
|
|
case State::READ_DONE: |
|
|
|
case State::READ_DONE: |
|
|
|
hist->Add((UsageTimer::Now() - start_) * 1e9); |
|
|
|
entry->set_value((UsageTimer::Now() - start_) * 1e9); |
|
|
|
callback_(status_, &response_); |
|
|
|
callback_(status_, &response_); |
|
|
|
next_state_ = State::STREAM_IDLE; |
|
|
|
next_state_ = State::STREAM_IDLE; |
|
|
|
break; // loop around
|
|
|
|
break; // loop around
|
|
|
@ -391,7 +415,7 @@ class AsyncStreamingClient GRPC_FINAL |
|
|
|
StartThreads(num_async_threads_); |
|
|
|
StartThreads(num_async_threads_); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
~AsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); } |
|
|
|
~AsyncStreamingClient() GRPC_OVERRIDE {} |
|
|
|
|
|
|
|
|
|
|
|
private: |
|
|
|
private: |
|
|
|
static void CheckDone(grpc::Status s, SimpleResponse* response) {} |
|
|
|
static void CheckDone(grpc::Status s, SimpleResponse* response) {} |
|
|
@ -439,7 +463,7 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext { |
|
|
|
ClientRpcContext::tag(this)); |
|
|
|
ClientRpcContext::tag(this)); |
|
|
|
next_state_ = State::STREAM_IDLE; |
|
|
|
next_state_ = State::STREAM_IDLE; |
|
|
|
} |
|
|
|
} |
|
|
|
bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE { |
|
|
|
bool RunNextState(bool ok, HistogramEntry* entry) GRPC_OVERRIDE { |
|
|
|
while (true) { |
|
|
|
while (true) { |
|
|
|
switch (next_state_) { |
|
|
|
switch (next_state_) { |
|
|
|
case State::STREAM_IDLE: |
|
|
|
case State::STREAM_IDLE: |
|
|
@ -471,7 +495,7 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext { |
|
|
|
return true; |
|
|
|
return true; |
|
|
|
break; |
|
|
|
break; |
|
|
|
case State::READ_DONE: |
|
|
|
case State::READ_DONE: |
|
|
|
hist->Add((UsageTimer::Now() - start_) * 1e9); |
|
|
|
entry->set_value((UsageTimer::Now() - start_) * 1e9); |
|
|
|
callback_(status_, &response_); |
|
|
|
callback_(status_, &response_); |
|
|
|
next_state_ = State::STREAM_IDLE; |
|
|
|
next_state_ = State::STREAM_IDLE; |
|
|
|
break; // loop around
|
|
|
|
break; // loop around
|
|
|
@ -527,7 +551,7 @@ class GenericAsyncStreamingClient GRPC_FINAL |
|
|
|
StartThreads(num_async_threads_); |
|
|
|
StartThreads(num_async_threads_); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
~GenericAsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); } |
|
|
|
~GenericAsyncStreamingClient() GRPC_OVERRIDE {} |
|
|
|
|
|
|
|
|
|
|
|
private: |
|
|
|
private: |
|
|
|
static void CheckDone(grpc::Status s, ByteBuffer* response) {} |
|
|
|
static void CheckDone(grpc::Status s, ByteBuffer* response) {} |
|
|
|