@ -30,8 +30,6 @@
# include "absl/memory/memory.h"
# include "absl/memory/memory.h"
# include "absl/status/statusor.h"
# include "absl/status/statusor.h"
# include <grpc/support/log.h>
# include "src/core/ext/transport/binder/utils/transport_stream_receiver.h"
# include "src/core/ext/transport/binder/utils/transport_stream_receiver.h"
# include "src/core/ext/transport/binder/wire_format/binder.h"
# include "src/core/ext/transport/binder/wire_format/binder.h"
# include "src/core/ext/transport/binder/wire_format/wire_writer.h"
# include "src/core/ext/transport/binder/wire_format/wire_writer.h"
@ -109,11 +107,12 @@ std::shared_ptr<WireWriter> WireReaderImpl::SetupTransport(
void WireReaderImpl : : SendSetupTransport ( Binder * binder ) {
void WireReaderImpl : : SendSetupTransport ( Binder * binder ) {
binder - > Initialize ( ) ;
binder - > Initialize ( ) ;
gpr_log ( GPR_DEBUG , " prepare transaction = %d " ,
const absl : : Status prep_transaction_status = binder - > PrepareTransaction ( ) ;
binder - > PrepareTransaction ( ) . ok ( ) ) ;
VLOG ( 2 ) < < " prepare transaction = " < < prep_transaction_status ;
WritableParcel * writable_parcel = binder - > GetWritableParcel ( ) ;
WritableParcel * writable_parcel = binder - > GetWritableParcel ( ) ;
gpr_log ( GPR_DEBUG , " write int32 = %d " ,
const absl : : Status write_status =
writable_parcel - > WriteInt32 ( kWireFormatVersion ) . ok ( ) ) ;
writable_parcel - > WriteInt32 ( kWireFormatVersion ) ;
VLOG ( 2 ) < < " write int32 = " < < write_status ;
// The lifetime of the transaction receiver is the same as the wire writer's.
// The lifetime of the transaction receiver is the same as the wire writer's.
// The transaction receiver is responsible for not calling the on-transact
// The transaction receiver is responsible for not calling the on-transact
// callback when it's dead.
// callback when it's dead.
@ -127,11 +126,13 @@ void WireReaderImpl::SendSetupTransport(Binder* binder) {
return this - > ProcessTransaction ( code , readable_parcel , uid ) ;
return this - > ProcessTransaction ( code , readable_parcel , uid ) ;
} ) ;
} ) ;
gpr_log ( GPR_DEBUG , " tx_receiver = %p " , tx_receiver_ - > GetRawBinder ( ) ) ;
VLOG ( 2 ) < < " tx_receiver = " < < tx_receiver_ - > GetRawBinder ( ) ;
gpr_log ( GPR_DEBUG , " AParcel_writeStrongBinder = %d " ,
const absl : : Status write_binder_status =
writable_parcel - > WriteBinder ( tx_receiver_ . get ( ) ) . ok ( ) ) ;
writable_parcel - > WriteBinder ( tx_receiver_ . get ( ) ) ;
gpr_log ( GPR_DEBUG , " AIBinder_transact = %d " ,
VLOG ( 2 ) < < " AParcel_writeStrongBinder = " < < write_binder_status ;
binder - > Transact ( BinderTransportTxCode : : SETUP_TRANSPORT ) . ok ( ) ) ;
const absl : : Status transact_status =
binder - > Transact ( BinderTransportTxCode : : SETUP_TRANSPORT ) ;
VLOG ( 2 ) < < " AIBinder_transact = " < < transact_status ;
}
}
std : : unique_ptr < Binder > WireReaderImpl : : RecvSetupTransport ( ) {
std : : unique_ptr < Binder > WireReaderImpl : : RecvSetupTransport ( ) {
@ -181,7 +182,7 @@ absl::Status WireReaderImpl::ProcessTransaction(transaction_code_t code,
}
}
recvd_setup_transport_ = true ;
recvd_setup_transport_ = true ;
gpr_log ( GPR_DEBUG , " calling uid = %d " , uid ) ;
VLOG ( 2 ) < < " calling uid = " < < uid ;
if ( ! security_policy_ - > IsAuthorized ( uid ) ) {
if ( ! security_policy_ - > IsAuthorized ( uid ) ) {
return absl : : PermissionDeniedError (
return absl : : PermissionDeniedError (
" UID " + std : : to_string ( uid ) +
" UID " + std : : to_string ( uid ) +
@ -191,14 +192,13 @@ absl::Status WireReaderImpl::ProcessTransaction(transaction_code_t code,
int version ;
int version ;
GRPC_RETURN_IF_ERROR ( parcel - > ReadInt32 ( & version ) ) ;
GRPC_RETURN_IF_ERROR ( parcel - > ReadInt32 ( & version ) ) ;
gpr_log ( GPR_DEBUG , " The other end respond with version = %d " , version ) ;
VLOG ( 2 ) < < " The other end respond with version = " < < version ;
// We only support this single lowest possible version, so server must
// We only support this single lowest possible version, so server must
// respond that version too.
// respond that version too.
if ( version ! = kWireFormatVersion ) {
if ( version ! = kWireFormatVersion ) {
gpr_log ( GPR_ERROR ,
LOG ( ERROR ) < < " The other end respond with version = " < < version
" The other end respond with version = %d, but we requested "
< < " , but we requested version " < < kWireFormatVersion
" version %d, trying to continue anyway " ,
< < " , trying to continue anyway " ;
version , kWireFormatVersion ) ;
}
}
std : : unique_ptr < Binder > binder { } ;
std : : unique_ptr < Binder > binder { } ;
GRPC_RETURN_IF_ERROR ( parcel - > ReadBinder ( & binder ) ) ;
GRPC_RETURN_IF_ERROR ( parcel - > ReadBinder ( & binder ) ) ;
@ -218,7 +218,7 @@ absl::Status WireReaderImpl::ProcessTransaction(transaction_code_t code,
case BinderTransportTxCode : : ACKNOWLEDGE_BYTES : {
case BinderTransportTxCode : : ACKNOWLEDGE_BYTES : {
int64_t num_bytes = - 1 ;
int64_t num_bytes = - 1 ;
GRPC_RETURN_IF_ERROR ( parcel - > ReadInt64 ( & num_bytes ) ) ;
GRPC_RETURN_IF_ERROR ( parcel - > ReadInt64 ( & num_bytes ) ) ;
gpr_log ( GPR_DEBUG , " received acknowledge bytes = % " PRId64 , num_bytes ) ;
VLOG ( 2 ) < < " received acknowledge bytes = " < < num_bytes ;
if ( ! wire_writer_ready_notification_ . WaitForNotificationWithTimeout (
if ( ! wire_writer_ready_notification_ . WaitForNotificationWithTimeout (
absl : : Seconds ( 5 ) ) ) {
absl : : Seconds ( 5 ) ) ) {
return absl : : DeadlineExceededError (
return absl : : DeadlineExceededError (
@ -233,14 +233,14 @@ absl::Status WireReaderImpl::ProcessTransaction(transaction_code_t code,
}
}
int ping_id = - 1 ;
int ping_id = - 1 ;
GRPC_RETURN_IF_ERROR ( parcel - > ReadInt32 ( & ping_id ) ) ;
GRPC_RETURN_IF_ERROR ( parcel - > ReadInt32 ( & ping_id ) ) ;
gpr_log ( GPR_DEBUG , " received ping id = %d " , ping_id ) ;
VLOG ( 2 ) < < " received ping id = " < < ping_id ;
// TODO(waynetu): Ping back.
// TODO(waynetu): Ping back.
break ;
break ;
}
}
case BinderTransportTxCode : : PING_RESPONSE : {
case BinderTransportTxCode : : PING_RESPONSE : {
int value = - 1 ;
int value = - 1 ;
GRPC_RETURN_IF_ERROR ( parcel - > ReadInt32 ( & value ) ) ;
GRPC_RETURN_IF_ERROR ( parcel - > ReadInt32 ( & value ) ) ;
gpr_log ( GPR_DEBUG , " received ping response = %d " , value ) ;
VLOG ( 2 ) < < " received ping response = " < < value ;
break ;
break ;
}
}
}
}
@ -283,8 +283,8 @@ absl::Status WireReaderImpl::ProcessStreamingTransaction(
}
}
if ( ! tx_process_result . ok ( ) ) {
if ( ! tx_process_result . ok ( ) ) {
gpr_log ( GPR_ERROR , " Failed to process streaming transaction: %s " ,
LOG ( ERROR ) < < " Failed to process streaming transaction: "
tx_process_result . ToString ( ) . c_str ( ) ) ;
< < tx_process_result . ToString ( ) ;
// Something went wrong when receiving transaction. Cancel failed requests.
// Something went wrong when receiving transaction. Cancel failed requests.
if ( cancellation_flags & kFlagPrefix ) {
if ( cancellation_flags & kFlagPrefix ) {
LOG ( INFO ) < < " cancelling initial metadata " ;
LOG ( INFO ) < < " cancelling initial metadata " ;
@ -327,7 +327,7 @@ absl::Status WireReaderImpl::ProcessStreamingTransactionImpl(
std : : queue < absl : : AnyInvocable < void ( ) & & > > & deferred_func_queue ) {
std : : queue < absl : : AnyInvocable < void ( ) & & > > & deferred_func_queue ) {
CHECK ( cancellation_flags ) ;
CHECK ( cancellation_flags ) ;
num_incoming_bytes_ + = parcel - > GetDataSize ( ) ;
num_incoming_bytes_ + = parcel - > GetDataSize ( ) ;
gpr_log ( GPR_INFO , " Total incoming bytes: % " PRId64 , num_incoming_bytes_ ) ;
LOG ( INFO ) < < " Total incoming bytes: " < < num_incoming_bytes_ ;
int flags ;
int flags ;
GRPC_RETURN_IF_ERROR ( parcel - > ReadInt32 ( & flags ) ) ;
GRPC_RETURN_IF_ERROR ( parcel - > ReadInt32 ( & flags ) ) ;
@ -344,10 +344,10 @@ absl::Status WireReaderImpl::ProcessStreamingTransactionImpl(
}
}
int status = flags > > 16 ;
int status = flags > > 16 ;
gpr_log ( GPR_DEBUG , " status = %d " , status ) ;
VLOG ( 2 ) < < " status = " < < status ;
gpr_log ( GPR_DEBUG , " FLAG_PREFIX = %d " , ( flags & kFlagPrefix ) ) ;
VLOG ( 2 ) < < " FLAG_PREFIX = " < < ( flags & kFlagPrefix ) ;
gpr_log ( GPR_DEBUG , " FLAG_MESSAGE_DATA = %d " , ( flags & kFlagMessageData ) ) ;
VLOG ( 2 ) < < " FLAG_MESSAGE_DATA = " < < ( flags & kFlagMessageData ) ;
gpr_log ( GPR_DEBUG , " FLAG_SUFFIX = %d " , ( flags & kFlagSuffix ) ) ;
VLOG ( 2 ) < < " FLAG_SUFFIX = " < < ( flags & kFlagSuffix ) ;
int seq_num ;
int seq_num ;
GRPC_RETURN_IF_ERROR ( parcel - > ReadInt32 ( & seq_num ) ) ;
GRPC_RETURN_IF_ERROR ( parcel - > ReadInt32 ( & seq_num ) ) ;
// TODO(waynetu): For now we'll just assume that the transactions commit in
// TODO(waynetu): For now we'll just assume that the transactions commit in
@ -365,7 +365,7 @@ absl::Status WireReaderImpl::ProcessStreamingTransactionImpl(
CHECK ( expectation < std : : numeric_limits < int32_t > : : max ( ) )
CHECK ( expectation < std : : numeric_limits < int32_t > : : max ( ) )
< < " Sequence number too large " ;
< < " Sequence number too large " ;
expectation + + ;
expectation + + ;
gpr_log ( GPR_DEBUG , " sequence number = %d " , seq_num ) ;
VLOG ( 2 ) < < " sequence number = " < < seq_num ;
if ( flags & kFlagPrefix ) {
if ( flags & kFlagPrefix ) {
std : : string method_ref ;
std : : string method_ref ;
if ( ! is_client_ ) {
if ( ! is_client_ ) {
@ -405,7 +405,7 @@ absl::Status WireReaderImpl::ProcessStreamingTransactionImpl(
if ( flags & kFlagMessageData ) {
if ( flags & kFlagMessageData ) {
int count ;
int count ;
GRPC_RETURN_IF_ERROR ( parcel - > ReadInt32 ( & count ) ) ;
GRPC_RETURN_IF_ERROR ( parcel - > ReadInt32 ( & count ) ) ;
gpr_log ( GPR_DEBUG , " count = %d " , count ) ;
VLOG ( 2 ) < < " count = " < < count ;
std : : string msg_data { } ;
std : : string msg_data { } ;
if ( count > 0 ) {
if ( count > 0 ) {
GRPC_RETURN_IF_ERROR ( parcel - > ReadByteArray ( & msg_data ) ) ;
GRPC_RETURN_IF_ERROR ( parcel - > ReadByteArray ( & msg_data ) ) ;
@ -425,7 +425,7 @@ absl::Status WireReaderImpl::ProcessStreamingTransactionImpl(
// FLAG_STATUS_DESCRIPTION set
// FLAG_STATUS_DESCRIPTION set
std : : string desc ;
std : : string desc ;
GRPC_RETURN_IF_ERROR ( parcel - > ReadString ( & desc ) ) ;
GRPC_RETURN_IF_ERROR ( parcel - > ReadString ( & desc ) ) ;
gpr_log ( GPR_DEBUG , " description = %s " , desc . c_str ( ) ) ;
VLOG ( 2 ) < < " description = " < < desc ;
}
}
Metadata trailing_metadata ;
Metadata trailing_metadata ;
if ( is_client_ ) {
if ( is_client_ ) {