Add assertion

pull/18881/head
Na-Na Pang 6 years ago
parent 9b50fbcaac
commit 1ea651aee3
Changed files (201 changed lines in total):

  test/cpp/microbenchmarks/BUILD                            (1 line)
  test/cpp/microbenchmarks/bm_cq.cc                         (20 lines)
  test/cpp/microbenchmarks/callback_streaming_ping_pong.h   (95 lines)
  test/cpp/microbenchmarks/callback_test_service.cc         (3 lines)
  test/cpp/microbenchmarks/callback_test_service.h          (81 lines)
  test/cpp/microbenchmarks/callback_unary_ping_pong.h       (1 line)

@@ -230,7 +230,6 @@ grpc_cc_library(
     external_deps = [
         "benchmark",
     ],
-    tags = ["no_windows"],
     deps = [
         ":helpers",
         "//src/proto/grpc/testing:echo_proto",

@@ -144,21 +144,29 @@ static void BM_EmptyCore(benchmark::State& state) {
 }
 BENCHMARK(BM_EmptyCore);
 
-// helper for tests to shutdown correctly and tersely
+// Helper for tests to shutdown correctly and tersely
 static void shutdown_and_destroy(grpc_completion_queue* cc) {
   grpc_completion_queue_shutdown(cc);
   grpc_completion_queue_destroy(cc);
 }
 
+// Tag completion queue iterate times
 class TagCallback : public grpc_experimental_completion_queue_functor {
  public:
-  TagCallback() { functor_run = &TagCallback::Run; }
+  TagCallback(int* iter) : iter_(iter) {
+    functor_run = &TagCallback::Run;
+  }
   ~TagCallback() {}
   static void Run(grpc_experimental_completion_queue_functor* cb, int ok) {
     GPR_ASSERT(static_cast<bool>(ok));
+    *static_cast<TagCallback*>(cb)->iter_ += 1;
   };
+
+ private:
+  int* iter_;
 };
 
+// Check if completion queue is shut down
 class ShutdownCallback : public grpc_experimental_completion_queue_functor {
  public:
   ShutdownCallback(bool* done) : done_(done) {
@@ -175,7 +183,8 @@ class ShutdownCallback : public grpc_experimental_completion_queue_functor {
 static void BM_Callback_CQ_Pass1Core(benchmark::State& state) {
   TrackCounters track_counters;
-  TagCallback tag;
+  int iteration = 0;
+  TagCallback tag_cb(&iteration);
   bool got_shutdown = false;
   ShutdownCallback shutdown_cb(&got_shutdown);
   grpc_completion_queue* cc =
@@ -184,12 +193,13 @@ static void BM_Callback_CQ_Pass1Core(benchmark::State& state) {
     grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
     grpc_core::ExecCtx exec_ctx;
     grpc_cq_completion completion;
-    GPR_ASSERT(grpc_cq_begin_op(cc, &tag));
-    grpc_cq_end_op(cc, &tag, GRPC_ERROR_NONE, DoneWithCompletionOnStack,
+    GPR_ASSERT(grpc_cq_begin_op(cc, &tag_cb));
+    grpc_cq_end_op(cc, &tag_cb, GRPC_ERROR_NONE, DoneWithCompletionOnStack,
                    nullptr, &completion);
   }
   shutdown_and_destroy(cc);
   GPR_ASSERT(got_shutdown);
+  GPR_ASSERT(iteration == static_cast<int>(state.iterations()));
   track_counters.Finish(state);
 }
 BENCHMARK(BM_Callback_CQ_Pass1Core);
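The bm_cq.cc changes lean on the experimental callback completion-queue contract: a GRPC_CQ_CALLBACK queue never returns events, it invokes functor_run(functor, ok) on the tag itself, so a tag subclasses grpc_experimental_completion_queue_functor and registers a static trampoline that downcasts back to the subclass. A minimal sketch of that pattern (CountingTag is a hypothetical name, not part of this commit; headers as already included by bm_cq.cc):

// Hypothetical sketch of the functor-tag pattern used by TagCallback above.
class CountingTag : public grpc_experimental_completion_queue_functor {
 public:
  explicit CountingTag(int* count) : count_(count) {
    functor_run = &CountingTag::Run;  // the queue calls this, not Next()
  }

 private:
  static void Run(grpc_experimental_completion_queue_functor* cb, int ok) {
    GPR_ASSERT(static_cast<bool>(ok));         // op is expected to succeed
    ++*static_cast<CountingTag*>(cb)->count_;  // recover `this`, then count
  }
  int* count_;
};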

@@ -34,16 +34,105 @@ namespace testing {
  * BENCHMARKING KERNELS
  */
 
+class BidiClient
+    : public grpc::experimental::ClientBidiReactor<EchoRequest, EchoResponse> {
+ public:
+  BidiClient(benchmark::State* state, EchoTestService::Stub* stub,
+             ClientContext* cli_ctx, EchoRequest* request,
+             EchoResponse* response)
+      : state_{state},
+        stub_{stub},
+        cli_ctx_{cli_ctx},
+        request_{request},
+        response_{response} {
+    msgs_size_ = state->range(0);
+    msgs_to_send_ = state->range(1);
+    StartNewRpc();
+  }
+
+  void OnReadDone(bool ok) override {
+    if (!ok) {
+      gpr_log(GPR_ERROR, "Client read failed");
+      return;
+    }
+    if (writes_complete_ < msgs_to_send_) {
+      MaybeWrite();
+    }
+  }
+
+  void OnWriteDone(bool ok) override {
+    if (!ok) {
+      gpr_log(GPR_ERROR, "Client write failed");
+      return;
+    }
+    writes_complete_++;
+    StartRead(response_);
+  }
+
+  void OnDone(const Status& s) override {
+    GPR_ASSERT(s.ok());
+    GPR_ASSERT(writes_complete_ == msgs_to_send_);
+    if (state_->KeepRunning()) {
+      writes_complete_ = 0;
+      StartNewRpc();
+    } else {
+      std::unique_lock<std::mutex> l(mu);
+      done = true;
+      cv.notify_one();
+    }
+  }
+
+  void StartNewRpc() {
+    cli_ctx_->~ClientContext();
+    new (cli_ctx_) ClientContext();
+    cli_ctx_->AddMetadata(kServerFinishAfterNReads,
+                          grpc::to_string(msgs_to_send_));
+    cli_ctx_->AddMetadata(kServerMessageSize, grpc::to_string(msgs_size_));
+    stub_->experimental_async()->BidiStream(cli_ctx_, this);
+    MaybeWrite();
+    StartCall();
+  }
+
+  void Await() {
+    std::unique_lock<std::mutex> l(mu);
+    while (!done) {
+      cv.wait(l);
+    }
+  }
+
+ private:
+  void MaybeWrite() {
+    if (writes_complete_ < msgs_to_send_) {
+      StartWrite(request_);
+    } else {
+      StartWritesDone();
+    }
+  }
+
+  benchmark::State* state_;
+  EchoTestService::Stub* stub_;
+  ClientContext* cli_ctx_;
+  EchoRequest* request_;
+  EchoResponse* response_;
+  int writes_complete_{0};
+  int msgs_to_send_;
+  int msgs_size_;
+  std::mutex mu;
+  std::condition_variable cv;
+  bool done = false;
+};
+
 template <class Fixture, class ClientContextMutator, class ServerContextMutator>
 static void BM_CallbackBidiStreaming(benchmark::State& state) {
-  const int message_size = state.range(0);
-  const int max_ping_pongs = state.range(1);
+  int message_size = state.range(0);
+  int max_ping_pongs = state.range(1);
   CallbackStreamingTestService service;
   std::unique_ptr<Fixture> fixture(new Fixture(&service));
   std::unique_ptr<EchoTestService::Stub> stub_(
       EchoTestService::NewStub(fixture->channel()));
   EchoRequest request;
   EchoResponse response;
+  ClientContext cli_ctx;
   if (message_size > 0) {
     request.set_message(std::string(message_size, 'a'));
   } else {
@@ -51,7 +140,7 @@ static void BM_CallbackBidiStreaming(benchmark::State& state) {
   }
   if (state.KeepRunning()) {
     GPR_TIMER_SCOPE("BenchmarkCycle", 0);
-    BidiClient test{&state, stub_.get(), &request, &response};
+    BidiClient test{&state, stub_.get(), &cli_ctx, &request, &response};
     test.Await();
   }
   fixture->Finish(state);
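StartNewRpc() now recycles the caller-owned ClientContext with an explicit destructor call followed by placement new, rather than resetting a unique_ptr as the old header did; a ClientContext cannot be reused across RPCs, so a fresh object is required each round, and constructing it in place keeps the hot benchmark loop free of heap allocation. A minimal sketch of the idiom with a hypothetical helper (not part of the commit):

#include <new>  // placement new

#include <grpcpp/client_context.h>

// Hypothetical helper: end the old context's lifetime, then construct a
// fresh one in the same storage. The caller must own the storage and must
// not use the object between the two steps.
void ResetClientContextInPlace(grpc::ClientContext* ctx) {
  ctx->~ClientContext();
  new (ctx) grpc::ClientContext();
}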

@@ -75,11 +75,13 @@ CallbackStreamingTestService::BidiStream() {
     }
     void OnDone() override {
       GPR_ASSERT(finished_);
+      GPR_ASSERT(num_msgs_read_ == server_write_last_);
       delete this;
     }
     void OnCancel() override {}
     void OnReadDone(bool ok) override {
       if (!ok) {
+        gpr_log(GPR_ERROR, "Server read failed");
         return;
       }
       num_msgs_read_++;
@@ -96,6 +98,7 @@ CallbackStreamingTestService::BidiStream() {
     }
     void OnWriteDone(bool ok) override {
       if (!ok) {
+        gpr_log(GPR_ERROR, "Server write failed");
        return;
      }
      if (num_msgs_read_ < server_write_last_) {
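These server-side hunks add an assertion that the reactor read exactly the number of messages the client promised, plus error logs on failed reads and writes. For orientation, a rough sketch of the read-one/write-one loop such a ServerBidiReactor implements (EchoReactor is hypothetical, not the reactor in this PR; whether the first read is armed in the constructor or in an OnStarted hook varied across experimental-API revisions of this period):

// Hypothetical minimal echo reactor illustrating the ping-pong loop.
class EchoReactor
    : public grpc::experimental::ServerBidiReactor<EchoRequest, EchoResponse> {
 public:
  EchoReactor() { StartRead(&request_); }  // arm the first read (see caveat)
  void OnReadDone(bool ok) override {
    if (!ok) {  // client half-closed or the stream broke
      Finish(grpc::Status::OK);
      return;
    }
    response_.set_message(request_.message());
    StartWrite(&response_);  // one write per completed read
  }
  void OnWriteDone(bool ok) override {
    if (ok) StartRead(&request_);  // arm the next round
  }
  void OnDone() override { delete this; }  // reactor owns itself

 private:
  EchoRequest request_;
  EchoResponse response_;
};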

@@ -44,87 +44,6 @@ class CallbackStreamingTestService
   experimental::ServerBidiReactor<EchoRequest, EchoResponse>* BidiStream()
       override;
 };
-
-class BidiClient
-    : public grpc::experimental::ClientBidiReactor<EchoRequest, EchoResponse> {
- public:
-  BidiClient(benchmark::State* state, EchoTestService::Stub* stub,
-             EchoRequest* request, EchoResponse* response)
-      : state_{state}, stub_{stub}, request_{request}, response_{response} {
-    msgs_size_ = state->range(0);
-    msgs_to_send_ = state->range(1);
-    StartNewRpc();
-  }
-
-  void OnReadDone(bool ok) override {
-    if (!ok) {
-      return;
-    }
-    if (writes_complete_ < msgs_to_send_) {
-      MaybeWrite();
-    }
-  }
-
-  void OnWriteDone(bool ok) override {
-    if (!ok) {
-      return;
-    }
-    writes_complete_++;
-    StartRead(response_);
-  }
-
-  void OnDone(const Status& s) override {
-    GPR_ASSERT(s.ok());
-    if (state_->KeepRunning()) {
-      writes_complete_ = 0;
-      StartNewRpc();
-    } else {
-      std::unique_lock<std::mutex> l(mu);
-      done = true;
-      cv.notify_one();
-    }
-  }
-
-  void StartNewRpc() {
-    cli_ctx_.reset(new ClientContext);
-    cli_ctx_.get()->AddMetadata(kServerFinishAfterNReads,
-                                grpc::to_string(msgs_to_send_));
-    cli_ctx_.get()->AddMetadata(kServerMessageSize,
-                                grpc::to_string(msgs_size_));
-    stub_->experimental_async()->BidiStream(cli_ctx_.get(), this);
-    MaybeWrite();
-    StartCall();
-  }
-
-  void Await() {
-    std::unique_lock<std::mutex> l(mu);
-    while (!done) {
-      cv.wait(l);
-    }
-  }
-
- private:
-  void MaybeWrite() {
-    if (writes_complete_ < msgs_to_send_) {
-      StartWrite(request_);
-    } else {
-      StartWritesDone();
-    }
-  }
-
-  std::unique_ptr<ClientContext> cli_ctx_;
-  benchmark::State* state_;
-  EchoTestService::Stub* stub_;
-  EchoRequest* request_;
-  EchoResponse* response_;
-  int writes_complete_{0};
-  int msgs_to_send_;
-  int msgs_size_;
-  std::mutex mu;
-  std::condition_variable cv;
-  bool done;
-};
-
 }  // namespace testing
 }  // namespace grpc
 
 #endif  // TEST_CPP_MICROBENCHMARKS_CALLBACK_TEST_SERVICE_H

@@ -36,7 +36,6 @@ namespace testing {
  * BENCHMARKING KERNELS
  */
 
-// Send next rpc when callback function is evoked.
 void SendCallbackUnaryPingPong(benchmark::State* state, ClientContext* cli_ctx,
                                EchoRequest* request, EchoResponse* response,
                                EchoTestService::Stub* stub_, bool* done,
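SendCallbackUnaryPingPong drives the experimental callback unary API: the generated stub method takes a completion lambda in place of a completion-queue tag, and the benchmark re-issues the next RPC from inside that callback until the loop ends. A rough sketch of one such call (hypothetical wrapper, not the benchmark's code; Echo and experimental_async() are the generated stub surface of this era):

#include <functional>

// Hypothetical wrapper around one callback-API unary call: on_done runs
// when the RPC completes, taking the place of a completion-queue tag.
void OneCallbackEcho(EchoTestService::Stub* stub, grpc::ClientContext* ctx,
                     const EchoRequest* req, EchoResponse* resp,
                     std::function<void(grpc::Status)> on_done) {
  stub->experimental_async()->Echo(ctx, req, resp, std::move(on_done));
}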
