/* * * Copyright 2018 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H #include #include #include #include #include #include #include #include #include #include #include namespace grpc { // Declare base class of all reactors as internal namespace internal { class ServerReactor { public: virtual ~ServerReactor() = default; virtual void OnDone() {} virtual void OnCancel() {} }; } // namespace internal namespace experimental { // Forward declarations template class ServerReadReactor; template class ServerWriteReactor; template class ServerBidiReactor; // For unary RPCs, the exposed controller class is only an interface // and the actual implementation is an internal class. class ServerCallbackRpcController { public: virtual ~ServerCallbackRpcController() = default; // The method handler must call this function when it is done so that // the library knows to free its resources virtual void Finish(Status s) = 0; // Allow the method handler to push out the initial metadata before // the response and status are ready virtual void SendInitialMetadata(std::function) = 0; /// SetCancelCallback passes in a callback to be called when the RPC is /// canceled for whatever reason (streaming calls have OnCancel instead). This /// is an advanced and uncommon use with several important restrictions. (This /// function may be called multiple times on the same RPC but only that last /// registered callback is actually used.) /// /// If code calls SetCancelCallback on an RPC, it must also call /// ClearCancelCallback before calling Finish on the RPC controller. That /// method makes sure that no cancellation callback is executed for this RPC /// beyond the point of its return. ClearCancelCallback may be called even if /// SetCancelCallback was not called for this RPC, and it may be called /// multiple times. It _must_ be called if SetCancelCallback was called for /// this RPC. /// /// The callback should generally be lightweight and nonblocking and primarily /// concerned with clearing application state related to the RPC or causing /// operations (such as cancellations) to happen on dependent RPCs. /// /// If the RPC is already canceled at the time that SetCancelCallback is /// called, the callback is invoked immediately. /// /// The cancellation callback may be executed concurrently with the method /// handler that invokes it but will certainly not issue or execute after the /// return of ClearCancelCallback. /// /// To preserve the orderings described above, the callback may be called /// under a lock that is also used for ClearCancelCallback and /// ServerContext::IsCancelled, so the callback CANNOT call either of those /// operations on this RPC or any other function that causes those operations /// to be called before the callback completes. virtual void SetCancelCallback(std::function callback) = 0; virtual void ClearCancelCallback() = 0; }; // NOTE: The actual streaming object classes are provided // as API only to support mocking. There are no implementations of // these class interfaces in the API. template class ServerCallbackReader { public: virtual ~ServerCallbackReader() {} virtual void Finish(Status s) = 0; virtual void SendInitialMetadata() = 0; virtual void Read(Request* msg) = 0; protected: template void BindReactor(ServerReadReactor* reactor) { reactor->BindReader(this); } }; template class ServerCallbackWriter { public: virtual ~ServerCallbackWriter() {} virtual void Finish(Status s) = 0; virtual void SendInitialMetadata() = 0; virtual void Write(const Response* msg, WriteOptions options) = 0; virtual void WriteAndFinish(const Response* msg, WriteOptions options, Status s) { // Default implementation that can/should be overridden Write(msg, std::move(options)); Finish(std::move(s)); } protected: template void BindReactor(ServerWriteReactor* reactor) { reactor->BindWriter(this); } }; template class ServerCallbackReaderWriter { public: virtual ~ServerCallbackReaderWriter() {} virtual void Finish(Status s) = 0; virtual void SendInitialMetadata() = 0; virtual void Read(Request* msg) = 0; virtual void Write(const Response* msg, WriteOptions options) = 0; virtual void WriteAndFinish(const Response* msg, WriteOptions options, Status s) { // Default implementation that can/should be overridden Write(msg, std::move(options)); Finish(std::move(s)); } protected: void BindReactor(ServerBidiReactor* reactor) { reactor->BindStream(this); } }; // The following classes are reactors that are to be implemented // by the user, returned as the result of the method handler for // a callback method, and activated by the call to OnStarted template class ServerBidiReactor : public internal::ServerReactor { public: ~ServerBidiReactor() = default; virtual void OnStarted(ServerContext*) {} virtual void OnSendInitialMetadataDone(bool ok) {} virtual void OnReadDone(bool ok) {} virtual void OnWriteDone(bool ok) {} void StartSendInitialMetadata() { stream_->SendInitialMetadata(); } void StartRead(Request* msg) { stream_->Read(msg); } void StartWrite(const Response* msg) { StartWrite(msg, WriteOptions()); } void StartWrite(const Response* msg, WriteOptions options) { stream_->Write(msg, std::move(options)); } void StartWriteAndFinish(const Response* msg, WriteOptions options, Status s) { stream_->WriteAndFinish(msg, std::move(options), std::move(s)); } void StartWriteLast(const Response* msg, WriteOptions options) { StartWrite(msg, std::move(options.set_last_message())); } void Finish(Status s) { stream_->Finish(std::move(s)); } private: friend class ServerCallbackReaderWriter; void BindStream(ServerCallbackReaderWriter* stream) { stream_ = stream; } ServerCallbackReaderWriter* stream_; }; template class ServerReadReactor : public internal::ServerReactor { public: ~ServerReadReactor() = default; virtual void OnStarted(ServerContext*, Response* resp) {} virtual void OnSendInitialMetadataDone(bool ok) {} virtual void OnReadDone(bool ok) {} void StartSendInitialMetadata() { reader_->SendInitialMetadata(); } void StartRead(Request* msg) { reader_->Read(msg); } void Finish(Status s) { reader_->Finish(std::move(s)); } private: friend class ServerCallbackReader; void BindReader(ServerCallbackReader* reader) { reader_ = reader; } ServerCallbackReader* reader_; }; template class ServerWriteReactor : public internal::ServerReactor { public: ~ServerWriteReactor() = default; virtual void OnStarted(ServerContext*, const Request* req) {} virtual void OnSendInitialMetadataDone(bool ok) {} virtual void OnWriteDone(bool ok) {} void StartSendInitialMetadata() { writer_->SendInitialMetadata(); } void StartWrite(const Response* msg) { StartWrite(msg, WriteOptions()); } void StartWrite(const Response* msg, WriteOptions options) { writer_->Write(msg, std::move(options)); } void StartWriteAndFinish(const Response* msg, WriteOptions options, Status s) { writer_->WriteAndFinish(msg, std::move(options), std::move(s)); } void StartWriteLast(const Response* msg, WriteOptions options) { StartWrite(msg, std::move(options.set_last_message())); } void Finish(Status s) { writer_->Finish(std::move(s)); } private: friend class ServerCallbackWriter; void BindWriter(ServerCallbackWriter* writer) { writer_ = writer; } ServerCallbackWriter* writer_; }; } // namespace experimental namespace internal { template class UnimplementedReadReactor : public experimental::ServerReadReactor { public: void OnDone() override { delete this; } void OnStarted(ServerContext*, Response*) override { this->Finish(Status(StatusCode::UNIMPLEMENTED, "")); } }; template class UnimplementedWriteReactor : public experimental::ServerWriteReactor { public: void OnDone() override { delete this; } void OnStarted(ServerContext*, const Request*) override { this->Finish(Status(StatusCode::UNIMPLEMENTED, "")); } }; template class UnimplementedBidiReactor : public experimental::ServerBidiReactor { public: void OnDone() override { delete this; } void OnStarted(ServerContext*) override { this->Finish(Status(StatusCode::UNIMPLEMENTED, "")); } }; template class CallbackUnaryHandler : public MethodHandler { public: CallbackUnaryHandler( std::function func) : func_(func) {} void RunHandler(const HandlerParameter& param) final { // Arena allocate a controller structure (that includes request/response) g_core_codegen_interface->grpc_call_ref(param.call->call()); auto* controller = new (g_core_codegen_interface->grpc_call_arena_alloc( param.call->call(), sizeof(ServerCallbackRpcControllerImpl))) ServerCallbackRpcControllerImpl( param.server_context, param.call, static_cast(param.request), std::move(param.call_requester)); Status status = param.status; if (status.ok()) { // Call the actual function handler and expect the user to call finish CatchingCallback(func_, param.server_context, controller->request(), controller->response(), controller); } else { // if deserialization failed, we need to fail the call controller->Finish(status); } } void* Deserialize(grpc_call* call, grpc_byte_buffer* req, Status* status) final { ByteBuffer buf; buf.set_buffer(req); auto* request = new (g_core_codegen_interface->grpc_call_arena_alloc( call, sizeof(RequestType))) RequestType(); *status = SerializationTraits::Deserialize(&buf, request); buf.Release(); if (status->ok()) { return request; } request->~RequestType(); return nullptr; } private: std::function func_; // The implementation class of ServerCallbackRpcController is a private member // of CallbackUnaryHandler since it is never exposed anywhere, and this allows // it to take advantage of CallbackUnaryHandler's friendships. class ServerCallbackRpcControllerImpl : public experimental::ServerCallbackRpcController { public: void Finish(Status s) override { finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); }, &finish_ops_); if (!ctx_->sent_initial_metadata_) { finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { finish_ops_.set_compression_level(ctx_->compression_level()); } ctx_->sent_initial_metadata_ = true; } // The response is dropped if the status is not OK. if (s.ok()) { finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, finish_ops_.SendMessagePtr(&resp_)); } else { finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s); } finish_ops_.set_core_cq_tag(&finish_tag_); call_.PerformOps(&finish_ops_); } void SendInitialMetadata(std::function f) override { GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); callbacks_outstanding_++; // TODO(vjpai): Consider taking f as a move-capture if we adopt C++14 // and if performance of this operation matters meta_tag_.Set(call_.call(), [this, f](bool ok) { f(ok); MaybeDone(); }, &meta_ops_); meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { meta_ops_.set_compression_level(ctx_->compression_level()); } ctx_->sent_initial_metadata_ = true; meta_ops_.set_core_cq_tag(&meta_tag_); call_.PerformOps(&meta_ops_); } // Neither SetCancelCallback nor ClearCancelCallback should affect the // callbacks_outstanding_ count since they are paired and both must precede // the invocation of Finish (if they are used at all) void SetCancelCallback(std::function callback) override { ctx_->SetCancelCallback(std::move(callback)); } void ClearCancelCallback() override { ctx_->ClearCancelCallback(); } private: friend class CallbackUnaryHandler; ServerCallbackRpcControllerImpl(ServerContext* ctx, Call* call, const RequestType* req, std::function call_requester) : ctx_(ctx), call_(*call), req_(req), call_requester_(std::move(call_requester)) { ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, nullptr); } ~ServerCallbackRpcControllerImpl() { req_->~RequestType(); } const RequestType* request() { return req_; } ResponseType* response() { return &resp_; } void MaybeDone() { if (--callbacks_outstanding_ == 0) { grpc_call* call = call_.call(); auto call_requester = std::move(call_requester_); this->~ServerCallbackRpcControllerImpl(); // explicitly call destructor g_core_codegen_interface->grpc_call_unref(call); call_requester(); } } CallOpSet meta_ops_; CallbackWithSuccessTag meta_tag_; CallOpSet finish_ops_; CallbackWithSuccessTag finish_tag_; ServerContext* ctx_; Call call_; const RequestType* req_; ResponseType resp_; std::function call_requester_; std::atomic_int callbacks_outstanding_{ 2}; // reserve for Finish and CompletionOp }; }; template class CallbackClientStreamingHandler : public MethodHandler { public: CallbackClientStreamingHandler( std::function< experimental::ServerReadReactor*()> func) : func_(std::move(func)) {} void RunHandler(const HandlerParameter& param) final { // Arena allocate a reader structure (that includes response) g_core_codegen_interface->grpc_call_ref(param.call->call()); experimental::ServerReadReactor* reactor = param.status.ok() ? CatchingReactorCreator< experimental::ServerReadReactor>( func_) : nullptr; if (reactor == nullptr) { // if deserialization or reactor creator failed, we need to fail the call reactor = new UnimplementedReadReactor; } auto* reader = new (g_core_codegen_interface->grpc_call_arena_alloc( param.call->call(), sizeof(ServerCallbackReaderImpl))) ServerCallbackReaderImpl(param.server_context, param.call, std::move(param.call_requester), reactor); reader->BindReactor(reactor); reactor->OnStarted(param.server_context, reader->response()); reader->MaybeDone(); } private: std::function*()> func_; class ServerCallbackReaderImpl : public experimental::ServerCallbackReader { public: void Finish(Status s) override { finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); }, &finish_ops_); if (!ctx_->sent_initial_metadata_) { finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { finish_ops_.set_compression_level(ctx_->compression_level()); } ctx_->sent_initial_metadata_ = true; } // The response is dropped if the status is not OK. if (s.ok()) { finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, finish_ops_.SendMessagePtr(&resp_)); } else { finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s); } finish_ops_.set_core_cq_tag(&finish_tag_); call_.PerformOps(&finish_ops_); } void SendInitialMetadata() override { GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); callbacks_outstanding_++; meta_tag_.Set(call_.call(), [this](bool ok) { reactor_->OnSendInitialMetadataDone(ok); MaybeDone(); }, &meta_ops_); meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { meta_ops_.set_compression_level(ctx_->compression_level()); } ctx_->sent_initial_metadata_ = true; meta_ops_.set_core_cq_tag(&meta_tag_); call_.PerformOps(&meta_ops_); } void Read(RequestType* req) override { callbacks_outstanding_++; read_ops_.RecvMessage(req); call_.PerformOps(&read_ops_); } private: friend class CallbackClientStreamingHandler; ServerCallbackReaderImpl( ServerContext* ctx, Call* call, std::function call_requester, experimental::ServerReadReactor* reactor) : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)), reactor_(reactor) { ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, reactor); read_tag_.Set(call_.call(), [this](bool ok) { reactor_->OnReadDone(ok); MaybeDone(); }, &read_ops_); read_ops_.set_core_cq_tag(&read_tag_); } ~ServerCallbackReaderImpl() {} ResponseType* response() { return &resp_; } void MaybeDone() { if (--callbacks_outstanding_ == 0) { reactor_->OnDone(); grpc_call* call = call_.call(); auto call_requester = std::move(call_requester_); this->~ServerCallbackReaderImpl(); // explicitly call destructor g_core_codegen_interface->grpc_call_unref(call); call_requester(); } } CallOpSet meta_ops_; CallbackWithSuccessTag meta_tag_; CallOpSet finish_ops_; CallbackWithSuccessTag finish_tag_; CallOpSet> read_ops_; CallbackWithSuccessTag read_tag_; ServerContext* ctx_; Call call_; ResponseType resp_; std::function call_requester_; experimental::ServerReadReactor* reactor_; std::atomic_int callbacks_outstanding_{ 3}; // reserve for OnStarted, Finish, and CompletionOp }; }; template class CallbackServerStreamingHandler : public MethodHandler { public: CallbackServerStreamingHandler( std::function< experimental::ServerWriteReactor*()> func) : func_(std::move(func)) {} void RunHandler(const HandlerParameter& param) final { // Arena allocate a writer structure g_core_codegen_interface->grpc_call_ref(param.call->call()); experimental::ServerWriteReactor* reactor = param.status.ok() ? CatchingReactorCreator< experimental::ServerWriteReactor>( func_) : nullptr; if (reactor == nullptr) { // if deserialization or reactor creator failed, we need to fail the call reactor = new UnimplementedWriteReactor; } auto* writer = new (g_core_codegen_interface->grpc_call_arena_alloc( param.call->call(), sizeof(ServerCallbackWriterImpl))) ServerCallbackWriterImpl(param.server_context, param.call, static_cast(param.request), std::move(param.call_requester), reactor); writer->BindReactor(reactor); reactor->OnStarted(param.server_context, writer->request()); writer->MaybeDone(); } void* Deserialize(grpc_call* call, grpc_byte_buffer* req, Status* status) final { ByteBuffer buf; buf.set_buffer(req); auto* request = new (g_core_codegen_interface->grpc_call_arena_alloc( call, sizeof(RequestType))) RequestType(); *status = SerializationTraits::Deserialize(&buf, request); buf.Release(); if (status->ok()) { return request; } request->~RequestType(); return nullptr; } private: std::function*()> func_; class ServerCallbackWriterImpl : public experimental::ServerCallbackWriter { public: void Finish(Status s) override { finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); }, &finish_ops_); finish_ops_.set_core_cq_tag(&finish_tag_); if (!ctx_->sent_initial_metadata_) { finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { finish_ops_.set_compression_level(ctx_->compression_level()); } ctx_->sent_initial_metadata_ = true; } finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s); call_.PerformOps(&finish_ops_); } void SendInitialMetadata() override { GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); callbacks_outstanding_++; meta_tag_.Set(call_.call(), [this](bool ok) { reactor_->OnSendInitialMetadataDone(ok); MaybeDone(); }, &meta_ops_); meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { meta_ops_.set_compression_level(ctx_->compression_level()); } ctx_->sent_initial_metadata_ = true; meta_ops_.set_core_cq_tag(&meta_tag_); call_.PerformOps(&meta_ops_); } void Write(const ResponseType* resp, WriteOptions options) override { callbacks_outstanding_++; if (options.is_last_message()) { options.set_buffer_hint(); } if (!ctx_->sent_initial_metadata_) { write_ops_.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { write_ops_.set_compression_level(ctx_->compression_level()); } ctx_->sent_initial_metadata_ = true; } // TODO(vjpai): don't assert GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok()); call_.PerformOps(&write_ops_); } void WriteAndFinish(const ResponseType* resp, WriteOptions options, Status s) override { // This combines the write into the finish callback // Don't send any message if the status is bad if (s.ok()) { // TODO(vjpai): don't assert GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok()); } Finish(std::move(s)); } private: friend class CallbackServerStreamingHandler; ServerCallbackWriterImpl( ServerContext* ctx, Call* call, const RequestType* req, std::function call_requester, experimental::ServerWriteReactor* reactor) : ctx_(ctx), call_(*call), req_(req), call_requester_(std::move(call_requester)), reactor_(reactor) { ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, reactor); write_tag_.Set(call_.call(), [this](bool ok) { reactor_->OnWriteDone(ok); MaybeDone(); }, &write_ops_); write_ops_.set_core_cq_tag(&write_tag_); } ~ServerCallbackWriterImpl() { req_->~RequestType(); } const RequestType* request() { return req_; } void MaybeDone() { if (--callbacks_outstanding_ == 0) { reactor_->OnDone(); grpc_call* call = call_.call(); auto call_requester = std::move(call_requester_); this->~ServerCallbackWriterImpl(); // explicitly call destructor g_core_codegen_interface->grpc_call_unref(call); call_requester(); } } CallOpSet meta_ops_; CallbackWithSuccessTag meta_tag_; CallOpSet finish_ops_; CallbackWithSuccessTag finish_tag_; CallOpSet write_ops_; CallbackWithSuccessTag write_tag_; ServerContext* ctx_; Call call_; const RequestType* req_; std::function call_requester_; experimental::ServerWriteReactor* reactor_; std::atomic_int callbacks_outstanding_{ 3}; // reserve for OnStarted, Finish, and CompletionOp }; }; template class CallbackBidiHandler : public MethodHandler { public: CallbackBidiHandler( std::function< experimental::ServerBidiReactor*()> func) : func_(std::move(func)) {} void RunHandler(const HandlerParameter& param) final { g_core_codegen_interface->grpc_call_ref(param.call->call()); experimental::ServerBidiReactor* reactor = param.status.ok() ? CatchingReactorCreator< experimental::ServerBidiReactor>( func_) : nullptr; if (reactor == nullptr) { // if deserialization or reactor creator failed, we need to fail the call reactor = new UnimplementedBidiReactor; } auto* stream = new (g_core_codegen_interface->grpc_call_arena_alloc( param.call->call(), sizeof(ServerCallbackReaderWriterImpl))) ServerCallbackReaderWriterImpl(param.server_context, param.call, std::move(param.call_requester), reactor); stream->BindReactor(reactor); reactor->OnStarted(param.server_context); stream->MaybeDone(); } private: std::function*()> func_; class ServerCallbackReaderWriterImpl : public experimental::ServerCallbackReaderWriter { public: void Finish(Status s) override { finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); }, &finish_ops_); finish_ops_.set_core_cq_tag(&finish_tag_); if (!ctx_->sent_initial_metadata_) { finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { finish_ops_.set_compression_level(ctx_->compression_level()); } ctx_->sent_initial_metadata_ = true; } finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s); call_.PerformOps(&finish_ops_); } void SendInitialMetadata() override { GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); callbacks_outstanding_++; meta_tag_.Set(call_.call(), [this](bool ok) { reactor_->OnSendInitialMetadataDone(ok); MaybeDone(); }, &meta_ops_); meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { meta_ops_.set_compression_level(ctx_->compression_level()); } ctx_->sent_initial_metadata_ = true; meta_ops_.set_core_cq_tag(&meta_tag_); call_.PerformOps(&meta_ops_); } void Write(const ResponseType* resp, WriteOptions options) override { callbacks_outstanding_++; if (options.is_last_message()) { options.set_buffer_hint(); } if (!ctx_->sent_initial_metadata_) { write_ops_.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { write_ops_.set_compression_level(ctx_->compression_level()); } ctx_->sent_initial_metadata_ = true; } // TODO(vjpai): don't assert GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok()); call_.PerformOps(&write_ops_); } void WriteAndFinish(const ResponseType* resp, WriteOptions options, Status s) override { // Don't send any message if the status is bad if (s.ok()) { // TODO(vjpai): don't assert GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok()); } Finish(std::move(s)); } void Read(RequestType* req) override { callbacks_outstanding_++; read_ops_.RecvMessage(req); call_.PerformOps(&read_ops_); } private: friend class CallbackBidiHandler; ServerCallbackReaderWriterImpl( ServerContext* ctx, Call* call, std::function call_requester, experimental::ServerBidiReactor* reactor) : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)), reactor_(reactor) { ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, reactor); write_tag_.Set(call_.call(), [this](bool ok) { reactor_->OnWriteDone(ok); MaybeDone(); }, &write_ops_); write_ops_.set_core_cq_tag(&write_tag_); read_tag_.Set(call_.call(), [this](bool ok) { reactor_->OnReadDone(ok); MaybeDone(); }, &read_ops_); read_ops_.set_core_cq_tag(&read_tag_); } ~ServerCallbackReaderWriterImpl() {} void MaybeDone() { if (--callbacks_outstanding_ == 0) { reactor_->OnDone(); grpc_call* call = call_.call(); auto call_requester = std::move(call_requester_); this->~ServerCallbackReaderWriterImpl(); // explicitly call destructor g_core_codegen_interface->grpc_call_unref(call); call_requester(); } } CallOpSet meta_ops_; CallbackWithSuccessTag meta_tag_; CallOpSet finish_ops_; CallbackWithSuccessTag finish_tag_; CallOpSet write_ops_; CallbackWithSuccessTag write_tag_; CallOpSet> read_ops_; CallbackWithSuccessTag read_tag_; ServerContext* ctx_; Call call_; std::function call_requester_; experimental::ServerBidiReactor* reactor_; std::atomic_int callbacks_outstanding_{ 3}; // reserve for OnStarted, Finish, and CompletionOp }; }; } // namespace internal } // namespace grpc #endif // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H