From c5b5840707279d36ebecdd3a2d6852c0470a7bad Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Wed, 2 Oct 2019 09:28:32 +0200 Subject: [PATCH] Revert "Chttp2combiner" --- .../chttp2/transport/chttp2_transport.cc | 739 +++++++++--------- .../chttp2/transport/hpack_parser.cc | 23 +- .../ext/transport/chttp2/transport/internal.h | 31 +- .../ext/transport/chttp2/transport/writing.cc | 4 +- src/core/lib/iomgr/closure.h | 39 - src/core/lib/iomgr/executor.cc | 4 - 6 files changed, 405 insertions(+), 435 deletions(-) diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index acb3b4c2ddd..aeb7389ca21 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -102,14 +102,14 @@ grpc_core::DebugOnlyTraceFlag grpc_trace_chttp2_refcount(false, "chttp2_refcount"); /* forward declarations of various callbacks that we'll build closures around */ -static void write_action_begin(void* t, grpc_error* error); +static void write_action_begin_locked(void* t, grpc_error* error); static void write_action(void* t, grpc_error* error); -static void write_action_end(void* t, grpc_error* error); +static void write_action_end_locked(void* t, grpc_error* error); -static void read_action(void* t, grpc_error* error); +static void read_action_locked(void* t, grpc_error* error); static void continue_read_action_locked(grpc_chttp2_transport* t); -static void complete_fetch(void* gs, grpc_error* error); +static void complete_fetch_locked(void* gs, grpc_error* error); /** Set a transport level setting, and push it to our peer */ static void queue_setting_update(grpc_chttp2_transport* t, grpc_chttp2_setting_id id, uint32_t value); @@ -124,8 +124,8 @@ static void connectivity_state_set(grpc_chttp2_transport* t, grpc_connectivity_state state, const char* reason); -static void benign_reclaimer(void* t, grpc_error* error); -static void destructive_reclaimer(void* t, grpc_error* error); +static void benign_reclaimer_locked(void* t, grpc_error* error); +static void destructive_reclaimer_locked(void* t, grpc_error* error); static void post_benign_reclaimer(grpc_chttp2_transport* t); static void post_destructive_reclaimer(grpc_chttp2_transport* t); @@ -135,20 +135,20 @@ static void end_all_the_calls(grpc_chttp2_transport* t, grpc_error* error); static void schedule_bdp_ping_locked(grpc_chttp2_transport* t); static void start_bdp_ping_locked(void* tp, grpc_error* error); -static void finish_bdp_ping(void* tp, grpc_error* error); -static void next_bdp_ping_timer_expired(void* tp, grpc_error* error); +static void finish_bdp_ping_locked(void* tp, grpc_error* error); +static void next_bdp_ping_timer_expired_locked(void* tp, grpc_error* error); static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error); static void send_ping_locked(grpc_chttp2_transport* t, grpc_closure* on_initiate, grpc_closure* on_complete); -static void retry_initiate_ping(void* tp, grpc_error* error); +static void retry_initiate_ping_locked(void* tp, grpc_error* error); /** keepalive-relevant functions */ -static void init_keepalive_ping(void* arg, grpc_error* error); +static void init_keepalive_ping_locked(void* arg, grpc_error* error); static void start_keepalive_ping_locked(void* arg, grpc_error* error); -static void finish_keepalive_ping(void* arg, grpc_error* error); -static void keepalive_watchdog_fired(void* arg, grpc_error* error); +static void finish_keepalive_ping_locked(void* arg, grpc_error* error); +static void keepalive_watchdog_fired_locked(void* arg, grpc_error* error); static void reset_byte_stream(void* arg, grpc_error* error); @@ -197,6 +197,8 @@ grpc_chttp2_transport::~grpc_chttp2_transport() { grpc_chttp2_stream_map_destroy(&stream_map); + GRPC_COMBINER_UNREF(combiner, "chttp2_transport"); + cancel_pings(this, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed")); @@ -390,27 +392,33 @@ static bool read_channel_args(grpc_chttp2_transport* t, } static void init_transport_closures(grpc_chttp2_transport* t) { - GRPC_CLOSURE_INIT(&t->read_action, read_action, t, grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&t->benign_reclaimer, benign_reclaimer, t, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&t->destructive_reclaimer, destructive_reclaimer, t, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&t->retry_initiate_ping, retry_initiate_ping, t, - grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&t->read_action_locked, read_action_locked, t, + grpc_combiner_scheduler(t->combiner)); + GRPC_CLOSURE_INIT(&t->benign_reclaimer_locked, benign_reclaimer_locked, t, + grpc_combiner_scheduler(t->combiner)); + GRPC_CLOSURE_INIT(&t->destructive_reclaimer_locked, + destructive_reclaimer_locked, t, + grpc_combiner_scheduler(t->combiner)); + GRPC_CLOSURE_INIT(&t->retry_initiate_ping_locked, retry_initiate_ping_locked, + t, grpc_combiner_scheduler(t->combiner)); GRPC_CLOSURE_INIT(&t->start_bdp_ping_locked, start_bdp_ping_locked, t, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&t->finish_bdp_ping, finish_bdp_ping, t, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&t->next_bdp_ping_timer_expired, - next_bdp_ping_timer_expired, t, grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&t->init_keepalive_ping, init_keepalive_ping, t, - grpc_schedule_on_exec_ctx); + grpc_combiner_scheduler(t->combiner)); + GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked, finish_bdp_ping_locked, t, + grpc_combiner_scheduler(t->combiner)); + GRPC_CLOSURE_INIT(&t->next_bdp_ping_timer_expired_locked, + next_bdp_ping_timer_expired_locked, t, + grpc_combiner_scheduler(t->combiner)); + GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping_locked, + t, grpc_combiner_scheduler(t->combiner)); GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked, - start_keepalive_ping_locked, t, grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&t->finish_keepalive_ping, finish_keepalive_ping, t, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&t->keepalive_watchdog_fired, keepalive_watchdog_fired, t, - grpc_schedule_on_exec_ctx); + start_keepalive_ping_locked, t, + grpc_combiner_scheduler(t->combiner)); + GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked, + finish_keepalive_ping_locked, t, + grpc_combiner_scheduler(t->combiner)); + GRPC_CLOSURE_INIT(&t->keepalive_watchdog_fired_locked, + keepalive_watchdog_fired_locked, t, + grpc_combiner_scheduler(t->combiner)); } static void init_transport_keepalive_settings(grpc_chttp2_transport* t) { @@ -450,7 +458,7 @@ static void init_keepalive_pings_if_enabled(grpc_chttp2_transport* t) { GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); grpc_timer_init(&t->keepalive_ping_timer, grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, - &t->init_keepalive_ping); + &t->init_keepalive_ping_locked); } else { /* Use GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED to indicate there are no inflight keeaplive timers */ @@ -465,6 +473,7 @@ grpc_chttp2_transport::grpc_chttp2_transport( ep(ep), peer_string(grpc_endpoint_get_peer(ep)), resource_user(resource_user), + combiner(grpc_combiner_create()), state_tracker(is_client ? "client_transport" : "server_transport", GRPC_CHANNEL_READY), is_client(is_client), @@ -548,20 +557,24 @@ grpc_chttp2_transport::grpc_chttp2_transport( post_benign_reclaimer(this); } -static void destroy_transport(grpc_transport* gt) { - grpc_chttp2_transport* t = reinterpret_cast(gt); - { - grpc_core::MutexLock lock(&t->mu); - t->destroying = 1; - close_transport_locked( - t, grpc_error_set_int( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed"), - GRPC_ERROR_INT_OCCURRED_DURING_WRITE, t->write_state)); - } +static void destroy_transport_locked(void* tp, grpc_error* error) { + grpc_chttp2_transport* t = static_cast(tp); + t->destroying = 1; + close_transport_locked( + t, grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed"), + GRPC_ERROR_INT_OCCURRED_DURING_WRITE, t->write_state)); // Must be the last line. GRPC_CHTTP2_UNREF_TRANSPORT(t, "destroy"); } +static void destroy_transport(grpc_transport* gt) { + grpc_chttp2_transport* t = reinterpret_cast(gt); + GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(destroy_transport_locked, t, + grpc_combiner_scheduler(t->combiner)), + GRPC_ERROR_NONE); +} + static void close_transport_locked(grpc_chttp2_transport* t, grpc_error* error) { end_all_the_calls(t, GRPC_ERROR_REF(error)); @@ -671,67 +684,64 @@ grpc_chttp2_stream::grpc_chttp2_stream(grpc_chttp2_transport* t, grpc_slice_buffer_init(&unprocessed_incoming_frames_buffer); grpc_slice_buffer_init(&flow_controlled_buffer); - GRPC_CLOSURE_INIT(&complete_fetch, ::complete_fetch, this, - grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&complete_fetch_locked, ::complete_fetch_locked, this, + grpc_combiner_scheduler(t->combiner)); GRPC_CLOSURE_INIT(&reset_byte_stream, ::reset_byte_stream, this, - grpc_schedule_on_exec_ctx); + grpc_combiner_scheduler(t->combiner)); } grpc_chttp2_stream::~grpc_chttp2_stream() { - { - grpc_core::MutexLock lock(&t->mu); - if (t->channelz_socket != nullptr) { - if ((t->is_client && eos_received) || (!t->is_client && eos_sent)) { - t->channelz_socket->RecordStreamSucceeded(); - } else { - t->channelz_socket->RecordStreamFailed(); - } + if (t->channelz_socket != nullptr) { + if ((t->is_client && eos_received) || (!t->is_client && eos_sent)) { + t->channelz_socket->RecordStreamSucceeded(); + } else { + t->channelz_socket->RecordStreamFailed(); } + } - GPR_ASSERT((write_closed && read_closed) || id == 0); - if (id != 0) { - GPR_ASSERT(grpc_chttp2_stream_map_find(&t->stream_map, id) == nullptr); - } + GPR_ASSERT((write_closed && read_closed) || id == 0); + if (id != 0) { + GPR_ASSERT(grpc_chttp2_stream_map_find(&t->stream_map, id) == nullptr); + } - grpc_slice_buffer_destroy_internal(&unprocessed_incoming_frames_buffer); - grpc_slice_buffer_destroy_internal(&frame_storage); - if (stream_compression_method != - GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS) { - grpc_slice_buffer_destroy_internal(&compressed_data_buffer); - } - if (stream_decompression_method != - GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) { - grpc_slice_buffer_destroy_internal(&decompressed_data_buffer); - } + grpc_slice_buffer_destroy_internal(&unprocessed_incoming_frames_buffer); + grpc_slice_buffer_destroy_internal(&frame_storage); + if (stream_compression_method != GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS) { + grpc_slice_buffer_destroy_internal(&compressed_data_buffer); + } + if (stream_decompression_method != + GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) { + grpc_slice_buffer_destroy_internal(&decompressed_data_buffer); + } - grpc_chttp2_list_remove_stalled_by_transport(t, this); - grpc_chttp2_list_remove_stalled_by_stream(t, this); + grpc_chttp2_list_remove_stalled_by_transport(t, this); + grpc_chttp2_list_remove_stalled_by_stream(t, this); - for (int i = 0; i < STREAM_LIST_COUNT; i++) { - if (GPR_UNLIKELY(included[i])) { - gpr_log(GPR_ERROR, "%s stream %d still included in list %d", - t->is_client ? "client" : "server", id, i); - abort(); - } + for (int i = 0; i < STREAM_LIST_COUNT; i++) { + if (GPR_UNLIKELY(included[i])) { + gpr_log(GPR_ERROR, "%s stream %d still included in list %d", + t->is_client ? "client" : "server", id, i); + abort(); } + } - GPR_ASSERT(send_initial_metadata_finished == nullptr); - GPR_ASSERT(fetching_send_message == nullptr); - GPR_ASSERT(send_trailing_metadata_finished == nullptr); - GPR_ASSERT(recv_initial_metadata_ready == nullptr); - GPR_ASSERT(recv_message_ready == nullptr); - GPR_ASSERT(recv_trailing_metadata_finished == nullptr); - grpc_slice_buffer_destroy_internal(&flow_controlled_buffer); - GRPC_ERROR_UNREF(read_closed_error); - GRPC_ERROR_UNREF(write_closed_error); - GRPC_ERROR_UNREF(byte_stream_error); + GPR_ASSERT(send_initial_metadata_finished == nullptr); + GPR_ASSERT(fetching_send_message == nullptr); + GPR_ASSERT(send_trailing_metadata_finished == nullptr); + GPR_ASSERT(recv_initial_metadata_ready == nullptr); + GPR_ASSERT(recv_message_ready == nullptr); + GPR_ASSERT(recv_trailing_metadata_finished == nullptr); + grpc_slice_buffer_destroy_internal(&flow_controlled_buffer); + GRPC_ERROR_UNREF(read_closed_error); + GRPC_ERROR_UNREF(write_closed_error); + GRPC_ERROR_UNREF(byte_stream_error); - flow_control.Destroy(); + flow_control.Destroy(); - if (t->resource_user != nullptr) { - grpc_resource_user_free(t->resource_user, GRPC_RESOURCE_QUOTA_CALL_SIZE); - } + if (t->resource_user != nullptr) { + grpc_resource_user_free(t->resource_user, GRPC_RESOURCE_QUOTA_CALL_SIZE); } + GRPC_CHTTP2_UNREF_TRANSPORT(t, "stream"); GRPC_CLOSURE_SCHED(destroy_stream_arg, GRPC_ERROR_NONE); } @@ -745,9 +755,16 @@ static int init_stream(grpc_transport* gt, grpc_stream* gs, return 0; } +static void destroy_stream_locked(void* sp, grpc_error* error) { + GPR_TIMER_SCOPE("destroy_stream", 0); + grpc_chttp2_stream* s = static_cast(sp); + s->~grpc_chttp2_stream(); +} + static void destroy_stream(grpc_transport* gt, grpc_stream* gs, grpc_closure* then_schedule_closure) { GPR_TIMER_SCOPE("destroy_stream", 0); + grpc_chttp2_transport* t = reinterpret_cast(gt); grpc_chttp2_stream* s = reinterpret_cast(gs); if (s->stream_compression_method != GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS && @@ -763,7 +780,10 @@ static void destroy_stream(grpc_transport* gt, grpc_stream* gs, } s->destroy_stream_arg = then_schedule_closure; - s->~grpc_chttp2_stream(); + GRPC_CLOSURE_SCHED( + GRPC_CLOSURE_INIT(&s->destroy_stream, destroy_stream_locked, s, + grpc_combiner_scheduler(t->combiner)), + GRPC_ERROR_NONE); } grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t, @@ -908,29 +928,26 @@ void grpc_chttp2_initiate_write(grpc_chttp2_transport* t, grpc_chttp2_initiate_write_reason_string(reason)); t->is_first_write_in_batch = true; GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); - // TODO(yashykt) : When we were using combiners, we were using the finally - // version, so that the write action would happen after we were done - // queueing up all the writes that we wanted. Maybe do something similar? - // Keeping the earlier comment for posterity - - /* Note that the 'write_action_begin' closure is being scheduled + /* Note that the 'write_action_begin_locked' closure is being scheduled * on the 'finally_scheduler' of t->combiner. This means that - * 'write_action_begin' is called only *after* all the other + * 'write_action_begin_locked' is called only *after* all the other * closures (some of which are potentially initiating more writes on the * transport) are executed on the t->combiner. * * The reason for scheduling on finally_scheduler is to make sure we batch - * as many writes as possible. 'write_action_begin' is the function + * as many writes as possible. 'write_action_begin_locked' is the function * that gathers all the relevant bytes (which are at various places in the * grpc_chttp2_transport structure) and append them to 'outbuf' field in * grpc_chttp2_transport thereby batching what would have been potentially * multiple write operations. * - * Also, 'write_action_begin' only gathers the bytes into outbuf. + * Also, 'write_action_begin_locked' only gathers the bytes into outbuf. * It does not call the endpoint to write the bytes. That is done by the - * 'write_action' (which is scheduled by 'write_action_begin') */ + * 'write_action' (which is scheduled by 'write_action_begin_locked') */ GRPC_CLOSURE_SCHED( - GRPC_CLOSURE_INIT(&t->write_action_begin, write_action_begin, t, - grpc_schedule_on_exec_ctx), + GRPC_CLOSURE_INIT(&t->write_action_begin_locked, + write_action_begin_locked, t, + grpc_combiner_finally_scheduler(t->combiner)), GRPC_ERROR_NONE); break; case GRPC_CHTTP2_WRITE_STATE_WRITING: @@ -996,10 +1013,9 @@ static const char* begin_writing_desc(bool partial, bool inlined) { GPR_UNREACHABLE_CODE(return "bad state tuple"); } -static void write_action_begin(void* gt, grpc_error* error_ignored) { - GPR_TIMER_SCOPE("write_action_begin", 0); +static void write_action_begin_locked(void* gt, grpc_error* error_ignored) { + GPR_TIMER_SCOPE("write_action_begin_locked", 0); grpc_chttp2_transport* t = static_cast(gt); - grpc_core::ReleasableMutexLock lock(&t->mu); GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE); grpc_chttp2_begin_write_result r; if (t->closed_with_error != GRPC_ERROR_NONE) { @@ -1040,11 +1056,9 @@ static void write_action_begin(void* gt, grpc_error* error_ignored) { t->reading_paused_on_pending_induced_frames = false; continue_read_action_locked(t); } - lock.Unlock(); } else { GRPC_STATS_INC_HTTP2_SPURIOUS_WRITES_BEGUN(); set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "begin writing nothing"); - lock.Unlock(); GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing"); } } @@ -1054,67 +1068,63 @@ static void write_action(void* gt, grpc_error* error) { grpc_chttp2_transport* t = static_cast(gt); void* cl = t->cl; t->cl = nullptr; - grpc_endpoint_write(t->ep, &t->outbuf, - GRPC_CLOSURE_INIT(&t->write_action_end, write_action_end, - t, grpc_schedule_on_exec_ctx), - cl); + grpc_endpoint_write( + t->ep, &t->outbuf, + GRPC_CLOSURE_INIT(&t->write_action_end_locked, write_action_end_locked, t, + grpc_combiner_scheduler(t->combiner)), + cl); } /* Callback from the grpc_endpoint after bytes have been written by calling * sendmsg */ -static void write_action_end(void* tp, grpc_error* error) { +static void write_action_end_locked(void* tp, grpc_error* error) { GPR_TIMER_SCOPE("terminate_writing_with_lock", 0); grpc_chttp2_transport* t = static_cast(tp); - { - grpc_core::MutexLock lock(&t->mu); - bool closed = false; - if (error != GRPC_ERROR_NONE) { - close_transport_locked(t, GRPC_ERROR_REF(error)); - closed = true; - } - if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED) { - t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SENT; - closed = true; - if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) { - close_transport_locked( - t, GRPC_ERROR_CREATE_FROM_STATIC_STRING("goaway sent")); - } - } + bool closed = false; + if (error != GRPC_ERROR_NONE) { + close_transport_locked(t, GRPC_ERROR_REF(error)); + closed = true; + } - switch (t->write_state) { - case GRPC_CHTTP2_WRITE_STATE_IDLE: - GPR_UNREACHABLE_CODE(break); - case GRPC_CHTTP2_WRITE_STATE_WRITING: - GPR_TIMER_MARK("state=writing", 0); - set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "finish writing"); - break; - case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE: - GPR_TIMER_MARK("state=writing_stale_no_poller", 0); - set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING, "continue writing"); - t->is_first_write_in_batch = false; - GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); - // If the transport is closed, we will retry writing on the endpoint - // and next write may contain part of the currently serialized frames. - // So, we should only call the run_after_write callbacks when the next - // write finishes, or the callbacks will be invoked when the stream is - // closed. - if (!closed) { - GRPC_CLOSURE_LIST_SCHED(&t->run_after_write); - } - // TODO(yashykt) : When we were using combiners, we were using the - // finally version, so that the write action would happen after we were - // done queueing up all the writes that we wanted. Maybe do something - // similar? - GRPC_CLOSURE_SCHED( - GRPC_CLOSURE_INIT(&t->write_action_begin, write_action_begin, t, - grpc_schedule_on_exec_ctx), - GRPC_ERROR_NONE); - break; + if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED) { + t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SENT; + closed = true; + if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) { + close_transport_locked( + t, GRPC_ERROR_CREATE_FROM_STATIC_STRING("goaway sent")); } + } - grpc_chttp2_end_write(t, GRPC_ERROR_REF(error)); + switch (t->write_state) { + case GRPC_CHTTP2_WRITE_STATE_IDLE: + GPR_UNREACHABLE_CODE(break); + case GRPC_CHTTP2_WRITE_STATE_WRITING: + GPR_TIMER_MARK("state=writing", 0); + set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "finish writing"); + break; + case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE: + GPR_TIMER_MARK("state=writing_stale_no_poller", 0); + set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING, "continue writing"); + t->is_first_write_in_batch = false; + GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); + // If the transport is closed, we will retry writing on the endpoint + // and next write may contain part of the currently serialized frames. + // So, we should only call the run_after_write callbacks when the next + // write finishes, or the callbacks will be invoked when the stream is + // closed. + if (!closed) { + GRPC_CLOSURE_LIST_SCHED(&t->run_after_write); + } + GRPC_CLOSURE_RUN( + GRPC_CLOSURE_INIT(&t->write_action_begin_locked, + write_action_begin_locked, t, + grpc_combiner_finally_scheduler(t->combiner)), + GRPC_ERROR_NONE); + break; } + + grpc_chttp2_end_write(t, GRPC_ERROR_REF(error)); GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing"); } @@ -1360,7 +1370,8 @@ static void continue_fetching_send_locked(grpc_chttp2_transport* t, } s->fetching_send_message.reset(); return; /* early out */ - } else if (s->fetching_send_message->Next(UINT32_MAX, &s->complete_fetch)) { + } else if (s->fetching_send_message->Next(UINT32_MAX, + &s->complete_fetch_locked)) { grpc_error* error = s->fetching_send_message->Pull(&s->fetching_slice); if (error != GRPC_ERROR_NONE) { s->fetching_send_message.reset(); @@ -1372,10 +1383,9 @@ static void continue_fetching_send_locked(grpc_chttp2_transport* t, } } -static void complete_fetch(void* gs, grpc_error* error) { +static void complete_fetch_locked(void* gs, grpc_error* error) { grpc_chttp2_stream* s = static_cast(gs); grpc_chttp2_transport* t = s->t; - grpc_core::MutexLock lock(&t->mu); if (error == GRPC_ERROR_NONE) { error = s->fetching_send_message->Pull(&s->fetching_slice); if (error == GRPC_ERROR_NONE) { @@ -1402,40 +1412,24 @@ static void log_metadata(const grpc_metadata_batch* md_batch, uint32_t id, } } -static void perform_stream_op(grpc_transport* gt, grpc_stream* gs, - grpc_transport_stream_op_batch* op) { - GPR_TIMER_SCOPE("perform_stream_op", 0); - grpc_chttp2_transport* t = reinterpret_cast(gt); - grpc_chttp2_stream* s = reinterpret_cast(gs); - grpc_transport_stream_op_batch_payload* op_payload = op->payload; - - if (!t->is_client) { - if (op->send_initial_metadata) { - grpc_millis deadline = - op_payload->send_initial_metadata.send_initial_metadata->deadline; - GPR_ASSERT(deadline == GRPC_MILLIS_INF_FUTURE); - } - if (op->send_trailing_metadata) { - grpc_millis deadline = - op_payload->send_trailing_metadata.send_trailing_metadata->deadline; - GPR_ASSERT(deadline == GRPC_MILLIS_INF_FUTURE); - } - } +static void perform_stream_op_locked(void* stream_op, + grpc_error* error_ignored) { + GPR_TIMER_SCOPE("perform_stream_op_locked", 0); - if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { - char* str = grpc_transport_stream_op_batch_string(op); - gpr_log(GPR_INFO, "perform_stream_op[s=%p]: %s", s, str); - gpr_free(str); - } + grpc_transport_stream_op_batch* op = + static_cast(stream_op); + grpc_chttp2_stream* s = + static_cast(op->handler_private.extra_arg); + grpc_transport_stream_op_batch_payload* op_payload = op->payload; + grpc_chttp2_transport* t = s->t; - grpc_core::MutexLock lock(&t->mu); GRPC_STATS_INC_HTTP2_OP_BATCHES(); - s->context = op_payload->context; + s->context = op->payload->context; s->traced = op->is_traced; if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { char* str = grpc_transport_stream_op_batch_string(op); - gpr_log(GPR_INFO, "perform_stream_op: %s; on_complete = %p", str, + gpr_log(GPR_INFO, "perform_stream_op_locked: %s; on_complete = %p", str, op->on_complete); gpr_free(str); if (op->send_initial_metadata) { @@ -1708,6 +1702,41 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs, grpc_chttp2_complete_closure_step(t, s, &on_complete, GRPC_ERROR_NONE, "op->on_complete"); } + + GRPC_CHTTP2_STREAM_UNREF(s, "perform_stream_op"); +} + +static void perform_stream_op(grpc_transport* gt, grpc_stream* gs, + grpc_transport_stream_op_batch* op) { + GPR_TIMER_SCOPE("perform_stream_op", 0); + grpc_chttp2_transport* t = reinterpret_cast(gt); + grpc_chttp2_stream* s = reinterpret_cast(gs); + + if (!t->is_client) { + if (op->send_initial_metadata) { + grpc_millis deadline = + op->payload->send_initial_metadata.send_initial_metadata->deadline; + GPR_ASSERT(deadline == GRPC_MILLIS_INF_FUTURE); + } + if (op->send_trailing_metadata) { + grpc_millis deadline = + op->payload->send_trailing_metadata.send_trailing_metadata->deadline; + GPR_ASSERT(deadline == GRPC_MILLIS_INF_FUTURE); + } + } + + if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { + char* str = grpc_transport_stream_op_batch_string(op); + gpr_log(GPR_INFO, "perform_stream_op[s=%p]: %s", s, str); + gpr_free(str); + } + + GRPC_CHTTP2_STREAM_REF(s, "perform_stream_op"); + op->handler_private.extra_arg = gs; + GRPC_CLOSURE_SCHED( + GRPC_CLOSURE_INIT(&op->handler_private.closure, perform_stream_op_locked, + op, grpc_combiner_scheduler(t->combiner)), + GRPC_ERROR_NONE); } static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error) { @@ -1717,12 +1746,7 @@ static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error) { GPR_ASSERT(error != GRPC_ERROR_NONE); for (size_t j = 0; j < GRPC_CHTTP2_PCL_COUNT; j++) { grpc_closure_list_fail_all(&pq->lists[j], GRPC_ERROR_REF(error)); - if (j == GRPC_CHTTP2_PCL_INITIATE) { - GRPC_CLOSURE_LIST_RUN(&pq->lists[j]); - } else { - // TODO(yashykt) : Use GRPC_CLOSURE_LIST_RUN for this too. - GRPC_CLOSURE_LIST_SCHED(&pq->lists[j]); - } + GRPC_CLOSURE_LIST_SCHED(&pq->lists[j]); } GRPC_ERROR_UNREF(error); } @@ -1750,9 +1774,8 @@ static void send_keepalive_ping_locked(grpc_chttp2_transport* t) { if (t->closed_with_error != GRPC_ERROR_NONE) { GRPC_CLOSURE_RUN(&t->start_keepalive_ping_locked, GRPC_ERROR_REF(t->closed_with_error)); - // TODO(yashykt) : Change this to GRPC_CLOSURE_RUN too - GRPC_CLOSURE_SCHED(&t->finish_keepalive_ping, - GRPC_ERROR_REF(t->closed_with_error)); + GRPC_CLOSURE_RUN(&t->finish_keepalive_ping_locked, + GRPC_ERROR_REF(t->closed_with_error)); return; } grpc_chttp2_ping_queue* pq = &t->ping_queue; @@ -1760,25 +1783,22 @@ static void send_keepalive_ping_locked(grpc_chttp2_transport* t) { /* There is a ping in flight. Add yourself to the inflight closure list. */ GRPC_CLOSURE_RUN(&t->start_keepalive_ping_locked, GRPC_ERROR_NONE); grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INFLIGHT], - &t->finish_keepalive_ping, GRPC_ERROR_NONE); + &t->finish_keepalive_ping_locked, GRPC_ERROR_NONE); return; } grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INITIATE], &t->start_keepalive_ping_locked, GRPC_ERROR_NONE); grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_NEXT], - &t->finish_keepalive_ping, GRPC_ERROR_NONE); + &t->finish_keepalive_ping_locked, GRPC_ERROR_NONE); } -static void retry_initiate_ping(void* tp, grpc_error* error) { +static void retry_initiate_ping_locked(void* tp, grpc_error* error) { grpc_chttp2_transport* t = static_cast(tp); - { - grpc_core::MutexLock lock(&t->mu); - t->ping_state.is_delayed_ping_timer_set = false; - if (error == GRPC_ERROR_NONE) { - grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING); - } + t->ping_state.is_delayed_ping_timer_set = false; + if (error == GRPC_ERROR_NONE) { + grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING); } - GRPC_CHTTP2_UNREF_TRANSPORT(t, "retry_initiate_ping"); + GRPC_CHTTP2_UNREF_TRANSPORT(t, "retry_initiate_ping_locked"); } void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id) { @@ -1789,7 +1809,6 @@ void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id) { gpr_free(from); return; } - // TODO(yashkt) : Change this to GRPC_CLOSURE_LIST_RUN GRPC_CLOSURE_LIST_SCHED(&pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]); if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) { grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS); @@ -1827,14 +1846,12 @@ void grpc_chttp2_add_ping_strike(grpc_chttp2_transport* t) { } } -static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) { - grpc_chttp2_transport* t = reinterpret_cast(gt); - if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { - char* msg = grpc_transport_op_string(op); - gpr_log(GPR_INFO, "perform_transport_op[t=%p]: %s", t, msg); - gpr_free(msg); - } - grpc_core::MutexLock lock(&t->mu); +static void perform_transport_op_locked(void* stream_op, + grpc_error* error_ignored) { + grpc_transport_op* op = static_cast(stream_op); + grpc_chttp2_transport* t = + static_cast(op->handler_private.extra_arg); + if (op->goaway_error) { send_goaway(t, op->goaway_error); } @@ -1869,7 +1886,24 @@ static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) { close_transport_locked(t, op->disconnect_with_error); } - GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE); + GRPC_CLOSURE_RUN(op->on_consumed, GRPC_ERROR_NONE); + + GRPC_CHTTP2_UNREF_TRANSPORT(t, "transport_op"); +} + +static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) { + grpc_chttp2_transport* t = reinterpret_cast(gt); + if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { + char* msg = grpc_transport_op_string(op); + gpr_log(GPR_INFO, "perform_transport_op[t=%p]: %s", t, msg); + gpr_free(msg); + } + op->handler_private.extra_arg = gt; + GRPC_CHTTP2_REF_TRANSPORT(t, "transport_op"); + GRPC_CLOSURE_SCHED(GRPC_CLOSURE_INIT(&op->handler_private.closure, + perform_transport_op_locked, op, + grpc_combiner_scheduler(t->combiner)), + GRPC_ERROR_NONE); } /******************************************************************************* @@ -2510,11 +2544,11 @@ static grpc_error* try_http_parsing(grpc_chttp2_transport* t) { return error; } -static void read_action(void* tp, grpc_error* error) { +static void read_action_locked(void* tp, grpc_error* error) { GPR_TIMER_SCOPE("reading_action_locked", 0); grpc_chttp2_transport* t = static_cast(tp); - grpc_core::ReleasableMutexLock lock(&t->mu); + GRPC_ERROR_REF(error); grpc_error* err = error; @@ -2598,9 +2632,7 @@ static void read_action(void* tp, grpc_error* error) { } else { continue_read_action_locked(t); } - lock.Unlock(); } else { - lock.Unlock(); GRPC_CHTTP2_UNREF_TRANSPORT(t, "reading_action"); } @@ -2609,7 +2641,7 @@ static void read_action(void* tp, grpc_error* error) { static void continue_read_action_locked(grpc_chttp2_transport* t) { const bool urgent = t->goaway_error != GRPC_ERROR_NONE; - grpc_endpoint_read(t->ep, &t->read_buffer, &t->read_action, urgent); + grpc_endpoint_read(t->ep, &t->read_buffer, &t->read_action_locked, urgent); grpc_chttp2_act_on_flowctl_action(t->flow_control->MakeAction(), t, nullptr); } @@ -2617,13 +2649,11 @@ static void continue_read_action_locked(grpc_chttp2_transport* t) { // that kicks off finishes, it's unreffed static void schedule_bdp_ping_locked(grpc_chttp2_transport* t) { t->flow_control->bdp_estimator()->SchedulePing(); - send_ping_locked(t, &t->start_bdp_ping_locked, &t->finish_bdp_ping); + send_ping_locked(t, &t->start_bdp_ping_locked, &t->finish_bdp_ping_locked); } static void start_bdp_ping_locked(void* tp, grpc_error* error) { grpc_chttp2_transport* t = static_cast(tp); - // No need to take a lock. This closure will always be run while already - // holding the lock. if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { gpr_log(GPR_INFO, "%s: Start BDP ping err=%s", t->peer_string, grpc_error_string(error)); @@ -2638,15 +2668,13 @@ static void start_bdp_ping_locked(void* tp, grpc_error* error) { t->flow_control->bdp_estimator()->StartPing(); } -static void finish_bdp_ping(void* tp, grpc_error* error) { +static void finish_bdp_ping_locked(void* tp, grpc_error* error) { grpc_chttp2_transport* t = static_cast(tp); - grpc_core::ReleasableMutexLock lock(&t->mu); if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { gpr_log(GPR_INFO, "%s: Complete BDP ping err=%s", t->peer_string, grpc_error_string(error)); } if (error != GRPC_ERROR_NONE || t->closed_with_error != GRPC_ERROR_NONE) { - lock.Unlock(); GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping"); return; } @@ -2656,16 +2684,14 @@ static void finish_bdp_ping(void* tp, grpc_error* error) { GPR_ASSERT(!t->have_next_bdp_ping_timer); t->have_next_bdp_ping_timer = true; grpc_timer_init(&t->next_bdp_ping_timer, next_ping, - &t->next_bdp_ping_timer_expired); + &t->next_bdp_ping_timer_expired_locked); } -static void next_bdp_ping_timer_expired(void* tp, grpc_error* error) { +static void next_bdp_ping_timer_expired_locked(void* tp, grpc_error* error) { grpc_chttp2_transport* t = static_cast(tp); - grpc_core::ReleasableMutexLock lock(&t->mu); GPR_ASSERT(t->have_next_bdp_ping_timer); t->have_next_bdp_ping_timer = false; if (error != GRPC_ERROR_NONE) { - lock.Unlock(); GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping"); return; } @@ -2739,35 +2765,31 @@ void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args, } } -static void init_keepalive_ping(void* arg, grpc_error* error) { +static void init_keepalive_ping_locked(void* arg, grpc_error* error) { grpc_chttp2_transport* t = static_cast(arg); - { - grpc_core::MutexLock lock(&t->mu); - GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING); - if (t->destroying || t->closed_with_error != GRPC_ERROR_NONE) { - t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING; - } else if (error == GRPC_ERROR_NONE) { - if (t->keepalive_permit_without_calls || - grpc_chttp2_stream_map_size(&t->stream_map) > 0) { - t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING; - GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end"); - grpc_timer_init_unset(&t->keepalive_watchdog_timer); - send_keepalive_ping_locked(t); - grpc_chttp2_initiate_write(t, - GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING); - } else { - GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); - grpc_timer_init(&t->keepalive_ping_timer, - grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, - &t->init_keepalive_ping); - } - } else if (error == GRPC_ERROR_CANCELLED) { - /* The keepalive ping timer may be cancelled by bdp */ + GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING); + if (t->destroying || t->closed_with_error != GRPC_ERROR_NONE) { + t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING; + } else if (error == GRPC_ERROR_NONE) { + if (t->keepalive_permit_without_calls || + grpc_chttp2_stream_map_size(&t->stream_map) > 0) { + t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING; + GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end"); + grpc_timer_init_unset(&t->keepalive_watchdog_timer); + send_keepalive_ping_locked(t); + grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING); + } else { GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); grpc_timer_init(&t->keepalive_ping_timer, grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, - &t->init_keepalive_ping); + &t->init_keepalive_ping_locked); } + } else if (error == GRPC_ERROR_CANCELLED) { + /* The keepalive ping timer may be cancelled by bdp */ + GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); + grpc_timer_init(&t->keepalive_ping_timer, + grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, + &t->init_keepalive_ping_locked); } GRPC_CHTTP2_UNREF_TRANSPORT(t, "init keepalive ping"); } @@ -2777,8 +2799,6 @@ static void start_keepalive_ping_locked(void* arg, grpc_error* error) { if (error != GRPC_ERROR_NONE) { return; } - // No need to take a lock. This closure will always be run while already - // holding the lock. if (t->channelz_socket != nullptr) { t->channelz_socket->RecordKeepaliveSent(); } @@ -2788,52 +2808,46 @@ static void start_keepalive_ping_locked(void* arg, grpc_error* error) { GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive watchdog"); grpc_timer_init(&t->keepalive_watchdog_timer, grpc_core::ExecCtx::Get()->Now() + t->keepalive_timeout, - &t->keepalive_watchdog_fired); + &t->keepalive_watchdog_fired_locked); } -static void finish_keepalive_ping(void* arg, grpc_error* error) { +static void finish_keepalive_ping_locked(void* arg, grpc_error* error) { grpc_chttp2_transport* t = static_cast(arg); - { - grpc_core::MutexLock lock(&t->mu); - if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) { - if (error == GRPC_ERROR_NONE) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { - gpr_log(GPR_INFO, "%s: Finish keepalive ping", t->peer_string); - } - t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING; - grpc_timer_cancel(&t->keepalive_watchdog_timer); - GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); - grpc_timer_init(&t->keepalive_ping_timer, - grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, - &t->init_keepalive_ping); + if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) { + if (error == GRPC_ERROR_NONE) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { + gpr_log(GPR_INFO, "%s: Finish keepalive ping", t->peer_string); } + t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING; + grpc_timer_cancel(&t->keepalive_watchdog_timer); + GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); + grpc_timer_init(&t->keepalive_ping_timer, + grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, + &t->init_keepalive_ping_locked); } } GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive ping end"); } -static void keepalive_watchdog_fired(void* arg, grpc_error* error) { +static void keepalive_watchdog_fired_locked(void* arg, grpc_error* error) { grpc_chttp2_transport* t = static_cast(arg); - { - grpc_core::MutexLock lock(&t->mu); - if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) { - if (error == GRPC_ERROR_NONE) { - gpr_log(GPR_ERROR, "%s: Keepalive watchdog fired. Closing transport.", - t->peer_string); - t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING; - close_transport_locked( - t, grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "keepalive watchdog timeout"), - GRPC_ERROR_INT_GRPC_STATUS, - GRPC_STATUS_UNAVAILABLE)); - } - } else { - /* The watchdog timer should have been cancelled by - * finish_keepalive_ping. */ - if (GPR_UNLIKELY(error != GRPC_ERROR_CANCELLED)) { - gpr_log(GPR_ERROR, "keepalive_ping_end state error: %d (expect: %d)", - t->keepalive_state, GRPC_CHTTP2_KEEPALIVE_STATE_PINGING); - } + if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) { + if (error == GRPC_ERROR_NONE) { + gpr_log(GPR_ERROR, "%s: Keepalive watchdog fired. Closing transport.", + t->peer_string); + t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING; + close_transport_locked( + t, grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "keepalive watchdog timeout"), + GRPC_ERROR_INT_GRPC_STATUS, + GRPC_STATUS_UNAVAILABLE)); + } + } else { + /* The watchdog timer should have been cancelled by + * finish_keepalive_ping_locked. */ + if (GPR_UNLIKELY(error != GRPC_ERROR_CANCELLED)) { + gpr_log(GPR_ERROR, "keepalive_ping_end state error: %d (expect: %d)", + t->keepalive_state, GRPC_CHTTP2_KEEPALIVE_STATE_PINGING); } } GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive watchdog"); @@ -2873,7 +2887,6 @@ static void set_pollset_set(grpc_transport* gt, grpc_stream* gs, static void reset_byte_stream(void* arg, grpc_error* error) { grpc_chttp2_stream* s = static_cast(arg); - grpc_core::MutexLock lock(&s->t->mu); s->pending_byte_stream = false; if (error == GRPC_ERROR_NONE) { grpc_chttp2_maybe_complete_recv_message(s->t, s); @@ -2903,23 +2916,30 @@ Chttp2IncomingByteStream::Chttp2IncomingByteStream( stream->byte_stream_error = GRPC_ERROR_NONE; } -void Chttp2IncomingByteStream::Orphan() { - GPR_TIMER_SCOPE("incoming_byte_stream_destroy", 0); - grpc_chttp2_stream* s = stream_; +void Chttp2IncomingByteStream::OrphanLocked(void* arg, + grpc_error* error_ignored) { + Chttp2IncomingByteStream* bs = static_cast(arg); + grpc_chttp2_stream* s = bs->stream_; grpc_chttp2_transport* t = s->t; - grpc_core::MutexLock lock(&t->mu); - Unref(); + bs->Unref(); s->pending_byte_stream = false; grpc_chttp2_maybe_complete_recv_message(t, s); grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s); } -// TODO(yashykt) : Merge this with Next +void Chttp2IncomingByteStream::Orphan() { + GPR_TIMER_SCOPE("incoming_byte_stream_destroy", 0); + GRPC_CLOSURE_SCHED( + GRPC_CLOSURE_INIT(&destroy_action_, + &Chttp2IncomingByteStream::OrphanLocked, this, + grpc_combiner_scheduler(transport_->combiner)), + GRPC_ERROR_NONE); +} + void Chttp2IncomingByteStream::NextLocked(void* arg, grpc_error* error_ignored) { Chttp2IncomingByteStream* bs = static_cast(arg); grpc_chttp2_transport* t = bs->transport_; - grpc_core::MutexLock lock(&t->mu); grpc_chttp2_stream* s = bs->stream_; size_t cur_length = s->frame_storage.length; if (!s->read_closed) { @@ -2969,10 +2989,11 @@ bool Chttp2IncomingByteStream::Next(size_t max_size_hint, Ref(); next_action_.max_size_hint = max_size_hint; next_action_.on_complete = on_complete; - GRPC_CLOSURE_SCHED(GRPC_CLOSURE_INIT(&next_action_.closure, - &Chttp2IncomingByteStream::NextLocked, - this, grpc_schedule_on_exec_ctx), - GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED( + GRPC_CLOSURE_INIT(&next_action_.closure, + &Chttp2IncomingByteStream::NextLocked, this, + grpc_combiner_scheduler(transport_->combiner)), + GRPC_ERROR_NONE); return false; } } @@ -3085,7 +3106,7 @@ static void post_benign_reclaimer(grpc_chttp2_transport* t) { t->benign_reclaimer_registered = true; GRPC_CHTTP2_REF_TRANSPORT(t, "benign_reclaimer"); grpc_resource_user_post_reclaimer(grpc_endpoint_get_resource_user(t->ep), - false, &t->benign_reclaimer); + false, &t->benign_reclaimer_locked); } } @@ -3094,72 +3115,66 @@ static void post_destructive_reclaimer(grpc_chttp2_transport* t) { t->destructive_reclaimer_registered = true; GRPC_CHTTP2_REF_TRANSPORT(t, "destructive_reclaimer"); grpc_resource_user_post_reclaimer(grpc_endpoint_get_resource_user(t->ep), - true, &t->destructive_reclaimer); + true, &t->destructive_reclaimer_locked); } } -static void benign_reclaimer(void* arg, grpc_error* error) { +static void benign_reclaimer_locked(void* arg, grpc_error* error) { grpc_chttp2_transport* t = static_cast(arg); - { - grpc_core::MutexLock lock(&t->mu); - if (error == GRPC_ERROR_NONE && - grpc_chttp2_stream_map_size(&t->stream_map) == 0) { - /* Channel with no active streams: send a goaway to try and make it - * disconnect cleanly */ - if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) { - gpr_log(GPR_INFO, "HTTP2: %s - send goaway to free memory", - t->peer_string); - } - send_goaway( - t, grpc_error_set_int( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"), - GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM)); - } else if (error == GRPC_ERROR_NONE && - GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) { - gpr_log(GPR_INFO, - "HTTP2: %s - skip benign reclamation, there are still %" PRIdPTR - " streams", - t->peer_string, grpc_chttp2_stream_map_size(&t->stream_map)); - } - t->benign_reclaimer_registered = false; - if (error != GRPC_ERROR_CANCELLED) { - grpc_resource_user_finish_reclamation( - grpc_endpoint_get_resource_user(t->ep)); + if (error == GRPC_ERROR_NONE && + grpc_chttp2_stream_map_size(&t->stream_map) == 0) { + /* Channel with no active streams: send a goaway to try and make it + * disconnect cleanly */ + if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) { + gpr_log(GPR_INFO, "HTTP2: %s - send goaway to free memory", + t->peer_string); } + send_goaway(t, + grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"), + GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM)); + } else if (error == GRPC_ERROR_NONE && + GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) { + gpr_log(GPR_INFO, + "HTTP2: %s - skip benign reclamation, there are still %" PRIdPTR + " streams", + t->peer_string, grpc_chttp2_stream_map_size(&t->stream_map)); + } + t->benign_reclaimer_registered = false; + if (error != GRPC_ERROR_CANCELLED) { + grpc_resource_user_finish_reclamation( + grpc_endpoint_get_resource_user(t->ep)); } GRPC_CHTTP2_UNREF_TRANSPORT(t, "benign_reclaimer"); } -static void destructive_reclaimer(void* arg, grpc_error* error) { +static void destructive_reclaimer_locked(void* arg, grpc_error* error) { grpc_chttp2_transport* t = static_cast(arg); - { - grpc_core::MutexLock lock(&t->mu); - size_t n = grpc_chttp2_stream_map_size(&t->stream_map); - t->destructive_reclaimer_registered = false; - if (error == GRPC_ERROR_NONE && n > 0) { - grpc_chttp2_stream* s = static_cast( - grpc_chttp2_stream_map_rand(&t->stream_map)); - if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) { - gpr_log(GPR_INFO, "HTTP2: %s - abandon stream id %d", t->peer_string, - s->id); - } - grpc_chttp2_cancel_stream( - t, s, - grpc_error_set_int( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"), - GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM)); - if (n > 1) { - /* Since we cancel one stream per destructive reclamation, if - there are more streams left, we can immediately post a new - reclaimer in case the resource quota needs to free more - memory */ - post_destructive_reclaimer(t); - } - } - if (error != GRPC_ERROR_CANCELLED) { - grpc_resource_user_finish_reclamation( - grpc_endpoint_get_resource_user(t->ep)); - } + size_t n = grpc_chttp2_stream_map_size(&t->stream_map); + t->destructive_reclaimer_registered = false; + if (error == GRPC_ERROR_NONE && n > 0) { + grpc_chttp2_stream* s = static_cast( + grpc_chttp2_stream_map_rand(&t->stream_map)); + if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) { + gpr_log(GPR_INFO, "HTTP2: %s - abandon stream id %d", t->peer_string, + s->id); + } + grpc_chttp2_cancel_stream( + t, s, + grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"), + GRPC_ERROR_INT_HTTP2_ERROR, + GRPC_HTTP2_ENHANCE_YOUR_CALM)); + if (n > 1) { + /* Since we cancel one stream per destructive reclamation, if + there are more streams left, we can immediately post a new + reclaimer in case the resource quota needs to free more + memory */ + post_destructive_reclaimer(t); + } + } + if (error != GRPC_ERROR_CANCELLED) { + grpc_resource_user_finish_reclamation( + grpc_endpoint_get_resource_user(t->ep)); } GRPC_CHTTP2_UNREF_TRANSPORT(t, "destructive_reclaimer"); } @@ -3259,5 +3274,5 @@ void grpc_chttp2_transport_start_reading( gpr_free(read_buffer); } t->notify_on_receive_settings = notify_on_receive_settings; - GRPC_CLOSURE_SCHED(&t->read_action, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(&t->read_action_locked, GRPC_ERROR_NONE); } diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.cc b/src/core/ext/transport/chttp2/transport/hpack_parser.cc index fb33c841428..a5142ffd96f 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_parser.cc +++ b/src/core/ext/transport/chttp2/transport/hpack_parser.cc @@ -1669,15 +1669,11 @@ static const maybe_complete_func_type maybe_complete_funcs[] = { static void force_client_rst_stream(void* sp, grpc_error* error) { grpc_chttp2_stream* s = static_cast(sp); grpc_chttp2_transport* t = s->t; - { - grpc_core::MutexLock lock(&t->mu); - if (!s->write_closed) { - grpc_chttp2_add_rst_stream_to_next_write(t, s->id, GRPC_HTTP2_NO_ERROR, - &s->stats.outgoing); - grpc_chttp2_initiate_write(t, - GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM); - grpc_chttp2_mark_stream_closed(t, s, true, true, GRPC_ERROR_NONE); - } + if (!s->write_closed) { + grpc_chttp2_add_rst_stream_to_next_write(t, s->id, GRPC_HTTP2_NO_ERROR, + &s->stats.outgoing); + grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM); + grpc_chttp2_mark_stream_closed(t, s, true, true, GRPC_ERROR_NONE); } GRPC_CHTTP2_STREAM_UNREF(s, "final_rst"); } @@ -1744,12 +1740,11 @@ grpc_error* grpc_chttp2_header_parser_parse(void* hpack_parser, the stream. Wait until the combiner lock is ready to be released however -- it might be that we receive a RST_STREAM following this and can avoid the extra write */ - // TODO(yashykt) : When we were using combiners, we were using the - // finally version. Maybe do something similar? GRPC_CHTTP2_STREAM_REF(s, "final_rst"); - GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(force_client_rst_stream, s, - grpc_schedule_on_exec_ctx), - GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED( + GRPC_CLOSURE_CREATE(force_client_rst_stream, s, + grpc_combiner_finally_scheduler(t->combiner)), + GRPC_ERROR_NONE); } grpc_chttp2_mark_stream_closed(t, s, true, false, GRPC_ERROR_NONE); } diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 6d13d368be7..8ef26cd7981 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -39,7 +39,6 @@ #include "src/core/lib/channel/channelz.h" #include "src/core/lib/compression/stream_compression.h" #include "src/core/lib/gprpp/manual_constructor.h" -#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/timer.h" @@ -254,6 +253,7 @@ class Chttp2IncomingByteStream : public ByteStream { private: static void NextLocked(void* arg, grpc_error* error_ignored); + static void OrphanLocked(void* arg, grpc_error* error_ignored); void MaybeCreateStreamDecompressionCtx(); @@ -275,6 +275,7 @@ class Chttp2IncomingByteStream : public ByteStream { size_t max_size_hint; grpc_closure* on_complete; } next_action_; + grpc_closure destroy_action_; }; } // namespace grpc_core @@ -293,13 +294,14 @@ struct grpc_chttp2_transport { ~grpc_chttp2_transport(); grpc_transport base; /* must be first */ - grpc_core::Mutex mu; grpc_core::RefCount refs; grpc_endpoint* ep; char* peer_string; grpc_resource_user* resource_user; + grpc_combiner* combiner; + grpc_closure* notify_on_receive_settings = nullptr; /** write execution state of the transport */ @@ -325,11 +327,11 @@ struct grpc_chttp2_transport { /** maps stream id to grpc_chttp2_stream objects */ grpc_chttp2_stream_map stream_map; - grpc_closure write_action_begin; + grpc_closure write_action_begin_locked; grpc_closure write_action; - grpc_closure write_action_end; + grpc_closure write_action_end_locked; - grpc_closure read_action; + grpc_closure read_action_locked; /** incoming read bytes */ grpc_slice_buffer read_buffer; @@ -390,7 +392,7 @@ struct grpc_chttp2_transport { grpc_chttp2_repeated_ping_policy ping_policy; grpc_chttp2_repeated_ping_state ping_state; uint64_t ping_ctr = 0; /* unique id for pings */ - grpc_closure retry_initiate_ping; + grpc_closure retry_initiate_ping_locked; /** ping acks */ size_t ping_ack_count = 0; @@ -440,9 +442,9 @@ struct grpc_chttp2_transport { grpc_chttp2_write_cb* write_cb_pool = nullptr; /* bdp estimator */ - grpc_closure next_bdp_ping_timer_expired; + grpc_closure next_bdp_ping_timer_expired_locked; grpc_closure start_bdp_ping_locked; - grpc_closure finish_bdp_ping; + grpc_closure finish_bdp_ping_locked; /* if non-NULL, close the transport with this error when writes are finished */ @@ -457,9 +459,9 @@ struct grpc_chttp2_transport { /** have we scheduled a destructive cleanup? */ bool destructive_reclaimer_registered = false; /** benign cleanup closure */ - grpc_closure benign_reclaimer; + grpc_closure benign_reclaimer_locked; /** destructive cleanup closure */ - grpc_closure destructive_reclaimer; + grpc_closure destructive_reclaimer_locked; /* next bdp ping timer */ bool have_next_bdp_ping_timer = false; @@ -467,13 +469,13 @@ struct grpc_chttp2_transport { /* keep-alive ping support */ /** Closure to initialize a keepalive ping */ - grpc_closure init_keepalive_ping; + grpc_closure init_keepalive_ping_locked; /** Closure to run when the keepalive ping is sent */ grpc_closure start_keepalive_ping_locked; /** Cousure to run when the keepalive ping ack is received */ - grpc_closure finish_keepalive_ping; + grpc_closure finish_keepalive_ping_locked; /** Closrue to run when the keepalive ping timeouts */ - grpc_closure keepalive_watchdog_fired; + grpc_closure keepalive_watchdog_fired_locked; /** timer to initiate ping events */ grpc_timer keepalive_ping_timer; /** watchdog to kill the transport when waiting for the keepalive ping */ @@ -520,6 +522,7 @@ struct grpc_chttp2_stream { explicit Reffer(grpc_chttp2_stream* s); } reffer; + grpc_closure destroy_stream; grpc_closure* destroy_stream_arg; grpc_chttp2_stream_link links[STREAM_LIST_COUNT]; @@ -540,7 +543,7 @@ struct grpc_chttp2_stream { int64_t next_message_end_offset; int64_t flow_controlled_bytes_written = 0; int64_t flow_controlled_bytes_flowed = 0; - grpc_closure complete_fetch; + grpc_closure complete_fetch_locked; grpc_closure* fetching_send_message_finished = nullptr; grpc_metadata_batch* recv_initial_metadata; diff --git a/src/core/ext/transport/chttp2/transport/writing.cc b/src/core/ext/transport/chttp2/transport/writing.cc index 4796dee4cf6..d6d9e4521f6 100644 --- a/src/core/ext/transport/chttp2/transport/writing.cc +++ b/src/core/ext/transport/chttp2/transport/writing.cc @@ -97,14 +97,14 @@ static void maybe_initiate_ping(grpc_chttp2_transport* t) { t->ping_state.is_delayed_ping_timer_set = true; GRPC_CHTTP2_REF_TRANSPORT(t, "retry_initiate_ping_locked"); grpc_timer_init(&t->ping_state.delayed_ping_timer, next_allowed_ping, - &t->retry_initiate_ping); + &t->retry_initiate_ping_locked); } return; } pq->inflight_id = t->ping_ctr; t->ping_ctr++; - GRPC_CLOSURE_LIST_RUN(&pq->lists[GRPC_CHTTP2_PCL_INITIATE]); + GRPC_CLOSURE_LIST_SCHED(&pq->lists[GRPC_CHTTP2_PCL_INITIATE]); grpc_closure_list_move(&pq->lists[GRPC_CHTTP2_PCL_NEXT], &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]); grpc_slice_buffer_add(&t->outbuf, diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h index 94667ae0a69..c7b2e8299b9 100644 --- a/src/core/lib/iomgr/closure.h +++ b/src/core/lib/iomgr/closure.h @@ -355,43 +355,4 @@ inline void grpc_closure_list_sched(grpc_closure_list* list) { grpc_closure_list_sched(closure_list) #endif -#ifndef NDEBUG -inline void grpc_closure_list_run(const char* file, int line, - grpc_closure_list* list) { -#else -inline void grpc_closure_list_run(grpc_closure_list* list) { -#endif - grpc_closure* c = list->head; - while (c != nullptr) { - grpc_closure* next = c->next_data.next; -#ifndef NDEBUG - if (c->scheduled) { - gpr_log(GPR_ERROR, - "Closure already scheduled. (closure: %p, created: [%s:%d], " - "previously scheduled at: [%s: %d] run?: %s", - c, c->file_created, c->line_created, c->file_initiated, - c->line_initiated, c->run ? "true" : "false"); - abort(); - } - c->scheduled = true; - c->file_initiated = file; - c->line_initiated = line; - c->run = false; - GPR_ASSERT(c->cb != nullptr); -#endif - c->scheduler->vtable->run(c, c->error_data.error); - c = next; - } - list->head = list->tail = nullptr; -} - -/** Schedule all closures in a list to be run. Does not need to be run from a - * safe point. */ -#ifndef NDEBUG -#define GRPC_CLOSURE_LIST_RUN(closure_list) \ - grpc_closure_list_run(__FILE__, __LINE__, closure_list) -#else -#define GRPC_CLOSURE_LIST_RUN(closure_list) grpc_closure_list_run(closure_list) -#endif - #endif /* GRPC_CORE_LIB_IOMGR_CLOSURE_H */ diff --git a/src/core/lib/iomgr/executor.cc b/src/core/lib/iomgr/executor.cc index c855d1535a9..721542544cd 100644 --- a/src/core/lib/iomgr/executor.cc +++ b/src/core/lib/iomgr/executor.cc @@ -465,10 +465,6 @@ void Executor::ShutdownAll() { bool Executor::IsThreaded(ExecutorType executor_type) { GPR_ASSERT(executor_type < ExecutorType::NUM_EXECUTORS); - Executor* executor = executors[static_cast(executor_type)]; - if (executor == nullptr) { - return false; - } return executors[static_cast(executor_type)]->IsThreaded(); }