Better management of channel attempts and channel coverage

pull/1948/head
Vijay Pai 10 years ago
parent db398e0609
commit c7de81e57e
  1. 26
      test/cpp/qps/client_async.cc

@ -154,7 +154,8 @@ class AsyncClient : public Client {
Client(config), channel_lock_(config.client_channels()),
contexts_(config.client_channels()),
max_outstanding_per_channel_(config.outstanding_rpcs_per_channel()),
channel_count_(config.client_channels()) {
channel_count_(config.client_channels()),
pref_channel_inc_(config.async_client_threads()) {
SetupLoadTest(config, config.async_client_threads());
@ -265,17 +266,17 @@ class AsyncClient : public Client {
grpc_time_source::now() >= next_issue_[thread_idx]) {
// Attempt to issue
bool issued = false;
for (int num_attempts = 0; num_attempts < channel_count_ && !issued;
num_attempts++) {
for (int num_attempts = 0, channel_attempt = next_channel_[thread_idx];
num_attempts < channel_count_ && !issued; num_attempts++) {
bool can_issue = false;
ClientRpcContext* ctx = nullptr;
{
std::lock_guard<std::mutex>
g(channel_lock_[next_channel_[thread_idx]]);
if (!contexts_[next_channel_[thread_idx]].empty()) {
g(channel_lock_[channel_attempt]);
if (!contexts_[channel_attempt].empty()) {
// Get an idle context from the front of the list
ctx = *(contexts_[next_channel_[thread_idx]].begin());
contexts_[next_channel_[thread_idx]].pop_front();
ctx = *(contexts_[channel_attempt].begin());
contexts_[channel_attempt].pop_front();
can_issue = true;
}
}
@ -288,10 +289,14 @@ class AsyncClient : public Client {
ctx->set_deadline_posn(it);
ctx->Start(cli_cqs_[thread_idx].get());
issued = true;
// If we did issue, then next time, try our thread's next
// preferred channel
next_channel_[thread_idx] += pref_channel_inc_;
if (next_channel_[thread_idx] >= channel_count_)
next_channel_[thread_idx] = (thread_idx % channel_count_);
} else {
// Do a modular increment of next_channel only if we didn't issue
next_channel_[thread_idx] =
(next_channel_[thread_idx]+1)%channel_count_;
// Do a modular increment of channel attempt if we couldn't issue
channel_attempt = (channel_attempt+1) % channel_count_;
}
}
if (issued) {
@ -319,6 +324,7 @@ class AsyncClient : public Client {
std::vector<context_list> contexts_; // per-channel list of idle contexts
int max_outstanding_per_channel_;
int channel_count_;
int pref_channel_inc_;
};
class AsyncUnaryClient GRPC_FINAL : public AsyncClient {

Loading…
Cancel
Save