Resolve pernicious race between destructor and thread functions by insisting that the destructor is invoked after the class has gone back to being a harmless single-threaded thing.

pull/7285/head
Vijay Pai 9 years ago
parent ad7c527618
commit 40317fd720
  1. test/cpp/qps/client.h (25 changed lines)
  2. test/cpp/qps/client_async.cc (54 changed lines)
  3. test/cpp/qps/client_sync.cc (17 changed lines)
  4. test/cpp/qps/qps_worker.cc (3 changed lines)
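
The race described in the commit message above is the standard C++ teardown hazard: once a derived class's destructor has finished, virtual dispatch on the object resolves to the base class, so a worker thread that is still inside (or about to call) the pure virtual ThreadFunc can trip a pure-virtual call and abort. A minimal, single-threaded illustration of the dispatch rule (plain C++, not gRPC code):

    #include <cstdio>

    struct Base {
      virtual ~Base() { hook(); }      // during ~Base the dynamic type is Base
      virtual void hook() { std::puts("Base::hook"); }
    };

    struct Derived : Base {
      ~Derived() override { hook(); }  // during ~Derived it is still Derived
      void hook() override { std::puts("Derived::hook"); }
    };

    int main() {
      Derived d;  // destruction prints "Derived::hook", then "Base::hook"
    }

If hook() were pure virtual and another thread called it after ~Derived had completed, the call would be undefined behavior (typically a "pure virtual method called" abort). That is the window this commit closes: all worker threads are drained before any destructor starts running.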

@@ -162,10 +162,20 @@ class Client {
return stats;
}
// Must call AwaitThreadsCompletion before destructor to avoid a race
// between destructor and invocation of virtual ThreadFunc
void AwaitThreadsCompletion() {
DestroyMultithreading();
std::unique_lock<std::mutex> g(thread_completion_mu_);
while (threads_remaining_ != 0) {
threads_complete_.wait(g);
}
}
protected:
bool closed_loop_;
void StartThreads(size_t num_threads) {
threads_remaining_ = num_threads;
for (size_t i = 0; i < num_threads; i++) {
threads_.emplace_back(new Thread(this, i));
}
@@ -173,6 +183,7 @@ class Client {
void EndThreads() { threads_.clear(); }
virtual void DestroyMultithreading() = 0;
virtual bool ThreadFunc(HistogramEntry* histogram, size_t thread_idx) = 0;
void SetupLoadTest(const ClientConfig& config, size_t num_threads) {
@@ -270,6 +281,7 @@ class Client {
done_ = true;
}
if (done_) {
client_->CompleteThread();
return;
}
}
@@ -277,7 +289,6 @@ class Client {
std::mutex mu_;
bool done_;
Histogram* new_stats_;
Histogram histogram_;
Client* client_;
const size_t idx_;
@@ -289,6 +300,18 @@ class Client {
InterarrivalTimer interarrival_timer_;
std::vector<gpr_timespec> next_time_;
std::mutex thread_completion_mu_;
size_t threads_remaining_;
std::condition_variable threads_complete_;
void CompleteThread() {
std::lock_guard<std::mutex> g(thread_completion_mu_);
threads_remaining_--;
if (threads_remaining_ == 0) {
threads_complete_.notify_all();
}
}
};
template <class StubType, class RequestType>
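
The client.h changes above amount to a small countdown latch: StartThreads() records the number of workers, each worker calls CompleteThread() exactly once as it exits, and AwaitThreadsCompletion() first calls the virtual DestroyMultithreading() (so each subclass can stop its loops while the object is still fully alive) and then blocks until the count reaches zero. A standalone sketch of the same pattern, with illustrative names rather than the actual gRPC classes:

    #include <condition_variable>
    #include <cstddef>
    #include <mutex>
    #include <thread>
    #include <vector>

    class ThreadGroup {
     public:
      void Start(std::size_t n) {
        remaining_ = n;
        for (std::size_t i = 0; i < n; i++) {
          threads_.emplace_back([this] {
            // ... per-thread work loop would go here ...
            Complete();  // each worker signals exactly once on exit
          });
        }
      }
      // Block until every worker has signalled, then join; after this
      // returns the object is single-threaded and safe to destroy.
      void AwaitCompletion() {
        std::unique_lock<std::mutex> g(mu_);
        while (remaining_ != 0) {
          done_.wait(g);
        }
        g.unlock();
        for (auto& t : threads_) t.join();
        threads_.clear();
      }

     private:
      void Complete() {
        std::lock_guard<std::mutex> g(mu_);
        if (--remaining_ == 0) {
          done_.notify_all();
        }
      }
      std::mutex mu_;
      std::condition_variable done_;
      std::size_t remaining_ = 0;
      std::vector<std::thread> threads_;
    };

    int main() {
      ThreadGroup g;
      g.Start(4);
      g.AwaitCompletion();  // returns once all four workers have exited
    }

In the diff itself the joins happen inside EndThreads(), which each subclass invokes from DestroyMultithreading(); the counter and condition variable then let AwaitThreadsCompletion() confirm that every worker has passed through CompleteThread() before any destructor runs.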

@@ -190,14 +190,6 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
}
}
virtual ~AsyncClient() {
for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) {
std::lock_guard<std::mutex> lock((*ss)->mutex);
(*ss)->shutdown = true;
}
for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
(*cq)->Shutdown();
}
this->EndThreads(); // Need "this->" for resolution
for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
void* got_tag;
bool ok;
@@ -206,6 +198,34 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
}
}
}
protected:
const int num_async_threads_;
private:
struct PerThreadShutdownState {
mutable std::mutex mutex;
bool shutdown;
PerThreadShutdownState() : shutdown(false) {}
};
int NumThreads(const ClientConfig& config) {
int num_threads = config.async_client_threads();
if (num_threads <= 0) { // Use dynamic sizing
num_threads = cores_;
gpr_log(GPR_INFO, "Sizing async client to %d threads", num_threads);
}
return num_threads;
}
void DestroyMultithreading() GRPC_OVERRIDE GRPC_FINAL {
for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) {
std::lock_guard<std::mutex> lock((*ss)->mutex);
(*ss)->shutdown = true;
}
for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
(*cq)->Shutdown();
}
this->EndThreads(); // this needed for resolution
}
bool ThreadFunc(HistogramEntry* entry,
size_t thread_idx) GRPC_OVERRIDE GRPC_FINAL {
@@ -234,24 +254,6 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
}
}
protected:
const int num_async_threads_;
private:
struct PerThreadShutdownState {
mutable std::mutex mutex;
bool shutdown;
PerThreadShutdownState() : shutdown(false) {}
};
int NumThreads(const ClientConfig& config) {
int num_threads = config.async_client_threads();
if (num_threads <= 0) { // Use dynamic sizing
num_threads = cores_;
gpr_log(GPR_INFO, "Sizing async client to %d threads", num_threads);
}
return num_threads;
}
std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
std::vector<std::function<gpr_timespec()>> next_issuers_;
std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_;
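
For the async client the same teardown is split in two: the new DestroyMultithreading() above publishes the per-thread shutdown flags under their mutexes, calls Shutdown() on every completion queue so blocked Next() calls return, and then joins the workers via EndThreads(); the destructor is left with nothing to do but drain whatever is still in the queues. A rough standalone analogue of that ordering using plain std threading (the condition variable stands in for the completion queue; names are illustrative, not the gRPC API):

    #include <condition_variable>
    #include <memory>
    #include <mutex>
    #include <thread>
    #include <vector>

    struct Worker {
      std::mutex mu;
      std::condition_variable cv;
      bool shutdown = false;  // plays the role of PerThreadShutdownState::shutdown
      std::thread thread;
    };

    int main() {
      std::vector<std::unique_ptr<Worker>> workers;
      for (int i = 0; i < 4; i++) {
        workers.emplace_back(new Worker);
        Worker* w = workers.back().get();
        w->thread = std::thread([w] {
          std::unique_lock<std::mutex> g(w->mu);
          while (!w->shutdown) {  // real code: blocked in cq->Next()
            w->cv.wait(g);
          }
        });
      }

      // DestroyMultithreading()-style ordering: flag, wake, then join.
      for (auto& w : workers) {
        std::lock_guard<std::mutex> g(w->mu);
        w->shutdown = true;                        // like (*ss)->shutdown = true
      }
      for (auto& w : workers) w->cv.notify_all();  // like (*cq)->Shutdown()
      for (auto& w : workers) w->thread.join();    // like EndThreads()
    }

The important property is the order: nothing is woken or joined until every shutdown flag has been published, and no destructor runs until every thread has been joined.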

@@ -87,6 +87,8 @@ class SynchronousClient
size_t num_threads_;
std::vector<SimpleResponse> responses_;
private:
void DestroyMultithreading() GRPC_OVERRIDE GRPC_FINAL { EndThreads(); }
};
class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient {
@@ -95,7 +97,7 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient {
: SynchronousClient(config) {
StartThreads(num_threads_);
}
~SynchronousUnaryClient() { EndThreads(); }
~SynchronousUnaryClient() {}
bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) GRPC_OVERRIDE {
WaitToIssue(thread_idx);
@@ -124,17 +126,16 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient {
StartThreads(num_threads_);
}
~SynchronousStreamingClient() {
EndThreads();
for (size_t i = 0; i < num_threads_; i++) {
auto stream = &stream_[i];
if (*stream) {
(*stream)->WritesDone();
Status s = (*stream)->Finish();
EXPECT_TRUE(s.ok());
if (!s.ok()) {
gpr_log(GPR_ERROR, "Stream %zu received an error %s", i,
s.error_message().c_str());
}
Status s = (*stream)->Finish();
EXPECT_TRUE(s.ok());
if (!s.ok()) {
gpr_log(GPR_ERROR, "Stream %zu received an error %s", i,
s.error_message().c_str());
}
}
}
delete[] stream_;

@@ -227,6 +227,9 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
gpr_log(GPR_INFO, "RunClientBody: Mark response given");
}
gpr_log(GPR_INFO, "RunClientBody: Awaiting Threads Completion");
client->AwaitThreadsCompletion();
gpr_log(GPR_INFO, "RunClientBody: Returning");
return Status::OK;
}
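
On the worker side the only change is ordering: RunClientBody now calls AwaitThreadsCompletion() (with log lines around it) before it returns and the Client can be destroyed. In caller terms the contract added by this commit looks roughly like this (a sketch; the factory name and unique_ptr ownership are assumptions, not the actual qps_worker code):

    // Hypothetical caller of the qps Client API after this commit.
    std::unique_ptr<Client> client = CreateQpsClient(config);  // assumed factory
    // ... run the benchmark, serve Mark requests ...
    client->AwaitThreadsCompletion();  // joins every worker thread first
    client.reset();                    // destructor now runs single-threaded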
