[EXPERIMENTAL] allocate unary response writer against call arena

pull/10182/head
Craig Tiller 8 years ago
parent 9f6b9aba88
commit 5845091432
  1. 53
      include/grpc++/impl/codegen/async_unary_call.h
  2. 4
      include/grpc/grpc.h
  3. 4
      src/compiler/cpp_generator.cc
  4. 4
      src/core/lib/surface/call.c

@ -34,6 +34,7 @@
#ifndef GRPCXX_IMPL_CODEGEN_ASYNC_UNARY_CALL_H #ifndef GRPCXX_IMPL_CODEGEN_ASYNC_UNARY_CALL_H
#define GRPCXX_IMPL_CODEGEN_ASYNC_UNARY_CALL_H #define GRPCXX_IMPL_CODEGEN_ASYNC_UNARY_CALL_H
#include <assert.h>
#include <grpc++/impl/codegen/call.h> #include <grpc++/impl/codegen/call.h>
#include <grpc++/impl/codegen/channel_interface.h> #include <grpc++/impl/codegen/channel_interface.h>
#include <grpc++/impl/codegen/client_context.h> #include <grpc++/impl/codegen/client_context.h>
@ -41,6 +42,8 @@
#include <grpc++/impl/codegen/service_type.h> #include <grpc++/impl/codegen/service_type.h>
#include <grpc++/impl/codegen/status.h> #include <grpc++/impl/codegen/status.h>
extern "C" void* grpc_call_arena_alloc(grpc_call* call, size_t size);
namespace grpc { namespace grpc {
class CompletionQueue; class CompletionQueue;
@ -59,19 +62,33 @@ class ClientAsyncResponseReader final
: public ClientAsyncResponseReaderInterface<R> { : public ClientAsyncResponseReaderInterface<R> {
public: public:
template <class W> template <class W>
ClientAsyncResponseReader(ChannelInterface* channel, CompletionQueue* cq, static ClientAsyncResponseReader* Create(ChannelInterface* channel,
const RpcMethod& method, ClientContext* context, CompletionQueue* cq,
const W& request) const RpcMethod& method,
: context_(context), ClientContext* context,
call_(channel->CreateCall(method, context, cq)), const W& request) {
collection_(std::make_shared<CallOpSetCollection>()) { Call call = channel->CreateCall(method, context, cq);
collection_->init_buf_.SetCollection(collection_); ClientAsyncResponseReader* reader = static_cast<ClientAsyncResponseReader*>(
collection_->init_buf_.SendInitialMetadata( grpc_call_arena_alloc(call.call(), sizeof(*reader)));
new (&reader->call_) Call(std::move(call));
reader->context_ = context;
new (&reader->collection_) std::shared_ptr<CallOpSetCollection>(
new (grpc_call_arena_alloc(call.call(), sizeof(CallOpSetCollection)))
CallOpSetCollection());
reader->collection_->init_buf_.SetCollection(reader->collection_);
reader->collection_->init_buf_.SendInitialMetadata(
context->send_initial_metadata_, context->initial_metadata_flags()); context->send_initial_metadata_, context->initial_metadata_flags());
// TODO(ctiller): don't assert // TODO(ctiller): don't assert
GPR_CODEGEN_ASSERT(collection_->init_buf_.SendMessage(request).ok()); GPR_CODEGEN_ASSERT(
collection_->init_buf_.ClientSendClose(); reader->collection_->init_buf_.SendMessage(request).ok());
call_.PerformOps(&collection_->init_buf_); reader->collection_->init_buf_.ClientSendClose();
reader->call_.PerformOps(&reader->collection_->init_buf_);
return reader;
}
// always allocated against a call arena, no memory free required
static void operator delete(void* ptr, std::size_t size) {
assert(size == sizeof(ClientAsyncResponseReader));
} }
void ReadInitialMetadata(void* tag) { void ReadInitialMetadata(void* tag) {
@ -99,7 +116,10 @@ class ClientAsyncResponseReader final
ClientContext* context_; ClientContext* context_;
Call call_; Call call_;
class CallOpSetCollection : public CallOpSetCollectionInterface { // disable operator new
static void* operator new(std::size_t size);
class CallOpSetCollection final : public CallOpSetCollectionInterface {
public: public:
SneakyCallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, SneakyCallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
CallOpClientSendClose> CallOpClientSendClose>
@ -108,6 +128,15 @@ class ClientAsyncResponseReader final
CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>, CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>,
CallOpClientRecvStatus> CallOpClientRecvStatus>
finish_buf_; finish_buf_;
static void* operator new(std::size_t size, void* p) { return p; }
static void operator delete(void* ptr, std::size_t size) {
assert(size == sizeof(CallOpSetCollection));
}
private:
// disable operator new
static void* operator new(std::size_t size);
}; };
std::shared_ptr<CallOpSetCollection> collection_; std::shared_ptr<CallOpSetCollection> collection_;
}; };

@ -198,6 +198,10 @@ GRPCAPI grpc_call *grpc_channel_create_registered_call(
grpc_completion_queue *completion_queue, void *registered_call_handle, grpc_completion_queue *completion_queue, void *registered_call_handle,
gpr_timespec deadline, void *reserved); gpr_timespec deadline, void *reserved);
/** Allocate memory in the grpc_call arena: this memory is automatically
discarded at call completion */
GRPCAPI void *grpc_call_arena_alloc(grpc_call *call, size_t size);
/** Start a batch of operations defined in the array ops; when complete, post a /** Start a batch of operations defined in the array ops; when complete, post a
completion of type 'tag' to the completion queue bound to the call. completion of type 'tag' to the completion queue bound to the call.
The order of ops specified in the batch has no significance. The order of ops specified in the batch has no significance.

@ -1078,8 +1078,8 @@ void PrintSourceClientMethod(Printer *printer, const Method *method,
"const $Request$& request, " "const $Request$& request, "
"::grpc::CompletionQueue* cq) {\n"); "::grpc::CompletionQueue* cq) {\n");
printer->Print(*vars, printer->Print(*vars,
" return new " " return "
"::grpc::ClientAsyncResponseReader< $Response$>(" "::grpc::ClientAsyncResponseReader< $Response$>::Create("
"channel_.get(), cq, " "channel_.get(), cq, "
"rpcmethod_$Method$_, " "rpcmethod_$Method$_, "
"context, request);\n" "context, request);\n"

@ -268,6 +268,10 @@ static void add_init_error(grpc_error **composite, grpc_error *new) {
*composite = grpc_error_add_child(*composite, new); *composite = grpc_error_add_child(*composite, new);
} }
void *grpc_call_arena_alloc(grpc_call *call, size_t size) {
return gpr_arena_alloc(call->arena, size);
}
grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
const grpc_call_create_args *args, const grpc_call_create_args *args,
grpc_call **out_call) { grpc_call **out_call) {

Loading…
Cancel
Save