|
|
|
@ -74,9 +74,6 @@ class SynchronousClient |
|
|
|
|
HistogramEntry entry; |
|
|
|
|
const bool thread_still_ok = ThreadFuncImpl(&entry, thread_idx); |
|
|
|
|
t->UpdateHistogram(&entry); |
|
|
|
|
if (!thread_still_ok) { |
|
|
|
|
gpr_log(GPR_ERROR, "Finishing client thread due to RPC error"); |
|
|
|
|
} |
|
|
|
|
if (!thread_still_ok || ThreadCompleted()) { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
@ -170,6 +167,7 @@ class SynchronousStreamingClient : public SynchronousClient { |
|
|
|
|
std::vector<std::unique_ptr<StreamType>> stream_; |
|
|
|
|
// stream_mu_ is only needed when changing an element of stream_ or context_
|
|
|
|
|
std::vector<std::mutex> stream_mu_; |
|
|
|
|
// use struct Bool rather than bool because vector<bool> is not concurrent
|
|
|
|
|
struct Bool { |
|
|
|
|
bool val; |
|
|
|
|
Bool() : val(false) {} |
|
|
|
@ -183,8 +181,11 @@ class SynchronousStreamingClient : public SynchronousClient { |
|
|
|
|
// don't set the value since the stream is failed and shouldn't be timed
|
|
|
|
|
entry->set_status(s.error_code()); |
|
|
|
|
if (!s.ok()) { |
|
|
|
|
gpr_log(GPR_ERROR, "Stream %" PRIuPTR " received an error %s", thread_idx, |
|
|
|
|
s.error_message().c_str()); |
|
|
|
|
std::lock_guard<std::mutex> l(stream_mu_[thread_idx]); |
|
|
|
|
if (!shutdown_[thread_idx].val) { |
|
|
|
|
gpr_log(GPR_ERROR, "Stream %" PRIuPTR " received an error %s", |
|
|
|
|
thread_idx, s.error_message().c_str()); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// Lock the stream_mu_ now because the client context could change
|
|
|
|
|
std::lock_guard<std::mutex> l(stream_mu_[thread_idx]); |
|
|
|
|