diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 5f5c480f9dc..91b6a25d213 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -198,8 +198,6 @@ grpc_chttp2_transport::~grpc_chttp2_transport() { grpc_chttp2_stream_map_destroy(&stream_map); grpc_connectivity_state_destroy(&channel_callback.state_tracker); - GRPC_COMBINER_UNREF(combiner, "chttp2_transport"); - cancel_pings(this, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed")); @@ -214,6 +212,7 @@ grpc_chttp2_transport::~grpc_chttp2_transport() { GRPC_ERROR_UNREF(closed_with_error); gpr_free(ping_acks); gpr_free(peer_string); + gpr_mu_destroy(&mu); } static const grpc_transport_vtable* get_vtable(void); @@ -394,32 +393,29 @@ static bool read_channel_args(grpc_chttp2_transport* t, static void init_transport_closures(grpc_chttp2_transport* t) { GRPC_CLOSURE_INIT(&t->read_action_locked, read_action_locked, t, - grpc_combiner_scheduler(t->combiner)); + grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&t->benign_reclaimer_locked, benign_reclaimer_locked, t, - grpc_combiner_scheduler(t->combiner)); + grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&t->destructive_reclaimer_locked, - destructive_reclaimer_locked, t, - grpc_combiner_scheduler(t->combiner)); + destructive_reclaimer_locked, t, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&t->retry_initiate_ping_locked, retry_initiate_ping_locked, - t, grpc_combiner_scheduler(t->combiner)); + t, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&t->start_bdp_ping_locked, start_bdp_ping_locked, t, - grpc_combiner_scheduler(t->combiner)); + grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked, finish_bdp_ping_locked, t, - grpc_combiner_scheduler(t->combiner)); + grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&t->next_bdp_ping_timer_expired_locked, next_bdp_ping_timer_expired_locked, t, - grpc_combiner_scheduler(t->combiner)); + grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping_locked, - t, grpc_combiner_scheduler(t->combiner)); + t, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked, - start_keepalive_ping_locked, t, - grpc_combiner_scheduler(t->combiner)); + start_keepalive_ping_locked, t, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked, - finish_keepalive_ping_locked, t, - grpc_combiner_scheduler(t->combiner)); + finish_keepalive_ping_locked, t, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&t->keepalive_watchdog_fired_locked, keepalive_watchdog_fired_locked, t, - grpc_combiner_scheduler(t->combiner)); + grpc_schedule_on_exec_ctx); } static void init_transport_keepalive_settings(grpc_chttp2_transport* t) { @@ -474,13 +470,13 @@ grpc_chttp2_transport::grpc_chttp2_transport( ep(ep), peer_string(grpc_endpoint_get_peer(ep)), resource_user(resource_user), - combiner(grpc_combiner_create()), is_client(is_client), next_stream_id(is_client ? 1 : 2), deframe_state(is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0) { GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) == GRPC_CHTTP2_CLIENT_CONNECT_STRLEN); base.vtable = get_vtable(); + gpr_mu_init(&mu); /* 8 is a random stab in the dark as to a good initial size: it's small enough that it shouldn't waste memory for infrequently used connections, yet large enough that the exponential growth should happen nicely when it's @@ -561,11 +557,13 @@ grpc_chttp2_transport::grpc_chttp2_transport( static void destroy_transport_locked(void* tp, grpc_error* error) { grpc_chttp2_transport* t = static_cast(tp); + gpr_mu_lock(&t->mu); t->destroying = 1; close_transport_locked( t, grpc_error_set_int( GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed"), GRPC_ERROR_INT_OCCURRED_DURING_WRITE, t->write_state)); + gpr_mu_unlock(&t->mu); // Must be the last line. GRPC_CHTTP2_UNREF_TRANSPORT(t, "destroy"); } @@ -573,7 +571,7 @@ static void destroy_transport_locked(void* tp, grpc_error* error) { static void destroy_transport(grpc_transport* gt) { grpc_chttp2_transport* t = reinterpret_cast(gt); GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(destroy_transport_locked, t, - grpc_combiner_scheduler(t->combiner)), + grpc_schedule_on_exec_ctx), GRPC_ERROR_NONE); } @@ -687,12 +685,13 @@ grpc_chttp2_stream::grpc_chttp2_stream(grpc_chttp2_transport* t, grpc_slice_buffer_init(&flow_controlled_buffer); GRPC_CLOSURE_INIT(&complete_fetch_locked, ::complete_fetch_locked, this, - grpc_combiner_scheduler(t->combiner)); + grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&reset_byte_stream, ::reset_byte_stream, this, - grpc_combiner_scheduler(t->combiner)); + grpc_schedule_on_exec_ctx); } grpc_chttp2_stream::~grpc_chttp2_stream() { + gpr_mu_lock(&t->mu); if (t->channelz_socket != nullptr) { if ((t->is_client && eos_received) || (!t->is_client && eos_sent)) { t->channelz_socket->RecordStreamSucceeded(); @@ -743,7 +742,7 @@ grpc_chttp2_stream::~grpc_chttp2_stream() { if (t->resource_user != nullptr) { grpc_resource_user_free(t->resource_user, GRPC_RESOURCE_QUOTA_CALL_SIZE); } - + gpr_mu_unlock(&t->mu); GRPC_CHTTP2_UNREF_TRANSPORT(t, "stream"); GRPC_CLOSURE_SCHED(destroy_stream_arg, GRPC_ERROR_NONE); } @@ -766,7 +765,6 @@ static void destroy_stream_locked(void* sp, grpc_error* error) { static void destroy_stream(grpc_transport* gt, grpc_stream* gs, grpc_closure* then_schedule_closure) { GPR_TIMER_SCOPE("destroy_stream", 0); - grpc_chttp2_transport* t = reinterpret_cast(gt); grpc_chttp2_stream* s = reinterpret_cast(gs); if (s->stream_compression_method != GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS && @@ -784,7 +782,7 @@ static void destroy_stream(grpc_transport* gt, grpc_stream* gs, s->destroy_stream_arg = then_schedule_closure; GRPC_CLOSURE_SCHED( GRPC_CLOSURE_INIT(&s->destroy_stream, destroy_stream_locked, s, - grpc_combiner_scheduler(t->combiner)), + grpc_schedule_on_exec_ctx), GRPC_ERROR_NONE); } @@ -947,11 +945,10 @@ void grpc_chttp2_initiate_write(grpc_chttp2_transport* t, * Also, 'write_action_begin_locked' only gathers the bytes into outbuf. * It does not call the endpoint to write the bytes. That is done by the * 'write_action' (which is scheduled by 'write_action_begin_locked') */ - GRPC_CLOSURE_SCHED( - GRPC_CLOSURE_INIT(&t->write_action_begin_locked, - write_action_begin_locked, t, - grpc_combiner_finally_scheduler(t->combiner)), - GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(GRPC_CLOSURE_INIT(&t->write_action_begin_locked, + write_action_begin_locked, t, + grpc_schedule_on_exec_ctx), + GRPC_ERROR_NONE); break; case GRPC_CHTTP2_WRITE_STATE_WRITING: set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE, @@ -1019,6 +1016,7 @@ static const char* begin_writing_desc(bool partial, bool inlined) { static void write_action_begin_locked(void* gt, grpc_error* error_ignored) { GPR_TIMER_SCOPE("write_action_begin_locked", 0); grpc_chttp2_transport* t = static_cast(gt); + gpr_mu_lock(&t->mu); GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE); grpc_chttp2_begin_write_result r; if (t->closed_with_error != GRPC_ERROR_NONE) { @@ -1059,9 +1057,11 @@ static void write_action_begin_locked(void* gt, grpc_error* error_ignored) { t->reading_paused_on_pending_induced_frames = false; continue_read_action_locked(t); } + gpr_mu_unlock(&t->mu); } else { GRPC_STATS_INC_HTTP2_SPURIOUS_WRITES_BEGUN(); set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "begin writing nothing"); + gpr_mu_unlock(&t->mu); GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing"); } } @@ -1074,7 +1074,7 @@ static void write_action(void* gt, grpc_error* error) { grpc_endpoint_write( t->ep, &t->outbuf, GRPC_CLOSURE_INIT(&t->write_action_end_locked, write_action_end_locked, t, - grpc_combiner_scheduler(t->combiner)), + grpc_schedule_on_exec_ctx), cl); } @@ -1083,7 +1083,7 @@ static void write_action(void* gt, grpc_error* error) { static void write_action_end_locked(void* tp, grpc_error* error) { GPR_TIMER_SCOPE("terminate_writing_with_lock", 0); grpc_chttp2_transport* t = static_cast(tp); - + gpr_mu_lock(&t->mu); bool closed = false; if (error != GRPC_ERROR_NONE) { close_transport_locked(t, GRPC_ERROR_REF(error)); @@ -1119,15 +1119,15 @@ static void write_action_end_locked(void* tp, grpc_error* error) { if (!closed) { GRPC_CLOSURE_LIST_SCHED(&t->run_after_write); } - GRPC_CLOSURE_RUN( - GRPC_CLOSURE_INIT(&t->write_action_begin_locked, - write_action_begin_locked, t, - grpc_combiner_finally_scheduler(t->combiner)), - GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(GRPC_CLOSURE_INIT(&t->write_action_begin_locked, + write_action_begin_locked, t, + grpc_schedule_on_exec_ctx), + GRPC_ERROR_NONE); break; } grpc_chttp2_end_write(t, GRPC_ERROR_REF(error)); + gpr_mu_unlock(&t->mu); GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing"); } @@ -1389,6 +1389,7 @@ static void continue_fetching_send_locked(grpc_chttp2_transport* t, static void complete_fetch_locked(void* gs, grpc_error* error) { grpc_chttp2_stream* s = static_cast(gs); grpc_chttp2_transport* t = s->t; + gpr_mu_lock(&t->mu); if (error == GRPC_ERROR_NONE) { error = s->fetching_send_message->Pull(&s->fetching_slice); if (error == GRPC_ERROR_NONE) { @@ -1400,6 +1401,7 @@ static void complete_fetch_locked(void* gs, grpc_error* error) { s->fetching_send_message.reset(); grpc_chttp2_cancel_stream(t, s, error); } + gpr_mu_unlock(&t->mu); } static void log_metadata(const grpc_metadata_batch* md_batch, uint32_t id, @@ -1425,6 +1427,7 @@ static void perform_stream_op_locked(void* stream_op, static_cast(op->handler_private.extra_arg); grpc_transport_stream_op_batch_payload* op_payload = op->payload; grpc_chttp2_transport* t = s->t; + gpr_mu_lock(&t->mu); GRPC_STATS_INC_HTTP2_OP_BATCHES(); @@ -1705,7 +1708,7 @@ static void perform_stream_op_locked(void* stream_op, grpc_chttp2_complete_closure_step(t, s, &on_complete, GRPC_ERROR_NONE, "op->on_complete"); } - + gpr_mu_unlock(&t->mu); GRPC_CHTTP2_STREAM_UNREF(s, "perform_stream_op"); } @@ -1738,7 +1741,7 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs, op->handler_private.extra_arg = gs; GRPC_CLOSURE_SCHED( GRPC_CLOSURE_INIT(&op->handler_private.closure, perform_stream_op_locked, - op, grpc_combiner_scheduler(t->combiner)), + op, grpc_schedule_on_exec_ctx), GRPC_ERROR_NONE); } @@ -1749,7 +1752,11 @@ static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error) { GPR_ASSERT(error != GRPC_ERROR_NONE); for (size_t j = 0; j < GRPC_CHTTP2_PCL_COUNT; j++) { grpc_closure_list_fail_all(&pq->lists[j], GRPC_ERROR_REF(error)); - GRPC_CLOSURE_LIST_SCHED(&pq->lists[j]); + if (j == GRPC_CHTTP2_PCL_INITIATE) { + GRPC_CLOSURE_LIST_RUN(&pq->lists[j]); + } else { + GRPC_CLOSURE_LIST_SCHED(&pq->lists[j]); + } } GRPC_ERROR_UNREF(error); } @@ -1777,8 +1784,8 @@ static void send_keepalive_ping_locked(grpc_chttp2_transport* t) { if (t->closed_with_error != GRPC_ERROR_NONE) { GRPC_CLOSURE_RUN(&t->start_keepalive_ping_locked, GRPC_ERROR_REF(t->closed_with_error)); - GRPC_CLOSURE_RUN(&t->finish_keepalive_ping_locked, - GRPC_ERROR_REF(t->closed_with_error)); + GRPC_CLOSURE_SCHED(&t->finish_keepalive_ping_locked, + GRPC_ERROR_REF(t->closed_with_error)); return; } grpc_chttp2_ping_queue* pq = &t->ping_queue; @@ -1797,10 +1804,12 @@ static void send_keepalive_ping_locked(grpc_chttp2_transport* t) { static void retry_initiate_ping_locked(void* tp, grpc_error* error) { grpc_chttp2_transport* t = static_cast(tp); + gpr_mu_lock(&t->mu); t->ping_state.is_delayed_ping_timer_set = false; if (error == GRPC_ERROR_NONE) { grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING); } + gpr_mu_unlock(&t->mu); GRPC_CHTTP2_UNREF_TRANSPORT(t, "retry_initiate_ping_locked"); } @@ -1854,7 +1863,7 @@ static void perform_transport_op_locked(void* stream_op, grpc_transport_op* op = static_cast(stream_op); grpc_chttp2_transport* t = static_cast(op->handler_private.extra_arg); - + gpr_mu_lock(&t->mu); if (op->goaway_error) { send_goaway(t, op->goaway_error); } @@ -1888,8 +1897,8 @@ static void perform_transport_op_locked(void* stream_op, close_transport_locked(t, op->disconnect_with_error); } - GRPC_CLOSURE_RUN(op->on_consumed, GRPC_ERROR_NONE); - + GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE); + gpr_mu_unlock(&t->mu); GRPC_CHTTP2_UNREF_TRANSPORT(t, "transport_op"); } @@ -1904,7 +1913,7 @@ static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) { GRPC_CHTTP2_REF_TRANSPORT(t, "transport_op"); GRPC_CLOSURE_SCHED(GRPC_CLOSURE_INIT(&op->handler_private.closure, perform_transport_op_locked, op, - grpc_combiner_scheduler(t->combiner)), + grpc_schedule_on_exec_ctx), GRPC_ERROR_NONE); } @@ -2550,7 +2559,7 @@ static void read_action_locked(void* tp, grpc_error* error) { GPR_TIMER_SCOPE("reading_action_locked", 0); grpc_chttp2_transport* t = static_cast(tp); - + gpr_mu_lock(&t->mu); GRPC_ERROR_REF(error); grpc_error* err = error; @@ -2634,7 +2643,9 @@ static void read_action_locked(void* tp, grpc_error* error) { } else { continue_read_action_locked(t); } + gpr_mu_unlock(&t->mu); } else { + gpr_mu_unlock(&t->mu); GRPC_CHTTP2_UNREF_TRANSPORT(t, "reading_action"); } @@ -2656,6 +2667,8 @@ static void schedule_bdp_ping_locked(grpc_chttp2_transport* t) { static void start_bdp_ping_locked(void* tp, grpc_error* error) { grpc_chttp2_transport* t = static_cast(tp); + // No need to take a lock. Closure scheduler will already have a lock. + // gpr_mu_lock(&t->mu); if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { gpr_log(GPR_INFO, "%s: Start BDP ping err=%s", t->peer_string, grpc_error_string(error)); @@ -2668,15 +2681,18 @@ static void start_bdp_ping_locked(void* tp, grpc_error* error) { grpc_timer_cancel(&t->keepalive_ping_timer); } t->flow_control->bdp_estimator()->StartPing(); + // gpr_mu_unlock(&t->mu); } static void finish_bdp_ping_locked(void* tp, grpc_error* error) { grpc_chttp2_transport* t = static_cast(tp); + gpr_mu_lock(&t->mu); if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { gpr_log(GPR_INFO, "%s: Complete BDP ping err=%s", t->peer_string, grpc_error_string(error)); } if (error != GRPC_ERROR_NONE || t->closed_with_error != GRPC_ERROR_NONE) { + gpr_mu_unlock(&t->mu); GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping"); return; } @@ -2687,17 +2703,21 @@ static void finish_bdp_ping_locked(void* tp, grpc_error* error) { t->have_next_bdp_ping_timer = true; grpc_timer_init(&t->next_bdp_ping_timer, next_ping, &t->next_bdp_ping_timer_expired_locked); + gpr_mu_unlock(&t->mu); } static void next_bdp_ping_timer_expired_locked(void* tp, grpc_error* error) { grpc_chttp2_transport* t = static_cast(tp); + gpr_mu_lock(&t->mu); GPR_ASSERT(t->have_next_bdp_ping_timer); t->have_next_bdp_ping_timer = false; if (error != GRPC_ERROR_NONE) { + gpr_mu_unlock(&t->mu); GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping"); return; } schedule_bdp_ping_locked(t); + gpr_mu_unlock(&t->mu); } void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args, @@ -2769,6 +2789,7 @@ void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args, static void init_keepalive_ping_locked(void* arg, grpc_error* error) { grpc_chttp2_transport* t = static_cast(arg); + gpr_mu_lock(&t->mu); GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING); if (t->destroying || t->closed_with_error != GRPC_ERROR_NONE) { t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING; @@ -2793,6 +2814,7 @@ static void init_keepalive_ping_locked(void* arg, grpc_error* error) { grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, &t->init_keepalive_ping_locked); } + gpr_mu_unlock(&t->mu); GRPC_CHTTP2_UNREF_TRANSPORT(t, "init keepalive ping"); } @@ -2801,6 +2823,8 @@ static void start_keepalive_ping_locked(void* arg, grpc_error* error) { if (error != GRPC_ERROR_NONE) { return; } + // No need to take a lock. + // gpr_mu_lock(&t->mu); if (t->channelz_socket != nullptr) { t->channelz_socket->RecordKeepaliveSent(); } @@ -2811,10 +2835,12 @@ static void start_keepalive_ping_locked(void* arg, grpc_error* error) { grpc_timer_init(&t->keepalive_watchdog_timer, grpc_core::ExecCtx::Get()->Now() + t->keepalive_timeout, &t->keepalive_watchdog_fired_locked); + // gpr_mu_unlock(&t->mu); } static void finish_keepalive_ping_locked(void* arg, grpc_error* error) { grpc_chttp2_transport* t = static_cast(arg); + gpr_mu_lock(&t->mu); if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) { if (error == GRPC_ERROR_NONE) { if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { @@ -2828,11 +2854,13 @@ static void finish_keepalive_ping_locked(void* arg, grpc_error* error) { &t->init_keepalive_ping_locked); } } + gpr_mu_unlock(&t->mu); GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive ping end"); } static void keepalive_watchdog_fired_locked(void* arg, grpc_error* error) { grpc_chttp2_transport* t = static_cast(arg); + gpr_mu_lock(&t->mu); if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) { if (error == GRPC_ERROR_NONE) { gpr_log(GPR_ERROR, "%s: Keepalive watchdog fired. Closing transport.", @@ -2852,6 +2880,7 @@ static void keepalive_watchdog_fired_locked(void* arg, grpc_error* error) { t->keepalive_state, GRPC_CHTTP2_KEEPALIVE_STATE_PINGING); } } + gpr_mu_unlock(&t->mu); GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive watchdog"); } @@ -2890,6 +2919,7 @@ static void set_pollset_set(grpc_transport* gt, grpc_stream* gs, static void reset_byte_stream(void* arg, grpc_error* error) { grpc_chttp2_stream* s = static_cast(arg); + gpr_mu_lock(&s->t->mu); s->pending_byte_stream = false; if (error == GRPC_ERROR_NONE) { grpc_chttp2_maybe_complete_recv_message(s->t, s); @@ -2903,6 +2933,7 @@ static void reset_byte_stream(void* arg, grpc_error* error) { grpc_chttp2_cancel_stream(s->t, s, GRPC_ERROR_REF(error)); s->byte_stream_error = GRPC_ERROR_REF(error); } + gpr_mu_unlock(&s->t->mu); } namespace grpc_core { @@ -2924,25 +2955,28 @@ void Chttp2IncomingByteStream::OrphanLocked(void* arg, Chttp2IncomingByteStream* bs = static_cast(arg); grpc_chttp2_stream* s = bs->stream_; grpc_chttp2_transport* t = s->t; + gpr_mu_lock(&t->mu); bs->Unref(); s->pending_byte_stream = false; grpc_chttp2_maybe_complete_recv_message(t, s); grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s); + gpr_mu_unlock(&t->mu); } void Chttp2IncomingByteStream::Orphan() { GPR_TIMER_SCOPE("incoming_byte_stream_destroy", 0); - GRPC_CLOSURE_SCHED( - GRPC_CLOSURE_INIT(&destroy_action_, - &Chttp2IncomingByteStream::OrphanLocked, this, - grpc_combiner_scheduler(transport_->combiner)), - GRPC_ERROR_NONE); + OrphanLocked(this, GRPC_ERROR_NONE); + /*GRPC_CLOSURE_SCHED(GRPC_CLOSURE_INIT(&destroy_action_, + &Chttp2IncomingByteStream::OrphanLocked, + this, grpc_schedule_on_exec_ctx), + GRPC_ERROR_NONE);*/ } void Chttp2IncomingByteStream::NextLocked(void* arg, grpc_error* error_ignored) { Chttp2IncomingByteStream* bs = static_cast(arg); grpc_chttp2_transport* t = bs->transport_; + gpr_mu_lock(&t->mu); grpc_chttp2_stream* s = bs->stream_; size_t cur_length = s->frame_storage.length; if (!s->read_closed) { @@ -2981,6 +3015,7 @@ void Chttp2IncomingByteStream::NextLocked(void* arg, s->on_next = bs->next_action_.on_complete; } bs->Unref(); + gpr_mu_unlock(&t->mu); } bool Chttp2IncomingByteStream::Next(size_t max_size_hint, @@ -2992,11 +3027,10 @@ bool Chttp2IncomingByteStream::Next(size_t max_size_hint, Ref(); next_action_.max_size_hint = max_size_hint; next_action_.on_complete = on_complete; - GRPC_CLOSURE_SCHED( - GRPC_CLOSURE_INIT(&next_action_.closure, - &Chttp2IncomingByteStream::NextLocked, this, - grpc_combiner_scheduler(transport_->combiner)), - GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(GRPC_CLOSURE_INIT(&next_action_.closure, + &Chttp2IncomingByteStream::NextLocked, + this, grpc_schedule_on_exec_ctx), + GRPC_ERROR_NONE); return false; } } @@ -3124,6 +3158,7 @@ static void post_destructive_reclaimer(grpc_chttp2_transport* t) { static void benign_reclaimer_locked(void* arg, grpc_error* error) { grpc_chttp2_transport* t = static_cast(arg); + gpr_mu_lock(&t->mu); if (error == GRPC_ERROR_NONE && grpc_chttp2_stream_map_size(&t->stream_map) == 0) { /* Channel with no active streams: send a goaway to try and make it @@ -3148,11 +3183,13 @@ static void benign_reclaimer_locked(void* arg, grpc_error* error) { grpc_resource_user_finish_reclamation( grpc_endpoint_get_resource_user(t->ep)); } + gpr_mu_unlock(&t->mu); GRPC_CHTTP2_UNREF_TRANSPORT(t, "benign_reclaimer"); } static void destructive_reclaimer_locked(void* arg, grpc_error* error) { grpc_chttp2_transport* t = static_cast(arg); + gpr_mu_lock(&t->mu); size_t n = grpc_chttp2_stream_map_size(&t->stream_map); t->destructive_reclaimer_registered = false; if (error == GRPC_ERROR_NONE && n > 0) { @@ -3179,6 +3216,7 @@ static void destructive_reclaimer_locked(void* arg, grpc_error* error) { grpc_resource_user_finish_reclamation( grpc_endpoint_get_resource_user(t->ep)); } + gpr_mu_unlock(&t->mu); GRPC_CHTTP2_UNREF_TRANSPORT(t, "destructive_reclaimer"); } diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.cc b/src/core/ext/transport/chttp2/transport/hpack_parser.cc index a5142ffd96f..ab9f9e2c9eb 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_parser.cc +++ b/src/core/ext/transport/chttp2/transport/hpack_parser.cc @@ -1669,12 +1669,14 @@ static const maybe_complete_func_type maybe_complete_funcs[] = { static void force_client_rst_stream(void* sp, grpc_error* error) { grpc_chttp2_stream* s = static_cast(sp); grpc_chttp2_transport* t = s->t; + gpr_mu_lock(&t->mu); if (!s->write_closed) { grpc_chttp2_add_rst_stream_to_next_write(t, s->id, GRPC_HTTP2_NO_ERROR, &s->stats.outgoing); grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM); grpc_chttp2_mark_stream_closed(t, s, true, true, GRPC_ERROR_NONE); } + gpr_mu_unlock(&t->mu); GRPC_CHTTP2_STREAM_UNREF(s, "final_rst"); } @@ -1741,10 +1743,9 @@ grpc_error* grpc_chttp2_header_parser_parse(void* hpack_parser, however -- it might be that we receive a RST_STREAM following this and can avoid the extra write */ GRPC_CHTTP2_STREAM_REF(s, "final_rst"); - GRPC_CLOSURE_SCHED( - GRPC_CLOSURE_CREATE(force_client_rst_stream, s, - grpc_combiner_finally_scheduler(t->combiner)), - GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(force_client_rst_stream, s, + grpc_schedule_on_exec_ctx), + GRPC_ERROR_NONE); } grpc_chttp2_mark_stream_closed(t, s, true, false, GRPC_ERROR_NONE); } diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 6e1db8a5707..784fcf42d31 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -39,6 +39,7 @@ #include "src/core/lib/channel/channelz.h" #include "src/core/lib/compression/stream_compression.h" #include "src/core/lib/gprpp/manual_constructor.h" +#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/timer.h" @@ -294,14 +295,13 @@ struct grpc_chttp2_transport { ~grpc_chttp2_transport(); grpc_transport base; /* must be first */ + gpr_mu mu; grpc_core::RefCount refs; grpc_endpoint* ep; char* peer_string; grpc_resource_user* resource_user; - grpc_combiner* combiner; - grpc_closure* notify_on_receive_settings = nullptr; /** write execution state of the transport */ diff --git a/src/core/ext/transport/chttp2/transport/writing.cc b/src/core/ext/transport/chttp2/transport/writing.cc index d6d9e4521f6..38327549ea8 100644 --- a/src/core/ext/transport/chttp2/transport/writing.cc +++ b/src/core/ext/transport/chttp2/transport/writing.cc @@ -104,7 +104,7 @@ static void maybe_initiate_ping(grpc_chttp2_transport* t) { pq->inflight_id = t->ping_ctr; t->ping_ctr++; - GRPC_CLOSURE_LIST_SCHED(&pq->lists[GRPC_CHTTP2_PCL_INITIATE]); + GRPC_CLOSURE_LIST_RUN(&pq->lists[GRPC_CHTTP2_PCL_INITIATE]); grpc_closure_list_move(&pq->lists[GRPC_CHTTP2_PCL_NEXT], &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]); grpc_slice_buffer_add(&t->outbuf, diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h index c7b2e8299b9..94667ae0a69 100644 --- a/src/core/lib/iomgr/closure.h +++ b/src/core/lib/iomgr/closure.h @@ -355,4 +355,43 @@ inline void grpc_closure_list_sched(grpc_closure_list* list) { grpc_closure_list_sched(closure_list) #endif +#ifndef NDEBUG +inline void grpc_closure_list_run(const char* file, int line, + grpc_closure_list* list) { +#else +inline void grpc_closure_list_run(grpc_closure_list* list) { +#endif + grpc_closure* c = list->head; + while (c != nullptr) { + grpc_closure* next = c->next_data.next; +#ifndef NDEBUG + if (c->scheduled) { + gpr_log(GPR_ERROR, + "Closure already scheduled. (closure: %p, created: [%s:%d], " + "previously scheduled at: [%s: %d] run?: %s", + c, c->file_created, c->line_created, c->file_initiated, + c->line_initiated, c->run ? "true" : "false"); + abort(); + } + c->scheduled = true; + c->file_initiated = file; + c->line_initiated = line; + c->run = false; + GPR_ASSERT(c->cb != nullptr); +#endif + c->scheduler->vtable->run(c, c->error_data.error); + c = next; + } + list->head = list->tail = nullptr; +} + +/** Schedule all closures in a list to be run. Does not need to be run from a + * safe point. */ +#ifndef NDEBUG +#define GRPC_CLOSURE_LIST_RUN(closure_list) \ + grpc_closure_list_run(__FILE__, __LINE__, closure_list) +#else +#define GRPC_CLOSURE_LIST_RUN(closure_list) grpc_closure_list_run(closure_list) +#endif + #endif /* GRPC_CORE_LIB_IOMGR_CLOSURE_H */