|
|
|
@ -42,19 +42,19 @@ const char kAuthorityMetadataKey[] = ":authority"; |
|
|
|
|
|
|
|
|
|
absl::StatusOr<Metadata> parse_metadata(ReadableParcel* reader) { |
|
|
|
|
int num_header; |
|
|
|
|
RETURN_IF_NOT_OK(reader->ReadInt32(&num_header)); |
|
|
|
|
GRPC_RETURN_IF_ERROR(reader->ReadInt32(&num_header)); |
|
|
|
|
if (num_header < 0) { |
|
|
|
|
return absl::InvalidArgumentError("num_header cannot be negative"); |
|
|
|
|
} |
|
|
|
|
std::vector<std::pair<std::string, std::string>> ret; |
|
|
|
|
for (int i = 0; i < num_header; i++) { |
|
|
|
|
int count; |
|
|
|
|
RETURN_IF_NOT_OK(reader->ReadInt32(&count)); |
|
|
|
|
GRPC_RETURN_IF_ERROR(reader->ReadInt32(&count)); |
|
|
|
|
std::string key{}; |
|
|
|
|
if (count > 0) RETURN_IF_NOT_OK(reader->ReadByteArray(&key)); |
|
|
|
|
RETURN_IF_NOT_OK(reader->ReadInt32(&count)); |
|
|
|
|
if (count > 0) GRPC_RETURN_IF_ERROR(reader->ReadByteArray(&key)); |
|
|
|
|
GRPC_RETURN_IF_ERROR(reader->ReadInt32(&count)); |
|
|
|
|
std::string value{}; |
|
|
|
|
if (count > 0) RETURN_IF_NOT_OK(reader->ReadByteArray(&value)); |
|
|
|
|
if (count > 0) GRPC_RETURN_IF_ERROR(reader->ReadByteArray(&value)); |
|
|
|
|
ret.emplace_back(key, value); |
|
|
|
|
} |
|
|
|
|
return ret; |
|
|
|
@ -183,7 +183,7 @@ absl::Status WireReaderImpl::ProcessTransaction(transaction_code_t code, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
int version; |
|
|
|
|
RETURN_IF_NOT_OK(parcel->ReadInt32(&version)); |
|
|
|
|
GRPC_RETURN_IF_ERROR(parcel->ReadInt32(&version)); |
|
|
|
|
gpr_log(GPR_DEBUG, "The other end respond with version = %d", version); |
|
|
|
|
// We only support this single lowest possible version, so server must
|
|
|
|
|
// respond that version too.
|
|
|
|
@ -194,7 +194,7 @@ absl::Status WireReaderImpl::ProcessTransaction(transaction_code_t code, |
|
|
|
|
version, kWireFormatVersion); |
|
|
|
|
} |
|
|
|
|
std::unique_ptr<Binder> binder{}; |
|
|
|
|
RETURN_IF_NOT_OK(parcel->ReadBinder(&binder)); |
|
|
|
|
GRPC_RETURN_IF_ERROR(parcel->ReadBinder(&binder)); |
|
|
|
|
if (!binder) { |
|
|
|
|
return absl::InternalError("Read NULL binder from the parcel"); |
|
|
|
|
} |
|
|
|
@ -210,7 +210,7 @@ absl::Status WireReaderImpl::ProcessTransaction(transaction_code_t code, |
|
|
|
|
} |
|
|
|
|
case BinderTransportTxCode::ACKNOWLEDGE_BYTES: { |
|
|
|
|
int64_t num_bytes = -1; |
|
|
|
|
RETURN_IF_NOT_OK(parcel->ReadInt64(&num_bytes)); |
|
|
|
|
GRPC_RETURN_IF_ERROR(parcel->ReadInt64(&num_bytes)); |
|
|
|
|
gpr_log(GPR_DEBUG, "received acknowledge bytes = %" PRId64, num_bytes); |
|
|
|
|
wire_writer_->OnAckReceived(num_bytes); |
|
|
|
|
break; |
|
|
|
@ -220,14 +220,14 @@ absl::Status WireReaderImpl::ProcessTransaction(transaction_code_t code, |
|
|
|
|
return absl::FailedPreconditionError("Receive PING request in client"); |
|
|
|
|
} |
|
|
|
|
int ping_id = -1; |
|
|
|
|
RETURN_IF_NOT_OK(parcel->ReadInt32(&ping_id)); |
|
|
|
|
GRPC_RETURN_IF_ERROR(parcel->ReadInt32(&ping_id)); |
|
|
|
|
gpr_log(GPR_DEBUG, "received ping id = %d", ping_id); |
|
|
|
|
// TODO(waynetu): Ping back.
|
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
case BinderTransportTxCode::PING_RESPONSE: { |
|
|
|
|
int value = -1; |
|
|
|
|
RETURN_IF_NOT_OK(parcel->ReadInt32(&value)); |
|
|
|
|
GRPC_RETURN_IF_ERROR(parcel->ReadInt32(&value)); |
|
|
|
|
gpr_log(GPR_DEBUG, "received ping response = %d", value); |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
@ -303,7 +303,7 @@ absl::Status WireReaderImpl::ProcessStreamingTransactionImpl( |
|
|
|
|
gpr_log(GPR_INFO, "Total incoming bytes: %" PRId64, num_incoming_bytes_); |
|
|
|
|
|
|
|
|
|
int flags; |
|
|
|
|
RETURN_IF_NOT_OK(parcel->ReadInt32(&flags)); |
|
|
|
|
GRPC_RETURN_IF_ERROR(parcel->ReadInt32(&flags)); |
|
|
|
|
*cancellation_flags = flags; |
|
|
|
|
|
|
|
|
|
// Ignore in-coming transaction with flag = 0 to match with Java
|
|
|
|
@ -322,7 +322,7 @@ absl::Status WireReaderImpl::ProcessStreamingTransactionImpl( |
|
|
|
|
gpr_log(GPR_DEBUG, "FLAG_MESSAGE_DATA = %d", (flags & kFlagMessageData)); |
|
|
|
|
gpr_log(GPR_DEBUG, "FLAG_SUFFIX = %d", (flags & kFlagSuffix)); |
|
|
|
|
int seq_num; |
|
|
|
|
RETURN_IF_NOT_OK(parcel->ReadInt32(&seq_num)); |
|
|
|
|
GRPC_RETURN_IF_ERROR(parcel->ReadInt32(&seq_num)); |
|
|
|
|
// TODO(waynetu): For now we'll just assume that the transactions commit in
|
|
|
|
|
// the same order they're issued. The following assertion detects
|
|
|
|
|
// out-of-order or missing transactions. WireReaderImpl should be fixed if
|
|
|
|
@ -342,7 +342,7 @@ absl::Status WireReaderImpl::ProcessStreamingTransactionImpl( |
|
|
|
|
if (flags & kFlagPrefix) { |
|
|
|
|
std::string method_ref; |
|
|
|
|
if (!is_client_) { |
|
|
|
|
RETURN_IF_NOT_OK(parcel->ReadString(&method_ref)); |
|
|
|
|
GRPC_RETURN_IF_ERROR(parcel->ReadString(&method_ref)); |
|
|
|
|
} |
|
|
|
|
absl::StatusOr<Metadata> initial_metadata_or_error = parse_metadata(parcel); |
|
|
|
|
if (!initial_metadata_or_error.ok()) { |
|
|
|
@ -373,11 +373,11 @@ absl::Status WireReaderImpl::ProcessStreamingTransactionImpl( |
|
|
|
|
} |
|
|
|
|
if (flags & kFlagMessageData) { |
|
|
|
|
int count; |
|
|
|
|
RETURN_IF_NOT_OK(parcel->ReadInt32(&count)); |
|
|
|
|
GRPC_RETURN_IF_ERROR(parcel->ReadInt32(&count)); |
|
|
|
|
gpr_log(GPR_DEBUG, "count = %d", count); |
|
|
|
|
std::string msg_data{}; |
|
|
|
|
if (count > 0) { |
|
|
|
|
RETURN_IF_NOT_OK(parcel->ReadByteArray(&msg_data)); |
|
|
|
|
GRPC_RETURN_IF_ERROR(parcel->ReadByteArray(&msg_data)); |
|
|
|
|
} |
|
|
|
|
message_buffer_[code] += msg_data; |
|
|
|
|
if ((flags & kFlagMessageDataIsPartial) == 0) { |
|
|
|
@ -391,7 +391,7 @@ absl::Status WireReaderImpl::ProcessStreamingTransactionImpl( |
|
|
|
|
if (flags & kFlagStatusDescription) { |
|
|
|
|
// FLAG_STATUS_DESCRIPTION set
|
|
|
|
|
std::string desc; |
|
|
|
|
RETURN_IF_NOT_OK(parcel->ReadString(&desc)); |
|
|
|
|
GRPC_RETURN_IF_ERROR(parcel->ReadString(&desc)); |
|
|
|
|
gpr_log(GPR_DEBUG, "description = %s", desc.c_str()); |
|
|
|
|
} |
|
|
|
|
Metadata trailing_metadata; |
|
|
|
|