/* * * Copyright 2018 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H #include #include #include #include #include #include #include #include #include #include #include #include namespace grpc { // Declare base class of all reactors as internal namespace internal { // Forward declarations template class CallbackClientStreamingHandler; template class CallbackServerStreamingHandler; template class CallbackBidiHandler; class ServerReactor { public: virtual ~ServerReactor() = default; virtual void OnDone() = 0; virtual void OnCancel() = 0; private: friend class ::grpc_impl::ServerContext; template friend class CallbackClientStreamingHandler; template friend class CallbackServerStreamingHandler; template friend class CallbackBidiHandler; // The ServerReactor is responsible for tracking when it is safe to call // OnCancel. This function should not be called until after OnStarted is done // and the RPC has completed with a cancellation. This is tracked by counting // how many of these conditions have been met and calling OnCancel when none // remain unmet. void MaybeCallOnCancel() { if (on_cancel_conditions_remaining_.fetch_sub( 1, std::memory_order_acq_rel) == 1) { OnCancel(); } } std::atomic_int on_cancel_conditions_remaining_{2}; }; template class DefaultMessageHolder : public experimental::MessageHolder { public: DefaultMessageHolder() { this->set_request(&request_obj_); this->set_response(&response_obj_); } void Release() override { // the object is allocated in the call arena. this->~DefaultMessageHolder(); } private: Request request_obj_; Response response_obj_; }; } // namespace internal namespace experimental { // Forward declarations template class ServerReadReactor; template class ServerWriteReactor; template class ServerBidiReactor; // For unary RPCs, the exposed controller class is only an interface // and the actual implementation is an internal class. class ServerCallbackRpcController { public: virtual ~ServerCallbackRpcController() = default; // The method handler must call this function when it is done so that // the library knows to free its resources virtual void Finish(Status s) = 0; // Allow the method handler to push out the initial metadata before // the response and status are ready virtual void SendInitialMetadata(std::function) = 0; /// SetCancelCallback passes in a callback to be called when the RPC is /// canceled for whatever reason (streaming calls have OnCancel instead). This /// is an advanced and uncommon use with several important restrictions. This /// function may not be called more than once on the same RPC. /// /// If code calls SetCancelCallback on an RPC, it must also call /// ClearCancelCallback before calling Finish on the RPC controller. That /// method makes sure that no cancellation callback is executed for this RPC /// beyond the point of its return. ClearCancelCallback may be called even if /// SetCancelCallback was not called for this RPC, and it may be called /// multiple times. It _must_ be called if SetCancelCallback was called for /// this RPC. /// /// The callback should generally be lightweight and nonblocking and primarily /// concerned with clearing application state related to the RPC or causing /// operations (such as cancellations) to happen on dependent RPCs. /// /// If the RPC is already canceled at the time that SetCancelCallback is /// called, the callback is invoked immediately. /// /// The cancellation callback may be executed concurrently with the method /// handler that invokes it but will certainly not issue or execute after the /// return of ClearCancelCallback. If ClearCancelCallback is invoked while the /// callback is already executing, the callback will complete its execution /// before ClearCancelCallback takes effect. /// /// To preserve the orderings described above, the callback may be called /// under a lock that is also used for ClearCancelCallback and /// ServerContext::IsCancelled, so the callback CANNOT call either of those /// operations on this RPC or any other function that causes those operations /// to be called before the callback completes. virtual void SetCancelCallback(std::function callback) = 0; virtual void ClearCancelCallback() = 0; // NOTE: This is an API for advanced users who need custom allocators. // Get and maybe mutate the allocator state associated with the current RPC. virtual RpcAllocatorState* GetRpcAllocatorState() = 0; }; // NOTE: The actual streaming object classes are provided // as API only to support mocking. There are no implementations of // these class interfaces in the API. template class ServerCallbackReader { public: virtual ~ServerCallbackReader() {} virtual void Finish(Status s) = 0; virtual void SendInitialMetadata() = 0; virtual void Read(Request* msg) = 0; protected: template void BindReactor(ServerReadReactor* reactor) { reactor->BindReader(this); } }; template class ServerCallbackWriter { public: virtual ~ServerCallbackWriter() {} virtual void Finish(Status s) = 0; virtual void SendInitialMetadata() = 0; virtual void Write(const Response* msg, WriteOptions options) = 0; virtual void WriteAndFinish(const Response* msg, WriteOptions options, Status s) { // Default implementation that can/should be overridden Write(msg, std::move(options)); Finish(std::move(s)); } protected: template void BindReactor(ServerWriteReactor* reactor) { reactor->BindWriter(this); } }; template class ServerCallbackReaderWriter { public: virtual ~ServerCallbackReaderWriter() {} virtual void Finish(Status s) = 0; virtual void SendInitialMetadata() = 0; virtual void Read(Request* msg) = 0; virtual void Write(const Response* msg, WriteOptions options) = 0; virtual void WriteAndFinish(const Response* msg, WriteOptions options, Status s) { // Default implementation that can/should be overridden Write(msg, std::move(options)); Finish(std::move(s)); } protected: void BindReactor(ServerBidiReactor* reactor) { reactor->BindStream(this); } }; // The following classes are the reactor interfaces that are to be implemented // by the user, returned as the result of the method handler for a callback // method, and activated by the call to OnStarted. The library guarantees that // OnStarted will be called for any reactor that has been created using a // method handler registered on a service. No operation initiation method may be // called until after the call to OnStarted. // Note that none of the classes are pure; all reactions have a default empty // reaction so that the user class only needs to override those classes that it // cares about. /// \a ServerBidiReactor is the interface for a bidirectional streaming RPC. template class ServerBidiReactor : public internal::ServerReactor { public: ~ServerBidiReactor() = default; /// Do NOT call any operation initiation method (names that start with Start) /// until after the library has called OnStarted on this object. /// Send any initial metadata stored in the RPC context. If not invoked, /// any initial metadata will be passed along with the first Write or the /// Finish (if there are no writes). void StartSendInitialMetadata() { stream_->SendInitialMetadata(); } /// Initiate a read operation. /// /// \param[out] req Where to eventually store the read message. Valid when /// the library calls OnReadDone void StartRead(Request* req) { stream_->Read(req); } /// Initiate a write operation. /// /// \param[in] resp The message to be written. The library takes temporary /// ownership until OnWriteDone, at which point the /// application regains ownership of resp. void StartWrite(const Response* resp) { StartWrite(resp, WriteOptions()); } /// Initiate a write operation with specified options. /// /// \param[in] resp The message to be written. The library takes temporary /// ownership until OnWriteDone, at which point the /// application regains ownership of resp. /// \param[in] options The WriteOptions to use for writing this message void StartWrite(const Response* resp, WriteOptions options) { stream_->Write(resp, std::move(options)); } /// Initiate a write operation with specified options and final RPC Status, /// which also causes any trailing metadata for this RPC to be sent out. /// StartWriteAndFinish is like merging StartWriteLast and Finish into a /// single step. A key difference, though, is that this operation doesn't have /// an OnWriteDone reaction - it is considered complete only when OnDone is /// available. An RPC can either have StartWriteAndFinish or Finish, but not /// both. /// /// \param[in] resp The message to be written. The library takes temporary /// ownership until Onone, at which point the application /// regains ownership of resp. /// \param[in] options The WriteOptions to use for writing this message /// \param[in] s The status outcome of this RPC void StartWriteAndFinish(const Response* resp, WriteOptions options, Status s) { stream_->WriteAndFinish(resp, std::move(options), std::move(s)); } /// Inform system of a planned write operation with specified options, but /// allow the library to schedule the actual write coalesced with the writing /// of trailing metadata (which takes place on a Finish call). /// /// \param[in] resp The message to be written. The library takes temporary /// ownership until OnWriteDone, at which point the /// application regains ownership of resp. /// \param[in] options The WriteOptions to use for writing this message void StartWriteLast(const Response* resp, WriteOptions options) { StartWrite(resp, std::move(options.set_last_message())); } /// Indicate that the stream is to be finished and the trailing metadata and /// RPC status are to be sent. Every RPC MUST be finished using either Finish /// or StartWriteAndFinish (but not both), even if the RPC is already /// cancelled. /// /// \param[in] s The status outcome of this RPC void Finish(Status s) { stream_->Finish(std::move(s)); } /// Notify the application that a streaming RPC has started and that it is now /// ok to call any operation initiation method. An RPC is considered started /// after the server has received all initial metadata from the client, which /// is a result of the client calling StartCall(). /// /// \param[in] context The context object now associated with this RPC virtual void OnStarted(::grpc_impl::ServerContext* context) {} /// Notifies the application that an explicit StartSendInitialMetadata /// operation completed. Not used when the sending of initial metadata /// piggybacks onto the first write. /// /// \param[in] ok Was it successful? If false, no further write-side operation /// will succeed. virtual void OnSendInitialMetadataDone(bool ok) {} /// Notifies the application that a StartRead operation completed. /// /// \param[in] ok Was it successful? If false, no further read-side operation /// will succeed. virtual void OnReadDone(bool ok) {} /// Notifies the application that a StartWrite (or StartWriteLast) operation /// completed. /// /// \param[in] ok Was it successful? If false, no further write-side operation /// will succeed. virtual void OnWriteDone(bool ok) {} /// Notifies the application that all operations associated with this RPC /// have completed. This is an override (from the internal base class) but not /// final, so derived classes should override it if they want to take action. void OnDone() override {} /// Notifies the application that this RPC has been cancelled. This is an /// override (from the internal base class) but not final, so derived classes /// should override it if they want to take action. void OnCancel() override {} private: friend class ServerCallbackReaderWriter; virtual void BindStream( ServerCallbackReaderWriter* stream) { stream_ = stream; } ServerCallbackReaderWriter* stream_; }; /// \a ServerReadReactor is the interface for a client-streaming RPC. template class ServerReadReactor : public internal::ServerReactor { public: ~ServerReadReactor() = default; /// The following operation initiations are exactly like ServerBidiReactor. void StartSendInitialMetadata() { reader_->SendInitialMetadata(); } void StartRead(Request* req) { reader_->Read(req); } void Finish(Status s) { reader_->Finish(std::move(s)); } /// Similar to ServerBidiReactor::OnStarted, except that this also provides /// the response object that the stream fills in before calling Finish. /// (It must be filled in if status is OK, but it may be filled in otherwise.) /// /// \param[in] context The context object now associated with this RPC /// \param[in] resp The response object to be used by this RPC virtual void OnStarted(::grpc_impl::ServerContext* context, Response* resp) {} /// The following notifications are exactly like ServerBidiReactor. virtual void OnSendInitialMetadataDone(bool ok) {} virtual void OnReadDone(bool ok) {} void OnDone() override {} void OnCancel() override {} private: friend class ServerCallbackReader; virtual void BindReader(ServerCallbackReader* reader) { reader_ = reader; } ServerCallbackReader* reader_; }; /// \a ServerWriteReactor is the interface for a server-streaming RPC. template class ServerWriteReactor : public internal::ServerReactor { public: ~ServerWriteReactor() = default; /// The following operation initiations are exactly like ServerBidiReactor. void StartSendInitialMetadata() { writer_->SendInitialMetadata(); } void StartWrite(const Response* resp) { StartWrite(resp, WriteOptions()); } void StartWrite(const Response* resp, WriteOptions options) { writer_->Write(resp, std::move(options)); } void StartWriteAndFinish(const Response* resp, WriteOptions options, Status s) { writer_->WriteAndFinish(resp, std::move(options), std::move(s)); } void StartWriteLast(const Response* resp, WriteOptions options) { StartWrite(resp, std::move(options.set_last_message())); } void Finish(Status s) { writer_->Finish(std::move(s)); } /// Similar to ServerBidiReactor::OnStarted, except that this also provides /// the request object sent by the client. /// /// \param[in] context The context object now associated with this RPC /// \param[in] req The request object sent by the client virtual void OnStarted(::grpc_impl::ServerContext* context, const Request* req) {} /// The following notifications are exactly like ServerBidiReactor. virtual void OnSendInitialMetadataDone(bool ok) {} virtual void OnWriteDone(bool ok) {} void OnDone() override {} void OnCancel() override {} private: friend class ServerCallbackWriter; virtual void BindWriter(ServerCallbackWriter* writer) { writer_ = writer; } ServerCallbackWriter* writer_; }; } // namespace experimental namespace internal { template class UnimplementedReadReactor : public experimental::ServerReadReactor { public: void OnDone() override { delete this; } void OnStarted(::grpc_impl::ServerContext*, Response*) override { this->Finish(Status(StatusCode::UNIMPLEMENTED, "")); } }; template class UnimplementedWriteReactor : public experimental::ServerWriteReactor { public: void OnDone() override { delete this; } void OnStarted(::grpc_impl::ServerContext*, const Request*) override { this->Finish(Status(StatusCode::UNIMPLEMENTED, "")); } }; template class UnimplementedBidiReactor : public experimental::ServerBidiReactor { public: void OnDone() override { delete this; } void OnStarted(::grpc_impl::ServerContext*) override { this->Finish(Status(StatusCode::UNIMPLEMENTED, "")); } }; template class CallbackUnaryHandler : public MethodHandler { public: CallbackUnaryHandler( std::function func) : func_(func) {} void SetMessageAllocator( experimental::MessageAllocator* allocator) { allocator_ = allocator; } void RunHandler(const HandlerParameter& param) final { // Arena allocate a controller structure (that includes request/response) g_core_codegen_interface->grpc_call_ref(param.call->call()); auto* allocator_state = static_cast*>( param.internal_data); auto* controller = new (g_core_codegen_interface->grpc_call_arena_alloc( param.call->call(), sizeof(ServerCallbackRpcControllerImpl))) ServerCallbackRpcControllerImpl(param.server_context, param.call, allocator_state, std::move(param.call_requester)); Status status = param.status; if (status.ok()) { // Call the actual function handler and expect the user to call finish CatchingCallback(func_, param.server_context, controller->request(), controller->response(), controller); } else { // if deserialization failed, we need to fail the call controller->Finish(status); } } void* Deserialize(grpc_call* call, grpc_byte_buffer* req, Status* status, void** handler_data) final { ByteBuffer buf; buf.set_buffer(req); RequestType* request = nullptr; experimental::MessageHolder* allocator_state = nullptr; if (allocator_ != nullptr) { allocator_state = allocator_->AllocateMessages(); } else { allocator_state = new (g_core_codegen_interface->grpc_call_arena_alloc( call, sizeof(DefaultMessageHolder))) DefaultMessageHolder(); } *handler_data = allocator_state; request = allocator_state->request(); *status = SerializationTraits::Deserialize(&buf, request); buf.Release(); if (status->ok()) { return request; } // Clean up on deserialization failure. allocator_state->Release(); return nullptr; } private: std::function func_; experimental::MessageAllocator* allocator_ = nullptr; // The implementation class of ServerCallbackRpcController is a private member // of CallbackUnaryHandler since it is never exposed anywhere, and this allows // it to take advantage of CallbackUnaryHandler's friendships. class ServerCallbackRpcControllerImpl : public experimental::ServerCallbackRpcController { public: void Finish(Status s) override { finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); }, &finish_ops_); if (!ctx_->sent_initial_metadata_) { finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { finish_ops_.set_compression_level(ctx_->compression_level()); } ctx_->sent_initial_metadata_ = true; } // The response is dropped if the status is not OK. if (s.ok()) { finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, finish_ops_.SendMessagePtr(response())); } else { finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s); } finish_ops_.set_core_cq_tag(&finish_tag_); call_.PerformOps(&finish_ops_); } void SendInitialMetadata(std::function f) override { GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); callbacks_outstanding_++; // TODO(vjpai): Consider taking f as a move-capture if we adopt C++14 // and if performance of this operation matters meta_tag_.Set(call_.call(), [this, f](bool ok) { f(ok); MaybeDone(); }, &meta_ops_); meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { meta_ops_.set_compression_level(ctx_->compression_level()); } ctx_->sent_initial_metadata_ = true; meta_ops_.set_core_cq_tag(&meta_tag_); call_.PerformOps(&meta_ops_); } // Neither SetCancelCallback nor ClearCancelCallback should affect the // callbacks_outstanding_ count since they are paired and both must precede // the invocation of Finish (if they are used at all) void SetCancelCallback(std::function callback) override { ctx_->SetCancelCallback(std::move(callback)); } void ClearCancelCallback() override { ctx_->ClearCancelCallback(); } experimental::RpcAllocatorState* GetRpcAllocatorState() override { return allocator_state_; } private: friend class CallbackUnaryHandler; ServerCallbackRpcControllerImpl( ::grpc_impl::ServerContext* ctx, Call* call, experimental::MessageHolder* allocator_state, std::function call_requester) : ctx_(ctx), call_(*call), allocator_state_(allocator_state), call_requester_(std::move(call_requester)) { ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, nullptr); } const RequestType* request() { return allocator_state_->request(); } ResponseType* response() { return allocator_state_->response(); } void MaybeDone() { if (--callbacks_outstanding_ == 0) { grpc_call* call = call_.call(); auto call_requester = std::move(call_requester_); allocator_state_->Release(); this->~ServerCallbackRpcControllerImpl(); // explicitly call destructor g_core_codegen_interface->grpc_call_unref(call); call_requester(); } } CallOpSet meta_ops_; CallbackWithSuccessTag meta_tag_; CallOpSet finish_ops_; CallbackWithSuccessTag finish_tag_; ::grpc_impl::ServerContext* ctx_; Call call_; experimental::MessageHolder* const allocator_state_; std::function call_requester_; std::atomic_int callbacks_outstanding_{ 2}; // reserve for Finish and CompletionOp }; }; template class CallbackClientStreamingHandler : public MethodHandler { public: CallbackClientStreamingHandler( std::function< experimental::ServerReadReactor*()> func) : func_(std::move(func)) {} void RunHandler(const HandlerParameter& param) final { // Arena allocate a reader structure (that includes response) g_core_codegen_interface->grpc_call_ref(param.call->call()); experimental::ServerReadReactor* reactor = param.status.ok() ? CatchingReactorCreator< experimental::ServerReadReactor>( func_) : nullptr; if (reactor == nullptr) { // if deserialization or reactor creator failed, we need to fail the call reactor = new UnimplementedReadReactor; } auto* reader = new (g_core_codegen_interface->grpc_call_arena_alloc( param.call->call(), sizeof(ServerCallbackReaderImpl))) ServerCallbackReaderImpl(param.server_context, param.call, std::move(param.call_requester), reactor); reader->BindReactor(reactor); reactor->OnStarted(param.server_context, reader->response()); // The earliest that OnCancel can be called is after OnStarted is done. reactor->MaybeCallOnCancel(); reader->MaybeDone(); } private: std::function*()> func_; class ServerCallbackReaderImpl : public experimental::ServerCallbackReader { public: void Finish(Status s) override { finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); }, &finish_ops_); if (!ctx_->sent_initial_metadata_) { finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { finish_ops_.set_compression_level(ctx_->compression_level()); } ctx_->sent_initial_metadata_ = true; } // The response is dropped if the status is not OK. if (s.ok()) { finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, finish_ops_.SendMessagePtr(&resp_)); } else { finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s); } finish_ops_.set_core_cq_tag(&finish_tag_); call_.PerformOps(&finish_ops_); } void SendInitialMetadata() override { GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); callbacks_outstanding_++; meta_tag_.Set(call_.call(), [this](bool ok) { reactor_->OnSendInitialMetadataDone(ok); MaybeDone(); }, &meta_ops_); meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { meta_ops_.set_compression_level(ctx_->compression_level()); } ctx_->sent_initial_metadata_ = true; meta_ops_.set_core_cq_tag(&meta_tag_); call_.PerformOps(&meta_ops_); } void Read(RequestType* req) override { callbacks_outstanding_++; read_ops_.RecvMessage(req); call_.PerformOps(&read_ops_); } private: friend class CallbackClientStreamingHandler; ServerCallbackReaderImpl( ::grpc_impl::ServerContext* ctx, Call* call, std::function call_requester, experimental::ServerReadReactor* reactor) : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)), reactor_(reactor) { ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, reactor); read_tag_.Set(call_.call(), [this](bool ok) { reactor_->OnReadDone(ok); MaybeDone(); }, &read_ops_); read_ops_.set_core_cq_tag(&read_tag_); } ~ServerCallbackReaderImpl() {} ResponseType* response() { return &resp_; } void MaybeDone() { if (--callbacks_outstanding_ == 0) { reactor_->OnDone(); grpc_call* call = call_.call(); auto call_requester = std::move(call_requester_); this->~ServerCallbackReaderImpl(); // explicitly call destructor g_core_codegen_interface->grpc_call_unref(call); call_requester(); } } CallOpSet meta_ops_; CallbackWithSuccessTag meta_tag_; CallOpSet finish_ops_; CallbackWithSuccessTag finish_tag_; CallOpSet> read_ops_; CallbackWithSuccessTag read_tag_; ::grpc_impl::ServerContext* ctx_; Call call_; ResponseType resp_; std::function call_requester_; experimental::ServerReadReactor* reactor_; std::atomic_int callbacks_outstanding_{ 3}; // reserve for OnStarted, Finish, and CompletionOp }; }; template class CallbackServerStreamingHandler : public MethodHandler { public: CallbackServerStreamingHandler( std::function< experimental::ServerWriteReactor*()> func) : func_(std::move(func)) {} void RunHandler(const HandlerParameter& param) final { // Arena allocate a writer structure g_core_codegen_interface->grpc_call_ref(param.call->call()); experimental::ServerWriteReactor* reactor = param.status.ok() ? CatchingReactorCreator< experimental::ServerWriteReactor>( func_) : nullptr; if (reactor == nullptr) { // if deserialization or reactor creator failed, we need to fail the call reactor = new UnimplementedWriteReactor; } auto* writer = new (g_core_codegen_interface->grpc_call_arena_alloc( param.call->call(), sizeof(ServerCallbackWriterImpl))) ServerCallbackWriterImpl(param.server_context, param.call, static_cast(param.request), std::move(param.call_requester), reactor); writer->BindReactor(reactor); reactor->OnStarted(param.server_context, writer->request()); // The earliest that OnCancel can be called is after OnStarted is done. reactor->MaybeCallOnCancel(); writer->MaybeDone(); } void* Deserialize(grpc_call* call, grpc_byte_buffer* req, Status* status, void** handler_data) final { ByteBuffer buf; buf.set_buffer(req); auto* request = new (g_core_codegen_interface->grpc_call_arena_alloc( call, sizeof(RequestType))) RequestType(); *status = SerializationTraits::Deserialize(&buf, request); buf.Release(); if (status->ok()) { return request; } request->~RequestType(); return nullptr; } private: std::function*()> func_; class ServerCallbackWriterImpl : public experimental::ServerCallbackWriter { public: void Finish(Status s) override { finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); }, &finish_ops_); finish_ops_.set_core_cq_tag(&finish_tag_); if (!ctx_->sent_initial_metadata_) { finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { finish_ops_.set_compression_level(ctx_->compression_level()); } ctx_->sent_initial_metadata_ = true; } finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s); call_.PerformOps(&finish_ops_); } void SendInitialMetadata() override { GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); callbacks_outstanding_++; meta_tag_.Set(call_.call(), [this](bool ok) { reactor_->OnSendInitialMetadataDone(ok); MaybeDone(); }, &meta_ops_); meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { meta_ops_.set_compression_level(ctx_->compression_level()); } ctx_->sent_initial_metadata_ = true; meta_ops_.set_core_cq_tag(&meta_tag_); call_.PerformOps(&meta_ops_); } void Write(const ResponseType* resp, WriteOptions options) override { callbacks_outstanding_++; if (options.is_last_message()) { options.set_buffer_hint(); } if (!ctx_->sent_initial_metadata_) { write_ops_.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { write_ops_.set_compression_level(ctx_->compression_level()); } ctx_->sent_initial_metadata_ = true; } // TODO(vjpai): don't assert GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok()); call_.PerformOps(&write_ops_); } void WriteAndFinish(const ResponseType* resp, WriteOptions options, Status s) override { // This combines the write into the finish callback // Don't send any message if the status is bad if (s.ok()) { // TODO(vjpai): don't assert GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok()); } Finish(std::move(s)); } private: friend class CallbackServerStreamingHandler; ServerCallbackWriterImpl( ::grpc_impl::ServerContext* ctx, Call* call, const RequestType* req, std::function call_requester, experimental::ServerWriteReactor* reactor) : ctx_(ctx), call_(*call), req_(req), call_requester_(std::move(call_requester)), reactor_(reactor) { ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, reactor); write_tag_.Set(call_.call(), [this](bool ok) { reactor_->OnWriteDone(ok); MaybeDone(); }, &write_ops_); write_ops_.set_core_cq_tag(&write_tag_); } ~ServerCallbackWriterImpl() { req_->~RequestType(); } const RequestType* request() { return req_; } void MaybeDone() { if (--callbacks_outstanding_ == 0) { reactor_->OnDone(); grpc_call* call = call_.call(); auto call_requester = std::move(call_requester_); this->~ServerCallbackWriterImpl(); // explicitly call destructor g_core_codegen_interface->grpc_call_unref(call); call_requester(); } } CallOpSet meta_ops_; CallbackWithSuccessTag meta_tag_; CallOpSet finish_ops_; CallbackWithSuccessTag finish_tag_; CallOpSet write_ops_; CallbackWithSuccessTag write_tag_; ::grpc_impl::ServerContext* ctx_; Call call_; const RequestType* req_; std::function call_requester_; experimental::ServerWriteReactor* reactor_; std::atomic_int callbacks_outstanding_{ 3}; // reserve for OnStarted, Finish, and CompletionOp }; }; template class CallbackBidiHandler : public MethodHandler { public: CallbackBidiHandler( std::function< experimental::ServerBidiReactor*()> func) : func_(std::move(func)) {} void RunHandler(const HandlerParameter& param) final { g_core_codegen_interface->grpc_call_ref(param.call->call()); experimental::ServerBidiReactor* reactor = param.status.ok() ? CatchingReactorCreator< experimental::ServerBidiReactor>( func_) : nullptr; if (reactor == nullptr) { // if deserialization or reactor creator failed, we need to fail the call reactor = new UnimplementedBidiReactor; } auto* stream = new (g_core_codegen_interface->grpc_call_arena_alloc( param.call->call(), sizeof(ServerCallbackReaderWriterImpl))) ServerCallbackReaderWriterImpl(param.server_context, param.call, std::move(param.call_requester), reactor); stream->BindReactor(reactor); reactor->OnStarted(param.server_context); // The earliest that OnCancel can be called is after OnStarted is done. reactor->MaybeCallOnCancel(); stream->MaybeDone(); } private: std::function*()> func_; class ServerCallbackReaderWriterImpl : public experimental::ServerCallbackReaderWriter { public: void Finish(Status s) override { finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); }, &finish_ops_); finish_ops_.set_core_cq_tag(&finish_tag_); if (!ctx_->sent_initial_metadata_) { finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { finish_ops_.set_compression_level(ctx_->compression_level()); } ctx_->sent_initial_metadata_ = true; } finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s); call_.PerformOps(&finish_ops_); } void SendInitialMetadata() override { GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); callbacks_outstanding_++; meta_tag_.Set(call_.call(), [this](bool ok) { reactor_->OnSendInitialMetadataDone(ok); MaybeDone(); }, &meta_ops_); meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { meta_ops_.set_compression_level(ctx_->compression_level()); } ctx_->sent_initial_metadata_ = true; meta_ops_.set_core_cq_tag(&meta_tag_); call_.PerformOps(&meta_ops_); } void Write(const ResponseType* resp, WriteOptions options) override { callbacks_outstanding_++; if (options.is_last_message()) { options.set_buffer_hint(); } if (!ctx_->sent_initial_metadata_) { write_ops_.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { write_ops_.set_compression_level(ctx_->compression_level()); } ctx_->sent_initial_metadata_ = true; } // TODO(vjpai): don't assert GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok()); call_.PerformOps(&write_ops_); } void WriteAndFinish(const ResponseType* resp, WriteOptions options, Status s) override { // Don't send any message if the status is bad if (s.ok()) { // TODO(vjpai): don't assert GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok()); } Finish(std::move(s)); } void Read(RequestType* req) override { callbacks_outstanding_++; read_ops_.RecvMessage(req); call_.PerformOps(&read_ops_); } private: friend class CallbackBidiHandler; ServerCallbackReaderWriterImpl( ::grpc_impl::ServerContext* ctx, Call* call, std::function call_requester, experimental::ServerBidiReactor* reactor) : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)), reactor_(reactor) { ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, reactor); write_tag_.Set(call_.call(), [this](bool ok) { reactor_->OnWriteDone(ok); MaybeDone(); }, &write_ops_); write_ops_.set_core_cq_tag(&write_tag_); read_tag_.Set(call_.call(), [this](bool ok) { reactor_->OnReadDone(ok); MaybeDone(); }, &read_ops_); read_ops_.set_core_cq_tag(&read_tag_); } ~ServerCallbackReaderWriterImpl() {} void MaybeDone() { if (--callbacks_outstanding_ == 0) { reactor_->OnDone(); grpc_call* call = call_.call(); auto call_requester = std::move(call_requester_); this->~ServerCallbackReaderWriterImpl(); // explicitly call destructor g_core_codegen_interface->grpc_call_unref(call); call_requester(); } } CallOpSet meta_ops_; CallbackWithSuccessTag meta_tag_; CallOpSet finish_ops_; CallbackWithSuccessTag finish_tag_; CallOpSet write_ops_; CallbackWithSuccessTag write_tag_; CallOpSet> read_ops_; CallbackWithSuccessTag read_tag_; ::grpc_impl::ServerContext* ctx_; Call call_; std::function call_requester_; experimental::ServerBidiReactor* reactor_; std::atomic_int callbacks_outstanding_{ 3}; // reserve for OnStarted, Finish, and CompletionOp }; }; } // namespace internal } // namespace grpc #endif // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H