|
|
|
@ -57,15 +57,13 @@ |
|
|
|
|
namespace grpc { |
|
|
|
|
namespace testing { |
|
|
|
|
|
|
|
|
|
typedef std::forward_list<grpc_time> deadline_list; |
|
|
|
|
|
|
|
|
|
class ClientRpcContext { |
|
|
|
|
public: |
|
|
|
|
ClientRpcContext() {} |
|
|
|
|
virtual ~ClientRpcContext() {} |
|
|
|
|
// next state, return false if done. Collect stats when appropriate
|
|
|
|
|
virtual bool RunNextState(bool, Histogram* hist) = 0; |
|
|
|
|
virtual void StartNewClone() = 0; |
|
|
|
|
virtual ClientRpcContext* StartNewClone() = 0; |
|
|
|
|
static void* tag(ClientRpcContext* c) { return reinterpret_cast<void*>(c); } |
|
|
|
|
static ClientRpcContext* detag(void* t) { |
|
|
|
|
return reinterpret_cast<ClientRpcContext*>(t); |
|
|
|
@ -110,8 +108,8 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { |
|
|
|
|
return ret; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void StartNewClone() GRPC_OVERRIDE { |
|
|
|
|
new ClientRpcContextUnaryImpl(stub_, req_, start_req_, callback_); |
|
|
|
|
ClientRpcContext* StartNewClone() GRPC_OVERRIDE { |
|
|
|
|
return new ClientRpcContextUnaryImpl(stub_, req_, start_req_, callback_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
@ -137,12 +135,17 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { |
|
|
|
|
response_reader_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
typedef std::forward_list<grpc_time> deadline_list; |
|
|
|
|
typedef std::forward_list<ClientRpcContext *> context_list; |
|
|
|
|
|
|
|
|
|
class AsyncClient : public Client { |
|
|
|
|
public: |
|
|
|
|
explicit AsyncClient(const ClientConfig& config, |
|
|
|
|
std::function<ClientRpcContext*(CompletionQueue*, TestService::Stub*, |
|
|
|
|
const SimpleRequest&)> setup_ctx) : |
|
|
|
|
Client(config), channel_rpc_lock_(config.client_channels()) { |
|
|
|
|
Client(config), channel_rpc_lock_(config.client_channels()), |
|
|
|
|
max_outstanding_per_channel_(config.outstanding_rpcs_per_channel()), |
|
|
|
|
channel_count_(config.client_channels()) { |
|
|
|
|
|
|
|
|
|
SetupLoadTest(config, config.async_client_threads()); |
|
|
|
|
|
|
|
|
@ -167,8 +170,8 @@ class AsyncClient : public Client { |
|
|
|
|
|
|
|
|
|
int t = 0; |
|
|
|
|
for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) { |
|
|
|
|
for (auto channel = channels_.begin(); channel != channels_.end(); |
|
|
|
|
channel++) { |
|
|
|
|
for (int ch = 0; ch < channel_count_; ch++) { |
|
|
|
|
auto channel = channels_[ch]; |
|
|
|
|
auto* cq = cli_cqs_[t].get(); |
|
|
|
|
t = (t + 1) % cli_cqs_.size(); |
|
|
|
|
ClientRpcContext *ctx = setup_ctx(cq, channel->get_stub(), request_); |
|
|
|
@ -177,6 +180,9 @@ class AsyncClient : public Client { |
|
|
|
|
// closed_loop streaming
|
|
|
|
|
ctx->Start(); |
|
|
|
|
} |
|
|
|
|
else { |
|
|
|
|
contexts_[ch].push_front(ctx); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -200,7 +206,12 @@ class AsyncClient : public Client { |
|
|
|
|
deadline = grpc_time_source::now() + std::chrono::seconds(1); |
|
|
|
|
short_deadline = deadline; |
|
|
|
|
} else { |
|
|
|
|
deadline = *(rpc_deadlines_[thread_idx].begin()); |
|
|
|
|
if (rpc_deadlines_[thread_idx].empty()) { |
|
|
|
|
deadline = grpc_time_source::now() + std::chrono::seconds(1); |
|
|
|
|
} |
|
|
|
|
else { |
|
|
|
|
deadline = *(rpc_deadlines_[thread_idx].begin()); |
|
|
|
|
} |
|
|
|
|
short_deadline = issue_allowed_[thread_idx] ? |
|
|
|
|
next_issue_[thread_idx] : deadline; |
|
|
|
|
} |
|
|
|
@ -219,37 +230,50 @@ class AsyncClient : public Client { |
|
|
|
|
GPR_ASSERT(false); |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
if (grpc_time_source::now() > deadline) { |
|
|
|
|
// we have missed some 1-second deadline, which is too much gpr_log(GPR_INFO, "Missed an RPC deadline, giving up");
|
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
if (got_event) { |
|
|
|
|
if ((closed_loop_ || !rpc_deadlines_[thread_idx].empty()) && |
|
|
|
|
grpc_time_source::now() > deadline) { |
|
|
|
|
// we have missed some 1-second deadline, which is too much gpr_log(GPR_INFO, "Missed an RPC deadline, giving up");
|
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
if (got_event) { |
|
|
|
|
ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); |
|
|
|
|
if (ctx->RunNextState(ok, histogram) == false) { |
|
|
|
|
// call the callback and then delete it
|
|
|
|
|
rpc_deadlines_[thread_idx].erase_after(ctx->deadline_posn()); |
|
|
|
|
ctx->RunNextState(ok, histogram); |
|
|
|
|
ctx->StartNewClone(); |
|
|
|
|
ClientRpcContext *clone_ctx = ctx->StartNewClone(); |
|
|
|
|
delete ctx; |
|
|
|
|
if (!closed_loop_) { |
|
|
|
|
// Put this in the list of idle contexts for this channel
|
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
issue_allowed_[thread_idx] = true; // may be ok now even if it hadn't been
|
|
|
|
|
} |
|
|
|
|
if (issue_allowed_[thread_idx] && |
|
|
|
|
grpc_time_source::now() >= next_issue_[thread_idx]) { |
|
|
|
|
// Attempt to issue
|
|
|
|
|
// Attempt to issue
|
|
|
|
|
bool issued = false; |
|
|
|
|
for (int num_attempts = 0; num_attempts < channel_count_ && !issued; |
|
|
|
|
num_attempts++, next_channel_[thread_idx] = (next_channel_[thread_idx]+1)%channel_count_) { |
|
|
|
|
num_attempts++, |
|
|
|
|
next_channel_[thread_idx] = |
|
|
|
|
(next_channel_[thread_idx]+1)%channel_count_) { |
|
|
|
|
std::lock_guard<std::mutex> |
|
|
|
|
g(channel_rpc_lock_[next_channel_[thread_idx]]); |
|
|
|
|
if (rpcs_outstanding_[next_channel_[thread_idx]] < max_outstanding_per_channel_) { |
|
|
|
|
g(channel_rpc_lock_[next_channel_[thread_idx]]); |
|
|
|
|
if ((rpcs_outstanding_[next_channel_[thread_idx]] < |
|
|
|
|
max_outstanding_per_channel_) && |
|
|
|
|
!contexts_[next_channel_[thread_idx]].empty()) { |
|
|
|
|
// Get an idle context from the front of the list
|
|
|
|
|
auto ctx = contexts_[next_channel_[thread_idx]].begin(); |
|
|
|
|
contexts_[next_channel_[thread_idx]].pop_front(); |
|
|
|
|
// do the work to issue
|
|
|
|
|
ctx->Start(); |
|
|
|
|
rpcs_outstanding_[next_channel_[thread_idx]]++; |
|
|
|
|
issued = true; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if (!issued) |
|
|
|
|
issue_allowed_[thread_idx] = false;
|
|
|
|
|
issue_allowed_[thread_idx] = false; |
|
|
|
|
} |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
@ -264,6 +288,7 @@ class AsyncClient : public Client { |
|
|
|
|
|
|
|
|
|
std::vector<std::mutex> channel_rpc_lock_; |
|
|
|
|
std::vector<int> rpcs_outstanding_; // per-channel vector
|
|
|
|
|
std::vector<context_list> contexts_; // per-channel list of idle contexts
|
|
|
|
|
int max_outstanding_per_channel_; |
|
|
|
|
int channel_count_; |
|
|
|
|
}; |
|
|
|
@ -311,8 +336,8 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext { |
|
|
|
|
bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE { |
|
|
|
|
return (this->*next_state_)(ok, hist); |
|
|
|
|
} |
|
|
|
|
void StartNewClone() GRPC_OVERRIDE { |
|
|
|
|
new ClientRpcContextStreamingImpl(stub_, req_, start_req_, callback_); |
|
|
|
|
ClientRpcContext* StartNewClone() GRPC_OVERRIDE { |
|
|
|
|
return new ClientRpcContextStreamingImpl(stub_, req_, start_req_, callback_); |
|
|
|
|
} |
|
|
|
|
void Start() GRPC_OVERRIDE {} |
|
|
|
|
private: |
|
|
|
|