diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index aeb7389ca21..acb3b4c2ddd 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -102,14 +102,14 @@ grpc_core::DebugOnlyTraceFlag grpc_trace_chttp2_refcount(false, "chttp2_refcount"); /* forward declarations of various callbacks that we'll build closures around */ -static void write_action_begin_locked(void* t, grpc_error* error); +static void write_action_begin(void* t, grpc_error* error); static void write_action(void* t, grpc_error* error); -static void write_action_end_locked(void* t, grpc_error* error); +static void write_action_end(void* t, grpc_error* error); -static void read_action_locked(void* t, grpc_error* error); +static void read_action(void* t, grpc_error* error); static void continue_read_action_locked(grpc_chttp2_transport* t); -static void complete_fetch_locked(void* gs, grpc_error* error); +static void complete_fetch(void* gs, grpc_error* error); /** Set a transport level setting, and push it to our peer */ static void queue_setting_update(grpc_chttp2_transport* t, grpc_chttp2_setting_id id, uint32_t value); @@ -124,8 +124,8 @@ static void connectivity_state_set(grpc_chttp2_transport* t, grpc_connectivity_state state, const char* reason); -static void benign_reclaimer_locked(void* t, grpc_error* error); -static void destructive_reclaimer_locked(void* t, grpc_error* error); +static void benign_reclaimer(void* t, grpc_error* error); +static void destructive_reclaimer(void* t, grpc_error* error); static void post_benign_reclaimer(grpc_chttp2_transport* t); static void post_destructive_reclaimer(grpc_chttp2_transport* t); @@ -135,20 +135,20 @@ static void end_all_the_calls(grpc_chttp2_transport* t, grpc_error* error); static void schedule_bdp_ping_locked(grpc_chttp2_transport* t); static void start_bdp_ping_locked(void* tp, grpc_error* error); -static void finish_bdp_ping_locked(void* tp, grpc_error* error); -static void next_bdp_ping_timer_expired_locked(void* tp, grpc_error* error); +static void finish_bdp_ping(void* tp, grpc_error* error); +static void next_bdp_ping_timer_expired(void* tp, grpc_error* error); static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error); static void send_ping_locked(grpc_chttp2_transport* t, grpc_closure* on_initiate, grpc_closure* on_complete); -static void retry_initiate_ping_locked(void* tp, grpc_error* error); +static void retry_initiate_ping(void* tp, grpc_error* error); /** keepalive-relevant functions */ -static void init_keepalive_ping_locked(void* arg, grpc_error* error); +static void init_keepalive_ping(void* arg, grpc_error* error); static void start_keepalive_ping_locked(void* arg, grpc_error* error); -static void finish_keepalive_ping_locked(void* arg, grpc_error* error); -static void keepalive_watchdog_fired_locked(void* arg, grpc_error* error); +static void finish_keepalive_ping(void* arg, grpc_error* error); +static void keepalive_watchdog_fired(void* arg, grpc_error* error); static void reset_byte_stream(void* arg, grpc_error* error); @@ -197,8 +197,6 @@ grpc_chttp2_transport::~grpc_chttp2_transport() { grpc_chttp2_stream_map_destroy(&stream_map); - GRPC_COMBINER_UNREF(combiner, "chttp2_transport"); - cancel_pings(this, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed")); @@ -392,33 +390,27 @@ static bool read_channel_args(grpc_chttp2_transport* t, } static void init_transport_closures(grpc_chttp2_transport* t) { - GRPC_CLOSURE_INIT(&t->read_action_locked, read_action_locked, t, - grpc_combiner_scheduler(t->combiner)); - GRPC_CLOSURE_INIT(&t->benign_reclaimer_locked, benign_reclaimer_locked, t, - grpc_combiner_scheduler(t->combiner)); - GRPC_CLOSURE_INIT(&t->destructive_reclaimer_locked, - destructive_reclaimer_locked, t, - grpc_combiner_scheduler(t->combiner)); - GRPC_CLOSURE_INIT(&t->retry_initiate_ping_locked, retry_initiate_ping_locked, - t, grpc_combiner_scheduler(t->combiner)); + GRPC_CLOSURE_INIT(&t->read_action, read_action, t, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&t->benign_reclaimer, benign_reclaimer, t, + grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&t->destructive_reclaimer, destructive_reclaimer, t, + grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&t->retry_initiate_ping, retry_initiate_ping, t, + grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&t->start_bdp_ping_locked, start_bdp_ping_locked, t, - grpc_combiner_scheduler(t->combiner)); - GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked, finish_bdp_ping_locked, t, - grpc_combiner_scheduler(t->combiner)); - GRPC_CLOSURE_INIT(&t->next_bdp_ping_timer_expired_locked, - next_bdp_ping_timer_expired_locked, t, - grpc_combiner_scheduler(t->combiner)); - GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping_locked, - t, grpc_combiner_scheduler(t->combiner)); + grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&t->finish_bdp_ping, finish_bdp_ping, t, + grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&t->next_bdp_ping_timer_expired, + next_bdp_ping_timer_expired, t, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&t->init_keepalive_ping, init_keepalive_ping, t, + grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked, - start_keepalive_ping_locked, t, - grpc_combiner_scheduler(t->combiner)); - GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked, - finish_keepalive_ping_locked, t, - grpc_combiner_scheduler(t->combiner)); - GRPC_CLOSURE_INIT(&t->keepalive_watchdog_fired_locked, - keepalive_watchdog_fired_locked, t, - grpc_combiner_scheduler(t->combiner)); + start_keepalive_ping_locked, t, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&t->finish_keepalive_ping, finish_keepalive_ping, t, + grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&t->keepalive_watchdog_fired, keepalive_watchdog_fired, t, + grpc_schedule_on_exec_ctx); } static void init_transport_keepalive_settings(grpc_chttp2_transport* t) { @@ -458,7 +450,7 @@ static void init_keepalive_pings_if_enabled(grpc_chttp2_transport* t) { GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); grpc_timer_init(&t->keepalive_ping_timer, grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, - &t->init_keepalive_ping_locked); + &t->init_keepalive_ping); } else { /* Use GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED to indicate there are no inflight keeaplive timers */ @@ -473,7 +465,6 @@ grpc_chttp2_transport::grpc_chttp2_transport( ep(ep), peer_string(grpc_endpoint_get_peer(ep)), resource_user(resource_user), - combiner(grpc_combiner_create()), state_tracker(is_client ? "client_transport" : "server_transport", GRPC_CHANNEL_READY), is_client(is_client), @@ -557,22 +548,18 @@ grpc_chttp2_transport::grpc_chttp2_transport( post_benign_reclaimer(this); } -static void destroy_transport_locked(void* tp, grpc_error* error) { - grpc_chttp2_transport* t = static_cast(tp); - t->destroying = 1; - close_transport_locked( - t, grpc_error_set_int( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed"), - GRPC_ERROR_INT_OCCURRED_DURING_WRITE, t->write_state)); - // Must be the last line. - GRPC_CHTTP2_UNREF_TRANSPORT(t, "destroy"); -} - static void destroy_transport(grpc_transport* gt) { grpc_chttp2_transport* t = reinterpret_cast(gt); - GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(destroy_transport_locked, t, - grpc_combiner_scheduler(t->combiner)), - GRPC_ERROR_NONE); + { + grpc_core::MutexLock lock(&t->mu); + t->destroying = 1; + close_transport_locked( + t, grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed"), + GRPC_ERROR_INT_OCCURRED_DURING_WRITE, t->write_state)); + } + // Must be the last line. + GRPC_CHTTP2_UNREF_TRANSPORT(t, "destroy"); } static void close_transport_locked(grpc_chttp2_transport* t, @@ -684,64 +671,67 @@ grpc_chttp2_stream::grpc_chttp2_stream(grpc_chttp2_transport* t, grpc_slice_buffer_init(&unprocessed_incoming_frames_buffer); grpc_slice_buffer_init(&flow_controlled_buffer); - GRPC_CLOSURE_INIT(&complete_fetch_locked, ::complete_fetch_locked, this, - grpc_combiner_scheduler(t->combiner)); + GRPC_CLOSURE_INIT(&complete_fetch, ::complete_fetch, this, + grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&reset_byte_stream, ::reset_byte_stream, this, - grpc_combiner_scheduler(t->combiner)); + grpc_schedule_on_exec_ctx); } grpc_chttp2_stream::~grpc_chttp2_stream() { - if (t->channelz_socket != nullptr) { - if ((t->is_client && eos_received) || (!t->is_client && eos_sent)) { - t->channelz_socket->RecordStreamSucceeded(); - } else { - t->channelz_socket->RecordStreamFailed(); + { + grpc_core::MutexLock lock(&t->mu); + if (t->channelz_socket != nullptr) { + if ((t->is_client && eos_received) || (!t->is_client && eos_sent)) { + t->channelz_socket->RecordStreamSucceeded(); + } else { + t->channelz_socket->RecordStreamFailed(); + } } - } - GPR_ASSERT((write_closed && read_closed) || id == 0); - if (id != 0) { - GPR_ASSERT(grpc_chttp2_stream_map_find(&t->stream_map, id) == nullptr); - } + GPR_ASSERT((write_closed && read_closed) || id == 0); + if (id != 0) { + GPR_ASSERT(grpc_chttp2_stream_map_find(&t->stream_map, id) == nullptr); + } - grpc_slice_buffer_destroy_internal(&unprocessed_incoming_frames_buffer); - grpc_slice_buffer_destroy_internal(&frame_storage); - if (stream_compression_method != GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS) { - grpc_slice_buffer_destroy_internal(&compressed_data_buffer); - } - if (stream_decompression_method != - GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) { - grpc_slice_buffer_destroy_internal(&decompressed_data_buffer); - } + grpc_slice_buffer_destroy_internal(&unprocessed_incoming_frames_buffer); + grpc_slice_buffer_destroy_internal(&frame_storage); + if (stream_compression_method != + GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS) { + grpc_slice_buffer_destroy_internal(&compressed_data_buffer); + } + if (stream_decompression_method != + GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) { + grpc_slice_buffer_destroy_internal(&decompressed_data_buffer); + } - grpc_chttp2_list_remove_stalled_by_transport(t, this); - grpc_chttp2_list_remove_stalled_by_stream(t, this); + grpc_chttp2_list_remove_stalled_by_transport(t, this); + grpc_chttp2_list_remove_stalled_by_stream(t, this); - for (int i = 0; i < STREAM_LIST_COUNT; i++) { - if (GPR_UNLIKELY(included[i])) { - gpr_log(GPR_ERROR, "%s stream %d still included in list %d", - t->is_client ? "client" : "server", id, i); - abort(); + for (int i = 0; i < STREAM_LIST_COUNT; i++) { + if (GPR_UNLIKELY(included[i])) { + gpr_log(GPR_ERROR, "%s stream %d still included in list %d", + t->is_client ? "client" : "server", id, i); + abort(); + } } - } - GPR_ASSERT(send_initial_metadata_finished == nullptr); - GPR_ASSERT(fetching_send_message == nullptr); - GPR_ASSERT(send_trailing_metadata_finished == nullptr); - GPR_ASSERT(recv_initial_metadata_ready == nullptr); - GPR_ASSERT(recv_message_ready == nullptr); - GPR_ASSERT(recv_trailing_metadata_finished == nullptr); - grpc_slice_buffer_destroy_internal(&flow_controlled_buffer); - GRPC_ERROR_UNREF(read_closed_error); - GRPC_ERROR_UNREF(write_closed_error); - GRPC_ERROR_UNREF(byte_stream_error); + GPR_ASSERT(send_initial_metadata_finished == nullptr); + GPR_ASSERT(fetching_send_message == nullptr); + GPR_ASSERT(send_trailing_metadata_finished == nullptr); + GPR_ASSERT(recv_initial_metadata_ready == nullptr); + GPR_ASSERT(recv_message_ready == nullptr); + GPR_ASSERT(recv_trailing_metadata_finished == nullptr); + grpc_slice_buffer_destroy_internal(&flow_controlled_buffer); + GRPC_ERROR_UNREF(read_closed_error); + GRPC_ERROR_UNREF(write_closed_error); + GRPC_ERROR_UNREF(byte_stream_error); - flow_control.Destroy(); + flow_control.Destroy(); - if (t->resource_user != nullptr) { - grpc_resource_user_free(t->resource_user, GRPC_RESOURCE_QUOTA_CALL_SIZE); + if (t->resource_user != nullptr) { + grpc_resource_user_free(t->resource_user, GRPC_RESOURCE_QUOTA_CALL_SIZE); + } } - GRPC_CHTTP2_UNREF_TRANSPORT(t, "stream"); GRPC_CLOSURE_SCHED(destroy_stream_arg, GRPC_ERROR_NONE); } @@ -755,16 +745,9 @@ static int init_stream(grpc_transport* gt, grpc_stream* gs, return 0; } -static void destroy_stream_locked(void* sp, grpc_error* error) { - GPR_TIMER_SCOPE("destroy_stream", 0); - grpc_chttp2_stream* s = static_cast(sp); - s->~grpc_chttp2_stream(); -} - static void destroy_stream(grpc_transport* gt, grpc_stream* gs, grpc_closure* then_schedule_closure) { GPR_TIMER_SCOPE("destroy_stream", 0); - grpc_chttp2_transport* t = reinterpret_cast(gt); grpc_chttp2_stream* s = reinterpret_cast(gs); if (s->stream_compression_method != GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS && @@ -780,10 +763,7 @@ static void destroy_stream(grpc_transport* gt, grpc_stream* gs, } s->destroy_stream_arg = then_schedule_closure; - GRPC_CLOSURE_SCHED( - GRPC_CLOSURE_INIT(&s->destroy_stream, destroy_stream_locked, s, - grpc_combiner_scheduler(t->combiner)), - GRPC_ERROR_NONE); + s->~grpc_chttp2_stream(); } grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t, @@ -928,26 +908,29 @@ void grpc_chttp2_initiate_write(grpc_chttp2_transport* t, grpc_chttp2_initiate_write_reason_string(reason)); t->is_first_write_in_batch = true; GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); - /* Note that the 'write_action_begin_locked' closure is being scheduled + // TODO(yashykt) : When we were using combiners, we were using the finally + // version, so that the write action would happen after we were done + // queueing up all the writes that we wanted. Maybe do something similar? + // Keeping the earlier comment for posterity - + /* Note that the 'write_action_begin' closure is being scheduled * on the 'finally_scheduler' of t->combiner. This means that - * 'write_action_begin_locked' is called only *after* all the other + * 'write_action_begin' is called only *after* all the other * closures (some of which are potentially initiating more writes on the * transport) are executed on the t->combiner. * * The reason for scheduling on finally_scheduler is to make sure we batch - * as many writes as possible. 'write_action_begin_locked' is the function + * as many writes as possible. 'write_action_begin' is the function * that gathers all the relevant bytes (which are at various places in the * grpc_chttp2_transport structure) and append them to 'outbuf' field in * grpc_chttp2_transport thereby batching what would have been potentially * multiple write operations. * - * Also, 'write_action_begin_locked' only gathers the bytes into outbuf. + * Also, 'write_action_begin' only gathers the bytes into outbuf. * It does not call the endpoint to write the bytes. That is done by the - * 'write_action' (which is scheduled by 'write_action_begin_locked') */ + * 'write_action' (which is scheduled by 'write_action_begin') */ GRPC_CLOSURE_SCHED( - GRPC_CLOSURE_INIT(&t->write_action_begin_locked, - write_action_begin_locked, t, - grpc_combiner_finally_scheduler(t->combiner)), + GRPC_CLOSURE_INIT(&t->write_action_begin, write_action_begin, t, + grpc_schedule_on_exec_ctx), GRPC_ERROR_NONE); break; case GRPC_CHTTP2_WRITE_STATE_WRITING: @@ -1013,9 +996,10 @@ static const char* begin_writing_desc(bool partial, bool inlined) { GPR_UNREACHABLE_CODE(return "bad state tuple"); } -static void write_action_begin_locked(void* gt, grpc_error* error_ignored) { - GPR_TIMER_SCOPE("write_action_begin_locked", 0); +static void write_action_begin(void* gt, grpc_error* error_ignored) { + GPR_TIMER_SCOPE("write_action_begin", 0); grpc_chttp2_transport* t = static_cast(gt); + grpc_core::ReleasableMutexLock lock(&t->mu); GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE); grpc_chttp2_begin_write_result r; if (t->closed_with_error != GRPC_ERROR_NONE) { @@ -1056,9 +1040,11 @@ static void write_action_begin_locked(void* gt, grpc_error* error_ignored) { t->reading_paused_on_pending_induced_frames = false; continue_read_action_locked(t); } + lock.Unlock(); } else { GRPC_STATS_INC_HTTP2_SPURIOUS_WRITES_BEGUN(); set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "begin writing nothing"); + lock.Unlock(); GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing"); } } @@ -1068,63 +1054,67 @@ static void write_action(void* gt, grpc_error* error) { grpc_chttp2_transport* t = static_cast(gt); void* cl = t->cl; t->cl = nullptr; - grpc_endpoint_write( - t->ep, &t->outbuf, - GRPC_CLOSURE_INIT(&t->write_action_end_locked, write_action_end_locked, t, - grpc_combiner_scheduler(t->combiner)), - cl); + grpc_endpoint_write(t->ep, &t->outbuf, + GRPC_CLOSURE_INIT(&t->write_action_end, write_action_end, + t, grpc_schedule_on_exec_ctx), + cl); } /* Callback from the grpc_endpoint after bytes have been written by calling * sendmsg */ -static void write_action_end_locked(void* tp, grpc_error* error) { +static void write_action_end(void* tp, grpc_error* error) { GPR_TIMER_SCOPE("terminate_writing_with_lock", 0); grpc_chttp2_transport* t = static_cast(tp); + { + grpc_core::MutexLock lock(&t->mu); + bool closed = false; + if (error != GRPC_ERROR_NONE) { + close_transport_locked(t, GRPC_ERROR_REF(error)); + closed = true; + } - bool closed = false; - if (error != GRPC_ERROR_NONE) { - close_transport_locked(t, GRPC_ERROR_REF(error)); - closed = true; - } + if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED) { + t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SENT; + closed = true; + if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) { + close_transport_locked( + t, GRPC_ERROR_CREATE_FROM_STATIC_STRING("goaway sent")); + } + } - if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED) { - t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SENT; - closed = true; - if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) { - close_transport_locked( - t, GRPC_ERROR_CREATE_FROM_STATIC_STRING("goaway sent")); + switch (t->write_state) { + case GRPC_CHTTP2_WRITE_STATE_IDLE: + GPR_UNREACHABLE_CODE(break); + case GRPC_CHTTP2_WRITE_STATE_WRITING: + GPR_TIMER_MARK("state=writing", 0); + set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "finish writing"); + break; + case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE: + GPR_TIMER_MARK("state=writing_stale_no_poller", 0); + set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING, "continue writing"); + t->is_first_write_in_batch = false; + GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); + // If the transport is closed, we will retry writing on the endpoint + // and next write may contain part of the currently serialized frames. + // So, we should only call the run_after_write callbacks when the next + // write finishes, or the callbacks will be invoked when the stream is + // closed. + if (!closed) { + GRPC_CLOSURE_LIST_SCHED(&t->run_after_write); + } + // TODO(yashykt) : When we were using combiners, we were using the + // finally version, so that the write action would happen after we were + // done queueing up all the writes that we wanted. Maybe do something + // similar? + GRPC_CLOSURE_SCHED( + GRPC_CLOSURE_INIT(&t->write_action_begin, write_action_begin, t, + grpc_schedule_on_exec_ctx), + GRPC_ERROR_NONE); + break; } - } - switch (t->write_state) { - case GRPC_CHTTP2_WRITE_STATE_IDLE: - GPR_UNREACHABLE_CODE(break); - case GRPC_CHTTP2_WRITE_STATE_WRITING: - GPR_TIMER_MARK("state=writing", 0); - set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "finish writing"); - break; - case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE: - GPR_TIMER_MARK("state=writing_stale_no_poller", 0); - set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING, "continue writing"); - t->is_first_write_in_batch = false; - GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); - // If the transport is closed, we will retry writing on the endpoint - // and next write may contain part of the currently serialized frames. - // So, we should only call the run_after_write callbacks when the next - // write finishes, or the callbacks will be invoked when the stream is - // closed. - if (!closed) { - GRPC_CLOSURE_LIST_SCHED(&t->run_after_write); - } - GRPC_CLOSURE_RUN( - GRPC_CLOSURE_INIT(&t->write_action_begin_locked, - write_action_begin_locked, t, - grpc_combiner_finally_scheduler(t->combiner)), - GRPC_ERROR_NONE); - break; + grpc_chttp2_end_write(t, GRPC_ERROR_REF(error)); } - - grpc_chttp2_end_write(t, GRPC_ERROR_REF(error)); GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing"); } @@ -1370,8 +1360,7 @@ static void continue_fetching_send_locked(grpc_chttp2_transport* t, } s->fetching_send_message.reset(); return; /* early out */ - } else if (s->fetching_send_message->Next(UINT32_MAX, - &s->complete_fetch_locked)) { + } else if (s->fetching_send_message->Next(UINT32_MAX, &s->complete_fetch)) { grpc_error* error = s->fetching_send_message->Pull(&s->fetching_slice); if (error != GRPC_ERROR_NONE) { s->fetching_send_message.reset(); @@ -1383,9 +1372,10 @@ static void continue_fetching_send_locked(grpc_chttp2_transport* t, } } -static void complete_fetch_locked(void* gs, grpc_error* error) { +static void complete_fetch(void* gs, grpc_error* error) { grpc_chttp2_stream* s = static_cast(gs); grpc_chttp2_transport* t = s->t; + grpc_core::MutexLock lock(&t->mu); if (error == GRPC_ERROR_NONE) { error = s->fetching_send_message->Pull(&s->fetching_slice); if (error == GRPC_ERROR_NONE) { @@ -1412,24 +1402,40 @@ static void log_metadata(const grpc_metadata_batch* md_batch, uint32_t id, } } -static void perform_stream_op_locked(void* stream_op, - grpc_error* error_ignored) { - GPR_TIMER_SCOPE("perform_stream_op_locked", 0); - - grpc_transport_stream_op_batch* op = - static_cast(stream_op); - grpc_chttp2_stream* s = - static_cast(op->handler_private.extra_arg); +static void perform_stream_op(grpc_transport* gt, grpc_stream* gs, + grpc_transport_stream_op_batch* op) { + GPR_TIMER_SCOPE("perform_stream_op", 0); + grpc_chttp2_transport* t = reinterpret_cast(gt); + grpc_chttp2_stream* s = reinterpret_cast(gs); grpc_transport_stream_op_batch_payload* op_payload = op->payload; - grpc_chttp2_transport* t = s->t; + if (!t->is_client) { + if (op->send_initial_metadata) { + grpc_millis deadline = + op_payload->send_initial_metadata.send_initial_metadata->deadline; + GPR_ASSERT(deadline == GRPC_MILLIS_INF_FUTURE); + } + if (op->send_trailing_metadata) { + grpc_millis deadline = + op_payload->send_trailing_metadata.send_trailing_metadata->deadline; + GPR_ASSERT(deadline == GRPC_MILLIS_INF_FUTURE); + } + } + + if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { + char* str = grpc_transport_stream_op_batch_string(op); + gpr_log(GPR_INFO, "perform_stream_op[s=%p]: %s", s, str); + gpr_free(str); + } + + grpc_core::MutexLock lock(&t->mu); GRPC_STATS_INC_HTTP2_OP_BATCHES(); - s->context = op->payload->context; + s->context = op_payload->context; s->traced = op->is_traced; if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { char* str = grpc_transport_stream_op_batch_string(op); - gpr_log(GPR_INFO, "perform_stream_op_locked: %s; on_complete = %p", str, + gpr_log(GPR_INFO, "perform_stream_op: %s; on_complete = %p", str, op->on_complete); gpr_free(str); if (op->send_initial_metadata) { @@ -1702,41 +1708,6 @@ static void perform_stream_op_locked(void* stream_op, grpc_chttp2_complete_closure_step(t, s, &on_complete, GRPC_ERROR_NONE, "op->on_complete"); } - - GRPC_CHTTP2_STREAM_UNREF(s, "perform_stream_op"); -} - -static void perform_stream_op(grpc_transport* gt, grpc_stream* gs, - grpc_transport_stream_op_batch* op) { - GPR_TIMER_SCOPE("perform_stream_op", 0); - grpc_chttp2_transport* t = reinterpret_cast(gt); - grpc_chttp2_stream* s = reinterpret_cast(gs); - - if (!t->is_client) { - if (op->send_initial_metadata) { - grpc_millis deadline = - op->payload->send_initial_metadata.send_initial_metadata->deadline; - GPR_ASSERT(deadline == GRPC_MILLIS_INF_FUTURE); - } - if (op->send_trailing_metadata) { - grpc_millis deadline = - op->payload->send_trailing_metadata.send_trailing_metadata->deadline; - GPR_ASSERT(deadline == GRPC_MILLIS_INF_FUTURE); - } - } - - if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { - char* str = grpc_transport_stream_op_batch_string(op); - gpr_log(GPR_INFO, "perform_stream_op[s=%p]: %s", s, str); - gpr_free(str); - } - - GRPC_CHTTP2_STREAM_REF(s, "perform_stream_op"); - op->handler_private.extra_arg = gs; - GRPC_CLOSURE_SCHED( - GRPC_CLOSURE_INIT(&op->handler_private.closure, perform_stream_op_locked, - op, grpc_combiner_scheduler(t->combiner)), - GRPC_ERROR_NONE); } static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error) { @@ -1746,7 +1717,12 @@ static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error) { GPR_ASSERT(error != GRPC_ERROR_NONE); for (size_t j = 0; j < GRPC_CHTTP2_PCL_COUNT; j++) { grpc_closure_list_fail_all(&pq->lists[j], GRPC_ERROR_REF(error)); - GRPC_CLOSURE_LIST_SCHED(&pq->lists[j]); + if (j == GRPC_CHTTP2_PCL_INITIATE) { + GRPC_CLOSURE_LIST_RUN(&pq->lists[j]); + } else { + // TODO(yashykt) : Use GRPC_CLOSURE_LIST_RUN for this too. + GRPC_CLOSURE_LIST_SCHED(&pq->lists[j]); + } } GRPC_ERROR_UNREF(error); } @@ -1774,8 +1750,9 @@ static void send_keepalive_ping_locked(grpc_chttp2_transport* t) { if (t->closed_with_error != GRPC_ERROR_NONE) { GRPC_CLOSURE_RUN(&t->start_keepalive_ping_locked, GRPC_ERROR_REF(t->closed_with_error)); - GRPC_CLOSURE_RUN(&t->finish_keepalive_ping_locked, - GRPC_ERROR_REF(t->closed_with_error)); + // TODO(yashykt) : Change this to GRPC_CLOSURE_RUN too + GRPC_CLOSURE_SCHED(&t->finish_keepalive_ping, + GRPC_ERROR_REF(t->closed_with_error)); return; } grpc_chttp2_ping_queue* pq = &t->ping_queue; @@ -1783,22 +1760,25 @@ static void send_keepalive_ping_locked(grpc_chttp2_transport* t) { /* There is a ping in flight. Add yourself to the inflight closure list. */ GRPC_CLOSURE_RUN(&t->start_keepalive_ping_locked, GRPC_ERROR_NONE); grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INFLIGHT], - &t->finish_keepalive_ping_locked, GRPC_ERROR_NONE); + &t->finish_keepalive_ping, GRPC_ERROR_NONE); return; } grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INITIATE], &t->start_keepalive_ping_locked, GRPC_ERROR_NONE); grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_NEXT], - &t->finish_keepalive_ping_locked, GRPC_ERROR_NONE); + &t->finish_keepalive_ping, GRPC_ERROR_NONE); } -static void retry_initiate_ping_locked(void* tp, grpc_error* error) { +static void retry_initiate_ping(void* tp, grpc_error* error) { grpc_chttp2_transport* t = static_cast(tp); - t->ping_state.is_delayed_ping_timer_set = false; - if (error == GRPC_ERROR_NONE) { - grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING); + { + grpc_core::MutexLock lock(&t->mu); + t->ping_state.is_delayed_ping_timer_set = false; + if (error == GRPC_ERROR_NONE) { + grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING); + } } - GRPC_CHTTP2_UNREF_TRANSPORT(t, "retry_initiate_ping_locked"); + GRPC_CHTTP2_UNREF_TRANSPORT(t, "retry_initiate_ping"); } void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id) { @@ -1809,6 +1789,7 @@ void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id) { gpr_free(from); return; } + // TODO(yashkt) : Change this to GRPC_CLOSURE_LIST_RUN GRPC_CLOSURE_LIST_SCHED(&pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]); if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) { grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS); @@ -1846,12 +1827,14 @@ void grpc_chttp2_add_ping_strike(grpc_chttp2_transport* t) { } } -static void perform_transport_op_locked(void* stream_op, - grpc_error* error_ignored) { - grpc_transport_op* op = static_cast(stream_op); - grpc_chttp2_transport* t = - static_cast(op->handler_private.extra_arg); - +static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) { + grpc_chttp2_transport* t = reinterpret_cast(gt); + if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { + char* msg = grpc_transport_op_string(op); + gpr_log(GPR_INFO, "perform_transport_op[t=%p]: %s", t, msg); + gpr_free(msg); + } + grpc_core::MutexLock lock(&t->mu); if (op->goaway_error) { send_goaway(t, op->goaway_error); } @@ -1886,24 +1869,7 @@ static void perform_transport_op_locked(void* stream_op, close_transport_locked(t, op->disconnect_with_error); } - GRPC_CLOSURE_RUN(op->on_consumed, GRPC_ERROR_NONE); - - GRPC_CHTTP2_UNREF_TRANSPORT(t, "transport_op"); -} - -static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) { - grpc_chttp2_transport* t = reinterpret_cast(gt); - if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { - char* msg = grpc_transport_op_string(op); - gpr_log(GPR_INFO, "perform_transport_op[t=%p]: %s", t, msg); - gpr_free(msg); - } - op->handler_private.extra_arg = gt; - GRPC_CHTTP2_REF_TRANSPORT(t, "transport_op"); - GRPC_CLOSURE_SCHED(GRPC_CLOSURE_INIT(&op->handler_private.closure, - perform_transport_op_locked, op, - grpc_combiner_scheduler(t->combiner)), - GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE); } /******************************************************************************* @@ -2544,11 +2510,11 @@ static grpc_error* try_http_parsing(grpc_chttp2_transport* t) { return error; } -static void read_action_locked(void* tp, grpc_error* error) { +static void read_action(void* tp, grpc_error* error) { GPR_TIMER_SCOPE("reading_action_locked", 0); grpc_chttp2_transport* t = static_cast(tp); - + grpc_core::ReleasableMutexLock lock(&t->mu); GRPC_ERROR_REF(error); grpc_error* err = error; @@ -2632,7 +2598,9 @@ static void read_action_locked(void* tp, grpc_error* error) { } else { continue_read_action_locked(t); } + lock.Unlock(); } else { + lock.Unlock(); GRPC_CHTTP2_UNREF_TRANSPORT(t, "reading_action"); } @@ -2641,7 +2609,7 @@ static void read_action_locked(void* tp, grpc_error* error) { static void continue_read_action_locked(grpc_chttp2_transport* t) { const bool urgent = t->goaway_error != GRPC_ERROR_NONE; - grpc_endpoint_read(t->ep, &t->read_buffer, &t->read_action_locked, urgent); + grpc_endpoint_read(t->ep, &t->read_buffer, &t->read_action, urgent); grpc_chttp2_act_on_flowctl_action(t->flow_control->MakeAction(), t, nullptr); } @@ -2649,11 +2617,13 @@ static void continue_read_action_locked(grpc_chttp2_transport* t) { // that kicks off finishes, it's unreffed static void schedule_bdp_ping_locked(grpc_chttp2_transport* t) { t->flow_control->bdp_estimator()->SchedulePing(); - send_ping_locked(t, &t->start_bdp_ping_locked, &t->finish_bdp_ping_locked); + send_ping_locked(t, &t->start_bdp_ping_locked, &t->finish_bdp_ping); } static void start_bdp_ping_locked(void* tp, grpc_error* error) { grpc_chttp2_transport* t = static_cast(tp); + // No need to take a lock. This closure will always be run while already + // holding the lock. if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { gpr_log(GPR_INFO, "%s: Start BDP ping err=%s", t->peer_string, grpc_error_string(error)); @@ -2668,13 +2638,15 @@ static void start_bdp_ping_locked(void* tp, grpc_error* error) { t->flow_control->bdp_estimator()->StartPing(); } -static void finish_bdp_ping_locked(void* tp, grpc_error* error) { +static void finish_bdp_ping(void* tp, grpc_error* error) { grpc_chttp2_transport* t = static_cast(tp); + grpc_core::ReleasableMutexLock lock(&t->mu); if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { gpr_log(GPR_INFO, "%s: Complete BDP ping err=%s", t->peer_string, grpc_error_string(error)); } if (error != GRPC_ERROR_NONE || t->closed_with_error != GRPC_ERROR_NONE) { + lock.Unlock(); GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping"); return; } @@ -2684,14 +2656,16 @@ static void finish_bdp_ping_locked(void* tp, grpc_error* error) { GPR_ASSERT(!t->have_next_bdp_ping_timer); t->have_next_bdp_ping_timer = true; grpc_timer_init(&t->next_bdp_ping_timer, next_ping, - &t->next_bdp_ping_timer_expired_locked); + &t->next_bdp_ping_timer_expired); } -static void next_bdp_ping_timer_expired_locked(void* tp, grpc_error* error) { +static void next_bdp_ping_timer_expired(void* tp, grpc_error* error) { grpc_chttp2_transport* t = static_cast(tp); + grpc_core::ReleasableMutexLock lock(&t->mu); GPR_ASSERT(t->have_next_bdp_ping_timer); t->have_next_bdp_ping_timer = false; if (error != GRPC_ERROR_NONE) { + lock.Unlock(); GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping"); return; } @@ -2765,31 +2739,35 @@ void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args, } } -static void init_keepalive_ping_locked(void* arg, grpc_error* error) { +static void init_keepalive_ping(void* arg, grpc_error* error) { grpc_chttp2_transport* t = static_cast(arg); - GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING); - if (t->destroying || t->closed_with_error != GRPC_ERROR_NONE) { - t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING; - } else if (error == GRPC_ERROR_NONE) { - if (t->keepalive_permit_without_calls || - grpc_chttp2_stream_map_size(&t->stream_map) > 0) { - t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING; - GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end"); - grpc_timer_init_unset(&t->keepalive_watchdog_timer); - send_keepalive_ping_locked(t); - grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING); - } else { + { + grpc_core::MutexLock lock(&t->mu); + GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING); + if (t->destroying || t->closed_with_error != GRPC_ERROR_NONE) { + t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING; + } else if (error == GRPC_ERROR_NONE) { + if (t->keepalive_permit_without_calls || + grpc_chttp2_stream_map_size(&t->stream_map) > 0) { + t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING; + GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end"); + grpc_timer_init_unset(&t->keepalive_watchdog_timer); + send_keepalive_ping_locked(t); + grpc_chttp2_initiate_write(t, + GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING); + } else { + GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); + grpc_timer_init(&t->keepalive_ping_timer, + grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, + &t->init_keepalive_ping); + } + } else if (error == GRPC_ERROR_CANCELLED) { + /* The keepalive ping timer may be cancelled by bdp */ GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); grpc_timer_init(&t->keepalive_ping_timer, grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, - &t->init_keepalive_ping_locked); + &t->init_keepalive_ping); } - } else if (error == GRPC_ERROR_CANCELLED) { - /* The keepalive ping timer may be cancelled by bdp */ - GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); - grpc_timer_init(&t->keepalive_ping_timer, - grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, - &t->init_keepalive_ping_locked); } GRPC_CHTTP2_UNREF_TRANSPORT(t, "init keepalive ping"); } @@ -2799,6 +2777,8 @@ static void start_keepalive_ping_locked(void* arg, grpc_error* error) { if (error != GRPC_ERROR_NONE) { return; } + // No need to take a lock. This closure will always be run while already + // holding the lock. if (t->channelz_socket != nullptr) { t->channelz_socket->RecordKeepaliveSent(); } @@ -2808,46 +2788,52 @@ static void start_keepalive_ping_locked(void* arg, grpc_error* error) { GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive watchdog"); grpc_timer_init(&t->keepalive_watchdog_timer, grpc_core::ExecCtx::Get()->Now() + t->keepalive_timeout, - &t->keepalive_watchdog_fired_locked); + &t->keepalive_watchdog_fired); } -static void finish_keepalive_ping_locked(void* arg, grpc_error* error) { +static void finish_keepalive_ping(void* arg, grpc_error* error) { grpc_chttp2_transport* t = static_cast(arg); - if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) { - if (error == GRPC_ERROR_NONE) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { - gpr_log(GPR_INFO, "%s: Finish keepalive ping", t->peer_string); + { + grpc_core::MutexLock lock(&t->mu); + if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) { + if (error == GRPC_ERROR_NONE) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { + gpr_log(GPR_INFO, "%s: Finish keepalive ping", t->peer_string); + } + t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING; + grpc_timer_cancel(&t->keepalive_watchdog_timer); + GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); + grpc_timer_init(&t->keepalive_ping_timer, + grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, + &t->init_keepalive_ping); } - t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING; - grpc_timer_cancel(&t->keepalive_watchdog_timer); - GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); - grpc_timer_init(&t->keepalive_ping_timer, - grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, - &t->init_keepalive_ping_locked); } } GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive ping end"); } -static void keepalive_watchdog_fired_locked(void* arg, grpc_error* error) { +static void keepalive_watchdog_fired(void* arg, grpc_error* error) { grpc_chttp2_transport* t = static_cast(arg); - if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) { - if (error == GRPC_ERROR_NONE) { - gpr_log(GPR_ERROR, "%s: Keepalive watchdog fired. Closing transport.", - t->peer_string); - t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING; - close_transport_locked( - t, grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "keepalive watchdog timeout"), - GRPC_ERROR_INT_GRPC_STATUS, - GRPC_STATUS_UNAVAILABLE)); - } - } else { - /* The watchdog timer should have been cancelled by - * finish_keepalive_ping_locked. */ - if (GPR_UNLIKELY(error != GRPC_ERROR_CANCELLED)) { - gpr_log(GPR_ERROR, "keepalive_ping_end state error: %d (expect: %d)", - t->keepalive_state, GRPC_CHTTP2_KEEPALIVE_STATE_PINGING); + { + grpc_core::MutexLock lock(&t->mu); + if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) { + if (error == GRPC_ERROR_NONE) { + gpr_log(GPR_ERROR, "%s: Keepalive watchdog fired. Closing transport.", + t->peer_string); + t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING; + close_transport_locked( + t, grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "keepalive watchdog timeout"), + GRPC_ERROR_INT_GRPC_STATUS, + GRPC_STATUS_UNAVAILABLE)); + } + } else { + /* The watchdog timer should have been cancelled by + * finish_keepalive_ping. */ + if (GPR_UNLIKELY(error != GRPC_ERROR_CANCELLED)) { + gpr_log(GPR_ERROR, "keepalive_ping_end state error: %d (expect: %d)", + t->keepalive_state, GRPC_CHTTP2_KEEPALIVE_STATE_PINGING); + } } } GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive watchdog"); @@ -2887,6 +2873,7 @@ static void set_pollset_set(grpc_transport* gt, grpc_stream* gs, static void reset_byte_stream(void* arg, grpc_error* error) { grpc_chttp2_stream* s = static_cast(arg); + grpc_core::MutexLock lock(&s->t->mu); s->pending_byte_stream = false; if (error == GRPC_ERROR_NONE) { grpc_chttp2_maybe_complete_recv_message(s->t, s); @@ -2916,30 +2903,23 @@ Chttp2IncomingByteStream::Chttp2IncomingByteStream( stream->byte_stream_error = GRPC_ERROR_NONE; } -void Chttp2IncomingByteStream::OrphanLocked(void* arg, - grpc_error* error_ignored) { - Chttp2IncomingByteStream* bs = static_cast(arg); - grpc_chttp2_stream* s = bs->stream_; +void Chttp2IncomingByteStream::Orphan() { + GPR_TIMER_SCOPE("incoming_byte_stream_destroy", 0); + grpc_chttp2_stream* s = stream_; grpc_chttp2_transport* t = s->t; - bs->Unref(); + grpc_core::MutexLock lock(&t->mu); + Unref(); s->pending_byte_stream = false; grpc_chttp2_maybe_complete_recv_message(t, s); grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s); } -void Chttp2IncomingByteStream::Orphan() { - GPR_TIMER_SCOPE("incoming_byte_stream_destroy", 0); - GRPC_CLOSURE_SCHED( - GRPC_CLOSURE_INIT(&destroy_action_, - &Chttp2IncomingByteStream::OrphanLocked, this, - grpc_combiner_scheduler(transport_->combiner)), - GRPC_ERROR_NONE); -} - +// TODO(yashykt) : Merge this with Next void Chttp2IncomingByteStream::NextLocked(void* arg, grpc_error* error_ignored) { Chttp2IncomingByteStream* bs = static_cast(arg); grpc_chttp2_transport* t = bs->transport_; + grpc_core::MutexLock lock(&t->mu); grpc_chttp2_stream* s = bs->stream_; size_t cur_length = s->frame_storage.length; if (!s->read_closed) { @@ -2989,11 +2969,10 @@ bool Chttp2IncomingByteStream::Next(size_t max_size_hint, Ref(); next_action_.max_size_hint = max_size_hint; next_action_.on_complete = on_complete; - GRPC_CLOSURE_SCHED( - GRPC_CLOSURE_INIT(&next_action_.closure, - &Chttp2IncomingByteStream::NextLocked, this, - grpc_combiner_scheduler(transport_->combiner)), - GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(GRPC_CLOSURE_INIT(&next_action_.closure, + &Chttp2IncomingByteStream::NextLocked, + this, grpc_schedule_on_exec_ctx), + GRPC_ERROR_NONE); return false; } } @@ -3106,7 +3085,7 @@ static void post_benign_reclaimer(grpc_chttp2_transport* t) { t->benign_reclaimer_registered = true; GRPC_CHTTP2_REF_TRANSPORT(t, "benign_reclaimer"); grpc_resource_user_post_reclaimer(grpc_endpoint_get_resource_user(t->ep), - false, &t->benign_reclaimer_locked); + false, &t->benign_reclaimer); } } @@ -3115,66 +3094,72 @@ static void post_destructive_reclaimer(grpc_chttp2_transport* t) { t->destructive_reclaimer_registered = true; GRPC_CHTTP2_REF_TRANSPORT(t, "destructive_reclaimer"); grpc_resource_user_post_reclaimer(grpc_endpoint_get_resource_user(t->ep), - true, &t->destructive_reclaimer_locked); + true, &t->destructive_reclaimer); } } -static void benign_reclaimer_locked(void* arg, grpc_error* error) { +static void benign_reclaimer(void* arg, grpc_error* error) { grpc_chttp2_transport* t = static_cast(arg); - if (error == GRPC_ERROR_NONE && - grpc_chttp2_stream_map_size(&t->stream_map) == 0) { - /* Channel with no active streams: send a goaway to try and make it - * disconnect cleanly */ - if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) { - gpr_log(GPR_INFO, "HTTP2: %s - send goaway to free memory", - t->peer_string); + { + grpc_core::MutexLock lock(&t->mu); + if (error == GRPC_ERROR_NONE && + grpc_chttp2_stream_map_size(&t->stream_map) == 0) { + /* Channel with no active streams: send a goaway to try and make it + * disconnect cleanly */ + if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) { + gpr_log(GPR_INFO, "HTTP2: %s - send goaway to free memory", + t->peer_string); + } + send_goaway( + t, grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"), + GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM)); + } else if (error == GRPC_ERROR_NONE && + GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) { + gpr_log(GPR_INFO, + "HTTP2: %s - skip benign reclamation, there are still %" PRIdPTR + " streams", + t->peer_string, grpc_chttp2_stream_map_size(&t->stream_map)); + } + t->benign_reclaimer_registered = false; + if (error != GRPC_ERROR_CANCELLED) { + grpc_resource_user_finish_reclamation( + grpc_endpoint_get_resource_user(t->ep)); } - send_goaway(t, - grpc_error_set_int( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"), - GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM)); - } else if (error == GRPC_ERROR_NONE && - GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) { - gpr_log(GPR_INFO, - "HTTP2: %s - skip benign reclamation, there are still %" PRIdPTR - " streams", - t->peer_string, grpc_chttp2_stream_map_size(&t->stream_map)); - } - t->benign_reclaimer_registered = false; - if (error != GRPC_ERROR_CANCELLED) { - grpc_resource_user_finish_reclamation( - grpc_endpoint_get_resource_user(t->ep)); } GRPC_CHTTP2_UNREF_TRANSPORT(t, "benign_reclaimer"); } -static void destructive_reclaimer_locked(void* arg, grpc_error* error) { +static void destructive_reclaimer(void* arg, grpc_error* error) { grpc_chttp2_transport* t = static_cast(arg); - size_t n = grpc_chttp2_stream_map_size(&t->stream_map); - t->destructive_reclaimer_registered = false; - if (error == GRPC_ERROR_NONE && n > 0) { - grpc_chttp2_stream* s = static_cast( - grpc_chttp2_stream_map_rand(&t->stream_map)); - if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) { - gpr_log(GPR_INFO, "HTTP2: %s - abandon stream id %d", t->peer_string, - s->id); - } - grpc_chttp2_cancel_stream( - t, s, - grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"), - GRPC_ERROR_INT_HTTP2_ERROR, - GRPC_HTTP2_ENHANCE_YOUR_CALM)); - if (n > 1) { - /* Since we cancel one stream per destructive reclamation, if - there are more streams left, we can immediately post a new - reclaimer in case the resource quota needs to free more - memory */ - post_destructive_reclaimer(t); - } - } - if (error != GRPC_ERROR_CANCELLED) { - grpc_resource_user_finish_reclamation( - grpc_endpoint_get_resource_user(t->ep)); + { + grpc_core::MutexLock lock(&t->mu); + size_t n = grpc_chttp2_stream_map_size(&t->stream_map); + t->destructive_reclaimer_registered = false; + if (error == GRPC_ERROR_NONE && n > 0) { + grpc_chttp2_stream* s = static_cast( + grpc_chttp2_stream_map_rand(&t->stream_map)); + if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) { + gpr_log(GPR_INFO, "HTTP2: %s - abandon stream id %d", t->peer_string, + s->id); + } + grpc_chttp2_cancel_stream( + t, s, + grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"), + GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM)); + if (n > 1) { + /* Since we cancel one stream per destructive reclamation, if + there are more streams left, we can immediately post a new + reclaimer in case the resource quota needs to free more + memory */ + post_destructive_reclaimer(t); + } + } + if (error != GRPC_ERROR_CANCELLED) { + grpc_resource_user_finish_reclamation( + grpc_endpoint_get_resource_user(t->ep)); + } } GRPC_CHTTP2_UNREF_TRANSPORT(t, "destructive_reclaimer"); } @@ -3274,5 +3259,5 @@ void grpc_chttp2_transport_start_reading( gpr_free(read_buffer); } t->notify_on_receive_settings = notify_on_receive_settings; - GRPC_CLOSURE_SCHED(&t->read_action_locked, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(&t->read_action, GRPC_ERROR_NONE); } diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.cc b/src/core/ext/transport/chttp2/transport/hpack_parser.cc index a5142ffd96f..fb33c841428 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_parser.cc +++ b/src/core/ext/transport/chttp2/transport/hpack_parser.cc @@ -1669,11 +1669,15 @@ static const maybe_complete_func_type maybe_complete_funcs[] = { static void force_client_rst_stream(void* sp, grpc_error* error) { grpc_chttp2_stream* s = static_cast(sp); grpc_chttp2_transport* t = s->t; - if (!s->write_closed) { - grpc_chttp2_add_rst_stream_to_next_write(t, s->id, GRPC_HTTP2_NO_ERROR, - &s->stats.outgoing); - grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM); - grpc_chttp2_mark_stream_closed(t, s, true, true, GRPC_ERROR_NONE); + { + grpc_core::MutexLock lock(&t->mu); + if (!s->write_closed) { + grpc_chttp2_add_rst_stream_to_next_write(t, s->id, GRPC_HTTP2_NO_ERROR, + &s->stats.outgoing); + grpc_chttp2_initiate_write(t, + GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM); + grpc_chttp2_mark_stream_closed(t, s, true, true, GRPC_ERROR_NONE); + } } GRPC_CHTTP2_STREAM_UNREF(s, "final_rst"); } @@ -1740,11 +1744,12 @@ grpc_error* grpc_chttp2_header_parser_parse(void* hpack_parser, the stream. Wait until the combiner lock is ready to be released however -- it might be that we receive a RST_STREAM following this and can avoid the extra write */ + // TODO(yashykt) : When we were using combiners, we were using the + // finally version. Maybe do something similar? GRPC_CHTTP2_STREAM_REF(s, "final_rst"); - GRPC_CLOSURE_SCHED( - GRPC_CLOSURE_CREATE(force_client_rst_stream, s, - grpc_combiner_finally_scheduler(t->combiner)), - GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(force_client_rst_stream, s, + grpc_schedule_on_exec_ctx), + GRPC_ERROR_NONE); } grpc_chttp2_mark_stream_closed(t, s, true, false, GRPC_ERROR_NONE); } diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 8ef26cd7981..6d13d368be7 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -39,6 +39,7 @@ #include "src/core/lib/channel/channelz.h" #include "src/core/lib/compression/stream_compression.h" #include "src/core/lib/gprpp/manual_constructor.h" +#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/timer.h" @@ -253,7 +254,6 @@ class Chttp2IncomingByteStream : public ByteStream { private: static void NextLocked(void* arg, grpc_error* error_ignored); - static void OrphanLocked(void* arg, grpc_error* error_ignored); void MaybeCreateStreamDecompressionCtx(); @@ -275,7 +275,6 @@ class Chttp2IncomingByteStream : public ByteStream { size_t max_size_hint; grpc_closure* on_complete; } next_action_; - grpc_closure destroy_action_; }; } // namespace grpc_core @@ -294,14 +293,13 @@ struct grpc_chttp2_transport { ~grpc_chttp2_transport(); grpc_transport base; /* must be first */ + grpc_core::Mutex mu; grpc_core::RefCount refs; grpc_endpoint* ep; char* peer_string; grpc_resource_user* resource_user; - grpc_combiner* combiner; - grpc_closure* notify_on_receive_settings = nullptr; /** write execution state of the transport */ @@ -327,11 +325,11 @@ struct grpc_chttp2_transport { /** maps stream id to grpc_chttp2_stream objects */ grpc_chttp2_stream_map stream_map; - grpc_closure write_action_begin_locked; + grpc_closure write_action_begin; grpc_closure write_action; - grpc_closure write_action_end_locked; + grpc_closure write_action_end; - grpc_closure read_action_locked; + grpc_closure read_action; /** incoming read bytes */ grpc_slice_buffer read_buffer; @@ -392,7 +390,7 @@ struct grpc_chttp2_transport { grpc_chttp2_repeated_ping_policy ping_policy; grpc_chttp2_repeated_ping_state ping_state; uint64_t ping_ctr = 0; /* unique id for pings */ - grpc_closure retry_initiate_ping_locked; + grpc_closure retry_initiate_ping; /** ping acks */ size_t ping_ack_count = 0; @@ -442,9 +440,9 @@ struct grpc_chttp2_transport { grpc_chttp2_write_cb* write_cb_pool = nullptr; /* bdp estimator */ - grpc_closure next_bdp_ping_timer_expired_locked; + grpc_closure next_bdp_ping_timer_expired; grpc_closure start_bdp_ping_locked; - grpc_closure finish_bdp_ping_locked; + grpc_closure finish_bdp_ping; /* if non-NULL, close the transport with this error when writes are finished */ @@ -459,9 +457,9 @@ struct grpc_chttp2_transport { /** have we scheduled a destructive cleanup? */ bool destructive_reclaimer_registered = false; /** benign cleanup closure */ - grpc_closure benign_reclaimer_locked; + grpc_closure benign_reclaimer; /** destructive cleanup closure */ - grpc_closure destructive_reclaimer_locked; + grpc_closure destructive_reclaimer; /* next bdp ping timer */ bool have_next_bdp_ping_timer = false; @@ -469,13 +467,13 @@ struct grpc_chttp2_transport { /* keep-alive ping support */ /** Closure to initialize a keepalive ping */ - grpc_closure init_keepalive_ping_locked; + grpc_closure init_keepalive_ping; /** Closure to run when the keepalive ping is sent */ grpc_closure start_keepalive_ping_locked; /** Cousure to run when the keepalive ping ack is received */ - grpc_closure finish_keepalive_ping_locked; + grpc_closure finish_keepalive_ping; /** Closrue to run when the keepalive ping timeouts */ - grpc_closure keepalive_watchdog_fired_locked; + grpc_closure keepalive_watchdog_fired; /** timer to initiate ping events */ grpc_timer keepalive_ping_timer; /** watchdog to kill the transport when waiting for the keepalive ping */ @@ -522,7 +520,6 @@ struct grpc_chttp2_stream { explicit Reffer(grpc_chttp2_stream* s); } reffer; - grpc_closure destroy_stream; grpc_closure* destroy_stream_arg; grpc_chttp2_stream_link links[STREAM_LIST_COUNT]; @@ -543,7 +540,7 @@ struct grpc_chttp2_stream { int64_t next_message_end_offset; int64_t flow_controlled_bytes_written = 0; int64_t flow_controlled_bytes_flowed = 0; - grpc_closure complete_fetch_locked; + grpc_closure complete_fetch; grpc_closure* fetching_send_message_finished = nullptr; grpc_metadata_batch* recv_initial_metadata; diff --git a/src/core/ext/transport/chttp2/transport/writing.cc b/src/core/ext/transport/chttp2/transport/writing.cc index d6d9e4521f6..4796dee4cf6 100644 --- a/src/core/ext/transport/chttp2/transport/writing.cc +++ b/src/core/ext/transport/chttp2/transport/writing.cc @@ -97,14 +97,14 @@ static void maybe_initiate_ping(grpc_chttp2_transport* t) { t->ping_state.is_delayed_ping_timer_set = true; GRPC_CHTTP2_REF_TRANSPORT(t, "retry_initiate_ping_locked"); grpc_timer_init(&t->ping_state.delayed_ping_timer, next_allowed_ping, - &t->retry_initiate_ping_locked); + &t->retry_initiate_ping); } return; } pq->inflight_id = t->ping_ctr; t->ping_ctr++; - GRPC_CLOSURE_LIST_SCHED(&pq->lists[GRPC_CHTTP2_PCL_INITIATE]); + GRPC_CLOSURE_LIST_RUN(&pq->lists[GRPC_CHTTP2_PCL_INITIATE]); grpc_closure_list_move(&pq->lists[GRPC_CHTTP2_PCL_NEXT], &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]); grpc_slice_buffer_add(&t->outbuf, diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h index c7b2e8299b9..94667ae0a69 100644 --- a/src/core/lib/iomgr/closure.h +++ b/src/core/lib/iomgr/closure.h @@ -355,4 +355,43 @@ inline void grpc_closure_list_sched(grpc_closure_list* list) { grpc_closure_list_sched(closure_list) #endif +#ifndef NDEBUG +inline void grpc_closure_list_run(const char* file, int line, + grpc_closure_list* list) { +#else +inline void grpc_closure_list_run(grpc_closure_list* list) { +#endif + grpc_closure* c = list->head; + while (c != nullptr) { + grpc_closure* next = c->next_data.next; +#ifndef NDEBUG + if (c->scheduled) { + gpr_log(GPR_ERROR, + "Closure already scheduled. (closure: %p, created: [%s:%d], " + "previously scheduled at: [%s: %d] run?: %s", + c, c->file_created, c->line_created, c->file_initiated, + c->line_initiated, c->run ? "true" : "false"); + abort(); + } + c->scheduled = true; + c->file_initiated = file; + c->line_initiated = line; + c->run = false; + GPR_ASSERT(c->cb != nullptr); +#endif + c->scheduler->vtable->run(c, c->error_data.error); + c = next; + } + list->head = list->tail = nullptr; +} + +/** Schedule all closures in a list to be run. Does not need to be run from a + * safe point. */ +#ifndef NDEBUG +#define GRPC_CLOSURE_LIST_RUN(closure_list) \ + grpc_closure_list_run(__FILE__, __LINE__, closure_list) +#else +#define GRPC_CLOSURE_LIST_RUN(closure_list) grpc_closure_list_run(closure_list) +#endif + #endif /* GRPC_CORE_LIB_IOMGR_CLOSURE_H */ diff --git a/src/core/lib/iomgr/executor.cc b/src/core/lib/iomgr/executor.cc index 721542544cd..c855d1535a9 100644 --- a/src/core/lib/iomgr/executor.cc +++ b/src/core/lib/iomgr/executor.cc @@ -465,6 +465,10 @@ void Executor::ShutdownAll() { bool Executor::IsThreaded(ExecutorType executor_type) { GPR_ASSERT(executor_type < ExecutorType::NUM_EXECUTORS); + Executor* executor = executors[static_cast(executor_type)]; + if (executor == nullptr) { + return false; + } return executors[static_cast(executor_type)]->IsThreaded(); }