diff --git a/include/grpcpp/ext/orca_service.h b/include/grpcpp/ext/orca_service.h index 3acf4bf030f..b18b94908cb 100644 --- a/include/grpcpp/ext/orca_service.h +++ b/include/grpcpp/ext/orca_service.h @@ -23,10 +23,10 @@ #include "absl/time/time.h" #include "absl/types/optional.h" -#include #include #include #include +#include #include namespace grpc { diff --git a/include/grpcpp/impl/codegen/server_callback.h b/include/grpcpp/impl/codegen/server_callback.h index d292e7b0ca5..d33283ca1bf 100644 --- a/include/grpcpp/impl/codegen/server_callback.h +++ b/include/grpcpp/impl/codegen/server_callback.h @@ -18,782 +18,9 @@ #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H -// IWYU pragma: private, include +// IWYU pragma: private -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include -#include - -namespace grpc { - -// Declare base class of all reactors as internal -namespace internal { - -// Forward declarations -template -class CallbackUnaryHandler; -template -class CallbackClientStreamingHandler; -template -class CallbackServerStreamingHandler; -template -class CallbackBidiHandler; - -class ServerReactor { - public: - virtual ~ServerReactor() = default; - virtual void OnDone() = 0; - virtual void OnCancel() = 0; - - // The following is not API. It is for internal use only and specifies whether - // all reactions of this Reactor can be run without an extra executor - // scheduling. This should only be used for internally-defined reactors with - // trivial reactions. - virtual bool InternalInlineable() { return false; } - - private: - template - friend class CallbackUnaryHandler; - template - friend class CallbackClientStreamingHandler; - template - friend class CallbackServerStreamingHandler; - template - friend class CallbackBidiHandler; -}; - -/// The base class of ServerCallbackUnary etc. -class ServerCallbackCall { - public: - virtual ~ServerCallbackCall() {} - - // This object is responsible for tracking when it is safe to call OnDone and - // OnCancel. OnDone should not be called until the method handler is complete, - // Finish has been called, the ServerContext CompletionOp (which tracks - // cancellation or successful completion) has completed, and all outstanding - // Read/Write actions have seen their reactions. OnCancel should not be called - // until after the method handler is done and the RPC has completed with a - // cancellation. This is tracked by counting how many of these conditions have - // been met and calling OnCancel when none remain unmet. - - // Public versions of MaybeDone: one where we don't know the reactor in - // advance (used for the ServerContext CompletionOp), and one for where we - // know the inlineability of the OnDone reaction. You should set the inline - // flag to true if either the Reactor is InternalInlineable() or if this - // callback is already being forced to run dispatched to an executor - // (typically because it contains additional work than just the MaybeDone). - - void MaybeDone() { - if (GPR_UNLIKELY(Unref() == 1)) { - ScheduleOnDone(reactor()->InternalInlineable()); - } - } - - void MaybeDone(bool inline_ondone) { - if (GPR_UNLIKELY(Unref() == 1)) { - ScheduleOnDone(inline_ondone); - } - } - - // Fast version called with known reactor passed in, used from derived - // classes, typically in non-cancel case - void MaybeCallOnCancel(ServerReactor* reactor) { - if (GPR_UNLIKELY(UnblockCancellation())) { - CallOnCancel(reactor); - } - } - - // Slower version called from object that doesn't know the reactor a priori - // (such as the ServerContext CompletionOp which is formed before the - // reactor). This is used in cancel cases only, so it's ok to be slower and - // invoke a virtual function. - void MaybeCallOnCancel() { - if (GPR_UNLIKELY(UnblockCancellation())) { - CallOnCancel(reactor()); - } - } - - protected: - /// Increases the reference count - void Ref() { callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); } - - private: - virtual ServerReactor* reactor() = 0; - - // CallOnDone performs the work required at completion of the RPC: invoking - // the OnDone function and doing all necessary cleanup. This function is only - // ever invoked on a fully-Unref'fed ServerCallbackCall. - virtual void CallOnDone() = 0; - - // If the OnDone reaction is inlineable, execute it inline. Otherwise send it - // to an executor. - void ScheduleOnDone(bool inline_ondone); - - // If the OnCancel reaction is inlineable, execute it inline. Otherwise send - // it to an executor. - void CallOnCancel(ServerReactor* reactor); - - // Implement the cancellation constraint counter. Return true if OnCancel - // should be called, false otherwise. - bool UnblockCancellation() { - return on_cancel_conditions_remaining_.fetch_sub( - 1, std::memory_order_acq_rel) == 1; - } - - /// Decreases the reference count and returns the previous value - int Unref() { - return callbacks_outstanding_.fetch_sub(1, std::memory_order_acq_rel); - } - - std::atomic_int on_cancel_conditions_remaining_{2}; - std::atomic_int callbacks_outstanding_{ - 3}; // reserve for start, Finish, and CompletionOp -}; - -template -class DefaultMessageHolder : public MessageHolder { - public: - DefaultMessageHolder() { - this->set_request(&request_obj_); - this->set_response(&response_obj_); - } - void Release() override { - // the object is allocated in the call arena. - this->~DefaultMessageHolder(); - } - - private: - Request request_obj_; - Response response_obj_; -}; - -} // namespace internal - -// Forward declarations -class ServerUnaryReactor; -template -class ServerReadReactor; -template -class ServerWriteReactor; -template -class ServerBidiReactor; - -// NOTE: The actual call/stream object classes are provided as API only to -// support mocking. There are no implementations of these class interfaces in -// the API. -class ServerCallbackUnary : public internal::ServerCallbackCall { - public: - ~ServerCallbackUnary() override {} - virtual void Finish(grpc::Status s) = 0; - virtual void SendInitialMetadata() = 0; - - protected: - // Use a template rather than explicitly specifying ServerUnaryReactor to - // delay binding and avoid a circular forward declaration issue - template - void BindReactor(Reactor* reactor) { - reactor->InternalBindCall(this); - } -}; - -template -class ServerCallbackReader : public internal::ServerCallbackCall { - public: - ~ServerCallbackReader() override {} - virtual void Finish(grpc::Status s) = 0; - virtual void SendInitialMetadata() = 0; - virtual void Read(Request* msg) = 0; - - protected: - void BindReactor(ServerReadReactor* reactor) { - reactor->InternalBindReader(this); - } -}; - -template -class ServerCallbackWriter : public internal::ServerCallbackCall { - public: - ~ServerCallbackWriter() override {} - - virtual void Finish(grpc::Status s) = 0; - virtual void SendInitialMetadata() = 0; - virtual void Write(const Response* msg, grpc::WriteOptions options) = 0; - virtual void WriteAndFinish(const Response* msg, grpc::WriteOptions options, - grpc::Status s) = 0; - - protected: - void BindReactor(ServerWriteReactor* reactor) { - reactor->InternalBindWriter(this); - } -}; - -template -class ServerCallbackReaderWriter : public internal::ServerCallbackCall { - public: - ~ServerCallbackReaderWriter() override {} - - virtual void Finish(grpc::Status s) = 0; - virtual void SendInitialMetadata() = 0; - virtual void Read(Request* msg) = 0; - virtual void Write(const Response* msg, grpc::WriteOptions options) = 0; - virtual void WriteAndFinish(const Response* msg, grpc::WriteOptions options, - grpc::Status s) = 0; - - protected: - void BindReactor(ServerBidiReactor* reactor) { - reactor->InternalBindStream(this); - } -}; - -// The following classes are the reactor interfaces that are to be implemented -// by the user, returned as the output parameter of the method handler for a -// callback method. Note that none of the classes are pure; all reactions have a -// default empty reaction so that the user class only needs to override those -// reactions that it cares about. The reaction methods will be invoked by the -// library in response to the completion of various operations. Reactions must -// not include blocking operations (such as blocking I/O, starting synchronous -// RPCs, or waiting on condition variables). Reactions may be invoked -// concurrently, except that OnDone is called after all others (assuming proper -// API usage). The reactor may not be deleted until OnDone is called. - -/// \a ServerBidiReactor is the interface for a bidirectional streaming RPC. -template -class ServerBidiReactor : public internal::ServerReactor { - public: - // NOTE: Initializing stream_ as a constructor initializer rather than a - // default initializer because gcc-4.x requires a copy constructor for - // default initializing a templated member, which isn't ok for atomic. - // TODO(vjpai): Switch to default constructor and default initializer when - // gcc-4.x is no longer supported - ServerBidiReactor() : stream_(nullptr) {} - ~ServerBidiReactor() override = default; - - /// Send any initial metadata stored in the RPC context. If not invoked, - /// any initial metadata will be passed along with the first Write or the - /// Finish (if there are no writes). - void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(stream_mu_) { - ServerCallbackReaderWriter* stream = - stream_.load(std::memory_order_acquire); - if (stream == nullptr) { - grpc::internal::MutexLock l(&stream_mu_); - stream = stream_.load(std::memory_order_relaxed); - if (stream == nullptr) { - backlog_.send_initial_metadata_wanted = true; - return; - } - } - stream->SendInitialMetadata(); - } - - /// Initiate a read operation. - /// - /// \param[out] req Where to eventually store the read message. Valid when - /// the library calls OnReadDone - void StartRead(Request* req) ABSL_LOCKS_EXCLUDED(stream_mu_) { - ServerCallbackReaderWriter* stream = - stream_.load(std::memory_order_acquire); - if (stream == nullptr) { - grpc::internal::MutexLock l(&stream_mu_); - stream = stream_.load(std::memory_order_relaxed); - if (stream == nullptr) { - backlog_.read_wanted = req; - return; - } - } - stream->Read(req); - } - - /// Initiate a write operation. - /// - /// \param[in] resp The message to be written. The library does not take - /// ownership but the caller must ensure that the message is - /// not deleted or modified until OnWriteDone is called. - void StartWrite(const Response* resp) { - StartWrite(resp, grpc::WriteOptions()); - } - - /// Initiate a write operation with specified options. - /// - /// \param[in] resp The message to be written. The library does not take - /// ownership but the caller must ensure that the message is - /// not deleted or modified until OnWriteDone is called. - /// \param[in] options The WriteOptions to use for writing this message - void StartWrite(const Response* resp, grpc::WriteOptions options) - ABSL_LOCKS_EXCLUDED(stream_mu_) { - ServerCallbackReaderWriter* stream = - stream_.load(std::memory_order_acquire); - if (stream == nullptr) { - grpc::internal::MutexLock l(&stream_mu_); - stream = stream_.load(std::memory_order_relaxed); - if (stream == nullptr) { - backlog_.write_wanted = resp; - backlog_.write_options_wanted = options; - return; - } - } - stream->Write(resp, options); - } - - /// Initiate a write operation with specified options and final RPC Status, - /// which also causes any trailing metadata for this RPC to be sent out. - /// StartWriteAndFinish is like merging StartWriteLast and Finish into a - /// single step. A key difference, though, is that this operation doesn't have - /// an OnWriteDone reaction - it is considered complete only when OnDone is - /// available. An RPC can either have StartWriteAndFinish or Finish, but not - /// both. - /// - /// \param[in] resp The message to be written. The library does not take - /// ownership but the caller must ensure that the message is - /// not deleted or modified until OnDone is called. - /// \param[in] options The WriteOptions to use for writing this message - /// \param[in] s The status outcome of this RPC - void StartWriteAndFinish(const Response* resp, grpc::WriteOptions options, - grpc::Status s) ABSL_LOCKS_EXCLUDED(stream_mu_) { - ServerCallbackReaderWriter* stream = - stream_.load(std::memory_order_acquire); - if (stream == nullptr) { - grpc::internal::MutexLock l(&stream_mu_); - stream = stream_.load(std::memory_order_relaxed); - if (stream == nullptr) { - backlog_.write_and_finish_wanted = true; - backlog_.write_wanted = resp; - backlog_.write_options_wanted = options; - backlog_.status_wanted = std::move(s); - return; - } - } - stream->WriteAndFinish(resp, options, std::move(s)); - } - - /// Inform system of a planned write operation with specified options, but - /// allow the library to schedule the actual write coalesced with the writing - /// of trailing metadata (which takes place on a Finish call). - /// - /// \param[in] resp The message to be written. The library does not take - /// ownership but the caller must ensure that the message is - /// not deleted or modified until OnWriteDone is called. - /// \param[in] options The WriteOptions to use for writing this message - void StartWriteLast(const Response* resp, grpc::WriteOptions options) { - StartWrite(resp, options.set_last_message()); - } - - /// Indicate that the stream is to be finished and the trailing metadata and - /// RPC status are to be sent. Every RPC MUST be finished using either Finish - /// or StartWriteAndFinish (but not both), even if the RPC is already - /// cancelled. - /// - /// \param[in] s The status outcome of this RPC - void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(stream_mu_) { - ServerCallbackReaderWriter* stream = - stream_.load(std::memory_order_acquire); - if (stream == nullptr) { - grpc::internal::MutexLock l(&stream_mu_); - stream = stream_.load(std::memory_order_relaxed); - if (stream == nullptr) { - backlog_.finish_wanted = true; - backlog_.status_wanted = std::move(s); - return; - } - } - stream->Finish(std::move(s)); - } - - /// Notifies the application that an explicit StartSendInitialMetadata - /// operation completed. Not used when the sending of initial metadata - /// piggybacks onto the first write. - /// - /// \param[in] ok Was it successful? If false, no further write-side operation - /// will succeed. - virtual void OnSendInitialMetadataDone(bool /*ok*/) {} - - /// Notifies the application that a StartRead operation completed. - /// - /// \param[in] ok Was it successful? If false, no further read-side operation - /// will succeed. - virtual void OnReadDone(bool /*ok*/) {} - - /// Notifies the application that a StartWrite (or StartWriteLast) operation - /// completed. - /// - /// \param[in] ok Was it successful? If false, no further write-side operation - /// will succeed. - virtual void OnWriteDone(bool /*ok*/) {} - - /// Notifies the application that all operations associated with this RPC - /// have completed. This is an override (from the internal base class) but - /// still abstract, so derived classes MUST override it to be instantiated. - void OnDone() override = 0; - - /// Notifies the application that this RPC has been cancelled. This is an - /// override (from the internal base class) but not final, so derived classes - /// should override it if they want to take action. - void OnCancel() override {} - - private: - friend class ServerCallbackReaderWriter; - // May be overridden by internal implementation details. This is not a public - // customization point. - virtual void InternalBindStream( - ServerCallbackReaderWriter* stream) { - grpc::internal::MutexLock l(&stream_mu_); - - if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) { - stream->SendInitialMetadata(); - } - if (GPR_UNLIKELY(backlog_.read_wanted != nullptr)) { - stream->Read(backlog_.read_wanted); - } - if (GPR_UNLIKELY(backlog_.write_and_finish_wanted)) { - stream->WriteAndFinish(backlog_.write_wanted, - std::move(backlog_.write_options_wanted), - std::move(backlog_.status_wanted)); - } else { - if (GPR_UNLIKELY(backlog_.write_wanted != nullptr)) { - stream->Write(backlog_.write_wanted, - std::move(backlog_.write_options_wanted)); - } - if (GPR_UNLIKELY(backlog_.finish_wanted)) { - stream->Finish(std::move(backlog_.status_wanted)); - } - } - // Set stream_ last so that other functions can use it lock-free - stream_.store(stream, std::memory_order_release); - } - - grpc::internal::Mutex stream_mu_; - // TODO(vjpai): Make stream_or_backlog_ into a std::variant or absl::variant - // once C++17 or ABSL is supported since stream and backlog are - // mutually exclusive in this class. Do likewise with the - // remaining reactor classes and their backlogs as well. - std::atomic*> stream_{nullptr}; - struct PreBindBacklog { - bool send_initial_metadata_wanted = false; - bool write_and_finish_wanted = false; - bool finish_wanted = false; - Request* read_wanted = nullptr; - const Response* write_wanted = nullptr; - grpc::WriteOptions write_options_wanted; - grpc::Status status_wanted; - }; - PreBindBacklog backlog_ ABSL_GUARDED_BY(stream_mu_); -}; - -/// \a ServerReadReactor is the interface for a client-streaming RPC. -template -class ServerReadReactor : public internal::ServerReactor { - public: - ServerReadReactor() : reader_(nullptr) {} - ~ServerReadReactor() override = default; - - /// The following operation initiations are exactly like ServerBidiReactor. - void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(reader_mu_) { - ServerCallbackReader* reader = - reader_.load(std::memory_order_acquire); - if (reader == nullptr) { - grpc::internal::MutexLock l(&reader_mu_); - reader = reader_.load(std::memory_order_relaxed); - if (reader == nullptr) { - backlog_.send_initial_metadata_wanted = true; - return; - } - } - reader->SendInitialMetadata(); - } - void StartRead(Request* req) ABSL_LOCKS_EXCLUDED(reader_mu_) { - ServerCallbackReader* reader = - reader_.load(std::memory_order_acquire); - if (reader == nullptr) { - grpc::internal::MutexLock l(&reader_mu_); - reader = reader_.load(std::memory_order_relaxed); - if (reader == nullptr) { - backlog_.read_wanted = req; - return; - } - } - reader->Read(req); - } - void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(reader_mu_) { - ServerCallbackReader* reader = - reader_.load(std::memory_order_acquire); - if (reader == nullptr) { - grpc::internal::MutexLock l(&reader_mu_); - reader = reader_.load(std::memory_order_relaxed); - if (reader == nullptr) { - backlog_.finish_wanted = true; - backlog_.status_wanted = std::move(s); - return; - } - } - reader->Finish(std::move(s)); - } - - /// The following notifications are exactly like ServerBidiReactor. - virtual void OnSendInitialMetadataDone(bool /*ok*/) {} - virtual void OnReadDone(bool /*ok*/) {} - void OnDone() override = 0; - void OnCancel() override {} - - private: - friend class ServerCallbackReader; - - // May be overridden by internal implementation details. This is not a public - // customization point. - virtual void InternalBindReader(ServerCallbackReader* reader) - ABSL_LOCKS_EXCLUDED(reader_mu_) { - grpc::internal::MutexLock l(&reader_mu_); - - if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) { - reader->SendInitialMetadata(); - } - if (GPR_UNLIKELY(backlog_.read_wanted != nullptr)) { - reader->Read(backlog_.read_wanted); - } - if (GPR_UNLIKELY(backlog_.finish_wanted)) { - reader->Finish(std::move(backlog_.status_wanted)); - } - // Set reader_ last so that other functions can use it lock-free - reader_.store(reader, std::memory_order_release); - } - - grpc::internal::Mutex reader_mu_; - std::atomic*> reader_{nullptr}; - struct PreBindBacklog { - bool send_initial_metadata_wanted = false; - bool finish_wanted = false; - Request* read_wanted = nullptr; - grpc::Status status_wanted; - }; - PreBindBacklog backlog_ ABSL_GUARDED_BY(reader_mu_); -}; - -/// \a ServerWriteReactor is the interface for a server-streaming RPC. -template -class ServerWriteReactor : public internal::ServerReactor { - public: - ServerWriteReactor() : writer_(nullptr) {} - ~ServerWriteReactor() override = default; - - /// The following operation initiations are exactly like ServerBidiReactor. - void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(writer_mu_) { - ServerCallbackWriter* writer = - writer_.load(std::memory_order_acquire); - if (writer == nullptr) { - grpc::internal::MutexLock l(&writer_mu_); - writer = writer_.load(std::memory_order_relaxed); - if (writer == nullptr) { - backlog_.send_initial_metadata_wanted = true; - return; - } - } - writer->SendInitialMetadata(); - } - void StartWrite(const Response* resp) { - StartWrite(resp, grpc::WriteOptions()); - } - void StartWrite(const Response* resp, grpc::WriteOptions options) - ABSL_LOCKS_EXCLUDED(writer_mu_) { - ServerCallbackWriter* writer = - writer_.load(std::memory_order_acquire); - if (writer == nullptr) { - grpc::internal::MutexLock l(&writer_mu_); - writer = writer_.load(std::memory_order_relaxed); - if (writer == nullptr) { - backlog_.write_wanted = resp; - backlog_.write_options_wanted = options; - return; - } - } - writer->Write(resp, options); - } - void StartWriteAndFinish(const Response* resp, grpc::WriteOptions options, - grpc::Status s) ABSL_LOCKS_EXCLUDED(writer_mu_) { - ServerCallbackWriter* writer = - writer_.load(std::memory_order_acquire); - if (writer == nullptr) { - grpc::internal::MutexLock l(&writer_mu_); - writer = writer_.load(std::memory_order_relaxed); - if (writer == nullptr) { - backlog_.write_and_finish_wanted = true; - backlog_.write_wanted = resp; - backlog_.write_options_wanted = options; - backlog_.status_wanted = std::move(s); - return; - } - } - writer->WriteAndFinish(resp, options, std::move(s)); - } - void StartWriteLast(const Response* resp, grpc::WriteOptions options) { - StartWrite(resp, options.set_last_message()); - } - void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(writer_mu_) { - ServerCallbackWriter* writer = - writer_.load(std::memory_order_acquire); - if (writer == nullptr) { - grpc::internal::MutexLock l(&writer_mu_); - writer = writer_.load(std::memory_order_relaxed); - if (writer == nullptr) { - backlog_.finish_wanted = true; - backlog_.status_wanted = std::move(s); - return; - } - } - writer->Finish(std::move(s)); - } - - /// The following notifications are exactly like ServerBidiReactor. - virtual void OnSendInitialMetadataDone(bool /*ok*/) {} - virtual void OnWriteDone(bool /*ok*/) {} - void OnDone() override = 0; - void OnCancel() override {} - - private: - friend class ServerCallbackWriter; - // May be overridden by internal implementation details. This is not a public - // customization point. - virtual void InternalBindWriter(ServerCallbackWriter* writer) - ABSL_LOCKS_EXCLUDED(writer_mu_) { - grpc::internal::MutexLock l(&writer_mu_); - - if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) { - writer->SendInitialMetadata(); - } - if (GPR_UNLIKELY(backlog_.write_and_finish_wanted)) { - writer->WriteAndFinish(backlog_.write_wanted, - std::move(backlog_.write_options_wanted), - std::move(backlog_.status_wanted)); - } else { - if (GPR_UNLIKELY(backlog_.write_wanted != nullptr)) { - writer->Write(backlog_.write_wanted, - std::move(backlog_.write_options_wanted)); - } - if (GPR_UNLIKELY(backlog_.finish_wanted)) { - writer->Finish(std::move(backlog_.status_wanted)); - } - } - // Set writer_ last so that other functions can use it lock-free - writer_.store(writer, std::memory_order_release); - } - - grpc::internal::Mutex writer_mu_; - std::atomic*> writer_{nullptr}; - struct PreBindBacklog { - bool send_initial_metadata_wanted = false; - bool write_and_finish_wanted = false; - bool finish_wanted = false; - const Response* write_wanted = nullptr; - grpc::WriteOptions write_options_wanted; - grpc::Status status_wanted; - }; - PreBindBacklog backlog_ ABSL_GUARDED_BY(writer_mu_); -}; - -class ServerUnaryReactor : public internal::ServerReactor { - public: - ServerUnaryReactor() : call_(nullptr) {} - ~ServerUnaryReactor() override = default; - - /// StartSendInitialMetadata is exactly like ServerBidiReactor. - void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(call_mu_) { - ServerCallbackUnary* call = call_.load(std::memory_order_acquire); - if (call == nullptr) { - grpc::internal::MutexLock l(&call_mu_); - call = call_.load(std::memory_order_relaxed); - if (call == nullptr) { - backlog_.send_initial_metadata_wanted = true; - return; - } - } - call->SendInitialMetadata(); - } - /// Finish is similar to ServerBidiReactor except for one detail. - /// If the status is non-OK, any message will not be sent. Instead, - /// the client will only receive the status and any trailing metadata. - void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(call_mu_) { - ServerCallbackUnary* call = call_.load(std::memory_order_acquire); - if (call == nullptr) { - grpc::internal::MutexLock l(&call_mu_); - call = call_.load(std::memory_order_relaxed); - if (call == nullptr) { - backlog_.finish_wanted = true; - backlog_.status_wanted = std::move(s); - return; - } - } - call->Finish(std::move(s)); - } - - /// The following notifications are exactly like ServerBidiReactor. - virtual void OnSendInitialMetadataDone(bool /*ok*/) {} - void OnDone() override = 0; - void OnCancel() override {} - - private: - friend class ServerCallbackUnary; - // May be overridden by internal implementation details. This is not a public - // customization point. - virtual void InternalBindCall(ServerCallbackUnary* call) - ABSL_LOCKS_EXCLUDED(call_mu_) { - grpc::internal::MutexLock l(&call_mu_); - - if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) { - call->SendInitialMetadata(); - } - if (GPR_UNLIKELY(backlog_.finish_wanted)) { - call->Finish(std::move(backlog_.status_wanted)); - } - // Set call_ last so that other functions can use it lock-free - call_.store(call, std::memory_order_release); - } - - grpc::internal::Mutex call_mu_; - std::atomic call_{nullptr}; - struct PreBindBacklog { - bool send_initial_metadata_wanted = false; - bool finish_wanted = false; - grpc::Status status_wanted; - }; - PreBindBacklog backlog_ ABSL_GUARDED_BY(call_mu_); -}; - -namespace internal { - -template -class FinishOnlyReactor : public Base { - public: - explicit FinishOnlyReactor(grpc::Status s) { this->Finish(std::move(s)); } - void OnDone() override { this->~FinishOnlyReactor(); } -}; - -using UnimplementedUnaryReactor = FinishOnlyReactor; -template -using UnimplementedReadReactor = FinishOnlyReactor>; -template -using UnimplementedWriteReactor = - FinishOnlyReactor>; -template -using UnimplementedBidiReactor = - FinishOnlyReactor>; - -} // namespace internal - -// TODO(vjpai): Remove namespace experimental when last known users are migrated -// off. -namespace experimental { - -template -using ServerBidiReactor = ::grpc::ServerBidiReactor; - -} // namespace experimental - -} // namespace grpc +/// TODO(chengyuc): Remove this file after solving compatibility. +#include #endif // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H diff --git a/include/grpcpp/support/server_callback.h b/include/grpcpp/support/server_callback.h index 1ffdce53d9e..c0f818ee39b 100644 --- a/include/grpcpp/support/server_callback.h +++ b/include/grpcpp/support/server_callback.h @@ -19,6 +19,780 @@ #ifndef GRPCPP_SUPPORT_SERVER_CALLBACK_H #define GRPCPP_SUPPORT_SERVER_CALLBACK_H -#include // IWYU pragma: export +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace grpc { + +// Declare base class of all reactors as internal +namespace internal { + +// Forward declarations +template +class CallbackUnaryHandler; +template +class CallbackClientStreamingHandler; +template +class CallbackServerStreamingHandler; +template +class CallbackBidiHandler; + +class ServerReactor { + public: + virtual ~ServerReactor() = default; + virtual void OnDone() = 0; + virtual void OnCancel() = 0; + + // The following is not API. It is for internal use only and specifies whether + // all reactions of this Reactor can be run without an extra executor + // scheduling. This should only be used for internally-defined reactors with + // trivial reactions. + virtual bool InternalInlineable() { return false; } + + private: + template + friend class CallbackUnaryHandler; + template + friend class CallbackClientStreamingHandler; + template + friend class CallbackServerStreamingHandler; + template + friend class CallbackBidiHandler; +}; + +/// The base class of ServerCallbackUnary etc. +class ServerCallbackCall { + public: + virtual ~ServerCallbackCall() {} + + // This object is responsible for tracking when it is safe to call OnDone and + // OnCancel. OnDone should not be called until the method handler is complete, + // Finish has been called, the ServerContext CompletionOp (which tracks + // cancellation or successful completion) has completed, and all outstanding + // Read/Write actions have seen their reactions. OnCancel should not be called + // until after the method handler is done and the RPC has completed with a + // cancellation. This is tracked by counting how many of these conditions have + // been met and calling OnCancel when none remain unmet. + + // Public versions of MaybeDone: one where we don't know the reactor in + // advance (used for the ServerContext CompletionOp), and one for where we + // know the inlineability of the OnDone reaction. You should set the inline + // flag to true if either the Reactor is InternalInlineable() or if this + // callback is already being forced to run dispatched to an executor + // (typically because it contains additional work than just the MaybeDone). + + void MaybeDone() { + if (GPR_UNLIKELY(Unref() == 1)) { + ScheduleOnDone(reactor()->InternalInlineable()); + } + } + + void MaybeDone(bool inline_ondone) { + if (GPR_UNLIKELY(Unref() == 1)) { + ScheduleOnDone(inline_ondone); + } + } + + // Fast version called with known reactor passed in, used from derived + // classes, typically in non-cancel case + void MaybeCallOnCancel(ServerReactor* reactor) { + if (GPR_UNLIKELY(UnblockCancellation())) { + CallOnCancel(reactor); + } + } + + // Slower version called from object that doesn't know the reactor a priori + // (such as the ServerContext CompletionOp which is formed before the + // reactor). This is used in cancel cases only, so it's ok to be slower and + // invoke a virtual function. + void MaybeCallOnCancel() { + if (GPR_UNLIKELY(UnblockCancellation())) { + CallOnCancel(reactor()); + } + } + + protected: + /// Increases the reference count + void Ref() { callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); } + + private: + virtual ServerReactor* reactor() = 0; + + // CallOnDone performs the work required at completion of the RPC: invoking + // the OnDone function and doing all necessary cleanup. This function is only + // ever invoked on a fully-Unref'fed ServerCallbackCall. + virtual void CallOnDone() = 0; + + // If the OnDone reaction is inlineable, execute it inline. Otherwise send it + // to an executor. + void ScheduleOnDone(bool inline_ondone); + + // If the OnCancel reaction is inlineable, execute it inline. Otherwise send + // it to an executor. + void CallOnCancel(ServerReactor* reactor); + + // Implement the cancellation constraint counter. Return true if OnCancel + // should be called, false otherwise. + bool UnblockCancellation() { + return on_cancel_conditions_remaining_.fetch_sub( + 1, std::memory_order_acq_rel) == 1; + } + + /// Decreases the reference count and returns the previous value + int Unref() { + return callbacks_outstanding_.fetch_sub(1, std::memory_order_acq_rel); + } + + std::atomic_int on_cancel_conditions_remaining_{2}; + std::atomic_int callbacks_outstanding_{ + 3}; // reserve for start, Finish, and CompletionOp +}; + +template +class DefaultMessageHolder : public MessageHolder { + public: + DefaultMessageHolder() { + this->set_request(&request_obj_); + this->set_response(&response_obj_); + } + void Release() override { + // the object is allocated in the call arena. + this->~DefaultMessageHolder(); + } + + private: + Request request_obj_; + Response response_obj_; +}; + +} // namespace internal + +// Forward declarations +class ServerUnaryReactor; +template +class ServerReadReactor; +template +class ServerWriteReactor; +template +class ServerBidiReactor; + +// NOTE: The actual call/stream object classes are provided as API only to +// support mocking. There are no implementations of these class interfaces in +// the API. +class ServerCallbackUnary : public internal::ServerCallbackCall { + public: + ~ServerCallbackUnary() override {} + virtual void Finish(grpc::Status s) = 0; + virtual void SendInitialMetadata() = 0; + + protected: + // Use a template rather than explicitly specifying ServerUnaryReactor to + // delay binding and avoid a circular forward declaration issue + template + void BindReactor(Reactor* reactor) { + reactor->InternalBindCall(this); + } +}; + +template +class ServerCallbackReader : public internal::ServerCallbackCall { + public: + ~ServerCallbackReader() override {} + virtual void Finish(grpc::Status s) = 0; + virtual void SendInitialMetadata() = 0; + virtual void Read(Request* msg) = 0; + + protected: + void BindReactor(ServerReadReactor* reactor) { + reactor->InternalBindReader(this); + } +}; + +template +class ServerCallbackWriter : public internal::ServerCallbackCall { + public: + ~ServerCallbackWriter() override {} + + virtual void Finish(grpc::Status s) = 0; + virtual void SendInitialMetadata() = 0; + virtual void Write(const Response* msg, grpc::WriteOptions options) = 0; + virtual void WriteAndFinish(const Response* msg, grpc::WriteOptions options, + grpc::Status s) = 0; + + protected: + void BindReactor(ServerWriteReactor* reactor) { + reactor->InternalBindWriter(this); + } +}; + +template +class ServerCallbackReaderWriter : public internal::ServerCallbackCall { + public: + ~ServerCallbackReaderWriter() override {} + + virtual void Finish(grpc::Status s) = 0; + virtual void SendInitialMetadata() = 0; + virtual void Read(Request* msg) = 0; + virtual void Write(const Response* msg, grpc::WriteOptions options) = 0; + virtual void WriteAndFinish(const Response* msg, grpc::WriteOptions options, + grpc::Status s) = 0; + + protected: + void BindReactor(ServerBidiReactor* reactor) { + reactor->InternalBindStream(this); + } +}; + +// The following classes are the reactor interfaces that are to be implemented +// by the user, returned as the output parameter of the method handler for a +// callback method. Note that none of the classes are pure; all reactions have a +// default empty reaction so that the user class only needs to override those +// reactions that it cares about. The reaction methods will be invoked by the +// library in response to the completion of various operations. Reactions must +// not include blocking operations (such as blocking I/O, starting synchronous +// RPCs, or waiting on condition variables). Reactions may be invoked +// concurrently, except that OnDone is called after all others (assuming proper +// API usage). The reactor may not be deleted until OnDone is called. + +/// \a ServerBidiReactor is the interface for a bidirectional streaming RPC. +template +class ServerBidiReactor : public internal::ServerReactor { + public: + // NOTE: Initializing stream_ as a constructor initializer rather than a + // default initializer because gcc-4.x requires a copy constructor for + // default initializing a templated member, which isn't ok for atomic. + // TODO(vjpai): Switch to default constructor and default initializer when + // gcc-4.x is no longer supported + ServerBidiReactor() : stream_(nullptr) {} + ~ServerBidiReactor() override = default; + + /// Send any initial metadata stored in the RPC context. If not invoked, + /// any initial metadata will be passed along with the first Write or the + /// Finish (if there are no writes). + void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(stream_mu_) { + ServerCallbackReaderWriter* stream = + stream_.load(std::memory_order_acquire); + if (stream == nullptr) { + grpc::internal::MutexLock l(&stream_mu_); + stream = stream_.load(std::memory_order_relaxed); + if (stream == nullptr) { + backlog_.send_initial_metadata_wanted = true; + return; + } + } + stream->SendInitialMetadata(); + } + + /// Initiate a read operation. + /// + /// \param[out] req Where to eventually store the read message. Valid when + /// the library calls OnReadDone + void StartRead(Request* req) ABSL_LOCKS_EXCLUDED(stream_mu_) { + ServerCallbackReaderWriter* stream = + stream_.load(std::memory_order_acquire); + if (stream == nullptr) { + grpc::internal::MutexLock l(&stream_mu_); + stream = stream_.load(std::memory_order_relaxed); + if (stream == nullptr) { + backlog_.read_wanted = req; + return; + } + } + stream->Read(req); + } + + /// Initiate a write operation. + /// + /// \param[in] resp The message to be written. The library does not take + /// ownership but the caller must ensure that the message is + /// not deleted or modified until OnWriteDone is called. + void StartWrite(const Response* resp) { + StartWrite(resp, grpc::WriteOptions()); + } + + /// Initiate a write operation with specified options. + /// + /// \param[in] resp The message to be written. The library does not take + /// ownership but the caller must ensure that the message is + /// not deleted or modified until OnWriteDone is called. + /// \param[in] options The WriteOptions to use for writing this message + void StartWrite(const Response* resp, grpc::WriteOptions options) + ABSL_LOCKS_EXCLUDED(stream_mu_) { + ServerCallbackReaderWriter* stream = + stream_.load(std::memory_order_acquire); + if (stream == nullptr) { + grpc::internal::MutexLock l(&stream_mu_); + stream = stream_.load(std::memory_order_relaxed); + if (stream == nullptr) { + backlog_.write_wanted = resp; + backlog_.write_options_wanted = options; + return; + } + } + stream->Write(resp, options); + } + + /// Initiate a write operation with specified options and final RPC Status, + /// which also causes any trailing metadata for this RPC to be sent out. + /// StartWriteAndFinish is like merging StartWriteLast and Finish into a + /// single step. A key difference, though, is that this operation doesn't have + /// an OnWriteDone reaction - it is considered complete only when OnDone is + /// available. An RPC can either have StartWriteAndFinish or Finish, but not + /// both. + /// + /// \param[in] resp The message to be written. The library does not take + /// ownership but the caller must ensure that the message is + /// not deleted or modified until OnDone is called. + /// \param[in] options The WriteOptions to use for writing this message + /// \param[in] s The status outcome of this RPC + void StartWriteAndFinish(const Response* resp, grpc::WriteOptions options, + grpc::Status s) ABSL_LOCKS_EXCLUDED(stream_mu_) { + ServerCallbackReaderWriter* stream = + stream_.load(std::memory_order_acquire); + if (stream == nullptr) { + grpc::internal::MutexLock l(&stream_mu_); + stream = stream_.load(std::memory_order_relaxed); + if (stream == nullptr) { + backlog_.write_and_finish_wanted = true; + backlog_.write_wanted = resp; + backlog_.write_options_wanted = options; + backlog_.status_wanted = std::move(s); + return; + } + } + stream->WriteAndFinish(resp, options, std::move(s)); + } + + /// Inform system of a planned write operation with specified options, but + /// allow the library to schedule the actual write coalesced with the writing + /// of trailing metadata (which takes place on a Finish call). + /// + /// \param[in] resp The message to be written. The library does not take + /// ownership but the caller must ensure that the message is + /// not deleted or modified until OnWriteDone is called. + /// \param[in] options The WriteOptions to use for writing this message + void StartWriteLast(const Response* resp, grpc::WriteOptions options) { + StartWrite(resp, options.set_last_message()); + } + + /// Indicate that the stream is to be finished and the trailing metadata and + /// RPC status are to be sent. Every RPC MUST be finished using either Finish + /// or StartWriteAndFinish (but not both), even if the RPC is already + /// cancelled. + /// + /// \param[in] s The status outcome of this RPC + void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(stream_mu_) { + ServerCallbackReaderWriter* stream = + stream_.load(std::memory_order_acquire); + if (stream == nullptr) { + grpc::internal::MutexLock l(&stream_mu_); + stream = stream_.load(std::memory_order_relaxed); + if (stream == nullptr) { + backlog_.finish_wanted = true; + backlog_.status_wanted = std::move(s); + return; + } + } + stream->Finish(std::move(s)); + } + + /// Notifies the application that an explicit StartSendInitialMetadata + /// operation completed. Not used when the sending of initial metadata + /// piggybacks onto the first write. + /// + /// \param[in] ok Was it successful? If false, no further write-side operation + /// will succeed. + virtual void OnSendInitialMetadataDone(bool /*ok*/) {} + + /// Notifies the application that a StartRead operation completed. + /// + /// \param[in] ok Was it successful? If false, no further read-side operation + /// will succeed. + virtual void OnReadDone(bool /*ok*/) {} + + /// Notifies the application that a StartWrite (or StartWriteLast) operation + /// completed. + /// + /// \param[in] ok Was it successful? If false, no further write-side operation + /// will succeed. + virtual void OnWriteDone(bool /*ok*/) {} + + /// Notifies the application that all operations associated with this RPC + /// have completed. This is an override (from the internal base class) but + /// still abstract, so derived classes MUST override it to be instantiated. + void OnDone() override = 0; + + /// Notifies the application that this RPC has been cancelled. This is an + /// override (from the internal base class) but not final, so derived classes + /// should override it if they want to take action. + void OnCancel() override {} + + private: + friend class ServerCallbackReaderWriter; + // May be overridden by internal implementation details. This is not a public + // customization point. + virtual void InternalBindStream( + ServerCallbackReaderWriter* stream) { + grpc::internal::MutexLock l(&stream_mu_); + + if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) { + stream->SendInitialMetadata(); + } + if (GPR_UNLIKELY(backlog_.read_wanted != nullptr)) { + stream->Read(backlog_.read_wanted); + } + if (GPR_UNLIKELY(backlog_.write_and_finish_wanted)) { + stream->WriteAndFinish(backlog_.write_wanted, + std::move(backlog_.write_options_wanted), + std::move(backlog_.status_wanted)); + } else { + if (GPR_UNLIKELY(backlog_.write_wanted != nullptr)) { + stream->Write(backlog_.write_wanted, + std::move(backlog_.write_options_wanted)); + } + if (GPR_UNLIKELY(backlog_.finish_wanted)) { + stream->Finish(std::move(backlog_.status_wanted)); + } + } + // Set stream_ last so that other functions can use it lock-free + stream_.store(stream, std::memory_order_release); + } + + grpc::internal::Mutex stream_mu_; + // TODO(vjpai): Make stream_or_backlog_ into a std::variant or absl::variant + // once C++17 or ABSL is supported since stream and backlog are + // mutually exclusive in this class. Do likewise with the + // remaining reactor classes and their backlogs as well. + std::atomic*> stream_{nullptr}; + struct PreBindBacklog { + bool send_initial_metadata_wanted = false; + bool write_and_finish_wanted = false; + bool finish_wanted = false; + Request* read_wanted = nullptr; + const Response* write_wanted = nullptr; + grpc::WriteOptions write_options_wanted; + grpc::Status status_wanted; + }; + PreBindBacklog backlog_ ABSL_GUARDED_BY(stream_mu_); +}; + +/// \a ServerReadReactor is the interface for a client-streaming RPC. +template +class ServerReadReactor : public internal::ServerReactor { + public: + ServerReadReactor() : reader_(nullptr) {} + ~ServerReadReactor() override = default; + + /// The following operation initiations are exactly like ServerBidiReactor. + void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(reader_mu_) { + ServerCallbackReader* reader = + reader_.load(std::memory_order_acquire); + if (reader == nullptr) { + grpc::internal::MutexLock l(&reader_mu_); + reader = reader_.load(std::memory_order_relaxed); + if (reader == nullptr) { + backlog_.send_initial_metadata_wanted = true; + return; + } + } + reader->SendInitialMetadata(); + } + void StartRead(Request* req) ABSL_LOCKS_EXCLUDED(reader_mu_) { + ServerCallbackReader* reader = + reader_.load(std::memory_order_acquire); + if (reader == nullptr) { + grpc::internal::MutexLock l(&reader_mu_); + reader = reader_.load(std::memory_order_relaxed); + if (reader == nullptr) { + backlog_.read_wanted = req; + return; + } + } + reader->Read(req); + } + void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(reader_mu_) { + ServerCallbackReader* reader = + reader_.load(std::memory_order_acquire); + if (reader == nullptr) { + grpc::internal::MutexLock l(&reader_mu_); + reader = reader_.load(std::memory_order_relaxed); + if (reader == nullptr) { + backlog_.finish_wanted = true; + backlog_.status_wanted = std::move(s); + return; + } + } + reader->Finish(std::move(s)); + } + + /// The following notifications are exactly like ServerBidiReactor. + virtual void OnSendInitialMetadataDone(bool /*ok*/) {} + virtual void OnReadDone(bool /*ok*/) {} + void OnDone() override = 0; + void OnCancel() override {} + + private: + friend class ServerCallbackReader; + + // May be overridden by internal implementation details. This is not a public + // customization point. + virtual void InternalBindReader(ServerCallbackReader* reader) + ABSL_LOCKS_EXCLUDED(reader_mu_) { + grpc::internal::MutexLock l(&reader_mu_); + + if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) { + reader->SendInitialMetadata(); + } + if (GPR_UNLIKELY(backlog_.read_wanted != nullptr)) { + reader->Read(backlog_.read_wanted); + } + if (GPR_UNLIKELY(backlog_.finish_wanted)) { + reader->Finish(std::move(backlog_.status_wanted)); + } + // Set reader_ last so that other functions can use it lock-free + reader_.store(reader, std::memory_order_release); + } + + grpc::internal::Mutex reader_mu_; + std::atomic*> reader_{nullptr}; + struct PreBindBacklog { + bool send_initial_metadata_wanted = false; + bool finish_wanted = false; + Request* read_wanted = nullptr; + grpc::Status status_wanted; + }; + PreBindBacklog backlog_ ABSL_GUARDED_BY(reader_mu_); +}; + +/// \a ServerWriteReactor is the interface for a server-streaming RPC. +template +class ServerWriteReactor : public internal::ServerReactor { + public: + ServerWriteReactor() : writer_(nullptr) {} + ~ServerWriteReactor() override = default; + + /// The following operation initiations are exactly like ServerBidiReactor. + void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(writer_mu_) { + ServerCallbackWriter* writer = + writer_.load(std::memory_order_acquire); + if (writer == nullptr) { + grpc::internal::MutexLock l(&writer_mu_); + writer = writer_.load(std::memory_order_relaxed); + if (writer == nullptr) { + backlog_.send_initial_metadata_wanted = true; + return; + } + } + writer->SendInitialMetadata(); + } + void StartWrite(const Response* resp) { + StartWrite(resp, grpc::WriteOptions()); + } + void StartWrite(const Response* resp, grpc::WriteOptions options) + ABSL_LOCKS_EXCLUDED(writer_mu_) { + ServerCallbackWriter* writer = + writer_.load(std::memory_order_acquire); + if (writer == nullptr) { + grpc::internal::MutexLock l(&writer_mu_); + writer = writer_.load(std::memory_order_relaxed); + if (writer == nullptr) { + backlog_.write_wanted = resp; + backlog_.write_options_wanted = options; + return; + } + } + writer->Write(resp, options); + } + void StartWriteAndFinish(const Response* resp, grpc::WriteOptions options, + grpc::Status s) ABSL_LOCKS_EXCLUDED(writer_mu_) { + ServerCallbackWriter* writer = + writer_.load(std::memory_order_acquire); + if (writer == nullptr) { + grpc::internal::MutexLock l(&writer_mu_); + writer = writer_.load(std::memory_order_relaxed); + if (writer == nullptr) { + backlog_.write_and_finish_wanted = true; + backlog_.write_wanted = resp; + backlog_.write_options_wanted = options; + backlog_.status_wanted = std::move(s); + return; + } + } + writer->WriteAndFinish(resp, options, std::move(s)); + } + void StartWriteLast(const Response* resp, grpc::WriteOptions options) { + StartWrite(resp, options.set_last_message()); + } + void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(writer_mu_) { + ServerCallbackWriter* writer = + writer_.load(std::memory_order_acquire); + if (writer == nullptr) { + grpc::internal::MutexLock l(&writer_mu_); + writer = writer_.load(std::memory_order_relaxed); + if (writer == nullptr) { + backlog_.finish_wanted = true; + backlog_.status_wanted = std::move(s); + return; + } + } + writer->Finish(std::move(s)); + } + + /// The following notifications are exactly like ServerBidiReactor. + virtual void OnSendInitialMetadataDone(bool /*ok*/) {} + virtual void OnWriteDone(bool /*ok*/) {} + void OnDone() override = 0; + void OnCancel() override {} + + private: + friend class ServerCallbackWriter; + // May be overridden by internal implementation details. This is not a public + // customization point. + virtual void InternalBindWriter(ServerCallbackWriter* writer) + ABSL_LOCKS_EXCLUDED(writer_mu_) { + grpc::internal::MutexLock l(&writer_mu_); + + if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) { + writer->SendInitialMetadata(); + } + if (GPR_UNLIKELY(backlog_.write_and_finish_wanted)) { + writer->WriteAndFinish(backlog_.write_wanted, + std::move(backlog_.write_options_wanted), + std::move(backlog_.status_wanted)); + } else { + if (GPR_UNLIKELY(backlog_.write_wanted != nullptr)) { + writer->Write(backlog_.write_wanted, + std::move(backlog_.write_options_wanted)); + } + if (GPR_UNLIKELY(backlog_.finish_wanted)) { + writer->Finish(std::move(backlog_.status_wanted)); + } + } + // Set writer_ last so that other functions can use it lock-free + writer_.store(writer, std::memory_order_release); + } + + grpc::internal::Mutex writer_mu_; + std::atomic*> writer_{nullptr}; + struct PreBindBacklog { + bool send_initial_metadata_wanted = false; + bool write_and_finish_wanted = false; + bool finish_wanted = false; + const Response* write_wanted = nullptr; + grpc::WriteOptions write_options_wanted; + grpc::Status status_wanted; + }; + PreBindBacklog backlog_ ABSL_GUARDED_BY(writer_mu_); +}; + +class ServerUnaryReactor : public internal::ServerReactor { + public: + ServerUnaryReactor() : call_(nullptr) {} + ~ServerUnaryReactor() override = default; + + /// StartSendInitialMetadata is exactly like ServerBidiReactor. + void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(call_mu_) { + ServerCallbackUnary* call = call_.load(std::memory_order_acquire); + if (call == nullptr) { + grpc::internal::MutexLock l(&call_mu_); + call = call_.load(std::memory_order_relaxed); + if (call == nullptr) { + backlog_.send_initial_metadata_wanted = true; + return; + } + } + call->SendInitialMetadata(); + } + /// Finish is similar to ServerBidiReactor except for one detail. + /// If the status is non-OK, any message will not be sent. Instead, + /// the client will only receive the status and any trailing metadata. + void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(call_mu_) { + ServerCallbackUnary* call = call_.load(std::memory_order_acquire); + if (call == nullptr) { + grpc::internal::MutexLock l(&call_mu_); + call = call_.load(std::memory_order_relaxed); + if (call == nullptr) { + backlog_.finish_wanted = true; + backlog_.status_wanted = std::move(s); + return; + } + } + call->Finish(std::move(s)); + } + + /// The following notifications are exactly like ServerBidiReactor. + virtual void OnSendInitialMetadataDone(bool /*ok*/) {} + void OnDone() override = 0; + void OnCancel() override {} + + private: + friend class ServerCallbackUnary; + // May be overridden by internal implementation details. This is not a public + // customization point. + virtual void InternalBindCall(ServerCallbackUnary* call) + ABSL_LOCKS_EXCLUDED(call_mu_) { + grpc::internal::MutexLock l(&call_mu_); + + if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) { + call->SendInitialMetadata(); + } + if (GPR_UNLIKELY(backlog_.finish_wanted)) { + call->Finish(std::move(backlog_.status_wanted)); + } + // Set call_ last so that other functions can use it lock-free + call_.store(call, std::memory_order_release); + } + + grpc::internal::Mutex call_mu_; + std::atomic call_{nullptr}; + struct PreBindBacklog { + bool send_initial_metadata_wanted = false; + bool finish_wanted = false; + grpc::Status status_wanted; + }; + PreBindBacklog backlog_ ABSL_GUARDED_BY(call_mu_); +}; + +namespace internal { + +template +class FinishOnlyReactor : public Base { + public: + explicit FinishOnlyReactor(grpc::Status s) { this->Finish(std::move(s)); } + void OnDone() override { this->~FinishOnlyReactor(); } +}; + +using UnimplementedUnaryReactor = FinishOnlyReactor; +template +using UnimplementedReadReactor = FinishOnlyReactor>; +template +using UnimplementedWriteReactor = + FinishOnlyReactor>; +template +using UnimplementedBidiReactor = + FinishOnlyReactor>; + +} // namespace internal + +// TODO(vjpai): Remove namespace experimental when last known users are migrated +// off. +namespace experimental { + +template +using ServerBidiReactor = ::grpc::ServerBidiReactor; + +} // namespace experimental + +} // namespace grpc #endif // GRPCPP_SUPPORT_SERVER_CALLBACK_H diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index f793b43b3b4..4249df6d3e7 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -147,7 +147,7 @@ std::string GetHeaderIncludes(grpc_generator::File* file, "grpcpp/support/method_handler.h", "grpcpp/impl/codegen/proto_utils.h", "grpcpp/impl/rpc_method.h", - "grpcpp/impl/codegen/server_callback.h", + "grpcpp/support/server_callback.h", "grpcpp/impl/codegen/server_callback_handlers.h", "grpcpp/server_context.h", "grpcpp/impl/codegen/service_type.h", @@ -1656,7 +1656,7 @@ std::string GetSourceIncludes(grpc_generator::File* file, "grpcpp/support/message_allocator.h", "grpcpp/support/method_handler.h", "grpcpp/impl/codegen/rpc_service_method.h", - "grpcpp/impl/codegen/server_callback.h", + "grpcpp/support/server_callback.h", "grpcpp/impl/codegen/server_callback_handlers.h", "grpcpp/server_context.h", "grpcpp/impl/codegen/service_type.h", diff --git a/test/cpp/codegen/compiler_test_golden b/test/cpp/codegen/compiler_test_golden index e73bb47311b..20a695c3ff3 100644 --- a/test/cpp/codegen/compiler_test_golden +++ b/test/cpp/codegen/compiler_test_golden @@ -37,7 +37,7 @@ #include #include #include -#include +#include #include #include #include