|
|
|
@ -48,7 +48,6 @@ const char kAuthorityMetadataKey[] = ":authority"; |
|
|
|
|
absl::StatusOr<Metadata> parse_metadata(ReadableParcel* reader) { |
|
|
|
|
int num_header; |
|
|
|
|
RETURN_IF_ERROR(reader->ReadInt32(&num_header)); |
|
|
|
|
gpr_log(GPR_INFO, "num_header = %d", num_header); |
|
|
|
|
if (num_header < 0) { |
|
|
|
|
return absl::InvalidArgumentError("num_header cannot be negative"); |
|
|
|
|
} |
|
|
|
@ -56,15 +55,11 @@ absl::StatusOr<Metadata> parse_metadata(ReadableParcel* reader) { |
|
|
|
|
for (int i = 0; i < num_header; i++) { |
|
|
|
|
int count; |
|
|
|
|
RETURN_IF_ERROR(reader->ReadInt32(&count)); |
|
|
|
|
gpr_log(GPR_INFO, "count = %d", count); |
|
|
|
|
std::string key{}; |
|
|
|
|
if (count > 0) RETURN_IF_ERROR(reader->ReadByteArray(&key)); |
|
|
|
|
gpr_log(GPR_INFO, "key = %s", key.c_str()); |
|
|
|
|
RETURN_IF_ERROR(reader->ReadInt32(&count)); |
|
|
|
|
gpr_log(GPR_INFO, "count = %d", count); |
|
|
|
|
std::string value{}; |
|
|
|
|
if (count > 0) RETURN_IF_ERROR(reader->ReadByteArray(&value)); |
|
|
|
|
gpr_log(GPR_INFO, "value = %s", value.c_str()); |
|
|
|
|
ret.emplace_back(key, value); |
|
|
|
|
} |
|
|
|
|
return ret; |
|
|
|
@ -80,9 +75,7 @@ WireReaderImpl::WireReaderImpl( |
|
|
|
|
: transport_stream_receiver_(std::move(transport_stream_receiver)), |
|
|
|
|
is_client_(is_client), |
|
|
|
|
security_policy_(security_policy), |
|
|
|
|
on_destruct_callback_(on_destruct_callback) { |
|
|
|
|
gpr_log(GPR_INFO, "%s mu_ = %p", __func__, &mu_); |
|
|
|
|
} |
|
|
|
|
on_destruct_callback_(on_destruct_callback) {} |
|
|
|
|
|
|
|
|
|
WireReaderImpl::~WireReaderImpl() { |
|
|
|
|
if (on_destruct_callback_) { |
|
|
|
@ -92,7 +85,6 @@ WireReaderImpl::~WireReaderImpl() { |
|
|
|
|
|
|
|
|
|
std::shared_ptr<WireWriter> WireReaderImpl::SetupTransport( |
|
|
|
|
std::unique_ptr<Binder> binder) { |
|
|
|
|
gpr_log(GPR_INFO, "Setting up transport"); |
|
|
|
|
if (!is_client_) { |
|
|
|
|
SendSetupTransport(binder.get()); |
|
|
|
|
{ |
|
|
|
@ -116,10 +108,10 @@ std::shared_ptr<WireWriter> WireReaderImpl::SetupTransport( |
|
|
|
|
|
|
|
|
|
void WireReaderImpl::SendSetupTransport(Binder* binder) { |
|
|
|
|
binder->Initialize(); |
|
|
|
|
gpr_log(GPR_INFO, "prepare transaction = %d", |
|
|
|
|
gpr_log(GPR_DEBUG, "prepare transaction = %d", |
|
|
|
|
binder->PrepareTransaction().ok()); |
|
|
|
|
WritableParcel* writable_parcel = binder->GetWritableParcel(); |
|
|
|
|
gpr_log(GPR_INFO, "write int32 = %d", |
|
|
|
|
gpr_log(GPR_DEBUG, "write int32 = %d", |
|
|
|
|
writable_parcel->WriteInt32(kWireFormatVersion).ok()); |
|
|
|
|
// The lifetime of the transaction receiver is the same as the wire writer's.
|
|
|
|
|
// The transaction receiver is responsible for not calling the on-transact
|
|
|
|
@ -134,29 +126,26 @@ void WireReaderImpl::SendSetupTransport(Binder* binder) { |
|
|
|
|
return this->ProcessTransaction(code, readable_parcel, uid); |
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
gpr_log(GPR_INFO, "tx_receiver = %p", tx_receiver_->GetRawBinder()); |
|
|
|
|
gpr_log(GPR_INFO, "AParcel_writeStrongBinder = %d", |
|
|
|
|
gpr_log(GPR_DEBUG, "tx_receiver = %p", tx_receiver_->GetRawBinder()); |
|
|
|
|
gpr_log(GPR_DEBUG, "AParcel_writeStrongBinder = %d", |
|
|
|
|
writable_parcel->WriteBinder(tx_receiver_.get()).ok()); |
|
|
|
|
gpr_log(GPR_INFO, "AIBinder_transact = %d", |
|
|
|
|
gpr_log(GPR_DEBUG, "AIBinder_transact = %d", |
|
|
|
|
binder->Transact(BinderTransportTxCode::SETUP_TRANSPORT).ok()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
std::unique_ptr<Binder> WireReaderImpl::RecvSetupTransport() { |
|
|
|
|
// TODO(b/191941760): avoid blocking, handle wire_writer_noti lifetime
|
|
|
|
|
// better
|
|
|
|
|
gpr_log(GPR_INFO, "start waiting for noti"); |
|
|
|
|
gpr_log(GPR_DEBUG, "start waiting for noti"); |
|
|
|
|
connection_noti_.WaitForNotification(); |
|
|
|
|
gpr_log(GPR_INFO, "end waiting for noti"); |
|
|
|
|
gpr_log(GPR_DEBUG, "end waiting for noti"); |
|
|
|
|
return std::move(other_end_binder_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
absl::Status WireReaderImpl::ProcessTransaction(transaction_code_t code, |
|
|
|
|
ReadableParcel* parcel, |
|
|
|
|
int uid) { |
|
|
|
|
gpr_log(GPR_INFO, __func__); |
|
|
|
|
gpr_log(GPR_INFO, "tx code = %u", code); |
|
|
|
|
if (code >= static_cast<unsigned>(kFirstCallId)) { |
|
|
|
|
gpr_log(GPR_INFO, "This is probably a Streaming Tx"); |
|
|
|
|
return ProcessStreamingTransaction(code, parcel); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -190,7 +179,7 @@ absl::Status WireReaderImpl::ProcessTransaction(transaction_code_t code, |
|
|
|
|
} |
|
|
|
|
recvd_setup_transport_ = true; |
|
|
|
|
|
|
|
|
|
gpr_log(GPR_ERROR, "calling uid = %d", uid); |
|
|
|
|
gpr_log(GPR_DEBUG, "calling uid = %d", uid); |
|
|
|
|
if (!security_policy_->IsAuthorized(uid)) { |
|
|
|
|
return absl::PermissionDeniedError( |
|
|
|
|
"UID " + std::to_string(uid) + |
|
|
|
@ -200,7 +189,7 @@ absl::Status WireReaderImpl::ProcessTransaction(transaction_code_t code, |
|
|
|
|
|
|
|
|
|
int version; |
|
|
|
|
RETURN_IF_ERROR(parcel->ReadInt32(&version)); |
|
|
|
|
gpr_log(GPR_INFO, "The other end respond with version = %d", version); |
|
|
|
|
gpr_log(GPR_DEBUG, "The other end respond with version = %d", version); |
|
|
|
|
// We only support this single lowest possible version, so server must
|
|
|
|
|
// respond that version too.
|
|
|
|
|
if (version != kWireFormatVersion) { |
|
|
|
@ -227,7 +216,7 @@ absl::Status WireReaderImpl::ProcessTransaction(transaction_code_t code, |
|
|
|
|
case BinderTransportTxCode::ACKNOWLEDGE_BYTES: { |
|
|
|
|
int64_t num_bytes = -1; |
|
|
|
|
RETURN_IF_ERROR(parcel->ReadInt64(&num_bytes)); |
|
|
|
|
gpr_log(GPR_INFO, "received acknowledge bytes = %" PRId64, num_bytes); |
|
|
|
|
gpr_log(GPR_DEBUG, "received acknowledge bytes = %" PRId64, num_bytes); |
|
|
|
|
wire_writer_->OnAckReceived(num_bytes); |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
@ -237,14 +226,14 @@ absl::Status WireReaderImpl::ProcessTransaction(transaction_code_t code, |
|
|
|
|
} |
|
|
|
|
int ping_id = -1; |
|
|
|
|
RETURN_IF_ERROR(parcel->ReadInt32(&ping_id)); |
|
|
|
|
gpr_log(GPR_INFO, "received ping id = %d", ping_id); |
|
|
|
|
gpr_log(GPR_DEBUG, "received ping id = %d", ping_id); |
|
|
|
|
// TODO(waynetu): Ping back.
|
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
case BinderTransportTxCode::PING_RESPONSE: { |
|
|
|
|
int value = -1; |
|
|
|
|
RETURN_IF_ERROR(parcel->ReadInt32(&value)); |
|
|
|
|
gpr_log(GPR_INFO, "received ping response = %d", value); |
|
|
|
|
gpr_log(GPR_DEBUG, "received ping response = %d", value); |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -320,7 +309,6 @@ absl::Status WireReaderImpl::ProcessStreamingTransactionImpl( |
|
|
|
|
|
|
|
|
|
int flags; |
|
|
|
|
RETURN_IF_ERROR(parcel->ReadInt32(&flags)); |
|
|
|
|
gpr_log(GPR_INFO, "flags = %d", flags); |
|
|
|
|
*cancellation_flags = flags; |
|
|
|
|
|
|
|
|
|
// Ignore in-coming transaction with flag = 0 to match with Java
|
|
|
|
@ -334,10 +322,10 @@ absl::Status WireReaderImpl::ProcessStreamingTransactionImpl( |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
int status = flags >> 16; |
|
|
|
|
gpr_log(GPR_INFO, "status = %d", status); |
|
|
|
|
gpr_log(GPR_INFO, "FLAG_PREFIX = %d", (flags & kFlagPrefix)); |
|
|
|
|
gpr_log(GPR_INFO, "FLAG_MESSAGE_DATA = %d", (flags & kFlagMessageData)); |
|
|
|
|
gpr_log(GPR_INFO, "FLAG_SUFFIX = %d", (flags & kFlagSuffix)); |
|
|
|
|
gpr_log(GPR_DEBUG, "status = %d", status); |
|
|
|
|
gpr_log(GPR_DEBUG, "FLAG_PREFIX = %d", (flags & kFlagPrefix)); |
|
|
|
|
gpr_log(GPR_DEBUG, "FLAG_MESSAGE_DATA = %d", (flags & kFlagMessageData)); |
|
|
|
|
gpr_log(GPR_DEBUG, "FLAG_SUFFIX = %d", (flags & kFlagSuffix)); |
|
|
|
|
int seq_num; |
|
|
|
|
RETURN_IF_ERROR(parcel->ReadInt32(&seq_num)); |
|
|
|
|
// TODO(waynetu): For now we'll just assume that the transactions commit in
|
|
|
|
@ -355,7 +343,7 @@ absl::Status WireReaderImpl::ProcessStreamingTransactionImpl( |
|
|
|
|
GPR_ASSERT(expectation < std::numeric_limits<int32_t>::max() && |
|
|
|
|
"Sequence number too large"); |
|
|
|
|
expectation++; |
|
|
|
|
gpr_log(GPR_INFO, "sequence number = %d", seq_num); |
|
|
|
|
gpr_log(GPR_DEBUG, "sequence number = %d", seq_num); |
|
|
|
|
if (flags & kFlagPrefix) { |
|
|
|
|
std::string method_ref; |
|
|
|
|
if (!is_client_) { |
|
|
|
@ -391,12 +379,11 @@ absl::Status WireReaderImpl::ProcessStreamingTransactionImpl( |
|
|
|
|
if (flags & kFlagMessageData) { |
|
|
|
|
int count; |
|
|
|
|
RETURN_IF_ERROR(parcel->ReadInt32(&count)); |
|
|
|
|
gpr_log(GPR_INFO, "count = %d", count); |
|
|
|
|
gpr_log(GPR_DEBUG, "count = %d", count); |
|
|
|
|
std::string msg_data{}; |
|
|
|
|
if (count > 0) { |
|
|
|
|
RETURN_IF_ERROR(parcel->ReadByteArray(&msg_data)); |
|
|
|
|
} |
|
|
|
|
gpr_log(GPR_INFO, "msg_data = %s", msg_data.c_str()); |
|
|
|
|
message_buffer_[code] += msg_data; |
|
|
|
|
if ((flags & kFlagMessageDataIsPartial) == 0) { |
|
|
|
|
std::string s = std::move(message_buffer_[code]); |
|
|
|
@ -410,7 +397,7 @@ absl::Status WireReaderImpl::ProcessStreamingTransactionImpl( |
|
|
|
|
// FLAG_STATUS_DESCRIPTION set
|
|
|
|
|
std::string desc; |
|
|
|
|
RETURN_IF_ERROR(parcel->ReadString(&desc)); |
|
|
|
|
gpr_log(GPR_INFO, "description = %s", desc.c_str()); |
|
|
|
|
gpr_log(GPR_DEBUG, "description = %s", desc.c_str()); |
|
|
|
|
} |
|
|
|
|
Metadata trailing_metadata; |
|
|
|
|
if (is_client_) { |
|
|
|
|