Merge pull request #18113 from vjpai/cli_streaming_qps

Fix termination condition of streaming callback QPS tests
pull/18098/head
Vijay Pai 6 years ago committed by GitHub
commit 837055509e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 15
      test/cpp/qps/client_callback.cc

@ -253,18 +253,20 @@ class CallbackStreamingPingPongReactor final
: client_(client), ctx_(std::move(ctx)), messages_issued_(0) {} : client_(client), ctx_(std::move(ctx)), messages_issued_(0) {}
void StartNewRpc() { void StartNewRpc() {
if (client_->ThreadCompleted()) return;
ctx_->stub_->experimental_async()->StreamingCall(&(ctx_->context_), this); ctx_->stub_->experimental_async()->StreamingCall(&(ctx_->context_), this);
write_time_ = UsageTimer::Now(); write_time_ = UsageTimer::Now();
StartWrite(client_->request()); StartWrite(client_->request());
writes_done_started_.clear();
StartCall(); StartCall();
} }
void OnWriteDone(bool ok) override { void OnWriteDone(bool ok) override {
if (!ok || client_->ThreadCompleted()) { if (!ok) {
if (!ok) gpr_log(GPR_ERROR, "Error writing RPC"); gpr_log(GPR_ERROR, "Error writing RPC");
}
if ((!ok || client_->ThreadCompleted()) &&
!writes_done_started_.test_and_set()) {
StartWritesDone(); StartWritesDone();
return;
} }
StartRead(&ctx_->response_); StartRead(&ctx_->response_);
} }
@ -278,7 +280,9 @@ class CallbackStreamingPingPongReactor final
if (!ok) { if (!ok) {
gpr_log(GPR_ERROR, "Error reading RPC"); gpr_log(GPR_ERROR, "Error reading RPC");
} }
if (!writes_done_started_.test_and_set()) {
StartWritesDone(); StartWritesDone();
}
return; return;
} }
write_time_ = UsageTimer::Now(); write_time_ = UsageTimer::Now();
@ -295,8 +299,6 @@ class CallbackStreamingPingPongReactor final
} }
void ScheduleRpc() { void ScheduleRpc() {
if (client_->ThreadCompleted()) return;
if (!client_->IsClosedLoop()) { if (!client_->IsClosedLoop()) {
gpr_timespec next_issue_time = client_->NextRPCIssueTime(); gpr_timespec next_issue_time = client_->NextRPCIssueTime();
// Start an alarm callback to run the internal callback after // Start an alarm callback to run the internal callback after
@ -312,6 +314,7 @@ class CallbackStreamingPingPongReactor final
CallbackStreamingPingPongClient* client_; CallbackStreamingPingPongClient* client_;
std::unique_ptr<CallbackClientRpcContext> ctx_; std::unique_ptr<CallbackClientRpcContext> ctx_;
std::atomic_flag writes_done_started_;
Client::Thread* thread_ptr_; // Needed to update histogram entries Client::Thread* thread_ptr_; // Needed to update histogram entries
double write_time_; // Track ping-pong round start time double write_time_; // Track ping-pong round start time
int messages_issued_; // Messages issued by this stream int messages_issued_; // Messages issued by this stream

Loading…
Cancel
Save