diff --git a/src/core/ext/transport/inproc/legacy_inproc_transport.cc b/src/core/ext/transport/inproc/legacy_inproc_transport.cc index fd8699f9b99..d052f59c5c0 100644 --- a/src/core/ext/transport/inproc/legacy_inproc_transport.cc +++ b/src/core/ext/transport/inproc/legacy_inproc_transport.cc @@ -40,7 +40,6 @@ #include #include #include -#include #include #include @@ -69,13 +68,6 @@ #include "src/core/lib/transport/transport.h" #include "src/core/server/server.h" -#define INPROC_LOG(...) \ - do { \ - if (GRPC_TRACE_FLAG_ENABLED(inproc)) { \ - gpr_log(__VA_ARGS__); \ - } \ - } while (0) - namespace { struct inproc_stream; bool cancel_stream_locked(inproc_stream* s, grpc_error_handle error); @@ -149,16 +141,16 @@ struct inproc_transport final : public grpc_core::FilterStackTransport { void Orphan() override; void ref() { - INPROC_LOG(GPR_INFO, "ref_transport %p", this); + GRPC_TRACE_LOG(inproc, INFO) << "ref_transport " << this; gpr_ref(&refs); } void unref() { - INPROC_LOG(GPR_INFO, "unref_transport %p", this); + GRPC_TRACE_LOG(inproc, INFO) << "unref_transport " << this; if (!gpr_unref(&refs)) { return; } - INPROC_LOG(GPR_INFO, "really_destroy_transport %p", this); + GRPC_TRACE_LOG(inproc, INFO) << "really_destroy_transport " << this; this->~inproc_transport(); gpr_free(this); } @@ -202,8 +194,9 @@ struct inproc_stream { // Pass the client-side stream address to the server-side for a ref ref("inproc_init_stream:clt"); // ref it now on behalf of server // side to avoid destruction - INPROC_LOG(GPR_INFO, "calling accept stream cb %p %p", - st->accept_stream_cb, st->accept_stream_data); + GRPC_TRACE_LOG(inproc, INFO) + << "calling accept stream cb " << st->accept_stream_cb << " " + << st->accept_stream_data; (*st->accept_stream_cb)(st->accept_stream_data, t, this); } else { // This is the server-side and is being called through accept_stream_cb @@ -252,12 +245,12 @@ struct inproc_stream { #define STREAM_UNREF(refs, reason) grpc_stream_unref(refs) #endif void ref(const char* reason) { - INPROC_LOG(GPR_INFO, "ref_stream %p %s", this, reason); + GRPC_TRACE_LOG(inproc, INFO) << "ref_stream " << this << " " << reason; STREAM_REF(refs, reason); } void unref(const char* reason) { - INPROC_LOG(GPR_INFO, "unref_stream %p %s", this, reason); + GRPC_TRACE_LOG(inproc, INFO) << "unref_stream " << this << " " << reason; STREAM_UNREF(refs, reason); } #undef STREAM_REF @@ -373,7 +366,8 @@ void inproc_transport::InitStream(grpc_stream* gs, grpc_stream_refcount* refcount, const void* server_data, grpc_core::Arena* arena) { - INPROC_LOG(GPR_INFO, "init_stream %p %p %p", this, gs, server_data); + GRPC_TRACE_LOG(inproc, INFO) + << "init_stream " << this << " " << gs << " " << server_data; new (gs) inproc_stream(this, refcount, server_data, arena); } @@ -435,8 +429,9 @@ void complete_if_batch_end_locked(inproc_stream* s, grpc_error_handle error, int is_rtm = static_cast(op == s->recv_trailing_md_op); if ((is_sm + is_stm + is_rim + is_rm + is_rtm) == 1) { - INPROC_LOG(GPR_INFO, "%s %p %p %p %s", msg, s, op, op->on_complete, - grpc_core::StatusToString(error).c_str()); + GRPC_TRACE_LOG(inproc, INFO) + << msg << " " << s << " " << op << " " << op->on_complete << " " + << grpc_core::StatusToString(error); grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_complete, error); } } @@ -449,7 +444,7 @@ void maybe_process_ops_locked(inproc_stream* s, grpc_error_handle error) { } void fail_helper_locked(inproc_stream* s, grpc_error_handle error) { - INPROC_LOG(GPR_INFO, "op_state_machine %p fail_helper", s); + GRPC_TRACE_LOG(inproc, INFO) << "op_state_machine " << s << " fail_helper"; // If we're failing this side, we need to make sure that // we also send or have already sent trailing metadata if (!s->trailing_md_sent) { @@ -501,10 +496,10 @@ void fail_helper_locked(inproc_stream* s, grpc_error_handle error) { *s->recv_initial_md_op->payload->recv_initial_metadata .trailing_metadata_available = true; } - INPROC_LOG(GPR_INFO, - "fail_helper %p scheduling initial-metadata-ready %s %s", s, - grpc_core::StatusToString(error).c_str(), - grpc_core::StatusToString(err).c_str()); + GRPC_TRACE_LOG(inproc, INFO) + << "fail_helper " << s << " scheduling initial-metadata-ready " + << grpc_core::StatusToString(error) << " " + << grpc_core::StatusToString(err); grpc_core::ExecCtx::Run( DEBUG_LOCATION, s->recv_initial_md_op->payload->recv_initial_metadata @@ -518,8 +513,9 @@ void fail_helper_locked(inproc_stream* s, grpc_error_handle error) { s->recv_initial_md_op = nullptr; } if (s->recv_message_op) { - INPROC_LOG(GPR_INFO, "fail_helper %p scheduling message-ready %s", s, - grpc_core::StatusToString(error).c_str()); + GRPC_TRACE_LOG(inproc, INFO) + << "fail_helper " << s << " scheduling message-ready " + << grpc_core::StatusToString(error); if (s->recv_message_op->payload->recv_message .call_failed_before_recv_message != nullptr) { *s->recv_message_op->payload->recv_message @@ -547,15 +543,17 @@ void fail_helper_locked(inproc_stream* s, grpc_error_handle error) { s->send_trailing_md_op = nullptr; } if (s->recv_trailing_md_op) { - INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-metadata-ready %s", - s, grpc_core::StatusToString(error).c_str()); + GRPC_TRACE_LOG(inproc, INFO) + << "fail_helper " << s << " scheduling trailing-metadata-ready " + << grpc_core::StatusToString(error); grpc_core::ExecCtx::Run( DEBUG_LOCATION, s->recv_trailing_md_op->payload->recv_trailing_metadata .recv_trailing_metadata_ready, error); - INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-md-on-complete %s", - s, grpc_core::StatusToString(error).c_str()); + GRPC_TRACE_LOG(inproc, INFO) + << "fail_helper " << s << " scheduling trailing-md-on-complete " + << grpc_core::StatusToString(error); complete_if_batch_end_locked( s, error, s->recv_trailing_md_op, "fail_helper scheduling recv-trailing-metadata-on-complete"); @@ -579,8 +577,8 @@ void message_transfer_locked(inproc_stream* sender, inproc_stream* receiver) { *receiver->recv_message_op->payload->recv_message.flags = sender->send_message_op->payload->send_message.flags; - INPROC_LOG(GPR_INFO, "message_transfer_locked %p scheduling message-ready", - receiver); + GRPC_TRACE_LOG(inproc, INFO) + << "message_transfer_locked " << receiver << " scheduling message-ready"; grpc_core::ExecCtx::Run( DEBUG_LOCATION, receiver->recv_message_op->payload->recv_message.recv_message_ready, @@ -606,7 +604,7 @@ void op_state_machine_locked(inproc_stream* s, grpc_error_handle error) { bool needs_close = false; - INPROC_LOG(GPR_INFO, "op_state_machine %p", s); + GRPC_TRACE_LOG(inproc, INFO) << "op_state_machine " << s; // cancellation takes precedence inproc_stream* other = s->other_side; @@ -653,7 +651,7 @@ void op_state_machine_locked(inproc_stream* s, grpc_error_handle error) { : &other->to_read_trailing_md_filled; if (*destfilled || s->trailing_md_sent) { // The buffer is already in use; that's an error! - INPROC_LOG(GPR_INFO, "Extra trailing metadata %p", s); + GRPC_TRACE_LOG(inproc, INFO) << "Extra trailing metadata " << s; new_err = GRPC_ERROR_CREATE("Extra trailing metadata"); fail_helper_locked(s, new_err); goto done; @@ -669,15 +667,15 @@ void op_state_machine_locked(inproc_stream* s, grpc_error_handle error) { *s->send_trailing_md_op->payload->send_trailing_metadata.sent = true; } if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) { - INPROC_LOG(GPR_INFO, - "op_state_machine %p scheduling trailing-metadata-ready", s); + GRPC_TRACE_LOG(inproc, INFO) << "op_state_machine " << s + << " scheduling trailing-metadata-ready"; grpc_core::ExecCtx::Run( DEBUG_LOCATION, s->recv_trailing_md_op->payload->recv_trailing_metadata .recv_trailing_metadata_ready, absl::OkStatus()); - INPROC_LOG(GPR_INFO, - "op_state_machine %p scheduling trailing-md-on-complete", s); + GRPC_TRACE_LOG(inproc, INFO) << "op_state_machine " << s + << " scheduling trailing-md-on-complete"; grpc_core::ExecCtx::Run(DEBUG_LOCATION, s->recv_trailing_md_op->on_complete, absl::OkStatus()); @@ -694,11 +692,11 @@ void op_state_machine_locked(inproc_stream* s, grpc_error_handle error) { if (s->recv_initial_md_op) { if (s->initial_md_recvd) { new_err = GRPC_ERROR_CREATE("Already recvd initial md"); - INPROC_LOG( - GPR_INFO, - "op_state_machine %p scheduling on_complete errors for already " - "recvd initial md %s", - s, grpc_core::StatusToString(new_err).c_str()); + GRPC_TRACE_LOG(inproc, INFO) + << "op_state_machine " << s + << " scheduling on_complete errors for already " + "recvd initial md " + << grpc_core::StatusToString(new_err); fail_helper_locked(s, new_err); goto done; } @@ -749,20 +747,20 @@ void op_state_machine_locked(inproc_stream* s, grpc_error_handle error) { if (s->to_read_trailing_md_filled) { if (s->trailing_md_recvd) { if (s->trailing_md_recvd_implicit_only) { - INPROC_LOG(GPR_INFO, - "op_state_machine %p already implicitly received trailing " - "metadata, so ignoring new trailing metadata from client", - s); + GRPC_TRACE_LOG(inproc, INFO) + << "op_state_machine " << s + << " already implicitly received trailing metadata, so " + "ignoring new trailing metadata from client"; s->to_read_trailing_md.Clear(); s->to_read_trailing_md_filled = false; s->trailing_md_recvd_implicit_only = false; } else { new_err = GRPC_ERROR_CREATE("Already recvd trailing md"); - INPROC_LOG( - GPR_INFO, - "op_state_machine %p scheduling on_complete errors for already " - "recvd trailing md %s", - s, grpc_core::StatusToString(new_err).c_str()); + GRPC_TRACE_LOG(inproc, INFO) + << "op_state_machine " << s + << " scheduling on_complete errors for already recvd trailing " + "md " + << grpc_core::StatusToString(new_err); fail_helper_locked(s, new_err); goto done; } @@ -771,7 +769,8 @@ void op_state_machine_locked(inproc_stream* s, grpc_error_handle error) { // This message needs to be wrapped up because it will never be // satisfied s->recv_message_op->payload->recv_message.recv_message->reset(); - INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling message-ready", s); + GRPC_TRACE_LOG(inproc, INFO) + << "op_state_machine " << s << " scheduling message-ready"; grpc_core::ExecCtx::Run( DEBUG_LOCATION, s->recv_message_op->payload->recv_message.recv_message_ready, @@ -822,9 +821,9 @@ void op_state_machine_locked(inproc_stream* s, grpc_error_handle error) { needs_close = s->trailing_md_sent; } } else if (!s->trailing_md_recvd) { - INPROC_LOG( - GPR_INFO, - "op_state_machine %p has trailing md but not yet waiting for it", s); + GRPC_TRACE_LOG(inproc, INFO) + << "op_state_machine " << s + << " has trailing md but not yet waiting for it"; } } if (!s->t->is_client && s->trailing_md_sent && @@ -832,8 +831,9 @@ void op_state_machine_locked(inproc_stream* s, grpc_error_handle error) { // In this case, we don't care to receive the write-close from the client // because we have already sent status and the RPC is over as far as we // are concerned. - INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling trailing-md-ready %s", - s, grpc_core::StatusToString(new_err).c_str()); + GRPC_TRACE_LOG(inproc, INFO) + << "op_state_machine " << s << " scheduling trailing-md-ready " + << grpc_core::StatusToString(new_err); grpc_core::ExecCtx::Run( DEBUG_LOCATION, s->recv_trailing_md_op->payload->recv_trailing_metadata @@ -851,7 +851,8 @@ void op_state_machine_locked(inproc_stream* s, grpc_error_handle error) { if (s->trailing_md_recvd && s->recv_message_op) { // No further message will come on this stream, so finish off the // recv_message_op - INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling message-ready", s); + GRPC_TRACE_LOG(inproc, INFO) + << "op_state_machine " << s << " scheduling message-ready"; s->recv_message_op->payload->recv_message.recv_message->reset(); grpc_core::ExecCtx::Run( DEBUG_LOCATION, @@ -873,12 +874,12 @@ void op_state_machine_locked(inproc_stream* s, grpc_error_handle error) { } if (s->send_message_op || s->send_trailing_md_op || s->recv_initial_md_op || s->recv_message_op || s->recv_trailing_md_op) { - // Didn't get the item we wanted so we still need to get - // rescheduled - INPROC_LOG( - GPR_INFO, "op_state_machine %p still needs closure %p %p %p %p %p", s, - s->send_message_op, s->send_trailing_md_op, s->recv_initial_md_op, - s->recv_message_op, s->recv_trailing_md_op); + // Didn't get the item we wanted so we still need to get rescheduled + GRPC_TRACE_LOG(inproc, INFO) + << "op_state_machine " << s << " still needs closure " + << s->send_message_op << " " << s->send_trailing_md_op << " " + << s->recv_initial_md_op << " " << s->recv_message_op << " " + << s->recv_trailing_md_op; s->ops_needed = true; } done: @@ -890,8 +891,8 @@ done: bool cancel_stream_locked(inproc_stream* s, grpc_error_handle error) { bool ret = false; // was the cancel accepted - INPROC_LOG(GPR_INFO, "cancel_stream %p with %s", s, - grpc_core::StatusToString(error).c_str()); + GRPC_TRACE_LOG(inproc, INFO) + << "cancel_stream " << s << " with " << grpc_core::StatusToString(error); if (s->cancel_self_error.ok()) { ret = true; s->cancel_self_error = error; @@ -944,7 +945,8 @@ bool cancel_stream_locked(inproc_stream* s, grpc_error_handle error) { void inproc_transport::PerformStreamOp(grpc_stream* gs, grpc_transport_stream_op_batch* op) { - INPROC_LOG(GPR_INFO, "perform_stream_op %p %p %p", this, gs, op); + GRPC_TRACE_LOG(inproc, INFO) + << "perform_stream_op " << this << " " << gs << " " << op; inproc_stream* s = reinterpret_cast(gs); gpr_mu* mu = &s->t->mu->mu; // save aside in case s gets closed gpr_mu_lock(mu); @@ -980,14 +982,15 @@ void inproc_transport::PerformStreamOp(grpc_stream* gs, // already self-canceled so still give it an error error = s->cancel_self_error; } else { - INPROC_LOG(GPR_INFO, "perform_stream_op %p %s%s%s%s%s%s%s", s, - s->t->is_client ? "client" : "server", - op->send_initial_metadata ? " send_initial_metadata" : "", - op->send_message ? " send_message" : "", - op->send_trailing_metadata ? " send_trailing_metadata" : "", - op->recv_initial_metadata ? " recv_initial_metadata" : "", - op->recv_message ? " recv_message" : "", - op->recv_trailing_metadata ? " recv_trailing_metadata" : ""); + GRPC_TRACE_LOG(inproc, INFO) + << "perform_stream_op " << s + << (s->t->is_client ? " client" : " server") + << (op->send_initial_metadata ? " send_initial_metadata" : "") + << (op->send_message ? " send_message" : "") + << (op->send_trailing_metadata ? " send_trailing_metadata" : "") + << (op->recv_initial_metadata ? " recv_initial_metadata" : "") + << (op->recv_message ? " recv_message" : "") + << (op->recv_trailing_metadata ? " recv_trailing_metadata" : ""); } inproc_stream* other = s->other_side; @@ -1003,7 +1006,7 @@ void inproc_transport::PerformStreamOp(grpc_stream* gs, : &other->to_read_initial_md_filled; if (*destfilled || s->initial_md_sent) { // The buffer is already in use; that's an error! - INPROC_LOG(GPR_INFO, "Extra initial metadata %p", s); + GRPC_TRACE_LOG(inproc, INFO) << "Extra initial metadata " << s; error = GRPC_ERROR_CREATE("Extra initial metadata"); } else { if (!s->other_side_closed) { @@ -1081,20 +1084,18 @@ void inproc_transport::PerformStreamOp(grpc_stream* gs, *op->payload->recv_initial_metadata.trailing_metadata_available = true; } - INPROC_LOG( - GPR_INFO, - "perform_stream_op error %p scheduling initial-metadata-ready %s", - s, grpc_core::StatusToString(error).c_str()); + GRPC_TRACE_LOG(inproc, INFO) << "perform_stream_op error " << s + << " scheduling initial-metadata-ready " + << grpc_core::StatusToString(error); grpc_core::ExecCtx::Run( DEBUG_LOCATION, op->payload->recv_initial_metadata.recv_initial_metadata_ready, error); } if (op->recv_message) { - INPROC_LOG( - GPR_INFO, - "perform_stream_op error %p scheduling recv message-ready %s", s, - grpc_core::StatusToString(error).c_str()); + GRPC_TRACE_LOG(inproc, INFO) << "perform_stream_op error " << s + << " scheduling recv message-ready " + << grpc_core::StatusToString(error); if (op->payload->recv_message.call_failed_before_recv_message != nullptr) { *op->payload->recv_message.call_failed_before_recv_message = true; @@ -1104,25 +1105,27 @@ void inproc_transport::PerformStreamOp(grpc_stream* gs, error); } if (op->recv_trailing_metadata) { - INPROC_LOG(GPR_INFO, - "perform_stream_op error %p scheduling " - "trailing-metadata-ready %s", - s, grpc_core::StatusToString(error).c_str()); + GRPC_TRACE_LOG(inproc, INFO) << "perform_stream_op error " << s + << " scheduling trailing-metadata-ready " + << grpc_core::StatusToString(error); grpc_core::ExecCtx::Run( DEBUG_LOCATION, op->payload->recv_trailing_metadata.recv_trailing_metadata_ready, error); } } - INPROC_LOG(GPR_INFO, "perform_stream_op %p scheduling on_complete %s", s, - grpc_core::StatusToString(error).c_str()); + GRPC_TRACE_LOG(inproc, INFO) + << "perform_stream_op " << s << " scheduling on_complete " + << grpc_core::StatusToString(error); grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_complete, error); } gpr_mu_unlock(mu); } void close_transport_locked(inproc_transport* t) { - INPROC_LOG(GPR_INFO, "close_transport %p %d", t, t->is_closed); + GRPC_TRACE_LOG(inproc, INFO) + << "close_transport " << t << " " << t->is_closed; + t->state_tracker.SetState(GRPC_CHANNEL_SHUTDOWN, absl::Status(), "close transport"); if (!t->is_closed) { @@ -1140,7 +1143,7 @@ void close_transport_locked(inproc_transport* t) { } void inproc_transport::PerformOp(grpc_transport_op* op) { - INPROC_LOG(GPR_INFO, "perform_transport_op %p %p", this, op); + GRPC_TRACE_LOG(inproc, INFO) << "perform_transport_op " << this << " " << op; gpr_mu_lock(&mu->mu); if (op->start_connectivity_watch != nullptr) { state_tracker.AddWatcher(op->start_connectivity_watch_state, @@ -1174,7 +1177,8 @@ void inproc_transport::PerformOp(grpc_transport_op* op) { void inproc_transport::DestroyStream(grpc_stream* gs, grpc_closure* then_schedule_closure) { - INPROC_LOG(GPR_INFO, "destroy_stream %p %p", gs, then_schedule_closure); + GRPC_TRACE_LOG(inproc, INFO) + << "destroy_stream " << gs << " " << then_schedule_closure; inproc_stream* s = reinterpret_cast(gs); gpr_mu_lock(&mu->mu); close_stream_locked(s); @@ -1185,7 +1189,7 @@ void inproc_transport::DestroyStream(grpc_stream* gs, } void inproc_transport::Orphan() { - INPROC_LOG(GPR_INFO, "destroy_transport %p", this); + GRPC_TRACE_LOG(inproc, INFO) << "destroy_transport " << this; gpr_mu_lock(&mu->mu); close_transport_locked(this); gpr_mu_unlock(&mu->mu); @@ -1218,7 +1222,7 @@ void inproc_transport::SetPollsetSet(grpc_stream* /*gs*/, // void inproc_transports_create(grpc_core::Transport** server_transport, grpc_core::Transport** client_transport) { - INPROC_LOG(GPR_INFO, "inproc_transports_create"); + GRPC_TRACE_LOG(inproc, INFO) << "inproc_transports_create"; shared_mu* mu = new (gpr_malloc(sizeof(*mu))) shared_mu(); inproc_transport* st = new (gpr_malloc(sizeof(*st))) inproc_transport(mu, /*is_client=*/false);