Stop using closure for inproc state machine, fix bugs

pull/21946/head
Vijay Pai 5 years ago
parent d6ecc3220a
commit 79de5c5385
  1. 64
      src/core/ext/transport/inproc/inproc_transport.cc

@ -51,7 +51,8 @@ grpc_slice g_fake_auth_value;
struct inproc_stream; struct inproc_stream;
bool cancel_stream_locked(inproc_stream* s, grpc_error* error); bool cancel_stream_locked(inproc_stream* s, grpc_error* error);
void op_state_machine(void* arg, grpc_error* error); void maybe_process_ops_locked(inproc_stream* s, grpc_error* error);
void op_state_machine_locked(inproc_stream* s, grpc_error* error);
void log_metadata(const grpc_metadata_batch* md_batch, bool is_client, void log_metadata(const grpc_metadata_batch* md_batch, bool is_client,
bool is_initial); bool is_initial);
grpc_error* fill_in_metadata(inproc_stream* s, grpc_error* fill_in_metadata(inproc_stream* s,
@ -130,8 +131,6 @@ struct inproc_stream {
grpc_metadata_batch_init(&to_read_initial_md); grpc_metadata_batch_init(&to_read_initial_md);
grpc_metadata_batch_init(&to_read_trailing_md); grpc_metadata_batch_init(&to_read_trailing_md);
GRPC_CLOSURE_INIT(&op_closure, op_state_machine, this,
grpc_schedule_on_exec_ctx);
grpc_metadata_batch_init(&write_buffer_initial_md); grpc_metadata_batch_init(&write_buffer_initial_md);
grpc_metadata_batch_init(&write_buffer_trailing_md); grpc_metadata_batch_init(&write_buffer_trailing_md);
@ -186,6 +185,7 @@ struct inproc_stream {
if (cs->write_buffer_cancel_error != GRPC_ERROR_NONE) { if (cs->write_buffer_cancel_error != GRPC_ERROR_NONE) {
cancel_other_error = cs->write_buffer_cancel_error; cancel_other_error = cs->write_buffer_cancel_error;
cs->write_buffer_cancel_error = GRPC_ERROR_NONE; cs->write_buffer_cancel_error = GRPC_ERROR_NONE;
maybe_process_ops_locked(this, cancel_other_error);
} }
gpr_mu_unlock(&t->mu->mu); gpr_mu_unlock(&t->mu->mu);
@ -235,8 +235,6 @@ struct inproc_stream {
grpc_metadata_batch to_read_trailing_md; grpc_metadata_batch to_read_trailing_md;
bool to_read_trailing_md_filled = false; bool to_read_trailing_md_filled = false;
bool ops_needed = false; bool ops_needed = false;
bool op_closure_scheduled = false;
grpc_closure op_closure;
// Write buffer used only during gap at init time when client-side // Write buffer used only during gap at init time when client-side
// stream is set up but server side stream is not yet set up // stream is set up but server side stream is not yet set up
grpc_metadata_batch write_buffer_initial_md; grpc_metadata_batch write_buffer_initial_md;
@ -396,12 +394,10 @@ void complete_if_batch_end_locked(inproc_stream* s, grpc_error* error,
} }
} }
void maybe_schedule_op_closure_locked(inproc_stream* s, grpc_error* error) { void maybe_process_ops_locked(inproc_stream* s, grpc_error* error) {
if (s && s->ops_needed && !s->op_closure_scheduled) { if (s && (error != GRPC_ERROR_NONE || s->ops_needed)) {
grpc_core::ExecCtx::Run(DEBUG_LOCATION, &s->op_closure,
GRPC_ERROR_REF(error));
s->op_closure_scheduled = true;
s->ops_needed = false; s->ops_needed = false;
op_state_machine_locked(s, error);
} }
} }
@ -429,7 +425,7 @@ void fail_helper_locked(inproc_stream* s, grpc_error* error) {
if (other->cancel_other_error == GRPC_ERROR_NONE) { if (other->cancel_other_error == GRPC_ERROR_NONE) {
other->cancel_other_error = GRPC_ERROR_REF(error); other->cancel_other_error = GRPC_ERROR_REF(error);
} }
maybe_schedule_op_closure_locked(other, error); maybe_process_ops_locked(other, error);
} else if (s->write_buffer_cancel_error == GRPC_ERROR_NONE) { } else if (s->write_buffer_cancel_error == GRPC_ERROR_NONE) {
s->write_buffer_cancel_error = GRPC_ERROR_REF(error); s->write_buffer_cancel_error = GRPC_ERROR_REF(error);
} }
@ -587,23 +583,17 @@ void message_transfer_locked(inproc_stream* sender, inproc_stream* receiver) {
sender->send_message_op = nullptr; sender->send_message_op = nullptr;
} }
void op_state_machine(void* arg, grpc_error* error) { void op_state_machine_locked(inproc_stream* s, grpc_error* error) {
// This function gets called when we have contents in the unprocessed reads // This function gets called when we have contents in the unprocessed reads
// Get what we want based on our ops wanted // Get what we want based on our ops wanted
// Schedule our appropriate closures // Schedule our appropriate closures
// and then return to ops_needed state if still needed // and then return to ops_needed state if still needed
// Since this is a closure directly invoked by the combiner, it should not
// unref the error parameter explicitly; the combiner will do that implicitly
grpc_error* new_err = GRPC_ERROR_NONE; grpc_error* new_err = GRPC_ERROR_NONE;
bool needs_close = false; bool needs_close = false;
INPROC_LOG(GPR_INFO, "op_state_machine %p", arg); INPROC_LOG(GPR_INFO, "op_state_machine %p", s);
inproc_stream* s = static_cast<inproc_stream*>(arg);
gpr_mu* mu = &s->t->mu->mu; // keep aside in case s gets closed
gpr_mu_lock(mu);
s->op_closure_scheduled = false;
// cancellation takes precedence // cancellation takes precedence
inproc_stream* other = s->other_side; inproc_stream* other = s->other_side;
@ -621,7 +611,7 @@ void op_state_machine(void* arg, grpc_error* error) {
if (s->send_message_op && other) { if (s->send_message_op && other) {
if (other->recv_message_op) { if (other->recv_message_op) {
message_transfer_locked(s, other); message_transfer_locked(s, other);
maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE); maybe_process_ops_locked(other, GRPC_ERROR_NONE);
} else if (!s->t->is_client && s->trailing_md_sent) { } else if (!s->t->is_client && s->trailing_md_sent) {
// A server send will never be matched if the server already sent status // A server send will never be matched if the server already sent status
s->send_message_op->payload->send_message.send_message.reset(); s->send_message_op->payload->send_message.send_message.reset();
@ -679,7 +669,7 @@ void op_state_machine(void* arg, grpc_error* error) {
needs_close = true; needs_close = true;
} }
} }
maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE); maybe_process_ops_locked(other, GRPC_ERROR_NONE);
complete_if_batch_end_locked( complete_if_batch_end_locked(
s, GRPC_ERROR_NONE, s->send_trailing_md_op, s, GRPC_ERROR_NONE, s->send_trailing_md_op,
"op_state_machine scheduling send-trailing-metadata-on-complete"); "op_state_machine scheduling send-trailing-metadata-on-complete");
@ -741,7 +731,7 @@ void op_state_machine(void* arg, grpc_error* error) {
if (s->recv_message_op) { if (s->recv_message_op) {
if (other && other->send_message_op) { if (other && other->send_message_op) {
message_transfer_locked(other, s); message_transfer_locked(other, s);
maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE); maybe_process_ops_locked(other, GRPC_ERROR_NONE);
} }
} }
if (s->to_read_trailing_md_filled) { if (s->to_read_trailing_md_filled) {
@ -808,7 +798,7 @@ void op_state_machine(void* arg, grpc_error* error) {
s->recv_trailing_md_op->on_complete, s->recv_trailing_md_op->on_complete,
GRPC_ERROR_REF(new_err)); GRPC_ERROR_REF(new_err));
s->recv_trailing_md_op = nullptr; s->recv_trailing_md_op = nullptr;
needs_close = true; needs_close = s->trailing_md_sent;
} else { } else {
INPROC_LOG(GPR_INFO, INPROC_LOG(GPR_INFO,
"op_state_machine %p server needs to delay handling " "op_state_machine %p server needs to delay handling "
@ -860,7 +850,6 @@ done:
close_other_side_locked(s, "op_state_machine"); close_other_side_locked(s, "op_state_machine");
close_stream_locked(s); close_stream_locked(s);
} }
gpr_mu_unlock(mu);
GRPC_ERROR_UNREF(new_err); GRPC_ERROR_UNREF(new_err);
} }
@ -870,7 +859,9 @@ bool cancel_stream_locked(inproc_stream* s, grpc_error* error) {
if (s->cancel_self_error == GRPC_ERROR_NONE) { if (s->cancel_self_error == GRPC_ERROR_NONE) {
ret = true; ret = true;
s->cancel_self_error = GRPC_ERROR_REF(error); s->cancel_self_error = GRPC_ERROR_REF(error);
maybe_schedule_op_closure_locked(s, s->cancel_self_error); // Catch current value of other before it gets closed off
inproc_stream* other = s->other_side;
maybe_process_ops_locked(s, s->cancel_self_error);
// Send trailing md to the other side indicating cancellation, even if we // Send trailing md to the other side indicating cancellation, even if we
// already have // already have
s->trailing_md_sent = true; s->trailing_md_sent = true;
@ -878,7 +869,6 @@ bool cancel_stream_locked(inproc_stream* s, grpc_error* error) {
grpc_metadata_batch cancel_md; grpc_metadata_batch cancel_md;
grpc_metadata_batch_init(&cancel_md); grpc_metadata_batch_init(&cancel_md);
inproc_stream* other = s->other_side;
grpc_metadata_batch* dest = (other == nullptr) grpc_metadata_batch* dest = (other == nullptr)
? &s->write_buffer_trailing_md ? &s->write_buffer_trailing_md
: &other->to_read_trailing_md; : &other->to_read_trailing_md;
@ -891,7 +881,7 @@ bool cancel_stream_locked(inproc_stream* s, grpc_error* error) {
if (other->cancel_other_error == GRPC_ERROR_NONE) { if (other->cancel_other_error == GRPC_ERROR_NONE) {
other->cancel_other_error = GRPC_ERROR_REF(s->cancel_self_error); other->cancel_other_error = GRPC_ERROR_REF(s->cancel_self_error);
} }
maybe_schedule_op_closure_locked(other, other->cancel_other_error); maybe_process_ops_locked(other, other->cancel_other_error);
} else if (s->write_buffer_cancel_error == GRPC_ERROR_NONE) { } else if (s->write_buffer_cancel_error == GRPC_ERROR_NONE) {
s->write_buffer_cancel_error = GRPC_ERROR_REF(s->cancel_self_error); s->write_buffer_cancel_error = GRPC_ERROR_REF(s->cancel_self_error);
} }
@ -969,8 +959,6 @@ void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
op->recv_trailing_metadata ? " recv_trailing_metadata" : ""); op->recv_trailing_metadata ? " recv_trailing_metadata" : "");
} }
bool needs_close = false;
inproc_stream* other = s->other_side; inproc_stream* other = s->other_side;
if (error == GRPC_ERROR_NONE && if (error == GRPC_ERROR_NONE &&
(op->send_initial_metadata || op->send_trailing_metadata)) { (op->send_initial_metadata || op->send_trailing_metadata)) {
@ -991,7 +979,7 @@ void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
INPROC_LOG(GPR_INFO, "Extra initial metadata %p", s); INPROC_LOG(GPR_INFO, "Extra initial metadata %p", s);
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Extra initial metadata"); error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Extra initial metadata");
} else { } else {
if (!other || !other->closed) { if (!s->other_side_closed) {
fill_in_metadata( fill_in_metadata(
s, op->payload->send_initial_metadata.send_initial_metadata, s, op->payload->send_initial_metadata.send_initial_metadata,
op->payload->send_initial_metadata.send_initial_metadata_flags, op->payload->send_initial_metadata.send_initial_metadata_flags,
@ -1005,7 +993,7 @@ void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
s->initial_md_sent = true; s->initial_md_sent = true;
} }
} }
maybe_schedule_op_closure_locked(other, error); maybe_process_ops_locked(other, error);
} }
} }
@ -1013,7 +1001,7 @@ void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
(op->send_message || op->send_trailing_metadata || (op->send_message || op->send_trailing_metadata ||
op->recv_initial_metadata || op->recv_message || op->recv_initial_metadata || op->recv_message ||
op->recv_trailing_metadata)) { op->recv_trailing_metadata)) {
// Mark ops that need to be processed by the closure // Mark ops that need to be processed by the state machine
if (op->send_message) { if (op->send_message) {
s->send_message_op = op; s->send_message_op = op;
} }
@ -1030,7 +1018,7 @@ void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
s->recv_trailing_md_op = op; s->recv_trailing_md_op = op;
} }
// We want to initiate the closure if: // We want to initiate the state machine if:
// 1. We want to send a message and the other side wants to receive // 1. We want to send a message and the other side wants to receive
// 2. We want to send trailing metadata and there isn't an unmatched send // 2. We want to send trailing metadata and there isn't an unmatched send
// or the other side wants trailing metadata // or the other side wants trailing metadata
@ -1044,11 +1032,7 @@ void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
(op->recv_initial_metadata && s->to_read_initial_md_filled) || (op->recv_initial_metadata && s->to_read_initial_md_filled) ||
(op->recv_message && other && other->send_message_op != nullptr) || (op->recv_message && other && other->send_message_op != nullptr) ||
(s->to_read_trailing_md_filled || s->trailing_md_recvd)) { (s->to_read_trailing_md_filled || s->trailing_md_recvd)) {
if (!s->op_closure_scheduled) { op_state_machine_locked(s, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, &s->op_closure,
GRPC_ERROR_NONE);
s->op_closure_scheduled = true;
}
} else { } else {
s->ops_needed = true; s->ops_needed = true;
} }
@ -1103,10 +1087,6 @@ void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
error); error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_complete, GRPC_ERROR_REF(error)); grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_complete, GRPC_ERROR_REF(error));
} }
if (needs_close) {
close_other_side_locked(s, "perform_stream_op:other_side");
close_stream_locked(s);
}
gpr_mu_unlock(mu); gpr_mu_unlock(mu);
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
} }

Loading…
Cancel
Save