Revert "Revert "Fix StartCall: make corking work and allow concurrent Start*""

pull/23230/head^2
Vijay Pai 5 years ago committed by GitHub
parent a2a1ca3281
commit 278671072f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 333
      include/grpcpp/impl/codegen/client_callback_impl.h
  2. 110
      test/cpp/end2end/client_callback_end2end_test.cc

@ -461,76 +461,51 @@ class ClientCallbackReaderWriterImpl
// 1. Send initial metadata (unless corked) + recv initial metadata // 1. Send initial metadata (unless corked) + recv initial metadata
// 2. Any read backlog // 2. Any read backlog
// 3. Any write backlog // 3. Any write backlog
// 4. Recv trailing metadata, on_completion callback // 4. Recv trailing metadata (unless corked)
started_ = true;
start_tag_.Set(call_.call(),
[this](bool ok) {
reactor_->OnReadInitialMetadataDone(ok);
MaybeFinish();
},
&start_ops_, /*can_inline=*/false);
if (!start_corked_) { if (!start_corked_) {
start_ops_.SendInitialMetadata(&context_->send_initial_metadata_, start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
context_->initial_metadata_flags()); context_->initial_metadata_flags());
} }
start_ops_.RecvInitialMetadata(context_);
start_ops_.set_core_cq_tag(&start_tag_);
call_.PerformOps(&start_ops_); call_.PerformOps(&start_ops_);
// Also set up the read and write tags so that they don't have to be set up {
// each time grpc::internal::MutexLock lock(&start_mu_);
write_tag_.Set(call_.call(),
[this](bool ok) {
reactor_->OnWriteDone(ok);
MaybeFinish();
},
&write_ops_, /*can_inline=*/false);
write_ops_.set_core_cq_tag(&write_tag_);
read_tag_.Set(call_.call(), if (backlog_.read_ops) {
[this](bool ok) {
reactor_->OnReadDone(ok);
MaybeFinish();
},
&read_ops_, /*can_inline=*/false);
read_ops_.set_core_cq_tag(&read_tag_);
if (read_ops_at_start_) {
call_.PerformOps(&read_ops_); call_.PerformOps(&read_ops_);
} }
if (backlog_.write_ops) {
if (write_ops_at_start_) {
call_.PerformOps(&write_ops_); call_.PerformOps(&write_ops_);
} }
if (backlog_.writes_done_ops) {
if (writes_done_ops_at_start_) {
call_.PerformOps(&writes_done_ops_); call_.PerformOps(&writes_done_ops_);
} }
finish_tag_.Set(call_.call(), [this](bool /*ok*/) { MaybeFinish(); },
&finish_ops_, /*can_inline=*/false);
finish_ops_.ClientRecvStatus(context_, &finish_status_);
finish_ops_.set_core_cq_tag(&finish_tag_);
call_.PerformOps(&finish_ops_); call_.PerformOps(&finish_ops_);
// The last thing in this critical section is to set started_ so that it
// can be used lock-free as well.
started_.store(true, std::memory_order_release);
}
// MaybeFinish outside the lock to make sure that destruction of this object
// doesn't take place while holding the lock (which would cause the lock to
// be released after destruction)
this->MaybeFinish();
} }
void Read(Response* msg) override { void Read(Response* msg) override {
read_ops_.RecvMessage(msg); read_ops_.RecvMessage(msg);
callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
if (started_) { if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
call_.PerformOps(&read_ops_); grpc::internal::MutexLock lock(&start_mu_);
} else { if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
read_ops_at_start_ = true; backlog_.read_ops = true;
return;
} }
} }
call_.PerformOps(&read_ops_);
void Write(const Request* msg, ::grpc::WriteOptions options) override {
if (start_corked_) {
write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
context_->initial_metadata_flags());
start_corked_ = false;
} }
void Write(const Request* msg, ::grpc::WriteOptions options) override {
if (options.is_last_message()) { if (options.is_last_message()) {
options.set_buffer_hint(); options.set_buffer_hint();
write_ops_.ClientSendClose(); write_ops_.ClientSendClose();
@ -538,18 +513,22 @@ class ClientCallbackReaderWriterImpl
// TODO(vjpai): don't assert // TODO(vjpai): don't assert
GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(msg, options).ok()); GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(msg, options).ok());
callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
if (started_) { if (GPR_UNLIKELY(corked_write_needed_)) {
call_.PerformOps(&write_ops_); write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
} else { context_->initial_metadata_flags());
write_ops_at_start_ = true; corked_write_needed_ = false;
} }
if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
grpc::internal::MutexLock lock(&start_mu_);
if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
backlog_.write_ops = true;
return;
} }
void WritesDone() override {
if (start_corked_) {
writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
context_->initial_metadata_flags());
start_corked_ = false;
} }
call_.PerformOps(&write_ops_);
}
void WritesDone() override {
writes_done_ops_.ClientSendClose(); writes_done_ops_.ClientSendClose();
writes_done_tag_.Set(call_.call(), writes_done_tag_.Set(call_.call(),
[this](bool ok) { [this](bool ok) {
@ -559,11 +538,19 @@ class ClientCallbackReaderWriterImpl
&writes_done_ops_, /*can_inline=*/false); &writes_done_ops_, /*can_inline=*/false);
writes_done_ops_.set_core_cq_tag(&writes_done_tag_); writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
if (started_) { if (GPR_UNLIKELY(corked_write_needed_)) {
call_.PerformOps(&writes_done_ops_); writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
} else { context_->initial_metadata_flags());
writes_done_ops_at_start_ = true; corked_write_needed_ = false;
}
if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
grpc::internal::MutexLock lock(&start_mu_);
if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
backlog_.writes_done_ops = true;
return;
}
} }
call_.PerformOps(&writes_done_ops_);
} }
void AddHold(int holds) override { void AddHold(int holds) override {
@ -580,8 +567,42 @@ class ClientCallbackReaderWriterImpl
: context_(context), : context_(context),
call_(call), call_(call),
reactor_(reactor), reactor_(reactor),
start_corked_(context_->initial_metadata_corked_) { start_corked_(context_->initial_metadata_corked_),
corked_write_needed_(start_corked_) {
this->BindReactor(reactor); this->BindReactor(reactor);
// Set up the unchanging parts of the start, read, and write tags and ops.
start_tag_.Set(call_.call(),
[this](bool ok) {
reactor_->OnReadInitialMetadataDone(ok);
MaybeFinish();
},
&start_ops_, /*can_inline=*/false);
start_ops_.RecvInitialMetadata(context_);
start_ops_.set_core_cq_tag(&start_tag_);
write_tag_.Set(call_.call(),
[this](bool ok) {
reactor_->OnWriteDone(ok);
MaybeFinish();
},
&write_ops_, /*can_inline=*/false);
write_ops_.set_core_cq_tag(&write_tag_);
read_tag_.Set(call_.call(),
[this](bool ok) {
reactor_->OnReadDone(ok);
MaybeFinish();
},
&read_ops_, /*can_inline=*/false);
read_ops_.set_core_cq_tag(&read_tag_);
// Also set up the Finish tag and op set.
finish_tag_.Set(call_.call(), [this](bool /*ok*/) { MaybeFinish(); },
&finish_ops_,
/*can_inline=*/false);
finish_ops_.ClientRecvStatus(context_, &finish_status_);
finish_ops_.set_core_cq_tag(&finish_tag_);
} }
::grpc_impl::ClientContext* const context_; ::grpc_impl::ClientContext* const context_;
@ -592,7 +613,9 @@ class ClientCallbackReaderWriterImpl
grpc::internal::CallOpRecvInitialMetadata> grpc::internal::CallOpRecvInitialMetadata>
start_ops_; start_ops_;
grpc::internal::CallbackWithSuccessTag start_tag_; grpc::internal::CallbackWithSuccessTag start_tag_;
bool start_corked_; const bool start_corked_;
bool corked_write_needed_; // no lock needed since only accessed in
// Write/WritesDone which cannot be concurrent
grpc::internal::CallOpSet<grpc::internal::CallOpClientRecvStatus> finish_ops_; grpc::internal::CallOpSet<grpc::internal::CallOpClientRecvStatus> finish_ops_;
grpc::internal::CallbackWithSuccessTag finish_tag_; grpc::internal::CallbackWithSuccessTag finish_tag_;
@ -603,22 +626,27 @@ class ClientCallbackReaderWriterImpl
grpc::internal::CallOpClientSendClose> grpc::internal::CallOpClientSendClose>
write_ops_; write_ops_;
grpc::internal::CallbackWithSuccessTag write_tag_; grpc::internal::CallbackWithSuccessTag write_tag_;
bool write_ops_at_start_{false};
grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
grpc::internal::CallOpClientSendClose> grpc::internal::CallOpClientSendClose>
writes_done_ops_; writes_done_ops_;
grpc::internal::CallbackWithSuccessTag writes_done_tag_; grpc::internal::CallbackWithSuccessTag writes_done_tag_;
bool writes_done_ops_at_start_{false};
grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<Response>> grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<Response>>
read_ops_; read_ops_;
grpc::internal::CallbackWithSuccessTag read_tag_; grpc::internal::CallbackWithSuccessTag read_tag_;
bool read_ops_at_start_{false};
// Minimum of 2 callbacks to pre-register for start and finish struct StartCallBacklog {
std::atomic<intptr_t> callbacks_outstanding_{2}; bool write_ops = false;
bool started_{false}; bool writes_done_ops = false;
bool read_ops = false;
};
StartCallBacklog backlog_ /* GUARDED_BY(start_mu_) */;
// Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish
std::atomic<intptr_t> callbacks_outstanding_{3};
std::atomic_bool started_{false};
grpc::internal::Mutex start_mu_;
}; };
template <class Request, class Response> template <class Request, class Response>
@ -670,8 +698,7 @@ class ClientCallbackReaderImpl : public ClientCallbackReader<Response> {
// This call initiates two batches, plus any backlog, each with a callback // This call initiates two batches, plus any backlog, each with a callback
// 1. Send initial metadata (unless corked) + recv initial metadata // 1. Send initial metadata (unless corked) + recv initial metadata
// 2. Any backlog // 2. Any backlog
// 3. Recv trailing metadata, on_completion callback // 3. Recv trailing metadata
started_ = true;
start_tag_.Set(call_.call(), start_tag_.Set(call_.call(),
[this](bool ok) { [this](bool ok) {
@ -693,9 +720,14 @@ class ClientCallbackReaderImpl : public ClientCallbackReader<Response> {
}, },
&read_ops_, /*can_inline=*/false); &read_ops_, /*can_inline=*/false);
read_ops_.set_core_cq_tag(&read_tag_); read_ops_.set_core_cq_tag(&read_tag_);
if (read_ops_at_start_) {
{
grpc::internal::MutexLock lock(&start_mu_);
if (backlog_.read_ops) {
call_.PerformOps(&read_ops_); call_.PerformOps(&read_ops_);
} }
started_.store(true, std::memory_order_release);
}
finish_tag_.Set(call_.call(), [this](bool /*ok*/) { MaybeFinish(); }, finish_tag_.Set(call_.call(), [this](bool /*ok*/) { MaybeFinish(); },
&finish_ops_, /*can_inline=*/false); &finish_ops_, /*can_inline=*/false);
@ -707,12 +739,15 @@ class ClientCallbackReaderImpl : public ClientCallbackReader<Response> {
void Read(Response* msg) override { void Read(Response* msg) override {
read_ops_.RecvMessage(msg); read_ops_.RecvMessage(msg);
callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
if (started_) { if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
call_.PerformOps(&read_ops_); grpc::internal::MutexLock lock(&start_mu_);
} else { if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
read_ops_at_start_ = true; backlog_.read_ops = true;
return;
} }
} }
call_.PerformOps(&read_ops_);
}
void AddHold(int holds) override { void AddHold(int holds) override {
callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed); callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
@ -752,11 +787,16 @@ class ClientCallbackReaderImpl : public ClientCallbackReader<Response> {
grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<Response>> grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<Response>>
read_ops_; read_ops_;
grpc::internal::CallbackWithSuccessTag read_tag_; grpc::internal::CallbackWithSuccessTag read_tag_;
bool read_ops_at_start_{false};
struct StartCallBacklog {
bool read_ops = false;
};
StartCallBacklog backlog_ /* GUARDED_BY(start_mu_) */;
// Minimum of 2 callbacks to pre-register for start and finish // Minimum of 2 callbacks to pre-register for start and finish
std::atomic<intptr_t> callbacks_outstanding_{2}; std::atomic<intptr_t> callbacks_outstanding_{2};
bool started_{false}; std::atomic_bool started_{false};
grpc::internal::Mutex start_mu_;
}; };
template <class Response> template <class Response>
@ -809,74 +849,60 @@ class ClientCallbackWriterImpl : public ClientCallbackWriter<Request> {
// This call initiates two batches, plus any backlog, each with a callback // This call initiates two batches, plus any backlog, each with a callback
// 1. Send initial metadata (unless corked) + recv initial metadata // 1. Send initial metadata (unless corked) + recv initial metadata
// 2. Any backlog // 2. Any backlog
// 3. Recv trailing metadata, on_completion callback // 3. Recv trailing metadata
started_ = true;
start_tag_.Set(call_.call(),
[this](bool ok) {
reactor_->OnReadInitialMetadataDone(ok);
MaybeFinish();
},
&start_ops_, /*can_inline=*/false);
if (!start_corked_) { if (!start_corked_) {
start_ops_.SendInitialMetadata(&context_->send_initial_metadata_, start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
context_->initial_metadata_flags()); context_->initial_metadata_flags());
} }
start_ops_.RecvInitialMetadata(context_);
start_ops_.set_core_cq_tag(&start_tag_);
call_.PerformOps(&start_ops_); call_.PerformOps(&start_ops_);
// Also set up the read and write tags so that they don't have to be set up {
// each time grpc::internal::MutexLock lock(&start_mu_);
write_tag_.Set(call_.call(),
[this](bool ok) {
reactor_->OnWriteDone(ok);
MaybeFinish();
},
&write_ops_, /*can_inline=*/false);
write_ops_.set_core_cq_tag(&write_tag_);
if (write_ops_at_start_) { if (backlog_.write_ops) {
call_.PerformOps(&write_ops_); call_.PerformOps(&write_ops_);
} }
if (backlog_.writes_done_ops) {
if (writes_done_ops_at_start_) {
call_.PerformOps(&writes_done_ops_); call_.PerformOps(&writes_done_ops_);
} }
finish_tag_.Set(call_.call(), [this](bool /*ok*/) { MaybeFinish(); },
&finish_ops_, /*can_inline=*/false);
finish_ops_.ClientRecvStatus(context_, &finish_status_);
finish_ops_.set_core_cq_tag(&finish_tag_);
call_.PerformOps(&finish_ops_); call_.PerformOps(&finish_ops_);
// The last thing in this critical section is to set started_ so that it
// can be used lock-free as well.
started_.store(true, std::memory_order_release);
} }
// MaybeFinish outside the lock to make sure that destruction of this object
void Write(const Request* msg, ::grpc::WriteOptions options) override { // doesn't take place while holding the lock (which would cause the lock to
if (start_corked_) { // be released after destruction)
write_ops_.SendInitialMetadata(&context_->send_initial_metadata_, this->MaybeFinish();
context_->initial_metadata_flags());
start_corked_ = false;
} }
if (options.is_last_message()) { void Write(const Request* msg, ::grpc::WriteOptions options) override {
if (GPR_UNLIKELY(options.is_last_message())) {
options.set_buffer_hint(); options.set_buffer_hint();
write_ops_.ClientSendClose(); write_ops_.ClientSendClose();
} }
// TODO(vjpai): don't assert // TODO(vjpai): don't assert
GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(msg, options).ok()); GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(msg, options).ok());
callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
if (started_) {
call_.PerformOps(&write_ops_); if (GPR_UNLIKELY(corked_write_needed_)) {
} else { write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
write_ops_at_start_ = true; context_->initial_metadata_flags());
corked_write_needed_ = false;
} }
if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
grpc::internal::MutexLock lock(&start_mu_);
if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
backlog_.write_ops = true;
return;
} }
void WritesDone() override {
if (start_corked_) {
writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
context_->initial_metadata_flags());
start_corked_ = false;
} }
call_.PerformOps(&write_ops_);
}
void WritesDone() override {
writes_done_ops_.ClientSendClose(); writes_done_ops_.ClientSendClose();
writes_done_tag_.Set(call_.call(), writes_done_tag_.Set(call_.call(),
[this](bool ok) { [this](bool ok) {
@ -886,12 +912,22 @@ class ClientCallbackWriterImpl : public ClientCallbackWriter<Request> {
&writes_done_ops_, /*can_inline=*/false); &writes_done_ops_, /*can_inline=*/false);
writes_done_ops_.set_core_cq_tag(&writes_done_tag_); writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
if (started_) {
call_.PerformOps(&writes_done_ops_); if (GPR_UNLIKELY(corked_write_needed_)) {
} else { writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
writes_done_ops_at_start_ = true; context_->initial_metadata_flags());
corked_write_needed_ = false;
}
if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
grpc::internal::MutexLock lock(&start_mu_);
if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
backlog_.writes_done_ops = true;
return;
} }
} }
call_.PerformOps(&writes_done_ops_);
}
void AddHold(int holds) override { void AddHold(int holds) override {
callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed); callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
@ -909,10 +945,36 @@ class ClientCallbackWriterImpl : public ClientCallbackWriter<Request> {
: context_(context), : context_(context),
call_(call), call_(call),
reactor_(reactor), reactor_(reactor),
start_corked_(context_->initial_metadata_corked_) { start_corked_(context_->initial_metadata_corked_),
corked_write_needed_(start_corked_) {
this->BindReactor(reactor); this->BindReactor(reactor);
// Set up the unchanging parts of the start and write tags and ops.
start_tag_.Set(call_.call(),
[this](bool ok) {
reactor_->OnReadInitialMetadataDone(ok);
MaybeFinish();
},
&start_ops_, /*can_inline=*/false);
start_ops_.RecvInitialMetadata(context_);
start_ops_.set_core_cq_tag(&start_tag_);
write_tag_.Set(call_.call(),
[this](bool ok) {
reactor_->OnWriteDone(ok);
MaybeFinish();
},
&write_ops_, /*can_inline=*/false);
write_ops_.set_core_cq_tag(&write_tag_);
// Also set up the Finish tag and op set.
finish_ops_.RecvMessage(response); finish_ops_.RecvMessage(response);
finish_ops_.AllowNoMessage(); finish_ops_.AllowNoMessage();
finish_tag_.Set(call_.call(), [this](bool /*ok*/) { MaybeFinish(); },
&finish_ops_,
/*can_inline=*/false);
finish_ops_.ClientRecvStatus(context_, &finish_status_);
finish_ops_.set_core_cq_tag(&finish_tag_);
} }
::grpc_impl::ClientContext* const context_; ::grpc_impl::ClientContext* const context_;
@ -923,7 +985,9 @@ class ClientCallbackWriterImpl : public ClientCallbackWriter<Request> {
grpc::internal::CallOpRecvInitialMetadata> grpc::internal::CallOpRecvInitialMetadata>
start_ops_; start_ops_;
grpc::internal::CallbackWithSuccessTag start_tag_; grpc::internal::CallbackWithSuccessTag start_tag_;
bool start_corked_; const bool start_corked_;
bool corked_write_needed_; // no lock needed since only accessed in
// Write/WritesDone which cannot be concurrent
grpc::internal::CallOpSet<grpc::internal::CallOpGenericRecvMessage, grpc::internal::CallOpSet<grpc::internal::CallOpGenericRecvMessage,
grpc::internal::CallOpClientRecvStatus> grpc::internal::CallOpClientRecvStatus>
@ -936,17 +1000,22 @@ class ClientCallbackWriterImpl : public ClientCallbackWriter<Request> {
grpc::internal::CallOpClientSendClose> grpc::internal::CallOpClientSendClose>
write_ops_; write_ops_;
grpc::internal::CallbackWithSuccessTag write_tag_; grpc::internal::CallbackWithSuccessTag write_tag_;
bool write_ops_at_start_{false};
grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
grpc::internal::CallOpClientSendClose> grpc::internal::CallOpClientSendClose>
writes_done_ops_; writes_done_ops_;
grpc::internal::CallbackWithSuccessTag writes_done_tag_; grpc::internal::CallbackWithSuccessTag writes_done_tag_;
bool writes_done_ops_at_start_{false};
// Minimum of 2 callbacks to pre-register for start and finish struct StartCallBacklog {
std::atomic<intptr_t> callbacks_outstanding_{2}; bool write_ops = false;
bool started_{false}; bool writes_done_ops = false;
};
StartCallBacklog backlog_ /* GUARDED_BY(start_mu_) */;
// Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish
std::atomic<intptr_t> callbacks_outstanding_{3};
std::atomic_bool started_{false};
grpc::internal::Mutex start_mu_;
}; };
template <class Request> template <class Request>
@ -985,7 +1054,6 @@ class ClientCallbackUnaryImpl final : public ClientCallbackUnary {
// This call initiates two batches, each with a callback // This call initiates two batches, each with a callback
// 1. Send initial metadata + write + writes done + recv initial metadata // 1. Send initial metadata + write + writes done + recv initial metadata
// 2. Read message, recv trailing metadata // 2. Read message, recv trailing metadata
started_ = true;
start_tag_.Set(call_.call(), start_tag_.Set(call_.call(),
[this](bool ok) { [this](bool ok) {
@ -1053,7 +1121,6 @@ class ClientCallbackUnaryImpl final : public ClientCallbackUnary {
// This call will have 2 callbacks: start and finish // This call will have 2 callbacks: start and finish
std::atomic<intptr_t> callbacks_outstanding_{2}; std::atomic<intptr_t> callbacks_outstanding_{2};
bool started_{false};
}; };
class ClientCallbackUnaryFactory { class ClientCallbackUnaryFactory {

@ -16,12 +16,6 @@
* *
*/ */
#include <algorithm>
#include <functional>
#include <mutex>
#include <sstream>
#include <thread>
#include <grpcpp/channel.h> #include <grpcpp/channel.h>
#include <grpcpp/client_context.h> #include <grpcpp/client_context.h>
#include <grpcpp/create_channel.h> #include <grpcpp/create_channel.h>
@ -31,6 +25,14 @@
#include <grpcpp/server_builder.h> #include <grpcpp/server_builder.h>
#include <grpcpp/server_context.h> #include <grpcpp/server_context.h>
#include <grpcpp/support/client_callback.h> #include <grpcpp/support/client_callback.h>
#include <gtest/gtest.h>
#include <algorithm>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <sstream>
#include <thread>
#include "src/core/lib/gpr/env.h" #include "src/core/lib/gpr/env.h"
#include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/iomgr/iomgr.h"
@ -43,8 +45,6 @@
#include "test/cpp/util/string_ref_helper.h" #include "test/cpp/util/string_ref_helper.h"
#include "test/cpp/util/test_credentials_provider.h" #include "test/cpp/util/test_credentials_provider.h"
#include <gtest/gtest.h>
// MAYBE_SKIP_TEST is a macro to determine if this particular test configuration // MAYBE_SKIP_TEST is a macro to determine if this particular test configuration
// should be skipped based on a decision made at SetUp time. In particular, any // should be skipped based on a decision made at SetUp time. In particular, any
// callback tests can only be run if the iomgr can run in the background or if // callback tests can only be run if the iomgr can run in the background or if
@ -1114,7 +1114,8 @@ class BidiClient
public: public:
BidiClient(grpc::testing::EchoTestService::Stub* stub, BidiClient(grpc::testing::EchoTestService::Stub* stub,
ServerTryCancelRequestPhase server_try_cancel, ServerTryCancelRequestPhase server_try_cancel,
int num_msgs_to_send, ClientCancelInfo client_cancel = {}) int num_msgs_to_send, bool cork_metadata, bool first_write_async,
ClientCancelInfo client_cancel = {})
: server_try_cancel_(server_try_cancel), : server_try_cancel_(server_try_cancel),
msgs_to_send_{num_msgs_to_send}, msgs_to_send_{num_msgs_to_send},
client_cancel_{client_cancel} { client_cancel_{client_cancel} {
@ -1124,8 +1125,9 @@ class BidiClient
grpc::to_string(server_try_cancel)); grpc::to_string(server_try_cancel));
} }
request_.set_message("Hello fren "); request_.set_message("Hello fren ");
context_.set_initial_metadata_corked(cork_metadata);
stub->experimental_async()->BidiStream(&context_, this); stub->experimental_async()->BidiStream(&context_, this);
MaybeWrite(); MaybeAsyncWrite(first_write_async);
StartRead(&response_); StartRead(&response_);
StartCall(); StartCall();
} }
@ -1146,6 +1148,10 @@ class BidiClient
} }
} }
void OnWriteDone(bool ok) override { void OnWriteDone(bool ok) override {
if (async_write_thread_.joinable()) {
async_write_thread_.join();
RemoveHold();
}
if (server_try_cancel_ == DO_NOT_CANCEL) { if (server_try_cancel_ == DO_NOT_CANCEL) {
EXPECT_TRUE(ok); EXPECT_TRUE(ok);
} else if (!ok) { } else if (!ok) {
@ -1210,6 +1216,26 @@ class BidiClient
} }
private: private:
void MaybeAsyncWrite(bool first_write_async) {
if (first_write_async) {
// Make sure that we have a write to issue.
// TODO(vjpai): Make this work with 0 writes case as well.
assert(msgs_to_send_ >= 1);
AddHold();
async_write_thread_ = std::thread([this] {
std::unique_lock<std::mutex> lock(async_write_thread_mu_);
async_write_thread_cv_.wait(
lock, [this] { return async_write_thread_start_; });
MaybeWrite();
});
std::lock_guard<std::mutex> lock(async_write_thread_mu_);
async_write_thread_start_ = true;
async_write_thread_cv_.notify_one();
return;
}
MaybeWrite();
}
void MaybeWrite() { void MaybeWrite() {
if (client_cancel_.cancel && if (client_cancel_.cancel &&
writes_complete_ == client_cancel_.ops_before_cancel) { writes_complete_ == client_cancel_.ops_before_cancel) {
@ -1231,13 +1257,57 @@ class BidiClient
std::mutex mu_; std::mutex mu_;
std::condition_variable cv_; std::condition_variable cv_;
bool done_ = false; bool done_ = false;
std::thread async_write_thread_;
bool async_write_thread_start_ = false;
std::mutex async_write_thread_mu_;
std::condition_variable async_write_thread_cv_;
}; };
TEST_P(ClientCallbackEnd2endTest, BidiStream) { TEST_P(ClientCallbackEnd2endTest, BidiStream) {
MAYBE_SKIP_TEST; MAYBE_SKIP_TEST;
ResetStub(); ResetStub();
BidiClient test{stub_.get(), DO_NOT_CANCEL, BidiClient test(stub_.get(), DO_NOT_CANCEL,
kServerDefaultResponseStreamsToSend}; kServerDefaultResponseStreamsToSend,
/*cork_metadata=*/false, /*first_write_async=*/false);
test.Await();
// Make sure that the server interceptors were not notified of a cancel
if (GetParam().use_interceptors) {
EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel());
}
}
TEST_P(ClientCallbackEnd2endTest, BidiStreamFirstWriteAsync) {
MAYBE_SKIP_TEST;
ResetStub();
BidiClient test(stub_.get(), DO_NOT_CANCEL,
kServerDefaultResponseStreamsToSend,
/*cork_metadata=*/false, /*first_write_async=*/true);
test.Await();
// Make sure that the server interceptors were not notified of a cancel
if (GetParam().use_interceptors) {
EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel());
}
}
TEST_P(ClientCallbackEnd2endTest, BidiStreamCorked) {
MAYBE_SKIP_TEST;
ResetStub();
BidiClient test(stub_.get(), DO_NOT_CANCEL,
kServerDefaultResponseStreamsToSend,
/*cork_metadata=*/true, /*first_write_async=*/false);
test.Await();
// Make sure that the server interceptors were not notified of a cancel
if (GetParam().use_interceptors) {
EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel());
}
}
TEST_P(ClientCallbackEnd2endTest, BidiStreamCorkedFirstWriteAsync) {
MAYBE_SKIP_TEST;
ResetStub();
BidiClient test(stub_.get(), DO_NOT_CANCEL,
kServerDefaultResponseStreamsToSend,
/*cork_metadata=*/true, /*first_write_async=*/true);
test.Await(); test.Await();
// Make sure that the server interceptors were not notified of a cancel // Make sure that the server interceptors were not notified of a cancel
if (GetParam().use_interceptors) { if (GetParam().use_interceptors) {
@ -1248,8 +1318,10 @@ TEST_P(ClientCallbackEnd2endTest, BidiStream) {
TEST_P(ClientCallbackEnd2endTest, ClientCancelsBidiStream) { TEST_P(ClientCallbackEnd2endTest, ClientCancelsBidiStream) {
MAYBE_SKIP_TEST; MAYBE_SKIP_TEST;
ResetStub(); ResetStub();
BidiClient test{stub_.get(), DO_NOT_CANCEL, BidiClient test(stub_.get(), DO_NOT_CANCEL,
kServerDefaultResponseStreamsToSend, ClientCancelInfo{2}}; kServerDefaultResponseStreamsToSend,
/*cork_metadata=*/false, /*first_write_async=*/false,
ClientCancelInfo(2));
test.Await(); test.Await();
// Make sure that the server interceptors were notified of a cancel // Make sure that the server interceptors were notified of a cancel
if (GetParam().use_interceptors) { if (GetParam().use_interceptors) {
@ -1261,7 +1333,8 @@ TEST_P(ClientCallbackEnd2endTest, ClientCancelsBidiStream) {
TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelBefore) { TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelBefore) {
MAYBE_SKIP_TEST; MAYBE_SKIP_TEST;
ResetStub(); ResetStub();
BidiClient test{stub_.get(), CANCEL_BEFORE_PROCESSING, 2}; BidiClient test(stub_.get(), CANCEL_BEFORE_PROCESSING, /*num_msgs_to_send=*/2,
/*cork_metadata=*/false, /*first_write_async=*/false);
test.Await(); test.Await();
// Make sure that the server interceptors were notified // Make sure that the server interceptors were notified
if (GetParam().use_interceptors) { if (GetParam().use_interceptors) {
@ -1274,7 +1347,9 @@ TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelBefore) {
TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelDuring) { TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelDuring) {
MAYBE_SKIP_TEST; MAYBE_SKIP_TEST;
ResetStub(); ResetStub();
BidiClient test{stub_.get(), CANCEL_DURING_PROCESSING, 10}; BidiClient test(stub_.get(), CANCEL_DURING_PROCESSING,
/*num_msgs_to_send=*/10, /*cork_metadata=*/false,
/*first_write_async=*/false);
test.Await(); test.Await();
// Make sure that the server interceptors were notified // Make sure that the server interceptors were notified
if (GetParam().use_interceptors) { if (GetParam().use_interceptors) {
@ -1287,7 +1362,8 @@ TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelDuring) {
TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelAfter) { TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelAfter) {
MAYBE_SKIP_TEST; MAYBE_SKIP_TEST;
ResetStub(); ResetStub();
BidiClient test{stub_.get(), CANCEL_AFTER_PROCESSING, 5}; BidiClient test(stub_.get(), CANCEL_AFTER_PROCESSING, /*num_msgs_to_send=*/5,
/*cork_metadata=*/false, /*first_write_async=*/false);
test.Await(); test.Await();
// Make sure that the server interceptors were notified // Make sure that the server interceptors were notified
if (GetParam().use_interceptors) { if (GetParam().use_interceptors) {

Loading…
Cancel
Save