Resolve review comments

pull/18784/head
yang-g 6 years ago
parent 051d421579
commit 41824319fa
  1. 2
      include/grpcpp/impl/codegen/message_allocator.h
  2. 33
      include/grpcpp/impl/codegen/server_callback.h
  3. 5
      include/grpcpp/impl/codegen/service_type.h
  4. 28
      src/compiler/cpp_generator.cc
  5. 2
      src/cpp/server/server_cc.cc
  6. 32
      test/cpp/codegen/compiler_test_golden
  7. 34
      test/cpp/end2end/message_allocator_end2end_test.cc

@ -20,6 +20,7 @@
#define GRPCPP_IMPL_CODEGEN_MESSAGE_ALLOCATOR_H
namespace grpc {
namespace experimental {
// This is per rpc struct for the allocator. We can potentially put the grpc
// call arena in here in the future.
@ -48,6 +49,7 @@ class MessageAllocator {
RpcAllocatorInfo<RequestT, ResponseT>* info) = 0;
};
} // namespace experimental
} // namespace grpc
#endif // GRPCPP_IMPL_CODEGEN_MESSAGE_ALLOCATOR_H

@ -137,7 +137,12 @@ class ServerCallbackRpcController {
virtual void SetCancelCallback(std::function<void()> callback) = 0;
virtual void ClearCancelCallback() = 0;
// NOTE: This is an API for advanced users who need custom allocators.
// Optionally deallocate request early to reduce the size of working set.
// A custom MessageAllocator needs to be registered to make use of this.
virtual void FreeRequest() = 0;
// NOTE: This is an API for advanced users who need custom allocators.
// Get and maybe mutate the allocator state associated with the current RPC.
virtual void* GetAllocatorState() = 0;
};
@ -449,15 +454,19 @@ class CallbackUnaryHandler : public MethodHandler {
CallbackUnaryHandler(
std::function<void(ServerContext*, const RequestType*, ResponseType*,
experimental::ServerCallbackRpcController*)>
func,
MessageAllocator<RequestType, ResponseType>* allocator)
: func_(func), allocator_(allocator) {}
func)
: func_(func) {}
void SetMessageAllocator(
experimental::MessageAllocator<RequestType, ResponseType>* allocator) {
allocator_ = allocator;
}
void RunHandler(const HandlerParameter& param) final {
// Arena allocate a controller structure (that includes request/response)
g_core_codegen_interface->grpc_call_ref(param.call->call());
auto* allocator_info =
static_cast<RpcAllocatorInfo<RequestType, ResponseType>*>(
static_cast<experimental::RpcAllocatorInfo<RequestType, ResponseType>*>(
param.internal_data);
auto* controller = new (g_core_codegen_interface->grpc_call_arena_alloc(
param.call->call(), sizeof(ServerCallbackRpcControllerImpl)))
@ -480,10 +489,10 @@ class CallbackUnaryHandler : public MethodHandler {
ByteBuffer buf;
buf.set_buffer(req);
RequestType* request = nullptr;
RpcAllocatorInfo<RequestType, ResponseType>* allocator_info =
experimental::RpcAllocatorInfo<RequestType, ResponseType>* allocator_info =
new (g_core_codegen_interface->grpc_call_arena_alloc(
call, sizeof(*allocator_info)))
RpcAllocatorInfo<RequestType, ResponseType>();
experimental::RpcAllocatorInfo<RequestType, ResponseType>();
if (allocator_ != nullptr) {
allocator_->AllocateMessages(allocator_info);
} else {
@ -517,7 +526,8 @@ class CallbackUnaryHandler : public MethodHandler {
std::function<void(ServerContext*, const RequestType*, ResponseType*,
experimental::ServerCallbackRpcController*)>
func_;
MessageAllocator<RequestType, ResponseType>* allocator_;
experimental::MessageAllocator<RequestType, ResponseType>* allocator_ =
nullptr;
// The implementation class of ServerCallbackRpcController is a private member
// of CallbackUnaryHandler since it is never exposed anywhere, and this allows
@ -593,8 +603,9 @@ class CallbackUnaryHandler : public MethodHandler {
ServerCallbackRpcControllerImpl(
ServerContext* ctx, Call* call,
RpcAllocatorInfo<RequestType, ResponseType>* allocator_info,
MessageAllocator<RequestType, ResponseType>* allocator,
experimental::RpcAllocatorInfo<RequestType, ResponseType>*
allocator_info,
experimental::MessageAllocator<RequestType, ResponseType>* allocator,
std::function<void()> call_requester)
: ctx_(ctx),
call_(*call),
@ -636,8 +647,8 @@ class CallbackUnaryHandler : public MethodHandler {
ServerContext* ctx_;
Call call_;
RpcAllocatorInfo<RequestType, ResponseType>* allocator_info_;
MessageAllocator<RequestType, ResponseType>* allocator_;
experimental::RpcAllocatorInfo<RequestType, ResponseType>* allocator_info_;
experimental::MessageAllocator<RequestType, ResponseType>* allocator_;
std::function<void()> call_requester_;
std::atomic_int callbacks_outstanding_{
2}; // reserve for Finish and CompletionOp

@ -132,6 +132,11 @@ class Service {
internal::RpcServiceMethod::ApiType::RAW_CALL_BACK);
}
internal::MethodHandler* GetHandler(int index) {
size_t idx = static_cast<size_t>(index);
return service_->methods_[idx]->handler();
}
private:
Service* service_;
};

@ -155,8 +155,10 @@ grpc::string GetHeaderIncludes(grpc_generator::File* file,
params.grpc_search_path);
printer->Print(vars, "\n");
printer->Print(vars, "namespace grpc {\n");
printer->Print(vars, "namespace experimental {\n");
printer->Print(vars, "template <typename RequestT, typename ResponseT>\n");
printer->Print(vars, "class MessageAllocator;\n");
printer->Print(vars, "} // namespace experimental\n");
printer->Print(vars, "class CompletionQueue;\n");
printer->Print(vars, "class Channel;\n");
printer->Print(vars, "class ServerCompletionQueue;\n");
@ -995,23 +997,15 @@ void PrintHeaderServerMethodCallback(
"controller) {\n"
" return this->$"
"Method$(context, request, response, controller);\n"
" }, nullptr));\n}\n");
printer->Print(
*vars,
"void SetMessageAllocatorFor_$Method$(\n"
" ::grpc::MessageAllocator<$RealRequest$, $RealResponse$>* "
"allocator) {\n"
" ::grpc::Service::experimental().MarkMethodCallback($Idx$,\n"
" new ::grpc::internal::CallbackUnaryHandler< "
"$RealRequest$, $RealResponse$>(\n"
" [this](::grpc::ServerContext* context,\n"
" const $RealRequest$* request,\n"
" $RealResponse$* response,\n"
" ::grpc::experimental::ServerCallbackRpcController* "
"controller) {\n"
" return this->$"
"Method$(context, request, response, controller);\n"
" }, allocator));\n");
" }));\n}\n");
printer->Print(*vars,
"void SetMessageAllocatorFor_$Method$(\n"
" ::grpc::experimental::MessageAllocator< "
"$RealRequest$, $RealResponse$>* allocator) {\n"
" dynamic_cast<::grpc::internal::CallbackUnaryHandler< "
"$RealRequest$, $RealResponse$>*>(\n"
" ::grpc::Service::experimental().GetHandler($Idx$))\n"
" ->SetMessageAllocator(allocator);\n");
} else if (ClientOnlyStreaming(method)) {
printer->Print(
*vars,

@ -590,7 +590,7 @@ class Server::CallbackRequest final : public Server::CallbackRequestBase {
const bool has_request_payload_;
grpc_byte_buffer* request_payload_;
void* request_;
void* handler_data_ = nullptr;
void* handler_data_;
Status request_status_;
grpc_call_details* call_details_ = nullptr;
grpc_call* call_;

@ -41,8 +41,10 @@
#include <grpcpp/impl/codegen/sync_stream.h>
namespace grpc {
namespace experimental {
template <typename RequestT, typename ResponseT>
class MessageAllocator;
} // namespace experimental
class CompletionQueue;
class Channel;
class ServerCompletionQueue;
@ -332,18 +334,13 @@ class ServiceA final {
::grpc::testing::Response* response,
::grpc::experimental::ServerCallbackRpcController* controller) {
return this->MethodA1(context, request, response, controller);
}, nullptr));
}));
}
void SetMessageAllocatorFor_MethodA1(
::grpc::MessageAllocator<::grpc::testing::Request, ::grpc::testing::Response>* allocator) {
::grpc::Service::experimental().MarkMethodCallback(0,
new ::grpc::internal::CallbackUnaryHandler< ::grpc::testing::Request, ::grpc::testing::Response>(
[this](::grpc::ServerContext* context,
const ::grpc::testing::Request* request,
::grpc::testing::Response* response,
::grpc::experimental::ServerCallbackRpcController* controller) {
return this->MethodA1(context, request, response, controller);
}, allocator));
::grpc::experimental::MessageAllocator< ::grpc::testing::Request, ::grpc::testing::Response>* allocator) {
dynamic_cast<::grpc::internal::CallbackUnaryHandler< ::grpc::testing::Request, ::grpc::testing::Response>*>(
::grpc::Service::experimental().GetHandler(0))
->SetMessageAllocator(allocator);
}
~ExperimentalWithCallbackMethod_MethodA1() override {
BaseClassMustBeDerivedFromService(this);
@ -811,18 +808,13 @@ class ServiceB final {
::grpc::testing::Response* response,
::grpc::experimental::ServerCallbackRpcController* controller) {
return this->MethodB1(context, request, response, controller);
}, nullptr));
}));
}
void SetMessageAllocatorFor_MethodB1(
::grpc::MessageAllocator<::grpc::testing::Request, ::grpc::testing::Response>* allocator) {
::grpc::Service::experimental().MarkMethodCallback(0,
new ::grpc::internal::CallbackUnaryHandler< ::grpc::testing::Request, ::grpc::testing::Response>(
[this](::grpc::ServerContext* context,
const ::grpc::testing::Request* request,
::grpc::testing::Response* response,
::grpc::experimental::ServerCallbackRpcController* controller) {
return this->MethodB1(context, request, response, controller);
}, allocator));
::grpc::experimental::MessageAllocator< ::grpc::testing::Request, ::grpc::testing::Response>* allocator) {
dynamic_cast<::grpc::internal::CallbackUnaryHandler< ::grpc::testing::Request, ::grpc::testing::Response>*>(
::grpc::Service::experimental().GetHandler(0))
->SetMessageAllocator(allocator);
}
~ExperimentalWithCallbackMethod_MethodB1() override {
BaseClassMustBeDerivedFromService(this);

@ -129,7 +129,8 @@ class MessageAllocatorEnd2endTestBase
~MessageAllocatorEnd2endTestBase() = default;
void CreateServer(MessageAllocator<EchoRequest, EchoResponse>* allocator) {
void CreateServer(
experimental::MessageAllocator<EchoRequest, EchoResponse>* allocator) {
ServerBuilder builder;
auto server_creds = GetCredentialsProvider()->GetServerCredentials(
@ -180,7 +181,7 @@ class MessageAllocatorEnd2endTestBase
EchoResponse response;
ClientContext cli_ctx;
test_string += "Hello world. ";
test_string += grpc::string(1024, 'x');
request.set_message(test_string);
grpc::string val;
cli_ctx.set_compression_algorithm(GRPC_COMPRESS_GZIP);
@ -226,20 +227,24 @@ TEST_P(NullAllocatorTest, SimpleRpc) {
class SimpleAllocatorTest : public MessageAllocatorEnd2endTestBase {
public:
class SimpleAllocator : public MessageAllocator<EchoRequest, EchoResponse> {
class SimpleAllocator
: public experimental::MessageAllocator<EchoRequest, EchoResponse> {
public:
void AllocateMessages(RpcAllocatorInfo<EchoRequest, EchoResponse>* info) {
void AllocateMessages(
experimental::RpcAllocatorInfo<EchoRequest, EchoResponse>* info) {
allocation_count++;
info->request = new EchoRequest;
info->response = new EchoResponse;
info->allocator_state = info;
}
void DeallocateRequest(RpcAllocatorInfo<EchoRequest, EchoResponse>* info) {
void DeallocateRequest(
experimental::RpcAllocatorInfo<EchoRequest, EchoResponse>* info) {
request_deallocation_count++;
delete info->request;
info->request = nullptr;
}
void DeallocateMessages(RpcAllocatorInfo<EchoRequest, EchoResponse>* info) {
void DeallocateMessages(
experimental::RpcAllocatorInfo<EchoRequest, EchoResponse>* info) {
messages_deallocation_count++;
delete info->request;
delete info->response;
@ -284,8 +289,9 @@ TEST_P(SimpleAllocatorTest, RpcWithReleaseRequest) {
auto mutator = [&released_requests](void* allocator_state,
const EchoRequest* req,
EchoResponse* resp) {
auto* info = static_cast<RpcAllocatorInfo<EchoRequest, EchoResponse>*>(
allocator_state);
auto* info =
static_cast<experimental::RpcAllocatorInfo<EchoRequest, EchoResponse>*>(
allocator_state);
EXPECT_EQ(req, info->request);
EXPECT_EQ(resp, info->response);
EXPECT_EQ(allocator_state, info->allocator_state);
@ -307,9 +313,11 @@ TEST_P(SimpleAllocatorTest, RpcWithReleaseRequest) {
class ArenaAllocatorTest : public MessageAllocatorEnd2endTestBase {
public:
class ArenaAllocator : public MessageAllocator<EchoRequest, EchoResponse> {
class ArenaAllocator
: public experimental::MessageAllocator<EchoRequest, EchoResponse> {
public:
void AllocateMessages(RpcAllocatorInfo<EchoRequest, EchoResponse>* info) {
void AllocateMessages(
experimental::RpcAllocatorInfo<EchoRequest, EchoResponse>* info) {
allocation_count++;
auto* arena = new google::protobuf::Arena;
info->allocator_state = arena;
@ -318,10 +326,12 @@ class ArenaAllocatorTest : public MessageAllocatorEnd2endTestBase {
info->response =
google::protobuf::Arena::CreateMessage<EchoResponse>(arena);
}
void DeallocateRequest(RpcAllocatorInfo<EchoRequest, EchoResponse>* info) {
void DeallocateRequest(
experimental::RpcAllocatorInfo<EchoRequest, EchoResponse>* info) {
GPR_ASSERT(0);
}
void DeallocateMessages(RpcAllocatorInfo<EchoRequest, EchoResponse>* info) {
void DeallocateMessages(
experimental::RpcAllocatorInfo<EchoRequest, EchoResponse>* info) {
deallocation_count++;
auto* arena =
static_cast<google::protobuf::Arena*>(info->allocator_state);

Loading…
Cancel
Save