|
|
|
@ -221,11 +221,11 @@ class CallbackStreamingClient : public CallbackClient { |
|
|
|
|
} |
|
|
|
|
~CallbackStreamingClient() {} |
|
|
|
|
|
|
|
|
|
void AddHistogramEntry(double start_, bool ok, Thread* thread_ptr) { |
|
|
|
|
void AddHistogramEntry(double start, bool ok, Thread* thread_ptr) { |
|
|
|
|
// Update Histogram with data from the callback run
|
|
|
|
|
HistogramEntry entry; |
|
|
|
|
if (ok) { |
|
|
|
|
entry.set_value((UsageTimer::Now() - start_) * 1e9); |
|
|
|
|
entry.set_value((UsageTimer::Now() - start) * 1e9); |
|
|
|
|
} |
|
|
|
|
thread_ptr->UpdateHistogram(&entry); |
|
|
|
|
} |
|
|
|
@ -254,8 +254,8 @@ class CallbackStreamingPingPongReactor final |
|
|
|
|
|
|
|
|
|
void StartNewRpc() { |
|
|
|
|
if (client_->ThreadCompleted()) return; |
|
|
|
|
start_ = UsageTimer::Now(); |
|
|
|
|
ctx_->stub_->experimental_async()->StreamingCall(&(ctx_->context_), this); |
|
|
|
|
write_time_ = UsageTimer::Now(); |
|
|
|
|
StartWrite(client_->request()); |
|
|
|
|
StartCall(); |
|
|
|
|
} |
|
|
|
@ -270,7 +270,7 @@ class CallbackStreamingPingPongReactor final |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void OnReadDone(bool ok) override { |
|
|
|
|
client_->AddHistogramEntry(start_, ok, thread_ptr_); |
|
|
|
|
client_->AddHistogramEntry(write_time_, ok, thread_ptr_); |
|
|
|
|
|
|
|
|
|
if (client_->ThreadCompleted() || !ok || |
|
|
|
|
(client_->messages_per_stream() != 0 && |
|
|
|
@ -281,6 +281,7 @@ class CallbackStreamingPingPongReactor final |
|
|
|
|
StartWritesDone(); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
write_time_ = UsageTimer::Now(); |
|
|
|
|
StartWrite(client_->request()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -312,7 +313,7 @@ class CallbackStreamingPingPongReactor final |
|
|
|
|
CallbackStreamingPingPongClient* client_; |
|
|
|
|
std::unique_ptr<CallbackClientRpcContext> ctx_; |
|
|
|
|
Client::Thread* thread_ptr_; // Needed to update histogram entries
|
|
|
|
|
double start_; // Track message start time
|
|
|
|
|
double write_time_; // Track ping-pong round start time
|
|
|
|
|
int messages_issued_; // Messages issued by this stream
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|