From 5474e92292a37ea9017db02da1e8e6aea621b156 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Thu, 30 Aug 2018 17:21:07 -0700 Subject: [PATCH] Allocate using call arenas --- include/grpcpp/impl/codegen/callback_common.h | 22 ++++--- include/grpcpp/impl/codegen/client_callback.h | 19 +++--- src/cpp/common/callback_common.cc | 59 ++++++++++--------- 3 files changed, 57 insertions(+), 43 deletions(-) diff --git a/include/grpcpp/impl/codegen/callback_common.h b/include/grpcpp/impl/codegen/callback_common.h index b14ed0c24f8..3481460c767 100644 --- a/include/grpcpp/impl/codegen/callback_common.h +++ b/include/grpcpp/impl/codegen/callback_common.h @@ -37,11 +37,14 @@ namespace internal { class CallbackWithStatusTag { public: - // TODO(vjpai): make impl and ops part of this structure to avoid allocation, - // ownership transfer, and delete - CallbackWithStatusTag(std::function f, bool self_delete, + // always allocated against a call arena, no memory free required + static void operator delete(void* ptr, std::size_t size) { + assert(size == sizeof(CallbackWithStatusTag)); + } + + CallbackWithStatusTag(grpc_call* call, std::function f, CompletionQueueTag* ops); - ~CallbackWithStatusTag() { delete ops_; } + ~CallbackWithStatusTag() {} void* tag() { return static_cast(impl_); } Status* status_ptr() { return status_; } CompletionQueueTag* ops() { return ops_; } @@ -57,11 +60,14 @@ class CallbackWithStatusTag { class CallbackWithSuccessTag { public: - // TODO(vjpai): make impl and ops part of this structure to avoid allocation, - // ownership transfer, and delete - CallbackWithSuccessTag(std::function f, bool self_delete, + // always allocated against a call arena, no memory free required + static void operator delete(void* ptr, std::size_t size) { + assert(size == sizeof(CallbackWithSuccessTag)); + } + + CallbackWithSuccessTag(grpc_call* call, std::function f, CompletionQueueTag* ops); - ~CallbackWithSuccessTag() { delete ops_; } + void* tag() { return static_cast(impl_); } CompletionQueueTag* ops() { return ops_; } diff --git a/include/grpcpp/impl/codegen/client_callback.h b/include/grpcpp/impl/codegen/client_callback.h index 419933f85cc..fc81c8aa0ae 100644 --- a/include/grpcpp/impl/codegen/client_callback.h +++ b/include/grpcpp/impl/codegen/client_callback.h @@ -57,18 +57,21 @@ class CallbackUnaryCallImpl { std::function on_completion) { CompletionQueue* cq = channel->CallbackCQ(); GPR_CODEGEN_ASSERT(cq != nullptr); + Call call(channel->CreateCall(method, context, cq)); + + using FullCallOpSet = + CallOpSet, + CallOpClientSendClose, CallOpClientRecvStatus>; - // TODO(vjpai): Allocate this as part of the tag's arena - auto* ops = new CallOpSet, - CallOpClientSendClose, CallOpClientRecvStatus>; + auto* ops = new (g_core_codegen_interface->grpc_call_arena_alloc( + call.call(), sizeof(FullCallOpSet))) FullCallOpSet; - // TODO(vjpai): Move to using pre-allocated tags rather than new/self-delete - auto* tag = new CallbackWithStatusTag(on_completion, true, ops); + auto* tag = new (g_core_codegen_interface->grpc_call_arena_alloc( + call.call(), sizeof(CallbackWithStatusTag))) + CallbackWithStatusTag(call.call(), on_completion, ops); // TODO(vjpai): Unify code with sync API as much as possible - Call call(channel->CreateCall(method, context, cq)); Status s = ops->SendMessage(*request); if (!s.ok()) { tag->force_run(s); diff --git a/src/cpp/common/callback_common.cc b/src/cpp/common/callback_common.cc index a2c32fe72ba..68df1166ba9 100644 --- a/src/cpp/common/callback_common.cc +++ b/src/cpp/common/callback_common.cc @@ -30,9 +30,15 @@ namespace internal { namespace { class CallbackWithSuccessImpl : public grpc_core::CQCallbackInterface { public: - CallbackWithSuccessImpl(CallbackWithSuccessTag* parent, - std::function f, bool self_delete) - : parent_(parent), func_(std::move(f)), self_delete_(self_delete) {} + static void operator delete(void* ptr, std::size_t size) { + assert(size == sizeof(CallbackWithSuccessImpl)); + } + + CallbackWithSuccessImpl(grpc_call* call, CallbackWithSuccessTag* parent, + std::function f) + : call_(call), parent_(parent), func_(std::move(f)) { + grpc_call_ref(call); + } void Run(bool ok) override { void* ignored = parent_->ops(); @@ -40,27 +46,27 @@ class CallbackWithSuccessImpl : public grpc_core::CQCallbackInterface { GPR_ASSERT(parent_->ops()->FinalizeResult(&ignored, &new_ok)); GPR_ASSERT(ignored == parent_->ops()); func_(ok); - if (self_delete_) { - delete parent_; - // Must use grpc_core::Delete since base is GRPC_ABSTRACT - grpc_core::Delete(this); - } + func_ = nullptr; // release the function + grpc_call_unref(call_); } private: + grpc_call* call_; CallbackWithSuccessTag* parent_; std::function func_; - bool self_delete_; }; class CallbackWithStatusImpl : public grpc_core::CQCallbackInterface { public: - CallbackWithStatusImpl(CallbackWithStatusTag* parent, - std::function f, bool self_delete) - : parent_(parent), - func_(std::move(f)), - status_(), - self_delete_(self_delete) {} + static void operator delete(void* ptr, std::size_t size) { + assert(size == sizeof(CallbackWithStatusImpl)); + } + + CallbackWithStatusImpl(grpc_call* call, CallbackWithStatusTag* parent, + std::function f) + : call_(call), parent_(parent), func_(std::move(f)), status_() { + grpc_call_ref(call); + } void Run(bool ok) override { void* ignored = parent_->ops(); @@ -69,36 +75,35 @@ class CallbackWithStatusImpl : public grpc_core::CQCallbackInterface { GPR_ASSERT(ignored == parent_->ops()); func_(status_); - if (self_delete_) { - delete parent_; - // Must use grpc_core::Delete since base is GRPC_ABSTRACT - grpc_core::Delete(this); - } + func_ = nullptr; // release the function + grpc_call_unref(call_); } Status* status_ptr() { return &status_; } private: + grpc_call* call_; CallbackWithStatusTag* parent_; std::function func_; Status status_; - bool self_delete_; }; } // namespace -CallbackWithSuccessTag::CallbackWithSuccessTag(std::function f, - bool self_delete, +CallbackWithSuccessTag::CallbackWithSuccessTag(grpc_call* call, + std::function f, CompletionQueueTag* ops) - : impl_(grpc_core::New(this, f, self_delete)), + : impl_(new (grpc_call_arena_alloc(call, sizeof(CallbackWithSuccessImpl))) + CallbackWithSuccessImpl(call, this, std::move(f))), ops_(ops) {} void CallbackWithSuccessTag::force_run(bool ok) { impl_->Run(ok); } -CallbackWithStatusTag::CallbackWithStatusTag(std::function f, - bool self_delete, +CallbackWithStatusTag::CallbackWithStatusTag(grpc_call* call, + std::function f, CompletionQueueTag* ops) : ops_(ops) { - auto* impl = grpc_core::New(this, f, self_delete); + auto* impl = new (grpc_call_arena_alloc(call, sizeof(CallbackWithStatusImpl))) + CallbackWithStatusImpl(call, this, std::move(f)); impl_ = impl; status_ = impl->status_ptr(); }