diff --git a/BUILD b/BUILD index 56d332e0807..1735961eb61 100644 --- a/BUILD +++ b/BUILD @@ -268,6 +268,7 @@ GRPCXX_PUBLIC_HDRS = [ "include/grpcpp/support/client_interceptor.h", "include/grpcpp/support/config.h", "include/grpcpp/support/interceptor.h", + "include/grpcpp/support/message_allocator.h", "include/grpcpp/support/proto_buffer_reader.h", "include/grpcpp/support/proto_buffer_writer.h", "include/grpcpp/support/server_callback.h", @@ -2138,6 +2139,7 @@ grpc_cc_library( "include/grpcpp/impl/codegen/intercepted_channel.h", "include/grpcpp/impl/codegen/interceptor.h", "include/grpcpp/impl/codegen/interceptor_common.h", + "include/grpcpp/impl/codegen/message_allocator.h", "include/grpcpp/impl/codegen/metadata_map.h", "include/grpcpp/impl/codegen/method_handler_impl.h", "include/grpcpp/impl/codegen/rpc_method.h", diff --git a/BUILD.gn b/BUILD.gn index 10b514f8f2e..10b0f1a7fdd 100644 --- a/BUILD.gn +++ b/BUILD.gn @@ -1047,6 +1047,7 @@ config("grpc_config") { "include/grpcpp/impl/codegen/intercepted_channel.h", "include/grpcpp/impl/codegen/interceptor.h", "include/grpcpp/impl/codegen/interceptor_common.h", + "include/grpcpp/impl/codegen/message_allocator.h", "include/grpcpp/impl/codegen/metadata_map.h", "include/grpcpp/impl/codegen/method_handler_impl.h", "include/grpcpp/impl/codegen/proto_buffer_reader.h", @@ -1102,6 +1103,7 @@ config("grpc_config") { "include/grpcpp/support/client_interceptor.h", "include/grpcpp/support/config.h", "include/grpcpp/support/interceptor.h", + "include/grpcpp/support/message_allocator.h", "include/grpcpp/support/proto_buffer_reader.h", "include/grpcpp/support/proto_buffer_writer.h", "include/grpcpp/support/server_callback.h", diff --git a/CMakeLists.txt b/CMakeLists.txt index ee8712f1d38..0edd7fd54ee 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -661,6 +661,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) add_dependencies(buildtests_cxx json_run_localhost) endif() add_dependencies(buildtests_cxx memory_test) +add_dependencies(buildtests_cxx message_allocator_end2end_test) add_dependencies(buildtests_cxx metrics_client) add_dependencies(buildtests_cxx mock_test) add_dependencies(buildtests_cxx nonblocking_test) @@ -3054,6 +3055,7 @@ foreach(_hdr include/grpcpp/support/client_interceptor.h include/grpcpp/support/config.h include/grpcpp/support/interceptor.h + include/grpcpp/support/message_allocator.h include/grpcpp/support/proto_buffer_reader.h include/grpcpp/support/proto_buffer_writer.h include/grpcpp/support/server_callback.h @@ -3169,6 +3171,7 @@ foreach(_hdr include/grpcpp/impl/codegen/intercepted_channel.h include/grpcpp/impl/codegen/interceptor.h include/grpcpp/impl/codegen/interceptor_common.h + include/grpcpp/impl/codegen/message_allocator.h include/grpcpp/impl/codegen/metadata_map.h include/grpcpp/impl/codegen/method_handler_impl.h include/grpcpp/impl/codegen/rpc_method.h @@ -3658,6 +3661,7 @@ foreach(_hdr include/grpcpp/support/client_interceptor.h include/grpcpp/support/config.h include/grpcpp/support/interceptor.h + include/grpcpp/support/message_allocator.h include/grpcpp/support/proto_buffer_reader.h include/grpcpp/support/proto_buffer_writer.h include/grpcpp/support/server_callback.h @@ -3773,6 +3777,7 @@ foreach(_hdr include/grpcpp/impl/codegen/intercepted_channel.h include/grpcpp/impl/codegen/interceptor.h include/grpcpp/impl/codegen/interceptor_common.h + include/grpcpp/impl/codegen/message_allocator.h include/grpcpp/impl/codegen/metadata_map.h include/grpcpp/impl/codegen/method_handler_impl.h include/grpcpp/impl/codegen/rpc_method.h @@ -4206,6 +4211,7 @@ foreach(_hdr include/grpcpp/impl/codegen/intercepted_channel.h include/grpcpp/impl/codegen/interceptor.h include/grpcpp/impl/codegen/interceptor_common.h + include/grpcpp/impl/codegen/message_allocator.h include/grpcpp/impl/codegen/metadata_map.h include/grpcpp/impl/codegen/method_handler_impl.h include/grpcpp/impl/codegen/rpc_method.h @@ -4403,6 +4409,7 @@ foreach(_hdr include/grpcpp/impl/codegen/intercepted_channel.h include/grpcpp/impl/codegen/interceptor.h include/grpcpp/impl/codegen/interceptor_common.h + include/grpcpp/impl/codegen/message_allocator.h include/grpcpp/impl/codegen/metadata_map.h include/grpcpp/impl/codegen/method_handler_impl.h include/grpcpp/impl/codegen/rpc_method.h @@ -4637,6 +4644,7 @@ foreach(_hdr include/grpcpp/support/client_interceptor.h include/grpcpp/support/config.h include/grpcpp/support/interceptor.h + include/grpcpp/support/message_allocator.h include/grpcpp/support/proto_buffer_reader.h include/grpcpp/support/proto_buffer_writer.h include/grpcpp/support/server_callback.h @@ -4752,6 +4760,7 @@ foreach(_hdr include/grpcpp/impl/codegen/intercepted_channel.h include/grpcpp/impl/codegen/interceptor.h include/grpcpp/impl/codegen/interceptor_common.h + include/grpcpp/impl/codegen/message_allocator.h include/grpcpp/impl/codegen/metadata_map.h include/grpcpp/impl/codegen/method_handler_impl.h include/grpcpp/impl/codegen/rpc_method.h @@ -14501,6 +14510,46 @@ target_link_libraries(memory_test ) +endif (gRPC_BUILD_TESTS) +if (gRPC_BUILD_TESTS) + +add_executable(message_allocator_end2end_test + test/cpp/end2end/message_allocator_end2end_test.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc +) + + +target_include_directories(message_allocator_end2end_test + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR} + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include + PRIVATE ${_gRPC_SSL_INCLUDE_DIR} + PRIVATE ${_gRPC_PROTOBUF_INCLUDE_DIR} + PRIVATE ${_gRPC_ZLIB_INCLUDE_DIR} + PRIVATE ${_gRPC_BENCHMARK_INCLUDE_DIR} + PRIVATE ${_gRPC_CARES_INCLUDE_DIR} + PRIVATE ${_gRPC_GFLAGS_INCLUDE_DIR} + PRIVATE ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + PRIVATE ${_gRPC_NANOPB_INCLUDE_DIR} + PRIVATE third_party/googletest/googletest/include + PRIVATE third_party/googletest/googletest + PRIVATE third_party/googletest/googlemock/include + PRIVATE third_party/googletest/googlemock + PRIVATE ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(message_allocator_end2end_test + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + grpc++_test_util + grpc_test_util + grpc++ + grpc + gpr + ${_gRPC_GFLAGS_LIBRARIES} +) + + endif (gRPC_BUILD_TESTS) if (gRPC_BUILD_TESTS) diff --git a/Makefile b/Makefile index c7fbca51225..57530500b15 100644 --- a/Makefile +++ b/Makefile @@ -1233,6 +1233,7 @@ interop_server: $(BINDIR)/$(CONFIG)/interop_server interop_test: $(BINDIR)/$(CONFIG)/interop_test json_run_localhost: $(BINDIR)/$(CONFIG)/json_run_localhost memory_test: $(BINDIR)/$(CONFIG)/memory_test +message_allocator_end2end_test: $(BINDIR)/$(CONFIG)/message_allocator_end2end_test metrics_client: $(BINDIR)/$(CONFIG)/metrics_client mock_test: $(BINDIR)/$(CONFIG)/mock_test nonblocking_test: $(BINDIR)/$(CONFIG)/nonblocking_test @@ -1701,6 +1702,7 @@ buildtests_cxx: privatelibs_cxx \ $(BINDIR)/$(CONFIG)/interop_test \ $(BINDIR)/$(CONFIG)/json_run_localhost \ $(BINDIR)/$(CONFIG)/memory_test \ + $(BINDIR)/$(CONFIG)/message_allocator_end2end_test \ $(BINDIR)/$(CONFIG)/metrics_client \ $(BINDIR)/$(CONFIG)/mock_test \ $(BINDIR)/$(CONFIG)/nonblocking_test \ @@ -1844,6 +1846,7 @@ buildtests_cxx: privatelibs_cxx \ $(BINDIR)/$(CONFIG)/interop_test \ $(BINDIR)/$(CONFIG)/json_run_localhost \ $(BINDIR)/$(CONFIG)/memory_test \ + $(BINDIR)/$(CONFIG)/message_allocator_end2end_test \ $(BINDIR)/$(CONFIG)/metrics_client \ $(BINDIR)/$(CONFIG)/mock_test \ $(BINDIR)/$(CONFIG)/nonblocking_test \ @@ -2343,6 +2346,8 @@ test_cxx: buildtests_cxx $(Q) $(BINDIR)/$(CONFIG)/interop_test || ( echo test interop_test failed ; exit 1 ) $(E) "[RUN] Testing memory_test" $(Q) $(BINDIR)/$(CONFIG)/memory_test || ( echo test memory_test failed ; exit 1 ) + $(E) "[RUN] Testing message_allocator_end2end_test" + $(Q) $(BINDIR)/$(CONFIG)/message_allocator_end2end_test || ( echo test message_allocator_end2end_test failed ; exit 1 ) $(E) "[RUN] Testing mock_test" $(Q) $(BINDIR)/$(CONFIG)/mock_test || ( echo test mock_test failed ; exit 1 ) $(E) "[RUN] Testing nonblocking_test" @@ -5390,6 +5395,7 @@ PUBLIC_HEADERS_CXX += \ include/grpcpp/support/client_interceptor.h \ include/grpcpp/support/config.h \ include/grpcpp/support/interceptor.h \ + include/grpcpp/support/message_allocator.h \ include/grpcpp/support/proto_buffer_reader.h \ include/grpcpp/support/proto_buffer_writer.h \ include/grpcpp/support/server_callback.h \ @@ -5505,6 +5511,7 @@ PUBLIC_HEADERS_CXX += \ include/grpcpp/impl/codegen/intercepted_channel.h \ include/grpcpp/impl/codegen/interceptor.h \ include/grpcpp/impl/codegen/interceptor_common.h \ + include/grpcpp/impl/codegen/message_allocator.h \ include/grpcpp/impl/codegen/metadata_map.h \ include/grpcpp/impl/codegen/method_handler_impl.h \ include/grpcpp/impl/codegen/rpc_method.h \ @@ -6002,6 +6009,7 @@ PUBLIC_HEADERS_CXX += \ include/grpcpp/support/client_interceptor.h \ include/grpcpp/support/config.h \ include/grpcpp/support/interceptor.h \ + include/grpcpp/support/message_allocator.h \ include/grpcpp/support/proto_buffer_reader.h \ include/grpcpp/support/proto_buffer_writer.h \ include/grpcpp/support/server_callback.h \ @@ -6117,6 +6125,7 @@ PUBLIC_HEADERS_CXX += \ include/grpcpp/impl/codegen/intercepted_channel.h \ include/grpcpp/impl/codegen/interceptor.h \ include/grpcpp/impl/codegen/interceptor_common.h \ + include/grpcpp/impl/codegen/message_allocator.h \ include/grpcpp/impl/codegen/metadata_map.h \ include/grpcpp/impl/codegen/method_handler_impl.h \ include/grpcpp/impl/codegen/rpc_method.h \ @@ -6522,6 +6531,7 @@ PUBLIC_HEADERS_CXX += \ include/grpcpp/impl/codegen/intercepted_channel.h \ include/grpcpp/impl/codegen/interceptor.h \ include/grpcpp/impl/codegen/interceptor_common.h \ + include/grpcpp/impl/codegen/message_allocator.h \ include/grpcpp/impl/codegen/metadata_map.h \ include/grpcpp/impl/codegen/method_handler_impl.h \ include/grpcpp/impl/codegen/rpc_method.h \ @@ -6690,6 +6700,7 @@ PUBLIC_HEADERS_CXX += \ include/grpcpp/impl/codegen/intercepted_channel.h \ include/grpcpp/impl/codegen/interceptor.h \ include/grpcpp/impl/codegen/interceptor_common.h \ + include/grpcpp/impl/codegen/message_allocator.h \ include/grpcpp/impl/codegen/metadata_map.h \ include/grpcpp/impl/codegen/method_handler_impl.h \ include/grpcpp/impl/codegen/rpc_method.h \ @@ -6930,6 +6941,7 @@ PUBLIC_HEADERS_CXX += \ include/grpcpp/support/client_interceptor.h \ include/grpcpp/support/config.h \ include/grpcpp/support/interceptor.h \ + include/grpcpp/support/message_allocator.h \ include/grpcpp/support/proto_buffer_reader.h \ include/grpcpp/support/proto_buffer_writer.h \ include/grpcpp/support/server_callback.h \ @@ -7045,6 +7057,7 @@ PUBLIC_HEADERS_CXX += \ include/grpcpp/impl/codegen/intercepted_channel.h \ include/grpcpp/impl/codegen/interceptor.h \ include/grpcpp/impl/codegen/interceptor_common.h \ + include/grpcpp/impl/codegen/message_allocator.h \ include/grpcpp/impl/codegen/metadata_map.h \ include/grpcpp/impl/codegen/method_handler_impl.h \ include/grpcpp/impl/codegen/rpc_method.h \ @@ -17407,6 +17420,49 @@ endif endif +MESSAGE_ALLOCATOR_END2END_TEST_SRC = \ + test/cpp/end2end/message_allocator_end2end_test.cc \ + +MESSAGE_ALLOCATOR_END2END_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(MESSAGE_ALLOCATOR_END2END_TEST_SRC)))) +ifeq ($(NO_SECURE),true) + +# You can't build secure targets if you don't have OpenSSL. + +$(BINDIR)/$(CONFIG)/message_allocator_end2end_test: openssl_dep_error + +else + + + + +ifeq ($(NO_PROTOBUF),true) + +# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.5.0+. + +$(BINDIR)/$(CONFIG)/message_allocator_end2end_test: protobuf_dep_error + +else + +$(BINDIR)/$(CONFIG)/message_allocator_end2end_test: $(PROTOBUF_DEP) $(MESSAGE_ALLOCATOR_END2END_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a + $(E) "[LD] Linking $@" + $(Q) mkdir -p `dirname $@` + $(Q) $(LDXX) $(LDFLAGS) $(MESSAGE_ALLOCATOR_END2END_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/message_allocator_end2end_test + +endif + +endif + +$(OBJDIR)/$(CONFIG)/test/cpp/end2end/message_allocator_end2end_test.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a + +deps_message_allocator_end2end_test: $(MESSAGE_ALLOCATOR_END2END_TEST_OBJS:.o=.dep) + +ifneq ($(NO_SECURE),true) +ifneq ($(NO_DEPS),true) +-include $(MESSAGE_ALLOCATOR_END2END_TEST_OBJS:.o=.dep) +endif +endif + + METRICS_CLIENT_SRC = \ $(GENDIR)/src/proto/grpc/testing/metrics.pb.cc $(GENDIR)/src/proto/grpc/testing/metrics.grpc.pb.cc \ test/cpp/interop/metrics_client.cc \ diff --git a/build.yaml b/build.yaml index 4593be986bf..80519946295 100644 --- a/build.yaml +++ b/build.yaml @@ -1258,6 +1258,7 @@ filegroups: - include/grpcpp/impl/codegen/intercepted_channel.h - include/grpcpp/impl/codegen/interceptor.h - include/grpcpp/impl/codegen/interceptor_common.h + - include/grpcpp/impl/codegen/message_allocator.h - include/grpcpp/impl/codegen/metadata_map.h - include/grpcpp/impl/codegen/method_handler_impl.h - include/grpcpp/impl/codegen/rpc_method.h @@ -1396,6 +1397,7 @@ filegroups: - include/grpcpp/support/client_interceptor.h - include/grpcpp/support/config.h - include/grpcpp/support/interceptor.h + - include/grpcpp/support/message_allocator.h - include/grpcpp/support/proto_buffer_reader.h - include/grpcpp/support/proto_buffer_writer.h - include/grpcpp/support/server_callback.h @@ -5069,6 +5071,19 @@ targets: uses: - grpc++_test uses_polling: false +- name: message_allocator_end2end_test + gtest: true + cpu_cost: 0.5 + build: test + language: c++ + src: + - test/cpp/end2end/message_allocator_end2end_test.cc + deps: + - grpc++_test_util + - grpc_test_util + - grpc++ + - grpc + - gpr - name: metrics_client build: test run: false diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index 74f758e6487..de1bb0ae57f 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -132,6 +132,7 @@ Pod::Spec.new do |s| 'include/grpcpp/support/client_interceptor.h', 'include/grpcpp/support/config.h', 'include/grpcpp/support/interceptor.h', + 'include/grpcpp/support/message_allocator.h', 'include/grpcpp/support/proto_buffer_reader.h', 'include/grpcpp/support/proto_buffer_writer.h', 'include/grpcpp/support/server_callback.h', @@ -166,6 +167,7 @@ Pod::Spec.new do |s| 'include/grpcpp/impl/codegen/intercepted_channel.h', 'include/grpcpp/impl/codegen/interceptor.h', 'include/grpcpp/impl/codegen/interceptor_common.h', + 'include/grpcpp/impl/codegen/message_allocator.h', 'include/grpcpp/impl/codegen/metadata_map.h', 'include/grpcpp/impl/codegen/method_handler_impl.h', 'include/grpcpp/impl/codegen/rpc_method.h', diff --git a/include/grpcpp/impl/codegen/message_allocator.h b/include/grpcpp/impl/codegen/message_allocator.h new file mode 100644 index 00000000000..107bec62f1d --- /dev/null +++ b/include/grpcpp/impl/codegen/message_allocator.h @@ -0,0 +1,55 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPCPP_IMPL_CODEGEN_MESSAGE_ALLOCATOR_H +#define GRPCPP_IMPL_CODEGEN_MESSAGE_ALLOCATOR_H + +namespace grpc { +namespace experimental { + +// This is per rpc struct for the allocator. We can potentially put the grpc +// call arena in here in the future. +template +struct RpcAllocatorInfo { + RequestT* request; + ResponseT* response; + // per rpc allocator internal state. MessageAllocator can set it when + // AllocateMessages is called and use it later. + void* allocator_state; +}; + +// Implementations need to be thread-safe +template +class MessageAllocator { + public: + virtual ~MessageAllocator() = default; + // Allocate both request and response + virtual void AllocateMessages( + RpcAllocatorInfo* info) = 0; + // Optional: deallocate request early, called by + // ServerCallbackRpcController::ReleaseRequest + virtual void DeallocateRequest(RpcAllocatorInfo* info) {} + // Deallocate response and request (if applicable) + virtual void DeallocateMessages( + RpcAllocatorInfo* info) = 0; +}; + +} // namespace experimental +} // namespace grpc + +#endif // GRPCPP_IMPL_CODEGEN_MESSAGE_ALLOCATOR_H diff --git a/include/grpcpp/impl/codegen/method_handler_impl.h b/include/grpcpp/impl/codegen/method_handler_impl.h index 094286294c2..dee1cb56ad1 100644 --- a/include/grpcpp/impl/codegen/method_handler_impl.h +++ b/include/grpcpp/impl/codegen/method_handler_impl.h @@ -86,8 +86,8 @@ class RpcMethodHandler : public MethodHandler { param.call->cq()->Pluck(&ops); } - void* Deserialize(grpc_call* call, grpc_byte_buffer* req, - Status* status) final { + void* Deserialize(grpc_call* call, grpc_byte_buffer* req, Status* status, + void** handler_data) final { ByteBuffer buf; buf.set_buffer(req); auto* request = new (g_core_codegen_interface->grpc_call_arena_alloc( @@ -191,8 +191,8 @@ class ServerStreamingHandler : public MethodHandler { param.call->cq()->Pluck(&ops); } - void* Deserialize(grpc_call* call, grpc_byte_buffer* req, - Status* status) final { + void* Deserialize(grpc_call* call, grpc_byte_buffer* req, Status* status, + void** handler_data) final { ByteBuffer buf; buf.set_buffer(req); auto* request = new (g_core_codegen_interface->grpc_call_arena_alloc( @@ -327,8 +327,8 @@ class ErrorMethodHandler : public MethodHandler { param.call->cq()->Pluck(&ops); } - void* Deserialize(grpc_call* call, grpc_byte_buffer* req, - Status* status) final { + void* Deserialize(grpc_call* call, grpc_byte_buffer* req, Status* status, + void** handler_data) final { // We have to destroy any request payload if (req != nullptr) { g_core_codegen_interface->grpc_byte_buffer_destroy(req); diff --git a/include/grpcpp/impl/codegen/rpc_service_method.h b/include/grpcpp/impl/codegen/rpc_service_method.h index 56df61cdfae..21fb2ac2130 100644 --- a/include/grpcpp/impl/codegen/rpc_service_method.h +++ b/include/grpcpp/impl/codegen/rpc_service_method.h @@ -46,21 +46,25 @@ class MethodHandler { /// \param context : the ServerContext structure for this server call /// \param req : the request payload, if appropriate for this RPC /// \param req_status : the request status after any interceptors have run + /// \param handler_data: internal data for the handler. /// \param requester : used only by the callback API. It is a function /// called by the RPC Controller to request another RPC (and also /// to set up the state required to make that request possible) HandlerParameter(Call* c, ServerContext* context, void* req, - Status req_status, std::function requester) + Status req_status, void* handler_data, + std::function requester) : call(c), server_context(context), request(req), status(req_status), + internal_data(handler_data), call_requester(std::move(requester)) {} ~HandlerParameter() {} Call* call; ServerContext* server_context; void* request; Status status; + void* internal_data; std::function call_requester; }; virtual void RunHandler(const HandlerParameter& param) = 0; @@ -71,7 +75,7 @@ class MethodHandler { pointer after calling RunHandler. Ownership of the deserialized request is retained by the handler. Returns nullptr if deserialization failed. */ virtual void* Deserialize(grpc_call* call, grpc_byte_buffer* req, - Status* status) { + Status* status, void** handler_data) { GPR_CODEGEN_ASSERT(req == nullptr); return nullptr; } diff --git a/include/grpcpp/impl/codegen/server_callback.h b/include/grpcpp/impl/codegen/server_callback.h index 782a942f56d..3f6d5cd3555 100644 --- a/include/grpcpp/impl/codegen/server_callback.h +++ b/include/grpcpp/impl/codegen/server_callback.h @@ -28,6 +28,7 @@ #include #include #include +#include #include #include #include @@ -135,6 +136,14 @@ class ServerCallbackRpcController { /// to be called before the callback completes. virtual void SetCancelCallback(std::function callback) = 0; virtual void ClearCancelCallback() = 0; + + // NOTE: This is an API for advanced users who need custom allocators. + // Optionally deallocate request early to reduce the size of working set. + // A custom MessageAllocator needs to be registered to make use of this. + virtual void FreeRequest() = 0; + // NOTE: This is an API for advanced users who need custom allocators. + // Get and maybe mutate the allocator state associated with the current RPC. + virtual void* GetAllocatorState() = 0; }; // NOTE: The actual streaming object classes are provided @@ -447,17 +456,24 @@ class CallbackUnaryHandler : public MethodHandler { experimental::ServerCallbackRpcController*)> func) : func_(func) {} + + void SetMessageAllocator( + experimental::MessageAllocator* allocator) { + allocator_ = allocator; + } + void RunHandler(const HandlerParameter& param) final { // Arena allocate a controller structure (that includes request/response) g_core_codegen_interface->grpc_call_ref(param.call->call()); + auto* allocator_info = + static_cast*>( + param.internal_data); auto* controller = new (g_core_codegen_interface->grpc_call_arena_alloc( param.call->call(), sizeof(ServerCallbackRpcControllerImpl))) - ServerCallbackRpcControllerImpl( - param.server_context, param.call, - static_cast(param.request), - std::move(param.call_requester)); + ServerCallbackRpcControllerImpl(param.server_context, param.call, + allocator_info, allocator_, + std::move(param.call_requester)); Status status = param.status; - if (status.ok()) { // Call the actual function handler and expect the user to call finish CatchingCallback(func_, param.server_context, controller->request(), @@ -468,18 +484,41 @@ class CallbackUnaryHandler : public MethodHandler { } } - void* Deserialize(grpc_call* call, grpc_byte_buffer* req, - Status* status) final { + void* Deserialize(grpc_call* call, grpc_byte_buffer* req, Status* status, + void** handler_data) final { ByteBuffer buf; buf.set_buffer(req); - auto* request = new (g_core_codegen_interface->grpc_call_arena_alloc( - call, sizeof(RequestType))) RequestType(); + RequestType* request = nullptr; + experimental::RpcAllocatorInfo* allocator_info = + new (g_core_codegen_interface->grpc_call_arena_alloc( + call, sizeof(*allocator_info))) + experimental::RpcAllocatorInfo(); + if (allocator_ != nullptr) { + allocator_->AllocateMessages(allocator_info); + } else { + allocator_info->request = + new (g_core_codegen_interface->grpc_call_arena_alloc( + call, sizeof(RequestType))) RequestType(); + allocator_info->response = + new (g_core_codegen_interface->grpc_call_arena_alloc( + call, sizeof(ResponseType))) ResponseType(); + } + *handler_data = allocator_info; + request = allocator_info->request; *status = SerializationTraits::Deserialize(&buf, request); buf.Release(); if (status->ok()) { return request; } - request->~RequestType(); + // Clean up on deserialization failure. + if (allocator_ != nullptr) { + allocator_->DeallocateMessages(allocator_info); + } else { + allocator_info->request->~RequestType(); + allocator_info->response->~ResponseType(); + allocator_info->request = nullptr; + allocator_info->response = nullptr; + } return nullptr; } @@ -487,6 +526,8 @@ class CallbackUnaryHandler : public MethodHandler { std::function func_; + experimental::MessageAllocator* allocator_ = + nullptr; // The implementation class of ServerCallbackRpcController is a private member // of CallbackUnaryHandler since it is never exposed anywhere, and this allows @@ -507,8 +548,9 @@ class CallbackUnaryHandler : public MethodHandler { } // The response is dropped if the status is not OK. if (s.ok()) { - finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, - finish_ops_.SendMessagePtr(&resp_)); + finish_ops_.ServerSendStatus( + &ctx_->trailing_metadata_, + finish_ops_.SendMessagePtr(allocator_info_->response)); } else { finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s); } @@ -546,28 +588,50 @@ class CallbackUnaryHandler : public MethodHandler { void ClearCancelCallback() override { ctx_->ClearCancelCallback(); } + void FreeRequest() override { + if (allocator_ != nullptr) { + allocator_->DeallocateRequest(allocator_info_); + } + } + + void* GetAllocatorState() override { + return allocator_info_->allocator_state; + } + private: friend class CallbackUnaryHandler; - ServerCallbackRpcControllerImpl(ServerContext* ctx, Call* call, - const RequestType* req, - std::function call_requester) + ServerCallbackRpcControllerImpl( + ServerContext* ctx, Call* call, + experimental::RpcAllocatorInfo* + allocator_info, + experimental::MessageAllocator* allocator, + std::function call_requester) : ctx_(ctx), call_(*call), - req_(req), + allocator_info_(allocator_info), + allocator_(allocator), call_requester_(std::move(call_requester)) { ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, nullptr); } - ~ServerCallbackRpcControllerImpl() { req_->~RequestType(); } - - const RequestType* request() { return req_; } - ResponseType* response() { return &resp_; } + const RequestType* request() { return allocator_info_->request; } + ResponseType* response() { return allocator_info_->response; } void MaybeDone() { if (--callbacks_outstanding_ == 0) { grpc_call* call = call_.call(); auto call_requester = std::move(call_requester_); + if (allocator_ != nullptr) { + allocator_->DeallocateMessages(allocator_info_); + } else { + if (allocator_info_->request != nullptr) { + allocator_info_->request->~RequestType(); + } + if (allocator_info_->response != nullptr) { + allocator_info_->response->~ResponseType(); + } + } this->~ServerCallbackRpcControllerImpl(); // explicitly call destructor g_core_codegen_interface->grpc_call_unref(call); call_requester(); @@ -583,8 +647,8 @@ class CallbackUnaryHandler : public MethodHandler { ServerContext* ctx_; Call call_; - const RequestType* req_; - ResponseType resp_; + experimental::RpcAllocatorInfo* allocator_info_; + experimental::MessageAllocator* allocator_; std::function call_requester_; std::atomic_int callbacks_outstanding_{ 2}; // reserve for Finish and CompletionOp @@ -771,8 +835,8 @@ class CallbackServerStreamingHandler : public MethodHandler { writer->MaybeDone(); } - void* Deserialize(grpc_call* call, grpc_byte_buffer* req, - Status* status) final { + void* Deserialize(grpc_call* call, grpc_byte_buffer* req, Status* status, + void** handler_data) final { ByteBuffer buf; buf.set_buffer(req); auto* request = new (g_core_codegen_interface->grpc_call_arena_alloc( diff --git a/include/grpcpp/impl/codegen/service_type.h b/include/grpcpp/impl/codegen/service_type.h index 332a04c294f..198bc7244b4 100644 --- a/include/grpcpp/impl/codegen/service_type.h +++ b/include/grpcpp/impl/codegen/service_type.h @@ -132,6 +132,11 @@ class Service { internal::RpcServiceMethod::ApiType::RAW_CALL_BACK); } + internal::MethodHandler* GetHandler(int index) { + size_t idx = static_cast(index); + return service_->methods_[idx]->handler(); + } + private: Service* service_; }; diff --git a/include/grpcpp/support/message_allocator.h b/include/grpcpp/support/message_allocator.h new file mode 100644 index 00000000000..20ce072b901 --- /dev/null +++ b/include/grpcpp/support/message_allocator.h @@ -0,0 +1,24 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPCPP_SUPPORT_MESSAGE_ALLOCATOR_H +#define GRPCPP_SUPPORT_MESSAGE_ALLOCATOR_H + +#include + +#endif // GRPCPP_SUPPORT_MESSAGE_ALLOCATOR_H diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index 97b84dd202a..ecec3206577 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -155,6 +155,10 @@ grpc::string GetHeaderIncludes(grpc_generator::File* file, params.grpc_search_path); printer->Print(vars, "\n"); printer->Print(vars, "namespace grpc {\n"); + printer->Print(vars, "namespace experimental {\n"); + printer->Print(vars, "template \n"); + printer->Print(vars, "class MessageAllocator;\n"); + printer->Print(vars, "} // namespace experimental\n"); printer->Print(vars, "class CompletionQueue;\n"); printer->Print(vars, "class Channel;\n"); printer->Print(vars, "class ServerCompletionQueue;\n"); @@ -1011,7 +1015,15 @@ void PrintHeaderServerMethodCallback( "controller) {\n" " return this->$" "Method$(context, request, response, controller);\n" - " }));\n"); + " }));\n}\n"); + printer->Print(*vars, + "void SetMessageAllocatorFor_$Method$(\n" + " ::grpc::experimental::MessageAllocator< " + "$RealRequest$, $RealResponse$>* allocator) {\n" + " static_cast<::grpc::internal::CallbackUnaryHandler< " + "$RealRequest$, $RealResponse$>*>(\n" + " ::grpc::Service::experimental().GetHandler($Idx$))\n" + " ->SetMessageAllocator(allocator);\n"); } else if (ClientOnlyStreaming(method)) { printer->Print( *vars, diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 64a6de97d7e..63608cbcde6 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -281,7 +281,7 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { auto* handler = resources_ ? method_->handler() : server_->resource_exhausted_handler_.get(); request_ = handler->Deserialize(call_.call(), request_payload_, - &request_status_); + &request_status_, nullptr); request_payload_ = nullptr; interceptor_methods_.AddInterceptionHookPoint( @@ -305,7 +305,7 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { auto* handler = resources_ ? method_->handler() : server_->resource_exhausted_handler_.get(); handler->RunHandler(internal::MethodHandler::HandlerParameter( - &call_, &ctx_, request_, request_status_, nullptr)); + &call_, &ctx_, request_, request_status_, nullptr, nullptr)); request_ = nullptr; global_callbacks_->PostSynchronousRequest(&ctx_); @@ -512,7 +512,8 @@ class Server::CallbackRequest final : public Server::CallbackRequestBase { if (req_->has_request_payload_) { // Set interception point for RECV MESSAGE req_->request_ = req_->method_->handler()->Deserialize( - req_->call_, req_->request_payload_, &req_->request_status_); + req_->call_, req_->request_payload_, &req_->request_status_, + &req_->handler_data_); req_->request_payload_ = nullptr; req_->interceptor_methods_.AddInterceptionHookPoint( experimental::InterceptionHookPoints::POST_RECV_MESSAGE); @@ -532,7 +533,8 @@ class Server::CallbackRequest final : public Server::CallbackRequestBase { ? req_->method_->handler() : req_->server_->generic_handler_.get(); handler->RunHandler(internal::MethodHandler::HandlerParameter( - call_, &req_->ctx_, req_->request_, req_->request_status_, [this] { + call_, &req_->ctx_, req_->request_, req_->request_status_, + req_->handler_data_, [this] { // Recycle this request if there aren't too many outstanding. // Note that we don't have to worry about a case where there // are no requests waiting to match for this method since that @@ -577,6 +579,7 @@ class Server::CallbackRequest final : public Server::CallbackRequestBase { ctx_.Setup(gpr_inf_future(GPR_CLOCK_REALTIME)); request_payload_ = nullptr; request_ = nullptr; + handler_data_ = nullptr; request_status_ = Status(); } @@ -587,6 +590,7 @@ class Server::CallbackRequest final : public Server::CallbackRequestBase { const bool has_request_payload_; grpc_byte_buffer* request_payload_; void* request_; + void* handler_data_; Status request_status_; grpc_call_details* call_details_ = nullptr; grpc_call* call_; diff --git a/src/proto/grpc/testing/echo_messages.proto b/src/proto/grpc/testing/echo_messages.proto index df0b6801d3d..2c773ea0aaa 100644 --- a/src/proto/grpc/testing/echo_messages.proto +++ b/src/proto/grpc/testing/echo_messages.proto @@ -17,6 +17,8 @@ syntax = "proto3"; package grpc.testing; +option cc_enable_arenas = true; + // Message to be echoed back serialized in trailer. message DebugInfo { repeated string stack_entries = 1; diff --git a/test/cpp/codegen/compiler_test_golden b/test/cpp/codegen/compiler_test_golden index 1067dbbb98e..9e99bc7b0a2 100644 --- a/test/cpp/codegen/compiler_test_golden +++ b/test/cpp/codegen/compiler_test_golden @@ -41,6 +41,10 @@ #include namespace grpc { +namespace experimental { +template +class MessageAllocator; +} // namespace experimental class CompletionQueue; class Channel; class ServerCompletionQueue; @@ -336,6 +340,12 @@ class ServiceA final { return this->MethodA1(context, request, response, controller); })); } + void SetMessageAllocatorFor_MethodA1( + ::grpc::experimental::MessageAllocator< ::grpc::testing::Request, ::grpc::testing::Response>* allocator) { + static_cast<::grpc::internal::CallbackUnaryHandler< ::grpc::testing::Request, ::grpc::testing::Response>*>( + ::grpc::Service::experimental().GetHandler(0)) + ->SetMessageAllocator(allocator); + } ~ExperimentalWithCallbackMethod_MethodA1() override { BaseClassMustBeDerivedFromService(this); } @@ -808,6 +818,12 @@ class ServiceB final { return this->MethodB1(context, request, response, controller); })); } + void SetMessageAllocatorFor_MethodB1( + ::grpc::experimental::MessageAllocator< ::grpc::testing::Request, ::grpc::testing::Response>* allocator) { + static_cast<::grpc::internal::CallbackUnaryHandler< ::grpc::testing::Request, ::grpc::testing::Response>*>( + ::grpc::Service::experimental().GetHandler(0)) + ->SetMessageAllocator(allocator); + } ~ExperimentalWithCallbackMethod_MethodB1() override { BaseClassMustBeDerivedFromService(this); } diff --git a/test/cpp/end2end/BUILD b/test/cpp/end2end/BUILD index 707a628148e..e89667a9714 100644 --- a/test/cpp/end2end/BUILD +++ b/test/cpp/end2end/BUILD @@ -675,3 +675,23 @@ grpc_cc_test( "//test/cpp/util:test_util", ], ) + +grpc_cc_test( + name = "message_allocator_end2end_test", + srcs = ["message_allocator_end2end_test.cc"], + external_deps = [ + "gtest", + ], + deps = [ + ":test_service_impl", + "//:grpc", + "//:gpr", + "//:grpc++", + "//src/proto/grpc/testing:echo_messages_proto", + "//src/proto/grpc/testing:echo_proto", + "//src/proto/grpc/testing:simple_messages_proto", + "//test/core/util:grpc_test_util", + "//test/cpp/util:test_util", + ], +) + diff --git a/test/cpp/end2end/message_allocator_end2end_test.cc b/test/cpp/end2end/message_allocator_end2end_test.cc new file mode 100644 index 00000000000..55f792aa3bf --- /dev/null +++ b/test/cpp/end2end/message_allocator_end2end_test.cc @@ -0,0 +1,405 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include +#include +#include +#include +#include +#include + +#include + +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "src/core/lib/iomgr/iomgr.h" +#include "src/proto/grpc/testing/echo.grpc.pb.h" +#include "test/core/util/port.h" +#include "test/core/util/test_config.h" +#include "test/cpp/util/test_credentials_provider.h" + +// MAYBE_SKIP_TEST is a macro to determine if this particular test configuration +// should be skipped based on a decision made at SetUp time. In particular, any +// callback tests can only be run if the iomgr can run in the background or if +// the transport is in-process. +#define MAYBE_SKIP_TEST \ + do { \ + if (do_not_test_) { \ + return; \ + } \ + } while (0) + +namespace grpc { +namespace testing { +namespace { + +class CallbackTestServiceImpl + : public EchoTestService::ExperimentalCallbackService { + public: + explicit CallbackTestServiceImpl() {} + + void SetFreeRequest() { free_request_ = true; } + + void SetAllocatorMutator( + std::function + mutator) { + allocator_mutator_ = mutator; + } + + void Echo(ServerContext* context, const EchoRequest* request, + EchoResponse* response, + experimental::ServerCallbackRpcController* controller) override { + response->set_message(request->message()); + if (free_request_) { + controller->FreeRequest(); + } else if (allocator_mutator_) { + allocator_mutator_(controller->GetAllocatorState(), request, response); + } + controller->Finish(Status::OK); + } + + private: + bool free_request_ = false; + std::function + allocator_mutator_; +}; + +enum class Protocol { INPROC, TCP }; + +class TestScenario { + public: + TestScenario(Protocol protocol, const grpc::string& creds_type) + : protocol(protocol), credentials_type(creds_type) {} + void Log() const; + Protocol protocol; + const grpc::string credentials_type; +}; + +static std::ostream& operator<<(std::ostream& out, + const TestScenario& scenario) { + return out << "TestScenario{protocol=" + << (scenario.protocol == Protocol::INPROC ? "INPROC" : "TCP") + << "," << scenario.credentials_type << "}"; +} + +void TestScenario::Log() const { + std::ostringstream out; + out << *this; + gpr_log(GPR_INFO, "%s", out.str().c_str()); +} + +class MessageAllocatorEnd2endTestBase + : public ::testing::TestWithParam { + protected: + MessageAllocatorEnd2endTestBase() { + GetParam().Log(); + if (GetParam().protocol == Protocol::TCP) { + if (!grpc_iomgr_run_in_background()) { + do_not_test_ = true; + return; + } + } + } + + ~MessageAllocatorEnd2endTestBase() = default; + + void CreateServer( + experimental::MessageAllocator* allocator) { + ServerBuilder builder; + + auto server_creds = GetCredentialsProvider()->GetServerCredentials( + GetParam().credentials_type); + if (GetParam().protocol == Protocol::TCP) { + picked_port_ = grpc_pick_unused_port_or_die(); + server_address_ << "localhost:" << picked_port_; + builder.AddListeningPort(server_address_.str(), server_creds); + } + callback_service_.SetMessageAllocatorFor_Echo(allocator); + builder.RegisterService(&callback_service_); + + server_ = builder.BuildAndStart(); + is_server_started_ = true; + } + + void ResetStub() { + ChannelArguments args; + auto channel_creds = GetCredentialsProvider()->GetChannelCredentials( + GetParam().credentials_type, &args); + switch (GetParam().protocol) { + case Protocol::TCP: + channel_ = + CreateCustomChannel(server_address_.str(), channel_creds, args); + break; + case Protocol::INPROC: + channel_ = server_->InProcessChannel(args); + break; + default: + assert(false); + } + stub_ = EchoTestService::NewStub(channel_); + } + + void TearDown() override { + if (is_server_started_) { + server_->Shutdown(); + } + if (picked_port_ > 0) { + grpc_recycle_unused_port(picked_port_); + } + } + + void SendRpcs(int num_rpcs) { + grpc::string test_string(""); + for (int i = 0; i < num_rpcs; i++) { + EchoRequest request; + EchoResponse response; + ClientContext cli_ctx; + + test_string += grpc::string(1024, 'x'); + request.set_message(test_string); + grpc::string val; + cli_ctx.set_compression_algorithm(GRPC_COMPRESS_GZIP); + + std::mutex mu; + std::condition_variable cv; + bool done = false; + stub_->experimental_async()->Echo( + &cli_ctx, &request, &response, + [&request, &response, &done, &mu, &cv, val](Status s) { + GPR_ASSERT(s.ok()); + + EXPECT_EQ(request.message(), response.message()); + std::lock_guard l(mu); + done = true; + cv.notify_one(); + }); + std::unique_lock l(mu); + while (!done) { + cv.wait(l); + } + } + } + + bool do_not_test_{false}; + bool is_server_started_{false}; + int picked_port_{0}; + std::shared_ptr channel_; + std::unique_ptr stub_; + CallbackTestServiceImpl callback_service_; + std::unique_ptr server_; + std::ostringstream server_address_; +}; + +class NullAllocatorTest : public MessageAllocatorEnd2endTestBase {}; + +TEST_P(NullAllocatorTest, SimpleRpc) { + MAYBE_SKIP_TEST; + CreateServer(nullptr); + ResetStub(); + SendRpcs(1); +} + +class SimpleAllocatorTest : public MessageAllocatorEnd2endTestBase { + public: + class SimpleAllocator + : public experimental::MessageAllocator { + public: + void AllocateMessages( + experimental::RpcAllocatorInfo* info) { + allocation_count++; + info->request = new EchoRequest; + info->response = new EchoResponse; + info->allocator_state = info; + } + void DeallocateRequest( + experimental::RpcAllocatorInfo* info) { + request_deallocation_count++; + delete info->request; + info->request = nullptr; + } + void DeallocateMessages( + experimental::RpcAllocatorInfo* info) { + messages_deallocation_count++; + delete info->request; + delete info->response; + } + + int allocation_count = 0; + int request_deallocation_count = 0; + int messages_deallocation_count = 0; + }; +}; + +TEST_P(SimpleAllocatorTest, SimpleRpc) { + MAYBE_SKIP_TEST; + const int kRpcCount = 10; + std::unique_ptr allocator(new SimpleAllocator); + CreateServer(allocator.get()); + ResetStub(); + SendRpcs(kRpcCount); + EXPECT_EQ(kRpcCount, allocator->allocation_count); + EXPECT_EQ(kRpcCount, allocator->messages_deallocation_count); + EXPECT_EQ(0, allocator->request_deallocation_count); +} + +TEST_P(SimpleAllocatorTest, RpcWithEarlyFreeRequest) { + MAYBE_SKIP_TEST; + const int kRpcCount = 10; + std::unique_ptr allocator(new SimpleAllocator); + callback_service_.SetFreeRequest(); + CreateServer(allocator.get()); + ResetStub(); + SendRpcs(kRpcCount); + EXPECT_EQ(kRpcCount, allocator->allocation_count); + EXPECT_EQ(kRpcCount, allocator->messages_deallocation_count); + EXPECT_EQ(kRpcCount, allocator->request_deallocation_count); +} + +TEST_P(SimpleAllocatorTest, RpcWithReleaseRequest) { + MAYBE_SKIP_TEST; + const int kRpcCount = 10; + std::unique_ptr allocator(new SimpleAllocator); + std::vector released_requests; + auto mutator = [&released_requests](void* allocator_state, + const EchoRequest* req, + EchoResponse* resp) { + auto* info = + static_cast*>( + allocator_state); + EXPECT_EQ(req, info->request); + EXPECT_EQ(resp, info->response); + EXPECT_EQ(allocator_state, info->allocator_state); + released_requests.push_back(info->request); + info->request = nullptr; + }; + callback_service_.SetAllocatorMutator(mutator); + CreateServer(allocator.get()); + ResetStub(); + SendRpcs(kRpcCount); + EXPECT_EQ(kRpcCount, allocator->allocation_count); + EXPECT_EQ(kRpcCount, allocator->messages_deallocation_count); + EXPECT_EQ(0, allocator->request_deallocation_count); + EXPECT_EQ(static_cast(kRpcCount), released_requests.size()); + for (auto* req : released_requests) { + delete req; + } +} + +class ArenaAllocatorTest : public MessageAllocatorEnd2endTestBase { + public: + class ArenaAllocator + : public experimental::MessageAllocator { + public: + void AllocateMessages( + experimental::RpcAllocatorInfo* info) { + allocation_count++; + auto* arena = new google::protobuf::Arena; + info->allocator_state = arena; + info->request = + google::protobuf::Arena::CreateMessage(arena); + info->response = + google::protobuf::Arena::CreateMessage(arena); + } + void DeallocateRequest( + experimental::RpcAllocatorInfo* info) { + GPR_ASSERT(0); + } + void DeallocateMessages( + experimental::RpcAllocatorInfo* info) { + deallocation_count++; + auto* arena = + static_cast(info->allocator_state); + delete arena; + } + + int allocation_count = 0; + int deallocation_count = 0; + }; +}; + +TEST_P(ArenaAllocatorTest, SimpleRpc) { + MAYBE_SKIP_TEST; + const int kRpcCount = 10; + std::unique_ptr allocator(new ArenaAllocator); + CreateServer(allocator.get()); + ResetStub(); + SendRpcs(kRpcCount); + EXPECT_EQ(kRpcCount, allocator->allocation_count); + EXPECT_EQ(kRpcCount, allocator->deallocation_count); +} + +std::vector CreateTestScenarios(bool test_insecure) { + std::vector scenarios; + std::vector credentials_types{ + GetCredentialsProvider()->GetSecureCredentialsTypeList()}; + auto insec_ok = [] { + // Only allow insecure credentials type when it is registered with the + // provider. User may create providers that do not have insecure. + return GetCredentialsProvider()->GetChannelCredentials( + kInsecureCredentialsType, nullptr) != nullptr; + }; + if (test_insecure && insec_ok()) { + credentials_types.push_back(kInsecureCredentialsType); + } + GPR_ASSERT(!credentials_types.empty()); + + Protocol parr[]{Protocol::INPROC, Protocol::TCP}; + for (Protocol p : parr) { + for (const auto& cred : credentials_types) { + // TODO(vjpai): Test inproc with secure credentials when feasible + if (p == Protocol::INPROC && + (cred != kInsecureCredentialsType || !insec_ok())) { + continue; + } + scenarios.emplace_back(p, cred); + } + } + return scenarios; +} + +INSTANTIATE_TEST_CASE_P(NullAllocatorTest, NullAllocatorTest, + ::testing::ValuesIn(CreateTestScenarios(true))); +INSTANTIATE_TEST_CASE_P(SimpleAllocatorTest, SimpleAllocatorTest, + ::testing::ValuesIn(CreateTestScenarios(true))); +INSTANTIATE_TEST_CASE_P(ArenaAllocatorTest, ArenaAllocatorTest, + ::testing::ValuesIn(CreateTestScenarios(true))); + +} // namespace +} // namespace testing +} // namespace grpc + +int main(int argc, char** argv) { + grpc::testing::TestEnvironment env(argc, argv); + // The grpc_init is to cover the MAYBE_SKIP_TEST. + grpc_init(); + ::testing::InitGoogleTest(&argc, argv); + int ret = RUN_ALL_TESTS(); + grpc_shutdown(); + return ret; +} diff --git a/tools/doxygen/Doxyfile.c++ b/tools/doxygen/Doxyfile.c++ index a0a6e952409..6902c3aa38b 100644 --- a/tools/doxygen/Doxyfile.c++ +++ b/tools/doxygen/Doxyfile.c++ @@ -968,6 +968,7 @@ include/grpcpp/impl/codegen/grpc_library.h \ include/grpcpp/impl/codegen/intercepted_channel.h \ include/grpcpp/impl/codegen/interceptor.h \ include/grpcpp/impl/codegen/interceptor_common.h \ +include/grpcpp/impl/codegen/message_allocator.h \ include/grpcpp/impl/codegen/metadata_map.h \ include/grpcpp/impl/codegen/method_handler_impl.h \ include/grpcpp/impl/codegen/proto_buffer_reader.h \ @@ -1023,6 +1024,7 @@ include/grpcpp/support/client_callback.h \ include/grpcpp/support/client_interceptor.h \ include/grpcpp/support/config.h \ include/grpcpp/support/interceptor.h \ +include/grpcpp/support/message_allocator.h \ include/grpcpp/support/proto_buffer_reader.h \ include/grpcpp/support/proto_buffer_writer.h \ include/grpcpp/support/server_callback.h \ diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index c48c15bd1f5..ed0974fdb2b 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -970,6 +970,7 @@ include/grpcpp/impl/codegen/grpc_library.h \ include/grpcpp/impl/codegen/intercepted_channel.h \ include/grpcpp/impl/codegen/interceptor.h \ include/grpcpp/impl/codegen/interceptor_common.h \ +include/grpcpp/impl/codegen/message_allocator.h \ include/grpcpp/impl/codegen/metadata_map.h \ include/grpcpp/impl/codegen/method_handler_impl.h \ include/grpcpp/impl/codegen/proto_buffer_reader.h \ @@ -1025,6 +1026,7 @@ include/grpcpp/support/client_callback.h \ include/grpcpp/support/client_interceptor.h \ include/grpcpp/support/config.h \ include/grpcpp/support/interceptor.h \ +include/grpcpp/support/message_allocator.h \ include/grpcpp/support/proto_buffer_reader.h \ include/grpcpp/support/proto_buffer_writer.h \ include/grpcpp/support/server_callback.h \ diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index bd6d68bc5c3..9b9ad7d430c 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -4149,6 +4149,24 @@ "third_party": false, "type": "target" }, + { + "deps": [ + "gpr", + "grpc", + "grpc++", + "grpc++_test_util", + "grpc_test_util" + ], + "headers": [], + "is_filegroup": false, + "language": "c++", + "name": "message_allocator_end2end_test", + "src": [ + "test/cpp/end2end/message_allocator_end2end_test.cc" + ], + "third_party": false, + "type": "target" + }, { "deps": [ "gpr", @@ -9937,6 +9955,7 @@ "include/grpcpp/impl/codegen/intercepted_channel.h", "include/grpcpp/impl/codegen/interceptor.h", "include/grpcpp/impl/codegen/interceptor_common.h", + "include/grpcpp/impl/codegen/message_allocator.h", "include/grpcpp/impl/codegen/metadata_map.h", "include/grpcpp/impl/codegen/method_handler_impl.h", "include/grpcpp/impl/codegen/rpc_method.h", @@ -10013,6 +10032,7 @@ "include/grpcpp/impl/codegen/intercepted_channel.h", "include/grpcpp/impl/codegen/interceptor.h", "include/grpcpp/impl/codegen/interceptor_common.h", + "include/grpcpp/impl/codegen/message_allocator.h", "include/grpcpp/impl/codegen/metadata_map.h", "include/grpcpp/impl/codegen/method_handler_impl.h", "include/grpcpp/impl/codegen/rpc_method.h", @@ -10182,6 +10202,7 @@ "include/grpcpp/support/client_interceptor.h", "include/grpcpp/support/config.h", "include/grpcpp/support/interceptor.h", + "include/grpcpp/support/message_allocator.h", "include/grpcpp/support/proto_buffer_reader.h", "include/grpcpp/support/proto_buffer_writer.h", "include/grpcpp/support/server_callback.h", @@ -10302,6 +10323,7 @@ "include/grpcpp/support/client_interceptor.h", "include/grpcpp/support/config.h", "include/grpcpp/support/interceptor.h", + "include/grpcpp/support/message_allocator.h", "include/grpcpp/support/proto_buffer_reader.h", "include/grpcpp/support/proto_buffer_writer.h", "include/grpcpp/support/server_callback.h", diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index 80392e00518..ec2e570bea7 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -4857,6 +4857,30 @@ ], "uses_polling": false }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 0.5, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "message_allocator_end2end_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "uses_polling": true + }, { "args": [], "benchmark": false,