diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index 69ba8662336..82a3f0042d1 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -154,7 +154,12 @@ class SynchronousStreamingClient : public SynchronousClient { messages_issued_(num_threads_) { StartThreads(num_threads_); } - virtual ~SynchronousStreamingClient() {} + virtual ~SynchronousStreamingClient() { + CleanupAllStreams([this](size_t thread_idx) { + // Don't log any kind of error since we may have canceled this + stream_[thread_idx]->Finish().IgnoreError(); + }); + } protected: std::vector context_; @@ -187,18 +192,14 @@ class SynchronousStreamingClient : public SynchronousClient { new (&context_[thread_idx]) ClientContext(); } - virtual void CleanStream(size_t thread_idx) { - context_[thread_idx].TryCancel(); - } - - void CleanupAllStreams() { + void CleanupAllStreams(std::function cleaner) { std::vector cleanup_threads; for (size_t i = 0; i < num_threads_; i++) { - cleanup_threads.emplace_back([this, i] { + cleanup_threads.emplace_back([this, i, cleaner] { std::lock_guard l(stream_mu_[i]); shutdown_[i].val = true; if (stream_[i]) { - CleanStream(i); + cleaner(i); } }); } @@ -209,7 +210,8 @@ class SynchronousStreamingClient : public SynchronousClient { private: void DestroyMultithreading() override final { - CleanupAllStreams(); + CleanupAllStreams( + [this](size_t thread_idx) { context_[thread_idx].TryCancel(); }); EndThreads(); } }; @@ -220,7 +222,10 @@ class SynchronousStreamingPingPongClient final public: SynchronousStreamingPingPongClient(const ClientConfig& config) : SynchronousStreamingClient(config) {} - ~SynchronousStreamingPingPongClient() { CleanupAllStreams(); } + ~SynchronousStreamingPingPongClient() { + CleanupAllStreams( + [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); }); + } private: bool InitThreadFuncImpl(size_t thread_idx) override { @@ -267,12 +272,6 @@ class SynchronousStreamingPingPongClient final messages_issued_[thread_idx] = 0; return true; } - - void CleanStream(size_t thread_idx) override { - stream_[thread_idx]->WritesDone(); - // Don't log any kind of error since we may have canceled this - stream_[thread_idx]->Finish().IgnoreError(); - } }; class SynchronousStreamingFromClientClient final @@ -280,7 +279,10 @@ class SynchronousStreamingFromClientClient final public: SynchronousStreamingFromClientClient(const ClientConfig& config) : SynchronousStreamingClient(config), last_issue_(num_threads_) {} - ~SynchronousStreamingFromClientClient() { CleanupAllStreams(); } + ~SynchronousStreamingFromClientClient() { + CleanupAllStreams( + [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); }); + } private: std::vector last_issue_; @@ -323,12 +325,6 @@ class SynchronousStreamingFromClientClient final } return true; } - - void CleanStream(size_t thread_idx) override { - stream_[thread_idx]->WritesDone(); - // Don't log any kind of error since we may have canceled this - stream_[thread_idx]->Finish().IgnoreError(); - } }; class SynchronousStreamingFromServerClient final @@ -336,7 +332,7 @@ class SynchronousStreamingFromServerClient final public: SynchronousStreamingFromServerClient(const ClientConfig& config) : SynchronousStreamingClient(config), last_recv_(num_threads_) {} - ~SynchronousStreamingFromServerClient() { CleanupAllStreams(); } + ~SynchronousStreamingFromServerClient() {} private: std::vector last_recv_; @@ -374,11 +370,6 @@ class SynchronousStreamingFromServerClient final } return true; } - - void CleanStream(size_t thread_idx) override { - // Don't log any kind of error since we may have canceled this - stream_[thread_idx]->Finish().IgnoreError(); - } }; class SynchronousStreamingBothWaysClient final @@ -387,7 +378,10 @@ class SynchronousStreamingBothWaysClient final public: SynchronousStreamingBothWaysClient(const ClientConfig& config) : SynchronousStreamingClient(config) {} - ~SynchronousStreamingBothWaysClient() { CleanupAllStreams(); } + ~SynchronousStreamingBothWaysClient() { + CleanupAllStreams( + [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); }); + } private: bool InitThreadFuncImpl(size_t thread_idx) override { @@ -405,12 +399,6 @@ class SynchronousStreamingBothWaysClient final // TODO (vjpai): Do this return true; } - - void CleanStream(size_t thread_idx) override { - stream_[thread_idx]->WritesDone(); - // Don't log any kind of error since we may have canceled this - stream_[thread_idx]->Finish().IgnoreError(); - } }; std::unique_ptr CreateSynchronousClient(const ClientConfig& config) {