Refactoring to clean up

pull/37188/head
tanvi-jagtap 5 months ago
parent 76b2bd6a3f
commit 5586305aa9
  1. 159
      src/core/ext/transport/inproc/legacy_inproc_transport.cc

@ -68,8 +68,6 @@
#include "src/core/lib/transport/transport.h"
#include "src/core/server/server.h"
#define INPROC_INFO_LOG LOG_IF(INFO, GRPC_TRACE_FLAG_ENABLED(inproc))
namespace {
struct inproc_stream;
bool cancel_stream_locked(inproc_stream* s, grpc_error_handle error);
@ -143,16 +141,16 @@ struct inproc_transport final : public grpc_core::FilterStackTransport {
void Orphan() override;
// Takes a ref on the transport. Traced when the "inproc" trace flag is
// enabled. (Removed the stale pre-refactor INPROC_INFO_LOG line, which
// duplicated the GRPC_TRACE_LOG statement and double-logged every ref.)
void ref() {
  GRPC_TRACE_LOG(inproc, INFO) << "ref_transport " << this;
  gpr_ref(&refs);
}
// Drops a ref on the transport. On the final unref, destroys the object
// in place and frees its storage — matching the gpr_malloc + placement-new
// allocation pattern used to create it. (Removed the stale pre-refactor
// INPROC_INFO_LOG lines that duplicated the GRPC_TRACE_LOG statements.)
void unref() {
  GRPC_TRACE_LOG(inproc, INFO) << "unref_transport " << this;
  if (!gpr_unref(&refs)) {
    // Not the last ref; nothing further to do.
    return;
  }
  GRPC_TRACE_LOG(inproc, INFO) << "really_destroy_transport " << this;
  this->~inproc_transport();
  gpr_free(this);
}
@ -196,8 +194,9 @@ struct inproc_stream {
// Pass the client-side stream address to the server-side for a ref
ref("inproc_init_stream:clt"); // ref it now on behalf of server
// side to avoid destruction
INPROC_INFO_LOG << "calling accept stream cb " << st->accept_stream_cb
<< " " << st->accept_stream_data;
GRPC_TRACE_LOG(inproc, INFO)
<< "calling accept stream cb " << st->accept_stream_cb << " "
<< st->accept_stream_data;
(*st->accept_stream_cb)(st->accept_stream_data, t, this);
} else {
// This is the server-side and is being called through accept_stream_cb
@ -246,12 +245,12 @@ struct inproc_stream {
#define STREAM_UNREF(refs, reason) grpc_stream_unref(refs)
#endif
// Takes a ref on the stream; |reason| is recorded by STREAM_REF in
// debug-refcount builds and otherwise only logged. (Removed the stale
// pre-refactor INPROC_INFO_LOG line that duplicated the GRPC_TRACE_LOG
// statement and double-logged every ref.)
void ref(const char* reason) {
  GRPC_TRACE_LOG(inproc, INFO) << "ref_stream " << this << " " << reason;
  STREAM_REF(refs, reason);
}
// Drops a ref on the stream; |reason| is recorded by STREAM_UNREF in
// debug-refcount builds and otherwise only logged. (Removed the stale
// pre-refactor INPROC_INFO_LOG line that duplicated the GRPC_TRACE_LOG
// statement and double-logged every unref.)
void unref(const char* reason) {
  GRPC_TRACE_LOG(inproc, INFO) << "unref_stream " << this << " " << reason;
  STREAM_UNREF(refs, reason);
}
#undef STREAM_REF
@ -367,7 +366,8 @@ void inproc_transport::InitStream(grpc_stream* gs,
grpc_stream_refcount* refcount,
const void* server_data,
grpc_core::Arena* arena) {
INPROC_INFO_LOG << "init_stream " << this << " " << gs << " " << server_data;
GRPC_TRACE_LOG(inproc, INFO)
<< "init_stream " << this << " " << gs << " " << server_data;
new (gs) inproc_stream(this, refcount, server_data, arena);
}
@ -429,8 +429,9 @@ void complete_if_batch_end_locked(inproc_stream* s, grpc_error_handle error,
int is_rtm = static_cast<int>(op == s->recv_trailing_md_op);
if ((is_sm + is_stm + is_rim + is_rm + is_rtm) == 1) {
INPROC_INFO_LOG << msg << " " << s << " " << op << " " << op->on_complete
<< " " << grpc_core::StatusToString(error);
GRPC_TRACE_LOG(inproc, INFO)
<< msg << " " << s << " " << op << " " << op->on_complete << " "
<< grpc_core::StatusToString(error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_complete, error);
}
}
@ -443,7 +444,7 @@ void maybe_process_ops_locked(inproc_stream* s, grpc_error_handle error) {
}
void fail_helper_locked(inproc_stream* s, grpc_error_handle error) {
INPROC_INFO_LOG << "op_state_machine " << s << " fail_helper";
GRPC_TRACE_LOG(inproc, INFO) << "op_state_machine " << s << " fail_helper";
// If we're failing this side, we need to make sure that
// we also send or have already sent trailing metadata
if (!s->trailing_md_sent) {
@ -495,10 +496,10 @@ void fail_helper_locked(inproc_stream* s, grpc_error_handle error) {
*s->recv_initial_md_op->payload->recv_initial_metadata
.trailing_metadata_available = true;
}
INPROC_INFO_LOG << "fail_helper " << s
<< " scheduling initial-metadata-ready "
<< grpc_core::StatusToString(error) << " "
<< grpc_core::StatusToString(err);
GRPC_TRACE_LOG(inproc, INFO)
<< "fail_helper " << s << " scheduling initial-metadata-ready "
<< grpc_core::StatusToString(error) << " "
<< grpc_core::StatusToString(err);
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
s->recv_initial_md_op->payload->recv_initial_metadata
@ -512,8 +513,9 @@ void fail_helper_locked(inproc_stream* s, grpc_error_handle error) {
s->recv_initial_md_op = nullptr;
}
if (s->recv_message_op) {
INPROC_INFO_LOG << "fail_helper " << s << " scheduling message-ready "
<< grpc_core::StatusToString(error);
GRPC_TRACE_LOG(inproc, INFO)
<< "fail_helper " << s << " scheduling message-ready "
<< grpc_core::StatusToString(error);
if (s->recv_message_op->payload->recv_message
.call_failed_before_recv_message != nullptr) {
*s->recv_message_op->payload->recv_message
@ -541,17 +543,17 @@ void fail_helper_locked(inproc_stream* s, grpc_error_handle error) {
s->send_trailing_md_op = nullptr;
}
if (s->recv_trailing_md_op) {
INPROC_INFO_LOG << "fail_helper " << s
<< " scheduling trailing-metadata-ready "
<< grpc_core::StatusToString(error);
GRPC_TRACE_LOG(inproc, INFO)
<< "fail_helper " << s << " scheduling trailing-metadata-ready "
<< grpc_core::StatusToString(error);
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
s->recv_trailing_md_op->payload->recv_trailing_metadata
.recv_trailing_metadata_ready,
error);
INPROC_INFO_LOG << "fail_helper " << s
<< " scheduling trailing-md-on-complete "
<< grpc_core::StatusToString(error);
GRPC_TRACE_LOG(inproc, INFO)
<< "fail_helper " << s << " scheduling trailing-md-on-complete "
<< grpc_core::StatusToString(error);
complete_if_batch_end_locked(
s, error, s->recv_trailing_md_op,
"fail_helper scheduling recv-trailing-metadata-on-complete");
@ -575,8 +577,8 @@ void message_transfer_locked(inproc_stream* sender, inproc_stream* receiver) {
*receiver->recv_message_op->payload->recv_message.flags =
sender->send_message_op->payload->send_message.flags;
INPROC_INFO_LOG << "message_transfer_locked " << receiver
<< " scheduling message-ready";
GRPC_TRACE_LOG(inproc, INFO)
<< "message_transfer_locked " << receiver << " scheduling message-ready";
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
receiver->recv_message_op->payload->recv_message.recv_message_ready,
@ -602,7 +604,7 @@ void op_state_machine_locked(inproc_stream* s, grpc_error_handle error) {
bool needs_close = false;
INPROC_INFO_LOG << "op_state_machine " << s;
GRPC_TRACE_LOG(inproc, INFO) << "op_state_machine " << s;
// cancellation takes precedence
inproc_stream* other = s->other_side;
@ -649,7 +651,7 @@ void op_state_machine_locked(inproc_stream* s, grpc_error_handle error) {
: &other->to_read_trailing_md_filled;
if (*destfilled || s->trailing_md_sent) {
// The buffer is already in use; that's an error!
INPROC_INFO_LOG << "Extra trailing metadata " << s;
GRPC_TRACE_LOG(inproc, INFO) << "Extra trailing metadata " << s;
new_err = GRPC_ERROR_CREATE("Extra trailing metadata");
fail_helper_locked(s, new_err);
goto done;
@ -665,15 +667,15 @@ void op_state_machine_locked(inproc_stream* s, grpc_error_handle error) {
*s->send_trailing_md_op->payload->send_trailing_metadata.sent = true;
}
if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
INPROC_INFO_LOG << "op_state_machine " << s
<< " scheduling trailing-metadata-ready";
GRPC_TRACE_LOG(inproc, INFO) << "op_state_machine " << s
<< " scheduling trailing-metadata-ready";
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
s->recv_trailing_md_op->payload->recv_trailing_metadata
.recv_trailing_metadata_ready,
absl::OkStatus());
INPROC_INFO_LOG << "op_state_machine " << s
<< " scheduling trailing-md-on-complete";
GRPC_TRACE_LOG(inproc, INFO) << "op_state_machine " << s
<< " scheduling trailing-md-on-complete";
grpc_core::ExecCtx::Run(DEBUG_LOCATION,
s->recv_trailing_md_op->on_complete,
absl::OkStatus());
@ -690,10 +692,11 @@ void op_state_machine_locked(inproc_stream* s, grpc_error_handle error) {
if (s->recv_initial_md_op) {
if (s->initial_md_recvd) {
new_err = GRPC_ERROR_CREATE("Already recvd initial md");
INPROC_INFO_LOG << "op_state_machine " << s
<< " scheduling on_complete errors for already "
"recvd initial md "
<< grpc_core::StatusToString(new_err);
GRPC_TRACE_LOG(inproc, INFO)
<< "op_state_machine " << s
<< " scheduling on_complete errors for already "
"recvd initial md "
<< grpc_core::StatusToString(new_err);
fail_helper_locked(s, new_err);
goto done;
}
@ -744,15 +747,16 @@ void op_state_machine_locked(inproc_stream* s, grpc_error_handle error) {
if (s->to_read_trailing_md_filled) {
if (s->trailing_md_recvd) {
if (s->trailing_md_recvd_implicit_only) {
INPROC_INFO_LOG << "op_state_machine " << s
<< " already implicitly received trailing metadata, so "
"ignoring new trailing metadata from client";
GRPC_TRACE_LOG(inproc, INFO)
<< "op_state_machine " << s
<< " already implicitly received trailing metadata, so "
"ignoring new trailing metadata from client";
s->to_read_trailing_md.Clear();
s->to_read_trailing_md_filled = false;
s->trailing_md_recvd_implicit_only = false;
} else {
new_err = GRPC_ERROR_CREATE("Already recvd trailing md");
INPROC_INFO_LOG
GRPC_TRACE_LOG(inproc, INFO)
<< "op_state_machine " << s
<< " scheduling on_complete errors for already recvd trailing "
"md "
@ -765,8 +769,8 @@ void op_state_machine_locked(inproc_stream* s, grpc_error_handle error) {
// This message needs to be wrapped up because it will never be
// satisfied
s->recv_message_op->payload->recv_message.recv_message->reset();
INPROC_INFO_LOG << "op_state_machine " << s
<< " scheduling message-ready";
GRPC_TRACE_LOG(inproc, INFO)
<< "op_state_machine " << s << " scheduling message-ready";
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
s->recv_message_op->payload->recv_message.recv_message_ready,
@ -817,8 +821,9 @@ void op_state_machine_locked(inproc_stream* s, grpc_error_handle error) {
needs_close = s->trailing_md_sent;
}
} else if (!s->trailing_md_recvd) {
INPROC_INFO_LOG << "op_state_machine " << s
<< " has trailing md but not yet waiting for it";
GRPC_TRACE_LOG(inproc, INFO)
<< "op_state_machine " << s
<< " has trailing md but not yet waiting for it";
}
}
if (!s->t->is_client && s->trailing_md_sent &&
@ -826,9 +831,9 @@ void op_state_machine_locked(inproc_stream* s, grpc_error_handle error) {
// In this case, we don't care to receive the write-close from the client
// because we have already sent status and the RPC is over as far as we
// are concerned.
INPROC_INFO_LOG << "op_state_machine " << s
<< " scheduling trailing-md-ready "
<< grpc_core::StatusToString(new_err);
GRPC_TRACE_LOG(inproc, INFO)
<< "op_state_machine " << s << " scheduling trailing-md-ready "
<< grpc_core::StatusToString(new_err);
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
s->recv_trailing_md_op->payload->recv_trailing_metadata
@ -846,7 +851,8 @@ void op_state_machine_locked(inproc_stream* s, grpc_error_handle error) {
if (s->trailing_md_recvd && s->recv_message_op) {
// No further message will come on this stream, so finish off the
// recv_message_op
INPROC_INFO_LOG << "op_state_machine " << s << " scheduling message-ready";
GRPC_TRACE_LOG(inproc, INFO)
<< "op_state_machine " << s << " scheduling message-ready";
s->recv_message_op->payload->recv_message.recv_message->reset();
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
@ -869,10 +875,11 @@ void op_state_machine_locked(inproc_stream* s, grpc_error_handle error) {
if (s->send_message_op || s->send_trailing_md_op || s->recv_initial_md_op ||
s->recv_message_op || s->recv_trailing_md_op) {
// Didn't get the item we wanted so we still need to get rescheduled
INPROC_INFO_LOG << "op_state_machine " << s << " still needs closure "
<< s->send_message_op << " " << s->send_trailing_md_op
<< " " << s->recv_initial_md_op << " " << s->recv_message_op
<< " " << s->recv_trailing_md_op;
GRPC_TRACE_LOG(inproc, INFO)
<< "op_state_machine " << s << " still needs closure "
<< s->send_message_op << " " << s->send_trailing_md_op << " "
<< s->recv_initial_md_op << " " << s->recv_message_op << " "
<< s->recv_trailing_md_op;
s->ops_needed = true;
}
done:
@ -884,8 +891,8 @@ done:
bool cancel_stream_locked(inproc_stream* s, grpc_error_handle error) {
bool ret = false; // was the cancel accepted
INPROC_INFO_LOG << "cancel_stream " << s << " with "
<< grpc_core::StatusToString(error);
GRPC_TRACE_LOG(inproc, INFO)
<< "cancel_stream " << s << " with " << grpc_core::StatusToString(error);
if (s->cancel_self_error.ok()) {
ret = true;
s->cancel_self_error = error;
@ -938,7 +945,8 @@ bool cancel_stream_locked(inproc_stream* s, grpc_error_handle error) {
void inproc_transport::PerformStreamOp(grpc_stream* gs,
grpc_transport_stream_op_batch* op) {
INPROC_INFO_LOG << "perform_stream_op " << this << " " << gs << " " << op;
GRPC_TRACE_LOG(inproc, INFO)
<< "perform_stream_op " << this << " " << gs << " " << op;
inproc_stream* s = reinterpret_cast<inproc_stream*>(gs);
gpr_mu* mu = &s->t->mu->mu; // save aside in case s gets closed
gpr_mu_lock(mu);
@ -974,7 +982,7 @@ void inproc_transport::PerformStreamOp(grpc_stream* gs,
// already self-canceled so still give it an error
error = s->cancel_self_error;
} else {
INPROC_INFO_LOG
GRPC_TRACE_LOG(inproc, INFO)
<< "perform_stream_op " << s
<< (s->t->is_client ? " client" : " server")
<< (op->send_initial_metadata ? " send_initial_metadata" : "")
@ -998,7 +1006,7 @@ void inproc_transport::PerformStreamOp(grpc_stream* gs,
: &other->to_read_initial_md_filled;
if (*destfilled || s->initial_md_sent) {
// The buffer is already in use; that's an error!
INPROC_INFO_LOG << "Extra initial metadata " << s;
GRPC_TRACE_LOG(inproc, INFO) << "Extra initial metadata " << s;
error = GRPC_ERROR_CREATE("Extra initial metadata");
} else {
if (!s->other_side_closed) {
@ -1076,18 +1084,18 @@ void inproc_transport::PerformStreamOp(grpc_stream* gs,
*op->payload->recv_initial_metadata.trailing_metadata_available =
true;
}
INPROC_INFO_LOG << "perform_stream_op error " << s
<< " scheduling initial-metadata-ready "
<< grpc_core::StatusToString(error);
GRPC_TRACE_LOG(inproc, INFO) << "perform_stream_op error " << s
<< " scheduling initial-metadata-ready "
<< grpc_core::StatusToString(error);
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
op->payload->recv_initial_metadata.recv_initial_metadata_ready,
error);
}
if (op->recv_message) {
INPROC_INFO_LOG << "perform_stream_op error " << s
<< " scheduling recv message-ready "
<< grpc_core::StatusToString(error);
GRPC_TRACE_LOG(inproc, INFO) << "perform_stream_op error " << s
<< " scheduling recv message-ready "
<< grpc_core::StatusToString(error);
if (op->payload->recv_message.call_failed_before_recv_message !=
nullptr) {
*op->payload->recv_message.call_failed_before_recv_message = true;
@ -1097,24 +1105,26 @@ void inproc_transport::PerformStreamOp(grpc_stream* gs,
error);
}
if (op->recv_trailing_metadata) {
INPROC_INFO_LOG << "perform_stream_op error " << s
<< " scheduling trailing-metadata-ready "
<< grpc_core::StatusToString(error);
GRPC_TRACE_LOG(inproc, INFO) << "perform_stream_op error " << s
<< " scheduling trailing-metadata-ready "
<< grpc_core::StatusToString(error);
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
error);
}
}
INPROC_INFO_LOG << "perform_stream_op " << s << " scheduling on_complete "
<< grpc_core::StatusToString(error);
GRPC_TRACE_LOG(inproc, INFO)
<< "perform_stream_op " << s << " scheduling on_complete "
<< grpc_core::StatusToString(error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_complete, error);
}
gpr_mu_unlock(mu);
}
void close_transport_locked(inproc_transport* t) {
INPROC_INFO_LOG << "close_transport " << t << " " << t->is_closed;
GRPC_TRACE_LOG(inproc, INFO)
<< "close_transport " << t << " " << t->is_closed;
t->state_tracker.SetState(GRPC_CHANNEL_SHUTDOWN, absl::Status(),
"close transport");
@ -1133,7 +1143,7 @@ void close_transport_locked(inproc_transport* t) {
}
void inproc_transport::PerformOp(grpc_transport_op* op) {
INPROC_INFO_LOG << "perform_transport_op " << this << " " << op;
GRPC_TRACE_LOG(inproc, INFO) << "perform_transport_op " << this << " " << op;
gpr_mu_lock(&mu->mu);
if (op->start_connectivity_watch != nullptr) {
state_tracker.AddWatcher(op->start_connectivity_watch_state,
@ -1167,7 +1177,8 @@ void inproc_transport::PerformOp(grpc_transport_op* op) {
void inproc_transport::DestroyStream(grpc_stream* gs,
grpc_closure* then_schedule_closure) {
INPROC_INFO_LOG << "destroy_stream " << gs << " " << then_schedule_closure;
GRPC_TRACE_LOG(inproc, INFO)
<< "destroy_stream " << gs << " " << then_schedule_closure;
inproc_stream* s = reinterpret_cast<inproc_stream*>(gs);
gpr_mu_lock(&mu->mu);
close_stream_locked(s);
@ -1178,7 +1189,7 @@ void inproc_transport::DestroyStream(grpc_stream* gs,
}
void inproc_transport::Orphan() {
INPROC_INFO_LOG << "destroy_transport " << this;
GRPC_TRACE_LOG(inproc, INFO) << "destroy_transport " << this;
gpr_mu_lock(&mu->mu);
close_transport_locked(this);
gpr_mu_unlock(&mu->mu);
@ -1211,7 +1222,7 @@ void inproc_transport::SetPollsetSet(grpc_stream* /*gs*/,
//
void inproc_transports_create(grpc_core::Transport** server_transport,
grpc_core::Transport** client_transport) {
INPROC_INFO_LOG << "inproc_transports_create";
GRPC_TRACE_LOG(inproc, INFO) << "inproc_transports_create";
shared_mu* mu = new (gpr_malloc(sizeof(*mu))) shared_mu();
inproc_transport* st =
new (gpr_malloc(sizeof(*st))) inproc_transport(mu, /*is_client=*/false);

Loading…
Cancel
Save