Import binder transport implementation (#26901)

This commit imports the transport implementation from internal
repository. (Some updates to existing wire_format code are also included)

After this, we still need to import codes that creates channel, unit
tests and e2e test to complete the transition to GitHub.
pull/26959/head
Ming-Chuan 4 years ago committed by GitHub
parent f9581a6c4d
commit 0ef850772f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 48
      src/core/ext/transport/binder/transport/BUILD
  2. 65
      src/core/ext/transport/binder/transport/binder_stream.h
  3. 463
      src/core/ext/transport/binder/transport/binder_transport.cc
  4. 85
      src/core/ext/transport/binder/transport/binder_transport.h
  5. 2
      src/core/ext/transport/binder/utils/BUILD
  6. 55
      src/core/ext/transport/binder/utils/transport_stream_receiver.h
  7. 216
      src/core/ext/transport/binder/utils/transport_stream_receiver_impl.cc
  8. 59
      src/core/ext/transport/binder/utils/transport_stream_receiver_impl.h
  9. 4
      src/core/ext/transport/binder/wire_format/BUILD
  10. 4
      src/core/ext/transport/binder/wire_format/binder.h
  11. 69
      src/core/ext/transport/binder/wire_format/binder_android.cc
  12. 6
      src/core/ext/transport/binder/wire_format/binder_android.h
  13. 5
      src/core/ext/transport/binder/wire_format/wire_reader.h
  14. 101
      src/core/ext/transport/binder/wire_format/wire_reader_impl.cc
  15. 80
      src/core/ext/transport/binder/wire_format/wire_reader_impl.h

@ -0,0 +1,48 @@
# Copyright 2021 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
load("//bazel:grpc_build_system.bzl", "grpc_cc_library")
licenses(["notice"])
package(
default_visibility = ["//visibility:public"],
)
grpc_cc_library(
name = "transport",
srcs = [
"binder_transport.cc",
],
hdrs = [
"binder_stream.h",
"binder_transport.h",
],
external_deps = [
"absl/strings",
"absl/memory",
],
deps = [
"//:gpr",
"//:gpr_base",
"//:grpc",
"//:grpc_base",
"//:grpc_base_c",
"//:grpc_codegen",
"//src/core/ext/transport/binder/utils:transport_stream_receiver",
"//src/core/ext/transport/binder/wire_format:binder",
"//src/core/ext/transport/binder/wire_format:wire_reader",
"//src/core/ext/transport/binder/wire_format:wire_writer",
],
)

@ -0,0 +1,65 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_CORE_EXT_TRANSPORT_BINDER_TRANSPORT_BINDER_STREAM_H
#define GRPC_CORE_EXT_TRANSPORT_BINDER_TRANSPORT_BINDER_STREAM_H
#include <grpc/impl/codegen/port_platform.h>
#include "src/core/ext/transport/binder/transport/binder_transport.h"
// TODO(mingcl): Figure out if we want to use class instead of struct here
struct grpc_binder_stream {
// server_data will be null for client, and for server it will be whatever
// passed in to the accept_stream_fn callback by client.
grpc_binder_stream(grpc_binder_transport* t, grpc_core::Arena* arena,
const void* server_data, int tx_code, bool is_client)
: t(t), arena(arena), seq(0), tx_code(tx_code), is_client(is_client) {
if (!server_data) {
// The stream is a client-side stream. If we indeed know what the other
// end is (for example, in the testing envorinment), call the
// accept_stream_fn callback with server_data being "this" (currently we
// don't need the actual value of "this"; a non-null value should work).
grpc_binder_transport* server = t->other_end;
if (server && server->accept_stream_fn) {
(*server->accept_stream_fn)(server->accept_stream_user_data,
&server->base, this);
}
}
}
~grpc_binder_stream() = default;
int GetTxCode() { return tx_code; }
int GetThenIncSeq() { return seq++; }
grpc_binder_transport* t;
grpc_core::Arena* arena;
grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> sbs;
int seq;
int tx_code;
bool is_client;
// TODO(waynetu): This should be guarded by a mutex.
absl::Status cancellation_error = absl::OkStatus();
// We store these fields passed from op batch, in order to access them through
// grpc_binder_stream
grpc_metadata_batch* recv_initial_metadata;
grpc_closure* recv_initial_metadata_ready = nullptr;
grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message;
grpc_closure* recv_message_ready = nullptr;
grpc_metadata_batch* recv_trailing_metadata;
grpc_closure* recv_trailing_metadata_finished = nullptr;
};
#endif // GRPC_CORE_EXT_TRANSPORT_BINDER_TRANSPORT_BINDER_STREAM_H

@ -0,0 +1,463 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/impl/codegen/port_platform.h>
#include "src/core/ext/transport/binder/transport/binder_transport.h"
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include <cstdint>
#include <memory>
#include <string>
#include <utility>
#include "absl/memory/memory.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/substitute.h"
#include "src/core/ext/transport/binder/transport/binder_stream.h"
#include "src/core/ext/transport/binder/utils/transport_stream_receiver.h"
#include "src/core/ext/transport/binder/utils/transport_stream_receiver_impl.h"
#include "src/core/ext/transport/binder/wire_format/wire_reader.h"
#include "src/core/ext/transport/binder/wire_format/wire_reader_impl.h"
#include "src/core/ext/transport/binder/wire_format/wire_writer.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/slice/slice_utils.h"
#include "src/core/lib/transport/byte_stream.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/static_metadata.h"
#include "src/core/lib/transport/status_metadata.h"
#include "src/core/lib/transport/transport.h"
static int init_stream(grpc_transport* gt, grpc_stream* gs,
grpc_stream_refcount* refcount, const void* server_data,
grpc_core::Arena* arena) {
GPR_TIMER_SCOPE("init_stream", 0);
gpr_log(GPR_INFO, "%s = %p %p %p %p %p", __func__, gt, gs, refcount,
server_data, arena);
grpc_binder_transport* t = reinterpret_cast<grpc_binder_transport*>(gt);
// TODO(mingcl): Figure out if we need to worry about concurrent invocation
// here
new (gs) grpc_binder_stream(t, arena, server_data, t->NewStreamTxCode(),
t->is_client);
return 0;
}
static void set_pollset(grpc_transport* gt, grpc_stream* gs, grpc_pollset* gp) {
gpr_log(GPR_INFO, "%s = %p %p %p", __func__, gt, gs, gp);
}
static void set_pollset_set(grpc_transport*, grpc_stream*, grpc_pollset_set*) {
gpr_log(GPR_INFO, __func__);
}
void AssignMetadata(grpc_metadata_batch* mb, grpc_core::Arena* arena,
const grpc_binder::Metadata& md) {
grpc_metadata_batch_init(mb);
for (auto& p : md) {
grpc_linked_mdelem* glm = static_cast<grpc_linked_mdelem*>(
arena->Alloc(sizeof(grpc_linked_mdelem)));
memset(glm, 0, sizeof(grpc_linked_mdelem));
grpc_slice key = grpc_slice_from_cpp_string(p.first);
grpc_slice value = grpc_slice_from_cpp_string(p.second);
glm->md = grpc_mdelem_from_slices(grpc_slice_intern(key),
grpc_slice_intern(value));
// Unref here to prevent memory leak
grpc_slice_unref_internal(key);
grpc_slice_unref_internal(value);
GPR_ASSERT(grpc_metadata_batch_link_tail(mb, glm) == GRPC_ERROR_NONE);
}
}
static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
grpc_transport_stream_op_batch* op) {
GPR_TIMER_SCOPE("perform_stream_op", 0);
gpr_log(GPR_INFO, "%s = %p %p %p", __func__, gt, gs, op);
grpc_binder_transport* gbt = reinterpret_cast<grpc_binder_transport*>(gt);
grpc_binder_stream* gbs = reinterpret_cast<grpc_binder_stream*>(gs);
if (op->cancel_stream) {
// TODO(waynetu): Is this true?
GPR_ASSERT(!op->send_initial_metadata && !op->send_message &&
!op->send_trailing_metadata && !op->recv_initial_metadata &&
!op->recv_message && !op->recv_trailing_metadata);
gpr_log(GPR_INFO, "cancel_stream");
gpr_log(
GPR_ERROR, "cancel_stream error = %s",
grpc_error_std_string(op->payload->cancel_stream.cancel_error).c_str());
gbs->cancellation_error =
grpc_error_to_absl_status(op->payload->cancel_stream.cancel_error);
// Send trailing metadata to inform the other end about the cancellation,
// regardless if we'd already done that or not.
grpc_binder::Transaction cancel_tx(gbs->GetTxCode(), gbs->GetThenIncSeq(),
gbt->is_client);
cancel_tx.SetSuffix(grpc_binder::Metadata{});
absl::Status status = gbt->wire_writer->RpcCall(cancel_tx);
gbt->transport_stream_receiver->CancelStream(gbs->tx_code,
gbs->cancellation_error);
GRPC_ERROR_UNREF(op->payload->cancel_stream.cancel_error);
if (op->on_complete != nullptr) {
grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_complete,
absl_status_to_grpc_error(status));
gpr_log(GPR_INFO, "on_complete closure schuduled");
}
return;
}
std::unique_ptr<grpc_binder::Transaction> tx;
if (op->send_initial_metadata || op->send_message ||
op->send_trailing_metadata) {
// Only increment sequence number when there's a send operation.
tx = absl::make_unique<grpc_binder::Transaction>(
/*tx_code=*/gbs->GetTxCode(), /*seq_num=*/gbs->GetThenIncSeq(),
gbt->is_client);
}
if (op->send_initial_metadata && gbs->cancellation_error.ok()) {
gpr_log(GPR_INFO, "send_initial_metadata");
grpc_binder::Metadata init_md;
auto batch = op->payload->send_initial_metadata.send_initial_metadata;
GPR_ASSERT(tx);
for (grpc_linked_mdelem* md = batch->list.head; md != nullptr;
md = md->next) {
absl::string_view key =
grpc_core::StringViewFromSlice(GRPC_MDKEY(md->md));
absl::string_view value =
grpc_core::StringViewFromSlice(GRPC_MDVALUE(md->md));
gpr_log(GPR_INFO, "send initial metatday key-value %s",
absl::StrCat(key, " ", value).c_str());
if (grpc_slice_eq(GRPC_MDKEY(md->md), GRPC_MDSTR_PATH)) {
// TODO(b/192208403): Figure out if it is correct to simply drop '/'
// prefix and treat it as rpc method name
GPR_ASSERT(value[0] == '/');
std::string path = std::string(value).substr(1);
// Only client send method ref.
GPR_ASSERT(gbt->is_client);
tx->SetMethodRef(path);
} else {
init_md.emplace_back(std::string(key), std::string(value));
}
}
tx->SetPrefix(init_md);
}
if (op->send_message && gbs->cancellation_error.ok()) {
gpr_log(GPR_INFO, "send_message");
grpc_slice s;
bool next_result =
op->payload->send_message.send_message->Next(SIZE_MAX, nullptr);
gpr_log(GPR_INFO, "next_result = %d", static_cast<int>(next_result));
op->payload->send_message.send_message->Pull(&s);
auto* p = GRPC_SLICE_START_PTR(s);
int len = GRPC_SLICE_LENGTH(s);
std::string message_data(reinterpret_cast<char*>(p), len);
gpr_log(GPR_INFO, "message_data = %s", message_data.c_str());
GPR_ASSERT(tx);
tx->SetData(message_data);
// TODO(b/192369787): Are we supposed to reset here to avoid
// use-after-free issue in call.cc?
op->payload->send_message.send_message.reset();
grpc_slice_unref_internal(s);
}
if (op->send_trailing_metadata && gbs->cancellation_error.ok()) {
gpr_log(GPR_INFO, "send_trailing_metadata");
auto batch = op->payload->send_trailing_metadata.send_trailing_metadata;
grpc_binder::Metadata trailing_metadata;
GPR_ASSERT(tx);
for (grpc_linked_mdelem* md = batch->list.head; md != nullptr;
md = md->next) {
// Client will not send trailing metadata.
GPR_ASSERT(!gbt->is_client);
if (grpc_slice_eq(GRPC_MDKEY(md->md), GRPC_MDSTR_GRPC_STATUS)) {
int status = grpc_get_status_code_from_metadata(md->md);
gpr_log(GPR_INFO, "send trailing metadata status = %d", status);
tx->SetStatus(status);
} else {
absl::string_view key =
grpc_core::StringViewFromSlice(GRPC_MDKEY(md->md));
absl::string_view value =
grpc_core::StringViewFromSlice(GRPC_MDVALUE(md->md));
gpr_log(GPR_INFO, "send trailing metatday key-value %s",
absl::StrCat(key, " ", value).c_str());
trailing_metadata.emplace_back(std::string(key), std::string(value));
}
}
// TODO(mingcl): Will we ever has key-value pair here? According to
// wireformat client suffix data is always empty.
tx->SetSuffix(trailing_metadata);
if (op->payload->send_trailing_metadata.sent != nullptr) {
*op->payload->send_trailing_metadata.sent = true;
}
}
if (op->recv_initial_metadata) {
gpr_log(GPR_INFO, "recv_initial_metadata");
if (!gbs->cancellation_error.ok()) {
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
op->payload->recv_initial_metadata.recv_initial_metadata_ready,
absl_status_to_grpc_error(gbs->cancellation_error));
} else {
gbs->recv_initial_metadata_ready =
op->payload->recv_initial_metadata.recv_initial_metadata_ready;
gbs->recv_initial_metadata =
op->payload->recv_initial_metadata.recv_initial_metadata;
gbt->transport_stream_receiver->RegisterRecvInitialMetadata(
gbs->tx_code,
[gbs](absl::StatusOr<grpc_binder::Metadata> initial_metadata) {
grpc_core::ExecCtx exec_ctx;
GPR_ASSERT(gbs->recv_initial_metadata);
GPR_ASSERT(gbs->recv_initial_metadata_ready);
if (!initial_metadata.ok()) {
gpr_log(GPR_ERROR, "Failed to parse initial metadata");
grpc_core::ExecCtx::Run(
DEBUG_LOCATION, gbs->recv_initial_metadata_ready,
absl_status_to_grpc_error(initial_metadata.status()));
return;
}
AssignMetadata(gbs->recv_initial_metadata, gbs->arena,
*initial_metadata);
grpc_core::ExecCtx::Run(DEBUG_LOCATION,
gbs->recv_initial_metadata_ready,
GRPC_ERROR_NONE);
});
}
}
if (op->recv_message) {
gpr_log(GPR_INFO, "recv_message");
if (!gbs->cancellation_error.ok()) {
grpc_core::ExecCtx::Run(
DEBUG_LOCATION, op->payload->recv_message.recv_message_ready,
absl_status_to_grpc_error(gbs->cancellation_error));
} else {
gbs->recv_message_ready = op->payload->recv_message.recv_message_ready;
gbs->recv_message = op->payload->recv_message.recv_message;
gbt->transport_stream_receiver->RegisterRecvMessage(
gbs->tx_code, [gbs](absl::StatusOr<std::string> message) {
grpc_core::ExecCtx exec_ctx;
GPR_ASSERT(gbs->recv_message);
GPR_ASSERT(gbs->recv_message_ready);
if (!message.ok()) {
gpr_log(GPR_ERROR, "Failed to receive message");
if (message.status().message() ==
grpc_binder::TransportStreamReceiver::
kGrpcBinderTransportCancelledGracefully) {
gpr_log(GPR_ERROR, "message cancelled gracefully");
// Cancelled because we've already received trailing metadata.
// It's not an error in this case.
grpc_core::ExecCtx::Run(DEBUG_LOCATION, gbs->recv_message_ready,
GRPC_ERROR_NONE);
} else {
grpc_core::ExecCtx::Run(
DEBUG_LOCATION, gbs->recv_message_ready,
absl_status_to_grpc_error(message.status()));
}
return;
}
grpc_slice_buffer buf;
grpc_slice_buffer_init(&buf);
grpc_slice_buffer_add(&buf, grpc_slice_from_cpp_string(*message));
gbs->sbs.Init(&buf, 0);
gbs->recv_message->reset(gbs->sbs.get());
grpc_core::ExecCtx::Run(DEBUG_LOCATION, gbs->recv_message_ready,
GRPC_ERROR_NONE);
});
}
}
if (op->recv_trailing_metadata) {
gpr_log(GPR_INFO, "recv_trailing_metadata");
if (!gbs->cancellation_error.ok()) {
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
absl_status_to_grpc_error(gbs->cancellation_error));
} else {
gbs->recv_trailing_metadata_finished =
op->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
gbs->recv_trailing_metadata =
op->payload->recv_trailing_metadata.recv_trailing_metadata;
gbt->transport_stream_receiver->RegisterRecvTrailingMetadata(
gbs->tx_code,
[gbs](absl::StatusOr<grpc_binder::Metadata> trailing_metadata,
int status) {
grpc_core::ExecCtx exec_ctx;
GPR_ASSERT(gbs->recv_trailing_metadata);
GPR_ASSERT(gbs->recv_trailing_metadata_finished);
if (!trailing_metadata.ok()) {
gpr_log(GPR_ERROR, "Failed to receive trailing metadata");
grpc_core::ExecCtx::Run(
DEBUG_LOCATION, gbs->recv_trailing_metadata_finished,
absl_status_to_grpc_error(trailing_metadata.status()));
return;
}
if (!gbs->is_client) {
// Client will not send non-empty trailing metadata.
if (!trailing_metadata.value().empty()) {
gpr_log(GPR_ERROR,
"Server receives non-empty trailing metadata.");
grpc_core::ExecCtx::Run(DEBUG_LOCATION,
gbs->recv_trailing_metadata_finished,
GRPC_ERROR_CANCELLED);
return;
}
} else {
AssignMetadata(gbs->recv_trailing_metadata, gbs->arena,
*trailing_metadata);
// Append status to metadata
// TODO(b/192208695): See if we can avoid to manually put status
// code into the header
gpr_log(GPR_INFO, "status = %d", status);
grpc_linked_mdelem* glm = static_cast<grpc_linked_mdelem*>(
gbs->arena->Alloc(sizeof(grpc_linked_mdelem)));
glm->md = grpc_get_reffed_status_elem(status);
GPR_ASSERT(grpc_metadata_batch_link_tail(
gbs->recv_trailing_metadata, glm) ==
GRPC_ERROR_NONE);
gpr_log(GPR_ERROR, "trailing_metadata = %p",
gbs->recv_trailing_metadata);
gpr_log(GPR_ERROR, "glm = %p", glm);
}
grpc_core::ExecCtx::Run(DEBUG_LOCATION,
gbs->recv_trailing_metadata_finished,
GRPC_ERROR_NONE);
});
}
}
// Only send transaction when there's a send op presented.
absl::Status status = absl::OkStatus();
if (tx) {
status = gbt->wire_writer->RpcCall(*tx);
}
// Note that this should only be scheduled when all non-recv ops are
// completed
if (op->on_complete != nullptr) {
grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_complete,
absl_status_to_grpc_error(status));
gpr_log(GPR_INFO, "on_complete closure schuduled");
}
}
static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) {
gpr_log(GPR_INFO, __func__);
grpc_binder_transport* gbt = reinterpret_cast<grpc_binder_transport*>(gt);
// TODO(waynetu): Should we lock here to avoid data race?
if (op->start_connectivity_watch != nullptr) {
gbt->state_tracker.AddWatcher(op->start_connectivity_watch_state,
std::move(op->start_connectivity_watch));
}
if (op->stop_connectivity_watch != nullptr) {
gbt->state_tracker.RemoveWatcher(op->stop_connectivity_watch);
}
if (op->set_accept_stream) {
gbt->accept_stream_fn = op->set_accept_stream_fn;
gbt->accept_stream_user_data = op->set_accept_stream_user_data;
}
if (op->on_consumed) {
grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, GRPC_ERROR_NONE);
}
bool do_close = false;
if (op->disconnect_with_error != GRPC_ERROR_NONE) {
do_close = true;
GRPC_ERROR_UNREF(op->disconnect_with_error);
}
if (op->goaway_error != GRPC_ERROR_NONE) {
do_close = true;
GRPC_ERROR_UNREF(op->goaway_error);
}
if (do_close) {
gbt->state_tracker.SetState(GRPC_CHANNEL_SHUTDOWN, absl::OkStatus(),
"transport closed due to disconnection/goaway");
}
}
static void destroy_stream(grpc_transport* gt, grpc_stream* gs,
grpc_closure* then_schedule_closure) {
gpr_log(GPR_INFO, __func__);
grpc_binder_transport* gbt = reinterpret_cast<grpc_binder_transport*>(gt);
grpc_binder_stream* gbs = reinterpret_cast<grpc_binder_stream*>(gs);
gbt->transport_stream_receiver->Clear(gbs->tx_code);
// TODO(waynetu): Currently, there's nothing to be cleaned up. If additional
// fields are added to grpc_binder_stream in the future, we might need to use
// reference-counting to determine who does the actual cleaning.
gbs->~grpc_binder_stream();
grpc_core::ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure,
GRPC_ERROR_NONE);
}
static void destroy_transport(grpc_transport* gt) {
gpr_log(GPR_INFO, __func__);
// TODO(waynetu): Should we do ref-counting here?
grpc_binder_transport* gbt = reinterpret_cast<grpc_binder_transport*>(gt);
delete gbt;
}
static grpc_endpoint* get_endpoint(grpc_transport*) {
gpr_log(GPR_INFO, __func__);
return nullptr;
}
// See grpc_transport_vtable declaration for meaning of each field
static const grpc_transport_vtable vtable = {sizeof(grpc_binder_stream),
"binder",
init_stream,
set_pollset,
set_pollset_set,
perform_stream_op,
perform_transport_op,
destroy_stream,
destroy_transport,
get_endpoint};
static const grpc_transport_vtable* get_vtable() { return &vtable; }
grpc_binder_transport::grpc_binder_transport(
std::unique_ptr<grpc_binder::Binder> binder, bool is_client)
: is_client(is_client),
state_tracker(is_client ? "binder_transport_client"
: "binder_transport_server") {
gpr_log(GPR_INFO, __func__);
base.vtable = get_vtable();
transport_stream_receiver =
std::make_shared<grpc_binder::TransportStreamReceiverImpl>(is_client);
wire_reader = grpc_core::MakeOrphanable<grpc_binder::WireReaderImpl>(
transport_stream_receiver, is_client);
wire_writer = wire_reader->SetupTransport(std::move(binder));
}
grpc_transport* grpc_create_binder_transport_client(
std::unique_ptr<grpc_binder::Binder> endpoint_binder) {
gpr_log(GPR_INFO, __func__);
grpc_binder_transport* t =
new grpc_binder_transport(std::move(endpoint_binder), /*is_client=*/true);
return &t->base;
}
grpc_transport* grpc_create_binder_transport_server(
std::unique_ptr<grpc_binder::Binder> client_binder) {
gpr_log(GPR_INFO, __func__);
gpr_log(GPR_ERROR,
"[WARNING] Creating server-side binder_transport is test-only.");
grpc_binder_transport* t =
new grpc_binder_transport(std::move(client_binder), /*is_client=*/false);
return &t->base;
}

@ -0,0 +1,85 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_CORE_EXT_TRANSPORT_BINDER_TRANSPORT_BINDER_TRANSPORT_H
#define GRPC_CORE_EXT_TRANSPORT_BINDER_TRANSPORT_BINDER_TRANSPORT_H
#include <grpc/impl/codegen/port_platform.h>
#include <grpc/support/log.h>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "src/core/ext/transport/binder/utils/transport_stream_receiver.h"
#include "src/core/ext/transport/binder/wire_format/binder.h"
#include "src/core/ext/transport/binder/wire_format/wire_reader.h"
#include "src/core/ext/transport/binder/wire_format/wire_writer.h"
#include "src/core/lib/transport/transport.h"
#include "src/core/lib/transport/transport_impl.h"
// TODO(mingcl): Consider putting the struct in a namespace (Eventually this
// depends on what style we want to follow)
// TODO(mingcl): Decide casing for this class name. Should we use C-style class
// name here or just go with C++ style?
struct grpc_binder_transport {
explicit grpc_binder_transport(std::unique_ptr<grpc_binder::Binder> binder,
bool is_client);
int NewStreamTxCode() {
// TODO(mingcl): Wrap around when all tx codes are used. "If we do detect a
// collision however, we will fail the new call with UNAVAILABLE, and shut
// down the transport gracefully."
GPR_ASSERT(next_free_tx_code <= LAST_CALL_TRANSACTION);
return next_free_tx_code++;
}
grpc_transport base; /* must be first */
std::shared_ptr<grpc_binder::TransportStreamReceiver>
transport_stream_receiver;
grpc_core::OrphanablePtr<grpc_binder::WireReader> wire_reader;
std::unique_ptr<grpc_binder::WireWriter> wire_writer;
bool is_client;
// The following fields are currently only for the in-memory end-to-end
// testing.
// TODO(waynetu): Figure out if we need these in the actual server environment
// or not.
// The other-end of the transport. Set when constructing client/server binders
// pair in the testing environment.
grpc_binder_transport* other_end = nullptr;
// The callback and the data for the callback when the stream is connected
// between client and server.
void (*accept_stream_fn)(void* user_data, grpc_transport* transport,
const void* server_data) = nullptr;
void* accept_stream_user_data = nullptr;
grpc_core::ConnectivityStateTracker state_tracker;
private:
int next_free_tx_code = grpc_binder::kFirstCallId;
};
grpc_transport* grpc_create_binder_transport_client(
std::unique_ptr<grpc_binder::Binder> endpoint_binder);
grpc_transport* grpc_create_binder_transport_server(
std::unique_ptr<grpc_binder::Binder> client_binder);
#endif // GRPC_CORE_EXT_TRANSPORT_BINDER_TRANSPORT_BINDER_TRANSPORT_H

@ -30,9 +30,9 @@ grpc_cc_library(
"transport_stream_receiver_impl.h",
],
external_deps = [
"absl/status:statusor",
"absl/strings",
"absl/synchronization",
"absl/types:optional",
],
deps = [
"//:gpr",

@ -21,6 +21,7 @@
#include <string>
#include <vector>
#include "absl/status/statusor.h"
#include "src/core/ext/transport/binder/wire_format/transaction.h"
namespace grpc_binder {
@ -31,29 +32,47 @@ class TransportStreamReceiver {
public:
virtual ~TransportStreamReceiver() = default;
using InitialMetadataCallbackType =
std::function<void(absl::StatusOr<Metadata>)>;
using MessageDataCallbackType =
std::function<void(absl::StatusOr<std::string>)>;
using TrailingMetadataCallbackType =
std::function<void(absl::StatusOr<Metadata>, int)>;
// Only handles single time invocation. Callback object will be deleted.
// The callback should be valid until invocation or unregister.
virtual void RegisterRecvInitialMetadata(
StreamIdentifier id, std::function<void(const Metadata&)> cb) = 0;
// TODO(mingcl): Use string_view
virtual void RegisterRecvMessage(
StreamIdentifier id, std::function<void(const std::string&)> cb) = 0;
virtual void RegisterRecvInitialMetadata(StreamIdentifier id,
InitialMetadataCallbackType cb) = 0;
virtual void RegisterRecvMessage(StreamIdentifier id,
MessageDataCallbackType cb) = 0;
virtual void RegisterRecvTrailingMetadata(
StreamIdentifier id, std::function<void(const Metadata&, int)> cb) = 0;
// TODO(mingcl): Provide a way to unregister callback?
StreamIdentifier id, TrailingMetadataCallbackType cb) = 0;
// TODO(mingcl): Figure out how to handle the case where there is no callback
// registered for the stream. For now, I don't see this case happening in
// unary calls. So we would probably just crash the program for now.
// For streaming calls it does happen, for now we simply queue them.
virtual void NotifyRecvInitialMetadata(StreamIdentifier id,
const Metadata& initial_metadata) = 0;
// For the following functions, the second arguments are the transaction
// result received from the lower level. If it is None, that means there's
// something wrong when receiving the corresponding transaction. In such case,
// we should cancel the gRPC callback as well.
virtual void NotifyRecvInitialMetadata(
StreamIdentifier id, absl::StatusOr<Metadata> initial_metadata) = 0;
virtual void NotifyRecvMessage(StreamIdentifier id,
const std::string& message) = 0;
virtual void NotifyRecvTrailingMetadata(StreamIdentifier id,
const Metadata& trailing_metadata,
int status) = 0;
absl::StatusOr<std::string> message) = 0;
virtual void NotifyRecvTrailingMetadata(
StreamIdentifier id, absl::StatusOr<Metadata> trailing_metadata,
int status) = 0;
// Trailing metadata marks the end of one-side of the stream. Thus, after
// receiving trailing metadata from the other-end, we know that there will
// never be in-coming message data anymore, and all recv_message callbacks
// registered will never be satisfied. This function cancels all such
// callbacks gracefully (with GRPC_ERROR_NONE) to avoid being blocked waiting
// for them.
virtual void CancelRecvMessageCallbacksDueToTrailingMetadata(
StreamIdentifier id) = 0;
// Remove all entries associated with stream number `id`.
virtual void Clear(StreamIdentifier id) = 0;
virtual void CancelStream(StreamIdentifier id, absl::Status error) = 0;
static const absl::string_view kGrpcBinderTransportCancelledGracefully;
};
} // namespace grpc_binder

@ -23,67 +23,97 @@
#include <utility>
namespace grpc_binder {
const absl::string_view
TransportStreamReceiver::kGrpcBinderTransportCancelledGracefully =
"grpc-binder-transport: cancelled gracefully";
void TransportStreamReceiverImpl::RegisterRecvInitialMetadata(
StreamIdentifier id, std::function<void(const Metadata&)> cb) {
// TODO(mingcl): Don't lock the whole function
grpc_core::MutexLock l(&m_);
gpr_log(GPR_ERROR, "%s id = %d", __func__, id);
StreamIdentifier id, InitialMetadataCallbackType cb) {
gpr_log(GPR_ERROR, "%s id = %d is_client = %d", __func__, id, is_client_);
GPR_ASSERT(initial_metadata_cbs_.count(id) == 0);
auto iter = pending_initial_metadata_.find(id);
if (iter == pending_initial_metadata_.end()) {
initial_metadata_cbs_[id] = std::move(cb);
} else {
cb(iter->second.front());
iter->second.pop();
if (iter->second.empty()) {
pending_initial_metadata_.erase(iter);
absl::StatusOr<Metadata> initial_metadata{};
{
grpc_core::MutexLock l(&m_);
auto iter = pending_initial_metadata_.find(id);
if (iter == pending_initial_metadata_.end()) {
initial_metadata_cbs_[id] = std::move(cb);
cb = nullptr;
} else {
initial_metadata = std::move(iter->second.front());
iter->second.pop();
if (iter->second.empty()) {
pending_initial_metadata_.erase(iter);
}
}
}
if (cb != nullptr) {
cb(std::move(initial_metadata));
}
}
void TransportStreamReceiverImpl::RegisterRecvMessage(
StreamIdentifier id, std::function<void(const std::string&)> cb) {
// TODO(mingcl): Don't lock the whole function
grpc_core::MutexLock l(&m_);
gpr_log(GPR_ERROR, "%s id = %d", __func__, id);
StreamIdentifier id, MessageDataCallbackType cb) {
gpr_log(GPR_ERROR, "%s id = %d is_client = %d", __func__, id, is_client_);
GPR_ASSERT(message_cbs_.count(id) == 0);
auto iter = pending_message_.find(id);
if (iter == pending_message_.end()) {
message_cbs_[id] = std::move(cb);
} else {
cb(iter->second.front());
iter->second.pop();
if (iter->second.empty()) {
pending_message_.erase(iter);
absl::StatusOr<std::string> message{};
{
grpc_core::MutexLock l(&m_);
auto iter = pending_message_.find(id);
if (iter == pending_message_.end()) {
// If we'd already received trailing-metadata and there's no pending
// messages, cancel the callback.
if (recv_message_cancelled_.count(id)) {
cb(absl::CancelledError(
TransportStreamReceiver::kGrpcBinderTransportCancelledGracefully));
} else {
message_cbs_[id] = std::move(cb);
}
cb = nullptr;
} else {
// We'll still keep all pending messages received before the trailing
// metadata since they're issued before the end of stream, as promised by
// WireReader which keeps transactions commit in-order.
message = std::move(iter->second.front());
iter->second.pop();
if (iter->second.empty()) {
pending_message_.erase(iter);
}
}
}
if (cb != nullptr) {
cb(std::move(message));
}
}
void TransportStreamReceiverImpl::RegisterRecvTrailingMetadata(
StreamIdentifier id, std::function<void(const Metadata&, int)> cb) {
// TODO(mingcl): Don't lock the whole function
grpc_core::MutexLock l(&m_);
gpr_log(GPR_ERROR, "%s id = %d", __func__, id);
StreamIdentifier id, TrailingMetadataCallbackType cb) {
gpr_log(GPR_ERROR, "%s id = %d is_client = %d", __func__, id, is_client_);
GPR_ASSERT(trailing_metadata_cbs_.count(id) == 0);
auto iter = pending_trailing_metadata_.find(id);
if (iter == pending_trailing_metadata_.end()) {
trailing_metadata_cbs_[id] = std::move(cb);
} else {
{
const auto& p = iter->second.front();
cb(p.first, p.second);
}
iter->second.pop();
if (iter->second.empty()) {
pending_trailing_metadata_.erase(iter);
std::pair<absl::StatusOr<Metadata>, int> trailing_metadata{};
{
grpc_core::MutexLock l(&m_);
auto iter = pending_trailing_metadata_.find(id);
if (iter == pending_trailing_metadata_.end()) {
trailing_metadata_cbs_[id] = std::move(cb);
cb = nullptr;
} else {
trailing_metadata = std::move(iter->second.front());
iter->second.pop();
if (iter->second.empty()) {
pending_trailing_metadata_.erase(iter);
}
}
}
if (cb != nullptr) {
cb(std::move(trailing_metadata.first), trailing_metadata.second);
}
}
void TransportStreamReceiverImpl::NotifyRecvInitialMetadata(
StreamIdentifier id, const Metadata& initial_metadata) {
gpr_log(GPR_ERROR, "%s id = %d", __func__, id);
std::function<void(const Metadata&)> cb;
StreamIdentifier id, absl::StatusOr<Metadata> initial_metadata) {
gpr_log(GPR_ERROR, "%s id = %d is_client = %d", __func__, id, is_client_);
InitialMetadataCallbackType cb;
{
grpc_core::MutexLock l(&m_);
auto iter = initial_metadata_cbs_.find(id);
@ -91,18 +121,17 @@ void TransportStreamReceiverImpl::NotifyRecvInitialMetadata(
cb = iter->second;
initial_metadata_cbs_.erase(iter);
} else {
pending_initial_metadata_[id].push(initial_metadata);
pending_initial_metadata_[id].push(std::move(initial_metadata));
return;
}
}
if (cb != nullptr) {
cb(initial_metadata);
}
cb(std::move(initial_metadata));
}
void TransportStreamReceiverImpl::NotifyRecvMessage(
StreamIdentifier id, const std::string& message) {
gpr_log(GPR_ERROR, "%s id = %d", __func__, id);
std::function<void(const std::string&)> cb;
StreamIdentifier id, absl::StatusOr<std::string> message) {
gpr_log(GPR_ERROR, "%s id = %d is_client = %d", __func__, id, is_client_);
MessageDataCallbackType cb;
{
grpc_core::MutexLock l(&m_);
auto iter = message_cbs_.find(id);
@ -110,18 +139,23 @@ void TransportStreamReceiverImpl::NotifyRecvMessage(
cb = iter->second;
message_cbs_.erase(iter);
} else {
pending_message_[id].push(message);
pending_message_[id].push(std::move(message));
return;
}
}
if (cb != nullptr) {
cb(message);
}
cb(std::move(message));
}
void TransportStreamReceiverImpl::NotifyRecvTrailingMetadata(
StreamIdentifier id, const Metadata& trailing_metadata, int status) {
gpr_log(GPR_ERROR, "%s id = %d", __func__, id);
std::function<void(const Metadata&, int)> cb;
StreamIdentifier id, absl::StatusOr<Metadata> trailing_metadata,
int status) {
// Trailing metadata mark the end of the stream. Since TransportStreamReceiver
// assumes in-order commitments of transactions and that trailing metadata is
// parsed after message data, we can safely cancel all upcoming callbacks of
// recv_message.
gpr_log(GPR_ERROR, "%s id = %d is_client = %d", __func__, id, is_client_);
CancelRecvMessageCallbacksDueToTrailingMetadata(id);
TrailingMetadataCallbackType cb;
{
grpc_core::MutexLock l(&m_);
auto iter = trailing_metadata_cbs_.find(id);
@ -129,11 +163,77 @@ void TransportStreamReceiverImpl::NotifyRecvTrailingMetadata(
cb = iter->second;
trailing_metadata_cbs_.erase(iter);
} else {
pending_trailing_metadata_[id].emplace(trailing_metadata, status);
pending_trailing_metadata_[id].emplace(std::move(trailing_metadata),
status);
return;
}
}
cb(std::move(trailing_metadata), status);
}
void TransportStreamReceiverImpl::
CancelRecvMessageCallbacksDueToTrailingMetadata(StreamIdentifier id) {
gpr_log(GPR_ERROR, "%s id = %d is_client = %d", __func__, id, is_client_);
MessageDataCallbackType cb = nullptr;
{
grpc_core::MutexLock l(&m_);
auto iter = message_cbs_.find(id);
if (iter != message_cbs_.end()) {
cb = std::move(iter->second);
message_cbs_.erase(iter);
}
recv_message_cancelled_.insert(id);
}
if (cb != nullptr) {
cb(trailing_metadata, status);
// The registered callback will never be satisfied. Cancel it.
cb(absl::CancelledError(
TransportStreamReceiver::kGrpcBinderTransportCancelledGracefully));
}
}
void TransportStreamReceiverImpl::CancelStream(StreamIdentifier id,
absl::Status error) {
InitialMetadataCallbackType initial_metadata_callback = nullptr;
MessageDataCallbackType message_data_callback = nullptr;
TrailingMetadataCallbackType trailing_metadata_callback = nullptr;
{
grpc_core::MutexLock l(&m_);
auto initial_metadata_iter = initial_metadata_cbs_.find(id);
if (initial_metadata_iter != initial_metadata_cbs_.end()) {
initial_metadata_callback = std::move(initial_metadata_iter->second);
initial_metadata_cbs_.erase(initial_metadata_iter);
}
auto message_data_iter = message_cbs_.find(id);
if (message_data_iter != message_cbs_.end()) {
message_data_callback = std::move(message_data_iter->second);
message_cbs_.erase(message_data_iter);
}
auto trailing_metadata_iter = trailing_metadata_cbs_.find(id);
if (trailing_metadata_iter != trailing_metadata_cbs_.end()) {
trailing_metadata_callback = std::move(trailing_metadata_iter->second);
trailing_metadata_cbs_.erase(trailing_metadata_iter);
}
}
if (initial_metadata_callback != nullptr) {
initial_metadata_callback(error);
}
if (message_data_callback != nullptr) {
message_data_callback(error);
}
if (trailing_metadata_callback != nullptr) {
trailing_metadata_callback(error, 0);
}
}
void TransportStreamReceiverImpl::Clear(StreamIdentifier id) {
gpr_log(GPR_ERROR, "%s id = %d is_client = %d", __func__, id, is_client_);
grpc_core::MutexLock l(&m_);
initial_metadata_cbs_.erase(id);
message_cbs_.erase(id);
trailing_metadata_cbs_.erase(id);
recv_message_cancelled_.erase(id);
pending_initial_metadata_.erase(id);
pending_message_.erase(id);
pending_trailing_metadata_.erase(id);
}
} // namespace grpc_binder

@ -20,10 +20,10 @@
#include <functional>
#include <map>
#include <queue>
#include <set>
#include <string>
#include <vector>
#include "absl/types/optional.h"
#include "src/core/ext/transport/binder/utils/transport_stream_receiver.h"
#include "src/core/lib/gprpp/sync.h"
@ -32,27 +32,31 @@ namespace grpc_binder {
// Routes the data received from transport to corresponding streams
class TransportStreamReceiverImpl : public TransportStreamReceiver {
public:
void RegisterRecvInitialMetadata(
StreamIdentifier id, std::function<void(const Metadata&)> cb) override;
explicit TransportStreamReceiverImpl(bool is_client)
: is_client_(is_client) {}
void RegisterRecvInitialMetadata(StreamIdentifier id,
InitialMetadataCallbackType cb) override;
void RegisterRecvMessage(StreamIdentifier id,
std::function<void(const std::string&)> cb) override;
void RegisterRecvTrailingMetadata(
StreamIdentifier id,
std::function<void(const Metadata&, int)> cb) override;
void NotifyRecvInitialMetadata(StreamIdentifier id,
const Metadata& initial_metadata) override;
MessageDataCallbackType cb) override;
void RegisterRecvTrailingMetadata(StreamIdentifier id,
TrailingMetadataCallbackType cb) override;
void NotifyRecvInitialMetadata(
StreamIdentifier id, absl::StatusOr<Metadata> initial_metadata) override;
void NotifyRecvMessage(StreamIdentifier id,
const std::string& message) override;
absl::StatusOr<std::string> message) override;
void NotifyRecvTrailingMetadata(StreamIdentifier id,
const Metadata& trailing_metadata,
absl::StatusOr<Metadata> trailing_metadata,
int status) override;
void CancelRecvMessageCallbacksDueToTrailingMetadata(
StreamIdentifier id) override;
void Clear(StreamIdentifier id) override;
void CancelStream(StreamIdentifier, absl::Status error) override;
private:
std::map<StreamIdentifier, std::function<void(const Metadata&)>>
initial_metadata_cbs_;
std::map<StreamIdentifier, std::function<void(const std::string&)>>
message_cbs_;
std::map<StreamIdentifier, std::function<void(const Metadata&, int)>>
std::map<StreamIdentifier, InitialMetadataCallbackType> initial_metadata_cbs_;
std::map<StreamIdentifier, MessageDataCallbackType> message_cbs_;
std::map<StreamIdentifier, TrailingMetadataCallbackType>
trailing_metadata_cbs_;
// TODO(waynetu): Better thread safety design. For example, use separate
// mutexes for different type of messages.
@ -69,12 +73,25 @@ class TransportStreamReceiverImpl : public TransportStreamReceiver {
// messages will be re-ordered by it. In such case, the queueing approach will
// work fine. Refer to the TODO in WireWriterImpl::ProcessTransaction() at
// wire_reader_impl.cc for detecting and resolving out-of-order transactions.
std::map<StreamIdentifier, std::queue<Metadata>> pending_initial_metadata_
ABSL_GUARDED_BY(m_);
std::map<StreamIdentifier, std::queue<std::string>> pending_message_
ABSL_GUARDED_BY(m_);
std::map<StreamIdentifier, std::queue<std::pair<Metadata, int>>>
//
// TODO(waynetu): Use absl::flat_hash_map.
std::map<StreamIdentifier, std::queue<absl::StatusOr<Metadata>>>
pending_initial_metadata_ ABSL_GUARDED_BY(m_);
std::map<StreamIdentifier, std::queue<absl::StatusOr<std::string>>>
pending_message_ ABSL_GUARDED_BY(m_);
std::map<StreamIdentifier,
std::queue<std::pair<absl::StatusOr<Metadata>, int>>>
pending_trailing_metadata_ ABSL_GUARDED_BY(m_);
// Record whether or not the recv_message callbacks of a given stream is
// cancelled. Although we explicitly cancel the registered recv_message() in
// CancelRecvMessageCallbacksDueToTrailingMetadata(), there are chances that
// the registration comes "after" we receive trailing metadata. Therefore,
// when RegisterRecvMessage() gets called, we should check whether
// recv_message_cancelled_ contains the corresponding stream ID, and if so,
// directly cancel the callback gracefully without pending it.
std::set<StreamIdentifier> recv_message_cancelled_ ABSL_GUARDED_BY(m_);
bool is_client_;
};
} // namespace grpc_binder

@ -53,6 +53,7 @@ grpc_cc_library(
],
deps = [
"//:gpr",
"//:orphanable",
],
)
@ -70,6 +71,7 @@ grpc_cc_library(
],
deps = [
":binder",
":wire_reader",
],
)
@ -80,7 +82,6 @@ grpc_cc_library(
external_deps = [
"absl/strings",
],
visibility = ["//visibility:public"],
deps = [
":binder",
":transaction",
@ -106,6 +107,7 @@ grpc_cc_library(
":binder",
":wire_writer",
"//:gpr",
"//:orphanable",
"//src/core/ext/transport/binder/utils:transport_stream_receiver",
],
)

@ -25,6 +25,7 @@
#include "absl/status/status.h"
#include "absl/strings/string_view.h"
#include "src/core/ext/transport/binder/wire_format/binder_constants.h"
#include "src/core/lib/gprpp/orphanable.h"
namespace grpc_binder {
@ -80,6 +81,8 @@ class TransactionReceiver : public HasRawBinder {
~TransactionReceiver() override = default;
};
class WireReader;
class Binder : public HasRawBinder {
public:
~Binder() override = default;
@ -93,6 +96,7 @@ class Binder : public HasRawBinder {
// TODO(waynetu): Can we decouple the receiver from the binder?
virtual std::unique_ptr<TransactionReceiver> ConstructTxReceiver(
grpc_core::RefCountedPtr<WireReader> wire_reader_ref,
TransactionReceiver::OnTransactCb transact_cb) const = 0;
};

@ -26,16 +26,30 @@
#include "absl/strings/str_cat.h"
#include "src/core/lib/gprpp/sync.h"
namespace grpc_binder {
namespace {
struct AtomicCallback {
explicit AtomicCallback(void* callback) : mu{}, callback(callback) {}
grpc_core::Mutex mu;
void* callback ABSL_GUARDED_BY(mu);
struct BinderUserData {
explicit BinderUserData(grpc_core::RefCountedPtr<WireReader> wire_reader_ref,
TransactionReceiver::OnTransactCb* callback)
: wire_reader_ref(wire_reader_ref), callback(callback) {}
grpc_core::RefCountedPtr<WireReader> wire_reader_ref;
TransactionReceiver::OnTransactCb* callback;
};
struct OnCreateArgs {
grpc_core::RefCountedPtr<WireReader> wire_reader_ref;
TransactionReceiver::OnTransactCb* callback;
};
void* f_onCreate_with_mutex(void* callback) {
return new AtomicCallback(callback);
void* f_onCreate_userdata(void* data) {
auto* args = static_cast<OnCreateArgs*>(data);
return new BinderUserData(args->wire_reader_ref, args->callback);
}
void f_onDestroy_delete(void* data) {
auto* user_data = static_cast<BinderUserData*>(data);
delete user_data;
}
void* f_onCreate_noop(void* args) { return nullptr; }
@ -46,59 +60,50 @@ binder_status_t f_onTransact(AIBinder* binder, transaction_code_t code,
const AParcel* in, AParcel* out) {
gpr_log(GPR_INFO, __func__);
gpr_log(GPR_INFO, "tx code = %u", code);
auto* user_data =
reinterpret_cast<AtomicCallback*>(AIBinder_getUserData(binder));
grpc_core::MutexLock lock(&user_data->mu);
// TODO(waynetu): What should be returned here?
if (!user_data->callback) return STATUS_OK;
auto* callback =
reinterpret_cast<grpc_binder::TransactionReceiver::OnTransactCb*>(
user_data->callback);
auto* user_data = static_cast<BinderUserData*>(AIBinder_getUserData(binder));
TransactionReceiver::OnTransactCb* callback = user_data->callback;
// Wrap the parcel in a ReadableParcel.
std::unique_ptr<grpc_binder::ReadableParcel> output =
absl::make_unique<grpc_binder::ReadableParcelAndroid>(in);
std::unique_ptr<ReadableParcel> output =
absl::make_unique<ReadableParcelAndroid>(in);
// The lock should be released "after" the callback finishes.
absl::Status status = (*callback)(code, output.get());
return status.ok() ? STATUS_OK : STATUS_UNKNOWN_ERROR;
}
} // namespace
namespace grpc_binder {
ndk::SpAIBinder FromJavaBinder(JNIEnv* jni_env, jobject binder) {
return ndk::SpAIBinder(AIBinder_fromJavaBinder(jni_env, binder));
}
TransactionReceiverAndroid::TransactionReceiverAndroid(OnTransactCb transact_cb)
TransactionReceiverAndroid::TransactionReceiverAndroid(
grpc_core::RefCountedPtr<WireReader> wire_reader_ref,
OnTransactCb transact_cb)
: transact_cb_(transact_cb) {
// TODO(mingcl): For now interface descriptor is always empty, figure out if
// we want it to be something more meaningful (we can probably manually change
// interface descriptor by modifying Java code's reply to
// os.IBinder.INTERFACE_TRANSACTION)
AIBinder_Class* aibinder_class = AIBinder_Class_define(
/*interfaceDescriptor=*/"", f_onCreate_with_mutex, f_onDestroy_noop,
/*interfaceDescriptor=*/"", f_onCreate_userdata, f_onDestroy_delete,
f_onTransact);
// Pass the on-transact callback to the on-create function of the binder. The
// on-create function equips the callback with a mutex and gives it to the
// user data stored in the binder which can be retrieved later.
binder_ = AIBinder_new(aibinder_class, &transact_cb_);
// Also Ref() (called implicitly by the copy constructor of RefCountedPtr) the
// wire reader so that it would not be destructed during the callback
// invocation.
OnCreateArgs args;
args.wire_reader_ref = wire_reader_ref;
args.callback = &transact_cb_;
binder_ = AIBinder_new(aibinder_class, &args);
GPR_ASSERT(binder_);
gpr_log(GPR_INFO, "AIBinder_associateClass = %d",
static_cast<int>(AIBinder_associateClass(binder_, aibinder_class)));
}
TransactionReceiverAndroid::~TransactionReceiverAndroid() {
auto* user_data =
reinterpret_cast<AtomicCallback*>(AIBinder_getUserData(binder_));
{
grpc_core::MutexLock lock(&user_data->mu);
// Set the callback to null so that future calls to on-trasact are awared
// that the transaction receiver had been deallocated.
user_data->callback = nullptr;
}
// Release the binder.
AIBinder_decStrong(binder_);
}
@ -143,8 +148,10 @@ absl::Status BinderAndroid::Transact(BinderTransportTxCode tx_code) {
}
std::unique_ptr<TransactionReceiver> BinderAndroid::ConstructTxReceiver(
grpc_core::RefCountedPtr<WireReader> wire_reader_ref,
TransactionReceiver::OnTransactCb transact_cb) const {
return absl::make_unique<TransactionReceiverAndroid>(transact_cb);
return absl::make_unique<TransactionReceiverAndroid>(wire_reader_ref,
transact_cb);
}
int32_t WritableParcelAndroid::GetDataPosition() const {

@ -29,6 +29,7 @@
#include "absl/memory/memory.h"
#include "src/core/ext/transport/binder/wire_format/binder.h"
#include "src/core/ext/transport/binder/wire_format/wire_reader.h"
// TODO(b/192208764): move this check to somewhere else
#if __ANDROID_API__ < 29
@ -102,6 +103,7 @@ class BinderAndroid final : public Binder {
};
std::unique_ptr<TransactionReceiver> ConstructTxReceiver(
grpc_core::RefCountedPtr<WireReader> wire_reader_ref,
TransactionReceiver::OnTransactCb transact_cb) const override;
private:
@ -112,7 +114,9 @@ class BinderAndroid final : public Binder {
class TransactionReceiverAndroid final : public TransactionReceiver {
public:
explicit TransactionReceiverAndroid(OnTransactCb transaction_cb);
TransactionReceiverAndroid(
grpc_core::RefCountedPtr<WireReader> wire_reader_ref,
OnTransactCb transaction_cb);
~TransactionReceiverAndroid() override;
void* GetRawBinder() override { return binder_; }

@ -22,12 +22,13 @@
#include "src/core/ext/transport/binder/wire_format/binder.h"
#include "src/core/ext/transport/binder/wire_format/wire_writer.h"
#include "src/core/lib/gprpp/orphanable.h"
namespace grpc_binder {
class WireReader {
class WireReader : public grpc_core::InternallyRefCounted<WireReader> {
public:
virtual ~WireReader() = default;
~WireReader() override = default;
virtual std::unique_ptr<WireWriter> SetupTransport(
std::unique_ptr<Binder> endpoint_binder) = 0;
};

@ -64,31 +64,32 @@ absl::StatusOr<Metadata> parse_metadata(const ReadableParcel* reader) {
} // namespace
WireReaderImpl::WireReaderImpl(
TransportStreamReceiver* transport_stream_receiver, bool is_client)
: transport_stream_receiver_(transport_stream_receiver),
std::shared_ptr<TransportStreamReceiver> transport_stream_receiver,
bool is_client)
: transport_stream_receiver_(std::move(transport_stream_receiver)),
is_client_(is_client) {}
WireReaderImpl::~WireReaderImpl() = default;
std::unique_ptr<WireWriter> WireReaderImpl::SetupTransport(
std::unique_ptr<Binder> binder) {
gpr_log(GPR_INFO, "Setting up transport");
if (!is_client_) {
gpr_log(GPR_ERROR, "Server-side SETUP_TRANSPORT is not implemented yet.");
return nullptr;
gpr_log(GPR_ERROR,
"[WARNING] Server-side setup transport is currently test-only");
SendSetupTransport(binder.get());
return absl::make_unique<WireWriterImpl>(std::move(binder));
} else {
SendSetupTransport(binder.get());
auto other_end_binder = RecvSetupTransport();
return absl::make_unique<WireWriterImpl>(std::move(other_end_binder));
}
}
gpr_log(GPR_INFO, "Setting up transport");
void WireReaderImpl::SendSetupTransport(Binder* binder) {
binder->Initialize();
gpr_log(GPR_INFO, "prepare transaction = %d",
binder->PrepareTransaction().ok());
// Only support client-side transport setup.
SendSetupTransport(binder.get());
RecvSetupTransport();
return absl::make_unique<WireWriterImpl>(std::move(other_end_binder_));
}
void WireReaderImpl::SendSetupTransport(Binder* binder) {
WritableParcel* writable_parcel = binder->GetWritableParcel();
gpr_log(GPR_INFO, "data position = %d", writable_parcel->GetDataPosition());
// gpr_log(GPR_INFO, "set data position to 0 = %d",
@ -101,7 +102,11 @@ void WireReaderImpl::SendSetupTransport(Binder* binder) {
// The lifetime of the transaction receiver is the same as the wire writer's.
// The transaction receiver is responsible for not calling the on-transact
// callback when it's dead.
// Give TransactionReceiver a Ref() since WireReader cannot be destructed
// during callback execution. TransactionReceiver should make sure that the
// callback owns a Ref() when it's being invoked.
tx_receiver_ = binder->ConstructTxReceiver(
Ref(),
[this](transaction_code_t code, const ReadableParcel* readable_parcel) {
return this->ProcessTransaction(code, readable_parcel);
});
@ -113,12 +118,13 @@ void WireReaderImpl::SendSetupTransport(Binder* binder) {
binder->Transact(BinderTransportTxCode::SETUP_TRANSPORT).ok());
}
void WireReaderImpl::RecvSetupTransport() {
std::unique_ptr<Binder> WireReaderImpl::RecvSetupTransport() {
// TODO(b/191941760): avoid blocking, handle wire_writer_noti lifetime
// better
gpr_log(GPR_INFO, "start waiting for noti");
connection_noti_.WaitForNotification();
gpr_log(GPR_INFO, "end waiting for noti");
return std::move(other_end_binder_);
}
absl::Status WireReaderImpl::ProcessTransaction(transaction_code_t code,
@ -152,6 +158,9 @@ absl::Status WireReaderImpl::ProcessTransaction(transaction_code_t code,
gpr_log(GPR_INFO, "version = %d", version);
std::unique_ptr<Binder> binder{};
RETURN_IF_ERROR(parcel->ReadBinder(&binder));
if (!binder) {
return absl::InternalError("Read NULL binder from the parcel");
}
binder->Initialize();
other_end_binder_ = std::move(binder);
connection_noti_.Notify();
@ -160,8 +169,7 @@ absl::Status WireReaderImpl::ProcessTransaction(transaction_code_t code,
case BinderTransportTxCode::SHUTDOWN_TRANSPORT: {
gpr_log(GPR_ERROR,
"Received SHUTDOWN_TRANSPORT request but not implemented yet.");
GPR_ASSERT(false);
break;
return absl::UnimplementedError("SHUTDOWN_TRANSPORT");
}
case BinderTransportTxCode::ACKNOWLEDGE_BYTES: {
int num_bytes = -1;
@ -170,7 +178,9 @@ absl::Status WireReaderImpl::ProcessTransaction(transaction_code_t code,
break;
}
case BinderTransportTxCode::PING: {
GPR_ASSERT(!is_client_);
if (is_client_) {
return absl::FailedPreconditionError("Receive PING request in client");
}
int ping_id = -1;
RETURN_IF_ERROR(parcel->ReadInt32(&ping_id));
gpr_log(GPR_INFO, "received ping id = %d", ping_id);
@ -189,16 +199,52 @@ absl::Status WireReaderImpl::ProcessTransaction(transaction_code_t code,
absl::Status WireReaderImpl::ProcessStreamingTransaction(
transaction_code_t code, const ReadableParcel* parcel) {
// Indicate which callbacks should be cancelled. It will be initialized as the
// flags the in-coming transaction carries, and when a particular callback is
// completed, the corresponding bit in cancellation_flag will be set to 0 so
// that we won't cancel it afterward.
int cancellation_flags = 0;
absl::Status status =
ProcessStreamingTransactionImpl(code, parcel, &cancellation_flags);
if (!status.ok()) {
gpr_log(GPR_ERROR, "Failed to process streaming transaction: %s",
status.ToString().c_str());
// Something went wrong when receiving transaction. Cancel failed requests.
if (cancellation_flags & kFlagPrefix) {
gpr_log(GPR_ERROR, "cancelling initial metadata");
transport_stream_receiver_->NotifyRecvInitialMetadata(code, status);
}
if (cancellation_flags & kFlagMessageData) {
gpr_log(GPR_ERROR, "cancelling message data");
transport_stream_receiver_->NotifyRecvMessage(code, status);
}
if (cancellation_flags & kFlagSuffix) {
gpr_log(GPR_ERROR, "cancelling trailing metadata");
transport_stream_receiver_->NotifyRecvTrailingMetadata(code, status, 0);
}
}
return status;
}
absl::Status WireReaderImpl::ProcessStreamingTransactionImpl(
transaction_code_t code, const ReadableParcel* parcel,
int* cancellation_flags) {
GPR_ASSERT(cancellation_flags);
int flags;
RETURN_IF_ERROR(parcel->ReadInt32(&flags));
gpr_log(GPR_INFO, "flags = %d", flags);
*cancellation_flags = flags;
// Ignore in-coming transaction with flag = 0 to match with Java
// implementation.
// TODO(waynetu): Check with grpc-java team to see whether this is the
// intended behavior.
// TODO(waynetu): What should be returned here?
if (flags == 0) return absl::OkStatus();
if (flags == 0) {
gpr_log(GPR_INFO, "[WARNING] Receive empty transaction. Ignored.");
return absl::OkStatus();
}
int status = flags >> 16;
gpr_log(GPR_INFO, "status = %d", status);
@ -212,9 +258,10 @@ absl::Status WireReaderImpl::ProcessStreamingTransaction(
// out-of-order or missing transactions. WireReaderImpl should be fixed if
// we indeed found such behavior.
int32_t& expectation = expected_seq_num_[code];
// TODO(mingcl): Don't assert here
GPR_ASSERT(seq_num >= 0);
GPR_ASSERT(seq_num == expectation && "Interleaved sequence number");
if (seq_num < 0 || seq_num != expectation) {
// Unexpected sequence number.
return absl::InternalError("Unexpected sequence number");
}
// TODO(waynetu): According to the protocol, "The sequence number will wrap
// around to 0 if more than 2^31 messages are sent." For now we'll just
// assert that it never reach such circumstances.
@ -224,6 +271,7 @@ absl::Status WireReaderImpl::ProcessStreamingTransaction(
gpr_log(GPR_INFO, "sequence number = %d", seq_num);
if (flags & kFlagPrefix) {
char method_ref[111];
memset(method_ref, 0, sizeof(method_ref));
if (!is_client_) {
RETURN_IF_ERROR(parcel->ReadString(method_ref));
}
@ -232,10 +280,12 @@ absl::Status WireReaderImpl::ProcessStreamingTransaction(
return initial_metadata_or_error.status();
}
if (!is_client_) {
initial_metadata_or_error->emplace_back(":path", method_ref);
initial_metadata_or_error->emplace_back(":path",
std::string("/") + method_ref);
}
transport_stream_receiver_->NotifyRecvInitialMetadata(
code, *initial_metadata_or_error);
*cancellation_flags &= ~kFlagPrefix;
}
if (flags & kFlagMessageData) {
int count;
@ -246,12 +296,14 @@ absl::Status WireReaderImpl::ProcessStreamingTransaction(
RETURN_IF_ERROR(parcel->ReadByteArray(&msg_data));
}
gpr_log(GPR_INFO, "msg_data = %s", msg_data.c_str());
transport_stream_receiver_->NotifyRecvMessage(code, msg_data);
transport_stream_receiver_->NotifyRecvMessage(code, std::move(msg_data));
*cancellation_flags &= ~kFlagMessageData;
}
if (flags & kFlagSuffix) {
if (flags & kFlagStatusDescription) {
// FLAG_STATUS_DESCRIPTION set
char desc[111];
memset(desc, 0, sizeof(desc));
RETURN_IF_ERROR(parcel->ReadString(desc));
gpr_log(GPR_INFO, "description = %s", desc);
}
@ -265,7 +317,8 @@ absl::Status WireReaderImpl::ProcessStreamingTransaction(
trailing_metadata = *trailing_metadata_or_error;
}
transport_stream_receiver_->NotifyRecvTrailingMetadata(
code, trailing_metadata, status);
code, std::move(trailing_metadata), status);
*cancellation_flags &= ~kFlagSuffix;
}
return absl::OkStatus();
}

@ -31,40 +31,76 @@ namespace grpc_binder {
class WireReaderImpl : public WireReader {
public:
explicit WireReaderImpl(TransportStreamReceiver* transport_stream_receiver,
bool is_client);
explicit WireReaderImpl(
std::shared_ptr<TransportStreamReceiver> transport_stream_receiver,
bool is_client);
~WireReaderImpl() override;
// Setup the transport between endpoint binders.
//
// The client and the server both call SetupTransport() when constructing
// transport.
//
// High-level overview of transaction setup:
// 0. Client obtain an |endpoint_binder| from the server.
// 1. Client creates a binder |client_binder| and hook its on-transaction
// callback to client's ProcessTransaction(). Client then sends
// |client_binder| through |endpoint_binder| to server.
// 2. Server receives |client_binder| via |endpoint_binder|.
// 3. Server creates a binder |server_binder| and hook its on-transaction
// callback to server's ProcessTransaction(). Server then sends
// |server_binder| through |client_binder| back to the client.
// 4. Client receives |server_binder| via |client_binder|'s on-transaction
// callback.
void Orphan() override { Unref(); }
/// Setup the transport between endpoint binders.
///
/// The client and the server both call SetupTransport() when constructing
/// transport.
///
/// High-level overview of transaction setup:
/// 0. Client obtains an |endpoint_binder| from the server (in the Android
/// setting, this can be achieved by "binding" to the server APK).
/// 1. Client creates a binder |client_binder| and hook its on-transaction
/// callback to client's ProcessTransaction(). Client then sends
/// |client_binder| through |endpoint_binder| to server.
/// 2. Server receives |client_binder| via |endpoint_binder|.
/// 3. Server creates a binder |server_binder| and hook its on-transaction
/// callback to server's ProcessTransaction(). Server then sends
/// |server_binder| through |client_binder| back to the client.
/// 4. Client receives |server_binder| via |client_binder|'s on-transaction
/// callback.
///
/// The parameter \p binder here means different things for client nad server.
/// For client, \p binder refers to |endpoint_binder|, and for server, \p
/// binder refers to |client_binder|. That is, for server-side transport
/// setup, we assume that the first half of SETUP_TRANSPORT (up to step 2) is
/// already done somewhere else (see test/end2end/binder_transport_test.cc for
/// how it's handled in the testing environment).
std::unique_ptr<WireWriter> SetupTransport(
std::unique_ptr<Binder> endpoint_binder) override;
std::unique_ptr<Binder> binder) override;
absl::Status ProcessTransaction(transaction_code_t code,
const ReadableParcel* parcel);
private:
/// Send SETUP_TRANSPORT request through \p binder.
///
/// This is the one half (for client it's the first half, and for server it's
/// the second) of the SETUP_TRANSPORT negotiation process. First, a new
/// binder is created. We take its "receiving" part and construct the
/// transaction receiver with it, and sends the "sending" part along with the
/// SETUP_TRANSPORT message through \p binder.
void SendSetupTransport(Binder* binder);
void RecvSetupTransport();
/// Recv SETUP_TRANSPORT request.
///
/// This is the other half of the SETUP_TRANSPORT process. We wait for
/// in-coming SETUP_TRANSPORT request with the "sending" part of a binder from
/// the other end. For client, the message is coming from the trasnaction
/// receiver we just constructed in SendSetupTransport(). For server, we
/// assume that this step is already completed.
// TODO(waynetu): In the testing environment, we still use this method (on
// another WireReader instance) for server-side transport setup, and thus it
// is marked as public. Try moving this method back to private, and hopefully
// we can also avoid moving |other_end_binder_| out in the implementation.
std::unique_ptr<Binder> RecvSetupTransport();
private:
absl::Status ProcessStreamingTransaction(transaction_code_t code,
const ReadableParcel* parcel);
absl::Status ProcessStreamingTransactionImpl(transaction_code_t code,
const ReadableParcel* parcel,
int* cancellation_flags);
TransportStreamReceiver* transport_stream_receiver_;
std::shared_ptr<TransportStreamReceiver> transport_stream_receiver_;
absl::Notification connection_noti_;
// NOTE: other_end_binder_ will be moved out when RecvSetupTransport() is
// called. Be cautious not to access it afterward.
std::unique_ptr<Binder> other_end_binder_;
absl::flat_hash_map<transaction_code_t, int32_t> expected_seq_num_;
std::unique_ptr<TransactionReceiver> tx_receiver_;

Loading…
Cancel
Save