|
|
|
@ -30,6 +30,8 @@ |
|
|
|
|
#include "absl/memory/memory.h" |
|
|
|
|
#include "absl/status/statusor.h" |
|
|
|
|
|
|
|
|
|
#include <grpc/support/log.h> |
|
|
|
|
|
|
|
|
|
#include "src/core/ext/transport/binder/utils/transport_stream_receiver.h" |
|
|
|
|
#include "src/core/ext/transport/binder/wire_format/binder.h" |
|
|
|
|
#include "src/core/ext/transport/binder/wire_format/wire_writer.h" |
|
|
|
@ -107,10 +109,11 @@ std::shared_ptr<WireWriter> WireReaderImpl::SetupTransport( |
|
|
|
|
|
|
|
|
|
void WireReaderImpl::SendSetupTransport(Binder* binder) { |
|
|
|
|
binder->Initialize(); |
|
|
|
|
VLOG(2) << "prepare transaction = " << binder->PrepareTransaction().ok(); |
|
|
|
|
gpr_log(GPR_DEBUG, "prepare transaction = %d", |
|
|
|
|
binder->PrepareTransaction().ok()); |
|
|
|
|
WritableParcel* writable_parcel = binder->GetWritableParcel(); |
|
|
|
|
VLOG(2) << "write int32 = " |
|
|
|
|
<< writable_parcel->WriteInt32(kWireFormatVersion).ok(); |
|
|
|
|
gpr_log(GPR_DEBUG, "write int32 = %d", |
|
|
|
|
writable_parcel->WriteInt32(kWireFormatVersion).ok()); |
|
|
|
|
// The lifetime of the transaction receiver is the same as the wire writer's.
|
|
|
|
|
// The transaction receiver is responsible for not calling the on-transact
|
|
|
|
|
// callback when it's dead.
|
|
|
|
@ -124,11 +127,11 @@ void WireReaderImpl::SendSetupTransport(Binder* binder) { |
|
|
|
|
return this->ProcessTransaction(code, readable_parcel, uid); |
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
VLOG(2) << "tx_receiver = " << tx_receiver_->GetRawBinder(); |
|
|
|
|
VLOG(2) << "AParcel_writeStrongBinder = " |
|
|
|
|
<< writable_parcel->WriteBinder(tx_receiver_.get()).ok(); |
|
|
|
|
VLOG(2) << "AIBinder_transact = " |
|
|
|
|
<< binder->Transact(BinderTransportTxCode::SETUP_TRANSPORT).ok(); |
|
|
|
|
gpr_log(GPR_DEBUG, "tx_receiver = %p", tx_receiver_->GetRawBinder()); |
|
|
|
|
gpr_log(GPR_DEBUG, "AParcel_writeStrongBinder = %d", |
|
|
|
|
writable_parcel->WriteBinder(tx_receiver_.get()).ok()); |
|
|
|
|
gpr_log(GPR_DEBUG, "AIBinder_transact = %d", |
|
|
|
|
binder->Transact(BinderTransportTxCode::SETUP_TRANSPORT).ok()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
std::unique_ptr<Binder> WireReaderImpl::RecvSetupTransport() { |
|
|
|
@ -178,7 +181,7 @@ absl::Status WireReaderImpl::ProcessTransaction(transaction_code_t code, |
|
|
|
|
} |
|
|
|
|
recvd_setup_transport_ = true; |
|
|
|
|
|
|
|
|
|
VLOG(2) << "calling uid = " << uid; |
|
|
|
|
gpr_log(GPR_DEBUG, "calling uid = %d", uid); |
|
|
|
|
if (!security_policy_->IsAuthorized(uid)) { |
|
|
|
|
return absl::PermissionDeniedError( |
|
|
|
|
"UID " + std::to_string(uid) + |
|
|
|
@ -188,13 +191,14 @@ absl::Status WireReaderImpl::ProcessTransaction(transaction_code_t code, |
|
|
|
|
|
|
|
|
|
int version; |
|
|
|
|
GRPC_RETURN_IF_ERROR(parcel->ReadInt32(&version)); |
|
|
|
|
VLOG(2) << "The other end respond with version = " << version; |
|
|
|
|
gpr_log(GPR_DEBUG, "The other end respond with version = %d", version); |
|
|
|
|
// We only support this single lowest possible version, so server must
|
|
|
|
|
// respond that version too.
|
|
|
|
|
if (version != kWireFormatVersion) { |
|
|
|
|
LOG(ERROR) << "The other end respond with version = " << version |
|
|
|
|
<< ", but we requested version " << kWireFormatVersion |
|
|
|
|
<< ", trying to continue anyway"; |
|
|
|
|
gpr_log(GPR_ERROR, |
|
|
|
|
"The other end respond with version = %d, but we requested " |
|
|
|
|
"version %d, trying to continue anyway", |
|
|
|
|
version, kWireFormatVersion); |
|
|
|
|
} |
|
|
|
|
std::unique_ptr<Binder> binder{}; |
|
|
|
|
GRPC_RETURN_IF_ERROR(parcel->ReadBinder(&binder)); |
|
|
|
@ -214,7 +218,7 @@ absl::Status WireReaderImpl::ProcessTransaction(transaction_code_t code, |
|
|
|
|
case BinderTransportTxCode::ACKNOWLEDGE_BYTES: { |
|
|
|
|
int64_t num_bytes = -1; |
|
|
|
|
GRPC_RETURN_IF_ERROR(parcel->ReadInt64(&num_bytes)); |
|
|
|
|
VLOG(2) << "received acknowledge bytes = " << num_bytes; |
|
|
|
|
gpr_log(GPR_DEBUG, "received acknowledge bytes = %" PRId64, num_bytes); |
|
|
|
|
if (!wire_writer_ready_notification_.WaitForNotificationWithTimeout( |
|
|
|
|
absl::Seconds(5))) { |
|
|
|
|
return absl::DeadlineExceededError( |
|
|
|
@ -229,14 +233,14 @@ absl::Status WireReaderImpl::ProcessTransaction(transaction_code_t code, |
|
|
|
|
} |
|
|
|
|
int ping_id = -1; |
|
|
|
|
GRPC_RETURN_IF_ERROR(parcel->ReadInt32(&ping_id)); |
|
|
|
|
VLOG(2) << "received ping id = " << ping_id; |
|
|
|
|
gpr_log(GPR_DEBUG, "received ping id = %d", ping_id); |
|
|
|
|
// TODO(waynetu): Ping back.
|
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
case BinderTransportTxCode::PING_RESPONSE: { |
|
|
|
|
int value = -1; |
|
|
|
|
GRPC_RETURN_IF_ERROR(parcel->ReadInt32(&value)); |
|
|
|
|
VLOG(2) << "received ping response = " << value; |
|
|
|
|
gpr_log(GPR_DEBUG, "received ping response = %d", value); |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -279,8 +283,8 @@ absl::Status WireReaderImpl::ProcessStreamingTransaction( |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (!tx_process_result.ok()) { |
|
|
|
|
LOG(ERROR) << "Failed to process streaming transaction: " |
|
|
|
|
<< tx_process_result.ToString(); |
|
|
|
|
gpr_log(GPR_ERROR, "Failed to process streaming transaction: %s", |
|
|
|
|
tx_process_result.ToString().c_str()); |
|
|
|
|
// Something went wrong when receiving transaction. Cancel failed requests.
|
|
|
|
|
if (cancellation_flags & kFlagPrefix) { |
|
|
|
|
LOG(INFO) << "cancelling initial metadata"; |
|
|
|
@ -323,7 +327,7 @@ absl::Status WireReaderImpl::ProcessStreamingTransactionImpl( |
|
|
|
|
std::queue<absl::AnyInvocable<void() &&>>& deferred_func_queue) { |
|
|
|
|
CHECK(cancellation_flags); |
|
|
|
|
num_incoming_bytes_ += parcel->GetDataSize(); |
|
|
|
|
LOG(INFO) << "Total incoming bytes: " << num_incoming_bytes_; |
|
|
|
|
gpr_log(GPR_INFO, "Total incoming bytes: %" PRId64, num_incoming_bytes_); |
|
|
|
|
|
|
|
|
|
int flags; |
|
|
|
|
GRPC_RETURN_IF_ERROR(parcel->ReadInt32(&flags)); |
|
|
|
@ -340,10 +344,10 @@ absl::Status WireReaderImpl::ProcessStreamingTransactionImpl( |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
int status = flags >> 16; |
|
|
|
|
VLOG(2) << "status = " << status; |
|
|
|
|
VLOG(2) << "FLAG_PREFIX = " << (flags & kFlagPrefix); |
|
|
|
|
VLOG(2) << "FLAG_MESSAGE_DATA = " << (flags & kFlagMessageData); |
|
|
|
|
VLOG(2) << "FLAG_SUFFIX = " << (flags & kFlagSuffix); |
|
|
|
|
gpr_log(GPR_DEBUG, "status = %d", status); |
|
|
|
|
gpr_log(GPR_DEBUG, "FLAG_PREFIX = %d", (flags & kFlagPrefix)); |
|
|
|
|
gpr_log(GPR_DEBUG, "FLAG_MESSAGE_DATA = %d", (flags & kFlagMessageData)); |
|
|
|
|
gpr_log(GPR_DEBUG, "FLAG_SUFFIX = %d", (flags & kFlagSuffix)); |
|
|
|
|
int seq_num; |
|
|
|
|
GRPC_RETURN_IF_ERROR(parcel->ReadInt32(&seq_num)); |
|
|
|
|
// TODO(waynetu): For now we'll just assume that the transactions commit in
|
|
|
|
@ -361,7 +365,7 @@ absl::Status WireReaderImpl::ProcessStreamingTransactionImpl( |
|
|
|
|
CHECK(expectation < std::numeric_limits<int32_t>::max()) |
|
|
|
|
<< "Sequence number too large"; |
|
|
|
|
expectation++; |
|
|
|
|
VLOG(2) << "sequence number = " << seq_num; |
|
|
|
|
gpr_log(GPR_DEBUG, "sequence number = %d", seq_num); |
|
|
|
|
if (flags & kFlagPrefix) { |
|
|
|
|
std::string method_ref; |
|
|
|
|
if (!is_client_) { |
|
|
|
@ -401,7 +405,7 @@ absl::Status WireReaderImpl::ProcessStreamingTransactionImpl( |
|
|
|
|
if (flags & kFlagMessageData) { |
|
|
|
|
int count; |
|
|
|
|
GRPC_RETURN_IF_ERROR(parcel->ReadInt32(&count)); |
|
|
|
|
VLOG(2) << "count = " << count; |
|
|
|
|
gpr_log(GPR_DEBUG, "count = %d", count); |
|
|
|
|
std::string msg_data{}; |
|
|
|
|
if (count > 0) { |
|
|
|
|
GRPC_RETURN_IF_ERROR(parcel->ReadByteArray(&msg_data)); |
|
|
|
@ -421,7 +425,7 @@ absl::Status WireReaderImpl::ProcessStreamingTransactionImpl( |
|
|
|
|
// FLAG_STATUS_DESCRIPTION set
|
|
|
|
|
std::string desc; |
|
|
|
|
GRPC_RETURN_IF_ERROR(parcel->ReadString(&desc)); |
|
|
|
|
VLOG(2) << "description = " << desc; |
|
|
|
|
gpr_log(GPR_DEBUG, "description = %s", desc.c_str()); |
|
|
|
|
} |
|
|
|
|
Metadata trailing_metadata; |
|
|
|
|
if (is_client_) { |
|
|
|
|