|
|
|
@ -79,10 +79,29 @@ class SynchronousClient |
|
|
|
|
virtual ~SynchronousClient(){}; |
|
|
|
|
|
|
|
|
|
protected: |
|
|
|
|
void WaitToIssue(int thread_idx) { |
|
|
|
|
// WaitToIssue returns false if we realize that we need to break out
|
|
|
|
|
bool WaitToIssue(int thread_idx) { |
|
|
|
|
if (!closed_loop_) { |
|
|
|
|
gpr_sleep_until(NextIssueTime(thread_idx)); |
|
|
|
|
const gpr_timespec next_issue_time = NextIssueTime(thread_idx); |
|
|
|
|
// Avoid sleeping for too long continuously because we might
|
|
|
|
|
// need to terminate before then. This is an issue since
|
|
|
|
|
// exponential distribution can occasionally produce bad outliers
|
|
|
|
|
while (true) { |
|
|
|
|
const gpr_timespec one_sec_delay = |
|
|
|
|
gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), |
|
|
|
|
gpr_time_from_seconds(1, GPR_TIMESPAN)); |
|
|
|
|
if (gpr_time_cmp(next_issue_time, one_sec_delay) <= 0) { |
|
|
|
|
gpr_sleep_until(next_issue_time); |
|
|
|
|
return true; |
|
|
|
|
} else { |
|
|
|
|
gpr_sleep_until(one_sec_delay); |
|
|
|
|
if (gpr_atm_acq_load(&thread_pool_done_) != static_cast<gpr_atm>(0)) { |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
size_t num_threads_; |
|
|
|
@ -101,7 +120,9 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient { |
|
|
|
|
~SynchronousUnaryClient() {} |
|
|
|
|
|
|
|
|
|
bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) GRPC_OVERRIDE { |
|
|
|
|
WaitToIssue(thread_idx); |
|
|
|
|
if (!WaitToIssue(thread_idx)) { |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
auto* stub = channels_[thread_idx % channels_.size()].get_stub(); |
|
|
|
|
double start = UsageTimer::Now(); |
|
|
|
|
GPR_TIMER_SCOPE("SynchronousUnaryClient::ThreadFunc", 0); |
|
|
|
@ -144,7 +165,9 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) GRPC_OVERRIDE { |
|
|
|
|
WaitToIssue(thread_idx); |
|
|
|
|
if (!WaitToIssue(thread_idx)) { |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
GPR_TIMER_SCOPE("SynchronousStreamingClient::ThreadFunc", 0); |
|
|
|
|
double start = UsageTimer::Now(); |
|
|
|
|
if (stream_[thread_idx]->Write(request_) && |
|
|
|
|