diff --git a/src/proto/grpc/testing/control.proto b/src/proto/grpc/testing/control.proto index 2ff2e4e8a28..57592662c44 100644 --- a/src/proto/grpc/testing/control.proto +++ b/src/proto/grpc/testing/control.proto @@ -108,6 +108,9 @@ message ClientConfig { // Number of messages on a stream before it gets finished/restarted int32 messages_per_stream = 18; + + // Use coalescing API when possible. + bool use_coalesce_api = 19; } message ClientStatus { ClientStats stats = 1; } diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 7cf9d3ea7e1..e3fba36a7a7 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -82,6 +82,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { prepare_req_(prepare_req) {} ~ClientRpcContextUnaryImpl() override {} void Start(CompletionQueue* cq, const ClientConfig& config) override { + GPR_ASSERT(!config.use_coalesce_api()); // not supported. StartInternal(cq); } bool RunNextState(bool ok, HistogramEntry* entry) override { @@ -349,10 +350,11 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext { next_state_(State::INVALID), callback_(on_done), next_issue_(next_issue), - prepare_req_(prepare_req) {} + prepare_req_(prepare_req), + coalesce_(false) {} ~ClientRpcContextStreamingPingPongImpl() override {} void Start(CompletionQueue* cq, const ClientConfig& config) override { - StartInternal(cq, config.messages_per_stream()); + StartInternal(cq, config.messages_per_stream(), config.use_coalesce_api()); } bool RunNextState(bool ok, HistogramEntry* entry) override { while (true) { @@ -375,7 +377,12 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext { } start_ = UsageTimer::Now(); next_state_ = State::WRITE_DONE; - stream_->Write(req_, ClientRpcContext::tag(this)); + if (coalesce_ && messages_issued_ == messages_per_stream_ - 1) { + stream_->WriteLast(req_, WriteOptions(), + ClientRpcContext::tag(this)); + } else { + stream_->Write(req_, ClientRpcContext::tag(this)); + } return true; case State::WRITE_DONE: if (!ok) { @@ -391,6 +398,11 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext { if ((messages_per_stream_ != 0) && (++messages_issued_ >= messages_per_stream_)) { next_state_ = State::WRITES_DONE_DONE; + if (coalesce_) { + // WritesDone should have been called on the last Write. + // loop around to call Finish. + break; + } stream_->WritesDone(ClientRpcContext::tag(this)); return true; } @@ -413,7 +425,7 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext { void StartNewClone(CompletionQueue* cq) override { auto* clone = new ClientRpcContextStreamingPingPongImpl( stub_, req_, next_issue_, prepare_req_, callback_); - clone->StartInternal(cq, messages_per_stream_); + clone->StartInternal(cq, messages_per_stream_, coalesce_); } void TryCancel() override { context_.TryCancel(); } @@ -449,14 +461,27 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext { // Allow a limit on number of messages in a stream int messages_per_stream_; int messages_issued_; + // Whether to use coalescing API. + bool coalesce_; - void StartInternal(CompletionQueue* cq, int messages_per_stream) { + void StartInternal(CompletionQueue* cq, int messages_per_stream, + bool coalesce) { cq_ = cq; messages_per_stream_ = messages_per_stream; messages_issued_ = 0; + coalesce_ = coalesce; + if (coalesce_) { + GPR_ASSERT(messages_per_stream_ != 0); + context_.set_initial_metadata_corked(true); + } stream_ = prepare_req_(stub_, &context_, cq); next_state_ = State::STREAM_IDLE; stream_->StartCall(ClientRpcContext::tag(this)); + if (coalesce_) { + // When the intial metadata is corked, the tag will not come back and we + // need to manually drive the state machine. + RunNextState(true, nullptr); + } } }; @@ -512,6 +537,7 @@ class ClientRpcContextStreamingFromClientImpl : public ClientRpcContext { prepare_req_(prepare_req) {} ~ClientRpcContextStreamingFromClientImpl() override {} void Start(CompletionQueue* cq, const ClientConfig& config) override { + GPR_ASSERT(!config.use_coalesce_api()); // not supported yet. StartInternal(cq); } bool RunNextState(bool ok, HistogramEntry* entry) override { @@ -641,6 +667,7 @@ class ClientRpcContextStreamingFromServerImpl : public ClientRpcContext { prepare_req_(prepare_req) {} ~ClientRpcContextStreamingFromServerImpl() override {} void Start(CompletionQueue* cq, const ClientConfig& config) override { + GPR_ASSERT(!config.use_coalesce_api()); // not supported StartInternal(cq); } bool RunNextState(bool ok, HistogramEntry* entry) override { @@ -753,6 +780,7 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext { prepare_req_(prepare_req) {} ~ClientRpcContextGenericStreamingImpl() override {} void Start(CompletionQueue* cq, const ClientConfig& config) override { + GPR_ASSERT(!config.use_coalesce_api()); // not supported yet. StartInternal(cq, config.messages_per_stream()); } bool RunNextState(bool ok, HistogramEntry* entry) override { diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index 82a3f0042d1..a2ddbeb508a 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -402,6 +402,7 @@ class SynchronousStreamingBothWaysClient final }; std::unique_ptr CreateSynchronousClient(const ClientConfig& config) { + GPR_ASSERT(!config.use_coalesce_api()); // not supported yet. switch (config.rpc_type()) { case UNARY: return std::unique_ptr(new SynchronousUnaryClient(config));