|
|
|
@ -140,7 +140,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { |
|
|
|
|
class AsyncClient : public Client { |
|
|
|
|
public: |
|
|
|
|
explicit AsyncClient(const ClientConfig& config, |
|
|
|
|
std::function<void(CompletionQueue*, TestService::Stub*, |
|
|
|
|
std::function<ClientRpcContext*(CompletionQueue*, TestService::Stub*, |
|
|
|
|
const SimpleRequest&)> setup_ctx) : |
|
|
|
|
Client(config) { |
|
|
|
|
for (int i = 0; i < config.async_client_threads(); i++) { |
|
|
|
@ -158,18 +158,22 @@ class AsyncClient : public Client { |
|
|
|
|
if (!closed_loop_) { |
|
|
|
|
for (auto channel = channels_.begin(); channel != channels_.end(); |
|
|
|
|
channel++) { |
|
|
|
|
channel_rpc_count_lock.emplace_back(); |
|
|
|
|
channel_rpc_lock_.emplace_back(); |
|
|
|
|
rpcs_outstanding_.push_back(0); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
else { |
|
|
|
|
|
|
|
|
|
int t = 0; |
|
|
|
|
for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) { |
|
|
|
|
for (auto channel = channels_.begin(); channel != channels_.end(); |
|
|
|
|
channel++) { |
|
|
|
|
auto* cq = cli_cqs_[t].get(); |
|
|
|
|
t = (t + 1) % cli_cqs_.size(); |
|
|
|
|
setup_ctx(cq, channel->get_stub(), request_); |
|
|
|
|
ClientRpcContext *ctx = setup_ctx(cq, channel->get_stub(), request_); |
|
|
|
|
if (closed_loop_) { |
|
|
|
|
// only relevant for closed_loop unary, but harmless for
|
|
|
|
|
// closed_loop streaming
|
|
|
|
|
ctx->Start(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -222,12 +226,13 @@ class AsyncClient : public Client { |
|
|
|
|
} |
|
|
|
|
issue_allowed_[thread_idx] = true; // may be ok now even if it hadn't been
|
|
|
|
|
} |
|
|
|
|
if (issue_allowed && grpc_time_source::now() >= next_issue_[thread_idx]) { |
|
|
|
|
if (issue_allowed_[thread_idx] && |
|
|
|
|
grpc_time_source::now() >= next_issue_[thread_idx]) { |
|
|
|
|
// Attempt to issue
|
|
|
|
|
bool issued = false; |
|
|
|
|
for (int num_attempts = 0; num_attempts < channel_count_ && !issued; |
|
|
|
|
num_attempts++, next_channel_[thread_idx] = (next_channel_[thread_idx]+1)%channel_count_) { |
|
|
|
|
std::lock_guard g(channel_rpc_count_lock_[next_channel_[thread_idx]]); |
|
|
|
|
std::lock_guard g(channel_rpc_lock_[next_channel_[thread_idx]]); |
|
|
|
|
if (rpcs_outstanding[next_channel_[thread_idx]] < max_outstanding_per_channel_) { |
|
|
|
|
// do the work to issue
|
|
|
|
|
rpcs_outstanding[next_channel_[thread_idx]]++; |
|
|
|
@ -247,7 +252,7 @@ class AsyncClient : public Client { |
|
|
|
|
std::vector<bool> issue_allowed_; // may this thread attempt to issue
|
|
|
|
|
std::vector<grpc_time> next_issue_; // when should it issue?
|
|
|
|
|
|
|
|
|
|
std::vector<std::mutex> channel_rpc_count_lock_; |
|
|
|
|
std::vector<std::mutex> channel_rpc_lock_; |
|
|
|
|
std::vector<int> rpcs_outstanding_; // per-channel vector
|
|
|
|
|
int max_outstanding_per_channel_; |
|
|
|
|
int channel_count_; |
|
|
|
@ -261,14 +266,14 @@ class AsyncUnaryClient GRPC_FINAL : public AsyncClient { |
|
|
|
|
} |
|
|
|
|
~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); } |
|
|
|
|
private: |
|
|
|
|
static void SetupCtx(CompletionQueue* cq, TestService::Stub* stub, |
|
|
|
|
static ClientRpcContext *SetupCtx(CompletionQueue* cq, TestService::Stub* stub, |
|
|
|
|
const SimpleRequest& req) { |
|
|
|
|
auto check_done = [](grpc::Status s, SimpleResponse* response) {}; |
|
|
|
|
auto start_req = [cq](TestService::Stub* stub, grpc::ClientContext* ctx, |
|
|
|
|
const SimpleRequest& request) { |
|
|
|
|
return stub->AsyncUnaryCall(ctx, request, cq); |
|
|
|
|
}; |
|
|
|
|
new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>( |
|
|
|
|
return new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>( |
|
|
|
|
stub, req, start_req, check_done); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -350,7 +355,7 @@ class AsyncStreamingClient GRPC_FINAL : public AsyncClient { |
|
|
|
|
|
|
|
|
|
~AsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); } |
|
|
|
|
private: |
|
|
|
|
static void SetupCtx(CompletionQueue* cq, TestService::Stub* stub, |
|
|
|
|
static ClientRpcContext *SetupCtx(CompletionQueue* cq, TestService::Stub* stub, |
|
|
|
|
const SimpleRequest& req) { |
|
|
|
|
auto check_done = [](grpc::Status s, SimpleResponse* response) {}; |
|
|
|
|
auto start_req = [cq](TestService::Stub *stub, grpc::ClientContext *ctx, |
|
|
|
@ -358,7 +363,7 @@ private: |
|
|
|
|
auto stream = stub->AsyncStreamingCall(ctx, cq, tag); |
|
|
|
|
return stream; |
|
|
|
|
}; |
|
|
|
|
new ClientRpcContextStreamingImpl<SimpleRequest, SimpleResponse>( |
|
|
|
|
return new ClientRpcContextStreamingImpl<SimpleRequest, SimpleResponse>( |
|
|
|
|
stub, req, start_req, check_done); |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|