|
|
|
@ -40,7 +40,6 @@ |
|
|
|
|
#include <grpc/impl/connectivity_state.h> |
|
|
|
|
#include <grpc/status.h> |
|
|
|
|
#include <grpc/support/alloc.h> |
|
|
|
|
#include <grpc/support/log.h> |
|
|
|
|
#include <grpc/support/port_platform.h> |
|
|
|
|
#include <grpc/support/sync.h> |
|
|
|
|
|
|
|
|
@ -69,13 +68,6 @@ |
|
|
|
|
#include "src/core/lib/transport/transport.h" |
|
|
|
|
#include "src/core/server/server.h" |
|
|
|
|
|
|
|
|
|
#define INPROC_LOG(...) \ |
|
|
|
|
do { \
|
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(inproc)) { \
|
|
|
|
|
gpr_log(__VA_ARGS__); \
|
|
|
|
|
} \
|
|
|
|
|
} while (0) |
|
|
|
|
|
|
|
|
|
namespace { |
|
|
|
|
struct inproc_stream; |
|
|
|
|
bool cancel_stream_locked(inproc_stream* s, grpc_error_handle error); |
|
|
|
@ -149,16 +141,16 @@ struct inproc_transport final : public grpc_core::FilterStackTransport { |
|
|
|
|
void Orphan() override; |
|
|
|
|
|
|
|
|
|
void ref() { |
|
|
|
|
INPROC_LOG(GPR_INFO, "ref_transport %p", this); |
|
|
|
|
GRPC_TRACE_LOG(inproc, INFO) << "ref_transport " << this; |
|
|
|
|
gpr_ref(&refs); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void unref() { |
|
|
|
|
INPROC_LOG(GPR_INFO, "unref_transport %p", this); |
|
|
|
|
GRPC_TRACE_LOG(inproc, INFO) << "unref_transport " << this; |
|
|
|
|
if (!gpr_unref(&refs)) { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
INPROC_LOG(GPR_INFO, "really_destroy_transport %p", this); |
|
|
|
|
GRPC_TRACE_LOG(inproc, INFO) << "really_destroy_transport " << this; |
|
|
|
|
this->~inproc_transport(); |
|
|
|
|
gpr_free(this); |
|
|
|
|
} |
|
|
|
@ -202,8 +194,9 @@ struct inproc_stream { |
|
|
|
|
// Pass the client-side stream address to the server-side for a ref
|
|
|
|
|
ref("inproc_init_stream:clt"); // ref it now on behalf of server
|
|
|
|
|
// side to avoid destruction
|
|
|
|
|
INPROC_LOG(GPR_INFO, "calling accept stream cb %p %p", |
|
|
|
|
st->accept_stream_cb, st->accept_stream_data); |
|
|
|
|
GRPC_TRACE_LOG(inproc, INFO) |
|
|
|
|
<< "calling accept stream cb " << st->accept_stream_cb << " " |
|
|
|
|
<< st->accept_stream_data; |
|
|
|
|
(*st->accept_stream_cb)(st->accept_stream_data, t, this); |
|
|
|
|
} else { |
|
|
|
|
// This is the server-side and is being called through accept_stream_cb
|
|
|
|
@ -252,12 +245,12 @@ struct inproc_stream { |
|
|
|
|
#define STREAM_UNREF(refs, reason) grpc_stream_unref(refs) |
|
|
|
|
#endif |
|
|
|
|
void ref(const char* reason) { |
|
|
|
|
INPROC_LOG(GPR_INFO, "ref_stream %p %s", this, reason); |
|
|
|
|
GRPC_TRACE_LOG(inproc, INFO) << "ref_stream " << this << " " << reason; |
|
|
|
|
STREAM_REF(refs, reason); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void unref(const char* reason) { |
|
|
|
|
INPROC_LOG(GPR_INFO, "unref_stream %p %s", this, reason); |
|
|
|
|
GRPC_TRACE_LOG(inproc, INFO) << "unref_stream " << this << " " << reason; |
|
|
|
|
STREAM_UNREF(refs, reason); |
|
|
|
|
} |
|
|
|
|
#undef STREAM_REF |
|
|
|
@ -373,7 +366,8 @@ void inproc_transport::InitStream(grpc_stream* gs, |
|
|
|
|
grpc_stream_refcount* refcount, |
|
|
|
|
const void* server_data, |
|
|
|
|
grpc_core::Arena* arena) { |
|
|
|
|
INPROC_LOG(GPR_INFO, "init_stream %p %p %p", this, gs, server_data); |
|
|
|
|
GRPC_TRACE_LOG(inproc, INFO) |
|
|
|
|
<< "init_stream " << this << " " << gs << " " << server_data; |
|
|
|
|
new (gs) inproc_stream(this, refcount, server_data, arena); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -435,8 +429,9 @@ void complete_if_batch_end_locked(inproc_stream* s, grpc_error_handle error, |
|
|
|
|
int is_rtm = static_cast<int>(op == s->recv_trailing_md_op); |
|
|
|
|
|
|
|
|
|
if ((is_sm + is_stm + is_rim + is_rm + is_rtm) == 1) { |
|
|
|
|
INPROC_LOG(GPR_INFO, "%s %p %p %p %s", msg, s, op, op->on_complete, |
|
|
|
|
grpc_core::StatusToString(error).c_str()); |
|
|
|
|
GRPC_TRACE_LOG(inproc, INFO) |
|
|
|
|
<< msg << " " << s << " " << op << " " << op->on_complete << " " |
|
|
|
|
<< grpc_core::StatusToString(error); |
|
|
|
|
grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_complete, error); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -449,7 +444,7 @@ void maybe_process_ops_locked(inproc_stream* s, grpc_error_handle error) { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void fail_helper_locked(inproc_stream* s, grpc_error_handle error) { |
|
|
|
|
INPROC_LOG(GPR_INFO, "op_state_machine %p fail_helper", s); |
|
|
|
|
GRPC_TRACE_LOG(inproc, INFO) << "op_state_machine " << s << " fail_helper"; |
|
|
|
|
// If we're failing this side, we need to make sure that
|
|
|
|
|
// we also send or have already sent trailing metadata
|
|
|
|
|
if (!s->trailing_md_sent) { |
|
|
|
@ -501,10 +496,10 @@ void fail_helper_locked(inproc_stream* s, grpc_error_handle error) { |
|
|
|
|
*s->recv_initial_md_op->payload->recv_initial_metadata |
|
|
|
|
.trailing_metadata_available = true; |
|
|
|
|
} |
|
|
|
|
INPROC_LOG(GPR_INFO, |
|
|
|
|
"fail_helper %p scheduling initial-metadata-ready %s %s", s, |
|
|
|
|
grpc_core::StatusToString(error).c_str(), |
|
|
|
|
grpc_core::StatusToString(err).c_str()); |
|
|
|
|
GRPC_TRACE_LOG(inproc, INFO) |
|
|
|
|
<< "fail_helper " << s << " scheduling initial-metadata-ready " |
|
|
|
|
<< grpc_core::StatusToString(error) << " " |
|
|
|
|
<< grpc_core::StatusToString(err); |
|
|
|
|
grpc_core::ExecCtx::Run( |
|
|
|
|
DEBUG_LOCATION, |
|
|
|
|
s->recv_initial_md_op->payload->recv_initial_metadata |
|
|
|
@ -518,8 +513,9 @@ void fail_helper_locked(inproc_stream* s, grpc_error_handle error) { |
|
|
|
|
s->recv_initial_md_op = nullptr; |
|
|
|
|
} |
|
|
|
|
if (s->recv_message_op) { |
|
|
|
|
INPROC_LOG(GPR_INFO, "fail_helper %p scheduling message-ready %s", s, |
|
|
|
|
grpc_core::StatusToString(error).c_str()); |
|
|
|
|
GRPC_TRACE_LOG(inproc, INFO) |
|
|
|
|
<< "fail_helper " << s << " scheduling message-ready " |
|
|
|
|
<< grpc_core::StatusToString(error); |
|
|
|
|
if (s->recv_message_op->payload->recv_message |
|
|
|
|
.call_failed_before_recv_message != nullptr) { |
|
|
|
|
*s->recv_message_op->payload->recv_message |
|
|
|
@ -547,15 +543,17 @@ void fail_helper_locked(inproc_stream* s, grpc_error_handle error) { |
|
|
|
|
s->send_trailing_md_op = nullptr; |
|
|
|
|
} |
|
|
|
|
if (s->recv_trailing_md_op) { |
|
|
|
|
INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-metadata-ready %s", |
|
|
|
|
s, grpc_core::StatusToString(error).c_str()); |
|
|
|
|
GRPC_TRACE_LOG(inproc, INFO) |
|
|
|
|
<< "fail_helper " << s << " scheduling trailing-metadata-ready " |
|
|
|
|
<< grpc_core::StatusToString(error); |
|
|
|
|
grpc_core::ExecCtx::Run( |
|
|
|
|
DEBUG_LOCATION, |
|
|
|
|
s->recv_trailing_md_op->payload->recv_trailing_metadata |
|
|
|
|
.recv_trailing_metadata_ready, |
|
|
|
|
error); |
|
|
|
|
INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-md-on-complete %s", |
|
|
|
|
s, grpc_core::StatusToString(error).c_str()); |
|
|
|
|
GRPC_TRACE_LOG(inproc, INFO) |
|
|
|
|
<< "fail_helper " << s << " scheduling trailing-md-on-complete " |
|
|
|
|
<< grpc_core::StatusToString(error); |
|
|
|
|
complete_if_batch_end_locked( |
|
|
|
|
s, error, s->recv_trailing_md_op, |
|
|
|
|
"fail_helper scheduling recv-trailing-metadata-on-complete"); |
|
|
|
@ -579,8 +577,8 @@ void message_transfer_locked(inproc_stream* sender, inproc_stream* receiver) { |
|
|
|
|
*receiver->recv_message_op->payload->recv_message.flags = |
|
|
|
|
sender->send_message_op->payload->send_message.flags; |
|
|
|
|
|
|
|
|
|
INPROC_LOG(GPR_INFO, "message_transfer_locked %p scheduling message-ready", |
|
|
|
|
receiver); |
|
|
|
|
GRPC_TRACE_LOG(inproc, INFO) |
|
|
|
|
<< "message_transfer_locked " << receiver << " scheduling message-ready"; |
|
|
|
|
grpc_core::ExecCtx::Run( |
|
|
|
|
DEBUG_LOCATION, |
|
|
|
|
receiver->recv_message_op->payload->recv_message.recv_message_ready, |
|
|
|
@ -606,7 +604,7 @@ void op_state_machine_locked(inproc_stream* s, grpc_error_handle error) { |
|
|
|
|
|
|
|
|
|
bool needs_close = false; |
|
|
|
|
|
|
|
|
|
INPROC_LOG(GPR_INFO, "op_state_machine %p", s); |
|
|
|
|
GRPC_TRACE_LOG(inproc, INFO) << "op_state_machine " << s; |
|
|
|
|
// cancellation takes precedence
|
|
|
|
|
inproc_stream* other = s->other_side; |
|
|
|
|
|
|
|
|
@ -653,7 +651,7 @@ void op_state_machine_locked(inproc_stream* s, grpc_error_handle error) { |
|
|
|
|
: &other->to_read_trailing_md_filled; |
|
|
|
|
if (*destfilled || s->trailing_md_sent) { |
|
|
|
|
// The buffer is already in use; that's an error!
|
|
|
|
|
INPROC_LOG(GPR_INFO, "Extra trailing metadata %p", s); |
|
|
|
|
GRPC_TRACE_LOG(inproc, INFO) << "Extra trailing metadata " << s; |
|
|
|
|
new_err = GRPC_ERROR_CREATE("Extra trailing metadata"); |
|
|
|
|
fail_helper_locked(s, new_err); |
|
|
|
|
goto done; |
|
|
|
@ -669,15 +667,15 @@ void op_state_machine_locked(inproc_stream* s, grpc_error_handle error) { |
|
|
|
|
*s->send_trailing_md_op->payload->send_trailing_metadata.sent = true; |
|
|
|
|
} |
|
|
|
|
if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) { |
|
|
|
|
INPROC_LOG(GPR_INFO, |
|
|
|
|
"op_state_machine %p scheduling trailing-metadata-ready", s); |
|
|
|
|
GRPC_TRACE_LOG(inproc, INFO) << "op_state_machine " << s |
|
|
|
|
<< " scheduling trailing-metadata-ready"; |
|
|
|
|
grpc_core::ExecCtx::Run( |
|
|
|
|
DEBUG_LOCATION, |
|
|
|
|
s->recv_trailing_md_op->payload->recv_trailing_metadata |
|
|
|
|
.recv_trailing_metadata_ready, |
|
|
|
|
absl::OkStatus()); |
|
|
|
|
INPROC_LOG(GPR_INFO, |
|
|
|
|
"op_state_machine %p scheduling trailing-md-on-complete", s); |
|
|
|
|
GRPC_TRACE_LOG(inproc, INFO) << "op_state_machine " << s |
|
|
|
|
<< " scheduling trailing-md-on-complete"; |
|
|
|
|
grpc_core::ExecCtx::Run(DEBUG_LOCATION, |
|
|
|
|
s->recv_trailing_md_op->on_complete, |
|
|
|
|
absl::OkStatus()); |
|
|
|
@ -694,11 +692,11 @@ void op_state_machine_locked(inproc_stream* s, grpc_error_handle error) { |
|
|
|
|
if (s->recv_initial_md_op) { |
|
|
|
|
if (s->initial_md_recvd) { |
|
|
|
|
new_err = GRPC_ERROR_CREATE("Already recvd initial md"); |
|
|
|
|
INPROC_LOG( |
|
|
|
|
GPR_INFO, |
|
|
|
|
"op_state_machine %p scheduling on_complete errors for already " |
|
|
|
|
"recvd initial md %s", |
|
|
|
|
s, grpc_core::StatusToString(new_err).c_str()); |
|
|
|
|
GRPC_TRACE_LOG(inproc, INFO) |
|
|
|
|
<< "op_state_machine " << s |
|
|
|
|
<< " scheduling on_complete errors for already " |
|
|
|
|
"recvd initial md " |
|
|
|
|
<< grpc_core::StatusToString(new_err); |
|
|
|
|
fail_helper_locked(s, new_err); |
|
|
|
|
goto done; |
|
|
|
|
} |
|
|
|
@ -749,20 +747,20 @@ void op_state_machine_locked(inproc_stream* s, grpc_error_handle error) { |
|
|
|
|
if (s->to_read_trailing_md_filled) { |
|
|
|
|
if (s->trailing_md_recvd) { |
|
|
|
|
if (s->trailing_md_recvd_implicit_only) { |
|
|
|
|
INPROC_LOG(GPR_INFO, |
|
|
|
|
"op_state_machine %p already implicitly received trailing " |
|
|
|
|
"metadata, so ignoring new trailing metadata from client", |
|
|
|
|
s); |
|
|
|
|
GRPC_TRACE_LOG(inproc, INFO) |
|
|
|
|
<< "op_state_machine " << s |
|
|
|
|
<< " already implicitly received trailing metadata, so " |
|
|
|
|
"ignoring new trailing metadata from client"; |
|
|
|
|
s->to_read_trailing_md.Clear(); |
|
|
|
|
s->to_read_trailing_md_filled = false; |
|
|
|
|
s->trailing_md_recvd_implicit_only = false; |
|
|
|
|
} else { |
|
|
|
|
new_err = GRPC_ERROR_CREATE("Already recvd trailing md"); |
|
|
|
|
INPROC_LOG( |
|
|
|
|
GPR_INFO, |
|
|
|
|
"op_state_machine %p scheduling on_complete errors for already " |
|
|
|
|
"recvd trailing md %s", |
|
|
|
|
s, grpc_core::StatusToString(new_err).c_str()); |
|
|
|
|
GRPC_TRACE_LOG(inproc, INFO) |
|
|
|
|
<< "op_state_machine " << s |
|
|
|
|
<< " scheduling on_complete errors for already recvd trailing " |
|
|
|
|
"md " |
|
|
|
|
<< grpc_core::StatusToString(new_err); |
|
|
|
|
fail_helper_locked(s, new_err); |
|
|
|
|
goto done; |
|
|
|
|
} |
|
|
|
@ -771,7 +769,8 @@ void op_state_machine_locked(inproc_stream* s, grpc_error_handle error) { |
|
|
|
|
// This message needs to be wrapped up because it will never be
|
|
|
|
|
// satisfied
|
|
|
|
|
s->recv_message_op->payload->recv_message.recv_message->reset(); |
|
|
|
|
INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling message-ready", s); |
|
|
|
|
GRPC_TRACE_LOG(inproc, INFO) |
|
|
|
|
<< "op_state_machine " << s << " scheduling message-ready"; |
|
|
|
|
grpc_core::ExecCtx::Run( |
|
|
|
|
DEBUG_LOCATION, |
|
|
|
|
s->recv_message_op->payload->recv_message.recv_message_ready, |
|
|
|
@ -822,9 +821,9 @@ void op_state_machine_locked(inproc_stream* s, grpc_error_handle error) { |
|
|
|
|
needs_close = s->trailing_md_sent; |
|
|
|
|
} |
|
|
|
|
} else if (!s->trailing_md_recvd) { |
|
|
|
|
INPROC_LOG( |
|
|
|
|
GPR_INFO, |
|
|
|
|
"op_state_machine %p has trailing md but not yet waiting for it", s); |
|
|
|
|
GRPC_TRACE_LOG(inproc, INFO) |
|
|
|
|
<< "op_state_machine " << s |
|
|
|
|
<< " has trailing md but not yet waiting for it"; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if (!s->t->is_client && s->trailing_md_sent && |
|
|
|
@ -832,8 +831,9 @@ void op_state_machine_locked(inproc_stream* s, grpc_error_handle error) { |
|
|
|
|
// In this case, we don't care to receive the write-close from the client
|
|
|
|
|
// because we have already sent status and the RPC is over as far as we
|
|
|
|
|
// are concerned.
|
|
|
|
|
INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling trailing-md-ready %s", |
|
|
|
|
s, grpc_core::StatusToString(new_err).c_str()); |
|
|
|
|
GRPC_TRACE_LOG(inproc, INFO) |
|
|
|
|
<< "op_state_machine " << s << " scheduling trailing-md-ready " |
|
|
|
|
<< grpc_core::StatusToString(new_err); |
|
|
|
|
grpc_core::ExecCtx::Run( |
|
|
|
|
DEBUG_LOCATION, |
|
|
|
|
s->recv_trailing_md_op->payload->recv_trailing_metadata |
|
|
|
@ -851,7 +851,8 @@ void op_state_machine_locked(inproc_stream* s, grpc_error_handle error) { |
|
|
|
|
if (s->trailing_md_recvd && s->recv_message_op) { |
|
|
|
|
// No further message will come on this stream, so finish off the
|
|
|
|
|
// recv_message_op
|
|
|
|
|
INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling message-ready", s); |
|
|
|
|
GRPC_TRACE_LOG(inproc, INFO) |
|
|
|
|
<< "op_state_machine " << s << " scheduling message-ready"; |
|
|
|
|
s->recv_message_op->payload->recv_message.recv_message->reset(); |
|
|
|
|
grpc_core::ExecCtx::Run( |
|
|
|
|
DEBUG_LOCATION, |
|
|
|
@ -873,12 +874,12 @@ void op_state_machine_locked(inproc_stream* s, grpc_error_handle error) { |
|
|
|
|
} |
|
|
|
|
if (s->send_message_op || s->send_trailing_md_op || s->recv_initial_md_op || |
|
|
|
|
s->recv_message_op || s->recv_trailing_md_op) { |
|
|
|
|
// Didn't get the item we wanted so we still need to get
|
|
|
|
|
// rescheduled
|
|
|
|
|
INPROC_LOG( |
|
|
|
|
GPR_INFO, "op_state_machine %p still needs closure %p %p %p %p %p", s, |
|
|
|
|
s->send_message_op, s->send_trailing_md_op, s->recv_initial_md_op, |
|
|
|
|
s->recv_message_op, s->recv_trailing_md_op); |
|
|
|
|
// Didn't get the item we wanted so we still need to get rescheduled
|
|
|
|
|
GRPC_TRACE_LOG(inproc, INFO) |
|
|
|
|
<< "op_state_machine " << s << " still needs closure " |
|
|
|
|
<< s->send_message_op << " " << s->send_trailing_md_op << " " |
|
|
|
|
<< s->recv_initial_md_op << " " << s->recv_message_op << " " |
|
|
|
|
<< s->recv_trailing_md_op; |
|
|
|
|
s->ops_needed = true; |
|
|
|
|
} |
|
|
|
|
done: |
|
|
|
@ -890,8 +891,8 @@ done: |
|
|
|
|
|
|
|
|
|
bool cancel_stream_locked(inproc_stream* s, grpc_error_handle error) { |
|
|
|
|
bool ret = false; // was the cancel accepted
|
|
|
|
|
INPROC_LOG(GPR_INFO, "cancel_stream %p with %s", s, |
|
|
|
|
grpc_core::StatusToString(error).c_str()); |
|
|
|
|
GRPC_TRACE_LOG(inproc, INFO) |
|
|
|
|
<< "cancel_stream " << s << " with " << grpc_core::StatusToString(error); |
|
|
|
|
if (s->cancel_self_error.ok()) { |
|
|
|
|
ret = true; |
|
|
|
|
s->cancel_self_error = error; |
|
|
|
@ -944,7 +945,8 @@ bool cancel_stream_locked(inproc_stream* s, grpc_error_handle error) { |
|
|
|
|
|
|
|
|
|
void inproc_transport::PerformStreamOp(grpc_stream* gs, |
|
|
|
|
grpc_transport_stream_op_batch* op) { |
|
|
|
|
INPROC_LOG(GPR_INFO, "perform_stream_op %p %p %p", this, gs, op); |
|
|
|
|
GRPC_TRACE_LOG(inproc, INFO) |
|
|
|
|
<< "perform_stream_op " << this << " " << gs << " " << op; |
|
|
|
|
inproc_stream* s = reinterpret_cast<inproc_stream*>(gs); |
|
|
|
|
gpr_mu* mu = &s->t->mu->mu; // save aside in case s gets closed
|
|
|
|
|
gpr_mu_lock(mu); |
|
|
|
@ -980,14 +982,15 @@ void inproc_transport::PerformStreamOp(grpc_stream* gs, |
|
|
|
|
// already self-canceled so still give it an error
|
|
|
|
|
error = s->cancel_self_error; |
|
|
|
|
} else { |
|
|
|
|
INPROC_LOG(GPR_INFO, "perform_stream_op %p %s%s%s%s%s%s%s", s, |
|
|
|
|
s->t->is_client ? "client" : "server", |
|
|
|
|
op->send_initial_metadata ? " send_initial_metadata" : "", |
|
|
|
|
op->send_message ? " send_message" : "", |
|
|
|
|
op->send_trailing_metadata ? " send_trailing_metadata" : "", |
|
|
|
|
op->recv_initial_metadata ? " recv_initial_metadata" : "", |
|
|
|
|
op->recv_message ? " recv_message" : "", |
|
|
|
|
op->recv_trailing_metadata ? " recv_trailing_metadata" : ""); |
|
|
|
|
GRPC_TRACE_LOG(inproc, INFO) |
|
|
|
|
<< "perform_stream_op " << s |
|
|
|
|
<< (s->t->is_client ? " client" : " server") |
|
|
|
|
<< (op->send_initial_metadata ? " send_initial_metadata" : "") |
|
|
|
|
<< (op->send_message ? " send_message" : "") |
|
|
|
|
<< (op->send_trailing_metadata ? " send_trailing_metadata" : "") |
|
|
|
|
<< (op->recv_initial_metadata ? " recv_initial_metadata" : "") |
|
|
|
|
<< (op->recv_message ? " recv_message" : "") |
|
|
|
|
<< (op->recv_trailing_metadata ? " recv_trailing_metadata" : ""); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
inproc_stream* other = s->other_side; |
|
|
|
@ -1003,7 +1006,7 @@ void inproc_transport::PerformStreamOp(grpc_stream* gs, |
|
|
|
|
: &other->to_read_initial_md_filled; |
|
|
|
|
if (*destfilled || s->initial_md_sent) { |
|
|
|
|
// The buffer is already in use; that's an error!
|
|
|
|
|
INPROC_LOG(GPR_INFO, "Extra initial metadata %p", s); |
|
|
|
|
GRPC_TRACE_LOG(inproc, INFO) << "Extra initial metadata " << s; |
|
|
|
|
error = GRPC_ERROR_CREATE("Extra initial metadata"); |
|
|
|
|
} else { |
|
|
|
|
if (!s->other_side_closed) { |
|
|
|
@ -1081,20 +1084,18 @@ void inproc_transport::PerformStreamOp(grpc_stream* gs, |
|
|
|
|
*op->payload->recv_initial_metadata.trailing_metadata_available = |
|
|
|
|
true; |
|
|
|
|
} |
|
|
|
|
INPROC_LOG( |
|
|
|
|
GPR_INFO, |
|
|
|
|
"perform_stream_op error %p scheduling initial-metadata-ready %s", |
|
|
|
|
s, grpc_core::StatusToString(error).c_str()); |
|
|
|
|
GRPC_TRACE_LOG(inproc, INFO) << "perform_stream_op error " << s |
|
|
|
|
<< " scheduling initial-metadata-ready " |
|
|
|
|
<< grpc_core::StatusToString(error); |
|
|
|
|
grpc_core::ExecCtx::Run( |
|
|
|
|
DEBUG_LOCATION, |
|
|
|
|
op->payload->recv_initial_metadata.recv_initial_metadata_ready, |
|
|
|
|
error); |
|
|
|
|
} |
|
|
|
|
if (op->recv_message) { |
|
|
|
|
INPROC_LOG( |
|
|
|
|
GPR_INFO, |
|
|
|
|
"perform_stream_op error %p scheduling recv message-ready %s", s, |
|
|
|
|
grpc_core::StatusToString(error).c_str()); |
|
|
|
|
GRPC_TRACE_LOG(inproc, INFO) << "perform_stream_op error " << s |
|
|
|
|
<< " scheduling recv message-ready " |
|
|
|
|
<< grpc_core::StatusToString(error); |
|
|
|
|
if (op->payload->recv_message.call_failed_before_recv_message != |
|
|
|
|
nullptr) { |
|
|
|
|
*op->payload->recv_message.call_failed_before_recv_message = true; |
|
|
|
@ -1104,25 +1105,27 @@ void inproc_transport::PerformStreamOp(grpc_stream* gs, |
|
|
|
|
error); |
|
|
|
|
} |
|
|
|
|
if (op->recv_trailing_metadata) { |
|
|
|
|
INPROC_LOG(GPR_INFO, |
|
|
|
|
"perform_stream_op error %p scheduling " |
|
|
|
|
"trailing-metadata-ready %s", |
|
|
|
|
s, grpc_core::StatusToString(error).c_str()); |
|
|
|
|
GRPC_TRACE_LOG(inproc, INFO) << "perform_stream_op error " << s |
|
|
|
|
<< " scheduling trailing-metadata-ready " |
|
|
|
|
<< grpc_core::StatusToString(error); |
|
|
|
|
grpc_core::ExecCtx::Run( |
|
|
|
|
DEBUG_LOCATION, |
|
|
|
|
op->payload->recv_trailing_metadata.recv_trailing_metadata_ready, |
|
|
|
|
error); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
INPROC_LOG(GPR_INFO, "perform_stream_op %p scheduling on_complete %s", s, |
|
|
|
|
grpc_core::StatusToString(error).c_str()); |
|
|
|
|
GRPC_TRACE_LOG(inproc, INFO) |
|
|
|
|
<< "perform_stream_op " << s << " scheduling on_complete " |
|
|
|
|
<< grpc_core::StatusToString(error); |
|
|
|
|
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_complete, error); |
|
|
|
|
} |
|
|
|
|
gpr_mu_unlock(mu); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void close_transport_locked(inproc_transport* t) { |
|
|
|
|
INPROC_LOG(GPR_INFO, "close_transport %p %d", t, t->is_closed); |
|
|
|
|
GRPC_TRACE_LOG(inproc, INFO) |
|
|
|
|
<< "close_transport " << t << " " << t->is_closed; |
|
|
|
|
|
|
|
|
|
t->state_tracker.SetState(GRPC_CHANNEL_SHUTDOWN, absl::Status(), |
|
|
|
|
"close transport"); |
|
|
|
|
if (!t->is_closed) { |
|
|
|
@ -1140,7 +1143,7 @@ void close_transport_locked(inproc_transport* t) { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void inproc_transport::PerformOp(grpc_transport_op* op) { |
|
|
|
|
INPROC_LOG(GPR_INFO, "perform_transport_op %p %p", this, op); |
|
|
|
|
GRPC_TRACE_LOG(inproc, INFO) << "perform_transport_op " << this << " " << op; |
|
|
|
|
gpr_mu_lock(&mu->mu); |
|
|
|
|
if (op->start_connectivity_watch != nullptr) { |
|
|
|
|
state_tracker.AddWatcher(op->start_connectivity_watch_state, |
|
|
|
@ -1174,7 +1177,8 @@ void inproc_transport::PerformOp(grpc_transport_op* op) { |
|
|
|
|
|
|
|
|
|
void inproc_transport::DestroyStream(grpc_stream* gs, |
|
|
|
|
grpc_closure* then_schedule_closure) { |
|
|
|
|
INPROC_LOG(GPR_INFO, "destroy_stream %p %p", gs, then_schedule_closure); |
|
|
|
|
GRPC_TRACE_LOG(inproc, INFO) |
|
|
|
|
<< "destroy_stream " << gs << " " << then_schedule_closure; |
|
|
|
|
inproc_stream* s = reinterpret_cast<inproc_stream*>(gs); |
|
|
|
|
gpr_mu_lock(&mu->mu); |
|
|
|
|
close_stream_locked(s); |
|
|
|
@ -1185,7 +1189,7 @@ void inproc_transport::DestroyStream(grpc_stream* gs, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void inproc_transport::Orphan() { |
|
|
|
|
INPROC_LOG(GPR_INFO, "destroy_transport %p", this); |
|
|
|
|
GRPC_TRACE_LOG(inproc, INFO) << "destroy_transport " << this; |
|
|
|
|
gpr_mu_lock(&mu->mu); |
|
|
|
|
close_transport_locked(this); |
|
|
|
|
gpr_mu_unlock(&mu->mu); |
|
|
|
@ -1218,7 +1222,7 @@ void inproc_transport::SetPollsetSet(grpc_stream* /*gs*/, |
|
|
|
|
//
|
|
|
|
|
void inproc_transports_create(grpc_core::Transport** server_transport, |
|
|
|
|
grpc_core::Transport** client_transport) { |
|
|
|
|
INPROC_LOG(GPR_INFO, "inproc_transports_create"); |
|
|
|
|
GRPC_TRACE_LOG(inproc, INFO) << "inproc_transports_create"; |
|
|
|
|
shared_mu* mu = new (gpr_malloc(sizeof(*mu))) shared_mu(); |
|
|
|
|
inproc_transport* st = |
|
|
|
|
new (gpr_malloc(sizeof(*st))) inproc_transport(mu, /*is_client=*/false); |
|
|
|
|