|
|
|
@ -29,8 +29,6 @@ |
|
|
|
|
#include "absl/strings/str_cat.h" |
|
|
|
|
#include "absl/strings/substitute.h" |
|
|
|
|
|
|
|
|
|
#include <grpc/support/log.h> |
|
|
|
|
|
|
|
|
|
#include "src/core/ext/transport/binder/transport/binder_stream.h" |
|
|
|
|
#include "src/core/ext/transport/binder/utils/transport_stream_receiver.h" |
|
|
|
|
#include "src/core/ext/transport/binder/utils/transport_stream_receiver_impl.h" |
|
|
|
@ -108,8 +106,8 @@ void grpc_binder_transport::InitStream(grpc_stream* gs, |
|
|
|
|
grpc_stream_refcount* refcount, |
|
|
|
|
const void* server_data, |
|
|
|
|
grpc_core::Arena* arena) { |
|
|
|
|
gpr_log(GPR_INFO, "%s = %p %p %p %p %p", __func__, this, gs, refcount, |
|
|
|
|
server_data, arena); |
|
|
|
|
LOG(INFO) << __func__ << " = " << this << " " << gs << " " << refcount << " " |
|
|
|
|
<< server_data << " " << arena; |
|
|
|
|
// Note that this function is not locked and may be invoked concurrently
|
|
|
|
|
new (gs) grpc_binder_stream(this, refcount, server_data, arena, |
|
|
|
|
NewStreamTxCode(), is_client); |
|
|
|
@ -132,9 +130,8 @@ static void AssignMetadata(grpc_metadata_batch* mb, |
|
|
|
|
for (auto& p : md) { |
|
|
|
|
mb->Append(p.first, grpc_core::Slice::FromCopiedString(p.second), |
|
|
|
|
[&](absl::string_view error, const grpc_core::Slice&) { |
|
|
|
|
gpr_log( |
|
|
|
|
GPR_DEBUG, "Failed to parse metadata: %s", |
|
|
|
|
absl::StrCat("key=", p.first, " error=", error).c_str()); |
|
|
|
|
VLOG(2) << "Failed to parse metadata: " |
|
|
|
|
<< "key=" << p.first << " error=" << error; |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -192,9 +189,8 @@ static void recv_initial_metadata_locked(void* arg, |
|
|
|
|
RecvInitialMetadataArgs* args = static_cast<RecvInitialMetadataArgs*>(arg); |
|
|
|
|
grpc_binder_stream* stream = args->stream; |
|
|
|
|
|
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"recv_initial_metadata_locked is_client = %d is_closed = %d", |
|
|
|
|
stream->is_client, stream->is_closed); |
|
|
|
|
LOG(INFO) << "recv_initial_metadata_locked is_client = " << stream->is_client |
|
|
|
|
<< " is_closed = " << stream->is_closed; |
|
|
|
|
|
|
|
|
|
if (!stream->is_closed) { |
|
|
|
|
grpc_error_handle error = [&] { |
|
|
|
@ -230,8 +226,8 @@ static void recv_message_locked(void* arg, grpc_error_handle /*error*/) { |
|
|
|
|
RecvMessageArgs* args = static_cast<RecvMessageArgs*>(arg); |
|
|
|
|
grpc_binder_stream* stream = args->stream; |
|
|
|
|
|
|
|
|
|
gpr_log(GPR_INFO, "recv_message_locked is_client = %d is_closed = %d", |
|
|
|
|
stream->is_client, stream->is_closed); |
|
|
|
|
LOG(INFO) << "recv_message_locked is_client = " << stream->is_client |
|
|
|
|
<< " is_closed = " << stream->is_closed; |
|
|
|
|
|
|
|
|
|
if (!stream->is_closed) { |
|
|
|
|
grpc_error_handle error = [&] { |
|
|
|
@ -273,9 +269,8 @@ static void recv_trailing_metadata_locked(void* arg, |
|
|
|
|
RecvTrailingMetadataArgs* args = static_cast<RecvTrailingMetadataArgs*>(arg); |
|
|
|
|
grpc_binder_stream* stream = args->stream; |
|
|
|
|
|
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"recv_trailing_metadata_locked is_client = %d is_closed = %d", |
|
|
|
|
stream->is_client, stream->is_closed); |
|
|
|
|
LOG(INFO) << "recv_trailing_metadata_locked is_client = " << stream->is_client |
|
|
|
|
<< " is_closed = " << stream->is_closed; |
|
|
|
|
|
|
|
|
|
if (!stream->is_closed) { |
|
|
|
|
grpc_error_handle error = [&] { |
|
|
|
@ -378,8 +373,8 @@ static void accept_stream_locked(void* gt, grpc_error_handle /*error*/) { |
|
|
|
|
transport, transport); |
|
|
|
|
} else { |
|
|
|
|
++transport->accept_stream_fn_called_count_; |
|
|
|
|
gpr_log(GPR_INFO, "accept_stream_fn not set, current count = %d", |
|
|
|
|
transport->accept_stream_fn_called_count_); |
|
|
|
|
LOG(INFO) << "accept_stream_fn not set, current count = " |
|
|
|
|
<< transport->accept_stream_fn_called_count_; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -580,8 +575,8 @@ static void perform_stream_op_locked(void* stream_op, |
|
|
|
|
void grpc_binder_transport::PerformStreamOp( |
|
|
|
|
grpc_stream* gs, grpc_transport_stream_op_batch* op) { |
|
|
|
|
grpc_binder_stream* stream = reinterpret_cast<grpc_binder_stream*>(gs); |
|
|
|
|
gpr_log(GPR_INFO, "%s = %p %p %p is_client = %d", __func__, this, gs, op, |
|
|
|
|
stream->is_client); |
|
|
|
|
LOG(INFO) << __func__ << " = " << this << " " << gs << " " << op |
|
|
|
|
<< " is_client = " << stream->is_client; |
|
|
|
|
GRPC_BINDER_STREAM_REF(stream, "perform_stream_op"); |
|
|
|
|
op->handler_private.extra_arg = stream; |
|
|
|
|
combiner->Run(GRPC_CLOSURE_INIT(&op->handler_private.closure, |
|
|
|
@ -621,8 +616,8 @@ static void perform_transport_op_locked(void* transport_op, |
|
|
|
|
transport->accept_stream_fn = op->set_accept_stream_fn; |
|
|
|
|
transport->registered_method_matcher_cb = |
|
|
|
|
op->set_registered_method_matcher_fn; |
|
|
|
|
gpr_log(GPR_DEBUG, "accept_stream_fn_called_count_ = %d", |
|
|
|
|
transport->accept_stream_fn_called_count_); |
|
|
|
|
VLOG(2) << "accept_stream_fn_called_count_ = " |
|
|
|
|
<< transport->accept_stream_fn_called_count_; |
|
|
|
|
while (transport->accept_stream_fn_called_count_ > 0) { |
|
|
|
|
--transport->accept_stream_fn_called_count_; |
|
|
|
|
transport->combiner->Run( |
|
|
|
@ -647,7 +642,7 @@ static void perform_transport_op_locked(void* transport_op, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_binder_transport::PerformOp(grpc_transport_op* op) { |
|
|
|
|
gpr_log(GPR_INFO, __func__); |
|
|
|
|
LOG(INFO) << __func__; |
|
|
|
|
op->handler_private.extra_arg = this; |
|
|
|
|
GRPC_BINDER_REF_TRANSPORT(this, "perform_transport_op"); |
|
|
|
|
combiner->Run(GRPC_CLOSURE_INIT(&op->handler_private.closure, |
|
|
|
@ -668,7 +663,7 @@ static void destroy_stream_locked(void* sp, grpc_error_handle /*error*/) { |
|
|
|
|
|
|
|
|
|
void grpc_binder_transport::DestroyStream(grpc_stream* gs, |
|
|
|
|
grpc_closure* then_schedule_closure) { |
|
|
|
|
gpr_log(GPR_INFO, __func__); |
|
|
|
|
LOG(INFO) << __func__; |
|
|
|
|
grpc_binder_stream* stream = reinterpret_cast<grpc_binder_stream*>(gs); |
|
|
|
|
stream->destroy_stream_then_closure = then_schedule_closure; |
|
|
|
|
stream->t->combiner->Run( |
|
|
|
@ -688,7 +683,7 @@ static void destroy_transport_locked(void* gt, grpc_error_handle /*error*/) { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_binder_transport::Orphan() { |
|
|
|
|
gpr_log(GPR_INFO, __func__); |
|
|
|
|
LOG(INFO) << __func__; |
|
|
|
|
combiner->Run(GRPC_CLOSURE_CREATE(destroy_transport_locked, this, nullptr), |
|
|
|
|
absl::OkStatus()); |
|
|
|
|
} |
|
|
|
@ -707,7 +702,7 @@ grpc_binder_transport::grpc_binder_transport( |
|
|
|
|
is_client ? "binder_transport_client" : "binder_transport_server", |
|
|
|
|
GRPC_CHANNEL_READY), |
|
|
|
|
refs(1, nullptr) { |
|
|
|
|
gpr_log(GPR_INFO, __func__); |
|
|
|
|
LOG(INFO) << __func__; |
|
|
|
|
transport_stream_receiver = |
|
|
|
|
std::make_shared<grpc_binder::TransportStreamReceiverImpl>( |
|
|
|
|
is_client, /*accept_stream_callback=*/[this] { |
|
|
|
@ -736,7 +731,7 @@ grpc_core::Transport* grpc_create_binder_transport_client( |
|
|
|
|
std::unique_ptr<grpc_binder::Binder> endpoint_binder, |
|
|
|
|
std::shared_ptr<grpc::experimental::binder::SecurityPolicy> |
|
|
|
|
security_policy) { |
|
|
|
|
gpr_log(GPR_INFO, __func__); |
|
|
|
|
LOG(INFO) << __func__; |
|
|
|
|
|
|
|
|
|
CHECK(endpoint_binder != nullptr); |
|
|
|
|
CHECK_NE(security_policy, nullptr); |
|
|
|
@ -751,7 +746,7 @@ grpc_core::Transport* grpc_create_binder_transport_server( |
|
|
|
|
std::unique_ptr<grpc_binder::Binder> client_binder, |
|
|
|
|
std::shared_ptr<grpc::experimental::binder::SecurityPolicy> |
|
|
|
|
security_policy) { |
|
|
|
|
gpr_log(GPR_INFO, __func__); |
|
|
|
|
LOG(INFO) << __func__; |
|
|
|
|
|
|
|
|
|
CHECK(client_binder != nullptr); |
|
|
|
|
CHECK_NE(security_policy, nullptr); |
|
|
|
|