Merge Master

pull/37190/head
tanvi-jagtap 8 months ago
commit 6dae72bc6e
  1. 194
      src/core/ext/transport/inproc/legacy_inproc_transport.cc
  2. 6
      test/distrib/ruby/run_distrib_test.sh
  3. 21
      tools/run_tests/artifacts/distribtest_targets.py

@ -40,7 +40,6 @@
#include <grpc/impl/connectivity_state.h> #include <grpc/impl/connectivity_state.h>
#include <grpc/status.h> #include <grpc/status.h>
#include <grpc/support/alloc.h> #include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h> #include <grpc/support/port_platform.h>
#include <grpc/support/sync.h> #include <grpc/support/sync.h>
@ -68,13 +67,6 @@
#include "src/core/lib/transport/transport.h" #include "src/core/lib/transport/transport.h"
#include "src/core/server/server.h" #include "src/core/server/server.h"
#define INPROC_LOG(...) \
do { \
if (GRPC_TRACE_FLAG_ENABLED(inproc)) { \
gpr_log(__VA_ARGS__); \
} \
} while (0)
namespace { namespace {
struct inproc_stream; struct inproc_stream;
bool cancel_stream_locked(inproc_stream* s, grpc_error_handle error); bool cancel_stream_locked(inproc_stream* s, grpc_error_handle error);
@ -148,16 +140,16 @@ struct inproc_transport final : public grpc_core::FilterStackTransport {
void Orphan() override; void Orphan() override;
void ref() { void ref() {
INPROC_LOG(GPR_INFO, "ref_transport %p", this); GRPC_TRACE_LOG(inproc, INFO) << "ref_transport " << this;
gpr_ref(&refs); gpr_ref(&refs);
} }
void unref() { void unref() {
INPROC_LOG(GPR_INFO, "unref_transport %p", this); GRPC_TRACE_LOG(inproc, INFO) << "unref_transport " << this;
if (!gpr_unref(&refs)) { if (!gpr_unref(&refs)) {
return; return;
} }
INPROC_LOG(GPR_INFO, "really_destroy_transport %p", this); GRPC_TRACE_LOG(inproc, INFO) << "really_destroy_transport " << this;
this->~inproc_transport(); this->~inproc_transport();
gpr_free(this); gpr_free(this);
} }
@ -201,8 +193,9 @@ struct inproc_stream {
// Pass the client-side stream address to the server-side for a ref // Pass the client-side stream address to the server-side for a ref
ref("inproc_init_stream:clt"); // ref it now on behalf of server ref("inproc_init_stream:clt"); // ref it now on behalf of server
// side to avoid destruction // side to avoid destruction
INPROC_LOG(GPR_INFO, "calling accept stream cb %p %p", GRPC_TRACE_LOG(inproc, INFO)
st->accept_stream_cb, st->accept_stream_data); << "calling accept stream cb " << st->accept_stream_cb << " "
<< st->accept_stream_data;
(*st->accept_stream_cb)(st->accept_stream_data, t, this); (*st->accept_stream_cb)(st->accept_stream_data, t, this);
} else { } else {
// This is the server-side and is being called through accept_stream_cb // This is the server-side and is being called through accept_stream_cb
@ -251,12 +244,12 @@ struct inproc_stream {
#define STREAM_UNREF(refs, reason) grpc_stream_unref(refs) #define STREAM_UNREF(refs, reason) grpc_stream_unref(refs)
#endif #endif
void ref(const char* reason) { void ref(const char* reason) {
INPROC_LOG(GPR_INFO, "ref_stream %p %s", this, reason); GRPC_TRACE_LOG(inproc, INFO) << "ref_stream " << this << " " << reason;
STREAM_REF(refs, reason); STREAM_REF(refs, reason);
} }
void unref(const char* reason) { void unref(const char* reason) {
INPROC_LOG(GPR_INFO, "unref_stream %p %s", this, reason); GRPC_TRACE_LOG(inproc, INFO) << "unref_stream " << this << " " << reason;
STREAM_UNREF(refs, reason); STREAM_UNREF(refs, reason);
} }
#undef STREAM_REF #undef STREAM_REF
@ -372,7 +365,8 @@ void inproc_transport::InitStream(grpc_stream* gs,
grpc_stream_refcount* refcount, grpc_stream_refcount* refcount,
const void* server_data, const void* server_data,
grpc_core::Arena* arena) { grpc_core::Arena* arena) {
INPROC_LOG(GPR_INFO, "init_stream %p %p %p", this, gs, server_data); GRPC_TRACE_LOG(inproc, INFO)
<< "init_stream " << this << " " << gs << " " << server_data;
new (gs) inproc_stream(this, refcount, server_data, arena); new (gs) inproc_stream(this, refcount, server_data, arena);
} }
@ -434,8 +428,9 @@ void complete_if_batch_end_locked(inproc_stream* s, grpc_error_handle error,
int is_rtm = static_cast<int>(op == s->recv_trailing_md_op); int is_rtm = static_cast<int>(op == s->recv_trailing_md_op);
if ((is_sm + is_stm + is_rim + is_rm + is_rtm) == 1) { if ((is_sm + is_stm + is_rim + is_rm + is_rtm) == 1) {
INPROC_LOG(GPR_INFO, "%s %p %p %p %s", msg, s, op, op->on_complete, GRPC_TRACE_LOG(inproc, INFO)
grpc_core::StatusToString(error).c_str()); << msg << " " << s << " " << op << " " << op->on_complete << " "
<< grpc_core::StatusToString(error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_complete, error); grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_complete, error);
} }
} }
@ -448,7 +443,7 @@ void maybe_process_ops_locked(inproc_stream* s, grpc_error_handle error) {
} }
void fail_helper_locked(inproc_stream* s, grpc_error_handle error) { void fail_helper_locked(inproc_stream* s, grpc_error_handle error) {
INPROC_LOG(GPR_INFO, "op_state_machine %p fail_helper", s); GRPC_TRACE_LOG(inproc, INFO) << "op_state_machine " << s << " fail_helper";
// If we're failing this side, we need to make sure that // If we're failing this side, we need to make sure that
// we also send or have already sent trailing metadata // we also send or have already sent trailing metadata
if (!s->trailing_md_sent) { if (!s->trailing_md_sent) {
@ -500,10 +495,10 @@ void fail_helper_locked(inproc_stream* s, grpc_error_handle error) {
*s->recv_initial_md_op->payload->recv_initial_metadata *s->recv_initial_md_op->payload->recv_initial_metadata
.trailing_metadata_available = true; .trailing_metadata_available = true;
} }
INPROC_LOG(GPR_INFO, GRPC_TRACE_LOG(inproc, INFO)
"fail_helper %p scheduling initial-metadata-ready %s %s", s, << "fail_helper " << s << " scheduling initial-metadata-ready "
grpc_core::StatusToString(error).c_str(), << grpc_core::StatusToString(error) << " "
grpc_core::StatusToString(err).c_str()); << grpc_core::StatusToString(err);
grpc_core::ExecCtx::Run( grpc_core::ExecCtx::Run(
DEBUG_LOCATION, DEBUG_LOCATION,
s->recv_initial_md_op->payload->recv_initial_metadata s->recv_initial_md_op->payload->recv_initial_metadata
@ -517,8 +512,9 @@ void fail_helper_locked(inproc_stream* s, grpc_error_handle error) {
s->recv_initial_md_op = nullptr; s->recv_initial_md_op = nullptr;
} }
if (s->recv_message_op) { if (s->recv_message_op) {
INPROC_LOG(GPR_INFO, "fail_helper %p scheduling message-ready %s", s, GRPC_TRACE_LOG(inproc, INFO)
grpc_core::StatusToString(error).c_str()); << "fail_helper " << s << " scheduling message-ready "
<< grpc_core::StatusToString(error);
if (s->recv_message_op->payload->recv_message if (s->recv_message_op->payload->recv_message
.call_failed_before_recv_message != nullptr) { .call_failed_before_recv_message != nullptr) {
*s->recv_message_op->payload->recv_message *s->recv_message_op->payload->recv_message
@ -546,15 +542,17 @@ void fail_helper_locked(inproc_stream* s, grpc_error_handle error) {
s->send_trailing_md_op = nullptr; s->send_trailing_md_op = nullptr;
} }
if (s->recv_trailing_md_op) { if (s->recv_trailing_md_op) {
INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-metadata-ready %s", GRPC_TRACE_LOG(inproc, INFO)
s, grpc_core::StatusToString(error).c_str()); << "fail_helper " << s << " scheduling trailing-metadata-ready "
<< grpc_core::StatusToString(error);
grpc_core::ExecCtx::Run( grpc_core::ExecCtx::Run(
DEBUG_LOCATION, DEBUG_LOCATION,
s->recv_trailing_md_op->payload->recv_trailing_metadata s->recv_trailing_md_op->payload->recv_trailing_metadata
.recv_trailing_metadata_ready, .recv_trailing_metadata_ready,
error); error);
INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-md-on-complete %s", GRPC_TRACE_LOG(inproc, INFO)
s, grpc_core::StatusToString(error).c_str()); << "fail_helper " << s << " scheduling trailing-md-on-complete "
<< grpc_core::StatusToString(error);
complete_if_batch_end_locked( complete_if_batch_end_locked(
s, error, s->recv_trailing_md_op, s, error, s->recv_trailing_md_op,
"fail_helper scheduling recv-trailing-metadata-on-complete"); "fail_helper scheduling recv-trailing-metadata-on-complete");
@ -578,8 +576,8 @@ void message_transfer_locked(inproc_stream* sender, inproc_stream* receiver) {
*receiver->recv_message_op->payload->recv_message.flags = *receiver->recv_message_op->payload->recv_message.flags =
sender->send_message_op->payload->send_message.flags; sender->send_message_op->payload->send_message.flags;
INPROC_LOG(GPR_INFO, "message_transfer_locked %p scheduling message-ready", GRPC_TRACE_LOG(inproc, INFO)
receiver); << "message_transfer_locked " << receiver << " scheduling message-ready";
grpc_core::ExecCtx::Run( grpc_core::ExecCtx::Run(
DEBUG_LOCATION, DEBUG_LOCATION,
receiver->recv_message_op->payload->recv_message.recv_message_ready, receiver->recv_message_op->payload->recv_message.recv_message_ready,
@ -605,7 +603,7 @@ void op_state_machine_locked(inproc_stream* s, grpc_error_handle error) {
bool needs_close = false; bool needs_close = false;
INPROC_LOG(GPR_INFO, "op_state_machine %p", s); GRPC_TRACE_LOG(inproc, INFO) << "op_state_machine " << s;
// cancellation takes precedence // cancellation takes precedence
inproc_stream* other = s->other_side; inproc_stream* other = s->other_side;
@ -652,7 +650,7 @@ void op_state_machine_locked(inproc_stream* s, grpc_error_handle error) {
: &other->to_read_trailing_md_filled; : &other->to_read_trailing_md_filled;
if (*destfilled || s->trailing_md_sent) { if (*destfilled || s->trailing_md_sent) {
// The buffer is already in use; that's an error! // The buffer is already in use; that's an error!
INPROC_LOG(GPR_INFO, "Extra trailing metadata %p", s); GRPC_TRACE_LOG(inproc, INFO) << "Extra trailing metadata " << s;
new_err = GRPC_ERROR_CREATE("Extra trailing metadata"); new_err = GRPC_ERROR_CREATE("Extra trailing metadata");
fail_helper_locked(s, new_err); fail_helper_locked(s, new_err);
goto done; goto done;
@ -668,15 +666,15 @@ void op_state_machine_locked(inproc_stream* s, grpc_error_handle error) {
*s->send_trailing_md_op->payload->send_trailing_metadata.sent = true; *s->send_trailing_md_op->payload->send_trailing_metadata.sent = true;
} }
if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) { if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
INPROC_LOG(GPR_INFO, GRPC_TRACE_LOG(inproc, INFO) << "op_state_machine " << s
"op_state_machine %p scheduling trailing-metadata-ready", s); << " scheduling trailing-metadata-ready";
grpc_core::ExecCtx::Run( grpc_core::ExecCtx::Run(
DEBUG_LOCATION, DEBUG_LOCATION,
s->recv_trailing_md_op->payload->recv_trailing_metadata s->recv_trailing_md_op->payload->recv_trailing_metadata
.recv_trailing_metadata_ready, .recv_trailing_metadata_ready,
absl::OkStatus()); absl::OkStatus());
INPROC_LOG(GPR_INFO, GRPC_TRACE_LOG(inproc, INFO) << "op_state_machine " << s
"op_state_machine %p scheduling trailing-md-on-complete", s); << " scheduling trailing-md-on-complete";
grpc_core::ExecCtx::Run(DEBUG_LOCATION, grpc_core::ExecCtx::Run(DEBUG_LOCATION,
s->recv_trailing_md_op->on_complete, s->recv_trailing_md_op->on_complete,
absl::OkStatus()); absl::OkStatus());
@ -693,11 +691,11 @@ void op_state_machine_locked(inproc_stream* s, grpc_error_handle error) {
if (s->recv_initial_md_op) { if (s->recv_initial_md_op) {
if (s->initial_md_recvd) { if (s->initial_md_recvd) {
new_err = GRPC_ERROR_CREATE("Already recvd initial md"); new_err = GRPC_ERROR_CREATE("Already recvd initial md");
INPROC_LOG( GRPC_TRACE_LOG(inproc, INFO)
GPR_INFO, << "op_state_machine " << s
"op_state_machine %p scheduling on_complete errors for already " << " scheduling on_complete errors for already "
"recvd initial md %s", "recvd initial md "
s, grpc_core::StatusToString(new_err).c_str()); << grpc_core::StatusToString(new_err);
fail_helper_locked(s, new_err); fail_helper_locked(s, new_err);
goto done; goto done;
} }
@ -748,20 +746,20 @@ void op_state_machine_locked(inproc_stream* s, grpc_error_handle error) {
if (s->to_read_trailing_md_filled) { if (s->to_read_trailing_md_filled) {
if (s->trailing_md_recvd) { if (s->trailing_md_recvd) {
if (s->trailing_md_recvd_implicit_only) { if (s->trailing_md_recvd_implicit_only) {
INPROC_LOG(GPR_INFO, GRPC_TRACE_LOG(inproc, INFO)
"op_state_machine %p already implicitly received trailing " << "op_state_machine " << s
"metadata, so ignoring new trailing metadata from client", << " already implicitly received trailing metadata, so "
s); "ignoring new trailing metadata from client";
s->to_read_trailing_md.Clear(); s->to_read_trailing_md.Clear();
s->to_read_trailing_md_filled = false; s->to_read_trailing_md_filled = false;
s->trailing_md_recvd_implicit_only = false; s->trailing_md_recvd_implicit_only = false;
} else { } else {
new_err = GRPC_ERROR_CREATE("Already recvd trailing md"); new_err = GRPC_ERROR_CREATE("Already recvd trailing md");
INPROC_LOG( GRPC_TRACE_LOG(inproc, INFO)
GPR_INFO, << "op_state_machine " << s
"op_state_machine %p scheduling on_complete errors for already " << " scheduling on_complete errors for already recvd trailing "
"recvd trailing md %s", "md "
s, grpc_core::StatusToString(new_err).c_str()); << grpc_core::StatusToString(new_err);
fail_helper_locked(s, new_err); fail_helper_locked(s, new_err);
goto done; goto done;
} }
@ -770,7 +768,8 @@ void op_state_machine_locked(inproc_stream* s, grpc_error_handle error) {
// This message needs to be wrapped up because it will never be // This message needs to be wrapped up because it will never be
// satisfied // satisfied
s->recv_message_op->payload->recv_message.recv_message->reset(); s->recv_message_op->payload->recv_message.recv_message->reset();
INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling message-ready", s); GRPC_TRACE_LOG(inproc, INFO)
<< "op_state_machine " << s << " scheduling message-ready";
grpc_core::ExecCtx::Run( grpc_core::ExecCtx::Run(
DEBUG_LOCATION, DEBUG_LOCATION,
s->recv_message_op->payload->recv_message.recv_message_ready, s->recv_message_op->payload->recv_message.recv_message_ready,
@ -821,9 +820,9 @@ void op_state_machine_locked(inproc_stream* s, grpc_error_handle error) {
needs_close = s->trailing_md_sent; needs_close = s->trailing_md_sent;
} }
} else if (!s->trailing_md_recvd) { } else if (!s->trailing_md_recvd) {
INPROC_LOG( GRPC_TRACE_LOG(inproc, INFO)
GPR_INFO, << "op_state_machine " << s
"op_state_machine %p has trailing md but not yet waiting for it", s); << " has trailing md but not yet waiting for it";
} }
} }
if (!s->t->is_client && s->trailing_md_sent && if (!s->t->is_client && s->trailing_md_sent &&
@ -831,8 +830,9 @@ void op_state_machine_locked(inproc_stream* s, grpc_error_handle error) {
// In this case, we don't care to receive the write-close from the client // In this case, we don't care to receive the write-close from the client
// because we have already sent status and the RPC is over as far as we // because we have already sent status and the RPC is over as far as we
// are concerned. // are concerned.
INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling trailing-md-ready %s", GRPC_TRACE_LOG(inproc, INFO)
s, grpc_core::StatusToString(new_err).c_str()); << "op_state_machine " << s << " scheduling trailing-md-ready "
<< grpc_core::StatusToString(new_err);
grpc_core::ExecCtx::Run( grpc_core::ExecCtx::Run(
DEBUG_LOCATION, DEBUG_LOCATION,
s->recv_trailing_md_op->payload->recv_trailing_metadata s->recv_trailing_md_op->payload->recv_trailing_metadata
@ -850,7 +850,8 @@ void op_state_machine_locked(inproc_stream* s, grpc_error_handle error) {
if (s->trailing_md_recvd && s->recv_message_op) { if (s->trailing_md_recvd && s->recv_message_op) {
// No further message will come on this stream, so finish off the // No further message will come on this stream, so finish off the
// recv_message_op // recv_message_op
INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling message-ready", s); GRPC_TRACE_LOG(inproc, INFO)
<< "op_state_machine " << s << " scheduling message-ready";
s->recv_message_op->payload->recv_message.recv_message->reset(); s->recv_message_op->payload->recv_message.recv_message->reset();
grpc_core::ExecCtx::Run( grpc_core::ExecCtx::Run(
DEBUG_LOCATION, DEBUG_LOCATION,
@ -872,12 +873,12 @@ void op_state_machine_locked(inproc_stream* s, grpc_error_handle error) {
} }
if (s->send_message_op || s->send_trailing_md_op || s->recv_initial_md_op || if (s->send_message_op || s->send_trailing_md_op || s->recv_initial_md_op ||
s->recv_message_op || s->recv_trailing_md_op) { s->recv_message_op || s->recv_trailing_md_op) {
// Didn't get the item we wanted so we still need to get // Didn't get the item we wanted so we still need to get rescheduled
// rescheduled GRPC_TRACE_LOG(inproc, INFO)
INPROC_LOG( << "op_state_machine " << s << " still needs closure "
GPR_INFO, "op_state_machine %p still needs closure %p %p %p %p %p", s, << s->send_message_op << " " << s->send_trailing_md_op << " "
s->send_message_op, s->send_trailing_md_op, s->recv_initial_md_op, << s->recv_initial_md_op << " " << s->recv_message_op << " "
s->recv_message_op, s->recv_trailing_md_op); << s->recv_trailing_md_op;
s->ops_needed = true; s->ops_needed = true;
} }
done: done:
@ -889,8 +890,8 @@ done:
bool cancel_stream_locked(inproc_stream* s, grpc_error_handle error) { bool cancel_stream_locked(inproc_stream* s, grpc_error_handle error) {
bool ret = false; // was the cancel accepted bool ret = false; // was the cancel accepted
INPROC_LOG(GPR_INFO, "cancel_stream %p with %s", s, GRPC_TRACE_LOG(inproc, INFO)
grpc_core::StatusToString(error).c_str()); << "cancel_stream " << s << " with " << grpc_core::StatusToString(error);
if (s->cancel_self_error.ok()) { if (s->cancel_self_error.ok()) {
ret = true; ret = true;
s->cancel_self_error = error; s->cancel_self_error = error;
@ -943,7 +944,8 @@ bool cancel_stream_locked(inproc_stream* s, grpc_error_handle error) {
void inproc_transport::PerformStreamOp(grpc_stream* gs, void inproc_transport::PerformStreamOp(grpc_stream* gs,
grpc_transport_stream_op_batch* op) { grpc_transport_stream_op_batch* op) {
INPROC_LOG(GPR_INFO, "perform_stream_op %p %p %p", this, gs, op); GRPC_TRACE_LOG(inproc, INFO)
<< "perform_stream_op " << this << " " << gs << " " << op;
inproc_stream* s = reinterpret_cast<inproc_stream*>(gs); inproc_stream* s = reinterpret_cast<inproc_stream*>(gs);
gpr_mu* mu = &s->t->mu->mu; // save aside in case s gets closed gpr_mu* mu = &s->t->mu->mu; // save aside in case s gets closed
gpr_mu_lock(mu); gpr_mu_lock(mu);
@ -979,14 +981,15 @@ void inproc_transport::PerformStreamOp(grpc_stream* gs,
// already self-canceled so still give it an error // already self-canceled so still give it an error
error = s->cancel_self_error; error = s->cancel_self_error;
} else { } else {
INPROC_LOG(GPR_INFO, "perform_stream_op %p %s%s%s%s%s%s%s", s, GRPC_TRACE_LOG(inproc, INFO)
s->t->is_client ? "client" : "server", << "perform_stream_op " << s
op->send_initial_metadata ? " send_initial_metadata" : "", << (s->t->is_client ? " client" : " server")
op->send_message ? " send_message" : "", << (op->send_initial_metadata ? " send_initial_metadata" : "")
op->send_trailing_metadata ? " send_trailing_metadata" : "", << (op->send_message ? " send_message" : "")
op->recv_initial_metadata ? " recv_initial_metadata" : "", << (op->send_trailing_metadata ? " send_trailing_metadata" : "")
op->recv_message ? " recv_message" : "", << (op->recv_initial_metadata ? " recv_initial_metadata" : "")
op->recv_trailing_metadata ? " recv_trailing_metadata" : ""); << (op->recv_message ? " recv_message" : "")
<< (op->recv_trailing_metadata ? " recv_trailing_metadata" : "");
} }
inproc_stream* other = s->other_side; inproc_stream* other = s->other_side;
@ -1002,7 +1005,7 @@ void inproc_transport::PerformStreamOp(grpc_stream* gs,
: &other->to_read_initial_md_filled; : &other->to_read_initial_md_filled;
if (*destfilled || s->initial_md_sent) { if (*destfilled || s->initial_md_sent) {
// The buffer is already in use; that's an error! // The buffer is already in use; that's an error!
INPROC_LOG(GPR_INFO, "Extra initial metadata %p", s); GRPC_TRACE_LOG(inproc, INFO) << "Extra initial metadata " << s;
error = GRPC_ERROR_CREATE("Extra initial metadata"); error = GRPC_ERROR_CREATE("Extra initial metadata");
} else { } else {
if (!s->other_side_closed) { if (!s->other_side_closed) {
@ -1080,20 +1083,18 @@ void inproc_transport::PerformStreamOp(grpc_stream* gs,
*op->payload->recv_initial_metadata.trailing_metadata_available = *op->payload->recv_initial_metadata.trailing_metadata_available =
true; true;
} }
INPROC_LOG( GRPC_TRACE_LOG(inproc, INFO) << "perform_stream_op error " << s
GPR_INFO, << " scheduling initial-metadata-ready "
"perform_stream_op error %p scheduling initial-metadata-ready %s", << grpc_core::StatusToString(error);
s, grpc_core::StatusToString(error).c_str());
grpc_core::ExecCtx::Run( grpc_core::ExecCtx::Run(
DEBUG_LOCATION, DEBUG_LOCATION,
op->payload->recv_initial_metadata.recv_initial_metadata_ready, op->payload->recv_initial_metadata.recv_initial_metadata_ready,
error); error);
} }
if (op->recv_message) { if (op->recv_message) {
INPROC_LOG( GRPC_TRACE_LOG(inproc, INFO) << "perform_stream_op error " << s
GPR_INFO, << " scheduling recv message-ready "
"perform_stream_op error %p scheduling recv message-ready %s", s, << grpc_core::StatusToString(error);
grpc_core::StatusToString(error).c_str());
if (op->payload->recv_message.call_failed_before_recv_message != if (op->payload->recv_message.call_failed_before_recv_message !=
nullptr) { nullptr) {
*op->payload->recv_message.call_failed_before_recv_message = true; *op->payload->recv_message.call_failed_before_recv_message = true;
@ -1103,25 +1104,27 @@ void inproc_transport::PerformStreamOp(grpc_stream* gs,
error); error);
} }
if (op->recv_trailing_metadata) { if (op->recv_trailing_metadata) {
INPROC_LOG(GPR_INFO, GRPC_TRACE_LOG(inproc, INFO) << "perform_stream_op error " << s
"perform_stream_op error %p scheduling " << " scheduling trailing-metadata-ready "
"trailing-metadata-ready %s", << grpc_core::StatusToString(error);
s, grpc_core::StatusToString(error).c_str());
grpc_core::ExecCtx::Run( grpc_core::ExecCtx::Run(
DEBUG_LOCATION, DEBUG_LOCATION,
op->payload->recv_trailing_metadata.recv_trailing_metadata_ready, op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
error); error);
} }
} }
INPROC_LOG(GPR_INFO, "perform_stream_op %p scheduling on_complete %s", s, GRPC_TRACE_LOG(inproc, INFO)
grpc_core::StatusToString(error).c_str()); << "perform_stream_op " << s << " scheduling on_complete "
<< grpc_core::StatusToString(error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_complete, error); grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_complete, error);
} }
gpr_mu_unlock(mu); gpr_mu_unlock(mu);
} }
void close_transport_locked(inproc_transport* t) { void close_transport_locked(inproc_transport* t) {
INPROC_LOG(GPR_INFO, "close_transport %p %d", t, t->is_closed); GRPC_TRACE_LOG(inproc, INFO)
<< "close_transport " << t << " " << t->is_closed;
t->state_tracker.SetState(GRPC_CHANNEL_SHUTDOWN, absl::Status(), t->state_tracker.SetState(GRPC_CHANNEL_SHUTDOWN, absl::Status(),
"close transport"); "close transport");
if (!t->is_closed) { if (!t->is_closed) {
@ -1139,7 +1142,7 @@ void close_transport_locked(inproc_transport* t) {
} }
void inproc_transport::PerformOp(grpc_transport_op* op) { void inproc_transport::PerformOp(grpc_transport_op* op) {
INPROC_LOG(GPR_INFO, "perform_transport_op %p %p", this, op); GRPC_TRACE_LOG(inproc, INFO) << "perform_transport_op " << this << " " << op;
gpr_mu_lock(&mu->mu); gpr_mu_lock(&mu->mu);
if (op->start_connectivity_watch != nullptr) { if (op->start_connectivity_watch != nullptr) {
state_tracker.AddWatcher(op->start_connectivity_watch_state, state_tracker.AddWatcher(op->start_connectivity_watch_state,
@ -1173,7 +1176,8 @@ void inproc_transport::PerformOp(grpc_transport_op* op) {
void inproc_transport::DestroyStream(grpc_stream* gs, void inproc_transport::DestroyStream(grpc_stream* gs,
grpc_closure* then_schedule_closure) { grpc_closure* then_schedule_closure) {
INPROC_LOG(GPR_INFO, "destroy_stream %p %p", gs, then_schedule_closure); GRPC_TRACE_LOG(inproc, INFO)
<< "destroy_stream " << gs << " " << then_schedule_closure;
inproc_stream* s = reinterpret_cast<inproc_stream*>(gs); inproc_stream* s = reinterpret_cast<inproc_stream*>(gs);
gpr_mu_lock(&mu->mu); gpr_mu_lock(&mu->mu);
close_stream_locked(s); close_stream_locked(s);
@ -1184,7 +1188,7 @@ void inproc_transport::DestroyStream(grpc_stream* gs,
} }
void inproc_transport::Orphan() { void inproc_transport::Orphan() {
INPROC_LOG(GPR_INFO, "destroy_transport %p", this); GRPC_TRACE_LOG(inproc, INFO) << "destroy_transport " << this;
gpr_mu_lock(&mu->mu); gpr_mu_lock(&mu->mu);
close_transport_locked(this); close_transport_locked(this);
gpr_mu_unlock(&mu->mu); gpr_mu_unlock(&mu->mu);
@ -1217,7 +1221,7 @@ void inproc_transport::SetPollsetSet(grpc_stream* /*gs*/,
// //
void inproc_transports_create(grpc_core::Transport** server_transport, void inproc_transports_create(grpc_core::Transport** server_transport,
grpc_core::Transport** client_transport) { grpc_core::Transport** client_transport) {
INPROC_LOG(GPR_INFO, "inproc_transports_create"); GRPC_TRACE_LOG(inproc, INFO) << "inproc_transports_create";
shared_mu* mu = new (gpr_malloc(sizeof(*mu))) shared_mu(); shared_mu* mu = new (gpr_malloc(sizeof(*mu))) shared_mu();
inproc_transport* st = inproc_transport* st =
new (gpr_malloc(sizeof(*st))) inproc_transport(mu, /*is_client=*/false); new (gpr_malloc(sizeof(*st))) inproc_transport(mu, /*is_client=*/false);

@ -25,6 +25,8 @@ function die {
ARCH="$1" ARCH="$1"
PLATFORM="$2" PLATFORM="$2"
PACKAGE_TYPE="$3" PACKAGE_TYPE="$3"
PROTOBUF_VERSION="$4"
echo "$EXTERNAL_GIT_ROOT" echo "$EXTERNAL_GIT_ROOT"
GRPC_VERSION="$(ruby -e 'require ENV["EXTERNAL_GIT_ROOT"] + "/src/ruby/lib/grpc/version.rb"; puts GRPC::VERSION')" GRPC_VERSION="$(ruby -e 'require ENV["EXTERNAL_GIT_ROOT"] + "/src/ruby/lib/grpc/version.rb"; puts GRPC::VERSION')"
if [[ "$PACKAGE_TYPE" == "source" ]]; then if [[ "$PACKAGE_TYPE" == "source" ]]; then
@ -46,6 +48,10 @@ fi;
gem install builder gem install builder
gem generate_index --directory "${GEM_SOURCE}" gem generate_index --directory "${GEM_SOURCE}"
if [[ -n "$PROTOBUF_VERSION" ]]; then
bundle add google-protobuf --version "~> $PROTOBUF_VERSION"
fi;
bundle install bundle install
bundle exec ./distribtest.rb bundle exec ./distribtest.rb

@ -220,6 +220,7 @@ class RubyDistribTest(object):
ruby_version=None, ruby_version=None,
source=False, source=False,
presubmit=False, presubmit=False,
protobuf_version="",
): ):
self.package_type = "binary" self.package_type = "binary"
if source: if source:
@ -231,10 +232,13 @@ class RubyDistribTest(object):
ruby_version or "unspecified", ruby_version or "unspecified",
self.package_type, self.package_type,
) )
if not protobuf_version == "":
self.name += "_protobuf_%s" % protobuf_version
self.platform = platform self.platform = platform
self.arch = arch self.arch = arch
self.docker_suffix = docker_suffix self.docker_suffix = docker_suffix
self.ruby_version = ruby_version self.ruby_version = ruby_version
self.protobuf_version = protobuf_version
self.labels = ["distribtest", "ruby", platform, arch, docker_suffix] self.labels = ["distribtest", "ruby", platform, arch, docker_suffix]
if presubmit: if presubmit:
self.labels.append("presubmit") self.labels.append("presubmit")
@ -261,8 +265,13 @@ class RubyDistribTest(object):
return create_docker_jobspec( return create_docker_jobspec(
self.name, self.name,
dockerfile_name, dockerfile_name,
"test/distrib/ruby/run_distrib_test.sh %s %s %s" "test/distrib/ruby/run_distrib_test.sh %s %s %s %s"
% (arch_to_gem_arch[self.arch], self.platform, self.package_type), % (
arch_to_gem_arch[self.arch],
self.platform,
self.package_type,
self.protobuf_version,
),
copy_rel_path="test/distrib", copy_rel_path="test/distrib",
) )
@ -485,6 +494,14 @@ def targets():
RubyDistribTest( RubyDistribTest(
"linux", "x64", "debian11", ruby_version="ruby_3_3", presubmit=True "linux", "x64", "debian11", ruby_version="ruby_3_3", presubmit=True
), ),
RubyDistribTest(
"linux",
"x64",
"debian11",
ruby_version="ruby_3_3",
protobuf_version="3.25",
presubmit=True,
),
RubyDistribTest("linux", "x64", "centos7"), RubyDistribTest("linux", "x64", "centos7"),
RubyDistribTest("linux", "x64", "ubuntu2004"), RubyDistribTest("linux", "x64", "ubuntu2004"),
RubyDistribTest("linux", "x64", "ubuntu2204", presubmit=True), RubyDistribTest("linux", "x64", "ubuntu2204", presubmit=True),

Loading…
Cancel
Save