diff --git a/BUILD b/BUILD index 70dc45dc89a..0866d6d79ac 100644 --- a/BUILD +++ b/BUILD @@ -653,15 +653,17 @@ grpc_cc_library( }), external_deps = [ "absl/base:core_headers", + "absl/cleanup", "absl/container:flat_hash_map", "absl/hash", "absl/memory", "absl/meta:type_traits", "absl/status", + "absl/status:statusor", "absl/strings", "absl/synchronization", - "absl/status:statusor", "absl/time", + "absl/types:variant", ], language = "c++", public_hdrs = [ diff --git a/CMakeLists.txt b/CMakeLists.txt index 222cadbca8b..ede84646b09 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -110,6 +110,8 @@ set(gRPC_ABSL_USED_TARGETS absl_bits absl_city absl_civil_time + absl_cleanup + absl_cleanup_internal absl_compressed_tuple absl_config absl_container_common @@ -3171,6 +3173,7 @@ target_link_libraries(grpc++ ${_gRPC_BASELIB_LIBRARIES} ${_gRPC_PROTOBUF_LIBRARIES} ${_gRPC_ALLTARGETS_LIBRARIES} + absl::cleanup grpc ) @@ -8306,6 +8309,7 @@ target_include_directories(binder_transport_test target_link_libraries(binder_transport_test ${_gRPC_PROTOBUF_LIBRARIES} ${_gRPC_ALLTARGETS_LIBRARIES} + absl::cleanup grpc_test_util ) @@ -10186,6 +10190,7 @@ target_include_directories(endpoint_binder_pool_test target_link_libraries(endpoint_binder_pool_test ${_gRPC_PROTOBUF_LIBRARIES} ${_gRPC_ALLTARGETS_LIBRARIES} + absl::cleanup grpc_test_util ) @@ -10610,6 +10615,7 @@ target_include_directories(fake_binder_test target_link_libraries(fake_binder_test ${_gRPC_PROTOBUF_LIBRARIES} ${_gRPC_ALLTARGETS_LIBRARIES} + absl::cleanup grpc_test_util ) @@ -17585,6 +17591,7 @@ target_include_directories(transport_stream_receiver_test target_link_libraries(transport_stream_receiver_test ${_gRPC_PROTOBUF_LIBRARIES} ${_gRPC_ALLTARGETS_LIBRARIES} + absl::cleanup grpc_test_util ) @@ -17969,6 +17976,7 @@ target_include_directories(wire_reader_test target_link_libraries(wire_reader_test ${_gRPC_PROTOBUF_LIBRARIES} ${_gRPC_ALLTARGETS_LIBRARIES} + absl::cleanup grpc_test_util ) @@ -18061,6 +18069,7 @@ target_include_directories(wire_writer_test target_link_libraries(wire_writer_test ${_gRPC_PROTOBUF_LIBRARIES} ${_gRPC_ALLTARGETS_LIBRARIES} + absl::cleanup grpc_test_util ) @@ -20473,7 +20482,7 @@ generate_pkgconfig( "gRPC++" "C++ wrapper for gRPC" "${gRPC_CPP_VERSION}" - "grpc absl_base absl_bind_front absl_cord absl_core_headers absl_flat_hash_map absl_flat_hash_set absl_hash absl_inlined_vector absl_memory absl_optional absl_random_random absl_span absl_status absl_statusor absl_str_format absl_strings absl_synchronization absl_time absl_type_traits absl_utility absl_variant" + "grpc absl_base absl_bind_front absl_cleanup absl_cord absl_core_headers absl_flat_hash_map absl_flat_hash_set absl_hash absl_inlined_vector absl_memory absl_optional absl_random_random absl_span absl_status absl_statusor absl_str_format absl_strings absl_synchronization absl_time absl_type_traits absl_utility absl_variant" "-lgrpc++" "" "grpc++.pc") diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index ed8e8125cda..c1adeed03b0 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -2809,6 +2809,7 @@ libs: - src/cpp/util/string_ref.cc - src/cpp/util/time_cc.cc deps: + - absl/cleanup:cleanup - grpc baselib: true - name: grpc++_alts @@ -4890,6 +4891,7 @@ targets: - test/core/transport/binder/binder_transport_test.cc - test/core/transport/binder/mock_objects.cc deps: + - absl/cleanup:cleanup - grpc_test_util uses_polling: false - name: bitset_test @@ -5627,6 +5629,7 @@ targets: - test/core/transport/binder/endpoint_binder_pool_test.cc - test/core/transport/binder/mock_objects.cc deps: + - absl/cleanup:cleanup - grpc_test_util uses_polling: false - name: endpoint_config_test @@ -5862,6 +5865,7 @@ targets: - test/core/transport/binder/end2end/fake_binder.cc - test/core/transport/binder/end2end/fake_binder_test.cc deps: + - absl/cleanup:cleanup - grpc_test_util uses_polling: false - name: file_watcher_certificate_provider_factory_test @@ -8988,6 +8992,7 @@ targets: - src/cpp/util/time_cc.cc - test/core/transport/binder/transport_stream_receiver_test.cc deps: + - absl/cleanup:cleanup - grpc_test_util uses_polling: false - name: try_join_test @@ -9195,6 +9200,7 @@ targets: - test/core/transport/binder/mock_objects.cc - test/core/transport/binder/wire_reader_test.cc deps: + - absl/cleanup:cleanup - grpc_test_util uses_polling: false - name: wire_writer_test @@ -9290,6 +9296,7 @@ targets: - test/core/transport/binder/mock_objects.cc - test/core/transport/binder/wire_writer_test.cc deps: + - absl/cleanup:cleanup - grpc_test_util uses_polling: false - name: work_serializer_test diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index 0253348860e..71a5b64da85 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -198,6 +198,7 @@ Pod::Spec.new do |s| abseil_version = '1.20211102.0' ss.dependency 'abseil/base/base', abseil_version ss.dependency 'abseil/base/core_headers', abseil_version + ss.dependency 'abseil/cleanup/cleanup', abseil_version ss.dependency 'abseil/container/flat_hash_map', abseil_version ss.dependency 'abseil/container/flat_hash_set', abseil_version ss.dependency 'abseil/container/inlined_vector', abseil_version diff --git a/grpc.gyp b/grpc.gyp index f1d7c81c4e0..2179683415f 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -1464,6 +1464,7 @@ 'target_name': 'grpc++', 'type': 'static_library', 'dependencies': [ + 'absl/cleanup:cleanup', 'grpc', ], 'sources': [ diff --git a/src/core/ext/transport/binder/transport/binder_transport.cc b/src/core/ext/transport/binder/transport/binder_transport.cc index d61ae72d0ff..d478980f668 100644 --- a/src/core/ext/transport/binder/transport/binder_transport.cc +++ b/src/core/ext/transport/binder/transport/binder_transport.cc @@ -395,10 +395,11 @@ static void perform_stream_op_locked(void* stream_op, if (!gbs->is_client) { // Send trailing metadata to inform the other end about the cancellation, // regardless if we'd already done that or not. - grpc_binder::Transaction cancel_tx(gbs->GetTxCode(), gbt->is_client); - cancel_tx.SetSuffix(grpc_binder::Metadata{}); - cancel_tx.SetStatus(1); - absl::Status status = gbt->wire_writer->RpcCall(cancel_tx); + auto cancel_tx = absl::make_unique( + gbs->GetTxCode(), gbt->is_client); + cancel_tx->SetSuffix(grpc_binder::Metadata{}); + cancel_tx->SetStatus(1); + absl::Status status = gbt->wire_writer->RpcCall(std::move(cancel_tx)); } cancel_stream_locked(gbt, gbs, op->payload->cancel_stream.cancel_error); if (op->on_complete != nullptr) { @@ -439,16 +440,17 @@ static void perform_stream_op_locked(void* stream_op, } int tx_code = gbs->tx_code; - grpc_binder::Transaction tx(tx_code, gbt->is_client); + auto tx = + absl::make_unique(tx_code, gbt->is_client); if (op->send_initial_metadata) { gpr_log(GPR_INFO, "send_initial_metadata"); grpc_binder::Metadata init_md; auto batch = op->payload->send_initial_metadata.send_initial_metadata; - grpc_binder::MetadataEncoder encoder(gbt->is_client, &tx, &init_md); + grpc_binder::MetadataEncoder encoder(gbt->is_client, tx.get(), &init_md); batch->Encode(&encoder); - tx.SetPrefix(init_md); + tx->SetPrefix(init_md); } if (op->send_message) { gpr_log(GPR_INFO, "send_message"); @@ -469,7 +471,7 @@ static void perform_stream_op_locked(void* stream_op, message_data += std::string(reinterpret_cast(p), len); grpc_slice_unref_internal(message_slice); } - tx.SetData(message_data); + tx->SetData(message_data); // TODO(b/192369787): Are we supposed to reset here to avoid // use-after-free issue in call.cc? op->payload->send_message.send_message.reset(); @@ -480,13 +482,13 @@ static void perform_stream_op_locked(void* stream_op, auto batch = op->payload->send_trailing_metadata.send_trailing_metadata; grpc_binder::Metadata trailing_metadata; - grpc_binder::MetadataEncoder encoder(gbt->is_client, &tx, + grpc_binder::MetadataEncoder encoder(gbt->is_client, tx.get(), &trailing_metadata); batch->Encode(&encoder); // TODO(mingcl): Will we ever has key-value pair here? According to // wireformat client suffix data is always empty. - tx.SetSuffix(trailing_metadata); + tx->SetSuffix(trailing_metadata); } if (op->recv_initial_metadata) { gpr_log(GPR_INFO, "recv_initial_metadata"); @@ -556,10 +558,7 @@ static void perform_stream_op_locked(void* stream_op, absl::Status status = absl::OkStatus(); if (op->send_initial_metadata || op->send_message || op->send_trailing_metadata) { - // TODO(waynetu): RpcCall() is doing a lot of work (including waiting for - // acknowledgements from the other side). Consider delaying this operation - // with combiner. - status = gbt->wire_writer->RpcCall(tx); + status = gbt->wire_writer->RpcCall(std::move(tx)); if (!gbs->is_client && op->send_trailing_metadata) { gbs->trailing_metadata_sent = true; // According to transport explaineer - "Server extra: This op shouldn't diff --git a/src/core/ext/transport/binder/wire_format/binder_constants.h b/src/core/ext/transport/binder/wire_format/binder_constants.h index 5479f1af3e0..5b06bd919bc 100644 --- a/src/core/ext/transport/binder/wire_format/binder_constants.h +++ b/src/core/ext/transport/binder/wire_format/binder_constants.h @@ -28,7 +28,7 @@ ABSL_CONST_INIT extern const int LAST_CALL_TRANSACTION; namespace grpc_binder { -enum class BinderTransportTxCode { +enum class BinderTransportTxCode : int32_t { SETUP_TRANSPORT = 1, SHUTDOWN_TRANSPORT = 2, ACKNOWLEDGE_BYTES = 3, diff --git a/src/core/ext/transport/binder/wire_format/transaction.h b/src/core/ext/transport/binder/wire_format/transaction.h index eb6d18057f3..1bd61377c73 100644 --- a/src/core/ext/transport/binder/wire_format/transaction.h +++ b/src/core/ext/transport/binder/wire_format/transaction.h @@ -85,6 +85,9 @@ class Transaction { absl::string_view GetMessageData() const { return message_data_; } absl::string_view GetStatusDesc() const { return status_desc_; } + Transaction(const Transaction&) = delete; + void operator=(const Transaction&) = delete; + private: int tx_code_; bool is_client_; diff --git a/src/core/ext/transport/binder/wire_format/wire_reader_impl.cc b/src/core/ext/transport/binder/wire_format/wire_reader_impl.cc index fbce41620c2..0f508e7124e 100644 --- a/src/core/ext/transport/binder/wire_format/wire_reader_impl.cc +++ b/src/core/ext/transport/binder/wire_format/wire_reader_impl.cc @@ -80,7 +80,9 @@ WireReaderImpl::WireReaderImpl( : transport_stream_receiver_(std::move(transport_stream_receiver)), is_client_(is_client), security_policy_(security_policy), - on_destruct_callback_(on_destruct_callback) {} + on_destruct_callback_(on_destruct_callback) { + gpr_log(GPR_INFO, "%s mu_ = %p", __func__, &mu_); +} WireReaderImpl::~WireReaderImpl() { if (on_destruct_callback_) { @@ -168,11 +170,12 @@ absl::Status WireReaderImpl::ProcessTransaction(transaction_code_t code, return absl::OkStatus(); } - grpc_core::MutexLock lock(&mu_); - - if (BinderTransportTxCode(code) != BinderTransportTxCode::SETUP_TRANSPORT && - !connected_) { - return absl::InvalidArgumentError("Transports not connected yet"); + { + grpc_core::MutexLock lock(&mu_); + if (BinderTransportTxCode(code) != BinderTransportTxCode::SETUP_TRANSPORT && + !connected_) { + return absl::InvalidArgumentError("Transports not connected yet"); + } } // TODO(mingcl): See if we want to check the security policy for every RPC @@ -180,6 +183,7 @@ absl::Status WireReaderImpl::ProcessTransaction(transaction_code_t code, switch (BinderTransportTxCode(code)) { case BinderTransportTxCode::SETUP_TRANSPORT: { + grpc_core::MutexLock lock(&mu_); if (recvd_setup_transport_) { return absl::InvalidArgumentError( "Already received a SETUP_TRANSPORT request"); @@ -223,8 +227,7 @@ absl::Status WireReaderImpl::ProcessTransaction(transaction_code_t code, case BinderTransportTxCode::ACKNOWLEDGE_BYTES: { int64_t num_bytes = -1; RETURN_IF_ERROR(parcel->ReadInt64(&num_bytes)); - gpr_log(GPR_INFO, "received acknowledge bytes = %lld", - static_cast(num_bytes)); + gpr_log(GPR_INFO, "received acknowledge bytes = %" PRId64, num_bytes); wire_writer_->OnAckReceived(num_bytes); break; } @@ -250,50 +253,70 @@ absl::Status WireReaderImpl::ProcessTransaction(transaction_code_t code, absl::Status WireReaderImpl::ProcessStreamingTransaction( transaction_code_t code, ReadableParcel* parcel) { - grpc_core::MutexLock lock(&mu_); - if (!connected_) { - return absl::InvalidArgumentError("Transports not connected yet"); - } - - // Indicate which callbacks should be cancelled. It will be initialized as the - // flags the in-coming transaction carries, and when a particular callback is - // completed, the corresponding bit in cancellation_flag will be set to 0 so - // that we won't cancel it afterward. - int cancellation_flags = 0; - absl::Status status = - ProcessStreamingTransactionImpl(code, parcel, &cancellation_flags); - if (!status.ok()) { - gpr_log(GPR_ERROR, "Failed to process streaming transaction: %s", - status.ToString().c_str()); - // Something went wrong when receiving transaction. Cancel failed requests. - if (cancellation_flags & kFlagPrefix) { - gpr_log(GPR_INFO, "cancelling initial metadata"); - transport_stream_receiver_->NotifyRecvInitialMetadata(code, status); + bool need_to_send_ack = false; + int64_t num_bytes = 0; + absl::Status tx_process_result; + { + grpc_core::MutexLock lock(&mu_); + if (!connected_) { + return absl::InvalidArgumentError("Transports not connected yet"); } - if (cancellation_flags & kFlagMessageData) { - gpr_log(GPR_INFO, "cancelling message data"); - transport_stream_receiver_->NotifyRecvMessage(code, status); + + // Indicate which callbacks should be cancelled. It will be initialized as + // the flags the in-coming transaction carries, and when a particular + // callback is completed, the corresponding bit in cancellation_flag will be + // set to 0 so that we won't cancel it afterward. + int cancellation_flags = 0; + tx_process_result = + ProcessStreamingTransactionImpl(code, parcel, &cancellation_flags); + if (!tx_process_result.ok()) { + gpr_log(GPR_ERROR, "Failed to process streaming transaction: %s", + tx_process_result.ToString().c_str()); + // Something went wrong when receiving transaction. Cancel failed + // requests. + if (cancellation_flags & kFlagPrefix) { + gpr_log(GPR_INFO, "cancelling initial metadata"); + transport_stream_receiver_->NotifyRecvInitialMetadata( + code, tx_process_result); + } + if (cancellation_flags & kFlagMessageData) { + gpr_log(GPR_INFO, "cancelling message data"); + transport_stream_receiver_->NotifyRecvMessage(code, tx_process_result); + } + if (cancellation_flags & kFlagSuffix) { + gpr_log(GPR_INFO, "cancelling trailing metadata"); + transport_stream_receiver_->NotifyRecvTrailingMetadata( + code, tx_process_result, 0); + } } - if (cancellation_flags & kFlagSuffix) { - gpr_log(GPR_INFO, "cancelling trailing metadata"); - transport_stream_receiver_->NotifyRecvTrailingMetadata(code, status, 0); + if ((num_incoming_bytes_ - num_acknowledged_bytes_) >= + kFlowControlAckBytes) { + need_to_send_ack = true; + num_bytes = num_incoming_bytes_; + num_acknowledged_bytes_ = num_incoming_bytes_; } } - if ((num_incoming_bytes_ - num_acknowledged_bytes_) >= kFlowControlAckBytes) { + if (need_to_send_ack) { GPR_ASSERT(wire_writer_); - absl::Status ack_status = wire_writer_->SendAck(num_incoming_bytes_); - if (status.ok()) { - status = ack_status; + // wire_writer_ should not be accessed while holding mu_! + // Otherwise, it is possible that + // 1. wire_writer_::mu_ is acquired before mu_ (NDK call back during + // transaction) + // 2. mu_ is acquired before wire_writer_::mu_ (e.g. Java call back us, and + // we call WireWriter::SendAck which will try to acquire wire_writer_::mu_) + absl::Status ack_status = wire_writer_->SendAck(num_bytes); + if (tx_process_result.ok()) { + return ack_status; } - num_acknowledged_bytes_ = num_incoming_bytes_; } - return status; + return tx_process_result; } absl::Status WireReaderImpl::ProcessStreamingTransactionImpl( transaction_code_t code, ReadableParcel* parcel, int* cancellation_flags) { GPR_ASSERT(cancellation_flags); num_incoming_bytes_ += parcel->GetDataSize(); + gpr_log(GPR_INFO, "Total incoming bytes: %" PRId64, num_incoming_bytes_); int flags; RETURN_IF_ERROR(parcel->ReadInt32(&flags)); diff --git a/src/core/ext/transport/binder/wire_format/wire_writer.cc b/src/core/ext/transport/binder/wire_format/wire_writer.cc index fe796a927c8..c194a32a377 100644 --- a/src/core/ext/transport/binder/wire_format/wire_writer.cc +++ b/src/core/ext/transport/binder/wire_format/wire_writer.cc @@ -20,6 +20,9 @@ #include +#include "absl/cleanup/cleanup.h" +#include "absl/types/variant.h" + #include #define RETURN_IF_ERROR(expr) \ @@ -29,11 +32,22 @@ } while (0) namespace grpc_binder { -WireWriterImpl::WireWriterImpl(std::unique_ptr binder) - : binder_(std::move(binder)) {} -absl::Status WireWriterImpl::WriteInitialMetadata(const Transaction& tx, - WritableParcel* parcel) { +bool CanBeSentInOneTransaction(const Transaction& tx) { + return (tx.GetFlags() & kFlagMessageData) == 0 || + static_cast(tx.GetMessageData().size()) <= + WireWriterImpl::kBlockSize; +} + +// Simply forward the call to `WireWriterImpl::RunScheduledTx`. +void RunScheduledTx(void* arg, grpc_error_handle /*error*/) { + auto* run_scheduled_tx_args = + static_cast(arg); + run_scheduled_tx_args->writer->RunScheduledTxInternal(run_scheduled_tx_args); +} + +absl::Status WriteInitialMetadata(const Transaction& tx, + WritableParcel* parcel) { if (tx.IsClient()) { // Only client sends method ref. RETURN_IF_ERROR(parcel->WriteString(tx.GetMethodRef())); @@ -46,8 +60,8 @@ absl::Status WireWriterImpl::WriteInitialMetadata(const Transaction& tx, return absl::OkStatus(); } -absl::Status WireWriterImpl::WriteTrailingMetadata(const Transaction& tx, - WritableParcel* parcel) { +absl::Status WriteTrailingMetadata(const Transaction& tx, + WritableParcel* parcel) { if (tx.IsServer()) { if (tx.GetFlags() & kFlagStatusDescription) { RETURN_IF_ERROR(parcel->WriteString(tx.GetStatusDesc())); @@ -66,119 +80,324 @@ absl::Status WireWriterImpl::WriteTrailingMetadata(const Transaction& tx, return absl::OkStatus(); } -const int64_t WireWriterImpl::kBlockSize = 16 * 1024; -const int64_t WireWriterImpl::kFlowControlWindowSize = 128 * 1024; +WireWriterImpl::WireWriterImpl(std::unique_ptr binder) + : binder_(std::move(binder)), combiner_(grpc_combiner_create()) { + gpr_log(GPR_INFO, "%s write_mu_ = %p , flow_control_mu_ = %p", __func__, + &write_mu_, &flow_control_mu_); +} -bool WireWriterImpl::CanBeSentInOneTransaction(const Transaction& tx) const { - return (tx.GetFlags() & kFlagMessageData) == 0 || - tx.GetMessageData().size() <= kBlockSize; +WireWriterImpl::~WireWriterImpl() { + GRPC_COMBINER_UNREF(combiner_, "wire_writer_impl"); + while (!pending_outgoing_tx_.empty()) { + delete pending_outgoing_tx_.front(); + pending_outgoing_tx_.pop(); + } } -absl::Status WireWriterImpl::RpcCallFastPath(const Transaction& tx) { - int& seq = seq_num_[tx.GetTxCode()]; - // Fast path: send data in one transaction. +// Flow control constant are specified at +// https://github.com/grpc/proposal/blob/master/L73-java-binderchannel/wireformat.md#flow-control +const int64_t WireWriterImpl::kBlockSize = 16 * 1024; +const int64_t WireWriterImpl::kFlowControlWindowSize = 128 * 1024; + +absl::Status WireWriterImpl::MakeBinderTransaction( + BinderTransportTxCode tx_code, + std::function fill_parcel) { + grpc_core::MutexLock lock(&write_mu_); RETURN_IF_ERROR(binder_->PrepareTransaction()); WritableParcel* parcel = binder_->GetWritableParcel(); - RETURN_IF_ERROR(parcel->WriteInt32(tx.GetFlags())); - RETURN_IF_ERROR(parcel->WriteInt32(seq++)); - if (tx.GetFlags() & kFlagPrefix) { - RETURN_IF_ERROR(WriteInitialMetadata(tx, parcel)); - } - if (tx.GetFlags() & kFlagMessageData) { - RETURN_IF_ERROR(parcel->WriteByteArrayWithLength(tx.GetMessageData())); - } - if (tx.GetFlags() & kFlagSuffix) { - RETURN_IF_ERROR(WriteTrailingMetadata(tx, parcel)); + RETURN_IF_ERROR(fill_parcel(parcel)); + // Only stream transaction is accounted in flow control spec. + if (static_cast(tx_code) >= kFirstCallId) { + int64_t parcel_size = parcel->GetDataSize(); + if (parcel_size > 2 * kBlockSize) { + gpr_log(GPR_ERROR, + "Unexpected large transaction (possibly caused by a very large " + "metadata). This might overflow the binder " + "transaction buffer. Size: %" PRId64 " bytes", + parcel_size); + } + num_outgoing_bytes_ += parcel_size; + gpr_log(GPR_INFO, "Total outgoing bytes: %" PRId64, + num_outgoing_bytes_.load()); } - // FIXME(waynetu): Construct BinderTransportTxCode from an arbitrary integer - // is an undefined behavior. - return binder_->Transact(BinderTransportTxCode(tx.GetTxCode())); + GPR_ASSERT(!is_transacting_); + is_transacting_ = true; + absl::Status result = binder_->Transact(tx_code); + is_transacting_ = false; + return result; } -bool WireWriterImpl::WaitForAcknowledgement() { - if (num_outgoing_bytes_ < num_acknowledged_bytes_ + kFlowControlWindowSize) { - return true; - } - absl::Time deadline = absl::Now() + absl::Seconds(1); - do { - if (cv_.WaitWithDeadline(&mu_, deadline)) { - return false; +absl::Status WireWriterImpl::RpcCallFastPath(std::unique_ptr tx) { + return MakeBinderTransaction( + BinderTransportTxCode(tx->GetTxCode()), + [this, tx = tx.get()]( + WritableParcel* parcel) ABSL_EXCLUSIVE_LOCKS_REQUIRED(write_mu_) { + RETURN_IF_ERROR(parcel->WriteInt32(tx->GetFlags())); + RETURN_IF_ERROR(parcel->WriteInt32(next_seq_num_[tx->GetTxCode()]++)); + if (tx->GetFlags() & kFlagPrefix) { + RETURN_IF_ERROR(WriteInitialMetadata(*tx, parcel)); + } + if (tx->GetFlags() & kFlagMessageData) { + RETURN_IF_ERROR( + parcel->WriteByteArrayWithLength(tx->GetMessageData())); + } + if (tx->GetFlags() & kFlagSuffix) { + RETURN_IF_ERROR(WriteTrailingMetadata(*tx, parcel)); + } + return absl::OkStatus(); + }); +} + +absl::Status WireWriterImpl::RunStreamTx( + RunScheduledTxArgs::StreamTx* stream_tx, WritableParcel* parcel, + bool* is_last_chunk) { + Transaction* tx = stream_tx->tx.get(); + // Transaction without data flag should go to fast path. + GPR_ASSERT(tx->GetFlags() & kFlagMessageData); + + absl::string_view data = tx->GetMessageData(); + GPR_ASSERT(stream_tx->bytes_sent <= static_cast(data.size())); + + int flags = kFlagMessageData; + + if (stream_tx->bytes_sent == 0) { + // This is the first transaction. Include initial + // metadata if there's any. + if (tx->GetFlags() & kFlagPrefix) { + flags |= kFlagPrefix; } - if (absl::Now() >= deadline) { - return false; + } + // There is also prefix/suffix in transaction beside the transaction data so + // actual transaction size will be greater than `kBlockSize`. This is + // unavoidable because we cannot split the prefix metadata and trailing + // metadata into different binder transactions. In most cases this is fine + // because single transaction size is not required to be strictly lower than + // `kBlockSize`, as long as it won't overflow Android's binder buffer. + int64_t size = std::min(WireWriterImpl::kBlockSize, + data.size() - stream_tx->bytes_sent); + if (stream_tx->bytes_sent + WireWriterImpl::kBlockSize >= + static_cast(data.size())) { + // This is the last transaction. Include trailing + // metadata if there's any. + if (tx->GetFlags() & kFlagSuffix) { + flags |= kFlagSuffix; } - } while (num_outgoing_bytes_ >= - num_acknowledged_bytes_ + kFlowControlWindowSize); - return true; + size = data.size() - stream_tx->bytes_sent; + *is_last_chunk = true; + } else { + // There are more messages to send. + flags |= kFlagMessageDataIsPartial; + *is_last_chunk = false; + } + RETURN_IF_ERROR(parcel->WriteInt32(flags)); + RETURN_IF_ERROR(parcel->WriteInt32(next_seq_num_[tx->GetTxCode()]++)); + if (flags & kFlagPrefix) { + RETURN_IF_ERROR(WriteInitialMetadata(*tx, parcel)); + } + RETURN_IF_ERROR(parcel->WriteByteArrayWithLength( + data.substr(stream_tx->bytes_sent, size))); + if (flags & kFlagSuffix) { + RETURN_IF_ERROR(WriteTrailingMetadata(*tx, parcel)); + } + stream_tx->bytes_sent += size; + return absl::OkStatus(); } -absl::Status WireWriterImpl::RpcCall(const Transaction& tx) { - // TODO(mingcl): check tx_code <= last call id - grpc_core::MutexLock lock(&mu_); - GPR_ASSERT(tx.GetTxCode() >= kFirstCallId); - if (CanBeSentInOneTransaction(tx)) { - return RpcCallFastPath(tx); - } - // Slow path: the message data is too large to fit in one transaction. - int& seq = seq_num_[tx.GetTxCode()]; - int original_flags = tx.GetFlags(); - GPR_ASSERT(original_flags & kFlagMessageData); - absl::string_view data = tx.GetMessageData(); - size_t bytes_sent = 0; - while (bytes_sent < data.size()) { - if (!WaitForAcknowledgement()) { - return absl::InternalError("Timeout waiting for acknowledgement"); +void WireWriterImpl::RunScheduledTxInternal(RunScheduledTxArgs* args) { + GPR_ASSERT(args->writer == this); + if (absl::holds_alternative(args->tx)) { + int64_t num_bytes = + absl::get(args->tx).num_bytes; + absl::Status result = + MakeBinderTransaction(BinderTransportTxCode::ACKNOWLEDGE_BYTES, + [num_bytes](WritableParcel* parcel) { + RETURN_IF_ERROR(parcel->WriteInt64(num_bytes)); + return absl::OkStatus(); + }); + if (!result.ok()) { + gpr_log(GPR_ERROR, "Failed to make binder transaction %s", + result.ToString().c_str()); } - RETURN_IF_ERROR(binder_->PrepareTransaction()); - WritableParcel* parcel = binder_->GetWritableParcel(); - size_t size = - std::min(static_cast(kBlockSize), data.size() - bytes_sent); - int flags = kFlagMessageData; - if (bytes_sent == 0) { - // This is the first transaction. Include initial metadata if there's any. - if (original_flags & kFlagPrefix) { - flags |= kFlagPrefix; - } - } - if (bytes_sent + kBlockSize >= data.size()) { - // This is the last transaction. Include trailing metadata if there's any. - if (original_flags & kFlagSuffix) { - flags |= kFlagSuffix; - } - } else { - // There are more messages to send. - flags |= kFlagMessageDataIsPartial; + delete args; + return; + } + GPR_ASSERT(absl::holds_alternative(args->tx)); + RunScheduledTxArgs::StreamTx* stream_tx = + &absl::get(args->tx); + // Be reservative. Decrease CombinerTxCount after the data size of this + // transaction has already been added to `num_outgoing_bytes_`, to make sure + // we never underestimate `num_outgoing_bytes_`. + auto decrease_combiner_tx_count = absl::MakeCleanup([this]() { + { + grpc_core::MutexLock lock(&flow_control_mu_); + GPR_ASSERT(num_non_acked_tx_in_combiner_ > 0); + num_non_acked_tx_in_combiner_--; } - RETURN_IF_ERROR(parcel->WriteInt32(flags)); - RETURN_IF_ERROR(parcel->WriteInt32(seq++)); - if (flags & kFlagPrefix) { - RETURN_IF_ERROR(WriteInitialMetadata(tx, parcel)); + // New transaction might be ready to be scheduled. + TryScheduleTransaction(); + }); + if (CanBeSentInOneTransaction(*stream_tx->tx.get())) { // NOLINT + absl::Status result = RpcCallFastPath(std::move(stream_tx->tx)); + if (!result.ok()) { + gpr_log(GPR_ERROR, "Failed to handle non-chunked RPC call %s", + result.ToString().c_str()); } - RETURN_IF_ERROR( - parcel->WriteByteArrayWithLength(data.substr(bytes_sent, size))); - if (flags & kFlagSuffix) { - RETURN_IF_ERROR(WriteTrailingMetadata(tx, parcel)); + delete args; + return; + } + bool is_last_chunk = true; + absl::Status result = MakeBinderTransaction( + BinderTransportTxCode(stream_tx->tx->GetTxCode()), + [stream_tx, &is_last_chunk, this](WritableParcel* parcel) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(write_mu_) { + return RunStreamTx(stream_tx, parcel, &is_last_chunk); + }); + if (!result.ok()) { + gpr_log(GPR_ERROR, "Failed to make binder transaction %s", + result.ToString().c_str()); + } + if (!is_last_chunk) { + { + grpc_core::MutexLock lock(&flow_control_mu_); + pending_outgoing_tx_.push(args); } - num_outgoing_bytes_ += parcel->GetDataSize(); - RETURN_IF_ERROR(binder_->Transact(BinderTransportTxCode(tx.GetTxCode()))); - bytes_sent += size; + TryScheduleTransaction(); + } else { + delete args; + } +} + +absl::Status WireWriterImpl::RpcCall(std::unique_ptr tx) { + // TODO(mingcl): check tx_code <= last call id + GPR_ASSERT(tx->GetTxCode() >= kFirstCallId); + auto args = new RunScheduledTxArgs(); + args->writer = this; + args->tx = RunScheduledTxArgs::StreamTx(); + absl::get(args->tx).tx = std::move(tx); + absl::get(args->tx).bytes_sent = 0; + { + grpc_core::MutexLock lock(&flow_control_mu_); + pending_outgoing_tx_.push(args); } + TryScheduleTransaction(); return absl::OkStatus(); } absl::Status WireWriterImpl::SendAck(int64_t num_bytes) { - grpc_core::MutexLock lock(&mu_); - RETURN_IF_ERROR(binder_->PrepareTransaction()); - WritableParcel* parcel = binder_->GetWritableParcel(); - RETURN_IF_ERROR(parcel->WriteInt64(num_bytes)); - return binder_->Transact(BinderTransportTxCode::ACKNOWLEDGE_BYTES); + // Ensure combiner will be run if this is not called from top-level gRPC API + // entrypoint. + grpc_core::ExecCtx exec_ctx; + gpr_log(GPR_INFO, "Ack %" PRId64 " bytes received", num_bytes); + if (is_transacting_) { + // This can happen because NDK might call our registered callback function + // in the same thread while we are telling it to send a transaction + // `is_transacting_` will be true. `Binder::Transact` is now being called on + // the same thread or the other thread. We are currently in the call stack + // of other transaction, Liveness of ACK is still guaranteed even if this is + // a race with another thread. + gpr_log( + GPR_INFO, + "Scheduling ACK transaction instead of directly execute it to avoid " + "deadlock."); + auto args = new RunScheduledTxArgs(); + args->writer = this; + args->tx = RunScheduledTxArgs::AckTx(); + absl::get(args->tx).num_bytes = num_bytes; + auto cl = GRPC_CLOSURE_CREATE(RunScheduledTx, args, nullptr); + combiner_->Run(cl, GRPC_ERROR_NONE); + return absl::OkStatus(); + } + // Otherwise, we can directly send ack. + absl::Status result = MakeBinderTransaction( + BinderTransportTxCode(BinderTransportTxCode::ACKNOWLEDGE_BYTES), + [num_bytes](WritableParcel* parcel) { + RETURN_IF_ERROR(parcel->WriteInt64(num_bytes)); + return absl::OkStatus(); + }); + if (!result.ok()) { + gpr_log(GPR_ERROR, "Failed to make binder transaction %s", + result.ToString().c_str()); + } + return result; } void WireWriterImpl::OnAckReceived(int64_t num_bytes) { - grpc_core::MutexLock lock(&mu_); - num_acknowledged_bytes_ = std::max(num_acknowledged_bytes_, num_bytes); - cv_.Signal(); + // Ensure combiner will be run if this is not called from top-level gRPC API + // entrypoint. + grpc_core::ExecCtx exec_ctx; + gpr_log(GPR_INFO, "OnAckReceived %" PRId64, num_bytes); + // Do not try to obtain `write_mu_` in this function. NDKBinder might invoke + // the callback to notify us about new incoming binder transaction when we are + // sending transaction. i.e. `write_mu_` might have already been acquired by + // this thread. + { + grpc_core::MutexLock lock(&flow_control_mu_); + num_acknowledged_bytes_ = std::max(num_acknowledged_bytes_, num_bytes); + int64_t num_outgoing_bytes = num_outgoing_bytes_; + if (num_acknowledged_bytes_ > num_outgoing_bytes) { + gpr_log(GPR_ERROR, + "The other end of transport acked more bytes than we ever sent, " + "%" PRId64 " > %" PRId64, + num_acknowledged_bytes_, num_outgoing_bytes); + } + } + TryScheduleTransaction(); +} + +void WireWriterImpl::TryScheduleTransaction() { + while (true) { + grpc_core::MutexLock lock(&flow_control_mu_); + if (pending_outgoing_tx_.empty()) { + // Nothing to be schduled. + break; + } + // Number of bytes we have scheduled in combiner but have not yet be + // executed by combiner. Here we make an assumption that every binder + // transaction will take `kBlockSize`. This should be close to truth when a + // large message is being cut to `kBlockSize` chunks. + int64_t num_bytes_scheduled_in_combiner = + num_non_acked_tx_in_combiner_ * kBlockSize; + // An estimation of number of bytes of traffic we will eventually send to + // the other end, assuming all tasks in combiner will be executed and we + // receive no new ACK from the other end of transport. + int64_t num_total_bytes_will_be_sent = + num_outgoing_bytes_ + num_bytes_scheduled_in_combiner; + // An estimation of number of bytes of traffic that will not be + // acknowledged, assuming all tasks in combiner will be executed and we + // receive no new ack message fomr the other end of transport. + int64_t num_non_acked_bytes_estimation = + num_total_bytes_will_be_sent - num_acknowledged_bytes_; + if (num_non_acked_bytes_estimation < 0) { + gpr_log( + GPR_ERROR, + "Something went wrong. `num_non_acked_bytes_estimation` should be " + "non-negative but it is %" PRId64, + num_non_acked_bytes_estimation); + } + // If we can schedule another transaction (which has size estimation of + // `kBlockSize`) without exceeding `kFlowControlWindowSize`, schedule it. + if ((num_non_acked_bytes_estimation + kBlockSize < + kFlowControlWindowSize)) { + num_non_acked_tx_in_combiner_++; + combiner_->Run(GRPC_CLOSURE_CREATE(RunScheduledTx, + pending_outgoing_tx_.front(), nullptr), + GRPC_ERROR_NONE); + pending_outgoing_tx_.pop(); + } else { + // It is common to fill `kFlowControlWindowSize` completely because + // transactions are send at faster rate than the other end of transport + // can handle it, so here we use `GPR_DEBUG` log level. + gpr_log(GPR_DEBUG, + "Some work cannot be scheduled yet due to slow ack from the " + "other end of transport. This transport might be blocked if this " + "number don't go down. pending_outgoing_tx_.size() = %zu " + "pending_outgoing_tx_.front() = %p", + pending_outgoing_tx_.size(), pending_outgoing_tx_.front()); + break; + } + } } } // namespace grpc_binder + #endif diff --git a/src/core/ext/transport/binder/wire_format/wire_writer.h b/src/core/ext/transport/binder/wire_format/wire_writer.h index 52b082b87c3..53dff6c622d 100644 --- a/src/core/ext/transport/binder/wire_format/wire_writer.h +++ b/src/core/ext/transport/binder/wire_format/wire_writer.h @@ -17,6 +17,7 @@ #include +#include #include #include @@ -25,13 +26,15 @@ #include "src/core/ext/transport/binder/wire_format/binder.h" #include "src/core/ext/transport/binder/wire_format/transaction.h" #include "src/core/lib/gprpp/sync.h" +#include "src/core/lib/iomgr/combiner.h" namespace grpc_binder { +// Member functions are thread safe. class WireWriter { public: virtual ~WireWriter() = default; - virtual absl::Status RpcCall(const Transaction& call) = 0; + virtual absl::Status RpcCall(std::unique_ptr tx) = 0; virtual absl::Status SendAck(int64_t num_bytes) = 0; virtual void OnAckReceived(int64_t num_bytes) = 0; }; @@ -39,44 +42,84 @@ class WireWriter { class WireWriterImpl : public WireWriter { public: explicit WireWriterImpl(std::unique_ptr binder); - absl::Status RpcCall(const Transaction& tx) override; + ~WireWriterImpl() override; + absl::Status RpcCall(std::unique_ptr tx) override; absl::Status SendAck(int64_t num_bytes) override; void OnAckReceived(int64_t num_bytes) override; + // Required to be public because we would like to call this in combiner (which + // cannot invoke class member function). `RunScheduledTxArgs` and + // `RunScheduledTxInternal` should not be used by user directly. + struct RunScheduledTxArgs { + WireWriterImpl* writer; + struct StreamTx { + std::unique_ptr tx; + // How many data in transaction's `data` field has been sent. + int64_t bytes_sent = 0; + }; + struct AckTx { + int64_t num_bytes; + }; + absl::variant tx; + }; + + void RunScheduledTxInternal(RunScheduledTxArgs* arg); + // Split long message into chunks of size 16k. This doesn't necessarily have // to be the same as the flow control acknowledgement size, but it should not // exceed 128k. static const int64_t kBlockSize; + // Flow control allows sending at most 128k between acknowledgements. static const int64_t kFlowControlWindowSize; private: - absl::Status WriteInitialMetadata(const Transaction& tx, - WritableParcel* parcel) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); - absl::Status WriteTrailingMetadata(const Transaction& tx, - WritableParcel* parcel) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); - - bool CanBeSentInOneTransaction(const Transaction& tx) const; - absl::Status RpcCallFastPath(const Transaction& tx) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); - - // Wait for acknowledgement from the other side for a while (the timeout is - // currently set to 10ms for debugability). Returns true if we are able to - // proceed, and false otherwise. - // - // TODO(waynetu): Currently, RpcCall() will fail if we are blocked for 10ms. - // In the future, we should queue the transactions and release them later when - // acknowledgement comes. - bool WaitForAcknowledgement() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); - - grpc_core::Mutex mu_; - grpc_core::CondVar cv_; - std::unique_ptr binder_ ABSL_GUARDED_BY(mu_); - absl::flat_hash_map seq_num_ ABSL_GUARDED_BY(mu_); - int64_t num_outgoing_bytes_ ABSL_GUARDED_BY(mu_) = 0; - int64_t num_acknowledged_bytes_ ABSL_GUARDED_BY(mu_) = 0; + // Fast path: send data in one transaction. + absl::Status RpcCallFastPath(std::unique_ptr tx); + + // This function will acquire `write_mu_` to make sure the binder is not used + // concurrently, so this can be called by different threads safely. + absl::Status MakeBinderTransaction( + BinderTransportTxCode tx_code, + std::function fill_parcel); + + // Send a stream to `binder_`. Set `is_last_chunk` to `true` if the stream + // transaction has been sent completely. Otherwise set to `false`. + absl::Status RunStreamTx(RunScheduledTxArgs::StreamTx* stream_tx, + WritableParcel* parcel, bool* is_last_chunk) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(write_mu_); + + // Schdule `RunScheduledTxArgs*` in `pending_outgoing_tx_` to `combiner_`, as + // many as possible (under the constraint of `kFlowControlWindowSize`). + void TryScheduleTransaction(); + + // Guards variables related to transport state. + grpc_core::Mutex write_mu_; + std::unique_ptr binder_ ABSL_GUARDED_BY(write_mu_); + + // Maps the transaction code (which identifies streams) to their next + // available sequence number. See + // https://github.com/grpc/proposal/blob/master/L73-java-binderchannel/wireformat.md#sequence-number + absl::flat_hash_map next_seq_num_ ABSL_GUARDED_BY(write_mu_); + + // Number of bytes we have already sent in stream transactions. + std::atomic num_outgoing_bytes_{0}; + + // Guards variables related to flow control logic. + grpc_core::Mutex flow_control_mu_; + int64_t num_acknowledged_bytes_ ABSL_GUARDED_BY(flow_control_mu_) = 0; + + // The queue takes ownership of the pointer. + std::queue pending_outgoing_tx_ + ABSL_GUARDED_BY(flow_control_mu_); + int num_non_acked_tx_in_combiner_ ABSL_GUARDED_BY(flow_control_mu_) = 0; + + // Helper variable for determining if we are currently calling into + // `Binder::Transact`. Useful for avoiding the attempt of acquiring + // `write_mu_` multiple times on the same thread. + std::atomic_bool is_transacting_{false}; + + grpc_core::Combiner* combiner_; }; } // namespace grpc_binder diff --git a/test/core/transport/binder/binder_transport_test.cc b/test/core/transport/binder/binder_transport_test.cc index b61f55c9d7c..53cc3424be5 100644 --- a/test/core/transport/binder/binder_transport_test.cc +++ b/test/core/transport/binder/binder_transport_test.cc @@ -155,23 +155,23 @@ bool MetadataEquivalent(Metadata a, Metadata b) { // initial_metadata, and message_data. MATCHER_P4(TransactionMatches, flag, method_ref, initial_metadata, message_data, "") { - if (arg.GetFlags() != flag) return false; + if (arg->GetFlags() != flag) return false; if (flag & kFlagPrefix) { - if (arg.GetMethodRef() != method_ref) { + if (arg->GetMethodRef() != method_ref) { printf("METHOD REF NOT EQ: %s %s\n", - std::string(arg.GetMethodRef()).c_str(), + std::string(arg->GetMethodRef()).c_str(), std::string(method_ref).c_str()); return false; } - if (!MetadataEquivalent(arg.GetPrefixMetadata(), initial_metadata)) { + if (!MetadataEquivalent(arg->GetPrefixMetadata(), initial_metadata)) { printf("METADATA NOT EQUIVALENT: %s %s\n", - MetadataString(arg.GetPrefixMetadata()).c_str(), + MetadataString(arg->GetPrefixMetadata()).c_str(), MetadataString(initial_metadata).c_str()); return false; } } if (flag & kFlagMessageData) { - if (arg.GetMessageData() != message_data) return false; + if (arg->GetMessageData() != message_data) return false; } return true; } diff --git a/test/core/transport/binder/end2end/end2end_binder_transport_test.cc b/test/core/transport/binder/end2end/end2end_binder_transport_test.cc index 98ccc9ba4bc..30e5e3f65cf 100644 --- a/test/core/transport/binder/end2end/end2end_binder_transport_test.cc +++ b/test/core/transport/binder/end2end/end2end_binder_transport_test.cc @@ -70,12 +70,14 @@ class End2EndBinderTransportTest protected: std::unique_ptr service_; std::unique_ptr server_; + + private: + grpc_core::ExecCtx exec_ctx; }; } // namespace TEST_P(End2EndBinderTransportTest, SetupTransport) { - grpc_core::ExecCtx exec_ctx; grpc_transport *client_transport, *server_transport; std::tie(client_transport, server_transport) = end2end_testing::CreateClientServerBindersPairForTesting(); diff --git a/test/core/transport/binder/end2end/fake_binder.cc b/test/core/transport/binder/end2end/fake_binder.cc index 6d0d1cd3804..0be3098536b 100644 --- a/test/core/transport/binder/end2end/fake_binder.cc +++ b/test/core/transport/binder/end2end/fake_binder.cc @@ -156,6 +156,7 @@ TransactionProcessor::TransactionProcessor(absl::Duration delay) tx_thread_( "process-thread", [](void* arg) { + grpc_core::ExecCtx exec_ctx; auto* self = static_cast(arg); self->ProcessLoop(); }, @@ -233,6 +234,7 @@ void TransactionProcessor::ProcessLoop() { static_cast(target->owner); auto parcel = absl::make_unique(std::move(data)); tx_receiver->Receive(tx_code, parcel.get()).IgnoreError(); + grpc_core::ExecCtx::Get()->Flush(); } Flush(); } diff --git a/test/core/transport/binder/mock_objects.h b/test/core/transport/binder/mock_objects.h index a48aa4e79a3..6424a2acd22 100644 --- a/test/core/transport/binder/mock_objects.h +++ b/test/core/transport/binder/mock_objects.h @@ -19,6 +19,7 @@ #include "src/core/ext/transport/binder/utils/transport_stream_receiver.h" #include "src/core/ext/transport/binder/wire_format/binder.h" +#include "src/core/ext/transport/binder/wire_format/binder_constants.h" #include "src/core/ext/transport/binder/wire_format/wire_reader.h" #include "src/core/ext/transport/binder/wire_format/wire_writer.h" @@ -92,7 +93,8 @@ class MockTransactionReceiver : public TransactionReceiver { class MockWireWriter : public WireWriter { public: - MOCK_METHOD(absl::Status, RpcCall, (const Transaction&), (override)); + MOCK_METHOD(absl::Status, RpcCall, (std::unique_ptr), + (override)); MOCK_METHOD(absl::Status, SendAck, (int64_t), (override)); MOCK_METHOD(void, OnAckReceived, (int64_t), (override)); }; diff --git a/test/core/transport/binder/wire_writer_test.cc b/test/core/transport/binder/wire_writer_test.cc index fddca7dbdeb..b117e1eb275 100644 --- a/test/core/transport/binder/wire_writer_test.cc +++ b/test/core/transport/binder/wire_writer_test.cc @@ -21,6 +21,8 @@ #include "absl/memory/memory.h" +#include + #include "test/core/transport/binder/mock_objects.h" #include "test/core/util/test_config.h" @@ -34,6 +36,10 @@ MATCHER_P(StrEqInt8Ptr, target, "") { } TEST(WireWriterTest, RpcCall) { + grpc::internal::GrpcLibrary init_lib; + init_lib.init(); + // Required because wire writer uses combiner internally. + grpc_core::ExecCtx exec_ctx; auto mock_binder = absl::make_unique(); MockBinder& mock_binder_ref = *mock_binder; MockWritableParcel mock_writable_parcel; @@ -62,9 +68,10 @@ TEST(WireWriterTest, RpcCall) { EXPECT_CALL(mock_binder_ref, Transact(BinderTransportTxCode(kFirstCallId))); - Transaction tx(kFirstCallId, /*is_client=*/true); - EXPECT_TRUE(wire_writer.RpcCall(tx).ok()); + auto tx = std::make_unique(kFirstCallId, /*is_client=*/true); + EXPECT_TRUE(wire_writer.RpcCall(std::move(tx)).ok()); sequence_number++; + grpc_core::ExecCtx::Get()->Flush(); } { // flag @@ -95,10 +102,12 @@ TEST(WireWriterTest, RpcCall) { EXPECT_CALL(mock_binder_ref, Transact(BinderTransportTxCode(kFirstCallId + 1))); - Transaction tx(kFirstCallId + 1, /*is_client=*/true); - tx.SetPrefix(kMetadata); - tx.SetMethodRef("/example/method/ref"); - EXPECT_TRUE(wire_writer.RpcCall(tx).ok()); + auto tx = + std::make_unique(kFirstCallId + 1, /*is_client=*/true); + tx->SetPrefix(kMetadata); + tx->SetMethodRef("/example/method/ref"); + EXPECT_TRUE(wire_writer.RpcCall(std::move(tx)).ok()); + grpc_core::ExecCtx::Get()->Flush(); } { // flag @@ -109,10 +118,11 @@ TEST(WireWriterTest, RpcCall) { ExpectWriteByteArray("data"); EXPECT_CALL(mock_binder_ref, Transact(BinderTransportTxCode(kFirstCallId))); - Transaction tx(kFirstCallId, /*is_client=*/true); - tx.SetData("data"); - EXPECT_TRUE(wire_writer.RpcCall(tx).ok()); + auto tx = std::make_unique(kFirstCallId, /*is_client=*/true); + tx->SetData("data"); + EXPECT_TRUE(wire_writer.RpcCall(std::move(tx)).ok()); sequence_number++; + grpc_core::ExecCtx::Get()->Flush(); } { // flag @@ -122,10 +132,11 @@ TEST(WireWriterTest, RpcCall) { EXPECT_CALL(mock_binder_ref, Transact(BinderTransportTxCode(kFirstCallId))); - Transaction tx(kFirstCallId, /*is_client=*/true); - tx.SetSuffix({}); - EXPECT_TRUE(wire_writer.RpcCall(tx).ok()); + auto tx = std::make_unique(kFirstCallId, /*is_client=*/true); + tx->SetSuffix({}); + EXPECT_TRUE(wire_writer.RpcCall(std::move(tx)).ok()); sequence_number++; + grpc_core::ExecCtx::Get()->Flush(); } { // flag @@ -158,15 +169,16 @@ TEST(WireWriterTest, RpcCall) { EXPECT_CALL(mock_binder_ref, Transact(BinderTransportTxCode(kFirstCallId))); - Transaction tx(kFirstCallId, /*is_client=*/true); + auto tx = std::make_unique(kFirstCallId, /*is_client=*/true); // TODO(waynetu): Implement a helper function that automatically creates // EXPECT_CALL based on the tx object. - tx.SetPrefix(kMetadata); - tx.SetMethodRef("/example/method/ref"); - tx.SetData(""); - tx.SetSuffix({}); - EXPECT_TRUE(wire_writer.RpcCall(tx).ok()); + tx->SetPrefix(kMetadata); + tx->SetMethodRef("/example/method/ref"); + tx->SetData(""); + tx->SetSuffix({}); + EXPECT_TRUE(wire_writer.RpcCall(std::move(tx)).ok()); sequence_number++; + grpc_core::ExecCtx::Get()->Flush(); } // Really large message @@ -197,9 +209,11 @@ TEST(WireWriterTest, RpcCall) { Transact(BinderTransportTxCode(kFirstCallId + 2))); // Use a new stream. - Transaction tx(kFirstCallId + 2, /*is_client=*/true); - tx.SetData(std::string(2 * WireWriterImpl::kBlockSize + 1, 'a')); - EXPECT_TRUE(wire_writer.RpcCall(tx).ok()); + auto tx = + std::make_unique(kFirstCallId + 2, /*is_client=*/true); + tx->SetData(std::string(2 * WireWriterImpl::kBlockSize + 1, 'a')); + EXPECT_TRUE(wire_writer.RpcCall(std::move(tx)).ok()); + grpc_core::ExecCtx::Get()->Flush(); } // Really large message with metadata { @@ -233,13 +247,17 @@ TEST(WireWriterTest, RpcCall) { Transact(BinderTransportTxCode(kFirstCallId + 3))); // Use a new stream. - Transaction tx(kFirstCallId + 3, /*is_client=*/true); - tx.SetPrefix({}); - tx.SetMethodRef("123"); - tx.SetData(std::string(2 * WireWriterImpl::kBlockSize + 1, 'a')); - tx.SetSuffix({}); - EXPECT_TRUE(wire_writer.RpcCall(tx).ok()); + auto tx = + std::make_unique(kFirstCallId + 3, /*is_client=*/true); + tx->SetPrefix({}); + tx->SetMethodRef("123"); + tx->SetData(std::string(2 * WireWriterImpl::kBlockSize + 1, 'a')); + tx->SetSuffix({}); + EXPECT_TRUE(wire_writer.RpcCall(std::move(tx)).ok()); + grpc_core::ExecCtx::Get()->Flush(); } + grpc_core::ExecCtx::Get()->Flush(); + init_lib.shutdown(); } } // namespace grpc_binder