From 20778b0c468db746088793da13699dd22c8993e0 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Tue, 11 Sep 2018 09:14:57 -0700 Subject: [PATCH] Address reviewer comments --- include/grpcpp/channel.h | 8 ++++++- include/grpcpp/impl/codegen/call.h | 8 ++++++- include/grpcpp/impl/codegen/callback_common.h | 22 ++++++++++++++++-- .../impl/codegen/completion_queue_tag.h | 23 +++++++++++++++---- src/cpp/common/callback_common.cc | 14 +++++++++++ 5 files changed, 67 insertions(+), 8 deletions(-) diff --git a/include/grpcpp/channel.h b/include/grpcpp/channel.h index 58a6f516648..f1dba5b8ad1 100644 --- a/include/grpcpp/channel.h +++ b/include/grpcpp/channel.h @@ -83,8 +83,14 @@ class Channel final : public ChannelInterface, const grpc::string host_; grpc_channel* const c_channel_; // owned - CompletionQueue* callback_cq_ = nullptr; + // mu_ protects callback_cq_ (the per-channel callbackable completion queue) std::mutex mu_; + + // callback_cq_ references the callbackable completion queue associated + // with this channel (if any). It is set on the first call to CallbackCQ(). + // It is _not owned_ by the channel; ownership belongs with its internal + // shutdown callback tag (invoked when the CQ is fully shutdown). + CompletionQueue* callback_cq_ = nullptr; }; } // namespace grpc diff --git a/include/grpcpp/impl/codegen/call.h b/include/grpcpp/impl/codegen/call.h index 7bd03b6b1bd..9e4163ec95c 100644 --- a/include/grpcpp/impl/codegen/call.h +++ b/include/grpcpp/impl/codegen/call.h @@ -609,7 +609,9 @@ class CallOpSetInterface : public CompletionQueueTag { /// upwards. virtual void FillOps(grpc_call* call, grpc_op* ops, size_t* nops) = 0; - /// Get the tag to be used at the CQ + /// Get the tag to be used at the core completion queue. Generally, the + /// value of cq_tag will be "this". However, it can be overridden if we + /// want core to process the tag differently (e.g., as a core callback) virtual void* cq_tag() = 0; }; @@ -659,6 +661,10 @@ class CallOpSet : public CallOpSetInterface, void* cq_tag() override { return cq_tag_; } + /// set_cq_tag is used to provide a different core CQ tag than "this". + /// This is used for callback-based tags, where the core tag is the core + /// callback function. It does not change the use or behavior of any other + /// function (such as FinalizeResult) void set_cq_tag(void* cq_tag) { cq_tag_ = cq_tag; } private: diff --git a/include/grpcpp/impl/codegen/callback_common.h b/include/grpcpp/impl/codegen/callback_common.h index 3481460c767..68c318d2b44 100644 --- a/include/grpcpp/impl/codegen/callback_common.h +++ b/include/grpcpp/impl/codegen/callback_common.h @@ -42,6 +42,13 @@ class CallbackWithStatusTag { assert(size == sizeof(CallbackWithStatusTag)); } + // This operator should never be called as the memory should be freed as part + // of the arena destruction. It only exists to provide a matching operator + // delete to the operator new so that some compilers will not complain (see + // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this + // there are no tests catching the compiler warning. + static void operator delete(void*, void*) { assert(0); } + CallbackWithStatusTag(grpc_call* call, std::function f, CompletionQueueTag* ops); ~CallbackWithStatusTag() {} @@ -49,7 +56,9 @@ class CallbackWithStatusTag { Status* status_ptr() { return status_; } CompletionQueueTag* ops() { return ops_; } - // force_run can only be performed on a tag before it can ever be active + // force_run can not be performed on a tag if operations using this tag + // have been sent to PerformOpsOnCall. It is intended for error conditions + // that are detected before the operations are internally processed. void force_run(Status s); private: @@ -65,13 +74,22 @@ class CallbackWithSuccessTag { assert(size == sizeof(CallbackWithSuccessTag)); } + // This operator should never be called as the memory should be freed as part + // of the arena destruction. It only exists to provide a matching operator + // delete to the operator new so that some compilers will not complain (see + // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this + // there are no tests catching the compiler warning. + static void operator delete(void*, void*) { assert(0); } + CallbackWithSuccessTag(grpc_call* call, std::function f, CompletionQueueTag* ops); void* tag() { return static_cast(impl_); } CompletionQueueTag* ops() { return ops_; } - // force_run can only be performed on a tag before it can ever be active + // force_run can not be performed on a tag if operations using this tag + // have been sent to PerformOpsOnCall. It is intended for error conditions + // that are detected before the operations are internally processed. void force_run(bool ok); private: diff --git a/include/grpcpp/impl/codegen/completion_queue_tag.h b/include/grpcpp/impl/codegen/completion_queue_tag.h index ffb642c56b5..304386a9ecc 100644 --- a/include/grpcpp/impl/codegen/completion_queue_tag.h +++ b/include/grpcpp/impl/codegen/completion_queue_tag.h @@ -26,10 +26,25 @@ namespace internal { class CompletionQueueTag { public: virtual ~CompletionQueueTag() {} - /// Called prior to returning from Next(), return value is the status of the - /// operation (return status is the default thing to do). If this function - /// returns false, the tag is dropped and not returned from the completion - /// queue + + /// FinalizeResult must be called before informing user code that the + /// operation bound to the underlying core completion queue tag has + /// completed. In practice, this means: + /// + /// 1. For the sync API - before returning from Pluck + /// 2. For the CQ-based async API - before returning from Next + /// 3. For the callback-based API - before invoking the user callback + /// + /// This is the method that translates from core-side tag/status to + /// C++ API-observable tag/status. + /// + /// The return value is the status of the operation (returning status is the + /// general behavior of this function). If this function returns false, the + /// tag is dropped and not returned from the completion queue: this concept is + /// for events that are observed at core but not requested by the user + /// application (e.g., server shutdown, for server unimplemented method + /// responses, or for cases where a server-side RPC doesn't have a completion + /// notification registered using AsyncNotifyWhenDone) virtual bool FinalizeResult(void** tag, bool* status) = 0; }; } // namespace internal diff --git a/src/cpp/common/callback_common.cc b/src/cpp/common/callback_common.cc index 68df1166ba9..ae47901f1be 100644 --- a/src/cpp/common/callback_common.cc +++ b/src/cpp/common/callback_common.cc @@ -34,6 +34,13 @@ class CallbackWithSuccessImpl : public grpc_core::CQCallbackInterface { assert(size == sizeof(CallbackWithSuccessImpl)); } + // This operator should never be called as the memory should be freed as part + // of the arena destruction. It only exists to provide a matching operator + // delete to the operator new so that some compilers will not complain (see + // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this + // there are no tests catching the compiler warning. + static void operator delete(void*, void*) { assert(0); } + CallbackWithSuccessImpl(grpc_call* call, CallbackWithSuccessTag* parent, std::function f) : call_(call), parent_(parent), func_(std::move(f)) { @@ -62,6 +69,13 @@ class CallbackWithStatusImpl : public grpc_core::CQCallbackInterface { assert(size == sizeof(CallbackWithStatusImpl)); } + // This operator should never be called as the memory should be freed as part + // of the arena destruction. It only exists to provide a matching operator + // delete to the operator new so that some compilers will not complain (see + // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this + // there are no tests catching the compiler warning. + static void operator delete(void*, void*) { assert(0); } + CallbackWithStatusImpl(grpc_call* call, CallbackWithStatusTag* parent, std::function f) : call_(call), parent_(parent), func_(std::move(f)), status_() {