Import wire format implementation of BinderChannel protocol (#26788)

This is the import of part of our binder transport implementation from
internal repository.

This "Wire Format" part implements the protocol described at
https://github.com/grpc/proposal/blob/master/L73-java-binderchannel/wireformat.md

This part of the code does not interact with gRPC transport layer. We
will import other components in the next few pull requests.

The code is well-tested in the repository we are importing from. Due to
the difficulty of setting up the tests properly (cannot automatically
import them by script), we will import them after we have finished
importing other codes.

For now we confirm that the code builds via
`bazel build //src/core/ext/transport/binder/wire_format:all`
and some local testing with actual Android tool chain.
pull/26881/head
Ming-Chuan 4 years ago committed by GitHub
parent 013e67a029
commit dd4209e972
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 42
      src/core/ext/transport/binder/utils/BUILD
  2. 61
      src/core/ext/transport/binder/utils/transport_stream_receiver.h
  3. 139
      src/core/ext/transport/binder/utils/transport_stream_receiver_impl.cc
  4. 81
      src/core/ext/transport/binder/utils/transport_stream_receiver_impl.h
  5. 111
      src/core/ext/transport/binder/wire_format/BUILD
  6. 101
      src/core/ext/transport/binder/wire_format/binder.h
  7. 240
      src/core/ext/transport/binder/wire_format/binder_android.cc
  8. 128
      src/core/ext/transport/binder/wire_format/binder_android.h
  9. 30
      src/core/ext/transport/binder/wire_format/binder_constants.cc
  10. 52
      src/core/ext/transport/binder/wire_format/binder_constants.h
  11. 29
      src/core/ext/transport/binder/wire_format/transaction.cc
  12. 103
      src/core/ext/transport/binder/wire_format/transaction.h
  13. 37
      src/core/ext/transport/binder/wire_format/wire_reader.h
  14. 273
      src/core/ext/transport/binder/wire_format/wire_reader_impl.cc
  15. 76
      src/core/ext/transport/binder/wire_format/wire_reader_impl.h
  16. 80
      src/core/ext/transport/binder/wire_format/wire_writer.cc
  17. 49
      src/core/ext/transport/binder/wire_format/wire_writer.h

@ -0,0 +1,42 @@
# Copyright 2021 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
load("//bazel:grpc_build_system.bzl", "grpc_cc_library")
licenses(["notice"])
package(
default_visibility = ["//visibility:public"],
)
grpc_cc_library(
name = "transport_stream_receiver",
srcs = [
"transport_stream_receiver_impl.cc",
],
hdrs = [
"transport_stream_receiver.h",
"transport_stream_receiver_impl.h",
],
external_deps = [
"absl/strings",
"absl/synchronization",
"absl/types:optional",
],
deps = [
"//:gpr",
"//:gpr_base",
"//src/core/ext/transport/binder/wire_format:transaction",
],
)

@ -0,0 +1,61 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_CORE_EXT_TRANSPORT_BINDER_UTILS_TRANSPORT_STREAM_RECEIVER_H
#define GRPC_CORE_EXT_TRANSPORT_BINDER_UTILS_TRANSPORT_STREAM_RECEIVER_H
#include <grpc/impl/codegen/port_platform.h>
#include <functional>
#include <string>
#include <vector>
#include "src/core/ext/transport/binder/wire_format/transaction.h"
namespace grpc_binder {
typedef int StreamIdentifier;
class TransportStreamReceiver {
public:
virtual ~TransportStreamReceiver() = default;
// Only handles single time invocation. Callback object will be deleted.
// The callback should be valid until invocation or unregister.
virtual void RegisterRecvInitialMetadata(
StreamIdentifier id, std::function<void(const Metadata&)> cb) = 0;
// TODO(mingcl): Use string_view
virtual void RegisterRecvMessage(
StreamIdentifier id, std::function<void(const std::string&)> cb) = 0;
virtual void RegisterRecvTrailingMetadata(
StreamIdentifier id, std::function<void(const Metadata&, int)> cb) = 0;
// TODO(mingcl): Provide a way to unregister callback?
// TODO(mingcl): Figure out how to handle the case where there is no callback
// registered for the stream. For now, I don't see this case happening in
// unary calls. So we would probably just crash the program for now.
// For streaming calls it does happen, for now we simply queue them.
virtual void NotifyRecvInitialMetadata(StreamIdentifier id,
const Metadata& initial_metadata) = 0;
virtual void NotifyRecvMessage(StreamIdentifier id,
const std::string& message) = 0;
virtual void NotifyRecvTrailingMetadata(StreamIdentifier id,
const Metadata& trailing_metadata,
int status) = 0;
};
} // namespace grpc_binder
#endif // GRPC_CORE_EXT_TRANSPORT_BINDER_UTILS_TRANSPORT_STREAM_RECEIVER_H

@ -0,0 +1,139 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/impl/codegen/port_platform.h>
#include "src/core/ext/transport/binder/utils/transport_stream_receiver_impl.h"
#include <grpc/support/log.h>
#include <functional>
#include <string>
#include <utility>
namespace grpc_binder {
void TransportStreamReceiverImpl::RegisterRecvInitialMetadata(
StreamIdentifier id, std::function<void(const Metadata&)> cb) {
// TODO(mingcl): Don't lock the whole function
grpc_core::MutexLock l(&m_);
gpr_log(GPR_ERROR, "%s id = %d", __func__, id);
GPR_ASSERT(initial_metadata_cbs_.count(id) == 0);
auto iter = pending_initial_metadata_.find(id);
if (iter == pending_initial_metadata_.end()) {
initial_metadata_cbs_[id] = std::move(cb);
} else {
cb(iter->second.front());
iter->second.pop();
if (iter->second.empty()) {
pending_initial_metadata_.erase(iter);
}
}
}
void TransportStreamReceiverImpl::RegisterRecvMessage(
StreamIdentifier id, std::function<void(const std::string&)> cb) {
// TODO(mingcl): Don't lock the whole function
grpc_core::MutexLock l(&m_);
gpr_log(GPR_ERROR, "%s id = %d", __func__, id);
GPR_ASSERT(message_cbs_.count(id) == 0);
auto iter = pending_message_.find(id);
if (iter == pending_message_.end()) {
message_cbs_[id] = std::move(cb);
} else {
cb(iter->second.front());
iter->second.pop();
if (iter->second.empty()) {
pending_message_.erase(iter);
}
}
}
void TransportStreamReceiverImpl::RegisterRecvTrailingMetadata(
StreamIdentifier id, std::function<void(const Metadata&, int)> cb) {
// TODO(mingcl): Don't lock the whole function
grpc_core::MutexLock l(&m_);
gpr_log(GPR_ERROR, "%s id = %d", __func__, id);
GPR_ASSERT(trailing_metadata_cbs_.count(id) == 0);
auto iter = pending_trailing_metadata_.find(id);
if (iter == pending_trailing_metadata_.end()) {
trailing_metadata_cbs_[id] = std::move(cb);
} else {
{
const auto& p = iter->second.front();
cb(p.first, p.second);
}
iter->second.pop();
if (iter->second.empty()) {
pending_trailing_metadata_.erase(iter);
}
}
}
void TransportStreamReceiverImpl::NotifyRecvInitialMetadata(
StreamIdentifier id, const Metadata& initial_metadata) {
gpr_log(GPR_ERROR, "%s id = %d", __func__, id);
std::function<void(const Metadata&)> cb;
{
grpc_core::MutexLock l(&m_);
auto iter = initial_metadata_cbs_.find(id);
if (iter != initial_metadata_cbs_.end()) {
cb = iter->second;
initial_metadata_cbs_.erase(iter);
} else {
pending_initial_metadata_[id].push(initial_metadata);
}
}
if (cb != nullptr) {
cb(initial_metadata);
}
}
void TransportStreamReceiverImpl::NotifyRecvMessage(
StreamIdentifier id, const std::string& message) {
gpr_log(GPR_ERROR, "%s id = %d", __func__, id);
std::function<void(const std::string&)> cb;
{
grpc_core::MutexLock l(&m_);
auto iter = message_cbs_.find(id);
if (iter != message_cbs_.end()) {
cb = iter->second;
message_cbs_.erase(iter);
} else {
pending_message_[id].push(message);
}
}
if (cb != nullptr) {
cb(message);
}
}
void TransportStreamReceiverImpl::NotifyRecvTrailingMetadata(
StreamIdentifier id, const Metadata& trailing_metadata, int status) {
gpr_log(GPR_ERROR, "%s id = %d", __func__, id);
std::function<void(const Metadata&, int)> cb;
{
grpc_core::MutexLock l(&m_);
auto iter = trailing_metadata_cbs_.find(id);
if (iter != trailing_metadata_cbs_.end()) {
cb = iter->second;
trailing_metadata_cbs_.erase(iter);
} else {
pending_trailing_metadata_[id].emplace(trailing_metadata, status);
}
}
if (cb != nullptr) {
cb(trailing_metadata, status);
}
}
} // namespace grpc_binder

@ -0,0 +1,81 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_CORE_EXT_TRANSPORT_BINDER_UTILS_TRANSPORT_STREAM_RECEIVER_IMPL_H
#define GRPC_CORE_EXT_TRANSPORT_BINDER_UTILS_TRANSPORT_STREAM_RECEIVER_IMPL_H
#include <grpc/impl/codegen/port_platform.h>
#include <functional>
#include <map>
#include <queue>
#include <string>
#include <vector>
#include "absl/types/optional.h"
#include "src/core/ext/transport/binder/utils/transport_stream_receiver.h"
#include "src/core/lib/gprpp/sync.h"
namespace grpc_binder {
// Routes the data received from transport to corresponding streams
class TransportStreamReceiverImpl : public TransportStreamReceiver {
public:
void RegisterRecvInitialMetadata(
StreamIdentifier id, std::function<void(const Metadata&)> cb) override;
void RegisterRecvMessage(StreamIdentifier id,
std::function<void(const std::string&)> cb) override;
void RegisterRecvTrailingMetadata(
StreamIdentifier id,
std::function<void(const Metadata&, int)> cb) override;
void NotifyRecvInitialMetadata(StreamIdentifier id,
const Metadata& initial_metadata) override;
void NotifyRecvMessage(StreamIdentifier id,
const std::string& message) override;
void NotifyRecvTrailingMetadata(StreamIdentifier id,
const Metadata& trailing_metadata,
int status) override;
private:
std::map<StreamIdentifier, std::function<void(const Metadata&)>>
initial_metadata_cbs_;
std::map<StreamIdentifier, std::function<void(const std::string&)>>
message_cbs_;
std::map<StreamIdentifier, std::function<void(const Metadata&, int)>>
trailing_metadata_cbs_;
// TODO(waynetu): Better thread safety design. For example, use separate
// mutexes for different type of messages.
grpc_core::Mutex m_;
// TODO(waynetu): gRPC surface layer will not wait for the current message to
// be delivered before sending the next message. The following implementation
// is still buggy with the current implementation of wire writer if
// transaction issued first completes after the one issued later does. This is
// because we just take the first element out of the queue and assume it's the
// one issued first without further checking, which results in callbacks being
// invoked with incorrect data.
//
// This should be fixed in the wire writer level and make sure out-of-order
// messages will be re-ordered by it. In such case, the queueing approach will
// work fine. Refer to the TODO in WireWriterImpl::ProcessTransaction() at
// wire_reader_impl.cc for detecting and resolving out-of-order transactions.
std::map<StreamIdentifier, std::queue<Metadata>> pending_initial_metadata_
ABSL_GUARDED_BY(m_);
std::map<StreamIdentifier, std::queue<std::string>> pending_message_
ABSL_GUARDED_BY(m_);
std::map<StreamIdentifier, std::queue<std::pair<Metadata, int>>>
pending_trailing_metadata_ ABSL_GUARDED_BY(m_);
};
} // namespace grpc_binder
#endif // GRPC_CORE_EXT_TRANSPORT_BINDER_UTILS_TRANSPORT_STREAM_RECEIVER_IMPL_H

@ -0,0 +1,111 @@
# Copyright 2021 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
load("//bazel:grpc_build_system.bzl", "grpc_cc_library")
licenses(["notice"])
package(
default_visibility = ["//visibility:public"],
)
grpc_cc_library(
name = "transaction",
srcs = [
"transaction.cc",
],
hdrs = [
"transaction.h",
],
external_deps = [
"absl/base:core_headers",
"absl/strings",
],
deps = [
"//:gpr",
],
)
grpc_cc_library(
name = "binder",
srcs = [
"binder_constants.cc",
],
hdrs = [
"binder.h",
"binder_constants.h",
],
external_deps = [
"absl/base:core_headers",
"absl/status",
"absl/strings",
],
deps = [
"//:gpr",
],
)
grpc_cc_library(
name = "binder_android",
srcs = [
"binder_android.cc",
],
hdrs = [
"binder_android.h",
],
external_deps = [
"absl/synchronization",
"absl/memory",
],
deps = [
":binder",
],
)
grpc_cc_library(
name = "wire_writer",
srcs = ["wire_writer.cc"],
hdrs = ["wire_writer.h"],
external_deps = [
"absl/strings",
],
visibility = ["//visibility:public"],
deps = [
":binder",
":transaction",
"//:gpr",
"//:gpr_base",
],
)
grpc_cc_library(
name = "wire_reader",
srcs = ["wire_reader_impl.cc"],
hdrs = [
"wire_reader.h",
"wire_reader_impl.h",
],
external_deps = [
"absl/container:flat_hash_map",
"absl/synchronization",
"absl/memory",
"absl/status:statusor",
],
deps = [
":binder",
":wire_writer",
"//:gpr",
"//src/core/ext/transport/binder/utils:transport_stream_receiver",
],
)

@ -0,0 +1,101 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_CORE_EXT_TRANSPORT_BINDER_WIRE_FORMAT_BINDER_H
#define GRPC_CORE_EXT_TRANSPORT_BINDER_WIRE_FORMAT_BINDER_H
#include <grpc/impl/codegen/port_platform.h>
#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include "absl/status/status.h"
#include "absl/strings/string_view.h"
#include "src/core/ext/transport/binder/wire_format/binder_constants.h"
namespace grpc_binder {
class HasRawBinder {
public:
virtual ~HasRawBinder() = default;
virtual void* GetRawBinder() = 0;
};
class Binder;
// TODO(waynetu): We might need other methods as well.
// TODO(waynetu): Find a better way to express the returned status than
// binder_status_t.
class WritableParcel {
public:
virtual ~WritableParcel() = default;
virtual int32_t GetDataPosition() const = 0;
virtual absl::Status SetDataPosition(int32_t pos) = 0;
virtual absl::Status WriteInt32(int32_t data) = 0;
virtual absl::Status WriteBinder(HasRawBinder* binder) = 0;
virtual absl::Status WriteString(absl::string_view s) = 0;
virtual absl::Status WriteByteArray(const int8_t* buffer, int32_t length) = 0;
absl::Status WriteByteArrayWithLength(absl::string_view buffer) {
absl::Status status = WriteInt32(buffer.length());
if (!status.ok()) return status;
if (buffer.empty()) return absl::OkStatus();
return WriteByteArray(reinterpret_cast<const int8_t*>(buffer.data()),
buffer.length());
}
};
// TODO(waynetu): We might need other methods as well.
// TODO(waynetu): Find a better way to express the returned status than
// binder_status_t.
class ReadableParcel {
public:
virtual ~ReadableParcel() = default;
virtual absl::Status ReadInt32(int32_t* data) const = 0;
virtual absl::Status ReadBinder(std::unique_ptr<Binder>* data) const = 0;
// TODO(waynetu): Provide better interfaces.
virtual absl::Status ReadByteArray(std::string* data) const = 0;
// FIXME(waynetu): This is just a temporary interface.
virtual absl::Status ReadString(char data[111]) const = 0;
};
class TransactionReceiver : public HasRawBinder {
public:
using OnTransactCb =
std::function<absl::Status(transaction_code_t, const ReadableParcel*)>;
~TransactionReceiver() override = default;
};
class Binder : public HasRawBinder {
public:
~Binder() override = default;
virtual void Initialize() = 0;
virtual absl::Status PrepareTransaction() = 0;
virtual absl::Status Transact(BinderTransportTxCode tx_code) = 0;
virtual WritableParcel* GetWritableParcel() const = 0;
virtual ReadableParcel* GetReadableParcel() const = 0;
// TODO(waynetu): Can we decouple the receiver from the binder?
virtual std::unique_ptr<TransactionReceiver> ConstructTxReceiver(
TransactionReceiver::OnTransactCb transact_cb) const = 0;
};
} // namespace grpc_binder
#endif // GRPC_CORE_EXT_TRANSPORT_BINDER_WIRE_FORMAT_BINDER_H

@ -0,0 +1,240 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/impl/codegen/port_platform.h>
#include "src/core/ext/transport/binder/wire_format/binder_android.h"
#if defined(ANDROID) || defined(__ANDROID__)
#include <grpc/support/log.h>
#include <map>
#include "absl/memory/memory.h"
#include "absl/strings/str_cat.h"
#include "src/core/lib/gprpp/sync.h"
namespace {
struct AtomicCallback {
explicit AtomicCallback(void* callback) : mu{}, callback(callback) {}
grpc_core::Mutex mu;
void* callback ABSL_GUARDED_BY(mu);
};
void* f_onCreate_with_mutex(void* callback) {
return new AtomicCallback(callback);
}
void* f_onCreate_noop(void* args) { return nullptr; }
void f_onDestroy_noop(void* userData) {}
// TODO(mingcl): Consider if thread safety is a requirement here
binder_status_t f_onTransact(AIBinder* binder, transaction_code_t code,
const AParcel* in, AParcel* out) {
gpr_log(GPR_INFO, __func__);
gpr_log(GPR_INFO, "tx code = %u", code);
auto* user_data =
reinterpret_cast<AtomicCallback*>(AIBinder_getUserData(binder));
grpc_core::MutexLock lock(&user_data->mu);
// TODO(waynetu): What should be returned here?
if (!user_data->callback) return STATUS_OK;
auto* callback =
reinterpret_cast<grpc_binder::TransactionReceiver::OnTransactCb*>(
user_data->callback);
// Wrap the parcel in a ReadableParcel.
std::unique_ptr<grpc_binder::ReadableParcel> output =
absl::make_unique<grpc_binder::ReadableParcelAndroid>(in);
// The lock should be released "after" the callback finishes.
absl::Status status = (*callback)(code, output.get());
return status.ok() ? STATUS_OK : STATUS_UNKNOWN_ERROR;
}
} // namespace
namespace grpc_binder {
ndk::SpAIBinder FromJavaBinder(JNIEnv* jni_env, jobject binder) {
return ndk::SpAIBinder(AIBinder_fromJavaBinder(jni_env, binder));
}
TransactionReceiverAndroid::TransactionReceiverAndroid(OnTransactCb transact_cb)
: transact_cb_(transact_cb) {
// TODO(mingcl): For now interface descriptor is always empty, figure out if
// we want it to be something more meaningful (we can probably manually change
// interface descriptor by modifying Java code's reply to
// os.IBinder.INTERFACE_TRANSACTION)
AIBinder_Class* aibinder_class = AIBinder_Class_define(
/*interfaceDescriptor=*/"", f_onCreate_with_mutex, f_onDestroy_noop,
f_onTransact);
// Pass the on-transact callback to the on-create function of the binder. The
// on-create function equips the callback with a mutex and gives it to the
// user data stored in the binder which can be retrieved later.
binder_ = AIBinder_new(aibinder_class, &transact_cb_);
GPR_ASSERT(binder_);
gpr_log(GPR_INFO, "AIBinder_associateClass = %d",
static_cast<int>(AIBinder_associateClass(binder_, aibinder_class)));
}
TransactionReceiverAndroid::~TransactionReceiverAndroid() {
auto* user_data =
reinterpret_cast<AtomicCallback*>(AIBinder_getUserData(binder_));
{
grpc_core::MutexLock lock(&user_data->mu);
// Set the callback to null so that future calls to on-trasact are awared
// that the transaction receiver had been deallocated.
user_data->callback = nullptr;
}
// Release the binder.
AIBinder_decStrong(binder_);
}
namespace {
binder_status_t f_onTransact_noop(AIBinder* binder, transaction_code_t code,
const AParcel* in, AParcel* out) {
return {};
}
void AssociateWithNoopClass(AIBinder* binder) {
// Need to associate class before using it
AIBinder_Class* aibinder_class = AIBinder_Class_define(
"", f_onCreate_noop, f_onDestroy_noop, f_onTransact_noop);
gpr_log(GPR_INFO, "AIBinder_associateClass = %d",
static_cast<int>(AIBinder_associateClass(binder, aibinder_class)));
}
} // namespace
void BinderAndroid::Initialize() {
AIBinder* binder = binder_.get();
AssociateWithNoopClass(binder);
}
absl::Status BinderAndroid::PrepareTransaction() {
AIBinder* binder = binder_.get();
return AIBinder_prepareTransaction(binder, &input_parcel_->parcel_) ==
STATUS_OK
? absl::OkStatus()
: absl::InternalError("AIBinder_prepareTransaction failed");
}
absl::Status BinderAndroid::Transact(BinderTransportTxCode tx_code) {
AIBinder* binder = binder_.get();
return AIBinder_transact(binder, static_cast<transaction_code_t>(tx_code),
&input_parcel_->parcel_, &output_parcel_->parcel_,
FLAG_ONEWAY) == STATUS_OK
? absl::OkStatus()
: absl::InternalError("AIBinder_transact failed");
}
std::unique_ptr<TransactionReceiver> BinderAndroid::ConstructTxReceiver(
TransactionReceiver::OnTransactCb transact_cb) const {
return absl::make_unique<TransactionReceiverAndroid>(transact_cb);
}
int32_t WritableParcelAndroid::GetDataPosition() const {
return AParcel_getDataPosition(parcel_);
}
absl::Status WritableParcelAndroid::SetDataPosition(int32_t pos) {
return AParcel_setDataPosition(parcel_, pos) == STATUS_OK
? absl::OkStatus()
: absl::InternalError("AParcel_setDataPosition failed");
}
absl::Status WritableParcelAndroid::WriteInt32(int32_t data) {
return AParcel_writeInt32(parcel_, data) == STATUS_OK
? absl::OkStatus()
: absl::InternalError("AParcel_writeInt32 failed");
}
absl::Status WritableParcelAndroid::WriteBinder(HasRawBinder* binder) {
return AParcel_writeStrongBinder(
parcel_, reinterpret_cast<AIBinder*>(binder->GetRawBinder())) ==
STATUS_OK
? absl::OkStatus()
: absl::InternalError("AParcel_writeStrongBinder failed");
}
absl::Status WritableParcelAndroid::WriteString(absl::string_view s) {
return AParcel_writeString(parcel_, s.data(), s.length()) == STATUS_OK
? absl::OkStatus()
: absl::InternalError("AParcel_writeString failed");
}
absl::Status WritableParcelAndroid::WriteByteArray(const int8_t* buffer,
int32_t length) {
return AParcel_writeByteArray(parcel_, buffer, length) == STATUS_OK
? absl::OkStatus()
: absl::InternalError("AParcel_writeByteArray failed");
}
absl::Status ReadableParcelAndroid::ReadInt32(int32_t* data) const {
return AParcel_readInt32(parcel_, data) == STATUS_OK
? absl::OkStatus()
: absl::InternalError("AParcel_readInt32 failed");
}
absl::Status ReadableParcelAndroid::ReadBinder(
std::unique_ptr<Binder>* data) const {
AIBinder* binder;
if (AParcel_readStrongBinder(parcel_, &binder) != STATUS_OK) {
*data = nullptr;
return absl::InternalError("AParcel_readStrongBinder failed");
}
*data = absl::make_unique<BinderAndroid>(ndk::SpAIBinder(binder));
return absl::OkStatus();
}
namespace {
bool byte_array_allocator(void* arrayData, int32_t length, int8_t** outBuffer) {
std::string tmp;
tmp.resize(length);
*reinterpret_cast<std::string*>(arrayData) = tmp;
*outBuffer = reinterpret_cast<int8_t*>(
reinterpret_cast<std::string*>(arrayData)->data());
return true;
}
bool string_allocator(void* stringData, int32_t length, char** outBuffer) {
if (length > 0) {
// TODO(mingcl): Don't fix the length of the string
GPR_ASSERT(length < 100); // call should preallocate 100 bytes
*outBuffer = reinterpret_cast<char*>(stringData);
}
return true;
}
} // namespace
absl::Status ReadableParcelAndroid::ReadByteArray(std::string* data) const {
return AParcel_readByteArray(parcel_, data, byte_array_allocator) == STATUS_OK
? absl::OkStatus()
: absl::InternalError("AParcel_readByteArray failed");
}
absl::Status ReadableParcelAndroid::ReadString(char data[111]) const {
return AParcel_readString(parcel_, data, string_allocator) == STATUS_OK
? absl::OkStatus()
: absl::InternalError("AParcel_readString failed");
}
} // namespace grpc_binder
#endif // defined(ANDROID) || defined(__ANDROID__)

@ -0,0 +1,128 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_CORE_EXT_TRANSPORT_BINDER_WIRE_FORMAT_BINDER_ANDROID_H
#define GRPC_CORE_EXT_TRANSPORT_BINDER_WIRE_FORMAT_BINDER_ANDROID_H
#if defined(ANDROID) || defined(__ANDROID__)
#include <grpc/impl/codegen/port_platform.h>
#include <android/binder_auto_utils.h>
#include <android/binder_ibinder.h>
#include <android/binder_ibinder_jni.h>
#include <android/binder_interface_utils.h>
#include <jni.h>
#include <memory>
#include "absl/memory/memory.h"
#include "src/core/ext/transport/binder/wire_format/binder.h"
// TODO(b/192208764): move this check to somewhere else
#if __ANDROID_API__ < 29
#error "We only support Android API level >= 29."
#endif
namespace grpc_binder {
ndk::SpAIBinder FromJavaBinder(JNIEnv* jni_env, jobject binder);
class BinderAndroid;
class WritableParcelAndroid final : public WritableParcel {
public:
WritableParcelAndroid() = default;
explicit WritableParcelAndroid(AParcel* parcel) : parcel_(parcel) {}
~WritableParcelAndroid() override = default;
int32_t GetDataPosition() const override;
absl::Status SetDataPosition(int32_t pos) override;
absl::Status WriteInt32(int32_t data) override;
absl::Status WriteBinder(HasRawBinder* binder) override;
absl::Status WriteString(absl::string_view s) override;
absl::Status WriteByteArray(const int8_t* buffer, int32_t length) override;
private:
AParcel* parcel_ = nullptr;
friend class BinderAndroid;
};
class ReadableParcelAndroid final : public ReadableParcel {
public:
ReadableParcelAndroid() = default;
// TODO(waynetu): Get rid of the const_cast.
explicit ReadableParcelAndroid(const AParcel* parcel)
: parcel_(const_cast<AParcel*>(parcel)) {}
~ReadableParcelAndroid() override = default;
absl::Status ReadInt32(int32_t* data) const override;
absl::Status ReadBinder(std::unique_ptr<Binder>* data) const override;
absl::Status ReadByteArray(std::string* data) const override;
// FIXME(waynetu): Fix the interface.
absl::Status ReadString(char data[111]) const override;
private:
AParcel* parcel_ = nullptr;
friend class BinderAndroid;
};
class BinderAndroid final : public Binder {
public:
explicit BinderAndroid(ndk::SpAIBinder binder)
: binder_(binder),
input_parcel_(absl::make_unique<WritableParcelAndroid>()),
output_parcel_(absl::make_unique<ReadableParcelAndroid>()) {}
~BinderAndroid() override = default;
void* GetRawBinder() override { return binder_.get(); }
void Initialize() override;
absl::Status PrepareTransaction() override;
absl::Status Transact(BinderTransportTxCode tx_code) override;
WritableParcel* GetWritableParcel() const override {
return input_parcel_.get();
}
ReadableParcel* GetReadableParcel() const override {
return output_parcel_.get();
};
std::unique_ptr<TransactionReceiver> ConstructTxReceiver(
TransactionReceiver::OnTransactCb transact_cb) const override;
private:
ndk::SpAIBinder binder_;
std::unique_ptr<WritableParcelAndroid> input_parcel_;
std::unique_ptr<ReadableParcelAndroid> output_parcel_;
};
class TransactionReceiverAndroid final : public TransactionReceiver {
public:
explicit TransactionReceiverAndroid(OnTransactCb transaction_cb);
~TransactionReceiverAndroid() override;
void* GetRawBinder() override { return binder_; }
private:
AIBinder* binder_;
OnTransactCb transact_cb_;
};
} // namespace grpc_binder
#endif // defined(ANDROID) || defined(__ANDROID__)
#endif // GRPC_CORE_EXT_TRANSPORT_BINDER_WIRE_FORMAT_BINDER_ANDROID_H

@ -0,0 +1,30 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/impl/codegen/port_platform.h>
#include "src/core/ext/transport/binder/wire_format/binder_constants.h"
#ifndef ANDROID
const int FIRST_CALL_TRANSACTION = 0x00000001;
const int LAST_CALL_TRANSACTION = 0x00FFFFFF;
#endif // ANDROID
namespace grpc_binder {
const int kFirstCallId = FIRST_CALL_TRANSACTION + 1000;
} // namespace grpc_binder

@ -0,0 +1,52 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_CORE_EXT_TRANSPORT_BINDER_WIRE_FORMAT_BINDER_CONSTANTS_H
#define GRPC_CORE_EXT_TRANSPORT_BINDER_WIRE_FORMAT_BINDER_CONSTANTS_H
#include <grpc/impl/codegen/port_platform.h>
#include "absl/base/attributes.h"
#if defined(ANDROID) || defined(__ANDROID__)
#include <android/binder_auto_utils.h>
#include <android/binder_ibinder.h>
#else
#include <cstdint>
using transaction_code_t = uint32_t;
ABSL_CONST_INIT extern const int FIRST_CALL_TRANSACTION;
ABSL_CONST_INIT extern const int LAST_CALL_TRANSACTION;
#endif // defined(ANDROID) || defined(__ANDROID__)
namespace grpc_binder {
enum class BinderTransportTxCode {
SETUP_TRANSPORT = 1,
SHUTDOWN_TRANSPORT = 2,
ACKNOWLEDGE_BYTES = 3,
PING = 4,
PING_RESPONSE = 5,
};
ABSL_CONST_INIT extern const int kFirstCallId;
} // namespace grpc_binder
#endif // GRPC_CORE_EXT_TRANSPORT_BINDER_WIRE_FORMAT_BINDER_CONSTANTS_H

@ -0,0 +1,29 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/impl/codegen/port_platform.h>
#include "src/core/ext/transport/binder/wire_format/transaction.h"
namespace grpc_binder {
const int kFlagPrefix = 0x1;
const int kFlagMessageData = 0x2;
const int kFlagSuffix = 0x4;
const int kFlagOutOfBandClose = 0x8;
const int kFlagExpectSingleMessage = 0x10;
const int kFlagStatusDescription = 0x20;
const int kFlagMessageDataIsParcelable = 0x40;
} // namespace grpc_binder

@ -0,0 +1,103 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_CORE_EXT_TRANSPORT_BINDER_WIRE_FORMAT_TRANSACTION_H
#define GRPC_CORE_EXT_TRANSPORT_BINDER_WIRE_FORMAT_TRANSACTION_H
#include <grpc/impl/codegen/port_platform.h>
#include <grpc/support/log.h>
#include <string>
#include <vector>
#include "absl/strings/string_view.h"
namespace grpc_binder {
ABSL_CONST_INIT extern const int kFlagPrefix;
ABSL_CONST_INIT extern const int kFlagMessageData;
ABSL_CONST_INIT extern const int kFlagSuffix;
ABSL_CONST_INIT extern const int kFlagOutOfBandClose;
ABSL_CONST_INIT extern const int kFlagExpectSingleMessage;
ABSL_CONST_INIT extern const int kFlagStatusDescription;
ABSL_CONST_INIT extern const int kFlagMessageDataIsParcelable;
using Metadata = std::vector<std::pair<std::string, std::string>>;
class Transaction {
public:
Transaction(int tx_code, int seq_num, bool is_client)
: tx_code_(tx_code), seq_num_(seq_num), is_client_(is_client) {}
// TODO(mingcl): Consider using string_view
void SetPrefix(Metadata prefix_metadata) {
prefix_metadata_ = prefix_metadata;
GPR_ASSERT((flags_ & kFlagPrefix) == 0);
flags_ |= kFlagPrefix;
}
void SetMethodRef(std::string method_ref) {
GPR_ASSERT(is_client_);
method_ref_ = method_ref;
}
void SetData(std::string message_data) {
message_data_ = message_data;
GPR_ASSERT((flags_ & kFlagMessageData) == 0);
flags_ |= kFlagMessageData;
}
void SetSuffix(Metadata suffix_metadata) {
if (is_client_) GPR_ASSERT(suffix_metadata.empty());
suffix_metadata_ = suffix_metadata;
GPR_ASSERT((flags_ & kFlagSuffix) == 0);
flags_ |= kFlagSuffix;
}
void SetStatusDescription(std::string status_desc) {
GPR_ASSERT(!is_client_);
GPR_ASSERT((flags_ & kFlagStatusDescription) == 0);
status_desc_ = status_desc;
}
void SetStatus(int status) {
GPR_ASSERT(!is_client_);
GPR_ASSERT((flags_ >> 16) == 0);
GPR_ASSERT(status < (1 << 16));
flags_ |= (status << 16);
}
bool IsClient() const { return is_client_; }
bool IsServer() const { return !is_client_; }
int GetTxCode() const { return tx_code_; }
int GetSeqNum() const { return seq_num_; }
int GetFlags() const { return flags_; }
absl::string_view GetMethodRef() const { return method_ref_; }
const Metadata& GetPrefixMetadata() const { return prefix_metadata_; }
const Metadata& GetSuffixMetadata() const { return suffix_metadata_; }
absl::string_view GetMessageData() const { return message_data_; }
absl::string_view GetStatusDesc() const { return status_desc_; }
private:
int tx_code_;
int seq_num_;
bool is_client_;
Metadata prefix_metadata_;
Metadata suffix_metadata_;
std::string method_ref_;
std::string message_data_;
std::string status_desc_;
int flags_ = 0;
};
} // namespace grpc_binder
#endif // GRPC_CORE_EXT_TRANSPORT_BINDER_WIRE_FORMAT_TRANSACTION_H

@ -0,0 +1,37 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_CORE_EXT_TRANSPORT_BINDER_WIRE_FORMAT_WIRE_READER_H
#define GRPC_CORE_EXT_TRANSPORT_BINDER_WIRE_FORMAT_WIRE_READER_H
#include <grpc/impl/codegen/port_platform.h>
#include <memory>
#include <utility>
#include "src/core/ext/transport/binder/wire_format/binder.h"
#include "src/core/ext/transport/binder/wire_format/wire_writer.h"
namespace grpc_binder {
class WireReader {
public:
virtual ~WireReader() = default;
virtual std::unique_ptr<WireWriter> SetupTransport(
std::unique_ptr<Binder> endpoint_binder) = 0;
};
} // namespace grpc_binder
#endif // GRPC_CORE_EXT_TRANSPORT_BINDER_WIRE_FORMAT_WIRE_READER_H

@ -0,0 +1,273 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/impl/codegen/port_platform.h>
#include "src/core/ext/transport/binder/wire_format/wire_reader_impl.h"
#include <grpc/support/log.h>
#include <functional>
#include <limits>
#include <string>
#include <utility>
#include <vector>
#include "absl/memory/memory.h"
#include "absl/status/statusor.h"
#include "src/core/ext/transport/binder/utils/transport_stream_receiver.h"
#include "src/core/ext/transport/binder/wire_format/binder.h"
#include "src/core/ext/transport/binder/wire_format/wire_writer.h"
#define RETURN_IF_ERROR(expr) \
do { \
const absl::Status status = (expr); \
if (!status.ok()) return status; \
} while (0)
namespace grpc_binder {
namespace {
absl::StatusOr<Metadata> parse_metadata(const ReadableParcel* reader) {
int num_header;
RETURN_IF_ERROR(reader->ReadInt32(&num_header));
gpr_log(GPR_INFO, "num_header = %d", num_header);
std::vector<std::pair<std::string, std::string>> ret;
for (int i = 0; i != num_header; i++) {
int count;
RETURN_IF_ERROR(reader->ReadInt32(&count));
gpr_log(GPR_INFO, "count = %d", count);
std::string key{};
if (count > 0) RETURN_IF_ERROR(reader->ReadByteArray(&key));
gpr_log(GPR_INFO, "key = %s", key.c_str());
RETURN_IF_ERROR(reader->ReadInt32(&count));
gpr_log(GPR_INFO, "count = %d", count);
std::string value{};
if (count > 0) RETURN_IF_ERROR(reader->ReadByteArray(&value));
gpr_log(GPR_INFO, "value = %s", value.c_str());
ret.push_back({key, value});
}
return ret;
}
} // namespace
WireReaderImpl::WireReaderImpl(
TransportStreamReceiver* transport_stream_receiver, bool is_client)
: transport_stream_receiver_(transport_stream_receiver),
is_client_(is_client) {}
WireReaderImpl::~WireReaderImpl() = default;
std::unique_ptr<WireWriter> WireReaderImpl::SetupTransport(
std::unique_ptr<Binder> binder) {
if (!is_client_) {
gpr_log(GPR_ERROR, "Server-side SETUP_TRANSPORT is not implemented yet.");
return nullptr;
}
gpr_log(GPR_INFO, "Setting up transport");
binder->Initialize();
gpr_log(GPR_INFO, "prepare transaction = %d",
binder->PrepareTransaction().ok());
// Only support client-side transport setup.
SendSetupTransport(binder.get());
RecvSetupTransport();
return absl::make_unique<WireWriterImpl>(std::move(other_end_binder_));
}
void WireReaderImpl::SendSetupTransport(Binder* binder) {
WritableParcel* writable_parcel = binder->GetWritableParcel();
gpr_log(GPR_INFO, "data position = %d", writable_parcel->GetDataPosition());
// gpr_log(GPR_INFO, "set data position to 0 = %d",
// writer->SetDataPosition(0));
gpr_log(GPR_INFO, "data position = %d", writable_parcel->GetDataPosition());
int32_t version = 77;
gpr_log(GPR_INFO, "write int32 = %d",
writable_parcel->WriteInt32(version).ok());
gpr_log(GPR_INFO, "data position = %d", writable_parcel->GetDataPosition());
// The lifetime of the transaction receiver is the same as the wire writer's.
// The transaction receiver is responsible for not calling the on-transact
// callback when it's dead.
tx_receiver_ = binder->ConstructTxReceiver(
[this](transaction_code_t code, const ReadableParcel* readable_parcel) {
return this->ProcessTransaction(code, readable_parcel);
});
gpr_log(GPR_INFO, "tx_receiver = %p", tx_receiver_->GetRawBinder());
gpr_log(GPR_INFO, "AParcel_writeStrongBinder = %d",
writable_parcel->WriteBinder(tx_receiver_.get()).ok());
gpr_log(GPR_INFO, "AIBinder_transact = %d",
binder->Transact(BinderTransportTxCode::SETUP_TRANSPORT).ok());
}
void WireReaderImpl::RecvSetupTransport() {
// TODO(b/191941760): avoid blocking, handle wire_writer_noti lifetime
// better
gpr_log(GPR_INFO, "start waiting for noti");
connection_noti_.WaitForNotification();
gpr_log(GPR_INFO, "end waiting for noti");
}
absl::Status WireReaderImpl::ProcessTransaction(transaction_code_t code,
const ReadableParcel* parcel) {
gpr_log(GPR_INFO, __func__);
gpr_log(GPR_INFO, "tx code = %u", code);
if (code >= static_cast<unsigned>(kFirstCallId)) {
gpr_log(GPR_INFO, "This is probably a Streaming Tx");
return ProcessStreamingTransaction(code, parcel);
}
if (!(code >= static_cast<transaction_code_t>(
BinderTransportTxCode::SETUP_TRANSPORT) &&
code <= static_cast<transaction_code_t>(
BinderTransportTxCode::PING_RESPONSE))) {
gpr_log(GPR_ERROR,
"Received unknown control message. Shutdown transport gracefully.");
// TODO(waynetu): Shutdown transport gracefully.
return absl::OkStatus();
}
switch (BinderTransportTxCode(code)) {
case BinderTransportTxCode::SETUP_TRANSPORT: {
// int datasize;
int version;
// getDataSize not supported until 31
// gpr_log(GPR_INFO, "getDataSize = %d", AParcel_getDataSize(in,
// &datasize));
RETURN_IF_ERROR(parcel->ReadInt32(&version));
// gpr_log(GPR_INFO, "data size = %d", datasize);
gpr_log(GPR_INFO, "version = %d", version);
std::unique_ptr<Binder> binder{};
RETURN_IF_ERROR(parcel->ReadBinder(&binder));
binder->Initialize();
other_end_binder_ = std::move(binder);
connection_noti_.Notify();
break;
}
case BinderTransportTxCode::SHUTDOWN_TRANSPORT: {
gpr_log(GPR_ERROR,
"Received SHUTDOWN_TRANSPORT request but not implemented yet.");
GPR_ASSERT(false);
break;
}
case BinderTransportTxCode::ACKNOWLEDGE_BYTES: {
int num_bytes = -1;
RETURN_IF_ERROR(parcel->ReadInt32(&num_bytes));
gpr_log(GPR_INFO, "received acknowledge bytes = %d", num_bytes);
break;
}
case BinderTransportTxCode::PING: {
GPR_ASSERT(!is_client_);
int ping_id = -1;
RETURN_IF_ERROR(parcel->ReadInt32(&ping_id));
gpr_log(GPR_INFO, "received ping id = %d", ping_id);
// TODO(waynetu): Ping back.
break;
}
case BinderTransportTxCode::PING_RESPONSE: {
int value = -1;
RETURN_IF_ERROR(parcel->ReadInt32(&value));
gpr_log(GPR_INFO, "received ping response = %d", value);
break;
}
}
return absl::OkStatus();
}
absl::Status WireReaderImpl::ProcessStreamingTransaction(
transaction_code_t code, const ReadableParcel* parcel) {
int flags;
RETURN_IF_ERROR(parcel->ReadInt32(&flags));
gpr_log(GPR_INFO, "flags = %d", flags);
// Ignore in-coming transaction with flag = 0 to match with Java
// implementation.
// TODO(waynetu): Check with grpc-java team to see whether this is the
// intended behavior.
// TODO(waynetu): What should be returned here?
if (flags == 0) return absl::OkStatus();
int status = flags >> 16;
gpr_log(GPR_INFO, "status = %d", status);
gpr_log(GPR_INFO, "FLAG_PREFIX = %d", (flags & kFlagPrefix));
gpr_log(GPR_INFO, "FLAG_MESSAGE_DATA = %d", (flags & kFlagMessageData));
gpr_log(GPR_INFO, "FLAG_SUFFIX = %d", (flags & kFlagSuffix));
int seq_num;
RETURN_IF_ERROR(parcel->ReadInt32(&seq_num));
// TODO(waynetu): For now we'll just assume that the transactions commit in
// the same order they're issued. The following assertion detects
// out-of-order or missing transactions. WireReaderImpl should be fixed if
// we indeed found such behavior.
int32_t& expectation = expected_seq_num_[code];
// TODO(mingcl): Don't assert here
GPR_ASSERT(seq_num >= 0);
GPR_ASSERT(seq_num == expectation && "Interleaved sequence number");
// TODO(waynetu): According to the protocol, "The sequence number will wrap
// around to 0 if more than 2^31 messages are sent." For now we'll just
// assert that it never reach such circumstances.
GPR_ASSERT(expectation < std::numeric_limits<int32_t>::max() &&
"Sequence number too large");
expectation++;
gpr_log(GPR_INFO, "sequence number = %d", seq_num);
if (flags & kFlagPrefix) {
char method_ref[111];
if (!is_client_) {
RETURN_IF_ERROR(parcel->ReadString(method_ref));
}
absl::StatusOr<Metadata> initial_metadata_or_error = parse_metadata(parcel);
if (!initial_metadata_or_error.ok()) {
return initial_metadata_or_error.status();
}
if (!is_client_) {
initial_metadata_or_error->emplace_back(":path", method_ref);
}
transport_stream_receiver_->NotifyRecvInitialMetadata(
code, *initial_metadata_or_error);
}
if (flags & kFlagMessageData) {
int count;
RETURN_IF_ERROR(parcel->ReadInt32(&count));
gpr_log(GPR_INFO, "count = %d", count);
std::string msg_data{};
if (count > 0) {
RETURN_IF_ERROR(parcel->ReadByteArray(&msg_data));
}
gpr_log(GPR_INFO, "msg_data = %s", msg_data.c_str());
transport_stream_receiver_->NotifyRecvMessage(code, msg_data);
}
if (flags & kFlagSuffix) {
if (flags & kFlagStatusDescription) {
// FLAG_STATUS_DESCRIPTION set
char desc[111];
RETURN_IF_ERROR(parcel->ReadString(desc));
gpr_log(GPR_INFO, "description = %s", desc);
}
Metadata trailing_metadata;
if (is_client_) {
absl::StatusOr<Metadata> trailing_metadata_or_error =
parse_metadata(parcel);
if (!trailing_metadata_or_error.ok()) {
return trailing_metadata_or_error.status();
}
trailing_metadata = *trailing_metadata_or_error;
}
transport_stream_receiver_->NotifyRecvTrailingMetadata(
code, trailing_metadata, status);
}
return absl::OkStatus();
}
} // namespace grpc_binder

@ -0,0 +1,76 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_CORE_EXT_TRANSPORT_BINDER_WIRE_FORMAT_WIRE_READER_IMPL_H
#define GRPC_CORE_EXT_TRANSPORT_BINDER_WIRE_FORMAT_WIRE_READER_IMPL_H
#include <grpc/impl/codegen/port_platform.h>
#include <memory>
#include <utility>
#include "absl/container/flat_hash_map.h"
#include "absl/synchronization/notification.h"
#include "src/core/ext/transport/binder/utils/transport_stream_receiver.h"
#include "src/core/ext/transport/binder/wire_format/binder.h"
#include "src/core/ext/transport/binder/wire_format/wire_reader.h"
#include "src/core/ext/transport/binder/wire_format/wire_writer.h"
namespace grpc_binder {
class WireReaderImpl : public WireReader {
public:
explicit WireReaderImpl(TransportStreamReceiver* transport_stream_receiver,
bool is_client);
~WireReaderImpl() override;
// Setup the transport between endpoint binders.
//
// The client and the server both call SetupTransport() when constructing
// transport.
//
// High-level overview of transaction setup:
// 0. Client obtain an |endpoint_binder| from the server.
// 1. Client creates a binder |client_binder| and hook its on-transaction
// callback to client's ProcessTransaction(). Client then sends
// |client_binder| through |endpoint_binder| to server.
// 2. Server receives |client_binder| via |endpoint_binder|.
// 3. Server creates a binder |server_binder| and hook its on-transaction
// callback to server's ProcessTransaction(). Server then sends
// |server_binder| through |client_binder| back to the client.
// 4. Client receives |server_binder| via |client_binder|'s on-transaction
// callback.
std::unique_ptr<WireWriter> SetupTransport(
std::unique_ptr<Binder> endpoint_binder) override;
absl::Status ProcessTransaction(transaction_code_t code,
const ReadableParcel* parcel);
private:
void SendSetupTransport(Binder* binder);
void RecvSetupTransport();
absl::Status ProcessStreamingTransaction(transaction_code_t code,
const ReadableParcel* parcel);
TransportStreamReceiver* transport_stream_receiver_;
absl::Notification connection_noti_;
std::unique_ptr<Binder> other_end_binder_;
absl::flat_hash_map<transaction_code_t, int32_t> expected_seq_num_;
std::unique_ptr<TransactionReceiver> tx_receiver_;
bool is_client_;
};
} // namespace grpc_binder
#endif // GRPC_CORE_EXT_TRANSPORT_BINDER_WIRE_FORMAT_WIRE_READER_IMPL_H

@ -0,0 +1,80 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/impl/codegen/port_platform.h>
#include "src/core/ext/transport/binder/wire_format/wire_writer.h"
#include <grpc/support/log.h>
#include <utility>
#define RETURN_IF_ERROR(expr) \
do { \
const absl::Status status = (expr); \
if (!status.ok()) return status; \
} while (0)
namespace grpc_binder {
WireWriterImpl::WireWriterImpl(std::unique_ptr<Binder> binder)
: binder_(std::move(binder)) {}
absl::Status WireWriterImpl::RpcCall(const Transaction& tx) {
// TODO(mingcl): check tx_code <= last call id
grpc_core::MutexLock lock(&mu_);
GPR_ASSERT(tx.GetTxCode() >= kFirstCallId);
RETURN_IF_ERROR(binder_->PrepareTransaction());
WritableParcel* parcel = binder_->GetWritableParcel();
{
// fill parcel
RETURN_IF_ERROR(parcel->WriteInt32(tx.GetFlags()));
RETURN_IF_ERROR(parcel->WriteInt32(tx.GetSeqNum()));
if (tx.GetFlags() & kFlagPrefix) {
// prefix set
if (tx.IsClient()) {
// Only client sends method ref.
RETURN_IF_ERROR(parcel->WriteString(tx.GetMethodRef()));
}
RETURN_IF_ERROR(parcel->WriteInt32(tx.GetPrefixMetadata().size()));
for (const auto& md : tx.GetPrefixMetadata()) {
RETURN_IF_ERROR(parcel->WriteByteArrayWithLength(md.first));
RETURN_IF_ERROR(parcel->WriteByteArrayWithLength(md.second));
}
}
if (tx.GetFlags() & kFlagMessageData) {
RETURN_IF_ERROR(parcel->WriteByteArrayWithLength(tx.GetMessageData()));
}
if (tx.GetFlags() & kFlagSuffix) {
if (tx.IsServer()) {
if (tx.GetFlags() & kFlagStatusDescription) {
RETURN_IF_ERROR(parcel->WriteString(tx.GetStatusDesc()));
}
RETURN_IF_ERROR(parcel->WriteInt32(tx.GetSuffixMetadata().size()));
for (const auto& md : tx.GetSuffixMetadata()) {
RETURN_IF_ERROR(parcel->WriteByteArrayWithLength(md.first));
RETURN_IF_ERROR(parcel->WriteByteArrayWithLength(md.second));
}
} else {
// client suffix currently is always empty according to the wireformat
if (!tx.GetSuffixMetadata().empty()) {
gpr_log(GPR_ERROR, "Got non-empty suffix metadata from client.");
}
}
}
}
// FIXME(waynetu): Construct BinderTransportTxCode from an arbitrary integer
// is an undefined behavior.
return binder_->Transact(BinderTransportTxCode(tx.GetTxCode()));
}
} // namespace grpc_binder

@ -0,0 +1,49 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_CORE_EXT_TRANSPORT_BINDER_WIRE_FORMAT_WIRE_WRITER_H
#define GRPC_CORE_EXT_TRANSPORT_BINDER_WIRE_FORMAT_WIRE_WRITER_H
#include <grpc/impl/codegen/port_platform.h>
#include <grpc/support/log.h>
#include <string>
#include <vector>
#include "src/core/ext/transport/binder/wire_format/binder.h"
#include "src/core/ext/transport/binder/wire_format/transaction.h"
#include "src/core/lib/gprpp/sync.h"
namespace grpc_binder {
class WireWriter {
public:
virtual ~WireWriter() = default;
virtual absl::Status RpcCall(const Transaction& call) = 0;
};
class WireWriterImpl : public WireWriter {
public:
explicit WireWriterImpl(std::unique_ptr<Binder> binder);
absl::Status RpcCall(const Transaction& tx) override;
private:
grpc_core::Mutex mu_;
std::unique_ptr<Binder> binder_ ABSL_GUARDED_BY(mu_);
};
} // namespace grpc_binder
#endif // GRPC_CORE_EXT_TRANSPORT_BINDER_WIRE_FORMAT_WIRE_WRITER_H
Loading…
Cancel
Save