[binder] Handle inbound flow control (#27228)

pull/27241/head
Ta-Wei Tu 4 years ago committed by GitHub
parent 3a5e844b38
commit dff9e84e35
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      src/core/ext/transport/binder/transport/binder_transport.h
  2. 2
      src/core/ext/transport/binder/wire_format/binder.h
  3. 12
      src/core/ext/transport/binder/wire_format/binder_android.cc
  4. 2
      src/core/ext/transport/binder/wire_format/binder_android.h
  5. 1
      src/core/ext/transport/binder/wire_format/transaction.cc
  6. 1
      src/core/ext/transport/binder/wire_format/transaction.h
  7. 2
      src/core/ext/transport/binder/wire_format/wire_reader.h
  8. 21
      src/core/ext/transport/binder/wire_format/wire_reader_impl.cc
  9. 11
      src/core/ext/transport/binder/wire_format/wire_reader_impl.h
  10. 9
      src/core/ext/transport/binder/wire_format/wire_writer.cc
  11. 2
      src/core/ext/transport/binder/wire_format/wire_writer.h
  12. 15
      test/core/transport/binder/end2end/fake_binder.cc
  13. 4
      test/core/transport/binder/end2end/fake_binder.h
  14. 3
      test/core/transport/binder/mock_objects.h
  15. 33
      test/core/transport/binder/wire_reader_test.cc

@ -60,7 +60,7 @@ struct grpc_binder_transport {
std::shared_ptr<grpc_binder::TransportStreamReceiver>
transport_stream_receiver;
grpc_core::OrphanablePtr<grpc_binder::WireReader> wire_reader;
std::unique_ptr<grpc_binder::WireWriter> wire_writer;
std::shared_ptr<grpc_binder::WireWriter> wire_writer;
bool is_client;
grpc_core::Mutex mu;

@ -46,6 +46,7 @@ class WritableParcel {
virtual int32_t GetDataPosition() const = 0;
virtual absl::Status SetDataPosition(int32_t pos) = 0;
virtual absl::Status WriteInt32(int32_t data) = 0;
virtual absl::Status WriteInt64(int64_t data) = 0;
virtual absl::Status WriteBinder(HasRawBinder* binder) = 0;
virtual absl::Status WriteString(absl::string_view s) = 0;
virtual absl::Status WriteByteArray(const int8_t* buffer, int32_t length) = 0;
@ -66,6 +67,7 @@ class ReadableParcel {
public:
virtual ~ReadableParcel() = default;
virtual absl::Status ReadInt32(int32_t* data) const = 0;
virtual absl::Status ReadInt64(int64_t* data) const = 0;
virtual absl::Status ReadBinder(std::unique_ptr<Binder>* data) const = 0;
// TODO(waynetu): Provide better interfaces.
virtual absl::Status ReadByteArray(std::string* data) const = 0;

@ -205,6 +205,12 @@ absl::Status WritableParcelAndroid::WriteInt32(int32_t data) {
: absl::InternalError("AParcel_writeInt32 failed");
}
absl::Status WritableParcelAndroid::WriteInt64(int64_t data) {
return AParcel_writeInt64(parcel_, data) == STATUS_OK
? absl::OkStatus()
: absl::InternalError("AParcel_writeInt64 failed");
}
absl::Status WritableParcelAndroid::WriteBinder(HasRawBinder* binder) {
return AParcel_writeStrongBinder(
parcel_, reinterpret_cast<AIBinder*>(binder->GetRawBinder())) ==
@ -232,6 +238,12 @@ absl::Status ReadableParcelAndroid::ReadInt32(int32_t* data) const {
: absl::InternalError("AParcel_readInt32 failed");
}
absl::Status ReadableParcelAndroid::ReadInt64(int64_t* data) const {
return AParcel_readInt64(parcel_, data) == STATUS_OK
? absl::OkStatus()
: absl::InternalError("AParcel_readInt64 failed");
}
absl::Status ReadableParcelAndroid::ReadBinder(
std::unique_ptr<Binder>* data) const {
AIBinder* binder;

@ -46,6 +46,7 @@ class WritableParcelAndroid final : public WritableParcel {
int32_t GetDataPosition() const override;
absl::Status SetDataPosition(int32_t pos) override;
absl::Status WriteInt32(int32_t data) override;
absl::Status WriteInt64(int64_t data) override;
absl::Status WriteBinder(HasRawBinder* binder) override;
absl::Status WriteString(absl::string_view s) override;
absl::Status WriteByteArray(const int8_t* buffer, int32_t length) override;
@ -65,6 +66,7 @@ class ReadableParcelAndroid final : public ReadableParcel {
~ReadableParcelAndroid() override = default;
absl::Status ReadInt32(int32_t* data) const override;
absl::Status ReadInt64(int64_t* data) const override;
absl::Status ReadBinder(std::unique_ptr<Binder>* data) const override;
absl::Status ReadByteArray(std::string* data) const override;
// FIXME(waynetu): Fix the interface.

@ -25,5 +25,6 @@ const int kFlagOutOfBandClose = 0x8;
const int kFlagExpectSingleMessage = 0x10;
const int kFlagStatusDescription = 0x20;
const int kFlagMessageDataIsParcelable = 0x40;
const int kFlagMessageDataIsPartial = 0x80;
} // namespace grpc_binder

@ -33,6 +33,7 @@ ABSL_CONST_INIT extern const int kFlagOutOfBandClose;
ABSL_CONST_INIT extern const int kFlagExpectSingleMessage;
ABSL_CONST_INIT extern const int kFlagStatusDescription;
ABSL_CONST_INIT extern const int kFlagMessageDataIsParcelable;
ABSL_CONST_INIT extern const int kFlagMessageDataIsPartial;
using Metadata = std::vector<std::pair<std::string, std::string>>;

@ -29,7 +29,7 @@ namespace grpc_binder {
class WireReader : public grpc_core::InternallyRefCounted<WireReader> {
public:
~WireReader() override = default;
virtual std::unique_ptr<WireWriter> SetupTransport(
virtual std::shared_ptr<WireWriter> SetupTransport(
std::unique_ptr<Binder> endpoint_binder) = 0;
};

@ -79,7 +79,7 @@ WireReaderImpl::~WireReaderImpl() {
}
}
std::unique_ptr<WireWriter> WireReaderImpl::SetupTransport(
std::shared_ptr<WireWriter> WireReaderImpl::SetupTransport(
std::unique_ptr<Binder> binder) {
gpr_log(GPR_INFO, "Setting up transport");
if (!is_client_) {
@ -88,7 +88,9 @@ std::unique_ptr<WireWriter> WireReaderImpl::SetupTransport(
} else {
SendSetupTransport(binder.get());
auto other_end_binder = RecvSetupTransport();
return absl::make_unique<WireWriterImpl>(std::move(other_end_binder));
wire_writer_ =
std::make_shared<WireWriterImpl>(std::move(other_end_binder));
return wire_writer_;
}
}
@ -233,6 +235,13 @@ absl::Status WireReaderImpl::ProcessStreamingTransaction(
transport_stream_receiver_->NotifyRecvTrailingMetadata(code, status, 0);
}
}
if ((num_incoming_bytes_ - num_acknowledged_bytes_) >= kFlowControlAckBytes) {
absl::Status ack_status = wire_writer_->Ack(num_incoming_bytes_);
if (status.ok()) {
status = ack_status;
}
num_acknowledged_bytes_ = num_incoming_bytes_;
}
return status;
}
@ -306,7 +315,13 @@ absl::Status WireReaderImpl::ProcessStreamingTransactionImpl(
RETURN_IF_ERROR(parcel->ReadByteArray(&msg_data));
}
gpr_log(GPR_INFO, "msg_data = %s", msg_data.c_str());
transport_stream_receiver_->NotifyRecvMessage(code, std::move(msg_data));
message_buffer_[code] += msg_data;
num_incoming_bytes_ += count;
if ((flags & kFlagMessageDataIsPartial) == 0) {
std::string s = std::move(message_buffer_[code]);
message_buffer_.erase(code);
transport_stream_receiver_->NotifyRecvMessage(code, std::move(s));
}
*cancellation_flags &= ~kFlagMessageData;
}
if (flags & kFlagSuffix) {

@ -62,7 +62,7 @@ class WireReaderImpl : public WireReader {
/// setup, we assume that the first half of SETUP_TRANSPORT (up to step 2) is
/// already done somewhere else (see test/end2end/binder_transport_test.cc for
/// how it's handled in the testing environment).
std::unique_ptr<WireWriter> SetupTransport(
std::shared_ptr<WireWriter> SetupTransport(
std::unique_ptr<Binder> binder) override;
absl::Status ProcessTransaction(transaction_code_t code,
@ -104,11 +104,20 @@ class WireReaderImpl : public WireReader {
// called. Be cautious not to access it afterward.
std::unique_ptr<Binder> other_end_binder_;
absl::flat_hash_map<transaction_code_t, int32_t> expected_seq_num_;
absl::flat_hash_map<transaction_code_t, std::string> message_buffer_;
std::unique_ptr<TransactionReceiver> tx_receiver_;
bool is_client_;
// When WireReaderImpl gets destructed, call on_destruct_callback_. This is
// mostly for decrementing the reference count of its transport.
std::function<void()> on_destruct_callback_;
// ACK every 16k bytes.
static constexpr int64_t kFlowControlAckBytes = 16 * 1024;
int64_t num_incoming_bytes_ = 0;
int64_t num_acknowledged_bytes_ = 0;
// Used to send ACK.
std::shared_ptr<WireWriter> wire_writer_;
};
} // namespace grpc_binder

@ -77,4 +77,13 @@ absl::Status WireWriterImpl::RpcCall(const Transaction& tx) {
// is an undefined behavior.
return binder_->Transact(BinderTransportTxCode(tx.GetTxCode()));
}
absl::Status WireWriterImpl::Ack(int64_t num_bytes) {
grpc_core::MutexLock lock(&mu_);
RETURN_IF_ERROR(binder_->PrepareTransaction());
WritableParcel* parcel = binder_->GetWritableParcel();
RETURN_IF_ERROR(parcel->WriteInt64(num_bytes));
return binder_->Transact(BinderTransportTxCode::ACKNOWLEDGE_BYTES);
}
} // namespace grpc_binder

@ -32,12 +32,14 @@ class WireWriter {
public:
virtual ~WireWriter() = default;
virtual absl::Status RpcCall(const Transaction& call) = 0;
virtual absl::Status Ack(int64_t num_bytes) = 0;
};
class WireWriterImpl : public WireWriter {
public:
explicit WireWriterImpl(std::unique_ptr<Binder> binder);
absl::Status RpcCall(const Transaction& tx) override;
absl::Status Ack(int64_t num_bytes) override;
private:
grpc_core::Mutex mu_;

@ -42,6 +42,12 @@ absl::Status FakeWritableParcel::WriteInt32(int32_t data) {
return absl::OkStatus();
}
absl::Status FakeWritableParcel::WriteInt64(int64_t data) {
data_[data_position_] = data;
SetDataPosition(data_position_ + 1).IgnoreError();
return absl::OkStatus();
}
absl::Status FakeWritableParcel::WriteBinder(HasRawBinder* binder) {
data_[data_position_] = binder->GetRawBinder();
SetDataPosition(data_position_ + 1).IgnoreError();
@ -70,6 +76,15 @@ absl::Status FakeReadableParcel::ReadInt32(int32_t* data) const {
return absl::OkStatus();
}
absl::Status FakeReadableParcel::ReadInt64(int64_t* data) const {
if (data_position_ >= data_.size() ||
!absl::holds_alternative<int64_t>(data_[data_position_])) {
return absl::InternalError("ReadInt64 failed");
}
*data = absl::get<int64_t>(data_[data_position_++]);
return absl::OkStatus();
}
absl::Status FakeReadableParcel::ReadBinder(
std::unique_ptr<Binder>* data) const {
if (data_position_ >= data_.size() ||

@ -72,7 +72,7 @@ namespace grpc_binder {
namespace end2end_testing {
using FakeData = std::vector<
absl::variant<int32_t, void*, std::string, std::vector<int8_t>>>;
absl::variant<int32_t, int64_t, void*, std::string, std::vector<int8_t>>>;
// A fake writable parcel.
//
@ -85,6 +85,7 @@ class FakeWritableParcel final : public WritableParcel {
int32_t GetDataPosition() const override;
absl::Status SetDataPosition(int32_t pos) override;
absl::Status WriteInt32(int32_t data) override;
absl::Status WriteInt64(int64_t data) override;
absl::Status WriteBinder(HasRawBinder* binder) override;
absl::Status WriteString(absl::string_view s) override;
absl::Status WriteByteArray(const int8_t* buffer, int32_t length) override;
@ -104,6 +105,7 @@ class FakeReadableParcel final : public ReadableParcel {
public:
explicit FakeReadableParcel(FakeData data) : data_(std::move(data)) {}
absl::Status ReadInt32(int32_t* data) const override;
absl::Status ReadInt64(int64_t* data) const override;
absl::Status ReadBinder(std::unique_ptr<Binder>* data) const override;
absl::Status ReadByteArray(std::string* data) const override;
absl::Status ReadString(char data[111]) const override;

@ -29,6 +29,7 @@ class MockWritableParcel : public WritableParcel {
MOCK_METHOD(int32_t, GetDataPosition, (), (const, override));
MOCK_METHOD(absl::Status, SetDataPosition, (int32_t), (override));
MOCK_METHOD(absl::Status, WriteInt32, (int32_t), (override));
MOCK_METHOD(absl::Status, WriteInt64, (int64_t), (override));
MOCK_METHOD(absl::Status, WriteBinder, (HasRawBinder*), (override));
MOCK_METHOD(absl::Status, WriteString, (absl::string_view), (override));
MOCK_METHOD(absl::Status, WriteByteArray, (const int8_t*, int32_t),
@ -40,6 +41,7 @@ class MockWritableParcel : public WritableParcel {
class MockReadableParcel : public ReadableParcel {
public:
MOCK_METHOD(absl::Status, ReadInt32, (int32_t*), (const, override));
MOCK_METHOD(absl::Status, ReadInt64, (int64_t*), (const, override));
MOCK_METHOD(absl::Status, ReadBinder, (std::unique_ptr<Binder>*),
(const, override));
MOCK_METHOD(absl::Status, ReadByteArray, (std::string*), (const, override));
@ -86,6 +88,7 @@ class MockTransactionReceiver : public TransactionReceiver {
class MockWireWriter : public WireWriter {
public:
MOCK_METHOD(absl::Status, RpcCall, (const Transaction&), (override));
MOCK_METHOD(absl::Status, Ack, (int64_t), (override));
};
class MockTransportStreamReceiver : public TransportStreamReceiver {

@ -269,6 +269,39 @@ TEST_F(WireReaderTest, ProcessTransactionServerRpcDataFlagSuffixWithoutStatus) {
EXPECT_TRUE(CallProcessTransaction(kFirstCallId).ok());
}
TEST_F(WireReaderTest, InBoundFlowControl) {
::testing::InSequence sequence;
// flag
ExpectReadInt32(kFlagMessageData | kFlagMessageDataIsPartial);
// sequence number
ExpectReadInt32(0);
// message size
ExpectReadInt32(1000);
EXPECT_CALL(mock_readable_parcel_, ReadByteArray)
.WillOnce(DoAll(SetArgPointee<0>(std::string(1000, 'a')),
Return(absl::OkStatus())));
// Data is not completed. No callback will be triggered.
EXPECT_TRUE(CallProcessTransaction(kFirstCallId).ok());
// flag
ExpectReadInt32(kFlagMessageData);
// sequence number
ExpectReadInt32(1);
// message size
ExpectReadInt32(1000);
EXPECT_CALL(mock_readable_parcel_, ReadByteArray)
.WillOnce(DoAll(SetArgPointee<0>(std::string(1000, 'b')),
Return(absl::OkStatus())));
EXPECT_CALL(*transport_stream_receiver_,
NotifyRecvMessage(kFirstCallId,
StatusOrContainerEq(std::string(1000, 'a') +
std::string(1000, 'b'))));
EXPECT_TRUE(CallProcessTransaction(kFirstCallId).ok());
}
} // namespace grpc_binder
int main(int argc, char** argv) {

Loading…
Cancel
Save