[binder] Fix WireReaderImpl bugs & races (#27303)

There was a bug found by the fuzzer where we might access wire_writer_ before
finishing SETUP_TRANSPORT (and thus constructing wire_writer_). This PR
fixes such issue by making sure that we won't proceed with any requests
until the connection is fully established.

Since binder transactions may be coming from multiple different threads,
this PRs guard some of the WireReaderImpl's member with a mutex to make
sure there's no races between threads.
pull/27243/head
Ta-Wei Tu 3 years ago committed by GitHub
parent e7dbac5e26
commit 554bbb6ca5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 34
      src/core/ext/transport/binder/wire_format/wire_reader_impl.cc
  2. 17
      src/core/ext/transport/binder/wire_format/wire_reader_impl.h
  3. 34
      test/core/transport/binder/wire_reader_test.cc

@ -85,12 +85,21 @@ std::shared_ptr<WireWriter> WireReaderImpl::SetupTransport(
gpr_log(GPR_INFO, "Setting up transport"); gpr_log(GPR_INFO, "Setting up transport");
if (!is_client_) { if (!is_client_) {
SendSetupTransport(binder.get()); SendSetupTransport(binder.get());
return absl::make_unique<WireWriterImpl>(std::move(binder)); {
grpc_core::MutexLock lock(&mu_);
connected_ = true;
wire_writer_ = std::make_shared<WireWriterImpl>(std::move(binder));
}
return wire_writer_;
} else { } else {
SendSetupTransport(binder.get()); SendSetupTransport(binder.get());
auto other_end_binder = RecvSetupTransport(); auto other_end_binder = RecvSetupTransport();
wire_writer_ = {
std::make_shared<WireWriterImpl>(std::move(other_end_binder)); grpc_core::MutexLock lock(&mu_);
connected_ = true;
wire_writer_ =
std::make_shared<WireWriterImpl>(std::move(other_end_binder));
}
return wire_writer_; return wire_writer_;
} }
} }
@ -155,12 +164,20 @@ absl::Status WireReaderImpl::ProcessTransaction(transaction_code_t code,
return absl::OkStatus(); return absl::OkStatus();
} }
grpc_core::MutexLock lock(&mu_);
if (BinderTransportTxCode(code) != BinderTransportTxCode::SETUP_TRANSPORT &&
!connected_) {
return absl::InvalidArgumentError("Transports not connected yet");
}
switch (BinderTransportTxCode(code)) { switch (BinderTransportTxCode(code)) {
case BinderTransportTxCode::SETUP_TRANSPORT: { case BinderTransportTxCode::SETUP_TRANSPORT: {
if (connected_) { if (recvd_setup_transport_) {
return absl::InvalidArgumentError("Already connected"); return absl::InvalidArgumentError(
"Already received a SETUP_TRANSPORT request");
} }
connected_ = true; recvd_setup_transport_ = true;
// int datasize; // int datasize;
int version; int version;
// getDataSize not supported until 31 // getDataSize not supported until 31
@ -212,6 +229,11 @@ absl::Status WireReaderImpl::ProcessTransaction(transaction_code_t code,
absl::Status WireReaderImpl::ProcessStreamingTransaction( absl::Status WireReaderImpl::ProcessStreamingTransaction(
transaction_code_t code, const ReadableParcel* parcel) { transaction_code_t code, const ReadableParcel* parcel) {
grpc_core::MutexLock lock(&mu_);
if (!connected_) {
return absl::InvalidArgumentError("Transports not connected yet");
}
// Indicate which callbacks should be cancelled. It will be initialized as the // Indicate which callbacks should be cancelled. It will be initialized as the
// flags the in-coming transaction carries, and when a particular callback is // flags the in-coming transaction carries, and when a particular callback is
// completed, the corresponding bit in cancellation_flag will be set to 0 so // completed, the corresponding bit in cancellation_flag will be set to 0 so

@ -96,16 +96,21 @@ class WireReaderImpl : public WireReader {
const ReadableParcel* parcel); const ReadableParcel* parcel);
absl::Status ProcessStreamingTransactionImpl(transaction_code_t code, absl::Status ProcessStreamingTransactionImpl(transaction_code_t code,
const ReadableParcel* parcel, const ReadableParcel* parcel,
int* cancellation_flags); int* cancellation_flags)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
std::shared_ptr<TransportStreamReceiver> transport_stream_receiver_; std::shared_ptr<TransportStreamReceiver> transport_stream_receiver_;
absl::Notification connection_noti_; absl::Notification connection_noti_;
bool connected_ = false; grpc_core::Mutex mu_;
bool connected_ ABSL_GUARDED_BY(mu_) = false;
bool recvd_setup_transport_ ABSL_GUARDED_BY(mu_) = false;
// NOTE: other_end_binder_ will be moved out when RecvSetupTransport() is // NOTE: other_end_binder_ will be moved out when RecvSetupTransport() is
// called. Be cautious not to access it afterward. // called. Be cautious not to access it afterward.
std::unique_ptr<Binder> other_end_binder_; std::unique_ptr<Binder> other_end_binder_;
absl::flat_hash_map<transaction_code_t, int32_t> expected_seq_num_; absl::flat_hash_map<transaction_code_t, int32_t> expected_seq_num_
absl::flat_hash_map<transaction_code_t, std::string> message_buffer_; ABSL_GUARDED_BY(mu_);
absl::flat_hash_map<transaction_code_t, std::string> message_buffer_
ABSL_GUARDED_BY(mu_);
std::unique_ptr<TransactionReceiver> tx_receiver_; std::unique_ptr<TransactionReceiver> tx_receiver_;
bool is_client_; bool is_client_;
// When WireReaderImpl gets destructed, call on_destruct_callback_. This is // When WireReaderImpl gets destructed, call on_destruct_callback_. This is
@ -114,8 +119,8 @@ class WireReaderImpl : public WireReader {
// ACK every 16k bytes. // ACK every 16k bytes.
static constexpr int64_t kFlowControlAckBytes = 16 * 1024; static constexpr int64_t kFlowControlAckBytes = 16 * 1024;
int64_t num_incoming_bytes_ = 0; int64_t num_incoming_bytes_ ABSL_GUARDED_BY(mu_) = 0;
int64_t num_acknowledged_bytes_ = 0; int64_t num_acknowledged_bytes_ ABSL_GUARDED_BY(mu_) = 0;
// Used to send ACK. // Used to send ACK.
std::shared_ptr<WireWriter> wire_writer_; std::shared_ptr<WireWriter> wire_writer_;

@ -20,6 +20,7 @@
// receiver are correct in all possible situations. // receiver are correct in all possible situations.
#include <memory> #include <memory>
#include <string> #include <string>
#include <thread>
#include <utility> #include <utility>
#include <gtest/gtest.h> #include <gtest/gtest.h>
@ -63,6 +64,14 @@ class WireReaderTest : public ::testing::Test {
} }
} }
void UnblockSetupTransport() {
// SETUP_TRANSPORT should finish before we can proceed with any other
// requests and streaming calls. The MockBinder will construct a
// MockTransactionReceiver, which will then sends SETUP_TRANSPORT request
// back to us.
wire_reader_.SetupTransport(absl::make_unique<MockBinder>());
}
template <typename T> template <typename T>
absl::Status CallProcessTransaction(T tx_code) { absl::Status CallProcessTransaction(T tx_code) {
return wire_reader_.ProcessTransaction( return wire_reader_.ProcessTransaction(
@ -117,23 +126,12 @@ TEST_F(WireReaderTest, SetupTransport) {
TEST_F(WireReaderTest, ProcessTransactionControlMessageSetupTransport) { TEST_F(WireReaderTest, ProcessTransactionControlMessageSetupTransport) {
::testing::InSequence sequence; ::testing::InSequence sequence;
UnblockSetupTransport();
EXPECT_CALL(mock_readable_parcel_, ReadInt32);
EXPECT_CALL(mock_readable_parcel_, ReadBinder)
.WillOnce([&](std::unique_ptr<Binder>* binder) {
auto mock_binder = absl::make_unique<MockBinder>();
// binder that is read from the output parcel must first be initialized
// before it can be used.
EXPECT_CALL(*mock_binder, Initialize);
*binder = std::move(mock_binder);
return absl::OkStatus();
});
EXPECT_TRUE(
CallProcessTransaction(BinderTransportTxCode::SETUP_TRANSPORT).ok());
} }
TEST_F(WireReaderTest, ProcessTransactionControlMessagePingResponse) { TEST_F(WireReaderTest, ProcessTransactionControlMessagePingResponse) {
::testing::InSequence sequence;
UnblockSetupTransport();
EXPECT_CALL(mock_readable_parcel_, ReadInt32); EXPECT_CALL(mock_readable_parcel_, ReadInt32);
EXPECT_TRUE( EXPECT_TRUE(
CallProcessTransaction(BinderTransportTxCode::PING_RESPONSE).ok()); CallProcessTransaction(BinderTransportTxCode::PING_RESPONSE).ok());
@ -141,6 +139,7 @@ TEST_F(WireReaderTest, ProcessTransactionControlMessagePingResponse) {
TEST_F(WireReaderTest, ProcessTransactionServerRpcDataEmptyFlagIgnored) { TEST_F(WireReaderTest, ProcessTransactionServerRpcDataEmptyFlagIgnored) {
::testing::InSequence sequence; ::testing::InSequence sequence;
UnblockSetupTransport();
// first transaction: empty flag // first transaction: empty flag
ExpectReadInt32(0); ExpectReadInt32(0);
@ -151,6 +150,7 @@ TEST_F(WireReaderTest, ProcessTransactionServerRpcDataEmptyFlagIgnored) {
TEST_F(WireReaderTest, TEST_F(WireReaderTest,
ProcessTransactionServerRpcDataFlagPrefixWithoutMetadata) { ProcessTransactionServerRpcDataFlagPrefixWithoutMetadata) {
::testing::InSequence sequence; ::testing::InSequence sequence;
UnblockSetupTransport();
// flag // flag
ExpectReadInt32(kFlagPrefix); ExpectReadInt32(kFlagPrefix);
@ -168,6 +168,7 @@ TEST_F(WireReaderTest,
TEST_F(WireReaderTest, ProcessTransactionServerRpcDataFlagPrefixWithMetadata) { TEST_F(WireReaderTest, ProcessTransactionServerRpcDataFlagPrefixWithMetadata) {
::testing::InSequence sequence; ::testing::InSequence sequence;
UnblockSetupTransport();
// flag // flag
ExpectReadInt32(kFlagPrefix); ExpectReadInt32(kFlagPrefix);
@ -200,6 +201,7 @@ TEST_F(WireReaderTest, ProcessTransactionServerRpcDataFlagPrefixWithMetadata) {
TEST_F(WireReaderTest, ProcessTransactionServerRpcDataFlagMessageDataNonEmpty) { TEST_F(WireReaderTest, ProcessTransactionServerRpcDataFlagMessageDataNonEmpty) {
::testing::InSequence sequence; ::testing::InSequence sequence;
UnblockSetupTransport();
// flag // flag
ExpectReadInt32(kFlagMessageData); ExpectReadInt32(kFlagMessageData);
@ -218,6 +220,7 @@ TEST_F(WireReaderTest, ProcessTransactionServerRpcDataFlagMessageDataNonEmpty) {
TEST_F(WireReaderTest, ProcessTransactionServerRpcDataFlagMessageDataEmpty) { TEST_F(WireReaderTest, ProcessTransactionServerRpcDataFlagMessageDataEmpty) {
::testing::InSequence sequence; ::testing::InSequence sequence;
UnblockSetupTransport();
// flag // flag
ExpectReadInt32(kFlagMessageData); ExpectReadInt32(kFlagMessageData);
@ -236,6 +239,7 @@ TEST_F(WireReaderTest, ProcessTransactionServerRpcDataFlagMessageDataEmpty) {
TEST_F(WireReaderTest, ProcessTransactionServerRpcDataFlagSuffixWithStatus) { TEST_F(WireReaderTest, ProcessTransactionServerRpcDataFlagSuffixWithStatus) {
::testing::InSequence sequence; ::testing::InSequence sequence;
UnblockSetupTransport();
constexpr int kStatus = 0x1234; constexpr int kStatus = 0x1234;
// flag // flag
@ -255,6 +259,7 @@ TEST_F(WireReaderTest, ProcessTransactionServerRpcDataFlagSuffixWithStatus) {
TEST_F(WireReaderTest, ProcessTransactionServerRpcDataFlagSuffixWithoutStatus) { TEST_F(WireReaderTest, ProcessTransactionServerRpcDataFlagSuffixWithoutStatus) {
::testing::InSequence sequence; ::testing::InSequence sequence;
UnblockSetupTransport();
// flag // flag
ExpectReadInt32(kFlagSuffix); ExpectReadInt32(kFlagSuffix);
@ -272,6 +277,7 @@ TEST_F(WireReaderTest, ProcessTransactionServerRpcDataFlagSuffixWithoutStatus) {
TEST_F(WireReaderTest, InBoundFlowControl) { TEST_F(WireReaderTest, InBoundFlowControl) {
::testing::InSequence sequence; ::testing::InSequence sequence;
UnblockSetupTransport();
// flag // flag
ExpectReadInt32(kFlagMessageData | kFlagMessageDataIsPartial); ExpectReadInt32(kFlagMessageData | kFlagMessageDataIsPartial);

Loading…
Cancel
Save