Cancel predefine number of streaming

pull/18881/head
Na-Na Pang 6 years ago
parent e4d9b57900
commit a02c76dfb9
  1. 2
      test/cpp/microbenchmarks/callback_streaming_ping_pong.h
  2. 23
      test/cpp/microbenchmarks/callback_test_service.cc
  3. 1
      test/cpp/microbenchmarks/callback_test_service.h

@ -83,8 +83,6 @@ class BidiClient
void StartNewRpc() {
cli_ctx_->~ClientContext();
new (cli_ctx_) ClientContext();
cli_ctx_->AddMetadata(kServerFinishAfterNReads,
grpc::to_string(msgs_to_send_));
cli_ctx_->AddMetadata(kServerMessageSize, grpc::to_string(msgs_size_));
stub_->experimental_async()->BidiStream(cli_ctx_, this);
MaybeWrite();

@ -67,54 +67,41 @@ CallbackStreamingTestService::BidiStream() {
Reactor() {}
void OnStarted(ServerContext* context) override {
ctx_ = context;
server_write_last_ = GetIntValueFromMetadata(
kServerFinishAfterNReads, context->client_metadata(), 0);
message_size_ = GetIntValueFromMetadata(kServerMessageSize,
context->client_metadata(), 0);
StartRead(&request_);
}
void OnDone() override {
GPR_ASSERT(finished_);
GPR_ASSERT(num_msgs_read_ == server_write_last_);
delete this;
}
void OnCancel() override {}
void OnReadDone(bool ok) override {
if (!ok) {
gpr_log(GPR_ERROR, "Server read failed");
// Stream is over
Finish(::grpc::Status::OK);
finished_ = true;
return;
}
num_msgs_read_++;
if (message_size_ > 0) {
response_.set_message(std::string(message_size_, 'a'));
} else {
response_.set_message("");
}
if (num_msgs_read_ < server_write_last_) {
StartWrite(&response_);
} else {
StartWriteLast(&response_, WriteOptions());
}
StartWrite(&response_);
}
void OnWriteDone(bool ok) override {
if (!ok) {
gpr_log(GPR_ERROR, "Server write failed");
return;
}
if (num_msgs_read_ < server_write_last_) {
StartRead(&request_);
} else {
Finish(::grpc::Status::OK);
finished_ = true;
}
StartRead(&request_);
}
private:
ServerContext* ctx_;
EchoRequest request_;
EchoResponse response_;
int num_msgs_read_{0};
int server_write_last_;
int message_size_;
bool finished_{false};
};

@ -30,7 +30,6 @@
namespace grpc {
namespace testing {
const char* const kServerFinishAfterNReads = "server_finish_after_n_reads";
const char* const kServerMessageSize = "server_message_size";
class CallbackStreamingTestService

Loading…
Cancel
Save