Merge pull request #16492 from vjpai/client_callback

EXPERIMENTAL: C++  generic client-side unary callback API
pull/16541/head^2
Vijay Pai 7 years ago committed by GitHub
commit 3bc10c0f44
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      BUILD
  2. 57
      CMakeLists.txt
  3. 64
      Makefile
  4. 18
      build.yaml
  5. 4
      gRPC-C++.podspec
  6. 2
      grpc.gyp
  7. 11
      include/grpcpp/channel.h
  8. 23
      include/grpcpp/generic/generic_stub.h
  9. 16
      include/grpcpp/impl/codegen/call.h
  10. 103
      include/grpcpp/impl/codegen/callback_common.h
  11. 14
      include/grpcpp/impl/codegen/channel_interface.h
  12. 95
      include/grpcpp/impl/codegen/client_callback.h
  13. 4
      include/grpcpp/impl/codegen/client_context.h
  14. 3
      include/grpcpp/impl/codegen/completion_queue.h
  15. 23
      include/grpcpp/impl/codegen/completion_queue_tag.h
  16. 24
      include/grpcpp/support/client_callback.h
  17. 4
      src/core/lib/surface/completion_queue.cc
  18. 48
      src/cpp/client/channel_cc.cc
  19. 14
      src/cpp/client/generic_stub.cc
  20. 131
      src/cpp/common/callback_common.cc
  21. 1
      src/cpp/server/server_cc.cc
  22. 3
      src/cpp/server/server_context.cc
  23. 19
      test/cpp/end2end/BUILD
  24. 126
      test/cpp/end2end/client_callback_end2end_test.cc
  25. 3
      tools/doxygen/Doxyfile.c++
  26. 4
      tools/doxygen/Doxyfile.c++.internal
  27. 26
      tools/run_tests/generated/sources_and_headers.json
  28. 24
      tools/run_tests/generated/tests.json

@ -119,6 +119,7 @@ GRPCXX_SRCS = [
"src/cpp/client/credentials_cc.cc",
"src/cpp/client/generic_stub.cc",
"src/cpp/common/alarm.cc",
"src/cpp/common/callback_common.cc",
"src/cpp/common/channel_arguments.cc",
"src/cpp/common/channel_filter.cc",
"src/cpp/common/completion_queue_cc.cc",
@ -243,6 +244,7 @@ GRPCXX_PUBLIC_HDRS = [
"include/grpcpp/support/async_unary_call.h",
"include/grpcpp/support/byte_buffer.h",
"include/grpcpp/support/channel_arguments.h",
"include/grpcpp/support/client_callback.h",
"include/grpcpp/support/config.h",
"include/grpcpp/support/proto_buffer_reader.h",
"include/grpcpp/support/proto_buffer_writer.h",
@ -1979,7 +1981,9 @@ grpc_cc_library(
"include/grpcpp/impl/codegen/byte_buffer.h",
"include/grpcpp/impl/codegen/call.h",
"include/grpcpp/impl/codegen/call_hook.h",
"include/grpcpp/impl/codegen/callback_common.h",
"include/grpcpp/impl/codegen/channel_interface.h",
"include/grpcpp/impl/codegen/client_callback.h",
"include/grpcpp/impl/codegen/client_context.h",
"include/grpcpp/impl/codegen/client_unary_call.h",
"include/grpcpp/impl/codegen/completion_queue.h",

@ -565,6 +565,7 @@ add_dependencies(buildtests_cxx check_gcp_environment_linux_test)
add_dependencies(buildtests_cxx check_gcp_environment_windows_test)
add_dependencies(buildtests_cxx chttp2_settings_timeout_test)
add_dependencies(buildtests_cxx cli_call_test)
add_dependencies(buildtests_cxx client_callback_end2end_test)
add_dependencies(buildtests_cxx client_channel_stress_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx client_crash_test)
@ -2772,6 +2773,7 @@ add_library(grpc++
src/cpp/client/credentials_cc.cc
src/cpp/client/generic_stub.cc
src/cpp/common/alarm.cc
src/cpp/common/callback_common.cc
src/cpp/common/channel_arguments.cc
src/cpp/common/channel_filter.cc
src/cpp/common/completion_queue_cc.cc
@ -2918,6 +2920,7 @@ foreach(_hdr
include/grpcpp/support/async_unary_call.h
include/grpcpp/support/byte_buffer.h
include/grpcpp/support/channel_arguments.h
include/grpcpp/support/client_callback.h
include/grpcpp/support/config.h
include/grpcpp/support/proto_buffer_reader.h
include/grpcpp/support/proto_buffer_writer.h
@ -3015,7 +3018,9 @@ foreach(_hdr
include/grpcpp/impl/codegen/byte_buffer.h
include/grpcpp/impl/codegen/call.h
include/grpcpp/impl/codegen/call_hook.h
include/grpcpp/impl/codegen/callback_common.h
include/grpcpp/impl/codegen/channel_interface.h
include/grpcpp/impl/codegen/client_callback.h
include/grpcpp/impl/codegen/client_context.h
include/grpcpp/impl/codegen/client_unary_call.h
include/grpcpp/impl/codegen/completion_queue.h
@ -3130,6 +3135,7 @@ add_library(grpc++_cronet
src/cpp/client/credentials_cc.cc
src/cpp/client/generic_stub.cc
src/cpp/common/alarm.cc
src/cpp/common/callback_common.cc
src/cpp/common/channel_arguments.cc
src/cpp/common/channel_filter.cc
src/cpp/common/completion_queue_cc.cc
@ -3487,6 +3493,7 @@ foreach(_hdr
include/grpcpp/support/async_unary_call.h
include/grpcpp/support/byte_buffer.h
include/grpcpp/support/channel_arguments.h
include/grpcpp/support/client_callback.h
include/grpcpp/support/config.h
include/grpcpp/support/proto_buffer_reader.h
include/grpcpp/support/proto_buffer_writer.h
@ -3584,7 +3591,9 @@ foreach(_hdr
include/grpcpp/impl/codegen/byte_buffer.h
include/grpcpp/impl/codegen/call.h
include/grpcpp/impl/codegen/call_hook.h
include/grpcpp/impl/codegen/callback_common.h
include/grpcpp/impl/codegen/channel_interface.h
include/grpcpp/impl/codegen/client_callback.h
include/grpcpp/impl/codegen/client_context.h
include/grpcpp/impl/codegen/client_unary_call.h
include/grpcpp/impl/codegen/completion_queue.h
@ -3994,7 +4003,9 @@ foreach(_hdr
include/grpcpp/impl/codegen/byte_buffer.h
include/grpcpp/impl/codegen/call.h
include/grpcpp/impl/codegen/call_hook.h
include/grpcpp/impl/codegen/callback_common.h
include/grpcpp/impl/codegen/channel_interface.h
include/grpcpp/impl/codegen/client_callback.h
include/grpcpp/impl/codegen/client_context.h
include/grpcpp/impl/codegen/client_unary_call.h
include/grpcpp/impl/codegen/completion_queue.h
@ -4172,7 +4183,9 @@ foreach(_hdr
include/grpcpp/impl/codegen/byte_buffer.h
include/grpcpp/impl/codegen/call.h
include/grpcpp/impl/codegen/call_hook.h
include/grpcpp/impl/codegen/callback_common.h
include/grpcpp/impl/codegen/channel_interface.h
include/grpcpp/impl/codegen/client_callback.h
include/grpcpp/impl/codegen/client_context.h
include/grpcpp/impl/codegen/client_unary_call.h
include/grpcpp/impl/codegen/completion_queue.h
@ -4248,6 +4261,7 @@ add_library(grpc++_unsecure
src/cpp/client/credentials_cc.cc
src/cpp/client/generic_stub.cc
src/cpp/common/alarm.cc
src/cpp/common/callback_common.cc
src/cpp/common/channel_arguments.cc
src/cpp/common/channel_filter.cc
src/cpp/common/completion_queue_cc.cc
@ -4393,6 +4407,7 @@ foreach(_hdr
include/grpcpp/support/async_unary_call.h
include/grpcpp/support/byte_buffer.h
include/grpcpp/support/channel_arguments.h
include/grpcpp/support/client_callback.h
include/grpcpp/support/config.h
include/grpcpp/support/proto_buffer_reader.h
include/grpcpp/support/proto_buffer_writer.h
@ -4490,7 +4505,9 @@ foreach(_hdr
include/grpcpp/impl/codegen/byte_buffer.h
include/grpcpp/impl/codegen/call.h
include/grpcpp/impl/codegen/call_hook.h
include/grpcpp/impl/codegen/callback_common.h
include/grpcpp/impl/codegen/channel_interface.h
include/grpcpp/impl/codegen/client_callback.h
include/grpcpp/impl/codegen/client_context.h
include/grpcpp/impl/codegen/client_unary_call.h
include/grpcpp/impl/codegen/completion_queue.h
@ -11327,6 +11344,46 @@ target_link_libraries(cli_call_test
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
add_executable(client_callback_end2end_test
test/cpp/end2end/client_callback_end2end_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(client_callback_end2end_test
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include
PRIVATE ${_gRPC_SSL_INCLUDE_DIR}
PRIVATE ${_gRPC_PROTOBUF_INCLUDE_DIR}
PRIVATE ${_gRPC_ZLIB_INCLUDE_DIR}
PRIVATE ${_gRPC_BENCHMARK_INCLUDE_DIR}
PRIVATE ${_gRPC_CARES_INCLUDE_DIR}
PRIVATE ${_gRPC_GFLAGS_INCLUDE_DIR}
PRIVATE ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
PRIVATE ${_gRPC_NANOPB_INCLUDE_DIR}
PRIVATE third_party/googletest/googletest/include
PRIVATE third_party/googletest/googletest
PRIVATE third_party/googletest/googlemock/include
PRIVATE third_party/googletest/googlemock
PRIVATE ${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(client_callback_end2end_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc++_test_util
grpc_test_util
grpc++
grpc
gpr_test_util
gpr
${_gRPC_GFLAGS_LIBRARIES}
)
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
add_executable(client_channel_stress_test
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/lb/v1/load_balancer.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/lb/v1/load_balancer.grpc.pb.cc

@ -1161,6 +1161,7 @@ check_gcp_environment_linux_test: $(BINDIR)/$(CONFIG)/check_gcp_environment_linu
check_gcp_environment_windows_test: $(BINDIR)/$(CONFIG)/check_gcp_environment_windows_test
chttp2_settings_timeout_test: $(BINDIR)/$(CONFIG)/chttp2_settings_timeout_test
cli_call_test: $(BINDIR)/$(CONFIG)/cli_call_test
client_callback_end2end_test: $(BINDIR)/$(CONFIG)/client_callback_end2end_test
client_channel_stress_test: $(BINDIR)/$(CONFIG)/client_channel_stress_test
client_crash_test: $(BINDIR)/$(CONFIG)/client_crash_test
client_crash_test_server: $(BINDIR)/$(CONFIG)/client_crash_test_server
@ -1667,6 +1668,7 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/check_gcp_environment_windows_test \
$(BINDIR)/$(CONFIG)/chttp2_settings_timeout_test \
$(BINDIR)/$(CONFIG)/cli_call_test \
$(BINDIR)/$(CONFIG)/client_callback_end2end_test \
$(BINDIR)/$(CONFIG)/client_channel_stress_test \
$(BINDIR)/$(CONFIG)/client_crash_test \
$(BINDIR)/$(CONFIG)/client_crash_test_server \
@ -1847,6 +1849,7 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/check_gcp_environment_windows_test \
$(BINDIR)/$(CONFIG)/chttp2_settings_timeout_test \
$(BINDIR)/$(CONFIG)/cli_call_test \
$(BINDIR)/$(CONFIG)/client_callback_end2end_test \
$(BINDIR)/$(CONFIG)/client_channel_stress_test \
$(BINDIR)/$(CONFIG)/client_crash_test \
$(BINDIR)/$(CONFIG)/client_crash_test_server \
@ -2306,6 +2309,8 @@ test_cxx: buildtests_cxx
$(Q) $(BINDIR)/$(CONFIG)/chttp2_settings_timeout_test || ( echo test chttp2_settings_timeout_test failed ; exit 1 )
$(E) "[RUN] Testing cli_call_test"
$(Q) $(BINDIR)/$(CONFIG)/cli_call_test || ( echo test cli_call_test failed ; exit 1 )
$(E) "[RUN] Testing client_callback_end2end_test"
$(Q) $(BINDIR)/$(CONFIG)/client_callback_end2end_test || ( echo test client_callback_end2end_test failed ; exit 1 )
$(E) "[RUN] Testing client_channel_stress_test"
$(Q) $(BINDIR)/$(CONFIG)/client_channel_stress_test || ( echo test client_channel_stress_test failed ; exit 1 )
$(E) "[RUN] Testing client_crash_test"
@ -5223,6 +5228,7 @@ LIBGRPC++_SRC = \
src/cpp/client/credentials_cc.cc \
src/cpp/client/generic_stub.cc \
src/cpp/common/alarm.cc \
src/cpp/common/callback_common.cc \
src/cpp/common/channel_arguments.cc \
src/cpp/common/channel_filter.cc \
src/cpp/common/completion_queue_cc.cc \
@ -5333,6 +5339,7 @@ PUBLIC_HEADERS_CXX += \
include/grpcpp/support/async_unary_call.h \
include/grpcpp/support/byte_buffer.h \
include/grpcpp/support/channel_arguments.h \
include/grpcpp/support/client_callback.h \
include/grpcpp/support/config.h \
include/grpcpp/support/proto_buffer_reader.h \
include/grpcpp/support/proto_buffer_writer.h \
@ -5430,7 +5437,9 @@ PUBLIC_HEADERS_CXX += \
include/grpcpp/impl/codegen/byte_buffer.h \
include/grpcpp/impl/codegen/call.h \
include/grpcpp/impl/codegen/call_hook.h \
include/grpcpp/impl/codegen/callback_common.h \
include/grpcpp/impl/codegen/channel_interface.h \
include/grpcpp/impl/codegen/client_callback.h \
include/grpcpp/impl/codegen/client_context.h \
include/grpcpp/impl/codegen/client_unary_call.h \
include/grpcpp/impl/codegen/completion_queue.h \
@ -5589,6 +5598,7 @@ LIBGRPC++_CRONET_SRC = \
src/cpp/client/credentials_cc.cc \
src/cpp/client/generic_stub.cc \
src/cpp/common/alarm.cc \
src/cpp/common/callback_common.cc \
src/cpp/common/channel_arguments.cc \
src/cpp/common/channel_filter.cc \
src/cpp/common/completion_queue_cc.cc \
@ -5909,6 +5919,7 @@ PUBLIC_HEADERS_CXX += \
include/grpcpp/support/async_unary_call.h \
include/grpcpp/support/byte_buffer.h \
include/grpcpp/support/channel_arguments.h \
include/grpcpp/support/client_callback.h \
include/grpcpp/support/config.h \
include/grpcpp/support/proto_buffer_reader.h \
include/grpcpp/support/proto_buffer_writer.h \
@ -6006,7 +6017,9 @@ PUBLIC_HEADERS_CXX += \
include/grpcpp/impl/codegen/byte_buffer.h \
include/grpcpp/impl/codegen/call.h \
include/grpcpp/impl/codegen/call_hook.h \
include/grpcpp/impl/codegen/callback_common.h \
include/grpcpp/impl/codegen/channel_interface.h \
include/grpcpp/impl/codegen/client_callback.h \
include/grpcpp/impl/codegen/client_context.h \
include/grpcpp/impl/codegen/client_unary_call.h \
include/grpcpp/impl/codegen/completion_queue.h \
@ -6396,7 +6409,9 @@ PUBLIC_HEADERS_CXX += \
include/grpcpp/impl/codegen/byte_buffer.h \
include/grpcpp/impl/codegen/call.h \
include/grpcpp/impl/codegen/call_hook.h \
include/grpcpp/impl/codegen/callback_common.h \
include/grpcpp/impl/codegen/channel_interface.h \
include/grpcpp/impl/codegen/client_callback.h \
include/grpcpp/impl/codegen/client_context.h \
include/grpcpp/impl/codegen/client_unary_call.h \
include/grpcpp/impl/codegen/completion_queue.h \
@ -6550,7 +6565,9 @@ PUBLIC_HEADERS_CXX += \
include/grpcpp/impl/codegen/byte_buffer.h \
include/grpcpp/impl/codegen/call.h \
include/grpcpp/impl/codegen/call_hook.h \
include/grpcpp/impl/codegen/callback_common.h \
include/grpcpp/impl/codegen/channel_interface.h \
include/grpcpp/impl/codegen/client_callback.h \
include/grpcpp/impl/codegen/client_context.h \
include/grpcpp/impl/codegen/client_unary_call.h \
include/grpcpp/impl/codegen/completion_queue.h \
@ -6665,6 +6682,7 @@ LIBGRPC++_UNSECURE_SRC = \
src/cpp/client/credentials_cc.cc \
src/cpp/client/generic_stub.cc \
src/cpp/common/alarm.cc \
src/cpp/common/callback_common.cc \
src/cpp/common/channel_arguments.cc \
src/cpp/common/channel_filter.cc \
src/cpp/common/completion_queue_cc.cc \
@ -6775,6 +6793,7 @@ PUBLIC_HEADERS_CXX += \
include/grpcpp/support/async_unary_call.h \
include/grpcpp/support/byte_buffer.h \
include/grpcpp/support/channel_arguments.h \
include/grpcpp/support/client_callback.h \
include/grpcpp/support/config.h \
include/grpcpp/support/proto_buffer_reader.h \
include/grpcpp/support/proto_buffer_writer.h \
@ -6872,7 +6891,9 @@ PUBLIC_HEADERS_CXX += \
include/grpcpp/impl/codegen/byte_buffer.h \
include/grpcpp/impl/codegen/call.h \
include/grpcpp/impl/codegen/call_hook.h \
include/grpcpp/impl/codegen/callback_common.h \
include/grpcpp/impl/codegen/channel_interface.h \
include/grpcpp/impl/codegen/client_callback.h \
include/grpcpp/impl/codegen/client_context.h \
include/grpcpp/impl/codegen/client_unary_call.h \
include/grpcpp/impl/codegen/completion_queue.h \
@ -17112,6 +17133,49 @@ endif
endif
CLIENT_CALLBACK_END2END_TEST_SRC = \
test/cpp/end2end/client_callback_end2end_test.cc \
CLIENT_CALLBACK_END2END_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(CLIENT_CALLBACK_END2END_TEST_SRC))))
ifeq ($(NO_SECURE),true)
# You can't build secure targets if you don't have OpenSSL.
$(BINDIR)/$(CONFIG)/client_callback_end2end_test: openssl_dep_error
else
ifeq ($(NO_PROTOBUF),true)
# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.5.0+.
$(BINDIR)/$(CONFIG)/client_callback_end2end_test: protobuf_dep_error
else
$(BINDIR)/$(CONFIG)/client_callback_end2end_test: $(PROTOBUF_DEP) $(CLIENT_CALLBACK_END2END_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(E) "[LD] Linking $@"
$(Q) mkdir -p `dirname $@`
$(Q) $(LDXX) $(LDFLAGS) $(CLIENT_CALLBACK_END2END_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/client_callback_end2end_test
endif
endif
$(OBJDIR)/$(CONFIG)/test/cpp/end2end/client_callback_end2end_test.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
deps_client_callback_end2end_test: $(CLIENT_CALLBACK_END2END_TEST_OBJS:.o=.dep)
ifneq ($(NO_SECURE),true)
ifneq ($(NO_DEPS),true)
-include $(CLIENT_CALLBACK_END2END_TEST_OBJS:.o=.dep)
endif
endif
CLIENT_CHANNEL_STRESS_TEST_SRC = \
$(GENDIR)/src/proto/grpc/lb/v1/load_balancer.pb.cc $(GENDIR)/src/proto/grpc/lb/v1/load_balancer.grpc.pb.cc \
test/cpp/client/client_channel_stress_test.cc \

@ -1169,7 +1169,9 @@ filegroups:
- include/grpcpp/impl/codegen/byte_buffer.h
- include/grpcpp/impl/codegen/call.h
- include/grpcpp/impl/codegen/call_hook.h
- include/grpcpp/impl/codegen/callback_common.h
- include/grpcpp/impl/codegen/channel_interface.h
- include/grpcpp/impl/codegen/client_callback.h
- include/grpcpp/impl/codegen/client_context.h
- include/grpcpp/impl/codegen/client_unary_call.h
- include/grpcpp/impl/codegen/completion_queue.h
@ -1297,6 +1299,7 @@ filegroups:
- include/grpcpp/support/async_unary_call.h
- include/grpcpp/support/byte_buffer.h
- include/grpcpp/support/channel_arguments.h
- include/grpcpp/support/client_callback.h
- include/grpcpp/support/config.h
- include/grpcpp/support/proto_buffer_reader.h
- include/grpcpp/support/proto_buffer_writer.h
@ -1324,6 +1327,7 @@ filegroups:
- src/cpp/client/credentials_cc.cc
- src/cpp/client/generic_stub.cc
- src/cpp/common/alarm.cc
- src/cpp/common/callback_common.cc
- src/cpp/common/channel_arguments.cc
- src/cpp/common/channel_filter.cc
- src/cpp/common/completion_queue_cc.cc
@ -4479,6 +4483,20 @@ targets:
- grpc
- gpr_test_util
- gpr
- name: client_callback_end2end_test
gtest: true
cpu_cost: 0.5
build: test
language: c++
src:
- test/cpp/end2end/client_callback_end2end_test.cc
deps:
- grpc++_test_util
- grpc_test_util
- grpc++
- grpc
- gpr_test_util
- gpr
- name: client_channel_stress_test
gtest: false
build: test

@ -111,6 +111,7 @@ Pod::Spec.new do |s|
'include/grpcpp/support/async_unary_call.h',
'include/grpcpp/support/byte_buffer.h',
'include/grpcpp/support/channel_arguments.h',
'include/grpcpp/support/client_callback.h',
'include/grpcpp/support/config.h',
'include/grpcpp/support/proto_buffer_reader.h',
'include/grpcpp/support/proto_buffer_writer.h',
@ -127,7 +128,9 @@ Pod::Spec.new do |s|
'include/grpcpp/impl/codegen/byte_buffer.h',
'include/grpcpp/impl/codegen/call.h',
'include/grpcpp/impl/codegen/call_hook.h',
'include/grpcpp/impl/codegen/callback_common.h',
'include/grpcpp/impl/codegen/channel_interface.h',
'include/grpcpp/impl/codegen/client_callback.h',
'include/grpcpp/impl/codegen/client_context.h',
'include/grpcpp/impl/codegen/client_unary_call.h',
'include/grpcpp/impl/codegen/completion_queue.h',
@ -187,6 +190,7 @@ Pod::Spec.new do |s|
'src/cpp/client/credentials_cc.cc',
'src/cpp/client/generic_stub.cc',
'src/cpp/common/alarm.cc',
'src/cpp/common/callback_common.cc',
'src/cpp/common/channel_arguments.cc',
'src/cpp/common/channel_filter.cc',
'src/cpp/common/completion_queue_cc.cc',

@ -1381,6 +1381,7 @@
'src/cpp/client/credentials_cc.cc',
'src/cpp/client/generic_stub.cc',
'src/cpp/common/alarm.cc',
'src/cpp/common/callback_common.cc',
'src/cpp/common/channel_arguments.cc',
'src/cpp/common/channel_filter.cc',
'src/cpp/common/completion_queue_cc.cc',
@ -1528,6 +1529,7 @@
'src/cpp/client/credentials_cc.cc',
'src/cpp/client/generic_stub.cc',
'src/cpp/common/alarm.cc',
'src/cpp/common/callback_common.cc',
'src/cpp/common/channel_arguments.cc',
'src/cpp/common/channel_filter.cc',
'src/cpp/common/completion_queue_cc.cc',

@ -78,8 +78,19 @@ class Channel final : public ChannelInterface,
bool WaitForStateChangeImpl(grpc_connectivity_state last_observed,
gpr_timespec deadline) override;
CompletionQueue* CallbackCQ() override;
const grpc::string host_;
grpc_channel* const c_channel_; // owned
// mu_ protects callback_cq_ (the per-channel callbackable completion queue)
std::mutex mu_;
// callback_cq_ references the callbackable completion queue associated
// with this channel (if any). It is set on the first call to CallbackCQ().
// It is _not owned_ by the channel; ownership belongs with its internal
// shutdown callback tag (invoked when the CQ is fully shutdown).
CompletionQueue* callback_cq_ = nullptr;
};
} // namespace grpc

@ -19,9 +19,12 @@
#ifndef GRPCPP_GENERIC_GENERIC_STUB_H
#define GRPCPP_GENERIC_GENERIC_STUB_H
#include <functional>
#include <grpcpp/support/async_stream.h>
#include <grpcpp/support/async_unary_call.h>
#include <grpcpp/support/byte_buffer.h>
#include <grpcpp/support/status.h>
namespace grpc {
@ -62,6 +65,26 @@ class GenericStub final {
ClientContext* context, const grpc::string& method, CompletionQueue* cq,
void* tag);
/// NOTE: class experimental_type is not part of the public API of this class
/// TODO(vjpai): Move these contents to the public API of GenericStub when
/// they are no longer experimental
class experimental_type {
public:
explicit experimental_type(GenericStub* stub) : stub_(stub) {}
void UnaryCall(ClientContext* context, const grpc::string& method,
const ByteBuffer* request, ByteBuffer* response,
std::function<void(Status)> on_completion);
private:
GenericStub* stub_;
};
/// NOTE: The function experimental() is not stable public API. It is a view
/// to the experimental components of this class. It may be changed or removed
/// at any time.
experimental_type experimental() { return experimental_type(this); }
private:
std::shared_ptr<ChannelInterface> channel_;
};

@ -599,6 +599,11 @@ class CallOpSetInterface : public CompletionQueueTag {
/// Fills in grpc_op, starting from ops[*nops] and moving
/// upwards.
virtual void FillOps(grpc_call* call, grpc_op* ops, size_t* nops) = 0;
/// Get the tag to be used at the core completion queue. Generally, the
/// value of cq_tag will be "this". However, it can be overridden if we
/// want core to process the tag differently (e.g., as a core callback)
virtual void* cq_tag() = 0;
};
/// Primary implementation of CallOpSetInterface.
@ -618,7 +623,7 @@ class CallOpSet : public CallOpSetInterface,
public Op5,
public Op6 {
public:
CallOpSet() : return_tag_(this), call_(nullptr) {}
CallOpSet() : cq_tag_(this), return_tag_(this), call_(nullptr) {}
void FillOps(grpc_call* call, grpc_op* ops, size_t* nops) override {
this->Op1::AddOp(ops, nops);
this->Op2::AddOp(ops, nops);
@ -645,7 +650,16 @@ class CallOpSet : public CallOpSetInterface,
void set_output_tag(void* return_tag) { return_tag_ = return_tag; }
void* cq_tag() override { return cq_tag_; }
/// set_cq_tag is used to provide a different core CQ tag than "this".
/// This is used for callback-based tags, where the core tag is the core
/// callback function. It does not change the use or behavior of any other
/// function (such as FinalizeResult)
void set_cq_tag(void* cq_tag) { cq_tag_ = cq_tag; }
private:
void* cq_tag_;
void* return_tag_;
grpc_call* call_;
};

@ -0,0 +1,103 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPCPP_IMPL_CODEGEN_CALLBACK_COMMON_H
#define GRPCPP_IMPL_CODEGEN_CALLBACK_COMMON_H
#include <functional>
#include <grpcpp/impl/codegen/call.h>
#include <grpcpp/impl/codegen/channel_interface.h>
#include <grpcpp/impl/codegen/config.h>
#include <grpcpp/impl/codegen/core_codegen_interface.h>
#include <grpcpp/impl/codegen/status.h>
// Forward declarations
namespace grpc_core {
class CQCallbackInterface;
};
namespace grpc {
namespace internal {
class CallbackWithStatusTag {
public:
// always allocated against a call arena, no memory free required
static void operator delete(void* ptr, std::size_t size) {
assert(size == sizeof(CallbackWithStatusTag));
}
// This operator should never be called as the memory should be freed as part
// of the arena destruction. It only exists to provide a matching operator
// delete to the operator new so that some compilers will not complain (see
// https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
// there are no tests catching the compiler warning.
static void operator delete(void*, void*) { assert(0); }
CallbackWithStatusTag(grpc_call* call, std::function<void(Status)> f,
CompletionQueueTag* ops);
~CallbackWithStatusTag() {}
void* tag() { return static_cast<void*>(impl_); }
Status* status_ptr() { return status_; }
CompletionQueueTag* ops() { return ops_; }
// force_run can not be performed on a tag if operations using this tag
// have been sent to PerformOpsOnCall. It is intended for error conditions
// that are detected before the operations are internally processed.
void force_run(Status s);
private:
grpc_core::CQCallbackInterface* impl_;
Status* status_;
CompletionQueueTag* ops_;
};
class CallbackWithSuccessTag {
public:
// always allocated against a call arena, no memory free required
static void operator delete(void* ptr, std::size_t size) {
assert(size == sizeof(CallbackWithSuccessTag));
}
// This operator should never be called as the memory should be freed as part
// of the arena destruction. It only exists to provide a matching operator
// delete to the operator new so that some compilers will not complain (see
// https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
// there are no tests catching the compiler warning.
static void operator delete(void*, void*) { assert(0); }
CallbackWithSuccessTag(grpc_call* call, std::function<void(bool)> f,
CompletionQueueTag* ops);
void* tag() { return static_cast<void*>(impl_); }
CompletionQueueTag* ops() { return ops_; }
// force_run can not be performed on a tag if operations using this tag
// have been sent to PerformOpsOnCall. It is intended for error conditions
// that are detected before the operations are internally processed.
void force_run(bool ok);
private:
grpc_core::CQCallbackInterface* impl_;
CompletionQueueTag* ops_;
};
} // namespace internal
} // namespace grpc
#endif // GRPCPP_IMPL_CODEGEN_CALLBACK_COMMON_H

@ -41,6 +41,8 @@ class CallOpSetInterface;
class RpcMethod;
template <class InputMessage, class OutputMessage>
class BlockingUnaryCallImpl;
template <class InputMessage, class OutputMessage>
class CallbackUnaryCallImpl;
template <class R>
class ClientAsyncReaderFactory;
template <class W>
@ -103,6 +105,8 @@ class ChannelInterface {
friend class ::grpc::internal::ClientAsyncResponseReaderFactory;
template <class InputMessage, class OutputMessage>
friend class ::grpc::internal::BlockingUnaryCallImpl;
template <class InputMessage, class OutputMessage>
friend class ::grpc::internal::CallbackUnaryCallImpl;
friend class ::grpc::internal::RpcMethod;
virtual internal::Call CreateCall(const internal::RpcMethod& method,
ClientContext* context,
@ -115,6 +119,16 @@ class ChannelInterface {
CompletionQueue* cq, void* tag) = 0;
virtual bool WaitForStateChangeImpl(grpc_connectivity_state last_observed,
gpr_timespec deadline) = 0;
// EXPERIMENTAL
// A method to get the callbackable completion queue associated with this
// channel. If the return value is nullptr, this channel doesn't support
// callback operations.
// TODO(vjpai): Consider a better default like using a global CQ
// Returns nullptr (rather than being pure) since this is a new method
// and adding a new pure method to an interface would be a breaking change
// (even though this is private and non-API)
virtual CompletionQueue* CallbackCQ() { return nullptr; }
};
} // namespace grpc

@ -0,0 +1,95 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
#define GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
#include <functional>
#include <grpcpp/impl/codegen/call.h>
#include <grpcpp/impl/codegen/callback_common.h>
#include <grpcpp/impl/codegen/channel_interface.h>
#include <grpcpp/impl/codegen/config.h>
#include <grpcpp/impl/codegen/core_codegen_interface.h>
#include <grpcpp/impl/codegen/status.h>
namespace grpc {
class Channel;
class ClientContext;
class CompletionQueue;
namespace internal {
class RpcMethod;
/// Perform a callback-based unary call
/// TODO(vjpai): Combine as much as possible with the blocking unary call code
template <class InputMessage, class OutputMessage>
void CallbackUnaryCall(ChannelInterface* channel, const RpcMethod& method,
ClientContext* context, const InputMessage* request,
OutputMessage* result,
std::function<void(Status)> on_completion) {
CallbackUnaryCallImpl<InputMessage, OutputMessage> x(
channel, method, context, request, result, on_completion);
}
template <class InputMessage, class OutputMessage>
class CallbackUnaryCallImpl {
public:
CallbackUnaryCallImpl(ChannelInterface* channel, const RpcMethod& method,
ClientContext* context, const InputMessage* request,
OutputMessage* result,
std::function<void(Status)> on_completion) {
CompletionQueue* cq = channel->CallbackCQ();
GPR_CODEGEN_ASSERT(cq != nullptr);
Call call(channel->CreateCall(method, context, cq));
using FullCallOpSet =
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
CallOpRecvInitialMetadata, CallOpRecvMessage<OutputMessage>,
CallOpClientSendClose, CallOpClientRecvStatus>;
auto* ops = new (g_core_codegen_interface->grpc_call_arena_alloc(
call.call(), sizeof(FullCallOpSet))) FullCallOpSet;
auto* tag = new (g_core_codegen_interface->grpc_call_arena_alloc(
call.call(), sizeof(CallbackWithStatusTag)))
CallbackWithStatusTag(call.call(), on_completion, ops);
// TODO(vjpai): Unify code with sync API as much as possible
Status s = ops->SendMessage(*request);
if (!s.ok()) {
tag->force_run(s);
return;
}
ops->SendInitialMetadata(context->send_initial_metadata_,
context->initial_metadata_flags());
ops->RecvInitialMetadata(context);
ops->RecvMessage(result);
ops->AllowNoMessage();
ops->ClientSendClose();
ops->ClientRecvStatus(context, tag->status_ptr());
ops->set_cq_tag(tag->tag());
call.PerformOps(ops);
}
};
} // namespace internal
} // namespace grpc
#endif // GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H

@ -68,6 +68,8 @@ class CallOpClientRecvStatus;
class CallOpRecvInitialMetadata;
template <class InputMessage, class OutputMessage>
class BlockingUnaryCallImpl;
template <class InputMessage, class OutputMessage>
class CallbackUnaryCallImpl;
} // namespace internal
template <class R>
@ -389,6 +391,8 @@ class ClientContext {
friend class ::grpc::ClientAsyncResponseReader;
template <class InputMessage, class OutputMessage>
friend class ::grpc::internal::BlockingUnaryCallImpl;
template <class InputMessage, class OutputMessage>
friend class ::grpc::internal::CallbackUnaryCallImpl;
// Used by friend class CallOpClientRecvStatus
void set_debug_error_string(const grpc::string& debug_error_string) {

@ -274,6 +274,9 @@ class CompletionQueue : private GrpcLibraryCodegen {
template <class InputMessage, class OutputMessage>
friend class ::grpc::internal::BlockingUnaryCallImpl;
// Friends that need access to constructor for callback CQ
friend class ::grpc::Channel;
/// EXPERIMENTAL
/// Creates a Thread Local cache to store the first event
/// On this completion queue queued from this thread. Once

@ -26,10 +26,25 @@ namespace internal {
class CompletionQueueTag {
public:
virtual ~CompletionQueueTag() {}
/// Called prior to returning from Next(), return value is the status of the
/// operation (return status is the default thing to do). If this function
/// returns false, the tag is dropped and not returned from the completion
/// queue
/// FinalizeResult must be called before informing user code that the
/// operation bound to the underlying core completion queue tag has
/// completed. In practice, this means:
///
/// 1. For the sync API - before returning from Pluck
/// 2. For the CQ-based async API - before returning from Next
/// 3. For the callback-based API - before invoking the user callback
///
/// This is the method that translates from core-side tag/status to
/// C++ API-observable tag/status.
///
/// The return value is the status of the operation (returning status is the
/// general behavior of this function). If this function returns false, the
/// tag is dropped and not returned from the completion queue: this concept is
/// for events that are observed at core but not requested by the user
/// application (e.g., server shutdown, for server unimplemented method
/// responses, or for cases where a server-side RPC doesn't have a completion
/// notification registered using AsyncNotifyWhenDone)
virtual bool FinalizeResult(void** tag, bool* status) = 0;
};
} // namespace internal

@ -0,0 +1,24 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPCPP_SUPPORT_CLIENT_CALLBACK_H
#define GRPCPP_SUPPORT_CLIENT_CALLBACK_H
#include <grpcpp/impl/codegen/client_callback.h>
#endif // GRPCPP_SUPPORT_CLIENT_CALLBACK_H

@ -1364,9 +1364,11 @@ static void cq_shutdown_callback(grpc_completion_queue* cq) {
}
cqd->shutdown_called = true;
if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
gpr_mu_unlock(cq->mu);
cq_finish_shutdown_callback(cq);
} else {
gpr_mu_unlock(cq->mu);
}
gpr_mu_unlock(cq->mu);
GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)");
}

@ -42,8 +42,10 @@
#include <grpcpp/support/time.h>
#include "src/core/lib/gpr/env.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/surface/completion_queue.h"
namespace grpc {
@ -53,7 +55,12 @@ Channel::Channel(const grpc::string& host, grpc_channel* channel)
g_gli_initializer.summon();
}
Channel::~Channel() { grpc_channel_destroy(c_channel_); }
Channel::~Channel() {
grpc_channel_destroy(c_channel_);
if (callback_cq_ != nullptr) {
callback_cq_->Shutdown();
}
}
namespace {
@ -135,8 +142,8 @@ void Channel::PerformOpsOnCall(internal::CallOpSetInterface* ops,
size_t nops = 0;
grpc_op cops[MAX_OPS];
ops->FillOps(call->call(), cops, &nops);
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_batch(call->call(), cops, nops, ops, nullptr));
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call->call(), cops, nops,
ops->cq_tag(), nullptr));
}
void* Channel::RegisterMethod(const char* method) {
@ -185,4 +192,39 @@ bool Channel::WaitForStateChangeImpl(grpc_connectivity_state last_observed,
return ok;
}
namespace {
class ShutdownCallback : public grpc_core::CQCallbackInterface {
public:
// TakeCQ takes ownership of the cq into the shutdown callback
// so that the shutdown callback will be responsible for destroying it
void TakeCQ(CompletionQueue* cq) { cq_ = cq; }
// The Run function will get invoked by the completion queue library
// when the shutdown is actually complete
void Run(bool) override {
delete cq_;
grpc_core::Delete(this);
}
private:
CompletionQueue* cq_ = nullptr;
};
} // namespace
CompletionQueue* Channel::CallbackCQ() {
// TODO(vjpai): Consider using a single global CQ for the default CQ
// if there is no explicit per-channel CQ registered
std::lock_guard<std::mutex> l(mu_);
if (callback_cq_ == nullptr) {
auto* shutdown_callback = grpc_core::New<ShutdownCallback>();
callback_cq_ = new CompletionQueue(grpc_completion_queue_attributes{
GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING,
shutdown_callback});
// Transfer ownership of the new cq to its own shutdown callback
shutdown_callback->TakeCQ(callback_cq_);
}
return callback_cq_;
}
} // namespace grpc

@ -16,9 +16,11 @@
*
*/
#include <grpcpp/generic/generic_stub.h>
#include <functional>
#include <grpcpp/generic/generic_stub.h>
#include <grpcpp/impl/rpc_method.h>
#include <grpcpp/support/client_callback.h>
namespace grpc {
@ -60,4 +62,14 @@ std::unique_ptr<GenericClientAsyncResponseReader> GenericStub::PrepareUnaryCall(
context, request, false));
}
void GenericStub::experimental_type::UnaryCall(
ClientContext* context, const grpc::string& method,
const ByteBuffer* request, ByteBuffer* response,
std::function<void(Status)> on_completion) {
internal::CallbackUnaryCall(
stub_->channel_.get(),
internal::RpcMethod(method.c_str(), internal::RpcMethod::NORMAL_RPC),
context, request, response, std::move(on_completion));
}
} // namespace grpc

@ -0,0 +1,131 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <functional>
#include <grpcpp/impl/codegen/callback_common.h>
#include <grpcpp/impl/codegen/status.h>
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/surface/completion_queue.h"
namespace grpc {
namespace internal {
namespace {
class CallbackWithSuccessImpl : public grpc_core::CQCallbackInterface {
public:
static void operator delete(void* ptr, std::size_t size) {
assert(size == sizeof(CallbackWithSuccessImpl));
}
// This operator should never be called as the memory should be freed as part
// of the arena destruction. It only exists to provide a matching operator
// delete to the operator new so that some compilers will not complain (see
// https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
// there are no tests catching the compiler warning.
static void operator delete(void*, void*) { assert(0); }
CallbackWithSuccessImpl(grpc_call* call, CallbackWithSuccessTag* parent,
std::function<void(bool)> f)
: call_(call), parent_(parent), func_(std::move(f)) {
grpc_call_ref(call);
}
void Run(bool ok) override {
void* ignored = parent_->ops();
bool new_ok = ok;
GPR_ASSERT(parent_->ops()->FinalizeResult(&ignored, &new_ok));
GPR_ASSERT(ignored == parent_->ops());
func_(ok);
func_ = nullptr; // release the function
grpc_call_unref(call_);
}
private:
grpc_call* call_;
CallbackWithSuccessTag* parent_;
std::function<void(bool)> func_;
};
class CallbackWithStatusImpl : public grpc_core::CQCallbackInterface {
public:
static void operator delete(void* ptr, std::size_t size) {
assert(size == sizeof(CallbackWithStatusImpl));
}
// This operator should never be called as the memory should be freed as part
// of the arena destruction. It only exists to provide a matching operator
// delete to the operator new so that some compilers will not complain (see
// https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
// there are no tests catching the compiler warning.
static void operator delete(void*, void*) { assert(0); }
CallbackWithStatusImpl(grpc_call* call, CallbackWithStatusTag* parent,
std::function<void(Status)> f)
: call_(call), parent_(parent), func_(std::move(f)), status_() {
grpc_call_ref(call);
}
void Run(bool ok) override {
void* ignored = parent_->ops();
GPR_ASSERT(parent_->ops()->FinalizeResult(&ignored, &ok));
GPR_ASSERT(ignored == parent_->ops());
func_(status_);
func_ = nullptr; // release the function
grpc_call_unref(call_);
}
Status* status_ptr() { return &status_; }
private:
grpc_call* call_;
CallbackWithStatusTag* parent_;
std::function<void(Status)> func_;
Status status_;
};
} // namespace
CallbackWithSuccessTag::CallbackWithSuccessTag(grpc_call* call,
std::function<void(bool)> f,
CompletionQueueTag* ops)
: impl_(new (grpc_call_arena_alloc(call, sizeof(CallbackWithSuccessImpl)))
CallbackWithSuccessImpl(call, this, std::move(f))),
ops_(ops) {}
void CallbackWithSuccessTag::force_run(bool ok) { impl_->Run(ok); }
CallbackWithStatusTag::CallbackWithStatusTag(grpc_call* call,
std::function<void(Status)> f,
CompletionQueueTag* ops)
: ops_(ops) {
auto* impl = new (grpc_call_arena_alloc(call, sizeof(CallbackWithStatusImpl)))
CallbackWithStatusImpl(call, this, std::move(f));
impl_ = impl;
status_ = impl->status_ptr();
}
void CallbackWithStatusTag::force_run(Status s) {
*status_ = std::move(s);
impl_->Run(true);
}
} // namespace internal
} // namespace grpc

@ -657,6 +657,7 @@ void Server::PerformOpsOnCall(internal::CallOpSetInterface* ops,
size_t nops = 0;
grpc_op cops[MAX_OPS];
ops->FillOps(call->call(), cops, &nops);
// TODO(vjpai): Use ops->cq_tag once this case supports callbacks
auto result = grpc_call_start_batch(call->call(), cops, nops, ops, nullptr);
if (result != GRPC_CALL_OK) {
gpr_log(GPR_ERROR, "Fatal: grpc_call_start_batch returned %d", result);

@ -61,6 +61,9 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
tag_ = tag;
}
/// TODO(vjpai): Allow override of cq_tag if appropriate for callback API
void* cq_tag() override { return this; }
void Unref();
private:

@ -98,6 +98,25 @@ grpc_cc_binary(
],
)
grpc_cc_test(
name = "client_callback_end2end_test",
srcs = ["client_callback_end2end_test.cc"],
external_deps = [
"gtest",
],
deps = [
":test_service_impl",
"//:gpr",
"//:grpc",
"//:grpc++",
"//src/proto/grpc/testing:echo_messages_proto",
"//src/proto/grpc/testing:echo_proto",
"//test/core/util:gpr_test_util",
"//test/core/util:grpc_test_util",
"//test/cpp/util:test_util",
],
)
grpc_cc_library(
name = "end2end_test_lib",
testonly = True,

@ -0,0 +1,126 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <functional>
#include <mutex>
#include <grpcpp/channel.h>
#include <grpcpp/client_context.h>
#include <grpcpp/create_channel.h>
#include <grpcpp/generic/generic_stub.h>
#include <grpcpp/impl/codegen/proto_utils.h>
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>
#include <grpcpp/server_context.h>
#include <grpcpp/support/client_callback.h>
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/core/util/test_config.h"
#include "test/cpp/end2end/test_service_impl.h"
#include "test/cpp/util/byte_buffer_proto_helper.h"
#include <gtest/gtest.h>
namespace grpc {
namespace testing {
namespace {
class ClientCallbackEnd2endTest : public ::testing::Test {
protected:
ClientCallbackEnd2endTest() {}
void SetUp() override {
ServerBuilder builder;
builder.RegisterService(&service_);
server_ = builder.BuildAndStart();
is_server_started_ = true;
}
void ResetStub() {
ChannelArguments args;
channel_ = server_->InProcessChannel(args);
stub_.reset(new GenericStub(channel_));
}
void TearDown() override {
if (is_server_started_) {
server_->Shutdown();
}
}
void SendRpcs(int num_rpcs) {
const grpc::string kMethodName("/grpc.testing.EchoTestService/Echo");
grpc::string test_string("");
for (int i = 0; i < num_rpcs; i++) {
EchoRequest request;
std::unique_ptr<ByteBuffer> send_buf;
ByteBuffer recv_buf;
ClientContext cli_ctx;
test_string += "Hello world. ";
request.set_message(test_string);
send_buf = SerializeToByteBuffer(&request);
std::mutex mu;
std::condition_variable cv;
bool done = false;
stub_->experimental().UnaryCall(
&cli_ctx, kMethodName, send_buf.get(), &recv_buf,
[&request, &recv_buf, &done, &mu, &cv](Status s) {
GPR_ASSERT(s.ok());
EchoResponse response;
EXPECT_TRUE(ParseFromByteBuffer(&recv_buf, &response));
EXPECT_EQ(request.message(), response.message());
std::lock_guard<std::mutex> l(mu);
done = true;
cv.notify_one();
});
std::unique_lock<std::mutex> l(mu);
while (!done) {
cv.wait(l);
}
}
}
bool is_server_started_;
std::shared_ptr<Channel> channel_;
std::unique_ptr<grpc::GenericStub> stub_;
TestServiceImpl service_;
std::unique_ptr<Server> server_;
};
TEST_F(ClientCallbackEnd2endTest, SimpleRpc) {
ResetStub();
SendRpcs(1);
}
TEST_F(ClientCallbackEnd2endTest, SequentialRpcs) {
ResetStub();
SendRpcs(10);
}
} // namespace
} // namespace testing
} // namespace grpc
int main(int argc, char** argv) {
grpc_test_init(argc, argv);
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -945,7 +945,9 @@ include/grpcpp/impl/codegen/async_unary_call.h \
include/grpcpp/impl/codegen/byte_buffer.h \
include/grpcpp/impl/codegen/call.h \
include/grpcpp/impl/codegen/call_hook.h \
include/grpcpp/impl/codegen/callback_common.h \
include/grpcpp/impl/codegen/channel_interface.h \
include/grpcpp/impl/codegen/client_callback.h \
include/grpcpp/impl/codegen/client_context.h \
include/grpcpp/impl/codegen/client_unary_call.h \
include/grpcpp/impl/codegen/completion_queue.h \
@ -997,6 +999,7 @@ include/grpcpp/support/async_stream.h \
include/grpcpp/support/async_unary_call.h \
include/grpcpp/support/byte_buffer.h \
include/grpcpp/support/channel_arguments.h \
include/grpcpp/support/client_callback.h \
include/grpcpp/support/config.h \
include/grpcpp/support/proto_buffer_reader.h \
include/grpcpp/support/proto_buffer_writer.h \

@ -946,7 +946,9 @@ include/grpcpp/impl/codegen/async_unary_call.h \
include/grpcpp/impl/codegen/byte_buffer.h \
include/grpcpp/impl/codegen/call.h \
include/grpcpp/impl/codegen/call_hook.h \
include/grpcpp/impl/codegen/callback_common.h \
include/grpcpp/impl/codegen/channel_interface.h \
include/grpcpp/impl/codegen/client_callback.h \
include/grpcpp/impl/codegen/client_context.h \
include/grpcpp/impl/codegen/client_unary_call.h \
include/grpcpp/impl/codegen/completion_queue.h \
@ -999,6 +1001,7 @@ include/grpcpp/support/async_stream.h \
include/grpcpp/support/async_unary_call.h \
include/grpcpp/support/byte_buffer.h \
include/grpcpp/support/channel_arguments.h \
include/grpcpp/support/client_callback.h \
include/grpcpp/support/config.h \
include/grpcpp/support/proto_buffer_reader.h \
include/grpcpp/support/proto_buffer_writer.h \
@ -1187,6 +1190,7 @@ src/cpp/client/secure_credentials.h \
src/cpp/codegen/codegen_init.cc \
src/cpp/common/alarm.cc \
src/cpp/common/auth_property_iterator.cc \
src/cpp/common/callback_common.cc \
src/cpp/common/channel_arguments.cc \
src/cpp/common/channel_filter.cc \
src/cpp/common/channel_filter.h \

@ -3339,6 +3339,25 @@
"third_party": false,
"type": "target"
},
{
"deps": [
"gpr",
"gpr_test_util",
"grpc",
"grpc++",
"grpc++_test_util",
"grpc_test_util"
],
"headers": [],
"is_filegroup": false,
"language": "c++",
"name": "client_callback_end2end_test",
"src": [
"test/cpp/end2end/client_callback_end2end_test.cc"
],
"third_party": false,
"type": "target"
},
{
"deps": [
"gpr",
@ -11098,7 +11117,9 @@
"include/grpcpp/impl/codegen/byte_buffer.h",
"include/grpcpp/impl/codegen/call.h",
"include/grpcpp/impl/codegen/call_hook.h",
"include/grpcpp/impl/codegen/callback_common.h",
"include/grpcpp/impl/codegen/channel_interface.h",
"include/grpcpp/impl/codegen/client_callback.h",
"include/grpcpp/impl/codegen/client_context.h",
"include/grpcpp/impl/codegen/client_unary_call.h",
"include/grpcpp/impl/codegen/completion_queue.h",
@ -11164,7 +11185,9 @@
"include/grpcpp/impl/codegen/byte_buffer.h",
"include/grpcpp/impl/codegen/call.h",
"include/grpcpp/impl/codegen/call_hook.h",
"include/grpcpp/impl/codegen/callback_common.h",
"include/grpcpp/impl/codegen/channel_interface.h",
"include/grpcpp/impl/codegen/client_callback.h",
"include/grpcpp/impl/codegen/client_context.h",
"include/grpcpp/impl/codegen/client_unary_call.h",
"include/grpcpp/impl/codegen/completion_queue.h",
@ -11322,6 +11345,7 @@
"include/grpcpp/support/async_unary_call.h",
"include/grpcpp/support/byte_buffer.h",
"include/grpcpp/support/channel_arguments.h",
"include/grpcpp/support/client_callback.h",
"include/grpcpp/support/config.h",
"include/grpcpp/support/proto_buffer_reader.h",
"include/grpcpp/support/proto_buffer_writer.h",
@ -11426,6 +11450,7 @@
"include/grpcpp/support/async_unary_call.h",
"include/grpcpp/support/byte_buffer.h",
"include/grpcpp/support/channel_arguments.h",
"include/grpcpp/support/client_callback.h",
"include/grpcpp/support/config.h",
"include/grpcpp/support/proto_buffer_reader.h",
"include/grpcpp/support/proto_buffer_writer.h",
@ -11445,6 +11470,7 @@
"src/cpp/client/credentials_cc.cc",
"src/cpp/client/generic_stub.cc",
"src/cpp/common/alarm.cc",
"src/cpp/common/callback_common.cc",
"src/cpp/common/channel_arguments.cc",
"src/cpp/common/channel_filter.cc",
"src/cpp/common/channel_filter.h",

@ -3997,6 +3997,30 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 0.5,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "client_callback_end2end_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save