Reuse reactor to send new RPC

Branch: pull/18881/head
Author: Na-Na Pang, 6 years ago
Parent: c96c21244c
Commit: 3fc702510f
Changed files:
  test/cpp/microbenchmarks/callback_streaming_ping_pong.h (11 lines changed)
  test/cpp/microbenchmarks/callback_test_service.h (50 lines changed)

test/cpp/microbenchmarks/callback_streaming_ping_pong.h

@@ -51,15 +51,8 @@ static void BM_CallbackBidiStreaming(benchmark::State& state) {
   }
   if (state.KeepRunning()) {
     GPR_TIMER_SCOPE("BenchmarkCycle", 0);
-    std::mutex mu;
-    std::condition_variable cv;
-    bool done = false;
-    BidiClient test{&state, stub_.get(), &request, &response, &mu, &cv, &done};
-    test.StartNewRpc();
-    std::unique_lock<std::mutex> l(mu);
-    while (!done) {
-      cv.wait(l);
-    }
+    BidiClient test{&state, stub_.get(), &request, &response};
+    test.Await();
   }
   fixture->Finish(state);
   fixture.reset();

test/cpp/microbenchmarks/callback_test_service.h

@@ -49,17 +49,11 @@ class BidiClient
     : public grpc::experimental::ClientBidiReactor<EchoRequest, EchoResponse> {
  public:
   BidiClient(benchmark::State* state, EchoTestService::Stub* stub,
-             EchoRequest* request, EchoResponse* response, std::mutex* mu,
-             std::condition_variable* cv, bool* done)
-      : state_{state},
-        stub_{stub},
-        request_{request},
-        response_{response},
-        mu_{mu},
-        cv_{cv},
-        done_(done) {
+             EchoRequest* request, EchoResponse* response)
+      : state_{state}, stub_{stub}, request_{request}, response_{response} {
     msgs_size_ = state->range(0);
     msgs_to_send_ = state->range(1);
+    StartNewRpc();
   }

   void OnReadDone(bool ok) override {
@@ -82,27 +76,33 @@ class BidiClient
   void OnDone(const Status& s) override {
     GPR_ASSERT(s.ok());
     if (state_->KeepRunning()) {
-      BidiClient* test =
-          new BidiClient(state_, stub_, request_, response_, mu_, cv_, done_);
-      test->StartNewRpc();
+      writes_complete_ = 0;
+      StartNewRpc();
     } else {
-      std::unique_lock<std::mutex> l(*mu_);
-      *done_ = true;
-      cv_->notify_one();
+      std::unique_lock<std::mutex> l(mu);
+      done = true;
+      cv.notify_one();
     }
-    delete cli_ctx_;
   }

   void StartNewRpc() {
-    cli_ctx_ = new ClientContext();
-    cli_ctx_->AddMetadata(kServerFinishAfterNReads,
-                          grpc::to_string(msgs_to_send_));
-    cli_ctx_->AddMetadata(kServerMessageSize, grpc::to_string(msgs_size_));
-    stub_->experimental_async()->BidiStream(cli_ctx_, this);
+    cli_ctx_.reset(new ClientContext);
+    cli_ctx_.get()->AddMetadata(kServerFinishAfterNReads,
+                                grpc::to_string(msgs_to_send_));
+    cli_ctx_.get()->AddMetadata(kServerMessageSize,
+                                grpc::to_string(msgs_size_));
+    stub_->experimental_async()->BidiStream(cli_ctx_.get(), this);
     MaybeWrite();
     StartCall();
   }

+  void Await() {
+    std::unique_lock<std::mutex> l(mu);
+    while (!done) {
+      cv.wait(l);
+    }
+  }
+
  private:
   void MaybeWrite() {
     if (writes_complete_ < msgs_to_send_) {
@@ -112,7 +112,7 @@ class BidiClient
     }
   }

-  ClientContext* cli_ctx_;
+  std::unique_ptr<ClientContext> cli_ctx_;
   benchmark::State* state_;
   EchoTestService::Stub* stub_;
   EchoRequest* request_;
@@ -120,9 +120,9 @@ class BidiClient
   int writes_complete_{0};
   int msgs_to_send_;
   int msgs_size_;
-  std::mutex* mu_;
-  std::condition_variable* cv_;
-  bool* done_;
+  std::mutex mu;
+  std::condition_variable cv;
+  bool done;
 };

 }  // namespace testing
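
The pattern this commit introduces can be read from the diff: a single BidiClient now owns its synchronization state, and when OnDone fires it resets its per-RPC counters, re-creates only the ClientContext, and rebinds itself to a new call rather than heap-allocating a fresh reactor per iteration. Below is a minimal, self-contained sketch of that reuse pattern, assuming a stub generated from gRPC's echo.proto; the class name ReusableBidiClient, the rpcs/writes_per_rpc parameters, and the read/write pacing are illustrative additions, not part of the commit.

// Sketch of the "reuse one reactor for many RPCs" pattern shown above.
// Assumes an EchoTestService stub generated from src/proto/grpc/testing/echo.proto;
// names other than the gRPC callback API itself are illustrative.
#include <condition_variable>
#include <memory>
#include <mutex>

#include <grpcpp/grpcpp.h>

#include "src/proto/grpc/testing/echo.grpc.pb.h"

class ReusableBidiClient
    : public grpc::experimental::ClientBidiReactor<grpc::testing::EchoRequest,
                                                   grpc::testing::EchoResponse> {
 public:
  ReusableBidiClient(grpc::testing::EchoTestService::Stub* stub, int rpcs,
                     int writes_per_rpc)
      : stub_(stub), rpcs_remaining_(rpcs), writes_per_rpc_(writes_per_rpc) {
    StartNewRpc();  // the constructor kicks off the first RPC, as in the diff
  }

  void OnWriteDone(bool ok) override {
    if (ok) MaybeWrite();
  }

  void OnReadDone(bool ok) override {
    if (ok) StartRead(&response_);
  }

  void OnDone(const grpc::Status& s) override {
    // Instead of allocating a fresh reactor, reset per-RPC state and go again.
    if (s.ok() && --rpcs_remaining_ > 0) {
      writes_complete_ = 0;
      StartNewRpc();
    } else {
      std::lock_guard<std::mutex> l(mu_);
      done_ = true;
      cv_.notify_one();
    }
  }

  void Await() {
    std::unique_lock<std::mutex> l(mu_);
    cv_.wait(l, [this] { return done_; });
  }

 private:
  void StartNewRpc() {
    // A ClientContext is single-use, so it is the only piece of state that
    // must be re-created for each RPC; the reactor object itself is reused.
    cli_ctx_.reset(new grpc::ClientContext);
    stub_->experimental_async()->BidiStream(cli_ctx_.get(), this);
    StartRead(&response_);
    MaybeWrite();
    StartCall();
  }

  void MaybeWrite() {
    if (writes_complete_ < writes_per_rpc_) {
      ++writes_complete_;
      StartWrite(&request_);
    } else {
      StartWritesDone();
    }
  }

  grpc::testing::EchoTestService::Stub* stub_;
  std::unique_ptr<grpc::ClientContext> cli_ctx_;
  grpc::testing::EchoRequest request_;
  grpc::testing::EchoResponse response_;
  int rpcs_remaining_;
  int writes_per_rpc_;
  int writes_complete_ = 0;
  std::mutex mu_;
  std::condition_variable cv_;
  bool done_ = false;
};

// Usage: construct once, then block until every RPC has finished.
//   ReusableBidiClient client(stub.get(), /*rpcs=*/100, /*writes_per_rpc=*/10);
//   client.Await();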
