diff --git a/include/grpcpp/impl/codegen/client_callback_impl.h b/include/grpcpp/impl/codegen/client_callback_impl.h index 8e683743e06..86a46d8c870 100644 --- a/include/grpcpp/impl/codegen/client_callback_impl.h +++ b/include/grpcpp/impl/codegen/client_callback_impl.h @@ -461,76 +461,51 @@ class ClientCallbackReaderWriterImpl // 1. Send initial metadata (unless corked) + recv initial metadata // 2. Any read backlog // 3. Any write backlog - // 4. Recv trailing metadata, on_completion callback - started_ = true; - - start_tag_.Set(call_.call(), - [this](bool ok) { - reactor_->OnReadInitialMetadataDone(ok); - MaybeFinish(); - }, - &start_ops_, /*can_inline=*/false); + // 4. Recv trailing metadata (unless corked) if (!start_corked_) { start_ops_.SendInitialMetadata(&context_->send_initial_metadata_, context_->initial_metadata_flags()); } - start_ops_.RecvInitialMetadata(context_); - start_ops_.set_core_cq_tag(&start_tag_); - call_.PerformOps(&start_ops_); - - // Also set up the read and write tags so that they don't have to be set up - // each time - write_tag_.Set(call_.call(), - [this](bool ok) { - reactor_->OnWriteDone(ok); - MaybeFinish(); - }, - &write_ops_, /*can_inline=*/false); - write_ops_.set_core_cq_tag(&write_tag_); - - read_tag_.Set(call_.call(), - [this](bool ok) { - reactor_->OnReadDone(ok); - MaybeFinish(); - }, - &read_ops_, /*can_inline=*/false); - read_ops_.set_core_cq_tag(&read_tag_); - if (read_ops_at_start_) { - call_.PerformOps(&read_ops_); - } - if (write_ops_at_start_) { - call_.PerformOps(&write_ops_); - } + call_.PerformOps(&start_ops_); - if (writes_done_ops_at_start_) { - call_.PerformOps(&writes_done_ops_); + { + grpc::internal::MutexLock lock(&start_mu_); + + if (backlog_.read_ops) { + call_.PerformOps(&read_ops_); + } + if (backlog_.write_ops) { + call_.PerformOps(&write_ops_); + } + if (backlog_.writes_done_ops) { + call_.PerformOps(&writes_done_ops_); + } + call_.PerformOps(&finish_ops_); + // The last thing in this critical section is to set started_ so that it + // can be used lock-free as well. + started_.store(true, std::memory_order_release); } - - finish_tag_.Set(call_.call(), [this](bool /*ok*/) { MaybeFinish(); }, - &finish_ops_, /*can_inline=*/false); - finish_ops_.ClientRecvStatus(context_, &finish_status_); - finish_ops_.set_core_cq_tag(&finish_tag_); - call_.PerformOps(&finish_ops_); + // MaybeFinish outside the lock to make sure that destruction of this object + // doesn't take place while holding the lock (which would cause the lock to + // be released after destruction) + this->MaybeFinish(); } void Read(Response* msg) override { read_ops_.RecvMessage(msg); callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); - if (started_) { - call_.PerformOps(&read_ops_); - } else { - read_ops_at_start_ = true; + if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) { + grpc::internal::MutexLock lock(&start_mu_); + if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) { + backlog_.read_ops = true; + return; + } } + call_.PerformOps(&read_ops_); } void Write(const Request* msg, ::grpc::WriteOptions options) override { - if (start_corked_) { - write_ops_.SendInitialMetadata(&context_->send_initial_metadata_, - context_->initial_metadata_flags()); - start_corked_ = false; - } - if (options.is_last_message()) { options.set_buffer_hint(); write_ops_.ClientSendClose(); @@ -538,18 +513,22 @@ class ClientCallbackReaderWriterImpl // TODO(vjpai): don't assert GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(msg, options).ok()); callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); - if (started_) { - call_.PerformOps(&write_ops_); - } else { - write_ops_at_start_ = true; + if (GPR_UNLIKELY(corked_write_needed_)) { + write_ops_.SendInitialMetadata(&context_->send_initial_metadata_, + context_->initial_metadata_flags()); + corked_write_needed_ = false; + } + + if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) { + grpc::internal::MutexLock lock(&start_mu_); + if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) { + backlog_.write_ops = true; + return; + } } + call_.PerformOps(&write_ops_); } void WritesDone() override { - if (start_corked_) { - writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_, - context_->initial_metadata_flags()); - start_corked_ = false; - } writes_done_ops_.ClientSendClose(); writes_done_tag_.Set(call_.call(), [this](bool ok) { @@ -559,11 +538,19 @@ class ClientCallbackReaderWriterImpl &writes_done_ops_, /*can_inline=*/false); writes_done_ops_.set_core_cq_tag(&writes_done_tag_); callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); - if (started_) { - call_.PerformOps(&writes_done_ops_); - } else { - writes_done_ops_at_start_ = true; + if (GPR_UNLIKELY(corked_write_needed_)) { + writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_, + context_->initial_metadata_flags()); + corked_write_needed_ = false; } + if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) { + grpc::internal::MutexLock lock(&start_mu_); + if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) { + backlog_.writes_done_ops = true; + return; + } + } + call_.PerformOps(&writes_done_ops_); } void AddHold(int holds) override { @@ -580,8 +567,42 @@ class ClientCallbackReaderWriterImpl : context_(context), call_(call), reactor_(reactor), - start_corked_(context_->initial_metadata_corked_) { + start_corked_(context_->initial_metadata_corked_), + corked_write_needed_(start_corked_) { this->BindReactor(reactor); + + // Set up the unchanging parts of the start, read, and write tags and ops. + start_tag_.Set(call_.call(), + [this](bool ok) { + reactor_->OnReadInitialMetadataDone(ok); + MaybeFinish(); + }, + &start_ops_, /*can_inline=*/false); + start_ops_.RecvInitialMetadata(context_); + start_ops_.set_core_cq_tag(&start_tag_); + + write_tag_.Set(call_.call(), + [this](bool ok) { + reactor_->OnWriteDone(ok); + MaybeFinish(); + }, + &write_ops_, /*can_inline=*/false); + write_ops_.set_core_cq_tag(&write_tag_); + + read_tag_.Set(call_.call(), + [this](bool ok) { + reactor_->OnReadDone(ok); + MaybeFinish(); + }, + &read_ops_, /*can_inline=*/false); + read_ops_.set_core_cq_tag(&read_tag_); + + // Also set up the Finish tag and op set. + finish_tag_.Set(call_.call(), [this](bool /*ok*/) { MaybeFinish(); }, + &finish_ops_, + /*can_inline=*/false); + finish_ops_.ClientRecvStatus(context_, &finish_status_); + finish_ops_.set_core_cq_tag(&finish_tag_); } ::grpc_impl::ClientContext* const context_; @@ -592,7 +613,9 @@ class ClientCallbackReaderWriterImpl grpc::internal::CallOpRecvInitialMetadata> start_ops_; grpc::internal::CallbackWithSuccessTag start_tag_; - bool start_corked_; + const bool start_corked_; + bool corked_write_needed_; // no lock needed since only accessed in + // Write/WritesDone which cannot be concurrent grpc::internal::CallOpSet finish_ops_; grpc::internal::CallbackWithSuccessTag finish_tag_; @@ -603,22 +626,27 @@ class ClientCallbackReaderWriterImpl grpc::internal::CallOpClientSendClose> write_ops_; grpc::internal::CallbackWithSuccessTag write_tag_; - bool write_ops_at_start_{false}; grpc::internal::CallOpSet writes_done_ops_; grpc::internal::CallbackWithSuccessTag writes_done_tag_; - bool writes_done_ops_at_start_{false}; grpc::internal::CallOpSet> read_ops_; grpc::internal::CallbackWithSuccessTag read_tag_; - bool read_ops_at_start_{false}; - // Minimum of 2 callbacks to pre-register for start and finish - std::atomic callbacks_outstanding_{2}; - bool started_{false}; + struct StartCallBacklog { + bool write_ops = false; + bool writes_done_ops = false; + bool read_ops = false; + }; + StartCallBacklog backlog_ /* GUARDED_BY(start_mu_) */; + + // Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish + std::atomic callbacks_outstanding_{3}; + std::atomic_bool started_{false}; + grpc::internal::Mutex start_mu_; }; template @@ -670,8 +698,7 @@ class ClientCallbackReaderImpl : public ClientCallbackReader { // This call initiates two batches, plus any backlog, each with a callback // 1. Send initial metadata (unless corked) + recv initial metadata // 2. Any backlog - // 3. Recv trailing metadata, on_completion callback - started_ = true; + // 3. Recv trailing metadata start_tag_.Set(call_.call(), [this](bool ok) { @@ -693,8 +720,13 @@ class ClientCallbackReaderImpl : public ClientCallbackReader { }, &read_ops_, /*can_inline=*/false); read_ops_.set_core_cq_tag(&read_tag_); - if (read_ops_at_start_) { - call_.PerformOps(&read_ops_); + + { + grpc::internal::MutexLock lock(&start_mu_); + if (backlog_.read_ops) { + call_.PerformOps(&read_ops_); + } + started_.store(true, std::memory_order_release); } finish_tag_.Set(call_.call(), [this](bool /*ok*/) { MaybeFinish(); }, @@ -707,11 +739,14 @@ class ClientCallbackReaderImpl : public ClientCallbackReader { void Read(Response* msg) override { read_ops_.RecvMessage(msg); callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); - if (started_) { - call_.PerformOps(&read_ops_); - } else { - read_ops_at_start_ = true; + if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) { + grpc::internal::MutexLock lock(&start_mu_); + if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) { + backlog_.read_ops = true; + return; + } } + call_.PerformOps(&read_ops_); } void AddHold(int holds) override { @@ -752,11 +787,16 @@ class ClientCallbackReaderImpl : public ClientCallbackReader { grpc::internal::CallOpSet> read_ops_; grpc::internal::CallbackWithSuccessTag read_tag_; - bool read_ops_at_start_{false}; + + struct StartCallBacklog { + bool read_ops = false; + }; + StartCallBacklog backlog_ /* GUARDED_BY(start_mu_) */; // Minimum of 2 callbacks to pre-register for start and finish std::atomic callbacks_outstanding_{2}; - bool started_{false}; + std::atomic_bool started_{false}; + grpc::internal::Mutex start_mu_; }; template @@ -809,74 +849,60 @@ class ClientCallbackWriterImpl : public ClientCallbackWriter { // This call initiates two batches, plus any backlog, each with a callback // 1. Send initial metadata (unless corked) + recv initial metadata // 2. Any backlog - // 3. Recv trailing metadata, on_completion callback - started_ = true; + // 3. Recv trailing metadata - start_tag_.Set(call_.call(), - [this](bool ok) { - reactor_->OnReadInitialMetadataDone(ok); - MaybeFinish(); - }, - &start_ops_, /*can_inline=*/false); if (!start_corked_) { start_ops_.SendInitialMetadata(&context_->send_initial_metadata_, context_->initial_metadata_flags()); } - start_ops_.RecvInitialMetadata(context_); - start_ops_.set_core_cq_tag(&start_tag_); call_.PerformOps(&start_ops_); - // Also set up the read and write tags so that they don't have to be set up - // each time - write_tag_.Set(call_.call(), - [this](bool ok) { - reactor_->OnWriteDone(ok); - MaybeFinish(); - }, - &write_ops_, /*can_inline=*/false); - write_ops_.set_core_cq_tag(&write_tag_); - - if (write_ops_at_start_) { - call_.PerformOps(&write_ops_); + { + grpc::internal::MutexLock lock(&start_mu_); + + if (backlog_.write_ops) { + call_.PerformOps(&write_ops_); + } + if (backlog_.writes_done_ops) { + call_.PerformOps(&writes_done_ops_); + } + call_.PerformOps(&finish_ops_); + // The last thing in this critical section is to set started_ so that it + // can be used lock-free as well. + started_.store(true, std::memory_order_release); } - - if (writes_done_ops_at_start_) { - call_.PerformOps(&writes_done_ops_); - } - - finish_tag_.Set(call_.call(), [this](bool /*ok*/) { MaybeFinish(); }, - &finish_ops_, /*can_inline=*/false); - finish_ops_.ClientRecvStatus(context_, &finish_status_); - finish_ops_.set_core_cq_tag(&finish_tag_); - call_.PerformOps(&finish_ops_); + // MaybeFinish outside the lock to make sure that destruction of this object + // doesn't take place while holding the lock (which would cause the lock to + // be released after destruction) + this->MaybeFinish(); } void Write(const Request* msg, ::grpc::WriteOptions options) override { - if (start_corked_) { - write_ops_.SendInitialMetadata(&context_->send_initial_metadata_, - context_->initial_metadata_flags()); - start_corked_ = false; - } - - if (options.is_last_message()) { + if (GPR_UNLIKELY(options.is_last_message())) { options.set_buffer_hint(); write_ops_.ClientSendClose(); } // TODO(vjpai): don't assert GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(msg, options).ok()); callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); - if (started_) { - call_.PerformOps(&write_ops_); - } else { - write_ops_at_start_ = true; + + if (GPR_UNLIKELY(corked_write_needed_)) { + write_ops_.SendInitialMetadata(&context_->send_initial_metadata_, + context_->initial_metadata_flags()); + corked_write_needed_ = false; } + + if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) { + grpc::internal::MutexLock lock(&start_mu_); + if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) { + backlog_.write_ops = true; + return; + } + } + call_.PerformOps(&write_ops_); } + void WritesDone() override { - if (start_corked_) { - writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_, - context_->initial_metadata_flags()); - start_corked_ = false; - } writes_done_ops_.ClientSendClose(); writes_done_tag_.Set(call_.call(), [this](bool ok) { @@ -886,11 +912,21 @@ class ClientCallbackWriterImpl : public ClientCallbackWriter { &writes_done_ops_, /*can_inline=*/false); writes_done_ops_.set_core_cq_tag(&writes_done_tag_); callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); - if (started_) { - call_.PerformOps(&writes_done_ops_); - } else { - writes_done_ops_at_start_ = true; + + if (GPR_UNLIKELY(corked_write_needed_)) { + writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_, + context_->initial_metadata_flags()); + corked_write_needed_ = false; + } + + if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) { + grpc::internal::MutexLock lock(&start_mu_); + if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) { + backlog_.writes_done_ops = true; + return; + } } + call_.PerformOps(&writes_done_ops_); } void AddHold(int holds) override { @@ -909,10 +945,36 @@ class ClientCallbackWriterImpl : public ClientCallbackWriter { : context_(context), call_(call), reactor_(reactor), - start_corked_(context_->initial_metadata_corked_) { + start_corked_(context_->initial_metadata_corked_), + corked_write_needed_(start_corked_) { this->BindReactor(reactor); + + // Set up the unchanging parts of the start and write tags and ops. + start_tag_.Set(call_.call(), + [this](bool ok) { + reactor_->OnReadInitialMetadataDone(ok); + MaybeFinish(); + }, + &start_ops_, /*can_inline=*/false); + start_ops_.RecvInitialMetadata(context_); + start_ops_.set_core_cq_tag(&start_tag_); + + write_tag_.Set(call_.call(), + [this](bool ok) { + reactor_->OnWriteDone(ok); + MaybeFinish(); + }, + &write_ops_, /*can_inline=*/false); + write_ops_.set_core_cq_tag(&write_tag_); + + // Also set up the Finish tag and op set. finish_ops_.RecvMessage(response); finish_ops_.AllowNoMessage(); + finish_tag_.Set(call_.call(), [this](bool /*ok*/) { MaybeFinish(); }, + &finish_ops_, + /*can_inline=*/false); + finish_ops_.ClientRecvStatus(context_, &finish_status_); + finish_ops_.set_core_cq_tag(&finish_tag_); } ::grpc_impl::ClientContext* const context_; @@ -923,7 +985,9 @@ class ClientCallbackWriterImpl : public ClientCallbackWriter { grpc::internal::CallOpRecvInitialMetadata> start_ops_; grpc::internal::CallbackWithSuccessTag start_tag_; - bool start_corked_; + const bool start_corked_; + bool corked_write_needed_; // no lock needed since only accessed in + // Write/WritesDone which cannot be concurrent grpc::internal::CallOpSet @@ -936,17 +1000,22 @@ class ClientCallbackWriterImpl : public ClientCallbackWriter { grpc::internal::CallOpClientSendClose> write_ops_; grpc::internal::CallbackWithSuccessTag write_tag_; - bool write_ops_at_start_{false}; grpc::internal::CallOpSet writes_done_ops_; grpc::internal::CallbackWithSuccessTag writes_done_tag_; - bool writes_done_ops_at_start_{false}; - // Minimum of 2 callbacks to pre-register for start and finish - std::atomic callbacks_outstanding_{2}; - bool started_{false}; + struct StartCallBacklog { + bool write_ops = false; + bool writes_done_ops = false; + }; + StartCallBacklog backlog_ /* GUARDED_BY(start_mu_) */; + + // Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish + std::atomic callbacks_outstanding_{3}; + std::atomic_bool started_{false}; + grpc::internal::Mutex start_mu_; }; template @@ -985,7 +1054,6 @@ class ClientCallbackUnaryImpl final : public ClientCallbackUnary { // This call initiates two batches, each with a callback // 1. Send initial metadata + write + writes done + recv initial metadata // 2. Read message, recv trailing metadata - started_ = true; start_tag_.Set(call_.call(), [this](bool ok) { @@ -1053,7 +1121,6 @@ class ClientCallbackUnaryImpl final : public ClientCallbackUnary { // This call will have 2 callbacks: start and finish std::atomic callbacks_outstanding_{2}; - bool started_{false}; }; class ClientCallbackUnaryFactory { diff --git a/test/cpp/end2end/client_callback_end2end_test.cc b/test/cpp/end2end/client_callback_end2end_test.cc index 05aac336789..cd3c14881bd 100644 --- a/test/cpp/end2end/client_callback_end2end_test.cc +++ b/test/cpp/end2end/client_callback_end2end_test.cc @@ -16,12 +16,6 @@ * */ -#include -#include -#include -#include -#include - #include #include #include @@ -31,6 +25,14 @@ #include #include #include +#include + +#include +#include +#include +#include +#include +#include #include "src/core/lib/gpr/env.h" #include "src/core/lib/iomgr/iomgr.h" @@ -43,8 +45,6 @@ #include "test/cpp/util/string_ref_helper.h" #include "test/cpp/util/test_credentials_provider.h" -#include - // MAYBE_SKIP_TEST is a macro to determine if this particular test configuration // should be skipped based on a decision made at SetUp time. In particular, any // callback tests can only be run if the iomgr can run in the background or if @@ -1114,7 +1114,8 @@ class BidiClient public: BidiClient(grpc::testing::EchoTestService::Stub* stub, ServerTryCancelRequestPhase server_try_cancel, - int num_msgs_to_send, ClientCancelInfo client_cancel = {}) + int num_msgs_to_send, bool cork_metadata, bool first_write_async, + ClientCancelInfo client_cancel = {}) : server_try_cancel_(server_try_cancel), msgs_to_send_{num_msgs_to_send}, client_cancel_{client_cancel} { @@ -1124,8 +1125,9 @@ class BidiClient grpc::to_string(server_try_cancel)); } request_.set_message("Hello fren "); + context_.set_initial_metadata_corked(cork_metadata); stub->experimental_async()->BidiStream(&context_, this); - MaybeWrite(); + MaybeAsyncWrite(first_write_async); StartRead(&response_); StartCall(); } @@ -1146,6 +1148,10 @@ class BidiClient } } void OnWriteDone(bool ok) override { + if (async_write_thread_.joinable()) { + async_write_thread_.join(); + RemoveHold(); + } if (server_try_cancel_ == DO_NOT_CANCEL) { EXPECT_TRUE(ok); } else if (!ok) { @@ -1210,6 +1216,26 @@ class BidiClient } private: + void MaybeAsyncWrite(bool first_write_async) { + if (first_write_async) { + // Make sure that we have a write to issue. + // TODO(vjpai): Make this work with 0 writes case as well. + assert(msgs_to_send_ >= 1); + + AddHold(); + async_write_thread_ = std::thread([this] { + std::unique_lock lock(async_write_thread_mu_); + async_write_thread_cv_.wait( + lock, [this] { return async_write_thread_start_; }); + MaybeWrite(); + }); + std::lock_guard lock(async_write_thread_mu_); + async_write_thread_start_ = true; + async_write_thread_cv_.notify_one(); + return; + } + MaybeWrite(); + } void MaybeWrite() { if (client_cancel_.cancel && writes_complete_ == client_cancel_.ops_before_cancel) { @@ -1231,13 +1257,57 @@ class BidiClient std::mutex mu_; std::condition_variable cv_; bool done_ = false; + std::thread async_write_thread_; + bool async_write_thread_start_ = false; + std::mutex async_write_thread_mu_; + std::condition_variable async_write_thread_cv_; }; TEST_P(ClientCallbackEnd2endTest, BidiStream) { MAYBE_SKIP_TEST; ResetStub(); - BidiClient test{stub_.get(), DO_NOT_CANCEL, - kServerDefaultResponseStreamsToSend}; + BidiClient test(stub_.get(), DO_NOT_CANCEL, + kServerDefaultResponseStreamsToSend, + /*cork_metadata=*/false, /*first_write_async=*/false); + test.Await(); + // Make sure that the server interceptors were not notified of a cancel + if (GetParam().use_interceptors) { + EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel()); + } +} + +TEST_P(ClientCallbackEnd2endTest, BidiStreamFirstWriteAsync) { + MAYBE_SKIP_TEST; + ResetStub(); + BidiClient test(stub_.get(), DO_NOT_CANCEL, + kServerDefaultResponseStreamsToSend, + /*cork_metadata=*/false, /*first_write_async=*/true); + test.Await(); + // Make sure that the server interceptors were not notified of a cancel + if (GetParam().use_interceptors) { + EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel()); + } +} + +TEST_P(ClientCallbackEnd2endTest, BidiStreamCorked) { + MAYBE_SKIP_TEST; + ResetStub(); + BidiClient test(stub_.get(), DO_NOT_CANCEL, + kServerDefaultResponseStreamsToSend, + /*cork_metadata=*/true, /*first_write_async=*/false); + test.Await(); + // Make sure that the server interceptors were not notified of a cancel + if (GetParam().use_interceptors) { + EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel()); + } +} + +TEST_P(ClientCallbackEnd2endTest, BidiStreamCorkedFirstWriteAsync) { + MAYBE_SKIP_TEST; + ResetStub(); + BidiClient test(stub_.get(), DO_NOT_CANCEL, + kServerDefaultResponseStreamsToSend, + /*cork_metadata=*/true, /*first_write_async=*/true); test.Await(); // Make sure that the server interceptors were not notified of a cancel if (GetParam().use_interceptors) { @@ -1248,8 +1318,10 @@ TEST_P(ClientCallbackEnd2endTest, BidiStream) { TEST_P(ClientCallbackEnd2endTest, ClientCancelsBidiStream) { MAYBE_SKIP_TEST; ResetStub(); - BidiClient test{stub_.get(), DO_NOT_CANCEL, - kServerDefaultResponseStreamsToSend, ClientCancelInfo{2}}; + BidiClient test(stub_.get(), DO_NOT_CANCEL, + kServerDefaultResponseStreamsToSend, + /*cork_metadata=*/false, /*first_write_async=*/false, + ClientCancelInfo(2)); test.Await(); // Make sure that the server interceptors were notified of a cancel if (GetParam().use_interceptors) { @@ -1261,7 +1333,8 @@ TEST_P(ClientCallbackEnd2endTest, ClientCancelsBidiStream) { TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelBefore) { MAYBE_SKIP_TEST; ResetStub(); - BidiClient test{stub_.get(), CANCEL_BEFORE_PROCESSING, 2}; + BidiClient test(stub_.get(), CANCEL_BEFORE_PROCESSING, /*num_msgs_to_send=*/2, + /*cork_metadata=*/false, /*first_write_async=*/false); test.Await(); // Make sure that the server interceptors were notified if (GetParam().use_interceptors) { @@ -1274,7 +1347,9 @@ TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelBefore) { TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelDuring) { MAYBE_SKIP_TEST; ResetStub(); - BidiClient test{stub_.get(), CANCEL_DURING_PROCESSING, 10}; + BidiClient test(stub_.get(), CANCEL_DURING_PROCESSING, + /*num_msgs_to_send=*/10, /*cork_metadata=*/false, + /*first_write_async=*/false); test.Await(); // Make sure that the server interceptors were notified if (GetParam().use_interceptors) { @@ -1287,7 +1362,8 @@ TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelDuring) { TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelAfter) { MAYBE_SKIP_TEST; ResetStub(); - BidiClient test{stub_.get(), CANCEL_AFTER_PROCESSING, 5}; + BidiClient test(stub_.get(), CANCEL_AFTER_PROCESSING, /*num_msgs_to_send=*/5, + /*cork_metadata=*/false, /*first_write_async=*/false); test.Await(); // Make sure that the server interceptors were notified if (GetParam().use_interceptors) {