binder: rename gbs to stream and gbt to transport (#35357)

Per https://google.github.io/styleguide/cppguide.html#General_Naming_Rules
- Use "names that would be clear even to people on a different team".
- "Minimize the use of abbreviations ... (especially acronyms and initialisms)"
- "Do not worry about saving horizontal space"

Also supports rename of grpc_binder_stream and grpc_binder_transport in an upcoming PR.

Closes #35357

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/35357 from jdcormie:rename bef505c760
PiperOrigin-RevId: 592578920
test_589060063
John Cormie 11 months ago committed by Copybara-Service
parent 7fd3277e4e
commit 971c506025
  1. 28
      src/core/ext/transport/binder/transport/binder_stream.h
  2. 362
      src/core/ext/transport/binder/transport/binder_transport.cc

@ -20,30 +20,30 @@
#include "src/core/ext/transport/binder/transport/binder_transport.h"
struct RecvInitialMetadataArgs {
grpc_binder_stream* gbs;
grpc_binder_transport* gbt;
grpc_binder_stream* stream;
grpc_binder_transport* transport;
int tx_code;
absl::StatusOr<grpc_binder::Metadata> initial_metadata;
};
struct RecvMessageArgs {
grpc_binder_stream* gbs;
grpc_binder_transport* gbt;
grpc_binder_stream* stream;
grpc_binder_transport* transport;
int tx_code;
absl::StatusOr<std::string> message;
};
struct RecvTrailingMetadataArgs {
grpc_binder_stream* gbs;
grpc_binder_transport* gbt;
grpc_binder_stream* stream;
grpc_binder_transport* transport;
int tx_code;
absl::StatusOr<grpc_binder::Metadata> trailing_metadata;
int status;
};
struct RegisterStreamArgs {
grpc_binder_stream* gbs;
grpc_binder_transport* gbt;
grpc_binder_stream* stream;
grpc_binder_transport* transport;
};
// TODO(mingcl): Figure out if we want to use class instead of struct here
@ -59,12 +59,12 @@ struct grpc_binder_stream {
tx_code(tx_code),
is_client(is_client),
is_closed(false) {
recv_initial_metadata_args.gbs = this;
recv_initial_metadata_args.gbt = t;
recv_message_args.gbs = this;
recv_message_args.gbt = t;
recv_trailing_metadata_args.gbs = this;
recv_trailing_metadata_args.gbt = t;
recv_initial_metadata_args.stream = this;
recv_initial_metadata_args.transport = t;
recv_message_args.stream = this;
recv_message_args.transport = t;
recv_trailing_metadata_args.stream = this;
recv_trailing_metadata_args.transport = t;
}
~grpc_binder_stream() {

@ -99,7 +99,7 @@ static void grpc_binder_unref_transport(grpc_binder_transport* t) {
static void register_stream_locked(void* arg, grpc_error_handle /*error*/) {
RegisterStreamArgs* args = static_cast<RegisterStreamArgs*>(arg);
args->gbt->registered_stream[args->gbs->GetTxCode()] = args->gbs;
args->transport->registered_stream[args->stream->GetTxCode()] = args->stream;
}
void grpc_binder_transport::InitStream(grpc_stream* gs,
@ -114,14 +114,14 @@ void grpc_binder_transport::InitStream(grpc_stream* gs,
// `grpc_binder_transport::registered_stream` should only be updated in
// combiner
grpc_binder_stream* gbs = reinterpret_cast<grpc_binder_stream*>(gs);
gbs->register_stream_args.gbs = gbs;
gbs->register_stream_args.gbt = this;
grpc_binder_stream* stream = reinterpret_cast<grpc_binder_stream*>(gs);
stream->register_stream_args.stream = stream;
stream->register_stream_args.transport = this;
grpc_core::ExecCtx exec_ctx;
combiner->Run(
GRPC_CLOSURE_INIT(&gbs->register_stream_closure, register_stream_locked,
&gbs->register_stream_args, nullptr),
absl::OkStatus());
combiner->Run(GRPC_CLOSURE_INIT(&stream->register_stream_closure,
register_stream_locked,
&stream->register_stream_args, nullptr),
absl::OkStatus());
}
static void AssignMetadata(grpc_metadata_batch* mb,
@ -137,35 +137,36 @@ static void AssignMetadata(grpc_metadata_batch* mb,
}
}
static void cancel_stream_locked(grpc_binder_transport* gbt,
grpc_binder_stream* gbs,
static void cancel_stream_locked(grpc_binder_transport* transport,
grpc_binder_stream* stream,
grpc_error_handle error) {
gpr_log(GPR_INFO, "cancel_stream_locked");
if (!gbs->is_closed) {
GPR_ASSERT(gbs->cancel_self_error.ok());
gbs->is_closed = true;
gbs->cancel_self_error = error;
gbt->transport_stream_receiver->CancelStream(gbs->tx_code);
gbt->registered_stream.erase(gbs->tx_code);
if (gbs->recv_initial_metadata_ready != nullptr) {
grpc_core::ExecCtx::Run(DEBUG_LOCATION, gbs->recv_initial_metadata_ready,
error);
gbs->recv_initial_metadata_ready = nullptr;
gbs->recv_initial_metadata = nullptr;
gbs->trailing_metadata_available = nullptr;
if (!stream->is_closed) {
GPR_ASSERT(stream->cancel_self_error.ok());
stream->is_closed = true;
stream->cancel_self_error = error;
transport->transport_stream_receiver->CancelStream(stream->tx_code);
transport->registered_stream.erase(stream->tx_code);
if (stream->recv_initial_metadata_ready != nullptr) {
grpc_core::ExecCtx::Run(DEBUG_LOCATION,
stream->recv_initial_metadata_ready, error);
stream->recv_initial_metadata_ready = nullptr;
stream->recv_initial_metadata = nullptr;
stream->trailing_metadata_available = nullptr;
}
if (gbs->recv_message_ready != nullptr) {
grpc_core::ExecCtx::Run(DEBUG_LOCATION, gbs->recv_message_ready, error);
gbs->recv_message_ready = nullptr;
gbs->recv_message->reset();
gbs->recv_message = nullptr;
gbs->call_failed_before_recv_message = nullptr;
if (stream->recv_message_ready != nullptr) {
grpc_core::ExecCtx::Run(DEBUG_LOCATION, stream->recv_message_ready,
error);
stream->recv_message_ready = nullptr;
stream->recv_message->reset();
stream->recv_message = nullptr;
stream->call_failed_before_recv_message = nullptr;
}
if (gbs->recv_trailing_metadata_finished != nullptr) {
if (stream->recv_trailing_metadata_finished != nullptr) {
grpc_core::ExecCtx::Run(DEBUG_LOCATION,
gbs->recv_trailing_metadata_finished, error);
gbs->recv_trailing_metadata_finished = nullptr;
gbs->recv_trailing_metadata = nullptr;
stream->recv_trailing_metadata_finished, error);
stream->recv_trailing_metadata_finished = nullptr;
stream->recv_trailing_metadata = nullptr;
}
}
}
@ -187,53 +188,53 @@ static bool ContainsAuthorityAndPath(const grpc_binder::Metadata& metadata) {
static void recv_initial_metadata_locked(void* arg,
grpc_error_handle /*error*/) {
RecvInitialMetadataArgs* args = static_cast<RecvInitialMetadataArgs*>(arg);
grpc_binder_stream* gbs = args->gbs;
grpc_binder_stream* stream = args->stream;
gpr_log(GPR_INFO,
"recv_initial_metadata_locked is_client = %d is_closed = %d",
gbs->is_client, gbs->is_closed);
stream->is_client, stream->is_closed);
if (!gbs->is_closed) {
if (!stream->is_closed) {
grpc_error_handle error = [&] {
GPR_ASSERT(gbs->recv_initial_metadata);
GPR_ASSERT(gbs->recv_initial_metadata_ready);
GPR_ASSERT(stream->recv_initial_metadata);
GPR_ASSERT(stream->recv_initial_metadata_ready);
if (!args->initial_metadata.ok()) {
gpr_log(GPR_ERROR, "Failed to parse initial metadata");
return absl_status_to_grpc_error(args->initial_metadata.status());
}
if (!gbs->is_client) {
if (!stream->is_client) {
// For server, we expect :authority and :path in initial metadata.
if (!ContainsAuthorityAndPath(*args->initial_metadata)) {
return GRPC_ERROR_CREATE(
"Missing :authority or :path in initial metadata");
}
}
AssignMetadata(gbs->recv_initial_metadata, *args->initial_metadata);
AssignMetadata(stream->recv_initial_metadata, *args->initial_metadata);
return absl::OkStatus();
}();
if (gbs->t->registered_method_matcher_cb != nullptr) {
gbs->t->registered_method_matcher_cb(gbs->t->accept_stream_user_data,
gbs->recv_initial_metadata);
if (stream->t->registered_method_matcher_cb != nullptr) {
stream->t->registered_method_matcher_cb(
stream->t->accept_stream_user_data, stream->recv_initial_metadata);
}
grpc_closure* cb = gbs->recv_initial_metadata_ready;
gbs->recv_initial_metadata_ready = nullptr;
gbs->recv_initial_metadata = nullptr;
grpc_closure* cb = stream->recv_initial_metadata_ready;
stream->recv_initial_metadata_ready = nullptr;
stream->recv_initial_metadata = nullptr;
grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
}
GRPC_BINDER_STREAM_UNREF(gbs, "recv_initial_metadata");
GRPC_BINDER_STREAM_UNREF(stream, "recv_initial_metadata");
}
static void recv_message_locked(void* arg, grpc_error_handle /*error*/) {
RecvMessageArgs* args = static_cast<RecvMessageArgs*>(arg);
grpc_binder_stream* gbs = args->gbs;
grpc_binder_stream* stream = args->stream;
gpr_log(GPR_INFO, "recv_message_locked is_client = %d is_closed = %d",
gbs->is_client, gbs->is_closed);
stream->is_client, stream->is_closed);
if (!gbs->is_closed) {
if (!stream->is_closed) {
grpc_error_handle error = [&] {
GPR_ASSERT(gbs->recv_message);
GPR_ASSERT(gbs->recv_message_ready);
GPR_ASSERT(stream->recv_message);
GPR_ASSERT(stream->recv_message_ready);
if (!args->message.ok()) {
gpr_log(GPR_ERROR, "Failed to receive message");
if (args->message.status().message() ==
@ -249,62 +250,63 @@ static void recv_message_locked(void* arg, grpc_error_handle /*error*/) {
}
grpc_core::SliceBuffer buf;
buf.Append(grpc_core::Slice(grpc_slice_from_cpp_string(*args->message)));
*gbs->recv_message = std::move(buf);
*stream->recv_message = std::move(buf);
return absl::OkStatus();
}();
if (!error.ok() && gbs->call_failed_before_recv_message != nullptr) {
*gbs->call_failed_before_recv_message = true;
if (!error.ok() && stream->call_failed_before_recv_message != nullptr) {
*stream->call_failed_before_recv_message = true;
}
grpc_closure* cb = gbs->recv_message_ready;
gbs->recv_message_ready = nullptr;
gbs->recv_message = nullptr;
grpc_closure* cb = stream->recv_message_ready;
stream->recv_message_ready = nullptr;
stream->recv_message = nullptr;
grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
}
GRPC_BINDER_STREAM_UNREF(gbs, "recv_message");
GRPC_BINDER_STREAM_UNREF(stream, "recv_message");
}
static void recv_trailing_metadata_locked(void* arg,
grpc_error_handle /*error*/) {
RecvTrailingMetadataArgs* args = static_cast<RecvTrailingMetadataArgs*>(arg);
grpc_binder_stream* gbs = args->gbs;
grpc_binder_stream* stream = args->stream;
gpr_log(GPR_INFO,
"recv_trailing_metadata_locked is_client = %d is_closed = %d",
gbs->is_client, gbs->is_closed);
stream->is_client, stream->is_closed);
if (!gbs->is_closed) {
if (!stream->is_closed) {
grpc_error_handle error = [&] {
GPR_ASSERT(gbs->recv_trailing_metadata);
GPR_ASSERT(gbs->recv_trailing_metadata_finished);
GPR_ASSERT(stream->recv_trailing_metadata);
GPR_ASSERT(stream->recv_trailing_metadata_finished);
if (!args->trailing_metadata.ok()) {
gpr_log(GPR_ERROR, "Failed to receive trailing metadata");
return absl_status_to_grpc_error(args->trailing_metadata.status());
}
if (!gbs->is_client) {
if (!stream->is_client) {
// Client will not send non-empty trailing metadata.
if (!args->trailing_metadata.value().empty()) {
gpr_log(GPR_ERROR, "Server receives non-empty trailing metadata.");
return absl::CancelledError();
}
} else {
AssignMetadata(gbs->recv_trailing_metadata, *args->trailing_metadata);
AssignMetadata(stream->recv_trailing_metadata,
*args->trailing_metadata);
// Append status to metadata
// TODO(b/192208695): See if we can avoid to manually put status
// code into the header
gpr_log(GPR_INFO, "status = %d", args->status);
gbs->recv_trailing_metadata->Set(
stream->recv_trailing_metadata->Set(
grpc_core::GrpcStatusMetadata(),
static_cast<grpc_status_code>(args->status));
}
return absl::OkStatus();
}();
if (gbs->is_client || gbs->trailing_metadata_sent) {
grpc_closure* cb = gbs->recv_trailing_metadata_finished;
gbs->recv_trailing_metadata_finished = nullptr;
gbs->recv_trailing_metadata = nullptr;
if (stream->is_client || stream->trailing_metadata_sent) {
grpc_closure* cb = stream->recv_trailing_metadata_finished;
stream->recv_trailing_metadata_finished = nullptr;
stream->recv_trailing_metadata = nullptr;
grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
} else {
// According to transport explaineer - "Server extra: This op shouldn't
@ -313,10 +315,10 @@ static void recv_trailing_metadata_locked(void* arg,
//
// We haven't sent trailing metadata yet, so we have to delay completing
// the recv_trailing_metadata callback.
gbs->need_to_call_trailing_metadata_callback = true;
stream->need_to_call_trailing_metadata_callback = true;
}
}
GRPC_BINDER_STREAM_UNREF(gbs, "recv_trailing_metadata");
GRPC_BINDER_STREAM_UNREF(stream, "recv_trailing_metadata");
}
namespace grpc_binder {
@ -366,15 +368,16 @@ class MetadataEncoder {
} // namespace grpc_binder
static void accept_stream_locked(void* gt, grpc_error_handle /*error*/) {
grpc_binder_transport* gbt = static_cast<grpc_binder_transport*>(gt);
if (gbt->accept_stream_fn) {
grpc_binder_transport* transport = static_cast<grpc_binder_transport*>(gt);
if (transport->accept_stream_fn) {
gpr_log(GPR_INFO, "Accepting a stream");
// must pass in a non-null value.
(*gbt->accept_stream_fn)(gbt->accept_stream_user_data, gbt, gbt);
(*transport->accept_stream_fn)(transport->accept_stream_user_data,
transport, transport);
} else {
++gbt->accept_stream_fn_called_count_;
++transport->accept_stream_fn_called_count_;
gpr_log(GPR_INFO, "accept_stream_fn not set, current count = %d",
gbt->accept_stream_fn_called_count_);
transport->accept_stream_fn_called_count_);
}
}
@ -382,34 +385,35 @@ static void perform_stream_op_locked(void* stream_op,
grpc_error_handle /*error*/) {
grpc_transport_stream_op_batch* op =
static_cast<grpc_transport_stream_op_batch*>(stream_op);
grpc_binder_stream* gbs =
grpc_binder_stream* stream =
static_cast<grpc_binder_stream*>(op->handler_private.extra_arg);
grpc_binder_transport* gbt = gbs->t;
grpc_binder_transport* transport = stream->t;
if (op->cancel_stream) {
// TODO(waynetu): Is this true?
GPR_ASSERT(!op->send_initial_metadata && !op->send_message &&
!op->send_trailing_metadata && !op->recv_initial_metadata &&
!op->recv_message && !op->recv_trailing_metadata);
gpr_log(GPR_INFO, "cancel_stream is_client = %d", gbs->is_client);
if (!gbs->is_client) {
gpr_log(GPR_INFO, "cancel_stream is_client = %d", stream->is_client);
if (!stream->is_client) {
// Send trailing metadata to inform the other end about the cancellation,
// regardless if we'd already done that or not.
auto cancel_tx = std::make_unique<grpc_binder::Transaction>(
gbs->GetTxCode(), gbt->is_client);
stream->GetTxCode(), transport->is_client);
cancel_tx->SetSuffix(grpc_binder::Metadata{});
cancel_tx->SetStatus(1);
(void)gbt->wire_writer->RpcCall(std::move(cancel_tx));
(void)transport->wire_writer->RpcCall(std::move(cancel_tx));
}
cancel_stream_locked(gbt, gbs, op->payload->cancel_stream.cancel_error);
cancel_stream_locked(transport, stream,
op->payload->cancel_stream.cancel_error);
if (op->on_complete != nullptr) {
grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_complete,
absl::OkStatus());
}
GRPC_BINDER_STREAM_UNREF(gbs, "perform_stream_op");
GRPC_BINDER_STREAM_UNREF(stream, "perform_stream_op");
return;
}
if (gbs->is_closed) {
if (stream->is_closed) {
if (op->send_message) {
// Reset the send_message payload to prevent memory leaks.
op->payload->send_message.send_message->Clear();
@ -418,36 +422,38 @@ static void perform_stream_op_locked(void* stream_op,
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
op->payload->recv_initial_metadata.recv_initial_metadata_ready,
gbs->cancel_self_error);
stream->cancel_self_error);
}
if (op->recv_message) {
grpc_core::ExecCtx::Run(DEBUG_LOCATION,
op->payload->recv_message.recv_message_ready,
gbs->cancel_self_error);
stream->cancel_self_error);
}
if (op->recv_trailing_metadata) {
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
gbs->cancel_self_error);
stream->cancel_self_error);
}
if (op->on_complete != nullptr) {
grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_complete,
gbs->cancel_self_error);
stream->cancel_self_error);
}
GRPC_BINDER_STREAM_UNREF(gbs, "perform_stream_op");
GRPC_BINDER_STREAM_UNREF(stream, "perform_stream_op");
return;
}
int tx_code = gbs->tx_code;
auto tx = std::make_unique<grpc_binder::Transaction>(tx_code, gbt->is_client);
int tx_code = stream->tx_code;
auto tx =
std::make_unique<grpc_binder::Transaction>(tx_code, transport->is_client);
if (op->send_initial_metadata) {
gpr_log(GPR_INFO, "send_initial_metadata");
grpc_binder::Metadata init_md;
auto batch = op->payload->send_initial_metadata.send_initial_metadata;
grpc_binder::MetadataEncoder encoder(gbt->is_client, tx.get(), &init_md);
grpc_binder::MetadataEncoder encoder(transport->is_client, tx.get(),
&init_md);
batch->Encode(&encoder);
tx->SetPrefix(init_md);
}
@ -461,7 +467,7 @@ static void perform_stream_op_locked(void* stream_op,
auto batch = op->payload->send_trailing_metadata.send_trailing_metadata;
grpc_binder::Metadata trailing_metadata;
grpc_binder::MetadataEncoder encoder(gbt->is_client, tx.get(),
grpc_binder::MetadataEncoder encoder(transport->is_client, tx.get(),
&trailing_metadata);
batch->Encode(&encoder);
@ -471,68 +477,70 @@ static void perform_stream_op_locked(void* stream_op,
}
if (op->recv_initial_metadata) {
gpr_log(GPR_INFO, "recv_initial_metadata");
gbs->recv_initial_metadata_ready =
stream->recv_initial_metadata_ready =
op->payload->recv_initial_metadata.recv_initial_metadata_ready;
gbs->recv_initial_metadata =
stream->recv_initial_metadata =
op->payload->recv_initial_metadata.recv_initial_metadata;
gbs->trailing_metadata_available =
stream->trailing_metadata_available =
op->payload->recv_initial_metadata.trailing_metadata_available;
GRPC_BINDER_STREAM_REF(gbs, "recv_initial_metadata");
gbt->transport_stream_receiver->RegisterRecvInitialMetadata(
tx_code, [tx_code, gbs,
gbt](absl::StatusOr<grpc_binder::Metadata> initial_metadata) {
GRPC_BINDER_STREAM_REF(stream, "recv_initial_metadata");
transport->transport_stream_receiver->RegisterRecvInitialMetadata(
tx_code, [tx_code, stream, transport](
absl::StatusOr<grpc_binder::Metadata> initial_metadata) {
grpc_core::ExecCtx exec_ctx;
gbs->recv_initial_metadata_args.tx_code = tx_code;
gbs->recv_initial_metadata_args.initial_metadata =
stream->recv_initial_metadata_args.tx_code = tx_code;
stream->recv_initial_metadata_args.initial_metadata =
std::move(initial_metadata);
gbt->combiner->Run(
GRPC_CLOSURE_INIT(&gbs->recv_initial_metadata_closure,
transport->combiner->Run(
GRPC_CLOSURE_INIT(&stream->recv_initial_metadata_closure,
recv_initial_metadata_locked,
&gbs->recv_initial_metadata_args, nullptr),
&stream->recv_initial_metadata_args, nullptr),
absl::OkStatus());
});
}
if (op->recv_message) {
gpr_log(GPR_INFO, "recv_message");
gbs->recv_message_ready = op->payload->recv_message.recv_message_ready;
gbs->recv_message = op->payload->recv_message.recv_message;
gbs->call_failed_before_recv_message =
stream->recv_message_ready = op->payload->recv_message.recv_message_ready;
stream->recv_message = op->payload->recv_message.recv_message;
stream->call_failed_before_recv_message =
op->payload->recv_message.call_failed_before_recv_message;
if (op->payload->recv_message.flags != nullptr) {
*op->payload->recv_message.flags = 0;
}
GRPC_BINDER_STREAM_REF(gbs, "recv_message");
gbt->transport_stream_receiver->RegisterRecvMessage(
tx_code, [tx_code, gbs, gbt](absl::StatusOr<std::string> message) {
GRPC_BINDER_STREAM_REF(stream, "recv_message");
transport->transport_stream_receiver->RegisterRecvMessage(
tx_code,
[tx_code, stream, transport](absl::StatusOr<std::string> message) {
grpc_core::ExecCtx exec_ctx;
gbs->recv_message_args.tx_code = tx_code;
gbs->recv_message_args.message = std::move(message);
gbt->combiner->Run(
GRPC_CLOSURE_INIT(&gbs->recv_message_closure, recv_message_locked,
&gbs->recv_message_args, nullptr),
stream->recv_message_args.tx_code = tx_code;
stream->recv_message_args.message = std::move(message);
transport->combiner->Run(
GRPC_CLOSURE_INIT(&stream->recv_message_closure,
recv_message_locked, &stream->recv_message_args,
nullptr),
absl::OkStatus());
});
}
if (op->recv_trailing_metadata) {
gpr_log(GPR_INFO, "recv_trailing_metadata");
gbs->recv_trailing_metadata_finished =
stream->recv_trailing_metadata_finished =
op->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
gbs->recv_trailing_metadata =
stream->recv_trailing_metadata =
op->payload->recv_trailing_metadata.recv_trailing_metadata;
GRPC_BINDER_STREAM_REF(gbs, "recv_trailing_metadata");
gbt->transport_stream_receiver->RegisterRecvTrailingMetadata(
tx_code, [tx_code, gbs, gbt](
GRPC_BINDER_STREAM_REF(stream, "recv_trailing_metadata");
transport->transport_stream_receiver->RegisterRecvTrailingMetadata(
tx_code, [tx_code, stream, transport](
absl::StatusOr<grpc_binder::Metadata> trailing_metadata,
int status) {
grpc_core::ExecCtx exec_ctx;
gbs->recv_trailing_metadata_args.tx_code = tx_code;
gbs->recv_trailing_metadata_args.trailing_metadata =
stream->recv_trailing_metadata_args.tx_code = tx_code;
stream->recv_trailing_metadata_args.trailing_metadata =
std::move(trailing_metadata);
gbs->recv_trailing_metadata_args.status = status;
gbt->combiner->Run(
GRPC_CLOSURE_INIT(&gbs->recv_trailing_metadata_closure,
stream->recv_trailing_metadata_args.status = status;
transport->combiner->Run(
GRPC_CLOSURE_INIT(&stream->recv_trailing_metadata_closure,
recv_trailing_metadata_locked,
&gbs->recv_trailing_metadata_args, nullptr),
&stream->recv_trailing_metadata_args, nullptr),
absl::OkStatus());
});
}
@ -540,20 +548,20 @@ static void perform_stream_op_locked(void* stream_op,
absl::Status status;
if (op->send_initial_metadata || op->send_message ||
op->send_trailing_metadata) {
status = gbt->wire_writer->RpcCall(std::move(tx));
if (!gbs->is_client && op->send_trailing_metadata) {
gbs->trailing_metadata_sent = true;
status = transport->wire_writer->RpcCall(std::move(tx));
if (!stream->is_client && op->send_trailing_metadata) {
stream->trailing_metadata_sent = true;
// According to transport explaineer - "Server extra: This op shouldn't
// actually be considered complete until the server has also sent trailing
// metadata to provide the other side with final status"
//
// Because we've done sending trailing metadata here, we can safely
// complete the recv_trailing_metadata callback here.
if (gbs->need_to_call_trailing_metadata_callback) {
grpc_closure* cb = gbs->recv_trailing_metadata_finished;
gbs->recv_trailing_metadata_finished = nullptr;
if (stream->need_to_call_trailing_metadata_callback) {
grpc_closure* cb = stream->recv_trailing_metadata_finished;
stream->recv_trailing_metadata_finished = nullptr;
grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, absl::OkStatus());
gbs->need_to_call_trailing_metadata_callback = false;
stream->need_to_call_trailing_metadata_callback = false;
}
}
}
@ -564,27 +572,28 @@ static void perform_stream_op_locked(void* stream_op,
absl_status_to_grpc_error(status));
gpr_log(GPR_INFO, "on_complete closure scheduled");
}
GRPC_BINDER_STREAM_UNREF(gbs, "perform_stream_op");
GRPC_BINDER_STREAM_UNREF(stream, "perform_stream_op");
}
void grpc_binder_transport::PerformStreamOp(
grpc_stream* gs, grpc_transport_stream_op_batch* op) {
grpc_binder_stream* gbs = reinterpret_cast<grpc_binder_stream*>(gs);
grpc_binder_stream* stream = reinterpret_cast<grpc_binder_stream*>(gs);
gpr_log(GPR_INFO, "%s = %p %p %p is_client = %d", __func__, this, gs, op,
gbs->is_client);
GRPC_BINDER_STREAM_REF(gbs, "perform_stream_op");
op->handler_private.extra_arg = gbs;
stream->is_client);
GRPC_BINDER_STREAM_REF(stream, "perform_stream_op");
op->handler_private.extra_arg = stream;
combiner->Run(GRPC_CLOSURE_INIT(&op->handler_private.closure,
perform_stream_op_locked, op, nullptr),
absl::OkStatus());
}
static void close_transport_locked(grpc_binder_transport* gbt) {
gbt->state_tracker.SetState(GRPC_CHANNEL_SHUTDOWN, absl::OkStatus(),
"transport closed due to disconnection/goaway");
while (!gbt->registered_stream.empty()) {
static void close_transport_locked(grpc_binder_transport* transport) {
transport->state_tracker.SetState(
GRPC_CHANNEL_SHUTDOWN, absl::OkStatus(),
"transport closed due to disconnection/goaway");
while (!transport->registered_stream.empty()) {
cancel_stream_locked(
gbt, gbt->registered_stream.begin()->second,
transport, transport->registered_stream.begin()->second,
grpc_error_set_int(GRPC_ERROR_CREATE("transport closed"),
grpc_core::StatusIntProperty::kRpcStatus,
GRPC_STATUS_UNAVAILABLE));
@ -594,26 +603,28 @@ static void close_transport_locked(grpc_binder_transport* gbt) {
static void perform_transport_op_locked(void* transport_op,
grpc_error_handle /*error*/) {
grpc_transport_op* op = static_cast<grpc_transport_op*>(transport_op);
grpc_binder_transport* gbt =
grpc_binder_transport* transport =
static_cast<grpc_binder_transport*>(op->handler_private.extra_arg);
// TODO(waynetu): Should we lock here to avoid data race?
if (op->start_connectivity_watch != nullptr) {
gbt->state_tracker.AddWatcher(op->start_connectivity_watch_state,
std::move(op->start_connectivity_watch));
transport->state_tracker.AddWatcher(
op->start_connectivity_watch_state,
std::move(op->start_connectivity_watch));
}
if (op->stop_connectivity_watch != nullptr) {
gbt->state_tracker.RemoveWatcher(op->stop_connectivity_watch);
transport->state_tracker.RemoveWatcher(op->stop_connectivity_watch);
}
if (op->set_accept_stream) {
gbt->accept_stream_user_data = op->set_accept_stream_user_data;
gbt->accept_stream_fn = op->set_accept_stream_fn;
gbt->registered_method_matcher_cb = op->set_registered_method_matcher_fn;
transport->accept_stream_user_data = op->set_accept_stream_user_data;
transport->accept_stream_fn = op->set_accept_stream_fn;
transport->registered_method_matcher_cb =
op->set_registered_method_matcher_fn;
gpr_log(GPR_DEBUG, "accept_stream_fn_called_count_ = %d",
gbt->accept_stream_fn_called_count_);
while (gbt->accept_stream_fn_called_count_ > 0) {
--gbt->accept_stream_fn_called_count_;
gbt->combiner->Run(
GRPC_CLOSURE_CREATE(accept_stream_locked, gbt, nullptr),
transport->accept_stream_fn_called_count_);
while (transport->accept_stream_fn_called_count_ > 0) {
--transport->accept_stream_fn_called_count_;
transport->combiner->Run(
GRPC_CLOSURE_CREATE(accept_stream_locked, transport, nullptr),
absl::OkStatus());
}
}
@ -628,9 +639,9 @@ static void perform_transport_op_locked(void* transport_op,
do_close = true;
}
if (do_close) {
close_transport_locked(gbt);
close_transport_locked(transport);
}
GRPC_BINDER_UNREF_TRANSPORT(gbt, "perform_transport_op");
GRPC_BINDER_UNREF_TRANSPORT(transport, "perform_transport_op");
}
void grpc_binder_transport::PerformOp(grpc_transport_op* op) {
@ -643,34 +654,35 @@ void grpc_binder_transport::PerformOp(grpc_transport_op* op) {
}
static void destroy_stream_locked(void* sp, grpc_error_handle /*error*/) {
grpc_binder_stream* gbs = static_cast<grpc_binder_stream*>(sp);
grpc_binder_transport* gbt = gbs->t;
grpc_binder_stream* stream = static_cast<grpc_binder_stream*>(sp);
grpc_binder_transport* transport = stream->t;
cancel_stream_locked(
gbt, gbs,
transport, stream,
grpc_error_set_int(GRPC_ERROR_CREATE("destroy stream"),
grpc_core::StatusIntProperty::kRpcStatus,
GRPC_STATUS_UNAVAILABLE));
gbs->~grpc_binder_stream();
stream->~grpc_binder_stream();
}
void grpc_binder_transport::DestroyStream(grpc_stream* gs,
grpc_closure* then_schedule_closure) {
gpr_log(GPR_INFO, __func__);
grpc_binder_stream* gbs = reinterpret_cast<grpc_binder_stream*>(gs);
gbs->destroy_stream_then_closure = then_schedule_closure;
gbs->t->combiner->Run(GRPC_CLOSURE_INIT(&gbs->destroy_stream,
destroy_stream_locked, gbs, nullptr),
absl::OkStatus());
grpc_binder_stream* stream = reinterpret_cast<grpc_binder_stream*>(gs);
stream->destroy_stream_then_closure = then_schedule_closure;
stream->t->combiner->Run(
GRPC_CLOSURE_INIT(&stream->destroy_stream, destroy_stream_locked, stream,
nullptr),
absl::OkStatus());
}
static void destroy_transport_locked(void* gt, grpc_error_handle /*error*/) {
grpc_binder_transport* gbt = static_cast<grpc_binder_transport*>(gt);
close_transport_locked(gbt);
grpc_binder_transport* transport = static_cast<grpc_binder_transport*>(gt);
close_transport_locked(transport);
// Release the references held by the transport.
gbt->wire_reader = nullptr;
gbt->transport_stream_receiver = nullptr;
gbt->wire_writer = nullptr;
GRPC_BINDER_UNREF_TRANSPORT(gbt, "transport destroyed");
transport->wire_reader = nullptr;
transport->transport_stream_receiver = nullptr;
transport->wire_writer = nullptr;
GRPC_BINDER_UNREF_TRANSPORT(transport, "transport destroyed");
}
void grpc_binder_transport::Orphan() {

Loading…
Cancel
Save