From 4ba0f9afb54a1ec8f5a7b0bf2355b3a5d3964a2d Mon Sep 17 00:00:00 2001 From: Cheng-Yu Chung Date: Tue, 4 Oct 2022 13:59:32 -0400 Subject: [PATCH] Revert "Revert "Remove `include/grpcpp/impl/codegen/client_callback.h` (#31005)" (#31079)" (#31087) This reverts commit da08fe1d6fee9884a2adcf43424dd56e2f83ee04. --- include/grpcpp/impl/codegen/client_callback.h | 1209 +---------------- include/grpcpp/support/client_callback.h | 1207 +++++++++++++++- src/compiler/cpp_generator.cc | 4 +- test/cpp/codegen/compiler_test_golden | 2 +- 4 files changed, 1212 insertions(+), 1210 deletions(-) diff --git a/include/grpcpp/impl/codegen/client_callback.h b/include/grpcpp/impl/codegen/client_callback.h index 04ab4aa8b8e..ac1bd93b71a 100644 --- a/include/grpcpp/impl/codegen/client_callback.h +++ b/include/grpcpp/impl/codegen/client_callback.h @@ -18,1212 +18,9 @@ #ifndef GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H #define GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H -// IWYU pragma: private, include +// IWYU pragma: private -#include -#include +/// TODO(chengyuc): Remove this file after solving compatibility. +#include -#include -#include -#include -#include -#include -#include -#include -#include - -namespace grpc { -class Channel; -class ClientContext; - -namespace internal { -class RpcMethod; - -/// Perform a callback-based unary call. May optionally specify the base -/// class of the Request and Response so that the internal calls and structures -/// below this may be based on those base classes and thus achieve code reuse -/// across different RPCs (e.g., for protobuf, MessageLite would be a base -/// class). -/// TODO(vjpai): Combine as much as possible with the blocking unary call code -template -void CallbackUnaryCall(grpc::ChannelInterface* channel, - const grpc::internal::RpcMethod& method, - grpc::ClientContext* context, - const InputMessage* request, OutputMessage* result, - std::function on_completion) { - static_assert(std::is_base_of::value, - "Invalid input message specification"); - static_assert(std::is_base_of::value, - "Invalid output message specification"); - CallbackUnaryCallImpl x( - channel, method, context, request, result, on_completion); -} - -template -class CallbackUnaryCallImpl { - public: - CallbackUnaryCallImpl(grpc::ChannelInterface* channel, - const grpc::internal::RpcMethod& method, - grpc::ClientContext* context, - const InputMessage* request, OutputMessage* result, - std::function on_completion) { - grpc::CompletionQueue* cq = channel->CallbackCQ(); - GPR_CODEGEN_ASSERT(cq != nullptr); - grpc::internal::Call call(channel->CreateCall(method, context, cq)); - - using FullCallOpSet = grpc::internal::CallOpSet< - grpc::internal::CallOpSendInitialMetadata, - grpc::internal::CallOpSendMessage, - grpc::internal::CallOpRecvInitialMetadata, - grpc::internal::CallOpRecvMessage, - grpc::internal::CallOpClientSendClose, - grpc::internal::CallOpClientRecvStatus>; - - struct OpSetAndTag { - FullCallOpSet opset; - grpc::internal::CallbackWithStatusTag tag; - }; - const size_t alloc_sz = sizeof(OpSetAndTag); - auto* const alloced = static_cast( - grpc::g_core_codegen_interface->grpc_call_arena_alloc(call.call(), - alloc_sz)); - auto* ops = new (&alloced->opset) FullCallOpSet; - auto* tag = new (&alloced->tag) - grpc::internal::CallbackWithStatusTag(call.call(), on_completion, ops); - - // TODO(vjpai): Unify code with sync API as much as possible - grpc::Status s = ops->SendMessagePtr(request); - if (!s.ok()) { - tag->force_run(s); - return; - } - ops->SendInitialMetadata(&context->send_initial_metadata_, - context->initial_metadata_flags()); - ops->RecvInitialMetadata(context); - ops->RecvMessage(result); - ops->AllowNoMessage(); - ops->ClientSendClose(); - ops->ClientRecvStatus(context, tag->status_ptr()); - ops->set_core_cq_tag(tag); - call.PerformOps(ops); - } -}; - -// Base class for public API classes. -class ClientReactor { - public: - virtual ~ClientReactor() = default; - - /// Called by the library when all operations associated with this RPC have - /// completed and all Holds have been removed. OnDone provides the RPC status - /// outcome for both successful and failed RPCs. If it is never called on an - /// RPC, it indicates an application-level problem (like failure to remove a - /// hold). - /// - /// \param[in] s The status outcome of this RPC - virtual void OnDone(const grpc::Status& /*s*/) = 0; - - /// InternalScheduleOnDone is not part of the API and is not meant to be - /// overridden. It is virtual to allow successful builds for certain bazel - /// build users that only want to depend on gRPC codegen headers and not the - /// full library (although this is not a generally-supported option). Although - /// the virtual call is slower than a direct call, this function is - /// heavyweight and the cost of the virtual call is not much in comparison. - /// This function may be removed or devirtualized in the future. - virtual void InternalScheduleOnDone(grpc::Status s); - - /// InternalTrailersOnly is not part of the API and is not meant to be - /// overridden. It is virtual to allow successful builds for certain bazel - /// build users that only want to depend on gRPC codegen headers and not the - /// full library (although this is not a generally-supported option). Although - /// the virtual call is slower than a direct call, this function is - /// heavyweight and the cost of the virtual call is not much in comparison. - /// This function may be removed or devirtualized in the future. - virtual bool InternalTrailersOnly(const grpc_call* call) const; -}; - -} // namespace internal - -// Forward declarations -template -class ClientBidiReactor; -template -class ClientReadReactor; -template -class ClientWriteReactor; -class ClientUnaryReactor; - -// NOTE: The streaming objects are not actually implemented in the public API. -// These interfaces are provided for mocking only. Typical applications -// will interact exclusively with the reactors that they define. -template -class ClientCallbackReaderWriter { - public: - virtual ~ClientCallbackReaderWriter() {} - virtual void StartCall() = 0; - virtual void Write(const Request* req, grpc::WriteOptions options) = 0; - virtual void WritesDone() = 0; - virtual void Read(Response* resp) = 0; - virtual void AddHold(int holds) = 0; - virtual void RemoveHold() = 0; - - protected: - void BindReactor(ClientBidiReactor* reactor) { - reactor->BindStream(this); - } -}; - -template -class ClientCallbackReader { - public: - virtual ~ClientCallbackReader() {} - virtual void StartCall() = 0; - virtual void Read(Response* resp) = 0; - virtual void AddHold(int holds) = 0; - virtual void RemoveHold() = 0; - - protected: - void BindReactor(ClientReadReactor* reactor) { - reactor->BindReader(this); - } -}; - -template -class ClientCallbackWriter { - public: - virtual ~ClientCallbackWriter() {} - virtual void StartCall() = 0; - void Write(const Request* req) { Write(req, grpc::WriteOptions()); } - virtual void Write(const Request* req, grpc::WriteOptions options) = 0; - void WriteLast(const Request* req, grpc::WriteOptions options) { - Write(req, options.set_last_message()); - } - virtual void WritesDone() = 0; - - virtual void AddHold(int holds) = 0; - virtual void RemoveHold() = 0; - - protected: - void BindReactor(ClientWriteReactor* reactor) { - reactor->BindWriter(this); - } -}; - -class ClientCallbackUnary { - public: - virtual ~ClientCallbackUnary() {} - virtual void StartCall() = 0; - - protected: - void BindReactor(ClientUnaryReactor* reactor); -}; - -// The following classes are the reactor interfaces that are to be implemented -// by the user. They are passed in to the library as an argument to a call on a -// stub (either a codegen-ed call or a generic call). The streaming RPC is -// activated by calling StartCall, possibly after initiating StartRead, -// StartWrite, or AddHold operations on the streaming object. Note that none of -// the classes are pure; all reactions have a default empty reaction so that the -// user class only needs to override those reactions that it cares about. -// The reactor must be passed to the stub invocation before any of the below -// operations can be called and its reactions will be invoked by the library in -// response to the completion of various operations. Reactions must not include -// blocking operations (such as blocking I/O, starting synchronous RPCs, or -// waiting on condition variables). Reactions may be invoked concurrently, -// except that OnDone is called after all others (assuming proper API usage). -// The reactor may not be deleted until OnDone is called. - -/// \a ClientBidiReactor is the interface for a bidirectional streaming RPC. -template -class ClientBidiReactor : public internal::ClientReactor { - public: - /// Activate the RPC and initiate any reads or writes that have been Start'ed - /// before this call. All streaming RPCs issued by the client MUST have - /// StartCall invoked on them (even if they are canceled) as this call is the - /// activation of their lifecycle. - void StartCall() { stream_->StartCall(); } - - /// Initiate a read operation (or post it for later initiation if StartCall - /// has not yet been invoked). - /// - /// \param[out] resp Where to eventually store the read message. Valid when - /// the library calls OnReadDone - void StartRead(Response* resp) { stream_->Read(resp); } - - /// Initiate a write operation (or post it for later initiation if StartCall - /// has not yet been invoked). - /// - /// \param[in] req The message to be written. The library does not take - /// ownership but the caller must ensure that the message is - /// not deleted or modified until OnWriteDone is called. - void StartWrite(const Request* req) { StartWrite(req, grpc::WriteOptions()); } - - /// Initiate/post a write operation with specified options. - /// - /// \param[in] req The message to be written. The library does not take - /// ownership but the caller must ensure that the message is - /// not deleted or modified until OnWriteDone is called. - /// \param[in] options The WriteOptions to use for writing this message - void StartWrite(const Request* req, grpc::WriteOptions options) { - stream_->Write(req, options); - } - - /// Initiate/post a write operation with specified options and an indication - /// that this is the last write (like StartWrite and StartWritesDone, merged). - /// Note that calling this means that no more calls to StartWrite, - /// StartWriteLast, or StartWritesDone are allowed. - /// - /// \param[in] req The message to be written. The library does not take - /// ownership but the caller must ensure that the message is - /// not deleted or modified until OnWriteDone is called. - /// \param[in] options The WriteOptions to use for writing this message - void StartWriteLast(const Request* req, grpc::WriteOptions options) { - StartWrite(req, options.set_last_message()); - } - - /// Indicate that the RPC will have no more write operations. This can only be - /// issued once for a given RPC. This is not required or allowed if - /// StartWriteLast is used since that already has the same implication. - /// Note that calling this means that no more calls to StartWrite, - /// StartWriteLast, or StartWritesDone are allowed. - void StartWritesDone() { stream_->WritesDone(); } - - /// Holds are needed if (and only if) this stream has operations that take - /// place on it after StartCall but from outside one of the reactions - /// (OnReadDone, etc). This is _not_ a common use of the streaming API. - /// - /// Holds must be added before calling StartCall. If a stream still has a hold - /// in place, its resources will not be destroyed even if the status has - /// already come in from the wire and there are currently no active callbacks - /// outstanding. Similarly, the stream will not call OnDone if there are still - /// holds on it. - /// - /// For example, if a StartRead or StartWrite operation is going to be - /// initiated from elsewhere in the application, the application should call - /// AddHold or AddMultipleHolds before StartCall. If there is going to be, - /// for example, a read-flow and a write-flow taking place outside the - /// reactions, then call AddMultipleHolds(2) before StartCall. When the - /// application knows that it won't issue any more read operations (such as - /// when a read comes back as not ok), it should issue a RemoveHold(). It - /// should also call RemoveHold() again after it does StartWriteLast or - /// StartWritesDone that indicates that there will be no more write ops. - /// The number of RemoveHold calls must match the total number of AddHold - /// calls plus the number of holds added by AddMultipleHolds. - /// The argument to AddMultipleHolds must be positive. - void AddHold() { AddMultipleHolds(1); } - void AddMultipleHolds(int holds) { - GPR_CODEGEN_DEBUG_ASSERT(holds > 0); - stream_->AddHold(holds); - } - void RemoveHold() { stream_->RemoveHold(); } - - /// Notifies the application that all operations associated with this RPC - /// have completed and all Holds have been removed. OnDone provides the RPC - /// status outcome for both successful and failed RPCs and will be called in - /// all cases. If it is not called, it indicates an application-level problem - /// (like failure to remove a hold). - /// - /// \param[in] s The status outcome of this RPC - void OnDone(const grpc::Status& /*s*/) override {} - - /// Notifies the application that a read of initial metadata from the - /// server is done. If the application chooses not to implement this method, - /// it can assume that the initial metadata has been read before the first - /// call of OnReadDone or OnDone. - /// - /// \param[in] ok Was the initial metadata read successfully? If false, no - /// new read/write operation will succeed, and any further - /// Start* operations should not be called. - virtual void OnReadInitialMetadataDone(bool /*ok*/) {} - - /// Notifies the application that a StartRead operation completed. - /// - /// \param[in] ok Was it successful? If false, no new read/write operation - /// will succeed, and any further Start* should not be called. - virtual void OnReadDone(bool /*ok*/) {} - - /// Notifies the application that a StartWrite or StartWriteLast operation - /// completed. - /// - /// \param[in] ok Was it successful? If false, no new read/write operation - /// will succeed, and any further Start* should not be called. - virtual void OnWriteDone(bool /*ok*/) {} - - /// Notifies the application that a StartWritesDone operation completed. Note - /// that this is only used on explicit StartWritesDone operations and not for - /// those that are implicitly invoked as part of a StartWriteLast. - /// - /// \param[in] ok Was it successful? If false, the application will later see - /// the failure reflected as a bad status in OnDone and no - /// further Start* should be called. - virtual void OnWritesDoneDone(bool /*ok*/) {} - - private: - friend class ClientCallbackReaderWriter; - void BindStream(ClientCallbackReaderWriter* stream) { - stream_ = stream; - } - ClientCallbackReaderWriter* stream_; -}; - -/// \a ClientReadReactor is the interface for a server-streaming RPC. -/// All public methods behave as in ClientBidiReactor. -template -class ClientReadReactor : public internal::ClientReactor { - public: - void StartCall() { reader_->StartCall(); } - void StartRead(Response* resp) { reader_->Read(resp); } - - void AddHold() { AddMultipleHolds(1); } - void AddMultipleHolds(int holds) { - GPR_CODEGEN_DEBUG_ASSERT(holds > 0); - reader_->AddHold(holds); - } - void RemoveHold() { reader_->RemoveHold(); } - - void OnDone(const grpc::Status& /*s*/) override {} - virtual void OnReadInitialMetadataDone(bool /*ok*/) {} - virtual void OnReadDone(bool /*ok*/) {} - - private: - friend class ClientCallbackReader; - void BindReader(ClientCallbackReader* reader) { reader_ = reader; } - ClientCallbackReader* reader_; -}; - -/// \a ClientWriteReactor is the interface for a client-streaming RPC. -/// All public methods behave as in ClientBidiReactor. -template -class ClientWriteReactor : public internal::ClientReactor { - public: - void StartCall() { writer_->StartCall(); } - void StartWrite(const Request* req) { StartWrite(req, grpc::WriteOptions()); } - void StartWrite(const Request* req, grpc::WriteOptions options) { - writer_->Write(req, options); - } - void StartWriteLast(const Request* req, grpc::WriteOptions options) { - StartWrite(req, options.set_last_message()); - } - void StartWritesDone() { writer_->WritesDone(); } - - void AddHold() { AddMultipleHolds(1); } - void AddMultipleHolds(int holds) { - GPR_CODEGEN_DEBUG_ASSERT(holds > 0); - writer_->AddHold(holds); - } - void RemoveHold() { writer_->RemoveHold(); } - - void OnDone(const grpc::Status& /*s*/) override {} - virtual void OnReadInitialMetadataDone(bool /*ok*/) {} - virtual void OnWriteDone(bool /*ok*/) {} - virtual void OnWritesDoneDone(bool /*ok*/) {} - - private: - friend class ClientCallbackWriter; - void BindWriter(ClientCallbackWriter* writer) { writer_ = writer; } - - ClientCallbackWriter* writer_; -}; - -/// \a ClientUnaryReactor is a reactor-style interface for a unary RPC. -/// This is _not_ a common way of invoking a unary RPC. In practice, this -/// option should be used only if the unary RPC wants to receive initial -/// metadata without waiting for the response to complete. Most deployments of -/// RPC systems do not use this option, but it is needed for generality. -/// All public methods behave as in ClientBidiReactor. -/// StartCall is included for consistency with the other reactor flavors: even -/// though there are no StartRead or StartWrite operations to queue before the -/// call (that is part of the unary call itself) and there is no reactor object -/// being created as a result of this call, we keep a consistent 2-phase -/// initiation API among all the reactor flavors. -class ClientUnaryReactor : public internal::ClientReactor { - public: - void StartCall() { call_->StartCall(); } - void OnDone(const grpc::Status& /*s*/) override {} - virtual void OnReadInitialMetadataDone(bool /*ok*/) {} - - private: - friend class ClientCallbackUnary; - void BindCall(ClientCallbackUnary* call) { call_ = call; } - ClientCallbackUnary* call_; -}; - -// Define function out-of-line from class to avoid forward declaration issue -inline void ClientCallbackUnary::BindReactor(ClientUnaryReactor* reactor) { - reactor->BindCall(this); -} - -namespace internal { - -// Forward declare factory classes for friendship -template -class ClientCallbackReaderWriterFactory; -template -class ClientCallbackReaderFactory; -template -class ClientCallbackWriterFactory; - -template -class ClientCallbackReaderWriterImpl - : public ClientCallbackReaderWriter { - public: - // always allocated against a call arena, no memory free required - static void operator delete(void* /*ptr*/, std::size_t size) { - GPR_CODEGEN_ASSERT(size == sizeof(ClientCallbackReaderWriterImpl)); - } - - // This operator should never be called as the memory should be freed as part - // of the arena destruction. It only exists to provide a matching operator - // delete to the operator new so that some compilers will not complain (see - // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this - // there are no tests catching the compiler warning. - static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); } - - void StartCall() ABSL_LOCKS_EXCLUDED(start_mu_) override { - // This call initiates two batches, plus any backlog, each with a callback - // 1. Send initial metadata (unless corked) + recv initial metadata - // 2. Any read backlog - // 3. Any write backlog - // 4. Recv trailing metadata (unless corked) - if (!start_corked_) { - start_ops_.SendInitialMetadata(&context_->send_initial_metadata_, - context_->initial_metadata_flags()); - } - - call_.PerformOps(&start_ops_); - - { - grpc::internal::MutexLock lock(&start_mu_); - - if (backlog_.read_ops) { - call_.PerformOps(&read_ops_); - } - if (backlog_.write_ops) { - call_.PerformOps(&write_ops_); - } - if (backlog_.writes_done_ops) { - call_.PerformOps(&writes_done_ops_); - } - call_.PerformOps(&finish_ops_); - // The last thing in this critical section is to set started_ so that it - // can be used lock-free as well. - started_.store(true, std::memory_order_release); - } - // MaybeFinish outside the lock to make sure that destruction of this object - // doesn't take place while holding the lock (which would cause the lock to - // be released after destruction) - this->MaybeFinish(/*from_reaction=*/false); - } - - void Read(Response* msg) override { - read_ops_.RecvMessage(msg); - callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); - if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) { - grpc::internal::MutexLock lock(&start_mu_); - if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) { - backlog_.read_ops = true; - return; - } - } - call_.PerformOps(&read_ops_); - } - - void Write(const Request* msg, grpc::WriteOptions options) - ABSL_LOCKS_EXCLUDED(start_mu_) override { - if (options.is_last_message()) { - options.set_buffer_hint(); - write_ops_.ClientSendClose(); - } - // TODO(vjpai): don't assert - GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(msg, options).ok()); - callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); - if (GPR_UNLIKELY(corked_write_needed_)) { - write_ops_.SendInitialMetadata(&context_->send_initial_metadata_, - context_->initial_metadata_flags()); - corked_write_needed_ = false; - } - - if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) { - grpc::internal::MutexLock lock(&start_mu_); - if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) { - backlog_.write_ops = true; - return; - } - } - call_.PerformOps(&write_ops_); - } - void WritesDone() ABSL_LOCKS_EXCLUDED(start_mu_) override { - writes_done_ops_.ClientSendClose(); - writes_done_tag_.Set( - call_.call(), - [this](bool ok) { - reactor_->OnWritesDoneDone(ok); - MaybeFinish(/*from_reaction=*/true); - }, - &writes_done_ops_, /*can_inline=*/false); - writes_done_ops_.set_core_cq_tag(&writes_done_tag_); - callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); - if (GPR_UNLIKELY(corked_write_needed_)) { - writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_, - context_->initial_metadata_flags()); - corked_write_needed_ = false; - } - if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) { - grpc::internal::MutexLock lock(&start_mu_); - if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) { - backlog_.writes_done_ops = true; - return; - } - } - call_.PerformOps(&writes_done_ops_); - } - - void AddHold(int holds) override { - callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed); - } - void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); } - - private: - friend class ClientCallbackReaderWriterFactory; - - ClientCallbackReaderWriterImpl(grpc::internal::Call call, - grpc::ClientContext* context, - ClientBidiReactor* reactor) - : context_(context), - call_(call), - reactor_(reactor), - start_corked_(context_->initial_metadata_corked_), - corked_write_needed_(start_corked_) { - this->BindReactor(reactor); - - // Set up the unchanging parts of the start, read, and write tags and ops. - start_tag_.Set( - call_.call(), - [this](bool ok) { - reactor_->OnReadInitialMetadataDone( - ok && !reactor_->InternalTrailersOnly(call_.call())); - MaybeFinish(/*from_reaction=*/true); - }, - &start_ops_, /*can_inline=*/false); - start_ops_.RecvInitialMetadata(context_); - start_ops_.set_core_cq_tag(&start_tag_); - - write_tag_.Set( - call_.call(), - [this](bool ok) { - reactor_->OnWriteDone(ok); - MaybeFinish(/*from_reaction=*/true); - }, - &write_ops_, /*can_inline=*/false); - write_ops_.set_core_cq_tag(&write_tag_); - - read_tag_.Set( - call_.call(), - [this](bool ok) { - reactor_->OnReadDone(ok); - MaybeFinish(/*from_reaction=*/true); - }, - &read_ops_, /*can_inline=*/false); - read_ops_.set_core_cq_tag(&read_tag_); - - // Also set up the Finish tag and op set. - finish_tag_.Set( - call_.call(), - [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); }, - &finish_ops_, - /*can_inline=*/false); - finish_ops_.ClientRecvStatus(context_, &finish_status_); - finish_ops_.set_core_cq_tag(&finish_tag_); - } - - // MaybeFinish can be called from reactions or from user-initiated operations - // like StartCall or RemoveHold. If this is the last operation or hold on this - // object, it will invoke the OnDone reaction. If MaybeFinish was called from - // a reaction, it can call OnDone directly. If not, it would need to schedule - // OnDone onto an executor thread to avoid the possibility of deadlocking with - // any locks in the user code that invoked it. - void MaybeFinish(bool from_reaction) { - if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub( - 1, std::memory_order_acq_rel) == 1)) { - grpc::Status s = std::move(finish_status_); - auto* reactor = reactor_; - auto* call = call_.call(); - this->~ClientCallbackReaderWriterImpl(); - grpc::g_core_codegen_interface->grpc_call_unref(call); - if (GPR_LIKELY(from_reaction)) { - reactor->OnDone(s); - } else { - reactor->InternalScheduleOnDone(std::move(s)); - } - } - } - - grpc::ClientContext* const context_; - grpc::internal::Call call_; - ClientBidiReactor* const reactor_; - - grpc::internal::CallOpSet - start_ops_; - grpc::internal::CallbackWithSuccessTag start_tag_; - const bool start_corked_; - bool corked_write_needed_; // no lock needed since only accessed in - // Write/WritesDone which cannot be concurrent - - grpc::internal::CallOpSet finish_ops_; - grpc::internal::CallbackWithSuccessTag finish_tag_; - grpc::Status finish_status_; - - grpc::internal::CallOpSet - write_ops_; - grpc::internal::CallbackWithSuccessTag write_tag_; - - grpc::internal::CallOpSet - writes_done_ops_; - grpc::internal::CallbackWithSuccessTag writes_done_tag_; - - grpc::internal::CallOpSet> - read_ops_; - grpc::internal::CallbackWithSuccessTag read_tag_; - - struct StartCallBacklog { - bool write_ops = false; - bool writes_done_ops = false; - bool read_ops = false; - }; - StartCallBacklog backlog_ ABSL_GUARDED_BY(start_mu_); - - // Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish - std::atomic callbacks_outstanding_{3}; - std::atomic_bool started_{false}; - grpc::internal::Mutex start_mu_; -}; - -template -class ClientCallbackReaderWriterFactory { - public: - static void Create(grpc::ChannelInterface* channel, - const grpc::internal::RpcMethod& method, - grpc::ClientContext* context, - ClientBidiReactor* reactor) { - grpc::internal::Call call = - channel->CreateCall(method, context, channel->CallbackCQ()); - - grpc::g_core_codegen_interface->grpc_call_ref(call.call()); - new (grpc::g_core_codegen_interface->grpc_call_arena_alloc( - call.call(), sizeof(ClientCallbackReaderWriterImpl))) - ClientCallbackReaderWriterImpl(call, context, - reactor); - } -}; - -template -class ClientCallbackReaderImpl : public ClientCallbackReader { - public: - // always allocated against a call arena, no memory free required - static void operator delete(void* /*ptr*/, std::size_t size) { - GPR_CODEGEN_ASSERT(size == sizeof(ClientCallbackReaderImpl)); - } - - // This operator should never be called as the memory should be freed as part - // of the arena destruction. It only exists to provide a matching operator - // delete to the operator new so that some compilers will not complain (see - // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this - // there are no tests catching the compiler warning. - static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); } - - void StartCall() override { - // This call initiates two batches, plus any backlog, each with a callback - // 1. Send initial metadata (unless corked) + recv initial metadata - // 2. Any backlog - // 3. Recv trailing metadata - - start_tag_.Set( - call_.call(), - [this](bool ok) { - reactor_->OnReadInitialMetadataDone( - ok && !reactor_->InternalTrailersOnly(call_.call())); - MaybeFinish(/*from_reaction=*/true); - }, - &start_ops_, /*can_inline=*/false); - start_ops_.SendInitialMetadata(&context_->send_initial_metadata_, - context_->initial_metadata_flags()); - start_ops_.RecvInitialMetadata(context_); - start_ops_.set_core_cq_tag(&start_tag_); - call_.PerformOps(&start_ops_); - - // Also set up the read tag so it doesn't have to be set up each time - read_tag_.Set( - call_.call(), - [this](bool ok) { - reactor_->OnReadDone(ok); - MaybeFinish(/*from_reaction=*/true); - }, - &read_ops_, /*can_inline=*/false); - read_ops_.set_core_cq_tag(&read_tag_); - - { - grpc::internal::MutexLock lock(&start_mu_); - if (backlog_.read_ops) { - call_.PerformOps(&read_ops_); - } - started_.store(true, std::memory_order_release); - } - - finish_tag_.Set( - call_.call(), - [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); }, - &finish_ops_, /*can_inline=*/false); - finish_ops_.ClientRecvStatus(context_, &finish_status_); - finish_ops_.set_core_cq_tag(&finish_tag_); - call_.PerformOps(&finish_ops_); - } - - void Read(Response* msg) override { - read_ops_.RecvMessage(msg); - callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); - if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) { - grpc::internal::MutexLock lock(&start_mu_); - if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) { - backlog_.read_ops = true; - return; - } - } - call_.PerformOps(&read_ops_); - } - - void AddHold(int holds) override { - callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed); - } - void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); } - - private: - friend class ClientCallbackReaderFactory; - - template - ClientCallbackReaderImpl(grpc::internal::Call call, - grpc::ClientContext* context, Request* request, - ClientReadReactor* reactor) - : context_(context), call_(call), reactor_(reactor) { - this->BindReactor(reactor); - // TODO(vjpai): don't assert - GPR_CODEGEN_ASSERT(start_ops_.SendMessagePtr(request).ok()); - start_ops_.ClientSendClose(); - } - - // MaybeFinish behaves as in ClientCallbackReaderWriterImpl. - void MaybeFinish(bool from_reaction) { - if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub( - 1, std::memory_order_acq_rel) == 1)) { - grpc::Status s = std::move(finish_status_); - auto* reactor = reactor_; - auto* call = call_.call(); - this->~ClientCallbackReaderImpl(); - grpc::g_core_codegen_interface->grpc_call_unref(call); - if (GPR_LIKELY(from_reaction)) { - reactor->OnDone(s); - } else { - reactor->InternalScheduleOnDone(std::move(s)); - } - } - } - - grpc::ClientContext* const context_; - grpc::internal::Call call_; - ClientReadReactor* const reactor_; - - grpc::internal::CallOpSet - start_ops_; - grpc::internal::CallbackWithSuccessTag start_tag_; - - grpc::internal::CallOpSet finish_ops_; - grpc::internal::CallbackWithSuccessTag finish_tag_; - grpc::Status finish_status_; - - grpc::internal::CallOpSet> - read_ops_; - grpc::internal::CallbackWithSuccessTag read_tag_; - - struct StartCallBacklog { - bool read_ops = false; - }; - StartCallBacklog backlog_ ABSL_GUARDED_BY(start_mu_); - - // Minimum of 2 callbacks to pre-register for start and finish - std::atomic callbacks_outstanding_{2}; - std::atomic_bool started_{false}; - grpc::internal::Mutex start_mu_; -}; - -template -class ClientCallbackReaderFactory { - public: - template - static void Create(grpc::ChannelInterface* channel, - const grpc::internal::RpcMethod& method, - grpc::ClientContext* context, const Request* request, - ClientReadReactor* reactor) { - grpc::internal::Call call = - channel->CreateCall(method, context, channel->CallbackCQ()); - - grpc::g_core_codegen_interface->grpc_call_ref(call.call()); - new (grpc::g_core_codegen_interface->grpc_call_arena_alloc( - call.call(), sizeof(ClientCallbackReaderImpl))) - ClientCallbackReaderImpl(call, context, request, reactor); - } -}; - -template -class ClientCallbackWriterImpl : public ClientCallbackWriter { - public: - // always allocated against a call arena, no memory free required - static void operator delete(void* /*ptr*/, std::size_t size) { - GPR_CODEGEN_ASSERT(size == sizeof(ClientCallbackWriterImpl)); - } - - // This operator should never be called as the memory should be freed as part - // of the arena destruction. It only exists to provide a matching operator - // delete to the operator new so that some compilers will not complain (see - // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this - // there are no tests catching the compiler warning. - static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); } - - void StartCall() ABSL_LOCKS_EXCLUDED(start_mu_) override { - // This call initiates two batches, plus any backlog, each with a callback - // 1. Send initial metadata (unless corked) + recv initial metadata - // 2. Any backlog - // 3. Recv trailing metadata - - if (!start_corked_) { - start_ops_.SendInitialMetadata(&context_->send_initial_metadata_, - context_->initial_metadata_flags()); - } - call_.PerformOps(&start_ops_); - - { - grpc::internal::MutexLock lock(&start_mu_); - - if (backlog_.write_ops) { - call_.PerformOps(&write_ops_); - } - if (backlog_.writes_done_ops) { - call_.PerformOps(&writes_done_ops_); - } - call_.PerformOps(&finish_ops_); - // The last thing in this critical section is to set started_ so that it - // can be used lock-free as well. - started_.store(true, std::memory_order_release); - } - // MaybeFinish outside the lock to make sure that destruction of this object - // doesn't take place while holding the lock (which would cause the lock to - // be released after destruction) - this->MaybeFinish(/*from_reaction=*/false); - } - - void Write(const Request* msg, grpc::WriteOptions options) - ABSL_LOCKS_EXCLUDED(start_mu_) override { - if (GPR_UNLIKELY(options.is_last_message())) { - options.set_buffer_hint(); - write_ops_.ClientSendClose(); - } - // TODO(vjpai): don't assert - GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(msg, options).ok()); - callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); - - if (GPR_UNLIKELY(corked_write_needed_)) { - write_ops_.SendInitialMetadata(&context_->send_initial_metadata_, - context_->initial_metadata_flags()); - corked_write_needed_ = false; - } - - if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) { - grpc::internal::MutexLock lock(&start_mu_); - if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) { - backlog_.write_ops = true; - return; - } - } - call_.PerformOps(&write_ops_); - } - - void WritesDone() ABSL_LOCKS_EXCLUDED(start_mu_) override { - writes_done_ops_.ClientSendClose(); - writes_done_tag_.Set( - call_.call(), - [this](bool ok) { - reactor_->OnWritesDoneDone(ok); - MaybeFinish(/*from_reaction=*/true); - }, - &writes_done_ops_, /*can_inline=*/false); - writes_done_ops_.set_core_cq_tag(&writes_done_tag_); - callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); - - if (GPR_UNLIKELY(corked_write_needed_)) { - writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_, - context_->initial_metadata_flags()); - corked_write_needed_ = false; - } - - if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) { - grpc::internal::MutexLock lock(&start_mu_); - if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) { - backlog_.writes_done_ops = true; - return; - } - } - call_.PerformOps(&writes_done_ops_); - } - - void AddHold(int holds) override { - callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed); - } - void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); } - - private: - friend class ClientCallbackWriterFactory; - - template - ClientCallbackWriterImpl(grpc::internal::Call call, - grpc::ClientContext* context, Response* response, - ClientWriteReactor* reactor) - : context_(context), - call_(call), - reactor_(reactor), - start_corked_(context_->initial_metadata_corked_), - corked_write_needed_(start_corked_) { - this->BindReactor(reactor); - - // Set up the unchanging parts of the start and write tags and ops. - start_tag_.Set( - call_.call(), - [this](bool ok) { - reactor_->OnReadInitialMetadataDone( - ok && !reactor_->InternalTrailersOnly(call_.call())); - MaybeFinish(/*from_reaction=*/true); - }, - &start_ops_, /*can_inline=*/false); - start_ops_.RecvInitialMetadata(context_); - start_ops_.set_core_cq_tag(&start_tag_); - - write_tag_.Set( - call_.call(), - [this](bool ok) { - reactor_->OnWriteDone(ok); - MaybeFinish(/*from_reaction=*/true); - }, - &write_ops_, /*can_inline=*/false); - write_ops_.set_core_cq_tag(&write_tag_); - - // Also set up the Finish tag and op set. - finish_ops_.RecvMessage(response); - finish_ops_.AllowNoMessage(); - finish_tag_.Set( - call_.call(), - [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); }, - &finish_ops_, - /*can_inline=*/false); - finish_ops_.ClientRecvStatus(context_, &finish_status_); - finish_ops_.set_core_cq_tag(&finish_tag_); - } - - // MaybeFinish behaves as in ClientCallbackReaderWriterImpl. - void MaybeFinish(bool from_reaction) { - if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub( - 1, std::memory_order_acq_rel) == 1)) { - grpc::Status s = std::move(finish_status_); - auto* reactor = reactor_; - auto* call = call_.call(); - this->~ClientCallbackWriterImpl(); - grpc::g_core_codegen_interface->grpc_call_unref(call); - if (GPR_LIKELY(from_reaction)) { - reactor->OnDone(s); - } else { - reactor->InternalScheduleOnDone(std::move(s)); - } - } - } - - grpc::ClientContext* const context_; - grpc::internal::Call call_; - ClientWriteReactor* const reactor_; - - grpc::internal::CallOpSet - start_ops_; - grpc::internal::CallbackWithSuccessTag start_tag_; - const bool start_corked_; - bool corked_write_needed_; // no lock needed since only accessed in - // Write/WritesDone which cannot be concurrent - - grpc::internal::CallOpSet - finish_ops_; - grpc::internal::CallbackWithSuccessTag finish_tag_; - grpc::Status finish_status_; - - grpc::internal::CallOpSet - write_ops_; - grpc::internal::CallbackWithSuccessTag write_tag_; - - grpc::internal::CallOpSet - writes_done_ops_; - grpc::internal::CallbackWithSuccessTag writes_done_tag_; - - struct StartCallBacklog { - bool write_ops = false; - bool writes_done_ops = false; - }; - StartCallBacklog backlog_ ABSL_GUARDED_BY(start_mu_); - - // Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish - std::atomic callbacks_outstanding_{3}; - std::atomic_bool started_{false}; - grpc::internal::Mutex start_mu_; -}; - -template -class ClientCallbackWriterFactory { - public: - template - static void Create(grpc::ChannelInterface* channel, - const grpc::internal::RpcMethod& method, - grpc::ClientContext* context, Response* response, - ClientWriteReactor* reactor) { - grpc::internal::Call call = - channel->CreateCall(method, context, channel->CallbackCQ()); - - grpc::g_core_codegen_interface->grpc_call_ref(call.call()); - new (grpc::g_core_codegen_interface->grpc_call_arena_alloc( - call.call(), sizeof(ClientCallbackWriterImpl))) - ClientCallbackWriterImpl(call, context, response, reactor); - } -}; - -class ClientCallbackUnaryImpl final : public ClientCallbackUnary { - public: - // always allocated against a call arena, no memory free required - static void operator delete(void* /*ptr*/, std::size_t size) { - GPR_CODEGEN_ASSERT(size == sizeof(ClientCallbackUnaryImpl)); - } - - // This operator should never be called as the memory should be freed as part - // of the arena destruction. It only exists to provide a matching operator - // delete to the operator new so that some compilers will not complain (see - // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this - // there are no tests catching the compiler warning. - static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); } - - void StartCall() override { - // This call initiates two batches, each with a callback - // 1. Send initial metadata + write + writes done + recv initial metadata - // 2. Read message, recv trailing metadata - - start_tag_.Set( - call_.call(), - [this](bool ok) { - reactor_->OnReadInitialMetadataDone( - ok && !reactor_->InternalTrailersOnly(call_.call())); - MaybeFinish(); - }, - &start_ops_, /*can_inline=*/false); - start_ops_.SendInitialMetadata(&context_->send_initial_metadata_, - context_->initial_metadata_flags()); - start_ops_.RecvInitialMetadata(context_); - start_ops_.set_core_cq_tag(&start_tag_); - call_.PerformOps(&start_ops_); - - finish_tag_.Set( - call_.call(), [this](bool /*ok*/) { MaybeFinish(); }, &finish_ops_, - /*can_inline=*/false); - finish_ops_.ClientRecvStatus(context_, &finish_status_); - finish_ops_.set_core_cq_tag(&finish_tag_); - call_.PerformOps(&finish_ops_); - } - - private: - friend class ClientCallbackUnaryFactory; - - template - ClientCallbackUnaryImpl(grpc::internal::Call call, - grpc::ClientContext* context, Request* request, - Response* response, ClientUnaryReactor* reactor) - : context_(context), call_(call), reactor_(reactor) { - this->BindReactor(reactor); - // TODO(vjpai): don't assert - GPR_CODEGEN_ASSERT(start_ops_.SendMessagePtr(request).ok()); - start_ops_.ClientSendClose(); - finish_ops_.RecvMessage(response); - finish_ops_.AllowNoMessage(); - } - - // In the unary case, MaybeFinish is only ever invoked from a - // library-initiated reaction, so it will just directly call OnDone if this is - // the last reaction for this RPC. - void MaybeFinish() { - if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub( - 1, std::memory_order_acq_rel) == 1)) { - grpc::Status s = std::move(finish_status_); - auto* reactor = reactor_; - auto* call = call_.call(); - this->~ClientCallbackUnaryImpl(); - grpc::g_core_codegen_interface->grpc_call_unref(call); - reactor->OnDone(s); - } - } - - grpc::ClientContext* const context_; - grpc::internal::Call call_; - ClientUnaryReactor* const reactor_; - - grpc::internal::CallOpSet - start_ops_; - grpc::internal::CallbackWithSuccessTag start_tag_; - - grpc::internal::CallOpSet - finish_ops_; - grpc::internal::CallbackWithSuccessTag finish_tag_; - grpc::Status finish_status_; - - // This call will have 2 callbacks: start and finish - std::atomic callbacks_outstanding_{2}; -}; - -class ClientCallbackUnaryFactory { - public: - template - static void Create(grpc::ChannelInterface* channel, - const grpc::internal::RpcMethod& method, - grpc::ClientContext* context, const Request* request, - Response* response, ClientUnaryReactor* reactor) { - grpc::internal::Call call = - channel->CreateCall(method, context, channel->CallbackCQ()); - - grpc::g_core_codegen_interface->grpc_call_ref(call.call()); - - new (grpc::g_core_codegen_interface->grpc_call_arena_alloc( - call.call(), sizeof(ClientCallbackUnaryImpl))) - ClientCallbackUnaryImpl(call, context, - static_cast(request), - static_cast(response), reactor); - } -}; - -} // namespace internal -} // namespace grpc #endif // GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H diff --git a/include/grpcpp/support/client_callback.h b/include/grpcpp/support/client_callback.h index c15bca0dbe8..fecde3dc4ca 100644 --- a/include/grpcpp/support/client_callback.h +++ b/include/grpcpp/support/client_callback.h @@ -19,6 +19,1211 @@ #ifndef GRPCPP_SUPPORT_CLIENT_CALLBACK_H #define GRPCPP_SUPPORT_CLIENT_CALLBACK_H -#include // IWYU pragma: export +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace grpc { +class Channel; +class ClientContext; + +namespace internal { +class RpcMethod; + +/// Perform a callback-based unary call. May optionally specify the base +/// class of the Request and Response so that the internal calls and structures +/// below this may be based on those base classes and thus achieve code reuse +/// across different RPCs (e.g., for protobuf, MessageLite would be a base +/// class). +/// TODO(vjpai): Combine as much as possible with the blocking unary call code +template +void CallbackUnaryCall(grpc::ChannelInterface* channel, + const grpc::internal::RpcMethod& method, + grpc::ClientContext* context, + const InputMessage* request, OutputMessage* result, + std::function on_completion) { + static_assert(std::is_base_of::value, + "Invalid input message specification"); + static_assert(std::is_base_of::value, + "Invalid output message specification"); + CallbackUnaryCallImpl x( + channel, method, context, request, result, on_completion); +} + +template +class CallbackUnaryCallImpl { + public: + CallbackUnaryCallImpl(grpc::ChannelInterface* channel, + const grpc::internal::RpcMethod& method, + grpc::ClientContext* context, + const InputMessage* request, OutputMessage* result, + std::function on_completion) { + grpc::CompletionQueue* cq = channel->CallbackCQ(); + GPR_CODEGEN_ASSERT(cq != nullptr); + grpc::internal::Call call(channel->CreateCall(method, context, cq)); + + using FullCallOpSet = grpc::internal::CallOpSet< + grpc::internal::CallOpSendInitialMetadata, + grpc::internal::CallOpSendMessage, + grpc::internal::CallOpRecvInitialMetadata, + grpc::internal::CallOpRecvMessage, + grpc::internal::CallOpClientSendClose, + grpc::internal::CallOpClientRecvStatus>; + + struct OpSetAndTag { + FullCallOpSet opset; + grpc::internal::CallbackWithStatusTag tag; + }; + const size_t alloc_sz = sizeof(OpSetAndTag); + auto* const alloced = static_cast( + grpc::g_core_codegen_interface->grpc_call_arena_alloc(call.call(), + alloc_sz)); + auto* ops = new (&alloced->opset) FullCallOpSet; + auto* tag = new (&alloced->tag) + grpc::internal::CallbackWithStatusTag(call.call(), on_completion, ops); + + // TODO(vjpai): Unify code with sync API as much as possible + grpc::Status s = ops->SendMessagePtr(request); + if (!s.ok()) { + tag->force_run(s); + return; + } + ops->SendInitialMetadata(&context->send_initial_metadata_, + context->initial_metadata_flags()); + ops->RecvInitialMetadata(context); + ops->RecvMessage(result); + ops->AllowNoMessage(); + ops->ClientSendClose(); + ops->ClientRecvStatus(context, tag->status_ptr()); + ops->set_core_cq_tag(tag); + call.PerformOps(ops); + } +}; + +// Base class for public API classes. +class ClientReactor { + public: + virtual ~ClientReactor() = default; + + /// Called by the library when all operations associated with this RPC have + /// completed and all Holds have been removed. OnDone provides the RPC status + /// outcome for both successful and failed RPCs. If it is never called on an + /// RPC, it indicates an application-level problem (like failure to remove a + /// hold). + /// + /// \param[in] s The status outcome of this RPC + virtual void OnDone(const grpc::Status& /*s*/) = 0; + + /// InternalScheduleOnDone is not part of the API and is not meant to be + /// overridden. It is virtual to allow successful builds for certain bazel + /// build users that only want to depend on gRPC codegen headers and not the + /// full library (although this is not a generally-supported option). Although + /// the virtual call is slower than a direct call, this function is + /// heavyweight and the cost of the virtual call is not much in comparison. + /// This function may be removed or devirtualized in the future. + virtual void InternalScheduleOnDone(grpc::Status s); + + /// InternalTrailersOnly is not part of the API and is not meant to be + /// overridden. It is virtual to allow successful builds for certain bazel + /// build users that only want to depend on gRPC codegen headers and not the + /// full library (although this is not a generally-supported option). Although + /// the virtual call is slower than a direct call, this function is + /// heavyweight and the cost of the virtual call is not much in comparison. + /// This function may be removed or devirtualized in the future. + virtual bool InternalTrailersOnly(const grpc_call* call) const; +}; + +} // namespace internal + +// Forward declarations +template +class ClientBidiReactor; +template +class ClientReadReactor; +template +class ClientWriteReactor; +class ClientUnaryReactor; + +// NOTE: The streaming objects are not actually implemented in the public API. +// These interfaces are provided for mocking only. Typical applications +// will interact exclusively with the reactors that they define. +template +class ClientCallbackReaderWriter { + public: + virtual ~ClientCallbackReaderWriter() {} + virtual void StartCall() = 0; + virtual void Write(const Request* req, grpc::WriteOptions options) = 0; + virtual void WritesDone() = 0; + virtual void Read(Response* resp) = 0; + virtual void AddHold(int holds) = 0; + virtual void RemoveHold() = 0; + + protected: + void BindReactor(ClientBidiReactor* reactor) { + reactor->BindStream(this); + } +}; + +template +class ClientCallbackReader { + public: + virtual ~ClientCallbackReader() {} + virtual void StartCall() = 0; + virtual void Read(Response* resp) = 0; + virtual void AddHold(int holds) = 0; + virtual void RemoveHold() = 0; + + protected: + void BindReactor(ClientReadReactor* reactor) { + reactor->BindReader(this); + } +}; + +template +class ClientCallbackWriter { + public: + virtual ~ClientCallbackWriter() {} + virtual void StartCall() = 0; + void Write(const Request* req) { Write(req, grpc::WriteOptions()); } + virtual void Write(const Request* req, grpc::WriteOptions options) = 0; + void WriteLast(const Request* req, grpc::WriteOptions options) { + Write(req, options.set_last_message()); + } + virtual void WritesDone() = 0; + + virtual void AddHold(int holds) = 0; + virtual void RemoveHold() = 0; + + protected: + void BindReactor(ClientWriteReactor* reactor) { + reactor->BindWriter(this); + } +}; + +class ClientCallbackUnary { + public: + virtual ~ClientCallbackUnary() {} + virtual void StartCall() = 0; + + protected: + void BindReactor(ClientUnaryReactor* reactor); +}; + +// The following classes are the reactor interfaces that are to be implemented +// by the user. They are passed in to the library as an argument to a call on a +// stub (either a codegen-ed call or a generic call). The streaming RPC is +// activated by calling StartCall, possibly after initiating StartRead, +// StartWrite, or AddHold operations on the streaming object. Note that none of +// the classes are pure; all reactions have a default empty reaction so that the +// user class only needs to override those reactions that it cares about. +// The reactor must be passed to the stub invocation before any of the below +// operations can be called and its reactions will be invoked by the library in +// response to the completion of various operations. Reactions must not include +// blocking operations (such as blocking I/O, starting synchronous RPCs, or +// waiting on condition variables). Reactions may be invoked concurrently, +// except that OnDone is called after all others (assuming proper API usage). +// The reactor may not be deleted until OnDone is called. + +/// \a ClientBidiReactor is the interface for a bidirectional streaming RPC. +template +class ClientBidiReactor : public internal::ClientReactor { + public: + /// Activate the RPC and initiate any reads or writes that have been Start'ed + /// before this call. All streaming RPCs issued by the client MUST have + /// StartCall invoked on them (even if they are canceled) as this call is the + /// activation of their lifecycle. + void StartCall() { stream_->StartCall(); } + + /// Initiate a read operation (or post it for later initiation if StartCall + /// has not yet been invoked). + /// + /// \param[out] resp Where to eventually store the read message. Valid when + /// the library calls OnReadDone + void StartRead(Response* resp) { stream_->Read(resp); } + + /// Initiate a write operation (or post it for later initiation if StartCall + /// has not yet been invoked). + /// + /// \param[in] req The message to be written. The library does not take + /// ownership but the caller must ensure that the message is + /// not deleted or modified until OnWriteDone is called. + void StartWrite(const Request* req) { StartWrite(req, grpc::WriteOptions()); } + + /// Initiate/post a write operation with specified options. + /// + /// \param[in] req The message to be written. The library does not take + /// ownership but the caller must ensure that the message is + /// not deleted or modified until OnWriteDone is called. + /// \param[in] options The WriteOptions to use for writing this message + void StartWrite(const Request* req, grpc::WriteOptions options) { + stream_->Write(req, options); + } + + /// Initiate/post a write operation with specified options and an indication + /// that this is the last write (like StartWrite and StartWritesDone, merged). + /// Note that calling this means that no more calls to StartWrite, + /// StartWriteLast, or StartWritesDone are allowed. + /// + /// \param[in] req The message to be written. The library does not take + /// ownership but the caller must ensure that the message is + /// not deleted or modified until OnWriteDone is called. + /// \param[in] options The WriteOptions to use for writing this message + void StartWriteLast(const Request* req, grpc::WriteOptions options) { + StartWrite(req, options.set_last_message()); + } + + /// Indicate that the RPC will have no more write operations. This can only be + /// issued once for a given RPC. This is not required or allowed if + /// StartWriteLast is used since that already has the same implication. + /// Note that calling this means that no more calls to StartWrite, + /// StartWriteLast, or StartWritesDone are allowed. + void StartWritesDone() { stream_->WritesDone(); } + + /// Holds are needed if (and only if) this stream has operations that take + /// place on it after StartCall but from outside one of the reactions + /// (OnReadDone, etc). This is _not_ a common use of the streaming API. + /// + /// Holds must be added before calling StartCall. If a stream still has a hold + /// in place, its resources will not be destroyed even if the status has + /// already come in from the wire and there are currently no active callbacks + /// outstanding. Similarly, the stream will not call OnDone if there are still + /// holds on it. + /// + /// For example, if a StartRead or StartWrite operation is going to be + /// initiated from elsewhere in the application, the application should call + /// AddHold or AddMultipleHolds before StartCall. If there is going to be, + /// for example, a read-flow and a write-flow taking place outside the + /// reactions, then call AddMultipleHolds(2) before StartCall. When the + /// application knows that it won't issue any more read operations (such as + /// when a read comes back as not ok), it should issue a RemoveHold(). It + /// should also call RemoveHold() again after it does StartWriteLast or + /// StartWritesDone that indicates that there will be no more write ops. + /// The number of RemoveHold calls must match the total number of AddHold + /// calls plus the number of holds added by AddMultipleHolds. + /// The argument to AddMultipleHolds must be positive. + void AddHold() { AddMultipleHolds(1); } + void AddMultipleHolds(int holds) { + GPR_CODEGEN_DEBUG_ASSERT(holds > 0); + stream_->AddHold(holds); + } + void RemoveHold() { stream_->RemoveHold(); } + + /// Notifies the application that all operations associated with this RPC + /// have completed and all Holds have been removed. OnDone provides the RPC + /// status outcome for both successful and failed RPCs and will be called in + /// all cases. If it is not called, it indicates an application-level problem + /// (like failure to remove a hold). + /// + /// \param[in] s The status outcome of this RPC + void OnDone(const grpc::Status& /*s*/) override {} + + /// Notifies the application that a read of initial metadata from the + /// server is done. If the application chooses not to implement this method, + /// it can assume that the initial metadata has been read before the first + /// call of OnReadDone or OnDone. + /// + /// \param[in] ok Was the initial metadata read successfully? If false, no + /// new read/write operation will succeed, and any further + /// Start* operations should not be called. + virtual void OnReadInitialMetadataDone(bool /*ok*/) {} + + /// Notifies the application that a StartRead operation completed. + /// + /// \param[in] ok Was it successful? If false, no new read/write operation + /// will succeed, and any further Start* should not be called. + virtual void OnReadDone(bool /*ok*/) {} + + /// Notifies the application that a StartWrite or StartWriteLast operation + /// completed. + /// + /// \param[in] ok Was it successful? If false, no new read/write operation + /// will succeed, and any further Start* should not be called. + virtual void OnWriteDone(bool /*ok*/) {} + + /// Notifies the application that a StartWritesDone operation completed. Note + /// that this is only used on explicit StartWritesDone operations and not for + /// those that are implicitly invoked as part of a StartWriteLast. + /// + /// \param[in] ok Was it successful? If false, the application will later see + /// the failure reflected as a bad status in OnDone and no + /// further Start* should be called. + virtual void OnWritesDoneDone(bool /*ok*/) {} + + private: + friend class ClientCallbackReaderWriter; + void BindStream(ClientCallbackReaderWriter* stream) { + stream_ = stream; + } + ClientCallbackReaderWriter* stream_; +}; + +/// \a ClientReadReactor is the interface for a server-streaming RPC. +/// All public methods behave as in ClientBidiReactor. +template +class ClientReadReactor : public internal::ClientReactor { + public: + void StartCall() { reader_->StartCall(); } + void StartRead(Response* resp) { reader_->Read(resp); } + + void AddHold() { AddMultipleHolds(1); } + void AddMultipleHolds(int holds) { + GPR_CODEGEN_DEBUG_ASSERT(holds > 0); + reader_->AddHold(holds); + } + void RemoveHold() { reader_->RemoveHold(); } + + void OnDone(const grpc::Status& /*s*/) override {} + virtual void OnReadInitialMetadataDone(bool /*ok*/) {} + virtual void OnReadDone(bool /*ok*/) {} + + private: + friend class ClientCallbackReader; + void BindReader(ClientCallbackReader* reader) { reader_ = reader; } + ClientCallbackReader* reader_; +}; + +/// \a ClientWriteReactor is the interface for a client-streaming RPC. +/// All public methods behave as in ClientBidiReactor. +template +class ClientWriteReactor : public internal::ClientReactor { + public: + void StartCall() { writer_->StartCall(); } + void StartWrite(const Request* req) { StartWrite(req, grpc::WriteOptions()); } + void StartWrite(const Request* req, grpc::WriteOptions options) { + writer_->Write(req, options); + } + void StartWriteLast(const Request* req, grpc::WriteOptions options) { + StartWrite(req, options.set_last_message()); + } + void StartWritesDone() { writer_->WritesDone(); } + + void AddHold() { AddMultipleHolds(1); } + void AddMultipleHolds(int holds) { + GPR_CODEGEN_DEBUG_ASSERT(holds > 0); + writer_->AddHold(holds); + } + void RemoveHold() { writer_->RemoveHold(); } + + void OnDone(const grpc::Status& /*s*/) override {} + virtual void OnReadInitialMetadataDone(bool /*ok*/) {} + virtual void OnWriteDone(bool /*ok*/) {} + virtual void OnWritesDoneDone(bool /*ok*/) {} + + private: + friend class ClientCallbackWriter; + void BindWriter(ClientCallbackWriter* writer) { writer_ = writer; } + + ClientCallbackWriter* writer_; +}; + +/// \a ClientUnaryReactor is a reactor-style interface for a unary RPC. +/// This is _not_ a common way of invoking a unary RPC. In practice, this +/// option should be used only if the unary RPC wants to receive initial +/// metadata without waiting for the response to complete. Most deployments of +/// RPC systems do not use this option, but it is needed for generality. +/// All public methods behave as in ClientBidiReactor. +/// StartCall is included for consistency with the other reactor flavors: even +/// though there are no StartRead or StartWrite operations to queue before the +/// call (that is part of the unary call itself) and there is no reactor object +/// being created as a result of this call, we keep a consistent 2-phase +/// initiation API among all the reactor flavors. +class ClientUnaryReactor : public internal::ClientReactor { + public: + void StartCall() { call_->StartCall(); } + void OnDone(const grpc::Status& /*s*/) override {} + virtual void OnReadInitialMetadataDone(bool /*ok*/) {} + + private: + friend class ClientCallbackUnary; + void BindCall(ClientCallbackUnary* call) { call_ = call; } + ClientCallbackUnary* call_; +}; + +// Define function out-of-line from class to avoid forward declaration issue +inline void ClientCallbackUnary::BindReactor(ClientUnaryReactor* reactor) { + reactor->BindCall(this); +} + +namespace internal { + +// Forward declare factory classes for friendship +template +class ClientCallbackReaderWriterFactory; +template +class ClientCallbackReaderFactory; +template +class ClientCallbackWriterFactory; + +template +class ClientCallbackReaderWriterImpl + : public ClientCallbackReaderWriter { + public: + // always allocated against a call arena, no memory free required + static void operator delete(void* /*ptr*/, std::size_t size) { + GPR_CODEGEN_ASSERT(size == sizeof(ClientCallbackReaderWriterImpl)); + } + + // This operator should never be called as the memory should be freed as part + // of the arena destruction. It only exists to provide a matching operator + // delete to the operator new so that some compilers will not complain (see + // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this + // there are no tests catching the compiler warning. + static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); } + + void StartCall() ABSL_LOCKS_EXCLUDED(start_mu_) override { + // This call initiates two batches, plus any backlog, each with a callback + // 1. Send initial metadata (unless corked) + recv initial metadata + // 2. Any read backlog + // 3. Any write backlog + // 4. Recv trailing metadata (unless corked) + if (!start_corked_) { + start_ops_.SendInitialMetadata(&context_->send_initial_metadata_, + context_->initial_metadata_flags()); + } + + call_.PerformOps(&start_ops_); + + { + grpc::internal::MutexLock lock(&start_mu_); + + if (backlog_.read_ops) { + call_.PerformOps(&read_ops_); + } + if (backlog_.write_ops) { + call_.PerformOps(&write_ops_); + } + if (backlog_.writes_done_ops) { + call_.PerformOps(&writes_done_ops_); + } + call_.PerformOps(&finish_ops_); + // The last thing in this critical section is to set started_ so that it + // can be used lock-free as well. + started_.store(true, std::memory_order_release); + } + // MaybeFinish outside the lock to make sure that destruction of this object + // doesn't take place while holding the lock (which would cause the lock to + // be released after destruction) + this->MaybeFinish(/*from_reaction=*/false); + } + + void Read(Response* msg) override { + read_ops_.RecvMessage(msg); + callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); + if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) { + grpc::internal::MutexLock lock(&start_mu_); + if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) { + backlog_.read_ops = true; + return; + } + } + call_.PerformOps(&read_ops_); + } + + void Write(const Request* msg, grpc::WriteOptions options) + ABSL_LOCKS_EXCLUDED(start_mu_) override { + if (options.is_last_message()) { + options.set_buffer_hint(); + write_ops_.ClientSendClose(); + } + // TODO(vjpai): don't assert + GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(msg, options).ok()); + callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); + if (GPR_UNLIKELY(corked_write_needed_)) { + write_ops_.SendInitialMetadata(&context_->send_initial_metadata_, + context_->initial_metadata_flags()); + corked_write_needed_ = false; + } + + if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) { + grpc::internal::MutexLock lock(&start_mu_); + if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) { + backlog_.write_ops = true; + return; + } + } + call_.PerformOps(&write_ops_); + } + void WritesDone() ABSL_LOCKS_EXCLUDED(start_mu_) override { + writes_done_ops_.ClientSendClose(); + writes_done_tag_.Set( + call_.call(), + [this](bool ok) { + reactor_->OnWritesDoneDone(ok); + MaybeFinish(/*from_reaction=*/true); + }, + &writes_done_ops_, /*can_inline=*/false); + writes_done_ops_.set_core_cq_tag(&writes_done_tag_); + callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); + if (GPR_UNLIKELY(corked_write_needed_)) { + writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_, + context_->initial_metadata_flags()); + corked_write_needed_ = false; + } + if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) { + grpc::internal::MutexLock lock(&start_mu_); + if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) { + backlog_.writes_done_ops = true; + return; + } + } + call_.PerformOps(&writes_done_ops_); + } + + void AddHold(int holds) override { + callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed); + } + void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); } + + private: + friend class ClientCallbackReaderWriterFactory; + + ClientCallbackReaderWriterImpl(grpc::internal::Call call, + grpc::ClientContext* context, + ClientBidiReactor* reactor) + : context_(context), + call_(call), + reactor_(reactor), + start_corked_(context_->initial_metadata_corked_), + corked_write_needed_(start_corked_) { + this->BindReactor(reactor); + + // Set up the unchanging parts of the start, read, and write tags and ops. + start_tag_.Set( + call_.call(), + [this](bool ok) { + reactor_->OnReadInitialMetadataDone( + ok && !reactor_->InternalTrailersOnly(call_.call())); + MaybeFinish(/*from_reaction=*/true); + }, + &start_ops_, /*can_inline=*/false); + start_ops_.RecvInitialMetadata(context_); + start_ops_.set_core_cq_tag(&start_tag_); + + write_tag_.Set( + call_.call(), + [this](bool ok) { + reactor_->OnWriteDone(ok); + MaybeFinish(/*from_reaction=*/true); + }, + &write_ops_, /*can_inline=*/false); + write_ops_.set_core_cq_tag(&write_tag_); + + read_tag_.Set( + call_.call(), + [this](bool ok) { + reactor_->OnReadDone(ok); + MaybeFinish(/*from_reaction=*/true); + }, + &read_ops_, /*can_inline=*/false); + read_ops_.set_core_cq_tag(&read_tag_); + + // Also set up the Finish tag and op set. + finish_tag_.Set( + call_.call(), + [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); }, + &finish_ops_, + /*can_inline=*/false); + finish_ops_.ClientRecvStatus(context_, &finish_status_); + finish_ops_.set_core_cq_tag(&finish_tag_); + } + + // MaybeFinish can be called from reactions or from user-initiated operations + // like StartCall or RemoveHold. If this is the last operation or hold on this + // object, it will invoke the OnDone reaction. If MaybeFinish was called from + // a reaction, it can call OnDone directly. If not, it would need to schedule + // OnDone onto an executor thread to avoid the possibility of deadlocking with + // any locks in the user code that invoked it. + void MaybeFinish(bool from_reaction) { + if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub( + 1, std::memory_order_acq_rel) == 1)) { + grpc::Status s = std::move(finish_status_); + auto* reactor = reactor_; + auto* call = call_.call(); + this->~ClientCallbackReaderWriterImpl(); + grpc::g_core_codegen_interface->grpc_call_unref(call); + if (GPR_LIKELY(from_reaction)) { + reactor->OnDone(s); + } else { + reactor->InternalScheduleOnDone(std::move(s)); + } + } + } + + grpc::ClientContext* const context_; + grpc::internal::Call call_; + ClientBidiReactor* const reactor_; + + grpc::internal::CallOpSet + start_ops_; + grpc::internal::CallbackWithSuccessTag start_tag_; + const bool start_corked_; + bool corked_write_needed_; // no lock needed since only accessed in + // Write/WritesDone which cannot be concurrent + + grpc::internal::CallOpSet finish_ops_; + grpc::internal::CallbackWithSuccessTag finish_tag_; + grpc::Status finish_status_; + + grpc::internal::CallOpSet + write_ops_; + grpc::internal::CallbackWithSuccessTag write_tag_; + + grpc::internal::CallOpSet + writes_done_ops_; + grpc::internal::CallbackWithSuccessTag writes_done_tag_; + + grpc::internal::CallOpSet> + read_ops_; + grpc::internal::CallbackWithSuccessTag read_tag_; + + struct StartCallBacklog { + bool write_ops = false; + bool writes_done_ops = false; + bool read_ops = false; + }; + StartCallBacklog backlog_ ABSL_GUARDED_BY(start_mu_); + + // Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish + std::atomic callbacks_outstanding_{3}; + std::atomic_bool started_{false}; + grpc::internal::Mutex start_mu_; +}; + +template +class ClientCallbackReaderWriterFactory { + public: + static void Create(grpc::ChannelInterface* channel, + const grpc::internal::RpcMethod& method, + grpc::ClientContext* context, + ClientBidiReactor* reactor) { + grpc::internal::Call call = + channel->CreateCall(method, context, channel->CallbackCQ()); + + grpc::g_core_codegen_interface->grpc_call_ref(call.call()); + new (grpc::g_core_codegen_interface->grpc_call_arena_alloc( + call.call(), sizeof(ClientCallbackReaderWriterImpl))) + ClientCallbackReaderWriterImpl(call, context, + reactor); + } +}; + +template +class ClientCallbackReaderImpl : public ClientCallbackReader { + public: + // always allocated against a call arena, no memory free required + static void operator delete(void* /*ptr*/, std::size_t size) { + GPR_CODEGEN_ASSERT(size == sizeof(ClientCallbackReaderImpl)); + } + + // This operator should never be called as the memory should be freed as part + // of the arena destruction. It only exists to provide a matching operator + // delete to the operator new so that some compilers will not complain (see + // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this + // there are no tests catching the compiler warning. + static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); } + + void StartCall() override { + // This call initiates two batches, plus any backlog, each with a callback + // 1. Send initial metadata (unless corked) + recv initial metadata + // 2. Any backlog + // 3. Recv trailing metadata + + start_tag_.Set( + call_.call(), + [this](bool ok) { + reactor_->OnReadInitialMetadataDone( + ok && !reactor_->InternalTrailersOnly(call_.call())); + MaybeFinish(/*from_reaction=*/true); + }, + &start_ops_, /*can_inline=*/false); + start_ops_.SendInitialMetadata(&context_->send_initial_metadata_, + context_->initial_metadata_flags()); + start_ops_.RecvInitialMetadata(context_); + start_ops_.set_core_cq_tag(&start_tag_); + call_.PerformOps(&start_ops_); + + // Also set up the read tag so it doesn't have to be set up each time + read_tag_.Set( + call_.call(), + [this](bool ok) { + reactor_->OnReadDone(ok); + MaybeFinish(/*from_reaction=*/true); + }, + &read_ops_, /*can_inline=*/false); + read_ops_.set_core_cq_tag(&read_tag_); + + { + grpc::internal::MutexLock lock(&start_mu_); + if (backlog_.read_ops) { + call_.PerformOps(&read_ops_); + } + started_.store(true, std::memory_order_release); + } + + finish_tag_.Set( + call_.call(), + [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); }, + &finish_ops_, /*can_inline=*/false); + finish_ops_.ClientRecvStatus(context_, &finish_status_); + finish_ops_.set_core_cq_tag(&finish_tag_); + call_.PerformOps(&finish_ops_); + } + + void Read(Response* msg) override { + read_ops_.RecvMessage(msg); + callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); + if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) { + grpc::internal::MutexLock lock(&start_mu_); + if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) { + backlog_.read_ops = true; + return; + } + } + call_.PerformOps(&read_ops_); + } + + void AddHold(int holds) override { + callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed); + } + void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); } + + private: + friend class ClientCallbackReaderFactory; + + template + ClientCallbackReaderImpl(grpc::internal::Call call, + grpc::ClientContext* context, Request* request, + ClientReadReactor* reactor) + : context_(context), call_(call), reactor_(reactor) { + this->BindReactor(reactor); + // TODO(vjpai): don't assert + GPR_CODEGEN_ASSERT(start_ops_.SendMessagePtr(request).ok()); + start_ops_.ClientSendClose(); + } + + // MaybeFinish behaves as in ClientCallbackReaderWriterImpl. + void MaybeFinish(bool from_reaction) { + if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub( + 1, std::memory_order_acq_rel) == 1)) { + grpc::Status s = std::move(finish_status_); + auto* reactor = reactor_; + auto* call = call_.call(); + this->~ClientCallbackReaderImpl(); + grpc::g_core_codegen_interface->grpc_call_unref(call); + if (GPR_LIKELY(from_reaction)) { + reactor->OnDone(s); + } else { + reactor->InternalScheduleOnDone(std::move(s)); + } + } + } + + grpc::ClientContext* const context_; + grpc::internal::Call call_; + ClientReadReactor* const reactor_; + + grpc::internal::CallOpSet + start_ops_; + grpc::internal::CallbackWithSuccessTag start_tag_; + + grpc::internal::CallOpSet finish_ops_; + grpc::internal::CallbackWithSuccessTag finish_tag_; + grpc::Status finish_status_; + + grpc::internal::CallOpSet> + read_ops_; + grpc::internal::CallbackWithSuccessTag read_tag_; + + struct StartCallBacklog { + bool read_ops = false; + }; + StartCallBacklog backlog_ ABSL_GUARDED_BY(start_mu_); + + // Minimum of 2 callbacks to pre-register for start and finish + std::atomic callbacks_outstanding_{2}; + std::atomic_bool started_{false}; + grpc::internal::Mutex start_mu_; +}; + +template +class ClientCallbackReaderFactory { + public: + template + static void Create(grpc::ChannelInterface* channel, + const grpc::internal::RpcMethod& method, + grpc::ClientContext* context, const Request* request, + ClientReadReactor* reactor) { + grpc::internal::Call call = + channel->CreateCall(method, context, channel->CallbackCQ()); + + grpc::g_core_codegen_interface->grpc_call_ref(call.call()); + new (grpc::g_core_codegen_interface->grpc_call_arena_alloc( + call.call(), sizeof(ClientCallbackReaderImpl))) + ClientCallbackReaderImpl(call, context, request, reactor); + } +}; + +template +class ClientCallbackWriterImpl : public ClientCallbackWriter { + public: + // always allocated against a call arena, no memory free required + static void operator delete(void* /*ptr*/, std::size_t size) { + GPR_CODEGEN_ASSERT(size == sizeof(ClientCallbackWriterImpl)); + } + + // This operator should never be called as the memory should be freed as part + // of the arena destruction. It only exists to provide a matching operator + // delete to the operator new so that some compilers will not complain (see + // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this + // there are no tests catching the compiler warning. + static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); } + + void StartCall() ABSL_LOCKS_EXCLUDED(start_mu_) override { + // This call initiates two batches, plus any backlog, each with a callback + // 1. Send initial metadata (unless corked) + recv initial metadata + // 2. Any backlog + // 3. Recv trailing metadata + + if (!start_corked_) { + start_ops_.SendInitialMetadata(&context_->send_initial_metadata_, + context_->initial_metadata_flags()); + } + call_.PerformOps(&start_ops_); + + { + grpc::internal::MutexLock lock(&start_mu_); + + if (backlog_.write_ops) { + call_.PerformOps(&write_ops_); + } + if (backlog_.writes_done_ops) { + call_.PerformOps(&writes_done_ops_); + } + call_.PerformOps(&finish_ops_); + // The last thing in this critical section is to set started_ so that it + // can be used lock-free as well. + started_.store(true, std::memory_order_release); + } + // MaybeFinish outside the lock to make sure that destruction of this object + // doesn't take place while holding the lock (which would cause the lock to + // be released after destruction) + this->MaybeFinish(/*from_reaction=*/false); + } + + void Write(const Request* msg, grpc::WriteOptions options) + ABSL_LOCKS_EXCLUDED(start_mu_) override { + if (GPR_UNLIKELY(options.is_last_message())) { + options.set_buffer_hint(); + write_ops_.ClientSendClose(); + } + // TODO(vjpai): don't assert + GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(msg, options).ok()); + callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); + + if (GPR_UNLIKELY(corked_write_needed_)) { + write_ops_.SendInitialMetadata(&context_->send_initial_metadata_, + context_->initial_metadata_flags()); + corked_write_needed_ = false; + } + + if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) { + grpc::internal::MutexLock lock(&start_mu_); + if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) { + backlog_.write_ops = true; + return; + } + } + call_.PerformOps(&write_ops_); + } + + void WritesDone() ABSL_LOCKS_EXCLUDED(start_mu_) override { + writes_done_ops_.ClientSendClose(); + writes_done_tag_.Set( + call_.call(), + [this](bool ok) { + reactor_->OnWritesDoneDone(ok); + MaybeFinish(/*from_reaction=*/true); + }, + &writes_done_ops_, /*can_inline=*/false); + writes_done_ops_.set_core_cq_tag(&writes_done_tag_); + callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); + + if (GPR_UNLIKELY(corked_write_needed_)) { + writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_, + context_->initial_metadata_flags()); + corked_write_needed_ = false; + } + + if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) { + grpc::internal::MutexLock lock(&start_mu_); + if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) { + backlog_.writes_done_ops = true; + return; + } + } + call_.PerformOps(&writes_done_ops_); + } + + void AddHold(int holds) override { + callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed); + } + void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); } + + private: + friend class ClientCallbackWriterFactory; + + template + ClientCallbackWriterImpl(grpc::internal::Call call, + grpc::ClientContext* context, Response* response, + ClientWriteReactor* reactor) + : context_(context), + call_(call), + reactor_(reactor), + start_corked_(context_->initial_metadata_corked_), + corked_write_needed_(start_corked_) { + this->BindReactor(reactor); + + // Set up the unchanging parts of the start and write tags and ops. + start_tag_.Set( + call_.call(), + [this](bool ok) { + reactor_->OnReadInitialMetadataDone( + ok && !reactor_->InternalTrailersOnly(call_.call())); + MaybeFinish(/*from_reaction=*/true); + }, + &start_ops_, /*can_inline=*/false); + start_ops_.RecvInitialMetadata(context_); + start_ops_.set_core_cq_tag(&start_tag_); + + write_tag_.Set( + call_.call(), + [this](bool ok) { + reactor_->OnWriteDone(ok); + MaybeFinish(/*from_reaction=*/true); + }, + &write_ops_, /*can_inline=*/false); + write_ops_.set_core_cq_tag(&write_tag_); + + // Also set up the Finish tag and op set. + finish_ops_.RecvMessage(response); + finish_ops_.AllowNoMessage(); + finish_tag_.Set( + call_.call(), + [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); }, + &finish_ops_, + /*can_inline=*/false); + finish_ops_.ClientRecvStatus(context_, &finish_status_); + finish_ops_.set_core_cq_tag(&finish_tag_); + } + + // MaybeFinish behaves as in ClientCallbackReaderWriterImpl. + void MaybeFinish(bool from_reaction) { + if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub( + 1, std::memory_order_acq_rel) == 1)) { + grpc::Status s = std::move(finish_status_); + auto* reactor = reactor_; + auto* call = call_.call(); + this->~ClientCallbackWriterImpl(); + grpc::g_core_codegen_interface->grpc_call_unref(call); + if (GPR_LIKELY(from_reaction)) { + reactor->OnDone(s); + } else { + reactor->InternalScheduleOnDone(std::move(s)); + } + } + } + + grpc::ClientContext* const context_; + grpc::internal::Call call_; + ClientWriteReactor* const reactor_; + + grpc::internal::CallOpSet + start_ops_; + grpc::internal::CallbackWithSuccessTag start_tag_; + const bool start_corked_; + bool corked_write_needed_; // no lock needed since only accessed in + // Write/WritesDone which cannot be concurrent + + grpc::internal::CallOpSet + finish_ops_; + grpc::internal::CallbackWithSuccessTag finish_tag_; + grpc::Status finish_status_; + + grpc::internal::CallOpSet + write_ops_; + grpc::internal::CallbackWithSuccessTag write_tag_; + + grpc::internal::CallOpSet + writes_done_ops_; + grpc::internal::CallbackWithSuccessTag writes_done_tag_; + + struct StartCallBacklog { + bool write_ops = false; + bool writes_done_ops = false; + }; + StartCallBacklog backlog_ ABSL_GUARDED_BY(start_mu_); + + // Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish + std::atomic callbacks_outstanding_{3}; + std::atomic_bool started_{false}; + grpc::internal::Mutex start_mu_; +}; + +template +class ClientCallbackWriterFactory { + public: + template + static void Create(grpc::ChannelInterface* channel, + const grpc::internal::RpcMethod& method, + grpc::ClientContext* context, Response* response, + ClientWriteReactor* reactor) { + grpc::internal::Call call = + channel->CreateCall(method, context, channel->CallbackCQ()); + + grpc::g_core_codegen_interface->grpc_call_ref(call.call()); + new (grpc::g_core_codegen_interface->grpc_call_arena_alloc( + call.call(), sizeof(ClientCallbackWriterImpl))) + ClientCallbackWriterImpl(call, context, response, reactor); + } +}; + +class ClientCallbackUnaryImpl final : public ClientCallbackUnary { + public: + // always allocated against a call arena, no memory free required + static void operator delete(void* /*ptr*/, std::size_t size) { + GPR_CODEGEN_ASSERT(size == sizeof(ClientCallbackUnaryImpl)); + } + + // This operator should never be called as the memory should be freed as part + // of the arena destruction. It only exists to provide a matching operator + // delete to the operator new so that some compilers will not complain (see + // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this + // there are no tests catching the compiler warning. + static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); } + + void StartCall() override { + // This call initiates two batches, each with a callback + // 1. Send initial metadata + write + writes done + recv initial metadata + // 2. Read message, recv trailing metadata + + start_tag_.Set( + call_.call(), + [this](bool ok) { + reactor_->OnReadInitialMetadataDone( + ok && !reactor_->InternalTrailersOnly(call_.call())); + MaybeFinish(); + }, + &start_ops_, /*can_inline=*/false); + start_ops_.SendInitialMetadata(&context_->send_initial_metadata_, + context_->initial_metadata_flags()); + start_ops_.RecvInitialMetadata(context_); + start_ops_.set_core_cq_tag(&start_tag_); + call_.PerformOps(&start_ops_); + + finish_tag_.Set( + call_.call(), [this](bool /*ok*/) { MaybeFinish(); }, &finish_ops_, + /*can_inline=*/false); + finish_ops_.ClientRecvStatus(context_, &finish_status_); + finish_ops_.set_core_cq_tag(&finish_tag_); + call_.PerformOps(&finish_ops_); + } + + private: + friend class ClientCallbackUnaryFactory; + + template + ClientCallbackUnaryImpl(grpc::internal::Call call, + grpc::ClientContext* context, Request* request, + Response* response, ClientUnaryReactor* reactor) + : context_(context), call_(call), reactor_(reactor) { + this->BindReactor(reactor); + // TODO(vjpai): don't assert + GPR_CODEGEN_ASSERT(start_ops_.SendMessagePtr(request).ok()); + start_ops_.ClientSendClose(); + finish_ops_.RecvMessage(response); + finish_ops_.AllowNoMessage(); + } + + // In the unary case, MaybeFinish is only ever invoked from a + // library-initiated reaction, so it will just directly call OnDone if this is + // the last reaction for this RPC. + void MaybeFinish() { + if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub( + 1, std::memory_order_acq_rel) == 1)) { + grpc::Status s = std::move(finish_status_); + auto* reactor = reactor_; + auto* call = call_.call(); + this->~ClientCallbackUnaryImpl(); + grpc::g_core_codegen_interface->grpc_call_unref(call); + reactor->OnDone(s); + } + } + + grpc::ClientContext* const context_; + grpc::internal::Call call_; + ClientUnaryReactor* const reactor_; + + grpc::internal::CallOpSet + start_ops_; + grpc::internal::CallbackWithSuccessTag start_tag_; + + grpc::internal::CallOpSet + finish_ops_; + grpc::internal::CallbackWithSuccessTag finish_tag_; + grpc::Status finish_status_; + + // This call will have 2 callbacks: start and finish + std::atomic callbacks_outstanding_{2}; +}; + +class ClientCallbackUnaryFactory { + public: + template + static void Create(grpc::ChannelInterface* channel, + const grpc::internal::RpcMethod& method, + grpc::ClientContext* context, const Request* request, + Response* response, ClientUnaryReactor* reactor) { + grpc::internal::Call call = + channel->CreateCall(method, context, channel->CallbackCQ()); + + grpc::g_core_codegen_interface->grpc_call_ref(call.call()); + + new (grpc::g_core_codegen_interface->grpc_call_arena_alloc( + call.call(), sizeof(ClientCallbackUnaryImpl))) + ClientCallbackUnaryImpl(call, context, + static_cast(request), + static_cast(response), reactor); + } +}; + +} // namespace internal +} // namespace grpc #endif // GRPCPP_SUPPORT_CLIENT_CALLBACK_H diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index 00eb3c82f79..3ea5ee1c85c 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -140,7 +140,7 @@ std::string GetHeaderIncludes(grpc_generator::File* file, "grpcpp/generic/async_generic_service.h", "grpcpp/support/async_stream.h", "grpcpp/support/async_unary_call.h", - "grpcpp/impl/codegen/client_callback.h", + "grpcpp/support/client_callback.h", "grpcpp/client_context.h", "grpcpp/impl/codegen/completion_queue.h", "grpcpp/impl/codegen/message_allocator.h", @@ -1652,7 +1652,7 @@ std::string GetSourceIncludes(grpc_generator::File* file, "grpcpp/support/async_unary_call.h", "grpcpp/impl/codegen/channel_interface.h", "grpcpp/impl/codegen/client_unary_call.h", - "grpcpp/impl/codegen/client_callback.h", + "grpcpp/support/client_callback.h", "grpcpp/impl/codegen/message_allocator.h", "grpcpp/impl/codegen/method_handler.h", "grpcpp/impl/codegen/rpc_service_method.h", diff --git a/test/cpp/codegen/compiler_test_golden b/test/cpp/codegen/compiler_test_golden index 2927c4744fb..ff836ab9f8d 100644 --- a/test/cpp/codegen/compiler_test_golden +++ b/test/cpp/codegen/compiler_test_golden @@ -30,7 +30,7 @@ #include #include #include -#include +#include #include #include #include