From a02c76dfb920da5039a48faad0b3daac0ead05d1 Mon Sep 17 00:00:00 2001 From: Na-Na Pang Date: Mon, 13 May 2019 13:29:06 -0700 Subject: [PATCH] Cancel predefine number of streaming --- .../callback_streaming_ping_pong.h | 2 -- .../microbenchmarks/callback_test_service.cc | 23 ++++--------------- .../microbenchmarks/callback_test_service.h | 1 - 3 files changed, 5 insertions(+), 21 deletions(-) diff --git a/test/cpp/microbenchmarks/callback_streaming_ping_pong.h b/test/cpp/microbenchmarks/callback_streaming_ping_pong.h index 0f4549df3ff..9fb86bd8299 100644 --- a/test/cpp/microbenchmarks/callback_streaming_ping_pong.h +++ b/test/cpp/microbenchmarks/callback_streaming_ping_pong.h @@ -83,8 +83,6 @@ class BidiClient void StartNewRpc() { cli_ctx_->~ClientContext(); new (cli_ctx_) ClientContext(); - cli_ctx_->AddMetadata(kServerFinishAfterNReads, - grpc::to_string(msgs_to_send_)); cli_ctx_->AddMetadata(kServerMessageSize, grpc::to_string(msgs_size_)); stub_->experimental_async()->BidiStream(cli_ctx_, this); MaybeWrite(); diff --git a/test/cpp/microbenchmarks/callback_test_service.cc b/test/cpp/microbenchmarks/callback_test_service.cc index 2473dc5f31d..321a5b39184 100644 --- a/test/cpp/microbenchmarks/callback_test_service.cc +++ b/test/cpp/microbenchmarks/callback_test_service.cc @@ -67,54 +67,41 @@ CallbackStreamingTestService::BidiStream() { Reactor() {} void OnStarted(ServerContext* context) override { ctx_ = context; - server_write_last_ = GetIntValueFromMetadata( - kServerFinishAfterNReads, context->client_metadata(), 0); message_size_ = GetIntValueFromMetadata(kServerMessageSize, context->client_metadata(), 0); StartRead(&request_); } void OnDone() override { GPR_ASSERT(finished_); - GPR_ASSERT(num_msgs_read_ == server_write_last_); delete this; } void OnCancel() override {} void OnReadDone(bool ok) override { if (!ok) { - gpr_log(GPR_ERROR, "Server read failed"); + // Stream is over + Finish(::grpc::Status::OK); + finished_ = true; return; } - num_msgs_read_++; if (message_size_ > 0) { response_.set_message(std::string(message_size_, 'a')); } else { response_.set_message(""); } - if (num_msgs_read_ < server_write_last_) { - StartWrite(&response_); - } else { - StartWriteLast(&response_, WriteOptions()); - } + StartWrite(&response_); } void OnWriteDone(bool ok) override { if (!ok) { gpr_log(GPR_ERROR, "Server write failed"); return; } - if (num_msgs_read_ < server_write_last_) { - StartRead(&request_); - } else { - Finish(::grpc::Status::OK); - finished_ = true; - } + StartRead(&request_); } private: ServerContext* ctx_; EchoRequest request_; EchoResponse response_; - int num_msgs_read_{0}; - int server_write_last_; int message_size_; bool finished_{false}; }; diff --git a/test/cpp/microbenchmarks/callback_test_service.h b/test/cpp/microbenchmarks/callback_test_service.h index dd7977a5913..97188595382 100644 --- a/test/cpp/microbenchmarks/callback_test_service.h +++ b/test/cpp/microbenchmarks/callback_test_service.h @@ -30,7 +30,6 @@ namespace grpc { namespace testing { -const char* const kServerFinishAfterNReads = "server_finish_after_n_reads"; const char* const kServerMessageSize = "server_message_size"; class CallbackStreamingTestService