@ -24,6 +24,7 @@
# include <utility>
# include <utility>
# include <vector>
# include <vector>
# include "absl/functional/any_invocable.h"
# include "absl/memory/memory.h"
# include "absl/memory/memory.h"
# include "absl/status/statusor.h"
# include "absl/status/statusor.h"
@ -248,29 +249,45 @@ absl::Status WireReaderImpl::ProcessStreamingTransaction(
transaction_code_t code , ReadableParcel * parcel ) {
transaction_code_t code , ReadableParcel * parcel ) {
bool need_to_send_ack = false ;
bool need_to_send_ack = false ;
int64_t num_bytes = 0 ;
int64_t num_bytes = 0 ;
// Indicates which callbacks should be cancelled. It will be initialized as
// the flags the in-coming transaction carries, and when a particular
// callback is completed, the corresponding bit in cancellation_flag will be
// set to 0 so that we won't cancel it afterward.
int cancellation_flags = 0 ;
// The queue saves the actions needed to be done "WITHOUT" `mu_`.
// It prevents deadlock against wire writer issues.
std : : queue < absl : : AnyInvocable < void ( ) & & > > deferred_func_queue ;
absl : : Status tx_process_result ;
absl : : Status tx_process_result ;
{
{
grpc_core : : MutexLock lock ( & mu_ ) ;
grpc_core : : MutexLock lock ( & mu_ ) ;
if ( ! connected_ ) {
if ( ! connected_ ) {
return absl : : InvalidArgumentError ( " Transports not connected yet " ) ;
return absl : : InvalidArgumentError ( " Transports not connected yet " ) ;
}
}
// Indicate which callbacks should be cancelled. It will be initialized as
tx_process_result = ProcessStreamingTransactionImpl (
// the flags the in-coming transaction carries, and when a particular
code , parcel , & cancellation_flags , deferred_func_queue ) ;
// callback is completed, the corresponding bit in cancellation_flag will be
if ( ( num_incoming_bytes_ - num_acknowledged_bytes_ ) > =
// set to 0 so that we won't cancel it afterward.
kFlowControlAckBytes ) {
int cancellation_flags = 0 ;
need_to_send_ack = true ;
tx_process_result =
num_bytes = num_incoming_bytes_ ;
ProcessStreamingTransactionImpl ( code , parcel , & cancellation_flags ) ;
num_acknowledged_bytes_ = num_incoming_bytes_ ;
}
}
// Executes all actions in the queue.
while ( ! deferred_func_queue . empty ( ) ) {
std : : move ( deferred_func_queue . front ( ) ) ( ) ;
deferred_func_queue . pop ( ) ;
}
if ( ! tx_process_result . ok ( ) ) {
if ( ! tx_process_result . ok ( ) ) {
gpr_log ( GPR_ERROR , " Failed to process streaming transaction: %s " ,
gpr_log ( GPR_ERROR , " Failed to process streaming transaction: %s " ,
tx_process_result . ToString ( ) . c_str ( ) ) ;
tx_process_result . ToString ( ) . c_str ( ) ) ;
// Something went wrong when receiving transaction. Cancel failed
// Something went wrong when receiving transaction. Cancel failed requests.
// requests.
if ( cancellation_flags & kFlagPrefix ) {
if ( cancellation_flags & kFlagPrefix ) {
gpr_log ( GPR_INFO , " cancelling initial metadata " ) ;
gpr_log ( GPR_INFO , " cancelling initial metadata " ) ;
transport_stream_receiver_ - > NotifyRecvInitialMetadata (
transport_stream_receiver_ - > NotifyRecvInitialMetadata ( code ,
code , tx_process_result ) ;
tx_process_result ) ;
}
}
if ( cancellation_flags & kFlagMessageData ) {
if ( cancellation_flags & kFlagMessageData ) {
gpr_log ( GPR_INFO , " cancelling message data " ) ;
gpr_log ( GPR_INFO , " cancelling message data " ) ;
@ -282,13 +299,7 @@ absl::Status WireReaderImpl::ProcessStreamingTransaction(
code , tx_process_result , 0 ) ;
code , tx_process_result , 0 ) ;
}
}
}
}
if ( ( num_incoming_bytes_ - num_acknowledged_bytes_ ) > =
kFlowControlAckBytes ) {
need_to_send_ack = true ;
num_bytes = num_incoming_bytes_ ;
num_acknowledged_bytes_ = num_incoming_bytes_ ;
}
}
if ( need_to_send_ack ) {
if ( need_to_send_ack ) {
if ( ! wire_writer_ready_notification_ . WaitForNotificationWithTimeout (
if ( ! wire_writer_ready_notification_ . WaitForNotificationWithTimeout (
absl : : Seconds ( 5 ) ) ) {
absl : : Seconds ( 5 ) ) ) {
@ -310,7 +321,8 @@ absl::Status WireReaderImpl::ProcessStreamingTransaction(
}
}
absl : : Status WireReaderImpl : : ProcessStreamingTransactionImpl (
absl : : Status WireReaderImpl : : ProcessStreamingTransactionImpl (
transaction_code_t code , ReadableParcel * parcel , int * cancellation_flags ) {
transaction_code_t code , ReadableParcel * parcel , int * cancellation_flags ,
std : : queue < absl : : AnyInvocable < void ( ) & & > > & deferred_func_queue ) {
GPR_ASSERT ( cancellation_flags ) ;
GPR_ASSERT ( cancellation_flags ) ;
num_incoming_bytes_ + = parcel - > GetDataSize ( ) ;
num_incoming_bytes_ + = parcel - > GetDataSize ( ) ;
gpr_log ( GPR_INFO , " Total incoming bytes: % " PRId64 , num_incoming_bytes_ ) ;
gpr_log ( GPR_INFO , " Total incoming bytes: % " PRId64 , num_incoming_bytes_ ) ;
@ -380,8 +392,12 @@ absl::Status WireReaderImpl::ProcessStreamingTransactionImpl(
" binder.authority " ) ;
" binder.authority " ) ;
}
}
}
}
transport_stream_receiver_ - > NotifyRecvInitialMetadata (
deferred_func_queue . emplace ( [ this , code ,
code , * initial_metadata_or_error ) ;
initial_metadata_or_error = std : : move (
initial_metadata_or_error ) ] ( ) mutable {
this - > transport_stream_receiver_ - > NotifyRecvInitialMetadata (
code , std : : move ( initial_metadata_or_error ) ) ;
} ) ;
* cancellation_flags & = ~ kFlagPrefix ;
* cancellation_flags & = ~ kFlagPrefix ;
}
}
if ( flags & kFlagMessageData ) {
if ( flags & kFlagMessageData ) {
@ -396,7 +412,9 @@ absl::Status WireReaderImpl::ProcessStreamingTransactionImpl(
if ( ( flags & kFlagMessageDataIsPartial ) = = 0 ) {
if ( ( flags & kFlagMessageDataIsPartial ) = = 0 ) {
std : : string s = std : : move ( message_buffer_ [ code ] ) ;
std : : string s = std : : move ( message_buffer_ [ code ] ) ;
message_buffer_ . erase ( code ) ;
message_buffer_ . erase ( code ) ;
transport_stream_receiver_ - > NotifyRecvMessage ( code , std : : move ( s ) ) ;
deferred_func_queue . emplace ( [ this , code , s = std : : move ( s ) ] ( ) mutable {
this - > transport_stream_receiver_ - > NotifyRecvMessage ( code , std : : move ( s ) ) ;
} ) ;
}
}
* cancellation_flags & = ~ kFlagMessageData ;
* cancellation_flags & = ~ kFlagMessageData ;
}
}
@ -416,8 +434,12 @@ absl::Status WireReaderImpl::ProcessStreamingTransactionImpl(
}
}
trailing_metadata = * trailing_metadata_or_error ;
trailing_metadata = * trailing_metadata_or_error ;
}
}
transport_stream_receiver_ - > NotifyRecvTrailingMetadata (
deferred_func_queue . emplace (
[ this , code , trailing_metadata = std : : move ( trailing_metadata ) ,
status ] ( ) mutable {
this - > transport_stream_receiver_ - > NotifyRecvTrailingMetadata (
code , std : : move ( trailing_metadata ) , status ) ;
code , std : : move ( trailing_metadata ) , status ) ;
} ) ;
* cancellation_flags & = ~ kFlagSuffix ;
* cancellation_flags & = ~ kFlagSuffix ;
}
}
return absl : : OkStatus ( ) ;
return absl : : OkStatus ( ) ;