diff --git a/include/grpcpp/impl/codegen/client_callback_impl.h b/include/grpcpp/impl/codegen/client_callback_impl.h index 3ac46b788ea..bed52fe6a80 100644 --- a/include/grpcpp/impl/codegen/client_callback_impl.h +++ b/include/grpcpp/impl/codegen/client_callback_impl.h @@ -468,28 +468,6 @@ class ClientCallbackReaderWriterImpl // there are no tests catching the compiler warning. static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); } - // MaybeFinish can be called from reactions or from user-initiated operations - // like StartCall or RemoveHold. If this is the last operation or hold on this - // object, it will invoke the OnDone reaction. If MaybeFinish was called from - // a reaction, it can call OnDone directly. If not, it would need to schedule - // OnDone onto an executor thread to avoid the possibility of deadlocking with - // any locks in the user code that invoked it. - void MaybeFinish(bool from_reaction) { - if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub( - 1, std::memory_order_acq_rel) == 1)) { - ::grpc::Status s = std::move(finish_status_); - auto* reactor = reactor_; - auto* call = call_.call(); - this->~ClientCallbackReaderWriterImpl(); - ::grpc::g_core_codegen_interface->grpc_call_unref(call); - if (GPR_LIKELY(from_reaction)) { - reactor->OnDone(s); - } else { - reactor->InternalScheduleOnDone(std::move(s)); - } - } - } - void StartCall() override { // This call initiates two batches, plus any backlog, each with a callback // 1. Send initial metadata (unless corked) + recv initial metadata @@ -641,6 +619,28 @@ class ClientCallbackReaderWriterImpl finish_ops_.set_core_cq_tag(&finish_tag_); } + // MaybeFinish can be called from reactions or from user-initiated operations + // like StartCall or RemoveHold. If this is the last operation or hold on this + // object, it will invoke the OnDone reaction. If MaybeFinish was called from + // a reaction, it can call OnDone directly. If not, it would need to schedule + // OnDone onto an executor thread to avoid the possibility of deadlocking with + // any locks in the user code that invoked it. + void MaybeFinish(bool from_reaction) { + if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub( + 1, std::memory_order_acq_rel) == 1)) { + ::grpc::Status s = std::move(finish_status_); + auto* reactor = reactor_; + auto* call = call_.call(); + this->~ClientCallbackReaderWriterImpl(); + ::grpc::g_core_codegen_interface->grpc_call_unref(call); + if (GPR_LIKELY(from_reaction)) { + reactor->OnDone(s); + } else { + reactor->InternalScheduleOnDone(std::move(s)); + } + } + } + ::grpc_impl::ClientContext* const context_; grpc::internal::Call call_; ClientBidiReactor* const reactor_; @@ -718,23 +718,6 @@ class ClientCallbackReaderImpl : public ClientCallbackReader { // there are no tests catching the compiler warning. static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); } - // MaybeFinish behaves as in ClientCallbackReaderWriterImpl. - void MaybeFinish(bool from_reaction) { - if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub( - 1, std::memory_order_acq_rel) == 1)) { - ::grpc::Status s = std::move(finish_status_); - auto* reactor = reactor_; - auto* call = call_.call(); - this->~ClientCallbackReaderImpl(); - ::grpc::g_core_codegen_interface->grpc_call_unref(call); - if (GPR_LIKELY(from_reaction)) { - reactor->OnDone(s); - } else { - reactor->InternalScheduleOnDone(std::move(s)); - } - } - } - void StartCall() override { // This call initiates two batches, plus any backlog, each with a callback // 1. Send initial metadata (unless corked) + recv initial metadata @@ -812,6 +795,23 @@ class ClientCallbackReaderImpl : public ClientCallbackReader { start_ops_.ClientSendClose(); } + // MaybeFinish behaves as in ClientCallbackReaderWriterImpl. + void MaybeFinish(bool from_reaction) { + if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub( + 1, std::memory_order_acq_rel) == 1)) { + ::grpc::Status s = std::move(finish_status_); + auto* reactor = reactor_; + auto* call = call_.call(); + this->~ClientCallbackReaderImpl(); + ::grpc::g_core_codegen_interface->grpc_call_unref(call); + if (GPR_LIKELY(from_reaction)) { + reactor->OnDone(s); + } else { + reactor->InternalScheduleOnDone(std::move(s)); + } + } + } + ::grpc_impl::ClientContext* const context_; grpc::internal::Call call_; ClientReadReactor* const reactor_; @@ -876,23 +876,6 @@ class ClientCallbackWriterImpl : public ClientCallbackWriter { // there are no tests catching the compiler warning. static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); } - // MaybeFinish behaves as in ClientCallbackReaderWriterImpl. - void MaybeFinish(bool from_reaction) { - if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub( - 1, std::memory_order_acq_rel) == 1)) { - ::grpc::Status s = std::move(finish_status_); - auto* reactor = reactor_; - auto* call = call_.call(); - this->~ClientCallbackWriterImpl(); - ::grpc::g_core_codegen_interface->grpc_call_unref(call); - if (GPR_LIKELY(from_reaction)) { - reactor->OnDone(s); - } else { - reactor->InternalScheduleOnDone(std::move(s)); - } - } - } - void StartCall() override { // This call initiates two batches, plus any backlog, each with a callback // 1. Send initial metadata (unless corked) + recv initial metadata @@ -1027,6 +1010,23 @@ class ClientCallbackWriterImpl : public ClientCallbackWriter { finish_ops_.set_core_cq_tag(&finish_tag_); } + // MaybeFinish behaves as in ClientCallbackReaderWriterImpl. + void MaybeFinish(bool from_reaction) { + if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub( + 1, std::memory_order_acq_rel) == 1)) { + ::grpc::Status s = std::move(finish_status_); + auto* reactor = reactor_; + auto* call = call_.call(); + this->~ClientCallbackWriterImpl(); + ::grpc::g_core_codegen_interface->grpc_call_unref(call); + if (GPR_LIKELY(from_reaction)) { + reactor->OnDone(s); + } else { + reactor->InternalScheduleOnDone(std::move(s)); + } + } + } + ::grpc_impl::ClientContext* const context_; grpc::internal::Call call_; ClientWriteReactor* const reactor_; @@ -1125,21 +1125,6 @@ class ClientCallbackUnaryImpl final : public ClientCallbackUnary { call_.PerformOps(&finish_ops_); } - // In the unary case, MaybeFinish is only ever invoked from a - // library-initiated reaction, so it will just directly call OnDone if this is - // the last reaction for this RPC. - void MaybeFinish() { - if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub( - 1, std::memory_order_acq_rel) == 1)) { - ::grpc::Status s = std::move(finish_status_); - auto* reactor = reactor_; - auto* call = call_.call(); - this->~ClientCallbackUnaryImpl(); - ::grpc::g_core_codegen_interface->grpc_call_unref(call); - reactor->OnDone(s); - } - } - private: friend class ClientCallbackUnaryFactory; @@ -1156,6 +1141,21 @@ class ClientCallbackUnaryImpl final : public ClientCallbackUnary { finish_ops_.AllowNoMessage(); } + // In the unary case, MaybeFinish is only ever invoked from a + // library-initiated reaction, so it will just directly call OnDone if this is + // the last reaction for this RPC. + void MaybeFinish() { + if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub( + 1, std::memory_order_acq_rel) == 1)) { + ::grpc::Status s = std::move(finish_status_); + auto* reactor = reactor_; + auto* call = call_.call(); + this->~ClientCallbackUnaryImpl(); + ::grpc::g_core_codegen_interface->grpc_call_unref(call); + reactor->OnDone(s); + } + } + ::grpc_impl::ClientContext* const context_; grpc::internal::Call call_; ClientUnaryReactor* const reactor_;