|
|
|
@ -101,6 +101,29 @@ class CallbackUnaryCallImpl { |
|
|
|
|
call.PerformOps(ops); |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
// Base class for public API classes.
|
|
|
|
|
class ClientReactor { |
|
|
|
|
public: |
|
|
|
|
/// Called by the library when all operations associated with this RPC have
|
|
|
|
|
/// completed and all Holds have been removed. OnDone provides the RPC status
|
|
|
|
|
/// outcome for both successful and failed RPCs. If it is never called on an
|
|
|
|
|
/// RPC, it indicates an application-level problem (like failure to remove a
|
|
|
|
|
/// hold).
|
|
|
|
|
///
|
|
|
|
|
/// \param[in] s The status outcome of this RPC
|
|
|
|
|
virtual void OnDone(const ::grpc::Status& /*s*/) = 0; |
|
|
|
|
|
|
|
|
|
/// InternalScheduleOnDone is not part of the API and is not meant to be
|
|
|
|
|
/// overridden. It is virtual to allow successful builds for certain bazel
|
|
|
|
|
/// build users that only want to depend on gRPC codegen headers and not the
|
|
|
|
|
/// full library (although this is not a generally-supported option). Although
|
|
|
|
|
/// the virtual call is slower than a direct call, this function is
|
|
|
|
|
/// heavyweight and the cost of the virtual call is not much in comparison.
|
|
|
|
|
/// This function may be removed or devirtualized in the future.
|
|
|
|
|
virtual void InternalScheduleOnDone(::grpc::Status s); |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
} // namespace internal
|
|
|
|
|
|
|
|
|
|
// Forward declarations
|
|
|
|
@ -189,7 +212,7 @@ class ClientCallbackUnary { |
|
|
|
|
|
|
|
|
|
/// \a ClientBidiReactor is the interface for a bidirectional streaming RPC.
|
|
|
|
|
template <class Request, class Response> |
|
|
|
|
class ClientBidiReactor { |
|
|
|
|
class ClientBidiReactor : public internal::ClientReactor { |
|
|
|
|
public: |
|
|
|
|
virtual ~ClientBidiReactor() {} |
|
|
|
|
|
|
|
|
@ -282,7 +305,7 @@ class ClientBidiReactor { |
|
|
|
|
/// (like failure to remove a hold).
|
|
|
|
|
///
|
|
|
|
|
/// \param[in] s The status outcome of this RPC
|
|
|
|
|
virtual void OnDone(const ::grpc::Status& /*s*/) {} |
|
|
|
|
void OnDone(const ::grpc::Status& /*s*/) override {} |
|
|
|
|
|
|
|
|
|
/// Notifies the application that a read of initial metadata from the
|
|
|
|
|
/// server is done. If the application chooses not to implement this method,
|
|
|
|
@ -327,7 +350,7 @@ class ClientBidiReactor { |
|
|
|
|
/// \a ClientReadReactor is the interface for a server-streaming RPC.
|
|
|
|
|
/// All public methods behave as in ClientBidiReactor.
|
|
|
|
|
template <class Response> |
|
|
|
|
class ClientReadReactor { |
|
|
|
|
class ClientReadReactor : public internal::ClientReactor { |
|
|
|
|
public: |
|
|
|
|
virtual ~ClientReadReactor() {} |
|
|
|
|
|
|
|
|
@ -341,7 +364,7 @@ class ClientReadReactor { |
|
|
|
|
} |
|
|
|
|
void RemoveHold() { reader_->RemoveHold(); } |
|
|
|
|
|
|
|
|
|
virtual void OnDone(const ::grpc::Status& /*s*/) {} |
|
|
|
|
void OnDone(const ::grpc::Status& /*s*/) override {} |
|
|
|
|
virtual void OnReadInitialMetadataDone(bool /*ok*/) {} |
|
|
|
|
virtual void OnReadDone(bool /*ok*/) {} |
|
|
|
|
|
|
|
|
@ -354,7 +377,7 @@ class ClientReadReactor { |
|
|
|
|
/// \a ClientWriteReactor is the interface for a client-streaming RPC.
|
|
|
|
|
/// All public methods behave as in ClientBidiReactor.
|
|
|
|
|
template <class Request> |
|
|
|
|
class ClientWriteReactor { |
|
|
|
|
class ClientWriteReactor : public internal::ClientReactor { |
|
|
|
|
public: |
|
|
|
|
virtual ~ClientWriteReactor() {} |
|
|
|
|
|
|
|
|
@ -377,7 +400,7 @@ class ClientWriteReactor { |
|
|
|
|
} |
|
|
|
|
void RemoveHold() { writer_->RemoveHold(); } |
|
|
|
|
|
|
|
|
|
virtual void OnDone(const ::grpc::Status& /*s*/) {} |
|
|
|
|
void OnDone(const ::grpc::Status& /*s*/) override {} |
|
|
|
|
virtual void OnReadInitialMetadataDone(bool /*ok*/) {} |
|
|
|
|
virtual void OnWriteDone(bool /*ok*/) {} |
|
|
|
|
virtual void OnWritesDoneDone(bool /*ok*/) {} |
|
|
|
@ -385,6 +408,7 @@ class ClientWriteReactor { |
|
|
|
|
private: |
|
|
|
|
friend class ClientCallbackWriter<Request>; |
|
|
|
|
void BindWriter(ClientCallbackWriter<Request>* writer) { writer_ = writer; } |
|
|
|
|
|
|
|
|
|
ClientCallbackWriter<Request>* writer_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
@ -399,12 +423,12 @@ class ClientWriteReactor { |
|
|
|
|
/// call (that is part of the unary call itself) and there is no reactor object
|
|
|
|
|
/// being created as a result of this call, we keep a consistent 2-phase
|
|
|
|
|
/// initiation API among all the reactor flavors.
|
|
|
|
|
class ClientUnaryReactor { |
|
|
|
|
class ClientUnaryReactor : public internal::ClientReactor { |
|
|
|
|
public: |
|
|
|
|
virtual ~ClientUnaryReactor() {} |
|
|
|
|
|
|
|
|
|
void StartCall() { call_->StartCall(); } |
|
|
|
|
virtual void OnDone(const ::grpc::Status& /*s*/) {} |
|
|
|
|
void OnDone(const ::grpc::Status& /*s*/) override {} |
|
|
|
|
virtual void OnReadInitialMetadataDone(bool /*ok*/) {} |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
@ -444,7 +468,13 @@ class ClientCallbackReaderWriterImpl |
|
|
|
|
// there are no tests catching the compiler warning.
|
|
|
|
|
static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); } |
|
|
|
|
|
|
|
|
|
void MaybeFinish() { |
|
|
|
|
// MaybeFinish can be called from reactions or from user-initiated operations
|
|
|
|
|
// like StartCall or RemoveHold. If this is the last operation or hold on this
|
|
|
|
|
// object, it will invoke the OnDone reaction. If MaybeFinish was called from
|
|
|
|
|
// a reaction, it can call OnDone directly. If not, it would need to schedule
|
|
|
|
|
// OnDone onto an executor thread to avoid the possibility of deadlocking with
|
|
|
|
|
// any locks in the user code that invoked it.
|
|
|
|
|
void MaybeFinish(bool from_reaction) { |
|
|
|
|
if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub( |
|
|
|
|
1, std::memory_order_acq_rel) == 1)) { |
|
|
|
|
::grpc::Status s = std::move(finish_status_); |
|
|
|
@ -452,7 +482,11 @@ class ClientCallbackReaderWriterImpl |
|
|
|
|
auto* call = call_.call(); |
|
|
|
|
this->~ClientCallbackReaderWriterImpl(); |
|
|
|
|
::grpc::g_core_codegen_interface->grpc_call_unref(call); |
|
|
|
|
reactor->OnDone(s); |
|
|
|
|
if (GPR_LIKELY(from_reaction)) { |
|
|
|
|
reactor->OnDone(s); |
|
|
|
|
} else { |
|
|
|
|
reactor->InternalScheduleOnDone(std::move(s)); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -489,7 +523,7 @@ class ClientCallbackReaderWriterImpl |
|
|
|
|
// MaybeFinish outside the lock to make sure that destruction of this object
|
|
|
|
|
// doesn't take place while holding the lock (which would cause the lock to
|
|
|
|
|
// be released after destruction)
|
|
|
|
|
this->MaybeFinish(); |
|
|
|
|
this->MaybeFinish(/*from_reaction=*/false); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void Read(Response* msg) override { |
|
|
|
@ -533,7 +567,7 @@ class ClientCallbackReaderWriterImpl |
|
|
|
|
writes_done_tag_.Set(call_.call(), |
|
|
|
|
[this](bool ok) { |
|
|
|
|
reactor_->OnWritesDoneDone(ok); |
|
|
|
|
MaybeFinish(); |
|
|
|
|
MaybeFinish(/*from_reaction=*/true); |
|
|
|
|
}, |
|
|
|
|
&writes_done_ops_, /*can_inline=*/false); |
|
|
|
|
writes_done_ops_.set_core_cq_tag(&writes_done_tag_); |
|
|
|
@ -556,7 +590,7 @@ class ClientCallbackReaderWriterImpl |
|
|
|
|
void AddHold(int holds) override { |
|
|
|
|
callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed); |
|
|
|
|
} |
|
|
|
|
void RemoveHold() override { MaybeFinish(); } |
|
|
|
|
void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); } |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
friend class ClientCallbackReaderWriterFactory<Request, Response>; |
|
|
|
@ -575,7 +609,7 @@ class ClientCallbackReaderWriterImpl |
|
|
|
|
start_tag_.Set(call_.call(), |
|
|
|
|
[this](bool ok) { |
|
|
|
|
reactor_->OnReadInitialMetadataDone(ok); |
|
|
|
|
MaybeFinish(); |
|
|
|
|
MaybeFinish(/*from_reaction=*/true); |
|
|
|
|
}, |
|
|
|
|
&start_ops_, /*can_inline=*/false); |
|
|
|
|
start_ops_.RecvInitialMetadata(context_); |
|
|
|
@ -584,7 +618,7 @@ class ClientCallbackReaderWriterImpl |
|
|
|
|
write_tag_.Set(call_.call(), |
|
|
|
|
[this](bool ok) { |
|
|
|
|
reactor_->OnWriteDone(ok); |
|
|
|
|
MaybeFinish(); |
|
|
|
|
MaybeFinish(/*from_reaction=*/true); |
|
|
|
|
}, |
|
|
|
|
&write_ops_, /*can_inline=*/false); |
|
|
|
|
write_ops_.set_core_cq_tag(&write_tag_); |
|
|
|
@ -592,15 +626,17 @@ class ClientCallbackReaderWriterImpl |
|
|
|
|
read_tag_.Set(call_.call(), |
|
|
|
|
[this](bool ok) { |
|
|
|
|
reactor_->OnReadDone(ok); |
|
|
|
|
MaybeFinish(); |
|
|
|
|
MaybeFinish(/*from_reaction=*/true); |
|
|
|
|
}, |
|
|
|
|
&read_ops_, /*can_inline=*/false); |
|
|
|
|
read_ops_.set_core_cq_tag(&read_tag_); |
|
|
|
|
|
|
|
|
|
// Also set up the Finish tag and op set.
|
|
|
|
|
finish_tag_.Set(call_.call(), [this](bool /*ok*/) { MaybeFinish(); }, |
|
|
|
|
&finish_ops_, |
|
|
|
|
/*can_inline=*/false); |
|
|
|
|
finish_tag_.Set( |
|
|
|
|
call_.call(), |
|
|
|
|
[this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); }, |
|
|
|
|
&finish_ops_, |
|
|
|
|
/*can_inline=*/false); |
|
|
|
|
finish_ops_.ClientRecvStatus(context_, &finish_status_); |
|
|
|
|
finish_ops_.set_core_cq_tag(&finish_tag_); |
|
|
|
|
} |
|
|
|
@ -682,7 +718,8 @@ class ClientCallbackReaderImpl : public ClientCallbackReader<Response> { |
|
|
|
|
// there are no tests catching the compiler warning.
|
|
|
|
|
static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); } |
|
|
|
|
|
|
|
|
|
void MaybeFinish() { |
|
|
|
|
// MaybeFinish behaves as in ClientCallbackReaderWriterImpl.
|
|
|
|
|
void MaybeFinish(bool from_reaction) { |
|
|
|
|
if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub( |
|
|
|
|
1, std::memory_order_acq_rel) == 1)) { |
|
|
|
|
::grpc::Status s = std::move(finish_status_); |
|
|
|
@ -690,7 +727,11 @@ class ClientCallbackReaderImpl : public ClientCallbackReader<Response> { |
|
|
|
|
auto* call = call_.call(); |
|
|
|
|
this->~ClientCallbackReaderImpl(); |
|
|
|
|
::grpc::g_core_codegen_interface->grpc_call_unref(call); |
|
|
|
|
reactor->OnDone(s); |
|
|
|
|
if (GPR_LIKELY(from_reaction)) { |
|
|
|
|
reactor->OnDone(s); |
|
|
|
|
} else { |
|
|
|
|
reactor->InternalScheduleOnDone(std::move(s)); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -703,7 +744,7 @@ class ClientCallbackReaderImpl : public ClientCallbackReader<Response> { |
|
|
|
|
start_tag_.Set(call_.call(), |
|
|
|
|
[this](bool ok) { |
|
|
|
|
reactor_->OnReadInitialMetadataDone(ok); |
|
|
|
|
MaybeFinish(); |
|
|
|
|
MaybeFinish(/*from_reaction=*/true); |
|
|
|
|
}, |
|
|
|
|
&start_ops_, /*can_inline=*/false); |
|
|
|
|
start_ops_.SendInitialMetadata(&context_->send_initial_metadata_, |
|
|
|
@ -716,7 +757,7 @@ class ClientCallbackReaderImpl : public ClientCallbackReader<Response> { |
|
|
|
|
read_tag_.Set(call_.call(), |
|
|
|
|
[this](bool ok) { |
|
|
|
|
reactor_->OnReadDone(ok); |
|
|
|
|
MaybeFinish(); |
|
|
|
|
MaybeFinish(/*from_reaction=*/true); |
|
|
|
|
}, |
|
|
|
|
&read_ops_, /*can_inline=*/false); |
|
|
|
|
read_ops_.set_core_cq_tag(&read_tag_); |
|
|
|
@ -729,8 +770,10 @@ class ClientCallbackReaderImpl : public ClientCallbackReader<Response> { |
|
|
|
|
started_.store(true, std::memory_order_release); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
finish_tag_.Set(call_.call(), [this](bool /*ok*/) { MaybeFinish(); }, |
|
|
|
|
&finish_ops_, /*can_inline=*/false); |
|
|
|
|
finish_tag_.Set( |
|
|
|
|
call_.call(), |
|
|
|
|
[this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); }, |
|
|
|
|
&finish_ops_, /*can_inline=*/false); |
|
|
|
|
finish_ops_.ClientRecvStatus(context_, &finish_status_); |
|
|
|
|
finish_ops_.set_core_cq_tag(&finish_tag_); |
|
|
|
|
call_.PerformOps(&finish_ops_); |
|
|
|
@ -752,7 +795,7 @@ class ClientCallbackReaderImpl : public ClientCallbackReader<Response> { |
|
|
|
|
void AddHold(int holds) override { |
|
|
|
|
callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed); |
|
|
|
|
} |
|
|
|
|
void RemoveHold() override { MaybeFinish(); } |
|
|
|
|
void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); } |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
friend class ClientCallbackReaderFactory<Response>; |
|
|
|
@ -833,7 +876,8 @@ class ClientCallbackWriterImpl : public ClientCallbackWriter<Request> { |
|
|
|
|
// there are no tests catching the compiler warning.
|
|
|
|
|
static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); } |
|
|
|
|
|
|
|
|
|
void MaybeFinish() { |
|
|
|
|
// MaybeFinish behaves as in ClientCallbackReaderWriterImpl.
|
|
|
|
|
void MaybeFinish(bool from_reaction) { |
|
|
|
|
if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub( |
|
|
|
|
1, std::memory_order_acq_rel) == 1)) { |
|
|
|
|
::grpc::Status s = std::move(finish_status_); |
|
|
|
@ -841,7 +885,11 @@ class ClientCallbackWriterImpl : public ClientCallbackWriter<Request> { |
|
|
|
|
auto* call = call_.call(); |
|
|
|
|
this->~ClientCallbackWriterImpl(); |
|
|
|
|
::grpc::g_core_codegen_interface->grpc_call_unref(call); |
|
|
|
|
reactor->OnDone(s); |
|
|
|
|
if (GPR_LIKELY(from_reaction)) { |
|
|
|
|
reactor->OnDone(s); |
|
|
|
|
} else { |
|
|
|
|
reactor->InternalScheduleOnDone(std::move(s)); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -874,7 +922,7 @@ class ClientCallbackWriterImpl : public ClientCallbackWriter<Request> { |
|
|
|
|
// MaybeFinish outside the lock to make sure that destruction of this object
|
|
|
|
|
// doesn't take place while holding the lock (which would cause the lock to
|
|
|
|
|
// be released after destruction)
|
|
|
|
|
this->MaybeFinish(); |
|
|
|
|
this->MaybeFinish(/*from_reaction=*/false); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void Write(const Request* msg, ::grpc::WriteOptions options) override { |
|
|
|
@ -907,7 +955,7 @@ class ClientCallbackWriterImpl : public ClientCallbackWriter<Request> { |
|
|
|
|
writes_done_tag_.Set(call_.call(), |
|
|
|
|
[this](bool ok) { |
|
|
|
|
reactor_->OnWritesDoneDone(ok); |
|
|
|
|
MaybeFinish(); |
|
|
|
|
MaybeFinish(/*from_reaction=*/true); |
|
|
|
|
}, |
|
|
|
|
&writes_done_ops_, /*can_inline=*/false); |
|
|
|
|
writes_done_ops_.set_core_cq_tag(&writes_done_tag_); |
|
|
|
@ -932,7 +980,7 @@ class ClientCallbackWriterImpl : public ClientCallbackWriter<Request> { |
|
|
|
|
void AddHold(int holds) override { |
|
|
|
|
callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed); |
|
|
|
|
} |
|
|
|
|
void RemoveHold() override { MaybeFinish(); } |
|
|
|
|
void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); } |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
friend class ClientCallbackWriterFactory<Request>; |
|
|
|
@ -953,7 +1001,7 @@ class ClientCallbackWriterImpl : public ClientCallbackWriter<Request> { |
|
|
|
|
start_tag_.Set(call_.call(), |
|
|
|
|
[this](bool ok) { |
|
|
|
|
reactor_->OnReadInitialMetadataDone(ok); |
|
|
|
|
MaybeFinish(); |
|
|
|
|
MaybeFinish(/*from_reaction=*/true); |
|
|
|
|
}, |
|
|
|
|
&start_ops_, /*can_inline=*/false); |
|
|
|
|
start_ops_.RecvInitialMetadata(context_); |
|
|
|
@ -962,7 +1010,7 @@ class ClientCallbackWriterImpl : public ClientCallbackWriter<Request> { |
|
|
|
|
write_tag_.Set(call_.call(), |
|
|
|
|
[this](bool ok) { |
|
|
|
|
reactor_->OnWriteDone(ok); |
|
|
|
|
MaybeFinish(); |
|
|
|
|
MaybeFinish(/*from_reaction=*/true); |
|
|
|
|
}, |
|
|
|
|
&write_ops_, /*can_inline=*/false); |
|
|
|
|
write_ops_.set_core_cq_tag(&write_tag_); |
|
|
|
@ -970,9 +1018,11 @@ class ClientCallbackWriterImpl : public ClientCallbackWriter<Request> { |
|
|
|
|
// Also set up the Finish tag and op set.
|
|
|
|
|
finish_ops_.RecvMessage(response); |
|
|
|
|
finish_ops_.AllowNoMessage(); |
|
|
|
|
finish_tag_.Set(call_.call(), [this](bool /*ok*/) { MaybeFinish(); }, |
|
|
|
|
&finish_ops_, |
|
|
|
|
/*can_inline=*/false); |
|
|
|
|
finish_tag_.Set( |
|
|
|
|
call_.call(), |
|
|
|
|
[this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); }, |
|
|
|
|
&finish_ops_, |
|
|
|
|
/*can_inline=*/false); |
|
|
|
|
finish_ops_.ClientRecvStatus(context_, &finish_status_); |
|
|
|
|
finish_ops_.set_core_cq_tag(&finish_tag_); |
|
|
|
|
} |
|
|
|
@ -1068,12 +1118,16 @@ class ClientCallbackUnaryImpl final : public ClientCallbackUnary { |
|
|
|
|
call_.PerformOps(&start_ops_); |
|
|
|
|
|
|
|
|
|
finish_tag_.Set(call_.call(), [this](bool /*ok*/) { MaybeFinish(); }, |
|
|
|
|
&finish_ops_, /*can_inline=*/false); |
|
|
|
|
&finish_ops_, |
|
|
|
|
/*can_inline=*/false); |
|
|
|
|
finish_ops_.ClientRecvStatus(context_, &finish_status_); |
|
|
|
|
finish_ops_.set_core_cq_tag(&finish_tag_); |
|
|
|
|
call_.PerformOps(&finish_ops_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// In the unary case, MaybeFinish is only ever invoked from a
|
|
|
|
|
// library-initiated reaction, so it will just directly call OnDone if this is
|
|
|
|
|
// the last reaction for this RPC.
|
|
|
|
|
void MaybeFinish() { |
|
|
|
|
if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub( |
|
|
|
|
1, std::memory_order_acq_rel) == 1)) { |
|
|
|
|