From 7eebba37817d16ba7baeb9c2fb94984c25dec695 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Tue, 16 Jun 2020 12:30:01 -0700 Subject: [PATCH] Fixes needed to make roll-forward of StartCall and corking work --- BUILD | 1 + BUILD.gn | 1 + CMakeLists.txt | 2 + Makefile | 2 + build_autogenerated.yaml | 2 + gRPC-C++.podspec | 1 + grpc.gyp | 2 + .../impl/codegen/client_callback_impl.h | 128 +++++++++++++----- src/cpp/client/client_callback.cc | 52 +++++++ .../end2end/client_callback_end2end_test.cc | 8 +- tools/doxygen/Doxyfile.c++.internal | 1 + 11 files changed, 162 insertions(+), 38 deletions(-) create mode 100644 src/cpp/client/client_callback.cc diff --git a/BUILD b/BUILD index 0d2a7ca058c..99750d060cc 100644 --- a/BUILD +++ b/BUILD @@ -124,6 +124,7 @@ GRPC_SECURE_PUBLIC_HDRS = [ # TODO(ctiller): layer grpc atop grpc_unsecure, layer grpc++ atop grpc++_unsecure GRPCXX_SRCS = [ "src/cpp/client/channel_cc.cc", + "src/cpp/client/client_callback.cc", "src/cpp/client/client_context.cc", "src/cpp/client/client_interceptor.cc", "src/cpp/client/create_channel.cc", diff --git a/BUILD.gn b/BUILD.gn index a16d298f1be..378250dd2c0 100644 --- a/BUILD.gn +++ b/BUILD.gn @@ -1220,6 +1220,7 @@ config("grpc_config") { "include/grpcpp/support/time.h", "include/grpcpp/support/validate_service_config.h", "src/cpp/client/channel_cc.cc", + "src/cpp/client/client_callback.cc", "src/cpp/client/client_context.cc", "src/cpp/client/client_interceptor.cc", "src/cpp/client/create_channel.cc", diff --git a/CMakeLists.txt b/CMakeLists.txt index 158b3c03f34..4e0c695a5bc 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2482,6 +2482,7 @@ endif() add_library(grpc++ src/cpp/client/channel_cc.cc + src/cpp/client/client_callback.cc src/cpp/client/client_context.cc src/cpp/client/client_interceptor.cc src/cpp/client/create_channel.cc @@ -3183,6 +3184,7 @@ endif() add_library(grpc++_unsecure src/cpp/client/channel_cc.cc + src/cpp/client/client_callback.cc src/cpp/client/client_context.cc src/cpp/client/client_interceptor.cc src/cpp/client/create_channel.cc diff --git a/Makefile b/Makefile index 80d3be9e53a..83c7600a698 100644 --- a/Makefile +++ b/Makefile @@ -4717,6 +4717,7 @@ $(OBJDIR)/$(CONFIG)/test/cpp/microbenchmarks/helpers.o: $(GENDIR)/src/proto/grpc LIBGRPC++_SRC = \ src/cpp/client/channel_cc.cc \ + src/cpp/client/client_callback.cc \ src/cpp/client/client_context.cc \ src/cpp/client/client_interceptor.cc \ src/cpp/client/create_channel.cc \ @@ -5423,6 +5424,7 @@ endif LIBGRPC++_UNSECURE_SRC = \ src/cpp/client/channel_cc.cc \ + src/cpp/client/client_callback.cc \ src/cpp/client/client_context.cc \ src/cpp/client/client_interceptor.cc \ src/cpp/client/create_channel.cc \ diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 9eb53b8e576..5f09c7b3e73 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -2203,6 +2203,7 @@ libs: - src/cpp/thread_manager/thread_manager.h src: - src/cpp/client/channel_cc.cc + - src/cpp/client/client_callback.cc - src/cpp/client/client_context.cc - src/cpp/client/client_interceptor.cc - src/cpp/client/create_channel.cc @@ -2592,6 +2593,7 @@ libs: - src/cpp/thread_manager/thread_manager.h src: - src/cpp/client/channel_cc.cc + - src/cpp/client/client_callback.cc - src/cpp/client/client_context.cc - src/cpp/client/client_interceptor.cc - src/cpp/client/create_channel.cc diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index d8f404abc1f..8851119c07f 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -616,6 +616,7 @@ Pod::Spec.new do |s| 'src/core/tsi/transport_security_grpc.h', 'src/core/tsi/transport_security_interface.h', 'src/cpp/client/channel_cc.cc', + 'src/cpp/client/client_callback.cc', 'src/cpp/client/client_context.cc', 'src/cpp/client/client_interceptor.cc', 'src/cpp/client/create_channel.cc', diff --git a/grpc.gyp b/grpc.gyp index b80efd83534..4022227f5e3 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -1313,6 +1313,7 @@ ], 'sources': [ 'src/cpp/client/channel_cc.cc', + 'src/cpp/client/client_callback.cc', 'src/cpp/client/client_context.cc', 'src/cpp/client/client_interceptor.cc', 'src/cpp/client/create_channel.cc', @@ -1464,6 +1465,7 @@ ], 'sources': [ 'src/cpp/client/channel_cc.cc', + 'src/cpp/client/client_callback.cc', 'src/cpp/client/client_context.cc', 'src/cpp/client/client_interceptor.cc', 'src/cpp/client/create_channel.cc', diff --git a/include/grpcpp/impl/codegen/client_callback_impl.h b/include/grpcpp/impl/codegen/client_callback_impl.h index 86a46d8c870..3ac46b788ea 100644 --- a/include/grpcpp/impl/codegen/client_callback_impl.h +++ b/include/grpcpp/impl/codegen/client_callback_impl.h @@ -101,6 +101,29 @@ class CallbackUnaryCallImpl { call.PerformOps(ops); } }; + +// Base class for public API classes. +class ClientReactor { + public: + /// Called by the library when all operations associated with this RPC have + /// completed and all Holds have been removed. OnDone provides the RPC status + /// outcome for both successful and failed RPCs. If it is never called on an + /// RPC, it indicates an application-level problem (like failure to remove a + /// hold). + /// + /// \param[in] s The status outcome of this RPC + virtual void OnDone(const ::grpc::Status& /*s*/) = 0; + + /// InternalScheduleOnDone is not part of the API and is not meant to be + /// overridden. It is virtual to allow successful builds for certain bazel + /// build users that only want to depend on gRPC codegen headers and not the + /// full library (although this is not a generally-supported option). Although + /// the virtual call is slower than a direct call, this function is + /// heavyweight and the cost of the virtual call is not much in comparison. + /// This function may be removed or devirtualized in the future. + virtual void InternalScheduleOnDone(::grpc::Status s); +}; + } // namespace internal // Forward declarations @@ -189,7 +212,7 @@ class ClientCallbackUnary { /// \a ClientBidiReactor is the interface for a bidirectional streaming RPC. template -class ClientBidiReactor { +class ClientBidiReactor : public internal::ClientReactor { public: virtual ~ClientBidiReactor() {} @@ -282,7 +305,7 @@ class ClientBidiReactor { /// (like failure to remove a hold). /// /// \param[in] s The status outcome of this RPC - virtual void OnDone(const ::grpc::Status& /*s*/) {} + void OnDone(const ::grpc::Status& /*s*/) override {} /// Notifies the application that a read of initial metadata from the /// server is done. If the application chooses not to implement this method, @@ -327,7 +350,7 @@ class ClientBidiReactor { /// \a ClientReadReactor is the interface for a server-streaming RPC. /// All public methods behave as in ClientBidiReactor. template -class ClientReadReactor { +class ClientReadReactor : public internal::ClientReactor { public: virtual ~ClientReadReactor() {} @@ -341,7 +364,7 @@ class ClientReadReactor { } void RemoveHold() { reader_->RemoveHold(); } - virtual void OnDone(const ::grpc::Status& /*s*/) {} + void OnDone(const ::grpc::Status& /*s*/) override {} virtual void OnReadInitialMetadataDone(bool /*ok*/) {} virtual void OnReadDone(bool /*ok*/) {} @@ -354,7 +377,7 @@ class ClientReadReactor { /// \a ClientWriteReactor is the interface for a client-streaming RPC. /// All public methods behave as in ClientBidiReactor. template -class ClientWriteReactor { +class ClientWriteReactor : public internal::ClientReactor { public: virtual ~ClientWriteReactor() {} @@ -377,7 +400,7 @@ class ClientWriteReactor { } void RemoveHold() { writer_->RemoveHold(); } - virtual void OnDone(const ::grpc::Status& /*s*/) {} + void OnDone(const ::grpc::Status& /*s*/) override {} virtual void OnReadInitialMetadataDone(bool /*ok*/) {} virtual void OnWriteDone(bool /*ok*/) {} virtual void OnWritesDoneDone(bool /*ok*/) {} @@ -385,6 +408,7 @@ class ClientWriteReactor { private: friend class ClientCallbackWriter; void BindWriter(ClientCallbackWriter* writer) { writer_ = writer; } + ClientCallbackWriter* writer_; }; @@ -399,12 +423,12 @@ class ClientWriteReactor { /// call (that is part of the unary call itself) and there is no reactor object /// being created as a result of this call, we keep a consistent 2-phase /// initiation API among all the reactor flavors. -class ClientUnaryReactor { +class ClientUnaryReactor : public internal::ClientReactor { public: virtual ~ClientUnaryReactor() {} void StartCall() { call_->StartCall(); } - virtual void OnDone(const ::grpc::Status& /*s*/) {} + void OnDone(const ::grpc::Status& /*s*/) override {} virtual void OnReadInitialMetadataDone(bool /*ok*/) {} private: @@ -444,7 +468,13 @@ class ClientCallbackReaderWriterImpl // there are no tests catching the compiler warning. static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); } - void MaybeFinish() { + // MaybeFinish can be called from reactions or from user-initiated operations + // like StartCall or RemoveHold. If this is the last operation or hold on this + // object, it will invoke the OnDone reaction. If MaybeFinish was called from + // a reaction, it can call OnDone directly. If not, it would need to schedule + // OnDone onto an executor thread to avoid the possibility of deadlocking with + // any locks in the user code that invoked it. + void MaybeFinish(bool from_reaction) { if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub( 1, std::memory_order_acq_rel) == 1)) { ::grpc::Status s = std::move(finish_status_); @@ -452,7 +482,11 @@ class ClientCallbackReaderWriterImpl auto* call = call_.call(); this->~ClientCallbackReaderWriterImpl(); ::grpc::g_core_codegen_interface->grpc_call_unref(call); - reactor->OnDone(s); + if (GPR_LIKELY(from_reaction)) { + reactor->OnDone(s); + } else { + reactor->InternalScheduleOnDone(std::move(s)); + } } } @@ -489,7 +523,7 @@ class ClientCallbackReaderWriterImpl // MaybeFinish outside the lock to make sure that destruction of this object // doesn't take place while holding the lock (which would cause the lock to // be released after destruction) - this->MaybeFinish(); + this->MaybeFinish(/*from_reaction=*/false); } void Read(Response* msg) override { @@ -533,7 +567,7 @@ class ClientCallbackReaderWriterImpl writes_done_tag_.Set(call_.call(), [this](bool ok) { reactor_->OnWritesDoneDone(ok); - MaybeFinish(); + MaybeFinish(/*from_reaction=*/true); }, &writes_done_ops_, /*can_inline=*/false); writes_done_ops_.set_core_cq_tag(&writes_done_tag_); @@ -556,7 +590,7 @@ class ClientCallbackReaderWriterImpl void AddHold(int holds) override { callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed); } - void RemoveHold() override { MaybeFinish(); } + void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); } private: friend class ClientCallbackReaderWriterFactory; @@ -575,7 +609,7 @@ class ClientCallbackReaderWriterImpl start_tag_.Set(call_.call(), [this](bool ok) { reactor_->OnReadInitialMetadataDone(ok); - MaybeFinish(); + MaybeFinish(/*from_reaction=*/true); }, &start_ops_, /*can_inline=*/false); start_ops_.RecvInitialMetadata(context_); @@ -584,7 +618,7 @@ class ClientCallbackReaderWriterImpl write_tag_.Set(call_.call(), [this](bool ok) { reactor_->OnWriteDone(ok); - MaybeFinish(); + MaybeFinish(/*from_reaction=*/true); }, &write_ops_, /*can_inline=*/false); write_ops_.set_core_cq_tag(&write_tag_); @@ -592,15 +626,17 @@ class ClientCallbackReaderWriterImpl read_tag_.Set(call_.call(), [this](bool ok) { reactor_->OnReadDone(ok); - MaybeFinish(); + MaybeFinish(/*from_reaction=*/true); }, &read_ops_, /*can_inline=*/false); read_ops_.set_core_cq_tag(&read_tag_); // Also set up the Finish tag and op set. - finish_tag_.Set(call_.call(), [this](bool /*ok*/) { MaybeFinish(); }, - &finish_ops_, - /*can_inline=*/false); + finish_tag_.Set( + call_.call(), + [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); }, + &finish_ops_, + /*can_inline=*/false); finish_ops_.ClientRecvStatus(context_, &finish_status_); finish_ops_.set_core_cq_tag(&finish_tag_); } @@ -682,7 +718,8 @@ class ClientCallbackReaderImpl : public ClientCallbackReader { // there are no tests catching the compiler warning. static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); } - void MaybeFinish() { + // MaybeFinish behaves as in ClientCallbackReaderWriterImpl. + void MaybeFinish(bool from_reaction) { if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub( 1, std::memory_order_acq_rel) == 1)) { ::grpc::Status s = std::move(finish_status_); @@ -690,7 +727,11 @@ class ClientCallbackReaderImpl : public ClientCallbackReader { auto* call = call_.call(); this->~ClientCallbackReaderImpl(); ::grpc::g_core_codegen_interface->grpc_call_unref(call); - reactor->OnDone(s); + if (GPR_LIKELY(from_reaction)) { + reactor->OnDone(s); + } else { + reactor->InternalScheduleOnDone(std::move(s)); + } } } @@ -703,7 +744,7 @@ class ClientCallbackReaderImpl : public ClientCallbackReader { start_tag_.Set(call_.call(), [this](bool ok) { reactor_->OnReadInitialMetadataDone(ok); - MaybeFinish(); + MaybeFinish(/*from_reaction=*/true); }, &start_ops_, /*can_inline=*/false); start_ops_.SendInitialMetadata(&context_->send_initial_metadata_, @@ -716,7 +757,7 @@ class ClientCallbackReaderImpl : public ClientCallbackReader { read_tag_.Set(call_.call(), [this](bool ok) { reactor_->OnReadDone(ok); - MaybeFinish(); + MaybeFinish(/*from_reaction=*/true); }, &read_ops_, /*can_inline=*/false); read_ops_.set_core_cq_tag(&read_tag_); @@ -729,8 +770,10 @@ class ClientCallbackReaderImpl : public ClientCallbackReader { started_.store(true, std::memory_order_release); } - finish_tag_.Set(call_.call(), [this](bool /*ok*/) { MaybeFinish(); }, - &finish_ops_, /*can_inline=*/false); + finish_tag_.Set( + call_.call(), + [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); }, + &finish_ops_, /*can_inline=*/false); finish_ops_.ClientRecvStatus(context_, &finish_status_); finish_ops_.set_core_cq_tag(&finish_tag_); call_.PerformOps(&finish_ops_); @@ -752,7 +795,7 @@ class ClientCallbackReaderImpl : public ClientCallbackReader { void AddHold(int holds) override { callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed); } - void RemoveHold() override { MaybeFinish(); } + void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); } private: friend class ClientCallbackReaderFactory; @@ -833,7 +876,8 @@ class ClientCallbackWriterImpl : public ClientCallbackWriter { // there are no tests catching the compiler warning. static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); } - void MaybeFinish() { + // MaybeFinish behaves as in ClientCallbackReaderWriterImpl. + void MaybeFinish(bool from_reaction) { if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub( 1, std::memory_order_acq_rel) == 1)) { ::grpc::Status s = std::move(finish_status_); @@ -841,7 +885,11 @@ class ClientCallbackWriterImpl : public ClientCallbackWriter { auto* call = call_.call(); this->~ClientCallbackWriterImpl(); ::grpc::g_core_codegen_interface->grpc_call_unref(call); - reactor->OnDone(s); + if (GPR_LIKELY(from_reaction)) { + reactor->OnDone(s); + } else { + reactor->InternalScheduleOnDone(std::move(s)); + } } } @@ -874,7 +922,7 @@ class ClientCallbackWriterImpl : public ClientCallbackWriter { // MaybeFinish outside the lock to make sure that destruction of this object // doesn't take place while holding the lock (which would cause the lock to // be released after destruction) - this->MaybeFinish(); + this->MaybeFinish(/*from_reaction=*/false); } void Write(const Request* msg, ::grpc::WriteOptions options) override { @@ -907,7 +955,7 @@ class ClientCallbackWriterImpl : public ClientCallbackWriter { writes_done_tag_.Set(call_.call(), [this](bool ok) { reactor_->OnWritesDoneDone(ok); - MaybeFinish(); + MaybeFinish(/*from_reaction=*/true); }, &writes_done_ops_, /*can_inline=*/false); writes_done_ops_.set_core_cq_tag(&writes_done_tag_); @@ -932,7 +980,7 @@ class ClientCallbackWriterImpl : public ClientCallbackWriter { void AddHold(int holds) override { callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed); } - void RemoveHold() override { MaybeFinish(); } + void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); } private: friend class ClientCallbackWriterFactory; @@ -953,7 +1001,7 @@ class ClientCallbackWriterImpl : public ClientCallbackWriter { start_tag_.Set(call_.call(), [this](bool ok) { reactor_->OnReadInitialMetadataDone(ok); - MaybeFinish(); + MaybeFinish(/*from_reaction=*/true); }, &start_ops_, /*can_inline=*/false); start_ops_.RecvInitialMetadata(context_); @@ -962,7 +1010,7 @@ class ClientCallbackWriterImpl : public ClientCallbackWriter { write_tag_.Set(call_.call(), [this](bool ok) { reactor_->OnWriteDone(ok); - MaybeFinish(); + MaybeFinish(/*from_reaction=*/true); }, &write_ops_, /*can_inline=*/false); write_ops_.set_core_cq_tag(&write_tag_); @@ -970,9 +1018,11 @@ class ClientCallbackWriterImpl : public ClientCallbackWriter { // Also set up the Finish tag and op set. finish_ops_.RecvMessage(response); finish_ops_.AllowNoMessage(); - finish_tag_.Set(call_.call(), [this](bool /*ok*/) { MaybeFinish(); }, - &finish_ops_, - /*can_inline=*/false); + finish_tag_.Set( + call_.call(), + [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); }, + &finish_ops_, + /*can_inline=*/false); finish_ops_.ClientRecvStatus(context_, &finish_status_); finish_ops_.set_core_cq_tag(&finish_tag_); } @@ -1068,12 +1118,16 @@ class ClientCallbackUnaryImpl final : public ClientCallbackUnary { call_.PerformOps(&start_ops_); finish_tag_.Set(call_.call(), [this](bool /*ok*/) { MaybeFinish(); }, - &finish_ops_, /*can_inline=*/false); + &finish_ops_, + /*can_inline=*/false); finish_ops_.ClientRecvStatus(context_, &finish_status_); finish_ops_.set_core_cq_tag(&finish_tag_); call_.PerformOps(&finish_ops_); } + // In the unary case, MaybeFinish is only ever invoked from a + // library-initiated reaction, so it will just directly call OnDone if this is + // the last reaction for this RPC. void MaybeFinish() { if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub( 1, std::memory_order_acq_rel) == 1)) { diff --git a/src/cpp/client/client_callback.cc b/src/cpp/client/client_callback.cc new file mode 100644 index 00000000000..ca325ba1dbf --- /dev/null +++ b/src/cpp/client/client_callback.cc @@ -0,0 +1,52 @@ +/* + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include + +#include "src/core/lib/iomgr/closure.h" +#include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/iomgr/executor.h" + +namespace grpc_impl { +namespace internal { + +void ClientReactor::InternalScheduleOnDone(grpc::Status s) { + // Unlike other uses of closure, do not Ref or Unref here since the reactor + // object's lifetime is controlled by user code. + grpc_core::ExecCtx exec_ctx; + struct ClosureWithArg { + grpc_closure closure; + ClientReactor* const reactor; + const grpc::Status status; + ClosureWithArg(ClientReactor* reactor_arg, grpc::Status s) + : reactor(reactor_arg), status(std::move(s)) { + GRPC_CLOSURE_INIT(&closure, + [](void* void_arg, grpc_error*) { + ClosureWithArg* arg = + static_cast(void_arg); + arg->reactor->OnDone(arg->status); + delete arg; + }, + this, grpc_schedule_on_exec_ctx); + } + }; + ClosureWithArg* arg = new ClosureWithArg(this, std::move(s)); + grpc_core::Executor::Run(&arg->closure, GRPC_ERROR_NONE); +} + +} // namespace internal +} // namespace grpc_impl diff --git a/test/cpp/end2end/client_callback_end2end_test.cc b/test/cpp/end2end/client_callback_end2end_test.cc index cd3c14881bd..f9c2dbaca30 100644 --- a/test/cpp/end2end/client_callback_end2end_test.cc +++ b/test/cpp/end2end/client_callback_end2end_test.cc @@ -1478,6 +1478,12 @@ TEST_P(ClientCallbackEnd2endTest, done_cv_.wait(l); } } + // RemoveHold under the same lock used for OnDone to make sure that we don't + // call OnDone directly or indirectly from the RemoveHold function. + void RemoveHoldUnderLock() { + std::unique_lock l(mu_); + RemoveHold(); + } const Status& status() { std::unique_lock l(mu_); return status_; @@ -1522,7 +1528,7 @@ TEST_P(ClientCallbackEnd2endTest, ++reads_complete; } } - client.RemoveHold(); + client.RemoveHoldUnderLock(); client.Await(); EXPECT_EQ(kServerDefaultResponseStreamsToSend, reads_complete); diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 1867476c484..e69d345dfbb 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -1920,6 +1920,7 @@ src/core/tsi/transport_security_grpc.h \ src/core/tsi/transport_security_interface.h \ src/cpp/README.md \ src/cpp/client/channel_cc.cc \ +src/cpp/client/client_callback.cc \ src/cpp/client/client_context.cc \ src/cpp/client/client_interceptor.cc \ src/cpp/client/create_channel.cc \