From d4e9a4863a25f40389db01347aaabcb798dc9138 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 5 Apr 2017 15:41:59 -0700 Subject: [PATCH] Convert all async client stream types to not allocate --- include/grpc++/impl/codegen/async_stream.h | 129 ++++++++++++------ .../grpc++/impl/codegen/async_unary_call.h | 27 ++-- src/compiler/cpp_generator.cc | 19 +-- src/cpp/client/generic_stub.cc | 2 +- 4 files changed, 110 insertions(+), 67 deletions(-) diff --git a/include/grpc++/impl/codegen/async_stream.h b/include/grpc++/impl/codegen/async_stream.h index 8f529895cac..f97d824bafe 100644 --- a/include/grpc++/impl/codegen/async_stream.h +++ b/include/grpc++/impl/codegen/async_stream.h @@ -145,17 +145,19 @@ class ClientAsyncReader final : public ClientAsyncReaderInterface { public: /// Create a stream and write the first request out. template - ClientAsyncReader(ChannelInterface* channel, CompletionQueue* cq, - const RpcMethod& method, ClientContext* context, - const W& request, void* tag) - : context_(context), call_(channel->CreateCall(method, context, cq)) { - init_ops_.set_output_tag(tag); - init_ops_.SendInitialMetadata(context->send_initial_metadata_, - context->initial_metadata_flags()); - // TODO(ctiller): don't assert - GPR_CODEGEN_ASSERT(init_ops_.SendMessage(request).ok()); - init_ops_.ClientSendClose(); - call_.PerformOps(&init_ops_); + static ClientAsyncReader* Create(ChannelInterface* channel, + CompletionQueue* cq, const RpcMethod& method, + ClientContext* context, const W& request, + void* tag) { + Call call = channel->CreateCall(method, context, cq); + return new (g_core_codegen_interface->grpc_call_arena_alloc( + call.call(), sizeof(ClientAsyncReader))) + ClientAsyncReader(call, context, request, tag); + } + + // always allocated against a call arena, no memory free required + static void operator delete(void* ptr, std::size_t size) { + assert(size == sizeof(ClientAsyncReader)); } void ReadInitialMetadata(void* tag) override { @@ -185,6 +187,19 @@ class ClientAsyncReader final : public ClientAsyncReaderInterface { } private: + template + ClientAsyncReader(Call call, ClientContext* context, const W& request, + void* tag) + : context_(context), call_(call) { + init_ops_.set_output_tag(tag); + init_ops_.SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); + // TODO(ctiller): don't assert + GPR_CODEGEN_ASSERT(init_ops_.SendMessage(request).ok()); + init_ops_.ClientSendClose(); + call_.PerformOps(&init_ops_); + } + ClientContext* context_; Call call_; CallOpSet @@ -210,23 +225,19 @@ template class ClientAsyncWriter final : public ClientAsyncWriterInterface { public: template - ClientAsyncWriter(ChannelInterface* channel, CompletionQueue* cq, - const RpcMethod& method, ClientContext* context, - R* response, void* tag) - : context_(context), call_(channel->CreateCall(method, context, cq)) { - finish_ops_.RecvMessage(response); - finish_ops_.AllowNoMessage(); - // if corked bit is set in context, we buffer up the initial metadata to - // coalesce with later message to be sent. No op is performed. - if (context_->initial_metadata_corked_) { - write_ops_.SendInitialMetadata(context->send_initial_metadata_, - context->initial_metadata_flags()); - } else { - write_ops_.set_output_tag(tag); - write_ops_.SendInitialMetadata(context->send_initial_metadata_, - context->initial_metadata_flags()); - call_.PerformOps(&write_ops_); - } + static ClientAsyncWriter* Create(ChannelInterface* channel, + CompletionQueue* cq, const RpcMethod& method, + ClientContext* context, R* response, + void* tag) { + Call call = channel->CreateCall(method, context, cq); + return new (g_core_codegen_interface->grpc_call_arena_alloc( + call.call(), sizeof(ClientAsyncWriter))) + ClientAsyncWriter(call, context, response, tag); + } + + // always allocated against a call arena, no memory free required + static void operator delete(void* ptr, std::size_t size) { + assert(size == sizeof(ClientAsyncWriter)); } void ReadInitialMetadata(void* tag) override { @@ -271,6 +282,24 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface { } private: + template + ClientAsyncWriter(Call call, ClientContext* context, R* response, void* tag) + : context_(context), call_(call) { + finish_ops_.RecvMessage(response); + finish_ops_.AllowNoMessage(); + // if corked bit is set in context, we buffer up the initial metadata to + // coalesce with later message to be sent. No op is performed. + if (context_->initial_metadata_corked_) { + write_ops_.SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); + } else { + write_ops_.set_output_tag(tag); + write_ops_.SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); + call_.PerformOps(&write_ops_); + } + } + ClientContext* context_; Call call_; CallOpSet meta_ops_; @@ -298,21 +327,20 @@ template class ClientAsyncReaderWriter final : public ClientAsyncReaderWriterInterface { public: - ClientAsyncReaderWriter(ChannelInterface* channel, CompletionQueue* cq, - const RpcMethod& method, ClientContext* context, - void* tag) - : context_(context), call_(channel->CreateCall(method, context, cq)) { - if (context_->initial_metadata_corked_) { - // if corked bit is set in context, we buffer up the initial metadata to - // coalesce with later message to be sent. No op is performed. - write_ops_.SendInitialMetadata(context->send_initial_metadata_, - context->initial_metadata_flags()); - } else { - write_ops_.set_output_tag(tag); - write_ops_.SendInitialMetadata(context->send_initial_metadata_, - context->initial_metadata_flags()); - call_.PerformOps(&write_ops_); - } + static ClientAsyncReaderWriter* Create(ChannelInterface* channel, + CompletionQueue* cq, + const RpcMethod& method, + ClientContext* context, void* tag) { + Call call = channel->CreateCall(method, context, cq); + + return new (g_core_codegen_interface->grpc_call_arena_alloc( + call.call(), sizeof(ClientAsyncReaderWriter))) + ClientAsyncReaderWriter(call, context, tag); + } + + // always allocated against a call arena, no memory free required + static void operator delete(void* ptr, std::size_t size) { + assert(size == sizeof(ClientAsyncReaderWriter)); } void ReadInitialMetadata(void* tag) override { @@ -366,6 +394,21 @@ class ClientAsyncReaderWriter final } private: + ClientAsyncReaderWriter(Call call, ClientContext* context, void* tag) + : context_(context), call_(call) { + if (context_->initial_metadata_corked_) { + // if corked bit is set in context, we buffer up the initial metadata to + // coalesce with later message to be sent. No op is performed. + write_ops_.SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); + } else { + write_ops_.set_output_tag(tag); + write_ops_.SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); + call_.PerformOps(&write_ops_); + } + } + ClientContext* context_; Call call_; CallOpSet meta_ops_; diff --git a/include/grpc++/impl/codegen/async_unary_call.h b/include/grpc++/impl/codegen/async_unary_call.h index 13b0cc419c6..a147a6acbf8 100644 --- a/include/grpc++/impl/codegen/async_unary_call.h +++ b/include/grpc++/impl/codegen/async_unary_call.h @@ -66,18 +66,9 @@ class ClientAsyncResponseReader final ClientContext* context, const W& request) { Call call = channel->CreateCall(method, context, cq); - ClientAsyncResponseReader* reader = - new (g_core_codegen_interface->grpc_call_arena_alloc(call.call(), - sizeof(*reader))) - ClientAsyncResponseReader(call, context); - - reader->init_buf_.SendInitialMetadata(context->send_initial_metadata_, - context->initial_metadata_flags()); - // TODO(ctiller): don't assert - GPR_CODEGEN_ASSERT(reader->init_buf_.SendMessage(request).ok()); - reader->init_buf_.ClientSendClose(); - reader->call_.PerformOps(&reader->init_buf_); - return reader; + return new (g_core_codegen_interface->grpc_call_arena_alloc( + call.call(), sizeof(ClientAsyncResponseReader))) + ClientAsyncResponseReader(call, context, request); } // always allocated against a call arena, no memory free required @@ -108,8 +99,16 @@ class ClientAsyncResponseReader final ClientContext* const context_; Call call_; - ClientAsyncResponseReader(Call call, ClientContext* context) - : context_(context), call_(call) {} + template + ClientAsyncResponseReader(Call call, ClientContext* context, const W& request) + : context_(context), call_(call) { + init_buf_.SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); + // TODO(ctiller): don't assert + GPR_CODEGEN_ASSERT(init_buf_.SendMessage(request).ok()); + init_buf_.ClientSendClose(); + call_.PerformOps(&init_buf_); + } // disable operator new static void* operator new(std::size_t size); diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index 34d641f7385..4cc6a4cf393 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -1129,7 +1129,7 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer, "::grpc::ClientContext* context, $Response$* response, " "::grpc::CompletionQueue* cq, void* tag) {\n"); printer->Print(*vars, - " return new ::grpc::ClientAsyncWriter< $Request$>(" + " return ::grpc::ClientAsyncWriter< $Request$>::Create(" "channel_.get(), cq, " "rpcmethod_$Method$_, " "context, response, tag);\n" @@ -1152,7 +1152,7 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer, "::grpc::ClientContext* context, const $Request$& request, " "::grpc::CompletionQueue* cq, void* tag) {\n"); printer->Print(*vars, - " return new ::grpc::ClientAsyncReader< $Response$>(" + " return ::grpc::ClientAsyncReader< $Response$>::Create(" "channel_.get(), cq, " "rpcmethod_$Method$_, " "context, request, tag);\n" @@ -1174,13 +1174,14 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer, "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>* " "$ns$$Service$::Stub::Async$Method$Raw(::grpc::ClientContext* context, " "::grpc::CompletionQueue* cq, void* tag) {\n"); - printer->Print(*vars, - " return new " - "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>(" - "channel_.get(), cq, " - "rpcmethod_$Method$_, " - "context, tag);\n" - "}\n\n"); + printer->Print( + *vars, + " return " + "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>::Create(" + "channel_.get(), cq, " + "rpcmethod_$Method$_, " + "context, tag);\n" + "}\n\n"); } } diff --git a/src/cpp/client/generic_stub.cc b/src/cpp/client/generic_stub.cc index 7a2fdf941ce..4adb2a53599 100644 --- a/src/cpp/client/generic_stub.cc +++ b/src/cpp/client/generic_stub.cc @@ -42,7 +42,7 @@ std::unique_ptr GenericStub::Call( ClientContext* context, const grpc::string& method, CompletionQueue* cq, void* tag) { return std::unique_ptr( - new GenericClientAsyncReaderWriter( + GenericClientAsyncReaderWriter::Create( channel_.get(), cq, RpcMethod(method.c_str(), RpcMethod::BIDI_STREAMING), context, tag)); }