|
|
@ -153,16 +153,22 @@ class SynchronousStreamingClient final : public SynchronousClient { |
|
|
|
StartThreads(num_threads_); |
|
|
|
StartThreads(num_threads_); |
|
|
|
} |
|
|
|
} |
|
|
|
~SynchronousStreamingClient() { |
|
|
|
~SynchronousStreamingClient() { |
|
|
|
|
|
|
|
std::vector<std::thread> cleanup_threads; |
|
|
|
for (size_t i = 0; i < num_threads_; i++) { |
|
|
|
for (size_t i = 0; i < num_threads_; i++) { |
|
|
|
auto stream = &stream_[i]; |
|
|
|
cleanup_threads.emplace_back([this, i]() { |
|
|
|
if (*stream) { |
|
|
|
auto stream = &stream_[i]; |
|
|
|
(*stream)->WritesDone(); |
|
|
|
if (*stream) { |
|
|
|
Status s = (*stream)->Finish(); |
|
|
|
(*stream)->WritesDone(); |
|
|
|
if (!s.ok()) { |
|
|
|
Status s = (*stream)->Finish(); |
|
|
|
gpr_log(GPR_ERROR, "Stream %" PRIuPTR " received an error %s", i, |
|
|
|
if (!s.ok()) { |
|
|
|
s.error_message().c_str()); |
|
|
|
gpr_log(GPR_ERROR, "Stream %" PRIuPTR " received an error %s", i, |
|
|
|
|
|
|
|
s.error_message().c_str()); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
}); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
for (size_t i=0; i<num_threads_; i++) { |
|
|
|
|
|
|
|
cleanup_threads[i].join(); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
@ -179,6 +185,8 @@ class SynchronousStreamingClient final : public SynchronousClient { |
|
|
|
if ((messages_per_stream_ != 0) && |
|
|
|
if ((messages_per_stream_ != 0) && |
|
|
|
(++messages_issued_[thread_idx] < messages_per_stream_)) { |
|
|
|
(++messages_issued_[thread_idx] < messages_per_stream_)) { |
|
|
|
return true; |
|
|
|
return true; |
|
|
|
|
|
|
|
} else if (messages_per_stream_ == 0) { |
|
|
|
|
|
|
|
return true; |
|
|
|
} else { |
|
|
|
} else { |
|
|
|
// Fall through to the below resetting code after finish
|
|
|
|
// Fall through to the below resetting code after finish
|
|
|
|
} |
|
|
|
} |
|
|
|