|
|
|
@ -196,9 +196,9 @@ class SynchronousStreamingClient : public SynchronousClient { |
|
|
|
|
for (size_t i = 0; i < num_threads_; i++) { |
|
|
|
|
cleanup_threads.emplace_back([this, i] { |
|
|
|
|
std::lock_guard<std::mutex> l(stream_mu_[i]); |
|
|
|
|
shutdown_[i].val = true; |
|
|
|
|
shutdown_[i].val = true; |
|
|
|
|
if (stream_[i]) { |
|
|
|
|
CleanStream(i); |
|
|
|
|
CleanStream(i); |
|
|
|
|
} |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
@ -206,6 +206,7 @@ class SynchronousStreamingClient : public SynchronousClient { |
|
|
|
|
th.join(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
void DestroyMultithreading() override final { |
|
|
|
|
CleanupAllStreams(); |
|
|
|
@ -219,9 +220,8 @@ class SynchronousStreamingPingPongClient final |
|
|
|
|
public: |
|
|
|
|
SynchronousStreamingPingPongClient(const ClientConfig& config) |
|
|
|
|
: SynchronousStreamingClient(config) {} |
|
|
|
|
~SynchronousStreamingPingPongClient() { |
|
|
|
|
CleanupAllStreams(); |
|
|
|
|
} |
|
|
|
|
~SynchronousStreamingPingPongClient() { CleanupAllStreams(); } |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
bool InitThreadFuncImpl(size_t thread_idx) override { |
|
|
|
|
auto* stub = channels_[thread_idx % channels_.size()].get_stub(); |
|
|
|
@ -280,9 +280,7 @@ class SynchronousStreamingFromClientClient final |
|
|
|
|
public: |
|
|
|
|
SynchronousStreamingFromClientClient(const ClientConfig& config) |
|
|
|
|
: SynchronousStreamingClient(config), last_issue_(num_threads_) {} |
|
|
|
|
~SynchronousStreamingFromClientClient() { |
|
|
|
|
CleanupAllStreams(); |
|
|
|
|
} |
|
|
|
|
~SynchronousStreamingFromClientClient() { CleanupAllStreams(); } |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
std::vector<double> last_issue_; |
|
|
|
@ -338,9 +336,7 @@ class SynchronousStreamingFromServerClient final |
|
|
|
|
public: |
|
|
|
|
SynchronousStreamingFromServerClient(const ClientConfig& config) |
|
|
|
|
: SynchronousStreamingClient(config), last_recv_(num_threads_) {} |
|
|
|
|
~SynchronousStreamingFromServerClient() { |
|
|
|
|
CleanupAllStreams(); |
|
|
|
|
} |
|
|
|
|
~SynchronousStreamingFromServerClient() { CleanupAllStreams(); } |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
std::vector<double> last_recv_; |
|
|
|
@ -391,9 +387,8 @@ class SynchronousStreamingBothWaysClient final |
|
|
|
|
public: |
|
|
|
|
SynchronousStreamingBothWaysClient(const ClientConfig& config) |
|
|
|
|
: SynchronousStreamingClient(config) {} |
|
|
|
|
~SynchronousStreamingBothWaysClient() { |
|
|
|
|
CleanupAllStreams(); |
|
|
|
|
} |
|
|
|
|
~SynchronousStreamingBothWaysClient() { CleanupAllStreams(); } |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
bool InitThreadFuncImpl(size_t thread_idx) override { |
|
|
|
|
auto* stub = channels_[thread_idx % channels_.size()].get_stub(); |
|
|
|
|