Add callback completion queue and modify callback streaming ping pong

pull/18881/head
Na-Na Pang 6 years ago
parent 25128d18c1
commit 5748665bc5
  1. 48
      CMakeLists.txt
  2. 49
      Makefile
  3. 23
      build.yaml
  4. 11
      test/cpp/microbenchmarks/BUILD
  5. 74
      test/cpp/microbenchmarks/bm_callback_streaming_ping_pong.cc
  6. 2
      test/cpp/microbenchmarks/callback_streaming_ping_pong.h
  7. 52
      test/cpp/microbenchmarks/callback_test_service.cc
  8. 7
      test/cpp/microbenchmarks/callback_test_service.h
  9. 21
      tools/run_tests/generated/sources_and_headers.json
  10. 26
      tools/run_tests/generated/tests.json

@ -556,6 +556,9 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx bm_call_create)
endif()
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx bm_callback_cq)
endif()
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx bm_callback_streaming_ping_pong)
endif()
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
@ -11641,6 +11644,51 @@ target_link_libraries(bm_call_create
)
endif()
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_executable(bm_callback_cq
test/cpp/microbenchmarks/bm_callback_cq.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(bm_callback_cq
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include
PRIVATE ${_gRPC_SSL_INCLUDE_DIR}
PRIVATE ${_gRPC_PROTOBUF_INCLUDE_DIR}
PRIVATE ${_gRPC_ZLIB_INCLUDE_DIR}
PRIVATE ${_gRPC_BENCHMARK_INCLUDE_DIR}
PRIVATE ${_gRPC_CARES_INCLUDE_DIR}
PRIVATE ${_gRPC_GFLAGS_INCLUDE_DIR}
PRIVATE ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
PRIVATE ${_gRPC_NANOPB_INCLUDE_DIR}
PRIVATE third_party/googletest/googletest/include
PRIVATE third_party/googletest/googletest
PRIVATE third_party/googletest/googlemock/include
PRIVATE third_party/googletest/googlemock
PRIVATE ${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(bm_callback_cq
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc_benchmark
${_gRPC_BENCHMARK_LIBRARIES}
grpc++_test_util_unsecure
grpc_test_util_unsecure
grpc++_unsecure
grpc_unsecure
gpr
grpc++_test_config
${_gRPC_GFLAGS_LIBRARIES}
)
endif()
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)

@ -1162,6 +1162,7 @@ bm_alarm: $(BINDIR)/$(CONFIG)/bm_alarm
bm_arena: $(BINDIR)/$(CONFIG)/bm_arena
bm_byte_buffer: $(BINDIR)/$(CONFIG)/bm_byte_buffer
bm_call_create: $(BINDIR)/$(CONFIG)/bm_call_create
bm_callback_cq: $(BINDIR)/$(CONFIG)/bm_callback_cq
bm_callback_streaming_ping_pong: $(BINDIR)/$(CONFIG)/bm_callback_streaming_ping_pong
bm_callback_unary_ping_pong: $(BINDIR)/$(CONFIG)/bm_callback_unary_ping_pong
bm_channel: $(BINDIR)/$(CONFIG)/bm_channel
@ -1640,6 +1641,7 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/bm_arena \
$(BINDIR)/$(CONFIG)/bm_byte_buffer \
$(BINDIR)/$(CONFIG)/bm_call_create \
$(BINDIR)/$(CONFIG)/bm_callback_cq \
$(BINDIR)/$(CONFIG)/bm_callback_streaming_ping_pong \
$(BINDIR)/$(CONFIG)/bm_callback_unary_ping_pong \
$(BINDIR)/$(CONFIG)/bm_channel \
@ -1786,6 +1788,7 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/bm_arena \
$(BINDIR)/$(CONFIG)/bm_byte_buffer \
$(BINDIR)/$(CONFIG)/bm_call_create \
$(BINDIR)/$(CONFIG)/bm_callback_cq \
$(BINDIR)/$(CONFIG)/bm_callback_streaming_ping_pong \
$(BINDIR)/$(CONFIG)/bm_callback_unary_ping_pong \
$(BINDIR)/$(CONFIG)/bm_channel \
@ -2238,6 +2241,8 @@ test_cxx: buildtests_cxx
$(Q) $(BINDIR)/$(CONFIG)/bm_byte_buffer || ( echo test bm_byte_buffer failed ; exit 1 )
$(E) "[RUN] Testing bm_call_create"
$(Q) $(BINDIR)/$(CONFIG)/bm_call_create || ( echo test bm_call_create failed ; exit 1 )
$(E) "[RUN] Testing bm_callback_cq"
$(Q) $(BINDIR)/$(CONFIG)/bm_callback_cq || ( echo test bm_callback_cq failed ; exit 1 )
$(E) "[RUN] Testing bm_callback_streaming_ping_pong"
$(Q) $(BINDIR)/$(CONFIG)/bm_callback_streaming_ping_pong || ( echo test bm_callback_streaming_ping_pong failed ; exit 1 )
$(E) "[RUN] Testing bm_callback_unary_ping_pong"
@ -14568,6 +14573,50 @@ endif
endif
BM_CALLBACK_CQ_SRC = \
test/cpp/microbenchmarks/bm_callback_cq.cc \
BM_CALLBACK_CQ_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(BM_CALLBACK_CQ_SRC))))
ifeq ($(NO_SECURE),true)
# You can't build secure targets if you don't have OpenSSL.
$(BINDIR)/$(CONFIG)/bm_callback_cq: openssl_dep_error
else
ifeq ($(NO_PROTOBUF),true)
# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.5.0+.
$(BINDIR)/$(CONFIG)/bm_callback_cq: protobuf_dep_error
else
$(BINDIR)/$(CONFIG)/bm_callback_cq: $(PROTOBUF_DEP) $(BM_CALLBACK_CQ_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_benchmark.a $(LIBDIR)/$(CONFIG)/libbenchmark.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util_unsecure.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util_unsecure.a $(LIBDIR)/$(CONFIG)/libgrpc++_unsecure.a $(LIBDIR)/$(CONFIG)/libgrpc_unsecure.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a
$(E) "[LD] Linking $@"
$(Q) mkdir -p `dirname $@`
$(Q) $(LDXX) $(LDFLAGS) $(BM_CALLBACK_CQ_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_benchmark.a $(LIBDIR)/$(CONFIG)/libbenchmark.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util_unsecure.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util_unsecure.a $(LIBDIR)/$(CONFIG)/libgrpc++_unsecure.a $(LIBDIR)/$(CONFIG)/libgrpc_unsecure.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/bm_callback_cq
endif
endif
$(BM_CALLBACK_CQ_OBJS): CPPFLAGS += -Ithird_party/benchmark/include -DHAVE_POSIX_REGEX
$(OBJDIR)/$(CONFIG)/test/cpp/microbenchmarks/bm_callback_cq.o: $(LIBDIR)/$(CONFIG)/libgrpc_benchmark.a $(LIBDIR)/$(CONFIG)/libbenchmark.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util_unsecure.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util_unsecure.a $(LIBDIR)/$(CONFIG)/libgrpc++_unsecure.a $(LIBDIR)/$(CONFIG)/libgrpc_unsecure.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a
deps_bm_callback_cq: $(BM_CALLBACK_CQ_OBJS:.o=.dep)
ifneq ($(NO_SECURE),true)
ifneq ($(NO_DEPS),true)
-include $(BM_CALLBACK_CQ_OBJS:.o=.dep)
endif
endif
BM_CALLBACK_STREAMING_PING_PONG_SRC = \
test/cpp/microbenchmarks/bm_callback_streaming_ping_pong.cc \

@ -4040,6 +4040,29 @@ targets:
- linux
- posix
uses_polling: false
- name: bm_callback_cq
build: test
language: c++
src:
- test/cpp/microbenchmarks/bm_callback_cq.cc
deps:
- grpc_benchmark
- benchmark
- grpc++_test_util_unsecure
- grpc_test_util_unsecure
- grpc++_unsecure
- grpc_unsecure
- gpr
- grpc++_test_config
benchmark: true
defaults: benchmark
excluded_poll_engines:
- poll
platforms:
- mac
- linux
- posix
timeout_seconds: 1200
- name: bm_callback_streaming_ping_pong
build: test
language: c++

@ -276,3 +276,14 @@ grpc_cc_binary(
tags = ["no_windows"],
deps = [":callback_streaming_ping_pong_h"],
)
grpc_cc_binary(
name = "bm_callback_cq",
testonly = 1,
srcs = ["bm_callback_cq.cc"],
language = "C++",
deps = [
":helpers",
"//test/cpp/util:test_util",
],
)

@ -32,13 +32,11 @@ auto& force_library_initialization = Library::get();
// Replace "benchmark::internal::Benchmark" with "::testing::Benchmark" to use
// internal microbenchmarking tooling
static void StreamingPingPongMsgSizeArgs(benchmark::internal::Benchmark* b) {
int msg_size = 0;
// base case: 0 byte ping-pong msgs
b->Args({0, 1});
b->Args({0, 2});
for (msg_size = 0; msg_size <= 128 * 1024 * 1024;
msg_size == 0 ? msg_size++ : msg_size *= 8) {
for (int msg_size = 1; msg_size <= 128 * 1024 * 1024; msg_size *= 8) {
b->Args({msg_size, 1});
b->Args({msg_size, 2});
}
@ -47,13 +45,9 @@ static void StreamingPingPongMsgSizeArgs(benchmark::internal::Benchmark* b) {
// Replace "benchmark::internal::Benchmark" with "::testing::Benchmark" to use
// internal microbenchmarking tooling
static void StreamingPingPongMsgsNumberArgs(benchmark::internal::Benchmark* b) {
int msg_number = 0;
for (msg_number = 0; msg_number <= 128 * 1024;
msg_number == 0 ? msg_number++ : msg_number *= 8) {
for (int msg_number = 1; msg_number <= 256 * 1024; msg_number *= 8) {
b->Args({0, msg_number});
// 64 KiB same as the synthetic test configuration
b->Args({64 * 1024, msg_number});
b->Args({1024, msg_number});
}
}
@ -64,12 +58,6 @@ BENCHMARK_TEMPLATE(BM_CallbackBidiStreaming, InProcess, NoOpMutator,
BENCHMARK_TEMPLATE(BM_CallbackBidiStreaming, MinInProcess, NoOpMutator,
NoOpMutator)
->Apply(StreamingPingPongMsgSizeArgs);
BENCHMARK_TEMPLATE(BM_CallbackBidiStreaming, InProcessCHTTP2, NoOpMutator,
NoOpMutator)
->Apply(StreamingPingPongMsgSizeArgs);
BENCHMARK_TEMPLATE(BM_CallbackBidiStreaming, MinInProcessCHTTP2, NoOpMutator,
NoOpMutator)
->Apply(StreamingPingPongMsgSizeArgs);
// Streaming with different message number
BENCHMARK_TEMPLATE(BM_CallbackBidiStreaming, InProcess, NoOpMutator,
@ -78,43 +66,8 @@ BENCHMARK_TEMPLATE(BM_CallbackBidiStreaming, InProcess, NoOpMutator,
BENCHMARK_TEMPLATE(BM_CallbackBidiStreaming, MinInProcess, NoOpMutator,
NoOpMutator)
->Apply(StreamingPingPongMsgsNumberArgs);
BENCHMARK_TEMPLATE(BM_CallbackBidiStreaming, InProcessCHTTP2, NoOpMutator,
NoOpMutator)
->Apply(StreamingPingPongMsgsNumberArgs);
BENCHMARK_TEMPLATE(BM_CallbackBidiStreaming, MinInProcessCHTTP2, NoOpMutator,
NoOpMutator)
->Apply(StreamingPingPongMsgsNumberArgs);
// Client context with different metadata
BENCHMARK_TEMPLATE(BM_CallbackBidiStreaming, InProcessCHTTP2,
Client_AddMetadata<RandomBinaryMetadata<10>, 1>, NoOpMutator)
->Args({0, 1});
BENCHMARK_TEMPLATE(BM_CallbackBidiStreaming, InProcessCHTTP2,
Client_AddMetadata<RandomBinaryMetadata<31>, 1>, NoOpMutator)
->Args({0, 1});
BENCHMARK_TEMPLATE(BM_CallbackBidiStreaming, InProcessCHTTP2,
Client_AddMetadata<RandomBinaryMetadata<100>, 1>,
NoOpMutator)
->Args({0, 1});
BENCHMARK_TEMPLATE(BM_CallbackBidiStreaming, InProcessCHTTP2,
Client_AddMetadata<RandomBinaryMetadata<10>, 2>, NoOpMutator)
->Args({0, 1});
BENCHMARK_TEMPLATE(BM_CallbackBidiStreaming, InProcessCHTTP2,
Client_AddMetadata<RandomBinaryMetadata<31>, 2>, NoOpMutator)
->Args({0, 1});
BENCHMARK_TEMPLATE(BM_CallbackBidiStreaming, InProcessCHTTP2,
Client_AddMetadata<RandomBinaryMetadata<100>, 2>,
NoOpMutator)
->Args({0, 1});
BENCHMARK_TEMPLATE(BM_CallbackBidiStreaming, InProcessCHTTP2,
Client_AddMetadata<RandomAsciiMetadata<10>, 1>, NoOpMutator)
->Args({0, 1});
BENCHMARK_TEMPLATE(BM_CallbackBidiStreaming, InProcessCHTTP2,
Client_AddMetadata<RandomAsciiMetadata<31>, 1>, NoOpMutator)
->Args({0, 1});
BENCHMARK_TEMPLATE(BM_CallbackBidiStreaming, InProcessCHTTP2,
Client_AddMetadata<RandomAsciiMetadata<100>, 1>, NoOpMutator)
->Args({0, 1});
BENCHMARK_TEMPLATE(BM_CallbackBidiStreaming, InProcess,
Client_AddMetadata<RandomBinaryMetadata<10>, 1>, NoOpMutator)
->Args({0, 1});
@ -146,27 +99,6 @@ BENCHMARK_TEMPLATE(BM_CallbackBidiStreaming, InProcess,
->Args({0, 1});
// Server context with different metadata
BENCHMARK_TEMPLATE(BM_CallbackBidiStreaming, InProcessCHTTP2, NoOpMutator,
Server_AddInitialMetadata<RandomBinaryMetadata<10>, 1>)
->Args({0, 1});
BENCHMARK_TEMPLATE(BM_CallbackBidiStreaming, InProcessCHTTP2, NoOpMutator,
Server_AddInitialMetadata<RandomBinaryMetadata<31>, 1>)
->Args({0, 1});
BENCHMARK_TEMPLATE(BM_CallbackBidiStreaming, InProcessCHTTP2, NoOpMutator,
Server_AddInitialMetadata<RandomBinaryMetadata<100>, 1>)
->Args({0, 1});
BENCHMARK_TEMPLATE(BM_CallbackBidiStreaming, InProcessCHTTP2, NoOpMutator,
Server_AddInitialMetadata<RandomAsciiMetadata<10>, 1>)
->Args({0, 1});
BENCHMARK_TEMPLATE(BM_CallbackBidiStreaming, InProcessCHTTP2, NoOpMutator,
Server_AddInitialMetadata<RandomAsciiMetadata<31>, 1>)
->Args({0, 1});
BENCHMARK_TEMPLATE(BM_CallbackBidiStreaming, InProcessCHTTP2, NoOpMutator,
Server_AddInitialMetadata<RandomAsciiMetadata<100>, 1>)
->Args({0, 1});
BENCHMARK_TEMPLATE(BM_CallbackBidiStreaming, InProcessCHTTP2, NoOpMutator,
Server_AddInitialMetadata<RandomAsciiMetadata<10>, 100>)
->Args({0, 1});
BENCHMARK_TEMPLATE(BM_CallbackBidiStreaming, InProcess, NoOpMutator,
Server_AddInitialMetadata<RandomBinaryMetadata<10>, 1>)
->Args({0, 1});

@ -54,7 +54,7 @@ static void BM_CallbackBidiStreaming(benchmark::State& state) {
ClientContext cli_ctx;
cli_ctx.AddMetadata(kServerFinishAfterNReads,
grpc::to_string(max_ping_pongs));
cli_ctx.AddMetadata(kServerResponseStreamsToSend,
cli_ctx.AddMetadata(kServerMessageSize,
grpc::to_string(message_size));
BidiClient test{stub_.get(), &request, &response, &cli_ctx, max_ping_pongs};
test.Await();

@ -72,52 +72,48 @@ CallbackStreamingTestService::BidiStream() {
message_size_ = GetIntValueFromMetadata(kServerMessageSize,
context->client_metadata(), 0);
StartRead(&request_);
on_started_done_ = true;
}
void OnDone() override { delete this; }
void OnDone() override {
GPR_ASSERT(finished_);
delete this;
}
void OnCancel() override {}
void OnReadDone(bool ok) override {
if (ok) {
num_msgs_read_++;
if (message_size_ > 0) {
response_.set_message(std::string(message_size_, 'a'));
} else {
response_.set_message("");
}
if (num_msgs_read_ == server_write_last_) {
StartWriteLast(&response_, WriteOptions());
} else {
StartWrite(&response_);
return;
}
if (!ok) {
return;
}
num_msgs_read_++;
if (message_size_ > 0) {
response_.set_message(std::string(message_size_, 'a'));
} else {
response_.set_message("");
}
if (num_msgs_read_ == server_write_last_) {
StartWriteLast(&response_, WriteOptions());
} else {
StartWrite(&response_);
}
FinishOnce(Status::OK);
}
void OnWriteDone(bool ok) override {
std::lock_guard<std::mutex> l(finish_mu_);
if (!finished_) {
StartRead(&request_);
if (!ok) {
return;
}
}
private:
void FinishOnce(const Status& s) {
std::lock_guard<std::mutex> l(finish_mu_);
if (!finished_) {
Finish(s);
if (num_msgs_read_ < server_write_last_) {
StartRead(&request_);
} else {
Finish(::grpc::Status::OK);
finished_ = true;
}
}
private:
ServerContext* ctx_;
EchoRequest request_;
EchoResponse response_;
int num_msgs_read_{0};
int server_write_last_;
int message_size_;
std::mutex finish_mu_;
bool finished_{false};
bool on_started_done_{false};
};
return new Reactor;

@ -56,7 +56,6 @@ class BidiClient
msgs_to_send_{num_msgs_to_send} {
stub->experimental_async()->BidiStream(context_, this);
MaybeWrite();
StartRead(response_);
StartCall();
}
@ -64,9 +63,9 @@ class BidiClient
if (!ok) {
return;
}
if (reads_complete_ < msgs_to_send_) {
if (ok && reads_complete_ < msgs_to_send_) {
reads_complete_++;
StartRead(response_);
MaybeWrite();
}
}
@ -75,7 +74,7 @@ class BidiClient
return;
}
writes_complete_++;
MaybeWrite();
StartRead(response_);
}
void OnDone(const Status& s) override {

@ -2776,6 +2776,27 @@
"third_party": false,
"type": "target"
},
{
"deps": [
"benchmark",
"gpr",
"grpc++_test_config",
"grpc++_test_util_unsecure",
"grpc++_unsecure",
"grpc_benchmark",
"grpc_test_util_unsecure",
"grpc_unsecure"
],
"headers": [],
"is_filegroup": false,
"language": "c++",
"name": "bm_callback_cq",
"src": [
"test/cpp/microbenchmarks/bm_callback_cq.cc"
],
"third_party": false,
"type": "target"
},
{
"deps": [
"benchmark",

@ -3479,6 +3479,32 @@
],
"uses_polling": false
},
{
"args": [],
"benchmark": true,
"ci_platforms": [
"linux",
"mac",
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"excluded_poll_engines": [
"poll"
],
"flaky": false,
"gtest": false,
"language": "c++",
"name": "bm_callback_cq",
"platforms": [
"linux",
"mac",
"posix"
],
"timeout_seconds": 1200,
"uses_polling": true
},
{
"args": [],
"benchmark": true,

Loading…
Cancel
Save