Dramatically reduce time required to complete sync test when running

with lots of threads (by parallelizing shutdown of course)
pull/7652/head
Vijay Pai 9 years ago
parent ffbfd01049
commit 773ecd62dd
  1. 18
      test/cpp/qps/client.h

@ -169,6 +169,7 @@ class Client {
// Must call AwaitThreadsCompletion before destructor to avoid a race // Must call AwaitThreadsCompletion before destructor to avoid a race
// between destructor and invocation of virtual ThreadFunc // between destructor and invocation of virtual ThreadFunc
void AwaitThreadsCompletion() { void AwaitThreadsCompletion() {
gpr_atm_rel_store(&thread_pool_done_, static_cast<gpr_atm>(1));
DestroyMultithreading(); DestroyMultithreading();
std::unique_lock<std::mutex> g(thread_completion_mu_); std::unique_lock<std::mutex> g(thread_completion_mu_);
while (threads_remaining_ != 0) { while (threads_remaining_ != 0) {
@ -180,6 +181,7 @@ class Client {
bool closed_loop_; bool closed_loop_;
void StartThreads(size_t num_threads) { void StartThreads(size_t num_threads) {
gpr_atm_rel_store(&thread_pool_done_, static_cast<gpr_atm>(0));
threads_remaining_ = num_threads; threads_remaining_ = num_threads;
for (size_t i = 0; i < num_threads; i++) { for (size_t i = 0; i < num_threads; i++) {
threads_.emplace_back(new Thread(this, i)); threads_.emplace_back(new Thread(this, i));
@ -241,16 +243,11 @@ class Client {
class Thread { class Thread {
public: public:
Thread(Client* client, size_t idx) Thread(Client* client, size_t idx)
: done_(false), : client_(client),
client_(client),
idx_(idx), idx_(idx),
impl_(&Thread::ThreadFunc, this) {} impl_(&Thread::ThreadFunc, this) {}
~Thread() { ~Thread() {
{
std::lock_guard<std::mutex> g(mu_);
done_ = true;
}
impl_.join(); impl_.join();
} }
@ -280,11 +277,14 @@ class Client {
if (entry.used()) { if (entry.used()) {
histogram_.Add(entry.value()); histogram_.Add(entry.value());
} }
bool done = false;
if (!thread_still_ok) { if (!thread_still_ok) {
gpr_log(GPR_ERROR, "Finishing client thread due to RPC error"); gpr_log(GPR_ERROR, "Finishing client thread due to RPC error");
done_ = true; done = true;
} }
if (done_) { done = done || (gpr_atm_acq_load(&client_->thread_pool_done_) !=
static_cast<gpr_atm>(0));
if (done) {
client_->CompleteThread(); client_->CompleteThread();
return; return;
} }
@ -292,7 +292,6 @@ class Client {
} }
std::mutex mu_; std::mutex mu_;
bool done_;
Histogram histogram_; Histogram histogram_;
Client* client_; Client* client_;
const size_t idx_; const size_t idx_;
@ -305,6 +304,7 @@ class Client {
InterarrivalTimer interarrival_timer_; InterarrivalTimer interarrival_timer_;
std::vector<gpr_timespec> next_time_; std::vector<gpr_timespec> next_time_;
gpr_atm thread_pool_done_;
std::mutex thread_completion_mu_; std::mutex thread_completion_mu_;
size_t threads_remaining_; size_t threads_remaining_;
std::condition_variable threads_complete_; std::condition_variable threads_complete_;

Loading…
Cancel
Save