Allocate using call arenas

pull/16610/head
Vijay Pai 6 years ago
parent d4afae9a56
commit 5474e92292
  1. 22
      include/grpcpp/impl/codegen/callback_common.h
  2. 17
      include/grpcpp/impl/codegen/client_callback.h
  3. 59
      src/cpp/common/callback_common.cc

@ -37,11 +37,14 @@ namespace internal {
class CallbackWithStatusTag { class CallbackWithStatusTag {
public: public:
// TODO(vjpai): make impl and ops part of this structure to avoid allocation, // always allocated against a call arena, no memory free required
// ownership transfer, and delete static void operator delete(void* ptr, std::size_t size) {
CallbackWithStatusTag(std::function<void(Status)> f, bool self_delete, assert(size == sizeof(CallbackWithStatusTag));
}
CallbackWithStatusTag(grpc_call* call, std::function<void(Status)> f,
CompletionQueueTag* ops); CompletionQueueTag* ops);
~CallbackWithStatusTag() { delete ops_; } ~CallbackWithStatusTag() {}
void* tag() { return static_cast<void*>(impl_); } void* tag() { return static_cast<void*>(impl_); }
Status* status_ptr() { return status_; } Status* status_ptr() { return status_; }
CompletionQueueTag* ops() { return ops_; } CompletionQueueTag* ops() { return ops_; }
@ -57,11 +60,14 @@ class CallbackWithStatusTag {
class CallbackWithSuccessTag { class CallbackWithSuccessTag {
public: public:
// TODO(vjpai): make impl and ops part of this structure to avoid allocation, // always allocated against a call arena, no memory free required
// ownership transfer, and delete static void operator delete(void* ptr, std::size_t size) {
CallbackWithSuccessTag(std::function<void(bool)> f, bool self_delete, assert(size == sizeof(CallbackWithSuccessTag));
}
CallbackWithSuccessTag(grpc_call* call, std::function<void(bool)> f,
CompletionQueueTag* ops); CompletionQueueTag* ops);
~CallbackWithSuccessTag() { delete ops_; }
void* tag() { return static_cast<void*>(impl_); } void* tag() { return static_cast<void*>(impl_); }
CompletionQueueTag* ops() { return ops_; } CompletionQueueTag* ops() { return ops_; }

@ -57,18 +57,21 @@ class CallbackUnaryCallImpl {
std::function<void(Status)> on_completion) { std::function<void(Status)> on_completion) {
CompletionQueue* cq = channel->CallbackCQ(); CompletionQueue* cq = channel->CallbackCQ();
GPR_CODEGEN_ASSERT(cq != nullptr); GPR_CODEGEN_ASSERT(cq != nullptr);
Call call(channel->CreateCall(method, context, cq));
// TODO(vjpai): Allocate this as part of the tag's arena using FullCallOpSet =
auto* ops = new CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
CallOpRecvInitialMetadata, CallOpRecvInitialMetadata, CallOpRecvMessage<OutputMessage>,
CallOpRecvMessage<OutputMessage>,
CallOpClientSendClose, CallOpClientRecvStatus>; CallOpClientSendClose, CallOpClientRecvStatus>;
// TODO(vjpai): Move to using pre-allocated tags rather than new/self-delete auto* ops = new (g_core_codegen_interface->grpc_call_arena_alloc(
auto* tag = new CallbackWithStatusTag(on_completion, true, ops); call.call(), sizeof(FullCallOpSet))) FullCallOpSet;
auto* tag = new (g_core_codegen_interface->grpc_call_arena_alloc(
call.call(), sizeof(CallbackWithStatusTag)))
CallbackWithStatusTag(call.call(), on_completion, ops);
// TODO(vjpai): Unify code with sync API as much as possible // TODO(vjpai): Unify code with sync API as much as possible
Call call(channel->CreateCall(method, context, cq));
Status s = ops->SendMessage(*request); Status s = ops->SendMessage(*request);
if (!s.ok()) { if (!s.ok()) {
tag->force_run(s); tag->force_run(s);

@ -30,9 +30,15 @@ namespace internal {
namespace { namespace {
class CallbackWithSuccessImpl : public grpc_core::CQCallbackInterface { class CallbackWithSuccessImpl : public grpc_core::CQCallbackInterface {
public: public:
CallbackWithSuccessImpl(CallbackWithSuccessTag* parent, static void operator delete(void* ptr, std::size_t size) {
std::function<void(bool)> f, bool self_delete) assert(size == sizeof(CallbackWithSuccessImpl));
: parent_(parent), func_(std::move(f)), self_delete_(self_delete) {} }
CallbackWithSuccessImpl(grpc_call* call, CallbackWithSuccessTag* parent,
std::function<void(bool)> f)
: call_(call), parent_(parent), func_(std::move(f)) {
grpc_call_ref(call);
}
void Run(bool ok) override { void Run(bool ok) override {
void* ignored = parent_->ops(); void* ignored = parent_->ops();
@ -40,27 +46,27 @@ class CallbackWithSuccessImpl : public grpc_core::CQCallbackInterface {
GPR_ASSERT(parent_->ops()->FinalizeResult(&ignored, &new_ok)); GPR_ASSERT(parent_->ops()->FinalizeResult(&ignored, &new_ok));
GPR_ASSERT(ignored == parent_->ops()); GPR_ASSERT(ignored == parent_->ops());
func_(ok); func_(ok);
if (self_delete_) { func_ = nullptr; // release the function
delete parent_; grpc_call_unref(call_);
// Must use grpc_core::Delete since base is GRPC_ABSTRACT
grpc_core::Delete(this);
}
} }
private: private:
grpc_call* call_;
CallbackWithSuccessTag* parent_; CallbackWithSuccessTag* parent_;
std::function<void(bool)> func_; std::function<void(bool)> func_;
bool self_delete_;
}; };
class CallbackWithStatusImpl : public grpc_core::CQCallbackInterface { class CallbackWithStatusImpl : public grpc_core::CQCallbackInterface {
public: public:
CallbackWithStatusImpl(CallbackWithStatusTag* parent, static void operator delete(void* ptr, std::size_t size) {
std::function<void(Status)> f, bool self_delete) assert(size == sizeof(CallbackWithStatusImpl));
: parent_(parent), }
func_(std::move(f)),
status_(), CallbackWithStatusImpl(grpc_call* call, CallbackWithStatusTag* parent,
self_delete_(self_delete) {} std::function<void(Status)> f)
: call_(call), parent_(parent), func_(std::move(f)), status_() {
grpc_call_ref(call);
}
void Run(bool ok) override { void Run(bool ok) override {
void* ignored = parent_->ops(); void* ignored = parent_->ops();
@ -69,36 +75,35 @@ class CallbackWithStatusImpl : public grpc_core::CQCallbackInterface {
GPR_ASSERT(ignored == parent_->ops()); GPR_ASSERT(ignored == parent_->ops());
func_(status_); func_(status_);
if (self_delete_) { func_ = nullptr; // release the function
delete parent_; grpc_call_unref(call_);
// Must use grpc_core::Delete since base is GRPC_ABSTRACT
grpc_core::Delete(this);
}
} }
Status* status_ptr() { return &status_; } Status* status_ptr() { return &status_; }
private: private:
grpc_call* call_;
CallbackWithStatusTag* parent_; CallbackWithStatusTag* parent_;
std::function<void(Status)> func_; std::function<void(Status)> func_;
Status status_; Status status_;
bool self_delete_;
}; };
} // namespace } // namespace
CallbackWithSuccessTag::CallbackWithSuccessTag(std::function<void(bool)> f, CallbackWithSuccessTag::CallbackWithSuccessTag(grpc_call* call,
bool self_delete, std::function<void(bool)> f,
CompletionQueueTag* ops) CompletionQueueTag* ops)
: impl_(grpc_core::New<CallbackWithSuccessImpl>(this, f, self_delete)), : impl_(new (grpc_call_arena_alloc(call, sizeof(CallbackWithSuccessImpl)))
CallbackWithSuccessImpl(call, this, std::move(f))),
ops_(ops) {} ops_(ops) {}
void CallbackWithSuccessTag::force_run(bool ok) { impl_->Run(ok); } void CallbackWithSuccessTag::force_run(bool ok) { impl_->Run(ok); }
CallbackWithStatusTag::CallbackWithStatusTag(std::function<void(Status)> f, CallbackWithStatusTag::CallbackWithStatusTag(grpc_call* call,
bool self_delete, std::function<void(Status)> f,
CompletionQueueTag* ops) CompletionQueueTag* ops)
: ops_(ops) { : ops_(ops) {
auto* impl = grpc_core::New<CallbackWithStatusImpl>(this, f, self_delete); auto* impl = new (grpc_call_arena_alloc(call, sizeof(CallbackWithStatusImpl)))
CallbackWithStatusImpl(call, this, std::move(f));
impl_ = impl; impl_ = impl;
status_ = impl->status_ptr(); status_ = impl->status_ptr();
} }

Loading…
Cancel
Save