Fix timer issues

pull/1948/head
vjpai 10 years ago
parent 1795985322
commit 924d459c27
  1. 15
      test/cpp/qps/client.h
  2. 21
      test/cpp/qps/client_async.cc
  3. 2
      test/cpp/qps/client_sync.cc

@ -45,6 +45,9 @@
namespace grpc { namespace grpc {
namespace testing { namespace testing {
typedef std::chrono::system_clock grpc_time_source;
typedef std::chrono::time_point<grpc_time_source> grpc_time;
class Client { class Client {
public: public:
explicit Client(const ClientConfig& config) : timer_(new Timer), explicit Client(const ClientConfig& config) : timer_(new Timer),
@ -145,19 +148,18 @@ class Client {
interarrival_timer_.init(*random_dist, num_threads); interarrival_timer_.init(*random_dist, num_threads);
for (size_t i = 0; i<num_threads; i++) { for (size_t i = 0; i<num_threads; i++) {
next_time_.push_back(std::chrono::high_resolution_clock::now() next_time_.push_back(grpc_time_source::now() +
+ interarrival_timer_(i)); std::chrono::duration_cast<grpc_time_source::duration>(interarrival_timer_(i)));
} }
} }
} }
template<class Timepoint> bool NextIssueTime(int thread_idx, grpc_time *time_delay) {
bool NextIssueTime(int thread_idx, Timepoint *time_delay) {
if (closed_loop_) { if (closed_loop_) {
return false; return false;
} }
else { else {
*time_delay = next_time_[thread_idx]; *time_delay = next_time_[thread_idx];
next_time_[thread_idx] += interarrival_timer_(thread_idx); next_time_[thread_idx] += std::chrono::duration_cast<grpc_time_source::duration>(interarrival_timer_(thread_idx));
return true; return true;
} }
} }
@ -226,8 +228,7 @@ class Client {
std::unique_ptr<Timer> timer_; std::unique_ptr<Timer> timer_;
InterarrivalTimer interarrival_timer_; InterarrivalTimer interarrival_timer_;
std::vector<std::chrono::time_point std::vector<grpc_time> next_time_;
<std::chrono::high_resolution_clock>> next_time_;
}; };
std::unique_ptr<Client> CreateSynchronousUnaryClient(const ClientConfig& args); std::unique_ptr<Client> CreateSynchronousUnaryClient(const ClientConfig& args);

@ -32,8 +32,10 @@
*/ */
#include <cassert> #include <cassert>
#include <forward_list>
#include <functional> #include <functional>
#include <memory> #include <memory>
#include <mutex>
#include <string> #include <string>
#include <thread> #include <thread>
#include <vector> #include <vector>
@ -55,8 +57,6 @@
namespace grpc { namespace grpc {
namespace testing { namespace testing {
typedef std::chrono::high_resolution_clock grpc_time_source;
typedef std::chrono::time_point<grpc_time_source> grpc_time;
typedef std::forward_list<grpc_time> deadline_list; typedef std::forward_list<grpc_time> deadline_list;
class ClientRpcContext { class ClientRpcContext {
@ -98,7 +98,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
} }
void Start() GRPC_OVERRIDE { void Start() GRPC_OVERRIDE {
start_ = Timer::Now(); start_ = Timer::Now();
response_reader_.reset(start_req(stub_, &context_, req_)); response_reader_ = start_req_(stub_, &context_, req_);
response_reader_->Finish(&response_, &status_, ClientRpcContext::tag(this)); response_reader_->Finish(&response_, &status_, ClientRpcContext::tag(this));
} }
~ClientRpcContextUnaryImpl() GRPC_OVERRIDE {} ~ClientRpcContextUnaryImpl() GRPC_OVERRIDE {}
@ -142,7 +142,7 @@ class AsyncClient : public Client {
explicit AsyncClient(const ClientConfig& config, explicit AsyncClient(const ClientConfig& config,
std::function<ClientRpcContext*(CompletionQueue*, TestService::Stub*, std::function<ClientRpcContext*(CompletionQueue*, TestService::Stub*,
const SimpleRequest&)> setup_ctx) : const SimpleRequest&)> setup_ctx) :
Client(config) { Client(config), channel_rpc_lock_(config.client_channels()) {
for (int i = 0; i < config.async_client_threads(); i++) { for (int i = 0; i < config.async_client_threads(); i++) {
cli_cqs_.emplace_back(new CompletionQueue); cli_cqs_.emplace_back(new CompletionQueue);
if (!closed_loop_) { if (!closed_loop_) {
@ -158,7 +158,6 @@ class AsyncClient : public Client {
if (!closed_loop_) { if (!closed_loop_) {
for (auto channel = channels_.begin(); channel != channels_.end(); for (auto channel = channels_.begin(); channel != channels_.end();
channel++) { channel++) {
channel_rpc_lock_.emplace_back();
rpcs_outstanding_.push_back(0); rpcs_outstanding_.push_back(0);
} }
} }
@ -202,6 +201,9 @@ class AsyncClient : public Client {
short_deadline = issue_allowed_[thread_idx] ? short_deadline = issue_allowed_[thread_idx] ?
next_issue_[thread_idx] : deadline; next_issue_[thread_idx] : deadline;
} }
bool got_event;
switch (cli_cqs_[thread_idx]->AsyncNext(&got_tag, &ok, short_deadline)) { switch (cli_cqs_[thread_idx]->AsyncNext(&got_tag, &ok, short_deadline)) {
case CompletionQueue::SHUTDOWN: return false; case CompletionQueue::SHUTDOWN: return false;
case CompletionQueue::TIMEOUT: case CompletionQueue::TIMEOUT:
@ -232,15 +234,16 @@ class AsyncClient : public Client {
bool issued = false; bool issued = false;
for (int num_attempts = 0; num_attempts < channel_count_ && !issued; for (int num_attempts = 0; num_attempts < channel_count_ && !issued;
num_attempts++, next_channel_[thread_idx] = (next_channel_[thread_idx]+1)%channel_count_) { num_attempts++, next_channel_[thread_idx] = (next_channel_[thread_idx]+1)%channel_count_) {
std::lock_guard g(channel_rpc_lock_[next_channel_[thread_idx]]); std::lock_guard<std::mutex>
if (rpcs_outstanding[next_channel_[thread_idx]] < max_outstanding_per_channel_) { g(channel_rpc_lock_[next_channel_[thread_idx]]);
if (rpcs_outstanding_[next_channel_[thread_idx]] < max_outstanding_per_channel_) {
// do the work to issue // do the work to issue
rpcs_outstanding[next_channel_[thread_idx]]++; rpcs_outstanding_[next_channel_[thread_idx]]++;
issued = true; issued = true;
} }
} }
if (!issued) if (!issued)
issue_allowed = false; issue_allowed_[thread_idx] = false;
} }
return true; return true;
} }

@ -77,7 +77,7 @@ class SynchronousClient : public Client {
protected: protected:
void WaitToIssue(int thread_idx) { void WaitToIssue(int thread_idx) {
std::chrono::time_point<std::chrono::high_resolution_clock> next_time; grpc_time next_time;
if (NextIssueTime(thread_idx, &next_time)) { if (NextIssueTime(thread_idx, &next_time)) {
std::this_thread::sleep_until(next_time); std::this_thread::sleep_until(next_time);
} }

Loading…
Cancel
Save