From 6929cdd65454f0e77d5ad931209a4207ae15331c Mon Sep 17 00:00:00 2001 From: yang-g Date: Tue, 16 Apr 2019 14:16:55 -0700 Subject: [PATCH] initial --- BUILD | 2 + BUILD.gn | 2 + CMakeLists.txt | 49 +++ Makefile | 56 +++ build.yaml | 15 + gRPC-C++.podspec | 2 + .../grpcpp/impl/codegen/message_allocator.h | 53 +++ .../grpcpp/impl/codegen/method_handler_impl.h | 12 +- .../grpcpp/impl/codegen/rpc_service_method.h | 8 +- include/grpcpp/impl/codegen/server_callback.h | 105 +++-- include/grpcpp/support/message_allocator.h | 24 ++ src/compiler/cpp_generator.cc | 20 +- src/cpp/server/server_cc.cc | 12 +- src/proto/grpc/testing/echo_messages.proto | 2 + test/cpp/end2end/BUILD | 20 + .../end2end/message_allocator_end2end_test.cc | 395 ++++++++++++++++++ tools/doxygen/Doxyfile.c++ | 2 + tools/doxygen/Doxyfile.c++.internal | 2 + .../generated/sources_and_headers.json | 22 + tools/run_tests/generated/tests.json | 24 ++ 20 files changed, 789 insertions(+), 38 deletions(-) create mode 100644 include/grpcpp/impl/codegen/message_allocator.h create mode 100644 include/grpcpp/support/message_allocator.h create mode 100644 test/cpp/end2end/message_allocator_end2end_test.cc diff --git a/BUILD b/BUILD index fd75012d214..ad040d1f924 100644 --- a/BUILD +++ b/BUILD @@ -268,6 +268,7 @@ GRPCXX_PUBLIC_HDRS = [ "include/grpcpp/support/client_interceptor.h", "include/grpcpp/support/config.h", "include/grpcpp/support/interceptor.h", + "include/grpcpp/support/message_allocator.h", "include/grpcpp/support/proto_buffer_reader.h", "include/grpcpp/support/proto_buffer_writer.h", "include/grpcpp/support/server_callback.h", @@ -2127,6 +2128,7 @@ grpc_cc_library( "include/grpcpp/impl/codegen/intercepted_channel.h", "include/grpcpp/impl/codegen/interceptor.h", "include/grpcpp/impl/codegen/interceptor_common.h", + "include/grpcpp/impl/codegen/message_allocator.h", "include/grpcpp/impl/codegen/metadata_map.h", "include/grpcpp/impl/codegen/method_handler_impl.h", "include/grpcpp/impl/codegen/rpc_method.h", diff --git a/BUILD.gn b/BUILD.gn index 21f567d9af4..6f1864a6d8c 100644 --- a/BUILD.gn +++ b/BUILD.gn @@ -1047,6 +1047,7 @@ config("grpc_config") { "include/grpcpp/impl/codegen/intercepted_channel.h", "include/grpcpp/impl/codegen/interceptor.h", "include/grpcpp/impl/codegen/interceptor_common.h", + "include/grpcpp/impl/codegen/message_allocator.h", "include/grpcpp/impl/codegen/metadata_map.h", "include/grpcpp/impl/codegen/method_handler_impl.h", "include/grpcpp/impl/codegen/proto_buffer_reader.h", @@ -1101,6 +1102,7 @@ config("grpc_config") { "include/grpcpp/support/client_interceptor.h", "include/grpcpp/support/config.h", "include/grpcpp/support/interceptor.h", + "include/grpcpp/support/message_allocator.h", "include/grpcpp/support/proto_buffer_reader.h", "include/grpcpp/support/proto_buffer_writer.h", "include/grpcpp/support/server_callback.h", diff --git a/CMakeLists.txt b/CMakeLists.txt index 4e1bee493f8..cfb5af701cc 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -660,6 +660,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) add_dependencies(buildtests_cxx json_run_localhost) endif() add_dependencies(buildtests_cxx memory_test) +add_dependencies(buildtests_cxx message_allocator_end2end_test) add_dependencies(buildtests_cxx metrics_client) add_dependencies(buildtests_cxx mock_test) add_dependencies(buildtests_cxx nonblocking_test) @@ -3053,6 +3054,7 @@ foreach(_hdr include/grpcpp/support/client_interceptor.h include/grpcpp/support/config.h include/grpcpp/support/interceptor.h + include/grpcpp/support/message_allocator.h include/grpcpp/support/proto_buffer_reader.h include/grpcpp/support/proto_buffer_writer.h include/grpcpp/support/server_callback.h @@ -3168,6 +3170,7 @@ foreach(_hdr include/grpcpp/impl/codegen/intercepted_channel.h include/grpcpp/impl/codegen/interceptor.h include/grpcpp/impl/codegen/interceptor_common.h + include/grpcpp/impl/codegen/message_allocator.h include/grpcpp/impl/codegen/metadata_map.h include/grpcpp/impl/codegen/method_handler_impl.h include/grpcpp/impl/codegen/rpc_method.h @@ -3656,6 +3659,7 @@ foreach(_hdr include/grpcpp/support/client_interceptor.h include/grpcpp/support/config.h include/grpcpp/support/interceptor.h + include/grpcpp/support/message_allocator.h include/grpcpp/support/proto_buffer_reader.h include/grpcpp/support/proto_buffer_writer.h include/grpcpp/support/server_callback.h @@ -3771,6 +3775,7 @@ foreach(_hdr include/grpcpp/impl/codegen/intercepted_channel.h include/grpcpp/impl/codegen/interceptor.h include/grpcpp/impl/codegen/interceptor_common.h + include/grpcpp/impl/codegen/message_allocator.h include/grpcpp/impl/codegen/metadata_map.h include/grpcpp/impl/codegen/method_handler_impl.h include/grpcpp/impl/codegen/rpc_method.h @@ -4203,6 +4208,7 @@ foreach(_hdr include/grpcpp/impl/codegen/intercepted_channel.h include/grpcpp/impl/codegen/interceptor.h include/grpcpp/impl/codegen/interceptor_common.h + include/grpcpp/impl/codegen/message_allocator.h include/grpcpp/impl/codegen/metadata_map.h include/grpcpp/impl/codegen/method_handler_impl.h include/grpcpp/impl/codegen/rpc_method.h @@ -4399,6 +4405,7 @@ foreach(_hdr include/grpcpp/impl/codegen/intercepted_channel.h include/grpcpp/impl/codegen/interceptor.h include/grpcpp/impl/codegen/interceptor_common.h + include/grpcpp/impl/codegen/message_allocator.h include/grpcpp/impl/codegen/metadata_map.h include/grpcpp/impl/codegen/method_handler_impl.h include/grpcpp/impl/codegen/rpc_method.h @@ -4632,6 +4639,7 @@ foreach(_hdr include/grpcpp/support/client_interceptor.h include/grpcpp/support/config.h include/grpcpp/support/interceptor.h + include/grpcpp/support/message_allocator.h include/grpcpp/support/proto_buffer_reader.h include/grpcpp/support/proto_buffer_writer.h include/grpcpp/support/server_callback.h @@ -4747,6 +4755,7 @@ foreach(_hdr include/grpcpp/impl/codegen/intercepted_channel.h include/grpcpp/impl/codegen/interceptor.h include/grpcpp/impl/codegen/interceptor_common.h + include/grpcpp/impl/codegen/message_allocator.h include/grpcpp/impl/codegen/metadata_map.h include/grpcpp/impl/codegen/method_handler_impl.h include/grpcpp/impl/codegen/rpc_method.h @@ -14495,6 +14504,46 @@ target_link_libraries(memory_test ) +endif (gRPC_BUILD_TESTS) +if (gRPC_BUILD_TESTS) + +add_executable(message_allocator_end2end_test + test/cpp/end2end/message_allocator_end2end_test.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc +) + + +target_include_directories(message_allocator_end2end_test + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR} + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include + PRIVATE ${_gRPC_SSL_INCLUDE_DIR} + PRIVATE ${_gRPC_PROTOBUF_INCLUDE_DIR} + PRIVATE ${_gRPC_ZLIB_INCLUDE_DIR} + PRIVATE ${_gRPC_BENCHMARK_INCLUDE_DIR} + PRIVATE ${_gRPC_CARES_INCLUDE_DIR} + PRIVATE ${_gRPC_GFLAGS_INCLUDE_DIR} + PRIVATE ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + PRIVATE ${_gRPC_NANOPB_INCLUDE_DIR} + PRIVATE third_party/googletest/googletest/include + PRIVATE third_party/googletest/googletest + PRIVATE third_party/googletest/googlemock/include + PRIVATE third_party/googletest/googlemock + PRIVATE ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(message_allocator_end2end_test + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + grpc++_test_util + grpc_test_util + grpc++ + grpc + gpr + ${_gRPC_GFLAGS_LIBRARIES} +) + + endif (gRPC_BUILD_TESTS) if (gRPC_BUILD_TESTS) diff --git a/Makefile b/Makefile index 5c3e0614c7a..235ec8297d8 100644 --- a/Makefile +++ b/Makefile @@ -1233,6 +1233,7 @@ interop_server: $(BINDIR)/$(CONFIG)/interop_server interop_test: $(BINDIR)/$(CONFIG)/interop_test json_run_localhost: $(BINDIR)/$(CONFIG)/json_run_localhost memory_test: $(BINDIR)/$(CONFIG)/memory_test +message_allocator_end2end_test: $(BINDIR)/$(CONFIG)/message_allocator_end2end_test metrics_client: $(BINDIR)/$(CONFIG)/metrics_client mock_test: $(BINDIR)/$(CONFIG)/mock_test nonblocking_test: $(BINDIR)/$(CONFIG)/nonblocking_test @@ -1699,6 +1700,7 @@ buildtests_cxx: privatelibs_cxx \ $(BINDIR)/$(CONFIG)/interop_test \ $(BINDIR)/$(CONFIG)/json_run_localhost \ $(BINDIR)/$(CONFIG)/memory_test \ + $(BINDIR)/$(CONFIG)/message_allocator_end2end_test \ $(BINDIR)/$(CONFIG)/metrics_client \ $(BINDIR)/$(CONFIG)/mock_test \ $(BINDIR)/$(CONFIG)/nonblocking_test \ @@ -1842,6 +1844,7 @@ buildtests_cxx: privatelibs_cxx \ $(BINDIR)/$(CONFIG)/interop_test \ $(BINDIR)/$(CONFIG)/json_run_localhost \ $(BINDIR)/$(CONFIG)/memory_test \ + $(BINDIR)/$(CONFIG)/message_allocator_end2end_test \ $(BINDIR)/$(CONFIG)/metrics_client \ $(BINDIR)/$(CONFIG)/mock_test \ $(BINDIR)/$(CONFIG)/nonblocking_test \ @@ -2341,6 +2344,8 @@ test_cxx: buildtests_cxx $(Q) $(BINDIR)/$(CONFIG)/interop_test || ( echo test interop_test failed ; exit 1 ) $(E) "[RUN] Testing memory_test" $(Q) $(BINDIR)/$(CONFIG)/memory_test || ( echo test memory_test failed ; exit 1 ) + $(E) "[RUN] Testing message_allocator_end2end_test" + $(Q) $(BINDIR)/$(CONFIG)/message_allocator_end2end_test || ( echo test message_allocator_end2end_test failed ; exit 1 ) $(E) "[RUN] Testing mock_test" $(Q) $(BINDIR)/$(CONFIG)/mock_test || ( echo test mock_test failed ; exit 1 ) $(E) "[RUN] Testing nonblocking_test" @@ -5388,6 +5393,7 @@ PUBLIC_HEADERS_CXX += \ include/grpcpp/support/client_interceptor.h \ include/grpcpp/support/config.h \ include/grpcpp/support/interceptor.h \ + include/grpcpp/support/message_allocator.h \ include/grpcpp/support/proto_buffer_reader.h \ include/grpcpp/support/proto_buffer_writer.h \ include/grpcpp/support/server_callback.h \ @@ -5503,6 +5509,7 @@ PUBLIC_HEADERS_CXX += \ include/grpcpp/impl/codegen/intercepted_channel.h \ include/grpcpp/impl/codegen/interceptor.h \ include/grpcpp/impl/codegen/interceptor_common.h \ + include/grpcpp/impl/codegen/message_allocator.h \ include/grpcpp/impl/codegen/metadata_map.h \ include/grpcpp/impl/codegen/method_handler_impl.h \ include/grpcpp/impl/codegen/rpc_method.h \ @@ -5999,6 +6006,7 @@ PUBLIC_HEADERS_CXX += \ include/grpcpp/support/client_interceptor.h \ include/grpcpp/support/config.h \ include/grpcpp/support/interceptor.h \ + include/grpcpp/support/message_allocator.h \ include/grpcpp/support/proto_buffer_reader.h \ include/grpcpp/support/proto_buffer_writer.h \ include/grpcpp/support/server_callback.h \ @@ -6114,6 +6122,7 @@ PUBLIC_HEADERS_CXX += \ include/grpcpp/impl/codegen/intercepted_channel.h \ include/grpcpp/impl/codegen/interceptor.h \ include/grpcpp/impl/codegen/interceptor_common.h \ + include/grpcpp/impl/codegen/message_allocator.h \ include/grpcpp/impl/codegen/metadata_map.h \ include/grpcpp/impl/codegen/method_handler_impl.h \ include/grpcpp/impl/codegen/rpc_method.h \ @@ -6518,6 +6527,7 @@ PUBLIC_HEADERS_CXX += \ include/grpcpp/impl/codegen/intercepted_channel.h \ include/grpcpp/impl/codegen/interceptor.h \ include/grpcpp/impl/codegen/interceptor_common.h \ + include/grpcpp/impl/codegen/message_allocator.h \ include/grpcpp/impl/codegen/metadata_map.h \ include/grpcpp/impl/codegen/method_handler_impl.h \ include/grpcpp/impl/codegen/rpc_method.h \ @@ -6685,6 +6695,7 @@ PUBLIC_HEADERS_CXX += \ include/grpcpp/impl/codegen/intercepted_channel.h \ include/grpcpp/impl/codegen/interceptor.h \ include/grpcpp/impl/codegen/interceptor_common.h \ + include/grpcpp/impl/codegen/message_allocator.h \ include/grpcpp/impl/codegen/metadata_map.h \ include/grpcpp/impl/codegen/method_handler_impl.h \ include/grpcpp/impl/codegen/rpc_method.h \ @@ -6924,6 +6935,7 @@ PUBLIC_HEADERS_CXX += \ include/grpcpp/support/client_interceptor.h \ include/grpcpp/support/config.h \ include/grpcpp/support/interceptor.h \ + include/grpcpp/support/message_allocator.h \ include/grpcpp/support/proto_buffer_reader.h \ include/grpcpp/support/proto_buffer_writer.h \ include/grpcpp/support/server_callback.h \ @@ -7039,6 +7051,7 @@ PUBLIC_HEADERS_CXX += \ include/grpcpp/impl/codegen/intercepted_channel.h \ include/grpcpp/impl/codegen/interceptor.h \ include/grpcpp/impl/codegen/interceptor_common.h \ + include/grpcpp/impl/codegen/message_allocator.h \ include/grpcpp/impl/codegen/metadata_map.h \ include/grpcpp/impl/codegen/method_handler_impl.h \ include/grpcpp/impl/codegen/rpc_method.h \ @@ -17400,6 +17413,49 @@ endif endif +MESSAGE_ALLOCATOR_END2END_TEST_SRC = \ + test/cpp/end2end/message_allocator_end2end_test.cc \ + +MESSAGE_ALLOCATOR_END2END_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(MESSAGE_ALLOCATOR_END2END_TEST_SRC)))) +ifeq ($(NO_SECURE),true) + +# You can't build secure targets if you don't have OpenSSL. + +$(BINDIR)/$(CONFIG)/message_allocator_end2end_test: openssl_dep_error + +else + + + + +ifeq ($(NO_PROTOBUF),true) + +# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.5.0+. + +$(BINDIR)/$(CONFIG)/message_allocator_end2end_test: protobuf_dep_error + +else + +$(BINDIR)/$(CONFIG)/message_allocator_end2end_test: $(PROTOBUF_DEP) $(MESSAGE_ALLOCATOR_END2END_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a + $(E) "[LD] Linking $@" + $(Q) mkdir -p `dirname $@` + $(Q) $(LDXX) $(LDFLAGS) $(MESSAGE_ALLOCATOR_END2END_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/message_allocator_end2end_test + +endif + +endif + +$(OBJDIR)/$(CONFIG)/test/cpp/end2end/message_allocator_end2end_test.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a + +deps_message_allocator_end2end_test: $(MESSAGE_ALLOCATOR_END2END_TEST_OBJS:.o=.dep) + +ifneq ($(NO_SECURE),true) +ifneq ($(NO_DEPS),true) +-include $(MESSAGE_ALLOCATOR_END2END_TEST_OBJS:.o=.dep) +endif +endif + + METRICS_CLIENT_SRC = \ $(GENDIR)/src/proto/grpc/testing/metrics.pb.cc $(GENDIR)/src/proto/grpc/testing/metrics.grpc.pb.cc \ test/cpp/interop/metrics_client.cc \ diff --git a/build.yaml b/build.yaml index 6683db05fd4..efc89c7594c 100644 --- a/build.yaml +++ b/build.yaml @@ -1258,6 +1258,7 @@ filegroups: - include/grpcpp/impl/codegen/intercepted_channel.h - include/grpcpp/impl/codegen/interceptor.h - include/grpcpp/impl/codegen/interceptor_common.h + - include/grpcpp/impl/codegen/message_allocator.h - include/grpcpp/impl/codegen/metadata_map.h - include/grpcpp/impl/codegen/method_handler_impl.h - include/grpcpp/impl/codegen/rpc_method.h @@ -1395,6 +1396,7 @@ filegroups: - include/grpcpp/support/client_interceptor.h - include/grpcpp/support/config.h - include/grpcpp/support/interceptor.h + - include/grpcpp/support/message_allocator.h - include/grpcpp/support/proto_buffer_reader.h - include/grpcpp/support/proto_buffer_writer.h - include/grpcpp/support/server_callback.h @@ -5063,6 +5065,19 @@ targets: uses: - grpc++_test uses_polling: false +- name: message_allocator_end2end_test + gtest: true + cpu_cost: 0.5 + build: test + language: c++ + src: + - test/cpp/end2end/message_allocator_end2end_test.cc + deps: + - grpc++_test_util + - grpc_test_util + - grpc++ + - grpc + - gpr - name: metrics_client build: test run: false diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index 4d858ab6c77..1c4c7e48d91 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -132,6 +132,7 @@ Pod::Spec.new do |s| 'include/grpcpp/support/client_interceptor.h', 'include/grpcpp/support/config.h', 'include/grpcpp/support/interceptor.h', + 'include/grpcpp/support/message_allocator.h', 'include/grpcpp/support/proto_buffer_reader.h', 'include/grpcpp/support/proto_buffer_writer.h', 'include/grpcpp/support/server_callback.h', @@ -166,6 +167,7 @@ Pod::Spec.new do |s| 'include/grpcpp/impl/codegen/intercepted_channel.h', 'include/grpcpp/impl/codegen/interceptor.h', 'include/grpcpp/impl/codegen/interceptor_common.h', + 'include/grpcpp/impl/codegen/message_allocator.h', 'include/grpcpp/impl/codegen/metadata_map.h', 'include/grpcpp/impl/codegen/method_handler_impl.h', 'include/grpcpp/impl/codegen/rpc_method.h', diff --git a/include/grpcpp/impl/codegen/message_allocator.h b/include/grpcpp/impl/codegen/message_allocator.h new file mode 100644 index 00000000000..8f37d36eb8b --- /dev/null +++ b/include/grpcpp/impl/codegen/message_allocator.h @@ -0,0 +1,53 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPCPP_IMPL_CODEGEN_MESSAGE_ALLOCATOR_H_ +#define GRPCPP_IMPL_CODEGEN_MESSAGE_ALLOCATOR_H_ + +namespace grpc { + +// This is per rpc struct for the allocator. We can potentially put the grpc +// call arena in here in the future. +template +struct RpcAllocatorInfo { + RequestT* request = nullptr; + ResponseT* response = nullptr; + // per rpc allocator internal state. MessageAllocator can set it when + // AllocateMessages is called and use it later. + void* allocator_state = nullptr; +}; + +// Implementations need to be thread-safe +template +class MessageAllocator { + public: + virtual ~MessageAllocator() = default; + // Allocate both request and response + virtual void AllocateMessages( + RpcAllocatorInfo* info) = 0; + // Optional: deallocate request early, called by + // ServerCallbackRpcController::ReleaseRequest + virtual void DeallocateRequest(RpcAllocatorInfo* info) {} + // Deallocate response and request (if applicable) + virtual void DeallocateMessages( + RpcAllocatorInfo* info) = 0; +}; + +} // namespace grpc + +#endif // GRPCPP_IMPL_CODEGEN_MESSAGE_ALLOCATOR_H_ diff --git a/include/grpcpp/impl/codegen/method_handler_impl.h b/include/grpcpp/impl/codegen/method_handler_impl.h index 094286294c2..dee1cb56ad1 100644 --- a/include/grpcpp/impl/codegen/method_handler_impl.h +++ b/include/grpcpp/impl/codegen/method_handler_impl.h @@ -86,8 +86,8 @@ class RpcMethodHandler : public MethodHandler { param.call->cq()->Pluck(&ops); } - void* Deserialize(grpc_call* call, grpc_byte_buffer* req, - Status* status) final { + void* Deserialize(grpc_call* call, grpc_byte_buffer* req, Status* status, + void** handler_data) final { ByteBuffer buf; buf.set_buffer(req); auto* request = new (g_core_codegen_interface->grpc_call_arena_alloc( @@ -191,8 +191,8 @@ class ServerStreamingHandler : public MethodHandler { param.call->cq()->Pluck(&ops); } - void* Deserialize(grpc_call* call, grpc_byte_buffer* req, - Status* status) final { + void* Deserialize(grpc_call* call, grpc_byte_buffer* req, Status* status, + void** handler_data) final { ByteBuffer buf; buf.set_buffer(req); auto* request = new (g_core_codegen_interface->grpc_call_arena_alloc( @@ -327,8 +327,8 @@ class ErrorMethodHandler : public MethodHandler { param.call->cq()->Pluck(&ops); } - void* Deserialize(grpc_call* call, grpc_byte_buffer* req, - Status* status) final { + void* Deserialize(grpc_call* call, grpc_byte_buffer* req, Status* status, + void** handler_data) final { // We have to destroy any request payload if (req != nullptr) { g_core_codegen_interface->grpc_byte_buffer_destroy(req); diff --git a/include/grpcpp/impl/codegen/rpc_service_method.h b/include/grpcpp/impl/codegen/rpc_service_method.h index 56df61cdfae..21fb2ac2130 100644 --- a/include/grpcpp/impl/codegen/rpc_service_method.h +++ b/include/grpcpp/impl/codegen/rpc_service_method.h @@ -46,21 +46,25 @@ class MethodHandler { /// \param context : the ServerContext structure for this server call /// \param req : the request payload, if appropriate for this RPC /// \param req_status : the request status after any interceptors have run + /// \param handler_data: internal data for the handler. /// \param requester : used only by the callback API. It is a function /// called by the RPC Controller to request another RPC (and also /// to set up the state required to make that request possible) HandlerParameter(Call* c, ServerContext* context, void* req, - Status req_status, std::function requester) + Status req_status, void* handler_data, + std::function requester) : call(c), server_context(context), request(req), status(req_status), + internal_data(handler_data), call_requester(std::move(requester)) {} ~HandlerParameter() {} Call* call; ServerContext* server_context; void* request; Status status; + void* internal_data; std::function call_requester; }; virtual void RunHandler(const HandlerParameter& param) = 0; @@ -71,7 +75,7 @@ class MethodHandler { pointer after calling RunHandler. Ownership of the deserialized request is retained by the handler. Returns nullptr if deserialization failed. */ virtual void* Deserialize(grpc_call* call, grpc_byte_buffer* req, - Status* status) { + Status* status, void** handler_data) { GPR_CODEGEN_ASSERT(req == nullptr); return nullptr; } diff --git a/include/grpcpp/impl/codegen/server_callback.h b/include/grpcpp/impl/codegen/server_callback.h index ce27b156283..030e4aea92a 100644 --- a/include/grpcpp/impl/codegen/server_callback.h +++ b/include/grpcpp/impl/codegen/server_callback.h @@ -28,6 +28,7 @@ #include #include #include +#include #include #include #include @@ -135,6 +136,9 @@ class ServerCallbackRpcController { /// to be called before the callback completes. virtual void SetCancelCallback(std::function callback) = 0; virtual void ClearCancelCallback() = 0; + + virtual void FreeRequest() = 0; + virtual void* GetAllocatorState() = 0; }; // NOTE: The actual streaming object classes are provided @@ -445,19 +449,22 @@ class CallbackUnaryHandler : public MethodHandler { CallbackUnaryHandler( std::function - func) - : func_(func) {} + func, + MessageAllocator* allocator) + : func_(func), allocator_(allocator) {} + void RunHandler(const HandlerParameter& param) final { // Arena allocate a controller structure (that includes request/response) g_core_codegen_interface->grpc_call_ref(param.call->call()); + auto* allocator_info = + static_cast*>( + param.internal_data); auto* controller = new (g_core_codegen_interface->grpc_call_arena_alloc( param.call->call(), sizeof(ServerCallbackRpcControllerImpl))) - ServerCallbackRpcControllerImpl( - param.server_context, param.call, - static_cast(param.request), - std::move(param.call_requester)); + ServerCallbackRpcControllerImpl(param.server_context, param.call, + allocator_info, allocator_, + std::move(param.call_requester)); Status status = param.status; - if (status.ok()) { // Call the actual function handler and expect the user to call finish CatchingCallback(func_, param.server_context, controller->request(), @@ -468,18 +475,41 @@ class CallbackUnaryHandler : public MethodHandler { } } - void* Deserialize(grpc_call* call, grpc_byte_buffer* req, - Status* status) final { + void* Deserialize(grpc_call* call, grpc_byte_buffer* req, Status* status, + void** handler_data) final { ByteBuffer buf; buf.set_buffer(req); - auto* request = new (g_core_codegen_interface->grpc_call_arena_alloc( - call, sizeof(RequestType))) RequestType(); + RequestType* request = nullptr; + RpcAllocatorInfo* allocator_info = + new (g_core_codegen_interface->grpc_call_arena_alloc( + call, sizeof(*allocator_info))) + RpcAllocatorInfo(); + if (allocator_ != nullptr) { + allocator_->AllocateMessages(allocator_info); + } else { + allocator_info->request = + new (g_core_codegen_interface->grpc_call_arena_alloc( + call, sizeof(RequestType))) RequestType(); + allocator_info->response = + new (g_core_codegen_interface->grpc_call_arena_alloc( + call, sizeof(ResponseType))) ResponseType(); + } + *handler_data = allocator_info; + request = allocator_info->request; *status = SerializationTraits::Deserialize(&buf, request); buf.Release(); if (status->ok()) { return request; } - request->~RequestType(); + // Clean up on deserialization failure. + if (allocator_ != nullptr) { + allocator_->DeallocateMessages(allocator_info); + } else { + allocator_info->request->~RequestType(); + allocator_info->response->~ResponseType(); + allocator_info->request = nullptr; + allocator_info->response = nullptr; + } return nullptr; } @@ -487,6 +517,7 @@ class CallbackUnaryHandler : public MethodHandler { std::function func_; + MessageAllocator* allocator_; // The implementation class of ServerCallbackRpcController is a private member // of CallbackUnaryHandler since it is never exposed anywhere, and this allows @@ -507,8 +538,9 @@ class CallbackUnaryHandler : public MethodHandler { } // The response is dropped if the status is not OK. if (s.ok()) { - finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, - finish_ops_.SendMessagePtr(&resp_)); + finish_ops_.ServerSendStatus( + &ctx_->trailing_metadata_, + finish_ops_.SendMessagePtr(allocator_info_->response)); } else { finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s); } @@ -546,29 +578,52 @@ class CallbackUnaryHandler : public MethodHandler { void ClearCancelCallback() override { ctx_->ClearCancelCallback(); } + void FreeRequest() override { + if (allocator_ != nullptr) { + allocator_->DeallocateRequest(allocator_info_); + } + } + + void* GetAllocatorState() override { + return allocator_info_->allocator_state; + } + private: friend class CallbackUnaryHandler; - ServerCallbackRpcControllerImpl(ServerContext* ctx, Call* call, - const RequestType* req, - std::function call_requester) + ServerCallbackRpcControllerImpl( + ServerContext* ctx, Call* call, + RpcAllocatorInfo* allocator_info, + MessageAllocator* allocator, + std::function call_requester) : ctx_(ctx), call_(*call), - req_(req), + allocator_info_(allocator_info), + allocator_(allocator), call_requester_(std::move(call_requester)) { ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, nullptr); } - ~ServerCallbackRpcControllerImpl() { req_->~RequestType(); } + ~ServerCallbackRpcControllerImpl() {} - const RequestType* request() { return req_; } - ResponseType* response() { return &resp_; } + const RequestType* request() { return allocator_info_->request; } + ResponseType* response() { return allocator_info_->response; } void MaybeDone() { if (--callbacks_outstanding_ == 0) { grpc_call* call = call_.call(); auto call_requester = std::move(call_requester_); this->~ServerCallbackRpcControllerImpl(); // explicitly call destructor + if (allocator_ != nullptr) { + allocator_->DeallocateMessages(allocator_info_); + } else { + if (allocator_info_->request != nullptr) { + allocator_info_->request->~RequestType(); + } + if (allocator_info_->response != nullptr) { + allocator_info_->response->~ResponseType(); + } + } g_core_codegen_interface->grpc_call_unref(call); call_requester(); } @@ -583,8 +638,8 @@ class CallbackUnaryHandler : public MethodHandler { ServerContext* ctx_; Call call_; - const RequestType* req_; - ResponseType resp_; + RpcAllocatorInfo* allocator_info_; + MessageAllocator* allocator_; std::function call_requester_; std::atomic_int callbacks_outstanding_{ 2}; // reserve for Finish and CompletionOp @@ -771,8 +826,8 @@ class CallbackServerStreamingHandler : public MethodHandler { writer->MaybeDone(); } - void* Deserialize(grpc_call* call, grpc_byte_buffer* req, - Status* status) final { + void* Deserialize(grpc_call* call, grpc_byte_buffer* req, Status* status, + void** handler_data) final { ByteBuffer buf; buf.set_buffer(req); auto* request = new (g_core_codegen_interface->grpc_call_arena_alloc( diff --git a/include/grpcpp/support/message_allocator.h b/include/grpcpp/support/message_allocator.h new file mode 100644 index 00000000000..eb1dbb14764 --- /dev/null +++ b/include/grpcpp/support/message_allocator.h @@ -0,0 +1,24 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPCPP_SUPPORT_MESSAGE_ALLOCATOR_H_ +#define GRPCPP_SUPPORT_MESSAGE_ALLOCATOR_H_ + +#include + +#endif // GRPCPP_SUPPORT_MESSAGE_ALLOCATOR_H_ diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index 1bb9c509bb5..86f4e3c5df9 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -155,6 +155,8 @@ grpc::string GetHeaderIncludes(grpc_generator::File* file, params.grpc_search_path); printer->Print(vars, "\n"); printer->Print(vars, "namespace grpc {\n"); + printer->Print(vars, "template \n"); + printer->Print(vars, "class MessageAllocator;\n"); printer->Print(vars, "class CompletionQueue;\n"); printer->Print(vars, "class Channel;\n"); printer->Print(vars, "class ServerCompletionQueue;\n"); @@ -993,7 +995,23 @@ void PrintHeaderServerMethodCallback( "controller) {\n" " return this->$" "Method$(context, request, response, controller);\n" - " }));\n"); + " }, nullptr));\n}\n"); + printer->Print( + *vars, + "void SetMessageAllocatorFor_$Method$(\n" + " ::grpc::MessageAllocator<$RealRequest$, $RealResponse$>* " + "allocator) {\n" + " ::grpc::Service::experimental().MarkMethodCallback($Idx$,\n" + " new ::grpc::internal::CallbackUnaryHandler< " + "$RealRequest$, $RealResponse$>(\n" + " [this](::grpc::ServerContext* context,\n" + " const $RealRequest$* request,\n" + " $RealResponse$* response,\n" + " ::grpc::experimental::ServerCallbackRpcController* " + "controller) {\n" + " return this->$" + "Method$(context, request, response, controller);\n" + " }, allocator));\n"); } else if (ClientOnlyStreaming(method)) { printer->Print( *vars, diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index aa9fdac9bea..836435c86a9 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -281,7 +281,7 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { auto* handler = resources_ ? method_->handler() : server_->resource_exhausted_handler_.get(); request_ = handler->Deserialize(call_.call(), request_payload_, - &request_status_); + &request_status_, nullptr); request_payload_ = nullptr; interceptor_methods_.AddInterceptionHookPoint( @@ -305,7 +305,7 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { auto* handler = resources_ ? method_->handler() : server_->resource_exhausted_handler_.get(); handler->RunHandler(internal::MethodHandler::HandlerParameter( - &call_, &ctx_, request_, request_status_, nullptr)); + &call_, &ctx_, request_, request_status_, nullptr, nullptr)); request_ = nullptr; global_callbacks_->PostSynchronousRequest(&ctx_); @@ -512,7 +512,8 @@ class Server::CallbackRequest final : public Server::CallbackRequestBase { if (req_->has_request_payload_) { // Set interception point for RECV MESSAGE req_->request_ = req_->method_->handler()->Deserialize( - req_->call_, req_->request_payload_, &req_->request_status_); + req_->call_, req_->request_payload_, &req_->request_status_, + &req_->handler_data_); req_->request_payload_ = nullptr; req_->interceptor_methods_.AddInterceptionHookPoint( experimental::InterceptionHookPoints::POST_RECV_MESSAGE); @@ -532,7 +533,8 @@ class Server::CallbackRequest final : public Server::CallbackRequestBase { ? req_->method_->handler() : req_->server_->generic_handler_.get(); handler->RunHandler(internal::MethodHandler::HandlerParameter( - call_, &req_->ctx_, req_->request_, req_->request_status_, [this] { + call_, &req_->ctx_, req_->request_, req_->request_status_, + req_->handler_data_, [this] { // Recycle this request if there aren't too many outstanding. // Note that we don't have to worry about a case where there // are no requests waiting to match for this method since that @@ -577,6 +579,7 @@ class Server::CallbackRequest final : public Server::CallbackRequestBase { ctx_.Setup(gpr_inf_future(GPR_CLOCK_REALTIME)); request_payload_ = nullptr; request_ = nullptr; + handler_data_ = nullptr; request_status_ = Status(); } @@ -587,6 +590,7 @@ class Server::CallbackRequest final : public Server::CallbackRequestBase { const bool has_request_payload_; grpc_byte_buffer* request_payload_; void* request_; + void* handler_data_ = nullptr; Status request_status_; grpc_call_details* call_details_ = nullptr; grpc_call* call_; diff --git a/src/proto/grpc/testing/echo_messages.proto b/src/proto/grpc/testing/echo_messages.proto index 2f935304ab2..066a0850d6a 100644 --- a/src/proto/grpc/testing/echo_messages.proto +++ b/src/proto/grpc/testing/echo_messages.proto @@ -17,6 +17,8 @@ syntax = "proto3"; package grpc.testing; +option cc_enable_arenas = true; + // Message to be echoed back serialized in trailer. message DebugInfo { repeated string stack_entries = 1; diff --git a/test/cpp/end2end/BUILD b/test/cpp/end2end/BUILD index 707a628148e..e89667a9714 100644 --- a/test/cpp/end2end/BUILD +++ b/test/cpp/end2end/BUILD @@ -675,3 +675,23 @@ grpc_cc_test( "//test/cpp/util:test_util", ], ) + +grpc_cc_test( + name = "message_allocator_end2end_test", + srcs = ["message_allocator_end2end_test.cc"], + external_deps = [ + "gtest", + ], + deps = [ + ":test_service_impl", + "//:grpc", + "//:gpr", + "//:grpc++", + "//src/proto/grpc/testing:echo_messages_proto", + "//src/proto/grpc/testing:echo_proto", + "//src/proto/grpc/testing:simple_messages_proto", + "//test/core/util:grpc_test_util", + "//test/cpp/util:test_util", + ], +) + diff --git a/test/cpp/end2end/message_allocator_end2end_test.cc b/test/cpp/end2end/message_allocator_end2end_test.cc new file mode 100644 index 00000000000..995318ce8d9 --- /dev/null +++ b/test/cpp/end2end/message_allocator_end2end_test.cc @@ -0,0 +1,395 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include +#include +#include +#include +#include +#include + +#include + +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "src/core/lib/iomgr/iomgr.h" +#include "src/proto/grpc/testing/echo.grpc.pb.h" +#include "test/core/util/port.h" +#include "test/core/util/test_config.h" +#include "test/cpp/util/test_credentials_provider.h" + +// MAYBE_SKIP_TEST is a macro to determine if this particular test configuration +// should be skipped based on a decision made at SetUp time. In particular, any +// callback tests can only be run if the iomgr can run in the background or if +// the transport is in-process. +#define MAYBE_SKIP_TEST \ + do { \ + if (do_not_test_) { \ + return; \ + } \ + } while (0) + +namespace grpc { +namespace testing { +namespace { + +class CallbackTestServiceImpl + : public EchoTestService::ExperimentalCallbackService { + public: + explicit CallbackTestServiceImpl() {} + + void SetFreeRequest() { free_request_ = true; } + + void SetAllocatorMutator( + std::function + mutator) { + allocator_mutator_ = mutator; + } + + void Echo(ServerContext* context, const EchoRequest* request, + EchoResponse* response, + experimental::ServerCallbackRpcController* controller) override { + response->set_message(request->message()); + if (free_request_) { + controller->FreeRequest(); + } else if (allocator_mutator_) { + allocator_mutator_(controller->GetAllocatorState(), request, response); + } + controller->Finish(Status::OK); + } + + private: + bool free_request_ = false; + std::function + allocator_mutator_; +}; + +enum class Protocol { INPROC, TCP }; + +class TestScenario { + public: + TestScenario(Protocol protocol, const grpc::string& creds_type) + : protocol(protocol), credentials_type(creds_type) {} + void Log() const; + Protocol protocol; + const grpc::string credentials_type; +}; + +static std::ostream& operator<<(std::ostream& out, + const TestScenario& scenario) { + return out << "TestScenario{protocol=" + << (scenario.protocol == Protocol::INPROC ? "INPROC" : "TCP") + << "," << scenario.credentials_type << "}"; +} + +void TestScenario::Log() const { + std::ostringstream out; + out << *this; + gpr_log(GPR_INFO, "%s", out.str().c_str()); +} + +class MessageAllocatorEnd2endTestBase + : public ::testing::TestWithParam { + protected: + MessageAllocatorEnd2endTestBase() { + GetParam().Log(); + if (GetParam().protocol == Protocol::TCP) { + if (!grpc_iomgr_run_in_background()) { + do_not_test_ = true; + return; + } + } + } + + ~MessageAllocatorEnd2endTestBase() = default; + + void CreateServer(MessageAllocator* allocator) { + ServerBuilder builder; + + auto server_creds = GetCredentialsProvider()->GetServerCredentials( + GetParam().credentials_type); + if (GetParam().protocol == Protocol::TCP) { + picked_port_ = grpc_pick_unused_port_or_die(); + server_address_ << "localhost:" << picked_port_; + builder.AddListeningPort(server_address_.str(), server_creds); + } + callback_service_.SetMessageAllocatorFor_Echo(allocator); + builder.RegisterService(&callback_service_); + + server_ = builder.BuildAndStart(); + is_server_started_ = true; + } + + void ResetStub() { + ChannelArguments args; + auto channel_creds = GetCredentialsProvider()->GetChannelCredentials( + GetParam().credentials_type, &args); + switch (GetParam().protocol) { + case Protocol::TCP: + channel_ = + CreateCustomChannel(server_address_.str(), channel_creds, args); + break; + case Protocol::INPROC: + channel_ = server_->InProcessChannel(args); + break; + default: + assert(false); + } + stub_ = EchoTestService::NewStub(channel_); + } + + void TearDown() override { + if (is_server_started_) { + server_->Shutdown(); + } + if (picked_port_ > 0) { + grpc_recycle_unused_port(picked_port_); + } + } + + void SendRpcs(int num_rpcs) { + grpc::string test_string(""); + for (int i = 0; i < num_rpcs; i++) { + EchoRequest request; + EchoResponse response; + ClientContext cli_ctx; + + test_string += "Hello world. "; + request.set_message(test_string); + grpc::string val; + cli_ctx.set_compression_algorithm(GRPC_COMPRESS_GZIP); + + std::mutex mu; + std::condition_variable cv; + bool done = false; + stub_->experimental_async()->Echo( + &cli_ctx, &request, &response, + [&request, &response, &done, &mu, &cv, val](Status s) { + GPR_ASSERT(s.ok()); + + EXPECT_EQ(request.message(), response.message()); + std::lock_guard l(mu); + done = true; + cv.notify_one(); + }); + std::unique_lock l(mu); + while (!done) { + cv.wait(l); + } + } + } + + bool do_not_test_{false}; + bool is_server_started_{false}; + int picked_port_{0}; + std::shared_ptr channel_; + std::unique_ptr stub_; + CallbackTestServiceImpl callback_service_; + std::unique_ptr server_; + std::ostringstream server_address_; +}; + +class NullAllocatorTest : public MessageAllocatorEnd2endTestBase {}; + +TEST_P(NullAllocatorTest, SimpleRpc) { + MAYBE_SKIP_TEST; + CreateServer(nullptr); + ResetStub(); + SendRpcs(1); +} + +class SimpleAllocatorTest : public MessageAllocatorEnd2endTestBase { + public: + class SimpleAllocator : public MessageAllocator { + public: + void AllocateMessages(RpcAllocatorInfo* info) { + allocation_count++; + info->request = new EchoRequest; + info->response = new EchoResponse; + info->allocator_state = info; + } + void DeallocateRequest(RpcAllocatorInfo* info) { + request_deallocation_count++; + delete info->request; + info->request = nullptr; + } + void DeallocateMessages(RpcAllocatorInfo* info) { + messages_deallocation_count++; + delete info->request; + delete info->response; + } + + int allocation_count = 0; + int request_deallocation_count = 0; + int messages_deallocation_count = 0; + }; +}; + +TEST_P(SimpleAllocatorTest, SimpleRpc) { + MAYBE_SKIP_TEST; + const int kRpcCount = 10; + std::unique_ptr allocator(new SimpleAllocator); + CreateServer(allocator.get()); + ResetStub(); + SendRpcs(kRpcCount); + EXPECT_EQ(kRpcCount, allocator->allocation_count); + EXPECT_EQ(kRpcCount, allocator->messages_deallocation_count); + EXPECT_EQ(0, allocator->request_deallocation_count); +} + +TEST_P(SimpleAllocatorTest, RpcWithEarlyFreeRequest) { + MAYBE_SKIP_TEST; + const int kRpcCount = 10; + std::unique_ptr allocator(new SimpleAllocator); + callback_service_.SetFreeRequest(); + CreateServer(allocator.get()); + ResetStub(); + SendRpcs(kRpcCount); + EXPECT_EQ(kRpcCount, allocator->allocation_count); + EXPECT_EQ(kRpcCount, allocator->messages_deallocation_count); + EXPECT_EQ(kRpcCount, allocator->request_deallocation_count); +} + +TEST_P(SimpleAllocatorTest, RpcWithReleaseRequest) { + MAYBE_SKIP_TEST; + const int kRpcCount = 10; + std::unique_ptr allocator(new SimpleAllocator); + std::vector released_requests; + auto mutator = [&released_requests](void* allocator_state, + const EchoRequest* req, + EchoResponse* resp) { + auto* info = static_cast*>( + allocator_state); + EXPECT_EQ(req, info->request); + EXPECT_EQ(resp, info->response); + EXPECT_EQ(allocator_state, info->allocator_state); + released_requests.push_back(info->request); + info->request = nullptr; + }; + callback_service_.SetAllocatorMutator(mutator); + CreateServer(allocator.get()); + ResetStub(); + SendRpcs(kRpcCount); + EXPECT_EQ(kRpcCount, allocator->allocation_count); + EXPECT_EQ(kRpcCount, allocator->messages_deallocation_count); + EXPECT_EQ(0, allocator->request_deallocation_count); + EXPECT_EQ(static_cast(kRpcCount), released_requests.size()); + for (auto* req : released_requests) { + delete req; + } +} + +class ArenaAllocatorTest : public MessageAllocatorEnd2endTestBase { + public: + class ArenaAllocator : public MessageAllocator { + public: + void AllocateMessages(RpcAllocatorInfo* info) { + allocation_count++; + auto* arena = new google::protobuf::Arena; + info->allocator_state = arena; + info->request = + google::protobuf::Arena::CreateMessage(arena); + info->response = + google::protobuf::Arena::CreateMessage(arena); + } + void DeallocateRequest(RpcAllocatorInfo* info) { + GPR_ASSERT(0); + } + void DeallocateMessages(RpcAllocatorInfo* info) { + deallocation_count++; + auto* arena = + static_cast(info->allocator_state); + delete arena; + } + + int allocation_count = 0; + int deallocation_count = 0; + }; +}; + +TEST_P(ArenaAllocatorTest, SimpleRpc) { + MAYBE_SKIP_TEST; + const int kRpcCount = 10; + std::unique_ptr allocator(new ArenaAllocator); + CreateServer(allocator.get()); + ResetStub(); + SendRpcs(kRpcCount); + EXPECT_EQ(kRpcCount, allocator->allocation_count); + EXPECT_EQ(kRpcCount, allocator->deallocation_count); +} + +std::vector CreateTestScenarios(bool test_insecure) { + std::vector scenarios; + std::vector credentials_types{ + GetCredentialsProvider()->GetSecureCredentialsTypeList()}; + auto insec_ok = [] { + // Only allow insecure credentials type when it is registered with the + // provider. User may create providers that do not have insecure. + return GetCredentialsProvider()->GetChannelCredentials( + kInsecureCredentialsType, nullptr) != nullptr; + }; + if (test_insecure && insec_ok()) { + credentials_types.push_back(kInsecureCredentialsType); + } + GPR_ASSERT(!credentials_types.empty()); + + Protocol parr[]{Protocol::INPROC, Protocol::TCP}; + for (Protocol p : parr) { + for (const auto& cred : credentials_types) { + // TODO(vjpai): Test inproc with secure credentials when feasible + if (p == Protocol::INPROC && + (cred != kInsecureCredentialsType || !insec_ok())) { + continue; + } + scenarios.emplace_back(p, cred); + } + } + return scenarios; +} + +INSTANTIATE_TEST_CASE_P(NullAllocatorTest, NullAllocatorTest, + ::testing::ValuesIn(CreateTestScenarios(true))); +INSTANTIATE_TEST_CASE_P(SimpleAllocatorTest, SimpleAllocatorTest, + ::testing::ValuesIn(CreateTestScenarios(true))); +INSTANTIATE_TEST_CASE_P(ArenaAllocatorTest, ArenaAllocatorTest, + ::testing::ValuesIn(CreateTestScenarios(true))); + +} // namespace +} // namespace testing +} // namespace grpc + +int main(int argc, char** argv) { + grpc::testing::TestEnvironment env(argc, argv); + // The grpc_init is to cover the MAYBE_SKIP_TEST. + grpc_init(); + ::testing::InitGoogleTest(&argc, argv); + int ret = RUN_ALL_TESTS(); + grpc_shutdown(); + return ret; +} diff --git a/tools/doxygen/Doxyfile.c++ b/tools/doxygen/Doxyfile.c++ index 7274f9588b3..1b8faf655ae 100644 --- a/tools/doxygen/Doxyfile.c++ +++ b/tools/doxygen/Doxyfile.c++ @@ -968,6 +968,7 @@ include/grpcpp/impl/codegen/grpc_library.h \ include/grpcpp/impl/codegen/intercepted_channel.h \ include/grpcpp/impl/codegen/interceptor.h \ include/grpcpp/impl/codegen/interceptor_common.h \ +include/grpcpp/impl/codegen/message_allocator.h \ include/grpcpp/impl/codegen/metadata_map.h \ include/grpcpp/impl/codegen/method_handler_impl.h \ include/grpcpp/impl/codegen/proto_buffer_reader.h \ @@ -1022,6 +1023,7 @@ include/grpcpp/support/client_callback.h \ include/grpcpp/support/client_interceptor.h \ include/grpcpp/support/config.h \ include/grpcpp/support/interceptor.h \ +include/grpcpp/support/message_allocator.h \ include/grpcpp/support/proto_buffer_reader.h \ include/grpcpp/support/proto_buffer_writer.h \ include/grpcpp/support/server_callback.h \ diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 80760da0c58..47f583bf7f7 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -970,6 +970,7 @@ include/grpcpp/impl/codegen/grpc_library.h \ include/grpcpp/impl/codegen/intercepted_channel.h \ include/grpcpp/impl/codegen/interceptor.h \ include/grpcpp/impl/codegen/interceptor_common.h \ +include/grpcpp/impl/codegen/message_allocator.h \ include/grpcpp/impl/codegen/metadata_map.h \ include/grpcpp/impl/codegen/method_handler_impl.h \ include/grpcpp/impl/codegen/proto_buffer_reader.h \ @@ -1024,6 +1025,7 @@ include/grpcpp/support/client_callback.h \ include/grpcpp/support/client_interceptor.h \ include/grpcpp/support/config.h \ include/grpcpp/support/interceptor.h \ +include/grpcpp/support/message_allocator.h \ include/grpcpp/support/proto_buffer_reader.h \ include/grpcpp/support/proto_buffer_writer.h \ include/grpcpp/support/server_callback.h \ diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index de84eb74b29..61c828ec61a 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -4149,6 +4149,24 @@ "third_party": false, "type": "target" }, + { + "deps": [ + "gpr", + "grpc", + "grpc++", + "grpc++_test_util", + "grpc_test_util" + ], + "headers": [], + "is_filegroup": false, + "language": "c++", + "name": "message_allocator_end2end_test", + "src": [ + "test/cpp/end2end/message_allocator_end2end_test.cc" + ], + "third_party": false, + "type": "target" + }, { "deps": [ "gpr", @@ -9919,6 +9937,7 @@ "include/grpcpp/impl/codegen/intercepted_channel.h", "include/grpcpp/impl/codegen/interceptor.h", "include/grpcpp/impl/codegen/interceptor_common.h", + "include/grpcpp/impl/codegen/message_allocator.h", "include/grpcpp/impl/codegen/metadata_map.h", "include/grpcpp/impl/codegen/method_handler_impl.h", "include/grpcpp/impl/codegen/rpc_method.h", @@ -9995,6 +10014,7 @@ "include/grpcpp/impl/codegen/intercepted_channel.h", "include/grpcpp/impl/codegen/interceptor.h", "include/grpcpp/impl/codegen/interceptor_common.h", + "include/grpcpp/impl/codegen/message_allocator.h", "include/grpcpp/impl/codegen/metadata_map.h", "include/grpcpp/impl/codegen/method_handler_impl.h", "include/grpcpp/impl/codegen/rpc_method.h", @@ -10163,6 +10183,7 @@ "include/grpcpp/support/client_interceptor.h", "include/grpcpp/support/config.h", "include/grpcpp/support/interceptor.h", + "include/grpcpp/support/message_allocator.h", "include/grpcpp/support/proto_buffer_reader.h", "include/grpcpp/support/proto_buffer_writer.h", "include/grpcpp/support/server_callback.h", @@ -10283,6 +10304,7 @@ "include/grpcpp/support/client_interceptor.h", "include/grpcpp/support/config.h", "include/grpcpp/support/interceptor.h", + "include/grpcpp/support/message_allocator.h", "include/grpcpp/support/proto_buffer_reader.h", "include/grpcpp/support/proto_buffer_writer.h", "include/grpcpp/support/server_callback.h", diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index cfeff6fa838..881695f3bdd 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -4857,6 +4857,30 @@ ], "uses_polling": false }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 0.5, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "message_allocator_end2end_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "uses_polling": true + }, { "args": [], "benchmark": false,