diff --git a/test/cpp/qps/client_callback.cc b/test/cpp/qps/client_callback.cc index 0d637c07fef..815780e40ff 100644 --- a/test/cpp/qps/client_callback.cc +++ b/test/cpp/qps/client_callback.cc @@ -253,18 +253,20 @@ class CallbackStreamingPingPongReactor final : client_(client), ctx_(std::move(ctx)), messages_issued_(0) {} void StartNewRpc() { - if (client_->ThreadCompleted()) return; ctx_->stub_->experimental_async()->StreamingCall(&(ctx_->context_), this); write_time_ = UsageTimer::Now(); StartWrite(client_->request()); + writes_done_started_.clear(); StartCall(); } void OnWriteDone(bool ok) override { - if (!ok || client_->ThreadCompleted()) { - if (!ok) gpr_log(GPR_ERROR, "Error writing RPC"); + if (!ok) { + gpr_log(GPR_ERROR, "Error writing RPC"); + } + if ((!ok || client_->ThreadCompleted()) && + !writes_done_started_.test_and_set()) { StartWritesDone(); - return; } StartRead(&ctx_->response_); } @@ -278,7 +280,9 @@ class CallbackStreamingPingPongReactor final if (!ok) { gpr_log(GPR_ERROR, "Error reading RPC"); } - StartWritesDone(); + if (!writes_done_started_.test_and_set()) { + StartWritesDone(); + } return; } write_time_ = UsageTimer::Now(); @@ -295,8 +299,6 @@ class CallbackStreamingPingPongReactor final } void ScheduleRpc() { - if (client_->ThreadCompleted()) return; - if (!client_->IsClosedLoop()) { gpr_timespec next_issue_time = client_->NextRPCIssueTime(); // Start an alarm callback to run the internal callback after @@ -312,6 +314,7 @@ class CallbackStreamingPingPongReactor final CallbackStreamingPingPongClient* client_; std::unique_ptr ctx_; + std::atomic_flag writes_done_started_; Client::Thread* thread_ptr_; // Needed to update histogram entries double write_time_; // Track ping-pong round start time int messages_issued_; // Messages issued by this stream