Merge pull request #23229 from grpc/revert-23116-revert-23005-asyncwrite

Roll-forward: Fix StartCall: make corking work and allow concurrent Start*
pull/23081/head
Vijay Pai 5 years ago committed by GitHub
commit 84cf666014
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      BUILD
  2. 1
      BUILD.gn
  3. 2
      CMakeLists.txt
  4. 2
      Makefile
  5. 2
      build_autogenerated.yaml
  6. 1
      gRPC-C++.podspec
  7. 2
      grpc.gyp
  8. 461
      include/grpcpp/impl/codegen/client_callback_impl.h
  9. 52
      src/cpp/client/client_callback.cc
  10. 118
      test/cpp/end2end/client_callback_end2end_test.cc
  11. 1
      tools/doxygen/Doxyfile.c++.internal

@ -124,6 +124,7 @@ GRPC_SECURE_PUBLIC_HDRS = [
# TODO(ctiller): layer grpc atop grpc_unsecure, layer grpc++ atop grpc++_unsecure
GRPCXX_SRCS = [
"src/cpp/client/channel_cc.cc",
"src/cpp/client/client_callback.cc",
"src/cpp/client/client_context.cc",
"src/cpp/client/client_interceptor.cc",
"src/cpp/client/create_channel.cc",

@ -1220,6 +1220,7 @@ config("grpc_config") {
"include/grpcpp/support/time.h",
"include/grpcpp/support/validate_service_config.h",
"src/cpp/client/channel_cc.cc",
"src/cpp/client/client_callback.cc",
"src/cpp/client/client_context.cc",
"src/cpp/client/client_interceptor.cc",
"src/cpp/client/create_channel.cc",

@ -2482,6 +2482,7 @@ endif()
add_library(grpc++
src/cpp/client/channel_cc.cc
src/cpp/client/client_callback.cc
src/cpp/client/client_context.cc
src/cpp/client/client_interceptor.cc
src/cpp/client/create_channel.cc
@ -3183,6 +3184,7 @@ endif()
add_library(grpc++_unsecure
src/cpp/client/channel_cc.cc
src/cpp/client/client_callback.cc
src/cpp/client/client_context.cc
src/cpp/client/client_interceptor.cc
src/cpp/client/create_channel.cc

@ -4717,6 +4717,7 @@ $(OBJDIR)/$(CONFIG)/test/cpp/microbenchmarks/helpers.o: $(GENDIR)/src/proto/grpc
LIBGRPC++_SRC = \
src/cpp/client/channel_cc.cc \
src/cpp/client/client_callback.cc \
src/cpp/client/client_context.cc \
src/cpp/client/client_interceptor.cc \
src/cpp/client/create_channel.cc \
@ -5423,6 +5424,7 @@ endif
LIBGRPC++_UNSECURE_SRC = \
src/cpp/client/channel_cc.cc \
src/cpp/client/client_callback.cc \
src/cpp/client/client_context.cc \
src/cpp/client/client_interceptor.cc \
src/cpp/client/create_channel.cc \

@ -2203,6 +2203,7 @@ libs:
- src/cpp/thread_manager/thread_manager.h
src:
- src/cpp/client/channel_cc.cc
- src/cpp/client/client_callback.cc
- src/cpp/client/client_context.cc
- src/cpp/client/client_interceptor.cc
- src/cpp/client/create_channel.cc
@ -2592,6 +2593,7 @@ libs:
- src/cpp/thread_manager/thread_manager.h
src:
- src/cpp/client/channel_cc.cc
- src/cpp/client/client_callback.cc
- src/cpp/client/client_context.cc
- src/cpp/client/client_interceptor.cc
- src/cpp/client/create_channel.cc

@ -616,6 +616,7 @@ Pod::Spec.new do |s|
'src/core/tsi/transport_security_grpc.h',
'src/core/tsi/transport_security_interface.h',
'src/cpp/client/channel_cc.cc',
'src/cpp/client/client_callback.cc',
'src/cpp/client/client_context.cc',
'src/cpp/client/client_interceptor.cc',
'src/cpp/client/create_channel.cc',

@ -1313,6 +1313,7 @@
],
'sources': [
'src/cpp/client/channel_cc.cc',
'src/cpp/client/client_callback.cc',
'src/cpp/client/client_context.cc',
'src/cpp/client/client_interceptor.cc',
'src/cpp/client/create_channel.cc',
@ -1464,6 +1465,7 @@
],
'sources': [
'src/cpp/client/channel_cc.cc',
'src/cpp/client/client_callback.cc',
'src/cpp/client/client_context.cc',
'src/cpp/client/client_interceptor.cc',
'src/cpp/client/create_channel.cc',

@ -101,6 +101,29 @@ class CallbackUnaryCallImpl {
call.PerformOps(ops);
}
};
// Base class for public API classes.
class ClientReactor {
public:
/// Called by the library when all operations associated with this RPC have
/// completed and all Holds have been removed. OnDone provides the RPC status
/// outcome for both successful and failed RPCs. If it is never called on an
/// RPC, it indicates an application-level problem (like failure to remove a
/// hold).
///
/// \param[in] s The status outcome of this RPC
virtual void OnDone(const ::grpc::Status& /*s*/) = 0;
/// InternalScheduleOnDone is not part of the API and is not meant to be
/// overridden. It is virtual to allow successful builds for certain bazel
/// build users that only want to depend on gRPC codegen headers and not the
/// full library (although this is not a generally-supported option). Although
/// the virtual call is slower than a direct call, this function is
/// heavyweight and the cost of the virtual call is not much in comparison.
/// This function may be removed or devirtualized in the future.
virtual void InternalScheduleOnDone(::grpc::Status s);
};
} // namespace internal
// Forward declarations
@ -189,7 +212,7 @@ class ClientCallbackUnary {
/// \a ClientBidiReactor is the interface for a bidirectional streaming RPC.
template <class Request, class Response>
class ClientBidiReactor {
class ClientBidiReactor : public internal::ClientReactor {
public:
virtual ~ClientBidiReactor() {}
@ -282,7 +305,7 @@ class ClientBidiReactor {
/// (like failure to remove a hold).
///
/// \param[in] s The status outcome of this RPC
virtual void OnDone(const ::grpc::Status& /*s*/) {}
void OnDone(const ::grpc::Status& /*s*/) override {}
/// Notifies the application that a read of initial metadata from the
/// server is done. If the application chooses not to implement this method,
@ -327,7 +350,7 @@ class ClientBidiReactor {
/// \a ClientReadReactor is the interface for a server-streaming RPC.
/// All public methods behave as in ClientBidiReactor.
template <class Response>
class ClientReadReactor {
class ClientReadReactor : public internal::ClientReactor {
public:
virtual ~ClientReadReactor() {}
@ -341,7 +364,7 @@ class ClientReadReactor {
}
void RemoveHold() { reader_->RemoveHold(); }
virtual void OnDone(const ::grpc::Status& /*s*/) {}
void OnDone(const ::grpc::Status& /*s*/) override {}
virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
virtual void OnReadDone(bool /*ok*/) {}
@ -354,7 +377,7 @@ class ClientReadReactor {
/// \a ClientWriteReactor is the interface for a client-streaming RPC.
/// All public methods behave as in ClientBidiReactor.
template <class Request>
class ClientWriteReactor {
class ClientWriteReactor : public internal::ClientReactor {
public:
virtual ~ClientWriteReactor() {}
@ -377,7 +400,7 @@ class ClientWriteReactor {
}
void RemoveHold() { writer_->RemoveHold(); }
virtual void OnDone(const ::grpc::Status& /*s*/) {}
void OnDone(const ::grpc::Status& /*s*/) override {}
virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
virtual void OnWriteDone(bool /*ok*/) {}
virtual void OnWritesDoneDone(bool /*ok*/) {}
@ -385,6 +408,7 @@ class ClientWriteReactor {
private:
friend class ClientCallbackWriter<Request>;
void BindWriter(ClientCallbackWriter<Request>* writer) { writer_ = writer; }
ClientCallbackWriter<Request>* writer_;
};
@ -399,12 +423,12 @@ class ClientWriteReactor {
/// call (that is part of the unary call itself) and there is no reactor object
/// being created as a result of this call, we keep a consistent 2-phase
/// initiation API among all the reactor flavors.
class ClientUnaryReactor {
class ClientUnaryReactor : public internal::ClientReactor {
public:
virtual ~ClientUnaryReactor() {}
void StartCall() { call_->StartCall(); }
virtual void OnDone(const ::grpc::Status& /*s*/) {}
void OnDone(const ::grpc::Status& /*s*/) override {}
virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
private:
@ -444,7 +468,13 @@ class ClientCallbackReaderWriterImpl
// there are no tests catching the compiler warning.
static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
void MaybeFinish() {
// MaybeFinish can be called from reactions or from user-initiated operations
// like StartCall or RemoveHold. If this is the last operation or hold on this
// object, it will invoke the OnDone reaction. If MaybeFinish was called from
// a reaction, it can call OnDone directly. If not, it would need to schedule
// OnDone onto an executor thread to avoid the possibility of deadlocking with
// any locks in the user code that invoked it.
void MaybeFinish(bool from_reaction) {
if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
1, std::memory_order_acq_rel) == 1)) {
::grpc::Status s = std::move(finish_status_);
@ -452,7 +482,11 @@ class ClientCallbackReaderWriterImpl
auto* call = call_.call();
this->~ClientCallbackReaderWriterImpl();
::grpc::g_core_codegen_interface->grpc_call_unref(call);
reactor->OnDone(s);
if (GPR_LIKELY(from_reaction)) {
reactor->OnDone(s);
} else {
reactor->InternalScheduleOnDone(std::move(s));
}
}
}
@ -461,76 +495,51 @@ class ClientCallbackReaderWriterImpl
// 1. Send initial metadata (unless corked) + recv initial metadata
// 2. Any read backlog
// 3. Any write backlog
// 4. Recv trailing metadata, on_completion callback
started_ = true;
start_tag_.Set(call_.call(),
[this](bool ok) {
reactor_->OnReadInitialMetadataDone(ok);
MaybeFinish();
},
&start_ops_, /*can_inline=*/false);
// 4. Recv trailing metadata (unless corked)
if (!start_corked_) {
start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
context_->initial_metadata_flags());
}
start_ops_.RecvInitialMetadata(context_);
start_ops_.set_core_cq_tag(&start_tag_);
call_.PerformOps(&start_ops_);
// Also set up the read and write tags so that they don't have to be set up
// each time
write_tag_.Set(call_.call(),
[this](bool ok) {
reactor_->OnWriteDone(ok);
MaybeFinish();
},
&write_ops_, /*can_inline=*/false);
write_ops_.set_core_cq_tag(&write_tag_);
read_tag_.Set(call_.call(),
[this](bool ok) {
reactor_->OnReadDone(ok);
MaybeFinish();
},
&read_ops_, /*can_inline=*/false);
read_ops_.set_core_cq_tag(&read_tag_);
if (read_ops_at_start_) {
call_.PerformOps(&read_ops_);
}
if (write_ops_at_start_) {
call_.PerformOps(&write_ops_);
}
call_.PerformOps(&start_ops_);
if (writes_done_ops_at_start_) {
call_.PerformOps(&writes_done_ops_);
{
grpc::internal::MutexLock lock(&start_mu_);
if (backlog_.read_ops) {
call_.PerformOps(&read_ops_);
}
if (backlog_.write_ops) {
call_.PerformOps(&write_ops_);
}
if (backlog_.writes_done_ops) {
call_.PerformOps(&writes_done_ops_);
}
call_.PerformOps(&finish_ops_);
// The last thing in this critical section is to set started_ so that it
// can be used lock-free as well.
started_.store(true, std::memory_order_release);
}
finish_tag_.Set(call_.call(), [this](bool /*ok*/) { MaybeFinish(); },
&finish_ops_, /*can_inline=*/false);
finish_ops_.ClientRecvStatus(context_, &finish_status_);
finish_ops_.set_core_cq_tag(&finish_tag_);
call_.PerformOps(&finish_ops_);
// MaybeFinish outside the lock to make sure that destruction of this object
// doesn't take place while holding the lock (which would cause the lock to
// be released after destruction)
this->MaybeFinish(/*from_reaction=*/false);
}
void Read(Response* msg) override {
read_ops_.RecvMessage(msg);
callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
if (started_) {
call_.PerformOps(&read_ops_);
} else {
read_ops_at_start_ = true;
if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
grpc::internal::MutexLock lock(&start_mu_);
if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
backlog_.read_ops = true;
return;
}
}
call_.PerformOps(&read_ops_);
}
void Write(const Request* msg, ::grpc::WriteOptions options) override {
if (start_corked_) {
write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
context_->initial_metadata_flags());
start_corked_ = false;
}
if (options.is_last_message()) {
options.set_buffer_hint();
write_ops_.ClientSendClose();
@ -538,38 +547,50 @@ class ClientCallbackReaderWriterImpl
// TODO(vjpai): don't assert
GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(msg, options).ok());
callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
if (started_) {
call_.PerformOps(&write_ops_);
} else {
write_ops_at_start_ = true;
if (GPR_UNLIKELY(corked_write_needed_)) {
write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
context_->initial_metadata_flags());
corked_write_needed_ = false;
}
if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
grpc::internal::MutexLock lock(&start_mu_);
if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
backlog_.write_ops = true;
return;
}
}
call_.PerformOps(&write_ops_);
}
void WritesDone() override {
if (start_corked_) {
writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
context_->initial_metadata_flags());
start_corked_ = false;
}
writes_done_ops_.ClientSendClose();
writes_done_tag_.Set(call_.call(),
[this](bool ok) {
reactor_->OnWritesDoneDone(ok);
MaybeFinish();
MaybeFinish(/*from_reaction=*/true);
},
&writes_done_ops_, /*can_inline=*/false);
writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
if (started_) {
call_.PerformOps(&writes_done_ops_);
} else {
writes_done_ops_at_start_ = true;
if (GPR_UNLIKELY(corked_write_needed_)) {
writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
context_->initial_metadata_flags());
corked_write_needed_ = false;
}
if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
grpc::internal::MutexLock lock(&start_mu_);
if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
backlog_.writes_done_ops = true;
return;
}
}
call_.PerformOps(&writes_done_ops_);
}
void AddHold(int holds) override {
callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
}
void RemoveHold() override { MaybeFinish(); }
void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
private:
friend class ClientCallbackReaderWriterFactory<Request, Response>;
@ -580,8 +601,44 @@ class ClientCallbackReaderWriterImpl
: context_(context),
call_(call),
reactor_(reactor),
start_corked_(context_->initial_metadata_corked_) {
start_corked_(context_->initial_metadata_corked_),
corked_write_needed_(start_corked_) {
this->BindReactor(reactor);
// Set up the unchanging parts of the start, read, and write tags and ops.
start_tag_.Set(call_.call(),
[this](bool ok) {
reactor_->OnReadInitialMetadataDone(ok);
MaybeFinish(/*from_reaction=*/true);
},
&start_ops_, /*can_inline=*/false);
start_ops_.RecvInitialMetadata(context_);
start_ops_.set_core_cq_tag(&start_tag_);
write_tag_.Set(call_.call(),
[this](bool ok) {
reactor_->OnWriteDone(ok);
MaybeFinish(/*from_reaction=*/true);
},
&write_ops_, /*can_inline=*/false);
write_ops_.set_core_cq_tag(&write_tag_);
read_tag_.Set(call_.call(),
[this](bool ok) {
reactor_->OnReadDone(ok);
MaybeFinish(/*from_reaction=*/true);
},
&read_ops_, /*can_inline=*/false);
read_ops_.set_core_cq_tag(&read_tag_);
// Also set up the Finish tag and op set.
finish_tag_.Set(
call_.call(),
[this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
&finish_ops_,
/*can_inline=*/false);
finish_ops_.ClientRecvStatus(context_, &finish_status_);
finish_ops_.set_core_cq_tag(&finish_tag_);
}
::grpc_impl::ClientContext* const context_;
@ -592,7 +649,9 @@ class ClientCallbackReaderWriterImpl
grpc::internal::CallOpRecvInitialMetadata>
start_ops_;
grpc::internal::CallbackWithSuccessTag start_tag_;
bool start_corked_;
const bool start_corked_;
bool corked_write_needed_; // no lock needed since only accessed in
// Write/WritesDone which cannot be concurrent
grpc::internal::CallOpSet<grpc::internal::CallOpClientRecvStatus> finish_ops_;
grpc::internal::CallbackWithSuccessTag finish_tag_;
@ -603,22 +662,27 @@ class ClientCallbackReaderWriterImpl
grpc::internal::CallOpClientSendClose>
write_ops_;
grpc::internal::CallbackWithSuccessTag write_tag_;
bool write_ops_at_start_{false};
grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
grpc::internal::CallOpClientSendClose>
writes_done_ops_;
grpc::internal::CallbackWithSuccessTag writes_done_tag_;
bool writes_done_ops_at_start_{false};
grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<Response>>
read_ops_;
grpc::internal::CallbackWithSuccessTag read_tag_;
bool read_ops_at_start_{false};
// Minimum of 2 callbacks to pre-register for start and finish
std::atomic<intptr_t> callbacks_outstanding_{2};
bool started_{false};
struct StartCallBacklog {
bool write_ops = false;
bool writes_done_ops = false;
bool read_ops = false;
};
StartCallBacklog backlog_ /* GUARDED_BY(start_mu_) */;
// Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish
std::atomic<intptr_t> callbacks_outstanding_{3};
std::atomic_bool started_{false};
grpc::internal::Mutex start_mu_;
};
template <class Request, class Response>
@ -654,7 +718,8 @@ class ClientCallbackReaderImpl : public ClientCallbackReader<Response> {
// there are no tests catching the compiler warning.
static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
void MaybeFinish() {
// MaybeFinish behaves as in ClientCallbackReaderWriterImpl.
void MaybeFinish(bool from_reaction) {
if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
1, std::memory_order_acq_rel) == 1)) {
::grpc::Status s = std::move(finish_status_);
@ -662,7 +727,11 @@ class ClientCallbackReaderImpl : public ClientCallbackReader<Response> {
auto* call = call_.call();
this->~ClientCallbackReaderImpl();
::grpc::g_core_codegen_interface->grpc_call_unref(call);
reactor->OnDone(s);
if (GPR_LIKELY(from_reaction)) {
reactor->OnDone(s);
} else {
reactor->InternalScheduleOnDone(std::move(s));
}
}
}
@ -670,13 +739,12 @@ class ClientCallbackReaderImpl : public ClientCallbackReader<Response> {
// This call initiates two batches, plus any backlog, each with a callback
// 1. Send initial metadata (unless corked) + recv initial metadata
// 2. Any backlog
// 3. Recv trailing metadata, on_completion callback
started_ = true;
// 3. Recv trailing metadata
start_tag_.Set(call_.call(),
[this](bool ok) {
reactor_->OnReadInitialMetadataDone(ok);
MaybeFinish();
MaybeFinish(/*from_reaction=*/true);
},
&start_ops_, /*can_inline=*/false);
start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
@ -689,16 +757,23 @@ class ClientCallbackReaderImpl : public ClientCallbackReader<Response> {
read_tag_.Set(call_.call(),
[this](bool ok) {
reactor_->OnReadDone(ok);
MaybeFinish();
MaybeFinish(/*from_reaction=*/true);
},
&read_ops_, /*can_inline=*/false);
read_ops_.set_core_cq_tag(&read_tag_);
if (read_ops_at_start_) {
call_.PerformOps(&read_ops_);
{
grpc::internal::MutexLock lock(&start_mu_);
if (backlog_.read_ops) {
call_.PerformOps(&read_ops_);
}
started_.store(true, std::memory_order_release);
}
finish_tag_.Set(call_.call(), [this](bool /*ok*/) { MaybeFinish(); },
&finish_ops_, /*can_inline=*/false);
finish_tag_.Set(
call_.call(),
[this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
&finish_ops_, /*can_inline=*/false);
finish_ops_.ClientRecvStatus(context_, &finish_status_);
finish_ops_.set_core_cq_tag(&finish_tag_);
call_.PerformOps(&finish_ops_);
@ -707,17 +782,20 @@ class ClientCallbackReaderImpl : public ClientCallbackReader<Response> {
void Read(Response* msg) override {
read_ops_.RecvMessage(msg);
callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
if (started_) {
call_.PerformOps(&read_ops_);
} else {
read_ops_at_start_ = true;
if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
grpc::internal::MutexLock lock(&start_mu_);
if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
backlog_.read_ops = true;
return;
}
}
call_.PerformOps(&read_ops_);
}
void AddHold(int holds) override {
callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
}
void RemoveHold() override { MaybeFinish(); }
void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
private:
friend class ClientCallbackReaderFactory<Response>;
@ -752,11 +830,16 @@ class ClientCallbackReaderImpl : public ClientCallbackReader<Response> {
grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<Response>>
read_ops_;
grpc::internal::CallbackWithSuccessTag read_tag_;
bool read_ops_at_start_{false};
struct StartCallBacklog {
bool read_ops = false;
};
StartCallBacklog backlog_ /* GUARDED_BY(start_mu_) */;
// Minimum of 2 callbacks to pre-register for start and finish
std::atomic<intptr_t> callbacks_outstanding_{2};
bool started_{false};
std::atomic_bool started_{false};
grpc::internal::Mutex start_mu_;
};
template <class Response>
@ -793,7 +876,8 @@ class ClientCallbackWriterImpl : public ClientCallbackWriter<Request> {
// there are no tests catching the compiler warning.
static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
void MaybeFinish() {
// MaybeFinish behaves as in ClientCallbackReaderWriterImpl.
void MaybeFinish(bool from_reaction) {
if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
1, std::memory_order_acq_rel) == 1)) {
::grpc::Status s = std::move(finish_status_);
@ -801,7 +885,11 @@ class ClientCallbackWriterImpl : public ClientCallbackWriter<Request> {
auto* call = call_.call();
this->~ClientCallbackWriterImpl();
::grpc::g_core_codegen_interface->grpc_call_unref(call);
reactor->OnDone(s);
if (GPR_LIKELY(from_reaction)) {
reactor->OnDone(s);
} else {
reactor->InternalScheduleOnDone(std::move(s));
}
}
}
@ -809,94 +897,90 @@ class ClientCallbackWriterImpl : public ClientCallbackWriter<Request> {
// This call initiates two batches, plus any backlog, each with a callback
// 1. Send initial metadata (unless corked) + recv initial metadata
// 2. Any backlog
// 3. Recv trailing metadata, on_completion callback
started_ = true;
// 3. Recv trailing metadata
start_tag_.Set(call_.call(),
[this](bool ok) {
reactor_->OnReadInitialMetadataDone(ok);
MaybeFinish();
},
&start_ops_, /*can_inline=*/false);
if (!start_corked_) {
start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
context_->initial_metadata_flags());
}
start_ops_.RecvInitialMetadata(context_);
start_ops_.set_core_cq_tag(&start_tag_);
call_.PerformOps(&start_ops_);
// Also set up the read and write tags so that they don't have to be set up
// each time
write_tag_.Set(call_.call(),
[this](bool ok) {
reactor_->OnWriteDone(ok);
MaybeFinish();
},
&write_ops_, /*can_inline=*/false);
write_ops_.set_core_cq_tag(&write_tag_);
if (write_ops_at_start_) {
call_.PerformOps(&write_ops_);
{
grpc::internal::MutexLock lock(&start_mu_);
if (backlog_.write_ops) {
call_.PerformOps(&write_ops_);
}
if (backlog_.writes_done_ops) {
call_.PerformOps(&writes_done_ops_);
}
call_.PerformOps(&finish_ops_);
// The last thing in this critical section is to set started_ so that it
// can be used lock-free as well.
started_.store(true, std::memory_order_release);
}
if (writes_done_ops_at_start_) {
call_.PerformOps(&writes_done_ops_);
}
finish_tag_.Set(call_.call(), [this](bool /*ok*/) { MaybeFinish(); },
&finish_ops_, /*can_inline=*/false);
finish_ops_.ClientRecvStatus(context_, &finish_status_);
finish_ops_.set_core_cq_tag(&finish_tag_);
call_.PerformOps(&finish_ops_);
// MaybeFinish outside the lock to make sure that destruction of this object
// doesn't take place while holding the lock (which would cause the lock to
// be released after destruction)
this->MaybeFinish(/*from_reaction=*/false);
}
void Write(const Request* msg, ::grpc::WriteOptions options) override {
if (start_corked_) {
write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
context_->initial_metadata_flags());
start_corked_ = false;
}
if (options.is_last_message()) {
if (GPR_UNLIKELY(options.is_last_message())) {
options.set_buffer_hint();
write_ops_.ClientSendClose();
}
// TODO(vjpai): don't assert
GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(msg, options).ok());
callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
if (started_) {
call_.PerformOps(&write_ops_);
} else {
write_ops_at_start_ = true;
if (GPR_UNLIKELY(corked_write_needed_)) {
write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
context_->initial_metadata_flags());
corked_write_needed_ = false;
}
if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
grpc::internal::MutexLock lock(&start_mu_);
if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
backlog_.write_ops = true;
return;
}
}
call_.PerformOps(&write_ops_);
}
void WritesDone() override {
if (start_corked_) {
writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
context_->initial_metadata_flags());
start_corked_ = false;
}
writes_done_ops_.ClientSendClose();
writes_done_tag_.Set(call_.call(),
[this](bool ok) {
reactor_->OnWritesDoneDone(ok);
MaybeFinish();
MaybeFinish(/*from_reaction=*/true);
},
&writes_done_ops_, /*can_inline=*/false);
writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
if (started_) {
call_.PerformOps(&writes_done_ops_);
} else {
writes_done_ops_at_start_ = true;
if (GPR_UNLIKELY(corked_write_needed_)) {
writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
context_->initial_metadata_flags());
corked_write_needed_ = false;
}
if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
grpc::internal::MutexLock lock(&start_mu_);
if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
backlog_.writes_done_ops = true;
return;
}
}
call_.PerformOps(&writes_done_ops_);
}
void AddHold(int holds) override {
callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
}
void RemoveHold() override { MaybeFinish(); }
void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
private:
friend class ClientCallbackWriterFactory<Request>;
@ -909,10 +993,38 @@ class ClientCallbackWriterImpl : public ClientCallbackWriter<Request> {
: context_(context),
call_(call),
reactor_(reactor),
start_corked_(context_->initial_metadata_corked_) {
start_corked_(context_->initial_metadata_corked_),
corked_write_needed_(start_corked_) {
this->BindReactor(reactor);
// Set up the unchanging parts of the start and write tags and ops.
start_tag_.Set(call_.call(),
[this](bool ok) {
reactor_->OnReadInitialMetadataDone(ok);
MaybeFinish(/*from_reaction=*/true);
},
&start_ops_, /*can_inline=*/false);
start_ops_.RecvInitialMetadata(context_);
start_ops_.set_core_cq_tag(&start_tag_);
write_tag_.Set(call_.call(),
[this](bool ok) {
reactor_->OnWriteDone(ok);
MaybeFinish(/*from_reaction=*/true);
},
&write_ops_, /*can_inline=*/false);
write_ops_.set_core_cq_tag(&write_tag_);
// Also set up the Finish tag and op set.
finish_ops_.RecvMessage(response);
finish_ops_.AllowNoMessage();
finish_tag_.Set(
call_.call(),
[this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
&finish_ops_,
/*can_inline=*/false);
finish_ops_.ClientRecvStatus(context_, &finish_status_);
finish_ops_.set_core_cq_tag(&finish_tag_);
}
::grpc_impl::ClientContext* const context_;
@ -923,7 +1035,9 @@ class ClientCallbackWriterImpl : public ClientCallbackWriter<Request> {
grpc::internal::CallOpRecvInitialMetadata>
start_ops_;
grpc::internal::CallbackWithSuccessTag start_tag_;
bool start_corked_;
const bool start_corked_;
bool corked_write_needed_; // no lock needed since only accessed in
// Write/WritesDone which cannot be concurrent
grpc::internal::CallOpSet<grpc::internal::CallOpGenericRecvMessage,
grpc::internal::CallOpClientRecvStatus>
@ -936,17 +1050,22 @@ class ClientCallbackWriterImpl : public ClientCallbackWriter<Request> {
grpc::internal::CallOpClientSendClose>
write_ops_;
grpc::internal::CallbackWithSuccessTag write_tag_;
bool write_ops_at_start_{false};
grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
grpc::internal::CallOpClientSendClose>
writes_done_ops_;
grpc::internal::CallbackWithSuccessTag writes_done_tag_;
bool writes_done_ops_at_start_{false};
// Minimum of 2 callbacks to pre-register for start and finish
std::atomic<intptr_t> callbacks_outstanding_{2};
bool started_{false};
struct StartCallBacklog {
bool write_ops = false;
bool writes_done_ops = false;
};
StartCallBacklog backlog_ /* GUARDED_BY(start_mu_) */;
// Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish
std::atomic<intptr_t> callbacks_outstanding_{3};
std::atomic_bool started_{false};
grpc::internal::Mutex start_mu_;
};
template <class Request>
@ -985,7 +1104,6 @@ class ClientCallbackUnaryImpl final : public ClientCallbackUnary {
// This call initiates two batches, each with a callback
// 1. Send initial metadata + write + writes done + recv initial metadata
// 2. Read message, recv trailing metadata
started_ = true;
start_tag_.Set(call_.call(),
[this](bool ok) {
@ -1000,12 +1118,16 @@ class ClientCallbackUnaryImpl final : public ClientCallbackUnary {
call_.PerformOps(&start_ops_);
finish_tag_.Set(call_.call(), [this](bool /*ok*/) { MaybeFinish(); },
&finish_ops_, /*can_inline=*/false);
&finish_ops_,
/*can_inline=*/false);
finish_ops_.ClientRecvStatus(context_, &finish_status_);
finish_ops_.set_core_cq_tag(&finish_tag_);
call_.PerformOps(&finish_ops_);
}
// In the unary case, MaybeFinish is only ever invoked from a
// library-initiated reaction, so it will just directly call OnDone if this is
// the last reaction for this RPC.
void MaybeFinish() {
if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
1, std::memory_order_acq_rel) == 1)) {
@ -1053,7 +1175,6 @@ class ClientCallbackUnaryImpl final : public ClientCallbackUnary {
// This call will have 2 callbacks: start and finish
std::atomic<intptr_t> callbacks_outstanding_{2};
bool started_{false};
};
class ClientCallbackUnaryFactory {

@ -0,0 +1,52 @@
/*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <grpcpp/impl/codegen/client_callback_impl.h>
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/executor.h"
namespace grpc_impl {
namespace internal {
void ClientReactor::InternalScheduleOnDone(grpc::Status s) {
// Unlike other uses of closure, do not Ref or Unref here since the reactor
// object's lifetime is controlled by user code.
grpc_core::ExecCtx exec_ctx;
struct ClosureWithArg {
grpc_closure closure;
ClientReactor* const reactor;
const grpc::Status status;
ClosureWithArg(ClientReactor* reactor_arg, grpc::Status s)
: reactor(reactor_arg), status(std::move(s)) {
GRPC_CLOSURE_INIT(&closure,
[](void* void_arg, grpc_error*) {
ClosureWithArg* arg =
static_cast<ClosureWithArg*>(void_arg);
arg->reactor->OnDone(arg->status);
delete arg;
},
this, grpc_schedule_on_exec_ctx);
}
};
ClosureWithArg* arg = new ClosureWithArg(this, std::move(s));
grpc_core::Executor::Run(&arg->closure, GRPC_ERROR_NONE);
}
} // namespace internal
} // namespace grpc_impl

@ -16,12 +16,6 @@
*
*/
#include <algorithm>
#include <functional>
#include <mutex>
#include <sstream>
#include <thread>
#include <grpcpp/channel.h>
#include <grpcpp/client_context.h>
#include <grpcpp/create_channel.h>
@ -31,6 +25,14 @@
#include <grpcpp/server_builder.h>
#include <grpcpp/server_context.h>
#include <grpcpp/support/client_callback.h>
#include <gtest/gtest.h>
#include <algorithm>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <sstream>
#include <thread>
#include "src/core/lib/gpr/env.h"
#include "src/core/lib/iomgr/iomgr.h"
@ -43,8 +45,6 @@
#include "test/cpp/util/string_ref_helper.h"
#include "test/cpp/util/test_credentials_provider.h"
#include <gtest/gtest.h>
// MAYBE_SKIP_TEST is a macro to determine if this particular test configuration
// should be skipped based on a decision made at SetUp time. In particular, any
// callback tests can only be run if the iomgr can run in the background or if
@ -1114,7 +1114,8 @@ class BidiClient
public:
BidiClient(grpc::testing::EchoTestService::Stub* stub,
ServerTryCancelRequestPhase server_try_cancel,
int num_msgs_to_send, ClientCancelInfo client_cancel = {})
int num_msgs_to_send, bool cork_metadata, bool first_write_async,
ClientCancelInfo client_cancel = {})
: server_try_cancel_(server_try_cancel),
msgs_to_send_{num_msgs_to_send},
client_cancel_{client_cancel} {
@ -1124,8 +1125,9 @@ class BidiClient
grpc::to_string(server_try_cancel));
}
request_.set_message("Hello fren ");
context_.set_initial_metadata_corked(cork_metadata);
stub->experimental_async()->BidiStream(&context_, this);
MaybeWrite();
MaybeAsyncWrite(first_write_async);
StartRead(&response_);
StartCall();
}
@ -1146,6 +1148,10 @@ class BidiClient
}
}
void OnWriteDone(bool ok) override {
if (async_write_thread_.joinable()) {
async_write_thread_.join();
RemoveHold();
}
if (server_try_cancel_ == DO_NOT_CANCEL) {
EXPECT_TRUE(ok);
} else if (!ok) {
@ -1210,6 +1216,26 @@ class BidiClient
}
private:
void MaybeAsyncWrite(bool first_write_async) {
if (first_write_async) {
// Make sure that we have a write to issue.
// TODO(vjpai): Make this work with 0 writes case as well.
assert(msgs_to_send_ >= 1);
AddHold();
async_write_thread_ = std::thread([this] {
std::unique_lock<std::mutex> lock(async_write_thread_mu_);
async_write_thread_cv_.wait(
lock, [this] { return async_write_thread_start_; });
MaybeWrite();
});
std::lock_guard<std::mutex> lock(async_write_thread_mu_);
async_write_thread_start_ = true;
async_write_thread_cv_.notify_one();
return;
}
MaybeWrite();
}
void MaybeWrite() {
if (client_cancel_.cancel &&
writes_complete_ == client_cancel_.ops_before_cancel) {
@ -1231,13 +1257,57 @@ class BidiClient
std::mutex mu_;
std::condition_variable cv_;
bool done_ = false;
std::thread async_write_thread_;
bool async_write_thread_start_ = false;
std::mutex async_write_thread_mu_;
std::condition_variable async_write_thread_cv_;
};
TEST_P(ClientCallbackEnd2endTest, BidiStream) {
MAYBE_SKIP_TEST;
ResetStub();
BidiClient test{stub_.get(), DO_NOT_CANCEL,
kServerDefaultResponseStreamsToSend};
BidiClient test(stub_.get(), DO_NOT_CANCEL,
kServerDefaultResponseStreamsToSend,
/*cork_metadata=*/false, /*first_write_async=*/false);
test.Await();
// Make sure that the server interceptors were not notified of a cancel
if (GetParam().use_interceptors) {
EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel());
}
}
TEST_P(ClientCallbackEnd2endTest, BidiStreamFirstWriteAsync) {
MAYBE_SKIP_TEST;
ResetStub();
BidiClient test(stub_.get(), DO_NOT_CANCEL,
kServerDefaultResponseStreamsToSend,
/*cork_metadata=*/false, /*first_write_async=*/true);
test.Await();
// Make sure that the server interceptors were not notified of a cancel
if (GetParam().use_interceptors) {
EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel());
}
}
TEST_P(ClientCallbackEnd2endTest, BidiStreamCorked) {
MAYBE_SKIP_TEST;
ResetStub();
BidiClient test(stub_.get(), DO_NOT_CANCEL,
kServerDefaultResponseStreamsToSend,
/*cork_metadata=*/true, /*first_write_async=*/false);
test.Await();
// Make sure that the server interceptors were not notified of a cancel
if (GetParam().use_interceptors) {
EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel());
}
}
TEST_P(ClientCallbackEnd2endTest, BidiStreamCorkedFirstWriteAsync) {
MAYBE_SKIP_TEST;
ResetStub();
BidiClient test(stub_.get(), DO_NOT_CANCEL,
kServerDefaultResponseStreamsToSend,
/*cork_metadata=*/true, /*first_write_async=*/true);
test.Await();
// Make sure that the server interceptors were not notified of a cancel
if (GetParam().use_interceptors) {
@ -1248,8 +1318,10 @@ TEST_P(ClientCallbackEnd2endTest, BidiStream) {
TEST_P(ClientCallbackEnd2endTest, ClientCancelsBidiStream) {
MAYBE_SKIP_TEST;
ResetStub();
BidiClient test{stub_.get(), DO_NOT_CANCEL,
kServerDefaultResponseStreamsToSend, ClientCancelInfo{2}};
BidiClient test(stub_.get(), DO_NOT_CANCEL,
kServerDefaultResponseStreamsToSend,
/*cork_metadata=*/false, /*first_write_async=*/false,
ClientCancelInfo(2));
test.Await();
// Make sure that the server interceptors were notified of a cancel
if (GetParam().use_interceptors) {
@ -1261,7 +1333,8 @@ TEST_P(ClientCallbackEnd2endTest, ClientCancelsBidiStream) {
TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelBefore) {
MAYBE_SKIP_TEST;
ResetStub();
BidiClient test{stub_.get(), CANCEL_BEFORE_PROCESSING, 2};
BidiClient test(stub_.get(), CANCEL_BEFORE_PROCESSING, /*num_msgs_to_send=*/2,
/*cork_metadata=*/false, /*first_write_async=*/false);
test.Await();
// Make sure that the server interceptors were notified
if (GetParam().use_interceptors) {
@ -1274,7 +1347,9 @@ TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelBefore) {
TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelDuring) {
MAYBE_SKIP_TEST;
ResetStub();
BidiClient test{stub_.get(), CANCEL_DURING_PROCESSING, 10};
BidiClient test(stub_.get(), CANCEL_DURING_PROCESSING,
/*num_msgs_to_send=*/10, /*cork_metadata=*/false,
/*first_write_async=*/false);
test.Await();
// Make sure that the server interceptors were notified
if (GetParam().use_interceptors) {
@ -1287,7 +1362,8 @@ TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelDuring) {
TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelAfter) {
MAYBE_SKIP_TEST;
ResetStub();
BidiClient test{stub_.get(), CANCEL_AFTER_PROCESSING, 5};
BidiClient test(stub_.get(), CANCEL_AFTER_PROCESSING, /*num_msgs_to_send=*/5,
/*cork_metadata=*/false, /*first_write_async=*/false);
test.Await();
// Make sure that the server interceptors were notified
if (GetParam().use_interceptors) {
@ -1402,6 +1478,12 @@ TEST_P(ClientCallbackEnd2endTest,
done_cv_.wait(l);
}
}
// RemoveHold under the same lock used for OnDone to make sure that we don't
// call OnDone directly or indirectly from the RemoveHold function.
void RemoveHoldUnderLock() {
std::unique_lock<std::mutex> l(mu_);
RemoveHold();
}
const Status& status() {
std::unique_lock<std::mutex> l(mu_);
return status_;
@ -1446,7 +1528,7 @@ TEST_P(ClientCallbackEnd2endTest,
++reads_complete;
}
}
client.RemoveHold();
client.RemoveHoldUnderLock();
client.Await();
EXPECT_EQ(kServerDefaultResponseStreamsToSend, reads_complete);

@ -1920,6 +1920,7 @@ src/core/tsi/transport_security_grpc.h \
src/core/tsi/transport_security_interface.h \
src/cpp/README.md \
src/cpp/client/channel_cc.cc \
src/cpp/client/client_callback.cc \
src/cpp/client/client_context.cc \
src/cpp/client/client_interceptor.cc \
src/cpp/client/create_channel.cc \

Loading…
Cancel
Save