/* * * Copyright 2019 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_IMPL_H #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_IMPL_H #include #include #include #include #include #include #include #include #include #include namespace grpc_impl { // Declare base class of all reactors as internal namespace internal { // Forward declarations template class CallbackUnaryHandler; template class CallbackClientStreamingHandler; template class CallbackServerStreamingHandler; template class CallbackBidiHandler; class ServerReactor { public: virtual ~ServerReactor() = default; virtual void OnDone() = 0; virtual void OnCancel() = 0; // The following is not API. It is for internal use only and specifies whether // all reactions of this Reactor can be run without an extra executor // scheduling. This should only be used for internally-defined reactors with // trivial reactions. virtual bool InternalInlineable() { return false; } private: template friend class CallbackUnaryHandler; template friend class CallbackClientStreamingHandler; template friend class CallbackServerStreamingHandler; template friend class CallbackBidiHandler; }; /// The base class of ServerCallbackUnary etc. class ServerCallbackCall { public: virtual ~ServerCallbackCall() {} // This object is responsible for tracking when it is safe to call OnDone and // OnCancel. OnDone should not be called until the method handler is complete, // Finish has been called, the ServerContext CompletionOp (which tracks // cancellation or successful completion) has completed, and all outstanding // Read/Write actions have seen their reactions. OnCancel should not be called // until after the method handler is done and the RPC has completed with a // cancellation. This is tracked by counting how many of these conditions have // been met and calling OnCancel when none remain unmet. // Public versions of MaybeDone: one where we don't know the reactor in // advance (used for the ServerContext CompletionOp), and one for where we // know the inlineability of the OnDone reaction. You should set the inline // flag to true if either the Reactor is InternalInlineable() or if this // callback is already being forced to run dispatched to an executor // (typically because it contains additional work than just the MaybeDone). void MaybeDone() { if (GPR_UNLIKELY(Unref() == 1)) { ScheduleOnDone(reactor()->InternalInlineable()); } } void MaybeDone(bool inline_ondone) { if (GPR_UNLIKELY(Unref() == 1)) { ScheduleOnDone(inline_ondone); } } // Fast version called with known reactor passed in, used from derived // classes, typically in non-cancel case void MaybeCallOnCancel(ServerReactor* reactor) { if (GPR_UNLIKELY(UnblockCancellation())) { CallOnCancel(reactor); } } // Slower version called from object that doesn't know the reactor a priori // (such as the ServerContext CompletionOp which is formed before the // reactor). This is used in cancel cases only, so it's ok to be slower and // invoke a virtual function. void MaybeCallOnCancel() { if (GPR_UNLIKELY(UnblockCancellation())) { CallOnCancel(reactor()); } } protected: /// Increases the reference count void Ref() { callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); } private: virtual ServerReactor* reactor() = 0; // CallOnDone performs the work required at completion of the RPC: invoking // the OnDone function and doing all necessary cleanup. This function is only // ever invoked on a fully-Unref'fed ServerCallbackCall. virtual void CallOnDone() = 0; // If the OnDone reaction is inlineable, execute it inline. Otherwise send it // to an executor. void ScheduleOnDone(bool inline_ondone); // If the OnCancel reaction is inlineable, execute it inline. Otherwise send // it to an executor. void CallOnCancel(ServerReactor* reactor); // Implement the cancellation constraint counter. Return true if OnCancel // should be called, false otherwise. bool UnblockCancellation() { return on_cancel_conditions_remaining_.fetch_sub( 1, std::memory_order_acq_rel) == 1; } /// Decreases the reference count and returns the previous value int Unref() { return callbacks_outstanding_.fetch_sub(1, std::memory_order_acq_rel); } std::atomic_int on_cancel_conditions_remaining_{2}; std::atomic_int callbacks_outstanding_{ 3}; // reserve for start, Finish, and CompletionOp }; template class DefaultMessageHolder : public ::grpc::experimental::MessageHolder { public: DefaultMessageHolder() { this->set_request(&request_obj_); this->set_response(&response_obj_); } void Release() override { // the object is allocated in the call arena. this->~DefaultMessageHolder(); } private: Request request_obj_; Response response_obj_; }; } // namespace internal // Forward declarations class ServerUnaryReactor; template class ServerReadReactor; template class ServerWriteReactor; template class ServerBidiReactor; // NOTE: The actual call/stream object classes are provided as API only to // support mocking. There are no implementations of these class interfaces in // the API. class ServerCallbackUnary : public internal::ServerCallbackCall { public: virtual ~ServerCallbackUnary() {} virtual void Finish(::grpc::Status s) = 0; virtual void SendInitialMetadata() = 0; protected: // Use a template rather than explicitly specifying ServerUnaryReactor to // delay binding and avoid a circular forward declaration issue template void BindReactor(Reactor* reactor) { reactor->InternalBindCall(this); } }; template class ServerCallbackReader : public internal::ServerCallbackCall { public: virtual ~ServerCallbackReader() {} virtual void Finish(::grpc::Status s) = 0; virtual void SendInitialMetadata() = 0; virtual void Read(Request* msg) = 0; protected: void BindReactor(ServerReadReactor* reactor) { reactor->InternalBindReader(this); } }; template class ServerCallbackWriter : public internal::ServerCallbackCall { public: virtual ~ServerCallbackWriter() {} virtual void Finish(::grpc::Status s) = 0; virtual void SendInitialMetadata() = 0; virtual void Write(const Response* msg, ::grpc::WriteOptions options) = 0; virtual void WriteAndFinish(const Response* msg, ::grpc::WriteOptions options, ::grpc::Status s) = 0; protected: void BindReactor(ServerWriteReactor* reactor) { reactor->InternalBindWriter(this); } }; template class ServerCallbackReaderWriter : public internal::ServerCallbackCall { public: virtual ~ServerCallbackReaderWriter() {} virtual void Finish(::grpc::Status s) = 0; virtual void SendInitialMetadata() = 0; virtual void Read(Request* msg) = 0; virtual void Write(const Response* msg, ::grpc::WriteOptions options) = 0; virtual void WriteAndFinish(const Response* msg, ::grpc::WriteOptions options, ::grpc::Status s) = 0; protected: void BindReactor(ServerBidiReactor* reactor) { reactor->InternalBindStream(this); } }; // The following classes are the reactor interfaces that are to be implemented // by the user, returned as the output parameter of the method handler for a // callback method. Note that none of the classes are pure; all reactions have a // default empty reaction so that the user class only needs to override those // classes that it cares about. /// \a ServerBidiReactor is the interface for a bidirectional streaming RPC. template class ServerBidiReactor : public internal::ServerReactor { public: // NOTE: Initializing stream_ as a constructor initializer rather than a // default initializer because gcc-4.x requires a copy constructor for // default initializing a templated member, which isn't ok for atomic. // TODO(vjpai): Switch to default constructor and default initializer when // gcc-4.x is no longer supported ServerBidiReactor() : stream_(nullptr) {} ~ServerBidiReactor() = default; /// Send any initial metadata stored in the RPC context. If not invoked, /// any initial metadata will be passed along with the first Write or the /// Finish (if there are no writes). void StartSendInitialMetadata() { ServerCallbackReaderWriter* stream = stream_.load(std::memory_order_acquire); if (stream == nullptr) { grpc::internal::MutexLock l(&stream_mu_); stream = stream_.load(std::memory_order_relaxed); if (stream == nullptr) { backlog_.send_initial_metadata_wanted = true; return; } } stream->SendInitialMetadata(); } /// Initiate a read operation. /// /// \param[out] req Where to eventually store the read message. Valid when /// the library calls OnReadDone void StartRead(Request* req) { ServerCallbackReaderWriter* stream = stream_.load(std::memory_order_acquire); if (stream == nullptr) { grpc::internal::MutexLock l(&stream_mu_); stream = stream_.load(std::memory_order_relaxed); if (stream == nullptr) { backlog_.read_wanted = req; return; } } stream->Read(req); } /// Initiate a write operation. /// /// \param[in] resp The message to be written. The library does not take /// ownership but the caller must ensure that the message is /// not deleted or modified until OnWriteDone is called. void StartWrite(const Response* resp) { StartWrite(resp, ::grpc::WriteOptions()); } /// Initiate a write operation with specified options. /// /// \param[in] resp The message to be written. The library does not take /// ownership but the caller must ensure that the message is /// not deleted or modified until OnWriteDone is called. /// \param[in] options The WriteOptions to use for writing this message void StartWrite(const Response* resp, ::grpc::WriteOptions options) { ServerCallbackReaderWriter* stream = stream_.load(std::memory_order_acquire); if (stream == nullptr) { grpc::internal::MutexLock l(&stream_mu_); stream = stream_.load(std::memory_order_relaxed); if (stream == nullptr) { backlog_.write_wanted = resp; backlog_.write_options_wanted = std::move(options); return; } } stream->Write(resp, std::move(options)); } /// Initiate a write operation with specified options and final RPC Status, /// which also causes any trailing metadata for this RPC to be sent out. /// StartWriteAndFinish is like merging StartWriteLast and Finish into a /// single step. A key difference, though, is that this operation doesn't have /// an OnWriteDone reaction - it is considered complete only when OnDone is /// available. An RPC can either have StartWriteAndFinish or Finish, but not /// both. /// /// \param[in] resp The message to be written. The library does not take /// ownership but the caller must ensure that the message is /// not deleted or modified until OnDone is called. /// \param[in] options The WriteOptions to use for writing this message /// \param[in] s The status outcome of this RPC void StartWriteAndFinish(const Response* resp, ::grpc::WriteOptions options, ::grpc::Status s) { ServerCallbackReaderWriter* stream = stream_.load(std::memory_order_acquire); if (stream == nullptr) { grpc::internal::MutexLock l(&stream_mu_); stream = stream_.load(std::memory_order_relaxed); if (stream == nullptr) { backlog_.write_and_finish_wanted = true; backlog_.write_wanted = resp; backlog_.write_options_wanted = std::move(options); backlog_.status_wanted = std::move(s); return; } } stream->WriteAndFinish(resp, std::move(options), std::move(s)); } /// Inform system of a planned write operation with specified options, but /// allow the library to schedule the actual write coalesced with the writing /// of trailing metadata (which takes place on a Finish call). /// /// \param[in] resp The message to be written. The library does not take /// ownership but the caller must ensure that the message is /// not deleted or modified until OnWriteDone is called. /// \param[in] options The WriteOptions to use for writing this message void StartWriteLast(const Response* resp, ::grpc::WriteOptions options) { StartWrite(resp, std::move(options.set_last_message())); } /// Indicate that the stream is to be finished and the trailing metadata and /// RPC status are to be sent. Every RPC MUST be finished using either Finish /// or StartWriteAndFinish (but not both), even if the RPC is already /// cancelled. /// /// \param[in] s The status outcome of this RPC void Finish(::grpc::Status s) { ServerCallbackReaderWriter* stream = stream_.load(std::memory_order_acquire); if (stream == nullptr) { grpc::internal::MutexLock l(&stream_mu_); stream = stream_.load(std::memory_order_relaxed); if (stream == nullptr) { backlog_.finish_wanted = true; backlog_.status_wanted = std::move(s); return; } } stream->Finish(std::move(s)); } /// Notifies the application that an explicit StartSendInitialMetadata /// operation completed. Not used when the sending of initial metadata /// piggybacks onto the first write. /// /// \param[in] ok Was it successful? If false, no further write-side operation /// will succeed. virtual void OnSendInitialMetadataDone(bool /*ok*/) {} /// Notifies the application that a StartRead operation completed. /// /// \param[in] ok Was it successful? If false, no further read-side operation /// will succeed. virtual void OnReadDone(bool /*ok*/) {} /// Notifies the application that a StartWrite (or StartWriteLast) operation /// completed. /// /// \param[in] ok Was it successful? If false, no further write-side operation /// will succeed. virtual void OnWriteDone(bool /*ok*/) {} /// Notifies the application that all operations associated with this RPC /// have completed. This is an override (from the internal base class) but /// still abstract, so derived classes MUST override it to be instantiated. void OnDone() override = 0; /// Notifies the application that this RPC has been cancelled. This is an /// override (from the internal base class) but not final, so derived classes /// should override it if they want to take action. void OnCancel() override {} private: friend class ServerCallbackReaderWriter; // May be overridden by internal implementation details. This is not a public // customization point. virtual void InternalBindStream( ServerCallbackReaderWriter* stream) { // TODO(vjpai): When stream_or_backlog_ becomes a variant (see below), use // a scoped MutexLock and std::swap stream_or_backlog_ with a variant that // has stream, then std::get out of that after the lock. // Do likewise with the remaining InternalBind* functions as well. grpc::internal::ReleasableMutexLock l(&stream_mu_); PreBindBacklog ops(std::move(backlog_)); stream_.store(stream, std::memory_order_release); l.Unlock(); if (ops.send_initial_metadata_wanted) { stream->SendInitialMetadata(); } if (ops.read_wanted != nullptr) { stream->Read(ops.read_wanted); } if (ops.write_and_finish_wanted) { stream->WriteAndFinish(ops.write_wanted, std::move(ops.write_options_wanted), std::move(ops.status_wanted)); } else { if (ops.write_wanted != nullptr) { stream->Write(ops.write_wanted, std::move(ops.write_options_wanted)); } if (ops.finish_wanted) { stream->Finish(std::move(ops.status_wanted)); } } } grpc::internal::Mutex stream_mu_; // TODO(vjpai): Make stream_or_backlog_ into a std::variant or absl::variant // once C++17 or ABSL is supported since stream and backlog are // mutually exclusive in this class. Do likewise with the // remaining reactor classes and their backlogs as well. std::atomic*> stream_{nullptr}; struct PreBindBacklog { bool send_initial_metadata_wanted = false; bool write_and_finish_wanted = false; bool finish_wanted = false; Request* read_wanted = nullptr; const Response* write_wanted = nullptr; ::grpc::WriteOptions write_options_wanted; ::grpc::Status status_wanted; }; PreBindBacklog backlog_ /* GUARDED_BY(stream_mu_) */; }; /// \a ServerReadReactor is the interface for a client-streaming RPC. template class ServerReadReactor : public internal::ServerReactor { public: ServerReadReactor() : reader_(nullptr) {} ~ServerReadReactor() = default; /// The following operation initiations are exactly like ServerBidiReactor. void StartSendInitialMetadata() { ServerCallbackReader* reader = reader_.load(std::memory_order_acquire); if (reader == nullptr) { grpc::internal::MutexLock l(&reader_mu_); reader = reader_.load(std::memory_order_relaxed); if (reader == nullptr) { backlog_.send_initial_metadata_wanted = true; return; } } reader->SendInitialMetadata(); } void StartRead(Request* req) { ServerCallbackReader* reader = reader_.load(std::memory_order_acquire); if (reader == nullptr) { grpc::internal::MutexLock l(&reader_mu_); reader = reader_.load(std::memory_order_relaxed); if (reader == nullptr) { backlog_.read_wanted = req; return; } } reader->Read(req); } void Finish(::grpc::Status s) { ServerCallbackReader* reader = reader_.load(std::memory_order_acquire); if (reader == nullptr) { grpc::internal::MutexLock l(&reader_mu_); reader = reader_.load(std::memory_order_relaxed); if (reader == nullptr) { backlog_.finish_wanted = true; backlog_.status_wanted = std::move(s); return; } } reader->Finish(std::move(s)); } /// The following notifications are exactly like ServerBidiReactor. virtual void OnSendInitialMetadataDone(bool /*ok*/) {} virtual void OnReadDone(bool /*ok*/) {} void OnDone() override = 0; void OnCancel() override {} private: friend class ServerCallbackReader; // May be overridden by internal implementation details. This is not a public // customization point. virtual void InternalBindReader(ServerCallbackReader* reader) { grpc::internal::ReleasableMutexLock l(&reader_mu_); PreBindBacklog ops(std::move(backlog_)); reader_.store(reader, std::memory_order_release); l.Unlock(); if (ops.send_initial_metadata_wanted) { reader->SendInitialMetadata(); } if (ops.read_wanted != nullptr) { reader->Read(ops.read_wanted); } if (ops.finish_wanted) { reader->Finish(std::move(ops.status_wanted)); } } grpc::internal::Mutex reader_mu_; std::atomic*> reader_{nullptr}; struct PreBindBacklog { bool send_initial_metadata_wanted = false; bool finish_wanted = false; Request* read_wanted = nullptr; ::grpc::Status status_wanted; }; PreBindBacklog backlog_ /* GUARDED_BY(reader_mu_) */; }; /// \a ServerWriteReactor is the interface for a server-streaming RPC. template class ServerWriteReactor : public internal::ServerReactor { public: ServerWriteReactor() : writer_(nullptr) {} ~ServerWriteReactor() = default; /// The following operation initiations are exactly like ServerBidiReactor. void StartSendInitialMetadata() { ServerCallbackWriter* writer = writer_.load(std::memory_order_acquire); if (writer == nullptr) { grpc::internal::MutexLock l(&writer_mu_); writer = writer_.load(std::memory_order_relaxed); if (writer == nullptr) { backlog_.send_initial_metadata_wanted = true; return; } } writer->SendInitialMetadata(); } void StartWrite(const Response* resp) { StartWrite(resp, ::grpc::WriteOptions()); } void StartWrite(const Response* resp, ::grpc::WriteOptions options) { ServerCallbackWriter* writer = writer_.load(std::memory_order_acquire); if (writer == nullptr) { grpc::internal::MutexLock l(&writer_mu_); writer = writer_.load(std::memory_order_relaxed); if (writer == nullptr) { backlog_.write_wanted = resp; backlog_.write_options_wanted = std::move(options); return; } } writer->Write(resp, std::move(options)); } void StartWriteAndFinish(const Response* resp, ::grpc::WriteOptions options, ::grpc::Status s) { ServerCallbackWriter* writer = writer_.load(std::memory_order_acquire); if (writer == nullptr) { grpc::internal::MutexLock l(&writer_mu_); writer = writer_.load(std::memory_order_relaxed); if (writer == nullptr) { backlog_.write_and_finish_wanted = true; backlog_.write_wanted = resp; backlog_.write_options_wanted = std::move(options); backlog_.status_wanted = std::move(s); return; } } writer->WriteAndFinish(resp, std::move(options), std::move(s)); } void StartWriteLast(const Response* resp, ::grpc::WriteOptions options) { StartWrite(resp, std::move(options.set_last_message())); } void Finish(::grpc::Status s) { ServerCallbackWriter* writer = writer_.load(std::memory_order_acquire); if (writer == nullptr) { grpc::internal::MutexLock l(&writer_mu_); writer = writer_.load(std::memory_order_relaxed); if (writer == nullptr) { backlog_.finish_wanted = true; backlog_.status_wanted = std::move(s); return; } } writer->Finish(std::move(s)); } /// The following notifications are exactly like ServerBidiReactor. virtual void OnSendInitialMetadataDone(bool /*ok*/) {} virtual void OnWriteDone(bool /*ok*/) {} void OnDone() override = 0; void OnCancel() override {} private: friend class ServerCallbackWriter; // May be overridden by internal implementation details. This is not a public // customization point. virtual void InternalBindWriter(ServerCallbackWriter* writer) { grpc::internal::ReleasableMutexLock l(&writer_mu_); PreBindBacklog ops(std::move(backlog_)); writer_.store(writer, std::memory_order_release); l.Unlock(); if (ops.send_initial_metadata_wanted) { writer->SendInitialMetadata(); } if (ops.write_and_finish_wanted) { writer->WriteAndFinish(ops.write_wanted, std::move(ops.write_options_wanted), std::move(ops.status_wanted)); } else { if (ops.write_wanted != nullptr) { writer->Write(ops.write_wanted, std::move(ops.write_options_wanted)); } if (ops.finish_wanted) { writer->Finish(std::move(ops.status_wanted)); } } } grpc::internal::Mutex writer_mu_; std::atomic*> writer_{nullptr}; struct PreBindBacklog { bool send_initial_metadata_wanted = false; bool write_and_finish_wanted = false; bool finish_wanted = false; const Response* write_wanted = nullptr; ::grpc::WriteOptions write_options_wanted; ::grpc::Status status_wanted; }; PreBindBacklog backlog_ /* GUARDED_BY(writer_mu_) */; }; class ServerUnaryReactor : public internal::ServerReactor { public: ServerUnaryReactor() : call_(nullptr) {} ~ServerUnaryReactor() = default; /// StartSendInitialMetadata is exactly like ServerBidiReactor. void StartSendInitialMetadata() { ServerCallbackUnary* call = call_.load(std::memory_order_acquire); if (call == nullptr) { grpc::internal::MutexLock l(&call_mu_); call = call_.load(std::memory_order_relaxed); if (call == nullptr) { backlog_.send_initial_metadata_wanted = true; return; } } call->SendInitialMetadata(); } /// Finish is similar to ServerBidiReactor except for one detail. /// If the status is non-OK, any message will not be sent. Instead, /// the client will only receive the status and any trailing metadata. void Finish(::grpc::Status s) { ServerCallbackUnary* call = call_.load(std::memory_order_acquire); if (call == nullptr) { grpc::internal::MutexLock l(&call_mu_); call = call_.load(std::memory_order_relaxed); if (call == nullptr) { backlog_.finish_wanted = true; backlog_.status_wanted = std::move(s); return; } } call->Finish(std::move(s)); } /// The following notifications are exactly like ServerBidiReactor. virtual void OnSendInitialMetadataDone(bool /*ok*/) {} void OnDone() override = 0; void OnCancel() override {} private: friend class ServerCallbackUnary; // May be overridden by internal implementation details. This is not a public // customization point. virtual void InternalBindCall(ServerCallbackUnary* call) { grpc::internal::ReleasableMutexLock l(&call_mu_); PreBindBacklog ops(std::move(backlog_)); call_.store(call, std::memory_order_release); l.Unlock(); if (ops.send_initial_metadata_wanted) { call->SendInitialMetadata(); } if (ops.finish_wanted) { call->Finish(std::move(ops.status_wanted)); } } grpc::internal::Mutex call_mu_; std::atomic call_{nullptr}; struct PreBindBacklog { bool send_initial_metadata_wanted = false; bool finish_wanted = false; ::grpc::Status status_wanted; }; PreBindBacklog backlog_ /* GUARDED_BY(call_mu_) */; }; namespace internal { template class FinishOnlyReactor : public Base { public: explicit FinishOnlyReactor(::grpc::Status s) { this->Finish(std::move(s)); } void OnDone() override { this->~FinishOnlyReactor(); } }; using UnimplementedUnaryReactor = FinishOnlyReactor; template using UnimplementedReadReactor = FinishOnlyReactor>; template using UnimplementedWriteReactor = FinishOnlyReactor>; template using UnimplementedBidiReactor = FinishOnlyReactor>; } // namespace internal } // namespace grpc_impl #endif // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_IMPL_H