diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc index 1271e79abf7..7b24347c073 100644 --- a/src/core/ext/transport/inproc/inproc_transport.cc +++ b/src/core/ext/transport/inproc/inproc_transport.cc @@ -51,7 +51,8 @@ grpc_slice g_fake_auth_value; struct inproc_stream; bool cancel_stream_locked(inproc_stream* s, grpc_error* error); -void op_state_machine(void* arg, grpc_error* error); +void maybe_process_ops_locked(inproc_stream* s, grpc_error* error); +void op_state_machine_locked(inproc_stream* s, grpc_error* error); void log_metadata(const grpc_metadata_batch* md_batch, bool is_client, bool is_initial); grpc_error* fill_in_metadata(inproc_stream* s, @@ -130,8 +131,6 @@ struct inproc_stream { grpc_metadata_batch_init(&to_read_initial_md); grpc_metadata_batch_init(&to_read_trailing_md); - GRPC_CLOSURE_INIT(&op_closure, op_state_machine, this, - grpc_schedule_on_exec_ctx); grpc_metadata_batch_init(&write_buffer_initial_md); grpc_metadata_batch_init(&write_buffer_trailing_md); @@ -186,6 +185,7 @@ struct inproc_stream { if (cs->write_buffer_cancel_error != GRPC_ERROR_NONE) { cancel_other_error = cs->write_buffer_cancel_error; cs->write_buffer_cancel_error = GRPC_ERROR_NONE; + maybe_process_ops_locked(this, cancel_other_error); } gpr_mu_unlock(&t->mu->mu); @@ -235,8 +235,6 @@ struct inproc_stream { grpc_metadata_batch to_read_trailing_md; bool to_read_trailing_md_filled = false; bool ops_needed = false; - bool op_closure_scheduled = false; - grpc_closure op_closure; // Write buffer used only during gap at init time when client-side // stream is set up but server side stream is not yet set up grpc_metadata_batch write_buffer_initial_md; @@ -396,12 +394,10 @@ void complete_if_batch_end_locked(inproc_stream* s, grpc_error* error, } } -void maybe_schedule_op_closure_locked(inproc_stream* s, grpc_error* error) { - if (s && s->ops_needed && !s->op_closure_scheduled) { - grpc_core::ExecCtx::Run(DEBUG_LOCATION, &s->op_closure, - GRPC_ERROR_REF(error)); - s->op_closure_scheduled = true; +void maybe_process_ops_locked(inproc_stream* s, grpc_error* error) { + if (s && (error != GRPC_ERROR_NONE || s->ops_needed)) { s->ops_needed = false; + op_state_machine_locked(s, error); } } @@ -429,7 +425,7 @@ void fail_helper_locked(inproc_stream* s, grpc_error* error) { if (other->cancel_other_error == GRPC_ERROR_NONE) { other->cancel_other_error = GRPC_ERROR_REF(error); } - maybe_schedule_op_closure_locked(other, error); + maybe_process_ops_locked(other, error); } else if (s->write_buffer_cancel_error == GRPC_ERROR_NONE) { s->write_buffer_cancel_error = GRPC_ERROR_REF(error); } @@ -587,23 +583,17 @@ void message_transfer_locked(inproc_stream* sender, inproc_stream* receiver) { sender->send_message_op = nullptr; } -void op_state_machine(void* arg, grpc_error* error) { +void op_state_machine_locked(inproc_stream* s, grpc_error* error) { // This function gets called when we have contents in the unprocessed reads // Get what we want based on our ops wanted // Schedule our appropriate closures // and then return to ops_needed state if still needed - // Since this is a closure directly invoked by the combiner, it should not - // unref the error parameter explicitly; the combiner will do that implicitly grpc_error* new_err = GRPC_ERROR_NONE; bool needs_close = false; - INPROC_LOG(GPR_INFO, "op_state_machine %p", arg); - inproc_stream* s = static_cast(arg); - gpr_mu* mu = &s->t->mu->mu; // keep aside in case s gets closed - gpr_mu_lock(mu); - s->op_closure_scheduled = false; + INPROC_LOG(GPR_INFO, "op_state_machine %p", s); // cancellation takes precedence inproc_stream* other = s->other_side; @@ -621,7 +611,7 @@ void op_state_machine(void* arg, grpc_error* error) { if (s->send_message_op && other) { if (other->recv_message_op) { message_transfer_locked(s, other); - maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE); + maybe_process_ops_locked(other, GRPC_ERROR_NONE); } else if (!s->t->is_client && s->trailing_md_sent) { // A server send will never be matched if the server already sent status s->send_message_op->payload->send_message.send_message.reset(); @@ -679,7 +669,7 @@ void op_state_machine(void* arg, grpc_error* error) { needs_close = true; } } - maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE); + maybe_process_ops_locked(other, GRPC_ERROR_NONE); complete_if_batch_end_locked( s, GRPC_ERROR_NONE, s->send_trailing_md_op, "op_state_machine scheduling send-trailing-metadata-on-complete"); @@ -741,7 +731,7 @@ void op_state_machine(void* arg, grpc_error* error) { if (s->recv_message_op) { if (other && other->send_message_op) { message_transfer_locked(other, s); - maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE); + maybe_process_ops_locked(other, GRPC_ERROR_NONE); } } if (s->to_read_trailing_md_filled) { @@ -808,7 +798,7 @@ void op_state_machine(void* arg, grpc_error* error) { s->recv_trailing_md_op->on_complete, GRPC_ERROR_REF(new_err)); s->recv_trailing_md_op = nullptr; - needs_close = true; + needs_close = s->trailing_md_sent; } else { INPROC_LOG(GPR_INFO, "op_state_machine %p server needs to delay handling " @@ -860,7 +850,6 @@ done: close_other_side_locked(s, "op_state_machine"); close_stream_locked(s); } - gpr_mu_unlock(mu); GRPC_ERROR_UNREF(new_err); } @@ -870,7 +859,9 @@ bool cancel_stream_locked(inproc_stream* s, grpc_error* error) { if (s->cancel_self_error == GRPC_ERROR_NONE) { ret = true; s->cancel_self_error = GRPC_ERROR_REF(error); - maybe_schedule_op_closure_locked(s, s->cancel_self_error); + // Catch current value of other before it gets closed off + inproc_stream* other = s->other_side; + maybe_process_ops_locked(s, s->cancel_self_error); // Send trailing md to the other side indicating cancellation, even if we // already have s->trailing_md_sent = true; @@ -878,7 +869,6 @@ bool cancel_stream_locked(inproc_stream* s, grpc_error* error) { grpc_metadata_batch cancel_md; grpc_metadata_batch_init(&cancel_md); - inproc_stream* other = s->other_side; grpc_metadata_batch* dest = (other == nullptr) ? &s->write_buffer_trailing_md : &other->to_read_trailing_md; @@ -891,7 +881,7 @@ bool cancel_stream_locked(inproc_stream* s, grpc_error* error) { if (other->cancel_other_error == GRPC_ERROR_NONE) { other->cancel_other_error = GRPC_ERROR_REF(s->cancel_self_error); } - maybe_schedule_op_closure_locked(other, other->cancel_other_error); + maybe_process_ops_locked(other, other->cancel_other_error); } else if (s->write_buffer_cancel_error == GRPC_ERROR_NONE) { s->write_buffer_cancel_error = GRPC_ERROR_REF(s->cancel_self_error); } @@ -969,8 +959,6 @@ void perform_stream_op(grpc_transport* gt, grpc_stream* gs, op->recv_trailing_metadata ? " recv_trailing_metadata" : ""); } - bool needs_close = false; - inproc_stream* other = s->other_side; if (error == GRPC_ERROR_NONE && (op->send_initial_metadata || op->send_trailing_metadata)) { @@ -991,7 +979,7 @@ void perform_stream_op(grpc_transport* gt, grpc_stream* gs, INPROC_LOG(GPR_INFO, "Extra initial metadata %p", s); error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Extra initial metadata"); } else { - if (!other || !other->closed) { + if (!s->other_side_closed) { fill_in_metadata( s, op->payload->send_initial_metadata.send_initial_metadata, op->payload->send_initial_metadata.send_initial_metadata_flags, @@ -1005,7 +993,7 @@ void perform_stream_op(grpc_transport* gt, grpc_stream* gs, s->initial_md_sent = true; } } - maybe_schedule_op_closure_locked(other, error); + maybe_process_ops_locked(other, error); } } @@ -1013,7 +1001,7 @@ void perform_stream_op(grpc_transport* gt, grpc_stream* gs, (op->send_message || op->send_trailing_metadata || op->recv_initial_metadata || op->recv_message || op->recv_trailing_metadata)) { - // Mark ops that need to be processed by the closure + // Mark ops that need to be processed by the state machine if (op->send_message) { s->send_message_op = op; } @@ -1030,7 +1018,7 @@ void perform_stream_op(grpc_transport* gt, grpc_stream* gs, s->recv_trailing_md_op = op; } - // We want to initiate the closure if: + // We want to initiate the state machine if: // 1. We want to send a message and the other side wants to receive // 2. We want to send trailing metadata and there isn't an unmatched send // or the other side wants trailing metadata @@ -1044,11 +1032,7 @@ void perform_stream_op(grpc_transport* gt, grpc_stream* gs, (op->recv_initial_metadata && s->to_read_initial_md_filled) || (op->recv_message && other && other->send_message_op != nullptr) || (s->to_read_trailing_md_filled || s->trailing_md_recvd)) { - if (!s->op_closure_scheduled) { - grpc_core::ExecCtx::Run(DEBUG_LOCATION, &s->op_closure, - GRPC_ERROR_NONE); - s->op_closure_scheduled = true; - } + op_state_machine_locked(s, error); } else { s->ops_needed = true; } @@ -1103,10 +1087,6 @@ void perform_stream_op(grpc_transport* gt, grpc_stream* gs, error); grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_complete, GRPC_ERROR_REF(error)); } - if (needs_close) { - close_other_side_locked(s, "perform_stream_op:other_side"); - close_stream_locked(s); - } gpr_mu_unlock(mu); GRPC_ERROR_UNREF(error); }